Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP] Fast Sync #752

Open
wants to merge 5 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 12 additions & 6 deletions apps/blockchain/lib/blockchain/blocktree.ex
Original file line number Diff line number Diff line change
Expand Up @@ -55,15 +55,21 @@ defmodule Blockchain.Blocktree do
else: {:valid, trie}

with {:valid, trie} <- validation do
{:ok, {block_hash, updated_trie}} = Block.put_block(block, trie, specified_block_hash)
add_block_without_validation(blocktree, block, trie, specified_block_hash)
end
end

# Cache computed block hash
block = %{block | block_hash: block_hash}
@spec add_block_without_validation(t, Block.t(), TrieStorage.t(), EVM.hash() | nil) ::
{:ok, {t, TrieStorage.t(), EVM.hash()}}
def add_block_without_validation(blocktree, block, trie, specified_block_hash \\ nil) do
{:ok, {block_hash, updated_trie}} = Block.put_block(block, trie, specified_block_hash)

updated_blocktree = update_best_block(blocktree, block)
# Cache computed block hash
block = %{block | block_hash: block_hash}

{:ok, {updated_blocktree, updated_trie, block_hash}}
end
updated_blocktree = update_best_block(blocktree, block)

{:ok, {updated_blocktree, updated_trie, block_hash}}
end

@spec update_best_block(t, Block.t()) :: t
Expand Down
20 changes: 20 additions & 0 deletions apps/evm/lib/evm/log_entry.ex
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,26 @@ defmodule EVM.LogEntry do
}
end

@spec serialize(t) :: ExRLP.t()
def serialize(log) do
[
log.address,
log.topics,
log.data
]
end

@spec deserialize(ExRLP.t()) :: t
def deserialize(rlp) do
[
address,
topics,
data
] = rlp

new(address, topics, data)
end

@doc """
Converts log struct to standard Ethereum list representation.

Expand Down
20 changes: 15 additions & 5 deletions apps/ex_wire/lib/ex_wire/packet/capability/eth/receipts.ex
Original file line number Diff line number Diff line change
Expand Up @@ -11,18 +11,19 @@ defmodule ExWire.Packet.Capability.Eth.Receipts do
"""

require Logger
alias Blockchain.Transaction.Receipt

@behaviour ExWire.Packet

@type t :: %__MODULE__{
receipts: [any()]
receipts: [[Receipt.t()]]
}

defstruct [
:receipts
]

@spec new([any()]) :: t()
@spec new([[Receipt.t()]]) :: t()
def new(receipts) do
%__MODULE__{
receipts: receipts
Expand All @@ -45,7 +46,11 @@ defmodule ExWire.Packet.Capability.Eth.Receipts do
@impl true
@spec serialize(t) :: ExRLP.t()
def serialize(packet = %__MODULE__{}) do
for receipt <- packet.receipts, do: receipt
for receipts <- packet.receipts do
for receipt <- receipts do
Receipt.serialize(receipt)
end
end
end

@doc """
Expand All @@ -55,9 +60,14 @@ defmodule ExWire.Packet.Capability.Eth.Receipts do
@impl true
@spec deserialize(ExRLP.t()) :: t
def deserialize(rlp) do
receipts = for receipt <- rlp, do: receipt
block_receipts =
for receipts <- rlp do
for receipt <- receipts do
Receipt.deserialize(receipt)
end
end

new(receipts)
new(block_receipts)
end

@doc """
Expand Down
189 changes: 180 additions & 9 deletions apps/ex_wire/lib/ex_wire/struct/block_queue.ex
Original file line number Diff line number Diff line change
Expand Up @@ -18,22 +18,29 @@ defmodule ExWire.Struct.BlockQueue do
alias Block.Header
alias ExWire.Struct.Block, as: BlockStruct
alias Blockchain.{Block, Blocktree, Chain}
alias Blockchain.Transaction.Receipt
alias MerklePatriciaTree.Trie

require Logger

@max_receipts_to_request 500
# These will be used to help us determine if a block is empty
@empty_trie MerklePatriciaTree.Trie.empty_trie_root_hash()
@empty_hash [] |> ExRLP.encode() |> ExthCrypto.Hash.Keccak.kec()

defstruct queue: %{},
backlog: %{},
do_validation: true,
block_numbers: MapSet.new()
block_numbers: MapSet.new(),
fast_sync_in_progress: false,
block_receipts_set: MapSet.new(),
block_receipts_to_request: [],
block_receipts_requested: []

@type block_item :: %{
commitments: list(binary()),
block: Block.t(),
receipts_added: boolean(),
ready: boolean()
}

Expand All @@ -45,7 +52,11 @@ defmodule ExWire.Struct.BlockQueue do
queue: %{integer() => block_map},
backlog: %{EVM.hash() => list(Block.t())},
do_validation: boolean(),
block_numbers: MapSet.t()
block_numbers: MapSet.t(),
fast_sync_in_progress: boolean(),
block_receipts_set: MapSet.t(),
block_receipts_to_request: [EVM.hash()],
block_receipts_requested: [EVM.hash()]
}

