Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/clang-format-check.yml
Original file line number Diff line number Diff line change
Expand Up @@ -16,5 +16,5 @@ jobs:
- name: Run clang-format style check for C/C++/Protobuf programs.
uses: jidicula/clang-format-action@v4.10.1
with:
clang-format-version: '13'
clang-format-version: '15'
check-path: ${{ matrix.path }}
2 changes: 1 addition & 1 deletion .github/workflows/tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ jobs:
- name: Install g++
run: |
sudo apt update
sudo apt install -y gcc g++ libboost-all-dev
sudo apt install -y gcc g++ libboost-all-dev libgflags-dev
- name: Configure CMake
# Configure CMake in a 'build' subdirectory. `CMAKE_BUILD_TYPE` is only required if you are using a single-configuration generator such as make.
# See https://cmake.org/cmake/help/latest/variable/CMAKE_BUILD_TYPE.html?highlight=cmake_build_type
Expand Down
125 changes: 67 additions & 58 deletions benchmark/src/Benchmark.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,11 @@
* limitations under the License.
*/

#include <gflags/gflags.h>

#include <chrono>
#include <filesystem>

#include "Common/Context.hpp"
#include "Common/Stream.hpp"
#include "Common/Types.hpp"
Expand All @@ -23,11 +28,6 @@
#include "Utils/Flags.hpp"
#include "Utils/Logger.hpp"

#include <gflags/gflags.h>

#include <chrono>
#include <filesystem>

using namespace std;
using namespace AllianceDB;

