Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -168,10 +168,10 @@ WHERE ac.claim_status <> 'DENIED'
GROUP BY m.insurance_company, ac.accident_detail;
```

Finally, create a simple [dashboard in Kibana](https://www.elastic.co/guide/en/kibana/current/dashboard-create-new-dashboard.html) with a 1s refresh rate and use the (very rustic) `postgres_datagen.sql` data generator script to periodically insert some records into the Postgres source table, creating visible changes in your results:
Finally, create a simple [dashboard in Kibana](https://www.elastic.co/guide/en/kibana/7.17/dashboard.html) with a 1s refresh rate and use the (very rustic) `postgres_datagen.sql` data generator script to periodically insert some records into the Postgres source table, creating visible changes in your results:

```bash
cat ./postgres_datagen.sql | docker exec -i flink-sql-cdc_postgres_1 psql -U postgres -d postgres
cat ./postgres_datagen.sql | docker exec -i flink-sql-cdc-postgres-1 psql -U postgres -d postgres
```

![flink-sql-CDC_kibana](https://user-images.githubusercontent.com/23521087/109538607-93fbe300-7ac0-11eb-8840-2ed7b2aafa27.gif)
Expand Down
12 changes: 6 additions & 6 deletions client-image/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -16,18 +16,18 @@
# limitations under the License.
###############################################################################

FROM flink:1.12.1-scala_2.11
FROM flink:1.17

# Copy sql-client script
COPY sql-client/ /opt/sql-client
RUN mkdir -p /opt/sql-client/lib

# Download connector libraries
RUN wget -P /opt/sql-client/lib/ https://repo.maven.apache.org/maven2/org/apache/flink/flink-json/1.12.1/flink-json-1.12.1.jar; \
wget -P /opt/sql-client/lib/ https://repo.maven.apache.org/maven2/org/apache/flink/flink-sql-connector-kafka_2.11/1.12.1/flink-sql-connector-kafka_2.11-1.12.1.jar; \
wget -P /opt/sql-client/lib/ https://repo.maven.apache.org/maven2/org/apache/flink/flink-sql-connector-elasticsearch7_2.11/1.12.1/flink-sql-connector-elasticsearch7_2.11-1.12.1.jar; \
wget -P /opt/sql-client/lib/ https://repo.maven.apache.org/maven2/org/apache/flink/flink-connector-jdbc_2.11/1.12.1/flink-connector-jdbc_2.11-1.12.1.jar; \
wget -P /opt/sql-client/lib/ https://repo.maven.apache.org/maven2/org/postgresql/postgresql/42.2.14/postgresql-42.2.14.jar;
RUN wget -P /opt/sql-client/lib/ https://repo.maven.apache.org/maven2/org/apache/flink/flink-json/1.17.0/flink-json-1.17.0.jar; \
wget -P /opt/sql-client/lib/ https://repo.maven.apache.org/maven2/org/apache/flink/flink-sql-connector-kafka/3.1.0-1.17/flink-sql-connector-kafka-3.1.0-1.17.jar; \
wget -P /opt/sql-client/lib/ https://repo.maven.apache.org/maven2/org/apache/flink/flink-sql-connector-elasticsearch7/3.0.1-1.17/flink-sql-connector-elasticsearch7-3.0.1-1.17.jar; \
wget -P /opt/sql-client/lib/ https://repo.maven.apache.org/maven2/org/apache/flink/flink-connector-jdbc/3.1.2-1.17/flink-connector-jdbc-3.1.2-1.17.jar; \
wget -P /opt/sql-client/lib/ https://repo.maven.apache.org/maven2/org/postgresql/postgresql/42.7.3/postgresql-42.7.3.jar;

# Copy configuration
COPY conf/* /opt/flink/conf/
Expand Down
2 changes: 2 additions & 0 deletions client-image/conf/sql-client-init.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
SET 'execution.runtime-mode' = 'streaming';
SET 'sql-client.execution.result-mode' = 'table';
2 changes: 1 addition & 1 deletion client-image/sql-client/sql-client.sh
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
#!/bin/bash

${FLINK_HOME}/bin/sql-client.sh embedded -d ${FLINK_HOME}/conf/sql-client-conf.yaml -l ${SQL_CLIENT_HOME}/lib
${FLINK_HOME}/bin/sql-client.sh embedded -i ${FLINK_HOME}/conf/sql-client-init.sql -l ${SQL_CLIENT_HOME}/lib
16 changes: 8 additions & 8 deletions docker-compose.yml
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
version: '3'
services:
zookeeper:
image: debezium/zookeeper:1.2
image: debezium/zookeeper:2.6
ports:
- 2181:2181
- 2888:2888
- 3888:3888
kafka:
image: debezium/kafka:1.2
image: debezium/kafka:2.6
ports:
- 9092:9092
links:
Expand All @@ -25,7 +25,7 @@ services:
volumes:
- ${PWD}/postgres-image:/docker-entrypoint-initdb.d
connect:
image: debezium/connect:1.2
image: debezium/connect:2.6
ports:
- 8083:8083
links:
Expand All @@ -50,21 +50,21 @@ services:
KAFKA_BOOTSTRAP: kafka
ES_HOST: elasticsearch
jobmanager:
image: flink:1.12.1-scala_2.11
image: flink:1.17
ports:
- "8081:8081"
command: jobmanager
environment:
- JOB_MANAGER_RPC_ADDRESS=jobmanager
taskmanager:
image: flink:1.12.1-scala_2.11
image: flink:1.17
depends_on:
- jobmanager
command: taskmanager
environment:
- JOB_MANAGER_RPC_ADDRESS=jobmanager
elasticsearch:
image: docker.elastic.co/elasticsearch/elasticsearch:7.6.0
image: docker.elastic.co/elasticsearch/elasticsearch:7.17.20
environment:
- cluster.name=docker-cluster
- bootstrap.memory_lock=true
Expand All @@ -81,6 +81,6 @@ services:
soft: 65536
hard: 65536
kibana:
image: docker.elastic.co/kibana/kibana:7.6.0
image: docker.elastic.co/kibana/kibana:7.17.20
ports:
- "5601:5601"
- "5601:5601"
4 changes: 2 additions & 2 deletions postgres-image/Dockerfile
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
FROM debezium/postgres:11
FROM debezium/postgres:16

COPY postgres_bootstrap.sql /docker-entrypoint-initdb.d/
COPY postgres_bootstrap.sql /docker-entrypoint-initdb.d/
6 changes: 3 additions & 3 deletions register-postgres.json
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,10 @@
"database.user": "postgres",
"database.password": "postgres",
"database.dbname" : "postgres",
"database.server.name": "pg_claims",
"table.whitelist": "claims.accident_claims",
"topic.prefix": "pg_claims",
"table.include.list": "claims.accident_claims",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable": false,
"decimal.handling.mode": "double"
}
}
}