-
Notifications
You must be signed in to change notification settings - Fork 114
Refactor hashing and fingerprint functions #404
base: master
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -21,8 +21,12 @@ import scala.util.{Success, Try} | |
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan | ||
import org.apache.spark.util.hyperspace.Utils | ||
|
||
import com.microsoft.hyperspace.util.fingerprint.{Fingerprint, FingerprintBuilderFactory, MD5FingerprintBuilderFactory} | ||
|
||
/** | ||
* This trait contains the interface that provides the signature of logical plan. | ||
* | ||
* The implementation must have a constructor taking [[FingerprintBuilderFactory]] as an argument. | ||
*/ | ||
trait LogicalPlanSignatureProvider { | ||
|
||
|
@@ -36,15 +40,17 @@ trait LogicalPlanSignatureProvider { | |
* @param logicalPlan logical plan. | ||
* @return signature if it can be computed w.r.t signature provider assumptions; Otherwise None. | ||
*/ | ||
def signature(logicalPlan: LogicalPlan): Option[String] | ||
def signature(logicalPlan: LogicalPlan): Option[Fingerprint] | ||
} | ||
|
||
/** | ||
* Factory object for LogicalPlanSignatureProvider. | ||
*/ | ||
object LogicalPlanSignatureProvider { | ||
private val fbf: FingerprintBuilderFactory = new MD5FingerprintBuilderFactory | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. How about adding an "algorithm" field to IndexLogEntry, for clarity?
|
||
|
||
// Creates a default signature provider. | ||
def create(): LogicalPlanSignatureProvider = new IndexSignatureProvider | ||
def create(): LogicalPlanSignatureProvider = new IndexSignatureProvider(fbf) | ||
|
||
/** | ||
* Creates a parameterized signature provider. | ||
|
@@ -53,7 +59,11 @@ object LogicalPlanSignatureProvider { | |
* @return signature provider. | ||
*/ | ||
def create(name: String): LogicalPlanSignatureProvider = { | ||
Try(Utils.classForName(name).newInstance) match { | ||
Try( | ||
Utils | ||
.classForName(name) | ||
.getConstructor(classOf[FingerprintBuilderFactory]) | ||
.newInstance(fbf)) match { | ||
case Success(provider: LogicalPlanSignatureProvider) => provider | ||
case _ => | ||
throw new IllegalArgumentException( | ||
|
Original file line number | Diff line number | Diff line change | ||||||||
---|---|---|---|---|---|---|---|---|---|---|
|
@@ -30,7 +30,7 @@ import com.microsoft.hyperspace.HyperspaceException | |||||||||
import com.microsoft.hyperspace.index.{Content, FileIdTracker, Hdfs, Relation} | ||||||||||
import com.microsoft.hyperspace.index.IndexConstants.GLOBBING_PATTERN_KEY | ||||||||||
import com.microsoft.hyperspace.index.sources.FileBasedRelation | ||||||||||
import com.microsoft.hyperspace.util.HashingUtils | ||||||||||
import com.microsoft.hyperspace.util.fingerprint.{Fingerprint, FingerprintBuilder} | ||||||||||
|
||||||||||
/** | ||||||||||
* Implementation for file-based relation used by [[DefaultFileBasedSource]] | ||||||||||
|
@@ -41,14 +41,15 @@ class DefaultFileBasedRelation(spark: SparkSession, override val plan: LogicalRe | |||||||||
|
||||||||||
/** | ||||||||||
* Computes the signature of the current relation. | ||||||||||
* | ||||||||||
* @param fb [[FingerprintBuilder]] used for building fingerprints | ||||||||||
*/ | ||||||||||
override def signature: String = plan.relation match { | ||||||||||
override def signature(fb: FingerprintBuilder): Option[Fingerprint] = plan.relation match { | ||||||||||
clee704 marked this conversation as resolved.
Show resolved
Hide resolved
|
||||||||||
case HadoopFsRelation(location: PartitioningAwareFileIndex, _, _, _, _, _) => | ||||||||||
val result = filesFromIndex(location).sortBy(_.getPath.toString).foldLeft("") { | ||||||||||
(acc: String, f: FileStatus) => | ||||||||||
HashingUtils.md5Hex(acc + fingerprint(f)) | ||||||||||
} | ||||||||||
result | ||||||||||
Comment on lines
-47
to
-51
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. This code is the reason we should change the return type to Option. Previously it could return an empty string, which the calling side interprets as "the signature is not, or cannot be, computed". Because that is hard to tell just by looking at the method signature, it is better to make it explicit here by returning None in such cases. |
||||||||||
val initialFingerprint = fb.build() | ||||||||||
var fingerprint = initialFingerprint | ||||||||||
filesFromIndex(location).foreach(fingerprint ^= createFingerprint(_, fb)) | ||||||||||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. This code is not tested.
Suggested change
|
||||||||||
Some(fingerprint).filter(_ != initialFingerprint) | ||||||||||
} | ||||||||||
|
||||||||||
/** | ||||||||||
|
@@ -179,9 +180,11 @@ class DefaultFileBasedRelation(spark: SparkSession, override val plan: LogicalRe | |||||||||
} | ||||||||||
} | ||||||||||
|
||||||||||
private def fingerprint(fileStatus: FileStatus): String = { | ||||||||||
fileStatus.getLen.toString + fileStatus.getModificationTime.toString + | ||||||||||
fileStatus.getPath.toString | ||||||||||
private def createFingerprint(fileStatus: FileStatus, fb: FingerprintBuilder): Fingerprint = { | ||||||||||
fb.add(fileStatus.getLen) | ||||||||||
.add(fileStatus.getModificationTime) | ||||||||||
.add(fileStatus.getPath.toString) | ||||||||||
.build() | ||||||||||
} | ||||||||||
|
||||||||||
private def filesFromIndex(index: PartitioningAwareFileIndex): Seq[FileStatus] = { | ||||||||||
|
This file was deleted.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
not tested code