Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -54,36 +54,47 @@ defmodule AlignedLayerServiceManager do
end

def get_latest_block_number() do
Logger.info("Fetching latest block number")
{:ok, num} = Ethers.current_block_number()
Logger.info("Latest block number: #{num}")
num
end

def get_new_batch_events(%{fromBlock: fromBlock, toBlock: toBlock}) do
Logger.info("Fetching new batch events from #{fromBlock} to #{toBlock}")
events =
AlignedLayerServiceManager.EventFilters.new_batch_v3(nil)
|> Ethers.get_logs(fromBlock: fromBlock, toBlock: toBlock)

case events do
{:ok, []} ->
Logger.info("No new batch events found in blocks #{fromBlock}-#{toBlock}")
[]

{:ok, list} ->
Logger.info("Found #{length(list)} new batch events in blocks #{fromBlock}-#{toBlock}")
Enum.map(list, &extract_new_batch_event_info/1)

{:error, reason} ->
Logger.error("Error fetching new batch events from #{fromBlock} to #{toBlock}: #{Map.get(reason, "message")}")
raise("Error fetching events: #{Map.get(reason, "message")}")
end
end

def extract_new_batch_event_info(event) do
block_number = event |> Map.get(:block_number)
tx_hash = event |> Map.get(:transaction_hash)
Logger.info("Extracting new batch event info for block #{block_number}, tx: #{tx_hash}")

new_batch = parse_new_batch_event(event)
Logger.info("New batch event parsed: #{inspect(new_batch)}")

{:ok,
%NewBatchInfo{
address: event |> Map.get(:address),
block_number: event |> Map.get(:block_number),
block_timestamp: get_block_timestamp(event |> Map.get(:block_number)),
transaction_hash: event |> Map.get(:transaction_hash),
block_number: block_number,
block_timestamp: get_block_timestamp(block_number),
transaction_hash: tx_hash,
new_batch: new_batch
}}
end
Expand All @@ -101,29 +112,42 @@ defmodule AlignedLayerServiceManager do
}
end

def is_batch_responded(merkle_root) do
def is_batch_responded(merkle_root, fromBlock) do
Logger.info("Checking if batch is responded for merkle_root: #{merkle_root}, fromBlock: #{fromBlock}")

event =
Utils.string_to_bytes32(merkle_root)
|> AlignedLayerServiceManager.EventFilters.batch_verified()
|> Ethers.get_logs(fromBlock: @first_block)
|> Ethers.get_logs(fromBlock: fromBlock)

case event do
{:error, reason} -> {:error, reason}
{_, []} -> false
{:ok, _} -> true
{:error, reason} ->
Logger.error("Error checking batch response for #{merkle_root}: #{inspect(reason)}")
{:error, reason}
{_, []} ->
Logger.info("Batch #{merkle_root} not responded yet")
false
{:ok, events} ->
Logger.info("Batch #{merkle_root} responded, found #{length(events)} verification events")
true
end
end

# for new batches
def extract_batch_response({_status, %NewBatchInfo{} = batch_creation}) do
created_batch = batch_creation.new_batch
was_batch_responded = is_batch_responded(created_batch.batchMerkleRoot)
Logger.info("Extracting batch response for new batch: #{created_batch.batchMerkleRoot}")
was_batch_responded = is_batch_responded(created_batch.batchMerkleRoot, batch_creation.block_number)

batch_response =
case was_batch_responded do
true -> fetch_batch_response(created_batch.batchMerkleRoot)
true ->
Logger.info("Batch #{created_batch.batchMerkleRoot} was responded, fetching response details")
fetch_batch_response(created_batch.batchMerkleRoot, batch_creation.block_number)
# was not verified, fill with nils
false -> %{block_number: nil, transaction_hash: nil, block_timestamp: nil}
false ->
Logger.info("Batch #{created_batch.batchMerkleRoot} was not responded yet")
%{block_number: nil, transaction_hash: nil, block_timestamp: nil}
end

