Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 24 additions & 0 deletions flink_bench/src/main/java/org/streambench/Bench.java
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.streambench.Transform.AlgoTradeResult;
import org.streambench.Transform.YahooInteraction;
import org.streambench.Utility.SumAggregation;
import org.streambench.Utility.ConstKeySelector;
import org.apache.flink.streaming.api.datastream.DataStream;
Expand Down Expand Up @@ -47,6 +48,23 @@ public static DataStream<Data> streamGen(long size, long period, StreamExecution
return with_timestamp;
}

/**
 * Builds a bounded stream of synthetic Yahoo-style interaction events with timestamps and
 * watermarks assigned.
 *
 * <p>Event {@code i} gets {@code start_time = i * period} and {@code end_time = (i + 1) * period}.
 * Its user id, campaign id and event type are each produced by {@code Random.nextInt(4) + 1},
 * i.e. a value in 1..4. The generator is unseeded, so the generated values differ between runs.
 *
 * @param size   number of events to generate
 * @param period distance between the start times of consecutive events
 * @param env    execution environment used to create the collection source
 * @return the generated stream, using {@code start_time} as the event timestamp
 */
public static DataStream<YahooInteraction> yahooGen(long size, long period, StreamExecutionEnvironment env) {
    ArrayList<YahooInteraction> source = new ArrayList<YahooInteraction>();
    Random rand = new Random();
    // The counter is a long to match `size`: an int counter would wrap to a negative value
    // before reaching a size above Integer.MAX_VALUE and produce negative timestamps.
    for (long i = 0; i < size; i++) {
        long userID = rand.nextInt(4) + 1;
        long campaignID = rand.nextInt(4) + 1;
        long event_type = rand.nextInt(4) + 1;
        YahooInteraction payload = new YahooInteraction(i * period, (i + 1) * period, userID, campaignID, event_type);
        source.add(payload);
    }
    DataStream<YahooInteraction> stream = env.fromCollection(source);
    // Events are generated in start_time order, hence the monotonous-timestamp strategy.
    WatermarkStrategy<YahooInteraction> wmStrategy = WatermarkStrategy.<YahooInteraction>forMonotonousTimestamps()
            .withTimestampAssigner((event, timestamp) -> event.start_time);
    DataStream<YahooInteraction> with_timestamp = stream.assignTimestampsAndWatermarks(wmStrategy);
    return with_timestamp;
}

public static void main(String[] args) throws Exception {
String benchmark = (args.length > 0) ? args[0] : "select";
long size = (args.length > 1) ? Long.parseLong(args[1]) : 10000000;
Expand Down Expand Up @@ -104,6 +122,12 @@ public Data join(Data first, Data second) {
long win_size = 10000;
DataStream<Data> result = Transform.Normalization(stream1, win_size);
break;
case "yahoo":
win_size = 10;
long event_type = 1;
DataStream<YahooInteraction> yahoo_source = yahooGen(size, period, env);
DataStream<Data> yahooResult = Transform.Yahoo(yahoo_source, win_size, event_type, period);
break;
default:
System.out.println("Unknown benchmark type");
}
Expand Down
64 changes: 64 additions & 0 deletions flink_bench/src/main/java/org/streambench/Transform.java
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,12 @@
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.streambench.Bench.Data;
import org.streambench.Utility.SmaAggregation;
import org.streambench.Utility.YahooAggregation;
import org.streambench.Utility.ZscoreAggregation;
import org.streambench.Utility.ConstKeySelector;
import org.apache.flink.api.common.functions.JoinFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.java.functions.KeySelector;

public class Transform {
Expand Down Expand Up @@ -95,4 +98,65 @@ public Data join(Data left, ZScore right) {
});
return results;
}

/**
 * One interaction event of the Yahoo benchmark: a time interval plus the user, campaign and
 * event type it refers to.
 *
 * <p>Fields are public and mutable, and a public no-argument constructor is provided, so the
 * class satisfies Flink's POJO rules instead of being handled as a generic type.
 */
public static class YahooInteraction {
    public long start_time;
    public long end_time;
    public long userID;
    public long campaignID;
    public long event_type;

    /** Creates an event with every field set to zero; needed for Flink's POJO handling. */
    public YahooInteraction() {
    }

    /**
     * Creates a fully populated event.
     *
     * @param start_time start of the event interval
     * @param end_time   end of the event interval
     * @param userID     identifier of the user
     * @param campaignID identifier of the campaign
     * @param event_type type code of the event
     */
    public YahooInteraction(long start_time, long end_time, long userID, long campaignID, long event_type) {
        this.start_time = start_time;
        this.end_time = end_time;
        this.userID = userID;
        this.campaignID = campaignID;
        this.event_type = event_type;
    }

    /** Returns all fields as space-separated {@code name: value} pairs. */
    @Override
    public String toString() {
        return "start_time: " + start_time + " end_time: " + end_time + " userID: " + userID
                + " campaignID: " + campaignID + " event_type: " + event_type;
    }
}

