Skip to content

Commit

Permalink
* implement retry
Browse files Browse the repository at this point in the history
* add logic to skip partition creation if not needed

Signed-off-by: Jesse Nelson <jesse@swirldslabs.com>
  • Loading branch information
jnels124 committed Aug 10, 2023
1 parent f442d2c commit c20cea0
Show file tree
Hide file tree
Showing 5 changed files with 186 additions and 111 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
/*
* Copyright (C) 2023 Hedera Hashgraph, LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.hedera.mirror.importer.db;

import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;

@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
class PartitionInfo {
private String parentTable;
private String partitionColumn;
private long nextFrom;
private long nextTo;
private long maxEntityId;
private boolean timePartition;
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,128 +16,26 @@

package com.hedera.mirror.importer.db;

import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.util.List;
import lombok.*;
import org.apache.commons.lang3.BooleanUtils;
import org.springframework.boot.context.event.ApplicationPreparedEvent;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Profile;
import org.springframework.context.event.EventListener;
import org.springframework.dao.DataAccessException;
import org.springframework.jdbc.core.DataClassRowMapper;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.retry.annotation.Retryable;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import org.springframework.transaction.support.TransactionTemplate;

@CustomLog
@Component
@RequiredArgsConstructor
@Profile("v2")
public class PartitionMaintenance {
private static final String TIMESTAMP_PARTITION_COLUMN_NAME_PATTERN = "^(.*_timestamp|consensus_end)$";
private static final DateTimeFormatter CITUS_TIME_PARTITION_NAME_FORMATTER =
DateTimeFormatter.ofPattern("'_p'yyyy_MM_dd_HHmmss").withZone(ZoneId.of("UTC"));
private final PartitionMaintenanceService service;

// todo can probably drop these cases statements. One becomes boolean comparrison and the other can be changed to
// use same calculation
private static final String TABLE_INFO_QUERY = "SELECT "
+ " tp.parent_table, "
+ " tp.partition_column, "
+ " max(tp.to_value::bigint) as next_from, "
+ " case when tp.partition_column::varchar ~ '"
+ TIMESTAMP_PARTITION_COLUMN_NAME_PATTERN + "'"
+ " then extract(epoch from (to_timestamp(max(tp.to_value::bigint / 1000000000)) + (?::interval))) * 1000000000 "
+ " else 2 * max(tp.to_value::bigint) - max(tp.from_value::bigint) "
+ " end as next_to, "
+ " case when tp.partition_column::varchar ~ '"
+ TIMESTAMP_PARTITION_COLUMN_NAME_PATTERN + "'" + " then true "
+ " else false "
+ " end as time_partition "
+ " from time_partitions tp group by tp.parent_table, tp.partition_column";

private static final String CREATE_TIME_PARTITIONS_SQL = "select create_time_partitions(table_name := ?,"
+ " partition_interval := ?::interval,"
+ " start_from := ?,"
+ " end_at := ?)";

private final JdbcTemplate jdbcTemplate;
private final TransactionTemplate transactionTemplate;
private final PartitionMaintenanceConfiguration maintenanceConfig;

@EventListener(ApplicationPreparedEvent.class)
public void test(ApplicationPreparedEvent event) throws Exception {
Thread.sleep(90L * 1000);
}

// TODO:// configure ...
// @Scheduled(cron = "${hedera.mirror.importer.db.maintenance.cron:0 0 0 1 * *}")
@Scheduled(initialDelay = 0, fixedRate = 100000000000L)
// TODO:// retry logic
@Retryable(retryFor = DataAccessException.class)
@Scheduled(cron = "${hedera.mirror.importer.db.maintenance.cron:0 0 0 1 * ?}")
public void runMaintenance() {

List<PartitionInfo> newPartitions = jdbcTemplate.query(
TABLE_INFO_QUERY,
new DataClassRowMapper<>(PartitionInfo.class),
maintenanceConfig.getTimePartitionInterval());

newPartitions.forEach(table -> transactionTemplate.executeWithoutResult(status -> {
LocalDateTime start = Instant.ofEpochSecond(0L, table.getNextFrom())
.atZone(ZoneId.systemDefault())
.toLocalDateTime();
LocalDateTime end = Instant.ofEpochSecond(0L, table.getNextTo())
.atZone(ZoneId.systemDefault())
.toLocalDateTime();

if (!table.isTimePartition()
&& table.getMaxEntityId() * maintenanceConfig.maxEntityIdRatio < table.getNextFrom()) {
log.info("No new partition needed. Skipping creation for partition {}", table);
return;
}
String interval = table.isTimePartition()
? maintenanceConfig.timePartitionInterval
: maintenanceConfig.idPartitionInterval;
Boolean created = jdbcTemplate.queryForObject(
CREATE_TIME_PARTITIONS_SQL, Boolean.class, table.parentTable, interval, start, end);

if (BooleanUtils.isTrue(created) && !table.isTimePartition()) {
// Work around granularity issue of citus table names
String createdPartitionName =
table.getParentTable() + CITUS_TIME_PARTITION_NAME_FORMATTER.format(start);
long partitionDuration = table.nextTo - table.nextFrom;
long partitionCount = table.nextFrom / partitionDuration;
String newPartitionName = String.format("%s_p%d", table.parentTable, partitionCount);
jdbcTemplate.execute("ALTER table " + createdPartitionName + " rename to " + newPartitionName);
log.info("Renamed {} to {}", createdPartitionName, newPartitionName);
service.getNextPartitions().forEach(partitionInfo -> {
try {
service.createPartition(partitionInfo);
} catch (Exception e) {
log.error("Unable to create partition {}", partitionInfo, e);
}
log.info("Partition {} created {}", table, created);
}));
}

@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
private static class PartitionInfo {
private String parentTable;
private String partitionColumn;
private long nextFrom;
private long nextTo;
private long maxEntityId;
private boolean timePartition;
}

@ConfigurationProperties("hedera.mirror.importer.db.maintenance")
@Data
private static class PartitionMaintenanceConfiguration {
private String timePartitionInterval = "1 month";
private String idPartitionInterval = ".001 seconds";
private double maxEntityIdRatio = 2.0;
});
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
/*
* Copyright (C) 2023 Hedera Hashgraph, LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.hedera.mirror.importer.db;

import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;

@ConfigurationProperties("hedera.mirror.importer.db.maintenance")
@Data
class PartitionMaintenanceConfiguration {
private String timePartitionInterval = "1 month";
private String idPartitionInterval = ".001 seconds";
private double maxEntityIdRatio = 2.0;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
/*
* Copyright (C) 2023 Hedera Hashgraph, LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.hedera.mirror.importer.db;

import static jakarta.transaction.Transactional.TxType.REQUIRES_NEW;

import jakarta.inject.Named;
import jakarta.transaction.Transactional;
import java.time.*;
import java.time.format.DateTimeFormatter;
import java.util.List;
import lombok.CustomLog;
import lombok.RequiredArgsConstructor;
import org.apache.commons.lang3.BooleanUtils;
import org.springframework.dao.DataAccessException;
import org.springframework.jdbc.core.DataClassRowMapper;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.retry.annotation.Retryable;

@Named
@RequiredArgsConstructor
@CustomLog
public class PartitionMaintenanceService {
private static final String TIMESTAMP_PARTITION_COLUMN_NAME_PATTERN = "^(.*_timestamp|consensus_end)$";
private static final ZoneId PARTITION_BOUND_TIMEZONE = ZoneId.of("UTC");
private static final DateTimeFormatter CITUS_TIME_PARTITION_NAME_FORMATTER =
DateTimeFormatter.ofPattern("'_p'yyyy_MM_dd_HHmmss").withZone(PARTITION_BOUND_TIMEZONE);
private static final String TABLE_INFO_QUERY = "SELECT "
+ " tp.parent_table, "
+ " tp.partition_column, "
+ " max(tp.to_value::bigint) as next_from, "
+ " case when tp.partition_column::varchar ~ '"
+ TIMESTAMP_PARTITION_COLUMN_NAME_PATTERN + "'"
+ " then extract(epoch from (to_timestamp(max(tp.to_value::bigint / 1000000000.0)) + (?::interval))) * 1000000000 "
+ " else extract(epoch from (to_timestamp(max(tp.to_value::bigint / 1000000000.0)) + (?::interval))) * 1000000000 "
+ " end as next_to, "
+ " (tp.partition_column::varchar ~ '" + TIMESTAMP_PARTITION_COLUMN_NAME_PATTERN
+ "') as time_partition,"
+ " (select COALESCE(max(id), 1) from entity) as max_entity_id"
+ " from time_partitions tp group by tp.parent_table, tp.partition_column";

private static final String CREATE_TIME_PARTITIONS_SQL = "select create_time_partitions(table_name := ?,"
+ " partition_interval := ?::interval,"
+ " start_from := ?,"
+ " end_at := ?)";

private final JdbcTemplate jdbcTemplate;
private final PartitionMaintenanceConfiguration maintenanceConfig;

protected List<PartitionInfo> getNextPartitions() {
return jdbcTemplate.query(
TABLE_INFO_QUERY,
new DataClassRowMapper<>(PartitionInfo.class),
maintenanceConfig.getTimePartitionInterval(),
maintenanceConfig.getIdPartitionInterval());
}

@Transactional(REQUIRES_NEW)
@Retryable(retryFor = DataAccessException.class)
protected boolean createPartition(PartitionInfo partitionInfo) {
LocalDateTime start = LocalDateTime.ofInstant(
Instant.ofEpochSecond(0L, partitionInfo.getNextFrom()), PARTITION_BOUND_TIMEZONE);
LocalDateTime end =
LocalDateTime.ofInstant(Instant.ofEpochSecond(0L, partitionInfo.getNextTo()), PARTITION_BOUND_TIMEZONE);
Duration partitionDuration = Duration.between(start, end);

// will always premake one timePartition
boolean makeTimePartition = partitionInfo.isTimePartition()
&& LocalDateTime.now(PARTITION_BOUND_TIMEZONE)
.plus(partitionDuration)
.isAfter(start);
boolean makeIdPartition = !partitionInfo.isTimePartition()
&& partitionInfo.getMaxEntityId() * maintenanceConfig.getMaxEntityIdRatio()
>= partitionInfo.getNextFrom();
boolean skipPartition = !(makeIdPartition || makeTimePartition);
if (skipPartition) {
log.info("No new partition needed. Skipping creation for partition {}", partitionInfo);
return false;
}
String interval = partitionInfo.isTimePartition()
? maintenanceConfig.getTimePartitionInterval()
: maintenanceConfig.getIdPartitionInterval();
boolean created = BooleanUtils.isTrue(jdbcTemplate.queryForObject(
CREATE_TIME_PARTITIONS_SQL, Boolean.class, partitionInfo.getParentTable(), interval, start, end));

if (created && !partitionInfo.isTimePartition()) {
// Work around granularity issue of citus partitionInfo names
String createdPartitionName =
partitionInfo.getParentTable() + CITUS_TIME_PARTITION_NAME_FORMATTER.format(start);
long partitionCount = partitionInfo.getNextFrom() / partitionDuration.toNanos();
String updatePartitionNameSql = String.format(
"alter table %s rename to %s_p%d",
createdPartitionName, partitionInfo.getParentTable(), partitionCount);
jdbcTemplate.execute(updatePartitionNameSql);
log.info("Renamed partition: {}", updatePartitionNameSql);
}
log.info("Partition {} created {}", partitionInfo, created);
return created;
}
}
2 changes: 1 addition & 1 deletion hedera-mirror-importer/src/main/resources/application.yml
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ spring:
url: jdbc:postgresql://${hedera.mirror.importer.db.host}:${hedera.mirror.importer.db.port}/${hedera.mirror.importer.db.name}?tcpKeepAlive=true
username: ${hedera.mirror.importer.db.username}
hikari:
connection-init-sql: set temp_buffers='${hedera.mirror.importer.parser.tempTableBufferSize}MB';
connection-init-sql: set temp_buffers='${hedera.mirror.importer.parser.tempTableBufferSize}MB'; set time zone 'UTC'
# Note: Flyway does not use Hikari so these properties are ignored. Use URL properties for Flyway instead.
data-source-properties:
#loggerLevel: TRACE
Expand Down

0 comments on commit c20cea0

Please sign in to comment.