author     Josh Rosen <joshrosen@databricks.com>    2016-03-21 17:19:39 -0700
committer  Josh Rosen <joshrosen@databricks.com>    2016-03-21 17:19:39 -0700
commit     b5f1ab701a167a728bb006e01b392b203da84391 (patch)
tree       af002e40214b1029e7d6f99b7b61389dbdac6895 /streaming/src
parent     b3e5af62a1c9e11556c4721164e6539d7ecce8e7 (diff)
[SPARK-13990] Automatically pick serializer when caching RDDs
Building on the `SerializerManager` introduced in SPARK-13926 / #11755, this patch modifies Spark's BlockManager to use RDDs' ClassTags in order to select the best serializer to use when caching RDD blocks. When storing a local block, the BlockManager `put()` methods use implicits to record ClassTags and store those tags in the blocks' BlockInfo records. When reading a local block, the stored ClassTag is used to pick the appropriate serializer. When a block is stored with replication, the ClassTag is written into the block transfer metadata and will also be stored in the remote BlockManager.

There are two or three places where we don't properly pass ClassTags, including TorrentBroadcast and BlockRDD. I think this happens to work because the missing ClassTag always happens to be `ClassTag.Any`, but it might be worth looking more carefully at those places to see whether we should be more explicit.

Author: Josh Rosen <joshrosen@databricks.com>

Closes #11801 from JoshRosen/pick-best-serializer-for-caching.
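For illustration only, ClassTag-keyed serializer selection could look roughly like the sketch below. `ExampleSerializerManager`, its `canUseKryo` check, and the type whitelist are hypothetical stand-ins, not the actual SerializerManager internals:

    import scala.reflect.ClassTag

    import org.apache.spark.serializer.{KryoSerializer, Serializer}

    // Hypothetical sketch: choose Kryo for types it is known to handle,
    // otherwise fall back to the default serializer.
    class ExampleSerializerManager(default: Serializer, kryo: KryoSerializer) {
      // Assumed whitelist for illustration; the real check is more complete.
      private def canUseKryo(ct: ClassTag[_]): Boolean =
        ct == ClassTag.Int || ct == ClassTag.Long || ct == ClassTag(classOf[String])

      def getSerializer(ct: ClassTag[_]): Serializer =
        if (canUseKryo(ct)) kryo else default
    }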
Diffstat (limited to 'streaming/src')
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDD.scala  |  3
-rw-r--r--  streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala        |  8
2 files changed, 7 insertions(+), 4 deletions(-)
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDD.scala b/streaming/src/main/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDD.scala
index 8625882b04..ace67a639c 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDD.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDD.scala
@@ -161,7 +161,8 @@ class WriteAheadLogBackedBlockRDD[T: ClassTag](
logDebug(s"Stored partition data of $this into block manager with level $storageLevel")
dataRead.rewind()
}
- blockManager.dataDeserialize(blockId, dataRead).asInstanceOf[Iterator[T]]
+ blockManager.dataDeserialize(blockId, new ChunkedByteBuffer(dataRead))
+ .asInstanceOf[Iterator[T]]
}
if (partition.isBlockIdValid) {
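(Note: `dataDeserialize` now takes a `ChunkedByteBuffer` rather than a bare `java.nio.ByteBuffer`, so call sites wrap the buffer as above. A minimal sketch of that wrapping, using the single-buffer constructor exercised in this diff; the buffer contents are placeholder assumptions:)

    import java.nio.ByteBuffer

    import org.apache.spark.util.io.ChunkedByteBuffer

    val raw: ByteBuffer = ByteBuffer.wrap(Array[Byte](1, 2, 3))  // placeholder data
    // Wrap the single buffer so it can be passed to APIs that now accept
    // a ChunkedByteBuffer instead of a bare ByteBuffer.
    val chunked = new ChunkedByteBuffer(raw)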
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala
index 76f67ed601..122ca0627f 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala
@@ -34,12 +34,13 @@ import org.apache.spark.memory.StaticMemoryManager
import org.apache.spark.network.netty.NettyBlockTransferService
import org.apache.spark.rpc.RpcEnv
import org.apache.spark.scheduler.LiveListenerBus
-import org.apache.spark.serializer.KryoSerializer
+import org.apache.spark.serializer.{KryoSerializer, SerializerManager}
import org.apache.spark.shuffle.hash.HashShuffleManager
import org.apache.spark.storage._
import org.apache.spark.streaming.receiver._
import org.apache.spark.streaming.util._
import org.apache.spark.util.{ManualClock, Utils}
+import org.apache.spark.util.io.ChunkedByteBuffer
class ReceivedBlockHandlerSuite
extends SparkFunSuite
@@ -155,7 +156,7 @@ class ReceivedBlockHandlerSuite
val reader = new FileBasedWriteAheadLogRandomReader(fileSegment.path, hadoopConf)
val bytes = reader.read(fileSegment)
reader.close()
- blockManager.dataDeserialize(generateBlockId(), bytes).toList
+ blockManager.dataDeserialize(generateBlockId(), new ChunkedByteBuffer(bytes)).toList
}
loggedData shouldEqual data
}
@@ -264,7 +265,8 @@ class ReceivedBlockHandlerSuite
name: String = SparkContext.DRIVER_IDENTIFIER): BlockManager = {
val memManager = new StaticMemoryManager(conf, Long.MaxValue, maxMem, numCores = 1)
val transfer = new NettyBlockTransferService(conf, securityMgr, numCores = 1)
- val blockManager = new BlockManager(name, rpcEnv, blockManagerMaster, serializer, conf,
+ val serializerManager = new SerializerManager(serializer, conf)
+ val blockManager = new BlockManager(name, rpcEnv, blockManagerMaster, serializerManager, conf,
memManager, mapOutputTracker, shuffleManager, transfer, securityMgr, 0)
memManager.setMemoryStore(blockManager.memoryStore)
blockManager.initialize("app-id")
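(As the hunk above shows, the test helper now interposes a `SerializerManager` between the base serializer and the `BlockManager`. A minimal sketch of that wiring in isolation; the SparkConf and serializer values here are placeholder assumptions:)

    import org.apache.spark.SparkConf
    import org.apache.spark.serializer.{KryoSerializer, SerializerManager}

    val conf = new SparkConf(false)            // placeholder config for illustration
    val serializer = new KryoSerializer(conf)  // base serializer under test
    // BlockManager now receives the manager, not the serializer itself.
    val serializerManager = new SerializerManager(serializer, conf)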