Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ADD] Interval Join Operator Implementation #20

Open
wants to merge 70 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
70 commits
Select commit Hold shift + click to select a range
8066b5a
[ADD] Added new operator - Interval Join ( Key Parallelism )
DropB1t Dec 11, 2023
559f5c0
[FIX] Fixed nondeterministic behavior of get_MergedNodes1() function
DropB1t Dec 12, 2023
d8676c7
[FIX] Added a condition to increment the index pointer more securely
DropB1t Dec 13, 2023
c1dd0ae
Merge pull request #1 from DropB1t/deterministic-merge-order
DropB1t Dec 13, 2023
4cea4c4
[FIX] Fixed interval evaluation logic
DropB1t Dec 14, 2023
4268f60
[ADD] Added get/set StreamTag methods to all batch variants
DropB1t Dec 14, 2023
c7e4b9d
[FIX] Fixed join logic function signature
DropB1t Dec 15, 2023
2b22340
Merge pull request #2 from DropB1t/optional-result-signature
DropB1t Dec 15, 2023
206057d
[FIX] Removed GPU's batch JoinStream tag attribute
DropB1t Dec 16, 2023
7f67372
[FIX] Refactored collectors logic
DropB1t Dec 16, 2023
962bab9
[FIX] Fixed tagging streams in ordering/kslack collectors
DropB1t Dec 16, 2023
4b8c9c2
[ADD] Added Data Parallelism join execution mode
DropB1t Dec 18, 2023
6c2eb42
[ADD] Added synthetic join tests
DropB1t Dec 18, 2023
d617c46
[FIX] Fixed join tests 4 and 5
DropB1t Dec 19, 2023
779492a
[ADD] Swappable archive's buffer structure
DropB1t Dec 31, 2023
9a89bab
[FIX] Fixed constructor signature
DropB1t Dec 31, 2023
c22c7f2
[FIX] minor fixes
DropB1t Jan 3, 2024
5fe135e
Added buffer A stats gathering
DropB1t Feb 7, 2024
340685f
Added buffer A balance measurement
DropB1t Feb 8, 2024
09a1c3f
Added stats flag macro + buffer stats structure
DropB1t Feb 8, 2024
0cd06f5
type fix
DropB1t Feb 8, 2024
da7e315
minor
DropB1t Feb 9, 2024
81503b7
minor fix
DropB1t Feb 14, 2024
5ecc075
cleaned comments
DropB1t Feb 18, 2024
c86c50d
Merge pull request #3 from DropB1t/join-buffer-stats
DropB1t Feb 18, 2024
3b5f357
[FIX] refactored type of tuple wrapper scruct
DropB1t Feb 18, 2024
85dce0a
[FIX] fixed join archive redundant typename
DropB1t Feb 18, 2024
531ceef
join logic refactor
DropB1t Feb 21, 2024
536d95e
Added interval join tests to regression
DropB1t Feb 25, 2024
5e7c891
Bug fixes, minor refactor, test correctness check fixed
DropB1t Feb 27, 2024
49092c9
Refactored Interval range retrieving logic
DropB1t Feb 27, 2024
e4ff80e
Minor fixes
DropB1t Feb 27, 2024
6ca0f53
Refactor getJoinRange() in join_archive.hpp
DropB1t Feb 27, 2024
14172b7
Refactored archive implementation
DropB1t Mar 2, 2024
b0c9f66
minor fixes
DropB1t Mar 2, 2024
2bba8e9
fnv-1a hashing test
DropB1t Mar 3, 2024
3c146e6
join tests fixes
DropB1t Mar 4, 2024
5bad6b9
Fix comparison function in JoinArchive and WinArchive, finished refactor
DropB1t Mar 4, 2024
758558b
Merge pull request #4 from DropB1t/archive-refactor
DropB1t Mar 4, 2024
064d9c0
Update hash function and join functor implementation
DropB1t Mar 7, 2024
ee60a5a
Merge branch 'upstream/master' into interval-join
DropB1t Mar 8, 2024
82165e0
Removed deprecated code
DropB1t Mar 8, 2024
90346b2
Mix refactor
DropB1t Mar 11, 2024
fdbae79
Debug commit
DropB1t Mar 13, 2024
214fb2e
fixed if condition
DropB1t Mar 13, 2024
e1ae30e
Removed all deprecated debug features
DropB1t Mar 13, 2024
5032831
bug fix
DropB1t Mar 13, 2024
0508204
[ADD] join_collector for data parallelism mode in interval join opera…
DropB1t Mar 15, 2024
37058fa
Fixed join_collector eosnotify logic
DropB1t Mar 16, 2024
37c4d4d
Fixed RR for Single_t tuple dispatching in join_collector.hpp
DropB1t Mar 17, 2024
3b612e4
[FIX] New round robin logic implemented
DropB1t Mar 19, 2024
fbe8dc6
[FIX] Fixed skewed batch bug (dp mode)
DropB1t Mar 19, 2024
f62b7a8
Fixed fnv-1a hashing method, used uint64_t type
DropB1t Mar 25, 2024
39688f5
feat: new fnv1-a hash function
DropB1t May 14, 2024
34166ec
Merge branch 'interval-join' into join-collector
DropB1t Jun 13, 2024
a1ed948
Merge pull request #6 from DropB1t/join-collector
DropB1t Jun 13, 2024
805caee
Fix mean buffer size calculation in printBufferStats
DropB1t Jun 15, 2024
fb835bc
[FIX] Enable measurement of tuple distribution in data partitioning f…
DropB1t Jul 1, 2024
e3ad596
fix: Refactored whole buffer size measurement logic to be more fine-g…
DropB1t Jul 14, 2024
495e748
[FIX] Changed boundaries inclusion logic, fixed ij mode comments
DropB1t Aug 26, 2024
9a8ae98
feat: Refactor archive size measurement logic for interval join
DropB1t Aug 26, 2024
e02583e
Merge pull request #7 from DropB1t/buffer-metrics-refactor
DropB1t Aug 26, 2024
9706a27
[MERGE] 'upstream/master' updates into interval-join
DropB1t Aug 26, 2024
9e42d18
[FIX] Fixed join tests files, added interval_join.hpp as doxygen inpu…
DropB1t Aug 26, 2024
671a6a2
[FIX] Redesigned comments
DropB1t Aug 26, 2024
ad634c8
[ADD] Added size in KB for mean archive size
DropB1t Aug 27, 2024
474801e
[FIX] Added purgeArchives procedure in case of Deterministic mode exe…
DropB1t Sep 3, 2024
e35eb42
[FIX] Fixed .gitignore
DropB1t Oct 14, 2024
c9e657e
[FIX] Remove WF_JOIN_MEASUREMENT macro from CMakeLists
DropB1t Oct 14, 2024
b3cc8b7
[FIX] incorrect compare function type in assert evaluation in Win Arc…
DropB1t Oct 23, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .gitignore
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
# .DS_Store are banished
.DS_Store
.DS_Store
5 changes: 5 additions & 0 deletions API
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,11 @@ The corresponding builder needs two parameters (for the lift and combine logics)
* Combine
__host__ __device__ void(const result_t &, const result_t &, result_t &);

