Skip to content

Commit 019f7cc

Browse files
committed
feat: subscribeToNewTasksV2, SubscribeToNewTasksV2, getLatestTaskFromEthereumV2
1 parent 9fe3900 commit 019f7cc

1 file changed

Lines changed: 147 additions & 0 deletions

File tree

core/chainio/avs_subscriber.go

Lines changed: 147 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -132,6 +132,73 @@ func (s *AvsSubscriber) SubscribeToNewTasks(newTaskCreatedChan chan *servicemana
132132

133133
return errorChannel, nil
134134
}
135+
func (s *AvsSubscriber) SubscribeToNewTasksV2(newTaskCreatedChan chan *servicemanager.ContractAlignedLayerServiceManagerNewBatchV2) (chan error, error) {
136+
// Create a new channel to receive new tasks
137+
internalChannel := make(chan *servicemanager.ContractAlignedLayerServiceManagerNewBatchV2)
138+
139+
// Subscribe to new tasks
140+
sub, err := subscribeToNewTasksV2(s.AvsContractBindings.ServiceManager, internalChannel, s.logger)
141+
if err != nil {
142+
s.logger.Error("Failed to subscribe to new AlignedLayer tasks", "err", err)
143+
return nil, err
144+
}
145+
146+
subFallback, err := subscribeToNewTasksV2(s.AvsContractBindings.ServiceManagerFallback, internalChannel, s.logger)
147+
if err != nil {
148+
s.logger.Error("Failed to subscribe to new AlignedLayer tasks", "err", err)
149+
return nil, err
150+
}
151+
152+
// create a new channel to foward errors
153+
errorChannel := make(chan error)
154+
155+
pollLatestBatchTicker := time.NewTicker(PollLatestBatchInterval)
156+
157+
// Forward the new tasks to the provided channel
158+
go func() {
159+
defer pollLatestBatchTicker.Stop()
160+
newBatchMutex := &sync.Mutex{}
161+
batchesSet := make(map[[32]byte]struct{})
162+
for {
163+
select {
164+
case newBatch := <-internalChannel:
165+
s.processNewBatchV2(newBatch, batchesSet, newBatchMutex, newTaskCreatedChan)
166+
case <-pollLatestBatchTicker.C:
167+
latestBatch, err := s.getLatestTaskFromEthereumV2()
168+
if err != nil {
169+
s.logger.Debug("Failed to get latest task from blockchain", "err", err)
170+
continue
171+
}
172+
s.processNewBatchV2(latestBatch, batchesSet, newBatchMutex, newTaskCreatedChan)
173+
}
174+
}
175+
176+
}()
177+
178+
// Handle errors and resubscribe
179+
go func() {
180+
for {
181+
select {
182+
case err := <-sub.Err():
183+
s.logger.Warn("Error in new task subscription", "err", err)
184+
sub.Unsubscribe()
185+
sub, err = subscribeToNewTasksV2(s.AvsContractBindings.ServiceManager, internalChannel, s.logger)
186+
if err != nil {
187+
errorChannel <- err
188+
}
189+
case err := <-subFallback.Err():
190+
s.logger.Warn("Error in fallback new task subscription", "err", err)
191+
subFallback.Unsubscribe()
192+
subFallback, err = subscribeToNewTasksV2(s.AvsContractBindings.ServiceManagerFallback, internalChannel, s.logger)
193+
if err != nil {
194+
errorChannel <- err
195+
}
196+
}
197+
}
198+
}()
199+
200+
return errorChannel, nil
201+
}
135202

