Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor/RA-129 Block fetch data optimization #188

Merged
merged 41 commits into from
Jun 22, 2024
Merged
Show file tree
Hide file tree
Changes from 38 commits
Commits
Show all changes
41 commits
Select commit Hold shift + click to select a range
0743023
refactor: RA-129 fetch AddressUtxoEntity in one go
shleger May 16, 2024
68dac03
Merge branch 'main' into refactor/RA-129-block-optimize
shleger May 16, 2024
466d5e8
refactor: RA-129 try to optimize mappers
shleger May 17, 2024
5d395e6
refactor: RA-129 try to optimize mappers2
shleger May 17, 2024
2161e00
Merge branch 'main' of https://github.com/cardano-foundation/cardano-…
shleger May 17, 2024
6ce43df
Merge branch 'main' of https://github.com/cardano-foundation/cardano-…
shleger May 21, 2024
c7cce12
refactor: RA-129 try to optimize mappers2
shleger May 21, 2024
8c1512f
refactor: RA-129 fix spotless
shleger May 21, 2024
a3f3694
refactor: RA-129 fetch AddressUtxoEntity
shleger May 21, 2024
fc15ad3
refactor: RA-129 reformat
shleger May 21, 2024
1fc2d8c
Merge branch 'main' of https://github.com/cardano-foundation/cardano-…
shleger May 21, 2024
7d97ef0
refactor: RA-129 merge with main
shleger May 21, 2024
87268a9
refactor: RA-129 fix failed test
shleger May 21, 2024
32d602d
refactor: RA-129 add request with StructuredTaskScope
shleger May 23, 2024
b6c83ce
Merge branch 'main' of https://github.com/cardano-foundation/cardano-…
shleger May 23, 2024
cad3974
refactor: RA-129 merge with main
shleger May 23, 2024
e057db1
refactor: RA-129 add preview in pom
shleger May 23, 2024
d3c46bb
refactor: RA-129 three variations of findByTxHash with
shleger May 24, 2024
9dd2a70
refactor: RA-129 comment preview
shleger May 24, 2024
d345548
Merge branch 'main' of https://github.com/cardano-foundation/cardano-…
shleger May 24, 2024
e915720
refactor: RA-129 refactor to submit
shleger May 24, 2024
adc5d05
Merge branch 'main' into refactor/RA-129-block-optimize-db
shleger May 24, 2024
f7bf446
refactor: RA-129 add transaction readOnly
shleger May 24, 2024
b4e1d50
refactor: RA-129 fix comment
shleger May 24, 2024
4b1b94a
Merge branch 'main' into refactor/RA-129-block-optimize-db
shleger May 27, 2024
70ce730
Merge branch 'main' of https://github.com/cardano-foundation/cardano-…
shleger May 30, 2024
9618daa
refactor: RA-129 merge with main
shleger May 30, 2024
e866b7b
refactor: RA-129 increase pool size
shleger May 30, 2024
59587ba
refactor: RA-129 decrease pool size
shleger May 30, 2024
29ef93c
refactor: RA-129 try maximumPoolSize=30
shleger May 30, 2024
8f19ef5
Merge branch 'main' into refactor/RA-129-block-optimize-db
mlinnik Jun 10, 2024
0c627c7
refactor: RA-129 newVirtualThreadPerTaskExecutor changed to newFixedT…
mlinnik Jun 11, 2024
0f36c9d
RA-129: Map added, maximumPoolSize rollback to 12
mlinnik Jun 12, 2024
a03323a
RA-129: Map optimization
mlinnik Jun 13, 2024
90e6564
RA-129: Unnecessary setting removed from yaml, Propagation added to B…
mlinnik Jun 13, 2024
60445ef
RA-129: Tests added
mlinnik Jun 13, 2024
a3c2990
RA-129: Entities renamed to TransactionInfo; ExecutionMode.CONCURRENT…
mlinnik Jun 19, 2024
6854835
Merge branch 'main' into refactor/RA-129-block-optimize-db
BerezinD Jun 20, 2024
45c82b7
Update api/src/main/java/org/cardanofoundation/rosetta/api/block/mode…
BerezinD Jun 21, 2024
6f79899
Update api/src/main/java/org/cardanofoundation/rosetta/api/block/mode…
BerezinD Jun 21, 2024
d336c4f
Update api/src/main/java/org/cardanofoundation/rosetta/api/block/mode…
BerezinD Jun 21, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,6 @@

