From 9802b408f06039b0321de17a9a20ed5978f31d1f Mon Sep 17 00:00:00 2001 From: Will Meister Date: Thu, 3 Jan 2019 12:39:37 -0600 Subject: [PATCH 1/5] [WIP] Fast Sync initial commit. Still need validation, block processing, and end case handling. --- apps/evm/lib/evm/log_entry.ex | 20 +++ .../ex_wire/packet/capability/eth/receipts.ex | 22 ++- .../ex_wire/lib/ex_wire/struct/block_queue.ex | 163 ++++++++++++++++-- apps/ex_wire/lib/ex_wire/sync.ex | 80 ++++++++- 4 files changed, 264 insertions(+), 21 deletions(-) diff --git a/apps/evm/lib/evm/log_entry.ex b/apps/evm/lib/evm/log_entry.ex index 3a7405f55..b31e88ca5 100644 --- a/apps/evm/lib/evm/log_entry.ex +++ b/apps/evm/lib/evm/log_entry.ex @@ -59,6 +59,26 @@ defmodule EVM.LogEntry do } end + @spec serialize(t) :: ExRLP.t() + def serialize(log) do + [ + log.address, + log.topics, + log.data + ] + end + + @spec deserialize(ExRLP.t()) :: t + def deserialize(rlp) do + [ + address, + topics, + data + ] = rlp + + new(address, topics, data) + end + @doc """ Converts log struct to standard Ethereum list representation. diff --git a/apps/ex_wire/lib/ex_wire/packet/capability/eth/receipts.ex b/apps/ex_wire/lib/ex_wire/packet/capability/eth/receipts.ex index cfec882f3..b61d4f789 100644 --- a/apps/ex_wire/lib/ex_wire/packet/capability/eth/receipts.ex +++ b/apps/ex_wire/lib/ex_wire/packet/capability/eth/receipts.ex @@ -11,18 +11,19 @@ defmodule ExWire.Packet.Capability.Eth.Receipts do """ require Logger + alias Blockchain.Transaction.Receipt @behaviour ExWire.Packet @type t :: %__MODULE__{ - receipts: [any()] + receipts: [[Receipt.t()]] } defstruct [ :receipts ] - @spec new([any()]) :: t() + @spec new([Receipt.t()]) :: t() def new(receipts) do %__MODULE__{ receipts: receipts @@ -45,7 +46,11 @@ defmodule ExWire.Packet.Capability.Eth.Receipts do @impl true @spec serialize(t) :: ExRLP.t() def serialize(packet = %__MODULE__{}) do - for receipt <- packet.receipts, do: receipt + for receipts <- packet.receipts do + for receipt <- receipts do + Receipt.serialize(receipt) + end + end end @doc """ @@ -55,9 +60,16 @@ defmodule ExWire.Packet.Capability.Eth.Receipts do @impl true @spec deserialize(ExRLP.t()) :: t def deserialize(rlp) do - receipts = for receipt <- rlp, do: receipt + block_receipts = + for receipts <- rlp do + for receipt <- receipts do + Receipt.deserialize(receipt) + end + end - new(receipts) + :ok = Logger.info("[RECEIPTS] Decoded Receipt!!!!!!!!!!") + + new(block_receipts) end @doc """ diff --git a/apps/ex_wire/lib/ex_wire/struct/block_queue.ex b/apps/ex_wire/lib/ex_wire/struct/block_queue.ex index 3d884c817..3e7494892 100644 --- a/apps/ex_wire/lib/ex_wire/struct/block_queue.ex +++ b/apps/ex_wire/lib/ex_wire/struct/block_queue.ex @@ -18,10 +18,12 @@ defmodule ExWire.Struct.BlockQueue do alias Block.Header alias ExWire.Struct.Block, as: BlockStruct alias Blockchain.{Block, Blocktree, Chain} + alias Blockchain.Transaction.Receipt alias MerklePatriciaTree.Trie require Logger + @max_receipts_to_request 500 # These will be used to help us determine if a block is empty @empty_trie MerklePatriciaTree.Trie.empty_trie_root_hash() @empty_hash [] |> ExRLP.encode() |> ExthCrypto.Hash.Keccak.kec() @@ -29,11 +31,16 @@ defmodule ExWire.Struct.BlockQueue do defstruct queue: %{}, backlog: %{}, do_validation: true, - block_numbers: MapSet.new() + block_numbers: MapSet.new(), + fast_sync_in_progress: false, + block_receipts_set: MapSet.new(), + block_receipts_to_request: [], + block_receipts_requested: [] @type block_item :: %{ commitments: list(binary()), block: Block.t(), + receipts_added: boolean(), ready: boolean() } @@ -45,7 +52,11 @@ defmodule ExWire.Struct.BlockQueue do queue: %{integer() => block_map}, backlog: %{EVM.hash() => list(Block.t())}, do_validation: boolean(), - block_numbers: MapSet.t() + block_numbers: MapSet.t(), + fast_sync_in_progress: boolean(), + block_receipts_set: MapSet.t(), + block_receipts_to_request: [EVM.hash()], + block_receipts_requested: [EVM.hash()] } @doc """ @@ -65,7 +76,12 @@ defmodule ExWire.Struct.BlockQueue do Trie.t() ) :: {t, Blocktree.t(), Trie.t(), boolean()} def add_header( - block_queue = %__MODULE__{queue: queue}, + block_queue = %__MODULE__{ + queue: queue, + fast_sync_in_progress: fast_sync_in_progress, + block_receipts_set: block_receipts_set, + block_receipts_to_request: block_receipts_to_request + }, block_tree, header, header_hash, @@ -74,8 +90,9 @@ defmodule ExWire.Struct.BlockQueue do trie ) do block_map = Map.get(queue, header.number, %{}) + header_num_and_hash = {header.number, header_hash} - {block_map, should_request_body} = + {block_map, should_request_body, receipts_to_request, receipts_set} = case Map.get(block_map, header_hash) do nil -> # may already be ready, already. @@ -85,28 +102,56 @@ defmodule ExWire.Struct.BlockQueue do Map.put(block_map, header_hash, %{ commitments: MapSet.new([remote_id]), block: %Block{header: header}, + receits_added: false, ready: is_empty }) - {block_map, not is_empty} + {receipts_set, receipts_to_request} = + if fast_sync_in_progress do + { + MapSet.put(block_receipts_set, header_num_and_hash), + [block_queue.block_receipts_to_request | header_num_and_hash] + } + else + {block_receipts_set, block_receipts_to_request} + end + + {block_map, not is_empty, receipts_to_request, receipts_set} block_item -> + {receipts_set, receipts_to_request} = + if fast_sync_in_progress and Enum.empty?(block_item.block.receipts) and + not MapSet.member?(block_receipts_set, header_num_and_hash) do + { + MapSet.put(block_receipts_set, header_num_and_hash), + [block_queue.block_receipts_to_request | header_num_and_hash] + } + else + {block_receipts_set, block_receipts_to_request} + end + {Map.put(block_map, header_hash, %{ block_item | commitments: MapSet.put(block_item.commitments, remote_id) - }), false} + }), false, receipts_to_request, receipts_set} end updated_block_queue = %{ block_queue | queue: Map.put(queue, header.number, block_map), - block_numbers: MapSet.put(block_queue.block_numbers, header.number) + block_numbers: MapSet.put(block_queue.block_numbers, header.number), + block_receipts_set: receipts_set, + block_receipts_to_request: receipts_to_request } - {new_block_queue, new_block_tree, new_trie} = - process_block_queue(updated_block_queue, block_tree, chain, trie) + if fast_sync_in_progress do + {updated_block_queue, block_tree, trie, should_request_body} + else + {new_block_queue, new_block_tree, new_trie} = + process_block_queue(updated_block_queue, block_tree, chain, trie) - {new_block_queue, new_block_tree, new_trie, should_request_body} + {new_block_queue, new_block_tree, new_trie, should_request_body} + end end @doc """ @@ -164,6 +209,104 @@ defmodule ExWire.Struct.BlockQueue do process_block_queue(updated_block_queue, block_tree, chain, trie) end + @doc """ + Returns the collection of block hashes for which Receipts are needed, as well as the + updated BlockQueue accounting for the requested hashes, if fast sync is in progress and + a request is not already in flight. + """ + @spec get_receipts_to_request(t()) :: {:ok, [EVM.hash()], t()} | :do_not_request + def get_receipts_to_request( + block_queue = %__MODULE__{ + fast_sync_in_progress: is_fast, + block_receipts_to_request: to_request, + block_receipts_requested: requested + } + ) do + if is_fast and Enum.empty?(requested) and not Enum.empty?(to_request) do + {new_requests, to_request_tail} = Enum.split(to_request, @max_receipts_to_request) + + { + :ok, + new_requests |> Enum.map(fn {_number, hash} -> hash end), + %{ + block_queue + | block_receipts_to_request: to_request_tail, + block_receipts_requested: new_requests + } + } + else + # TODO: check if we're done with Fast Sync and update BlockQueue.fast_sync_in_progress + :do_not_request + end + end + + @doc """ + Processes the provided Receipts, verifying them against stored Headers and adding + them to the Blocks stored in the BlockQueue. + This will return the updated BlockQueue and the hashes of the blocks to request Receipts for next. + """ + @spec add_receipts(t(), [[Receipt.t()]]) :: {t(), [EVM.hash()]} | {t(), []} + def add_receipts(queue = %__MODULE__{block_receipts_requested: req}, receipts) + when length(req) != length(receipts) do + :ok = + Logger.warn(fn -> + "[Block Queue] Received Receipts of different length than requested. Cannot match them to blocks. Receipts # [#{ + Enum.count(receipts) + }], Requested # [#{Enum.count(req)}]" + end) + + {queue, req} + end + + def add_receipts( + block_queue = %__MODULE__{ + queue: queue, + block_receipts_set: block_receipts_set, + block_receipts_requested: requested + }, + block_receipts + ) do + number_hash_tuple_receipts = Enum.zip(requested, block_receipts) + + updated_queue = + Enum.reduce(number_hash_tuple_receipts, block_queue, fn {{number, hash}, receipts}, + updated_queue -> + block_map = Map.get(queue, number) + block_item = Map.get(block_map, hash) + block = Map.get(block_item, :block) + updated_block = %{block | receipts: receipts} + + # TODO: Build Trie and verify that Receipts Root matches header.receipts_root + + Map.put( + updated_queue, + number, + Map.put(block_map, hash, %{ + block_item + | receipts_added: true, + block: updated_block + }) + ) + end) + + updated_receipts_set = MapSet.difference(block_receipts_set, MapSet.new(requested)) + + updated_block_queue = %{ + block_queue + | queue: updated_queue, + block_receipts_requested: [], + block_receipts_set: updated_receipts_set + } + + case get_receipts_to_request(updated_block_queue) do + {:ok, hashes, block_queue_to_return} -> + {block_queue_to_return, hashes} + + :do_not_request -> + {updated_block_queue, []} + end + end + @doc """ Processes a the block queue, adding any blocks which are complete and pass the number of confirmations to the block tree. These blocks are then removed diff --git a/apps/ex_wire/lib/ex_wire/sync.ex b/apps/ex_wire/lib/ex_wire/sync.ex index 330c1e514..b85d9be7b 100644 --- a/apps/ex_wire/lib/ex_wire/sync.ex +++ b/apps/ex_wire/lib/ex_wire/sync.ex @@ -28,7 +28,9 @@ defmodule ExWire.Sync do BlockBodies, BlockHeaders, GetBlockBodies, - GetBlockHeaders + GetBlockHeaders, + GetReceipts, + Receipts } alias ExWire.Packet.Capability.Par.{ @@ -128,6 +130,21 @@ defmodule ExWire.Sync do {:processed_block_chunk, chunk_hash, processed_blocks, block}, state = %{warp_queue: warp_queue} ) do + # + # ###### + # ## TODO: REMOVE THIS !!!! + # ###### + # + # if block do + # block_hashes = [ + # Block.hash(block), + # Exth.decode_hex("0x8e9726d8a4fd1ece78e2cac3aa0eb73f22444c99320c24cc879440398304fb60"), + # Exth.decode_hex("0x4a4e2470a6b55b2af27dfdf5562ba1ceafa6f75d92e0859b2db6225184a74f9b") + # ] + # :ok = Logger.debug(fn -> "[Sync] Sending GetReceipts request: #{Exth.encode_hex(Block.hash(block))}, Header: #{Header.to_string(block.header)}" end) + # true = send_with_retry(GetReceipts.new(block_hashes), :random, :request_receipts) + # end + next_state = warp_queue |> WarpQueue.processed_block_chunk(chunk_hash, block, processed_blocks) @@ -204,6 +221,10 @@ defmodule ExWire.Sync do {:noreply, handle_snapshot_data(snapshot_data, peer, state)} end + def handle_info({:packet, %Receipts{} = receipts_data, peer}, state) do + {:noreply, handle_receipts(receipts_data, peer, state)} + end + def handle_info({:packet, packet, peer}, state) do :ok = Exth.trace(fn -> "[Sync] Ignoring packet #{packet.__struct__} from #{peer}" end) @@ -357,6 +378,33 @@ defmodule ExWire.Sync do %{state | warp_queue: next_warp_queue} end + @doc """ + When we receive the Receipts payload that we requested, add them to the BlockQueue, + request new receipts if there are any to be fetched, and return the state with the + updated BlockQueue that accounts for the processed Receipts as well as any newly-requested ones. + """ + @spec handle_receipts(Receipts.t(), Peer.t(), state()) :: state() + def handle_receipts( + %Receipts{receipts: blocks_receipts}, + _peer, + state = %{block_queue: block_queue} + ) do + case BlockQueue.add_receipts(block_queue, blocks_receipts) do + {updated_block_queue, []} -> + :ok = Logger.debug("Processed receipts, no new ones queued to fetch.") + %{state | block_queue: updated_block_queue} + + {updated_block_queue, hashes_to_request} -> + :ok = + Logger.debug(fn -> + "[Sync] Sending GetReceipts request for [#{Enum.count(hashes_to_request)}] receipts." + end) + + _ = send_with_retry(GetReceipts.new(hashes_to_request), :random, :request_receipts) + %{state | block_queue: updated_block_queue} + end + end + @doc """ When we get block headers from peers, we add them to our current block queue to incorporate the blocks into our state chain. @@ -386,7 +434,7 @@ defmodule ExWire.Sync do :ok = maybe_request_next_block(block_queue) state else - {next_highest_block_number, next_block_queue, next_block_tree, next_trie, header_hashes} = + {next_highest_block_number, updated_block_queue, next_block_tree, next_trie, header_hashes} = Enum.reduce( block_headers.headers, {highest_block_number, block_queue, block_tree, trie, []}, @@ -420,12 +468,28 @@ defmodule ExWire.Sync do end ) - :ok = - PeerSupervisor.send_packet( + next_block_queue = + case BlockQueue.get_receipts_to_request(updated_block_queue) do + {:ok, block_hashes, modified_queue} -> + :ok = + Logger.debug(fn -> + "[Sync] Sending GetReceipts request for [#{Enum.count(block_hashes)}] receipts." + end) + + _ = send_with_retry(GetReceipts.new(block_hashes), :random, :request_receipts) + modified_queue + + :do_not_request -> + updated_block_queue + end + + _ = + send_with_retry( %GetBlockBodies{ hashes: header_hashes }, - :random + :random, + :request_block_bodies ) next_maybe_saved_trie = maybe_save(block_tree, next_block_tree, next_trie) @@ -586,7 +650,11 @@ defmodule ExWire.Sync do @spec send_with_retry( Packet.packet(), PeerSupervisor.node_selector(), - :request_manifest | :request_next_block | {:request_chunk, EVM.hash()} + :request_manifest + | :request_next_block + | :request_block_bodies + | :request_receipts + | {:request_chunk, EVM.hash()} ) :: boolean() defp send_with_retry(packet, node_selector, retry_message) do send_packet_result = From cf22dd33b4340ba64a9de3d76b96562fefcae323 Mon Sep 17 00:00:00 2001 From: Will Meister Date: Thu, 3 Jan 2019 14:38:16 -0600 Subject: [PATCH 2/5] Adding fast sync block processing logic --- apps/blockchain/lib/blockchain/blocktree.ex | 18 ++++-- .../ex_wire/lib/ex_wire/struct/block_queue.ex | 46 ++++++++++++--- apps/ex_wire/lib/ex_wire/sync.ex | 57 ++++++++++--------- 3 files changed, 78 insertions(+), 43 deletions(-) diff --git a/apps/blockchain/lib/blockchain/blocktree.ex b/apps/blockchain/lib/blockchain/blocktree.ex index 498787fd0..1c01afbed 100644 --- a/apps/blockchain/lib/blockchain/blocktree.ex +++ b/apps/blockchain/lib/blockchain/blocktree.ex @@ -55,15 +55,21 @@ defmodule Blockchain.Blocktree do else: {:valid, trie} with {:valid, trie} <- validation do - {:ok, {block_hash, updated_trie}} = Block.put_block(block, trie, specified_block_hash) + add_block_without_validation(blocktree, block, trie, specified_block_hash) + end + end - # Cache computed block hash - block = %{block | block_hash: block_hash} + @spec add_block_without_validation(t, Block.t(), TrieStorage.t(), EVM.hash() | nil) :: + {:ok, {t, TrieStorage.t(), EVM.hash()}} + def add_block_without_validation(blocktree, block, trie, specified_block_hash \\ nil) do + {:ok, {block_hash, updated_trie}} = Block.put_block(block, trie, specified_block_hash) - updated_blocktree = update_best_block(blocktree, block) + # Cache computed block hash + block = %{block | block_hash: block_hash} - {:ok, {updated_blocktree, updated_trie, block_hash}} - end + updated_blocktree = update_best_block(blocktree, block) + + {:ok, {updated_blocktree, updated_trie, block_hash}} end @spec update_best_block(t, Block.t()) :: t diff --git a/apps/ex_wire/lib/ex_wire/struct/block_queue.ex b/apps/ex_wire/lib/ex_wire/struct/block_queue.ex index 3e7494892..03d902960 100644 --- a/apps/ex_wire/lib/ex_wire/struct/block_queue.ex +++ b/apps/ex_wire/lib/ex_wire/struct/block_queue.ex @@ -144,14 +144,10 @@ defmodule ExWire.Struct.BlockQueue do block_receipts_to_request: receipts_to_request } - if fast_sync_in_progress do - {updated_block_queue, block_tree, trie, should_request_body} - else - {new_block_queue, new_block_tree, new_trie} = - process_block_queue(updated_block_queue, block_tree, chain, trie) + {new_block_queue, new_block_tree, new_trie} = + process_block_queue(updated_block_queue, block_tree, chain, trie) - {new_block_queue, new_block_tree, new_trie, should_request_body} - end + {new_block_queue, new_block_tree, new_trie, should_request_body} end @doc """ @@ -334,6 +330,36 @@ defmodule ExWire.Struct.BlockQueue do defp do_process_blocks([], block_queue, block_tree, _chain, trie), do: {block_queue, block_tree, trie} + defp do_process_blocks( + [block | rest], + block_queue = %__MODULE__{fast_sync_in_progress: true}, + block_tree, + chain, + trie + ) do + {:ok, {updated_blocktree, updated_trie, block_hash}} = + Blocktree.add_block_without_validation(block_tree, block, trie) + + :ok = + Logger.debug(fn -> + "[Block Queue] Added block #{block.header.number} (0x#{ + Base.encode16(block_hash, case: :lower) + }) to new block tree without validation during fast sync." + end) + + {backlogged_blocks, new_backlog} = Map.pop(block_queue.backlog, block_hash, []) + + new_block_queue = %{block_queue | backlog: new_backlog} + + do_process_blocks( + backlogged_blocks ++ rest, + new_block_queue, + updated_blocktree, + chain, + updated_trie + ) + end + defp do_process_blocks([block | rest], block_queue, block_tree, chain, trie) do {new_block_tree, new_trie, new_backlog, extra_blocks} = case Blocktree.verify_and_add_block( @@ -464,12 +490,14 @@ defmodule ExWire.Struct.BlockQueue do } """ @spec get_complete_blocks(t) :: {t, [Block.t()]} - def get_complete_blocks(block_queue = %__MODULE__{queue: queue}) do + def get_complete_blocks( + block_queue = %__MODULE__{queue: queue, fast_sync_in_progress: fast_syncing} + ) do {queue, blocks} = Enum.reduce(queue, {queue, []}, fn {number, block_map}, {queue, blocks} -> {final_block_map, new_blocks} = Enum.reduce(block_map, {block_map, []}, fn {hash, block_item}, {block_map, blocks} -> - if block_item.ready and + if block_item.ready and (not fast_syncing or block_item.receipts_added) and MapSet.size(block_item.commitments) >= ExWire.Config.commitment_count() do {Map.delete(block_map, hash), [block_item.block | blocks]} else diff --git a/apps/ex_wire/lib/ex_wire/sync.ex b/apps/ex_wire/lib/ex_wire/sync.ex index b85d9be7b..73fc1daba 100644 --- a/apps/ex_wire/lib/ex_wire/sync.ex +++ b/apps/ex_wire/lib/ex_wire/sync.ex @@ -130,21 +130,6 @@ defmodule ExWire.Sync do {:processed_block_chunk, chunk_hash, processed_blocks, block}, state = %{warp_queue: warp_queue} ) do - # - # ###### - # ## TODO: REMOVE THIS !!!! - # ###### - # - # if block do - # block_hashes = [ - # Block.hash(block), - # Exth.decode_hex("0x8e9726d8a4fd1ece78e2cac3aa0eb73f22444c99320c24cc879440398304fb60"), - # Exth.decode_hex("0x4a4e2470a6b55b2af27dfdf5562ba1ceafa6f75d92e0859b2db6225184a74f9b") - # ] - # :ok = Logger.debug(fn -> "[Sync] Sending GetReceipts request: #{Exth.encode_hex(Block.hash(block))}, Header: #{Header.to_string(block.header)}" end) - # true = send_with_retry(GetReceipts.new(block_hashes), :random, :request_receipts) - # end - next_state = warp_queue |> WarpQueue.processed_block_chunk(chunk_hash, block, processed_blocks) @@ -387,22 +372,38 @@ defmodule ExWire.Sync do def handle_receipts( %Receipts{receipts: blocks_receipts}, _peer, - state = %{block_queue: block_queue} + state = %{ + trie: trie, + chain: chain, + block_tree: block_tree, + block_queue: block_queue + } ) do - case BlockQueue.add_receipts(block_queue, blocks_receipts) do - {updated_block_queue, []} -> - :ok = Logger.debug("Processed receipts, no new ones queued to fetch.") - %{state | block_queue: updated_block_queue} + updated_block_queue = + case BlockQueue.add_receipts(block_queue, blocks_receipts) do + {updated_block_queue, []} -> + :ok = Logger.debug("Processed receipts, no new ones queued to fetch.") + updated_block_queue + + {updated_block_queue, hashes_to_request} -> + :ok = + Logger.debug(fn -> + "[Sync] Sending GetReceipts request for [#{Enum.count(hashes_to_request)}] receipts." + end) + + _ = send_with_retry(GetReceipts.new(hashes_to_request), :random, :request_receipts) + updated_block_queue + end - {updated_block_queue, hashes_to_request} -> - :ok = - Logger.debug(fn -> - "[Sync] Sending GetReceipts request for [#{Enum.count(hashes_to_request)}] receipts." - end) + {final_queue, final_tree, final_trie} = + BlockQueue.process_block_queue(updated_block_queue, block_tree, chain, trie) - _ = send_with_retry(GetReceipts.new(hashes_to_request), :random, :request_receipts) - %{state | block_queue: updated_block_queue} - end + %{ + state + | trie: final_trie, + block_tree: final_tree, + block_queue: final_queue + } end @doc """ From 71136c9b2c85cb74d4cd80e08e8973e9fcfae1a0 Mon Sep 17 00:00:00 2001 From: Will Meister Date: Fri, 4 Jan 2019 15:31:00 -0600 Subject: [PATCH 3/5] Fixing a few bugs in BlockQueue --- .../lib/ex_wire/packet/capability/eth/receipts.ex | 2 -- apps/ex_wire/lib/ex_wire/struct/block_queue.ex | 11 ++++++----- 2 files changed, 6 insertions(+), 7 deletions(-) diff --git a/apps/ex_wire/lib/ex_wire/packet/capability/eth/receipts.ex b/apps/ex_wire/lib/ex_wire/packet/capability/eth/receipts.ex index b61d4f789..8f8a84852 100644 --- a/apps/ex_wire/lib/ex_wire/packet/capability/eth/receipts.ex +++ b/apps/ex_wire/lib/ex_wire/packet/capability/eth/receipts.ex @@ -67,8 +67,6 @@ defmodule ExWire.Packet.Capability.Eth.Receipts do end end - :ok = Logger.info("[RECEIPTS] Decoded Receipt!!!!!!!!!!") - new(block_receipts) end diff --git a/apps/ex_wire/lib/ex_wire/struct/block_queue.ex b/apps/ex_wire/lib/ex_wire/struct/block_queue.ex index 03d902960..9ab779b2d 100644 --- a/apps/ex_wire/lib/ex_wire/struct/block_queue.ex +++ b/apps/ex_wire/lib/ex_wire/struct/block_queue.ex @@ -32,7 +32,7 @@ defmodule ExWire.Struct.BlockQueue do backlog: %{}, do_validation: true, block_numbers: MapSet.new(), - fast_sync_in_progress: false, + fast_sync_in_progress: true, block_receipts_set: MapSet.new(), block_receipts_to_request: [], block_receipts_requested: [] @@ -102,7 +102,7 @@ defmodule ExWire.Struct.BlockQueue do Map.put(block_map, header_hash, %{ commitments: MapSet.new([remote_id]), block: %Block{header: header}, - receits_added: false, + receipts_added: false, ready: is_empty }) @@ -110,7 +110,7 @@ defmodule ExWire.Struct.BlockQueue do if fast_sync_in_progress do { MapSet.put(block_receipts_set, header_num_and_hash), - [block_queue.block_receipts_to_request | header_num_and_hash] + block_queue.block_receipts_to_request ++ [header_num_and_hash] } else {block_receipts_set, block_receipts_to_request} @@ -124,7 +124,7 @@ defmodule ExWire.Struct.BlockQueue do not MapSet.member?(block_receipts_set, header_num_and_hash) do { MapSet.put(block_receipts_set, header_num_and_hash), - [block_queue.block_receipts_to_request | header_num_and_hash] + block_queue.block_receipts_to_request ++ [header_num_and_hash] } else {block_receipts_set, block_receipts_to_request} @@ -219,6 +219,7 @@ defmodule ExWire.Struct.BlockQueue do } ) do if is_fast and Enum.empty?(requested) and not Enum.empty?(to_request) do + {new_requests, to_request_tail} = Enum.split(to_request, @max_receipts_to_request) { @@ -251,7 +252,7 @@ defmodule ExWire.Struct.BlockQueue do }], Requested # [#{Enum.count(req)}]" end) - {queue, req} + {queue, req |> Enum.map(fn {_number, hash} -> hash end)} end def add_receipts( From 42c1eff8c6ca2036c504978f960c656bb1de2116 Mon Sep 17 00:00:00 2001 From: Will Meister Date: Fri, 4 Jan 2019 17:06:41 -0600 Subject: [PATCH 4/5] Fixing more bugs in fast sync -- Still need to figure out end case --- .../ex_wire/lib/ex_wire/struct/block_queue.ex | 17 +++++++------- apps/ex_wire/lib/ex_wire/sync.ex | 22 +++++++++++-------- 2 files changed, 21 insertions(+), 18 deletions(-) diff --git a/apps/ex_wire/lib/ex_wire/struct/block_queue.ex b/apps/ex_wire/lib/ex_wire/struct/block_queue.ex index 9ab779b2d..b8edc930c 100644 --- a/apps/ex_wire/lib/ex_wire/struct/block_queue.ex +++ b/apps/ex_wire/lib/ex_wire/struct/block_queue.ex @@ -102,12 +102,12 @@ defmodule ExWire.Struct.BlockQueue do Map.put(block_map, header_hash, %{ commitments: MapSet.new([remote_id]), block: %Block{header: header}, - receipts_added: false, + receipts_added: is_empty, ready: is_empty }) {receipts_set, receipts_to_request} = - if fast_sync_in_progress do + if fast_sync_in_progress and not is_empty do { MapSet.put(block_receipts_set, header_num_and_hash), block_queue.block_receipts_to_request ++ [header_num_and_hash] @@ -120,7 +120,8 @@ defmodule ExWire.Struct.BlockQueue do block_item -> {receipts_set, receipts_to_request} = - if fast_sync_in_progress and Enum.empty?(block_item.block.receipts) and + if fast_sync_in_progress and header.number != 0 and + Enum.empty?(block_item.block.receipts) and not MapSet.member?(block_receipts_set, header_num_and_hash) do { MapSet.put(block_receipts_set, header_num_and_hash), @@ -219,7 +220,6 @@ defmodule ExWire.Struct.BlockQueue do } ) do if is_fast and Enum.empty?(requested) and not Enum.empty?(to_request) do - {new_requests, to_request_tail} = Enum.split(to_request, @max_receipts_to_request) { @@ -232,7 +232,6 @@ defmodule ExWire.Struct.BlockQueue do } } else - # TODO: check if we're done with Fast Sync and update BlockQueue.fast_sync_in_progress :do_not_request end end @@ -243,7 +242,7 @@ defmodule ExWire.Struct.BlockQueue do This will return the updated BlockQueue and the hashes of the blocks to request Receipts for next. """ @spec add_receipts(t(), [[Receipt.t()]]) :: {t(), [EVM.hash()]} | {t(), []} - def add_receipts(queue = %__MODULE__{block_receipts_requested: req}, receipts) + def add_receipts(block_queue = %__MODULE__{block_receipts_requested: req}, receipts) when length(req) != length(receipts) do :ok = Logger.warn(fn -> @@ -252,7 +251,7 @@ defmodule ExWire.Struct.BlockQueue do }], Requested # [#{Enum.count(req)}]" end) - {queue, req |> Enum.map(fn {_number, hash} -> hash end)} + {block_queue, req |> Enum.map(fn {_number, hash} -> hash end)} end def add_receipts( @@ -266,8 +265,8 @@ defmodule ExWire.Struct.BlockQueue do number_hash_tuple_receipts = Enum.zip(requested, block_receipts) updated_queue = - Enum.reduce(number_hash_tuple_receipts, block_queue, fn {{number, hash}, receipts}, - updated_queue -> + Enum.reduce(number_hash_tuple_receipts, queue, fn {{number, hash}, receipts}, + updated_queue -> block_map = Map.get(queue, number) block_item = Map.get(block_map, hash) block = Map.get(block_item, :block) diff --git a/apps/ex_wire/lib/ex_wire/sync.ex b/apps/ex_wire/lib/ex_wire/sync.ex index 73fc1daba..5fa1e8a1a 100644 --- a/apps/ex_wire/lib/ex_wire/sync.ex +++ b/apps/ex_wire/lib/ex_wire/sync.ex @@ -398,6 +398,8 @@ defmodule ExWire.Sync do {final_queue, final_tree, final_trie} = BlockQueue.process_block_queue(updated_block_queue, block_tree, chain, trie) + :ok = maybe_request_next_block(final_queue) + %{ state | trie: final_trie, @@ -469,6 +471,17 @@ defmodule ExWire.Sync do end ) + if not Enum.empty?(header_hashes) do + _ = + send_with_retry( + %GetBlockBodies{ + hashes: header_hashes + }, + :random, + :request_block_bodies + ) + end + next_block_queue = case BlockQueue.get_receipts_to_request(updated_block_queue) do {:ok, block_hashes, modified_queue} -> @@ -484,15 +497,6 @@ defmodule ExWire.Sync do updated_block_queue end - _ = - send_with_retry( - %GetBlockBodies{ - hashes: header_hashes - }, - :random, - :request_block_bodies - ) - next_maybe_saved_trie = maybe_save(block_tree, next_block_tree, next_trie) :ok = maybe_request_next_block(next_block_queue) From 75b9d7735d4298592fc80385d231f2521248a2f1 Mon Sep 17 00:00:00 2001 From: Will Meister Date: Sat, 5 Jan 2019 11:56:45 -0600 Subject: [PATCH 5/5] Fixing spec in Receipts.new(...), defaulting block_queue to fast_sync_in_progress = false --- apps/ex_wire/lib/ex_wire/packet/capability/eth/receipts.ex | 2 +- apps/ex_wire/lib/ex_wire/struct/block_queue.ex | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/apps/ex_wire/lib/ex_wire/packet/capability/eth/receipts.ex b/apps/ex_wire/lib/ex_wire/packet/capability/eth/receipts.ex index 8f8a84852..59e617426 100644 --- a/apps/ex_wire/lib/ex_wire/packet/capability/eth/receipts.ex +++ b/apps/ex_wire/lib/ex_wire/packet/capability/eth/receipts.ex @@ -23,7 +23,7 @@ defmodule ExWire.Packet.Capability.Eth.Receipts do :receipts ] - @spec new([Receipt.t()]) :: t() + @spec new([[Receipt.t()]]) :: t() def new(receipts) do %__MODULE__{ receipts: receipts diff --git a/apps/ex_wire/lib/ex_wire/struct/block_queue.ex b/apps/ex_wire/lib/ex_wire/struct/block_queue.ex index b8edc930c..ebc2efe44 100644 --- a/apps/ex_wire/lib/ex_wire/struct/block_queue.ex +++ b/apps/ex_wire/lib/ex_wire/struct/block_queue.ex @@ -32,7 +32,7 @@ defmodule ExWire.Struct.BlockQueue do backlog: %{}, do_validation: true, block_numbers: MapSet.new(), - fast_sync_in_progress: true, + fast_sync_in_progress: false, block_receipts_set: MapSet.new(), block_receipts_to_request: [], block_receipts_requested: []