Skip to content

Commit 318ef36

Browse files
committed
feat: operator NewTaskCreatedChanV2, and discerning ProcessNewBatchLogV2
1 parent e1088d3 commit 318ef36

1 file changed

Lines changed: 31 additions & 22 deletions

File tree

operator/pkg/operator.go

Lines changed: 31 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@ type Operator struct {
4646
OperatorId eigentypes.OperatorId
4747
avsSubscriber chainio.AvsSubscriber
4848
NewTaskCreatedChan chan *servicemanager.ContractAlignedLayerServiceManagerNewBatch
49+
NewTaskCreatedChanV2 chan *servicemanager.ContractAlignedLayerServiceManagerNewBatchV2
4950
Logger logging.Logger
5051
aggRpcClient AggregatorRpcClient
5152
metricsReg *prometheus.Registry
@@ -92,6 +93,7 @@ func NewOperatorFromConfig(configuration config.OperatorConfig) (*Operator, erro
9293
log.Fatalf("Could not create AVS subscriber")
9394
}
9495
newTaskCreatedChan := make(chan *servicemanager.ContractAlignedLayerServiceManagerNewBatch)
96+
newTaskCreatedChanV2 := make(chan *servicemanager.ContractAlignedLayerServiceManagerNewBatchV2)
9597

9698
rpcClient, err := NewAggregatorRpcClient(configuration.Operator.AggregatorServerIpPortAddress, logger)
9799
if err != nil {
@@ -111,6 +113,7 @@ func NewOperatorFromConfig(configuration config.OperatorConfig) (*Operator, erro
111113
avsSubscriber: *avsSubscriber,
112114
Address: address,
113115
NewTaskCreatedChan: newTaskCreatedChan,
116+
NewTaskCreatedChanV2: newTaskCreatedChanV2,
114117
aggRpcClient: *rpcClient,
115118
OperatorId: operatorId,
116119
metricsReg: reg,
@@ -125,6 +128,9 @@ func NewOperatorFromConfig(configuration config.OperatorConfig) (*Operator, erro
125128
func (o *Operator) SubscribeToNewTasks() (chan error, error) {
126129
return o.avsSubscriber.SubscribeToNewTasks(o.NewTaskCreatedChan)
127130
}
131+
func (o *Operator) SubscribeToNewTasksV2() (chan error, error) {
132+
return o.avsSubscriber.SubscribeToNewTasksV2(o.NewTaskCreatedChanV2)
133+
}
128134

129135
func (o *Operator) Start(ctx context.Context) error {
130136
sub, err := o.SubscribeToNewTasks()
@@ -154,32 +160,11 @@ func (o *Operator) Start(ctx context.Context) error {
154160
}
155161
case newBatchLog := <-o.NewTaskCreatedChan:
156162
err := o.ProcessNewBatchLog(newBatchLog)
157-
// err := o.ProcessNewBatchLogV2(newBatchLog)
158163
if err != nil {
159164
o.Logger.Infof("batch %x did not verify. Err: %v", newBatchLog.BatchMerkleRoot, err)
160165
continue
161166
}
162167

163-
// V2
164-
// batchIdentifier := append(newBatchLog.BatchMerkleRoot[:], newBatchLog.SenderAddress[:]...)
165-
// var batchIdentifierHash = *(*[32]byte)(crypto.Keccak256(batchIdentifier))
166-
// responseSignature := o.SignTaskResponse(batchIdentifierHash)
167-
168-
// V2
169-
// signedTaskResponse := types.SignedTaskResponse{
170-
// BatchIdentifierHash: batchIdentifierHash,
171-
// BatchMerkleRoot: newBatchLog.BatchMerkleRoot,
172-
// SenderAddress: newBatchLog.SenderAddress,
173-
// BlsSignature: *responseSignature,
174-
// OperatorId: o.OperatorId,
175-
// }
176-
// o.Logger.Infof("Signed Task Response to send: BatchIdentifierHash=%s, BatchMerkleRoot=%s, SenderAddress=%s",
177-
// hex.EncodeToString(signedTaskResponse.BatchIdentifierHash[:]),
178-
// hex.EncodeToString(signedTaskResponse.BatchMerkleRoot[:]),
179-
// hex.EncodeToString(signedTaskResponse.SenderAddress[:]),
180-
// )
181-
182-
183168
responseSignature := o.SignTaskResponse(newBatchLog.BatchMerkleRoot)
184169
o.Logger.Debugf("responseSignature about to send: %x", responseSignature)
185170

@@ -188,11 +173,35 @@ func (o *Operator) Start(ctx context.Context) error {
188173
BlsSignature: *responseSignature,
189174
OperatorId: o.OperatorId,
190175
}
191-
192176
o.Logger.Infof("Signed Task Response to send: BatchIdentifierHash=%s, BatchMerkleRoot=%s, SenderAddress=%s",
193177
hex.EncodeToString(signedTaskResponse.BatchMerkleRoot[:]),
194178
)
195179
go o.aggRpcClient.SendSignedTaskResponseToAggregator(&signedTaskResponse)
180+
case newBatchLogV2 := <-o.NewTaskCreatedChanV2:
181+
err := o.ProcessNewBatchLogV2(newBatchLogV2)
182+
if err != nil {
183+
o.Logger.Infof("batch %x did not verify. Err: %v", newBatchLogV2.BatchMerkleRoot, err)
184+
continue
185+
}
186+
187+
batchIdentifier := append(newBatchLogV2.BatchMerkleRoot[:], newBatchLogV2.SenderAddress[:]...)
188+
var batchIdentifierHash = *(*[32]byte)(crypto.Keccak256(batchIdentifier))
189+
responseSignature := o.SignTaskResponse(batchIdentifierHash)
190+
o.Logger.Debugf("responseSignature about to send: %x", responseSignature)
191+
192+
signedTaskResponse := types.SignedTaskResponseV2{
193+
BatchIdentifierHash: batchIdentifierHash,
194+
BatchMerkleRoot: newBatchLogV2.BatchMerkleRoot,
195+
SenderAddress: newBatchLogV2.SenderAddress,
196+
BlsSignature: *responseSignature,
197+
OperatorId: o.OperatorId,
198+
}
199+
o.Logger.Infof("Signed Task Response to send: BatchIdentifierHash=%s, BatchMerkleRoot=%s, SenderAddress=%s",
200+
hex.EncodeToString(signedTaskResponse.BatchIdentifierHash[:]),
201+
hex.EncodeToString(signedTaskResponse.BatchMerkleRoot[:]),
202+
hex.EncodeToString(signedTaskResponse.SenderAddress[:]),
203+
)
204+
go o.aggRpcClient.SendSignedTaskResponseToAggregatorV2(&signedTaskResponse)
196205
}
197206
}
198207
}

0 commit comments

Comments
 (0)