Skip to content

Commit

Permalink
[Bugfix] Fix sql-gateway query result error (#592)
Browse files Browse the repository at this point in the history
  • Loading branch information
LiuBodong authored Aug 8, 2023
1 parent 6faad5a commit 6ac4db0
Show file tree
Hide file tree
Showing 3 changed files with 31 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ public ResponseEntity<WsFlinkSqlGatewayQueryResultDTO> fetchResults(@PathVariabl
WsFlinkSqlGatewayQueryResultDTO wsFlinkSqlGatewayQueryResultDTO = WsFlinkSqlGatewayQueryResultDTO.fromResultSet(resultSet);
return ResponseEntity.ok(wsFlinkSqlGatewayQueryResultDTO);
} catch (Exception e) {
return ResponseEntity.of(Optional.empty());
throw new IllegalArgumentException(e);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,25 @@
package cn.sliew.scaleph.engine.sql.gateway.dto;

import io.swagger.v3.oas.annotations.media.Schema;
import lombok.*;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.NoArgsConstructor;
import org.apache.commons.codec.binary.Hex;
import org.apache.flink.table.catalog.Column;
import org.apache.flink.table.data.ArrayData;
import org.apache.flink.table.data.MapData;
import org.apache.flink.table.data.RawValueData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.gateway.api.results.ResultSet;
import org.apache.flink.table.types.logical.*;
import org.apache.flink.table.types.logical.ArrayType;
import org.apache.flink.table.types.logical.DecimalType;
import org.apache.flink.table.types.logical.DistinctType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.MapType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.table.types.logical.TimestampType;

import java.util.ArrayList;
import java.util.HashMap;
Expand Down Expand Up @@ -93,6 +104,9 @@ private static Object getDataFromRow(Object rowData, LogicalType logicalType, in
case SMALLINT:
return dataClass.getDeclaredMethod("getShort", int.class).invoke(rowData, index);
case INTEGER:
case DATE:
case TIME_WITHOUT_TIME_ZONE:
case INTERVAL_YEAR_MONTH:
return dataClass.getDeclaredMethod("getInt", int.class).invoke(rowData, index);
case FLOAT:
return dataClass.getDeclaredMethod("getFloat", int.class).invoke(rowData, index);
Expand All @@ -103,15 +117,18 @@ private static Object getDataFromRow(Object rowData, LogicalType logicalType, in
return dataClass.getDeclaredMethod("getDecimal", int.class, int.class, int.class)
.invoke(rowData, index, decimalType.getPrecision(), decimalType.getScale());
case BIGINT:
case INTERVAL_DAY_TIME:
return dataClass.getDeclaredMethod("getLong", int.class).invoke(rowData, index);
case BOOLEAN:
return dataClass.getDeclaredMethod("getBoolean", int.class).invoke(rowData, index);
case NULL:
return null;
case BINARY:
case VARBINARY:
byte[] binary = (byte[]) dataClass.getDeclaredMethod("getBinary", int.class).invoke(rowData, index);
return Hex.encodeHexString(binary);
case ROW:
case STRUCTURED_TYPE:
RowType rowType = (RowType) logicalType;
RowData row = (RowData) dataClass.getDeclaredMethod("getRow", int.class, int.class).invoke(rowData, index, rowType.getFieldCount());
Map<String, Object> mapInRow = new HashMap<>();
Expand All @@ -122,6 +139,7 @@ private static Object getDataFromRow(Object rowData, LogicalType logicalType, in
}
return mapInRow;
case MAP:
case MULTISET:
MapType mapType = (MapType) logicalType;
LogicalType keyValueType = mapType.getKeyType();
LogicalType valueValueType = mapType.getValueType();
Expand All @@ -144,12 +162,17 @@ private static Object getDataFromRow(Object rowData, LogicalType logicalType, in
list.add(getDataFromRow(arrayData, elementType, i));
}
return list;
case DATE:
case TIME_WITHOUT_TIME_ZONE:
case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
case TIMESTAMP_WITH_TIME_ZONE:
case TIMESTAMP_WITHOUT_TIME_ZONE:
return dataClass.getDeclaredMethod("getTimestamp", int.class, int.class).invoke(rowData, index, ((TimeType) logicalType).getPrecision());
return dataClass.getDeclaredMethod("getTimestamp", int.class, int.class).invoke(rowData, index, ((TimestampType) logicalType).getPrecision());
case DISTINCT_TYPE:
DistinctType distinctType = (DistinctType) logicalType;
LogicalType sourceType = distinctType.getSourceType();
return getDataFromRow(rowData, sourceType, index);
case RAW:
case TIMESTAMP_WITH_TIME_ZONE:
case SYMBOL:
case UNRESOLVED:
default:
throw new IllegalArgumentException("DataType: " + logicalType + " not supported now!");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ private TableInfo getTableInfo(CatalogManager catalogManager, ObjectIdentifier t
List<ColumnInfo> columns = schema.getColumns().stream().map(column -> {
ColumnInfo.ColumnInfoBuilder columnInfoBuilder = ColumnInfo.builder()
.columnName(column.getName())
.dataType(column.getDataType().toString())
.dataType(column.getDataType().getLogicalType().toString())
.isPersist(column.isPersisted())
.isPhysical(column.isPhysical());
column.getComment().ifPresent(columnInfoBuilder::comment);
Expand Down

0 comments on commit 6ac4db0

Please sign in to comment.