Skip to content

Commit

Permalink
feat(state-root): reactor state-root
Browse files Browse the repository at this point in the history
  • Loading branch information
halibobo1205 committed Oct 14, 2024
1 parent 96fa6e9 commit cd3cf57
Show file tree
Hide file tree
Showing 2 changed files with 129 additions and 28 deletions.
129 changes: 110 additions & 19 deletions chainbase/src/main/java/org/tron/core/service/RootHashService.java
Original file line number Diff line number Diff line change
@@ -1,13 +1,19 @@
package org.tron.core.service;

import com.google.common.collect.Sets;
import com.google.common.collect.Streams;
import com.google.common.primitives.Bytes;
import com.google.common.primitives.Ints;
import com.google.protobuf.ByteString;
import java.io.IOException;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import lombok.extern.slf4j.Slf4j;
Expand All @@ -20,7 +26,10 @@
import org.tron.common.utils.Pair;
import org.tron.common.utils.Sha256Hash;
import org.tron.core.db.TronDatabase;
import org.tron.core.db2.common.Value;
import org.tron.core.store.AccountAssetStore;
import org.tron.core.store.CorruptedCheckpointStore;
import org.tron.protos.Protocol;

@Slf4j(topic = "DB")
@Component
Expand All @@ -30,31 +39,113 @@ public class RootHashService {
"latest_block_header_number".getBytes());

private static Optional<CorruptedCheckpointStore> corruptedCheckpointStore = Optional.empty();
private static AccountAssetStore assetStore;
private static final List<String> stateDbs = Arrays.asList(
"account", "account-asset", "asset-issue-v2",
"code", "contract", "contract-state", "storage-row",
"delegation", "DelegatedResource",
"exchange-v2",
"market_account", "market_order", "market_pair_price_to_order", "market_pair_to_price",
"properties", "proposal",
"votes", "witness", "witness_schedule"
);
private static final byte[] CURRENT_SHUFFLED_WITNESSES = "current_shuffled_witnesses".getBytes();
private static final String FORK_PREFIX = "FORK_VERSION_";
private static final String DONE_SUFFIX = "_DONE";
private static final String ACCOUNT_VOTE_SUFFIX = "-account-vote";
private static final Set<String> ignoredProperties = Sets.newHashSet(
"VOTE_REWARD_RATE", "SINGLE_REPEAT", "NON_EXISTENT_ACCOUNT_TRANSFER_MIN",
"ALLOW_TVM_ASSET_ISSUE", "ALLOW_TVM_STAKE",
"MAX_VOTE_NUMBER", "MAX_FROZEN_NUMBER", "MAINTENANCE_TIME_INTERVAL",
"LATEST_SOLIDIFIED_BLOCK_NUM", "BLOCK_NET_USAGE",
"BLOCK_FILLED_SLOTS_INDEX", "BLOCK_FILLED_SLOTS_NUMBER");

