Skip to content

Commit

Permalink
Merge branch 'main' into chore/dockerfile-dirs-creation
Browse files Browse the repository at this point in the history
  • Loading branch information
Kammerlo authored Jun 23, 2024
2 parents 3bbf57d + 4bab3ab commit 9a694e0
Show file tree
Hide file tree
Showing 10 changed files with 251 additions and 59 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,6 @@

public interface AddressUtxoRepository extends JpaRepository<AddressUtxoEntity, UtxoId> {

List<AddressUtxoEntity> findAddressUtxoEntitiesByOutputIndexAndTxHash(Integer outputIndex,
String txHash);

@Query(value =
"""
SELECT a FROM AddressUtxoEntity a
Expand Down Expand Up @@ -44,4 +41,6 @@ AND NOT EXISTS(SELECT 1 FROM TxInputEntity o WHERE o.txHash = a.txHash AND o.out
AND a.blockNumber <= :block
""")
List<AddressUtxoEntity> findUnspentUtxosByStakeAddressAndBlock(@Param("address") String stakeAddress, @Param("block") long block);

List<AddressUtxoEntity> findByTxHashIn(List<String> utxHashes);
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,5 +9,5 @@

public interface DelegationRepository extends JpaRepository<DelegationEntity, DelegationId> {

List<DelegationEntity> findByTxHash(String txHash);
List<DelegationEntity> findByTxHashIn(List<String> txHashes);
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,5 +10,5 @@
public interface PoolRegistrationRepository extends
JpaRepository<PoolRegistrationEntity, PoolRegistrationId> {

List<PoolRegistrationEntity> findByTxHash(String txHash);
List<PoolRegistrationEntity> findByTxHashIn(List<String> txHashes);
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,5 +10,5 @@
public interface PoolRetirementRepository extends
JpaRepository<PoolRetirementEntity, PoolRetirementId> {

List<PoolRetirementEntity> findByTxHash(String txHash);
List<PoolRetirementEntity> findByTxHashIn(List<String> txHashes);
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,5 +10,5 @@
public interface StakeRegistrationRepository extends
JpaRepository<StakeRegistrationEntity, StakeRegistrationId> {

List<StakeRegistrationEntity> findByTxHash(String txHash);
List<StakeRegistrationEntity> findByTxHashIn(List<String> txHashes);
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,5 +9,5 @@

public interface WithdrawalRepository extends JpaRepository<WithdrawalEntity, WithdrawalId> {

List<WithdrawalEntity> findByTxHash(String txHash);
List<WithdrawalEntity> findByTxHashIn(List<String> txHashes);
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@
import lombok.extern.slf4j.Slf4j;

import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Propagation;
import org.springframework.transaction.annotation.Transactional;

import org.cardanofoundation.rosetta.api.block.model.domain.Block;
import org.cardanofoundation.rosetta.api.block.model.domain.BlockTx;
Expand All @@ -15,12 +17,12 @@
@Slf4j
@Service
@RequiredArgsConstructor
@Transactional(readOnly = true, propagation = Propagation.SUPPORTS)
public class BlockServiceImpl implements BlockService {

private final LedgerBlockService ledgerBlockService;
private final ProtocolParamService protocolParamService;


@Override
public Block findBlock(Long index, String hash) {
log.info("[block] Looking for block: hash={}, index={}", hash, index);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,13 @@

import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import jakarta.validation.constraints.NotNull;

import lombok.RequiredArgsConstructor;
Expand All @@ -24,7 +30,12 @@
import org.cardanofoundation.rosetta.api.block.model.domain.BlockTx;
import org.cardanofoundation.rosetta.api.block.model.domain.ProtocolParams;
import org.cardanofoundation.rosetta.api.block.model.entity.BlockEntity;
import org.cardanofoundation.rosetta.api.block.model.entity.DelegationEntity;
import org.cardanofoundation.rosetta.api.block.model.entity.PoolRegistrationEntity;
import org.cardanofoundation.rosetta.api.block.model.entity.PoolRetirementEntity;
import org.cardanofoundation.rosetta.api.block.model.entity.StakeRegistrationEntity;
import org.cardanofoundation.rosetta.api.block.model.entity.TxnEntity;
import org.cardanofoundation.rosetta.api.block.model.entity.WithdrawalEntity;
import org.cardanofoundation.rosetta.api.block.model.repository.BlockRepository;
import org.cardanofoundation.rosetta.api.block.model.repository.DelegationRepository;
import org.cardanofoundation.rosetta.api.block.model.repository.PoolRegistrationRepository;
Expand All @@ -35,7 +46,6 @@
import org.cardanofoundation.rosetta.common.exception.ExceptionFactory;
import org.cardanofoundation.rosetta.common.services.ProtocolParamService;


@Slf4j
@RequiredArgsConstructor
@Component
Expand All @@ -53,9 +63,7 @@ public class LedgerBlockServiceImpl implements LedgerBlockService {
private final WithdrawalRepository withdrawalRepository;
private final AddressUtxoRepository addressUtxoRepository;


private final BlockMapper blockMapper;

private final TransactionMapper transactionMapper;
private final AddressUtxoEntityToUtxo addressUtxoEntityToUtxo;

Expand All @@ -75,10 +83,30 @@ public Optional<Block> findBlock(Long blockNumber, String blockHash) {
}
}

@Override
public Optional<BlockIdentifierExtended> findBlockIdentifier(Long blockNumber, String blockHash) {
log.debug("Query blockNumber: {} , blockHash: {}", blockNumber, blockHash);
if (blockHash == null && blockNumber != null) {
return blockRepository.findBlockIdentifierByNumber(blockNumber)
.map(blockMapper::mapToBlockIdentifierExtended);
} else if (blockHash != null && blockNumber == null) {
return blockRepository.findBlockIdentifierByHash(blockHash)
.map(blockMapper::mapToBlockIdentifierExtended);
} else {
return blockRepository
.findBlockIdentifierByNumberAndHash(blockNumber, blockHash)
.map(blockMapper::mapToBlockIdentifierExtended);
}
}

@NotNull
private Block toModelFrom(BlockEntity blockEntity) {
Block model = blockMapper.mapToBlock(blockEntity);
model.getTransactions().forEach(this::populateTransaction);
ProtocolParams pps = protocolParamService.findProtocolParametersFromIndexer();
List<BlockTx> transactions = model.getTransactions();
TransactionInfo fetched = findByTxHash(transactions);
Map<UtxoKey, AddressUtxoEntity> utxoMap = getUtxoMapFromEntities(fetched);
transactions.forEach(tx -> populateTransaction(tx, pps, fetched, utxoMap));
return model;
}

Expand All @@ -93,8 +121,11 @@ public List<BlockTx> findTransactionsByBlock(Long blk, String blkHash) {
List<TxnEntity> txList = txRepository.findTransactionsByBlockHash(blkEntity.get().getHash());
log.debug("Found {} transactions", txList.size());
if (ObjectUtils.isNotEmpty(txList)) {
ProtocolParams pps = protocolParamService.findProtocolParametersFromIndexer();
List<BlockTx> transactions = txList.stream().map(blockMapper::mapToBlockTx).toList();
transactions.forEach(this::populateTransaction);
TransactionInfo fetched = findByTxHash(transactions);
Map<UtxoKey, AddressUtxoEntity> utxoMap = getUtxoMapFromEntities(fetched);
transactions.forEach(tx -> populateTransaction(tx, pps, fetched, utxoMap));
return transactions;
}
return Collections.emptyList();
Expand All @@ -111,11 +142,11 @@ public Block findLatestBlock() {

@Override
public BlockIdentifierExtended findLatestBlockIdentifier() {
log.debug("About to look for latest block");
log.debug("About to look for latest findLatestBlockIdentifier");
BlockIdentifierExtended latestBlock = blockRepository.findLatestBlockIdentifier()
.map(blockMapper::mapToBlockIdentifierExtended)
.orElseThrow(ExceptionFactory::genesisBlockNotFound);
log.debug("Returning latest block {}", latestBlock);
log.debug("Returning latest findLatestBlockIdentifier {}", latestBlock);
return latestBlock;
}

Expand All @@ -131,77 +162,117 @@ public BlockIdentifierExtended findGenesisBlockIdentifier() {
return cachedGenesisBlock;
}

@Override
public Optional<BlockIdentifierExtended> findBlockIdentifier(Long blockNumber, String blockHash) {
log.debug("Query blockNumber: {} , blockHash: {}", blockNumber, blockHash);
if (blockHash == null && blockNumber != null) {
return blockRepository.findBlockIdentifierByNumber(blockNumber)
.map(blockMapper::mapToBlockIdentifierExtended);
} else if (blockHash != null && blockNumber == null) {
return blockRepository.findBlockIdentifierByHash(blockHash)
.map(blockMapper::mapToBlockIdentifierExtended);
} else {
return blockRepository
.findBlockIdentifierByNumberAndHash(blockNumber, blockHash)
.map(blockMapper::mapToBlockIdentifierExtended);
private TransactionInfo findByTxHash(List<BlockTx> transactions) {
List<String> txHashes = transactions.stream().map(BlockTx::getHash).toList();

List<String> utxHashes = transactions
.stream()
.flatMap(t -> Stream.concat(t.getInputs().stream(), t.getOutputs().stream()))
.map(Utxo::getTxHash)
.toList();

try (var executorService = Executors.newFixedThreadPool(6)) {
Future<List<AddressUtxoEntity>> utxos = executorService.submit(() ->
addressUtxoRepository.findByTxHashIn(utxHashes));
Future<List<StakeRegistrationEntity>> sReg = executorService.submit(() ->
stakeRegistrationRepository.findByTxHashIn(txHashes));
Future<List<DelegationEntity>> delegations = executorService.submit(() ->
delegationRepository.findByTxHashIn(txHashes));
Future<List<PoolRegistrationEntity>> pReg = executorService.submit(() ->
poolRegistrationRepository.findByTxHashIn(txHashes));
Future<List<PoolRetirementEntity>> pRet = executorService.submit(() ->
poolRetirementRepository.findByTxHashIn(txHashes));
Future<List<WithdrawalEntity>> withdrawals = executorService.submit(() ->
withdrawalRepository.findByTxHashIn(txHashes));

return new TransactionInfo(utxos.get(), sReg.get(), delegations.get(), pReg.get(), pRet.get(),
withdrawals.get());
} catch (InterruptedException | ExecutionException e) {
log.error("Error fetching transaction data", e);
Thread.currentThread().interrupt();
throw ExceptionFactory.unspecifiedError("Error fetching transaction data");
}
}

private void populateTransaction(BlockTx transaction) {
private void populateTransaction(BlockTx transaction, ProtocolParams pps, TransactionInfo fetched,
Map<UtxoKey, AddressUtxoEntity> utxoMap) {
Optional.ofNullable(transaction.getInputs())
.stream()
.flatMap(List::stream)
.forEach(this::populateUtxo);
.forEach(utxo -> populateUtxo(utxo, utxoMap));

Optional.ofNullable(transaction.getOutputs())
.stream()
.flatMap(List::stream)
.forEach(this::populateUtxo);
.forEach(utxo -> populateUtxo(utxo, utxoMap));

transaction.setStakeRegistrations(
stakeRegistrationRepository
.findByTxHash(transaction.getHash())
fetched.stakeRegistrations
.stream()
.filter(tx -> tx.getTxHash().equals(transaction.getHash()))
.map(transactionMapper::mapStakeRegistrationEntityToStakeRegistration)
.toList());

transaction.setDelegations(
delegationRepository
.findByTxHash(transaction.getHash())
fetched.delegations
.stream()
.filter(tx -> tx.getTxHash().equals(transaction.getHash()))
.map(transactionMapper::mapDelegationEntityToDelegation)
.toList());

transaction.setPoolRegistrations(poolRegistrationRepository
.findByTxHash(transaction.getHash())
.stream()
.map(transactionMapper::mapEntityToPoolRegistration)
.toList());
transaction.setWithdrawals(
fetched.withdrawals
.stream()
.filter(tx -> tx.getTxHash().equals(transaction.getHash()))
.map(transactionMapper::mapWithdrawalEntityToWithdrawal)
.toList());

transaction.setPoolRetirements(poolRetirementRepository
.findByTxHash(transaction.getHash())
.stream()
.map(transactionMapper::mapEntityToPoolRetirement)
.toList());
transaction.setPoolRegistrations(
fetched.poolRegistrations
.stream()
.map(transactionMapper::mapEntityToPoolRegistration)
.toList());

transaction.setWithdrawals(withdrawalRepository
.findByTxHash(transaction.getHash())
.stream()
.map(transactionMapper::mapWithdrawalEntityToWithdrawal)
.toList());
transaction.setPoolRetirements(
fetched.poolRetirements
.stream()
.map(transactionMapper::mapEntityToPoolRetirement)
.toList());

ProtocolParams pps = protocolParamService.findProtocolParametersFromIndexer();
transaction.setSize(calcSize(transaction, pps));
}

private static long calcSize(BlockTx tx, ProtocolParams p) {
return (Long.parseLong(tx.getFee()) - p.getMinFeeB().longValue()) / p.getMinFeeA().longValue();
}

private void populateUtxo(Utxo utxo) {
AddressUtxoEntity first = addressUtxoRepository
.findAddressUtxoEntitiesByOutputIndexAndTxHash(utxo.getOutputIndex(), utxo.getTxHash())
.getFirst();
Optional.ofNullable(first).ifPresent(m -> addressUtxoEntityToUtxo.overWriteDto(utxo,m));
private void populateUtxo(Utxo utxo, Map<UtxoKey, AddressUtxoEntity> utxoMap) {
AddressUtxoEntity entity = utxoMap.get(
new UtxoKey(utxo.getTxHash(), utxo.getOutputIndex()));
Optional.ofNullable(entity)
.ifPresent(e -> addressUtxoEntityToUtxo.overWriteDto(utxo, e));
}

private static Map<UtxoKey, AddressUtxoEntity> getUtxoMapFromEntities(TransactionInfo fetched) {
return fetched.utxos
.stream()
.collect(Collectors.toMap(
utxo -> new UtxoKey(utxo.getTxHash(), utxo.getOutputIndex()),
utxo -> utxo
));
}

private record TransactionInfo(List<AddressUtxoEntity> utxos,
List<StakeRegistrationEntity> stakeRegistrations,
List<DelegationEntity> delegations,
List<PoolRegistrationEntity> poolRegistrations,
List<PoolRetirementEntity> poolRetirements,
List<WithdrawalEntity> withdrawals) {

}

private record UtxoKey(String txHash, Integer outputIndex) {

}

}
Loading

0 comments on commit 9a694e0

Please sign in to comment.