Diffstat (limited to 'core')
-rw-r--r--  core/src/main/scala/org/apache/spark/SparkContext.scala                  |  4
-rw-r--r--  core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala          | 29
-rw-r--r--  core/src/main/scala/org/apache/spark/rdd/RDD.scala                       | 46
-rw-r--r--  core/src/main/scala/org/apache/spark/rdd/SequenceFileRDDFunctions.scala  |  4
-rw-r--r--  core/src/test/scala/org/apache/spark/ImplicitOrderingSuite.scala         | 57
5 files changed, 105 insertions, 35 deletions
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index dcb6b6824b..e9d2f57579 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -1281,8 +1281,10 @@ object SparkContext extends Logging {
// TODO: Add AccumulatorParams for other types, e.g. lists and strings
- implicit def rddToPairRDDFunctions[K: ClassTag, V: ClassTag](rdd: RDD[(K, V)]) =
+ implicit def rddToPairRDDFunctions[K, V](rdd: RDD[(K, V)])
+ (implicit kt: ClassTag[K], vt: ClassTag[V], ord: Ordering[K] = null) = {
new PairRDDFunctions(rdd)
+ }
implicit def rddToAsyncRDDActions[T: ClassTag](rdd: RDD[T]) = new AsyncRDDActions(rdd)
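The behavioral core of the new signature is the `ord: Ordering[K] = null` parameter: call sites where an Ordering for K is in implicit scope pass it through transparently, while every other call site keeps compiling because the default kicks in. A minimal, Spark-free sketch of that resolution rule (all names below are illustrative, not part of the patch):

    object DefaultImplicitSketch {
      // Mirrors the pattern used above: resolve an Ordering if one exists,
      // otherwise fall back to null and expose the result as an Option.
      def capture[K](implicit ord: Ordering[K] = null): Option[Ordering[K]] = Option(ord)

      class NoOrdering  // hypothetical type with no Ordering instance

      def main(args: Array[String]): Unit = {
        println(capture[Int].isDefined)        // true: Ordering[Int] found implicitly
        println(capture[NoOrdering].isDefined) // false: nothing found, the null default is used
      }
    }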
diff --git a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
index d250bef6aa..d2b9ee4276 100644
--- a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
@@ -52,11 +52,12 @@ import org.apache.spark.util.SerializableHyperLogLog
* Extra functions available on RDDs of (key, value) pairs through an implicit conversion.
* Import `org.apache.spark.SparkContext._` at the top of your program to use these functions.
*/
-class PairRDDFunctions[K: ClassTag, V: ClassTag](self: RDD[(K, V)])
+class PairRDDFunctions[K, V](self: RDD[(K, V)])
+ (implicit kt: ClassTag[K], vt: ClassTag[V], ord: Ordering[K] = null)
extends Logging
with SparkHadoopMapReduceUtil
- with Serializable {
-
+ with Serializable
+{
/**
* Generic function to combine the elements for each key using a custom set of aggregation
* functions. Turns an RDD[(K, V)] into a result of type RDD[(K, C)], for a "combined type" C
@@ -77,7 +78,7 @@ class PairRDDFunctions[K: ClassTag, V: ClassTag](self: RDD[(K, V)])
mapSideCombine: Boolean = true,
serializer: Serializer = null): RDD[(K, C)] = {
require(mergeCombiners != null, "mergeCombiners must be defined") // required as of Spark 0.9.0
- if (getKeyClass().isArray) {
+ if (keyClass.isArray) {
if (mapSideCombine) {
throw new SparkException("Cannot use map-side combining with array keys.")
}
@@ -170,7 +171,7 @@ class PairRDDFunctions[K: ClassTag, V: ClassTag](self: RDD[(K, V)])
*/
def reduceByKeyLocally(func: (V, V) => V): Map[K, V] = {
- if (getKeyClass().isArray) {
+ if (keyClass.isArray) {
throw new SparkException("reduceByKeyLocally() does not support array keys")
}
@@ -288,7 +289,7 @@ class PairRDDFunctions[K: ClassTag, V: ClassTag](self: RDD[(K, V)])
* Return a copy of the RDD partitioned using the specified partitioner.
*/
def partitionBy(partitioner: Partitioner): RDD[(K, V)] = {
- if (getKeyClass().isArray && partitioner.isInstanceOf[HashPartitioner]) {
+ if (keyClass.isArray && partitioner.isInstanceOf[HashPartitioner]) {
throw new SparkException("Default partitioner cannot partition array keys.")
}
if (self.partitioner == partitioner) self else new ShuffledRDD[K, V, (K, V)](self, partitioner)
@@ -458,7 +459,7 @@ class PairRDDFunctions[K: ClassTag, V: ClassTag](self: RDD[(K, V)])
*/
def cogroup[W](other: RDD[(K, W)], partitioner: Partitioner)
: RDD[(K, (Iterable[V], Iterable[W]))] = {
- if (partitioner.isInstanceOf[HashPartitioner] && getKeyClass().isArray) {
+ if (partitioner.isInstanceOf[HashPartitioner] && keyClass.isArray) {
throw new SparkException("Default partitioner cannot partition array keys.")
}
val cg = new CoGroupedRDD[K](Seq(self, other), partitioner)
@@ -473,7 +474,7 @@ class PairRDDFunctions[K: ClassTag, V: ClassTag](self: RDD[(K, V)])
*/
def cogroup[W1, W2](other1: RDD[(K, W1)], other2: RDD[(K, W2)], partitioner: Partitioner)
: RDD[(K, (Iterable[V], Iterable[W1], Iterable[W2]))] = {
- if (partitioner.isInstanceOf[HashPartitioner] && getKeyClass().isArray) {
+ if (partitioner.isInstanceOf[HashPartitioner] && keyClass.isArray) {
throw new SparkException("Default partitioner cannot partition array keys.")
}
val cg = new CoGroupedRDD[K](Seq(self, other1, other2), partitioner)
@@ -573,7 +574,7 @@ class PairRDDFunctions[K: ClassTag, V: ClassTag](self: RDD[(K, V)])
* supporting the key and value types K and V in this RDD.
*/
def saveAsHadoopFile[F <: OutputFormat[K, V]](path: String)(implicit fm: ClassTag[F]) {
- saveAsHadoopFile(path, getKeyClass, getValueClass, fm.runtimeClass.asInstanceOf[Class[F]])
+ saveAsHadoopFile(path, keyClass, valueClass, fm.runtimeClass.asInstanceOf[Class[F]])
}
/**
@@ -584,7 +585,7 @@ class PairRDDFunctions[K: ClassTag, V: ClassTag](self: RDD[(K, V)])
def saveAsHadoopFile[F <: OutputFormat[K, V]](
path: String, codec: Class[_ <: CompressionCodec]) (implicit fm: ClassTag[F]) {
val runtimeClass = fm.runtimeClass
- saveAsHadoopFile(path, getKeyClass, getValueClass, runtimeClass.asInstanceOf[Class[F]], codec)
+ saveAsHadoopFile(path, keyClass, valueClass, runtimeClass.asInstanceOf[Class[F]], codec)
}
/**
@@ -592,7 +593,7 @@ class PairRDDFunctions[K: ClassTag, V: ClassTag](self: RDD[(K, V)])
* (mapreduce.OutputFormat) object supporting the key and value types K and V in this RDD.
*/
def saveAsNewAPIHadoopFile[F <: NewOutputFormat[K, V]](path: String)(implicit fm: ClassTag[F]) {
- saveAsNewAPIHadoopFile(path, getKeyClass, getValueClass, fm.runtimeClass.asInstanceOf[Class[F]])
+ saveAsNewAPIHadoopFile(path, keyClass, valueClass, fm.runtimeClass.asInstanceOf[Class[F]])
}
/**
@@ -782,7 +783,9 @@ class PairRDDFunctions[K: ClassTag, V: ClassTag](self: RDD[(K, V)])
*/
def values: RDD[V] = self.map(_._2)
- private[spark] def getKeyClass() = implicitly[ClassTag[K]].runtimeClass
+ private[spark] def keyClass: Class[_] = kt.runtimeClass
+
+ private[spark] def valueClass: Class[_] = vt.runtimeClass
- private[spark] def getValueClass() = implicitly[ClassTag[V]].runtimeClass
+ private[spark] def keyOrdering: Option[Ordering[K]] = Option(ord)
}
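A rough sketch of how the new keyOrdering handle behaves at a concrete call site. Because it is private[spark], the code has to live inside the org.apache.spark namespace; the object name, app name, and Opaque class below are hypothetical:

    package org.apache.spark.rdd

    import org.apache.spark.SparkContext
    import org.apache.spark.SparkContext._

    object KeyOrderingSketch {
      class Opaque  // hypothetical key type with no Ordering instance

      def main(args: Array[String]): Unit = {
        val sc = new SparkContext("local", "keyOrdering-sketch")
        val byName = sc.parallelize(Seq("a" -> 1, "b" -> 2))
        // Ordering[String] is in implicit scope, so the conversion captured it.
        println(byName.keyOrdering.isDefined)    // true
        // No Ordering[Opaque] exists, so ord defaulted to null and the Option is empty.
        val byOpaque = byName.map { case (_, v) => (new Opaque, v) }
        println(byOpaque.keyOrdering.isDefined)  // false
        sc.stop()
      }
    }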
diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
index 596dcb84db..6c897cc03b 100644
--- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@ -284,7 +284,7 @@ abstract class RDD[T: ClassTag](
/**
* Return a new RDD containing the distinct elements in this RDD.
*/
- def distinct(numPartitions: Int): RDD[T] =
+ def distinct(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] =
map(x => (x, null)).reduceByKey((x, y) => x, numPartitions).map(_._1)
/**
@@ -301,7 +301,7 @@ abstract class RDD[T: ClassTag](
* If you are decreasing the number of partitions in this RDD, consider using `coalesce`,
* which can avoid performing a shuffle.
*/
- def repartition(numPartitions: Int): RDD[T] = {
+ def repartition(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] = {
coalesce(numPartitions, shuffle = true)
}
@@ -325,7 +325,8 @@ abstract class RDD[T: ClassTag](
* coalesce(1000, shuffle = true) will result in 1000 partitions with the
* data distributed using a hash partitioner.
*/
- def coalesce(numPartitions: Int, shuffle: Boolean = false): RDD[T] = {
+ def coalesce(numPartitions: Int, shuffle: Boolean = false)(implicit ord: Ordering[T] = null)
+ : RDD[T] = {
if (shuffle) {
// include a shuffle step so that our upstream tasks are still distributed
new CoalescedRDD(
@@ -424,10 +425,11 @@ abstract class RDD[T: ClassTag](
*
* Note that this method performs a shuffle internally.
*/
- def intersection(other: RDD[T]): RDD[T] =
+ def intersection(other: RDD[T]): RDD[T] = {
this.map(v => (v, null)).cogroup(other.map(v => (v, null)))
.filter { case (_, (leftGroup, rightGroup)) => leftGroup.nonEmpty && rightGroup.nonEmpty }
.keys
+ }
/**
* Return the intersection of this RDD and another one. The output will not contain any duplicate
@@ -437,10 +439,12 @@ abstract class RDD[T: ClassTag](
*
* @param partitioner Partitioner to use for the resulting RDD
*/
- def intersection(other: RDD[T], partitioner: Partitioner): RDD[T] =
+ def intersection(other: RDD[T], partitioner: Partitioner)(implicit ord: Ordering[T] = null)
+ : RDD[T] = {
this.map(v => (v, null)).cogroup(other.map(v => (v, null)), partitioner)
.filter { case (_, (leftGroup, rightGroup)) => leftGroup.nonEmpty && rightGroup.nonEmpty }
.keys
+ }
/**
* Return the intersection of this RDD and another one. The output will not contain any duplicate
@@ -450,10 +454,11 @@ abstract class RDD[T: ClassTag](
*
* @param numPartitions How many partitions to use in the resulting RDD
*/
- def intersection(other: RDD[T], numPartitions: Int): RDD[T] =
+ def intersection(other: RDD[T], numPartitions: Int): RDD[T] = {
this.map(v => (v, null)).cogroup(other.map(v => (v, null)), new HashPartitioner(numPartitions))
.filter { case (_, (leftGroup, rightGroup)) => leftGroup.nonEmpty && rightGroup.nonEmpty }
.keys
+ }
/**
* Return an RDD created by coalescing all elements within each partition into an array.
@@ -467,22 +472,25 @@ abstract class RDD[T: ClassTag](
def cartesian[U: ClassTag](other: RDD[U]): RDD[(T, U)] = new CartesianRDD(sc, this, other)
/**
- * Return an RDD of grouped items.
+ * Return an RDD of grouped items. Each group consists of a key and a sequence of elements
+ * mapping to that key.
*/
- def groupBy[K: ClassTag](f: T => K): RDD[(K, Iterable[T])] =
+ def groupBy[K](f: T => K)(implicit kt: ClassTag[K]): RDD[(K, Iterable[T])] =
groupBy[K](f, defaultPartitioner(this))
/**
* Return an RDD of grouped elements. Each group consists of a key and a sequence of elements
* mapping to that key.
*/
- def groupBy[K: ClassTag](f: T => K, numPartitions: Int): RDD[(K, Iterable[T])] =
+ def groupBy[K](f: T => K, numPartitions: Int)(implicit kt: ClassTag[K]): RDD[(K, Iterable[T])] =
groupBy(f, new HashPartitioner(numPartitions))
/**
- * Return an RDD of grouped items.
+ * Return an RDD of grouped items. Each group consists of a key and a sequence of elements
+ * mapping to that key.
*/
- def groupBy[K: ClassTag](f: T => K, p: Partitioner): RDD[(K, Iterable[T])] = {
+ def groupBy[K](f: T => K, p: Partitioner)(implicit kt: ClassTag[K], ord: Ordering[K] = null)
+ : RDD[(K, Iterable[T])] = {
val cleanF = sc.clean(f)
this.map(t => (cleanF(t), t)).groupByKey(p)
}
@@ -739,7 +747,7 @@ abstract class RDD[T: ClassTag](
/**
* Return an RDD with the elements from `this` that are not in `other`.
*/
- def subtract(other: RDD[T], p: Partitioner): RDD[T] = {
+ def subtract(other: RDD[T], p: Partitioner)(implicit ord: Ordering[T] = null): RDD[T] = {
if (partitioner == Some(p)) {
// Our partitioner knows how to handle T (which, since we have a partitioner, is
// really (K, V)) so make a new Partitioner that will de-tuple our fake tuples
@@ -847,7 +855,7 @@ abstract class RDD[T: ClassTag](
* Return the count of each unique value in this RDD as a map of (value, count) pairs. The final
* combine step happens locally on the master, equivalent to running a single reduce task.
*/
- def countByValue(): Map[T, Long] = {
+ def countByValue()(implicit ord: Ordering[T] = null): Map[T, Long] = {
if (elementClassTag.runtimeClass.isArray) {
throw new SparkException("countByValue() does not support arrays")
}
@@ -877,10 +885,10 @@ abstract class RDD[T: ClassTag](
* Approximate version of countByValue().
*/
@Experimental
- def countByValueApprox(
- timeout: Long,
- confidence: Double = 0.95
- ): PartialResult[Map[T, BoundedDouble]] = {
+ def countByValueApprox(timeout: Long, confidence: Double = 0.95)
+ (implicit ord: Ordering[T] = null)
+ : PartialResult[Map[T, BoundedDouble]] =
+ {
if (elementClassTag.runtimeClass.isArray) {
throw new SparkException("countByValueApprox() does not support arrays")
}
@@ -1030,13 +1038,13 @@ abstract class RDD[T: ClassTag](
* Returns the max of this RDD as defined by the implicit Ordering[T].
* @return the maximum element of the RDD
* */
- def max()(implicit ord: Ordering[T]):T = this.reduce(ord.max)
+ def max()(implicit ord: Ordering[T]): T = this.reduce(ord.max)
/**
* Returns the min of this RDD as defined by the implicit Ordering[T].
* @return the minimum element of the RDD
* */
- def min()(implicit ord: Ordering[T]):T = this.reduce(ord.min)
+ def min()(implicit ord: Ordering[T]): T = this.reduce(ord.min)
/**
* Save this RDD as a text file, using string representations of elements.
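The RDD-side changes all follow the same forwarding pattern: a method that builds on PairRDDFunctions now declares its own `ord: Ordering[T] = null`, so an ordering available at the original call site survives the method boundary and reaches rddToPairRDDFunctions. A hedged sketch of that pattern against the public API (distinctKeys is a hypothetical helper that mirrors the distinct body above):

    import scala.reflect.ClassTag

    import org.apache.spark.SparkContext._
    import org.apache.spark.rdd.RDD

    object ForwardingSketch {
      // Re-declaring the defaulted implicit here is what keeps the caller's
      // Ordering[T] (or the null default) flowing into reduceByKey's conversion.
      def distinctKeys[T: ClassTag](rdd: RDD[T])(implicit ord: Ordering[T] = null): RDD[T] =
        rdd.map(x => (x, null)).reduceByKey((x, _) => x).map(_._1)
    }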
diff --git a/core/src/main/scala/org/apache/spark/rdd/SequenceFileRDDFunctions.scala b/core/src/main/scala/org/apache/spark/rdd/SequenceFileRDDFunctions.scala
index 7df9a2960d..9a1efc83cb 100644
--- a/core/src/main/scala/org/apache/spark/rdd/SequenceFileRDDFunctions.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/SequenceFileRDDFunctions.scala
@@ -68,8 +68,8 @@ class SequenceFileRDDFunctions[K <% Writable: ClassTag, V <% Writable : ClassTag
val keyClass = getWritableClass[K]
val valueClass = getWritableClass[V]
- val convertKey = !classOf[Writable].isAssignableFrom(self.getKeyClass)
- val convertValue = !classOf[Writable].isAssignableFrom(self.getValueClass)
+ val convertKey = !classOf[Writable].isAssignableFrom(self.keyClass)
+ val convertValue = !classOf[Writable].isAssignableFrom(self.valueClass)
logInfo("Saving as sequence file of type (" + keyClass.getSimpleName + "," +
valueClass.getSimpleName + ")" )
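For reference, the convertKey / convertValue check above is an ordinary Class-level test; a minimal standalone sketch (needsConversion is an illustrative name, not part of the patch):

    import org.apache.hadoop.io.{IntWritable, Writable}

    object WritableCheckSketch {
      // A key or value class needs conversion to a Writable only if it is not
      // already a Writable subtype -- the same test the hunk above performs.
      def needsConversion(cls: Class[_]): Boolean = !classOf[Writable].isAssignableFrom(cls)

      def main(args: Array[String]): Unit = {
        println(needsConversion(classOf[IntWritable]))       // false: already a Writable
        println(needsConversion(classOf[java.lang.Integer])) // true: must be converted before saving
      }
    }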
diff --git a/core/src/test/scala/org/apache/spark/ImplicitOrderingSuite.scala b/core/src/test/scala/org/apache/spark/ImplicitOrderingSuite.scala
new file mode 100644
index 0000000000..4bd8891356
--- /dev/null
+++ b/core/src/test/scala/org/apache/spark/ImplicitOrderingSuite.scala
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark
+
+import org.scalatest.FunSuite
+
+import org.apache.spark.SparkContext._
+
+class ImplicitOrderingSuite extends FunSuite with LocalSparkContext {
+ class NonOrderedClass {}
+
+ class ComparableClass extends Comparable[ComparableClass] {
+ override def compareTo(o: ComparableClass): Int = ???
+ }
+
+ class OrderedClass extends Ordered[OrderedClass] {
+ override def compare(o: OrderedClass): Int = ???
+ }
+
+ // Tests that PairRDDFunctions grabs an implicit Ordering in various cases where it should.
+ test("basic inference of Orderings"){
+ sc = new SparkContext("local", "test")
+ val rdd = sc.parallelize(1 to 10)
+
+ // Infer orderings after basic maps to particular types
+ assert(rdd.map(x => (x, x)).keyOrdering.isDefined)
+ assert(rdd.map(x => (1, x)).keyOrdering.isDefined)
+ assert(rdd.map(x => (x.toString, x)).keyOrdering.isDefined)
+ assert(rdd.map(x => (null, x)).keyOrdering.isDefined)
+ assert(rdd.map(x => (new NonOrderedClass, x)).keyOrdering.isEmpty)
+ assert(rdd.map(x => (new ComparableClass, x)).keyOrdering.isDefined)
+ assert(rdd.map(x => (new OrderedClass, x)).keyOrdering.isDefined)
+
+ // Infer orderings for other RDD methods
+ assert(rdd.groupBy(x => x).keyOrdering.isDefined)
+ assert(rdd.groupBy(x => new NonOrderedClass).keyOrdering.isEmpty)
+ assert(rdd.groupBy(x => new ComparableClass).keyOrdering.isDefined)
+ assert(rdd.groupBy(x => new OrderedClass).keyOrdering.isDefined)
+ assert(rdd.groupBy((x: Int) => x, 5).keyOrdering.isDefined)
+ assert(rdd.groupBy((x: Int) => x, new HashPartitioner(5)).keyOrdering.isDefined)
+ }
+}
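A note on why the ComparableClass and OrderedClass assertions pass: the standard library already derives an Ordering for any type that is Ordered[T] or Comparable[T] (scala.math.Ordering.ordered), so the implicit search triggered by rddToPairRDDFunctions succeeds for those keys with no Spark-specific machinery. A small standalone sketch of that derivation (class names are illustrative):

    object StdlibOrderingSketch {
      class Cmp extends Comparable[Cmp] { def compareTo(o: Cmp): Int = 0 }
      class Ord extends Ordered[Ord]    { def compare(o: Ord): Int = 0 }

      def main(args: Array[String]): Unit = {
        // Both resolve through Ordering.ordered; Ordered[T] extends Comparable[T],
        // so the two cases take the same path.
        println(implicitly[Ordering[Cmp]].compare(new Cmp, new Cmp)) // 0
        println(implicitly[Ordering[Ord]].compare(new Ord, new Ord)) // 0
      }
    }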