Skip to content

Commit e1088d3

Browse files
committed
feat: server ProcessOperatorSignedTaskResponseV2
1 parent 0038673 commit e1088d3

1 file changed

Lines changed: 91 additions & 0 deletions

File tree

aggregator/internal/pkg/server.go

Lines changed: 91 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,97 @@ func (agg *Aggregator) ServeOperators() error {
4949
// - 0: Success
5050
// - 1: Error
5151
func (agg *Aggregator) ProcessOperatorSignedTaskResponse(signedTaskResponse *types.SignedTaskResponse, reply *uint8) error {
52+
agg.AggregatorConfig.BaseConfig.Logger.Info("New task response",
53+
"BatchMerkleRoot", "0x"+hex.EncodeToString(signedTaskResponse.BatchMerkleRoot[:]),
54+
"operatorId", hex.EncodeToString(signedTaskResponse.OperatorId[:]))
55+
56+
taskIndex := uint32(0)
57+
ok := false
58+
59+
for i := 0; i < waitForEventRetries; i++ {
60+
agg.taskMutex.Lock()
61+
agg.AggregatorConfig.BaseConfig.Logger.Info("- Locked Resources: Starting processing of Response")
62+
taskIndex, ok = agg.batchesIdxByIdentifierHash[signedTaskResponse.BatchMerkleRoot]
63+
if !ok {
64+
agg.taskMutex.Unlock()
65+
agg.logger.Info("- Unlocked Resources: Task not found in the internal map")
66+
time.Sleep(waitForEventSleepSeconds)
67+
} else {
68+
break
69+
}
70+
}
71+
72+
if !ok {
73+
agg.logger.Warn("Task not found in the internal map, operator signature will be lost. Batch may not reach quorum")
74+
*reply = 1
75+
return nil
76+
}
77+
78+
// Note: we already have lock here
79+
agg.logger.Debug("- Checking if operator already responded")
80+
batchResponses, ok := agg.operatorRespondedBatch[taskIndex]
81+
if !ok {
82+
batchResponses = make(map[eigentypes.Bytes32]struct{})
83+
agg.operatorRespondedBatch[taskIndex] = batchResponses
84+
}
85+
86+
if _, ok := batchResponses[signedTaskResponse.OperatorId]; ok {
87+
*reply = 0
88+
agg.logger.Warn("Operator already responded, ignoring",
89+
"operatorId", hex.EncodeToString(signedTaskResponse.OperatorId[:]),
90+
"taskIndex", taskIndex, "batchMerkleRoot", hex.EncodeToString(signedTaskResponse.BatchMerkleRoot[:]))
91+
92+
agg.taskMutex.Unlock()
93+
return nil
94+
}
95+
96+
batchResponses[signedTaskResponse.OperatorId] = struct{}{}
97+
98+
// Don't wait infinitely if it can't answer
99+
// Create a context with a timeout of 5 seconds
100+
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
101+
defer cancel() // Ensure the cancel function is called to release resources
102+
103+
// Create a channel to signal when the task is done
104+
done := make(chan struct{})
105+
106+
agg.logger.Info("Starting bls signature process")
107+
go func() {
108+
err := agg.blsAggregationService.ProcessNewSignature(
109+
context.Background(), taskIndex, signedTaskResponse.BatchMerkleRoot,
110+
&signedTaskResponse.BlsSignature, signedTaskResponse.OperatorId,
111+
)
112+
113+
if err != nil {
114+
agg.logger.Warnf("BLS aggregation service error: %s", err)
115+
// remove operator from the list of operators that responded
116+
// so that it can try again
117+
delete(batchResponses, signedTaskResponse.OperatorId)
118+
} else {
119+
agg.logger.Info("BLS process succeeded")
120+
}
121+
122+
close(done)
123+
}()
124+
125+
*reply = 1
126+
// Wait for either the context to be done or the task to complete
127+
select {
128+
case <-ctx.Done():
129+
// The context's deadline was exceeded or it was canceled
130+
agg.logger.Info("Bls process timed out, operator signature will be lost. Batch may not reach quorum")
131+
case <-done:
132+
// The task completed successfully
133+
agg.logger.Info("Bls context finished correctly")
134+
*reply = 0
135+
}
136+
137+
agg.AggregatorConfig.BaseConfig.Logger.Info("- Unlocked Resources: Task response processing finished")
138+
agg.taskMutex.Unlock()
139+
140+
return nil
141+
}
142+
func (agg *Aggregator) ProcessOperatorSignedTaskResponseV2(signedTaskResponse *types.SignedTaskResponseV2, reply *uint8) error {
52143
agg.AggregatorConfig.BaseConfig.Logger.Info("New task response",
53144
"BatchMerkleRoot", "0x"+hex.EncodeToString(signedTaskResponse.BatchMerkleRoot[:]),
54145
"SenderAddress", "0x"+hex.EncodeToString(signedTaskResponse.SenderAddress[:]),

0 commit comments

Comments
 (0)