author     Josh Rosen <joshrosen@databricks.com>    2016-03-16 22:52:55 -0700
committer  Reynold Xin <rxin@databricks.com>        2016-03-16 22:52:55 -0700
commit     de1a84e56e81347cb0d1ec67cc86944ea98bb9a9 (patch)
tree       a5a577ebb2049d55c46682161b95594ba2537201 /sql/core
parent     d1c193a2f1a5e2b98f5df1b86d7a7ec0ced13668 (diff)
[SPARK-13926] Automatically use Kryo serializer when shuffling RDDs with simple types
Because ClassTags are available when constructing ShuffledRDD, we can use them to automatically select Kryo for shuffle serialization when the RDD's types are known to be compatible with Kryo.

This patch introduces `SerializerManager`, a component which picks the "best" serializer for a shuffle given the elements' ClassTags. It will automatically pick a Kryo serializer for ShuffledRDDs whose key, value, and/or combiner types are primitives, arrays of primitives, or strings. In the future we can use this class as a narrow extension point to integrate specialized serializers for other types, such as ByteBuffers.

In a planned followup patch, I will extend the BlockManager APIs so that we're able to use similar automatic serializer selection when caching RDDs (this is a little trickier because the ClassTags need to be threaded through many more places).

Author: Josh Rosen <joshrosen@databricks.com>

Closes #11755 from JoshRosen/automatically-pick-best-serializer.
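As an illustration of the selection idea described above, here is a minimal sketch in Scala. It is not the actual SerializerManager from the patch; the names KryoEligibility, canUseKryo, and chooseSerializer are hypothetical, and only the eligibility rule (primitives, arrays of primitives, and strings) is taken from the commit message.

    import scala.reflect.ClassTag
    import org.apache.spark.serializer.Serializer

    // Hypothetical sketch; names are illustrative, not the patch's actual API.
    object KryoEligibility {
      // Element types known to round-trip safely and efficiently through Kryo.
      private val simpleTypes: Set[Class[_]] = Set(
        classOf[Boolean], classOf[Byte], classOf[Char], classOf[Short],
        classOf[Int], classOf[Long], classOf[Float], classOf[Double],
        classOf[String])

      def canUseKryo(ct: ClassTag[_]): Boolean = {
        val cls = ct.runtimeClass
        simpleTypes.contains(cls) ||
          (cls.isArray && cls.getComponentType.isPrimitive)
      }

      // Pick Kryo only when every ClassTag involved in the shuffle is
      // eligible; otherwise fall back to the configured default serializer.
      def chooseSerializer(
          kryo: Serializer,
          default: Serializer,
          tags: ClassTag[_]*): Serializer =
        if (tags.forall(canUseKryo)) kryo else default
    }

A ShuffledRDD would pass its key, value, and (if present) combiner ClassTags into such a check, so unknown or complex element types keep the default serializer.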
Diffstat (limited to 'sql/core')
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchange.scala   | 2 +-
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeRowSerializerSuite.scala   | 4 ++--
2 files changed, 3 insertions(+), 3 deletions(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchange.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchange.scala
index 4eb4d9adbd..7e35db7dd8 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchange.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchange.scala
@@ -261,7 +261,7 @@ object ShuffleExchange {
       new ShuffleDependency[Int, InternalRow, InternalRow](
         rddWithPartitionIds,
         new PartitionIdPassthrough(part.numPartitions),
-        Some(serializer))
+        serializer)
     dependency
   }
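The sql/core side of the patch is mechanical: callers stop wrapping the serializer in Some(...). Judging from the hunks alone (an inference, not quoted from the patch), the serializer parameters involved changed roughly as follows:

    // Inferred from the diff; parameter lists abbreviated.
    // before: serializer: Option[Serializer] = None
    // after:  serializer: Serializer  (with a default supplied elsewhere)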
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeRowSerializerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeRowSerializerSuite.scala
index 50bdcd6c2c..1f3779373b 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeRowSerializerSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeRowSerializerSuite.scala
@@ -118,7 +118,7 @@ class UnsafeRowSerializerSuite extends SparkFunSuite with LocalSparkContext {
     val sorter = new ExternalSorter[Int, UnsafeRow, UnsafeRow](
       taskContext,
       partitioner = Some(new HashPartitioner(10)),
-      serializer = Some(new UnsafeRowSerializer(numFields = 1)))
+      serializer = new UnsafeRowSerializer(numFields = 1))
     // Ensure we spilled something and have to merge them later
     assert(sorter.numSpills === 0)
@@ -153,7 +153,7 @@ class UnsafeRowSerializerSuite extends SparkFunSuite with LocalSparkContext {
       new ShuffleDependency[Int, InternalRow, InternalRow](
         rowsRDD,
         new PartitionIdPassthrough(2),
-        Some(new UnsafeRowSerializer(2)))
+        new UnsafeRowSerializer(2))
     val shuffled = new ShuffledRowRDD(dependency)
     shuffled.count()
   }