-rw-r--r--  core/src/main/scala/spark/BlockRDD.scala | 3
-rw-r--r--  core/src/main/scala/spark/HadoopRDD.scala | 5
-rw-r--r--  core/src/main/scala/spark/KryoSerializer.scala | 6
-rw-r--r--  core/src/main/scala/spark/MapOutputTracker.scala | 88
-rw-r--r--  core/src/main/scala/spark/scheduler/ShuffleMapTask.scala | 9
-rw-r--r--  core/src/main/scala/spark/scheduler/cluster/ClusterScheduler.scala | 1
-rw-r--r--  core/src/main/scala/spark/scheduler/cluster/StandaloneSchedulerBackend.scala | 2
-rw-r--r--  core/src/main/scala/spark/storage/BlockManager.scala | 7
-rw-r--r--  core/src/main/scala/spark/storage/BlockMessage.scala | 14
-rw-r--r--  core/src/main/scala/spark/util/AkkaUtils.scala | 2
-rw-r--r--  streaming/src/main/scala/spark/streaming/DStream.scala | 40
-rw-r--r--  streaming/src/main/scala/spark/streaming/Interval.scala | 4
-rw-r--r--  streaming/src/main/scala/spark/streaming/JobManager.scala | 2
-rw-r--r--  streaming/src/main/scala/spark/streaming/NetworkInputTracker.scala | 40
-rw-r--r--  streaming/src/main/scala/spark/streaming/PairDStreamFunctions.scala | 56
-rw-r--r--  streaming/src/main/scala/spark/streaming/QueueInputDStream.scala | 2
-rw-r--r--  streaming/src/main/scala/spark/streaming/RawInputDStream.scala | 17
-rw-r--r--  streaming/src/main/scala/spark/streaming/ReducedWindowedDStream.scala | 221
-rw-r--r--  streaming/src/main/scala/spark/streaming/Scheduler.scala | 2
-rw-r--r--  streaming/src/main/scala/spark/streaming/StateDStream.scala | 93
-rw-r--r--  streaming/src/main/scala/spark/streaming/WindowedDStream.scala | 38
-rw-r--r--  streaming/src/main/scala/spark/streaming/examples/Grep2.scala | 64
-rw-r--r--  streaming/src/main/scala/spark/streaming/examples/TopKWordCountRaw.scala | 95
-rw-r--r--  streaming/src/main/scala/spark/streaming/examples/WordCount2.scala | 16
-rw-r--r--  streaming/src/main/scala/spark/streaming/examples/WordCountRaw.scala | 19
-rw-r--r--  streaming/src/main/scala/spark/streaming/examples/WordMax2.scala | 73
-rw-r--r--  streaming/src/test/scala/spark/streaming/DStreamBasicSuite.scala | 67
-rw-r--r--  streaming/src/test/scala/spark/streaming/DStreamSuite.scala | 123
-rw-r--r--  streaming/src/test/scala/spark/streaming/DStreamSuiteBase.scala | 68
-rw-r--r--  streaming/src/test/scala/spark/streaming/DStreamWindowSuite.scala | 188
30 files changed, 930 insertions(+), 435 deletions(-)
diff --git a/core/src/main/scala/spark/BlockRDD.scala b/core/src/main/scala/spark/BlockRDD.scala
index ea009f0f4f..daabc0d566 100644
--- a/core/src/main/scala/spark/BlockRDD.scala
+++ b/core/src/main/scala/spark/BlockRDD.scala
@@ -7,7 +7,8 @@ class BlockRDDSplit(val blockId: String, idx: Int) extends Split {
}
-class BlockRDD[T: ClassManifest](sc: SparkContext, blockIds: Array[String]) extends RDD[T](sc) {
+class BlockRDD[T: ClassManifest](sc: SparkContext, @transient blockIds: Array[String])
+ extends RDD[T](sc) {
@transient
val splits_ = (0 until blockIds.size).map(i => {
diff --git a/core/src/main/scala/spark/HadoopRDD.scala b/core/src/main/scala/spark/HadoopRDD.scala
index f282a4023b..0befca582d 100644
--- a/core/src/main/scala/spark/HadoopRDD.scala
+++ b/core/src/main/scala/spark/HadoopRDD.scala
@@ -42,7 +42,8 @@ class HadoopRDD[K, V](
minSplits: Int)
extends RDD[(K, V)](sc) {
- val serializableConf = new SerializableWritable(conf)
+ // A Hadoop JobConf can be about 10 KB, which is pretty big, so broadcast it
+ val confBroadcast = sc.broadcast(new SerializableWritable(conf))
@transient
val splits_ : Array[Split] = {
@@ -66,7 +67,7 @@ class HadoopRDD[K, V](
val split = theSplit.asInstanceOf[HadoopSplit]
var reader: RecordReader[K, V] = null
- val conf = serializableConf.value
+ val conf = confBroadcast.value.value
val fmt = createInputFormat(conf)
reader = fmt.getRecordReader(split.inputSplit.value, conf, Reporter.NULL)
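
The comment above motivates this change with the JobConf's size. A rough back-of-the-envelope sketch of why broadcasting the conf once per node beats embedding it in every serialized task; only the ~10 KB figure comes from the comment, the task and node counts below are hypothetical:

    // Back-of-the-envelope cost of shipping a ~10 KB JobConf with every task
    // versus fetching it once per node via a broadcast variable.
    object ConfShippingSketch {
      def main(args: Array[String]) {
        val confBytes = 10 * 1024   // ~10 KB JobConf, per the comment in the hunk
        val numTasks  = 10000       // hypothetical number of tasks
        val numNodes  = 100         // hypothetical number of worker nodes

        val perTask   = confBytes.toLong * numTasks  // conf serialized into each task
        val broadcast = confBytes.toLong * numNodes  // conf pulled once per node

        println("embedded in tasks: " + perTask / (1024 * 1024) + " MB")
        println("broadcast to nodes: " + broadcast / 1024 + " KB")
      }
    }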
diff --git a/core/src/main/scala/spark/KryoSerializer.scala b/core/src/main/scala/spark/KryoSerializer.scala
index 65d0532bd5..8a3f565071 100644
--- a/core/src/main/scala/spark/KryoSerializer.scala
+++ b/core/src/main/scala/spark/KryoSerializer.scala
@@ -10,8 +10,10 @@ import scala.collection.mutable
import com.esotericsoftware.kryo._
import com.esotericsoftware.kryo.{Serializer => KSerializer}
import com.esotericsoftware.kryo.serialize.ClassSerializer
+import com.esotericsoftware.kryo.serialize.SerializableSerializer
import de.javakaffee.kryoserializers.KryoReflectionFactorySupport
+import spark.broadcast._
import spark.storage._
/**
@@ -203,6 +205,10 @@ class KryoSerializer extends Serializer with Logging {
kryo.register(classOf[Class[_]], new ClassSerializer(kryo))
kryo.setRegistrationOptional(true)
+ // Allow sending SerializableWritable
+ kryo.register(classOf[SerializableWritable[_]], new SerializableSerializer())
+ kryo.register(classOf[HttpBroadcast[_]], new SerializableSerializer())
+
// Register some commonly used Scala singleton objects. Because these
// are singletons, we must return the exact same local object when we
// deserialize rather than returning a clone as FieldSerializer would.
diff --git a/core/src/main/scala/spark/MapOutputTracker.scala b/core/src/main/scala/spark/MapOutputTracker.scala
index de23eb6f48..82c1391345 100644
--- a/core/src/main/scala/spark/MapOutputTracker.scala
+++ b/core/src/main/scala/spark/MapOutputTracker.scala
@@ -1,5 +1,6 @@
package spark
+import java.io.{DataInputStream, DataOutputStream, ByteArrayOutputStream, ByteArrayInputStream}
import java.util.concurrent.ConcurrentHashMap
import akka.actor._
@@ -10,6 +11,7 @@ import akka.util.Duration
import akka.util.Timeout
import akka.util.duration._
+import scala.collection.mutable.HashMap
import scala.collection.mutable.HashSet
import spark.storage.BlockManagerId
@@ -18,12 +20,11 @@ sealed trait MapOutputTrackerMessage
case class GetMapOutputLocations(shuffleId: Int) extends MapOutputTrackerMessage
case object StopMapOutputTracker extends MapOutputTrackerMessage
-class MapOutputTrackerActor(bmAddresses: ConcurrentHashMap[Int, Array[BlockManagerId]])
-extends Actor with Logging {
+class MapOutputTrackerActor(tracker: MapOutputTracker) extends Actor with Logging {
def receive = {
case GetMapOutputLocations(shuffleId: Int) =>
logInfo("Asked to get map output locations for shuffle " + shuffleId)
- sender ! bmAddresses.get(shuffleId)
+ sender ! tracker.getSerializedLocations(shuffleId)
case StopMapOutputTracker =>
logInfo("MapOutputTrackerActor stopped!")
@@ -39,15 +40,19 @@ class MapOutputTracker(actorSystem: ActorSystem, isMaster: Boolean) extends Logg
val timeout = 10.seconds
- private var bmAddresses = new ConcurrentHashMap[Int, Array[BlockManagerId]]
+ var bmAddresses = new ConcurrentHashMap[Int, Array[BlockManagerId]]
// Incremented every time a fetch fails so that client nodes know to clear
// their cache of map output locations if this happens.
private var generation: Long = 0
private var generationLock = new java.lang.Object
+ // Cache a serialized version of the output locations for each shuffle to send them out faster
+ var cacheGeneration = generation
+ val cachedSerializedLocs = new HashMap[Int, Array[Byte]]
+
var trackerActor: ActorRef = if (isMaster) {
- val actor = actorSystem.actorOf(Props(new MapOutputTrackerActor(bmAddresses)), name = actorName)
+ val actor = actorSystem.actorOf(Props(new MapOutputTrackerActor(this)), name = actorName)
logInfo("Registered MapOutputTrackerActor actor")
actor
} else {
@@ -134,15 +139,16 @@ class MapOutputTracker(actorSystem: ActorSystem, isMaster: Boolean) extends Logg
}
// We won the race to fetch the output locs; do so
logInfo("Doing the fetch; tracker actor = " + trackerActor)
- val fetched = askTracker(GetMapOutputLocations(shuffleId)).asInstanceOf[Array[BlockManagerId]]
+ val fetchedBytes = askTracker(GetMapOutputLocations(shuffleId)).asInstanceOf[Array[Byte]]
+ val fetchedLocs = deserializeLocations(fetchedBytes)
logInfo("Got the output locations")
- bmAddresses.put(shuffleId, fetched)
+ bmAddresses.put(shuffleId, fetchedLocs)
fetching.synchronized {
fetching -= shuffleId
fetching.notifyAll()
}
- return fetched
+ return fetchedLocs
} else {
return locs
}
@@ -181,4 +187,70 @@ class MapOutputTracker(actorSystem: ActorSystem, isMaster: Boolean) extends Logg
}
}
}
+
+ def getSerializedLocations(shuffleId: Int): Array[Byte] = {
+ var locs: Array[BlockManagerId] = null
+ var generationGotten: Long = -1
+ generationLock.synchronized {
+ if (generation > cacheGeneration) {
+ cachedSerializedLocs.clear()
+ cacheGeneration = generation
+ }
+ cachedSerializedLocs.get(shuffleId) match {
+ case Some(bytes) =>
+ return bytes
+ case None =>
+ locs = bmAddresses.get(shuffleId)
+ generationGotten = generation
+ }
+ }
+ // If we got here, we failed to find the serialized locations in the cache, so we pulled
+ // out a snapshot of the locations as "locs"; let's serialize and return that
+ val bytes = serializeLocations(locs)
+ // Add them into the table only if the generation hasn't changed while we were working
+ generationLock.synchronized {
+ if (generation == generationGotten) {
+ cachedSerializedLocs(shuffleId) = bytes
+ }
+ }
+ return bytes
+ }
+
+ // Serialize an array of map output locations into an efficient byte format so that we can send
+ // it to reduce tasks. We do this by grouping together the locations by block manager ID.
+ def serializeLocations(locs: Array[BlockManagerId]): Array[Byte] = {
+ val out = new ByteArrayOutputStream
+ val dataOut = new DataOutputStream(out)
+ dataOut.writeInt(locs.length)
+ val grouped = locs.zipWithIndex.groupBy(_._1)
+ dataOut.writeInt(grouped.size)
+ for ((id, pairs) <- grouped if id != null) {
+ dataOut.writeUTF(id.ip)
+ dataOut.writeInt(id.port)
+ dataOut.writeInt(pairs.length)
+ for ((_, blockIndex) <- pairs) {
+ dataOut.writeInt(blockIndex)
+ }
+ }
+ dataOut.close()
+ out.toByteArray
+ }
+
+ // Opposite of serializeLocations.
+ def deserializeLocations(bytes: Array[Byte]): Array[BlockManagerId] = {
+ val dataIn = new DataInputStream(new ByteArrayInputStream(bytes))
+ val length = dataIn.readInt()
+ val array = new Array[BlockManagerId](length)
+ val numGroups = dataIn.readInt()
+ for (i <- 0 until numGroups) {
+ val ip = dataIn.readUTF()
+ val port = dataIn.readInt()
+ val id = new BlockManagerId(ip, port)
+ val numBlocks = dataIn.readInt()
+ for (j <- 0 until numBlocks) {
+ array(dataIn.readInt()) = id
+ }
+ }
+ array
+ }
}
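
A self-contained sketch of the encoding used by serializeLocations/deserializeLocations above: locations are grouped by block manager so each (ip, port) pair is written once, followed by the map-output indices it covers. BMId below is a stand-in for spark.storage.BlockManagerId, introduced only to keep the sketch runnable on its own:

    import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataInputStream, DataOutputStream}

    // BMId is a stand-in for spark.storage.BlockManagerId, only for this sketch.
    case class BMId(ip: String, port: Int)

    object LocationEncodingSketch {
      // Same shape as MapOutputTracker.serializeLocations: write each distinct
      // (ip, port) once, followed by the map-output indices that live there.
      def encode(locs: Array[BMId]): Array[Byte] = {
        val out = new ByteArrayOutputStream
        val dataOut = new DataOutputStream(out)
        dataOut.writeInt(locs.length)
        val grouped = locs.zipWithIndex.groupBy(_._1)
        dataOut.writeInt(grouped.size)
        for ((id, pairs) <- grouped) {
          dataOut.writeUTF(id.ip)
          dataOut.writeInt(id.port)
          dataOut.writeInt(pairs.length)
          for ((_, blockIndex) <- pairs) {
            dataOut.writeInt(blockIndex)
          }
        }
        dataOut.close()
        out.toByteArray
      }

      // Opposite of encode, mirroring deserializeLocations.
      def decode(bytes: Array[Byte]): Array[BMId] = {
        val dataIn = new DataInputStream(new ByteArrayInputStream(bytes))
        val length = dataIn.readInt()
        val array = new Array[BMId](length)
        val numGroups = dataIn.readInt()
        for (i <- 0 until numGroups) {
          val ip = dataIn.readUTF()
          val port = dataIn.readInt()
          val id = BMId(ip, port)
          val numBlocks = dataIn.readInt()
          for (j <- 0 until numBlocks) {
            array(dataIn.readInt()) = id
          }
        }
        array
      }

      def main(args: Array[String]) {
        // Three map outputs on one node and one on another: the first node's
        // address is written once instead of three times.
        val locs = Array(BMId("10.0.0.1", 7077), BMId("10.0.0.1", 7077),
                         BMId("10.0.0.2", 7077), BMId("10.0.0.1", 7077))
        val bytes = encode(locs)
        assert(decode(bytes).toSeq == locs.toSeq)
        println("Encoded " + locs.length + " locations in " + bytes.length + " bytes")
      }
    }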
diff --git a/core/src/main/scala/spark/scheduler/ShuffleMapTask.scala b/core/src/main/scala/spark/scheduler/ShuffleMapTask.scala
index 73479bff01..f1eae9bc88 100644
--- a/core/src/main/scala/spark/scheduler/ShuffleMapTask.scala
+++ b/core/src/main/scala/spark/scheduler/ShuffleMapTask.scala
@@ -26,7 +26,8 @@ object ShuffleMapTask {
return old
} else {
val out = new ByteArrayOutputStream
- val objOut = new ObjectOutputStream(new GZIPOutputStream(out))
+ val ser = SparkEnv.get.closureSerializer.newInstance
+ val objOut = ser.serializeStream(new GZIPOutputStream(out))
objOut.writeObject(rdd)
objOut.writeObject(dep)
objOut.close()
@@ -45,10 +46,8 @@ object ShuffleMapTask {
} else {
val loader = Thread.currentThread.getContextClassLoader
val in = new GZIPInputStream(new ByteArrayInputStream(bytes))
- val objIn = new ObjectInputStream(in) {
- override def resolveClass(desc: ObjectStreamClass) =
- Class.forName(desc.getName, false, loader)
- }
+ val ser = SparkEnv.get.closureSerializer.newInstance
+ val objIn = ser.deserializeStream(in)
val rdd = objIn.readObject().asInstanceOf[RDD[_]]
val dep = objIn.readObject().asInstanceOf[ShuffleDependency[_,_,_]]
val tuple = (rdd, dep)
diff --git a/core/src/main/scala/spark/scheduler/cluster/ClusterScheduler.scala b/core/src/main/scala/spark/scheduler/cluster/ClusterScheduler.scala
index 5b59479682..20c82ad0fa 100644
--- a/core/src/main/scala/spark/scheduler/cluster/ClusterScheduler.scala
+++ b/core/src/main/scala/spark/scheduler/cluster/ClusterScheduler.scala
@@ -115,6 +115,7 @@ class ClusterScheduler(sc: SparkContext)
*/
def resourceOffers(offers: Seq[WorkerOffer]): Seq[Seq[TaskDescription]] = {
synchronized {
+ SparkEnv.set(sc.env)
// Mark each slave as alive and remember its hostname
for (o <- offers) {
slaveIdToHost(o.slaveId) = o.hostname
diff --git a/core/src/main/scala/spark/scheduler/cluster/StandaloneSchedulerBackend.scala b/core/src/main/scala/spark/scheduler/cluster/StandaloneSchedulerBackend.scala
index 83e7c6e036..978b4f2676 100644
--- a/core/src/main/scala/spark/scheduler/cluster/StandaloneSchedulerBackend.scala
+++ b/core/src/main/scala/spark/scheduler/cluster/StandaloneSchedulerBackend.scala
@@ -99,7 +99,7 @@ class StandaloneSchedulerBackend(scheduler: ClusterScheduler, actorSystem: Actor
// Remove a disconnected slave from the cluster
def removeSlave(slaveId: String) {
- logInfo("Slave " + slaveId + " disconnected, so removing it")
+ logWarning("Slave " + slaveId + " disconnected, so removing it")
val numCores = freeCores(slaveId)
actorToSlaveId -= slaveActor(slaveId)
addressToSlaveId -= slaveAddress(slaveId)
diff --git a/core/src/main/scala/spark/storage/BlockManager.scala b/core/src/main/scala/spark/storage/BlockManager.scala
index f2d9499bad..4cdb9710ec 100644
--- a/core/src/main/scala/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/spark/storage/BlockManager.scala
@@ -509,10 +509,15 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m
* Replicate block to another node.
*/
+ var firstTime = true
+ var peers : Seq[BlockManagerId] = null
private def replicate(blockId: String, data: ByteBuffer, level: StorageLevel) {
val tLevel: StorageLevel =
new StorageLevel(level.useDisk, level.useMemory, level.deserialized, 1)
- var peers = master.mustGetPeers(GetPeers(blockManagerId, level.replication - 1))
+ if (firstTime) {
+ peers = master.mustGetPeers(GetPeers(blockManagerId, level.replication - 1))
+ firstTime = false;
+ }
for (peer: BlockManagerId <- peers) {
val start = System.nanoTime
data.rewind()
diff --git a/core/src/main/scala/spark/storage/BlockMessage.scala b/core/src/main/scala/spark/storage/BlockMessage.scala
index 0b2ed69e07..607633c6df 100644
--- a/core/src/main/scala/spark/storage/BlockMessage.scala
+++ b/core/src/main/scala/spark/storage/BlockMessage.scala
@@ -12,7 +12,7 @@ case class GetBlock(id: String)
case class GotBlock(id: String, data: ByteBuffer)
case class PutBlock(id: String, data: ByteBuffer, level: StorageLevel)
-class BlockMessage() extends Logging{
+class BlockMessage() {
// Un-initialized: typ = 0
// GetBlock: typ = 1
// GotBlock: typ = 2
@@ -22,8 +22,6 @@ class BlockMessage() extends Logging{
private var data: ByteBuffer = null
private var level: StorageLevel = null
- initLogging()
-
def set(getBlock: GetBlock) {
typ = BlockMessage.TYPE_GET_BLOCK
id = getBlock.id
@@ -62,8 +60,6 @@ class BlockMessage() extends Logging{
}
id = idBuilder.toString()
- logDebug("Set from buffer Result: " + typ + " " + id)
- logDebug("Buffer position is " + buffer.position)
if (typ == BlockMessage.TYPE_PUT_BLOCK) {
val booleanInt = buffer.getInt()
@@ -77,23 +73,18 @@ class BlockMessage() extends Logging{
}
data.put(buffer)
data.flip()
- logDebug("Set from buffer Result 2: " + level + " " + data)
} else if (typ == BlockMessage.TYPE_GOT_BLOCK) {
val dataLength = buffer.getInt()
- logDebug("Data length is "+ dataLength)
- logDebug("Buffer position is " + buffer.position)
data = ByteBuffer.allocate(dataLength)
if (dataLength != buffer.remaining) {
throw new Exception("Error parsing buffer")
}
data.put(buffer)
data.flip()
- logDebug("Set from buffer Result 3: " + data)
}
val finishTime = System.currentTimeMillis
- logDebug("Converted " + id + " from bytebuffer in " + (finishTime - startTime) / 1000.0 + " s")
}
def set(bufferMsg: BufferMessage) {
@@ -145,8 +136,6 @@ class BlockMessage() extends Logging{
buffers += data
}
- logDebug("Start to log buffers.")
- buffers.foreach((x: ByteBuffer) => logDebug("" + x))
/*
println()
println("BlockMessage: ")
@@ -160,7 +149,6 @@ class BlockMessage() extends Logging{
println()
*/
val finishTime = System.currentTimeMillis
- logDebug("Converted " + id + " to buffer message in " + (finishTime - startTime) / 1000.0 + " s")
return Message.createBufferMessage(buffers)
}
diff --git a/core/src/main/scala/spark/util/AkkaUtils.scala b/core/src/main/scala/spark/util/AkkaUtils.scala
index 57d212e4ca..df4e23bfd6 100644
--- a/core/src/main/scala/spark/util/AkkaUtils.scala
+++ b/core/src/main/scala/spark/util/AkkaUtils.scala
@@ -31,6 +31,8 @@ object AkkaUtils {
akka.remote.netty.hostname = "%s"
akka.remote.netty.port = %d
akka.remote.netty.connection-timeout = 1s
+ akka.remote.netty.execution-pool-size = 8
+ akka.actor.default-dispatcher.throughput = 30
""".format(host, port))
val actorSystem = ActorSystem("spark", akkaConf, getClass.getClassLoader)
diff --git a/streaming/src/main/scala/spark/streaming/DStream.scala b/streaming/src/main/scala/spark/streaming/DStream.scala
index 9b0115eef6..3973ca1520 100644
--- a/streaming/src/main/scala/spark/streaming/DStream.scala
+++ b/streaming/src/main/scala/spark/streaming/DStream.scala
@@ -41,17 +41,17 @@ extends Logging with Serializable {
*/
// Variable to store the RDDs generated earlier in time
- @transient private val generatedRDDs = new HashMap[Time, RDD[T]] ()
+ @transient protected val generatedRDDs = new HashMap[Time, RDD[T]] ()
// Variable to be set to the first time seen by the DStream (effective time zero)
- private[streaming] var zeroTime: Time = null
+ protected[streaming] var zeroTime: Time = null
// Variable to specify storage level
- private var storageLevel: StorageLevel = StorageLevel.NONE
+ protected var storageLevel: StorageLevel = StorageLevel.NONE
// Checkpoint level and checkpoint interval
- private var checkpointLevel: StorageLevel = StorageLevel.NONE // NONE means don't checkpoint
- private var checkpointInterval: Time = null
+ protected var checkpointLevel: StorageLevel = StorageLevel.NONE // NONE means don't checkpoint
+ protected var checkpointInterval: Time = null
// Change this RDD's storage level
def persist(
@@ -84,7 +84,7 @@ extends Logging with Serializable {
* the validity of future times is calculated. This method also recursively initializes
* its parent DStreams.
*/
- def initialize(time: Time) {
+ protected[streaming] def initialize(time: Time) {
if (zeroTime == null) {
zeroTime = time
}
@@ -93,7 +93,7 @@ extends Logging with Serializable {
}
/** This method checks whether the 'time' is valid wrt slideTime for generating RDD */
- private def isTimeValid (time: Time): Boolean = {
+ protected def isTimeValid (time: Time): Boolean = {
if (!isInitialized) {
throw new Exception (this.toString + " has not been initialized")
} else if (time < zeroTime || ! (time - zeroTime).isMultipleOf(slideTime)) {
@@ -143,7 +143,7 @@ extends Logging with Serializable {
/**
* This method generates a SparkStreaming job for the given time
- * and may require to be overriden by subclasses
+ * and may need to be overridden by subclasses
*/
def generateJob(time: Time): Option[Job] = {
getOrCompute(time) match {
@@ -208,7 +208,7 @@ extends Logging with Serializable {
new TransformedDStream(this, ssc.sc.clean(transformFunc))
}
- private[streaming] def toQueue = {
+ def toBlockingQueue = {
val queue = new ArrayBlockingQueue[RDD[T]](10000)
this.foreachRDD(rdd => {
queue.add(rdd)
@@ -256,6 +256,28 @@ extends Logging with Serializable {
def union(that: DStream[T]) = new UnifiedDStream(Array(this, that))
+ def slice(interval: Interval): Seq[RDD[T]] = {
+ slice(interval.beginTime, interval.endTime)
+ }
+
+ // Get all the RDDs between fromTime and toTime (both included)
+ def slice(fromTime: Time, toTime: Time): Seq[RDD[T]] = {
+
+ val rdds = new ArrayBuffer[RDD[T]]()
+ var time = toTime.floor(slideTime)
+
+
+ while (time >= zeroTime && time >= fromTime) {
+ getOrCompute(time) match {
+ case Some(rdd) => rdds += rdd
+ case None => throw new Exception("Could not get old reduced RDD for time " + time)
+ }
+ time -= slideTime
+ }
+
+ rdds.toSeq
+ }
+
def register() {
ssc.registerOutputStream(this)
}
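
The new slice operation above is just time arithmetic over the cache of generated RDDs: align toTime down to a slide boundary and walk backwards in slideTime steps until fromTime (or zeroTime). A minimal self-contained sketch, with times as millisecond Longs and a Map standing in for generatedRDDs:

    import scala.collection.mutable.ArrayBuffer

    object SliceSketch {
      def main(args: Array[String]) {
        val slideTime = 2000L     // 2 second batches
        val zeroTime  = 0L
        // Stand-in for generatedRDDs: batch time -> "RDD"
        val generated = Map(2000L -> "rdd@2s", 4000L -> "rdd@4s",
                            6000L -> "rdd@6s", 8000L -> "rdd@8s")

        def floor(t: Long) = (t / slideTime) * slideTime

        def slice(fromTime: Long, toTime: Long): Seq[String] = {
          val rdds = new ArrayBuffer[String]()
          var time = floor(toTime)
          while (time >= zeroTime && time >= fromTime) {
            rdds += generated(time)   // the real code throws if an RDD is missing
            time -= slideTime
          }
          rdds
        }

        // Both endpoints included, newest first, matching the backwards walk above
        println(slice(4000L, 8000L))  // ArrayBuffer(rdd@8s, rdd@6s, rdd@4s)
      }
    }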
diff --git a/streaming/src/main/scala/spark/streaming/Interval.scala b/streaming/src/main/scala/spark/streaming/Interval.scala
index 87b8437b3d..ffb7725ac9 100644
--- a/streaming/src/main/scala/spark/streaming/Interval.scala
+++ b/streaming/src/main/scala/spark/streaming/Interval.scala
@@ -9,6 +9,10 @@ case class Interval(beginTime: Time, endTime: Time) {
new Interval(beginTime + time, endTime + time)
}
+ def - (time: Time): Interval = {
+ new Interval(beginTime - time, endTime - time)
+ }
+
def < (that: Interval): Boolean = {
if (this.duration != that.duration) {
throw new Exception("Comparing two intervals with different durations [" + this + ", " + that + "]")
diff --git a/streaming/src/main/scala/spark/streaming/JobManager.scala b/streaming/src/main/scala/spark/streaming/JobManager.scala
index 9bf9251519..230d806a89 100644
--- a/streaming/src/main/scala/spark/streaming/JobManager.scala
+++ b/streaming/src/main/scala/spark/streaming/JobManager.scala
@@ -12,7 +12,7 @@ class JobManager(ssc: StreamingContext, numThreads: Int = 1) extends Logging {
SparkEnv.set(ssc.env)
try {
val timeTaken = job.run()
- logInfo("Total delay: %.5f s for job %s (execution: %.5f s)".format(
+ println("Total delay: %.5f s for job %s (execution: %.5f s)".format(
(System.currentTimeMillis() - job.time) / 1000.0, job.id, timeTaken / 1000.0))
} catch {
case e: Exception =>
diff --git a/streaming/src/main/scala/spark/streaming/NetworkInputTracker.scala b/streaming/src/main/scala/spark/streaming/NetworkInputTracker.scala
index acf97c1883..9f9001e4d5 100644
--- a/streaming/src/main/scala/spark/streaming/NetworkInputTracker.scala
+++ b/streaming/src/main/scala/spark/streaming/NetworkInputTracker.scala
@@ -4,6 +4,7 @@ import spark.Logging
import spark.SparkEnv
import scala.collection.mutable.HashMap
+import scala.collection.mutable.Queue
import akka.actor._
import akka.pattern.ask
@@ -28,6 +29,17 @@ extends Logging {
logInfo("Registered receiver for network stream " + streamId)
sender ! true
}
+ case GotBlockIds(streamId, blockIds) => {
+ val tmp = receivedBlockIds.synchronized {
+ if (!receivedBlockIds.contains(streamId)) {
+ receivedBlockIds += ((streamId, new Queue[String]))
+ }
+ receivedBlockIds(streamId)
+ }
+ tmp.synchronized {
+ tmp ++= blockIds
+ }
+ }
}
}
@@ -69,8 +81,8 @@ extends Logging {
val networkInputStreamIds = networkInputStreams.map(_.id).toArray
val receiverExecutor = new ReceiverExecutor()
val receiverInfo = new HashMap[Int, ActorRef]
- val receivedBlockIds = new HashMap[Int, Array[String]]
- val timeout = 1000.milliseconds
+ val receivedBlockIds = new HashMap[Int, Queue[String]]
+ val timeout = 5000.milliseconds
var currentTime: Time = null
@@ -86,22 +98,12 @@ extends Logging {
}
def getBlockIds(receiverId: Int, time: Time): Array[String] = synchronized {
- if (currentTime == null || time > currentTime) {
- logInfo("Getting block ids from receivers for " + time)
- implicit val ec = ssc.env.actorSystem.dispatcher
- receivedBlockIds.clear()
- val message = new GetBlockIds(time)
- val listOfFutures = receiverInfo.values.map(
- _.ask(message)(timeout).mapTo[GotBlockIds]
- ).toList
- val futureOfList = Future.sequence(listOfFutures)
- val allBlockIds = Await.result(futureOfList, timeout)
- receivedBlockIds ++= allBlockIds.map(x => (x.streamId, x.blocksIds))
- if (receivedBlockIds.size != receiverInfo.size) {
- throw new Exception("Unexpected number of the Block IDs received")
- }
- currentTime = time
+ val queue = receivedBlockIds.synchronized {
+ receivedBlockIds.getOrElse(receiverId, new Queue[String]())
+ }
+ val result = queue.synchronized {
+ queue.dequeueAll(x => true)
}
- receivedBlockIds.getOrElse(receiverId, Array[String]())
+ result.toArray
}
-}
\ No newline at end of file
+}
diff --git a/streaming/src/main/scala/spark/streaming/PairDStreamFunctions.scala b/streaming/src/main/scala/spark/streaming/PairDStreamFunctions.scala
index 13db34ac80..3fd0a16bf0 100644
--- a/streaming/src/main/scala/spark/streaming/PairDStreamFunctions.scala
+++ b/streaming/src/main/scala/spark/streaming/PairDStreamFunctions.scala
@@ -19,32 +19,32 @@ extends Serializable {
/* DStream operations for key-value pairs */
/* ---------------------------------- */
- def groupByKey(): ShuffledDStream[K, V, ArrayBuffer[V]] = {
+ def groupByKey(): DStream[(K, Seq[V])] = {
groupByKey(defaultPartitioner())
}
- def groupByKey(numPartitions: Int): ShuffledDStream[K, V, ArrayBuffer[V]] = {
+ def groupByKey(numPartitions: Int): DStream[(K, Seq[V])] = {
groupByKey(defaultPartitioner(numPartitions))
}
- def groupByKey(partitioner: Partitioner): ShuffledDStream[K, V, ArrayBuffer[V]] = {
+ def groupByKey(partitioner: Partitioner): DStream[(K, Seq[V])] = {
def createCombiner(v: V) = ArrayBuffer[V](v)
def mergeValue(c: ArrayBuffer[V], v: V) = (c += v)
def mergeCombiner(c1: ArrayBuffer[V], c2: ArrayBuffer[V]) = (c1 ++ c2)
- combineByKey[ArrayBuffer[V]](createCombiner _, mergeValue _, mergeCombiner _, partitioner)
+ combineByKey(createCombiner _, mergeValue _, mergeCombiner _, partitioner).asInstanceOf[DStream[(K, Seq[V])]]
}
- def reduceByKey(reduceFunc: (V, V) => V): ShuffledDStream[K, V, V] = {
+ def reduceByKey(reduceFunc: (V, V) => V): DStream[(K, V)] = {
reduceByKey(reduceFunc, defaultPartitioner())
}
- def reduceByKey(reduceFunc: (V, V) => V, numPartitions: Int): ShuffledDStream[K, V, V] = {
+ def reduceByKey(reduceFunc: (V, V) => V, numPartitions: Int): DStream[(K, V)] = {
reduceByKey(reduceFunc, defaultPartitioner(numPartitions))
}
- def reduceByKey(reduceFunc: (V, V) => V, partitioner: Partitioner): ShuffledDStream[K, V, V] = {
+ def reduceByKey(reduceFunc: (V, V) => V, partitioner: Partitioner): DStream[(K, V)] = {
val cleanedReduceFunc = ssc.sc.clean(reduceFunc)
- combineByKey[V]((v: V) => v, cleanedReduceFunc, cleanedReduceFunc, partitioner)
+ combineByKey((v: V) => v, cleanedReduceFunc, cleanedReduceFunc, partitioner)
}
private def combineByKey[C: ClassManifest](
@@ -55,11 +55,15 @@ extends Serializable {
new ShuffledDStream[K, V, C](stream, createCombiner, mergeValue, mergeCombiner, partitioner)
}
- def groupByKeyAndWindow(windowTime: Time, slideTime: Time): ShuffledDStream[K, V, ArrayBuffer[V]] = {
+ def groupByKeyAndWindow(windowTime: Time, slideTime: Time): DStream[(K, Seq[V])] = {
groupByKeyAndWindow(windowTime, slideTime, defaultPartitioner())
}
- def groupByKeyAndWindow(windowTime: Time, slideTime: Time, numPartitions: Int): ShuffledDStream[K, V, ArrayBuffer[V]] = {
+ def groupByKeyAndWindow(
+ windowTime: Time,
+ slideTime: Time,
+ numPartitions: Int
+ ): DStream[(K, Seq[V])] = {
groupByKeyAndWindow(windowTime, slideTime, defaultPartitioner(numPartitions))
}
@@ -67,15 +71,24 @@ extends Serializable {
windowTime: Time,
slideTime: Time,
partitioner: Partitioner
- ): ShuffledDStream[K, V, ArrayBuffer[V]] = {
+ ): DStream[(K, Seq[V])] = {
stream.window(windowTime, slideTime).groupByKey(partitioner)
}
- def reduceByKeyAndWindow(reduceFunc: (V, V) => V, windowTime: Time, slideTime: Time): ShuffledDStream[K, V, V] = {
+ def reduceByKeyAndWindow(
+ reduceFunc: (V, V) => V,
+ windowTime: Time,
+ slideTime: Time
+ ): DStream[(K, V)] = {
reduceByKeyAndWindow(reduceFunc, windowTime, slideTime, defaultPartitioner())
}
- def reduceByKeyAndWindow(reduceFunc: (V, V) => V, windowTime: Time, slideTime: Time, numPartitions: Int): ShuffledDStream[K, V, V] = {
+ def reduceByKeyAndWindow(
+ reduceFunc: (V, V) => V,
+ windowTime: Time,
+ slideTime: Time,
+ numPartitions: Int
+ ): DStream[(K, V)] = {
reduceByKeyAndWindow(reduceFunc, windowTime, slideTime, defaultPartitioner(numPartitions))
}
@@ -84,7 +97,7 @@ extends Serializable {
windowTime: Time,
slideTime: Time,
partitioner: Partitioner
- ): ShuffledDStream[K, V, V] = {
+ ): DStream[(K, V)] = {
stream.window(windowTime, slideTime).reduceByKey(ssc.sc.clean(reduceFunc), partitioner)
}
@@ -93,12 +106,13 @@ extends Serializable {
// so that new elements introduced in the window can be "added" using
// reduceFunc to the previous window's result and old elements can be
// "subtracted using invReduceFunc.
+
def reduceByKeyAndWindow(
reduceFunc: (V, V) => V,
invReduceFunc: (V, V) => V,
windowTime: Time,
slideTime: Time
- ): ReducedWindowedDStream[K, V] = {
+ ): DStream[(K, V)] = {
reduceByKeyAndWindow(
reduceFunc, invReduceFunc, windowTime, slideTime, defaultPartitioner())
@@ -110,7 +124,7 @@ extends Serializable {
windowTime: Time,
slideTime: Time,
numPartitions: Int
- ): ReducedWindowedDStream[K, V] = {
+ ): DStream[(K, V)] = {
reduceByKeyAndWindow(
reduceFunc, invReduceFunc, windowTime, slideTime, defaultPartitioner(numPartitions))
@@ -122,7 +136,7 @@ extends Serializable {
windowTime: Time,
slideTime: Time,
partitioner: Partitioner
- ): ReducedWindowedDStream[K, V] = {
+ ): DStream[(K, V)] = {
val cleanedReduceFunc = ssc.sc.clean(reduceFunc)
val cleanedInvReduceFunc = ssc.sc.clean(invReduceFunc)
@@ -137,21 +151,21 @@ extends Serializable {
//
def updateStateByKey[S <: AnyRef : ClassManifest](
updateFunc: (Seq[V], S) => S
- ): StateDStream[K, V, S] = {
+ ): DStream[(K, S)] = {
updateStateByKey(updateFunc, defaultPartitioner())
}
def updateStateByKey[S <: AnyRef : ClassManifest](
updateFunc: (Seq[V], S) => S,
numPartitions: Int
- ): StateDStream[K, V, S] = {
+ ): DStream[(K, S)] = {
updateStateByKey(updateFunc, defaultPartitioner(numPartitions))
}
def updateStateByKey[S <: AnyRef : ClassManifest](
updateFunc: (Seq[V], S) => S,
partitioner: Partitioner
- ): StateDStream[K, V, S] = {
+ ): DStream[(K, S)] = {
val func = (iterator: Iterator[(K, Seq[V], S)]) => {
iterator.map(tuple => (tuple._1, updateFunc(tuple._2, tuple._3)))
}
@@ -162,7 +176,7 @@ extends Serializable {
updateFunc: (Iterator[(K, Seq[V], S)]) => Iterator[(K, S)],
partitioner: Partitioner,
rememberPartitioner: Boolean
- ): StateDStream[K, V, S] = {
+ ): DStream[(K, S)] = {
new StateDStream(stream, ssc.sc.clean(updateFunc), partitioner, rememberPartitioner)
}
}
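
The reduceByKeyAndWindow variant that takes an invReduceFunc (used with add/subtract in the WordCount examples later in this patch) computes each window from the previous one rather than re-reducing the whole window: subtract the batches that slid out, add the batches that slid in. A self-contained sketch of that arithmetic for a single key's counts:

    object IncrementalWindowSketch {
      def add(v1: Long, v2: Long) = v1 + v2
      def subtract(v1: Long, v2: Long) = v1 - v2

      def main(args: Array[String]) {
        // Per-batch counts for one key; window = 3 batches, slide = 1 batch.
        val batches = Seq(4L, 1L, 3L, 5L, 2L)

        // Naive: re-reduce every window from scratch.
        val naive = (0 to batches.size - 3).map(i => batches.slice(i, i + 3).reduce(add))

        // Incremental: previous window minus the batch that left, plus the batch that entered.
        var window = batches.slice(0, 3).reduce(add)
        var incremental = List(window)
        for (i <- 3 until batches.size) {
          window = add(subtract(window, batches(i - 3)), batches(i))
          incremental = window :: incremental
        }

        println(naive)                // Vector(8, 9, 10)
        println(incremental.reverse)  // List(8, 9, 10)
      }
    }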
diff --git a/streaming/src/main/scala/spark/streaming/QueueInputDStream.scala b/streaming/src/main/scala/spark/streaming/QueueInputDStream.scala
index f6b53fe2f2..b794159b09 100644
--- a/streaming/src/main/scala/spark/streaming/QueueInputDStream.scala
+++ b/streaming/src/main/scala/spark/streaming/QueueInputDStream.scala
@@ -7,7 +7,7 @@ import scala.collection.mutable.Queue
import scala.collection.mutable.ArrayBuffer
class QueueInputDStream[T: ClassManifest](
- ssc: StreamingContext,
+ @transient ssc: StreamingContext,
val queue: Queue[RDD[T]],
oneAtATime: Boolean,
defaultRDD: RDD[T]
diff --git a/streaming/src/main/scala/spark/streaming/RawInputDStream.scala b/streaming/src/main/scala/spark/streaming/RawInputDStream.scala
index d59c245a23..d29aea7886 100644
--- a/streaming/src/main/scala/spark/streaming/RawInputDStream.scala
+++ b/streaming/src/main/scala/spark/streaming/RawInputDStream.scala
@@ -86,14 +86,15 @@ class RawInputDStream[T: ClassManifest](
private class ReceiverActor(env: SparkEnv, receivingThread: Thread) extends Actor {
val newBlocks = new ArrayBuffer[String]
+ logInfo("Attempting to register with tracker")
+ val ip = System.getProperty("spark.master.host", "localhost")
+ val port = System.getProperty("spark.master.port", "7077").toInt
+ val actorName: String = "NetworkInputTracker"
+ val url = "akka://spark@%s:%s/user/%s".format(ip, port, actorName)
+ val trackerActor = env.actorSystem.actorFor(url)
+ val timeout = 5.seconds
+
override def preStart() {
- logInfo("Attempting to register with tracker")
- val ip = System.getProperty("spark.master.host", "localhost")
- val port = System.getProperty("spark.master.port", "7077").toInt
- val actorName: String = "NetworkInputTracker"
- val url = "akka://spark@%s:%s/user/%s".format(ip, port, actorName)
- val trackerActor = env.actorSystem.actorFor(url)
- val timeout = 1.seconds
val future = trackerActor.ask(RegisterReceiver(streamId, self))(timeout)
Await.result(future, timeout)
}
@@ -101,6 +102,7 @@ class RawInputDStream[T: ClassManifest](
override def receive = {
case BlockPublished(blockId) =>
newBlocks += blockId
+ val future = trackerActor ! GotBlockIds(streamId, Array(blockId))
case GetBlockIds(time) =>
logInfo("Got request for block IDs for " + time)
@@ -111,5 +113,6 @@ class RawInputDStream[T: ClassManifest](
receivingThread.interrupt()
sender ! true
}
+
}
}
diff --git a/streaming/src/main/scala/spark/streaming/ReducedWindowedDStream.scala b/streaming/src/main/scala/spark/streaming/ReducedWindowedDStream.scala
index 191d264b2b..b0beaba94d 100644
--- a/streaming/src/main/scala/spark/streaming/ReducedWindowedDStream.scala
+++ b/streaming/src/main/scala/spark/streaming/ReducedWindowedDStream.scala
@@ -12,7 +12,7 @@ import spark.storage.StorageLevel
import scala.collection.mutable.ArrayBuffer
class ReducedWindowedDStream[K: ClassManifest, V: ClassManifest](
- parent: DStream[(K, V)],
+ @transient parent: DStream[(K, V)],
reduceFunc: (V, V) => V,
invReduceFunc: (V, V) => V,
_windowTime: Time,
@@ -28,9 +28,7 @@ class ReducedWindowedDStream[K: ClassManifest, V: ClassManifest](
throw new Exception("The slide duration of ReducedWindowedDStream (" + _slideTime + ") " +
"must be multiple of the slide duration of parent DStream (" + parent.slideTime + ")")
- val reducedStream = parent.reduceByKey(reduceFunc, partitioner)
- val allowPartialWindows = true
- //reducedStream.persist(StorageLevel.MEMORY_ONLY_DESER_2)
+ @transient val reducedStream = parent.reduceByKey(reduceFunc, partitioner)
override def dependencies = List(reducedStream)
@@ -44,174 +42,95 @@ class ReducedWindowedDStream[K: ClassManifest, V: ClassManifest](
checkpointInterval: Time): DStream[(K,V)] = {
super.persist(storageLevel, checkpointLevel, checkpointInterval)
reducedStream.persist(storageLevel, checkpointLevel, checkpointInterval)
+ this
}
-
+
override def compute(validTime: Time): Option[RDD[(K, V)]] = {
-
- // Notation:
+ val currentTime = validTime
+ val currentWindow = Interval(currentTime - windowTime + parent.slideTime, currentTime)
+ val previousWindow = currentWindow - slideTime
+
+ logDebug("Window time = " + windowTime)
+ logDebug("Slide time = " + slideTime)
+ logDebug("ZeroTime = " + zeroTime)
+ logDebug("Current window = " + currentWindow)
+ logDebug("Previous window = " + previousWindow)
+
// _____________________________
- // | previous window _________|___________________
- // |___________________| current window | --------------> Time
+ // | previous window _________|___________________
+ // |___________________| current window | --------------> Time
// |_____________________________|
- //
+ //
// |________ _________| |________ _________|
// | |
// V V
- // old time steps new time steps
+ // old RDDs new RDDs
//
- def getAdjustedWindow(endTime: Time, windowTime: Time): Interval = {
- val beginTime =
- if (allowPartialWindows && endTime - windowTime < parent.zeroTime) {
- parent.zeroTime
- } else {
- endTime - windowTime
- }
- Interval(beginTime, endTime)
- }
-
- val currentTime = validTime
- val currentWindow = getAdjustedWindow(currentTime, windowTime)
- val previousWindow = getAdjustedWindow(currentTime - slideTime, windowTime)
-
- logInfo("Current window = " + currentWindow)
- logInfo("Slide time = " + slideTime)
- logInfo("Previous window = " + previousWindow)
- logInfo("Parent.zeroTime = " + parent.zeroTime)
-
- if (allowPartialWindows) {
- if (currentTime - slideTime <= parent.zeroTime) {
- reducedStream.getOrCompute(currentTime) match {
- case Some(rdd) => return Some(rdd)
- case None => throw new Exception("Could not get first reduced RDD for time " + currentTime)
- }
- }
- } else {
- if (previousWindow.beginTime < parent.zeroTime) {
- if (currentWindow.beginTime < parent.zeroTime) {
- return None
- } else {
- // If this is the first feasible window, then generate reduced value in the naive manner
- val reducedRDDs = new ArrayBuffer[RDD[(K, V)]]()
- var t = currentWindow.endTime
- while (t > currentWindow.beginTime) {
- reducedStream.getOrCompute(t) match {
- case Some(rdd) => reducedRDDs += rdd
- case None => throw new Exception("Could not get reduced RDD for time " + t)
- }
- t -= reducedStream.slideTime
- }
- if (reducedRDDs.size == 0) {
- throw new Exception("Could not generate the first RDD for time " + validTime)
- }
- return Some(new UnionRDD(ssc.sc, reducedRDDs).reduceByKey(partitioner, reduceFunc))
- }
- }
- }
-
- // Get the RDD of the reduced value of the previous window
- val previousWindowRDD = getOrCompute(previousWindow.endTime) match {
- case Some(rdd) => rdd.asInstanceOf[RDD[(_, _)]]
- case None => throw new Exception("Could not get previous RDD for time " + previousWindow.endTime)
- }
- val oldRDDs = new ArrayBuffer[RDD[(_, _)]]()
- val newRDDs = new ArrayBuffer[RDD[(_, _)]]()
-
// Get the RDDs of the reduced values in "old time steps"
- var t = currentWindow.beginTime
- while (t > previousWindow.beginTime) {
- reducedStream.getOrCompute(t) match {
- case Some(rdd) => oldRDDs += rdd.asInstanceOf[RDD[(_, _)]]
- case None => throw new Exception("Could not get old reduced RDD for time " + t)
- }
- t -= reducedStream.slideTime
- }
+ val oldRDDs = reducedStream.slice(previousWindow.beginTime, currentWindow.beginTime - parent.slideTime)
+ logDebug("# old RDDs = " + oldRDDs.size)
// Get the RDDs of the reduced values in "new time steps"
- t = currentWindow.endTime
- while (t > previousWindow.endTime) {
- reducedStream.getOrCompute(t) match {
- case Some(rdd) => newRDDs += rdd.asInstanceOf[RDD[(_, _)]]
- case None => throw new Exception("Could not get new reduced RDD for time " + t)
- }
- t -= reducedStream.slideTime
+ val newRDDs = reducedStream.slice(previousWindow.endTime + parent.slideTime, currentWindow.endTime)
+ logDebug("# new RDDs = " + newRDDs.size)
+
+ // Get the RDD of the reduced value of the previous window
+ val previousWindowRDD = getOrCompute(previousWindow.endTime).getOrElse(ssc.sc.makeRDD(Seq[(K,V)]()))
+
+ // Make the list of RDDs that need to be cogrouped together for merging their reduced values
+ val allRDDs = new ArrayBuffer[RDD[(K, V)]]() += previousWindowRDD ++= oldRDDs ++= newRDDs
+
+ // Cogroup the reduced RDDs and merge the reduced values
+ val cogroupedRDD = new CoGroupedRDD[K](allRDDs.toSeq.asInstanceOf[Seq[RDD[(_, _)]]], partitioner)
+ val mergeValuesFunc = mergeValues(oldRDDs.size, newRDDs.size) _
+ val mergedValuesRDD = cogroupedRDD.asInstanceOf[RDD[(K,Seq[Seq[V]])]].mapValues(mergeValuesFunc)
+
+ Some(mergedValuesRDD)
+ }
+
+ def mergeValues(numOldValues: Int, numNewValues: Int)(seqOfValues: Seq[Seq[V]]): V = {
+
+ if (seqOfValues.size != 1 + numOldValues + numNewValues) {
+ throw new Exception("Unexpected number of sequences of reduced values")
}
- val allRDDs = new ArrayBuffer[RDD[(_, _)]]()
- allRDDs += previousWindowRDD
- allRDDs ++= oldRDDs
- allRDDs ++= newRDDs
-
-
- val numOldRDDs = oldRDDs.size
- val numNewRDDs = newRDDs.size
- logInfo("Generated numOldRDDs = " + numOldRDDs + ", numNewRDDs = " + numNewRDDs)
- logInfo("Generating CoGroupedRDD with " + allRDDs.size + " RDDs")
- val newRDD = new CoGroupedRDD[K](allRDDs.toSeq, partitioner).asInstanceOf[RDD[(K,Seq[Seq[V]])]].map(x => {
- val (key, value) = x
- logDebug("value.size = " + value.size + ", numOldRDDs = " + numOldRDDs + ", numNewRDDs = " + numNewRDDs)
- if (value.size != 1 + numOldRDDs + numNewRDDs) {
- throw new Exception("Number of groups not odd!")
- }
+ // Getting reduced values "old time steps" that will be removed from current window
+ val oldValues = (1 to numOldValues).map(i => seqOfValues(i)).filter(!_.isEmpty).map(_.head)
+
+ // Getting reduced values "new time steps"
+ val newValues = (1 to numNewValues).map(i => seqOfValues(numOldValues + i)).filter(!_.isEmpty).map(_.head)
- // old values = reduced values of the "old time steps" that are eliminated from current window
- // new values = reduced values of the "new time steps" that are introduced to the current window
- // previous value = reduced value of the previous window
-
- /*val numOldValues = (value.size - 1) / 2*/
- // Getting reduced values "old time steps"
- val oldValues =
- (0 until numOldRDDs).map(i => value(1 + i)).filter(_.size > 0).map(x => x(0))
- // Getting reduced values "new time steps"
- val newValues =
- (0 until numNewRDDs).map(i => value(1 + numOldRDDs + i)).filter(_.size > 0).map(x => x(0))
-
- // If reduced value for the key does not exist in previous window, it should not exist in "old time steps"
- if (value(0).size == 0 && oldValues.size != 0) {
- throw new Exception("Unexpected: Key exists in old reduced values but not in previous reduced values")
+ if (seqOfValues(0).isEmpty) {
+
+ // If previous window's reduce value does not exist, then at least new values should exist
+ if (newValues.isEmpty) {
+ throw new Exception("Neither previous window has value for key, nor new values found")
}
- // For the key, at least one of "old time steps", "new time steps" and previous window should have reduced values
- if (value(0).size == 0 && oldValues.size == 0 && newValues.size == 0) {
- throw new Exception("Unexpected: Key does not exist in any of old, new, or previour reduced values")
+ // Reduce the new values
+ // println("new values = " + newValues.map(_.toString).reduce(_ + " " + _))
+ return newValues.reduce(reduceFunc)
+ } else {
+
+ // Get the previous window's reduced value
+ var tempValue = seqOfValues(0).head
+
+ // If old values exist, then inverse-reduce them from the previous value
+ if (!oldValues.isEmpty) {
+ // println("old values = " + oldValues.map(_.toString).reduce(_ + " " + _))
+ tempValue = invReduceFunc(tempValue, oldValues.reduce(reduceFunc))
}
- // Logic to generate the final reduced value for current window:
- //
- // If previous window did not have reduced value for the key
- // Then, return reduced value of "new time steps" as the final value
- // Else, reduced value exists in previous window
- // If "old" time steps did not have reduced value for the key
- // Then, reduce previous window's reduced value with that of "new time steps" for final value
- // Else, reduced values exists in "old time steps"
- // If "new values" did not have reduced value for the key
- // Then, inverse-reduce "old values" from previous window's reduced value for final value
- // Else, all 3 values exist, combine all of them together
- //
- logDebug("# old values = " + oldValues.size + ", # new values = " + newValues)
- val finalValue = {
- if (value(0).size == 0) {
- newValues.reduce(reduceFunc)
- } else {
- val prevValue = value(0)(0)
- logDebug("prev value = " + prevValue)
- if (oldValues.size == 0) {
- // assuming newValue.size > 0 (all 3 cannot be zero, as checked earlier)
- val temp = newValues.reduce(reduceFunc)
- reduceFunc(prevValue, temp)
- } else if (newValues.size == 0) {
- invReduceFunc(prevValue, oldValues.reduce(reduceFunc))
- } else {
- val tempValue = invReduceFunc(prevValue, oldValues.reduce(reduceFunc))
- reduceFunc(tempValue, newValues.reduce(reduceFunc))
- }
- }
+ // If new values exist, then reduce them with the previous value
+ if (!newValues.isEmpty) {
+ // println("new values = " + newValues.map(_.toString).reduce(_ + " " + _))
+ tempValue = reduceFunc(tempValue, newValues.reduce(reduceFunc))
}
- (key, finalValue)
- })
- //newRDD.persist(StorageLevel.MEMORY_ONLY_DESER_2)
- Some(newRDD)
+ // println("final value = " + tempValue)
+ return tempValue
+ }
}
}
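
A worked, self-contained example of the mergeValues step above. After cogrouping, every key sees Seq(previous window's value) ++ values from the old batches ++ values from the new batches, and the merged result is reduce(invReduce(previous, reduce(old)), reduce(new)), falling back to reduce(new) when the key was absent from the previous window. Counts with + and - serve as the invertible reduce functions here:

    object MergeValuesSketch {
      val reduceFunc    = (v1: Long, v2: Long) => v1 + v2
      val invReduceFunc = (v1: Long, v2: Long) => v1 - v2

      // Mirrors ReducedWindowedDStream.mergeValues for a single key.
      def merge(numOld: Int, numNew: Int)(seqOfValues: Seq[Seq[Long]]): Long = {
        require(seqOfValues.size == 1 + numOld + numNew)
        val oldValues = (1 to numOld).map(i => seqOfValues(i)).filter(!_.isEmpty).map(_.head)
        val newValues = (1 to numNew).map(i => seqOfValues(numOld + i)).filter(!_.isEmpty).map(_.head)
        if (seqOfValues(0).isEmpty) {
          newValues.reduce(reduceFunc)             // key is new in this window
        } else {
          var temp = seqOfValues(0).head           // previous window's value
          if (!oldValues.isEmpty) temp = invReduceFunc(temp, oldValues.reduce(reduceFunc))
          if (!newValues.isEmpty) temp = reduceFunc(temp, newValues.reduce(reduceFunc))
          temp
        }
      }

      def main(args: Array[String]) {
        // Previous window counted 8; a batch worth 4 slid out, a batch worth 5 slid in.
        println(merge(1, 1)(Seq(Seq(8L), Seq(4L), Seq(5L))))      // 8 - 4 + 5 = 9
        // Key absent from the previous window: only the new batches contribute.
        println(merge(1, 2)(Seq(Seq(), Seq(), Seq(3L), Seq(2L)))) // 3 + 2 = 5
      }
    }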
diff --git a/streaming/src/main/scala/spark/streaming/Scheduler.scala b/streaming/src/main/scala/spark/streaming/Scheduler.scala
index b4b8e34ec8..d2e907378d 100644
--- a/streaming/src/main/scala/spark/streaming/Scheduler.scala
+++ b/streaming/src/main/scala/spark/streaming/Scheduler.scala
@@ -39,8 +39,8 @@ extends Logging {
}
def generateRDDs (time: Time) {
- println("\n-----------------------------------------------------\n")
SparkEnv.set(ssc.env)
+ logInfo("\n-----------------------------------------------------\n")
logInfo("Generating RDDs for time " + time)
outputStreams.foreach(outputStream => {
outputStream.generateJob(time) match {
diff --git a/streaming/src/main/scala/spark/streaming/StateDStream.scala b/streaming/src/main/scala/spark/streaming/StateDStream.scala
index 9d3561b4a0..72b71d5fab 100644
--- a/streaming/src/main/scala/spark/streaming/StateDStream.scala
+++ b/streaming/src/main/scala/spark/streaming/StateDStream.scala
@@ -1,10 +1,11 @@
package spark.streaming
import spark.RDD
+import spark.BlockRDD
import spark.Partitioner
import spark.MapPartitionsRDD
import spark.SparkContext._
-
+import spark.storage.StorageLevel
class StateDStream[K: ClassManifest, V: ClassManifest, S <: AnyRef : ClassManifest](
@transient parent: DStream[(K, V)],
@@ -22,6 +23,47 @@ class StateDStream[K: ClassManifest, V: ClassManifest, S <: AnyRef : ClassManife
override def slideTime = parent.slideTime
+ override def getOrCompute(time: Time): Option[RDD[(K, S)]] = {
+ generatedRDDs.get(time) match {
+ case Some(oldRDD) => {
+ if (checkpointInterval != null && (time - zeroTime).isMultipleOf(checkpointInterval) && oldRDD.dependencies.size > 0) {
+ val r = oldRDD
+ val oldRDDBlockIds = oldRDD.splits.map(s => "rdd:" + r.id + ":" + s.index)
+ val checkpointedRDD = new BlockRDD[(K, S)](ssc.sc, oldRDDBlockIds) {
+ override val partitioner = oldRDD.partitioner
+ }
+ generatedRDDs.update(time, checkpointedRDD)
+ logInfo("Updated RDD of time " + time + " with its checkpointed version")
+ Some(checkpointedRDD)
+ } else {
+ Some(oldRDD)
+ }
+ }
+ case None => {
+ if (isTimeValid(time)) {
+ compute(time) match {
+ case Some(newRDD) => {
+ if (checkpointInterval != null && (time - zeroTime).isMultipleOf(checkpointInterval)) {
+ newRDD.persist(checkpointLevel)
+ logInfo("Persisting " + newRDD + " to " + checkpointLevel + " at time " + time)
+ } else if (storageLevel != StorageLevel.NONE) {
+ newRDD.persist(storageLevel)
+ logInfo("Persisting " + newRDD + " to " + storageLevel + " at time " + time)
+ }
+ generatedRDDs.put(time, newRDD)
+ Some(newRDD)
+ }
+ case None => {
+ None
+ }
+ }
+ } else {
+ None
+ }
+ }
+ }
+ }
+
override def compute(validTime: Time): Option[RDD[(K, S)]] = {
// Try to get the previous state RDD
@@ -29,26 +71,27 @@ class StateDStream[K: ClassManifest, V: ClassManifest, S <: AnyRef : ClassManife
case Some(prevStateRDD) => { // If previous state RDD exists
- // Define the function for the mapPartition operation on cogrouped RDD;
- // first map the cogrouped tuple to tuples of required type,
- // and then apply the update function
- val func = (iterator: Iterator[(K, (Seq[V], Seq[S]))]) => {
- val i = iterator.map(t => {
- (t._1, t._2._1, t._2._2.headOption.getOrElse(null.asInstanceOf[S]))
- })
- updateFunc(i)
- }
-
// Try to get the parent RDD
parent.getOrCompute(validTime) match {
case Some(parentRDD) => { // If parent RDD exists, then compute as usual
+
+ // Define the function for the mapPartition operation on cogrouped RDD;
+ // first map the cogrouped tuple to tuples of required type,
+ // and then apply the update function
+ val updateFuncLocal = updateFunc
+ val mapPartitionFunc = (iterator: Iterator[(K, (Seq[V], Seq[S]))]) => {
+ val i = iterator.map(t => {
+ (t._1, t._2._1, t._2._2.headOption.getOrElse(null.asInstanceOf[S]))
+ })
+ updateFuncLocal(i)
+ }
val cogroupedRDD = parentRDD.cogroup(prevStateRDD, partitioner)
- val stateRDD = new SpecialMapPartitionsRDD(cogroupedRDD, func)
- logDebug("Generating state RDD for time " + validTime)
+ val stateRDD = new SpecialMapPartitionsRDD(cogroupedRDD, mapPartitionFunc)
+ //logDebug("Generating state RDD for time " + validTime)
return Some(stateRDD)
}
case None => { // If parent RDD does not exist, then return old state RDD
- logDebug("Generating state RDD for time " + validTime + " (no change)")
+ //logDebug("Generating state RDD for time " + validTime + " (no change)")
return Some(prevStateRDD)
}
}
@@ -56,23 +99,25 @@ class StateDStream[K: ClassManifest, V: ClassManifest, S <: AnyRef : ClassManife
case None => { // If previous session RDD does not exist (first input data)
- // Define the function for the mapPartition operation on grouped RDD;
- // first map the grouped tuple to tuples of required type,
- // and then apply the update function
- val func = (iterator: Iterator[(K, Seq[V])]) => {
- updateFunc(iterator.map(tuple => (tuple._1, tuple._2, null.asInstanceOf[S])))
- }
-
// Try to get the parent RDD
parent.getOrCompute(validTime) match {
case Some(parentRDD) => { // If parent RDD exists, then compute as usual
+
+ // Define the function for the mapPartition operation on grouped RDD;
+ // first map the grouped tuple to tuples of required type,
+ // and then apply the update function
+ val updateFuncLocal = updateFunc
+ val mapPartitionFunc = (iterator: Iterator[(K, Seq[V])]) => {
+ updateFuncLocal(iterator.map(tuple => (tuple._1, tuple._2, null.asInstanceOf[S])))
+ }
+
val groupedRDD = parentRDD.groupByKey(partitioner)
- val sessionRDD = new SpecialMapPartitionsRDD(groupedRDD, func)
- logDebug("Generating state RDD for time " + validTime + " (first)")
+ val sessionRDD = new SpecialMapPartitionsRDD(groupedRDD, mapPartitionFunc)
+ //logDebug("Generating state RDD for time " + validTime + " (first)")
return Some(sessionRDD)
}
case None => { // If parent RDD does not exist, then nothing to do!
- logDebug("Not generating state RDD (no previous state, no parent)")
+ //logDebug("Not generating state RDD (no previous state, no parent)")
return None
}
}
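
StateDStream is driven by updateStateByKey (see PairDStreamFunctions above), whose update function has type (Seq[V], S) => S: it receives the key's new values from the current batch and its previous state (null the first time the key appears) and returns the new state. A self-contained sketch of that contract for running word counts, with plain Maps standing in for the cogrouped RDDs:

    object UpdateStateSketch {
      // S must be a reference type here (S <: AnyRef), so wrap the count.
      case class RunningCount(count: Long)

      // The (Seq[V], S) => S contract: previous state is null on first sight of a key.
      val updateFunc = (newValues: Seq[Long], state: RunningCount) => {
        val previous = if (state == null) 0L else state.count
        RunningCount(previous + newValues.sum)
      }

      // Emulate one batch: join the batch's values with the previous state by key.
      def step(batch: Seq[(String, Long)],
               state: Map[String, RunningCount]): Map[String, RunningCount] = {
        val grouped: Map[String, Seq[Long]] =
          batch.groupBy(_._1).map { case (k, vs) => (k, vs.map(_._2)) }
        val keys = grouped.keySet ++ state.keySet
        keys.map { k =>
          k -> updateFunc(grouped.getOrElse(k, Seq()), state.getOrElse(k, null))
        }.toMap
      }

      def main(args: Array[String]) {
        val batch1 = Seq(("spark", 1L), ("streaming", 1L), ("spark", 1L))
        val batch2 = Seq(("spark", 1L))
        val state1 = step(batch1, Map())
        val state2 = step(batch2, state1)
        println(state1)  // spark -> RunningCount(2), streaming -> RunningCount(1)
        println(state2)  // spark -> RunningCount(3), streaming -> RunningCount(1)
      }
    }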
diff --git a/streaming/src/main/scala/spark/streaming/WindowedDStream.scala b/streaming/src/main/scala/spark/streaming/WindowedDStream.scala
index 6c791fcfc1..93c1291691 100644
--- a/streaming/src/main/scala/spark/streaming/WindowedDStream.scala
+++ b/streaming/src/main/scala/spark/streaming/WindowedDStream.scala
@@ -1,12 +1,8 @@
package spark.streaming
-import spark.streaming.StreamingContext._
-
import spark.RDD
import spark.UnionRDD
-import spark.SparkContext._
-import scala.collection.mutable.ArrayBuffer
class WindowedDStream[T: ClassManifest](
parent: DStream[T],
@@ -22,8 +18,6 @@ class WindowedDStream[T: ClassManifest](
throw new Exception("The slide duration of WindowedDStream (" + _slideTime + ") " +
"must be multiple of the slide duration of parent DStream (" + parent.slideTime + ")")
- val allowPartialWindows = true
-
override def dependencies = List(parent)
def windowTime: Time = _windowTime
@@ -31,36 +25,8 @@ class WindowedDStream[T: ClassManifest](
override def slideTime: Time = _slideTime
override def compute(validTime: Time): Option[RDD[T]] = {
- val parentRDDs = new ArrayBuffer[RDD[T]]()
- val windowEndTime = validTime
- val windowStartTime = if (allowPartialWindows && windowEndTime - windowTime < parent.zeroTime) {
- parent.zeroTime
- } else {
- windowEndTime - windowTime
- }
-
- logInfo("Window = " + windowStartTime + " - " + windowEndTime)
- logInfo("Parent.zeroTime = " + parent.zeroTime)
-
- if (windowStartTime >= parent.zeroTime) {
- // Walk back through time, from the 'windowEndTime' to 'windowStartTime'
- // and get all parent RDDs from the parent DStream
- var t = windowEndTime
- while (t > windowStartTime) {
- parent.getOrCompute(t) match {
- case Some(rdd) => parentRDDs += rdd
- case None => throw new Exception("Could not generate parent RDD for time " + t)
- }
- t -= parent.slideTime
- }
- }
-
- // Do a union of all parent RDDs to generate the new RDD
- if (parentRDDs.size > 0) {
- Some(new UnionRDD(ssc.sc, parentRDDs))
- } else {
- None
- }
+ val currentWindow = Interval(validTime - windowTime + parent.slideTime, validTime)
+ Some(new UnionRDD(ssc.sc, parent.slice(currentWindow)))
}
}
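
With the rewrite above, windowing reduces to interval arithmetic plus DStream.slice: the window generated at validTime spans Interval(validTime - windowTime + parent.slideTime, validTime), both endpoints included. A quick self-contained check of that boundary arithmetic:

    object WindowBoundsSketch {
      def main(args: Array[String]) {
        val slideTime  = 2000L    // parent batches every 2 s
        val windowTime = 6000L    // 6 s window = 3 parent batches
        val validTime  = 10000L   // batch being generated

        val beginTime = validTime - windowTime + slideTime
        // Batch times whose RDDs the window unions (both ends included, as in slice)
        val times = (beginTime to validTime by slideTime).toList
        println(times)            // List(6000, 8000, 10000) -> exactly 3 batches
      }
    }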
diff --git a/streaming/src/main/scala/spark/streaming/examples/Grep2.scala b/streaming/src/main/scala/spark/streaming/examples/Grep2.scala
new file mode 100644
index 0000000000..7237142c7c
--- /dev/null
+++ b/streaming/src/main/scala/spark/streaming/examples/Grep2.scala
@@ -0,0 +1,64 @@
+package spark.streaming.examples
+
+import spark.SparkContext
+import SparkContext._
+import spark.streaming._
+import StreamingContext._
+
+import spark.storage.StorageLevel
+
+import scala.util.Sorting
+import scala.collection.mutable.ArrayBuffer
+import scala.collection.mutable.HashMap
+import scala.collection.mutable.Queue
+import scala.collection.JavaConversions.mapAsScalaMap
+
+import it.unimi.dsi.fastutil.objects.{Object2LongOpenHashMap => OLMap}
+
+
+object Grep2 {
+
+ def warmup(sc: SparkContext) {
+ (0 until 10).foreach {i =>
+ sc.parallelize(1 to 20000000, 1000)
+ .map(x => (x % 337, x % 1331))
+ .reduceByKey(_ + _)
+ .count()
+ }
+ }
+
+ def main (args: Array[String]) {
+
+ if (args.length != 6) {
+ println ("Usage: Grep2 <host> <file> <mapTasks> <reduceTasks> <batchMillis> <chkptMillis>")
+ System.exit(1)
+ }
+
+ val Array(master, file, mapTasks, reduceTasks, batchMillis, chkptMillis) = args
+
+ val batchDuration = Milliseconds(batchMillis.toLong)
+
+ val ssc = new StreamingContext(master, "Grep2")
+ ssc.setBatchDuration(batchDuration)
+
+ //warmup(ssc.sc)
+
+ val data = ssc.sc.textFile(file, mapTasks.toInt).persist(
+ new StorageLevel(false, true, false, 3)) // Memory only, serialized, 3 replicas
+ println("Data count: " + data.count())
+ println("Data count: " + data.count())
+ println("Data count: " + data.count())
+
+ val sentences = new ConstantInputDStream(ssc, data)
+ ssc.inputStreams += sentences
+
+ sentences.filter(_.contains("Culpepper")).count().foreachRDD(r =>
+ println("Grep count: " + r.collect().mkString))
+
+ ssc.start()
+
+ while(true) { Thread.sleep(1000) }
+ }
+}
+
+
diff --git a/streaming/src/main/scala/spark/streaming/examples/TopKWordCountRaw.scala b/streaming/src/main/scala/spark/streaming/examples/TopKWordCountRaw.scala
new file mode 100644
index 0000000000..3ba07d0448
--- /dev/null
+++ b/streaming/src/main/scala/spark/streaming/examples/TopKWordCountRaw.scala
@@ -0,0 +1,95 @@
+package spark.streaming.examples
+
+import spark.util.IntParam
+import spark.SparkContext
+import spark.SparkContext._
+import spark.storage.StorageLevel
+import spark.streaming._
+import spark.streaming.StreamingContext._
+
+import WordCount2_ExtraFunctions._
+
+object TopKWordCountRaw {
+ def moreWarmup(sc: SparkContext) {
+ (0 until 40).foreach {i =>
+ sc.parallelize(1 to 20000000, 1000)
+ .map(_ % 1331).map(_.toString)
+ .mapPartitions(splitAndCountPartitions).reduceByKey(_ + _, 10)
+ .collect()
+ }
+ }
+
+ def main(args: Array[String]) {
+ if (args.length != 7) {
+ System.err.println("Usage: TopKWordCountRaw <master> <streams> <host> <port> <batchMs> <chkptMs> <reduces>")
+ System.exit(1)
+ }
+
+ val Array(master, IntParam(streams), host, IntParam(port), IntParam(batchMs),
+ IntParam(chkptMs), IntParam(reduces)) = args
+
+ // Create the context and set the batch size
+ val ssc = new StreamingContext(master, "TopKWordCountRaw")
+ ssc.setBatchDuration(Milliseconds(batchMs))
+
+ // Make sure some tasks have started on each node
+ moreWarmup(ssc.sc)
+
+ val rawStreams = (1 to streams).map(_ =>
+ ssc.createRawNetworkStream[String](host, port, StorageLevel.MEMORY_ONLY_2)).toArray
+ val union = new UnifiedDStream(rawStreams)
+
+ val windowedCounts = union.mapPartitions(splitAndCountPartitions)
+ .reduceByKeyAndWindow(add _, subtract _, Seconds(30), Milliseconds(batchMs), reduces)
+ windowedCounts.persist(StorageLevel.MEMORY_ONLY_DESER, StorageLevel.MEMORY_ONLY_DESER_2,
+ Milliseconds(chkptMs))
+ //windowedCounts.print() // TODO: something else?
+
+ def topK(data: Iterator[(String, Long)], k: Int): Iterator[(String, Long)] = {
+ val taken = new Array[(String, Long)](k)
+
+ var i = 0
+ var len = 0
+ var done = false
+ var value: (String, Long) = null
+ var swap: (String, Long) = null
+ var count = 0
+
+ while(data.hasNext) {
+ value = data.next
+ count += 1
+ println("count = " + count)
+ if (len == 0) {
+ taken(0) = value
+ len = 1
+ } else if (len < k || value._2 > taken(len - 1)._2) {
+ if (len < k) {
+ len += 1
+ }
+ taken(len - 1) = value
+ i = len - 1
+ while(i > 0 && taken(i - 1)._2 < taken(i)._2) {
+ swap = taken(i)
+ taken(i) = taken(i-1)
+ taken(i - 1) = swap
+ i -= 1
+ }
+ }
+ }
+ println("Took " + len + " out of " + count + " items")
+ return taken.toIterator
+ }
+
+ val k = 50
+ val partialTopKWindowedCounts = windowedCounts.mapPartitions(topK(_, k))
+ partialTopKWindowedCounts.foreachRDD(rdd => {
+ val collectedCounts = rdd.collect
+ println("Collected " + collectedCounts.size + " items")
+ topK(collectedCounts.toIterator, k).foreach(println)
+ })
+
+// windowedCounts.foreachRDD(r => println("Element count: " + r.count()))
+
+ ssc.start()
+ }
+}
diff --git a/streaming/src/main/scala/spark/streaming/examples/WordCount2.scala b/streaming/src/main/scala/spark/streaming/examples/WordCount2.scala
index 8c2724e97c..c22949d7b9 100644
--- a/streaming/src/main/scala/spark/streaming/examples/WordCount2.scala
+++ b/streaming/src/main/scala/spark/streaming/examples/WordCount2.scala
@@ -22,6 +22,8 @@ object WordCount2_ExtraFunctions {
def subtract(v1: Long, v2: Long) = (v1 - v2)
+ def max(v1: Long, v2: Long) = math.max(v1, v2)
+
def splitAndCountPartitions(iter: Iterator[String]): Iterator[(String, Long)] = {
//val map = new java.util.HashMap[String, Long]
val map = new OLMap[String]
@@ -60,10 +62,10 @@ object WordCount2_ExtraFunctions {
object WordCount2 {
def warmup(sc: SparkContext) {
- (0 until 10).foreach {i =>
- sc.parallelize(1 to 20000000, 1000)
+ (0 until 3).foreach {i =>
+ sc.parallelize(1 to 20000000, 500)
.map(x => (x % 337, x % 1331))
- .reduceByKey(_ + _)
+ .reduceByKey(_ + _, 100)
.count()
}
}
@@ -85,8 +87,8 @@ object WordCount2 {
//warmup(ssc.sc)
val data = ssc.sc.textFile(file, mapTasks.toInt).persist(
- new StorageLevel(false, true, false, 2)) // Memory only, serialized, 2 replicas
- println("Data count: " + data.count())
+ new StorageLevel(false, true, false, 3)) // Memory only, serialized, 3 replicas
+ println("Data count: " + data.map(x => if (x == "") 1 else x.split(" ").size / x.split(" ").size).count())
println("Data count: " + data.count())
println("Data count: " + data.count())
@@ -98,7 +100,9 @@ object WordCount2 {
val windowedCounts = sentences
.mapPartitions(splitAndCountPartitions)
.reduceByKeyAndWindow(add _, subtract _, Seconds(30), batchDuration, reduceTasks.toInt)
- windowedCounts.persist(StorageLevel.MEMORY_ONLY_DESER, StorageLevel.MEMORY_ONLY_DESER_2,
+ windowedCounts.persist(StorageLevel.MEMORY_ONLY_DESER,
+ StorageLevel.MEMORY_ONLY_DESER_2,
+ //new StorageLevel(false, true, true, 3),
Milliseconds(chkptMillis.toLong))
windowedCounts.foreachRDD(r => println("Element count: " + r.count()))
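The splitAndCountPartitions helper referenced above pre-aggregates word counts inside each partition before the shuffle, so only one (word, count) pair per distinct word leaves a partition. A minimal standalone sketch of that idea, using scala.collection.mutable.HashMap as a stand-in for the fastutil OLMap used in the example; names are illustrative:

// Illustrative sketch, not part of the commit.
object SplitAndCountSketch {
  // Count words within one partition with a single local map (map-side combine).
  def splitAndCount(lines: Iterator[String]): Iterator[(String, Long)] = {
    val counts = new scala.collection.mutable.HashMap[String, Long]()
    for (line <- lines; word <- line.split(" ") if word.nonEmpty)
      counts(word) = counts.getOrElse(word, 0L) + 1L
    counts.iterator
  }

  def main(args: Array[String]) {
    // => List((a,2), (b,2), (c,1)) in some order
    println(splitAndCount(Iterator("a b a", "b c")).toList)
  }
}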
diff --git a/streaming/src/main/scala/spark/streaming/examples/WordCountRaw.scala b/streaming/src/main/scala/spark/streaming/examples/WordCountRaw.scala
index 298d9ef381..9702003805 100644
--- a/streaming/src/main/scala/spark/streaming/examples/WordCountRaw.scala
+++ b/streaming/src/main/scala/spark/streaming/examples/WordCountRaw.scala
@@ -1,11 +1,24 @@
package spark.streaming.examples
import spark.util.IntParam
+import spark.SparkContext
+import spark.SparkContext._
import spark.storage.StorageLevel
import spark.streaming._
import spark.streaming.StreamingContext._
+import WordCount2_ExtraFunctions._
+
object WordCountRaw {
+ def moreWarmup(sc: SparkContext) {
+ (0 until 40).foreach {i =>
+ sc.parallelize(1 to 20000000, 1000)
+ .map(_ % 1331).map(_.toString)
+ .mapPartitions(splitAndCountPartitions).reduceByKey(_ + _, 10)
+ .collect()
+ }
+ }
+
def main(args: Array[String]) {
if (args.length != 7) {
System.err.println("Usage: WordCountRaw <master> <streams> <host> <port> <batchMs> <chkptMs> <reduces>")
@@ -20,16 +33,12 @@ object WordCountRaw {
ssc.setBatchDuration(Milliseconds(batchMs))
// Make sure some tasks have started on each node
- ssc.sc.parallelize(1 to 1000, 1000).count()
- ssc.sc.parallelize(1 to 1000, 1000).count()
- ssc.sc.parallelize(1 to 1000, 1000).count()
+ moreWarmup(ssc.sc)
val rawStreams = (1 to streams).map(_ =>
ssc.createRawNetworkStream[String](host, port, StorageLevel.MEMORY_ONLY_2)).toArray
val union = new UnifiedDStream(rawStreams)
- import WordCount2_ExtraFunctions._
-
val windowedCounts = union.mapPartitions(splitAndCountPartitions)
.reduceByKeyAndWindow(add _, subtract _, Seconds(30), Milliseconds(batchMs), reduces)
windowedCounts.persist(StorageLevel.MEMORY_ONLY_DESER, StorageLevel.MEMORY_ONLY_DESER_2,
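The moreWarmup helper above replaces the three parallelize().count() calls: it runs several throwaway shuffle jobs so that every worker has launched tasks (and JIT-compiled the hot map/reduce code paths) before the timed streaming computation starts. A hedged sketch of a reusable variant; the rounds/partitions parameters and the smaller data size are illustrative, not part of the original example:

// Illustrative sketch, not part of the commit.
import spark.SparkContext
import spark.SparkContext._

object WarmupSketch {
  // Run a few throwaway shuffle jobs so every worker starts tasks and the
  // hot map/reduce code paths get JIT-compiled before the measured run.
  def warmup(sc: SparkContext, rounds: Int = 5, partitions: Int = 200) {
    (0 until rounds).foreach { _ =>
      sc.parallelize(1 to 1000000, partitions)
        .map(x => (x % 337, 1L))
        .reduceByKey(_ + _, 10)
        .count()
    }
  }
}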
diff --git a/streaming/src/main/scala/spark/streaming/examples/WordMax2.scala b/streaming/src/main/scala/spark/streaming/examples/WordMax2.scala
new file mode 100644
index 0000000000..3658cb302d
--- /dev/null
+++ b/streaming/src/main/scala/spark/streaming/examples/WordMax2.scala
@@ -0,0 +1,73 @@
+package spark.streaming.examples
+
+import spark.SparkContext
+import SparkContext._
+import spark.streaming._
+import StreamingContext._
+
+import spark.storage.StorageLevel
+
+import scala.util.Sorting
+import scala.collection.mutable.ArrayBuffer
+import scala.collection.mutable.HashMap
+import scala.collection.mutable.Queue
+import scala.collection.JavaConversions.mapAsScalaMap
+
+import it.unimi.dsi.fastutil.objects.{Object2LongOpenHashMap => OLMap}
+
+
+object WordMax2 {
+
+ def warmup(sc: SparkContext) {
+ (0 until 10).foreach {i =>
+ sc.parallelize(1 to 20000000, 1000)
+ .map(x => (x % 337, x % 1331))
+ .reduceByKey(_ + _)
+ .count()
+ }
+ }
+
+ def main (args: Array[String]) {
+
+ if (args.length != 6) {
+ println ("Usage: WordMax2 <host> <file> <mapTasks> <reduceTasks> <batchMillis> <chkptMillis>")
+ System.exit(1)
+ }
+
+ val Array(master, file, mapTasks, reduceTasks, batchMillis, chkptMillis) = args
+
+ val batchDuration = Milliseconds(batchMillis.toLong)
+
+ val ssc = new StreamingContext(master, "WordMax2")
+ ssc.setBatchDuration(batchDuration)
+
+ //warmup(ssc.sc)
+
+ val data = ssc.sc.textFile(file, mapTasks.toInt).persist(
+ new StorageLevel(false, true, false, 3)) // Memory only, serialized, 3 replicas
+ println("Data count: " + data.count())
+ println("Data count: " + data.count())
+ println("Data count: " + data.count())
+
+ val sentences = new ConstantInputDStream(ssc, data)
+ ssc.inputStreams += sentences
+
+ import WordCount2_ExtraFunctions._
+
+ val windowedCounts = sentences
+ .mapPartitions(splitAndCountPartitions)
+ .reduceByKey(add _, reduceTasks.toInt)
+ .persist(StorageLevel.MEMORY_ONLY_DESER, StorageLevel.MEMORY_ONLY_DESER_2,
+ Milliseconds(chkptMillis.toLong))
+ .reduceByKeyAndWindow(max _, Seconds(10), batchDuration, reduceTasks.toInt)
+ //.persist(StorageLevel.MEMORY_ONLY_DESER, StorageLevel.MEMORY_ONLY_DESER_2,
+ // Milliseconds(chkptMillis.toLong))
+ windowedCounts.foreachRDD(r => println("Element count: " + r.count()))
+
+ ssc.start()
+
+ while(true) { Thread.sleep(1000) }
+ }
+}
+
+
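Unlike the add/subtract version in WordCount2, the max-based window in WordMax2 has no inverse function, so each output has to be re-reduced from the per-batch partial reduces that fall inside the window. A standalone sketch (plain Scala, no Spark) of that computation; the object name and sample data are illustrative:

// Illustrative sketch, not part of the commit.
object WindowedMaxSketch {
  def main(args: Array[String]) {
    // Per-batch (word, count) values, already reduced within each batch.
    val batches = Seq(Map("a" -> 3L), Map("a" -> 1L, "b" -> 5L), Map("a" -> 4L))
    val windowSize = 2 // in batches, i.e. windowTime = 2 * batchDuration

    for (i <- batches.indices) {
      val inWindow = batches.slice(math.max(0, i - windowSize + 1), i + 1)
      val windowMax = inWindow.flatten.groupBy(_._1)
        .map { case (k, kvs) => (k, kvs.map(_._2).max) }
      println("window ending at batch " + i + ": " + windowMax)
    }
  }
}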
diff --git a/streaming/src/test/scala/spark/streaming/DStreamBasicSuite.scala b/streaming/src/test/scala/spark/streaming/DStreamBasicSuite.scala
new file mode 100644
index 0000000000..9b953d9dae
--- /dev/null
+++ b/streaming/src/test/scala/spark/streaming/DStreamBasicSuite.scala
@@ -0,0 +1,67 @@
+package spark.streaming
+
+import spark.streaming.StreamingContext._
+import scala.runtime.RichInt
+
+class DStreamBasicSuite extends DStreamSuiteBase {
+
+ test("map-like operations") {
+ val input = Seq(1 to 4, 5 to 8, 9 to 12)
+
+ // map
+ testOperation(input, (r: DStream[Int]) => r.map(_.toString), input.map(_.map(_.toString)))
+
+ // flatMap
+ testOperation(
+ input,
+ (r: DStream[Int]) => r.flatMap(x => Seq(x, x * 2)),
+ input.map(_.flatMap(x => Array(x, x * 2)))
+ )
+ }
+
+ test("shuffle-based operations") {
+ // reduceByKey
+ testOperation(
+ Seq(Seq("a", "a", "b"), Seq("", ""), Seq()),
+ (s: DStream[String]) => s.map(x => (x, 1)).reduceByKey(_ + _),
+ Seq(Seq(("a", 2), ("b", 1)), Seq(("", 2)), Seq()),
+ true
+ )
+
+ // reduce
+ testOperation(
+ Seq(1 to 4, 5 to 8, 9 to 12),
+ (s: DStream[Int]) => s.reduce(_ + _),
+ Seq(Seq(10), Seq(26), Seq(42))
+ )
+ }
+
+ test("stateful operations") {
+ val inputData =
+ Seq(
+ Seq("a", "b", "c"),
+ Seq("a", "b", "c"),
+ Seq("a", "b", "c")
+ )
+
+ val outputData =
+ Seq(
+ Seq(("a", 1), ("b", 1), ("c", 1)),
+ Seq(("a", 2), ("b", 2), ("c", 2)),
+ Seq(("a", 3), ("b", 3), ("c", 3))
+ )
+
+ val updateStateOp = (s: DStream[String]) => {
+ val updateFunc = (values: Seq[Int], state: RichInt) => {
+ var newState = 0
+ if (values != null) newState += values.reduce(_ + _)
+ if (state != null) newState += state.self
+ //println("values = " + values + ", state = " + state + ", " + " new state = " + newState)
+ new RichInt(newState)
+ }
+ s.map(x => (x, 1)).updateStateByKey[RichInt](updateFunc).map(t => (t._1, t._2.self))
+ }
+
+ testOperation(inputData, updateStateOp, outputData, true)
+ }
+}
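What the "stateful operations" test above expects can be reproduced without Spark: the update function is applied once per batch and per key, folding the batch's values into the previous state, which yields the running counts in outputData. A minimal standalone sketch (plain Scala); names are illustrative:

// Illustrative sketch, not part of the commit.
object UpdateStateSketch {
  def main(args: Array[String]) {
    val batches = Seq(Seq("a", "b", "c"), Seq("a", "b", "c"), Seq("a", "b", "c"))
    var state = Map.empty[String, Int]
    for (batch <- batches) {
      // (word, 1) pairs grouped by key, as updateStateByKey would see them
      val counts = batch.groupBy(identity).map { case (k, vs) => (k, vs.size) }
      state = state ++ counts.map { case (k, c) => (k, state.getOrElse(k, 0) + c) }
      println(state.toSeq.sortBy(_._1)) // (a,1),(b,1),(c,1) then (a,2),... then (a,3),...
    }
  }
}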
diff --git a/streaming/src/test/scala/spark/streaming/DStreamSuite.scala b/streaming/src/test/scala/spark/streaming/DStreamSuite.scala
deleted file mode 100644
index 030f351080..0000000000
--- a/streaming/src/test/scala/spark/streaming/DStreamSuite.scala
+++ /dev/null
@@ -1,123 +0,0 @@
-package spark.streaming
-
-import spark.Logging
-import spark.streaming.StreamingContext._
-import spark.streaming.util.ManualClock
-
-import org.scalatest.FunSuite
-import org.scalatest.BeforeAndAfter
-
-import scala.collection.mutable.ArrayBuffer
-import scala.runtime.RichInt
-
-class DStreamSuite extends FunSuite with BeforeAndAfter with Logging {
-
- var ssc: StreamingContext = null
- val batchDurationMillis = 1000
-
- System.setProperty("spark.streaming.clock", "spark.streaming.util.ManualClock")
-
- def testOp[U: ClassManifest, V: ClassManifest](
- input: Seq[Seq[U]],
- operation: DStream[U] => DStream[V],
- expectedOutput: Seq[Seq[V]],
- useSet: Boolean = false
- ) {
- try {
- ssc = new StreamingContext("local", "test")
- ssc.setBatchDuration(Milliseconds(batchDurationMillis))
-
- val inputStream = ssc.createQueueStream(input.map(ssc.sc.makeRDD(_, 2)).toIterator)
- val outputStream = operation(inputStream)
- val outputQueue = outputStream.toQueue
-
- ssc.start()
- val clock = ssc.scheduler.clock.asInstanceOf[ManualClock]
- clock.addToTime(input.size * batchDurationMillis)
-
- Thread.sleep(1000)
-
- val output = new ArrayBuffer[Seq[V]]()
- while(outputQueue.size > 0) {
- val rdd = outputQueue.take()
- output += (rdd.collect())
- }
-
- assert(output.size === expectedOutput.size)
- for (i <- 0 until output.size) {
- if (useSet) {
- assert(output(i).toSet === expectedOutput(i).toSet)
- } else {
- assert(output(i).toList === expectedOutput(i).toList)
- }
- }
- } finally {
- ssc.stop()
- }
- }
-
- test("map-like operations") {
- val inputData = Seq(1 to 4, 5 to 8, 9 to 12)
-
- // map
- testOp(inputData, (r: DStream[Int]) => r.map(_.toString), inputData.map(_.map(_.toString)))
-
- // flatMap
- testOp(
- inputData,
- (r: DStream[Int]) => r.flatMap(x => Seq(x, x * 2)),
- inputData.map(_.flatMap(x => Array(x, x * 2)))
- )
- }
-
- test("shuffle-based operations") {
- // reduceByKey
- testOp(
- Seq(Seq("a", "a", "b"), Seq("", ""), Seq()),
- (s: DStream[String]) => s.map(x => (x, 1)).reduceByKey(_ + _),
- Seq(Seq(("a", 2), ("b", 1)), Seq(("", 2)), Seq()),
- true
- )
-
- // reduce
- testOp(
- Seq(1 to 4, 5 to 8, 9 to 12),
- (s: DStream[Int]) => s.reduce(_ + _),
- Seq(Seq(10), Seq(26), Seq(42))
- )
- }
-
- test("window-based operations") {
-
- }
-
-
- test("stateful operations") {
- val inputData =
- Seq(
- Seq("a", "b", "c"),
- Seq("a", "b", "c"),
- Seq("a", "b", "c")
- )
-
- val outputData =
- Seq(
- Seq(("a", 1), ("b", 1), ("c", 1)),
- Seq(("a", 2), ("b", 2), ("c", 2)),
- Seq(("a", 3), ("b", 3), ("c", 3))
- )
-
- val updateStateOp =(s: DStream[String]) => {
- val updateFunc = (values: Seq[Int], state: RichInt) => {
- var newState = 0
- if (values != null) newState += values.reduce(_ + _)
- if (state != null) newState += state.self
- //println("values = " + values + ", state = " + state + ", " + " new state = " + newState)
- new RichInt(newState)
- }
- s.map(x => (x, 1)).updateStateByKey[RichInt](updateFunc).map(t => (t._1, t._2.self))
- }
-
- testOp(inputData, updateStateOp, outputData, true)
- }
-}
diff --git a/streaming/src/test/scala/spark/streaming/DStreamSuiteBase.scala b/streaming/src/test/scala/spark/streaming/DStreamSuiteBase.scala
new file mode 100644
index 0000000000..1c4ea14b1d
--- /dev/null
+++ b/streaming/src/test/scala/spark/streaming/DStreamSuiteBase.scala
@@ -0,0 +1,68 @@
+package spark.streaming
+
+import spark.{RDD, Logging}
+import util.ManualClock
+import collection.mutable.ArrayBuffer
+import org.scalatest.FunSuite
+import scala.collection.mutable.Queue
+
+
+trait DStreamSuiteBase extends FunSuite with Logging {
+
+ def batchDuration() = Seconds(1)
+
+ def maxWaitTimeMillis() = 10000
+
+ def testOperation[U: ClassManifest, V: ClassManifest](
+ input: Seq[Seq[U]],
+ operation: DStream[U] => DStream[V],
+ expectedOutput: Seq[Seq[V]],
+ useSet: Boolean = false
+ ) {
+
+ val manualClock = true
+
+ if (manualClock) {
+ System.setProperty("spark.streaming.clock", "spark.streaming.util.ManualClock")
+ }
+
+ val ssc = new StreamingContext("local", "test")
+
+ try {
+      ssc.setBatchDuration(batchDuration)
+
+ val inputQueue = new Queue[RDD[U]]()
+ inputQueue ++= input.map(ssc.sc.makeRDD(_, 2))
+ val emptyRDD = ssc.sc.makeRDD(Seq[U](), 2)
+
+ val inputStream = ssc.createQueueStream(inputQueue, true, emptyRDD)
+ val outputStream = operation(inputStream)
+
+ val output = new ArrayBuffer[Seq[V]]()
+ outputStream.foreachRDD(rdd => output += rdd.collect())
+
+ ssc.start()
+
+ val clock = ssc.scheduler.clock
+ if (clock.isInstanceOf[ManualClock]) {
+ clock.asInstanceOf[ManualClock].addToTime(input.size * batchDuration.milliseconds)
+ }
+
+ val startTime = System.currentTimeMillis()
+ while (output.size < expectedOutput.size && System.currentTimeMillis() - startTime < maxWaitTimeMillis) {
+ Thread.sleep(500)
+ }
+
+ assert(output.size === expectedOutput.size)
+ for (i <- 0 until output.size) {
+ if (useSet) {
+ assert(output(i).toSet === expectedOutput(i).toSet)
+ } else {
+ assert(output(i).toList === expectedOutput(i).toList)
+ }
+ }
+ } finally {
+ ssc.stop()
+ }
+ }
+}
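A hypothetical usage of the harness above (the suite name and the particular operation are illustrative; testOperation, DStream.map and the batch layout follow the code in this commit): a suite only has to describe the input batches, the transformation under test, and the expected output batches.

// Illustrative sketch, not part of the commit.
package spark.streaming

class MapOperationSuite extends DStreamSuiteBase {
  test("times ten") {
    testOperation(
      Seq(1 to 4, 5 to 8),                          // input batches
      (s: DStream[Int]) => s.map(_ * 10),           // operation under test
      Seq(Seq(10, 20, 30, 40), Seq(50, 60, 70, 80)) // expected output batches
    )
  }
}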
diff --git a/streaming/src/test/scala/spark/streaming/DStreamWindowSuite.scala b/streaming/src/test/scala/spark/streaming/DStreamWindowSuite.scala
new file mode 100644
index 0000000000..061cab2cbb
--- /dev/null
+++ b/streaming/src/test/scala/spark/streaming/DStreamWindowSuite.scala
@@ -0,0 +1,188 @@
+package spark.streaming
+
+import spark.streaming.StreamingContext._
+
+class DStreamWindowSuite extends DStreamSuiteBase {
+
+ val largerSlideInput = Seq(
+ Seq(("a", 1)), // 1st window from here
+ Seq(("a", 2)),
+ Seq(("a", 3)), // 2nd window from here
+ Seq(("a", 4)),
+ Seq(("a", 5)), // 3rd window from here
+ Seq(("a", 6)),
+ Seq(), // 4th window from here
+ Seq(),
+ Seq() // 5th window from here
+ )
+
+ val largerSlideOutput = Seq(
+ Seq(("a", 1)),
+ Seq(("a", 6)),
+ Seq(("a", 14)),
+ Seq(("a", 15)),
+ Seq(("a", 6))
+ )
+
+ val bigInput = Seq(
+ Seq(("a", 1)),
+ Seq(("a", 1), ("b", 1)),
+ Seq(("a", 1), ("b", 1), ("c", 1)),
+ Seq(("a", 1), ("b", 1)),
+ Seq(("a", 1)),
+ Seq(),
+ Seq(("a", 1)),
+ Seq(("a", 1), ("b", 1)),
+ Seq(("a", 1), ("b", 1), ("c", 1)),
+ Seq(("a", 1), ("b", 1)),
+ Seq(("a", 1)),
+ Seq()
+ )
+
+ val bigOutput = Seq(
+ Seq(("a", 1)),
+ Seq(("a", 2), ("b", 1)),
+ Seq(("a", 2), ("b", 2), ("c", 1)),
+ Seq(("a", 2), ("b", 2), ("c", 1)),
+ Seq(("a", 2), ("b", 1)),
+ Seq(("a", 1)),
+ Seq(("a", 1)),
+ Seq(("a", 2), ("b", 1)),
+ Seq(("a", 2), ("b", 2), ("c", 1)),
+ Seq(("a", 2), ("b", 2), ("c", 1)),
+ Seq(("a", 2), ("b", 1)),
+ Seq(("a", 1))
+ )
+
+  /*
+  The output of reduceByKeyAndWindow with an inverse reduce function differs
+  from that of the naive reduceByKeyAndWindow. Even if the count of a
+  particular key drops to 0, the key does not get eliminated from the RDDs of
+  ReducedWindowedDStream, so the number of keys in these RDDs grows forever.
+  A more generalized version that allows elimination of keys should be
+  considered. (A standalone illustration follows this file's diff.)
+  */
+ val bigOutputInv = Seq(
+ Seq(("a", 1)),
+ Seq(("a", 2), ("b", 1)),
+ Seq(("a", 2), ("b", 2), ("c", 1)),
+ Seq(("a", 2), ("b", 2), ("c", 1)),
+ Seq(("a", 2), ("b", 1), ("c", 0)),
+ Seq(("a", 1), ("b", 0), ("c", 0)),
+ Seq(("a", 1), ("b", 0), ("c", 0)),
+ Seq(("a", 2), ("b", 1), ("c", 0)),
+ Seq(("a", 2), ("b", 2), ("c", 1)),
+ Seq(("a", 2), ("b", 2), ("c", 1)),
+ Seq(("a", 2), ("b", 1), ("c", 0)),
+ Seq(("a", 1), ("b", 0), ("c", 0))
+ )
+
+ def testReduceByKeyAndWindow(
+ name: String,
+ input: Seq[Seq[(String, Int)]],
+ expectedOutput: Seq[Seq[(String, Int)]],
+ windowTime: Time = Seconds(2),
+ slideTime: Time = Seconds(1)
+ ) {
+ test("reduceByKeyAndWindow - " + name) {
+ testOperation(
+ input,
+ (s: DStream[(String, Int)]) => s.reduceByKeyAndWindow(_ + _, windowTime, slideTime).persist(),
+ expectedOutput,
+ true
+ )
+ }
+ }
+
+ def testReduceByKeyAndWindowInv(
+ name: String,
+ input: Seq[Seq[(String, Int)]],
+ expectedOutput: Seq[Seq[(String, Int)]],
+ windowTime: Time = Seconds(2),
+ slideTime: Time = Seconds(1)
+ ) {
+ test("reduceByKeyAndWindowInv - " + name) {
+ testOperation(
+ input,
+ (s: DStream[(String, Int)]) => s.reduceByKeyAndWindow(_ + _, _ - _, windowTime, slideTime).persist(),
+ expectedOutput,
+ true
+ )
+ }
+ }
+
+
+ // Testing naive reduceByKeyAndWindow (without invertible function)
+
+ testReduceByKeyAndWindow(
+ "basic reduction",
+ Seq(Seq(("a", 1), ("a", 3)) ),
+ Seq(Seq(("a", 4)) )
+ )
+
+ testReduceByKeyAndWindow(
+ "key already in window and new value added into window",
+ Seq( Seq(("a", 1)), Seq(("a", 1)) ),
+ Seq( Seq(("a", 1)), Seq(("a", 2)) )
+ )
+
+
+ testReduceByKeyAndWindow(
+ "new key added into window",
+ Seq( Seq(("a", 1)), Seq(("a", 1), ("b", 1)) ),
+ Seq( Seq(("a", 1)), Seq(("a", 2), ("b", 1)) )
+ )
+
+ testReduceByKeyAndWindow(
+ "key removed from window",
+ Seq( Seq(("a", 1)), Seq(("a", 1)), Seq(), Seq() ),
+ Seq( Seq(("a", 1)), Seq(("a", 2)), Seq(("a", 1)), Seq() )
+ )
+
+ testReduceByKeyAndWindow(
+ "larger slide time",
+ largerSlideInput,
+ largerSlideOutput,
+ Seconds(4),
+ Seconds(2)
+ )
+
+ testReduceByKeyAndWindow("big test", bigInput, bigOutput)
+
+
+ // Testing reduceByKeyAndWindow (with invertible reduce function)
+
+ testReduceByKeyAndWindowInv(
+ "basic reduction",
+ Seq(Seq(("a", 1), ("a", 3)) ),
+ Seq(Seq(("a", 4)) )
+ )
+
+ testReduceByKeyAndWindowInv(
+ "key already in window and new value added into window",
+ Seq( Seq(("a", 1)), Seq(("a", 1)) ),
+ Seq( Seq(("a", 1)), Seq(("a", 2)) )
+ )
+
+ testReduceByKeyAndWindowInv(
+ "new key added into window",
+ Seq( Seq(("a", 1)), Seq(("a", 1), ("b", 1)) ),
+ Seq( Seq(("a", 1)), Seq(("a", 2), ("b", 1)) )
+ )
+
+ testReduceByKeyAndWindowInv(
+ "key removed from window",
+ Seq( Seq(("a", 1)), Seq(("a", 1)), Seq(), Seq() ),
+ Seq( Seq(("a", 1)), Seq(("a", 2)), Seq(("a", 1)), Seq(("a", 0)) )
+ )
+
+ testReduceByKeyAndWindowInv(
+ "larger slide time",
+ largerSlideInput,
+ largerSlideOutput,
+ Seconds(4),
+ Seconds(2)
+ )
+
+ testReduceByKeyAndWindowInv("big test", bigInput, bigOutputInv)
+}
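To make the caveat above bigOutputInv concrete, a standalone simulation (plain Scala, no Spark) of the invertible reduceByKeyAndWindow with windowTime = 2 * slideTime: once a key leaves the window its count is driven to 0 by the inverse function, but the key itself is never dropped, exactly as in the ("b", 0) / ("c", 0) entries expected above. The object name and sample input are illustrative.

// Illustrative sketch, not part of the commit.
object InverseReduceRetentionSketch {
  def main(args: Array[String]) {
    val batches = Seq(Seq(("a", 1)), Seq(("a", 1), ("b", 1)), Seq(("a", 1)), Seq[(String, Int)]())
    val windowSize = 2 // in batches

    def combine(m: Map[String, Int], batch: Seq[(String, Int)], sign: Int) =
      batch.foldLeft(m) { case (acc, (k, v)) => acc.updated(k, acc.getOrElse(k, 0) + sign * v) }

    var window = Map.empty[String, Int]
    for (i <- batches.indices) {
      window = combine(window, batches(i), 1)                     // add the entering batch
      if (i >= windowSize)
        window = combine(window, batches(i - windowSize), -1)     // subtract the leaving batch
      println(window.toSeq.sortBy(_._1)) // note ("b", 0) lingers after "b" leaves the window
    }
  }
}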