aboutsummaryrefslogtreecommitdiff
path: root/streaming
diff options
context:
space:
mode:
authorTathagata Das <tathagata.das1565@gmail.com>2013-11-21 11:28:37 -0800
committerTathagata Das <tathagata.das1565@gmail.com>2013-11-21 11:28:37 -0800
commitfd031679df59b83ae0a735ea77c49623f6e257c4 (patch)
treee33b6286dab6adc88506397c1107aaec5b11d561 /streaming
parent2ec4b2e38d432ef4f21b725c2fceac863d5f9ea1 (diff)
downloadspark-fd031679df59b83ae0a735ea77c49623f6e257c4.tar.gz
spark-fd031679df59b83ae0a735ea77c49623f6e257c4.tar.bz2
spark-fd031679df59b83ae0a735ea77c49623f6e257c4.zip
Added partitioner aware union, modified DStream.window.
Diffstat (limited to 'streaming')
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/dstream/WindowedDStream.scala | 41
1 file changed, 2 insertions, 39 deletions
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/WindowedDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/WindowedDStream.scala
index 03f522e581..49f84310bc 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/WindowedDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/WindowedDStream.scala
@@ -17,8 +17,7 @@
package org.apache.spark.streaming.dstream
-import org.apache.spark.rdd.RDD
-import org.apache.spark.rdd.UnionRDD
+import org.apache.spark.rdd.{PartitionerAwareUnionRDD, RDD, UnionRDD}
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming._
import org.apache.spark._
@@ -57,7 +56,7 @@ class WindowedDStream[T: ClassManifest](
val rddsInWindow = parent.slice(currentWindow)
val windowRDD = if (rddsInWindow.flatMap(_.partitioner).distinct.length == 1) {
logInfo("Using partition aware union")
- new PartitionAwareUnionRDD(ssc.sc, rddsInWindow)
+ new PartitionerAwareUnionRDD(ssc.sc, rddsInWindow)
} else {
logInfo("Using normal union")
new UnionRDD(ssc.sc,rddsInWindow)
@@ -66,39 +65,3 @@ class WindowedDStream[T: ClassManifest](
}
}
-private[streaming]
-class PartitionAwareUnionRDDPartition(val idx: Int, val partitions: Array[Partition])
- extends Partition {
- override val index = idx
- override def hashCode(): Int = idx
-}
-
-private[streaming]
-class PartitionAwareUnionRDD[T: ClassManifest](
- sc: SparkContext,
- var rdds: Seq[RDD[T]])
- extends RDD[T](sc, rdds.map(x => new OneToOneDependency(x))) {
- require(rdds.length > 0)
- require(rdds.flatMap(_.partitioner).distinct.length == 1, "Parent RDDs have different partitioners")
-
- override val partitioner = rdds.head.partitioner
-
- override def getPartitions: Array[Partition] = {
- val numPartitions = rdds.head.partitions.length
- (0 until numPartitions).map(index => {
- val parentPartitions = rdds.map(_.partitions(index)).toArray
- new PartitionAwareUnionRDDPartition(index, parentPartitions)
- }).toArray
- }
-
- override def compute(s: Partition, context: TaskContext): Iterator[T] = {
- val parentPartitions = s.asInstanceOf[PartitionAwareUnionRDDPartition].partitions
- rdds.zip(parentPartitions).iterator.flatMap {
- case (rdd, p) => rdd.iterator(p, context)
- }
- }
-}
-
-
-
-