diff options
author | Tathagata Das <tathagata.das1565@gmail.com> | 2014-01-09 15:06:24 -0800 |
---|---|---|
committer | Tathagata Das <tathagata.das1565@gmail.com> | 2014-01-09 15:06:24 -0800 |
commit | f1d206c6b4c0a5b2517b05af05fdda6049e2f7c2 (patch) | |
tree | 452073d1bbee125f0c7c1b8d4908fae54fd1eb2a /streaming | |
parent | 6f713e2a3e56185368b66fb087637dec112a1f5d (diff) | |
parent | 67b9a33628b9934804c36620d8cbc73ef70106ce (diff) | |
download | spark-f1d206c6b4c0a5b2517b05af05fdda6049e2f7c2.tar.gz spark-f1d206c6b4c0a5b2517b05af05fdda6049e2f7c2.tar.bz2 spark-f1d206c6b4c0a5b2517b05af05fdda6049e2f7c2.zip |
Merge branch 'standalone-driver' into driver-test
Conflicts:
core/src/main/scala/org/apache/spark/SparkContext.scala
core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala
examples/src/main/java/org/apache/spark/streaming/examples/JavaNetworkWordCount.java
streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala
streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala
Diffstat (limited to 'streaming')
29 files changed, 270 insertions, 1273 deletions
diff --git a/streaming/pom.xml b/streaming/pom.xml index e3b6fee9b2..459756912d 100644 --- a/streaming/pom.xml +++ b/streaming/pom.xml @@ -61,59 +61,10 @@ <version>1.9.11</version> </dependency> <dependency> - <groupId>com.sksamuel.kafka</groupId> - <artifactId>kafka_${scala.binary.version}</artifactId> - <version>0.8.0-beta1</version> - <exclusions> - <exclusion> - <groupId>com.sun.jmx</groupId> - <artifactId>jmxri</artifactId> - </exclusion> - <exclusion> - <groupId>com.sun.jdmk</groupId> - <artifactId>jmxtools</artifactId> - </exclusion> - <exclusion> - <groupId>net.sf.jopt-simple</groupId> - <artifactId>jopt-simple</artifactId> - </exclusion> - </exclusions> - </dependency> - <dependency> - <groupId>org.apache.flume</groupId> - <artifactId>flume-ng-sdk</artifactId> - <version>1.2.0</version> - <exclusions> - <exclusion> - <groupId>org.jboss.netty</groupId> - <artifactId>netty</artifactId> - </exclusion> - <exclusion> - <groupId>org.xerial.snappy</groupId> - <artifactId>*</artifactId> - </exclusion> - </exclusions> - </dependency> - <dependency> - <groupId>org.twitter4j</groupId> - <artifactId>twitter4j-stream</artifactId> - <version>3.0.3</version> - <exclusions> - <exclusion> - <groupId>org.jboss.netty</groupId> - <artifactId>netty</artifactId> - </exclusion> - </exclusions> - </dependency> - <dependency> <groupId>org.scala-lang</groupId> <artifactId>scala-library</artifactId> </dependency> <dependency> - <groupId>${akka.group}</groupId> - <artifactId>akka-zeromq_${scala.binary.version}</artifactId> - </dependency> - <dependency> <groupId>org.scalatest</groupId> <artifactId>scalatest_${scala.binary.version}</artifactId> <scope>test</scope> @@ -137,11 +88,6 @@ <groupId>commons-io</groupId> <artifactId>commons-io</artifactId> </dependency> - <dependency> - <groupId>org.eclipse.paho</groupId> - <artifactId>mqtt-client</artifactId> - <version>0.4.0</version> - </dependency> </dependencies> <build> <outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory> @@ -151,6 +97,35 @@ <groupId>org.scalatest</groupId> <artifactId>scalatest-maven-plugin</artifactId> </plugin> + + <!-- + This plugin forces the generation of jar containing streaming test classes, + so that the tests classes of external modules can use them. The two execution profiles + are necessary - first one for 'mvn package', second one for 'mvn compile'. Ideally, + 'mvn compile' should not compile test classes and therefore should not need this. + However, an open Maven bug (http://jira.codehaus.org/browse/MNG-3559) + causes the compilation to fail if streaming test-jar is not generated. Hence, the + second execution profile for 'mvn compile'. + --> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-jar-plugin</artifactId> + <version>2.2</version> + <executions> + <execution> + <goals> + <goal>test-jar</goal> + </goals> + </execution> + <execution> + <id>test-jar-on-compile</id> + <phase>compile</phase> + <goals> + <goal>test-jar</goal> + </goals> + </execution> + </executions> + </plugin> </plugins> </build> </project> diff --git a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala index a32e4852c5..476ae70bc9 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala @@ -24,10 +24,9 @@ import java.util.concurrent.RejectedExecutionException import org.apache.hadoop.fs.{FileSystem, Path} import org.apache.hadoop.conf.Configuration -import org.apache.spark.{SparkException, Logging} +import org.apache.spark.{SparkException, SparkConf, Logging} import org.apache.spark.io.CompressionCodec import org.apache.spark.util.MetadataCleaner -import org.apache.spark.deploy.SparkHadoopUtil import org.apache.spark.streaming.scheduler.JobGenerator @@ -36,14 +35,14 @@ class Checkpoint(@transient ssc: StreamingContext, val checkpointTime: Time) extends Logging with Serializable { val master = ssc.sc.master val framework = ssc.sc.appName - val sparkHome = ssc.sc.sparkHome + val sparkHome = ssc.sc.getSparkHome.getOrElse(null) val jars = ssc.sc.jars - val environment = ssc.sc.environment val graph = ssc.graph val checkpointDir = ssc.checkpointDir val checkpointDuration = ssc.checkpointDuration val pendingTimes = ssc.scheduler.getPendingTimes() - val delaySeconds = MetadataCleaner.getDelaySeconds + val delaySeconds = MetadataCleaner.getDelaySeconds(ssc.conf) + val sparkConf = ssc.conf def validate() { assert(master != null, "Checkpoint.master is null") @@ -59,11 +58,12 @@ class Checkpoint(@transient ssc: StreamingContext, val checkpointTime: Time) * Convenience class to handle the writing of graph checkpoint to file */ private[streaming] -class CheckpointWriter(jobGenerator: JobGenerator, checkpointDir: String, hadoopConf: Configuration) extends Logging { +class CheckpointWriter(jobGenerator: JobGenerator, conf: SparkConf, checkpointDir: String, hadoopConf: Configuration) + extends Logging { val file = new Path(checkpointDir, "graph") val MAX_ATTEMPTS = 3 val executor = Executors.newFixedThreadPool(1) - val compressionCodec = CompressionCodec.createCodec() + val compressionCodec = CompressionCodec.createCodec(conf) // The file to which we actually write - and then "move" to file val writeFile = new Path(file.getParent, file.getName + ".next") // The file to which existing checkpoint is backed up (i.e. "moved") @@ -162,7 +162,7 @@ object CheckpointReader extends Logging { private val graphFileNames = Seq("graph", "graph.bk") - def read(checkpointDir: String, hadoopConf: Configuration): Option[Checkpoint] = { + def read(checkpointDir: String, conf: SparkConf, hadoopConf: Configuration): Option[Checkpoint] = { val checkpointPath = new Path(checkpointDir) def fs = checkpointPath.getFileSystem(hadoopConf) val existingFiles = graphFileNames.map(new Path(checkpointPath, _)).filter(fs.exists) @@ -175,7 +175,7 @@ object CheckpointReader extends Logging { } logInfo("Checkpoint files found: " + existingFiles.mkString(",")) - val compressionCodec = CompressionCodec.createCodec() + val compressionCodec = CompressionCodec.createCodec(conf) existingFiles.foreach(file => { logInfo("Attempting to load checkpoint from file '" + file + "'") try { diff --git a/streaming/src/main/scala/org/apache/spark/streaming/DStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/DStream.scala index 20074249d7..2cdd45291d 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/DStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/DStream.scala @@ -56,8 +56,6 @@ abstract class DStream[T: ClassTag] ( @transient protected[streaming] var ssc: StreamingContext ) extends Serializable with Logging { - initLogging() - // ======================================================================= // Methods that should be implemented by subclasses of DStream // ======================================================================= @@ -208,7 +206,7 @@ abstract class DStream[T: ClassTag] ( checkpointDuration + "). Please set it to higher than " + checkpointDuration + "." ) - val metadataCleanerDelay = MetadataCleaner.getDelaySeconds + val metadataCleanerDelay = MetadataCleaner.getDelaySeconds(ssc.conf) logInfo("metadataCleanupDelay = " + metadataCleanerDelay) assert( metadataCleanerDelay < 0 || rememberDuration.milliseconds < metadataCleanerDelay * 1000, diff --git a/streaming/src/main/scala/org/apache/spark/streaming/DStreamCheckpointData.scala b/streaming/src/main/scala/org/apache/spark/streaming/DStreamCheckpointData.scala index 1081d3c807..1589bc19a2 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/DStreamCheckpointData.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/DStreamCheckpointData.scala @@ -52,7 +52,7 @@ class DStreamCheckpointData[T: ClassTag] (dstream: DStream[T]) logDebug("Current checkpoint files:\n" + checkpointFiles.toSeq.mkString("\n")) // Add the checkpoint files to the data to be serialized - if (!currentCheckpointFiles.isEmpty) { + if (!checkpointFiles.isEmpty) { currentCheckpointFiles.clear() currentCheckpointFiles ++= checkpointFiles timeToCheckpointFile ++= currentCheckpointFiles diff --git a/streaming/src/main/scala/org/apache/spark/streaming/DStreamGraph.scala b/streaming/src/main/scala/org/apache/spark/streaming/DStreamGraph.scala index 34919d315c..0cca6d50e6 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/DStreamGraph.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/DStreamGraph.scala @@ -24,7 +24,6 @@ import org.apache.spark.Logging import org.apache.spark.streaming.scheduler.Job final private[streaming] class DStreamGraph extends Serializable with Logging { - initLogging() private val inputStreams = new ArrayBuffer[InputDStream[_]]() private val outputStreams = new ArrayBuffer[DStream[_]]() diff --git a/streaming/src/main/scala/org/apache/spark/streaming/PairDStreamFunctions.scala b/streaming/src/main/scala/org/apache/spark/streaming/PairDStreamFunctions.scala index 80af96c060..56dbcbda23 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/PairDStreamFunctions.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/PairDStreamFunctions.scala @@ -108,8 +108,9 @@ extends Serializable { createCombiner: V => C, mergeValue: (C, V) => C, mergeCombiner: (C, C) => C, - partitioner: Partitioner) : DStream[(K, C)] = { - new ShuffledDStream[K, V, C](self, createCombiner, mergeValue, mergeCombiner, partitioner) + partitioner: Partitioner, + mapSideCombine: Boolean = true): DStream[(K, C)] = { + new ShuffledDStream[K, V, C](self, createCombiner, mergeValue, mergeCombiner, partitioner, mapSideCombine) } /** @@ -173,7 +174,13 @@ extends Serializable { slideDuration: Duration, partitioner: Partitioner ): DStream[(K, Seq[V])] = { - self.window(windowDuration, slideDuration).groupByKey(partitioner) + val createCombiner = (v: Seq[V]) => new ArrayBuffer[V] ++= v + val mergeValue = (buf: ArrayBuffer[V], v: Seq[V]) => buf ++= v + val mergeCombiner = (buf1: ArrayBuffer[V], buf2: ArrayBuffer[V]) => buf1 ++= buf2 + self.groupByKey(partitioner) + .window(windowDuration, slideDuration) + .combineByKey[ArrayBuffer[V]](createCombiner, mergeValue, mergeCombiner, partitioner) + .asInstanceOf[DStream[(K, Seq[V])]] } /** diff --git a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala index 30deba417e..76be81603c 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala @@ -17,23 +17,6 @@ package org.apache.spark.streaming -import akka.actor.Props -import akka.actor.SupervisorStrategy -import akka.zeromq.Subscribe -import akka.util.ByteString - -import org.apache.spark.streaming.dstream._ -import org.apache.spark.streaming.scheduler.StreamingListener - -import org.apache.spark._ -import org.apache.spark.rdd.RDD -import org.apache.spark.streaming.receivers.ReceiverSupervisorStrategy -import org.apache.spark.streaming.receivers.ZeroMQReceiver -import org.apache.spark.storage.StorageLevel -import org.apache.spark.util.MetadataCleaner -import org.apache.spark.streaming.receivers.ActorReceiver -import org.apache.spark.streaming.scheduler.{JobScheduler, NetworkInputTracker} - import scala.collection.mutable.Queue import scala.collection.Map import scala.reflect.ClassTag @@ -41,14 +24,21 @@ import scala.reflect.ClassTag import java.io.InputStream import java.util.concurrent.atomic.AtomicInteger +import akka.actor.Props +import akka.actor.SupervisorStrategy import org.apache.hadoop.io.LongWritable import org.apache.hadoop.io.Text import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat} import org.apache.hadoop.mapreduce.lib.input.TextInputFormat -import org.apache.hadoop.fs.{LocalFileSystem, Path} +import org.apache.hadoop.fs.Path -import twitter4j.Status -import twitter4j.auth.Authorization +import org.apache.spark._ +import org.apache.spark.rdd.RDD +import org.apache.spark.storage.StorageLevel +import org.apache.spark.util.MetadataCleaner +import org.apache.spark.streaming.dstream._ +import org.apache.spark.streaming.receivers._ +import org.apache.spark.streaming.scheduler._ import org.apache.hadoop.conf.Configuration /** @@ -64,18 +54,27 @@ class StreamingContext private ( /** * Create a StreamingContext using an existing SparkContext. - * @param sparkContext Existing SparkContext - * @param batchDuration The time interval at which streaming data will be divided into batches + * @param sparkContext existing SparkContext + * @param batchDuration the time interval at which streaming data will be divided into batches */ def this(sparkContext: SparkContext, batchDuration: Duration) = { this(sparkContext, null, batchDuration) } /** + * Create a StreamingContext by providing the configuration necessary for a new SparkContext. + * @param conf a [[org.apache.spark.SparkConf]] object specifying Spark parameters + * @param batchDuration the time interval at which streaming data will be divided into batches + */ + def this(conf: SparkConf, batchDuration: Duration) = { + this(StreamingContext.createNewSparkContext(conf), null, batchDuration) + } + + /** * Create a StreamingContext by providing the details necessary for creating a new SparkContext. - * @param master Cluster URL to connect to (e.g. mesos://host:port, spark://host:port, local[4]). - * @param appName A name for your job, to display on the cluster web UI - * @param batchDuration The time interval at which streaming data will be divided into batches + * @param master cluster URL to connect to (e.g. mesos://host:port, spark://host:port, local[4]). + * @param appName a name for your job, to display on the cluster web UI + * @param batchDuration the time interval at which streaming data will be divided into batches */ def this( master: String, @@ -95,20 +94,20 @@ class StreamingContext private ( * HDFS compatible filesystems */ def this(path: String, hadoopConf: Configuration = new Configuration) = - this(null, CheckpointReader.read(path, hadoopConf).get, null) - - initLogging() + this(null, CheckpointReader.read(path, new SparkConf(), hadoopConf).get, null) if (sc_ == null && cp_ == null) { throw new Exception("Spark Streaming cannot be initialized with " + "both SparkContext and checkpoint as null") } - if(cp_ != null && cp_.delaySeconds >= 0 && MetadataCleaner.getDelaySeconds < 0) { - MetadataCleaner.setDelaySeconds(cp_.delaySeconds) + private val conf_ = Option(sc_).map(_.conf).getOrElse(cp_.sparkConf) + + if(cp_ != null && cp_.delaySeconds >= 0 && MetadataCleaner.getDelaySeconds(conf_) < 0) { + MetadataCleaner.setDelaySeconds(conf_, cp_.delaySeconds) } - if (MetadataCleaner.getDelaySeconds < 0) { + if (MetadataCleaner.getDelaySeconds(conf_) < 0) { throw new SparkException("Spark Streaming cannot be used without setting spark.cleaner.ttl; " + "set this property before creating a SparkContext (use SPARK_JAVA_OPTS for the shell)") } @@ -117,12 +116,14 @@ class StreamingContext private ( protected[streaming] val sc: SparkContext = { if (isCheckpointPresent) { - new SparkContext(cp_.master, cp_.framework, cp_.sparkHome, cp_.jars, cp_.environment) + new SparkContext(cp_.sparkConf) } else { sc_ } } + protected[streaming] val conf = sc.conf + protected[streaming] val env = SparkEnv.get protected[streaming] val graph: DStreamGraph = { @@ -231,74 +232,6 @@ class StreamingContext private ( } /** - * Create an input stream that receives messages pushed by a zeromq publisher. - * @param publisherUrl Url of remote zeromq publisher - * @param subscribe topic to subscribe to - * @param bytesToObjects A zeroMQ stream publishes sequence of frames for each topic - * and each frame has sequence of byte thus it needs the converter - * (which might be deserializer of bytes) to translate from sequence - * of sequence of bytes, where sequence refer to a frame - * and sub sequence refer to its payload. - * @param storageLevel RDD storage level. Defaults to memory-only. - */ - def zeroMQStream[T: ClassTag]( - publisherUrl:String, - subscribe: Subscribe, - bytesToObjects: Seq[ByteString] ⇒ Iterator[T], - storageLevel: StorageLevel = StorageLevel.MEMORY_ONLY_SER_2, - supervisorStrategy: SupervisorStrategy = ReceiverSupervisorStrategy.defaultStrategy - ): DStream[T] = { - actorStream(Props(new ZeroMQReceiver(publisherUrl, subscribe, bytesToObjects)), - "ZeroMQReceiver", storageLevel, supervisorStrategy) - } - - /** - * Create an input stream that pulls messages from a Kafka Broker. - * @param zkQuorum Zookeper quorum (hostname:port,hostname:port,..). - * @param groupId The group id for this consumer. - * @param topics Map of (topic_name -> numPartitions) to consume. Each partition is consumed - * in its own thread. - * @param storageLevel Storage level to use for storing the received objects - * (default: StorageLevel.MEMORY_AND_DISK_SER_2) - */ - def kafkaStream( - zkQuorum: String, - groupId: String, - topics: Map[String, Int], - storageLevel: StorageLevel = StorageLevel.MEMORY_ONLY_SER_2 - ): DStream[(String, String)] = { - val kafkaParams = Map[String, String]( - "zookeeper.connect" -> zkQuorum, "group.id" -> groupId, - "zookeeper.connection.timeout.ms" -> "10000") - kafkaStream[String, String, kafka.serializer.StringDecoder, kafka.serializer.StringDecoder]( - kafkaParams, - topics, - storageLevel) - } - - /** - * Create an input stream that pulls messages from a Kafka Broker. - * @param kafkaParams Map of kafka configuration paramaters. - * See: http://kafka.apache.org/configuration.html - * @param topics Map of (topic_name -> numPartitions) to consume. Each partition is consumed - * in its own thread. - * @param storageLevel Storage level to use for storing the received objects - */ - def kafkaStream[ - K: ClassTag, - V: ClassTag, - U <: kafka.serializer.Decoder[_]: Manifest, - T <: kafka.serializer.Decoder[_]: Manifest]( - kafkaParams: Map[String, String], - topics: Map[String, Int], - storageLevel: StorageLevel - ): DStream[(K, V)] = { - val inputStream = new KafkaInputDStream[K, V, U, T](this, kafkaParams, topics, storageLevel) - registerInputStream(inputStream) - inputStream - } - - /** * Create a input stream from TCP source hostname:port. Data is received using * a TCP socket and the receive bytes is interpreted as UTF8 encoded `\n` delimited * lines. @@ -337,22 +270,6 @@ class StreamingContext private ( } /** - * Create a input stream from a Flume source. - * @param hostname Hostname of the slave machine to which the flume data will be sent - * @param port Port of the slave machine to which the flume data will be sent - * @param storageLevel Storage level to use for storing the received objects - */ - def flumeStream ( - hostname: String, - port: Int, - storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2 - ): DStream[SparkFlumeEvent] = { - val inputStream = new FlumeInputDStream[SparkFlumeEvent](this, hostname, port, storageLevel) - registerInputStream(inputStream) - inputStream - } - - /** * Create a input stream from network source hostname:port, where data is received * as serialized blocks (serialized using the Spark's serializer) that can be directly * pushed into the block manager without deserializing them. This is the most efficient @@ -427,24 +344,6 @@ class StreamingContext private ( } /** - * Create a input stream that returns tweets received from Twitter. - * @param twitterAuth Twitter4J authentication, or None to use Twitter4J's default OAuth - * authorization; this uses the system properties twitter4j.oauth.consumerKey, - * .consumerSecret, .accessToken and .accessTokenSecret. - * @param filters Set of filter strings to get only those tweets that match them - * @param storageLevel Storage level to use for storing the received objects - */ - def twitterStream( - twitterAuth: Option[Authorization] = None, - filters: Seq[String] = Nil, - storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2 - ): DStream[Status] = { - val inputStream = new TwitterInputDStream(this, twitterAuth, filters, storageLevel) - registerInputStream(inputStream) - inputStream - } - - /** * Create an input stream from a queue of RDDs. In each batch, * it will process either one or all of the RDDs returned by the queue. * @param queue Queue of RDDs @@ -477,21 +376,6 @@ class StreamingContext private ( inputStream } -/** - * Create an input stream that receives messages pushed by a mqtt publisher. - * @param brokerUrl Url of remote mqtt publisher - * @param topic topic name to subscribe to - * @param storageLevel RDD storage level. Defaults to memory-only. - */ - - def mqttStream( - brokerUrl: String, - topic: String, - storageLevel: StorageLevel = StorageLevel.MEMORY_ONLY_SER_2): DStream[String] = { - val inputStream = new MQTTInputDStream[String](this, brokerUrl, topic, storageLevel) - registerInputStream(inputStream) - inputStream - } /** * Create a unified DStream from multiple DStreams of the same type and same slide duration. */ @@ -614,7 +498,7 @@ object StreamingContext extends Logging { ): StreamingContext = { try { - CheckpointReader.read(checkpointPath, hadoopConf) match { + CheckpointReader.read(checkpointPath, new SparkConf(), hadoopConf) match { case Some(checkpoint) => return new StreamingContext(null, checkpoint, null) case None => @@ -634,18 +518,40 @@ object StreamingContext extends Logging { } } + /** + * Find the JAR from which a given class was loaded, to make it easy for users to pass + * their JARs to StreamingContext. + */ + def jarOfClass(cls: Class[_]) = SparkContext.jarOfClass(cls) + + + protected[streaming] def createNewSparkContext(conf: SparkConf): SparkContext = { + // Set the default cleaner delay to an hour if not already set. + // This should be sufficient for even 1 second batch intervals. + if (MetadataCleaner.getDelaySeconds(conf) < 0) { + MetadataCleaner.setDelaySeconds(conf, 3600) + } + val sc = new SparkContext(conf) + sc + } + protected[streaming] def createNewSparkContext( master: String, appName: String, sparkHome: String, jars: Seq[String], - environment: Map[String, String]): SparkContext = { + environment: Map[String, String] + ): SparkContext = { + + val conf = SparkContext.updatedConf( + new SparkConf(), master, appName, sparkHome, jars, environment) // Set the default cleaner delay to an hour if not already set. - // This should be sufficient for even 1 second interval. - if (MetadataCleaner.getDelaySeconds < 0) { - MetadataCleaner.setDelaySeconds(3600) + // This should be sufficient for even 1 second batch intervals. + if (MetadataCleaner.getDelaySeconds(conf) < 0) { + MetadataCleaner.setDelaySeconds(conf, 3600) } - new SparkContext(master, appName, sparkHome, jars, environment) + val sc = new SparkContext(master, appName, sparkHome, jars, environment) + sc } protected[streaming] def rddToFileName[T](prefix: String, suffix: String, time: Time): String = { diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala index dfd6e27c3e..6c3467d405 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala @@ -155,7 +155,7 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])( /** * Combine elements of each key in DStream's RDDs using custom function. This is similar to the - * combineByKey for RDDs. Please refer to combineByKey in [[org.apache.spark.PairRDDFunctions]] for more + * combineByKey for RDDs. Please refer to combineByKey in [[org.apache.spark.rdd.PairRDDFunctions]] for more * information. */ def combineByKey[C](createCombiner: JFunction[V, C], @@ -169,6 +169,22 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])( } /** + * Combine elements of each key in DStream's RDDs using custom function. This is similar to the + * combineByKey for RDDs. Please refer to combineByKey in [[org.apache.spark.rdd.PairRDDFunctions]] for more + * information. + */ + def combineByKey[C](createCombiner: JFunction[V, C], + mergeValue: JFunction2[C, V, C], + mergeCombiners: JFunction2[C, C, C], + partitioner: Partitioner, + mapSideCombine: Boolean + ): JavaPairDStream[K, C] = { + implicit val cm: ClassTag[C] = + implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[C]] + dstream.combineByKey(createCombiner, mergeValue, mergeCombiners, partitioner, mapSideCombine) + } + + /** * Return a new DStream by applying `groupByKey` over a sliding window. This is similar to * `DStream.groupByKey()` but applies it over a sliding window. The new DStream generates RDDs * with the same interval as this DStream. Hash partitioning is used to generate the RDDs with diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala index f38d145317..d96e9ac7b7 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala @@ -17,28 +17,23 @@ package org.apache.spark.streaming.api.java -import java.lang.{Integer => JInt} -import java.io.InputStream -import java.util.{Map => JMap, List => JList} import scala.collection.JavaConversions._ import scala.reflect.ClassTag -import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat} -import twitter4j.Status -import akka.actor.Props -import akka.actor.SupervisorStrategy -import akka.zeromq.Subscribe -import akka.util.ByteString +import java.io.InputStream +import java.lang.{Integer => JInt} +import java.util.{List => JList, Map => JMap} -import twitter4j.auth.Authorization +import akka.actor.{Props, SupervisorStrategy} +import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat} +import org.apache.spark.{SparkConf, SparkContext} +import org.apache.spark.api.java.{JavaPairRDD, JavaRDD, JavaSparkContext} +import org.apache.spark.api.java.function.{Function => JFunction, Function2 => JFunction2} import org.apache.spark.rdd.RDD import org.apache.spark.storage.StorageLevel -import org.apache.spark.api.java.function.{Function => JFunction, Function2 => JFunction2} -import org.apache.spark.api.java.{JavaPairRDD, JavaSparkContext, JavaRDD} import org.apache.spark.streaming._ -import org.apache.spark.streaming.dstream._ import org.apache.spark.streaming.scheduler.StreamingListener import org.apache.hadoop.conf.Configuration @@ -125,6 +120,14 @@ class JavaStreamingContext(val ssc: StreamingContext) { this(new StreamingContext(sparkContext.sc, batchDuration)) /** + * Creates a StreamingContext using an existing SparkContext. + * @param conf A Spark application configuration + * @param batchDuration The time interval at which streaming data will be divided into batches + */ + def this(conf: SparkConf, batchDuration: Duration) = + this(new StreamingContext(conf, batchDuration)) + + /** * Re-creates a StreamingContext from a checkpoint file. * @param path Path to the directory that was specified as the checkpoint directory */ @@ -141,81 +144,6 @@ class JavaStreamingContext(val ssc: StreamingContext) { val sc: JavaSparkContext = new JavaSparkContext(ssc.sc) /** - * Create an input stream that pulls messages form a Kafka Broker. - * @param zkQuorum Zookeper quorum (hostname:port,hostname:port,..). - * @param groupId The group id for this consumer. - * @param topics Map of (topic_name -> numPartitions) to consume. Each partition is consumed - * in its own thread. - */ - def kafkaStream( - zkQuorum: String, - groupId: String, - topics: JMap[String, JInt]) - : JavaPairDStream[String, String] = { - implicit val cmt: ClassTag[String] = - implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[String]] - ssc.kafkaStream(zkQuorum, groupId, Map(topics.mapValues(_.intValue()).toSeq: _*), - StorageLevel.MEMORY_ONLY_SER_2) - - } - - /** - * Create an input stream that pulls messages form a Kafka Broker. - * @param zkQuorum Zookeper quorum (hostname:port,hostname:port,..). - * @param groupId The group id for this consumer. - * @param topics Map of (topic_name -> numPartitions) to consume. Each partition is consumed - * in its own thread. - * @param storageLevel RDD storage level. Defaults to memory-only - * - */ - def kafkaStream( - zkQuorum: String, - groupId: String, - topics: JMap[String, JInt], - storageLevel: StorageLevel) - : JavaPairDStream[String, String] = { - implicit val cmt: ClassTag[String] = - implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[String]] - ssc.kafkaStream(zkQuorum, groupId, Map(topics.mapValues(_.intValue()).toSeq: _*), - storageLevel) - } - - /** - * Create an input stream that pulls messages form a Kafka Broker. - * @param keyTypeClass Key type of RDD - * @param valueTypeClass value type of RDD - * @param keyDecoderClass Type of kafka key decoder - * @param valueDecoderClass Type of kafka value decoder - * @param kafkaParams Map of kafka configuration paramaters. - * See: http://kafka.apache.org/configuration.html - * @param topics Map of (topic_name -> numPartitions) to consume. Each partition is consumed - * in its own thread. - * @param storageLevel RDD storage level. Defaults to memory-only - */ - def kafkaStream[K, V, U <: kafka.serializer.Decoder[_], T <: kafka.serializer.Decoder[_]]( - keyTypeClass: Class[K], - valueTypeClass: Class[V], - keyDecoderClass: Class[U], - valueDecoderClass: Class[T], - kafkaParams: JMap[String, String], - topics: JMap[String, JInt], - storageLevel: StorageLevel) - : JavaPairDStream[K, V] = { - implicit val keyCmt: ClassTag[K] = - implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[K]] - implicit val valueCmt: ClassTag[V] = - implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[V]] - - implicit val keyCmd: Manifest[U] = implicitly[Manifest[AnyRef]].asInstanceOf[Manifest[U]] - implicit val valueCmd: Manifest[T] = implicitly[Manifest[AnyRef]].asInstanceOf[Manifest[T]] - - ssc.kafkaStream[K, V, U, T]( - kafkaParams.toMap, - Map(topics.mapValues(_.intValue()).toSeq: _*), - storageLevel) - } - - /** * Create a input stream from network source hostname:port. Data is received using * a TCP socket and the receive bytes is interpreted as UTF8 encoded \n delimited * lines. @@ -329,98 +257,6 @@ class JavaStreamingContext(val ssc: StreamingContext) { } /** - * Creates a input stream from a Flume source. - * @param hostname Hostname of the slave machine to which the flume data will be sent - * @param port Port of the slave machine to which the flume data will be sent - * @param storageLevel Storage level to use for storing the received objects - */ - def flumeStream(hostname: String, port: Int, storageLevel: StorageLevel): - JavaDStream[SparkFlumeEvent] = { - ssc.flumeStream(hostname, port, storageLevel) - } - - - /** - * Create a input stream from a Flume source. - * @param hostname Hostname of the slave machine to which the flume data will be sent - * @param port Port of the slave machine to which the flume data will be sent - */ - def flumeStream(hostname: String, port: Int): JavaDStream[SparkFlumeEvent] = { - ssc.flumeStream(hostname, port) - } - - /** - * Create a input stream that returns tweets received from Twitter. - * @param twitterAuth Twitter4J Authorization object - * @param filters Set of filter strings to get only those tweets that match them - * @param storageLevel Storage level to use for storing the received objects - */ - def twitterStream( - twitterAuth: Authorization, - filters: Array[String], - storageLevel: StorageLevel - ): JavaDStream[Status] = { - ssc.twitterStream(Some(twitterAuth), filters, storageLevel) - } - - /** - * Create a input stream that returns tweets received from Twitter using Twitter4J's default - * OAuth authentication; this requires the system properties twitter4j.oauth.consumerKey, - * .consumerSecret, .accessToken and .accessTokenSecret to be set. - * @param filters Set of filter strings to get only those tweets that match them - * @param storageLevel Storage level to use for storing the received objects - */ - def twitterStream( - filters: Array[String], - storageLevel: StorageLevel - ): JavaDStream[Status] = { - ssc.twitterStream(None, filters, storageLevel) - } - - /** - * Create a input stream that returns tweets received from Twitter. - * @param twitterAuth Twitter4J Authorization - * @param filters Set of filter strings to get only those tweets that match them - */ - def twitterStream( - twitterAuth: Authorization, - filters: Array[String] - ): JavaDStream[Status] = { - ssc.twitterStream(Some(twitterAuth), filters) - } - - /** - * Create a input stream that returns tweets received from Twitter using Twitter4J's default - * OAuth authentication; this requires the system properties twitter4j.oauth.consumerKey, - * .consumerSecret, .accessToken and .accessTokenSecret to be set. - * @param filters Set of filter strings to get only those tweets that match them - */ - def twitterStream( - filters: Array[String] - ): JavaDStream[Status] = { - ssc.twitterStream(None, filters) - } - - /** - * Create a input stream that returns tweets received from Twitter. - * @param twitterAuth Twitter4J Authorization - */ - def twitterStream( - twitterAuth: Authorization - ): JavaDStream[Status] = { - ssc.twitterStream(Some(twitterAuth)) - } - - /** - * Create a input stream that returns tweets received from Twitter using Twitter4J's default - * OAuth authentication; this requires the system properties twitter4j.oauth.consumerKey, - * .consumerSecret, .accessToken and .accessTokenSecret to be set. - */ - def twitterStream(): JavaDStream[Status] = { - ssc.twitterStream() - } - - /** * Create an input stream with any arbitrary user implemented actor receiver. * @param props Props object defining creation of the actor * @param name Name of the actor @@ -483,70 +319,6 @@ class JavaStreamingContext(val ssc: StreamingContext) { } /** - * Create an input stream that receives messages pushed by a zeromq publisher. - * @param publisherUrl Url of remote zeromq publisher - * @param subscribe topic to subscribe to - * @param bytesToObjects A zeroMQ stream publishes sequence of frames for each topic and each frame has sequence - * of byte thus it needs the converter(which might be deserializer of bytes) - * to translate from sequence of sequence of bytes, where sequence refer to a frame - * and sub sequence refer to its payload. - * @param storageLevel Storage level to use for storing the received objects - */ - def zeroMQStream[T]( - publisherUrl:String, - subscribe: Subscribe, - bytesToObjects: Seq[ByteString] ⇒ Iterator[T], - storageLevel: StorageLevel, - supervisorStrategy: SupervisorStrategy - ): JavaDStream[T] = { - implicit val cm: ClassTag[T] = - implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]] - ssc.zeroMQStream[T](publisherUrl, subscribe, bytesToObjects, storageLevel, supervisorStrategy) - } - - /** - * Create an input stream that receives messages pushed by a zeromq publisher. - * @param publisherUrl Url of remote zeromq publisher - * @param subscribe topic to subscribe to - * @param bytesToObjects A zeroMQ stream publishes sequence of frames for each topic and each frame has sequence - * of byte thus it needs the converter(which might be deserializer of bytes) - * to translate from sequence of sequence of bytes, where sequence refer to a frame - * and sub sequence refer to its payload. - * @param storageLevel RDD storage level. Defaults to memory-only. - */ - def zeroMQStream[T]( - publisherUrl:String, - subscribe: Subscribe, - bytesToObjects: JFunction[Array[Array[Byte]], java.lang.Iterable[T]], - storageLevel: StorageLevel - ): JavaDStream[T] = { - implicit val cm: ClassTag[T] = - implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]] - def fn(x: Seq[ByteString]) = bytesToObjects.apply(x.map(_.toArray).toArray).toIterator - ssc.zeroMQStream[T](publisherUrl, subscribe, fn, storageLevel) - } - - /** - * Create an input stream that receives messages pushed by a zeromq publisher. - * @param publisherUrl Url of remote zeromq publisher - * @param subscribe topic to subscribe to - * @param bytesToObjects A zeroMQ stream publishes sequence of frames for each topic and each frame has sequence - * of byte thus it needs the converter(which might be deserializer of bytes) - * to translate from sequence of sequence of bytes, where sequence refer to a frame - * and sub sequence refer to its payload. - */ - def zeroMQStream[T]( - publisherUrl:String, - subscribe: Subscribe, - bytesToObjects: JFunction[Array[Array[Byte]], java.lang.Iterable[T]] - ): JavaDStream[T] = { - implicit val cm: ClassTag[T] = - implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]] - def fn(x: Seq[ByteString]) = bytesToObjects.apply(x.map(_.toArray).toArray).toIterator - ssc.zeroMQStream[T](publisherUrl, subscribe, fn) - } - - /** * Registers an output stream that will be computed every interval */ def registerOutputStream(outputStream: JavaDStreamLike[_, _, _]) { @@ -716,7 +488,6 @@ class JavaStreamingContext(val ssc: StreamingContext) { def stop() = ssc.stop() } - /** * JavaStreamingContext object contains a number of static utility functions. */ @@ -787,6 +558,12 @@ object JavaStreamingContext { }, hadoopConf, createOnError) new JavaStreamingContext(ssc) } + + /** + * Find the JAR from which a given class was loaded, to make it easy for users to pass + * their JARs to StreamingContext. + */ + def jarOfClass(cls: Class[_]) = SparkContext.jarOfClass(cls).toArray } /** diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FlumeInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FlumeInputDStream.scala deleted file mode 100644 index 60d79175f1..0000000000 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FlumeInputDStream.scala +++ /dev/null @@ -1,154 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.streaming.dstream - -import java.net.InetSocketAddress -import java.io.{ObjectInput, ObjectOutput, Externalizable} -import java.nio.ByteBuffer - -import scala.collection.JavaConversions._ -import scala.reflect.ClassTag - -import org.apache.flume.source.avro.AvroSourceProtocol -import org.apache.flume.source.avro.AvroFlumeEvent -import org.apache.flume.source.avro.Status -import org.apache.avro.ipc.specific.SpecificResponder -import org.apache.avro.ipc.NettyServer - -import org.apache.spark.streaming.StreamingContext -import org.apache.spark.util.Utils -import org.apache.spark.storage.StorageLevel - -private[streaming] -class FlumeInputDStream[T: ClassTag]( - @transient ssc_ : StreamingContext, - host: String, - port: Int, - storageLevel: StorageLevel -) extends NetworkInputDStream[SparkFlumeEvent](ssc_) { - - override def getReceiver(): NetworkReceiver[SparkFlumeEvent] = { - new FlumeReceiver(host, port, storageLevel) - } -} - -/** - * A wrapper class for AvroFlumeEvent's with a custom serialization format. - * - * This is necessary because AvroFlumeEvent uses inner data structures - * which are not serializable. - */ -class SparkFlumeEvent() extends Externalizable { - var event : AvroFlumeEvent = new AvroFlumeEvent() - - /* De-serialize from bytes. */ - def readExternal(in: ObjectInput) { - val bodyLength = in.readInt() - val bodyBuff = new Array[Byte](bodyLength) - in.read(bodyBuff) - - val numHeaders = in.readInt() - val headers = new java.util.HashMap[CharSequence, CharSequence] - - for (i <- 0 until numHeaders) { - val keyLength = in.readInt() - val keyBuff = new Array[Byte](keyLength) - in.read(keyBuff) - val key : String = Utils.deserialize(keyBuff) - - val valLength = in.readInt() - val valBuff = new Array[Byte](valLength) - in.read(valBuff) - val value : String = Utils.deserialize(valBuff) - - headers.put(key, value) - } - - event.setBody(ByteBuffer.wrap(bodyBuff)) - event.setHeaders(headers) - } - - /* Serialize to bytes. */ - def writeExternal(out: ObjectOutput) { - val body = event.getBody.array() - out.writeInt(body.length) - out.write(body) - - val numHeaders = event.getHeaders.size() - out.writeInt(numHeaders) - for ((k, v) <- event.getHeaders) { - val keyBuff = Utils.serialize(k.toString) - out.writeInt(keyBuff.length) - out.write(keyBuff) - val valBuff = Utils.serialize(v.toString) - out.writeInt(valBuff.length) - out.write(valBuff) - } - } -} - -private[streaming] object SparkFlumeEvent { - def fromAvroFlumeEvent(in : AvroFlumeEvent) : SparkFlumeEvent = { - val event = new SparkFlumeEvent - event.event = in - event - } -} - -/** A simple server that implements Flume's Avro protocol. */ -private[streaming] -class FlumeEventServer(receiver : FlumeReceiver) extends AvroSourceProtocol { - override def append(event : AvroFlumeEvent) : Status = { - receiver.blockGenerator += SparkFlumeEvent.fromAvroFlumeEvent(event) - Status.OK - } - - override def appendBatch(events : java.util.List[AvroFlumeEvent]) : Status = { - events.foreach (event => - receiver.blockGenerator += SparkFlumeEvent.fromAvroFlumeEvent(event)) - Status.OK - } -} - -/** A NetworkReceiver which listens for events using the - * Flume Avro interface.*/ -private[streaming] -class FlumeReceiver( - host: String, - port: Int, - storageLevel: StorageLevel - ) extends NetworkReceiver[SparkFlumeEvent] { - - lazy val blockGenerator = new BlockGenerator(storageLevel) - - protected override def onStart() { - val responder = new SpecificResponder( - classOf[AvroSourceProtocol], new FlumeEventServer(this)) - val server = new NettyServer(responder, new InetSocketAddress(host, port)) - blockGenerator.start() - server.start() - logInfo("Flume receiver started") - } - - protected override def onStop() { - blockGenerator.stop() - logInfo("Flume receiver stopped") - } - - override def getLocationPreference = Some(host) -} diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/KafkaInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/KafkaInputDStream.scala deleted file mode 100644 index 526f5564c7..0000000000 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/KafkaInputDStream.scala +++ /dev/null @@ -1,153 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.streaming.dstream - -import org.apache.spark.Logging -import org.apache.spark.storage.StorageLevel -import org.apache.spark.streaming.StreamingContext - -import java.util.Properties -import java.util.concurrent.Executors - -import kafka.consumer._ -import kafka.serializer.Decoder -import kafka.utils.VerifiableProperties -import kafka.utils.ZKStringSerializer -import org.I0Itec.zkclient._ - -import scala.collection.Map -import scala.reflect.ClassTag - -/** - * Input stream that pulls messages from a Kafka Broker. - * - * @param kafkaParams Map of kafka configuration paramaters. See: http://kafka.apache.org/configuration.html - * @param topics Map of (topic_name -> numPartitions) to consume. Each partition is consumed - * in its own thread. - * @param storageLevel RDD storage level. - */ -private[streaming] -class KafkaInputDStream[ - K: ClassTag, - V: ClassTag, - U <: Decoder[_]: Manifest, - T <: Decoder[_]: Manifest]( - @transient ssc_ : StreamingContext, - kafkaParams: Map[String, String], - topics: Map[String, Int], - storageLevel: StorageLevel - ) extends NetworkInputDStream[(K, V)](ssc_) with Logging { - - def getReceiver(): NetworkReceiver[(K, V)] = { - new KafkaReceiver[K, V, U, T](kafkaParams, topics, storageLevel) - .asInstanceOf[NetworkReceiver[(K, V)]] - } -} - -private[streaming] -class KafkaReceiver[ - K: ClassTag, - V: ClassTag, - U <: Decoder[_]: Manifest, - T <: Decoder[_]: Manifest]( - kafkaParams: Map[String, String], - topics: Map[String, Int], - storageLevel: StorageLevel - ) extends NetworkReceiver[Any] { - - // Handles pushing data into the BlockManager - lazy protected val blockGenerator = new BlockGenerator(storageLevel) - // Connection to Kafka - var consumerConnector : ConsumerConnector = null - - def onStop() { - blockGenerator.stop() - } - - def onStart() { - - blockGenerator.start() - - // In case we are using multiple Threads to handle Kafka Messages - val executorPool = Executors.newFixedThreadPool(topics.values.reduce(_ + _)) - - logInfo("Starting Kafka Consumer Stream with group: " + kafkaParams("group.id")) - - // Kafka connection properties - val props = new Properties() - kafkaParams.foreach(param => props.put(param._1, param._2)) - - // Create the connection to the cluster - logInfo("Connecting to Zookeper: " + kafkaParams("zookeeper.connect")) - val consumerConfig = new ConsumerConfig(props) - consumerConnector = Consumer.create(consumerConfig) - logInfo("Connected to " + kafkaParams("zookeeper.connect")) - - // When autooffset.reset is defined, it is our responsibility to try and whack the - // consumer group zk node. - if (kafkaParams.contains("auto.offset.reset")) { - tryZookeeperConsumerGroupCleanup(kafkaParams("zookeeper.connect"), kafkaParams("group.id")) - } - - val keyDecoder = manifest[U].runtimeClass.getConstructor(classOf[VerifiableProperties]) - .newInstance(consumerConfig.props) - .asInstanceOf[Decoder[K]] - val valueDecoder = manifest[T].runtimeClass.getConstructor(classOf[VerifiableProperties]) - .newInstance(consumerConfig.props) - .asInstanceOf[Decoder[V]] - - // Create Threads for each Topic/Message Stream we are listening - val topicMessageStreams = consumerConnector.createMessageStreams( - topics, keyDecoder, valueDecoder) - - - // Start the messages handler for each partition - topicMessageStreams.values.foreach { streams => - streams.foreach { stream => executorPool.submit(new MessageHandler(stream)) } - } - } - - // Handles Kafka Messages - private class MessageHandler[K: ClassTag, V: ClassTag](stream: KafkaStream[K, V]) - extends Runnable { - def run() { - logInfo("Starting MessageHandler.") - for (msgAndMetadata <- stream) { - blockGenerator += (msgAndMetadata.key, msgAndMetadata.message) - } - } - } - - // It is our responsibility to delete the consumer group when specifying autooffset.reset. This is because - // Kafka 0.7.2 only honors this param when the group is not in zookeeper. - // - // The kafka high level consumer doesn't expose setting offsets currently, this is a trick copied from Kafkas' - // ConsoleConsumer. See code related to 'autooffset.reset' when it is set to 'smallest'/'largest': - // https://github.com/apache/kafka/blob/0.7.2/core/src/main/scala/kafka/consumer/ConsoleConsumer.scala - private def tryZookeeperConsumerGroupCleanup(zkUrl: String, groupId: String) { - try { - val dir = "/consumers/" + groupId - logInfo("Cleaning up temporary zookeeper data under " + dir + ".") - val zk = new ZkClient(zkUrl, 30*1000, 30*1000, ZKStringSerializer) - zk.deleteRecursive(dir) - zk.close() - } catch { - case _ : Throwable => // swallow - } - } -} diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/MQTTInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/MQTTInputDStream.scala deleted file mode 100644 index ef4a737568..0000000000 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/MQTTInputDStream.scala +++ /dev/null @@ -1,110 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.streaming.dstream - -import org.apache.spark.Logging -import org.apache.spark.storage.StorageLevel -import org.apache.spark.streaming.{ Time, DStreamCheckpointData, StreamingContext } - -import java.util.Properties -import java.util.concurrent.Executors -import java.io.IOException - -import org.eclipse.paho.client.mqttv3.MqttCallback -import org.eclipse.paho.client.mqttv3.MqttClient -import org.eclipse.paho.client.mqttv3.MqttClientPersistence -import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence -import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken -import org.eclipse.paho.client.mqttv3.MqttException -import org.eclipse.paho.client.mqttv3.MqttMessage -import org.eclipse.paho.client.mqttv3.MqttTopic - -import scala.collection.Map -import scala.collection.mutable.HashMap -import scala.collection.JavaConversions._ -import scala.reflect.ClassTag - -/** - * Input stream that subscribe messages from a Mqtt Broker. - * Uses eclipse paho as MqttClient http://www.eclipse.org/paho/ - * @param brokerUrl Url of remote mqtt publisher - * @param topic topic name to subscribe to - * @param storageLevel RDD storage level. - */ - -private[streaming] -class MQTTInputDStream[T: ClassTag]( - @transient ssc_ : StreamingContext, - brokerUrl: String, - topic: String, - storageLevel: StorageLevel - ) extends NetworkInputDStream[T](ssc_) with Logging { - - def getReceiver(): NetworkReceiver[T] = { - new MQTTReceiver(brokerUrl, topic, storageLevel) - .asInstanceOf[NetworkReceiver[T]] - } -} - -private[streaming] -class MQTTReceiver(brokerUrl: String, - topic: String, - storageLevel: StorageLevel - ) extends NetworkReceiver[Any] { - lazy protected val blockGenerator = new BlockGenerator(storageLevel) - - def onStop() { - blockGenerator.stop() - } - - def onStart() { - - blockGenerator.start() - - // Set up persistence for messages - var peristance: MqttClientPersistence = new MemoryPersistence() - - // Initializing Mqtt Client specifying brokerUrl, clientID and MqttClientPersistance - var client: MqttClient = new MqttClient(brokerUrl, "MQTTSub", peristance) - - // Connect to MqttBroker - client.connect() - - // Subscribe to Mqtt topic - client.subscribe(topic) - - // Callback automatically triggers as and when new message arrives on specified topic - var callback: MqttCallback = new MqttCallback() { - - // Handles Mqtt message - override def messageArrived(arg0: String, arg1: MqttMessage) { - blockGenerator += new String(arg1.getPayload()) - } - - override def deliveryComplete(arg0: IMqttDeliveryToken) { - } - - override def connectionLost(arg0: Throwable) { - logInfo("Connection lost " + arg0) - } - } - - // Set up callback for MqttClient - client.setCallback(callback) - } -} diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/NetworkInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/NetworkInputDStream.scala index 5add20871e..27d474c0a0 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/NetworkInputDStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/NetworkInputDStream.scala @@ -88,8 +88,6 @@ private[streaming] case class ReportError(msg: String) extends NetworkReceiverMe */ abstract class NetworkReceiver[T: ClassTag]() extends Serializable with Logging { - initLogging() - lazy protected val env = SparkEnv.get lazy protected val actor = env.actorSystem.actorOf( @@ -176,8 +174,8 @@ abstract class NetworkReceiver[T: ClassTag]() extends Serializable with Logging /** A helper actor that communicates with the NetworkInputTracker */ private class NetworkReceiverActor extends Actor { logInfo("Attempting to register with tracker") - val ip = System.getProperty("spark.driver.host", "localhost") - val port = System.getProperty("spark.driver.port", "7077").toInt + val ip = env.conf.get("spark.driver.host", "localhost") + val port = env.conf.get("spark.driver.port", "7077").toInt val url = "akka.tcp://spark@%s:%s/user/NetworkInputTracker".format(ip, port) val tracker = env.actorSystem.actorSelection(url) val timeout = 5.seconds @@ -214,7 +212,7 @@ abstract class NetworkReceiver[T: ClassTag]() extends Serializable with Logging case class Block(id: BlockId, buffer: ArrayBuffer[T], metadata: Any = null) val clock = new SystemClock() - val blockInterval = System.getProperty("spark.streaming.blockInterval", "200").toLong + val blockInterval = env.conf.get("spark.streaming.blockInterval", "200").toLong val blockIntervalTimer = new RecurringTimer(clock, blockInterval, updateCurrentBuffer) val blockStorageLevel = storageLevel val blocksForPushing = new ArrayBlockingQueue[Block](1000) diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/ShuffledDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/ShuffledDStream.scala index e6e0022097..84e69f277b 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/ShuffledDStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/ShuffledDStream.scala @@ -29,8 +29,9 @@ class ShuffledDStream[K: ClassTag, V: ClassTag, C: ClassTag]( createCombiner: V => C, mergeValue: (C, V) => C, mergeCombiner: (C, C) => C, - partitioner: Partitioner - ) extends DStream [(K,C)] (parent.ssc) { + partitioner: Partitioner, + mapSideCombine: Boolean = true + ) extends DStream[(K,C)] (parent.ssc) { override def dependencies = List(parent) @@ -38,8 +39,8 @@ class ShuffledDStream[K: ClassTag, V: ClassTag, C: ClassTag]( override def compute(validTime: Time): Option[RDD[(K,C)]] = { parent.getOrCompute(validTime) match { - case Some(rdd) => - Some(rdd.combineByKey[C](createCombiner, mergeValue, mergeCombiner, partitioner)) + case Some(rdd) => Some(rdd.combineByKey[C]( + createCombiner, mergeValue, mergeCombiner, partitioner, mapSideCombine)) case None => None } } diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/TwitterInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/TwitterInputDStream.scala deleted file mode 100644 index 387e15b0e6..0000000000 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/TwitterInputDStream.scala +++ /dev/null @@ -1,99 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.streaming.dstream - -import org.apache.spark._ -import org.apache.spark.streaming._ -import storage.StorageLevel -import twitter4j._ -import twitter4j.auth.Authorization -import java.util.prefs.Preferences -import twitter4j.conf.ConfigurationBuilder -import twitter4j.conf.PropertyConfiguration -import twitter4j.auth.OAuthAuthorization -import twitter4j.auth.AccessToken - -/* A stream of Twitter statuses, potentially filtered by one or more keywords. -* -* @constructor create a new Twitter stream using the supplied Twitter4J authentication credentials. -* An optional set of string filters can be used to restrict the set of tweets. The Twitter API is -* such that this may return a sampled subset of all tweets during each interval. -* -* If no Authorization object is provided, initializes OAuth authorization using the system -* properties twitter4j.oauth.consumerKey, .consumerSecret, .accessToken and .accessTokenSecret. -*/ -private[streaming] -class TwitterInputDStream( - @transient ssc_ : StreamingContext, - twitterAuth: Option[Authorization], - filters: Seq[String], - storageLevel: StorageLevel - ) extends NetworkInputDStream[Status](ssc_) { - - private def createOAuthAuthorization(): Authorization = { - new OAuthAuthorization(new ConfigurationBuilder().build()) - } - - private val authorization = twitterAuth.getOrElse(createOAuthAuthorization()) - - override def getReceiver(): NetworkReceiver[Status] = { - new TwitterReceiver(authorization, filters, storageLevel) - } -} - -private[streaming] -class TwitterReceiver( - twitterAuth: Authorization, - filters: Seq[String], - storageLevel: StorageLevel - ) extends NetworkReceiver[Status] { - - var twitterStream: TwitterStream = _ - lazy val blockGenerator = new BlockGenerator(storageLevel) - - protected override def onStart() { - blockGenerator.start() - twitterStream = new TwitterStreamFactory().getInstance(twitterAuth) - twitterStream.addListener(new StatusListener { - def onStatus(status: Status) = { - blockGenerator += status - } - // Unimplemented - def onDeletionNotice(statusDeletionNotice: StatusDeletionNotice) {} - def onTrackLimitationNotice(i: Int) {} - def onScrubGeo(l: Long, l1: Long) {} - def onStallWarning(stallWarning: StallWarning) {} - def onException(e: Exception) { stopOnError(e) } - }) - - val query: FilterQuery = new FilterQuery - if (filters.size > 0) { - query.track(filters.toArray) - twitterStream.filter(query) - } else { - twitterStream.sample() - } - logInfo("Twitter receiver started") - } - - protected override def onStop() { - blockGenerator.stop() - twitterStream.shutdown() - logInfo("Twitter receiver stopped") - } -} diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/WindowedDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/WindowedDStream.scala index 73d959331a..89c43ff935 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/WindowedDStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/WindowedDStream.scala @@ -17,10 +17,10 @@ package org.apache.spark.streaming.dstream -import org.apache.spark.rdd.RDD -import org.apache.spark.rdd.UnionRDD +import org.apache.spark.rdd.{PartitionerAwareUnionRDD, RDD, UnionRDD} import org.apache.spark.storage.StorageLevel -import org.apache.spark.streaming.{Duration, Interval, Time, DStream} +import org.apache.spark.streaming._ +import org.apache.spark.streaming.Duration import scala.reflect.ClassTag @@ -51,6 +51,14 @@ class WindowedDStream[T: ClassTag]( override def compute(validTime: Time): Option[RDD[T]] = { val currentWindow = new Interval(validTime - windowDuration + parent.slideDuration, validTime) - Some(new UnionRDD(ssc.sc, parent.slice(currentWindow))) + val rddsInWindow = parent.slice(currentWindow) + val windowRDD = if (rddsInWindow.flatMap(_.partitioner).distinct.length == 1) { + logDebug("Using partition aware union for windowing at " + validTime) + new PartitionerAwareUnionRDD(ssc.sc, rddsInWindow) + } else { + logDebug("Using normal union for windowing at " + validTime) + new UnionRDD(ssc.sc,rddsInWindow) + } + Some(windowRDD) } } diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receivers/ZeroMQReceiver.scala b/streaming/src/main/scala/org/apache/spark/streaming/receivers/ZeroMQReceiver.scala deleted file mode 100644 index f164d516b0..0000000000 --- a/streaming/src/main/scala/org/apache/spark/streaming/receivers/ZeroMQReceiver.scala +++ /dev/null @@ -1,53 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.streaming.receivers - -import scala.reflect.ClassTag - -import akka.actor.Actor -import akka.util.ByteString -import akka.zeromq._ - -import org.apache.spark.Logging - -/** - * A receiver to subscribe to ZeroMQ stream. - */ -private[streaming] class ZeroMQReceiver[T: ClassTag](publisherUrl: String, - subscribe: Subscribe, - bytesToObjects: Seq[ByteString] ⇒ Iterator[T]) - extends Actor with Receiver with Logging { - - override def preStart() = ZeroMQExtension(context.system).newSocket(SocketType.Sub, Listener(self), - Connect(publisherUrl), subscribe) - - def receive: Receive = { - - case Connecting ⇒ logInfo("connecting ...") - - case m: ZMQMessage ⇒ - logDebug("Received message for:" + m.frame(0)) - - //We ignore first frame for processing as it is the topic - val bytes = m.frames.tail - pushBlock(bytesToObjects(bytes)) - - case Closed ⇒ logInfo("received closed ") - - } -} diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala index 6fbe6da921..917b4c57f6 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala @@ -37,8 +37,6 @@ private[scheduler] case class ClearCheckpointData(time: Time) extends JobGenerat private[streaming] class JobGenerator(jobScheduler: JobScheduler) extends Logging { - initLogging() - val ssc = jobScheduler.ssc val graph = ssc.graph val eventProcessorActor = ssc.env.actorSystem.actorOf(Props(new Actor { @@ -49,14 +47,14 @@ class JobGenerator(jobScheduler: JobScheduler) extends Logging { } })) val clock = { - val clockClass = System.getProperty( + val clockClass = ssc.sc.conf.get( "spark.streaming.clock", "org.apache.spark.streaming.util.SystemClock") Class.forName(clockClass).newInstance().asInstanceOf[Clock] } val timer = new RecurringTimer(clock, ssc.graph.batchDuration.milliseconds, longTime => eventProcessorActor ! GenerateJobs(new Time(longTime))) lazy val checkpointWriter = if (ssc.checkpointDuration != null && ssc.checkpointDir != null) { - new CheckpointWriter(this, ssc.checkpointDir, ssc.sparkContext.hadoopConfiguration) + new CheckpointWriter(this, ssc.conf, ssc.checkpointDir, ssc.sparkContext.hadoopConfiguration) } else { null } @@ -68,7 +66,7 @@ class JobGenerator(jobScheduler: JobScheduler) extends Logging { startFirstTime() } } - + def stop() { timer.stop() if (checkpointWriter != null) checkpointWriter.stop() @@ -112,7 +110,7 @@ class JobGenerator(jobScheduler: JobScheduler) extends Logging { // or if the property is defined set it to that time if (clock.isInstanceOf[ManualClock]) { val lastTime = ssc.initialCheckpoint.checkpointTime.milliseconds - val jumpTime = System.getProperty("spark.streaming.manualClock.jump", "0").toLong + val jumpTime = ssc.sc.conf.get("spark.streaming.manualClock.jump", "0").toLong clock.asInstanceOf[ManualClock].setTime(lastTime + jumpTime) } diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala index 9511ccfbed..9304fc1a93 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala @@ -25,15 +25,13 @@ import org.apache.spark.streaming._ /** * This class schedules jobs to be run on Spark. It uses the JobGenerator to generate - * the jobs and runs them using a thread pool. Number of threads + * the jobs and runs them using a thread pool. Number of threads */ private[streaming] class JobScheduler(val ssc: StreamingContext) extends Logging { - initLogging() - val jobSets = new ConcurrentHashMap[Time, JobSet] - val numConcurrentJobs = System.getProperty("spark.streaming.concurrentJobs", "1").toInt + val numConcurrentJobs = ssc.conf.get("spark.streaming.concurrentJobs", "1").toInt val executor = Executors.newFixedThreadPool(numConcurrentJobs) val generator = new JobGenerator(this) val listenerBus = new StreamingListenerBus() diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/NetworkInputTracker.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/NetworkInputTracker.scala index abff55d77c..75f7244643 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/NetworkInputTracker.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/NetworkInputTracker.scala @@ -160,7 +160,9 @@ class NetworkInputTracker( } // Run the dummy Spark job to ensure that all slaves have registered. // This avoids all the receivers to be scheduled on the same node. - ssc.sparkContext.makeRDD(1 to 50, 50).map(x => (x, 1)).reduceByKey(_ + _, 20).collect() + if (!ssc.sparkContext.isLocal) { + ssc.sparkContext.makeRDD(1 to 50, 50).map(x => (x, 1)).reduceByKey(_ + _, 20).collect() + } // Distribute the receivers and start them ssc.sparkContext.runJob(tempRDD, startReceiver) diff --git a/streaming/src/main/scala/org/apache/spark/streaming/util/MasterFailureTest.scala b/streaming/src/main/scala/org/apache/spark/streaming/util/MasterFailureTest.scala index 4a3993e3e3..1559f7a9f7 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/util/MasterFailureTest.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/util/MasterFailureTest.scala @@ -39,7 +39,6 @@ import org.apache.hadoop.conf.Configuration private[streaming] object MasterFailureTest extends Logging { - initLogging() @volatile var killed = false @volatile var killCount = 0 @@ -331,7 +330,6 @@ class TestOutputStream[T: ClassTag]( */ private[streaming] class KillingThread(ssc: StreamingContext, maxKillWaitTime: Long) extends Thread with Logging { - initLogging() override def run() { try { @@ -366,7 +364,6 @@ class KillingThread(ssc: StreamingContext, maxKillWaitTime: Long) extends Thread private[streaming] class FileGeneratingThread(input: Seq[String], testDir: Path, interval: Long) extends Thread with Logging { - initLogging() override def run() { val localTestDir = Files.createTempDir() diff --git a/streaming/src/main/scala/org/apache/spark/streaming/util/RawTextSender.scala b/streaming/src/main/scala/org/apache/spark/streaming/util/RawTextSender.scala index fc8655a083..6585d494a6 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/util/RawTextSender.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/util/RawTextSender.scala @@ -20,7 +20,7 @@ package org.apache.spark.streaming.util import java.nio.ByteBuffer import org.apache.spark.util.{RateLimitedOutputStream, IntParam} import java.net.ServerSocket -import org.apache.spark.{Logging} +import org.apache.spark.{SparkConf, Logging} import it.unimi.dsi.fastutil.io.FastByteArrayOutputStream import scala.io.Source import java.io.IOException @@ -42,7 +42,7 @@ object RawTextSender extends Logging { // Repeat the input data multiple times to fill in a buffer val lines = Source.fromFile(file).getLines().toArray val bufferStream = new FastByteArrayOutputStream(blockSize + 1000) - val ser = new KryoSerializer().newInstance() + val ser = new KryoSerializer(new SparkConf()).newInstance() val serStream = ser.serializeStream(bufferStream) var i = 0 while (bufferStream.position < blockSize) { diff --git a/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java b/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java index daeb99f5b7..0d2145da9a 100644 --- a/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java +++ b/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java @@ -17,23 +17,19 @@ package org.apache.spark.streaming; -import com.google.common.base.Optional; -import com.google.common.collect.Lists; -import com.google.common.collect.Maps; -import com.google.common.io.Files; - -import kafka.serializer.StringDecoder; +import scala.Tuple2; -import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat; -import org.apache.spark.streaming.api.java.JavaDStreamLike; import org.junit.After; import org.junit.Assert; -import org.junit.Before; import org.junit.Test; +import java.io.*; +import java.util.*; -import scala.Tuple2; -import twitter4j.Status; +import com.google.common.base.Optional; +import com.google.common.collect.Lists; +import com.google.common.io.Files; +import org.apache.spark.SparkConf; import org.apache.spark.HashPartitioner; import org.apache.spark.api.java.JavaPairRDD; import org.apache.spark.api.java.JavaRDD; @@ -43,39 +39,11 @@ import org.apache.spark.storage.StorageLevel; import org.apache.spark.streaming.api.java.JavaDStream; import org.apache.spark.streaming.api.java.JavaPairDStream; import org.apache.spark.streaming.api.java.JavaStreamingContext; -import org.apache.spark.streaming.dstream.SparkFlumeEvent; -import org.apache.spark.streaming.JavaTestUtils; -import org.apache.spark.streaming.JavaCheckpointTestUtils; - -import java.io.*; -import java.util.*; - -import akka.actor.Props; -import akka.zeromq.Subscribe; - // The test suite itself is Serializable so that anonymous Function implementations can be // serialized, as an alternative to converting these anonymous classes to static inner classes; // see http://stackoverflow.com/questions/758570/. -public class JavaAPISuite implements Serializable { - private transient JavaStreamingContext ssc; - - @Before - public void setUp() { - System.setProperty("spark.streaming.clock", "org.apache.spark.streaming.util.ManualClock"); - ssc = new JavaStreamingContext("local[2]", "test", new Duration(1000)); - ssc.checkpoint("checkpoint"); - } - - @After - public void tearDown() { - ssc.stop(); - ssc = null; - - // To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown - System.clearProperty("spark.driver.port"); - } - +public class JavaAPISuite extends LocalJavaStreamingContext implements Serializable { @Test public void testCount() { List<List<Integer>> inputData = Arrays.asList( @@ -101,7 +69,7 @@ public class JavaAPISuite implements Serializable { Arrays.asList("hello", "world"), Arrays.asList("goodnight", "moon")); - List<List<Integer>> expected = Arrays.asList( + List<List<Integer>> expected = Arrays.asList( Arrays.asList(5,5), Arrays.asList(9,4)); @@ -1597,26 +1565,6 @@ public class JavaAPISuite implements Serializable { // Java arguments and assign it to a JavaDStream without producing type errors. Testing of the // InputStream functionality is deferred to the existing Scala tests. @Test - public void testKafkaStream() { - HashMap<String, Integer> topics = Maps.newHashMap(); - JavaPairDStream<String, String> test1 = ssc.kafkaStream("localhost:12345", "group", topics); - JavaPairDStream<String, String> test2 = ssc.kafkaStream("localhost:12345", "group", topics, - StorageLevel.MEMORY_AND_DISK()); - - HashMap<String, String> kafkaParams = Maps.newHashMap(); - kafkaParams.put("zookeeper.connect","localhost:12345"); - kafkaParams.put("group.id","consumer-group"); - JavaPairDStream<String, String> test3 = ssc.kafkaStream( - String.class, - String.class, - StringDecoder.class, - StringDecoder.class, - kafkaParams, - topics, - StorageLevel.MEMORY_AND_DISK()); - } - - @Test public void testSocketTextStream() { JavaDStream<String> test = ssc.socketTextStream("localhost", 12345); } @@ -1654,36 +1602,4 @@ public class JavaAPISuite implements Serializable { public void testRawSocketStream() { JavaDStream<String> test = ssc.rawSocketStream("localhost", 12345); } - - @Test - public void testFlumeStream() { - JavaDStream<SparkFlumeEvent> test = ssc.flumeStream("localhost", 12345, StorageLevel.MEMORY_ONLY()); - } - - @Test - public void testFileStream() { - JavaPairDStream<String, String> foo = - ssc.<String, String, SequenceFileInputFormat<String,String>>fileStream("/tmp/foo"); - } - - @Test - public void testTwitterStream() { - String[] filters = new String[] { "good", "bad", "ugly" }; - JavaDStream<Status> test = ssc.twitterStream(filters, StorageLevel.MEMORY_ONLY()); - } - - @Test - public void testActorStream() { - JavaDStream<String> test = ssc.actorStream((Props)null, "TestActor", StorageLevel.MEMORY_ONLY()); - } - - @Test - public void testZeroMQStream() { - JavaDStream<String> test = ssc.zeroMQStream("url", (Subscribe) null, new Function<byte[][], Iterable<String>>() { - @Override - public Iterable<String> call(byte[][] b) throws Exception { - return null; - } - }); - } } diff --git a/streaming/src/test/java/org/apache/spark/streaming/LocalJavaStreamingContext.java b/streaming/src/test/java/org/apache/spark/streaming/LocalJavaStreamingContext.java new file mode 100644 index 0000000000..34bee56885 --- /dev/null +++ b/streaming/src/test/java/org/apache/spark/streaming/LocalJavaStreamingContext.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.streaming; + +import org.apache.spark.streaming.api.java.JavaStreamingContext; +import org.junit.After; +import org.junit.Before; + +public abstract class LocalJavaStreamingContext { + + protected transient JavaStreamingContext ssc; + + @Before + public void setUp() { + System.clearProperty("spark.driver.port"); + System.clearProperty("spark.hostPort"); + System.setProperty("spark.streaming.clock", "org.apache.spark.streaming.util.ManualClock"); + ssc = new JavaStreamingContext("local[2]", "test", new Duration(1000)); + ssc.checkpoint("checkpoint"); + } + + @After + public void tearDown() { + ssc.stop(); + ssc = null; + + // To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown + System.clearProperty("spark.driver.port"); + System.clearProperty("spark.hostPort"); + } +} diff --git a/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala index b35ca00b53..ee6b433d1f 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala @@ -23,9 +23,9 @@ import org.apache.spark.rdd.RDD import org.apache.spark.SparkContext._ import util.ManualClock +import org.apache.spark.{SparkContext, SparkConf} class BasicOperationsSuite extends TestSuiteBase { - test("map") { val input = Seq(1 to 4, 5 to 8, 9 to 12) testOperation( @@ -375,7 +375,11 @@ class BasicOperationsSuite extends TestSuiteBase { } test("slice") { - val ssc = new StreamingContext("local[2]", "BasicOperationSuite", Seconds(1)) + val conf2 = new SparkConf() + .setMaster("local[2]") + .setAppName("BasicOperationsSuite") + .set("spark.streaming.clock", "org.apache.spark.streaming.util.ManualClock") + val ssc = new StreamingContext(new SparkContext(conf2), Seconds(1)) val input = Seq(Seq(1), Seq(2), Seq(3), Seq(4)) val stream = new TestInputStream[Int](ssc, input, 2) ssc.registerInputStream(stream) diff --git a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala index 53bc24ff7a..6499de98c9 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala @@ -57,7 +57,7 @@ class CheckpointSuite extends TestSuiteBase { assert(batchDuration === Milliseconds(500), "batchDuration for this test must be 1 second") - System.setProperty("spark.streaming.clock", "org.apache.spark.streaming.util.ManualClock") + conf.set("spark.streaming.clock", "org.apache.spark.streaming.util.ManualClock") val stateStreamCheckpointInterval = Seconds(1) val fs = FileSystem.getLocal(new Configuration()) @@ -132,8 +132,9 @@ class CheckpointSuite extends TestSuiteBase { assert(!stateStream.generatedRDDs.isEmpty, "No restored RDDs in state stream after recovery from second failure") - // Adjust manual clock time as if it is being restarted after a delay - System.setProperty("spark.streaming.manualClock.jump", (batchDuration.milliseconds * 7).toString) + // Adjust manual clock time as if it is being restarted after a delay; this is a hack because + // we modify the conf object, but it works for this one property + ssc.conf.set("spark.streaming.manualClock.jump", (batchDuration.milliseconds * 7).toString) ssc.start() advanceTimeWithRealDelay(ssc, 4) ssc.stop() @@ -196,10 +197,6 @@ class CheckpointSuite extends TestSuiteBase { // It also tests whether batches, whose processing was incomplete due to the // failure, are re-processed or not. test("recovery with file input stream") { - // Disable manual clock as FileInputDStream does not work with manual clock - val clockProperty = System.getProperty("spark.streaming.clock") - System.clearProperty("spark.streaming.clock") - // Set up the streaming context and input streams val testDir = Files.createTempDir() var ssc = new StreamingContext(master, framework, Seconds(1)) @@ -296,10 +293,6 @@ class CheckpointSuite extends TestSuiteBase { ) // To ensure that all the inputs were received correctly assert(expectedOutput.last === output.last) - - // Enable manual clock back again for other tests - if (clockProperty != null) - System.setProperty("spark.streaming.clock", clockProperty) } diff --git a/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala index 5fa14ad7c4..a8e053278a 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala @@ -23,7 +23,7 @@ import akka.actor.IOManager import akka.actor.Props import akka.util.ByteString -import org.apache.spark.streaming.dstream.{FileInputDStream, NetworkReceiver, SparkFlumeEvent} +import org.apache.spark.streaming.dstream.{NetworkReceiver} import java.net.{InetSocketAddress, SocketException, Socket, ServerSocket} import java.io.{File, BufferedWriter, OutputStreamWriter} import java.util.concurrent.{Executors, TimeUnit, ArrayBlockingQueue} @@ -31,18 +31,11 @@ import collection.mutable.{SynchronizedBuffer, ArrayBuffer} import util.ManualClock import org.apache.spark.storage.StorageLevel import org.apache.spark.streaming.receivers.Receiver -import org.apache.spark.{SparkContext, Logging} +import org.apache.spark.Logging import scala.util.Random import org.apache.commons.io.FileUtils import org.scalatest.BeforeAndAfter -import org.apache.flume.source.avro.AvroSourceProtocol -import org.apache.flume.source.avro.AvroFlumeEvent -import org.apache.flume.source.avro.Status -import org.apache.avro.ipc.{specific, NettyTransceiver} -import org.apache.avro.ipc.specific.SpecificRequestor -import java.nio.ByteBuffer import collection.JavaConversions._ -import java.nio.charset.Charset import com.google.common.io.Files import java.util.concurrent.atomic.AtomicInteger @@ -56,9 +49,9 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter { testServer.start() // Set up the streaming context and input streams - val ssc = new StreamingContext(master, framework, batchDuration) + val ssc = new StreamingContext(conf, batchDuration) val networkStream = ssc.socketTextStream("localhost", testServer.port, StorageLevel.MEMORY_AND_DISK) - val outputBuffer = new ArrayBuffer[Seq[String]] with SynchronizedBuffer[Seq[String ]] + val outputBuffer = new ArrayBuffer[Seq[String]] with SynchronizedBuffer[Seq[String]] val outputStream = new TestOutputStream(networkStream, outputBuffer) def output = outputBuffer.flatMap(x => x) ssc.registerOutputStream(outputStream) @@ -99,62 +92,13 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter { } - test("flume input stream") { - // Set up the streaming context and input streams - val ssc = new StreamingContext(master, framework, batchDuration) - val flumeStream = ssc.flumeStream("localhost", testPort, StorageLevel.MEMORY_AND_DISK) - val outputBuffer = new ArrayBuffer[Seq[SparkFlumeEvent]] - with SynchronizedBuffer[Seq[SparkFlumeEvent]] - val outputStream = new TestOutputStream(flumeStream, outputBuffer) - ssc.registerOutputStream(outputStream) - ssc.start() - - val clock = ssc.scheduler.clock.asInstanceOf[ManualClock] - val input = Seq(1, 2, 3, 4, 5) - Thread.sleep(1000) - val transceiver = new NettyTransceiver(new InetSocketAddress("localhost", testPort)) - val client = SpecificRequestor.getClient( - classOf[AvroSourceProtocol], transceiver) - - for (i <- 0 until input.size) { - val event = new AvroFlumeEvent - event.setBody(ByteBuffer.wrap(input(i).toString.getBytes())) - event.setHeaders(Map[CharSequence, CharSequence]("test" -> "header")) - client.append(event) - Thread.sleep(500) - clock.addToTime(batchDuration.milliseconds) - } - - val startTime = System.currentTimeMillis() - while (outputBuffer.size < input.size && System.currentTimeMillis() - startTime < maxWaitTimeMillis) { - logInfo("output.size = " + outputBuffer.size + ", input.size = " + input.size) - Thread.sleep(100) - } - Thread.sleep(1000) - val timeTaken = System.currentTimeMillis() - startTime - assert(timeTaken < maxWaitTimeMillis, "Operation timed out after " + timeTaken + " ms") - logInfo("Stopping context") - ssc.stop() - - val decoder = Charset.forName("UTF-8").newDecoder() - - assert(outputBuffer.size === input.length) - for (i <- 0 until outputBuffer.size) { - assert(outputBuffer(i).size === 1) - val str = decoder.decode(outputBuffer(i).head.event.getBody) - assert(str.toString === input(i).toString) - assert(outputBuffer(i).head.event.getHeaders.get("test") === "header") - } - } - - test("file input stream") { // Disable manual clock as FileInputDStream does not work with manual clock - System.clearProperty("spark.streaming.clock") + conf.set("spark.streaming.clock", "org.apache.spark.streaming.util.SystemClock") // Set up the streaming context and input streams val testDir = Files.createTempDir() - val ssc = new StreamingContext(master, framework, batchDuration) + val ssc = new StreamingContext(conf, batchDuration) val fileStream = ssc.textFileStream(testDir.toString) val outputBuffer = new ArrayBuffer[Seq[String]] with SynchronizedBuffer[Seq[String]] def output = outputBuffer.flatMap(x => x) @@ -195,7 +139,7 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter { FileUtils.deleteDirectory(testDir) // Enable manual clock back again for other tests - System.setProperty("spark.streaming.clock", "org.apache.spark.streaming.util.ManualClock") + conf.set("spark.streaming.clock", "org.apache.spark.streaming.util.ManualClock") } @@ -206,7 +150,7 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter { testServer.start() // Set up the streaming context and input streams - val ssc = new StreamingContext(master, framework, batchDuration) + val ssc = new StreamingContext(conf, batchDuration) val networkStream = ssc.actorStream[String](Props(new TestActor(port)), "TestActor", StorageLevel.MEMORY_AND_DISK) //Had to pass the local value of port to prevent from closing over entire scope val outputBuffer = new ArrayBuffer[Seq[String]] with SynchronizedBuffer[Seq[String]] @@ -249,20 +193,6 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter { } } - test("kafka input stream") { - val ssc = new StreamingContext(master, framework, batchDuration) - val topics = Map("my-topic" -> 1) - val test1 = ssc.kafkaStream("localhost:12345", "group", topics) - val test2 = ssc.kafkaStream("localhost:12345", "group", topics, StorageLevel.MEMORY_AND_DISK) - - // Test specifying decoder - val kafkaParams = Map("zookeeper.connect"->"localhost:12345","group.id"->"consumer-group") - val test3 = ssc.kafkaStream[ - String, - String, - kafka.serializer.StringDecoder, - kafka.serializer.StringDecoder](kafkaParams, topics, StorageLevel.MEMORY_AND_DISK) - } test("multi-thread receiver") { // set up the test receiver @@ -273,7 +203,7 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter { MultiThreadTestReceiver.haveAllThreadsFinished = false // set up the network stream using the test receiver - val ssc = new StreamingContext(master, framework, batchDuration) + val ssc = new StreamingContext(conf, batchDuration) val networkStream = ssc.networkStream[Int](testReceiver) val countStream = networkStream.count val outputBuffer = new ArrayBuffer[Seq[Long]] with SynchronizedBuffer[Seq[Long]] diff --git a/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala b/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala index e969e91d13..b20d02f996 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala @@ -28,7 +28,7 @@ import java.io.{ObjectInputStream, IOException} import org.scalatest.{BeforeAndAfter, FunSuite} -import org.apache.spark.Logging +import org.apache.spark.{SparkContext, SparkConf, Logging} import org.apache.spark.rdd.RDD /** @@ -133,20 +133,22 @@ trait TestSuiteBase extends FunSuite with BeforeAndAfter with Logging { // Whether to actually wait in real time before changing manual clock def actuallyWait = false + //// A SparkConf to use in tests. Can be modified before calling setupStreams to configure things. + val conf = new SparkConf() + .setMaster(master) + .setAppName(framework) + .set("spark.cleaner.ttl", "3600") + // Default before function for any streaming test suite. Override this // if you want to add your stuff to "before" (i.e., don't call before { } ) def beforeFunction() { if (useManualClock) { - System.setProperty( - "spark.streaming.clock", - "org.apache.spark.streaming.util.ManualClock" - ) + logInfo("Using manual clock") + conf.set("spark.streaming.clock", "org.apache.spark.streaming.util.ManualClock") } else { - System.clearProperty("spark.streaming.clock") + logInfo("Using real clock") + conf.set("spark.streaming.clock", "org.apache.spark.streaming.util.SystemClock") } - // To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown - System.clearProperty("spark.driver.port") - System.clearProperty("spark.hostPort") } // Default after function for any streaming test suite. Override this @@ -169,9 +171,8 @@ trait TestSuiteBase extends FunSuite with BeforeAndAfter with Logging { operation: DStream[U] => DStream[V], numPartitions: Int = numInputPartitions ): StreamingContext = { - // Create StreamingContext - val ssc = new StreamingContext(master, framework, batchDuration) + val ssc = new StreamingContext(conf, batchDuration) if (checkpointDir != null) { ssc.checkpoint(checkpointDir) } @@ -195,9 +196,8 @@ trait TestSuiteBase extends FunSuite with BeforeAndAfter with Logging { input2: Seq[Seq[V]], operation: (DStream[U], DStream[V]) => DStream[W] ): StreamingContext = { - // Create StreamingContext - val ssc = new StreamingContext(master, framework, batchDuration) + val ssc = new StreamingContext(conf, batchDuration) if (checkpointDir != null) { ssc.checkpoint(checkpointDir) } @@ -273,7 +273,7 @@ trait TestSuiteBase extends FunSuite with BeforeAndAfter with Logging { val startTime = System.currentTimeMillis() while (output.size < numExpectedOutput && System.currentTimeMillis() - startTime < maxWaitTimeMillis) { logInfo("output.size = " + output.size + ", numExpectedOutput = " + numExpectedOutput) - Thread.sleep(100) + Thread.sleep(10) } val timeTaken = System.currentTimeMillis() - startTime diff --git a/streaming/src/test/scala/org/apache/spark/streaming/WindowOperationsSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/WindowOperationsSuite.scala index 6b4aaefcdf..c39abfc21b 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/WindowOperationsSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/WindowOperationsSuite.scala @@ -18,7 +18,6 @@ package org.apache.spark.streaming import org.apache.spark.streaming.StreamingContext._ -import collection.mutable.ArrayBuffer class WindowOperationsSuite extends TestSuiteBase { @@ -225,9 +224,7 @@ class WindowOperationsSuite extends TestSuiteBase { val slideDuration = Seconds(1) val numBatches = expectedOutput.size * (slideDuration / batchDuration).toInt val operation = (s: DStream[(String, Int)]) => { - s.groupByKeyAndWindow(windowDuration, slideDuration) - .map(x => (x._1, x._2.toSet)) - .persist() + s.groupByKeyAndWindow(windowDuration, slideDuration).map(x => (x._1, x._2.toSet)) } testOperation(input, operation, expectedOutput, numBatches, true) } |