path: root/sql/core/src
author     Sandy Ryza <sandy@cloudera.com>    2015-04-30 23:14:14 -0700
committer  Patrick Wendell <patrick@databricks.com>    2015-04-30 23:14:14 -0700
commit     0a2b15ce43cf6096e1a7ae060b7c8a4010ce3b92 (patch)
tree       18cb693da7cf83292e1f2af7bdc8a16a1b033454 /sql/core/src
parent     a9fc50552ec96cd7817dfd19fc681b3368545ee3 (diff)
download   spark-0a2b15ce43cf6096e1a7ae060b7c8a4010ce3b92.tar.gz
           spark-0a2b15ce43cf6096e1a7ae060b7c8a4010ce3b92.tar.bz2
           spark-0a2b15ce43cf6096e1a7ae060b7c8a4010ce3b92.zip
[SPARK-4550] In sort-based shuffle, store map outputs in serialized form
Refer to the JIRA for the design doc and some perf results. I wanted to call out some of the possibly more controversial changes up front:

* Map outputs are only stored in serialized form when Kryo is in use. I'm still unsure whether Java-serialized objects can be relocated. At the very least, Java serialization writes out a stream header which causes problems with the current approach, so I decided to leave investigating this to future work.
* The shuffle now explicitly operates on key-value pairs instead of any object. Data is written to shuffle files in alternating keys and values instead of key-value tuples, as sketched below. `BlockObjectWriter.write` now accepts a key argument and a value argument instead of any object.
* The map output buffer can hold a max of Integer.MAX_VALUE bytes, though this wouldn't be terribly difficult to change.
* When spilling occurs, the objects that are still in memory at merge time end up serialized and deserialized an extra time.

Author: Sandy Ryza <sandy@cloudera.com>

Closes #4450 from sryza/sandy-spark-4550 and squashes the following commits:

8c70dd9 [Sandy Ryza] Fix serialization
9c16fe6 [Sandy Ryza] Fix a couple tests and move getAutoReset to KryoSerializerInstance
6c54e06 [Sandy Ryza] Fix scalastyle
d8462d8 [Sandy Ryza] SPARK-4550
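The alternating key/value layout can be illustrated with a small, self-contained sketch. This is not Spark's `SerializationStream`: the class and object names (`StringKVStream`, `KVLayoutDemo`) and the use of `writeUTF` are made up for illustration, and only the `writeKey`/`writeValue` shape mirrors the interface added in the diff below. The point is that each record is written as key bytes followed by value bytes, with no per-record `(key, value)` tuple object.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataInputStream, DataOutputStream}

// Illustrative sketch only: a stream that writes keys and values separately,
// producing the alternating key/value byte layout described above.
class StringKVStream(out: DataOutputStream) {
  def writeKey(key: String): this.type     = { out.writeUTF(key); this }
  def writeValue(value: String): this.type = { out.writeUTF(value); this }
  def flush(): Unit = out.flush()
}

object KVLayoutDemo {
  def main(args: Array[String]): Unit = {
    val bytes = new ByteArrayOutputStream()
    val stream = new StringKVStream(new DataOutputStream(bytes))
    stream.writeKey("k1").writeValue("v1")  // key bytes, then value bytes, back to back
    stream.writeKey("k2").writeValue("v2")
    stream.flush()

    // Reading back follows the same alternation: key, value, key, value, ...
    val in = new DataInputStream(new ByteArrayInputStream(bytes.toByteArray))
    println((in.readUTF(), in.readUTF()))   // (k1,v1)
    println((in.readUTF(), in.readUTF()))   // (k2,v2)
  }
}
```

Because a caller that already holds the key and the value can invoke `writeKey` and `writeValue` directly, no intermediate tuple needs to be allocated per record on the write path.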
Diffstat (limited to 'sql/core/src')
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlSerializer2.scala  38
1 file changed, 29 insertions, 9 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlSerializer2.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlSerializer2.scala
index cec97de2cd..9552f41115 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlSerializer2.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlSerializer2.scala
@@ -50,10 +50,10 @@ private[sql] class Serializer2SerializationStream(
extends SerializationStream with Logging {
val rowOut = new DataOutputStream(out)
- val writeKey = SparkSqlSerializer2.createSerializationFunction(keySchema, rowOut)
- val writeValue = SparkSqlSerializer2.createSerializationFunction(valueSchema, rowOut)
+ val writeKeyFunc = SparkSqlSerializer2.createSerializationFunction(keySchema, rowOut)
+ val writeValueFunc = SparkSqlSerializer2.createSerializationFunction(valueSchema, rowOut)
- def writeObject[T: ClassTag](t: T): SerializationStream = {
+ override def writeObject[T: ClassTag](t: T): SerializationStream = {
val kv = t.asInstanceOf[Product2[Row, Row]]
writeKey(kv._1)
writeValue(kv._2)
@@ -61,6 +61,16 @@ private[sql] class Serializer2SerializationStream(
this
}
+ override def writeKey[T: ClassTag](t: T): SerializationStream = {
+ writeKeyFunc(t.asInstanceOf[Row])
+ this
+ }
+
+ override def writeValue[T: ClassTag](t: T): SerializationStream = {
+ writeValueFunc(t.asInstanceOf[Row])
+ this
+ }
+
def flush(): Unit = {
rowOut.flush()
}
@@ -83,17 +93,27 @@ private[sql] class Serializer2DeserializationStream(
val key = if (keySchema != null) new SpecificMutableRow(keySchema) else null
val value = if (valueSchema != null) new SpecificMutableRow(valueSchema) else null
- val readKey = SparkSqlSerializer2.createDeserializationFunction(keySchema, rowIn, key)
- val readValue = SparkSqlSerializer2.createDeserializationFunction(valueSchema, rowIn, value)
+ val readKeyFunc = SparkSqlSerializer2.createDeserializationFunction(keySchema, rowIn, key)
+ val readValueFunc = SparkSqlSerializer2.createDeserializationFunction(valueSchema, rowIn, value)
- def readObject[T: ClassTag](): T = {
- readKey()
- readValue()
+ override def readObject[T: ClassTag](): T = {
+ readKeyFunc()
+ readValueFunc()
(key, value).asInstanceOf[T]
}
- def close(): Unit = {
+ override def readKey[T: ClassTag](): T = {
+ readKeyFunc()
+ key.asInstanceOf[T]
+ }
+
+ override def readValue[T: ClassTag](): T = {
+ readValueFunc()
+ value.asInstanceOf[T]
+ }
+
+ override def close(): Unit = {
rowIn.close()
}
}
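One detail worth noting in the deserialization stream above: `readKey()` and `readValue()` refill a preallocated `SpecificMutableRow` and return that same instance on every call, so a caller that retains records across reads must copy them first. Below is a minimal sketch of that reuse pattern under stated assumptions: plain arrays stand in for mutable rows, and every name (`ReusingKVReader`, `ReuseDemo`) is hypothetical; only the `readKey`/`readValue` shape mirrors the diff.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataInputStream, DataOutputStream}

// Illustrative sketch of object reuse on the read path.
class ReusingKVReader(in: DataInputStream) {
  // One preallocated record per role, standing in for SpecificMutableRow.
  private val key = new Array[String](1)
  private val value = new Array[String](1)

  // Refill the shared buffer and return it; callers that keep records
  // across reads must copy them first.
  def readKey(): Array[String]   = { key(0) = in.readUTF(); key }
  def readValue(): Array[String] = { value(0) = in.readUTF(); value }
}

object ReuseDemo {
  def main(args: Array[String]): Unit = {
    // Write two records in the alternating key/value layout.
    val bytes = new ByteArrayOutputStream()
    val out = new DataOutputStream(bytes)
    Seq("k1", "v1", "k2", "v2").foreach(out.writeUTF)

    val reader = new ReusingKVReader(
      new DataInputStream(new ByteArrayInputStream(bytes.toByteArray)))
    val first = reader.readKey()   // holds "k1"
    reader.readValue()             // consumes "v1"
    val second = reader.readKey()  // same buffer, now holds "k2"

    println(first eq second)       // true: both references point at the reused buffer
    println(second.mkString)       // k2
  }
}
```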