Skip to content

Commit

Permalink
[SYSTEMDS-2650] Standardize dedup lineage trace serialization format
Browse files Browse the repository at this point in the history
This patch standardizes the dedup patch serialization format, regardless of if
they are captured by the lineage() built-in or via a write operator. Previously,
all patches were stored in a separate file. Now, we always create a single file,
containing first the global trace, followed by the patches. This change also
allows us to copy the traces from the console (lineage()) and store them in a
single file for future recomputation.
  • Loading branch information
phaniarnab committed Aug 27, 2024
1 parent 64d1618 commit e291bb6
Show file tree
Hide file tree
Showing 15 changed files with 45 additions and 28 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -308,7 +308,7 @@ protected void writeBlobFromRDDtoHDFS(RDDObject rdd, String fname, String ofmt)
@Override
protected FrameBlock reconstructByLineage(LineageItem li) throws IOException {
return ((FrameObject) LineageRecomputeUtils
.parseNComputeLineageTrace(li.getData(), null))
.parseNComputeLineageTrace(li.getData()))
.acquireReadAndRelease();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -588,7 +588,7 @@ protected void writeBlobFromRDDtoHDFS(RDDObject rdd, String fname, String output
@Override
protected MatrixBlock reconstructByLineage(LineageItem li) throws IOException {
return ((MatrixObject) LineageRecomputeUtils
.parseNComputeLineageTrace(Explain.explain(li), null))
.parseNComputeLineageTrace(Explain.explain(li)))
.acquireReadAndRelease();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -202,7 +202,7 @@ protected void writeBlobFromRDDtoHDFS(RDDObject rdd, String fname, String ofmt)
@Override
protected TensorBlock reconstructByLineage(LineageItem li) throws IOException {
return ((TensorObject) LineageRecomputeUtils
.parseNComputeLineageTrace(li.getData(), null))
.parseNComputeLineageTrace(li.getData()))
.acquireReadAndRelease();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,7 @@ else if( input1.getDataType().isMatrix() || input1.getDataType().isFrame() ) {

LineageItem li = ec.getLineageItem(input1);
String out = !DMLScript.LINEAGE_DEDUP ? Explain.explain(li) :
Explain.explain(li) + LineageDedupUtils.mergeExplainDedupBlocks(ec);
Explain.explain(li) + "\n" + LineageDedupUtils.mergeExplainDedupBlocks(ec);
ec.setScalarOutput(outputName, new StringObject(out));
break;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -245,10 +245,9 @@ private void processWriteLI(CPOperand input1, CPOperand input2, ExecutionContext
LineageItem li = get(input1);
String fName = ec.getScalarInput(input2.getName(), Types.ValueType.STRING, input2.isLiteral()).getStringValue();

if (DMLScript.LINEAGE_DEDUP) {
// gracefully serialize the dedup maps without decompressing
LineageItemUtils.writeTraceToHDFS(LineageDedupUtils.mergeExplainDedupBlocks(ec), fName + ".lineage.dedup");
}
LineageItemUtils.writeTraceToHDFS(Explain.explain(li), fName + ".lineage");
// Combine the global trace and dedup patches in a single file.
String out = !DMLScript.LINEAGE_DEDUP ? Explain.explain(li) :
Explain.explain(li) + "\n" + LineageDedupUtils.mergeExplainDedupBlocks(ec);
LineageItemUtils.writeTraceToHDFS(out, fName + ".lineage");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ private static LineageItem parseLineageInstruction(Long id, String str, Map<Long
}
return new LineageItem(id, "", opcode, inputs.toArray(new LineageItem[0]), specialValueBits);
}

protected static void parseLineageTraceDedup(String str) {
str.replaceAll("\r\n", "\n");
String[] allPatches = str.split("\n\n");
Expand All @@ -145,4 +145,22 @@ protected static void parseLineageTraceDedup(String str) {
loopItem.patchLiMap.get(pathId).put(parts[1], patchLi);
}
}

protected static String[] separateMainAndDedupPatches(String str) {
str.replaceAll("\r\n", "\n");
String[] allPatches = str.split("\n\n");
if (allPatches.length == 1) //no dedup patches
return allPatches;

// Merge the dedup patches into a single string
String[] patches = new String[2];
patches[0] = allPatches[0];
StringBuilder sb = new StringBuilder();
for (int i=1; i<allPatches.length; i++) {
sb.append(allPatches[i]);
sb.append("\n\n");
}
patches[1] = sb.toString();
return patches;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -81,17 +81,18 @@
public class LineageRecomputeUtils {
private static final String LVARPREFIX = "lvar";
public static final String LPLACEHOLDER = "IN#";
private static final boolean DEBUG = false;
private static final boolean DEBUG = true;
public static Map<String, DedupLoopItem> loopPatchMap = new HashMap<>();

public static Data parseNComputeLineageTrace(String mainTrace, String dedupPatches) {
if (DEBUG) {
public static Data parseNComputeLineageTrace(String mainTrace) {
if (DEBUG)
System.out.println(mainTrace);
System.out.println(dedupPatches);
}
LineageItem root = LineageParser.parseLineageTrace(mainTrace);
if (dedupPatches != null)
LineageParser.parseLineageTraceDedup(dedupPatches);

// Separate the global trace and the dedup patches
String[] patches = LineageParser.separateMainAndDedupPatches(mainTrace);
LineageItem root = LineageParser.parseLineageTrace(patches[0]); //global trace
if (patches.length > 1)
LineageParser.parseLineageTraceDedup(patches[1]);

// Disable GPU execution. TODO: Support GPU
boolean GPUenabled = false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ private void testLineageTrace(String testname) {

//get lineage and generate program
String Rtrace = readDMLLineageFromHDFS("R");
Data ret = LineageRecomputeUtils.parseNComputeLineageTrace(Rtrace, null);
Data ret = LineageRecomputeUtils.parseNComputeLineageTrace(Rtrace);
HashMap<CellIndex, Double> dmlfile = readDMLMatrixFromOutputDir("R");
MatrixBlock tmp = ((MatrixObject)ret).acquireReadAndRelease();
TestUtils.compareMatrices(dmlfile, tmp, 1e-6);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ private void testLineageTraceBuiltin(String testname) {

//get lineage and generate program
String Rtrace = readDMLLineageFromHDFS("R");
Data ret = LineageRecomputeUtils.parseNComputeLineageTrace(Rtrace, null);
Data ret = LineageRecomputeUtils.parseNComputeLineageTrace(Rtrace);

HashMap<CellIndex, Double> dmlfile = readDMLMatrixFromOutputDir("R");
MatrixBlock tmp = ((MatrixObject)ret).acquireReadAndRelease();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -152,8 +152,7 @@ public void testLineageTrace(String testname) {

//deserialize, generate program and execute
String Rtrace = readDMLLineageFromHDFS("R");
String RDedupPatches = readDMLLineageDedupFromHDFS("R");
Data ret = LineageRecomputeUtils.parseNComputeLineageTrace(Rtrace, RDedupPatches);
Data ret = LineageRecomputeUtils.parseNComputeLineageTrace(Rtrace);

//match the original and recomputed results
HashMap<CellIndex, Double> orig = readDMLMatrixFromOutputDir("R");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,12 +116,12 @@ public void testLineageTraceSpark(String testname) {
TestUtils.compareScalars(Y_lineage, Explain.explain(Y_li));

//generate program
Data X_data = LineageRecomputeUtils.parseNComputeLineageTrace(X_lineage, null);
Data X_data = LineageRecomputeUtils.parseNComputeLineageTrace(X_lineage);
HashMap<MatrixValue.CellIndex, Double> X_dmlfile = readDMLMatrixFromOutputDir("X");
MatrixBlock X_tmp = ((MatrixObject)X_data).acquireReadAndRelease();
TestUtils.compareMatrices(X_dmlfile, X_tmp, 1e-6);

Data Y_data = LineageRecomputeUtils.parseNComputeLineageTrace(Y_lineage, null);
Data Y_data = LineageRecomputeUtils.parseNComputeLineageTrace(Y_lineage);
HashMap<MatrixValue.CellIndex, Double> Y_dmlfile = readDMLMatrixFromOutputDir("Y");
MatrixBlock Y_tmp = ((MatrixObject)Y_data).acquireReadAndRelease();
TestUtils.compareMatrices(Y_dmlfile, Y_tmp, 1e-6);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ private void testLineageTraceExec(String testname) {

//get lineage and generate program
String Rtrace = readDMLLineageFromHDFS("R");
Data ret = LineageRecomputeUtils.parseNComputeLineageTrace(Rtrace, null);
Data ret = LineageRecomputeUtils.parseNComputeLineageTrace(Rtrace);

if( testname.equals(TEST_NAME2) || testname.equals(TEST_NAME5)) {
double val1 = readDMLScalarFromOutputDir("R").get(new CellIndex(1,1));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ private void testLineageTraceFunction(String testname) {

//get lineage and generate program
String Rtrace = readDMLLineageFromHDFS("R");
Data ret = LineageRecomputeUtils.parseNComputeLineageTrace(Rtrace, null);
Data ret = LineageRecomputeUtils.parseNComputeLineageTrace(Rtrace);

HashMap<CellIndex, Double> dmlfile = readDMLMatrixFromOutputDir("R");
MatrixBlock tmp = ((MatrixObject)ret).acquireReadAndRelease();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ private void testLineageTraceExec(String testname) {
String Rtrace = readDMLLineageFromHDFS("R");
AutomatedTestBase.TEST_GPU = false;
//NOTE: the generated program is CP-only.
Data ret = LineageRecomputeUtils.parseNComputeLineageTrace(Rtrace, null);
Data ret = LineageRecomputeUtils.parseNComputeLineageTrace(Rtrace);

HashMap<CellIndex, Double> dmlfile = readDMLMatrixFromOutputDir("R");
MatrixBlock tmp = ((MatrixObject)ret).acquireReadAndRelease();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,7 @@ private void testLineageTraceParFor(int ncol, String testname) {

//get lineage and generate program
String Rtrace = readDMLLineageFromHDFS("R");
Data ret = LineageRecomputeUtils.parseNComputeLineageTrace(Rtrace, null);
Data ret = LineageRecomputeUtils.parseNComputeLineageTrace(Rtrace);

HashMap<CellIndex, Double> dmlfile = readDMLMatrixFromOutputDir("R");
MatrixBlock tmp = ((MatrixObject) ret).acquireReadAndRelease();
Expand Down

0 comments on commit e291bb6

Please sign in to comment.