Skip to content
This repository has been archived by the owner on Jun 14, 2024. It is now read-only.

Commit

Permalink
Fix Iceberg lineage for Windows (#375)
Browse files Browse the repository at this point in the history
  • Loading branch information
sezruby authored Mar 9, 2021
1 parent 2b229d4 commit c5714e4
Show file tree
Hide file tree
Showing 4 changed files with 29 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -224,6 +224,8 @@ class DefaultFileBasedRelation(spark: SparkSession, override val plan: LogicalRe
* Returns list of pairs of (file path, file id) to build lineage column.
*
* File paths should be the same format as "input_file_name()" of the given relation type.
* input_file_name() could be different depending on the OS and source.
*
* For [[DefaultFileBasedRelation]], each file path should be in this format:
* `file:///path/to/file`
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,8 @@ class DeltaLakeRelation(spark: SparkSession, override val plan: LogicalRelation)
* Returns list of pairs of (file path, file id) to build lineage column.
*
* File paths should be the same format as "input_file_name()" of the given relation type.
* input_file_name() could be different depending on the OS and source.
*
* For [[DeltaLakeRelation]], each file path should be in this format:
* `file:/path/to/file`
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@

package com.microsoft.hyperspace.index.sources.iceberg

import java.util.Locale

import collection.JavaConverters._
import org.apache.hadoop.fs.{FileStatus, Path}
import org.apache.iceberg.{FileScanTask, Schema, Table}
Expand Down Expand Up @@ -122,15 +124,29 @@ class IcebergRelation(spark: SparkSession, override val plan: DataSourceV2Relati
* Returns list of pairs of (file path, file id) to build lineage column.
*
* File paths should be the same format as "input_file_name()" of the given relation type.
* input_file_name() could be different depending on the OS and source.
*
* For [[IcebergRelation]], each file path should be in this format:
* `/path/to/file`
* `/path/to/file` or `X:/path/to/file` for Windows file system.
*
* @param fileIdTracker [[FileIdTracker]] to create the list of (file path, file id).
* @return List of pairs of (file path, file id).
*/
override def lineagePairs(fileIdTracker: FileIdTracker): Seq[(String, Long)] = {
fileIdTracker.getFileToIdMap.toSeq.map { kv =>
(kv._1._1.replaceAll("^file:/{1,3}", "/"), kv._2)
// For Windows,
// original file path: file:/C:/path/to/file
// input_file_name(): C:/path/to/file
// For Linux,
// original file path: file:///path/to/file or file:/path/to/file
// input_file_name(): /path/to/file
if (Path.WINDOWS) {
fileIdTracker.getFileToIdMap.toSeq.map { kv =>
(kv._1._1.stripPrefix("file:/"), kv._2)
}
} else {
fileIdTracker.getFileToIdMap.toSeq.map { kv =>
(kv._1._1.replaceFirst("^file:/{1,3}", "/"), kv._2)
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,12 @@ trait HyperspaceSuite extends SparkFunSuite with SparkInvolvedSuite {
// "inheriting conflicting members" issue.
val path = Utils.createTempDir()
path.delete()
try f(path.toString)
// Create an environment specific path string. Utils.createTempDir() returns `/path/to/file`
// format, however some of APIs (e.g. Iceberg) cannot handle the path string properly
// in Windows. Therefore, convert the path string to an environment specific one by
// using `new Path`.
val pathStr = new Path(path.toString).toString
try f(pathStr)
finally Utils.deleteRecursively(path)
}
}

0 comments on commit c5714e4

Please sign in to comment.