Skip to content

Commit a3fbc3b

Browse files
committed
refactor retry logic for SendSignedTaskResponseToAggregator + Local testnet works w/ telemetry
1 parent 62f1fff commit a3fbc3b

4 files changed

Lines changed: 129 additions & 24 deletions

File tree

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
# Common variables for all the services
2+
# 'production' only prints info and above. 'development' also prints debug
3+
environment: 'development'
4+
aligned_layer_deployment_config_file_path: '../contracts/script/output/devnet/alignedlayer_deployment_output.json'
5+
eigen_layer_deployment_config_file_path: '../contracts/script/output/devnet/eigenlayer_deployment_output.json'
6+
eth_rpc_url: 'http://localhost:8545'
7+
eth_rpc_url_fallback: 'http://localhost:8545'
8+
eth_ws_url: 'ws://localhost:8545'
9+
eth_ws_url_fallback: 'ws://localhost:8545'
10+
eigen_metrics_ip_port_address: 'localhost:9090'
11+
12+
## ECDSA Configurations
13+
ecdsa:
14+
private_key_store_path: '../config-files/devnet/keys/operator-1.ecdsa.key.json'
15+
private_key_store_password: ''
16+
17+
## BLS Configurations
18+
bls:
19+
private_key_store_path: '../config-files/devnet/keys/operator-1.bls.key.json'
20+
private_key_store_password: ''
21+
22+
## Operator Configurations
23+
operator:
24+
aggregator_rpc_server_ip_port_address: localhost:8090
25+
operator_tracker_ip_port_address: http://localhost:4001
26+
address: 0x70997970C51812dc3A010C7d01b50e0d17dc79C8
27+
earnings_receiver_address: 0x70997970C51812dc3A010C7d01b50e0d17dc79C8
28+
delegation_approver_address: '0x0000000000000000000000000000000000000000'
29+
staker_opt_out_window_blocks: 0
30+
metadata_url: 'https://yetanotherco.github.io/operator_metadata/metadata.json'
31+
enable_metrics: true
32+
metrics_ip_port_address: localhost:9092
33+
max_batch_size: 268435456 # 256 MiB
34+
last_processed_batch_filepath: '../config-files/operator-1.last_processed_batch.json'
35+
36+
# Operator variables needed to register it in EigenLayer
37+
el_delegation_manager_address: '0xCf7Ed3AccA5a467e9e704C703E8D87F634fB0Fc9'
38+
private_key_store_path: config-files/devnet/keys/operator-1.ecdsa.key.json
39+
bls_private_key_store_path: config-files/devnet/keys/operator-1.bls.key.json
40+
signer_type: local_keystore
41+
chain_id: 31337

core/retry_test.go

Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,18 +14,22 @@ import (
1414
"time"
1515

1616
"github.com/Layr-Labs/eigensdk-go/chainio/clients/eth"
17+
"github.com/Layr-Labs/eigensdk-go/crypto/bls"
1718
rpccalls "github.com/Layr-Labs/eigensdk-go/metrics/collectors/rpc_calls"
1819
backoff "github.com/cenkalti/backoff/v4"
1920
"github.com/ethereum/go-ethereum/accounts/abi/bind"
2021
"github.com/ethereum/go-ethereum/common"
2122
"github.com/ethereum/go-ethereum/core/types"
2223
"github.com/prometheus/client_golang/prometheus"
2324
"github.com/stretchr/testify/assert"
25+
aggregator "github.com/yetanotherco/aligned_layer/aggregator/pkg"
2426
servicemanager "github.com/yetanotherco/aligned_layer/contracts/bindings/AlignedLayerServiceManager"
2527
retry "github.com/yetanotherco/aligned_layer/core"
2628
"github.com/yetanotherco/aligned_layer/core/chainio"
2729
"github.com/yetanotherco/aligned_layer/core/config"
30+
core_types "github.com/yetanotherco/aligned_layer/core/types"
2831
"github.com/yetanotherco/aligned_layer/core/utils"
32+
operator "github.com/yetanotherco/aligned_layer/operator/pkg"
2933
)
3034

