Diffstat (limited to 'external')
-rw-r--r--  external/akka/pom.xml | 70
-rw-r--r--  external/akka/src/main/scala/org/apache/spark/streaming/akka/ActorReceiver.scala | 306
-rw-r--r--  external/akka/src/main/scala/org/apache/spark/streaming/akka/AkkaUtils.scala | 147
-rw-r--r--  external/akka/src/test/java/org/apache/spark/streaming/akka/JavaAkkaUtilsSuite.java | 68
-rw-r--r--  external/akka/src/test/scala/org/apache/spark/streaming/akka/AkkaUtilsSuite.scala | 67
-rw-r--r--  external/flume-assembly/pom.xml | 168
-rw-r--r--  external/flume-sink/pom.xml | 129
-rw-r--r--  external/flume-sink/src/main/avro/sparkflume.avdl | 40
-rw-r--r--  external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/Logging.scala | 127
-rw-r--r--  external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/SparkAvroCallbackHandler.scala | 166
-rw-r--r--  external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/SparkSink.scala | 171
-rw-r--r--  external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/SparkSinkThreadFactory.scala | 35
-rw-r--r--  external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/SparkSinkUtils.scala | 28
-rw-r--r--  external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/TransactionProcessor.scala | 252
-rw-r--r--  external/flume-sink/src/test/resources/log4j.properties | 28
-rw-r--r--  external/flume-sink/src/test/scala/org/apache/spark/streaming/flume/sink/SparkSinkSuite.scala | 218
-rw-r--r--  external/flume/pom.xml | 78
-rw-r--r--  external/flume/src/main/scala/org/apache/spark/streaming/flume/EventTransformer.scala | 72
-rw-r--r--  external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeBatchFetcher.scala | 166
-rw-r--r--  external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeInputDStream.scala | 205
-rw-r--r--  external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumePollingInputDStream.scala | 123
-rw-r--r--  external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeTestUtils.scala | 117
-rw-r--r--  external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeUtils.scala | 311
-rw-r--r--  external/flume/src/main/scala/org/apache/spark/streaming/flume/PollingFlumeTestUtils.scala | 209
-rw-r--r--  external/flume/src/main/scala/org/apache/spark/streaming/flume/package-info.java | 21
-rw-r--r--  external/flume/src/main/scala/org/apache/spark/streaming/flume/package.scala | 23
-rw-r--r--  external/flume/src/test/java/org/apache/spark/streaming/LocalJavaStreamingContext.java | 44
-rw-r--r--  external/flume/src/test/java/org/apache/spark/streaming/flume/JavaFlumePollingStreamSuite.java | 44
-rw-r--r--  external/flume/src/test/java/org/apache/spark/streaming/flume/JavaFlumeStreamSuite.java | 36
-rw-r--r--  external/flume/src/test/resources/log4j.properties | 28
-rw-r--r--  external/flume/src/test/scala/org/apache/spark/streaming/TestOutputStream.scala | 48
-rw-r--r--  external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumePollingStreamSuite.scala | 129
-rw-r--r--  external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumeStreamSuite.scala | 102
-rw-r--r--  external/mqtt-assembly/pom.xml | 175
-rw-r--r--  external/mqtt/pom.xml | 104
-rw-r--r--  external/mqtt/src/main/assembly/assembly.xml | 44
-rw-r--r--  external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTInputDStream.scala | 102
-rw-r--r--  external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTUtils.scala | 92
-rw-r--r--  external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/package-info.java | 21
-rw-r--r--  external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/package.scala | 23
-rw-r--r--  external/mqtt/src/test/java/org/apache/spark/streaming/LocalJavaStreamingContext.java | 44
-rw-r--r--  external/mqtt/src/test/java/org/apache/spark/streaming/mqtt/JavaMQTTStreamSuite.java | 37
-rw-r--r--  external/mqtt/src/test/resources/log4j.properties | 28
-rw-r--r--  external/mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTStreamSuite.scala | 79
-rw-r--r--  external/mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTTestUtils.scala | 111
-rw-r--r--  external/twitter/pom.xml | 70
-rw-r--r--  external/twitter/src/main/scala/org/apache/spark/streaming/twitter/TwitterInputDStream.scala | 115
-rw-r--r--  external/twitter/src/main/scala/org/apache/spark/streaming/twitter/TwitterUtils.scala | 132
-rw-r--r--  external/twitter/src/main/scala/org/apache/spark/streaming/twitter/package-info.java | 21
-rw-r--r--  external/twitter/src/main/scala/org/apache/spark/streaming/twitter/package.scala | 23
-rw-r--r--  external/twitter/src/test/java/org/apache/spark/streaming/LocalJavaStreamingContext.java | 44
-rw-r--r--  external/twitter/src/test/java/org/apache/spark/streaming/twitter/JavaTwitterStreamSuite.java | 44
-rw-r--r--  external/twitter/src/test/resources/log4j.properties | 28
-rw-r--r--  external/twitter/src/test/scala/org/apache/spark/streaming/twitter/TwitterStreamSuite.scala | 59
-rw-r--r--  external/zeromq/pom.xml | 74
-rw-r--r--  external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/ZeroMQReceiver.scala | 55
-rw-r--r--  external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/ZeroMQUtils.scala | 163
-rw-r--r--  external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/package-info.java | 21
-rw-r--r--  external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/package.scala | 23
-rw-r--r--  external/zeromq/src/test/java/org/apache/spark/streaming/LocalJavaStreamingContext.java | 44
-rw-r--r--  external/zeromq/src/test/java/org/apache/spark/streaming/zeromq/JavaZeroMQStreamSuite.java | 63
-rw-r--r--  external/zeromq/src/test/resources/log4j.properties | 28
-rw-r--r--  external/zeromq/src/test/scala/org/apache/spark/streaming/zeromq/ZeroMQStreamSuite.scala | 63
63 files changed, 0 insertions, 5706 deletions
diff --git a/external/akka/pom.xml b/external/akka/pom.xml
deleted file mode 100644
index bbe644e3b3..0000000000
--- a/external/akka/pom.xml
+++ /dev/null
@@ -1,70 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
- ~ Licensed to the Apache Software Foundation (ASF) under one or more
- ~ contributor license agreements. See the NOTICE file distributed with
- ~ this work for additional information regarding copyright ownership.
- ~ The ASF licenses this file to You under the Apache License, Version 2.0
- ~ (the "License"); you may not use this file except in compliance with
- ~ the License. You may obtain a copy of the License at
- ~
- ~ http://www.apache.org/licenses/LICENSE-2.0
- ~
- ~ Unless required by applicable law or agreed to in writing, software
- ~ distributed under the License is distributed on an "AS IS" BASIS,
- ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- ~ See the License for the specific language governing permissions and
- ~ limitations under the License.
- -->
-
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
- <modelVersion>4.0.0</modelVersion>
- <parent>
- <groupId>org.apache.spark</groupId>
- <artifactId>spark-parent_2.11</artifactId>
- <version>2.0.0-SNAPSHOT</version>
- <relativePath>../../pom.xml</relativePath>
- </parent>
-
- <groupId>org.apache.spark</groupId>
- <artifactId>spark-streaming-akka_2.11</artifactId>
- <properties>
- <sbt.project.name>streaming-akka</sbt.project.name>
- </properties>
- <packaging>jar</packaging>
- <name>Spark Project External Akka</name>
- <url>http://spark.apache.org/</url>
-
- <dependencies>
- <dependency>
- <groupId>org.apache.spark</groupId>
- <artifactId>spark-streaming_${scala.binary.version}</artifactId>
- <version>${project.version}</version>
- <scope>provided</scope>
- </dependency>
- <dependency>
- <groupId>org.apache.spark</groupId>
- <artifactId>spark-core_${scala.binary.version}</artifactId>
- <version>${project.version}</version>
- <type>test-jar</type>
- <scope>test</scope>
- </dependency>
- <dependency>
- <groupId>org.apache.spark</groupId>
- <artifactId>spark-test-tags_${scala.binary.version}</artifactId>
- </dependency>
- <dependency>
- <groupId>${akka.group}</groupId>
- <artifactId>akka-actor_${scala.binary.version}</artifactId>
- <version>${akka.version}</version>
- </dependency>
- <dependency>
- <groupId>${akka.group}</groupId>
- <artifactId>akka-remote_${scala.binary.version}</artifactId>
- <version>${akka.version}</version>
- </dependency>
- </dependencies>
- <build>
- <outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory>
- <testOutputDirectory>target/scala-${scala.binary.version}/test-classes</testOutputDirectory>
- </build>
-</project>
diff --git a/external/akka/src/main/scala/org/apache/spark/streaming/akka/ActorReceiver.scala b/external/akka/src/main/scala/org/apache/spark/streaming/akka/ActorReceiver.scala
deleted file mode 100644
index 33415c15be..0000000000
--- a/external/akka/src/main/scala/org/apache/spark/streaming/akka/ActorReceiver.scala
+++ /dev/null
@@ -1,306 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.streaming.akka
-
-import java.nio.ByteBuffer
-import java.util.concurrent.atomic.AtomicInteger
-
-import scala.concurrent.Future
-import scala.concurrent.duration._
-import scala.language.postfixOps
-import scala.reflect.ClassTag
-
-import akka.actor._
-import akka.actor.SupervisorStrategy.{Escalate, Restart}
-import akka.pattern.ask
-import akka.util.Timeout
-import com.typesafe.config.ConfigFactory
-
-import org.apache.spark.{Logging, TaskContext}
-import org.apache.spark.annotation.DeveloperApi
-import org.apache.spark.storage.StorageLevel
-import org.apache.spark.streaming.receiver.Receiver
-
-/**
- * :: DeveloperApi ::
- * A helper with set of defaults for supervisor strategy
- */
-@DeveloperApi
-object ActorReceiver {
-
- /**
- * A OneForOneStrategy supervisor strategy with `maxNrOfRetries = 10` and
- * `withinTimeRange = 15 millis`. For RuntimeException, it will restart the ActorReceiver; for
- * others, it just escalates the failure to the supervisor of the supervisor.
- */
- val defaultSupervisorStrategy = OneForOneStrategy(maxNrOfRetries = 10, withinTimeRange =
- 15 millis) {
- case _: RuntimeException => Restart
- case _: Exception => Escalate
- }
-
- /**
- * A default ActorSystem creator. It will use a unique system name
- * (streaming-actor-system-<spark-task-attempt-id>) to start an ActorSystem that supports remote
- * communication.
- */
- val defaultActorSystemCreator: () => ActorSystem = () => {
- val uniqueSystemName = s"streaming-actor-system-${TaskContext.get().taskAttemptId()}"
- val akkaConf = ConfigFactory.parseString(
- s"""akka.actor.provider = "akka.remote.RemoteActorRefProvider"
- |akka.remote.enabled-transports = ["akka.remote.netty.tcp"]
- |""".stripMargin)
- ActorSystem(uniqueSystemName, akkaConf)
- }
-}
-
-/**
- * :: DeveloperApi ::
- * A base Actor that provides APIs for pushing received data into Spark Streaming for processing.
- *
- * Find more details at: http://spark.apache.org/docs/latest/streaming-custom-receivers.html
- *
- * @example {{{
- * class MyActor extends ActorReceiver {
- * def receive {
- * case anything: String => store(anything)
- * }
- * }
- *
- * AkkaUtils.createStream[String](ssc, Props[MyActor](),"MyActorReceiver")
- *
- * }}}
- *
- * @note Since Actor may exist outside the spark framework, It is thus user's responsibility
- * to ensure the type safety, i.e. parametrized type of push block and InputDStream
- * should be same.
- */
-@DeveloperApi
-abstract class ActorReceiver extends Actor {
-
- /** Store an iterator of received data as a data block into Spark's memory. */
- def store[T](iter: Iterator[T]) {
- context.parent ! IteratorData(iter)
- }
-
- /**
- * Store the bytes of received data as a data block into Spark's memory. Note
- * that the data in the ByteBuffer must be serialized using the same serializer
- * that Spark is configured to use.
- */
- def store(bytes: ByteBuffer) {
- context.parent ! ByteBufferData(bytes)
- }
-
- /**
- * Store a single item of received data to Spark's memory asynchronously.
- * These single items will be aggregated together into data blocks before
- * being pushed into Spark's memory.
- */
- def store[T](item: T) {
- context.parent ! SingleItemData(item)
- }
-
- /**
- * Store a single item of received data to Spark's memory and returns a `Future`.
- * The `Future` will be completed when the operator finishes, or with an
- * `akka.pattern.AskTimeoutException` after the given timeout has expired.
- * These single items will be aggregated together into data blocks before
- * being pushed into Spark's memory.
- *
- * This method allows the user to control the flow speed using `Future`
- */
- def store[T](item: T, timeout: Timeout): Future[Unit] = {
- context.parent.ask(AskStoreSingleItemData(item))(timeout).map(_ => ())(context.dispatcher)
- }
-}
-
-/**
- * :: DeveloperApi ::
- * A Java UntypedActor that provides APIs for pushing received data into Spark Streaming for
- * processing.
- *
- * Find more details at: http://spark.apache.org/docs/latest/streaming-custom-receivers.html
- *
- * @example {{{
- * class MyActor extends JavaActorReceiver {
- * @Override
- * public void onReceive(Object msg) throws Exception {
- * store((String) msg);
- * }
- * }
- *
- * AkkaUtils.<String>createStream(jssc, Props.create(MyActor.class), "MyActorReceiver");
- *
- * }}}
- *
- * @note Since Actor may exist outside the spark framework, It is thus user's responsibility
- * to ensure the type safety, i.e. parametrized type of push block and InputDStream
- * should be same.
- */
-@DeveloperApi
-abstract class JavaActorReceiver extends UntypedActor {
-
- /** Store an iterator of received data as a data block into Spark's memory. */
- def store[T](iter: Iterator[T]) {
- context.parent ! IteratorData(iter)
- }
-
- /**
- * Store the bytes of received data as a data block into Spark's memory. Note
- * that the data in the ByteBuffer must be serialized using the same serializer
- * that Spark is configured to use.
- */
- def store(bytes: ByteBuffer) {
- context.parent ! ByteBufferData(bytes)
- }
-
- /**
- * Store a single item of received data to Spark's memory.
- * These single items will be aggregated together into data blocks before
- * being pushed into Spark's memory.
- */
- def store[T](item: T) {
- context.parent ! SingleItemData(item)
- }
-
- /**
- * Store a single item of received data to Spark's memory and returns a `Future`.
- * The `Future` will be completed when the operator finishes, or with an
- * `akka.pattern.AskTimeoutException` after the given timeout has expired.
- * These single items will be aggregated together into data blocks before
- * being pushed into Spark's memory.
- *
- * This method allows the user to control the flow speed using `Future`
- */
- def store[T](item: T, timeout: Timeout): Future[Unit] = {
- context.parent.ask(AskStoreSingleItemData(item))(timeout).map(_ => ())(context.dispatcher)
- }
-}
-
-/**
- * :: DeveloperApi ::
- * Statistics for querying the supervisor about state of workers. Used in
- * conjunction with `AkkaUtils.createStream` and
- * [[org.apache.spark.streaming.akka.ActorReceiverSupervisor]].
- */
-@DeveloperApi
-case class Statistics(numberOfMsgs: Int,
- numberOfWorkers: Int,
- numberOfHiccups: Int,
- otherInfo: String)
-
-/** Case class to receive data sent by child actors */
-private[akka] sealed trait ActorReceiverData
-private[akka] case class SingleItemData[T](item: T) extends ActorReceiverData
-private[akka] case class AskStoreSingleItemData[T](item: T) extends ActorReceiverData
-private[akka] case class IteratorData[T](iterator: Iterator[T]) extends ActorReceiverData
-private[akka] case class ByteBufferData(bytes: ByteBuffer) extends ActorReceiverData
-private[akka] object Ack extends ActorReceiverData
-
-/**
- * Provides Actors as receivers for receiving stream.
- *
- * As Actors can also be used to receive data from almost any stream source.
- * A nice set of abstraction(s) for actors as receivers is already provided for
- * a few general cases. It is thus exposed as an API where user may come with
- * their own Actor to run as receiver for Spark Streaming input source.
- *
- * This starts a supervisor actor which starts workers and also provides
- * [http://doc.akka.io/docs/akka/snapshot/scala/fault-tolerance.html fault-tolerance].
- *
- * Here's a way to start more supervisor/workers as its children.
- *
- * @example {{{
- * context.parent ! Props(new Supervisor)
- * }}} OR {{{
- * context.parent ! Props(new Worker, "Worker")
- * }}}
- */
-private[akka] class ActorReceiverSupervisor[T: ClassTag](
- actorSystemCreator: () => ActorSystem,
- props: Props,
- name: String,
- storageLevel: StorageLevel,
- receiverSupervisorStrategy: SupervisorStrategy
- ) extends Receiver[T](storageLevel) with Logging {
-
- private lazy val actorSystem = actorSystemCreator()
- protected lazy val actorSupervisor = actorSystem.actorOf(Props(new Supervisor),
- "Supervisor" + streamId)
-
- class Supervisor extends Actor {
-
- override val supervisorStrategy = receiverSupervisorStrategy
- private val worker = context.actorOf(props, name)
- logInfo("Started receiver worker at:" + worker.path)
-
- private val n: AtomicInteger = new AtomicInteger(0)
- private val hiccups: AtomicInteger = new AtomicInteger(0)
-
- override def receive: PartialFunction[Any, Unit] = {
-
- case IteratorData(iterator) =>
- logDebug("received iterator")
- store(iterator.asInstanceOf[Iterator[T]])
-
- case SingleItemData(msg) =>
- logDebug("received single")
- store(msg.asInstanceOf[T])
- n.incrementAndGet
-
- case AskStoreSingleItemData(msg) =>
- logDebug("received single sync")
- store(msg.asInstanceOf[T])
- n.incrementAndGet
- sender() ! Ack
-
- case ByteBufferData(bytes) =>
- logDebug("received bytes")
- store(bytes)
-
- case props: Props =>
- val worker = context.actorOf(props)
- logInfo("Started receiver worker at:" + worker.path)
- sender ! worker
-
- case (props: Props, name: String) =>
- val worker = context.actorOf(props, name)
- logInfo("Started receiver worker at:" + worker.path)
- sender ! worker
-
- case _: PossiblyHarmful => hiccups.incrementAndGet()
-
- case _: Statistics =>
- val workers = context.children
- sender ! Statistics(n.get, workers.size, hiccups.get, workers.mkString("\n"))
-
- }
- }
-
- def onStart(): Unit = {
- actorSupervisor
- logInfo("Supervision tree for receivers initialized at:" + actorSupervisor.path)
- }
-
- def onStop(): Unit = {
- actorSupervisor ! PoisonPill
- actorSystem.shutdown()
- actorSystem.awaitTermination()
- }
-}
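
For reference, a minimal sketch of how the removed ActorReceiver API was typically used, based on the scaladoc example in the deleted file above. It assumes an existing StreamingContext named `ssc`; `MyActor` is a hypothetical user-defined actor, not part of the removed code.

    import akka.actor.Props
    import org.apache.spark.streaming.StreamingContext
    import org.apache.spark.streaming.akka.{ActorReceiver, AkkaUtils}
    import org.apache.spark.streaming.dstream.ReceiverInputDStream

    // Hypothetical user actor: forwards each received String into Spark Streaming.
    class MyActor extends ActorReceiver {
      override def receive: Receive = {
        case s: String => store(s)  // items are aggregated into blocks before being pushed to Spark
      }
    }

    // Assumes an existing StreamingContext `ssc`; yields a ReceiverInputDStream[String]
    // backed by a supervisor actor that runs MyActor on an executor.
    def actorStream(ssc: StreamingContext): ReceiverInputDStream[String] =
      AkkaUtils.createStream[String](ssc, Props[MyActor](), "MyActorReceiver")
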
diff --git a/external/akka/src/main/scala/org/apache/spark/streaming/akka/AkkaUtils.scala b/external/akka/src/main/scala/org/apache/spark/streaming/akka/AkkaUtils.scala
deleted file mode 100644
index 38c35c5ae7..0000000000
--- a/external/akka/src/main/scala/org/apache/spark/streaming/akka/AkkaUtils.scala
+++ /dev/null
@@ -1,147 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.streaming.akka
-
-import scala.reflect.ClassTag
-
-import akka.actor.{ActorSystem, Props, SupervisorStrategy}
-
-import org.apache.spark.api.java.function.{Function0 => JFunction0}
-import org.apache.spark.storage.StorageLevel
-import org.apache.spark.streaming.StreamingContext
-import org.apache.spark.streaming.api.java.{JavaReceiverInputDStream, JavaStreamingContext}
-import org.apache.spark.streaming.dstream.ReceiverInputDStream
-
-object AkkaUtils {
-
- /**
- * Create an input stream with a user-defined actor. See [[ActorReceiver]] for more details.
- *
- * @param ssc The StreamingContext instance
- * @param propsForActor Props object defining creation of the actor
- * @param actorName Name of the actor
- * @param storageLevel RDD storage level (default: StorageLevel.MEMORY_AND_DISK_SER_2)
- * @param actorSystemCreator A function to create ActorSystem in executors. `ActorSystem` will
- * be shut down when the receiver is stopping (default:
- * ActorReceiver.defaultActorSystemCreator)
- * @param supervisorStrategy the supervisor strategy (default: ActorReceiver.defaultStrategy)
- *
- * @note An important point to note:
- * Since Actor may exist outside the spark framework, It is thus user's responsibility
- * to ensure the type safety, i.e. parametrized type of data received and createStream
- * should be same.
- */
- def createStream[T: ClassTag](
- ssc: StreamingContext,
- propsForActor: Props,
- actorName: String,
- storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2,
- actorSystemCreator: () => ActorSystem = ActorReceiver.defaultActorSystemCreator,
- supervisorStrategy: SupervisorStrategy = ActorReceiver.defaultSupervisorStrategy
- ): ReceiverInputDStream[T] = ssc.withNamedScope("actor stream") {
- val cleanF = ssc.sc.clean(actorSystemCreator)
- ssc.receiverStream(new ActorReceiverSupervisor[T](
- cleanF,
- propsForActor,
- actorName,
- storageLevel,
- supervisorStrategy))
- }
-
- /**
- * Create an input stream with a user-defined actor. See [[JavaActorReceiver]] for more details.
- *
- * @param jssc The StreamingContext instance
- * @param propsForActor Props object defining creation of the actor
- * @param actorName Name of the actor
- * @param storageLevel Storage level to use for storing the received objects
- * @param actorSystemCreator A function to create ActorSystem in executors. `ActorSystem` will
- * be shut down when the receiver is stopping.
- * @param supervisorStrategy the supervisor strategy
- *
- * @note An important point to note:
- * Since Actor may exist outside the spark framework, It is thus user's responsibility
- * to ensure the type safety, i.e. parametrized type of data received and createStream
- * should be same.
- */
- def createStream[T](
- jssc: JavaStreamingContext,
- propsForActor: Props,
- actorName: String,
- storageLevel: StorageLevel,
- actorSystemCreator: JFunction0[ActorSystem],
- supervisorStrategy: SupervisorStrategy
- ): JavaReceiverInputDStream[T] = {
- implicit val cm: ClassTag[T] =
- implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]]
- createStream[T](
- jssc.ssc,
- propsForActor,
- actorName,
- storageLevel,
- () => actorSystemCreator.call(),
- supervisorStrategy)
- }
-
- /**
- * Create an input stream with a user-defined actor. See [[JavaActorReceiver]] for more details.
- *
- * @param jssc The StreamingContext instance
- * @param propsForActor Props object defining creation of the actor
- * @param actorName Name of the actor
- * @param storageLevel Storage level to use for storing the received objects
- *
- * @note An important point to note:
- * Since Actor may exist outside the spark framework, It is thus user's responsibility
- * to ensure the type safety, i.e. parametrized type of data received and createStream
- * should be same.
- */
- def createStream[T](
- jssc: JavaStreamingContext,
- propsForActor: Props,
- actorName: String,
- storageLevel: StorageLevel
- ): JavaReceiverInputDStream[T] = {
- implicit val cm: ClassTag[T] =
- implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]]
- createStream[T](jssc.ssc, propsForActor, actorName, storageLevel)
- }
-
- /**
- * Create an input stream with a user-defined actor. Storage level of the data will be the default
- * StorageLevel.MEMORY_AND_DISK_SER_2. See [[JavaActorReceiver]] for more details.
- *
- * @param jssc The StreamingContext instance
- * @param propsForActor Props object defining creation of the actor
- * @param actorName Name of the actor
- *
- * @note An important point to note:
- * Since Actor may exist outside the spark framework, It is thus user's responsibility
- * to ensure the type safety, i.e. parametrized type of data received and createStream
- * should be same.
- */
- def createStream[T](
- jssc: JavaStreamingContext,
- propsForActor: Props,
- actorName: String
- ): JavaReceiverInputDStream[T] = {
- implicit val cm: ClassTag[T] =
- implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]]
- createStream[T](jssc.ssc, propsForActor, actorName)
- }
-}
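
For context, this is the fully parameterized form of the removed Scala createStream API, spelling out the defaults documented in the file above. It is an illustrative sketch only, reusing the hypothetical `MyActor` from the previous sketch and assuming an existing StreamingContext `ssc`.

    import akka.actor.Props
    import org.apache.spark.storage.StorageLevel
    import org.apache.spark.streaming.akka.{ActorReceiver, AkkaUtils}

    // All parameters made explicit; the last three match the documented defaults.
    val stream = AkkaUtils.createStream[String](
      ssc,
      Props[MyActor](),
      "MyActorReceiver",
      StorageLevel.MEMORY_AND_DISK_SER_2,
      ActorReceiver.defaultActorSystemCreator,
      ActorReceiver.defaultSupervisorStrategy)
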
diff --git a/external/akka/src/test/java/org/apache/spark/streaming/akka/JavaAkkaUtilsSuite.java b/external/akka/src/test/java/org/apache/spark/streaming/akka/JavaAkkaUtilsSuite.java
deleted file mode 100644
index ac5ef31c8b..0000000000
--- a/external/akka/src/test/java/org/apache/spark/streaming/akka/JavaAkkaUtilsSuite.java
+++ /dev/null
@@ -1,68 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.streaming.akka;
-
-import akka.actor.ActorSystem;
-import akka.actor.Props;
-import akka.actor.SupervisorStrategy;
-import akka.util.Timeout;
-import org.apache.spark.streaming.Duration;
-import org.apache.spark.streaming.api.java.JavaStreamingContext;
-import org.junit.Test;
-
-import org.apache.spark.api.java.function.Function0;
-import org.apache.spark.storage.StorageLevel;
-import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
-
-public class JavaAkkaUtilsSuite {
-
- @Test // tests the API, does not actually test data receiving
- public void testAkkaUtils() {
- JavaStreamingContext jsc = new JavaStreamingContext("local[2]", "test", new Duration(1000));
- try {
- JavaReceiverInputDStream<String> test1 = AkkaUtils.<String>createStream(
- jsc, Props.create(JavaTestActor.class), "test");
- JavaReceiverInputDStream<String> test2 = AkkaUtils.<String>createStream(
- jsc, Props.create(JavaTestActor.class), "test", StorageLevel.MEMORY_AND_DISK_SER_2());
- JavaReceiverInputDStream<String> test3 = AkkaUtils.<String>createStream(
- jsc,
- Props.create(JavaTestActor.class),
- "test", StorageLevel.MEMORY_AND_DISK_SER_2(),
- new ActorSystemCreatorForTest(),
- SupervisorStrategy.defaultStrategy());
- } finally {
- jsc.stop();
- }
- }
-}
-
-class ActorSystemCreatorForTest implements Function0<ActorSystem> {
- @Override
- public ActorSystem call() {
- return null;
- }
-}
-
-
-class JavaTestActor extends JavaActorReceiver {
- @Override
- public void onReceive(Object message) throws Exception {
- store((String) message);
- store((String) message, new Timeout(1000));
- }
-}
diff --git a/external/akka/src/test/scala/org/apache/spark/streaming/akka/AkkaUtilsSuite.scala b/external/akka/src/test/scala/org/apache/spark/streaming/akka/AkkaUtilsSuite.scala
deleted file mode 100644
index ce95d9dd72..0000000000
--- a/external/akka/src/test/scala/org/apache/spark/streaming/akka/AkkaUtilsSuite.scala
+++ /dev/null
@@ -1,67 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.streaming.akka
-
-import scala.concurrent.duration._
-
-import akka.actor.{Props, SupervisorStrategy}
-
-import org.apache.spark.SparkFunSuite
-import org.apache.spark.storage.StorageLevel
-import org.apache.spark.streaming.{Seconds, StreamingContext}
-import org.apache.spark.streaming.dstream.ReceiverInputDStream
-
-class AkkaUtilsSuite extends SparkFunSuite {
-
- test("createStream") {
- val ssc: StreamingContext = new StreamingContext("local[2]", "test", Seconds(1000))
- try {
- // tests the API, does not actually test data receiving
- val test1: ReceiverInputDStream[String] = AkkaUtils.createStream(
- ssc, Props[TestActor](), "test")
- val test2: ReceiverInputDStream[String] = AkkaUtils.createStream(
- ssc, Props[TestActor](), "test", StorageLevel.MEMORY_AND_DISK_SER_2)
- val test3: ReceiverInputDStream[String] = AkkaUtils.createStream(
- ssc,
- Props[TestActor](),
- "test",
- StorageLevel.MEMORY_AND_DISK_SER_2,
- supervisorStrategy = SupervisorStrategy.defaultStrategy)
- val test4: ReceiverInputDStream[String] = AkkaUtils.createStream(
- ssc, Props[TestActor](), "test", StorageLevel.MEMORY_AND_DISK_SER_2, () => null)
- val test5: ReceiverInputDStream[String] = AkkaUtils.createStream(
- ssc, Props[TestActor](), "test", StorageLevel.MEMORY_AND_DISK_SER_2, () => null)
- val test6: ReceiverInputDStream[String] = AkkaUtils.createStream(
- ssc,
- Props[TestActor](),
- "test",
- StorageLevel.MEMORY_AND_DISK_SER_2,
- () => null,
- SupervisorStrategy.defaultStrategy)
- } finally {
- ssc.stop()
- }
- }
-}
-
-class TestActor extends ActorReceiver {
- override def receive: Receive = {
- case m: String => store(m)
- case m => store(m, 10.seconds)
- }
-}
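
The TestActor above also exercises the Future-returning store(item, timeout) overload documented in ActorReceiver.scala. A hedged sketch of using it for flow control follows; `ThrottledActor` is hypothetical and not part of the removed code.

    import scala.concurrent.Future
    import scala.concurrent.duration._
    import akka.util.Timeout
    import org.apache.spark.streaming.akka.ActorReceiver

    // Hypothetical actor applying back-pressure: the returned Future completes once
    // the supervisor has stored the item, or fails with AskTimeoutException on timeout.
    class ThrottledActor extends ActorReceiver {
      override def receive: Receive = {
        case m: String =>
          val stored: Future[Unit] = store(m, Timeout(10.seconds))
          // the caller could chain on `stored` (or await it) to control flow speed
      }
    }
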
diff --git a/external/flume-assembly/pom.xml b/external/flume-assembly/pom.xml
deleted file mode 100644
index ac15b93c04..0000000000
--- a/external/flume-assembly/pom.xml
+++ /dev/null
@@ -1,168 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
- ~ Licensed to the Apache Software Foundation (ASF) under one or more
- ~ contributor license agreements. See the NOTICE file distributed with
- ~ this work for additional information regarding copyright ownership.
- ~ The ASF licenses this file to You under the Apache License, Version 2.0
- ~ (the "License"); you may not use this file except in compliance with
- ~ the License. You may obtain a copy of the License at
- ~
- ~ http://www.apache.org/licenses/LICENSE-2.0
- ~
- ~ Unless required by applicable law or agreed to in writing, software
- ~ distributed under the License is distributed on an "AS IS" BASIS,
- ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- ~ See the License for the specific language governing permissions and
- ~ limitations under the License.
- -->
-
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
- <modelVersion>4.0.0</modelVersion>
- <parent>
- <groupId>org.apache.spark</groupId>
- <artifactId>spark-parent_2.11</artifactId>
- <version>2.0.0-SNAPSHOT</version>
- <relativePath>../../pom.xml</relativePath>
- </parent>
-
- <groupId>org.apache.spark</groupId>
- <artifactId>spark-streaming-flume-assembly_2.11</artifactId>
- <packaging>jar</packaging>
- <name>Spark Project External Flume Assembly</name>
- <url>http://spark.apache.org/</url>
-
- <properties>
- <hadoop.deps.scope>provided</hadoop.deps.scope>
- <sbt.project.name>streaming-flume-assembly</sbt.project.name>
- </properties>
-
- <dependencies>
- <dependency>
- <groupId>org.apache.spark</groupId>
- <artifactId>spark-streaming-flume_${scala.binary.version}</artifactId>
- <version>${project.version}</version>
- <exclusions>
- <exclusion>
- <groupId>org.mortbay.jetty</groupId>
- <artifactId>jetty</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.mortbay.jetty</groupId>
- <artifactId>jetty-util</artifactId>
- </exclusion>
- </exclusions>
- </dependency>
- <dependency>
- <groupId>org.apache.spark</groupId>
- <artifactId>spark-streaming_${scala.binary.version}</artifactId>
- <version>${project.version}</version>
- <scope>provided</scope>
- </dependency>
- <!--
- Demote already included in the Spark assembly. These are transitive dependencies of flume
- or spark-streaming-flume, and this need to be explicitly included even through the parent
- pom may declare them with ${hadoop.deps.scope}.
- -->
- <dependency>
- <groupId>commons-codec</groupId>
- <artifactId>commons-codec</artifactId>
- <scope>provided</scope>
- </dependency>
- <dependency>
- <groupId>commons-lang</groupId>
- <artifactId>commons-lang</artifactId>
- <scope>provided</scope>
- </dependency>
- <dependency>
- <groupId>commons-net</groupId>
- <artifactId>commons-net</artifactId>
- <scope>provided</scope>
- </dependency>
- <dependency>
- <groupId>com.google.protobuf</groupId>
- <artifactId>protobuf-java</artifactId>
- <scope>provided</scope>
- </dependency>
- <dependency>
- <groupId>org.apache.avro</groupId>
- <artifactId>avro</artifactId>
- <scope>provided</scope>
- </dependency>
- <dependency>
- <groupId>org.apache.avro</groupId>
- <artifactId>avro-ipc</artifactId>
- <scope>provided</scope>
- </dependency>
- <dependency>
- <groupId>org.apache.avro</groupId>
- <artifactId>avro-mapred</artifactId>
- <classifier>${avro.mapred.classifier}</classifier>
- <scope>provided</scope>
- </dependency>
- <dependency>
- <groupId>org.scala-lang</groupId>
- <artifactId>scala-library</artifactId>
- <scope>provided</scope>
- </dependency>
- </dependencies>
-
- <build>
- <outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory>
- <testOutputDirectory>target/scala-${scala.binary.version}/test-classes</testOutputDirectory>
- <plugins>
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-shade-plugin</artifactId>
- <configuration>
- <shadedArtifactAttached>false</shadedArtifactAttached>
- <artifactSet>
- <includes>
- <include>*:*</include>
- </includes>
- </artifactSet>
- <filters>
- <filter>
- <artifact>*:*</artifact>
- <excludes>
- <exclude>META-INF/*.SF</exclude>
- <exclude>META-INF/*.DSA</exclude>
- <exclude>META-INF/*.RSA</exclude>
- </excludes>
- </filter>
- </filters>
- </configuration>
- <executions>
- <execution>
- <phase>package</phase>
- <goals>
- <goal>shade</goal>
- </goals>
- <configuration>
- <transformers>
- <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
- <transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
- <resource>reference.conf</resource>
- </transformer>
- <transformer implementation="org.apache.maven.plugins.shade.resource.DontIncludeResourceTransformer">
- <resource>log4j.properties</resource>
- </transformer>
- <transformer implementation="org.apache.maven.plugins.shade.resource.ApacheLicenseResourceTransformer"/>
- <transformer implementation="org.apache.maven.plugins.shade.resource.ApacheNoticeResourceTransformer"/>
- </transformers>
- </configuration>
- </execution>
- </executions>
- </plugin>
- </plugins>
- </build>
-
- <profiles>
- <profile>
- <id>flume-provided</id>
- <properties>
- <flume.deps.scope>provided</flume.deps.scope>
- </properties>
- </profile>
- </profiles>
-</project>
-
diff --git a/external/flume-sink/pom.xml b/external/flume-sink/pom.xml
deleted file mode 100644
index e4effe158c..0000000000
--- a/external/flume-sink/pom.xml
+++ /dev/null
@@ -1,129 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
- ~ Licensed to the Apache Software Foundation (ASF) under one or more
- ~ contributor license agreements. See the NOTICE file distributed with
- ~ this work for additional information regarding copyright ownership.
- ~ The ASF licenses this file to You under the Apache License, Version 2.0
- ~ (the "License"); you may not use this file except in compliance with
- ~ the License. You may obtain a copy of the License at
- ~
- ~ http://www.apache.org/licenses/LICENSE-2.0
- ~
- ~ Unless required by applicable law or agreed to in writing, software
- ~ distributed under the License is distributed on an "AS IS" BASIS,
- ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- ~ See the License for the specific language governing permissions and
- ~ limitations under the License.
- -->
-
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
- <modelVersion>4.0.0</modelVersion>
- <parent>
- <groupId>org.apache.spark</groupId>
- <artifactId>spark-parent_2.11</artifactId>
- <version>2.0.0-SNAPSHOT</version>
- <relativePath>../../pom.xml</relativePath>
- </parent>
-
- <groupId>org.apache.spark</groupId>
- <artifactId>spark-streaming-flume-sink_2.11</artifactId>
- <properties>
- <sbt.project.name>streaming-flume-sink</sbt.project.name>
- </properties>
- <packaging>jar</packaging>
- <name>Spark Project External Flume Sink</name>
- <url>http://spark.apache.org/</url>
-
- <dependencies>
- <dependency>
- <groupId>org.apache.flume</groupId>
- <artifactId>flume-ng-sdk</artifactId>
- <exclusions>
- <!-- Guava is excluded to avoid its use in this module. -->
- <exclusion>
- <groupId>com.google.guava</groupId>
- <artifactId>guava</artifactId>
- </exclusion>
- <!--
- Exclude libthrift since the flume poms seem to confuse sbt, which fails to find the
- dependency.
- -->
- <exclusion>
- <groupId>org.apache.thrift</groupId>
- <artifactId>libthrift</artifactId>
- </exclusion>
- </exclusions>
- </dependency>
- <dependency>
- <groupId>org.apache.flume</groupId>
- <artifactId>flume-ng-core</artifactId>
- <exclusions>
- <exclusion>
- <groupId>com.google.guava</groupId>
- <artifactId>guava</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.apache.thrift</groupId>
- <artifactId>libthrift</artifactId>
- </exclusion>
- </exclusions>
- </dependency>
- <dependency>
- <groupId>org.scala-lang</groupId>
- <artifactId>scala-library</artifactId>
- </dependency>
- <dependency>
- <!-- Add Guava in test scope since flume actually needs it. -->
- <groupId>com.google.guava</groupId>
- <artifactId>guava</artifactId>
- <scope>test</scope>
- </dependency>
- <dependency>
- <!--
- Netty explicitly added in test as it has been excluded from
- Flume dependency (to avoid runtime problems when running with
- Spark) but unit tests need it. Version of Netty on which
- Flume 1.4.0 depends on is "3.4.0.Final" .
- -->
- <groupId>io.netty</groupId>
- <artifactId>netty</artifactId>
- <version>3.4.0.Final</version>
- <scope>test</scope>
- </dependency>
- <dependency>
- <groupId>org.apache.spark</groupId>
- <artifactId>spark-test-tags_${scala.binary.version}</artifactId>
- </dependency>
- </dependencies>
- <build>
- <outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory>
- <testOutputDirectory>target/scala-${scala.binary.version}/test-classes</testOutputDirectory>
- <plugins>
- <plugin>
- <groupId>org.apache.avro</groupId>
- <artifactId>avro-maven-plugin</artifactId>
- <version>${avro.version}</version>
- <configuration>
- <!-- Generate the output in the same directory as the sbt-avro-plugin -->
- <outputDirectory>${project.basedir}/target/scala-${scala.binary.version}/src_managed/main/compiled_avro</outputDirectory>
- </configuration>
- <executions>
- <execution>
- <phase>generate-sources</phase>
- <goals>
- <goal>idl-protocol</goal>
- </goals>
- </execution>
- </executions>
- </plugin>
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-shade-plugin</artifactId>
- <configuration>
- <!-- Disable all relocations defined in the parent pom. -->
- <relocations combine.self="override" />
- </configuration>
- </plugin>
- </plugins>
- </build>
-</project>
diff --git a/external/flume-sink/src/main/avro/sparkflume.avdl b/external/flume-sink/src/main/avro/sparkflume.avdl
deleted file mode 100644
index 8806e863ac..0000000000
--- a/external/flume-sink/src/main/avro/sparkflume.avdl
+++ /dev/null
@@ -1,40 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-@namespace("org.apache.spark.streaming.flume.sink")
-
-protocol SparkFlumeProtocol {
-
- record SparkSinkEvent {
- map<string> headers;
- bytes body;
- }
-
- record EventBatch {
- string errorMsg = ""; // If this is empty it is a valid message, else it represents an error
- string sequenceNumber;
- array<SparkSinkEvent> events;
- }
-
- EventBatch getEventBatch (int n);
-
- void ack (string sequenceNumber);
-
- void nack (string sequenceNumber);
-}
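
A rough sketch of one polling cycle against the SparkFlumeProtocol defined in the IDL above, from the receiver's side. `client` is assumed to be a SparkFlumeProtocol stub obtained over Avro RPC, and `handle` is a hypothetical per-event callback; neither appears in the removed code.

    import scala.collection.JavaConverters._

    // A non-empty errorMsg marks an error batch (see the IDL comment above).
    def pollOnce(client: SparkFlumeProtocol, maxEvents: Int)
                (handle: SparkSinkEvent => Unit): Unit = {
      val batch = client.getEventBatch(maxEvents)
      if (batch.getErrorMsg.toString.isEmpty) {
        try {
          batch.getEvents.asScala.foreach(handle)
          client.ack(batch.getSequenceNumber)   // commit the Flume transaction
        } catch {
          case _: Exception => client.nack(batch.getSequenceNumber)  // roll it back
        }
      }
    }
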
diff --git a/external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/Logging.scala b/external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/Logging.scala
deleted file mode 100644
index 09d3fe91e4..0000000000
--- a/external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/Logging.scala
+++ /dev/null
@@ -1,127 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.spark.streaming.flume.sink
-
-import org.slf4j.{Logger, LoggerFactory}
-
-/**
- * Copy of the org.apache.spark.Logging for being used in the Spark Sink.
- * The org.apache.spark.Logging is not used so that all of Spark is not brought
- * in as a dependency.
- */
-private[sink] trait Logging {
- // Make the log field transient so that objects with Logging can
- // be serialized and used on another machine
- @transient private var _log: Logger = null
-
- // Method to get or create the logger for this object
- protected def log: Logger = {
- if (_log == null) {
- initializeIfNecessary()
- var className = this.getClass.getName
- // Ignore trailing $'s in the class names for Scala objects
- if (className.endsWith("$")) {
- className = className.substring(0, className.length - 1)
- }
- _log = LoggerFactory.getLogger(className)
- }
- _log
- }
-
- // Log methods that take only a String
- protected def logInfo(msg: => String) {
- if (log.isInfoEnabled) log.info(msg)
- }
-
- protected def logDebug(msg: => String) {
- if (log.isDebugEnabled) log.debug(msg)
- }
-
- protected def logTrace(msg: => String) {
- if (log.isTraceEnabled) log.trace(msg)
- }
-
- protected def logWarning(msg: => String) {
- if (log.isWarnEnabled) log.warn(msg)
- }
-
- protected def logError(msg: => String) {
- if (log.isErrorEnabled) log.error(msg)
- }
-
- // Log methods that take Throwables (Exceptions/Errors) too
- protected def logInfo(msg: => String, throwable: Throwable) {
- if (log.isInfoEnabled) log.info(msg, throwable)
- }
-
- protected def logDebug(msg: => String, throwable: Throwable) {
- if (log.isDebugEnabled) log.debug(msg, throwable)
- }
-
- protected def logTrace(msg: => String, throwable: Throwable) {
- if (log.isTraceEnabled) log.trace(msg, throwable)
- }
-
- protected def logWarning(msg: => String, throwable: Throwable) {
- if (log.isWarnEnabled) log.warn(msg, throwable)
- }
-
- protected def logError(msg: => String, throwable: Throwable) {
- if (log.isErrorEnabled) log.error(msg, throwable)
- }
-
- protected def isTraceEnabled(): Boolean = {
- log.isTraceEnabled
- }
-
- private def initializeIfNecessary() {
- if (!Logging.initialized) {
- Logging.initLock.synchronized {
- if (!Logging.initialized) {
- initializeLogging()
- }
- }
- }
- }
-
- private def initializeLogging() {
- Logging.initialized = true
-
- // Force a call into slf4j to initialize it. Avoids this happening from multiple threads
- // and triggering this: http://mailman.qos.ch/pipermail/slf4j-dev/2010-April/002956.html
- log
- }
-}
-
-private[sink] object Logging {
- @volatile private var initialized = false
- val initLock = new Object()
- try {
- // We use reflection here to handle the case where users remove the
- // slf4j-to-jul bridge order to route their logs to JUL.
- // scalastyle:off classforname
- val bridgeClass = Class.forName("org.slf4j.bridge.SLF4JBridgeHandler")
- // scalastyle:on classforname
- bridgeClass.getMethod("removeHandlersForRootLogger").invoke(null)
- val installed = bridgeClass.getMethod("isInstalled").invoke(null).asInstanceOf[Boolean]
- if (!installed) {
- bridgeClass.getMethod("install").invoke(null)
- }
- } catch {
- case e: ClassNotFoundException => // can't log anything yet so just fail silently
- }
-}
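
A minimal sketch of how the private Logging trait above was mixed in by classes inside the sink package; `ExampleComponent` is hypothetical. Log messages are passed by-name, so they are only constructed when the corresponding level is enabled.

    package org.apache.spark.streaming.flume.sink

    private[sink] class ExampleComponent extends Logging {
      def run(): Unit = {
        logInfo("starting example component")
        logDebug(s"expensive detail: ${System.nanoTime()}")  // evaluated only if debug logging is on
      }
    }
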
diff --git a/external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/SparkAvroCallbackHandler.scala b/external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/SparkAvroCallbackHandler.scala
deleted file mode 100644
index 719fca0938..0000000000
--- a/external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/SparkAvroCallbackHandler.scala
+++ /dev/null
@@ -1,166 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.spark.streaming.flume.sink
-
-import java.util.UUID
-import java.util.concurrent.{CountDownLatch, Executors}
-import java.util.concurrent.atomic.AtomicLong
-
-import scala.collection.mutable
-
-import org.apache.flume.Channel
-
-/**
- * Class that implements the SparkFlumeProtocol, that is used by the Avro Netty Server to process
- * requests. Each getEvents, ack and nack call is forwarded to an instance of this class.
- * @param threads Number of threads to use to process requests.
- * @param channel The channel that the sink pulls events from
- * @param transactionTimeout Timeout in millis after which the transaction if not acked by Spark
- * is rolled back.
- */
-// Flume forces transactions to be thread-local. So each transaction *must* be committed, or
-// rolled back from the thread it was originally created in. So each getEvents call from Spark
-// creates a TransactionProcessor which runs in a new thread, in which the transaction is created
-// and events are pulled off the channel. Once the events are sent to spark,
-// that thread is blocked and the TransactionProcessor is saved in a map,
-// until an ACK or NACK comes back or the transaction times out (after the specified timeout).
-// When the response comes or a timeout is hit, the TransactionProcessor is retrieved and then
-// unblocked, at which point the transaction is committed or rolled back.
-
-private[flume] class SparkAvroCallbackHandler(val threads: Int, val channel: Channel,
- val transactionTimeout: Int, val backOffInterval: Int) extends SparkFlumeProtocol with Logging {
- val transactionExecutorOpt = Option(Executors.newFixedThreadPool(threads,
- new SparkSinkThreadFactory("Spark Sink Processor Thread - %d")))
- // Protected by `sequenceNumberToProcessor`
- private val sequenceNumberToProcessor = mutable.HashMap[CharSequence, TransactionProcessor]()
- // This sink will not persist sequence numbers and reuses them if it gets restarted.
- // So it is possible to commit a transaction which may have been meant for the sink before the
- // restart.
- // Since the new txn may not have the same sequence number we must guard against accidentally
- // committing a new transaction. To reduce the probability of that happening a random string is
- // prepended to the sequence number. Does not change for life of sink
- private val seqBase = UUID.randomUUID().toString.substring(0, 8)
- private val seqCounter = new AtomicLong(0)
-
- // Protected by `sequenceNumberToProcessor`
- private var stopped = false
-
- @volatile private var isTest = false
- private var testLatch: CountDownLatch = null
-
- /**
- * Returns a bunch of events to Spark over Avro RPC.
- * @param n Maximum number of events to return in a batch
- * @return [[EventBatch]] instance that has a sequence number and an array of at most n events
- */
- override def getEventBatch(n: Int): EventBatch = {
- logDebug("Got getEventBatch call from Spark.")
- val sequenceNumber = seqBase + seqCounter.incrementAndGet()
- createProcessor(sequenceNumber, n) match {
- case Some(processor) =>
- transactionExecutorOpt.foreach(_.submit(processor))
- // Wait until a batch is available - will be an error if error message is non-empty
- val batch = processor.getEventBatch
- if (SparkSinkUtils.isErrorBatch(batch)) {
- // Remove the processor if it is an error batch since no ACK is sent.
- removeAndGetProcessor(sequenceNumber)
- logWarning("Received an error batch - no events were received from channel! ")
- }
- batch
- case None =>
- new EventBatch("Spark sink has been stopped!", "", java.util.Collections.emptyList())
- }
- }
-
- private def createProcessor(seq: String, n: Int): Option[TransactionProcessor] = {
- sequenceNumberToProcessor.synchronized {
- if (!stopped) {
- val processor = new TransactionProcessor(
- channel, seq, n, transactionTimeout, backOffInterval, this)
- sequenceNumberToProcessor.put(seq, processor)
- if (isTest) {
- processor.countDownWhenBatchAcked(testLatch)
- }
- Some(processor)
- } else {
- None
- }
- }
- }
-
- /**
- * Called by Spark to indicate successful commit of a batch
- * @param sequenceNumber The sequence number of the event batch that was successful
- */
- override def ack(sequenceNumber: CharSequence): Void = {
- logDebug("Received Ack for batch with sequence number: " + sequenceNumber)
- completeTransaction(sequenceNumber, success = true)
- null
- }
-
- /**
- * Called by Spark to indicate failed commit of a batch
- * @param sequenceNumber The sequence number of the event batch that failed
- * @return
- */
- override def nack(sequenceNumber: CharSequence): Void = {
- completeTransaction(sequenceNumber, success = false)
- logInfo("Spark failed to commit transaction. Will reattempt events.")
- null
- }
-
- /**
- * Helper method to commit or rollback a transaction.
- * @param sequenceNumber The sequence number of the batch that was completed
- * @param success Whether the batch was successful or not.
- */
- private def completeTransaction(sequenceNumber: CharSequence, success: Boolean) {
- removeAndGetProcessor(sequenceNumber).foreach(processor => {
- processor.batchProcessed(success)
- })
- }
-
- /**
- * Helper method to remove the TxnProcessor for a Sequence Number. Can be used to avoid a leak.
- * @param sequenceNumber
- * @return An `Option` of the transaction processor for the corresponding batch. Note that this
- * instance is no longer tracked and the caller is responsible for that txn processor.
- */
- private[sink] def removeAndGetProcessor(sequenceNumber: CharSequence):
- Option[TransactionProcessor] = {
- sequenceNumberToProcessor.synchronized {
- sequenceNumberToProcessor.remove(sequenceNumber.toString)
- }
- }
-
- private[sink] def countDownWhenBatchAcked(latch: CountDownLatch) {
- testLatch = latch
- isTest = true
- }
-
- /**
- * Shuts down the executor used to process transactions.
- */
- def shutdown() {
- logInfo("Shutting down Spark Avro Callback Handler")
- sequenceNumberToProcessor.synchronized {
- stopped = true
- sequenceNumberToProcessor.values.foreach(_.shutdown())
- }
- transactionExecutorOpt.foreach(_.shutdownNow())
- }
-}
diff --git a/external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/SparkSink.scala b/external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/SparkSink.scala
deleted file mode 100644
index 14dffb15fe..0000000000
--- a/external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/SparkSink.scala
+++ /dev/null
@@ -1,171 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.spark.streaming.flume.sink
-
-import java.net.InetSocketAddress
-import java.util.concurrent._
-
-import org.apache.avro.ipc.NettyServer
-import org.apache.avro.ipc.specific.SpecificResponder
-import org.apache.flume.Context
-import org.apache.flume.Sink.Status
-import org.apache.flume.conf.{Configurable, ConfigurationException}
-import org.apache.flume.sink.AbstractSink
-
-/**
- * A sink that uses Avro RPC to run a server that can be polled by Spark's
- * FlumePollingInputDStream. This sink has the following configuration parameters:
- *
- * hostname - The hostname to bind to. Default: 0.0.0.0
- * port - The port to bind to. (No default - mandatory)
- * timeout - Time in seconds after which a transaction is rolled back,
- * if an ACK is not received from Spark within that time
- * threads - Number of threads to use to receive requests from Spark (Default: 10)
- *
- * This sink is unlike other Flume sinks in the sense that it does not push data,
- * instead the process method in this sink simply blocks the SinkRunner the first time it is
- * called. This sink starts up an Avro IPC server that uses the SparkFlumeProtocol.
- *
- * Each time a getEventBatch call comes in, the sink creates a transaction and reads events
- * from the channel. When enough events are read, the events are sent to the Spark receiver and
- * the thread itself is blocked and a reference to it is saved off.
- *
- * When the ack for that batch is received,
- * the thread which created the transaction is retrieved and it commits the transaction with the
- * channel from the same thread it was originally created in (since Flume transactions are
- * thread local). If a nack is received instead, the sink rolls back the transaction. If no ack
- * is received within the specified timeout, the transaction is rolled back too. If an ack comes
- * after that, it is simply ignored and the events get re-sent.
- *
- */
-
-class SparkSink extends AbstractSink with Logging with Configurable {
-
- // Size of the pool to use for holding transaction processors.
- private var poolSize: Integer = SparkSinkConfig.DEFAULT_THREADS
-
- // Timeout for each transaction. If spark does not respond in this much time,
- // rollback the transaction
- private var transactionTimeout = SparkSinkConfig.DEFAULT_TRANSACTION_TIMEOUT
-
- // Address info to bind on
- private var hostname: String = SparkSinkConfig.DEFAULT_HOSTNAME
- private var port: Int = 0
-
- private var backOffInterval: Int = 200
-
- // Handle to the server
- private var serverOpt: Option[NettyServer] = None
-
- // The handler that handles the callback from Avro
- private var handler: Option[SparkAvroCallbackHandler] = None
-
- // Latch that keeps the Flume framework's SinkRunner thread blocked until the sink is stopped.
- private val blockingLatch = new CountDownLatch(1)
-
- override def start() {
- logInfo("Starting Spark Sink: " + getName + " on port: " + port + " and interface: " +
- hostname + " with " + "pool size: " + poolSize + " and transaction timeout: " +
- transactionTimeout + ".")
- handler = Option(new SparkAvroCallbackHandler(poolSize, getChannel, transactionTimeout,
- backOffInterval))
- val responder = new SpecificResponder(classOf[SparkFlumeProtocol], handler.get)
- // Using the constructor that takes specific thread-pools requires bringing in netty
- // dependencies which are being excluded in the build. In practice,
- // Netty dependencies are already available on the JVM as Flume would have pulled them in.
- serverOpt = Option(new NettyServer(responder, new InetSocketAddress(hostname, port)))
- serverOpt.foreach(server => {
- logInfo("Starting Avro server for sink: " + getName)
- server.start()
- })
- super.start()
- }
-
- override def stop() {
- logInfo("Stopping Spark Sink: " + getName)
- handler.foreach(callbackHandler => {
- callbackHandler.shutdown()
- })
- serverOpt.foreach(server => {
- logInfo("Stopping Avro Server for sink: " + getName)
- server.close()
- server.join()
- })
- blockingLatch.countDown()
- super.stop()
- }
-
- override def configure(ctx: Context) {
- import SparkSinkConfig._
- hostname = ctx.getString(CONF_HOSTNAME, DEFAULT_HOSTNAME)
- port = Option(ctx.getInteger(CONF_PORT)).
- getOrElse(throw new ConfigurationException("The port to bind to must be specified"))
- poolSize = ctx.getInteger(THREADS, DEFAULT_THREADS)
- transactionTimeout = ctx.getInteger(CONF_TRANSACTION_TIMEOUT, DEFAULT_TRANSACTION_TIMEOUT)
- backOffInterval = ctx.getInteger(CONF_BACKOFF_INTERVAL, DEFAULT_BACKOFF_INTERVAL)
- logInfo("Configured Spark Sink with hostname: " + hostname + ", port: " + port + ", " +
- "poolSize: " + poolSize + ", transactionTimeout: " + transactionTimeout + ", " +
- "backoffInterval: " + backOffInterval)
- }
-
- override def process(): Status = {
- // This method is called in a loop by the Flume framework - block it until the sink is
- // stopped to save CPU resources. The sink runner will interrupt this thread when the sink is
- // being shut down.
- logInfo("Blocking Sink Runner, sink will continue to run..")
- blockingLatch.await()
- Status.BACKOFF
- }
-
- private[flume] def getPort(): Int = {
- serverOpt
- .map(_.getPort)
- .getOrElse(
- throw new RuntimeException("Server was not started!")
- )
- }
-
- /**
- * Pass in a [[CountDownLatch]] for testing purposes. This latch is counted down when each
- * batch is received. The test can simply call await on this latch until the expected number of
- * batches has been received.
- * @param latch
- */
- private[flume] def countdownWhenBatchReceived(latch: CountDownLatch) {
- handler.foreach(_.countDownWhenBatchAcked(latch))
- }
-}
-
-/**
- * Configuration parameters and their defaults.
- */
-private[flume]
-object SparkSinkConfig {
- val THREADS = "threads"
- val DEFAULT_THREADS = 10
-
- val CONF_TRANSACTION_TIMEOUT = "timeout"
- val DEFAULT_TRANSACTION_TIMEOUT = 60
-
- val CONF_HOSTNAME = "hostname"
- val DEFAULT_HOSTNAME = "0.0.0.0"
-
- val CONF_PORT = "port"
-
- val CONF_BACKOFF_INTERVAL = "backoffInterval"
- val DEFAULT_BACKOFF_INTERVAL = 200
-}
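
The configuration keys defined in SparkSinkConfig above correspond to properties set on the sink in a Flume agent's configuration file. As a rough illustration only (the agent, sink and channel names are placeholders, and only the port is actually mandatory), such a configuration might look like:

agent.sinks = spark
agent.sinks.spark.type = org.apache.spark.streaming.flume.sink.SparkSink
agent.sinks.spark.hostname = 0.0.0.0
agent.sinks.spark.port = 9999
agent.sinks.spark.threads = 10
agent.sinks.spark.timeout = 60
agent.sinks.spark.backoffInterval = 200
agent.sinks.spark.channel = memoryChannel
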
diff --git a/external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/SparkSinkThreadFactory.scala b/external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/SparkSinkThreadFactory.scala
deleted file mode 100644
index 845fc8debd..0000000000
--- a/external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/SparkSinkThreadFactory.scala
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.spark.streaming.flume.sink
-
-import java.util.concurrent.ThreadFactory
-import java.util.concurrent.atomic.AtomicLong
-
-/**
- * Thread factory that generates daemon threads with a specified name format.
- */
-private[sink] class SparkSinkThreadFactory(nameFormat: String) extends ThreadFactory {
-
- private val threadId = new AtomicLong()
-
- override def newThread(r: Runnable): Thread = {
- val t = new Thread(r, nameFormat.format(threadId.incrementAndGet()))
- t.setDaemon(true)
- t
- }
-
-}
diff --git a/external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/SparkSinkUtils.scala b/external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/SparkSinkUtils.scala
deleted file mode 100644
index 47c0e294d6..0000000000
--- a/external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/SparkSinkUtils.scala
+++ /dev/null
@@ -1,28 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.spark.streaming.flume.sink
-
-private[flume] object SparkSinkUtils {
- /**
- * This method determines if this batch represents an error or not.
- * @param batch - The batch to check
- * @return - true if the batch represents an error
- */
- def isErrorBatch(batch: EventBatch): Boolean = {
- !batch.getErrorMsg.toString.equals("") // If there is an error message, it is an error batch.
- }
-}
diff --git a/external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/TransactionProcessor.scala b/external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/TransactionProcessor.scala
deleted file mode 100644
index b15c2097e5..0000000000
--- a/external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/TransactionProcessor.scala
+++ /dev/null
@@ -1,252 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.spark.streaming.flume.sink
-
-import java.nio.ByteBuffer
-import java.util
-import java.util.concurrent.{Callable, CountDownLatch, TimeUnit}
-
-import scala.util.control.Breaks
-
-import org.apache.flume.{Channel, Transaction}
-
-// Flume forces transactions to be thread-local (horrible, I know!)
-// So the sink basically spawns a new thread to pull the events out within a transaction.
-// The thread fills in the event batch object that is set before the thread is scheduled.
-// After filling it in, the thread waits on a condition - which is released only
-// when the success message comes back for the specific sequence number for that event batch.
-/**
- * This class represents a transaction on the Flume channel. This class runs a separate thread
- * which owns the transaction. The thread is blocked until the success call for that transaction
- * comes back with an ACK or NACK.
- * @param channel The channel from which to pull events
- * @param seqNum The sequence number to use for the transaction. Must be unique
- * @param maxBatchSize The maximum number of events to process per batch
- * @param transactionTimeout Time in seconds after which a transaction must be rolled back
- * without waiting for an ACK from Spark
- * @param parent The parent [[SparkAvroCallbackHandler]] instance, for reporting timeouts
- */
-private class TransactionProcessor(val channel: Channel, val seqNum: String,
- var maxBatchSize: Int, val transactionTimeout: Int, val backOffInterval: Int,
- val parent: SparkAvroCallbackHandler) extends Callable[Void] with Logging {
-
- // If a real batch is not returned, we always have to return an error batch.
- @volatile private var eventBatch: EventBatch = new EventBatch("Unknown Error", "",
- util.Collections.emptyList())
-
- // Synchronization primitives
- val batchGeneratedLatch = new CountDownLatch(1)
- val batchAckLatch = new CountDownLatch(1)
-
- // Sanity check to ensure we don't loop like crazy
- val totalAttemptsToRemoveFromChannel = Int.MaxValue / 2
-
- // OK to use volatile here: the flag is only ever written directly (set to true on an ACK,
- // false on a NACK) and never read-modified-written, so a plain volatile is sufficient. A
- // value of true means the transaction succeeded.
- @volatile private var batchSuccess = false
-
- @volatile private var stopped = false
-
- @volatile private var isTest = false
-
- private var testLatch: CountDownLatch = null
-
- // The transaction that this processor would handle
- var txOpt: Option[Transaction] = None
-
- /**
- * Get an event batch from the channel. This method will block until a batch of events is
- * available from the channel. If no events are available after a large number of attempts of
- * polling the channel, this method will return an [[EventBatch]] with a non-empty error message
- *
- * @return An [[EventBatch]] instance with sequence number set to seqNum, filled with a
- * maximum of maxBatchSize events
- */
- def getEventBatch: EventBatch = {
- batchGeneratedLatch.await()
- eventBatch
- }
-
- /**
- * This method is to be called by the sink when it receives an ACK or NACK from Spark. This
- * method is a no-op if it is called after transactionTimeout has expired since
- * getEventBatch returned a batch of events.
- * @param success True if an ACK was received and the transaction should be committed, else false.
- */
- def batchProcessed(success: Boolean) {
- logDebug("Batch processed for sequence number: " + seqNum)
- batchSuccess = success
- batchAckLatch.countDown()
- }
-
- private[flume] def shutdown(): Unit = {
- logDebug("Shutting down transaction processor")
- stopped = true
- }
-
- /**
- * Populates events into the event batch. If the batch cannot be populated,
- * this method will not set the events into the event batch, but it sets an error message.
- */
- private def populateEvents() {
- try {
- txOpt = Option(channel.getTransaction)
- if(txOpt.isEmpty) {
- eventBatch.setErrorMsg("Something went wrong. Channel was " +
- "unable to create a transaction!")
- }
- txOpt.foreach(tx => {
- tx.begin()
- val events = new util.ArrayList[SparkSinkEvent](maxBatchSize)
- val loop = new Breaks
- var gotEventsInThisTxn = false
- var loopCounter: Int = 0
- loop.breakable {
- while (!stopped && events.size() < maxBatchSize
- && loopCounter < totalAttemptsToRemoveFromChannel) {
- loopCounter += 1
- Option(channel.take()) match {
- case Some(event) =>
- events.add(new SparkSinkEvent(toCharSequenceMap(event.getHeaders),
- ByteBuffer.wrap(event.getBody)))
- gotEventsInThisTxn = true
- case None =>
- if (!gotEventsInThisTxn && !stopped) {
- logDebug("Sleeping for " + backOffInterval + " millis as no events were read in" +
- " the current transaction")
- TimeUnit.MILLISECONDS.sleep(backOffInterval)
- } else {
- loop.break()
- }
- }
- }
- }
- if (!gotEventsInThisTxn && !stopped) {
- val msg = "Tried several times, " +
- "but did not get any events from the channel!"
- logWarning(msg)
- eventBatch.setErrorMsg(msg)
- } else {
- // At this point, the events are available, so fill them into the event batch
- eventBatch = new EventBatch("", seqNum, events)
- }
- })
- } catch {
- case interrupted: InterruptedException =>
- // Don't pollute logs if the InterruptedException came from this being stopped
- if (!stopped) {
- logWarning("Error while processing transaction.", interrupted)
- }
- case e: Exception =>
- logWarning("Error while processing transaction.", e)
- eventBatch.setErrorMsg(e.getMessage)
- try {
- txOpt.foreach(tx => {
- rollbackAndClose(tx, close = true)
- })
- } finally {
- txOpt = None
- }
- } finally {
- batchGeneratedLatch.countDown()
- }
- }
-
- /**
- * Waits for up to transactionTimeout seconds for an ACK. If an ACK comes in
- * this method commits the transaction with the channel. If the ACK does not come in within
- * that time or a NACK comes in, this method rolls back the transaction.
- */
- private def processAckOrNack() {
- batchAckLatch.await(transactionTimeout, TimeUnit.SECONDS)
- txOpt.foreach(tx => {
- if (batchSuccess) {
- try {
- logDebug("Committing transaction")
- tx.commit()
- } catch {
- case e: Exception =>
- logWarning("Error while attempting to commit transaction. Transaction will be rolled " +
- "back", e)
- rollbackAndClose(tx, close = false) // tx will be closed later anyway
- } finally {
- tx.close()
- if (isTest) {
- testLatch.countDown()
- }
- }
- } else {
- logWarning("Spark could not commit transaction, NACK received. Rolling back transaction.")
- rollbackAndClose(tx, close = true)
- // This might have been due to timeout or a NACK. Either way the following call does not
- // cause issues. This is required to ensure the TransactionProcessor instance is not leaked
- parent.removeAndGetProcessor(seqNum)
- }
- })
- }
-
- /**
- * Helper method to rollback and optionally close a transaction
- * @param tx The transaction to rollback
- * @param close Whether the transaction should be closed or not after rolling back
- */
- private def rollbackAndClose(tx: Transaction, close: Boolean) {
- try {
- logWarning("Spark was unable to successfully process the events. Transaction is being " +
- "rolled back.")
- tx.rollback()
- } catch {
- case e: Exception =>
- logError("Error rolling back transaction. Rollback may have failed!", e)
- } finally {
- if (close) {
- tx.close()
- }
- }
- }
-
- /**
- * Helper method to convert a Map[String, String] to Map[CharSequence, CharSequence]
- * @param inMap The map to be converted
- * @return The converted map
- */
- private def toCharSequenceMap(inMap: java.util.Map[String, String]): java.util.Map[CharSequence,
- CharSequence] = {
- val charSeqMap = new util.HashMap[CharSequence, CharSequence](inMap.size())
- charSeqMap.putAll(inMap)
- charSeqMap
- }
-
- /**
- * When the thread is started it puts as many events as the batch size (or fewer, if not
- * enough events are available) into the eventBatch object and lets any threads waiting on the
- * [[getEventBatch]] method proceed. The thread then waits for an ack or nack to come in, or
- * for the specified timeout to expire, and commits or rolls back the transaction.
- * @return
- */
- override def call(): Void = {
- populateEvents()
- processAckOrNack()
- null
- }
-
- private[sink] def countDownWhenBatchAcked(latch: CountDownLatch) {
- testLatch = latch
- isTest = true
- }
-}
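
The thread-per-transaction handshake described in the class comment above boils down to a latch-based pattern: the worker thread fills a batch, signals that it is ready, then blocks until an ack/nack arrives or the timeout expires. A minimal sketch of that pattern, with hypothetical names and the actual transaction work elided:

import java.util.concurrent.{Callable, CountDownLatch, TimeUnit}

class HandshakeSketch(timeoutSeconds: Int) extends Callable[Void] {
  private val batchReady = new CountDownLatch(1)  // counted down once the batch has been filled
  private val ackOrNack = new CountDownLatch(1)   // counted down when the caller acks or nacks
  @volatile private var success = false

  def awaitBatch(): Unit = batchReady.await()                                // caller side
  def complete(ok: Boolean): Unit = { success = ok; ackOrNack.countDown() }  // caller side

  override def call(): Void = {                                              // worker side
    // ... begin the (thread-local) transaction and fill the batch here ...
    batchReady.countDown()
    val acked = ackOrNack.await(timeoutSeconds, TimeUnit.SECONDS)
    if (acked && success) { /* commit */ } else { /* roll back */ }
    null
  }
}
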
diff --git a/external/flume-sink/src/test/resources/log4j.properties b/external/flume-sink/src/test/resources/log4j.properties
deleted file mode 100644
index 42df8792f1..0000000000
--- a/external/flume-sink/src/test/resources/log4j.properties
+++ /dev/null
@@ -1,28 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-# Set everything to be logged to the file streaming/target/unit-tests.log
-log4j.rootCategory=INFO, file
-log4j.appender.file=org.apache.log4j.FileAppender
-log4j.appender.file.append=true
-log4j.appender.file.file=target/unit-tests.log
-log4j.appender.file.layout=org.apache.log4j.PatternLayout
-log4j.appender.file.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss.SSS} %t %p %c{1}: %m%n
-
-# Ignore messages below warning level from Jetty, because it's a bit verbose
-log4j.logger.org.spark-project.jetty=WARN
-
diff --git a/external/flume-sink/src/test/scala/org/apache/spark/streaming/flume/sink/SparkSinkSuite.scala b/external/flume-sink/src/test/scala/org/apache/spark/streaming/flume/sink/SparkSinkSuite.scala
deleted file mode 100644
index e8ca1e7163..0000000000
--- a/external/flume-sink/src/test/scala/org/apache/spark/streaming/flume/sink/SparkSinkSuite.scala
+++ /dev/null
@@ -1,218 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.spark.streaming.flume.sink
-
-import java.net.InetSocketAddress
-import java.nio.charset.StandardCharsets
-import java.util.concurrent.{CountDownLatch, Executors, TimeUnit}
-import java.util.concurrent.atomic.AtomicInteger
-
-import scala.collection.JavaConverters._
-import scala.concurrent.{ExecutionContext, Future}
-import scala.util.{Failure, Success}
-
-import org.apache.avro.ipc.NettyTransceiver
-import org.apache.avro.ipc.specific.SpecificRequestor
-import org.apache.flume.Context
-import org.apache.flume.channel.MemoryChannel
-import org.apache.flume.event.EventBuilder
-import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory
-
- // Due to MNG-1378, there is no way to include test dependencies transitively.
-// We cannot include Spark core tests as a dependency here because it depends on
-// Spark core main, which has too many dependencies to require here manually.
-// For this reason, we continue to use FunSuite and ignore the scalastyle checks
-// that fail if this is detected.
-// scalastyle:off
-import org.scalatest.FunSuite
-
-class SparkSinkSuite extends FunSuite {
-// scalastyle:on
-
- val eventsPerBatch = 1000
- val channelCapacity = 5000
-
- test("Success with ack") {
- val (channel, sink, latch) = initializeChannelAndSink()
- channel.start()
- sink.start()
-
- putEvents(channel, eventsPerBatch)
-
- val port = sink.getPort
- val address = new InetSocketAddress("0.0.0.0", port)
-
- val (transceiver, client) = getTransceiverAndClient(address, 1)(0)
- val events = client.getEventBatch(1000)
- client.ack(events.getSequenceNumber)
- assert(events.getEvents.size() === 1000)
- latch.await(1, TimeUnit.SECONDS)
- assertChannelIsEmpty(channel)
- sink.stop()
- channel.stop()
- transceiver.close()
- }
-
- test("Failure with nack") {
- val (channel, sink, latch) = initializeChannelAndSink()
- channel.start()
- sink.start()
- putEvents(channel, eventsPerBatch)
-
- val port = sink.getPort
- val address = new InetSocketAddress("0.0.0.0", port)
-
- val (transceiver, client) = getTransceiverAndClient(address, 1)(0)
- val events = client.getEventBatch(1000)
- assert(events.getEvents.size() === 1000)
- client.nack(events.getSequenceNumber)
- latch.await(1, TimeUnit.SECONDS)
- assert(availableChannelSlots(channel) === 4000)
- sink.stop()
- channel.stop()
- transceiver.close()
- }
-
- test("Failure with timeout") {
- val (channel, sink, latch) = initializeChannelAndSink(Map(SparkSinkConfig
- .CONF_TRANSACTION_TIMEOUT -> 1.toString))
- channel.start()
- sink.start()
- putEvents(channel, eventsPerBatch)
- val port = sink.getPort
- val address = new InetSocketAddress("0.0.0.0", port)
-
- val (transceiver, client) = getTransceiverAndClient(address, 1)(0)
- val events = client.getEventBatch(1000)
- assert(events.getEvents.size() === 1000)
- latch.await(1, TimeUnit.SECONDS)
- assert(availableChannelSlots(channel) === 4000)
- sink.stop()
- channel.stop()
- transceiver.close()
- }
-
- test("Multiple consumers") {
- testMultipleConsumers(failSome = false)
- }
-
- test("Multiple consumers with some failures") {
- testMultipleConsumers(failSome = true)
- }
-
- def testMultipleConsumers(failSome: Boolean): Unit = {
- implicit val executorContext = ExecutionContext
- .fromExecutorService(Executors.newFixedThreadPool(5))
- val (channel, sink, latch) = initializeChannelAndSink(Map.empty, 5)
- channel.start()
- sink.start()
- (1 to 5).foreach(_ => putEvents(channel, eventsPerBatch))
- val port = sink.getPort
- val address = new InetSocketAddress("0.0.0.0", port)
- val transceiversAndClients = getTransceiverAndClient(address, 5)
- val batchCounter = new CountDownLatch(5)
- val counter = new AtomicInteger(0)
- transceiversAndClients.foreach(x => {
- Future {
- val client = x._2
- val events = client.getEventBatch(1000)
- if (!failSome || counter.getAndIncrement() % 2 == 0) {
- client.ack(events.getSequenceNumber)
- } else {
- client.nack(events.getSequenceNumber)
- throw new RuntimeException("Sending NACK for failure!")
- }
- events
- }.onComplete {
- case Success(events) =>
- assert(events.getEvents.size() === 1000)
- batchCounter.countDown()
- case Failure(t) =>
- // Don't re-throw the exception, causes a nasty unnecessary stack trace on stdout
- batchCounter.countDown()
- }
- })
- batchCounter.await()
- latch.await(1, TimeUnit.SECONDS)
- executorContext.shutdown()
- if(failSome) {
- assert(availableChannelSlots(channel) === 3000)
- } else {
- assertChannelIsEmpty(channel)
- }
- sink.stop()
- channel.stop()
- transceiversAndClients.foreach(x => x._1.close())
- }
-
- private def initializeChannelAndSink(overrides: Map[String, String] = Map.empty,
- batchCounter: Int = 1): (MemoryChannel, SparkSink, CountDownLatch) = {
- val channel = new MemoryChannel()
- val channelContext = new Context()
-
- channelContext.put("capacity", channelCapacity.toString)
- channelContext.put("transactionCapacity", 1000.toString)
- channelContext.put("keep-alive", 0.toString)
- channelContext.putAll(overrides.asJava)
- channel.setName(scala.util.Random.nextString(10))
- channel.configure(channelContext)
-
- val sink = new SparkSink()
- val sinkContext = new Context()
- sinkContext.put(SparkSinkConfig.CONF_HOSTNAME, "0.0.0.0")
- sinkContext.put(SparkSinkConfig.CONF_PORT, 0.toString)
- sink.configure(sinkContext)
- sink.setChannel(channel)
- val latch = new CountDownLatch(batchCounter)
- sink.countdownWhenBatchReceived(latch)
- (channel, sink, latch)
- }
-
- private def putEvents(ch: MemoryChannel, count: Int): Unit = {
- val tx = ch.getTransaction
- tx.begin()
- (1 to count).foreach(x =>
- ch.put(EventBuilder.withBody(x.toString.getBytes(StandardCharsets.UTF_8))))
- tx.commit()
- tx.close()
- }
-
- private def getTransceiverAndClient(address: InetSocketAddress,
- count: Int): Seq[(NettyTransceiver, SparkFlumeProtocol.Callback)] = {
-
- (1 to count).map(_ => {
- lazy val channelFactoryExecutor = Executors.newCachedThreadPool(
- new SparkSinkThreadFactory("Flume Receiver Channel Thread - %d"))
- lazy val channelFactory =
- new NioClientSocketChannelFactory(channelFactoryExecutor, channelFactoryExecutor)
- val transceiver = new NettyTransceiver(address, channelFactory)
- val client = SpecificRequestor.getClient(classOf[SparkFlumeProtocol.Callback], transceiver)
- (transceiver, client)
- })
- }
-
- private def assertChannelIsEmpty(channel: MemoryChannel): Unit = {
- assert(availableChannelSlots(channel) === channelCapacity)
- }
-
- private def availableChannelSlots(channel: MemoryChannel): Int = {
- val queueRemaining = channel.getClass.getDeclaredField("queueRemaining")
- queueRemaining.setAccessible(true)
- val m = queueRemaining.get(channel).getClass.getDeclaredMethod("availablePermits")
- m.invoke(queueRemaining.get(channel)).asInstanceOf[Int]
- }
-}
diff --git a/external/flume/pom.xml b/external/flume/pom.xml
deleted file mode 100644
index d650dd034d..0000000000
--- a/external/flume/pom.xml
+++ /dev/null
@@ -1,78 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
- ~ Licensed to the Apache Software Foundation (ASF) under one or more
- ~ contributor license agreements. See the NOTICE file distributed with
- ~ this work for additional information regarding copyright ownership.
- ~ The ASF licenses this file to You under the Apache License, Version 2.0
- ~ (the "License"); you may not use this file except in compliance with
- ~ the License. You may obtain a copy of the License at
- ~
- ~ http://www.apache.org/licenses/LICENSE-2.0
- ~
- ~ Unless required by applicable law or agreed to in writing, software
- ~ distributed under the License is distributed on an "AS IS" BASIS,
- ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- ~ See the License for the specific language governing permissions and
- ~ limitations under the License.
- -->
-
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
- <modelVersion>4.0.0</modelVersion>
- <parent>
- <groupId>org.apache.spark</groupId>
- <artifactId>spark-parent_2.11</artifactId>
- <version>2.0.0-SNAPSHOT</version>
- <relativePath>../../pom.xml</relativePath>
- </parent>
-
- <groupId>org.apache.spark</groupId>
- <artifactId>spark-streaming-flume_2.11</artifactId>
- <properties>
- <sbt.project.name>streaming-flume</sbt.project.name>
- </properties>
- <packaging>jar</packaging>
- <name>Spark Project External Flume</name>
- <url>http://spark.apache.org/</url>
-
- <dependencies>
- <dependency>
- <groupId>org.apache.spark</groupId>
- <artifactId>spark-streaming_${scala.binary.version}</artifactId>
- <version>${project.version}</version>
- <scope>provided</scope>
- </dependency>
- <dependency>
- <groupId>org.apache.spark</groupId>
- <artifactId>spark-core_${scala.binary.version}</artifactId>
- <version>${project.version}</version>
- <type>test-jar</type>
- <scope>test</scope>
- </dependency>
- <dependency>
- <groupId>org.apache.spark</groupId>
- <artifactId>spark-streaming-flume-sink_${scala.binary.version}</artifactId>
- <version>${project.version}</version>
- </dependency>
- <dependency>
- <groupId>org.apache.flume</groupId>
- <artifactId>flume-ng-core</artifactId>
- </dependency>
- <dependency>
- <groupId>org.apache.flume</groupId>
- <artifactId>flume-ng-sdk</artifactId>
- </dependency>
- <dependency>
- <groupId>org.scalacheck</groupId>
- <artifactId>scalacheck_${scala.binary.version}</artifactId>
- <scope>test</scope>
- </dependency>
- <dependency>
- <groupId>org.apache.spark</groupId>
- <artifactId>spark-test-tags_${scala.binary.version}</artifactId>
- </dependency>
- </dependencies>
- <build>
- <outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory>
- <testOutputDirectory>target/scala-${scala.binary.version}/test-classes</testOutputDirectory>
- </build>
-</project>
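
Applications built against this module declared it as an ordinary library dependency. A minimal sbt line, assuming a published artifact whose coordinates and version match the pom above:

libraryDependencies += "org.apache.spark" %% "spark-streaming-flume" % "2.0.0-SNAPSHOT"
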
diff --git a/external/flume/src/main/scala/org/apache/spark/streaming/flume/EventTransformer.scala b/external/flume/src/main/scala/org/apache/spark/streaming/flume/EventTransformer.scala
deleted file mode 100644
index 5c773d4b07..0000000000
--- a/external/flume/src/main/scala/org/apache/spark/streaming/flume/EventTransformer.scala
+++ /dev/null
@@ -1,72 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.streaming.flume
-
-import java.io.{ObjectInput, ObjectOutput}
-
-import scala.collection.JavaConverters._
-
-import org.apache.spark.Logging
-import org.apache.spark.util.Utils
-
-/**
- * A simple object that provides the implementation of readExternal and writeExternal for both
- * the wrapper classes for Flume-style Events.
- */
-private[streaming] object EventTransformer extends Logging {
- def readExternal(in: ObjectInput): (java.util.HashMap[CharSequence, CharSequence],
- Array[Byte]) = {
- val bodyLength = in.readInt()
- val bodyBuff = new Array[Byte](bodyLength)
- in.readFully(bodyBuff)
-
- val numHeaders = in.readInt()
- val headers = new java.util.HashMap[CharSequence, CharSequence]
-
- for (i <- 0 until numHeaders) {
- val keyLength = in.readInt()
- val keyBuff = new Array[Byte](keyLength)
- in.readFully(keyBuff)
- val key: String = Utils.deserialize(keyBuff)
-
- val valLength = in.readInt()
- val valBuff = new Array[Byte](valLength)
- in.readFully(valBuff)
- val value: String = Utils.deserialize(valBuff)
-
- headers.put(key, value)
- }
- (headers, bodyBuff)
- }
-
- def writeExternal(out: ObjectOutput, headers: java.util.Map[CharSequence, CharSequence],
- body: Array[Byte]) {
- out.writeInt(body.length)
- out.write(body)
- val numHeaders = headers.size()
- out.writeInt(numHeaders)
- for ((k, v) <- headers.asScala) {
- val keyBuff = Utils.serialize(k.toString)
- out.writeInt(keyBuff.length)
- out.write(keyBuff)
- val valBuff = Utils.serialize(v.toString)
- out.writeInt(valBuff.length)
- out.write(valBuff)
- }
- }
-}
diff --git a/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeBatchFetcher.scala b/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeBatchFetcher.scala
deleted file mode 100644
index 3555fa68b6..0000000000
--- a/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeBatchFetcher.scala
+++ /dev/null
@@ -1,166 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.spark.streaming.flume
-
-import scala.collection.mutable.ArrayBuffer
-
-import com.google.common.base.Throwables
-
-import org.apache.spark.Logging
-import org.apache.spark.streaming.flume.sink._
-
-/**
- * This class implements the core functionality of [[FlumePollingReceiver]]. When started it
- * pulls data from Flume, stores it in Spark and then sends an ack or nack. This class should be
- * run via an [[java.util.concurrent.Executor]] as it implements [[Runnable]].
- *
- * @param receiver The receiver that owns this instance.
- */
-
-private[flume] class FlumeBatchFetcher(receiver: FlumePollingReceiver) extends Runnable with
- Logging {
-
- def run(): Unit = {
- while (!receiver.isStopped()) {
- val connection = receiver.getConnections.poll()
- val client = connection.client
- var batchReceived = false
- var seq: CharSequence = null
- try {
- getBatch(client) match {
- case Some(eventBatch) =>
- batchReceived = true
- seq = eventBatch.getSequenceNumber
- val events = toSparkFlumeEvents(eventBatch.getEvents)
- if (store(events)) {
- sendAck(client, seq)
- } else {
- sendNack(batchReceived, client, seq)
- }
- case None =>
- }
- } catch {
- case e: Exception =>
- Throwables.getRootCause(e) match {
- // If the cause was an InterruptedException, then check if the receiver is stopped -
- // if yes, just break out of the loop. Else send a Nack and log a warning.
- // In the unlikely case that the cause was not an Exception,
- // it is simply thrown out and the loop exits.
- case interrupted: InterruptedException =>
- if (!receiver.isStopped()) {
- logWarning("Interrupted while receiving data from Flume", interrupted)
- sendNack(batchReceived, client, seq)
- }
- case exception: Exception =>
- logWarning("Error while receiving data from Flume", exception)
- sendNack(batchReceived, client, seq)
- }
- } finally {
- receiver.getConnections.add(connection)
- }
- }
- }
-
- /**
- * Gets a batch of events from the specified client. This method does not handle any exceptions
- * which will be propagated to the caller.
- * @param client Client to get events from
- * @return [[Some]] which contains the event batch if Flume sent any events back, else [[None]]
- */
- private def getBatch(client: SparkFlumeProtocol.Callback): Option[EventBatch] = {
- val eventBatch = client.getEventBatch(receiver.getMaxBatchSize)
- if (!SparkSinkUtils.isErrorBatch(eventBatch)) {
- // No error, proceed with processing data
- logDebug(s"Received batch of ${eventBatch.getEvents.size} events with sequence " +
- s"number: ${eventBatch.getSequenceNumber}")
- Some(eventBatch)
- } else {
- logWarning("Did not receive events from Flume agent due to error on the Flume agent: " +
- eventBatch.getErrorMsg)
- None
- }
- }
-
- /**
- * Store the events in the buffer to Spark. This method will not propagate any exceptions,
- * but will propagate any other errors.
- * @param buffer The buffer to store
- * @return true if the data was stored without any exception being thrown, else false
- */
- private def store(buffer: ArrayBuffer[SparkFlumeEvent]): Boolean = {
- try {
- receiver.store(buffer)
- true
- } catch {
- case e: Exception =>
- logWarning("Error while attempting to store data received from Flume", e)
- false
- }
- }
-
- /**
- * Send an ack to the client for the sequence number. This method does not handle any exceptions
- * which will be propagated to the caller.
- * @param client client to send the ack to
- * @param seq sequence number of the batch to be ack-ed.
- * @return
- */
- private def sendAck(client: SparkFlumeProtocol.Callback, seq: CharSequence): Unit = {
- logDebug("Sending ack for sequence number: " + seq)
- client.ack(seq)
- logDebug("Ack sent for sequence number: " + seq)
- }
-
- /**
- * This method sends a nack to the client for the given sequence number if a batch was
- * received. Any exception thrown by the RPC call is simply thrown out as is - no effort is
- * made to handle it.
- * @param batchReceived true if a batch was received. If this is false, no nack is sent
- * @param client The client to which the nack should be sent
- * @param seq The sequence number of the batch that is being nack-ed.
- */
- private def sendNack(batchReceived: Boolean, client: SparkFlumeProtocol.Callback,
- seq: CharSequence): Unit = {
- if (batchReceived) {
- // Let Flume know that the events need to be pushed back into the channel.
- logDebug("Sending nack for sequence number: " + seq)
- client.nack(seq) // If the agent is down, even this could fail and throw
- logDebug("Nack sent for sequence number: " + seq)
- }
- }
-
- /**
- * Utility method to convert [[SparkSinkEvent]]s to [[SparkFlumeEvent]]s
- * @param events - Events to convert to SparkFlumeEvents
- * @return - The SparkFlumeEvents generated from the SparkSinkEvents
- */
- private def toSparkFlumeEvents(events: java.util.List[SparkSinkEvent]):
- ArrayBuffer[SparkFlumeEvent] = {
- // Convert each Flume event to a serializable SparkFlumeEvent
- val buffer = new ArrayBuffer[SparkFlumeEvent](events.size())
- var j = 0
- while (j < events.size()) {
- val event = events.get(j)
- val sparkFlumeEvent = new SparkFlumeEvent()
- sparkFlumeEvent.event.setBody(event.getBody)
- sparkFlumeEvent.event.setHeaders(event.getHeaders)
- buffer += sparkFlumeEvent
- j += 1
- }
- buffer
- }
-}
diff --git a/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeInputDStream.scala b/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeInputDStream.scala
deleted file mode 100644
index 74bd0165c6..0000000000
--- a/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeInputDStream.scala
+++ /dev/null
@@ -1,205 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.streaming.flume
-
-import java.io.{Externalizable, ObjectInput, ObjectOutput}
-import java.net.InetSocketAddress
-import java.nio.ByteBuffer
-import java.util.concurrent.Executors
-
-import scala.collection.JavaConverters._
-import scala.reflect.ClassTag
-
-import org.apache.avro.ipc.NettyServer
-import org.apache.avro.ipc.specific.SpecificResponder
-import org.apache.flume.source.avro.{AvroFlumeEvent, AvroSourceProtocol, Status}
-import org.jboss.netty.channel.{ChannelPipeline, ChannelPipelineFactory, Channels}
-import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory
-import org.jboss.netty.handler.codec.compression._
-
-import org.apache.spark.Logging
-import org.apache.spark.storage.StorageLevel
-import org.apache.spark.streaming.StreamingContext
-import org.apache.spark.streaming.dstream._
-import org.apache.spark.streaming.receiver.Receiver
-import org.apache.spark.util.Utils
-
-private[streaming]
-class FlumeInputDStream[T: ClassTag](
- _ssc: StreamingContext,
- host: String,
- port: Int,
- storageLevel: StorageLevel,
- enableDecompression: Boolean
-) extends ReceiverInputDStream[SparkFlumeEvent](_ssc) {
-
- override def getReceiver(): Receiver[SparkFlumeEvent] = {
- new FlumeReceiver(host, port, storageLevel, enableDecompression)
- }
-}
-
-/**
- * A wrapper class for AvroFlumeEvent's with a custom serialization format.
- *
- * This is necessary because AvroFlumeEvent uses inner data structures
- * which are not serializable.
- */
-class SparkFlumeEvent() extends Externalizable {
- var event: AvroFlumeEvent = new AvroFlumeEvent()
-
- /* De-serialize from bytes. */
- def readExternal(in: ObjectInput): Unit = Utils.tryOrIOException {
- val bodyLength = in.readInt()
- val bodyBuff = new Array[Byte](bodyLength)
- in.readFully(bodyBuff)
-
- val numHeaders = in.readInt()
- val headers = new java.util.HashMap[CharSequence, CharSequence]
-
- for (i <- 0 until numHeaders) {
- val keyLength = in.readInt()
- val keyBuff = new Array[Byte](keyLength)
- in.readFully(keyBuff)
- val key: String = Utils.deserialize(keyBuff)
-
- val valLength = in.readInt()
- val valBuff = new Array[Byte](valLength)
- in.readFully(valBuff)
- val value: String = Utils.deserialize(valBuff)
-
- headers.put(key, value)
- }
-
- event.setBody(ByteBuffer.wrap(bodyBuff))
- event.setHeaders(headers)
- }
-
- /* Serialize to bytes. */
- def writeExternal(out: ObjectOutput): Unit = Utils.tryOrIOException {
- val body = event.getBody
- out.writeInt(body.remaining())
- Utils.writeByteBuffer(body, out)
-
- val numHeaders = event.getHeaders.size()
- out.writeInt(numHeaders)
- for ((k, v) <- event.getHeaders.asScala) {
- val keyBuff = Utils.serialize(k.toString)
- out.writeInt(keyBuff.length)
- out.write(keyBuff)
- val valBuff = Utils.serialize(v.toString)
- out.writeInt(valBuff.length)
- out.write(valBuff)
- }
- }
-}
-
-private[streaming] object SparkFlumeEvent {
- def fromAvroFlumeEvent(in: AvroFlumeEvent): SparkFlumeEvent = {
- val event = new SparkFlumeEvent
- event.event = in
- event
- }
-}
-
-/** A simple server that implements Flume's Avro protocol. */
-private[streaming]
-class FlumeEventServer(receiver: FlumeReceiver) extends AvroSourceProtocol {
- override def append(event: AvroFlumeEvent): Status = {
- receiver.store(SparkFlumeEvent.fromAvroFlumeEvent(event))
- Status.OK
- }
-
- override def appendBatch(events: java.util.List[AvroFlumeEvent]): Status = {
- events.asScala.foreach(event => receiver.store(SparkFlumeEvent.fromAvroFlumeEvent(event)))
- Status.OK
- }
-}
-
-/** A NetworkReceiver which listens for events using the
- * Flume Avro interface. */
-private[streaming]
-class FlumeReceiver(
- host: String,
- port: Int,
- storageLevel: StorageLevel,
- enableDecompression: Boolean
- ) extends Receiver[SparkFlumeEvent](storageLevel) with Logging {
-
- lazy val responder = new SpecificResponder(
- classOf[AvroSourceProtocol], new FlumeEventServer(this))
- var server: NettyServer = null
-
- private def initServer() = {
- if (enableDecompression) {
- val channelFactory = new NioServerSocketChannelFactory(Executors.newCachedThreadPool(),
- Executors.newCachedThreadPool())
- val channelPipelineFactory = new CompressionChannelPipelineFactory()
-
- new NettyServer(
- responder,
- new InetSocketAddress(host, port),
- channelFactory,
- channelPipelineFactory,
- null)
- } else {
- new NettyServer(responder, new InetSocketAddress(host, port))
- }
- }
-
- def onStart() {
- synchronized {
- if (server == null) {
- server = initServer()
- server.start()
- } else {
- logWarning("Flume receiver being asked to start more then once with out close")
- }
- }
- logInfo("Flume receiver started")
- }
-
- def onStop() {
- synchronized {
- if (server != null) {
- server.close()
- server = null
- }
- }
- logInfo("Flume receiver stopped")
- }
-
- override def preferredLocation: Option[String] = Option(host)
-
- /** A Netty Pipeline factory that will decompress incoming data from
- * the Netty client and compress data going back to the client.
- *
- * The compression on the return is required because Flume requires
- * a successful response to indicate it can remove the event/batch
- * from the configured channel
- */
- private[streaming]
- class CompressionChannelPipelineFactory extends ChannelPipelineFactory {
- def getPipeline(): ChannelPipeline = {
- val pipeline = Channels.pipeline()
- val encoder = new ZlibEncoder(6)
- pipeline.addFirst("deflater", encoder)
- pipeline.addFirst("inflater", new ZlibDecoder())
- pipeline
- }
- }
-}
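
A minimal sketch of how the push-based receiver above was typically wired up from an application, using the FlumeUtils.createStream helper that appears further down in this change (the host, port, batch interval and application name are placeholders):

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.flume.FlumeUtils

val sparkConf = new SparkConf().setAppName("FlumeEventCount")
val ssc = new StreamingContext(sparkConf, Seconds(2))
// The receiver binds to this host/port; a Flume avro sink is pointed at the same address.
val stream = FlumeUtils.createStream(ssc, "localhost", 9999)
stream.count().map(cnt => "Received " + cnt + " flume events.").print()
ssc.start()
ssc.awaitTermination()
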
diff --git a/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumePollingInputDStream.scala b/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumePollingInputDStream.scala
deleted file mode 100644
index d9c25e8654..0000000000
--- a/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumePollingInputDStream.scala
+++ /dev/null
@@ -1,123 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.spark.streaming.flume
-
-
-import java.net.InetSocketAddress
-import java.util.concurrent.{Executors, LinkedBlockingQueue, TimeUnit}
-
-import scala.collection.JavaConverters._
-import scala.reflect.ClassTag
-
-import com.google.common.util.concurrent.ThreadFactoryBuilder
-import org.apache.avro.ipc.NettyTransceiver
-import org.apache.avro.ipc.specific.SpecificRequestor
-import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory
-
-import org.apache.spark.Logging
-import org.apache.spark.storage.StorageLevel
-import org.apache.spark.streaming.StreamingContext
-import org.apache.spark.streaming.dstream.ReceiverInputDStream
-import org.apache.spark.streaming.flume.sink._
-import org.apache.spark.streaming.receiver.Receiver
-
-/**
- * A [[ReceiverInputDStream]] that can be used to read data from several Flume agents running
- * [[org.apache.spark.streaming.flume.sink.SparkSink]]s.
- * @param _ssc Streaming context that will execute this input stream
- * @param addresses List of addresses at which SparkSinks are listening
- * @param maxBatchSize Maximum size of a batch
- * @param parallelism Number of parallel connections to open
- * @param storageLevel The storage level to use.
- * @tparam T Class type of the object of this stream
- */
-private[streaming] class FlumePollingInputDStream[T: ClassTag](
- _ssc: StreamingContext,
- val addresses: Seq[InetSocketAddress],
- val maxBatchSize: Int,
- val parallelism: Int,
- storageLevel: StorageLevel
- ) extends ReceiverInputDStream[SparkFlumeEvent](_ssc) {
-
- override def getReceiver(): Receiver[SparkFlumeEvent] = {
- new FlumePollingReceiver(addresses, maxBatchSize, parallelism, storageLevel)
- }
-}
-
-private[streaming] class FlumePollingReceiver(
- addresses: Seq[InetSocketAddress],
- maxBatchSize: Int,
- parallelism: Int,
- storageLevel: StorageLevel
- ) extends Receiver[SparkFlumeEvent](storageLevel) with Logging {
-
- lazy val channelFactoryExecutor =
- Executors.newCachedThreadPool(new ThreadFactoryBuilder().setDaemon(true).
- setNameFormat("Flume Receiver Channel Thread - %d").build())
-
- lazy val channelFactory =
- new NioClientSocketChannelFactory(channelFactoryExecutor, channelFactoryExecutor)
-
- lazy val receiverExecutor = Executors.newFixedThreadPool(parallelism,
- new ThreadFactoryBuilder().setDaemon(true).setNameFormat("Flume Receiver Thread - %d").build())
-
- private lazy val connections = new LinkedBlockingQueue[FlumeConnection]()
-
- override def onStart(): Unit = {
- // Create the connections to each Flume agent.
- addresses.foreach(host => {
- val transceiver = new NettyTransceiver(host, channelFactory)
- val client = SpecificRequestor.getClient(classOf[SparkFlumeProtocol.Callback], transceiver)
- connections.add(new FlumeConnection(transceiver, client))
- })
- for (i <- 0 until parallelism) {
- logInfo("Starting Flume Polling Receiver worker threads..")
- // Threads that pull data from Flume.
- receiverExecutor.submit(new FlumeBatchFetcher(this))
- }
- }
-
- override def onStop(): Unit = {
- logInfo("Shutting down Flume Polling Receiver")
- receiverExecutor.shutdown()
- // Wait up to a minute for the threads to die
- if (!receiverExecutor.awaitTermination(60, TimeUnit.SECONDS)) {
- receiverExecutor.shutdownNow()
- }
- connections.asScala.foreach(_.transceiver.close())
- channelFactory.releaseExternalResources()
- }
-
- private[flume] def getConnections: LinkedBlockingQueue[FlumeConnection] = {
- this.connections
- }
-
- private[flume] def getMaxBatchSize: Int = {
- this.maxBatchSize
- }
-}
-
-/**
- * A wrapper around the transceiver and the Avro IPC API.
- * @param transceiver The transceiver to use for communication with Flume
- * @param client The client that the callbacks are received on.
- */
-private[flume] class FlumeConnection(val transceiver: NettyTransceiver,
- val client: SparkFlumeProtocol.Callback)
-
-
-
diff --git a/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeTestUtils.scala b/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeTestUtils.scala
deleted file mode 100644
index 945cfa7295..0000000000
--- a/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeTestUtils.scala
+++ /dev/null
@@ -1,117 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.streaming.flume
-
-import java.net.{InetSocketAddress, ServerSocket}
-import java.nio.ByteBuffer
-import java.nio.charset.StandardCharsets
-import java.util.{List => JList}
-import java.util.Collections
-
-import scala.collection.JavaConverters._
-
-import org.apache.avro.ipc.NettyTransceiver
-import org.apache.avro.ipc.specific.SpecificRequestor
-import org.apache.commons.lang3.RandomUtils
-import org.apache.flume.source.avro
-import org.apache.flume.source.avro.{AvroFlumeEvent, AvroSourceProtocol}
-import org.jboss.netty.channel.ChannelPipeline
-import org.jboss.netty.channel.socket.SocketChannel
-import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory
-import org.jboss.netty.handler.codec.compression.{ZlibDecoder, ZlibEncoder}
-
-import org.apache.spark.util.Utils
-import org.apache.spark.SparkConf
-
-/**
- * Shared code for Scala and Python unit tests
- */
-private[flume] class FlumeTestUtils {
-
- private var transceiver: NettyTransceiver = null
-
- private val testPort: Int = findFreePort()
-
- def getTestPort(): Int = testPort
-
- /** Find a free port */
- private def findFreePort(): Int = {
- val candidatePort = RandomUtils.nextInt(1024, 65536)
- Utils.startServiceOnPort(candidatePort, (trialPort: Int) => {
- val socket = new ServerSocket(trialPort)
- socket.close()
- (null, trialPort)
- }, new SparkConf())._2
- }
-
- /** Send data to the flume receiver */
- def writeInput(input: JList[String], enableCompression: Boolean): Unit = {
- val testAddress = new InetSocketAddress("localhost", testPort)
-
- val inputEvents = input.asScala.map { item =>
- val event = new AvroFlumeEvent
- event.setBody(ByteBuffer.wrap(item.getBytes(StandardCharsets.UTF_8)))
- event.setHeaders(Collections.singletonMap("test", "header"))
- event
- }
-
- // If a transceiver was created by a previous attempt, close it first
- close()
-
- // Create transceiver
- transceiver = {
- if (enableCompression) {
- new NettyTransceiver(testAddress, new CompressionChannelFactory(6))
- } else {
- new NettyTransceiver(testAddress)
- }
- }
-
- // Create Avro client with the transceiver
- val client = SpecificRequestor.getClient(classOf[AvroSourceProtocol], transceiver)
- if (client == null) {
- throw new AssertionError("Cannot create client")
- }
-
- // Send data
- val status = client.appendBatch(inputEvents.asJava)
- if (status != avro.Status.OK) {
- throw new AssertionError("Sent events unsuccessfully")
- }
- }
-
- def close(): Unit = {
- if (transceiver != null) {
- transceiver.close()
- transceiver = null
- }
- }
-
- /** Class to create socket channel with compression */
- private class CompressionChannelFactory(compressionLevel: Int)
- extends NioClientSocketChannelFactory {
-
- override def newChannel(pipeline: ChannelPipeline): SocketChannel = {
- val encoder = new ZlibEncoder(compressionLevel)
- pipeline.addFirst("deflater", encoder)
- pipeline.addFirst("inflater", new ZlibDecoder())
- super.newChannel(pipeline)
- }
- }
-
-}
diff --git a/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeUtils.scala b/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeUtils.scala
deleted file mode 100644
index 3e3ed712f0..0000000000
--- a/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeUtils.scala
+++ /dev/null
@@ -1,311 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.streaming.flume
-
-import java.io.{ByteArrayOutputStream, DataOutputStream}
-import java.net.InetSocketAddress
-import java.util.{List => JList, Map => JMap}
-
-import scala.collection.JavaConverters._
-
-import org.apache.spark.api.java.function.PairFunction
-import org.apache.spark.api.python.PythonRDD
-import org.apache.spark.storage.StorageLevel
-import org.apache.spark.streaming.StreamingContext
-import org.apache.spark.streaming.api.java.{JavaPairDStream, JavaReceiverInputDStream, JavaStreamingContext}
-import org.apache.spark.streaming.dstream.ReceiverInputDStream
-
-object FlumeUtils {
- private val DEFAULT_POLLING_PARALLELISM = 5
- private val DEFAULT_POLLING_BATCH_SIZE = 1000
-
- /**
- * Create an input stream from a Flume source.
- * @param ssc StreamingContext object
- * @param hostname Hostname of the slave machine to which the flume data will be sent
- * @param port Port of the slave machine to which the flume data will be sent
- * @param storageLevel Storage level to use for storing the received objects
- */
- def createStream (
- ssc: StreamingContext,
- hostname: String,
- port: Int,
- storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
- ): ReceiverInputDStream[SparkFlumeEvent] = {
- createStream(ssc, hostname, port, storageLevel, false)
- }
-
- /**
- * Create an input stream from a Flume source.
- * @param ssc StreamingContext object
- * @param hostname Hostname of the slave machine to which the flume data will be sent
- * @param port Port of the slave machine to which the flume data will be sent
- * @param storageLevel Storage level to use for storing the received objects
- * @param enableDecompression should netty server decompress input stream
- */
- def createStream (
- ssc: StreamingContext,
- hostname: String,
- port: Int,
- storageLevel: StorageLevel,
- enableDecompression: Boolean
- ): ReceiverInputDStream[SparkFlumeEvent] = {
- val inputStream = new FlumeInputDStream[SparkFlumeEvent](
- ssc, hostname, port, storageLevel, enableDecompression)
-
- inputStream
- }
-
- /**
- * Creates an input stream from a Flume source.
- * Storage level of the data will be the default StorageLevel.MEMORY_AND_DISK_SER_2.
- * @param hostname Hostname of the slave machine to which the flume data will be sent
- * @param port Port of the slave machine to which the flume data will be sent
- */
- def createStream(
- jssc: JavaStreamingContext,
- hostname: String,
- port: Int
- ): JavaReceiverInputDStream[SparkFlumeEvent] = {
- createStream(jssc.ssc, hostname, port)
- }
-
- /**
- * Creates an input stream from a Flume source.
- * @param hostname Hostname of the slave machine to which the flume data will be sent
- * @param port Port of the slave machine to which the flume data will be sent
- * @param storageLevel Storage level to use for storing the received objects
- */
- def createStream(
- jssc: JavaStreamingContext,
- hostname: String,
- port: Int,
- storageLevel: StorageLevel
- ): JavaReceiverInputDStream[SparkFlumeEvent] = {
- createStream(jssc.ssc, hostname, port, storageLevel, false)
- }
-
- /**
- * Creates an input stream from a Flume source.
- * @param hostname Hostname of the slave machine to which the flume data will be sent
- * @param port Port of the slave machine to which the flume data will be sent
- * @param storageLevel Storage level to use for storing the received objects
- * @param enableDecompression should netty server decompress input stream
- */
- def createStream(
- jssc: JavaStreamingContext,
- hostname: String,
- port: Int,
- storageLevel: StorageLevel,
- enableDecompression: Boolean
- ): JavaReceiverInputDStream[SparkFlumeEvent] = {
- createStream(jssc.ssc, hostname, port, storageLevel, enableDecompression)
- }
-
- /**
- * Creates an input stream that is to be used with the Spark Sink deployed on a Flume agent.
- * This stream will poll the sink for data and will pull events as they are available.
- * This stream will use a batch size of 1000 events and run 5 threads to pull data.
- * @param hostname Address of the host on which the Spark Sink is running
- * @param port Port of the host at which the Spark Sink is listening
- * @param storageLevel Storage level to use for storing the received objects
- */
- def createPollingStream(
- ssc: StreamingContext,
- hostname: String,
- port: Int,
- storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
- ): ReceiverInputDStream[SparkFlumeEvent] = {
- createPollingStream(ssc, Seq(new InetSocketAddress(hostname, port)), storageLevel)
- }
-
- /**
- * Creates an input stream that is to be used with the Spark Sink deployed on a Flume agent.
- * This stream will poll the sink for data and will pull events as they are available.
- * This stream will use a batch size of 1000 events and run 5 threads to pull data.
- * @param addresses List of InetSocketAddresses representing the hosts to connect to.
- * @param storageLevel Storage level to use for storing the received objects
- */
- def createPollingStream(
- ssc: StreamingContext,
- addresses: Seq[InetSocketAddress],
- storageLevel: StorageLevel
- ): ReceiverInputDStream[SparkFlumeEvent] = {
- createPollingStream(ssc, addresses, storageLevel,
- DEFAULT_POLLING_BATCH_SIZE, DEFAULT_POLLING_PARALLELISM)
- }
-
- /**
- * Creates an input stream that is to be used with the Spark Sink deployed on a Flume agent.
- * This stream will poll the sink for data and will pull events as they are available.
- * @param addresses List of InetSocketAddresses representing the hosts to connect to.
- * @param maxBatchSize Maximum number of events to be pulled from the Spark sink in a
- * single RPC call
- * @param parallelism Number of concurrent requests this stream should send to the sink. Note
- * that a higher number of concurrent requests will result in this stream
- * using more threads.
- * @param storageLevel Storage level to use for storing the received objects
- */
- def createPollingStream(
- ssc: StreamingContext,
- addresses: Seq[InetSocketAddress],
- storageLevel: StorageLevel,
- maxBatchSize: Int,
- parallelism: Int
- ): ReceiverInputDStream[SparkFlumeEvent] = {
- new FlumePollingInputDStream[SparkFlumeEvent](ssc, addresses, maxBatchSize,
- parallelism, storageLevel)
- }
-
- /**
- * Creates an input stream that is to be used with the Spark Sink deployed on a Flume agent.
- * This stream will poll the sink for data and will pull events as they are available.
- * This stream will use a batch size of 1000 events and run 5 threads to pull data.
- * @param hostname Hostname of the host on which the Spark Sink is running
- * @param port Port of the host at which the Spark Sink is listening
- */
- def createPollingStream(
- jssc: JavaStreamingContext,
- hostname: String,
- port: Int
- ): JavaReceiverInputDStream[SparkFlumeEvent] = {
- createPollingStream(jssc, hostname, port, StorageLevel.MEMORY_AND_DISK_SER_2)
- }
-
- /**
- * Creates an input stream that is to be used with the Spark Sink deployed on a Flume agent.
- * This stream will poll the sink for data and will pull events as they are available.
- * This stream will use a batch size of 1000 events and run 5 threads to pull data.
- * @param hostname Hostname of the host on which the Spark Sink is running
- * @param port Port of the host at which the Spark Sink is listening
- * @param storageLevel Storage level to use for storing the received objects
- */
- def createPollingStream(
- jssc: JavaStreamingContext,
- hostname: String,
- port: Int,
- storageLevel: StorageLevel
- ): JavaReceiverInputDStream[SparkFlumeEvent] = {
- createPollingStream(jssc, Array(new InetSocketAddress(hostname, port)), storageLevel)
- }
-
- /**
- * Creates an input stream that is to be used with the Spark Sink deployed on a Flume agent.
- * This stream will poll the sink for data and will pull events as they are available.
- * This stream will use a batch size of 1000 events and run 5 threads to pull data.
- * @param addresses List of InetSocketAddresses on which the Spark Sink is running.
- * @param storageLevel Storage level to use for storing the received objects
- */
- def createPollingStream(
- jssc: JavaStreamingContext,
- addresses: Array[InetSocketAddress],
- storageLevel: StorageLevel
- ): JavaReceiverInputDStream[SparkFlumeEvent] = {
- createPollingStream(jssc, addresses, storageLevel,
- DEFAULT_POLLING_BATCH_SIZE, DEFAULT_POLLING_PARALLELISM)
- }
-
- /**
- * Creates an input stream that is to be used with the Spark Sink deployed on a Flume agent.
- * This stream will poll the sink for data and will pull events as they are available.
- * @param addresses List of InetSocketAddresses on which the Spark Sink is running
- * @param maxBatchSize The maximum number of events to be pulled from the Spark sink in a
- * single RPC call
- * @param parallelism Number of concurrent requests this stream should send to the sink. Note
- * that a higher number of concurrent requests will result in this stream
- * using more threads.
- * @param storageLevel Storage level to use for storing the received objects
- */
- def createPollingStream(
- jssc: JavaStreamingContext,
- addresses: Array[InetSocketAddress],
- storageLevel: StorageLevel,
- maxBatchSize: Int,
- parallelism: Int
- ): JavaReceiverInputDStream[SparkFlumeEvent] = {
- createPollingStream(jssc.ssc, addresses, storageLevel, maxBatchSize, parallelism)
- }
-}
-
-/**
- * This is a helper class that wraps the methods in FlumeUtils in a more Python-friendly class and
- * functions so that they can be easily instantiated and called from Python's FlumeUtils.
- */
-private[flume] class FlumeUtilsPythonHelper {
-
- def createStream(
- jssc: JavaStreamingContext,
- hostname: String,
- port: Int,
- storageLevel: StorageLevel,
- enableDecompression: Boolean
- ): JavaPairDStream[Array[Byte], Array[Byte]] = {
- val dstream = FlumeUtils.createStream(jssc, hostname, port, storageLevel, enableDecompression)
- FlumeUtilsPythonHelper.toByteArrayPairDStream(dstream)
- }
-
- def createPollingStream(
- jssc: JavaStreamingContext,
- hosts: JList[String],
- ports: JList[Int],
- storageLevel: StorageLevel,
- maxBatchSize: Int,
- parallelism: Int
- ): JavaPairDStream[Array[Byte], Array[Byte]] = {
- assert(hosts.size() == ports.size())
- val addresses = hosts.asScala.zip(ports.asScala).map {
- case (host, port) => new InetSocketAddress(host, port)
- }
- val dstream = FlumeUtils.createPollingStream(
- jssc.ssc, addresses, storageLevel, maxBatchSize, parallelism)
- FlumeUtilsPythonHelper.toByteArrayPairDStream(dstream)
- }
-
-}
-
-private object FlumeUtilsPythonHelper {
-
- private def stringMapToByteArray(map: JMap[CharSequence, CharSequence]): Array[Byte] = {
- val byteStream = new ByteArrayOutputStream()
- val output = new DataOutputStream(byteStream)
- try {
- output.writeInt(map.size)
- map.asScala.foreach { kv =>
- PythonRDD.writeUTF(kv._1.toString, output)
- PythonRDD.writeUTF(kv._2.toString, output)
- }
- byteStream.toByteArray
- }
- finally {
- output.close()
- }
- }
-
- private def toByteArrayPairDStream(dstream: JavaReceiverInputDStream[SparkFlumeEvent]):
- JavaPairDStream[Array[Byte], Array[Byte]] = {
- dstream.mapToPair(new PairFunction[SparkFlumeEvent, Array[Byte], Array[Byte]] {
- override def call(sparkEvent: SparkFlumeEvent): (Array[Byte], Array[Byte]) = {
- val event = sparkEvent.event
- val byteBuffer = event.getBody
- val body = new Array[Byte](byteBuffer.remaining())
- byteBuffer.get(body)
- (stringMapToByteArray(event.getHeaders), body)
- }
- })
- }
-}
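Since FlumeUtils above was the public entry point for both the push-based and pull-based integrations, here is a minimal driver-side sketch of the removed API; the hostnames, ports and batch interval are placeholders, not values taken from this patch:

import java.net.InetSocketAddress

import org.apache.spark.SparkConf
import org.apache.spark.network.util.JavaUtils
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.flume.FlumeUtils

val ssc = new StreamingContext(new SparkConf().setAppName("flume-sketch"), Seconds(2))

// Push-based: a Flume Avro sink sends events directly to this receiver.
val pushed = FlumeUtils.createStream(ssc, "localhost", 41414)

// Pull-based: the receiver polls SparkSinks running inside Flume agents.
val polled = FlumeUtils.createPollingStream(
  ssc, Seq(new InetSocketAddress("localhost", 41415)), StorageLevel.MEMORY_AND_DISK_SER_2)

pushed.map(e => JavaUtils.bytesToString(e.event.getBody)).print()
ssc.start()
ssc.awaitTermination()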
diff --git a/external/flume/src/main/scala/org/apache/spark/streaming/flume/PollingFlumeTestUtils.scala b/external/flume/src/main/scala/org/apache/spark/streaming/flume/PollingFlumeTestUtils.scala
deleted file mode 100644
index 1a96df6e94..0000000000
--- a/external/flume/src/main/scala/org/apache/spark/streaming/flume/PollingFlumeTestUtils.scala
+++ /dev/null
@@ -1,209 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.streaming.flume
-
-import java.nio.charset.StandardCharsets
-import java.util.{Collections, List => JList, Map => JMap}
-import java.util.concurrent._
-
-import scala.collection.mutable.ArrayBuffer
-
-import org.apache.flume.event.EventBuilder
-import org.apache.flume.Context
-import org.apache.flume.channel.MemoryChannel
-import org.apache.flume.conf.Configurables
-
-import org.apache.spark.streaming.flume.sink.{SparkSink, SparkSinkConfig}
-
-/**
- * Shared code for the Scala and Python unit tests
- */
-private[flume] class PollingFlumeTestUtils {
-
- private val batchCount = 5
- val eventsPerBatch = 100
- private val totalEventsPerChannel = batchCount * eventsPerBatch
- private val channelCapacity = 5000
-
- def getTotalEvents: Int = totalEventsPerChannel * channels.size
-
- private val channels = new ArrayBuffer[MemoryChannel]
- private val sinks = new ArrayBuffer[SparkSink]
-
- /**
- * Start a sink and return the port of this sink
- */
- def startSingleSink(): Int = {
- channels.clear()
- sinks.clear()
-
- // Start the channel and sink.
- val context = new Context()
- context.put("capacity", channelCapacity.toString)
- context.put("transactionCapacity", "1000")
- context.put("keep-alive", "0")
- val channel = new MemoryChannel()
- Configurables.configure(channel, context)
-
- val sink = new SparkSink()
- context.put(SparkSinkConfig.CONF_HOSTNAME, "localhost")
- context.put(SparkSinkConfig.CONF_PORT, String.valueOf(0))
- Configurables.configure(sink, context)
- sink.setChannel(channel)
- sink.start()
-
- channels += (channel)
- sinks += sink
-
- sink.getPort()
- }
-
- /**
- * Start 2 sinks and return the ports
- */
- def startMultipleSinks(): Seq[Int] = {
- channels.clear()
- sinks.clear()
-
- // Start the channel and sink.
- val context = new Context()
- context.put("capacity", channelCapacity.toString)
- context.put("transactionCapacity", "1000")
- context.put("keep-alive", "0")
- val channel = new MemoryChannel()
- Configurables.configure(channel, context)
-
- val channel2 = new MemoryChannel()
- Configurables.configure(channel2, context)
-
- val sink = new SparkSink()
- context.put(SparkSinkConfig.CONF_HOSTNAME, "localhost")
- context.put(SparkSinkConfig.CONF_PORT, String.valueOf(0))
- Configurables.configure(sink, context)
- sink.setChannel(channel)
- sink.start()
-
- val sink2 = new SparkSink()
- context.put(SparkSinkConfig.CONF_HOSTNAME, "localhost")
- context.put(SparkSinkConfig.CONF_PORT, String.valueOf(0))
- Configurables.configure(sink2, context)
- sink2.setChannel(channel2)
- sink2.start()
-
- sinks += sink
- sinks += sink2
- channels += channel
- channels += channel2
-
- sinks.map(_.getPort())
- }
-
- /**
- * Send data and wait until all data has been received
- */
- def sendDatAndEnsureAllDataHasBeenReceived(): Unit = {
- val executor = Executors.newCachedThreadPool()
- val executorCompletion = new ExecutorCompletionService[Void](executor)
-
- val latch = new CountDownLatch(batchCount * channels.size)
- sinks.foreach(_.countdownWhenBatchReceived(latch))
-
- channels.foreach(channel => {
- executorCompletion.submit(new TxnSubmitter(channel))
- })
-
- for (i <- 0 until channels.size) {
- executorCompletion.take()
- }
-
- latch.await(15, TimeUnit.SECONDS) // Ensure all data has been received.
- }
-
- /**
- * A Python-friendly method to assert the output
- */
- def assertOutput(
- outputHeaders: JList[JMap[String, String]], outputBodies: JList[String]): Unit = {
- require(outputHeaders.size == outputBodies.size)
- val eventSize = outputHeaders.size
- if (eventSize != totalEventsPerChannel * channels.size) {
- throw new AssertionError(
- s"Expected ${totalEventsPerChannel * channels.size} events, but was $eventSize")
- }
- var counter = 0
- for (k <- 0 until channels.size; i <- 0 until totalEventsPerChannel) {
- val eventBodyToVerify = s"${channels(k).getName}-$i"
- val eventHeaderToVerify: JMap[String, String] = Collections.singletonMap(s"test-$i", "header")
- var found = false
- var j = 0
- while (j < eventSize && !found) {
- if (eventBodyToVerify == outputBodies.get(j) &&
- eventHeaderToVerify == outputHeaders.get(j)) {
- found = true
- counter += 1
- }
- j += 1
- }
- }
- if (counter != totalEventsPerChannel * channels.size) {
- throw new AssertionError(
- s"111 Expected ${totalEventsPerChannel * channels.size} events, but was $counter")
- }
- }
-
- def assertChannelsAreEmpty(): Unit = {
- channels.foreach(assertChannelIsEmpty)
- }
-
- private def assertChannelIsEmpty(channel: MemoryChannel): Unit = {
- val queueRemaining = channel.getClass.getDeclaredField("queueRemaining")
- queueRemaining.setAccessible(true)
- val m = queueRemaining.get(channel).getClass.getDeclaredMethod("availablePermits")
- if (m.invoke(queueRemaining.get(channel)).asInstanceOf[Int] != 5000) {
- throw new AssertionError(s"Channel ${channel.getName} is not empty")
- }
- }
-
- def close(): Unit = {
- sinks.foreach(_.stop())
- sinks.clear()
- channels.foreach(_.stop())
- channels.clear()
- }
-
- private class TxnSubmitter(channel: MemoryChannel) extends Callable[Void] {
- override def call(): Void = {
- var t = 0
- for (i <- 0 until batchCount) {
- val tx = channel.getTransaction
- tx.begin()
- for (j <- 0 until eventsPerBatch) {
- channel.put(EventBuilder.withBody(
- s"${channel.getName}-$t".getBytes(StandardCharsets.UTF_8),
- Collections.singletonMap(s"test-$t", "header")))
- t += 1
- }
- tx.commit()
- tx.close()
- Thread.sleep(500) // Allow some time for the events to reach the sink
- }
- null
- }
- }
-
-}
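The removed FlumePollingStreamSuite below shows the real usage of this helper; a condensed sketch of the intended flow (the polling DStream and the headers/bodies it collects are assumed to exist elsewhere and are placeholders here):

// Sketch only: collectedHeaders and collectedBodies stand in for the output
// gathered from a FlumeUtils.createPollingStream DStream.
val utils = new PollingFlumeTestUtils
try {
  val port = utils.startSingleSink()              // SparkSink bound to an ephemeral port
  // ... start a polling stream against ("localhost", port) and collect its output ...
  utils.sendDatAndEnsureAllDataHasBeenReceived()  // push batchCount batches through the channel
  // utils.assertOutput(collectedHeaders, collectedBodies)
  utils.assertChannelsAreEmpty()
} finally {
  utils.close()
}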
diff --git a/external/flume/src/main/scala/org/apache/spark/streaming/flume/package-info.java b/external/flume/src/main/scala/org/apache/spark/streaming/flume/package-info.java
deleted file mode 100644
index d31aa5f5c0..0000000000
--- a/external/flume/src/main/scala/org/apache/spark/streaming/flume/package-info.java
+++ /dev/null
@@ -1,21 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-/**
- * Spark streaming receiver for Flume.
- */
-package org.apache.spark.streaming.flume; \ No newline at end of file
diff --git a/external/flume/src/main/scala/org/apache/spark/streaming/flume/package.scala b/external/flume/src/main/scala/org/apache/spark/streaming/flume/package.scala
deleted file mode 100644
index 9bfab68c4b..0000000000
--- a/external/flume/src/main/scala/org/apache/spark/streaming/flume/package.scala
+++ /dev/null
@@ -1,23 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.streaming
-
-/**
- * Spark streaming receiver for Flume.
- */
-package object flume
diff --git a/external/flume/src/test/java/org/apache/spark/streaming/LocalJavaStreamingContext.java b/external/flume/src/test/java/org/apache/spark/streaming/LocalJavaStreamingContext.java
deleted file mode 100644
index cfedb5a042..0000000000
--- a/external/flume/src/test/java/org/apache/spark/streaming/LocalJavaStreamingContext.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.streaming;
-
-import org.apache.spark.SparkConf;
-import org.apache.spark.streaming.api.java.JavaStreamingContext;
-import org.junit.After;
-import org.junit.Before;
-
-public abstract class LocalJavaStreamingContext {
-
- protected transient JavaStreamingContext ssc;
-
- @Before
- public void setUp() {
- SparkConf conf = new SparkConf()
- .setMaster("local[2]")
- .setAppName("test")
- .set("spark.streaming.clock", "org.apache.spark.util.ManualClock");
- ssc = new JavaStreamingContext(conf, new Duration(1000));
- ssc.checkpoint("checkpoint");
- }
-
- @After
- public void tearDown() {
- ssc.stop();
- ssc = null;
- }
-}
diff --git a/external/flume/src/test/java/org/apache/spark/streaming/flume/JavaFlumePollingStreamSuite.java b/external/flume/src/test/java/org/apache/spark/streaming/flume/JavaFlumePollingStreamSuite.java
deleted file mode 100644
index 79c5b91654..0000000000
--- a/external/flume/src/test/java/org/apache/spark/streaming/flume/JavaFlumePollingStreamSuite.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.streaming.flume;
-
-import java.net.InetSocketAddress;
-
-import org.apache.spark.storage.StorageLevel;
-import org.apache.spark.streaming.LocalJavaStreamingContext;
-
-import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
-import org.junit.Test;
-
-public class JavaFlumePollingStreamSuite extends LocalJavaStreamingContext {
- @Test
- public void testFlumeStream() {
- // tests the API, does not actually test data receiving
- InetSocketAddress[] addresses = new InetSocketAddress[] {
- new InetSocketAddress("localhost", 12345)
- };
- JavaReceiverInputDStream<SparkFlumeEvent> test1 =
- FlumeUtils.createPollingStream(ssc, "localhost", 12345);
- JavaReceiverInputDStream<SparkFlumeEvent> test2 = FlumeUtils.createPollingStream(
- ssc, "localhost", 12345, StorageLevel.MEMORY_AND_DISK_SER_2());
- JavaReceiverInputDStream<SparkFlumeEvent> test3 = FlumeUtils.createPollingStream(
- ssc, addresses, StorageLevel.MEMORY_AND_DISK_SER_2());
- JavaReceiverInputDStream<SparkFlumeEvent> test4 = FlumeUtils.createPollingStream(
- ssc, addresses, StorageLevel.MEMORY_AND_DISK_SER_2(), 100, 5);
- }
-}
diff --git a/external/flume/src/test/java/org/apache/spark/streaming/flume/JavaFlumeStreamSuite.java b/external/flume/src/test/java/org/apache/spark/streaming/flume/JavaFlumeStreamSuite.java
deleted file mode 100644
index 3b5e0c7746..0000000000
--- a/external/flume/src/test/java/org/apache/spark/streaming/flume/JavaFlumeStreamSuite.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.streaming.flume;
-
-import org.apache.spark.storage.StorageLevel;
-import org.apache.spark.streaming.LocalJavaStreamingContext;
-
-import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
-import org.junit.Test;
-
-public class JavaFlumeStreamSuite extends LocalJavaStreamingContext {
- @Test
- public void testFlumeStream() {
- // tests the API, does not actually test data receiving
- JavaReceiverInputDStream<SparkFlumeEvent> test1 = FlumeUtils.createStream(ssc, "localhost", 12345);
- JavaReceiverInputDStream<SparkFlumeEvent> test2 = FlumeUtils.createStream(ssc, "localhost", 12345,
- StorageLevel.MEMORY_AND_DISK_SER_2());
- JavaReceiverInputDStream<SparkFlumeEvent> test3 = FlumeUtils.createStream(ssc, "localhost", 12345,
- StorageLevel.MEMORY_AND_DISK_SER_2(), false);
- }
-}
diff --git a/external/flume/src/test/resources/log4j.properties b/external/flume/src/test/resources/log4j.properties
deleted file mode 100644
index 75e3b53a09..0000000000
--- a/external/flume/src/test/resources/log4j.properties
+++ /dev/null
@@ -1,28 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-# Set everything to be logged to the file target/unit-tests.log
-log4j.rootCategory=INFO, file
-log4j.appender.file=org.apache.log4j.FileAppender
-log4j.appender.file.append=true
-log4j.appender.file.file=target/unit-tests.log
-log4j.appender.file.layout=org.apache.log4j.PatternLayout
-log4j.appender.file.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss.SSS} %t %p %c{1}: %m%n
-
-# Ignore messages below warning level from Jetty, because it's a bit verbose
-log4j.logger.org.spark-project.jetty=WARN
-
diff --git a/external/flume/src/test/scala/org/apache/spark/streaming/TestOutputStream.scala b/external/flume/src/test/scala/org/apache/spark/streaming/TestOutputStream.scala
deleted file mode 100644
index c97a27ca7c..0000000000
--- a/external/flume/src/test/scala/org/apache/spark/streaming/TestOutputStream.scala
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.streaming
-
-import java.io.{IOException, ObjectInputStream}
-import java.util.concurrent.ConcurrentLinkedQueue
-
-import scala.reflect.ClassTag
-
-import org.apache.spark.rdd.RDD
-import org.apache.spark.streaming.dstream.{DStream, ForEachDStream}
-import org.apache.spark.util.Utils
-
-/**
- * This is an output stream just for the test suites. All the output is collected into a
- * ConcurrentLinkedQueue. This queue is wiped clean when the stream is restored from a checkpoint.
- *
- * The queue contains a sequence of RDD outputs, each containing a sequence of items.
- */
-class TestOutputStream[T: ClassTag](parent: DStream[T],
- val output: ConcurrentLinkedQueue[Seq[T]] = new ConcurrentLinkedQueue[Seq[T]]())
- extends ForEachDStream[T](parent, (rdd: RDD[T], t: Time) => {
- val collected = rdd.collect()
- output.add(collected)
- }, false) {
-
- // This is to clear the output buffer every time it is restored from a checkpoint
- @throws(classOf[IOException])
- private def readObject(ois: ObjectInputStream): Unit = Utils.tryOrIOException {
- ois.defaultReadObject()
- output.clear()
- }
-}
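A short usage sketch of the removed TestOutputStream, assuming `stream` is a placeholder DStream[String] defined on a running StreamingContext:

import java.util.concurrent.ConcurrentLinkedQueue

import scala.collection.JavaConverters._

// Sketch only: `stream` is a placeholder DStream[String].
val outputQueue = new ConcurrentLinkedQueue[Seq[String]]()
val outputStream = new TestOutputStream(stream, outputQueue)
outputStream.register()        // attach as an output operation so batches get collected

// After ssc.start() and a few batches, each element of the queue is one batch's output.
val perBatchOutput: Seq[Seq[String]] = outputQueue.asScala.toSeq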
diff --git a/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumePollingStreamSuite.scala b/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumePollingStreamSuite.scala
deleted file mode 100644
index 10dcbf98bc..0000000000
--- a/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumePollingStreamSuite.scala
+++ /dev/null
@@ -1,129 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.streaming.flume
-
-import java.net.InetSocketAddress
-import java.util.concurrent.ConcurrentLinkedQueue
-
-import scala.collection.JavaConverters._
-import scala.concurrent.duration._
-import scala.language.postfixOps
-
-import org.scalatest.BeforeAndAfter
-import org.scalatest.concurrent.Eventually._
-
-import org.apache.spark.{Logging, SparkConf, SparkFunSuite}
-import org.apache.spark.network.util.JavaUtils
-import org.apache.spark.storage.StorageLevel
-import org.apache.spark.streaming.{Seconds, StreamingContext, TestOutputStream}
-import org.apache.spark.streaming.dstream.ReceiverInputDStream
-import org.apache.spark.util.{ManualClock, Utils}
-
-class FlumePollingStreamSuite extends SparkFunSuite with BeforeAndAfter with Logging {
-
- val maxAttempts = 5
- val batchDuration = Seconds(1)
-
- val conf = new SparkConf()
- .setMaster("local[2]")
- .setAppName(this.getClass.getSimpleName)
- .set("spark.streaming.clock", "org.apache.spark.util.ManualClock")
-
- val utils = new PollingFlumeTestUtils
-
- test("flume polling test") {
- testMultipleTimes(testFlumePolling)
- }
-
- test("flume polling test multiple hosts") {
- testMultipleTimes(testFlumePollingMultipleHost)
- }
-
- /**
- * Run the given test until no more java.net.BindExceptions are thrown.
- * Do this only up to a certain attempt limit.
- */
- private def testMultipleTimes(test: () => Unit): Unit = {
- var testPassed = false
- var attempt = 0
- while (!testPassed && attempt < maxAttempts) {
- try {
- test()
- testPassed = true
- } catch {
- case e: Exception if Utils.isBindCollision(e) =>
- logWarning("Exception when running flume polling test: " + e)
- attempt += 1
- }
- }
- assert(testPassed, s"Test failed after $attempt attempts!")
- }
-
- private def testFlumePolling(): Unit = {
- try {
- val port = utils.startSingleSink()
-
- writeAndVerify(Seq(port))
- utils.assertChannelsAreEmpty()
- } finally {
- utils.close()
- }
- }
-
- private def testFlumePollingMultipleHost(): Unit = {
- try {
- val ports = utils.startMultipleSinks()
- writeAndVerify(ports)
- utils.assertChannelsAreEmpty()
- } finally {
- utils.close()
- }
- }
-
- def writeAndVerify(sinkPorts: Seq[Int]): Unit = {
- // Set up the streaming context and input streams
- val ssc = new StreamingContext(conf, batchDuration)
- val addresses = sinkPorts.map(port => new InetSocketAddress("localhost", port))
- val flumeStream: ReceiverInputDStream[SparkFlumeEvent] =
- FlumeUtils.createPollingStream(ssc, addresses, StorageLevel.MEMORY_AND_DISK,
- utils.eventsPerBatch, 5)
- val outputQueue = new ConcurrentLinkedQueue[Seq[SparkFlumeEvent]]
- val outputStream = new TestOutputStream(flumeStream, outputQueue)
- outputStream.register()
-
- ssc.start()
- try {
- utils.sendDatAndEnsureAllDataHasBeenReceived()
- val clock = ssc.scheduler.clock.asInstanceOf[ManualClock]
- clock.advance(batchDuration.milliseconds)
-
- // The eventually block is required to ensure that all data in the batch has been processed.
- eventually(timeout(10 seconds), interval(100 milliseconds)) {
- val flattenOutput = outputQueue.asScala.toSeq.flatten
- val headers = flattenOutput.map(_.event.getHeaders.asScala.map {
- case (key, value) => (key.toString, value.toString)
- }).map(_.asJava)
- val bodies = flattenOutput.map(e => JavaUtils.bytesToString(e.event.getBody))
- utils.assertOutput(headers.asJava, bodies.asJava)
- }
- } finally {
- ssc.stop()
- }
- }
-
-}
diff --git a/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumeStreamSuite.scala b/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumeStreamSuite.scala
deleted file mode 100644
index 38208c6518..0000000000
--- a/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumeStreamSuite.scala
+++ /dev/null
@@ -1,102 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.streaming.flume
-
-import java.util.concurrent.ConcurrentLinkedQueue
-
-import scala.collection.JavaConverters._
-import scala.concurrent.duration._
-import scala.language.postfixOps
-
-import org.jboss.netty.channel.ChannelPipeline
-import org.jboss.netty.channel.socket.SocketChannel
-import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory
-import org.jboss.netty.handler.codec.compression._
-import org.scalatest.{BeforeAndAfter, Matchers}
-import org.scalatest.concurrent.Eventually._
-
-import org.apache.spark.{Logging, SparkConf, SparkFunSuite}
-import org.apache.spark.network.util.JavaUtils
-import org.apache.spark.storage.StorageLevel
-import org.apache.spark.streaming.{Milliseconds, StreamingContext, TestOutputStream}
-
-class FlumeStreamSuite extends SparkFunSuite with BeforeAndAfter with Matchers with Logging {
- val conf = new SparkConf().setMaster("local[4]").setAppName("FlumeStreamSuite")
- var ssc: StreamingContext = null
-
- test("flume input stream") {
- testFlumeStream(testCompression = false)
- }
-
- test("flume input compressed stream") {
- testFlumeStream(testCompression = true)
- }
-
- /** Run test on flume stream */
- private def testFlumeStream(testCompression: Boolean): Unit = {
- val input = (1 to 100).map { _.toString }
- val utils = new FlumeTestUtils
- try {
- val outputQueue = startContext(utils.getTestPort(), testCompression)
-
- eventually(timeout(10 seconds), interval(100 milliseconds)) {
- utils.writeInput(input.asJava, testCompression)
- }
-
- eventually(timeout(10 seconds), interval(100 milliseconds)) {
- val outputEvents = outputQueue.asScala.toSeq.flatten.map { _.event }
- outputEvents.foreach {
- event =>
- event.getHeaders.get("test") should be("header")
- }
- val output = outputEvents.map(event => JavaUtils.bytesToString(event.getBody))
- output should be (input)
- }
- } finally {
- if (ssc != null) {
- ssc.stop()
- }
- utils.close()
- }
- }
-
- /** Set up and start the streaming context */
- private def startContext(
- testPort: Int, testCompression: Boolean): (ConcurrentLinkedQueue[Seq[SparkFlumeEvent]]) = {
- ssc = new StreamingContext(conf, Milliseconds(200))
- val flumeStream = FlumeUtils.createStream(
- ssc, "localhost", testPort, StorageLevel.MEMORY_AND_DISK, testCompression)
- val outputQueue = new ConcurrentLinkedQueue[Seq[SparkFlumeEvent]]
- val outputStream = new TestOutputStream(flumeStream, outputQueue)
- outputStream.register()
- ssc.start()
- outputQueue
- }
-
- /** Class to create socket channel with compression */
- private class CompressionChannelFactory(compressionLevel: Int)
- extends NioClientSocketChannelFactory {
-
- override def newChannel(pipeline: ChannelPipeline): SocketChannel = {
- val encoder = new ZlibEncoder(compressionLevel)
- pipeline.addFirst("deflater", encoder)
- pipeline.addFirst("inflater", new ZlibDecoder())
- super.newChannel(pipeline)
- }
- }
-}
diff --git a/external/mqtt-assembly/pom.xml b/external/mqtt-assembly/pom.xml
deleted file mode 100644
index ac2a3f65ed..0000000000
--- a/external/mqtt-assembly/pom.xml
+++ /dev/null
@@ -1,175 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
- ~ Licensed to the Apache Software Foundation (ASF) under one or more
- ~ contributor license agreements. See the NOTICE file distributed with
- ~ this work for additional information regarding copyright ownership.
- ~ The ASF licenses this file to You under the Apache License, Version 2.0
- ~ (the "License"); you may not use this file except in compliance with
- ~ the License. You may obtain a copy of the License at
- ~
- ~ http://www.apache.org/licenses/LICENSE-2.0
- ~
- ~ Unless required by applicable law or agreed to in writing, software
- ~ distributed under the License is distributed on an "AS IS" BASIS,
- ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- ~ See the License for the specific language governing permissions and
- ~ limitations under the License.
- -->
-
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
- <modelVersion>4.0.0</modelVersion>
- <parent>
- <groupId>org.apache.spark</groupId>
- <artifactId>spark-parent_2.11</artifactId>
- <version>2.0.0-SNAPSHOT</version>
- <relativePath>../../pom.xml</relativePath>
- </parent>
-
- <groupId>org.apache.spark</groupId>
- <artifactId>spark-streaming-mqtt-assembly_2.11</artifactId>
- <packaging>jar</packaging>
- <name>Spark Project External MQTT Assembly</name>
- <url>http://spark.apache.org/</url>
-
- <properties>
- <sbt.project.name>streaming-mqtt-assembly</sbt.project.name>
- </properties>
-
- <dependencies>
- <dependency>
- <groupId>org.apache.spark</groupId>
- <artifactId>spark-streaming-mqtt_${scala.binary.version}</artifactId>
- <version>${project.version}</version>
- </dependency>
- <dependency>
- <groupId>org.apache.spark</groupId>
- <artifactId>spark-streaming_${scala.binary.version}</artifactId>
- <version>${project.version}</version>
- <scope>provided</scope>
- </dependency>
- <!--
- Demote dependencies that are already included in the Spark assembly to provided scope.
- -->
- <dependency>
- <groupId>commons-lang</groupId>
- <artifactId>commons-lang</artifactId>
- <scope>provided</scope>
- </dependency>
- <dependency>
- <groupId>com.google.protobuf</groupId>
- <artifactId>protobuf-java</artifactId>
- <scope>provided</scope>
- </dependency>
- <dependency>
- <groupId>com.sun.jersey</groupId>
- <artifactId>jersey-server</artifactId>
- <scope>provided</scope>
- </dependency>
- <dependency>
- <groupId>com.sun.jersey</groupId>
- <artifactId>jersey-core</artifactId>
- <scope>provided</scope>
- </dependency>
- <dependency>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-client</artifactId>
- <scope>provided</scope>
- </dependency>
- <dependency>
- <groupId>org.apache.avro</groupId>
- <artifactId>avro-mapred</artifactId>
- <classifier>${avro.mapred.classifier}</classifier>
- <scope>provided</scope>
- </dependency>
- <dependency>
- <groupId>org.apache.curator</groupId>
- <artifactId>curator-recipes</artifactId>
- <scope>provided</scope>
- </dependency>
- <dependency>
- <groupId>org.apache.zookeeper</groupId>
- <artifactId>zookeeper</artifactId>
- <scope>provided</scope>
- </dependency>
- <dependency>
- <groupId>log4j</groupId>
- <artifactId>log4j</artifactId>
- <scope>provided</scope>
- </dependency>
- <dependency>
- <groupId>net.java.dev.jets3t</groupId>
- <artifactId>jets3t</artifactId>
- <scope>provided</scope>
- </dependency>
- <dependency>
- <groupId>org.scala-lang</groupId>
- <artifactId>scala-library</artifactId>
- <scope>provided</scope>
- </dependency>
- <dependency>
- <groupId>org.slf4j</groupId>
- <artifactId>slf4j-api</artifactId>
- <scope>provided</scope>
- </dependency>
- <dependency>
- <groupId>org.slf4j</groupId>
- <artifactId>slf4j-log4j12</artifactId>
- <scope>provided</scope>
- </dependency>
- <dependency>
- <groupId>org.xerial.snappy</groupId>
- <artifactId>snappy-java</artifactId>
- <scope>provided</scope>
- </dependency>
- </dependencies>
-
- <build>
- <outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory>
- <testOutputDirectory>target/scala-${scala.binary.version}/test-classes</testOutputDirectory>
- <plugins>
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-shade-plugin</artifactId>
- <configuration>
- <shadedArtifactAttached>false</shadedArtifactAttached>
- <artifactSet>
- <includes>
- <include>*:*</include>
- </includes>
- </artifactSet>
- <filters>
- <filter>
- <artifact>*:*</artifact>
- <excludes>
- <exclude>META-INF/*.SF</exclude>
- <exclude>META-INF/*.DSA</exclude>
- <exclude>META-INF/*.RSA</exclude>
- </excludes>
- </filter>
- </filters>
- </configuration>
- <executions>
- <execution>
- <phase>package</phase>
- <goals>
- <goal>shade</goal>
- </goals>
- <configuration>
- <transformers>
- <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
- <transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
- <resource>reference.conf</resource>
- </transformer>
- <transformer implementation="org.apache.maven.plugins.shade.resource.DontIncludeResourceTransformer">
- <resource>log4j.properties</resource>
- </transformer>
- <transformer implementation="org.apache.maven.plugins.shade.resource.ApacheLicenseResourceTransformer"/>
- <transformer implementation="org.apache.maven.plugins.shade.resource.ApacheNoticeResourceTransformer"/>
- </transformers>
- </configuration>
- </execution>
- </executions>
- </plugin>
- </plugins>
- </build>
-</project>
diff --git a/external/mqtt/pom.xml b/external/mqtt/pom.xml
deleted file mode 100644
index d0d968782c..0000000000
--- a/external/mqtt/pom.xml
+++ /dev/null
@@ -1,104 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
- ~ Licensed to the Apache Software Foundation (ASF) under one or more
- ~ contributor license agreements. See the NOTICE file distributed with
- ~ this work for additional information regarding copyright ownership.
- ~ The ASF licenses this file to You under the Apache License, Version 2.0
- ~ (the "License"); you may not use this file except in compliance with
- ~ the License. You may obtain a copy of the License at
- ~
- ~ http://www.apache.org/licenses/LICENSE-2.0
- ~
- ~ Unless required by applicable law or agreed to in writing, software
- ~ distributed under the License is distributed on an "AS IS" BASIS,
- ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- ~ See the License for the specific language governing permissions and
- ~ limitations under the License.
- -->
-
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
- <modelVersion>4.0.0</modelVersion>
- <parent>
- <groupId>org.apache.spark</groupId>
- <artifactId>spark-parent_2.11</artifactId>
- <version>2.0.0-SNAPSHOT</version>
- <relativePath>../../pom.xml</relativePath>
- </parent>
-
- <groupId>org.apache.spark</groupId>
- <artifactId>spark-streaming-mqtt_2.11</artifactId>
- <properties>
- <sbt.project.name>streaming-mqtt</sbt.project.name>
- </properties>
- <packaging>jar</packaging>
- <name>Spark Project External MQTT</name>
- <url>http://spark.apache.org/</url>
-
- <dependencies>
- <dependency>
- <groupId>org.apache.spark</groupId>
- <artifactId>spark-streaming_${scala.binary.version}</artifactId>
- <version>${project.version}</version>
- <scope>provided</scope>
- </dependency>
- <dependency>
- <groupId>org.apache.spark</groupId>
- <artifactId>spark-core_${scala.binary.version}</artifactId>
- <version>${project.version}</version>
- <type>test-jar</type>
- <scope>test</scope>
- </dependency>
- <dependency>
- <groupId>org.eclipse.paho</groupId>
- <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
- <version>1.0.2</version>
- </dependency>
- <dependency>
- <groupId>org.scalacheck</groupId>
- <artifactId>scalacheck_${scala.binary.version}</artifactId>
- <scope>test</scope>
- </dependency>
- <dependency>
- <groupId>org.apache.activemq</groupId>
- <artifactId>activemq-core</artifactId>
- <version>5.7.0</version>
- <scope>test</scope>
- </dependency>
- <dependency>
- <groupId>org.apache.spark</groupId>
- <artifactId>spark-test-tags_${scala.binary.version}</artifactId>
- </dependency>
- </dependencies>
- <build>
- <outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory>
- <testOutputDirectory>target/scala-${scala.binary.version}/test-classes</testOutputDirectory>
-
- <plugins>
- <!-- Assemble a jar with test dependencies for Python tests -->
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-assembly-plugin</artifactId>
- <executions>
- <execution>
- <id>test-jar-with-dependencies</id>
- <phase>package</phase>
- <goals>
- <goal>single</goal>
- </goals>
- <configuration>
- <!-- Make sure the file path is same as the sbt build -->
- <finalName>spark-streaming-mqtt-test-${project.version}</finalName>
- <outputDirectory>${project.build.directory}/scala-${scala.binary.version}/</outputDirectory>
- <appendAssemblyId>false</appendAssemblyId>
- <!-- Don't publish it since it's only for Python tests -->
- <attach>false</attach>
- <descriptors>
- <descriptor>src/main/assembly/assembly.xml</descriptor>
- </descriptors>
- </configuration>
- </execution>
- </executions>
- </plugin>
- </plugins>
- </build>
-</project>
diff --git a/external/mqtt/src/main/assembly/assembly.xml b/external/mqtt/src/main/assembly/assembly.xml
deleted file mode 100644
index c110b01b34..0000000000
--- a/external/mqtt/src/main/assembly/assembly.xml
+++ /dev/null
@@ -1,44 +0,0 @@
-<!--
- ~ Licensed to the Apache Software Foundation (ASF) under one or more
- ~ contributor license agreements. See the NOTICE file distributed with
- ~ this work for additional information regarding copyright ownership.
- ~ The ASF licenses this file to You under the Apache License, Version 2.0
- ~ (the "License"); you may not use this file except in compliance with
- ~ the License. You may obtain a copy of the License at
- ~
- ~ http://www.apache.org/licenses/LICENSE-2.0
- ~
- ~ Unless required by applicable law or agreed to in writing, software
- ~ distributed under the License is distributed on an "AS IS" BASIS,
- ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- ~ See the License for the specific language governing permissions and
- ~ limitations under the License.
- -->
-<assembly>
- <id>test-jar-with-dependencies</id>
- <formats>
- <format>jar</format>
- </formats>
- <includeBaseDirectory>false</includeBaseDirectory>
-
- <fileSets>
- <fileSet>
- <directory>${project.build.directory}/scala-${scala.binary.version}/test-classes</directory>
- <outputDirectory></outputDirectory>
- </fileSet>
- </fileSets>
-
- <dependencySets>
- <dependencySet>
- <useTransitiveDependencies>true</useTransitiveDependencies>
- <scope>test</scope>
- <unpack>true</unpack>
- <excludes>
- <exclude>org.apache.hadoop:*:jar</exclude>
- <exclude>org.apache.zookeeper:*:jar</exclude>
- <exclude>org.apache.avro:*:jar</exclude>
- </excludes>
- </dependencySet>
- </dependencySets>
-
-</assembly>
diff --git a/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTInputDStream.scala b/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTInputDStream.scala
deleted file mode 100644
index cbad6f7fe4..0000000000
--- a/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTInputDStream.scala
+++ /dev/null
@@ -1,102 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.streaming.mqtt
-
-import java.nio.charset.StandardCharsets
-
-import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken
-import org.eclipse.paho.client.mqttv3.MqttCallback
-import org.eclipse.paho.client.mqttv3.MqttClient
-import org.eclipse.paho.client.mqttv3.MqttMessage
-import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence
-
-import org.apache.spark.storage.StorageLevel
-import org.apache.spark.streaming.StreamingContext
-import org.apache.spark.streaming.dstream._
-import org.apache.spark.streaming.receiver.Receiver
-
-/**
- * Input stream that subscribes to messages from an MQTT broker.
- * Uses the Eclipse Paho MqttClient: http://www.eclipse.org/paho/
- * @param brokerUrl URL of the remote MQTT broker
- * @param topic Topic name to subscribe to
- * @param storageLevel RDD storage level.
- */
-
-private[streaming]
-class MQTTInputDStream(
- _ssc: StreamingContext,
- brokerUrl: String,
- topic: String,
- storageLevel: StorageLevel
- ) extends ReceiverInputDStream[String](_ssc) {
-
- private[streaming] override def name: String = s"MQTT stream [$id]"
-
- def getReceiver(): Receiver[String] = {
- new MQTTReceiver(brokerUrl, topic, storageLevel)
- }
-}
-
-private[streaming]
-class MQTTReceiver(
- brokerUrl: String,
- topic: String,
- storageLevel: StorageLevel
- ) extends Receiver[String](storageLevel) {
-
- def onStop() {
-
- }
-
- def onStart() {
-
- // Set up persistence for messages
- val persistence = new MemoryPersistence()
-
- // Initialize the MqttClient, specifying the broker URL, client ID and MqttClientPersistence
- val client = new MqttClient(brokerUrl, MqttClient.generateClientId(), persistence)
-
- // The callback is triggered automatically whenever a new message arrives on the subscribed topic
- val callback = new MqttCallback() {
-
- // Handles Mqtt message
- override def messageArrived(topic: String, message: MqttMessage) {
- store(new String(message.getPayload(), StandardCharsets.UTF_8))
- }
-
- override def deliveryComplete(token: IMqttDeliveryToken) {
- }
-
- override def connectionLost(cause: Throwable) {
- restart("Connection lost ", cause)
- }
- }
-
- // Set up callback for MqttClient. This needs to happen before
- // connecting or subscribing, otherwise messages may be lost
- client.setCallback(callback)
-
- // Connect to MqttBroker
- client.connect()
-
- // Subscribe to Mqtt topic
- client.subscribe(topic)
-
- }
-}
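For completeness, a minimal Paho publisher that could feed the receiver above; the broker URL and topic are placeholders, and this is a sketch rather than part of the removed code:

import java.nio.charset.StandardCharsets

import org.eclipse.paho.client.mqttv3.{MqttClient, MqttMessage}
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence

// Placeholder broker URL and topic; any MQTT broker reachable by the receiver works.
val client = new MqttClient("tcp://localhost:1883", MqttClient.generateClientId(),
  new MemoryPersistence())
client.connect()
// Publish a single UTF-8 message to the topic the receiver is subscribed to.
client.publish("test-topic", new MqttMessage("hello".getBytes(StandardCharsets.UTF_8)))
client.disconnect()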
diff --git a/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTUtils.scala b/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTUtils.scala
deleted file mode 100644
index 7b8d56d6fa..0000000000
--- a/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTUtils.scala
+++ /dev/null
@@ -1,92 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.streaming.mqtt
-
-import scala.reflect.ClassTag
-
-import org.apache.spark.storage.StorageLevel
-import org.apache.spark.streaming.StreamingContext
-import org.apache.spark.streaming.api.java.{JavaDStream, JavaReceiverInputDStream, JavaStreamingContext}
-import org.apache.spark.streaming.dstream.ReceiverInputDStream
-
-object MQTTUtils {
- /**
- * Create an input stream that receives messages pushed by an MQTT publisher.
- * @param ssc StreamingContext object
- * @param brokerUrl URL of the remote MQTT publisher
- * @param topic Topic name to subscribe to
- * @param storageLevel RDD storage level. Defaults to StorageLevel.MEMORY_AND_DISK_SER_2.
- */
- def createStream(
- ssc: StreamingContext,
- brokerUrl: String,
- topic: String,
- storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
- ): ReceiverInputDStream[String] = {
- new MQTTInputDStream(ssc, brokerUrl, topic, storageLevel)
- }
-
- /**
- * Create an input stream that receives messages pushed by an MQTT publisher.
- * Storage level of the data will be the default StorageLevel.MEMORY_AND_DISK_SER_2.
- * @param jssc JavaStreamingContext object
- * @param brokerUrl URL of the remote MQTT publisher
- * @param topic Topic name to subscribe to
- */
- def createStream(
- jssc: JavaStreamingContext,
- brokerUrl: String,
- topic: String
- ): JavaReceiverInputDStream[String] = {
- implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[String]]
- createStream(jssc.ssc, brokerUrl, topic)
- }
-
- /**
- * Create an input stream that receives messages pushed by an MQTT publisher.
- * @param jssc JavaStreamingContext object
- * @param brokerUrl URL of the remote MQTT publisher
- * @param topic Topic name to subscribe to
- * @param storageLevel RDD storage level.
- */
- def createStream(
- jssc: JavaStreamingContext,
- brokerUrl: String,
- topic: String,
- storageLevel: StorageLevel
- ): JavaReceiverInputDStream[String] = {
- implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[String]]
- createStream(jssc.ssc, brokerUrl, topic, storageLevel)
- }
-}
-
-/**
- * This is a helper class that wraps the methods in MQTTUtils into more Python-friendly class and
- * function so that it can be easily instantiated and called from Python's MQTTUtils.
- */
-private[mqtt] class MQTTUtilsPythonHelper {
-
- def createStream(
- jssc: JavaStreamingContext,
- brokerUrl: String,
- topic: String,
- storageLevel: StorageLevel
- ): JavaDStream[String] = {
- MQTTUtils.createStream(jssc, brokerUrl, topic, storageLevel)
- }
-}
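For reference, a minimal usage sketch of the Scala API removed here (broker URL, topic, and batch interval are illustrative placeholders, not values from this change):

    import org.apache.spark.SparkConf
    import org.apache.spark.storage.StorageLevel
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import org.apache.spark.streaming.mqtt.MQTTUtils

    object MQTTWordCount {
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf().setMaster("local[2]").setAppName("MQTTWordCount")
        val ssc = new StreamingContext(conf, Seconds(2))
        // Each received MQTT payload arrives as one UTF-8 string in the DStream
        val lines = MQTTUtils.createStream(ssc, "tcp://localhost:1883", "foo",
          StorageLevel.MEMORY_ONLY_SER)
        lines.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _).print()
        ssc.start()
        ssc.awaitTermination()
      }
    }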
diff --git a/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/package-info.java b/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/package-info.java
deleted file mode 100644
index 728e0d8663..0000000000
--- a/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/package-info.java
+++ /dev/null
@@ -1,21 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-/**
- * MQTT receiver for Spark Streaming.
- */
-package org.apache.spark.streaming.mqtt; \ No newline at end of file
diff --git a/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/package.scala b/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/package.scala
deleted file mode 100644
index 63d0d13818..0000000000
--- a/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/package.scala
+++ /dev/null
@@ -1,23 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.streaming
-
-/**
- * MQTT receiver for Spark Streaming.
- */
-package object mqtt
diff --git a/external/mqtt/src/test/java/org/apache/spark/streaming/LocalJavaStreamingContext.java b/external/mqtt/src/test/java/org/apache/spark/streaming/LocalJavaStreamingContext.java
deleted file mode 100644
index cfedb5a042..0000000000
--- a/external/mqtt/src/test/java/org/apache/spark/streaming/LocalJavaStreamingContext.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.streaming;
-
-import org.apache.spark.SparkConf;
-import org.apache.spark.streaming.api.java.JavaStreamingContext;
-import org.junit.After;
-import org.junit.Before;
-
-public abstract class LocalJavaStreamingContext {
-
- protected transient JavaStreamingContext ssc;
-
- @Before
- public void setUp() {
- SparkConf conf = new SparkConf()
- .setMaster("local[2]")
- .setAppName("test")
- .set("spark.streaming.clock", "org.apache.spark.util.ManualClock");
- ssc = new JavaStreamingContext(conf, new Duration(1000));
- ssc.checkpoint("checkpoint");
- }
-
- @After
- public void tearDown() {
- ssc.stop();
- ssc = null;
- }
-}
diff --git a/external/mqtt/src/test/java/org/apache/spark/streaming/mqtt/JavaMQTTStreamSuite.java b/external/mqtt/src/test/java/org/apache/spark/streaming/mqtt/JavaMQTTStreamSuite.java
deleted file mode 100644
index ce5aa1e0cd..0000000000
--- a/external/mqtt/src/test/java/org/apache/spark/streaming/mqtt/JavaMQTTStreamSuite.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.streaming.mqtt;
-
-import org.apache.spark.storage.StorageLevel;
-import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
-import org.junit.Test;
-
-import org.apache.spark.streaming.LocalJavaStreamingContext;
-
-public class JavaMQTTStreamSuite extends LocalJavaStreamingContext {
- @Test
- public void testMQTTStream() {
- String brokerUrl = "abc";
- String topic = "def";
-
- // tests the API, does not actually test data receiving
- JavaReceiverInputDStream<String> test1 = MQTTUtils.createStream(ssc, brokerUrl, topic);
- JavaReceiverInputDStream<String> test2 = MQTTUtils.createStream(ssc, brokerUrl, topic,
- StorageLevel.MEMORY_AND_DISK_SER_2());
- }
-}
diff --git a/external/mqtt/src/test/resources/log4j.properties b/external/mqtt/src/test/resources/log4j.properties
deleted file mode 100644
index 75e3b53a09..0000000000
--- a/external/mqtt/src/test/resources/log4j.properties
+++ /dev/null
@@ -1,28 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-# Set everything to be logged to the file target/unit-tests.log
-log4j.rootCategory=INFO, file
-log4j.appender.file=org.apache.log4j.FileAppender
-log4j.appender.file.append=true
-log4j.appender.file.file=target/unit-tests.log
-log4j.appender.file.layout=org.apache.log4j.PatternLayout
-log4j.appender.file.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss.SSS} %t %p %c{1}: %m%n
-
-# Ignore messages below warning level from Jetty, because it's a bit verbose
-log4j.logger.org.spark-project.jetty=WARN
-
diff --git a/external/mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTStreamSuite.scala b/external/mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTStreamSuite.scala
deleted file mode 100644
index fdcd18c6fb..0000000000
--- a/external/mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTStreamSuite.scala
+++ /dev/null
@@ -1,79 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.streaming.mqtt
-
-import scala.concurrent.duration._
-import scala.language.postfixOps
-
-import org.scalatest.BeforeAndAfter
-import org.scalatest.concurrent.Eventually
-
-import org.apache.spark.SparkFunSuite
-import org.apache.spark.storage.StorageLevel
-import org.apache.spark.streaming.{Milliseconds, StreamingContext}
-
-class MQTTStreamSuite extends SparkFunSuite with Eventually with BeforeAndAfter {
-
- private val batchDuration = Milliseconds(500)
- private val master = "local[2]"
- private val framework = this.getClass.getSimpleName
- private val topic = "def"
-
- private var ssc: StreamingContext = _
- private var mqttTestUtils: MQTTTestUtils = _
-
- before {
- ssc = new StreamingContext(master, framework, batchDuration)
- mqttTestUtils = new MQTTTestUtils
- mqttTestUtils.setup()
- }
-
- after {
- if (ssc != null) {
- ssc.stop()
- ssc = null
- }
- if (mqttTestUtils != null) {
- mqttTestUtils.teardown()
- mqttTestUtils = null
- }
- }
-
- test("mqtt input stream") {
- val sendMessage = "MQTT demo for spark streaming"
- val receiveStream = MQTTUtils.createStream(ssc, "tcp://" + mqttTestUtils.brokerUri, topic,
- StorageLevel.MEMORY_ONLY)
-
- @volatile var receiveMessage: List[String] = List()
- receiveStream.foreachRDD { rdd =>
- if (rdd.collect.length > 0) {
- receiveMessage = receiveMessage ::: List(rdd.first)
- receiveMessage
- }
- }
-
- ssc.start()
-
- // Retry it because we don't know when the receiver will start.
- eventually(timeout(10000 milliseconds), interval(100 milliseconds)) {
- mqttTestUtils.publishData(topic, sendMessage)
- assert(sendMessage.equals(receiveMessage(0)))
- }
- ssc.stop()
- }
-}
diff --git a/external/mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTTestUtils.scala b/external/mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTTestUtils.scala
deleted file mode 100644
index 3680c13605..0000000000
--- a/external/mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTTestUtils.scala
+++ /dev/null
@@ -1,111 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.streaming.mqtt
-
-import java.net.{ServerSocket, URI}
-import java.nio.charset.StandardCharsets
-
-import scala.language.postfixOps
-
-import org.apache.activemq.broker.{BrokerService, TransportConnector}
-import org.apache.commons.lang3.RandomUtils
-import org.eclipse.paho.client.mqttv3._
-import org.eclipse.paho.client.mqttv3.persist.MqttDefaultFilePersistence
-
-import org.apache.spark.{Logging, SparkConf}
-import org.apache.spark.util.Utils
-
-/**
- * Shared code for the Scala and Python unit tests
- */
-private[mqtt] class MQTTTestUtils extends Logging {
-
- private val persistenceDir = Utils.createTempDir()
- private val brokerHost = "localhost"
- private val brokerPort = findFreePort()
-
- private var broker: BrokerService = _
- private var connector: TransportConnector = _
-
- def brokerUri: String = {
- s"$brokerHost:$brokerPort"
- }
-
- def setup(): Unit = {
- broker = new BrokerService()
- broker.setDataDirectoryFile(Utils.createTempDir())
- connector = new TransportConnector()
- connector.setName("mqtt")
- connector.setUri(new URI("mqtt://" + brokerUri))
- broker.addConnector(connector)
- broker.start()
- }
-
- def teardown(): Unit = {
- if (broker != null) {
- broker.stop()
- broker = null
- }
- if (connector != null) {
- connector.stop()
- connector = null
- }
- Utils.deleteRecursively(persistenceDir)
- }
-
- private def findFreePort(): Int = {
- val candidatePort = RandomUtils.nextInt(1024, 65536)
- Utils.startServiceOnPort(candidatePort, (trialPort: Int) => {
- val socket = new ServerSocket(trialPort)
- socket.close()
- (null, trialPort)
- }, new SparkConf())._2
- }
-
- def publishData(topic: String, data: String): Unit = {
- var client: MqttClient = null
- try {
- val persistence = new MqttDefaultFilePersistence(persistenceDir.getAbsolutePath)
- client = new MqttClient("tcp://" + brokerUri, MqttClient.generateClientId(), persistence)
- client.connect()
- if (client.isConnected) {
- val msgTopic = client.getTopic(topic)
- val message = new MqttMessage(data.getBytes(StandardCharsets.UTF_8))
- message.setQos(1)
- message.setRetained(true)
-
- for (i <- 0 to 10) {
- try {
- msgTopic.publish(message)
- } catch {
- case e: MqttException if e.getReasonCode == MqttException.REASON_CODE_MAX_INFLIGHT =>
- // wait for Spark streaming to consume something from the message queue
- Thread.sleep(50)
- }
- }
- }
- } finally {
- if (client != null) {
- client.disconnect()
- client.close()
- client = null
- }
- }
- }
-
-}
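A hedged sketch of how this helper is typically driven from a test (it would need to live in the org.apache.spark.streaming.mqtt package, since MQTTTestUtils is private[mqtt]; topic and payload are illustrative):

    val mqttTestUtils = new MQTTTestUtils
    mqttTestUtils.setup()                                  // starts an embedded ActiveMQ MQTT connector on a free port
    try {
      val brokerUrl = "tcp://" + mqttTestUtils.brokerUri   // what the suites pass to MQTTUtils.createStream
      mqttTestUtils.publishData("def", "hello")            // retries while the client's in-flight window is full
    } finally {
      mqttTestUtils.teardown()
    }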
diff --git a/external/twitter/pom.xml b/external/twitter/pom.xml
deleted file mode 100644
index 5d4053afcb..0000000000
--- a/external/twitter/pom.xml
+++ /dev/null
@@ -1,70 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
- ~ Licensed to the Apache Software Foundation (ASF) under one or more
- ~ contributor license agreements. See the NOTICE file distributed with
- ~ this work for additional information regarding copyright ownership.
- ~ The ASF licenses this file to You under the Apache License, Version 2.0
- ~ (the "License"); you may not use this file except in compliance with
- ~ the License. You may obtain a copy of the License at
- ~
- ~ http://www.apache.org/licenses/LICENSE-2.0
- ~
- ~ Unless required by applicable law or agreed to in writing, software
- ~ distributed under the License is distributed on an "AS IS" BASIS,
- ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- ~ See the License for the specific language governing permissions and
- ~ limitations under the License.
- -->
-
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
- <modelVersion>4.0.0</modelVersion>
- <parent>
- <groupId>org.apache.spark</groupId>
- <artifactId>spark-parent_2.11</artifactId>
- <version>2.0.0-SNAPSHOT</version>
- <relativePath>../../pom.xml</relativePath>
- </parent>
-
- <groupId>org.apache.spark</groupId>
- <artifactId>spark-streaming-twitter_2.11</artifactId>
- <properties>
- <sbt.project.name>streaming-twitter</sbt.project.name>
- </properties>
- <packaging>jar</packaging>
- <name>Spark Project External Twitter</name>
- <url>http://spark.apache.org/</url>
-
- <dependencies>
- <dependency>
- <groupId>org.apache.spark</groupId>
- <artifactId>spark-streaming_${scala.binary.version}</artifactId>
- <version>${project.version}</version>
- <scope>provided</scope>
- </dependency>
- <dependency>
- <groupId>org.apache.spark</groupId>
- <artifactId>spark-core_${scala.binary.version}</artifactId>
- <version>${project.version}</version>
- <type>test-jar</type>
- <scope>test</scope>
- </dependency>
- <dependency>
- <groupId>org.twitter4j</groupId>
- <artifactId>twitter4j-stream</artifactId>
- <version>4.0.4</version>
- </dependency>
- <dependency>
- <groupId>org.scalacheck</groupId>
- <artifactId>scalacheck_${scala.binary.version}</artifactId>
- <scope>test</scope>
- </dependency>
- <dependency>
- <groupId>org.apache.spark</groupId>
- <artifactId>spark-test-tags_${scala.binary.version}</artifactId>
- </dependency>
- </dependencies>
- <build>
- <outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory>
- <testOutputDirectory>target/scala-${scala.binary.version}/test-classes</testOutputDirectory>
- </build>
-</project>
diff --git a/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/TwitterInputDStream.scala b/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/TwitterInputDStream.scala
deleted file mode 100644
index bdd57fdde3..0000000000
--- a/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/TwitterInputDStream.scala
+++ /dev/null
@@ -1,115 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.streaming.twitter
-
-import twitter4j._
-import twitter4j.auth.Authorization
-import twitter4j.auth.OAuthAuthorization
-import twitter4j.conf.ConfigurationBuilder
-
-import org.apache.spark.Logging
-import org.apache.spark.storage.StorageLevel
-import org.apache.spark.streaming._
-import org.apache.spark.streaming.dstream._
-import org.apache.spark.streaming.receiver.Receiver
-
-/* A stream of Twitter statuses, potentially filtered by one or more keywords.
-*
-* @constructor create a new Twitter stream using the supplied Twitter4J authentication credentials.
-* An optional set of string filters can be used to restrict the set of tweets. The Twitter API is
-* such that this may return a sampled subset of all tweets during each interval.
-*
-* If no Authorization object is provided, initializes OAuth authorization using the system
-* properties twitter4j.oauth.consumerKey, .consumerSecret, .accessToken and .accessTokenSecret.
-*/
-private[streaming]
-class TwitterInputDStream(
- _ssc: StreamingContext,
- twitterAuth: Option[Authorization],
- filters: Seq[String],
- storageLevel: StorageLevel
- ) extends ReceiverInputDStream[Status](_ssc) {
-
- private def createOAuthAuthorization(): Authorization = {
- new OAuthAuthorization(new ConfigurationBuilder().build())
- }
-
- private val authorization = twitterAuth.getOrElse(createOAuthAuthorization())
-
- override def getReceiver(): Receiver[Status] = {
- new TwitterReceiver(authorization, filters, storageLevel)
- }
-}
-
-private[streaming]
-class TwitterReceiver(
- twitterAuth: Authorization,
- filters: Seq[String],
- storageLevel: StorageLevel
- ) extends Receiver[Status](storageLevel) with Logging {
-
- @volatile private var twitterStream: TwitterStream = _
- @volatile private var stopped = false
-
- def onStart() {
- try {
- val newTwitterStream = new TwitterStreamFactory().getInstance(twitterAuth)
- newTwitterStream.addListener(new StatusListener {
- def onStatus(status: Status): Unit = {
- store(status)
- }
- // Unimplemented
- def onDeletionNotice(statusDeletionNotice: StatusDeletionNotice) {}
- def onTrackLimitationNotice(i: Int) {}
- def onScrubGeo(l: Long, l1: Long) {}
- def onStallWarning(stallWarning: StallWarning) {}
- def onException(e: Exception) {
- if (!stopped) {
- restart("Error receiving tweets", e)
- }
- }
- })
-
- val query = new FilterQuery
- if (filters.size > 0) {
- query.track(filters.mkString(","))
- newTwitterStream.filter(query)
- } else {
- newTwitterStream.sample()
- }
- setTwitterStream(newTwitterStream)
- logInfo("Twitter receiver started")
- stopped = false
- } catch {
- case e: Exception => restart("Error starting Twitter stream", e)
- }
- }
-
- def onStop() {
- stopped = true
- setTwitterStream(null)
- logInfo("Twitter receiver stopped")
- }
-
- private def setTwitterStream(newTwitterStream: TwitterStream) = synchronized {
- if (twitterStream != null) {
- twitterStream.shutdown()
- }
- twitterStream = newTwitterStream
- }
-}
diff --git a/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/TwitterUtils.scala b/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/TwitterUtils.scala
deleted file mode 100644
index 9cb0106ab1..0000000000
--- a/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/TwitterUtils.scala
+++ /dev/null
@@ -1,132 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.streaming.twitter
-
-import twitter4j.Status
-import twitter4j.auth.Authorization
-
-import org.apache.spark.storage.StorageLevel
-import org.apache.spark.streaming.StreamingContext
-import org.apache.spark.streaming.api.java.{JavaReceiverInputDStream, JavaStreamingContext}
-import org.apache.spark.streaming.dstream.ReceiverInputDStream
-
-object TwitterUtils {
- /**
- * Create an input stream that returns tweets received from Twitter.
- * @param ssc StreamingContext object
- * @param twitterAuth Twitter4J authentication, or None to use Twitter4J's default OAuth
- * authorization; this uses the system properties twitter4j.oauth.consumerKey,
- * twitter4j.oauth.consumerSecret, twitter4j.oauth.accessToken and
- * twitter4j.oauth.accessTokenSecret
- * @param filters Set of filter strings to get only those tweets that match them
- * @param storageLevel Storage level to use for storing the received objects
- */
- def createStream(
- ssc: StreamingContext,
- twitterAuth: Option[Authorization],
- filters: Seq[String] = Nil,
- storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
- ): ReceiverInputDStream[Status] = {
- new TwitterInputDStream(ssc, twitterAuth, filters, storageLevel)
- }
-
- /**
- * Create an input stream that returns tweets received from Twitter using Twitter4J's default
- * OAuth authentication; this requires the system properties twitter4j.oauth.consumerKey,
- * twitter4j.oauth.consumerSecret, twitter4j.oauth.accessToken and
- * twitter4j.oauth.accessTokenSecret.
- * Storage level of the data will be the default StorageLevel.MEMORY_AND_DISK_SER_2.
- * @param jssc JavaStreamingContext object
- */
- def createStream(jssc: JavaStreamingContext): JavaReceiverInputDStream[Status] = {
- createStream(jssc.ssc, None)
- }
-
- /**
- * Create an input stream that returns tweets received from Twitter using Twitter4J's default
- * OAuth authentication; this requires the system properties twitter4j.oauth.consumerKey,
- * twitter4j.oauth.consumerSecret, twitter4j.oauth.accessToken and
- * twitter4j.oauth.accessTokenSecret.
- * Storage level of the data will be the default StorageLevel.MEMORY_AND_DISK_SER_2.
- * @param jssc JavaStreamingContext object
- * @param filters Set of filter strings to get only those tweets that match them
- */
- def createStream(jssc: JavaStreamingContext, filters: Array[String]
- ): JavaReceiverInputDStream[Status] = {
- createStream(jssc.ssc, None, filters)
- }
-
- /**
- * Create an input stream that returns tweets received from Twitter using Twitter4J's default
- * OAuth authentication; this requires the system properties twitter4j.oauth.consumerKey,
- * twitter4j.oauth.consumerSecret, twitter4j.oauth.accessToken and
- * twitter4j.oauth.accessTokenSecret.
- * @param jssc JavaStreamingContext object
- * @param filters Set of filter strings to get only those tweets that match them
- * @param storageLevel Storage level to use for storing the received objects
- */
- def createStream(
- jssc: JavaStreamingContext,
- filters: Array[String],
- storageLevel: StorageLevel
- ): JavaReceiverInputDStream[Status] = {
- createStream(jssc.ssc, None, filters, storageLevel)
- }
-
- /**
- * Create an input stream that returns tweets received from Twitter.
- * Storage level of the data will be the default StorageLevel.MEMORY_AND_DISK_SER_2.
- * @param jssc JavaStreamingContext object
- * @param twitterAuth Twitter4J Authorization
- */
- def createStream(jssc: JavaStreamingContext, twitterAuth: Authorization
- ): JavaReceiverInputDStream[Status] = {
- createStream(jssc.ssc, Some(twitterAuth))
- }
-
- /**
- * Create an input stream that returns tweets received from Twitter.
- * Storage level of the data will be the default StorageLevel.MEMORY_AND_DISK_SER_2.
- * @param jssc JavaStreamingContext object
- * @param twitterAuth Twitter4J Authorization
- * @param filters Set of filter strings to get only those tweets that match them
- */
- def createStream(
- jssc: JavaStreamingContext,
- twitterAuth: Authorization,
- filters: Array[String]
- ): JavaReceiverInputDStream[Status] = {
- createStream(jssc.ssc, Some(twitterAuth), filters)
- }
-
- /**
- * Create an input stream that returns tweets received from Twitter.
- * @param jssc JavaStreamingContext object
- * @param twitterAuth Twitter4J Authorization object
- * @param filters Set of filter strings to get only those tweets that match them
- * @param storageLevel Storage level to use for storing the received objects
- */
- def createStream(
- jssc: JavaStreamingContext,
- twitterAuth: Authorization,
- filters: Array[String],
- storageLevel: StorageLevel
- ): JavaReceiverInputDStream[Status] = {
- createStream(jssc.ssc, Some(twitterAuth), filters, storageLevel)
- }
-}
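For reference, a minimal sketch of the default-OAuth overload removed here, assuming placeholder twitter4j credentials supplied via the system properties named in the scaladoc above (the filter term is also illustrative):

    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import org.apache.spark.streaming.twitter.TwitterUtils

    object TwitterFilterExample {
      def main(args: Array[String]): Unit = {
        // Placeholder credentials: the default-OAuth overloads read these system properties
        System.setProperty("twitter4j.oauth.consumerKey", "...")
        System.setProperty("twitter4j.oauth.consumerSecret", "...")
        System.setProperty("twitter4j.oauth.accessToken", "...")
        System.setProperty("twitter4j.oauth.accessTokenSecret", "...")

        val ssc = new StreamingContext(
          new SparkConf().setMaster("local[2]").setAppName("TwitterFilterExample"), Seconds(2))
        // None => use the system-property OAuth credentials above
        val tweets = TwitterUtils.createStream(ssc, None, Seq("spark"))
        tweets.map(_.getText).print()
        ssc.start()
        ssc.awaitTermination()
      }
    }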
diff --git a/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/package-info.java b/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/package-info.java
deleted file mode 100644
index 258c0950a0..0000000000
--- a/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/package-info.java
+++ /dev/null
@@ -1,21 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-/**
- * Twitter feed receiver for Spark Streaming.
- */
-package org.apache.spark.streaming.twitter; \ No newline at end of file
diff --git a/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/package.scala b/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/package.scala
deleted file mode 100644
index 580e37fa8f..0000000000
--- a/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/package.scala
+++ /dev/null
@@ -1,23 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.streaming
-
-/**
- * Twitter feed receiver for Spark Streaming.
- */
-package object twitter
diff --git a/external/twitter/src/test/java/org/apache/spark/streaming/LocalJavaStreamingContext.java b/external/twitter/src/test/java/org/apache/spark/streaming/LocalJavaStreamingContext.java
deleted file mode 100644
index cfedb5a042..0000000000
--- a/external/twitter/src/test/java/org/apache/spark/streaming/LocalJavaStreamingContext.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.streaming;
-
-import org.apache.spark.SparkConf;
-import org.apache.spark.streaming.api.java.JavaStreamingContext;
-import org.junit.After;
-import org.junit.Before;
-
-public abstract class LocalJavaStreamingContext {
-
- protected transient JavaStreamingContext ssc;
-
- @Before
- public void setUp() {
- SparkConf conf = new SparkConf()
- .setMaster("local[2]")
- .setAppName("test")
- .set("spark.streaming.clock", "org.apache.spark.util.ManualClock");
- ssc = new JavaStreamingContext(conf, new Duration(1000));
- ssc.checkpoint("checkpoint");
- }
-
- @After
- public void tearDown() {
- ssc.stop();
- ssc = null;
- }
-}
diff --git a/external/twitter/src/test/java/org/apache/spark/streaming/twitter/JavaTwitterStreamSuite.java b/external/twitter/src/test/java/org/apache/spark/streaming/twitter/JavaTwitterStreamSuite.java
deleted file mode 100644
index 26ec8af455..0000000000
--- a/external/twitter/src/test/java/org/apache/spark/streaming/twitter/JavaTwitterStreamSuite.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.streaming.twitter;
-
-import org.junit.Test;
-import twitter4j.Status;
-import twitter4j.auth.Authorization;
-import twitter4j.auth.NullAuthorization;
-import org.apache.spark.storage.StorageLevel;
-import org.apache.spark.streaming.LocalJavaStreamingContext;
-import org.apache.spark.streaming.api.java.JavaDStream;
-
-public class JavaTwitterStreamSuite extends LocalJavaStreamingContext {
- @Test
- public void testTwitterStream() {
- String[] filters = { "filter1", "filter2" };
- Authorization auth = NullAuthorization.getInstance();
-
- // tests the API, does not actually test data receiving
- JavaDStream<Status> test1 = TwitterUtils.createStream(ssc);
- JavaDStream<Status> test2 = TwitterUtils.createStream(ssc, filters);
- JavaDStream<Status> test3 = TwitterUtils.createStream(
- ssc, filters, StorageLevel.MEMORY_AND_DISK_SER_2());
- JavaDStream<Status> test4 = TwitterUtils.createStream(ssc, auth);
- JavaDStream<Status> test5 = TwitterUtils.createStream(ssc, auth, filters);
- JavaDStream<Status> test6 = TwitterUtils.createStream(ssc,
- auth, filters, StorageLevel.MEMORY_AND_DISK_SER_2());
- }
-}
diff --git a/external/twitter/src/test/resources/log4j.properties b/external/twitter/src/test/resources/log4j.properties
deleted file mode 100644
index 9a3569789d..0000000000
--- a/external/twitter/src/test/resources/log4j.properties
+++ /dev/null
@@ -1,28 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-# Set everything to be logged to the file target/unit-tests.log
-log4j.rootCategory=INFO, file
-log4j.appender.file=org.apache.log4j.FileAppender
-log4j.appender.file.append=true
-log4j.appender.file.file=target/unit-tests.log
-log4j.appender.file.layout=org.apache.log4j.PatternLayout
-log4j.appender.file.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss.SSS} %t %p %c{1}: %m%n
-
-# Ignore messages below warning level from Jetty, because it's a bit verbose
-log4j.logger.org.spark-project.jetty=WARN
-
diff --git a/external/twitter/src/test/scala/org/apache/spark/streaming/twitter/TwitterStreamSuite.scala b/external/twitter/src/test/scala/org/apache/spark/streaming/twitter/TwitterStreamSuite.scala
deleted file mode 100644
index 7e5fc0cbb9..0000000000
--- a/external/twitter/src/test/scala/org/apache/spark/streaming/twitter/TwitterStreamSuite.scala
+++ /dev/null
@@ -1,59 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.streaming.twitter
-
-import org.scalatest.BeforeAndAfter
-import twitter4j.Status
-import twitter4j.auth.{Authorization, NullAuthorization}
-
-import org.apache.spark.{Logging, SparkFunSuite}
-import org.apache.spark.storage.StorageLevel
-import org.apache.spark.streaming.{Seconds, StreamingContext}
-import org.apache.spark.streaming.dstream.ReceiverInputDStream
-
-class TwitterStreamSuite extends SparkFunSuite with BeforeAndAfter with Logging {
-
- val batchDuration = Seconds(1)
-
- private val master: String = "local[2]"
-
- private val framework: String = this.getClass.getSimpleName
-
- test("twitter input stream") {
- val ssc = new StreamingContext(master, framework, batchDuration)
- val filters = Seq("filter1", "filter2")
- val authorization: Authorization = NullAuthorization.getInstance()
-
- // tests the API, does not actually test data receiving
- val test1: ReceiverInputDStream[Status] = TwitterUtils.createStream(ssc, None)
- val test2: ReceiverInputDStream[Status] =
- TwitterUtils.createStream(ssc, None, filters)
- val test3: ReceiverInputDStream[Status] =
- TwitterUtils.createStream(ssc, None, filters, StorageLevel.MEMORY_AND_DISK_SER_2)
- val test4: ReceiverInputDStream[Status] =
- TwitterUtils.createStream(ssc, Some(authorization))
- val test5: ReceiverInputDStream[Status] =
- TwitterUtils.createStream(ssc, Some(authorization), filters)
- val test6: ReceiverInputDStream[Status] = TwitterUtils.createStream(
- ssc, Some(authorization), filters, StorageLevel.MEMORY_AND_DISK_SER_2)
-
- // Note that actually testing the data receiving is hard as authentication keys are
- // necessary for accessing Twitter live stream
- ssc.stop()
- }
-}
diff --git a/external/zeromq/pom.xml b/external/zeromq/pom.xml
deleted file mode 100644
index f16bc0f319..0000000000
--- a/external/zeromq/pom.xml
+++ /dev/null
@@ -1,74 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
- ~ Licensed to the Apache Software Foundation (ASF) under one or more
- ~ contributor license agreements. See the NOTICE file distributed with
- ~ this work for additional information regarding copyright ownership.
- ~ The ASF licenses this file to You under the Apache License, Version 2.0
- ~ (the "License"); you may not use this file except in compliance with
- ~ the License. You may obtain a copy of the License at
- ~
- ~ http://www.apache.org/licenses/LICENSE-2.0
- ~
- ~ Unless required by applicable law or agreed to in writing, software
- ~ distributed under the License is distributed on an "AS IS" BASIS,
- ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- ~ See the License for the specific language governing permissions and
- ~ limitations under the License.
- -->
-
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
- <modelVersion>4.0.0</modelVersion>
- <parent>
- <groupId>org.apache.spark</groupId>
- <artifactId>spark-parent_2.11</artifactId>
- <version>2.0.0-SNAPSHOT</version>
- <relativePath>../../pom.xml</relativePath>
- </parent>
-
- <groupId>org.apache.spark</groupId>
- <artifactId>spark-streaming-zeromq_2.11</artifactId>
- <properties>
- <sbt.project.name>streaming-zeromq</sbt.project.name>
- </properties>
- <packaging>jar</packaging>
- <name>Spark Project External ZeroMQ</name>
- <url>http://spark.apache.org/</url>
-
- <dependencies>
- <dependency>
- <groupId>org.apache.spark</groupId>
- <artifactId>spark-streaming_${scala.binary.version}</artifactId>
- <version>${project.version}</version>
- <scope>provided</scope>
- </dependency>
- <dependency>
- <groupId>org.apache.spark</groupId>
- <artifactId>spark-streaming-akka_${scala.binary.version}</artifactId>
- <version>${project.version}</version>
- </dependency>
- <dependency>
- <groupId>org.apache.spark</groupId>
- <artifactId>spark-core_${scala.binary.version}</artifactId>
- <version>${project.version}</version>
- <type>test-jar</type>
- <scope>test</scope>
- </dependency>
- <dependency>
- <groupId>${akka.group}</groupId>
- <artifactId>akka-zeromq_${scala.binary.version}</artifactId>
- </dependency>
- <dependency>
- <groupId>org.scalacheck</groupId>
- <artifactId>scalacheck_${scala.binary.version}</artifactId>
- <scope>test</scope>
- </dependency>
- <dependency>
- <groupId>org.apache.spark</groupId>
- <artifactId>spark-test-tags_${scala.binary.version}</artifactId>
- </dependency>
- </dependencies>
- <build>
- <outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory>
- <testOutputDirectory>target/scala-${scala.binary.version}/test-classes</testOutputDirectory>
- </build>
-</project>
diff --git a/external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/ZeroMQReceiver.scala b/external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/ZeroMQReceiver.scala
deleted file mode 100644
index dd367cd43b..0000000000
--- a/external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/ZeroMQReceiver.scala
+++ /dev/null
@@ -1,55 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.streaming.zeromq
-
-import scala.reflect.ClassTag
-
-import akka.util.ByteString
-import akka.zeromq._
-
-import org.apache.spark.Logging
-import org.apache.spark.streaming.akka.ActorReceiver
-
-/**
- * A receiver to subscribe to ZeroMQ stream.
- */
-private[streaming] class ZeroMQReceiver[T: ClassTag](
- publisherUrl: String,
- subscribe: Subscribe,
- bytesToObjects: Seq[ByteString] => Iterator[T])
- extends ActorReceiver with Logging {
-
- override def preStart(): Unit = {
- ZeroMQExtension(context.system)
- .newSocket(SocketType.Sub, Listener(self), Connect(publisherUrl), subscribe)
- }
-
- def receive: Receive = {
-
- case Connecting => logInfo("connecting ...")
-
- case m: ZMQMessage =>
- logDebug("Received message for:" + m.frame(0))
-
- // We ignore first frame for processing as it is the topic
- val bytes = m.frames.tail
- store(bytesToObjects(bytes))
-
- case Closed => logInfo("received closed ")
- }
-}
diff --git a/external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/ZeroMQUtils.scala b/external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/ZeroMQUtils.scala
deleted file mode 100644
index 1784d6e862..0000000000
--- a/external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/ZeroMQUtils.scala
+++ /dev/null
@@ -1,163 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.streaming.zeromq
-
-import scala.collection.JavaConverters._
-import scala.reflect.ClassTag
-
-import akka.actor.{ActorSystem, Props, SupervisorStrategy}
-import akka.util.ByteString
-import akka.zeromq.Subscribe
-
-import org.apache.spark.api.java.function.{Function => JFunction, Function0 => JFunction0}
-import org.apache.spark.storage.StorageLevel
-import org.apache.spark.streaming.StreamingContext
-import org.apache.spark.streaming.akka.{ActorReceiver, AkkaUtils}
-import org.apache.spark.streaming.api.java.{JavaReceiverInputDStream, JavaStreamingContext}
-import org.apache.spark.streaming.dstream.ReceiverInputDStream
-
-object ZeroMQUtils {
- /**
- * Create an input stream that receives messages pushed by a zeromq publisher.
- * @param ssc StreamingContext object
- * @param publisherUrl Url of remote zeromq publisher
- * @param subscribe Topic to subscribe to
- * @param bytesToObjects A ZeroMQ stream publishes a sequence of frames for each topic,
- *                       and each frame is a sequence of bytes, so a converter (which might
- *                       be a byte deserializer) is needed to translate a sequence of byte
- *                       sequences, where the outer sequence is a frame and each inner
- *                       sequence is its payload.
- * @param storageLevel RDD storage level. Defaults to StorageLevel.MEMORY_AND_DISK_SER_2.
- * @param actorSystemCreator A function to create ActorSystem in executors. `ActorSystem` will
- * be shut down when the receiver is stopping (default:
- * ActorReceiver.defaultActorSystemCreator)
- * @param supervisorStrategy the supervisor strategy (default: ActorReceiver.defaultStrategy)
- */
- def createStream[T: ClassTag](
- ssc: StreamingContext,
- publisherUrl: String,
- subscribe: Subscribe,
- bytesToObjects: Seq[ByteString] => Iterator[T],
- storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2,
- actorSystemCreator: () => ActorSystem = ActorReceiver.defaultActorSystemCreator,
- supervisorStrategy: SupervisorStrategy = ActorReceiver.defaultSupervisorStrategy
- ): ReceiverInputDStream[T] = {
- AkkaUtils.createStream(
- ssc,
- Props(new ZeroMQReceiver(publisherUrl, subscribe, bytesToObjects)),
- "ZeroMQReceiver",
- storageLevel,
- actorSystemCreator,
- supervisorStrategy)
- }
-
- /**
- * Create an input stream that receives messages pushed by a zeromq publisher.
- * @param jssc JavaStreamingContext object
- * @param publisherUrl Url of remote ZeroMQ publisher
- * @param subscribe Topic to subscribe to
- * @param bytesToObjects A ZeroMQ stream publishes a sequence of frames for each topic, and each
- *                       frame is a sequence of bytes, so a converter (which might be a byte
- *                       deserializer) is needed to translate a sequence of byte sequences, where
- *                       the outer sequence is a frame and each inner sequence is its payload.
- * @param storageLevel Storage level to use for storing the received objects
- * @param actorSystemCreator A function to create ActorSystem in executors. `ActorSystem` will
- * be shut down when the receiver is stopping.
- * @param supervisorStrategy the supervisor strategy (default: ActorReceiver.defaultStrategy)
- */
- def createStream[T](
- jssc: JavaStreamingContext,
- publisherUrl: String,
- subscribe: Subscribe,
- bytesToObjects: JFunction[Array[Array[Byte]], java.lang.Iterable[T]],
- storageLevel: StorageLevel,
- actorSystemCreator: JFunction0[ActorSystem],
- supervisorStrategy: SupervisorStrategy
- ): JavaReceiverInputDStream[T] = {
- implicit val cm: ClassTag[T] =
- implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]]
- val fn =
- (x: Seq[ByteString]) => bytesToObjects.call(x.map(_.toArray).toArray).iterator().asScala
- createStream[T](
- jssc.ssc,
- publisherUrl,
- subscribe,
- fn,
- storageLevel,
- () => actorSystemCreator.call(),
- supervisorStrategy)
- }
-
- /**
- * Create an input stream that receives messages pushed by a zeromq publisher.
- * @param jssc JavaStreamingContext object
- * @param publisherUrl Url of remote zeromq publisher
- * @param subscribe Topic to subscribe to
- * @param bytesToObjects A ZeroMQ stream publishes a sequence of frames for each topic, and each
- *                       frame is a sequence of bytes, so a converter (which might be a byte
- *                       deserializer) is needed to translate a sequence of byte sequences, where
- *                       the outer sequence is a frame and each inner sequence is its payload.
- * @param storageLevel RDD storage level.
- */
- def createStream[T](
- jssc: JavaStreamingContext,
- publisherUrl: String,
- subscribe: Subscribe,
- bytesToObjects: JFunction[Array[Array[Byte]], java.lang.Iterable[T]],
- storageLevel: StorageLevel
- ): JavaReceiverInputDStream[T] = {
- implicit val cm: ClassTag[T] =
- implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]]
- val fn =
- (x: Seq[ByteString]) => bytesToObjects.call(x.map(_.toArray).toArray).iterator().asScala
- createStream[T](
- jssc.ssc,
- publisherUrl,
- subscribe,
- fn,
- storageLevel)
- }
-
- /**
- * Create an input stream that receives messages pushed by a zeromq publisher.
- * @param jssc JavaStreamingContext object
- * @param publisherUrl Url of remote zeromq publisher
- * @param subscribe Topic to subscribe to
- * @param bytesToObjects A ZeroMQ stream publishes a sequence of frames for each topic, and each
- *                       frame is a sequence of bytes, so a converter (which might be a byte
- *                       deserializer) is needed to translate a sequence of byte sequences,
- *                       where the outer sequence is a frame and each inner sequence is its
- *                       payload.
- */
- def createStream[T](
- jssc: JavaStreamingContext,
- publisherUrl: String,
- subscribe: Subscribe,
- bytesToObjects: JFunction[Array[Array[Byte]], java.lang.Iterable[T]]
- ): JavaReceiverInputDStream[T] = {
- implicit val cm: ClassTag[T] =
- implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]]
- val fn =
- (x: Seq[ByteString]) => bytesToObjects.call(x.map(_.toArray).toArray).iterator().asScala
- createStream[T](
- jssc.ssc,
- publisherUrl,
- subscribe,
- fn)
- }
-}
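For reference, a minimal sketch of the Scala createStream overload with a hand-written bytesToObjects converter (publisher URL and topic are illustrative placeholders):

    import akka.util.ByteString
    import akka.zeromq.Subscribe
    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import org.apache.spark.streaming.zeromq.ZeroMQUtils

    object ZeroMQWordCount {
      // The receiver strips the topic frame (m.frames.tail) before invoking the converter,
      // so only payload frames arrive here; each one is decoded as UTF-8 text.
      def bytesToStringIterator(frames: Seq[ByteString]): Iterator[String] =
        frames.map(_.utf8String).iterator

      def main(args: Array[String]): Unit = {
        val ssc = new StreamingContext(
          new SparkConf().setMaster("local[2]").setAppName("ZeroMQWordCount"), Seconds(2))
        val lines = ZeroMQUtils.createStream(
          ssc, "tcp://127.0.0.1:1234", Subscribe("foo"), bytesToStringIterator _)
        lines.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _).print()
        ssc.start()
        ssc.awaitTermination()
      }
    }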
diff --git a/external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/package-info.java b/external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/package-info.java
deleted file mode 100644
index 587c524e21..0000000000
--- a/external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/package-info.java
+++ /dev/null
@@ -1,21 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-/**
- * ZeroMQ receiver for Spark Streaming.
- */
-package org.apache.spark.streaming.zeromq; \ No newline at end of file
diff --git a/external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/package.scala b/external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/package.scala
deleted file mode 100644
index 65e6e57f2c..0000000000
--- a/external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/package.scala
+++ /dev/null
@@ -1,23 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.streaming
-
-/**
- * ZeroMQ receiver for Spark Streaming.
- */
-package object zeromq
diff --git a/external/zeromq/src/test/java/org/apache/spark/streaming/LocalJavaStreamingContext.java b/external/zeromq/src/test/java/org/apache/spark/streaming/LocalJavaStreamingContext.java
deleted file mode 100644
index cfedb5a042..0000000000
--- a/external/zeromq/src/test/java/org/apache/spark/streaming/LocalJavaStreamingContext.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.streaming;
-
-import org.apache.spark.SparkConf;
-import org.apache.spark.streaming.api.java.JavaStreamingContext;
-import org.junit.After;
-import org.junit.Before;
-
-public abstract class LocalJavaStreamingContext {
-
- protected transient JavaStreamingContext ssc;
-
- @Before
- public void setUp() {
- SparkConf conf = new SparkConf()
- .setMaster("local[2]")
- .setAppName("test")
- .set("spark.streaming.clock", "org.apache.spark.util.ManualClock");
- ssc = new JavaStreamingContext(conf, new Duration(1000));
- ssc.checkpoint("checkpoint");
- }
-
- @After
- public void tearDown() {
- ssc.stop();
- ssc = null;
- }
-}
diff --git a/external/zeromq/src/test/java/org/apache/spark/streaming/zeromq/JavaZeroMQStreamSuite.java b/external/zeromq/src/test/java/org/apache/spark/streaming/zeromq/JavaZeroMQStreamSuite.java
deleted file mode 100644
index 9ff4b41f97..0000000000
--- a/external/zeromq/src/test/java/org/apache/spark/streaming/zeromq/JavaZeroMQStreamSuite.java
+++ /dev/null
@@ -1,63 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.streaming.zeromq;
-
-import akka.actor.ActorSystem;
-import akka.actor.SupervisorStrategy;
-import akka.util.ByteString;
-import akka.zeromq.Subscribe;
-import org.junit.Test;
-
-import org.apache.spark.api.java.function.Function;
-import org.apache.spark.api.java.function.Function0;
-import org.apache.spark.storage.StorageLevel;
-import org.apache.spark.streaming.LocalJavaStreamingContext;
-import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
-
-public class JavaZeroMQStreamSuite extends LocalJavaStreamingContext {
-
- @Test // tests the API, does not actually test data receiving
- public void testZeroMQStream() {
- String publishUrl = "abc";
- Subscribe subscribe = new Subscribe((ByteString)null);
- Function<byte[][], Iterable<String>> bytesToObjects = new BytesToObjects();
- Function0<ActorSystem> actorSystemCreator = new ActorSystemCreatorForTest();
-
- JavaReceiverInputDStream<String> test1 = ZeroMQUtils.<String>createStream(
- ssc, publishUrl, subscribe, bytesToObjects);
- JavaReceiverInputDStream<String> test2 = ZeroMQUtils.<String>createStream(
- ssc, publishUrl, subscribe, bytesToObjects, StorageLevel.MEMORY_AND_DISK_SER_2());
- JavaReceiverInputDStream<String> test3 = ZeroMQUtils.<String>createStream(
- ssc, publishUrl, subscribe, bytesToObjects, StorageLevel.MEMORY_AND_DISK_SER_2(), actorSystemCreator,
- SupervisorStrategy.defaultStrategy());
- }
-}
-
-class BytesToObjects implements Function<byte[][], Iterable<String>> {
- @Override
- public Iterable<String> call(byte[][] bytes) throws Exception {
- return null;
- }
-}
-
-class ActorSystemCreatorForTest implements Function0<ActorSystem> {
- @Override
- public ActorSystem call() {
- return null;
- }
-}
diff --git a/external/zeromq/src/test/resources/log4j.properties b/external/zeromq/src/test/resources/log4j.properties
deleted file mode 100644
index 75e3b53a09..0000000000
--- a/external/zeromq/src/test/resources/log4j.properties
+++ /dev/null
@@ -1,28 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-# Set everything to be logged to the file target/unit-tests.log
-log4j.rootCategory=INFO, file
-log4j.appender.file=org.apache.log4j.FileAppender
-log4j.appender.file.append=true
-log4j.appender.file.file=target/unit-tests.log
-log4j.appender.file.layout=org.apache.log4j.PatternLayout
-log4j.appender.file.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss.SSS} %t %p %c{1}: %m%n
-
-# Ignore messages below warning level from Jetty, because it's a bit verbose
-log4j.logger.org.spark-project.jetty=WARN
-
diff --git a/external/zeromq/src/test/scala/org/apache/spark/streaming/zeromq/ZeroMQStreamSuite.scala b/external/zeromq/src/test/scala/org/apache/spark/streaming/zeromq/ZeroMQStreamSuite.scala
deleted file mode 100644
index bac2679cab..0000000000
--- a/external/zeromq/src/test/scala/org/apache/spark/streaming/zeromq/ZeroMQStreamSuite.scala
+++ /dev/null
@@ -1,63 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.streaming.zeromq
-
-import akka.actor.SupervisorStrategy
-import akka.util.ByteString
-import akka.zeromq.Subscribe
-
-import org.apache.spark.SparkFunSuite
-import org.apache.spark.storage.StorageLevel
-import org.apache.spark.streaming.{Seconds, StreamingContext}
-import org.apache.spark.streaming.dstream.ReceiverInputDStream
-
-class ZeroMQStreamSuite extends SparkFunSuite {
-
- val batchDuration = Seconds(1)
-
- private val master: String = "local[2]"
-
- private val framework: String = this.getClass.getSimpleName
-
- test("zeromq input stream") {
- val ssc = new StreamingContext(master, framework, batchDuration)
- val publishUrl = "abc"
- val subscribe = new Subscribe(null.asInstanceOf[ByteString])
- val bytesToObjects = (bytes: Seq[ByteString]) => null.asInstanceOf[Iterator[String]]
-
- // tests the API, does not actually test data receiving
- val test1: ReceiverInputDStream[String] =
- ZeroMQUtils.createStream(
- ssc, publishUrl, subscribe, bytesToObjects, actorSystemCreator = () => null)
- val test2: ReceiverInputDStream[String] = ZeroMQUtils.createStream(
- ssc, publishUrl, subscribe, bytesToObjects, StorageLevel.MEMORY_AND_DISK_SER_2, () => null)
- val test3: ReceiverInputDStream[String] = ZeroMQUtils.createStream(
- ssc, publishUrl, subscribe, bytesToObjects,
- StorageLevel.MEMORY_AND_DISK_SER_2, () => null, SupervisorStrategy.defaultStrategy)
- val test4: ReceiverInputDStream[String] =
- ZeroMQUtils.createStream(ssc, publishUrl, subscribe, bytesToObjects)
- val test5: ReceiverInputDStream[String] = ZeroMQUtils.createStream(
- ssc, publishUrl, subscribe, bytesToObjects, StorageLevel.MEMORY_AND_DISK_SER_2)
- val test6: ReceiverInputDStream[String] = ZeroMQUtils.createStream(
- ssc, publishUrl, subscribe, bytesToObjects,
- StorageLevel.MEMORY_AND_DISK_SER_2, supervisorStrategy = SupervisorStrategy.defaultStrategy)
-
- // TODO: Actually test data receiving. A real test needs the native ZeroMQ library
- ssc.stop()
- }
-}