Skip to content

Commit

Permalink
Add SFTP connection
Browse files Browse the repository at this point in the history
  • Loading branch information
Chris Li committed Aug 12, 2021
1 parent 4a14042 commit b5dc5c0
Show file tree
Hide file tree
Showing 8 changed files with 511 additions and 2 deletions.
188 changes: 188 additions & 0 deletions cdi-core/src/main/java/com/linkedin/cdi/connection/SftpConnection.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,188 @@
// Copyright 2021 LinkedIn Corporation. All rights reserved.
// Licensed under the BSD-2 Clause license.
// See LICENSE in the project root for license information.

package com.linkedin.cdi.connection;

import com.linkedin.cdi.configuration.MultistageProperties;
import com.linkedin.cdi.exception.RetriableAuthenticationException;
import com.linkedin.cdi.factory.ConnectionClientFactory;
import com.linkedin.cdi.factory.DefaultConnectionClientFactory;
import com.linkedin.cdi.factory.sftp.SftpClient;
import com.linkedin.cdi.keys.ExtractorKeys;
import com.linkedin.cdi.keys.HttpKeys;
import com.linkedin.cdi.keys.JobKeys;
import com.linkedin.cdi.keys.SftpKeys;
import com.linkedin.cdi.util.InputStreamUtils;
import com.linkedin.cdi.util.WorkUnitStatus;
import java.io.File;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang.StringUtils;
import org.apache.gobblin.configuration.State;
import org.apache.gobblin.source.extractor.filebased.FileBasedHelperException;
import org.apache.gobblin.source.extractor.filebased.TimestampAwareFileBasedHelper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;


/**
 * A {@link MultistageConnection} that lists files on, or downloads a file from, an SFTP server.
 *
 * <p>The SFTP client is created lazily through the connection client factory named by
 * {@code MultistageProperties.MSTAGE_CONNECTION_CLIENT_FACTORY} and is released by
 * {@link #closeAll(String)}.
 */
@Slf4j
public class SftpConnection extends MultistageConnection {
  private final SftpKeys sftpSourceKeys;
  SftpClient fsClient;

  /**
   * @param state the execution state, also used to build the SFTP client
   * @param jobKeys the job keys; must be an instance of {@link SftpKeys}
   * @param extractorKeys the extractor keys
   */
  public SftpConnection(State state, JobKeys jobKeys, ExtractorKeys extractorKeys) {
    super(state, jobKeys, extractorKeys);
    assert jobKeys instanceof SftpKeys;
    sftpSourceKeys = (SftpKeys) jobKeys;
  }

  /**
   * Not implemented for SFTP; always returns {@code null}.
   *
   * @param status the incoming work unit status (ignored)
   * @return {@code null}
   */
  @Override
  public WorkUnitStatus execute(WorkUnitStatus status) {
    return null;
  }

  /**
   * Closes the SFTP client if one was created and forgets it.
   *
   * @param message not used by this implementation
   * @return always {@code true}
   */
  @Override
  public boolean closeAll(String message) {
    if (this.fsClient != null) {
      log.info("Shutting down FileSystem connection");
      this.fsClient.close();
      fsClient = null;
    }
    return true;
  }

  /**
   * Lists the files under the configured path and either returns the listing or downloads one file.
   *
   * <p>Decision tree (property names as given in the original notes):
   * <ul>
   *   <li>files pattern (ms.source.file.pattern) is not blank: the listing is filtered by the
   *       pattern and the result is written to the status buffer;</li>
   *   <li>files pattern is blank and target file pattern (ms.extract.target.file.name) is blank:
   *       the full listing is written to the status buffer;</li>
   *   <li>otherwise: when exactly one file was listed it is downloaded; when several were listed
   *       and one equals the resolved path exactly, that one is downloaded; in any other case a
   *       warning is logged and the status is returned without a buffer.</li>
   * </ul>
   *
   * @param workUnitStatus the incoming work unit status
   * @return the updated status, or {@code null} when the client cannot be created, the listing
   *     throws, or the download fails
   * @throws RetriableAuthenticationException propagated from the parent implementation
   */
  @Override
  public WorkUnitStatus executeFirst(WorkUnitStatus workUnitStatus) throws RetriableAuthenticationException {
    WorkUnitStatus status = super.executeFirst(workUnitStatus);
    String path = getPath();
    String finalPrefix = getWorkUnitSpecificString(path, getExtractorKeys().getDynamicParameters());
    log.info("File path found is: " + finalPrefix);

    // getFsClient() logs and swallows its own failures, so a null result is the only failure signal
    if (getFsClient() == null) {
      log.error("Error initializing SFTP connection");
      return null;
    }

    //get List of files matching the pattern
    List<String> files;
    try {
      files = getFiles(finalPrefix);
    } catch (Exception e) {
      log.error("Error reading file list", e);
      return null;
    }
    log.info("No Of Files to be processed matching the pattern: {}", files.size());

    if (StringUtils.isNotBlank(sftpSourceKeys.getFilesPattern())) {
      status.setBuffer(InputStreamUtils.convertListToInputStream(getFilteredFiles(files)));
      return status;
    }
    if (StringUtils.isBlank(sftpSourceKeys.getTargetFilePattern())) {
      status.setBuffer(InputStreamUtils.convertListToInputStream(files));
      return status;
    }

    String fileToDownload = selectFileToDownload(files, finalPrefix);
    if (StringUtils.isBlank(fileToDownload)) {
      log.warn("Invalid set of parameters. Please make sure to set source directory, entity and file pattern");
      return status;
    }

    // Log the file actually chosen; it is not necessarily the first entry of the listing
    log.info("Downloading file: {}", fileToDownload);
    try {
      status.setBuffer(this.fsClient.getFileStream(fileToDownload));
    } catch (FileBasedHelperException e) {
      log.error("Error downloading file {}", fileToDownload, e);
      return null;
    }
    return status;
  }

