diff options
author | Matt Massie <massie@cs.berkeley.edu> | 2015-09-10 17:24:33 -0700 |
---|---|---|
committer | Reynold Xin <rxin@databricks.com> | 2015-09-10 17:24:33 -0700 |
commit | 0eabea8a058ad60411c1384930ba12c1c638f5f1 (patch) | |
tree | 365097e7b2520b82e21ad019e9e543cbfe380ab0 /bagel | |
parent | 89562a172fd3efa032f60714d600407c6cfe2c2f (diff) | |
download | spark-0eabea8a058ad60411c1384930ba12c1c638f5f1.tar.gz spark-0eabea8a058ad60411c1384930ba12c1c638f5f1.tar.bz2 spark-0eabea8a058ad60411c1384930ba12c1c638f5f1.zip |
[SPARK-9043] Serialize key, value and combiner classes in ShuffleDependency
ShuffleManager implementations are currently not given type information for
the key, value and combiner classes. Serialization of shuffle objects relies
on objects being JavaSerializable, with methods defined for reading/writing
the object or, alternatively, serialization via Kryo which uses reflection.
Serialization systems like Avro, Thrift and Protobuf generate classes with
zero argument constructors and explicit schema information
(e.g. IndexedRecords in Avro have get, put and getSchema methods).
By serializing the key, value and combiner class names in ShuffleDependency,
shuffle implementations will have access to schema information when
registerShuffle() is called.
Author: Matt Massie <massie@cs.berkeley.edu>
Closes #7403 from massie/shuffle-classtags.
Diffstat (limited to 'bagel')
-rw-r--r-- | bagel/src/main/scala/org/apache/spark/bagel/Bagel.scala | 2 |
1 files changed, 1 insertions, 1 deletions
diff --git a/bagel/src/main/scala/org/apache/spark/bagel/Bagel.scala b/bagel/src/main/scala/org/apache/spark/bagel/Bagel.scala index ef0bb2ac13..4e6b7686f7 100644 --- a/bagel/src/main/scala/org/apache/spark/bagel/Bagel.scala +++ b/bagel/src/main/scala/org/apache/spark/bagel/Bagel.scala @@ -78,7 +78,7 @@ object Bagel extends Logging { val startTime = System.currentTimeMillis val aggregated = agg(verts, aggregator) - val combinedMsgs = msgs.combineByKey( + val combinedMsgs = msgs.combineByKeyWithClassTag( combiner.createCombiner _, combiner.mergeMsg _, combiner.mergeCombiners _, partitioner) val grouped = combinedMsgs.groupWith(verts) val superstep_ = superstep // Create a read-only copy of superstep for capture in closure |