aboutsummaryrefslogtreecommitdiff
path: root/streaming
diff options
context:
space:
mode:
authorMatei Zaharia <matei@databricks.com>2014-04-23 17:03:54 -0700
committerReynold Xin <rxin@apache.org>2014-04-23 17:03:54 -0700
commit640f9a0efefd42cff86aecd4878a3a57f5ae85fa (patch)
tree0579814373ca50a06ca5f9c630065e9c4efee5e5 /streaming
parent432201c7ee9e1ea1d70a6418cbad1c5ad2653ed3 (diff)
downloadspark-640f9a0efefd42cff86aecd4878a3a57f5ae85fa.tar.gz
spark-640f9a0efefd42cff86aecd4878a3a57f5ae85fa.tar.bz2
spark-640f9a0efefd42cff86aecd4878a3a57f5ae85fa.zip
[SPARK-1540] Add an optional Ordering parameter to PairRDDFunctions.
In https://issues.apache.org/jira/browse/SPARK-1540 we'd like to look at Spark's API to see if we can take advantage of Comparable keys in more places, which will make external spilling more efficient. This PR is a first step towards that that shows how to pass an Ordering when available and still continue functioning otherwise. It does this using a new implicit parameter with a default value of null. The API is currently only in Scala -- in Java we'd have to add new versions of mapToPair and such that take a Comparator, or a new method to add a "type hint" to an RDD. We can address those later though. Unfortunately requiring all keys to be Comparable would not work without requiring RDDs in general to contain only Comparable types. The reason is that methods such as distinct() and intersection() do a shuffle, but should be usable on RDDs of any type. So ordering will have to remain an optimization for the types that can be ordered. I think this isn't a horrible outcome though because one of the nice things about Spark's API is that it works on objects of *any* type, without requiring you to specify a schema or implement Writable or stuff like that. Author: Matei Zaharia <matei@databricks.com> This patch had conflicts when merged, resolved by Committer: Reynold Xin <rxin@apache.org> Closes #487 from mateiz/ordered-keys and squashes the following commits: bd565f6 [Matei Zaharia] Pass an Ordering to only one version of groupBy because the Scala language spec doesn't allow having an optional parameter on all of them (this was only compiling in Scala 2.10 due to a bug). 4629965 [Matei Zaharia] Add tests for other versions of groupBy 3beae85 [Matei Zaharia] Added a test for implicit orderings 80b7a3b [Matei Zaharia] Add an optional Ordering parameter to PairRDDFunctions.
Diffstat (limited to 'streaming')
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala5
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala10
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala18
3 files changed, 19 insertions, 14 deletions
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
index 9ba6e02229..1c89543058 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
@@ -495,7 +495,10 @@ class StreamingContext private[streaming] (
object StreamingContext extends Logging {
- implicit def toPairDStreamFunctions[K: ClassTag, V: ClassTag](stream: DStream[(K,V)]) = {
+ private[streaming] val DEFAULT_CLEANER_TTL = 3600
+
+ implicit def toPairDStreamFunctions[K, V](stream: DStream[(K, V)])
+ (implicit kt: ClassTag[K], vt: ClassTag[V], ord: Ordering[K] = null) = {
new PairDStreamFunctions[K, V](stream)
}
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
index a7e5215437..d393cc03cb 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
@@ -488,7 +488,8 @@ abstract class DStream[T: ClassTag] (
* the RDDs with `numPartitions` partitions (Spark's default number of partitions if
* `numPartitions` not specified).
*/
- def countByValue(numPartitions: Int = ssc.sc.defaultParallelism): DStream[(T, Long)] =
+ def countByValue(numPartitions: Int = ssc.sc.defaultParallelism)(implicit ord: Ordering[T] = null)
+ : DStream[(T, Long)] =
this.map(x => (x, 1L)).reduceByKey((x: Long, y: Long) => x + y, numPartitions)
/**
@@ -686,9 +687,10 @@ abstract class DStream[T: ClassTag] (
def countByValueAndWindow(
windowDuration: Duration,
slideDuration: Duration,
- numPartitions: Int = ssc.sc.defaultParallelism
- ): DStream[(T, Long)] = {
-
+ numPartitions: Int = ssc.sc.defaultParallelism)
+ (implicit ord: Ordering[T] = null)
+ : DStream[(T, Long)] =
+ {
this.map(x => (x, 1L)).reduceByKeyAndWindow(
(x: Long, y: Long) => x + y,
(x: Long, y: Long) => x - y,
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala
index 354bc132dc..826bf39e86 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala
@@ -37,13 +37,13 @@ import org.apache.spark.streaming.{Time, Duration}
* Import `org.apache.spark.streaming.StreamingContext._` at the top of your program to use
* these functions.
*/
-class PairDStreamFunctions[K: ClassTag, V: ClassTag](self: DStream[(K,V)])
- extends Serializable {
-
+class PairDStreamFunctions[K, V](self: DStream[(K,V)])
+ (implicit kt: ClassTag[K], vt: ClassTag[V], ord: Ordering[K])
+ extends Serializable
+{
private[streaming] def ssc = self.ssc
- private[streaming] def defaultPartitioner(numPartitions: Int = self.ssc.sc.defaultParallelism)
- = {
+ private[streaming] def defaultPartitioner(numPartitions: Int = self.ssc.sc.defaultParallelism) = {
new HashPartitioner(numPartitions)
}
@@ -576,7 +576,7 @@ class PairDStreamFunctions[K: ClassTag, V: ClassTag](self: DStream[(K,V)])
prefix: String,
suffix: String
)(implicit fm: ClassTag[F]) {
- saveAsHadoopFiles(prefix, suffix, getKeyClass, getValueClass,
+ saveAsHadoopFiles(prefix, suffix, keyClass, valueClass,
fm.runtimeClass.asInstanceOf[Class[F]])
}
@@ -607,7 +607,7 @@ class PairDStreamFunctions[K: ClassTag, V: ClassTag](self: DStream[(K,V)])
prefix: String,
suffix: String
)(implicit fm: ClassTag[F]) {
- saveAsNewAPIHadoopFiles(prefix, suffix, getKeyClass, getValueClass,
+ saveAsNewAPIHadoopFiles(prefix, suffix, keyClass, valueClass,
fm.runtimeClass.asInstanceOf[Class[F]])
}
@@ -630,7 +630,7 @@ class PairDStreamFunctions[K: ClassTag, V: ClassTag](self: DStream[(K,V)])
self.foreachRDD(saveFunc)
}
- private def getKeyClass() = implicitly[ClassTag[K]].runtimeClass
+ private def keyClass: Class[_] = kt.runtimeClass
- private def getValueClass() = implicitly[ClassTag[V]].runtimeClass
+ private def valueClass: Class[_] = vt.runtimeClass
}