author     jerryshao <saisai.shao@intel.com>              2014-12-23 15:45:53 -0800
committer  Tathagata Das <tathagata.das1565@gmail.com>    2014-12-23 15:45:53 -0800
commit     3f5f4cc4e7b3bc458e0579d247a0652dca365853 (patch)
tree       af18738a5f941ab1a4c41423d7434174f33cab75 /streaming
parent     10d69e9cbfdabe95d0e513176d5347d7b59da0ee (diff)
[SPARK-4671][Streaming] Do not replicate streaming block when WAL is enabled
Currently a streaming block is replicated whenever a replicated storage level is set. Since the write ahead log (WAL) is already fault tolerant, this replication is unnecessary and hurts the throughput of the streaming application.

Hi tdas, as we discussed about this issue, I fixed it with this implementation. I'm not sure if this is the way you want it; would you mind taking a look at it? Thanks a lot.

Author: jerryshao <saisai.shao@intel.com>

Closes #3534 from jerryshao/SPARK-4671 and squashes the following commits:

500b456 [jerryshao] Do not replicate streaming block when WAL is enabled
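The heart of the change is the storage-level downgrade shown in the diff below. As a minimal standalone sketch of the same idea (a hypothetical effectiveLevelForWAL helper, not code from the patch, assuming only org.apache.spark.storage.StorageLevel):

    import org.apache.spark.storage.StorageLevel

    // Hypothetical helper mirroring the patch's intent: the WAL already provides
    // fault tolerance, so keep the user's disk/memory/off-heap choices but force
    // serialized storage and a single replica.
    def effectiveLevelForWAL(userLevel: StorageLevel): StorageLevel =
      StorageLevel(
        userLevel.useDisk,
        userLevel.useMemory,
        userLevel.useOffHeap,
        false,  // store serialized bytes; the WAL path writes serialized blocks
        1)      // no replication; the WAL is the recovery mechanism

    // Example: a level like MEMORY_AND_DISK_SER_2 (serialized, 2 replicas) would
    // map to a memory+disk, serialized, single-replica level.

This mirrors the effectiveStorageLevel value computed inside WriteAheadLogBasedBlockHandler in the diff that follows.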
Diffstat (limited to 'streaming')
 streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala | 20 +++++++++++++++++++-
 1 file changed, 19 insertions(+), 1 deletion(-)
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala
index fdf995320b..c0670e22a7 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala
@@ -121,6 +121,24 @@ private[streaming] class WriteAheadLogBasedBlockHandler(
   private val maxFailures = conf.getInt(
     "spark.streaming.receiver.writeAheadLog.maxFailures", 3)
 
+  private val effectiveStorageLevel = {
+    if (storageLevel.deserialized) {
+      logWarning(s"Storage level serialization ${storageLevel.deserialized} is not supported when" +
+        s" write ahead log is enabled, change to serialization false")
+    }
+    if (storageLevel.replication > 1) {
+      logWarning(s"Storage level replication ${storageLevel.replication} is unnecessary when " +
+        s"write ahead log is enabled, change to replication 1")
+    }
+
+    StorageLevel(storageLevel.useDisk, storageLevel.useMemory, storageLevel.useOffHeap, false, 1)
+  }
+
+  if (storageLevel != effectiveStorageLevel) {
+    logWarning(s"User defined storage level $storageLevel is changed to effective storage level " +
+      s"$effectiveStorageLevel when write ahead log is enabled")
+  }
+
   // Manages rolling log files
   private val logManager = new WriteAheadLogManager(
     checkpointDirToLogDir(checkpointDir, streamId),
@@ -156,7 +174,7 @@ private[streaming] class WriteAheadLogBasedBlockHandler(
     // Store the block in block manager
     val storeInBlockManagerFuture = Future {
       val putResult =
-        blockManager.putBytes(blockId, serializedBlock, storageLevel, tellMaster = true)
+        blockManager.putBytes(blockId, serializedBlock, effectiveStorageLevel, tellMaster = true)
       if (!putResult.map { _._1 }.contains(blockId)) {
         throw new SparkException(
           s"Could not store $blockId to block manager with storage level $storageLevel")