added cooldown to logger buffered writer

This commit is contained in:
Sergey Chubaryan 2024-08-25 20:06:12 +03:00
parent 64759c2227
commit 49346b05fa
2 changed files with 53 additions and 37 deletions

View File

@ -2,32 +2,68 @@ package logger
import ( import (
"bufio" "bufio"
"context"
"io"
"sync" "sync"
"time"
) )
const FlushInterval = 500 * time.Millisecond
type bufioWrapper struct { type bufioWrapper struct {
*bufio.Writer writer *bufio.Writer
m *sync.RWMutex ticker *time.Ticker
mutex *sync.RWMutex
}
func newWrapper(writer io.Writer) *bufioWrapper {
ticker := time.NewTicker(FlushInterval)
ticker.Stop()
return &bufioWrapper{
writer: bufio.NewWriterSize(writer, 128*1024),
mutex: &sync.RWMutex{},
ticker: ticker,
}
}
func (b *bufioWrapper) FlushRoutine(ctx context.Context) {
go func() {
b.ticker.Reset(FlushInterval)
defer b.ticker.Stop()
for {
b.flush()
select {
case <-ctx.Done():
b.flush()
return
case <-b.ticker.C:
}
}
}()
} }
func (b *bufioWrapper) Write(p []byte) (nn int, err error) { func (b *bufioWrapper) Write(p []byte) (nn int, err error) {
// TODO: try replace mutex, improve logging perfomance // TODO: try replace mutex, improve logging perfomance
b.m.RLock() b.mutex.RLock()
defer b.m.RUnlock() defer b.mutex.RUnlock()
return b.Writer.Write(p) if len(p) > b.writer.Available() {
b.ticker.Reset(FlushInterval)
} }
func (b *bufioWrapper) Flush() error { return b.writer.Write(p)
b.m.Lock() }
defer b.m.Unlock()
return b.Writer.Flush() func (b *bufioWrapper) flush() error {
b.mutex.Lock()
defer b.mutex.Unlock()
return b.writer.Flush()
} }
func (b *bufioWrapper) Close() error { func (b *bufioWrapper) Close() error {
b.m.Lock() return b.flush()
defer b.m.Unlock()
return b.Writer.Flush()
} }

View File

@ -1,12 +1,9 @@
package logger package logger
import ( import (
"bufio"
"context" "context"
"io" "io"
"os" "os"
"sync"
"time"
"github.com/rs/zerolog" "github.com/rs/zerolog"
) )
@ -25,7 +22,7 @@ type NewLoggerOpts struct {
// OutputStream OutputStream // OutputStream OutputStream
} }
func New(opts NewLoggerOpts) (Logger, error) { func New(ctx context.Context, opts NewLoggerOpts) (Logger, error) {
// TODO: pass output streams from opts // TODO: pass output streams from opts
writers := []io.Writer{} writers := []io.Writer{}
writers = append(writers, os.Stderr) writers = append(writers, os.Stderr)
@ -45,26 +42,9 @@ func New(opts NewLoggerOpts) (Logger, error) {
} }
// TODO: move to wrapper, determine optimal buffer size // TODO: move to wrapper, determine optimal buffer size
writer := bufio.NewWriterSize(io.MultiWriter(writers...), 8*1024) writer := io.MultiWriter(writers...)
wrapper := &bufioWrapper{writer, &sync.RWMutex{}} wrapper := newWrapper(writer)
wrapper.FlushRoutine(ctx)
// Periodically flush buffer
go func() {
// TODO: add cooldown if flush was triggered by overfow
tmr := time.NewTicker(500 * time.Millisecond)
defer tmr.Stop()
for {
wrapper.Flush()
select {
case <-context.Background().Done():
wrapper.Flush()
return
case <-tmr.C:
}
}
}()
l := zerolog.New(wrapper).Level(level).With().Timestamp().Logger() l := zerolog.New(wrapper).Level(level).With().Timestamp().Logger()
return &logger{ return &logger{