Interval_Join
----------------
std::optional<result_t> (const tuple_t &, const tuple_t &)
std::optional<result_t> (const tuple_t &, const tuple_t &, RuntimeContext &)

SINK
----
void(std::optional<tuple_t> &);
Expand Down
5 changes: 3 additions & 2 deletions docs/windflow-doxygen.conf
Original file line number Diff line number Diff line change
Expand Up @@ -845,8 +845,9 @@ INPUT = ../wf/doxygen-mainpage.hpp \
../wf/source_shipper.hpp \
../wf/windflow.hpp \
../wf/windflow_gpu.hpp \
../wf/kafka/windflow_kafka.hpp
../wf/rocksdb/windflow_rocksdb.hpp
../wf/kafka/windflow_kafka.hpp \
../wf/rocksdb/windflow_rocksdb.hpp \
../wf/interval_join.hpp

# This tag can be used to specify the character encoding of the source files
# that doxygen parses. Internally doxygen uses the UTF-8 encoding. Doxygen uses
Expand Down
5 changes: 3 additions & 2 deletions tests/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ add_subdirectory(graph_tests)
add_subdirectory(merge_tests)
add_subdirectory(split_tests)
add_subdirectory(win_tests)
add_subdirectory(join_tests)

# Add custom target for all the CPU tests
add_custom_target(all_cpu)
Expand All @@ -11,11 +12,11 @@ if(LIBRDKAFKA_FOUND AND LibRDKafka_LIBRARIES)
# Add the sub-folder with Kafka tests
add_subdirectory(kafka_tests)
# Add dependencies to the all_cpu target
add_dependencies(all_cpu graph_tests merge_tests split_tests win_tests kafka_tests)
add_dependencies(all_cpu graph_tests merge_tests split_tests win_tests join_tests kafka_tests)
else()
message(STATUS "librdkafka needs to be installed to generate kafka tests")
# Add dependencies to the all_cpu target
add_dependencies(all_cpu graph_tests merge_tests split_tests win_tests)
add_dependencies(all_cpu graph_tests merge_tests split_tests win_tests join_tests)
endif()

