From 696277aa4e58eb50d5f7557f9473d0d73edadf91 Mon Sep 17 00:00:00 2001 From: JuArce <52429267+JuArce@users.noreply.github.com> Date: Thu, 9 Jan 2025 19:47:10 -0300 Subject: [PATCH 01/12] hotfix: dont panic on event subscriptions [WIP] --- aggregator/pkg/subscriber.go | 4 +- core/chainio/avs_subscriber.go | 85 +++++++++++++++++++++------------- core/chainio/retryable.go | 3 ++ operator/pkg/operator.go | 18 +++---- 4 files changed, 68 insertions(+), 42 deletions(-) diff --git a/aggregator/pkg/subscriber.go b/aggregator/pkg/subscriber.go index 4aef715da1..a2292d31fc 100644 --- a/aggregator/pkg/subscriber.go +++ b/aggregator/pkg/subscriber.go @@ -23,8 +23,8 @@ func (agg *Aggregator) SubscribeToNewTasks() error { func (agg *Aggregator) subscribeToNewTasks() error { var err error - - agg.taskSubscriber, err = agg.avsSubscriber.SubscribeToNewTasksV3(agg.NewBatchChan) + //subV3 := make(chan error) + err = agg.avsSubscriber.SubscribeToNewTasksV3(agg.NewBatchChan, agg.taskSubscriber) if err != nil { agg.AggregatorConfig.BaseConfig.Logger.Info("Failed to create task subscriber", "err", err) diff --git a/core/chainio/avs_subscriber.go b/core/chainio/avs_subscriber.go index c408020582..4976988547 100644 --- a/core/chainio/avs_subscriber.go +++ b/core/chainio/avs_subscriber.go @@ -61,7 +61,7 @@ func NewAvsSubscriberFromConfig(baseConfig *config.BaseConfig) (*AvsSubscriber, }, nil } -func (s *AvsSubscriber) SubscribeToNewTasksV2(newTaskCreatedChan chan *servicemanager.ContractAlignedLayerServiceManagerNewBatchV2) (chan error, error) { +func (s *AvsSubscriber) SubscribeToNewTasksV2(newTaskCreatedChan chan *servicemanager.ContractAlignedLayerServiceManagerNewBatchV2, errorChannel chan error) error { // Create a new channel to receive new tasks internalChannel := make(chan *servicemanager.ContractAlignedLayerServiceManagerNewBatchV2) @@ -69,18 +69,24 @@ func (s *AvsSubscriber) SubscribeToNewTasksV2(newTaskCreatedChan chan *servicema sub, err := SubscribeToNewTasksV2Retryable(&bind.WatchOpts{}, s.AvsContractBindings.ServiceManager, internalChannel, nil, retry.NetworkRetryParams()) if err != nil { s.logger.Error("Primary failed to subscribe to new AlignedLayer V2 tasks after %d retries", retry.NetworkNumRetries, "err", err) - return nil, err + //return nil, err } - subFallback, err := SubscribeToNewTasksV2Retryable(&bind.WatchOpts{}, s.AvsContractBindings.ServiceManagerFallback, internalChannel, nil, retry.NetworkRetryParams()) - if err != nil { + subFallback, errFallback := SubscribeToNewTasksV2Retryable(&bind.WatchOpts{}, s.AvsContractBindings.ServiceManagerFallback, internalChannel, nil, retry.NetworkRetryParams()) + if errFallback != nil { s.logger.Error("Fallback failed to subscribe to new AlignedLayer V2 tasks after %d retries", retry.NetworkNumRetries, "err", err) - return nil, err + //return nil, err } + + if err != nil && errFallback != nil { + s.logger.Error("Failed to subscribe to new AlignedLayer V2 tasks with both RPCs", "err", err, "errFallback", errFallback) + return err + } + s.logger.Info("Subscribed to new AlignedLayer V2 tasks") // create a new channel to foward errors - errorChannel := make(chan error) + //errorChannel := make(chan error) pollLatestBatchTicker := time.NewTicker(PollLatestBatchInterval) @@ -109,49 +115,60 @@ func (s *AvsSubscriber) SubscribeToNewTasksV2(newTaskCreatedChan chan *servicema // Handle errors and resubscribe go func() { - for { + var err1, err2 error + for err1 == nil || err2 == nil { select { case err := <-sub.Err(): s.logger.Warn("Error in new task subscription", "err", err) - sub.Unsubscribe() - sub, err = SubscribeToNewTasksV2Retryable(&bind.WatchOpts{}, s.AvsContractBindings.ServiceManager, internalChannel, nil, retry.NetworkRetryParams()) - if err != nil { - errorChannel <- err + + auxSub, err1 := SubscribeToNewTasksV2Retryable(&bind.WatchOpts{}, s.AvsContractBindings.ServiceManager, internalChannel, nil, retry.NetworkRetryParams()) + if err1 == nil { + //sub.Unsubscribe() + sub = auxSub // update the subscription only if it was successful } case err := <-subFallback.Err(): s.logger.Warn("Error in fallback new task subscription", "err", err) - subFallback.Unsubscribe() - subFallback, err = SubscribeToNewTasksV2Retryable(&bind.WatchOpts{}, s.AvsContractBindings.ServiceManagerFallback, internalChannel, nil, retry.NetworkRetryParams()) - if err != nil { - errorChannel <- err + + auxSubFallback, err2 := SubscribeToNewTasksV2Retryable(&bind.WatchOpts{}, s.AvsContractBindings.ServiceManagerFallback, internalChannel, nil, retry.NetworkRetryParams()) + if err2 == nil { + //subFallback.Unsubscribe() + subFallback = auxSubFallback // update the subscription only if it was successful } } } + errorChannel <- err1 }() - return errorChannel, nil + return nil } -func (s *AvsSubscriber) SubscribeToNewTasksV3(newTaskCreatedChan chan *servicemanager.ContractAlignedLayerServiceManagerNewBatchV3) (chan error, error) { +func (s *AvsSubscriber) SubscribeToNewTasksV3(newTaskCreatedChan chan *servicemanager.ContractAlignedLayerServiceManagerNewBatchV3, errorChannel chan error) error { // Create a new channel to receive new tasks internalChannel := make(chan *servicemanager.ContractAlignedLayerServiceManagerNewBatchV3) + s.logger.Info("Starting subscription to new AlignedLayer V3 tasks") // Subscribe to new tasks sub, err := SubscribeToNewTasksV3Retryable(&bind.WatchOpts{}, s.AvsContractBindings.ServiceManager, internalChannel, nil, retry.NetworkRetryParams()) if err != nil { s.logger.Error("Primary failed to subscribe to new AlignedLayer V3 tasks after %d retries", MaxRetries, "err", err) - return nil, err + //return nil, err } - subFallback, err := SubscribeToNewTasksV3Retryable(&bind.WatchOpts{}, s.AvsContractBindings.ServiceManagerFallback, internalChannel, nil, retry.NetworkRetryParams()) - if err != nil { + subFallback, errFallback := SubscribeToNewTasksV3Retryable(&bind.WatchOpts{}, s.AvsContractBindings.ServiceManagerFallback, internalChannel, nil, retry.NetworkRetryParams()) + if errFallback != nil { s.logger.Error("Fallback failed to subscribe to new AlignedLayer V3 tasks after %d retries", MaxRetries, "err", err) - return nil, err + //return nil, errFallback } + + if err != nil && errFallback != nil { + s.logger.Error("Failed to subscribe to new AlignedLayer V3 tasks with both RPCs", "err", err, "errFallback", errFallback) + return err + } + s.logger.Info("Subscribed to new AlignedLayer V3 tasks") // create a new channel to foward errors - errorChannel := make(chan error) + //errorChannel := make(chan error) pollLatestBatchTicker := time.NewTicker(PollLatestBatchInterval) @@ -180,27 +197,31 @@ func (s *AvsSubscriber) SubscribeToNewTasksV3(newTaskCreatedChan chan *servicema // Handle errors and resubscribe go func() { - for { + var err1, err2 error + for err1 == nil || err2 == nil { select { case err := <-sub.Err(): s.logger.Warn("Error in new task subscription", "err", err) - sub.Unsubscribe() - sub, err = SubscribeToNewTasksV3Retryable(&bind.WatchOpts{}, s.AvsContractBindings.ServiceManager, internalChannel, nil, retry.NetworkRetryParams()) - if err != nil { - errorChannel <- err + + auxSub, err1 := SubscribeToNewTasksV3Retryable(&bind.WatchOpts{}, s.AvsContractBindings.ServiceManager, internalChannel, nil, retry.NetworkRetryParams()) + if err1 == nil { + //sub.Unsubscribe() + sub = auxSub // update the subscription only if it was successful } case err := <-subFallback.Err(): s.logger.Warn("Error in fallback new task subscription", "err", err) - subFallback.Unsubscribe() - subFallback, err = SubscribeToNewTasksV3Retryable(&bind.WatchOpts{}, s.AvsContractBindings.ServiceManagerFallback, internalChannel, nil, retry.NetworkRetryParams()) - if err != nil { - errorChannel <- err + + auxSubFallback, err2 := SubscribeToNewTasksV3Retryable(&bind.WatchOpts{}, s.AvsContractBindings.ServiceManagerFallback, internalChannel, nil, retry.NetworkRetryParams()) + if err2 == nil { + //subFallback.Unsubscribe() + subFallback = auxSubFallback // update the subscription only if it was successful } } } + errorChannel <- err1 }() - return errorChannel, nil + return nil } func (s *AvsSubscriber) processNewBatchV2(batch *servicemanager.ContractAlignedLayerServiceManagerNewBatchV2, batchesSet map[[32]byte]struct{}, newBatchMutex *sync.Mutex, newTaskCreatedChan chan<- *servicemanager.ContractAlignedLayerServiceManagerNewBatchV2) { diff --git a/core/chainio/retryable.go b/core/chainio/retryable.go index bf4724e2f2..4c4d8e9f6b 100644 --- a/core/chainio/retryable.go +++ b/core/chainio/retryable.go @@ -2,6 +2,7 @@ package chainio import ( "context" + "github.com/rs/zerolog/log" "math/big" "github.com/ethereum/go-ethereum" @@ -206,6 +207,7 @@ func SubscribeToNewTasksV2Retryable( config *retry.RetryParams, ) (event.Subscription, error) { subscribe_func := func() (event.Subscription, error) { + log.Info().Msg("Subscribing to NewBatchV2") return serviceManager.WatchNewBatchV2(opts, newTaskCreatedChan, batchMerkleRoot) } return retry.RetryWithData(subscribe_func, config) @@ -225,6 +227,7 @@ func SubscribeToNewTasksV3Retryable( config *retry.RetryParams, ) (event.Subscription, error) { subscribe_func := func() (event.Subscription, error) { + log.Info().Msg("Subscribing to NewBatchV3") return serviceManager.WatchNewBatchV3(opts, newTaskCreatedChan, batchMerkleRoot) } return retry.RetryWithData(subscribe_func, config) diff --git a/operator/pkg/operator.go b/operator/pkg/operator.go index d4c36355cb..5d141bce83 100644 --- a/operator/pkg/operator.go +++ b/operator/pkg/operator.go @@ -140,12 +140,12 @@ func NewOperatorFromConfig(configuration config.OperatorConfig) (*Operator, erro return operator, nil } -func (o *Operator) SubscribeToNewTasksV2() (chan error, error) { - return o.avsSubscriber.SubscribeToNewTasksV2(o.NewTaskCreatedChanV2) +func (o *Operator) SubscribeToNewTasksV2(errorChan chan error) error { + return o.avsSubscriber.SubscribeToNewTasksV2(o.NewTaskCreatedChanV2, errorChan) } -func (o *Operator) SubscribeToNewTasksV3() (chan error, error) { - return o.avsSubscriber.SubscribeToNewTasksV3(o.NewTaskCreatedChanV3) +func (o *Operator) SubscribeToNewTasksV3(errorChan chan error) error { + return o.avsSubscriber.SubscribeToNewTasksV3(o.NewTaskCreatedChanV3, errorChan) } type OperatorLastProcessedBatch struct { @@ -205,12 +205,14 @@ func (o *Operator) UpdateLastProcessBatch(blockNumber uint32) error { } func (o *Operator) Start(ctx context.Context) error { - subV2, err := o.SubscribeToNewTasksV2() + subV2 := make(chan error) + err := o.SubscribeToNewTasksV2(subV2) if err != nil { log.Fatal("Could not subscribe to new tasks") } - subV3, err := o.SubscribeToNewTasksV3() + subV3 := make(chan error) + err = o.SubscribeToNewTasksV3(subV3) if err != nil { log.Fatal("Could not subscribe to new tasks") } @@ -233,13 +235,13 @@ func (o *Operator) Start(ctx context.Context) error { o.Logger.Errorf("Metrics server failed", "err", err) case err := <-subV2: o.Logger.Infof("Error in websocket subscription", "err", err) - subV2, err = o.SubscribeToNewTasksV2() + err = o.SubscribeToNewTasksV2(subV2) if err != nil { o.Logger.Fatal("Could not subscribe to new tasks V2") } case err := <-subV3: o.Logger.Infof("Error in websocket subscription", "err", err) - subV2, err = o.SubscribeToNewTasksV3() + err = o.SubscribeToNewTasksV3(subV3) if err != nil { o.Logger.Fatal("Could not subscribe to new tasks V3") } From 8869c63476fa3a1b85e6230a0eb211fd35892047 Mon Sep 17 00:00:00 2001 From: Urix <43704209+uri-99@users.noreply.github.com> Date: Fri, 10 Jan 2025 14:33:21 -0300 Subject: [PATCH 02/12] chore: comments --- core/chainio/avs_subscriber.go | 12 ++++++------ operator/pkg/operator.go | 1 + 2 files changed, 7 insertions(+), 6 deletions(-) diff --git a/core/chainio/avs_subscriber.go b/core/chainio/avs_subscriber.go index 4976988547..1ed173995e 100644 --- a/core/chainio/avs_subscriber.go +++ b/core/chainio/avs_subscriber.go @@ -69,13 +69,13 @@ func (s *AvsSubscriber) SubscribeToNewTasksV2(newTaskCreatedChan chan *servicema sub, err := SubscribeToNewTasksV2Retryable(&bind.WatchOpts{}, s.AvsContractBindings.ServiceManager, internalChannel, nil, retry.NetworkRetryParams()) if err != nil { s.logger.Error("Primary failed to subscribe to new AlignedLayer V2 tasks after %d retries", retry.NetworkNumRetries, "err", err) - //return nil, err + //return err } subFallback, errFallback := SubscribeToNewTasksV2Retryable(&bind.WatchOpts{}, s.AvsContractBindings.ServiceManagerFallback, internalChannel, nil, retry.NetworkRetryParams()) if errFallback != nil { s.logger.Error("Fallback failed to subscribe to new AlignedLayer V2 tasks after %d retries", retry.NetworkNumRetries, "err", err) - //return nil, err + //return err } if err != nil && errFallback != nil { @@ -86,7 +86,7 @@ func (s *AvsSubscriber) SubscribeToNewTasksV2(newTaskCreatedChan chan *servicema s.logger.Info("Subscribed to new AlignedLayer V2 tasks") // create a new channel to foward errors - //errorChannel := make(chan error) + // errorChannel := make(chan error) pollLatestBatchTicker := time.NewTicker(PollLatestBatchInterval) @@ -151,13 +151,13 @@ func (s *AvsSubscriber) SubscribeToNewTasksV3(newTaskCreatedChan chan *servicema sub, err := SubscribeToNewTasksV3Retryable(&bind.WatchOpts{}, s.AvsContractBindings.ServiceManager, internalChannel, nil, retry.NetworkRetryParams()) if err != nil { s.logger.Error("Primary failed to subscribe to new AlignedLayer V3 tasks after %d retries", MaxRetries, "err", err) - //return nil, err + //return err } subFallback, errFallback := SubscribeToNewTasksV3Retryable(&bind.WatchOpts{}, s.AvsContractBindings.ServiceManagerFallback, internalChannel, nil, retry.NetworkRetryParams()) if errFallback != nil { s.logger.Error("Fallback failed to subscribe to new AlignedLayer V3 tasks after %d retries", MaxRetries, "err", err) - //return nil, errFallback + //return errFallback } if err != nil && errFallback != nil { @@ -168,7 +168,7 @@ func (s *AvsSubscriber) SubscribeToNewTasksV3(newTaskCreatedChan chan *servicema s.logger.Info("Subscribed to new AlignedLayer V3 tasks") // create a new channel to foward errors - //errorChannel := make(chan error) + // errorChannel := make(chan error) pollLatestBatchTicker := time.NewTicker(PollLatestBatchInterval) diff --git a/operator/pkg/operator.go b/operator/pkg/operator.go index 5d141bce83..a4940ab90b 100644 --- a/operator/pkg/operator.go +++ b/operator/pkg/operator.go @@ -205,6 +205,7 @@ func (o *Operator) UpdateLastProcessBatch(blockNumber uint32) error { } func (o *Operator) Start(ctx context.Context) error { + // create a new channel to foward errors subV2 := make(chan error) err := o.SubscribeToNewTasksV2(subV2) if err != nil { From 8232ee3d2ca7df7f346fe395e26bf7eadd325c0e Mon Sep 17 00:00:00 2001 From: Urix <43704209+uri-99@users.noreply.github.com> Date: Fri, 10 Jan 2025 16:41:34 -0300 Subject: [PATCH 03/12] fix: resub infinite loop --- core/chainio/avs_subscriber.go | 24 ++++++++++++++++++------ 1 file changed, 18 insertions(+), 6 deletions(-) diff --git a/core/chainio/avs_subscriber.go b/core/chainio/avs_subscriber.go index 1ed173995e..bec938fdbe 100644 --- a/core/chainio/avs_subscriber.go +++ b/core/chainio/avs_subscriber.go @@ -8,6 +8,7 @@ import ( "github.com/ethereum/go-ethereum/accounts/abi/bind" ethcommon "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/event" "github.com/ethereum/go-ethereum/core/types" servicemanager "github.com/yetanotherco/aligned_layer/contracts/bindings/AlignedLayerServiceManager" @@ -116,23 +117,28 @@ func (s *AvsSubscriber) SubscribeToNewTasksV2(newTaskCreatedChan chan *servicema // Handle errors and resubscribe go func() { var err1, err2 error - for err1 == nil || err2 == nil { + var auxSub, auxSubFallback event.Subscription + for err1 == nil || err2 == nil { //while one is active select { case err := <-sub.Err(): s.logger.Warn("Error in new task subscription", "err", err) + s.logger.Info("failed states:", "err1", err1, "err2", err2) - auxSub, err1 := SubscribeToNewTasksV2Retryable(&bind.WatchOpts{}, s.AvsContractBindings.ServiceManager, internalChannel, nil, retry.NetworkRetryParams()) + auxSub, err1 = SubscribeToNewTasksV2Retryable(&bind.WatchOpts{}, s.AvsContractBindings.ServiceManager, internalChannel, nil, retry.NetworkRetryParams()) if err1 == nil { //sub.Unsubscribe() sub = auxSub // update the subscription only if it was successful + s.logger.Info("Resubscribed to fallback new task subscription") } case err := <-subFallback.Err(): + s.logger.Info("failed states:", "err1", err1, "err2", err2) s.logger.Warn("Error in fallback new task subscription", "err", err) - auxSubFallback, err2 := SubscribeToNewTasksV2Retryable(&bind.WatchOpts{}, s.AvsContractBindings.ServiceManagerFallback, internalChannel, nil, retry.NetworkRetryParams()) + auxSubFallback, err2 = SubscribeToNewTasksV2Retryable(&bind.WatchOpts{}, s.AvsContractBindings.ServiceManagerFallback, internalChannel, nil, retry.NetworkRetryParams()) if err2 == nil { //subFallback.Unsubscribe() subFallback = auxSubFallback // update the subscription only if it was successful + s.logger.Info("Resubscribed to fallback new task subscription") } } } @@ -197,24 +203,30 @@ func (s *AvsSubscriber) SubscribeToNewTasksV3(newTaskCreatedChan chan *servicema // Handle errors and resubscribe go func() { + s.logger.Info("Starting error handling goroutine") var err1, err2 error - for err1 == nil || err2 == nil { + var auxSub, auxSubFallback event.Subscription + for err1 == nil || err2 == nil { //while one is active select { case err := <-sub.Err(): s.logger.Warn("Error in new task subscription", "err", err) + s.logger.Info("failed states:", "err1", err1, "err2", err2) - auxSub, err1 := SubscribeToNewTasksV3Retryable(&bind.WatchOpts{}, s.AvsContractBindings.ServiceManager, internalChannel, nil, retry.NetworkRetryParams()) + auxSub, err1 = SubscribeToNewTasksV3Retryable(&bind.WatchOpts{}, s.AvsContractBindings.ServiceManager, internalChannel, nil, retry.NetworkRetryParams()) if err1 == nil { //sub.Unsubscribe() sub = auxSub // update the subscription only if it was successful + s.logger.Info("Resubscribed to fallback new task subscription") } case err := <-subFallback.Err(): s.logger.Warn("Error in fallback new task subscription", "err", err) + s.logger.Info("failed states:", "err1", err1, "err2", err2) - auxSubFallback, err2 := SubscribeToNewTasksV3Retryable(&bind.WatchOpts{}, s.AvsContractBindings.ServiceManagerFallback, internalChannel, nil, retry.NetworkRetryParams()) + auxSubFallback, err2 = SubscribeToNewTasksV3Retryable(&bind.WatchOpts{}, s.AvsContractBindings.ServiceManagerFallback, internalChannel, nil, retry.NetworkRetryParams()) if err2 == nil { //subFallback.Unsubscribe() subFallback = auxSubFallback // update the subscription only if it was successful + s.logger.Info("Resubscribed to fallback new task subscription") } } } From 30b356bfb7dcc81c0166c550f86eac8054a1841c Mon Sep 17 00:00:00 2001 From: Urix <43704209+uri-99@users.noreply.github.com> Date: Fri, 10 Jan 2025 17:21:21 -0300 Subject: [PATCH 04/12] feat: return error pairs --- core/chainio/avs_subscriber.go | 83 ++++++++++++++++------------------ operator/pkg/operator.go | 38 ++++++++-------- 2 files changed, 59 insertions(+), 62 deletions(-) diff --git a/core/chainio/avs_subscriber.go b/core/chainio/avs_subscriber.go index bec938fdbe..3fe01e3c38 100644 --- a/core/chainio/avs_subscriber.go +++ b/core/chainio/avs_subscriber.go @@ -15,6 +15,7 @@ import ( retry "github.com/yetanotherco/aligned_layer/core" "github.com/yetanotherco/aligned_layer/core/config" + "fmt" sdklogging "github.com/Layr-Labs/eigensdk-go/logging" "github.com/ethereum/go-ethereum/crypto" ) @@ -44,6 +45,11 @@ type AvsSubscriber struct { logger sdklogging.Logger } +type ErrorPair struct { + ErrorMainRPC error + ErrorFallbackRPC error +} + func NewAvsSubscriberFromConfig(baseConfig *config.BaseConfig) (*AvsSubscriber, error) { avsContractBindings, err := NewAvsServiceBindings( baseConfig.AlignedLayerDeploymentConfig.AlignedLayerServiceManagerAddr, @@ -62,33 +68,28 @@ func NewAvsSubscriberFromConfig(baseConfig *config.BaseConfig) (*AvsSubscriber, }, nil } -func (s *AvsSubscriber) SubscribeToNewTasksV2(newTaskCreatedChan chan *servicemanager.ContractAlignedLayerServiceManagerNewBatchV2, errorChannel chan error) error { +func (s *AvsSubscriber) SubscribeToNewTasksV2(newTaskCreatedChan chan *servicemanager.ContractAlignedLayerServiceManagerNewBatchV2, errorPairChannel chan ErrorPair) *ErrorPair { // Create a new channel to receive new tasks internalChannel := make(chan *servicemanager.ContractAlignedLayerServiceManagerNewBatchV2) // Subscribe to new tasks - sub, err := SubscribeToNewTasksV2Retryable(&bind.WatchOpts{}, s.AvsContractBindings.ServiceManager, internalChannel, nil, retry.NetworkRetryParams()) - if err != nil { - s.logger.Error("Primary failed to subscribe to new AlignedLayer V2 tasks after %d retries", retry.NetworkNumRetries, "err", err) - //return err + sub, errMain := SubscribeToNewTasksV2Retryable(&bind.WatchOpts{}, s.AvsContractBindings.ServiceManager, internalChannel, nil, retry.NetworkRetryParams()) + if errMain != nil { + s.logger.Error(fmt.Sprintf("Fallback failed to subscribe to new AlignedLayer V3 tasks after %d retries", MaxRetries), "errMain", fmt.Sprintf("%v", errMain)) } subFallback, errFallback := SubscribeToNewTasksV2Retryable(&bind.WatchOpts{}, s.AvsContractBindings.ServiceManagerFallback, internalChannel, nil, retry.NetworkRetryParams()) if errFallback != nil { - s.logger.Error("Fallback failed to subscribe to new AlignedLayer V2 tasks after %d retries", retry.NetworkNumRetries, "err", err) - //return err + s.logger.Error(fmt.Sprintf("Fallback failed to subscribe to new AlignedLayer V3 tasks after %d retries", MaxRetries), "errFallback", fmt.Sprintf("%v", errFallback)) } - if err != nil && errFallback != nil { - s.logger.Error("Failed to subscribe to new AlignedLayer V2 tasks with both RPCs", "err", err, "errFallback", errFallback) - return err + if errMain != nil && errFallback != nil { + s.logger.Error("Failed to subscribe to new AlignedLayer V2 tasks with both RPCs", "errMain", errMain, "errFallback", errFallback) + return &ErrorPair{ErrorMainRPC: errMain, ErrorFallbackRPC: errFallback} } s.logger.Info("Subscribed to new AlignedLayer V2 tasks") - // create a new channel to foward errors - // errorChannel := make(chan error) - pollLatestBatchTicker := time.NewTicker(PollLatestBatchInterval) // Forward the new tasks to the provided channel @@ -116,66 +117,62 @@ func (s *AvsSubscriber) SubscribeToNewTasksV2(newTaskCreatedChan chan *servicema // Handle errors and resubscribe go func() { - var err1, err2 error + var errMain, errFallback error var auxSub, auxSubFallback event.Subscription - for err1 == nil || err2 == nil { //while one is active + for errMain == nil || errFallback == nil { //while one is active select { case err := <-sub.Err(): s.logger.Warn("Error in new task subscription", "err", err) - s.logger.Info("failed states:", "err1", err1, "err2", err2) + s.logger.Info("failed states:", "errMain", errMain, "errFallback", errFallback) - auxSub, err1 = SubscribeToNewTasksV2Retryable(&bind.WatchOpts{}, s.AvsContractBindings.ServiceManager, internalChannel, nil, retry.NetworkRetryParams()) - if err1 == nil { + auxSub, errMain = SubscribeToNewTasksV2Retryable(&bind.WatchOpts{}, s.AvsContractBindings.ServiceManager, internalChannel, nil, retry.NetworkRetryParams()) + if errMain == nil { //sub.Unsubscribe() sub = auxSub // update the subscription only if it was successful s.logger.Info("Resubscribed to fallback new task subscription") } case err := <-subFallback.Err(): - s.logger.Info("failed states:", "err1", err1, "err2", err2) + s.logger.Info("failed states:", "errMain", errMain, "errFallback", errFallback) s.logger.Warn("Error in fallback new task subscription", "err", err) - auxSubFallback, err2 = SubscribeToNewTasksV2Retryable(&bind.WatchOpts{}, s.AvsContractBindings.ServiceManagerFallback, internalChannel, nil, retry.NetworkRetryParams()) - if err2 == nil { + auxSubFallback, errFallback = SubscribeToNewTasksV2Retryable(&bind.WatchOpts{}, s.AvsContractBindings.ServiceManagerFallback, internalChannel, nil, retry.NetworkRetryParams()) + if errFallback == nil { //subFallback.Unsubscribe() subFallback = auxSubFallback // update the subscription only if it was successful s.logger.Info("Resubscribed to fallback new task subscription") } } } - errorChannel <- err1 + errorPairChannel <- ErrorPair{ErrorMainRPC: errMain, ErrorFallbackRPC: errFallback} }() return nil } -func (s *AvsSubscriber) SubscribeToNewTasksV3(newTaskCreatedChan chan *servicemanager.ContractAlignedLayerServiceManagerNewBatchV3, errorChannel chan error) error { +func (s *AvsSubscriber) SubscribeToNewTasksV3(newTaskCreatedChan chan *servicemanager.ContractAlignedLayerServiceManagerNewBatchV3, errorPairChannel chan ErrorPair) *ErrorPair { // Create a new channel to receive new tasks internalChannel := make(chan *servicemanager.ContractAlignedLayerServiceManagerNewBatchV3) s.logger.Info("Starting subscription to new AlignedLayer V3 tasks") // Subscribe to new tasks - sub, err := SubscribeToNewTasksV3Retryable(&bind.WatchOpts{}, s.AvsContractBindings.ServiceManager, internalChannel, nil, retry.NetworkRetryParams()) - if err != nil { - s.logger.Error("Primary failed to subscribe to new AlignedLayer V3 tasks after %d retries", MaxRetries, "err", err) + sub, errMain := SubscribeToNewTasksV3Retryable(&bind.WatchOpts{}, s.AvsContractBindings.ServiceManager, internalChannel, nil, retry.NetworkRetryParams()) + if errMain != nil { + s.logger.Error(fmt.Sprintf("Fallback failed to subscribe to new AlignedLayer V3 tasks after %d retries", MaxRetries), "err", fmt.Sprintf("%v", errMain)) //return err } subFallback, errFallback := SubscribeToNewTasksV3Retryable(&bind.WatchOpts{}, s.AvsContractBindings.ServiceManagerFallback, internalChannel, nil, retry.NetworkRetryParams()) if errFallback != nil { - s.logger.Error("Fallback failed to subscribe to new AlignedLayer V3 tasks after %d retries", MaxRetries, "err", err) - //return errFallback + s.logger.Error(fmt.Sprintf("Fallback failed to subscribe to new AlignedLayer V3 tasks after %d retries", MaxRetries), "err", fmt.Sprintf("%v", errFallback)) } - if err != nil && errFallback != nil { - s.logger.Error("Failed to subscribe to new AlignedLayer V3 tasks with both RPCs", "err", err, "errFallback", errFallback) - return err + if errMain != nil && errFallback != nil { + s.logger.Error("Failed to subscribe to new AlignedLayer V3 tasks with both RPCs", "errMain", errMain, "errFallback", errFallback) + return &ErrorPair{ErrorMainRPC: errMain, ErrorFallbackRPC: errFallback} } s.logger.Info("Subscribed to new AlignedLayer V3 tasks") - // create a new channel to foward errors - // errorChannel := make(chan error) - pollLatestBatchTicker := time.NewTicker(PollLatestBatchInterval) // Forward the new tasks to the provided channel @@ -204,33 +201,33 @@ func (s *AvsSubscriber) SubscribeToNewTasksV3(newTaskCreatedChan chan *servicema // Handle errors and resubscribe go func() { s.logger.Info("Starting error handling goroutine") - var err1, err2 error + var errMain, errFallback error var auxSub, auxSubFallback event.Subscription - for err1 == nil || err2 == nil { //while one is active + for errMain == nil || errFallback == nil { //while one is active select { case err := <-sub.Err(): s.logger.Warn("Error in new task subscription", "err", err) - s.logger.Info("failed states:", "err1", err1, "err2", err2) + s.logger.Info("failed states:", "errMain", errMain, "errFallback", errFallback) - auxSub, err1 = SubscribeToNewTasksV3Retryable(&bind.WatchOpts{}, s.AvsContractBindings.ServiceManager, internalChannel, nil, retry.NetworkRetryParams()) - if err1 == nil { + auxSub, errMain = SubscribeToNewTasksV3Retryable(&bind.WatchOpts{}, s.AvsContractBindings.ServiceManager, internalChannel, nil, retry.NetworkRetryParams()) + if errMain == nil { //sub.Unsubscribe() sub = auxSub // update the subscription only if it was successful s.logger.Info("Resubscribed to fallback new task subscription") } case err := <-subFallback.Err(): s.logger.Warn("Error in fallback new task subscription", "err", err) - s.logger.Info("failed states:", "err1", err1, "err2", err2) + s.logger.Info("failed states:", "errMain", errMain, "errFallback", errFallback) - auxSubFallback, err2 = SubscribeToNewTasksV3Retryable(&bind.WatchOpts{}, s.AvsContractBindings.ServiceManagerFallback, internalChannel, nil, retry.NetworkRetryParams()) - if err2 == nil { + auxSubFallback, errFallback = SubscribeToNewTasksV3Retryable(&bind.WatchOpts{}, s.AvsContractBindings.ServiceManagerFallback, internalChannel, nil, retry.NetworkRetryParams()) + if errFallback == nil { //subFallback.Unsubscribe() subFallback = auxSubFallback // update the subscription only if it was successful s.logger.Info("Resubscribed to fallback new task subscription") } } } - errorChannel <- err1 + errorPairChannel <- ErrorPair{ErrorMainRPC: errMain, ErrorFallbackRPC: errFallback} }() return nil diff --git a/operator/pkg/operator.go b/operator/pkg/operator.go index a4940ab90b..00d751ba6a 100644 --- a/operator/pkg/operator.go +++ b/operator/pkg/operator.go @@ -140,12 +140,12 @@ func NewOperatorFromConfig(configuration config.OperatorConfig) (*Operator, erro return operator, nil } -func (o *Operator) SubscribeToNewTasksV2(errorChan chan error) error { - return o.avsSubscriber.SubscribeToNewTasksV2(o.NewTaskCreatedChanV2, errorChan) +func (o *Operator) SubscribeToNewTasksV2(errorPairChan chan chainio.ErrorPair) *chainio.ErrorPair { + return o.avsSubscriber.SubscribeToNewTasksV2(o.NewTaskCreatedChanV2, errorPairChan) } -func (o *Operator) SubscribeToNewTasksV3(errorChan chan error) error { - return o.avsSubscriber.SubscribeToNewTasksV3(o.NewTaskCreatedChanV3, errorChan) +func (o *Operator) SubscribeToNewTasksV3(errorPairChan chan chainio.ErrorPair) *chainio.ErrorPair { + return o.avsSubscriber.SubscribeToNewTasksV3(o.NewTaskCreatedChanV3, errorPairChan) } type OperatorLastProcessedBatch struct { @@ -206,15 +206,15 @@ func (o *Operator) UpdateLastProcessBatch(blockNumber uint32) error { func (o *Operator) Start(ctx context.Context) error { // create a new channel to foward errors - subV2 := make(chan error) - err := o.SubscribeToNewTasksV2(subV2) - if err != nil { + subV2ErrorChannel := make(chan chainio.ErrorPair) + errorPairPtr := o.SubscribeToNewTasksV2(subV2ErrorChannel) + if errorPairPtr != nil { log.Fatal("Could not subscribe to new tasks") } - subV3 := make(chan error) - err = o.SubscribeToNewTasksV3(subV3) - if err != nil { + subV3ErrorChannel := make(chan chainio.ErrorPair) + errorPairPtr = o.SubscribeToNewTasksV3(subV3ErrorChannel) + if errorPairPtr != nil { log.Fatal("Could not subscribe to new tasks") } @@ -234,16 +234,16 @@ func (o *Operator) Start(ctx context.Context) error { return nil case err := <-metricsErrChan: o.Logger.Errorf("Metrics server failed", "err", err) - case err := <-subV2: - o.Logger.Infof("Error in websocket subscription", "err", err) - err = o.SubscribeToNewTasksV2(subV2) - if err != nil { + case errorPair := <-subV2ErrorChannel: + o.Logger.Infof("Error in websocket subscription", "err", errorPair) + errorPairPtr = o.SubscribeToNewTasksV2(subV2ErrorChannel) + if errorPairPtr != nil { o.Logger.Fatal("Could not subscribe to new tasks V2") } - case err := <-subV3: - o.Logger.Infof("Error in websocket subscription", "err", err) - err = o.SubscribeToNewTasksV3(subV3) - if err != nil { + case errorPair := <-subV3ErrorChannel: + o.Logger.Infof("Error in websocket subscription", "err", errorPair) + errorPairPtr = o.SubscribeToNewTasksV3(subV3ErrorChannel) + if errorPairPtr != nil { o.Logger.Fatal("Could not subscribe to new tasks V3") } case newBatchLogV2 := <-o.NewTaskCreatedChanV2: @@ -251,7 +251,7 @@ func (o *Operator) Start(ctx context.Context) error { case newBatchLogV3 := <-o.NewTaskCreatedChanV3: go o.handleNewBatchLogV3(newBatchLogV3) case blockNumber := <-o.lastProcessedBatch.batchProcessedChan: - err = o.UpdateLastProcessBatch(blockNumber) + err := o.UpdateLastProcessBatch(blockNumber) if err != nil { o.Logger.Errorf("Error while updating last process batch", "err", err) } From be8fd38003346c530f7d6a4f81da810690e2e411 Mon Sep 17 00:00:00 2001 From: Urix <43704209+uri-99@users.noreply.github.com> Date: Fri, 10 Jan 2025 20:13:13 -0300 Subject: [PATCH 05/12] fix: aggregator accepts ErrorPair --- aggregator/cmd/main.go | 4 ++-- aggregator/pkg/aggregator.go | 2 +- aggregator/pkg/subscriber.go | 28 ++++++++++++++-------------- 3 files changed, 17 insertions(+), 17 deletions(-) diff --git a/aggregator/cmd/main.go b/aggregator/cmd/main.go index 37ab559e0d..4702ca50d4 100644 --- a/aggregator/cmd/main.go +++ b/aggregator/cmd/main.go @@ -60,8 +60,8 @@ func aggregatorMain(ctx *cli.Context) error { // Listen for new task created in the ServiceManager contract in a separate goroutine, both V1 and V2 subscriptions: go func() { - listenErr := aggregator.SubscribeToNewTasks() - if listenErr != nil { + listenErrPair := aggregator.SubscribeToNewTasks() + if listenErrPair != nil { aggregatorConfig.BaseConfig.Logger.Fatal("Error subscribing for new tasks", "err", listenErr) } }() diff --git a/aggregator/pkg/aggregator.go b/aggregator/pkg/aggregator.go index 00c7efd9c0..a7ed7e8a2a 100644 --- a/aggregator/pkg/aggregator.go +++ b/aggregator/pkg/aggregator.go @@ -46,7 +46,7 @@ type Aggregator struct { avsReader *chainio.AvsReader avsSubscriber *chainio.AvsSubscriber avsWriter *chainio.AvsWriter - taskSubscriber chan error + taskSubscriber chan chainio.ErrorPair blsAggregationService blsagg.BlsAggregationService // BLS Signature Service returns an Index diff --git a/aggregator/pkg/subscriber.go b/aggregator/pkg/subscriber.go index a2292d31fc..d19cd8fbcc 100644 --- a/aggregator/pkg/subscriber.go +++ b/aggregator/pkg/subscriber.go @@ -1,18 +1,20 @@ package pkg -func (agg *Aggregator) SubscribeToNewTasks() error { - err := agg.subscribeToNewTasks() - if err != nil { - return err +import "github.com/yetanotherco/aligned_layer/core/chainio" + +func (agg *Aggregator) SubscribeToNewTasks() *chainio.ErrorPair { + errorPairPtr := agg.subscribeToNewTasks() + if errorPairPtr != nil { + return errorPairPtr } for { select { case err := <-agg.taskSubscriber: agg.AggregatorConfig.BaseConfig.Logger.Info("Failed to subscribe to new tasks", "err", err) - err = agg.subscribeToNewTasks() - if err != nil { - return err + errorPairPtr = agg.subscribeToNewTasks() + if errorPairPtr != nil { + return errorPairPtr } case newBatch := <-agg.NewBatchChan: agg.AggregatorConfig.BaseConfig.Logger.Info("Adding new task") @@ -21,14 +23,12 @@ func (agg *Aggregator) SubscribeToNewTasks() error { } } -func (agg *Aggregator) subscribeToNewTasks() error { - var err error - //subV3 := make(chan error) - err = agg.avsSubscriber.SubscribeToNewTasksV3(agg.NewBatchChan, agg.taskSubscriber) +func (agg *Aggregator) subscribeToNewTasks() *chainio.ErrorPair { + errorPairPtr := agg.avsSubscriber.SubscribeToNewTasksV3(agg.NewBatchChan, agg.taskSubscriber) - if err != nil { - agg.AggregatorConfig.BaseConfig.Logger.Info("Failed to create task subscriber", "err", err) + if errorPairPtr != nil { + agg.AggregatorConfig.BaseConfig.Logger.Info("Failed to create task subscriber", "err", errorPairPtr) } - return err + return errorPairPtr } From 380488c2b48a1ab25cf2f79dfe8b10a60bb09193 Mon Sep 17 00:00:00 2001 From: Urix <43704209+uri-99@users.noreply.github.com> Date: Fri, 10 Jan 2025 20:13:22 -0300 Subject: [PATCH 06/12] feat: operator uses fallback to check DisabledVerifiers --- core/chainio/avs_reader.go | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/core/chainio/avs_reader.go b/core/chainio/avs_reader.go index ae2ea0a9da..95dccd63e6 100644 --- a/core/chainio/avs_reader.go +++ b/core/chainio/avs_reader.go @@ -72,7 +72,15 @@ func (r *AvsReader) IsOperatorRegistered(address ethcommon.Address) (bool, error } func (r *AvsReader) DisabledVerifiers() (*big.Int, error) { - return r.AvsContractBindings.ServiceManager.ContractAlignedLayerServiceManagerCaller.DisabledVerifiers(&bind.CallOpts{}) + num, err := r.AvsContractBindings.ServiceManager.ContractAlignedLayerServiceManagerCaller.DisabledVerifiers(&bind.CallOpts{}) + if err != nil { + // Retry with fallback client + num, err = r.AvsContractBindings.ServiceManagerFallback.ContractAlignedLayerServiceManagerCaller.DisabledVerifiers(&bind.CallOpts{}) + if err != nil { + r.logger.Error("Failed to fetch DisabledVerifiers", "err", err) + } + } + return num, err } // Returns all the "NewBatchV3" logs that have not been responded starting from the given block number From ac280b7ddaf2229117dab5f904fb79d6f1d535e9 Mon Sep 17 00:00:00 2001 From: Urix <43704209+uri-99@users.noreply.github.com> Date: Mon, 13 Jan 2025 11:22:45 -0300 Subject: [PATCH 07/12] fix: var name --- aggregator/cmd/main.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/aggregator/cmd/main.go b/aggregator/cmd/main.go index 4702ca50d4..d82daa8f87 100644 --- a/aggregator/cmd/main.go +++ b/aggregator/cmd/main.go @@ -62,7 +62,7 @@ func aggregatorMain(ctx *cli.Context) error { go func() { listenErrPair := aggregator.SubscribeToNewTasks() if listenErrPair != nil { - aggregatorConfig.BaseConfig.Logger.Fatal("Error subscribing for new tasks", "err", listenErr) + aggregatorConfig.BaseConfig.Logger.Fatal("Error subscribing for new tasks", "err", listenErrPair) } }() From 1d07b90fc980fb177901b302354b604e4f10101e Mon Sep 17 00:00:00 2001 From: Urix <43704209+uri-99@users.noreply.github.com> Date: Tue, 21 Jan 2025 18:24:03 -0300 Subject: [PATCH 08/12] refactor: errorPairPtr to errorPair --- aggregator/pkg/subscriber.go | 20 ++++++++++---------- operator/pkg/operator.go | 16 ++++++++-------- 2 files changed, 18 insertions(+), 18 deletions(-) diff --git a/aggregator/pkg/subscriber.go b/aggregator/pkg/subscriber.go index d19cd8fbcc..7c7c092c29 100644 --- a/aggregator/pkg/subscriber.go +++ b/aggregator/pkg/subscriber.go @@ -3,18 +3,18 @@ package pkg import "github.com/yetanotherco/aligned_layer/core/chainio" func (agg *Aggregator) SubscribeToNewTasks() *chainio.ErrorPair { - errorPairPtr := agg.subscribeToNewTasks() - if errorPairPtr != nil { - return errorPairPtr + errorPair := agg.subscribeToNewTasks() + if errorPair != nil { + return errorPair } for { select { case err := <-agg.taskSubscriber: agg.AggregatorConfig.BaseConfig.Logger.Info("Failed to subscribe to new tasks", "err", err) - errorPairPtr = agg.subscribeToNewTasks() - if errorPairPtr != nil { - return errorPairPtr + errorPair = agg.subscribeToNewTasks() + if errorPair != nil { + return errorPair } case newBatch := <-agg.NewBatchChan: agg.AggregatorConfig.BaseConfig.Logger.Info("Adding new task") @@ -24,11 +24,11 @@ func (agg *Aggregator) SubscribeToNewTasks() *chainio.ErrorPair { } func (agg *Aggregator) subscribeToNewTasks() *chainio.ErrorPair { - errorPairPtr := agg.avsSubscriber.SubscribeToNewTasksV3(agg.NewBatchChan, agg.taskSubscriber) + errorPair := agg.avsSubscriber.SubscribeToNewTasksV3(agg.NewBatchChan, agg.taskSubscriber) - if errorPairPtr != nil { - agg.AggregatorConfig.BaseConfig.Logger.Info("Failed to create task subscriber", "err", errorPairPtr) + if errorPair != nil { + agg.AggregatorConfig.BaseConfig.Logger.Info("Failed to create task subscriber", "err", errorPair) } - return errorPairPtr + return errorPair } diff --git a/operator/pkg/operator.go b/operator/pkg/operator.go index 00d751ba6a..91413111e5 100644 --- a/operator/pkg/operator.go +++ b/operator/pkg/operator.go @@ -207,14 +207,14 @@ func (o *Operator) UpdateLastProcessBatch(blockNumber uint32) error { func (o *Operator) Start(ctx context.Context) error { // create a new channel to foward errors subV2ErrorChannel := make(chan chainio.ErrorPair) - errorPairPtr := o.SubscribeToNewTasksV2(subV2ErrorChannel) - if errorPairPtr != nil { + errorPair := o.SubscribeToNewTasksV2(subV2ErrorChannel) + if errorPair != nil { log.Fatal("Could not subscribe to new tasks") } subV3ErrorChannel := make(chan chainio.ErrorPair) - errorPairPtr = o.SubscribeToNewTasksV3(subV3ErrorChannel) - if errorPairPtr != nil { + errorPair = o.SubscribeToNewTasksV3(subV3ErrorChannel) + if errorPair != nil { log.Fatal("Could not subscribe to new tasks") } @@ -236,14 +236,14 @@ func (o *Operator) Start(ctx context.Context) error { o.Logger.Errorf("Metrics server failed", "err", err) case errorPair := <-subV2ErrorChannel: o.Logger.Infof("Error in websocket subscription", "err", errorPair) - errorPairPtr = o.SubscribeToNewTasksV2(subV2ErrorChannel) - if errorPairPtr != nil { + errorPair = o.SubscribeToNewTasksV2(subV2ErrorChannel) + if errorPair != nil { o.Logger.Fatal("Could not subscribe to new tasks V2") } case errorPair := <-subV3ErrorChannel: o.Logger.Infof("Error in websocket subscription", "err", errorPair) - errorPairPtr = o.SubscribeToNewTasksV3(subV3ErrorChannel) - if errorPairPtr != nil { + errorPair = o.SubscribeToNewTasksV3(subV3ErrorChannel) + if errorPair != nil { o.Logger.Fatal("Could not subscribe to new tasks V3") } case newBatchLogV2 := <-o.NewTaskCreatedChanV2: From d29be3a0b2da853ddc741c2d3a64a6869e010def Mon Sep 17 00:00:00 2001 From: Urix <43704209+uri-99@users.noreply.github.com> Date: Tue, 21 Jan 2025 18:27:45 -0300 Subject: [PATCH 09/12] fix: remove infos and added which connect errored in the warn --- core/chainio/avs_subscriber.go | 16 ++++------------ 1 file changed, 4 insertions(+), 12 deletions(-) diff --git a/core/chainio/avs_subscriber.go b/core/chainio/avs_subscriber.go index 3fe01e3c38..b52efcff62 100644 --- a/core/chainio/avs_subscriber.go +++ b/core/chainio/avs_subscriber.go @@ -122,22 +122,18 @@ func (s *AvsSubscriber) SubscribeToNewTasksV2(newTaskCreatedChan chan *servicema for errMain == nil || errFallback == nil { //while one is active select { case err := <-sub.Err(): - s.logger.Warn("Error in new task subscription", "err", err) - s.logger.Info("failed states:", "errMain", errMain, "errFallback", errFallback) + s.logger.Warn("Error in new task subscription of main connection", "err", err) auxSub, errMain = SubscribeToNewTasksV2Retryable(&bind.WatchOpts{}, s.AvsContractBindings.ServiceManager, internalChannel, nil, retry.NetworkRetryParams()) if errMain == nil { - //sub.Unsubscribe() sub = auxSub // update the subscription only if it was successful s.logger.Info("Resubscribed to fallback new task subscription") } case err := <-subFallback.Err(): - s.logger.Info("failed states:", "errMain", errMain, "errFallback", errFallback) - s.logger.Warn("Error in fallback new task subscription", "err", err) + s.logger.Warn("Error in new task subscription of fallback connection", "err", err) auxSubFallback, errFallback = SubscribeToNewTasksV2Retryable(&bind.WatchOpts{}, s.AvsContractBindings.ServiceManagerFallback, internalChannel, nil, retry.NetworkRetryParams()) if errFallback == nil { - //subFallback.Unsubscribe() subFallback = auxSubFallback // update the subscription only if it was successful s.logger.Info("Resubscribed to fallback new task subscription") } @@ -206,22 +202,18 @@ func (s *AvsSubscriber) SubscribeToNewTasksV3(newTaskCreatedChan chan *servicema for errMain == nil || errFallback == nil { //while one is active select { case err := <-sub.Err(): - s.logger.Warn("Error in new task subscription", "err", err) - s.logger.Info("failed states:", "errMain", errMain, "errFallback", errFallback) + s.logger.Warn("Error in new task subscription of main connection", "err", err) auxSub, errMain = SubscribeToNewTasksV3Retryable(&bind.WatchOpts{}, s.AvsContractBindings.ServiceManager, internalChannel, nil, retry.NetworkRetryParams()) if errMain == nil { - //sub.Unsubscribe() sub = auxSub // update the subscription only if it was successful s.logger.Info("Resubscribed to fallback new task subscription") } case err := <-subFallback.Err(): - s.logger.Warn("Error in fallback new task subscription", "err", err) - s.logger.Info("failed states:", "errMain", errMain, "errFallback", errFallback) + s.logger.Warn("Error in new task subscription of fallback connection", "err", err) auxSubFallback, errFallback = SubscribeToNewTasksV3Retryable(&bind.WatchOpts{}, s.AvsContractBindings.ServiceManagerFallback, internalChannel, nil, retry.NetworkRetryParams()) if errFallback == nil { - //subFallback.Unsubscribe() subFallback = auxSubFallback // update the subscription only if it was successful s.logger.Info("Resubscribed to fallback new task subscription") } From ad8b0bc93b6141d5003fecf709ab520e8ce7b7c1 Mon Sep 17 00:00:00 2001 From: Urix <43704209+uri-99@users.noreply.github.com> Date: Tue, 21 Jan 2025 18:46:22 -0300 Subject: [PATCH 10/12] fix: errorPtr only when necesarry --- operator/pkg/operator.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/operator/pkg/operator.go b/operator/pkg/operator.go index 91413111e5..607249d8f9 100644 --- a/operator/pkg/operator.go +++ b/operator/pkg/operator.go @@ -236,14 +236,14 @@ func (o *Operator) Start(ctx context.Context) error { o.Logger.Errorf("Metrics server failed", "err", err) case errorPair := <-subV2ErrorChannel: o.Logger.Infof("Error in websocket subscription", "err", errorPair) - errorPair = o.SubscribeToNewTasksV2(subV2ErrorChannel) - if errorPair != nil { + errorPairPtr := o.SubscribeToNewTasksV2(subV2ErrorChannel) + if errorPairPtr != nil { o.Logger.Fatal("Could not subscribe to new tasks V2") } case errorPair := <-subV3ErrorChannel: o.Logger.Infof("Error in websocket subscription", "err", errorPair) - errorPair = o.SubscribeToNewTasksV3(subV3ErrorChannel) - if errorPair != nil { + errorPairPtr := o.SubscribeToNewTasksV3(subV3ErrorChannel) + if errorPairPtr != nil { o.Logger.Fatal("Could not subscribe to new tasks V3") } case newBatchLogV2 := <-o.NewTaskCreatedChanV2: From b0c2d149fe6b84b59e118d8d98f9d786621e843a Mon Sep 17 00:00:00 2001 From: Marcos Nicolau Date: Wed, 28 May 2025 16:11:42 -0300 Subject: [PATCH 11/12] chore: incremented eth package node count --- config-files/config-operator-1-ethereum-package.yaml | 8 ++++---- network_params.yaml | 2 +- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/config-files/config-operator-1-ethereum-package.yaml b/config-files/config-operator-1-ethereum-package.yaml index 27e11d3c2f..b19416d774 100644 --- a/config-files/config-operator-1-ethereum-package.yaml +++ b/config-files/config-operator-1-ethereum-package.yaml @@ -3,10 +3,10 @@ environment: 'development' aligned_layer_deployment_config_file_path: './contracts/script/output/devnet/alignedlayer_deployment_output.json' eigen_layer_deployment_config_file_path: './contracts/script/output/devnet/eigenlayer_deployment_output.json' -eth_rpc_url: "http://localhost:8545" -eth_rpc_url_fallback: "http://localhost:8552" -eth_ws_url: "ws://localhost:8546" -eth_ws_url_fallback: "ws://localhost:8553" +eth_rpc_url: "http://localhost:8552" +eth_rpc_url_fallback: "http://localhost:8559" +eth_ws_url: "ws://localhost:8553" +eth_ws_url_fallback: "ws://localhost:8560" eigen_metrics_ip_port_address: 'localhost:9090' ## ECDSA Configurations diff --git a/network_params.yaml b/network_params.yaml index e67b82d645..ecdeeeab1d 100644 --- a/network_params.yaml +++ b/network_params.yaml @@ -1,7 +1,7 @@ participants: - el_type: reth cl_type: lighthouse - count: 2 + count: 3 validator_count: 32 ethereum_metrics_exporter_enabled: true From 3026aa5cc7a7cc476ba87d69360f1d36640b763f Mon Sep 17 00:00:00 2001 From: Julian Arce <52429267+JuArce@users.noreply.github.com> Date: Wed, 4 Jun 2025 10:44:33 -0300 Subject: [PATCH 12/12] Apply suggestions from code review Co-authored-by: Marcos Nicolau <76252340+MarcosNicolau@users.noreply.github.com> --- core/chainio/avs_subscriber.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/core/chainio/avs_subscriber.go b/core/chainio/avs_subscriber.go index b52efcff62..59d878d0b0 100644 --- a/core/chainio/avs_subscriber.go +++ b/core/chainio/avs_subscriber.go @@ -127,7 +127,7 @@ func (s *AvsSubscriber) SubscribeToNewTasksV2(newTaskCreatedChan chan *servicema auxSub, errMain = SubscribeToNewTasksV2Retryable(&bind.WatchOpts{}, s.AvsContractBindings.ServiceManager, internalChannel, nil, retry.NetworkRetryParams()) if errMain == nil { sub = auxSub // update the subscription only if it was successful - s.logger.Info("Resubscribed to fallback new task subscription") + s.logger.Info("Main connection resubscribed to new task subscription") } case err := <-subFallback.Err(): s.logger.Warn("Error in new task subscription of fallback connection", "err", err) @@ -154,7 +154,6 @@ func (s *AvsSubscriber) SubscribeToNewTasksV3(newTaskCreatedChan chan *servicema sub, errMain := SubscribeToNewTasksV3Retryable(&bind.WatchOpts{}, s.AvsContractBindings.ServiceManager, internalChannel, nil, retry.NetworkRetryParams()) if errMain != nil { s.logger.Error(fmt.Sprintf("Fallback failed to subscribe to new AlignedLayer V3 tasks after %d retries", MaxRetries), "err", fmt.Sprintf("%v", errMain)) - //return err } subFallback, errFallback := SubscribeToNewTasksV3Retryable(&bind.WatchOpts{}, s.AvsContractBindings.ServiceManagerFallback, internalChannel, nil, retry.NetworkRetryParams())