Diffstat (limited to 'streaming')
16 files changed, 292 insertions, 148 deletions
diff --git a/streaming/pom.xml b/streaming/pom.xml
index fe869ba66e..1589e3202f 100644
--- a/streaming/pom.xml
+++ b/streaming/pom.xml
@@ -41,6 +41,12 @@
       <groupId>org.apache.flume</groupId>
       <artifactId>flume-ng-sdk</artifactId>
       <version>1.2.0</version>
+      <exclusions>
+        <exclusion>
+          <groupId>org.jboss.netty</groupId>
+          <artifactId>netty</artifactId>
+        </exclusion>
+      </exclusions>
     </dependency>
     <dependency>
       <groupId>com.github.sgroschupf</groupId>
@@ -149,5 +155,42 @@
         </plugins>
       </build>
     </profile>
+    <profile>
+      <id>hadoop2-yarn</id>
+      <dependencies>
+        <dependency>
+          <groupId>org.spark-project</groupId>
+          <artifactId>spark-core</artifactId>
+          <version>${project.version}</version>
+          <classifier>hadoop2-yarn</classifier>
+        </dependency>
+        <dependency>
+          <groupId>org.apache.hadoop</groupId>
+          <artifactId>hadoop-client</artifactId>
+          <scope>provided</scope>
+        </dependency>
+        <dependency>
+          <groupId>org.apache.hadoop</groupId>
+          <artifactId>hadoop-yarn-api</artifactId>
+          <scope>provided</scope>
+        </dependency>
+        <dependency>
+          <groupId>org.apache.hadoop</groupId>
+          <artifactId>hadoop-yarn-common</artifactId>
+          <scope>provided</scope>
+        </dependency>
+      </dependencies>
+      <build>
+        <plugins>
+          <plugin>
+            <groupId>org.apache.maven.plugins</groupId>
+            <artifactId>maven-jar-plugin</artifactId>
+            <configuration>
+              <classifier>hadoop2-yarn</classifier>
+            </configuration>
+          </plugin>
+        </plugins>
+      </build>
+    </profile>
   </profiles>
 </project>
diff --git a/streaming/src/main/scala/spark/streaming/Checkpoint.scala b/streaming/src/main/scala/spark/streaming/Checkpoint.scala
index e303e33e5e..66e67cbfa1 100644
--- a/streaming/src/main/scala/spark/streaming/Checkpoint.scala
+++ b/streaming/src/main/scala/spark/streaming/Checkpoint.scala
@@ -38,11 +38,20 @@ class Checkpoint(@transient ssc: StreamingContext, val checkpointTime: Time)
 private[streaming]
 class CheckpointWriter(checkpointDir: String) extends Logging {
   val file = new Path(checkpointDir, "graph")
+  // The file to which we actually write, and which is then "moved" to 'file'.
+  private val writeFile = new Path(file.getParent, file.getName + ".next")
+  private val bakFile = new Path(file.getParent, file.getName + ".bk")
+
+  private var stopped = false
+
   val conf = new Configuration()
   var fs = file.getFileSystem(conf)
   val maxAttempts = 3
   val executor = Executors.newFixedThreadPool(1)
+
+  // Removed the code that validated that there is only one CheckpointWriter per path 'file',
+  // since no errors were observed without it. Reintroduce it?
 
   class CheckpointWriteHandler(checkpointTime: Time, bytes: Array[Byte]) extends Runnable {
     def run() {
       var attempts = 0
@@ -51,15 +60,17 @@ class CheckpointWriter(checkpointDir: String) extends Logging {
         attempts += 1
         try {
           logDebug("Saving checkpoint for time " + checkpointTime + " to file '" + file + "'")
-          if (fs.exists(file)) {
-            val bkFile = new Path(file.getParent, file.getName + ".bk")
-            FileUtil.copy(fs, file, fs, bkFile, true, true, conf)
-            logDebug("Moved existing checkpoint file to " + bkFile)
-          }
-          val fos = fs.create(file)
+          // This is inherently thread unsafe, so we mitigate it by writing to '.next' and then renaming into place, which should be fast.
+          val fos = fs.create(writeFile)
           fos.write(bytes)
           fos.close()
-          fos.close()
+          if (fs.exists(file) && fs.rename(file, bakFile)) {
+            logDebug("Moved existing checkpoint file to " + bakFile)
+          }
+          // paranoia
+          fs.delete(file, false)
+          fs.rename(writeFile, file)
+
           val finishTime = System.currentTimeMillis();
           logInfo("Checkpoint for time " + checkpointTime + " saved to file '" + file +
             "', took " + bytes.length + " bytes and " + (finishTime - startTime) + " milliseconds")
@@ -84,7 +95,15 @@ class CheckpointWriter(checkpointDir: String) extends Logging {
   }
 
   def stop() {
+    synchronized {
+      if (stopped) return ;
+      stopped = true
+    }
     executor.shutdown()
+    val startTime = System.currentTimeMillis()
+    val terminated = executor.awaitTermination(10, java.util.concurrent.TimeUnit.SECONDS)
+    val endTime = System.currentTimeMillis()
+    logInfo("CheckpointWriter executor terminated ? " + terminated + ", waited for " + (endTime - startTime) + " ms.")
   }
 }
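The CheckpointWriter change above replaces the old copy-then-overwrite sequence with write-to-temp-plus-rename: the new graph is written to 'graph.next', the previous 'graph' is renamed to 'graph.bk', and 'graph.next' is then renamed into place. Since renames on HDFS are metadata operations, a reader never observes a half-written checkpoint. A minimal standalone sketch of the same pattern, using only the Hadoop FileSystem API that the diff itself uses ('checkpointDir' and 'bytes' are illustrative inputs):

    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.fs.Path

    def writeCheckpoint(checkpointDir: String, bytes: Array[Byte]) {
      val file = new Path(checkpointDir, "graph")
      val writeFile = new Path(file.getParent, file.getName + ".next")
      val bakFile = new Path(file.getParent, file.getName + ".bk")
      val fs = file.getFileSystem(new Configuration())
      // 1. Write the new checkpoint to a temporary '.next' file.
      val out = fs.create(writeFile)
      try { out.write(bytes) } finally { out.close() }
      // 2. Keep the previous checkpoint around as a '.bk' backup.
      if (fs.exists(file)) fs.rename(file, bakFile)
      // 3. Paranoia: remove any leftover 'graph', then publish with a fast rename.
      fs.delete(file, false)
      fs.rename(writeFile, file)
    }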
diff --git a/streaming/src/main/scala/spark/streaming/DStream.scala b/streaming/src/main/scala/spark/streaming/DStream.scala
index 6ad43dd9b5..4eb5e163a2 100644
--- a/streaming/src/main/scala/spark/streaming/DStream.scala
+++ b/streaming/src/main/scala/spark/streaming/DStream.scala
@@ -442,7 +442,12 @@ abstract class DStream[T: ClassTag] (
    * Return a new DStream in which each RDD has a single element generated by counting each RDD
    * of this DStream.
    */
-  def count(): DStream[Long] = this.map(_ => 1L).reduce(_ + _)
+  def count(): DStream[Long] = {
+    this.map(_ => (null, 1L))
+        .transform(_.union(context.sparkContext.makeRDD(Seq((null, 0L)), 1)))
+        .reduceByKey(_ + _)
+        .map(_._2)
+  }
 
   /**
    * Return a new DStream in which each RDD contains the counts of each distinct value in
@@ -458,7 +463,7 @@ abstract class DStream[T: ClassTag] (
    * this DStream will be registered as an output stream and therefore materialized.
    */
   def foreach(foreachFunc: RDD[T] => Unit) {
-    foreach((r: RDD[T], t: Time) => foreachFunc(r))
+    this.foreach((r: RDD[T], t: Time) => foreachFunc(r))
   }
 
   /**
diff --git a/streaming/src/main/scala/spark/streaming/DStreamGraph.scala b/streaming/src/main/scala/spark/streaming/DStreamGraph.scala
index adb7f3a24d..3b331956f5 100644
--- a/streaming/src/main/scala/spark/streaming/DStreamGraph.scala
+++ b/streaming/src/main/scala/spark/streaming/DStreamGraph.scala
@@ -54,8 +54,8 @@ final private[streaming] class DStreamGraph extends Serializable with Logging {
         throw new Exception("Batch duration already set as " + batchDuration +
           ". cannot set it again.")
       }
+      batchDuration = duration
     }
-    batchDuration = duration
   }
 
   def remember(duration: Duration) {
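The count() rewrite above fixes the empty-batch case: reduce() on an empty RDD emits no element, so the old map(_ => 1L).reduce(_ + _) produced no count at all for a batch with no data. Unioning each batch with a single seed record (null, 0L) guarantees exactly one output element, 0L included. The same idea on plain RDDs, as a sketch ('sc' is an assumed SparkContext in the pre-rename 'spark' namespace used throughout this diff):

    import spark.SparkContext
    import spark.SparkContext._  // for reduceByKey on pair RDDs

    def countEvenIfEmpty(sc: SparkContext, data: Seq[Int]): Long = {
      sc.makeRDD(data)
        .map(_ => (null, 1L))
        .union(sc.makeRDD(Seq((null, 0L)), 1))  // seed, so empty input still yields 0L
        .reduceByKey(_ + _)
        .map(_._2)
        .first()
    }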
diff --git a/streaming/src/main/scala/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/spark/streaming/StreamingContext.scala
index 7646e15521..e06f1bbfd6 100644
--- a/streaming/src/main/scala/spark/streaming/StreamingContext.scala
+++ b/streaming/src/main/scala/spark/streaming/StreamingContext.scala
@@ -28,6 +28,8 @@ import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat}
 import org.apache.hadoop.mapreduce.lib.input.TextInputFormat
 import org.apache.hadoop.fs.Path
 import twitter4j.Status
+import twitter4j.auth.Authorization
+
 
 /**
  * A StreamingContext is the main entry point for Spark Streaming functionality. Besides the basic
@@ -187,10 +189,11 @@ class StreamingContext private (
    * should be same.
    */
   def actorStream[T: ClassTag](
-    props: Props,
-    name: String,
-    storageLevel: StorageLevel = StorageLevel.MEMORY_ONLY_SER_2,
-    supervisorStrategy: SupervisorStrategy = ReceiverSupervisorStrategy.defaultStrategy): DStream[T] = {
+      props: Props,
+      name: String,
+      storageLevel: StorageLevel = StorageLevel.MEMORY_ONLY_SER_2,
+      supervisorStrategy: SupervisorStrategy = ReceiverSupervisorStrategy.defaultStrategy
+    ): DStream[T] = {
     networkStream(new ActorReceiver[T](props, name, storageLevel, supervisorStrategy))
   }
 
@@ -198,9 +201,10 @@ class StreamingContext private (
    * Create an input stream that receives messages pushed by a zeromq publisher.
    * @param publisherUrl Url of remote zeromq publisher
    * @param subscribe topic to subscribe to
-   * @param bytesToObjects A zeroMQ stream publishes sequence of frames for each topic and each frame has sequence
-   *                       of byte thus it needs the converter(which might be deserializer of bytes)
-   *                       to translate from sequence of sequence of bytes, where sequence refer to a frame
+   * @param bytesToObjects A zeroMQ stream publishes sequence of frames for each topic
+   *                       and each frame has sequence of byte thus it needs the converter
+   *                       (which might be deserializer of bytes) to translate from sequence
+   *                       of sequence of bytes, where sequence refer to a frame
    *                       and sub sequence refer to its payload.
    * @param storageLevel RDD storage level. Defaults to memory-only.
    */
@@ -216,24 +220,39 @@ class StreamingContext private (
   }
 
   /**
-   * Create an input stream that pulls messages form a Kafka Broker.
+   * Create an input stream that pulls messages from a Kafka Broker.
    * @param zkQuorum Zookeper quorum (hostname:port,hostname:port,..).
    * @param groupId The group id for this consumer.
    * @param topics Map of (topic_name -> numPartitions) to consume. Each partition is consumed
-   *               in its own thread.
-   * @param initialOffsets Optional initial offsets for each of the partitions to consume.
-   *                       By default the value is pulled from zookeper.
+   *               in its own thread.
    * @param storageLevel Storage level to use for storing the received objects
    *                     (default: StorageLevel.MEMORY_AND_DISK_SER_2)
    */
-  def kafkaStream[T: ClassTag](
+  def kafkaStream(
     zkQuorum: String,
     groupId: String,
     topics: Map[String, Int],
-    initialOffsets: Map[KafkaPartitionKey, Long] = Map[KafkaPartitionKey, Long](),
     storageLevel: StorageLevel = StorageLevel.MEMORY_ONLY_SER_2
+  ): DStream[String] = {
+    val kafkaParams = Map[String, String](
+      "zk.connect" -> zkQuorum, "groupid" -> groupId, "zk.connectiontimeout.ms" -> "10000")
+    kafkaStream[String, kafka.serializer.StringDecoder](kafkaParams, topics, storageLevel)
+  }
+
+  /**
+   * Create an input stream that pulls messages from a Kafka Broker.
+   * @param kafkaParams Map of kafka configuration parameters.
+   *                    See: http://kafka.apache.org/configuration.html
+   * @param topics Map of (topic_name -> numPartitions) to consume. Each partition is consumed
+   *               in its own thread.
+   * @param storageLevel Storage level to use for storing the received objects
+   */
+  def kafkaStream[T: ClassTag, D <: kafka.serializer.Decoder[_]: Manifest](
+    kafkaParams: Map[String, String],
+    topics: Map[String, Int],
+    storageLevel: StorageLevel
   ): DStream[T] = {
-    val inputStream = new KafkaInputDStream[T](this, zkQuorum, groupId, topics, initialOffsets, storageLevel)
+    val inputStream = new KafkaInputDStream[T, D](this, kafkaParams, topics, storageLevel)
     registerInputStream(inputStream)
     inputStream
   }
@@ -363,18 +382,18 @@ class StreamingContext private (
 
   /**
    * Create a input stream that returns tweets received from Twitter.
-   * @param username Twitter username
-   * @param password Twitter password
+   * @param twitterAuth Twitter4J authentication, or None to use Twitter4J's default OAuth
+   *        authorization; this uses the system properties twitter4j.oauth.consumerKey,
+   *        .consumerSecret, .accessToken and .accessTokenSecret.
    * @param filters Set of filter strings to get only those tweets that match them
    * @param storageLevel Storage level to use for storing the received objects
    */
   def twitterStream(
-    username: String,
-    password: String,
+    twitterAuth: Option[Authorization] = None,
    filters: Seq[String] = Nil,
     storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
   ): DStream[Status] = {
-    val inputStream = new TwitterInputDStream(this, username, password, filters, storageLevel)
+    val inputStream = new TwitterInputDStream(this, twitterAuth, filters, storageLevel)
     registerInputStream(inputStream)
     inputStream
   }
@@ -398,7 +417,8 @@ class StreamingContext private (
    * it will process either one or all of the RDDs returned by the queue.
    * @param queue Queue of RDDs
    * @param oneAtATime Whether only one RDD should be consumed from the queue in every interval
-   * @param defaultRDD Default RDD is returned by the DStream when the queue is empty. Set as null if no RDD should be returned when empty
+   * @param defaultRDD Default RDD is returned by the DStream when the queue is empty.
+   *                   Set as null if no RDD should be returned when empty
    * @tparam T Type of objects in the RDD
    */
   def queueStream[T: ClassTag](
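With the old initialOffsets parameter gone, offsets are now controlled through Kafka's own configuration, and a decoder type can be supplied for non-String messages. A usage sketch of the two new overloads (hosts, group and topic names are illustrative):

    import spark.storage.StorageLevel
    import spark.streaming.{Seconds, StreamingContext}

    val ssc = new StreamingContext("local[2]", "KafkaExample", Seconds(1))

    // Simple form: ZooKeeper quorum + group id, messages decoded as Strings.
    val lines = ssc.kafkaStream("localhost:2181", "my-group", Map("my-topic" -> 1))

    // General form: raw Kafka parameters plus an explicit decoder type. Resetting
    // offsets is now done via Kafka's 'autooffset.reset' instead of initialOffsets.
    val kafkaParams = Map(
      "zk.connect" -> "localhost:2181",
      "groupid" -> "my-group",
      "autooffset.reset" -> "smallest")
    val lines2 = ssc.kafkaStream[String, kafka.serializer.StringDecoder](
      kafkaParams, Map("my-topic" -> 1), StorageLevel.MEMORY_ONLY_SER_2)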
diff --git a/streaming/src/main/scala/spark/streaming/api/java/JavaStreamingContext.scala b/streaming/src/main/scala/spark/streaming/api/java/JavaStreamingContext.scala
index 00e5aa0603..7da732dd88 100644
--- a/streaming/src/main/scala/spark/streaming/api/java/JavaStreamingContext.scala
+++ b/streaming/src/main/scala/spark/streaming/api/java/JavaStreamingContext.scala
@@ -4,24 +4,21 @@ import spark.streaming._
 import receivers.{ActorReceiver, ReceiverSupervisorStrategy}
 import spark.streaming.dstream._
 import spark.storage.StorageLevel
-
 import spark.api.java.function.{Function => JFunction, Function2 => JFunction2}
 import spark.api.java.{JavaSparkContext, JavaRDD}
-
 import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat}
-
 import twitter4j.Status
-
 import akka.actor.Props
 import akka.actor.SupervisorStrategy
 import akka.zeromq.Subscribe
-
 import scala.collection.JavaConversions._
+
 import scala.reflect.ClassTag
 
 import java.lang.{Long => JLong, Integer => JInt}
 import java.io.InputStream
 import java.util.{Map => JMap}
+import twitter4j.auth.Authorization
 
 /**
  * A StreamingContext is the main entry point for Spark Streaming functionality. Besides the basic
@@ -122,14 +119,16 @@ class JavaStreamingContext(val ssc: StreamingContext) {
    * @param topics Map of (topic_name -> numPartitions) to consume. Each partition is consumed
    * in its own thread.
    */
-  def kafkaStream[T](
+  def kafkaStream(
     zkQuorum: String,
     groupId: String,
     topics: JMap[String, JInt])
-  : JavaDStream[T] = {
-    implicit val cmt: ClassTag[T] =
-      implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]]
-    ssc.kafkaStream[T](zkQuorum, groupId, Map(topics.mapValues(_.intValue()).toSeq: _*))
+  : JavaDStream[String] = {
+    implicit val cmt: ClassTag[String] =
+      implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[String]]
+    ssc.kafkaStream(zkQuorum, groupId, Map(topics.mapValues(_.intValue()).toSeq: _*),
+      StorageLevel.MEMORY_ONLY_SER_2)
+
   }
 
   /**
@@ -137,49 +136,45 @@ class JavaStreamingContext(val ssc: StreamingContext) {
    * @param zkQuorum Zookeper quorum (hostname:port,hostname:port,..).
    * @param groupId The group id for this consumer.
    * @param topics Map of (topic_name -> numPartitions) to consume. Each partition is consumed
-   * in its own thread.
-   * @param initialOffsets Optional initial offsets for each of the partitions to consume.
-   * By default the value is pulled from zookeper.
+   * in its own thread.
+   * @param storageLevel RDD storage level. Defaults to memory-only
+   *
    */
-  def kafkaStream[T](
+  def kafkaStream(
     zkQuorum: String,
     groupId: String,
     topics: JMap[String, JInt],
-    initialOffsets: JMap[KafkaPartitionKey, JLong])
-  : JavaDStream[T] = {
-    implicit val cmt: ClassTag[T] =
-      implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]]
-    ssc.kafkaStream[T](
-      zkQuorum,
-      groupId,
-      Map(topics.mapValues(_.intValue()).toSeq: _*),
-      Map(initialOffsets.mapValues(_.longValue()).toSeq: _*))
+    storageLevel: StorageLevel)
+  : JavaDStream[String] = {
+    implicit val cmt: ClassTag[String] =
+      implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[String]]
+    ssc.kafkaStream(zkQuorum, groupId, Map(topics.mapValues(_.intValue()).toSeq: _*),
+      storageLevel)
   }
 
   /**
    * Create an input stream that pulls messages form a Kafka Broker.
-   * @param zkQuorum Zookeper quorum (hostname:port,hostname:port,..).
-   * @param groupId The group id for this consumer.
+   * @param typeClass Type of RDD
+   * @param decoderClass Type of kafka decoder
+   * @param kafkaParams Map of kafka configuration parameters.
+   *                    See: http://kafka.apache.org/configuration.html
    * @param topics Map of (topic_name -> numPartitions) to consume. Each partition is consumed
    * in its own thread.
-   * @param initialOffsets Optional initial offsets for each of the partitions to consume.
-   * By default the value is pulled from zookeper.
    * @param storageLevel RDD storage level. Defaults to memory-only
    */
-  def kafkaStream[T](
-    zkQuorum: String,
-    groupId: String,
+  def kafkaStream[T, D <: kafka.serializer.Decoder[_]](
+    typeClass: Class[T],
+    decoderClass: Class[D],
+    kafkaParams: JMap[String, String],
     topics: JMap[String, JInt],
-    initialOffsets: JMap[KafkaPartitionKey, JLong],
     storageLevel: StorageLevel)
   : JavaDStream[T] = {
     implicit val cmt: ClassTag[T] =
       implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]]
-    ssc.kafkaStream[T](
-      zkQuorum,
-      groupId,
+    implicit val cmd: Manifest[D] = implicitly[Manifest[AnyRef]].asInstanceOf[Manifest[D]]
+    ssc.kafkaStream[T, D](
+      kafkaParams.toMap,
       Map(topics.mapValues(_.intValue()).toSeq: _*),
-      Map(initialOffsets.mapValues(_.longValue()).toSeq: _*),
       storageLevel)
   }
 
@@ -316,44 +311,73 @@ class JavaStreamingContext(val ssc: StreamingContext) {
 
   /**
    * Create a input stream that returns tweets received from Twitter.
-   * @param username Twitter username
-   * @param password Twitter password
+   * @param twitterAuth Twitter4J Authorization object
+   * @param filters Set of filter strings to get only those tweets that match them
+   * @param storageLevel Storage level to use for storing the received objects
+   */
+  def twitterStream(
+    twitterAuth: Authorization,
+    filters: Array[String],
+    storageLevel: StorageLevel
+  ): JavaDStream[Status] = {
+    ssc.twitterStream(Some(twitterAuth), filters, storageLevel)
+  }
+
+  /**
+   * Create an input stream that returns tweets received from Twitter using Twitter4J's default
+   * OAuth authentication; this requires the system properties twitter4j.oauth.consumerKey,
+   * .consumerSecret, .accessToken and .accessTokenSecret to be set.
    * @param filters Set of filter strings to get only those tweets that match them
    * @param storageLevel Storage level to use for storing the received objects
    */
   def twitterStream(
-    username: String,
-    password: String,
     filters: Array[String],
     storageLevel: StorageLevel
   ): JavaDStream[Status] = {
-    ssc.twitterStream(username, password, filters, storageLevel)
+    ssc.twitterStream(None, filters, storageLevel)
   }
 
   /**
    * Create a input stream that returns tweets received from Twitter.
-   * @param username Twitter username
-   * @param password Twitter password
+   * @param twitterAuth Twitter4J Authorization
    * @param filters Set of filter strings to get only those tweets that match them
    */
   def twitterStream(
-    username: String,
-    password: String,
+    twitterAuth: Authorization,
     filters: Array[String]
   ): JavaDStream[Status] = {
-    ssc.twitterStream(username, password, filters)
+    ssc.twitterStream(Some(twitterAuth), filters)
+  }
+
+  /**
+   * Create an input stream that returns tweets received from Twitter using Twitter4J's default
+   * OAuth authentication; this requires the system properties twitter4j.oauth.consumerKey,
+   * .consumerSecret, .accessToken and .accessTokenSecret to be set.
+   * @param filters Set of filter strings to get only those tweets that match them
+   */
+  def twitterStream(
+    filters: Array[String]
+  ): JavaDStream[Status] = {
+    ssc.twitterStream(None, filters)
   }
 
   /**
    * Create a input stream that returns tweets received from Twitter.
-   * @param username Twitter username
-   * @param password Twitter password
+   * @param twitterAuth Twitter4J Authorization
    */
   def twitterStream(
-    username: String,
-    password: String
+    twitterAuth: Authorization
   ): JavaDStream[Status] = {
-    ssc.twitterStream(username, password)
+    ssc.twitterStream(Some(twitterAuth))
+  }
+
+  /**
+   * Create an input stream that returns tweets received from Twitter using Twitter4J's default
+   * OAuth authentication; this requires the system properties twitter4j.oauth.consumerKey,
+   * .consumerSecret, .accessToken and .accessTokenSecret to be set.
+   */
+  def twitterStream(): JavaDStream[Status] = {
+    ssc.twitterStream()
   }
 
   /**
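Both the Scala and Java twitterStream variants above drop Basic auth in favor of Twitter4J's Authorization; passing no Authorization selects the OAuth path, configured entirely through system properties. A sketch of both paths (the 'ssc' value and the key strings are placeholders):

    // Option 1: let the stream build its own OAuthAuthorization from system properties.
    System.setProperty("twitter4j.oauth.consumerKey", "<consumerKey>")
    System.setProperty("twitter4j.oauth.consumerSecret", "<consumerSecret>")
    System.setProperty("twitter4j.oauth.accessToken", "<accessToken>")
    System.setProperty("twitter4j.oauth.accessTokenSecret", "<accessTokenSecret>")
    val tweets = ssc.twitterStream()  // twitterAuth defaults to None

    // Option 2: build the Authorization explicitly and pass Some(auth).
    import twitter4j.auth.OAuthAuthorization
    import twitter4j.conf.ConfigurationBuilder
    val auth = new OAuthAuthorization(new ConfigurationBuilder().build())
    val filtered = ssc.twitterStream(Some(auth), Seq("spark"))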
diff --git a/streaming/src/main/scala/spark/streaming/dstream/KafkaInputDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/KafkaInputDStream.scala
index e093edb05b..e0f6351ef7 100644
--- a/streaming/src/main/scala/spark/streaming/dstream/KafkaInputDStream.scala
+++ b/streaming/src/main/scala/spark/streaming/dstream/KafkaInputDStream.scala
@@ -9,58 +9,51 @@ import java.util.concurrent.Executors
 
 import kafka.consumer._
 import kafka.message.{Message, MessageSet, MessageAndMetadata}
-import kafka.serializer.StringDecoder
+import kafka.serializer.Decoder
 import kafka.utils.{Utils, ZKGroupTopicDirs}
 import kafka.utils.ZkUtils._
+import kafka.utils.ZKStringSerializer
+import org.I0Itec.zkclient._
 
 import scala.collection.Map
 import scala.collection.mutable.HashMap
 import scala.collection.JavaConversions._
 import scala.reflect.ClassTag
 
-// Key for a specific Kafka Partition: (broker, topic, group, part)
-case class KafkaPartitionKey(brokerId: Int, topic: String, groupId: String, partId: Int)
-
 /**
  * Input stream that pulls messages from a Kafka Broker.
  *
- * @param zkQuorum Zookeper quorum (hostname:port,hostname:port,..).
- * @param groupId The group id for this consumer.
+ * @param kafkaParams Map of kafka configuration parameters. See: http://kafka.apache.org/configuration.html
  * @param topics Map of (topic_name -> numPartitions) to consume. Each partition is consumed
  * in its own thread.
- * @param initialOffsets Optional initial offsets for each of the partitions to consume.
- *                       By default the value is pulled from zookeper.
 * @param storageLevel RDD storage level.
 */
private[streaming]
-class KafkaInputDStream[T: ClassTag](
+class KafkaInputDStream[T: ClassTag, D <: Decoder[_]: Manifest](
    @transient ssc_ : StreamingContext,
-    zkQuorum: String,
-    groupId: String,
+    kafkaParams: Map[String, String],
    topics: Map[String, Int],
-    initialOffsets: Map[KafkaPartitionKey, Long],
    storageLevel: StorageLevel
  ) extends NetworkInputDStream[T](ssc_ ) with Logging {

  def getReceiver(): NetworkReceiver[T] = {
-    new KafkaReceiver(zkQuorum, groupId, topics, initialOffsets, storageLevel)
+    new KafkaReceiver[T, D](kafkaParams, topics, storageLevel)
        .asInstanceOf[NetworkReceiver[T]]
  }
}

private[streaming]
-class KafkaReceiver(zkQuorum: String, groupId: String,
-  topics: Map[String, Int], initialOffsets: Map[KafkaPartitionKey, Long],
-  storageLevel: StorageLevel) extends NetworkReceiver[Any] {
-
-  // Timeout for establishing a connection to Zookeper in ms.
-  val ZK_TIMEOUT = 10000
+class KafkaReceiver[T: ClassTag, D <: Decoder[_]: Manifest](
+    kafkaParams: Map[String, String],
+    topics: Map[String, Int],
+    storageLevel: StorageLevel
+  ) extends NetworkReceiver[Any] {
 
   // Handles pushing data into the BlockManager
   lazy protected val blockGenerator = new BlockGenerator(storageLevel)
   // Connection to Kafka
-  var consumerConnector : ZookeeperConsumerConnector = null
+  var consumerConnector : ConsumerConnector = null
 
   def onStop() {
     blockGenerator.stop()
@@ -73,54 +66,59 @@ class KafkaReceiver(zkQuorum: String, groupId: String,
     // In case we are using multiple Threads to handle Kafka Messages
     val executorPool = Executors.newFixedThreadPool(topics.values.reduce(_ + _))
 
-    logInfo("Starting Kafka Consumer Stream with group: " + groupId)
-    logInfo("Initial offsets: " + initialOffsets.toString)
+    logInfo("Starting Kafka Consumer Stream with group: " + kafkaParams("groupid"))
 
-    // Zookeper connection properties
+    // Kafka connection properties
     val props = new Properties()
-    props.put("zk.connect", zkQuorum)
-    props.put("zk.connectiontimeout.ms", ZK_TIMEOUT.toString)
-    props.put("groupid", groupId)
+    kafkaParams.foreach(param => props.put(param._1, param._2))
 
     // Create the connection to the cluster
-    logInfo("Connecting to Zookeper: " + zkQuorum)
+    logInfo("Connecting to Zookeeper: " + kafkaParams("zk.connect"))
     val consumerConfig = new ConsumerConfig(props)
-    consumerConnector = Consumer.create(consumerConfig).asInstanceOf[ZookeeperConsumerConnector]
-    logInfo("Connected to " + zkQuorum)
+    consumerConnector = Consumer.create(consumerConfig)
+    logInfo("Connected to " + kafkaParams("zk.connect"))
 
-    // If specified, set the topic offset
-    setOffsets(initialOffsets)
+    // When autooffset.reset is defined, it is our responsibility to try and whack the
+    // consumer group zk node.
+    if (kafkaParams.contains("autooffset.reset")) {
+      tryZookeeperConsumerGroupCleanup(kafkaParams("zk.connect"), kafkaParams("groupid"))
+    }
 
     // Create Threads for each Topic/Message Stream we are listening
-    val topicMessageStreams = consumerConnector.createMessageStreams(topics, new StringDecoder())
+    val decoder = manifest[D].erasure.newInstance.asInstanceOf[Decoder[T]]
+    val topicMessageStreams = consumerConnector.createMessageStreams(topics, decoder)
 
     // Start the messages handler for each partition
     topicMessageStreams.values.foreach { streams =>
       streams.foreach { stream => executorPool.submit(new MessageHandler(stream)) }
     }
-
-  }
-
-  // Overwrites the offets in Zookeper.
-  private def setOffsets(offsets: Map[KafkaPartitionKey, Long]) {
-    offsets.foreach { case(key, offset) =>
-      val topicDirs = new ZKGroupTopicDirs(key.groupId, key.topic)
-      val partitionName = key.brokerId + "-" + key.partId
-      updatePersistentPath(consumerConnector.zkClient,
-        topicDirs.consumerOffsetDir + "/" + partitionName, offset.toString)
-    }
   }
 
   // Handles Kafka Messages
-  private class MessageHandler(stream: KafkaStream[String]) extends Runnable {
+  private class MessageHandler[T: ClassManifest](stream: KafkaStream[T]) extends Runnable {
     def run() {
       logInfo("Starting MessageHandler.")
-      stream.takeWhile { msgAndMetadata =>
+      for (msgAndMetadata <- stream) {
         blockGenerator += msgAndMetadata.message
-
-        // Keep on handling messages
-        true
       }
     }
   }
+
+  // It is our responsibility to delete the consumer group when specifying autooffset.reset.
+  // This is because Kafka 0.7.2 only honors this param when the group is not in zookeeper.
+  //
+  // The kafka high level consumer doesn't expose setting offsets currently; this is a trick
+  // copied from Kafka's ConsoleConsumer. See code related to 'autooffset.reset' when it is set
+  // to 'smallest'/'largest':
+  // https://github.com/apache/kafka/blob/0.7.2/core/src/main/scala/kafka/consumer/ConsoleConsumer.scala
+  private def tryZookeeperConsumerGroupCleanup(zkUrl: String, groupId: String) {
+    try {
+      val dir = "/consumers/" + groupId
+      logInfo("Cleaning up temporary zookeeper data under " + dir + ".")
+      val zk = new ZkClient(zkUrl, 30*1000, 30*1000, ZKStringSerializer)
+      zk.deleteRecursive(dir)
+      zk.close()
+    } catch {
+      case _ => // swallow
+    }
+  }
 }
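KafkaReceiver no longer hard-codes StringDecoder: the decoder class travels in the D type parameter and is instantiated reflectively through its Manifest (manifest[D].erasure is the runtime Class, newInstance its no-arg constructor). A minimal sketch of the trick, which assumes the decoder has a public no-arg constructor (pre-Scala-2.10 Manifest API, as in the diff):

    import kafka.serializer.{Decoder, StringDecoder}

    // Build a Decoder[T] purely from the type parameter D.
    def makeDecoder[T, D <: Decoder[_]: Manifest]: Decoder[T] =
      manifest[D].erasure.newInstance.asInstanceOf[Decoder[T]]

    val decoder = makeDecoder[String, StringDecoder]  // yields a Decoder[String]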
diff --git a/streaming/src/main/scala/spark/streaming/dstream/NetworkInputDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/NetworkInputDStream.scala
index 52b9968f6e..3b9cd0e89b 100644
--- a/streaming/src/main/scala/spark/streaming/dstream/NetworkInputDStream.scala
+++ b/streaming/src/main/scala/spark/streaming/dstream/NetworkInputDStream.scala
@@ -201,7 +201,7 @@ abstract class NetworkReceiver[T: ClassTag]() extends Serializable with Logging
     case class Block(id: String, iterator: Iterator[T], metadata: Any = null)
 
     val clock = new SystemClock()
-    val blockInterval = 200L
+    val blockInterval = System.getProperty("spark.streaming.blockInterval", "200").toLong
     val blockIntervalTimer = new RecurringTimer(clock, blockInterval, updateCurrentBuffer)
     val blockStorageLevel = storageLevel
     val blocksForPushing = new ArrayBlockingQueue[Block](1000)
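The block interval, previously fixed at 200 ms, is now read from the spark.streaming.blockInterval system property. It determines how often a receiver's buffered data is cut into a block, and therefore roughly how many partitions each batch RDD gets per receiver (batch duration / block interval). For example:

    // Set before the receiver starts; the value is in milliseconds.
    // A 2000 ms batch with a 100 ms block interval yields ~20 blocks
    // (and hence ~20 tasks) per batch per receiver.
    System.setProperty("spark.streaming.blockInterval", "100")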
diff --git a/streaming/src/main/scala/spark/streaming/dstream/TwitterInputDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/TwitterInputDStream.scala
index c697498862..ff7a58be45 100644
--- a/streaming/src/main/scala/spark/streaming/dstream/TwitterInputDStream.scala
+++ b/streaming/src/main/scala/spark/streaming/dstream/TwitterInputDStream.scala
@@ -3,34 +3,45 @@ package spark.streaming.dstream
 import spark._
 import spark.streaming._
 import storage.StorageLevel
-
 import twitter4j._
-import twitter4j.auth.BasicAuthorization
+import twitter4j.auth.Authorization
+import java.util.prefs.Preferences
+import twitter4j.conf.ConfigurationBuilder
+import twitter4j.conf.PropertyConfiguration
+import twitter4j.auth.OAuthAuthorization
+import twitter4j.auth.AccessToken
 
 /* A stream of Twitter statuses, potentially filtered by one or more keywords.
 *
-* @constructor create a new Twitter stream using the supplied username and password to authenticate.
+* @constructor create a new Twitter stream using the supplied Twitter4J authentication credentials.
 * An optional set of string filters can be used to restrict the set of tweets. The Twitter API is
 * such that this may return a sampled subset of all tweets during each interval.
+*
+* If no Authorization object is provided, initializes OAuth authorization using the system
+* properties twitter4j.oauth.consumerKey, .consumerSecret, .accessToken and .accessTokenSecret.
 */
 private[streaming]
 class TwitterInputDStream(
     @transient ssc_ : StreamingContext,
-    username: String,
-    password: String,
+    twitterAuth: Option[Authorization],
     filters: Seq[String],
     storageLevel: StorageLevel
   ) extends NetworkInputDStream[Status](ssc_)  {
+
+  private def createOAuthAuthorization(): Authorization = {
+    new OAuthAuthorization(new ConfigurationBuilder().build())
+  }
+
+  private val authorization = twitterAuth.getOrElse(createOAuthAuthorization())
+
   override def getReceiver(): NetworkReceiver[Status] = {
-    new TwitterReceiver(username, password, filters, storageLevel)
+    new TwitterReceiver(authorization, filters, storageLevel)
   }
 }
 
 private[streaming]
 class TwitterReceiver(
-    username: String,
-    password: String,
+    twitterAuth: Authorization,
     filters: Seq[String],
     storageLevel: StorageLevel
   ) extends NetworkReceiver[Status] {
@@ -40,8 +51,7 @@ class TwitterReceiver(
 
   protected override def onStart() {
     blockGenerator.start()
-    twitterStream = new TwitterStreamFactory()
-      .getInstance(new BasicAuthorization(username, password))
+    twitterStream = new TwitterStreamFactory().getInstance(twitterAuth)
     twitterStream.addListener(new StatusListener {
       def onStatus(status: Status) = {
         blockGenerator += status
diff --git a/streaming/src/main/scala/spark/streaming/util/MasterFailureTest.scala b/streaming/src/main/scala/spark/streaming/util/MasterFailureTest.scala
index 3db1eaa834..b3daa5a91b 100644
--- a/streaming/src/main/scala/spark/streaming/util/MasterFailureTest.scala
+++ b/streaming/src/main/scala/spark/streaming/util/MasterFailureTest.scala
@@ -160,6 +160,7 @@ object MasterFailureTest extends Logging {
 
     // Setup the streaming computation with the given operation
     System.clearProperty("spark.driver.port")
+    System.clearProperty("spark.hostPort")
     var ssc = new StreamingContext("local[4]", "MasterFailureTest", batchDuration, null, Nil, Map())
     ssc.checkpoint(checkpointDir.toString)
     val inputStream = ssc.textFileStream(testDir.toString)
@@ -206,6 +207,7 @@ object MasterFailureTest extends Logging {
     // (iii) Its not timed out yet
     System.clearProperty("spark.streaming.clock")
     System.clearProperty("spark.driver.port")
+    System.clearProperty("spark.hostPort")
     ssc.start()
     val startTime = System.currentTimeMillis()
     while (!killed && !isLastOutputGenerated && !isTimedOut) {
@@ -358,13 +360,16 @@ class FileGeneratingThread(input: Seq[String], testDir: Path, interval: Long)
         // Write the data to a local file and then move it to the target test directory
         val localFile = new File(localTestDir, (i+1).toString)
         val hadoopFile = new Path(testDir, (i+1).toString)
+        val tempHadoopFile = new Path(testDir, ".tmp_" + (i+1).toString)
         FileUtils.writeStringToFile(localFile, input(i).toString + "\n")
         var tries = 0
         var done = false
         while (!done && tries < maxTries) {
           tries += 1
           try {
-            fs.copyFromLocalFile(new Path(localFile.toString), hadoopFile)
+            // fs.copyFromLocalFile(new Path(localFile.toString), hadoopFile)
+            fs.copyFromLocalFile(new Path(localFile.toString), tempHadoopFile)
+            fs.rename(tempHadoopFile, hadoopFile)
             done = true
           } catch {
             case ioe: IOException => {
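The FileGeneratingThread change above matters because copyFromLocalFile creates the destination in place, so a running textFileStream could pick up a partially copied file. Copying to a dot-prefixed temporary name and then renaming makes the file appear all at once; hidden '.'-files are expected to be skipped by the file stream's default filter. A sketch of the helper this amounts to:

    import org.apache.hadoop.fs.{FileSystem, Path}

    // Publish 'local' at 'dest' in one step: copy to a hidden temp name first,
    // then rename, so a file-watching stream never reads a half-copied file.
    def publishFile(fs: FileSystem, local: Path, dest: Path) {
      val tmp = new Path(dest.getParent, ".tmp_" + dest.getName)
      fs.copyFromLocalFile(local, tmp)
      fs.rename(tmp, dest)
    }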
diff --git a/streaming/src/main/scala/spark/streaming/util/RawTextSender.scala b/streaming/src/main/scala/spark/streaming/util/RawTextSender.scala
index d8b987ec86..bd0b0e74c1 100644
--- a/streaming/src/main/scala/spark/streaming/util/RawTextSender.scala
+++ b/streaming/src/main/scala/spark/streaming/util/RawTextSender.scala
@@ -5,7 +5,7 @@ import spark.util.{RateLimitedOutputStream, IntParam}
 import java.net.ServerSocket
 import spark.{Logging, KryoSerializer}
 import it.unimi.dsi.fastutil.io.FastByteArrayOutputStream
-import io.Source
+import scala.io.Source
 import java.io.IOException
 
 /**
diff --git a/streaming/src/test/java/spark/streaming/JavaAPISuite.java b/streaming/src/test/java/spark/streaming/JavaAPISuite.java
index 3bed500f73..4cf10582a9 100644
--- a/streaming/src/test/java/spark/streaming/JavaAPISuite.java
+++ b/streaming/src/test/java/spark/streaming/JavaAPISuite.java
@@ -4,6 +4,7 @@ import com.google.common.base.Optional;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.io.Files;
+import kafka.serializer.StringDecoder;
 import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
 import org.junit.After;
 import org.junit.Assert;
@@ -23,7 +24,6 @@ import spark.streaming.api.java.JavaPairDStream;
 import spark.streaming.api.java.JavaStreamingContext;
 import spark.streaming.JavaTestUtils;
 import spark.streaming.JavaCheckpointTestUtils;
-import spark.streaming.dstream.KafkaPartitionKey;
 import spark.streaming.InputStreamsSuite;
 
 import java.io.*;
@@ -1203,10 +1203,14 @@ public class JavaAPISuite implements Serializable {
   @Test
   public void testKafkaStream() {
     HashMap<String, Integer> topics = Maps.newHashMap();
-    HashMap<KafkaPartitionKey, Long> offsets = Maps.newHashMap();
     JavaDStream test1 = ssc.kafkaStream("localhost:12345", "group", topics);
-    JavaDStream test2 = ssc.kafkaStream("localhost:12345", "group", topics, offsets);
-    JavaDStream test3 = ssc.kafkaStream("localhost:12345", "group", topics, offsets,
+    JavaDStream test2 = ssc.kafkaStream("localhost:12345", "group", topics,
+      StorageLevel.MEMORY_AND_DISK());
+
+    HashMap<String, String> kafkaParams = Maps.newHashMap();
+    kafkaParams.put("zk.connect","localhost:12345");
+    kafkaParams.put("groupid","consumer-group");
+    JavaDStream test3 = ssc.kafkaStream(String.class, StringDecoder.class, kafkaParams, topics,
       StorageLevel.MEMORY_AND_DISK());
   }
 
@@ -1263,7 +1267,7 @@ public class JavaAPISuite implements Serializable {
   @Test
   public void testTwitterStream() {
     String[] filters = new String[] { "good", "bad", "ugly" };
-    JavaDStream test = ssc.twitterStream("username", "password", filters, StorageLevel.MEMORY_ONLY());
+    JavaDStream test = ssc.twitterStream(filters, StorageLevel.MEMORY_ONLY());
   }
 
   @Test
diff --git a/streaming/src/test/scala/spark/streaming/BasicOperationsSuite.scala b/streaming/src/test/scala/spark/streaming/BasicOperationsSuite.scala
index cf2ed8b1d4..565089a853 100644
--- a/streaming/src/test/scala/spark/streaming/BasicOperationsSuite.scala
+++ b/streaming/src/test/scala/spark/streaming/BasicOperationsSuite.scala
@@ -15,6 +15,7 @@ class BasicOperationsSuite extends TestSuiteBase {
   after {
     // To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown
     System.clearProperty("spark.driver.port")
+    System.clearProperty("spark.hostPort")
   }
 
   test("map") {
@@ -92,9 +93,9 @@ class BasicOperationsSuite extends TestSuiteBase {
 
   test("count") {
     testOperation(
-      Seq(1 to 1, 1 to 2, 1 to 3, 1 to 4),
+      Seq(Seq(), 1 to 1, 1 to 2, 1 to 3, 1 to 4),
       (s: DStream[Int]) => s.count(),
-      Seq(Seq(1L), Seq(2L), Seq(3L), Seq(4L))
+      Seq(Seq(0L), Seq(1L), Seq(2L), Seq(3L), Seq(4L))
     )
   }
 
diff --git a/streaming/src/test/scala/spark/streaming/CheckpointSuite.scala b/streaming/src/test/scala/spark/streaming/CheckpointSuite.scala
index 143a26d911..141842c380 100644
--- a/streaming/src/test/scala/spark/streaming/CheckpointSuite.scala
+++ b/streaming/src/test/scala/spark/streaming/CheckpointSuite.scala
@@ -35,6 +35,7 @@ class CheckpointSuite extends TestSuiteBase with BeforeAndAfter {
     // To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown
     System.clearProperty("spark.driver.port")
+    System.clearProperty("spark.hostPort")
   }
 
   var ssc: StreamingContext = null
@@ -329,6 +330,7 @@ class CheckpointSuite extends TestSuiteBase with BeforeAndAfter {
     )
     ssc = new StreamingContext(checkpointDir)
     System.clearProperty("spark.driver.port")
+    System.clearProperty("spark.hostPort")
     ssc.start()
     val outputNew = advanceTimeWithRealDelay[V](ssc, nextNumBatches)
     // the first element will be re-processed data of the last batch before restart
diff --git a/streaming/src/test/scala/spark/streaming/InputStreamsSuite.scala b/streaming/src/test/scala/spark/streaming/InputStreamsSuite.scala
index 67dca2ac31..b024fc9dcc 100644
--- a/streaming/src/test/scala/spark/streaming/InputStreamsSuite.scala
+++ b/streaming/src/test/scala/spark/streaming/InputStreamsSuite.scala
@@ -41,6 +41,7 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
   after {
     // To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown
     System.clearProperty("spark.driver.port")
+    System.clearProperty("spark.hostPort")
   }
 
 
@@ -242,6 +243,17 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
       assert(output(i) === expectedOutput(i))
     }
   }
+
+  test("kafka input stream") {
+    val ssc = new StreamingContext(master, framework, batchDuration)
+    val topics = Map("my-topic" -> 1)
+    val test1 = ssc.kafkaStream("localhost:12345", "group", topics)
+    val test2 = ssc.kafkaStream("localhost:12345", "group", topics, StorageLevel.MEMORY_AND_DISK)
+
+    // Test specifying decoder
+    val kafkaParams = Map("zk.connect"->"localhost:12345","groupid"->"consumer-group")
+    val test3 = ssc.kafkaStream[String, kafka.serializer.StringDecoder](kafkaParams, topics, StorageLevel.MEMORY_AND_DISK)
+  }
 }
diff --git a/streaming/src/test/scala/spark/streaming/WindowOperationsSuite.scala b/streaming/src/test/scala/spark/streaming/WindowOperationsSuite.scala
index 1b66f3bda2..80d827706f 100644
--- a/streaming/src/test/scala/spark/streaming/WindowOperationsSuite.scala
+++ b/streaming/src/test/scala/spark/streaming/WindowOperationsSuite.scala
@@ -16,6 +16,7 @@ class WindowOperationsSuite extends TestSuiteBase {
   after {
     // To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown
     System.clearProperty("spark.driver.port")
+    System.clearProperty("spark.hostPort")
   }
 
   val largerSlideInput = Seq(