From 544f5ce9d6721fd03bf349334db7bd6cd291a7b5 Mon Sep 17 00:00:00 2001 From: yudi0201 Date: Sun, 12 Jun 2022 14:04:08 -0400 Subject: [PATCH] add yahoo streaming benchmark in Flink --- .../src/main/java/org/streambench/Bench.java | 24 +++++++ .../main/java/org/streambench/Transform.java | 64 +++++++++++++++++++ .../main/java/org/streambench/Utility.java | 30 +++++++++ 3 files changed, 118 insertions(+) diff --git a/flink_bench/src/main/java/org/streambench/Bench.java b/flink_bench/src/main/java/org/streambench/Bench.java index 4d7ee65..7227e99 100644 --- a/flink_bench/src/main/java/org/streambench/Bench.java +++ b/flink_bench/src/main/java/org/streambench/Bench.java @@ -4,6 +4,7 @@ import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows; import org.apache.flink.streaming.api.windowing.time.Time; import org.streambench.Transform.AlgoTradeResult; +import org.streambench.Transform.YahooInteraction; import org.streambench.Utility.SumAggregation; import org.streambench.Utility.ConstKeySelector; import org.apache.flink.streaming.api.datastream.DataStream; @@ -47,6 +48,23 @@ public static DataStream streamGen(long size, long period, StreamExecution return with_timestamp; } + public static DataStream yahooGen(long size, long period, StreamExecutionEnvironment env) { + ArrayList source = new ArrayList(); + Random rand = new Random(); + for (int i = 0; i < size; i++) { + long userID = rand.nextInt(4) + 1; + long campaignID = rand.nextInt(4) + 1; + long event_type = rand.nextInt(4) + 1; + YahooInteraction payload = new YahooInteraction(i * period, (i + 1) * period, userID, campaignID, event_type); + source.add(payload); + } + DataStream stream = env.fromCollection(source); + WatermarkStrategy wmStrategy = WatermarkStrategy.forMonotonousTimestamps() + .withTimestampAssigner((event, timestamp) -> event.start_time); + DataStream with_timestamp = stream.assignTimestampsAndWatermarks(wmStrategy); + return with_timestamp; + } + public static void main(String[] args) throws Exception { String benchmark = (args.length > 0) ? args[0] : "select"; long size = (args.length > 1) ? Long.parseLong(args[1]) : 10000000; @@ -104,6 +122,12 @@ public Data join(Data first, Data second) { long win_size = 10000; DataStream result = Transform.Normalization(stream1, win_size); break; + case "yahoo": + win_size = 10; + long event_type = 1; + DataStream yahoo_source = yahooGen(size, period, env); + DataStream yahooResult = Transform.Yahoo(yahoo_source, win_size, event_type, period); + break; default: System.out.println("Unknown benchmark type"); } diff --git a/flink_bench/src/main/java/org/streambench/Transform.java b/flink_bench/src/main/java/org/streambench/Transform.java index 4dc41a2..b260b08 100644 --- a/flink_bench/src/main/java/org/streambench/Transform.java +++ b/flink_bench/src/main/java/org/streambench/Transform.java @@ -6,9 +6,12 @@ import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows; import org.streambench.Bench.Data; import org.streambench.Utility.SmaAggregation; +import org.streambench.Utility.YahooAggregation; import org.streambench.Utility.ZscoreAggregation; import org.streambench.Utility.ConstKeySelector; import org.apache.flink.api.common.functions.JoinFunction; +import org.apache.flink.api.common.functions.MapFunction; +import org.apache.flink.api.common.functions.FilterFunction; import org.apache.flink.api.java.functions.KeySelector; public class Transform { @@ -95,4 +98,65 @@ public Data join(Data left, ZScore right) { }); return results; } + + public static class YahooInteraction { + public long start_time; + public long end_time; + public long userID; + public long campaignID; + public long event_type; + + public YahooInteraction(long start_time, long end_time, long userID, long campaignID, long event_type) { + this.start_time = start_time; + this.end_time = end_time; + this.userID = userID; + this.campaignID = campaignID; + this.event_type = event_type; + } + + public String toString() { + return "start_time: " + String.valueOf(start_time) + " end_time: " + String.valueOf(end_time) + " userID: " + + String.valueOf(userID) + " campaignID: " + String.valueOf(campaignID) + " event_type: " + + String.valueOf(event_type); + } + } + + public static class ReducedYahooInteraction { + public long start_time; + public long end_time; + public long campaignID; + public long event_type; + + public ReducedYahooInteraction(long start_time, long end_time, long campaignID, long event_type) { + this.start_time = start_time; + this.end_time = end_time; + this.campaignID = campaignID; + this.event_type = event_type; + } + + public String toString() { + return "start_time: " + String.valueOf(start_time) + " end_time: " + String.valueOf(end_time) + + " campaignID: " + String.valueOf(campaignID) + " event_type: " + String.valueOf(event_type); + } + } + + static DataStream Yahoo(DataStream source, long win_size, long event_type, long period) { + DataStream result = source + .filter(new FilterFunction() { + public boolean filter(YahooInteraction data) { + return data.event_type == event_type; + } + }) + .map(new MapFunction() { + public ReducedYahooInteraction map(YahooInteraction data) { + return new ReducedYahooInteraction(data.start_time, data.end_time, + data.campaignID, data.event_type); + } + }) + .windowAll(TumblingEventTimeWindows.of(Time.milliseconds(win_size * period))) + .aggregate(new YahooAggregation()); + + return result; + } } + diff --git a/flink_bench/src/main/java/org/streambench/Utility.java b/flink_bench/src/main/java/org/streambench/Utility.java index c5bbe98..9a5ee42 100644 --- a/flink_bench/src/main/java/org/streambench/Utility.java +++ b/flink_bench/src/main/java/org/streambench/Utility.java @@ -5,6 +5,7 @@ import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.tuple.Tuple3; import org.streambench.Bench.Data; +import org.streambench.Transform.ReducedYahooInteraction; import org.streambench.Transform.ZScore; public class Utility { @@ -146,4 +147,33 @@ public Tuple3 merge(Tuple3 a, Tuple3 { + @Override + public Data createAccumulator() { + return new Data((long) Integer.MAX_VALUE, 0, 0); + } + + @Override + public Data add(ReducedYahooInteraction value, Data accumulator) { + accumulator.start_time = Math.min(accumulator.start_time, value.start_time); + accumulator.end_time = Math.max(accumulator.end_time, value.end_time); + accumulator.payload += 1; + return accumulator; + } + + @Override + public Data getResult(Data accumulator) { + return new Data(accumulator.start_time, accumulator.end_time, accumulator.payload); + } + + @Override + public Data merge(Data a, Data b) { + a.start_time = Math.min(a.start_time, b.start_time); + a.end_time = Math.max(a.end_time, b.end_time); + a.payload += b.payload; + return a; + } + } + }