From 39311aa59677d420bf26371c022fc344974336f1 Mon Sep 17 00:00:00 2001
From: Chris Li
Date: Wed, 25 Aug 2021 12:43:24 -0700
Subject: [PATCH] Sftp connection fixes

---
 .../cdi/connection/SftpConnection.java        | 18 ++----
 .../cdi/factory/sftp/SftpChannelClient.java   | 61 +++++++++++++++----
 .../linkedin/cdi/factory/sftp/SftpClient.java | 28 ++++++---
 .../cdi/factory/sftp/SftpMonitor.java         |  8 +--
 4 files changed, 78 insertions(+), 37 deletions(-)

diff --git a/cdi-core/src/main/java/com/linkedin/cdi/connection/SftpConnection.java b/cdi-core/src/main/java/com/linkedin/cdi/connection/SftpConnection.java
index 09d937b..82c2754 100644
--- a/cdi-core/src/main/java/com/linkedin/cdi/connection/SftpConnection.java
+++ b/cdi-core/src/main/java/com/linkedin/cdi/connection/SftpConnection.java
@@ -7,26 +7,20 @@
 import com.linkedin.cdi.configuration.MultistageProperties;
 import com.linkedin.cdi.exception.RetriableAuthenticationException;
 import com.linkedin.cdi.factory.ConnectionClientFactory;
-import com.linkedin.cdi.factory.DefaultConnectionClientFactory;
 import com.linkedin.cdi.factory.sftp.SftpClient;
 import com.linkedin.cdi.keys.ExtractorKeys;
-import com.linkedin.cdi.keys.HttpKeys;
 import com.linkedin.cdi.keys.JobKeys;
 import com.linkedin.cdi.keys.SftpKeys;
 import com.linkedin.cdi.util.InputStreamUtils;
 import com.linkedin.cdi.util.WorkUnitStatus;
 import java.io.File;
-import java.io.IOException;
 import java.net.URI;
-import java.net.URISyntaxException;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.stream.Collectors;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.lang.StringUtils;
 import org.apache.gobblin.configuration.State;
-import org.apache.gobblin.source.extractor.filebased.FileBasedHelperException;
-import org.apache.gobblin.source.extractor.filebased.TimestampAwareFileBasedHelper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -99,7 +93,7 @@ public WorkUnitStatus executeFirst(WorkUnitStatus workUnitStatus) throws Retriab
       return null;
     }
 
