Diffstat (limited to 'streaming')
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/dstream/ReceiverInputDStream.scala    |   7
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlock.scala          |  35
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala   | 193
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala |  88
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/scheduler/BatchInfo.scala             |   2
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobSet.scala                |   3
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceivedBlockInfo.scala     |  28
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala       |  24
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/util/WriteAheadLogRandomReader.scala  |   1
-rw-r--r--  streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala       | 258
-rw-r--r--  streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala         |  34
11 files changed, 603 insertions, 70 deletions
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReceiverInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReceiverInputDStream.scala
index 391e40924f..bb47d373de 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReceiverInputDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReceiverInputDStream.scala
@@ -23,8 +23,9 @@ import scala.reflect.ClassTag
import org.apache.spark.rdd.{BlockRDD, RDD}
import org.apache.spark.storage.BlockId
import org.apache.spark.streaming._
-import org.apache.spark.streaming.receiver.Receiver
+import org.apache.spark.streaming.receiver.{WriteAheadLogBasedStoreResult, BlockManagerBasedStoreResult, Receiver}
import org.apache.spark.streaming.scheduler.ReceivedBlockInfo
+import org.apache.spark.SparkException
/**
* Abstract class for defining any [[org.apache.spark.streaming.dstream.InputDStream]]
@@ -65,10 +66,10 @@ abstract class ReceiverInputDStream[T: ClassTag](@transient ssc_ : StreamingCont
if (validTime >= graph.startTime) {
val blockInfo = ssc.scheduler.receiverTracker.getReceivedBlockInfo(id)
receivedBlockInfo(validTime) = blockInfo
- val blockIds = blockInfo.map(_.blockId.asInstanceOf[BlockId])
+ val blockIds = blockInfo.map { _.blockStoreResult.blockId.asInstanceOf[BlockId] }
Some(new BlockRDD[T](ssc.sc, blockIds))
} else {
- Some(new BlockRDD[T](ssc.sc, Array[BlockId]()))
+ Some(new BlockRDD[T](ssc.sc, Array.empty))
}
}
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlock.scala b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlock.scala
new file mode 100644
index 0000000000..47968afef2
--- /dev/null
+++ b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlock.scala
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.receiver
+
+import java.nio.ByteBuffer
+
+import scala.collection.mutable.ArrayBuffer
+import scala.language.existentials
+
+/** Trait representing a received block */
+private[streaming] sealed trait ReceivedBlock
+
+/** Class representing a block received as an ArrayBuffer */
+private[streaming] case class ArrayBufferBlock(arrayBuffer: ArrayBuffer[_]) extends ReceivedBlock
+
+/** Class representing a block received as an Iterator */
+private[streaming] case class IteratorBlock(iterator: Iterator[_]) extends ReceivedBlock
+
+/** Class representing a block received as a ByteBuffer */
+private[streaming] case class ByteBufferBlock(byteBuffer: ByteBuffer) extends ReceivedBlock
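
The sealed trait above is simply an ADT over the three payload shapes a receiver can hand over. A minimal, illustrative sketch of constructing and matching on the three cases follows; the describe helper and the sample data are not part of the patch, and the code assumes it lives in the org.apache.spark.streaming.receiver package since the types are private[streaming].

    package org.apache.spark.streaming.receiver

    import java.nio.ByteBuffer
    import scala.collection.mutable.ArrayBuffer

    object ReceivedBlockExample {
      // Hypothetical helper, for illustration only: report which variant a block is.
      def describe(block: ReceivedBlock): String = block match {
        case ArrayBufferBlock(buf)  => s"ArrayBuffer with ${buf.size} records"
        case IteratorBlock(_)       => "Iterator of records (count unknown up front)"
        case ByteBufferBlock(bytes) => s"ByteBuffer of ${bytes.remaining()} bytes"
      }

      def main(args: Array[String]): Unit = {
        val blocks = Seq(
          ArrayBufferBlock(ArrayBuffer("a", "b", "c")),
          IteratorBlock(Iterator("a", "b", "c")),
          ByteBufferBlock(ByteBuffer.wrap(Array[Byte](1, 2, 3))))
        blocks.map(describe).foreach(println)
      }
    }

Because the trait is sealed, the compiler can check that a match over these three cases is exhaustive, which is what the handlers introduced below rely on.
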
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala
new file mode 100644
index 0000000000..fdf995320b
--- /dev/null
+++ b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala
@@ -0,0 +1,193 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.receiver
+
+import scala.concurrent.{Await, ExecutionContext, Future}
+import scala.concurrent.duration._
+import scala.language.{existentials, postfixOps}
+
+import WriteAheadLogBasedBlockHandler._
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.Path
+
+import org.apache.spark.{Logging, SparkConf, SparkException}
+import org.apache.spark.storage._
+import org.apache.spark.streaming.util.{Clock, SystemClock, WriteAheadLogFileSegment, WriteAheadLogManager}
+import org.apache.spark.util.Utils
+
+/** Trait that represents the metadata related to storage of blocks */
+private[streaming] trait ReceivedBlockStoreResult {
+ def blockId: StreamBlockId // Any implementation of this trait will store a block id
+}
+
+/** Trait that represents a class that handles the storage of blocks received by a receiver */
+private[streaming] trait ReceivedBlockHandler {
+
+ /** Store a received block with the given block id and return related metadata */
+ def storeBlock(blockId: StreamBlockId, receivedBlock: ReceivedBlock): ReceivedBlockStoreResult
+
+ /** Clean up blocks older than the given threshold time */
+ def cleanupOldBlock(threshTime: Long)
+}
+
+
+/**
+ * Implementation of [[org.apache.spark.streaming.receiver.ReceivedBlockStoreResult]]
+ * that stores the metadata related to storage of blocks using
+ * [[org.apache.spark.streaming.receiver.BlockManagerBasedBlockHandler]]
+ */
+private[streaming] case class BlockManagerBasedStoreResult(blockId: StreamBlockId)
+ extends ReceivedBlockStoreResult
+
+
+/**
+ * Implementation of a [[org.apache.spark.streaming.receiver.ReceivedBlockHandler]] which
+ * stores the received blocks into a block manager with the specified storage level.
+ */
+private[streaming] class BlockManagerBasedBlockHandler(
+ blockManager: BlockManager, storageLevel: StorageLevel)
+ extends ReceivedBlockHandler with Logging {
+
+ def storeBlock(blockId: StreamBlockId, block: ReceivedBlock): ReceivedBlockStoreResult = {
+ val putResult: Seq[(BlockId, BlockStatus)] = block match {
+ case ArrayBufferBlock(arrayBuffer) =>
+ blockManager.putIterator(blockId, arrayBuffer.iterator, storageLevel, tellMaster = true)
+ case IteratorBlock(iterator) =>
+ blockManager.putIterator(blockId, iterator, storageLevel, tellMaster = true)
+ case ByteBufferBlock(byteBuffer) =>
+ blockManager.putBytes(blockId, byteBuffer, storageLevel, tellMaster = true)
+ case o =>
+ throw new SparkException(
+ s"Could not store $blockId to block manager, unexpected block type ${o.getClass.getName}")
+ }
+ if (!putResult.map { _._1 }.contains(blockId)) {
+ throw new SparkException(
+ s"Could not store $blockId to block manager with storage level $storageLevel")
+ }
+ BlockManagerBasedStoreResult(blockId)
+ }
+
+ def cleanupOldBlock(threshTime: Long) {
+ // This is not used, as blocks inserted into the BlockManager are cleared by the DStream's
+ // clearing of BlockRDDs.
+ }
+}
+
+
+/**
+ * Implementation of [[org.apache.spark.streaming.receiver.ReceivedBlockStoreResult]]
+ * that stores the metadata related to storage of blocks using
+ * [[org.apache.spark.streaming.receiver.WriteAheadLogBasedBlockHandler]]
+ */
+private[streaming] case class WriteAheadLogBasedStoreResult(
+ blockId: StreamBlockId,
+ segment: WriteAheadLogFileSegment
+ ) extends ReceivedBlockStoreResult
+
+
+/**
+ * Implementation of a [[org.apache.spark.streaming.receiver.ReceivedBlockHandler]] which
+ * stores the received blocks in both a write ahead log and a block manager.
+ */
+private[streaming] class WriteAheadLogBasedBlockHandler(
+ blockManager: BlockManager,
+ streamId: Int,
+ storageLevel: StorageLevel,
+ conf: SparkConf,
+ hadoopConf: Configuration,
+ checkpointDir: String,
+ clock: Clock = new SystemClock
+ ) extends ReceivedBlockHandler with Logging {
+
+ private val blockStoreTimeout = conf.getInt(
+ "spark.streaming.receiver.blockStoreTimeout", 30).seconds
+ private val rollingInterval = conf.getInt(
+ "spark.streaming.receiver.writeAheadLog.rollingInterval", 60)
+ private val maxFailures = conf.getInt(
+ "spark.streaming.receiver.writeAheadLog.maxFailures", 3)
+
+ // Manages rolling log files
+ private val logManager = new WriteAheadLogManager(
+ checkpointDirToLogDir(checkpointDir, streamId),
+ hadoopConf, rollingInterval, maxFailures,
+ callerName = this.getClass.getSimpleName,
+ clock = clock
+ )
+
+ // For processing futures used in parallel block storing into block manager and write ahead log
+ // # threads = 2, so that both writing to BM and WAL can proceed in parallel
+ implicit private val executionContext = ExecutionContext.fromExecutorService(
+ Utils.newDaemonFixedThreadPool(2, this.getClass.getSimpleName))
+
+ /**
+ * This implementation stores the block into the block manager as well as a write ahead log.
+ * It does this in parallel, using Scala Futures, and returns only after the block has
+ * been stored in both places.
+ */
+ def storeBlock(blockId: StreamBlockId, block: ReceivedBlock): ReceivedBlockStoreResult = {
+
+ // Serialize the block so that it can be inserted into both
+ val serializedBlock = block match {
+ case ArrayBufferBlock(arrayBuffer) =>
+ blockManager.dataSerialize(blockId, arrayBuffer.iterator)
+ case IteratorBlock(iterator) =>
+ blockManager.dataSerialize(blockId, iterator)
+ case ByteBufferBlock(byteBuffer) =>
+ byteBuffer
+ case o =>
+ throw new SparkException(s"Could not push $blockId to block manager, unexpected block type ${o.getClass.getName}")
+ }
+
+ // Store the block in block manager
+ val storeInBlockManagerFuture = Future {
+ val putResult =
+ blockManager.putBytes(blockId, serializedBlock, storageLevel, tellMaster = true)
+ if (!putResult.map { _._1 }.contains(blockId)) {
+ throw new SparkException(
+ s"Could not store $blockId to block manager with storage level $storageLevel")
+ }
+ }
+
+ // Store the block in write ahead log
+ val storeInWriteAheadLogFuture = Future {
+ logManager.writeToLog(serializedBlock)
+ }
+
+ // Combine the futures, wait for both to complete, and return the write ahead log segment
+ val combinedFuture = for {
+ _ <- storeInBlockManagerFuture
+ fileSegment <- storeInWriteAheadLogFuture
+ } yield fileSegment
+ val segment = Await.result(combinedFuture, blockStoreTimeout)
+ WriteAheadLogBasedStoreResult(blockId, segment)
+ }
+
+ def cleanupOldBlock(threshTime: Long) {
+ logManager.cleanupOldLogs(threshTime)
+ }
+
+ def stop() {
+ logManager.stop()
+ }
+}
+
+private[streaming] object WriteAheadLogBasedBlockHandler {
+ def checkpointDirToLogDir(checkpointDir: String, streamId: Int): String = {
+ new Path(checkpointDir, new Path("receivedData", streamId.toString)).toString
+ }
+}
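
To make the division of labour above concrete, here is a rough sketch (not part of the patch) of how a caller would pick and use one of the two handlers. It assumes blockManager, sparkConf, hadoopConf and checkpointDir are already in scope; in the real code path ReceiverSupervisorImpl obtains them from SparkEnv and the StreamingContext.

    import org.apache.spark.storage.{StorageLevel, StreamBlockId}
    import org.apache.spark.streaming.receiver._

    // Sketch only: storage level and stream id are arbitrary example values.
    val storageLevel = StorageLevel.MEMORY_AND_DISK_SER
    val streamId = 0

    val handler: ReceivedBlockHandler =
      if (sparkConf.getBoolean("spark.streaming.receiver.writeAheadLog.enable", false)) {
        // Writes to the block manager and the write ahead log in parallel.
        new WriteAheadLogBasedBlockHandler(
          blockManager, streamId, storageLevel, sparkConf, hadoopConf, checkpointDir)
      } else {
        // Writes to the block manager only.
        new BlockManagerBasedBlockHandler(blockManager, storageLevel)
      }

    val blockId = StreamBlockId(streamId, uniqueId = 1L)
    val result  = handler.storeBlock(blockId, IteratorBlock(Iterator("a", "b", "c")))

    result match {
      case WriteAheadLogBasedStoreResult(id, segment) =>
        println(s"Stored $id in the block manager and in WAL segment ${segment.path}")
      case BlockManagerBasedStoreResult(id) =>
        println(s"Stored $id in the block manager only")
    }

Either way the caller gets back a ReceivedBlockStoreResult carrying the block id; in the WAL case it additionally carries the file segment from which the serialized block can be read back, as ReceivedBlockHandlerSuite does with WriteAheadLogRandomReader.
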
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala
index 53a3e6200e..5360412330 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala
@@ -25,16 +25,13 @@ import scala.concurrent.Await
import akka.actor.{Actor, Props}
import akka.pattern.ask
-
import com.google.common.base.Throwables
-
-import org.apache.spark.{Logging, SparkEnv}
-import org.apache.spark.streaming.scheduler._
-import org.apache.spark.util.{Utils, AkkaUtils}
+import org.apache.hadoop.conf.Configuration
+import org.apache.spark.{Logging, SparkEnv, SparkException}
import org.apache.spark.storage.StreamBlockId
-import org.apache.spark.streaming.scheduler.DeregisterReceiver
-import org.apache.spark.streaming.scheduler.AddBlock
-import org.apache.spark.streaming.scheduler.RegisterReceiver
+import org.apache.spark.streaming.scheduler._
+import org.apache.spark.streaming.util.WriteAheadLogFileSegment
+import org.apache.spark.util.{AkkaUtils, Utils}
/**
* Concrete implementation of [[org.apache.spark.streaming.receiver.ReceiverSupervisor]]
@@ -44,12 +41,26 @@ import org.apache.spark.streaming.scheduler.RegisterReceiver
*/
private[streaming] class ReceiverSupervisorImpl(
receiver: Receiver[_],
- env: SparkEnv
+ env: SparkEnv,
+ hadoopConf: Configuration,
+ checkpointDirOption: Option[String]
) extends ReceiverSupervisor(receiver, env.conf) with Logging {
- private val blockManager = env.blockManager
+ private val receivedBlockHandler: ReceivedBlockHandler = {
+ if (env.conf.getBoolean("spark.streaming.receiver.writeAheadLog.enable", false)) {
+ if (checkpointDirOption.isEmpty) {
+ throw new SparkException(
+ "Cannot enable receiver write-ahead log without checkpoint directory set. " +
+ "Please use streamingContext.checkpoint() to set the checkpoint directory. " +
+ "See documentation for more details.")
+ }
+ new WriteAheadLogBasedBlockHandler(env.blockManager, receiver.streamId,
+ receiver.storageLevel, env.conf, hadoopConf, checkpointDirOption.get)
+ } else {
+ new BlockManagerBasedBlockHandler(env.blockManager, receiver.storageLevel)
+ }
+ }
- private val storageLevel = receiver.storageLevel
/** Remote Akka actor for the ReceiverTracker */
private val trackerActor = {
@@ -105,47 +116,50 @@ private[streaming] class ReceiverSupervisorImpl(
/** Store an ArrayBuffer of received data as a data block into Spark's memory. */
def pushArrayBuffer(
arrayBuffer: ArrayBuffer[_],
- optionalMetadata: Option[Any],
- optionalBlockId: Option[StreamBlockId]
+ metadataOption: Option[Any],
+ blockIdOption: Option[StreamBlockId]
) {
- val blockId = optionalBlockId.getOrElse(nextBlockId)
- val time = System.currentTimeMillis
- blockManager.putArray(blockId, arrayBuffer.toArray[Any], storageLevel, tellMaster = true)
- logDebug("Pushed block " + blockId + " in " + (System.currentTimeMillis - time) + " ms")
- reportPushedBlock(blockId, arrayBuffer.size, optionalMetadata)
+ pushAndReportBlock(ArrayBufferBlock(arrayBuffer), metadataOption, blockIdOption)
}
/** Store an iterator of received data as a data block into Spark's memory. */
def pushIterator(
iterator: Iterator[_],
- optionalMetadata: Option[Any],
- optionalBlockId: Option[StreamBlockId]
+ metadataOption: Option[Any],
+ blockIdOption: Option[StreamBlockId]
) {
- val blockId = optionalBlockId.getOrElse(nextBlockId)
- val time = System.currentTimeMillis
- blockManager.putIterator(blockId, iterator, storageLevel, tellMaster = true)
- logDebug("Pushed block " + blockId + " in " + (System.currentTimeMillis - time) + " ms")
- reportPushedBlock(blockId, -1, optionalMetadata)
+ pushAndReportBlock(IteratorBlock(iterator), metadataOption, blockIdOption)
}
/** Store the bytes of received data as a data block into Spark's memory. */
def pushBytes(
bytes: ByteBuffer,
- optionalMetadata: Option[Any],
- optionalBlockId: Option[StreamBlockId]
+ metadataOption: Option[Any],
+ blockIdOption: Option[StreamBlockId]
) {
- val blockId = optionalBlockId.getOrElse(nextBlockId)
- val time = System.currentTimeMillis
- blockManager.putBytes(blockId, bytes, storageLevel, tellMaster = true)
- logDebug("Pushed block " + blockId + " in " + (System.currentTimeMillis - time) + " ms")
- reportPushedBlock(blockId, -1, optionalMetadata)
+ pushAndReportBlock(ByteBufferBlock(bytes), metadataOption, blockIdOption)
}
- /** Report pushed block */
- def reportPushedBlock(blockId: StreamBlockId, numRecords: Long, optionalMetadata: Option[Any]) {
- val blockInfo = ReceivedBlockInfo(streamId, blockId, numRecords, optionalMetadata.orNull)
- trackerActor ! AddBlock(blockInfo)
- logDebug("Reported block " + blockId)
+ /** Store a block and report it to the driver */
+ def pushAndReportBlock(
+ receivedBlock: ReceivedBlock,
+ metadataOption: Option[Any],
+ blockIdOption: Option[StreamBlockId]
+ ) {
+ val blockId = blockIdOption.getOrElse(nextBlockId)
+ val numRecords = receivedBlock match {
+ case ArrayBufferBlock(arrayBuffer) => arrayBuffer.size
+ case _ => -1
+ }
+
+ val time = System.currentTimeMillis
+ val blockStoreResult = receivedBlockHandler.storeBlock(blockId, receivedBlock)
+ logDebug(s"Pushed block $blockId in ${(System.currentTimeMillis - time)} ms")
+
+ val blockInfo = ReceivedBlockInfo(streamId, numRecords, blockStoreResult)
+ val future = trackerActor.ask(AddBlock(blockInfo))(askTimeout)
+ Await.result(future, askTimeout)
+ logDebug(s"Reported block $blockId")
}
/** Report error to the receiver tracker */
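
The handler chosen above is driven purely by configuration, so from a user's point of view the change amounts to one flag plus a checkpoint directory. A rough sketch of that setup follows; the master, application name and checkpoint path are placeholders.

    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}

    val conf = new SparkConf()
      .setMaster("local[2]")              // placeholder
      .setAppName("wal-enabled-receiver") // placeholder
      // Selects WriteAheadLogBasedBlockHandler in ReceiverSupervisorImpl.
      .set("spark.streaming.receiver.writeAheadLog.enable", "true")

    val ssc = new StreamingContext(conf, Seconds(1))
    // Required once the flag is set; otherwise the supervisor throws the SparkException above.
    ssc.checkpoint("hdfs:///tmp/streaming-checkpoint") // placeholder path
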
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/BatchInfo.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/BatchInfo.scala
index a68aecb881..92dc113f39 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/BatchInfo.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/BatchInfo.scala
@@ -17,8 +17,8 @@
package org.apache.spark.streaming.scheduler
-import org.apache.spark.streaming.Time
import org.apache.spark.annotation.DeveloperApi
+import org.apache.spark.streaming.Time
/**
* :: DeveloperApi ::
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobSet.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobSet.scala
index a69d743621..8c15a75b1b 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobSet.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobSet.scala
@@ -17,7 +17,8 @@
package org.apache.spark.streaming.scheduler
-import scala.collection.mutable.{ArrayBuffer, HashSet}
+import scala.collection.mutable.HashSet
+
import org.apache.spark.streaming.Time
/** Class representing a set of Jobs
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceivedBlockInfo.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceivedBlockInfo.scala
new file mode 100644
index 0000000000..94beb590f5
--- /dev/null
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceivedBlockInfo.scala
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.scheduler
+
+import org.apache.spark.streaming.receiver.ReceivedBlockStoreResult
+
+/** Information about blocks received by the receiver */
+private[streaming] case class ReceivedBlockInfo(
+ streamId: Int,
+ numRecords: Long,
+ blockStoreResult: ReceivedBlockStoreResult
+ )
+
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala
index 7149dbc12a..d696563bce 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala
@@ -21,21 +21,12 @@ import scala.collection.mutable.{HashMap, SynchronizedMap, SynchronizedQueue}
import scala.language.existentials
import akka.actor._
-import org.apache.spark.{Logging, SparkEnv, SparkException}
+import org.apache.spark.{SerializableWritable, Logging, SparkEnv, SparkException}
import org.apache.spark.SparkContext._
-import org.apache.spark.storage.StreamBlockId
import org.apache.spark.streaming.{StreamingContext, Time}
import org.apache.spark.streaming.receiver.{Receiver, ReceiverSupervisorImpl, StopReceiver}
import org.apache.spark.util.AkkaUtils
-/** Information about blocks received by the receiver */
-private[streaming] case class ReceivedBlockInfo(
- streamId: Int,
- blockId: StreamBlockId,
- numRecords: Long,
- metadata: Any
- )
-
/**
* Messages used by the NetworkReceiver and the ReceiverTracker to communicate
* with each other.
@@ -153,7 +144,7 @@ class ReceiverTracker(ssc: StreamingContext) extends Logging {
def addBlocks(receivedBlockInfo: ReceivedBlockInfo) {
getReceivedBlockInfoQueue(receivedBlockInfo.streamId) += receivedBlockInfo
logDebug("Stream " + receivedBlockInfo.streamId + " received new blocks: " +
- receivedBlockInfo.blockId)
+ receivedBlockInfo.blockStoreResult.blockId)
}
/** Report error sent by a receiver */
@@ -188,6 +179,7 @@ class ReceiverTracker(ssc: StreamingContext) extends Logging {
sender ! true
case AddBlock(receivedBlockInfo) =>
addBlocks(receivedBlockInfo)
+ sender ! true
case ReportError(streamId, message, error) =>
reportError(streamId, message, error)
case DeregisterReceiver(streamId, message, error) =>
@@ -252,6 +244,9 @@ class ReceiverTracker(ssc: StreamingContext) extends Logging {
ssc.sc.makeRDD(receivers, receivers.size)
}
+ val checkpointDirOption = Option(ssc.checkpointDir)
+ val serializableHadoopConf = new SerializableWritable(ssc.sparkContext.hadoopConfiguration)
+
// Function to start the receiver on the worker node
val startReceiver = (iterator: Iterator[Receiver[_]]) => {
if (!iterator.hasNext) {
@@ -259,9 +254,10 @@ class ReceiverTracker(ssc: StreamingContext) extends Logging {
"Could not start receiver as object not found.")
}
val receiver = iterator.next()
- val executor = new ReceiverSupervisorImpl(receiver, SparkEnv.get)
- executor.start()
- executor.awaitTermination()
+ val supervisor = new ReceiverSupervisorImpl(
+ receiver, SparkEnv.get, serializableHadoopConf.value, checkpointDirOption)
+ supervisor.start()
+ supervisor.awaitTermination()
}
// Run the dummy Spark job to ensure that all slaves have registered.
// This prevents all the receivers from being scheduled on the same node.
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/util/WriteAheadLogRandomReader.scala b/streaming/src/main/scala/org/apache/spark/streaming/util/WriteAheadLogRandomReader.scala
index 92bad7a882..003989092a 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/util/WriteAheadLogRandomReader.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/util/WriteAheadLogRandomReader.scala
@@ -52,4 +52,3 @@ private[streaming] class WriteAheadLogRandomReader(path: String, conf: Configura
HdfsUtils.checkState(!closed, "Stream is closed. Create a new Reader to read from the file.")
}
}
-
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala
new file mode 100644
index 0000000000..ad1a6f01b3
--- /dev/null
+++ b/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala
@@ -0,0 +1,258 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming
+
+import java.io.File
+import java.nio.ByteBuffer
+
+import scala.collection.mutable.ArrayBuffer
+import scala.concurrent.duration._
+import scala.language.postfixOps
+
+import akka.actor.{ActorSystem, Props}
+import com.google.common.io.Files
+import org.apache.commons.io.FileUtils
+import org.apache.hadoop.conf.Configuration
+import org.scalatest.{BeforeAndAfter, FunSuite, Matchers}
+import org.scalatest.concurrent.Eventually._
+
+import org.apache.spark._
+import org.apache.spark.network.nio.NioBlockTransferService
+import org.apache.spark.scheduler.LiveListenerBus
+import org.apache.spark.serializer.KryoSerializer
+import org.apache.spark.shuffle.hash.HashShuffleManager
+import org.apache.spark.storage._
+import org.apache.spark.streaming.receiver._
+import org.apache.spark.streaming.util._
+import org.apache.spark.util.AkkaUtils
+import WriteAheadLogBasedBlockHandler._
+import WriteAheadLogSuite._
+
+class ReceivedBlockHandlerSuite extends FunSuite with BeforeAndAfter with Matchers with Logging {
+
+ val conf = new SparkConf().set("spark.streaming.receiver.writeAheadLog.rollingInterval", "1")
+ val hadoopConf = new Configuration()
+ val storageLevel = StorageLevel.MEMORY_ONLY_SER
+ val streamId = 1
+ val securityMgr = new SecurityManager(conf)
+ val mapOutputTracker = new MapOutputTrackerMaster(conf)
+ val shuffleManager = new HashShuffleManager(conf)
+ val serializer = new KryoSerializer(conf)
+ val manualClock = new ManualClock
+ val blockManagerSize = 10000000
+
+ var actorSystem: ActorSystem = null
+ var blockManagerMaster: BlockManagerMaster = null
+ var blockManager: BlockManager = null
+ var tempDirectory: File = null
+
+ before {
+ val (actorSystem, boundPort) = AkkaUtils.createActorSystem(
+ "test", "localhost", 0, conf = conf, securityManager = securityMgr)
+ this.actorSystem = actorSystem
+ conf.set("spark.driver.port", boundPort.toString)
+
+ blockManagerMaster = new BlockManagerMaster(
+ actorSystem.actorOf(Props(new BlockManagerMasterActor(true, conf, new LiveListenerBus))),
+ conf, true)
+
+ blockManager = new BlockManager("bm", actorSystem, blockManagerMaster, serializer,
+ blockManagerSize, conf, mapOutputTracker, shuffleManager,
+ new NioBlockTransferService(conf, securityMgr))
+
+ tempDirectory = Files.createTempDir()
+ manualClock.setTime(0)
+ }
+
+ after {
+ if (blockManager != null) {
+ blockManager.stop()
+ blockManager = null
+ }
+ if (blockManagerMaster != null) {
+ blockManagerMaster.stop()
+ blockManagerMaster = null
+ }
+ actorSystem.shutdown()
+ actorSystem.awaitTermination()
+ actorSystem = null
+
+ if (tempDirectory != null && tempDirectory.exists()) {
+ FileUtils.deleteDirectory(tempDirectory)
+ tempDirectory = null
+ }
+ }
+
+ test("BlockManagerBasedBlockHandler - store blocks") {
+ withBlockManagerBasedBlockHandler { handler =>
+ testBlockStoring(handler) { case (data, blockIds, storeResults) =>
+ // Verify the data in block manager is correct
+ val storedData = blockIds.flatMap { blockId =>
+ blockManager.getLocal(blockId).map { _.data.map {_.toString}.toList }.getOrElse(List.empty)
+ }.toList
+ storedData shouldEqual data
+
+ // Verify that the store results are instances of BlockManagerBasedStoreResult
+ assert(
+ storeResults.forall { _.isInstanceOf[BlockManagerBasedStoreResult] },
+ "Unexpected store result type"
+ )
+ }
+ }
+ }
+
+ test("BlockManagerBasedBlockHandler - handle errors in storing block") {
+ withBlockManagerBasedBlockHandler { handler =>
+ testErrorHandling(handler)
+ }
+ }
+
+ test("WriteAheadLogBasedBlockHandler - store blocks") {
+ withWriteAheadLogBasedBlockHandler { handler =>
+ testBlockStoring(handler) { case (data, blockIds, storeResults) =>
+ // Verify the data in block manager is correct
+ val storedData = blockIds.flatMap { blockId =>
+ blockManager.getLocal(blockId).map { _.data.map {_.toString}.toList }.getOrElse(List.empty)
+ }.toList
+ storedData shouldEqual data
+
+ // Verify that the store results are instances of WriteAheadLogBasedStoreResult
+ assert(
+ storeResults.forall { _.isInstanceOf[WriteAheadLogBasedStoreResult] },
+ "Unexpected store result type"
+ )
+ // Verify the data in write ahead log files is correct
+ val fileSegments = storeResults.map { _.asInstanceOf[WriteAheadLogBasedStoreResult].segment}
+ val loggedData = fileSegments.flatMap { segment =>
+ val reader = new WriteAheadLogRandomReader(segment.path, hadoopConf)
+ val bytes = reader.read(segment)
+ reader.close()
+ blockManager.dataDeserialize(generateBlockId(), bytes).toList
+ }
+ loggedData shouldEqual data
+ }
+ }
+ }
+
+ test("WriteAheadLogBasedBlockHandler - handle errors in storing block") {
+ withWriteAheadLogBasedBlockHandler { handler =>
+ testErrorHandling(handler)
+ }
+ }
+
+ test("WriteAheadLogBasedBlockHandler - cleanup old blocks") {
+ withWriteAheadLogBasedBlockHandler { handler =>
+ val blocks = Seq.tabulate(10) { i => IteratorBlock(Iterator(1 to i)) }
+ storeBlocks(handler, blocks)
+
+ val preCleanupLogFiles = getWriteAheadLogFiles()
+ preCleanupLogFiles.size should be > 1
+
+ // this depends on the number of blocks stored by storeBlocks() above (10 blocks, 500 ms apart)
+ manualClock.currentTime() shouldEqual 5000L
+
+ val cleanupThreshTime = 3000L
+ handler.cleanupOldBlock(cleanupThreshTime)
+ eventually(timeout(10000 millis), interval(10 millis)) {
+ getWriteAheadLogFiles().size should be < preCleanupLogFiles.size
+ }
+ }
+ }
+
+ /**
+ * Test storing data using different forms of ReceivedBlock and verify that the stores
+ * succeeded, using the given verification function
+ */
+ private def testBlockStoring(receivedBlockHandler: ReceivedBlockHandler)
+ (verifyFunc: (Seq[String], Seq[StreamBlockId], Seq[ReceivedBlockStoreResult]) => Unit) {
+ val data = Seq.tabulate(100) { _.toString }
+
+ def storeAndVerify(blocks: Seq[ReceivedBlock]) {
+ blocks should not be empty
+ val (blockIds, storeResults) = storeBlocks(receivedBlockHandler, blocks)
+ withClue(s"Testing with ${blocks.head.getClass.getSimpleName}s:") {
+ // Verify that the returned store results have the correct block ids
+ (storeResults.map { _.blockId }) shouldEqual blockIds
+
+ // Call handler-specific verification function
+ verifyFunc(data, blockIds, storeResults)
+ }
+ }
+
+ def dataToByteBuffer(b: Seq[String]) = blockManager.dataSerialize(generateBlockId, b.iterator)
+
+ val blocks = data.grouped(10).toSeq
+
+ storeAndVerify(blocks.map { b => IteratorBlock(b.toIterator) })
+ storeAndVerify(blocks.map { b => ArrayBufferBlock(new ArrayBuffer ++= b) })
+ storeAndVerify(blocks.map { b => ByteBufferBlock(dataToByteBuffer(b)) })
+ }
+
+ /** Test error handling for blocks that cannot be stored */
+ private def testErrorHandling(receivedBlockHandler: ReceivedBlockHandler) {
+ // Handle error in iterator (e.g. divide-by-zero error)
+ intercept[Exception] {
+ val iterator = (10 to (-10, -1)).toIterator.map { _ / 0 }
+ receivedBlockHandler.storeBlock(StreamBlockId(1, 1), IteratorBlock(iterator))
+ }
+
+ // Handle error in block manager storing (e.g. a block too large to fit in memory)
+ intercept[SparkException] {
+ val byteBuffer = ByteBuffer.wrap(new Array[Byte](blockManagerSize + 1))
+ receivedBlockHandler.storeBlock(StreamBlockId(1, 1), ByteBufferBlock(byteBuffer))
+ }
+ }
+
+ /** Instantiate a BlockManagerBasedBlockHandler and run the given code with it */
+ private def withBlockManagerBasedBlockHandler(body: BlockManagerBasedBlockHandler => Unit) {
+ body(new BlockManagerBasedBlockHandler(blockManager, storageLevel))
+ }
+
+ /** Instantiate a WriteAheadLogBasedBlockHandler and run the given code with it */
+ private def withWriteAheadLogBasedBlockHandler(body: WriteAheadLogBasedBlockHandler => Unit) {
+ val receivedBlockHandler = new WriteAheadLogBasedBlockHandler(blockManager, 1,
+ storageLevel, conf, hadoopConf, tempDirectory.toString, manualClock)
+ try {
+ body(receivedBlockHandler)
+ } finally {
+ receivedBlockHandler.stop()
+ }
+ }
+
+ /** Store blocks using a handler */
+ private def storeBlocks(
+ receivedBlockHandler: ReceivedBlockHandler,
+ blocks: Seq[ReceivedBlock]
+ ): (Seq[StreamBlockId], Seq[ReceivedBlockStoreResult]) = {
+ val blockIds = Seq.fill(blocks.size)(generateBlockId())
+ val storeResults = blocks.zip(blockIds).map {
+ case (block, id) =>
+ manualClock.addToTime(500) // log rolling interval set to 1000 ms through SparkConf
+ logDebug("Inserting block " + id)
+ receivedBlockHandler.storeBlock(id, block)
+ }.toList
+ logDebug("Done inserting")
+ (blockIds, storeResults)
+ }
+
+ private def getWriteAheadLogFiles(): Seq[String] = {
+ getLogFilesInDirectory(checkpointDirToLogDir(tempDirectory.toString, streamId))
+ }
+
+ private def generateBlockId(): StreamBlockId = StreamBlockId(streamId, scala.util.Random.nextLong)
+}
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala
index 5eba93c208..1956a4f1db 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala
@@ -58,7 +58,7 @@ class WriteAheadLogSuite extends FunSuite with BeforeAndAfter {
test("WriteAheadLogWriter - writing data") {
val dataToWrite = generateRandomData()
val segments = writeDataUsingWriter(testFile, dataToWrite)
- val writtenData = readDataManually(testFile, segments)
+ val writtenData = readDataManually(segments)
assert(writtenData === dataToWrite)
}
@@ -67,7 +67,7 @@ class WriteAheadLogSuite extends FunSuite with BeforeAndAfter {
val writer = new WriteAheadLogWriter(testFile, hadoopConf)
dataToWrite.foreach { data =>
val segment = writer.write(stringToByteBuffer(data))
- val dataRead = readDataManually(testFile, Seq(segment)).head
+ val dataRead = readDataManually(Seq(segment)).head
assert(data === dataRead)
}
writer.close()
@@ -281,14 +281,20 @@ object WriteAheadLogSuite {
}
/** Read data directly from segments of log files and return the list of deserialized strings. */
- def readDataManually(file: String, segments: Seq[WriteAheadLogFileSegment]): Seq[String] = {
- val reader = HdfsUtils.getInputStream(file, hadoopConf)
- segments.map { x =>
- reader.seek(x.offset)
- val data = new Array[Byte](x.length)
- reader.readInt()
- reader.readFully(data)
- Utils.deserialize[String](data)
+ def readDataManually(segments: Seq[WriteAheadLogFileSegment]): Seq[String] = {
+ segments.map { segment =>
+ val reader = HdfsUtils.getInputStream(segment.path, hadoopConf)
+ try {
+ reader.seek(segment.offset)
+ val bytes = new Array[Byte](segment.length)
+ reader.readInt()
+ reader.readFully(bytes)
+ val data = Utils.deserialize[String](bytes)
+ reader.close()
+ data
+ } finally {
+ reader.close()
+ }
}
}
@@ -335,9 +341,11 @@ object WriteAheadLogSuite {
val fileSystem = HdfsUtils.getFileSystemForPath(logDirectoryPath, hadoopConf)
if (fileSystem.exists(logDirectoryPath) && fileSystem.getFileStatus(logDirectoryPath).isDir) {
- fileSystem.listStatus(logDirectoryPath).map {
- _.getPath.toString.stripPrefix("file:")
- }.sorted
+ fileSystem.listStatus(logDirectoryPath).map { _.getPath() }.sortBy {
+ _.getName().split("-")(1).toLong
+ }.map {
+ _.toString.stripPrefix("file:")
+ }
} else {
Seq.empty
}