Skip to content

Commit

Permalink
Sftp connection fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
Chris Li committed Aug 25, 2021
1 parent 58bcb7c commit 39311aa
Show file tree
Hide file tree
Showing 4 changed files with 78 additions and 37 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -7,26 +7,20 @@
import com.linkedin.cdi.configuration.MultistageProperties;
import com.linkedin.cdi.exception.RetriableAuthenticationException;
import com.linkedin.cdi.factory.ConnectionClientFactory;
import com.linkedin.cdi.factory.DefaultConnectionClientFactory;
import com.linkedin.cdi.factory.sftp.SftpClient;
import com.linkedin.cdi.keys.ExtractorKeys;
import com.linkedin.cdi.keys.HttpKeys;
import com.linkedin.cdi.keys.JobKeys;
import com.linkedin.cdi.keys.SftpKeys;
import com.linkedin.cdi.util.InputStreamUtils;
import com.linkedin.cdi.util.WorkUnitStatus;
import java.io.File;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang.StringUtils;
import org.apache.gobblin.configuration.State;
import org.apache.gobblin.source.extractor.filebased.FileBasedHelperException;
import org.apache.gobblin.source.extractor.filebased.TimestampAwareFileBasedHelper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -99,7 +93,7 @@ public WorkUnitStatus executeFirst(WorkUnitStatus workUnitStatus) throws Retriab
return null;
}

