author    Matei Zaharia <matei@eecs.berkeley.edu>  2012-08-02 11:09:43 -0400
committer Matei Zaharia <matei@eecs.berkeley.edu>  2012-08-02 11:09:43 -0400
commit    650d11817eb15c1c2a8dc322b72c753df88bf8d3 (patch)
tree      e28abc31ed4c9ba59691b25cf76cbcf58b0e5c6e /streaming
parent    ed897ac5e1d1dd2ce144b232cf7a73db2d6679f9 (diff)
Added a WordCount for external data and fixed bugs in file streams
Diffstat (limited to 'streaming')
-rw-r--r-- streaming/src/main/scala/spark/streaming/FileInputRDS.scala       | 13
-rw-r--r-- streaming/src/main/scala/spark/streaming/JobManager.scala         |  2
-rw-r--r-- streaming/src/main/scala/spark/streaming/examples/WordCount.scala | 25
3 files changed, 33 insertions(+), 7 deletions(-)
diff --git a/streaming/src/main/scala/spark/streaming/FileInputRDS.scala b/streaming/src/main/scala/spark/streaming/FileInputRDS.scala
index dde80cd27a..ebd246823d 100644
--- a/streaming/src/main/scala/spark/streaming/FileInputRDS.scala
+++ b/streaming/src/main/scala/spark/streaming/FileInputRDS.scala
@@ -43,12 +43,11 @@ class FileInputRDS[K: ClassManifest, V: ClassManifest, F <: NewInputFormat[K,V]
       var latestModTime = 0L
       def accept(path: Path): Boolean = {
-
         if (!filter.accept(path)) {
           return false
         } else {
           val modTime = fs.getFileStatus(path).getModificationTime()
-          if (modTime < lastModTime) {
+          if (modTime <= lastModTime) {
             return false
           }
           if (modTime > latestModTime) {
@@ -60,10 +59,12 @@ class FileInputRDS[K: ClassManifest, V: ClassManifest, F <: NewInputFormat[K,V]
       }
       val newFiles = fs.listStatus(directory, newFilter)
-      lastModTime = newFilter.latestModTime
-      val newRDD = new UnionRDD(ssc.sc, newFiles.map(file =>
-          ssc.sc.newAPIHadoopFile[K, V, F](file.getPath.toString))
-        )
+      logInfo("New files: " + newFiles.map(_.getPath).mkString(", "))
+      if (newFiles.length > 0) {
+        lastModTime = newFilter.latestModTime
+      }
+      val newRDD = new UnionRDD(ssc.sc, newFiles.map(
+        file => ssc.sc.newAPIHadoopFile[K, V, F](file.getPath.toString)))
       Some(newRDD)
     }
   }
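
Aside on the fix above: a file whose modification time equals lastModTime was previously accepted again on every batch, and an empty directory listing left newFilter.latestModTime at 0, so the unconditional assignment could reset lastModTime and cause old files to be re-read later. The <= comparison and the newFiles.length > 0 guard close both holes. A minimal standalone sketch of the corrected filter logic, assuming Hadoop's PathFilter API; the NewFilesFilter name is illustrative and the wrapped user-supplied filter is omitted:

import org.apache.hadoop.fs.{FileSystem, Path, PathFilter}

// Accepts only paths strictly newer than lastModTime, tracking the newest
// modification time seen so the caller can advance its threshold afterwards.
class NewFilesFilter(fs: FileSystem, lastModTime: Long) extends PathFilter {
  var latestModTime = 0L
  def accept(path: Path): Boolean = {
    val modTime = fs.getFileStatus(path).getModificationTime()
    if (modTime <= lastModTime) {
      return false // seen in an earlier batch (or exactly at its cutoff)
    }
    if (modTime > latestModTime) {
      latestModTime = modTime
    }
    true
  }
}
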
diff --git a/streaming/src/main/scala/spark/streaming/JobManager.scala b/streaming/src/main/scala/spark/streaming/JobManager.scala
index 43d167f7db..c37fe1e9ad 100644
--- a/streaming/src/main/scala/spark/streaming/JobManager.scala
+++ b/streaming/src/main/scala/spark/streaming/JobManager.scala
@@ -13,7 +13,7 @@ class JobManager(ssc: SparkStreamContext, numThreads: Int = 1) extends Logging {
       try {
         val timeTaken = job.run()
         logInfo(
-          "Runnning " + job + " took " + timeTaken + " ms, " +
+          "Running " + job + " took " + timeTaken + " ms, " +
           "total delay was " + (System.currentTimeMillis - job.time) + " ms"
         )
       } catch {
diff --git a/streaming/src/main/scala/spark/streaming/examples/WordCount.scala b/streaming/src/main/scala/spark/streaming/examples/WordCount.scala
new file mode 100644
index 0000000000..a155630151
--- /dev/null
+++ b/streaming/src/main/scala/spark/streaming/examples/WordCount.scala
@@ -0,0 +1,25 @@
+package spark.streaming.examples
+
+import spark.streaming.{Seconds, SparkStreamContext}
+import spark.streaming.SparkStreamContext._
+
+object WordCount {
+  def main(args: Array[String]) {
+    if (args.length < 2) {
+      System.err.println("Usage: WordCount <master> <directory>")
+      System.exit(1)
+    }
+
+    // Create the context and set the batch size
+    val ssc = new SparkStreamContext(args(0), "ExampleTwo")
+    ssc.setBatchDuration(Seconds(2))
+
+    // Create the FileInputRDS on the directory and use the
+    // stream to count words in new files created
+    val inputRDS = ssc.createTextFileStream(args(1))
+    val wordsRDS = inputRDS.flatMap(_.split(" "))
+    val wordCountsRDS = wordsRDS.map(x => (x, 1)).reduceByKey(_ + _)
+    wordCountsRDS.print()
+    ssc.start()
+  }
+}
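
Usage note (an aside, with assumptions): WordCount takes a master URL and a directory to watch; every 2-second batch then counts words only in files that appeared since the previous batch, relying on the FileInputRDS fix above. A hypothetical driver for exercising it locally; the FeedWords object and its file names are illustrative, not part of the commit:

import java.io.{File, PrintWriter}

// Drops a few small text files into the watched directory, spaced roughly
// one batch interval apart, so each file shows up in its own batch.
object FeedWords {
  def main(args: Array[String]) {
    val dir = new File(args(0)) // the same directory WordCount watches
    dir.mkdirs()
    for (i <- 1 to 5) {
      val out = new PrintWriter(new File(dir, "words-" + i + ".txt"))
      out.println("spark streaming word count test")
      out.close()
      Thread.sleep(2000) // matches the Seconds(2) batch duration
    }
  }
}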