From c5714e4fcd641060421bb384e18293a3e525760d Mon Sep 17 00:00:00 2001 From: EJ Song <51077614+sezruby@users.noreply.github.com> Date: Tue, 9 Mar 2021 12:56:49 -0800 Subject: [PATCH] Fix Iceberg lineage for Windows (#375) --- .../default/DefaultFileBasedRelation.scala | 2 ++ .../sources/delta/DeltaLakeRelation.scala | 2 ++ .../sources/iceberg/IcebergRelation.scala | 22 ++++++++++++++++--- .../hyperspace/index/HyperspaceSuite.scala | 7 +++++- 4 files changed, 29 insertions(+), 4 deletions(-) diff --git a/src/main/scala/com/microsoft/hyperspace/index/sources/default/DefaultFileBasedRelation.scala b/src/main/scala/com/microsoft/hyperspace/index/sources/default/DefaultFileBasedRelation.scala index 448b8c943..5d14f7085 100644 --- a/src/main/scala/com/microsoft/hyperspace/index/sources/default/DefaultFileBasedRelation.scala +++ b/src/main/scala/com/microsoft/hyperspace/index/sources/default/DefaultFileBasedRelation.scala @@ -224,6 +224,8 @@ class DefaultFileBasedRelation(spark: SparkSession, override val plan: LogicalRe * Returns list of pairs of (file path, file id) to build lineage column. * * File paths should be the same format as "input_file_name()" of the given relation type. + * input_file_name() could be different depending on the OS and source. + * * For [[DefaultFileBasedRelation]], each file path should be in this format: * `file:///path/to/file` * diff --git a/src/main/scala/com/microsoft/hyperspace/index/sources/delta/DeltaLakeRelation.scala b/src/main/scala/com/microsoft/hyperspace/index/sources/delta/DeltaLakeRelation.scala index 3871e07dc..60d584afb 100644 --- a/src/main/scala/com/microsoft/hyperspace/index/sources/delta/DeltaLakeRelation.scala +++ b/src/main/scala/com/microsoft/hyperspace/index/sources/delta/DeltaLakeRelation.scala @@ -121,6 +121,8 @@ class DeltaLakeRelation(spark: SparkSession, override val plan: LogicalRelation) * Returns list of pairs of (file path, file id) to build lineage column. 
* * File paths should be the same format as "input_file_name()" of the given relation type. + * input_file_name() could be different depending on the OS and source. + * * For [[DeltaLakeRelation]], each file path should be in this format: * `file:/path/to/file` * diff --git a/src/main/scala/com/microsoft/hyperspace/index/sources/iceberg/IcebergRelation.scala b/src/main/scala/com/microsoft/hyperspace/index/sources/iceberg/IcebergRelation.scala index 77cb4bf5c..bfc31c686 100644 --- a/src/main/scala/com/microsoft/hyperspace/index/sources/iceberg/IcebergRelation.scala +++ b/src/main/scala/com/microsoft/hyperspace/index/sources/iceberg/IcebergRelation.scala @@ -16,6 +16,8 @@ package com.microsoft.hyperspace.index.sources.iceberg +import java.util.Locale + import collection.JavaConverters._ import org.apache.hadoop.fs.{FileStatus, Path} import org.apache.iceberg.{FileScanTask, Schema, Table} @@ -122,15 +124,29 @@ class IcebergRelation(spark: SparkSession, override val plan: DataSourceV2Relati * Returns list of pairs of (file path, file id) to build lineage column. * * File paths should be the same format as "input_file_name()" of the given relation type. + * input_file_name() could be different depending on the OS and source. + * * For [[IcebergRelation]], each file path should be in this format: - * `/path/to/file` + * `/path/to/file` or `X:/path/to/file` for Windows file system. * * @param fileIdTracker [[FileIdTracker]] to create the list of (file path, file id). * @return List of pairs of (file path, file id). 
*/ override def lineagePairs(fileIdTracker: FileIdTracker): Seq[(String, Long)] = { - fileIdTracker.getFileToIdMap.toSeq.map { kv => - (kv._1._1.replaceAll("^file:/{1,3}", "/"), kv._2) + // For Windows, + // original file path: file:/C:/path/to/file + // input_file_name(): C:/path/to/file + // For Linux, + // original file path: file:///path/to/file or file:/path/to/file + // input_file_name(): /path/to/file + if (Path.WINDOWS) { + fileIdTracker.getFileToIdMap.toSeq.map { kv => + (kv._1._1.stripPrefix("file:/"), kv._2) + } + } else { + fileIdTracker.getFileToIdMap.toSeq.map { kv => + (kv._1._1.replaceFirst("^file:/{1,3}", "/"), kv._2) + } } } diff --git a/src/test/scala/com/microsoft/hyperspace/index/HyperspaceSuite.scala b/src/test/scala/com/microsoft/hyperspace/index/HyperspaceSuite.scala index b6a374845..85178f806 100644 --- a/src/test/scala/com/microsoft/hyperspace/index/HyperspaceSuite.scala +++ b/src/test/scala/com/microsoft/hyperspace/index/HyperspaceSuite.scala @@ -111,7 +111,12 @@ trait HyperspaceSuite extends SparkFunSuite with SparkInvolvedSuite { // "inheriting conflicting members" issue. val path = Utils.createTempDir() path.delete() - try f(path.toString) + // Create an environment-specific path string. Utils.createTempDir() returns `/path/to/file` + // format; however, some APIs (e.g. Iceberg) cannot handle the path string properly + // on Windows. Therefore, convert the path string to an environment-specific one by + // using `new Path`. + val pathStr = new Path(path.toString).toString + try f(pathStr) finally Utils.deleteRecursively(path) } }