diff options
author | Tathagata Das <tathagata.das1565@gmail.com> | 2013-11-21 11:28:37 -0800 |
---|---|---|
committer | Tathagata Das <tathagata.das1565@gmail.com> | 2013-11-21 11:28:37 -0800 |
commit | fd031679df59b83ae0a735ea77c49623f6e257c4 (patch) | |
tree | e33b6286dab6adc88506397c1107aaec5b11d561 /streaming | |
parent | 2ec4b2e38d432ef4f21b725c2fceac863d5f9ea1 (diff) | |
download | spark-fd031679df59b83ae0a735ea77c49623f6e257c4.tar.gz spark-fd031679df59b83ae0a735ea77c49623f6e257c4.tar.bz2 spark-fd031679df59b83ae0a735ea77c49623f6e257c4.zip |
Added partitioner aware union, modified DStream.window.
Diffstat (limited to 'streaming')
-rw-r--r-- | streaming/src/main/scala/org/apache/spark/streaming/dstream/WindowedDStream.scala | 41 |
1 file changed, 2 insertions(+), 39 deletions(-)
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/WindowedDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/WindowedDStream.scala index 03f522e581..49f84310bc 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/WindowedDStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/WindowedDStream.scala @@ -17,8 +17,7 @@ package org.apache.spark.streaming.dstream -import org.apache.spark.rdd.RDD -import org.apache.spark.rdd.UnionRDD +import org.apache.spark.rdd.{PartitionerAwareUnionRDD, RDD, UnionRDD} import org.apache.spark.storage.StorageLevel import org.apache.spark.streaming._ import org.apache.spark._ @@ -57,7 +56,7 @@ class WindowedDStream[T: ClassManifest]( val rddsInWindow = parent.slice(currentWindow) val windowRDD = if (rddsInWindow.flatMap(_.partitioner).distinct.length == 1) { logInfo("Using partition aware union") - new PartitionAwareUnionRDD(ssc.sc, rddsInWindow) + new PartitionerAwareUnionRDD(ssc.sc, rddsInWindow) } else { logInfo("Using normal union") new UnionRDD(ssc.sc,rddsInWindow) @@ -66,39 +65,3 @@ class WindowedDStream[T: ClassManifest]( } } -private[streaming] -class PartitionAwareUnionRDDPartition(val idx: Int, val partitions: Array[Partition]) - extends Partition { - override val index = idx - override def hashCode(): Int = idx -} - -private[streaming] -class PartitionAwareUnionRDD[T: ClassManifest]( - sc: SparkContext, - var rdds: Seq[RDD[T]]) - extends RDD[T](sc, rdds.map(x => new OneToOneDependency(x))) { - require(rdds.length > 0) - require(rdds.flatMap(_.partitioner).distinct.length == 1, "Parent RDDs have different partitioners") - - override val partitioner = rdds.head.partitioner - - override def getPartitions: Array[Partition] = { - val numPartitions = rdds.head.partitions.length - (0 until numPartitions).map(index => { - val parentPartitions = rdds.map(_.partitions(index)).toArray - new PartitionAwareUnionRDDPartition(index, parentPartitions) - 
}).toArray - } - - override def compute(s: Partition, context: TaskContext): Iterator[T] = { - val parentPartitions = s.asInstanceOf[PartitionAwareUnionRDDPartition].partitions - rdds.zip(parentPartitions).iterator.flatMap { - case (rdd, p) => rdd.iterator(p, context) - } - } -} - - - - |