aboutsummaryrefslogtreecommitdiff
path: root/streaming/src
diff options
context:
space:
mode:
authorTathagata Das <tathagata.das1565@gmail.com>2013-01-07 16:41:11 -0800
committerTathagata Das <tathagata.das1565@gmail.com>2013-01-07 16:41:11 -0800
commitd808e1026ad488bed19886f3a3ed2f34a10e7d16 (patch)
tree1417db7ec363823041680c89c5c196405997b66e /streaming/src
parent1d8b1c9bec75d1837ff33cd202f8233345b0a653 (diff)
parent4719e6d8fe6d93734f5bbe6c91dcc4616c1ed317 (diff)
downloadspark-d808e1026ad488bed19886f3a3ed2f34a10e7d16.tar.gz
spark-d808e1026ad488bed19886f3a3ed2f34a10e7d16.tar.bz2
spark-d808e1026ad488bed19886f3a3ed2f34a10e7d16.zip
Merge branch 'dev' into dev-merge
Diffstat (limited to 'streaming/src')
-rw-r--r--streaming/src/main/scala/spark/streaming/StreamingContext.scala38
-rw-r--r--streaming/src/main/scala/spark/streaming/dstream/FileInputDStream.scala16
-rw-r--r--streaming/src/main/scala/spark/streaming/examples/FileStream.scala46
-rw-r--r--streaming/src/main/scala/spark/streaming/examples/FileStreamWithCheckpoint.scala75
-rw-r--r--streaming/src/main/scala/spark/streaming/examples/FlumeEventCount.scala43
-rw-r--r--streaming/src/main/scala/spark/streaming/examples/GrepRaw.scala33
-rw-r--r--streaming/src/main/scala/spark/streaming/examples/KafkaWordCount.scala69
-rw-r--r--streaming/src/main/scala/spark/streaming/examples/QueueStream.scala39
-rw-r--r--streaming/src/main/scala/spark/streaming/examples/TopKWordCountRaw.scala49
-rw-r--r--streaming/src/main/scala/spark/streaming/examples/WordCountHdfs.scala25
-rw-r--r--streaming/src/main/scala/spark/streaming/examples/WordCountNetwork.scala25
-rw-r--r--streaming/src/main/scala/spark/streaming/examples/WordCountRaw.scala43
-rw-r--r--streaming/src/main/scala/spark/streaming/examples/clickstream/PageViewGenerator.scala85
-rw-r--r--streaming/src/main/scala/spark/streaming/examples/clickstream/PageViewStream.scala84
-rw-r--r--streaming/src/test/resources/log4j.properties4
-rw-r--r--streaming/src/test/scala/spark/streaming/InputStreamsSuite.scala2
16 files changed, 37 insertions, 639 deletions
diff --git a/streaming/src/main/scala/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/spark/streaming/StreamingContext.scala
index 7256e41af9..215246ba2e 100644
--- a/streaming/src/main/scala/spark/streaming/StreamingContext.scala
+++ b/streaming/src/main/scala/spark/streaming/StreamingContext.scala
@@ -154,7 +154,7 @@ class StreamingContext private (
storageLevel: StorageLevel = StorageLevel.MEMORY_ONLY_SER_2
): DStream[T] = {
val inputStream = new KafkaInputDStream[T](this, hostname, port, groupId, topics, initialOffsets, storageLevel)
- graph.addInputStream(inputStream)
+ registerInputStream(inputStream)
inputStream
}
@@ -192,7 +192,7 @@ class StreamingContext private (
storageLevel: StorageLevel
): DStream[T] = {
val inputStream = new SocketInputDStream[T](this, hostname, port, converter, storageLevel)
- graph.addInputStream(inputStream)
+ registerInputStream(inputStream)
inputStream
}
@@ -208,7 +208,7 @@ class StreamingContext private (
storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
): DStream[SparkFlumeEvent] = {
val inputStream = new FlumeInputDStream(this, hostname, port, storageLevel)
- graph.addInputStream(inputStream)
+ registerInputStream(inputStream)
inputStream
}
@@ -228,13 +228,14 @@ class StreamingContext private (
storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
): DStream[T] = {
val inputStream = new RawInputDStream[T](this, hostname, port, storageLevel)
- graph.addInputStream(inputStream)
+ registerInputStream(inputStream)
inputStream
}
/**
* Creates a input stream that monitors a Hadoop-compatible filesystem
* for new files and reads them using the given key-value types and input format.
+ * File names starting with . are ignored.
* @param directory HDFS directory to monitor for new file
* @tparam K Key type for reading HDFS file
* @tparam V Value type for reading HDFS file
@@ -244,16 +245,37 @@ class StreamingContext private (
K: ClassManifest,
V: ClassManifest,
F <: NewInputFormat[K, V]: ClassManifest
- ](directory: String): DStream[(K, V)] = {
+ ] (directory: String): DStream[(K, V)] = {
val inputStream = new FileInputDStream[K, V, F](this, directory)
- graph.addInputStream(inputStream)
+ registerInputStream(inputStream)
+ inputStream
+ }
+
+ /**
+ * Creates a input stream that monitors a Hadoop-compatible filesystem
+ * for new files and reads them using the given key-value types and input format.
+ * @param directory HDFS directory to monitor for new file
+ * @param filter Function to filter paths to process
+ * @param newFilesOnly Should process only new files and ignore existing files in the directory
+ * @tparam K Key type for reading HDFS file
+ * @tparam V Value type for reading HDFS file
+ * @tparam F Input format for reading HDFS file
+ */
+ def fileStream[
+ K: ClassManifest,
+ V: ClassManifest,
+ F <: NewInputFormat[K, V]: ClassManifest
+ ] (directory: String, filter: Path => Boolean, newFilesOnly: Boolean): DStream[(K, V)] = {
+ val inputStream = new FileInputDStream[K, V, F](this, directory, filter, newFilesOnly)
+ registerInputStream(inputStream)
inputStream
}
+
/**
* Creates a input stream that monitors a Hadoop-compatible filesystem
* for new files and reads them as text files (using key as LongWritable, value
- * as Text and input format as TextInputFormat).
+ * as Text and input format as TextInputFormat). File names starting with . are ignored.
* @param directory HDFS directory to monitor for new file
*/
def textFileStream(directory: String): DStream[String] = {
@@ -274,7 +296,7 @@ class StreamingContext private (
defaultRDD: RDD[T] = null
): DStream[T] = {
val inputStream = new QueueInputDStream(this, queue, oneAtATime, defaultRDD)
- graph.addInputStream(inputStream)
+ registerInputStream(inputStream)
inputStream
}
diff --git a/streaming/src/main/scala/spark/streaming/dstream/FileInputDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/FileInputDStream.scala
index cf72095324..1e6ad84b44 100644
--- a/streaming/src/main/scala/spark/streaming/dstream/FileInputDStream.scala
+++ b/streaming/src/main/scala/spark/streaming/dstream/FileInputDStream.scala
@@ -14,7 +14,7 @@ private[streaming]
class FileInputDStream[K: ClassManifest, V: ClassManifest, F <: NewInputFormat[K,V] : ClassManifest](
@transient ssc_ : StreamingContext,
directory: String,
- filter: PathFilter = FileInputDStream.defaultPathFilter,
+ filter: Path => Boolean = FileInputDStream.defaultFilter,
newFilesOnly: Boolean = true)
extends InputDStream[(K, V)](ssc_) {
@@ -60,7 +60,7 @@ class FileInputDStream[K: ClassManifest, V: ClassManifest, F <: NewInputFormat[K
val latestModTimeFiles = new HashSet[String]()
def accept(path: Path): Boolean = {
- if (!filter.accept(path)) {
+ if (!filter(path)) {
return false
} else {
val modTime = fs.getFileStatus(path).getModificationTime()
@@ -95,16 +95,8 @@ class FileInputDStream[K: ClassManifest, V: ClassManifest, F <: NewInputFormat[K
}
}
+private[streaming]
object FileInputDStream {
- val defaultPathFilter = new PathFilter with Serializable {
- def accept(path: Path): Boolean = {
- val file = path.getName()
- if (file.startsWith(".") || file.endsWith("_tmp")) {
- return false
- } else {
- return true
- }
- }
- }
+ def defaultFilter(path: Path): Boolean = !path.getName().startsWith(".")
}
diff --git a/streaming/src/main/scala/spark/streaming/examples/FileStream.scala b/streaming/src/main/scala/spark/streaming/examples/FileStream.scala
deleted file mode 100644
index 81938d30d4..0000000000
--- a/streaming/src/main/scala/spark/streaming/examples/FileStream.scala
+++ /dev/null
@@ -1,46 +0,0 @@
-package spark.streaming.examples
-
-import spark.streaming.StreamingContext
-import spark.streaming.StreamingContext._
-import spark.streaming.Seconds
-import org.apache.hadoop.fs.Path
-import org.apache.hadoop.conf.Configuration
-
-
-object FileStream {
- def main(args: Array[String]) {
- if (args.length < 2) {
- System.err.println("Usage: FileStream <master> <new HDFS compatible directory>")
- System.exit(1)
- }
-
- // Create the context
- val ssc = new StreamingContext(args(0), "FileStream", Seconds(1))
-
- // Create the new directory
- val directory = new Path(args(1))
- val fs = directory.getFileSystem(new Configuration())
- if (fs.exists(directory)) throw new Exception("This directory already exists")
- fs.mkdirs(directory)
- fs.deleteOnExit(directory)
-
- // Create the FileInputDStream on the directory and use the
- // stream to count words in new files created
- val inputStream = ssc.textFileStream(directory.toString)
- val words = inputStream.flatMap(_.split(" "))
- val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
- wordCounts.print()
- ssc.start()
-
- // Creating new files in the directory
- val text = "This is a text file"
- for (i <- 1 to 30) {
- ssc.sc.parallelize((1 to (i * 10)).map(_ => text), 10)
- .saveAsTextFile(new Path(directory, i.toString).toString)
- Thread.sleep(1000)
- }
- Thread.sleep(5000) // Waiting for the file to be processed
- ssc.stop()
- System.exit(0)
- }
-} \ No newline at end of file
diff --git a/streaming/src/main/scala/spark/streaming/examples/FileStreamWithCheckpoint.scala b/streaming/src/main/scala/spark/streaming/examples/FileStreamWithCheckpoint.scala
deleted file mode 100644
index b7bc15a1d5..0000000000
--- a/streaming/src/main/scala/spark/streaming/examples/FileStreamWithCheckpoint.scala
+++ /dev/null
@@ -1,75 +0,0 @@
-package spark.streaming.examples
-
-import spark.streaming._
-import spark.streaming.StreamingContext._
-import org.apache.hadoop.fs.Path
-import org.apache.hadoop.conf.Configuration
-
-object FileStreamWithCheckpoint {
-
- def main(args: Array[String]) {
-
- if (args.size != 3) {
- println("FileStreamWithCheckpoint <master> <directory> <checkpoint dir>")
- println("FileStreamWithCheckpoint restart <directory> <checkpoint dir>")
- System.exit(-1)
- }
-
- val directory = new Path(args(1))
- val checkpointDir = args(2)
-
- val ssc: StreamingContext = {
-
- if (args(0) == "restart") {
-
- // Recreated streaming context from specified checkpoint file
- new StreamingContext(checkpointDir)
-
- } else {
-
- // Create directory if it does not exist
- val fs = directory.getFileSystem(new Configuration())
- if (!fs.exists(directory)) fs.mkdirs(directory)
-
- // Create new streaming context
- val ssc_ = new StreamingContext(args(0), "FileStreamWithCheckpoint", Seconds(1))
- ssc_.checkpoint(checkpointDir)
-
- // Setup the streaming computation
- val inputStream = ssc_.textFileStream(directory.toString)
- val words = inputStream.flatMap(_.split(" "))
- val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
- wordCounts.print()
-
- ssc_
- }
- }
-
- // Start the stream computation
- startFileWritingThread(directory.toString)
- ssc.start()
- }
-
- def startFileWritingThread(directory: String) {
-
- val fs = new Path(directory).getFileSystem(new Configuration())
-
- val fileWritingThread = new Thread() {
- override def run() {
- val r = new scala.util.Random()
- val text = "This is a sample text file with a random number "
- while(true) {
- val number = r.nextInt()
- val file = new Path(directory, number.toString)
- val fos = fs.create(file)
- fos.writeChars(text + number)
- fos.close()
- println("Created text file " + file)
- Thread.sleep(1000)
- }
- }
- }
- fileWritingThread.start()
- }
-
-}
diff --git a/streaming/src/main/scala/spark/streaming/examples/FlumeEventCount.scala b/streaming/src/main/scala/spark/streaming/examples/FlumeEventCount.scala
deleted file mode 100644
index e60ce483a3..0000000000
--- a/streaming/src/main/scala/spark/streaming/examples/FlumeEventCount.scala
+++ /dev/null
@@ -1,43 +0,0 @@
-package spark.streaming.examples
-
-import spark.util.IntParam
-import spark.storage.StorageLevel
-import spark.streaming._
-
-/**
- * Produce a streaming count of events received from Flume.
- *
- * This should be used in conjunction with an AvroSink in Flume. It will start
- * an Avro server on at the request host:port address and listen for requests.
- * Your Flume AvroSink should be pointed to this address.
- *
- * Usage: FlumeEventCount <master> <host> <port>
- *
- * <master> is a Spark master URL
- * <host> is the host the Flume receiver will be started on - a receiver
- * creates a server and listens for flume events.
- * <port> is the port the Flume receiver will listen on.
- */
-object FlumeEventCount {
- def main(args: Array[String]) {
- if (args.length != 3) {
- System.err.println(
- "Usage: FlumeEventCount <master> <host> <port>")
- System.exit(1)
- }
-
- val Array(master, host, IntParam(port)) = args
-
- val batchInterval = Milliseconds(2000)
- // Create the context and set the batch size
- val ssc = new StreamingContext(master, "FlumeEventCount", batchInterval)
-
- // Create a flume stream
- val stream = ssc.flumeStream(host,port,StorageLevel.MEMORY_ONLY)
-
- // Print out the count of events received from this server in each batch
- stream.count().map(cnt => "Received " + cnt + " flume events." ).print()
-
- ssc.start()
- }
-}
diff --git a/streaming/src/main/scala/spark/streaming/examples/GrepRaw.scala b/streaming/src/main/scala/spark/streaming/examples/GrepRaw.scala
deleted file mode 100644
index dfaaf03f03..0000000000
--- a/streaming/src/main/scala/spark/streaming/examples/GrepRaw.scala
+++ /dev/null
@@ -1,33 +0,0 @@
-package spark.streaming.examples
-
-import spark.util.IntParam
-import spark.storage.StorageLevel
-
-import spark.streaming._
-import spark.streaming.StreamingContext._
-import spark.streaming.util.RawTextHelper._
-
-object GrepRaw {
- def main(args: Array[String]) {
- if (args.length != 5) {
- System.err.println("Usage: GrepRaw <master> <numStreams> <host> <port> <batchMillis>")
- System.exit(1)
- }
-
- val Array(master, IntParam(numStreams), host, IntParam(port), IntParam(batchMillis)) = args
-
- // Create the context
- val ssc = new StreamingContext(master, "GrepRaw", Milliseconds(batchMillis))
-
- // Warm up the JVMs on master and slave for JIT compilation to kick in
- warmUp(ssc.sc)
-
-
- val rawStreams = (1 to numStreams).map(_ =>
- ssc.rawNetworkStream[String](host, port, StorageLevel.MEMORY_ONLY_SER_2)).toArray
- val union = ssc.union(rawStreams)
- union.filter(_.contains("Alice")).count().foreach(r =>
- println("Grep count: " + r.collect().mkString))
- ssc.start()
- }
-}
diff --git a/streaming/src/main/scala/spark/streaming/examples/KafkaWordCount.scala b/streaming/src/main/scala/spark/streaming/examples/KafkaWordCount.scala
deleted file mode 100644
index fe55db6e2c..0000000000
--- a/streaming/src/main/scala/spark/streaming/examples/KafkaWordCount.scala
+++ /dev/null
@@ -1,69 +0,0 @@
-package spark.streaming.examples
-
-import java.util.Properties
-import kafka.message.Message
-import kafka.producer.SyncProducerConfig
-import kafka.producer._
-import spark.SparkContext
-import spark.streaming._
-import spark.streaming.StreamingContext._
-import spark.storage.StorageLevel
-import spark.streaming.util.RawTextHelper._
-
-object KafkaWordCount {
- def main(args: Array[String]) {
-
- if (args.length < 6) {
- System.err.println("Usage: KafkaWordCount <master> <hostname> <port> <group> <topics> <numThreads>")
- System.exit(1)
- }
-
- val Array(master, hostname, port, group, topics, numThreads) = args
-
- val sc = new SparkContext(master, "KafkaWordCount")
- val ssc = new StreamingContext(sc, Seconds(2))
- ssc.checkpoint("checkpoint")
-
- val topicpMap = topics.split(",").map((_,numThreads.toInt)).toMap
- val lines = ssc.kafkaStream[String](hostname, port.toInt, group, topicpMap)
- val words = lines.flatMap(_.split(" "))
- val wordCounts = words.map(x => (x, 1l)).reduceByKeyAndWindow(add _, subtract _, Minutes(10), Seconds(2), 2)
- wordCounts.print()
-
- ssc.start()
- }
-}
-
-// Produces some random words between 1 and 100.
-object KafkaWordCountProducer {
-
- def main(args: Array[String]) {
- if (args.length < 3) {
- System.err.println("Usage: KafkaWordCountProducer <hostname> <port> <topic> <messagesPerSec> <wordsPerMessage>")
- System.exit(1)
- }
-
- val Array(hostname, port, topic, messagesPerSec, wordsPerMessage) = args
-
- // Zookeper connection properties
- val props = new Properties()
- props.put("zk.connect", hostname + ":" + port)
- props.put("serializer.class", "kafka.serializer.StringEncoder")
-
- val config = new ProducerConfig(props)
- val producer = new Producer[String, String](config)
-
- // Send some messages
- while(true) {
- val messages = (1 to messagesPerSec.toInt).map { messageNum =>
- (1 to wordsPerMessage.toInt).map(x => scala.util.Random.nextInt(10).toString).mkString(" ")
- }.toArray
- println(messages.mkString(","))
- val data = new ProducerData[String, String](topic, messages)
- producer.send(data)
- Thread.sleep(100)
- }
- }
-
-}
-
diff --git a/streaming/src/main/scala/spark/streaming/examples/QueueStream.scala b/streaming/src/main/scala/spark/streaming/examples/QueueStream.scala
deleted file mode 100644
index 2a265d021d..0000000000
--- a/streaming/src/main/scala/spark/streaming/examples/QueueStream.scala
+++ /dev/null
@@ -1,39 +0,0 @@
-package spark.streaming.examples
-
-import spark.RDD
-import spark.streaming.{Seconds, StreamingContext}
-import spark.streaming.StreamingContext._
-
-import scala.collection.mutable.SynchronizedQueue
-
-object QueueStream {
-
- def main(args: Array[String]) {
- if (args.length < 1) {
- System.err.println("Usage: QueueStream <master>")
- System.exit(1)
- }
-
- // Create the context
- val ssc = new StreamingContext(args(0), "QueueStream", Seconds(1))
-
- // Create the queue through which RDDs can be pushed to
- // a QueueInputDStream
- val rddQueue = new SynchronizedQueue[RDD[Int]]()
-
- // Create the QueueInputDStream and use it do some processing
- val inputStream = ssc.queueStream(rddQueue)
- val mappedStream = inputStream.map(x => (x % 10, 1))
- val reducedStream = mappedStream.reduceByKey(_ + _)
- reducedStream.print()
- ssc.start()
-
- // Create and push some RDDs into
- for (i <- 1 to 30) {
- rddQueue += ssc.sc.makeRDD(1 to 1000, 10)
- Thread.sleep(1000)
- }
- ssc.stop()
- System.exit(0)
- }
-} \ No newline at end of file
diff --git a/streaming/src/main/scala/spark/streaming/examples/TopKWordCountRaw.scala b/streaming/src/main/scala/spark/streaming/examples/TopKWordCountRaw.scala
deleted file mode 100644
index 338834bc3c..0000000000
--- a/streaming/src/main/scala/spark/streaming/examples/TopKWordCountRaw.scala
+++ /dev/null
@@ -1,49 +0,0 @@
-package spark.streaming.examples
-
-import spark.storage.StorageLevel
-import spark.util.IntParam
-
-import spark.streaming._
-import spark.streaming.StreamingContext._
-import spark.streaming.util.RawTextHelper._
-
-import java.util.UUID
-
-object TopKWordCountRaw {
-
- def main(args: Array[String]) {
- if (args.length != 4) {
- System.err.println("Usage: WordCountRaw <master> <# streams> <port> <HDFS checkpoint directory> ")
- System.exit(1)
- }
-
- val Array(master, IntParam(numStreams), IntParam(port), checkpointDir) = args
- val k = 10
-
- // Create the context, and set the checkpoint directory.
- // Checkpoint directory is necessary for achieving fault-tolerance, by saving counts
- // periodically to HDFS
- val ssc = new StreamingContext(master, "TopKWordCountRaw", Seconds(1))
- ssc.checkpoint(checkpointDir + "/" + UUID.randomUUID.toString, Seconds(1))
-
- // Warm up the JVMs on master and slave for JIT compilation to kick in
- /*warmUp(ssc.sc)*/
-
- // Set up the raw network streams that will connect to localhost:port to raw test
- // senders on the slaves and generate top K words of last 30 seconds
- val lines = (1 to numStreams).map(_ => {
- ssc.rawNetworkStream[String]("localhost", port, StorageLevel.MEMORY_ONLY_SER_2)
- })
- val union = ssc.union(lines)
- val counts = union.mapPartitions(splitAndCountPartitions)
- val windowedCounts = counts.reduceByKeyAndWindow(add _, subtract _, Seconds(30), Seconds(1), 10)
- val partialTopKWindowedCounts = windowedCounts.mapPartitions(topK(_, k))
- partialTopKWindowedCounts.foreach(rdd => {
- val collectedCounts = rdd.collect
- println("Collected " + collectedCounts.size + " words from partial top words")
- println("Top " + k + " words are " + topK(collectedCounts.toIterator, k).mkString(","))
- })
-
- ssc.start()
- }
-}
diff --git a/streaming/src/main/scala/spark/streaming/examples/WordCountHdfs.scala b/streaming/src/main/scala/spark/streaming/examples/WordCountHdfs.scala
deleted file mode 100644
index 867a8f42c4..0000000000
--- a/streaming/src/main/scala/spark/streaming/examples/WordCountHdfs.scala
+++ /dev/null
@@ -1,25 +0,0 @@
-package spark.streaming.examples
-
-import spark.streaming.{Seconds, StreamingContext}
-import spark.streaming.StreamingContext._
-
-object WordCountHdfs {
- def main(args: Array[String]) {
- if (args.length < 2) {
- System.err.println("Usage: WordCountHdfs <master> <directory>")
- System.exit(1)
- }
-
- // Create the context
- val ssc = new StreamingContext(args(0), "WordCountHdfs", Seconds(2))
-
- // Create the FileInputDStream on the directory and use the
- // stream to count words in new files created
- val lines = ssc.textFileStream(args(1))
- val words = lines.flatMap(_.split(" "))
- val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
- wordCounts.print()
- ssc.start()
- }
-}
-
diff --git a/streaming/src/main/scala/spark/streaming/examples/WordCountNetwork.scala b/streaming/src/main/scala/spark/streaming/examples/WordCountNetwork.scala
deleted file mode 100644
index eadda60563..0000000000
--- a/streaming/src/main/scala/spark/streaming/examples/WordCountNetwork.scala
+++ /dev/null
@@ -1,25 +0,0 @@
-package spark.streaming.examples
-
-import spark.streaming.{Seconds, StreamingContext}
-import spark.streaming.StreamingContext._
-
-object WordCountNetwork {
- def main(args: Array[String]) {
- if (args.length < 2) {
- System.err.println("Usage: WordCountNetwork <master> <hostname> <port>\n" +
- "In local mode, <master> should be 'local[n]' with n > 1")
- System.exit(1)
- }
-
- // Create the context and set the batch size
- val ssc = new StreamingContext(args(0), "WordCountNetwork", Seconds(1))
-
- // Create a NetworkInputDStream on target ip:port and count the
- // words in input stream of \n delimited test (eg. generated by 'nc')
- val lines = ssc.networkTextStream(args(1), args(2).toInt)
- val words = lines.flatMap(_.split(" "))
- val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
- wordCounts.print()
- ssc.start()
- }
-}
diff --git a/streaming/src/main/scala/spark/streaming/examples/WordCountRaw.scala b/streaming/src/main/scala/spark/streaming/examples/WordCountRaw.scala
deleted file mode 100644
index d93335a8ce..0000000000
--- a/streaming/src/main/scala/spark/streaming/examples/WordCountRaw.scala
+++ /dev/null
@@ -1,43 +0,0 @@
-package spark.streaming.examples
-
-import spark.storage.StorageLevel
-import spark.util.IntParam
-
-import spark.streaming._
-import spark.streaming.StreamingContext._
-import spark.streaming.util.RawTextHelper._
-
-import java.util.UUID
-
-object WordCountRaw {
-
- def main(args: Array[String]) {
- if (args.length != 4) {
- System.err.println("Usage: WordCountRaw <master> <# streams> <port> <HDFS checkpoint directory> ")
- System.exit(1)
- }
-
- val Array(master, IntParam(numStreams), IntParam(port), checkpointDir) = args
-
- // Create the context, and set the checkpoint directory.
- // Checkpoint directory is necessary for achieving fault-tolerance, by saving counts
- // periodically to HDFS
- val ssc = new StreamingContext(master, "WordCountRaw", Seconds(1))
- ssc.checkpoint(checkpointDir + "/" + UUID.randomUUID.toString, Seconds(1))
-
- // Warm up the JVMs on master and slave for JIT compilation to kick in
- warmUp(ssc.sc)
-
- // Set up the raw network streams that will connect to localhost:port to raw test
- // senders on the slaves and generate count of words of last 30 seconds
- val lines = (1 to numStreams).map(_ => {
- ssc.rawNetworkStream[String]("localhost", port, StorageLevel.MEMORY_ONLY_SER_2)
- })
- val union = ssc.union(lines)
- val counts = union.mapPartitions(splitAndCountPartitions)
- val windowedCounts = counts.reduceByKeyAndWindow(add _, subtract _, Seconds(30), Seconds(1), 10)
- windowedCounts.foreach(r => println("# unique words = " + r.count()))
-
- ssc.start()
- }
-}
diff --git a/streaming/src/main/scala/spark/streaming/examples/clickstream/PageViewGenerator.scala b/streaming/src/main/scala/spark/streaming/examples/clickstream/PageViewGenerator.scala
deleted file mode 100644
index 4c6e08bc74..0000000000
--- a/streaming/src/main/scala/spark/streaming/examples/clickstream/PageViewGenerator.scala
+++ /dev/null
@@ -1,85 +0,0 @@
-package spark.streaming.examples.clickstream
-
-import java.net.{InetAddress,ServerSocket,Socket,SocketException}
-import java.io.{InputStreamReader, BufferedReader, PrintWriter}
-import util.Random
-
-/** Represents a page view on a website with associated dimension data.*/
-class PageView(val url : String, val status : Int, val zipCode : Int, val userID : Int) {
- override def toString() : String = {
- "%s\t%s\t%s\t%s\n".format(url, status, zipCode, userID)
- }
-}
-object PageView {
- def fromString(in : String) : PageView = {
- val parts = in.split("\t")
- new PageView(parts(0), parts(1).toInt, parts(2).toInt, parts(3).toInt)
- }
-}
-
-/** Generates streaming events to simulate page views on a website.
- *
- * This should be used in tandem with PageViewStream.scala. Example:
- * $ ./run spark.streaming.examples.clickstream.PageViewGenerator 44444 10
- * $ ./run spark.streaming.examples.clickstream.PageViewStream errorRatePerZipCode localhost 44444
- * */
-object PageViewGenerator {
- val pages = Map("http://foo.com/" -> .7,
- "http://foo.com/news" -> 0.2,
- "http://foo.com/contact" -> .1)
- val httpStatus = Map(200 -> .95,
- 404 -> .05)
- val userZipCode = Map(94709 -> .5,
- 94117 -> .5)
- val userID = Map((1 to 100).map(_ -> .01):_*)
-
-
- def pickFromDistribution[T](inputMap : Map[T, Double]) : T = {
- val rand = new Random().nextDouble()
- var total = 0.0
- for ((item, prob) <- inputMap) {
- total = total + prob
- if (total > rand) {
- return item
- }
- }
- return inputMap.take(1).head._1 // Shouldn't get here if probabilities add up to 1.0
- }
-
- def getNextClickEvent() : String = {
- val id = pickFromDistribution(userID)
- val page = pickFromDistribution(pages)
- val status = pickFromDistribution(httpStatus)
- val zipCode = pickFromDistribution(userZipCode)
- new PageView(page, status, zipCode, id).toString()
- }
-
- def main(args : Array[String]) {
- if (args.length != 2) {
- System.err.println("Usage: PageViewGenerator <port> <viewsPerSecond>")
- System.exit(1)
- }
- val port = args(0).toInt
- val viewsPerSecond = args(1).toFloat
- val sleepDelayMs = (1000.0 / viewsPerSecond).toInt
- val listener = new ServerSocket(port)
- println("Listening on port: " + port)
-
- while (true) {
- val socket = listener.accept()
- new Thread() {
- override def run = {
- println("Got client connected from: " + socket.getInetAddress)
- val out = new PrintWriter(socket.getOutputStream(), true)
-
- while (true) {
- Thread.sleep(sleepDelayMs)
- out.write(getNextClickEvent())
- out.flush()
- }
- socket.close()
- }
- }.start()
- }
- }
-}
diff --git a/streaming/src/main/scala/spark/streaming/examples/clickstream/PageViewStream.scala b/streaming/src/main/scala/spark/streaming/examples/clickstream/PageViewStream.scala
deleted file mode 100644
index a191321d91..0000000000
--- a/streaming/src/main/scala/spark/streaming/examples/clickstream/PageViewStream.scala
+++ /dev/null
@@ -1,84 +0,0 @@
-package spark.streaming.examples.clickstream
-
-import spark.streaming.{Seconds, StreamingContext}
-import spark.streaming.StreamingContext._
-import spark.SparkContext._
-
-/** Analyses a streaming dataset of web page views. This class demonstrates several types of
- * operators available in Spark streaming.
- *
- * This should be used in tandem with PageViewStream.scala. Example:
- * $ ./run spark.streaming.examples.clickstream.PageViewGenerator 44444 10
- * $ ./run spark.streaming.examples.clickstream.PageViewStream errorRatePerZipCode localhost 44444
- * */
-object PageViewStream {
- def main(args: Array[String]) {
- if (args.length != 3) {
- System.err.println("Usage: PageViewStream <metric> <host> <port>")
- System.err.println("<metric> must be one of pageCounts, slidingPageCounts," +
- " errorRatePerZipCode, activeUserCount, popularUsersSeen")
- System.exit(1)
- }
- val metric = args(0)
- val host = args(1)
- val port = args(2).toInt
-
- // Create the context
- val ssc = new StreamingContext("local[2]", "PageViewStream", Seconds(1))
-
- // Create a NetworkInputDStream on target host:port and convert each line to a PageView
- val pageViews = ssc.networkTextStream(host, port)
- .flatMap(_.split("\n"))
- .map(PageView.fromString(_))
-
- // Return a count of views per URL seen in each batch
- val pageCounts = pageViews.map(view => ((view.url, 1))).countByKey()
-
- // Return a sliding window of page views per URL in the last ten seconds
- val slidingPageCounts = pageViews.map(view => ((view.url, 1)))
- .window(Seconds(10), Seconds(2))
- .countByKey()
-
-
- // Return the rate of error pages (a non 200 status) in each zip code over the last 30 seconds
- val statusesPerZipCode = pageViews.window(Seconds(30), Seconds(2))
- .map(view => ((view.zipCode, view.status)))
- .groupByKey()
- val errorRatePerZipCode = statusesPerZipCode.map{
- case(zip, statuses) =>
- val normalCount = statuses.filter(_ == 200).size
- val errorCount = statuses.size - normalCount
- val errorRatio = errorCount.toFloat / statuses.size
- if (errorRatio > 0.05) {"%s: **%s**".format(zip, errorRatio)}
- else {"%s: %s".format(zip, errorRatio)}
- }
-
- // Return the number unique users in last 15 seconds
- val activeUserCount = pageViews.window(Seconds(15), Seconds(2))
- .map(view => (view.userID, 1))
- .groupByKey()
- .count()
- .map("Unique active users: " + _)
-
- // An external dataset we want to join to this stream
- val userList = ssc.sc.parallelize(
- Map(1 -> "Patrick Wendell", 2->"Reynold Xin", 3->"Matei Zaharia").toSeq)
-
- metric match {
- case "pageCounts" => pageCounts.print()
- case "slidingPageCounts" => slidingPageCounts.print()
- case "errorRatePerZipCode" => errorRatePerZipCode.print()
- case "activeUserCount" => activeUserCount.print()
- case "popularUsersSeen" =>
- // Look for users in our existing dataset and print it out if we have a match
- pageViews.map(view => (view.userID, 1))
- .foreach((rdd, time) => rdd.join(userList)
- .map(_._2._2)
- .take(10)
- .foreach(u => println("Saw user %s at time %s".format(u, time))))
- case _ => println("Invalid metric entered: " + metric)
- }
-
- ssc.start()
- }
-}
diff --git a/streaming/src/test/resources/log4j.properties b/streaming/src/test/resources/log4j.properties
index 33bafebaab..edfa1243fa 100644
--- a/streaming/src/test/resources/log4j.properties
+++ b/streaming/src/test/resources/log4j.properties
@@ -1,8 +1,8 @@
-# Set everything to be logged to the file streaming-tests.log
+# Set everything to be logged to the file streaming/target/unit-tests.log
log4j.rootCategory=INFO, file
log4j.appender.file=org.apache.log4j.FileAppender
log4j.appender.file.append=false
-log4j.appender.file.file=streaming-tests.log
+log4j.appender.file.file=streaming/target/unit-tests.log
log4j.appender.file.layout=org.apache.log4j.PatternLayout
log4j.appender.file.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss.SSS} %p %c{1}: %m%n
diff --git a/streaming/src/test/scala/spark/streaming/InputStreamsSuite.scala b/streaming/src/test/scala/spark/streaming/InputStreamsSuite.scala
index 76b528bec3..e71ba6ddc1 100644
--- a/streaming/src/test/scala/spark/streaming/InputStreamsSuite.scala
+++ b/streaming/src/test/scala/spark/streaming/InputStreamsSuite.scala
@@ -318,7 +318,7 @@ class TestServer(port: Int) extends Logging {
}
}
} catch {
- case e: SocketException => println(e)
+ case e: SocketException => logError("TestServer error", e)
} finally {
logInfo("Connection closed")
if (!clientSocket.isClosed) clientSocket.close()