author     Tathagata Das <tathagata.das1565@gmail.com>   2012-10-27 18:55:50 -0700
committer  Tathagata Das <tathagata.das1565@gmail.com>   2012-10-27 18:55:50 -0700
commit     1b900183c8bb4063d8ae7bd5134fdadd52b3a155 (patch)
tree       8bdd9ab6f1b65917f7e26b8cd4679bc40ebe1951 /streaming
parent     650d717544eeb05e7632202c7004dfd0cb69d637 (diff)
Added save operations to DStreams.
Diffstat (limited to 'streaming')
-rw-r--r--  streaming/src/main/scala/spark/streaming/DStream.scala               | 16
-rw-r--r--  streaming/src/main/scala/spark/streaming/PairDStreamFunctions.scala  | 61
-rw-r--r--  streaming/src/main/scala/spark/streaming/StreamingContext.scala      | 10
3 files changed, 84 insertions(+), 3 deletions(-)
diff --git a/streaming/src/main/scala/spark/streaming/DStream.scala b/streaming/src/main/scala/spark/streaming/DStream.scala
index 12d7ba97ea..175ebf104f 100644
--- a/streaming/src/main/scala/spark/streaming/DStream.scala
+++ b/streaming/src/main/scala/spark/streaming/DStream.scala
@@ -363,6 +363,22 @@ extends Serializable with Logging {
rdds.toSeq
}
+ def saveAsObjectFiles(prefix: String, suffix: String = "") {
+ val saveFunc = (rdd: RDD[T], time: Time) => {
+ val file = rddToFileName(prefix, suffix, time)
+ rdd.saveAsObjectFile(file)
+ }
+ this.foreachRDD(saveFunc)
+ }
+
+ def saveAsTextFiles(prefix: String, suffix: String = "") {
+ val saveFunc = (rdd: RDD[T], time: Time) => {
+ val file = rddToFileName(prefix, suffix, time)
+ rdd.saveAsTextFile(file)
+ }
+ this.foreachRDD(saveFunc)
+ }
+
def register() {
ssc.registerOutputStream(this)
}
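Both new methods are thin wrappers around foreachRDD: on every batch, the current RDD is written to its own directory whose name is derived from the batch time. A minimal usage sketch follows; `ssc` and `lines` are placeholders for an existing StreamingContext and an input DStream[String], and are not part of this patch:

    // Sketch only: `ssc` is an existing StreamingContext, `lines` is some
    // DStream[String] built on it (how it is created is outside this commit).
    val lines: DStream[String] = createInputStream(ssc)   // hypothetical helper

    // One output directory per batch, named "<prefix>-<batch time in ms>.<suffix>"
    lines.saveAsTextFiles("logs", "txt")    // delegates to RDD.saveAsTextFile
    lines.saveAsObjectFiles("logs-obj")     // suffix defaults to ""; Java-serialized objects

    ssc.start()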
diff --git a/streaming/src/main/scala/spark/streaming/PairDStreamFunctions.scala b/streaming/src/main/scala/spark/streaming/PairDStreamFunctions.scala
index ce1f4ad0a0..f88247708b 100644
--- a/streaming/src/main/scala/spark/streaming/PairDStreamFunctions.scala
+++ b/streaming/src/main/scala/spark/streaming/PairDStreamFunctions.scala
@@ -1,9 +1,16 @@
package spark.streaming
-import scala.collection.mutable.ArrayBuffer
-import spark.{Manifests, RDD, Partitioner, HashPartitioner}
import spark.streaming.StreamingContext._
-import javax.annotation.Nullable
+
+import spark.{Manifests, RDD, Partitioner, HashPartitioner}
+import spark.SparkContext._
+
+import scala.collection.mutable.ArrayBuffer
+
+import org.apache.hadoop.mapred.{JobConf, OutputFormat}
+import org.apache.hadoop.mapreduce.{OutputFormat => NewOutputFormat}
+import org.apache.hadoop.mapred.OutputFormat
+import org.apache.hadoop.conf.Configuration
class PairDStreamFunctions[K: ClassManifest, V: ClassManifest](self: DStream[(K,V)])
extends Serializable {
@@ -231,6 +238,54 @@ extends Serializable {
for (v <- vs.iterator; w <- ws.iterator) yield (v, w)
}
}
+
+ def saveAsHadoopFiles[F <: OutputFormat[K, V]](
+ prefix: String,
+ suffix: String
+ )(implicit fm: ClassManifest[F]) {
+ saveAsHadoopFiles(prefix, suffix, getKeyClass, getValueClass, fm.erasure.asInstanceOf[Class[F]])
+ }
+
+ def saveAsHadoopFiles(
+ prefix: String,
+ suffix: String,
+ keyClass: Class[_],
+ valueClass: Class[_],
+ outputFormatClass: Class[_ <: OutputFormat[_, _]],
+ conf: JobConf = new JobConf
+ ) {
+ val saveFunc = (rdd: RDD[(K, V)], time: Time) => {
+ val file = rddToFileName(prefix, suffix, time)
+ rdd.saveAsHadoopFile(file, keyClass, valueClass, outputFormatClass, conf)
+ }
+ self.foreachRDD(saveFunc)
+ }
+
+ def saveAsNewAPIHadoopFiles[F <: NewOutputFormat[K, V]](
+ prefix: String,
+ suffix: String
+ )(implicit fm: ClassManifest[F]) {
+ saveAsNewAPIHadoopFiles(prefix, suffix, getKeyClass, getValueClass, fm.erasure.asInstanceOf[Class[F]])
+ }
+
+ def saveAsNewAPIHadoopFiles(
+ prefix: String,
+ suffix: String,
+ keyClass: Class[_],
+ valueClass: Class[_],
+ outputFormatClass: Class[_ <: NewOutputFormat[_, _]],
+ conf: Configuration = new Configuration
+ ) {
+ val saveFunc = (rdd: RDD[(K, V)], time: Time) => {
+ val file = rddToFileName(prefix, suffix, time)
+ rdd.saveAsNewAPIHadoopFile(file, keyClass, valueClass, outputFormatClass, conf)
+ }
+ self.foreachRDD(saveFunc)
+ }
+
+ private def getKeyClass() = implicitly[ClassManifest[K]].erasure
+
+ private def getValueClass() = implicitly[ClassManifest[V]].erasure
}
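The pair-stream variants follow the same foreachRDD pattern but delegate to RDD.saveAsHadoopFile and RDD.saveAsNewAPIHadoopFile, so any old-API (mapred) or new-API (mapreduce) OutputFormat can be plugged in. A hedged sketch using Hadoop's TextOutputFormat; the `counts` stream is a placeholder:

    import org.apache.hadoop.mapred.TextOutputFormat
    import org.apache.hadoop.mapreduce.lib.output.{TextOutputFormat => NewTextOutputFormat}

    // Placeholder pair stream, e.g. the result of a reduceByKey elsewhere.
    val counts: DStream[(String, Int)] = createPairStream(ssc)   // hypothetical helper

    // Old ("mapred") API; key and value classes are taken from the ClassManifests.
    counts.saveAsHadoopFiles[TextOutputFormat[String, Int]]("counts", "txt")

    // New ("mapreduce") API; a custom Configuration can be passed to the explicit overload.
    counts.saveAsNewAPIHadoopFiles[NewTextOutputFormat[String, Int]]("counts-new", "txt")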
diff --git a/streaming/src/main/scala/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/spark/streaming/StreamingContext.scala
index 7c7b3afe47..b3148eaa97 100644
--- a/streaming/src/main/scala/spark/streaming/StreamingContext.scala
+++ b/streaming/src/main/scala/spark/streaming/StreamingContext.scala
@@ -225,5 +225,15 @@ object StreamingContext {
implicit def toPairDStreamFunctions[K: ClassManifest, V: ClassManifest](stream: DStream[(K,V)]) = {
new PairDStreamFunctions[K, V](stream)
}
+
+ def rddToFileName[T](prefix: String, suffix: String, time: Time): String = {
+ if (prefix == null) {
+ time.millis.toString
+ } else if (suffix == null || suffix.length ==0) {
+ prefix + "-" + time.milliseconds
+ } else {
+ prefix + "-" + time.milliseconds + "." + suffix
+ }
+ }
}
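rddToFileName encodes the naming convention shared by all of the save methods above: the batch time in milliseconds, optionally bracketed by a prefix and a suffix. Illustrative calls, assuming Time wraps epoch milliseconds as elsewhere in this codebase (the timestamp itself is made up):

    val t = Time(1351360800000L)
    StreamingContext.rddToFileName("counts", "txt", t)   // "counts-1351360800000.txt"
    StreamingContext.rddToFileName("counts", "", t)      // "counts-1351360800000"
    StreamingContext.rddToFileName(null, null, t)        // "1351360800000"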