@doc """
Expand All @@ -65,7 +76,12 @@ defmodule ExWire.Struct.BlockQueue do
Trie.t()
) :: {t, Blocktree.t(), Trie.t(), boolean()}
def add_header(
block_queue = %__MODULE__{queue: queue},
block_queue = %__MODULE__{
queue: queue,
fast_sync_in_progress: fast_sync_in_progress,
block_receipts_set: block_receipts_set,
block_receipts_to_request: block_receipts_to_request
},
block_tree,
header,
header_hash,
Expand All @@ -74,8 +90,9 @@ defmodule ExWire.Struct.BlockQueue do
trie
) do
block_map = Map.get(queue, header.number, %{})
header_num_and_hash = {header.number, header_hash}

{block_map, should_request_body} =
{block_map, should_request_body, receipts_to_request, receipts_set} =
case Map.get(block_map, header_hash) do
nil ->
# may already be ready, already.
Expand All @@ -85,22 +102,47 @@ defmodule ExWire.Struct.BlockQueue do
Map.put(block_map, header_hash, %{
commitments: MapSet.new([remote_id]),
block: %Block{header: header},
receipts_added: is_empty,
ready: is_empty
})

{block_map, not is_empty}
{receipts_set, receipts_to_request} =
if fast_sync_in_progress and not is_empty do
{
MapSet.put(block_receipts_set, header_num_and_hash),
block_queue.block_receipts_to_request ++ [header_num_and_hash]
}
else
{block_receipts_set, block_receipts_to_request}
end

{block_map, not is_empty, receipts_to_request, receipts_set}

block_item ->
{receipts_set, receipts_to_request} =
if fast_sync_in_progress and header.number != 0 and
Enum.empty?(block_item.block.receipts) and
not MapSet.member?(block_receipts_set, header_num_and_hash) do
{
MapSet.put(block_receipts_set, header_num_and_hash),
block_queue.block_receipts_to_request ++ [header_num_and_hash]
}
else
{block_receipts_set, block_receipts_to_request}
end

{Map.put(block_map, header_hash, %{
block_item
| commitments: MapSet.put(block_item.commitments, remote_id)
}), false}
}), false, receipts_to_request, receipts_set}
end

updated_block_queue = %{
block_queue
| queue: Map.put(queue, header.number, block_map),
block_numbers: MapSet.put(block_queue.block_numbers, header.number)
block_numbers: MapSet.put(block_queue.block_numbers, header.number),
block_receipts_set: receipts_set,
block_receipts_to_request: receipts_to_request
}

{new_block_queue, new_block_tree, new_trie} =
Expand Down Expand Up @@ -164,6 +206,103 @@ defmodule ExWire.Struct.BlockQueue do
process_block_queue(updated_block_queue, block_tree, chain, trie)
end

@doc """
Returns the collection of block hashes for which Receipts are needed, as well as the
updated BlockQueue accounting for the requested hashes, if fast sync is in progress and
a request is not already in flight.
"""
@spec get_receipts_to_request(t()) :: {:ok, [EVM.hash()], t()} | :do_not_request
def get_receipts_to_request(
block_queue = %__MODULE__{
fast_sync_in_progress: is_fast,
block_receipts_to_request: to_request,
block_receipts_requested: requested
}
) do
if is_fast and Enum.empty?(requested) and not Enum.empty?(to_request) do
{new_requests, to_request_tail} = Enum.split(to_request, @max_receipts_to_request)

{
:ok,
new_requests |> Enum.map(fn {_number, hash} -> hash end),
%{
block_queue
| block_receipts_to_request: to_request_tail,
block_receipts_requested: new_requests
}
}
else
:do_not_request
end
end

@doc """
Processes the provided Receipts, verifying them against stored Headers and adding
them to the Blocks stored in the BlockQueue.
This will return the updated BlockQueue and the hashes of the blocks to request Receipts for next.
"""
@spec add_receipts(t(), [[Receipt.t()]]) :: {t(), [EVM.hash()]} | {t(), []}
def add_receipts(block_queue = %__MODULE__{block_receipts_requested: req}, receipts)
when length(req) != length(receipts) do
:ok =
Logger.warn(fn ->
"[Block Queue] Received Receipts of different length than requested. Cannot match them to blocks. Receipts # [#{
Enum.count(receipts)
}], Requested # [#{Enum.count(req)}]"
end)

{block_queue, req |> Enum.map(fn {_number, hash} -> hash end)}
end

def add_receipts(
block_queue = %__MODULE__{
queue: queue,
block_receipts_set: block_receipts_set,
block_receipts_requested: requested
},
block_receipts
) do
number_hash_tuple_receipts = Enum.zip(requested, block_receipts)

updated_queue =
Enum.reduce(number_hash_tuple_receipts, queue, fn {{number, hash}, receipts},
updated_queue ->
block_map = Map.get(queue, number)
block_item = Map.get(block_map, hash)
block = Map.get(block_item, :block)
updated_block = %{block | receipts: receipts}

# TODO: Build Trie and verify that Receipts Root matches header.receipts_root

Map.put(
updated_queue,
number,
Map.put(block_map, hash, %{
block_item
| receipts_added: true,
block: updated_block
})
)
end)

updated_receipts_set = MapSet.difference(block_receipts_set, MapSet.new(requested))

updated_block_queue = %{
block_queue
| queue: updated_queue,
block_receipts_requested: [],
block_receipts_set: updated_receipts_set
}

case get_receipts_to_request(updated_block_queue) do
{:ok, hashes, block_queue_to_return} ->
{block_queue_to_return, hashes}

:do_not_request ->
{updated_block_queue, []}
end
end

@doc """
Processes a the block queue, adding any blocks which are complete and pass
the number of confirmations to the block tree. These blocks are then removed
Expand Down Expand Up @@ -191,6 +330,36 @@ defmodule ExWire.Struct.BlockQueue do
defp do_process_blocks([], block_queue, block_tree, _chain, trie),
do: {block_queue, block_tree, trie}

