Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Rerun conditional migrations #6565

Merged
merged 58 commits into from
Aug 18, 2023
Merged
Show file tree
Hide file tree
Changes from 46 commits
Commits
Show all changes
58 commits
Select commit Hold shift + click to select a range
102dff1
Initial commit to rerun said migrations.
mgoelswirlds Aug 1, 2023
ae2ad08
Changes to fix sonarcloud issues.
mgoelswirlds Aug 1, 2023
901eb32
Changes to fix sonarcloud issues.
mgoelswirlds Aug 1, 2023
bc8af67
Adding common code to repeatable migration.
mgoelswirlds Aug 2, 2023
ae0ac55
Adding test cases and renaming variables.
mgoelswirlds Aug 2, 2023
0388007
Adding code back to respective migration tests.
mgoelswirlds Aug 3, 2023
488f89c
Removing unneeded variables.
mgoelswirlds Aug 3, 2023
30aa94a
Removing extra file.
mgoelswirlds Aug 3, 2023
9580a62
Set items to null.
mgoelswirlds Aug 4, 2023
ea2991a
Update hedera-mirror-importer/src/main/java/com/hedera/mirror/importe…
mgoelswirlds Aug 4, 2023
7f575b9
Update hedera-mirror-importer/src/main/java/com/hedera/mirror/importe…
mgoelswirlds Aug 4, 2023
2c6e69e
Update hedera-mirror-importer/src/main/java/com/hedera/mirror/importe…
mgoelswirlds Aug 4, 2023
8888db3
PR comment changes.TokenAccountBalanceMigration
mgoelswirlds Aug 4, 2023
305bc8b
Undoing the last changes and adding transaction based event listeners…
mgoelswirlds Aug 8, 2023
0653952
Merge branch 'main' into 6397-Time-sensitive-migrations
mgoelswirlds Aug 8, 2023
e35f1ba
Fixing merge conflict.
mgoelswirlds Aug 8, 2023
79feac5
Addressing PR comments.
mgoelswirlds Aug 8, 2023
cd3a698
Changing comments to match correct services version
mgoelswirlds Aug 8, 2023
606334b
Changing the version to match the services version
mgoelswirlds Aug 8, 2023
8806622
Addressing PR comments
mgoelswirlds Aug 8, 2023
392813e
combining ifs
mgoelswirlds Aug 8, 2023
fc5d337
Merge remote-tracking branch 'origin/main' into 6397-Time-sensitive-m…
mgoelswirlds Aug 9, 2023
4f5ccd7
fixing tests
mgoelswirlds Aug 9, 2023
32789d7
Undoing side effects.
mgoelswirlds Aug 9, 2023
20d71a2
Addressing PR comments
mgoelswirlds Aug 9, 2023
1143306
Fixing InitializeEntityBalanceMigration
mgoelswirlds Aug 10, 2023
f11db71
Adding tests to the synthetic migrations.
mgoelswirlds Aug 10, 2023
8a2340a
Addressing PR comments.
mgoelswirlds Aug 10, 2023
c850cb4
removing a file checked in by mistke
mgoelswirlds Aug 10, 2023
dbf7c61
Addressing PR comments
mgoelswirlds Aug 10, 2023
49463bb
Changing synthetic migrations to run based on consensus timestamp and…
mgoelswirlds Aug 13, 2023
8879cb5
Making migration changes to be more deterministic.
mgoelswirlds Aug 14, 2023
22a330e
Merge remote-tracking branch 'origin/main' into 6397-Time-sensitive-m…
mgoelswirlds Aug 14, 2023
98b0d70
missed checkin
mgoelswirlds Aug 14, 2023
3d03c94
Update hedera-mirror-importer/src/main/java/com/hedera/mirror/importe…
mgoelswirlds Aug 14, 2023
1dcd887
PR comment changes
mgoelswirlds Aug 14, 2023
99f8f68
PR comment changes
mgoelswirlds Aug 14, 2023
d701633
Fix test
mgoelswirlds Aug 14, 2023
42e323a
Pulling up code in an abstract class
mgoelswirlds Aug 15, 2023
b164527
Moving the onEnd logic in the common base class.
mgoelswirlds Aug 15, 2023
4e9737a
Merge remote-tracking branch 'origin/main' into 6397-Time-sensitive-m…
mgoelswirlds Aug 15, 2023
63c9f10
Adding the logic to save to the repository before running onENd.
mgoelswirlds Aug 15, 2023
15e19dc
Undoing Account balance file save changes.
mgoelswirlds Aug 15, 2023
ed59c5f
Update README.md
mgoelswirlds Aug 15, 2023
4bd8e6e
Saving the timeOffset for errata migration
mgoelswirlds Aug 15, 2023
e71e85b
Merge branch '6397-Time-sensitive-migrations' of https://github.com/h…
mgoelswirlds Aug 15, 2023
1ac1042
Providing order priority to Migrations.
mgoelswirlds Aug 16, 2023
97a1395
Merge remote-tracking branch 'origin/main' into 6397-Time-sensitive-m…
mgoelswirlds Aug 16, 2023
e127396
Merge remote-tracking branch 'origin/main' into 6397-Time-sensitive-m…
mgoelswirlds Aug 16, 2023
45702ab
Adding coverage.
mgoelswirlds Aug 16, 2023
c4acb56
Refactoring tests to add common setup methods.
mgoelswirlds Aug 17, 2023
180015f
Merge remote-tracking branch 'origin/main' into 6397-Time-sensitive-m…
mgoelswirlds Aug 17, 2023
c393971
Fixing Initialize balance test coverage.
mgoelswirlds Aug 17, 2023
604bd3e
Fixing Initialize balance test coverage.
mgoelswirlds Aug 17, 2023
4e752d9
Adding more coverage.
mgoelswirlds Aug 17, 2023
00815e1
Creating new migration objects in each onEnd test for InitializeEntit…
mgoelswirlds Aug 18, 2023
6e8ee7a
Deleting leftover method.
mgoelswirlds Aug 18, 2023
e7ddc3a
moving migration initialization to before each.
mgoelswirlds Aug 18, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions docs/database/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -230,3 +230,11 @@ Following are the prerequisites and steps for migrating V1 data to V2.
6. Stop the [Importer](/docs/importer/README.md) process.
7. Run the [migration.sh](/hedera-mirror-importer/src/main/resources/db/scripts/v2/migration.sh) script.
8. Update the mirror node configuration to point to the new Citus DB and start it.

