author     Ivan Vergiliev <ivan@leanplum.com>	2014-12-18 16:29:36 -0800
committer  Josh Rosen <joshrosen@databricks.com>	2014-12-18 16:29:36 -0800
commit     f9f58b9a01c4c7eaf0ce5055d6870e69a22297e3 (patch)
tree       434124fdfe46851de608fa4b96c00d423a549fc8 /core
parent     d5a596d4188bfa85ff49ee85039f54255c19a4de (diff)
SPARK-4743 - Use SparkEnv.serializer instead of closureSerializer in aggregateByKey and foldByKey
Author: Ivan Vergiliev <ivan@leanplum.com>

Closes #3605 from IvanVergiliev/change-serializer and squashes the following commits:

a49b7cf [Ivan Vergiliev] Use serializer instead of closureSerializer in aggregate/foldByKey.
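For context: SparkEnv.get.closureSerializer is the serializer Spark uses to ship closures (typically plain Java serialization), while SparkEnv.get.serializer is the data serializer configured via spark.serializer (e.g. Kryo). The zero value passed to aggregateByKey/foldByKey is data rather than code, so it belongs with the data serializer, matching how the rest of the records are handled. A minimal sketch of a job that exercises the changed path, assuming a local run (the app name, master, and sample data are illustrative):

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.SparkContext._  // pair-RDD implicits on Spark 1.x

val conf = new SparkConf()
  .setAppName("aggregateByKey-zero-value")  // illustrative name
  .setMaster("local[2]")                    // assumption: local test run
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
val sc = new SparkContext(conf)

val pairs = sc.parallelize(Seq(("a", 1), ("a", 2), ("b", 3)))

// The zero value (an empty Set) is serialized once on the driver and
// deserialized into a fresh clone for each key on the executors.
val grouped = pairs.aggregateByKey(Set.empty[Int])(
  (acc, v) => acc + v,  // seqOp: fold one value into the accumulator
  (a, b) => a ++ b      // combOp: merge two per-partition accumulators
)
grouped.collect().foreach(println)
sc.stop()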
Diffstat (limited to 'core')
-rw-r--r--  core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala | 8 ++++----
1 file changed, 4 insertions(+), 4 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
index b0434c9a3b..fe3129b62f 100644
--- a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
@@ -123,11 +123,11 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
   def aggregateByKey[U: ClassTag](zeroValue: U, partitioner: Partitioner)(seqOp: (U, V) => U,
       combOp: (U, U) => U): RDD[(K, U)] = {
     // Serialize the zero value to a byte array so that we can get a new clone of it on each key
-    val zeroBuffer = SparkEnv.get.closureSerializer.newInstance().serialize(zeroValue)
+    val zeroBuffer = SparkEnv.get.serializer.newInstance().serialize(zeroValue)
     val zeroArray = new Array[Byte](zeroBuffer.limit)
     zeroBuffer.get(zeroArray)

-    lazy val cachedSerializer = SparkEnv.get.closureSerializer.newInstance()
+    lazy val cachedSerializer = SparkEnv.get.serializer.newInstance()
     val createZero = () => cachedSerializer.deserialize[U](ByteBuffer.wrap(zeroArray))

     combineByKey[U]((v: V) => seqOp(createZero(), v), seqOp, combOp, partitioner)
@@ -168,12 +168,12 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
    */
   def foldByKey(zeroValue: V, partitioner: Partitioner)(func: (V, V) => V): RDD[(K, V)] = {
     // Serialize the zero value to a byte array so that we can get a new clone of it on each key
-    val zeroBuffer = SparkEnv.get.closureSerializer.newInstance().serialize(zeroValue)
+    val zeroBuffer = SparkEnv.get.serializer.newInstance().serialize(zeroValue)
     val zeroArray = new Array[Byte](zeroBuffer.limit)
     zeroBuffer.get(zeroArray)

     // When deserializing, use a lazy val to create just one instance of the serializer per task
-    lazy val cachedSerializer = SparkEnv.get.closureSerializer.newInstance()
+    lazy val cachedSerializer = SparkEnv.get.serializer.newInstance()
     val createZero = () => cachedSerializer.deserialize[V](ByteBuffer.wrap(zeroArray))

     combineByKey[V]((v: V) => func(createZero(), v), func, func, partitioner)
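The pattern both hunks touch is worth seeing in isolation: serialize the zero value to bytes once on the driver, then lazily create a single serializer instance per task and deserialize a fresh clone per key. A minimal sketch of that pattern, assuming it runs inside a live Spark application (makeZeroFactory is a hypothetical helper, not part of the patch):

import java.nio.ByteBuffer
import scala.reflect.ClassTag
import org.apache.spark.SparkEnv

// Hypothetical helper illustrating the zero-value cloning pattern.
def makeZeroFactory[T: ClassTag](zeroValue: T): () => T = {
  // Driver side: serialize the zero value once, using the data serializer.
  val zeroBuffer = SparkEnv.get.serializer.newInstance().serialize(zeroValue)
  val zeroArray = new Array[Byte](zeroBuffer.limit)
  zeroBuffer.get(zeroArray)

  // Executor side: the lazy val defers creation so each task builds at
  // most one serializer instance; every call yields an independent clone.
  lazy val cachedSerializer = SparkEnv.get.serializer.newInstance()
  () => cachedSerializer.deserialize[T](ByteBuffer.wrap(zeroArray))
}

combineByKey then invokes such a factory (createZero in the diff) on the first occurrence of each key, so a mutable zero value, e.g. a collection used as an accumulator, is never shared across keys.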