if(ROCKSDB_FOUND AND ROCKSDB_LIBRARIES)
Expand Down
33 changes: 33 additions & 0 deletions tests/join_tests/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
# Compiler and flags
set(CMAKE_CXX_STANDARD 17)
set(CMAKE_CXX_STANDARD_REQUIRED ON)
set(CMAKE_CXX_FLAGS_DEBUG "-g -O0")
set(CMAKE_CXX_FLAGS_RELEASE "-O3 -g -finline-functions")

# Macros to be provided to the compiler
add_definitions(-DFF_BOUNDED_BUFFER)
# -DWF_JOIN_MEASUREMENT to enable the measurement of how uniformerly the tuples are distributed among the joiners
# -DWF_TRACING_ENABLED to enable the tracing with Dashboard

# Header files of WindFlow and FastFlow
include_directories(${PROJECT_SOURCE_DIR}/wf ${ff_root_dir})

# Linking to pthread
# cdt gvc cgraph to enable the tracing with Dashboard
link_libraries(pthread)

# Set output directory
set(CMAKE_RUNTIME_OUTPUT_DIRECTORY ../../bin/join_tests)

# Cpp files to be compiled
file(GLOB SOURCES "*.cpp")

# Add a target for each cpp file and a unique target for all the tests in this folder
add_custom_target(join_tests)

foreach(testsourcefile ${SOURCES})
get_filename_component(barename ${testsourcefile} NAME)
string(REPLACE ".cpp" "" testname ${barename})
add_executable(${testname} ${testsourcefile})
add_dependencies(join_tests ${testname})
endforeach(testsourcefile ${SOURCES})
276 changes: 276 additions & 0 deletions tests/join_tests/join_common.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,276 @@
/**************************************************************************************
* Copyright (c) 2023- Gabriele Mencagli and Yuriy Rymarchuk
*
* This file is part of WindFlow.
*
* WindFlow is free software dual licensed under the GNU LGPL or MIT License.
* You can redistribute it and/or modify it under the terms of the
* * GNU Lesser General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version
* OR
* * MIT License: https://github.com/ParaGroup/WindFlow/blob/master/LICENSE.MIT
*
* WindFlow is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Lesser General Public License for more details.
* You should have received a copy of the GNU Lesser General Public License and
* the MIT License along with WindFlow. If not, see <http://www.gnu.org/licenses/>
* and <http://opensource.org/licenses/MIT/>.
**************************************************************************************
*/

/*
* Data types and operator functors used by the join tests.
*/

// includes
#include<cmath>
#include<string>

using namespace std;
using namespace wf;

// Global variable for the result
atomic<long> global_sum;

// Struct of the input tuple
struct tuple_t
{
size_t key;
int64_t value;
};

#if 1
template<>
struct std::hash<tuple_t>
{
size_t operator()(const tuple_t &t) const
{
size_t h1 = std::hash<int64_t>()(t.value);
size_t h2 = std::hash<size_t>()(t.key);
return h1 ^ h2;
}
};
#endif

struct res_t
{
size_t key;
int64_t value;
size_t from;
};

