Skip to content

Commit 30b356b

Browse files
committed
feat: return error pairs
1 parent 8232ee3 commit 30b356b

2 files changed

Lines changed: 59 additions & 62 deletions

File tree

core/chainio/avs_subscriber.go

Lines changed: 40 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ import (
1515
retry "github.com/yetanotherco/aligned_layer/core"
1616
"github.com/yetanotherco/aligned_layer/core/config"
1717

18+
"fmt"
1819
sdklogging "github.com/Layr-Labs/eigensdk-go/logging"
1920
"github.com/ethereum/go-ethereum/crypto"
2021
)
@@ -44,6 +45,11 @@ type AvsSubscriber struct {
4445
logger sdklogging.Logger
4546
}
4647

48+
type ErrorPair struct {
49+
ErrorMainRPC error
50+
ErrorFallbackRPC error
51+
}
52+
4753
func NewAvsSubscriberFromConfig(baseConfig *config.BaseConfig) (*AvsSubscriber, error) {
4854
avsContractBindings, err := NewAvsServiceBindings(
4955
baseConfig.AlignedLayerDeploymentConfig.AlignedLayerServiceManagerAddr,
@@ -62,33 +68,28 @@ func NewAvsSubscriberFromConfig(baseConfig *config.BaseConfig) (*AvsSubscriber,
6268
}, nil
6369
}
6470

65-
func (s *AvsSubscriber) SubscribeToNewTasksV2(newTaskCreatedChan chan *servicemanager.ContractAlignedLayerServiceManagerNewBatchV2, errorChannel chan error) error {
71+
func (s *AvsSubscriber) SubscribeToNewTasksV2(newTaskCreatedChan chan *servicemanager.ContractAlignedLayerServiceManagerNewBatchV2, errorPairChannel chan ErrorPair) *ErrorPair {
6672
// Create a new channel to receive new tasks
6773
internalChannel := make(chan *servicemanager.ContractAlignedLayerServiceManagerNewBatchV2)
6874

6975
// Subscribe to new tasks
70-
sub, err := SubscribeToNewTasksV2Retryable(&bind.WatchOpts{}, s.AvsContractBindings.ServiceManager, internalChannel, nil, retry.NetworkRetryParams())
71-
if err != nil {
72-
s.logger.Error("Primary failed to subscribe to new AlignedLayer V2 tasks after %d retries", retry.NetworkNumRetries, "err", err)
73-
//return err
76+
sub, errMain := SubscribeToNewTasksV2Retryable(&bind.WatchOpts{}, s.AvsContractBindings.ServiceManager, internalChannel, nil, retry.NetworkRetryParams())
77+
if errMain != nil {
78+
s.logger.Error(fmt.Sprintf("Fallback failed to subscribe to new AlignedLayer V3 tasks after %d retries", MaxRetries), "errMain", fmt.Sprintf("%v", errMain))
7479
}
7580

7681
subFallback, errFallback := SubscribeToNewTasksV2Retryable(&bind.WatchOpts{}, s.AvsContractBindings.ServiceManagerFallback, internalChannel, nil, retry.NetworkRetryParams())
7782
if errFallback != nil {
78-
s.logger.Error("Fallback failed to subscribe to new AlignedLayer V2 tasks after %d retries", retry.NetworkNumRetries, "err", err)
79-
//return err
83+
s.logger.Error(fmt.Sprintf("Fallback failed to subscribe to new AlignedLayer V3 tasks after %d retries", MaxRetries), "errFallback", fmt.Sprintf("%v", errFallback))
8084
}
8185

82-
if err != nil && errFallback != nil {
83-
s.logger.Error("Failed to subscribe to new AlignedLayer V2 tasks with both RPCs", "err", err, "errFallback", errFallback)
84-
return err
86+
if errMain != nil && errFallback != nil {
87+
s.logger.Error("Failed to subscribe to new AlignedLayer V2 tasks with both RPCs", "errMain", errMain, "errFallback", errFallback)
88+
return &ErrorPair{ErrorMainRPC: errMain, ErrorFallbackRPC: errFallback}
8589
}
8690

8791
s.logger.Info("Subscribed to new AlignedLayer V2 tasks")
8892

89-
// create a new channel to foward errors
90-
// errorChannel := make(chan error)
91-
9293
pollLatestBatchTicker := time.NewTicker(PollLatestBatchInterval)
9394

9495
// Forward the new tasks to the provided channel
@@ -116,66 +117,62 @@ func (s *AvsSubscriber) SubscribeToNewTasksV2(newTaskCreatedChan chan *servicema
116117

117118
// Handle errors and resubscribe
118119
go func() {
119-
var err1, err2 error
120+
var errMain, errFallback error
120121
var auxSub, auxSubFallback event.Subscription
121-
for err1 == nil || err2 == nil { //while one is active
122+
for errMain == nil || errFallback == nil { //while one is active
122123
select {
123124
case err := <-sub.Err():
124125
s.logger.Warn("Error in new task subscription", "err", err)
125-
s.logger.Info("failed states:", "err1", err1, "err2", err2)
126+
s.logger.Info("failed states:", "errMain", errMain, "errFallback", errFallback)
126127

127-
auxSub, err1 = SubscribeToNewTasksV2Retryable(&bind.WatchOpts{}, s.AvsContractBindings.ServiceManager, internalChannel, nil, retry.NetworkRetryParams())
128-
if err1 == nil {
128+
auxSub, errMain = SubscribeToNewTasksV2Retryable(&bind.WatchOpts{}, s.AvsContractBindings.ServiceManager, internalChannel, nil, retry.NetworkRetryParams())
129+
if errMain == nil {
129130
//sub.Unsubscribe()
130131
sub = auxSub // update the subscription only if it was successful
131132
s.logger.Info("Resubscribed to fallback new task subscription")
132133
}
133134
case err := <-subFallback.Err():
134-
s.logger.Info("failed states:", "err1", err1, "err2", err2)
135+
s.logger.Info("failed states:", "errMain", errMain, "errFallback", errFallback)
135136
s.logger.Warn("Error in fallback new task subscription", "err", err)
136137

137-
auxSubFallback, err2 = SubscribeToNewTasksV2Retryable(&bind.WatchOpts{}, s.AvsContractBindings.ServiceManagerFallback, internalChannel, nil, retry.NetworkRetryParams())
138-
if err2 == nil {
138+
auxSubFallback, errFallback = SubscribeToNewTasksV2Retryable(&bind.WatchOpts{}, s.AvsContractBindings.ServiceManagerFallback, internalChannel, nil, retry.NetworkRetryParams())
139+
if errFallback == nil {
139140
//subFallback.Unsubscribe()
140141
subFallback = auxSubFallback // update the subscription only if it was successful
141142
s.logger.Info("Resubscribed to fallback new task subscription")
142143
}
143144
}
144145
}
145-
errorChannel <- err1
146+
errorPairChannel <- ErrorPair{ErrorMainRPC: errMain, ErrorFallbackRPC: errFallback}
146147
}()
147148

148149
return nil
149150
}
150151

151-
func (s *AvsSubscriber) SubscribeToNewTasksV3(newTaskCreatedChan chan *servicemanager.ContractAlignedLayerServiceManagerNewBatchV3, errorChannel chan error) error {
152+
func (s *AvsSubscriber) SubscribeToNewTasksV3(newTaskCreatedChan chan *servicemanager.ContractAlignedLayerServiceManagerNewBatchV3, errorPairChannel chan ErrorPair) *ErrorPair {
152153
// Create a new channel to receive new tasks
153154
internalChannel := make(chan *servicemanager.ContractAlignedLayerServiceManagerNewBatchV3)
154155

155156
s.logger.Info("Starting subscription to new AlignedLayer V3 tasks")
156157
// Subscribe to new tasks
157-
sub, err := SubscribeToNewTasksV3Retryable(&bind.WatchOpts{}, s.AvsContractBindings.ServiceManager, internalChannel, nil, retry.NetworkRetryParams())
158-
if err != nil {
159-
s.logger.Error("Primary failed to subscribe to new AlignedLayer V3 tasks after %d retries", MaxRetries, "err", err)
158+
sub, errMain := SubscribeToNewTasksV3Retryable(&bind.WatchOpts{}, s.AvsContractBindings.ServiceManager, internalChannel, nil, retry.NetworkRetryParams())
159+
if errMain != nil {
160+
s.logger.Error(fmt.Sprintf("Fallback failed to subscribe to new AlignedLayer V3 tasks after %d retries", MaxRetries), "err", fmt.Sprintf("%v", errMain))
160161
//return err
161162
}
162163

163164
subFallback, errFallback := SubscribeToNewTasksV3Retryable(&bind.WatchOpts{}, s.AvsContractBindings.ServiceManagerFallback, internalChannel, nil, retry.NetworkRetryParams())
164165
if errFallback != nil {
165-
s.logger.Error("Fallback failed to subscribe to new AlignedLayer V3 tasks after %d retries", MaxRetries, "err", err)
166-
//return errFallback
166+
s.logger.Error(fmt.Sprintf("Fallback failed to subscribe to new AlignedLayer V3 tasks after %d retries", MaxRetries), "err", fmt.Sprintf("%v", errFallback))
167167
}
168168

169-
if err != nil && errFallback != nil {
170-
s.logger.Error("Failed to subscribe to new AlignedLayer V3 tasks with both RPCs", "err", err, "errFallback", errFallback)
171-
return err
169+
if errMain != nil && errFallback != nil {
170+
s.logger.Error("Failed to subscribe to new AlignedLayer V3 tasks with both RPCs", "errMain", errMain, "errFallback", errFallback)
171+
return &ErrorPair{ErrorMainRPC: errMain, ErrorFallbackRPC: errFallback}
172172
}
173173

174174
s.logger.Info("Subscribed to new AlignedLayer V3 tasks")
175175

176-
// create a new channel to foward errors
177-
// errorChannel := make(chan error)
178-
179176
pollLatestBatchTicker := time.NewTicker(PollLatestBatchInterval)
180177

181178
// Forward the new tasks to the provided channel
@@ -204,33 +201,33 @@ func (s *AvsSubscriber) SubscribeToNewTasksV3(newTaskCreatedChan chan *servicema
204201
// Handle errors and resubscribe
205202
go func() {
206203
s.logger.Info("Starting error handling goroutine")
207-
var err1, err2 error
204+
var errMain, errFallback error
208205
var auxSub, auxSubFallback event.Subscription
209-
for err1 == nil || err2 == nil { //while one is active
206+
for errMain == nil || errFallback == nil { //while one is active
210207
select {
211208
case err := <-sub.Err():
212209
s.logger.Warn("Error in new task subscription", "err", err)
213-
s.logger.Info("failed states:", "err1", err1, "err2", err2)
210+
s.logger.Info("failed states:", "errMain", errMain, "errFallback", errFallback)
214211

215-
auxSub, err1 = SubscribeToNewTasksV3Retryable(&bind.WatchOpts{}, s.AvsContractBindings.ServiceManager, internalChannel, nil, retry.NetworkRetryParams())
216-
if err1 == nil {
212+
auxSub, errMain = SubscribeToNewTasksV3Retryable(&bind.WatchOpts{}, s.AvsContractBindings.ServiceManager, internalChannel, nil, retry.NetworkRetryParams())
213+
if errMain == nil {
217214
//sub.Unsubscribe()
218215
sub = auxSub // update the subscription only if it was successful
219216
s.logger.Info("Resubscribed to fallback new task subscription")
220217
}
221218
case err := <-subFallback.Err():
222219
s.logger.Warn("Error in fallback new task subscription", "err", err)
223-
s.logger.Info("failed states:", "err1", err1, "err2", err2)
220+
s.logger.Info("failed states:", "errMain", errMain, "errFallback", errFallback)
224221

225-
auxSubFallback, err2 = SubscribeToNewTasksV3Retryable(&bind.WatchOpts{}, s.AvsContractBindings.ServiceManagerFallback, internalChannel, nil, retry.NetworkRetryParams())
226-
if err2 == nil {
222+
auxSubFallback, errFallback = SubscribeToNewTasksV3Retryable(&bind.WatchOpts{}, s.AvsContractBindings.ServiceManagerFallback, internalChannel, nil, retry.NetworkRetryParams())
223+
if errFallback == nil {
227224
//subFallback.Unsubscribe()
228225
subFallback = auxSubFallback // update the subscription only if it was successful
229226
s.logger.Info("Resubscribed to fallback new task subscription")
230227
}
231228
}
232229
}
233-
errorChannel <- err1
230+
errorPairChannel <- ErrorPair{ErrorMainRPC: errMain, ErrorFallbackRPC: errFallback}
234231
}()
235232

236233
return nil

operator/pkg/operator.go

Lines changed: 19 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -140,12 +140,12 @@ func NewOperatorFromConfig(configuration config.OperatorConfig) (*Operator, erro
140140
return operator, nil
141141
}
142142

143-
func (o *Operator) SubscribeToNewTasksV2(errorChan chan error) error {
144-
return o.avsSubscriber.SubscribeToNewTasksV2(o.NewTaskCreatedChanV2, errorChan)
143+
func (o *Operator) SubscribeToNewTasksV2(errorPairChan chan chainio.ErrorPair) *chainio.ErrorPair {
144+
return o.avsSubscriber.SubscribeToNewTasksV2(o.NewTaskCreatedChanV2, errorPairChan)
145145
}
146146

147-
func (o *Operator) SubscribeToNewTasksV3(errorChan chan error) error {
148-
return o.avsSubscriber.SubscribeToNewTasksV3(o.NewTaskCreatedChanV3, errorChan)
147+
func (o *Operator) SubscribeToNewTasksV3(errorPairChan chan chainio.ErrorPair) *chainio.ErrorPair {
148+
return o.avsSubscriber.SubscribeToNewTasksV3(o.NewTaskCreatedChanV3, errorPairChan)
149149
}
150150

151151
type OperatorLastProcessedBatch struct {
@@ -206,15 +206,15 @@ func (o *Operator) UpdateLastProcessBatch(blockNumber uint32) error {
206206

207207
func (o *Operator) Start(ctx context.Context) error {
208208
// create a new channel to foward errors
209-
subV2 := make(chan error)
210-
err := o.SubscribeToNewTasksV2(subV2)
211-
if err != nil {
209+
subV2ErrorChannel := make(chan chainio.ErrorPair)
210+
errorPairPtr := o.SubscribeToNewTasksV2(subV2ErrorChannel)
211+
if errorPairPtr != nil {
212212
log.Fatal("Could not subscribe to new tasks")
213213
}
214214

215-
subV3 := make(chan error)
216-
err = o.SubscribeToNewTasksV3(subV3)
217-
if err != nil {
215+
subV3ErrorChannel := make(chan chainio.ErrorPair)
216+
errorPairPtr = o.SubscribeToNewTasksV3(subV3ErrorChannel)
217+
if errorPairPtr != nil {
218218
log.Fatal("Could not subscribe to new tasks")
219219
}
220220

@@ -234,24 +234,24 @@ func (o *Operator) Start(ctx context.Context) error {
234234
return nil
235235
case err := <-metricsErrChan:
236236
o.Logger.Errorf("Metrics server failed", "err", err)
237-
case err := <-subV2:
238-
o.Logger.Infof("Error in websocket subscription", "err", err)
239-
err = o.SubscribeToNewTasksV2(subV2)
240-
if err != nil {
237+
case errorPair := <-subV2ErrorChannel:
238+
o.Logger.Infof("Error in websocket subscription", "err", errorPair)
239+
errorPairPtr = o.SubscribeToNewTasksV2(subV2ErrorChannel)
240+
if errorPairPtr != nil {
241241
o.Logger.Fatal("Could not subscribe to new tasks V2")
242242
}
243-
case err := <-subV3:
244-
o.Logger.Infof("Error in websocket subscription", "err", err)
245-
err = o.SubscribeToNewTasksV3(subV3)
246-
if err != nil {
243+
case errorPair := <-subV3ErrorChannel:
244+
o.Logger.Infof("Error in websocket subscription", "err", errorPair)
245+
errorPairPtr = o.SubscribeToNewTasksV3(subV3ErrorChannel)
246+
if errorPairPtr != nil {
247247
o.Logger.Fatal("Could not subscribe to new tasks V3")
248248
}
249249
case newBatchLogV2 := <-o.NewTaskCreatedChanV2:
250250
go o.handleNewBatchLogV2(newBatchLogV2)
251251
case newBatchLogV3 := <-o.NewTaskCreatedChanV3:
252252
go o.handleNewBatchLogV3(newBatchLogV3)
253253
case blockNumber := <-o.lastProcessedBatch.batchProcessedChan:
254-
err = o.UpdateLastProcessBatch(blockNumber)
254+
err := o.UpdateLastProcessBatch(blockNumber)
255255
if err != nil {
256256
o.Logger.Errorf("Error while updating last process batch", "err", err)
257257
}

0 commit comments

Comments
 (0)