Skip to content

Commit

Permalink
make preimage storage provider use memoized supplier
Browse files Browse the repository at this point in the history
Signed-off-by: garyschulte <garyschulte@gmail.com>
  • Loading branch information
garyschulte committed Nov 12, 2024
1 parent 429a00f commit 476fdcf
Show file tree
Hide file tree
Showing 14 changed files with 88 additions and 100 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,10 @@
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;

import com.google.common.base.Suppliers;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -47,17 +49,17 @@ public class KeyValueStorageProvider implements StorageProvider {

protected final Function<List<SegmentIdentifier>, SegmentedKeyValueStorage>
segmentedStorageCreator;
private final KeyValueStorage worldStatePreimageStorage;
private final Supplier<KeyValueStorage> preimageStorageSupplier;
protected final Map<List<SegmentIdentifier>, SegmentedKeyValueStorage> storageInstances =
new HashMap<>();
private final ObservableMetricsSystem metricsSystem;

public KeyValueStorageProvider(
final Function<List<SegmentIdentifier>, SegmentedKeyValueStorage> segmentedStorageCreator,
final KeyValueStorage worldStatePreimageStorage,
final Supplier<KeyValueStorage> worldStatePreimageStorage,
final ObservableMetricsSystem metricsSystem) {
this.segmentedStorageCreator = segmentedStorageCreator;
this.worldStatePreimageStorage = worldStatePreimageStorage;
this.preimageStorageSupplier = Suppliers.memoize(worldStatePreimageStorage::get);
this.metricsSystem = metricsSystem;
}

Expand Down Expand Up @@ -99,7 +101,7 @@ public WorldStateStorageCoordinator createWorldStateStorageCoordinator(

@Override
public WorldStatePreimageStorage createWorldStatePreimageStorage() {
return new WorldStatePreimageKeyValueStorage(worldStatePreimageStorage);
return new WorldStatePreimageKeyValueStorage(preimageStorageSupplier.get());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,7 @@
import org.hyperledger.besu.plugin.services.storage.KeyValueStorageFactory;
import org.hyperledger.besu.services.kvstore.LimitedInMemoryKeyValueStorage;

import java.util.ArrayList;
import java.util.EnumSet;
import java.util.function.Supplier;

public class KeyValueStorageProviderBuilder {

Expand Down Expand Up @@ -58,22 +57,19 @@ public KeyValueStorageProvider build() {
"Cannot build a storage provider without the plugin common configuration.");
checkNotNull(metricsSystem, "Cannot build a storage provider without a metrics system.");

// TODO: unhack this storage pre-init hack, maybe a memoized supplier
storageFactory.create(
new ArrayList<>(EnumSet.allOf(KeyValueSegmentIdentifier.class)),
commonConfiguration,
metricsSystem);

final KeyValueStorage worldStatePreImageStorage =
final Supplier<KeyValueStorage> preimageStorageSupplier =
commonConfiguration.getDataStorageConfiguration().getHashPreImageStorageEnabled()
? storageFactory.create(
KeyValueSegmentIdentifier.HASH_PREIMAGE_STORE, commonConfiguration, metricsSystem)
: new LimitedInMemoryKeyValueStorage(DEFAULT_WORLD_STATE_PRE_IMAGE_CACHE_SIZE);
? () ->
storageFactory.create(
KeyValueSegmentIdentifier.HASH_PREIMAGE_STORE,
commonConfiguration,
metricsSystem)
: () -> new LimitedInMemoryKeyValueStorage(DEFAULT_WORLD_STATE_PRE_IMAGE_CACHE_SIZE);
;

return new KeyValueStorageProvider(
segments -> storageFactory.create(segments, commonConfiguration, metricsSystem),
worldStatePreImageStorage,
preimageStorageSupplier,
(ObservableMetricsSystem) metricsSystem);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -49,11 +49,6 @@ public Optional<Address> getAccountTrieKeyPreimage(final Bytes32 trieKey) {
.map(val -> Address.wrap(Bytes.wrap(val)));
}

@Override
public boolean canSupportStreaming() {
return keyValueStorage.isPersistent();
}

@Override
public Updater updater() {
return new Updater(keyValueStorage.startTransaction());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ private static MutableWorldState createGenesisBonsaiWorldState() {
new BonsaiWorldStateKeyValueStorage(
new KeyValueStorageProvider(
segmentIdentifiers -> new SegmentedInMemoryKeyValueStorage(),
new InMemoryKeyValueStorage(),
() -> new InMemoryKeyValueStorage(),
new NoOpMetricsSystem()),
new NoOpMetricsSystem(),
DataStorageConfiguration.DEFAULT_BONSAI_CONFIG);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -432,11 +432,6 @@ public Optional<UInt256> getStorageValueByStorageSlotKey(
.map(UInt256::fromBytes);
}

@Override
public Stream<StreamableAccount> streamAccounts(final Bytes32 startKeyHash, final int limit) {
return worldStateKeyValueStorage.streamAccounts(this, startKeyHash, limit);
}

@Override
public UInt256 getPriorStorageValue(final Address address, final UInt256 storageKey) {
return getStorageValue(address, storageKey);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,7 @@ public boolean isWorldStateAvailable(final Bytes32 rootHash, final Hash blockHas

public NavigableMap<Bytes32, AccountStorageEntry> storageEntriesFrom(
final Hash addressHash, final Bytes32 startKeyHash, final int limit) {
if (preImageProxy != null && preImageProxy.canSupportStreaming()) {
if (preImageProxy != null) {
return streamFlatStorages(addressHash, startKeyHash, UInt256.MAX_VALUE, limit)
.entrySet()
// map back to slot keys using preImage provider:
Expand All @@ -201,28 +201,23 @@ public NavigableMap<Bytes32, AccountStorageEntry> storageEntriesFrom(

public Stream<WorldState.StreamableAccount> streamAccounts(
final DiffBasedWorldView context, final Bytes32 startKeyHash, final int limit) {
if (preImageProxy.canSupportStreaming()) {
return streamFlatAccounts(startKeyHash, UInt256.MAX_VALUE, limit)
.entrySet()
// map back to addresses using preImage provider:
.stream()
.map(
entry ->
preImageProxy
.getAccountTrieKeyPreimage(entry.getKey())
.map(
address ->
new WorldState.StreamableAccount(
Optional.of(address),
BonsaiAccount.fromRLP(
context, address, entry.getValue(), false))))
.filter(Optional::isPresent)
.map(Optional::get)
.filter(acct -> context.updater().getAccount(acct.getAddress().orElse(null)) != null)
.sorted(Comparator.comparing(account -> account.getAddress().orElse(Address.ZERO)));
} else {
throw new RuntimeException("Not configured to support enumerating accounts");
}
return streamFlatAccounts(startKeyHash, UInt256.MAX_VALUE, limit)
.entrySet()
// map back to addresses using preImage provider:
.stream()
.map(
entry ->
preImageProxy
.getAccountTrieKeyPreimage(entry.getKey())
.map(
address ->
new WorldState.StreamableAccount(
Optional.of(address),
BonsaiAccount.fromRLP(context, address, entry.getValue(), false))))
.filter(Optional::isPresent)
.map(Optional::get)
.filter(acct -> context.updater().getAccount(acct.getAddress().orElse(null)) != null)
.sorted(Comparator.comparing(account -> account.getAddress().orElse(Address.ZERO)));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,15 +29,14 @@ public NoOpTrieLogManager() {
}

@Override
public synchronized Optional<TrieLog> saveTrieLog(
public synchronized void saveTrieLog(
final DiffBasedWorldStateUpdateAccumulator<?> localUpdater,
final Hash forWorldStateRootHash,
final BlockHeader forBlockHeader,
final DiffBasedWorldState forWorldState) {
// notify trie log added observers, synchronously
TrieLog trieLog = trieLogFactory.create(localUpdater, forBlockHeader);
trieLogObservers.forEach(o -> o.onTrieLogAdded(new TrieLogAddedEvent(trieLog)));
return Optional.of(trieLog);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ public TrieLogManager(
this.trieLogFactory = setupTrieLogFactory(pluginContext);
}

public synchronized Optional<TrieLog> saveTrieLog(
public synchronized void saveTrieLog(
final DiffBasedWorldStateUpdateAccumulator<?> localUpdater,
final Hash forWorldStateRootHash,
final BlockHeader forBlockHeader,
Expand All @@ -80,7 +80,6 @@ public synchronized Optional<TrieLog> saveTrieLog(
trieLogObservers.forEach(o -> o.onTrieLogAdded(new TrieLogAddedEvent(trieLog)));

success = true;
return Optional.of(trieLog);
} finally {
if (success) {
stateUpdater.commit();
Expand All @@ -89,7 +88,6 @@ public synchronized Optional<TrieLog> saveTrieLog(
}
}
}
return Optional.empty();
}

private TrieLog prepareTrieLog(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,33 @@ public void persist(final BlockHeader blockHeader) {
final DiffBasedWorldStateKeyValueStorage.Updater stateUpdater =
worldStateKeyValueStorage.updater();
Runnable saveTrieLog = () -> {};
Runnable savePreimages =
() -> {
var preImageUpdater = worldStateKeyValueStorage.getPreimageStorage().updater();
localCopy
.getAccountsToUpdate()
// log.getAccountChanges()
.keySet()
.forEach(acct -> preImageUpdater.putAccountTrieKeyPreimage(acct.addressHash(), acct));
localCopy.getStorageToUpdate().values().stream()
.flatMap(z -> z.keySet().stream())
.filter(
z -> {
// TODO: we should add logic here to prevent writing
// common slot keys
return z.getSlotKey().isPresent();
})
.distinct()
.forEach(
slot -> {
preImageUpdater.putStorageTrieKeyPreimage(
slot.getSlotHash(), slot.getSlotKey().get());
});
// prob need to override this in a bonsai implementation that simply defers to the trielog
// tx/commit
preImageUpdater.commit();
};

try {
final Hash calculatedRootHash;

Expand All @@ -183,40 +210,11 @@ public void persist(final BlockHeader blockHeader) {
verifyWorldStateRoot(calculatedRootHash, blockHeader);
saveTrieLog =
() -> {
var trieLog =
trieLogManager.saveTrieLog(localCopy, calculatedRootHash, blockHeader, this);
trieLogManager.saveTrieLog(localCopy, calculatedRootHash, blockHeader, this);
// not save a frozen state in the cache
if (!worldStateConfig.isFrozen()) {
cachedWorldStorageManager.addCachedLayer(blockHeader, calculatedRootHash, this);
}

// TODO: maybe move this, make conditional so we don't affect performance
// if we are not tracking preimages. using the trielog probably is going to get us
// duplicates
// because we will get updates in addition to creates :frown:
if (trieLog.isPresent()) {
var log = trieLog.get();
var preImageUpdater = worldStateKeyValueStorage.getPreimageStorage().updater();
log.getAccountChanges()
.keySet()
.forEach(
acct ->
preImageUpdater.putAccountTrieKeyPreimage(acct.addressHash(), acct));
localCopy.getStorageToUpdate().values().stream()
.flatMap(z -> z.keySet().stream())
.filter(
z -> {
// TODO: we should add logic here to prevent writing
// common slot keys
return z.getSlotKey().isPresent();
})
.distinct()
.forEach(
slot -> {
preImageUpdater.putStorageTrieKeyPreimage(
slot.getSlotHash(), slot.getSlotKey().get());
});
}
};

stateUpdater
Expand All @@ -238,6 +236,9 @@ public void persist(final BlockHeader blockHeader) {
stateUpdater.commit();
accumulator.reset();
saveTrieLog.run();
// TODO: maybe move this, make conditional so we don't affect performance
// if we are not tracking preimages.
savePreimages.run();
} else {
stateUpdater.rollback();
accumulator.reset();
Expand Down Expand Up @@ -323,8 +324,9 @@ public Hash blockHash() {
}

@Override
public abstract Stream<StreamableAccount> streamAccounts(
final Bytes32 startKeyHash, final int limit);
public Stream<StreamableAccount> streamAccounts(final Bytes32 startKeyHash, final int limit) {
return worldStateKeyValueStorage.streamAccounts(this, startKeyHash, limit);
}

@Override
public UInt256 getPriorStorageValue(final Address address, final UInt256 storageKey) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,14 +28,6 @@ public interface WorldStatePreimageStorage {

Optional<Address> getAccountTrieKeyPreimage(Bytes32 trieKey);

/**
* This method indicates whether this Pre-Image store is "complete", meaning it has all of the
* hash preimages for all entries in the state trie.
*
* @return boolean indicating whether the pre-image store is complete or not
*/
boolean canSupportStreaming();

Updater updater();

interface Updater {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,9 @@
import org.hyperledger.besu.ethereum.worldstate.WorldStateStorageCoordinator;
import org.hyperledger.besu.evm.internal.EvmConfiguration;
import org.hyperledger.besu.metrics.noop.NoOpMetricsSystem;
import org.hyperledger.besu.plugin.services.storage.KeyValueStorage;
import org.hyperledger.besu.services.kvstore.InMemoryKeyValueStorage;
import org.hyperledger.besu.services.kvstore.LimitedInMemoryKeyValueStorage;
import org.hyperledger.besu.services.kvstore.SegmentedInMemoryKeyValueStorage;

import java.util.Optional;
Expand All @@ -47,7 +49,7 @@ public class InMemoryKeyValueStorageProvider extends KeyValueStorageProvider {
public InMemoryKeyValueStorageProvider() {
super(
segmentIdentifiers -> new SegmentedInMemoryKeyValueStorage(),
new InMemoryKeyValueStorage(),
InMemoryKeyValueStorageProvider::get,
new NoOpMetricsSystem());
}

Expand Down Expand Up @@ -126,4 +128,8 @@ public static PrivateStateStorage createInMemoryPrivateStateStorage() {
public static VariablesStorage createInMemoryVariablesStorage() {
return new VariablesKeyValueStorage(new InMemoryKeyValueStorage());
}

private static KeyValueStorage get() {
return new LimitedInMemoryKeyValueStorage(4000);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import org.hyperledger.besu.ethereum.core.BlockHeaderTestFixture;
import org.hyperledger.besu.ethereum.rlp.BytesValueRLPOutput;
import org.hyperledger.besu.ethereum.storage.StorageProvider;
import org.hyperledger.besu.ethereum.storage.keyvalue.WorldStatePreimageKeyValueStorage;
import org.hyperledger.besu.ethereum.trie.diffbased.bonsai.cache.BonsaiCachedMerkleTrieLoader;
import org.hyperledger.besu.ethereum.trie.diffbased.bonsai.cache.BonsaiCachedWorldStorageManager;
import org.hyperledger.besu.ethereum.trie.diffbased.bonsai.storage.BonsaiWorldStateKeyValueStorage;
Expand All @@ -57,6 +58,7 @@
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Answers;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.junit.jupiter.MockitoExtension;
Expand All @@ -73,6 +75,10 @@ class BonsaiWorldStateProviderTest {
@Mock StorageProvider storageProvider;

@Mock SegmentedKeyValueStorage segmentedKeyValueStorage;

@Mock(answer = Answers.RETURNS_DEEP_STUBS)
WorldStatePreimageKeyValueStorage preimageStorage;

@Mock KeyValueStorage trieLogStorage;
@Mock SegmentedKeyValueStorageTransaction segmentedKeyValueStorageTransaction;
BonsaiWorldStateProvider bonsaiWorldStateArchive;
Expand All @@ -82,8 +88,10 @@ class BonsaiWorldStateProviderTest {

@BeforeEach
public void setUp() {

when(storageProvider.getStorageBySegmentIdentifiers(anyList()))
.thenReturn(segmentedKeyValueStorage);
when(storageProvider.createWorldStatePreimageStorage()).thenReturn(preimageStorage);
when(segmentedKeyValueStorage.startTransaction())
.thenReturn(segmentedKeyValueStorageTransaction);
when(storageProvider.getStorageBySegmentIdentifier(any())).thenReturn(trieLogStorage);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,6 @@ private static WorldStateArchive buildWorldStateArchive(

final MutableWorldState worldState = worldStateArchive.getMutable();
final WorldUpdater updater = worldState.updater();

for (final Map.Entry<String, ReferenceTestWorldState.AccountMock> entry : accounts.entrySet()) {
ReferenceTestWorldState.insertAccount(
updater, Address.fromHexString(entry.getKey()), entry.getValue());
Expand Down
Loading

0 comments on commit 476fdcf

Please sign in to comment.