Skip to content

Commit

Permalink
[SYSTEMDS-3708] Additional sort-merge raJoin method
Browse files Browse the repository at this point in the history
LDE project SoSe'24, part III.
Closes 2044.
  • Loading branch information
gghsu authored and mboehm7 committed Jul 6, 2024
1 parent a9b6db4 commit f81b76d
Show file tree
Hide file tree
Showing 3 changed files with 111 additions and 15 deletions.
118 changes: 106 additions & 12 deletions scripts/builtin/raJoin.dml
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
# colA Integer indicating the column index of matrix A to execute inner join command
# B Matrix of right input data [shape: N x M]
# colB Integer indicating the column index of matrix B to execute inner join command
# method Join implementation method (nested-loop)
# method Join implementation method (nested-loop, sort-merge)
# ------------------------------------------------------------------------------
#
# OUTPUT:
Expand All @@ -37,21 +37,115 @@
# ------------------------------------------------------------------------------

# Inner join of A and B on A[, colA] == B[, colB]; each result row is a row of
# A followed by a matching row of B (ncol(A) + ncol(B) columns).
# NOTE(review): this listing is a rendered commit diff without +/- markers, so
# removed and added lines appear together (e.g., the two `Integer colB` lines
# with different defaults for `method`) -- confirm against the actual file.
m_raJoin = function (Matrix[Double] A, Integer colA, Matrix[Double] B,
Integer colB, String method="nested-loop")
Integer colB, String method="sort-merge")
return (Matrix[Double] Y)
{
# NOTE(review): the statements from here through the first `Y = rbind(Y, match)`
# look like the removed pre-change body; the same logic reappears below under
# `method == "nested-loop"` -- confirm.
# matrix of result data
Y = matrix(0, rows=0, cols=ncol(A) + ncol(B) )

for (i in 1:nrow(A)) {
for (j in 1:nrow(B)) {
if (as.scalar(A[i, colA] == B[j, colB])) {
# Combine the matching row from A and B to match
match = cbind(A[i,], B[j,])
# merge the match row into result Y
Y = rbind(Y, match)
# Sort both inputs ascending by their join column so that every join method
# produces its output rows in the same order
A = order(target = A, by = colA, decreasing=FALSE, index.return=FALSE)
B = order(target = B, by = colB, decreasing=FALSE, index.return=FALSE)

# NOTE(review): only "nested-loop" and "sort-merge" are handled; for any other
# method string no branch assigns Y.
if (method == "nested-loop") {
# matrix of result data (starts empty and grows by one row per match)
Y = matrix(0, rows=0, cols=ncol(A) + ncol(B) )

# Compare every row of A with every row of B
for (i in 1:nrow(A)) {
for (j in 1:nrow(B)) {
if (as.scalar(A[i, colA] == B[j, colB])) {
# Combine the matching row from A and B to match
match = cbind(A[i,], B[j,])
# merge the match row into result Y
Y = rbind(Y, match)
}
}
}
}
else if (method == "sort-merge") {
# get join key columns
left = A[, colA]
right = B[, colB]

# Sort join keys
# NOTE(review): leftIdx and rightIdx are not referenced again in the visible code
leftIdx = order(target = left, decreasing=FALSE)
rightIdx = order(target = right, decreasing=FALSE)

# Ensure histograms are aligned by creating a common set of keys:
# the largest key of either side is used as the length of both histograms
commonKeys = max(max(left), max(right));

# Build histograms for the left and right key columns
# (assumes keys are positive integers, since they serve as row positions of
# the histograms -- TODO confirm against the table() documentation)
leftHist = table(left, 1, commonKeys, 1)
rightHist = table(right, 1, commonKeys, 1)

# Compute the number of rows for each pair of matching keys
# (count on the left times count on the right, per key)
histMul = leftHist * rightHist

# Compute the prefix sums of histograms
cumLeftHist = cumsum(leftHist)
cumRightHist = cumsum(rightHist)
cumHistMul = cumsum(histMul)

# Initialize the output size and output offsets
# (last prefix-sum entry = total number of joined rows)
outSize = cumHistMul[nrow(cumHistMul), 1]
if(as.scalar(outSize > 0)) {
# one offset 1..outSize per output row
offset = seq(1, as.scalar(outSize), 1)

# Find the bucket of matching keys to which each output belongs
outBucket = parallelBinarySearch(offset, cumHistMul)

# Determine the number of rows in outBucket
num_rows = nrow(outBucket)

# Initialize a matrix to store the result
updatedoffset = matrix(0, rows=num_rows, cols=1)
leftOutIdx = matrix(0, rows=num_rows, cols=1)
rightOutIdx = matrix(0, rows=num_rows, cols=1)

# Compute the element-wise subtraction and store in result
# TODO performance - try avoid iterating over rows
# Per output row:
#   updatedoffset = zero-based position of the row inside its key bucket
#   leftOutIdx    = rows of A before this key + floor(position / right count) + 1
#   rightOutIdx   = rows of B before this key + (position mod right count) + 1
# The indexes address the sorted A and B from above.
for(i in 1:num_rows) {
updatedoffset[i, 1] = offset[i, 1] - (cumHistMul[as.scalar(outBucket[i, 1]), 1] - histMul[as.scalar(outBucket[i, 1]), 1]) -1
leftOutIdx[i, 1] = as.scalar(cumLeftHist[as.scalar(outBucket[i, 1]), 1] - leftHist[as.scalar(outBucket[i, 1]), 1] + floor(updatedoffset[i, 1] / rightHist[as.scalar(outBucket[i, 1]), 1])) +1
rightOutIdx[i, 1] = as.scalar(cumRightHist[as.scalar(outBucket[i, 1]), 1] - rightHist[as.scalar(outBucket[i, 1]), 1] + (updatedoffset[i, 1] %% rightHist[as.scalar(outBucket[i, 1]), 1])) +1
}

# presumably length(offset) equals num_rows for this one-column vector -- confirm
nrows = length(offset)
ncolsA = ncol(A)
ncolsB = ncol(B)
Y = matrix(0, rows=nrows, cols=ncolsA + ncolsB)

# Populate the output matrix Y: left columns from A, right columns from B
for (i in 1:nrows) {
Y[i, 1:ncolsA] = A[as.scalar(leftOutIdx[i, 1]), ]
Y[i, (ncolsA + 1):(ncolsA + ncolsB)] = B[as.scalar(rightOutIdx[i, 1]), ]
}
}
# TODO hash-based method which constructs permutation tables to replicate
# tuples of lhs and rhs and simply concatenates these tuples via cbind
# By brace matching of the sort-merge lines, this else appears to pair with the
# `outSize > 0` check, i.e. it is the "no matching keys" case.
# NOTE(review): the empty result here has 1 column, whereas the nested-loop
# branch creates an empty result with ncol(A) + ncol(B) columns.
else{
Y = matrix(0, rows=0, cols=1)
}
}
}

# Lower-bound lookup: for every row of offset, run a binary search over
# cumHistMul and record the smallest row index k with offset <= cumHistMul[k].
# The search only yields that smallest index if cumHistMul is non-decreasing
# (e.g., a cumulative sum of non-negative counts).
#
# INPUT:
#   offset      values to look up, one per row
#   cumHistMul  values that are searched, one per row
# OUTPUT:
#   matched_result  one-column matrix with one row per row of offset holding
#                   the index found; the entry stays 0 if no probed entry of
#                   cumHistMul is >= the looked-up value
parallelBinarySearch = function (Matrix[double] offset, Matrix[double] cumHistMul)
  return (Matrix[double] matched_result)
{
  numBuckets = nrow(cumHistMul)
  matched_result = matrix(0, rows=nrow(offset), cols=1)

  for (q in 1:nrow(offset)) {
    lo = 1
    hi = numBuckets
    while (lo <= hi) {
      probe = as.integer((lo + hi) / 2)
      atOrBelow = as.scalar(offset[q] <= cumHistMul[probe])
      if (atOrBelow) {
        # candidate found; keep searching the lower half for a smaller index
        matched_result[q] = probe
        hi = probe - 1
      }
      else {
        lo = probe + 1
      }
    }
  }
}

Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@ public void setUp() {
addTestConfiguration(TEST_NAME,new TestConfiguration(TEST_CLASS_DIR, TEST_NAME,new String[]{"result"}));
}

// TODO test all join methods

@Test
public void testRaJoinTest() {
//generate actual dataset and variables
Expand All @@ -64,9 +66,9 @@ public void testRaJoinTest() {
// Expected output matrix
double[][] Y = {
{1, 2, 3, 1, 2, 9},
{1, 3, 6, 1, 2, 9},
{4, 7, 8, 4, 7, 8},
{4, 7, 8, 4, 5, 10},
{1, 3, 6, 1, 2, 9},
{4, 3, 5, 4, 7, 8},
{4, 3, 5, 4, 5, 10},
};
Expand Down Expand Up @@ -107,7 +109,7 @@ public void testRaJoinTestwithDifferentColumn2() {
double[][] A = {
{1, 2, 3, 4, 5},
{6, 7, 8, 9, 10},
{11, 12, 13, 14, 15},
{11, 12, 13, 14, 8},
{16, 17, 18, 19, 20},
{21, 22, 23, 24, 25}
};
Expand Down
2 changes: 1 addition & 1 deletion src/test/scripts/functions/builtin/raJoin.dml
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,6 @@ colA = as.integer($2)
B = read($3)
colB = as.integer($4)

result = raJoin(A, colA, B, colB, "nested-loop");
result = raJoin(A, colA, B, colB, "sort-merge");
write(result, $5);

0 comments on commit f81b76d

Please sign in to comment.