Skip to content

Commit

Permalink
Merge branch 'main' into update-checkstyle
Browse files Browse the repository at this point in the history
  • Loading branch information
psxjoy committed Dec 20, 2024
2 parents 0915670 + 665e49e commit 2e96d7c
Show file tree
Hide file tree
Showing 33 changed files with 85 additions and 75 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ public PaimonBucketAssigner(RowType rowType, List<String> bucketKey, int bucketN
this.flussRowWrapper = new FlussRowWrapper();
}

private static int[] getBucketKeyIndex(RowType rowType, List<String> bucketKey) {
private int[] getBucketKeyIndex(RowType rowType, List<String> bucketKey) {
int[] bucketKeyIndex = new int[bucketKey.size()];
for (int i = 0; i < bucketKey.size(); i++) {
bucketKeyIndex[i] = rowType.getFieldIndex(bucketKey.get(i));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,8 @@
/** The metric group for scanner, including {@link LogScanner} and {@link SnapshotScanner}. */
@Internal
public class ScannerMetricGroup extends AbstractMetricGroup {
private static final String name = "scanner";

private static final String NAME = "scanner";
private static final int WINDOW_SIZE = 1024;

private final TablePath tablePath;
Expand All @@ -57,7 +58,7 @@ public class ScannerMetricGroup extends AbstractMetricGroup {
private volatile long pollStartMs;

public ScannerMetricGroup(ClientMetricGroup parent, TablePath tablePath) {
super(parent.getMetricRegistry(), makeScope(parent, name), parent);
super(parent.getMetricRegistry(), makeScope(parent, NAME), parent);
this.tablePath = tablePath;

fetchRequestCount = new ThreadSafeSimpleCounter();
Expand Down Expand Up @@ -122,7 +123,7 @@ private long lastPollSecondsAgo() {

@Override
protected String getGroupName(CharacterFilter filter) {
return name;
return NAME;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,6 @@ public class LogFetcher implements Closeable {
private final LogRecordReadContext remoteReadContext;
@Nullable private final Projection projection;
private final RpcClient rpcClient;
private final Configuration conf;
private final int maxFetchBytes;
private final int maxBucketFetchBytes;
private final boolean isCheckCrcs;
Expand Down Expand Up @@ -121,7 +120,6 @@ public LogFetcher(
this.projection = projection;
this.rpcClient = rpcClient;
this.logScannerStatus = logScannerStatus;
this.conf = conf;
this.maxFetchBytes = (int) conf.get(ConfigOptions.LOG_FETCH_MAX_BYTES).getBytes();
this.maxBucketFetchBytes =
(int) conf.get(ConfigOptions.LOG_FETCH_MAX_BYTES_FOR_BUCKET).getBytes();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,7 @@ public FsPath(String pathString) {
}

// uri path is the rest of the string -- query & fragment not supported
final String path = pathString.substring(start, pathString.length());
final String path = pathString.substring(start);

initialize(scheme, authority, path);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -134,11 +134,9 @@ public boolean delete(final FsPath f, final boolean recursive) throws IOExceptio
File[] containedFiles = file.listFiles();
if (containedFiles == null) {
throw new IOException(
"Directory "
+ file.toString()
+ " does not exist or an I/O error occurred");
"Directory " + file + " does not exist or an I/O error occurred");
} else if (containedFiles.length != 0) {
throw new IOException("Directory " + file.toString() + " is not empty");
throw new IOException("Directory " + file + " is not empty");
}
}

Expand Down
15 changes: 5 additions & 10 deletions fluss-common/src/main/java/com/alibaba/fluss/row/BinaryString.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import javax.annotation.Nonnull;
import javax.annotation.Nullable;

import java.io.UnsupportedEncodingException;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

Expand Down Expand Up @@ -525,7 +524,7 @@ public BinaryString toUpperCase() {
// fallback
return javaToUpperCase();
}
int upper = Character.toUpperCase((int) b);
int upper = Character.toUpperCase(b);
if (upper > 127) {
// fallback
return javaToUpperCase();
Expand Down Expand Up @@ -559,7 +558,7 @@ public BinaryString toLowerCase() {
// fallback
return javaToLowerCase();
}
int lower = Character.toLowerCase((int) b);
int lower = Character.toLowerCase(b);
if (lower > 127) {
// fallback
return javaToLowerCase();
Expand Down Expand Up @@ -808,13 +807,9 @@ public static int encodeUTF8(String str, byte[] bytes) {
}

public static int defaultEncodeUTF8(String str, byte[] bytes) {
try {
byte[] buffer = str.getBytes("UTF-8");
System.arraycopy(buffer, 0, bytes, 0, buffer.length);
return buffer.length;
} catch (UnsupportedEncodingException e) {
throw new RuntimeException("encodeUTF8 error", e);
}
byte[] buffer = str.getBytes(StandardCharsets.UTF_8);
System.arraycopy(buffer, 0, bytes, 0, buffer.length);
return buffer.length;
}

public static String decodeUTF8(byte[] input, int offset, int byteLen) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,7 @@ public static Iterator<Integer> getPortRangeFromString(String rangeDefinition)
int dashIdx = range.indexOf('-');
if (dashIdx == -1) {
// only one port in range:
final int port = Integer.valueOf(range);
final int port = Integer.parseInt(range);
if (!isValidHostPort(port)) {
throw new IllegalConfigurationException(
"Invalid port configuration. Port must be between 0"
Expand All @@ -195,15 +195,15 @@ public static Iterator<Integer> getPortRangeFromString(String rangeDefinition)
rangeIterator = Collections.singleton(Integer.valueOf(range)).iterator();
} else {
// evaluate range
final int start = Integer.valueOf(range.substring(0, dashIdx));
final int start = Integer.parseInt(range.substring(0, dashIdx));
if (!isValidHostPort(start)) {
throw new IllegalConfigurationException(
"Invalid port configuration. Port must be between 0"
+ "and 65535, but was "
+ start
+ ".");
}
final int end = Integer.valueOf(range.substring(dashIdx + 1, range.length()));
final int end = Integer.parseInt(range.substring(dashIdx + 1));
if (!isValidHostPort(end)) {
throw new IllegalConfigurationException(
"Invalid port configuration. Port must be between 0"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ public class UnionIterator<T> implements Iterator<T>, Iterable<T> {

private Iterator<T> currentIterator;

private ArrayList<Iterator<T>> furtherIterators = new ArrayList<>();
private final ArrayList<Iterator<T>> furtherIterators = new ArrayList<>();

private int nextIterator;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -254,7 +254,7 @@ void testReaperThreadStartFailed() throws Exception {

try {
new SafetyNetCloseableRegistry(OutOfMemoryReaperThread::new);
} catch (OutOfMemoryError error) {
} catch (OutOfMemoryError ignored) {
}
assertThat(SafetyNetCloseableRegistry.isReaperThreadRunning()).isFalse();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ protected void assertIndexedLogRecordBatchAndRowEquals(
LogRecordReadContext readContext =
LogRecordReadContext.createIndexedReadContext(rowType, TestData.DEFAULT_SCHEMA_ID);
try (CloseableIterator<LogRecord> actualIter = actual.records(readContext);
CloseableIterator<LogRecord> expectIter = expected.records(readContext); ) {
CloseableIterator<LogRecord> expectIter = expected.records(readContext)) {
int i = 0;
while (actualIter.hasNext() && expectIter.hasNext()) {
DefaultLogRecord actualRecord = (DefaultLogRecord) actualIter.next();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ public static CompactedRow genCompactedRowForAllType() {
for (int i = 0; i < dataTypes.length; i++) {
rowEncoder.encodeField(i, fieldGetters[i].getFieldOrNull(indexedRow));
}
return (CompactedRow) rowEncoder.finishRow();
return rowEncoder.finishRow();
}

private static void setRandomNull(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,7 @@ void testProjectRow() {
assertThat(row.getInt(0)).isEqualTo(1000);
assertThat(row.getString(2)).isEqualTo(BinaryString.fromString("hello"));

IndexedRow projectRow = (IndexedRow) row.projectRow(new int[] {0, 2});
IndexedRow projectRow = row.projectRow(new int[] {0, 2});
assertThat(projectRow.getInt(0)).isEqualTo(1000);
assertThat(projectRow.getString(1)).isEqualTo(BinaryString.fromString("hello"));

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ public LogRecordBatchAssert isEqualTo(LogRecordBatch expected) {
.isEqualTo(expected.getRecordCount());
try (LogRecordReadContext readContext = createReadContext(expected.schemaId());
CloseableIterator<LogRecord> actualIter = actual.records(readContext);
CloseableIterator<LogRecord> expectIter = expected.records(readContext); ) {
CloseableIterator<LogRecord> expectIter = expected.records(readContext)) {
while (expectIter.hasNext()) {
assertThat(actualIter.hasNext()).isTrue();
assertThatLogRecord(actualIter.next())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ void testUnion() {

iter.clear();
iter.addList(Arrays.asList(1, 2, 3, 4, 5, 6, 7));
iter.addList(Collections.<Integer>emptyList());
iter.addList(Collections.emptyList());
iter.addList(Arrays.asList(8, 9, 10, 11));

int val = 1;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ public Map<Integer, Long> latestOffsets(
@Override
public Map<Integer, Long> earliestOffsets(
@Nullable String partitionName, Collection<Integer> buckets) {
Map<Integer, Long> bucketWithOffset = new HashMap<>();
Map<Integer, Long> bucketWithOffset = new HashMap<>(buckets.size());
for (Integer bucket : buckets) {
bucketWithOffset.put(bucket, EARLIEST_OFFSET);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -290,9 +290,7 @@ public static org.apache.flink.configuration.ConfigOption<?> toFlinkOption(
option = builder.stringType().defaultValue(defaultValue);
} else if (clazz.equals(MemorySize.class)) {
// use string type in Flink option instead to make convert back easier
option =
builder.stringType()
.defaultValue(((MemorySize) flussOption.defaultValue()).toString());
option = builder.stringType().defaultValue(flussOption.defaultValue().toString());
} else if (clazz.isEnum()) {
//noinspection unchecked
option =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ private FlussSerializationConverter createInternalConverter(LogicalType flinkDat
case VARCHAR:
return (flinkField) -> {
StringData stringData = (StringData) flinkField;
return BinaryString.fromString(stringData.toString());
return BinaryString.fromBytes(stringData.toBytes());
};
case DECIMAL:
return (flinkField) -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ private static final class SeekableByteArrayInputStream extends InputStream
implements Seekable, PositionedReadable {
private final byte[] buffer;
private int position;
private int count;
private final int count;

public SeekableByteArrayInputStream(byte[] buffer) {
this.buffer = buffer;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -309,7 +309,7 @@ private void putSnapshotIdAndLogStartOffset(
}
}

Long getLogStartOffset(
private Long getLogStartOffset(
Identifier identifier,
FileStoreTable fileStoreTable,
TableBucket tableBucket,
Expand Down Expand Up @@ -343,7 +343,7 @@ Long getLogStartOffset(
return null;
}

List<Split> getSplits(
private List<Split> getSplits(
Identifier identifier,
long snapshotId,
FileStoreTable fileStoreTable,
Expand Down Expand Up @@ -377,9 +377,7 @@ List<Split> getSplits(
private PaimonAndFlussCommittable toCommittable(
List<PaimonWrapperManifestCommittable> committables) {
Map<Identifier, List<ManifestCommittable>> paimonManifestCommittable = new HashMap<>();

Map<Long, Map<TableBucket, Long>> flussLogEndOffsetByTableId = new HashMap<>();

Map<Identifier, Long> tableIdByPaimonIdentifier = new HashMap<>();
Map<Long, String> partitionNameById = new HashMap<>();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,11 +85,10 @@ public interface TestObjectMBean {

/** Test MBean Object. */
public static class TestObject implements TestObjectMBean {
private final int foo = 1;

@Override
public int getFoo() {
return foo;
return 1;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,18 +24,19 @@

/** The metric group for clients. */
public class ClientMetricGroup extends AbstractMetricGroup {
private static final String name = "client";

private static final String NAME = "client";

private final String clientId;

public ClientMetricGroup(MetricRegistry registry, String clientId) {
super(registry, new String[] {name}, null);
super(registry, new String[] {NAME}, null);
this.clientId = clientId;
}

@Override
protected String getGroupName(CharacterFilter filter) {
return name;
return NAME;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,6 @@ public enum Errors {
INVALID_CONFIG_EXCEPTION(39, "The config is invalid.", InvalidConfigException::new),
LAKE_STORAGE_NOT_CONFIGURED_EXCEPTION(
40, "The lake storage is not configured.", LakeStorageNotConfiguredException::new);
;

private static final Logger LOG = LoggerFactory.getLogger(Errors.class);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -164,12 +164,13 @@ private void takeDBNativeSnapshot(@Nonnull File outputDirectory) throws Exceptio
Checkpoint snapshot = Checkpoint.create(db)) {
snapshot.createCheckpoint(outputDirectory.toString());
} catch (Exception ex) {
Exception exception = ex;
try {
FileUtils.deleteDirectory(outputDirectory);
} catch (IOException cleanupEx) {
ex = ExceptionUtils.firstOrSuppressed(cleanupEx, ex);
exception = ExceptionUtils.firstOrSuppressed(cleanupEx, exception);
}
throw ex;
throw exception;
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,9 @@
public class ArrowWalBuilder implements WalBuilder {

private final MemoryLogRecordsArrowBuilder recordsBuilder;
private final AbstractPagedOutputView outputView;

public ArrowWalBuilder(int schemaId, ArrowWriter writer, AbstractPagedOutputView outputView) {
this.recordsBuilder = MemoryLogRecordsArrowBuilder.builder(schemaId, writer, outputView);
this.outputView = outputView;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,23 +25,23 @@
/** The metric group for coordinator server. */
public class CoordinatorMetricGroup extends AbstractMetricGroup {

private static final String name = "coordinator";
private static final String NAME = "coordinator";

protected final String clusterId;
protected final String hostname;
protected final String serverId;

public CoordinatorMetricGroup(
MetricRegistry registry, String clusterId, String hostname, String serverId) {
super(registry, new String[] {clusterId, hostname, name}, null);
super(registry, new String[] {clusterId, hostname, NAME}, null);
this.clusterId = clusterId;
this.hostname = hostname;
this.serverId = serverId;
}

@Override
protected String getGroupName(CharacterFilter filter) {
return name;
return NAME;
}

@Override
Expand Down
Loading

0 comments on commit 2e96d7c

Please sign in to comment.