Skip to content
This repository has been archived by the owner on Jun 14, 2024. It is now read-only.

Refactor hashing and fingerprint functions #404

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,44 +19,39 @@ package com.microsoft.hyperspace.index
import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan}

import com.microsoft.hyperspace.Hyperspace
import com.microsoft.hyperspace.util.HashingUtils
import com.microsoft.hyperspace.util.fingerprint.{Fingerprint, FingerprintBuilder, FingerprintBuilderFactory}

/**
* [[FileBasedSignatureProvider]] provides the logical plan signature based on files in the
* relation. File metadata, eg. size, modification time and path, of each file in the
* relation will be used to generate the signature.
*
* Note that while the order of files in a single relation does not affect the signature,
* the order of relations in the plan do affect the signature calculation.
*
* If the given logical plan does not have any supported relations, no signature is provided.
*/
class FileBasedSignatureProvider extends LogicalPlanSignatureProvider {
class FileBasedSignatureProvider(fbf: FingerprintBuilderFactory)
extends LogicalPlanSignatureProvider {

/**
* Generate the signature of logical plan.
*
* @param logicalPlan logical plan of data frame.
* @return signature, if the logical plan has supported relations; Otherwise None.
*/
def signature(logicalPlan: LogicalPlan): Option[String] = {
fingerprintVisitor(logicalPlan).map(HashingUtils.md5Hex)
}

/**
* Visit logical plan and collect info needed for fingerprint.
*
* @param logicalPlan logical plan of data frame.
* @return fingerprint, if the logical plan has supported relations; Otherwise None.
*/
private def fingerprintVisitor(logicalPlan: LogicalPlan): Option[String] = {
def signature(logicalPlan: LogicalPlan): Option[Fingerprint] = {
val provider = Hyperspace.getContext.sourceProviderManager
var fingerprint = ""
val fb: FingerprintBuilder = fbf.create
var updated = false
logicalPlan.foreachUp {
Copy link
Collaborator

@sezruby sezruby May 5, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

not tested code

Suggested change
logicalPlan.foreachUp {
val hasSupportedRelations = logicalPlan.collect {
case l: LeafNode if provider.isSupportedRelation(l) =>
provider.getRelation(l).signature(fb).foreach(fb.add(_))
true
}
if (hasSupportedRelation.nonEmpty()) Some(fb.build()) else None

case l: LeafNode if provider.isSupportedRelation(l) =>
fingerprint ++= provider.getRelation(l).signature
provider.getRelation(l).signature(fb).foreach { f =>
fb.add(f)
updated = true
}
case _ =>
}

fingerprint match {
case "" => None
case _ => Some(fingerprint)
}
if (updated) Some(fb.build()) else None
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import org.apache.spark.sql.types.{DataType, StructType}
import com.microsoft.hyperspace.{BuildInfo, HyperspaceException}
import com.microsoft.hyperspace.actions.Constants
import com.microsoft.hyperspace.util.PathUtils
import com.microsoft.hyperspace.util.fingerprint.Fingerprint

// IndexLogEntry-specific fingerprint to be temporarily used where fingerprint is not defined.
case class NoOpFingerprint() {
Expand Down Expand Up @@ -361,7 +362,7 @@ object CoveringIndex {
}

// IndexLogEntry-specific Signature that stores the signature provider and value.
case class Signature(provider: String, value: String)
case class Signature(provider: String, value: Fingerprint)

// IndexLogEntry-specific LogicalPlanFingerprint to store fingerprint of logical plan.
case class LogicalPlanFingerprint(properties: LogicalPlanFingerprint.Properties) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ package com.microsoft.hyperspace.index

import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan

import com.microsoft.hyperspace.util.HashingUtils
import com.microsoft.hyperspace.util.fingerprint.{Fingerprint, FingerprintBuilderFactory}

/**
* [[IndexSignatureProvider]] provides signature for a logical plan based on:
Expand All @@ -29,10 +29,13 @@ import com.microsoft.hyperspace.util.HashingUtils
*
* If the plan does not comply with [[FileBasedSignatureProvider]] or [[PlanSignatureProvider]]
* requirements for signature computation, then no signature will be provided for the plan.
*
* @param fbf [[FingerprintBuilderFactory]] used for building fingerprints
*/
class IndexSignatureProvider extends LogicalPlanSignatureProvider {
private val fileBasedSignatureProvider = new FileBasedSignatureProvider
private val planSignatureProvider = new PlanSignatureProvider
class IndexSignatureProvider(fbf: FingerprintBuilderFactory)
clee704 marked this conversation as resolved.
Show resolved Hide resolved
extends LogicalPlanSignatureProvider {
private val fileBasedSignatureProvider = new FileBasedSignatureProvider(fbf)
private val planSignatureProvider = new PlanSignatureProvider(fbf)

/**
* Generate the signature of logical plan.
Expand All @@ -41,10 +44,10 @@ class IndexSignatureProvider extends LogicalPlanSignatureProvider {
* @return signature, if both [[FileBasedSignatureProvider]] and [[PlanSignatureProvider]]
* can generate signature for the logical plan; Otherwise None.
*/
def signature(logicalPlan: LogicalPlan): Option[String] = {
def signature(logicalPlan: LogicalPlan): Option[Fingerprint] = {
clee704 marked this conversation as resolved.
Show resolved Hide resolved
fileBasedSignatureProvider.signature(logicalPlan).flatMap { f =>
planSignatureProvider.signature(logicalPlan).map { p =>
HashingUtils.md5Hex(f + p)
fbf.create.add(f).add(p).build()
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,12 @@ import scala.util.{Success, Try}
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.util.hyperspace.Utils

import com.microsoft.hyperspace.util.fingerprint.{Fingerprint, FingerprintBuilderFactory, MD5FingerprintBuilderFactory}

/**
* This trait contains the interface that provides the signature of logical plan.
*
* The implementation must have a constructor taking [[FingerprintBuilderFactory]] as an argument.
*/
trait LogicalPlanSignatureProvider {

Expand All @@ -36,15 +40,17 @@ trait LogicalPlanSignatureProvider {
* @param logicalPlan logical plan.
* @return signature if it can be computed w.r.t signature provider assumptions; Otherwise None.
*/
def signature(logicalPlan: LogicalPlan): Option[String]
def signature(logicalPlan: LogicalPlan): Option[Fingerprint]
}

/**
* Factory object for LogicalPlanSignatureProvider.
*/
object LogicalPlanSignatureProvider {
private val fbf: FingerprintBuilderFactory = new MD5FingerprintBuilderFactory
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How about adding "algorithm" in IndexLogEntry? for clarity

"fingerprint" : {
          "properties" : {
            "signatures" : [ {
              "algorithm":"md5",
              "provider" : "com.microsoft.hyperspace.index.IndexSignatureProvider",
              "value" : "5269971142c4be0c144c61130cde5543"
            } ]
          },
          "kind" : "LogicalPlan"
        }


// Creates a default signature provider.
def create(): LogicalPlanSignatureProvider = new IndexSignatureProvider
def create(): LogicalPlanSignatureProvider = new IndexSignatureProvider(fbf)

/**
* Creates a parameterized signature provider.
Expand All @@ -53,7 +59,11 @@ object LogicalPlanSignatureProvider {
* @return signature provider.
*/
def create(name: String): LogicalPlanSignatureProvider = {
Try(Utils.classForName(name).newInstance) match {
Try(
Utils
.classForName(name)
.getConstructor(classOf[FingerprintBuilderFactory])
.newInstance(fbf)) match {
case Success(provider: LogicalPlanSignatureProvider) => provider
case _ =>
throw new IllegalArgumentException(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,27 +18,26 @@ package com.microsoft.hyperspace.index

import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan

import com.microsoft.hyperspace.util.HashingUtils
import com.microsoft.hyperspace.util.fingerprint.{Fingerprint, FingerprintBuilder, FingerprintBuilderFactory}

/**
* [[PlanSignatureProvider]] provides signature for a logical plan based on
* the type of operators in it.
* A plan needs to have at least one operator so its signature can be generated.
*
* @param fbf [[FingerprintBuilderFactory]] used for building fingerprints
*/
class PlanSignatureProvider extends LogicalPlanSignatureProvider {
class PlanSignatureProvider(fbf: FingerprintBuilderFactory) extends LogicalPlanSignatureProvider {
clee704 marked this conversation as resolved.
Show resolved Hide resolved

/**
* Generate the signature of logical plan.
*
* @param logicalPlan logical plan.
* @return signature if there is at least one operator in the plan; Otherwise None.
*/
def signature(logicalPlan: LogicalPlan): Option[String] = {
var signature = ""
logicalPlan.foreachUp(p => signature = HashingUtils.md5Hex(signature + p.nodeName))
signature match {
case "" => None
case _ => Some(signature)
}
def signature(logicalPlan: LogicalPlan): Option[Fingerprint] = {
clee704 marked this conversation as resolved.
Show resolved Hide resolved
val fb: FingerprintBuilder = fbf.create
logicalPlan.foreachUp(node => fb.add(node.nodeName))
Some(fb.build())
clee704 marked this conversation as resolved.
Show resolved Hide resolved
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import com.microsoft.hyperspace.index.IndexLogEntryTags.{HYBRIDSCAN_RELATED_CONF
import com.microsoft.hyperspace.index.plans.logical.{BucketUnion, IndexHadoopFsRelation}
import com.microsoft.hyperspace.index.sources.FileBasedRelation
import com.microsoft.hyperspace.util.HyperspaceConf
import com.microsoft.hyperspace.util.fingerprint.Fingerprint

object RuleUtils {

Expand All @@ -54,7 +55,7 @@ object RuleUtils {
indexes: Seq[IndexLogEntry],
relation: FileBasedRelation): Seq[IndexLogEntry] = {
// Map of a signature provider to a signature generated for the given plan.
val signatureMap = mutable.Map[String, Option[String]]()
val signatureMap = mutable.Map[String, Option[Fingerprint]]()

def signatureValid(entry: IndexLogEntry): Boolean = {
entry.withCachedTag(relation.plan, IndexLogEntryTags.SIGNATURE_MATCHED) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ import com.microsoft.hyperspace.HyperspaceException
import com.microsoft.hyperspace.index.{Content, FileIdTracker, Hdfs, Relation}
import com.microsoft.hyperspace.index.IndexConstants.GLOBBING_PATTERN_KEY
import com.microsoft.hyperspace.index.sources.FileBasedRelation
import com.microsoft.hyperspace.util.HashingUtils
import com.microsoft.hyperspace.util.fingerprint.{Fingerprint, FingerprintBuilder}

/**
* Implementation for file-based relation used by [[DefaultFileBasedSource]]
Expand All @@ -41,14 +41,15 @@ class DefaultFileBasedRelation(spark: SparkSession, override val plan: LogicalRe

/**
* Computes the signature of the current relation.
*
* @param fb [[FingerprintBuilder]] used for building fingerprints
*/
override def signature: String = plan.relation match {
override def signature(fb: FingerprintBuilder): Option[Fingerprint] = plan.relation match {
clee704 marked this conversation as resolved.
Show resolved Hide resolved
case HadoopFsRelation(location: PartitioningAwareFileIndex, _, _, _, _, _) =>
val result = filesFromIndex(location).sortBy(_.getPath.toString).foldLeft("") {
(acc: String, f: FileStatus) =>
HashingUtils.md5Hex(acc + fingerprint(f))
}
result
Comment on lines -47 to -51
Copy link
Author

@clee704 clee704 Apr 2, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This code was the reason why we should change the return type to Option. Previously it can return an empty string, which is interpreted by the calling side as "the signature is/cannot be not computed". Because it is hard to know just looking at the method signature, it is better to make it explicit here by returning None in such cases.

val initialFingerprint = fb.build()
var fingerprint = initialFingerprint
filesFromIndex(location).foreach(fingerprint ^= createFingerprint(_, fb))
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

not tested code

Suggested change
filesFromIndex(location).foreach(fingerprint ^= createFingerprint(_, fb))
val fingerprint = filesFromIndex(location).foldLeft(initialFingerprint){ (fp, file) =>
fp ^ createFingerprint(file, fb))
}

Some(fingerprint).filter(_ != initialFingerprint)
}

/**
Expand Down Expand Up @@ -179,9 +180,11 @@ class DefaultFileBasedRelation(spark: SparkSession, override val plan: LogicalRe
}
}

private def fingerprint(fileStatus: FileStatus): String = {
fileStatus.getLen.toString + fileStatus.getModificationTime.toString +
fileStatus.getPath.toString
private def createFingerprint(fileStatus: FileStatus, fb: FingerprintBuilder): Fingerprint = {
fb.add(fileStatus.getLen)
.add(fileStatus.getModificationTime)
.add(fileStatus.getPath.toString)
.build()
}

private def filesFromIndex(index: PartitioningAwareFileIndex): Seq[FileStatus] = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import com.microsoft.hyperspace.actions.Constants
import com.microsoft.hyperspace.index.{Content, FileIdTracker, Hdfs, IndexLogEntry, Relation}
import com.microsoft.hyperspace.index.sources.default.DefaultFileBasedRelation
import com.microsoft.hyperspace.util.{HyperspaceConf, PathUtils}
import com.microsoft.hyperspace.util.fingerprint.{Fingerprint, FingerprintBuilder}

/**
* Implementation for file-based relation used by [[DeltaLakeFileBasedSource]]
Expand All @@ -36,10 +37,12 @@ class DeltaLakeRelation(spark: SparkSession, override val plan: LogicalRelation)

/**
* Computes the signature of the current relation.
*
* @param fb [[FingerprintBuilder]] used for building fingerprints
*/
override def signature: String = plan.relation match {
override def signature(fb: FingerprintBuilder): Option[Fingerprint] = plan.relation match {
clee704 marked this conversation as resolved.
Show resolved Hide resolved
case HadoopFsRelation(location: TahoeLogFileIndex, _, _, _, _, _) =>
location.tableVersion + location.path.toString
Some(fb.add(location.tableVersion).add(location.path.toString).build())
clee704 marked this conversation as resolved.
Show resolved Hide resolved
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import org.apache.spark.sql.types.StructType
import com.microsoft.hyperspace.index.{Content, FileIdTracker, Hdfs, IndexConstants, Relation}
import com.microsoft.hyperspace.index.sources.FileBasedRelation
import com.microsoft.hyperspace.util.PathUtils
import com.microsoft.hyperspace.util.fingerprint.{Fingerprint, FingerprintBuilder}

/**
* Implementation for file-based relation used by [[IcebergFileBasedSource]]
Expand Down Expand Up @@ -61,9 +62,14 @@ class IcebergRelation(

/**
* Computes the signature of the current relation.
*
* @param fb [[FingerprintBuilder]] used for building fingerprints
*/
override def signature: String = {
snapshotId.getOrElse(table.currentSnapshot().snapshotId()).toString + table.location()
override def signature(fb: FingerprintBuilder): Option[Fingerprint] = {
Some(
fb.add(snapshotId.getOrElse(table.currentSnapshot().snapshotId()).toString)
.add(table.location)
.build())
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import org.apache.spark.sql.execution.datasources.{FileIndex, HadoopFsRelation,
import org.apache.spark.sql.types.StructType

import com.microsoft.hyperspace.index.{FileIdTracker, FileInfo, IndexConstants, IndexLogEntry, Relation}
import com.microsoft.hyperspace.util.fingerprint.{Fingerprint, FingerprintBuilder}

/**
* ::Experimental::
Expand Down Expand Up @@ -67,8 +68,14 @@ trait FileBasedRelation extends SourceRelation {
*
* This API is used when the signature of source needs to be computed, e.g., creating an index,
* computing query plan's signature, etc.
*
* If it is not possible to compute the signature (e.g. there are no files left),
* the implementation might return None.
*
* @param fb [[FingerprintBuilder]] used for building fingerprints.
* Use it to compute the signature from discriminating properties of the relation.
*/
def signature: String
def signature(fb: FingerprintBuilder): Option[Fingerprint]
clee704 marked this conversation as resolved.
Show resolved Hide resolved

/**
* FileStatus list for all source files that the current relation references to.
Expand Down
35 changes: 0 additions & 35 deletions src/main/scala/com/microsoft/hyperspace/util/HashingUtils.scala

This file was deleted.

Loading