diff --git a/python/hyperspace/tests/test_indexmanagement.py b/python/hyperspace/tests/test_indexmanagement.py index a360ae731..30e75991d 100644 --- a/python/hyperspace/tests/test_indexmanagement.py +++ b/python/hyperspace/tests/test_indexmanagement.py @@ -38,44 +38,50 @@ def test_index_delete(self): idx_config = IndexConfig('idx2', ['name'], ['age']) self.hyperspace.createIndex(self.df, idx_config) self.assertEqual(self.hyperspace.indexes().filter( - """name = "idx2" and state = "ACTIVE" """).count(), 1) + """name = "idx2" and state = "ACTIVE" """).count(), 1) self.assertEqual(self.hyperspace.indexes().filter( - """name = "idx2" and state = "DELETED" """).count(), 0) + """name = "idx2" and state = "DELETED" """).count(), 0) self.hyperspace.deleteIndex("idx2") self.assertEqual(self.hyperspace.indexes().filter( - """name = "idx2" and state = "DELETED" """).count(), 1) + """name = "idx2" and state = "DELETED" """).count(), 1) self.assertEqual(self.hyperspace.indexes().filter( - """name = "idx2" and state = "ACTIVE" """).count(), 0) + """name = "idx2" and state = "ACTIVE" """).count(), 0) def test_index_restore(self): idx_config = IndexConfig('idx3', ['name'], ['age']) self.hyperspace.createIndex(self.df, idx_config) self.hyperspace.deleteIndex("idx3") self.assertEqual(self.hyperspace.indexes().filter( - """name = "idx3" and state = "DELETED" """).count(), 1) + """name = "idx3" and state = "DELETED" """).count(), 1) self.hyperspace.restoreIndex("idx3") self.assertEqual(self.hyperspace.indexes().filter( - """name = "idx3" and state = "ACTIVE" """).count(), 1) + """name = "idx3" and state = "ACTIVE" """).count(), 1) self.assertEqual(self.hyperspace.indexes().filter( - """name = "idx3" and state = "DELETED" """).count(), 0) + """name = "idx3" and state = "DELETED" """).count(), 0) def test_index_vacuum(self): idx_config = IndexConfig('idx4', ['name'], ['age']) self.hyperspace.createIndex(self.df, idx_config) self.hyperspace.deleteIndex("idx4") self.assertEqual(self.hyperspace.indexes().filter( - """name = "idx4" and state = "DELETED" """).count(), 1) + """name = "idx4" and state = "DELETED" """).count(), 1) self.hyperspace.vacuumIndex("idx4") self.assertEqual(self.hyperspace.indexes().filter("""name = "idx4" """).count(), 0) - def test_index_refresh(self): + # vacuuming of active index leaves the index as active + idx_config = IndexConfig('idx5', ['name'], ['age']) + self.hyperspace.createIndex(self.df, idx_config) + self.assertEqual(self.hyperspace.indexes().filter( + """name = "idx5" and state = "ACTIVE" """).count(), 1) + + def test_index_refresh_incremental(self): idx_config = IndexConfig('idx1', ['name'], ['age']) self.hyperspace.createIndex(self.df, idx_config) # Test the inter-op works fine for refreshIndex. self.hyperspace.refreshIndex('idx1') self.hyperspace.refreshIndex('idx1', 'incremental') - def test_index_refresh(self): + def test_index_refresh_full(self): idx_config = IndexConfig('idx1', ['name'], ['age']) self.hyperspace.createIndex(self.df, idx_config) # Test the inter-op works fine for optimizeIndex. @@ -89,6 +95,7 @@ def test_index_metadata(self): df = self.hyperspace.index('idx1') df.show() + hyperspace_test = unittest.TestLoader().loadTestsFromTestCase(HyperspaceIndexManagementTests) result = unittest.TextTestRunner(verbosity=3).run(hyperspace_test) sys.exit(not result.wasSuccessful()) diff --git a/src/main/scala/com/microsoft/hyperspace/Hyperspace.scala b/src/main/scala/com/microsoft/hyperspace/Hyperspace.scala index c55191744..25fedc86f 100644 --- a/src/main/scala/com/microsoft/hyperspace/Hyperspace.scala +++ b/src/main/scala/com/microsoft/hyperspace/Hyperspace.scala @@ -65,9 +65,10 @@ class Hyperspace(spark: SparkSession) { } /** - * Does hard delete of indexes marked as `DELETED`. + * Does hard delete of the entire indexes if it is marked as `DELETED`. + * Does clean up index (hard delete of the old indexes) if the index is 'ACTIVE'. * - * @param indexName Name of the index to restore. + * @param indexName Name of the index to vacuum. */ def vacuumIndex(indexName: String): Unit = { indexManager.vacuum(indexName) diff --git a/src/main/scala/com/microsoft/hyperspace/actions/Constants.scala b/src/main/scala/com/microsoft/hyperspace/actions/Constants.scala index 45cba9633..c2b7166ca 100644 --- a/src/main/scala/com/microsoft/hyperspace/actions/Constants.scala +++ b/src/main/scala/com/microsoft/hyperspace/actions/Constants.scala @@ -24,6 +24,7 @@ object Constants { val DELETED = "DELETED" val REFRESHING = "REFRESHING" val VACUUMING = "VACUUMING" + val VACUUMINGOUTDATED = "VACUUMINGOUTDATED" val RESTORING = "RESTORING" val OPTIMIZING = "OPTIMIZING" val DOESNOTEXIST = "DOESNOTEXIST" diff --git a/src/main/scala/com/microsoft/hyperspace/actions/VacuumOutdatedAction.scala b/src/main/scala/com/microsoft/hyperspace/actions/VacuumOutdatedAction.scala new file mode 100644 index 000000000..d1ac723c7 --- /dev/null +++ b/src/main/scala/com/microsoft/hyperspace/actions/VacuumOutdatedAction.scala @@ -0,0 +1,144 @@ +/* + * Copyright (2020) The Hyperspace Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.microsoft.hyperspace.actions + +import org.apache.hadoop.fs.Path + +import com.microsoft.hyperspace.{Hyperspace, HyperspaceException} +import com.microsoft.hyperspace.actions.Constants.States.{ACTIVE, VACUUMINGOUTDATED} +import com.microsoft.hyperspace.index.{IndexConstants, IndexDataManager, IndexLogEntry, IndexLogManager, LogEntry} +import com.microsoft.hyperspace.index.sources.delta.DeltaLakeRelationMetadata +import com.microsoft.hyperspace.telemetry.{AppInfo, HyperspaceEvent, VacuumOutdatedActionEvent} +import com.microsoft.hyperspace.util.FileUtils + +/** + * Vacuum outdated data of indexes. + * + * Algorithm: + * - Delete every version except the latest versions. + */ +class VacuumOutdatedAction( + final override protected val logManager: IndexLogManager, + dataManager: IndexDataManager) + extends Action { + private lazy val previousIndexLogEntry = { + logManager.getLog(baseId) match { + case Some(e: IndexLogEntry) => e + case _ => + throw HyperspaceException("LogEntry must exist for vacuum outdated operation.") + } + } + + final override lazy val logEntry: LogEntry = { + previousIndexLogEntry.relations match { + case null => previousIndexLogEntry + + case relations if relations.nonEmpty => + val relationMetadata = Hyperspace + .getContext(spark) + .sourceProviderManager + .getRelationMetadata(relations.head) + + val updatedDerivedDataset = relationMetadata match { + case deltaLakeRelationMetadata: DeltaLakeRelationMetadata => + // Reset Delta Lake version mapping. + val resetProperty = deltaLakeRelationMetadata.resetDeltaVersionHistory( + previousIndexLogEntry.derivedDataset.properties) + + val newProperty = deltaLakeRelationMetadata.enrichIndexProperties( + resetProperty + (IndexConstants.INDEX_LOG_VERSION -> endId.toString)) + + previousIndexLogEntry.derivedDataset.withNewProperties(newProperty) + case _ => previousIndexLogEntry.derivedDataset + } + previousIndexLogEntry.copy(derivedDataset = updatedDerivedDataset) + + case _ => previousIndexLogEntry + } + } + + override def transientState: String = VACUUMINGOUTDATED + + override def finalState: String = ACTIVE + + override def validate(): Unit = { + if (!previousIndexLogEntry.state.equalsIgnoreCase(ACTIVE)) { + throw HyperspaceException( + s"VacuumOutdated is only supported in $ACTIVE state. " + + s"Current state is ${previousIndexLogEntry.state}.") + } + } + + final override def op(): Unit = { + // Delete unused directory first, then delete unused files in used directories. + val indexVersionsInUse: Set[Int] = logEntry match { + case indexLogEntry: IndexLogEntry => + dataVersionInfos(indexLogEntry) + + case other => + throw HyperspaceException( + s"VacuumOutdated is not supported for log entry class ${other.getClass.getName}") + } + + // Delete version directories not used. + dataManager.getAllVersionIds().foreach { id => + if (!indexVersionsInUse.contains(id)) { + dataManager.delete(id) + } + } + + val filesInUse = logEntry match { + case indexLogEntry: IndexLogEntry => + indexLogEntry.content.fileInfos.map { info => + info.name + } + } + + // Delete unused files. + dataManager.getAllFilePaths().foreach { path => + // Ignore files such as "_SUCCESS" and "._SUCCESS.crc". + if (!path.getName.startsWith("_") && + !path.getName.startsWith("._") && + !filesInUse.contains(path.toString)) { + FileUtils.delete(path) + } + } + } + + /** + * Extracts latest versions of an index. + * + * @return List of directory paths containing index files for latest index version. + */ + private[actions] def dataVersionInfos(entry: IndexLogEntry): Set[Int] = { + // Get used versions using the filenames of contents. + // length + 1 due to '=' between prefix and version number. + val prefixLength = IndexConstants.INDEX_VERSION_DIRECTORY_PREFIX.length + 1 + entry + .indexDataDirectoryPaths() + .map(dirname => new Path(dirname).getName) + .collect { + case name if name.startsWith(IndexConstants.INDEX_VERSION_DIRECTORY_PREFIX) => + name.drop(prefixLength).toInt + } + .toSet + } + + override protected def event(appInfo: AppInfo, message: String): HyperspaceEvent = { + VacuumOutdatedActionEvent(appInfo, logEntry.asInstanceOf[IndexLogEntry], message) + } +} diff --git a/src/main/scala/com/microsoft/hyperspace/index/IndexCollectionManager.scala b/src/main/scala/com/microsoft/hyperspace/index/IndexCollectionManager.scala index 643025544..3281a42f6 100644 --- a/src/main/scala/com/microsoft/hyperspace/index/IndexCollectionManager.scala +++ b/src/main/scala/com/microsoft/hyperspace/index/IndexCollectionManager.scala @@ -22,7 +22,7 @@ import org.apache.spark.sql.internal.SQLConf import com.microsoft.hyperspace.HyperspaceException import com.microsoft.hyperspace.actions._ -import com.microsoft.hyperspace.actions.Constants.States.DOESNOTEXIST +import com.microsoft.hyperspace.actions.Constants.States.{ACTIVE, DOESNOTEXIST} import com.microsoft.hyperspace.index.IndexConstants.{REFRESH_MODE_FULL, REFRESH_MODE_INCREMENTAL, REFRESH_MODE_QUICK} class IndexCollectionManager( @@ -60,13 +60,23 @@ class IndexCollectionManager( } override def vacuum(indexName: String): Unit = { + // Note that the behavior of vacuum index is different when the state is ACTIVE. + // The event that action creates is also different. + withLogManager(indexName) { logManager => val hadoopConf = spark.sessionState.newHadoopConf() val indexPath = PathResolver(spark.sessionState.conf, hadoopConf) .getIndexPath(indexName) val dataManager = indexDataManagerFactory.create(indexPath, hadoopConf) - new VacuumAction(logManager, dataManager).run() + + logManager.getLatestLog() match { + case Some(index) if index.state == ACTIVE => + // clean up only if state is ACTIVE. + new VacuumOutdatedAction(logManager, dataManager).run() + case _ => + new VacuumAction(logManager, dataManager).run() + } } } diff --git a/src/main/scala/com/microsoft/hyperspace/index/IndexDataManager.scala b/src/main/scala/com/microsoft/hyperspace/index/IndexDataManager.scala index a79433091..f6f84f1a5 100644 --- a/src/main/scala/com/microsoft/hyperspace/index/IndexDataManager.scala +++ b/src/main/scala/com/microsoft/hyperspace/index/IndexDataManager.scala @@ -36,8 +36,12 @@ import com.microsoft.hyperspace.util.FileUtils * f1.parquet */ trait IndexDataManager { + def getAllFilePaths(): Seq[Path] + def getLatestVersionId(): Option[Int] + def getAllVersionIds(): Seq[Int] + def getPath(id: Int): Path def delete(id: Int): Unit @@ -48,24 +52,54 @@ class IndexDataManagerImpl(indexPath: Path, configuration: Configuration) // TODO: Investigate whether FileContext should be used instead of FileSystem for atomic renames. private lazy val fs: FileSystem = indexPath.getFileSystem(configuration) + /** + * Get latest version id of the index data directory. + */ + override def getLatestVersionId(): Option[Int] = { + val ids = getAllVersionIds() + if (ids.isEmpty) None else Some(ids.max) + } + /** * This method relies on the naming convention that directory name will be similar to hive - * partitioning scheme, i.e. "root/v__=value/f1.parquet" etc. Here the value represents the + * partitioning scheme, i.e. {{{"root/v__=value/f1.parquet"}}} etc. Here the value represents the * version id of the data. */ - override def getLatestVersionId(): Option[Int] = { + override def getAllVersionIds(): Seq[Int] = { if (!fs.exists(indexPath)) { - return None + return Nil } val prefixLength = IndexConstants.INDEX_VERSION_DIRECTORY_PREFIX.length + 1 - val ids = fs.listStatus(indexPath).collect { + fs.listStatus(indexPath) + .collect { + case status + if status.getPath.getName.startsWith(IndexConstants.INDEX_VERSION_DIRECTORY_PREFIX) => + status.getPath.getName.drop(prefixLength).toInt + } + } + + /** + * Get all file paths in the index directory. + */ + override def getAllFilePaths(): Seq[Path] = { + if (!fs.exists(indexPath)) { + return Nil + } + val directories = fs.listStatus(indexPath).collect { case status if status.getPath.getName.startsWith(IndexConstants.INDEX_VERSION_DIRECTORY_PREFIX) => - status.getPath.getName.drop(prefixLength).toInt + status.getPath + } + directories.flatMap { dir => + fs.listStatus(dir).collect { + case status => status.getPath + } } - if (ids.isEmpty) None else Some(ids.max) } + /** + * Get directory path of the given id. + */ override def getPath(id: Int): Path = { new Path(indexPath, s"${IndexConstants.INDEX_VERSION_DIRECTORY_PREFIX}=${id.toString}") } diff --git a/src/main/scala/com/microsoft/hyperspace/index/IndexLogEntry.scala b/src/main/scala/com/microsoft/hyperspace/index/IndexLogEntry.scala index 5257fa530..085cbde4f 100644 --- a/src/main/scala/com/microsoft/hyperspace/index/IndexLogEntry.scala +++ b/src/main/scala/com/microsoft/hyperspace/index/IndexLogEntry.scala @@ -513,6 +513,24 @@ case class IndexLogEntry( (name, derivedDataset, signature, content).hashCode } + /** + * Extracts paths to top-level directories paths which + * contain the latest version index files. + * + * @return List of directory paths containing index files for latest index version. + */ + def indexDataDirectoryPaths(): Seq[String] = { + var prefix = content.root.name + var directory = content.root + while (directory.subDirs.size == 1 && + !directory.subDirs.head.name.startsWith(IndexConstants.INDEX_VERSION_DIRECTORY_PREFIX)) { + prefix += s"${directory.subDirs.head.name}/" + directory = directory.subDirs.head + } + + directory.subDirs.map(d => s"$prefix${d.name}") + } + /** * A mutable map for holding auxiliary information of this index log entry while applying rules. */ diff --git a/src/main/scala/com/microsoft/hyperspace/index/IndexManager.scala b/src/main/scala/com/microsoft/hyperspace/index/IndexManager.scala index 2990f24f1..2c6e79c2a 100644 --- a/src/main/scala/com/microsoft/hyperspace/index/IndexManager.scala +++ b/src/main/scala/com/microsoft/hyperspace/index/IndexManager.scala @@ -54,7 +54,9 @@ trait IndexManager { def restore(indexName: String): Unit /** - * Does hard delete of indexes marked as `DELETED`. Once vacuumed, an index can't be 'restore'd. + * If the index is marked as `DELETED`, does hard delete of indexes while does + * If it is 'ACTIVE', does clean up of indexes (hard delete of unused index files). + * Once vacuumed, hard deleted index files can't be 'restore'd. * * @param indexName Name of the index to vacuum. */ diff --git a/src/main/scala/com/microsoft/hyperspace/index/IndexStatistics.scala b/src/main/scala/com/microsoft/hyperspace/index/IndexStatistics.scala index b5ee36657..b35525e0c 100644 --- a/src/main/scala/com/microsoft/hyperspace/index/IndexStatistics.scala +++ b/src/main/scala/com/microsoft/hyperspace/index/IndexStatistics.scala @@ -139,15 +139,7 @@ private[hyperspace] object IndexStatistics { * @return List of directory paths containing index files for latest index version. */ private def getIndexContentDirectoryPaths(entry: IndexLogEntry): Seq[String] = { - var root = entry.content.root - var prefix = entry.content.root.name - while (root.subDirs.size == 1 && - !root.subDirs.head.name.startsWith(IndexConstants.INDEX_VERSION_DIRECTORY_PREFIX)) { - prefix += s"${root.subDirs.head.name}/" - root = root.subDirs.head - } - - root.subDirs.map(d => s"$prefix${d.name}") + entry.indexDataDirectoryPaths() } /** diff --git a/src/main/scala/com/microsoft/hyperspace/index/sources/delta/DeltaLakeRelationMetadata.scala b/src/main/scala/com/microsoft/hyperspace/index/sources/delta/DeltaLakeRelationMetadata.scala index 3779577f3..01380f149 100644 --- a/src/main/scala/com/microsoft/hyperspace/index/sources/delta/DeltaLakeRelationMetadata.scala +++ b/src/main/scala/com/microsoft/hyperspace/index/sources/delta/DeltaLakeRelationMetadata.scala @@ -57,5 +57,15 @@ class DeltaLakeRelationMetadata(metadata: Relation) extends FileBasedRelationMet properties ++ deltaVerHistory } + /** + * Remove DELTA_VERSION_HISTORY_PROPERTY from properties. + * + * @param properties Index properties to reset. + * @return Updated index properties for vacuum outdated data. + */ + def resetDeltaVersionHistory(properties: Map[String, String]): Map[String, String] = { + properties - DeltaLakeConstants.DELTA_VERSION_HISTORY_PROPERTY + } + override def canSupportUserSpecifiedSchema: Boolean = false } diff --git a/src/main/scala/com/microsoft/hyperspace/telemetry/HyperspaceEvent.scala b/src/main/scala/com/microsoft/hyperspace/telemetry/HyperspaceEvent.scala index 44e9a4577..59c85513a 100644 --- a/src/main/scala/com/microsoft/hyperspace/telemetry/HyperspaceEvent.scala +++ b/src/main/scala/com/microsoft/hyperspace/telemetry/HyperspaceEvent.scala @@ -84,6 +84,16 @@ case class RestoreActionEvent(appInfo: AppInfo, index: IndexLogEntry, message: S case class VacuumActionEvent(appInfo: AppInfo, index: IndexLogEntry, message: String) extends HyperspaceIndexCRUDEvent +/** + * Deletion of old index files event. Emitted when vacuum is called on an ACTIVE index. + * + * @param appInfo AppInfo for spark application. + * @param index Related index. + * @param message Message about event. + */ +case class VacuumOutdatedActionEvent(appInfo: AppInfo, index: IndexLogEntry, message: String) + extends HyperspaceIndexCRUDEvent + /** * Index Refresh Event. Emitted when refresh is called on an index. * diff --git a/src/test/scala/com/microsoft/hyperspace/actions/VacuumOutdatedActionTest.scala b/src/test/scala/com/microsoft/hyperspace/actions/VacuumOutdatedActionTest.scala new file mode 100644 index 000000000..aa661d6fd --- /dev/null +++ b/src/test/scala/com/microsoft/hyperspace/actions/VacuumOutdatedActionTest.scala @@ -0,0 +1,156 @@ +/* + * Copyright (2020) The Hyperspace Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.microsoft.hyperspace.actions + +import org.apache.hadoop.fs.Path +import org.apache.spark.SparkFunSuite +import org.mockito.ArgumentMatchers.anyInt +import org.mockito.Mockito.{mock, verify, when} +import org.mockito.internal.verification.Times + +import com.microsoft.hyperspace.{HyperspaceException, SparkInvolvedSuite} +import com.microsoft.hyperspace.actions.Constants.States._ +import com.microsoft.hyperspace.index.{Content, Directory, FileInfo, IndexConstants, IndexDataManager, IndexLogEntry, IndexLogManager} +import com.microsoft.hyperspace.index.IndexConstants.UNKNOWN_FILE_ID +import com.microsoft.hyperspace.index.covering.CoveringIndex + +class VacuumOutdatedActionTest extends SparkFunSuite with SparkInvolvedSuite { + private val mockLogManager: IndexLogManager = mock(classOf[IndexLogManager]) + private val mockDataManager: IndexDataManager = mock(classOf[IndexDataManager]) + private val mockIndexLogEntry: IndexLogEntry = mock(classOf[IndexLogEntry]) + private val mockContent: Content = mock(classOf[Content]) + + override def beforeAll(): Unit = { + super.beforeAll() + when(mockLogManager.getLatestId()).thenReturn(None) + } + + def versionDirectories(versions: Seq[Int]): Seq[String] = { + versions.map(version => + s"file:/a/b/${IndexConstants.INDEX_VERSION_DIRECTORY_PREFIX}=$version") + } + + test("validate() passes if old index logs are found with ACTIVE state.") { + when(mockLogManager.getLog(anyInt)).thenReturn(Some(mockIndexLogEntry)) + when(mockIndexLogEntry.state).thenReturn(ACTIVE) + val action = new VacuumOutdatedAction(mockLogManager, mockDataManager) + // No exception thrown is considered a pass + action.validate() + } + + test("validate() fails if old index logs found with non-ACTIVE state") { + when(mockLogManager.getLog(anyInt)).thenReturn(Some(mockIndexLogEntry)) + when(mockIndexLogEntry.state).thenReturn(CREATING) + val action = new VacuumOutdatedAction(mockLogManager, mockDataManager) + val ex = intercept[HyperspaceException](action.validate()) + assert( + ex.getMessage.contains( + "VacuumOutdated is only supported in ACTIVE state. Current state is CREATING.")) + } + + test("op() calls which deletes nothing since every data is up-to-date") { + val pathPrefix: String = s"file:/a/b/${IndexConstants.INDEX_VERSION_DIRECTORY_PREFIX}=2" + val sampleFileName = s"$pathPrefix/part-00053-.c000.snappy.parquet" + val sampleFilePath = new Path(sampleFileName) + + when(mockLogManager.getLog(anyInt)).thenReturn(Some(mockIndexLogEntry)) + + when(mockDataManager.getAllVersionIds()).thenReturn(Seq(0, 1, 2)) + when(mockDataManager.getAllFilePaths()).thenReturn(Seq(sampleFilePath)) + + when(mockIndexLogEntry.indexDataDirectoryPaths()) + .thenReturn(versionDirectories(Seq(0, 1, 2))) + when(mockIndexLogEntry.content).thenReturn(mockContent) + + when(mockContent.fileInfos).thenReturn(Set(FileInfo(sampleFileName, 0, 0, 0))) + + val action = new VacuumOutdatedAction(mockLogManager, mockDataManager) + action.op() + verify(mockDataManager, new Times(0)).delete(-1) + verify(mockDataManager, new Times(0)).delete(0) + verify(mockDataManager, new Times(0)).delete(1) + verify(mockDataManager, new Times(0)).delete(2) + verify(mockDataManager, new Times(0)).delete(3) + } + + test("op() calls delete for all outdated data") { + val pathPrefix: String = s"file:/a/b/${IndexConstants.INDEX_VERSION_DIRECTORY_PREFIX}=2" + val sampleFileName1 = s"$pathPrefix/part-00053-.c000.snappy.parquet" + val sampleFilePath1 = new Path(sampleFileName1) + val sampleFileName2 = s"$pathPrefix/part-00027-.c000.snappy.parquet" + + val sampleFilePath2 = new Path(sampleFileName2) + + when(mockLogManager.getLog(anyInt)).thenReturn(Some(mockIndexLogEntry)) + + when(mockDataManager.getAllVersionIds()).thenReturn(Seq(0, 1, 2, 3)) + when(mockDataManager.getAllFilePaths()).thenReturn(Seq(sampleFilePath1, sampleFilePath2)) + + when(mockIndexLogEntry.indexDataDirectoryPaths()).thenReturn(versionDirectories(Seq(2, 3))) + when(mockIndexLogEntry.content).thenReturn(mockContent) + + when(mockContent.fileInfos).thenReturn( + Set(FileInfo(sampleFileName1, 0, 0, 0), FileInfo(sampleFileName2, 0, 0, 1))) + + val action = new VacuumOutdatedAction(mockLogManager, mockDataManager) + + action.op() + verify(mockDataManager).delete(0) + verify(mockDataManager).delete(1) + verify(mockDataManager, new Times(0)).delete(2) + verify(mockDataManager, new Times(0)).delete(3) + verify(mockDataManager, new Times(0)).delete(-1) + } + + test("versionInfos gets correct version info.") { + val versions = Seq(4, 5) + + val action = new VacuumOutdatedAction(mockLogManager, mockDataManager) + val versionDirectory = + versions.map( + version => + Directory( + s"${IndexConstants.INDEX_VERSION_DIRECTORY_PREFIX}=$version", + files = Seq(FileInfo(s"index_$version", 0, 0, UNKNOWN_FILE_ID)))) + + val content = Content( + Directory( + "file:/", + subDirs = Seq(Directory( + "a", + files = + Seq(FileInfo("f1", 0, 0, UNKNOWN_FILE_ID), FileInfo("f2", 0, 0, UNKNOWN_FILE_ID)), + subDirs = Seq( + Directory( + "b", + files = + Seq(FileInfo("f3", 0, 0, UNKNOWN_FILE_ID), FileInfo("f4", 0, 0, UNKNOWN_FILE_ID)), + subDirs = versionDirectory)))))) + + val entry = IndexLogEntry.create( + "indexName", + CoveringIndex(Seq("col1"), Seq("col2", "col3"), null, 200, Map()), + content, + null, + Map()) + + val expected = versions.toSet + val actual = action.dataVersionInfos(entry) + assert(actual.equals(expected)) + } + +} diff --git a/src/test/scala/com/microsoft/hyperspace/index/DeltaLakeIntegrationTest.scala b/src/test/scala/com/microsoft/hyperspace/index/DeltaLakeIntegrationTest.scala index 21d52535f..df207a899 100644 --- a/src/test/scala/com/microsoft/hyperspace/index/DeltaLakeIntegrationTest.scala +++ b/src/test/scala/com/microsoft/hyperspace/index/DeltaLakeIntegrationTest.scala @@ -21,6 +21,7 @@ import java.sql.Timestamp import scala.collection.mutable import io.delta.tables.DeltaTable +import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.Path import org.apache.spark.sql.{DataFrame, QueryTest} import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan @@ -28,11 +29,12 @@ import org.apache.spark.sql.delta.files.TahoeLogFileIndex import org.apache.spark.sql.execution.datasources._ import com.microsoft.hyperspace.{Hyperspace, Implicits, SampleData, TestConfig} -import com.microsoft.hyperspace.TestUtils.latestIndexLogEntry +import com.microsoft.hyperspace.TestUtils.{getFileIdTracker, latestIndexLogEntry} import com.microsoft.hyperspace.index.IndexConstants.REFRESH_MODE_QUICK import com.microsoft.hyperspace.index.plans.logical.IndexHadoopFsRelation import com.microsoft.hyperspace.index.sources.delta.DeltaLakeRelation import com.microsoft.hyperspace.util.PathUtils +import com.microsoft.hyperspace.util.PathUtils.DataPathFilter class DeltaLakeIntegrationTest extends QueryTest with HyperspaceSuite { override val indexLocationDirName = "deltaLakeIntegrationTest" @@ -457,6 +459,100 @@ class DeltaLakeIntegrationTest extends QueryTest with HyperspaceSuite { } } + test("Verify time travel query works well with VacuumIndex.") { + withTempPathAsString { path => + import spark.implicits._ + val df = sampleData.toDF("Date", "RGUID", "Query", "imprs", "clicks") + df.write.format("delta").save(path) + + val tsMap = mutable.Map[Long, String]() + tsMap.put(0, getSparkFormattedTimestamps(System.currentTimeMillis).head) + + val indexName = "deltaIndex3" + val deltaDf = spark.read.format("delta").load(path) + + appendAndRefresh(df, path, indexName, None, tsMap) // delta version 1 + appendAndRefresh(df, path, indexName, None, tsMap) // delta version 2 + val indexConfig = IndexConfig(indexName, Seq("clicks"), Seq("Query")) + withSQLConf(IndexConstants.INDEX_LINEAGE_ENABLED -> "true") { + hyperspace.createIndex(deltaDf, indexConfig) + } + + withIndex(indexName) { + checkExpectedIndexUsed(indexName, path, None, 1) + + withSQLConf(TestConfig.HybridScanEnabled: _*) { + appendAndRefresh(df, path, indexName, None, tsMap) + // delta version 3, index log version 1. + appendAndRefresh(df, path, indexName, Some("incremental"), tsMap) + // delta version 4, index log version 3 (refresh). + + // Without delta table version, the latest log version should be applied. + checkExpectedIndexUsed(indexName, path, None, 3) + // For delta table version 0, candidate log version is 1. + checkExpectedIndexUsed(indexName, path, Some(0, tsMap(1)), 1) + + appendAndRefresh(df, path, indexName, None, tsMap) + // delta version 5, index log version 3. + appendAndRefresh(df, path, indexName, None, tsMap) + // delta version 6, index log version 3. + appendAndRefresh(df, path, indexName, Some("incremental"), tsMap) + // delta version 7, index log version 5 (refresh). + hyperspace.optimizeIndex(indexName) + // delta version 7, index log version 7 (optimize). + appendAndRefresh(df, path, indexName, Some("incremental"), tsMap) + // delta version 8, index log version 9 (refresh). + hyperspace.optimizeIndex(indexName) + // delta version 8, index log version 11 (optimize). + appendAndRefresh(df, path, indexName, None, tsMap) + // delta version 9, index long version 11. + + val beforeDataFiles = listFiles(path, getFileIdTracker(systemPath, indexConfig)) + val beforeIndexDataFiles = + listFiles(getIndexDataPath(indexName, 0), getFileIdTracker(systemPath, indexConfig)) + + // Calling vacuumIndex on active index deletes outdated data and history. + hyperspace.vacuumIndex(indexName) + + val afterDataFiles = listFiles(path, getFileIdTracker(systemPath, indexConfig)) + val afterIndexDataFiles = + listFiles(getIndexDataPath(indexName, 0), getFileIdTracker(systemPath, indexConfig)) + + // Data files should not affected by vacuum index + assert(beforeDataFiles === afterDataFiles) + // Two files should be deleted in the version 0 index data directory + assert((beforeIndexDataFiles.toSet -- afterIndexDataFiles.toSet).size == 2) + + // These paths should not be deleted + val fs = new Path("/").getFileSystem(new Configuration) + val shouldExistPath = Seq(0, 5) + shouldExistPath.map(idx => { + val indexDataPath = getIndexDataPath(indexName, idx) + assert(fs.exists(new Path(indexDataPath))) + }) + + // These path should be deleted + val shouldDeletedPath = Seq(1, 2, 3, 4) + shouldDeletedPath.map(idx => { + val indexDataPath = getIndexDataPath(indexName, idx) + assert(!fs.exists(new Path(indexDataPath))) + }) + + // Whenever index is used, since every history is also vacuumed, + // expected index version is always 13 (the last version after vacuum outdated). + checkExpectedIndexUsed(indexName, path, None, 13) + checkExpectedIndexUsed(indexName, path, Some(1, tsMap(1)), 13) + checkExpectedIndexUsed(indexName, path, Some(2, tsMap(2)), 13) + checkExpectedIndexUsed(indexName, path, Some(5, tsMap(5)), 13) + checkExpectedIndexUsed(indexName, path, Some(6, tsMap(6)), 13) + checkExpectedIndexUsed(indexName, path, Some(7, tsMap(7)), 13) + checkExpectedIndexUsed(indexName, path, Some(8, tsMap(8)), 13) + checkExpectedIndexUsed(indexName, path, Some(9, tsMap(9)), 13) + } + } + } + } + test("DeltaLakeRelation.closestIndex should handle indexes without delta versions.") { withTempPathAsString { path => import spark.implicits._ @@ -596,4 +692,20 @@ class DeltaLakeIntegrationTest extends QueryTest with HyperspaceSuite { expectedPathsSubStr.exists(p.toString.contains(_))) && expectedPathsSubStr.forall(p => rootPaths.exists(_.toString.contains(p))) } + + private def listFiles(path: String, fileIdTracker: FileIdTracker): Seq[FileInfo] = { + val absolutePath = PathUtils.makeAbsolute(path) + val fs = absolutePath.getFileSystem(new Configuration) + fs.listStatus(absolutePath) + .toSeq + .filter(f => DataPathFilter.accept(f.getPath)) + .map(f => FileInfo(f, fileIdTracker.addFile(f), asFullPath = true)) + } + + private def getIndexDataPath(indexName: String, idx: Int): String = { + val indexBasePath = spark.conf.get("spark.hyperspace.system.path") + + s"${indexBasePath}/${indexName}/${IndexConstants.INDEX_VERSION_DIRECTORY_PREFIX}=${idx}" + } + } diff --git a/src/test/scala/com/microsoft/hyperspace/index/IndexLogEntryTest.scala b/src/test/scala/com/microsoft/hyperspace/index/IndexLogEntryTest.scala index 99434fd39..2d41ba33c 100644 --- a/src/test/scala/com/microsoft/hyperspace/index/IndexLogEntryTest.scala +++ b/src/test/scala/com/microsoft/hyperspace/index/IndexLogEntryTest.scala @@ -16,7 +16,7 @@ package com.microsoft.hyperspace.index -import java.io.{File, FileNotFoundException} +import java.io.File import java.nio.file import java.nio.file.{Files, Paths} @@ -401,7 +401,7 @@ class IndexLogEntryTest extends HyperspaceSuite with SQLHelper { } test( - "Directory.fromDirectory and fromLeafFileswhere files are at same level but different" + + "Directory.fromDirectory and fromLeafFiles where files are at same level but different" + "dirs.") { // File Structure // testDir/temp/a/f1 diff --git a/src/test/scala/com/microsoft/hyperspace/index/IndexLogManagerImplTest.scala b/src/test/scala/com/microsoft/hyperspace/index/IndexLogManagerImplTest.scala index 6bc24b9bf..23693b56b 100644 --- a/src/test/scala/com/microsoft/hyperspace/index/IndexLogManagerImplTest.scala +++ b/src/test/scala/com/microsoft/hyperspace/index/IndexLogManagerImplTest.scala @@ -112,7 +112,7 @@ class IndexLogManagerImplTest extends HyperspaceSuite { // find position to insert \0 val jsonContent = JsonUtils.toJson(sampleIndexLogEntry) val sourceIndex = jsonContent.indexOf("\"source\"") - val damagedJsonContent = jsonContent.substring(0, sourceIndex + 8) + "\0" + jsonContent + val damagedJsonContent = jsonContent.substring(0, sourceIndex + 8) + "\u0000" + jsonContent .substring(sourceIndex + 8); FileUtils.createFile(