Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 24 additions & 3 deletions pkg/logging/json_logger.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,9 @@ var JSONDriverLogOpts = []string{
}

type JSONLogger struct {
Opts map[string]string
logger *logrotate.Logger
Opts map[string]string
logger *logrotate.Logger
encoder *jsonfile.SyncEncoder
}

func JSONFileLogOptsValidate(logOptMap map[string]string) error {
Expand Down Expand Up @@ -119,13 +120,22 @@ func (jsonLogger *JSONLogger) PreProcess(ctx context.Context, dataStore string,
// MaxBackups does not include file to write logs to
l.MaxBackups = maxFile - 1
jsonLogger.logger = l
jsonLogger.encoder = jsonfile.NewSyncEncoder(l)
return nil
}

func (jsonLogger *JSONLogger) Process(stdout <-chan string, stderr <-chan string) error {
return jsonfile.Encode(stdout, stderr, jsonLogger.logger)
}

// WriteLogEntry writes a single log line synchronously, implementing SyncDriver.
// Writing inline (rather than over a channel consumed by Process) ensures a
// container's final output is durable before containerd tears the logging
// process down on exit. https://github.com/containerd/nerdctl/issues/5006
func (jsonLogger *JSONLogger) WriteLogEntry(stream, line string) error {
return jsonLogger.encoder.Encode(stream, line)
}

func (jsonLogger *JSONLogger) PostProcess() error {
return nil
}
Expand Down Expand Up @@ -182,7 +192,18 @@ func viewLogsJSONFileDirect(lvopts LogViewOptions, jsonLogFilePath string, stdou
for {
select {
case <-stopChannel:
log.L.Debug("received stop signal while re-reading JSON logfile, returning")
log.L.Debug("received stop signal while re-reading JSON logfile, draining remaining logs and returning")
// The stop signal is only sent after WaitForLogger has confirmed that the
// logger finished writing (see pkg/cmd/container.Logs). However, the
// watcher-driven read loop may not have consumed the final entries yet:
// they may have been flushed to the file while we were blocked in
// startTail, and the stop signal wins the next select iteration before
// they are read. Do a final read so that we don't drop log content that
// was written right before the container exited.
// https://github.com/containerd/nerdctl/issues/5006
if _, err := jsonfile.Decode(stdout, stderr, fin, lvopts.Timestamps, lvopts.Since, lvopts.Until); err != nil {
log.L.WithError(err).Debugf("error draining remaining logs from JSON logfile %q", jsonLogFilePath)
}
return nil
default:
if stop || (limitedMode && limitedNum == 0) {
Expand Down
40 changes: 40 additions & 0 deletions pkg/logging/json_logger_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,46 @@ func TestReadRotatedJSONLog(t *testing.T) {
}
}

// TestReadJSONLogsDrainsOnStop verifies that when the stop signal is received
// while following a log file, any log entries that were flushed but not yet
// read are still drained and written out before returning.
// Regression test for https://github.com/containerd/nerdctl/issues/5006
func TestReadJSONLogsDrainsOnStop(t *testing.T) {
file, err := os.CreateTemp("", "TestDrainOnStop")
if err != nil {
t.Fatalf("unable to create temp file")
}
defer os.Remove(file.Name())
// A final entry without a trailing newline in the log text, mirroring the
// scenario in the linked issue.
file.WriteString(`{"log":"Hello World!\n","stream":"stdout","time":"2024-07-12T03:09:24.916296732Z"}` + "\n")
file.WriteString(`{"log":"There is no newline","stream":"stdout","time":"2024-07-12T03:09:24.916296732Z"}` + "\n")

// Pre-load the stop signal so the first select iteration takes the stop
// branch while the file still has unread content. This deterministically
// reproduces the race where the stop signal wins before the final entries
// are read.
stopChan := make(chan os.Signal, 1)
stopChan <- os.Interrupt

stdoutBuf := bytes.NewBuffer(nil)
stderrBuf := bytes.NewBuffer(nil)
lvOpts := LogViewOptions{
LogPath: file.Name(),
Follow: true,
}
if err := viewLogsJSONFileDirect(lvOpts, file.Name(), stdoutBuf, stderrBuf, stopChan); err != nil {
t.Fatal(err.Error())
}
if stderrBuf.Len() > 0 {
t.Fatalf("Stderr: %v", stderrBuf.String())
}
const expected = "Hello World!\nThere is no newline"
if actual := stdoutBuf.String(); expected != actual {
t.Fatalf("Actual output does not match expected.\nActual: %q\nExpected: %q\n", actual, expected)
}
}

func TestReadJSONLogs(t *testing.T) {
file, err := os.CreateTemp("", "TestFollowLogs")
if err != nil {
Expand Down
24 changes: 24 additions & 0 deletions pkg/logging/jsonfile/jsonfile.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,30 @@ func Path(dataStore, ns, id string) string {
return filepath.Join(dataStore, "containers", ns, id, id+"-json.log")
}

// SyncEncoder writes individual json-file log entries to a writer. Its Encode
// method is safe for concurrent use, so it can be shared between the goroutines
// reading a container's stdout and stderr.
type SyncEncoder struct {
mu sync.Mutex
enc *json.Encoder
}

// NewSyncEncoder returns a SyncEncoder that writes to w.
func NewSyncEncoder(w io.Writer) *SyncEncoder {
return &SyncEncoder{enc: json.NewEncoder(w)}
}

// Encode writes a single log entry for the given stream.
func (s *SyncEncoder) Encode(stream, line string) error {
s.mu.Lock()
defer s.mu.Unlock()
return s.enc.Encode(&Entry{
Stream: stream,
Log: line,
Time: time.Now().UTC(),
})
}

func Encode(stdout <-chan string, stderr <-chan string, writer io.Writer) error {
enc := json.NewEncoder(writer)
var encMu sync.Mutex
Expand Down
Loading
Loading