aboutsummaryrefslogtreecommitdiff
path: root/streaming
diff options
context:
space:
mode:
authorTathagata Das <tathagata.das1565@gmail.com>2012-12-01 08:26:10 -0800
committerTathagata Das <tathagata.das1565@gmail.com>2012-12-01 08:26:10 -0800
commit62965c5d8e3f4f0246ac2c8814ac75ea82b3f238 (patch)
tree6b9a951457b2aa5c07228ee8153bcde1f40b3be5 /streaming
parent6fcd09f499dca66d255aa7196839156433aae442 (diff)
downloadspark-62965c5d8e3f4f0246ac2c8814ac75ea82b3f238.tar.gz
spark-62965c5d8e3f4f0246ac2c8814ac75ea82b3f238.tar.bz2
spark-62965c5d8e3f4f0246ac2c8814ac75ea82b3f238.zip
Added ssc.union
Diffstat (limited to 'streaming')
-rw-r--r--streaming/src/main/scala/spark/streaming/ReducedWindowedDStream.scala3
-rw-r--r--streaming/src/main/scala/spark/streaming/StreamingContext.scala4
2 files changed, 6 insertions, 1 deletions
diff --git a/streaming/src/main/scala/spark/streaming/ReducedWindowedDStream.scala b/streaming/src/main/scala/spark/streaming/ReducedWindowedDStream.scala
index 8b484e6acf..bb852cbcca 100644
--- a/streaming/src/main/scala/spark/streaming/ReducedWindowedDStream.scala
+++ b/streaming/src/main/scala/spark/streaming/ReducedWindowedDStream.scala
@@ -118,7 +118,8 @@ class ReducedWindowedDStream[K: ClassManifest, V: ClassManifest](
if (seqOfValues(0).isEmpty) {
// If previous window's reduce value does not exist, then at least new values should exist
if (newValues.isEmpty) {
- throw new Exception("Neither previous window has value for key, nor new values found")
+ val info = "seqOfValues =\n" + seqOfValues.map(x => "[" + x.mkString(",") + "]").mkString("\n")
+ throw new Exception("Neither previous window has value for key, nor new values found\n" + info)
}
// Reduce the new values
newValues.reduce(reduceF) // return
diff --git a/streaming/src/main/scala/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/spark/streaming/StreamingContext.scala
index 90dd560752..63d8766749 100644
--- a/streaming/src/main/scala/spark/streaming/StreamingContext.scala
+++ b/streaming/src/main/scala/spark/streaming/StreamingContext.scala
@@ -189,6 +189,10 @@ class StreamingContext private (
inputStream
}
+ def union[T: ClassManifest](streams: Seq[DStream[T]]): DStream[T] = {
+ new UnionDStream[T](streams.toArray)
+ }
+
/**
* This function registers a InputDStream as an input stream that will be
* started (InputDStream.start() called) to get the input data streams.