Pulling in changes that were developed during the file format development. #2

Open
wants to merge 8 commits into master
31 changes: 30 additions & 1 deletion src/main/java/com/ibm/crail/spark/tools/ParseOptions.java
@@ -48,6 +48,7 @@ public class ParseOptions implements Serializable {
private String outputFileFormat;
private Map<String,String> dataSinkOptions;
private TPCDSOptions tpcdsOptions;
private int blockSize;

static <K, V> scala.collection.immutable.Map<K, V> toScalaImmutableMap(java.util.Map<K, V> javaMap) {
final java.util.List<scala.Tuple2<K, V>> list = new java.util.ArrayList<>(javaMap.size());
@@ -71,6 +72,7 @@ public ParseOptions(){
this.rangeInt = Integer.MAX_VALUE;
this.affixRandom = false;
this.outputFileFormat = "parquet";
this.blockSize = -1;
this.dataSinkOptions = new Hashtable<String, String>();

this.tpcdsOptions = new TPCDSOptions("",
@@ -94,7 +96,7 @@ public ParseOptions(){
" variable payload data will be generated once, and used multiple times (default " + this.affixRandom +")");
options.addOption("r", "rows", true, "<long> total number of rows (default: " + this.rowCount +")");
options.addOption("c", "case", true, "case class schema currently supported are: \n" +
" ParquetExample (default), IntWithPayload, and tpcds. \n" +
" ParquetExample (default), IntWithPayload, IntWithPayload2x, and tpcds. \n" +
" These classes are in ./schema/ in src.");
options.addOption("o", "output", true, "<String> the output file name (default: " + this.output+")");
options.addOption("t", "tasks", true, "<int> number of tasks to generate this data (default: " + this.tasks+")");
@@ -112,6 +114,8 @@ public ParseOptions(){
options.addOption("O", "options", true, "<str,str> key,value strings that will be passed to the data source of spark in writing." +
" E.g., for parquet you may want to re-consider parquet.block.size. The default is 128MB (the HDFS block size). ");

options.addOption("bs", "blockSize", true, "[int] to set the block size for file formats that support it, for now it is Parquet and ORC");

// case class TPCDSOptions(var dsdgen_dir:String = "/home/atr/zrl/external/github/databricks/tpcds-kit/tools/",
// var scale_factor:String = "1",
// var data_location:String = "file:/data/tpcds-F1",
@@ -285,6 +289,9 @@ public void parse(String[] args) {
if (cmd.hasOption("t")) {
this.tasks = Integer.parseInt(cmd.getOptionValue("t").trim());
}
if (cmd.hasOption("bs")) {
this.blockSize = Integer.parseInt(cmd.getOptionValue("bs").trim());
}

if (cmd.hasOption("R")) {
this.rangeInt = Integer.parseInt(cmd.getOptionValue("R").trim());
@@ -355,5 +362,27 @@ public void parse(String[] args) {
showErrorAndExit("Please set the directory for dsdgen with -tdsd");
}
}

if(this.outputFileFormat.compareToIgnoreCase("orc") == 0){
// check and set the compression type.
            // Parquet options are picked up by the Spark conf variable,
            // but the ORC options need to be set on the sink options.
if(this.getCompressionType().compareToIgnoreCase("uncompressed") == 0){
this.dataSinkOptions.put("orc.compress","none");
} else {
this.dataSinkOptions.put("orc.compress",this.compressionType);
}
}

if(this.blockSize != -1){
String size = Integer.toString(this.blockSize);
if(this.outputFileFormat.compareToIgnoreCase("parquet") == 0) {
this.dataSinkOptions.put("parquet.block.size", size);
} else if (this.outputFileFormat.compareToIgnoreCase("orc") == 0){
this.dataSinkOptions.put("orc.stripe.size", size);
} else {
throw new IllegalArgumentException("-bs is only supported for ORC and Parquet so far, current format is " + this.outputFileFormat);
}
}
}
}
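
How these sink options take effect is easiest to see on the write path: presumably the tool hands dataSinkOptions to Spark's DataFrameWriter, so -bs ends up as parquet.block.size or orc.stripe.size and the -O key,value pairs pass straight through. A minimal sketch under that assumption (the object name, the DataFrame df, and the plain parameters standing in for the ParseOptions fields are placeholders, not the project's actual writer code):

import org.apache.spark.sql.DataFrame

object SinkOptionsSketch {
  // Write df with the format and sink options collected by ParseOptions.
  def write(df: DataFrame,
            outputFileFormat: String,            // e.g. "parquet" or "orc"
            output: String,                      // output file name / path
            dataSinkOptions: Map[String, String]): Unit = {
    df.write
      .format(outputFileFormat)
      .options(dataSinkOptions)                  // e.g. "parquet.block.size" -> "268435456"
      .save(output)
  }
}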
56 changes: 33 additions & 23 deletions src/main/scala/com/ibm/crail/spark/tools/DataGenerator.scala
@@ -21,76 +21,86 @@

package com.ibm.crail.spark.tools

import java.util.concurrent.ThreadLocalRandom

import scala.util.Random

/**
* Created by atr on 10/5/16.
*/
object DataGenerator extends Serializable {
-  val random = new Random(System.nanoTime())
+  val scalaRandom = new Random(System.nanoTime())

//val ThreadLocalRandom.current() = ThreadLocalRandom.current()
//ThreadLocalRandom.current().setSeed(System.nanoTime())

/* affix payload variables */
var affixStringBuilder:StringBuilder = null
var affixByteArray:Array[Byte] = null
val baseRandomArray:Array[Byte] = new Array[Byte](1024*1024)
ThreadLocalRandom.current().nextBytes(this.baseRandomArray)

def getNextString(size: Int, affix: Boolean):String = {
if(affix){
this.synchronized {
if (affixStringBuilder == null) {
affixStringBuilder = new StringBuilder(
-            random.alphanumeric.take(size).mkString)
+            scalaRandom.alphanumeric.take(size).mkString)
}
}
/* just randomly change 1 byte - this is to make sure parquet
* does not ignore the data */
-      affixStringBuilder.setCharAt(random.nextInt(size),
-        random.nextPrintableChar())
+      affixStringBuilder.setCharAt(ThreadLocalRandom.current().nextInt(size),
+        scalaRandom.nextPrintableChar())
affixStringBuilder.mkString
} else {
-      random.alphanumeric.take(size).mkString
+      scalaRandom.alphanumeric.take(size).mkString
}
}

def getNextByteArray(size: Int, affix: Boolean):Array[Byte] = {
val toReturn = new Array[Byte](size)
if(!affix){
/* if not affix, then return completely new values in a new array */
-      random.nextBytes(toReturn)
+      ThreadLocalRandom.current().nextBytes(toReturn)
} else {
this.synchronized{
if(affixByteArray == null){
affixByteArray = new Array[Byte](size)
/* initialize */
random.nextBytes(affixByteArray)
}
}
-      /* just randomly change 1 byte - this is to make sure parquet
-       * does not ignore the data - char will be casted to byte */
-      affixByteArray(random.nextInt(size)) = random.nextPrintableChar().toByte
-      /* now we copy affix array */
-      Array.copy(affixByteArray, 0, toReturn, 0, size)
+      /* now we need to fill out the passed array from our source */
+      var leftBytes = toReturn.length
+      var srcOffset = 0
+      var dstOffset = 0
+      while(leftBytes > 0){
+        val toCopy = Math.min(leftBytes, this.baseRandomArray.length)
+        /* just randomly change 1 byte - this is to make sure parquet
+         * does not ignore the data - char will be casted to byte */
+        this.baseRandomArray(ThreadLocalRandom.current().nextInt(toCopy)) = scalaRandom.nextPrintableChar().toByte

Review comment: what if we randomly get the same byte? Shouldn't we try until it changes?

+        Array.copy(this.baseRandomArray, srcOffset, toReturn, dstOffset, toCopy)
+        dstOffset+=toCopy
+        srcOffset+= toCopy
+        if (srcOffset == this.baseRandomArray.length)
+          srcOffset = 0
+        leftBytes-=toCopy
+      }
}
toReturn
}

def getNextInt:Int = {
-    random.nextInt()
+    ThreadLocalRandom.current().nextInt()
}

def getNextInt(max:Int):Int = {
-    random.nextInt(max)
+    ThreadLocalRandom.current().nextInt(max)
}

def getNextLong:Long= {
-    random.nextLong()
+    ThreadLocalRandom.current().nextLong()
}

def getNextDouble:Double= {
-    random.nextDouble()
+    ThreadLocalRandom.current().nextDouble()
}

def getNextFloat: Float = {
-    random.nextFloat()
+    ThreadLocalRandom.current().nextFloat()
}

def getNextValue(s:String, size: Int, affix:Boolean): String ={
…
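
The review question on getNextByteArray above asks what should happen when the freshly drawn byte happens to equal the byte already stored. One possible follow-up, shown only as a hedged sketch and not as part of this PR (the object and method names are illustrative), is to retry until the drawn value differs, so every call is guaranteed to mutate the buffer:

import java.util.concurrent.ThreadLocalRandom
import scala.util.Random

object MutateUntilChanged {
  private val scalaRandom = new Random(System.nanoTime())

  /* Overwrite one random position with a printable byte, retrying while the
   * drawn value equals the existing one, so the array always changes. */
  def mutateOneByte(buf: Array[Byte]): Unit = {
    val idx = ThreadLocalRandom.current().nextInt(buf.length)
    var next = scalaRandom.nextPrintableChar().toByte
    while (next == buf(idx)) {
      next = scalaRandom.nextPrintableChar().toByte
    }
    buf(idx) = next
  }
}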
98 changes: 98 additions & 0 deletions src/main/scala/com/ibm/crail/spark/tools/DataGeneratorV2.scala
@@ -0,0 +1,98 @@
package com.ibm.crail.spark.tools

import java.nio.ByteBuffer

import scala.collection.mutable.StringBuilder
import scala.util.Random

/**
* Created by atr on 28.11.17.
*/
class DataGeneratorV2 extends Serializable {
val random = new Random(System.nanoTime())

/* affix payload variables */
var affixStringBuilder:StringBuilder = null
var affixByteArray:Array[Byte] = null

val poolOfRandomStuff:Array[Byte] = new Array[Byte](1024*1024)
random.nextBytes(poolOfRandomStuff)
val bb:ByteBuffer = ByteBuffer.wrap(poolOfRandomStuff)
bb.clear()

def getNextString(size: Int, affix: Boolean):String = {
if(affix){
this.synchronized {
if (affixStringBuilder == null) {
affixStringBuilder = new StringBuilder(
random.alphanumeric.take(size).mkString)
}
}
/* just randomly change 1 byte - this is to make sure parquet
* does not ignore the data */
affixStringBuilder.setCharAt(random.nextInt(size),
random.nextPrintableChar())
affixStringBuilder.mkString
} else {
random.alphanumeric.take(size).mkString
}
}

def getNextByteArray(size: Int, affix: Boolean):Array[Byte] = {
val toReturn = new Array[Byte](size)
if(!affix){
/* if not affix, then return completely new values in a new array */
random.nextBytes(toReturn)
} else {
this.synchronized{
if(affixByteArray == null){
affixByteArray = new Array[Byte](size)
/* initialize */
random.nextBytes(affixByteArray)
}
}
/* just randomly change 1 byte - this is to make sure parquet
* does not ignore the data - char will be casted to byte */
affixByteArray(random.nextInt(size)) = random.nextPrintableChar().toByte

Review comment: See above.

/* now we copy affix array */
Array.copy(affixByteArray, 0, toReturn, 0, size)
}
toReturn
}

def getNextInt:Int = {
if(bb.remaining() < 4) {
bb.clear()
}
bb.getInt()
}

def getNextInt(max:Int):Int = {
getNextInt % max
}

def getNextLong:Long= {
if(bb.remaining() < 8) {
bb.clear()
}
bb.getLong
}

def getNextDouble:Double= {
if(bb.remaining() < 8) {
bb.clear()
}
bb.getDouble
}

def getNextFloat: Float = {
if(bb.remaining() < 4)
bb.clear()
bb.getFloat()
}

def getNextValue(s:String, size: Int, affix:Boolean): String ={
getNextString(size, affix)
}
}
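
For orientation, a minimal usage sketch of the new pool-backed generator (the demo object and the chosen sizes are illustrative only, not part of the PR): values are served by walking the pre-filled 1 MiB ByteBuffer and clearing it once fewer bytes remain than the next read needs.

import com.ibm.crail.spark.tools.DataGeneratorV2

object DataGeneratorV2Demo {
  def main(args: Array[String]): Unit = {
    val gen = new DataGeneratorV2
    val i = gen.getNextInt                          // consumes 4 bytes from the pool
    val bounded = gen.getNextInt(100)               // note: getNextInt % max can be negative
    val payload = gen.getNextByteArray(4096, affix = true)
    val label = gen.getNextString(16, affix = false)
    println(s"int=$i bounded=$bounded payload=${payload.length}B label=$label")
  }
}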
