Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@

.ipynb_checkpoints/*
book/_build/*
book/.ipynb_checkpoints/*
.vscode
spark/setup/*
.vscode
Empty file modified bin/RClassifyTweet.sh
100644 → 100755
Empty file.
Empty file modified bin/RHello.sh
100644 → 100755
Empty file.
Empty file modified bin/RTweetConsumer.sh
100644 → 100755
Empty file.
Empty file modified bin/elasticSearch.sh
100644 → 100755
Empty file.
Empty file modified bin/flumeNetcatExample.sh
100644 → 100755
Empty file.
Empty file modified bin/flumeTwitter.sh
100644 → 100755
Empty file.
Empty file modified bin/flumeTwitterKafka.sh
100644 → 100755
Empty file.
Empty file modified bin/kafkaCcreateConsumer.sh
100644 → 100755
Empty file.
Empty file modified bin/kafkaCcreateProducer.sh
100644 → 100755
Empty file.
Empty file modified bin/kafkaCreateConsumer.sh
100644 → 100755
Empty file.
Empty file modified bin/kafkaCreateProducer.sh
100644 → 100755
Empty file.
Empty file modified bin/kafkaCreateTopic.sh
100644 → 100755
Empty file.
Empty file modified bin/kafkaPython10linesConsumer.sh
100644 → 100755
Empty file.
Empty file modified bin/kafkaPython10linesProducer.sh
100644 → 100755
Empty file.
Empty file modified bin/kafkaRunStandaloneConnectTwitterFile .sh
100644 → 100755
Empty file.
Empty file modified bin/kafkaStartServer.sh
100644 → 100755
Empty file.
Empty file modified bin/kafkaStartZk.sh
100644 → 100755
Empty file.
Empty file modified bin/kafkaWordCountConsumer.sh
100644 → 100755
Empty file.
Empty file modified bin/kafkaWordCountProducer.sh
100644 → 100755
Empty file.
Empty file modified bin/kafkaWordCountStream.sh
100644 → 100755
Empty file.
Empty file modified bin/kibana.sh
100644 → 100755
Empty file.
Empty file modified bin/metricbeat.sh
100644 → 100755
Empty file.
Empty file modified bin/prettyzoo.sh
100644 → 100755
Empty file.
Empty file modified bin/pyspark.sh
100644 → 100755
Empty file.
Empty file modified bin/scratch.sh
100644 → 100755
Empty file.
Empty file modified bin/sparkBash.sh
100644 → 100755
Empty file.
Empty file modified bin/sparkExamplePi.sh
100644 → 100755
Empty file.
Empty file modified bin/sparkNC.sh
100644 → 100755
Empty file.
Empty file modified bin/sparkShell.sh
100644 → 100755
Empty file.
Empty file modified bin/sparkSubmitApps.sh
100644 → 100755
Empty file.
Empty file modified bin/sparkSubmitPython.sh
100644 → 100755
Empty file.
Empty file modified bin/sparkTap.sh
100644 → 100755
Empty file.
Empty file modified bin/tapInit.sh
100644 → 100755
Empty file.
Empty file modified bin/webServerFlumeA.sh
100644 → 100755
Empty file.
Empty file modified bin/zkServerStart.sh
100644 → 100755
Empty file.
2 changes: 1 addition & 1 deletion book/Spark Mlib.ipynb
Original file line number Diff line number Diff line change
Expand Up @@ -1928,4 +1928,4 @@
},
"nbformat": 4,
"nbformat_minor": 4
}
}
6 changes: 3 additions & 3 deletions spark/Dockerfile
100644 → 100755
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,9 @@ ENV PATH $SPARK_DIR/bin:$PATH

ADD setup/spark-${SPARK_VERSION}-bin-hadoop2.7.tgz /opt

RUN apt-get update && apt-get -y install bash python3 python-pip netcat
RUN apt-get update && apt-get -y install bash python3 python3-pip netcat

RUN pip install pyspark kafka-utils numpy elasticsearch
RUN pip3 install pyspark kafka-utils numpy elasticsearch
# Create Sym Link
RUN ln -s /opt/spark-${SPARK_VERSION}-bin-hadoop2.7 ${SPARK_DIR}

Expand All @@ -22,4 +22,4 @@ ADD apps /opt/tap/apps
ADD spark-manager.sh $SPARK_DIR/bin/spark-manager

WORKDIR ${SPARK_DIR}
ENTRYPOINT [ "spark-manager" ]
ENTRYPOINT [ "spark-manager" ]
24 changes: 24 additions & 0 deletions spark/Dockerfile-WordCount
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
FROM openjdk:8-jre

# Spark version and install location.
ENV SPARK_VERSION=3.1.1
ENV SPARK_DIR=/opt/spark
# Extend PATH only AFTER SPARK_DIR is defined: the original file set
# "ENV PATH $SPARK_DIR/bin:$PATH" before SPARK_DIR existed, which expanded
# SPARK_DIR to an empty string, and then set it a second time.
ENV PATH $SPARK_DIR/bin:$PATH

# Dataset baked into the image; override with --build-arg TXT=path/to/file
ARG TXT=dataset/cockatiel.csv

RUN apt update && apt -y install bash python3 python3-pip

RUN pip3 install pyspark

# Unpack the Spark distribution and create a stable symlink at $SPARK_DIR.
ADD setup/spark-${SPARK_VERSION}-bin-hadoop2.7.tgz /opt
RUN ln -s /opt/spark-${SPARK_VERSION}-bin-hadoop2.7 ${SPARK_DIR}

# Dataset, application code, and the entrypoint wrapper script.
ADD ${TXT} /opt/tap/spark/dataset/
ADD code/wordcount.py /opt/tap/
ADD spark-manager-wc.sh $SPARK_DIR/bin/spark-manager-wc

WORKDIR ${SPARK_DIR}
ENTRYPOINT [ "spark-manager-wc" ]
65 changes: 65 additions & 0 deletions spark/code/wordcount.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
"""Extended word counter: tokenizes a text file with Spark and reports
word count, line count, top-10 word frequencies, and a histogram of
words by initial letter, timing each phase.

NOTE(review): Spark transformations (flatMap/map/filter) are lazy — the
timers around them measure only DAG construction, not the actual work,
which happens at the first action (count/take/histogram). The timers are
therefore placed around the *actions* below.
"""
import argparse
import string
import time

import pyspark


def _parse_args():
    """Parse command-line options for the word counter."""
    # The first positional argument of ArgumentParser is `prog`, not the
    # description — the original passed the description there, which made
    # the usage line wrong.
    parser = argparse.ArgumentParser(description="Extended Word Counter")
    parser.add_argument("-f", dest="file", required=True, help="Filepath to analyze")
    parser.add_argument("-c", type=int, dest="cores", required=True, help="Number of cores used by worker")
    parser.add_argument("-s", dest="separator", required=True, help="Separator used by tokenizer")
    return parser.parse_args()


def main():
    """Run every analysis phase and print per-phase timings."""
    args = _parse_args()

    master = "local[" + str(args.cores) + "]"
    conf = pyspark.SparkConf().setAppName("WordCounter").setMaster(master)
    print("Spark Session Active! Appname WordCount with " + str(args.cores) + (" cores used"))
    sc = pyspark.SparkContext(conf=conf)

    # Only the basename matters: the file is mounted under the container's
    # dataset directory regardless of the path the caller passed.
    relFilename = args.file.split("/")[-1]
    logfile = "/opt/tap/spark/dataset/" + relFilename
    print("Analyzing file " + relFilename)
    logData = sc.textFile(logfile).cache()
    print("RDD generated from textFile")

    # "w|" is a sentinel meaning "whitespace" (a literal space cannot be
    # passed easily through the shell wrapper).
    separator = str(args.separator)
    if separator == "w|":
        separator = " "
    if separator == " ":
        print("Separator = Whitespace")
    else:
        print("Separator: " + separator)

    # Tokenization (lazy — this only builds the plan; see module docstring).
    tok_start = time.perf_counter()
    words = logData.flatMap(lambda line: line.split(separator)).filter(lambda w: w != "")
    tok_time = time.perf_counter() - tok_start
    print("File tokenized")
    print("Time elapsed for Tokenization: " + str(tok_time * 1000) + "ms")

    # Word count — count() is an action and triggers the tokenization above.
    wc_start = time.perf_counter()
    counter = words.count()
    wc_time = time.perf_counter() - wc_start
    print("Word number computed. Word count: " + str(counter))
    print("Time elapsed for Word Count: " + str(wc_time * 1000) + "ms")

    # Line count: textFile already yields one record per line, so counting
    # the cached RDD directly is equivalent to (and cheaper than) the
    # original flatMap(split("\n")).count().
    lc_start = time.perf_counter()
    lines = logData.count()
    lc_time = time.perf_counter() - lc_start
    print("Lines number computed. Line count: " + str(lines))
    print("Time elapsed for Line Count: " + str(lc_time * 1000) + "ms")

    # Ten most frequent words.
    fr_start = time.perf_counter()
    freqs = (
        words.map(lambda w: (w, 1))
        .reduceByKey(lambda a, b: a + b)
        .sortBy(lambda pair: pair[1], False)
        .take(10)
    )
    fr_time = time.perf_counter() - fr_start
    print("Frequency of words in file computed. Ten most used words:")
    for pair in freqs:
        print(pair)
    print("Time elapsed for Frequency Count: " + str(fr_time * 1000) + "ms")

    # Histogram of words by lower-cased initial letter. histogram() does
    # not require sorted input, so the original sortBy was dropped as pure
    # overhead. NOTE(review): with 26 bucket boundaries there are 25 bins
    # ([a,b), ..., [y,z]) and initials outside a-z are silently ignored —
    # confirm this matches the intended report.
    letters = list(string.ascii_lowercase)
    hp_start = time.perf_counter()
    histo = words.map(lambda w: w[0].lower()).histogram(letters)
    hp_time = time.perf_counter() - hp_start
    print("Histogram of words by initial plotted. Histogram: ")
    print(histo)
    print("Time elapsed for Histogram Plotting: " + str(hp_time * 1000) + "ms")

    total_time = tok_time + wc_time + lc_time + fr_time + hp_time
    print("Total time elapsed from job start: " + str(total_time * 1000) + "ms")
    sc.stop()


if __name__ == "__main__":
    main()
8 changes: 8 additions & 0 deletions spark/dataset/cockatiel.csv
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
Mutation,EyeColor,MainColor,AccentColor,Sexable
Ancestral,Brown,Grey,Yellow,True
Pied,Brown,Grey,Yellow,True
Pearl,Brown,Grey,Yellow,True
Lutino,Red,Yellow,Yellow,False
WhiteFace,Brown,Grey,White,True
Cinnamon,Brown,Grey-Brown,Yellow,True

Loading