From 1b900183c8bb4063d8ae7bd5134fdadd52b3a155 Mon Sep 17 00:00:00 2001
From: Tathagata Das
Date: Sat, 27 Oct 2012 18:55:50 -0700
Subject: Added save operations to DStreams.

---
 .../src/main/scala/spark/streaming/DStream.scala   | 16 ++++++
 .../spark/streaming/PairDStreamFunctions.scala     | 61 ++++++++++++++++++++--
 .../scala/spark/streaming/StreamingContext.scala   | 10 ++++
 3 files changed, 84 insertions(+), 3 deletions(-)

(limited to 'streaming')

diff --git a/streaming/src/main/scala/spark/streaming/DStream.scala b/streaming/src/main/scala/spark/streaming/DStream.scala
index 12d7ba97ea..175ebf104f 100644
--- a/streaming/src/main/scala/spark/streaming/DStream.scala
+++ b/streaming/src/main/scala/spark/streaming/DStream.scala
@@ -363,6 +363,22 @@ extends Serializable with Logging {
     rdds.toSeq
   }
 
+  def saveAsObjectFiles(prefix: String, suffix: String = "") {
+    val saveFunc = (rdd: RDD[T], time: Time) => {
+      val file = rddToFileName(prefix, suffix, time)
+      rdd.saveAsObjectFile(file)
+    }
+    this.foreachRDD(saveFunc)
+  }
+
+  def saveAsTextFiles(prefix: String, suffix: String = "") {
+    val saveFunc = (rdd: RDD[T], time: Time) => {
+      val file = rddToFileName(prefix, suffix, time)
+      rdd.saveAsTextFile(file)
+    }
+    this.foreachRDD(saveFunc)
+  }
+
   def register() {
     ssc.registerOutputStream(this)
   }
diff --git a/streaming/src/main/scala/spark/streaming/PairDStreamFunctions.scala b/streaming/src/main/scala/spark/streaming/PairDStreamFunctions.scala
index ce1f4ad0a0..f88247708b 100644
--- a/streaming/src/main/scala/spark/streaming/PairDStreamFunctions.scala
+++ b/streaming/src/main/scala/spark/streaming/PairDStreamFunctions.scala
@@ -1,9 +1,16 @@
 package spark.streaming
 
-import scala.collection.mutable.ArrayBuffer
-import spark.{Manifests, RDD, Partitioner, HashPartitioner}
 import spark.streaming.StreamingContext._
-import javax.annotation.Nullable
+
+import spark.{Manifests, RDD, Partitioner, HashPartitioner}
+import spark.SparkContext._
+
+import scala.collection.mutable.ArrayBuffer
+
+import org.apache.hadoop.mapred.{JobConf, OutputFormat}
+import org.apache.hadoop.mapreduce.{OutputFormat => NewOutputFormat}
+import org.apache.hadoop.mapred.OutputFormat
+import org.apache.hadoop.conf.Configuration
 
 class PairDStreamFunctions[K: ClassManifest, V: ClassManifest](self: DStream[(K,V)])
 extends Serializable {
@@ -231,6 +238,54 @@ extends Serializable {
       for (v <- vs.iterator; w <- ws.iterator) yield (v, w)
     }
   }
+
+  def saveAsHadoopFiles[F <: OutputFormat[K, V]](
+      prefix: String,
+      suffix: String
+    )(implicit fm: ClassManifest[F]) {
+    saveAsHadoopFiles(prefix, suffix, getKeyClass, getValueClass, fm.erasure.asInstanceOf[Class[F]])
+  }
+
+  def saveAsHadoopFiles(
+      prefix: String,
+      suffix: String,
+      keyClass: Class[_],
+      valueClass: Class[_],
+      outputFormatClass: Class[_ <: OutputFormat[_, _]],
+      conf: JobConf = new JobConf
+    ) {
+    val saveFunc = (rdd: RDD[(K, V)], time: Time) => {
+      val file = rddToFileName(prefix, suffix, time)
+      rdd.saveAsHadoopFile(file, keyClass, valueClass, outputFormatClass, conf)
+    }
+    self.foreachRDD(saveFunc)
+  }
+
+  def saveAsNewAPIHadoopFiles[F <: NewOutputFormat[K, V]](
+      prefix: String,
+      suffix: String
+    )(implicit fm: ClassManifest[F]) {
+    saveAsNewAPIHadoopFiles(prefix, suffix, getKeyClass, getValueClass, fm.erasure.asInstanceOf[Class[F]])
+  }
+
+  def saveAsNewAPIHadoopFiles(
+      prefix: String,
+      suffix: String,
+      keyClass: Class[_],
+      valueClass: Class[_],
+      outputFormatClass: Class[_ <: NewOutputFormat[_, _]],
+      conf: Configuration = new Configuration
+    ) {
+    val saveFunc = (rdd: RDD[(K, V)], time: Time) => {
+      val file = rddToFileName(prefix, suffix, time)
+      rdd.saveAsNewAPIHadoopFile(file, keyClass, valueClass, outputFormatClass, conf)
+    }
+    self.foreachRDD(saveFunc)
+  }
+
+  private def getKeyClass() = implicitly[ClassManifest[K]].erasure
+
+  private def getValueClass() = implicitly[ClassManifest[V]].erasure
 }
diff --git a/streaming/src/main/scala/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/spark/streaming/StreamingContext.scala
index 7c7b3afe47..b3148eaa97 100644
--- a/streaming/src/main/scala/spark/streaming/StreamingContext.scala
+++ b/streaming/src/main/scala/spark/streaming/StreamingContext.scala
@@ -225,5 +225,15 @@ object StreamingContext {
 
   implicit def toPairDStreamFunctions[K: ClassManifest, V: ClassManifest](stream: DStream[(K,V)]) = {
     new PairDStreamFunctions[K, V](stream)
   }
+
+  def rddToFileName[T](prefix: String, suffix: String, time: Time): String = {
+    if (prefix == null) {
+      time.millis.toString
+    } else if (suffix == null || suffix.length == 0) {
+      prefix + "-" + time.milliseconds
+    } else {
+      prefix + "-" + time.milliseconds + "." + suffix
+    }
+  }
 }
--
cgit v1.2.3
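
A minimal usage sketch of the operations this patch adds (illustrative only, not
code from the commit): the DStream name `wordCounts`, the helper `saveCounts`,
the HDFS paths, and the Text/IntWritable conversion are all assumptions made for
the example.

    import spark.streaming.DStream
    import spark.streaming.StreamingContext._        // implicit toPairDStreamFunctions
    import org.apache.hadoop.io.{IntWritable, Text}
    import org.apache.hadoop.mapred.TextOutputFormat

    // Assume wordCounts: DStream[(String, Int)] was built by upstream transformations.
    def saveCounts(wordCounts: DStream[(String, Int)]) {
      // Writes one output directory per batch, named counts-<batch time in ms>.txt
      // (see rddToFileName above).
      wordCounts.saveAsTextFiles("hdfs://namenode:9000/out/counts", "txt")

      // Old-API Hadoop output format; the key and value classes are inferred from
      // the pair DStream's ClassManifests via getKeyClass/getValueClass.
      wordCounts
        .map { case (word, count) => (new Text(word), new IntWritable(count)) }
        .saveAsHadoopFiles[TextOutputFormat[Text, IntWritable]](
          "hdfs://namenode:9000/out/counts-hadoop", "txt")
    }

Per rddToFileName, a null prefix falls back to the batch time in milliseconds as
the file name, and an empty suffix drops the trailing extension.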