From b2e5f60fa884f4640ecca28e0b1e0c9030e7985c Mon Sep 17 00:00:00 2001 From: TendTo Date: Wed, 19 May 2021 16:59:22 +0200 Subject: [PATCH] change: double stream from logstash and kafka to elasticsearch --- logstash_to_kafka/pipeline/logstash.conf | 6 ++++++ spark/code/twitter_stream_elastic.py | 4 +++- 2 files changed, 9 insertions(+), 1 deletion(-) diff --git a/logstash_to_kafka/pipeline/logstash.conf b/logstash_to_kafka/pipeline/logstash.conf index a7515b3..12ac828 100644 --- a/logstash_to_kafka/pipeline/logstash.conf +++ b/logstash_to_kafka/pipeline/logstash.conf @@ -18,4 +18,10 @@ output { topic_id => "tweet" bootstrap_servers => "kafkaserver:9092" } + elasticsearch { + hosts => ["elasticsearch:9200"] + index => "taptweet" + document_id => "id_str" + doc_as_upsert => true + } } \ No newline at end of file diff --git a/spark/code/twitter_stream_elastic.py b/spark/code/twitter_stream_elastic.py index 7620fbb..b744705 100644 --- a/spark/code/twitter_stream_elastic.py +++ b/spark/code/twitter_stream_elastic.py @@ -68,7 +68,9 @@ ]) sparkConf = SparkConf().set("es.nodes", "elasticsearch") \ - .set("es.port", "9200") + .set("es.port", "9200") \ + .set("es.mapping.id", "id_str") \ + .set("es.write.operation", "upsert") sc = SparkContext(appName="TapSentiment", conf=sparkConf) spark = SparkSession(sc)