@@ -3,10 +3,12 @@ package pkg
33import (
44 "context"
55 "encoding/hex"
6- gethtypes "github.com/ethereum/go-ethereum/core/types "
6+ "fmt "
77 "sync"
88 "time"
99
10+ gethtypes "github.com/ethereum/go-ethereum/core/types"
11+
1012 "github.com/prometheus/client_golang/prometheus"
1113 "github.com/yetanotherco/aligned_layer/metrics"
1214
@@ -15,7 +17,7 @@ import (
1517 "github.com/Layr-Labs/eigensdk-go/logging"
1618 "github.com/Layr-Labs/eigensdk-go/services/avsregistry"
1719 blsagg "github.com/Layr-Labs/eigensdk-go/services/bls_aggregation"
18- oppubkeysserv "github.com/Layr-Labs/eigensdk-go/services/operatorpubkeys "
20+ oppubkeysserv "github.com/Layr-Labs/eigensdk-go/services/operatorsinfo "
1921 eigentypes "github.com/Layr-Labs/eigensdk-go/types"
2022 "github.com/ethereum/go-ethereum/event"
2123 servicemanager "github.com/yetanotherco/aligned_layer/contracts/bindings/AlignedLayerServiceManager"
@@ -56,6 +58,12 @@ type Aggregator struct {
5658 // Stores the taskCreatedBlock for each batch bt batch index
5759 batchCreatedBlockByIdx map [uint32 ]uint64
5860
61+ // Stores if an operator already submitted a response for a batch
62+ // This is to avoid double submissions
63+ // struct{} is used as a placeholder because it is the smallest type
64+ // go does not have a set type
65+ operatorRespondedBatch map [uint32 ]map [eigentypes.Bytes32 ]struct {}
66+
5967 // This task index is to communicate with the local BLS
6068 // Service.
6169 // Note: In case of a reboot it can start from 0 again
@@ -113,9 +121,24 @@ func NewAggregator(aggregatorConfig config.AggregatorConfig) (*Aggregator, error
113121 return nil , err
114122 }
115123
116- operatorPubkeysService := oppubkeysserv .NewOperatorPubkeysServiceInMemory (context .Background (), clients .AvsRegistryChainSubscriber , clients .AvsRegistryChainReader , logger )
117- avsRegistryService := avsregistry .NewAvsRegistryServiceChainCaller (avsReader .AvsRegistryReader , operatorPubkeysService , logger )
118- blsAggregationService := blsagg .NewBlsAggregatorService (avsRegistryService , logger )
124+ // This is a dummy "hash function" made to fulfill the BLS aggregator service API requirements.
125+ // When operators respond to a task, a call to `ProcessNewSignature` is made. In `v0.1.6` of the eigensdk,
126+ // this function required an argument `TaskResponseDigest`, which has changed to just `TaskResponse` in v0.1.9.
127+ // The digest we used in v0.1.6 was just the batch merkle root. To continue with the same idea, the hashing
128+ // function is set as the following one, which does nothing more than output the input it receives, which in
129+ // our case will be the batch merkle root. If wanted, we could define a real hash function here but there should
130+ // not be any need to re-hash the batch merkle root.
131+ hashFunction := func (taskResponse eigentypes.TaskResponse ) (eigentypes.TaskResponseDigest , error ) {
132+ taskResponseDigest , ok := taskResponse .([32 ]byte )
133+ if ! ok {
134+ return eigentypes.TaskResponseDigest {}, fmt .Errorf ("TaskResponse is not a 32-byte value" )
135+ }
136+ return taskResponseDigest , nil
137+ }
138+
139+ operatorPubkeysService := oppubkeysserv .NewOperatorsInfoServiceInMemory (context .Background (), clients .AvsRegistryChainSubscriber , clients .AvsRegistryChainReader , nil , logger )
140+ avsRegistryService := avsregistry .NewAvsRegistryServiceChainCaller (avsReader .ChainReader , operatorPubkeysService , logger )
141+ blsAggregationService := blsagg .NewBlsAggregatorService (avsRegistryService , hashFunction , logger )
119142
120143 // Metrics
121144 reg := prometheus .NewRegistry ()
@@ -133,6 +156,7 @@ func NewAggregator(aggregatorConfig config.AggregatorConfig) (*Aggregator, error
133156 batchesRootByIdx : batchesRootByIdx ,
134157 batchesIdxByRoot : batchesIdxByRoot ,
135158 batchCreatedBlockByIdx : batchCreatedBlockByIdx ,
159+ operatorRespondedBatch : make (map [uint32 ]map [eigentypes.Bytes32 ]struct {}),
136160 nextBatchIndex : nextBatchIndex ,
137161 taskMutex : & sync.Mutex {},
138162 walletMutex : & sync.Mutex {},
@@ -182,7 +206,16 @@ const MaxSentTxRetries = 5
182206
183207func (agg * Aggregator ) handleBlsAggServiceResponse (blsAggServiceResp blsagg.BlsAggregationServiceResponse ) {
184208 if blsAggServiceResp .Err != nil {
185- agg .logger .Warn ("BlsAggregationServiceResponse contains an error" , "err" , blsAggServiceResp .Err )
209+ agg .taskMutex .Lock ()
210+ batchMerkleRoot := agg .batchesRootByIdx [blsAggServiceResp .TaskIndex ]
211+ agg .logger .Error ("BlsAggregationServiceResponse contains an error" , "err" , blsAggServiceResp .Err , "merkleRoot" , hex .EncodeToString (batchMerkleRoot [:]))
212+ agg .logger .Info ("- Locking task mutex: Delete task from operator map" , "taskIndex" , blsAggServiceResp .TaskIndex )
213+
214+ // Remove task from the list of tasks
215+ delete (agg .operatorRespondedBatch , blsAggServiceResp .TaskIndex )
216+
217+ agg .logger .Info ("- Unlocking task mutex: Delete task from operator map" , "taskIndex" , blsAggServiceResp .TaskIndex )
218+ agg .taskMutex .Unlock ()
186219 return
187220 }
188221 nonSignerPubkeys := []servicemanager.BN254G1Point {}
@@ -209,13 +242,16 @@ func (agg *Aggregator) handleBlsAggServiceResponse(blsAggServiceResp blsagg.BlsA
209242 agg .AggregatorConfig .BaseConfig .Logger .Info ("- Locked Resources: Fetching merkle root" )
210243 batchMerkleRoot := agg .batchesRootByIdx [blsAggServiceResp .TaskIndex ]
211244 taskCreatedBlock := agg .batchCreatedBlockByIdx [blsAggServiceResp .TaskIndex ]
245+
246+ // Delete the task from the map
247+ delete (agg .operatorRespondedBatch , blsAggServiceResp .TaskIndex )
248+
212249 agg .AggregatorConfig .BaseConfig .Logger .Info ("- Unlocked Resources: Fetching merkle root" )
213250 agg .taskMutex .Unlock ()
214251
215252 agg .logger .Info ("Threshold reached" , "taskIndex" , blsAggServiceResp .TaskIndex ,
216253 "merkleRoot" , hex .EncodeToString (batchMerkleRoot [:]))
217254
218-
219255 currentBlock , err := agg .AggregatorConfig .BaseConfig .EthRpcClient .BlockNumber (context .Background ())
220256 if err != nil {
221257 agg .logger .Error ("Error getting current block number" , "err" , err )
@@ -270,10 +306,8 @@ func (agg *Aggregator) handleBlsAggServiceResponse(blsAggServiceResp blsagg.BlsA
270306 "merkleRoot" , hex .EncodeToString (batchMerkleRoot [:]))
271307}
272308
273-
274-
275- /// Sends response to contract and waits for transaction receipt
276- /// Returns error if it fails to send tx or receipt is not found
309+ // / Sends response to contract and waits for transaction receipt
310+ // / Returns error if it fails to send tx or receipt is not found
277311func (agg * Aggregator ) sendAggregatedResponse (batchMerkleRoot [32 ]byte , nonSignerStakesAndSignature servicemanager.IBLSSignatureCheckerNonSignerStakesAndSignature ) (* gethtypes.Receipt , error ) {
278312 agg .walletMutex .Lock ()
279313 agg .logger .Infof ("- Locked Wallet Resources: Sending aggregated response for batch %s" , hex .EncodeToString (batchMerkleRoot [:]))
@@ -299,7 +333,6 @@ func (agg *Aggregator) sendAggregatedResponse(batchMerkleRoot [32]byte, nonSigne
299333 return receipt , nil
300334}
301335
302-
303336func (agg * Aggregator ) AddNewTask (batchMerkleRoot [32 ]byte , taskCreatedBlock uint32 ) {
304337 agg .AggregatorConfig .BaseConfig .Logger .Info ("Adding new task" ,
305338 "Batch merkle root" , hex .EncodeToString (batchMerkleRoot [:]))
0 commit comments