diff options
author | Sandy Ryza <sandy@cloudera.com> | 2015-04-30 23:14:14 -0700 |
---|---|---|
committer | Patrick Wendell <patrick@databricks.com> | 2015-04-30 23:14:14 -0700 |
commit | 0a2b15ce43cf6096e1a7ae060b7c8a4010ce3b92 (patch) | |
tree | 18cb693da7cf83292e1f2af7bdc8a16a1b033454 /sql/core/src/main | |
parent | a9fc50552ec96cd7817dfd19fc681b3368545ee3 (diff) | |
download | spark-0a2b15ce43cf6096e1a7ae060b7c8a4010ce3b92.tar.gz spark-0a2b15ce43cf6096e1a7ae060b7c8a4010ce3b92.tar.bz2 spark-0a2b15ce43cf6096e1a7ae060b7c8a4010ce3b92.zip |
[SPARK-4550] In sort-based shuffle, store map outputs in serialized form
Refer to the JIRA for the design doc and some perf results.
I wanted to call out some of the more possibly controversial changes up front:
* Map outputs are only stored in serialized form when Kryo is in use. I'm still unsure whether Java-serialized objects can be relocated. At the very least, Java serialization writes out a stream header which causes problems with the current approach, so I decided to leave investigating this to future work.
* The shuffle now explicitly operates on key-value pairs instead of any object. Data is written to shuffle files in alternating keys and values instead of key-value tuples. `BlockObjectWriter.write` now accepts a key argument and a value argument instead of any object.
* The map output buffer can hold a max of Integer.MAX_VALUE bytes. Though this wouldn't be terribly difficult to change.
* When spilling occurs, the objects that still in memory at merge time end up serialized and deserialized an extra time.
Author: Sandy Ryza <sandy@cloudera.com>
Closes #4450 from sryza/sandy-spark-4550 and squashes the following commits:
8c70dd9 [Sandy Ryza] Fix serialization
9c16fe6 [Sandy Ryza] Fix a couple tests and move getAutoReset to KryoSerializerInstance
6c54e06 [Sandy Ryza] Fix scalastyle
d8462d8 [Sandy Ryza] SPARK-4550
Diffstat (limited to 'sql/core/src/main')
-rw-r--r-- | sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlSerializer2.scala | 38 |
1 files changed, 29 insertions, 9 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlSerializer2.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlSerializer2.scala index cec97de2cd..9552f41115 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlSerializer2.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlSerializer2.scala @@ -50,10 +50,10 @@ private[sql] class Serializer2SerializationStream( extends SerializationStream with Logging { val rowOut = new DataOutputStream(out) - val writeKey = SparkSqlSerializer2.createSerializationFunction(keySchema, rowOut) - val writeValue = SparkSqlSerializer2.createSerializationFunction(valueSchema, rowOut) + val writeKeyFunc = SparkSqlSerializer2.createSerializationFunction(keySchema, rowOut) + val writeValueFunc = SparkSqlSerializer2.createSerializationFunction(valueSchema, rowOut) - def writeObject[T: ClassTag](t: T): SerializationStream = { + override def writeObject[T: ClassTag](t: T): SerializationStream = { val kv = t.asInstanceOf[Product2[Row, Row]] writeKey(kv._1) writeValue(kv._2) @@ -61,6 +61,16 @@ private[sql] class Serializer2SerializationStream( this } + override def writeKey[T: ClassTag](t: T): SerializationStream = { + writeKeyFunc(t.asInstanceOf[Row]) + this + } + + override def writeValue[T: ClassTag](t: T): SerializationStream = { + writeValueFunc(t.asInstanceOf[Row]) + this + } + def flush(): Unit = { rowOut.flush() } @@ -83,17 +93,27 @@ private[sql] class Serializer2DeserializationStream( val key = if (keySchema != null) new SpecificMutableRow(keySchema) else null val value = if (valueSchema != null) new SpecificMutableRow(valueSchema) else null - val readKey = SparkSqlSerializer2.createDeserializationFunction(keySchema, rowIn, key) - val readValue = SparkSqlSerializer2.createDeserializationFunction(valueSchema, rowIn, value) + val readKeyFunc = SparkSqlSerializer2.createDeserializationFunction(keySchema, rowIn, key) + val readValueFunc = SparkSqlSerializer2.createDeserializationFunction(valueSchema, rowIn, value) - def readObject[T: ClassTag](): T = { - readKey() - readValue() + override def readObject[T: ClassTag](): T = { + readKeyFunc() + readValueFunc() (key, value).asInstanceOf[T] } - def close(): Unit = { + override def readKey[T: ClassTag](): T = { + readKeyFunc() + key.asInstanceOf[T] + } + + override def readValue[T: ClassTag](): T = { + readValueFunc() + value.asInstanceOf[T] + } + + override def close(): Unit = { rowIn.close() } } |