-    boolean isFilewithPrefixExist = files.stream().anyMatch(file -> file.equals(finalPrefix));
+    boolean isFileWithPrefixExist = files.stream().anyMatch(file -> file.equals(finalPrefix));
     log.info("No Of Files to be processed matching the pattern: {}", files.size());
     if (StringUtils.isNotBlank(sftpSourceKeys.getFilesPattern())) {
       status.setBuffer(InputStreamUtils.convertListToInputStream(getFilteredFiles(files)));
@@ -110,14 +104,14 @@ public WorkUnitStatus executeFirst(WorkUnitStatus workUnitStatus) throws Retriab
       String fileToDownload = "";
       if (files.size() == 1) {
         fileToDownload = files.get(0);
-      } else if (isFilewithPrefixExist) {
+      } else if (isFileWithPrefixExist) {
         fileToDownload = finalPrefix;
       }
       if (StringUtils.isNotBlank(fileToDownload)) {
         log.info("Downloading file: {}", files.get(0));
         try {
           status.setBuffer(this.fsClient.getFileStream(fileToDownload));
-        } catch (FileBasedHelperException e) {
+        } catch (Exception e) {
           log.error("Error downloading file {}", fileToDownload, e);
           return null;
         }
@@ -148,8 +142,8 @@ private SftpClient getFsClient() {
    * Ex: file path supported "/a/b/*c*"
    * file path not supported "/a/*b/*c*
    * Get files list based on pattern
-   * @param filesPattern
-   * @return
+   * @param filesPattern pattern of content to list
+   * @return list of content
    */
   private List getFiles(String filesPattern) {
     List files = new ArrayList<>();
@@ -167,7 +161,7 @@ private List getFiles(String filesPattern) {
         files.set(i, filepath);
         i++;
       }
-    } catch (FileBasedHelperException | URISyntaxException e) {
+    } catch (Exception e) {
       log.error("Unable to list files " + e.getMessage());
     }
     return files;
diff --git a/cdi-core/src/main/java/com/linkedin/cdi/factory/sftp/SftpChannelClient.java b/cdi-core/src/main/java/com/linkedin/cdi/factory/sftp/SftpChannelClient.java
index 31e9a10..3d55c9a 100644
--- a/cdi-core/src/main/java/com/linkedin/cdi/factory/sftp/SftpChannelClient.java
+++ b/cdi-core/src/main/java/com/linkedin/cdi/factory/sftp/SftpChannelClient.java
@@ -17,7 +17,6 @@
 import org.apache.commons.lang3.StringUtils;
 import org.apache.gobblin.configuration.State;
 import org.apache.gobblin.source.extractor.extract.sftp.SftpFsHelper;
-import org.apache.gobblin.source.extractor.filebased.FileBasedHelperException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -27,7 +26,7 @@ public class SftpChannelClient implements SftpClient {
   private static final String SFTP_CONNECTION_TIMEOUT_KEY = "sftpConn.timeout";
   private static final int DEFAULT_SFTP_CONNECTION_TIMEOUT_IN_MS = 3000; //in milliseconds
 
-  protected State state = null;
+  protected State state;
   protected Session session = null;
   protected JSch jsch = new JSch();
 
@@ -63,13 +62,13 @@ protected void initializeConnection(State state) {
    * commands in parallel. All created channels are cleaned up when the session is closed.
    *
    * @return a new {@link ChannelSftp}
-   * @throws SftpException
+   * @throws SftpException An SftpException
    */
   @Override
   public ChannelSftp getSftpChannel() throws SftpException {
     try {
       ChannelSftp channelSftp = (ChannelSftp) this.session.openChannel("sftp");
-      // In millsec
+      // In milliseconds
       int connTimeout = state.getPropAsInt(SFTP_CONNECTION_TIMEOUT_KEY, DEFAULT_SFTP_CONNECTION_TIMEOUT_IN_MS);
       channelSftp.connect(connTimeout);
       return channelSftp;
@@ -90,27 +89,25 @@ public void close() {
 
   /**
    * Executes a get SftpCommand and returns an input stream to the file
-   * @throws SftpException
    */
   @Override
-  public InputStream getFileStream(String file) throws FileBasedHelperException {
+  public InputStream getFileStream(String file) {
     SftpMonitor monitor = new SftpMonitor();
     try {
       ChannelSftp channel = getSftpChannel();
       return new SftpChannelFileInputStream(channel.get(file, monitor), channel);
     } catch (SftpException e) {
-      throw new FileBasedHelperException("Cannot download file " + file + " due to " + e.getMessage(), e);
+      throw new RuntimeException("Cannot download file " + file + " due to " + e.getMessage(), e);
     }
   }
 
   /**
-   * Exceute an FTP ls command
-   * @param path
+   * Execute an FTP ls command
+   * @param path the target path to list content
    * @return the list of files and directories
-   * @throws FileBasedHelperException
    */
   @Override
-  public List ls(String path) throws FileBasedHelperException {
+  public List ls(String path) {
     try {
       List list = new ArrayList<>();
       ChannelSftp channel = getSftpChannel();
@@ -121,7 +118,47 @@ public List ls(String path) throws FileBasedHelperException {
       channel.disconnect();
       return list;
     } catch (SftpException e) {
-      throw new FileBasedHelperException("Cannot execute ls command on sftp connection", e);
+      throw new RuntimeException("Cannot execute ls command on sftp connection", e);
+    }
+  }
+
+  /**
+   * Get file modification time
+   * @param path file path on target to be checked
+   * @return the modification time in long format
+   */
+  @Override
+  public long getFileMTime(String path) {
+    ChannelSftp channelSftp = null;
+    try {
+      channelSftp = getSftpChannel();
+      return channelSftp.lstat(path).getMTime();
+    } catch (SftpException e) {
+      throw new RuntimeException(
+          String.format("Failed to get modified timestamp for file at path %s due to error %s", path,
+              e.getMessage()), e);
+    } finally {
+      if (channelSftp != null) {
+        channelSftp.disconnect();
+      }
+    }
+  }
+
+  /**
+   * Get file size
+   * @param path file path on target to be checked
+   * @return the file size
+   */
+  @Override
+  public long getFileSize(String path) {
+    try {
+      ChannelSftp channelSftp = getSftpChannel();
+      long fileSize = channelSftp.lstat(path).getSize();
+      channelSftp.disconnect();
+      return fileSize;
+    } catch (SftpException e) {
+      throw new RuntimeException(
+          String.format("Failed to get size for file at path %s due to error %s", path, e.getMessage()), e);
     }
   }
 }
diff --git a/cdi-core/src/main/java/com/linkedin/cdi/factory/sftp/SftpClient.java b/cdi-core/src/main/java/com/linkedin/cdi/factory/sftp/SftpClient.java
index 9cd5605..09df9c8 100644
--- a/cdi-core/src/main/java/com/linkedin/cdi/factory/sftp/SftpClient.java
+++ b/cdi-core/src/main/java/com/linkedin/cdi/factory/sftp/SftpClient.java
@@ -8,8 +8,6 @@
 import com.jcraft.jsch.SftpException;
 import java.io.InputStream;
 import java.util.List;
-import org.apache.gobblin.configuration.State;
-import org.apache.gobblin.source.extractor.filebased.FileBasedHelperException;
 
 
 /**
@@ -19,7 +17,7 @@ public interface SftpClient {
   /**
    * Establish a secure channel
    * @return a new secure channel
-   * @throws SftpException
+   * @throws SftpException An SftpException
    */
   ChannelSftp getSftpChannel() throws SftpException;
 
@@ -30,15 +28,27 @@ public interface SftpClient {
 
   /**
    * Executes a get SftpCommand and returns an input stream to the file
-   * @throws SftpException
    */
-  InputStream getFileStream(String file) throws FileBasedHelperException;
+  InputStream getFileStream(String file);
 
   /**
-   * Exceute an FTP ls command
-   * @param path
+   * Execute an FTP ls command
+   * @param path path on target host to be listed
    * @return the list of files and directories
-   * @throws FileBasedHelperException
    */
-  List ls(String path) throws FileBasedHelperException;
+  List ls(String path);
+
+  /**
+   * Get file modification time
+   * @param path file path on target to be checked
+   * @return the modification time in long format
+   */
+  long getFileMTime(String path);
+
+  /**
+   * Get file size
+   * @param path file path on target to be checked
+   * @return the file size
+   */
+  long getFileSize(String path);
 }
diff --git a/cdi-core/src/main/java/com/linkedin/cdi/factory/sftp/SftpMonitor.java b/cdi-core/src/main/java/com/linkedin/cdi/factory/sftp/SftpMonitor.java
index fee9e7e..f30a9e5 100644
--- a/cdi-core/src/main/java/com/linkedin/cdi/factory/sftp/SftpMonitor.java
+++ b/cdi-core/src/main/java/com/linkedin/cdi/factory/sftp/SftpMonitor.java
@@ -31,14 +31,14 @@ public class SftpMonitor implements SftpProgressMonitor {
   @Setter
   private long logFrequency;
   @Getter
-  private long startime;
+  private long startTime;
 
   @Override
   public void init(int op, String src, String dest, long max) {
     this.op = op;
     this.src = src;
     this.dest = dest;
-    this.startime = System.currentTimeMillis();
+    this.startTime = System.currentTimeMillis();
     this.logFrequency = 0L;
     if (op == SftpProgressMonitor.GET) {
       LOG.info("DOWNLOAD operation has started with src: " + src + " dest: " + dest + " and file length: " + (max
@@ -68,14 +68,14 @@ public boolean count(long count) {
 
   @Override
   public void end() {
-    long secs = (System.currentTimeMillis() - this.startime) / 1000L;
+    long secs = (System.currentTimeMillis() - this.startTime) / 1000L;
     LOG.info("Transfer finished " + this.op + " src: " + this.src + " dest: " + this.dest + " in " + secs + " at "
         + getMbps());
   }
 
   private String getMbps() {
     long mb = this.totalCount / 1000000L;
-    long secs = (System.currentTimeMillis() - this.startime) / 1000L;
+    long secs = (System.currentTimeMillis() - this.startTime) / 1000L;
     double mbps = secs == 0L ? 0.0D : mb * 1.0D / secs;
     return String.format("%.2f", mbps);
   }