diff options
author | Tathagata Das <tathagata.das1565@gmail.com> | 2012-12-01 08:26:10 -0800 |
---|---|---|
committer | Tathagata Das <tathagata.das1565@gmail.com> | 2012-12-01 08:26:10 -0800 |
commit | 62965c5d8e3f4f0246ac2c8814ac75ea82b3f238 (patch) | |
tree | 6b9a951457b2aa5c07228ee8153bcde1f40b3be5 /streaming | |
parent | 6fcd09f499dca66d255aa7196839156433aae442 (diff) | |
download | spark-62965c5d8e3f4f0246ac2c8814ac75ea82b3f238.tar.gz spark-62965c5d8e3f4f0246ac2c8814ac75ea82b3f238.tar.bz2 spark-62965c5d8e3f4f0246ac2c8814ac75ea82b3f238.zip |
Added ssc.union
Diffstat (limited to 'streaming')
-rw-r--r-- | streaming/src/main/scala/spark/streaming/ReducedWindowedDStream.scala | 3 | ||||
-rw-r--r-- | streaming/src/main/scala/spark/streaming/StreamingContext.scala | 4 |
2 files changed, 6 insertions, 1 deletions
diff --git a/streaming/src/main/scala/spark/streaming/ReducedWindowedDStream.scala b/streaming/src/main/scala/spark/streaming/ReducedWindowedDStream.scala index 8b484e6acf..bb852cbcca 100644 --- a/streaming/src/main/scala/spark/streaming/ReducedWindowedDStream.scala +++ b/streaming/src/main/scala/spark/streaming/ReducedWindowedDStream.scala @@ -118,7 +118,8 @@ class ReducedWindowedDStream[K: ClassManifest, V: ClassManifest]( if (seqOfValues(0).isEmpty) { // If previous window's reduce value does not exist, then at least new values should exist if (newValues.isEmpty) { - throw new Exception("Neither previous window has value for key, nor new values found") + val info = "seqOfValues =\n" + seqOfValues.map(x => "[" + x.mkString(",") + "]").mkString("\n") + throw new Exception("Neither previous window has value for key, nor new values found\n" + info) } // Reduce the new values newValues.reduce(reduceF) // return diff --git a/streaming/src/main/scala/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/spark/streaming/StreamingContext.scala index 90dd560752..63d8766749 100644 --- a/streaming/src/main/scala/spark/streaming/StreamingContext.scala +++ b/streaming/src/main/scala/spark/streaming/StreamingContext.scala @@ -189,6 +189,10 @@ class StreamingContext private ( inputStream } + def union[T: ClassManifest](streams: Seq[DStream[T]]): DStream[T] = { + new UnionDStream[T](streams.toArray) + } + /** * This function registers a InputDStream as an input stream that will be * started (InputDStream.start() called) to get the input data streams. |