-rw-r--r--  bagel/src/main/scala/org/apache/spark/bagel/Bagel.scala                    |  2
-rw-r--r--  core/src/main/scala/org/apache/spark/Dependency.scala                      | 11
-rw-r--r--  core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala            |  2
-rw-r--r--  core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala                |  5
-rw-r--r--  core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala            | 90
-rw-r--r--  core/src/main/scala/org/apache/spark/rdd/ShuffledRDD.scala                 |  4
-rw-r--r--  core/src/main/scala/org/apache/spark/rdd/SubtractedRDD.scala               |  8
-rw-r--r--  core/src/test/scala/org/apache/spark/CheckpointSuite.scala                 |  2
-rw-r--r--  core/src/test/scala/org/apache/spark/shuffle/ShuffleDependencySuite.scala  | 67
9 files changed, 168 insertions(+), 23 deletions(-)
diff --git a/bagel/src/main/scala/org/apache/spark/bagel/Bagel.scala b/bagel/src/main/scala/org/apache/spark/bagel/Bagel.scala
index ef0bb2ac13..4e6b7686f7 100644
--- a/bagel/src/main/scala/org/apache/spark/bagel/Bagel.scala
+++ b/bagel/src/main/scala/org/apache/spark/bagel/Bagel.scala
@@ -78,7 +78,7 @@ object Bagel extends Logging {
val startTime = System.currentTimeMillis
val aggregated = agg(verts, aggregator)
- val combinedMsgs = msgs.combineByKey(
+ val combinedMsgs = msgs.combineByKeyWithClassTag(
combiner.createCombiner _, combiner.mergeMsg _, combiner.mergeCombiners _, partitioner)
val grouped = combinedMsgs.groupWith(verts)
val superstep_ = superstep // Create a read-only copy of superstep for capture in closure
diff --git a/core/src/main/scala/org/apache/spark/Dependency.scala b/core/src/main/scala/org/apache/spark/Dependency.scala
index cfeeb3902c..9aafc9eb1c 100644
--- a/core/src/main/scala/org/apache/spark/Dependency.scala
+++ b/core/src/main/scala/org/apache/spark/Dependency.scala
@@ -17,6 +17,8 @@
package org.apache.spark
+import scala.reflect.ClassTag
+
import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.rdd.RDD
import org.apache.spark.serializer.Serializer
@@ -65,7 +67,7 @@ abstract class NarrowDependency[T](_rdd: RDD[T]) extends Dependency[T] {
* @param mapSideCombine whether to perform partial aggregation (also known as map-side combine)
*/
@DeveloperApi
-class ShuffleDependency[K, V, C](
+class ShuffleDependency[K: ClassTag, V: ClassTag, C: ClassTag](
@transient private val _rdd: RDD[_ <: Product2[K, V]],
val partitioner: Partitioner,
val serializer: Option[Serializer] = None,
@@ -76,6 +78,13 @@ class ShuffleDependency[K, V, C](
override def rdd: RDD[Product2[K, V]] = _rdd.asInstanceOf[RDD[Product2[K, V]]]
+ private[spark] val keyClassName: String = reflect.classTag[K].runtimeClass.getName
+ private[spark] val valueClassName: String = reflect.classTag[V].runtimeClass.getName
+ // Note: It's possible that the combiner class tag is null, if the combineByKey
+ // methods in PairRDDFunctions are used instead of combineByKeyWithClassTag.
+ private[spark] val combinerClassName: Option[String] =
+ Option(reflect.classTag[C]).map(_.runtimeClass.getName)
+
val shuffleId: Int = _rdd.context.newShuffleId()
val shuffleHandle: ShuffleHandle = _rdd.context.env.shuffleManager.registerShuffle(
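
[Editor's note] The null-combiner comment above is the crux of the backward-compatibility story: the legacy combineByKey overloads pass a null ClassTag, and wrapping it in Option(...) turns that into None instead of a NullPointerException. A minimal standalone sketch of that pattern (not Spark code; the object and method names below are illustrative only):

    import scala.reflect.ClassTag

    object ClassNameFromTag {
      // Mirrors the combinerClassName logic: Option(null) is None, so a missing
      // (null) ClassTag yields no class name rather than throwing.
      def combinerClassName[C](implicit ct: ClassTag[C]): Option[String] =
        Option(ct).map(_.runtimeClass.getName)

      def main(args: Array[String]): Unit = {
        println(combinerClassName[String])     // Some(java.lang.String): tag supplied by the compiler
        println(combinerClassName[Int](null))  // None: explicit null, as the legacy combineByKey path does
      }
    }
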
diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala b/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala
index fb787979c1..8344f6368a 100644
--- a/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala
+++ b/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala
@@ -239,7 +239,7 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])
mapSideCombine: Boolean,
serializer: Serializer): JavaPairRDD[K, C] = {
implicit val ctag: ClassTag[C] = fakeClassTag
- fromRDD(rdd.combineByKey(
+ fromRDD(rdd.combineByKeyWithClassTag(
createCombiner,
mergeValue,
mergeCombiners,
diff --git a/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala b/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala
index 9c617fc719..7bad749d58 100644
--- a/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala
@@ -22,6 +22,7 @@ import scala.language.existentials
import java.io.{IOException, ObjectOutputStream}
import scala.collection.mutable.ArrayBuffer
+import scala.reflect.ClassTag
import org.apache.spark._
import org.apache.spark.annotation.DeveloperApi
@@ -74,7 +75,9 @@ private[spark] class CoGroupPartition(
* @param part partitioner used to partition the shuffle output
*/
@DeveloperApi
-class CoGroupedRDD[K](@transient var rdds: Seq[RDD[_ <: Product2[K, _]]], part: Partitioner)
+class CoGroupedRDD[K: ClassTag](
+ @transient var rdds: Seq[RDD[_ <: Product2[K, _]]],
+ part: Partitioner)
extends RDD[(K, Array[Iterable[_]])](rdds.head.context, Nil) {
// For example, `(k, a) cogroup (k, b)` produces k -> Array(ArrayBuffer as, ArrayBuffer bs).
diff --git a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
index 4e5f2e8a5d..c59f0d4aa7 100644
--- a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
@@ -57,7 +57,9 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
with SparkHadoopMapReduceUtil
with Serializable
{
+
/**
+ * :: Experimental ::
* Generic function to combine the elements for each key using a custom set of aggregation
* functions. Turns an RDD[(K, V)] into a result of type RDD[(K, C)], for a "combined type" C
* Note that V and C can be different -- for example, one might group an RDD of type
@@ -70,12 +72,14 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
* In addition, users can control the partitioning of the output RDD, and whether to perform
* map-side aggregation (if a mapper can produce multiple items with the same key).
*/
- def combineByKey[C](createCombiner: V => C,
+ @Experimental
+ def combineByKeyWithClassTag[C](
+ createCombiner: V => C,
mergeValue: (C, V) => C,
mergeCombiners: (C, C) => C,
partitioner: Partitioner,
mapSideCombine: Boolean = true,
- serializer: Serializer = null): RDD[(K, C)] = self.withScope {
+ serializer: Serializer = null)(implicit ct: ClassTag[C]): RDD[(K, C)] = self.withScope {
require(mergeCombiners != null, "mergeCombiners must be defined") // required as of Spark 0.9.0
if (keyClass.isArray) {
if (mapSideCombine) {
@@ -103,13 +107,50 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
}
/**
- * Simplified version of combineByKey that hash-partitions the output RDD.
+ * Generic function to combine the elements for each key using a custom set of aggregation
+ * functions. This method is here for backward compatibility. It does not provide combiner
+ * classtag information to the shuffle.
+ *
+ * @see [[combineByKeyWithClassTag]]
*/
- def combineByKey[C](createCombiner: V => C,
+ def combineByKey[C](
+ createCombiner: V => C,
+ mergeValue: (C, V) => C,
+ mergeCombiners: (C, C) => C,
+ partitioner: Partitioner,
+ mapSideCombine: Boolean = true,
+ serializer: Serializer = null): RDD[(K, C)] = self.withScope {
+ combineByKeyWithClassTag(createCombiner, mergeValue, mergeCombiners,
+ partitioner, mapSideCombine, serializer)(null)
+ }
+
+ /**
+ * Simplified version of combineByKeyWithClassTag that hash-partitions the output RDD.
+ * This method is here for backward compatibility. It does not provide combiner
+ * classtag information to the shuffle.
+ *
+ * @see [[combineByKeyWithClassTag]]
+ */
+ def combineByKey[C](
+ createCombiner: V => C,
mergeValue: (C, V) => C,
mergeCombiners: (C, C) => C,
numPartitions: Int): RDD[(K, C)] = self.withScope {
- combineByKey(createCombiner, mergeValue, mergeCombiners, new HashPartitioner(numPartitions))
+ combineByKeyWithClassTag(createCombiner, mergeValue, mergeCombiners, numPartitions)(null)
+ }
+
+ /**
+ * :: Experimental ::
+ * Simplified version of combineByKeyWithClassTag that hash-partitions the output RDD.
+ */
+ @Experimental
+ def combineByKeyWithClassTag[C](
+ createCombiner: V => C,
+ mergeValue: (C, V) => C,
+ mergeCombiners: (C, C) => C,
+ numPartitions: Int)(implicit ct: ClassTag[C]): RDD[(K, C)] = self.withScope {
+ combineByKeyWithClassTag(createCombiner, mergeValue, mergeCombiners,
+ new HashPartitioner(numPartitions))
}
/**
@@ -133,7 +174,8 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
// We will clean the combiner closure later in `combineByKey`
val cleanedSeqOp = self.context.clean(seqOp)
- combineByKey[U]((v: V) => cleanedSeqOp(createZero(), v), cleanedSeqOp, combOp, partitioner)
+ combineByKeyWithClassTag[U]((v: V) => cleanedSeqOp(createZero(), v),
+ cleanedSeqOp, combOp, partitioner)
}
/**
@@ -182,7 +224,8 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
val createZero = () => cachedSerializer.deserialize[V](ByteBuffer.wrap(zeroArray))
val cleanedFunc = self.context.clean(func)
- combineByKey[V]((v: V) => cleanedFunc(createZero(), v), cleanedFunc, cleanedFunc, partitioner)
+ combineByKeyWithClassTag[V]((v: V) => cleanedFunc(createZero(), v),
+ cleanedFunc, cleanedFunc, partitioner)
}
/**
@@ -268,7 +311,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
* "combiner" in MapReduce.
*/
def reduceByKey(partitioner: Partitioner, func: (V, V) => V): RDD[(K, V)] = self.withScope {
- combineByKey[V]((v: V) => v, func, func, partitioner)
+ combineByKeyWithClassTag[V]((v: V) => v, func, func, partitioner)
}
/**
@@ -392,7 +435,8 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
h1
}
- combineByKey(createHLL, mergeValueHLL, mergeHLL, partitioner).mapValues(_.cardinality())
+ combineByKeyWithClassTag(createHLL, mergeValueHLL, mergeHLL, partitioner)
+ .mapValues(_.cardinality())
}
/**
@@ -466,7 +510,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
val createCombiner = (v: V) => CompactBuffer(v)
val mergeValue = (buf: CompactBuffer[V], v: V) => buf += v
val mergeCombiners = (c1: CompactBuffer[V], c2: CompactBuffer[V]) => c1 ++= c2
- val bufs = combineByKey[CompactBuffer[V]](
+ val bufs = combineByKeyWithClassTag[CompactBuffer[V]](
createCombiner, mergeValue, mergeCombiners, partitioner, mapSideCombine = false)
bufs.asInstanceOf[RDD[(K, Iterable[V])]]
}
@@ -565,12 +609,30 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
}
/**
- * Simplified version of combineByKey that hash-partitions the resulting RDD using the
+ * Simplified version of combineByKeyWithClassTag that hash-partitions the resulting RDD using the
+ * existing partitioner/parallelism level. This method is here for backward compatibility. It
+ * does not provide combiner classtag information to the shuffle.
+ *
+ * @see [[combineByKeyWithClassTag]]
+ */
+ def combineByKey[C](
+ createCombiner: V => C,
+ mergeValue: (C, V) => C,
+ mergeCombiners: (C, C) => C): RDD[(K, C)] = self.withScope {
+ combineByKeyWithClassTag(createCombiner, mergeValue, mergeCombiners)(null)
+ }
+
+ /**
+ * :: Experimental ::
+ * Simplified version of combineByKeyWithClassTag that hash-partitions the resulting RDD using the
* existing partitioner/parallelism level.
*/
- def combineByKey[C](createCombiner: V => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C)
- : RDD[(K, C)] = self.withScope {
- combineByKey(createCombiner, mergeValue, mergeCombiners, defaultPartitioner(self))
+ @Experimental
+ def combineByKeyWithClassTag[C](
+ createCombiner: V => C,
+ mergeValue: (C, V) => C,
+ mergeCombiners: (C, C) => C)(implicit ct: ClassTag[C]): RDD[(K, C)] = self.withScope {
+ combineByKeyWithClassTag(createCombiner, mergeValue, mergeCombiners, defaultPartitioner(self))
}
/**
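
[Editor's note] A hedged usage sketch of the overloads added above, assuming a live SparkContext named sc: the ClassTag is resolved implicitly at the call site, so opting in is just a method rename, while the old combineByKey signatures keep compiling unchanged.

    val pairs = sc.parallelize(Seq(("a", 1), ("a", 2), ("b", 3)))

    // Legacy API: still compiles, but the shuffle dependency records no combiner class.
    val legacy = pairs.combineByKey(
      (v: Int) => List(v),
      (c: List[Int], v: Int) => v :: c,
      (c1: List[Int], c2: List[Int]) => c1 ::: c2)

    // Experimental API: the implicit ClassTag[List[Int]] is captured and surfaced
    // to the ShuffleDependency as combinerClassName.
    val tagged = pairs.combineByKeyWithClassTag(
      (v: Int) => List(v),
      (c: List[Int], v: Int) => v :: c,
      (c1: List[Int], c2: List[Int]) => c1 ::: c2)
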
diff --git a/core/src/main/scala/org/apache/spark/rdd/ShuffledRDD.scala b/core/src/main/scala/org/apache/spark/rdd/ShuffledRDD.scala
index 2dc47f9593..cb15d912bb 100644
--- a/core/src/main/scala/org/apache/spark/rdd/ShuffledRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/ShuffledRDD.scala
@@ -17,6 +17,8 @@
package org.apache.spark.rdd
+import scala.reflect.ClassTag
+
import org.apache.spark._
import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.serializer.Serializer
@@ -37,7 +39,7 @@ private[spark] class ShuffledRDDPartition(val idx: Int) extends Partition {
*/
// TODO: Make this return RDD[Product2[K, C]] or have some way to configure mutable pairs
@DeveloperApi
-class ShuffledRDD[K, V, C](
+class ShuffledRDD[K: ClassTag, V: ClassTag, C: ClassTag](
@transient var prev: RDD[_ <: Product2[K, V]],
part: Partitioner)
extends RDD[(K, C)](prev.context, Nil) {
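
[Editor's note] Because ShuffledRDD is a DeveloperApi that external code may construct directly, the new ClassTag bounds are worth noting: for concrete key/value/combiner types the tags are materialized implicitly, so most callers need no change. A minimal sketch, assuming a live SparkContext named sc:

    import org.apache.spark.HashPartitioner
    import org.apache.spark.rdd.ShuffledRDD

    val pairs = sc.parallelize(Seq((1, "a"), (2, "b"), (1, "c")))
    // ClassTag[Int] and ClassTag[String] are supplied by the compiler,
    // satisfying the new K: ClassTag, V: ClassTag, C: ClassTag bounds.
    val shuffled = new ShuffledRDD[Int, String, String](pairs, new HashPartitioner(2))
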
diff --git a/core/src/main/scala/org/apache/spark/rdd/SubtractedRDD.scala b/core/src/main/scala/org/apache/spark/rdd/SubtractedRDD.scala
index 9a4fa301b0..25ec685eff 100644
--- a/core/src/main/scala/org/apache/spark/rdd/SubtractedRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/SubtractedRDD.scala
@@ -63,15 +63,17 @@ private[spark] class SubtractedRDD[K: ClassTag, V: ClassTag, W: ClassTag](
}
override def getDependencies: Seq[Dependency[_]] = {
- Seq(rdd1, rdd2).map { rdd =>
+ def rddDependency[T1: ClassTag, T2: ClassTag](rdd: RDD[_ <: Product2[T1, T2]])
+ : Dependency[_] = {
if (rdd.partitioner == Some(part)) {
logDebug("Adding one-to-one dependency with " + rdd)
new OneToOneDependency(rdd)
} else {
logDebug("Adding shuffle dependency with " + rdd)
- new ShuffleDependency(rdd, part, serializer)
+ new ShuffleDependency[T1, T2, Any](rdd, part, serializer)
}
}
+ Seq(rddDependency[K, V](rdd1), rddDependency[K, W](rdd2))
}
override def getPartitions: Array[Partition] = {
@@ -105,7 +107,7 @@ private[spark] class SubtractedRDD[K: ClassTag, V: ClassTag, W: ClassTag](
seq
}
}
- def integrate(depNum: Int, op: Product2[K, V] => Unit) = {
+ def integrate(depNum: Int, op: Product2[K, V] => Unit): Unit = {
dependencies(depNum) match {
case oneToOneDependency: OneToOneDependency[_] =>
val dependencyPartition = partition.narrowDeps(depNum).get.split
diff --git a/core/src/test/scala/org/apache/spark/CheckpointSuite.scala b/core/src/test/scala/org/apache/spark/CheckpointSuite.scala
index d343bb95cb..4d70bfed90 100644
--- a/core/src/test/scala/org/apache/spark/CheckpointSuite.scala
+++ b/core/src/test/scala/org/apache/spark/CheckpointSuite.scala
@@ -483,7 +483,7 @@ class FatPairRDD(parent: RDD[Int], _partitioner: Partitioner) extends RDD[(Int,
object CheckpointSuite {
// This is a custom cogroup function that does not use mapValues like
// the PairRDDFunctions.cogroup()
- def cogroup[K, V](first: RDD[(K, V)], second: RDD[(K, V)], part: Partitioner)
+ def cogroup[K: ClassTag, V: ClassTag](first: RDD[(K, V)], second: RDD[(K, V)], part: Partitioner)
: RDD[(K, Array[Iterable[V]])] = {
new CoGroupedRDD[K](
Seq(first.asInstanceOf[RDD[(K, _)]], second.asInstanceOf[RDD[(K, _)]]),
diff --git a/core/src/test/scala/org/apache/spark/shuffle/ShuffleDependencySuite.scala b/core/src/test/scala/org/apache/spark/shuffle/ShuffleDependencySuite.scala
new file mode 100644
index 0000000000..4d5f599fb1
--- /dev/null
+++ b/core/src/test/scala/org/apache/spark/shuffle/ShuffleDependencySuite.scala
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.shuffle
+
+import org.apache.spark._
+
+case class KeyClass()
+
+case class ValueClass()
+
+case class CombinerClass()
+
+class ShuffleDependencySuite extends SparkFunSuite with LocalSparkContext {
+
+ val conf = new SparkConf(loadDefaults = false)
+
+ test("key, value, and combiner classes correct in shuffle dependency without aggregation") {
+ sc = new SparkContext("local", "test", conf.clone())
+ val rdd = sc.parallelize(1 to 5, 4)
+ .map(key => (KeyClass(), ValueClass()))
+ .groupByKey()
+ val dep = rdd.dependencies.head.asInstanceOf[ShuffleDependency[_, _, _]]
+ assert(!dep.mapSideCombine, "Test requires that no map-side aggregator is defined")
+ assert(dep.keyClassName == classOf[KeyClass].getName)
+ assert(dep.valueClassName == classOf[ValueClass].getName)
+ }
+
+ test("key, value, and combiner classes available in shuffle dependency with aggregation") {
+ sc = new SparkContext("local", "test", conf.clone())
+ val rdd = sc.parallelize(1 to 5, 4)
+ .map(key => (KeyClass(), ValueClass()))
+ .aggregateByKey(CombinerClass())({ case (a, b) => a }, { case (a, b) => a })
+ val dep = rdd.dependencies.head.asInstanceOf[ShuffleDependency[_, _, _]]
+ assert(dep.mapSideCombine && dep.aggregator.isDefined, "Test requires map-side aggregation")
+ assert(dep.keyClassName == classOf[KeyClass].getName)
+ assert(dep.valueClassName == classOf[ValueClass].getName)
+ assert(dep.combinerClassName == Some(classOf[CombinerClass].getName))
+ }
+
+ test("combineByKey null combiner class tag handled correctly") {
+ sc = new SparkContext("local", "test", conf.clone())
+ val rdd = sc.parallelize(1 to 5, 4)
+ .map(key => (KeyClass(), ValueClass()))
+ .combineByKey((v: ValueClass) => v,
+ (c: AnyRef, v: ValueClass) => c,
+ (c1: AnyRef, c2: AnyRef) => c1)
+ val dep = rdd.dependencies.head.asInstanceOf[ShuffleDependency[_, _, _]]
+ assert(dep.keyClassName == classOf[KeyClass].getName)
+ assert(dep.valueClassName == classOf[ValueClass].getName)
+ assert(dep.combinerClassName == None)
+ }
+
+}