author     Patrick Wendell <pwendell@gmail.com>    2013-01-10 19:29:22 -0800
committer  Patrick Wendell <pwendell@gmail.com>    2013-01-14 09:42:36 -0800
commit     b36c4f7cce53446753ecc0ce6f9bdccb12b3350b (patch)
tree       c78583baa16eb0920b5c80b4e680dd4095e1f819 /streaming/src
parent     5004eec37c01db3b96d665b0d9606002af209eda (diff)
download   spark-b36c4f7cce53446753ecc0ce6f9bdccb12b3350b.tar.gz
           spark-b36c4f7cce53446753ecc0ce6f9bdccb12b3350b.tar.bz2
           spark-b36c4f7cce53446753ecc0ce6f9bdccb12b3350b.zip
More work on StreamingContext
Diffstat (limited to 'streaming/src')
-rw-r--r--  streaming/src/main/scala/spark/streaming/api/java/JavaStreamingContext.scala | 47
1 file changed, 45 insertions(+), 2 deletions(-)
diff --git a/streaming/src/main/scala/spark/streaming/api/java/JavaStreamingContext.scala b/streaming/src/main/scala/spark/streaming/api/java/JavaStreamingContext.scala
index 37ce037d5c..e8cd03847a 100644
--- a/streaming/src/main/scala/spark/streaming/api/java/JavaStreamingContext.scala
+++ b/streaming/src/main/scala/spark/streaming/api/java/JavaStreamingContext.scala
@@ -9,6 +9,7 @@ import spark.storage.StorageLevel
import spark.api.java.function.{Function => JFunction, Function2 => JFunction2}
import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat}
import java.io.InputStream
+import java.util.{Map => JMap}
class JavaStreamingContext(val ssc: StreamingContext) {
def this(master: String, frameworkName: String, batchDuration: Time) =
@@ -17,10 +18,31 @@ class JavaStreamingContext(val ssc: StreamingContext) {
// TODOs:
// - Test StreamingContext functions
// - Test to/from Hadoop functions
- // - Add checkpoint()/remember()
- // - Support creating your own streams
+ // - Support registering InputStreams
// - Add Kafka Stream
+
+ /**
+ * Create an input stream that pulls messages from a Kafka broker.
+ * @param hostname ZooKeeper hostname.
+ * @param port ZooKeeper port.
+ * @param groupId The group id for this consumer.
+ * @param topics Map of (topic_name -> numPartitions) to consume. Each partition is consumed
+ * in its own thread.
+ */
+ def kafkaStream[T](
+ hostname: String,
+ port: Int,
+ groupId: String,
+ topics: JMap[String, Int])
+ : DStream[T] = {
+ implicit val cmt: ClassManifest[T] =
+ implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[T]]
+ ssc.kafkaStream(hostname, port, groupId, Map(topics.toSeq: _*))
+ }
/**
* Create an input stream from network source hostname:port. Data is received using
* a TCP socket and the received bytes are interpreted as UTF-8 encoded, \n delimited
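
As a usage aside, here is a minimal Scala sketch of how the new kafkaStream overload might be driven. The master URL, app name, ZooKeeper endpoint, topic map, and the Time(2000) batch interval are illustrative assumptions, not part of this commit.

import java.util.{HashMap => JHashMap}

import spark.streaming.Time
import spark.streaming.api.java.JavaStreamingContext

// Hypothetical driver for the new kafkaStream overload; every concrete name below is illustrative.
object KafkaStreamSketch {
  def main(args: Array[String]) {
    // Time is assumed to wrap a millisecond count, so Time(2000) stands in for a 2-second batch interval.
    val jssc = new JavaStreamingContext("local[2]", "KafkaStreamSketch", Time(2000))

    // Topic -> number of partitions; each partition gets its own consumer thread, per the scaladoc.
    val topics = new JHashMap[String, Int]()
    topics.put("events", 2)

    // hostname/port refer to the ZooKeeper node the Kafka brokers register with.
    val messages = jssc.kafkaStream[String]("zk-host", 2181, "example-group", topics)

    jssc.start()
  }
}
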
@@ -163,6 +185,27 @@ class JavaStreamingContext(val ssc: StreamingContext) {
}
/**
+ * Sets the context to periodically checkpoint the DStream operations for master
+ * fault-tolerance. By default, the graph will be checkpointed every batch interval.
+ * @param directory HDFS-compatible directory where the checkpoint data will be reliably stored
+ * @param interval checkpoint interval
+ */
+ def checkpoint(directory: String, interval: Time = null) {
+ ssc.checkpoint(directory, interval)
+ }
+
+ /**
+ * Sets each DStream in this context to remember the RDDs it generated in the last given duration.
+ * DStreams remember RDDs only for a limited duration of time and then release them for garbage
+ * collection. This method allows the developer to specify how long to remember the RDDs
+ * (if the developer wishes to query old data outside the DStream computation).
+ * @param duration Minimum duration that each DStream should remember its RDDs
+ */
+ def remember(duration: Time) {
+ ssc.remember(duration)
+ }
+
+ /**
* Starts the execution of the streams.
*/
def start() = ssc.start()
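
And a similarly hedged sketch wiring the new checkpoint() and remember() hooks into a context; the HDFS path and the chosen durations are assumptions for illustration only.

import spark.streaming.Time
import spark.streaming.api.java.JavaStreamingContext

// Hypothetical setup exercising checkpoint() and remember(); the path and durations are illustrative.
object CheckpointSketch {
  def main(args: Array[String]) {
    val jssc = new JavaStreamingContext("local[2]", "CheckpointSketch", Time(1000))

    // Checkpoint the DStream graph to a reliable store every 10 seconds (Time assumed to be milliseconds).
    jssc.checkpoint("hdfs:///tmp/streaming-checkpoints", Time(10000))

    // Keep generated RDDs for one minute so old data can still be queried outside the DStream computation.
    jssc.remember(Time(60000))

    jssc.start()
  }
}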