aboutsummaryrefslogtreecommitdiff
path: root/bagel/src/main/scala
diff options
context:
space:
mode:
authorMatt Massie <massie@cs.berkeley.edu>2015-09-10 17:24:33 -0700
committerReynold Xin <rxin@databricks.com>2015-09-10 17:24:33 -0700
commit0eabea8a058ad60411c1384930ba12c1c638f5f1 (patch)
tree365097e7b2520b82e21ad019e9e543cbfe380ab0 /bagel/src/main/scala
parent89562a172fd3efa032f60714d600407c6cfe2c2f (diff)
downloadspark-0eabea8a058ad60411c1384930ba12c1c638f5f1.tar.gz
spark-0eabea8a058ad60411c1384930ba12c1c638f5f1.tar.bz2
spark-0eabea8a058ad60411c1384930ba12c1c638f5f1.zip
[SPARK-9043] Serialize key, value and combiner classes in ShuffleDependency
ShuffleManager implementations are currently not given type information for the key, value and combiner classes. Serialization of shuffle objects relies on objects being JavaSerializable, with methods defined for reading/writing the object or, alternatively, serialization via Kryo which uses reflection. Serialization systems like Avro, Thrift and Protobuf generate classes with zero argument constructors and explicit schema information (e.g. IndexedRecords in Avro have get, put and getSchema methods). By serializing the key, value and combiner class names in ShuffleDependency, shuffle implementations will have access to schema information when registerShuffle() is called. Author: Matt Massie <massie@cs.berkeley.edu> Closes #7403 from massie/shuffle-classtags.
Diffstat (limited to 'bagel/src/main/scala')
-rw-r--r--bagel/src/main/scala/org/apache/spark/bagel/Bagel.scala2
1 files changed, 1 insertions, 1 deletions
diff --git a/bagel/src/main/scala/org/apache/spark/bagel/Bagel.scala b/bagel/src/main/scala/org/apache/spark/bagel/Bagel.scala
index ef0bb2ac13..4e6b7686f7 100644
--- a/bagel/src/main/scala/org/apache/spark/bagel/Bagel.scala
+++ b/bagel/src/main/scala/org/apache/spark/bagel/Bagel.scala
@@ -78,7 +78,7 @@ object Bagel extends Logging {
val startTime = System.currentTimeMillis
val aggregated = agg(verts, aggregator)
- val combinedMsgs = msgs.combineByKey(
+ val combinedMsgs = msgs.combineByKeyWithClassTag(
combiner.createCombiner _, combiner.mergeMsg _, combiner.mergeCombiners _, partitioner)
val grouped = combinedMsgs.groupWith(verts)
val superstep_ = superstep // Create a read-only copy of superstep for capture in closure