public interface AddressUtxoRepository extends JpaRepository<AddressUtxoEntity, UtxoId> {

List<AddressUtxoEntity> findAddressUtxoEntitiesByOutputIndexAndTxHash(Integer outputIndex,
String txHash);

@Query(value =
"""
SELECT a FROM AddressUtxoEntity a
Expand Down Expand Up @@ -44,4 +41,6 @@ AND NOT EXISTS(SELECT 1 FROM TxInputEntity o WHERE o.txHash = a.txHash AND o.out
AND a.blockNumber <= :block
""")
List<AddressUtxoEntity> findUnspentUtxosByStakeAddressAndBlock(@Param("address") String stakeAddress, @Param("block") long block);

List<AddressUtxoEntity> findByTxHashIn(List<String> utxHashes);
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,5 +9,5 @@

public interface DelegationRepository extends JpaRepository<DelegationEntity, DelegationId> {

List<DelegationEntity> findByTxHash(String txHash);
List<DelegationEntity> findByTxHashIn(List<String> list);
BerezinD marked this conversation as resolved.
Show resolved Hide resolved
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,5 +10,5 @@
public interface PoolRegistrationRepository extends
JpaRepository<PoolRegistrationEntity, PoolRegistrationId> {

List<PoolRegistrationEntity> findByTxHash(String txHash);
List<PoolRegistrationEntity> findByTxHashIn(List<String> list);
BerezinD marked this conversation as resolved.
Show resolved Hide resolved
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,5 +10,5 @@
public interface PoolRetirementRepository extends
JpaRepository<PoolRetirementEntity, PoolRetirementId> {

List<PoolRetirementEntity> findByTxHash(String txHash);
List<PoolRetirementEntity> findByTxHashIn(List<String> txHashes);
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,5 +10,5 @@
public interface StakeRegistrationRepository extends
JpaRepository<StakeRegistrationEntity, StakeRegistrationId> {

List<StakeRegistrationEntity> findByTxHash(String txHash);
List<StakeRegistrationEntity> findByTxHashIn(List<String> list);
BerezinD marked this conversation as resolved.
Show resolved Hide resolved
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,5 +9,5 @@

public interface WithdrawalRepository extends JpaRepository<WithdrawalEntity, WithdrawalId> {

List<WithdrawalEntity> findByTxHash(String txHash);
List<WithdrawalEntity> findByTxHashIn(List<String> txHashes);
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@
import lombok.extern.slf4j.Slf4j;

import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Propagation;
import org.springframework.transaction.annotation.Transactional;

import org.cardanofoundation.rosetta.api.block.model.domain.Block;
import org.cardanofoundation.rosetta.api.block.model.domain.BlockTx;
Expand All @@ -15,12 +17,12 @@
@Slf4j
@Service
@RequiredArgsConstructor
@Transactional(readOnly = true, propagation = Propagation.SUPPORTS)
public class BlockServiceImpl implements BlockService {

private final LedgerBlockService ledgerBlockService;
private final ProtocolParamService protocolParamService;


@Override
public Block findBlock(Long index, String hash) {
log.info("[block] Looking for block: hash={}, index={}", hash, index);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,13 @@

import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import jakarta.validation.constraints.NotNull;

import lombok.RequiredArgsConstructor;
Expand All @@ -24,7 +30,12 @@
import org.cardanofoundation.rosetta.api.block.model.domain.BlockTx;
import org.cardanofoundation.rosetta.api.block.model.domain.ProtocolParams;
import org.cardanofoundation.rosetta.api.block.model.entity.BlockEntity;
import org.cardanofoundation.rosetta.api.block.model.entity.DelegationEntity;
import org.cardanofoundation.rosetta.api.block.model.entity.PoolRegistrationEntity;
import org.cardanofoundation.rosetta.api.block.model.entity.PoolRetirementEntity;
import org.cardanofoundation.rosetta.api.block.model.entity.StakeRegistrationEntity;
import org.cardanofoundation.rosetta.api.block.model.entity.TxnEntity;
import org.cardanofoundation.rosetta.api.block.model.entity.WithdrawalEntity;
import org.cardanofoundation.rosetta.api.block.model.repository.BlockRepository;
import org.cardanofoundation.rosetta.api.block.model.repository.DelegationRepository;
import org.cardanofoundation.rosetta.api.block.model.repository.PoolRegistrationRepository;
Expand All @@ -35,7 +46,6 @@
import org.cardanofoundation.rosetta.common.exception.ExceptionFactory;
import org.cardanofoundation.rosetta.common.services.ProtocolParamService;


@Slf4j
@RequiredArgsConstructor
@Component
Expand All @@ -53,9 +63,7 @@ public class LedgerBlockServiceImpl implements LedgerBlockService {
private final WithdrawalRepository withdrawalRepository;
private final AddressUtxoRepository addressUtxoRepository;


private final BlockMapper blockMapper;

private final TransactionMapper transactionMapper;
private final AddressUtxoEntityToUtxo addressUtxoEntityToUtxo;

Expand All @@ -75,10 +83,30 @@ public Optional<Block> findBlock(Long blockNumber, String blockHash) {
}
}

@Override
public Optional<BlockIdentifierExtended> findBlockIdentifier(Long blockNumber, String blockHash) {
log.debug("Query blockNumber: {} , blockHash: {}", blockNumber, blockHash);
if (blockHash == null && blockNumber != null) {
return blockRepository.findBlockIdentifierByNumber(blockNumber)
.map(blockMapper::mapToBlockIdentifierExtended);
} else if (blockHash != null && blockNumber == null) {
return blockRepository.findBlockIdentifierByHash(blockHash)
.map(blockMapper::mapToBlockIdentifierExtended);
} else {
return blockRepository
.findBlockIdentifierByNumberAndHash(blockNumber, blockHash)
.map(blockMapper::mapToBlockIdentifierExtended);
}
}

@NotNull
private Block toModelFrom(BlockEntity blockEntity) {
Block model = blockMapper.mapToBlock(blockEntity);
model.getTransactions().forEach(this::populateTransaction);
ProtocolParams pps = protocolParamService.findProtocolParametersFromIndexer();
List<BlockTx> transactions = model.getTransactions();
TransactionInfo fetched = findByTxHash(transactions);
Map<UtxoKey, AddressUtxoEntity> utxoMap = getUtxoMapFromEntities(fetched);
transactions.forEach(tx -> populateTransaction(tx, pps, fetched, utxoMap));
return model;
}

Expand All @@ -93,8 +121,11 @@ public List<BlockTx> findTransactionsByBlock(Long blk, String blkHash) {
List<TxnEntity> txList = txRepository.findTransactionsByBlockHash(blkEntity.get().getHash());
log.debug("Found {} transactions", txList.size());
if (ObjectUtils.isNotEmpty(txList)) {
ProtocolParams pps = protocolParamService.findProtocolParametersFromIndexer();
List<BlockTx> transactions = txList.stream().map(blockMapper::mapToBlockTx).toList();
transactions.forEach(this::populateTransaction);
TransactionInfo fetched = findByTxHash(transactions);
Map<UtxoKey, AddressUtxoEntity> utxoMap = getUtxoMapFromEntities(fetched);
transactions.forEach(tx -> populateTransaction(tx, pps, fetched, utxoMap));
return transactions;
}
return Collections.emptyList();
Expand All @@ -111,11 +142,11 @@ public Block findLatestBlock() {

@Override
public BlockIdentifierExtended findLatestBlockIdentifier() {
log.debug("About to look for latest block");
log.debug("About to look for latest findLatestBlockIdentifier");
BlockIdentifierExtended latestBlock = blockRepository.findLatestBlockIdentifier()
.map(blockMapper::mapToBlockIdentifierExtended)
.orElseThrow(ExceptionFactory::genesisBlockNotFound);
log.debug("Returning latest block {}", latestBlock);
log.debug("Returning latest findLatestBlockIdentifier {}", latestBlock);
return latestBlock;
}

Expand All @@ -131,77 +162,117 @@ public BlockIdentifierExtended findGenesisBlockIdentifier() {
return cachedGenesisBlock;
}

@Override
public Optional<BlockIdentifierExtended> findBlockIdentifier(Long blockNumber, String blockHash) {
log.debug("Query blockNumber: {} , blockHash: {}", blockNumber, blockHash);
if (blockHash == null && blockNumber != null) {
return blockRepository.findBlockIdentifierByNumber(blockNumber)
.map(blockMapper::mapToBlockIdentifierExtended);
} else if (blockHash != null && blockNumber == null) {
return blockRepository.findBlockIdentifierByHash(blockHash)
.map(blockMapper::mapToBlockIdentifierExtended);
} else {
return blockRepository
.findBlockIdentifierByNumberAndHash(blockNumber, blockHash)
.map(blockMapper::mapToBlockIdentifierExtended);
private TransactionInfo findByTxHash(List<BlockTx> transactions) {
List<String> txHashes = transactions.stream().map(BlockTx::getHash).toList();

List<String> utxHashes = transactions
.stream()
.flatMap(t -> Stream.concat(t.getInputs().stream(), t.getOutputs().stream()))
.map(Utxo::getTxHash)
.toList();

try (var executorService = Executors.newFixedThreadPool(6)) {
Future<List<AddressUtxoEntity>> utxos = executorService.submit(() ->
addressUtxoRepository.findByTxHashIn(utxHashes));
Future<List<StakeRegistrationEntity>> sReg = executorService.submit(() ->
stakeRegistrationRepository.findByTxHashIn(txHashes));
Future<List<DelegationEntity>> delegations = executorService.submit(() ->
delegationRepository.findByTxHashIn(txHashes));
Future<List<PoolRegistrationEntity>> pReg = executorService.submit(() ->
poolRegistrationRepository.findByTxHashIn(txHashes));
Future<List<PoolRetirementEntity>> pRet = executorService.submit(() ->
poolRetirementRepository.findByTxHashIn(txHashes));
Future<List<WithdrawalEntity>> withdrawals = executorService.submit(() ->
withdrawalRepository.findByTxHashIn(txHashes));

return new TransactionInfo(utxos.get(), sReg.get(), delegations.get(), pReg.get(), pRet.get(),
withdrawals.get());
} catch (InterruptedException | ExecutionException e) {
log.error("Error fetching transaction data", e);
Thread.currentThread().interrupt();
throw ExceptionFactory.unspecifiedError("Error fetching transaction data");
BerezinD marked this conversation as resolved.
Show resolved Hide resolved
}
}

private void populateTransaction(BlockTx transaction) {
private void populateTransaction(BlockTx transaction, ProtocolParams pps, TransactionInfo fetched,
Map<UtxoKey, AddressUtxoEntity> utxoMap) {
Optional.ofNullable(transaction.getInputs())
.stream()
.flatMap(List::stream)
.forEach(this::populateUtxo);
.forEach(utxo -> populateUtxo(utxo, utxoMap));

Optional.ofNullable(transaction.getOutputs())
.stream()
.flatMap(List::stream)
.forEach(this::populateUtxo);
.forEach(utxo -> populateUtxo(utxo, utxoMap));

transaction.setStakeRegistrations(
stakeRegistrationRepository
.findByTxHash(transaction.getHash())
fetched.stakeRegistrations
.stream()
.filter(tx -> tx.getTxHash().equals(transaction.getHash()))
.map(transactionMapper::mapStakeRegistrationEntityToStakeRegistration)
.toList());

transaction.setDelegations(
delegationRepository
.findByTxHash(transaction.getHash())
fetched.delegations
.stream()
.filter(tx -> tx.getTxHash().equals(transaction.getHash()))
.map(transactionMapper::mapDelegationEntityToDelegation)
.toList());

transaction.setPoolRegistrations(poolRegistrationRepository
.findByTxHash(transaction.getHash())
.stream()
.map(transactionMapper::mapEntityToPoolRegistration)
.toList());
transaction.setWithdrawals(
fetched.withdrawals
.stream()
.filter(tx -> tx.getTxHash().equals(transaction.getHash()))
.map(transactionMapper::mapWithdrawalEntityToWithdrawal)
.toList());

transaction.setPoolRetirements(poolRetirementRepository
.findByTxHash(transaction.getHash())
.stream()
.map(transactionMapper::mapEntityToPoolRetirement)
.toList());
transaction.setPoolRegistrations(
fetched.poolRegistrations
.stream()
.map(transactionMapper::mapEntityToPoolRegistration)
.toList());

transaction.setWithdrawals(withdrawalRepository
.findByTxHash(transaction.getHash())
.stream()
.map(transactionMapper::mapWithdrawalEntityToWithdrawal)
.toList());
transaction.setPoolRetirements(
fetched.poolRetirements
.stream()
.map(transactionMapper::mapEntityToPoolRetirement)
.toList());

ProtocolParams pps = protocolParamService.findProtocolParametersFromIndexer();
transaction.setSize(calcSize(transaction, pps));
}

private static long calcSize(BlockTx tx, ProtocolParams p) {
return (Long.parseLong(tx.getFee()) - p.getMinFeeB().longValue()) / p.getMinFeeA().longValue();
}

private void populateUtxo(Utxo utxo) {
AddressUtxoEntity first = addressUtxoRepository
.findAddressUtxoEntitiesByOutputIndexAndTxHash(utxo.getOutputIndex(), utxo.getTxHash())
.getFirst();
Optional.ofNullable(first).ifPresent(m -> addressUtxoEntityToUtxo.overWriteDto(utxo,m));
private void populateUtxo(Utxo utxo, Map<UtxoKey, AddressUtxoEntity> utxoMap) {
AddressUtxoEntity entity = utxoMap.get(
new UtxoKey(utxo.getTxHash(), utxo.getOutputIndex()));
Optional.ofNullable(entity)
.ifPresent(e -> addressUtxoEntityToUtxo.overWriteDto(utxo, e));
}

private static Map<UtxoKey, AddressUtxoEntity> getUtxoMapFromEntities(TransactionInfo fetched) {
return fetched.utxos
.stream()
.collect(Collectors.toMap(
utxo -> new UtxoKey(utxo.getTxHash(), utxo.getOutputIndex()),
utxo -> utxo
));
}

private record TransactionInfo(List<AddressUtxoEntity> utxos,
List<StakeRegistrationEntity> stakeRegistrations,
List<DelegationEntity> delegations,
List<PoolRegistrationEntity> poolRegistrations,
List<PoolRetirementEntity> poolRetirements,
List<WithdrawalEntity> withdrawals) {

}

private record UtxoKey(String txHash, Integer outputIndex) {

}

}
Loading
Loading