3135
func DummyFunction(x uint64) (uint64, error) {
@@ -923,3 +927,69 @@ func TestBatchersBalances(t *testing.T) {
923927
return
924928
}
925929
}
930+
931+
func TestSendSignedTaskResponse(t *testing.T) {
932+
cmd, _, err := SetupAnvil(8545)
933+
if err != nil {
934+
t.Errorf("Error setting up Anvil: %s\n", err)
935+
}
936+
937+
aggregatorConfig := config.NewAggregatorConfig("../config-files/config-aggregator-test.yaml")
938+
operatorConfig := config.NewOperatorConfig("../config-files/config-operator-test.yaml")
939+
940+
agg, err := aggregator.NewAggregator(*aggregatorConfig)
941+
if err != nil {
942+
fmt.Errorf("Could not load aggregator %s", err)
943+
}
944+
ctx, cancel := context.WithCancel(context.Background())
945+
go func() {
946+
err = agg.Start(ctx)
947+
if err != nil {
948+
fmt.Errorf("Could not start Aggregator %s", err)
949+
}
950+
}()
951+
if err != nil {
952+
fmt.Errorf("Could not start Aggregator %s", err)
953+
}
954+
955+
logger := operatorConfig.BaseConfig.Logger
956+
rpcClient, err := operator.NewAggregatorRpcClient(operatorConfig.Operator.AggregatorServerIpPortAddress, logger)
957+
// This is a mock to verify that a lost connection to the aggregator does not return anything.
958+
disconnected_rpcClient, err := operator.NewAggregatorRpcClient("localhost:8999", logger)
959+
if err != nil {
960+
fmt.Errorf("Could not create RPC client: %s. Is aggregator running?", err)
961+
}
962+
963+
//zero_bytes := [32]byte{}
964+
signedTaskResponse := &core_types.SignedTaskResponse{
965+
BatchIdentifierHash: [32]byte{},
966+
BatchMerkleRoot: [32]byte{},
967+
SenderAddress: [20]byte{},
968+
BlsSignature: bls.Signature{},
969+
OperatorId: [32]byte{},
970+
}
971+
972+
task_resp_func := operator.SendSignedTaskResponse(rpcClient, signedTaskResponse)
973+
_, err = task_resp_func()
974+
assert.Nil(t, err)
975+
976+
fmt.Printf("stuff")
977+
task_resp_func = operator.SendSignedTaskResponse(disconnected_rpcClient, signedTaskResponse)
978+
// NOTE: This the expected behavior of a net/rpc/clint/Go for an invalid connection ref: https://cs.opensource.google/go/go/+/refs/tags/go1.23.3:src/net/rpc/client.go;l=298
979+
assert.Panics(t, func() { _, err = task_resp_func() })
980+
fmt.Printf("error: %s", err)
981+
assert.Nil(t, err)
982+
983+
task_resp_func = operator.SendSignedTaskResponse(rpcClient, signedTaskResponse)
984+
_, err = task_resp_func()
985+
assert.Nil(t, err)
986+
987+
// Kill Aggregator
988+
cancel()
989+
990+
if err := cmd.Process.Kill(); err != nil {
991+
t.Errorf("Error killing process: %v\n", err)
992+
return
993+
}
994+
995+
}

operator/pkg/operator.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -349,7 +349,7 @@ func (o *Operator) handleNewBatchLogV2(newBatchLog *servicemanager.ContractAlign
349349
hex.EncodeToString(signedTaskResponse.SenderAddress[:]),
350350
)
351351

