From 15f4c0417d0f396cd2d7427b087ecb289108d12b Mon Sep 17 00:00:00 2001 From: wangwj Date: Fri, 20 Dec 2024 16:39:54 +0800 Subject: [PATCH] [hotfix] Fix some typo and format some code (#108) --- .../lakehouse/paimon/PaimonBucketAssigner.java | 2 +- .../fluss/client/metrics/ScannerMetricGroup.java | 7 ++++--- .../fluss/client/scanner/log/LogFetcher.java | 2 -- .../main/java/com/alibaba/fluss/fs/FsPath.java | 2 +- .../alibaba/fluss/fs/local/LocalFileSystem.java | 6 ++---- .../java/com/alibaba/fluss/row/BinaryString.java | 15 +++++---------- .../java/com/alibaba/fluss/utils/NetUtils.java | 6 +++--- .../com/alibaba/fluss/utils/UnionIterator.java | 2 +- .../fluss/fs/SafetyNetCloseableRegistryTest.java | 2 +- .../com/alibaba/fluss/record/LogTestBase.java | 2 +- .../fluss/row/TestInternalRowGenerator.java | 2 +- .../alibaba/fluss/row/indexed/IndexedRowTest.java | 2 +- .../fluss/testutils/LogRecordBatchAssert.java | 2 +- .../alibaba/fluss/utils/UnionIteratorTest.java | 2 +- .../initializer/BucketOffsetsRetrieverImpl.java | 2 +- .../connector/flink/utils/FlinkConversions.java | 4 +--- .../fluss/fs/hdfs/HadoopDataInputStreamTest.java | 2 +- .../sink/committer/PaimonStoreMultiCommitter.java | 6 ++---- .../alibaba/fluss/metrics/jmx/JMXServerTest.java | 3 +-- .../fluss/rpc/metrics/ClientMetricGroup.java | 7 ++++--- .../com/alibaba/fluss/rpc/protocol/Errors.java | 1 - .../kv/snapshot/RocksIncrementalSnapshot.java | 5 +++-- .../fluss/server/kv/wal/ArrowWalBuilder.java | 2 -- .../metrics/group/CoordinatorMetricGroup.java | 6 +++--- .../metrics/group/TabletServerMetricGroup.java | 6 +++--- .../TestCompletedSnapshotHandleStore.java | 15 ++------------- .../replica/fetcher/ReplicaFetcherThreadTest.java | 4 ++-- 27 files changed, 46 insertions(+), 71 deletions(-) diff --git a/fluss-client/src/main/java/com/alibaba/fluss/client/lakehouse/paimon/PaimonBucketAssigner.java b/fluss-client/src/main/java/com/alibaba/fluss/client/lakehouse/paimon/PaimonBucketAssigner.java index 3661db29..3389ece0 100644 --- a/fluss-client/src/main/java/com/alibaba/fluss/client/lakehouse/paimon/PaimonBucketAssigner.java +++ b/fluss-client/src/main/java/com/alibaba/fluss/client/lakehouse/paimon/PaimonBucketAssigner.java @@ -48,7 +48,7 @@ public PaimonBucketAssigner(RowType rowType, List bucketKey, int bucketN this.flussRowWrapper = new FlussRowWrapper(); } - private static int[] getBucketKeyIndex(RowType rowType, List bucketKey) { + private int[] getBucketKeyIndex(RowType rowType, List bucketKey) { int[] bucketKeyIndex = new int[bucketKey.size()]; for (int i = 0; i < bucketKey.size(); i++) { bucketKeyIndex[i] = rowType.getFieldIndex(bucketKey.get(i)); diff --git a/fluss-client/src/main/java/com/alibaba/fluss/client/metrics/ScannerMetricGroup.java b/fluss-client/src/main/java/com/alibaba/fluss/client/metrics/ScannerMetricGroup.java index 999383eb..5af906d6 100644 --- a/fluss-client/src/main/java/com/alibaba/fluss/client/metrics/ScannerMetricGroup.java +++ b/fluss-client/src/main/java/com/alibaba/fluss/client/metrics/ScannerMetricGroup.java @@ -37,7 +37,8 @@ /** The metric group for scanner, including {@link LogScanner} and {@link SnapshotScanner}. */ @Internal public class ScannerMetricGroup extends AbstractMetricGroup { - private static final String name = "scanner"; + + private static final String NAME = "scanner"; private static final int WINDOW_SIZE = 1024; private final TablePath tablePath; @@ -57,7 +58,7 @@ public class ScannerMetricGroup extends AbstractMetricGroup { private volatile long pollStartMs; public ScannerMetricGroup(ClientMetricGroup parent, TablePath tablePath) { - super(parent.getMetricRegistry(), makeScope(parent, name), parent); + super(parent.getMetricRegistry(), makeScope(parent, NAME), parent); this.tablePath = tablePath; fetchRequestCount = new ThreadSafeSimpleCounter(); @@ -122,7 +123,7 @@ private long lastPollSecondsAgo() { @Override protected String getGroupName(CharacterFilter filter) { - return name; + return NAME; } @Override diff --git a/fluss-client/src/main/java/com/alibaba/fluss/client/scanner/log/LogFetcher.java b/fluss-client/src/main/java/com/alibaba/fluss/client/scanner/log/LogFetcher.java index f047572b..00a6c455 100644 --- a/fluss-client/src/main/java/com/alibaba/fluss/client/scanner/log/LogFetcher.java +++ b/fluss-client/src/main/java/com/alibaba/fluss/client/scanner/log/LogFetcher.java @@ -87,7 +87,6 @@ public class LogFetcher implements Closeable { private final LogRecordReadContext remoteReadContext; @Nullable private final Projection projection; private final RpcClient rpcClient; - private final Configuration conf; private final int maxFetchBytes; private final int maxBucketFetchBytes; private final boolean isCheckCrcs; @@ -121,7 +120,6 @@ public LogFetcher( this.projection = projection; this.rpcClient = rpcClient; this.logScannerStatus = logScannerStatus; - this.conf = conf; this.maxFetchBytes = (int) conf.get(ConfigOptions.LOG_FETCH_MAX_BYTES).getBytes(); this.maxBucketFetchBytes = (int) conf.get(ConfigOptions.LOG_FETCH_MAX_BYTES_FOR_BUCKET).getBytes(); diff --git a/fluss-common/src/main/java/com/alibaba/fluss/fs/FsPath.java b/fluss-common/src/main/java/com/alibaba/fluss/fs/FsPath.java index 4c344299..6476d0f5 100644 --- a/fluss-common/src/main/java/com/alibaba/fluss/fs/FsPath.java +++ b/fluss-common/src/main/java/com/alibaba/fluss/fs/FsPath.java @@ -189,7 +189,7 @@ public FsPath(String pathString) { } // uri path is the rest of the string -- query & fragment not supported - final String path = pathString.substring(start, pathString.length()); + final String path = pathString.substring(start); initialize(scheme, authority, path); } diff --git a/fluss-common/src/main/java/com/alibaba/fluss/fs/local/LocalFileSystem.java b/fluss-common/src/main/java/com/alibaba/fluss/fs/local/LocalFileSystem.java index d8a325a4..04619ea0 100644 --- a/fluss-common/src/main/java/com/alibaba/fluss/fs/local/LocalFileSystem.java +++ b/fluss-common/src/main/java/com/alibaba/fluss/fs/local/LocalFileSystem.java @@ -134,11 +134,9 @@ public boolean delete(final FsPath f, final boolean recursive) throws IOExceptio File[] containedFiles = file.listFiles(); if (containedFiles == null) { throw new IOException( - "Directory " - + file.toString() - + " does not exist or an I/O error occurred"); + "Directory " + file + " does not exist or an I/O error occurred"); } else if (containedFiles.length != 0) { - throw new IOException("Directory " + file.toString() + " is not empty"); + throw new IOException("Directory " + file + " is not empty"); } } diff --git a/fluss-common/src/main/java/com/alibaba/fluss/row/BinaryString.java b/fluss-common/src/main/java/com/alibaba/fluss/row/BinaryString.java index 377a0a73..5b369793 100644 --- a/fluss-common/src/main/java/com/alibaba/fluss/row/BinaryString.java +++ b/fluss-common/src/main/java/com/alibaba/fluss/row/BinaryString.java @@ -22,7 +22,6 @@ import javax.annotation.Nonnull; import javax.annotation.Nullable; -import java.io.UnsupportedEncodingException; import java.nio.charset.StandardCharsets; import java.util.Arrays; @@ -525,7 +524,7 @@ public BinaryString toUpperCase() { // fallback return javaToUpperCase(); } - int upper = Character.toUpperCase((int) b); + int upper = Character.toUpperCase(b); if (upper > 127) { // fallback return javaToUpperCase(); @@ -559,7 +558,7 @@ public BinaryString toLowerCase() { // fallback return javaToLowerCase(); } - int lower = Character.toLowerCase((int) b); + int lower = Character.toLowerCase(b); if (lower > 127) { // fallback return javaToLowerCase(); @@ -808,13 +807,9 @@ public static int encodeUTF8(String str, byte[] bytes) { } public static int defaultEncodeUTF8(String str, byte[] bytes) { - try { - byte[] buffer = str.getBytes("UTF-8"); - System.arraycopy(buffer, 0, bytes, 0, buffer.length); - return buffer.length; - } catch (UnsupportedEncodingException e) { - throw new RuntimeException("encodeUTF8 error", e); - } + byte[] buffer = str.getBytes(StandardCharsets.UTF_8); + System.arraycopy(buffer, 0, bytes, 0, buffer.length); + return buffer.length; } public static String decodeUTF8(byte[] input, int offset, int byteLen) { diff --git a/fluss-common/src/main/java/com/alibaba/fluss/utils/NetUtils.java b/fluss-common/src/main/java/com/alibaba/fluss/utils/NetUtils.java index 6ede0731..54191eb5 100644 --- a/fluss-common/src/main/java/com/alibaba/fluss/utils/NetUtils.java +++ b/fluss-common/src/main/java/com/alibaba/fluss/utils/NetUtils.java @@ -184,7 +184,7 @@ public static Iterator getPortRangeFromString(String rangeDefinition) int dashIdx = range.indexOf('-'); if (dashIdx == -1) { // only one port in range: - final int port = Integer.valueOf(range); + final int port = Integer.parseInt(range); if (!isValidHostPort(port)) { throw new IllegalConfigurationException( "Invalid port configuration. Port must be between 0" @@ -195,7 +195,7 @@ public static Iterator getPortRangeFromString(String rangeDefinition) rangeIterator = Collections.singleton(Integer.valueOf(range)).iterator(); } else { // evaluate range - final int start = Integer.valueOf(range.substring(0, dashIdx)); + final int start = Integer.parseInt(range.substring(0, dashIdx)); if (!isValidHostPort(start)) { throw new IllegalConfigurationException( "Invalid port configuration. Port must be between 0" @@ -203,7 +203,7 @@ public static Iterator getPortRangeFromString(String rangeDefinition) + start + "."); } - final int end = Integer.valueOf(range.substring(dashIdx + 1, range.length())); + final int end = Integer.parseInt(range.substring(dashIdx + 1)); if (!isValidHostPort(end)) { throw new IllegalConfigurationException( "Invalid port configuration. Port must be between 0" diff --git a/fluss-common/src/main/java/com/alibaba/fluss/utils/UnionIterator.java b/fluss-common/src/main/java/com/alibaba/fluss/utils/UnionIterator.java index 41fb03a3..cf2dfe4c 100644 --- a/fluss-common/src/main/java/com/alibaba/fluss/utils/UnionIterator.java +++ b/fluss-common/src/main/java/com/alibaba/fluss/utils/UnionIterator.java @@ -35,7 +35,7 @@ public class UnionIterator implements Iterator, Iterable { private Iterator currentIterator; - private ArrayList> furtherIterators = new ArrayList<>(); + private final ArrayList> furtherIterators = new ArrayList<>(); private int nextIterator; diff --git a/fluss-common/src/test/java/com/alibaba/fluss/fs/SafetyNetCloseableRegistryTest.java b/fluss-common/src/test/java/com/alibaba/fluss/fs/SafetyNetCloseableRegistryTest.java index 15f5e56e..8aacca4d 100644 --- a/fluss-common/src/test/java/com/alibaba/fluss/fs/SafetyNetCloseableRegistryTest.java +++ b/fluss-common/src/test/java/com/alibaba/fluss/fs/SafetyNetCloseableRegistryTest.java @@ -254,7 +254,7 @@ void testReaperThreadStartFailed() throws Exception { try { new SafetyNetCloseableRegistry(OutOfMemoryReaperThread::new); - } catch (OutOfMemoryError error) { + } catch (OutOfMemoryError ignored) { } assertThat(SafetyNetCloseableRegistry.isReaperThreadRunning()).isFalse(); diff --git a/fluss-common/src/test/java/com/alibaba/fluss/record/LogTestBase.java b/fluss-common/src/test/java/com/alibaba/fluss/record/LogTestBase.java index e8ac0f5c..a2197228 100644 --- a/fluss-common/src/test/java/com/alibaba/fluss/record/LogTestBase.java +++ b/fluss-common/src/test/java/com/alibaba/fluss/record/LogTestBase.java @@ -80,7 +80,7 @@ protected void assertIndexedLogRecordBatchAndRowEquals( LogRecordReadContext readContext = LogRecordReadContext.createIndexedReadContext(rowType, TestData.DEFAULT_SCHEMA_ID); try (CloseableIterator actualIter = actual.records(readContext); - CloseableIterator expectIter = expected.records(readContext); ) { + CloseableIterator expectIter = expected.records(readContext)) { int i = 0; while (actualIter.hasNext() && expectIter.hasNext()) { DefaultLogRecord actualRecord = (DefaultLogRecord) actualIter.next(); diff --git a/fluss-common/src/test/java/com/alibaba/fluss/row/TestInternalRowGenerator.java b/fluss-common/src/test/java/com/alibaba/fluss/row/TestInternalRowGenerator.java index d135e2d0..e08e0cb9 100644 --- a/fluss-common/src/test/java/com/alibaba/fluss/row/TestInternalRowGenerator.java +++ b/fluss-common/src/test/java/com/alibaba/fluss/row/TestInternalRowGenerator.java @@ -126,7 +126,7 @@ public static CompactedRow genCompactedRowForAllType() { for (int i = 0; i < dataTypes.length; i++) { rowEncoder.encodeField(i, fieldGetters[i].getFieldOrNull(indexedRow)); } - return (CompactedRow) rowEncoder.finishRow(); + return rowEncoder.finishRow(); } private static void setRandomNull( diff --git a/fluss-common/src/test/java/com/alibaba/fluss/row/indexed/IndexedRowTest.java b/fluss-common/src/test/java/com/alibaba/fluss/row/indexed/IndexedRowTest.java index 85b277c7..07603c69 100644 --- a/fluss-common/src/test/java/com/alibaba/fluss/row/indexed/IndexedRowTest.java +++ b/fluss-common/src/test/java/com/alibaba/fluss/row/indexed/IndexedRowTest.java @@ -160,7 +160,7 @@ void testProjectRow() { assertThat(row.getInt(0)).isEqualTo(1000); assertThat(row.getString(2)).isEqualTo(BinaryString.fromString("hello")); - IndexedRow projectRow = (IndexedRow) row.projectRow(new int[] {0, 2}); + IndexedRow projectRow = row.projectRow(new int[] {0, 2}); assertThat(projectRow.getInt(0)).isEqualTo(1000); assertThat(projectRow.getString(1)).isEqualTo(BinaryString.fromString("hello")); diff --git a/fluss-common/src/test/java/com/alibaba/fluss/testutils/LogRecordBatchAssert.java b/fluss-common/src/test/java/com/alibaba/fluss/testutils/LogRecordBatchAssert.java index ebe6641a..88adfa7f 100644 --- a/fluss-common/src/test/java/com/alibaba/fluss/testutils/LogRecordBatchAssert.java +++ b/fluss-common/src/test/java/com/alibaba/fluss/testutils/LogRecordBatchAssert.java @@ -82,7 +82,7 @@ public LogRecordBatchAssert isEqualTo(LogRecordBatch expected) { .isEqualTo(expected.getRecordCount()); try (LogRecordReadContext readContext = createReadContext(expected.schemaId()); CloseableIterator actualIter = actual.records(readContext); - CloseableIterator expectIter = expected.records(readContext); ) { + CloseableIterator expectIter = expected.records(readContext)) { while (expectIter.hasNext()) { assertThat(actualIter.hasNext()).isTrue(); assertThatLogRecord(actualIter.next()) diff --git a/fluss-common/src/test/java/com/alibaba/fluss/utils/UnionIteratorTest.java b/fluss-common/src/test/java/com/alibaba/fluss/utils/UnionIteratorTest.java index 2a366925..5ee8958e 100644 --- a/fluss-common/src/test/java/com/alibaba/fluss/utils/UnionIteratorTest.java +++ b/fluss-common/src/test/java/com/alibaba/fluss/utils/UnionIteratorTest.java @@ -50,7 +50,7 @@ void testUnion() { iter.clear(); iter.addList(Arrays.asList(1, 2, 3, 4, 5, 6, 7)); - iter.addList(Collections.emptyList()); + iter.addList(Collections.emptyList()); iter.addList(Arrays.asList(8, 9, 10, 11)); int val = 1; diff --git a/fluss-connectors/fluss-connector-flink/src/main/java/com/alibaba/fluss/connector/flink/source/enumerator/initializer/BucketOffsetsRetrieverImpl.java b/fluss-connectors/fluss-connector-flink/src/main/java/com/alibaba/fluss/connector/flink/source/enumerator/initializer/BucketOffsetsRetrieverImpl.java index 6309add5..f50d38a4 100644 --- a/fluss-connectors/fluss-connector-flink/src/main/java/com/alibaba/fluss/connector/flink/source/enumerator/initializer/BucketOffsetsRetrieverImpl.java +++ b/fluss-connectors/fluss-connector-flink/src/main/java/com/alibaba/fluss/connector/flink/source/enumerator/initializer/BucketOffsetsRetrieverImpl.java @@ -52,7 +52,7 @@ public Map latestOffsets( @Override public Map earliestOffsets( @Nullable String partitionName, Collection buckets) { - Map bucketWithOffset = new HashMap<>(); + Map bucketWithOffset = new HashMap<>(buckets.size()); for (Integer bucket : buckets) { bucketWithOffset.put(bucket, EARLIEST_OFFSET); } diff --git a/fluss-connectors/fluss-connector-flink/src/main/java/com/alibaba/fluss/connector/flink/utils/FlinkConversions.java b/fluss-connectors/fluss-connector-flink/src/main/java/com/alibaba/fluss/connector/flink/utils/FlinkConversions.java index 0f038fd8..7eb26706 100644 --- a/fluss-connectors/fluss-connector-flink/src/main/java/com/alibaba/fluss/connector/flink/utils/FlinkConversions.java +++ b/fluss-connectors/fluss-connector-flink/src/main/java/com/alibaba/fluss/connector/flink/utils/FlinkConversions.java @@ -290,9 +290,7 @@ public static org.apache.flink.configuration.ConfigOption toFlinkOption( option = builder.stringType().defaultValue(defaultValue); } else if (clazz.equals(MemorySize.class)) { // use string type in Flink option instead to make convert back easier - option = - builder.stringType() - .defaultValue(((MemorySize) flussOption.defaultValue()).toString()); + option = builder.stringType().defaultValue(flussOption.defaultValue().toString()); } else if (clazz.isEnum()) { //noinspection unchecked option = diff --git a/fluss-filesystems/fluss-fs-hadoop/src/test/java/com/alibaba/fluss/fs/hdfs/HadoopDataInputStreamTest.java b/fluss-filesystems/fluss-fs-hadoop/src/test/java/com/alibaba/fluss/fs/hdfs/HadoopDataInputStreamTest.java index 4c5e66a3..6d056278 100644 --- a/fluss-filesystems/fluss-fs-hadoop/src/test/java/com/alibaba/fluss/fs/hdfs/HadoopDataInputStreamTest.java +++ b/fluss-filesystems/fluss-fs-hadoop/src/test/java/com/alibaba/fluss/fs/hdfs/HadoopDataInputStreamTest.java @@ -93,7 +93,7 @@ private static final class SeekableByteArrayInputStream extends InputStream implements Seekable, PositionedReadable { private final byte[] buffer; private int position; - private int count; + private final int count; public SeekableByteArrayInputStream(byte[] buffer) { this.buffer = buffer; diff --git a/fluss-lakehouse/fluss-lakehouse-paimon/src/main/java/com/alibaba/fluss/lakehouse/paimon/sink/committer/PaimonStoreMultiCommitter.java b/fluss-lakehouse/fluss-lakehouse-paimon/src/main/java/com/alibaba/fluss/lakehouse/paimon/sink/committer/PaimonStoreMultiCommitter.java index da8bbe50..0965c6d5 100644 --- a/fluss-lakehouse/fluss-lakehouse-paimon/src/main/java/com/alibaba/fluss/lakehouse/paimon/sink/committer/PaimonStoreMultiCommitter.java +++ b/fluss-lakehouse/fluss-lakehouse-paimon/src/main/java/com/alibaba/fluss/lakehouse/paimon/sink/committer/PaimonStoreMultiCommitter.java @@ -309,7 +309,7 @@ private void putSnapshotIdAndLogStartOffset( } } - Long getLogStartOffset( + private Long getLogStartOffset( Identifier identifier, FileStoreTable fileStoreTable, TableBucket tableBucket, @@ -343,7 +343,7 @@ Long getLogStartOffset( return null; } - List getSplits( + private List getSplits( Identifier identifier, long snapshotId, FileStoreTable fileStoreTable, @@ -377,9 +377,7 @@ List getSplits( private PaimonAndFlussCommittable toCommittable( List committables) { Map> paimonManifestCommittable = new HashMap<>(); - Map> flussLogEndOffsetByTableId = new HashMap<>(); - Map tableIdByPaimonIdentifier = new HashMap<>(); Map partitionNameById = new HashMap<>(); diff --git a/fluss-metrics/fluss-metrics-jmx/src/test/java/com/alibaba/fluss/metrics/jmx/JMXServerTest.java b/fluss-metrics/fluss-metrics-jmx/src/test/java/com/alibaba/fluss/metrics/jmx/JMXServerTest.java index a613b12d..e6f1bd93 100644 --- a/fluss-metrics/fluss-metrics-jmx/src/test/java/com/alibaba/fluss/metrics/jmx/JMXServerTest.java +++ b/fluss-metrics/fluss-metrics-jmx/src/test/java/com/alibaba/fluss/metrics/jmx/JMXServerTest.java @@ -85,11 +85,10 @@ public interface TestObjectMBean { /** Test MBean Object. */ public static class TestObject implements TestObjectMBean { - private final int foo = 1; @Override public int getFoo() { - return foo; + return 1; } } } diff --git a/fluss-rpc/src/main/java/com/alibaba/fluss/rpc/metrics/ClientMetricGroup.java b/fluss-rpc/src/main/java/com/alibaba/fluss/rpc/metrics/ClientMetricGroup.java index 6e942b13..add6319c 100644 --- a/fluss-rpc/src/main/java/com/alibaba/fluss/rpc/metrics/ClientMetricGroup.java +++ b/fluss-rpc/src/main/java/com/alibaba/fluss/rpc/metrics/ClientMetricGroup.java @@ -24,18 +24,19 @@ /** The metric group for clients. */ public class ClientMetricGroup extends AbstractMetricGroup { - private static final String name = "client"; + + private static final String NAME = "client"; private final String clientId; public ClientMetricGroup(MetricRegistry registry, String clientId) { - super(registry, new String[] {name}, null); + super(registry, new String[] {NAME}, null); this.clientId = clientId; } @Override protected String getGroupName(CharacterFilter filter) { - return name; + return NAME; } @Override diff --git a/fluss-rpc/src/main/java/com/alibaba/fluss/rpc/protocol/Errors.java b/fluss-rpc/src/main/java/com/alibaba/fluss/rpc/protocol/Errors.java index 11ad3195..eeb1cb8c 100644 --- a/fluss-rpc/src/main/java/com/alibaba/fluss/rpc/protocol/Errors.java +++ b/fluss-rpc/src/main/java/com/alibaba/fluss/rpc/protocol/Errors.java @@ -175,7 +175,6 @@ public enum Errors { INVALID_CONFIG_EXCEPTION(39, "The config is invalid.", InvalidConfigException::new), LAKE_STORAGE_NOT_CONFIGURED_EXCEPTION( 40, "The lake storage is not configured.", LakeStorageNotConfiguredException::new); - ; private static final Logger LOG = LoggerFactory.getLogger(Errors.class); diff --git a/fluss-server/src/main/java/com/alibaba/fluss/server/kv/snapshot/RocksIncrementalSnapshot.java b/fluss-server/src/main/java/com/alibaba/fluss/server/kv/snapshot/RocksIncrementalSnapshot.java index 0606b124..f055ea1c 100644 --- a/fluss-server/src/main/java/com/alibaba/fluss/server/kv/snapshot/RocksIncrementalSnapshot.java +++ b/fluss-server/src/main/java/com/alibaba/fluss/server/kv/snapshot/RocksIncrementalSnapshot.java @@ -164,12 +164,13 @@ private void takeDBNativeSnapshot(@Nonnull File outputDirectory) throws Exceptio Checkpoint snapshot = Checkpoint.create(db)) { snapshot.createCheckpoint(outputDirectory.toString()); } catch (Exception ex) { + Exception exception = ex; try { FileUtils.deleteDirectory(outputDirectory); } catch (IOException cleanupEx) { - ex = ExceptionUtils.firstOrSuppressed(cleanupEx, ex); + exception = ExceptionUtils.firstOrSuppressed(cleanupEx, exception); } - throw ex; + throw exception; } } diff --git a/fluss-server/src/main/java/com/alibaba/fluss/server/kv/wal/ArrowWalBuilder.java b/fluss-server/src/main/java/com/alibaba/fluss/server/kv/wal/ArrowWalBuilder.java index 15880f06..dcd57b17 100644 --- a/fluss-server/src/main/java/com/alibaba/fluss/server/kv/wal/ArrowWalBuilder.java +++ b/fluss-server/src/main/java/com/alibaba/fluss/server/kv/wal/ArrowWalBuilder.java @@ -28,11 +28,9 @@ public class ArrowWalBuilder implements WalBuilder { private final MemoryLogRecordsArrowBuilder recordsBuilder; - private final AbstractPagedOutputView outputView; public ArrowWalBuilder(int schemaId, ArrowWriter writer, AbstractPagedOutputView outputView) { this.recordsBuilder = MemoryLogRecordsArrowBuilder.builder(schemaId, writer, outputView); - this.outputView = outputView; } @Override diff --git a/fluss-server/src/main/java/com/alibaba/fluss/server/metrics/group/CoordinatorMetricGroup.java b/fluss-server/src/main/java/com/alibaba/fluss/server/metrics/group/CoordinatorMetricGroup.java index e5ce2d85..86c0ee46 100644 --- a/fluss-server/src/main/java/com/alibaba/fluss/server/metrics/group/CoordinatorMetricGroup.java +++ b/fluss-server/src/main/java/com/alibaba/fluss/server/metrics/group/CoordinatorMetricGroup.java @@ -25,7 +25,7 @@ /** The metric group for coordinator server. */ public class CoordinatorMetricGroup extends AbstractMetricGroup { - private static final String name = "coordinator"; + private static final String NAME = "coordinator"; protected final String clusterId; protected final String hostname; @@ -33,7 +33,7 @@ public class CoordinatorMetricGroup extends AbstractMetricGroup { public CoordinatorMetricGroup( MetricRegistry registry, String clusterId, String hostname, String serverId) { - super(registry, new String[] {clusterId, hostname, name}, null); + super(registry, new String[] {clusterId, hostname, NAME}, null); this.clusterId = clusterId; this.hostname = hostname; this.serverId = serverId; @@ -41,7 +41,7 @@ public CoordinatorMetricGroup( @Override protected String getGroupName(CharacterFilter filter) { - return name; + return NAME; } @Override diff --git a/fluss-server/src/main/java/com/alibaba/fluss/server/metrics/group/TabletServerMetricGroup.java b/fluss-server/src/main/java/com/alibaba/fluss/server/metrics/group/TabletServerMetricGroup.java index 0b6032ed..26332047 100644 --- a/fluss-server/src/main/java/com/alibaba/fluss/server/metrics/group/TabletServerMetricGroup.java +++ b/fluss-server/src/main/java/com/alibaba/fluss/server/metrics/group/TabletServerMetricGroup.java @@ -31,7 +31,7 @@ /** The metric group for tablet server. */ public class TabletServerMetricGroup extends AbstractMetricGroup { - private static final String name = "tabletserver"; + private static final String NAME = "tabletserver"; private final Map metricGroupByPhysicalTable = new ConcurrentHashMap<>(); @@ -46,7 +46,7 @@ public class TabletServerMetricGroup extends AbstractMetricGroup { public TabletServerMetricGroup( MetricRegistry registry, String clusterId, String hostname, int serverId) { - super(registry, new String[] {clusterId, hostname, name}, null); + super(registry, new String[] {clusterId, hostname, NAME}, null); this.clusterId = clusterId; this.hostname = hostname; this.serverId = serverId; @@ -66,7 +66,7 @@ protected final void putVariables(Map variables) { @Override protected String getGroupName(CharacterFilter filter) { - return name; + return NAME; } public Counter replicationBytesIn() { diff --git a/fluss-server/src/test/java/com/alibaba/fluss/server/kv/snapshot/TestCompletedSnapshotHandleStore.java b/fluss-server/src/test/java/com/alibaba/fluss/server/kv/snapshot/TestCompletedSnapshotHandleStore.java index 87119ad3..e745b669 100644 --- a/fluss-server/src/test/java/com/alibaba/fluss/server/kv/snapshot/TestCompletedSnapshotHandleStore.java +++ b/fluss-server/src/test/java/com/alibaba/fluss/server/kv/snapshot/TestCompletedSnapshotHandleStore.java @@ -19,7 +19,6 @@ import com.alibaba.fluss.metadata.TableBucket; import com.alibaba.fluss.utils.function.FunctionWithException; import com.alibaba.fluss.utils.function.SupplierWithException; -import com.alibaba.fluss.utils.types.Tuple2; import java.util.Collections; import java.util.List; @@ -28,9 +27,6 @@ /** An implementation of {@link CompletedSnapshotStore} for test purpose. */ public class TestCompletedSnapshotHandleStore implements CompletedSnapshotHandleStore { - private final SupplierWithException>, Exception> - getAllSupplier; - private final SupplierWithException, Exception> getLatestSupplier; @@ -38,11 +34,8 @@ public class TestCompletedSnapshotHandleStore implements CompletedSnapshotHandle public TestCompletedSnapshotHandleStore( FunctionWithException addFunction, - SupplierWithException>, Exception> - getAllSupplier, SupplierWithException, Exception> getLatestSupplier) { this.addFunction = addFunction; - this.getAllSupplier = getAllSupplier; this.getLatestSupplier = getLatestSupplier; } @@ -86,11 +79,8 @@ private Builder() {} private FunctionWithException addFunction = (ignore) -> null; - private SupplierWithException>, Exception> - getAllSupplier = Collections::emptyList; - private SupplierWithException, Exception> - getLatestSupplier = () -> Optional.empty(); + getLatestSupplier = Optional::empty; public Builder setAddFunction( FunctionWithException addFunction) { @@ -106,8 +96,7 @@ public Builder setGetLatestSupplier( } public TestCompletedSnapshotHandleStore build() { - return new TestCompletedSnapshotHandleStore( - addFunction, getAllSupplier, getLatestSupplier); + return new TestCompletedSnapshotHandleStore(addFunction, getLatestSupplier); } } } diff --git a/fluss-server/src/test/java/com/alibaba/fluss/server/replica/fetcher/ReplicaFetcherThreadTest.java b/fluss-server/src/test/java/com/alibaba/fluss/server/replica/fetcher/ReplicaFetcherThreadTest.java index a1e4491d..7f780ca5 100644 --- a/fluss-server/src/test/java/com/alibaba/fluss/server/replica/fetcher/ReplicaFetcherThreadTest.java +++ b/fluss-server/src/test/java/com/alibaba/fluss/server/replica/fetcher/ReplicaFetcherThreadTest.java @@ -90,7 +90,6 @@ public class ReplicaFetcherThreadTest { private ReplicaManager leaderRM; private ServerNode leader; private ReplicaManager followerRM; - private ServerNode follower; private ReplicaFetcherThread followerFetcher; @BeforeAll @@ -110,7 +109,8 @@ public void setup() throws Exception { followerRM = createReplicaManager(followerServerId); // with local test leader end point. leader = new ServerNode(leaderServerId, "localhost", 9099, ServerType.TABLET_SERVER); - follower = new ServerNode(followerServerId, "localhost", 10001, ServerType.TABLET_SERVER); + ServerNode follower = + new ServerNode(followerServerId, "localhost", 10001, ServerType.TABLET_SERVER); followerFetcher = new ReplicaFetcherThread( "test-fetcher-thread",