diff --git a/scaleph-api/src/main/java/cn/sliew/scaleph/api/controller/ws/WsFlinkSqlGatewayController.java b/scaleph-api/src/main/java/cn/sliew/scaleph/api/controller/ws/WsFlinkSqlGatewayController.java index 032f63503..9f003fbfc 100644 --- a/scaleph-api/src/main/java/cn/sliew/scaleph/api/controller/ws/WsFlinkSqlGatewayController.java +++ b/scaleph-api/src/main/java/cn/sliew/scaleph/api/controller/ws/WsFlinkSqlGatewayController.java @@ -18,6 +18,7 @@ package cn.sliew.scaleph.api.controller.ws; +import cn.sliew.scaleph.engine.sql.gateway.dto.WsFlinkSqlGatewayCreateCatalogParamsDTO; import cn.sliew.scaleph.engine.sql.gateway.dto.WsFlinkSqlGatewayQueryParamsDTO; import cn.sliew.scaleph.engine.sql.gateway.dto.WsFlinkSqlGatewayQueryResultDTO; import cn.sliew.scaleph.engine.sql.gateway.dto.catalog.CatalogInfo; @@ -31,6 +32,7 @@ import org.springframework.web.bind.annotation.*; import java.util.List; +import java.util.Map; import java.util.Optional; import java.util.Set; @@ -70,8 +72,9 @@ public ResponseEntity closeSession( @Operation(summary = "获取所有Catalog信息", description = "获取所有Catalog信息") public ResponseEntity> getCatalogInfo( @PathVariable("clusterId") String clusterId, - @PathVariable("sessionHandleId") String sessionHandleId) { - return ResponseEntity.ok(wsFlinkSqlGatewayService.getCatalogInfo(clusterId, sessionHandleId)); + @PathVariable("sessionHandleId") String sessionHandleId, + @RequestParam(value = "includeSystemFunctions", defaultValue = "false") boolean includeSystemFunctions) { + return ResponseEntity.ok(wsFlinkSqlGatewayService.getCatalogInfo(clusterId, sessionHandleId, includeSystemFunctions)); } @@ -121,4 +124,24 @@ public ResponseEntity> completeStatement( } } + @PostMapping("{clusterId}/{sessionHandleId}/addDependencies") + @Operation(summary = "添加依赖jar包", description = "添加依赖jar包") + public ResponseEntity addDependencies( + @PathVariable("clusterId") String clusterId, + @PathVariable("sessionHandleId") String sessionHandleId, + @RequestParam("jarIdList") List jarIdList + ) { + return ResponseEntity.ok(wsFlinkSqlGatewayService.addDependencies(clusterId, sessionHandleId, jarIdList)); + } + + @PostMapping("{clusterId}/{sessionHandleId}/addCatalog") + @Operation(summary = "添加catalog", description = "添加catalog") + public ResponseEntity addCatalog( + @PathVariable("clusterId") String clusterId, + @PathVariable("sessionHandleId") String sessionHandleId, + @RequestBody WsFlinkSqlGatewayCreateCatalogParamsDTO params + ) { + return ResponseEntity.ok(wsFlinkSqlGatewayService.addCatalog(clusterId, sessionHandleId, params.getCatalogName(), params.getOptions())); + } + } diff --git a/scaleph-engine/scaleph-engine-sql-gateway/src/main/java/cn/sliew/scaleph/engine/sql/gateway/dto/WsFlinkSqlGatewayCreateCatalogParamsDTO.java b/scaleph-engine/scaleph-engine-sql-gateway/src/main/java/cn/sliew/scaleph/engine/sql/gateway/dto/WsFlinkSqlGatewayCreateCatalogParamsDTO.java new file mode 100644 index 000000000..9bd87e284 --- /dev/null +++ b/scaleph-engine/scaleph-engine-sql-gateway/src/main/java/cn/sliew/scaleph/engine/sql/gateway/dto/WsFlinkSqlGatewayCreateCatalogParamsDTO.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cn.sliew.scaleph.engine.sql.gateway.dto; + +import io.swagger.v3.oas.annotations.media.Schema; +import lombok.Data; +import lombok.EqualsAndHashCode; +import lombok.NoArgsConstructor; + +import java.util.Map; + +@Data +@EqualsAndHashCode +@Schema(name = "SqlGateway创建Catalog的参数", description = "SqlGateway创建Catalog的参数") +@NoArgsConstructor +public class WsFlinkSqlGatewayCreateCatalogParamsDTO { + private String catalogName; + private Map options; +} diff --git a/scaleph-engine/scaleph-engine-sql-gateway/src/main/java/cn/sliew/scaleph/engine/sql/gateway/internal/ScalephSqlGatewaySessionManager.java b/scaleph-engine/scaleph-engine-sql-gateway/src/main/java/cn/sliew/scaleph/engine/sql/gateway/internal/ScalephSqlGatewaySessionManager.java index a01900ad1..a589bc070 100644 --- a/scaleph-engine/scaleph-engine-sql-gateway/src/main/java/cn/sliew/scaleph/engine/sql/gateway/internal/ScalephSqlGatewaySessionManager.java +++ b/scaleph-engine/scaleph-engine-sql-gateway/src/main/java/cn/sliew/scaleph/engine/sql/gateway/internal/ScalephSqlGatewaySessionManager.java @@ -25,6 +25,7 @@ import org.apache.flink.table.catalog.*; import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException; import org.apache.flink.table.data.StringData; +import org.apache.flink.table.factories.FactoryUtil; import org.apache.flink.table.gateway.api.operation.OperationHandle; import org.apache.flink.table.gateway.api.results.FetchOrientation; import org.apache.flink.table.gateway.api.results.GatewayInfo; @@ -37,7 +38,12 @@ import org.apache.flink.table.gateway.service.context.SessionContext; import org.apache.flink.table.gateway.service.operation.OperationManager; import org.apache.flink.table.module.ModuleManager; +import org.apache.flink.table.resource.ResourceType; +import org.apache.flink.table.resource.ResourceUri; +import java.io.IOException; +import java.net.URI; +import java.net.URLClassLoader; import java.util.*; import java.util.concurrent.*; import java.util.stream.Collectors; @@ -158,7 +164,7 @@ private TableInfo getTableInfo(CatalogManager catalogManager, ObjectIdentifier t return tableInfoBuilder.build(); } - public Set getCatalogInfo(SessionHandle sessionHandle) { + public Set getCatalogInfo(SessionHandle sessionHandle, boolean includeSystemFunctions) { ScalephSqlGatewaySession session = getSession(sessionHandle); SessionContext sessionContext = session.getSessionContext(); SessionContext.SessionState sessionState = sessionContext.getSessionState(); @@ -177,6 +183,9 @@ public Set getCatalogInfo(SessionHandle sessionHandle) { .stream() .map(catalogName -> { CatalogInfo.CatalogInfoBuilder catalogInfoBuilder = CatalogInfo.builder(); + if (includeSystemFunctions) { + catalogInfoBuilder.systemFunctions(systemFunctions); + } catalogInfoBuilder.catalogName(catalogName); catalogManager.getCatalog(catalogName).ifPresent(catalog -> { catalog.getFactory().ifPresent(factory -> { @@ -254,4 +263,44 @@ public List completeStatement(SessionHandle sessionHandle, String statem .map(StringData::toString) .collect(Collectors.toList()); } + + public String executeStatement(SessionHandle sessionHandle, Map configuration, String sql) { + SessionContext sessionContext = getSession(sessionHandle).getSessionContext(); + Configuration sessionConf = new Configuration(sessionContext.getSessionConf()); + configuration.forEach(sessionConf::setString); + return sessionContext.getOperationManager() + .submitOperation(handle -> + sessionContext.createOperationExecutor(sessionConf) + .executeStatement(handle, sql) + ) + .getIdentifier() + .toString(); + } + + public void addDependencies(SessionHandle sessionHandle, List jars) { + List uris = jars.stream().map(uri -> new ResourceUri(ResourceType.JAR, uri.toString())) + .collect(Collectors.toList()); + try { + getSession(sessionHandle) + .getSessionContext() + .getSessionState() + .resourceManager + .registerJarResources(uris); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + public void addCatalog(SessionHandle sessionHandle, String catalogName, Map options) { + SessionContext sessionContext = getSession(sessionHandle) + .getSessionContext(); + Configuration sessionConf = sessionContext.getSessionConf(); + SessionContext.SessionState sessionState = sessionContext + .getSessionState(); + URLClassLoader userClassLoader = sessionState.resourceManager.getUserClassLoader(); + Catalog catalog = FactoryUtil.createCatalog(catalogName, options, sessionConf, userClassLoader); + sessionState + .catalogManager + .registerCatalog(catalogName, catalog); + } } diff --git a/scaleph-engine/scaleph-engine-sql-gateway/src/main/java/cn/sliew/scaleph/engine/sql/gateway/services/WsFlinkSqlGatewayService.java b/scaleph-engine/scaleph-engine-sql-gateway/src/main/java/cn/sliew/scaleph/engine/sql/gateway/services/WsFlinkSqlGatewayService.java index 51586200f..35c8ef47a 100644 --- a/scaleph-engine/scaleph-engine-sql-gateway/src/main/java/cn/sliew/scaleph/engine/sql/gateway/services/WsFlinkSqlGatewayService.java +++ b/scaleph-engine/scaleph-engine-sql-gateway/src/main/java/cn/sliew/scaleph/engine/sql/gateway/services/WsFlinkSqlGatewayService.java @@ -25,6 +25,7 @@ import org.apache.flink.table.gateway.api.results.ResultSet; import java.util.List; +import java.util.Map; import java.util.Optional; import java.util.Set; @@ -80,11 +81,12 @@ public interface WsFlinkSqlGatewayService { /** * List catalogs * - * @param clusterId Flink K8S session cluster id - * @param sessionHandleId Session handler id + * @param clusterId Flink K8S session cluster id + * @param sessionHandleId Session handler id + * @param includeSystemFunctions Whether show system function of catalog * @return Set of catalog informations */ - Set getCatalogInfo(String clusterId, String sessionHandleId); + Set getCatalogInfo(String clusterId, String sessionHandleId, boolean includeSystemFunctions); /** * Close a session @@ -140,4 +142,25 @@ ResultSet fetchResults(String clusterId, String sessionHandleId, * @throws Exception */ List completeStatement(String clusterId, String sessionId, String statement, int position) throws Exception; + + /** + * Add dependency jars to the sql-gateway + * + * @param clusterId Flink K8S session cluster id + * @param sessionId Session hande id + * @param jarIdList List of jar ids + * @return true if success + */ + Boolean addDependencies(String clusterId, String sessionId, List jarIdList); + + /** + * Add a catalog + * + * @param clusterId Flink K8S session cluster id + * @param sessionId Session hande id + * @param catalogName Catalog name + * @param options Catalog options + * @return true if success + */ + Boolean addCatalog(String clusterId, String sessionId, String catalogName, Map options); } diff --git a/scaleph-engine/scaleph-engine-sql-gateway/src/main/java/cn/sliew/scaleph/engine/sql/gateway/services/impl/WsFlinkSqlGatewayServiceImpl.java b/scaleph-engine/scaleph-engine-sql-gateway/src/main/java/cn/sliew/scaleph/engine/sql/gateway/services/impl/WsFlinkSqlGatewayServiceImpl.java index 2575598af..502923574 100644 --- a/scaleph-engine/scaleph-engine-sql-gateway/src/main/java/cn/sliew/scaleph/engine/sql/gateway/services/impl/WsFlinkSqlGatewayServiceImpl.java +++ b/scaleph-engine/scaleph-engine-sql-gateway/src/main/java/cn/sliew/scaleph/engine/sql/gateway/services/impl/WsFlinkSqlGatewayServiceImpl.java @@ -18,13 +18,16 @@ package cn.sliew.scaleph.engine.sql.gateway.services.impl; +import cn.sliew.scaleph.common.util.SystemUtil; import cn.sliew.scaleph.engine.sql.gateway.dto.WsFlinkSqlGatewayQueryParamsDTO; import cn.sliew.scaleph.engine.sql.gateway.dto.catalog.CatalogInfo; import cn.sliew.scaleph.engine.sql.gateway.internal.ScalephSqlGatewaySessionManager; import cn.sliew.scaleph.engine.sql.gateway.services.WsFlinkSqlGatewayService; import cn.sliew.scaleph.kubernetes.service.KubernetesService; import cn.sliew.scaleph.resource.service.ClusterCredentialService; +import cn.sliew.scaleph.resource.service.JarService; import cn.sliew.scaleph.resource.service.dto.ClusterCredentialDTO; +import cn.sliew.scaleph.resource.service.dto.JarDTO; import lombok.extern.slf4j.Slf4j; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.DeploymentOptions; @@ -38,17 +41,19 @@ import org.apache.flink.table.gateway.api.session.SessionHandle; import org.apache.flink.table.gateway.rest.util.SqlGatewayRestAPIVersion; import org.apache.flink.table.gateway.service.context.DefaultContext; -import org.apache.flink.table.gateway.service.context.SessionContext; -import org.apache.flink.table.gateway.service.session.SessionManager; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; import org.springframework.util.StringUtils; -import java.lang.reflect.Field; +import java.io.IOException; +import java.io.OutputStream; +import java.net.URI; +import java.nio.file.Files; import java.nio.file.Path; import java.util.*; import java.util.concurrent.ConcurrentHashMap; import java.util.function.Supplier; +import java.util.stream.Collectors; @Slf4j @Service @@ -62,12 +67,15 @@ public class WsFlinkSqlGatewayServiceImpl implements WsFlinkSqlGatewayService { new ConcurrentHashMap<>(); private KubernetesService kubernetesService; private ClusterCredentialService clusterCredentialService; + private JarService jarService; @Autowired public WsFlinkSqlGatewayServiceImpl(KubernetesService kubernetesService, - ClusterCredentialService clusterCredentialService) { + ClusterCredentialService clusterCredentialService, + JarService jarService) { this.kubernetesService = kubernetesService; this.clusterCredentialService = clusterCredentialService; + this.jarService = jarService; } /** @@ -169,10 +177,10 @@ public String openSession(String clusterId) { * @return */ @Override - public Set getCatalogInfo(String clusterId, String sessionHandleId) { + public Set getCatalogInfo(String clusterId, String sessionHandleId, boolean includeSystemFunctions) { SessionHandle sessionHandle = new SessionHandle(UUID.fromString(sessionHandleId)); return getSessionManager(clusterId).orElseThrow() - .getCatalogInfo(sessionHandle); + .getCatalogInfo(sessionHandle, includeSystemFunctions); } /** @@ -201,20 +209,9 @@ public String closeSession(String clusterId, String sessionHandleId) { @Override public String executeSql(String clusterId, String sessionHandleId, WsFlinkSqlGatewayQueryParamsDTO params) { SessionHandle sessionHandle = new SessionHandle(UUID.fromString(sessionHandleId)); - Configuration configuration = GlobalConfiguration.loadConfiguration(); - params.getConfiguration().forEach(configuration::setString); - SessionContext sessionContext = getSessionManager(clusterId).orElseThrow() - .getSession(sessionHandle) - .getSessionContext(); - Configuration sessionConf = sessionContext.getSessionConf(); - params.getConfiguration().forEach(sessionConf::setString); - return sessionContext.getOperationManager() - .submitOperation(handle -> - sessionContext.createOperationExecutor(sessionConf) - .executeStatement(handle, params.getSql()) - ) - .getIdentifier() - .toString(); + return getSessionManager(clusterId) + .orElseThrow() + .executeStatement(sessionHandle, params.getConfiguration(), params.getSql()); } /** @@ -259,4 +256,51 @@ public List completeStatement(String clusterId, String sessionId, String return getSessionManager(clusterId).orElseThrow() .completeStatement(new SessionHandle(UUID.fromString(sessionId)), statement, position); } + + @Override + public Boolean addDependencies(String clusterId, String sessionId, List jarIdList) { + try { + List jars = jarIdList.stream().map(jarId -> { + JarDTO jarDTO = jarService.getRaw(jarId); + try { + Path localPath = SystemUtil.getLocalStorageDir().resolve("jars"); + if (Files.notExists(localPath)) { + Files.createDirectories(localPath); + } + Path fileName = localPath.resolve(jarDTO.getFileName()); + if (Files.notExists(fileName)) { + try (OutputStream os = Files.newOutputStream(fileName)) { + jarService.download(jarId, os); + } + } else { + log.info("Jar file " + fileName + " already exists!"); + } + return fileName; + } catch (IOException e) { + throw new RuntimeException(e); + } + }).map(Path::toUri) + .collect(Collectors.toList()); + getSessionManager(clusterId) + .orElseThrow() + .addDependencies(new SessionHandle(UUID.fromString(sessionId)), jars); + return true; + } catch (Exception e) { + log.error(e.getLocalizedMessage(), e); + return false; + } + } + + @Override + public Boolean addCatalog(String clusterId, String sessionId, String catalogName, Map options) { + try { + getSessionManager(clusterId).orElseThrow() + .addCatalog(new SessionHandle(UUID.fromString(sessionId)), + catalogName, options); + return true; + } catch (Exception e) { + log.error(e.getLocalizedMessage(), e); + return false; + } + } }