/**
 * Interaction event without the user id: a time interval plus campaign and event type.
 *
 * <p>Fields are public and mutable, and a public no-argument constructor is provided, so the
 * class satisfies Flink's POJO rules instead of being handled as a generic type.
 */
public static class ReducedYahooInteraction {
    public long start_time;
    public long end_time;
    public long campaignID;
    public long event_type;

    /** Creates an event with every field set to zero; needed for Flink's POJO handling. */
    public ReducedYahooInteraction() {
    }

    /**
     * Creates a fully populated event.
     *
     * @param start_time start of the event interval
     * @param end_time   end of the event interval
     * @param campaignID identifier of the campaign
     * @param event_type type code of the event
     */
    public ReducedYahooInteraction(long start_time, long end_time, long campaignID, long event_type) {
        this.start_time = start_time;
        this.end_time = end_time;
        this.campaignID = campaignID;
        this.event_type = event_type;
    }

    /** Returns all fields as space-separated {@code name: value} pairs. */
    @Override
    public String toString() {
        return "start_time: " + start_time + " end_time: " + end_time
                + " campaignID: " + campaignID + " event_type: " + event_type;
    }
}

/**
 * Yahoo benchmark pipeline: keeps the interactions of one event type, drops their user id,
 * and aggregates them per tumbling event-time window with {@code YahooAggregation}.
 *
 * @param source     stream of interaction events
 * @param win_size   window length expressed in number of periods
 * @param event_type only events whose {@code event_type} equals this value are kept
 * @param period     multiplier applied to {@code win_size} to obtain the window length
 * @return one aggregated {@code Data} record per window
 */
static DataStream<Data> Yahoo(DataStream<YahooInteraction> source, long win_size, long event_type, long period) {
    // Predicate selecting the requested event type.
    FilterFunction<YahooInteraction> matchesEventType = new FilterFunction<YahooInteraction>() {
        @Override
        public boolean filter(YahooInteraction interaction) {
            return interaction.event_type == event_type;
        }
    };

    // Projection that copies every field except the user id.
    MapFunction<YahooInteraction, ReducedYahooInteraction> dropUserId =
            new MapFunction<YahooInteraction, ReducedYahooInteraction>() {
                @Override
                public ReducedYahooInteraction map(YahooInteraction interaction) {
                    return new ReducedYahooInteraction(
                            interaction.start_time,
                            interaction.end_time,
                            interaction.campaignID,
                            interaction.event_type);
                }
            };

    long windowMillis = win_size * period;

    return source
            .filter(matchesEventType)
            .map(dropUserId)
            .windowAll(TumblingEventTimeWindows.of(Time.milliseconds(windowMillis)))
            .aggregate(new YahooAggregation());
}
}

30 changes: 30 additions & 0 deletions flink_bench/src/main/java/org/streambench/Utility.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.streambench.Bench.Data;
import org.streambench.Transform.ReducedYahooInteraction;
import org.streambench.Transform.ZScore;

public class Utility {
Expand Down Expand Up @@ -146,4 +147,33 @@ public Tuple3<Data, Float, Long> merge(Tuple3<Data, Float, Long> a, Tuple3<Data,
return a;
}
}

/**
 * Folds the {@link ReducedYahooInteraction} events of a window into a single {@code Data}
 * record holding the smallest start time, the largest end time, and a payload that is
 * incremented by one for every event added.
 */
public static class YahooAggregation implements AggregateFunction<ReducedYahooInteraction, Data, Data> {
    /**
     * Creates an empty accumulator.
     *
     * @return a record whose start time is the largest long, with end time and payload zero
     */
    @Override
    public Data createAccumulator() {
        // Timestamps are longs, so the seed for the running minimum must be Long.MAX_VALUE.
        // An Integer.MAX_VALUE seed would win the Math.min for every event starting after
        // 2^31 - 1 and report a wrong window start.
        return new Data(Long.MAX_VALUE, 0, 0);
    }

    /**
     * Adds one event to the accumulator.
     *
     * @param value       event to fold in
     * @param accumulator running result, updated in place
     * @return the same accumulator instance
     */
    @Override
    public Data add(ReducedYahooInteraction value, Data accumulator) {
        accumulator.start_time = Math.min(accumulator.start_time, value.start_time);
        accumulator.end_time = Math.max(accumulator.end_time, value.end_time);
        accumulator.payload += 1;
        return accumulator;
    }

    /**
     * Produces the result for the window.
     *
     * @param accumulator running result
     * @return a new record copying the accumulator's start time, end time and payload
     */
    @Override
    public Data getResult(Data accumulator) {
        return new Data(accumulator.start_time, accumulator.end_time, accumulator.payload);
    }

    /**
     * Merges two partial accumulators.
     *
     * @param a first accumulator, updated in place
     * @param b second accumulator
     * @return {@code a} with the combined time range and the summed payload
     */
    @Override
    public Data merge(Data a, Data b) {
        a.start_time = Math.min(a.start_time, b.start_time);
        a.end_time = Math.max(a.end_time, b.end_time);
        a.payload += b.payload;
        return a;
    }
}

}