## Repeatable migration
steven-sheehy marked this conversation as resolved.
Show resolved Hide resolved
The following migrations would be rerun automatically when the first account balance file is parsed:
* InitializeEntityBalanceMigration
* TokenAccountBalanceMigration

If there is an error during the rerun, or for some reason the migration does not run, it can be manually rerun using the steps provided at:
[Importer-Initialize Entity Balance](/docs/importer/README.md#Initialize Entity Balance)
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import com.hedera.mirror.importer.parser.record.entity.EntityProperties;
import com.hedera.mirror.importer.parser.record.entity.EntityRecordItemListener;
import com.hedera.mirror.importer.reader.ValidatedDataInputStream;
import com.hedera.mirror.importer.repository.AccountBalanceFileRepository;
import com.hedera.mirror.importer.repository.TokenTransferRepository;
import com.hedera.mirror.importer.repository.TransactionRepository;
import com.hederahashgraph.api.proto.java.Transaction;
Expand Down Expand Up @@ -67,6 +68,7 @@ public class ErrataMigration extends RepeatableMigration implements BalanceStrea
@Value("classpath:errata/mainnet/balance-offsets.txt")
private final Resource balanceOffsets;

private final AccountBalanceFileRepository accountBalanceFileRepository;
private final EntityRecordItemListener entityRecordItemListener;
private final EntityProperties entityProperties;
private final NamedParameterJdbcOperations jdbcOperations;
Expand All @@ -81,6 +83,7 @@ public class ErrataMigration extends RepeatableMigration implements BalanceStrea
@SuppressWarnings("java:S107")
public ErrataMigration(
Resource balanceOffsets,
AccountBalanceFileRepository accountBalanceFileRepository,
EntityRecordItemListener entityRecordItemListener,
EntityProperties entityProperties,
NamedParameterJdbcOperations jdbcOperations,
Expand All @@ -91,6 +94,7 @@ public ErrataMigration(
TransactionRepository transactionRepository) {
super(mirrorProperties.getMigration());
this.balanceOffsets = balanceOffsets;
this.accountBalanceFileRepository = accountBalanceFileRepository;
this.entityRecordItemListener = entityRecordItemListener;
this.entityProperties = entityProperties;
this.jdbcOperations = jdbcOperations;
Expand Down Expand Up @@ -122,6 +126,7 @@ public void onEnd(AccountBalanceFile accountBalanceFile) {
if (shouldApplyFixedTimeOffset(consensusTimestamp)) {
accountBalanceFile.setTimeOffset(ACCOUNT_BALANCE_FILE_FIXED_TIME_OFFSET);
}
accountBalanceFileRepository.save(accountBalanceFile);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,17 @@

import com.google.common.base.Stopwatch;
import com.hedera.mirror.importer.MirrorProperties;
import com.hedera.mirror.importer.repository.AccountBalanceFileRepository;
import com.hedera.mirror.importer.repository.RecordFileRepository;
import jakarta.inject.Named;
import org.flywaydb.core.api.MigrationVersion;
import org.springframework.context.annotation.Lazy;
import org.springframework.core.annotation.Order;
import org.springframework.jdbc.core.JdbcOperations;

@Named
public class InitializeEntityBalanceMigration extends RepeatableMigration {
@Order
steven-sheehy marked this conversation as resolved.
Show resolved Hide resolved
public class InitializeEntityBalanceMigration extends TimeSensitiveBalanceMigration {

private static final String INITIALIZE_ENTITY_BALANCE_SQL =
"""
Expand Down Expand Up @@ -66,8 +70,12 @@ on conflict (id) do update
private final JdbcOperations jdbcOperations;

@Lazy
public InitializeEntityBalanceMigration(JdbcOperations jdbcOperations, MirrorProperties mirrorProperties) {
super(mirrorProperties.getMigration());
public InitializeEntityBalanceMigration(
JdbcOperations jdbcOperations,
MirrorProperties mirrorProperties,
AccountBalanceFileRepository accountBalanceFileRepository,
RecordFileRepository recordFileRepository) {
super(mirrorProperties.getMigration(), accountBalanceFileRepository, recordFileRepository);
this.jdbcOperations = jdbcOperations;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,32 +17,43 @@
package com.hedera.mirror.importer.migration;

import com.hedera.mirror.common.domain.entity.EntityId;
import com.hedera.mirror.common.domain.transaction.RecordFile;
import com.hedera.mirror.importer.MirrorProperties;
import com.hedera.mirror.importer.db.DBProperties;
import com.hedera.mirror.importer.exception.ImporterException;
import com.hedera.mirror.importer.parser.record.RecordStreamFileListener;
import com.hedera.mirror.importer.repository.RecordFileRepository;
import com.hederahashgraph.api.proto.java.Key;
import jakarta.inject.Named;
import java.io.IOException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicBoolean;
import lombok.Data;
import lombok.Getter;
import org.flywaydb.core.api.MigrationVersion;
import org.springframework.context.annotation.Lazy;
import org.springframework.data.util.Version;
import org.springframework.jdbc.core.DataClassRowMapper;
import org.springframework.jdbc.core.namedparam.NamedParameterJdbcTemplate;
import org.springframework.transaction.support.TransactionOperations;

@Named
public class SyntheticCryptoTransferApprovalMigration extends AsyncJavaMigration<Long> {
public class SyntheticCryptoTransferApprovalMigration extends AsyncJavaMigration<Long>
implements RecordStreamFileListener {

static final Version HAPI_VERSION_0_38_0 = new Version(0, 38, 0);
private final AtomicBoolean executed = new AtomicBoolean(false);
// The contract id of the first synthetic transfer that could have exhibited this problem
private static final long GRANDFATHERED_ID = 2119900L;
// The created timestamp of the grandfathered id contract
static final long LOWER_BOUND_TIMESTAMP = 1680284879342064922L;
// This problem was fixed by services release 0.38.10, this is last timestamp before that release
// This problem was fixed by services release 0.38.6,protobuf release 0.38.10, this is last timestamp before that
// release
static final long UPPER_BOUND_TIMESTAMP = 1686243920981874002L;
private static final long TIMESTAMP_INCREMENT =
Duration.ofDays(1).toNanos(); // 1 day in nanoseconds which will yield 69 async iterations
Expand Down Expand Up @@ -141,6 +152,30 @@
update token_transfer set is_approval = true where consensus_timestamp = :consensus_timestamp and token_id = :token_id and account_id = :sender
""";

private final RecordFileRepository recordFileRepository;

@Override
public void onEnd(RecordFile streamFile) throws ImporterException {
if (streamFile == null) {
return;
}

xin-hedera marked this conversation as resolved.
Show resolved Hide resolved
// The services version 0.38.0 has the fixes this migration solves.
try {
if (streamFile.getHapiVersion().isGreaterThanOrEqualTo(HAPI_VERSION_0_38_0)
&& executed.compareAndSet(false, true)) {
var latestFile = recordFileRepository.findLatestBefore(streamFile.getConsensusStart());
if (latestFile
.filter(f -> f.getHapiVersion().isLessThan(HAPI_VERSION_0_38_0))
.isPresent()) {
doMigrate();
}
}
} catch (IOException e) {
log.error("Error executing the migration again after consensus_timestamp {}", streamFile.getConsensusEnd());

Check warning on line 175 in hedera-mirror-importer/src/main/java/com/hedera/mirror/importer/migration/SyntheticCryptoTransferApprovalMigration.java

View check run for this annotation

Codecov / codecov/patch

hedera-mirror-importer/src/main/java/com/hedera/mirror/importer/migration/SyntheticCryptoTransferApprovalMigration.java#L174-L175

Added lines #L174 - L175 were not covered by tests
}
}

private enum TRANSFER_TYPE {
CRYPTO_TRANSFER,
NFT_TRANSFER,
Expand All @@ -159,10 +194,12 @@
@Lazy
public SyntheticCryptoTransferApprovalMigration(
DBProperties dbProperties,
RecordFileRepository recordFileRepository,
MirrorProperties mirrorProperties,
NamedParameterJdbcTemplate transferJdbcTemplate,
TransactionOperations transactionOperations) {
super(mirrorProperties.getMigration(), transferJdbcTemplate, dbProperties.getSchema());
this.recordFileRepository = recordFileRepository;
this.mirrorProperties = mirrorProperties;
this.transferJdbcTemplate = transferJdbcTemplate;
this.transactionOperations = transactionOperations;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,25 @@
package com.hedera.mirror.importer.migration;

import com.google.common.base.Stopwatch;
import com.hedera.mirror.common.domain.transaction.RecordFile;
import com.hedera.mirror.importer.MirrorProperties;
import com.hedera.mirror.importer.config.Owner;
import com.hedera.mirror.importer.exception.ImporterException;
import com.hedera.mirror.importer.parser.record.RecordStreamFileListener;
import com.hedera.mirror.importer.repository.RecordFileRepository;
import jakarta.inject.Named;
import java.util.concurrent.atomic.AtomicBoolean;
import org.flywaydb.core.api.MigrationVersion;
import org.springframework.context.annotation.Lazy;
import org.springframework.data.util.Version;
import org.springframework.jdbc.core.JdbcTemplate;

@Named
public class SyntheticNftAllowanceOwnerMigration extends RepeatableMigration {
public class SyntheticNftAllowanceOwnerMigration extends RepeatableMigration implements RecordStreamFileListener {

static final Version HAPI_VERSION_0_37_0 = new Version(0, 37, 0);
private final AtomicBoolean executed = new AtomicBoolean(false);
private final RecordFileRepository recordFileRepository;

private static final String UPDATE_NFT_ALLOWANCE_OWNER_SQL =
"""
Expand Down Expand Up @@ -107,9 +117,13 @@ insert into nft_allowance (approved_for_all, owner, payer_account_id, spender, t
private final JdbcTemplate jdbcTemplate;

@Lazy
public SyntheticNftAllowanceOwnerMigration(@Owner JdbcTemplate jdbcTemplate, MirrorProperties mirrorProperties) {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This migration took 779 ms on mainnet.So not converting it to Async.

public SyntheticNftAllowanceOwnerMigration(
@Owner JdbcTemplate jdbcTemplate,
MirrorProperties mirrorProperties,
RecordFileRepository recordFileRepository) {
super(mirrorProperties.getMigration());
this.jdbcTemplate = jdbcTemplate;
this.recordFileRepository = recordFileRepository;
}

@Override
Expand All @@ -129,4 +143,22 @@ protected void doMigrate() {
jdbcTemplate.execute(UPDATE_NFT_ALLOWANCE_OWNER_SQL);
log.info("Updated nft allowance owners in {}", stopwatch);
}

@Override
public void onEnd(RecordFile streamFile) throws ImporterException {
if (streamFile == null) {
return;
}

// The services version 0.37.0 has the fixes this migration solves.
if (streamFile.getHapiVersion().isGreaterThanOrEqualTo(HAPI_VERSION_0_37_0)
&& executed.compareAndSet(false, true)) {
var latestFile = recordFileRepository.findLatestBefore(streamFile.getConsensusStart());
if (latestFile
.filter(f -> f.getHapiVersion().isLessThan(HAPI_VERSION_0_37_0))
.isPresent()) {
doMigrate();
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,25 @@
package com.hedera.mirror.importer.migration;

import com.google.common.base.Stopwatch;
import com.hedera.mirror.common.domain.transaction.RecordFile;
import com.hedera.mirror.importer.MirrorProperties;
import com.hedera.mirror.importer.config.Owner;
import com.hedera.mirror.importer.exception.ImporterException;
import com.hedera.mirror.importer.parser.record.RecordStreamFileListener;
import com.hedera.mirror.importer.repository.RecordFileRepository;
import jakarta.inject.Named;
import java.util.concurrent.atomic.AtomicBoolean;
import org.flywaydb.core.api.MigrationVersion;
import org.springframework.context.annotation.Lazy;
import org.springframework.data.util.Version;
import org.springframework.jdbc.core.JdbcTemplate;

@Named
public class SyntheticTokenAllowanceOwnerMigration extends RepeatableMigration {
public class SyntheticTokenAllowanceOwnerMigration extends RepeatableMigration implements RecordStreamFileListener {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This migration took 10158 ms on mainnet.So not converting it to Async.

Copy link
Collaborator

@xin-hedera xin-hedera Aug 4, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I doubt it can work in environment where the owner and regular user are different, e.g., mirror_node and mirror_importer.

The migration uses the owner jdbcTemplate bean because it deletes rows. in some environment, mirror_importer may not be able to delete rows.

When the migration runs inside onEnd, it's in the same database transaction, and the connection in the thread local storage uses regular user mirror_importer. The migration most likely uses the same connection, and may fail to delete rows.

Copy link
Collaborator

@xin-hedera xin-hedera Aug 9, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

did you check if running it inside onEnd would actually pick a connection with different credentials other than owner?

this is a blocker for both SyntheticNftAllowanceOwnerMigration and SyntheticTokenAllowanceOwnerMigration

No I did not. Checking now.

Copy link
Member Author

@mgoelswirlds mgoelswirlds Aug 9, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No I did not. Will need to check this. looking now.
Even if it is a different user i.e. mirror_importer, I need to verify that it can delete the rows right?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we can discuss. running with mirror_importer may not break ourselves but can break other mirrornode operators in case in their environment mirror_importer doesn't have permission to delete rows.

then we may have to turn these two migrations to be async.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok. Lets discuss further offline.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I verified that even when migrate is called from with the onEnd it uses the mirror_node user.


static final Version HAPI_VERSION_0_37_0 = new Version(0, 37, 0);
private final AtomicBoolean executed = new AtomicBoolean(false);
private final RecordFileRepository recordFileRepository;

private static final String UPDATE_TOKEN_ALLOWANCE_OWNER_SQL =
"""
Expand Down Expand Up @@ -110,9 +120,13 @@ insert into token_allowance (amount, amount_granted, owner, payer_account_id, sp
private final JdbcTemplate jdbcTemplate;

@Lazy
public SyntheticTokenAllowanceOwnerMigration(@Owner JdbcTemplate jdbcTemplate, MirrorProperties mirrorProperties) {
public SyntheticTokenAllowanceOwnerMigration(
@Owner JdbcTemplate jdbcTemplate,
MirrorProperties mirrorProperties,
RecordFileRepository recordFileRepository) {
super(mirrorProperties.getMigration());
this.jdbcTemplate = jdbcTemplate;
this.recordFileRepository = recordFileRepository;
}

@Override
Expand All @@ -132,4 +146,22 @@ protected void doMigrate() {
jdbcTemplate.execute(UPDATE_TOKEN_ALLOWANCE_OWNER_SQL);
log.info("Updated token allowance owners in {}", stopwatch);
}

@Override
public void onEnd(RecordFile streamFile) throws ImporterException {
if (streamFile == null) {
return;
}

// The services version 0.38.0 has the fixes this migration solves.
if (streamFile.getHapiVersion().isGreaterThanOrEqualTo(HAPI_VERSION_0_37_0)
&& executed.compareAndSet(false, true)) {
var latestFile = recordFileRepository.findLatestBefore(streamFile.getConsensusStart());
if (latestFile
.filter(f -> f.getHapiVersion().isLessThan(HAPI_VERSION_0_37_0))
.isPresent()) {
doMigrate();
}
}
}
}
steven-sheehy marked this conversation as resolved.
Show resolved Hide resolved
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
/*
* Copyright (C) 2023 Hedera Hashgraph, LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.hedera.mirror.importer.migration;

import com.hedera.mirror.common.domain.balance.AccountBalanceFile;
import com.hedera.mirror.common.domain.transaction.RecordFile;
import com.hedera.mirror.importer.exception.ImporterException;
import com.hedera.mirror.importer.parser.balance.BalanceStreamFileListener;
import com.hedera.mirror.importer.repository.AccountBalanceFileRepository;
import com.hedera.mirror.importer.repository.RecordFileRepository;
import java.io.IOException;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import org.springframework.transaction.support.TransactionSynchronization;
import org.springframework.transaction.support.TransactionSynchronizationManager;

abstract class TimeSensitiveBalanceMigration extends RepeatableMigration
implements BalanceStreamFileListener, TransactionSynchronization {

private final AccountBalanceFileRepository accountBalanceFileRepository;
private final RecordFileRepository recordFileRepository;
private final AtomicLong firstConsensusTimestamp = new AtomicLong(0L);
steven-sheehy marked this conversation as resolved.
Show resolved Hide resolved
private final AtomicBoolean succeeded = new AtomicBoolean(false);
steven-sheehy marked this conversation as resolved.
Show resolved Hide resolved

protected TimeSensitiveBalanceMigration(
Map<String, MigrationProperties> migrationPropertiesMap,
AccountBalanceFileRepository accountBalanceFileRepository,
RecordFileRepository recordFileRepository) {
super(migrationPropertiesMap);
this.accountBalanceFileRepository = accountBalanceFileRepository;
this.recordFileRepository = recordFileRepository;
}

@Override
public void onEnd(AccountBalanceFile accountBalanceFile) throws ImporterException {
try {
if (firstConsensusTimestamp.get() == -1 || succeeded.get()) {
steven-sheehy marked this conversation as resolved.
Show resolved Hide resolved
return;

Check warning on line 53 in hedera-mirror-importer/src/main/java/com/hedera/mirror/importer/migration/TimeSensitiveBalanceMigration.java

View check run for this annotation

Codecov / codecov/patch

hedera-mirror-importer/src/main/java/com/hedera/mirror/importer/migration/TimeSensitiveBalanceMigration.java#L53

Added line #L53 was not covered by tests
}

if (firstConsensusTimestamp.get() == 0) {
// Check if this is the first account balance file after importer startup
steven-sheehy marked this conversation as resolved.
Show resolved Hide resolved
// Set current file timestamp to firstConsensusTimestamp.
if (accountBalanceFileRepository
.findLatestBefore(accountBalanceFile.getConsensusTimestamp())
.isEmpty()) {
firstConsensusTimestamp.set(accountBalanceFile.getConsensusTimestamp());
} else {
// Set firstConsensusTimestamp to -1 to add an early return in case of existing account balance
// files.
firstConsensusTimestamp.set(-1);
return;

Check warning on line 67 in hedera-mirror-importer/src/main/java/com/hedera/mirror/importer/migration/TimeSensitiveBalanceMigration.java

View check run for this annotation

Codecov / codecov/patch

hedera-mirror-importer/src/main/java/com/hedera/mirror/importer/migration/TimeSensitiveBalanceMigration.java#L66-L67

Added lines #L66 - L67 were not covered by tests
}
}

// Check if at-least one recordFile after the account balance file has been parsed,the migration will then
// update rows
if (recordFileRepository
.findLatest()
.map(RecordFile::getConsensusEnd)
.filter(timestamp -> timestamp >= firstConsensusTimestamp.get())
.isPresent()) {
TransactionSynchronizationManager.registerSynchronization(this);
doMigrate();
}
} catch (IOException e) {
log.error(

Check warning on line 82 in hedera-mirror-importer/src/main/java/com/hedera/mirror/importer/migration/TimeSensitiveBalanceMigration.java

View check run for this annotation

Codecov / codecov/patch

hedera-mirror-importer/src/main/java/com/hedera/mirror/importer/migration/TimeSensitiveBalanceMigration.java#L81-L82

Added lines #L81 - L82 were not covered by tests
"Error executing the migration again after consensus_timestamp {}",
accountBalanceFile.getConsensusTimestamp());

Check warning on line 84 in hedera-mirror-importer/src/main/java/com/hedera/mirror/importer/migration/TimeSensitiveBalanceMigration.java

View check run for this annotation

Codecov / codecov/patch

hedera-mirror-importer/src/main/java/com/hedera/mirror/importer/migration/TimeSensitiveBalanceMigration.java#L84

Added line #L84 was not covered by tests
}
}

@Override
public void afterCommit() {
succeeded.set(true);
}

Check warning on line 91 in hedera-mirror-importer/src/main/java/com/hedera/mirror/importer/migration/TimeSensitiveBalanceMigration.java

View check run for this annotation

Codecov / codecov/patch

hedera-mirror-importer/src/main/java/com/hedera/mirror/importer/migration/TimeSensitiveBalanceMigration.java#L90-L91

Added lines #L90 - L91 were not covered by tests
}
Loading