path: root/core/src/test
author    Josh Rosen <joshrosen@databricks.com>  2015-04-21 16:24:15 -0700
committer Reynold Xin <rxin@databricks.com>  2015-04-21 16:24:15 -0700
commit    f83c0f112d04173f4fc2c5eaf0f9cb11d9180077 (patch)
tree      074eee9e9076f67fea1d60d2393036cfa8374c73 /core/src/test
parent    7662ec23bb6c4d4fe4c857b6928eaed0a97d3c04 (diff)
download  spark-f83c0f112d04173f4fc2c5eaf0f9cb11d9180077.tar.gz
          spark-f83c0f112d04173f4fc2c5eaf0f9cb11d9180077.tar.bz2
          spark-f83c0f112d04173f4fc2c5eaf0f9cb11d9180077.zip
[SPARK-3386] Share and reuse SerializerInstances in shuffle paths
This patch modifies several shuffle-related code paths to share and re-use SerializerInstances instead of creating new ones. Some serializers, such as KryoSerializer or SqlSerializer, can be fairly expensive to create or may consume moderate amounts of memory, so it's probably best to avoid unnecessary serializer creation in hot code paths.

The key change in this patch is modifying `getDiskWriter()` / `DiskBlockObjectWriter` to accept `SerializerInstance`s instead of `Serializer`s (which are factories for instances). This allows the disk writer's creator to decide whether the serializer instance can be shared or re-used.

The rest of the patch modifies several write and read paths to use shared serializers. One big win is in `ShuffleBlockFetcherIterator`, where we used to create a new serializer per received block. Similarly, the shuffle write path used to create a new serializer per file even though in many cases only a single thread would be writing to a file at a time. I made a small serializer reuse optimization in CoarseGrainedExecutorBackend as well, since it seemed like a small and obvious improvement.

Author: Josh Rosen <joshrosen@databricks.com>

Closes #5606 from JoshRosen/SPARK-3386 and squashes the following commits:

f661ce7 [Josh Rosen] Remove thread local; add comment instead
64f8398 [Josh Rosen] Use ThreadLocal for serializer instance in CoarseGrainedExecutorBackend
aeb680e [Josh Rosen] [SPARK-3386] Reuse SerializerInstance in shuffle code paths
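The factory relationship the message describes can be sketched with simplified stand-in classes (these are not Spark's actual `Serializer`/`SerializerInstance` APIs, just a minimal illustration of creating one instance up front and reusing it across writes instead of constructing a fresh one per block):

```scala
// Simplified sketch of the pattern: a serializer acts as a factory that
// produces instances, and the instance is what callers should reuse.

trait MiniSerializerInstance {
  def serialize(obj: Any): Array[Byte]
}

// Hypothetical stand-in for an expensive-to-create serializer; real ones
// like KryoSerializer may allocate buffers or build class registries here.
class MiniSerializer {
  def newInstance(): MiniSerializerInstance = new MiniSerializerInstance {
    def serialize(obj: Any): Array[Byte] = obj.toString.getBytes("UTF-8")
  }
}

object ReuseDemo {
  def main(args: Array[String]): Unit = {
    val serializer = new MiniSerializer
    val blocks = Seq("block-0", "block-1", "block-2")

    // Before: blocks.map(b => serializer.newInstance().serialize(b))
    // would construct one instance per block -- wasteful in a hot path.

    // After: create the instance once and reuse it for every block,
    // which is safe here because a single thread does all the writing.
    val instance = serializer.newInstance()
    val sizes = blocks.map(b => instance.serialize(b).length)
    println(sizes.mkString(","))
  }
}
```

This mirrors why the patch changes `getDiskWriter()` to take an already-created instance: only the caller knows whether a single thread owns the write path, so only the caller can decide that sharing one instance is safe.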
Diffstat (limited to 'core/src/test')
core/src/test/scala/org/apache/spark/storage/BlockObjectWriterSuite.scala | 6 +++---
1 file changed, 3 insertions(+), 3 deletions(-)
diff --git a/core/src/test/scala/org/apache/spark/storage/BlockObjectWriterSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockObjectWriterSuite.scala
index 78bbc4ec2c..003a728cb8 100644
--- a/core/src/test/scala/org/apache/spark/storage/BlockObjectWriterSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/BlockObjectWriterSuite.scala
@@ -30,7 +30,7 @@ class BlockObjectWriterSuite extends FunSuite {
val file = new File(Utils.createTempDir(), "somefile")
val writeMetrics = new ShuffleWriteMetrics()
val writer = new DiskBlockObjectWriter(new TestBlockId("0"), file,
- new JavaSerializer(new SparkConf()), 1024, os => os, true, writeMetrics)
+ new JavaSerializer(new SparkConf()).newInstance(), 1024, os => os, true, writeMetrics)
writer.write(Long.box(20))
// Record metrics update on every write
@@ -52,7 +52,7 @@ class BlockObjectWriterSuite extends FunSuite {
val file = new File(Utils.createTempDir(), "somefile")
val writeMetrics = new ShuffleWriteMetrics()
val writer = new DiskBlockObjectWriter(new TestBlockId("0"), file,
- new JavaSerializer(new SparkConf()), 1024, os => os, true, writeMetrics)
+ new JavaSerializer(new SparkConf()).newInstance(), 1024, os => os, true, writeMetrics)
writer.write(Long.box(20))
// Record metrics update on every write
@@ -75,7 +75,7 @@ class BlockObjectWriterSuite extends FunSuite {
val file = new File(Utils.createTempDir(), "somefile")
val writeMetrics = new ShuffleWriteMetrics()
val writer = new DiskBlockObjectWriter(new TestBlockId("0"), file,
- new JavaSerializer(new SparkConf()), 1024, os => os, true, writeMetrics)
+ new JavaSerializer(new SparkConf()).newInstance(), 1024, os => os, true, writeMetrics)
writer.open()
writer.close()