From b36c4f7cce53446753ecc0ce6f9bdccb12b3350b Mon Sep 17 00:00:00 2001
From: Patrick Wendell
Date: Thu, 10 Jan 2013 19:29:22 -0800
Subject: More work on StreamingContext

---
 .../streaming/api/java/JavaStreamingContext.scala | 47 +++++++++++++++++++++++++++++++++++++++++++++++--
 1 file changed, 45 insertions(+), 2 deletions(-)

diff --git a/streaming/src/main/scala/spark/streaming/api/java/JavaStreamingContext.scala b/streaming/src/main/scala/spark/streaming/api/java/JavaStreamingContext.scala
index 37ce037d5c..e8cd03847a 100644
--- a/streaming/src/main/scala/spark/streaming/api/java/JavaStreamingContext.scala
+++ b/streaming/src/main/scala/spark/streaming/api/java/JavaStreamingContext.scala
@@ -9,6 +9,7 @@ import spark.storage.StorageLevel
 import spark.api.java.function.{Function => JFunction, Function2 => JFunction2}
 import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat}
 import java.io.InputStream
+import java.util.{Map => JMap}
 
 class JavaStreamingContext(val ssc: StreamingContext) {
   def this(master: String, frameworkName: String, batchDuration: Time) =
@@ -17,10 +18,31 @@ class JavaStreamingContext(val ssc: StreamingContext) {
   // TODOs:
   // - Test StreamingContext functions
   // - Test to/from Hadoop functions
-  // - Add checkpoint()/remember()
-  // - Support creating your own streams
+  // - Support registering InputStreams
   // - Add Kafka Stream
 
+  /**
+   * Create an input stream that pulls messages from a Kafka broker.
+   * @param hostname ZooKeeper hostname.
+   * @param port ZooKeeper port.
+   * @param groupId The group id for this consumer.
+   * @param topics Map of (topic_name -> numPartitions) to consume. Each partition is consumed
+   * in its own thread.
+   * The initial offsets for the partitions are pulled from ZooKeeper, and received data is
+   * stored at the default RDD storage level (memory-only); this wrapper does not yet expose
+   * the initialOffsets and storageLevel parameters of the underlying kafkaStream.
+   */
+  def kafkaStream[T](
+    hostname: String,
+    port: Int,
+    groupId: String,
+    topics: JMap[String, Int])
+  : DStream[T] = {
+    implicit val cmt: ClassManifest[T] =
+      implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[T]]
+    ssc.kafkaStream(hostname, port, groupId, Map(topics.toSeq: _*))
+  }
+
   /**
    * Create an input stream from a network source hostname:port. Data is received using
    * a TCP socket and the received bytes are interpreted as UTF8-encoded, \n-delimited
@@ -162,6 +184,27 @@ class JavaStreamingContext(val ssc: StreamingContext) {
     ssc.registerOutputStream(outputStream.dstream)
   }
 
+  /**
+   * Sets the context to periodically checkpoint the DStream operations for master
+   * fault-tolerance. By default, the graph will be checkpointed every batch interval.
+   * @param directory HDFS-compatible directory where the checkpoint data will be reliably stored
+   * @param interval checkpoint interval
+   */
+  def checkpoint(directory: String, interval: Time = null) {
+    ssc.checkpoint(directory, interval)
+  }
+
+  /**
+   * Sets each DStream in this context to remember the RDDs it generated in the last given
+   * duration. DStreams remember RDDs only for a limited duration of time and release them for
+   * garbage collection. This method allows the developer to specify how long to remember the
+   * RDDs (if the developer wishes to query old data outside the DStream computation).
+   * @param duration Minimum duration that each DStream should remember its RDDs
+   */
+  def remember(duration: Time) {
+    ssc.remember(duration)
+  }
+
   /**
    * Starts the execution of the streams.
    */
-- 
cgit v1.2.3
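
For anyone trying these additions out, here is a minimal usage sketch of the new methods. It is not part of the patch: the master URL, application name, checkpoint directory, ZooKeeper host/port, consumer group, and topic name are all illustrative assumptions.

import java.util.{HashMap => JHashMap}

import spark.streaming.Time
import spark.streaming.api.java.JavaStreamingContext

object JavaStreamingContextSketch {
  def main(args: Array[String]) {
    // Hypothetical driver: 2-second batches on a local master.
    val jssc = new JavaStreamingContext("local[2]", "JavaStreamingContextSketch", new Time(2000))

    // Periodically checkpoint the DStream graph for master fault-tolerance;
    // the directory and 10-second interval are illustrative.
    jssc.checkpoint("hdfs://namenode:8020/checkpoints", new Time(10000))

    // Keep generated RDDs for at least a minute so old data can be queried
    // outside the DStream computation.
    jssc.remember(new Time(60000))

    // Consume a (hypothetical) "events" topic with one consumer thread per
    // topic, using the new kafkaStream wrapper.
    val topics = new JHashMap[String, Int]()
    topics.put("events", 1)
    val stream = jssc.kafkaStream[String]("zk-host", 2181, "my-consumer-group", topics)
  }
}

A real job would then register an output stream and start the context, both of which are visible elsewhere in this file (registerOutputStream and the method documented as "Starts the execution of the streams").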