diff --git a/scheduler/metrics/duration.go b/scheduler/metrics/duration.go
new file mode 100644
index 0000000000..71ecd4eec1
--- /dev/null
+++ b/scheduler/metrics/duration.go
@@ -0,0 +1,44 @@
+package metrics
+
+import (
+	"sync"
+	"time"
+)
+
+type durationMeasurement struct {
+	startTime time.Time
+	started   bool
+	duration  time.Duration
+	mu        sync.Mutex
+}
+
+func (dm *durationMeasurement) Start(start time.Time) {
+	// If we have already started, don't start again. This can happen for relational tables that are resolved multiple times (per parent resource).
+	dm.mu.Lock()
+	defer dm.mu.Unlock()
+	if dm.started {
+		return
+	}
+
+	dm.started = true
+	dm.startTime = start
+}
+
+// End calculates, updates and returns the delta duration for updating OTEL counters.
+func (dm *durationMeasurement) End(end time.Time) time.Duration {
+	dm.mu.Lock()
+	defer dm.mu.Unlock()
+
+	// startTime is read under the lock so End cannot race with a concurrent Start.
+	newDuration := end.Sub(dm.startTime)
+	delta := newDuration - dm.duration
+	dm.duration = newDuration
+	return delta
+}
+
+// Value returns the currently accumulated duration.
+func (dm *durationMeasurement) Value() time.Duration {
+	dm.mu.Lock()
+	defer dm.mu.Unlock()
+	return dm.duration
+}
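A note on the pattern above: `sync.table.duration` is exported through an `Int64Counter`, which only supports monotonic adds, and a relational table is resolved once per parent resource, so `End` can run many times. `End` therefore returns only the increment since the previous call. A minimal in-package sketch of the resulting semantics (the timestamps are hypothetical, and `Value` is the locked getter added above):

```go
dm := &durationMeasurement{}
t0 := time.Now()

dm.Start(t0)
dm.Start(t0.Add(time.Second)) // ignored: the measurement is already started

fmt.Println(dm.End(t0.Add(3 * time.Second))) // 3s (first delta)
fmt.Println(dm.End(t0.Add(5 * time.Second))) // 2s (only the increment)
fmt.Println(dm.Value())                      // 5s (accumulated total)
```

Summing the emitted deltas always reproduces the accumulated total, which is exactly the invariant a monotonic counter needs.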
diff --git a/scheduler/metrics/metrics.go b/scheduler/metrics/metrics.go
index cdfa8ab1fc..82e67740ef 100644
--- a/scheduler/metrics/metrics.go
+++ b/scheduler/metrics/metrics.go
@@ -13,259 +13,196 @@ import (
 )
 
 const (
-	OtelName = "io.cloudquery"
-)
-
-// Metrics is deprecated as we move toward open telemetry for tracing and metrics
-type Metrics struct {
-	TableClient map[string]map[string]*TableClientMetrics
-}
+	ResourceName = "io.cloudquery"
 
-type OtelMeters struct {
-	resources           metric.Int64Counter
-	errors              metric.Int64Counter
-	panics              metric.Int64Counter
-	startTime           metric.Int64Counter
-	started             bool
-	startedLock         sync.Mutex
-	endTime             metric.Int64Counter
-	previousEndTime     int64
-	previousEndTimeLock sync.Mutex
-	attributes          []attribute.KeyValue
-}
+	resourcesMetricName = "sync.table.resources"
+	errorsMetricName    = "sync.table.errors"
+	panicsMetricName    = "sync.table.panics"
+	durationMetricName  = "sync.table.duration"
+)
 
-type TableClientMetrics struct {
-	Resources uint64
-	Errors    uint64
-	Panics    uint64
-	Duration  atomic.Pointer[time.Duration]
+var (
+	resources metric.Int64Counter
+	errors    metric.Int64Counter
+	panics    metric.Int64Counter
+	duration  metric.Int64Counter
+	once      sync.Once
+)
 
-	otelMeters *OtelMeters
-}
+func NewMetrics() *Metrics {
+	once.Do(func() {
+		// Creation errors are discarded: the instrument names are static and valid, so creation is not expected to fail.
+		resources, _ = otel.Meter(ResourceName).Int64Counter(resourcesMetricName,
+			metric.WithDescription("Number of resources synced for a table"),
+			metric.WithUnit("/{tot}"),
+		)
+
+		errors, _ = otel.Meter(ResourceName).Int64Counter(errorsMetricName,
+			metric.WithDescription("Number of errors encountered while syncing a table"),
+			metric.WithUnit("/{tot}"),
+		)
+
+		panics, _ = otel.Meter(ResourceName).Int64Counter(panicsMetricName,
+			metric.WithDescription("Number of panics encountered while syncing a table"),
+			metric.WithUnit("/{tot}"),
+		)
+
+		duration, _ = otel.Meter(ResourceName).Int64Counter(durationMetricName,
+			metric.WithDescription("Duration of syncing a table"),
+			metric.WithUnit("ms"),
+		)
+	})
+
+	return &Metrics{
+		resources: resources,
+		errors:    errors,
+		panics:    panics,
+		duration:  duration,
 
-func durationPointerEqual(a, b *time.Duration) bool {
-	if a == nil {
-		return b == nil
+		measurements: make(map[string]tableMeasurements),
 	}
-	return b != nil && *a == *b
 }
-
-func (m *TableClientMetrics) Equal(other *TableClientMetrics) bool {
-	return m.Resources == other.Resources && m.Errors == other.Errors && m.Panics == other.Panics && durationPointerEqual(m.Duration.Load(), other.Duration.Load())
-}
 
+type Metrics struct {
+	resources metric.Int64Counter
+	errors    metric.Int64Counter
+	panics    metric.Int64Counter
+	duration  metric.Int64Counter
 
-// Equal compares to stats. Mostly useful in testing
-func (s *Metrics) Equal(other *Metrics) bool {
-	for table, clientStats := range s.TableClient {
-		for client, stats := range clientStats {
-			if _, ok := other.TableClient[table]; !ok {
-				return false
-			}
-			if _, ok := other.TableClient[table][client]; !ok {
-				return false
-			}
-			if !stats.Equal(other.TableClient[table][client]) {
-				return false
-			}
-		}
-	}
-	for table, clientStats := range other.TableClient {
-		for client, stats := range clientStats {
-			if _, ok := s.TableClient[table]; !ok {
-				return false
-			}
-			if _, ok := s.TableClient[table][client]; !ok {
-				return false
-			}
-			if !stats.Equal(s.TableClient[table][client]) {
-				return false
-			}
-		}
-	}
-	return true
+	measurements map[string]tableMeasurements
 }
 
-func getOtelMeters(tableName string, clientID string) *OtelMeters {
-	resources, err := otel.Meter(OtelName).Int64Counter("sync.table.resources",
-		metric.WithDescription("Number of resources synced for a table"),
-		metric.WithUnit("/{tot}"),
-	)
-	if err != nil {
-		return nil
-	}
-
-	errors, err := otel.Meter(OtelName).Int64Counter("sync.table.errors",
-		metric.WithDescription("Number of errors encountered while syncing a table"),
-		metric.WithUnit("/{tot}"),
-	)
-	if err != nil {
-		return nil
-	}
-
-	panics, err := otel.Meter(OtelName).Int64Counter("sync.table.panics",
-		metric.WithDescription("Number of panics encountered while syncing a table"),
-		metric.WithUnit("/{tot}"),
-	)
-	if err != nil {
-		return nil
-	}
-
-	startTime, err := otel.Meter(OtelName).Int64Counter("sync.table.start_time",
-		metric.WithDescription("Start time of syncing a table"),
-		metric.WithUnit("ns"),
-	)
-	if err != nil {
-		return nil
-	}
-
-	endTime, err := otel.Meter(OtelName).Int64Counter("sync.table.end_time",
-		metric.WithDescription("End time of syncing a table"),
-		metric.WithUnit("ns"),
-	)
+type tableMeasurements struct {
+	duration *durationMeasurement
+	clients  map[string]*measurement
+}
 
-	if err != nil {
-		return nil
-	}
+type measurement struct {
+	resources uint64
+	errors    uint64
+	panics    uint64
+	duration  *durationMeasurement
+}
 
-	return &OtelMeters{
-		resources: resources,
-		errors:    errors,
-		panics:    panics,
-		startTime: startTime,
-		endTime:   endTime,
-		attributes: []attribute.KeyValue{
-			attribute.Key("sync.client.id").String(clientID),
+func (*Metrics) NewSelector(clientID, tableName string) Selector {
+	return Selector{
+		Set: attribute.NewSet(
 			attribute.Key("sync.table.name").String(tableName),
-		},
+			// The exported attribute set carries an empty client ID so OTel series aggregate per table (see EndTime); the concrete clientID below keys the in-memory per-client measurements.
+			attribute.Key("sync.client.id").String(""),
+		),
+		clientID:  clientID,
+		tableName: tableName,
 	}
 }
 
-func (s *Metrics) InitWithClients(table *schema.Table, clients []schema.ClientMeta) {
-	s.TableClient[table.Name] = make(map[string]*TableClientMetrics, len(clients))
+func (m *Metrics) InitWithClients(table *schema.Table, clients []schema.ClientMeta) {
+	m.measurements[table.Name] = tableMeasurements{clients: make(map[string]*measurement), duration: &durationMeasurement{}}
 	for _, client := range clients {
-		tableName := table.Name
-		clientID := client.ID()
-		s.TableClient[tableName][clientID] = &TableClientMetrics{
-			otelMeters: getOtelMeters(tableName, clientID),
-		}
+		m.measurements[table.Name].clients[client.ID()] = &measurement{duration: &durationMeasurement{}}
 	}
 	for _, relation := range table.Relations {
-		s.InitWithClients(relation, clients)
+		m.InitWithClients(relation, clients)
 	}
 }
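How the pieces above are meant to be wired together, as a sketch from outside the package — the fake client and table names are invented for illustration, and `schema.ClientMeta` is assumed to require only an `ID() string` method:

```go
package main

import (
	"github.com/cloudquery/plugin-sdk/v4/scheduler/metrics"
	"github.com/cloudquery/plugin-sdk/v4/schema"
)

type fakeClient struct{ id string }

func (c *fakeClient) ID() string { return c.id }

func main() {
	m := metrics.NewMetrics()

	// InitWithClients allocates one measurement per (table, client) pair and
	// recurses into Relations, so child tables are registered up front.
	table := &schema.Table{
		Name:      "example_table",
		Relations: schema.Tables{{Name: "example_child_table"}},
	}
	m.InitWithClients(table, []schema.ClientMeta{&fakeClient{id: "account:123"}})

	// One selector per (client, table) pair; it is reused for resources,
	// errors, panics and duration.
	_ = m.NewSelector("account:123", table.Name)
}
```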
-func (s *Metrics) TotalErrors() uint64 {
+func (m *Metrics) TotalErrors() uint64 {
 	var total uint64
-	for _, clientMetrics := range s.TableClient {
-		for _, metrics := range clientMetrics {
-			total += metrics.Errors
+	for _, clientMetrics := range m.measurements {
+		for _, metrics := range clientMetrics.clients {
+			total += atomic.LoadUint64(&metrics.errors)
 		}
 	}
 	return total
 }
 
-func (s *Metrics) TotalErrorsAtomic() uint64 {
-	var total uint64
-	for _, clientMetrics := range s.TableClient {
-		for _, metrics := range clientMetrics {
-			total += atomic.LoadUint64(&metrics.Errors)
-		}
-	}
-	return total
+// Deprecated: Use TotalErrors instead; it provides the same functionality and is consistent with the naming of the other metrics methods.
+func (m *Metrics) TotalErrorsAtomic() uint64 {
+	return m.TotalErrors()
 }
 
-func (s *Metrics) TotalPanics() uint64 {
+func (m *Metrics) TotalPanics() uint64 {
 	var total uint64
-	for _, clientMetrics := range s.TableClient {
-		for _, metrics := range clientMetrics {
-			total += metrics.Panics
+	for _, clientMetrics := range m.measurements {
+		for _, metrics := range clientMetrics.clients {
+			total += atomic.LoadUint64(&metrics.panics)
 		}
 	}
 	return total
 }
 
-func (s *Metrics) TotalPanicsAtomic() uint64 {
-	var total uint64
-	for _, clientMetrics := range s.TableClient {
-		for _, metrics := range clientMetrics {
-			total += atomic.LoadUint64(&metrics.Panics)
-		}
-	}
-	return total
+// Deprecated: Use TotalPanics instead; it provides the same functionality and is consistent with the naming of the other metrics methods.
+func (m *Metrics) TotalPanicsAtomic() uint64 {
+	return m.TotalPanics()
 }
 
-func (s *Metrics) TotalResources() uint64 {
+func (m *Metrics) TotalResources() uint64 {
 	var total uint64
-	for _, clientMetrics := range s.TableClient {
-		for _, metrics := range clientMetrics {
-			total += metrics.Resources
+	for _, clientMetrics := range m.measurements {
+		for _, metrics := range clientMetrics.clients {
+			total += atomic.LoadUint64(&metrics.resources)
 		}
 	}
 	return total
 }
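The per-client counters are plain `uint64` fields updated via `sync/atomic` (see `AddResources`/`AddErrors`/`AddPanics` below), so resolvers never contend on a lock on the hot path, and the `Total*` methods are simple full scans. A sketch of what that buys, assuming `m` was initialized as in the previous example and `ctx` is any context:

```go
func countConcurrentErrors(ctx context.Context, m *metrics.Metrics) uint64 {
	var wg sync.WaitGroup
	sel := m.NewSelector("account:123", "example_table")
	for i := 0; i < 100; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			m.AddErrors(ctx, 1, sel) // atomic add on the in-memory counter + OTel counter add
		}()
	}
	wg.Wait()
	return m.TotalErrors() // 100, scanned from the in-memory measurements
}
```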
 
-func (s *Metrics) TotalResourcesAtomic() uint64 {
-	var total uint64
-	for _, clientMetrics := range s.TableClient {
-		for _, metrics := range clientMetrics {
-			total += atomic.LoadUint64(&metrics.Resources)
-		}
-	}
-	return total
+// Deprecated: Use TotalResources instead; it provides the same functionality and is consistent with the naming of the other metrics methods.
+func (m *Metrics) TotalResourcesAtomic() uint64 {
+	return m.TotalResources()
 }
 
-func (m *TableClientMetrics) OtelResourcesAdd(ctx context.Context, count int64) {
-	if m.otelMeters == nil {
-		return
-	}
+func (m *Metrics) TableDuration(tableName string) time.Duration {
+	tc := m.measurements[tableName]
+	return tc.duration.Value()
+}
 
-	m.otelMeters.resources.Add(ctx, count, metric.WithAttributes(m.otelMeters.attributes...))
+func (m *Metrics) AddResources(ctx context.Context, count int64, selector Selector) {
+	m.resources.Add(ctx, count, metric.WithAttributeSet(selector.Set))
+	atomic.AddUint64(&m.measurements[selector.tableName].clients[selector.clientID].resources, uint64(count))
 }
 
-func (m *TableClientMetrics) OtelErrorsAdd(ctx context.Context, count int64) {
-	if m.otelMeters == nil {
-		return
-	}
+func (m *Metrics) GetResources(selector Selector) uint64 {
+	return atomic.LoadUint64(&m.measurements[selector.tableName].clients[selector.clientID].resources)
+}
 
-	m.otelMeters.errors.Add(ctx, count, metric.WithAttributes(m.otelMeters.attributes...))
+func (m *Metrics) AddErrors(ctx context.Context, count int64, selector Selector) {
+	m.errors.Add(ctx, count, metric.WithAttributeSet(selector.Set))
+	atomic.AddUint64(&m.measurements[selector.tableName].clients[selector.clientID].errors, uint64(count))
 }
 
-func (m *TableClientMetrics) OtelPanicsAdd(ctx context.Context, count int64) {
-	if m.otelMeters == nil {
-		return
-	}
+func (m *Metrics) GetErrors(selector Selector) uint64 {
+	return atomic.LoadUint64(&m.measurements[selector.tableName].clients[selector.clientID].errors)
+}
 
-	m.otelMeters.panics.Add(ctx, count, metric.WithAttributes(m.otelMeters.attributes...))
+func (m *Metrics) AddPanics(ctx context.Context, count int64, selector Selector) {
+	m.panics.Add(ctx, count, metric.WithAttributeSet(selector.Set))
+	atomic.AddUint64(&m.measurements[selector.tableName].clients[selector.clientID].panics, uint64(count))
 }
 
-func (m *TableClientMetrics) OtelStartTime(ctx context.Context, start time.Time) {
-	if m.otelMeters == nil {
-		return
-	}
+func (m *Metrics) GetPanics(selector Selector) uint64 {
+	return atomic.LoadUint64(&m.measurements[selector.tableName].clients[selector.clientID].panics)
}
 
-	// If we have already started, don't start again. This can happen for relational tables that are resolved multiple times (per parent resource)
-	m.otelMeters.startedLock.Lock()
-	defer m.otelMeters.startedLock.Unlock()
-	if m.otelMeters.started {
-		return
-	}
-	m.otelMeters.started = true
-	m.otelMeters.startTime.Add(ctx, start.UnixNano(), metric.WithAttributes(m.otelMeters.attributes...))
+func (m *Metrics) StartTime(start time.Time, selector Selector) {
+	t := m.measurements[selector.tableName]
+	tc := t.clients[selector.clientID]
+
+	tc.duration.Start(start)
+	t.duration.Start(start)
 }
 
-func (m *TableClientMetrics) OtelEndTime(ctx context.Context, end time.Time) {
-	if m.otelMeters == nil {
-		return
-	}
+func (m *Metrics) EndTime(ctx context.Context, end time.Time, selector Selector) {
+	t := m.measurements[selector.tableName]
+	tc := t.clients[selector.clientID]
 
-	m.otelMeters.previousEndTimeLock.Lock()
-	defer m.otelMeters.previousEndTimeLock.Unlock()
-	val := end.UnixNano()
-	// If we got another end time to report, use the latest value. This can happen for relational tables that are resolved multiple times (per parent resource)
-	if m.otelMeters.previousEndTime != 0 {
-		m.otelMeters.endTime.Add(ctx, val-m.otelMeters.previousEndTime, metric.WithAttributes(m.otelMeters.attributes...))
-	} else {
-		m.otelMeters.endTime.Add(ctx, val, metric.WithAttributes(m.otelMeters.attributes...))
-	}
-	m.otelMeters.previousEndTime = val
+	_ = tc.duration.End(end) // updates the per-client total; its delta is not exported
+	delta := t.duration.End(end)
+
+	// Only the table-level delta is exported to OTel; the per-client duration is tracked in memory only.
+	m.duration.Add(ctx, delta.Milliseconds(), metric.WithAttributeSet(selector.Set))
+}
+
+func (m *Metrics) GetDuration(selector Selector) time.Duration {
+	tc := m.measurements[selector.tableName].clients[selector.clientID]
+	return tc.duration.Value()
+}
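`StartTime` and `EndTime` tie the file together: `StartTime` is idempotent per measurement, and `EndTime` updates both the per-client and per-table measurements while exporting only the table-level delta to OTel. A sketch of the observable behavior for a relational table resolved twice, with hypothetical fixed timestamps and the same `m`/`sel` setup as the earlier examples:

```go
func reportTwice(ctx context.Context, m *metrics.Metrics, sel metrics.Selector) {
	t0 := time.Now()

	m.StartTime(t0, sel)
	m.EndTime(ctx, t0.Add(2*time.Second), sel) // exports +2000 ms to sync.table.duration

	m.StartTime(t0, sel)                       // no-op: the measurement is already started
	m.EndTime(ctx, t0.Add(5*time.Second), sel) // exports only the +3000 ms increment

	fmt.Println(m.GetDuration(sel))               // 5s accumulated for this client
	fmt.Println(m.TableDuration("example_table")) // 5s for the table as a whole
}
```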
diff --git a/scheduler/metrics/metrics_test.go b/scheduler/metrics/metrics_test.go
index 3c989cc40d..9addc3edf3 100644
--- a/scheduler/metrics/metrics_test.go
+++ b/scheduler/metrics/metrics_test.go
@@ -1,37 +1,126 @@
 package metrics
 
-import "testing"
+import (
+	"testing"
+	"time"
+
+	"github.com/stretchr/testify/require"
+)
 
 func TestMetrics(t *testing.T) {
-	s := &Metrics{
-		TableClient: make(map[string]map[string]*TableClientMetrics),
-	}
-	s.TableClient["test_table"] = make(map[string]*TableClientMetrics)
-	s.TableClient["test_table"]["testExecutionClient"] = &TableClientMetrics{
-		Resources: 1,
-		Errors:    2,
-		Panics:    3,
-	}
-	if s.TotalResources() != 1 {
-		t.Fatal("expected 1 resource")
-	}
-	if s.TotalErrors() != 2 {
-		t.Fatal("expected 2 error")
-	}
-	if s.TotalPanics() != 3 {
-		t.Fatal("expected 3 panics")
-	}
+	m := NewMetrics()
 
-	other := &Metrics{
-		TableClient: make(map[string]map[string]*TableClientMetrics),
-	}
-	other.TableClient["test_table"] = make(map[string]*TableClientMetrics)
-	other.TableClient["test_table"]["testExecutionClient"] = &TableClientMetrics{
-		Resources: 1,
-		Errors:    2,
-		Panics:    3,
+	m.measurements["test_table_1"] = tableMeasurements{
+		clients: map[string]*measurement{
+			"test_client_1": {duration: &durationMeasurement{}},
+			"test_client_2": {duration: &durationMeasurement{}},
+		},
+		duration: &durationMeasurement{},
 	}
-	if !s.Equal(other) {
-		t.Fatal("expected metrics to be equal")
+	m.measurements["test_table_2"] = tableMeasurements{
+		clients: map[string]*measurement{
+			"test_client_1": {duration: &durationMeasurement{}},
+		},
+		duration: &durationMeasurement{},
 	}
+
+	require.Equal(t, m.TotalResources(), uint64(0))
+	require.Equal(t, m.TotalErrors(), uint64(0))
+	require.Equal(t, m.TotalPanics(), uint64(0))
+
+	s1 := m.NewSelector("test_client_1", "test_table_1")
+
+	// test single table, single client
+	m.StartTime(time.Now(), s1)
+	m.AddResources(t.Context(), 1, s1)
+	require.Equal(t, m.TotalResources(), uint64(1))
+	require.Equal(t, m.GetResources(s1), uint64(1))
+	require.Equal(t, m.TotalErrors(), uint64(0))
+	require.Equal(t, m.TotalPanics(), uint64(0))
+
+	m.AddErrors(t.Context(), 1, s1)
+	require.Equal(t, m.TotalResources(), uint64(1))
+	require.Equal(t, m.GetResources(s1), uint64(1))
+	require.Equal(t, m.TotalErrors(), uint64(1))
+	require.Equal(t, m.GetErrors(s1), uint64(1))
+	require.Equal(t, m.TotalPanics(), uint64(0))
+
+	m.AddPanics(t.Context(), 1, s1)
+	require.Equal(t, m.TotalResources(), uint64(1))
+	require.Equal(t, m.GetResources(s1), uint64(1))
+	require.Equal(t, m.TotalErrors(), uint64(1))
+	require.Equal(t, m.GetErrors(s1), uint64(1))
+	require.Equal(t, m.TotalPanics(), uint64(1))
+	require.Equal(t, m.GetPanics(s1), uint64(1))
+
+	time.Sleep(1 * time.Millisecond)
+	m.EndTime(t.Context(), time.Now(), s1)
+
+	// test single table, multiple clients
+	s2 := m.NewSelector("test_client_2", "test_table_1")
+
+	m.StartTime(time.Now(), s2)
+	m.AddResources(t.Context(), 1, s2)
+	require.Equal(t, m.TotalResources(), uint64(2))
+	require.Equal(t, m.GetResources(s2), uint64(1))
+	require.Equal(t, m.TotalErrors(), uint64(1))
+	require.Equal(t, m.GetErrors(s2), uint64(0))
+	require.Equal(t, m.TotalPanics(), uint64(1))
+	require.Equal(t, m.GetPanics(s2), uint64(0))
+
+	m.AddErrors(t.Context(), 1, s2)
+	require.Equal(t, m.TotalResources(), uint64(2))
+	require.Equal(t, m.GetResources(s2), uint64(1))
+	require.Equal(t, m.TotalErrors(), uint64(2))
+	require.Equal(t, m.GetErrors(s2), uint64(1))
+	require.Equal(t, m.TotalPanics(), uint64(1))
+	require.Equal(t, m.GetPanics(s2), uint64(0))
+
+	m.AddPanics(t.Context(), 1, s2)
+	require.Equal(t, m.TotalResources(), uint64(2))
+	require.Equal(t, m.GetResources(s2), uint64(1))
+	require.Equal(t, m.TotalErrors(), uint64(2))
+	require.Equal(t, m.GetErrors(s2), uint64(1))
+	require.Equal(t, m.TotalPanics(), uint64(2))
+	require.Equal(t, m.GetPanics(s2), uint64(1))
+
+	time.Sleep(1 * time.Millisecond)
+	m.EndTime(t.Context(), time.Now(), s2)
+
+	// test multiple tables, multiple clients
+	s3 := m.NewSelector("test_client_1", "test_table_2")
+
+	m.StartTime(time.Now(), s3)
+	m.AddResources(t.Context(), 1, s3)
+	require.Equal(t, m.TotalResources(), uint64(3))
+	require.Equal(t, m.GetResources(s3), uint64(1))
+	require.Equal(t, m.TotalErrors(), uint64(2))
+	require.Equal(t, m.GetErrors(s3), uint64(0))
+	require.Equal(t, m.TotalPanics(), uint64(2))
+	require.Equal(t, m.GetPanics(s3), uint64(0))
+
+	m.AddErrors(t.Context(), 1, s3)
+	require.Equal(t, m.TotalResources(), uint64(3))
+	require.Equal(t, m.GetResources(s3), uint64(1))
+	require.Equal(t, m.TotalErrors(), uint64(3))
+	require.Equal(t, m.GetErrors(s3), uint64(1))
+	require.Equal(t, m.TotalPanics(), uint64(2))
+	require.Equal(t, m.GetPanics(s3), uint64(0))
+
+	m.AddPanics(t.Context(), 1, s3)
+	require.Equal(t, m.TotalResources(), uint64(3))
+	require.Equal(t, m.GetResources(s3), uint64(1))
+	require.Equal(t, m.TotalErrors(), uint64(3))
+	require.Equal(t, m.GetErrors(s3), uint64(1))
+	require.Equal(t, m.TotalPanics(), uint64(3))
+	require.Equal(t, m.GetPanics(s3), uint64(1))
+
+	time.Sleep(1 * time.Millisecond)
+	m.EndTime(t.Context(), time.Now(), s3)
+
+	require.Greater(t, m.GetDuration(s1), 0*time.Nanosecond)
+	require.Greater(t, m.GetDuration(s2), 0*time.Nanosecond)
+
+	// This holds because the two client syncs ran sequentially here; with concurrent clients the table duration could be shorter than the sum of the client durations.
+	require.GreaterOrEqual(t, m.TableDuration(s1.tableName), m.GetDuration(s1)+m.GetDuration(s2))
 }
diff --git a/scheduler/metrics/selector.go b/scheduler/metrics/selector.go
new file mode 100644
index 0000000000..fa3db73115
--- /dev/null
+++ b/scheduler/metrics/selector.go
@@ -0,0 +1,10 @@
+package metrics
+
+import "go.opentelemetry.io/otel/attribute"
+
+type Selector struct {
+	attribute.Set
+
+	clientID  string
+	tableName string
+}
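`Selector` is a small but load-bearing type: embedding `attribute.Set` lets callers pass it straight into `metric.WithAttributeSet`, while the unexported `clientID`/`tableName` fields key the in-memory maps. A sketch of what a selector actually carries (the table and client names are invented):

```go
sel := m.NewSelector("account:123", "example_table")

v, _ := sel.Value(attribute.Key("sync.table.name"))
fmt.Println(v.AsString()) // "example_table"

v, _ = sel.Value(attribute.Key("sync.client.id"))
fmt.Println(v.AsString()) // "" — left empty so exported series stay per-table
```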
logger.Info().Msg("top level table resolver started") } - tableMetrics := w.metrics.TableClient[table.Name][clientName] + + selector := w.metrics.NewSelector(clientName, table.Name) defer func() { span.AddEvent("sync.finish.stats", trace.WithAttributes( - attribute.Key("sync.resources").Int64(int64(atomic.LoadUint64(&tableMetrics.Resources))), - attribute.Key("sync.errors").Int64(int64(atomic.LoadUint64(&tableMetrics.Errors))), - attribute.Key("sync.panics").Int64(int64(atomic.LoadUint64(&tableMetrics.Panics))), + attribute.Key("sync.resources").Int64(int64(w.metrics.GetResources(selector))), + attribute.Key("sync.errors").Int64(int64(w.metrics.GetErrors(selector))), + attribute.Key("sync.panics").Int64(int64(w.metrics.GetPanics(selector))), )) }() - tableMetrics.OtelStartTime(ctx, startTime) + w.metrics.StartTime(startTime, selector) res := make(chan any) go func() { @@ -101,15 +101,13 @@ func (w *worker) resolveTable(ctx context.Context, table *schema.Table, client s if err := recover(); err != nil { stack := fmt.Sprintf("%s\n%s", err, string(debug.Stack())) logger.Error().Interface("error", err).Str("stack", stack).Msg("table resolver finished with panic") - tableMetrics.OtelPanicsAdd(ctx, 1) - atomic.AddUint64(&tableMetrics.Panics, 1) + w.metrics.AddPanics(ctx, 1, selector) } close(res) }() if err := table.Resolver(ctx, client, parent, res); err != nil { logger.Error().Err(err).Msg("table resolver finished with error") - tableMetrics.OtelErrorsAdd(ctx, 1) - atomic.AddUint64(&tableMetrics.Errors, 1) + w.metrics.AddErrors(ctx, 1, selector) // Send SyncError message syncErrorMsg := &message.SyncError{ TableName: table.Name, @@ -125,11 +123,9 @@ func (w *worker) resolveTable(ctx context.Context, table *schema.Table, client s } endTime := time.Now() - duration := endTime.Sub(startTime) - tableMetrics.Duration.Store(&duration) - tableMetrics.OtelEndTime(ctx, endTime) + w.metrics.EndTime(ctx, endTime, selector) if parent == nil { - logger.Info().Uint64("resources", tableMetrics.Resources).Uint64("errors", tableMetrics.Errors).Dur("duration_ms", duration).Msg("table sync finished") + logger.Info().Uint64("resources", w.metrics.GetResources(selector)).Uint64("errors", w.metrics.GetErrors(selector)).Dur("duration_ms", w.metrics.GetDuration(selector)).Msg("table sync finished") } } @@ -139,6 +135,7 @@ func (w *worker) resolveResource(ctx context.Context, table *schema.Table, clien return } + selector := w.metrics.NewSelector(client.ID(), table.Name) resourcesChan := make(chan *schema.Resource, len(resourcesSlice)) go func() { defer close(resourcesChan) @@ -154,9 +151,8 @@ func (w *worker) resolveResource(ctx context.Context, table *schema.Table, clien } if err := resolvedResource.CalculateCQID(w.deterministicCQID); err != nil { - tableMetrics := w.metrics.TableClient[table.Name][client.ID()] w.logger.Error().Err(err).Str("table", table.Name).Str("client", client.ID()).Msg("resource resolver finished with primary key calculation error") - atomic.AddUint64(&tableMetrics.Errors, 1) + w.metrics.AddErrors(ctx, 1, selector) return } if err := resolvedResource.StoreCQClientID(client.ID()); err != nil { @@ -165,9 +161,8 @@ func (w *worker) resolveResource(ctx context.Context, table *schema.Table, clien if err := resolvedResource.Validate(); err != nil { switch err.(type) { case *schema.PKError: - tableMetrics := w.metrics.TableClient[table.Name][client.ID()] w.logger.Error().Err(err).Str("table", table.Name).Str("client", client.ID()).Msg("resource resolver finished with validation error") - 
atomic.AddUint64(&tableMetrics.Errors, 1) + w.metrics.AddErrors(ctx, 1, selector) return case *schema.PKComponentError: w.logger.Warn().Err(err).Str("table", table.Name).Str("client", client.ID()).Msg("resource resolver finished with validation warning") diff --git a/scheduler/resolvers/resolvers.go b/scheduler/resolvers/resolvers.go index 1b7d977733..56f8ead9e6 100644 --- a/scheduler/resolvers/resolvers.go +++ b/scheduler/resolvers/resolvers.go @@ -4,7 +4,6 @@ import ( "context" "fmt" "runtime/debug" - "sync/atomic" "time" "github.com/cloudquery/plugin-sdk/v4/caser" @@ -14,20 +13,20 @@ import ( "github.com/thoas/go-funk" ) -func resolveColumn(ctx context.Context, logger zerolog.Logger, tableMetrics *metrics.TableClientMetrics, client schema.ClientMeta, resource *schema.Resource, column schema.Column, c *caser.Caser) { +func resolveColumn(ctx context.Context, logger zerolog.Logger, m *metrics.Metrics, selector metrics.Selector, client schema.ClientMeta, resource *schema.Resource, column schema.Column, c *caser.Caser) { columnStartTime := time.Now() defer func() { if err := recover(); err != nil { stack := fmt.Sprintf("%s\n%s", err, string(debug.Stack())) logger.Error().Str("column", column.Name).Interface("error", err).TimeDiff("duration", time.Now(), columnStartTime).Str("stack", stack).Msg("column resolver finished with panic") - atomic.AddUint64(&tableMetrics.Panics, 1) + m.AddPanics(ctx, 1, selector) } }() if column.Resolver != nil { if err := column.Resolver(ctx, client, resource, column); err != nil { logger.Error().Err(err).Msg("column resolver finished with error") - atomic.AddUint64(&tableMetrics.Errors, 1) + m.AddErrors(ctx, 1, selector) } } else { // base use case: try to get column with CamelCase name @@ -36,7 +35,7 @@ func resolveColumn(ctx context.Context, logger zerolog.Logger, tableMetrics *met err := resource.Set(column.Name, v) if err != nil { logger.Error().Err(err).Msg("column resolver finished with error") - atomic.AddUint64(&tableMetrics.Errors, 1) + m.AddErrors(ctx, 1, selector) } } } @@ -45,38 +44,41 @@ func resolveColumn(ctx context.Context, logger zerolog.Logger, tableMetrics *met func ResolveSingleResource(ctx context.Context, logger zerolog.Logger, m *metrics.Metrics, table *schema.Table, client schema.ClientMeta, parent *schema.Resource, item any, c *caser.Caser) *schema.Resource { ctx, cancel := context.WithTimeout(ctx, 10*time.Minute) defer cancel() + resource := schema.NewResourceData(table, parent, item) objectStartTime := time.Now() + clientID := client.ID() - tableMetrics := m.TableClient[table.Name][clientID] tableLogger := logger.With().Str("table", table.Name).Str("client", clientID).Logger() + + selector := m.NewSelector(clientID, table.Name) + defer func() { if err := recover(); err != nil { stack := fmt.Sprintf("%s\n%s", err, string(debug.Stack())) tableLogger.Error().Interface("error", err).TimeDiff("duration", time.Now(), objectStartTime).Str("stack", stack).Msg("resource resolver finished with panic") - atomic.AddUint64(&tableMetrics.Panics, 1) + m.AddPanics(ctx, 1, selector) } }() if table.PreResourceResolver != nil { if err := table.PreResourceResolver(ctx, client, resource); err != nil { tableLogger.Error().Err(err).Msg("pre resource resolver failed") - atomic.AddUint64(&tableMetrics.Errors, 1) + m.AddErrors(ctx, 1, selector) return nil } } for _, column := range table.Columns { - resolveColumn(ctx, tableLogger, tableMetrics, client, resource, column, c) + resolveColumn(ctx, tableLogger, m, selector, client, resource, column, c) } if 
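The worker now funnels every counter update through a selector, which makes the resolver lifecycle easy to state in one place. A condensed restatement of `resolveTable`'s metrics choreography — tracing, channels and message sending are elided, and `fn` is a hypothetical stand-in for `table.Resolver`:

```go
func resolveOnce(ctx context.Context, m *metrics.Metrics, clientID, tableName string, fn func() error) {
	sel := m.NewSelector(clientID, tableName)
	m.StartTime(time.Now(), sel)
	// Wrap EndTime in a closure so the end timestamp is taken at exit, not at defer time.
	defer func() { m.EndTime(ctx, time.Now(), sel) }()
	defer func() {
		if r := recover(); r != nil {
			m.AddPanics(ctx, 1, sel) // runs before the EndTime defer, so panics are still timed
		}
	}()
	if err := fn(); err != nil {
		m.AddErrors(ctx, 1, sel)
	}
}
```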
diff --git a/scheduler/resolvers/resolvers.go b/scheduler/resolvers/resolvers.go
index 1b7d977733..56f8ead9e6 100644
--- a/scheduler/resolvers/resolvers.go
+++ b/scheduler/resolvers/resolvers.go
@@ -4,7 +4,6 @@ import (
 	"context"
 	"fmt"
 	"runtime/debug"
-	"sync/atomic"
 	"time"
 
 	"github.com/cloudquery/plugin-sdk/v4/caser"
@@ -14,20 +13,20 @@ import (
 	"github.com/thoas/go-funk"
 )
 
-func resolveColumn(ctx context.Context, logger zerolog.Logger, tableMetrics *metrics.TableClientMetrics, client schema.ClientMeta, resource *schema.Resource, column schema.Column, c *caser.Caser) {
+func resolveColumn(ctx context.Context, logger zerolog.Logger, m *metrics.Metrics, selector metrics.Selector, client schema.ClientMeta, resource *schema.Resource, column schema.Column, c *caser.Caser) {
 	columnStartTime := time.Now()
 	defer func() {
 		if err := recover(); err != nil {
 			stack := fmt.Sprintf("%s\n%s", err, string(debug.Stack()))
 			logger.Error().Str("column", column.Name).Interface("error", err).TimeDiff("duration", time.Now(), columnStartTime).Str("stack", stack).Msg("column resolver finished with panic")
-			atomic.AddUint64(&tableMetrics.Panics, 1)
+			m.AddPanics(ctx, 1, selector)
 		}
 	}()
 
 	if column.Resolver != nil {
 		if err := column.Resolver(ctx, client, resource, column); err != nil {
 			logger.Error().Err(err).Msg("column resolver finished with error")
-			atomic.AddUint64(&tableMetrics.Errors, 1)
+			m.AddErrors(ctx, 1, selector)
 		}
 	} else {
 		// base use case: try to get column with CamelCase name
@@ -36,7 +35,7 @@ func resolveColumn(ctx context.Context, logger zerolog.Logger, tableMetrics *met
 		err := resource.Set(column.Name, v)
 		if err != nil {
 			logger.Error().Err(err).Msg("column resolver finished with error")
-			atomic.AddUint64(&tableMetrics.Errors, 1)
+			m.AddErrors(ctx, 1, selector)
 		}
 	}
 }
@@ -45,38 +44,41 @@ func ResolveSingleResource(ctx context.Context, logger zerolog.Logger, m *metrics.Metrics, table *schema.Table, client schema.ClientMeta, parent *schema.Resource, item any, c *caser.Caser) *schema.Resource {
 	ctx, cancel := context.WithTimeout(ctx, 10*time.Minute)
 	defer cancel()
+
 	resource := schema.NewResourceData(table, parent, item)
 	objectStartTime := time.Now()
+
 	clientID := client.ID()
-	tableMetrics := m.TableClient[table.Name][clientID]
 	tableLogger := logger.With().Str("table", table.Name).Str("client", clientID).Logger()
+
+	selector := m.NewSelector(clientID, table.Name)
+
 	defer func() {
 		if err := recover(); err != nil {
 			stack := fmt.Sprintf("%s\n%s", err, string(debug.Stack()))
 			tableLogger.Error().Interface("error", err).TimeDiff("duration", time.Now(), objectStartTime).Str("stack", stack).Msg("resource resolver finished with panic")
-			atomic.AddUint64(&tableMetrics.Panics, 1)
+			m.AddPanics(ctx, 1, selector)
 		}
 	}()
 
 	if table.PreResourceResolver != nil {
 		if err := table.PreResourceResolver(ctx, client, resource); err != nil {
 			tableLogger.Error().Err(err).Msg("pre resource resolver failed")
-			atomic.AddUint64(&tableMetrics.Errors, 1)
+			m.AddErrors(ctx, 1, selector)
 			return nil
 		}
 	}
 
 	for _, column := range table.Columns {
-		resolveColumn(ctx, tableLogger, tableMetrics, client, resource, column, c)
+		resolveColumn(ctx, tableLogger, m, selector, client, resource, column, c)
 	}
 
 	if table.PostResourceResolver != nil {
 		if err := table.PostResourceResolver(ctx, client, resource); err != nil {
 			tableLogger.Error().Stack().Err(err).Msg("post resource resolver finished with error")
-			atomic.AddUint64(&tableMetrics.Errors, 1)
+			m.AddErrors(ctx, 1, selector)
 		}
 	}
-	tableMetrics.OtelResourcesAdd(ctx, 1)
-	atomic.AddUint64(&tableMetrics.Resources, 1)
+
+	m.AddResources(ctx, 1, selector)
 	return resource
 }
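Worth noting in `resolveColumn` above: when a column has no explicit `Resolver`, the fallback (in the context elided between the two hunks) pulls the field whose PascalCase name matches the column name off the item via `funk.Get`. A sketch of that default path under that assumption — the `Bucket` type is hypothetical:

```go
// Hypothetical item type; real items come from the plugin's API client.
type Bucket struct {
	CreationDate time.Time
}

func defaultColumnValue(item Bucket, columnName string) any {
	c := caser.New()
	// "creation_date" -> "CreationDate"; funk.Get then reads that field reflectively.
	return funk.Get(item, c.ToPascal(columnName))
}
```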
logger.Info().Msg("top level table resolver started") } + selector := s.metrics.NewSelector(clientName, table.Name) + s.metrics.StartTime(startTime, selector) - tableMetrics := s.metrics.TableClient[table.Name][clientName] defer func() { span.AddEvent("sync.finish.stats", trace.WithAttributes( - attribute.Key("sync.resources").Int64(int64(atomic.LoadUint64(&tableMetrics.Resources))), - attribute.Key("sync.errors").Int64(int64(atomic.LoadUint64(&tableMetrics.Errors))), - attribute.Key("sync.panics").Int64(int64(atomic.LoadUint64(&tableMetrics.Panics))), + attribute.Key("sync.resources").Int64(int64(s.metrics.GetResources(selector))), + attribute.Key("sync.errors").Int64(int64(s.metrics.GetErrors(selector))), + attribute.Key("sync.panics").Int64(int64(s.metrics.GetPanics(selector))), )) + + endTime := time.Now() + s.metrics.EndTime(ctx, endTime, selector) + if parent == nil { // Log only for root tables and relations only after resolving is done, otherwise we spam per object instead of per table. + logger.Info().Uint64("resources", s.metrics.GetResources(selector)).Uint64("errors", s.metrics.GetErrors(selector)).Dur("duration_ms", s.metrics.GetDuration(selector)).Msg("table sync finished") + s.logTablesMetrics(table.Relations, client) + } }() - tableMetrics.OtelStartTime(ctx, startTime) res := make(chan any) go func() { @@ -111,15 +117,13 @@ func (s *syncClient) resolveTableDfs(ctx context.Context, table *schema.Table, c if err := recover(); err != nil { stack := fmt.Sprintf("%s\n%s", err, string(debug.Stack())) logger.Error().Interface("error", err).Str("stack", stack).Msg("table resolver finished with panic") - tableMetrics.OtelPanicsAdd(ctx, 1) - atomic.AddUint64(&tableMetrics.Panics, 1) + s.metrics.AddPanics(ctx, 1, selector) } close(res) }() if err := table.Resolver(ctx, client, parent, res); err != nil { logger.Error().Err(err).Msg("table resolver finished with error") - tableMetrics.OtelErrorsAdd(ctx, 1) - atomic.AddUint64(&tableMetrics.Errors, 1) + s.metrics.AddErrors(ctx, 1, selector) // Send SyncError message syncErrorMsg := &message.SyncError{ TableName: table.Name, @@ -139,14 +143,6 @@ func (s *syncClient) resolveTableDfs(ctx context.Context, table *schema.Table, c batchSender.Close() // we don't need any waitgroups here because we are waiting for the channel to close - endTime := time.Now() - duration := endTime.Sub(startTime) - tableMetrics.Duration.Store(&duration) - tableMetrics.OtelEndTime(ctx, endTime) - if parent == nil { // Log only for root tables and relations only after resolving is done, otherwise we spam per object instead of per table. 
- logger.Info().Uint64("resources", tableMetrics.Resources).Uint64("errors", tableMetrics.Errors).Dur("duration_ms", duration).Msg("table sync finished") - s.logTablesMetrics(table.Relations, client) - } } func (s *syncClient) resolveResourcesDfs(ctx context.Context, table *schema.Table, client schema.ClientMeta, parent *schema.Resource, resources any, resolvedResources chan<- *schema.Resource, depth int) { @@ -154,6 +150,9 @@ func (s *syncClient) resolveResourcesDfs(ctx context.Context, table *schema.Tabl if len(resourcesSlice) == 0 { return } + + selector := s.metrics.NewSelector(client.ID(), table.Name) + resourcesChan := make(chan *schema.Resource, len(resourcesSlice)) go func() { defer close(resourcesChan) @@ -191,9 +190,8 @@ func (s *syncClient) resolveResourcesDfs(ctx context.Context, table *schema.Tabl } if err := resolvedResource.CalculateCQID(s.deterministicCQID); err != nil { - tableMetrics := s.metrics.TableClient[table.Name][client.ID()] s.logger.Error().Err(err).Str("table", table.Name).Str("client", client.ID()).Msg("resource resolver finished with primary key calculation error") - atomic.AddUint64(&tableMetrics.Errors, 1) + s.metrics.AddErrors(ctx, 1, selector) return } if err := resolvedResource.StoreCQClientID(client.ID()); err != nil { @@ -202,9 +200,8 @@ func (s *syncClient) resolveResourcesDfs(ctx context.Context, table *schema.Tabl if err := resolvedResource.Validate(); err != nil { switch err.(type) { case *schema.PKError: - tableMetrics := s.metrics.TableClient[table.Name][client.ID()] s.logger.Error().Err(err).Str("table", table.Name).Str("client", client.ID()).Msg("resource resolver finished with validation error") - atomic.AddUint64(&tableMetrics.Errors, 1) + s.metrics.AddErrors(ctx, 1, selector) return case *schema.PKComponentError: s.logger.Warn().Err(err).Str("table", table.Name).Str("client", client.ID()).Msg("resource resolver finished with validation warning") diff --git a/serve/opentelemetry.go b/serve/opentelemetry.go index 6c29493ebc..9995846b11 100644 --- a/serve/opentelemetry.go +++ b/serve/opentelemetry.go @@ -9,7 +9,6 @@ import ( "github.com/cloudquery/plugin-sdk/v4/plugin" "github.com/rs/zerolog" - "go.opentelemetry.io/otel" "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploghttp"