Skip to content
This repository has been archived by the owner on Jun 14, 2024. It is now read-only.

Commit

Permalink
Merge pull request #1 from microsoft/master
Browse files Browse the repository at this point in the history
Sync
  • Loading branch information
dmytroDragan authored Apr 14, 2021
2 parents c900cf5 + b8a44b4 commit 336728d
Show file tree
Hide file tree
Showing 40 changed files with 745 additions and 607 deletions.
41 changes: 28 additions & 13 deletions azure-pipelines.yml
Original file line number Diff line number Diff line change
Expand Up @@ -17,17 +17,24 @@ jobs:
versionSpec: '8'
jdkArchitectureOption: 'x64'
jdkSourceOption: 'PreInstalled'
- script: sbt ++2.11.12 clean
# Use sbt 1.4.9. The default sbt launcher in ubuntu-18.04 20210405 image is
# 1.5.0, but the version has an issue to compile with 0.13.18.
# See: https://github.com/sbt/sbt/issues/6447
- script: wget -O /tmp/sbt.tgz "https://github.com/sbt/sbt/releases/download/v1.4.9/sbt-1.4.9.tgz"
displayName: 'Download sbt 1.4.9'
- script: tar zxf /tmp/sbt.tgz -C /tmp/
displayName: 'Extract sbt'
- script: /tmp/sbt//bin/sbt ++2.11.12 clean
displayName: 'Running $sbt clean'
- script: sbt ++2.11.12 update
- script: /tmp/sbt/bin/sbt ++2.11.12 update
displayName: 'Running $sbt update'
- script: sbt ++2.11.12 compile
- script: /tmp/sbt/bin/sbt ++2.11.12 compile
displayName: 'Running $sbt compile'
- script: sbt ++2.11.12 test
- script: /tmp/sbt/bin/sbt ++2.11.12 test
displayName: 'Running $sbt test'
# If not a pull request, publish artifacts.
- ${{ if and(ne(variables['System.TeamProject'], 'public'), notin(variables['Build.Reason'], 'PullRequest')) }}:
- script: sbt ++2.11.12 package
- script: /tmp/sbt/bin/sbt ++2.11.12 package
displayName: 'Running $sbt package'
- task: CopyFiles@2
displayName: 'Copy hyperspace-core JAR'
Expand All @@ -46,17 +53,21 @@ jobs:
pool:
vmImage: 'ubuntu-18.04'
steps:
- script: sbt ++2.12.8 clean
- script: wget -O /tmp/sbt.tgz "https://github.com/sbt/sbt/releases/download/v1.4.9/sbt-1.4.9.tgz"
displayName: 'Download sbt 1.4.9'
- script: tar zxf /tmp/sbt.tgz -C /tmp/
displayName: 'Extract sbt'
- script: /tmp/sbt/bin/sbt ++2.12.8 clean
displayName: 'Running $sbt clean'
- script: sbt ++2.12.8 update
- script: /tmp/sbt/bin/sbt ++2.12.8 update
displayName: 'Running $sbt update'
- script: sbt ++2.12.8 compile
- script: /tmp/sbt/bin/sbt ++2.12.8 compile
displayName: 'Running $sbt compile'
- script: sbt ++2.12.8 test
- script: /tmp/sbt/bin/sbt ++2.12.8 test
displayName: 'Running $sbt test'
# If not a pull request, publish artifacts.
- ${{ if and(ne(variables['System.TeamProject'], 'public'), notin(variables['Build.Reason'], 'PullRequest')) }}:
- script: sbt ++2.12.8 package
- script: /tmp/sbt/bin/sbt ++2.12.8 package
displayName: 'Running $sbt package'
- task: CopyFiles@2
displayName: 'Copy hyperspace-core JAR'
Expand Down Expand Up @@ -86,11 +97,15 @@ jobs:
versionSpec: '8'
jdkArchitectureOption: 'x64'
jdkSourceOption: 'PreInstalled'
- script: sbt ++2.11.12 clean
- script: wget -O /tmp/sbt.tgz "https://github.com/sbt/sbt/releases/download/v1.4.9/sbt-1.4.9.tgz"
displayName: 'Download sbt 1.4.9'
- script: tar zxf /tmp/sbt.tgz -C /tmp/
displayName: 'Extract sbt'
- script: /tmp/sbt/bin/sbt ++2.11.12 clean
displayName: 'Running $sbt clean'
- script: sbt ++2.11.12 update
- script: /tmp/sbt/bin/sbt ++2.11.12 update
displayName: 'Running $sbt update'
- script: sbt ++2.11.12 compile
- script: /tmp/sbt/bin/sbt ++2.11.12 compile
displayName: 'Running $sbt compile'
- task: Bash@3
inputs:
Expand Down
2 changes: 1 addition & 1 deletion scalastyle-config.xml
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ This file is divided into 3 sections:
*/
\E)?\Q/*
* Copyright (2020) The Hyperspace Project Authors.
* Copyright (\E(?:2020|2021)\Q) The Hyperspace Project Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,8 @@ import com.microsoft.hyperspace.{Hyperspace, HyperspaceException}
import com.microsoft.hyperspace.index._
import com.microsoft.hyperspace.index.DataFrameWriterExtensions.Bucketizer
import com.microsoft.hyperspace.index.sources.FileBasedRelation
import com.microsoft.hyperspace.util.{HyperspaceConf, PathUtils, ResolverUtils, SchemaUtils}
import com.microsoft.hyperspace.util.{HyperspaceConf, PathUtils, ResolverUtils}
import com.microsoft.hyperspace.util.ResolverUtils.ResolvedColumn

/**
* CreateActionBase provides functionality to write dataframe as covering index.
Expand Down Expand Up @@ -83,17 +84,19 @@ private[actions] abstract class CreateActionBase(dataManager: IndexDataManager)
Hyperspace
.getContext(spark)
.sourceProviderManager
.getRelationMetadata(sourcePlanProperties.relations.head)
.enrichIndexProperties(
sourcePlanProperties.relations.head,
prevIndexProperties + (IndexConstants.INDEX_LOG_VERSION -> versionId.toString)
++ hasLineageProperty(spark) ++ hasParquetAsSourceFormatProperty(relation))

IndexLogEntry(
IndexLogEntry.create(
indexConfig.indexName,
CoveringIndex(
CoveringIndex.Properties(
CoveringIndex.Properties
.Columns(resolvedIndexedColumns, resolvedIncludedColumns),
.Columns(
resolvedIndexedColumns.map(_.normalizedName),
resolvedIncludedColumns.map(_.normalizedName)),
IndexLogEntry.schemaString(indexDataFrame.schema),
numBuckets,
coveringIndexProperties)),
Expand All @@ -113,10 +116,8 @@ private[actions] abstract class CreateActionBase(dataManager: IndexDataManager)

// Run job
val repartitionedIndexDataFrame = {
// For nested fields, resolvedIndexedColumns will have flattened names with `.` (dots),
// thus they need to be enclosed in backticks to access them as top-level columns.
// Note that backticking the non-nested columns is a no-op.
indexDataFrame.repartition(numBuckets, resolvedIndexedColumns.map(c => col(s"`$c`")): _*)
// We are repartitioning with normalized columns (e.g., flattened nested column).
indexDataFrame.repartition(numBuckets, resolvedIndexedColumns.map(_.toNormalizedColumn): _*)
}

// Save the index with the number of buckets specified.
Expand All @@ -125,7 +126,7 @@ private[actions] abstract class CreateActionBase(dataManager: IndexDataManager)
repartitionedIndexDataFrame,
indexDataPath.toString,
numBuckets,
resolvedIndexedColumns,
resolvedIndexedColumns.map(_.normalizedName),
SaveMode.Overwrite)
}

Expand Down Expand Up @@ -159,7 +160,7 @@ private[actions] abstract class CreateActionBase(dataManager: IndexDataManager)

private def resolveConfig(
df: DataFrame,
indexConfig: IndexConfig): (Seq[(String, Boolean)], Seq[(String, Boolean)]) = {
indexConfig: IndexConfig): (Seq[ResolvedColumn], Seq[ResolvedColumn]) = {
val spark = df.sparkSession
val plan = df.queryExecution.analyzed
val indexedColumns = indexConfig.indexedColumns
Expand All @@ -171,7 +172,7 @@ private[actions] abstract class CreateActionBase(dataManager: IndexDataManager)
case (Some(indexed), Some(included)) => (indexed, included)
case _ =>
val unresolvedColumns = (indexedColumns ++ includedColumns)
.map(c => (c, ResolverUtils.resolve(spark, Seq(c), plan).map(_.map(_._1))))
.map(c => (c, ResolverUtils.resolve(spark, Seq(c), plan).map(_.map(_.name))))
.collect { case (c, r) if r.isEmpty => c }
throw HyperspaceException(
s"Columns '${unresolvedColumns.mkString(",")}' could not be resolved " +
Expand All @@ -182,14 +183,9 @@ private[actions] abstract class CreateActionBase(dataManager: IndexDataManager)
private def prepareIndexDataFrame(
spark: SparkSession,
df: DataFrame,
indexConfig: IndexConfig): (DataFrame, Seq[String], Seq[String]) = {
indexConfig: IndexConfig): (DataFrame, Seq[ResolvedColumn], Seq[ResolvedColumn]) = {
val (resolvedIndexedColumns, resolvedIncludedColumns) = resolveConfig(df, indexConfig)
val columnsFromIndexConfig =
resolvedIndexedColumns.map(_._1) ++ resolvedIncludedColumns.map(_._1)

val prefixedIndexedColumns = SchemaUtils.prefixNestedFieldNames(resolvedIndexedColumns)
val prefixedIncludedColumns = SchemaUtils.prefixNestedFieldNames(resolvedIncludedColumns)
val prefixedColumnsFromIndexConfig = prefixedIndexedColumns ++ prefixedIncludedColumns
val projectColumns = (resolvedIndexedColumns ++ resolvedIncludedColumns).map(_.toColumn)

val indexDF = if (hasLineage(spark)) {
val relation = getRelation(spark, df)
Expand All @@ -198,10 +194,12 @@ private[actions] abstract class CreateActionBase(dataManager: IndexDataManager)
// 1. DATA_FILE_ID_COLUMN column contains source data file id for each index record.
// 2. If source data is partitioned, all partitioning key(s) are added to index schema
// as columns if they are not already part of the schema.
val partitionColumns = relation.partitionSchema.map(_.name)
val partitionColumnNames = relation.partitionSchema.map(_.name)
val resolvedColumnNames = (resolvedIndexedColumns ++ resolvedIncludedColumns).map(_.name)
val missingPartitionColumns =
partitionColumns.filter(ResolverUtils.resolve(spark, _, columnsFromIndexConfig).isEmpty)
val allIndexColumns = columnsFromIndexConfig ++ missingPartitionColumns
partitionColumnNames
.filter(ResolverUtils.resolve(spark, _, resolvedColumnNames).isEmpty)
.map(col(_))

// File id value in DATA_FILE_ID_COLUMN column (lineage column) is stored as a
// Long data type value. Each source data file has a unique file id, assigned by
Expand All @@ -218,25 +216,15 @@ private[actions] abstract class CreateActionBase(dataManager: IndexDataManager)
val dataPathColumn = "_data_path"
val lineagePairs = relation.lineagePairs(fileIdTracker)
val lineageDF = lineagePairs.toDF(dataPathColumn, IndexConstants.DATA_FILE_NAME_ID)
val prefixedAllIndexColumns = prefixedColumnsFromIndexConfig ++ missingPartitionColumns

df.withColumn(dataPathColumn, input_file_name())
.join(lineageDF.hint("broadcast"), dataPathColumn)
.select(prepareColumns(allIndexColumns, prefixedAllIndexColumns) :+
col(IndexConstants.DATA_FILE_NAME_ID): _*)
.select(
projectColumns ++ missingPartitionColumns :+ col(IndexConstants.DATA_FILE_NAME_ID): _*)
} else {
df.select(prepareColumns(columnsFromIndexConfig, prefixedColumnsFromIndexConfig): _*)
df.select(projectColumns: _*)
}

(indexDF, prefixedIndexedColumns, prefixedIncludedColumns)
}

private def prepareColumns(
originalColumns: Seq[String],
prefixedColumns: Seq[String]): Seq[Column] = {
originalColumns.zip(prefixedColumns).map {
case (original, prefixed) =>
col(original).as(prefixed)
}
(indexDF, resolvedIndexedColumns, resolvedIncludedColumns)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@

package com.microsoft.hyperspace.actions

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.spark.sql.{SaveMode, SparkSession}
import org.apache.spark.sql.execution.datasources.BucketingUtils
Expand Down Expand Up @@ -148,8 +147,8 @@ class OptimizeAction(
properties = Hyperspace
.getContext(spark)
.sourceProviderManager
.getRelationMetadata(previousIndexLogEntry.relations.head)
.enrichIndexProperties(
previousIndexLogEntry.relations.head,
prevIndexProperties + (IndexConstants.INDEX_LOG_VERSION -> endId.toString))))

if (filesToIgnore.nonEmpty) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import org.apache.spark.sql.types.{DataType, StructType}
import com.microsoft.hyperspace.{Hyperspace, HyperspaceException}
import com.microsoft.hyperspace.actions.Constants.States.{ACTIVE, REFRESHING}
import com.microsoft.hyperspace.index._
import com.microsoft.hyperspace.util.SchemaUtils
import com.microsoft.hyperspace.util.ResolverUtils.ResolvedColumn

/**
* Base abstract class containing common code for different types of index refresh actions.
Expand Down Expand Up @@ -71,8 +71,11 @@ private[actions] abstract class RefreshActionBase(
// Reconstruct a df from schema
protected lazy val df = {
val relations = previousIndexLogEntry.relations
val latestRelation =
Hyperspace.getContext(spark).sourceProviderManager.refreshRelationMetadata(relations.head)
val latestRelation = Hyperspace
.getContext(spark)
.sourceProviderManager
.getRelationMetadata(relations.head)
.refresh()
val dataSchema = DataType.fromJson(latestRelation.dataSchemaJson).asInstanceOf[StructType]
val df = spark.read
.schema(dataSchema)
Expand All @@ -95,8 +98,8 @@ private[actions] abstract class RefreshActionBase(
previousIndexLogEntry.name,
// As indexed & included columns in previousLogEntry are resolved & prefixed names,
// need to remove the prefix to resolve with the dataframe for refresh.
SchemaUtils.removePrefixNestedFieldNames(ddColumns.indexed).map(_._1),
SchemaUtils.removePrefixNestedFieldNames(ddColumns.included).map(_._1))
ddColumns.indexed.map(ResolvedColumn(_).name),
ddColumns.included.map(ResolvedColumn(_).name))
}

final override val transientState: String = REFRESHING
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,8 @@ class RefreshIncrementalAction(
val internalFileFormatName = Hyperspace
.getContext(spark)
.sourceProviderManager
.internalFileFormatName(previousIndexLogEntry.relations.head)
.getRelationMetadata(previousIndexLogEntry.relations.head)
.internalFileFormatName()

// Create a df with only appended files from original list of files.
val dfWithAppendedFiles = spark.read
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,18 @@ class IndexCollectionManager(
}
}

override def getIndex(indexName: String, logVersion: Int): Option[IndexLogEntry] = {
withLogManager(indexName) { logManager =>
logManager.getLog(logVersion).map(_.asInstanceOf[IndexLogEntry])
}
}

override def getIndexVersions(indexName: String, states: Seq[String]): Seq[Int] = {
withLogManager(indexName) { logManager =>
logManager.getIndexVersions(states)
}
}

private def indexLogManagers: Seq[IndexLogManager] = {
val hadoopConf = spark.sessionState.newHadoopConf()
val rootPath = PathResolver(conf, hadoopConf).systemPath
Expand All @@ -162,12 +174,6 @@ class IndexCollectionManager(
}
}

override def getIndex(indexName: String, logVersion: Int): Option[IndexLogEntry] = {
withLogManager(indexName) { logManager =>
logManager.getLog(logVersion).map(_.asInstanceOf[IndexLogEntry])
}
}

private def withLogManager[T](indexName: String)(f: IndexLogManager => T): T = {
getLogManager(indexName) match {
case Some(logManager) => f(logManager)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,8 @@ object IndexConstants {
private[hyperspace] val LINEAGE_PROPERTY = "lineage"
// Indicate whether the source file format is parquet.
private[hyperspace] val HAS_PARQUET_AS_SOURCE_FORMAT_PROPERTY = "hasParquetAsSourceFormat"
// Indicate Hyperspace version.
private[hyperspace] val HYPERSPACE_VERSION_PROPERTY: String = "hyperspaceVersion"
// Indicate index log version.
private[hyperspace] val INDEX_LOG_VERSION = "indexLogVersion"

Expand Down
38 changes: 34 additions & 4 deletions src/main/scala/com/microsoft/hyperspace/index/IndexLogEntry.scala
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,9 @@ import org.apache.spark.sql.catalyst.catalog.BucketSpec
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.types.{DataType, StructType}

import com.microsoft.hyperspace.HyperspaceException
import com.microsoft.hyperspace.{BuildInfo, HyperspaceException}
import com.microsoft.hyperspace.actions.Constants
import com.microsoft.hyperspace.util.{PathUtils, SchemaUtils}
import com.microsoft.hyperspace.util.PathUtils

// IndexLogEntry-specific fingerprint to be temporarily used where fingerprint is not defined.
case class NoOpFingerprint() {
Expand Down Expand Up @@ -430,7 +430,12 @@ object SparkPlan {
// IndexLogEntry-specific Source that uses SparkPlan as a plan.
case class Source(plan: SparkPlan)

// IndexLogEntry that captures index-related information.
/**
* IndexLogEntry that captures index-related information.
* Don't use this method to create a new IndexLogEntry, unless you specify all hyperspace project
* default properties.
* Refer the create method of IndexLogEntry Object for further details.
*/
case class IndexLogEntry(
name: String,
derivedDataset: CoveringIndex,
Expand Down Expand Up @@ -518,6 +523,7 @@ case class IndexLogEntry(
numBuckets.equals(that.numBuckets) &&
content.root.equals(that.content.root) &&
source.equals(that.source) &&
properties.equals(that.properties) &&
state.equals(that.state)
case _ => false
}
Expand Down Expand Up @@ -610,6 +616,30 @@ object IndexLogEntry {
val VERSION: String = "0.1"

def schemaString(schema: StructType): String = schema.json

/**
* Use this method to create a new IndexLogEntry, which automatically includes
* all common default hyperspace project properties.
* TODO: force dev to use this method as this takes into account all
* project properties that needed to be added by default. Currently, dev can also
* create IndexLogEntry from case class.
* https://github.com/microsoft/hyperspace/issues/370
* Also add require for hyperspace project version when we introduce breaking change.
*/
def create(
name: String,
derivedDataset: CoveringIndex,
content: Content,
source: Source,
properties: Map[String, String]): IndexLogEntry = {
IndexLogEntry(
name,
derivedDataset,
content,
source,
properties + ((IndexConstants.HYPERSPACE_VERSION_PROPERTY, BuildInfo.version))
)
}
}

/**
Expand All @@ -629,7 +659,7 @@ class FileIdTracker {

def getMaxFileId: Long = maxId

def getFileToIdMap: HashMap[key, Long] = fileToIdMap
def getFileToIdMapping: Seq[(key, Long)] = fileToIdMap.toSeq

def getFileId(path: String, size: Long, modifiedTime: Long): Option[Long] =
fileToIdMap.get((path, size, modifiedTime))
Expand Down
Loading

0 comments on commit 336728d

Please sign in to comment.