// Source functor for generating positive numbers
class Source_Positive_Functor
{
private:
size_t len; // stream length per key
size_t keys; // number of keys
uint64_t next_ts; // next timestamp
bool generateWS; // true if watermarks must be generated

public:
// Constructor
Source_Positive_Functor(size_t _len,
size_t _keys,
bool _generateWS):
len(_len),
keys(_keys),
next_ts(0),
generateWS(_generateWS) {}

// operator()
void operator()(Source_Shipper<tuple_t> &shipper)
{
static thread_local std::mt19937 generator;
generator.seed(1234);
std::uniform_int_distribution<int> distribution(0, 250);
for (size_t i=1; i<=len; i++) { // generation loop
for (size_t k=1; k<=keys; k++) {
tuple_t t;
t.key = k;
t.value = i;
shipper.pushWithTimestamp(std::move(t), next_ts);
if (generateWS) {
shipper.setNextWatermark(next_ts);
}
auto offset = (distribution(generator)+1);
next_ts += offset*1000; // in ms
}
}
}
};

// Source functor for generating negative numbers
class Source_Negative_Functor
{
private:
size_t len; // stream length per key
size_t keys; // number of keys
vector<int> values; // list of values
uint64_t next_ts; // next timestamp
bool generateWS; // true if watermarks must be generated

public:
// Constructor
Source_Negative_Functor(size_t _len,
size_t _keys,
bool _generateWS):
len(_len),
keys(_keys),
values(_keys, 0),
next_ts(0),
generateWS(_generateWS) {}

// operator()
void operator()(Source_Shipper<tuple_t> &shipper)
{
static thread_local std::mt19937 generator;
generator.seed(4321);
std::uniform_int_distribution<int> distribution(0, 250);
for (size_t i=1; i<=len; i++) { // generation loop
for (size_t k=1; k<=keys; k++) {
values[k-1]--;
tuple_t t;
t.key = k;
t.value = values[k-1];
shipper.pushWithTimestamp(std::move(t), next_ts);
if (generateWS) {
shipper.setNextWatermark(next_ts);
}
auto offset = (distribution(generator)+1);
next_ts += offset*1000; // in ms
}
}
}
};

// Map functor
class Map_Functor
{
public:
// operator()
void operator()(tuple_t &t)
{
t.value = t.value + 2;
}
};

// Join functor
class Join_Functor
{
public:
// operator()
optional<tuple_t> operator()(const tuple_t &a, const tuple_t &b, RuntimeContext &rc)
{
tuple_t out;
out.value = a.value * b.value;
out.key = a.key;
return out;
}
};

// Distinct Join functor
class Distinct_Join_Functor
{
public:
// operator()
optional<tuple_t> operator()(const tuple_t &a, const tuple_t &b)
{
if (a.value != b.value) {
tuple_t out;
out.value = a.value * b.value;
out.key = a.key;
return out;
}
return {};
}
};

// Filter functor with keyby distribution
class Filter_Functor_KB
{
private:
int mod;

public:
// constructor
Filter_Functor_KB(int _mod): mod(_mod) {}

// operator()
bool operator()(tuple_t &t, RuntimeContext &rc)
{
assert(t.key % rc.getParallelism() == rc.getReplicaIndex());
if (t.value % mod == 0) {
return true;
}
else {
return false;
}
}
};

// Filter functor
class Filter_Functor
{
private:
int mod;

public:
// constructor
Filter_Functor(int _mod): mod(_mod) {}

// operator()
bool operator()(tuple_t &t)
{
if (t.value % mod == 0) {
return true;
}
else {
return false;
}
}
};

// FlatMap functor
class FlatMap_Functor
{
public:
// operator()
void operator()(const tuple_t &t, Shipper<tuple_t> &shipper)
{
for (size_t i=0; i<2; i++) {
shipper.push(t);
}
}
};

// Sink functor
class Sink_Functor
{
private:
size_t received; // counter of received results
long totalsum;

public:
// Constructor
Sink_Functor():
received(0),
totalsum(0) {}

// operator()
void operator()(optional<tuple_t> &out, RuntimeContext &rc)
{
if (out) {
received++;
totalsum += (*out).value;
size_t key = (*out).key;
int64_t value = (*out).value;
}
else {
global_sum.fetch_add(totalsum);
}
}
};
Loading