Skip to content

Commit 60b1ccd

Browse files
committed
feat: aggregator: sendAggregatedResponseV2, AddNewTaskV2
1 parent 019f7cc commit 60b1ccd

2 files changed

Lines changed: 59 additions & 15 deletions

File tree

aggregator/cmd/main.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,12 @@ func aggregatorMain(ctx *cli.Context) error {
5656
aggregatorConfig.BaseConfig.Logger.Fatal("Error subscribing for new tasks", "err", listenErr)
5757
}
5858
}()
59+
go func() {
60+
listenErr := aggregator.SubscribeToNewTasksV2()
61+
if listenErr != nil {
62+
aggregatorConfig.BaseConfig.Logger.Fatal("Error subscribing for new tasks V2", "err", listenErr)
63+
}
64+
}()
5965

6066
err = aggregator.Start(context.Background())
6167
if err != nil {

aggregator/internal/pkg/aggregator.go

Lines changed: 53 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -301,25 +301,20 @@ func (agg *Aggregator) handleBlsAggServiceResponse(blsAggServiceResp blsagg.BlsA
301301

302302
// / Sends response to contract and waits for transaction receipt
303303
// / Returns error if it fails to send tx or receipt is not found
304-
func (agg *Aggregator) sendAggregatedResponseV2(batchMerkleRoot [32]byte, senderAddress [20]byte, nonSignerStakesAndSignature servicemanager.IBLSSignatureCheckerNonSignerStakesAndSignature) (*gethtypes.Receipt, error) {
305-
batchIdentifier := append(batchMerkleRoot[:], senderAddress[:]...)
306-
var batchIdentifierHash = *(*[32]byte)(crypto.Keccak256(batchIdentifier))
307-
304+
func (agg *Aggregator) sendAggregatedResponse(batchMerkleRoot [32]byte, nonSignerStakesAndSignature servicemanager.IBLSSignatureCheckerNonSignerStakesAndSignature) (*gethtypes.Receipt, error) {
308305
agg.walletMutex.Lock()
309306
agg.logger.Infof("- Locked Wallet Resources: Sending aggregated response for batch",
310-
"merkleRoot", hex.EncodeToString(batchMerkleRoot[:]),
311-
"senderAddress", hex.EncodeToString(senderAddress[:]),
312-
"batchIdentifierHash", hex.EncodeToString(batchIdentifierHash[:]))
307+
"merkleRoot", hex.EncodeToString(batchMerkleRoot[:]))
313308

314-
txHash, err := agg.avsWriter.SendAggregatedResponseV2(batchMerkleRoot, senderAddress, nonSignerStakesAndSignature)
309+
txHash, err := agg.avsWriter.SendAggregatedResponse(batchMerkleRoot, nonSignerStakesAndSignature)
315310
if err != nil {
316311
agg.walletMutex.Unlock()
317312
agg.logger.Infof("- Unlocked Wallet Resources: Error sending aggregated response for batch %s. Error: %s", hex.EncodeToString(batchIdentifierHash[:]), err)
318313
return nil, err
319314
}
320315

321316
agg.walletMutex.Unlock()
322-
agg.logger.Infof("- Unlocked Wallet Resources: Sending aggregated response for batch %s", hex.EncodeToString(batchIdentifierHash[:]))
317+
agg.logger.Infof("- Unlocked Wallet Resources: Sending aggregated response for batch %s", hex.EncodeToString(batchMerkleRoot[:]))
323318

324319
receipt, err := utils.WaitForTransactionReceipt(
325320
agg.AggregatorConfig.BaseConfig.EthRpcClient, context.Background(), *txHash)
@@ -331,10 +326,7 @@ func (agg *Aggregator) sendAggregatedResponseV2(batchMerkleRoot [32]byte, sender
331326

332327
return receipt, nil
333328
}
334-
335-
// / Sends response to contract and waits for transaction receipt
336-
// / Returns error if it fails to send tx or receipt is not found
337-
func (agg *Aggregator) sendAggregatedResponse(batchMerkleRoot [32]byte, senderAddress [20]byte, nonSignerStakesAndSignature servicemanager.IBLSSignatureCheckerNonSignerStakesAndSignature) (*gethtypes.Receipt, error) {
329+
func (agg *Aggregator) sendAggregatedResponseV2(batchMerkleRoot [32]byte, senderAddress [20]byte, nonSignerStakesAndSignature servicemanager.IBLSSignatureCheckerNonSignerStakesAndSignature) (*gethtypes.Receipt, error) {
338330
batchIdentifier := append(batchMerkleRoot[:], senderAddress[:]...)
339331
var batchIdentifierHash = *(*[32]byte)(crypto.Keccak256(batchIdentifier))
340332

@@ -344,7 +336,7 @@ func (agg *Aggregator) sendAggregatedResponse(batchMerkleRoot [32]byte, senderAd
344336
"senderAddress", hex.EncodeToString(senderAddress[:]),
345337
"batchIdentifierHash", hex.EncodeToString(batchIdentifierHash[:]))
346338

347-
txHash, err := agg.avsWriter.SendAggregatedResponse(batchMerkleRoot, nonSignerStakesAndSignature)
339+
txHash, err := agg.avsWriter.SendAggregatedResponseV2(batchMerkleRoot, senderAddress, nonSignerStakesAndSignature)
348340
if err != nil {
349341
agg.walletMutex.Unlock()
350342
agg.logger.Infof("- Unlocked Wallet Resources: Error sending aggregated response for batch %s. Error: %s", hex.EncodeToString(batchIdentifierHash[:]), err)
@@ -365,7 +357,53 @@ func (agg *Aggregator) sendAggregatedResponse(batchMerkleRoot [32]byte, senderAd
365357
return receipt, nil
366358
}
367359

368-
func (agg *Aggregator) AddNewTask(batchMerkleRoot [32]byte, senderAddress [20]byte, taskCreatedBlock uint32) {
360+
func (agg *Aggregator) AddNewTask(batchMerkleRoot [32]byte, taskCreatedBlock uint32) {
361+
agg.AggregatorConfig.BaseConfig.Logger.Info("Adding new task",
362+
"Batch merkle root", "0x"+hex.EncodeToString(batchMerkleRoot[:]))
363+
364+
agg.taskMutex.Lock()
365+
agg.AggregatorConfig.BaseConfig.Logger.Info("- Locked Resources: Adding new task")
366+
367+
// --- UPDATE BATCH - INDEX CACHES ---
368+
batchIndex := agg.nextBatchIndex
369+
if _, ok := agg.batchesIdxByIdentifierHash[batchMerkleRoot]; ok {
370+
agg.logger.Warn("Batch already exists", "batchIndex", batchIndex, "batchIdentifierHash (actually batchMerkleRoot)", batchMerkleRoot)
371+
agg.taskMutex.Unlock()
372+
agg.AggregatorConfig.BaseConfig.Logger.Info("- Unlocked Resources: Adding new task")
373+
return
374+
}
375+
376+
// This shouldn't happen, since both maps are updated together
377+
if _, ok := agg.batchesIdentifierHashByIdx[batchIndex]; ok {
378+
agg.logger.Warn("Batch already exists", "batchIndex", batchIndex, "batchIdentifierHash (actually batchMerkleRoot)", batchMerkleRoot)
379+
agg.taskMutex.Unlock()
380+
agg.AggregatorConfig.BaseConfig.Logger.Info("- Unlocked Resources: Adding new task")
381+
return
382+
}
383+
384+
agg.batchesIdxByIdentifierHash[batchMerkleRoot] = batchIndex
385+
agg.batchCreatedBlockByIdx[batchIndex] = uint64(taskCreatedBlock)
386+
agg.batchesIdentifierHashByIdx[batchIndex] = batchMerkleRoot
387+
agg.batchDataByIdentifierHash[batchMerkleRoot] = BatchData{
388+
BatchMerkleRoot: batchMerkleRoot,
389+
SenderAddress: [20]byte{},
390+
}
391+
agg.nextBatchIndex += 1
392+
393+
quorumNums := eigentypes.QuorumNums{eigentypes.QuorumNum(QUORUM_NUMBER)}
394+
quorumThresholdPercentages := eigentypes.QuorumThresholdPercentages{eigentypes.QuorumThresholdPercentage(QUORUM_THRESHOLD)}
395+
396+
err := agg.blsAggregationService.InitializeNewTask(batchIndex, taskCreatedBlock, quorumNums, quorumThresholdPercentages, 100*time.Second)
397+
// FIXME(marian): When this errors, should we retry initializing new task? Logging fatal for now.
398+
if err != nil {
399+
agg.logger.Fatalf("BLS aggregation service error when initializing new task: %s", err)
400+
}
401+
402+
agg.taskMutex.Unlock()
403+
agg.AggregatorConfig.BaseConfig.Logger.Info("- Unlocked Resources: Adding new task")
404+
agg.logger.Info("New task added", "batchIndex", batchIndex, "batchIdentifierHash (actually batchMerkleRoot)", "0x"+hex.EncodeToString(batchMerkleRoot[:]))
405+
}
406+
func (agg *Aggregator) AddNewTaskV2(batchMerkleRoot [32]byte, senderAddress [20]byte, taskCreatedBlock uint32) {
369407

370408
batchIdentifier := append(batchMerkleRoot[:], senderAddress[:]...)
371409
var batchIdentifierHash = *(*[32]byte)(crypto.Keccak256(batchIdentifier))

0 commit comments

Comments
 (0)