diff options
author | Sandy Ryza <sandy@cloudera.com> | 2015-04-30 23:14:14 -0700 |
---|---|---|
committer | Patrick Wendell <patrick@databricks.com> | 2015-04-30 23:14:14 -0700 |
commit | 0a2b15ce43cf6096e1a7ae060b7c8a4010ce3b92 (patch) | |
tree | 18cb693da7cf83292e1f2af7bdc8a16a1b033454 /tools | |
parent | a9fc50552ec96cd7817dfd19fc681b3368545ee3 (diff) | |
download | spark-0a2b15ce43cf6096e1a7ae060b7c8a4010ce3b92.tar.gz spark-0a2b15ce43cf6096e1a7ae060b7c8a4010ce3b92.tar.bz2 spark-0a2b15ce43cf6096e1a7ae060b7c8a4010ce3b92.zip |
[SPARK-4550] In sort-based shuffle, store map outputs in serialized form
Refer to the JIRA for the design doc and some perf results.
I wanted to call out some of the more possibly controversial changes up front:
* Map outputs are only stored in serialized form when Kryo is in use. I'm still unsure whether Java-serialized objects can be relocated. At the very least, Java serialization writes out a stream header which causes problems with the current approach, so I decided to leave investigating this to future work.
* The shuffle now explicitly operates on key-value pairs instead of any object. Data is written to shuffle files in alternating keys and values instead of key-value tuples. `BlockObjectWriter.write` now accepts a key argument and a value argument instead of any object.
* The map output buffer can hold a max of Integer.MAX_VALUE bytes. Though this wouldn't be terribly difficult to change.
* When spilling occurs, the objects that still in memory at merge time end up serialized and deserialized an extra time.
Author: Sandy Ryza <sandy@cloudera.com>
Closes #4450 from sryza/sandy-spark-4550 and squashes the following commits:
8c70dd9 [Sandy Ryza] Fix serialization
9c16fe6 [Sandy Ryza] Fix a couple tests and move getAutoReset to KryoSerializerInstance
6c54e06 [Sandy Ryza] Fix scalastyle
d8462d8 [Sandy Ryza] SPARK-4550
Diffstat (limited to 'tools')
-rw-r--r-- | tools/src/main/scala/org/apache/spark/tools/StoragePerfTester.scala | 5 |
1 files changed, 3 insertions, 2 deletions
diff --git a/tools/src/main/scala/org/apache/spark/tools/StoragePerfTester.scala b/tools/src/main/scala/org/apache/spark/tools/StoragePerfTester.scala index f2d135397c..baa97616ea 100644 --- a/tools/src/main/scala/org/apache/spark/tools/StoragePerfTester.scala +++ b/tools/src/main/scala/org/apache/spark/tools/StoragePerfTester.scala @@ -46,7 +46,8 @@ object StoragePerfTester { val totalRecords = dataSizeMb * 1000 val recordsPerMap = totalRecords / numMaps - val writeData = "1" * recordLength + val writeKey = "1" * (recordLength / 2) + val writeValue = "1" * (recordLength / 2) val executor = Executors.newFixedThreadPool(numMaps) val conf = new SparkConf() @@ -63,7 +64,7 @@ object StoragePerfTester { new KryoSerializer(sc.conf), new ShuffleWriteMetrics()) val writers = shuffle.writers for (i <- 1 to recordsPerMap) { - writers(i % numOutputSplits).write(writeData) + writers(i % numOutputSplits).write(writeKey, writeValue) } writers.map { w => w.commitAndClose() |