path: root/tools
author    Sandy Ryza <sandy@cloudera.com>  2015-04-30 23:14:14 -0700
committer Patrick Wendell <patrick@databricks.com>  2015-04-30 23:14:14 -0700
commit    0a2b15ce43cf6096e1a7ae060b7c8a4010ce3b92 (patch)
tree      18cb693da7cf83292e1f2af7bdc8a16a1b033454 /tools
parent    a9fc50552ec96cd7817dfd19fc681b3368545ee3 (diff)
download  spark-0a2b15ce43cf6096e1a7ae060b7c8a4010ce3b92.tar.gz
          spark-0a2b15ce43cf6096e1a7ae060b7c8a4010ce3b92.tar.bz2
          spark-0a2b15ce43cf6096e1a7ae060b7c8a4010ce3b92.zip
[SPARK-4550] In sort-based shuffle, store map outputs in serialized form
Refer to the JIRA for the design doc and some perf results. I wanted to call out some of the more possibly controversial changes up front:

* Map outputs are only stored in serialized form when Kryo is in use. I'm still unsure whether Java-serialized objects can be relocated. At the very least, Java serialization writes out a stream header, which causes problems with the current approach, so I decided to leave investigating this to future work.
* The shuffle now explicitly operates on key-value pairs instead of any object. Data is written to shuffle files as alternating keys and values instead of key-value tuples. `BlockObjectWriter.write` now accepts a key argument and a value argument instead of a single object.
* The map output buffer can hold a max of Integer.MAX_VALUE bytes, though this wouldn't be terribly difficult to change.
* When spilling occurs, the objects that are still in memory at merge time end up serialized and deserialized an extra time.

Author: Sandy Ryza <sandy@cloudera.com>

Closes #4450 from sryza/sandy-spark-4550 and squashes the following commits:

8c70dd9 [Sandy Ryza] Fix serialization
9c16fe6 [Sandy Ryza] Fix a couple tests and move getAutoReset to KryoSerializerInstance
6c54e06 [Sandy Ryza] Fix scalastyle
d8462d8 [Sandy Ryza] SPARK-4550
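The first point above can be demonstrated with plain JDK serialization: each `ObjectOutputStream` writes a stream header, so the bytes of two independently serialized records concatenated together differ from one stream containing both records. A minimal standalone sketch (not part of the patch; `serialize` is a hypothetical helper):

```scala
import java.io.{ByteArrayOutputStream, ObjectOutputStream}

// Serialize the given objects into a single Java-serialization stream.
def serialize(objs: Any*): Array[Byte] = {
  val bos = new ByteArrayOutputStream()
  val oos = new ObjectOutputStream(bos)
  objs.foreach(oos.writeObject)
  oos.close()
  bos.toByteArray
}

// Concatenating two independent streams embeds a second stream header...
val concatenated = serialize("a") ++ serialize("b")
// ...while one stream holding both objects has only one header.
val single = serialize("a", "b")

// The byte layouts differ, so a record serialized in one Java stream
// cannot simply be relocated into another by copying its bytes.
val relocatable = java.util.Arrays.equals(concatenated, single)
println(s"relocatable: $relocatable")
```

This is why the patch restricts the serialized-sort path to Kryo, whose records can be written without per-stream framing (subject to the auto-reset check mentioned in the squashed commits).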
Diffstat (limited to 'tools')
-rw-r--r--  tools/src/main/scala/org/apache/spark/tools/StoragePerfTester.scala | 5
1 file changed, 3 insertions(+), 2 deletions(-)
diff --git a/tools/src/main/scala/org/apache/spark/tools/StoragePerfTester.scala b/tools/src/main/scala/org/apache/spark/tools/StoragePerfTester.scala
index f2d135397c..baa97616ea 100644
--- a/tools/src/main/scala/org/apache/spark/tools/StoragePerfTester.scala
+++ b/tools/src/main/scala/org/apache/spark/tools/StoragePerfTester.scala
@@ -46,7 +46,8 @@ object StoragePerfTester {
val totalRecords = dataSizeMb * 1000
val recordsPerMap = totalRecords / numMaps
- val writeData = "1" * recordLength
+ val writeKey = "1" * (recordLength / 2)
+ val writeValue = "1" * (recordLength / 2)
val executor = Executors.newFixedThreadPool(numMaps)
val conf = new SparkConf()
@@ -63,7 +64,7 @@ object StoragePerfTester {
new KryoSerializer(sc.conf), new ShuffleWriteMetrics())
val writers = shuffle.writers
for (i <- 1 to recordsPerMap) {
- writers(i % numOutputSplits).write(writeData)
+ writers(i % numOutputSplits).write(writeKey, writeValue)
}
writers.map { w =>
w.commitAndClose()
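The hunk above splits the test record into equal key and value halves so each write supplies both arguments of the new two-argument `write`. A standalone sketch of that arithmetic (variable names mirror the diff; the `recordLength` value here is hypothetical):

```scala
val recordLength = 1000  // hypothetical; the real tester takes this from its arguments

// Mirrors the patched StoragePerfTester: one record becomes a key half
// and a value half, written as an alternating key/value pair.
val writeKey = "1" * (recordLength / 2)
val writeValue = "1" * (recordLength / 2)

// For even lengths the halves reproduce the original record size;
// an odd recordLength would lose one byte to integer division.
println(writeKey.length + writeValue.length)
```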