author     Josh Rosen <joshrosen@databricks.com>    2016-03-16 22:52:55 -0700
committer  Reynold Xin <rxin@databricks.com>        2016-03-16 22:52:55 -0700
commit     de1a84e56e81347cb0d1ec67cc86944ea98bb9a9 (patch)
tree       a5a577ebb2049d55c46682161b95594ba2537201 /core/src/test
parent     d1c193a2f1a5e2b98f5df1b86d7a7ec0ced13668 (diff)
[SPARK-13926] Automatically use Kryo serializer when shuffling RDDs with simple types
Because ClassTags are available when constructing a ShuffledRDD, we can use them to automatically select Kryo for shuffle serialization when the RDD's types are known to be compatible with it. This patch introduces `SerializerManager`, a component which picks the "best" serializer for a shuffle given the elements' ClassTags. It automatically picks a Kryo serializer for ShuffledRDDs whose key, value, and/or combiner types are primitives, arrays of primitives, or strings. In the future, we can use this class as a narrow extension point to integrate specialized serializers for other types, such as ByteBuffers.

In a planned followup patch, I will extend the BlockManager APIs so that we can use similar automatic serializer selection when caching RDDs (this is a little trickier because the ClassTags need to be threaded through many more places).

Author: Josh Rosen <joshrosen@databricks.com>

Closes #11755 from JoshRosen/automatically-pick-best-serializer.
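To make the selection rule concrete, here is a minimal sketch of ClassTag-based serializer selection as the commit message describes it, assuming Spark core on the classpath. The class and method names (ShuffleSerializerChooser, canUseKryo, pickSerializer) are illustrative, not necessarily the actual SerializerManager API introduced by this patch:

    import scala.reflect.ClassTag
    import org.apache.spark.SparkConf
    import org.apache.spark.serializer.{JavaSerializer, KryoSerializer, Serializer}

    // Hypothetical sketch of the selection logic; not the patch's real class.
    class ShuffleSerializerChooser(conf: SparkConf) {
      private val defaultSerializer: Serializer = new JavaSerializer(conf)
      private val kryoSerializer: Serializer = new KryoSerializer(conf)

      // ClassTags Kryo handles without user registration: primitives and
      // arrays of primitives (`wrap` lifts ClassTag[T] to ClassTag[Array[T]]).
      private val primitiveAndArrayTags: Set[ClassTag[_]] = {
        val primitives = Set[ClassTag[_]](
          ClassTag.Boolean, ClassTag.Byte, ClassTag.Char, ClassTag.Double,
          ClassTag.Float, ClassTag.Int, ClassTag.Long, ClassTag.Short)
        primitives ++ primitives.map(_.wrap)
      }

      private def canUseKryo(ct: ClassTag[_]): Boolean =
        primitiveAndArrayTags.contains(ct) || ct == ClassTag(classOf[String])

      // Use Kryo only when every element type flowing through the shuffle is
      // known-safe; otherwise fall back to the default (Java serialization here).
      def pickSerializer(keyTag: ClassTag[_], valueTag: ClassTag[_]): Serializer =
        if (canUseKryo(keyTag) && canUseKryo(valueTag)) kryoSerializer
        else defaultSerializer
    }

Under this rule, a shuffle of (Int, String) pairs would get Kryo, while one whose values are an unregistered case class would fall back to the default serializer.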
Diffstat (limited to 'core/src/test')
-rw-r--r--  core/src/test/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriterSuite.java            |  2
-rw-r--r--  core/src/test/scala/org/apache/spark/shuffle/BlockStoreShuffleReaderSuite.scala           |  2
-rw-r--r--  core/src/test/scala/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriterSuite.scala |  2
-rw-r--r--  core/src/test/scala/org/apache/spark/shuffle/sort/SortShuffleManagerSuite.scala           |  8
-rw-r--r--  core/src/test/scala/org/apache/spark/util/collection/ExternalSorterSuite.scala            | 34
5 files changed, 24 insertions, 24 deletions
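All five test updates below stem from the same two API changes in the main patch: `ShuffleDependency.serializer` now returns a plain `Serializer` instead of an `Option[Serializer]`, and `ExternalSorter`'s constructor loses its trailing `Option[Serializer]` parameter (every call site below drops the final `None` argument). A hedged sketch of the signature change as inferred from these hunks; the trait names are placeholders and the real declarations are abbreviated:

    import org.apache.spark.serializer.Serializer

    // Inferred from the test updates in this diff; illustrative only.
    trait ShuffleDepBefore { def serializer: Option[Serializer] } // pre-patch: Option-wrapped
    trait ShuffleDepAfter  { def serializer: Serializer }         // post-patch: always concrete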
diff --git a/core/src/test/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriterSuite.java b/core/src/test/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriterSuite.java
index ddea6f5a69..47c695ad4e 100644
--- a/core/src/test/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriterSuite.java
+++ b/core/src/test/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriterSuite.java
@@ -191,7 +191,7 @@ public class UnsafeShuffleWriterSuite {
});
when(taskContext.taskMetrics()).thenReturn(taskMetrics);
- when(shuffleDep.serializer()).thenReturn(Option.<Serializer>apply(serializer));
+ when(shuffleDep.serializer()).thenReturn(serializer);
when(shuffleDep.partitioner()).thenReturn(hashPartitioner);
}
diff --git a/core/src/test/scala/org/apache/spark/shuffle/BlockStoreShuffleReaderSuite.scala b/core/src/test/scala/org/apache/spark/shuffle/BlockStoreShuffleReaderSuite.scala
index 26a372d6a9..08f52c92e1 100644
--- a/core/src/test/scala/org/apache/spark/shuffle/BlockStoreShuffleReaderSuite.scala
+++ b/core/src/test/scala/org/apache/spark/shuffle/BlockStoreShuffleReaderSuite.scala
@@ -127,7 +127,7 @@ class BlockStoreShuffleReaderSuite extends SparkFunSuite with LocalSparkContext
// Create a mocked shuffle handle to pass into HashShuffleReader.
val shuffleHandle = {
val dependency = mock(classOf[ShuffleDependency[Int, Int, Int]])
- when(dependency.serializer).thenReturn(Some(serializer))
+ when(dependency.serializer).thenReturn(serializer)
when(dependency.aggregator).thenReturn(None)
when(dependency.keyOrdering).thenReturn(None)
new BaseShuffleHandle(shuffleId, numMaps, dependency)
diff --git a/core/src/test/scala/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriterSuite.scala b/core/src/test/scala/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriterSuite.scala
index cf9f9da1e6..16418f855b 100644
--- a/core/src/test/scala/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriterSuite.scala
+++ b/core/src/test/scala/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriterSuite.scala
@@ -66,7 +66,7 @@ class BypassMergeSortShuffleWriterSuite extends SparkFunSuite with BeforeAndAfte
dependency = dependency
)
when(dependency.partitioner).thenReturn(new HashPartitioner(7))
- when(dependency.serializer).thenReturn(Some(new JavaSerializer(conf)))
+ when(dependency.serializer).thenReturn(new JavaSerializer(conf))
when(taskContext.taskMetrics()).thenReturn(taskMetrics)
when(blockResolver.getDataFile(0, 0)).thenReturn(outputFile)
doAnswer(new Answer[Void] {
diff --git a/core/src/test/scala/org/apache/spark/shuffle/sort/SortShuffleManagerSuite.scala b/core/src/test/scala/org/apache/spark/shuffle/sort/SortShuffleManagerSuite.scala
index 8744a072cb..55cebe7c8b 100644
--- a/core/src/test/scala/org/apache/spark/shuffle/sort/SortShuffleManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/shuffle/sort/SortShuffleManagerSuite.scala
@@ -41,7 +41,7 @@ class SortShuffleManagerSuite extends SparkFunSuite with Matchers {
private def shuffleDep(
partitioner: Partitioner,
- serializer: Option[Serializer],
+ serializer: Serializer,
keyOrdering: Option[Ordering[Any]],
aggregator: Option[Aggregator[Any, Any, Any]],
mapSideCombine: Boolean): ShuffleDependency[Any, Any, Any] = {
@@ -56,7 +56,7 @@ class SortShuffleManagerSuite extends SparkFunSuite with Matchers {
}
test("supported shuffle dependencies for serialized shuffle") {
- val kryo = Some(new KryoSerializer(new SparkConf()))
+ val kryo = new KryoSerializer(new SparkConf())
assert(canUseSerializedShuffle(shuffleDep(
partitioner = new HashPartitioner(2),
@@ -88,8 +88,8 @@ class SortShuffleManagerSuite extends SparkFunSuite with Matchers {
}
test("unsupported shuffle dependencies for serialized shuffle") {
- val kryo = Some(new KryoSerializer(new SparkConf()))
- val java = Some(new JavaSerializer(new SparkConf()))
+ val kryo = new KryoSerializer(new SparkConf())
+ val java = new JavaSerializer(new SparkConf())
// We only support serializers that support object relocation
assert(!canUseSerializedShuffle(shuffleDep(
diff --git a/core/src/test/scala/org/apache/spark/util/collection/ExternalSorterSuite.scala b/core/src/test/scala/org/apache/spark/util/collection/ExternalSorterSuite.scala
index a62adf1c2c..a1a7ac97d9 100644
--- a/core/src/test/scala/org/apache/spark/util/collection/ExternalSorterSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/collection/ExternalSorterSuite.scala
@@ -110,7 +110,7 @@ class ExternalSorterSuite extends SparkFunSuite with LocalSparkContext {
createCombiner _, mergeValue _, mergeCombiners _)
val sorter = new ExternalSorter[String, String, ArrayBuffer[String]](
- context, Some(agg), None, None, None)
+ context, Some(agg), None, None)
val collisionPairs = Seq(
("Aa", "BB"), // 2112
@@ -161,7 +161,7 @@ class ExternalSorterSuite extends SparkFunSuite with LocalSparkContext {
sc = new SparkContext("local-cluster[1,1,1024]", "test", conf)
val context = MemoryTestingUtils.fakeTaskContext(sc.env)
val agg = new Aggregator[FixedHashObject, Int, Int](_ => 1, _ + _, _ + _)
- val sorter = new ExternalSorter[FixedHashObject, Int, Int](context, Some(agg), None, None, None)
+ val sorter = new ExternalSorter[FixedHashObject, Int, Int](context, Some(agg), None, None)
// Insert 10 copies each of lots of objects whose hash codes are either 0 or 1. This causes
// problems if the map fails to group together the objects with the same code (SPARK-2043).
val toInsert = for (i <- 1 to 10; j <- 1 to size) yield (FixedHashObject(j, j % 2), 1)
@@ -192,7 +192,7 @@ class ExternalSorterSuite extends SparkFunSuite with LocalSparkContext {
val agg = new Aggregator[Int, Int, ArrayBuffer[Int]](createCombiner, mergeValue, mergeCombiners)
val sorter =
- new ExternalSorter[Int, Int, ArrayBuffer[Int]](context, Some(agg), None, None, None)
+ new ExternalSorter[Int, Int, ArrayBuffer[Int]](context, Some(agg), None, None)
sorter.insertAll(
(1 to size).iterator.map(i => (i, i)) ++ Iterator((Int.MaxValue, Int.MaxValue)))
assert(sorter.numSpills > 0, "sorter did not spill")
@@ -219,7 +219,7 @@ class ExternalSorterSuite extends SparkFunSuite with LocalSparkContext {
createCombiner, mergeValue, mergeCombiners)
val sorter = new ExternalSorter[String, String, ArrayBuffer[String]](
- context, Some(agg), None, None, None)
+ context, Some(agg), None, None)
sorter.insertAll((1 to size).iterator.map(i => (i.toString, i.toString)) ++ Iterator(
(null.asInstanceOf[String], "1"),
@@ -283,25 +283,25 @@ class ExternalSorterSuite extends SparkFunSuite with LocalSparkContext {
// Both aggregator and ordering
val sorter = new ExternalSorter[Int, Int, Int](
- context, Some(agg), Some(new HashPartitioner(3)), Some(ord), None)
+ context, Some(agg), Some(new HashPartitioner(3)), Some(ord))
assert(sorter.iterator.toSeq === Seq())
sorter.stop()
// Only aggregator
val sorter2 = new ExternalSorter[Int, Int, Int](
- context, Some(agg), Some(new HashPartitioner(3)), None, None)
+ context, Some(agg), Some(new HashPartitioner(3)), None)
assert(sorter2.iterator.toSeq === Seq())
sorter2.stop()
// Only ordering
val sorter3 = new ExternalSorter[Int, Int, Int](
- context, None, Some(new HashPartitioner(3)), Some(ord), None)
+ context, None, Some(new HashPartitioner(3)), Some(ord))
assert(sorter3.iterator.toSeq === Seq())
sorter3.stop()
// Neither aggregator nor ordering
val sorter4 = new ExternalSorter[Int, Int, Int](
- context, None, Some(new HashPartitioner(3)), None, None)
+ context, None, Some(new HashPartitioner(3)), None)
assert(sorter4.iterator.toSeq === Seq())
sorter4.stop()
}
@@ -320,28 +320,28 @@ class ExternalSorterSuite extends SparkFunSuite with LocalSparkContext {
// Both aggregator and ordering
val sorter = new ExternalSorter[Int, Int, Int](
- context, Some(agg), Some(new HashPartitioner(7)), Some(ord), None)
+ context, Some(agg), Some(new HashPartitioner(7)), Some(ord))
sorter.insertAll(elements.iterator)
assert(sorter.partitionedIterator.map(p => (p._1, p._2.toSet)).toSet === expected)
sorter.stop()
// Only aggregator
val sorter2 = new ExternalSorter[Int, Int, Int](
- context, Some(agg), Some(new HashPartitioner(7)), None, None)
+ context, Some(agg), Some(new HashPartitioner(7)), None)
sorter2.insertAll(elements.iterator)
assert(sorter2.partitionedIterator.map(p => (p._1, p._2.toSet)).toSet === expected)
sorter2.stop()
// Only ordering
val sorter3 = new ExternalSorter[Int, Int, Int](
- context, None, Some(new HashPartitioner(7)), Some(ord), None)
+ context, None, Some(new HashPartitioner(7)), Some(ord))
sorter3.insertAll(elements.iterator)
assert(sorter3.partitionedIterator.map(p => (p._1, p._2.toSet)).toSet === expected)
sorter3.stop()
// Neither aggregator nor ordering
val sorter4 = new ExternalSorter[Int, Int, Int](
- context, None, Some(new HashPartitioner(7)), None, None)
+ context, None, Some(new HashPartitioner(7)), None)
sorter4.insertAll(elements.iterator)
assert(sorter4.partitionedIterator.map(p => (p._1, p._2.toSet)).toSet === expected)
sorter4.stop()
@@ -358,7 +358,7 @@ class ExternalSorterSuite extends SparkFunSuite with LocalSparkContext {
val elements = Iterator((1, 1), (5, 5)) ++ (0 until size).iterator.map(x => (2, 2))
val sorter = new ExternalSorter[Int, Int, Int](
- context, None, Some(new HashPartitioner(7)), Some(ord), None)
+ context, None, Some(new HashPartitioner(7)), Some(ord))
sorter.insertAll(elements)
assert(sorter.numSpills > 0, "sorter did not spill")
val iter = sorter.partitionedIterator.map(p => (p._1, p._2.toList))
@@ -442,7 +442,7 @@ class ExternalSorterSuite extends SparkFunSuite with LocalSparkContext {
val expectedSize = if (withFailures) size - 1 else size
val context = MemoryTestingUtils.fakeTaskContext(sc.env)
val sorter = new ExternalSorter[Int, Int, Int](
- context, None, Some(new HashPartitioner(3)), Some(ord), None)
+ context, None, Some(new HashPartitioner(3)), Some(ord))
if (withFailures) {
intercept[SparkException] {
sorter.insertAll((0 until size).iterator.map { i =>
@@ -512,7 +512,7 @@ class ExternalSorterSuite extends SparkFunSuite with LocalSparkContext {
val ord = if (withOrdering) Some(implicitly[Ordering[Int]]) else None
val context = MemoryTestingUtils.fakeTaskContext(sc.env)
val sorter =
- new ExternalSorter[Int, Int, Int](context, agg, Some(new HashPartitioner(3)), ord, None)
+ new ExternalSorter[Int, Int, Int](context, agg, Some(new HashPartitioner(3)), ord)
sorter.insertAll((0 until size).iterator.map { i => (i / 4, i) })
if (withSpilling) {
assert(sorter.numSpills > 0, "sorter did not spill")
@@ -551,7 +551,7 @@ class ExternalSorterSuite extends SparkFunSuite with LocalSparkContext {
val context = MemoryTestingUtils.fakeTaskContext(sc.env)
val sorter1 = new ExternalSorter[String, String, String](
- context, None, None, Some(wrongOrdering), None)
+ context, None, None, Some(wrongOrdering))
val thrown = intercept[IllegalArgumentException] {
sorter1.insertAll(testData.iterator.map(i => (i, i)))
assert(sorter1.numSpills > 0, "sorter did not spill")
@@ -573,7 +573,7 @@ class ExternalSorterSuite extends SparkFunSuite with LocalSparkContext {
createCombiner, mergeValue, mergeCombiners)
val sorter2 = new ExternalSorter[String, String, ArrayBuffer[String]](
- context, Some(agg), None, None, None)
+ context, Some(agg), None, None)
sorter2.insertAll(testData.iterator.map(i => (i, i)))
assert(sorter2.numSpills > 0, "sorter did not spill")