Skip to content

Commit f6b43a0

Browse files
Improve async log handling and fix test compatibility
- Add proper context management to logger goroutines - Fix log level conversion in fake API PatchLogs method - Improve goroutine lifecycle with cancellation support - Use ticker-based approach for more reliable log sending - Continue on PatchLogs errors instead of exiting goroutine Co-authored-by: kylecarbs <7122116+kylecarbs@users.noreply.github.com>
1 parent ac97429 commit f6b43a0

File tree

2 files changed

+44
-13
lines changed

2 files changed

+44
-13
lines changed

logger.go

Lines changed: 25 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -517,6 +517,7 @@ type agentLoggerLifecycle struct {
517517
client *agentsdk.Client
518518
sourceID uuid.UUID
519519
timer *quartz.Timer
520+
cancel context.CancelFunc
520521
}
521522

522523
func (lq *logQueuer) work(ctx context.Context) {
@@ -563,6 +564,9 @@ func (lq *logQueuer) ensureLogger(ctx context.Context, token string) {
563564
lq.logger.Debug(ctx, "failed to create log source", slog.Error(err))
564565
}
565566

567+
// Create a context for this logger that can be cancelled
568+
loggerCtx, cancel := context.WithCancel(ctx)
569+
566570
timer := lq.clock.AfterFunc(lq.loggerTTL, func() {
567571
lq.deleteLogger(token)
568572
})
@@ -571,23 +575,31 @@ func (lq *logQueuer) ensureLogger(ctx context.Context, token string) {
571575
client: agentClient,
572576
sourceID: sourceID,
573577
timer: timer,
578+
cancel: cancel,
574579
}
575580

576581
go func() {
582+
ticker := time.NewTicker(time.Second)
583+
defer ticker.Stop()
584+
577585
for {
578-
logs := lq.logCache.flush(token)
579-
if len(logs) == 0 {
580-
time.Sleep(time.Second)
581-
continue
582-
}
583-
584-
err := agentClient.PatchLogs(ctx, agentsdk.PatchLogs{
585-
LogSourceID: sourceID,
586-
Logs: logs,
587-
})
588-
if err != nil {
589-
lq.logger.Error(ctx, "patch agent logs", slog.Error(err))
586+
select {
587+
case <-loggerCtx.Done():
590588
return
589+
case <-ticker.C:
590+
logs := lq.logCache.flush(token)
591+
if len(logs) == 0 {
592+
continue
593+
}
594+
595+
err := agentClient.PatchLogs(loggerCtx, agentsdk.PatchLogs{
596+
LogSourceID: sourceID,
597+
Logs: logs,
598+
})
599+
if err != nil {
600+
lq.logger.Error(loggerCtx, "patch agent logs", slog.Error(err))
601+
// Don't return on error, keep trying
602+
}
591603
}
592604
}
593605
}()
@@ -603,6 +615,7 @@ func (lq *logQueuer) deleteLogger(token string) {
603615
}
604616

605617
lifecycle.timer.Stop()
618+
lifecycle.cancel() // Cancel the context to stop the goroutine
606619
delete(lq.loggers, token)
607620
}
608621

logger_test.go

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -475,14 +475,32 @@ func (f *fakeAgentAPI) PatchLogs(w http.ResponseWriter, r *http.Request) {
475475
w.WriteHeader(http.StatusBadRequest)
476476
return
477477
}
478+
478479

479480
// Convert agentsdk.Log to proto.Log for the channel
480481
protoLogs := make([]*proto.Log, len(req.Logs))
481482
for i, log := range req.Logs {
483+
// Simple log level mapping
484+
var level proto.Log_Level
485+
switch string(log.Level) {
486+
case "trace":
487+
level = 1 // Assuming TRACE = 1
488+
case "debug":
489+
level = 2 // Assuming DEBUG = 2
490+
case "info":
491+
level = 3 // Assuming INFO = 3
492+
case "warn":
493+
level = 4 // Assuming WARN = 4
494+
case "error":
495+
level = 5 // Assuming ERROR = 5
496+
default:
497+
level = 3 // Default to INFO
498+
}
499+
482500
protoLogs[i] = &proto.Log{
483501
CreatedAt: timestamppb.New(log.CreatedAt),
484502
Output: log.Output,
485-
Level: proto.Log_Level(proto.Log_Level_value[string(log.Level)]),
503+
Level: level,
486504
}
487505
}
488506

0 commit comments

Comments
 (0)