boolean isFilewithPrefixExist = files.stream().anyMatch(file -> file.equals(finalPrefix));
boolean isFileWithPrefixExist = files.stream().anyMatch(file -> file.equals(finalPrefix));
log.info("No Of Files to be processed matching the pattern: {}", files.size());
if (StringUtils.isNotBlank(sftpSourceKeys.getFilesPattern())) {
status.setBuffer(InputStreamUtils.convertListToInputStream(getFilteredFiles(files)));
Expand All @@ -110,14 +104,14 @@ public WorkUnitStatus executeFirst(WorkUnitStatus workUnitStatus) throws Retriab
String fileToDownload = "";
if (files.size() == 1) {
fileToDownload = files.get(0);
} else if (isFilewithPrefixExist) {
} else if (isFileWithPrefixExist) {
fileToDownload = finalPrefix;
}
if (StringUtils.isNotBlank(fileToDownload)) {
log.info("Downloading file: {}", files.get(0));
try {
status.setBuffer(this.fsClient.getFileStream(fileToDownload));
} catch (FileBasedHelperException e) {
} catch (Exception e) {
log.error("Error downloading file {}", fileToDownload, e);
return null;
}
Expand Down Expand Up @@ -148,8 +142,8 @@ private SftpClient getFsClient() {
* Ex: file path supported "/a/b/*c*"
* file path not supported "/a/*b/*c*
* Get files list based on pattern
* @param filesPattern
* @return
* @param filesPattern pattern of content to list
* @return list of content
*/
private List<String> getFiles(String filesPattern) {
List<String> files = new ArrayList<>();
Expand All @@ -167,7 +161,7 @@ private List<String> getFiles(String filesPattern) {
files.set(i, filepath);
i++;
}
} catch (FileBasedHelperException | URISyntaxException e) {
} catch (Exception e) {
log.error("Unable to list files " + e.getMessage());
}
return files;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
import org.apache.commons.lang3.StringUtils;
import org.apache.gobblin.configuration.State;
import org.apache.gobblin.source.extractor.extract.sftp.SftpFsHelper;
import org.apache.gobblin.source.extractor.filebased.FileBasedHelperException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -27,7 +26,7 @@ public class SftpChannelClient implements SftpClient {
private static final String SFTP_CONNECTION_TIMEOUT_KEY = "sftpConn.timeout";
private static final int DEFAULT_SFTP_CONNECTION_TIMEOUT_IN_MS = 3000; //in milliseconds

protected State state = null;
protected State state;
protected Session session = null;
protected JSch jsch = new JSch();

Expand Down Expand Up @@ -63,13 +62,13 @@ protected void initializeConnection(State state) {
* commands in parallel. All created channels are cleaned up when the session is closed.
*
* @return a new {@link ChannelSftp}
* @throws SftpException
* @throws SftpException An SftpException
*/
@Override
public ChannelSftp getSftpChannel() throws SftpException {
try {
ChannelSftp channelSftp = (ChannelSftp) this.session.openChannel("sftp");
// In millsec
// In milliseconds
int connTimeout = state.getPropAsInt(SFTP_CONNECTION_TIMEOUT_KEY, DEFAULT_SFTP_CONNECTION_TIMEOUT_IN_MS);
channelSftp.connect(connTimeout);
return channelSftp;
Expand All @@ -90,27 +89,25 @@ public void close() {

/**
* Executes a get SftpCommand and returns an input stream to the file
* @throws SftpException
*/
@Override
public InputStream getFileStream(String file) throws FileBasedHelperException {
public InputStream getFileStream(String file) {
SftpMonitor monitor = new SftpMonitor();
try {
ChannelSftp channel = getSftpChannel();
return new SftpChannelFileInputStream(channel.get(file, monitor), channel);
} catch (SftpException e) {
throw new FileBasedHelperException("Cannot download file " + file + " due to " + e.getMessage(), e);
throw new RuntimeException("Cannot download file " + file + " due to " + e.getMessage(), e);
}
}

/**
* Exceute an FTP ls command
* @param path
* Execute an FTP ls command
* @param path the target path to list content
* @return the list of files and directories
* @throws FileBasedHelperException
*/
@Override
public List<String> ls(String path) throws FileBasedHelperException {
public List<String> ls(String path) {
try {
List<String> list = new ArrayList<>();
ChannelSftp channel = getSftpChannel();
Expand All @@ -121,7 +118,47 @@ public List<String> ls(String path) throws FileBasedHelperException {
channel.disconnect();
return list;
} catch (SftpException e) {
throw new FileBasedHelperException("Cannot execute ls command on sftp connection", e);
throw new RuntimeException("Cannot execute ls command on sftp connection", e);
}
}

/**
* Get file modification time
* @param path file path on target to be checked
* @return the modification time in long format
*/
@Override
public long getFileMTime(String path) {
ChannelSftp channelSftp = null;
try {
channelSftp = getSftpChannel();
return channelSftp.lstat(path).getMTime();
} catch (SftpException e) {
throw new RuntimeException(
String.format("Failed to get modified timestamp for file at path %s due to error %s", path,
e.getMessage()), e);
} finally {
if (channelSftp != null) {
channelSftp.disconnect();
}
}
}

/**
* Get file size
* @param path file path on target to be checked
* @return the file size
*/
@Override
public long getFileSize(String path) {
try {
ChannelSftp channelSftp = getSftpChannel();
long fileSize = channelSftp.lstat(path).getSize();
channelSftp.disconnect();
return fileSize;
} catch (SftpException e) {
throw new RuntimeException(
String.format("Failed to get size for file at path %s due to error %s", path, e.getMessage()), e);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,6 @@
import com.jcraft.jsch.SftpException;
import java.io.InputStream;
import java.util.List;
import org.apache.gobblin.configuration.State;
import org.apache.gobblin.source.extractor.filebased.FileBasedHelperException;


/**
Expand All @@ -19,7 +17,7 @@ public interface SftpClient {
/**
* Establish a secure channel
* @return a new secure channel
* @throws SftpException
* @throws SftpException An SftpException
*/
ChannelSftp getSftpChannel() throws SftpException;

Expand All @@ -30,15 +28,27 @@ public interface SftpClient {

/**
* Executes a get SftpCommand and returns an input stream to the file
* @throws SftpException
*/
InputStream getFileStream(String file) throws FileBasedHelperException;
InputStream getFileStream(String file);

/**
* Exceute an FTP ls command
* @param path
* Execute an FTP ls command
* @param path path on target host to be listed
* @return the list of files and directories
* @throws FileBasedHelperException
*/
List<String> ls(String path) throws FileBasedHelperException;
List<String> ls(String path);

/**
* Get file modification time
* @param path file path on target to be checked
* @return the modification time in long format
*/
long getFileMTime(String path);

/**
* Get file size
* @param path file path on target to be checked
* @return the file size
*/
long getFileSize(String path);
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,14 +31,14 @@ public class SftpMonitor implements SftpProgressMonitor {
@Setter
private long logFrequency;
@Getter
private long startime;
private long startTime;

@Override
public void init(int op, String src, String dest, long max) {
this.op = op;
this.src = src;
this.dest = dest;
this.startime = System.currentTimeMillis();
this.startTime = System.currentTimeMillis();
this.logFrequency = 0L;
if (op == SftpProgressMonitor.GET) {
LOG.info("DOWNLOAD operation has started with src: " + src + " dest: " + dest + " and file length: " + (max
Expand Down Expand Up @@ -68,14 +68,14 @@ public boolean count(long count) {

@Override
public void end() {
long secs = (System.currentTimeMillis() - this.startime) / 1000L;
long secs = (System.currentTimeMillis() - this.startTime) / 1000L;
LOG.info("Transfer finished " + this.op + " src: " + this.src + " dest: " + this.dest + " in " + secs + " at "
+ getMbps());
}

private String getMbps() {
long mb = this.totalCount / 1000000L;
long secs = (System.currentTimeMillis() - this.startime) / 1000L;
long secs = (System.currentTimeMillis() - this.startTime) / 1000L;
double mbps = secs == 0L ? 0.0D : mb * 1.0D / secs;
return String.format("%.2f", mbps);
}
Expand Down

0 comments on commit 39311aa

Please sign in to comment.