diff options
author | Matei Zaharia <matei@eecs.berkeley.edu> | 2013-08-31 19:27:07 -0700 |
---|---|---|
committer | Matei Zaharia <matei@eecs.berkeley.edu> | 2013-09-01 14:13:13 -0700 |
commit | 46eecd110a4017ea0c86cbb1010d0ccd6a5eb2ef (patch) | |
tree | 4a46971b36680bc5ef51be81ada8eb47670f6b22 /streaming/src/main/scala/org/apache | |
parent | a30fac16ca0525f2001b127e5f9518c9680844c9 (diff) | |
download | spark-46eecd110a4017ea0c86cbb1010d0ccd6a5eb2ef.tar.gz spark-46eecd110a4017ea0c86cbb1010d0ccd6a5eb2ef.tar.bz2 spark-46eecd110a4017ea0c86cbb1010d0ccd6a5eb2ef.zip |
Initial work to rename package to org.apache.spark
Diffstat (limited to 'streaming/src/main/scala/org/apache')
50 files changed, 7684 insertions, 0 deletions
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala new file mode 100644 index 0000000000..2d8f072624 --- /dev/null +++ b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala @@ -0,0 +1,190 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.streaming + +import java.io._ +import java.util.concurrent.Executors +import java.util.concurrent.RejectedExecutionException + +import org.apache.hadoop.fs.Path +import org.apache.hadoop.conf.Configuration + +import org.apache.spark.Logging +import org.apache.spark.io.CompressionCodec + + +private[streaming] +class Checkpoint(@transient ssc: StreamingContext, val checkpointTime: Time) + extends Logging with Serializable { + val master = ssc.sc.master + val framework = ssc.sc.appName + val sparkHome = ssc.sc.sparkHome + val jars = ssc.sc.jars + val environment = ssc.sc.environment + val graph = ssc.graph + val checkpointDir = ssc.checkpointDir + val checkpointDuration = ssc.checkpointDuration + val pendingTimes = ssc.scheduler.jobManager.getPendingTimes() + + def validate() { + assert(master != null, "Checkpoint.master is null") + assert(framework != null, "Checkpoint.framework is null") + assert(graph != null, "Checkpoint.graph is null") + assert(checkpointTime != null, "Checkpoint.checkpointTime is null") + logInfo("Checkpoint for time " + checkpointTime + " validated") + } +} + + +/** + * Convenience class to speed up the writing of graph checkpoint to file + */ +private[streaming] +class CheckpointWriter(checkpointDir: String) extends Logging { + val file = new Path(checkpointDir, "graph") + // The file to which we actually write - and then "move" to file. + private val writeFile = new Path(file.getParent, file.getName + ".next") + private val bakFile = new Path(file.getParent, file.getName + ".bk") + + private var stopped = false + + val conf = new Configuration() + var fs = file.getFileSystem(conf) + val maxAttempts = 3 + val executor = Executors.newFixedThreadPool(1) + + private val compressionCodec = CompressionCodec.createCodec() + + // Removed code which validates whether there is only one CheckpointWriter per path 'file' since + // I did not notice any errors - reintroduce it ? + + class CheckpointWriteHandler(checkpointTime: Time, bytes: Array[Byte]) extends Runnable { + def run() { + var attempts = 0 + val startTime = System.currentTimeMillis() + while (attempts < maxAttempts) { + attempts += 1 + try { + logDebug("Saving checkpoint for time " + checkpointTime + " to file '" + file + "'") + // This is inherently thread unsafe .. so alleviating it by writing to '.new' and then doing moves : which should be pretty fast. + val fos = fs.create(writeFile) + fos.write(bytes) + fos.close() + if (fs.exists(file) && fs.rename(file, bakFile)) { + logDebug("Moved existing checkpoint file to " + bakFile) + } + // paranoia + fs.delete(file, false) + fs.rename(writeFile, file) + + val finishTime = System.currentTimeMillis(); + logInfo("Checkpoint for time " + checkpointTime + " saved to file '" + file + + "', took " + bytes.length + " bytes and " + (finishTime - startTime) + " milliseconds") + return + } catch { + case ioe: IOException => + logWarning("Error writing checkpoint to file in " + attempts + " attempts", ioe) + } + } + logError("Could not write checkpoint for time " + checkpointTime + " to file '" + file + "'") + } + } + + def write(checkpoint: Checkpoint) { + val bos = new ByteArrayOutputStream() + val zos = compressionCodec.compressedOutputStream(bos) + val oos = new ObjectOutputStream(zos) + oos.writeObject(checkpoint) + oos.close() + bos.close() + try { + executor.execute(new CheckpointWriteHandler(checkpoint.checkpointTime, bos.toByteArray)) + } catch { + case rej: RejectedExecutionException => + logError("Could not submit checkpoint task to the thread pool executor", rej) + } + } + + def stop() { + synchronized { + if (stopped) return ; + stopped = true + } + executor.shutdown() + val startTime = System.currentTimeMillis() + val terminated = executor.awaitTermination(10, java.util.concurrent.TimeUnit.SECONDS) + val endTime = System.currentTimeMillis() + logInfo("CheckpointWriter executor terminated ? " + terminated + ", waited for " + (endTime - startTime) + " ms.") + } +} + + +private[streaming] +object CheckpointReader extends Logging { + + def read(path: String): Checkpoint = { + val fs = new Path(path).getFileSystem(new Configuration()) + val attempts = Seq(new Path(path, "graph"), new Path(path, "graph.bk"), new Path(path), new Path(path + ".bk")) + + val compressionCodec = CompressionCodec.createCodec() + + attempts.foreach(file => { + if (fs.exists(file)) { + logInfo("Attempting to load checkpoint from file '" + file + "'") + try { + val fis = fs.open(file) + // ObjectInputStream uses the last defined user-defined class loader in the stack + // to find classes, which maybe the wrong class loader. Hence, a inherited version + // of ObjectInputStream is used to explicitly use the current thread's default class + // loader to find and load classes. This is a well know Java issue and has popped up + // in other places (e.g., http://jira.codehaus.org/browse/GROOVY-1627) + val zis = compressionCodec.compressedInputStream(fis) + val ois = new ObjectInputStreamWithLoader(zis, Thread.currentThread().getContextClassLoader) + val cp = ois.readObject.asInstanceOf[Checkpoint] + ois.close() + fs.close() + cp.validate() + logInfo("Checkpoint successfully loaded from file '" + file + "'") + logInfo("Checkpoint was generated at time " + cp.checkpointTime) + return cp + } catch { + case e: Exception => + logError("Error loading checkpoint from file '" + file + "'", e) + } + } else { + logWarning("Could not read checkpoint from file '" + file + "' as it does not exist") + } + + }) + throw new Exception("Could not read checkpoint from path '" + path + "'") + } +} + +private[streaming] +class ObjectInputStreamWithLoader(inputStream_ : InputStream, loader: ClassLoader) + extends ObjectInputStream(inputStream_) { + + override def resolveClass(desc: ObjectStreamClass): Class[_] = { + try { + return loader.loadClass(desc.getName()) + } catch { + case e: Exception => + } + return super.resolveClass(desc) + } +} diff --git a/streaming/src/main/scala/org/apache/spark/streaming/DStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/DStream.scala new file mode 100644 index 0000000000..362247cc38 --- /dev/null +++ b/streaming/src/main/scala/org/apache/spark/streaming/DStream.scala @@ -0,0 +1,702 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.streaming + +import org.apache.spark.streaming.dstream._ +import StreamingContext._ +import org.apache.spark.util.MetadataCleaner + +//import Time._ + +import org.apache.spark.{RDD, Logging} +import org.apache.spark.storage.StorageLevel + +import scala.collection.mutable.ArrayBuffer +import scala.collection.mutable.HashMap + +import java.io.{ObjectInputStream, IOException, ObjectOutputStream} + +import org.apache.hadoop.fs.{FileSystem, Path} +import org.apache.hadoop.conf.Configuration + +/** + * A Discretized Stream (DStream), the basic abstraction in Spark Streaming, is a continuous + * sequence of RDDs (of the same type) representing a continuous stream of data (see [[org.apache.spark.RDD]] + * for more details on RDDs). DStreams can either be created from live data (such as, data from + * HDFS, Kafka or Flume) or it can be generated by transformation existing DStreams using operations + * such as `map`, `window` and `reduceByKeyAndWindow`. While a Spark Streaming program is running, each + * DStream periodically generates a RDD, either from live data or by transforming the RDD generated + * by a parent DStream. + * + * This class contains the basic operations available on all DStreams, such as `map`, `filter` and + * `window`. In addition, [[org.apache.spark.streaming.PairDStreamFunctions]] contains operations available + * only on DStreams of key-value pairs, such as `groupByKeyAndWindow` and `join`. These operations + * are automatically available on any DStream of the right type (e.g., DStream[(Int, Int)] through + * implicit conversions when `spark.streaming.StreamingContext._` is imported. + * + * DStreams internally is characterized by a few basic properties: + * - A list of other DStreams that the DStream depends on + * - A time interval at which the DStream generates an RDD + * - A function that is used to generate an RDD after each time interval + */ + +abstract class DStream[T: ClassManifest] ( + @transient protected[streaming] var ssc: StreamingContext + ) extends Serializable with Logging { + + initLogging() + + // ======================================================================= + // Methods that should be implemented by subclasses of DStream + // ======================================================================= + + /** Time interval after which the DStream generates a RDD */ + def slideDuration: Duration + + /** List of parent DStreams on which this DStream depends on */ + def dependencies: List[DStream[_]] + + /** Method that generates a RDD for the given time */ + def compute (validTime: Time): Option[RDD[T]] + + // ======================================================================= + // Methods and fields available on all DStreams + // ======================================================================= + + // RDDs generated, marked as protected[streaming] so that testsuites can access it + @transient + protected[streaming] var generatedRDDs = new HashMap[Time, RDD[T]] () + + // Time zero for the DStream + protected[streaming] var zeroTime: Time = null + + // Duration for which the DStream will remember each RDD created + protected[streaming] var rememberDuration: Duration = null + + // Storage level of the RDDs in the stream + protected[streaming] var storageLevel: StorageLevel = StorageLevel.NONE + + // Checkpoint details + protected[streaming] val mustCheckpoint = false + protected[streaming] var checkpointDuration: Duration = null + protected[streaming] val checkpointData = new DStreamCheckpointData(this) + + // Reference to whole DStream graph + protected[streaming] var graph: DStreamGraph = null + + protected[streaming] def isInitialized = (zeroTime != null) + + // Duration for which the DStream requires its parent DStream to remember each RDD created + protected[streaming] def parentRememberDuration = rememberDuration + + /** Return the StreamingContext associated with this DStream */ + def context = ssc + + /** Persist the RDDs of this DStream with the given storage level */ + def persist(level: StorageLevel): DStream[T] = { + if (this.isInitialized) { + throw new UnsupportedOperationException( + "Cannot change storage level of an DStream after streaming context has started") + } + this.storageLevel = level + this + } + + /** Persist RDDs of this DStream with the default storage level (MEMORY_ONLY_SER) */ + def persist(): DStream[T] = persist(StorageLevel.MEMORY_ONLY_SER) + + /** Persist RDDs of this DStream with the default storage level (MEMORY_ONLY_SER) */ + def cache(): DStream[T] = persist() + + /** + * Enable periodic checkpointing of RDDs of this DStream + * @param interval Time interval after which generated RDD will be checkpointed + */ + def checkpoint(interval: Duration): DStream[T] = { + if (isInitialized) { + throw new UnsupportedOperationException( + "Cannot change checkpoint interval of an DStream after streaming context has started") + } + persist() + checkpointDuration = interval + this + } + + /** + * Initialize the DStream by setting the "zero" time, based on which + * the validity of future times is calculated. This method also recursively initializes + * its parent DStreams. + */ + protected[streaming] def initialize(time: Time) { + if (zeroTime != null && zeroTime != time) { + throw new Exception("ZeroTime is already initialized to " + zeroTime + + ", cannot initialize it again to " + time) + } + zeroTime = time + + // Set the checkpoint interval to be slideDuration or 10 seconds, which ever is larger + if (mustCheckpoint && checkpointDuration == null) { + checkpointDuration = slideDuration * math.ceil(Seconds(10) / slideDuration).toInt + logInfo("Checkpoint interval automatically set to " + checkpointDuration) + } + + // Set the minimum value of the rememberDuration if not already set + var minRememberDuration = slideDuration + if (checkpointDuration != null && minRememberDuration <= checkpointDuration) { + minRememberDuration = checkpointDuration * 2 // times 2 just to be sure that the latest checkpoint is not forgetten + } + if (rememberDuration == null || rememberDuration < minRememberDuration) { + rememberDuration = minRememberDuration + } + + // Initialize the dependencies + dependencies.foreach(_.initialize(zeroTime)) + } + + protected[streaming] def validate() { + assert(rememberDuration != null, "Remember duration is set to null") + + assert( + !mustCheckpoint || checkpointDuration != null, + "The checkpoint interval for " + this.getClass.getSimpleName + " has not been set." + + " Please use DStream.checkpoint() to set the interval." + ) + + assert( + checkpointDuration == null || context.sparkContext.checkpointDir.isDefined, + "The checkpoint directory has not been set. Please use StreamingContext.checkpoint()" + + " or SparkContext.checkpoint() to set the checkpoint directory." + ) + + assert( + checkpointDuration == null || checkpointDuration >= slideDuration, + "The checkpoint interval for " + this.getClass.getSimpleName + " has been set to " + + checkpointDuration + " which is lower than its slide time (" + slideDuration + "). " + + "Please set it to at least " + slideDuration + "." + ) + + assert( + checkpointDuration == null || checkpointDuration.isMultipleOf(slideDuration), + "The checkpoint interval for " + this.getClass.getSimpleName + " has been set to " + + checkpointDuration + " which not a multiple of its slide time (" + slideDuration + "). " + + "Please set it to a multiple " + slideDuration + "." + ) + + assert( + checkpointDuration == null || storageLevel != StorageLevel.NONE, + "" + this.getClass.getSimpleName + " has been marked for checkpointing but the storage " + + "level has not been set to enable persisting. Please use DStream.persist() to set the " + + "storage level to use memory for better checkpointing performance." + ) + + assert( + checkpointDuration == null || rememberDuration > checkpointDuration, + "The remember duration for " + this.getClass.getSimpleName + " has been set to " + + rememberDuration + " which is not more than the checkpoint interval (" + + checkpointDuration + "). Please set it to higher than " + checkpointDuration + "." + ) + + val metadataCleanerDelay = MetadataCleaner.getDelaySeconds + logInfo("metadataCleanupDelay = " + metadataCleanerDelay) + assert( + metadataCleanerDelay < 0 || rememberDuration.milliseconds < metadataCleanerDelay * 1000, + "It seems you are doing some DStream window operation or setting a checkpoint interval " + + "which requires " + this.getClass.getSimpleName + " to remember generated RDDs for more " + + "than " + rememberDuration.milliseconds / 1000 + " seconds. But Spark's metadata cleanup" + + "delay is set to " + metadataCleanerDelay + " seconds, which is not sufficient. Please " + + "set the Java property 'spark.cleaner.delay' to more than " + + math.ceil(rememberDuration.milliseconds / 1000.0).toInt + " seconds." + ) + + dependencies.foreach(_.validate()) + + logInfo("Slide time = " + slideDuration) + logInfo("Storage level = " + storageLevel) + logInfo("Checkpoint interval = " + checkpointDuration) + logInfo("Remember duration = " + rememberDuration) + logInfo("Initialized and validated " + this) + } + + protected[streaming] def setContext(s: StreamingContext) { + if (ssc != null && ssc != s) { + throw new Exception("Context is already set in " + this + ", cannot set it again") + } + ssc = s + logInfo("Set context for " + this) + dependencies.foreach(_.setContext(ssc)) + } + + protected[streaming] def setGraph(g: DStreamGraph) { + if (graph != null && graph != g) { + throw new Exception("Graph is already set in " + this + ", cannot set it again") + } + graph = g + dependencies.foreach(_.setGraph(graph)) + } + + protected[streaming] def remember(duration: Duration) { + if (duration != null && duration > rememberDuration) { + rememberDuration = duration + logInfo("Duration for remembering RDDs set to " + rememberDuration + " for " + this) + } + dependencies.foreach(_.remember(parentRememberDuration)) + } + + /** Checks whether the 'time' is valid wrt slideDuration for generating RDD */ + protected def isTimeValid(time: Time): Boolean = { + if (!isInitialized) { + throw new Exception (this + " has not been initialized") + } else if (time <= zeroTime || ! (time - zeroTime).isMultipleOf(slideDuration)) { + logInfo("Time " + time + " is invalid as zeroTime is " + zeroTime + " and slideDuration is " + slideDuration + " and difference is " + (time - zeroTime)) + false + } else { + logInfo("Time " + time + " is valid") + true + } + } + + /** + * Retrieve a precomputed RDD of this DStream, or computes the RDD. This is an internal + * method that should not be called directly. + */ + protected[streaming] def getOrCompute(time: Time): Option[RDD[T]] = { + // If this DStream was not initialized (i.e., zeroTime not set), then do it + // If RDD was already generated, then retrieve it from HashMap + generatedRDDs.get(time) match { + + // If an RDD was already generated and is being reused, then + // probably all RDDs in this DStream will be reused and hence should be cached + case Some(oldRDD) => Some(oldRDD) + + // if RDD was not generated, and if the time is valid + // (based on sliding time of this DStream), then generate the RDD + case None => { + if (isTimeValid(time)) { + compute(time) match { + case Some(newRDD) => + if (storageLevel != StorageLevel.NONE) { + newRDD.persist(storageLevel) + logInfo("Persisting RDD " + newRDD.id + " for time " + time + " to " + storageLevel + " at time " + time) + } + if (checkpointDuration != null && (time - zeroTime).isMultipleOf(checkpointDuration)) { + newRDD.checkpoint() + logInfo("Marking RDD " + newRDD.id + " for time " + time + " for checkpointing at time " + time) + } + generatedRDDs.put(time, newRDD) + Some(newRDD) + case None => + None + } + } else { + None + } + } + } + } + + /** + * Generate a SparkStreaming job for the given time. This is an internal method that + * should not be called directly. This default implementation creates a job + * that materializes the corresponding RDD. Subclasses of DStream may override this + * to generate their own jobs. + */ + protected[streaming] def generateJob(time: Time): Option[Job] = { + getOrCompute(time) match { + case Some(rdd) => { + val jobFunc = () => { + val emptyFunc = { (iterator: Iterator[T]) => {} } + context.sparkContext.runJob(rdd, emptyFunc) + } + Some(new Job(time, jobFunc)) + } + case None => None + } + } + + /** + * Clear metadata that are older than `rememberDuration` of this DStream. + * This is an internal method that should not be called directly. This default + * implementation clears the old generated RDDs. Subclasses of DStream may override + * this to clear their own metadata along with the generated RDDs. + */ + protected[streaming] def clearOldMetadata(time: Time) { + var numForgotten = 0 + val oldRDDs = generatedRDDs.filter(_._1 <= (time - rememberDuration)) + generatedRDDs --= oldRDDs.keys + logInfo("Cleared " + oldRDDs.size + " RDDs that were older than " + + (time - rememberDuration) + ": " + oldRDDs.keys.mkString(", ")) + dependencies.foreach(_.clearOldMetadata(time)) + } + + /* Adds metadata to the Stream while it is running. + * This methd should be overwritten by sublcasses of InputDStream. + */ + protected[streaming] def addMetadata(metadata: Any) { + if (metadata != null) { + logInfo("Dropping Metadata: " + metadata.toString) + } + } + + /** + * Refresh the list of checkpointed RDDs that will be saved along with checkpoint of + * this stream. This is an internal method that should not be called directly. This is + * a default implementation that saves only the file names of the checkpointed RDDs to + * checkpointData. Subclasses of DStream (especially those of InputDStream) may override + * this method to save custom checkpoint data. + */ + protected[streaming] def updateCheckpointData(currentTime: Time) { + logInfo("Updating checkpoint data for time " + currentTime) + checkpointData.update() + dependencies.foreach(_.updateCheckpointData(currentTime)) + checkpointData.cleanup() + logDebug("Updated checkpoint data for time " + currentTime + ": " + checkpointData) + } + + /** + * Restore the RDDs in generatedRDDs from the checkpointData. This is an internal method + * that should not be called directly. This is a default implementation that recreates RDDs + * from the checkpoint file names stored in checkpointData. Subclasses of DStream that + * override the updateCheckpointData() method would also need to override this method. + */ + protected[streaming] def restoreCheckpointData() { + // Create RDDs from the checkpoint data + logInfo("Restoring checkpoint data") + checkpointData.restore() + dependencies.foreach(_.restoreCheckpointData()) + logInfo("Restored checkpoint data") + } + + @throws(classOf[IOException]) + private def writeObject(oos: ObjectOutputStream) { + logDebug(this.getClass().getSimpleName + ".writeObject used") + if (graph != null) { + graph.synchronized { + if (graph.checkpointInProgress) { + oos.defaultWriteObject() + } else { + val msg = "Object of " + this.getClass.getName + " is being serialized " + + " possibly as a part of closure of an RDD operation. This is because " + + " the DStream object is being referred to from within the closure. " + + " Please rewrite the RDD operation inside this DStream to avoid this. " + + " This has been enforced to avoid bloating of Spark tasks " + + " with unnecessary objects." + throw new java.io.NotSerializableException(msg) + } + } + } else { + throw new java.io.NotSerializableException("Graph is unexpectedly null when DStream is being serialized.") + } + } + + @throws(classOf[IOException]) + private def readObject(ois: ObjectInputStream) { + logDebug(this.getClass().getSimpleName + ".readObject used") + ois.defaultReadObject() + generatedRDDs = new HashMap[Time, RDD[T]] () + } + + // ======================================================================= + // DStream operations + // ======================================================================= + + /** Return a new DStream by applying a function to all elements of this DStream. */ + def map[U: ClassManifest](mapFunc: T => U): DStream[U] = { + new MappedDStream(this, context.sparkContext.clean(mapFunc)) + } + + /** + * Return a new DStream by applying a function to all elements of this DStream, + * and then flattening the results + */ + def flatMap[U: ClassManifest](flatMapFunc: T => Traversable[U]): DStream[U] = { + new FlatMappedDStream(this, context.sparkContext.clean(flatMapFunc)) + } + + /** Return a new DStream containing only the elements that satisfy a predicate. */ + def filter(filterFunc: T => Boolean): DStream[T] = new FilteredDStream(this, filterFunc) + + /** + * Return a new DStream in which each RDD is generated by applying glom() to each RDD of + * this DStream. Applying glom() to an RDD coalesces all elements within each partition into + * an array. + */ + def glom(): DStream[Array[T]] = new GlommedDStream(this) + + /** + * Return a new DStream in which each RDD is generated by applying mapPartitions() to each RDDs + * of this DStream. Applying mapPartitions() to an RDD applies a function to each partition + * of the RDD. + */ + def mapPartitions[U: ClassManifest]( + mapPartFunc: Iterator[T] => Iterator[U], + preservePartitioning: Boolean = false + ): DStream[U] = { + new MapPartitionedDStream(this, context.sparkContext.clean(mapPartFunc), preservePartitioning) + } + + /** + * Return a new DStream in which each RDD has a single element generated by reducing each RDD + * of this DStream. + */ + def reduce(reduceFunc: (T, T) => T): DStream[T] = + this.map(x => (null, x)).reduceByKey(reduceFunc, 1).map(_._2) + + /** + * Return a new DStream in which each RDD has a single element generated by counting each RDD + * of this DStream. + */ + def count(): DStream[Long] = { + this.map(_ => (null, 1L)) + .transform(_.union(context.sparkContext.makeRDD(Seq((null, 0L)), 1))) + .reduceByKey(_ + _) + .map(_._2) + } + + /** + * Return a new DStream in which each RDD contains the counts of each distinct value in + * each RDD of this DStream. Hash partitioning is used to generate + * the RDDs with `numPartitions` partitions (Spark's default number of partitions if + * `numPartitions` not specified). + */ + def countByValue(numPartitions: Int = ssc.sc.defaultParallelism): DStream[(T, Long)] = + this.map(x => (x, 1L)).reduceByKey((x: Long, y: Long) => x + y, numPartitions) + + /** + * Apply a function to each RDD in this DStream. This is an output operator, so + * this DStream will be registered as an output stream and therefore materialized. + */ + def foreach(foreachFunc: RDD[T] => Unit) { + this.foreach((r: RDD[T], t: Time) => foreachFunc(r)) + } + + /** + * Apply a function to each RDD in this DStream. This is an output operator, so + * this DStream will be registered as an output stream and therefore materialized. + */ + def foreach(foreachFunc: (RDD[T], Time) => Unit) { + val newStream = new ForEachDStream(this, context.sparkContext.clean(foreachFunc)) + ssc.registerOutputStream(newStream) + newStream + } + + /** + * Return a new DStream in which each RDD is generated by applying a function + * on each RDD of this DStream. + */ + def transform[U: ClassManifest](transformFunc: RDD[T] => RDD[U]): DStream[U] = { + transform((r: RDD[T], t: Time) => transformFunc(r)) + } + + /** + * Return a new DStream in which each RDD is generated by applying a function + * on each RDD of this DStream. + */ + def transform[U: ClassManifest](transformFunc: (RDD[T], Time) => RDD[U]): DStream[U] = { + new TransformedDStream(this, context.sparkContext.clean(transformFunc)) + } + + /** + * Print the first ten elements of each RDD generated in this DStream. This is an output + * operator, so this DStream will be registered as an output stream and there materialized. + */ + def print() { + def foreachFunc = (rdd: RDD[T], time: Time) => { + val first11 = rdd.take(11) + println ("-------------------------------------------") + println ("Time: " + time) + println ("-------------------------------------------") + first11.take(10).foreach(println) + if (first11.size > 10) println("...") + println() + } + val newStream = new ForEachDStream(this, context.sparkContext.clean(foreachFunc)) + ssc.registerOutputStream(newStream) + } + + /** + * Return a new DStream in which each RDD contains all the elements in seen in a + * sliding window of time over this DStream. The new DStream generates RDDs with + * the same interval as this DStream. + * @param windowDuration width of the window; must be a multiple of this DStream's interval. + */ + def window(windowDuration: Duration): DStream[T] = window(windowDuration, this.slideDuration) + + /** + * Return a new DStream in which each RDD contains all the elements in seen in a + * sliding window of time over this DStream. + * @param windowDuration width of the window; must be a multiple of this DStream's + * batching interval + * @param slideDuration sliding interval of the window (i.e., the interval after which + * the new DStream will generate RDDs); must be a multiple of this + * DStream's batching interval + */ + def window(windowDuration: Duration, slideDuration: Duration): DStream[T] = { + new WindowedDStream(this, windowDuration, slideDuration) + } + + /** + * Return a new DStream in which each RDD has a single element generated by reducing all + * elements in a sliding window over this DStream. + * @param reduceFunc associative reduce function + * @param windowDuration width of the window; must be a multiple of this DStream's + * batching interval + * @param slideDuration sliding interval of the window (i.e., the interval after which + * the new DStream will generate RDDs); must be a multiple of this + * DStream's batching interval + */ + def reduceByWindow( + reduceFunc: (T, T) => T, + windowDuration: Duration, + slideDuration: Duration + ): DStream[T] = { + this.reduce(reduceFunc).window(windowDuration, slideDuration).reduce(reduceFunc) + } + + /** + * Return a new DStream in which each RDD has a single element generated by reducing all + * elements in a sliding window over this DStream. However, the reduction is done incrementally + * using the old window's reduced value : + * 1. reduce the new values that entered the window (e.g., adding new counts) + * 2. "inverse reduce" the old values that left the window (e.g., subtracting old counts) + * This is more efficient than reduceByWindow without "inverse reduce" function. + * However, it is applicable to only "invertible reduce functions". + * @param reduceFunc associative reduce function + * @param invReduceFunc inverse reduce function + * @param windowDuration width of the window; must be a multiple of this DStream's + * batching interval + * @param slideDuration sliding interval of the window (i.e., the interval after which + * the new DStream will generate RDDs); must be a multiple of this + * DStream's batching interval + */ + def reduceByWindow( + reduceFunc: (T, T) => T, + invReduceFunc: (T, T) => T, + windowDuration: Duration, + slideDuration: Duration + ): DStream[T] = { + this.map(x => (1, x)) + .reduceByKeyAndWindow(reduceFunc, invReduceFunc, windowDuration, slideDuration, 1) + .map(_._2) + } + + /** + * Return a new DStream in which each RDD has a single element generated by counting the number + * of elements in a sliding window over this DStream. Hash partitioning is used to generate the RDDs with + * Spark's default number of partitions. + * @param windowDuration width of the window; must be a multiple of this DStream's + * batching interval + * @param slideDuration sliding interval of the window (i.e., the interval after which + * the new DStream will generate RDDs); must be a multiple of this + * DStream's batching interval + */ + def countByWindow(windowDuration: Duration, slideDuration: Duration): DStream[Long] = { + this.map(_ => 1L).reduceByWindow(_ + _, _ - _, windowDuration, slideDuration) + } + + /** + * Return a new DStream in which each RDD contains the count of distinct elements in + * RDDs in a sliding window over this DStream. Hash partitioning is used to generate + * the RDDs with `numPartitions` partitions (Spark's default number of partitions if + * `numPartitions` not specified). + * @param windowDuration width of the window; must be a multiple of this DStream's + * batching interval + * @param slideDuration sliding interval of the window (i.e., the interval after which + * the new DStream will generate RDDs); must be a multiple of this + * DStream's batching interval + * @param numPartitions number of partitions of each RDD in the new DStream. + */ + def countByValueAndWindow( + windowDuration: Duration, + slideDuration: Duration, + numPartitions: Int = ssc.sc.defaultParallelism + ): DStream[(T, Long)] = { + + this.map(x => (x, 1L)).reduceByKeyAndWindow( + (x: Long, y: Long) => x + y, + (x: Long, y: Long) => x - y, + windowDuration, + slideDuration, + numPartitions, + (x: (T, Long)) => x._2 != 0L + ) + } + + /** + * Return a new DStream by unifying data of another DStream with this DStream. + * @param that Another DStream having the same slideDuration as this DStream. + */ + def union(that: DStream[T]): DStream[T] = new UnionDStream[T](Array(this, that)) + + /** + * Return all the RDDs defined by the Interval object (both end times included) + */ + protected[streaming] def slice(interval: Interval): Seq[RDD[T]] = { + slice(interval.beginTime, interval.endTime) + } + + /** + * Return all the RDDs between 'fromTime' to 'toTime' (both included) + */ + def slice(fromTime: Time, toTime: Time): Seq[RDD[T]] = { + if (!(fromTime - zeroTime).isMultipleOf(slideDuration)) { + logWarning("fromTime (" + fromTime + ") is not a multiple of slideDuration (" + slideDuration + ")") + } + if (!(toTime - zeroTime).isMultipleOf(slideDuration)) { + logWarning("toTime (" + fromTime + ") is not a multiple of slideDuration (" + slideDuration + ")") + } + val alignedToTime = toTime.floor(slideDuration) + val alignedFromTime = fromTime.floor(slideDuration) + + logInfo("Slicing from " + fromTime + " to " + toTime + + " (aligned to " + alignedFromTime + " and " + alignedToTime + ")") + + alignedFromTime.to(alignedToTime, slideDuration).flatMap(time => { + if (time >= zeroTime) getOrCompute(time) else None + }) + } + + /** + * Save each RDD in this DStream as a Sequence file of serialized objects. + * The file name at each batch interval is generated based on `prefix` and + * `suffix`: "prefix-TIME_IN_MS.suffix". + */ + def saveAsObjectFiles(prefix: String, suffix: String = "") { + val saveFunc = (rdd: RDD[T], time: Time) => { + val file = rddToFileName(prefix, suffix, time) + rdd.saveAsObjectFile(file) + } + this.foreach(saveFunc) + } + + /** + * Save each RDD in this DStream as at text file, using string representation + * of elements. The file name at each batch interval is generated based on + * `prefix` and `suffix`: "prefix-TIME_IN_MS.suffix". + */ + def saveAsTextFiles(prefix: String, suffix: String = "") { + val saveFunc = (rdd: RDD[T], time: Time) => { + val file = rddToFileName(prefix, suffix, time) + rdd.saveAsTextFile(file) + } + this.foreach(saveFunc) + } + + def register() { + ssc.registerOutputStream(this) + } +} diff --git a/streaming/src/main/scala/org/apache/spark/streaming/DStreamCheckpointData.scala b/streaming/src/main/scala/org/apache/spark/streaming/DStreamCheckpointData.scala new file mode 100644 index 0000000000..58a0da2870 --- /dev/null +++ b/streaming/src/main/scala/org/apache/spark/streaming/DStreamCheckpointData.scala @@ -0,0 +1,110 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.streaming + +import org.apache.hadoop.fs.Path +import org.apache.hadoop.fs.FileSystem +import org.apache.hadoop.conf.Configuration +import collection.mutable.HashMap +import org.apache.spark.Logging + + + +private[streaming] +class DStreamCheckpointData[T: ClassManifest] (dstream: DStream[T]) + extends Serializable with Logging { + protected val data = new HashMap[Time, AnyRef]() + + @transient private var fileSystem : FileSystem = null + @transient private var lastCheckpointFiles: HashMap[Time, String] = null + + protected[streaming] def checkpointFiles = data.asInstanceOf[HashMap[Time, String]] + + /** + * Updates the checkpoint data of the DStream. This gets called every time + * the graph checkpoint is initiated. Default implementation records the + * checkpoint files to which the generate RDDs of the DStream has been saved. + */ + def update() { + + // Get the checkpointed RDDs from the generated RDDs + val newCheckpointFiles = dstream.generatedRDDs.filter(_._2.getCheckpointFile.isDefined) + .map(x => (x._1, x._2.getCheckpointFile.get)) + + // Make a copy of the existing checkpoint data (checkpointed RDDs) + lastCheckpointFiles = checkpointFiles.clone() + + // If the new checkpoint data has checkpoints then replace existing with the new one + if (newCheckpointFiles.size > 0) { + checkpointFiles.clear() + checkpointFiles ++= newCheckpointFiles + } + + // TODO: remove this, this is just for debugging + newCheckpointFiles.foreach { + case (time, data) => { logInfo("Added checkpointed RDD for time " + time + " to stream checkpoint") } + } + } + + /** + * Cleanup old checkpoint data. This gets called every time the graph + * checkpoint is initiated, but after `update` is called. Default + * implementation, cleans up old checkpoint files. + */ + def cleanup() { + // If there is at least on checkpoint file in the current checkpoint files, + // then delete the old checkpoint files. + if (checkpointFiles.size > 0 && lastCheckpointFiles != null) { + (lastCheckpointFiles -- checkpointFiles.keySet).foreach { + case (time, file) => { + try { + val path = new Path(file) + if (fileSystem == null) { + fileSystem = path.getFileSystem(new Configuration()) + } + fileSystem.delete(path, true) + logInfo("Deleted checkpoint file '" + file + "' for time " + time) + } catch { + case e: Exception => + logWarning("Error deleting old checkpoint file '" + file + "' for time " + time, e) + } + } + } + } + } + + /** + * Restore the checkpoint data. This gets called once when the DStream graph + * (along with its DStreams) are being restored from a graph checkpoint file. + * Default implementation restores the RDDs from their checkpoint files. + */ + def restore() { + // Create RDDs from the checkpoint data + checkpointFiles.foreach { + case(time, file) => { + logInfo("Restoring checkpointed RDD for time " + time + " from file '" + file + "'") + dstream.generatedRDDs += ((time, dstream.context.sparkContext.checkpointFile[T](file))) + } + } + } + + override def toString() = { + "[\n" + checkpointFiles.size + " checkpoint files \n" + checkpointFiles.mkString("\n") + "\n]" + } +} + diff --git a/streaming/src/main/scala/org/apache/spark/streaming/DStreamGraph.scala b/streaming/src/main/scala/org/apache/spark/streaming/DStreamGraph.scala new file mode 100644 index 0000000000..b9a58fded6 --- /dev/null +++ b/streaming/src/main/scala/org/apache/spark/streaming/DStreamGraph.scala @@ -0,0 +1,167 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.streaming + +import dstream.InputDStream +import java.io.{ObjectInputStream, IOException, ObjectOutputStream} +import collection.mutable.ArrayBuffer +import org.apache.spark.Logging + +final private[streaming] class DStreamGraph extends Serializable with Logging { + initLogging() + + private val inputStreams = new ArrayBuffer[InputDStream[_]]() + private val outputStreams = new ArrayBuffer[DStream[_]]() + + var rememberDuration: Duration = null + var checkpointInProgress = false + + var zeroTime: Time = null + var startTime: Time = null + var batchDuration: Duration = null + + def start(time: Time) { + this.synchronized { + if (zeroTime != null) { + throw new Exception("DStream graph computation already started") + } + zeroTime = time + startTime = time + outputStreams.foreach(_.initialize(zeroTime)) + outputStreams.foreach(_.remember(rememberDuration)) + outputStreams.foreach(_.validate) + inputStreams.par.foreach(_.start()) + } + } + + def restart(time: Time) { + this.synchronized { startTime = time } + } + + def stop() { + this.synchronized { + inputStreams.par.foreach(_.stop()) + } + } + + def setContext(ssc: StreamingContext) { + this.synchronized { + outputStreams.foreach(_.setContext(ssc)) + } + } + + def setBatchDuration(duration: Duration) { + this.synchronized { + if (batchDuration != null) { + throw new Exception("Batch duration already set as " + batchDuration + + ". cannot set it again.") + } + batchDuration = duration + } + } + + def remember(duration: Duration) { + this.synchronized { + if (rememberDuration != null) { + throw new Exception("Batch duration already set as " + batchDuration + + ". cannot set it again.") + } + rememberDuration = duration + } + } + + def addInputStream(inputStream: InputDStream[_]) { + this.synchronized { + inputStream.setGraph(this) + inputStreams += inputStream + } + } + + def addOutputStream(outputStream: DStream[_]) { + this.synchronized { + outputStream.setGraph(this) + outputStreams += outputStream + } + } + + def getInputStreams() = this.synchronized { inputStreams.toArray } + + def getOutputStreams() = this.synchronized { outputStreams.toArray } + + def generateJobs(time: Time): Seq[Job] = { + this.synchronized { + logInfo("Generating jobs for time " + time) + val jobs = outputStreams.flatMap(outputStream => outputStream.generateJob(time)) + logInfo("Generated " + jobs.length + " jobs for time " + time) + jobs + } + } + + def clearOldMetadata(time: Time) { + this.synchronized { + logInfo("Clearing old metadata for time " + time) + outputStreams.foreach(_.clearOldMetadata(time)) + logInfo("Cleared old metadata for time " + time) + } + } + + def updateCheckpointData(time: Time) { + this.synchronized { + logInfo("Updating checkpoint data for time " + time) + outputStreams.foreach(_.updateCheckpointData(time)) + logInfo("Updated checkpoint data for time " + time) + } + } + + def restoreCheckpointData() { + this.synchronized { + logInfo("Restoring checkpoint data") + outputStreams.foreach(_.restoreCheckpointData()) + logInfo("Restored checkpoint data") + } + } + + def validate() { + this.synchronized { + assert(batchDuration != null, "Batch duration has not been set") + //assert(batchDuration >= Milliseconds(100), "Batch duration of " + batchDuration + " is very low") + assert(getOutputStreams().size > 0, "No output streams registered, so nothing to execute") + } + } + + @throws(classOf[IOException]) + private def writeObject(oos: ObjectOutputStream) { + this.synchronized { + logDebug("DStreamGraph.writeObject used") + checkpointInProgress = true + oos.defaultWriteObject() + checkpointInProgress = false + } + } + + @throws(classOf[IOException]) + private def readObject(ois: ObjectInputStream) { + this.synchronized { + logDebug("DStreamGraph.readObject used") + checkpointInProgress = true + ois.defaultReadObject() + checkpointInProgress = false + } + } +} + diff --git a/streaming/src/main/scala/org/apache/spark/streaming/Duration.scala b/streaming/src/main/scala/org/apache/spark/streaming/Duration.scala new file mode 100644 index 0000000000..290ad37812 --- /dev/null +++ b/streaming/src/main/scala/org/apache/spark/streaming/Duration.scala @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.streaming + +import org.apache.spark.Utils + +case class Duration (private val millis: Long) { + + def < (that: Duration): Boolean = (this.millis < that.millis) + + def <= (that: Duration): Boolean = (this.millis <= that.millis) + + def > (that: Duration): Boolean = (this.millis > that.millis) + + def >= (that: Duration): Boolean = (this.millis >= that.millis) + + def + (that: Duration): Duration = new Duration(millis + that.millis) + + def - (that: Duration): Duration = new Duration(millis - that.millis) + + def * (times: Int): Duration = new Duration(millis * times) + + def / (that: Duration): Double = millis.toDouble / that.millis.toDouble + + def isMultipleOf(that: Duration): Boolean = + (this.millis % that.millis == 0) + + def min(that: Duration): Duration = if (this < that) this else that + + def max(that: Duration): Duration = if (this > that) this else that + + def isZero: Boolean = (this.millis == 0) + + override def toString: String = (millis.toString + " ms") + + def toFormattedString: String = millis.toString + + def milliseconds: Long = millis + + def prettyPrint = Utils.msDurationToString(millis) + +} + +/** + * Helper object that creates instance of [[org.apache.spark.streaming.Duration]] representing + * a given number of milliseconds. + */ +object Milliseconds { + def apply(milliseconds: Long) = new Duration(milliseconds) +} + +/** + * Helper object that creates instance of [[org.apache.spark.streaming.Duration]] representing + * a given number of seconds. + */ +object Seconds { + def apply(seconds: Long) = new Duration(seconds * 1000) +} + +/** + * Helper object that creates instance of [[org.apache.spark.streaming.Duration]] representing + * a given number of minutes. + */ +object Minutes { + def apply(minutes: Long) = new Duration(minutes * 60000) +} + + diff --git a/streaming/src/main/scala/org/apache/spark/streaming/Interval.scala b/streaming/src/main/scala/org/apache/spark/streaming/Interval.scala new file mode 100644 index 0000000000..04c994c136 --- /dev/null +++ b/streaming/src/main/scala/org/apache/spark/streaming/Interval.scala @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.streaming + +private[streaming] +class Interval(val beginTime: Time, val endTime: Time) { + def this(beginMs: Long, endMs: Long) = this(new Time(beginMs), new Time(endMs)) + + def duration(): Duration = endTime - beginTime + + def + (time: Duration): Interval = { + new Interval(beginTime + time, endTime + time) + } + + def - (time: Duration): Interval = { + new Interval(beginTime - time, endTime - time) + } + + def < (that: Interval): Boolean = { + if (this.duration != that.duration) { + throw new Exception("Comparing two intervals with different durations [" + this + ", " + that + "]") + } + this.endTime < that.endTime + } + + def <= (that: Interval) = (this < that || this == that) + + def > (that: Interval) = !(this <= that) + + def >= (that: Interval) = !(this < that) + + override def toString = "[" + beginTime + ", " + endTime + "]" +} + +private[streaming] +object Interval { + def currentInterval(duration: Duration): Interval = { + val time = new Time(System.currentTimeMillis) + val intervalBegin = time.floor(duration) + new Interval(intervalBegin, intervalBegin + duration) + } +} + + diff --git a/streaming/src/main/scala/org/apache/spark/streaming/Job.scala b/streaming/src/main/scala/org/apache/spark/streaming/Job.scala new file mode 100644 index 0000000000..2128b7c7a6 --- /dev/null +++ b/streaming/src/main/scala/org/apache/spark/streaming/Job.scala @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.streaming + +import java.util.concurrent.atomic.AtomicLong + +private[streaming] +class Job(val time: Time, func: () => _) { + val id = Job.getNewId() + def run(): Long = { + val startTime = System.currentTimeMillis + func() + val stopTime = System.currentTimeMillis + (stopTime - startTime) + } + + override def toString = "streaming job " + id + " @ " + time +} + +private[streaming] +object Job { + val id = new AtomicLong(0) + + def getNewId() = id.getAndIncrement() +} + diff --git a/streaming/src/main/scala/org/apache/spark/streaming/JobManager.scala b/streaming/src/main/scala/org/apache/spark/streaming/JobManager.scala new file mode 100644 index 0000000000..5233129506 --- /dev/null +++ b/streaming/src/main/scala/org/apache/spark/streaming/JobManager.scala @@ -0,0 +1,88 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.streaming + +import org.apache.spark.Logging +import org.apache.spark.SparkEnv +import java.util.concurrent.Executors +import collection.mutable.HashMap +import collection.mutable.ArrayBuffer + + +private[streaming] +class JobManager(ssc: StreamingContext, numThreads: Int = 1) extends Logging { + + class JobHandler(ssc: StreamingContext, job: Job) extends Runnable { + def run() { + SparkEnv.set(ssc.env) + try { + val timeTaken = job.run() + logInfo("Total delay: %.5f s for job %s of time %s (execution: %.5f s)".format( + (System.currentTimeMillis() - job.time.milliseconds) / 1000.0, job.id, job.time.milliseconds, timeTaken / 1000.0)) + } catch { + case e: Exception => + logError("Running " + job + " failed", e) + } + clearJob(job) + } + } + + initLogging() + + val jobExecutor = Executors.newFixedThreadPool(numThreads) + val jobs = new HashMap[Time, ArrayBuffer[Job]] + + def runJob(job: Job) { + jobs.synchronized { + jobs.getOrElseUpdate(job.time, new ArrayBuffer[Job]) += job + } + jobExecutor.execute(new JobHandler(ssc, job)) + logInfo("Added " + job + " to queue") + } + + def stop() { + jobExecutor.shutdown() + } + + private def clearJob(job: Job) { + var timeCleared = false + val time = job.time + jobs.synchronized { + val jobsOfTime = jobs.get(time) + if (jobsOfTime.isDefined) { + jobsOfTime.get -= job + if (jobsOfTime.get.isEmpty) { + jobs -= time + timeCleared = true + } + } else { + throw new Exception("Job finished for time " + job.time + + " but time does not exist in jobs") + } + } + if (timeCleared) { + ssc.scheduler.clearOldMetadata(time) + } + } + + def getPendingTimes(): Array[Time] = { + jobs.synchronized { + jobs.keySet.toArray + } + } +} diff --git a/streaming/src/main/scala/org/apache/spark/streaming/NetworkInputTracker.scala b/streaming/src/main/scala/org/apache/spark/streaming/NetworkInputTracker.scala new file mode 100644 index 0000000000..aae79a4e6f --- /dev/null +++ b/streaming/src/main/scala/org/apache/spark/streaming/NetworkInputTracker.scala @@ -0,0 +1,173 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.streaming + +import org.apache.spark.streaming.dstream.{NetworkInputDStream, NetworkReceiver} +import org.apache.spark.streaming.dstream.{StopReceiver, ReportBlock, ReportError} +import org.apache.spark.Logging +import org.apache.spark.SparkEnv +import org.apache.spark.SparkContext._ + +import scala.collection.mutable.HashMap +import scala.collection.mutable.Queue + +import akka.actor._ +import akka.pattern.ask +import akka.util.duration._ +import akka.dispatch._ + +private[streaming] sealed trait NetworkInputTrackerMessage +private[streaming] case class RegisterReceiver(streamId: Int, receiverActor: ActorRef) extends NetworkInputTrackerMessage +private[streaming] case class AddBlocks(streamId: Int, blockIds: Seq[String], metadata: Any) extends NetworkInputTrackerMessage +private[streaming] case class DeregisterReceiver(streamId: Int, msg: String) extends NetworkInputTrackerMessage + +/** + * This class manages the execution of the receivers of NetworkInputDStreams. + */ +private[streaming] +class NetworkInputTracker( + @transient ssc: StreamingContext, + @transient networkInputStreams: Array[NetworkInputDStream[_]]) + extends Logging { + + val networkInputStreamMap = Map(networkInputStreams.map(x => (x.id, x)): _*) + val receiverExecutor = new ReceiverExecutor() + val receiverInfo = new HashMap[Int, ActorRef] + val receivedBlockIds = new HashMap[Int, Queue[String]] + val timeout = 5000.milliseconds + + var currentTime: Time = null + + /** Start the actor and receiver execution thread. */ + def start() { + ssc.env.actorSystem.actorOf(Props(new NetworkInputTrackerActor), "NetworkInputTracker") + receiverExecutor.start() + } + + /** Stop the receiver execution thread. */ + def stop() { + // TODO: stop the actor as well + receiverExecutor.interrupt() + receiverExecutor.stopReceivers() + } + + /** Return all the blocks received from a receiver. */ + def getBlockIds(receiverId: Int, time: Time): Array[String] = synchronized { + val queue = receivedBlockIds.synchronized { + receivedBlockIds.getOrElse(receiverId, new Queue[String]()) + } + val result = queue.synchronized { + queue.dequeueAll(x => true) + } + logInfo("Stream " + receiverId + " received " + result.size + " blocks") + result.toArray + } + + /** Actor to receive messages from the receivers. */ + private class NetworkInputTrackerActor extends Actor { + def receive = { + case RegisterReceiver(streamId, receiverActor) => { + if (!networkInputStreamMap.contains(streamId)) { + throw new Exception("Register received for unexpected id " + streamId) + } + receiverInfo += ((streamId, receiverActor)) + logInfo("Registered receiver for network stream " + streamId + " from " + sender.path.address) + sender ! true + } + case AddBlocks(streamId, blockIds, metadata) => { + val tmp = receivedBlockIds.synchronized { + if (!receivedBlockIds.contains(streamId)) { + receivedBlockIds += ((streamId, new Queue[String])) + } + receivedBlockIds(streamId) + } + tmp.synchronized { + tmp ++= blockIds + } + networkInputStreamMap(streamId).addMetadata(metadata) + } + case DeregisterReceiver(streamId, msg) => { + receiverInfo -= streamId + logError("De-registered receiver for network stream " + streamId + + " with message " + msg) + //TODO: Do something about the corresponding NetworkInputDStream + } + } + } + + /** This thread class runs all the receivers on the cluster. */ + class ReceiverExecutor extends Thread { + val env = ssc.env + + override def run() { + try { + SparkEnv.set(env) + startReceivers() + } catch { + case ie: InterruptedException => logInfo("ReceiverExecutor interrupted") + } finally { + stopReceivers() + } + } + + /** + * Get the receivers from the NetworkInputDStreams, distributes them to the + * worker nodes as a parallel collection, and runs them. + */ + def startReceivers() { + val receivers = networkInputStreams.map(nis => { + val rcvr = nis.getReceiver() + rcvr.setStreamId(nis.id) + rcvr + }) + + // Right now, we only honor preferences if all receivers have them + val hasLocationPreferences = receivers.map(_.getLocationPreference().isDefined).reduce(_ && _) + + // Create the parallel collection of receivers to distributed them on the worker nodes + val tempRDD = + if (hasLocationPreferences) { + val receiversWithPreferences = receivers.map(r => (r, Seq(r.getLocationPreference().toString))) + ssc.sc.makeRDD[NetworkReceiver[_]](receiversWithPreferences) + } + else { + ssc.sc.makeRDD(receivers, receivers.size) + } + + // Function to start the receiver on the worker node + val startReceiver = (iterator: Iterator[NetworkReceiver[_]]) => { + if (!iterator.hasNext) { + throw new Exception("Could not start receiver as details not found.") + } + iterator.next().start() + } + // Run the dummy Spark job to ensure that all slaves have registered. + // This avoids all the receivers to be scheduled on the same node. + ssc.sparkContext.makeRDD(1 to 50, 50).map(x => (x, 1)).reduceByKey(_ + _, 20).collect() + + // Distribute the receivers and start them + ssc.sparkContext.runJob(tempRDD, startReceiver) + } + + /** Stops the receivers. */ + def stopReceivers() { + // Signal the receivers to stop + receiverInfo.values.foreach(_ ! StopReceiver) + } + } +} diff --git a/streaming/src/main/scala/org/apache/spark/streaming/PairDStreamFunctions.scala b/streaming/src/main/scala/org/apache/spark/streaming/PairDStreamFunctions.scala new file mode 100644 index 0000000000..d8a7381e87 --- /dev/null +++ b/streaming/src/main/scala/org/apache/spark/streaming/PairDStreamFunctions.scala @@ -0,0 +1,534 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.streaming + +import org.apache.spark.streaming.StreamingContext._ +import org.apache.spark.streaming.dstream.{ReducedWindowedDStream, StateDStream} +import org.apache.spark.streaming.dstream.{CoGroupedDStream, ShuffledDStream} +import org.apache.spark.streaming.dstream.{MapValuedDStream, FlatMapValuedDStream} + +import org.apache.spark.{Manifests, RDD, Partitioner, HashPartitioner} +import org.apache.spark.SparkContext._ +import org.apache.spark.storage.StorageLevel + +import scala.collection.mutable.ArrayBuffer + +import org.apache.hadoop.mapred.{JobConf, OutputFormat} +import org.apache.hadoop.mapreduce.{OutputFormat => NewOutputFormat} +import org.apache.hadoop.mapred.OutputFormat +import org.apache.hadoop.security.UserGroupInformation +import org.apache.hadoop.conf.Configuration + +class PairDStreamFunctions[K: ClassManifest, V: ClassManifest](self: DStream[(K,V)]) +extends Serializable { + + private[streaming] def ssc = self.ssc + + private[streaming] def defaultPartitioner(numPartitions: Int = self.ssc.sc.defaultParallelism) = { + new HashPartitioner(numPartitions) + } + + /** + * Return a new DStream by applying `groupByKey` to each RDD. Hash partitioning is used to + * generate the RDDs with Spark's default number of partitions. + */ + def groupByKey(): DStream[(K, Seq[V])] = { + groupByKey(defaultPartitioner()) + } + + /** + * Return a new DStream by applying `groupByKey` to each RDD. Hash partitioning is used to + * generate the RDDs with `numPartitions` partitions. + */ + def groupByKey(numPartitions: Int): DStream[(K, Seq[V])] = { + groupByKey(defaultPartitioner(numPartitions)) + } + + /** + * Return a new DStream by applying `groupByKey` on each RDD. The supplied [[org.apache.spark.Partitioner]] + * is used to control the partitioning of each RDD. + */ + def groupByKey(partitioner: Partitioner): DStream[(K, Seq[V])] = { + val createCombiner = (v: V) => ArrayBuffer[V](v) + val mergeValue = (c: ArrayBuffer[V], v: V) => (c += v) + val mergeCombiner = (c1: ArrayBuffer[V], c2: ArrayBuffer[V]) => (c1 ++ c2) + combineByKey(createCombiner, mergeValue, mergeCombiner, partitioner) + .asInstanceOf[DStream[(K, Seq[V])]] + } + + /** + * Return a new DStream by applying `reduceByKey` to each RDD. The values for each key are + * merged using the associative reduce function. Hash partitioning is used to generate the RDDs + * with Spark's default number of partitions. + */ + def reduceByKey(reduceFunc: (V, V) => V): DStream[(K, V)] = { + reduceByKey(reduceFunc, defaultPartitioner()) + } + + /** + * Return a new DStream by applying `reduceByKey` to each RDD. The values for each key are + * merged using the supplied reduce function. Hash partitioning is used to generate the RDDs + * with `numPartitions` partitions. + */ + def reduceByKey(reduceFunc: (V, V) => V, numPartitions: Int): DStream[(K, V)] = { + reduceByKey(reduceFunc, defaultPartitioner(numPartitions)) + } + + /** + * Return a new DStream by applying `reduceByKey` to each RDD. The values for each key are + * merged using the supplied reduce function. [[org.apache.spark.Partitioner]] is used to control the + * partitioning of each RDD. + */ + def reduceByKey(reduceFunc: (V, V) => V, partitioner: Partitioner): DStream[(K, V)] = { + val cleanedReduceFunc = ssc.sc.clean(reduceFunc) + combineByKey((v: V) => v, cleanedReduceFunc, cleanedReduceFunc, partitioner) + } + + /** + * Combine elements of each key in DStream's RDDs using custom functions. This is similar to the + * combineByKey for RDDs. Please refer to combineByKey in [[org.apache.spark.PairRDDFunctions]] for more + * information. + */ + def combineByKey[C: ClassManifest]( + createCombiner: V => C, + mergeValue: (C, V) => C, + mergeCombiner: (C, C) => C, + partitioner: Partitioner) : DStream[(K, C)] = { + new ShuffledDStream[K, V, C](self, createCombiner, mergeValue, mergeCombiner, partitioner) + } + + /** + * Return a new DStream by applying `groupByKey` over a sliding window. This is similar to + * `DStream.groupByKey()` but applies it over a sliding window. The new DStream generates RDDs + * with the same interval as this DStream. Hash partitioning is used to generate the RDDs with + * Spark's default number of partitions. + * @param windowDuration width of the window; must be a multiple of this DStream's + * batching interval + */ + def groupByKeyAndWindow(windowDuration: Duration): DStream[(K, Seq[V])] = { + groupByKeyAndWindow(windowDuration, self.slideDuration, defaultPartitioner()) + } + + /** + * Return a new DStream by applying `groupByKey` over a sliding window. Similar to + * `DStream.groupByKey()`, but applies it over a sliding window. Hash partitioning is used to + * generate the RDDs with Spark's default number of partitions. + * @param windowDuration width of the window; must be a multiple of this DStream's + * batching interval + * @param slideDuration sliding interval of the window (i.e., the interval after which + * the new DStream will generate RDDs); must be a multiple of this + * DStream's batching interval + */ + def groupByKeyAndWindow(windowDuration: Duration, slideDuration: Duration): DStream[(K, Seq[V])] = { + groupByKeyAndWindow(windowDuration, slideDuration, defaultPartitioner()) + } + + /** + * Return a new DStream by applying `groupByKey` over a sliding window on `this` DStream. + * Similar to `DStream.groupByKey()`, but applies it over a sliding window. + * Hash partitioning is used to generate the RDDs with `numPartitions` partitions. + * @param windowDuration width of the window; must be a multiple of this DStream's + * batching interval + * @param slideDuration sliding interval of the window (i.e., the interval after which + * the new DStream will generate RDDs); must be a multiple of this + * DStream's batching interval + * @param numPartitions number of partitions of each RDD in the new DStream; if not specified + * then Spark's default number of partitions will be used + */ + def groupByKeyAndWindow( + windowDuration: Duration, + slideDuration: Duration, + numPartitions: Int + ): DStream[(K, Seq[V])] = { + groupByKeyAndWindow(windowDuration, slideDuration, defaultPartitioner(numPartitions)) + } + + /** + * Create a new DStream by applying `groupByKey` over a sliding window on `this` DStream. + * Similar to `DStream.groupByKey()`, but applies it over a sliding window. + * @param windowDuration width of the window; must be a multiple of this DStream's + * batching interval + * @param slideDuration sliding interval of the window (i.e., the interval after which + * the new DStream will generate RDDs); must be a multiple of this + * DStream's batching interval + * @param partitioner partitioner for controlling the partitioning of each RDD in the new DStream. + */ + def groupByKeyAndWindow( + windowDuration: Duration, + slideDuration: Duration, + partitioner: Partitioner + ): DStream[(K, Seq[V])] = { + self.window(windowDuration, slideDuration).groupByKey(partitioner) + } + + /** + * Return a new DStream by applying `reduceByKey` over a sliding window on `this` DStream. + * Similar to `DStream.reduceByKey()`, but applies it over a sliding window. The new DStream + * generates RDDs with the same interval as this DStream. Hash partitioning is used to generate + * the RDDs with Spark's default number of partitions. + * @param reduceFunc associative reduce function + * @param windowDuration width of the window; must be a multiple of this DStream's + * batching interval + */ + def reduceByKeyAndWindow( + reduceFunc: (V, V) => V, + windowDuration: Duration + ): DStream[(K, V)] = { + reduceByKeyAndWindow(reduceFunc, windowDuration, self.slideDuration, defaultPartitioner()) + } + + /** + * Return a new DStream by applying `reduceByKey` over a sliding window. This is similar to + * `DStream.reduceByKey()` but applies it over a sliding window. Hash partitioning is used to + * generate the RDDs with Spark's default number of partitions. + * @param reduceFunc associative reduce function + * @param windowDuration width of the window; must be a multiple of this DStream's + * batching interval + * @param slideDuration sliding interval of the window (i.e., the interval after which + * the new DStream will generate RDDs); must be a multiple of this + * DStream's batching interval + */ + def reduceByKeyAndWindow( + reduceFunc: (V, V) => V, + windowDuration: Duration, + slideDuration: Duration + ): DStream[(K, V)] = { + reduceByKeyAndWindow(reduceFunc, windowDuration, slideDuration, defaultPartitioner()) + } + + /** + * Return a new DStream by applying `reduceByKey` over a sliding window. This is similar to + * `DStream.reduceByKey()` but applies it over a sliding window. Hash partitioning is used to + * generate the RDDs with `numPartitions` partitions. + * @param reduceFunc associative reduce function + * @param windowDuration width of the window; must be a multiple of this DStream's + * batching interval + * @param slideDuration sliding interval of the window (i.e., the interval after which + * the new DStream will generate RDDs); must be a multiple of this + * DStream's batching interval + * @param numPartitions number of partitions of each RDD in the new DStream. + */ + def reduceByKeyAndWindow( + reduceFunc: (V, V) => V, + windowDuration: Duration, + slideDuration: Duration, + numPartitions: Int + ): DStream[(K, V)] = { + reduceByKeyAndWindow(reduceFunc, windowDuration, slideDuration, defaultPartitioner(numPartitions)) + } + + /** + * Return a new DStream by applying `reduceByKey` over a sliding window. Similar to + * `DStream.reduceByKey()`, but applies it over a sliding window. + * @param reduceFunc associative reduce function + * @param windowDuration width of the window; must be a multiple of this DStream's + * batching interval + * @param slideDuration sliding interval of the window (i.e., the interval after which + * the new DStream will generate RDDs); must be a multiple of this + * DStream's batching interval + * @param partitioner partitioner for controlling the partitioning of each RDD + * in the new DStream. + */ + def reduceByKeyAndWindow( + reduceFunc: (V, V) => V, + windowDuration: Duration, + slideDuration: Duration, + partitioner: Partitioner + ): DStream[(K, V)] = { + val cleanedReduceFunc = ssc.sc.clean(reduceFunc) + self.reduceByKey(cleanedReduceFunc, partitioner) + .window(windowDuration, slideDuration) + .reduceByKey(cleanedReduceFunc, partitioner) + } + + /** + * Return a new DStream by applying incremental `reduceByKey` over a sliding window. + * The reduced value of over a new window is calculated using the old window's reduced value : + * 1. reduce the new values that entered the window (e.g., adding new counts) + * + * 2. "inverse reduce" the old values that left the window (e.g., subtracting old counts) + * + * This is more efficient than reduceByKeyAndWindow without "inverse reduce" function. + * However, it is applicable to only "invertible reduce functions". + * Hash partitioning is used to generate the RDDs with Spark's default number of partitions. + * @param reduceFunc associative reduce function + * @param invReduceFunc inverse reduce function + * @param windowDuration width of the window; must be a multiple of this DStream's + * batching interval + * @param slideDuration sliding interval of the window (i.e., the interval after which + * the new DStream will generate RDDs); must be a multiple of this + * DStream's batching interval + * @param filterFunc Optional function to filter expired key-value pairs; + * only pairs that satisfy the function are retained + */ + def reduceByKeyAndWindow( + reduceFunc: (V, V) => V, + invReduceFunc: (V, V) => V, + windowDuration: Duration, + slideDuration: Duration = self.slideDuration, + numPartitions: Int = ssc.sc.defaultParallelism, + filterFunc: ((K, V)) => Boolean = null + ): DStream[(K, V)] = { + + reduceByKeyAndWindow( + reduceFunc, invReduceFunc, windowDuration, + slideDuration, defaultPartitioner(numPartitions), filterFunc + ) + } + + /** + * Return a new DStream by applying incremental `reduceByKey` over a sliding window. + * The reduced value of over a new window is calculated using the old window's reduced value : + * 1. reduce the new values that entered the window (e.g., adding new counts) + * 2. "inverse reduce" the old values that left the window (e.g., subtracting old counts) + * This is more efficient than reduceByKeyAndWindow without "inverse reduce" function. + * However, it is applicable to only "invertible reduce functions". + * @param reduceFunc associative reduce function + * @param invReduceFunc inverse reduce function + * @param windowDuration width of the window; must be a multiple of this DStream's + * batching interval + * @param slideDuration sliding interval of the window (i.e., the interval after which + * the new DStream will generate RDDs); must be a multiple of this + * DStream's batching interval + * @param partitioner partitioner for controlling the partitioning of each RDD in the new DStream. + * @param filterFunc Optional function to filter expired key-value pairs; + * only pairs that satisfy the function are retained + */ + def reduceByKeyAndWindow( + reduceFunc: (V, V) => V, + invReduceFunc: (V, V) => V, + windowDuration: Duration, + slideDuration: Duration, + partitioner: Partitioner, + filterFunc: ((K, V)) => Boolean + ): DStream[(K, V)] = { + + val cleanedReduceFunc = ssc.sc.clean(reduceFunc) + val cleanedInvReduceFunc = ssc.sc.clean(invReduceFunc) + val cleanedFilterFunc = if (filterFunc != null) Some(ssc.sc.clean(filterFunc)) else None + new ReducedWindowedDStream[K, V]( + self, cleanedReduceFunc, cleanedInvReduceFunc, cleanedFilterFunc, + windowDuration, slideDuration, partitioner + ) + } + + /** + * Return a new "state" DStream where the state for each key is updated by applying + * the given function on the previous state of the key and the new values of each key. + * Hash partitioning is used to generate the RDDs with Spark's default number of partitions. + * @param updateFunc State update function. If `this` function returns None, then + * corresponding state key-value pair will be eliminated. + * @tparam S State type + */ + def updateStateByKey[S: ClassManifest]( + updateFunc: (Seq[V], Option[S]) => Option[S] + ): DStream[(K, S)] = { + updateStateByKey(updateFunc, defaultPartitioner()) + } + + /** + * Return a new "state" DStream where the state for each key is updated by applying + * the given function on the previous state of the key and the new values of each key. + * Hash partitioning is used to generate the RDDs with `numPartitions` partitions. + * @param updateFunc State update function. If `this` function returns None, then + * corresponding state key-value pair will be eliminated. + * @param numPartitions Number of partitions of each RDD in the new DStream. + * @tparam S State type + */ + def updateStateByKey[S: ClassManifest]( + updateFunc: (Seq[V], Option[S]) => Option[S], + numPartitions: Int + ): DStream[(K, S)] = { + updateStateByKey(updateFunc, defaultPartitioner(numPartitions)) + } + + /** + * Create a new "state" DStream where the state for each key is updated by applying + * the given function on the previous state of the key and the new values of the key. + * [[org.apache.spark.Partitioner]] is used to control the partitioning of each RDD. + * @param updateFunc State update function. If `this` function returns None, then + * corresponding state key-value pair will be eliminated. + * @param partitioner Partitioner for controlling the partitioning of each RDD in the new DStream. + * @tparam S State type + */ + def updateStateByKey[S: ClassManifest]( + updateFunc: (Seq[V], Option[S]) => Option[S], + partitioner: Partitioner + ): DStream[(K, S)] = { + val newUpdateFunc = (iterator: Iterator[(K, Seq[V], Option[S])]) => { + iterator.flatMap(t => updateFunc(t._2, t._3).map(s => (t._1, s))) + } + updateStateByKey(newUpdateFunc, partitioner, true) + } + + /** + * Return a new "state" DStream where the state for each key is updated by applying + * the given function on the previous state of the key and the new values of each key. + * [[org.apache.spark.Paxrtitioner]] is used to control the partitioning of each RDD. + * @param updateFunc State update function. If `this` function returns None, then + * corresponding state key-value pair will be eliminated. Note, that + * this function may generate a different a tuple with a different key + * than the input key. It is up to the developer to decide whether to + * remember the partitioner despite the key being changed. + * @param partitioner Partitioner for controlling the partitioning of each RDD in the new DStream. + * @param rememberPartitioner Whether to remember the paritioner object in the generated RDDs. + * @tparam S State type + */ + def updateStateByKey[S: ClassManifest]( + updateFunc: (Iterator[(K, Seq[V], Option[S])]) => Iterator[(K, S)], + partitioner: Partitioner, + rememberPartitioner: Boolean + ): DStream[(K, S)] = { + new StateDStream(self, ssc.sc.clean(updateFunc), partitioner, rememberPartitioner) + } + + + def mapValues[U: ClassManifest](mapValuesFunc: V => U): DStream[(K, U)] = { + new MapValuedDStream[K, V, U](self, mapValuesFunc) + } + + def flatMapValues[U: ClassManifest]( + flatMapValuesFunc: V => TraversableOnce[U] + ): DStream[(K, U)] = { + new FlatMapValuedDStream[K, V, U](self, flatMapValuesFunc) + } + + /** + * Cogroup `this` DStream with `other` DStream. For each key k in corresponding RDDs of `this` + * or `other` DStreams, the generated RDD will contains a tuple with the list of values for that + * key in both RDDs. HashPartitioner is used to partition each generated RDD into default number + * of partitions. + */ + def cogroup[W: ClassManifest](other: DStream[(K, W)]): DStream[(K, (Seq[V], Seq[W]))] = { + cogroup(other, defaultPartitioner()) + } + + /** + * Cogroup `this` DStream with `other` DStream using a partitioner. For each key k in corresponding RDDs of `this` + * or `other` DStreams, the generated RDD will contains a tuple with the list of values for that + * key in both RDDs. Partitioner is used to partition each generated RDD. + */ + def cogroup[W: ClassManifest]( + other: DStream[(K, W)], + partitioner: Partitioner + ): DStream[(K, (Seq[V], Seq[W]))] = { + + val cgd = new CoGroupedDStream[K]( + Seq(self.asInstanceOf[DStream[(K, _)]], other.asInstanceOf[DStream[(K, _)]]), + partitioner + ) + val pdfs = new PairDStreamFunctions[K, Seq[Seq[_]]](cgd)( + classManifest[K], + Manifests.seqSeqManifest + ) + pdfs.mapValues { + case Seq(vs, ws) => + (vs.asInstanceOf[Seq[V]], ws.asInstanceOf[Seq[W]]) + } + } + + /** + * Join `this` DStream with `other` DStream. HashPartitioner is used + * to partition each generated RDD into default number of partitions. + */ + def join[W: ClassManifest](other: DStream[(K, W)]): DStream[(K, (V, W))] = { + join[W](other, defaultPartitioner()) + } + + /** + * Join `this` DStream with `other` DStream, that is, each RDD of the new DStream will + * be generated by joining RDDs from `this` and other DStream. Uses the given + * Partitioner to partition each generated RDD. + */ + def join[W: ClassManifest]( + other: DStream[(K, W)], + partitioner: Partitioner + ): DStream[(K, (V, W))] = { + this.cogroup(other, partitioner) + .flatMapValues{ + case (vs, ws) => + for (v <- vs.iterator; w <- ws.iterator) yield (v, w) + } + } + + /** + * Save each RDD in `this` DStream as a Hadoop file. The file name at each batch interval is generated + * based on `prefix` and `suffix`: "prefix-TIME_IN_MS.suffix" + */ + def saveAsHadoopFiles[F <: OutputFormat[K, V]]( + prefix: String, + suffix: String + )(implicit fm: ClassManifest[F]) { + saveAsHadoopFiles(prefix, suffix, getKeyClass, getValueClass, fm.erasure.asInstanceOf[Class[F]]) + } + + /** + * Save each RDD in `this` DStream as a Hadoop file. The file name at each batch interval is generated + * based on `prefix` and `suffix`: "prefix-TIME_IN_MS.suffix" + */ + def saveAsHadoopFiles( + prefix: String, + suffix: String, + keyClass: Class[_], + valueClass: Class[_], + outputFormatClass: Class[_ <: OutputFormat[_, _]], + conf: JobConf = new JobConf + ) { + val saveFunc = (rdd: RDD[(K, V)], time: Time) => { + val file = rddToFileName(prefix, suffix, time) + rdd.saveAsHadoopFile(file, keyClass, valueClass, outputFormatClass, conf) + } + self.foreach(saveFunc) + } + + /** + * Save each RDD in `this` DStream as a Hadoop file. The file name at each batch interval is + * generated based on `prefix` and `suffix`: "prefix-TIME_IN_MS.suffix". + */ + def saveAsNewAPIHadoopFiles[F <: NewOutputFormat[K, V]]( + prefix: String, + suffix: String + )(implicit fm: ClassManifest[F]) { + saveAsNewAPIHadoopFiles(prefix, suffix, getKeyClass, getValueClass, fm.erasure.asInstanceOf[Class[F]]) + } + + /** + * Save each RDD in `this` DStream as a Hadoop file. The file name at each batch interval is + * generated based on `prefix` and `suffix`: "prefix-TIME_IN_MS.suffix". + */ + def saveAsNewAPIHadoopFiles( + prefix: String, + suffix: String, + keyClass: Class[_], + valueClass: Class[_], + outputFormatClass: Class[_ <: NewOutputFormat[_, _]], + conf: Configuration = new Configuration + ) { + val saveFunc = (rdd: RDD[(K, V)], time: Time) => { + val file = rddToFileName(prefix, suffix, time) + rdd.saveAsNewAPIHadoopFile(file, keyClass, valueClass, outputFormatClass, conf) + } + self.foreach(saveFunc) + } + + private def getKeyClass() = implicitly[ClassManifest[K]].erasure + + private def getValueClass() = implicitly[ClassManifest[V]].erasure +} + + diff --git a/streaming/src/main/scala/org/apache/spark/streaming/Scheduler.scala b/streaming/src/main/scala/org/apache/spark/streaming/Scheduler.scala new file mode 100644 index 0000000000..ed892e33e6 --- /dev/null +++ b/streaming/src/main/scala/org/apache/spark/streaming/Scheduler.scala @@ -0,0 +1,131 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.streaming + +import util.{ManualClock, RecurringTimer, Clock} +import org.apache.spark.SparkEnv +import org.apache.spark.Logging + +private[streaming] +class Scheduler(ssc: StreamingContext) extends Logging { + + initLogging() + + val concurrentJobs = System.getProperty("spark.streaming.concurrentJobs", "1").toInt + val jobManager = new JobManager(ssc, concurrentJobs) + val checkpointWriter = if (ssc.checkpointDuration != null && ssc.checkpointDir != null) { + new CheckpointWriter(ssc.checkpointDir) + } else { + null + } + + val clockClass = System.getProperty( + "spark.streaming.clock", "org.apache.spark.streaming.util.SystemClock") + val clock = Class.forName(clockClass).newInstance().asInstanceOf[Clock] + val timer = new RecurringTimer(clock, ssc.graph.batchDuration.milliseconds, + longTime => generateJobs(new Time(longTime))) + val graph = ssc.graph + var latestTime: Time = null + + def start() = synchronized { + if (ssc.isCheckpointPresent) { + restart() + } else { + startFirstTime() + } + logInfo("Scheduler started") + } + + def stop() = synchronized { + timer.stop() + jobManager.stop() + if (checkpointWriter != null) checkpointWriter.stop() + ssc.graph.stop() + logInfo("Scheduler stopped") + } + + private def startFirstTime() { + val startTime = new Time(timer.getStartTime()) + graph.start(startTime - graph.batchDuration) + timer.start(startTime.milliseconds) + logInfo("Scheduler's timer started at " + startTime) + } + + private def restart() { + + // If manual clock is being used for testing, then + // either set the manual clock to the last checkpointed time, + // or if the property is defined set it to that time + if (clock.isInstanceOf[ManualClock]) { + val lastTime = ssc.initialCheckpoint.checkpointTime.milliseconds + val jumpTime = System.getProperty("spark.streaming.manualClock.jump", "0").toLong + clock.asInstanceOf[ManualClock].setTime(lastTime + jumpTime) + } + + val batchDuration = ssc.graph.batchDuration + + // Batches when the master was down, that is, + // between the checkpoint and current restart time + val checkpointTime = ssc.initialCheckpoint.checkpointTime + val restartTime = new Time(timer.getRestartTime(graph.zeroTime.milliseconds)) + val downTimes = checkpointTime.until(restartTime, batchDuration) + logInfo("Batches during down time: " + downTimes.mkString(", ")) + + // Batches that were unprocessed before failure + val pendingTimes = ssc.initialCheckpoint.pendingTimes + logInfo("Batches pending processing: " + pendingTimes.mkString(", ")) + // Reschedule jobs for these times + val timesToReschedule = (pendingTimes ++ downTimes).distinct.sorted(Time.ordering) + logInfo("Batches to reschedule: " + timesToReschedule.mkString(", ")) + timesToReschedule.foreach(time => + graph.generateJobs(time).foreach(jobManager.runJob) + ) + + // Restart the timer + timer.start(restartTime.milliseconds) + logInfo("Scheduler's timer restarted at " + restartTime) + } + + /** Generate jobs and perform checkpoint for the given `time`. */ + def generateJobs(time: Time) { + SparkEnv.set(ssc.env) + logInfo("\n-----------------------------------------------------\n") + graph.generateJobs(time).foreach(jobManager.runJob) + latestTime = time + doCheckpoint(time) + } + + /** + * Clear old metadata assuming jobs of `time` have finished processing. + * And also perform checkpoint. + */ + def clearOldMetadata(time: Time) { + ssc.graph.clearOldMetadata(time) + doCheckpoint(time) + } + + /** Perform checkpoint for the give `time`. */ + def doCheckpoint(time: Time) = synchronized { + if (ssc.checkpointDuration != null && (time - graph.zeroTime).isMultipleOf(ssc.checkpointDuration)) { + logInfo("Checkpointing graph for time " + time) + ssc.graph.updateCheckpointData(time) + checkpointWriter.write(new Checkpoint(ssc, time)) + } + } +} + diff --git a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala new file mode 100644 index 0000000000..3852ac2dab --- /dev/null +++ b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala @@ -0,0 +1,563 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.streaming + +import akka.actor.Props +import akka.actor.SupervisorStrategy +import akka.zeromq.Subscribe + +import org.apache.spark.streaming.dstream._ + +import org.apache.spark._ +import org.apache.spark.streaming.receivers.ActorReceiver +import org.apache.spark.streaming.receivers.ReceiverSupervisorStrategy +import org.apache.spark.streaming.receivers.ZeroMQReceiver +import org.apache.spark.storage.StorageLevel +import org.apache.spark.util.MetadataCleaner +import org.apache.spark.streaming.receivers.ActorReceiver + +import scala.collection.mutable.Queue +import scala.collection.Map + +import java.io.InputStream +import java.util.concurrent.atomic.AtomicInteger +import java.util.UUID + +import org.apache.hadoop.io.LongWritable +import org.apache.hadoop.io.Text +import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat} +import org.apache.hadoop.mapreduce.lib.input.TextInputFormat +import org.apache.hadoop.fs.Path +import twitter4j.Status +import twitter4j.auth.Authorization + + +/** + * A StreamingContext is the main entry point for Spark Streaming functionality. Besides the basic + * information (such as, cluster URL and job name) to internally create a SparkContext, it provides + * methods used to create DStream from various input sources. + */ +class StreamingContext private ( + sc_ : SparkContext, + cp_ : Checkpoint, + batchDur_ : Duration + ) extends Logging { + + /** + * Create a StreamingContext using an existing SparkContext. + * @param sparkContext Existing SparkContext + * @param batchDuration The time interval at which streaming data will be divided into batches + */ + def this(sparkContext: SparkContext, batchDuration: Duration) = { + this(sparkContext, null, batchDuration) + } + + /** + * Create a StreamingContext by providing the details necessary for creating a new SparkContext. + * @param master Cluster URL to connect to (e.g. mesos://host:port, spark://host:port, local[4]). + * @param appName A name for your job, to display on the cluster web UI + * @param batchDuration The time interval at which streaming data will be divided into batches + */ + def this( + master: String, + appName: String, + batchDuration: Duration, + sparkHome: String = null, + jars: Seq[String] = Nil, + environment: Map[String, String] = Map()) = { + this(StreamingContext.createNewSparkContext(master, appName, sparkHome, jars, environment), + null, batchDuration) + } + + + /** + * Re-create a StreamingContext from a checkpoint file. + * @param path Path either to the directory that was specified as the checkpoint directory, or + * to the checkpoint file 'graph' or 'graph.bk'. + */ + def this(path: String) = this(null, CheckpointReader.read(path), null) + + initLogging() + + if (sc_ == null && cp_ == null) { + throw new Exception("Spark Streaming cannot be initialized with " + + "both SparkContext and checkpoint as null") + } + + if (MetadataCleaner.getDelaySeconds < 0) { + throw new SparkException("Spark Streaming cannot be used without setting spark.cleaner.ttl; " + + "set this property before creating a SparkContext (use SPARK_JAVA_OPTS for the shell)") + } + + protected[streaming] val isCheckpointPresent = (cp_ != null) + + protected[streaming] val sc: SparkContext = { + if (isCheckpointPresent) { + new SparkContext(cp_.master, cp_.framework, cp_.sparkHome, cp_.jars, cp_.environment) + } else { + sc_ + } + } + + protected[streaming] val env = SparkEnv.get + + protected[streaming] val graph: DStreamGraph = { + if (isCheckpointPresent) { + cp_.graph.setContext(this) + cp_.graph.restoreCheckpointData() + cp_.graph + } else { + assert(batchDur_ != null, "Batch duration for streaming context cannot be null") + val newGraph = new DStreamGraph() + newGraph.setBatchDuration(batchDur_) + newGraph + } + } + + protected[streaming] val nextNetworkInputStreamId = new AtomicInteger(0) + protected[streaming] var networkInputTracker: NetworkInputTracker = null + + protected[streaming] var checkpointDir: String = { + if (isCheckpointPresent) { + sc.setCheckpointDir(StreamingContext.getSparkCheckpointDir(cp_.checkpointDir), true) + cp_.checkpointDir + } else { + null + } + } + + protected[streaming] var checkpointDuration: Duration = if (isCheckpointPresent) cp_.checkpointDuration else null + protected[streaming] var receiverJobThread: Thread = null + protected[streaming] var scheduler: Scheduler = null + + /** + * Return the associated Spark context + */ + def sparkContext = sc + + /** + * Set each DStreams in this context to remember RDDs it generated in the last given duration. + * DStreams remember RDDs only for a limited duration of time and releases them for garbage + * collection. This method allows the developer to specify how to long to remember the RDDs ( + * if the developer wishes to query old data outside the DStream computation). + * @param duration Minimum duration that each DStream should remember its RDDs + */ + def remember(duration: Duration) { + graph.remember(duration) + } + + /** + * Set the context to periodically checkpoint the DStream operations for master + * fault-tolerance. The graph will be checkpointed every batch interval. + * @param directory HDFS-compatible directory where the checkpoint data will be reliably stored + */ + def checkpoint(directory: String) { + if (directory != null) { + sc.setCheckpointDir(StreamingContext.getSparkCheckpointDir(directory)) + checkpointDir = directory + } else { + checkpointDir = null + } + } + + protected[streaming] def initialCheckpoint: Checkpoint = { + if (isCheckpointPresent) cp_ else null + } + + protected[streaming] def getNewNetworkStreamId() = nextNetworkInputStreamId.getAndIncrement() + + /** + * Create an input stream with any arbitrary user implemented network receiver. + * Find more details at: http://spark-project.org/docs/latest/streaming-custom-receivers.html + * @param receiver Custom implementation of NetworkReceiver + */ + def networkStream[T: ClassManifest]( + receiver: NetworkReceiver[T]): DStream[T] = { + val inputStream = new PluggableInputDStream[T](this, + receiver) + graph.addInputStream(inputStream) + inputStream + } + + /** + * Create an input stream with any arbitrary user implemented actor receiver. + * Find more details at: http://spark-project.org/docs/latest/streaming-custom-receivers.html + * @param props Props object defining creation of the actor + * @param name Name of the actor + * @param storageLevel RDD storage level. Defaults to memory-only. + * + * @note An important point to note: + * Since Actor may exist outside the spark framework, It is thus user's responsibility + * to ensure the type safety, i.e parametrized type of data received and actorStream + * should be same. + */ + def actorStream[T: ClassManifest]( + props: Props, + name: String, + storageLevel: StorageLevel = StorageLevel.MEMORY_ONLY_SER_2, + supervisorStrategy: SupervisorStrategy = ReceiverSupervisorStrategy.defaultStrategy + ): DStream[T] = { + networkStream(new ActorReceiver[T](props, name, storageLevel, supervisorStrategy)) + } + + /** + * Create an input stream that receives messages pushed by a zeromq publisher. + * @param publisherUrl Url of remote zeromq publisher + * @param subscribe topic to subscribe to + * @param bytesToObjects A zeroMQ stream publishes sequence of frames for each topic + * and each frame has sequence of byte thus it needs the converter + * (which might be deserializer of bytes) to translate from sequence + * of sequence of bytes, where sequence refer to a frame + * and sub sequence refer to its payload. + * @param storageLevel RDD storage level. Defaults to memory-only. + */ + def zeroMQStream[T: ClassManifest]( + publisherUrl:String, + subscribe: Subscribe, + bytesToObjects: Seq[Seq[Byte]] ⇒ Iterator[T], + storageLevel: StorageLevel = StorageLevel.MEMORY_ONLY_SER_2, + supervisorStrategy: SupervisorStrategy = ReceiverSupervisorStrategy.defaultStrategy + ): DStream[T] = { + actorStream(Props(new ZeroMQReceiver(publisherUrl,subscribe,bytesToObjects)), + "ZeroMQReceiver", storageLevel, supervisorStrategy) + } + + /** + * Create an input stream that pulls messages from a Kafka Broker. + * @param zkQuorum Zookeper quorum (hostname:port,hostname:port,..). + * @param groupId The group id for this consumer. + * @param topics Map of (topic_name -> numPartitions) to consume. Each partition is consumed + * in its own thread. + * @param storageLevel Storage level to use for storing the received objects + * (default: StorageLevel.MEMORY_AND_DISK_SER_2) + */ + def kafkaStream( + zkQuorum: String, + groupId: String, + topics: Map[String, Int], + storageLevel: StorageLevel = StorageLevel.MEMORY_ONLY_SER_2 + ): DStream[String] = { + val kafkaParams = Map[String, String]( + "zk.connect" -> zkQuorum, "groupid" -> groupId, "zk.connectiontimeout.ms" -> "10000") + kafkaStream[String, kafka.serializer.StringDecoder](kafkaParams, topics, storageLevel) + } + + /** + * Create an input stream that pulls messages from a Kafka Broker. + * @param kafkaParams Map of kafka configuration paramaters. + * See: http://kafka.apache.org/configuration.html + * @param topics Map of (topic_name -> numPartitions) to consume. Each partition is consumed + * in its own thread. + * @param storageLevel Storage level to use for storing the received objects + */ + def kafkaStream[T: ClassManifest, D <: kafka.serializer.Decoder[_]: Manifest]( + kafkaParams: Map[String, String], + topics: Map[String, Int], + storageLevel: StorageLevel + ): DStream[T] = { + val inputStream = new KafkaInputDStream[T, D](this, kafkaParams, topics, storageLevel) + registerInputStream(inputStream) + inputStream + } + + /** + * Create a input stream from TCP source hostname:port. Data is received using + * a TCP socket and the receive bytes is interpreted as UTF8 encoded `\n` delimited + * lines. + * @param hostname Hostname to connect to for receiving data + * @param port Port to connect to for receiving data + * @param storageLevel Storage level to use for storing the received objects + * (default: StorageLevel.MEMORY_AND_DISK_SER_2) + */ + def socketTextStream( + hostname: String, + port: Int, + storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2 + ): DStream[String] = { + socketStream[String](hostname, port, SocketReceiver.bytesToLines, storageLevel) + } + + /** + * Create a input stream from TCP source hostname:port. Data is received using + * a TCP socket and the receive bytes it interepreted as object using the given + * converter. + * @param hostname Hostname to connect to for receiving data + * @param port Port to connect to for receiving data + * @param converter Function to convert the byte stream to objects + * @param storageLevel Storage level to use for storing the received objects + * @tparam T Type of the objects received (after converting bytes to objects) + */ + def socketStream[T: ClassManifest]( + hostname: String, + port: Int, + converter: (InputStream) => Iterator[T], + storageLevel: StorageLevel + ): DStream[T] = { + val inputStream = new SocketInputDStream[T](this, hostname, port, converter, storageLevel) + registerInputStream(inputStream) + inputStream + } + + /** + * Create a input stream from a Flume source. + * @param hostname Hostname of the slave machine to which the flume data will be sent + * @param port Port of the slave machine to which the flume data will be sent + * @param storageLevel Storage level to use for storing the received objects + */ + def flumeStream ( + hostname: String, + port: Int, + storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2 + ): DStream[SparkFlumeEvent] = { + val inputStream = new FlumeInputDStream(this, hostname, port, storageLevel) + registerInputStream(inputStream) + inputStream + } + + /** + * Create a input stream from network source hostname:port, where data is received + * as serialized blocks (serialized using the Spark's serializer) that can be directly + * pushed into the block manager without deserializing them. This is the most efficient + * way to receive data. + * @param hostname Hostname to connect to for receiving data + * @param port Port to connect to for receiving data + * @param storageLevel Storage level to use for storing the received objects + * @tparam T Type of the objects in the received blocks + */ + def rawSocketStream[T: ClassManifest]( + hostname: String, + port: Int, + storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2 + ): DStream[T] = { + val inputStream = new RawInputDStream[T](this, hostname, port, storageLevel) + registerInputStream(inputStream) + inputStream + } + + /** + * Create a input stream that monitors a Hadoop-compatible filesystem + * for new files and reads them using the given key-value types and input format. + * File names starting with . are ignored. + * @param directory HDFS directory to monitor for new file + * @tparam K Key type for reading HDFS file + * @tparam V Value type for reading HDFS file + * @tparam F Input format for reading HDFS file + */ + def fileStream[ + K: ClassManifest, + V: ClassManifest, + F <: NewInputFormat[K, V]: ClassManifest + ] (directory: String): DStream[(K, V)] = { + val inputStream = new FileInputDStream[K, V, F](this, directory) + registerInputStream(inputStream) + inputStream + } + + /** + * Create a input stream that monitors a Hadoop-compatible filesystem + * for new files and reads them using the given key-value types and input format. + * @param directory HDFS directory to monitor for new file + * @param filter Function to filter paths to process + * @param newFilesOnly Should process only new files and ignore existing files in the directory + * @tparam K Key type for reading HDFS file + * @tparam V Value type for reading HDFS file + * @tparam F Input format for reading HDFS file + */ + def fileStream[ + K: ClassManifest, + V: ClassManifest, + F <: NewInputFormat[K, V]: ClassManifest + ] (directory: String, filter: Path => Boolean, newFilesOnly: Boolean): DStream[(K, V)] = { + val inputStream = new FileInputDStream[K, V, F](this, directory, filter, newFilesOnly) + registerInputStream(inputStream) + inputStream + } + + /** + * Create a input stream that monitors a Hadoop-compatible filesystem + * for new files and reads them as text files (using key as LongWritable, value + * as Text and input format as TextInputFormat). File names starting with . are ignored. + * @param directory HDFS directory to monitor for new file + */ + def textFileStream(directory: String): DStream[String] = { + fileStream[LongWritable, Text, TextInputFormat](directory).map(_._2.toString) + } + + /** + * Create a input stream that returns tweets received from Twitter. + * @param twitterAuth Twitter4J authentication, or None to use Twitter4J's default OAuth + * authorization; this uses the system properties twitter4j.oauth.consumerKey, + * .consumerSecret, .accessToken and .accessTokenSecret. + * @param filters Set of filter strings to get only those tweets that match them + * @param storageLevel Storage level to use for storing the received objects + */ + def twitterStream( + twitterAuth: Option[Authorization] = None, + filters: Seq[String] = Nil, + storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2 + ): DStream[Status] = { + val inputStream = new TwitterInputDStream(this, twitterAuth, filters, storageLevel) + registerInputStream(inputStream) + inputStream + } + + /** + * Create an input stream from a queue of RDDs. In each batch, + * it will process either one or all of the RDDs returned by the queue. + * @param queue Queue of RDDs + * @param oneAtATime Whether only one RDD should be consumed from the queue in every interval + * @tparam T Type of objects in the RDD + */ + def queueStream[T: ClassManifest]( + queue: Queue[RDD[T]], + oneAtATime: Boolean = true + ): DStream[T] = { + queueStream(queue, oneAtATime, sc.makeRDD(Seq[T](), 1)) + } + + /** + * Create an input stream from a queue of RDDs. In each batch, + * it will process either one or all of the RDDs returned by the queue. + * @param queue Queue of RDDs + * @param oneAtATime Whether only one RDD should be consumed from the queue in every interval + * @param defaultRDD Default RDD is returned by the DStream when the queue is empty. + * Set as null if no RDD should be returned when empty + * @tparam T Type of objects in the RDD + */ + def queueStream[T: ClassManifest]( + queue: Queue[RDD[T]], + oneAtATime: Boolean, + defaultRDD: RDD[T] + ): DStream[T] = { + val inputStream = new QueueInputDStream(this, queue, oneAtATime, defaultRDD) + registerInputStream(inputStream) + inputStream + } + + /** + * Create a unified DStream from multiple DStreams of the same type and same interval + */ + def union[T: ClassManifest](streams: Seq[DStream[T]]): DStream[T] = { + new UnionDStream[T](streams.toArray) + } + + /** + * Register an input stream that will be started (InputDStream.start() called) to get the + * input data. + */ + def registerInputStream(inputStream: InputDStream[_]) { + graph.addInputStream(inputStream) + } + + /** + * Register an output stream that will be computed every interval + */ + def registerOutputStream(outputStream: DStream[_]) { + graph.addOutputStream(outputStream) + } + + protected def validate() { + assert(graph != null, "Graph is null") + graph.validate() + + assert( + checkpointDir == null || checkpointDuration != null, + "Checkpoint directory has been set, but the graph checkpointing interval has " + + "not been set. Please use StreamingContext.checkpoint() to set the interval." + ) + } + + /** + * Start the execution of the streams. + */ + def start() { + if (checkpointDir != null && checkpointDuration == null && graph != null) { + checkpointDuration = graph.batchDuration + } + + validate() + + val networkInputStreams = graph.getInputStreams().filter(s => s match { + case n: NetworkInputDStream[_] => true + case _ => false + }).map(_.asInstanceOf[NetworkInputDStream[_]]).toArray + + if (networkInputStreams.length > 0) { + // Start the network input tracker (must start before receivers) + networkInputTracker = new NetworkInputTracker(this, networkInputStreams) + networkInputTracker.start() + } + + Thread.sleep(1000) + + // Start the scheduler + scheduler = new Scheduler(this) + scheduler.start() + } + + /** + * Stop the execution of the streams. + */ + def stop() { + try { + if (scheduler != null) scheduler.stop() + if (networkInputTracker != null) networkInputTracker.stop() + if (receiverJobThread != null) receiverJobThread.interrupt() + sc.stop() + logInfo("StreamingContext stopped successfully") + } catch { + case e: Exception => logWarning("Error while stopping", e) + } + } +} + + +object StreamingContext { + + implicit def toPairDStreamFunctions[K: ClassManifest, V: ClassManifest](stream: DStream[(K,V)]) = { + new PairDStreamFunctions[K, V](stream) + } + + protected[streaming] def createNewSparkContext( + master: String, + appName: String, + sparkHome: String, + jars: Seq[String], + environment: Map[String, String]): SparkContext = { + // Set the default cleaner delay to an hour if not already set. + // This should be sufficient for even 1 second interval. + if (MetadataCleaner.getDelaySeconds < 0) { + MetadataCleaner.setDelaySeconds(3600) + } + new SparkContext(master, appName, sparkHome, jars, environment) + } + + protected[streaming] def rddToFileName[T](prefix: String, suffix: String, time: Time): String = { + if (prefix == null) { + time.milliseconds.toString + } else if (suffix == null || suffix.length ==0) { + prefix + "-" + time.milliseconds + } else { + prefix + "-" + time.milliseconds + "." + suffix + } + } + + protected[streaming] def getSparkCheckpointDir(sscCheckpointDir: String): String = { + new Path(sscCheckpointDir, UUID.randomUUID.toString).toString + } +} diff --git a/streaming/src/main/scala/org/apache/spark/streaming/Time.scala b/streaming/src/main/scala/org/apache/spark/streaming/Time.scala new file mode 100644 index 0000000000..2678334f53 --- /dev/null +++ b/streaming/src/main/scala/org/apache/spark/streaming/Time.scala @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.streaming + +/** + * This is a simple class that represents an absolute instant of time. + * Internally, it represents time as the difference, measured in milliseconds, between the current + * time and midnight, January 1, 1970 UTC. This is the same format as what is returned by + * System.currentTimeMillis. + */ +case class Time(private val millis: Long) { + + def milliseconds: Long = millis + + def < (that: Time): Boolean = (this.millis < that.millis) + + def <= (that: Time): Boolean = (this.millis <= that.millis) + + def > (that: Time): Boolean = (this.millis > that.millis) + + def >= (that: Time): Boolean = (this.millis >= that.millis) + + def + (that: Duration): Time = new Time(millis + that.milliseconds) + + def - (that: Time): Duration = new Duration(millis - that.millis) + + def - (that: Duration): Time = new Time(millis - that.milliseconds) + + def floor(that: Duration): Time = { + val t = that.milliseconds + val m = math.floor(this.millis / t).toLong + new Time(m * t) + } + + def isMultipleOf(that: Duration): Boolean = + (this.millis % that.milliseconds == 0) + + def min(that: Time): Time = if (this < that) this else that + + def max(that: Time): Time = if (this > that) this else that + + def until(that: Time, interval: Duration): Seq[Time] = { + (this.milliseconds) until (that.milliseconds) by (interval.milliseconds) map (new Time(_)) + } + + def to(that: Time, interval: Duration): Seq[Time] = { + (this.milliseconds) to (that.milliseconds) by (interval.milliseconds) map (new Time(_)) + } + + + override def toString: String = (millis.toString + " ms") + +} + +object Time { + val ordering = Ordering.by((time: Time) => time.millis) +} diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStream.scala new file mode 100644 index 0000000000..f8c8d8ece1 --- /dev/null +++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStream.scala @@ -0,0 +1,102 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.streaming.api.java + +import org.apache.spark.streaming.{Duration, Time, DStream} +import org.apache.spark.api.java.function.{Function => JFunction} +import org.apache.spark.api.java.JavaRDD +import org.apache.spark.storage.StorageLevel +import org.apache.spark.RDD + +/** + * A Discretized Stream (DStream), the basic abstraction in Spark Streaming, is a continuous + * sequence of RDDs (of the same type) representing a continuous stream of data (see [[org.apache.spark.RDD]] + * for more details on RDDs). DStreams can either be created from live data (such as, data from + * HDFS, Kafka or Flume) or it can be generated by transformation existing DStreams using operations + * such as `map`, `window` and `reduceByKeyAndWindow`. While a Spark Streaming program is running, each + * DStream periodically generates a RDD, either from live data or by transforming the RDD generated + * by a parent DStream. + * + * This class contains the basic operations available on all DStreams, such as `map`, `filter` and + * `window`. In addition, [[org.apache.spark.streaming.api.java.JavaPairDStream]] contains operations available + * only on DStreams of key-value pairs, such as `groupByKeyAndWindow` and `join`. + * + * DStreams internally is characterized by a few basic properties: + * - A list of other DStreams that the DStream depends on + * - A time interval at which the DStream generates an RDD + * - A function that is used to generate an RDD after each time interval + */ +class JavaDStream[T](val dstream: DStream[T])(implicit val classManifest: ClassManifest[T]) + extends JavaDStreamLike[T, JavaDStream[T], JavaRDD[T]] { + + override def wrapRDD(rdd: RDD[T]): JavaRDD[T] = JavaRDD.fromRDD(rdd) + + /** Return a new DStream containing only the elements that satisfy a predicate. */ + def filter(f: JFunction[T, java.lang.Boolean]): JavaDStream[T] = + dstream.filter((x => f(x).booleanValue())) + + /** Persist RDDs of this DStream with the default storage level (MEMORY_ONLY_SER) */ + def cache(): JavaDStream[T] = dstream.cache() + + /** Persist RDDs of this DStream with the default storage level (MEMORY_ONLY_SER) */ + def persist(): JavaDStream[T] = dstream.persist() + + /** Persist the RDDs of this DStream with the given storage level */ + def persist(storageLevel: StorageLevel): JavaDStream[T] = dstream.persist(storageLevel) + + /** Generate an RDD for the given duration */ + def compute(validTime: Time): JavaRDD[T] = { + dstream.compute(validTime) match { + case Some(rdd) => new JavaRDD(rdd) + case None => null + } + } + + /** + * Return a new DStream in which each RDD contains all the elements in seen in a + * sliding window of time over this DStream. The new DStream generates RDDs with + * the same interval as this DStream. + * @param windowDuration width of the window; must be a multiple of this DStream's interval. + */ + def window(windowDuration: Duration): JavaDStream[T] = + dstream.window(windowDuration) + + /** + * Return a new DStream in which each RDD contains all the elements in seen in a + * sliding window of time over this DStream. + * @param windowDuration width of the window; must be a multiple of this DStream's + * batching interval + * @param slideDuration sliding interval of the window (i.e., the interval after which + * the new DStream will generate RDDs); must be a multiple of this + * DStream's batching interval + */ + def window(windowDuration: Duration, slideDuration: Duration): JavaDStream[T] = + dstream.window(windowDuration, slideDuration) + + /** + * Return a new DStream by unifying data of another DStream with this DStream. + * @param that Another DStream having the same interval (i.e., slideDuration) as this DStream. + */ + def union(that: JavaDStream[T]): JavaDStream[T] = + dstream.union(that.dstream) +} + +object JavaDStream { + implicit def fromDStream[T: ClassManifest](dstream: DStream[T]): JavaDStream[T] = + new JavaDStream[T](dstream) +} diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStreamLike.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStreamLike.scala new file mode 100644 index 0000000000..2e6fe9a9c4 --- /dev/null +++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStreamLike.scala @@ -0,0 +1,316 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.streaming.api.java + +import java.util.{List => JList} +import java.lang.{Long => JLong} + +import scala.collection.JavaConversions._ + +import org.apache.spark.streaming._ +import org.apache.spark.api.java.{JavaPairRDD, JavaRDDLike, JavaRDD} +import org.apache.spark.api.java.function.{Function2 => JFunction2, Function => JFunction, _} +import java.util +import org.apache.spark.RDD +import JavaDStream._ + +trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This, R], R <: JavaRDDLike[T, R]] + extends Serializable { + implicit val classManifest: ClassManifest[T] + + def dstream: DStream[T] + + def wrapRDD(in: RDD[T]): R + + implicit def scalaIntToJavaLong(in: DStream[Long]): JavaDStream[JLong] = { + in.map(new JLong(_)) + } + + /** + * Print the first ten elements of each RDD generated in this DStream. This is an output + * operator, so this DStream will be registered as an output stream and there materialized. + */ + def print() = dstream.print() + + /** + * Return a new DStream in which each RDD has a single element generated by counting each RDD + * of this DStream. + */ + def count(): JavaDStream[JLong] = dstream.count() + + /** + * Return a new DStream in which each RDD contains the counts of each distinct value in + * each RDD of this DStream. Hash partitioning is used to generate the RDDs with + * Spark's default number of partitions. + */ + def countByValue(): JavaPairDStream[T, JLong] = { + JavaPairDStream.scalaToJavaLong(dstream.countByValue()) + } + + /** + * Return a new DStream in which each RDD contains the counts of each distinct value in + * each RDD of this DStream. Hash partitioning is used to generate the RDDs with `numPartitions` + * partitions. + * @param numPartitions number of partitions of each RDD in the new DStream. + */ + def countByValue(numPartitions: Int): JavaPairDStream[T, JLong] = { + JavaPairDStream.scalaToJavaLong(dstream.countByValue(numPartitions)) + } + + + /** + * Return a new DStream in which each RDD has a single element generated by counting the number + * of elements in a window over this DStream. windowDuration and slideDuration are as defined in the + * window() operation. This is equivalent to window(windowDuration, slideDuration).count() + */ + def countByWindow(windowDuration: Duration, slideDuration: Duration) : JavaDStream[JLong] = { + dstream.countByWindow(windowDuration, slideDuration) + } + + /** + * Return a new DStream in which each RDD contains the count of distinct elements in + * RDDs in a sliding window over this DStream. Hash partitioning is used to generate the RDDs with + * Spark's default number of partitions. + * @param windowDuration width of the window; must be a multiple of this DStream's + * batching interval + * @param slideDuration sliding interval of the window (i.e., the interval after which + * the new DStream will generate RDDs); must be a multiple of this + * DStream's batching interval + */ + def countByValueAndWindow(windowDuration: Duration, slideDuration: Duration) + : JavaPairDStream[T, JLong] = { + JavaPairDStream.scalaToJavaLong( + dstream.countByValueAndWindow(windowDuration, slideDuration)) + } + + /** + * Return a new DStream in which each RDD contains the count of distinct elements in + * RDDs in a sliding window over this DStream. Hash partitioning is used to generate the RDDs with `numPartitions` + * partitions. + * @param windowDuration width of the window; must be a multiple of this DStream's + * batching interval + * @param slideDuration sliding interval of the window (i.e., the interval after which + * the new DStream will generate RDDs); must be a multiple of this + * DStream's batching interval + * @param numPartitions number of partitions of each RDD in the new DStream. + */ + def countByValueAndWindow(windowDuration: Duration, slideDuration: Duration, numPartitions: Int) + : JavaPairDStream[T, JLong] = { + JavaPairDStream.scalaToJavaLong( + dstream.countByValueAndWindow(windowDuration, slideDuration, numPartitions)) + } + + /** + * Return a new DStream in which each RDD is generated by applying glom() to each RDD of + * this DStream. Applying glom() to an RDD coalesces all elements within each partition into + * an array. + */ + def glom(): JavaDStream[JList[T]] = + new JavaDStream(dstream.glom().map(x => new java.util.ArrayList[T](x.toSeq))) + + /** Return the StreamingContext associated with this DStream */ + def context(): StreamingContext = dstream.context() + + /** Return a new DStream by applying a function to all elements of this DStream. */ + def map[R](f: JFunction[T, R]): JavaDStream[R] = { + new JavaDStream(dstream.map(f)(f.returnType()))(f.returnType()) + } + + /** Return a new DStream by applying a function to all elements of this DStream. */ + def map[K2, V2](f: PairFunction[T, K2, V2]): JavaPairDStream[K2, V2] = { + def cm = implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[Tuple2[K2, V2]]] + new JavaPairDStream(dstream.map(f)(cm))(f.keyType(), f.valueType()) + } + + /** + * Return a new DStream by applying a function to all elements of this DStream, + * and then flattening the results + */ + def flatMap[U](f: FlatMapFunction[T, U]): JavaDStream[U] = { + import scala.collection.JavaConverters._ + def fn = (x: T) => f.apply(x).asScala + new JavaDStream(dstream.flatMap(fn)(f.elementType()))(f.elementType()) + } + + /** + * Return a new DStream by applying a function to all elements of this DStream, + * and then flattening the results + */ + def flatMap[K2, V2](f: PairFlatMapFunction[T, K2, V2]): JavaPairDStream[K2, V2] = { + import scala.collection.JavaConverters._ + def fn = (x: T) => f.apply(x).asScala + def cm = implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[Tuple2[K2, V2]]] + new JavaPairDStream(dstream.flatMap(fn)(cm))(f.keyType(), f.valueType()) + } + + /** + * Return a new DStream in which each RDD is generated by applying mapPartitions() to each RDDs + * of this DStream. Applying mapPartitions() to an RDD applies a function to each partition + * of the RDD. + */ + def mapPartitions[U](f: FlatMapFunction[java.util.Iterator[T], U]): JavaDStream[U] = { + def fn = (x: Iterator[T]) => asScalaIterator(f.apply(asJavaIterator(x)).iterator()) + new JavaDStream(dstream.mapPartitions(fn)(f.elementType()))(f.elementType()) + } + + /** + * Return a new DStream in which each RDD is generated by applying mapPartitions() to each RDDs + * of this DStream. Applying mapPartitions() to an RDD applies a function to each partition + * of the RDD. + */ + def mapPartitions[K2, V2](f: PairFlatMapFunction[java.util.Iterator[T], K2, V2]) + : JavaPairDStream[K2, V2] = { + def fn = (x: Iterator[T]) => asScalaIterator(f.apply(asJavaIterator(x)).iterator()) + new JavaPairDStream(dstream.mapPartitions(fn))(f.keyType(), f.valueType()) + } + + /** + * Return a new DStream in which each RDD has a single element generated by reducing each RDD + * of this DStream. + */ + def reduce(f: JFunction2[T, T, T]): JavaDStream[T] = dstream.reduce(f) + + /** + * Return a new DStream in which each RDD has a single element generated by reducing all + * elements in a sliding window over this DStream. + * @param reduceFunc associative reduce function + * @param windowDuration width of the window; must be a multiple of this DStream's + * batching interval + * @param slideDuration sliding interval of the window (i.e., the interval after which + * the new DStream will generate RDDs); must be a multiple of this + * DStream's batching interval + */ + def reduceByWindow( + reduceFunc: (T, T) => T, + windowDuration: Duration, + slideDuration: Duration + ): DStream[T] = { + dstream.reduceByWindow(reduceFunc, windowDuration, slideDuration) + } + + + /** + * Return a new DStream in which each RDD has a single element generated by reducing all + * elements in a sliding window over this DStream. However, the reduction is done incrementally + * using the old window's reduced value : + * 1. reduce the new values that entered the window (e.g., adding new counts) + * 2. "inverse reduce" the old values that left the window (e.g., subtracting old counts) + * This is more efficient than reduceByWindow without "inverse reduce" function. + * However, it is applicable to only "invertible reduce functions". + * @param reduceFunc associative reduce function + * @param invReduceFunc inverse reduce function + * @param windowDuration width of the window; must be a multiple of this DStream's + * batching interval + * @param slideDuration sliding interval of the window (i.e., the interval after which + * the new DStream will generate RDDs); must be a multiple of this + * DStream's batching interval + */ + def reduceByWindow( + reduceFunc: JFunction2[T, T, T], + invReduceFunc: JFunction2[T, T, T], + windowDuration: Duration, + slideDuration: Duration + ): JavaDStream[T] = { + dstream.reduceByWindow(reduceFunc, invReduceFunc, windowDuration, slideDuration) + } + + /** + * Return all the RDDs between 'fromDuration' to 'toDuration' (both included) + */ + def slice(fromTime: Time, toTime: Time): JList[R] = { + new util.ArrayList(dstream.slice(fromTime, toTime).map(wrapRDD(_)).toSeq) + } + + /** + * Apply a function to each RDD in this DStream. This is an output operator, so + * this DStream will be registered as an output stream and therefore materialized. + */ + def foreach(foreachFunc: JFunction[R, Void]) { + dstream.foreach(rdd => foreachFunc.call(wrapRDD(rdd))) + } + + /** + * Apply a function to each RDD in this DStream. This is an output operator, so + * this DStream will be registered as an output stream and therefore materialized. + */ + def foreach(foreachFunc: JFunction2[R, Time, Void]) { + dstream.foreach((rdd, time) => foreachFunc.call(wrapRDD(rdd), time)) + } + + /** + * Return a new DStream in which each RDD is generated by applying a function + * on each RDD of this DStream. + */ + def transform[U](transformFunc: JFunction[R, JavaRDD[U]]): JavaDStream[U] = { + implicit val cm: ClassManifest[U] = + implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[U]] + def scalaTransform (in: RDD[T]): RDD[U] = + transformFunc.call(wrapRDD(in)).rdd + dstream.transform(scalaTransform(_)) + } + + /** + * Return a new DStream in which each RDD is generated by applying a function + * on each RDD of this DStream. + */ + def transform[U](transformFunc: JFunction2[R, Time, JavaRDD[U]]): JavaDStream[U] = { + implicit val cm: ClassManifest[U] = + implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[U]] + def scalaTransform (in: RDD[T], time: Time): RDD[U] = + transformFunc.call(wrapRDD(in), time).rdd + dstream.transform(scalaTransform(_, _)) + } + + /** + * Return a new DStream in which each RDD is generated by applying a function + * on each RDD of this DStream. + */ + def transform[K2, V2](transformFunc: JFunction[R, JavaPairRDD[K2, V2]]): + JavaPairDStream[K2, V2] = { + implicit val cmk: ClassManifest[K2] = + implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[K2]] + implicit val cmv: ClassManifest[V2] = + implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[V2]] + def scalaTransform (in: RDD[T]): RDD[(K2, V2)] = + transformFunc.call(wrapRDD(in)).rdd + dstream.transform(scalaTransform(_)) + } + + /** + * Return a new DStream in which each RDD is generated by applying a function + * on each RDD of this DStream. + */ + def transform[K2, V2](transformFunc: JFunction2[R, Time, JavaPairRDD[K2, V2]]): + JavaPairDStream[K2, V2] = { + implicit val cmk: ClassManifest[K2] = + implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[K2]] + implicit val cmv: ClassManifest[V2] = + implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[V2]] + def scalaTransform (in: RDD[T], time: Time): RDD[(K2, V2)] = + transformFunc.call(wrapRDD(in), time).rdd + dstream.transform(scalaTransform(_, _)) + } + + /** + * Enable periodic checkpointing of RDDs of this DStream + * @param interval Time interval after which generated RDD will be checkpointed + */ + def checkpoint(interval: Duration) = { + dstream.checkpoint(interval) + } +} diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala new file mode 100644 index 0000000000..c203dccd17 --- /dev/null +++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala @@ -0,0 +1,613 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.streaming.api.java + +import java.util.{List => JList} +import java.lang.{Long => JLong} + +import scala.collection.JavaConversions._ + +import org.apache.spark.streaming._ +import org.apache.spark.streaming.StreamingContext._ +import org.apache.spark.api.java.function.{Function => JFunction, Function2 => JFunction2} +import org.apache.spark.{RDD, Partitioner} +import org.apache.hadoop.mapred.{JobConf, OutputFormat} +import org.apache.hadoop.mapreduce.{OutputFormat => NewOutputFormat} +import org.apache.hadoop.conf.Configuration +import org.apache.spark.api.java.{JavaUtils, JavaRDD, JavaPairRDD} +import org.apache.spark.storage.StorageLevel +import com.google.common.base.Optional +import org.apache.spark.RDD + +class JavaPairDStream[K, V](val dstream: DStream[(K, V)])( + implicit val kManifiest: ClassManifest[K], + implicit val vManifest: ClassManifest[V]) + extends JavaDStreamLike[(K, V), JavaPairDStream[K, V], JavaPairRDD[K, V]] { + + override def wrapRDD(rdd: RDD[(K, V)]): JavaPairRDD[K, V] = JavaPairRDD.fromRDD(rdd) + + // ======================================================================= + // Methods common to all DStream's + // ======================================================================= + + /** Return a new DStream containing only the elements that satisfy a predicate. */ + def filter(f: JFunction[(K, V), java.lang.Boolean]): JavaPairDStream[K, V] = + dstream.filter((x => f(x).booleanValue())) + + /** Persist RDDs of this DStream with the default storage level (MEMORY_ONLY_SER) */ + def cache(): JavaPairDStream[K, V] = dstream.cache() + + /** Persist RDDs of this DStream with the default storage level (MEMORY_ONLY_SER) */ + def persist(): JavaPairDStream[K, V] = dstream.persist() + + /** Persist the RDDs of this DStream with the given storage level */ + def persist(storageLevel: StorageLevel): JavaPairDStream[K, V] = dstream.persist(storageLevel) + + /** Method that generates a RDD for the given Duration */ + def compute(validTime: Time): JavaPairRDD[K, V] = { + dstream.compute(validTime) match { + case Some(rdd) => new JavaPairRDD(rdd) + case None => null + } + } + + /** + * Return a new DStream which is computed based on windowed batches of this DStream. + * The new DStream generates RDDs with the same interval as this DStream. + * @param windowDuration width of the window; must be a multiple of this DStream's interval. + * @return + */ + def window(windowDuration: Duration): JavaPairDStream[K, V] = + dstream.window(windowDuration) + + /** + * Return a new DStream which is computed based on windowed batches of this DStream. + * @param windowDuration duration (i.e., width) of the window; + * must be a multiple of this DStream's interval + * @param slideDuration sliding interval of the window (i.e., the interval after which + * the new DStream will generate RDDs); must be a multiple of this + * DStream's interval + */ + def window(windowDuration: Duration, slideDuration: Duration): JavaPairDStream[K, V] = + dstream.window(windowDuration, slideDuration) + + /** + * Return a new DStream by unifying data of another DStream with this DStream. + * @param that Another DStream having the same interval (i.e., slideDuration) as this DStream. + */ + def union(that: JavaPairDStream[K, V]): JavaPairDStream[K, V] = + dstream.union(that.dstream) + + // ======================================================================= + // Methods only for PairDStream's + // ======================================================================= + + /** + * Return a new DStream by applying `groupByKey` to each RDD. Hash partitioning is used to + * generate the RDDs with Spark's default number of partitions. + */ + def groupByKey(): JavaPairDStream[K, JList[V]] = + dstream.groupByKey().mapValues(seqAsJavaList _) + + /** + * Return a new DStream by applying `groupByKey` to each RDD. Hash partitioning is used to + * generate the RDDs with `numPartitions` partitions. + */ + def groupByKey(numPartitions: Int): JavaPairDStream[K, JList[V]] = + dstream.groupByKey(numPartitions).mapValues(seqAsJavaList _) + + /** + * Return a new DStream by applying `groupByKey` on each RDD of `this` DStream. + * Therefore, the values for each key in `this` DStream's RDDs are grouped into a + * single sequence to generate the RDDs of the new DStream. [[org.apache.spark.Partitioner]] + * is used to control the partitioning of each RDD. + */ + def groupByKey(partitioner: Partitioner): JavaPairDStream[K, JList[V]] = + dstream.groupByKey(partitioner).mapValues(seqAsJavaList _) + + /** + * Return a new DStream by applying `reduceByKey` to each RDD. The values for each key are + * merged using the associative reduce function. Hash partitioning is used to generate the RDDs + * with Spark's default number of partitions. + */ + def reduceByKey(func: JFunction2[V, V, V]): JavaPairDStream[K, V] = + dstream.reduceByKey(func) + + /** + * Return a new DStream by applying `reduceByKey` to each RDD. The values for each key are + * merged using the supplied reduce function. Hash partitioning is used to generate the RDDs + * with `numPartitions` partitions. + */ + def reduceByKey(func: JFunction2[V, V, V], numPartitions: Int): JavaPairDStream[K, V] = + dstream.reduceByKey(func, numPartitions) + + /** + * Return a new DStream by applying `reduceByKey` to each RDD. The values for each key are + * merged using the supplied reduce function. [[org.apache.spark.Partitioner]] is used to control the + * partitioning of each RDD. + */ + def reduceByKey(func: JFunction2[V, V, V], partitioner: Partitioner): JavaPairDStream[K, V] = { + dstream.reduceByKey(func, partitioner) + } + + /** + * Combine elements of each key in DStream's RDDs using custom function. This is similar to the + * combineByKey for RDDs. Please refer to combineByKey in [[org.apache.spark.PairRDDFunctions]] for more + * information. + */ + def combineByKey[C](createCombiner: JFunction[V, C], + mergeValue: JFunction2[C, V, C], + mergeCombiners: JFunction2[C, C, C], + partitioner: Partitioner + ): JavaPairDStream[K, C] = { + implicit val cm: ClassManifest[C] = + implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[C]] + dstream.combineByKey(createCombiner, mergeValue, mergeCombiners, partitioner) + } + + /** + * Return a new DStream by applying `groupByKey` over a sliding window. This is similar to + * `DStream.groupByKey()` but applies it over a sliding window. The new DStream generates RDDs + * with the same interval as this DStream. Hash partitioning is used to generate the RDDs with + * Spark's default number of partitions. + * @param windowDuration width of the window; must be a multiple of this DStream's + * batching interval + */ + def groupByKeyAndWindow(windowDuration: Duration): JavaPairDStream[K, JList[V]] = { + dstream.groupByKeyAndWindow(windowDuration).mapValues(seqAsJavaList _) + } + + /** + * Return a new DStream by applying `groupByKey` over a sliding window. Similar to + * `DStream.groupByKey()`, but applies it over a sliding window. Hash partitioning is used to + * generate the RDDs with Spark's default number of partitions. + * @param windowDuration width of the window; must be a multiple of this DStream's + * batching interval + * @param slideDuration sliding interval of the window (i.e., the interval after which + * the new DStream will generate RDDs); must be a multiple of this + * DStream's batching interval + */ + def groupByKeyAndWindow(windowDuration: Duration, slideDuration: Duration) + : JavaPairDStream[K, JList[V]] = { + dstream.groupByKeyAndWindow(windowDuration, slideDuration).mapValues(seqAsJavaList _) + } + + /** + * Return a new DStream by applying `groupByKey` over a sliding window on `this` DStream. + * Similar to `DStream.groupByKey()`, but applies it over a sliding window. + * Hash partitioning is used to generate the RDDs with `numPartitions` partitions. + * @param windowDuration width of the window; must be a multiple of this DStream's + * batching interval + * @param slideDuration sliding interval of the window (i.e., the interval after which + * the new DStream will generate RDDs); must be a multiple of this + * DStream's batching interval + * @param numPartitions Number of partitions of each RDD in the new DStream. + */ + def groupByKeyAndWindow(windowDuration: Duration, slideDuration: Duration, numPartitions: Int) + :JavaPairDStream[K, JList[V]] = { + dstream.groupByKeyAndWindow(windowDuration, slideDuration, numPartitions) + .mapValues(seqAsJavaList _) + } + + /** + * Return a new DStream by applying `groupByKey` over a sliding window on `this` DStream. + * Similar to `DStream.groupByKey()`, but applies it over a sliding window. + * @param windowDuration width of the window; must be a multiple of this DStream's + * batching interval + * @param slideDuration sliding interval of the window (i.e., the interval after which + * the new DStream will generate RDDs); must be a multiple of this + * DStream's batching interval + * @param partitioner Partitioner for controlling the partitioning of each RDD in the new DStream. + */ + def groupByKeyAndWindow( + windowDuration: Duration, + slideDuration: Duration, + partitioner: Partitioner + ):JavaPairDStream[K, JList[V]] = { + dstream.groupByKeyAndWindow(windowDuration, slideDuration, partitioner) + .mapValues(seqAsJavaList _) + } + + /** + * Create a new DStream by applying `reduceByKey` over a sliding window on `this` DStream. + * Similar to `DStream.reduceByKey()`, but applies it over a sliding window. The new DStream + * generates RDDs with the same interval as this DStream. Hash partitioning is used to generate + * the RDDs with Spark's default number of partitions. + * @param reduceFunc associative reduce function + * @param windowDuration width of the window; must be a multiple of this DStream's + * batching interval + */ + def reduceByKeyAndWindow(reduceFunc: Function2[V, V, V], windowDuration: Duration) + :JavaPairDStream[K, V] = { + dstream.reduceByKeyAndWindow(reduceFunc, windowDuration) + } + + /** + * Return a new DStream by applying `reduceByKey` over a sliding window. This is similar to + * `DStream.reduceByKey()` but applies it over a sliding window. Hash partitioning is used to + * generate the RDDs with Spark's default number of partitions. + * @param reduceFunc associative reduce function + * @param windowDuration width of the window; must be a multiple of this DStream's + * batching interval + * @param slideDuration sliding interval of the window (i.e., the interval after which + * the new DStream will generate RDDs); must be a multiple of this + * DStream's batching interval + */ + def reduceByKeyAndWindow( + reduceFunc: Function2[V, V, V], + windowDuration: Duration, + slideDuration: Duration + ):JavaPairDStream[K, V] = { + dstream.reduceByKeyAndWindow(reduceFunc, windowDuration, slideDuration) + } + + /** + * Return a new DStream by applying `reduceByKey` over a sliding window. This is similar to + * `DStream.reduceByKey()` but applies it over a sliding window. Hash partitioning is used to + * generate the RDDs with `numPartitions` partitions. + * @param reduceFunc associative reduce function + * @param windowDuration width of the window; must be a multiple of this DStream's + * batching interval + * @param slideDuration sliding interval of the window (i.e., the interval after which + * the new DStream will generate RDDs); must be a multiple of this + * DStream's batching interval + * @param numPartitions Number of partitions of each RDD in the new DStream. + */ + def reduceByKeyAndWindow( + reduceFunc: Function2[V, V, V], + windowDuration: Duration, + slideDuration: Duration, + numPartitions: Int + ): JavaPairDStream[K, V] = { + dstream.reduceByKeyAndWindow(reduceFunc, windowDuration, slideDuration, numPartitions) + } + + /** + * Return a new DStream by applying `reduceByKey` over a sliding window. Similar to + * `DStream.reduceByKey()`, but applies it over a sliding window. + * @param reduceFunc associative reduce function + * @param windowDuration width of the window; must be a multiple of this DStream's + * batching interval + * @param slideDuration sliding interval of the window (i.e., the interval after which + * the new DStream will generate RDDs); must be a multiple of this + * DStream's batching interval + * @param partitioner Partitioner for controlling the partitioning of each RDD in the new DStream. + */ + def reduceByKeyAndWindow( + reduceFunc: Function2[V, V, V], + windowDuration: Duration, + slideDuration: Duration, + partitioner: Partitioner + ): JavaPairDStream[K, V] = { + dstream.reduceByKeyAndWindow(reduceFunc, windowDuration, slideDuration, partitioner) + } + + /** + * Return a new DStream by reducing over a using incremental computation. + * The reduced value of over a new window is calculated using the old window's reduce value : + * 1. reduce the new values that entered the window (e.g., adding new counts) + * 2. "inverse reduce" the old values that left the window (e.g., subtracting old counts) + * This is more efficient that reduceByKeyAndWindow without "inverse reduce" function. + * However, it is applicable to only "invertible reduce functions". + * Hash partitioning is used to generate the RDDs with Spark's default number of partitions. + * @param reduceFunc associative reduce function + * @param invReduceFunc inverse function + * @param windowDuration width of the window; must be a multiple of this DStream's + * batching interval + * @param slideDuration sliding interval of the window (i.e., the interval after which + * the new DStream will generate RDDs); must be a multiple of this + * DStream's batching interval + */ + def reduceByKeyAndWindow( + reduceFunc: Function2[V, V, V], + invReduceFunc: Function2[V, V, V], + windowDuration: Duration, + slideDuration: Duration + ): JavaPairDStream[K, V] = { + dstream.reduceByKeyAndWindow(reduceFunc, invReduceFunc, windowDuration, slideDuration) + } + + /** + * Return a new DStream by applying incremental `reduceByKey` over a sliding window. + * The reduced value of over a new window is calculated using the old window's reduce value : + * 1. reduce the new values that entered the window (e.g., adding new counts) + * 2. "inverse reduce" the old values that left the window (e.g., subtracting old counts) + * This is more efficient that reduceByKeyAndWindow without "inverse reduce" function. + * However, it is applicable to only "invertible reduce functions". + * Hash partitioning is used to generate the RDDs with `numPartitions` partitions. + * @param reduceFunc associative reduce function + * @param invReduceFunc inverse function + * @param windowDuration width of the window; must be a multiple of this DStream's + * batching interval + * @param slideDuration sliding interval of the window (i.e., the interval after which + * the new DStream will generate RDDs); must be a multiple of this + * DStream's batching interval + * @param numPartitions number of partitions of each RDD in the new DStream. + * @param filterFunc function to filter expired key-value pairs; + * only pairs that satisfy the function are retained + * set this to null if you do not want to filter + */ + def reduceByKeyAndWindow( + reduceFunc: Function2[V, V, V], + invReduceFunc: Function2[V, V, V], + windowDuration: Duration, + slideDuration: Duration, + numPartitions: Int, + filterFunc: JFunction[(K, V), java.lang.Boolean] + ): JavaPairDStream[K, V] = { + dstream.reduceByKeyAndWindow( + reduceFunc, + invReduceFunc, + windowDuration, + slideDuration, + numPartitions, + (p: (K, V)) => filterFunc(p).booleanValue() + ) + } + + /** + * Return a new DStream by applying incremental `reduceByKey` over a sliding window. + * The reduced value of over a new window is calculated using the old window's reduce value : + * 1. reduce the new values that entered the window (e.g., adding new counts) + * 2. "inverse reduce" the old values that left the window (e.g., subtracting old counts) + * This is more efficient that reduceByKeyAndWindow without "inverse reduce" function. + * However, it is applicable to only "invertible reduce functions". + * @param reduceFunc associative reduce function + * @param invReduceFunc inverse function + * @param windowDuration width of the window; must be a multiple of this DStream's + * batching interval + * @param slideDuration sliding interval of the window (i.e., the interval after which + * the new DStream will generate RDDs); must be a multiple of this + * DStream's batching interval + * @param partitioner Partitioner for controlling the partitioning of each RDD in the new DStream. + * @param filterFunc function to filter expired key-value pairs; + * only pairs that satisfy the function are retained + * set this to null if you do not want to filter + */ + def reduceByKeyAndWindow( + reduceFunc: Function2[V, V, V], + invReduceFunc: Function2[V, V, V], + windowDuration: Duration, + slideDuration: Duration, + partitioner: Partitioner, + filterFunc: JFunction[(K, V), java.lang.Boolean] + ): JavaPairDStream[K, V] = { + dstream.reduceByKeyAndWindow( + reduceFunc, + invReduceFunc, + windowDuration, + slideDuration, + partitioner, + (p: (K, V)) => filterFunc(p).booleanValue() + ) + } + + private def convertUpdateStateFunction[S](in: JFunction2[JList[V], Optional[S], Optional[S]]): + (Seq[V], Option[S]) => Option[S] = { + val scalaFunc: (Seq[V], Option[S]) => Option[S] = (values, state) => { + val list: JList[V] = values + val scalaState: Optional[S] = JavaUtils.optionToOptional(state) + val result: Optional[S] = in.apply(list, scalaState) + result.isPresent match { + case true => Some(result.get()) + case _ => None + } + } + scalaFunc + } + + /** + * Create a new "state" DStream where the state for each key is updated by applying + * the given function on the previous state of the key and the new values of each key. + * Hash partitioning is used to generate the RDDs with Spark's default number of partitions. + * @param updateFunc State update function. If `this` function returns None, then + * corresponding state key-value pair will be eliminated. + * @tparam S State type + */ + def updateStateByKey[S](updateFunc: JFunction2[JList[V], Optional[S], Optional[S]]) + : JavaPairDStream[K, S] = { + implicit val cm: ClassManifest[S] = + implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[S]] + dstream.updateStateByKey(convertUpdateStateFunction(updateFunc)) + } + + /** + * Create a new "state" DStream where the state for each key is updated by applying + * the given function on the previous state of the key and the new values of each key. + * Hash partitioning is used to generate the RDDs with `numPartitions` partitions. + * @param updateFunc State update function. If `this` function returns None, then + * corresponding state key-value pair will be eliminated. + * @param numPartitions Number of partitions of each RDD in the new DStream. + * @tparam S State type + */ + def updateStateByKey[S: ClassManifest]( + updateFunc: JFunction2[JList[V], Optional[S], Optional[S]], + numPartitions: Int) + : JavaPairDStream[K, S] = { + dstream.updateStateByKey(convertUpdateStateFunction(updateFunc), numPartitions) + } + + /** + * Create a new "state" DStream where the state for each key is updated by applying + * the given function on the previous state of the key and the new values of the key. + * [[org.apache.spark.Partitioner]] is used to control the partitioning of each RDD. + * @param updateFunc State update function. If `this` function returns None, then + * corresponding state key-value pair will be eliminated. + * @param partitioner Partitioner for controlling the partitioning of each RDD in the new DStream. + * @tparam S State type + */ + def updateStateByKey[S: ClassManifest]( + updateFunc: JFunction2[JList[V], Optional[S], Optional[S]], + partitioner: Partitioner + ): JavaPairDStream[K, S] = { + dstream.updateStateByKey(convertUpdateStateFunction(updateFunc), partitioner) + } + + def mapValues[U](f: JFunction[V, U]): JavaPairDStream[K, U] = { + implicit val cm: ClassManifest[U] = + implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[U]] + dstream.mapValues(f) + } + + def flatMapValues[U](f: JFunction[V, java.lang.Iterable[U]]): JavaPairDStream[K, U] = { + import scala.collection.JavaConverters._ + def fn = (x: V) => f.apply(x).asScala + implicit val cm: ClassManifest[U] = + implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[U]] + dstream.flatMapValues(fn) + } + + /** + * Cogroup `this` DStream with `other` DStream. For each key k in corresponding RDDs of `this` + * or `other` DStreams, the generated RDD will contains a tuple with the list of values for that + * key in both RDDs. HashPartitioner is used to partition each generated RDD into default number + * of partitions. + */ + def cogroup[W](other: JavaPairDStream[K, W]): JavaPairDStream[K, (JList[V], JList[W])] = { + implicit val cm: ClassManifest[W] = + implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[W]] + dstream.cogroup(other.dstream).mapValues(t => (seqAsJavaList(t._1), seqAsJavaList((t._2)))) + } + + /** + * Cogroup `this` DStream with `other` DStream. For each key k in corresponding RDDs of `this` + * or `other` DStreams, the generated RDD will contains a tuple with the list of values for that + * key in both RDDs. Partitioner is used to partition each generated RDD. + */ + def cogroup[W](other: JavaPairDStream[K, W], partitioner: Partitioner) + : JavaPairDStream[K, (JList[V], JList[W])] = { + implicit val cm: ClassManifest[W] = + implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[W]] + dstream.cogroup(other.dstream, partitioner) + .mapValues(t => (seqAsJavaList(t._1), seqAsJavaList((t._2)))) + } + + /** + * Join `this` DStream with `other` DStream. HashPartitioner is used + * to partition each generated RDD into default number of partitions. + */ + def join[W](other: JavaPairDStream[K, W]): JavaPairDStream[K, (V, W)] = { + implicit val cm: ClassManifest[W] = + implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[W]] + dstream.join(other.dstream) + } + + /** + * Join `this` DStream with `other` DStream, that is, each RDD of the new DStream will + * be generated by joining RDDs from `this` and other DStream. Uses the given + * Partitioner to partition each generated RDD. + */ + def join[W](other: JavaPairDStream[K, W], partitioner: Partitioner) + : JavaPairDStream[K, (V, W)] = { + implicit val cm: ClassManifest[W] = + implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[W]] + dstream.join(other.dstream, partitioner) + } + + /** + * Save each RDD in `this` DStream as a Hadoop file. The file name at each batch interval is + * generated based on `prefix` and `suffix`: "prefix-TIME_IN_MS.suffix". + */ + def saveAsHadoopFiles[F <: OutputFormat[K, V]](prefix: String, suffix: String) { + dstream.saveAsHadoopFiles(prefix, suffix) + } + + /** + * Save each RDD in `this` DStream as a Hadoop file. The file name at each batch interval is + * generated based on `prefix` and `suffix`: "prefix-TIME_IN_MS.suffix". + */ + def saveAsHadoopFiles( + prefix: String, + suffix: String, + keyClass: Class[_], + valueClass: Class[_], + outputFormatClass: Class[_ <: OutputFormat[_, _]]) { + dstream.saveAsHadoopFiles(prefix, suffix, keyClass, valueClass, outputFormatClass) + } + + /** + * Save each RDD in `this` DStream as a Hadoop file. The file name at each batch interval is + * generated based on `prefix` and `suffix`: "prefix-TIME_IN_MS.suffix". + */ + def saveAsHadoopFiles( + prefix: String, + suffix: String, + keyClass: Class[_], + valueClass: Class[_], + outputFormatClass: Class[_ <: OutputFormat[_, _]], + conf: JobConf) { + dstream.saveAsHadoopFiles(prefix, suffix, keyClass, valueClass, outputFormatClass, conf) + } + + /** + * Save each RDD in `this` DStream as a Hadoop file. The file name at each batch interval is + * generated based on `prefix` and `suffix`: "prefix-TIME_IN_MS.suffix". + */ + def saveAsNewAPIHadoopFiles[F <: NewOutputFormat[K, V]](prefix: String, suffix: String) { + dstream.saveAsNewAPIHadoopFiles(prefix, suffix) + } + + /** + * Save each RDD in `this` DStream as a Hadoop file. The file name at each batch interval is + * generated based on `prefix` and `suffix`: "prefix-TIME_IN_MS.suffix". + */ + def saveAsNewAPIHadoopFiles( + prefix: String, + suffix: String, + keyClass: Class[_], + valueClass: Class[_], + outputFormatClass: Class[_ <: NewOutputFormat[_, _]]) { + dstream.saveAsNewAPIHadoopFiles(prefix, suffix, keyClass, valueClass, outputFormatClass) + } + + /** + * Save each RDD in `this` DStream as a Hadoop file. The file name at each batch interval is + * generated based on `prefix` and `suffix`: "prefix-TIME_IN_MS.suffix". + */ + def saveAsNewAPIHadoopFiles( + prefix: String, + suffix: String, + keyClass: Class[_], + valueClass: Class[_], + outputFormatClass: Class[_ <: NewOutputFormat[_, _]], + conf: Configuration = new Configuration) { + dstream.saveAsNewAPIHadoopFiles(prefix, suffix, keyClass, valueClass, outputFormatClass, conf) + } + + override val classManifest: ClassManifest[(K, V)] = + implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[Tuple2[K, V]]] +} + +object JavaPairDStream { + implicit def fromPairDStream[K: ClassManifest, V: ClassManifest](dstream: DStream[(K, V)]) + :JavaPairDStream[K, V] = + new JavaPairDStream[K, V](dstream) + + def fromJavaDStream[K, V](dstream: JavaDStream[(K, V)]): JavaPairDStream[K, V] = { + implicit val cmk: ClassManifest[K] = + implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[K]] + implicit val cmv: ClassManifest[V] = + implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[V]] + new JavaPairDStream[K, V](dstream.dstream) + } + + def scalaToJavaLong[K: ClassManifest](dstream: JavaPairDStream[K, Long]) + : JavaPairDStream[K, JLong] = { + StreamingContext.toPairDStreamFunctions(dstream.dstream).mapValues(new JLong(_)) + } +} diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala new file mode 100644 index 0000000000..f10beb1db3 --- /dev/null +++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala @@ -0,0 +1,614 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.streaming.api.java + +import org.apache.spark.streaming._ +import receivers.{ActorReceiver, ReceiverSupervisorStrategy} +import org.apache.spark.streaming.dstream._ +import org.apache.spark.storage.StorageLevel +import org.apache.spark.api.java.function.{Function => JFunction, Function2 => JFunction2} +import org.apache.spark.api.java.{JavaSparkContext, JavaRDD} +import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat} +import twitter4j.Status +import akka.actor.Props +import akka.actor.SupervisorStrategy +import akka.zeromq.Subscribe +import scala.collection.JavaConversions._ +import java.lang.{Long => JLong, Integer => JInt} +import java.io.InputStream +import java.util.{Map => JMap} +import twitter4j.auth.Authorization +import org.apache.spark.RDD + +/** + * A StreamingContext is the main entry point for Spark Streaming functionality. Besides the basic + * information (such as, cluster URL and job name) to internally create a SparkContext, it provides + * methods used to create DStream from various input sources. + */ +class JavaStreamingContext(val ssc: StreamingContext) { + + // TODOs: + // - Test to/from Hadoop functions + // - Support creating and registering InputStreams + + + /** + * Creates a StreamingContext. + * @param master Name of the Spark Master + * @param appName Name to be used when registering with the scheduler + * @param batchDuration The time interval at which streaming data will be divided into batches + */ + def this(master: String, appName: String, batchDuration: Duration) = + this(new StreamingContext(master, appName, batchDuration, null, Nil, Map())) + + /** + * Creates a StreamingContext. + * @param master Name of the Spark Master + * @param appName Name to be used when registering with the scheduler + * @param batchDuration The time interval at which streaming data will be divided into batches + * @param sparkHome The SPARK_HOME directory on the slave nodes + * @param jarFile JAR file containing job code, to ship to cluster. This can be a path on the local + * file system or an HDFS, HTTP, HTTPS, or FTP URL. + */ + def this( + master: String, + appName: String, + batchDuration: Duration, + sparkHome: String, + jarFile: String) = + this(new StreamingContext(master, appName, batchDuration, sparkHome, Seq(jarFile), Map())) + + /** + * Creates a StreamingContext. + * @param master Name of the Spark Master + * @param appName Name to be used when registering with the scheduler + * @param batchDuration The time interval at which streaming data will be divided into batches + * @param sparkHome The SPARK_HOME directory on the slave nodes + * @param jars Collection of JARs to send to the cluster. These can be paths on the local file + * system or HDFS, HTTP, HTTPS, or FTP URLs. + */ + def this( + master: String, + appName: String, + batchDuration: Duration, + sparkHome: String, + jars: Array[String]) = + this(new StreamingContext(master, appName, batchDuration, sparkHome, jars, Map())) + + /** + * Creates a StreamingContext. + * @param master Name of the Spark Master + * @param appName Name to be used when registering with the scheduler + * @param batchDuration The time interval at which streaming data will be divided into batches + * @param sparkHome The SPARK_HOME directory on the slave nodes + * @param jars Collection of JARs to send to the cluster. These can be paths on the local file + * system or HDFS, HTTP, HTTPS, or FTP URLs. + * @param environment Environment variables to set on worker nodes + */ + def this( + master: String, + appName: String, + batchDuration: Duration, + sparkHome: String, + jars: Array[String], + environment: JMap[String, String]) = + this(new StreamingContext(master, appName, batchDuration, sparkHome, jars, environment)) + + /** + * Creates a StreamingContext using an existing SparkContext. + * @param sparkContext The underlying JavaSparkContext to use + * @param batchDuration The time interval at which streaming data will be divided into batches + */ + def this(sparkContext: JavaSparkContext, batchDuration: Duration) = + this(new StreamingContext(sparkContext.sc, batchDuration)) + + /** + * Re-creates a StreamingContext from a checkpoint file. + * @param path Path either to the directory that was specified as the checkpoint directory, or + * to the checkpoint file 'graph' or 'graph.bk'. + */ + def this(path: String) = this (new StreamingContext(path)) + + /** The underlying SparkContext */ + val sc: JavaSparkContext = new JavaSparkContext(ssc.sc) + + /** + * Create an input stream that pulls messages form a Kafka Broker. + * @param zkQuorum Zookeper quorum (hostname:port,hostname:port,..). + * @param groupId The group id for this consumer. + * @param topics Map of (topic_name -> numPartitions) to consume. Each partition is consumed + * in its own thread. + */ + def kafkaStream( + zkQuorum: String, + groupId: String, + topics: JMap[String, JInt]) + : JavaDStream[String] = { + implicit val cmt: ClassManifest[String] = + implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[String]] + ssc.kafkaStream(zkQuorum, groupId, Map(topics.mapValues(_.intValue()).toSeq: _*), + StorageLevel.MEMORY_ONLY_SER_2) + } + + /** + * Create an input stream that pulls messages form a Kafka Broker. + * @param zkQuorum Zookeper quorum (hostname:port,hostname:port,..). + * @param groupId The group id for this consumer. + * @param topics Map of (topic_name -> numPartitions) to consume. Each partition is consumed + * in its own thread. + * @param storageLevel RDD storage level. Defaults to memory-only + * + */ + def kafkaStream( + zkQuorum: String, + groupId: String, + topics: JMap[String, JInt], + storageLevel: StorageLevel) + : JavaDStream[String] = { + implicit val cmt: ClassManifest[String] = + implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[String]] + ssc.kafkaStream(zkQuorum, groupId, Map(topics.mapValues(_.intValue()).toSeq: _*), + storageLevel) + } + + /** + * Create an input stream that pulls messages form a Kafka Broker. + * @param typeClass Type of RDD + * @param decoderClass Type of kafka decoder + * @param kafkaParams Map of kafka configuration paramaters. + * See: http://kafka.apache.org/configuration.html + * @param topics Map of (topic_name -> numPartitions) to consume. Each partition is consumed + * in its own thread. + * @param storageLevel RDD storage level. Defaults to memory-only + */ + def kafkaStream[T, D <: kafka.serializer.Decoder[_]]( + typeClass: Class[T], + decoderClass: Class[D], + kafkaParams: JMap[String, String], + topics: JMap[String, JInt], + storageLevel: StorageLevel) + : JavaDStream[T] = { + implicit val cmt: ClassManifest[T] = + implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[T]] + implicit val cmd: Manifest[D] = implicitly[Manifest[AnyRef]].asInstanceOf[Manifest[D]] + ssc.kafkaStream[T, D]( + kafkaParams.toMap, + Map(topics.mapValues(_.intValue()).toSeq: _*), + storageLevel) + } + + /** + * Create a input stream from network source hostname:port. Data is received using + * a TCP socket and the receive bytes is interpreted as UTF8 encoded \n delimited + * lines. + * @param hostname Hostname to connect to for receiving data + * @param port Port to connect to for receiving data + * @param storageLevel Storage level to use for storing the received objects + * (default: StorageLevel.MEMORY_AND_DISK_SER_2) + */ + def socketTextStream(hostname: String, port: Int, storageLevel: StorageLevel) + : JavaDStream[String] = { + ssc.socketTextStream(hostname, port, storageLevel) + } + + /** + * Create a input stream from network source hostname:port. Data is received using + * a TCP socket and the receive bytes is interpreted as UTF8 encoded \n delimited + * lines. + * @param hostname Hostname to connect to for receiving data + * @param port Port to connect to for receiving data + */ + def socketTextStream(hostname: String, port: Int): JavaDStream[String] = { + ssc.socketTextStream(hostname, port) + } + + /** + * Create a input stream from network source hostname:port. Data is received using + * a TCP socket and the receive bytes it interepreted as object using the given + * converter. + * @param hostname Hostname to connect to for receiving data + * @param port Port to connect to for receiving data + * @param converter Function to convert the byte stream to objects + * @param storageLevel Storage level to use for storing the received objects + * @tparam T Type of the objects received (after converting bytes to objects) + */ + def socketStream[T]( + hostname: String, + port: Int, + converter: JFunction[InputStream, java.lang.Iterable[T]], + storageLevel: StorageLevel) + : JavaDStream[T] = { + def fn = (x: InputStream) => converter.apply(x).toIterator + implicit val cmt: ClassManifest[T] = + implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[T]] + ssc.socketStream(hostname, port, fn, storageLevel) + } + + /** + * Creates a input stream that monitors a Hadoop-compatible filesystem + * for new files and reads them as text files (using key as LongWritable, value + * as Text and input format as TextInputFormat). File names starting with . are ignored. + * @param directory HDFS directory to monitor for new file + */ + def textFileStream(directory: String): JavaDStream[String] = { + ssc.textFileStream(directory) + } + + /** + * Create a input stream from network source hostname:port, where data is received + * as serialized blocks (serialized using the Spark's serializer) that can be directly + * pushed into the block manager without deserializing them. This is the most efficient + * way to receive data. + * @param hostname Hostname to connect to for receiving data + * @param port Port to connect to for receiving data + * @param storageLevel Storage level to use for storing the received objects + * @tparam T Type of the objects in the received blocks + */ + def rawSocketStream[T]( + hostname: String, + port: Int, + storageLevel: StorageLevel): JavaDStream[T] = { + implicit val cmt: ClassManifest[T] = + implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[T]] + JavaDStream.fromDStream(ssc.rawSocketStream(hostname, port, storageLevel)) + } + + /** + * Create a input stream from network source hostname:port, where data is received + * as serialized blocks (serialized using the Spark's serializer) that can be directly + * pushed into the block manager without deserializing them. This is the most efficient + * way to receive data. + * @param hostname Hostname to connect to for receiving data + * @param port Port to connect to for receiving data + * @tparam T Type of the objects in the received blocks + */ + def rawSocketStream[T](hostname: String, port: Int): JavaDStream[T] = { + implicit val cmt: ClassManifest[T] = + implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[T]] + JavaDStream.fromDStream(ssc.rawSocketStream(hostname, port)) + } + + /** + * Creates a input stream that monitors a Hadoop-compatible filesystem + * for new files and reads them using the given key-value types and input format. + * File names starting with . are ignored. + * @param directory HDFS directory to monitor for new file + * @tparam K Key type for reading HDFS file + * @tparam V Value type for reading HDFS file + * @tparam F Input format for reading HDFS file + */ + def fileStream[K, V, F <: NewInputFormat[K, V]](directory: String): JavaPairDStream[K, V] = { + implicit val cmk: ClassManifest[K] = + implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[K]] + implicit val cmv: ClassManifest[V] = + implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[V]] + implicit val cmf: ClassManifest[F] = + implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[F]] + ssc.fileStream[K, V, F](directory); + } + + /** + * Creates a input stream from a Flume source. + * @param hostname Hostname of the slave machine to which the flume data will be sent + * @param port Port of the slave machine to which the flume data will be sent + * @param storageLevel Storage level to use for storing the received objects + */ + def flumeStream(hostname: String, port: Int, storageLevel: StorageLevel): + JavaDStream[SparkFlumeEvent] = { + ssc.flumeStream(hostname, port, storageLevel) + } + + + /** + * Creates a input stream from a Flume source. + * @param hostname Hostname of the slave machine to which the flume data will be sent + * @param port Port of the slave machine to which the flume data will be sent + */ + def flumeStream(hostname: String, port: Int): JavaDStream[SparkFlumeEvent] = { + ssc.flumeStream(hostname, port) + } + + /** + * Create a input stream that returns tweets received from Twitter. + * @param twitterAuth Twitter4J Authorization object + * @param filters Set of filter strings to get only those tweets that match them + * @param storageLevel Storage level to use for storing the received objects + */ + def twitterStream( + twitterAuth: Authorization, + filters: Array[String], + storageLevel: StorageLevel + ): JavaDStream[Status] = { + ssc.twitterStream(Some(twitterAuth), filters, storageLevel) + } + + /** + * Create a input stream that returns tweets received from Twitter using Twitter4J's default + * OAuth authentication; this requires the system properties twitter4j.oauth.consumerKey, + * .consumerSecret, .accessToken and .accessTokenSecret to be set. + * @param filters Set of filter strings to get only those tweets that match them + * @param storageLevel Storage level to use for storing the received objects + */ + def twitterStream( + filters: Array[String], + storageLevel: StorageLevel + ): JavaDStream[Status] = { + ssc.twitterStream(None, filters, storageLevel) + } + + /** + * Create a input stream that returns tweets received from Twitter. + * @param twitterAuth Twitter4J Authorization + * @param filters Set of filter strings to get only those tweets that match them + */ + def twitterStream( + twitterAuth: Authorization, + filters: Array[String] + ): JavaDStream[Status] = { + ssc.twitterStream(Some(twitterAuth), filters) + } + + /** + * Create a input stream that returns tweets received from Twitter using Twitter4J's default + * OAuth authentication; this requires the system properties twitter4j.oauth.consumerKey, + * .consumerSecret, .accessToken and .accessTokenSecret to be set. + * @param filters Set of filter strings to get only those tweets that match them + */ + def twitterStream( + filters: Array[String] + ): JavaDStream[Status] = { + ssc.twitterStream(None, filters) + } + + /** + * Create a input stream that returns tweets received from Twitter. + * @param twitterAuth Twitter4J Authorization + */ + def twitterStream( + twitterAuth: Authorization + ): JavaDStream[Status] = { + ssc.twitterStream(Some(twitterAuth)) + } + + /** + * Create a input stream that returns tweets received from Twitter using Twitter4J's default + * OAuth authentication; this requires the system properties twitter4j.oauth.consumerKey, + * .consumerSecret, .accessToken and .accessTokenSecret to be set. + */ + def twitterStream(): JavaDStream[Status] = { + ssc.twitterStream() + } + + /** + * Create an input stream with any arbitrary user implemented actor receiver. + * @param props Props object defining creation of the actor + * @param name Name of the actor + * @param storageLevel Storage level to use for storing the received objects + * + * @note An important point to note: + * Since Actor may exist outside the spark framework, It is thus user's responsibility + * to ensure the type safety, i.e parametrized type of data received and actorStream + * should be same. + */ + def actorStream[T]( + props: Props, + name: String, + storageLevel: StorageLevel, + supervisorStrategy: SupervisorStrategy + ): JavaDStream[T] = { + implicit val cm: ClassManifest[T] = + implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[T]] + ssc.actorStream[T](props, name, storageLevel, supervisorStrategy) + } + + /** + * Create an input stream with any arbitrary user implemented actor receiver. + * @param props Props object defining creation of the actor + * @param name Name of the actor + * @param storageLevel Storage level to use for storing the received objects + * + * @note An important point to note: + * Since Actor may exist outside the spark framework, It is thus user's responsibility + * to ensure the type safety, i.e parametrized type of data received and actorStream + * should be same. + */ + def actorStream[T]( + props: Props, + name: String, + storageLevel: StorageLevel + ): JavaDStream[T] = { + implicit val cm: ClassManifest[T] = + implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[T]] + ssc.actorStream[T](props, name, storageLevel) + } + + /** + * Create an input stream with any arbitrary user implemented actor receiver. + * @param props Props object defining creation of the actor + * @param name Name of the actor + * + * @note An important point to note: + * Since Actor may exist outside the spark framework, It is thus user's responsibility + * to ensure the type safety, i.e parametrized type of data received and actorStream + * should be same. + */ + def actorStream[T]( + props: Props, + name: String + ): JavaDStream[T] = { + implicit val cm: ClassManifest[T] = + implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[T]] + ssc.actorStream[T](props, name) + } + + /** + * Create an input stream that receives messages pushed by a zeromq publisher. + * @param publisherUrl Url of remote zeromq publisher + * @param subscribe topic to subscribe to + * @param bytesToObjects A zeroMQ stream publishes sequence of frames for each topic and each frame has sequence + * of byte thus it needs the converter(which might be deserializer of bytes) + * to translate from sequence of sequence of bytes, where sequence refer to a frame + * and sub sequence refer to its payload. + * @param storageLevel Storage level to use for storing the received objects + */ + def zeroMQStream[T]( + publisherUrl:String, + subscribe: Subscribe, + bytesToObjects: Seq[Seq[Byte]] ⇒ Iterator[T], + storageLevel: StorageLevel, + supervisorStrategy: SupervisorStrategy + ): JavaDStream[T] = { + implicit val cm: ClassManifest[T] = + implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[T]] + ssc.zeroMQStream[T](publisherUrl, subscribe, bytesToObjects, storageLevel, supervisorStrategy) + } + + /** + * Create an input stream that receives messages pushed by a zeromq publisher. + * @param publisherUrl Url of remote zeromq publisher + * @param subscribe topic to subscribe to + * @param bytesToObjects A zeroMQ stream publishes sequence of frames for each topic and each frame has sequence + * of byte thus it needs the converter(which might be deserializer of bytes) + * to translate from sequence of sequence of bytes, where sequence refer to a frame + * and sub sequence refer to its payload. + * @param storageLevel RDD storage level. Defaults to memory-only. + */ + def zeroMQStream[T]( + publisherUrl:String, + subscribe: Subscribe, + bytesToObjects: JFunction[Array[Array[Byte]], java.lang.Iterable[T]], + storageLevel: StorageLevel + ): JavaDStream[T] = { + implicit val cm: ClassManifest[T] = + implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[T]] + def fn(x: Seq[Seq[Byte]]) = bytesToObjects.apply(x.map(_.toArray).toArray).toIterator + ssc.zeroMQStream[T](publisherUrl, subscribe, fn, storageLevel) + } + + /** + * Create an input stream that receives messages pushed by a zeromq publisher. + * @param publisherUrl Url of remote zeromq publisher + * @param subscribe topic to subscribe to + * @param bytesToObjects A zeroMQ stream publishes sequence of frames for each topic and each frame has sequence + * of byte thus it needs the converter(which might be deserializer of bytes) + * to translate from sequence of sequence of bytes, where sequence refer to a frame + * and sub sequence refer to its payload. + */ + def zeroMQStream[T]( + publisherUrl:String, + subscribe: Subscribe, + bytesToObjects: JFunction[Array[Array[Byte]], java.lang.Iterable[T]] + ): JavaDStream[T] = { + implicit val cm: ClassManifest[T] = + implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[T]] + def fn(x: Seq[Seq[Byte]]) = bytesToObjects.apply(x.map(_.toArray).toArray).toIterator + ssc.zeroMQStream[T](publisherUrl, subscribe, fn) + } + + /** + * Registers an output stream that will be computed every interval + */ + def registerOutputStream(outputStream: JavaDStreamLike[_, _, _]) { + ssc.registerOutputStream(outputStream.dstream) + } + + /** + * Creates a input stream from an queue of RDDs. In each batch, + * it will process either one or all of the RDDs returned by the queue. + * + * NOTE: changes to the queue after the stream is created will not be recognized. + * @param queue Queue of RDDs + * @tparam T Type of objects in the RDD + */ + def queueStream[T](queue: java.util.Queue[JavaRDD[T]]): JavaDStream[T] = { + implicit val cm: ClassManifest[T] = + implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[T]] + val sQueue = new scala.collection.mutable.Queue[RDD[T]] + sQueue.enqueue(queue.map(_.rdd).toSeq: _*) + ssc.queueStream(sQueue) + } + + /** + * Creates a input stream from an queue of RDDs. In each batch, + * it will process either one or all of the RDDs returned by the queue. + * + * NOTE: changes to the queue after the stream is created will not be recognized. + * @param queue Queue of RDDs + * @param oneAtATime Whether only one RDD should be consumed from the queue in every interval + * @tparam T Type of objects in the RDD + */ + def queueStream[T](queue: java.util.Queue[JavaRDD[T]], oneAtATime: Boolean): JavaDStream[T] = { + implicit val cm: ClassManifest[T] = + implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[T]] + val sQueue = new scala.collection.mutable.Queue[RDD[T]] + sQueue.enqueue(queue.map(_.rdd).toSeq: _*) + ssc.queueStream(sQueue, oneAtATime) + } + + /** + * Creates a input stream from an queue of RDDs. In each batch, + * it will process either one or all of the RDDs returned by the queue. + * + * NOTE: changes to the queue after the stream is created will not be recognized. + * @param queue Queue of RDDs + * @param oneAtATime Whether only one RDD should be consumed from the queue in every interval + * @param defaultRDD Default RDD is returned by the DStream when the queue is empty + * @tparam T Type of objects in the RDD + */ + def queueStream[T]( + queue: java.util.Queue[JavaRDD[T]], + oneAtATime: Boolean, + defaultRDD: JavaRDD[T]): JavaDStream[T] = { + implicit val cm: ClassManifest[T] = + implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[T]] + val sQueue = new scala.collection.mutable.Queue[RDD[T]] + sQueue.enqueue(queue.map(_.rdd).toSeq: _*) + ssc.queueStream(sQueue, oneAtATime, defaultRDD.rdd) + } + + /** + * Sets the context to periodically checkpoint the DStream operations for master + * fault-tolerance. The graph will be checkpointed every batch interval. + * @param directory HDFS-compatible directory where the checkpoint data will be reliably stored + */ + def checkpoint(directory: String) { + ssc.checkpoint(directory) + } + + /** + * Sets each DStreams in this context to remember RDDs it generated in the last given duration. + * DStreams remember RDDs only for a limited duration of duration and releases them for garbage + * collection. This method allows the developer to specify how to long to remember the RDDs ( + * if the developer wishes to query old data outside the DStream computation). + * @param duration Minimum duration that each DStream should remember its RDDs + */ + def remember(duration: Duration) { + ssc.remember(duration) + } + + /** + * Starts the execution of the streams. + */ + def start() = ssc.start() + + /** + * Sstops the execution of the streams. + */ + def stop() = ssc.stop() + +} diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/CoGroupedDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/CoGroupedDStream.scala new file mode 100644 index 0000000000..4a9d82211f --- /dev/null +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/CoGroupedDStream.scala @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.streaming.dstream + +import org.apache.spark.{RDD, Partitioner} +import org.apache.spark.rdd.CoGroupedRDD +import org.apache.spark.streaming.{Time, DStream, Duration} + +private[streaming] +class CoGroupedDStream[K : ClassManifest]( + parents: Seq[DStream[(K, _)]], + partitioner: Partitioner + ) extends DStream[(K, Seq[Seq[_]])](parents.head.ssc) { + + if (parents.length == 0) { + throw new IllegalArgumentException("Empty array of parents") + } + + if (parents.map(_.ssc).distinct.size > 1) { + throw new IllegalArgumentException("Array of parents have different StreamingContexts") + } + + if (parents.map(_.slideDuration).distinct.size > 1) { + throw new IllegalArgumentException("Array of parents have different slide times") + } + + override def dependencies = parents.toList + + override def slideDuration: Duration = parents.head.slideDuration + + override def compute(validTime: Time): Option[RDD[(K, Seq[Seq[_]])]] = { + val part = partitioner + val rdds = parents.flatMap(_.getOrCompute(validTime)) + if (rdds.size > 0) { + val q = new CoGroupedRDD[K](rdds, part) + Some(q) + } else { + None + } + } + +} diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/ConstantInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/ConstantInputDStream.scala new file mode 100644 index 0000000000..35cc4cb396 --- /dev/null +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/ConstantInputDStream.scala @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.streaming.dstream + +import org.apache.spark.RDD +import org.apache.spark.streaming.{Time, StreamingContext} + +/** + * An input stream that always returns the same RDD on each timestep. Useful for testing. + */ +class ConstantInputDStream[T: ClassManifest](ssc_ : StreamingContext, rdd: RDD[T]) + extends InputDStream[T](ssc_) { + + override def start() {} + + override def stop() {} + + override def compute(validTime: Time): Option[RDD[T]] = { + Some(rdd) + } +} diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala new file mode 100644 index 0000000000..1c265ed972 --- /dev/null +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala @@ -0,0 +1,199 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.streaming.dstream + +import org.apache.spark.RDD +import org.apache.spark.rdd.UnionRDD +import org.apache.spark.streaming.{DStreamCheckpointData, StreamingContext, Time} + +import org.apache.hadoop.fs.{FileSystem, Path, PathFilter} +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat} + +import scala.collection.mutable.{HashSet, HashMap} +import java.io.{ObjectInputStream, IOException} + +private[streaming] +class FileInputDStream[K: ClassManifest, V: ClassManifest, F <: NewInputFormat[K,V] : ClassManifest]( + @transient ssc_ : StreamingContext, + directory: String, + filter: Path => Boolean = FileInputDStream.defaultFilter, + newFilesOnly: Boolean = true) + extends InputDStream[(K, V)](ssc_) { + + protected[streaming] override val checkpointData = new FileInputDStreamCheckpointData + + // Latest file mod time seen till any point of time + private val lastModTimeFiles = new HashSet[String]() + private var lastModTime = 0L + + @transient private var path_ : Path = null + @transient private var fs_ : FileSystem = null + @transient private[streaming] var files = new HashMap[Time, Array[String]] + + override def start() { + if (newFilesOnly) { + lastModTime = graph.zeroTime.milliseconds + } else { + lastModTime = 0 + } + logDebug("LastModTime initialized to " + lastModTime + ", new files only = " + newFilesOnly) + } + + override def stop() { } + + /** + * Finds the files that were modified since the last time this method was called and makes + * a union RDD out of them. Note that this maintains the list of files that were processed + * in the latest modification time in the previous call to this method. This is because the + * modification time returned by the FileStatus API seems to return times only at the + * granularity of seconds. And new files may have the same modification time as the + * latest modification time in the previous call to this method yet was not reported in + * the previous call. + */ + override def compute(validTime: Time): Option[RDD[(K, V)]] = { + assert(validTime.milliseconds >= lastModTime, "Trying to get new files for really old time [" + validTime + " < " + lastModTime) + + // Create the filter for selecting new files + val newFilter = new PathFilter() { + // Latest file mod time seen in this round of fetching files and its corresponding files + var latestModTime = 0L + val latestModTimeFiles = new HashSet[String]() + + def accept(path: Path): Boolean = { + if (!filter(path)) { // Reject file if it does not satisfy filter + logDebug("Rejected by filter " + path) + return false + } else { // Accept file only if + val modTime = fs.getFileStatus(path).getModificationTime() + logDebug("Mod time for " + path + " is " + modTime) + if (modTime < lastModTime) { + logDebug("Mod time less than last mod time") + return false // If the file was created before the last time it was called + } else if (modTime == lastModTime && lastModTimeFiles.contains(path.toString)) { + logDebug("Mod time equal to last mod time, but file considered already") + return false // If the file was created exactly as lastModTime but not reported yet + } else if (modTime > validTime.milliseconds) { + logDebug("Mod time more than valid time") + return false // If the file was created after the time this function call requires + } + if (modTime > latestModTime) { + latestModTime = modTime + latestModTimeFiles.clear() + logDebug("Latest mod time updated to " + latestModTime) + } + latestModTimeFiles += path.toString + logDebug("Accepted " + path) + return true + } + } + } + logDebug("Finding new files at time " + validTime + " for last mod time = " + lastModTime) + val newFiles = fs.listStatus(path, newFilter).map(_.getPath.toString) + logInfo("New files at time " + validTime + ":\n" + newFiles.mkString("\n")) + if (newFiles.length > 0) { + // Update the modification time and the files processed for that modification time + if (lastModTime != newFilter.latestModTime) { + lastModTime = newFilter.latestModTime + lastModTimeFiles.clear() + } + lastModTimeFiles ++= newFilter.latestModTimeFiles + logDebug("Last mod time updated to " + lastModTime) + } + files += ((validTime, newFiles)) + Some(filesToRDD(newFiles)) + } + + /** Clear the old time-to-files mappings along with old RDDs */ + protected[streaming] override def clearOldMetadata(time: Time) { + super.clearOldMetadata(time) + val oldFiles = files.filter(_._1 <= (time - rememberDuration)) + files --= oldFiles.keys + logInfo("Cleared " + oldFiles.size + " old files that were older than " + + (time - rememberDuration) + ": " + oldFiles.keys.mkString(", ")) + logDebug("Cleared files are:\n" + + oldFiles.map(p => (p._1, p._2.mkString(", "))).mkString("\n")) + } + + /** Generate one RDD from an array of files */ + protected[streaming] def filesToRDD(files: Seq[String]): RDD[(K, V)] = { + new UnionRDD( + context.sparkContext, + files.map(file => context.sparkContext.newAPIHadoopFile[K, V, F](file)) + ) + } + + private def path: Path = { + if (path_ == null) path_ = new Path(directory) + path_ + } + + private def fs: FileSystem = { + if (fs_ == null) fs_ = path.getFileSystem(new Configuration()) + fs_ + } + + @throws(classOf[IOException]) + private def readObject(ois: ObjectInputStream) { + logDebug(this.getClass().getSimpleName + ".readObject used") + ois.defaultReadObject() + generatedRDDs = new HashMap[Time, RDD[(K,V)]] () + files = new HashMap[Time, Array[String]] + } + + /** + * A custom version of the DStreamCheckpointData that stores names of + * Hadoop files as checkpoint data. + */ + private[streaming] + class FileInputDStreamCheckpointData extends DStreamCheckpointData(this) { + + def hadoopFiles = data.asInstanceOf[HashMap[Time, Array[String]]] + + override def update() { + hadoopFiles.clear() + hadoopFiles ++= files + } + + override def cleanup() { } + + override def restore() { + hadoopFiles.foreach { + case (t, f) => { + // Restore the metadata in both files and generatedRDDs + logInfo("Restoring files for time " + t + " - " + + f.mkString("[", ", ", "]") ) + files += ((t, f)) + generatedRDDs += ((t, filesToRDD(f))) + } + } + } + + override def toString() = { + "[\n" + hadoopFiles.size + " file sets\n" + + hadoopFiles.map(p => (p._1, p._2.mkString(", "))).mkString("\n") + "\n]" + } + } +} + +private[streaming] +object FileInputDStream { + def defaultFilter(path: Path): Boolean = !path.getName().startsWith(".") +} + + diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FilteredDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FilteredDStream.scala new file mode 100644 index 0000000000..3166c68760 --- /dev/null +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FilteredDStream.scala @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.streaming.dstream + +import org.apache.spark.streaming.{Duration, DStream, Time} +import org.apache.spark.RDD + +private[streaming] +class FilteredDStream[T: ClassManifest]( + parent: DStream[T], + filterFunc: T => Boolean + ) extends DStream[T](parent.ssc) { + + override def dependencies = List(parent) + + override def slideDuration: Duration = parent.slideDuration + + override def compute(validTime: Time): Option[RDD[T]] = { + parent.getOrCompute(validTime).map(_.filter(filterFunc)) + } +} + + diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FlatMapValuedDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FlatMapValuedDStream.scala new file mode 100644 index 0000000000..21950ad6ac --- /dev/null +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FlatMapValuedDStream.scala @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.streaming.dstream + +import org.apache.spark.streaming.{Duration, DStream, Time} +import org.apache.spark.RDD +import org.apache.spark.SparkContext._ + +private[streaming] +class FlatMapValuedDStream[K: ClassManifest, V: ClassManifest, U: ClassManifest]( + parent: DStream[(K, V)], + flatMapValueFunc: V => TraversableOnce[U] + ) extends DStream[(K, U)](parent.ssc) { + + override def dependencies = List(parent) + + override def slideDuration: Duration = parent.slideDuration + + override def compute(validTime: Time): Option[RDD[(K, U)]] = { + parent.getOrCompute(validTime).map(_.flatMapValues[U](flatMapValueFunc)) + } +} diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FlatMappedDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FlatMappedDStream.scala new file mode 100644 index 0000000000..8377cfe60c --- /dev/null +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FlatMappedDStream.scala @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.streaming.dstream + +import org.apache.spark.streaming.{Duration, DStream, Time} +import org.apache.spark.RDD + +private[streaming] +class FlatMappedDStream[T: ClassManifest, U: ClassManifest]( + parent: DStream[T], + flatMapFunc: T => Traversable[U] + ) extends DStream[U](parent.ssc) { + + override def dependencies = List(parent) + + override def slideDuration: Duration = parent.slideDuration + + override def compute(validTime: Time): Option[RDD[U]] = { + parent.getOrCompute(validTime).map(_.flatMap(flatMapFunc)) + } +} + diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FlumeInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FlumeInputDStream.scala new file mode 100644 index 0000000000..3fb443143c --- /dev/null +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FlumeInputDStream.scala @@ -0,0 +1,154 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.streaming.dstream + +import org.apache.spark.streaming.StreamingContext + +import org.apache.spark.Utils +import org.apache.spark.storage.StorageLevel + +import org.apache.flume.source.avro.AvroSourceProtocol +import org.apache.flume.source.avro.AvroFlumeEvent +import org.apache.flume.source.avro.Status +import org.apache.avro.ipc.specific.SpecificResponder +import org.apache.avro.ipc.NettyServer + +import scala.collection.JavaConversions._ + +import java.net.InetSocketAddress +import java.io.{ObjectInput, ObjectOutput, Externalizable} +import java.nio.ByteBuffer + +private[streaming] +class FlumeInputDStream[T: ClassManifest]( + @transient ssc_ : StreamingContext, + host: String, + port: Int, + storageLevel: StorageLevel +) extends NetworkInputDStream[SparkFlumeEvent](ssc_) { + + override def getReceiver(): NetworkReceiver[SparkFlumeEvent] = { + new FlumeReceiver(host, port, storageLevel) + } +} + +/** + * A wrapper class for AvroFlumeEvent's with a custom serialization format. + * + * This is necessary because AvroFlumeEvent uses inner data structures + * which are not serializable. + */ +class SparkFlumeEvent() extends Externalizable { + var event : AvroFlumeEvent = new AvroFlumeEvent() + + /* De-serialize from bytes. */ + def readExternal(in: ObjectInput) { + val bodyLength = in.readInt() + val bodyBuff = new Array[Byte](bodyLength) + in.read(bodyBuff) + + val numHeaders = in.readInt() + val headers = new java.util.HashMap[CharSequence, CharSequence] + + for (i <- 0 until numHeaders) { + val keyLength = in.readInt() + val keyBuff = new Array[Byte](keyLength) + in.read(keyBuff) + val key : String = Utils.deserialize(keyBuff) + + val valLength = in.readInt() + val valBuff = new Array[Byte](valLength) + in.read(valBuff) + val value : String = Utils.deserialize(valBuff) + + headers.put(key, value) + } + + event.setBody(ByteBuffer.wrap(bodyBuff)) + event.setHeaders(headers) + } + + /* Serialize to bytes. */ + def writeExternal(out: ObjectOutput) { + val body = event.getBody.array() + out.writeInt(body.length) + out.write(body) + + val numHeaders = event.getHeaders.size() + out.writeInt(numHeaders) + for ((k, v) <- event.getHeaders) { + val keyBuff = Utils.serialize(k.toString) + out.writeInt(keyBuff.length) + out.write(keyBuff) + val valBuff = Utils.serialize(v.toString) + out.writeInt(valBuff.length) + out.write(valBuff) + } + } +} + +private[streaming] object SparkFlumeEvent { + def fromAvroFlumeEvent(in : AvroFlumeEvent) : SparkFlumeEvent = { + val event = new SparkFlumeEvent + event.event = in + event + } +} + +/** A simple server that implements Flume's Avro protocol. */ +private[streaming] +class FlumeEventServer(receiver : FlumeReceiver) extends AvroSourceProtocol { + override def append(event : AvroFlumeEvent) : Status = { + receiver.blockGenerator += SparkFlumeEvent.fromAvroFlumeEvent(event) + Status.OK + } + + override def appendBatch(events : java.util.List[AvroFlumeEvent]) : Status = { + events.foreach (event => + receiver.blockGenerator += SparkFlumeEvent.fromAvroFlumeEvent(event)) + Status.OK + } +} + +/** A NetworkReceiver which listens for events using the + * Flume Avro interface.*/ +private[streaming] +class FlumeReceiver( + host: String, + port: Int, + storageLevel: StorageLevel + ) extends NetworkReceiver[SparkFlumeEvent] { + + lazy val blockGenerator = new BlockGenerator(storageLevel) + + protected override def onStart() { + val responder = new SpecificResponder( + classOf[AvroSourceProtocol], new FlumeEventServer(this)); + val server = new NettyServer(responder, new InetSocketAddress(host, port)); + blockGenerator.start() + server.start() + logInfo("Flume receiver started") + } + + protected override def onStop() { + blockGenerator.stop() + logInfo("Flume receiver stopped") + } + + override def getLocationPreference = Some(host) +} diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/ForEachDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/ForEachDStream.scala new file mode 100644 index 0000000000..c1f95650c8 --- /dev/null +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/ForEachDStream.scala @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.streaming.dstream + +import org.apache.spark.RDD +import org.apache.spark.streaming.{Duration, DStream, Job, Time} + +private[streaming] +class ForEachDStream[T: ClassManifest] ( + parent: DStream[T], + foreachFunc: (RDD[T], Time) => Unit + ) extends DStream[Unit](parent.ssc) { + + override def dependencies = List(parent) + + override def slideDuration: Duration = parent.slideDuration + + override def compute(validTime: Time): Option[RDD[Unit]] = None + + override def generateJob(time: Time): Option[Job] = { + parent.getOrCompute(time) match { + case Some(rdd) => + val jobFunc = () => { + foreachFunc(rdd, time) + } + Some(new Job(time, jobFunc)) + case None => None + } + } +} diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/GlommedDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/GlommedDStream.scala new file mode 100644 index 0000000000..1e4c7e7fde --- /dev/null +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/GlommedDStream.scala @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.streaming.dstream + +import org.apache.spark.streaming.{Duration, DStream, Time} +import org.apache.spark.RDD + +private[streaming] +class GlommedDStream[T: ClassManifest](parent: DStream[T]) + extends DStream[Array[T]](parent.ssc) { + + override def dependencies = List(parent) + + override def slideDuration: Duration = parent.slideDuration + + override def compute(validTime: Time): Option[RDD[Array[T]]] = { + parent.getOrCompute(validTime).map(_.glom()) + } +} diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/InputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/InputDStream.scala new file mode 100644 index 0000000000..674b27118c --- /dev/null +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/InputDStream.scala @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.streaming.dstream + +import org.apache.spark.streaming.{Time, Duration, StreamingContext, DStream} + +/** + * This is the abstract base class for all input streams. This class provides to methods + * start() and stop() which called by the scheduler to start and stop receiving data/ + * Input streams that can generated RDDs from new data just by running a service on + * the driver node (that is, without running a receiver onworker nodes) can be + * implemented by directly subclassing this InputDStream. For example, + * FileInputDStream, a subclass of InputDStream, monitors a HDFS directory for + * new files and generates RDDs on the new files. For implementing input streams + * that requires running a receiver on the worker nodes, use NetworkInputDStream + * as the parent class. + */ +abstract class InputDStream[T: ClassManifest] (@transient ssc_ : StreamingContext) + extends DStream[T](ssc_) { + + var lastValidTime: Time = null + + /** + * Checks whether the 'time' is valid wrt slideDuration for generating RDD. + * Additionally it also ensures valid times are in strictly increasing order. + * This ensures that InputDStream.compute() is called strictly on increasing + * times. + */ + override protected def isTimeValid(time: Time): Boolean = { + if (!super.isTimeValid(time)) { + false // Time not valid + } else { + // Time is valid, but check it it is more than lastValidTime + if (lastValidTime != null && time < lastValidTime) { + logWarning("isTimeValid called with " + time + " where as last valid time is " + lastValidTime) + } + lastValidTime = time + true + } + } + + override def dependencies = List() + + override def slideDuration: Duration = { + if (ssc == null) throw new Exception("ssc is null") + if (ssc.graph.batchDuration == null) throw new Exception("batchDuration is null") + ssc.graph.batchDuration + } + + /** Method called to start receiving data. Subclasses must implement this method. */ + def start() + + /** Method called to stop receiving data. Subclasses must implement this method. */ + def stop() +} diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/KafkaInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/KafkaInputDStream.scala new file mode 100644 index 0000000000..51e913675d --- /dev/null +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/KafkaInputDStream.scala @@ -0,0 +1,141 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.streaming.dstream + +import org.apache.spark.Logging +import org.apache.spark.storage.StorageLevel +import org.apache.spark.streaming.{Time, DStreamCheckpointData, StreamingContext} + +import java.util.Properties +import java.util.concurrent.Executors + +import kafka.consumer._ +import kafka.message.{Message, MessageSet, MessageAndMetadata} +import kafka.serializer.Decoder +import kafka.utils.{Utils, ZKGroupTopicDirs} +import kafka.utils.ZkUtils._ +import kafka.utils.ZKStringSerializer +import org.I0Itec.zkclient._ + +import scala.collection.Map +import scala.collection.mutable.HashMap +import scala.collection.JavaConversions._ + + +/** + * Input stream that pulls messages from a Kafka Broker. + * + * @param kafkaParams Map of kafka configuration paramaters. See: http://kafka.apache.org/configuration.html + * @param topics Map of (topic_name -> numPartitions) to consume. Each partition is consumed + * in its own thread. + * @param storageLevel RDD storage level. + */ +private[streaming] +class KafkaInputDStream[T: ClassManifest, D <: Decoder[_]: Manifest]( + @transient ssc_ : StreamingContext, + kafkaParams: Map[String, String], + topics: Map[String, Int], + storageLevel: StorageLevel + ) extends NetworkInputDStream[T](ssc_ ) with Logging { + + + def getReceiver(): NetworkReceiver[T] = { + new KafkaReceiver[T, D](kafkaParams, topics, storageLevel) + .asInstanceOf[NetworkReceiver[T]] + } +} + +private[streaming] +class KafkaReceiver[T: ClassManifest, D <: Decoder[_]: Manifest]( + kafkaParams: Map[String, String], + topics: Map[String, Int], + storageLevel: StorageLevel + ) extends NetworkReceiver[Any] { + + // Handles pushing data into the BlockManager + lazy protected val blockGenerator = new BlockGenerator(storageLevel) + // Connection to Kafka + var consumerConnector : ConsumerConnector = null + + def onStop() { + blockGenerator.stop() + } + + def onStart() { + + blockGenerator.start() + + // In case we are using multiple Threads to handle Kafka Messages + val executorPool = Executors.newFixedThreadPool(topics.values.reduce(_ + _)) + + logInfo("Starting Kafka Consumer Stream with group: " + kafkaParams("groupid")) + + // Kafka connection properties + val props = new Properties() + kafkaParams.foreach(param => props.put(param._1, param._2)) + + // Create the connection to the cluster + logInfo("Connecting to Zookeper: " + kafkaParams("zk.connect")) + val consumerConfig = new ConsumerConfig(props) + consumerConnector = Consumer.create(consumerConfig) + logInfo("Connected to " + kafkaParams("zk.connect")) + + // When autooffset.reset is defined, it is our responsibility to try and whack the + // consumer group zk node. + if (kafkaParams.contains("autooffset.reset")) { + tryZookeeperConsumerGroupCleanup(kafkaParams("zk.connect"), kafkaParams("groupid")) + } + + // Create Threads for each Topic/Message Stream we are listening + val decoder = manifest[D].erasure.newInstance.asInstanceOf[Decoder[T]] + val topicMessageStreams = consumerConnector.createMessageStreams(topics, decoder) + + // Start the messages handler for each partition + topicMessageStreams.values.foreach { streams => + streams.foreach { stream => executorPool.submit(new MessageHandler(stream)) } + } + } + + // Handles Kafka Messages + private class MessageHandler[T: ClassManifest](stream: KafkaStream[T]) extends Runnable { + def run() { + logInfo("Starting MessageHandler.") + for (msgAndMetadata <- stream) { + blockGenerator += msgAndMetadata.message + } + } + } + + // It is our responsibility to delete the consumer group when specifying autooffset.reset. This is because + // Kafka 0.7.2 only honors this param when the group is not in zookeeper. + // + // The kafka high level consumer doesn't expose setting offsets currently, this is a trick copied from Kafkas' + // ConsoleConsumer. See code related to 'autooffset.reset' when it is set to 'smallest'/'largest': + // https://github.com/apache/kafka/blob/0.7.2/core/src/main/scala/kafka/consumer/ConsoleConsumer.scala + private def tryZookeeperConsumerGroupCleanup(zkUrl: String, groupId: String) { + try { + val dir = "/consumers/" + groupId + logInfo("Cleaning up temporary zookeeper data under " + dir + ".") + val zk = new ZkClient(zkUrl, 30*1000, 30*1000, ZKStringSerializer) + zk.deleteRecursive(dir) + zk.close() + } catch { + case _ => // swallow + } + } +} diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/MapPartitionedDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/MapPartitionedDStream.scala new file mode 100644 index 0000000000..1d79d707bb --- /dev/null +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/MapPartitionedDStream.scala @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.streaming.dstream + +import org.apache.spark.streaming.{Duration, DStream, Time} +import org.apache.spark.RDD + +private[streaming] +class MapPartitionedDStream[T: ClassManifest, U: ClassManifest]( + parent: DStream[T], + mapPartFunc: Iterator[T] => Iterator[U], + preservePartitioning: Boolean + ) extends DStream[U](parent.ssc) { + + override def dependencies = List(parent) + + override def slideDuration: Duration = parent.slideDuration + + override def compute(validTime: Time): Option[RDD[U]] = { + parent.getOrCompute(validTime).map(_.mapPartitions[U](mapPartFunc, preservePartitioning)) + } +} + diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/MapValuedDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/MapValuedDStream.scala new file mode 100644 index 0000000000..312e0c0567 --- /dev/null +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/MapValuedDStream.scala @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.streaming.dstream + +import org.apache.spark.streaming.{Duration, DStream, Time} +import org.apache.spark.RDD +import org.apache.spark.SparkContext._ + +private[streaming] +class MapValuedDStream[K: ClassManifest, V: ClassManifest, U: ClassManifest]( + parent: DStream[(K, V)], + mapValueFunc: V => U + ) extends DStream[(K, U)](parent.ssc) { + + override def dependencies = List(parent) + + override def slideDuration: Duration = parent.slideDuration + + override def compute(validTime: Time): Option[RDD[(K, U)]] = { + parent.getOrCompute(validTime).map(_.mapValues[U](mapValueFunc)) + } +} + diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/MappedDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/MappedDStream.scala new file mode 100644 index 0000000000..af688dde5f --- /dev/null +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/MappedDStream.scala @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.streaming.dstream + +import org.apache.spark.streaming.{Duration, DStream, Time} +import org.apache.spark.RDD + +private[streaming] +class MappedDStream[T: ClassManifest, U: ClassManifest] ( + parent: DStream[T], + mapFunc: T => U + ) extends DStream[U](parent.ssc) { + + override def dependencies = List(parent) + + override def slideDuration: Duration = parent.slideDuration + + override def compute(validTime: Time): Option[RDD[U]] = { + parent.getOrCompute(validTime).map(_.map[U](mapFunc)) + } +} + diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/NetworkInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/NetworkInputDStream.scala new file mode 100644 index 0000000000..3d68da36a2 --- /dev/null +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/NetworkInputDStream.scala @@ -0,0 +1,272 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.streaming.dstream + +import org.apache.spark.streaming.{Time, StreamingContext, AddBlocks, RegisterReceiver, DeregisterReceiver} + +import org.apache.spark.{Logging, SparkEnv, RDD} +import org.apache.spark.rdd.BlockRDD +import org.apache.spark.storage.StorageLevel + +import scala.collection.mutable.ArrayBuffer + +import java.nio.ByteBuffer + +import akka.actor.{Props, Actor} +import akka.pattern.ask +import akka.dispatch.Await +import akka.util.duration._ +import org.apache.spark.streaming.util.{RecurringTimer, SystemClock} +import java.util.concurrent.ArrayBlockingQueue + +/** + * Abstract class for defining any InputDStream that has to start a receiver on worker + * nodes to receive external data. Specific implementations of NetworkInputDStream must + * define the getReceiver() function that gets the receiver object of type + * [[org.apache.spark.streaming.dstream.NetworkReceiver]] that will be sent to the workers to receive + * data. + * @param ssc_ Streaming context that will execute this input stream + * @tparam T Class type of the object of this stream + */ +abstract class NetworkInputDStream[T: ClassManifest](@transient ssc_ : StreamingContext) + extends InputDStream[T](ssc_) { + + // This is an unique identifier that is used to match the network receiver with the + // corresponding network input stream. + val id = ssc.getNewNetworkStreamId() + + /** + * Gets the receiver object that will be sent to the worker nodes + * to receive data. This method needs to defined by any specific implementation + * of a NetworkInputDStream. + */ + def getReceiver(): NetworkReceiver[T] + + // Nothing to start or stop as both taken care of by the NetworkInputTracker. + def start() {} + + def stop() {} + + override def compute(validTime: Time): Option[RDD[T]] = { + // If this is called for any time before the start time of the context, + // then this returns an empty RDD. This may happen when recovering from a + // master failure + if (validTime >= graph.startTime) { + val blockIds = ssc.networkInputTracker.getBlockIds(id, validTime) + Some(new BlockRDD[T](ssc.sc, blockIds)) + } else { + Some(new BlockRDD[T](ssc.sc, Array[String]())) + } + } +} + + +private[streaming] sealed trait NetworkReceiverMessage +private[streaming] case class StopReceiver(msg: String) extends NetworkReceiverMessage +private[streaming] case class ReportBlock(blockId: String, metadata: Any) extends NetworkReceiverMessage +private[streaming] case class ReportError(msg: String) extends NetworkReceiverMessage + +/** + * Abstract class of a receiver that can be run on worker nodes to receive external data. See + * [[org.apache.spark.streaming.dstream.NetworkInputDStream]] for an explanation. + */ +abstract class NetworkReceiver[T: ClassManifest]() extends Serializable with Logging { + + initLogging() + + lazy protected val env = SparkEnv.get + + lazy protected val actor = env.actorSystem.actorOf( + Props(new NetworkReceiverActor()), "NetworkReceiver-" + streamId) + + lazy protected val receivingThread = Thread.currentThread() + + protected var streamId: Int = -1 + + /** + * This method will be called to start receiving data. All your receiver + * starting code should be implemented by defining this function. + */ + protected def onStart() + + /** This method will be called to stop receiving data. */ + protected def onStop() + + /** Conveys a placement preference (hostname) for this receiver. */ + def getLocationPreference() : Option[String] = None + + /** + * Starts the receiver. First is accesses all the lazy members to + * materialize them. Then it calls the user-defined onStart() method to start + * other threads, etc required to receiver the data. + */ + def start() { + try { + // Access the lazy vals to materialize them + env + actor + receivingThread + + // Call user-defined onStart() + onStart() + } catch { + case ie: InterruptedException => + logInfo("Receiving thread interrupted") + //println("Receiving thread interrupted") + case e: Exception => + stopOnError(e) + } + } + + /** + * Stops the receiver. First it interrupts the main receiving thread, + * that is, the thread that called receiver.start(). Then it calls the user-defined + * onStop() method to stop other threads and/or do cleanup. + */ + def stop() { + receivingThread.interrupt() + onStop() + //TODO: terminate the actor + } + + /** + * Stops the receiver and reports exception to the tracker. + * This should be called whenever an exception is to be handled on any thread + * of the receiver. + */ + protected def stopOnError(e: Exception) { + logError("Error receiving data", e) + stop() + actor ! ReportError(e.toString) + } + + + /** + * Pushes a block (as an ArrayBuffer filled with data) into the block manager. + */ + def pushBlock(blockId: String, arrayBuffer: ArrayBuffer[T], metadata: Any, level: StorageLevel) { + env.blockManager.put(blockId, arrayBuffer.asInstanceOf[ArrayBuffer[Any]], level) + actor ! ReportBlock(blockId, metadata) + } + + /** + * Pushes a block (as bytes) into the block manager. + */ + def pushBlock(blockId: String, bytes: ByteBuffer, metadata: Any, level: StorageLevel) { + env.blockManager.putBytes(blockId, bytes, level) + actor ! ReportBlock(blockId, metadata) + } + + /** A helper actor that communicates with the NetworkInputTracker */ + private class NetworkReceiverActor extends Actor { + logInfo("Attempting to register with tracker") + val ip = System.getProperty("spark.driver.host", "localhost") + val port = System.getProperty("spark.driver.port", "7077").toInt + val url = "akka://spark@%s:%s/user/NetworkInputTracker".format(ip, port) + val tracker = env.actorSystem.actorFor(url) + val timeout = 5.seconds + + override def preStart() { + val future = tracker.ask(RegisterReceiver(streamId, self))(timeout) + Await.result(future, timeout) + } + + override def receive() = { + case ReportBlock(blockId, metadata) => + tracker ! AddBlocks(streamId, Array(blockId), metadata) + case ReportError(msg) => + tracker ! DeregisterReceiver(streamId, msg) + case StopReceiver(msg) => + stop() + tracker ! DeregisterReceiver(streamId, msg) + } + } + + protected[streaming] def setStreamId(id: Int) { + streamId = id + } + + /** + * Batches objects created by a [[org.apache.spark.streaming.dstream.NetworkReceiver]] and puts them into + * appropriately named blocks at regular intervals. This class starts two threads, + * one to periodically start a new batch and prepare the previous batch of as a block, + * the other to push the blocks into the block manager. + */ + class BlockGenerator(storageLevel: StorageLevel) + extends Serializable with Logging { + + case class Block(id: String, buffer: ArrayBuffer[T], metadata: Any = null) + + val clock = new SystemClock() + val blockInterval = System.getProperty("spark.streaming.blockInterval", "200").toLong + val blockIntervalTimer = new RecurringTimer(clock, blockInterval, updateCurrentBuffer) + val blockStorageLevel = storageLevel + val blocksForPushing = new ArrayBlockingQueue[Block](1000) + val blockPushingThread = new Thread() { override def run() { keepPushingBlocks() } } + + var currentBuffer = new ArrayBuffer[T] + + def start() { + blockIntervalTimer.start() + blockPushingThread.start() + logInfo("Data handler started") + } + + def stop() { + blockIntervalTimer.stop() + blockPushingThread.interrupt() + logInfo("Data handler stopped") + } + + def += (obj: T) { + currentBuffer += obj + } + + private def updateCurrentBuffer(time: Long) { + try { + val newBlockBuffer = currentBuffer + currentBuffer = new ArrayBuffer[T] + if (newBlockBuffer.size > 0) { + val blockId = "input-" + NetworkReceiver.this.streamId + "-" + (time - blockInterval) + val newBlock = new Block(blockId, newBlockBuffer) + blocksForPushing.add(newBlock) + } + } catch { + case ie: InterruptedException => + logInfo("Block interval timer thread interrupted") + case e: Exception => + NetworkReceiver.this.stop() + } + } + + private def keepPushingBlocks() { + logInfo("Block pushing thread started") + try { + while(true) { + val block = blocksForPushing.take() + NetworkReceiver.this.pushBlock(block.id, block.buffer, block.metadata, storageLevel) + } + } catch { + case ie: InterruptedException => + logInfo("Block pushing thread interrupted") + case e: Exception => + NetworkReceiver.this.stop() + } + } + } +} diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/PluggableInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/PluggableInputDStream.scala new file mode 100644 index 0000000000..15782f5c11 --- /dev/null +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/PluggableInputDStream.scala @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.streaming.dstream + +import org.apache.spark.streaming.StreamingContext + +private[streaming] +class PluggableInputDStream[T: ClassManifest]( + @transient ssc_ : StreamingContext, + receiver: NetworkReceiver[T]) extends NetworkInputDStream[T](ssc_) { + + def getReceiver(): NetworkReceiver[T] = { + receiver + } +} diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/QueueInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/QueueInputDStream.scala new file mode 100644 index 0000000000..b43ecaeebe --- /dev/null +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/QueueInputDStream.scala @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.streaming.dstream + +import org.apache.spark.RDD +import org.apache.spark.rdd.UnionRDD + +import scala.collection.mutable.Queue +import scala.collection.mutable.ArrayBuffer +import org.apache.spark.streaming.{Time, StreamingContext} + +private[streaming] +class QueueInputDStream[T: ClassManifest]( + @transient ssc: StreamingContext, + val queue: Queue[RDD[T]], + oneAtATime: Boolean, + defaultRDD: RDD[T] + ) extends InputDStream[T](ssc) { + + override def start() { } + + override def stop() { } + + override def compute(validTime: Time): Option[RDD[T]] = { + val buffer = new ArrayBuffer[RDD[T]]() + if (oneAtATime && queue.size > 0) { + buffer += queue.dequeue() + } else { + buffer ++= queue + } + if (buffer.size > 0) { + if (oneAtATime) { + Some(buffer.head) + } else { + Some(new UnionRDD(ssc.sc, buffer.toSeq)) + } + } else if (defaultRDD != null) { + Some(defaultRDD) + } else { + None + } + } + +} diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/RawInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/RawInputDStream.scala new file mode 100644 index 0000000000..c91f12ecd7 --- /dev/null +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/RawInputDStream.scala @@ -0,0 +1,108 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.streaming.dstream + +import org.apache.spark.Logging +import org.apache.spark.storage.StorageLevel +import org.apache.spark.streaming.StreamingContext + +import java.net.InetSocketAddress +import java.nio.ByteBuffer +import java.nio.channels.{ReadableByteChannel, SocketChannel} +import java.io.EOFException +import java.util.concurrent.ArrayBlockingQueue + + +/** + * An input stream that reads blocks of serialized objects from a given network address. + * The blocks will be inserted directly into the block store. This is the fastest way to get + * data into Spark Streaming, though it requires the sender to batch data and serialize it + * in the format that the system is configured with. + */ +private[streaming] +class RawInputDStream[T: ClassManifest]( + @transient ssc_ : StreamingContext, + host: String, + port: Int, + storageLevel: StorageLevel + ) extends NetworkInputDStream[T](ssc_ ) with Logging { + + def getReceiver(): NetworkReceiver[T] = { + new RawNetworkReceiver(host, port, storageLevel).asInstanceOf[NetworkReceiver[T]] + } +} + +private[streaming] +class RawNetworkReceiver(host: String, port: Int, storageLevel: StorageLevel) + extends NetworkReceiver[Any] { + + var blockPushingThread: Thread = null + + override def getLocationPreference = None + + def onStart() { + // Open a socket to the target address and keep reading from it + logInfo("Connecting to " + host + ":" + port) + val channel = SocketChannel.open() + channel.configureBlocking(true) + channel.connect(new InetSocketAddress(host, port)) + logInfo("Connected to " + host + ":" + port) + + val queue = new ArrayBlockingQueue[ByteBuffer](2) + + blockPushingThread = new Thread { + setDaemon(true) + override def run() { + var nextBlockNumber = 0 + while (true) { + val buffer = queue.take() + val blockId = "input-" + streamId + "-" + nextBlockNumber + nextBlockNumber += 1 + pushBlock(blockId, buffer, null, storageLevel) + } + } + } + blockPushingThread.start() + + val lengthBuffer = ByteBuffer.allocate(4) + while (true) { + lengthBuffer.clear() + readFully(channel, lengthBuffer) + lengthBuffer.flip() + val length = lengthBuffer.getInt() + val dataBuffer = ByteBuffer.allocate(length) + readFully(channel, dataBuffer) + dataBuffer.flip() + logInfo("Read a block with " + length + " bytes") + queue.put(dataBuffer) + } + } + + def onStop() { + if (blockPushingThread != null) blockPushingThread.interrupt() + } + + /** Read a buffer fully from a given Channel */ + private def readFully(channel: ReadableByteChannel, dest: ByteBuffer) { + while (dest.position < dest.limit) { + if (channel.read(dest) == -1) { + throw new EOFException("End of channel") + } + } + } +} diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReducedWindowedDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReducedWindowedDStream.scala new file mode 100644 index 0000000000..b6c672f899 --- /dev/null +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReducedWindowedDStream.scala @@ -0,0 +1,174 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.streaming.dstream + +import org.apache.spark.streaming.StreamingContext._ + +import org.apache.spark.RDD +import org.apache.spark.rdd.{CoGroupedRDD, MapPartitionsRDD} +import org.apache.spark.Partitioner +import org.apache.spark.SparkContext._ +import org.apache.spark.storage.StorageLevel + +import scala.collection.mutable.ArrayBuffer +import org.apache.spark.streaming.{Duration, Interval, Time, DStream} + +private[streaming] +class ReducedWindowedDStream[K: ClassManifest, V: ClassManifest]( + parent: DStream[(K, V)], + reduceFunc: (V, V) => V, + invReduceFunc: (V, V) => V, + filterFunc: Option[((K, V)) => Boolean], + _windowDuration: Duration, + _slideDuration: Duration, + partitioner: Partitioner + ) extends DStream[(K,V)](parent.ssc) { + + assert(_windowDuration.isMultipleOf(parent.slideDuration), + "The window duration of ReducedWindowedDStream (" + _slideDuration + ") " + + "must be multiple of the slide duration of parent DStream (" + parent.slideDuration + ")" + ) + + assert(_slideDuration.isMultipleOf(parent.slideDuration), + "The slide duration of ReducedWindowedDStream (" + _slideDuration + ") " + + "must be multiple of the slide duration of parent DStream (" + parent.slideDuration + ")" + ) + + // Reduce each batch of data using reduceByKey which will be further reduced by window + // by ReducedWindowedDStream + val reducedStream = parent.reduceByKey(reduceFunc, partitioner) + + // Persist RDDs to memory by default as these RDDs are going to be reused. + super.persist(StorageLevel.MEMORY_ONLY_SER) + reducedStream.persist(StorageLevel.MEMORY_ONLY_SER) + + def windowDuration: Duration = _windowDuration + + override def dependencies = List(reducedStream) + + override def slideDuration: Duration = _slideDuration + + override val mustCheckpoint = true + + override def parentRememberDuration: Duration = rememberDuration + windowDuration + + override def persist(storageLevel: StorageLevel): DStream[(K,V)] = { + super.persist(storageLevel) + reducedStream.persist(storageLevel) + this + } + + override def checkpoint(interval: Duration): DStream[(K, V)] = { + super.checkpoint(interval) + //reducedStream.checkpoint(interval) + this + } + + override def compute(validTime: Time): Option[RDD[(K, V)]] = { + val reduceF = reduceFunc + val invReduceF = invReduceFunc + + val currentTime = validTime + val currentWindow = new Interval(currentTime - windowDuration + parent.slideDuration, currentTime) + val previousWindow = currentWindow - slideDuration + + logDebug("Window time = " + windowDuration) + logDebug("Slide time = " + slideDuration) + logDebug("ZeroTime = " + zeroTime) + logDebug("Current window = " + currentWindow) + logDebug("Previous window = " + previousWindow) + + // _____________________________ + // | previous window _________|___________________ + // |___________________| current window | --------------> Time + // |_____________________________| + // + // |________ _________| |________ _________| + // | | + // V V + // old RDDs new RDDs + // + + // Get the RDDs of the reduced values in "old time steps" + val oldRDDs = + reducedStream.slice(previousWindow.beginTime, currentWindow.beginTime - parent.slideDuration) + logDebug("# old RDDs = " + oldRDDs.size) + + // Get the RDDs of the reduced values in "new time steps" + val newRDDs = + reducedStream.slice(previousWindow.endTime + parent.slideDuration, currentWindow.endTime) + logDebug("# new RDDs = " + newRDDs.size) + + // Get the RDD of the reduced value of the previous window + val previousWindowRDD = + getOrCompute(previousWindow.endTime).getOrElse(ssc.sc.makeRDD(Seq[(K,V)]())) + + // Make the list of RDDs that needs to cogrouped together for reducing their reduced values + val allRDDs = new ArrayBuffer[RDD[(K, V)]]() += previousWindowRDD ++= oldRDDs ++= newRDDs + + // Cogroup the reduced RDDs and merge the reduced values + val cogroupedRDD = new CoGroupedRDD[K](allRDDs.toSeq.asInstanceOf[Seq[RDD[(K, _)]]], partitioner) + //val mergeValuesFunc = mergeValues(oldRDDs.size, newRDDs.size) _ + + val numOldValues = oldRDDs.size + val numNewValues = newRDDs.size + + val mergeValues = (seqOfValues: Seq[Seq[V]]) => { + if (seqOfValues.size != 1 + numOldValues + numNewValues) { + throw new Exception("Unexpected number of sequences of reduced values") + } + // Getting reduced values "old time steps" that will be removed from current window + val oldValues = (1 to numOldValues).map(i => seqOfValues(i)).filter(!_.isEmpty).map(_.head) + // Getting reduced values "new time steps" + val newValues = + (1 to numNewValues).map(i => seqOfValues(numOldValues + i)).filter(!_.isEmpty).map(_.head) + + if (seqOfValues(0).isEmpty) { + // If previous window's reduce value does not exist, then at least new values should exist + if (newValues.isEmpty) { + throw new Exception("Neither previous window has value for key, nor new values found. " + + "Are you sure your key class hashes consistently?") + } + // Reduce the new values + newValues.reduce(reduceF) // return + } else { + // Get the previous window's reduced value + var tempValue = seqOfValues(0).head + // If old values exists, then inverse reduce then from previous value + if (!oldValues.isEmpty) { + tempValue = invReduceF(tempValue, oldValues.reduce(reduceF)) + } + // If new values exists, then reduce them with previous value + if (!newValues.isEmpty) { + tempValue = reduceF(tempValue, newValues.reduce(reduceF)) + } + tempValue // return + } + } + + val mergedValuesRDD = cogroupedRDD.asInstanceOf[RDD[(K,Seq[Seq[V]])]].mapValues(mergeValues) + + if (filterFunc.isDefined) { + Some(mergedValuesRDD.filter(filterFunc.get)) + } else { + Some(mergedValuesRDD) + } + } +} + + diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/ShuffledDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/ShuffledDStream.scala new file mode 100644 index 0000000000..3a0bd2acd7 --- /dev/null +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/ShuffledDStream.scala @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.streaming.dstream + +import org.apache.spark.{RDD, Partitioner} +import org.apache.spark.SparkContext._ +import org.apache.spark.streaming.{Duration, DStream, Time} + +private[streaming] +class ShuffledDStream[K: ClassManifest, V: ClassManifest, C: ClassManifest]( + parent: DStream[(K,V)], + createCombiner: V => C, + mergeValue: (C, V) => C, + mergeCombiner: (C, C) => C, + partitioner: Partitioner + ) extends DStream [(K,C)] (parent.ssc) { + + override def dependencies = List(parent) + + override def slideDuration: Duration = parent.slideDuration + + override def compute(validTime: Time): Option[RDD[(K,C)]] = { + parent.getOrCompute(validTime) match { + case Some(rdd) => + Some(rdd.combineByKey[C](createCombiner, mergeValue, mergeCombiner, partitioner)) + case None => None + } + } +} diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/SocketInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/SocketInputDStream.scala new file mode 100644 index 0000000000..e2539c7396 --- /dev/null +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/SocketInputDStream.scala @@ -0,0 +1,94 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.streaming.dstream + +import org.apache.spark.streaming.StreamingContext +import org.apache.spark.storage.StorageLevel +import org.apache.spark.util.NextIterator + +import java.io._ +import java.net.Socket + +private[streaming] +class SocketInputDStream[T: ClassManifest]( + @transient ssc_ : StreamingContext, + host: String, + port: Int, + bytesToObjects: InputStream => Iterator[T], + storageLevel: StorageLevel + ) extends NetworkInputDStream[T](ssc_) { + + def getReceiver(): NetworkReceiver[T] = { + new SocketReceiver(host, port, bytesToObjects, storageLevel) + } +} + +private[streaming] +class SocketReceiver[T: ClassManifest]( + host: String, + port: Int, + bytesToObjects: InputStream => Iterator[T], + storageLevel: StorageLevel + ) extends NetworkReceiver[T] { + + lazy protected val blockGenerator = new BlockGenerator(storageLevel) + + override def getLocationPreference = None + + protected def onStart() { + logInfo("Connecting to " + host + ":" + port) + val socket = new Socket(host, port) + logInfo("Connected to " + host + ":" + port) + blockGenerator.start() + val iterator = bytesToObjects(socket.getInputStream()) + while(iterator.hasNext) { + val obj = iterator.next + blockGenerator += obj + } + } + + protected def onStop() { + blockGenerator.stop() + } + +} + +private[streaming] +object SocketReceiver { + + /** + * This methods translates the data from an inputstream (say, from a socket) + * to '\n' delimited strings and returns an iterator to access the strings. + */ + def bytesToLines(inputStream: InputStream): Iterator[String] = { + val dataInputStream = new BufferedReader(new InputStreamReader(inputStream, "UTF-8")) + new NextIterator[String] { + protected override def getNext() = { + val nextValue = dataInputStream.readLine() + if (nextValue == null) { + finished = true + } + nextValue + } + + protected override def close() { + dataInputStream.close() + } + } + } +} diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/StateDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/StateDStream.scala new file mode 100644 index 0000000000..c1c9f808f0 --- /dev/null +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/StateDStream.scala @@ -0,0 +1,109 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.streaming.dstream + +import org.apache.spark.RDD +import org.apache.spark.Partitioner +import org.apache.spark.SparkContext._ +import org.apache.spark.storage.StorageLevel +import org.apache.spark.streaming.{Duration, Time, DStream} + +private[streaming] +class StateDStream[K: ClassManifest, V: ClassManifest, S: ClassManifest]( + parent: DStream[(K, V)], + updateFunc: (Iterator[(K, Seq[V], Option[S])]) => Iterator[(K, S)], + partitioner: Partitioner, + preservePartitioning: Boolean + ) extends DStream[(K, S)](parent.ssc) { + + super.persist(StorageLevel.MEMORY_ONLY_SER) + + override def dependencies = List(parent) + + override def slideDuration: Duration = parent.slideDuration + + override val mustCheckpoint = true + + override def compute(validTime: Time): Option[RDD[(K, S)]] = { + + // Try to get the previous state RDD + getOrCompute(validTime - slideDuration) match { + + case Some(prevStateRDD) => { // If previous state RDD exists + + // Try to get the parent RDD + parent.getOrCompute(validTime) match { + case Some(parentRDD) => { // If parent RDD exists, then compute as usual + + // Define the function for the mapPartition operation on cogrouped RDD; + // first map the cogrouped tuple to tuples of required type, + // and then apply the update function + val updateFuncLocal = updateFunc + val finalFunc = (iterator: Iterator[(K, (Seq[V], Seq[S]))]) => { + val i = iterator.map(t => { + (t._1, t._2._1, t._2._2.headOption) + }) + updateFuncLocal(i) + } + val cogroupedRDD = parentRDD.cogroup(prevStateRDD, partitioner) + val stateRDD = cogroupedRDD.mapPartitions(finalFunc, preservePartitioning) + //logDebug("Generating state RDD for time " + validTime) + return Some(stateRDD) + } + case None => { // If parent RDD does not exist + + // Re-apply the update function to the old state RDD + val updateFuncLocal = updateFunc + val finalFunc = (iterator: Iterator[(K, S)]) => { + val i = iterator.map(t => (t._1, Seq[V](), Option(t._2))) + updateFuncLocal(i) + } + val stateRDD = prevStateRDD.mapPartitions(finalFunc, preservePartitioning) + return Some(stateRDD) + } + } + } + + case None => { // If previous session RDD does not exist (first input data) + + // Try to get the parent RDD + parent.getOrCompute(validTime) match { + case Some(parentRDD) => { // If parent RDD exists, then compute as usual + + // Define the function for the mapPartition operation on grouped RDD; + // first map the grouped tuple to tuples of required type, + // and then apply the update function + val updateFuncLocal = updateFunc + val finalFunc = (iterator: Iterator[(K, Seq[V])]) => { + updateFuncLocal(iterator.map(tuple => (tuple._1, tuple._2, None))) + } + + val groupedRDD = parentRDD.groupByKey(partitioner) + val sessionRDD = groupedRDD.mapPartitions(finalFunc, preservePartitioning) + //logDebug("Generating state RDD for time " + validTime + " (first)") + return Some(sessionRDD) + } + case None => { // If parent RDD does not exist, then nothing to do! + //logDebug("Not generating state RDD (no previous state, no parent)") + return None + } + } + } + } + } +} diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/TransformedDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/TransformedDStream.scala new file mode 100644 index 0000000000..edba2032b4 --- /dev/null +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/TransformedDStream.scala @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.streaming.dstream + +import org.apache.spark.RDD +import org.apache.spark.streaming.{Duration, DStream, Time} + +private[streaming] +class TransformedDStream[T: ClassManifest, U: ClassManifest] ( + parent: DStream[T], + transformFunc: (RDD[T], Time) => RDD[U] + ) extends DStream[U](parent.ssc) { + + override def dependencies = List(parent) + + override def slideDuration: Duration = parent.slideDuration + + override def compute(validTime: Time): Option[RDD[U]] = { + parent.getOrCompute(validTime).map(transformFunc(_, validTime)) + } +} diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/TwitterInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/TwitterInputDStream.scala new file mode 100644 index 0000000000..387e15b0e6 --- /dev/null +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/TwitterInputDStream.scala @@ -0,0 +1,99 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.streaming.dstream + +import org.apache.spark._ +import org.apache.spark.streaming._ +import storage.StorageLevel +import twitter4j._ +import twitter4j.auth.Authorization +import java.util.prefs.Preferences +import twitter4j.conf.ConfigurationBuilder +import twitter4j.conf.PropertyConfiguration +import twitter4j.auth.OAuthAuthorization +import twitter4j.auth.AccessToken + +/* A stream of Twitter statuses, potentially filtered by one or more keywords. +* +* @constructor create a new Twitter stream using the supplied Twitter4J authentication credentials. +* An optional set of string filters can be used to restrict the set of tweets. The Twitter API is +* such that this may return a sampled subset of all tweets during each interval. +* +* If no Authorization object is provided, initializes OAuth authorization using the system +* properties twitter4j.oauth.consumerKey, .consumerSecret, .accessToken and .accessTokenSecret. +*/ +private[streaming] +class TwitterInputDStream( + @transient ssc_ : StreamingContext, + twitterAuth: Option[Authorization], + filters: Seq[String], + storageLevel: StorageLevel + ) extends NetworkInputDStream[Status](ssc_) { + + private def createOAuthAuthorization(): Authorization = { + new OAuthAuthorization(new ConfigurationBuilder().build()) + } + + private val authorization = twitterAuth.getOrElse(createOAuthAuthorization()) + + override def getReceiver(): NetworkReceiver[Status] = { + new TwitterReceiver(authorization, filters, storageLevel) + } +} + +private[streaming] +class TwitterReceiver( + twitterAuth: Authorization, + filters: Seq[String], + storageLevel: StorageLevel + ) extends NetworkReceiver[Status] { + + var twitterStream: TwitterStream = _ + lazy val blockGenerator = new BlockGenerator(storageLevel) + + protected override def onStart() { + blockGenerator.start() + twitterStream = new TwitterStreamFactory().getInstance(twitterAuth) + twitterStream.addListener(new StatusListener { + def onStatus(status: Status) = { + blockGenerator += status + } + // Unimplemented + def onDeletionNotice(statusDeletionNotice: StatusDeletionNotice) {} + def onTrackLimitationNotice(i: Int) {} + def onScrubGeo(l: Long, l1: Long) {} + def onStallWarning(stallWarning: StallWarning) {} + def onException(e: Exception) { stopOnError(e) } + }) + + val query: FilterQuery = new FilterQuery + if (filters.size > 0) { + query.track(filters.toArray) + twitterStream.filter(query) + } else { + twitterStream.sample() + } + logInfo("Twitter receiver started") + } + + protected override def onStop() { + blockGenerator.stop() + twitterStream.shutdown() + logInfo("Twitter receiver stopped") + } +} diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/UnionDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/UnionDStream.scala new file mode 100644 index 0000000000..97eab97b2f --- /dev/null +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/UnionDStream.scala @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.streaming.dstream + +import org.apache.spark.streaming.{Duration, DStream, Time} +import org.apache.spark.RDD +import collection.mutable.ArrayBuffer +import org.apache.spark.rdd.UnionRDD + +private[streaming] +class UnionDStream[T: ClassManifest](parents: Array[DStream[T]]) + extends DStream[T](parents.head.ssc) { + + if (parents.length == 0) { + throw new IllegalArgumentException("Empty array of parents") + } + + if (parents.map(_.ssc).distinct.size > 1) { + throw new IllegalArgumentException("Array of parents have different StreamingContexts") + } + + if (parents.map(_.slideDuration).distinct.size > 1) { + throw new IllegalArgumentException("Array of parents have different slide times") + } + + override def dependencies = parents.toList + + override def slideDuration: Duration = parents.head.slideDuration + + override def compute(validTime: Time): Option[RDD[T]] = { + val rdds = new ArrayBuffer[RDD[T]]() + parents.map(_.getOrCompute(validTime)).foreach(_ match { + case Some(rdd) => rdds += rdd + case None => throw new Exception("Could not generate RDD from a parent for unifying at time " + validTime) + }) + if (rdds.size > 0) { + Some(new UnionRDD(ssc.sc, rdds)) + } else { + None + } + } +} diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/WindowedDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/WindowedDStream.scala new file mode 100644 index 0000000000..dbbea39e81 --- /dev/null +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/WindowedDStream.scala @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.streaming.dstream + +import org.apache.spark.RDD +import org.apache.spark.rdd.UnionRDD +import org.apache.spark.storage.StorageLevel +import org.apache.spark.streaming.{Duration, Interval, Time, DStream} + +private[streaming] +class WindowedDStream[T: ClassManifest]( + parent: DStream[T], + _windowDuration: Duration, + _slideDuration: Duration) + extends DStream[T](parent.ssc) { + + if (!_windowDuration.isMultipleOf(parent.slideDuration)) + throw new Exception("The window duration of WindowedDStream (" + _slideDuration + ") " + + "must be multiple of the slide duration of parent DStream (" + parent.slideDuration + ")") + + if (!_slideDuration.isMultipleOf(parent.slideDuration)) + throw new Exception("The slide duration of WindowedDStream (" + _slideDuration + ") " + + "must be multiple of the slide duration of parent DStream (" + parent.slideDuration + ")") + + parent.persist(StorageLevel.MEMORY_ONLY_SER) + + def windowDuration: Duration = _windowDuration + + override def dependencies = List(parent) + + override def slideDuration: Duration = _slideDuration + + override def parentRememberDuration: Duration = rememberDuration + windowDuration + + override def compute(validTime: Time): Option[RDD[T]] = { + val currentWindow = new Interval(validTime - windowDuration + parent.slideDuration, validTime) + Some(new UnionRDD(ssc.sc, parent.slice(currentWindow))) + } +} + + + diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receivers/ActorReceiver.scala b/streaming/src/main/scala/org/apache/spark/streaming/receivers/ActorReceiver.scala new file mode 100644 index 0000000000..4b5d8c467e --- /dev/null +++ b/streaming/src/main/scala/org/apache/spark/streaming/receivers/ActorReceiver.scala @@ -0,0 +1,175 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.streaming.receivers + +import akka.actor.{ Actor, PoisonPill, Props, SupervisorStrategy } +import akka.actor.{ actorRef2Scala, ActorRef } +import akka.actor.{ PossiblyHarmful, OneForOneStrategy } + +import org.apache.spark.storage.StorageLevel +import org.apache.spark.streaming.dstream.NetworkReceiver + +import java.util.concurrent.atomic.AtomicInteger + +import scala.collection.mutable.ArrayBuffer + +/** A helper with set of defaults for supervisor strategy **/ +object ReceiverSupervisorStrategy { + + import akka.util.duration._ + import akka.actor.SupervisorStrategy._ + + val defaultStrategy = OneForOneStrategy(maxNrOfRetries = 10, withinTimeRange = + 15 millis) { + case _: RuntimeException ⇒ Restart + case _: Exception ⇒ Escalate + } +} + +/** + * A receiver trait to be mixed in with your Actor to gain access to + * pushBlock API. + * + * Find more details at: http://spark-project.org/docs/latest/streaming-custom-receivers.html + * + * @example {{{ + * class MyActor extends Actor with Receiver{ + * def receive { + * case anything :String ⇒ pushBlock(anything) + * } + * } + * //Can be plugged in actorStream as follows + * ssc.actorStream[String](Props(new MyActor),"MyActorReceiver") + * + * }}} + * + * @note An important point to note: + * Since Actor may exist outside the spark framework, It is thus user's responsibility + * to ensure the type safety, i.e parametrized type of push block and InputDStream + * should be same. + * + */ +trait Receiver { self: Actor ⇒ + def pushBlock[T: ClassManifest](iter: Iterator[T]) { + context.parent ! Data(iter) + } + + def pushBlock[T: ClassManifest](data: T) { + context.parent ! Data(data) + } + +} + +/** + * Statistics for querying the supervisor about state of workers + */ +case class Statistics(numberOfMsgs: Int, + numberOfWorkers: Int, + numberOfHiccups: Int, + otherInfo: String) + +/** Case class to receive data sent by child actors **/ +private[streaming] case class Data[T: ClassManifest](data: T) + +/** + * Provides Actors as receivers for receiving stream. + * + * As Actors can also be used to receive data from almost any stream source. + * A nice set of abstraction(s) for actors as receivers is already provided for + * a few general cases. It is thus exposed as an API where user may come with + * his own Actor to run as receiver for Spark Streaming input source. + * + * This starts a supervisor actor which starts workers and also provides + * [http://doc.akka.io/docs/akka/2.0.5/scala/fault-tolerance.html fault-tolerance]. + * + * Here's a way to start more supervisor/workers as its children. + * + * @example {{{ + * context.parent ! Props(new Supervisor) + * }}} OR {{{ + * context.parent ! Props(new Worker,"Worker") + * }}} + * + * + */ +private[streaming] class ActorReceiver[T: ClassManifest]( + props: Props, + name: String, + storageLevel: StorageLevel, + receiverSupervisorStrategy: SupervisorStrategy) + extends NetworkReceiver[T] { + + protected lazy val blocksGenerator: BlockGenerator = + new BlockGenerator(storageLevel) + + protected lazy val supervisor = env.actorSystem.actorOf(Props(new Supervisor), + "Supervisor" + streamId) + + private class Supervisor extends Actor { + + override val supervisorStrategy = receiverSupervisorStrategy + val worker = context.actorOf(props, name) + logInfo("Started receiver worker at:" + worker.path) + + val n: AtomicInteger = new AtomicInteger(0) + val hiccups: AtomicInteger = new AtomicInteger(0) + + def receive = { + + case Data(iter: Iterator[_]) ⇒ pushBlock(iter.asInstanceOf[Iterator[T]]) + + case Data(msg) ⇒ + blocksGenerator += msg.asInstanceOf[T] + n.incrementAndGet + + case props: Props ⇒ + val worker = context.actorOf(props) + logInfo("Started receiver worker at:" + worker.path) + sender ! worker + + case (props: Props, name: String) ⇒ + val worker = context.actorOf(props, name) + logInfo("Started receiver worker at:" + worker.path) + sender ! worker + + case _: PossiblyHarmful => hiccups.incrementAndGet() + + case _: Statistics ⇒ + val workers = context.children + sender ! Statistics(n.get, workers.size, hiccups.get, workers.mkString("\n")) + + } + } + + protected def pushBlock(iter: Iterator[T]) { + val buffer = new ArrayBuffer[T] + buffer ++= iter + pushBlock("block-" + streamId + "-" + System.nanoTime(), buffer, null, storageLevel) + } + + protected def onStart() = { + blocksGenerator.start() + supervisor + logInfo("Supervision tree for receivers initialized at:" + supervisor.path) + } + + protected def onStop() = { + supervisor ! PoisonPill + } + +} diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receivers/ZeroMQReceiver.scala b/streaming/src/main/scala/org/apache/spark/streaming/receivers/ZeroMQReceiver.scala new file mode 100644 index 0000000000..043bb8c8bf --- /dev/null +++ b/streaming/src/main/scala/org/apache/spark/streaming/receivers/ZeroMQReceiver.scala @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.streaming.receivers + +import akka.actor.Actor +import akka.zeromq._ + +import org.apache.spark.Logging + +/** + * A receiver to subscribe to ZeroMQ stream. + */ +private[streaming] class ZeroMQReceiver[T: ClassManifest](publisherUrl: String, + subscribe: Subscribe, + bytesToObjects: Seq[Seq[Byte]] ⇒ Iterator[T]) + extends Actor with Receiver with Logging { + + override def preStart() = context.system.newSocket(SocketType.Sub, Listener(self), + Connect(publisherUrl), subscribe) + + def receive: Receive = { + + case Connecting ⇒ logInfo("connecting ...") + + case m: ZMQMessage ⇒ + logDebug("Received message for:" + m.firstFrameAsString) + + //We ignore first frame for processing as it is the topic + val bytes = m.frames.tail.map(_.payload) + pushBlock(bytesToObjects(bytes)) + + case Closed ⇒ logInfo("received closed ") + + } +} diff --git a/streaming/src/main/scala/org/apache/spark/streaming/util/Clock.scala b/streaming/src/main/scala/org/apache/spark/streaming/util/Clock.scala new file mode 100644 index 0000000000..f67bb2f6ac --- /dev/null +++ b/streaming/src/main/scala/org/apache/spark/streaming/util/Clock.scala @@ -0,0 +1,101 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.streaming.util + +private[streaming] +trait Clock { + def currentTime(): Long + def waitTillTime(targetTime: Long): Long +} + +private[streaming] +class SystemClock() extends Clock { + + val minPollTime = 25L + + def currentTime(): Long = { + System.currentTimeMillis() + } + + def waitTillTime(targetTime: Long): Long = { + var currentTime = 0L + currentTime = System.currentTimeMillis() + + var waitTime = targetTime - currentTime + if (waitTime <= 0) { + return currentTime + } + + val pollTime = { + if (waitTime / 10.0 > minPollTime) { + (waitTime / 10.0).toLong + } else { + minPollTime + } + } + + + while (true) { + currentTime = System.currentTimeMillis() + waitTime = targetTime - currentTime + + if (waitTime <= 0) { + + return currentTime + } + val sleepTime = + if (waitTime < pollTime) { + waitTime + } else { + pollTime + } + Thread.sleep(sleepTime) + } + return -1 + } +} + +private[streaming] +class ManualClock() extends Clock { + + var time = 0L + + def currentTime() = time + + def setTime(timeToSet: Long) = { + this.synchronized { + time = timeToSet + this.notifyAll() + } + } + + def addToTime(timeToAdd: Long) = { + this.synchronized { + time += timeToAdd + this.notifyAll() + } + } + def waitTillTime(targetTime: Long): Long = { + this.synchronized { + while (time < targetTime) { + this.wait(100) + } + } + return currentTime() + } +} diff --git a/streaming/src/main/scala/org/apache/spark/streaming/util/MasterFailureTest.scala b/streaming/src/main/scala/org/apache/spark/streaming/util/MasterFailureTest.scala new file mode 100644 index 0000000000..50d72298e4 --- /dev/null +++ b/streaming/src/main/scala/org/apache/spark/streaming/util/MasterFailureTest.scala @@ -0,0 +1,414 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.streaming.util + +import org.apache.spark.{Logging, RDD} +import org.apache.spark.streaming._ +import org.apache.spark.streaming.dstream.ForEachDStream +import StreamingContext._ + +import scala.util.Random +import scala.collection.mutable.{SynchronizedBuffer, ArrayBuffer} + +import java.io.{File, ObjectInputStream, IOException} +import java.util.UUID + +import com.google.common.io.Files + +import org.apache.commons.io.FileUtils +import org.apache.hadoop.fs.{FileUtil, FileSystem, Path} +import org.apache.hadoop.conf.Configuration + + +private[streaming] +object MasterFailureTest extends Logging { + initLogging() + + @volatile var killed = false + @volatile var killCount = 0 + + def main(args: Array[String]) { + if (args.size < 2) { + println( + "Usage: MasterFailureTest <local/HDFS directory> <# batches> [<batch size in milliseconds>]") + System.exit(1) + } + val directory = args(0) + val numBatches = args(1).toInt + val batchDuration = if (args.size > 2) Milliseconds(args(2).toInt) else Seconds(1) + + println("\n\n========================= MAP TEST =========================\n\n") + testMap(directory, numBatches, batchDuration) + + println("\n\n================= UPDATE-STATE-BY-KEY TEST =================\n\n") + testUpdateStateByKey(directory, numBatches, batchDuration) + + println("\n\nSUCCESS\n\n") + } + + def testMap(directory: String, numBatches: Int, batchDuration: Duration) { + // Input: time=1 ==> [ 1 ] , time=2 ==> [ 2 ] , time=3 ==> [ 3 ] , ... + val input = (1 to numBatches).map(_.toString).toSeq + // Expected output: time=1 ==> [ 1 ] , time=2 ==> [ 2 ] , time=3 ==> [ 3 ] , ... + val expectedOutput = (1 to numBatches) + + val operation = (st: DStream[String]) => st.map(_.toInt) + + // Run streaming operation with multiple master failures + val output = testOperation(directory, batchDuration, input, operation, expectedOutput) + + logInfo("Expected output, size = " + expectedOutput.size) + logInfo(expectedOutput.mkString("[", ",", "]")) + logInfo("Output, size = " + output.size) + logInfo(output.mkString("[", ",", "]")) + + // Verify whether all the values of the expected output is present + // in the output + assert(output.distinct.toSet == expectedOutput.toSet) + } + + + def testUpdateStateByKey(directory: String, numBatches: Int, batchDuration: Duration) { + // Input: time=1 ==> [ a ] , time=2 ==> [ a, a ] , time=3 ==> [ a, a, a ] , ... + val input = (1 to numBatches).map(i => (1 to i).map(_ => "a").mkString(" ")).toSeq + // Expected output: time=1 ==> [ (a, 1) ] , time=2 ==> [ (a, 3) ] , time=3 ==> [ (a,6) ] , ... + val expectedOutput = (1L to numBatches).map(i => (1L to i).reduce(_ + _)).map(j => ("a", j)) + + val operation = (st: DStream[String]) => { + val updateFunc = (values: Seq[Long], state: Option[Long]) => { + Some(values.foldLeft(0L)(_ + _) + state.getOrElse(0L)) + } + st.flatMap(_.split(" ")) + .map(x => (x, 1L)) + .updateStateByKey[Long](updateFunc) + .checkpoint(batchDuration * 5) + } + + // Run streaming operation with multiple master failures + val output = testOperation(directory, batchDuration, input, operation, expectedOutput) + + logInfo("Expected output, size = " + expectedOutput.size + "\n" + expectedOutput) + logInfo("Output, size = " + output.size + "\n" + output) + + // Verify whether all the values in the output are among the expected output values + output.foreach(o => + assert(expectedOutput.contains(o), "Expected value " + o + " not found") + ) + + // Verify whether the last expected output value has been generated, there by + // confirming that none of the inputs have been missed + assert(output.last == expectedOutput.last) + } + + /** + * Tests stream operation with multiple master failures, and verifies whether the + * final set of output values is as expected or not. + */ + def testOperation[T: ClassManifest]( + directory: String, + batchDuration: Duration, + input: Seq[String], + operation: DStream[String] => DStream[T], + expectedOutput: Seq[T] + ): Seq[T] = { + + // Just making sure that the expected output does not have duplicates + assert(expectedOutput.distinct.toSet == expectedOutput.toSet) + + // Setup the stream computation with the given operation + val (ssc, checkpointDir, testDir) = setupStreams(directory, batchDuration, operation) + + // Start generating files in the a different thread + val fileGeneratingThread = new FileGeneratingThread(input, testDir, batchDuration.milliseconds) + fileGeneratingThread.start() + + // Run the streams and repeatedly kill it until the last expected output + // has been generated, or until it has run for twice the expected time + val lastExpectedOutput = expectedOutput.last + val maxTimeToRun = expectedOutput.size * batchDuration.milliseconds * 2 + val mergedOutput = runStreams(ssc, lastExpectedOutput, maxTimeToRun) + + // Delete directories + fileGeneratingThread.join() + val fs = checkpointDir.getFileSystem(new Configuration()) + fs.delete(checkpointDir, true) + fs.delete(testDir, true) + logInfo("Finished test after " + killCount + " failures") + mergedOutput + } + + /** + * Sets up the stream computation with the given operation, directory (local or HDFS), + * and batch duration. Returns the streaming context and the directory to which + * files should be written for testing. + */ + private def setupStreams[T: ClassManifest]( + directory: String, + batchDuration: Duration, + operation: DStream[String] => DStream[T] + ): (StreamingContext, Path, Path) = { + // Reset all state + reset() + + // Create the directories for this test + val uuid = UUID.randomUUID().toString + val rootDir = new Path(directory, uuid) + val fs = rootDir.getFileSystem(new Configuration()) + val checkpointDir = new Path(rootDir, "checkpoint") + val testDir = new Path(rootDir, "test") + fs.mkdirs(checkpointDir) + fs.mkdirs(testDir) + + // Setup the streaming computation with the given operation + System.clearProperty("spark.driver.port") + System.clearProperty("spark.hostPort") + var ssc = new StreamingContext("local[4]", "MasterFailureTest", batchDuration, null, Nil, Map()) + ssc.checkpoint(checkpointDir.toString) + val inputStream = ssc.textFileStream(testDir.toString) + val operatedStream = operation(inputStream) + val outputStream = new TestOutputStream(operatedStream) + ssc.registerOutputStream(outputStream) + (ssc, checkpointDir, testDir) + } + + + /** + * Repeatedly starts and kills the streaming context until timed out or + * the last expected output is generated. Finally, return + */ + private def runStreams[T: ClassManifest]( + ssc_ : StreamingContext, + lastExpectedOutput: T, + maxTimeToRun: Long + ): Seq[T] = { + + var ssc = ssc_ + var totalTimeRan = 0L + var isLastOutputGenerated = false + var isTimedOut = false + val mergedOutput = new ArrayBuffer[T]() + val checkpointDir = ssc.checkpointDir + var batchDuration = ssc.graph.batchDuration + + while(!isLastOutputGenerated && !isTimedOut) { + // Get the output buffer + val outputBuffer = ssc.graph.getOutputStreams.head.asInstanceOf[TestOutputStream[T]].output + def output = outputBuffer.flatMap(x => x) + + // Start the thread to kill the streaming after some time + killed = false + val killingThread = new KillingThread(ssc, batchDuration.milliseconds * 10) + killingThread.start() + + var timeRan = 0L + try { + // Start the streaming computation and let it run while ... + // (i) StreamingContext has not been shut down yet + // (ii) The last expected output has not been generated yet + // (iii) Its not timed out yet + System.clearProperty("spark.streaming.clock") + System.clearProperty("spark.driver.port") + System.clearProperty("spark.hostPort") + ssc.start() + val startTime = System.currentTimeMillis() + while (!killed && !isLastOutputGenerated && !isTimedOut) { + Thread.sleep(100) + timeRan = System.currentTimeMillis() - startTime + isLastOutputGenerated = (!output.isEmpty && output.last == lastExpectedOutput) + isTimedOut = (timeRan + totalTimeRan > maxTimeToRun) + } + } catch { + case e: Exception => logError("Error running streaming context", e) + } + if (killingThread.isAlive) killingThread.interrupt() + ssc.stop() + + logInfo("Has been killed = " + killed) + logInfo("Is last output generated = " + isLastOutputGenerated) + logInfo("Is timed out = " + isTimedOut) + + // Verify whether the output of each batch has only one element or no element + // and then merge the new output with all the earlier output + mergedOutput ++= output + totalTimeRan += timeRan + logInfo("New output = " + output) + logInfo("Merged output = " + mergedOutput) + logInfo("Time ran = " + timeRan) + logInfo("Total time ran = " + totalTimeRan) + + if (!isLastOutputGenerated && !isTimedOut) { + val sleepTime = Random.nextInt(batchDuration.milliseconds.toInt * 10) + logInfo( + "\n-------------------------------------------\n" + + " Restarting stream computation in " + sleepTime + " ms " + + "\n-------------------------------------------\n" + ) + Thread.sleep(sleepTime) + // Recreate the streaming context from checkpoint + ssc = new StreamingContext(checkpointDir) + } + } + mergedOutput + } + + /** + * Verifies the output value are the same as expected. Since failures can lead to + * a batch being processed twice, a batches output may appear more than once + * consecutively. To avoid getting confused with those, we eliminate consecutive + * duplicate batch outputs of values from the `output`. As a result, the + * expected output should not have consecutive batches with the same values as output. + */ + private def verifyOutput[T: ClassManifest](output: Seq[T], expectedOutput: Seq[T]) { + // Verify whether expected outputs do not consecutive batches with same output + for (i <- 0 until expectedOutput.size - 1) { + assert(expectedOutput(i) != expectedOutput(i+1), + "Expected output has consecutive duplicate sequence of values") + } + + // Log the output + println("Expected output, size = " + expectedOutput.size) + println(expectedOutput.mkString("[", ",", "]")) + println("Output, size = " + output.size) + println(output.mkString("[", ",", "]")) + + // Match the output with the expected output + output.foreach(o => + assert(expectedOutput.contains(o), "Expected value " + o + " not found") + ) + } + + /** Resets counter to prepare for the test */ + private def reset() { + killed = false + killCount = 0 + } +} + +/** + * This is a output stream just for testing. All the output is collected into a + * ArrayBuffer. This buffer is wiped clean on being restored from checkpoint. + */ +private[streaming] +class TestOutputStream[T: ClassManifest]( + parent: DStream[T], + val output: ArrayBuffer[Seq[T]] = new ArrayBuffer[Seq[T]] with SynchronizedBuffer[Seq[T]] + ) extends ForEachDStream[T]( + parent, + (rdd: RDD[T], t: Time) => { + val collected = rdd.collect() + output += collected + } + ) { + + // This is to clear the output buffer every it is read from a checkpoint + @throws(classOf[IOException]) + private def readObject(ois: ObjectInputStream) { + ois.defaultReadObject() + output.clear() + } +} + + +/** + * Thread to kill streaming context after a random period of time. + */ +private[streaming] +class KillingThread(ssc: StreamingContext, maxKillWaitTime: Long) extends Thread with Logging { + initLogging() + + override def run() { + try { + // If it is the first killing, then allow the first checkpoint to be created + var minKillWaitTime = if (MasterFailureTest.killCount == 0) 5000 else 2000 + val killWaitTime = minKillWaitTime + math.abs(Random.nextLong % maxKillWaitTime) + logInfo("Kill wait time = " + killWaitTime) + Thread.sleep(killWaitTime) + logInfo( + "\n---------------------------------------\n" + + "Killing streaming context after " + killWaitTime + " ms" + + "\n---------------------------------------\n" + ) + if (ssc != null) { + ssc.stop() + MasterFailureTest.killed = true + MasterFailureTest.killCount += 1 + } + logInfo("Killing thread finished normally") + } catch { + case ie: InterruptedException => logInfo("Killing thread interrupted") + case e: Exception => logWarning("Exception in killing thread", e) + } + + } +} + + +/** + * Thread to generate input files periodically with the desired text. + */ +private[streaming] +class FileGeneratingThread(input: Seq[String], testDir: Path, interval: Long) + extends Thread with Logging { + initLogging() + + override def run() { + val localTestDir = Files.createTempDir() + var fs = testDir.getFileSystem(new Configuration()) + val maxTries = 3 + try { + Thread.sleep(5000) // To make sure that all the streaming context has been set up + for (i <- 0 until input.size) { + // Write the data to a local file and then move it to the target test directory + val localFile = new File(localTestDir, (i+1).toString) + val hadoopFile = new Path(testDir, (i+1).toString) + val tempHadoopFile = new Path(testDir, ".tmp_" + (i+1).toString) + FileUtils.writeStringToFile(localFile, input(i).toString + "\n") + var tries = 0 + var done = false + while (!done && tries < maxTries) { + tries += 1 + try { + // fs.copyFromLocalFile(new Path(localFile.toString), hadoopFile) + fs.copyFromLocalFile(new Path(localFile.toString), tempHadoopFile) + fs.rename(tempHadoopFile, hadoopFile) + done = true + } catch { + case ioe: IOException => { + fs = testDir.getFileSystem(new Configuration()) + logWarning("Attempt " + tries + " at generating file " + hadoopFile + " failed.", ioe) + } + } + } + if (!done) + logError("Could not generate file " + hadoopFile) + else + logInfo("Generated file " + hadoopFile + " at " + System.currentTimeMillis) + Thread.sleep(interval) + localFile.delete() + } + logInfo("File generating thread finished normally") + } catch { + case ie: InterruptedException => logInfo("File generating thread interrupted") + case e: Exception => logWarning("File generating in killing thread", e) + } finally { + fs.close() + } + } +} + + diff --git a/streaming/src/main/scala/org/apache/spark/streaming/util/RawTextHelper.scala b/streaming/src/main/scala/org/apache/spark/streaming/util/RawTextHelper.scala new file mode 100644 index 0000000000..4e6ce6eabd --- /dev/null +++ b/streaming/src/main/scala/org/apache/spark/streaming/util/RawTextHelper.scala @@ -0,0 +1,115 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.streaming.util + +import org.apache.spark.SparkContext +import org.apache.spark.SparkContext._ +import it.unimi.dsi.fastutil.objects.{Object2LongOpenHashMap => OLMap} +import scala.collection.JavaConversions.mapAsScalaMap + +object RawTextHelper { + + /** + * Splits lines and counts the words in them using specialized object-to-long hashmap + * (to avoid boxing-unboxing overhead of Long in java/scala HashMap) + */ + def splitAndCountPartitions(iter: Iterator[String]): Iterator[(String, Long)] = { + val map = new OLMap[String] + var i = 0 + var j = 0 + while (iter.hasNext) { + val s = iter.next() + i = 0 + while (i < s.length) { + j = i + while (j < s.length && s.charAt(j) != ' ') { + j += 1 + } + if (j > i) { + val w = s.substring(i, j) + val c = map.getLong(w) + map.put(w, c + 1) + } + i = j + while (i < s.length && s.charAt(i) == ' ') { + i += 1 + } + } + } + map.toIterator.map{case (k, v) => (k, v)} + } + + /** + * Gets the top k words in terms of word counts. Assumes that each word exists only once + * in the `data` iterator (that is, the counts have been reduced). + */ + def topK(data: Iterator[(String, Long)], k: Int): Iterator[(String, Long)] = { + val taken = new Array[(String, Long)](k) + + var i = 0 + var len = 0 + var done = false + var value: (String, Long) = null + var swap: (String, Long) = null + var count = 0 + + while(data.hasNext) { + value = data.next + if (value != null) { + count += 1 + if (len == 0) { + taken(0) = value + len = 1 + } else if (len < k || value._2 > taken(len - 1)._2) { + if (len < k) { + len += 1 + } + taken(len - 1) = value + i = len - 1 + while(i > 0 && taken(i - 1)._2 < taken(i)._2) { + swap = taken(i) + taken(i) = taken(i-1) + taken(i - 1) = swap + i -= 1 + } + } + } + } + return taken.toIterator + } + + /** + * Warms up the SparkContext in master and slave by running tasks to force JIT kick in + * before real workload starts. + */ + def warmUp(sc: SparkContext) { + for(i <- 0 to 1) { + sc.parallelize(1 to 200000, 1000) + .map(_ % 1331).map(_.toString) + .mapPartitions(splitAndCountPartitions).reduceByKey(_ + _, 10) + .count() + } + } + + def add(v1: Long, v2: Long) = (v1 + v2) + + def subtract(v1: Long, v2: Long) = (v1 - v2) + + def max(v1: Long, v2: Long) = math.max(v1, v2) +} + diff --git a/streaming/src/main/scala/org/apache/spark/streaming/util/RawTextSender.scala b/streaming/src/main/scala/org/apache/spark/streaming/util/RawTextSender.scala new file mode 100644 index 0000000000..249f6a22ae --- /dev/null +++ b/streaming/src/main/scala/org/apache/spark/streaming/util/RawTextSender.scala @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.streaming.util + +import java.nio.ByteBuffer +import org.apache.spark.util.{RateLimitedOutputStream, IntParam} +import java.net.ServerSocket +import org.apache.spark.{Logging, KryoSerializer} +import it.unimi.dsi.fastutil.io.FastByteArrayOutputStream +import scala.io.Source +import java.io.IOException + +/** + * A helper program that sends blocks of Kryo-serialized text strings out on a socket at a + * specified rate. Used to feed data into RawInputDStream. + */ +object RawTextSender extends Logging { + def main(args: Array[String]) { + if (args.length != 4) { + System.err.println("Usage: RawTextSender <port> <file> <blockSize> <bytesPerSec>") + System.exit(1) + } + // Parse the arguments using a pattern match + val Array(IntParam(port), file, IntParam(blockSize), IntParam(bytesPerSec)) = args + + // Repeat the input data multiple times to fill in a buffer + val lines = Source.fromFile(file).getLines().toArray + val bufferStream = new FastByteArrayOutputStream(blockSize + 1000) + val ser = new KryoSerializer().newInstance() + val serStream = ser.serializeStream(bufferStream) + var i = 0 + while (bufferStream.position < blockSize) { + serStream.writeObject(lines(i)) + i = (i + 1) % lines.length + } + bufferStream.trim() + val array = bufferStream.array + + val countBuf = ByteBuffer.wrap(new Array[Byte](4)) + countBuf.putInt(array.length) + countBuf.flip() + + val serverSocket = new ServerSocket(port) + logInfo("Listening on port " + port) + + while (true) { + val socket = serverSocket.accept() + logInfo("Got a new connection") + val out = new RateLimitedOutputStream(socket.getOutputStream, bytesPerSec) + try { + while (true) { + out.write(countBuf.array) + out.write(array) + } + } catch { + case e: IOException => + logError("Client disconnected") + socket.close() + } + } + } +} diff --git a/streaming/src/main/scala/org/apache/spark/streaming/util/RecurringTimer.scala b/streaming/src/main/scala/org/apache/spark/streaming/util/RecurringTimer.scala new file mode 100644 index 0000000000..d644240405 --- /dev/null +++ b/streaming/src/main/scala/org/apache/spark/streaming/util/RecurringTimer.scala @@ -0,0 +1,94 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.streaming.util + +private[streaming] +class RecurringTimer(val clock: Clock, val period: Long, val callback: (Long) => Unit) { + + private val minPollTime = 25L + + private val pollTime = { + if (period / 10.0 > minPollTime) { + (period / 10.0).toLong + } else { + minPollTime + } + } + + private val thread = new Thread() { + override def run() { loop } + } + + private var nextTime = 0L + + def getStartTime(): Long = { + (math.floor(clock.currentTime.toDouble / period) + 1).toLong * period + } + + def getRestartTime(originalStartTime: Long): Long = { + val gap = clock.currentTime - originalStartTime + (math.floor(gap.toDouble / period).toLong + 1) * period + originalStartTime + } + + def start(startTime: Long): Long = { + nextTime = startTime + thread.start() + nextTime + } + + def start(): Long = { + start(getStartTime()) + } + + def stop() { + thread.interrupt() + } + + private def loop() { + try { + while (true) { + clock.waitTillTime(nextTime) + callback(nextTime) + nextTime += period + } + + } catch { + case e: InterruptedException => + } + } +} + +private[streaming] +object RecurringTimer { + + def main(args: Array[String]) { + var lastRecurTime = 0L + val period = 1000 + + def onRecur(time: Long) { + val currentTime = System.currentTimeMillis() + println("" + currentTime + ": " + (currentTime - lastRecurTime)) + lastRecurTime = currentTime + } + val timer = new RecurringTimer(new SystemClock(), period, onRecur) + timer.start() + Thread.sleep(30 * 1000) + timer.stop() + } +} + |