diff options
Diffstat (limited to 'streaming/src')
4 files changed, 17 insertions, 16 deletions
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/NetworkInputTracker.scala b/streaming/src/main/scala/org/apache/spark/streaming/NetworkInputTracker.scala index aae79a4e6f..b97fb7e6e3 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/NetworkInputTracker.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/NetworkInputTracker.scala @@ -30,10 +30,11 @@ import akka.actor._ import akka.pattern.ask import akka.util.duration._ import akka.dispatch._ +import org.apache.spark.storage.BlockId private[streaming] sealed trait NetworkInputTrackerMessage private[streaming] case class RegisterReceiver(streamId: Int, receiverActor: ActorRef) extends NetworkInputTrackerMessage -private[streaming] case class AddBlocks(streamId: Int, blockIds: Seq[String], metadata: Any) extends NetworkInputTrackerMessage +private[streaming] case class AddBlocks(streamId: Int, blockIds: Seq[BlockId], metadata: Any) extends NetworkInputTrackerMessage private[streaming] case class DeregisterReceiver(streamId: Int, msg: String) extends NetworkInputTrackerMessage /** @@ -48,7 +49,7 @@ class NetworkInputTracker( val networkInputStreamMap = Map(networkInputStreams.map(x => (x.id, x)): _*) val receiverExecutor = new ReceiverExecutor() val receiverInfo = new HashMap[Int, ActorRef] - val receivedBlockIds = new HashMap[Int, Queue[String]] + val receivedBlockIds = new HashMap[Int, Queue[BlockId]] val timeout = 5000.milliseconds var currentTime: Time = null @@ -67,9 +68,9 @@ class NetworkInputTracker( } /** Return all the blocks received from a receiver. */ - def getBlockIds(receiverId: Int, time: Time): Array[String] = synchronized { + def getBlockIds(receiverId: Int, time: Time): Array[BlockId] = synchronized { val queue = receivedBlockIds.synchronized { - receivedBlockIds.getOrElse(receiverId, new Queue[String]()) + receivedBlockIds.getOrElse(receiverId, new Queue[BlockId]()) } val result = queue.synchronized { queue.dequeueAll(x => true) @@ -92,7 +93,7 @@ class NetworkInputTracker( case AddBlocks(streamId, blockIds, metadata) => { val tmp = receivedBlockIds.synchronized { if (!receivedBlockIds.contains(streamId)) { - receivedBlockIds += ((streamId, new Queue[String])) + receivedBlockIds += ((streamId, new Queue[BlockId])) } receivedBlockIds(streamId) } diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/NetworkInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/NetworkInputDStream.scala index 31f9891560..8d3ac0fc65 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/NetworkInputDStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/NetworkInputDStream.scala @@ -31,7 +31,7 @@ import org.apache.spark.streaming.util.{RecurringTimer, SystemClock} import org.apache.spark.streaming._ import org.apache.spark.{Logging, SparkEnv} import org.apache.spark.rdd.{RDD, BlockRDD} -import org.apache.spark.storage.StorageLevel +import org.apache.spark.storage.{BlockId, StorageLevel, StreamBlockId} /** * Abstract class for defining any InputDStream that has to start a receiver on worker @@ -69,7 +69,7 @@ abstract class NetworkInputDStream[T: ClassManifest](@transient ssc_ : Streaming val blockIds = ssc.networkInputTracker.getBlockIds(id, validTime) Some(new BlockRDD[T](ssc.sc, blockIds)) } else { - Some(new BlockRDD[T](ssc.sc, Array[String]())) + Some(new BlockRDD[T](ssc.sc, Array[BlockId]())) } } } @@ -77,7 +77,7 @@ abstract class NetworkInputDStream[T: ClassManifest](@transient ssc_ : Streaming private[streaming] sealed trait NetworkReceiverMessage private[streaming] case class StopReceiver(msg: String) extends NetworkReceiverMessage -private[streaming] case class ReportBlock(blockId: String, metadata: Any) extends NetworkReceiverMessage +private[streaming] case class ReportBlock(blockId: BlockId, metadata: Any) extends NetworkReceiverMessage private[streaming] case class ReportError(msg: String) extends NetworkReceiverMessage /** @@ -158,7 +158,7 @@ abstract class NetworkReceiver[T: ClassManifest]() extends Serializable with Log /** * Pushes a block (as an ArrayBuffer filled with data) into the block manager. */ - def pushBlock(blockId: String, arrayBuffer: ArrayBuffer[T], metadata: Any, level: StorageLevel) { + def pushBlock(blockId: BlockId, arrayBuffer: ArrayBuffer[T], metadata: Any, level: StorageLevel) { env.blockManager.put(blockId, arrayBuffer.asInstanceOf[ArrayBuffer[Any]], level) actor ! ReportBlock(blockId, metadata) } @@ -166,7 +166,7 @@ abstract class NetworkReceiver[T: ClassManifest]() extends Serializable with Log /** * Pushes a block (as bytes) into the block manager. */ - def pushBlock(blockId: String, bytes: ByteBuffer, metadata: Any, level: StorageLevel) { + def pushBlock(blockId: BlockId, bytes: ByteBuffer, metadata: Any, level: StorageLevel) { env.blockManager.putBytes(blockId, bytes, level) actor ! ReportBlock(blockId, metadata) } @@ -209,7 +209,7 @@ abstract class NetworkReceiver[T: ClassManifest]() extends Serializable with Log class BlockGenerator(storageLevel: StorageLevel) extends Serializable with Logging { - case class Block(id: String, buffer: ArrayBuffer[T], metadata: Any = null) + case class Block(id: BlockId, buffer: ArrayBuffer[T], metadata: Any = null) val clock = new SystemClock() val blockInterval = System.getProperty("spark.streaming.blockInterval", "200").toLong @@ -241,7 +241,7 @@ abstract class NetworkReceiver[T: ClassManifest]() extends Serializable with Log val newBlockBuffer = currentBuffer currentBuffer = new ArrayBuffer[T] if (newBlockBuffer.size > 0) { - val blockId = "input-" + NetworkReceiver.this.streamId + "-" + (time - blockInterval) + val blockId = StreamBlockId(NetworkReceiver.this.streamId, time - blockInterval) val newBlock = new Block(blockId, newBlockBuffer) blocksForPushing.add(newBlock) } diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/RawInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/RawInputDStream.scala index c91f12ecd7..10ed4ef78d 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/RawInputDStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/RawInputDStream.scala @@ -18,7 +18,7 @@ package org.apache.spark.streaming.dstream import org.apache.spark.Logging -import org.apache.spark.storage.StorageLevel +import org.apache.spark.storage.{StorageLevel, StreamBlockId} import org.apache.spark.streaming.StreamingContext import java.net.InetSocketAddress @@ -71,7 +71,7 @@ class RawNetworkReceiver(host: String, port: Int, storageLevel: StorageLevel) var nextBlockNumber = 0 while (true) { val buffer = queue.take() - val blockId = "input-" + streamId + "-" + nextBlockNumber + val blockId = StreamBlockId(streamId, nextBlockNumber) nextBlockNumber += 1 pushBlock(blockId, buffer, null, storageLevel) } diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receivers/ActorReceiver.scala b/streaming/src/main/scala/org/apache/spark/streaming/receivers/ActorReceiver.scala index 4b5d8c467e..ef0f85a717 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/receivers/ActorReceiver.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/receivers/ActorReceiver.scala @@ -21,7 +21,7 @@ import akka.actor.{ Actor, PoisonPill, Props, SupervisorStrategy } import akka.actor.{ actorRef2Scala, ActorRef } import akka.actor.{ PossiblyHarmful, OneForOneStrategy } -import org.apache.spark.storage.StorageLevel +import org.apache.spark.storage.{StorageLevel, StreamBlockId} import org.apache.spark.streaming.dstream.NetworkReceiver import java.util.concurrent.atomic.AtomicInteger @@ -159,7 +159,7 @@ private[streaming] class ActorReceiver[T: ClassManifest]( protected def pushBlock(iter: Iterator[T]) { val buffer = new ArrayBuffer[T] buffer ++= iter - pushBlock("block-" + streamId + "-" + System.nanoTime(), buffer, null, storageLevel) + pushBlock(StreamBlockId(streamId, System.nanoTime()), buffer, null, storageLevel) } protected def onStart() = { |