diff options
-rw-r--r-- | core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala | 8 |
1 files changed, 4 insertions, 4 deletions
diff --git a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala index b0434c9a3b..fe3129b62f 100644 --- a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala +++ b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala @@ -123,11 +123,11 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)]) def aggregateByKey[U: ClassTag](zeroValue: U, partitioner: Partitioner)(seqOp: (U, V) => U, combOp: (U, U) => U): RDD[(K, U)] = { // Serialize the zero value to a byte array so that we can get a new clone of it on each key - val zeroBuffer = SparkEnv.get.closureSerializer.newInstance().serialize(zeroValue) + val zeroBuffer = SparkEnv.get.serializer.newInstance().serialize(zeroValue) val zeroArray = new Array[Byte](zeroBuffer.limit) zeroBuffer.get(zeroArray) - lazy val cachedSerializer = SparkEnv.get.closureSerializer.newInstance() + lazy val cachedSerializer = SparkEnv.get.serializer.newInstance() val createZero = () => cachedSerializer.deserialize[U](ByteBuffer.wrap(zeroArray)) combineByKey[U]((v: V) => seqOp(createZero(), v), seqOp, combOp, partitioner) @@ -168,12 +168,12 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)]) */ def foldByKey(zeroValue: V, partitioner: Partitioner)(func: (V, V) => V): RDD[(K, V)] = { // Serialize the zero value to a byte array so that we can get a new clone of it on each key - val zeroBuffer = SparkEnv.get.closureSerializer.newInstance().serialize(zeroValue) + val zeroBuffer = SparkEnv.get.serializer.newInstance().serialize(zeroValue) val zeroArray = new Array[Byte](zeroBuffer.limit) zeroBuffer.get(zeroArray) // When deserializing, use a lazy val to create just one instance of the serializer per task - lazy val cachedSerializer = SparkEnv.get.closureSerializer.newInstance() + lazy val cachedSerializer = SparkEnv.get.serializer.newInstance() val createZero = () => cachedSerializer.deserialize[V](ByteBuffer.wrap(zeroArray)) combineByKey[V]((v: V) => func(createZero(), v), func, func, partitioner) |