Skip to content

Commit 679ed22

Browse files
committed
feat: add processNewBatchV2 to avs_subscriber
1 parent ca7fd53 commit 679ed22

1 file changed

Lines changed: 21 additions & 1 deletion

File tree

core/chainio/avs_subscriber.go

Lines changed: 21 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -152,13 +152,33 @@ func subscribeToNewTasks(
152152
return sub, nil
153153
}
154154

155-
return nil, fmt.Errorf("Failed to subscribe to new AlignedLayer tasks after %d retries", MaxRetries)
155+
return nil, fmt.Errorf("failed to subscribe to new AlignedLayer tasks after %d retries", MaxRetries)
156156
}
157157

158158
func (s *AvsSubscriber) processNewBatch(batch *servicemanager.ContractAlignedLayerServiceManagerNewBatch, batchesSet map[[32]byte]struct{}, newBatchMutex *sync.Mutex, newTaskCreatedChan chan<- *servicemanager.ContractAlignedLayerServiceManagerNewBatch) {
159159
newBatchMutex.Lock()
160160
defer newBatchMutex.Unlock()
161161

162+
if _, ok := batchesSet[batch.BatchMerkleRoot]; !ok {
163+
s.logger.Info("Received new task", "batchMerkleRoot", hex.EncodeToString(batch.BatchMerkleRoot[:]))
164+
batchesSet[batch.BatchMerkleRoot] = struct{}{}
165+
166+
newTaskCreatedChan <- batch
167+
168+
// Remove the batch from the set after RemoveBatchFromSetInterval time
169+
go func() {
170+
time.Sleep(RemoveBatchFromSetInterval)
171+
newBatchMutex.Lock()
172+
delete(batchesSet, batch.BatchMerkleRoot)
173+
newBatchMutex.Unlock()
174+
}()
175+
}
176+
}
177+
178+
func (s *AvsSubscriber) processNewBatchV2(batch *servicemanager.ContractAlignedLayerServiceManagerNewBatchV2, batchesSet map[[32]byte]struct{}, newBatchMutex *sync.Mutex, newTaskCreatedChan chan<- *servicemanager.ContractAlignedLayerServiceManagerNewBatchV2) {
179+
newBatchMutex.Lock()
180+
defer newBatchMutex.Unlock()
181+
162182
batchIdentifier := append(batch.BatchMerkleRoot[:], batch.SenderAddress[:]...)
163183
var batchIdentifierHash = *(*[32]byte)(crypto.Keccak256(batchIdentifier))
164184

0 commit comments

Comments
 (0)