Diffstat (limited to 'streaming')
 streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala | 20 +++++++++++++++++++-
 1 file changed, 19 insertions(+), 1 deletion(-)
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala
index fdf995320b..c0670e22a7 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala
@@ -121,6 +121,24 @@ private[streaming] class WriteAheadLogBasedBlockHandler(
   private val maxFailures = conf.getInt(
     "spark.streaming.receiver.writeAheadLog.maxFailures", 3)
 
+  private val effectiveStorageLevel = {
+    if (storageLevel.deserialized) {
+      logWarning(s"Storage level serialization ${storageLevel.deserialized} is not supported when" +
+        s" write ahead log is enabled, change to serialization false")
+    }
+    if (storageLevel.replication > 1) {
+      logWarning(s"Storage level replication ${storageLevel.replication} is unnecessary when " +
+        s"write ahead log is enabled, change to replication 1")
+    }
+
+    StorageLevel(storageLevel.useDisk, storageLevel.useMemory, storageLevel.useOffHeap, false, 1)
+  }
+
+  if (storageLevel != effectiveStorageLevel) {
+    logWarning(s"User defined storage level $storageLevel is changed to effective storage level " +
+      s"$effectiveStorageLevel when write ahead log is enabled")
+  }
+
   // Manages rolling log files
   private val logManager = new WriteAheadLogManager(
     checkpointDirToLogDir(checkpointDir, streamId),
@@ -156,7 +174,7 @@ private[streaming] class WriteAheadLogBasedBlockHandler(
     // Store the block in block manager
     val storeInBlockManagerFuture = Future {
       val putResult =
-        blockManager.putBytes(blockId, serializedBlock, storageLevel, tellMaster = true)
+        blockManager.putBytes(blockId, serializedBlock, effectiveStorageLevel, tellMaster = true)
       if (!putResult.map { _._1 }.contains(blockId)) {
         throw new SparkException(
           s"Could not store $blockId to block manager with storage level $storageLevel")