Skip to content

Commit

Permalink
[Feature] Allow adding dependencies and catalog in sql-gateway (#581)
Browse files Browse the repository at this point in the history
  • Loading branch information
LiuBodong authored Jul 27, 2023
1 parent a451ef0 commit acf1a83
Show file tree
Hide file tree
Showing 5 changed files with 200 additions and 26 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package cn.sliew.scaleph.api.controller.ws;

import cn.sliew.scaleph.engine.sql.gateway.dto.WsFlinkSqlGatewayCreateCatalogParamsDTO;
import cn.sliew.scaleph.engine.sql.gateway.dto.WsFlinkSqlGatewayQueryParamsDTO;
import cn.sliew.scaleph.engine.sql.gateway.dto.WsFlinkSqlGatewayQueryResultDTO;
import cn.sliew.scaleph.engine.sql.gateway.dto.catalog.CatalogInfo;
Expand All @@ -31,6 +32,7 @@
import org.springframework.web.bind.annotation.*;

import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;

Expand Down Expand Up @@ -70,8 +72,9 @@ public ResponseEntity<String> closeSession(
@Operation(summary = "获取所有Catalog信息", description = "获取所有Catalog信息")
public ResponseEntity<Set<CatalogInfo>> getCatalogInfo(
@PathVariable("clusterId") String clusterId,
@PathVariable("sessionHandleId") String sessionHandleId) {
return ResponseEntity.ok(wsFlinkSqlGatewayService.getCatalogInfo(clusterId, sessionHandleId));
@PathVariable("sessionHandleId") String sessionHandleId,
@RequestParam(value = "includeSystemFunctions", defaultValue = "false") boolean includeSystemFunctions) {
return ResponseEntity.ok(wsFlinkSqlGatewayService.getCatalogInfo(clusterId, sessionHandleId, includeSystemFunctions));
}


Expand Down Expand Up @@ -121,4 +124,24 @@ public ResponseEntity<List<String>> completeStatement(
}
}

@PostMapping("{clusterId}/{sessionHandleId}/addDependencies")
@Operation(summary = "添加依赖jar包", description = "添加依赖jar包")
public ResponseEntity<Boolean> addDependencies(
@PathVariable("clusterId") String clusterId,
@PathVariable("sessionHandleId") String sessionHandleId,
@RequestParam("jarIdList") List<Long> jarIdList
) {
return ResponseEntity.ok(wsFlinkSqlGatewayService.addDependencies(clusterId, sessionHandleId, jarIdList));
}

@PostMapping("{clusterId}/{sessionHandleId}/addCatalog")
@Operation(summary = "添加catalog", description = "添加catalog")
public ResponseEntity<Boolean> addCatalog(
@PathVariable("clusterId") String clusterId,
@PathVariable("sessionHandleId") String sessionHandleId,
@RequestBody WsFlinkSqlGatewayCreateCatalogParamsDTO params
) {
return ResponseEntity.ok(wsFlinkSqlGatewayService.addCatalog(clusterId, sessionHandleId, params.getCatalogName(), params.getOptions()));
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package cn.sliew.scaleph.engine.sql.gateway.dto;

import io.swagger.v3.oas.annotations.media.Schema;
import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.NoArgsConstructor;

import java.util.Map;

@Data
@EqualsAndHashCode
@Schema(name = "SqlGateway创建Catalog的参数", description = "SqlGateway创建Catalog的参数")
@NoArgsConstructor
public class WsFlinkSqlGatewayCreateCatalogParamsDTO {
private String catalogName;
private Map<String, String> options;
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.apache.flink.table.catalog.*;
import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
import org.apache.flink.table.data.StringData;
import org.apache.flink.table.factories.FactoryUtil;
import org.apache.flink.table.gateway.api.operation.OperationHandle;
import org.apache.flink.table.gateway.api.results.FetchOrientation;
import org.apache.flink.table.gateway.api.results.GatewayInfo;
Expand All @@ -37,7 +38,12 @@
import org.apache.flink.table.gateway.service.context.SessionContext;
import org.apache.flink.table.gateway.service.operation.OperationManager;
import org.apache.flink.table.module.ModuleManager;
import org.apache.flink.table.resource.ResourceType;
import org.apache.flink.table.resource.ResourceUri;

import java.io.IOException;
import java.net.URI;
import java.net.URLClassLoader;
import java.util.*;
import java.util.concurrent.*;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -158,7 +164,7 @@ private TableInfo getTableInfo(CatalogManager catalogManager, ObjectIdentifier t
return tableInfoBuilder.build();
}

public Set<CatalogInfo> getCatalogInfo(SessionHandle sessionHandle) {
public Set<CatalogInfo> getCatalogInfo(SessionHandle sessionHandle, boolean includeSystemFunctions) {
ScalephSqlGatewaySession session = getSession(sessionHandle);
SessionContext sessionContext = session.getSessionContext();
SessionContext.SessionState sessionState = sessionContext.getSessionState();
Expand All @@ -177,6 +183,9 @@ public Set<CatalogInfo> getCatalogInfo(SessionHandle sessionHandle) {
.stream()
.map(catalogName -> {
CatalogInfo.CatalogInfoBuilder catalogInfoBuilder = CatalogInfo.builder();
if (includeSystemFunctions) {
catalogInfoBuilder.systemFunctions(systemFunctions);
}
catalogInfoBuilder.catalogName(catalogName);
catalogManager.getCatalog(catalogName).ifPresent(catalog -> {
catalog.getFactory().ifPresent(factory -> {
Expand Down Expand Up @@ -254,4 +263,44 @@ public List<String> completeStatement(SessionHandle sessionHandle, String statem
.map(StringData::toString)
.collect(Collectors.toList());
}

public String executeStatement(SessionHandle sessionHandle, Map<String, String> configuration, String sql) {
SessionContext sessionContext = getSession(sessionHandle).getSessionContext();
Configuration sessionConf = new Configuration(sessionContext.getSessionConf());
configuration.forEach(sessionConf::setString);
return sessionContext.getOperationManager()
.submitOperation(handle ->
sessionContext.createOperationExecutor(sessionConf)
.executeStatement(handle, sql)
)
.getIdentifier()
.toString();
}

public void addDependencies(SessionHandle sessionHandle, List<URI> jars) {
List<ResourceUri> uris = jars.stream().map(uri -> new ResourceUri(ResourceType.JAR, uri.toString()))
.collect(Collectors.toList());
try {
getSession(sessionHandle)
.getSessionContext()
.getSessionState()
.resourceManager
.registerJarResources(uris);
} catch (IOException e) {
throw new RuntimeException(e);
}
}

public void addCatalog(SessionHandle sessionHandle, String catalogName, Map<String, String> options) {
SessionContext sessionContext = getSession(sessionHandle)
.getSessionContext();
Configuration sessionConf = sessionContext.getSessionConf();
SessionContext.SessionState sessionState = sessionContext
.getSessionState();
URLClassLoader userClassLoader = sessionState.resourceManager.getUserClassLoader();
Catalog catalog = FactoryUtil.createCatalog(catalogName, options, sessionConf, userClassLoader);
sessionState
.catalogManager
.registerCatalog(catalogName, catalog);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.apache.flink.table.gateway.api.results.ResultSet;

import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;

Expand Down Expand Up @@ -80,11 +81,12 @@ public interface WsFlinkSqlGatewayService {
/**
* List catalogs
*
* @param clusterId Flink K8S session cluster id
* @param sessionHandleId Session handler id
* @param clusterId Flink K8S session cluster id
* @param sessionHandleId Session handler id
* @param includeSystemFunctions Whether show system function of catalog
* @return Set of catalog informations
*/
Set<CatalogInfo> getCatalogInfo(String clusterId, String sessionHandleId);
Set<CatalogInfo> getCatalogInfo(String clusterId, String sessionHandleId, boolean includeSystemFunctions);

/**
* Close a session
Expand Down Expand Up @@ -140,4 +142,25 @@ ResultSet fetchResults(String clusterId, String sessionHandleId,
* @throws Exception
*/
List<String> completeStatement(String clusterId, String sessionId, String statement, int position) throws Exception;

/**
* Add dependency jars to the sql-gateway
*
* @param clusterId Flink K8S session cluster id
* @param sessionId Session hande id
* @param jarIdList List of jar ids
* @return true if success
*/
Boolean addDependencies(String clusterId, String sessionId, List<Long> jarIdList);

/**
* Add a catalog
*
* @param clusterId Flink K8S session cluster id
* @param sessionId Session hande id
* @param catalogName Catalog name
* @param options Catalog options
* @return true if success
*/
Boolean addCatalog(String clusterId, String sessionId, String catalogName, Map<String, String> options);
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,16 @@

package cn.sliew.scaleph.engine.sql.gateway.services.impl;

import cn.sliew.scaleph.common.util.SystemUtil;
import cn.sliew.scaleph.engine.sql.gateway.dto.WsFlinkSqlGatewayQueryParamsDTO;
import cn.sliew.scaleph.engine.sql.gateway.dto.catalog.CatalogInfo;
import cn.sliew.scaleph.engine.sql.gateway.internal.ScalephSqlGatewaySessionManager;
import cn.sliew.scaleph.engine.sql.gateway.services.WsFlinkSqlGatewayService;
import cn.sliew.scaleph.kubernetes.service.KubernetesService;
import cn.sliew.scaleph.resource.service.ClusterCredentialService;
import cn.sliew.scaleph.resource.service.JarService;
import cn.sliew.scaleph.resource.service.dto.ClusterCredentialDTO;
import cn.sliew.scaleph.resource.service.dto.JarDTO;
import lombok.extern.slf4j.Slf4j;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.DeploymentOptions;
Expand All @@ -38,17 +41,19 @@
import org.apache.flink.table.gateway.api.session.SessionHandle;
import org.apache.flink.table.gateway.rest.util.SqlGatewayRestAPIVersion;
import org.apache.flink.table.gateway.service.context.DefaultContext;
import org.apache.flink.table.gateway.service.context.SessionContext;
import org.apache.flink.table.gateway.service.session.SessionManager;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.util.StringUtils;

import java.lang.reflect.Field;
import java.io.IOException;
import java.io.OutputStream;
import java.net.URI;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Supplier;
import java.util.stream.Collectors;

@Slf4j
@Service
Expand All @@ -62,12 +67,15 @@ public class WsFlinkSqlGatewayServiceImpl implements WsFlinkSqlGatewayService {
new ConcurrentHashMap<>();
private KubernetesService kubernetesService;
private ClusterCredentialService clusterCredentialService;
private JarService jarService;

@Autowired
public WsFlinkSqlGatewayServiceImpl(KubernetesService kubernetesService,
ClusterCredentialService clusterCredentialService) {
ClusterCredentialService clusterCredentialService,
JarService jarService) {
this.kubernetesService = kubernetesService;
this.clusterCredentialService = clusterCredentialService;
this.jarService = jarService;
}

/**
Expand Down Expand Up @@ -169,10 +177,10 @@ public String openSession(String clusterId) {
* @return
*/
@Override
public Set<CatalogInfo> getCatalogInfo(String clusterId, String sessionHandleId) {
public Set<CatalogInfo> getCatalogInfo(String clusterId, String sessionHandleId, boolean includeSystemFunctions) {
SessionHandle sessionHandle = new SessionHandle(UUID.fromString(sessionHandleId));
return getSessionManager(clusterId).orElseThrow()
.getCatalogInfo(sessionHandle);
.getCatalogInfo(sessionHandle, includeSystemFunctions);
}

/**
Expand Down Expand Up @@ -201,20 +209,9 @@ public String closeSession(String clusterId, String sessionHandleId) {
@Override
public String executeSql(String clusterId, String sessionHandleId, WsFlinkSqlGatewayQueryParamsDTO params) {
SessionHandle sessionHandle = new SessionHandle(UUID.fromString(sessionHandleId));
Configuration configuration = GlobalConfiguration.loadConfiguration();
params.getConfiguration().forEach(configuration::setString);
SessionContext sessionContext = getSessionManager(clusterId).orElseThrow()
.getSession(sessionHandle)
.getSessionContext();
Configuration sessionConf = sessionContext.getSessionConf();
params.getConfiguration().forEach(sessionConf::setString);
return sessionContext.getOperationManager()
.submitOperation(handle ->
sessionContext.createOperationExecutor(sessionConf)
.executeStatement(handle, params.getSql())
)
.getIdentifier()
.toString();
return getSessionManager(clusterId)
.orElseThrow()
.executeStatement(sessionHandle, params.getConfiguration(), params.getSql());
}

/**
Expand Down Expand Up @@ -259,4 +256,51 @@ public List<String> completeStatement(String clusterId, String sessionId, String
return getSessionManager(clusterId).orElseThrow()
.completeStatement(new SessionHandle(UUID.fromString(sessionId)), statement, position);
}

@Override
public Boolean addDependencies(String clusterId, String sessionId, List<Long> jarIdList) {
try {
List<URI> jars = jarIdList.stream().map(jarId -> {
JarDTO jarDTO = jarService.getRaw(jarId);
try {
Path localPath = SystemUtil.getLocalStorageDir().resolve("jars");
if (Files.notExists(localPath)) {
Files.createDirectories(localPath);
}
Path fileName = localPath.resolve(jarDTO.getFileName());
if (Files.notExists(fileName)) {
try (OutputStream os = Files.newOutputStream(fileName)) {
jarService.download(jarId, os);
}
} else {
log.info("Jar file " + fileName + " already exists!");
}
return fileName;
} catch (IOException e) {
throw new RuntimeException(e);
}
}).map(Path::toUri)
.collect(Collectors.toList());
getSessionManager(clusterId)
.orElseThrow()
.addDependencies(new SessionHandle(UUID.fromString(sessionId)), jars);
return true;
} catch (Exception e) {
log.error(e.getLocalizedMessage(), e);
return false;
}
}

@Override
public Boolean addCatalog(String clusterId, String sessionId, String catalogName, Map<String, String> options) {
try {
getSessionManager(clusterId).orElseThrow()
.addCatalog(new SessionHandle(UUID.fromString(sessionId)),
catalogName, options);
return true;
} catch (Exception e) {
log.error(e.getLocalizedMessage(), e);
return false;
}
}
}

0 comments on commit acf1a83

Please sign in to comment.