%BatchDB{
Expand All @@ -138,7 +162,7 @@ defmodule AlignedLayerServiceManager do
response_timestamp: batch_response.block_timestamp,
amount_of_proofs: nil,
proof_hashes: nil,
fee_per_proof: BatcherPaymentServiceManager.get_fee_per_proof(%{merkle_root: created_batch.batchMerkleRoot}),
fee_per_proof: BatcherPaymentServiceManager.get_fee_per_proof(%{merkle_root: created_batch.batchMerkleRoot, fromBlock: batch_creation.block_number}),
sender_address: Utils.string_to_bytes32(created_batch.senderAddress),
max_aggregator_fee: created_batch.maxAggregatorFee,
is_valid: true # set to false later if a process determines it is invalid
Expand All @@ -147,15 +171,18 @@ defmodule AlignedLayerServiceManager do

# for existing but unverified batches
def extract_batch_response(%Batches{} = unverified_batch) do
was_batch_responded = is_batch_responded(unverified_batch.merkle_root)
Logger.info("Extracting batch response for existing unverified batch: #{unverified_batch.merkle_root}")
was_batch_responded = is_batch_responded(unverified_batch.merkle_root, unverified_batch.submission_block_number)

case was_batch_responded do
# Do nothing since unverified batch was not yet verified
false ->
Logger.info("Unverified batch #{unverified_batch.merkle_root} still not responded")
nil

true ->
batch_response = fetch_batch_response(unverified_batch.merkle_root)
Logger.info("Unverified batch #{unverified_batch.merkle_root} now responded, updating status")
batch_response = fetch_batch_response(unverified_batch.merkle_root, unverified_batch.submission_block_number)

%BatchDB{
merkle_root: unverified_batch.merkle_root,
Expand All @@ -177,29 +204,46 @@ defmodule AlignedLayerServiceManager do
end
end

def fetch_batch_response(merkle_root) do
case get_batch_verified_events(%{merkle_root: merkle_root}) do
{:ok, batch_verified_info} -> batch_verified_info
{:empty, _} -> nil
{:error, error} -> raise("Error fetching batch response: #{error}")
def fetch_batch_response(merkle_root, fromBlock \\ @first_block) do
Logger.info("Fetching batch response for merkle_root: #{merkle_root}, fromBlock: #{fromBlock}")
case get_batch_verified_events(%{merkle_root: merkle_root, fromBlock: fromBlock}) do
{:ok, batch_verified_info} ->
Logger.info("Successfully fetched batch response for #{merkle_root}")
batch_verified_info
{:empty, _} ->
Logger.info("No batch verified events found for #{merkle_root}")
nil
{:error, error} ->
Logger.error("Error fetching batch response for #{merkle_root}: #{error}")
raise("Error fetching batch response: #{error}")
end
end

def get_batch_verified_events(%{merkle_root: merkle_root}) do
def get_batch_verified_events(%{merkle_root: merkle_root, fromBlock: fromBlock}) do
Logger.info("Getting batch verified events for merkle_root: #{merkle_root}, fromBlock: #{fromBlock}")
event =
AlignedLayerServiceManager.EventFilters.batch_verified(Utils.string_to_bytes32(merkle_root))
|> Ethers.get_logs(fromBlock: @first_block)
|> Ethers.get_logs(fromBlock: fromBlock)

case event do
{:error, reason} -> {:error, reason}
{_, []} -> {:empty, "No task found"}
{:ok, event} -> extract_batch_verified_event_info(event |> List.first())
{:error, reason} ->
Logger.error("Error getting batch verified events for #{merkle_root}: #{inspect(reason)}")
{:error, reason}
{_, []} ->
Logger.info("No batch verified events found for #{merkle_root}")
{:empty, "No task found"}
{:ok, events} ->
Logger.info("Found #{length(events)} batch verified events for #{merkle_root}")
extract_batch_verified_event_info(events |> List.first())
end
end

defp extract_batch_verified_event_info(event) do
batch_merkle_root = event |> Map.get(:topics_raw) |> Enum.at(1)
sender_address = event |> Map.get(:data) |> Enum.at(0)
block_number = event |> Map.get(:block_number)
tx_hash = event |> Map.get(:transaction_hash)
Logger.info("Extracting batch verified event info for block #{block_number}, tx: #{tx_hash}, merkle_root: #{batch_merkle_root}")

{:ok,
%BatchVerifiedInfo{
Expand All @@ -213,9 +257,15 @@ defmodule AlignedLayerServiceManager do
end

def get_block_timestamp(block_number) do
Logger.info("Fetching block timestamp for block #{block_number}")
case Ethers.Utils.get_block_timestamp(block_number) do
{:ok, timestamp} -> DateTime.from_unix!(timestamp)
{:error, error} -> raise("Error fetching block timestamp: #{error}")
{:ok, timestamp} ->
datetime = DateTime.from_unix!(timestamp)
Logger.info("Block #{block_number} timestamp: #{datetime}")
datetime
{:error, error} ->
Logger.error("Error fetching block timestamp for block #{block_number}: #{error}")
raise("Error fetching block timestamp: #{error}")
end
end

Expand All @@ -230,8 +280,10 @@ defmodule AlignedLayerServiceManager do
end

def update_restakeable_strategies() do
Logger.info("Updating restakeable strategies")
case AlignedLayerServiceManager.get_restakeable_strategies() |> Ethers.call() do
{:ok, restakeable_strategies} ->
Logger.info("Successfully fetched #{length(restakeable_strategies)} restakeable strategies")
Strategies.update(restakeable_strategies)

{:error, error} ->
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,12 +43,13 @@ defmodule BatcherPaymentServiceManager do
@batcher_payment_service_address
end

def get_fee_per_proof(%{merkle_root: merkle_root}) do
def get_fee_per_proof(%{merkle_root: merkle_root, fromBlock: fromBlock}) do
Logger.info("Getting fee per proof for merkle_root: #{merkle_root}, fromBlock: #{fromBlock}")
BatcherPaymentServiceManager.EventFilters.task_created(
merkle_root
|> Utils.string_to_bytes32()
)
|> Ethers.get_logs(fromBlock: @first_block)
|> Ethers.get_logs(fromBlock: fromBlock)
|> case do
{:ok, []} ->
Logger.warning("No fee per proof events found for merkle root: #{merkle_root}.")
Expand All @@ -57,7 +58,7 @@ defmodule BatcherPaymentServiceManager do
{:ok, events} ->
event = events |> hd()
fee_per_proof = event.data |> hd()
Logger.debug("Fee per proof of #{merkle_root}: #{fee_per_proof} WEI.")
Logger.info("Fee per proof of #{merkle_root}: #{fee_per_proof} WEI.")

fee_per_proof

Expand Down