Skip to content

Commit

Permalink
[Bugfix] Fix some bugs (#591)
Browse files Browse the repository at this point in the history
1. Sql gateway query error.
2. FileFetcherHandler may cause NPE.
3. Reduce Docker image size.
  • Loading branch information
LiuBodong authored Aug 8, 2023
1 parent ce97971 commit 6faad5a
Show file tree
Hide file tree
Showing 3 changed files with 24 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -129,9 +129,11 @@ private void addAdditionalJars(WsFlinkKubernetesJobDTO jobDTO, List<FileFetcherP
}

private void doAddAdditionalJars(List<Long> additionalDependencies, List<FileFetcherParam> result) {
for (Long jarId : additionalDependencies) {
JarDTO jarDTO = jarService.getRaw(jarId);
result.add(new FileFetcherParam(jarDTO.getPath(), ResourceNames.LIB_DIRECTORY + jarDTO.getFileName()));
if (!CollectionUtils.isEmpty(additionalDependencies) && result != null) {
for (Long jarId : additionalDependencies) {
JarDTO jarDTO = jarService.getRaw(jarId);
result.add(new FileFetcherParam(jarDTO.getPath(), ResourceNames.LIB_DIRECTORY + jarDTO.getFileName()));
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import io.swagger.v3.oas.annotations.media.Schema;
import lombok.*;
import org.apache.commons.codec.binary.Hex;
import org.apache.flink.table.catalog.Column;
import org.apache.flink.table.data.ArrayData;
import org.apache.flink.table.data.MapData;
Expand Down Expand Up @@ -80,38 +81,39 @@ public static WsFlinkSqlGatewayQueryResultDTO fromResultSet(ResultSet resultSet)
* @throws Exception Reflect exceptions
*/
private static Object getDataFromRow(Object rowData, LogicalType logicalType, int index) throws Exception {
if (!(rowData instanceof RowData) || ! (rowData instanceof ArrayData)) {
throw new IllegalArgumentException(rowData.getClass() + " is not supported row data type!");
}
Class<?> dataClass = rowData.getClass();
if (!RowData.class.isAssignableFrom(dataClass) && !ArrayData.class.isAssignableFrom(dataClass)) {
throw new IllegalArgumentException(dataClass + " is not supported row data type!");
}
switch (logicalType.getTypeRoot()) {
case VARCHAR:
case CHAR:
return dataClass.getDeclaredMethod("getString", Integer.class).invoke(rowData, index);
return dataClass.getDeclaredMethod("getString", int.class).invoke(rowData, index).toString();
case TINYINT:
case SMALLINT:
return dataClass.getDeclaredMethod("getShort", Integer.class).invoke(rowData, index);
return dataClass.getDeclaredMethod("getShort", int.class).invoke(rowData, index);
case INTEGER:
return dataClass.getDeclaredMethod("getInt", Integer.class).invoke(rowData, index);
return dataClass.getDeclaredMethod("getInt", int.class).invoke(rowData, index);
case FLOAT:
return dataClass.getDeclaredMethod("getFloat", Integer.class).invoke(rowData, index);
return dataClass.getDeclaredMethod("getFloat", int.class).invoke(rowData, index);
case DOUBLE:
return dataClass.getDeclaredMethod("getDouble", Integer.class).invoke(rowData, index);
return dataClass.getDeclaredMethod("getDouble", int.class).invoke(rowData, index);
case DECIMAL:
DecimalType decimalType = (DecimalType) logicalType;
return dataClass.getDeclaredMethod("getDecimal", Integer.class, Integer.class, Integer.class)
return dataClass.getDeclaredMethod("getDecimal", int.class, int.class, int.class)
.invoke(rowData, index, decimalType.getPrecision(), decimalType.getScale());
case BIGINT:
return dataClass.getDeclaredMethod("getLong", Integer.class).invoke(rowData, index);
return dataClass.getDeclaredMethod("getLong", int.class).invoke(rowData, index);
case BOOLEAN:
return dataClass.getDeclaredMethod("getBoolean", Integer.class).invoke(rowData, index);
return dataClass.getDeclaredMethod("getBoolean", int.class).invoke(rowData, index);
case NULL:
return null;
case BINARY:
return dataClass.getDeclaredMethod("getBinary", Integer.class).invoke(rowData, index);
byte[] binary = (byte[]) dataClass.getDeclaredMethod("getBinary", int.class).invoke(rowData, index);
return Hex.encodeHexString(binary);
case ROW:
RowType rowType = (RowType) logicalType;
RowData row = (RowData) dataClass.getDeclaredMethod("getRow", Integer.class, Integer.class).invoke(rowData, index, rowType.getFieldCount());
RowData row = (RowData) dataClass.getDeclaredMethod("getRow", int.class, int.class).invoke(rowData, index, rowType.getFieldCount());
Map<String, Object> mapInRow = new HashMap<>();
for (RowType.RowField rowField : rowType.getFields()) {
String fieldName = rowField.getName();
Expand All @@ -123,7 +125,7 @@ private static Object getDataFromRow(Object rowData, LogicalType logicalType, in
MapType mapType = (MapType) logicalType;
LogicalType keyValueType = mapType.getKeyType();
LogicalType valueValueType = mapType.getValueType();
MapData mapData = (MapData) dataClass.getDeclaredMethod("getMap", Integer.class).invoke(rowData, index);
MapData mapData = (MapData) dataClass.getDeclaredMethod("getMap", int.class).invoke(rowData, index);
ArrayData keyArray = mapData.keyArray();
ArrayData valueArray = mapData.valueArray();
Map<Object, Object> mapInMap = new HashMap<>();
Expand All @@ -135,7 +137,7 @@ private static Object getDataFromRow(Object rowData, LogicalType logicalType, in
return mapInMap;
case ARRAY:
ArrayType arrayType = (ArrayType) logicalType;
ArrayData arrayData = (ArrayData) dataClass.getDeclaredMethod("getArray", Integer.class).invoke(rowData, index);
ArrayData arrayData = (ArrayData) dataClass.getDeclaredMethod("getArray", int.class).invoke(rowData, index);
LogicalType elementType = arrayType.getElementType();
List<Object> list = new ArrayList<>();
for (int i = 0; i < arrayData.size(); i++) {
Expand All @@ -147,7 +149,7 @@ private static Object getDataFromRow(Object rowData, LogicalType logicalType, in
case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
case TIMESTAMP_WITH_TIME_ZONE:
case TIMESTAMP_WITHOUT_TIME_ZONE:
return dataClass.getDeclaredMethod("getTimestamp", Integer.class, Integer.class).invoke(rowData, index, ((TimeType) logicalType).getPrecision());
return dataClass.getDeclaredMethod("getTimestamp", int.class, int.class).invoke(rowData, index, ((TimeType) logicalType).getPrecision());
default:
throw new IllegalArgumentException("DataType: " + logicalType + " not supported now!");
}
Expand Down
4 changes: 1 addition & 3 deletions tools/docker/build/scaleph-file-fetcher/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,7 @@ ENV SCALEPH_HOME /opt/scaleph-file-fetcher
RUN mkdir -p $SCALEPH_HOME
WORKDIR $SCALEPH_HOME

COPY ./scaleph-file-fetcher/target/scaleph-file-fetcher*.tar.gz $SCALEPH_HOME/scaleph-file-fetcher.tar.gz
RUN tar -zxf $SCALEPH_HOME/scaleph-file-fetcher.tar.gz --strip-components 1 -C $SCALEPH_HOME
RUN rm $SCALEPH_HOME/scaleph-file-fetcher.tar.gz
ADD ./scaleph-file-fetcher/target/scaleph-file-fetcher*.tar.gz /opt/

ENV CLASSPATH $CLASSPATH:$SCALEPH_HOME/libs/*:$SCALEPH_HOME/conf/*

Expand Down

0 comments on commit 6faad5a

Please sign in to comment.