defp do_process_blocks(
[block | rest],
block_queue = %__MODULE__{fast_sync_in_progress: true},
block_tree,
chain,
trie
) do
{:ok, {updated_blocktree, updated_trie, block_hash}} =
Blocktree.add_block_without_validation(block_tree, block, trie)

:ok =
Logger.debug(fn ->
"[Block Queue] Added block #{block.header.number} (0x#{
Base.encode16(block_hash, case: :lower)
}) to new block tree without validation during fast sync."
end)

{backlogged_blocks, new_backlog} = Map.pop(block_queue.backlog, block_hash, [])

new_block_queue = %{block_queue | backlog: new_backlog}

do_process_blocks(
backlogged_blocks ++ rest,
new_block_queue,
updated_blocktree,
chain,
updated_trie
)
end

defp do_process_blocks([block | rest], block_queue, block_tree, chain, trie) do
{new_block_tree, new_trie, new_backlog, extra_blocks} =
case Blocktree.verify_and_add_block(
Expand Down Expand Up @@ -321,12 +490,14 @@ defmodule ExWire.Struct.BlockQueue do
}
"""
@spec get_complete_blocks(t) :: {t, [Block.t()]}
def get_complete_blocks(block_queue = %__MODULE__{queue: queue}) do
def get_complete_blocks(
block_queue = %__MODULE__{queue: queue, fast_sync_in_progress: fast_syncing}
) do
{queue, blocks} =
Enum.reduce(queue, {queue, []}, fn {number, block_map}, {queue, blocks} ->
{final_block_map, new_blocks} =
Enum.reduce(block_map, {block_map, []}, fn {hash, block_item}, {block_map, blocks} ->
if block_item.ready and
if block_item.ready and (not fast_syncing or block_item.receipts_added) and
MapSet.size(block_item.commitments) >= ExWire.Config.commitment_count() do
{Map.delete(block_map, hash), [block_item.block | blocks]}
else
Expand Down
Loading