136203
func subscribeToNewTasks(
137204
serviceManager *servicemanager.ContractAlignedLayerServiceManager,
@@ -154,6 +221,27 @@ func subscribeToNewTasks(
154221

155222
return nil, fmt.Errorf("failed to subscribe to new AlignedLayer tasks after %d retries", MaxRetries)
156223
}
224+
func subscribeToNewTasksV2(
225+
serviceManager *servicemanager.ContractAlignedLayerServiceManager,
226+
newTaskCreatedChan chan *servicemanager.ContractAlignedLayerServiceManagerNewBatchV2,
227+
logger sdklogging.Logger,
228+
) (event.Subscription, error) {
229+
for i := 0; i < MaxRetries; i++ {
230+
sub, err := serviceManager.WatchNewBatchV2(
231+
&bind.WatchOpts{}, newTaskCreatedChan, nil,
232+
)
233+
if err != nil {
234+
logger.Warn("Failed to subscribe to new AlignedLayer tasks", "err", err)
235+
time.Sleep(RetryInterval)
236+
continue
237+
}
238+
239+
logger.Info("Subscribed to new AlignedLayer tasks")
240+
return sub, nil
241+
}
242+
243+
return nil, fmt.Errorf("failed to subscribe to new AlignedLayer tasks after %d retries", MaxRetries)
244+
}
157245

158246
func (s *AvsSubscriber) processNewBatch(batch *servicemanager.ContractAlignedLayerServiceManagerNewBatch, batchesSet map[[32]byte]struct{}, newBatchMutex *sync.Mutex, newTaskCreatedChan chan<- *servicemanager.ContractAlignedLayerServiceManagerNewBatch) {
159247
newBatchMutex.Lock()
@@ -263,7 +351,66 @@ func (s *AvsSubscriber) getLatestTaskFromEthereum() (*servicemanager.ContractAli
263351
latestTask.BatchMerkleRoot = lastLog.Topics[1]
264352

265353
return &latestTask, nil
354+
}
355+
func (s *AvsSubscriber) getLatestTaskFromEthereumV2() (*servicemanager.ContractAlignedLayerServiceManagerNewBatchV2, error) {
356+
latestBlock, err := s.AvsContractBindings.ethClient.BlockNumber(context.Background())
357+
if err != nil {
358+
latestBlock, err = s.AvsContractBindings.ethClientFallback.BlockNumber(context.Background())
359+
if err != nil {
360+
return nil, fmt.Errorf("failed to get latest block number: %w", err)
361+
}
362+
}
363+
364+
var fromBlock uint64
365+
366+
if latestBlock < BlockInterval {
367+
fromBlock = 0
368+
} else {
369+
fromBlock = latestBlock - BlockInterval
370+
}
371+
372+
alignedLayerServiceManagerABI, err := abi.JSON(strings.NewReader(servicemanager.ContractAlignedLayerServiceManagerMetaData.ABI))
373+
if err != nil {
374+
return nil, fmt.Errorf("failed to parse ABI: %w", err)
375+
}
376+
377+
// We just care about the NewBatch event
378+
newBatchEvent := alignedLayerServiceManagerABI.Events["NewBatch"]
379+
if newBatchEvent.ID == (ethcommon.Hash{}) {
380+
return nil, fmt.Errorf("NewBatch event not found in ABI")
381+
}
266382

383+
query := ethereum.FilterQuery{
384+
FromBlock: big.NewInt(int64(fromBlock)),
385+
ToBlock: big.NewInt(int64(latestBlock)),
386+
Addresses: []ethcommon.Address{s.AlignedLayerServiceManagerAddr},
387+
Topics: [][]ethcommon.Hash{{newBatchEvent.ID, {}}},
388+
}
389+
390+
logs, err := s.AvsContractBindings.ethClient.FilterLogs(context.Background(), query)
391+
if err != nil {
392+
logs, err = s.AvsContractBindings.ethClientFallback.FilterLogs(context.Background(), query)
393+
if err != nil {
394+
return nil, fmt.Errorf("failed to get logs: %w", err)
395+
}
396+
}
397+
398+
if len(logs) == 0 {
399+
return nil, fmt.Errorf("no logs found")
400+
}
401+
402+
lastLog := logs[len(logs)-1]
403+
404+
var latestTask servicemanager.ContractAlignedLayerServiceManagerNewBatchV2
405+
err = alignedLayerServiceManagerABI.UnpackIntoInterface(&latestTask, "NewBatchV2", lastLog.Data)
406+
if err != nil {
407+
return nil, fmt.Errorf("failed to unpack log data: %w", err)
408+
}
409+
410+
// The second topic is the batch merkle root, as it is an indexed variable in the contract
411+
latestTask.BatchMerkleRoot = lastLog.Topics[1]
412+
413+
return &latestTask, nil
267414
}
268415

269416
func (s *AvsSubscriber) WaitForOneBlock(startBlock uint64) error {

0 commit comments

Comments
 (0)