Skip to content

Commit 696277a

Browse files
committed
hotfix: dont panic on event subscriptions [WIP]
1 parent b396d02 commit 696277a

4 files changed

Lines changed: 68 additions & 42 deletions

File tree

aggregator/pkg/subscriber.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,8 +23,8 @@ func (agg *Aggregator) SubscribeToNewTasks() error {
2323

2424
func (agg *Aggregator) subscribeToNewTasks() error {
2525
var err error
26-
27-
agg.taskSubscriber, err = agg.avsSubscriber.SubscribeToNewTasksV3(agg.NewBatchChan)
26+
//subV3 := make(chan error)
27+
err = agg.avsSubscriber.SubscribeToNewTasksV3(agg.NewBatchChan, agg.taskSubscriber)
2828

2929
if err != nil {
3030
agg.AggregatorConfig.BaseConfig.Logger.Info("Failed to create task subscriber", "err", err)

core/chainio/avs_subscriber.go

Lines changed: 53 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -61,26 +61,32 @@ func NewAvsSubscriberFromConfig(baseConfig *config.BaseConfig) (*AvsSubscriber,
6161
}, nil
6262
}
6363

64-
func (s *AvsSubscriber) SubscribeToNewTasksV2(newTaskCreatedChan chan *servicemanager.ContractAlignedLayerServiceManagerNewBatchV2) (chan error, error) {
64+
func (s *AvsSubscriber) SubscribeToNewTasksV2(newTaskCreatedChan chan *servicemanager.ContractAlignedLayerServiceManagerNewBatchV2, errorChannel chan error) error {
6565
// Create a new channel to receive new tasks
6666
internalChannel := make(chan *servicemanager.ContractAlignedLayerServiceManagerNewBatchV2)
6767

6868
// Subscribe to new tasks
6969
sub, err := SubscribeToNewTasksV2Retryable(&bind.WatchOpts{}, s.AvsContractBindings.ServiceManager, internalChannel, nil, retry.NetworkRetryParams())
7070
if err != nil {
7171
s.logger.Error("Primary failed to subscribe to new AlignedLayer V2 tasks after %d retries", retry.NetworkNumRetries, "err", err)
72-
return nil, err
72+
//return nil, err
7373
}
7474

75-
subFallback, err := SubscribeToNewTasksV2Retryable(&bind.WatchOpts{}, s.AvsContractBindings.ServiceManagerFallback, internalChannel, nil, retry.NetworkRetryParams())
76-
if err != nil {
75+
subFallback, errFallback := SubscribeToNewTasksV2Retryable(&bind.WatchOpts{}, s.AvsContractBindings.ServiceManagerFallback, internalChannel, nil, retry.NetworkRetryParams())
76+
if errFallback != nil {
7777
s.logger.Error("Fallback failed to subscribe to new AlignedLayer V2 tasks after %d retries", retry.NetworkNumRetries, "err", err)
78-
return nil, err
78+
//return nil, err
7979
}
80+
81+
if err != nil && errFallback != nil {
82+
s.logger.Error("Failed to subscribe to new AlignedLayer V2 tasks with both RPCs", "err", err, "errFallback", errFallback)
83+
return err
84+
}
85+
8086
s.logger.Info("Subscribed to new AlignedLayer V2 tasks")
8187

8288
// create a new channel to forward errors
83-
errorChannel := make(chan error)
89+
//errorChannel := make(chan error)
8490

8591
pollLatestBatchTicker := time.NewTicker(PollLatestBatchInterval)
8692

@@ -109,49 +115,60 @@ func (s *AvsSubscriber) SubscribeToNewTasksV2(newTaskCreatedChan chan *servicema
109115

110116
// Handle errors and resubscribe
111117
go func() {
112-
for {
118+
var err1, err2 error
119+
for err1 == nil || err2 == nil {
113120
select {
114121
case err := <-sub.Err():
115122
s.logger.Warn("Error in new task subscription", "err", err)
116-
sub.Unsubscribe()
117-
sub, err = SubscribeToNewTasksV2Retryable(&bind.WatchOpts{}, s.AvsContractBindings.ServiceManager, internalChannel, nil, retry.NetworkRetryParams())
118-
if err != nil {
119-
errorChannel <- err
123+
124+
auxSub, err1 := SubscribeToNewTasksV2Retryable(&bind.WatchOpts{}, s.AvsContractBindings.ServiceManager, internalChannel, nil, retry.NetworkRetryParams())
125+
if err1 == nil {
126+
//sub.Unsubscribe()
127+
sub = auxSub // update the subscription only if it was successful
120128
}
121129
case err := <-subFallback.Err():
122130
s.logger.Warn("Error in fallback new task subscription", "err", err)
123-
subFallback.Unsubscribe()
124-
subFallback, err = SubscribeToNewTasksV2Retryable(&bind.WatchOpts{}, s.AvsContractBindings.ServiceManagerFallback, internalChannel, nil, retry.NetworkRetryParams())
125-
if err != nil {
126-
errorChannel <- err
131+
132+
auxSubFallback, err2 := SubscribeToNewTasksV2Retryable(&bind.WatchOpts{}, s.AvsContractBindings.ServiceManagerFallback, internalChannel, nil, retry.NetworkRetryParams())
133+
if err2 == nil {
134+
//subFallback.Unsubscribe()
135+
subFallback = auxSubFallback // update the subscription only if it was successful
127136
}
128137
}
129138
}
139+
errorChannel <- err1
130140
}()
131141

132-
return errorChannel, nil
142+
return nil
133143
}
134144

135-
func (s *AvsSubscriber) SubscribeToNewTasksV3(newTaskCreatedChan chan *servicemanager.ContractAlignedLayerServiceManagerNewBatchV3) (chan error, error) {
145+
func (s *AvsSubscriber) SubscribeToNewTasksV3(newTaskCreatedChan chan *servicemanager.ContractAlignedLayerServiceManagerNewBatchV3, errorChannel chan error) error {
136146
// Create a new channel to receive new tasks
137147
internalChannel := make(chan *servicemanager.ContractAlignedLayerServiceManagerNewBatchV3)
138148

149+
s.logger.Info("Starting subscription to new AlignedLayer V3 tasks")
139150
// Subscribe to new tasks
140151
sub, err := SubscribeToNewTasksV3Retryable(&bind.WatchOpts{}, s.AvsContractBindings.ServiceManager, internalChannel, nil, retry.NetworkRetryParams())
141152
if err != nil {
142153
s.logger.Error("Primary failed to subscribe to new AlignedLayer V3 tasks after %d retries", MaxRetries, "err", err)
143-
return nil, err
154+
//return nil, err
144155
}
145156

146-
subFallback, err := SubscribeToNewTasksV3Retryable(&bind.WatchOpts{}, s.AvsContractBindings.ServiceManagerFallback, internalChannel, nil, retry.NetworkRetryParams())
147-
if err != nil {
157+
subFallback, errFallback := SubscribeToNewTasksV3Retryable(&bind.WatchOpts{}, s.AvsContractBindings.ServiceManagerFallback, internalChannel, nil, retry.NetworkRetryParams())
158+
if errFallback != nil {
148159
s.logger.Error("Fallback failed to subscribe to new AlignedLayer V3 tasks after %d retries", MaxRetries, "err", err)
149-
return nil, err
160+
//return nil, errFallback
150161
}
162+
163+
if err != nil && errFallback != nil {
164+
s.logger.Error("Failed to subscribe to new AlignedLayer V3 tasks with both RPCs", "err", err, "errFallback", errFallback)
165+
return err
166+
}
167+
151168
s.logger.Info("Subscribed to new AlignedLayer V3 tasks")
152169

153170
// create a new channel to forward errors
154-
errorChannel := make(chan error)
171+
//errorChannel := make(chan error)
155172

156173
pollLatestBatchTicker := time.NewTicker(PollLatestBatchInterval)
157174

@@ -180,27 +197,31 @@ func (s *AvsSubscriber) SubscribeToNewTasksV3(newTaskCreatedChan chan *servicema
180197

181198
// Handle errors and resubscribe
182199
go func() {
183-
for {
200+
var err1, err2 error
201+
for err1 == nil || err2 == nil {
184202
select {
185203
case err := <-sub.Err():
186204
s.logger.Warn("Error in new task subscription", "err", err)
187-
sub.Unsubscribe()
188-
sub, err = SubscribeToNewTasksV3Retryable(&bind.WatchOpts{}, s.AvsContractBindings.ServiceManager, internalChannel, nil, retry.NetworkRetryParams())
189-
if err != nil {
190-
errorChannel <- err
205+
206+
auxSub, err1 := SubscribeToNewTasksV3Retryable(&bind.WatchOpts{}, s.AvsContractBindings.ServiceManager, internalChannel, nil, retry.NetworkRetryParams())
207+
if err1 == nil {
208+
//sub.Unsubscribe()
209+
sub = auxSub // update the subscription only if it was successful
191210
}
192211
case err := <-subFallback.Err():
193212
s.logger.Warn("Error in fallback new task subscription", "err", err)
194-
subFallback.Unsubscribe()
195-
subFallback, err = SubscribeToNewTasksV3Retryable(&bind.WatchOpts{}, s.AvsContractBindings.ServiceManagerFallback, internalChannel, nil, retry.NetworkRetryParams())
196-
if err != nil {
197-
errorChannel <- err
213+
214+
auxSubFallback, err2 := SubscribeToNewTasksV3Retryable(&bind.WatchOpts{}, s.AvsContractBindings.ServiceManagerFallback, internalChannel, nil, retry.NetworkRetryParams())
215+
if err2 == nil {
216+
//subFallback.Unsubscribe()
217+
subFallback = auxSubFallback // update the subscription only if it was successful
198218
}
199219
}
200220
}
221+
errorChannel <- err1
201222
}()
202223

203-
return errorChannel, nil
224+
return nil
204225
}
205226

206227
func (s *AvsSubscriber) processNewBatchV2(batch *servicemanager.ContractAlignedLayerServiceManagerNewBatchV2, batchesSet map[[32]byte]struct{}, newBatchMutex *sync.Mutex, newTaskCreatedChan chan<- *servicemanager.ContractAlignedLayerServiceManagerNewBatchV2) {

core/chainio/retryable.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package chainio
22

33
import (
44
"context"
5+
"github.com/rs/zerolog/log"
56
"math/big"
67

78
"github.com/ethereum/go-ethereum"
@@ -206,6 +207,7 @@ func SubscribeToNewTasksV2Retryable(
206207
config *retry.RetryParams,
207208
) (event.Subscription, error) {
208209
subscribe_func := func() (event.Subscription, error) {
210+
log.Info().Msg("Subscribing to NewBatchV2")
209211
return serviceManager.WatchNewBatchV2(opts, newTaskCreatedChan, batchMerkleRoot)
210212
}
211213
return retry.RetryWithData(subscribe_func, config)
@@ -225,6 +227,7 @@ func SubscribeToNewTasksV3Retryable(
225227
config *retry.RetryParams,
226228
) (event.Subscription, error) {
227229
subscribe_func := func() (event.Subscription, error) {
230+
log.Info().Msg("Subscribing to NewBatchV3")
228231
return serviceManager.WatchNewBatchV3(opts, newTaskCreatedChan, batchMerkleRoot)
229232
}
230233
return retry.RetryWithData(subscribe_func, config)

operator/pkg/operator.go

Lines changed: 10 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -140,12 +140,12 @@ func NewOperatorFromConfig(configuration config.OperatorConfig) (*Operator, erro
140140
return operator, nil
141141
}
142142

143-
func (o *Operator) SubscribeToNewTasksV2() (chan error, error) {
144-
return o.avsSubscriber.SubscribeToNewTasksV2(o.NewTaskCreatedChanV2)
143+
func (o *Operator) SubscribeToNewTasksV2(errorChan chan error) error {
144+
return o.avsSubscriber.SubscribeToNewTasksV2(o.NewTaskCreatedChanV2, errorChan)
145145
}
146146

147-
func (o *Operator) SubscribeToNewTasksV3() (chan error, error) {
148-
return o.avsSubscriber.SubscribeToNewTasksV3(o.NewTaskCreatedChanV3)
147+
func (o *Operator) SubscribeToNewTasksV3(errorChan chan error) error {
148+
return o.avsSubscriber.SubscribeToNewTasksV3(o.NewTaskCreatedChanV3, errorChan)
149149
}
150150

151151
type OperatorLastProcessedBatch struct {
@@ -205,12 +205,14 @@ func (o *Operator) UpdateLastProcessBatch(blockNumber uint32) error {
205205
}
206206

207207
func (o *Operator) Start(ctx context.Context) error {
208-
subV2, err := o.SubscribeToNewTasksV2()
208+
subV2 := make(chan error)
209+
err := o.SubscribeToNewTasksV2(subV2)
209210
if err != nil {
210211
log.Fatal("Could not subscribe to new tasks")
211212
}
212213

213-
subV3, err := o.SubscribeToNewTasksV3()
214+
subV3 := make(chan error)
215+
err = o.SubscribeToNewTasksV3(subV3)
214216
if err != nil {
215217
log.Fatal("Could not subscribe to new tasks")
216218
}
@@ -233,13 +235,13 @@ func (o *Operator) Start(ctx context.Context) error {
233235
o.Logger.Errorf("Metrics server failed", "err", err)
234236
case err := <-subV2:
235237
o.Logger.Infof("Error in websocket subscription", "err", err)
236-
subV2, err = o.SubscribeToNewTasksV2()
238+
err = o.SubscribeToNewTasksV2(subV2)
237239
if err != nil {
238240
o.Logger.Fatal("Could not subscribe to new tasks V2")
239241
}
240242
case err := <-subV3:
241243
o.Logger.Infof("Error in websocket subscription", "err", err)
242-
subV2, err = o.SubscribeToNewTasksV3()
244+
err = o.SubscribeToNewTasksV3(subV3)
243245
if err != nil {
244246
o.Logger.Fatal("Could not subscribe to new tasks V3")
245247
}

0 commit comments

Comments
 (0)