Skip to content

Commit

Permalink
Rerun conditional migrations (#6565)
Browse files Browse the repository at this point in the history
Description:
This PR is created to address re-running certain time sensitive migrations, for newer and partial mirror nodes.

This PR modifies:
- SyntheticCryptoTransferApprovalMigration to rerun when the HAPI version changes to 0.38.10 or above.
- SyntheticNftAllowanceOwnerMigration and SyntheticTokenAllowanceOwnerMigration to rerun when the HAPI version changes to 0.37.0 or above.
- InitializeEntityBalanceMigration and TokenAccountBalanceMigration to rerun, after the first account balance file has been parsed.

Related issue(s):
Fixes #6397
  • Loading branch information
mgoelswirlds authored and bilyana-gospodinova committed Aug 22, 2023
1 parent d23fc62 commit 73e0fa8
Show file tree
Hide file tree
Showing 21 changed files with 1,221 additions and 317 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import com.hedera.mirror.importer.parser.record.entity.EntityProperties;
import com.hedera.mirror.importer.parser.record.entity.EntityRecordItemListener;
import com.hedera.mirror.importer.reader.ValidatedDataInputStream;
import com.hedera.mirror.importer.repository.AccountBalanceFileRepository;
import com.hedera.mirror.importer.repository.TokenTransferRepository;
import com.hedera.mirror.importer.repository.TransactionRepository;
import com.hederahashgraph.api.proto.java.Transaction;
Expand All @@ -46,6 +47,7 @@
import lombok.SneakyThrows;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Lazy;
import org.springframework.core.annotation.Order;
import org.springframework.core.io.Resource;
import org.springframework.core.io.support.PathMatchingResourcePatternResolver;
import org.springframework.jdbc.core.namedparam.MapSqlParameterSource;
Expand All @@ -57,6 +59,7 @@
* docs/database/README.md#errata for more detail.
*/
@Named
@Order(2)
public class ErrataMigration extends RepeatableMigration implements BalanceStreamFileListener {

private static final int ACCOUNT_BALANCE_FILE_FIXED_TIME_OFFSET = 53;
Expand All @@ -67,6 +70,7 @@ public class ErrataMigration extends RepeatableMigration implements BalanceStrea
@Value("classpath:errata/mainnet/balance-offsets.txt")
private final Resource balanceOffsets;

private final AccountBalanceFileRepository accountBalanceFileRepository;
private final EntityRecordItemListener entityRecordItemListener;
private final EntityProperties entityProperties;
private final NamedParameterJdbcOperations jdbcOperations;
Expand All @@ -81,6 +85,7 @@ public class ErrataMigration extends RepeatableMigration implements BalanceStrea
@SuppressWarnings("java:S107")
public ErrataMigration(
Resource balanceOffsets,
AccountBalanceFileRepository accountBalanceFileRepository,
EntityRecordItemListener entityRecordItemListener,
EntityProperties entityProperties,
NamedParameterJdbcOperations jdbcOperations,
Expand All @@ -91,6 +96,7 @@ public ErrataMigration(
TransactionRepository transactionRepository) {
super(mirrorProperties.getMigration());
this.balanceOffsets = balanceOffsets;
this.accountBalanceFileRepository = accountBalanceFileRepository;
this.entityRecordItemListener = entityRecordItemListener;
this.entityProperties = entityProperties;
this.jdbcOperations = jdbcOperations;
Expand Down Expand Up @@ -122,6 +128,7 @@ public void onEnd(AccountBalanceFile accountBalanceFile) {
if (shouldApplyFixedTimeOffset(consensusTimestamp)) {
accountBalanceFile.setTimeOffset(ACCOUNT_BALANCE_FILE_FIXED_TIME_OFFSET);
}
accountBalanceFileRepository.save(accountBalanceFile);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,15 @@

import com.google.common.base.Stopwatch;
import com.hedera.mirror.importer.MirrorProperties;
import com.hedera.mirror.importer.repository.AccountBalanceFileRepository;
import com.hedera.mirror.importer.repository.RecordFileRepository;
import jakarta.inject.Named;
import org.flywaydb.core.api.MigrationVersion;
import org.springframework.context.annotation.Lazy;
import org.springframework.jdbc.core.JdbcOperations;

@Named
public class InitializeEntityBalanceMigration extends RepeatableMigration {
public class InitializeEntityBalanceMigration extends TimeSensitiveBalanceMigration {

private static final String INITIALIZE_ENTITY_BALANCE_SQL =
"""
Expand Down Expand Up @@ -66,8 +68,12 @@ on conflict (id) do update
private final JdbcOperations jdbcOperations;

@Lazy
public InitializeEntityBalanceMigration(JdbcOperations jdbcOperations, MirrorProperties mirrorProperties) {
super(mirrorProperties.getMigration());
public InitializeEntityBalanceMigration(
JdbcOperations jdbcOperations,
MirrorProperties mirrorProperties,
AccountBalanceFileRepository accountBalanceFileRepository,
RecordFileRepository recordFileRepository) {
super(mirrorProperties.getMigration(), accountBalanceFileRepository, recordFileRepository);
this.jdbcOperations = jdbcOperations;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,32 +17,43 @@
package com.hedera.mirror.importer.migration;

import com.hedera.mirror.common.domain.entity.EntityId;
import com.hedera.mirror.common.domain.transaction.RecordFile;
import com.hedera.mirror.importer.MirrorProperties;
import com.hedera.mirror.importer.db.DBProperties;
import com.hedera.mirror.importer.exception.ImporterException;
import com.hedera.mirror.importer.parser.record.RecordStreamFileListener;
import com.hedera.mirror.importer.repository.RecordFileRepository;
import com.hederahashgraph.api.proto.java.Key;
import jakarta.inject.Named;
import java.io.IOException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicBoolean;
import lombok.Data;
import lombok.Getter;
import org.flywaydb.core.api.MigrationVersion;
import org.springframework.context.annotation.Lazy;
import org.springframework.data.util.Version;
import org.springframework.jdbc.core.DataClassRowMapper;
import org.springframework.jdbc.core.namedparam.NamedParameterJdbcTemplate;
import org.springframework.transaction.support.TransactionOperations;

@Named
public class SyntheticCryptoTransferApprovalMigration extends AsyncJavaMigration<Long> {
public class SyntheticCryptoTransferApprovalMigration extends AsyncJavaMigration<Long>
implements RecordStreamFileListener {

static final Version HAPI_VERSION_0_38_0 = new Version(0, 38, 0);
private final AtomicBoolean executed = new AtomicBoolean(false);
// The contract id of the first synthetic transfer that could have exhibited this problem
private static final long GRANDFATHERED_ID = 2119900L;
// The created timestamp of the grandfathered id contract
static final long LOWER_BOUND_TIMESTAMP = 1680284879342064922L;
// This problem was fixed by services release 0.38.10, this is last timestamp before that release
// This problem was fixed by services release 0.38.6,protobuf release 0.38.10, this is last timestamp before that
// release
static final long UPPER_BOUND_TIMESTAMP = 1686243920981874002L;
private static final long TIMESTAMP_INCREMENT =
Duration.ofDays(1).toNanos(); // 1 day in nanoseconds which will yield 69 async iterations
Expand Down Expand Up @@ -141,6 +152,30 @@ with contractresults as (
update token_transfer set is_approval = true where consensus_timestamp = :consensus_timestamp and token_id = :token_id and account_id = :sender
""";

private final RecordFileRepository recordFileRepository;

@Override
public void onEnd(RecordFile streamFile) throws ImporterException {
if (streamFile == null) {
return;
}

// The services version 0.38.0 has the fixes this migration solves.
try {
if (streamFile.getHapiVersion().isGreaterThanOrEqualTo(HAPI_VERSION_0_38_0)
&& executed.compareAndSet(false, true)) {
var latestFile = recordFileRepository.findLatestBefore(streamFile.getConsensusStart());
if (latestFile
.filter(f -> f.getHapiVersion().isLessThan(HAPI_VERSION_0_38_0))
.isPresent()) {
doMigrate();
}
}
} catch (IOException e) {
log.error("Error executing the migration again after consensus_timestamp {}", streamFile.getConsensusEnd());
}
}

private enum TRANSFER_TYPE {
CRYPTO_TRANSFER,
NFT_TRANSFER,
Expand All @@ -159,10 +194,12 @@ private enum TRANSFER_TYPE {
@Lazy
public SyntheticCryptoTransferApprovalMigration(
DBProperties dbProperties,
RecordFileRepository recordFileRepository,
MirrorProperties mirrorProperties,
NamedParameterJdbcTemplate transferJdbcTemplate,
TransactionOperations transactionOperations) {
super(mirrorProperties.getMigration(), transferJdbcTemplate, dbProperties.getSchema());
this.recordFileRepository = recordFileRepository;
this.mirrorProperties = mirrorProperties;
this.transferJdbcTemplate = transferJdbcTemplate;
this.transactionOperations = transactionOperations;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,25 @@
package com.hedera.mirror.importer.migration;

import com.google.common.base.Stopwatch;
import com.hedera.mirror.common.domain.transaction.RecordFile;
import com.hedera.mirror.importer.MirrorProperties;
import com.hedera.mirror.importer.config.Owner;
import com.hedera.mirror.importer.exception.ImporterException;
import com.hedera.mirror.importer.parser.record.RecordStreamFileListener;
import com.hedera.mirror.importer.repository.RecordFileRepository;
import jakarta.inject.Named;
import java.util.concurrent.atomic.AtomicBoolean;
import org.flywaydb.core.api.MigrationVersion;
import org.springframework.context.annotation.Lazy;
import org.springframework.data.util.Version;
import org.springframework.jdbc.core.JdbcTemplate;

@Named
public class SyntheticNftAllowanceOwnerMigration extends RepeatableMigration {
public class SyntheticNftAllowanceOwnerMigration extends RepeatableMigration implements RecordStreamFileListener {

static final Version HAPI_VERSION_0_37_0 = new Version(0, 37, 0);
private final AtomicBoolean executed = new AtomicBoolean(false);
private final RecordFileRepository recordFileRepository;

private static final String UPDATE_NFT_ALLOWANCE_OWNER_SQL =
"""
Expand Down Expand Up @@ -107,9 +117,13 @@ insert into nft_allowance (approved_for_all, owner, payer_account_id, spender, t
private final JdbcTemplate jdbcTemplate;

@Lazy
public SyntheticNftAllowanceOwnerMigration(@Owner JdbcTemplate jdbcTemplate, MirrorProperties mirrorProperties) {
public SyntheticNftAllowanceOwnerMigration(
@Owner JdbcTemplate jdbcTemplate,
MirrorProperties mirrorProperties,
RecordFileRepository recordFileRepository) {
super(mirrorProperties.getMigration());
this.jdbcTemplate = jdbcTemplate;
this.recordFileRepository = recordFileRepository;
}

@Override
Expand All @@ -129,4 +143,22 @@ protected void doMigrate() {
jdbcTemplate.execute(UPDATE_NFT_ALLOWANCE_OWNER_SQL);
log.info("Updated nft allowance owners in {}", stopwatch);
}

@Override
public void onEnd(RecordFile streamFile) throws ImporterException {
if (streamFile == null) {
return;
}

// The services version 0.37.0 has the fixes this migration solves.
if (streamFile.getHapiVersion().isGreaterThanOrEqualTo(HAPI_VERSION_0_37_0)
&& executed.compareAndSet(false, true)) {
var latestFile = recordFileRepository.findLatestBefore(streamFile.getConsensusStart());
if (latestFile
.filter(f -> f.getHapiVersion().isLessThan(HAPI_VERSION_0_37_0))
.isPresent()) {
doMigrate();
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,25 @@
package com.hedera.mirror.importer.migration;

import com.google.common.base.Stopwatch;
import com.hedera.mirror.common.domain.transaction.RecordFile;
import com.hedera.mirror.importer.MirrorProperties;
import com.hedera.mirror.importer.config.Owner;
import com.hedera.mirror.importer.exception.ImporterException;
import com.hedera.mirror.importer.parser.record.RecordStreamFileListener;
import com.hedera.mirror.importer.repository.RecordFileRepository;
import jakarta.inject.Named;
import java.util.concurrent.atomic.AtomicBoolean;
import org.flywaydb.core.api.MigrationVersion;
import org.springframework.context.annotation.Lazy;
import org.springframework.data.util.Version;
import org.springframework.jdbc.core.JdbcTemplate;

@Named
public class SyntheticTokenAllowanceOwnerMigration extends RepeatableMigration {
public class SyntheticTokenAllowanceOwnerMigration extends RepeatableMigration implements RecordStreamFileListener {

static final Version HAPI_VERSION_0_37_0 = new Version(0, 37, 0);
private final AtomicBoolean executed = new AtomicBoolean(false);
private final RecordFileRepository recordFileRepository;

private static final String UPDATE_TOKEN_ALLOWANCE_OWNER_SQL =
"""
Expand Down Expand Up @@ -110,9 +120,13 @@ insert into token_allowance (amount, amount_granted, owner, payer_account_id, sp
private final JdbcTemplate jdbcTemplate;

@Lazy
public SyntheticTokenAllowanceOwnerMigration(@Owner JdbcTemplate jdbcTemplate, MirrorProperties mirrorProperties) {
public SyntheticTokenAllowanceOwnerMigration(
@Owner JdbcTemplate jdbcTemplate,
MirrorProperties mirrorProperties,
RecordFileRepository recordFileRepository) {
super(mirrorProperties.getMigration());
this.jdbcTemplate = jdbcTemplate;
this.recordFileRepository = recordFileRepository;
}

@Override
Expand All @@ -132,4 +146,22 @@ protected void doMigrate() {
jdbcTemplate.execute(UPDATE_TOKEN_ALLOWANCE_OWNER_SQL);
log.info("Updated token allowance owners in {}", stopwatch);
}

@Override
public void onEnd(RecordFile streamFile) throws ImporterException {
if (streamFile == null) {
return;
}

// The services version 0.38.0 has the fixes this migration solves.
if (streamFile.getHapiVersion().isGreaterThanOrEqualTo(HAPI_VERSION_0_37_0)
&& executed.compareAndSet(false, true)) {
var latestFile = recordFileRepository.findLatestBefore(streamFile.getConsensusStart());
if (latestFile
.filter(f -> f.getHapiVersion().isLessThan(HAPI_VERSION_0_37_0))
.isPresent()) {
doMigrate();
}
}
}
}
Loading

0 comments on commit 73e0fa8

Please sign in to comment.