From 0eabea8a058ad60411c1384930ba12c1c638f5f1 Mon Sep 17 00:00:00 2001 From: Matt Massie Date: Thu, 10 Sep 2015 17:24:33 -0700 Subject: [SPARK-9043] Serialize key, value and combiner classes in ShuffleDependency ShuffleManager implementations are currently not given type information for the key, value and combiner classes. Serialization of shuffle objects relies on objects being JavaSerializable, with methods defined for reading/writing the object or, alternatively, serialization via Kryo which uses reflection. Serialization systems like Avro, Thrift and Protobuf generate classes with zero argument constructors and explicit schema information (e.g. IndexedRecords in Avro have get, put and getSchema methods). By serializing the key, value and combiner class names in ShuffleDependency, shuffle implementations will have access to schema information when registerShuffle() is called. Author: Matt Massie Closes #7403 from massie/shuffle-classtags. --- bagel/src/main/scala/org/apache/spark/bagel/Bagel.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'bagel/src/main/scala') diff --git a/bagel/src/main/scala/org/apache/spark/bagel/Bagel.scala b/bagel/src/main/scala/org/apache/spark/bagel/Bagel.scala index ef0bb2ac13..4e6b7686f7 100644 --- a/bagel/src/main/scala/org/apache/spark/bagel/Bagel.scala +++ b/bagel/src/main/scala/org/apache/spark/bagel/Bagel.scala @@ -78,7 +78,7 @@ object Bagel extends Logging { val startTime = System.currentTimeMillis val aggregated = agg(verts, aggregator) - val combinedMsgs = msgs.combineByKey( + val combinedMsgs = msgs.combineByKeyWithClassTag( combiner.createCombiner _, combiner.mergeMsg _, combiner.mergeCombiners _, partitioner) val grouped = combinedMsgs.groupWith(verts) val superstep_ = superstep // Create a read-only copy of superstep for capture in closure -- cgit v1.2.3