From 37e734d169d3f11b0d5d832411093d49f4668d7a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Cyrille=20Ch=C3=A9p=C3=A9lov=20=28TP12=29?= Date: Fri, 4 Nov 2016 14:40:41 +0100 Subject: [PATCH 1/3] Updating to the Cascading 3.2-wip API + flink 1.1.3 --- pom.xml | 4 ++-- .../cascading/planner/FlinkFlowStepJob.java | 24 +++++++++++++++---- .../boundaryStages/BoundaryInStage.java | 4 ++-- .../boundaryStages/BoundaryOutStage.java | 2 +- .../bufferJoin/CoGroupBufferInGate.java | 8 ++----- .../coGroup/regularJoin/CoGroupInGate.java | 8 ++----- .../runtime/groupBy/GroupByInGate.java | 8 ++----- .../runtime/hashJoin/HashJoinGate.java | 4 ++-- .../runtime/hashJoin/JoinBoundaryInStage.java | 4 ++-- .../hashJoin/JoinBoundaryMapperInStage.java | 4 ++-- .../runtime/sink/SinkBoundaryInStage.java | 4 ++-- .../source/SingleOutBoundaryStage.java | 2 +- .../runtime/source/TapSourceStage.java | 2 +- .../runtime/stats/AccumulatorCache.java | 6 ++--- 14 files changed, 43 insertions(+), 41 deletions(-) diff --git a/pom.xml b/pom.xml index c720a98..f9fe421 100644 --- a/pom.xml +++ b/pom.xml @@ -52,8 +52,8 @@ limitations under the License. - 3.1.0 - 1.0.3 + 3.2.0-wip-6 + 1.1.3 1.7.7 UTF-8 diff --git a/src/main/java/com/dataartisans/flink/cascading/planner/FlinkFlowStepJob.java b/src/main/java/com/dataartisans/flink/cascading/planner/FlinkFlowStepJob.java index 2e02d7f..4d4d5ba 100644 --- a/src/main/java/com/dataartisans/flink/cascading/planner/FlinkFlowStepJob.java +++ b/src/main/java/com/dataartisans/flink/cascading/planner/FlinkFlowStepJob.java @@ -28,10 +28,11 @@ import org.apache.flink.api.common.Plan; import org.apache.flink.api.java.ExecutionEnvironment; import org.apache.flink.api.java.LocalEnvironment; -import org.apache.flink.client.program.Client; +import org.apache.flink.client.program.ClusterClient; +import org.apache.flink.client.program.OptimizerPlanEnvironment; +import org.apache.flink.client.program.StandaloneClusterClient; import org.apache.flink.client.program.ContextEnvironment; import org.apache.flink.client.program.JobWithJars; -import org.apache.flink.client.program.OptimizerPlanEnvironment; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.core.fs.Path; import org.apache.flink.optimizer.DataStatistics; @@ -68,7 +69,7 @@ public class FlinkFlowStepJob extends FlowStepJob private final Configuration currentConf; - private Client client; + private ClusterClient client; private JobID jobID; private Throwable jobException; @@ -169,7 +170,20 @@ else if (FlinkConfigConstants.EXECUTION_MODE_PIPELINED.equals(execMode)) { org.apache.flink.configuration.Configuration config = new org.apache.flink.configuration.Configuration(); config.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, localCluster.hostname()); - client = new Client(config); + final org.apache.flink.configuration.Configuration tmpConfig = localCluster.generateConfiguration(localCluster.configuration()); + + final int resourceManagerPort = tmpConfig.getInteger( + ConfigConstants.RESOURCE_MANAGER_IPC_PORT_KEY, + ConfigConstants.DEFAULT_RESOURCE_MANAGER_IPC_PORT); + + final int jobManagerPort = tmpConfig + .getInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, ConfigConstants.DEFAULT_JOB_MANAGER_IPC_PORT); + + config.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, jobManagerPort); + + flowStep.logWarn("Using local cluster at " + localCluster.hostname() + " RM port: " + resourceManagerPort + " JM port: " + jobManagerPort); + + client = new StandaloneClusterClient(config); client.setPrintStatusDuringExecution(env.getConfig().isSysoutLoggingEnabled()); } else if (isRemoteExecution()) { @@ -206,7 +220,7 @@ else if (FlinkConfigConstants.EXECUTION_MODE_PIPELINED.equals(execMode)) { final Callable callable = new Callable() { @Override public JobSubmissionResult call() throws Exception { - return client.runBlocking(jobGraph, loader); + return client.run(jobGraph, loader); } }; diff --git a/src/main/java/com/dataartisans/flink/cascading/runtime/boundaryStages/BoundaryInStage.java b/src/main/java/com/dataartisans/flink/cascading/runtime/boundaryStages/BoundaryInStage.java index 17eb818..a27822b 100644 --- a/src/main/java/com/dataartisans/flink/cascading/runtime/boundaryStages/BoundaryInStage.java +++ b/src/main/java/com/dataartisans/flink/cascading/runtime/boundaryStages/BoundaryInStage.java @@ -37,7 +37,7 @@ public BoundaryInStage(FlowProcess flowProcess, FlowElement flowElement) { } @Override - public void receive(Duct previous, Void v) { + public void receive(Duct previous, int ordinal, Void v) { throw new UnsupportedOperationException( "use run() instead" ); } @@ -80,7 +80,7 @@ public void run(Object input) throws Throwable { continue; } - next.receive( this, tupleEntry ); + next.receive( this, 0, tupleEntry ); } diff --git a/src/main/java/com/dataartisans/flink/cascading/runtime/boundaryStages/BoundaryOutStage.java b/src/main/java/com/dataartisans/flink/cascading/runtime/boundaryStages/BoundaryOutStage.java index 3ab4d0e..0b751c6 100644 --- a/src/main/java/com/dataartisans/flink/cascading/runtime/boundaryStages/BoundaryOutStage.java +++ b/src/main/java/com/dataartisans/flink/cascading/runtime/boundaryStages/BoundaryOutStage.java @@ -40,7 +40,7 @@ public void setTupleCollector(Collector tupleCollector) { } @Override - public void receive(Duct previous, TupleEntry tupleEntry) { + public void receive(Duct previous, int ordinal, TupleEntry tupleEntry) { this.tupleCollector.collect(tupleEntry.getTuple()); } diff --git a/src/main/java/com/dataartisans/flink/cascading/runtime/coGroup/bufferJoin/CoGroupBufferInGate.java b/src/main/java/com/dataartisans/flink/cascading/runtime/coGroup/bufferJoin/CoGroupBufferInGate.java index c6b3ebc..2ec8207 100644 --- a/src/main/java/com/dataartisans/flink/cascading/runtime/coGroup/bufferJoin/CoGroupBufferInGate.java +++ b/src/main/java/com/dataartisans/flink/cascading/runtime/coGroup/bufferJoin/CoGroupBufferInGate.java @@ -50,10 +50,6 @@ public void bind( StreamGraph streamGraph ) if( role != IORole.sink ) { next = getNextFor(streamGraph); } - - if( role == IORole.sink ) { - setOrdinalMap(streamGraph); - } } @@ -81,7 +77,7 @@ public void start( Duct previous ) } } - public void receive( Duct previous, TupleEntry incomingEntry ) { + public void receive( Duct previous, int ordinal, TupleEntry incomingEntry ) { throw new UnsupportedOperationException("Receive not implemented for CoGroupBufferInGate."); } @@ -107,7 +103,7 @@ public void run(Object input) { keyEntry.setTuple( this.closure.getGroupTuple(key) ); - next.receive( this, grouping ); + next.receive( this, 0, grouping ); } @Override diff --git a/src/main/java/com/dataartisans/flink/cascading/runtime/coGroup/regularJoin/CoGroupInGate.java b/src/main/java/com/dataartisans/flink/cascading/runtime/coGroup/regularJoin/CoGroupInGate.java index a7900f8..f98570a 100644 --- a/src/main/java/com/dataartisans/flink/cascading/runtime/coGroup/regularJoin/CoGroupInGate.java +++ b/src/main/java/com/dataartisans/flink/cascading/runtime/coGroup/regularJoin/CoGroupInGate.java @@ -48,10 +48,6 @@ public void bind( StreamGraph streamGraph ) if( role != IORole.sink ) { next = getNextFor(streamGraph); } - - if( role == IORole.sink ) { - setOrdinalMap(streamGraph); - } } @@ -97,7 +93,7 @@ public void start( Duct previous ) { } } - public void receive( Duct previous, TupleEntry incomingEntry ) { + public void receive( Duct previous, int ordinal, TupleEntry incomingEntry ) { throw new UnsupportedOperationException("Receive not implemented for CoGroupInGate."); } @@ -118,7 +114,7 @@ public void run(Object input) { tupleEntryIterator.reset(resultIterator); keyEntry.setTuple( this.closure.getGroupTuple(null) ); - next.receive( this, grouping ); + next.receive( this, 0, grouping ); } @Override diff --git a/src/main/java/com/dataartisans/flink/cascading/runtime/groupBy/GroupByInGate.java b/src/main/java/com/dataartisans/flink/cascading/runtime/groupBy/GroupByInGate.java index 926f35b..fa9a3fb 100644 --- a/src/main/java/com/dataartisans/flink/cascading/runtime/groupBy/GroupByInGate.java +++ b/src/main/java/com/dataartisans/flink/cascading/runtime/groupBy/GroupByInGate.java @@ -49,10 +49,6 @@ public void bind( StreamGraph streamGraph ) { if( role != IORole.sink ) { next = getNextFor(streamGraph); } - - if( role == IORole.sink ) { - setOrdinalMap(streamGraph); - } } @@ -80,7 +76,7 @@ public void start( Duct previous ) { } } - public void receive( Duct previous, TupleEntry incomingEntry ) { + public void receive( Duct previous, int ordinal, TupleEntry incomingEntry ) { throw new UnsupportedOperationException("Receive not implemented for GroupByInGate."); } @@ -110,7 +106,7 @@ public void run(Object input) { Tuple groupTuple = keyPeekingIt.peekNextKey(); keyEntry.setTuple( groupTuple ); - next.receive( this, grouping ); + next.receive( this, 0, grouping ); } diff --git a/src/main/java/com/dataartisans/flink/cascading/runtime/hashJoin/HashJoinGate.java b/src/main/java/com/dataartisans/flink/cascading/runtime/hashJoin/HashJoinGate.java index f74a5c4..8eb532b 100644 --- a/src/main/java/com/dataartisans/flink/cascading/runtime/hashJoin/HashJoinGate.java +++ b/src/main/java/com/dataartisans/flink/cascading/runtime/hashJoin/HashJoinGate.java @@ -77,14 +77,14 @@ public void prepare() { } @Override - public void receive(Duct previous, Tuple2 t) { + public void receive(Duct previous, int ordinal, Tuple2 t) { closure.reset(t); entryIterator.reset(joiner.getIterator(closure)); while(entryIterator.hasNext()) { - next.receive(this, entryIterator.next()); + next.receive(this, ordinal, entryIterator.next()); } } diff --git a/src/main/java/com/dataartisans/flink/cascading/runtime/hashJoin/JoinBoundaryInStage.java b/src/main/java/com/dataartisans/flink/cascading/runtime/hashJoin/JoinBoundaryInStage.java index 1a71a99..027cd9f 100644 --- a/src/main/java/com/dataartisans/flink/cascading/runtime/hashJoin/JoinBoundaryInStage.java +++ b/src/main/java/com/dataartisans/flink/cascading/runtime/hashJoin/JoinBoundaryInStage.java @@ -33,7 +33,7 @@ public JoinBoundaryInStage(FlowProcess flowProcess, FlowElement flowElement) { } @Override - public void receive(Duct previous, Void v) { + public void receive(Duct previous, int ordinal, Void v) { throw new UnsupportedOperationException( "use run() instead" ); } @@ -65,6 +65,6 @@ public void run(Object input) throws Throwable { flowProcess.increment( StepCounters.Tuples_Read, 1 ); flowProcess.increment( SliceCounters.Tuples_Read, 1 ); - next.receive(this, joinInputTuples); + next.receive(this, 0, joinInputTuples); } } diff --git a/src/main/java/com/dataartisans/flink/cascading/runtime/hashJoin/JoinBoundaryMapperInStage.java b/src/main/java/com/dataartisans/flink/cascading/runtime/hashJoin/JoinBoundaryMapperInStage.java index 0e8324d..3910c9e 100644 --- a/src/main/java/com/dataartisans/flink/cascading/runtime/hashJoin/JoinBoundaryMapperInStage.java +++ b/src/main/java/com/dataartisans/flink/cascading/runtime/hashJoin/JoinBoundaryMapperInStage.java @@ -37,7 +37,7 @@ public JoinBoundaryMapperInStage(FlowProcess flowProcess, FlowElement flowElemen } @Override - public void receive(Duct previous, Void v) { + public void receive(Duct previous, int ordinal, Void v) { throw new UnsupportedOperationException( "use run() instead" ); } @@ -73,7 +73,7 @@ public void run(Object input) throws Throwable { continue; } - next.receive( this, joinListTuple ); + next.receive( this, 0, joinListTuple ); } diff --git a/src/main/java/com/dataartisans/flink/cascading/runtime/sink/SinkBoundaryInStage.java b/src/main/java/com/dataartisans/flink/cascading/runtime/sink/SinkBoundaryInStage.java index 830c90d..09edb75 100644 --- a/src/main/java/com/dataartisans/flink/cascading/runtime/sink/SinkBoundaryInStage.java +++ b/src/main/java/com/dataartisans/flink/cascading/runtime/sink/SinkBoundaryInStage.java @@ -54,7 +54,7 @@ public SinkBoundaryInStage(FlowProcess flowProcess, FlowElement flowElement, Flo } @Override - public void receive(Duct previous, Void v) { + public void receive(Duct previous, int ordinal, Void v) { throw new UnsupportedOperationException( "use run() instead" ); } @@ -82,7 +82,7 @@ public void run(Object input) throws Throwable { handleException(new DuctException("internal error", throwable), null); } - next.receive( this, tupleEntry ); + next.receive( this, 0, tupleEntry ); } } diff --git a/src/main/java/com/dataartisans/flink/cascading/runtime/source/SingleOutBoundaryStage.java b/src/main/java/com/dataartisans/flink/cascading/runtime/source/SingleOutBoundaryStage.java index 6599c38..8ac5511 100644 --- a/src/main/java/com/dataartisans/flink/cascading/runtime/source/SingleOutBoundaryStage.java +++ b/src/main/java/com/dataartisans/flink/cascading/runtime/source/SingleOutBoundaryStage.java @@ -34,7 +34,7 @@ public SingleOutBoundaryStage(FlowProcess flowProcess, FlowElement flowElement) } @Override - public void receive(Duct prev, TupleEntry entry) { + public void receive(Duct prev, int ordinal, TupleEntry entry) { if(this.nextTuple == null) { this.nextTuple = entry.getTuple(); diff --git a/src/main/java/com/dataartisans/flink/cascading/runtime/source/TapSourceStage.java b/src/main/java/com/dataartisans/flink/cascading/runtime/source/TapSourceStage.java index 558a0a2..50ba205 100644 --- a/src/main/java/com/dataartisans/flink/cascading/runtime/source/TapSourceStage.java +++ b/src/main/java/com/dataartisans/flink/cascading/runtime/source/TapSourceStage.java @@ -86,7 +86,7 @@ public boolean readNextRecord() throws Throwable { continue; } - next.receive(this, tupleEntry); + next.receive(this, 0, tupleEntry); hasNext = true; break; } diff --git a/src/main/java/com/dataartisans/flink/cascading/runtime/stats/AccumulatorCache.java b/src/main/java/com/dataartisans/flink/cascading/runtime/stats/AccumulatorCache.java index 22a24f6..ccdba3e 100644 --- a/src/main/java/com/dataartisans/flink/cascading/runtime/stats/AccumulatorCache.java +++ b/src/main/java/com/dataartisans/flink/cascading/runtime/stats/AccumulatorCache.java @@ -18,7 +18,7 @@ import org.apache.flink.api.common.JobID; -import org.apache.flink.client.program.Client; +import org.apache.flink.client.program.ClusterClient; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -31,7 +31,7 @@ public class AccumulatorCache { private JobID jobID; - private Client client; + private ClusterClient client; private volatile Map currentAccumulators = Collections.emptyMap(); @@ -80,7 +80,7 @@ public void setJobID(JobID jobID) { this.jobID = jobID; } - public void setClient(Client client) { + public void setClient(ClusterClient client) { this.client = client; } From 69c65f06e495de05a0ea7bf2f62e4b78e2d0ccb7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Cyrille=20Ch=C3=A9p=C3=A9lov=20=28TP12=29?= Date: Fri, 4 Nov 2016 14:48:25 +0100 Subject: [PATCH 2/3] direct dependency to metrics-core no longer necessary (transitive from Flink) --- pom.xml | 9 +-------- 1 file changed, 1 insertion(+), 8 deletions(-) diff --git a/pom.xml b/pom.xml index f9fe421..d4ea498 100644 --- a/pom.xml +++ b/pom.xml @@ -204,14 +204,7 @@ limitations under the License. jar test - - - com.codahale.metrics - metrics-core - 3.0.2 - jar - - + org.mockito mockito-all From 47e3bec0efaf2ce329df94e5568571dd7ea708d8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Cyrille=20Ch=C3=A9p=C3=A9lov=20=28TP12=29?= Date: Fri, 4 Nov 2016 15:58:23 +0100 Subject: [PATCH 3/3] extra ctor argument to enable scalding-fabric-flink to use a non-singleton Environment\n\nor rather, it'll probably still be a singleton but explicitly --- .../dataartisans/flink/cascading/FlinkConnector.java | 11 ++++++++++- .../flink/cascading/planner/FlinkPlanner.java | 8 ++++++-- 2 files changed, 16 insertions(+), 3 deletions(-) diff --git a/src/main/java/com/dataartisans/flink/cascading/FlinkConnector.java b/src/main/java/com/dataartisans/flink/cascading/FlinkConnector.java index 92702c0..7916d1d 100644 --- a/src/main/java/com/dataartisans/flink/cascading/FlinkConnector.java +++ b/src/main/java/com/dataartisans/flink/cascading/FlinkConnector.java @@ -46,6 +46,7 @@ import com.dataartisans.flink.cascading.planner.rules.BoundaryAfterSourceTapTransformer; import com.dataartisans.flink.cascading.planner.rules.DoubleBoundaryRemovalTransformer; import com.dataartisans.flink.cascading.planner.rules.TopDownSplitBoundariesNodePartitioner; +import org.apache.flink.api.java.ExecutionEnvironment; import java.util.ArrayList; import java.util.List; @@ -56,12 +57,20 @@ public class FlinkConnector extends FlowConnector { List classPath = new ArrayList(); + private ExecutionEnvironment env; + public FlinkConnector() { this(new Properties()); } public FlinkConnector(Map properties) { + this(ExecutionEnvironment.getExecutionEnvironment(), properties); + } + + public FlinkConnector(ExecutionEnvironment env, Map properties) { + super(properties); + this.env = env; } @Override @@ -71,7 +80,7 @@ protected Class getDefaultIntermediateSchemeClass() { @Override protected FlowPlanner createFlowPlanner() { - return new FlinkPlanner(classPath); + return new FlinkPlanner(env, classPath); } @Override diff --git a/src/main/java/com/dataartisans/flink/cascading/planner/FlinkPlanner.java b/src/main/java/com/dataartisans/flink/cascading/planner/FlinkPlanner.java index 885dbe5..118fe81 100644 --- a/src/main/java/com/dataartisans/flink/cascading/planner/FlinkPlanner.java +++ b/src/main/java/com/dataartisans/flink/cascading/planner/FlinkPlanner.java @@ -47,15 +47,19 @@ public class FlinkPlanner extends FlowPlanner { private List classPath; - private ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + private ExecutionEnvironment env; - public FlinkPlanner(List classPath) { + public FlinkPlanner(List classPath) { this(ExecutionEnvironment.getExecutionEnvironment(), classPath); } + + public FlinkPlanner(ExecutionEnvironment env, List classPath) { super(); + this.env = env; this.classPath = classPath; env.getConfig().disableSysoutLogging(); if (env.getParallelism() <= 0) { // load the default parallelism from config + GlobalConfiguration.loadConfiguration(new File(CliFrontend.getConfigurationDirectoryFromEnv()).getAbsolutePath()); org.apache.flink.configuration.Configuration configuration = GlobalConfiguration.getConfiguration(); int parallelism = configuration.getInteger(ConfigConstants.DEFAULT_PARALLELISM_KEY, -1);