From 8ab884118c0b5bf2a069bc51bab26fe16addc7aa Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Muraru=20=C8=98tefan?= Date: Wed, 4 Jun 2025 19:31:46 +0300 Subject: [PATCH 1/9] fix: Improve telemetry allocations --- go.mod | 8 + go.sum | 16 ++ scheduler/metrics/metrics.go | 234 +++++++++++++++--------------- scheduler/metrics/metrics_test.go | 8 +- scheduler/queue/scheduler_test.go | 2 +- scheduler/queue/worker.go | 8 +- scheduler/resolvers/resolvers.go | 2 +- scheduler/scheduler.go | 2 +- scheduler/scheduler_dfs.go | 8 +- serve/opentelemetry.go | 18 +++ 10 files changed, 172 insertions(+), 134 deletions(-) diff --git a/go.mod b/go.mod index b1366402c9..0a4de9fcb7 100644 --- a/go.mod +++ b/go.mod @@ -21,6 +21,7 @@ require ( github.com/hashicorp/go-retryablehttp v0.7.8 github.com/invopop/jsonschema v0.13.0 github.com/mitchellh/hashstructure/v2 v2.0.2 + github.com/prometheus/client_golang v1.22.0 github.com/rs/zerolog v1.34.0 github.com/samber/lo v1.49.1 github.com/santhosh-tekuri/jsonschema/v6 v6.0.2 @@ -32,6 +33,7 @@ require ( go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp v1.36.0 go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.36.0 go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.36.0 + go.opentelemetry.io/otel/exporters/prometheus v0.58.0 go.opentelemetry.io/otel/log v0.12.2 go.opentelemetry.io/otel/metric v1.36.0 go.opentelemetry.io/otel/sdk v1.36.0 @@ -62,8 +64,10 @@ require ( github.com/aws/aws-sdk-go-v2/service/sts v1.34.0 // indirect github.com/aws/smithy-go v1.22.4 // indirect github.com/bahlo/generic-list-go v0.2.0 // indirect + github.com/beorn7/perks v1.0.1 // indirect github.com/buger/jsonparser v1.1.1 // indirect github.com/cenkalti/backoff/v5 v5.0.2 // indirect + github.com/cespare/xxhash/v2 v2.3.0 // indirect github.com/davecgh/go-spew v1.1.1 // indirect github.com/ghodss/yaml v1.0.0 // indirect github.com/go-logr/logr v1.4.2 // indirect @@ -77,9 +81,13 @@ require ( github.com/mailru/easyjson v0.7.7 // indirect github.com/mattn/go-colorable v0.1.13 // indirect github.com/mattn/go-isatty v0.0.20 // indirect + github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect github.com/oapi-codegen/runtime v1.1.1 // indirect github.com/pierrec/lz4/v4 v4.1.22 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect + github.com/prometheus/client_model v0.6.2 // indirect + github.com/prometheus/common v0.64.0 // indirect + github.com/prometheus/procfs v0.16.1 // indirect github.com/spf13/pflag v1.0.6 // indirect github.com/stoewer/go-strcase v1.3.0 // indirect github.com/wk8/go-ordered-map/v2 v2.1.8 // indirect diff --git a/go.sum b/go.sum index 376de6b4ec..5b28e8fbd9 100644 --- a/go.sum +++ b/go.sum @@ -43,6 +43,8 @@ github.com/aws/smithy-go v1.22.4 h1:uqXzVZNuNexwc/xrh6Tb56u89WDlJY6HS+KC0S4QSjw= github.com/aws/smithy-go v1.22.4/go.mod h1:t1ufH5HMublsJYulve2RKmHDC15xu1f26kHCp/HgceI= github.com/bahlo/generic-list-go v0.2.0 h1:5sz/EEAK+ls5wF+NeqDpk5+iNdMDXrh3z3nPnH1Wvgk= github.com/bahlo/generic-list-go v0.2.0/go.mod h1:2KvAjgMlE5NNynlg/5iLrrCCZ2+5xWbdbCW3pNTGyYg= +github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM= +github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw= github.com/bmatcuk/doublestar v1.1.1/go.mod h1:UD6OnuiIn0yFxxA2le/rnRU1G4RaI4UvFv1sNto9p6w= github.com/bradleyjkemp/cupaloy/v2 v2.8.0 h1:any4BmKE+jGIaMpnU8YgH/I2LPiLBufr6oMMlVBbn9M= github.com/bradleyjkemp/cupaloy/v2 v2.8.0/go.mod h1:bm7JXdkRd4BHJk9HpwqAI8BoAY1lps46Enkdqw6aRX0= @@ -115,6 +117,8 @@ github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE= github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk= github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= +github.com/kylelemons/godebug v1.1.0 h1:RPNrshWIDI6G2gRW9EHilWtl7Z6Sb1BR0xunSBf0SNc= +github.com/kylelemons/godebug v1.1.0/go.mod h1:9/0rRGxNHcop5bhtWyNeEfOS8JIWk580+fNqagV/RAw= github.com/mailru/easyjson v0.7.7 h1:UGYAvKxe3sBsEDzO8ZeWOSlIQfWFlxbzLZe7hwFURr0= github.com/mailru/easyjson v0.7.7/go.mod h1:xzfreul335JAWq5oZzymOObrkdz5UnU4kGfJJLY9Nlc= github.com/mattn/go-colorable v0.1.13 h1:fFA4WZxdEF4tXPZVKMLwD8oUnCTTo08duU7wxecdEvA= @@ -129,6 +133,8 @@ github.com/minio/c2goasm v0.0.0-20190812172519-36a3d3bbc4f3 h1:+n/aFZefKZp7spd8D github.com/minio/c2goasm v0.0.0-20190812172519-36a3d3bbc4f3/go.mod h1:RagcQ7I8IeTMnF8JTXieKnO4Z6JCsikNEzj0DwauVzE= github.com/mitchellh/hashstructure/v2 v2.0.2 h1:vGKWl0YJqUNxE8d+h8f6NJLcCJrgbhC4NcD46KavDd4= github.com/mitchellh/hashstructure/v2 v2.0.2/go.mod h1:MG3aRVU/N29oo/V/IhBX8GR/zz4kQkprJgF2EVszyDE= +github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 h1:C3w9PqII01/Oq1c1nUAm88MOHcQC9l5mIlSMApZMrHA= +github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822/go.mod h1:+n7T8mK8HuQTcFwEeznm/DIxMOiR9yIdICNftLE1DvQ= github.com/oapi-codegen/runtime v1.1.1 h1:EXLHh0DXIJnWhdRPN2w4MXAzFyE4CskzhNLUmtpMYro= github.com/oapi-codegen/runtime v1.1.1/go.mod h1:SK9X900oXmPWilYR5/WKPzt3Kqxn/uS/+lbpREv+eCg= github.com/pierrec/lz4/v4 v4.1.22 h1:cKFw6uJDK+/gfw5BcDL0JL5aBsAFdsIT18eRtLj7VIU= @@ -136,6 +142,14 @@ github.com/pierrec/lz4/v4 v4.1.22/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFu github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/prometheus/client_golang v1.22.0 h1:rb93p9lokFEsctTys46VnV1kLCDpVZ0a/Y92Vm0Zc6Q= +github.com/prometheus/client_golang v1.22.0/go.mod h1:R7ljNsLXhuQXYZYtw6GAE9AZg8Y7vEW5scdCXrWRXC0= +github.com/prometheus/client_model v0.6.2 h1:oBsgwpGs7iVziMvrGhE53c/GrLUsZdHnqNwqPLxwZyk= +github.com/prometheus/client_model v0.6.2/go.mod h1:y3m2F6Gdpfy6Ut/GBsUqTWZqCUvMVzSfMLjcu6wAwpE= +github.com/prometheus/common v0.64.0 h1:pdZeA+g617P7oGv1CzdTzyeShxAGrTBsolKNOLQPGO4= +github.com/prometheus/common v0.64.0/go.mod h1:0gZns+BLRQ3V6NdaerOhMbwwRbNh9hkGINtQAsP5GS8= +github.com/prometheus/procfs v0.16.1 h1:hZ15bTNuirocR6u0JZ6BAHHmwS1p8B4P6MRqxtzMyRg= +github.com/prometheus/procfs v0.16.1/go.mod h1:teAbpZRB1iIAJYREa1LsoWUXykVXA1KlTmWl8x/U+Is= github.com/rogpeppe/go-internal v1.13.1 h1:KvO1DLK/DRN07sQ1LQKScxyZJuNnedQ5/wKSR38lUII= github.com/rogpeppe/go-internal v1.13.1/go.mod h1:uMEvuHeurkdAXX61udpOXGD/AzZDWNMNyH2VO9fmH0o= github.com/rs/xid v1.6.0/go.mod h1:7XoLgs4eV+QndskICGsho+ADou8ySMSjJKDIan90Nz0= @@ -186,6 +200,8 @@ go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.36.0 h1:dNzwXjZKpMpE2JhmO+9 go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.36.0/go.mod h1:90PoxvaEB5n6AOdZvi+yWJQoE95U8Dhhw2bSyRqnTD0= go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.36.0 h1:nRVXXvf78e00EwY6Wp0YII8ww2JVWshZ20HfTlE11AM= go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.36.0/go.mod h1:r49hO7CgrxY9Voaj3Xe8pANWtr0Oq916d0XAmOoCZAQ= +go.opentelemetry.io/otel/exporters/prometheus v0.58.0 h1:CJAxWKFIqdBennqxJyOgnt5LqkeFRT+Mz3Yjz3hL+h8= +go.opentelemetry.io/otel/exporters/prometheus v0.58.0/go.mod h1:7qo/4CLI+zYSNbv0GMNquzuss2FVZo3OYrGh96n4HNc= go.opentelemetry.io/otel/log v0.12.2 h1:yob9JVHn2ZY24byZeaXpTVoPS6l+UrrxmxmPKohXTwc= go.opentelemetry.io/otel/log v0.12.2/go.mod h1:ShIItIxSYxufUMt+1H5a2wbckGli3/iCfuEbVZi/98E= go.opentelemetry.io/otel/metric v1.36.0 h1:MoWPKVhQvJ+eeXWHFBOPoBOi20jh6Iq2CcCREuTYufE= diff --git a/scheduler/metrics/metrics.go b/scheduler/metrics/metrics.go index cdfa8ab1fc..b5f940fe0c 100644 --- a/scheduler/metrics/metrics.go +++ b/scheduler/metrics/metrics.go @@ -16,22 +16,77 @@ const ( OtelName = "io.cloudquery" ) +func NewMetrics() *Metrics { + resources, err := otel.Meter(OtelName).Int64Counter("sync.table.resources", + metric.WithDescription("Number of resources synced for a table"), + metric.WithUnit("/{tot}"), + ) + if err != nil { + return nil + } + + errors, err := otel.Meter(OtelName).Int64Counter("sync.table.errors", + metric.WithDescription("Number of errors encountered while syncing a table"), + metric.WithUnit("/{tot}"), + ) + if err != nil { + return nil + } + + panics, err := otel.Meter(OtelName).Int64Counter("sync.table.panics", + metric.WithDescription("Number of panics encountered while syncing a table"), + metric.WithUnit("/{tot}"), + ) + if err != nil { + return nil + } + + startTime, err := otel.Meter(OtelName).Int64Counter("sync.table.start_time", + metric.WithDescription("Start time of syncing a table"), + metric.WithUnit("ns"), + ) + if err != nil { + return nil + } + + endTime, err := otel.Meter(OtelName).Int64Counter("sync.table.end_time", + metric.WithDescription("End time of syncing a table"), + metric.WithUnit("ns"), + ) + + if err != nil { + return nil + } + + return &Metrics{ + TableClient: make(map[string]map[string]*TableClientMetrics), + resources: resources, + errors: errors, + panics: panics, + startTime: startTime, + endTime: endTime, + } +} + // Metrics is deprecated as we move toward open telemetry for tracing and metrics type Metrics struct { - TableClient map[string]map[string]*TableClientMetrics -} + resources metric.Int64Counter + errors metric.Int64Counter + panics metric.Int64Counter + + startTime metric.Int64Counter + started bool + startedLock sync.Mutex -type OtelMeters struct { - resources metric.Int64Counter - errors metric.Int64Counter - panics metric.Int64Counter - startTime metric.Int64Counter - started bool - startedLock sync.Mutex endTime metric.Int64Counter previousEndTime int64 previousEndTimeLock sync.Mutex - attributes []attribute.KeyValue + + TableClient map[string]map[string]*TableClientMetrics +} + +type OtelMeters struct { + attributes []attribute.KeyValue } type TableClientMetrics struct { @@ -55,8 +110,8 @@ func (m *TableClientMetrics) Equal(other *TableClientMetrics) bool { } // Equal compares to stats. Mostly useful in testing -func (s *Metrics) Equal(other *Metrics) bool { - for table, clientStats := range s.TableClient { +func (m *Metrics) Equal(other *Metrics) bool { + for table, clientStats := range m.TableClient { for client, stats := range clientStats { if _, ok := other.TableClient[table]; !ok { return false @@ -71,13 +126,13 @@ func (s *Metrics) Equal(other *Metrics) bool { } for table, clientStats := range other.TableClient { for client, stats := range clientStats { - if _, ok := s.TableClient[table]; !ok { + if _, ok := m.TableClient[table]; !ok { return false } - if _, ok := s.TableClient[table][client]; !ok { + if _, ok := m.TableClient[table][client]; !ok { return false } - if !stats.Equal(s.TableClient[table][client]) { + if !stats.Equal(m.TableClient[table][client]) { return false } } @@ -85,78 +140,30 @@ func (s *Metrics) Equal(other *Metrics) bool { return true } -func getOtelMeters(tableName string, clientID string) *OtelMeters { - resources, err := otel.Meter(OtelName).Int64Counter("sync.table.resources", - metric.WithDescription("Number of resources synced for a table"), - metric.WithUnit("/{tot}"), - ) - if err != nil { - return nil - } - - errors, err := otel.Meter(OtelName).Int64Counter("sync.table.errors", - metric.WithDescription("Number of errors encountered while syncing a table"), - metric.WithUnit("/{tot}"), - ) - if err != nil { - return nil - } - - panics, err := otel.Meter(OtelName).Int64Counter("sync.table.panics", - metric.WithDescription("Number of panics encountered while syncing a table"), - metric.WithUnit("/{tot}"), - ) - if err != nil { - return nil - } - - startTime, err := otel.Meter(OtelName).Int64Counter("sync.table.start_time", - metric.WithDescription("Start time of syncing a table"), - metric.WithUnit("ns"), - ) - if err != nil { - return nil - } - - endTime, err := otel.Meter(OtelName).Int64Counter("sync.table.end_time", - metric.WithDescription("End time of syncing a table"), - metric.WithUnit("ns"), - ) - - if err != nil { - return nil - } - - return &OtelMeters{ - resources: resources, - errors: errors, - panics: panics, - startTime: startTime, - endTime: endTime, - attributes: []attribute.KeyValue{ - attribute.Key("sync.client.id").String(clientID), - attribute.Key("sync.table.name").String(tableName), - }, +func GetOtelAttributeSet(tableName string, clientID string) []attribute.KeyValue { + return []attribute.KeyValue{ + attribute.Key("sync.client.id").String(clientID), + attribute.Key("sync.table.name").String(tableName), } } -func (s *Metrics) InitWithClients(table *schema.Table, clients []schema.ClientMeta) { - s.TableClient[table.Name] = make(map[string]*TableClientMetrics, len(clients)) +func (m *Metrics) InitWithClients(table *schema.Table, clients []schema.ClientMeta) { + m.TableClient[table.Name] = make(map[string]*TableClientMetrics, len(clients)) for _, client := range clients { tableName := table.Name clientID := client.ID() - s.TableClient[tableName][clientID] = &TableClientMetrics{ - otelMeters: getOtelMeters(tableName, clientID), + m.TableClient[tableName][clientID] = &TableClientMetrics{ + otelMeters: &OtelMeters{attributes: GetOtelAttributeSet(tableName, clientID)}, } } for _, relation := range table.Relations { - s.InitWithClients(relation, clients) + m.InitWithClients(relation, clients) } } -func (s *Metrics) TotalErrors() uint64 { +func (m *Metrics) TotalErrors() uint64 { var total uint64 - for _, clientMetrics := range s.TableClient { + for _, clientMetrics := range m.TableClient { for _, metrics := range clientMetrics { total += metrics.Errors } @@ -164,9 +171,9 @@ func (s *Metrics) TotalErrors() uint64 { return total } -func (s *Metrics) TotalErrorsAtomic() uint64 { +func (m *Metrics) TotalErrorsAtomic() uint64 { var total uint64 - for _, clientMetrics := range s.TableClient { + for _, clientMetrics := range m.TableClient { for _, metrics := range clientMetrics { total += atomic.LoadUint64(&metrics.Errors) } @@ -174,9 +181,9 @@ func (s *Metrics) TotalErrorsAtomic() uint64 { return total } -func (s *Metrics) TotalPanics() uint64 { +func (m *Metrics) TotalPanics() uint64 { var total uint64 - for _, clientMetrics := range s.TableClient { + for _, clientMetrics := range m.TableClient { for _, metrics := range clientMetrics { total += metrics.Panics } @@ -184,9 +191,9 @@ func (s *Metrics) TotalPanics() uint64 { return total } -func (s *Metrics) TotalPanicsAtomic() uint64 { +func (m *Metrics) TotalPanicsAtomic() uint64 { var total uint64 - for _, clientMetrics := range s.TableClient { + for _, clientMetrics := range m.TableClient { for _, metrics := range clientMetrics { total += atomic.LoadUint64(&metrics.Panics) } @@ -194,9 +201,9 @@ func (s *Metrics) TotalPanicsAtomic() uint64 { return total } -func (s *Metrics) TotalResources() uint64 { +func (m *Metrics) TotalResources() uint64 { var total uint64 - for _, clientMetrics := range s.TableClient { + for _, clientMetrics := range m.TableClient { for _, metrics := range clientMetrics { total += metrics.Resources } @@ -204,9 +211,9 @@ func (s *Metrics) TotalResources() uint64 { return total } -func (s *Metrics) TotalResourcesAtomic() uint64 { +func (m *Metrics) TotalResourcesAtomic() uint64 { var total uint64 - for _, clientMetrics := range s.TableClient { + for _, clientMetrics := range m.TableClient { for _, metrics := range clientMetrics { total += atomic.LoadUint64(&metrics.Resources) } @@ -214,58 +221,51 @@ func (s *Metrics) TotalResourcesAtomic() uint64 { return total } -func (m *TableClientMetrics) OtelResourcesAdd(ctx context.Context, count int64) { - if m.otelMeters == nil { - return - } - - m.otelMeters.resources.Add(ctx, count, metric.WithAttributes(m.otelMeters.attributes...)) +func (m *Metrics) OtelResourcesAdd(ctx context.Context, count int64, tc *TableClientMetrics) { + m.resources.Add(ctx, count, metric.WithAttributes(tc.otelMeters.attributes...)) + atomic.AddUint64(&tc.Resources, uint64(count)) } -func (m *TableClientMetrics) OtelErrorsAdd(ctx context.Context, count int64) { - if m.otelMeters == nil { - return - } - - m.otelMeters.errors.Add(ctx, count, metric.WithAttributes(m.otelMeters.attributes...)) +func (m *Metrics) OtelErrorsAdd(ctx context.Context, count int64, tc *TableClientMetrics) { + m.errors.Add(ctx, count, metric.WithAttributes(tc.otelMeters.attributes...)) + atomic.AddUint64(&tc.Errors, uint64(count)) } -func (m *TableClientMetrics) OtelPanicsAdd(ctx context.Context, count int64) { - if m.otelMeters == nil { - return - } - - m.otelMeters.panics.Add(ctx, count, metric.WithAttributes(m.otelMeters.attributes...)) +func (m *Metrics) OtelPanicsAdd(ctx context.Context, count int64, tc *TableClientMetrics) { + m.panics.Add(ctx, count, metric.WithAttributes(tc.otelMeters.attributes...)) + atomic.AddUint64(&tc.Panics, uint64(count)) } -func (m *TableClientMetrics) OtelStartTime(ctx context.Context, start time.Time) { - if m.otelMeters == nil { +func (m *Metrics) OtelStartTime(ctx context.Context, start time.Time, tc *TableClientMetrics) { + if m.startTime == nil { return } // If we have already started, don't start again. This can happen for relational tables that are resolved multiple times (per parent resource) - m.otelMeters.startedLock.Lock() - defer m.otelMeters.startedLock.Unlock() - if m.otelMeters.started { + m.startedLock.Lock() + defer m.startedLock.Unlock() + if m.started { return } - m.otelMeters.started = true - m.otelMeters.startTime.Add(ctx, start.UnixNano(), metric.WithAttributes(m.otelMeters.attributes...)) + + m.started = true + m.startTime.Add(ctx, start.UnixNano(), metric.WithAttributes(tc.otelMeters.attributes...)) } -func (m *TableClientMetrics) OtelEndTime(ctx context.Context, end time.Time) { - if m.otelMeters == nil { +func (m *Metrics) OtelEndTime(ctx context.Context, end time.Time, tc *TableClientMetrics) { + if m.endTime == nil { return } - m.otelMeters.previousEndTimeLock.Lock() - defer m.otelMeters.previousEndTimeLock.Unlock() + m.previousEndTimeLock.Lock() + defer m.previousEndTimeLock.Unlock() val := end.UnixNano() + // If we got another end time to report, use the latest value. This can happen for relational tables that are resolved multiple times (per parent resource) - if m.otelMeters.previousEndTime != 0 { - m.otelMeters.endTime.Add(ctx, val-m.otelMeters.previousEndTime, metric.WithAttributes(m.otelMeters.attributes...)) + if m.previousEndTime != 0 { + m.endTime.Add(ctx, val-m.previousEndTime, metric.WithAttributes(tc.otelMeters.attributes...)) } else { - m.otelMeters.endTime.Add(ctx, val, metric.WithAttributes(m.otelMeters.attributes...)) + m.endTime.Add(ctx, val, metric.WithAttributes(tc.otelMeters.attributes...)) } - m.otelMeters.previousEndTime = val + m.previousEndTime = val } diff --git a/scheduler/metrics/metrics_test.go b/scheduler/metrics/metrics_test.go index 3c989cc40d..33a8cdd4e4 100644 --- a/scheduler/metrics/metrics_test.go +++ b/scheduler/metrics/metrics_test.go @@ -3,9 +3,7 @@ package metrics import "testing" func TestMetrics(t *testing.T) { - s := &Metrics{ - TableClient: make(map[string]map[string]*TableClientMetrics), - } + s := NewMetrics() s.TableClient["test_table"] = make(map[string]*TableClientMetrics) s.TableClient["test_table"]["testExecutionClient"] = &TableClientMetrics{ Resources: 1, @@ -22,9 +20,7 @@ func TestMetrics(t *testing.T) { t.Fatal("expected 3 panics") } - other := &Metrics{ - TableClient: make(map[string]map[string]*TableClientMetrics), - } + other := NewMetrics() other.TableClient["test_table"] = make(map[string]*TableClientMetrics) other.TableClient["test_table"]["testExecutionClient"] = &TableClientMetrics{ Resources: 1, diff --git a/scheduler/queue/scheduler_test.go b/scheduler/queue/scheduler_test.go index 1d8c565def..ae63fb2ff5 100644 --- a/scheduler/queue/scheduler_test.go +++ b/scheduler/queue/scheduler_test.go @@ -44,7 +44,7 @@ func testResolver(_ context.Context, _ schema.ClientMeta, parent *schema.Resourc func TestScheduler(t *testing.T) { nopLogger := zerolog.Nop() - m := &metrics.Metrics{TableClient: make(map[string]map[string]*metrics.TableClientMetrics)} + m := metrics.NewMetrics() scheduler := NewShuffleQueueScheduler(nopLogger, m, int64(0), WithWorkerCount(1000)) tableClients := []WorkUnit{ { diff --git a/scheduler/queue/worker.go b/scheduler/queue/worker.go index eff44dece5..cf0fa88a02 100644 --- a/scheduler/queue/worker.go +++ b/scheduler/queue/worker.go @@ -93,7 +93,7 @@ func (w *worker) resolveTable(ctx context.Context, table *schema.Table, client s attribute.Key("sync.panics").Int64(int64(atomic.LoadUint64(&tableMetrics.Panics))), )) }() - tableMetrics.OtelStartTime(ctx, startTime) + w.metrics.OtelStartTime(ctx, startTime, tableMetrics) res := make(chan any) go func() { @@ -101,14 +101,14 @@ func (w *worker) resolveTable(ctx context.Context, table *schema.Table, client s if err := recover(); err != nil { stack := fmt.Sprintf("%s\n%s", err, string(debug.Stack())) logger.Error().Interface("error", err).Str("stack", stack).Msg("table resolver finished with panic") - tableMetrics.OtelPanicsAdd(ctx, 1) + w.metrics.OtelPanicsAdd(ctx, 1, tableMetrics) atomic.AddUint64(&tableMetrics.Panics, 1) } close(res) }() if err := table.Resolver(ctx, client, parent, res); err != nil { logger.Error().Err(err).Msg("table resolver finished with error") - tableMetrics.OtelErrorsAdd(ctx, 1) + w.metrics.OtelErrorsAdd(ctx, 1, tableMetrics) atomic.AddUint64(&tableMetrics.Errors, 1) // Send SyncError message syncErrorMsg := &message.SyncError{ @@ -127,7 +127,7 @@ func (w *worker) resolveTable(ctx context.Context, table *schema.Table, client s endTime := time.Now() duration := endTime.Sub(startTime) tableMetrics.Duration.Store(&duration) - tableMetrics.OtelEndTime(ctx, endTime) + w.metrics.OtelEndTime(ctx, endTime, tableMetrics) if parent == nil { logger.Info().Uint64("resources", tableMetrics.Resources).Uint64("errors", tableMetrics.Errors).Dur("duration_ms", duration).Msg("table sync finished") } diff --git a/scheduler/resolvers/resolvers.go b/scheduler/resolvers/resolvers.go index 1b7d977733..935fca7721 100644 --- a/scheduler/resolvers/resolvers.go +++ b/scheduler/resolvers/resolvers.go @@ -76,7 +76,7 @@ func ResolveSingleResource(ctx context.Context, logger zerolog.Logger, m *metric } } - tableMetrics.OtelResourcesAdd(ctx, 1) + m.OtelResourcesAdd(ctx, 1, tableMetrics) atomic.AddUint64(&tableMetrics.Resources, 1) return resource } diff --git a/scheduler/scheduler.go b/scheduler/scheduler.go index 7b0107dc44..c98e06c024 100644 --- a/scheduler/scheduler.go +++ b/scheduler/scheduler.go @@ -209,7 +209,7 @@ func (s *Scheduler) Sync(ctx context.Context, client schema.ClientMeta, tables s } syncClient := &syncClient{ - metrics: &metrics.Metrics{TableClient: make(map[string]map[string]*metrics.TableClientMetrics)}, + metrics: metrics.NewMetrics(), tables: tables, client: client, scheduler: s, diff --git a/scheduler/scheduler_dfs.go b/scheduler/scheduler_dfs.go index 592269d5c0..65ed9835e4 100644 --- a/scheduler/scheduler_dfs.go +++ b/scheduler/scheduler_dfs.go @@ -103,7 +103,7 @@ func (s *syncClient) resolveTableDfs(ctx context.Context, table *schema.Table, c attribute.Key("sync.panics").Int64(int64(atomic.LoadUint64(&tableMetrics.Panics))), )) }() - tableMetrics.OtelStartTime(ctx, startTime) + s.metrics.OtelStartTime(ctx, startTime, tableMetrics) res := make(chan any) go func() { @@ -111,14 +111,14 @@ func (s *syncClient) resolveTableDfs(ctx context.Context, table *schema.Table, c if err := recover(); err != nil { stack := fmt.Sprintf("%s\n%s", err, string(debug.Stack())) logger.Error().Interface("error", err).Str("stack", stack).Msg("table resolver finished with panic") - tableMetrics.OtelPanicsAdd(ctx, 1) + s.metrics.OtelPanicsAdd(ctx, 1, tableMetrics) atomic.AddUint64(&tableMetrics.Panics, 1) } close(res) }() if err := table.Resolver(ctx, client, parent, res); err != nil { logger.Error().Err(err).Msg("table resolver finished with error") - tableMetrics.OtelErrorsAdd(ctx, 1) + s.metrics.OtelErrorsAdd(ctx, 1, tableMetrics) atomic.AddUint64(&tableMetrics.Errors, 1) // Send SyncError message syncErrorMsg := &message.SyncError{ @@ -142,7 +142,7 @@ func (s *syncClient) resolveTableDfs(ctx context.Context, table *schema.Table, c endTime := time.Now() duration := endTime.Sub(startTime) tableMetrics.Duration.Store(&duration) - tableMetrics.OtelEndTime(ctx, endTime) + s.metrics.OtelEndTime(ctx, endTime, tableMetrics) if parent == nil { // Log only for root tables and relations only after resolving is done, otherwise we spam per object instead of per table. logger.Info().Uint64("resources", tableMetrics.Resources).Uint64("errors", tableMetrics.Errors).Dur("duration_ms", duration).Msg("table sync finished") s.logTablesMetrics(table.Relations, client) diff --git a/serve/opentelemetry.go b/serve/opentelemetry.go index 6c29493ebc..680aeaa2eb 100644 --- a/serve/opentelemetry.go +++ b/serve/opentelemetry.go @@ -4,11 +4,14 @@ import ( "context" "encoding/json" "fmt" + "net/http" "reflect" "time" "github.com/cloudquery/plugin-sdk/v4/plugin" + "github.com/prometheus/client_golang/prometheus/promhttp" "github.com/rs/zerolog" + "go.opentelemetry.io/otel/exporters/prometheus" "go.opentelemetry.io/otel" "go.opentelemetry.io/otel/attribute" @@ -120,6 +123,11 @@ func getLogsProcessor(ctx context.Context, opts otelConfig) (*log.BatchProcessor } func setupOtel(ctx context.Context, logger zerolog.Logger, p *plugin.Plugin, otelEndpoint string, otelEndpointInsecure bool) (shutdown func(), err error) { + exporter, err := prometheus.New() + if err != nil { + return nil, err + } + if otelEndpoint == "" { return nil, nil } @@ -150,6 +158,7 @@ func setupOtel(ctx context.Context, logger zerolog.Logger, p *plugin.Plugin, ote mt := metric.NewMeterProvider( metric.WithReader(metricReader), + metric.WithReader(exporter), metric.WithResource(pluginResource), ) @@ -165,6 +174,15 @@ func setupOtel(ctx context.Context, logger zerolog.Logger, p *plugin.Plugin, ote otel.SetMeterProvider(mt) logglobal.SetLoggerProvider(lp) + go func() { + logger.Info().Msg("serving metrics at localhost:9121/metrics") + http.Handle("/metrics", promhttp.Handler()) + if err := http.ListenAndServe(":9121", nil); err != nil { + fmt.Printf("error serving http: %v", err) + return + } + }() + shutdown = func() { if err := tp.Shutdown(context.Background()); err != nil { logger.Error().Err(err).Msg("failed to shutdown OTLP trace provider") From a1a5179bba7e53ab964644c015a33f38f82025ac Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Muraru=20=C8=98tefan?= Date: Fri, 6 Jun 2025 15:53:13 +0300 Subject: [PATCH 2/9] fix: Compute counters per-table and not per table per client --- scheduler/metrics/metrics.go | 183 +++++++++++++-------------- scheduler/metrics/metrics_test.go | 24 ++-- scheduler/metrics/selector.go | 10 ++ scheduler/queue/scheduler_test.go | 8 +- scheduler/queue/worker.go | 37 +++--- scheduler/resolvers/resolvers.go | 26 ++-- scheduler/scheduler.go | 11 +- scheduler/scheduler_debug.go | 2 +- scheduler/scheduler_dfs.go | 37 +++--- scheduler/scheduler_round_robin.go | 2 +- scheduler/scheduler_shuffle.go | 2 +- scheduler/scheduler_shuffle_queue.go | 2 +- serve/opentelemetry.go | 7 +- 13 files changed, 179 insertions(+), 172 deletions(-) create mode 100644 scheduler/metrics/selector.go diff --git a/scheduler/metrics/metrics.go b/scheduler/metrics/metrics.go index b5f940fe0c..5c15102dda 100644 --- a/scheduler/metrics/metrics.go +++ b/scheduler/metrics/metrics.go @@ -13,11 +13,16 @@ import ( ) const ( - OtelName = "io.cloudquery" + ResourceName = "io.cloudquery" + + resourcesMetricName = "sync.table.resources" + errorsMetricName = "sync.table.errors" + panicsMetricName = "sync.table.panics" + durationMetricName = "sync.table.duration" ) -func NewMetrics() *Metrics { - resources, err := otel.Meter(OtelName).Int64Counter("sync.table.resources", +func NewMetrics(invocationID string) *Metrics { + resources, err := otel.Meter(ResourceName).Int64Counter(resourcesMetricName, metric.WithDescription("Number of resources synced for a table"), metric.WithUnit("/{tot}"), ) @@ -25,7 +30,7 @@ func NewMetrics() *Metrics { return nil } - errors, err := otel.Meter(OtelName).Int64Counter("sync.table.errors", + errors, err := otel.Meter(ResourceName).Int64Counter(errorsMetricName, metric.WithDescription("Number of errors encountered while syncing a table"), metric.WithUnit("/{tot}"), ) @@ -33,7 +38,7 @@ func NewMetrics() *Metrics { return nil } - panics, err := otel.Meter(OtelName).Int64Counter("sync.table.panics", + panics, err := otel.Meter(ResourceName).Int64Counter(panicsMetricName, metric.WithDescription("Number of panics encountered while syncing a table"), metric.WithUnit("/{tot}"), ) @@ -41,61 +46,46 @@ func NewMetrics() *Metrics { return nil } - startTime, err := otel.Meter(OtelName).Int64Counter("sync.table.start_time", - metric.WithDescription("Start time of syncing a table"), + duration, err := otel.Meter(ResourceName).Int64Counter(durationMetricName, + metric.WithDescription("Duration of syncing a table"), metric.WithUnit("ns"), ) if err != nil { return nil } - endTime, err := otel.Meter(OtelName).Int64Counter("sync.table.end_time", - metric.WithDescription("End time of syncing a table"), - metric.WithUnit("ns"), - ) + return &Metrics{ + invocationID: invocationID, - if err != nil { - return nil - } + resources: resources, + errors: errors, + panics: panics, + duration: duration, - return &Metrics{ - TableClient: make(map[string]map[string]*TableClientMetrics), - resources: resources, - errors: errors, - panics: panics, - startTime: startTime, - endTime: endTime, + TableClient: make(map[string]map[string]*tableClientMetrics), } } -// Metrics is deprecated as we move toward open telemetry for tracing and metrics type Metrics struct { + invocationID string + resources metric.Int64Counter errors metric.Int64Counter panics metric.Int64Counter + duration metric.Int64Counter - startTime metric.Int64Counter - started bool - startedLock sync.Mutex - - endTime metric.Int64Counter - previousEndTime int64 - previousEndTimeLock sync.Mutex - - TableClient map[string]map[string]*TableClientMetrics -} - -type OtelMeters struct { - attributes []attribute.KeyValue + TableClient map[string]map[string]*tableClientMetrics } -type TableClientMetrics struct { - Resources uint64 - Errors uint64 - Panics uint64 - Duration atomic.Pointer[time.Duration] +type tableClientMetrics struct { + resources uint64 + errors uint64 + panics uint64 + duration atomic.Pointer[time.Duration] - otelMeters *OtelMeters + startTime time.Time + started bool + startedLock sync.Mutex } func durationPointerEqual(a, b *time.Duration) bool { @@ -105,8 +95,8 @@ func durationPointerEqual(a, b *time.Duration) bool { return b != nil && *a == *b } -func (m *TableClientMetrics) Equal(other *TableClientMetrics) bool { - return m.Resources == other.Resources && m.Errors == other.Errors && m.Panics == other.Panics && durationPointerEqual(m.Duration.Load(), other.Duration.Load()) +func (m *tableClientMetrics) Equal(other *tableClientMetrics) bool { + return m.resources == other.resources && m.errors == other.errors && m.panics == other.panics && durationPointerEqual(m.duration.Load(), other.duration.Load()) } // Equal compares to stats. Mostly useful in testing @@ -140,24 +130,24 @@ func (m *Metrics) Equal(other *Metrics) bool { return true } -func GetOtelAttributeSet(tableName string, clientID string) []attribute.KeyValue { - return []attribute.KeyValue{ - attribute.Key("sync.client.id").String(clientID), - attribute.Key("sync.table.name").String(tableName), +func (m *Metrics) NewSelector(clientID, tableName string) Selector { + return Selector{ + Set: attribute.NewSet( + attribute.Key("sync.invocation.id").String(m.invocationID), + attribute.Key("sync.table.name").String(tableName), + ), + clientID: clientID, + tableName: tableName, } } -func (m *Metrics) InitWithClients(table *schema.Table, clients []schema.ClientMeta) { - m.TableClient[table.Name] = make(map[string]*TableClientMetrics, len(clients)) +func (m *Metrics) InitWithClients(invocationID string, table *schema.Table, clients []schema.ClientMeta) { + m.TableClient[table.Name] = make(map[string]*tableClientMetrics, len(clients)) for _, client := range clients { - tableName := table.Name - clientID := client.ID() - m.TableClient[tableName][clientID] = &TableClientMetrics{ - otelMeters: &OtelMeters{attributes: GetOtelAttributeSet(tableName, clientID)}, - } + m.TableClient[table.Name][client.ID()] = &tableClientMetrics{} } for _, relation := range table.Relations { - m.InitWithClients(relation, clients) + m.InitWithClients(invocationID, relation, clients) } } @@ -165,7 +155,7 @@ func (m *Metrics) TotalErrors() uint64 { var total uint64 for _, clientMetrics := range m.TableClient { for _, metrics := range clientMetrics { - total += metrics.Errors + total += metrics.errors } } return total @@ -175,7 +165,7 @@ func (m *Metrics) TotalErrorsAtomic() uint64 { var total uint64 for _, clientMetrics := range m.TableClient { for _, metrics := range clientMetrics { - total += atomic.LoadUint64(&metrics.Errors) + total += atomic.LoadUint64(&metrics.errors) } } return total @@ -185,7 +175,7 @@ func (m *Metrics) TotalPanics() uint64 { var total uint64 for _, clientMetrics := range m.TableClient { for _, metrics := range clientMetrics { - total += metrics.Panics + total += metrics.panics } } return total @@ -195,7 +185,7 @@ func (m *Metrics) TotalPanicsAtomic() uint64 { var total uint64 for _, clientMetrics := range m.TableClient { for _, metrics := range clientMetrics { - total += atomic.LoadUint64(&metrics.Panics) + total += atomic.LoadUint64(&metrics.panics) } } return total @@ -205,7 +195,7 @@ func (m *Metrics) TotalResources() uint64 { var total uint64 for _, clientMetrics := range m.TableClient { for _, metrics := range clientMetrics { - total += metrics.Resources + total += metrics.resources } } return total @@ -215,57 +205,64 @@ func (m *Metrics) TotalResourcesAtomic() uint64 { var total uint64 for _, clientMetrics := range m.TableClient { for _, metrics := range clientMetrics { - total += atomic.LoadUint64(&metrics.Resources) + total += atomic.LoadUint64(&metrics.resources) } } return total } -func (m *Metrics) OtelResourcesAdd(ctx context.Context, count int64, tc *TableClientMetrics) { - m.resources.Add(ctx, count, metric.WithAttributes(tc.otelMeters.attributes...)) - atomic.AddUint64(&tc.Resources, uint64(count)) +func (m *Metrics) ResourcesAdd(ctx context.Context, count int64, selector Selector) { + m.resources.Add(ctx, count, metric.WithAttributeSet(selector.Set)) + atomic.AddUint64(&m.TableClient[selector.tableName][selector.clientID].resources, uint64(count)) } -func (m *Metrics) OtelErrorsAdd(ctx context.Context, count int64, tc *TableClientMetrics) { - m.errors.Add(ctx, count, metric.WithAttributes(tc.otelMeters.attributes...)) - atomic.AddUint64(&tc.Errors, uint64(count)) +func (m *Metrics) ResourcesGet(selector Selector) uint64 { + return atomic.LoadUint64(&m.TableClient[selector.tableName][selector.clientID].resources) } -func (m *Metrics) OtelPanicsAdd(ctx context.Context, count int64, tc *TableClientMetrics) { - m.panics.Add(ctx, count, metric.WithAttributes(tc.otelMeters.attributes...)) - atomic.AddUint64(&tc.Panics, uint64(count)) +func (m *Metrics) ErrorsAdd(ctx context.Context, count int64, selector Selector) { + m.errors.Add(ctx, count, metric.WithAttributeSet(selector.Set)) + atomic.AddUint64(&m.TableClient[selector.tableName][selector.clientID].errors, uint64(count)) } -func (m *Metrics) OtelStartTime(ctx context.Context, start time.Time, tc *TableClientMetrics) { - if m.startTime == nil { - return - } +func (m *Metrics) ErrorsGet(selector Selector) uint64 { + return atomic.LoadUint64(&m.TableClient[selector.tableName][selector.clientID].errors) +} - // If we have already started, don't start again. This can happen for relational tables that are resolved multiple times (per parent resource) - m.startedLock.Lock() - defer m.startedLock.Unlock() - if m.started { - return - } +func (m *Metrics) PanicsAdd(ctx context.Context, count int64, selector Selector) { + m.panics.Add(ctx, count, metric.WithAttributeSet(selector.Set)) + atomic.AddUint64(&m.TableClient[selector.tableName][selector.clientID].panics, uint64(count)) +} - m.started = true - m.startTime.Add(ctx, start.UnixNano(), metric.WithAttributes(tc.otelMeters.attributes...)) +func (m *Metrics) PanicsGet(selector Selector) uint64 { + return atomic.LoadUint64(&m.TableClient[selector.tableName][selector.clientID].panics) } -func (m *Metrics) OtelEndTime(ctx context.Context, end time.Time, tc *TableClientMetrics) { - if m.endTime == nil { +func (m *Metrics) StartTime(start time.Time, selector Selector) { + tc := m.TableClient[selector.tableName][selector.clientID] + + // If we have already started, don't start again. This can happen for relational tables that are resolved multiple times (per parent resource) + tc.startedLock.Lock() + defer tc.startedLock.Unlock() + if tc.started { return } - m.previousEndTimeLock.Lock() - defer m.previousEndTimeLock.Unlock() - val := end.UnixNano() + tc.started = true + tc.startTime = start +} - // If we got another end time to report, use the latest value. This can happen for relational tables that are resolved multiple times (per parent resource) - if m.previousEndTime != 0 { - m.endTime.Add(ctx, val-m.previousEndTime, metric.WithAttributes(tc.otelMeters.attributes...)) - } else { - m.endTime.Add(ctx, val, metric.WithAttributes(tc.otelMeters.attributes...)) +func (m *Metrics) EndTime(ctx context.Context, end time.Time, selector Selector) { + tc := m.TableClient[selector.tableName][selector.clientID] + duration := time.Duration(end.UnixNano() - tc.startTime.UnixNano()) + tc.duration.Store(&duration) + m.duration.Add(ctx, duration.Nanoseconds(), metric.WithAttributeSet(selector.Set)) +} + +func (m *Metrics) DurationGet(selector Selector) *time.Duration { + tc := m.TableClient[selector.tableName][selector.clientID] + if tc == nil { + return nil } - m.previousEndTime = val + return tc.duration.Load() } diff --git a/scheduler/metrics/metrics_test.go b/scheduler/metrics/metrics_test.go index 33a8cdd4e4..18ee454c66 100644 --- a/scheduler/metrics/metrics_test.go +++ b/scheduler/metrics/metrics_test.go @@ -3,12 +3,12 @@ package metrics import "testing" func TestMetrics(t *testing.T) { - s := NewMetrics() - s.TableClient["test_table"] = make(map[string]*TableClientMetrics) - s.TableClient["test_table"]["testExecutionClient"] = &TableClientMetrics{ - Resources: 1, - Errors: 2, - Panics: 3, + s := NewMetrics("test_invocation_id") + s.TableClient["test_table"] = make(map[string]*tableClientMetrics) + s.TableClient["test_table"]["testExecutionClient"] = &tableClientMetrics{ + resources: 1, + errors: 2, + panics: 3, } if s.TotalResources() != 1 { t.Fatal("expected 1 resource") @@ -20,12 +20,12 @@ func TestMetrics(t *testing.T) { t.Fatal("expected 3 panics") } - other := NewMetrics() - other.TableClient["test_table"] = make(map[string]*TableClientMetrics) - other.TableClient["test_table"]["testExecutionClient"] = &TableClientMetrics{ - Resources: 1, - Errors: 2, - Panics: 3, + other := NewMetrics("test_invocation_id") + other.TableClient["test_table"] = make(map[string]*tableClientMetrics) + other.TableClient["test_table"]["testExecutionClient"] = &tableClientMetrics{ + resources: 1, + errors: 2, + panics: 3, } if !s.Equal(other) { t.Fatal("expected metrics to be equal") diff --git a/scheduler/metrics/selector.go b/scheduler/metrics/selector.go new file mode 100644 index 0000000000..fa3db73115 --- /dev/null +++ b/scheduler/metrics/selector.go @@ -0,0 +1,10 @@ +package metrics + +import "go.opentelemetry.io/otel/attribute" + +type Selector struct { + attribute.Set + + clientID string + tableName string +} diff --git a/scheduler/queue/scheduler_test.go b/scheduler/queue/scheduler_test.go index ae63fb2ff5..792b39b01e 100644 --- a/scheduler/queue/scheduler_test.go +++ b/scheduler/queue/scheduler_test.go @@ -9,6 +9,7 @@ import ( "github.com/cloudquery/plugin-sdk/v4/scheduler/metrics" "github.com/cloudquery/plugin-sdk/v4/schema" "github.com/cloudquery/plugin-sdk/v4/transformers" + "github.com/google/uuid" "github.com/rs/zerolog" "github.com/stretchr/testify/require" ) @@ -44,8 +45,9 @@ func testResolver(_ context.Context, _ schema.ClientMeta, parent *schema.Resourc func TestScheduler(t *testing.T) { nopLogger := zerolog.Nop() - m := metrics.NewMetrics() - scheduler := NewShuffleQueueScheduler(nopLogger, m, int64(0), WithWorkerCount(1000)) + invocationID := uuid.New().String() + m := metrics.NewMetrics(invocationID) + scheduler := NewShuffleQueueScheduler(nopLogger, m, int64(0), WithWorkerCount(1000), WithInvocationID(invocationID)) tableClients := []WorkUnit{ { Table: &schema.Table{ @@ -80,7 +82,7 @@ func TestScheduler(t *testing.T) { } for _, tc := range tableClients { - m.InitWithClients(tc.Table, []schema.ClientMeta{tc.Client}) + m.InitWithClients(scheduler.invocationID, tc.Table, []schema.ClientMeta{tc.Client}) } resolvedResources := make(chan *schema.Resource) diff --git a/scheduler/queue/worker.go b/scheduler/queue/worker.go index cf0fa88a02..8a9df3a79e 100644 --- a/scheduler/queue/worker.go +++ b/scheduler/queue/worker.go @@ -5,7 +5,6 @@ import ( "fmt" "runtime/debug" "sync" - "sync/atomic" "time" "github.com/cloudquery/plugin-sdk/v4/caser" @@ -71,7 +70,7 @@ func newWorker( func (w *worker) resolveTable(ctx context.Context, table *schema.Table, client schema.ClientMeta, parent *schema.Resource) { clientName := client.ID() - ctx, span := otel.Tracer(metrics.OtelName).Start(ctx, + ctx, span := otel.Tracer(metrics.ResourceName).Start(ctx, "sync.table."+table.Name, trace.WithAttributes( attribute.Key("sync.client.id").String(clientName), @@ -85,15 +84,16 @@ func (w *worker) resolveTable(ctx context.Context, table *schema.Table, client s if parent == nil { // Log only for root tables, otherwise we spam too much. logger.Info().Msg("top level table resolver started") } - tableMetrics := w.metrics.TableClient[table.Name][clientName] + + selector := w.metrics.NewSelector(clientName, table.Name) defer func() { span.AddEvent("sync.finish.stats", trace.WithAttributes( - attribute.Key("sync.resources").Int64(int64(atomic.LoadUint64(&tableMetrics.Resources))), - attribute.Key("sync.errors").Int64(int64(atomic.LoadUint64(&tableMetrics.Errors))), - attribute.Key("sync.panics").Int64(int64(atomic.LoadUint64(&tableMetrics.Panics))), + attribute.Key("sync.resources").Int64(int64(w.metrics.ResourcesGet(selector))), + attribute.Key("sync.errors").Int64(int64(w.metrics.ErrorsGet(selector))), + attribute.Key("sync.panics").Int64(int64(w.metrics.PanicsGet(selector))), )) }() - w.metrics.OtelStartTime(ctx, startTime, tableMetrics) + w.metrics.StartTime(startTime, selector) res := make(chan any) go func() { @@ -101,15 +101,13 @@ func (w *worker) resolveTable(ctx context.Context, table *schema.Table, client s if err := recover(); err != nil { stack := fmt.Sprintf("%s\n%s", err, string(debug.Stack())) logger.Error().Interface("error", err).Str("stack", stack).Msg("table resolver finished with panic") - w.metrics.OtelPanicsAdd(ctx, 1, tableMetrics) - atomic.AddUint64(&tableMetrics.Panics, 1) + w.metrics.PanicsAdd(ctx, 1, selector) } close(res) }() if err := table.Resolver(ctx, client, parent, res); err != nil { logger.Error().Err(err).Msg("table resolver finished with error") - w.metrics.OtelErrorsAdd(ctx, 1, tableMetrics) - atomic.AddUint64(&tableMetrics.Errors, 1) + w.metrics.ErrorsAdd(ctx, 1, selector) // Send SyncError message syncErrorMsg := &message.SyncError{ TableName: table.Name, @@ -125,11 +123,13 @@ func (w *worker) resolveTable(ctx context.Context, table *schema.Table, client s } endTime := time.Now() - duration := endTime.Sub(startTime) - tableMetrics.Duration.Store(&duration) - w.metrics.OtelEndTime(ctx, endTime, tableMetrics) + w.metrics.EndTime(ctx, endTime, selector) + duration := w.metrics.DurationGet(selector) + if duration == nil { + duration = new(time.Duration) + } if parent == nil { - logger.Info().Uint64("resources", tableMetrics.Resources).Uint64("errors", tableMetrics.Errors).Dur("duration_ms", duration).Msg("table sync finished") + logger.Info().Uint64("resources", w.metrics.ResourcesGet(selector)).Uint64("errors", w.metrics.ErrorsGet(selector)).Dur("duration_ms", *duration).Msg("table sync finished") } } @@ -139,6 +139,7 @@ func (w *worker) resolveResource(ctx context.Context, table *schema.Table, clien return } + selector := w.metrics.NewSelector(client.ID(), table.Name) resourcesChan := make(chan *schema.Resource, len(resourcesSlice)) go func() { defer close(resourcesChan) @@ -154,9 +155,8 @@ func (w *worker) resolveResource(ctx context.Context, table *schema.Table, clien } if err := resolvedResource.CalculateCQID(w.deterministicCQID); err != nil { - tableMetrics := w.metrics.TableClient[table.Name][client.ID()] w.logger.Error().Err(err).Str("table", table.Name).Str("client", client.ID()).Msg("resource resolver finished with primary key calculation error") - atomic.AddUint64(&tableMetrics.Errors, 1) + w.metrics.ErrorsAdd(ctx, 1, selector) return } if err := resolvedResource.StoreCQClientID(client.ID()); err != nil { @@ -165,9 +165,8 @@ func (w *worker) resolveResource(ctx context.Context, table *schema.Table, clien if err := resolvedResource.Validate(); err != nil { switch err.(type) { case *schema.PKError: - tableMetrics := w.metrics.TableClient[table.Name][client.ID()] w.logger.Error().Err(err).Str("table", table.Name).Str("client", client.ID()).Msg("resource resolver finished with validation error") - atomic.AddUint64(&tableMetrics.Errors, 1) + w.metrics.ErrorsAdd(ctx, 1, selector) return case *schema.PKComponentError: w.logger.Warn().Err(err).Str("table", table.Name).Str("client", client.ID()).Msg("resource resolver finished with validation warning") diff --git a/scheduler/resolvers/resolvers.go b/scheduler/resolvers/resolvers.go index 935fca7721..a544c7e432 100644 --- a/scheduler/resolvers/resolvers.go +++ b/scheduler/resolvers/resolvers.go @@ -4,7 +4,6 @@ import ( "context" "fmt" "runtime/debug" - "sync/atomic" "time" "github.com/cloudquery/plugin-sdk/v4/caser" @@ -14,20 +13,20 @@ import ( "github.com/thoas/go-funk" ) -func resolveColumn(ctx context.Context, logger zerolog.Logger, tableMetrics *metrics.TableClientMetrics, client schema.ClientMeta, resource *schema.Resource, column schema.Column, c *caser.Caser) { +func resolveColumn(ctx context.Context, logger zerolog.Logger, m *metrics.Metrics, selector metrics.Selector, client schema.ClientMeta, resource *schema.Resource, column schema.Column, c *caser.Caser) { columnStartTime := time.Now() defer func() { if err := recover(); err != nil { stack := fmt.Sprintf("%s\n%s", err, string(debug.Stack())) logger.Error().Str("column", column.Name).Interface("error", err).TimeDiff("duration", time.Now(), columnStartTime).Str("stack", stack).Msg("column resolver finished with panic") - atomic.AddUint64(&tableMetrics.Panics, 1) + m.PanicsAdd(ctx, 1, selector) } }() if column.Resolver != nil { if err := column.Resolver(ctx, client, resource, column); err != nil { logger.Error().Err(err).Msg("column resolver finished with error") - atomic.AddUint64(&tableMetrics.Errors, 1) + m.ErrorsAdd(ctx, 1, selector) } } else { // base use case: try to get column with CamelCase name @@ -36,7 +35,7 @@ func resolveColumn(ctx context.Context, logger zerolog.Logger, tableMetrics *met err := resource.Set(column.Name, v) if err != nil { logger.Error().Err(err).Msg("column resolver finished with error") - atomic.AddUint64(&tableMetrics.Errors, 1) + m.ErrorsAdd(ctx, 1, selector) } } } @@ -45,38 +44,41 @@ func resolveColumn(ctx context.Context, logger zerolog.Logger, tableMetrics *met func ResolveSingleResource(ctx context.Context, logger zerolog.Logger, m *metrics.Metrics, table *schema.Table, client schema.ClientMeta, parent *schema.Resource, item any, c *caser.Caser) *schema.Resource { ctx, cancel := context.WithTimeout(ctx, 10*time.Minute) defer cancel() + resource := schema.NewResourceData(table, parent, item) objectStartTime := time.Now() + clientID := client.ID() - tableMetrics := m.TableClient[table.Name][clientID] tableLogger := logger.With().Str("table", table.Name).Str("client", clientID).Logger() + + selector := m.NewSelector(clientID, table.Name) + defer func() { if err := recover(); err != nil { stack := fmt.Sprintf("%s\n%s", err, string(debug.Stack())) tableLogger.Error().Interface("error", err).TimeDiff("duration", time.Now(), objectStartTime).Str("stack", stack).Msg("resource resolver finished with panic") - atomic.AddUint64(&tableMetrics.Panics, 1) + m.PanicsAdd(ctx, 1, selector) } }() if table.PreResourceResolver != nil { if err := table.PreResourceResolver(ctx, client, resource); err != nil { tableLogger.Error().Err(err).Msg("pre resource resolver failed") - atomic.AddUint64(&tableMetrics.Errors, 1) + m.ErrorsAdd(ctx, 1, selector) return nil } } for _, column := range table.Columns { - resolveColumn(ctx, tableLogger, tableMetrics, client, resource, column, c) + resolveColumn(ctx, tableLogger, m, selector, client, resource, column, c) } if table.PostResourceResolver != nil { if err := table.PostResourceResolver(ctx, client, resource); err != nil { tableLogger.Error().Stack().Err(err).Msg("post resource resolver finished with error") - atomic.AddUint64(&tableMetrics.Errors, 1) + m.ErrorsAdd(ctx, 1, selector) } } - m.OtelResourcesAdd(ctx, 1, tableMetrics) - atomic.AddUint64(&tableMetrics.Resources, 1) + m.ResourcesAdd(ctx, 1, selector) return resource } diff --git a/scheduler/scheduler.go b/scheduler/scheduler.go index c98e06c024..9154fa17b5 100644 --- a/scheduler/scheduler.go +++ b/scheduler/scheduler.go @@ -199,7 +199,7 @@ func (s *Scheduler) SyncAll(ctx context.Context, client schema.ClientMeta, table } func (s *Scheduler) Sync(ctx context.Context, client schema.ClientMeta, tables schema.Tables, res chan<- message.SyncMessage, opts ...SyncOption) error { - ctx, span := otel.Tracer(metrics.OtelName).Start(ctx, + ctx, span := otel.Tracer(metrics.ResourceName).Start(ctx, "sync", trace.WithAttributes(attribute.Key("sync.invocation.id").String(s.invocationID)), ) @@ -209,7 +209,7 @@ func (s *Scheduler) Sync(ctx context.Context, client schema.ClientMeta, tables s } syncClient := &syncClient{ - metrics: metrics.NewMetrics(), + metrics: metrics.NewMetrics(s.invocationID), tables: tables, client: client, scheduler: s, @@ -279,13 +279,14 @@ func (s *Scheduler) Sync(ctx context.Context, client schema.ClientMeta, tables s func (s *syncClient) logTablesMetrics(tables schema.Tables, client Client) { clientName := client.ID() for _, table := range tables { - m := s.metrics.TableClient[table.Name][clientName] - duration := m.Duration.Load() + selector := s.metrics.NewSelector(clientName, table.Name) + duration := s.metrics.DurationGet(selector) if duration == nil { // This can happen for a relation when there are no resources to resolve from the parent duration = new(time.Duration) } - s.logger.Info().Str("table", table.Name).Str("client", clientName).Uint64("resources", m.Resources).Dur("duration_ms", *duration).Uint64("errors", m.Errors).Msg("table sync finished") + + s.logger.Info().Str("table", table.Name).Str("client", clientName).Uint64("resources", s.metrics.ResourcesGet(selector)).Dur("duration_ms", *duration).Uint64("errors", s.metrics.ErrorsGet(selector)).Msg("table sync finished") s.logTablesMetrics(table.Relations, client) } } diff --git a/scheduler/scheduler_debug.go b/scheduler/scheduler_debug.go index 93639d936a..3b122b4037 100644 --- a/scheduler/scheduler_debug.go +++ b/scheduler/scheduler_debug.go @@ -50,7 +50,7 @@ func (s *syncClient) syncTest(ctx context.Context, syncMultiplier int, resolvedR preInitialisedClients[i] = clients // we do this here to avoid locks so we initialize the metrics structure once in the main goroutine // and then we can just read from it in the other goroutines concurrently given we are not writing to it. - s.metrics.InitWithClients(table, clients) + s.metrics.InitWithClients(s.invocationID, table, clients) } // First interleave the tables like in round-robin diff --git a/scheduler/scheduler_dfs.go b/scheduler/scheduler_dfs.go index 65ed9835e4..0b5cb3ddd7 100644 --- a/scheduler/scheduler_dfs.go +++ b/scheduler/scheduler_dfs.go @@ -5,7 +5,6 @@ import ( "fmt" "runtime/debug" "sync" - "sync/atomic" "time" "github.com/cloudquery/plugin-sdk/v4/helpers" @@ -41,7 +40,7 @@ func (s *syncClient) syncDfs(ctx context.Context, resolvedResources chan<- *sche preInitialisedClients[i] = clients // we do this here to avoid locks so we initial the metrics structure once in the main goroutines // and then we can just read from it in the other goroutines concurrently given we are not writing to it. - s.metrics.InitWithClients(table, clients) + s.metrics.InitWithClients(s.invocationID, table, clients) } tableClients := make([]tableClient, 0) @@ -79,7 +78,7 @@ func (s *syncClient) syncDfs(ctx context.Context, resolvedResources chan<- *sche func (s *syncClient) resolveTableDfs(ctx context.Context, table *schema.Table, client schema.ClientMeta, parent *schema.Resource, resolvedResources chan<- *schema.Resource, depth int) { clientName := client.ID() - ctx, span := otel.Tracer(metrics.OtelName).Start(ctx, + ctx, span := otel.Tracer(metrics.ResourceName).Start(ctx, "sync.table."+table.Name, trace.WithAttributes( attribute.Key("sync.client.id").String(clientName), @@ -95,15 +94,15 @@ func (s *syncClient) resolveTableDfs(ctx context.Context, table *schema.Table, c logger.Info().Msg("top level table resolver started") } - tableMetrics := s.metrics.TableClient[table.Name][clientName] + selector := s.metrics.NewSelector(clientName, table.Name) defer func() { span.AddEvent("sync.finish.stats", trace.WithAttributes( - attribute.Key("sync.resources").Int64(int64(atomic.LoadUint64(&tableMetrics.Resources))), - attribute.Key("sync.errors").Int64(int64(atomic.LoadUint64(&tableMetrics.Errors))), - attribute.Key("sync.panics").Int64(int64(atomic.LoadUint64(&tableMetrics.Panics))), + attribute.Key("sync.resources").Int64(int64(s.metrics.ResourcesGet(selector))), + attribute.Key("sync.errors").Int64(int64(s.metrics.ErrorsGet(selector))), + attribute.Key("sync.panics").Int64(int64(s.metrics.PanicsGet(selector))), )) }() - s.metrics.OtelStartTime(ctx, startTime, tableMetrics) + s.metrics.StartTime(startTime, selector) res := make(chan any) go func() { @@ -111,15 +110,13 @@ func (s *syncClient) resolveTableDfs(ctx context.Context, table *schema.Table, c if err := recover(); err != nil { stack := fmt.Sprintf("%s\n%s", err, string(debug.Stack())) logger.Error().Interface("error", err).Str("stack", stack).Msg("table resolver finished with panic") - s.metrics.OtelPanicsAdd(ctx, 1, tableMetrics) - atomic.AddUint64(&tableMetrics.Panics, 1) + s.metrics.PanicsAdd(ctx, 1, selector) } close(res) }() if err := table.Resolver(ctx, client, parent, res); err != nil { logger.Error().Err(err).Msg("table resolver finished with error") - s.metrics.OtelErrorsAdd(ctx, 1, tableMetrics) - atomic.AddUint64(&tableMetrics.Errors, 1) + s.metrics.ErrorsAdd(ctx, 1, selector) // Send SyncError message syncErrorMsg := &message.SyncError{ TableName: table.Name, @@ -140,11 +137,10 @@ func (s *syncClient) resolveTableDfs(ctx context.Context, table *schema.Table, c // we don't need any waitgroups here because we are waiting for the channel to close endTime := time.Now() - duration := endTime.Sub(startTime) - tableMetrics.Duration.Store(&duration) - s.metrics.OtelEndTime(ctx, endTime, tableMetrics) + s.metrics.EndTime(ctx, endTime, selector) + duration := s.metrics.DurationGet(selector) if parent == nil { // Log only for root tables and relations only after resolving is done, otherwise we spam per object instead of per table. - logger.Info().Uint64("resources", tableMetrics.Resources).Uint64("errors", tableMetrics.Errors).Dur("duration_ms", duration).Msg("table sync finished") + logger.Info().Uint64("resources", s.metrics.ResourcesGet(selector)).Uint64("errors", s.metrics.ErrorsGet(selector)).Dur("duration_ms", *duration).Msg("table sync finished") s.logTablesMetrics(table.Relations, client) } } @@ -154,6 +150,9 @@ func (s *syncClient) resolveResourcesDfs(ctx context.Context, table *schema.Tabl if len(resourcesSlice) == 0 { return } + + selector := s.metrics.NewSelector(client.ID(), table.Name) + resourcesChan := make(chan *schema.Resource, len(resourcesSlice)) go func() { defer close(resourcesChan) @@ -191,9 +190,8 @@ func (s *syncClient) resolveResourcesDfs(ctx context.Context, table *schema.Tabl } if err := resolvedResource.CalculateCQID(s.deterministicCQID); err != nil { - tableMetrics := s.metrics.TableClient[table.Name][client.ID()] s.logger.Error().Err(err).Str("table", table.Name).Str("client", client.ID()).Msg("resource resolver finished with primary key calculation error") - atomic.AddUint64(&tableMetrics.Errors, 1) + s.metrics.ErrorsAdd(ctx, 1, selector) return } if err := resolvedResource.StoreCQClientID(client.ID()); err != nil { @@ -202,9 +200,8 @@ func (s *syncClient) resolveResourcesDfs(ctx context.Context, table *schema.Tabl if err := resolvedResource.Validate(); err != nil { switch err.(type) { case *schema.PKError: - tableMetrics := s.metrics.TableClient[table.Name][client.ID()] s.logger.Error().Err(err).Str("table", table.Name).Str("client", client.ID()).Msg("resource resolver finished with validation error") - atomic.AddUint64(&tableMetrics.Errors, 1) + s.metrics.ErrorsAdd(ctx, 1, selector) return case *schema.PKComponentError: s.logger.Warn().Err(err).Str("table", table.Name).Str("client", client.ID()).Msg("resource resolver finished with validation warning") diff --git a/scheduler/scheduler_round_robin.go b/scheduler/scheduler_round_robin.go index fa830f29fc..11a71a544c 100644 --- a/scheduler/scheduler_round_robin.go +++ b/scheduler/scheduler_round_robin.go @@ -33,7 +33,7 @@ func (s *syncClient) syncRoundRobin(ctx context.Context, resolvedResources chan< preInitialisedClients[i] = clients // we do this here to avoid locks so we initial the metrics structure once in the main goroutines // and then we can just read from it in the other goroutines concurrently given we are not writing to it. - s.metrics.InitWithClients(table, clients) + s.metrics.InitWithClients(s.invocationID, table, clients) } tableClients := roundRobinInterleave(s.tables, preInitialisedClients) diff --git a/scheduler/scheduler_shuffle.go b/scheduler/scheduler_shuffle.go index ad91fe7532..49f7d3eba0 100644 --- a/scheduler/scheduler_shuffle.go +++ b/scheduler/scheduler_shuffle.go @@ -33,7 +33,7 @@ func (s *syncClient) syncShuffle(ctx context.Context, resolvedResources chan<- * preInitialisedClients[i] = clients // we do this here to avoid locks so we initial the metrics structure once in the main goroutines // and then we can just read from it in the other goroutines concurrently given we are not writing to it. - s.metrics.InitWithClients(table, clients) + s.metrics.InitWithClients(s.invocationID, table, clients) } // First interleave the tables like in round-robin diff --git a/scheduler/scheduler_shuffle_queue.go b/scheduler/scheduler_shuffle_queue.go index 0bc5867cd7..db5f4b5c30 100644 --- a/scheduler/scheduler_shuffle_queue.go +++ b/scheduler/scheduler_shuffle_queue.go @@ -21,7 +21,7 @@ func (s *syncClient) syncShuffleQueue(ctx context.Context, resolvedResources cha preInitialisedClients[i] = clients // we do this here to avoid locks so we initial the metrics structure once in the main goroutines // and then we can just read from it in the other goroutines concurrently given we are not writing to it. - s.metrics.InitWithClients(table, clients) + s.metrics.InitWithClients(s.invocationID, table, clients) } tableClients := roundRobinInterleave(s.tables, preInitialisedClients) diff --git a/serve/opentelemetry.go b/serve/opentelemetry.go index 680aeaa2eb..b65eebe8c1 100644 --- a/serve/opentelemetry.go +++ b/serve/opentelemetry.go @@ -11,14 +11,13 @@ import ( "github.com/cloudquery/plugin-sdk/v4/plugin" "github.com/prometheus/client_golang/prometheus/promhttp" "github.com/rs/zerolog" - "go.opentelemetry.io/otel/exporters/prometheus" - "go.opentelemetry.io/otel" "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploghttp" "go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp" "go.opentelemetry.io/otel/exporters/otlp/otlptrace" "go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp" + "go.opentelemetry.io/otel/exporters/prometheus" otellog "go.opentelemetry.io/otel/log" logglobal "go.opentelemetry.io/otel/log/global" "go.opentelemetry.io/otel/sdk/log" @@ -123,7 +122,7 @@ func getLogsProcessor(ctx context.Context, opts otelConfig) (*log.BatchProcessor } func setupOtel(ctx context.Context, logger zerolog.Logger, p *plugin.Plugin, otelEndpoint string, otelEndpointInsecure bool) (shutdown func(), err error) { - exporter, err := prometheus.New() + prometheusReader, err := prometheus.New() if err != nil { return nil, err } @@ -158,7 +157,7 @@ func setupOtel(ctx context.Context, logger zerolog.Logger, p *plugin.Plugin, ote mt := metric.NewMeterProvider( metric.WithReader(metricReader), - metric.WithReader(exporter), + metric.WithReader(prometheusReader), metric.WithResource(pluginResource), ) From b8496f9dee1c2af15b634a22dc3936867cd6d5b4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Muraru=20=C8=98tefan?= Date: Sun, 8 Jun 2025 20:55:13 +0300 Subject: [PATCH 3/9] fix: working per-table & per-client duration --- scheduler/metrics/duration.go | 38 +++++++ scheduler/metrics/metrics.go | 164 +++++++++++++------------------ scheduler/queue/worker.go | 20 ++-- scheduler/resolvers/resolvers.go | 14 +-- scheduler/scheduler.go | 9 +- scheduler/scheduler_dfs.go | 32 +++--- 6 files changed, 141 insertions(+), 136 deletions(-) create mode 100644 scheduler/metrics/duration.go diff --git a/scheduler/metrics/duration.go b/scheduler/metrics/duration.go new file mode 100644 index 0000000000..71ecd4eec1 --- /dev/null +++ b/scheduler/metrics/duration.go @@ -0,0 +1,38 @@ +package metrics + +import ( + "sync" + "time" +) + +type durationMeasurement struct { + startTime time.Time + started bool + duration time.Duration + sem sync.Mutex +} + +func (dm *durationMeasurement) Start(start time.Time) { + // If we have already started, don't start again. This can happen for relational tables that are resolved multiple times (per parent resource) + dm.sem.Lock() + defer dm.sem.Unlock() + if dm.started { + return + } + + dm.started = true + dm.startTime = start +} + +// End calculates, updates and returns the delta duration for updating OTEL counters. +func (dm *durationMeasurement) End(end time.Time) time.Duration { + var delta time.Duration + newDuration := end.Sub(dm.startTime) + + dm.sem.Lock() + defer dm.sem.Unlock() + + delta = newDuration - dm.duration + dm.duration = newDuration + return delta +} diff --git a/scheduler/metrics/metrics.go b/scheduler/metrics/metrics.go index 5c15102dda..d665f54c36 100644 --- a/scheduler/metrics/metrics.go +++ b/scheduler/metrics/metrics.go @@ -2,7 +2,6 @@ package metrics import ( "context" - "sync" "sync/atomic" "time" @@ -48,7 +47,7 @@ func NewMetrics(invocationID string) *Metrics { duration, err := otel.Meter(ResourceName).Int64Counter(durationMetricName, metric.WithDescription("Duration of syncing a table"), - metric.WithUnit("ns"), + metric.WithUnit("ms"), ) if err != nil { return nil @@ -62,7 +61,7 @@ func NewMetrics(invocationID string) *Metrics { panics: panics, duration: duration, - TableClient: make(map[string]map[string]*tableClientMetrics), + measurements: make(map[string]tableMeasurements), } } @@ -74,55 +73,49 @@ type Metrics struct { panics metric.Int64Counter duration metric.Int64Counter - TableClient map[string]map[string]*tableClientMetrics + measurements map[string]tableMeasurements } -type tableClientMetrics struct { +type tableMeasurements struct { + duration *durationMeasurement + clients map[string]*measurement +} + +type measurement struct { resources uint64 errors uint64 panics uint64 - duration atomic.Pointer[time.Duration] - - startTime time.Time - started bool - startedLock sync.Mutex -} - -func durationPointerEqual(a, b *time.Duration) bool { - if a == nil { - return b == nil - } - return b != nil && *a == *b + duration *durationMeasurement } -func (m *tableClientMetrics) Equal(other *tableClientMetrics) bool { - return m.resources == other.resources && m.errors == other.errors && m.panics == other.panics && durationPointerEqual(m.duration.Load(), other.duration.Load()) +func (m *measurement) Equal(other *measurement) bool { + return m.resources == other.resources && m.errors == other.errors && m.panics == other.panics && m.duration == other.duration } // Equal compares to stats. Mostly useful in testing func (m *Metrics) Equal(other *Metrics) bool { - for table, clientStats := range m.TableClient { - for client, stats := range clientStats { - if _, ok := other.TableClient[table]; !ok { + for table, clientStats := range m.measurements { + for client, stats := range clientStats.clients { + if _, ok := other.measurements[table]; !ok { return false } - if _, ok := other.TableClient[table][client]; !ok { + if _, ok := other.measurements[table].clients[client]; !ok { return false } - if !stats.Equal(other.TableClient[table][client]) { + if !stats.Equal(other.measurements[table].clients[client]) { return false } } } - for table, clientStats := range other.TableClient { - for client, stats := range clientStats { - if _, ok := m.TableClient[table]; !ok { + for table, clientStats := range other.measurements { + for client, stats := range clientStats.clients { + if _, ok := m.measurements[table]; !ok { return false } - if _, ok := m.TableClient[table][client]; !ok { + if _, ok := m.measurements[table].clients[client]; !ok { return false } - if !stats.Equal(m.TableClient[table][client]) { + if !stats.Equal(m.measurements[table].clients[client]) { return false } } @@ -142,9 +135,9 @@ func (m *Metrics) NewSelector(clientID, tableName string) Selector { } func (m *Metrics) InitWithClients(invocationID string, table *schema.Table, clients []schema.ClientMeta) { - m.TableClient[table.Name] = make(map[string]*tableClientMetrics, len(clients)) + m.measurements[table.Name] = tableMeasurements{clients: make(map[string]*measurement), duration: &durationMeasurement{}} for _, client := range clients { - m.TableClient[table.Name][client.ID()] = &tableClientMetrics{} + m.measurements[table.Name].clients[client.ID()] = &measurement{duration: &durationMeasurement{}} } for _, relation := range table.Relations { m.InitWithClients(invocationID, relation, clients) @@ -153,116 +146,101 @@ func (m *Metrics) InitWithClients(invocationID string, table *schema.Table, clie func (m *Metrics) TotalErrors() uint64 { var total uint64 - for _, clientMetrics := range m.TableClient { - for _, metrics := range clientMetrics { - total += metrics.errors + for _, clientMetrics := range m.measurements { + for _, metrics := range clientMetrics.clients { + total += atomic.LoadUint64(&metrics.errors) } } return total } +// Deprecated: Use TotalErrors instead, it provides the same functionality but is more consistent with the naming of other metrics methods. func (m *Metrics) TotalErrorsAtomic() uint64 { - var total uint64 - for _, clientMetrics := range m.TableClient { - for _, metrics := range clientMetrics { - total += atomic.LoadUint64(&metrics.errors) - } - } - return total + return m.TotalErrors() } func (m *Metrics) TotalPanics() uint64 { var total uint64 - for _, clientMetrics := range m.TableClient { - for _, metrics := range clientMetrics { - total += metrics.panics + for _, clientMetrics := range m.measurements { + for _, metrics := range clientMetrics.clients { + total += atomic.LoadUint64(&metrics.panics) } } return total } +// Deprecated: Use TotalPanics instead, it provides the same functionality but is more consistent with the naming of other metrics methods. func (m *Metrics) TotalPanicsAtomic() uint64 { - var total uint64 - for _, clientMetrics := range m.TableClient { - for _, metrics := range clientMetrics { - total += atomic.LoadUint64(&metrics.panics) - } - } - return total + return m.TotalPanics() } func (m *Metrics) TotalResources() uint64 { var total uint64 - for _, clientMetrics := range m.TableClient { - for _, metrics := range clientMetrics { - total += metrics.resources + for _, clientMetrics := range m.measurements { + for _, metrics := range clientMetrics.clients { + total += atomic.LoadUint64(&metrics.resources) } } return total } +// Deprecated: Use TotalResources instead, it provides the same functionality but is more consistent with the naming of other metrics methods. func (m *Metrics) TotalResourcesAtomic() uint64 { - var total uint64 - for _, clientMetrics := range m.TableClient { - for _, metrics := range clientMetrics { - total += atomic.LoadUint64(&metrics.resources) - } - } - return total + return m.TotalResources() } -func (m *Metrics) ResourcesAdd(ctx context.Context, count int64, selector Selector) { +func (m *Metrics) TableDuration(tableName string) time.Duration { + tc := m.measurements[tableName] + return tc.duration.duration +} + +func (m *Metrics) AddResources(ctx context.Context, count int64, selector Selector) { m.resources.Add(ctx, count, metric.WithAttributeSet(selector.Set)) - atomic.AddUint64(&m.TableClient[selector.tableName][selector.clientID].resources, uint64(count)) + atomic.AddUint64(&m.measurements[selector.tableName].clients[selector.clientID].resources, uint64(count)) } -func (m *Metrics) ResourcesGet(selector Selector) uint64 { - return atomic.LoadUint64(&m.TableClient[selector.tableName][selector.clientID].resources) +func (m *Metrics) GetResources(selector Selector) uint64 { + return atomic.LoadUint64(&m.measurements[selector.tableName].clients[selector.clientID].resources) } -func (m *Metrics) ErrorsAdd(ctx context.Context, count int64, selector Selector) { +func (m *Metrics) AddErrors(ctx context.Context, count int64, selector Selector) { m.errors.Add(ctx, count, metric.WithAttributeSet(selector.Set)) - atomic.AddUint64(&m.TableClient[selector.tableName][selector.clientID].errors, uint64(count)) + atomic.AddUint64(&m.measurements[selector.tableName].clients[selector.clientID].errors, uint64(count)) } -func (m *Metrics) ErrorsGet(selector Selector) uint64 { - return atomic.LoadUint64(&m.TableClient[selector.tableName][selector.clientID].errors) +func (m *Metrics) GetErrors(selector Selector) uint64 { + return atomic.LoadUint64(&m.measurements[selector.tableName].clients[selector.clientID].errors) } -func (m *Metrics) PanicsAdd(ctx context.Context, count int64, selector Selector) { +func (m *Metrics) AddPanics(ctx context.Context, count int64, selector Selector) { m.panics.Add(ctx, count, metric.WithAttributeSet(selector.Set)) - atomic.AddUint64(&m.TableClient[selector.tableName][selector.clientID].panics, uint64(count)) + atomic.AddUint64(&m.measurements[selector.tableName].clients[selector.clientID].panics, uint64(count)) } -func (m *Metrics) PanicsGet(selector Selector) uint64 { - return atomic.LoadUint64(&m.TableClient[selector.tableName][selector.clientID].panics) +func (m *Metrics) GetPanics(selector Selector) uint64 { + return atomic.LoadUint64(&m.measurements[selector.tableName].clients[selector.clientID].panics) } func (m *Metrics) StartTime(start time.Time, selector Selector) { - tc := m.TableClient[selector.tableName][selector.clientID] - - // If we have already started, don't start again. This can happen for relational tables that are resolved multiple times (per parent resource) - tc.startedLock.Lock() - defer tc.startedLock.Unlock() - if tc.started { - return - } + t := m.measurements[selector.tableName] + tc := t.clients[selector.clientID] - tc.started = true - tc.startTime = start + tc.duration.Start(start) + t.duration.Start(start) } func (m *Metrics) EndTime(ctx context.Context, end time.Time, selector Selector) { - tc := m.TableClient[selector.tableName][selector.clientID] - duration := time.Duration(end.UnixNano() - tc.startTime.UnixNano()) - tc.duration.Store(&duration) - m.duration.Add(ctx, duration.Nanoseconds(), metric.WithAttributeSet(selector.Set)) + t := m.measurements[selector.tableName] + tc := t.clients[selector.clientID] + + _ = tc.duration.End(end) + delta := t.duration.End(end) + + // only compute and add the total duration for per-table measurements (and not per-client) + m.duration.Add(ctx, delta.Milliseconds(), metric.WithAttributeSet(selector.Set)) } -func (m *Metrics) DurationGet(selector Selector) *time.Duration { - tc := m.TableClient[selector.tableName][selector.clientID] - if tc == nil { - return nil - } - return tc.duration.Load() +func (m *Metrics) GetDuration(selector Selector) time.Duration { + tc := m.measurements[selector.tableName].clients[selector.clientID] + return tc.duration.duration } diff --git a/scheduler/queue/worker.go b/scheduler/queue/worker.go index 8a9df3a79e..afec99ee59 100644 --- a/scheduler/queue/worker.go +++ b/scheduler/queue/worker.go @@ -88,9 +88,9 @@ func (w *worker) resolveTable(ctx context.Context, table *schema.Table, client s selector := w.metrics.NewSelector(clientName, table.Name) defer func() { span.AddEvent("sync.finish.stats", trace.WithAttributes( - attribute.Key("sync.resources").Int64(int64(w.metrics.ResourcesGet(selector))), - attribute.Key("sync.errors").Int64(int64(w.metrics.ErrorsGet(selector))), - attribute.Key("sync.panics").Int64(int64(w.metrics.PanicsGet(selector))), + attribute.Key("sync.resources").Int64(int64(w.metrics.GetResources(selector))), + attribute.Key("sync.errors").Int64(int64(w.metrics.GetErrors(selector))), + attribute.Key("sync.panics").Int64(int64(w.metrics.GetPanics(selector))), )) }() w.metrics.StartTime(startTime, selector) @@ -101,13 +101,13 @@ func (w *worker) resolveTable(ctx context.Context, table *schema.Table, client s if err := recover(); err != nil { stack := fmt.Sprintf("%s\n%s", err, string(debug.Stack())) logger.Error().Interface("error", err).Str("stack", stack).Msg("table resolver finished with panic") - w.metrics.PanicsAdd(ctx, 1, selector) + w.metrics.AddPanics(ctx, 1, selector) } close(res) }() if err := table.Resolver(ctx, client, parent, res); err != nil { logger.Error().Err(err).Msg("table resolver finished with error") - w.metrics.ErrorsAdd(ctx, 1, selector) + w.metrics.AddErrors(ctx, 1, selector) // Send SyncError message syncErrorMsg := &message.SyncError{ TableName: table.Name, @@ -124,12 +124,8 @@ func (w *worker) resolveTable(ctx context.Context, table *schema.Table, client s endTime := time.Now() w.metrics.EndTime(ctx, endTime, selector) - duration := w.metrics.DurationGet(selector) - if duration == nil { - duration = new(time.Duration) - } if parent == nil { - logger.Info().Uint64("resources", w.metrics.ResourcesGet(selector)).Uint64("errors", w.metrics.ErrorsGet(selector)).Dur("duration_ms", *duration).Msg("table sync finished") + logger.Info().Uint64("resources", w.metrics.GetResources(selector)).Uint64("errors", w.metrics.GetErrors(selector)).Dur("duration_ms", w.metrics.GetDuration(selector)).Msg("table sync finished") } } @@ -156,7 +152,7 @@ func (w *worker) resolveResource(ctx context.Context, table *schema.Table, clien if err := resolvedResource.CalculateCQID(w.deterministicCQID); err != nil { w.logger.Error().Err(err).Str("table", table.Name).Str("client", client.ID()).Msg("resource resolver finished with primary key calculation error") - w.metrics.ErrorsAdd(ctx, 1, selector) + w.metrics.AddErrors(ctx, 1, selector) return } if err := resolvedResource.StoreCQClientID(client.ID()); err != nil { @@ -166,7 +162,7 @@ func (w *worker) resolveResource(ctx context.Context, table *schema.Table, clien switch err.(type) { case *schema.PKError: w.logger.Error().Err(err).Str("table", table.Name).Str("client", client.ID()).Msg("resource resolver finished with validation error") - w.metrics.ErrorsAdd(ctx, 1, selector) + w.metrics.AddErrors(ctx, 1, selector) return case *schema.PKComponentError: w.logger.Warn().Err(err).Str("table", table.Name).Str("client", client.ID()).Msg("resource resolver finished with validation warning") diff --git a/scheduler/resolvers/resolvers.go b/scheduler/resolvers/resolvers.go index a544c7e432..56f8ead9e6 100644 --- a/scheduler/resolvers/resolvers.go +++ b/scheduler/resolvers/resolvers.go @@ -19,14 +19,14 @@ func resolveColumn(ctx context.Context, logger zerolog.Logger, m *metrics.Metric if err := recover(); err != nil { stack := fmt.Sprintf("%s\n%s", err, string(debug.Stack())) logger.Error().Str("column", column.Name).Interface("error", err).TimeDiff("duration", time.Now(), columnStartTime).Str("stack", stack).Msg("column resolver finished with panic") - m.PanicsAdd(ctx, 1, selector) + m.AddPanics(ctx, 1, selector) } }() if column.Resolver != nil { if err := column.Resolver(ctx, client, resource, column); err != nil { logger.Error().Err(err).Msg("column resolver finished with error") - m.ErrorsAdd(ctx, 1, selector) + m.AddErrors(ctx, 1, selector) } } else { // base use case: try to get column with CamelCase name @@ -35,7 +35,7 @@ func resolveColumn(ctx context.Context, logger zerolog.Logger, m *metrics.Metric err := resource.Set(column.Name, v) if err != nil { logger.Error().Err(err).Msg("column resolver finished with error") - m.ErrorsAdd(ctx, 1, selector) + m.AddErrors(ctx, 1, selector) } } } @@ -57,13 +57,13 @@ func ResolveSingleResource(ctx context.Context, logger zerolog.Logger, m *metric if err := recover(); err != nil { stack := fmt.Sprintf("%s\n%s", err, string(debug.Stack())) tableLogger.Error().Interface("error", err).TimeDiff("duration", time.Now(), objectStartTime).Str("stack", stack).Msg("resource resolver finished with panic") - m.PanicsAdd(ctx, 1, selector) + m.AddPanics(ctx, 1, selector) } }() if table.PreResourceResolver != nil { if err := table.PreResourceResolver(ctx, client, resource); err != nil { tableLogger.Error().Err(err).Msg("pre resource resolver failed") - m.ErrorsAdd(ctx, 1, selector) + m.AddErrors(ctx, 1, selector) return nil } } @@ -75,10 +75,10 @@ func ResolveSingleResource(ctx context.Context, logger zerolog.Logger, m *metric if table.PostResourceResolver != nil { if err := table.PostResourceResolver(ctx, client, resource); err != nil { tableLogger.Error().Stack().Err(err).Msg("post resource resolver finished with error") - m.ErrorsAdd(ctx, 1, selector) + m.AddErrors(ctx, 1, selector) } } - m.ResourcesAdd(ctx, 1, selector) + m.AddResources(ctx, 1, selector) return resource } diff --git a/scheduler/scheduler.go b/scheduler/scheduler.go index 9154fa17b5..59c7e666f3 100644 --- a/scheduler/scheduler.go +++ b/scheduler/scheduler.go @@ -6,7 +6,6 @@ import ( "fmt" "math" "sync" - "time" "github.com/cloudquery/plugin-sdk/v4/caser" "github.com/cloudquery/plugin-sdk/v4/message" @@ -280,13 +279,7 @@ func (s *syncClient) logTablesMetrics(tables schema.Tables, client Client) { clientName := client.ID() for _, table := range tables { selector := s.metrics.NewSelector(clientName, table.Name) - duration := s.metrics.DurationGet(selector) - if duration == nil { - // This can happen for a relation when there are no resources to resolve from the parent - duration = new(time.Duration) - } - - s.logger.Info().Str("table", table.Name).Str("client", clientName).Uint64("resources", s.metrics.ResourcesGet(selector)).Dur("duration_ms", *duration).Uint64("errors", s.metrics.ErrorsGet(selector)).Msg("table sync finished") + s.logger.Info().Str("table", table.Name).Str("client", clientName).Uint64("resources", s.metrics.GetResources(selector)).Dur("duration_ms", s.metrics.GetDuration(selector)).Uint64("errors", s.metrics.GetErrors(selector)).Msg("table sync finished") s.logTablesMetrics(table.Relations, client) } } diff --git a/scheduler/scheduler_dfs.go b/scheduler/scheduler_dfs.go index 0b5cb3ddd7..c48c4345a2 100644 --- a/scheduler/scheduler_dfs.go +++ b/scheduler/scheduler_dfs.go @@ -93,16 +93,23 @@ func (s *syncClient) resolveTableDfs(ctx context.Context, table *schema.Table, c if parent == nil { // Log only for root tables, otherwise we spam too much. logger.Info().Msg("top level table resolver started") } - selector := s.metrics.NewSelector(clientName, table.Name) + s.metrics.StartTime(startTime, selector) + defer func() { span.AddEvent("sync.finish.stats", trace.WithAttributes( - attribute.Key("sync.resources").Int64(int64(s.metrics.ResourcesGet(selector))), - attribute.Key("sync.errors").Int64(int64(s.metrics.ErrorsGet(selector))), - attribute.Key("sync.panics").Int64(int64(s.metrics.PanicsGet(selector))), + attribute.Key("sync.resources").Int64(int64(s.metrics.GetResources(selector))), + attribute.Key("sync.errors").Int64(int64(s.metrics.GetErrors(selector))), + attribute.Key("sync.panics").Int64(int64(s.metrics.GetPanics(selector))), )) + + endTime := time.Now() + s.metrics.EndTime(ctx, endTime, selector) + if parent == nil { // Log only for root tables and relations only after resolving is done, otherwise we spam per object instead of per table. + logger.Info().Uint64("resources", s.metrics.GetResources(selector)).Uint64("errors", s.metrics.GetErrors(selector)).Dur("duration_ms", s.metrics.GetDuration(selector)).Msg("table sync finished") + s.logTablesMetrics(table.Relations, client) + } }() - s.metrics.StartTime(startTime, selector) res := make(chan any) go func() { @@ -110,13 +117,13 @@ func (s *syncClient) resolveTableDfs(ctx context.Context, table *schema.Table, c if err := recover(); err != nil { stack := fmt.Sprintf("%s\n%s", err, string(debug.Stack())) logger.Error().Interface("error", err).Str("stack", stack).Msg("table resolver finished with panic") - s.metrics.PanicsAdd(ctx, 1, selector) + s.metrics.AddPanics(ctx, 1, selector) } close(res) }() if err := table.Resolver(ctx, client, parent, res); err != nil { logger.Error().Err(err).Msg("table resolver finished with error") - s.metrics.ErrorsAdd(ctx, 1, selector) + s.metrics.AddErrors(ctx, 1, selector) // Send SyncError message syncErrorMsg := &message.SyncError{ TableName: table.Name, @@ -136,13 +143,6 @@ func (s *syncClient) resolveTableDfs(ctx context.Context, table *schema.Table, c batchSender.Close() // we don't need any waitgroups here because we are waiting for the channel to close - endTime := time.Now() - s.metrics.EndTime(ctx, endTime, selector) - duration := s.metrics.DurationGet(selector) - if parent == nil { // Log only for root tables and relations only after resolving is done, otherwise we spam per object instead of per table. - logger.Info().Uint64("resources", s.metrics.ResourcesGet(selector)).Uint64("errors", s.metrics.ErrorsGet(selector)).Dur("duration_ms", *duration).Msg("table sync finished") - s.logTablesMetrics(table.Relations, client) - } } func (s *syncClient) resolveResourcesDfs(ctx context.Context, table *schema.Table, client schema.ClientMeta, parent *schema.Resource, resources any, resolvedResources chan<- *schema.Resource, depth int) { @@ -191,7 +191,7 @@ func (s *syncClient) resolveResourcesDfs(ctx context.Context, table *schema.Tabl if err := resolvedResource.CalculateCQID(s.deterministicCQID); err != nil { s.logger.Error().Err(err).Str("table", table.Name).Str("client", client.ID()).Msg("resource resolver finished with primary key calculation error") - s.metrics.ErrorsAdd(ctx, 1, selector) + s.metrics.AddErrors(ctx, 1, selector) return } if err := resolvedResource.StoreCQClientID(client.ID()); err != nil { @@ -201,7 +201,7 @@ func (s *syncClient) resolveResourcesDfs(ctx context.Context, table *schema.Tabl switch err.(type) { case *schema.PKError: s.logger.Error().Err(err).Str("table", table.Name).Str("client", client.ID()).Msg("resource resolver finished with validation error") - s.metrics.ErrorsAdd(ctx, 1, selector) + s.metrics.AddErrors(ctx, 1, selector) return case *schema.PKComponentError: s.logger.Warn().Err(err).Str("table", table.Name).Str("client", client.ID()).Msg("resource resolver finished with validation warning") From 457f17adc783bad81ce4f370b1ce9c77ab1f64ae Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Muraru=20=C8=98tefan?= Date: Mon, 9 Jun 2025 13:32:09 +0300 Subject: [PATCH 4/9] fix: tests, remove prometheus client --- go.mod | 8 -- go.sum | 16 ---- scheduler/metrics/metrics_test.go | 137 ++++++++++++++++++++++++------ serve/opentelemetry.go | 18 ---- 4 files changed, 112 insertions(+), 67 deletions(-) diff --git a/go.mod b/go.mod index 0a4de9fcb7..b1366402c9 100644 --- a/go.mod +++ b/go.mod @@ -21,7 +21,6 @@ require ( github.com/hashicorp/go-retryablehttp v0.7.8 github.com/invopop/jsonschema v0.13.0 github.com/mitchellh/hashstructure/v2 v2.0.2 - github.com/prometheus/client_golang v1.22.0 github.com/rs/zerolog v1.34.0 github.com/samber/lo v1.49.1 github.com/santhosh-tekuri/jsonschema/v6 v6.0.2 @@ -33,7 +32,6 @@ require ( go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp v1.36.0 go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.36.0 go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.36.0 - go.opentelemetry.io/otel/exporters/prometheus v0.58.0 go.opentelemetry.io/otel/log v0.12.2 go.opentelemetry.io/otel/metric v1.36.0 go.opentelemetry.io/otel/sdk v1.36.0 @@ -64,10 +62,8 @@ require ( github.com/aws/aws-sdk-go-v2/service/sts v1.34.0 // indirect github.com/aws/smithy-go v1.22.4 // indirect github.com/bahlo/generic-list-go v0.2.0 // indirect - github.com/beorn7/perks v1.0.1 // indirect github.com/buger/jsonparser v1.1.1 // indirect github.com/cenkalti/backoff/v5 v5.0.2 // indirect - github.com/cespare/xxhash/v2 v2.3.0 // indirect github.com/davecgh/go-spew v1.1.1 // indirect github.com/ghodss/yaml v1.0.0 // indirect github.com/go-logr/logr v1.4.2 // indirect @@ -81,13 +77,9 @@ require ( github.com/mailru/easyjson v0.7.7 // indirect github.com/mattn/go-colorable v0.1.13 // indirect github.com/mattn/go-isatty v0.0.20 // indirect - github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect github.com/oapi-codegen/runtime v1.1.1 // indirect github.com/pierrec/lz4/v4 v4.1.22 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect - github.com/prometheus/client_model v0.6.2 // indirect - github.com/prometheus/common v0.64.0 // indirect - github.com/prometheus/procfs v0.16.1 // indirect github.com/spf13/pflag v1.0.6 // indirect github.com/stoewer/go-strcase v1.3.0 // indirect github.com/wk8/go-ordered-map/v2 v2.1.8 // indirect diff --git a/go.sum b/go.sum index 5b28e8fbd9..376de6b4ec 100644 --- a/go.sum +++ b/go.sum @@ -43,8 +43,6 @@ github.com/aws/smithy-go v1.22.4 h1:uqXzVZNuNexwc/xrh6Tb56u89WDlJY6HS+KC0S4QSjw= github.com/aws/smithy-go v1.22.4/go.mod h1:t1ufH5HMublsJYulve2RKmHDC15xu1f26kHCp/HgceI= github.com/bahlo/generic-list-go v0.2.0 h1:5sz/EEAK+ls5wF+NeqDpk5+iNdMDXrh3z3nPnH1Wvgk= github.com/bahlo/generic-list-go v0.2.0/go.mod h1:2KvAjgMlE5NNynlg/5iLrrCCZ2+5xWbdbCW3pNTGyYg= -github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM= -github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw= github.com/bmatcuk/doublestar v1.1.1/go.mod h1:UD6OnuiIn0yFxxA2le/rnRU1G4RaI4UvFv1sNto9p6w= github.com/bradleyjkemp/cupaloy/v2 v2.8.0 h1:any4BmKE+jGIaMpnU8YgH/I2LPiLBufr6oMMlVBbn9M= github.com/bradleyjkemp/cupaloy/v2 v2.8.0/go.mod h1:bm7JXdkRd4BHJk9HpwqAI8BoAY1lps46Enkdqw6aRX0= @@ -117,8 +115,6 @@ github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE= github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk= github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= -github.com/kylelemons/godebug v1.1.0 h1:RPNrshWIDI6G2gRW9EHilWtl7Z6Sb1BR0xunSBf0SNc= -github.com/kylelemons/godebug v1.1.0/go.mod h1:9/0rRGxNHcop5bhtWyNeEfOS8JIWk580+fNqagV/RAw= github.com/mailru/easyjson v0.7.7 h1:UGYAvKxe3sBsEDzO8ZeWOSlIQfWFlxbzLZe7hwFURr0= github.com/mailru/easyjson v0.7.7/go.mod h1:xzfreul335JAWq5oZzymOObrkdz5UnU4kGfJJLY9Nlc= github.com/mattn/go-colorable v0.1.13 h1:fFA4WZxdEF4tXPZVKMLwD8oUnCTTo08duU7wxecdEvA= @@ -133,8 +129,6 @@ github.com/minio/c2goasm v0.0.0-20190812172519-36a3d3bbc4f3 h1:+n/aFZefKZp7spd8D github.com/minio/c2goasm v0.0.0-20190812172519-36a3d3bbc4f3/go.mod h1:RagcQ7I8IeTMnF8JTXieKnO4Z6JCsikNEzj0DwauVzE= github.com/mitchellh/hashstructure/v2 v2.0.2 h1:vGKWl0YJqUNxE8d+h8f6NJLcCJrgbhC4NcD46KavDd4= github.com/mitchellh/hashstructure/v2 v2.0.2/go.mod h1:MG3aRVU/N29oo/V/IhBX8GR/zz4kQkprJgF2EVszyDE= -github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 h1:C3w9PqII01/Oq1c1nUAm88MOHcQC9l5mIlSMApZMrHA= -github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822/go.mod h1:+n7T8mK8HuQTcFwEeznm/DIxMOiR9yIdICNftLE1DvQ= github.com/oapi-codegen/runtime v1.1.1 h1:EXLHh0DXIJnWhdRPN2w4MXAzFyE4CskzhNLUmtpMYro= github.com/oapi-codegen/runtime v1.1.1/go.mod h1:SK9X900oXmPWilYR5/WKPzt3Kqxn/uS/+lbpREv+eCg= github.com/pierrec/lz4/v4 v4.1.22 h1:cKFw6uJDK+/gfw5BcDL0JL5aBsAFdsIT18eRtLj7VIU= @@ -142,14 +136,6 @@ github.com/pierrec/lz4/v4 v4.1.22/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFu github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= -github.com/prometheus/client_golang v1.22.0 h1:rb93p9lokFEsctTys46VnV1kLCDpVZ0a/Y92Vm0Zc6Q= -github.com/prometheus/client_golang v1.22.0/go.mod h1:R7ljNsLXhuQXYZYtw6GAE9AZg8Y7vEW5scdCXrWRXC0= -github.com/prometheus/client_model v0.6.2 h1:oBsgwpGs7iVziMvrGhE53c/GrLUsZdHnqNwqPLxwZyk= -github.com/prometheus/client_model v0.6.2/go.mod h1:y3m2F6Gdpfy6Ut/GBsUqTWZqCUvMVzSfMLjcu6wAwpE= -github.com/prometheus/common v0.64.0 h1:pdZeA+g617P7oGv1CzdTzyeShxAGrTBsolKNOLQPGO4= -github.com/prometheus/common v0.64.0/go.mod h1:0gZns+BLRQ3V6NdaerOhMbwwRbNh9hkGINtQAsP5GS8= -github.com/prometheus/procfs v0.16.1 h1:hZ15bTNuirocR6u0JZ6BAHHmwS1p8B4P6MRqxtzMyRg= -github.com/prometheus/procfs v0.16.1/go.mod h1:teAbpZRB1iIAJYREa1LsoWUXykVXA1KlTmWl8x/U+Is= github.com/rogpeppe/go-internal v1.13.1 h1:KvO1DLK/DRN07sQ1LQKScxyZJuNnedQ5/wKSR38lUII= github.com/rogpeppe/go-internal v1.13.1/go.mod h1:uMEvuHeurkdAXX61udpOXGD/AzZDWNMNyH2VO9fmH0o= github.com/rs/xid v1.6.0/go.mod h1:7XoLgs4eV+QndskICGsho+ADou8ySMSjJKDIan90Nz0= @@ -200,8 +186,6 @@ go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.36.0 h1:dNzwXjZKpMpE2JhmO+9 go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.36.0/go.mod h1:90PoxvaEB5n6AOdZvi+yWJQoE95U8Dhhw2bSyRqnTD0= go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.36.0 h1:nRVXXvf78e00EwY6Wp0YII8ww2JVWshZ20HfTlE11AM= go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.36.0/go.mod h1:r49hO7CgrxY9Voaj3Xe8pANWtr0Oq916d0XAmOoCZAQ= -go.opentelemetry.io/otel/exporters/prometheus v0.58.0 h1:CJAxWKFIqdBennqxJyOgnt5LqkeFRT+Mz3Yjz3hL+h8= -go.opentelemetry.io/otel/exporters/prometheus v0.58.0/go.mod h1:7qo/4CLI+zYSNbv0GMNquzuss2FVZo3OYrGh96n4HNc= go.opentelemetry.io/otel/log v0.12.2 h1:yob9JVHn2ZY24byZeaXpTVoPS6l+UrrxmxmPKohXTwc= go.opentelemetry.io/otel/log v0.12.2/go.mod h1:ShIItIxSYxufUMt+1H5a2wbckGli3/iCfuEbVZi/98E= go.opentelemetry.io/otel/metric v1.36.0 h1:MoWPKVhQvJ+eeXWHFBOPoBOi20jh6Iq2CcCREuTYufE= diff --git a/scheduler/metrics/metrics_test.go b/scheduler/metrics/metrics_test.go index 18ee454c66..65e8347946 100644 --- a/scheduler/metrics/metrics_test.go +++ b/scheduler/metrics/metrics_test.go @@ -1,33 +1,120 @@ package metrics -import "testing" +import ( + "testing" + "time" + + "github.com/stretchr/testify/require" +) func TestMetrics(t *testing.T) { - s := NewMetrics("test_invocation_id") - s.TableClient["test_table"] = make(map[string]*tableClientMetrics) - s.TableClient["test_table"]["testExecutionClient"] = &tableClientMetrics{ - resources: 1, - errors: 2, - panics: 3, - } - if s.TotalResources() != 1 { - t.Fatal("expected 1 resource") - } - if s.TotalErrors() != 2 { - t.Fatal("expected 2 error") - } - if s.TotalPanics() != 3 { - t.Fatal("expected 3 panics") - } + m := NewMetrics("test_invocation_id") - other := NewMetrics("test_invocation_id") - other.TableClient["test_table"] = make(map[string]*tableClientMetrics) - other.TableClient["test_table"]["testExecutionClient"] = &tableClientMetrics{ - resources: 1, - errors: 2, - panics: 3, + m.measurements["test_table_1"] = tableMeasurements{ + clients: map[string]*measurement{ + "test_client_1": {duration: &durationMeasurement{}}, + "test_client_2": {duration: &durationMeasurement{}}, + }, + duration: &durationMeasurement{}, } - if !s.Equal(other) { - t.Fatal("expected metrics to be equal") + m.measurements["test_table_2"] = tableMeasurements{ + clients: map[string]*measurement{ + "test_client_1": {duration: &durationMeasurement{}}, + }, + duration: &durationMeasurement{}, } + + require.Equal(t, m.TotalResources(), uint64(0)) + require.Equal(t, m.TotalErrors(), uint64(0)) + require.Equal(t, m.TotalPanics(), uint64(0)) + + s1 := m.NewSelector("test_client_1", "test_table_1") + + // test single table, single client + m.StartTime(time.Now(), s1) + m.AddResources(t.Context(), 1, s1) + require.Equal(t, m.TotalResources(), uint64(1)) + require.Equal(t, m.GetResources(s1), uint64(1)) + require.Equal(t, m.TotalErrors(), uint64(0)) + require.Equal(t, m.TotalPanics(), uint64(0)) + + m.AddErrors(t.Context(), 1, s1) + require.Equal(t, m.TotalResources(), uint64(1)) + require.Equal(t, m.GetResources(s1), uint64(1)) + require.Equal(t, m.TotalErrors(), uint64(1)) + require.Equal(t, m.GetErrors(s1), uint64(1)) + require.Equal(t, m.TotalPanics(), uint64(0)) + + m.AddPanics(t.Context(), 1, s1) + require.Equal(t, m.TotalResources(), uint64(1)) + require.Equal(t, m.GetResources(s1), uint64(1)) + require.Equal(t, m.TotalErrors(), uint64(1)) + require.Equal(t, m.GetErrors(s1), uint64(1)) + require.Equal(t, m.TotalPanics(), uint64(1)) + require.Equal(t, m.GetPanics(s1), uint64(1)) + m.EndTime(t.Context(), time.Now(), s1) + + // test single table, multiple clients + s2 := m.NewSelector("test_client_2", "test_table_1") + + m.StartTime(time.Now(), s2) + m.AddResources(t.Context(), 1, s2) + require.Equal(t, m.TotalResources(), uint64(2)) + require.Equal(t, m.GetResources(s2), uint64(1)) + require.Equal(t, m.TotalErrors(), uint64(1)) + require.Equal(t, m.GetErrors(s2), uint64(0)) + require.Equal(t, m.TotalPanics(), uint64(1)) + require.Equal(t, m.GetPanics(s2), uint64(0)) + + m.AddErrors(t.Context(), 1, s2) + require.Equal(t, m.TotalResources(), uint64(2)) + require.Equal(t, m.GetResources(s2), uint64(1)) + require.Equal(t, m.TotalErrors(), uint64(2)) + require.Equal(t, m.GetErrors(s2), uint64(1)) + require.Equal(t, m.TotalPanics(), uint64(1)) + require.Equal(t, m.GetPanics(s2), uint64(0)) + + m.AddPanics(t.Context(), 1, s2) + require.Equal(t, m.TotalResources(), uint64(2)) + require.Equal(t, m.GetResources(s2), uint64(1)) + require.Equal(t, m.TotalErrors(), uint64(2)) + require.Equal(t, m.GetErrors(s2), uint64(1)) + require.Equal(t, m.TotalPanics(), uint64(2)) + require.Equal(t, m.GetPanics(s2), uint64(1)) + m.EndTime(t.Context(), time.Now(), s2) + + // test multiple tables, multiple clients + s3 := m.NewSelector("test_client_1", "test_table_2") + + m.StartTime(time.Now(), s3) + m.AddResources(t.Context(), 1, s3) + require.Equal(t, m.TotalResources(), uint64(3)) + require.Equal(t, m.GetResources(s3), uint64(1)) + require.Equal(t, m.TotalErrors(), uint64(2)) + require.Equal(t, m.GetErrors(s3), uint64(0)) + require.Equal(t, m.TotalPanics(), uint64(2)) + require.Equal(t, m.GetPanics(s3), uint64(0)) + + m.AddErrors(t.Context(), 1, s3) + require.Equal(t, m.TotalResources(), uint64(3)) + require.Equal(t, m.GetResources(s3), uint64(1)) + require.Equal(t, m.TotalErrors(), uint64(3)) + require.Equal(t, m.GetErrors(s3), uint64(1)) + require.Equal(t, m.TotalPanics(), uint64(2)) + require.Equal(t, m.GetPanics(s3), uint64(0)) + + m.AddPanics(t.Context(), 1, s3) + require.Equal(t, m.TotalResources(), uint64(3)) + require.Equal(t, m.GetResources(s3), uint64(1)) + require.Equal(t, m.TotalErrors(), uint64(3)) + require.Equal(t, m.GetErrors(s3), uint64(1)) + require.Equal(t, m.TotalPanics(), uint64(3)) + require.Equal(t, m.GetPanics(s3), uint64(1)) + m.EndTime(t.Context(), time.Now(), s3) + + require.Greater(t, m.GetDuration(s1), 0*time.Millisecond) + require.Greater(t, m.GetDuration(s2), 0*time.Millisecond) + + // This should work because the 2 metrics are built sequentially; in practice though, this is probably not the case. + require.GreaterOrEqual(t, m.TableDuration(s1.tableName), m.GetDuration(s1)+m.GetDuration(s2)) } diff --git a/serve/opentelemetry.go b/serve/opentelemetry.go index b65eebe8c1..9995846b11 100644 --- a/serve/opentelemetry.go +++ b/serve/opentelemetry.go @@ -4,12 +4,10 @@ import ( "context" "encoding/json" "fmt" - "net/http" "reflect" "time" "github.com/cloudquery/plugin-sdk/v4/plugin" - "github.com/prometheus/client_golang/prometheus/promhttp" "github.com/rs/zerolog" "go.opentelemetry.io/otel" "go.opentelemetry.io/otel/attribute" @@ -17,7 +15,6 @@ import ( "go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp" "go.opentelemetry.io/otel/exporters/otlp/otlptrace" "go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp" - "go.opentelemetry.io/otel/exporters/prometheus" otellog "go.opentelemetry.io/otel/log" logglobal "go.opentelemetry.io/otel/log/global" "go.opentelemetry.io/otel/sdk/log" @@ -122,11 +119,6 @@ func getLogsProcessor(ctx context.Context, opts otelConfig) (*log.BatchProcessor } func setupOtel(ctx context.Context, logger zerolog.Logger, p *plugin.Plugin, otelEndpoint string, otelEndpointInsecure bool) (shutdown func(), err error) { - prometheusReader, err := prometheus.New() - if err != nil { - return nil, err - } - if otelEndpoint == "" { return nil, nil } @@ -157,7 +149,6 @@ func setupOtel(ctx context.Context, logger zerolog.Logger, p *plugin.Plugin, ote mt := metric.NewMeterProvider( metric.WithReader(metricReader), - metric.WithReader(prometheusReader), metric.WithResource(pluginResource), ) @@ -173,15 +164,6 @@ func setupOtel(ctx context.Context, logger zerolog.Logger, p *plugin.Plugin, ote otel.SetMeterProvider(mt) logglobal.SetLoggerProvider(lp) - go func() { - logger.Info().Msg("serving metrics at localhost:9121/metrics") - http.Handle("/metrics", promhttp.Handler()) - if err := http.ListenAndServe(":9121", nil); err != nil { - fmt.Printf("error serving http: %v", err) - return - } - }() - shutdown = func() { if err := tp.Shutdown(context.Background()); err != nil { logger.Error().Err(err).Msg("failed to shutdown OTLP trace provider") From c1c41cd1d64316e35c7583b1bf08fdcca1a2e12e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Muraru=20=C8=98tefan?= Date: Thu, 10 Jul 2025 14:42:06 +0300 Subject: [PATCH 5/9] fix: lower durations in tests --- scheduler/metrics/metrics_test.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/scheduler/metrics/metrics_test.go b/scheduler/metrics/metrics_test.go index 65e8347946..dd958d445f 100644 --- a/scheduler/metrics/metrics_test.go +++ b/scheduler/metrics/metrics_test.go @@ -112,8 +112,8 @@ func TestMetrics(t *testing.T) { require.Equal(t, m.GetPanics(s3), uint64(1)) m.EndTime(t.Context(), time.Now(), s3) - require.Greater(t, m.GetDuration(s1), 0*time.Millisecond) - require.Greater(t, m.GetDuration(s2), 0*time.Millisecond) + require.Greater(t, m.GetDuration(s1), 0*time.Nanosecond) + require.Greater(t, m.GetDuration(s2), 0*time.Nanosecond) // This should work because the 2 metrics are built sequentially; in practice though, this is probably not the case. require.GreaterOrEqual(t, m.TableDuration(s1.tableName), m.GetDuration(s1)+m.GetDuration(s2)) From 0fb408d34342164ccdadcd59719dec8d6f218e89 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Muraru=20=C8=98tefan?= Date: Thu, 10 Jul 2025 14:56:37 +0300 Subject: [PATCH 6/9] fix: Ensure operations take longer than 1ms --- scheduler/metrics/metrics_test.go | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/scheduler/metrics/metrics_test.go b/scheduler/metrics/metrics_test.go index dd958d445f..2d7dcfc1df 100644 --- a/scheduler/metrics/metrics_test.go +++ b/scheduler/metrics/metrics_test.go @@ -52,6 +52,8 @@ func TestMetrics(t *testing.T) { require.Equal(t, m.GetErrors(s1), uint64(1)) require.Equal(t, m.TotalPanics(), uint64(1)) require.Equal(t, m.GetPanics(s1), uint64(1)) + + time.Sleep(1 * time.Millisecond) m.EndTime(t.Context(), time.Now(), s1) // test single table, multiple clients @@ -81,6 +83,8 @@ func TestMetrics(t *testing.T) { require.Equal(t, m.GetErrors(s2), uint64(1)) require.Equal(t, m.TotalPanics(), uint64(2)) require.Equal(t, m.GetPanics(s2), uint64(1)) + + time.Sleep(1 * time.Millisecond) m.EndTime(t.Context(), time.Now(), s2) // test multiple tables, multiple clients @@ -110,6 +114,8 @@ func TestMetrics(t *testing.T) { require.Equal(t, m.GetErrors(s3), uint64(1)) require.Equal(t, m.TotalPanics(), uint64(3)) require.Equal(t, m.GetPanics(s3), uint64(1)) + + time.Sleep(1 * time.Millisecond) m.EndTime(t.Context(), time.Now(), s3) require.Greater(t, m.GetDuration(s1), 0*time.Nanosecond) From fd813fc524d985569eadbbfbd9b22fa8ba5c1f71 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Muraru=20=C8=98tefan?= Date: Wed, 16 Jul 2025 16:02:06 +0300 Subject: [PATCH 7/9] fix: Remove invocationID from metric point attributes --- scheduler/metrics/metrics.go | 105 ++++++++------------------- scheduler/queue/scheduler_test.go | 2 +- scheduler/scheduler_debug.go | 2 +- scheduler/scheduler_dfs.go | 2 +- scheduler/scheduler_round_robin.go | 2 +- scheduler/scheduler_shuffle.go | 2 +- scheduler/scheduler_shuffle_queue.go | 2 +- 7 files changed, 38 insertions(+), 79 deletions(-) diff --git a/scheduler/metrics/metrics.go b/scheduler/metrics/metrics.go index d665f54c36..8782af31a9 100644 --- a/scheduler/metrics/metrics.go +++ b/scheduler/metrics/metrics.go @@ -2,6 +2,7 @@ package metrics import ( "context" + "sync" "sync/atomic" "time" @@ -20,42 +21,38 @@ const ( durationMetricName = "sync.table.duration" ) -func NewMetrics(invocationID string) *Metrics { - resources, err := otel.Meter(ResourceName).Int64Counter(resourcesMetricName, - metric.WithDescription("Number of resources synced for a table"), - metric.WithUnit("/{tot}"), - ) - if err != nil { - return nil - } - - errors, err := otel.Meter(ResourceName).Int64Counter(errorsMetricName, - metric.WithDescription("Number of errors encountered while syncing a table"), - metric.WithUnit("/{tot}"), - ) - if err != nil { - return nil - } - - panics, err := otel.Meter(ResourceName).Int64Counter(panicsMetricName, - metric.WithDescription("Number of panics encountered while syncing a table"), - metric.WithUnit("/{tot}"), - ) - if err != nil { - return nil - } +var ( + resources metric.Int64Counter + errors metric.Int64Counter + panics metric.Int64Counter + duration metric.Int64Counter + once sync.Once +) - duration, err := otel.Meter(ResourceName).Int64Counter(durationMetricName, - metric.WithDescription("Duration of syncing a table"), - metric.WithUnit("ms"), - ) - if err != nil { - return nil - } +func NewMetrics(invocationID string) *Metrics { + once.Do(func() { + resources, _ = otel.Meter(ResourceName).Int64Counter(resourcesMetricName, + metric.WithDescription("Number of resources synced for a table"), + metric.WithUnit("/{tot}"), + ) + + errors, _ = otel.Meter(ResourceName).Int64Counter(errorsMetricName, + metric.WithDescription("Number of errors encountered while syncing a table"), + metric.WithUnit("/{tot}"), + ) + + panics, _ = otel.Meter(ResourceName).Int64Counter(panicsMetricName, + metric.WithDescription("Number of panics encountered while syncing a table"), + metric.WithUnit("/{tot}"), + ) + + duration, _ = otel.Meter(ResourceName).Int64Counter(durationMetricName, + metric.WithDescription("Duration of syncing a table"), + metric.WithUnit("ms"), + ) + }) return &Metrics{ - invocationID: invocationID, - resources: resources, errors: errors, panics: panics, @@ -66,8 +63,6 @@ func NewMetrics(invocationID string) *Metrics { } type Metrics struct { - invocationID string - resources metric.Int64Counter errors metric.Int64Counter panics metric.Int64Counter @@ -88,45 +83,9 @@ type measurement struct { duration *durationMeasurement } -func (m *measurement) Equal(other *measurement) bool { - return m.resources == other.resources && m.errors == other.errors && m.panics == other.panics && m.duration == other.duration -} - -// Equal compares to stats. Mostly useful in testing -func (m *Metrics) Equal(other *Metrics) bool { - for table, clientStats := range m.measurements { - for client, stats := range clientStats.clients { - if _, ok := other.measurements[table]; !ok { - return false - } - if _, ok := other.measurements[table].clients[client]; !ok { - return false - } - if !stats.Equal(other.measurements[table].clients[client]) { - return false - } - } - } - for table, clientStats := range other.measurements { - for client, stats := range clientStats.clients { - if _, ok := m.measurements[table]; !ok { - return false - } - if _, ok := m.measurements[table].clients[client]; !ok { - return false - } - if !stats.Equal(m.measurements[table].clients[client]) { - return false - } - } - } - return true -} - func (m *Metrics) NewSelector(clientID, tableName string) Selector { return Selector{ Set: attribute.NewSet( - attribute.Key("sync.invocation.id").String(m.invocationID), attribute.Key("sync.table.name").String(tableName), ), clientID: clientID, @@ -134,13 +93,13 @@ func (m *Metrics) NewSelector(clientID, tableName string) Selector { } } -func (m *Metrics) InitWithClients(invocationID string, table *schema.Table, clients []schema.ClientMeta) { +func (m *Metrics) InitWithClients(table *schema.Table, clients []schema.ClientMeta) { m.measurements[table.Name] = tableMeasurements{clients: make(map[string]*measurement), duration: &durationMeasurement{}} for _, client := range clients { m.measurements[table.Name].clients[client.ID()] = &measurement{duration: &durationMeasurement{}} } for _, relation := range table.Relations { - m.InitWithClients(invocationID, relation, clients) + m.InitWithClients(relation, clients) } } diff --git a/scheduler/queue/scheduler_test.go b/scheduler/queue/scheduler_test.go index 792b39b01e..0c7342674b 100644 --- a/scheduler/queue/scheduler_test.go +++ b/scheduler/queue/scheduler_test.go @@ -82,7 +82,7 @@ func TestScheduler(t *testing.T) { } for _, tc := range tableClients { - m.InitWithClients(scheduler.invocationID, tc.Table, []schema.ClientMeta{tc.Client}) + m.InitWithClients(tc.Table, []schema.ClientMeta{tc.Client}) } resolvedResources := make(chan *schema.Resource) diff --git a/scheduler/scheduler_debug.go b/scheduler/scheduler_debug.go index 3b122b4037..93639d936a 100644 --- a/scheduler/scheduler_debug.go +++ b/scheduler/scheduler_debug.go @@ -50,7 +50,7 @@ func (s *syncClient) syncTest(ctx context.Context, syncMultiplier int, resolvedR preInitialisedClients[i] = clients // we do this here to avoid locks so we initialize the metrics structure once in the main goroutine // and then we can just read from it in the other goroutines concurrently given we are not writing to it. - s.metrics.InitWithClients(s.invocationID, table, clients) + s.metrics.InitWithClients(table, clients) } // First interleave the tables like in round-robin diff --git a/scheduler/scheduler_dfs.go b/scheduler/scheduler_dfs.go index c48c4345a2..dfdf4703d7 100644 --- a/scheduler/scheduler_dfs.go +++ b/scheduler/scheduler_dfs.go @@ -40,7 +40,7 @@ func (s *syncClient) syncDfs(ctx context.Context, resolvedResources chan<- *sche preInitialisedClients[i] = clients // we do this here to avoid locks so we initial the metrics structure once in the main goroutines // and then we can just read from it in the other goroutines concurrently given we are not writing to it. - s.metrics.InitWithClients(s.invocationID, table, clients) + s.metrics.InitWithClients(table, clients) } tableClients := make([]tableClient, 0) diff --git a/scheduler/scheduler_round_robin.go b/scheduler/scheduler_round_robin.go index 11a71a544c..fa830f29fc 100644 --- a/scheduler/scheduler_round_robin.go +++ b/scheduler/scheduler_round_robin.go @@ -33,7 +33,7 @@ func (s *syncClient) syncRoundRobin(ctx context.Context, resolvedResources chan< preInitialisedClients[i] = clients // we do this here to avoid locks so we initial the metrics structure once in the main goroutines // and then we can just read from it in the other goroutines concurrently given we are not writing to it. - s.metrics.InitWithClients(s.invocationID, table, clients) + s.metrics.InitWithClients(table, clients) } tableClients := roundRobinInterleave(s.tables, preInitialisedClients) diff --git a/scheduler/scheduler_shuffle.go b/scheduler/scheduler_shuffle.go index 49f7d3eba0..ad91fe7532 100644 --- a/scheduler/scheduler_shuffle.go +++ b/scheduler/scheduler_shuffle.go @@ -33,7 +33,7 @@ func (s *syncClient) syncShuffle(ctx context.Context, resolvedResources chan<- * preInitialisedClients[i] = clients // we do this here to avoid locks so we initial the metrics structure once in the main goroutines // and then we can just read from it in the other goroutines concurrently given we are not writing to it. - s.metrics.InitWithClients(s.invocationID, table, clients) + s.metrics.InitWithClients(table, clients) } // First interleave the tables like in round-robin diff --git a/scheduler/scheduler_shuffle_queue.go b/scheduler/scheduler_shuffle_queue.go index db5f4b5c30..0bc5867cd7 100644 --- a/scheduler/scheduler_shuffle_queue.go +++ b/scheduler/scheduler_shuffle_queue.go @@ -21,7 +21,7 @@ func (s *syncClient) syncShuffleQueue(ctx context.Context, resolvedResources cha preInitialisedClients[i] = clients // we do this here to avoid locks so we initial the metrics structure once in the main goroutines // and then we can just read from it in the other goroutines concurrently given we are not writing to it. - s.metrics.InitWithClients(s.invocationID, table, clients) + s.metrics.InitWithClients(table, clients) } tableClients := roundRobinInterleave(s.tables, preInitialisedClients) From 9e305581c0d1f54cafcb907a43c7b5c4c0a2dfd7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Muraru=20=C8=98tefan?= Date: Wed, 16 Jul 2025 16:06:37 +0300 Subject: [PATCH 8/9] fix: Remove invocationID from metrics constructor calls --- scheduler/metrics/metrics.go | 4 ++-- scheduler/metrics/metrics_test.go | 2 +- scheduler/queue/scheduler_test.go | 5 ++--- scheduler/scheduler.go | 2 +- 4 files changed, 6 insertions(+), 7 deletions(-) diff --git a/scheduler/metrics/metrics.go b/scheduler/metrics/metrics.go index 8782af31a9..d622c1b5ed 100644 --- a/scheduler/metrics/metrics.go +++ b/scheduler/metrics/metrics.go @@ -29,7 +29,7 @@ var ( once sync.Once ) -func NewMetrics(invocationID string) *Metrics { +func NewMetrics() *Metrics { once.Do(func() { resources, _ = otel.Meter(ResourceName).Int64Counter(resourcesMetricName, metric.WithDescription("Number of resources synced for a table"), @@ -83,7 +83,7 @@ type measurement struct { duration *durationMeasurement } -func (m *Metrics) NewSelector(clientID, tableName string) Selector { +func (*Metrics) NewSelector(clientID, tableName string) Selector { return Selector{ Set: attribute.NewSet( attribute.Key("sync.table.name").String(tableName), diff --git a/scheduler/metrics/metrics_test.go b/scheduler/metrics/metrics_test.go index 2d7dcfc1df..9addc3edf3 100644 --- a/scheduler/metrics/metrics_test.go +++ b/scheduler/metrics/metrics_test.go @@ -8,7 +8,7 @@ import ( ) func TestMetrics(t *testing.T) { - m := NewMetrics("test_invocation_id") + m := NewMetrics() m.measurements["test_table_1"] = tableMeasurements{ clients: map[string]*measurement{ diff --git a/scheduler/queue/scheduler_test.go b/scheduler/queue/scheduler_test.go index 0c7342674b..8d9b71e4ec 100644 --- a/scheduler/queue/scheduler_test.go +++ b/scheduler/queue/scheduler_test.go @@ -45,9 +45,8 @@ func testResolver(_ context.Context, _ schema.ClientMeta, parent *schema.Resourc func TestScheduler(t *testing.T) { nopLogger := zerolog.Nop() - invocationID := uuid.New().String() - m := metrics.NewMetrics(invocationID) - scheduler := NewShuffleQueueScheduler(nopLogger, m, int64(0), WithWorkerCount(1000), WithInvocationID(invocationID)) + m := metrics.NewMetrics() + scheduler := NewShuffleQueueScheduler(nopLogger, m, int64(0), WithWorkerCount(1000), WithInvocationID(uuid.New().String())) tableClients := []WorkUnit{ { Table: &schema.Table{ diff --git a/scheduler/scheduler.go b/scheduler/scheduler.go index 59c7e666f3..8d17e08ffe 100644 --- a/scheduler/scheduler.go +++ b/scheduler/scheduler.go @@ -208,7 +208,7 @@ func (s *Scheduler) Sync(ctx context.Context, client schema.ClientMeta, tables s } syncClient := &syncClient{ - metrics: metrics.NewMetrics(s.invocationID), + metrics: metrics.NewMetrics(), tables: tables, client: client, scheduler: s, From 4deada45dc088d0257b2f3ae7762c72e90ce27f4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Muraru=20=C8=98tefan?= Date: Thu, 17 Jul 2025 11:53:17 +0300 Subject: [PATCH 9/9] feat: Add back empty client.id metric point so that CLI does not break in older versions --- scheduler/metrics/metrics.go | 1 + 1 file changed, 1 insertion(+) diff --git a/scheduler/metrics/metrics.go b/scheduler/metrics/metrics.go index d622c1b5ed..82e67740ef 100644 --- a/scheduler/metrics/metrics.go +++ b/scheduler/metrics/metrics.go @@ -87,6 +87,7 @@ func (*Metrics) NewSelector(clientID, tableName string) Selector { return Selector{ Set: attribute.NewSet( attribute.Key("sync.table.name").String(tableName), + attribute.Key("sync.client.id").String(""), ), clientID: clientID, tableName: tableName,