diff options
author | root <root@ip-10-68-155-156.ec2.internal> | 2012-09-01 19:45:25 +0000 |
---|---|---|
committer | root <root@ip-10-68-155-156.ec2.internal> | 2012-09-01 19:45:25 +0000 |
commit | 83dad56334e73c477e9b62715df14b0c798220e3 (patch) | |
tree | 0027556890cdbb8d45edbc5c00d3782078550d60 /streaming | |
parent | f84d2bbe55aaf3ef7a6631b9018a573aa5729ff7 (diff) | |
download | spark-83dad56334e73c477e9b62715df14b0c798220e3.tar.gz spark-83dad56334e73c477e9b62715df14b0c798220e3.tar.bz2 spark-83dad56334e73c477e9b62715df14b0c798220e3.zip |
Further fixes to raw text sender, plus an app that uses it
Diffstat (limited to 'streaming')
7 files changed, 56 insertions, 5 deletions
diff --git a/streaming/src/main/scala/spark/streaming/DStream.scala b/streaming/src/main/scala/spark/streaming/DStream.scala index 3a57488f9b..74140ab2b8 100644 --- a/streaming/src/main/scala/spark/streaming/DStream.scala +++ b/streaming/src/main/scala/spark/streaming/DStream.scala @@ -176,7 +176,7 @@ extends Logging with Serializable { def mapPartitions[U: ClassManifest](mapPartFunc: Iterator[T] => Iterator[U]) = new MapPartitionedDStream(this, ssc.sc.clean(mapPartFunc)) - def reduce(reduceFunc: (T, T) => T) = this.map(x => (1, x)).reduceByKey(reduceFunc, 1).map(_._2) + def reduce(reduceFunc: (T, T) => T) = this.map(x => (null, x)).reduceByKey(reduceFunc, 1).map(_._2) def count() = this.map(_ => 1).reduce(_ + _) diff --git a/streaming/src/main/scala/spark/streaming/JobManager.scala b/streaming/src/main/scala/spark/streaming/JobManager.scala index 40e614b4ed..9bf9251519 100644 --- a/streaming/src/main/scala/spark/streaming/JobManager.scala +++ b/streaming/src/main/scala/spark/streaming/JobManager.scala @@ -12,7 +12,7 @@ class JobManager(ssc: StreamingContext, numThreads: Int = 1) extends Logging { SparkEnv.set(ssc.env) try { val timeTaken = job.run() - logInfo("Total delay: %.4f s for job %s; execution was %.4f s".format( + logInfo("Total delay: %.5f s for job %s (execution: %.5f s)".format( (System.currentTimeMillis() - job.time) / 1000.0, job.id, timeTaken / 1000.0)) } catch { case e: Exception => diff --git a/streaming/src/main/scala/spark/streaming/RawInputDStream.scala b/streaming/src/main/scala/spark/streaming/RawInputDStream.scala index 49e4781e75..d59c245a23 100644 --- a/streaming/src/main/scala/spark/streaming/RawInputDStream.scala +++ b/streaming/src/main/scala/spark/streaming/RawInputDStream.scala @@ -22,7 +22,8 @@ import spark.storage.StorageLevel class RawInputDStream[T: ClassManifest]( @transient ssc: StreamingContext, host: String, - port: Int) + port: Int, + storageLevel: StorageLevel) extends NetworkInputDStream[T](ssc) with Logging { val streamId = id @@ -49,7 +50,7 @@ class RawInputDStream[T: ClassManifest]( val buffer = queue.take() val blockId = "input-" + streamId + "-" + nextBlockNumber nextBlockNumber += 1 - env.blockManager.putBytes(blockId, buffer, StorageLevel.MEMORY_ONLY_2) + env.blockManager.putBytes(blockId, buffer, storageLevel) actor ! BlockPublished(blockId) } } diff --git a/streaming/src/main/scala/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/spark/streaming/StreamingContext.scala index feb769e036..cb0f9ceb15 100644 --- a/streaming/src/main/scala/spark/streaming/StreamingContext.scala +++ b/streaming/src/main/scala/spark/streaming/StreamingContext.scala @@ -4,6 +4,7 @@ import spark.RDD import spark.Logging import spark.SparkEnv import spark.SparkContext +import spark.storage.StorageLevel import scala.collection.mutable.ArrayBuffer import scala.collection.mutable.Queue @@ -64,6 +65,16 @@ class StreamingContext ( inputStreams += inputStream inputStream } + + def createRawNetworkStream[T: ClassManifest]( + hostname: String, + port: Int, + storageLevel: StorageLevel = StorageLevel.MEMORY_ONLY_2 + ): DStream[T] = { + val inputStream = new RawInputDStream[T](this, hostname, port, storageLevel) + inputStreams += inputStream + inputStream + } /* def createHttpTextStream(url: String): DStream[String] = { diff --git a/streaming/src/main/scala/spark/streaming/examples/CountRaw.scala b/streaming/src/main/scala/spark/streaming/examples/CountRaw.scala new file mode 100644 index 0000000000..17d1ce3602 --- /dev/null +++ b/streaming/src/main/scala/spark/streaming/examples/CountRaw.scala @@ -0,0 +1,32 @@ +package spark.streaming.examples + +import spark.util.IntParam +import spark.storage.StorageLevel +import spark.streaming._ +import spark.streaming.StreamingContext._ + +object CountRaw { + def main(args: Array[String]) { + if (args.length < 4) { + System.err.println("Usage: WordCountNetwork <master> <numStreams> <hostname> <port>") + System.exit(1) + } + + val Array(master, IntParam(numStreams), hostname, IntParam(port)) = args + + // Create the context and set the batch size + val ssc = new StreamingContext(master, "CountRaw") + ssc.setBatchDuration(Seconds(1)) + + // Make sure some tasks have started on each node + ssc.sc.parallelize(1 to 1000, 1000).count() + ssc.sc.parallelize(1 to 1000, 1000).count() + ssc.sc.parallelize(1 to 1000, 1000).count() + + val rawStreams = (1 to numStreams).map(_ => + ssc.createRawNetworkStream[String](hostname, port, StorageLevel.MEMORY_ONLY_2)).toArray + val union = new UnifiedDStream(rawStreams) + union.map(_.length).reduce(_ + _).foreachRDD(r => println("Byte count: " + r.collect().mkString)) + ssc.start() + } +} diff --git a/streaming/src/main/scala/spark/streaming/examples/WordCount2.scala b/streaming/src/main/scala/spark/streaming/examples/WordCount2.scala index a090dcb85d..ce553758a7 100644 --- a/streaming/src/main/scala/spark/streaming/examples/WordCount2.scala +++ b/streaming/src/main/scala/spark/streaming/examples/WordCount2.scala @@ -85,7 +85,7 @@ object WordCount2 { //warmup(ssc.sc) val data = ssc.sc.textFile(file, mapTasks.toInt).persist( - new StorageLevel(false, true, true, 2)) // Memory only, deserialized, 2 replicas + new StorageLevel(false, true, false, 2)) // Memory only, serialized, 2 replicas println("Data count: " + data.count()) println("Data count: " + data.count()) println("Data count: " + data.count()) diff --git a/streaming/src/main/scala/spark/streaming/util/RawTextSender.scala b/streaming/src/main/scala/spark/streaming/util/RawTextSender.scala index 85927c02ec..8db651ba19 100644 --- a/streaming/src/main/scala/spark/streaming/util/RawTextSender.scala +++ b/streaming/src/main/scala/spark/streaming/util/RawTextSender.scala @@ -1,5 +1,6 @@ package spark.streaming.util +import java.nio.ByteBuffer import spark.util.{RateLimitedOutputStream, IntParam} import java.net.ServerSocket import spark.{Logging, KryoSerializer} @@ -33,7 +34,12 @@ object RawTextSender extends Logging { bufferStream.trim() val array = bufferStream.array + val countBuf = ByteBuffer.wrap(new Array[Byte](4)) + countBuf.putInt(array.length) + countBuf.flip() + val serverSocket = new ServerSocket(port) + logInfo("Listening on port " + port) while (true) { val socket = serverSocket.accept() @@ -41,6 +47,7 @@ object RawTextSender extends Logging { val out = new RateLimitedOutputStream(socket.getOutputStream, bytesPerSec) try { while (true) { + out.write(countBuf.array) out.write(array) } } catch { |