author     root <root@ip-10-68-155-156.ec2.internal>  2012-09-01 19:45:25 +0000
committer  root <root@ip-10-68-155-156.ec2.internal>  2012-09-01 19:45:25 +0000
commit     83dad56334e73c477e9b62715df14b0c798220e3 (patch)
tree       0027556890cdbb8d45edbc5c00d3782078550d60 /streaming
parent     f84d2bbe55aaf3ef7a6631b9018a573aa5729ff7 (diff)
Further fixes to raw text sender, plus an app that uses it
Diffstat (limited to 'streaming')
-rw-r--r--  streaming/src/main/scala/spark/streaming/DStream.scala             |  2
-rw-r--r--  streaming/src/main/scala/spark/streaming/JobManager.scala          |  2
-rw-r--r--  streaming/src/main/scala/spark/streaming/RawInputDStream.scala     |  5
-rw-r--r--  streaming/src/main/scala/spark/streaming/StreamingContext.scala    | 11
-rw-r--r--  streaming/src/main/scala/spark/streaming/examples/CountRaw.scala   | 32
-rw-r--r--  streaming/src/main/scala/spark/streaming/examples/WordCount2.scala |  2
-rw-r--r--  streaming/src/main/scala/spark/streaming/util/RawTextSender.scala  |  7
7 files changed, 56 insertions, 5 deletions
diff --git a/streaming/src/main/scala/spark/streaming/DStream.scala b/streaming/src/main/scala/spark/streaming/DStream.scala
index 3a57488f9b..74140ab2b8 100644
--- a/streaming/src/main/scala/spark/streaming/DStream.scala
+++ b/streaming/src/main/scala/spark/streaming/DStream.scala
@@ -176,7 +176,7 @@ extends Logging with Serializable {
def mapPartitions[U: ClassManifest](mapPartFunc: Iterator[T] => Iterator[U]) =
new MapPartitionedDStream(this, ssc.sc.clean(mapPartFunc))
- def reduce(reduceFunc: (T, T) => T) = this.map(x => (1, x)).reduceByKey(reduceFunc, 1).map(_._2)
+ def reduce(reduceFunc: (T, T) => T) = this.map(x => (null, x)).reduceByKey(reduceFunc, 1).map(_._2)
def count() = this.map(_ => 1).reduce(_ + _)
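
The hunk above reimplements DStream.reduce as a reduceByKey over a single shared dummy key, swapping the key 1 for null, presumably so a boxed Int is not shipped with every record. A minimal, self-contained sketch of that pattern on plain Scala collections (no Spark; groupBy stands in for reduceByKey's shuffle, and all names are illustrative):

object SingleKeyReduceSketch {
  def reduceViaSingleKey[T](xs: Seq[T])(f: (T, T) => T): T =
    xs.map(x => ((), x))                       // tag every element with one shared key
      .groupBy(_._1)                           // all pairs land in a single group
      .values.map(_.map(_._2).reduce(f)).head  // reduce within the group, drop the key

  def main(args: Array[String]): Unit =
    println(reduceViaSingleKey(Seq(1, 2, 3, 4))(_ + _))  // prints 10
}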
diff --git a/streaming/src/main/scala/spark/streaming/JobManager.scala b/streaming/src/main/scala/spark/streaming/JobManager.scala
index 40e614b4ed..9bf9251519 100644
--- a/streaming/src/main/scala/spark/streaming/JobManager.scala
+++ b/streaming/src/main/scala/spark/streaming/JobManager.scala
@@ -12,7 +12,7 @@ class JobManager(ssc: StreamingContext, numThreads: Int = 1) extends Logging {
SparkEnv.set(ssc.env)
try {
val timeTaken = job.run()
- logInfo("Total delay: %.4f s for job %s; execution was %.4f s".format(
+ logInfo("Total delay: %.5f s for job %s (execution: %.5f s)".format(
(System.currentTimeMillis() - job.time) / 1000.0, job.id, timeTaken / 1000.0))
} catch {
case e: Exception =>
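
For context on the reworded log line: total delay is the wall-clock time from the batch's nominal time (job.time) to completion, while execution time covers only job.run(). A tiny illustration with hypothetical values:

// Hypothetical values illustrating the two metrics in the log line above.
val jobTime     = System.currentTimeMillis() - 1500  // batch was due 1.5 s ago
val timeTakenMs = 250L                               // what job.run() took
println("Total delay: %.5f s for job %s (execution: %.5f s)".format(
  (System.currentTimeMillis() - jobTime) / 1000.0, "job-0", timeTakenMs / 1000.0))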
diff --git a/streaming/src/main/scala/spark/streaming/RawInputDStream.scala b/streaming/src/main/scala/spark/streaming/RawInputDStream.scala
index 49e4781e75..d59c245a23 100644
--- a/streaming/src/main/scala/spark/streaming/RawInputDStream.scala
+++ b/streaming/src/main/scala/spark/streaming/RawInputDStream.scala
@@ -22,7 +22,8 @@ import spark.storage.StorageLevel
class RawInputDStream[T: ClassManifest](
@transient ssc: StreamingContext,
host: String,
- port: Int)
+ port: Int,
+ storageLevel: StorageLevel)
extends NetworkInputDStream[T](ssc) with Logging {
val streamId = id
@@ -49,7 +50,7 @@ class RawInputDStream[T: ClassManifest](
val buffer = queue.take()
val blockId = "input-" + streamId + "-" + nextBlockNumber
nextBlockNumber += 1
- env.blockManager.putBytes(blockId, buffer, StorageLevel.MEMORY_ONLY_2)
+ env.blockManager.putBytes(blockId, buffer, storageLevel)
actor ! BlockPublished(blockId)
}
}
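
The loop this hunk modifies drains received buffers from a queue, names each block sequentially, and stores it at the now-configurable storage level. A self-contained sketch of that push loop, where ArrayBlockingQueue and the store callback are stand-ins for the receiver's queue and blockManager.putBytes:

import java.util.concurrent.ArrayBlockingQueue

val queue = new ArrayBlockingQueue[Array[Byte]](100)
var nextBlockNumber = 0
def pushOne(streamId: Int, store: (String, Array[Byte]) => Unit): Unit = {
  val buffer = queue.take()                          // blocks until data arrives
  val blockId = "input-" + streamId + "-" + nextBlockNumber
  nextBlockNumber += 1
  store(blockId, buffer)                             // putBytes(blockId, buffer, storageLevel)
}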
diff --git a/streaming/src/main/scala/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/spark/streaming/StreamingContext.scala
index feb769e036..cb0f9ceb15 100644
--- a/streaming/src/main/scala/spark/streaming/StreamingContext.scala
+++ b/streaming/src/main/scala/spark/streaming/StreamingContext.scala
@@ -4,6 +4,7 @@ import spark.RDD
import spark.Logging
import spark.SparkEnv
import spark.SparkContext
+import spark.storage.StorageLevel
import scala.collection.mutable.ArrayBuffer
import scala.collection.mutable.Queue
@@ -64,6 +65,16 @@ class StreamingContext (
inputStreams += inputStream
inputStream
}
+
+ def createRawNetworkStream[T: ClassManifest](
+ hostname: String,
+ port: Int,
+ storageLevel: StorageLevel = StorageLevel.MEMORY_ONLY_2
+ ): DStream[T] = {
+ val inputStream = new RawInputDStream[T](this, hostname, port, storageLevel)
+ inputStreams += inputStream
+ inputStream
+ }
/*
def createHttpTextStream(url: String): DStream[String] = {
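
A minimal usage sketch for the new method, using only calls visible in this commit; the master URL, hostname, and port are placeholders. Omitting the storageLevel argument picks up the MEMORY_ONLY_2 default:

val ssc = new StreamingContext("local[2]", "RawDemo")   // master URL is a placeholder
ssc.setBatchDuration(Seconds(1))
val lines = ssc.createRawNetworkStream[String]("localhost", 9999)  // default storage level
lines.count().foreachRDD(r => println("Count: " + r.collect().mkString))
ssc.start()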
diff --git a/streaming/src/main/scala/spark/streaming/examples/CountRaw.scala b/streaming/src/main/scala/spark/streaming/examples/CountRaw.scala
new file mode 100644
index 0000000000..17d1ce3602
--- /dev/null
+++ b/streaming/src/main/scala/spark/streaming/examples/CountRaw.scala
@@ -0,0 +1,32 @@
+package spark.streaming.examples
+
+import spark.util.IntParam
+import spark.storage.StorageLevel
+import spark.streaming._
+import spark.streaming.StreamingContext._
+
+object CountRaw {
+ def main(args: Array[String]) {
+ if (args.length < 4) {
+ System.err.println("Usage: CountRaw <master> <numStreams> <hostname> <port>")
+ System.exit(1)
+ }
+
+ val Array(master, IntParam(numStreams), hostname, IntParam(port)) = args
+
+ // Create the context and set the batch size
+ val ssc = new StreamingContext(master, "CountRaw")
+ ssc.setBatchDuration(Seconds(1))
+
+ // Make sure some tasks have started on each node
+ ssc.sc.parallelize(1 to 1000, 1000).count()
+ ssc.sc.parallelize(1 to 1000, 1000).count()
+ ssc.sc.parallelize(1 to 1000, 1000).count()
+
+ val rawStreams = (1 to numStreams).map(_ =>
+ ssc.createRawNetworkStream[String](hostname, port, StorageLevel.MEMORY_ONLY_2)).toArray
+ val union = new UnifiedDStream(rawStreams)
+ union.map(_.length).reduce(_ + _).foreachRDD(r => println("Byte count: " + r.collect().mkString))
+ ssc.start()
+ }
+}
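
Note that map(_.length).reduce(_ + _) sums the string lengths in each batch, which is what the "Byte count" label reports (character count, which matches bytes only for single-byte text). On plain collections:

// What the aggregation in CountRaw computes, shown without Spark.
val records   = Seq("foo", "quux")
val byteCount = records.map(_.length).reduce(_ + _)  // 3 + 4 = 7
println("Byte count: " + byteCount)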
diff --git a/streaming/src/main/scala/spark/streaming/examples/WordCount2.scala b/streaming/src/main/scala/spark/streaming/examples/WordCount2.scala
index a090dcb85d..ce553758a7 100644
--- a/streaming/src/main/scala/spark/streaming/examples/WordCount2.scala
+++ b/streaming/src/main/scala/spark/streaming/examples/WordCount2.scala
@@ -85,7 +85,7 @@ object WordCount2 {
//warmup(ssc.sc)
val data = ssc.sc.textFile(file, mapTasks.toInt).persist(
- new StorageLevel(false, true, true, 2)) // Memory only, deserialized, 2 replicas
+ new StorageLevel(false, true, false, 2)) // Memory only, serialized, 2 replicas
println("Data count: " + data.count())
println("Data count: " + data.count())
println("Data count: " + data.count())
diff --git a/streaming/src/main/scala/spark/streaming/util/RawTextSender.scala b/streaming/src/main/scala/spark/streaming/util/RawTextSender.scala
index 85927c02ec..8db651ba19 100644
--- a/streaming/src/main/scala/spark/streaming/util/RawTextSender.scala
+++ b/streaming/src/main/scala/spark/streaming/util/RawTextSender.scala
@@ -1,5 +1,6 @@
package spark.streaming.util
+import java.nio.ByteBuffer
import spark.util.{RateLimitedOutputStream, IntParam}
import java.net.ServerSocket
import spark.{Logging, KryoSerializer}
@@ -33,7 +34,12 @@ object RawTextSender extends Logging {
bufferStream.trim()
val array = bufferStream.array
+ val countBuf = ByteBuffer.wrap(new Array[Byte](4))
+ countBuf.putInt(array.length)
+ countBuf.flip()
+
val serverSocket = new ServerSocket(port)
+ logInfo("Listening on port " + port)
while (true) {
val socket = serverSocket.accept()
@@ -41,6 +47,7 @@ object RawTextSender extends Logging {
val out = new RateLimitedOutputStream(socket.getOutputStream, bytesPerSec)
try {
while (true) {
+ out.write(countBuf.array)
out.write(array)
}
} catch {
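
With this change, every block on the wire is framed as a 4-byte big-endian length prefix (ByteBuffer's default byte order) followed by the payload. A receiver-side sketch of reading one such frame, assuming the reader on the RawInputDStream end uses equivalent logic:

import java.io.DataInputStream
import java.net.Socket

def readBlock(socket: Socket): Array[Byte] = {
  val in = new DataInputStream(socket.getInputStream)
  val length = in.readInt()            // the 4-byte count written via countBuf
  val data = new Array[Byte](length)
  in.readFully(data)                   // block until the full payload arrives
  data
}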