diff options
author | Matei Zaharia <matei@eecs.berkeley.edu> | 2012-08-02 11:09:43 -0400 |
---|---|---|
committer | Matei Zaharia <matei@eecs.berkeley.edu> | 2012-08-02 11:09:43 -0400 |
commit | 650d11817eb15c1c2a8dc322b72c753df88bf8d3 (patch) | |
tree | e28abc31ed4c9ba59691b25cf76cbcf58b0e5c6e /streaming | |
parent | ed897ac5e1d1dd2ce144b232cf7a73db2d6679f9 (diff) | |
download | spark-650d11817eb15c1c2a8dc322b72c753df88bf8d3.tar.gz spark-650d11817eb15c1c2a8dc322b72c753df88bf8d3.tar.bz2 spark-650d11817eb15c1c2a8dc322b72c753df88bf8d3.zip |
Added a WordCount for external data and fixed bugs in file streams
Diffstat (limited to 'streaming')
3 files changed, 33 insertions, 7 deletions
diff --git a/streaming/src/main/scala/spark/streaming/FileInputRDS.scala b/streaming/src/main/scala/spark/streaming/FileInputRDS.scala index dde80cd27a..ebd246823d 100644 --- a/streaming/src/main/scala/spark/streaming/FileInputRDS.scala +++ b/streaming/src/main/scala/spark/streaming/FileInputRDS.scala @@ -43,12 +43,11 @@ class FileInputRDS[K: ClassManifest, V: ClassManifest, F <: NewInputFormat[K,V] var latestModTime = 0L def accept(path: Path): Boolean = { - if (!filter.accept(path)) { return false } else { val modTime = fs.getFileStatus(path).getModificationTime() - if (modTime < lastModTime) { + if (modTime <= lastModTime) { return false } if (modTime > latestModTime) { @@ -60,10 +59,12 @@ class FileInputRDS[K: ClassManifest, V: ClassManifest, F <: NewInputFormat[K,V] } val newFiles = fs.listStatus(directory, newFilter) - lastModTime = newFilter.latestModTime - val newRDD = new UnionRDD(ssc.sc, newFiles.map(file => - ssc.sc.newAPIHadoopFile[K, V, F](file.getPath.toString)) - ) + logInfo("New files: " + newFiles.map(_.getPath).mkString(", ")) + if (newFiles.length > 0) { + lastModTime = newFilter.latestModTime + } + val newRDD = new UnionRDD(ssc.sc, newFiles.map( + file => ssc.sc.newAPIHadoopFile[K, V, F](file.getPath.toString))) Some(newRDD) } } diff --git a/streaming/src/main/scala/spark/streaming/JobManager.scala b/streaming/src/main/scala/spark/streaming/JobManager.scala index 43d167f7db..c37fe1e9ad 100644 --- a/streaming/src/main/scala/spark/streaming/JobManager.scala +++ b/streaming/src/main/scala/spark/streaming/JobManager.scala @@ -13,7 +13,7 @@ class JobManager(ssc: SparkStreamContext, numThreads: Int = 1) extends Logging { try { val timeTaken = job.run() logInfo( - "Runnning " + job + " took " + timeTaken + " ms, " + + "Running " + job + " took " + timeTaken + " ms, " + "total delay was " + (System.currentTimeMillis - job.time) + " ms" ) } catch { diff --git a/streaming/src/main/scala/spark/streaming/examples/WordCount.scala b/streaming/src/main/scala/spark/streaming/examples/WordCount.scala new file mode 100644 index 0000000000..a155630151 --- /dev/null +++ b/streaming/src/main/scala/spark/streaming/examples/WordCount.scala @@ -0,0 +1,25 @@ +package spark.streaming.examples + +import spark.streaming.{Seconds, SparkStreamContext} +import spark.streaming.SparkStreamContext._ + +object WordCount { + def main(args: Array[String]) { + if (args.length < 2) { + System.err.println("Usage: WordCount <master> <directory>") + System.exit(1) + } + + // Create the context and set the batch size + val ssc = new SparkStreamContext(args(0), "ExampleTwo") + ssc.setBatchDuration(Seconds(2)) + + // Create the FileInputRDS on the directory and use the + // stream to count words in new files created + val inputRDS = ssc.createTextFileStream(args(1)) + val wordsRDS = inputRDS.flatMap(_.split(" ")) + val wordCountsRDS = wordsRDS.map(x => (x, 1)).reduceByKey(_ + _) + wordCountsRDS.print() + ssc.start() + } +} |