diff options
author | Josh Rosen <joshrosen@databricks.com> | 2016-03-16 22:52:55 -0700 |
---|---|---|
committer | Reynold Xin <rxin@databricks.com> | 2016-03-16 22:52:55 -0700 |
commit | de1a84e56e81347cb0d1ec67cc86944ea98bb9a9 (patch) | |
tree | a5a577ebb2049d55c46682161b95594ba2537201 /sql | |
parent | d1c193a2f1a5e2b98f5df1b86d7a7ec0ced13668 (diff) | |
download | spark-de1a84e56e81347cb0d1ec67cc86944ea98bb9a9.tar.gz spark-de1a84e56e81347cb0d1ec67cc86944ea98bb9a9.tar.bz2 spark-de1a84e56e81347cb0d1ec67cc86944ea98bb9a9.zip |
[SPARK-13926] Automatically use Kryo serializer when shuffling RDDs with simple types
Because ClassTags are available when constructing ShuffledRDD we can use them to automatically use Kryo for shuffle serialization when the RDD's types are known to be compatible with Kryo.
This patch introduces `SerializerManager`, a component which picks the "best" serializer for a shuffle given the elements' ClassTags. It will automatically pick a Kryo serializer for ShuffledRDDs whose key, value, and/or combiner types are primitives, arrays of primitives, or strings. In the future we can use this class as a narrow extension point to integrate specialized serializers for other types, such as ByteBuffers.
In a planned followup patch, I will extend the BlockManager APIs so that we're able to use similar automatic serializer selection when caching RDDs (this is a little trickier because the ClassTags need to be threaded through many more places).
Author: Josh Rosen <joshrosen@databricks.com>
Closes #11755 from JoshRosen/automatically-pick-best-serializer.
Diffstat (limited to 'sql')
-rw-r--r-- | sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchange.scala | 2 | ||||
-rw-r--r-- | sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeRowSerializerSuite.scala | 4 |
2 files changed, 3 insertions, 3 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchange.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchange.scala index 4eb4d9adbd..7e35db7dd8 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchange.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchange.scala @@ -261,7 +261,7 @@ object ShuffleExchange { new ShuffleDependency[Int, InternalRow, InternalRow]( rddWithPartitionIds, new PartitionIdPassthrough(part.numPartitions), - Some(serializer)) + serializer) dependency } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeRowSerializerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeRowSerializerSuite.scala index 50bdcd6c2c..1f3779373b 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeRowSerializerSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeRowSerializerSuite.scala @@ -118,7 +118,7 @@ class UnsafeRowSerializerSuite extends SparkFunSuite with LocalSparkContext { val sorter = new ExternalSorter[Int, UnsafeRow, UnsafeRow]( taskContext, partitioner = Some(new HashPartitioner(10)), - serializer = Some(new UnsafeRowSerializer(numFields = 1))) + serializer = new UnsafeRowSerializer(numFields = 1)) // Ensure we spilled something and have to merge them later assert(sorter.numSpills === 0) @@ -153,7 +153,7 @@ class UnsafeRowSerializerSuite extends SparkFunSuite with LocalSparkContext { new ShuffleDependency[Int, InternalRow, InternalRow]( rowsRDD, new PartitionIdPassthrough(2), - Some(new UnsafeRowSerializer(2))) + new UnsafeRowSerializer(2)) val shuffled = new ShuffledRowRDD(dependency) shuffled.count() } |