  /**
   * Picks the single file to download: the only listed file, else the entry equal to the path.
   *
   * @param files the resolved listing
   * @param finalPrefix the work-unit specific path that was listed
   * @return the file to download, or an empty string when no unambiguous choice exists
   */
  private String selectFileToDownload(List<String> files, String finalPrefix) {
    if (files.size() == 1) {
      return files.get(0);
    }
    boolean exactMatchListed = files.stream().anyMatch(file -> file.equals(finalPrefix));
    return exactMatchListed ? finalPrefix : "";
  }

  /**
   * Returns the SFTP client, creating it on first use through the configured factory class.
   *
   * @return the client, or {@code null} when it could not be created (the cause is logged)
   */
  private SftpClient getFsClient() {
    if (this.fsClient == null) {
      try {
        Class<?> factoryClass = Class.forName(
            MultistageProperties.MSTAGE_CONNECTION_CLIENT_FACTORY.getValidNonblankWithDefault(this.getState()));
        ConnectionClientFactory factory = (ConnectionClientFactory) factoryClass.getDeclaredConstructor().newInstance();
        this.fsClient = factory.getSftpChannelClient(this.getState());
      } catch (Exception e) {
        log.error("Error initiating SFTP client", e);
      }
    }
    return this.fsClient;
  }

  /**
   * Lists the entries matching the given path and resolves each one against the path's parent
   * directory.
   *
   * <p>TODO: List files based on pattern on parent nodes as well.
   * The current version supports pattern only on leaf node.
   * Ex: file path supported "/a/b/*c*"; file path not supported "/a/*b/*c*".
   *
   * <p>A listing failure is logged and yields an empty list. Entries that carry a URI scheme are
   * kept unchanged; every other entry, including names that are not valid URIs (for example names
   * containing spaces), is resolved against the base directory.
   *
   * @param filesPattern the path, optionally with a pattern on the leaf node
   * @return the resolved entries; empty when listing failed
   * @throws IllegalArgumentException when an entry must be resolved but the path has no parent
   */
  private List<String> getFiles(String filesPattern) {
    log.info("Files to be processed from input " + filesPattern);
    List<String> listed;
    try {
      listed = fsClient.ls(filesPattern);
    } catch (FileBasedHelperException e) {
      // keep the stack trace; the message alone rarely identifies the SFTP failure
      log.error("Unable to list files " + e.getMessage(), e);
      return new ArrayList<>();
    }

    List<String> resolved = new ArrayList<>(listed.size());
    String baseDir = null;
    for (String file : listed) {
      if (hasUriScheme(file)) {
        resolved.add(file);
        continue;
      }
      if (baseDir == null) {
        // computed on demand so that an empty listing never requires a parent directory
        baseDir = getBaseDir(filesPattern);
      }
      resolved.add(new File(baseDir, file).getAbsolutePath());
    }
    return resolved;
  }

  /**
   * Tells whether the name parses as a URI with a scheme.
   *
   * @param file the listed entry
   * @return {@code true} for a scheme-qualified URI; {@code false} otherwise, including when the
   *     name is not a syntactically valid URI
   */
  private static boolean hasUriScheme(String file) {
    try {
      return new URI(file).isAbsolute();
    } catch (URISyntaxException e) {
      // e.g. a name with spaces: handle it as a plain relative name instead of aborting the listing
      return false;
    }
  }

  private String getPath() {
    return sftpSourceKeys.getFilesPath();
  }

