diff options
author | Josh Rosen <joshrosen@databricks.com> | 2016-03-16 22:52:55 -0700 |
---|---|---|
committer | Reynold Xin <rxin@databricks.com> | 2016-03-16 22:52:55 -0700 |
commit | de1a84e56e81347cb0d1ec67cc86944ea98bb9a9 (patch) | |
tree | a5a577ebb2049d55c46682161b95594ba2537201 /core/src/main/scala/org/apache/spark/SparkEnv.scala | |
parent | d1c193a2f1a5e2b98f5df1b86d7a7ec0ced13668 (diff) | |
download | spark-de1a84e56e81347cb0d1ec67cc86944ea98bb9a9.tar.gz spark-de1a84e56e81347cb0d1ec67cc86944ea98bb9a9.tar.bz2 spark-de1a84e56e81347cb0d1ec67cc86944ea98bb9a9.zip |
[SPARK-13926] Automatically use Kryo serializer when shuffling RDDs with simple types
Because ClassTags are available when constructing ShuffledRDD we can use them to automatically use Kryo for shuffle serialization when the RDD's types are known to be compatible with Kryo.
This patch introduces `SerializerManager`, a component which picks the "best" serializer for a shuffle given the elements' ClassTags. It will automatically pick a Kryo serializer for ShuffledRDDs whose key, value, and/or combiner types are primitives, arrays of primitives, or strings. In the future we can use this class as a narrow extension point to integrate specialized serializers for other types, such as ByteBuffers.
In a planned followup patch, I will extend the BlockManager APIs so that we're able to use similar automatic serializer selection when caching RDDs (this is a little trickier because the ClassTags need to be threaded through many more places).
Author: Josh Rosen <joshrosen@databricks.com>
Closes #11755 from JoshRosen/automatically-pick-best-serializer.
Diffstat (limited to 'core/src/main/scala/org/apache/spark/SparkEnv.scala')
-rw-r--r-- | core/src/main/scala/org/apache/spark/SparkEnv.scala | 6 |
1 file changed, 5 insertions(+), 1 deletion(-)
diff --git a/core/src/main/scala/org/apache/spark/SparkEnv.scala b/core/src/main/scala/org/apache/spark/SparkEnv.scala
index b3b3729625..668a913a20 100644
--- a/core/src/main/scala/org/apache/spark/SparkEnv.scala
+++ b/core/src/main/scala/org/apache/spark/SparkEnv.scala
@@ -35,7 +35,7 @@ import org.apache.spark.network.netty.NettyBlockTransferService
 import org.apache.spark.rpc.{RpcEndpoint, RpcEndpointRef, RpcEnv}
 import org.apache.spark.scheduler.{LiveListenerBus, OutputCommitCoordinator}
 import org.apache.spark.scheduler.OutputCommitCoordinator.OutputCommitCoordinatorEndpoint
-import org.apache.spark.serializer.{JavaSerializer, Serializer}
+import org.apache.spark.serializer.{JavaSerializer, Serializer, SerializerManager}
 import org.apache.spark.shuffle.ShuffleManager
 import org.apache.spark.storage._
 import org.apache.spark.util.{RpcUtils, Utils}
@@ -56,6 +56,7 @@ class SparkEnv (
     private[spark] val rpcEnv: RpcEnv,
     val serializer: Serializer,
     val closureSerializer: Serializer,
+    val serializerManager: SerializerManager,
     val mapOutputTracker: MapOutputTracker,
     val shuffleManager: ShuffleManager,
     val broadcastManager: BroadcastManager,
@@ -276,6 +277,8 @@ object SparkEnv extends Logging {
       "spark.serializer", "org.apache.spark.serializer.JavaSerializer")
     logDebug(s"Using serializer: ${serializer.getClass}")
 
+    val serializerManager = new SerializerManager(serializer, conf)
+
     val closureSerializer = new JavaSerializer(conf)
 
     def registerOrLookupEndpoint(
@@ -368,6 +371,7 @@ object SparkEnv extends Logging {
       rpcEnv,
       serializer,
       closureSerializer,
+      serializerManager,
       mapOutputTracker,
       shuffleManager,
       broadcastManager,