diff --git a/pkg/logging/json_logger.go b/pkg/logging/json_logger.go index 7e2dca196fd..02862965046 100644 --- a/pkg/logging/json_logger.go +++ b/pkg/logging/json_logger.go @@ -48,8 +48,9 @@ var JSONDriverLogOpts = []string{ } type JSONLogger struct { - Opts map[string]string - logger *logrotate.Logger + Opts map[string]string + logger *logrotate.Logger + encoder *jsonfile.SyncEncoder } func JSONFileLogOptsValidate(logOptMap map[string]string) error { @@ -119,6 +120,7 @@ func (jsonLogger *JSONLogger) PreProcess(ctx context.Context, dataStore string, // MaxBackups does not include file to write logs to l.MaxBackups = maxFile - 1 jsonLogger.logger = l + jsonLogger.encoder = jsonfile.NewSyncEncoder(l) return nil } @@ -126,6 +128,14 @@ func (jsonLogger *JSONLogger) Process(stdout <-chan string, stderr <-chan string return jsonfile.Encode(stdout, stderr, jsonLogger.logger) } +// WriteLogEntry writes a single log line synchronously, implementing SyncDriver. +// Writing inline (rather than over a channel consumed by Process) ensures a +// container's final output is durable before containerd tears the logging +// process down on exit. https://github.com/containerd/nerdctl/issues/5006 +func (jsonLogger *JSONLogger) WriteLogEntry(stream, line string) error { + return jsonLogger.encoder.Encode(stream, line) +} + func (jsonLogger *JSONLogger) PostProcess() error { return nil } @@ -182,7 +192,18 @@ func viewLogsJSONFileDirect(lvopts LogViewOptions, jsonLogFilePath string, stdou for { select { case <-stopChannel: - log.L.Debug("received stop signal while re-reading JSON logfile, returning") + log.L.Debug("received stop signal while re-reading JSON logfile, draining remaining logs and returning") + // The stop signal is only sent after WaitForLogger has confirmed that the + // logger finished writing (see pkg/cmd/container.Logs). However, the + // watcher-driven read loop may not have consumed the final entries yet: + // they may have been flushed to the file while we were blocked in + // startTail, and the stop signal wins the next select iteration before + // they are read. Do a final read so that we don't drop log content that + // was written right before the container exited. + // https://github.com/containerd/nerdctl/issues/5006 + if _, err := jsonfile.Decode(stdout, stderr, fin, lvopts.Timestamps, lvopts.Since, lvopts.Until); err != nil { + log.L.WithError(err).Debugf("error draining remaining logs from JSON logfile %q", jsonLogFilePath) + } return nil default: if stop || (limitedMode && limitedNum == 0) { diff --git a/pkg/logging/json_logger_test.go b/pkg/logging/json_logger_test.go index 7b41c10a68c..7ecf183b851 100644 --- a/pkg/logging/json_logger_test.go +++ b/pkg/logging/json_logger_test.go @@ -109,6 +109,46 @@ func TestReadRotatedJSONLog(t *testing.T) { } } +// TestReadJSONLogsDrainsOnStop verifies that when the stop signal is received +// while following a log file, any log entries that were flushed but not yet +// read are still drained and written out before returning. +// Regression test for https://github.com/containerd/nerdctl/issues/5006 +func TestReadJSONLogsDrainsOnStop(t *testing.T) { + file, err := os.CreateTemp("", "TestDrainOnStop") + if err != nil { + t.Fatalf("unable to create temp file") + } + defer os.Remove(file.Name()) + // A final entry without a trailing newline in the log text, mirroring the + // scenario in the linked issue. + file.WriteString(`{"log":"Hello World!\n","stream":"stdout","time":"2024-07-12T03:09:24.916296732Z"}` + "\n") + file.WriteString(`{"log":"There is no newline","stream":"stdout","time":"2024-07-12T03:09:24.916296732Z"}` + "\n") + + // Pre-load the stop signal so the first select iteration takes the stop + // branch while the file still has unread content. This deterministically + // reproduces the race where the stop signal wins before the final entries + // are read. + stopChan := make(chan os.Signal, 1) + stopChan <- os.Interrupt + + stdoutBuf := bytes.NewBuffer(nil) + stderrBuf := bytes.NewBuffer(nil) + lvOpts := LogViewOptions{ + LogPath: file.Name(), + Follow: true, + } + if err := viewLogsJSONFileDirect(lvOpts, file.Name(), stdoutBuf, stderrBuf, stopChan); err != nil { + t.Fatal(err.Error()) + } + if stderrBuf.Len() > 0 { + t.Fatalf("Stderr: %v", stderrBuf.String()) + } + const expected = "Hello World!\nThere is no newline" + if actual := stdoutBuf.String(); expected != actual { + t.Fatalf("Actual output does not match expected.\nActual: %q\nExpected: %q\n", actual, expected) + } +} + func TestReadJSONLogs(t *testing.T) { file, err := os.CreateTemp("", "TestFollowLogs") if err != nil { diff --git a/pkg/logging/jsonfile/jsonfile.go b/pkg/logging/jsonfile/jsonfile.go index 22afc6f3442..e2c2829b70c 100644 --- a/pkg/logging/jsonfile/jsonfile.go +++ b/pkg/logging/jsonfile/jsonfile.go @@ -43,6 +43,30 @@ func Path(dataStore, ns, id string) string { return filepath.Join(dataStore, "containers", ns, id, id+"-json.log") } +// SyncEncoder writes individual json-file log entries to a writer. Its Encode +// method is safe for concurrent use, so it can be shared between the goroutines +// reading a container's stdout and stderr. +type SyncEncoder struct { + mu sync.Mutex + enc *json.Encoder +} + +// NewSyncEncoder returns a SyncEncoder that writes to w. +func NewSyncEncoder(w io.Writer) *SyncEncoder { + return &SyncEncoder{enc: json.NewEncoder(w)} +} + +// Encode writes a single log entry for the given stream. +func (s *SyncEncoder) Encode(stream, line string) error { + s.mu.Lock() + defer s.mu.Unlock() + return s.enc.Encode(&Entry{ + Stream: stream, + Log: line, + Time: time.Now().UTC(), + }) +} + func Encode(stdout <-chan string, stderr <-chan string, writer io.Writer) error { enc := json.NewEncoder(writer) var encMu sync.Mutex diff --git a/pkg/logging/logging.go b/pkg/logging/logging.go index 91a3231ee3a..a93594bd8c0 100644 --- a/pkg/logging/logging.go +++ b/pkg/logging/logging.go @@ -17,7 +17,7 @@ package logging import ( - "bufio" + "bytes" "context" "encoding/json" "errors" @@ -28,6 +28,7 @@ import ( "sort" "strings" "sync" + "sync/atomic" "time" "github.com/fsnotify/fsnotify" @@ -52,6 +53,11 @@ const ( Labels = "labels" ) +const ( + streamStdout = "stdout" + streamStderr = "stderr" +) + type Driver interface { Init(dataStore, ns, id string) error PreProcess(ctx context.Context, dataStore string, config *logging.Config) error @@ -59,6 +65,22 @@ type Driver interface { PostProcess() error } +// SyncDriver is an optional capability for a Driver whose log entries can be +// written synchronously and cheaply (e.g. to a local file). When a driver +// implements it, the logger writes each entry by calling WriteLogEntry directly +// from the goroutine that reads the container's stdio, rather than handing it to +// Process over a buffered channel. This makes a container's final output (in +// particular a trailing chunk with no newline) durable before containerd tears +// the logging process down on exit. Drivers that may block, such as +// network-backed ones, should not implement it so that the buffered channel +// keeps them from blocking the container. https://github.com/containerd/nerdctl/issues/5006 +type SyncDriver interface { + Driver + // WriteLogEntry writes a single log line for the given stream ("stdout" or + // "stderr"). It may be called concurrently for different streams. + WriteLogEntry(stream, line string) error +} + type DriverFactory func(map[string]string, string) (Driver, error) type LogOptsValidateFunc func(logOptMap map[string]string) error @@ -165,7 +187,15 @@ func WaitForLogger(dataStore, ns, id string) error { }) } -func getContainerWait(ctx context.Context, address string, config *logging.Config) (<-chan containerd.ExitStatus, error) { +// alreadyExited returns a channel that immediately reports an exit, used when +// we have determined that the container's task is already gone. +func alreadyExited() <-chan containerd.ExitStatus { + ch := make(chan containerd.ExitStatus, 1) + ch <- containerd.ExitStatus{} + return ch +} + +func getContainerWait(ctx context.Context, address string, config *logging.Config, outputSeen func() bool) (<-chan containerd.ExitStatus, error) { client, err := containerd.New(strings.TrimPrefix(address, "unix://"), containerd.WithDefaultNamespace(config.Namespace)) if err != nil { return nil, err @@ -183,8 +213,19 @@ func getContainerWait(ctx context.Context, address string, config *logging.Confi return nil, err } - // If task was not found, it's possible that the container runtime is still being created. - // Retry every 100ms. + // The task was not found. containerd starts this logging process while + // setting up the container's IO, i.e. before the task is created, so a + // NotFound here usually just means the task has not been created yet: retry + // until it appears. + // + // However, for a short-lived container the task may instead have already + // exited and been removed before we ever observed it (this is more likely + // when this logger process is slow to start, e.g. under gomodjail). In that + // case the task will never appear and waiting for it would hang the logger + // forever, holding the logger lock and truncating the container's final + // output. Once we have seen the container produce output we therefore know + // it has run, so a still-missing task means it has already exited. + // https://github.com/containerd/nerdctl/issues/5006 ticker := time.NewTicker(100 * time.Millisecond) defer ticker.Stop() @@ -194,18 +235,20 @@ func getContainerWait(ctx context.Context, address string, config *logging.Confi return nil, errors.New("timed out waiting for container task to start") case <-ticker.C: task, err = con.Task(ctx, nil) - if err != nil { - if errdefs.IsNotFound(err) { - continue - } + if err == nil { + return task.Wait(ctx) + } + if !errdefs.IsNotFound(err) { return nil, err } - return task.Wait(ctx) + if outputSeen() { + return alreadyExited(), nil + } } } } -type ContainerWaitFunc func(ctx context.Context, address string, config *logging.Config) (<-chan containerd.ExitStatus, error) +type ContainerWaitFunc func(ctx context.Context, address string, config *logging.Config, outputSeen func() bool) (<-chan containerd.ExitStatus, error) func loggingProcessAdapter(ctx context.Context, driver Driver, dataStore, address string, getContainerWait ContainerWaitFunc, config *logging.Config) error { if err := driver.PreProcess(ctx, dataStore, config); err != nil { @@ -226,60 +269,122 @@ func loggingProcessAdapter(ctx context.Context, driver Driver, dataStore, addres stderrR.Cancel() }() - // initialize goroutines to copy stdout and stderr streams to a closable pipe - pipeStdoutR, pipeStdoutW := io.Pipe() - pipeStderrR, pipeStderrW := io.Pipe() - copyStream := func(reader io.Reader, writer *io.PipeWriter) { - // copy using a buffer of size 32K - buf := make([]byte, 32<<10) - _, err := io.CopyBuffer(writer, reader, buf) - if err != nil { - log.G(ctx).Errorf("failed to copy stream: %s", err) - } - } - go copyStream(stdoutR, pipeStdoutW) - go copyStream(stderrR, pipeStderrW) + // copiedBytes counts how much container output has been read so far. It lets + // getContainerWait tell "the task has not been created yet" apart from "the + // task has already exited and been removed" when it sees a missing task. + var copiedBytes atomic.Int64 + outputSeen := func() bool { return copiedBytes.Load() > 0 } - var wg sync.WaitGroup - wg.Add(3) stdout := make(chan string, 10000) stderr := make(chan string, 10000) - processLogFunc := func(reader io.Reader, dataChan chan string) { - defer wg.Done() - defer close(dataChan) - r := bufio.NewReader(reader) - - var err error - for err == nil { - var s string - s, err = r.ReadString('\n') - if len(s) > 0 { - dataChan <- s + // If the driver can write synchronously, emit writes each log entry directly + // from the goroutine that reads the container's stdio. Otherwise it hands the + // entry to the driver's Process method over a buffered channel, which keeps a + // slow (e.g. network-backed) driver from blocking the container. + // + // The synchronous path matters because, when a container exits, containerd + // closes the stdio FIFOs and then tears the logging process down almost + // immediately. Handing the final chunk to another goroutine to write races + // that teardown and can lose a trailing chunk that has no newline; writing it + // inline does not. https://github.com/containerd/nerdctl/issues/5006 + syncDriver, isSync := driver.(SyncDriver) + emit := func(stream, line string) { + if isSync { + if err := syncDriver.WriteLogEntry(stream, line); err != nil { + log.G(ctx).WithError(err).Error("failed to write log entry") } + return + } + if stream == streamStdout { + stdout <- line + } else { + stderr <- line + } + } + + var wg sync.WaitGroup - if err != nil && err != io.EOF { - log.L.WithError(err).Error("failed to read log") + // processStream reads a container stdio FIFO directly and emits its output + // split into newline-terminated lines. Complete lines are emitted as they are + // read; a trailing fragment without a newline is buffered until more output + // arrives (so a long line is not split) and emitted when the stream ends. + processStream := func(stream string, reader io.Reader, dataChan chan string) { + defer wg.Done() + if !isSync { + defer close(dataChan) + } + buf := make([]byte, 32<<10) + var pending []byte + // emitLines emits each complete (newline-terminated) line, leaving any + // trailing fragment buffered in pending so that a single logical line is + // not split across log entries. + emitLines := func() { + for { + i := bytes.IndexByte(pending, '\n') + if i < 0 { + break + } + emit(stream, string(pending[:i+1])) + pending = pending[i+1:] + } + } + for { + nr, err := reader.Read(buf) + if nr > 0 { + copiedBytes.Add(int64(nr)) + pending = append(pending, buf[:nr]...) + emitLines() + // For a synchronous driver, emit a trailing fragment immediately + // instead of buffering it until the stream ends. The fragment is + // then written to the log before the container's abrupt teardown + // on exit can lose it; for a streaming (channel) driver this would + // only split lines, so it is left buffered there. + if isSync && len(pending) > 0 { + emit(stream, string(pending)) + pending = pending[:0] + } + } + if err != nil { + emitLines() + // The stream has ended: emit any final fragment that did not end + // in a newline. + if len(pending) > 0 { + emit(stream, string(pending)) + } + if !errors.Is(err, io.EOF) && !errors.Is(err, cancelreader.ErrCanceled) { + log.L.WithError(err).Error("failed to read log") + } + return } } } - go processLogFunc(pipeStdoutR, stdout) - go processLogFunc(pipeStderrR, stderr) - go func() { - defer wg.Done() - driver.Process(stdout, stderr) - }() + wg.Add(2) + go processStream(streamStdout, stdoutR, stdout) + go processStream(streamStderr, stderrR, stderr) + if !isSync { + wg.Add(1) + go func() { + defer wg.Done() + driver.Process(stdout, stderr) + }() + } go func() { - // close pipeStdoutW and pipeStderrW upon container exit - defer pipeStdoutW.Close() - defer pipeStderrW.Close() - - exitCh, err := getContainerWait(ctx, address, config) + // Wait for the container to exit, then cancel the readers. containerd + // keeps the stdio FIFO write ends open (so the container can be + // restarted), so the FIFOs may not reach EOF on exit; without this the + // read goroutines, and therefore the logger, could block forever. + exitCh, err := getContainerWait(ctx, address, config, outputSeen) if err != nil { + // We could not determine when the container exits. Do not cancel the + // readers: they will finish on their own when the FIFO reaches EOF. + // Cancelling here could truncate a still-running container. log.G(ctx).Errorf("failed to get container task wait channel: %v", err) return } <-exitCh + stdoutR.Cancel() + stderrR.Cancel() }() wg.Wait() return driver.PostProcess() diff --git a/pkg/logging/logging_test.go b/pkg/logging/logging_test.go index da0d535b074..ecab183bebc 100644 --- a/pkg/logging/logging_test.go +++ b/pkg/logging/logging_test.go @@ -21,7 +21,9 @@ import ( "bytes" "context" "math/rand" + "os" "strings" + "sync" "testing" "time" @@ -58,6 +60,34 @@ func (m *MockDriver) PostProcess() error { return nil } +// SyncMockDriver implements SyncDriver, recording the entries written to it. +type SyncMockDriver struct { + mu sync.Mutex + receivedStdout []string + receivedStderr []string +} + +func (m *SyncMockDriver) Init(dataStore, ns, id string) error { return nil } +func (m *SyncMockDriver) PreProcess(ctx context.Context, dataStore string, config *logging.Config) error { + return nil +} +func (m *SyncMockDriver) Process(stdout <-chan string, stderr <-chan string) error { + // Not used on the synchronous path (the logger calls WriteLogEntry instead), + // but must satisfy the Driver interface. + return nil +} +func (m *SyncMockDriver) PostProcess() error { return nil } +func (m *SyncMockDriver) WriteLogEntry(stream, line string) error { + m.mu.Lock() + defer m.mu.Unlock() + if stream == streamStdout { + m.receivedStdout = append(m.receivedStdout, line) + } else { + m.receivedStderr = append(m.receivedStderr, line) + } + return nil +} + func TestLoggingProcessAdapter(t *testing.T) { // Will process a normal String to stdout and a bigger one to stderr normalString := generateRandomString(1024) @@ -79,7 +109,7 @@ func TestLoggingProcessAdapter(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - var getContainerWaitMock ContainerWaitFunc = func(ctx context.Context, address string, config *logging.Config) (<-chan containerd.ExitStatus, error) { + var getContainerWaitMock ContainerWaitFunc = func(ctx context.Context, address string, config *logging.Config, outputSeen func() bool) (<-chan containerd.ExitStatus, error) { exitChan := make(chan containerd.ExitStatus, 1) time.Sleep(50 * time.Millisecond) exitChan <- containerd.ExitStatus{} @@ -112,6 +142,104 @@ func TestLoggingProcessAdapter(t *testing.T) { } } +// TestLoggingProcessAdapterTrailingChunk verifies that the logger forwards all +// of the container's output, including a final chunk that has no trailing +// newline, rather than holding that chunk back until something closes the +// stream. The container's stdio FIFOs are modelled with os.Pipe; closing the +// write end models the container exiting and containerd closing the FIFO. +// Regression test for https://github.com/containerd/nerdctl/issues/5006 +func TestLoggingProcessAdapterTrailingChunk(t *testing.T) { + const expected = "'Hello World!\nThere is no newline'" + + stdoutR, stdoutW, err := os.Pipe() + if err != nil { + t.Fatal(err) + } + defer stdoutR.Close() + stderrR, stderrW, err := os.Pipe() + if err != nil { + t.Fatal(err) + } + defer stderrR.Close() + + driver := &MockDriver{} + config := &logging.Config{ + Stdout: stdoutR, + Stderr: stderrR, + } + + // Write the container's output, including a trailing chunk without a newline, + // then close the write ends to model the container exiting. + if _, err := stdoutW.WriteString(expected); err != nil { + t.Fatal(err) + } + stdoutW.Close() + stderrW.Close() + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + // getContainerWait never reports an exit here: completion is driven by the + // FIFOs reaching EOF, as it usually is in practice. + var getContainerWaitMock ContainerWaitFunc = func(ctx context.Context, address string, config *logging.Config, outputSeen func() bool) (<-chan containerd.ExitStatus, error) { + return make(chan containerd.ExitStatus), nil + } + + if err := loggingProcessAdapter(ctx, driver, "testDataStore", "", getContainerWaitMock, config); err != nil { + t.Fatal(err) + } + + if actual := strings.Join(driver.receivedStdout, ""); actual != expected { + t.Fatalf("stdout is %q, expected %q", actual, expected) + } +} + +// TestLoggingProcessAdapterSyncTrailingChunk verifies the same trailing-chunk +// behaviour for a driver that writes synchronously (SyncDriver), which is the +// path that protects the final chunk from the container's abrupt teardown. +// Regression test for https://github.com/containerd/nerdctl/issues/5006 +func TestLoggingProcessAdapterSyncTrailingChunk(t *testing.T) { + const expected = "'Hello World!\nThere is no newline'" + + stdoutR, stdoutW, err := os.Pipe() + if err != nil { + t.Fatal(err) + } + defer stdoutR.Close() + stderrR, stderrW, err := os.Pipe() + if err != nil { + t.Fatal(err) + } + defer stderrR.Close() + + driver := &SyncMockDriver{} + config := &logging.Config{ + Stdout: stdoutR, + Stderr: stderrR, + } + + if _, err := stdoutW.WriteString(expected); err != nil { + t.Fatal(err) + } + stdoutW.Close() + stderrW.Close() + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + var getContainerWaitMock ContainerWaitFunc = func(ctx context.Context, address string, config *logging.Config, outputSeen func() bool) (<-chan containerd.ExitStatus, error) { + return make(chan containerd.ExitStatus), nil + } + + if err := loggingProcessAdapter(ctx, driver, "testDataStore", "", getContainerWaitMock, config); err != nil { + t.Fatal(err) + } + + if actual := strings.Join(driver.receivedStdout, ""); actual != expected { + t.Fatalf("stdout is %q, expected %q", actual, expected) + } +} + // generateRandomString creates a random string of the given size. func generateRandomString(size int) string { characters := "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789"