Skip to content

Commit 0038673

Browse files
committed
feat: subscriber SubscribeToNewTasksV2
1 parent 60b1ccd commit 0038673

3 files changed

Lines changed: 21 additions & 3 deletions

File tree

aggregator/internal/pkg/aggregator.go

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -404,7 +404,6 @@ func (agg *Aggregator) AddNewTask(batchMerkleRoot [32]byte, taskCreatedBlock uin
404404
agg.logger.Info("New task added", "batchIndex", batchIndex, "batchIdentifierHash (actually batchMerkleRoot)", "0x"+hex.EncodeToString(batchMerkleRoot[:]))
405405
}
406406
func (agg *Aggregator) AddNewTaskV2(batchMerkleRoot [32]byte, senderAddress [20]byte, taskCreatedBlock uint32) {
407-
408407
batchIdentifier := append(batchMerkleRoot[:], senderAddress[:]...)
409408
var batchIdentifierHash = *(*[32]byte)(crypto.Keccak256(batchIdentifier))
410409

aggregator/internal/pkg/subscriber.go

Lines changed: 20 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,10 +16,29 @@ func (agg *Aggregator) SubscribeToNewTasks() error {
1616
return err
1717
}
1818
case newBatch := <-agg.NewBatchChan:
19-
agg.AddNewTask(newBatch.BatchMerkleRoot, newBatch.SenderAddress, newBatch.TaskCreatedBlock)
19+
agg.AddNewTask(newBatch.BatchMerkleRoot, newBatch.TaskCreatedBlock)
2020
}
2121
}
22+
}
23+
func (agg *Aggregator) SubscribeToNewTasksV2() error {
24+
err := agg.subscribeToNewTasks()
25+
if err != nil {
26+
return err
27+
}
28+
29+
for {
30+
select {
31+
case err := <-agg.taskSubscriber:
32+
agg.AggregatorConfig.BaseConfig.Logger.Info("Failed to subscribe to new tasks", "err", err)
2233

34+
err = agg.subscribeToNewTasks()
35+
if err != nil {
36+
return err
37+
}
38+
case newBatch := <-agg.NewBatchChanV2:
39+
agg.AddNewTaskV2(newBatch.BatchMerkleRoot, newBatch.SenderAddress, newBatch.TaskCreatedBlock)
40+
}
41+
}
2342
}
2443

2544
func (agg *Aggregator) subscribeToNewTasks() error {

operator/pkg/operator.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -95,7 +95,7 @@ func NewOperatorFromConfig(configuration config.OperatorConfig) (*Operator, erro
9595

9696
rpcClient, err := NewAggregatorRpcClient(configuration.Operator.AggregatorServerIpPortAddress, logger)
9797
if err != nil {
98-
return nil, fmt.Errorf("Could not create RPC client: %s. Is aggregator running?", err)
98+
return nil, fmt.Errorf("could not create RPC client: %s. Is aggregator running?", err)
9999
}
100100

101101
operatorId := eigentypes.OperatorIdFromKeyPair(configuration.BlsConfig.KeyPair)

0 commit comments

Comments
 (0)