Skip to content

Commit 99b503f

Browse files
committed
feat: aggregator v1 v2 discerning
1 parent 48d7cc8 commit 99b503f

1 file changed

Lines changed: 28 additions & 11 deletions

File tree

aggregator/internal/pkg/aggregator.go

Lines changed: 28 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@ type BatchData struct {
4343
type Aggregator struct {
4444
AggregatorConfig *config.AggregatorConfig
4545
NewBatchChan chan *servicemanager.ContractAlignedLayerServiceManagerNewBatch
46+
NewBatchChanV2 chan *servicemanager.ContractAlignedLayerServiceManagerNewBatchV2
4647
avsReader *chainio.AvsReader
4748
avsSubscriber *chainio.AvsSubscriber
4849
avsWriter *chainio.AvsWriter
@@ -207,7 +208,7 @@ func (agg *Aggregator) Start(ctx context.Context) error {
207208
case blsAggServiceResp := <-agg.blsAggregationService.GetResponseChannel():
208209
agg.logger.Info("Received response from BLS aggregation service",
209210
"taskIndex", blsAggServiceResp.TaskIndex)
210-
211+
211212
go agg.handleBlsAggServiceResponse(blsAggServiceResp)
212213
}
213214
}
@@ -278,17 +279,33 @@ func (agg *Aggregator) handleBlsAggServiceResponse(blsAggServiceResp blsagg.BlsA
278279
"batchIdentifierHash", "0x"+hex.EncodeToString(batchIdentifierHash[:]))
279280

280281
for i := 0; i < MaxSentTxRetries; i++ {
281-
_, err = agg.sendAggregatedResponse(batchData.BatchMerkleRoot, batchData.SenderAddress, nonSignerStakesAndSignature)
282-
if err == nil {
283-
agg.logger.Info("Aggregator successfully responded to task",
284-
"taskIndex", blsAggServiceResp.TaskIndex,
285-
"batchIdentifierHash", "0x"+hex.EncodeToString(batchIdentifierHash[:]))
286-
287-
return
282+
if true { //V1
283+
_, err = agg.sendAggregatedResponse(batchData.BatchMerkleRoot, nonSignerStakesAndSignature)
284+
if err == nil {
285+
agg.logger.Info("Aggregator successfully responded to task",
286+
"taskIndex", blsAggServiceResp.TaskIndex,
287+
"batchIdentifierHash", "0x"+hex.EncodeToString(batchIdentifierHash[:]))
288+
289+
return
290+
}
291+
292+
// Sleep for a bit before retrying
293+
time.Sleep(2 * time.Second)
294+
295+
} else { //V2
296+
_, err = agg.sendAggregatedResponseV2(batchData.BatchMerkleRoot, batchData.SenderAddress, nonSignerStakesAndSignature)
297+
if err == nil {
298+
agg.logger.Info("Aggregator successfully responded to task",
299+
"taskIndex", blsAggServiceResp.TaskIndex,
300+
"batchIdentifierHash", "0x"+hex.EncodeToString(batchIdentifierHash[:]))
301+
302+
return
303+
}
304+
305+
// Sleep for a bit before retrying
306+
time.Sleep(2 * time.Second)
288307
}
289308

290-
// Sleep for a bit before retrying
291-
time.Sleep(2 * time.Second)
292309
}
293310

294311
agg.logger.Error("Aggregator failed to respond to task, this batch will be lost",
@@ -309,7 +326,7 @@ func (agg *Aggregator) sendAggregatedResponse(batchMerkleRoot [32]byte, nonSigne
309326
txHash, err := agg.avsWriter.SendAggregatedResponse(batchMerkleRoot, nonSignerStakesAndSignature)
310327
if err != nil {
311328
agg.walletMutex.Unlock()
312-
agg.logger.Infof("- Unlocked Wallet Resources: Error sending aggregated response for batch %s. Error: %s", hex.EncodeToString(batchIdentifierHash[:]), err)
329+
agg.logger.Infof("- Unlocked Wallet Resources: Error sending aggregated response for batch %s. Error: %s", hex.EncodeToString(batchMerkleRoot[:]), err)
313330
return nil, err
314331
}
315332

0 commit comments

Comments
 (0)