Expand All @@ -38,7 +38,8 @@ void MetricsReport(const Param &param, const Context &ctx);
// Arguments.
void VerifyResults(const Param &param, Context &ctx, Context &ctx_v);
DEFINE_uint32(verify, 1, "Verify results");
DEFINE_uint32(algo, 1, "Join algo");//"LWJ", "HandshakeJoin", "SplitJoin", "IBWJ", "HashJoin", "SplitJoinOrigin"
DEFINE_uint32(algo, 1, "Join algo"); //"LWJ", "HandshakeJoin", "SplitJoin",
//"IBWJ", "HashJoin", "SplitJoinOrigin"
DEFINE_uint32(window_length, 500, "Window size");
DEFINE_uint32(sliding_size, 50, "Sliding length");
DEFINE_uint32(lazy, 0, "Lazy size");
Expand All @@ -51,75 +52,83 @@ DEFINE_uint32(num_threads, 2, "Number of workers");
* @brief This is the main entry point of the entire program.
* We use this as the entry point for benchmarking.
*/
int main(int argc, char **argv) {
Param param;
gflags::ParseCommandLineFlags(&argc, &argv, true);
param = GetParam(argv, param);
int main(int argc, char **argv)
{
Param param;
gflags::ParseCommandLineFlags(&argc, &argv, true);
param = GetParam(argv, param);

StreamPtr R = make_shared<Stream>(param, StreamType::R);
StreamPtr S = make_shared<Stream>(param, StreamType::S);
StreamPtr R = make_shared<Stream>(param, StreamType::R);
StreamPtr S = make_shared<Stream>(param, StreamType::S);

R->Load();
S->Load();
R->Load();
S->Load();

param.num_tuples = min(R->Tuples().size(), S->Tuples().size());
param.num_windows = (param.num_tuples - param.window_length) / param.sliding_size
+ 1;//the total number of windows depends on the sliding_size.
param.num_tuples = min(R->Tuples().size(), S->Tuples().size());
param.num_windows = (param.num_tuples - param.window_length) / param.sliding_size +
1; // the total number of windows depends on the sliding_size.

// Check if there is no remainder
bool hasNoRemainder = (param.num_tuples - param.window_length) % param.sliding_size == 0;
// Check if there is no remainder
bool hasNoRemainder = (param.num_tuples - param.window_length) % param.sliding_size == 0;

// Print the result
INFO("No remainder: %s", hasNoRemainder ? "true" : "false");
// Print the result
INFO("No remainder: %s", hasNoRemainder ? "true" : "false");

Context ctx(param, R, S);
Context ctx_v(param, R, S);
Context ctx(param, R, S);
Context ctx_v(param, R, S);

param.Print();
param.Print();

switch (param.algo) {
switch (param.algo)
{
case AlgoType::HandshakeJoin:
case AlgoType::SplitJoin:
case AlgoType::SplitJoinOrigin: {
auto engine = make_unique<EagerEngine>(param, ctx);
engine->Run(ctx);
break;
case AlgoType::SplitJoinOrigin:
{
auto engine = make_unique<EagerEngine>(param, ctx);
engine->Run(ctx);
break;
}
default:
{
FATAL("algo not supported")
}
default: {
FATAL("algo not supported")
}
}

MetricsReport(param, ctx);
VerifyResults(param, ctx, ctx_v);
MetricsReport(param, ctx);
VerifyResults(param, ctx, ctx_v);

return 0;
return 0;
}
void VerifyResults(const Param &param, Context &ctx, Context &ctx_v) {
if (param.verify) {
auto engine = make_unique<VerifyEngine>(param);
engine->Run(ctx_v);
auto rt = ctx_v.joinResults->Compare(ctx.joinResults);
INFO("Results verified to be accurate: %s", (rt == 0) ? "true" : "false")
}
void VerifyResults(const Param &param, Context &ctx, Context &ctx_v)
{
if (param.verify)
{
auto engine = make_unique<VerifyEngine>(param);
engine->Run(ctx_v);
auto rt = ctx_v.joinResults->Compare(ctx.joinResults);
INFO("Results verified to be accurate: %s", (rt == 0) ? "true" : "false")
}
}

void MetricsReport(const Param &param, const Context &ctx) {
auto duration = chrono::duration_cast<chrono::nanoseconds>(ctx.endTime - ctx.startTime);
cout << "time_ms: " << duration.count() / 1e6 << endl;
cout << "tps: " << param.num_tuples * 2 * 1e9 / duration.count() << endl;
void MetricsReport(const Param &param, const Context &ctx)
{
auto duration = chrono::duration_cast<chrono::nanoseconds>(ctx.endTime - ctx.startTime);
cout << "time_ms: " << duration.count() / 1e6 << endl;
cout << "tps: " << param.num_tuples * 2 * 1e9 / duration.count() << endl;
}

Param &GetParam(char *const *argv, Param &param) {
param.verify = FLAGS_verify;
param.algo = static_cast<AlgoType>(FLAGS_algo);
param.window_length = FLAGS_window_length;
param.sliding_size = FLAGS_sliding_size;
param.lazy = FLAGS_lazy;
param.rate = FLAGS_rate;
param.num_threads = FLAGS_num_threads;
param.r = FLAGS_r;
param.s = FLAGS_s;
param.bin_dir = filesystem::weakly_canonical(filesystem::path(argv[0])).parent_path();
return param;
Param &GetParam(char *const *argv, Param &param)
{
param.verify = FLAGS_verify;
param.algo = static_cast<AlgoType>(FLAGS_algo);
param.window_length = FLAGS_window_length;
param.sliding_size = FLAGS_sliding_size;
param.lazy = FLAGS_lazy;
param.rate = FLAGS_rate;
param.num_threads = FLAGS_num_threads;
param.r = FLAGS_r;
param.s = FLAGS_s;
param.bin_dir = filesystem::weakly_canonical(filesystem::path(argv[0])).parent_path();
return param;
}
21 changes: 12 additions & 9 deletions include/Common/Context.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,17 +22,20 @@
#include "Common/Stream.hpp"
#include "Utils/Timer.hpp"

namespace AllianceDB {
struct Context {
const Param &param;
ResultPtr joinResults;
StreamPtr streamR, streamS;
namespace AllianceDB
{
struct Context
{
const Param &param;
ResultPtr joinResults;
StreamPtr streamR, streamS;

std::chrono::high_resolution_clock::time_point startTime;
std::chrono::high_resolution_clock::time_point endTime;
std::chrono::high_resolution_clock::time_point startTime;
std::chrono::high_resolution_clock::time_point endTime;

Context(const Param &param, StreamPtr R, StreamPtr S)
: param(param), joinResults(std::make_shared<JoinResult>(param)), streamR(R), streamS(S) {}
Context(const Param &param, StreamPtr R, StreamPtr S)
: param(param), joinResults(std::make_shared<JoinResult>(param)), streamR(R), streamS(S)
{}
};

} // namespace AllianceDB
Expand Down
49 changes: 26 additions & 23 deletions include/Common/Param.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,29 +21,32 @@

#include <iostream>

namespace AllianceDB {
struct Param {
uint32_t verify = 1;
AlgoType algo = AlgoType::SplitJoin;
std::string bin_dir;
uint32 window_length;
uint32 sliding_size;
uint32 rate;
uint32 num_threads;
uint32 lazy;
uint32 num_tuples = 0;
uint32 num_windows = 0;
std::string r = "Test1-R.txt", s = "Test1-S.txt";
void Print() {
std::cout << "algo: " << algo_names[static_cast<uint32>(algo)] << std::endl;
std::cout << "num_tuples: " << num_tuples << std::endl;
std::cout << "window: " << window_length << std::endl;
std::cout << "sliding_size: " << sliding_size << std::endl;
std::cout << "rate: " << rate << std::endl;
std::cout << "num_threads: " << num_threads << std::endl;
std::cout << "r: " << r << std::endl;
std::cout << "s: " << s << std::endl;
}
namespace AllianceDB
{
struct Param
{
uint32_t verify = 1;
AlgoType algo = AlgoType::SplitJoin;
std::string bin_dir;
uint32 window_length;
uint32 sliding_size;
uint32 rate;
uint32 num_threads;
uint32 lazy;
uint32 num_tuples = 0;
uint32 num_windows = 0;
std::string r = "Test1-R.txt", s = "Test1-S.txt";
void Print()
{
std::cout << "algo: " << algo_names[static_cast<uint32>(algo)] << std::endl;
std::cout << "num_tuples: " << num_tuples << std::endl;
std::cout << "window: " << window_length << std::endl;
std::cout << "sliding_size: " << sliding_size << std::endl;
std::cout << "rate: " << rate << std::endl;
std::cout << "num_threads: " << num_threads << std::endl;
std::cout << "r: " << r << std::endl;
std::cout << "s: " << s << std::endl;
}
};

} // namespace AllianceDB
Expand Down
3 changes: 2 additions & 1 deletion include/Common/Result.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,8 @@ struct JoinResult
void Print();
bool operator==(JoinResult &rhs) const;
size_t Compare(std::shared_ptr<JoinResult> join_result);
int CompareWindow(std::vector<ResultTuple> &window_results_verify, std::vector<ResultTuple> &window_results);
int CompareWindow(std::vector<ResultTuple> &window_results_verify,
std::vector<ResultTuple> &window_results);
};

using ResultPtr = std::shared_ptr<JoinResult>;
Expand Down
1 change: 1 addition & 0 deletions include/Common/Types.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ enum class AlgoType
IBWJ = 3,
HashJoin = 4,
SplitJoinOrigin = 5,
Verify = 999,
};

constexpr std::string_view algo_names[32] = {
Expand Down
20 changes: 11 additions & 9 deletions include/Engine/EagerEngine.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,16 +26,18 @@
#include "Common/Window.h"
#include "Join/Join.hpp"

namespace AllianceDB {
class EagerEngine {
public:
EagerEngine(const Param &param, Context &ctx);
void Run(Context &ctx);
namespace AllianceDB
{
class EagerEngine
{
public:
EagerEngine(const Param &param, Context &ctx);
void Run(Context &ctx);

private:
const Param &param;
std::vector<JoinerPtr> Joiners;
JoinerPtr Joiner();
private:
const Param &param;
std::vector<JoinerPtr> Joiners;
JoinerPtr Joiner();
};

} // namespace AllianceDB
Expand Down
17 changes: 9 additions & 8 deletions include/Engine/VerifyEngine.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,18 +25,19 @@

#include <thread>

namespace AllianceDB {
namespace AllianceDB
{
typedef std::shared_ptr<class VerifyEngine> VerifyEnginePtr;

class VerifyEngine {
private:
const Param &param;
class VerifyEngine
{
private:
const Param &param;

public:
VerifyEngine(const Param &param);

void Run(Context &ctx);
public:
VerifyEngine(const Param &param);

void Run(Context &ctx);
};

} // namespace AllianceDB
Expand Down
16 changes: 9 additions & 7 deletions include/Join/Join.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,15 @@
#include "Common/Result.hpp"
#include "Common/Window.h"

namespace AllianceDB {
//Note: every joinAlgo maintains multiple JoinCores.
class JoinAlgo {
public:
virtual void Feed(TuplePtr tuple) = 0;
virtual void Wait() = 0;
virtual void Start(Context &ctx) = 0;
namespace AllianceDB
{
// Note: every joinAlgo maintains multiple JoinCores.
class JoinAlgo
{
public:
virtual void Feed(TuplePtr tuple) = 0;
virtual void Wait() = 0;
virtual void Start(Context &ctx) = 0;
};

using JoinerPtr = std::shared_ptr<JoinAlgo>;
Expand Down
Loading