From 35db2cc5608673c61aa267e6a350797d37cbd179 Mon Sep 17 00:00:00 2001 From: Behrouz Derakhshan Date: Fri, 14 Aug 2015 10:59:06 +0200 Subject: [PATCH 1/8] replace reduceGroup with sum aggregator --- .../exercises/dataSetScala/mailCount/MailCount.scala | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/flink-exercises/src/main/scala/com/dataArtisans/flinkTraining/exercises/dataSetScala/mailCount/MailCount.scala b/flink-exercises/src/main/scala/com/dataArtisans/flinkTraining/exercises/dataSetScala/mailCount/MailCount.scala index 785499d..ef566b4 100644 --- a/flink-exercises/src/main/scala/com/dataArtisans/flinkTraining/exercises/dataSetScala/mailCount/MailCount.scala +++ b/flink-exercises/src/main/scala/com/dataArtisans/flinkTraining/exercises/dataSetScala/mailCount/MailCount.scala @@ -53,9 +53,11 @@ object MailCount { // extract month from time string m._1.substring(0, 7), // extract email address from sender - m._2.substring(m._2.lastIndexOf("<") + 1, m._2.length - 1) ) } + m._2.substring(m._2.lastIndexOf("<") + 1, m._2.length - 1), + // add counter to each record + 1) } // group by month and sender and count the number of records per group - .groupBy(0, 1).reduceGroup { ms => ms.foldLeft(("","",0))( (c, m) => (m._1, m._2, c._3+1)) } + .groupBy(0, 1).sum(2) // print the result .print From f184bdeacf13711842a6ea7ed76e85c8894972d8 Mon Sep 17 00:00:00 2001 From: Behrouz Derakhshan Date: Fri, 14 Aug 2015 11:20:11 +0200 Subject: [PATCH 2/8] replace ReduceGroup with sum aggregator --- .../exercises/dataSetScala/replyGraph/ReplyGraph.scala | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/flink-exercises/src/main/scala/com/dataArtisans/flinkTraining/exercises/dataSetScala/replyGraph/ReplyGraph.scala b/flink-exercises/src/main/scala/com/dataArtisans/flinkTraining/exercises/dataSetScala/replyGraph/ReplyGraph.scala index 7323211..43d1498 100644 --- a/flink-exercises/src/main/scala/com/dataArtisans/flinkTraining/exercises/dataSetScala/replyGraph/ReplyGraph.scala +++ b/flink-exercises/src/main/scala/com/dataArtisans/flinkTraining/exercises/dataSetScala/replyGraph/ReplyGraph.scala @@ -34,8 +34,8 @@ object ReplyGraph { def main(args: Array[String]) { // parse parameters - val params = ParameterTool.fromArgs(args); - val input = params.getRequired("input"); + val params = ParameterTool.fromArgs(args) + val input = params.getRequired("input") // set up the execution environment val env = ExecutionEnvironment.getExecutionEnvironment @@ -59,11 +59,11 @@ object ReplyGraph { // compute reply connections by joining on messageId and reply-to val replyConnections = addressMails - .join(addressMails).where(2).equalTo(0) { (l,r) => (l._2, r._2) } + .join(addressMails).where(2).equalTo(0) { (l,r) => (l._2, r._2, 1) } // count connections for each pair of addresses replyConnections - .groupBy(0,1).reduceGroup( cs => cs.foldLeft(("","",0))( (l,r) => (r._1, r._2, l._3+1) ) ) + .groupBy(0,1).sum(2) .print } From e8a42f2a23312d940672978ad1ef92bbc17c4cdd Mon Sep 17 00:00:00 2001 From: Behrouz Derakhshan Date: Tue, 18 Aug 2015 12:15:48 +0200 Subject: [PATCH 3/8] reduce the code size for tfidf, and modify it not to use mutable objects --- .../dataSetScala/tfIdf/MailTFIDF.scala | 157 ++++++------------ 1 file changed, 47 insertions(+), 110 deletions(-) diff --git a/flink-exercises/src/main/scala/com/dataArtisans/flinkTraining/exercises/dataSetScala/tfIdf/MailTFIDF.scala b/flink-exercises/src/main/scala/com/dataArtisans/flinkTraining/exercises/dataSetScala/tfIdf/MailTFIDF.scala index d51c199..db25d33 100644 --- a/flink-exercises/src/main/scala/com/dataArtisans/flinkTraining/exercises/dataSetScala/tfIdf/MailTFIDF.scala +++ b/flink-exercises/src/main/scala/com/dataArtisans/flinkTraining/exercises/dataSetScala/tfIdf/MailTFIDF.scala @@ -18,17 +18,12 @@ package com.dataArtisans.flinkTraining.exercises.dataSetScala.tfIdf -import java.util.StringTokenizer -import java.util.regex.Pattern - import com.dataArtisans.flinkTraining.dataSetPreparation.MBoxParser -import org.apache.flink.api.common.functions.{FlatMapFunction} +import org.apache.flink.api.common.functions.FlatMapFunction import org.apache.flink.api.java.utils.ParameterTool import org.apache.flink.api.scala._ import org.apache.flink.util.Collector -import scala.collection.mutable.{HashMap, HashSet} - /** * Scala reference implementation for the "TF-IDF" exercise of the Flink training. * The task of the exercise is to compute the TF-IDF score for words in mails of the @@ -40,137 +35,79 @@ import scala.collection.mutable.{HashMap, HashSet} */ object MailTFIDF { - val STOP_WORDS: Array[String] = Array ( + val STOP_WORDS = List( "the", "i", "a", "an", "at", "are", "am", "for", "and", "or", "is", "there", "it", "this", "that", "on", "was", "by", "of", "to", "in", "to", "message", "not", "be", "with", "you", "have", "as", "can") + val WORD_PATTERN = "(\\p{Alpha})+".r + def main(args: Array[String]) { - // parse paramters - val params = ParameterTool.fromArgs(args); - val input = params.getRequired("input"); + // parse parameters + val params = ParameterTool.fromArgs(args) + val input = params.getRequired("input") // set up the execution environment val env = ExecutionEnvironment.getExecutionEnvironment // read messageId and body field of the input data val mails = env.readCsvFile[(String, String)]( - "/users/fhueske/data/flinkdevlistparsed/", + input, lineDelimiter = MBoxParser.MAIL_RECORD_DELIM, fieldDelimiter = MBoxParser.MAIL_FIELD_DELIM, includedFields = Array(0,4) ) // count mails in data set - val mailCnt = mails.count + val mailCnt = mails.count() // compute term-frequency (TF) - val tf = mails - .flatMap(new TFComputer(STOP_WORDS)) + val tf = mails.flatMap { + new FlatMapFunction[(String, String), (String, String, Int)] { + + def flatMap(mail: (String, String), out: Collector[(String, String, Int)]): Unit = { + // extract email id + val id = mail._1 + val output = mail._2.toLowerCase + // split the body + .split(Array(' ', '\t', '\n', '\r', '\f')) + // filter out stop words and non-words + .filter(w => !STOP_WORDS.contains(w) && WORD_PATTERN.findFirstIn(w).isDefined) + // count the number of occurrences of a word in each document + .map(m => (m, 1)).groupBy(_._1).map { + case (item, count) => (item, count.foldLeft(0)(_ + _._2)) + } + // use the same mail id for each word in the body + output.foreach(m => out.collect(id, m._1, m._2)) + } + + } + } // compute document frequency (number of mails that contain a word at least once) - val df = mails - // extract unique words from mails - .flatMap(new UniqueWordExtractor(STOP_WORDS)) - // count number of mails for each word - .groupBy(0).reduce { (l,r) => (l._1, l._2 + r._2) } + // we can reuse the tf data set, since it already contains document <-> word association + val df = tf + // add a counter + .map(m => (m._1, m._2, 1)) + // group by the words + .groupBy(1) + // count the number of documents in each group (df) + .sum(2) // compute TF-IDF score from TF, DF, and total number of mails - val tfidf = tf.join(df).where(1).equalTo(0) - { (l, r) => (l._1, l._2, l._3 * (mailCnt.toDouble / r._2) ) } - + val tfidf = tf + .join(df) + .where(0, 1) + .equalTo(0, 1) { + (l, r) => (l._1, l._2, l._3 * (mailCnt.toDouble / r._3)) + } + // print the result - tfidf - .print - + tfidf.writeAsText("mymethod") + env.execute() } - /** - * Computes the frequency of each word in a mail. - * Words consist only of alphabetical characters. Frequent words (stop words) are filtered out. - * - * @param stopWordsA Array of words that are filtered out. - */ - class TFComputer(stopWordsA: Array[String]) - extends FlatMapFunction[(String, String), (String, String, Int)] { - - val stopWords: HashSet[String] = new HashSet[String] - val wordCounts: HashMap[String, Int] = new HashMap[String, Int] - // initialize word pattern match for sequences of alphabetical characters - val wordPattern: Pattern = Pattern.compile("(\\p{Alpha})+") - - // initialize set of stop words - for(sw <- stopWordsA) { - this.stopWords.add(sw) - } - - override def flatMap(t: (String, String), out: Collector[(String, String, Int)]): Unit = { - // clear word counts - wordCounts.clear - - // split mail along whitespaces - val tokens = new StringTokenizer(t._2) - // for each word candidate - while (tokens.hasMoreTokens) { - // normalize word to lower case - val word = tokens.nextToken.toLowerCase - if (!stopWords.contains(word) && wordPattern.matcher(word).matches) { - // word candidate is not a stop word and matches the word pattern - // increase word count - val cnt = wordCounts.getOrElse(word, 0) - wordCounts.put(word, cnt+1) - } - } - // emit all word counts per document and word - for (wc <- wordCounts.iterator) { - out.collect( (t._1, wc._1, wc._2) ) - } - } - } - - /** - * Extracts the unique words in a mail. - * Words consist only of alphabetical characters. Frequent words (stop words) are filtered out. - * - * @param stopWordsA Array of words that are filtered out. - */ - class UniqueWordExtractor(stopWordsA: Array[String]) - extends FlatMapFunction[(String, String), (String, Int) ] { - - val stopWords: HashSet[String] = new HashSet[String] - val uniqueWords: HashSet[String] = new HashSet[String] - // initalize pattern to match words - val wordPattern: Pattern = Pattern.compile("(\\p{Alpha})+") - - // initialize set of stop words - for(sw <- stopWordsA) { - this.stopWords.add(sw) - } - - override def flatMap(t: (String, String), out: Collector[(String, Int)]): Unit = { - // clear unique words - uniqueWords.clear() - - // split mail along whitespaces - val tokens = new StringTokenizer(t._2) - // for each word candidate - while(tokens.hasMoreTokens) { - // normalize word to lower case - val word = tokens.nextToken.toLowerCase - if (!stopWords.contains(word) && wordPattern.matcher(word).matches) { - // word candiate is not a stop word and matches the word pattern - uniqueWords.add(word) - } - } - - // emit all words that occurred at least once - for(w <- uniqueWords) { - out.collect( (w, 1) ) - } - } - } - } From 97ef9aafdd6db45c55b299d4ec6221639721a755 Mon Sep 17 00:00:00 2001 From: Behrouz Derakhshan Date: Tue, 18 Aug 2015 13:43:50 +0200 Subject: [PATCH 4/8] use Java Pattern instead of scala regex, fix the a bug in calculation of tfidf score --- .../dataSetScala/tfIdf/MailTFIDF.scala | 26 ++++++++++++------- 1 file changed, 16 insertions(+), 10 deletions(-) diff --git a/flink-exercises/src/main/scala/com/dataArtisans/flinkTraining/exercises/dataSetScala/tfIdf/MailTFIDF.scala b/flink-exercises/src/main/scala/com/dataArtisans/flinkTraining/exercises/dataSetScala/tfIdf/MailTFIDF.scala index db25d33..036b83c 100644 --- a/flink-exercises/src/main/scala/com/dataArtisans/flinkTraining/exercises/dataSetScala/tfIdf/MailTFIDF.scala +++ b/flink-exercises/src/main/scala/com/dataArtisans/flinkTraining/exercises/dataSetScala/tfIdf/MailTFIDF.scala @@ -18,6 +18,8 @@ package com.dataArtisans.flinkTraining.exercises.dataSetScala.tfIdf +import java.util.regex.Pattern + import com.dataArtisans.flinkTraining.dataSetPreparation.MBoxParser import org.apache.flink.api.common.functions.FlatMapFunction import org.apache.flink.api.java.utils.ParameterTool @@ -40,7 +42,7 @@ object MailTFIDF { "that", "on", "was", "by", "of", "to", "in", "to", "message", "not", "be", "with", "you", "have", "as", "can") - val WORD_PATTERN = "(\\p{Alpha})+".r + val WORD_PATTERN: Pattern = Pattern.compile("(\\p{Alpha})+") def main(args: Array[String]) { @@ -73,7 +75,7 @@ object MailTFIDF { // split the body .split(Array(' ', '\t', '\n', '\r', '\f')) // filter out stop words and non-words - .filter(w => !STOP_WORDS.contains(w) && WORD_PATTERN.findFirstIn(w).isDefined) + .filter(w => !STOP_WORDS.contains(w) && WORD_PATTERN.matcher(w).matches()) // count the number of occurrences of a word in each document .map(m => (m, 1)).groupBy(_._1).map { case (item, count) => (item, count.foldLeft(0)(_ + _._2)) @@ -88,24 +90,28 @@ object MailTFIDF { // compute document frequency (number of mails that contain a word at least once) // we can reuse the tf data set, since it already contains document <-> word association val df = tf + // find unique document/word combination + .distinct(0,1) // add a counter - .map(m => (m._1, m._2, 1)) + .map(m => (m._2, 1)) // group by the words - .groupBy(1) + .groupBy(0) // count the number of documents in each group (df) - .sum(2) + .sum(1) // compute TF-IDF score from TF, DF, and total number of mails val tfidf = tf .join(df) - .where(0, 1) - .equalTo(0, 1) { - (l, r) => (l._1, l._2, l._3 * (mailCnt.toDouble / r._3)) + // where "word" from tf + .where(1) + // is equal "word" from df + .equalTo(0) { + (l, r) => (l._1, l._2, l._3 * (mailCnt.toDouble / r._2)) } // print the result - tfidf.writeAsText("mymethod") - env.execute() + tfidf.print() + } } From 6e47fcbf4fd109806a09f94faf08ed5ca1c1167c Mon Sep 17 00:00:00 2001 From: Behrouz Derakhshan Date: Tue, 18 Aug 2015 14:01:56 +0200 Subject: [PATCH 5/8] move stop words and wordPattern objects inside the FlatMap function --- .../dataSetScala/tfIdf/MailTFIDF.scala | 21 ++++++++++--------- 1 file changed, 11 insertions(+), 10 deletions(-) diff --git a/flink-exercises/src/main/scala/com/dataArtisans/flinkTraining/exercises/dataSetScala/tfIdf/MailTFIDF.scala b/flink-exercises/src/main/scala/com/dataArtisans/flinkTraining/exercises/dataSetScala/tfIdf/MailTFIDF.scala index 036b83c..2dd96c0 100644 --- a/flink-exercises/src/main/scala/com/dataArtisans/flinkTraining/exercises/dataSetScala/tfIdf/MailTFIDF.scala +++ b/flink-exercises/src/main/scala/com/dataArtisans/flinkTraining/exercises/dataSetScala/tfIdf/MailTFIDF.scala @@ -37,13 +37,6 @@ import org.apache.flink.util.Collector */ object MailTFIDF { - val STOP_WORDS = List( - "the", "i", "a", "an", "at", "are", "am", "for", "and", "or", "is", "there", "it", "this", - "that", "on", "was", "by", "of", "to", "in", "to", "message", "not", "be", "with", "you", - "have", "as", "can") - - val WORD_PATTERN: Pattern = Pattern.compile("(\\p{Alpha})+") - def main(args: Array[String]) { // parse parameters @@ -67,6 +60,14 @@ object MailTFIDF { // compute term-frequency (TF) val tf = mails.flatMap { new FlatMapFunction[(String, String), (String, String, Int)] { + // stop words to be filtered out + val stopWords = List( + "the", "i", "a", "an", "at", "are", "am", "for", "and", "or", "is", "there", "it", "this", + "that", "on", "was", "by", "of", "to", "in", "to", "message", "not", "be", "with", "you", + "have", "as", "can") + + // pattern for recognizing acceptable 'word' + val wordPattern: Pattern = Pattern.compile("(\\p{Alpha})+") def flatMap(mail: (String, String), out: Collector[(String, String, Int)]): Unit = { // extract email id @@ -75,7 +76,7 @@ object MailTFIDF { // split the body .split(Array(' ', '\t', '\n', '\r', '\f')) // filter out stop words and non-words - .filter(w => !STOP_WORDS.contains(w) && WORD_PATTERN.matcher(w).matches()) + .filter(w => !stopWords.contains(w) && wordPattern.matcher(w).matches()) // count the number of occurrences of a word in each document .map(m => (m, 1)).groupBy(_._1).map { case (item, count) => (item, count.foldLeft(0)(_ + _._2)) @@ -86,7 +87,6 @@ object MailTFIDF { } } - // compute document frequency (number of mails that contain a word at least once) // we can reuse the tf data set, since it already contains document <-> word association val df = tf @@ -110,7 +110,8 @@ object MailTFIDF { } // print the result - tfidf.print() + tfidf + .print() } From 525253b84e1b42606f68ec018d0d0b9f2b28f02e Mon Sep 17 00:00:00 2001 From: Behrouz Derakhshan Date: Tue, 18 Aug 2015 14:44:38 +0200 Subject: [PATCH 6/8] fix a bug which reported wrong df where document ids were not unique --- .../dataSetScala/tfIdf/MailTFIDF.scala | 34 +++++++++++++++---- 1 file changed, 27 insertions(+), 7 deletions(-) diff --git a/flink-exercises/src/main/scala/com/dataArtisans/flinkTraining/exercises/dataSetScala/tfIdf/MailTFIDF.scala b/flink-exercises/src/main/scala/com/dataArtisans/flinkTraining/exercises/dataSetScala/tfIdf/MailTFIDF.scala index 2dd96c0..f081662 100644 --- a/flink-exercises/src/main/scala/com/dataArtisans/flinkTraining/exercises/dataSetScala/tfIdf/MailTFIDF.scala +++ b/flink-exercises/src/main/scala/com/dataArtisans/flinkTraining/exercises/dataSetScala/tfIdf/MailTFIDF.scala @@ -89,16 +89,37 @@ object MailTFIDF { } // compute document frequency (number of mails that contain a word at least once) // we can reuse the tf data set, since it already contains document <-> word association - val df = tf - // find unique document/word combination - .distinct(0,1) - // add a counter - .map(m => (m._2, 1)) + val df = mails.flatMap{ + new FlatMapFunction[(String, String), (String, Int)] { + // stop words to be filtered out + val stopWords = List( + "the", "i", "a", "an", "at", "are", "am", "for", "and", "or", "is", "there", "it", "this", + "that", "on", "was", "by", "of", "to", "in", "to", "message", "not", "be", "with", "you", + "have", "as", "can") + + // pattern for recognizing acceptable 'word' + val wordPattern: Pattern = Pattern.compile("(\\p{Alpha})+") + + def flatMap(mail: (String, String), out: Collector[(String, Int)]): Unit = { + val output = mail._2.toLowerCase + // split the body + .split(Array(' ', '\t', '\n', '\r', '\f')) + // filter out stop words and non-words + .filter(w => !stopWords.contains(w) && wordPattern.matcher(w).matches()) + // count the number of occurrences of a word in each document + .distinct + // use the same mail id for each word in the body + output.foreach(m => out.collect(m, 1)) + } + + } + + } // group by the words .groupBy(0) // count the number of documents in each group (df) .sum(1) - + // compute TF-IDF score from TF, DF, and total number of mails val tfidf = tf .join(df) @@ -114,7 +135,6 @@ object MailTFIDF { .print() } - } From acbcfb28d1a4016f2a9f45ad0ab9d998e18f63b9 Mon Sep 17 00:00:00 2001 From: Behrouz Derakhshan Date: Tue, 18 Aug 2015 14:57:03 +0200 Subject: [PATCH 7/8] moved the FlatMaps into their own classes --- .../dataSetScala/tfIdf/MailTFIDF.scala | 110 +++++++++--------- 1 file changed, 52 insertions(+), 58 deletions(-) diff --git a/flink-exercises/src/main/scala/com/dataArtisans/flinkTraining/exercises/dataSetScala/tfIdf/MailTFIDF.scala b/flink-exercises/src/main/scala/com/dataArtisans/flinkTraining/exercises/dataSetScala/tfIdf/MailTFIDF.scala index f081662..de9767b 100644 --- a/flink-exercises/src/main/scala/com/dataArtisans/flinkTraining/exercises/dataSetScala/tfIdf/MailTFIDF.scala +++ b/flink-exercises/src/main/scala/com/dataArtisans/flinkTraining/exercises/dataSetScala/tfIdf/MailTFIDF.scala @@ -43,6 +43,14 @@ object MailTFIDF { val params = ParameterTool.fromArgs(args) val input = params.getRequired("input") + val stopWords = List( + "the", "i", "a", "an", "at", "are", "am", "for", "and", "or", "is", "there", "it", "this", + "that", "on", "was", "by", "of", "to", "in", "to", "message", "not", "be", "with", "you", + "have", "as", "can") + + // pattern for recognizing acceptable 'word' + val wordPattern: Pattern = Pattern.compile("(\\p{Alpha})+") + // set up the execution environment val env = ExecutionEnvironment.getExecutionEnvironment @@ -58,63 +66,10 @@ object MailTFIDF { val mailCnt = mails.count() // compute term-frequency (TF) - val tf = mails.flatMap { - new FlatMapFunction[(String, String), (String, String, Int)] { - // stop words to be filtered out - val stopWords = List( - "the", "i", "a", "an", "at", "are", "am", "for", "and", "or", "is", "there", "it", "this", - "that", "on", "was", "by", "of", "to", "in", "to", "message", "not", "be", "with", "you", - "have", "as", "can") - - // pattern for recognizing acceptable 'word' - val wordPattern: Pattern = Pattern.compile("(\\p{Alpha})+") - - def flatMap(mail: (String, String), out: Collector[(String, String, Int)]): Unit = { - // extract email id - val id = mail._1 - val output = mail._2.toLowerCase - // split the body - .split(Array(' ', '\t', '\n', '\r', '\f')) - // filter out stop words and non-words - .filter(w => !stopWords.contains(w) && wordPattern.matcher(w).matches()) - // count the number of occurrences of a word in each document - .map(m => (m, 1)).groupBy(_._1).map { - case (item, count) => (item, count.foldLeft(0)(_ + _._2)) - } - // use the same mail id for each word in the body - output.foreach(m => out.collect(id, m._1, m._2)) - } - - } - } - // compute document frequency (number of mails that contain a word at least once) - // we can reuse the tf data set, since it already contains document <-> word association - val df = mails.flatMap{ - new FlatMapFunction[(String, String), (String, Int)] { - // stop words to be filtered out - val stopWords = List( - "the", "i", "a", "an", "at", "are", "am", "for", "and", "or", "is", "there", "it", "this", - "that", "on", "was", "by", "of", "to", "in", "to", "message", "not", "be", "with", "you", - "have", "as", "can") - - // pattern for recognizing acceptable 'word' - val wordPattern: Pattern = Pattern.compile("(\\p{Alpha})+") - - def flatMap(mail: (String, String), out: Collector[(String, Int)]): Unit = { - val output = mail._2.toLowerCase - // split the body - .split(Array(' ', '\t', '\n', '\r', '\f')) - // filter out stop words and non-words - .filter(w => !stopWords.contains(w) && wordPattern.matcher(w).matches()) - // count the number of occurrences of a word in each document - .distinct - // use the same mail id for each word in the body - output.foreach(m => out.collect(m, 1)) - } - - } + val tf = mails.flatMap(new TFComputer(stopWords, wordPattern)) - } + // compute document frequency (number of mails that contain a word at least once) + val df = mails.flatMap(new DFComputer(stopWords, wordPattern)) // group by the words .groupBy(0) // count the number of documents in each group (df) @@ -129,12 +84,51 @@ object MailTFIDF { .equalTo(0) { (l, r) => (l._1, l._2, l._3 * (mailCnt.toDouble / r._2)) } - + // print the result tfidf .print() } - } + + class DFComputer(stopWords: List[String], wordPattern: Pattern) + extends FlatMapFunction[(String, String), (String, Int)] { + + def flatMap(mail: (String, String), out: Collector[(String, Int)]): Unit = { + val output = mail._2.toLowerCase + // split the body + .split(Array(' ', '\t', '\n', '\r', '\f')) + // filter out stop words and non-words + .filter(w => !stopWords.contains(w) && wordPattern.matcher(w).matches()) + // count the number of occurrences of a word in each document + .distinct + // use the same mail id for each word in the body + output.foreach(m => out.collect(m, 1)) + } + + } + + class TFComputer(stopWords: List[String], wordPattern: Pattern) + extends FlatMapFunction[(String, String), (String, String, Int)] { + + def flatMap(mail: (String, String), out: Collector[(String, String, Int)]): Unit = { + // extract email id + val id = mail._1 + val output = mail._2.toLowerCase + // split the body + .split(Array(' ', '\t', '\n', '\r', '\f')) + // filter out stop words and non-words + .filter(w => !stopWords.contains(w) && wordPattern.matcher(w).matches()) + // count the number of occurrences of a word in each document + .map(m => (m, 1)).groupBy(_._1).map { + case (item, count) => (item, count.foldLeft(0)(_ + _._2)) + } + // use the same mail id for each word in the body + output.foreach(m => out.collect(id, m._1, m._2)) + } + + } + +} From d04ab9963c85a406c3a294389c50ba9642a9cef4 Mon Sep 17 00:00:00 2001 From: Behrouz Derakhshan Date: Tue, 18 Aug 2015 15:27:59 +0200 Subject: [PATCH 8/8] improve documentation --- .../dataSetScala/tfIdf/MailTFIDF.scala | 20 +++++++++++++++---- 1 file changed, 16 insertions(+), 4 deletions(-) diff --git a/flink-exercises/src/main/scala/com/dataArtisans/flinkTraining/exercises/dataSetScala/tfIdf/MailTFIDF.scala b/flink-exercises/src/main/scala/com/dataArtisans/flinkTraining/exercises/dataSetScala/tfIdf/MailTFIDF.scala index de9767b..8e5e62b 100644 --- a/flink-exercises/src/main/scala/com/dataArtisans/flinkTraining/exercises/dataSetScala/tfIdf/MailTFIDF.scala +++ b/flink-exercises/src/main/scala/com/dataArtisans/flinkTraining/exercises/dataSetScala/tfIdf/MailTFIDF.scala @@ -69,7 +69,7 @@ object MailTFIDF { val tf = mails.flatMap(new TFComputer(stopWords, wordPattern)) // compute document frequency (number of mails that contain a word at least once) - val df = mails.flatMap(new DFComputer(stopWords, wordPattern)) + val df = mails.flatMap(new UniqueWordExtractor(stopWords, wordPattern)) // group by the words .groupBy(0) // count the number of documents in each group (df) @@ -91,7 +91,13 @@ object MailTFIDF { } - class DFComputer(stopWords: List[String], wordPattern: Pattern) + /** + * extract list of unique words in each document + * + * @param stopWords a list of stop words that should be omitted + * @param wordPattern pattern that defines a word + */ + class UniqueWordExtractor(stopWords: List[String], wordPattern: Pattern) extends FlatMapFunction[(String, String), (String, Int)] { def flatMap(mail: (String, String), out: Collector[(String, Int)]): Unit = { @@ -102,12 +108,18 @@ object MailTFIDF { .filter(w => !stopWords.contains(w) && wordPattern.matcher(w).matches()) // count the number of occurrences of a word in each document .distinct - // use the same mail id for each word in the body + // emit every word that appeared in a document output.foreach(m => out.collect(m, 1)) } } + /** + * Calculate term frequency for each word per document + * + * @param stopWords a list of stop words that should be omitted + * @param wordPattern pattern that defines a word + */ class TFComputer(stopWords: List[String], wordPattern: Pattern) extends FlatMapFunction[(String, String), (String, String, Int)] { @@ -123,7 +135,7 @@ object MailTFIDF { .map(m => (m, 1)).groupBy(_._1).map { case (item, count) => (item, count.foldLeft(0)(_ + _._2)) } - // use the same mail id for each word in the body + // emit the document id, a term, and its number of occurrences in the document output.foreach(m => out.collect(id, m._1, m._2)) }