Skip to content

Commit 48d7cc8

Browse files
committed
feat: rpc_client SendSignedTaskResponseToAggregatorV2
1 parent 318ef36 commit 48d7cc8

1 file changed

Lines changed: 26 additions & 0 deletions

File tree

operator/pkg/rpc_client.go

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,3 +62,29 @@ func (c *AggregatorRpcClient) SendSignedTaskResponseToAggregator(signedTaskRespo
6262
}
6363
}
6464
}
65+
func (c *AggregatorRpcClient) SendSignedTaskResponseToAggregatorV2(signedTaskResponse *types.SignedTaskResponseV2) {
66+
var reply uint8
67+
for retries := 0; retries < MaxRetries; retries++ {
68+
err := c.rpcClient.Call("Aggregator.ProcessOperatorSignedTaskResponseV2", signedTaskResponse, &reply)
69+
if err != nil {
70+
c.logger.Error("Received error from aggregator", "err", err)
71+
if errors.Is(err, rpc.ErrShutdown) {
72+
c.logger.Error("Aggregator is shutdown. Reconnecting...")
73+
client, err := rpc.DialHTTP("tcp", c.aggregatorIpPortAddr)
74+
if err != nil {
75+
c.logger.Error("Could not reconnect to aggregator", "err", err)
76+
time.Sleep(RetryInterval)
77+
} else {
78+
c.rpcClient = client
79+
c.logger.Info("Reconnected to aggregator")
80+
}
81+
} else {
82+
c.logger.Infof("Received error from aggregator: %s. Retrying ProcessOperatorSignedTaskResponseV2 RPC call...", err)
83+
time.Sleep(RetryInterval)
84+
}
85+
} else {
86+
c.logger.Info("Signed task response header accepted by aggregator.", "reply", reply)
87+
return
88+
}
89+
}
90+
}

0 commit comments

Comments
 (0)