This repository has been archived by the owner on Jun 14, 2024. It is now read-only.

Commit c742ea9

applied fix

ddrag-ai-dragonfly committed Apr 14, 2021
1 parent 8078480

Showing 2 changed files with 92 additions and 22 deletions.
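
The one-line commit message doesn't say what was fixed. Judging from the diff, the change replaces Content.rec, a helper that walked the directory tree by direct recursion (one JVM stack frame per nesting level), with the tail-recursive worklist traversal Content.recFilesApply, which handles arbitrarily deep trees without risking StackOverflowError. A minimal standalone sketch of the pattern; Dir, filesNaive, and filesIterative are illustrative names, not code from this repository:

    import scala.annotation.tailrec

    case class Dir(name: String, subDirs: Seq[Dir] = Nil, files: Seq[String] = Nil)

    // Structural recursion: one stack frame per directory level, so a
    // sufficiently deep tree throws StackOverflowError.
    def filesNaive(d: Dir): Seq[String] =
      d.files ++ d.subDirs.flatMap(filesNaive)

    // Worklist traversal: the @tailrec check guarantees this compiles to a
    // loop, so stack usage is constant regardless of tree depth.
    def filesIterative(root: Dir): Seq[String] = {
      @tailrec
      def loop(todo: List[Dir], acc: Seq[String]): Seq[String] = todo match {
        case Nil => acc
        case d :: rest => loop(rest ++ d.subDirs.toList, acc ++ d.files)
      }
      loop(List(root), Nil)
    }

    // A tree 100,000 levels deep, built iteratively:
    val deep = (1 to 100000).foldLeft(Dir("leaf"))((child, i) => Dir(i.toString, Seq(child)))
    // filesNaive(deep)  // throws StackOverflowError
    filesIterative(deep) // returns Seq.empty without growing the stack
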
73 changes: 51 additions & 22 deletions src/main/scala/com/microsoft/hyperspace/index/IndexLogEntry.scala
@@ -19,7 +19,11 @@ package com.microsoft.hyperspace.index
+import scala.annotation.tailrec // required by the @tailrec annotation in Content.recFilesApply
+
 import com.fasterxml.jackson.annotation.JsonIgnore
 import com.microsoft.hyperspace.HyperspaceException
 import com.microsoft.hyperspace.actions.Constants
+import com.microsoft.hyperspace.index.Content.recFilesApply
 import com.microsoft.hyperspace.util.PathUtils
+
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileStatus, FileSystem, Path, PathFilter}
 import org.apache.spark.sql.catalyst.catalog.BucketSpec
@@ -43,27 +47,17 @@ case class Content(root: Directory, fingerprint: NoOpFingerprint = NoOpFingerprint()) {
   @JsonIgnore
   lazy val files: Seq[Path] = {
     // Recursively find files from directory tree.
-    rec(new Path(root.name), root, (f, prefix) => new Path(prefix, f.name))
+    recFilesApply(new Path(root.name), root, (f, prefix) => new Path(prefix, f.name))
   }

   @JsonIgnore
   lazy val fileInfos: Set[FileInfo] = {
-    rec(
+    recFilesApply(
       new Path(root.name),
       root,
       (f, prefix) =>
         FileInfo(new Path(prefix, f.name).toString, f.size, f.modifiedTime, f.id)).toSet
   }
-
-  private def rec[T](
-      prefixPath: Path,
-      directory: Directory,
-      func: (FileInfo, Path) => T): Seq[T] = {
-    val files = directory.files.map(f => func(f, prefixPath))
-    files ++ directory.subDirs.flatMap { dir =>
-      rec(new Path(prefixPath, dir.name), dir, func)
-    }
-  }
 }

 object Content {
@@ -109,6 +103,43 @@ object Content {
       None
     }
   }
+
+  /**
+   * Applies `func` to every file under `directory` without consuming stack depth.
+   *
+   * @param prefixPath Path prefix of the root directory.
+   * @param directory Root directory to traverse.
+   * @param func Function applied to each file and its path prefix.
+   * @tparam T Result type of `func`.
+   * @return Results of applying `func` to all files, in no guaranteed order.
+   */
+  def recFilesApply[T](
+      prefixPath: Path,
+      directory: Directory,
+      func: (FileInfo, Path) => T): Seq[T] = {
+    @tailrec
+    def recAcc[A](
+        dirMap: List[(Path, Seq[Directory])],
+        func: (FileInfo, Path) => A,
+        acc: Seq[A] = Seq.empty): Seq[A] = {
+      dirMap match {
+        case Nil => acc
+        case (curPrefixPath, curDirs) :: otherDirs =>
+          val curAcc = for {
+            dir <- curDirs
+            file <- dir.files
+          } yield func(file, new Path(curPrefixPath, dir.name))
+
+          val newLevels = curDirs
+            .filter(_.subDirs.nonEmpty)
+            .map(dir => (new Path(curPrefixPath, dir.name), dir.subDirs))
+
+          recAcc(otherDirs ++ newLevels, func, curAcc ++ acc)
+      }
+    }
+
+    recAcc(List(prefixPath -> Seq(directory)), func)
+  }
 }

 /**
@@ -481,6 +512,12 @@ case class IndexLogEntry(
     relations.head.data.properties.update
   }

+  def relations: Seq[Relation] = {
+    // Only one relation is currently supported.
+    assert(source.plan.properties.relations.size == 1)
+    source.plan.properties.relations
+  }
+
   def copyWithUpdate(
       latestFingerprint: LogicalPlanFingerprint,
       appended: Seq[FileInfo],
@@ -505,22 +542,12 @@ case class IndexLogEntry(
         Content.fromLeafFiles(deleted.map(toFileStatus), fileIdTracker)))))))))))
   }

-  def relations: Seq[Relation] = {
-    // Only one relation is currently supported.
-    assert(source.plan.properties.relations.size == 1)
-    source.plan.properties.relations
-  }
-
   def bucketSpec: BucketSpec =
     BucketSpec(
       numBuckets = numBuckets,
       bucketColumnNames = indexedColumns,
       sortColumnNames = indexedColumns)

-  def numBuckets: Int = derivedDataset.properties.numBuckets
-
-  def indexedColumns: Seq[String] = derivedDataset.properties.columns.indexed
-
   override def equals(o: Any): Boolean = o match {
     case that: IndexLogEntry =>
       config.equals(that.config) &&
@@ -532,8 +559,12 @@ case class IndexLogEntry(
     case _ => false
   }

+  def numBuckets: Int = derivedDataset.properties.numBuckets
+
   def config: IndexConfig = IndexConfig(name, indexedColumns, includedColumns)

+  def indexedColumns: Seq[String] = derivedDataset.properties.columns.indexed
+
   def includedColumns: Seq[String] = derivedDataset.properties.columns.included

   def signature: Signature = {
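
Note that the three IndexLogEntry.scala hunks above only relocate existing members (relations, numBuckets, indexedColumns) within the class; their bodies are unchanged, so the moves appear to be a pure re-grouping rather than part of the fix.
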
41 changes: 41 additions & 0 deletions IndexLogEntryTest.scala
@@ -240,6 +240,47 @@ class IndexLogEntryTest extends SparkFunSuite with SQLHelper with BeforeAndAfter
     assert(actual.sourceFilesSizeInBytes == 200L)
   }

+  test("Content.recFilesApply applies the given function to every file in the tree.") {
+    val directory = Directory(
+      "file:/",
+      files = Seq(FileInfo("f0", 0, 0, UNKNOWN_FILE_ID)),
+      subDirs = Seq(
+        Directory(
+          "a",
+          files =
+            Seq(FileInfo("f1", 0, 0, UNKNOWN_FILE_ID), FileInfo("f2", 0, 0, UNKNOWN_FILE_ID)),
+          subDirs = Seq(
+            Directory(
+              "b",
+              files =
+                Seq(FileInfo("f3", 0, 0, UNKNOWN_FILE_ID), FileInfo("f4", 0, 0, UNKNOWN_FILE_ID)),
+              subDirs = Seq(Directory("c"))),
+            Directory("d")))))
+
+    def theFunction: (FileInfo, Path) => Path = (f, prefix) => new Path(prefix, f.name)
+
+    val res = Content.recFilesApply(new Path("file:/"), directory, theFunction)
+
+    val expected =
+      Seq("file:/f0", "file:/a/f1", "file:/a/f2", "file:/a/b/f3", "file:/a/b/f4")
+        .map(new Path(_))
+        .toSet
+
+    val actual = res.toSet
+    assert(actual.equals(expected))
+  }
+
+  test("Content.recFilesApply returns an empty list for directories without files.") {
+    val directory = Directory("file:/")
+
+    val res = Content.recFilesApply(
+      new Path("file:/"),
+      directory,
+      (f, prefix) => new Path(prefix, f.name))
+
+    assert(res.isEmpty)
+  }
+
   test("Content.files api lists all files from Content object.") {
     val content = Content(Directory("file:/", subDirs =
       Seq(
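
One nuance the commit doesn't call out: the traversal order changes. The old rec emitted files depth-first in preorder, while recFilesApply processes its worklist level by level and prepends each level's results, so deeper files come first. For the tree in the first new test above:

    // old rec (depth-first preorder):    f0, f1, f2, f3, f4
    // new recFilesApply (deepest first): f3, f4, f1, f2, f0

That is presumably why the new tests compare results as sets rather than sequences; nothing in the shown code promises an ordering for Content.files, so callers should not rely on one.
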
