Skip to content
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions aggregator/cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,9 +60,9 @@ func aggregatorMain(ctx *cli.Context) error {

// Listen for new task created in the ServiceManager contract in a separate goroutine, both V1 and V2 subscriptions:
go func() {
listenErr := aggregator.SubscribeToNewTasks()
if listenErr != nil {
aggregatorConfig.BaseConfig.Logger.Fatal("Error subscribing for new tasks", "err", listenErr)
listenErrPair := aggregator.SubscribeToNewTasks()
if listenErrPair != nil {
aggregatorConfig.BaseConfig.Logger.Fatal("Error subscribing for new tasks", "err", listenErrPair)
}
}()

Expand Down
2 changes: 1 addition & 1 deletion aggregator/pkg/aggregator.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ type Aggregator struct {
avsReader *chainio.AvsReader
avsSubscriber *chainio.AvsSubscriber
avsWriter *chainio.AvsWriter
taskSubscriber chan error
taskSubscriber chan chainio.ErrorPair
blsAggregationService blsagg.BlsAggregationService

// BLS Signature Service returns an Index
Expand Down
28 changes: 14 additions & 14 deletions aggregator/pkg/subscriber.go
Original file line number Diff line number Diff line change
@@ -1,18 +1,20 @@
package pkg

func (agg *Aggregator) SubscribeToNewTasks() error {
err := agg.subscribeToNewTasks()
if err != nil {
return err
import "github.com/yetanotherco/aligned_layer/core/chainio"

func (agg *Aggregator) SubscribeToNewTasks() *chainio.ErrorPair {
errorPairPtr := agg.subscribeToNewTasks()
Comment thread
uri-99 marked this conversation as resolved.
Outdated
if errorPairPtr != nil {
return errorPairPtr
}

for {
select {
case err := <-agg.taskSubscriber:
agg.AggregatorConfig.BaseConfig.Logger.Info("Failed to subscribe to new tasks", "err", err)
err = agg.subscribeToNewTasks()
if err != nil {
return err
errorPairPtr = agg.subscribeToNewTasks()
if errorPairPtr != nil {
return errorPairPtr
}
case newBatch := <-agg.NewBatchChan:
agg.AggregatorConfig.BaseConfig.Logger.Info("Adding new task")
Expand All @@ -21,14 +23,12 @@ func (agg *Aggregator) SubscribeToNewTasks() error {
}
}

func (agg *Aggregator) subscribeToNewTasks() error {
var err error

agg.taskSubscriber, err = agg.avsSubscriber.SubscribeToNewTasksV3(agg.NewBatchChan)
func (agg *Aggregator) subscribeToNewTasks() *chainio.ErrorPair {
errorPairPtr := agg.avsSubscriber.SubscribeToNewTasksV3(agg.NewBatchChan, agg.taskSubscriber)

if err != nil {
agg.AggregatorConfig.BaseConfig.Logger.Info("Failed to create task subscriber", "err", err)
if errorPairPtr != nil {
agg.AggregatorConfig.BaseConfig.Logger.Info("Failed to create task subscriber", "err", errorPairPtr)
}

return err
return errorPairPtr
}
10 changes: 9 additions & 1 deletion core/chainio/avs_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,15 @@ func (r *AvsReader) IsOperatorRegistered(address ethcommon.Address) (bool, error
}

func (r *AvsReader) DisabledVerifiers() (*big.Int, error) {
return r.AvsContractBindings.ServiceManager.ContractAlignedLayerServiceManagerCaller.DisabledVerifiers(&bind.CallOpts{})
num, err := r.AvsContractBindings.ServiceManager.ContractAlignedLayerServiceManagerCaller.DisabledVerifiers(&bind.CallOpts{})
if err != nil {
// Retry with fallback client
num, err = r.AvsContractBindings.ServiceManagerFallback.ContractAlignedLayerServiceManagerCaller.DisabledVerifiers(&bind.CallOpts{})
if err != nil {
r.logger.Error("Failed to fetch DisabledVerifiers", "err", err)
}
}
return num, err
}

// Returns all the "NewBatchV3" logs that have not been responded starting from the given block number
Expand Down
118 changes: 74 additions & 44 deletions core/chainio/avs_subscriber.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,14 @@ import (

"github.com/ethereum/go-ethereum/accounts/abi/bind"
ethcommon "github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/event"

"github.com/ethereum/go-ethereum/core/types"
servicemanager "github.com/yetanotherco/aligned_layer/contracts/bindings/AlignedLayerServiceManager"
retry "github.com/yetanotherco/aligned_layer/core"
"github.com/yetanotherco/aligned_layer/core/config"

"fmt"
sdklogging "github.com/Layr-Labs/eigensdk-go/logging"
"github.com/ethereum/go-ethereum/crypto"
)
Expand Down Expand Up @@ -43,6 +45,11 @@ type AvsSubscriber struct {
logger sdklogging.Logger
}

type ErrorPair struct {
ErrorMainRPC error
ErrorFallbackRPC error
}

func NewAvsSubscriberFromConfig(baseConfig *config.BaseConfig) (*AvsSubscriber, error) {
avsContractBindings, err := NewAvsServiceBindings(
baseConfig.AlignedLayerDeploymentConfig.AlignedLayerServiceManagerAddr,
Expand All @@ -61,26 +68,27 @@ func NewAvsSubscriberFromConfig(baseConfig *config.BaseConfig) (*AvsSubscriber,
}, nil
}

func (s *AvsSubscriber) SubscribeToNewTasksV2(newTaskCreatedChan chan *servicemanager.ContractAlignedLayerServiceManagerNewBatchV2) (chan error, error) {
func (s *AvsSubscriber) SubscribeToNewTasksV2(newTaskCreatedChan chan *servicemanager.ContractAlignedLayerServiceManagerNewBatchV2, errorPairChannel chan ErrorPair) *ErrorPair {
// Create a new channel to receive new tasks
internalChannel := make(chan *servicemanager.ContractAlignedLayerServiceManagerNewBatchV2)

// Subscribe to new tasks
sub, err := SubscribeToNewTasksV2Retryable(&bind.WatchOpts{}, s.AvsContractBindings.ServiceManager, internalChannel, nil, retry.NetworkRetryParams())
if err != nil {
s.logger.Error("Primary failed to subscribe to new AlignedLayer V2 tasks after %d retries", retry.NetworkNumRetries, "err", err)
return nil, err
sub, errMain := SubscribeToNewTasksV2Retryable(&bind.WatchOpts{}, s.AvsContractBindings.ServiceManager, internalChannel, nil, retry.NetworkRetryParams())
if errMain != nil {
s.logger.Error(fmt.Sprintf("Fallback failed to subscribe to new AlignedLayer V3 tasks after %d retries", MaxRetries), "errMain", fmt.Sprintf("%v", errMain))
}

subFallback, err := SubscribeToNewTasksV2Retryable(&bind.WatchOpts{}, s.AvsContractBindings.ServiceManagerFallback, internalChannel, nil, retry.NetworkRetryParams())
if err != nil {
s.logger.Error("Fallback failed to subscribe to new AlignedLayer V2 tasks after %d retries", retry.NetworkNumRetries, "err", err)
return nil, err
subFallback, errFallback := SubscribeToNewTasksV2Retryable(&bind.WatchOpts{}, s.AvsContractBindings.ServiceManagerFallback, internalChannel, nil, retry.NetworkRetryParams())
if errFallback != nil {
s.logger.Error(fmt.Sprintf("Fallback failed to subscribe to new AlignedLayer V3 tasks after %d retries", MaxRetries), "errFallback", fmt.Sprintf("%v", errFallback))
}
s.logger.Info("Subscribed to new AlignedLayer V2 tasks")

// create a new channel to foward errors
errorChannel := make(chan error)
if errMain != nil && errFallback != nil {
s.logger.Error("Failed to subscribe to new AlignedLayer V2 tasks with both RPCs", "errMain", errMain, "errFallback", errFallback)
return &ErrorPair{ErrorMainRPC: errMain, ErrorFallbackRPC: errFallback}
}

s.logger.Info("Subscribed to new AlignedLayer V2 tasks")

pollLatestBatchTicker := time.NewTicker(PollLatestBatchInterval)

Expand Down Expand Up @@ -109,49 +117,61 @@ func (s *AvsSubscriber) SubscribeToNewTasksV2(newTaskCreatedChan chan *servicema

// Handle errors and resubscribe
go func() {
for {
var errMain, errFallback error
var auxSub, auxSubFallback event.Subscription
for errMain == nil || errFallback == nil { //while one is active
select {
case err := <-sub.Err():
s.logger.Warn("Error in new task subscription", "err", err)
Comment thread
uri-99 marked this conversation as resolved.
Outdated
sub.Unsubscribe()
sub, err = SubscribeToNewTasksV2Retryable(&bind.WatchOpts{}, s.AvsContractBindings.ServiceManager, internalChannel, nil, retry.NetworkRetryParams())
if err != nil {
errorChannel <- err
s.logger.Info("failed states:", "errMain", errMain, "errFallback", errFallback)
Comment thread
uri-99 marked this conversation as resolved.
Outdated

auxSub, errMain = SubscribeToNewTasksV2Retryable(&bind.WatchOpts{}, s.AvsContractBindings.ServiceManager, internalChannel, nil, retry.NetworkRetryParams())
if errMain == nil {
//sub.Unsubscribe()
sub = auxSub // update the subscription only if it was successful
s.logger.Info("Resubscribed to fallback new task subscription")
Comment thread
JuArce marked this conversation as resolved.
Outdated
}
case err := <-subFallback.Err():
s.logger.Info("failed states:", "errMain", errMain, "errFallback", errFallback)
Comment thread
uri-99 marked this conversation as resolved.
Outdated
s.logger.Warn("Error in fallback new task subscription", "err", err)
subFallback.Unsubscribe()
subFallback, err = SubscribeToNewTasksV2Retryable(&bind.WatchOpts{}, s.AvsContractBindings.ServiceManagerFallback, internalChannel, nil, retry.NetworkRetryParams())
if err != nil {
errorChannel <- err

auxSubFallback, errFallback = SubscribeToNewTasksV2Retryable(&bind.WatchOpts{}, s.AvsContractBindings.ServiceManagerFallback, internalChannel, nil, retry.NetworkRetryParams())
if errFallback == nil {
//subFallback.Unsubscribe()
subFallback = auxSubFallback // update the subscription only if it was successful
s.logger.Info("Resubscribed to fallback new task subscription")
}
}
}
errorPairChannel <- ErrorPair{ErrorMainRPC: errMain, ErrorFallbackRPC: errFallback}
}()

return errorChannel, nil
return nil
}

func (s *AvsSubscriber) SubscribeToNewTasksV3(newTaskCreatedChan chan *servicemanager.ContractAlignedLayerServiceManagerNewBatchV3) (chan error, error) {
func (s *AvsSubscriber) SubscribeToNewTasksV3(newTaskCreatedChan chan *servicemanager.ContractAlignedLayerServiceManagerNewBatchV3, errorPairChannel chan ErrorPair) *ErrorPair {
// Create a new channel to receive new tasks
internalChannel := make(chan *servicemanager.ContractAlignedLayerServiceManagerNewBatchV3)

s.logger.Info("Starting subscription to new AlignedLayer V3 tasks")
// Subscribe to new tasks
sub, err := SubscribeToNewTasksV3Retryable(&bind.WatchOpts{}, s.AvsContractBindings.ServiceManager, internalChannel, nil, retry.NetworkRetryParams())
if err != nil {
s.logger.Error("Primary failed to subscribe to new AlignedLayer V3 tasks after %d retries", MaxRetries, "err", err)
return nil, err
sub, errMain := SubscribeToNewTasksV3Retryable(&bind.WatchOpts{}, s.AvsContractBindings.ServiceManager, internalChannel, nil, retry.NetworkRetryParams())
if errMain != nil {
s.logger.Error(fmt.Sprintf("Fallback failed to subscribe to new AlignedLayer V3 tasks after %d retries", MaxRetries), "err", fmt.Sprintf("%v", errMain))
//return err
Comment thread
JuArce marked this conversation as resolved.
Outdated
}

subFallback, err := SubscribeToNewTasksV3Retryable(&bind.WatchOpts{}, s.AvsContractBindings.ServiceManagerFallback, internalChannel, nil, retry.NetworkRetryParams())
if err != nil {
s.logger.Error("Fallback failed to subscribe to new AlignedLayer V3 tasks after %d retries", MaxRetries, "err", err)
return nil, err
subFallback, errFallback := SubscribeToNewTasksV3Retryable(&bind.WatchOpts{}, s.AvsContractBindings.ServiceManagerFallback, internalChannel, nil, retry.NetworkRetryParams())
if errFallback != nil {
s.logger.Error(fmt.Sprintf("Fallback failed to subscribe to new AlignedLayer V3 tasks after %d retries", MaxRetries), "err", fmt.Sprintf("%v", errFallback))
}
s.logger.Info("Subscribed to new AlignedLayer V3 tasks")

// create a new channel to foward errors
errorChannel := make(chan error)
if errMain != nil && errFallback != nil {
s.logger.Error("Failed to subscribe to new AlignedLayer V3 tasks with both RPCs", "errMain", errMain, "errFallback", errFallback)
return &ErrorPair{ErrorMainRPC: errMain, ErrorFallbackRPC: errFallback}
}

s.logger.Info("Subscribed to new AlignedLayer V3 tasks")

pollLatestBatchTicker := time.NewTicker(PollLatestBatchInterval)

Expand Down Expand Up @@ -180,27 +200,37 @@ func (s *AvsSubscriber) SubscribeToNewTasksV3(newTaskCreatedChan chan *servicema

// Handle errors and resubscribe
go func() {
for {
s.logger.Info("Starting error handling goroutine")
var errMain, errFallback error
var auxSub, auxSubFallback event.Subscription
for errMain == nil || errFallback == nil { //while one is active
select {
case err := <-sub.Err():
s.logger.Warn("Error in new task subscription", "err", err)
sub.Unsubscribe()
sub, err = SubscribeToNewTasksV3Retryable(&bind.WatchOpts{}, s.AvsContractBindings.ServiceManager, internalChannel, nil, retry.NetworkRetryParams())
if err != nil {
errorChannel <- err
s.logger.Info("failed states:", "errMain", errMain, "errFallback", errFallback)

auxSub, errMain = SubscribeToNewTasksV3Retryable(&bind.WatchOpts{}, s.AvsContractBindings.ServiceManager, internalChannel, nil, retry.NetworkRetryParams())
if errMain == nil {
//sub.Unsubscribe()
sub = auxSub // update the subscription only if it was successful
s.logger.Info("Resubscribed to fallback new task subscription")
}
case err := <-subFallback.Err():
s.logger.Warn("Error in fallback new task subscription", "err", err)
subFallback.Unsubscribe()
subFallback, err = SubscribeToNewTasksV3Retryable(&bind.WatchOpts{}, s.AvsContractBindings.ServiceManagerFallback, internalChannel, nil, retry.NetworkRetryParams())
if err != nil {
errorChannel <- err
s.logger.Info("failed states:", "errMain", errMain, "errFallback", errFallback)

auxSubFallback, errFallback = SubscribeToNewTasksV3Retryable(&bind.WatchOpts{}, s.AvsContractBindings.ServiceManagerFallback, internalChannel, nil, retry.NetworkRetryParams())
if errFallback == nil {
//subFallback.Unsubscribe()
subFallback = auxSubFallback // update the subscription only if it was successful
s.logger.Info("Resubscribed to fallback new task subscription")
}
}
}
errorPairChannel <- ErrorPair{ErrorMainRPC: errMain, ErrorFallbackRPC: errFallback}
}()

return errorChannel, nil
return nil
}

func (s *AvsSubscriber) processNewBatchV2(batch *servicemanager.ContractAlignedLayerServiceManagerNewBatchV2, batchesSet map[[32]byte]struct{}, newBatchMutex *sync.Mutex, newTaskCreatedChan chan<- *servicemanager.ContractAlignedLayerServiceManagerNewBatchV2) {
Expand Down
3 changes: 3 additions & 0 deletions core/chainio/retryable.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package chainio

import (
"context"
"github.com/rs/zerolog/log"
"math/big"

"github.com/ethereum/go-ethereum"
Expand Down Expand Up @@ -206,6 +207,7 @@ func SubscribeToNewTasksV2Retryable(
config *retry.RetryParams,
) (event.Subscription, error) {
subscribe_func := func() (event.Subscription, error) {
log.Info().Msg("Subscribing to NewBatchV2")
return serviceManager.WatchNewBatchV2(opts, newTaskCreatedChan, batchMerkleRoot)
}
return retry.RetryWithData(subscribe_func, config)
Expand All @@ -225,6 +227,7 @@ func SubscribeToNewTasksV3Retryable(
config *retry.RetryParams,
) (event.Subscription, error) {
subscribe_func := func() (event.Subscription, error) {
log.Info().Msg("Subscribing to NewBatchV3")
return serviceManager.WatchNewBatchV3(opts, newTaskCreatedChan, batchMerkleRoot)
}
return retry.RetryWithData(subscribe_func, config)
Expand Down
37 changes: 20 additions & 17 deletions operator/pkg/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,12 +140,12 @@ func NewOperatorFromConfig(configuration config.OperatorConfig) (*Operator, erro
return operator, nil
}

func (o *Operator) SubscribeToNewTasksV2() (chan error, error) {
return o.avsSubscriber.SubscribeToNewTasksV2(o.NewTaskCreatedChanV2)
func (o *Operator) SubscribeToNewTasksV2(errorPairChan chan chainio.ErrorPair) *chainio.ErrorPair {
return o.avsSubscriber.SubscribeToNewTasksV2(o.NewTaskCreatedChanV2, errorPairChan)
}

func (o *Operator) SubscribeToNewTasksV3() (chan error, error) {
return o.avsSubscriber.SubscribeToNewTasksV3(o.NewTaskCreatedChanV3)
func (o *Operator) SubscribeToNewTasksV3(errorPairChan chan chainio.ErrorPair) *chainio.ErrorPair {
return o.avsSubscriber.SubscribeToNewTasksV3(o.NewTaskCreatedChanV3, errorPairChan)
}

type OperatorLastProcessedBatch struct {
Expand Down Expand Up @@ -205,13 +205,16 @@ func (o *Operator) UpdateLastProcessBatch(blockNumber uint32) error {
}

func (o *Operator) Start(ctx context.Context) error {
subV2, err := o.SubscribeToNewTasksV2()
if err != nil {
// create a new channel to foward errors
subV2ErrorChannel := make(chan chainio.ErrorPair)
errorPairPtr := o.SubscribeToNewTasksV2(subV2ErrorChannel)
if errorPairPtr != nil {
log.Fatal("Could not subscribe to new tasks")
}

subV3, err := o.SubscribeToNewTasksV3()
if err != nil {
subV3ErrorChannel := make(chan chainio.ErrorPair)
errorPairPtr = o.SubscribeToNewTasksV3(subV3ErrorChannel)
if errorPairPtr != nil {
log.Fatal("Could not subscribe to new tasks")
}

Expand All @@ -231,24 +234,24 @@ func (o *Operator) Start(ctx context.Context) error {
return nil
case err := <-metricsErrChan:
o.Logger.Errorf("Metrics server failed", "err", err)
case err := <-subV2:
o.Logger.Infof("Error in websocket subscription", "err", err)
subV2, err = o.SubscribeToNewTasksV2()
if err != nil {
case errorPair := <-subV2ErrorChannel:
o.Logger.Infof("Error in websocket subscription", "err", errorPair)
errorPairPtr = o.SubscribeToNewTasksV2(subV2ErrorChannel)
if errorPairPtr != nil {
o.Logger.Fatal("Could not subscribe to new tasks V2")
}
case err := <-subV3:
o.Logger.Infof("Error in websocket subscription", "err", err)
subV2, err = o.SubscribeToNewTasksV3()
if err != nil {
case errorPair := <-subV3ErrorChannel:
o.Logger.Infof("Error in websocket subscription", "err", errorPair)
errorPairPtr = o.SubscribeToNewTasksV3(subV3ErrorChannel)
if errorPairPtr != nil {
o.Logger.Fatal("Could not subscribe to new tasks V3")
}
case newBatchLogV2 := <-o.NewTaskCreatedChanV2:
go o.handleNewBatchLogV2(newBatchLogV2)
case newBatchLogV3 := <-o.NewTaskCreatedChanV3:
go o.handleNewBatchLogV3(newBatchLogV3)
case blockNumber := <-o.lastProcessedBatch.batchProcessedChan:
err = o.UpdateLastProcessBatch(blockNumber)
err := o.UpdateLastProcessBatch(blockNumber)
if err != nil {
o.Logger.Errorf("Error while updating last process batch", "err", err)
}
Expand Down