352-
o.aggRpcClient.SendSignedTaskResponseToAggregator(&signedTaskResponse)
352+
o.aggRpcClient.SendSignedTaskResponseToAggregatorRetryable(&signedTaskResponse)
353353
}
354354
func (o *Operator) ProcessNewBatchLogV2(newBatchLog *servicemanager.ContractAlignedLayerServiceManagerNewBatchV2) error {
355355

@@ -430,7 +430,7 @@ func (o *Operator) handleNewBatchLogV3(newBatchLog *servicemanager.ContractAlign
430430
hex.EncodeToString(signedTaskResponse.SenderAddress[:]),
431431
)
432432

433-
o.aggRpcClient.SendSignedTaskResponseToAggregator(&signedTaskResponse)
433+
o.aggRpcClient.SendSignedTaskResponseToAggregatorRetryable(&signedTaskResponse)
434434
}
435435
func (o *Operator) ProcessNewBatchLogV3(newBatchLog *servicemanager.ContractAlignedLayerServiceManagerNewBatchV3) error {
436436

operator/pkg/rpc_client.go

Lines changed: 16 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import (
66
"time"
77

88
"github.com/Layr-Labs/eigensdk-go/logging"
9+
retry "github.com/yetanotherco/aligned_layer/core"
910
"github.com/yetanotherco/aligned_layer/core/types"
1011
)
1112

@@ -16,11 +17,6 @@ type AggregatorRpcClient struct {
1617
logger logging.Logger
1718
}
1819

19-
const (
20-
MaxRetries = 10
21-
RetryInterval = 10 * time.Second
22-
)
23-
2420
func NewAggregatorRpcClient(aggregatorIpPortAddr string, logger logging.Logger) (*AggregatorRpcClient, error) {
2521
client, err := rpc.DialHTTP("tcp", aggregatorIpPortAddr)
2622
if err != nil {
@@ -34,31 +30,29 @@ func NewAggregatorRpcClient(aggregatorIpPortAddr string, logger logging.Logger)
3430
}, nil
3531
}
3632

37-
// SendSignedTaskResponseToAggregator is the method called by operators via RPC to send
38-
// their signed task response.
39-
func (c *AggregatorRpcClient) SendSignedTaskResponseToAggregator(signedTaskResponse *types.SignedTaskResponse) {
40-
var reply uint8
41-
for retries := 0; retries < MaxRetries; retries++ {
33+
func SendSignedTaskResponse(c *AggregatorRpcClient, signedTaskResponse *types.SignedTaskResponse) func() (uint8, error) {
34+
send_task_func := func() (uint8, error) {
35+
var reply uint8
4236
err := c.rpcClient.Call("Aggregator.ProcessOperatorSignedTaskResponseV2", signedTaskResponse, &reply)
4337
if err != nil {
4438
c.logger.Error("Received error from aggregator", "err", err)
4539
if errors.Is(err, rpc.ErrShutdown) {
4640
c.logger.Error("Aggregator is shutdown. Reconnecting...")
47-
client, err := rpc.DialHTTP("tcp", c.aggregatorIpPortAddr)
48-
if err != nil {
49-
c.logger.Error("Could not reconnect to aggregator", "err", err)
50-
time.Sleep(RetryInterval)
51-
} else {
52-
c.rpcClient = client
53-
c.logger.Info("Reconnected to aggregator")
54-
}
55-
} else {
56-
c.logger.Infof("Received error from aggregator: %s. Retrying ProcessOperatorSignedTaskResponseV2 RPC call...", err)
57-
time.Sleep(RetryInterval)
5841
}
5942
} else {
6043
c.logger.Info("Signed task response header accepted by aggregator.", "reply", reply)
61-
return
6244
}
45+
return reply, err
6346
}
47+
return send_task_func
48+
}
49+
50+
// SendSignedTaskResponseToAggregator is the method called by operators via RPC to send
51+
// their signed task response.
52+
func (c *AggregatorRpcClient) SendSignedTaskResponseToAggregatorRetryable(signedTaskResponse *types.SignedTaskResponse) (uint8, error) {
53+
config := retry.DefaultRetryConfig()
54+
config.NumRetries = 10
55+
config.Multiplier = 1 // Constant retry interval
56+
config.InitialInterval = 10 * time.Second
57+
return retry.RetryWithData(SendSignedTaskResponse(c, signedTaskResponse), config)
6458
}

0 commit comments

Comments
 (0)