Semantic Cache is a shared semantic cache that utilized semantic information of cached contents to make optimization of Big Data jobs easier and more efficient.
Java 11 (https://www.oracle.com/java/technologies/javase-jdk11-downloads.html)
Maven 3.6.3 (https://maven.apache.org/docs/3.6.3/release-notes.html)
Apache Hadoop 2.9.2 or higher, 3.x.x not tested (https://hadoop.apache.org/)
Apache Spark 3.0.x (https://spark.apache.org/)
Scala 2.12.10 (https://www.scala-lang.org/download/2.12.10.html)
- Install java and set environment variable JAVA_HOME.
- Install Maven and set environment variable MAVEN_HOME (or include mvn bin folder in PATH).
- Install Hadoop and setup HDFS cluster.
- Install Spark and setup Spark cluster.
- Install Scala and setup SCALA_HOME (or include scala bin folder in PATH).
- Compile Semantic Cache by running the following at root directory:
${MAVEN_HOME}/bin/mvn clean package
- Configure Semantic Cache Manager with the correct parameters using the following templates:
${CAERUS_HOME}/manager/src/main/resources/{manager.conf,log4j.properties}.template
For manager.conf
all the baseline caches should be defined (only HDFS supported for now). For example, an HDFS cluster in the storage side running on 10.0.0.10
at port 9000
with 64GiB
available can be registered as a baseline cache as follows (using /tmp/cache
as a folder to store cache contents):
caches {
storage-disk {
site = "hdfs://10.0.0.10:9000/tmp/cache"
size = 64G
}
}
More than one baseline cache can be defined, but only one per tier (compute-memory
,compute-disk
,storage-memory
,storage-disk
).
- Run Semantic Cache Manager by runnning:
${SCALA_HOME}/bin/scala -Dlog4j.configuration=<log4j.properties path> ${CAERUS_HOME}/manager/target/manager-0.0.0-jar-with-dependencies.jar <manager.conf path>
- Use the following option along with Spark when you run a Scala application to have access to Semantic Cache API:
--driver-class-path ${CAERUS_HOME}/client.spark/target/client.spark-0.0.0-jar-with-dependencies.jar
Simply use the following in the Scala Spark application code:
SemanticCache.activate(sparkSession, semanticCacheManagerURI)
Create new semantic cache client:
val semanticCache = new SemanticCache(sparkSession, semanticCacheManagerURI)
Write to cache:
val bytesWritten = semanticCache.repartitioning(loadDataFrame, partitionAttribute, tier, name)
val bytesWritten = semanticCache.fileSkippingIndexing(loadDataFrame, partitionAttribute, tier, name)
val bytesWritten = semanticCache.caching(dataframe, tier, name)
Delete from cache:
val bytesFreed = semanticCache.delete(name)
Get current status of the semantic cache:
Console.out.println(semanticCache.status)
To run a simple user example using GridPocket data (https://github.com/gridpocket/project-iostack), use the following:
// Initialize Spark.
val spark: SparkSession = SparkSession.builder()
.master(sparkURI)
.appName(name="SemanticCacheExample")
.getOrCreate()
// Activate Semantic Cache optimizations.
SemanticCache.activate(spark, semanticCacheURI)
// Initialize data format.
val sch = StructType(Array(
StructField("vid", StringType, nullable = true),
StructField("date", TimestampType, nullable = true),
StructField("index", DoubleType, nullable = true),
StructField("sumHC", DoubleType, nullable = true),
StructField("sumHP", DoubleType, nullable = true),
StructField("type", StringType, nullable = true),
StructField("size", IntegerType, nullable = true),
StructField("temp", DoubleType, nullable = true),
StructField("city", StringType, nullable = true),
StructField("region", StringType, nullable = true),
StructField("lat", DoubleType, nullable = true),
StructField("lng", DoubleType, nullable = true)))
// Create leaf node.
val loadDF = spark.read.schema(sch).option("header", value = true).csv(inputPath)
// Run queries.
loadDF.createOrReplaceTempView("meter")
val q1 = "SELECT * FROM meter WHERE temp >= 9.0 AND temp < 12.0"
val q2 = "SELECT vid,city FROM meter WHERE temp >= 9.0 AND temp < 12.0"
val q3 = "SELECT city, avg(temp) as avg_temp FROM meter WHERE temp >= 9.0 AND temp < 12.0 GROUP BY city"
val q1DF = spark.sql(q1)
val q2DF = spark.sql(q2)
val q3DF = spark.sql(q3)
q1DF.explain(mode = "extended")
q2DF.explain(mode = "extended")
q3DF.explain(mode = "extended")
q1DF.write.csv(outputPath + "/example1.csv")
q2DF.write.csv(outputPath + "/example2.csv")
q3DF.write.csv(outputPath + "/example3.csv")
spark.stop()
To fill the cache with contents, the Write API should be used. Here is an example of performing repartitioning on GridPocket data on the attribute of temperature:
// Initialize Spark.
val spark: SparkSession = SparkSession.builder()
.master(sparkURI)
.appName(name="SemanticCacheRepartitioning")
.getOrCreate()
// Initialize Semantic Cache connector.
val semanticCache = new SemanticCache(spark, semanticCacheURI)
// Initialize data format.
val sch = StructType(Array(
StructField("vid", StringType, nullable = true),
StructField("date", TimestampType, nullable = true),
StructField("index", DoubleType, nullable = true),
StructField("sumHC", DoubleType, nullable = true),
StructField("sumHP", DoubleType, nullable = true),
StructField("type", StringType, nullable = true),
StructField("size", IntegerType, nullable = true),
StructField("temp", DoubleType, nullable = true),
StructField("city", StringType, nullable = true),
StructField("region", StringType, nullable = true),
StructField("lat", DoubleType, nullable = true),
StructField("lng", DoubleType, nullable = true)))
// Create leaf node.
val loadDF = spark.read.schema(sch).option("header", value = true).csv(inputPath)
// Cache repartition content.
val bytesWritten = semanticCache.repartitioning(loadDF, partitionAttribute=attributeName, Tier.STORAGE_DISK, name=repartitionName)
assert(bytesWritten > 0L)
Console.out.println(semanticCache.status)
spark.stop()
After the repartitioning content is created, the initial leaf node changes in the Optimized Logical Plan for all three queries from
+- Relation[vid#0,date#1,index#2,sumHC#3,sumHP#4,type#5,size#6,temp#7,city#8,region#9,lat#10,lng#11] csv
to
+- Relation[vid#0,date#1,index#2,sumHC#3,sumHP#4,type#5,size#6,temp#7,city#8,region#9,lat#10,lng#11] parquet
revealing changes in the optimization process from Semantic Cache. The ensuing Physical Plan reveals even further information (files that are loaded) if printed.