Refactor hashing and fingerprint functions #404
base: master
Conversation
val result = filesFromIndex(location).sortBy(_.getPath.toString).foldLeft("") {
  (acc: String, f: FileStatus) =>
    HashingUtils.md5Hex(acc + fingerprint(f))
}
result
This code was the reason why we should change the return type to Option. Previously it could return an empty string, which was interpreted by the calling side as "the signature is not / cannot be computed". Because that is hard to know just by looking at the method signature, it is better to make it explicit here by returning None in such cases.
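A minimal sketch of the idea (hypothetical names, not the actual Hyperspace code): returning `Option[String]` makes the "no signature" case explicit at the type level, instead of hiding it in an empty-string sentinel the caller has to know about.

```scala
// Hypothetical sketch, not the actual Hyperspace code.
// Before: an empty string silently meant "signature cannot be computed".
// After: Option[String] forces callers to handle the missing case.
def signature(paths: Seq[String]): Option[String] =
  if (paths.isEmpty) None // previously: returned "" and callers had to compare against it
  else Some(paths.sorted.mkString("|").hashCode.toHexString)
```

Callers can then pattern-match or use `getOrElse`, rather than remembering to compare the result against `""`.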
- Please update the documentation on methods where new parameters were added.
- In the cases where only `Some` is returned, let's drop it.
src/main/scala/com/microsoft/hyperspace/index/IndexSignatureProvider.scala
src/main/scala/com/microsoft/hyperspace/index/PlanSignatureProvider.scala
src/main/scala/com/microsoft/hyperspace/index/sources/delta/DeltaLakeRelation.scala
src/main/scala/com/microsoft/hyperspace/index/sources/iceberg/IcebergRelation.scala
src/main/scala/com/microsoft/hyperspace/util/fingerprint/FingerprintBuilderFactory.scala
Could you resolve the conflict and share some simple benchmark results (createIndex / apply index with >100k source files)? Thanks!
If this is a breaking change that will make the already created indexes useless, I would like to throw in some common-sense questions that should be addressed in one way or another:
I know these are a lot of questions, and as good engineers we need to have a bit of a "client-centric" mind to create a useful and easy-to-use product. @rapoth, @apoorvedave1, @pirz, @imback82 WDYT?
LGTM, one note though: it seems that there are a couple of places where the `signature` method always returns `Some`. I know that the `signature` interface defines the return type as `Option`, but maybe this is a good time to think about it. Maybe that `Option` can be removed from the interface too.
src/main/scala/com/microsoft/hyperspace/index/IndexSignatureProvider.scala
src/main/scala/com/microsoft/hyperspace/index/PlanSignatureProvider.scala
Thanks @andrei-ionescu for raising these questions! Here's my take on this:
My thinking is that we should not block on a migration utility for this upcoming release. However, I requested @clee704 to write up a design proposal for how such a migration utility would be designed, along with all the dimensions we would have to cover. @clee704 should be able to get to that soon. Once it passes vote, it will be open for one of the folks to take up in the coming weeks. That being said, if you are affected by this breaking change, or one of your users/customers is, please let us know. Currently, I have already provided a heads-up to the customers I know who are using Hyperspace in production, and they were fine with it.
@clee704 @sezruby Could we verify what happens and add the appropriate error message + steps they would have to take to overcome the problem?
No but now is probably a good time to discuss and document this. In general, as long as we do not announce a v1.0, we do not guarantee backward/forward compatibility and do not recommend the code to be used in production. This may not have anything to do with the quality of the code but rather stability (as customers provide us with more and more feature requests, we are trying to establish a good foundation so we can avoid major breaking changes in the future). As we get closer to v1.0, we will follow semantic versioning guidelines (i.e., everything within a major version will be forward/backward compatible but across major versions, we may break things but we will provide a migration utility by then).
It will be documented in the release notes of this specific version and in the frequently-asked-questions section. Suggestions are welcome!
Thanks for sharing your thoughts.
I agree that we should not block this PR on not having the migration tool - this is acceptable now, as Hyperspace is at the beginning. It would be great, though, to have at least a ticket/issue/proposal/feature to describe the gap and put together a plan to fill it. I'm glad we are on the same page.
Currently no major impact. I do have a few indexes created on big datasets (15 TB and more) which I will be forced to recreate when upgrading.
Thanks @andrei-ionescu! I've requested @clee704 to do this. Once he wraps up his current work, he will write up a proposal. Here's the feature request I just opened: #420 - please feel free to add anything else I may have missed directly in that issue.
If this signature change is the only breaking change of v0.5, then it should be easy to update the signatures of indices from old systems, as we have a version string in the serialized index log entries. This will be addressed as part of #420. Perhaps a simple migration utility to address this could be part of v0.5.
Fixes microsoft#271. With the newly added classes for fingerprinting, it is now easy to make a fingerprint from various types of data. FingerprintBuilder can be used to build a fingerprint from data. FileBasedRelation.signature now takes it as an argument, so the implementations don't have to do the hashing themselves. They can just use the provided builder. For unordered combining, one can use bitwise XOR to combine multiple fingerprints. This way, the order becomes irrelevant.
Spark was too slow to create that many files, so I did the measurement with randomly generated file statuses. Below are the elapsed times of the `signature` method. The results can vary depending on many other factors such as file path lengths and the format of file names. In this measurement I assumed a single root directory whose path length is 64 and random file names of length 67 (which is the default length of Parquet file names in Spark). Here's the source for the test:

```scala
import scala.util.Random

import org.apache.hadoop.fs.{FileStatus, Path}
import org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkFunSuite
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation, PartitioningAwareFileIndex}
import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
import org.apache.spark.sql.types.StructType

import com.microsoft.hyperspace.index.sources.default.DefaultFileBasedRelation
import com.microsoft.hyperspace.util.fingerprint.MD5FingerprintBuilderFactory

class PerformanceTest extends SparkFunSuite {

  test("DefaultFileBasedRelation.signature performance") {
    for (numFiles <- Seq(10000, 100000, 1000000)) {
      val repetition = 10
      val measurements = (1 to repetition).map(_ => runTest(numFiles))
      println(s"numFiles = $numFiles")
      println(s"max = ${measurements.max / 1000000.0} ms")
      println(s"min = ${measurements.min / 1000000.0} ms")
      println(s"avg = ${measurements.sum / measurements.length / 1000000.0} ms")
    }
  }

  def runTest(numFiles: Int): Long = {
    val fileBasedRel = getDefaultFileBasedRelation(numFiles)
    val start = System.nanoTime()
    val sig = fileBasedRel.signature(fb)
    // val sig = fileBasedRel.signature
    val end = System.nanoTime()
    assert(sig.isDefined)
    // assert(sig.nonEmpty)
    end - start
  }

  def getDefaultFileBasedRelation(numFiles: Int): DefaultFileBasedRelation = {
    val files = (1 to numFiles).map(_ => getRandomFileStatus())
    val location = FakeFileIndex(files)(spark)
    val parquet = new ParquetFileFormat()
    val st = new StructType()
    val rel = HadoopFsRelation(location, st, st, None, parquet, Map())(spark)
    val logicalRel = LogicalRelation(rel)
    new DefaultFileBasedRelation(spark, logicalRel)
  }

  def getRandomFileStatus(): FileStatus = {
    val length = rand.nextLong()
    val modificationTime = rand.nextLong()
    val name = getRandomAlphanumericString(67)
    val path = new Path(s"/$dir", name)
    new FileStatus(length, false, 3, 128 * 1024 * 1024, modificationTime, path)
  }

  def getRandomAlphanumericString(length: Int): String = {
    val chars = "0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ"
    val s = new Array[Char](length)
    for (i <- 0 until length) s(i) = chars(rand.nextInt(chars.length))
    new String(s)
  }

  Logger.getLogger("org").setLevel(Level.OFF)

  lazy val spark = SparkSession.builder().master("local").getOrCreate()
  lazy val rand = new Random(42)
  lazy val dir = getRandomAlphanumericString(64)
  lazy val fb = new MD5FingerprintBuilderFactory().create

  case class FakeFileIndex(files: Seq[FileStatus])(sparkSession: SparkSession)
      extends PartitioningAwareFileIndex(sparkSession, Map(), None) {
    override def allFiles(): Seq[FileStatus] = files
    override def partitionSpec() = ???
    override def leafFiles = ???
    override def leafDirToChildrenFiles = ???
    override def rootPaths = ???
    override def refresh() = ???
  }
}
```
 * without actually comparing them.
 *
 * The value of a fingerprint is typically calculated by a hash function. To
 * function as a fingerprinting function, the hash function must distribute
Could you revise this comment? Is it "As a fingerprinting function, the hash ..."?
result
val initialFingerprint = fb.build()
var fingerprint = initialFingerprint
filesFromIndex(location).foreach(fingerprint ^= createFingerprint(_, fb))
Not tested code. Suggested change:

- filesFromIndex(location).foreach(fingerprint ^= createFingerprint(_, fb))
+ val fingerprint = filesFromIndex(location).foldLeft(initialFingerprint) { (fp, file) =>
+   fp ^ createFingerprint(file, fb)
+ }
val provider = Hyperspace.getContext.sourceProviderManager
var fingerprint = ""
val fb: FingerprintBuilder = fbf.create
var updated = false
logicalPlan.foreachUp {
Not tested code. Suggested change:

- logicalPlan.foreachUp {
+ val hasSupportedRelations = logicalPlan.collect {
+   case l: LeafNode if provider.isSupportedRelation(l) =>
+     provider.getRelation(l).signature(fb).foreach(fb.add(_))
+     true
+ }
+ if (hasSupportedRelations.nonEmpty) Some(fb.build()) else None
}
/**
 * Factory object for LogicalPlanSignatureProvider.
 */
object LogicalPlanSignatureProvider {
  private val fbf: FingerprintBuilderFactory = new MD5FingerprintBuilderFactory
How about adding "algorithm" in IndexLogEntry? For clarity:

"fingerprint" : {
  "properties" : {
    "signatures" : [ {
      "algorithm" : "md5",
      "provider" : "com.microsoft.hyperspace.index.IndexSignatureProvider",
      "value" : "5269971142c4be0c144c61130cde5543"
    } ]
  },
  "kind" : "LogicalPlan"
}
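A sketch of why the explicit field helps (the case-class and field names here are hypothetical, not the actual IndexLogEntry schema): with an "algorithm" field in the entry, the reading side can select the right digest implementation instead of hard-coding MD5.

```scala
import java.security.MessageDigest

// Hypothetical sketch; names are assumptions, not the real Hyperspace schema.
case class SignatureEntry(algorithm: String, provider: String, value: String)

// Pick the digest implementation from the serialized "algorithm" field.
def digestFor(entry: SignatureEntry): MessageDigest =
  entry.algorithm match {
    case "md5"    => MessageDigest.getInstance("MD5")
    case "sha256" => MessageDigest.getInstance("SHA-256")
    case other    => throw new IllegalArgumentException(s"unknown algorithm: $other")
  }
```

This would let a future version switch hash functions without ambiguity when reading old log entries.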
Fixes #271.
With newly added classes for fingerprinting, now it is easy to make a
fingerprint from various types of data.
FingerprintBuilder can be used to build a fingerprint from data.
FileBasedRelation.signature now takes it as an argument, so the
implementations don't have to do the hashing themselves. They can just
use the provided builder.
For unordered combining, one can use bitwise XOR to combine multiple
fingerprints. This way, the order becomes irrelevant, and the combination
can be used for unordered collections like Set.
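A small illustration of the order-independence claim, using `java.security.MessageDigest` directly rather than the Hyperspace `FingerprintBuilder` API (this is a sketch of the technique, not the actual implementation):

```scala
import java.security.MessageDigest

// MD5 digest of a string, as a 16-byte array.
def md5(s: String): Array[Byte] =
  MessageDigest.getInstance("MD5").digest(s.getBytes("UTF-8"))

// Combine two fingerprints with byte-wise XOR. XOR is commutative and
// associative, so the combined value does not depend on input order.
def xorCombine(a: Array[Byte], b: Array[Byte]): Array[Byte] =
  a.zip(b).map { case (x, y) => (x ^ y).toByte }

val files = Seq("part-0001.parquet", "part-0002.parquet", "part-0003.parquet")
val forward  = files.map(md5).reduce(xorCombine)
val backward = files.reverse.map(md5).reduce(xorCombine)
// forward and backward are identical byte-for-byte
```

This is why the combined fingerprint is suitable for unordered collections: no sorting of file statuses is needed before combining, unlike the old chained `md5Hex(acc + fingerprint(f))` approach.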
What is the context for this pull request?
What changes were proposed in this pull request?
Does this PR introduce any user-facing change?
The JSON format remains the same, as does MD5. However, the way the signatures are computed has changed to accommodate more efficient computation. Therefore, existing signatures won't match the new ones. This is a breaking change, as #268 was.
For library authors and developers who want to extend Hyperspace, the API is incompatible and must be adapted.
How was this patch tested?
Unit test