  /**
   * Keeps only the files that fully match the configured files pattern (a regular expression).
   *
   * @param files the files to filter
   * @return the matching files
   */
  private List<String> getFilteredFiles(List<String> files) {
    String filesPattern = sftpSourceKeys.getFilesPattern();
    return files.stream().filter(file -> file.matches(filesPattern)).collect(Collectors.toList());
  }

  /**
   * Returns the absolute parent directory of the path followed by the configured path separator.
   *
   * @param uri the path whose parent is wanted
   * @return the parent directory with a trailing separator
   * @throws IllegalArgumentException when the path has no parent
   */
  private String getBaseDir(String uri) {
    File parent = new File(uri).getParentFile();
    if (parent == null) {
      throw new IllegalArgumentException("Cannot determine the base directory of path: " + uri);
    }
    return parent.getAbsolutePath() + sftpSourceKeys.getPathSeparator();
  }

}
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ public class DefaultConnectionClientFactory implements ConnectionClientFactory {
* @param state the State of execution environment
* @return an HTTP client object
*/
@Override
public HttpClient getHttpClient(State state) {
  // The state argument is not consulted here; the builder is used as created.
  final HttpClientBuilder clientBuilder = HttpClientBuilder.create();
  return clientBuilder.build();
}
Expand All @@ -43,6 +44,7 @@ public HttpClient getHttpClient(State state) {
* @param config S3 parameters
* @return an S3 HTTP client object
*/
@Override
public SdkHttpClient getS3Client(State state, AttributeMap config) {
return ApacheHttpClient.builder()
.connectionTimeout(config.get(CONNECTION_TIMEOUT))
Expand All @@ -56,6 +58,7 @@ public SdkHttpClient getS3Client(State state, AttributeMap config) {
* @param state source or work unit state that can provide the encryption master key location
* @return a JDBC connection
*/
@Override
public Connection getJdbcConnection(String jdbcUrl, String userId, String cryptedPassword, State state) {
try {
return DriverManager.getConnection(
Expand All @@ -73,6 +76,7 @@ public Connection getJdbcConnection(String jdbcUrl, String userId, String crypte
* @param state the state of execution environment
* @return a SFTP channel client
*/
@Override
public SftpClient getSftpChannelClient(State state) {
  // The default factory always hands out the channel-based client built from the given state.
  final SftpClient channelClient = new SftpChannelClient(state);
  return channelClient;
}
Expand All @@ -82,6 +86,7 @@ public SftpClient getSftpChannelClient(State state) {
* @param state the state of execution environment
* @return a SchemaReader
*/
@Override
public SchemaReader getSchemaReader(State state) {
// There is no default schema reader currently
return null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,14 @@
import com.jcraft.jsch.Session;
import com.jcraft.jsch.SftpException;
import com.linkedin.cdi.util.Credentials;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.List;
import java.util.Vector;
import org.apache.commons.lang3.StringUtils;
import org.apache.gobblin.configuration.State;
import org.apache.gobblin.source.extractor.extract.sftp.SftpFsHelper;
import org.apache.gobblin.source.extractor.filebased.FileBasedHelperException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -22,10 +27,12 @@ public class SftpChannelClient implements SftpClient {
// NOTE(review): presumably the configuration key for the connection timeout — its reader is not visible here.
private static final String SFTP_CONNECTION_TIMEOUT_KEY = "sftpConn.timeout";
// Fallback timeout value, expressed in milliseconds.
private static final int DEFAULT_SFTP_CONNECTION_TIMEOUT_IN_MS = 3000; //in milliseconds

// Execution state supplied to the constructor.
private State state = null;
// Session that SFTP channels are opened on; disconnected by close().
private Session session = null;
// NOTE(review): presumably used to create the session during initialization — confirm in initialize().
private JSch jsch = new JSch();

/**
 * Stores the given state and then runs {@code initialize(state)}.
 *
 * @param state the state of execution environment
 */
public SftpChannelClient(State state) {
this.state = state;
initialize(state);
}

Expand Down Expand Up @@ -59,7 +66,7 @@ private void initialize(State state) {
* @throws SftpException
*/
@Override
public ChannelSftp getSftpChannel(State state) throws SftpException {
public ChannelSftp getSftpChannel() throws SftpException {
try {
ChannelSftp channelSftp = (ChannelSftp) this.session.openChannel("sftp");
// In millsec
Expand All @@ -70,4 +77,51 @@ public ChannelSftp getSftpChannel(State state) throws SftpException {
throw new SftpException(0, "Cannot open a channel to SFTP server", e);
}
}

/**
 * Disconnects the session, which in turn ends the channels opened on it.
 * Does nothing when no session is held.
 */
@Override
public void close() {
  if (this.session == null) {
    return;
  }
  this.session.disconnect();
}

/**
 * Opens a new SFTP channel, issues a get command, and returns an input stream to the file.
 *
 * <p>The returned stream is a {@link SftpChannelFileInputStream} that holds the channel; closing
 * the stream disconnects the channel. When the download cannot be started, the channel opened
 * for it is disconnected before the exception is thrown.
 *
 * @param file the remote file to read
 * @return an input stream over the remote file
 * @throws FileBasedHelperException if the channel cannot be opened or the get command fails
 */
@Override
public InputStream getFileStream(String file) throws FileBasedHelperException {
  SftpMonitor monitor = new SftpMonitor();
  ChannelSftp channel = null;
  try {
    channel = getSftpChannel();
    return new SftpChannelFileInputStream(channel.get(file, monitor), channel);
  } catch (SftpException e) {
    // No stream was handed out, so nobody else will ever release this channel
    if (channel != null) {
      channel.disconnect();
    }
    throw new FileBasedHelperException("Cannot download file " + file + " due to " + e.getMessage(), e);
  }
}

/**
 * Executes an SFTP ls command on a dedicated channel.
 *
 * <p>The channel is disconnected whether or not the command succeeds.
 *
 * @param path the remote path to list
 * @return the file names of the listed entries (files and directories)
 * @throws FileBasedHelperException if the channel cannot be opened or the ls command fails
 */
@Override
public List<String> ls(String path) throws FileBasedHelperException {
  ChannelSftp channel = null;
  try {
    channel = getSftpChannel();
    List<String> list = new ArrayList<>();
    Vector<ChannelSftp.LsEntry> vector = channel.ls(path);
    for (ChannelSftp.LsEntry entry : vector) {
      list.add(entry.getFilename());
    }
    return list;
  } catch (SftpException e) {
    throw new FileBasedHelperException("Cannot execute ls command on sftp connection", e);
  } finally {
    // release the channel on the failure path too, not only after a successful listing
    if (channel != null) {
      channel.disconnect();
    }
  }
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
// Copyright 2021 LinkedIn Corporation. All rights reserved.
// Licensed under the BSD-2 Clause license.
// See LICENSE in the project root for license information.

package com.linkedin.cdi.factory.sftp;

import com.jcraft.jsch.Channel;
import java.io.IOException;
import java.io.InputStream;
import org.apache.gobblin.util.io.SeekableFSInputStream;

/**
 * A {@link SeekableFSInputStream} that holds a handle on the Sftp {@link Channel} used to open the
 * {@link InputStream}. The {@link Channel} is disconnected when {@link InputStream#close()} is called,
 * even if closing the wrapped stream fails.
 */
public class SftpChannelFileInputStream extends SeekableFSInputStream {
  private final Channel channel;

  /**
   * @param in the stream to wrap
   * @param channel the channel the stream was opened on; disconnected by {@link #close()}
   */
  public SftpChannelFileInputStream(InputStream in, Channel channel) {
    super(in);
    this.channel = channel;
  }

  /**
   * Closes the wrapped stream and then disconnects the channel.
   *
   * @throws IOException if closing the wrapped stream fails; the channel is still disconnected
   */
  @Override
  public void close() throws IOException {
    try {
      super.close();
    } finally {
      // the channel must not outlive the stream, even when the stream fails to close
      this.channel.disconnect();
    }
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,39 @@

import com.jcraft.jsch.ChannelSftp;
import com.jcraft.jsch.SftpException;
import java.io.InputStream;
import java.util.List;
import org.apache.gobblin.configuration.State;
import org.apache.gobblin.source.extractor.filebased.FileBasedHelperException;


/**
 * Client operations on an SFTP connection: opening a channel, listing a path,
 * streaming a file, and closing the connection.
 */
public interface SftpClient {
// NOTE(review): this State-taking variant sits next to the no-arg variant declared below — confirm both are intended.
ChannelSftp getSftpChannel(State state) throws SftpException;
/**
* Establish a secure channel
* @return a new secure channel
* @throws SftpException if the channel cannot be opened
*/
ChannelSftp getSftpChannel() throws SftpException;

/**
* Close the session and therefore its channels
*/
void close();

/**
* Executes a get SftpCommand and returns an input stream to the file
* @param file the file to read
* @return an input stream to the file
* @throws FileBasedHelperException if the file cannot be read
*/
InputStream getFileStream(String file) throws FileBasedHelperException;

/**
* Execute an SFTP ls command
* @param path the path to list
* @return the list of files and directories
* @throws FileBasedHelperException if the ls command fails
*/
List<String> ls(String path) throws FileBasedHelperException;
}
Loading

0 comments on commit b5dc5c0

Please sign in to comment.