@Autowired
public RootHashService(@Autowired CorruptedCheckpointStore corruptedCheckpointStore) {
public RootHashService(@Autowired CorruptedCheckpointStore corruptedCheckpointStore,
@Autowired AccountAssetStore assetStore) {
RootHashService.corruptedCheckpointStore = Optional.ofNullable(corruptedCheckpointStore);
RootHashService.assetStore = assetStore;
}

public static Pair<Optional<Long>, Sha256Hash> getRootHash(Map<byte[], byte[]> rows) {
AtomicReference<Optional<Long>> height = new AtomicReference<>(Optional.empty());
List<Sha256Hash> ids = Streams.stream(rows.entrySet()).parallel().map(entry -> {
if (Arrays.equals(HEADER_KEY, entry.getKey())) {
height.set(Optional.of(ByteArray.toLong(entry.getValue())));
try {
Map<byte[], byte[]> preparedStateData = preparedStateData(rows);
AtomicReference<Optional<Long>> height = new AtomicReference<>(Optional.empty());
List<Sha256Hash> ids = Streams.stream(preparedStateData.entrySet()).parallel().map(entry -> {
if (Arrays.equals(HEADER_KEY, entry.getKey())) {
height.set(Optional.of(ByteArray.toLong(entry.getValue())));
}
return getHash(entry);
}).sorted().collect(Collectors.toList());
Sha256Hash actual = MerkleRoot.root(ids);
long num = height.get().orElseThrow(() -> new TronDBException("blockNum is null"));
Optional<Sha256Hash> expected = GlobalContext.popBlockHash(num);
if (expected.isPresent() && !Objects.equals(expected.get(), actual)) {
corruptedCheckpointStore.ifPresent(TronDatabase::reset);
corruptedCheckpointStore.ifPresent(store -> store.updateByBatch(rows));
throw new TronDBException(String.format(
"Root hash mismatch for blockNum: %s, expected: %s, actual: %s",
num, expected, actual));
}
return getHash(entry);
}).sorted().collect(Collectors.toList());
Sha256Hash actual = MerkleRoot.root(ids);
long num = height.get().orElseThrow(() -> new TronDBException("blockNum is null"));
Optional<Sha256Hash> expected = GlobalContext.popBlockHash(num);
if (expected.isPresent() && !Objects.equals(expected.get(), actual)) {
corruptedCheckpointStore.ifPresent(TronDatabase::reset);
corruptedCheckpointStore.ifPresent(store -> store.updateByBatch(rows));
throw new TronDBException(String.format(
"Root hash mismatch for blockNum: %s, expected: %s, actual: %s", num, expected, actual));
return new Pair<>(height.get(), actual);
} catch (IOException e) {
throw new TronDBException(e);
}
}

private static Map<byte[], byte[]> preparedStateData(Map<byte[], byte[]> rows)
throws IOException {
Map<byte[], byte[]> preparedStateData = new HashMap<>(rows.size());
for (Map.Entry<byte[], byte[]> e : rows.entrySet()) {
byte[] key = e.getKey();
String dbName = simpleDecode(key);
if (!stateDbs.contains(dbName)) {
continue;
}
byte[] realKey = Arrays.copyOfRange(key, dbName.getBytes().length + Integer.BYTES,
key.length);
if ("witness_schedule".equals(dbName) && Arrays.equals(realKey, CURRENT_SHUFFLED_WITNESSES)) {
continue;
}
if ("properties".equals(dbName)) {
String keyStr = new String(realKey);
if (ignoredProperties.contains(keyStr)
|| keyStr.startsWith(FORK_PREFIX) || keyStr.endsWith(DONE_SUFFIX)) {
continue;
}
}
byte[] value = e.getValue();
byte[] realValue = value.length == 1 ? null : Arrays.copyOfRange(value, 1, value.length);
if (realValue != null) {
if ("witness".equals(dbName)) {
realValue = Protocol.Witness.parseFrom(realValue)
.toBuilder().clearTotalMissed()
.build().toByteArray(); // ignore totalMissed
}
if ("account".equals(dbName)
|| ("delegation".equals(dbName) && new String(realKey).endsWith(ACCOUNT_VOTE_SUFFIX))) {
Protocol.Account account = Protocol.Account.parseFrom(realValue);
Map<String, Long> assets = new TreeMap<>(assetStore.getAllAssets(account));
realValue = account.toBuilder().clearAsset().clearAssetV2().putAllAssetV2(assets)
.build().toByteArray();
}
}
if (realValue != null) {
preparedStateData.put(realKey, realValue);
} else {
if (Value.Operator.DELETE.getValue() != value[0]) {
preparedStateData.put(realKey, ByteString.EMPTY.toByteArray());
}
}
}
return preparedStateData;
}

return new Pair<>(height.get(), actual);
private static String simpleDecode(byte[] bytes) {
byte[] lengthBytes = Arrays.copyOf(bytes, Integer.BYTES);
int length = Ints.fromByteArray(lengthBytes);
byte[] value = Arrays.copyOfRange(bytes, Integer.BYTES, Integer.BYTES + length);
return new String(value);
}

private static Sha256Hash getHash(Map.Entry<byte[], byte[]> entry) {
Expand All @@ -64,9 +155,9 @@ private static Sha256Hash getHash(Map.Entry<byte[], byte[]> entry) {
private static byte[] simpleEncode(String s) {
byte[] bytes = s.getBytes();
byte[] length = Ints.toByteArray(bytes.length);
byte[] r = new byte[4 + bytes.length];
System.arraycopy(length, 0, r, 0, 4);
System.arraycopy(bytes, 0, r, 4, bytes.length);
byte[] r = new byte[Integer.BYTES + bytes.length];
System.arraycopy(length, 0, r, 0, Integer.BYTES);
System.arraycopy(bytes, 0, r, Integer.BYTES, bytes.length);
return r;
}
}
28 changes: 19 additions & 9 deletions plugins/src/main/java/org/tron/plugins/DbCheckSum.java
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import java.util.Arrays;
import java.util.List;
import java.util.Objects;
import java.util.TreeMap;
import java.util.concurrent.Callable;
import java.util.stream.Collectors;
import lombok.extern.slf4j.Slf4j;
Expand All @@ -35,17 +36,13 @@ public class DbCheckSum implements Callable<Integer> {
}

private static final List<String> stateDbs = Arrays.asList(
"account", "account-asset",
"asset-issue", "asset-issue-v2",
"code", "contract", "contract-state",
"account", "account-asset", "asset-issue-v2",
"code", "contract", "contract-state", "storage-row",
"delegation", "DelegatedResource",
"exchange", "exchange-v2",
"exchange-v2",
"market_account", "market_order", "market_pair_price_to_order", "market_pair_to_price",
"properties",
"proposal",
"storage-row",
"votes",
"witness", "witness_schedule"
"properties", "proposal",
"votes", "witness", "witness_schedule"
);
private static final byte[] CURRENT_SHUFFLED_WITNESSES = "current_shuffled_witnesses".getBytes();
private static final String FORK_PREFIX = "FORK_VERSION_";
Expand All @@ -56,6 +53,7 @@ public class DbCheckSum implements Callable<Integer> {
"MAX_VOTE_NUMBER", "MAX_FROZEN_NUMBER", "MAINTENANCE_TIME_INTERVAL",
"LATEST_SOLIDIFIED_BLOCK_NUM", "BLOCK_NET_USAGE",
"BLOCK_FILLED_SLOTS_INDEX", "BLOCK_FILLED_SLOTS_NUMBER");
private static final String ACCOUNT_VOTE_SUFFIX = "-account-vote";

@CommandLine.Spec
static CommandLine.Model.CommandSpec spec;
Expand Down Expand Up @@ -170,6 +168,18 @@ private String check() throws RocksDBException, IOException {
.toBuilder().clearTotalMissed()
.build().toByteArray(); // ignore totalMissed
}
if ("account".equals(dbName)
|| ("delegation".equals(dbName) && new String(key).endsWith(ACCOUNT_VOTE_SUFFIX))) {
Protocol.Account account = Protocol.Account.parseFrom(value);
if (account.getAssetOptimized()) {
value = account.toBuilder().clearAsset().clearAssetV2()
.build().toByteArray();
} else {
value = account.toBuilder().clearAsset().clearAssetV2()
.putAllAssetV2(new TreeMap<>(account.getAssetV2Map()))
.build().toByteArray();
}
}
srcDbKeyCount = srcDbKeyCount.add(BigInteger.ONE);
srcDbKeySum = checkSum(srcDbKeySum, key);
srcDbValueSum = checkSum(srcDbValueSum, value);
Expand Down

0 comments on commit cd3cf57

Please sign in to comment.