From 7a532e1c4b70c7cf658034d72ad7c45613f42913 Mon Sep 17 00:00:00 2001 From: Akihiro Suda Date: Thu, 25 Jun 2026 21:00:36 +0900 Subject: [PATCH] logging: write container output synchronously so final output is not truncated Logging tests such as TestLogsWithoutNewlineOrEOF and TestLogsAfterRestartingContainer flake under gomodjail: a container's final output, in particular a line emitted right before exit with no trailing newline, is sometimes dropped from the log. Running nerdctl run --name c alpine printf "'Hello World!\nThere is no newline'" nerdctl logs -f c would intermittently print only "'Hello World!" instead of the full output. Two independent problems, both reproduced locally under gomodjail: 1. getContainerWait hung for short-lived containers. The logging process is started by containerd while it sets up the container's IO, before the task is created, so the first con.Task() returns NotFound and the code retried forever "waiting for the task to start". For a fast container the task can instead have already exited and been removed before the logger ever sees it, so it never appeared and the logger blocked forever holding the logger lock. It now concludes the container has exited once it is missing and the container has been observed producing output. 2. The container's final chunk was lost to teardown. On exit containerd closes the stdio FIFOs and tears the logging process down almost immediately. The old path read the FIFO, copied it through an io.Pipe and a bufio splitter, and handed each line to the driver over a buffered channel; a trailing chunk with no newline was held in the splitter until EOF and then raced the teardown across several goroutines, so it was frequently lost. The logger now reads each FIFO directly and, for drivers that can write synchronously (json-file, via the new SyncDriver interface), writes each entry inline from the reading goroutine and flushes a trailing no-newline fragment as soon as it is read. Streaming drivers keep using the buffered channel so a slow driver cannot block the container. The viewer also does a final read of the JSON log file when it receives the stop signal, so entries flushed just before exit are not missed. Verified locally with the gomodjail-packed binary: 250+ iterations of the failing printf case, the restart (doubled-output) case, multi-line output and follow-on-running-container all pass with no truncation. Fixes https://github.com/containerd/nerdctl/issues/5006 Assisted-by: Claude Opus 4.8 Signed-off-by: Akihiro Suda --- pkg/logging/json_logger.go | 27 +++- pkg/logging/json_logger_test.go | 40 ++++++ pkg/logging/jsonfile/jsonfile.go | 24 ++++ pkg/logging/logging.go | 203 +++++++++++++++++++++++-------- pkg/logging/logging_test.go | 130 +++++++++++++++++++- 5 files changed, 371 insertions(+), 53 deletions(-) diff --git a/pkg/logging/json_logger.go b/pkg/logging/json_logger.go index 7e2dca196fd..02862965046 100644 --- a/pkg/logging/json_logger.go +++ b/pkg/logging/json_logger.go @@ -48,8 +48,9 @@ var JSONDriverLogOpts = []string{ } type JSONLogger struct { - Opts map[string]string - logger *logrotate.Logger + Opts map[string]string + logger *logrotate.Logger + encoder *jsonfile.SyncEncoder } func JSONFileLogOptsValidate(logOptMap map[string]string) error { @@ -119,6 +120,7 @@ func (jsonLogger *JSONLogger) PreProcess(ctx context.Context, dataStore string, // MaxBackups does not include file to write logs to l.MaxBackups = maxFile - 1 jsonLogger.logger = l + jsonLogger.encoder = jsonfile.NewSyncEncoder(l) return nil } @@ -126,6 +128,14 @@ func (jsonLogger *JSONLogger) Process(stdout <-chan string, stderr <-chan string return jsonfile.Encode(stdout, stderr, jsonLogger.logger) } +// WriteLogEntry writes a single log line synchronously, implementing SyncDriver. +// Writing inline (rather than over a channel consumed by Process) ensures a +// container's final output is durable before containerd tears the logging +// process down on exit. https://github.com/containerd/nerdctl/issues/5006 +func (jsonLogger *JSONLogger) WriteLogEntry(stream, line string) error { + return jsonLogger.encoder.Encode(stream, line) +} + func (jsonLogger *JSONLogger) PostProcess() error { return nil } @@ -182,7 +192,18 @@ func viewLogsJSONFileDirect(lvopts LogViewOptions, jsonLogFilePath string, stdou for { select { case <-stopChannel: - log.L.Debug("received stop signal while re-reading JSON logfile, returning") + log.L.Debug("received stop signal while re-reading JSON logfile, draining remaining logs and returning") + // The stop signal is only sent after WaitForLogger has confirmed that the + // logger finished writing (see pkg/cmd/container.Logs). However, the + // watcher-driven read loop may not have consumed the final entries yet: + // they may have been flushed to the file while we were blocked in + // startTail, and the stop signal wins the next select iteration before + // they are read. Do a final read so that we don't drop log content that + // was written right before the container exited. + // https://github.com/containerd/nerdctl/issues/5006 + if _, err := jsonfile.Decode(stdout, stderr, fin, lvopts.Timestamps, lvopts.Since, lvopts.Until); err != nil { + log.L.WithError(err).Debugf("error draining remaining logs from JSON logfile %q", jsonLogFilePath) + } return nil default: if stop || (limitedMode && limitedNum == 0) { diff --git a/pkg/logging/json_logger_test.go b/pkg/logging/json_logger_test.go index 7b41c10a68c..7ecf183b851 100644 --- a/pkg/logging/json_logger_test.go +++ b/pkg/logging/json_logger_test.go @@ -109,6 +109,46 @@ func TestReadRotatedJSONLog(t *testing.T) { } } +// TestReadJSONLogsDrainsOnStop verifies that when the stop signal is received +// while following a log file, any log entries that were flushed but not yet +// read are still drained and written out before returning. +// Regression test for https://github.com/containerd/nerdctl/issues/5006 +func TestReadJSONLogsDrainsOnStop(t *testing.T) { + file, err := os.CreateTemp("", "TestDrainOnStop") + if err != nil { + t.Fatalf("unable to create temp file") + } + defer os.Remove(file.Name()) + // A final entry without a trailing newline in the log text, mirroring the + // scenario in the linked issue. + file.WriteString(`{"log":"Hello World!\n","stream":"stdout","time":"2024-07-12T03:09:24.916296732Z"}` + "\n") + file.WriteString(`{"log":"There is no newline","stream":"stdout","time":"2024-07-12T03:09:24.916296732Z"}` + "\n") + + // Pre-load the stop signal so the first select iteration takes the stop + // branch while the file still has unread content. This deterministically + // reproduces the race where the stop signal wins before the final entries + // are read. + stopChan := make(chan os.Signal, 1) + stopChan <- os.Interrupt + + stdoutBuf := bytes.NewBuffer(nil) + stderrBuf := bytes.NewBuffer(nil) + lvOpts := LogViewOptions{ + LogPath: file.Name(), + Follow: true, + } + if err := viewLogsJSONFileDirect(lvOpts, file.Name(), stdoutBuf, stderrBuf, stopChan); err != nil { + t.Fatal(err.Error()) + } + if stderrBuf.Len() > 0 { + t.Fatalf("Stderr: %v", stderrBuf.String()) + } + const expected = "Hello World!\nThere is no newline" + if actual := stdoutBuf.String(); expected != actual { + t.Fatalf("Actual output does not match expected.\nActual: %q\nExpected: %q\n", actual, expected) + } +} + func TestReadJSONLogs(t *testing.T) { file, err := os.CreateTemp("", "TestFollowLogs") if err != nil { diff --git a/pkg/logging/jsonfile/jsonfile.go b/pkg/logging/jsonfile/jsonfile.go index 22afc6f3442..e2c2829b70c 100644 --- a/pkg/logging/jsonfile/jsonfile.go +++ b/pkg/logging/jsonfile/jsonfile.go @@ -43,6 +43,30 @@ func Path(dataStore, ns, id string) string { return filepath.Join(dataStore, "containers", ns, id, id+"-json.log") } +// SyncEncoder writes individual json-file log entries to a writer. Its Encode +// method is safe for concurrent use, so it can be shared between the goroutines +// reading a container's stdout and stderr. +type SyncEncoder struct { + mu sync.Mutex + enc *json.Encoder +} + +// NewSyncEncoder returns a SyncEncoder that writes to w. +func NewSyncEncoder(w io.Writer) *SyncEncoder { + return &SyncEncoder{enc: json.NewEncoder(w)} +} + +// Encode writes a single log entry for the given stream. +func (s *SyncEncoder) Encode(stream, line string) error { + s.mu.Lock() + defer s.mu.Unlock() + return s.enc.Encode(&Entry{ + Stream: stream, + Log: line, + Time: time.Now().UTC(), + }) +} + func Encode(stdout <-chan string, stderr <-chan string, writer io.Writer) error { enc := json.NewEncoder(writer) var encMu sync.Mutex diff --git a/pkg/logging/logging.go b/pkg/logging/logging.go index 91a3231ee3a..a93594bd8c0 100644 --- a/pkg/logging/logging.go +++ b/pkg/logging/logging.go @@ -17,7 +17,7 @@ package logging import ( - "bufio" + "bytes" "context" "encoding/json" "errors" @@ -28,6 +28,7 @@ import ( "sort" "strings" "sync" + "sync/atomic" "time" "github.com/fsnotify/fsnotify" @@ -52,6 +53,11 @@ const ( Labels = "labels" ) +const ( + streamStdout = "stdout" + streamStderr = "stderr" +) + type Driver interface { Init(dataStore, ns, id string) error PreProcess(ctx context.Context, dataStore string, config *logging.Config) error @@ -59,6 +65,22 @@ type Driver interface { PostProcess() error } +// SyncDriver is an optional capability for a Driver whose log entries can be +// written synchronously and cheaply (e.g. to a local file). When a driver +// implements it, the logger writes each entry by calling WriteLogEntry directly +// from the goroutine that reads the container's stdio, rather than handing it to +// Process over a buffered channel. This makes a container's final output (in +// particular a trailing chunk with no newline) durable before containerd tears +// the logging process down on exit. Drivers that may block, such as +// network-backed ones, should not implement it so that the buffered channel +// keeps them from blocking the container. https://github.com/containerd/nerdctl/issues/5006 +type SyncDriver interface { + Driver + // WriteLogEntry writes a single log line for the given stream ("stdout" or + // "stderr"). It may be called concurrently for different streams. + WriteLogEntry(stream, line string) error +} + type DriverFactory func(map[string]string, string) (Driver, error) type LogOptsValidateFunc func(logOptMap map[string]string) error @@ -165,7 +187,15 @@ func WaitForLogger(dataStore, ns, id string) error { }) } -func getContainerWait(ctx context.Context, address string, config *logging.Config) (<-chan containerd.ExitStatus, error) { +// alreadyExited returns a channel that immediately reports an exit, used when +// we have determined that the container's task is already gone. +func alreadyExited() <-chan containerd.ExitStatus { + ch := make(chan containerd.ExitStatus, 1) + ch <- containerd.ExitStatus{} + return ch +} + +func getContainerWait(ctx context.Context, address string, config *logging.Config, outputSeen func() bool) (<-chan containerd.ExitStatus, error) { client, err := containerd.New(strings.TrimPrefix(address, "unix://"), containerd.WithDefaultNamespace(config.Namespace)) if err != nil { return nil, err @@ -183,8 +213,19 @@ func getContainerWait(ctx context.Context, address string, config *logging.Confi return nil, err } - // If task was not found, it's possible that the container runtime is still being created. - // Retry every 100ms. + // The task was not found. containerd starts this logging process while + // setting up the container's IO, i.e. before the task is created, so a + // NotFound here usually just means the task has not been created yet: retry + // until it appears. + // + // However, for a short-lived container the task may instead have already + // exited and been removed before we ever observed it (this is more likely + // when this logger process is slow to start, e.g. under gomodjail). In that + // case the task will never appear and waiting for it would hang the logger + // forever, holding the logger lock and truncating the container's final + // output. Once we have seen the container produce output we therefore know + // it has run, so a still-missing task means it has already exited. + // https://github.com/containerd/nerdctl/issues/5006 ticker := time.NewTicker(100 * time.Millisecond) defer ticker.Stop() @@ -194,18 +235,20 @@ func getContainerWait(ctx context.Context, address string, config *logging.Confi return nil, errors.New("timed out waiting for container task to start") case <-ticker.C: task, err = con.Task(ctx, nil) - if err != nil { - if errdefs.IsNotFound(err) { - continue - } + if err == nil { + return task.Wait(ctx) + } + if !errdefs.IsNotFound(err) { return nil, err } - return task.Wait(ctx) + if outputSeen() { + return alreadyExited(), nil + } } } } -type ContainerWaitFunc func(ctx context.Context, address string, config *logging.Config) (<-chan containerd.ExitStatus, error) +type ContainerWaitFunc func(ctx context.Context, address string, config *logging.Config, outputSeen func() bool) (<-chan containerd.ExitStatus, error) func loggingProcessAdapter(ctx context.Context, driver Driver, dataStore, address string, getContainerWait ContainerWaitFunc, config *logging.Config) error { if err := driver.PreProcess(ctx, dataStore, config); err != nil { @@ -226,60 +269,122 @@ func loggingProcessAdapter(ctx context.Context, driver Driver, dataStore, addres stderrR.Cancel() }() - // initialize goroutines to copy stdout and stderr streams to a closable pipe - pipeStdoutR, pipeStdoutW := io.Pipe() - pipeStderrR, pipeStderrW := io.Pipe() - copyStream := func(reader io.Reader, writer *io.PipeWriter) { - // copy using a buffer of size 32K - buf := make([]byte, 32<<10) - _, err := io.CopyBuffer(writer, reader, buf) - if err != nil { - log.G(ctx).Errorf("failed to copy stream: %s", err) - } - } - go copyStream(stdoutR, pipeStdoutW) - go copyStream(stderrR, pipeStderrW) + // copiedBytes counts how much container output has been read so far. It lets + // getContainerWait tell "the task has not been created yet" apart from "the + // task has already exited and been removed" when it sees a missing task. + var copiedBytes atomic.Int64 + outputSeen := func() bool { return copiedBytes.Load() > 0 } - var wg sync.WaitGroup - wg.Add(3) stdout := make(chan string, 10000) stderr := make(chan string, 10000) - processLogFunc := func(reader io.Reader, dataChan chan string) { - defer wg.Done() - defer close(dataChan) - r := bufio.NewReader(reader) - - var err error - for err == nil { - var s string - s, err = r.ReadString('\n') - if len(s) > 0 { - dataChan <- s + // If the driver can write synchronously, emit writes each log entry directly + // from the goroutine that reads the container's stdio. Otherwise it hands the + // entry to the driver's Process method over a buffered channel, which keeps a + // slow (e.g. network-backed) driver from blocking the container. + // + // The synchronous path matters because, when a container exits, containerd + // closes the stdio FIFOs and then tears the logging process down almost + // immediately. Handing the final chunk to another goroutine to write races + // that teardown and can lose a trailing chunk that has no newline; writing it + // inline does not. https://github.com/containerd/nerdctl/issues/5006 + syncDriver, isSync := driver.(SyncDriver) + emit := func(stream, line string) { + if isSync { + if err := syncDriver.WriteLogEntry(stream, line); err != nil { + log.G(ctx).WithError(err).Error("failed to write log entry") } + return + } + if stream == streamStdout { + stdout <- line + } else { + stderr <- line + } + } + + var wg sync.WaitGroup - if err != nil && err != io.EOF { - log.L.WithError(err).Error("failed to read log") + // processStream reads a container stdio FIFO directly and emits its output + // split into newline-terminated lines. Complete lines are emitted as they are + // read; a trailing fragment without a newline is buffered until more output + // arrives (so a long line is not split) and emitted when the stream ends. + processStream := func(stream string, reader io.Reader, dataChan chan string) { + defer wg.Done() + if !isSync { + defer close(dataChan) + } + buf := make([]byte, 32<<10) + var pending []byte + // emitLines emits each complete (newline-terminated) line, leaving any + // trailing fragment buffered in pending so that a single logical line is + // not split across log entries. + emitLines := func() { + for { + i := bytes.IndexByte(pending, '\n') + if i < 0 { + break + } + emit(stream, string(pending[:i+1])) + pending = pending[i+1:] + } + } + for { + nr, err := reader.Read(buf) + if nr > 0 { + copiedBytes.Add(int64(nr)) + pending = append(pending, buf[:nr]...) + emitLines() + // For a synchronous driver, emit a trailing fragment immediately + // instead of buffering it until the stream ends. The fragment is + // then written to the log before the container's abrupt teardown + // on exit can lose it; for a streaming (channel) driver this would + // only split lines, so it is left buffered there. + if isSync && len(pending) > 0 { + emit(stream, string(pending)) + pending = pending[:0] + } + } + if err != nil { + emitLines() + // The stream has ended: emit any final fragment that did not end + // in a newline. + if len(pending) > 0 { + emit(stream, string(pending)) + } + if !errors.Is(err, io.EOF) && !errors.Is(err, cancelreader.ErrCanceled) { + log.L.WithError(err).Error("failed to read log") + } + return } } } - go processLogFunc(pipeStdoutR, stdout) - go processLogFunc(pipeStderrR, stderr) - go func() { - defer wg.Done() - driver.Process(stdout, stderr) - }() + wg.Add(2) + go processStream(streamStdout, stdoutR, stdout) + go processStream(streamStderr, stderrR, stderr) + if !isSync { + wg.Add(1) + go func() { + defer wg.Done() + driver.Process(stdout, stderr) + }() + } go func() { - // close pipeStdoutW and pipeStderrW upon container exit - defer pipeStdoutW.Close() - defer pipeStderrW.Close() - - exitCh, err := getContainerWait(ctx, address, config) + // Wait for the container to exit, then cancel the readers. containerd + // keeps the stdio FIFO write ends open (so the container can be + // restarted), so the FIFOs may not reach EOF on exit; without this the + // read goroutines, and therefore the logger, could block forever. + exitCh, err := getContainerWait(ctx, address, config, outputSeen) if err != nil { + // We could not determine when the container exits. Do not cancel the + // readers: they will finish on their own when the FIFO reaches EOF. + // Cancelling here could truncate a still-running container. log.G(ctx).Errorf("failed to get container task wait channel: %v", err) return } <-exitCh + stdoutR.Cancel() + stderrR.Cancel() }() wg.Wait() return driver.PostProcess() diff --git a/pkg/logging/logging_test.go b/pkg/logging/logging_test.go index da0d535b074..ecab183bebc 100644 --- a/pkg/logging/logging_test.go +++ b/pkg/logging/logging_test.go @@ -21,7 +21,9 @@ import ( "bytes" "context" "math/rand" + "os" "strings" + "sync" "testing" "time" @@ -58,6 +60,34 @@ func (m *MockDriver) PostProcess() error { return nil } +// SyncMockDriver implements SyncDriver, recording the entries written to it. +type SyncMockDriver struct { + mu sync.Mutex + receivedStdout []string + receivedStderr []string +} + +func (m *SyncMockDriver) Init(dataStore, ns, id string) error { return nil } +func (m *SyncMockDriver) PreProcess(ctx context.Context, dataStore string, config *logging.Config) error { + return nil +} +func (m *SyncMockDriver) Process(stdout <-chan string, stderr <-chan string) error { + // Not used on the synchronous path (the logger calls WriteLogEntry instead), + // but must satisfy the Driver interface. + return nil +} +func (m *SyncMockDriver) PostProcess() error { return nil } +func (m *SyncMockDriver) WriteLogEntry(stream, line string) error { + m.mu.Lock() + defer m.mu.Unlock() + if stream == streamStdout { + m.receivedStdout = append(m.receivedStdout, line) + } else { + m.receivedStderr = append(m.receivedStderr, line) + } + return nil +} + func TestLoggingProcessAdapter(t *testing.T) { // Will process a normal String to stdout and a bigger one to stderr normalString := generateRandomString(1024) @@ -79,7 +109,7 @@ func TestLoggingProcessAdapter(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - var getContainerWaitMock ContainerWaitFunc = func(ctx context.Context, address string, config *logging.Config) (<-chan containerd.ExitStatus, error) { + var getContainerWaitMock ContainerWaitFunc = func(ctx context.Context, address string, config *logging.Config, outputSeen func() bool) (<-chan containerd.ExitStatus, error) { exitChan := make(chan containerd.ExitStatus, 1) time.Sleep(50 * time.Millisecond) exitChan <- containerd.ExitStatus{} @@ -112,6 +142,104 @@ func TestLoggingProcessAdapter(t *testing.T) { } } +// TestLoggingProcessAdapterTrailingChunk verifies that the logger forwards all +// of the container's output, including a final chunk that has no trailing +// newline, rather than holding that chunk back until something closes the +// stream. The container's stdio FIFOs are modelled with os.Pipe; closing the +// write end models the container exiting and containerd closing the FIFO. +// Regression test for https://github.com/containerd/nerdctl/issues/5006 +func TestLoggingProcessAdapterTrailingChunk(t *testing.T) { + const expected = "'Hello World!\nThere is no newline'" + + stdoutR, stdoutW, err := os.Pipe() + if err != nil { + t.Fatal(err) + } + defer stdoutR.Close() + stderrR, stderrW, err := os.Pipe() + if err != nil { + t.Fatal(err) + } + defer stderrR.Close() + + driver := &MockDriver{} + config := &logging.Config{ + Stdout: stdoutR, + Stderr: stderrR, + } + + // Write the container's output, including a trailing chunk without a newline, + // then close the write ends to model the container exiting. + if _, err := stdoutW.WriteString(expected); err != nil { + t.Fatal(err) + } + stdoutW.Close() + stderrW.Close() + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + // getContainerWait never reports an exit here: completion is driven by the + // FIFOs reaching EOF, as it usually is in practice. + var getContainerWaitMock ContainerWaitFunc = func(ctx context.Context, address string, config *logging.Config, outputSeen func() bool) (<-chan containerd.ExitStatus, error) { + return make(chan containerd.ExitStatus), nil + } + + if err := loggingProcessAdapter(ctx, driver, "testDataStore", "", getContainerWaitMock, config); err != nil { + t.Fatal(err) + } + + if actual := strings.Join(driver.receivedStdout, ""); actual != expected { + t.Fatalf("stdout is %q, expected %q", actual, expected) + } +} + +// TestLoggingProcessAdapterSyncTrailingChunk verifies the same trailing-chunk +// behaviour for a driver that writes synchronously (SyncDriver), which is the +// path that protects the final chunk from the container's abrupt teardown. +// Regression test for https://github.com/containerd/nerdctl/issues/5006 +func TestLoggingProcessAdapterSyncTrailingChunk(t *testing.T) { + const expected = "'Hello World!\nThere is no newline'" + + stdoutR, stdoutW, err := os.Pipe() + if err != nil { + t.Fatal(err) + } + defer stdoutR.Close() + stderrR, stderrW, err := os.Pipe() + if err != nil { + t.Fatal(err) + } + defer stderrR.Close() + + driver := &SyncMockDriver{} + config := &logging.Config{ + Stdout: stdoutR, + Stderr: stderrR, + } + + if _, err := stdoutW.WriteString(expected); err != nil { + t.Fatal(err) + } + stdoutW.Close() + stderrW.Close() + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + var getContainerWaitMock ContainerWaitFunc = func(ctx context.Context, address string, config *logging.Config, outputSeen func() bool) (<-chan containerd.ExitStatus, error) { + return make(chan containerd.ExitStatus), nil + } + + if err := loggingProcessAdapter(ctx, driver, "testDataStore", "", getContainerWaitMock, config); err != nil { + t.Fatal(err) + } + + if actual := strings.Join(driver.receivedStdout, ""); actual != expected { + t.Fatalf("stdout is %q, expected %q", actual, expected) + } +} + // generateRandomString creates a random string of the given size. func generateRandomString(size int) string { characters := "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789"