Diffstat (limited to 'external/flume')
-rw-r--r--  external/flume/pom.xml  78
-rw-r--r--  external/flume/src/main/scala/org/apache/spark/streaming/flume/EventTransformer.scala  72
-rw-r--r--  external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeBatchFetcher.scala  166
-rw-r--r--  external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeInputDStream.scala  205
-rw-r--r--  external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumePollingInputDStream.scala  123
-rw-r--r--  external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeTestUtils.scala  117
-rw-r--r--  external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeUtils.scala  311
-rw-r--r--  external/flume/src/main/scala/org/apache/spark/streaming/flume/PollingFlumeTestUtils.scala  209
-rw-r--r--  external/flume/src/main/scala/org/apache/spark/streaming/flume/package-info.java  21
-rw-r--r--  external/flume/src/main/scala/org/apache/spark/streaming/flume/package.scala  23
-rw-r--r--  external/flume/src/test/java/org/apache/spark/streaming/LocalJavaStreamingContext.java  44
-rw-r--r--  external/flume/src/test/java/org/apache/spark/streaming/flume/JavaFlumePollingStreamSuite.java  44
-rw-r--r--  external/flume/src/test/java/org/apache/spark/streaming/flume/JavaFlumeStreamSuite.java  36
-rw-r--r--  external/flume/src/test/resources/log4j.properties  28
-rw-r--r--  external/flume/src/test/scala/org/apache/spark/streaming/TestOutputStream.scala  48
-rw-r--r--  external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumePollingStreamSuite.scala  129
-rw-r--r--  external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumeStreamSuite.scala  102
17 files changed, 0 insertions, 1756 deletions
diff --git a/external/flume/pom.xml b/external/flume/pom.xml
deleted file mode 100644
index d650dd034d..0000000000
--- a/external/flume/pom.xml
+++ /dev/null
@@ -1,78 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
- ~ Licensed to the Apache Software Foundation (ASF) under one or more
- ~ contributor license agreements. See the NOTICE file distributed with
- ~ this work for additional information regarding copyright ownership.
- ~ The ASF licenses this file to You under the Apache License, Version 2.0
- ~ (the "License"); you may not use this file except in compliance with
- ~ the License. You may obtain a copy of the License at
- ~
- ~ http://www.apache.org/licenses/LICENSE-2.0
- ~
- ~ Unless required by applicable law or agreed to in writing, software
- ~ distributed under the License is distributed on an "AS IS" BASIS,
- ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- ~ See the License for the specific language governing permissions and
- ~ limitations under the License.
- -->
-
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
- <modelVersion>4.0.0</modelVersion>
- <parent>
- <groupId>org.apache.spark</groupId>
- <artifactId>spark-parent_2.11</artifactId>
- <version>2.0.0-SNAPSHOT</version>
- <relativePath>../../pom.xml</relativePath>
- </parent>
-
- <groupId>org.apache.spark</groupId>
- <artifactId>spark-streaming-flume_2.11</artifactId>
- <properties>
- <sbt.project.name>streaming-flume</sbt.project.name>
- </properties>
- <packaging>jar</packaging>
- <name>Spark Project External Flume</name>
- <url>http://spark.apache.org/</url>
-
- <dependencies>
- <dependency>
- <groupId>org.apache.spark</groupId>
- <artifactId>spark-streaming_${scala.binary.version}</artifactId>
- <version>${project.version}</version>
- <scope>provided</scope>
- </dependency>
- <dependency>
- <groupId>org.apache.spark</groupId>
- <artifactId>spark-core_${scala.binary.version}</artifactId>
- <version>${project.version}</version>
- <type>test-jar</type>
- <scope>test</scope>
- </dependency>
- <dependency>
- <groupId>org.apache.spark</groupId>
- <artifactId>spark-streaming-flume-sink_${scala.binary.version}</artifactId>
- <version>${project.version}</version>
- </dependency>
- <dependency>
- <groupId>org.apache.flume</groupId>
- <artifactId>flume-ng-core</artifactId>
- </dependency>
- <dependency>
- <groupId>org.apache.flume</groupId>
- <artifactId>flume-ng-sdk</artifactId>
- </dependency>
- <dependency>
- <groupId>org.scalacheck</groupId>
- <artifactId>scalacheck_${scala.binary.version}</artifactId>
- <scope>test</scope>
- </dependency>
- <dependency>
- <groupId>org.apache.spark</groupId>
- <artifactId>spark-test-tags_${scala.binary.version}</artifactId>
- </dependency>
- </dependencies>
- <build>
- <outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory>
- <testOutputDirectory>target/scala-${scala.binary.version}/test-classes</testOutputDirectory>
- </build>
-</project>
diff --git a/external/flume/src/main/scala/org/apache/spark/streaming/flume/EventTransformer.scala b/external/flume/src/main/scala/org/apache/spark/streaming/flume/EventTransformer.scala
deleted file mode 100644
index 5c773d4b07..0000000000
--- a/external/flume/src/main/scala/org/apache/spark/streaming/flume/EventTransformer.scala
+++ /dev/null
@@ -1,72 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.streaming.flume
-
-import java.io.{ObjectInput, ObjectOutput}
-
-import scala.collection.JavaConverters._
-
-import org.apache.spark.Logging
-import org.apache.spark.util.Utils
-
-/**
- * A simple object that provides the implementation of readExternal and writeExternal for both
- * the wrapper classes for Flume-style Events.
- */
-private[streaming] object EventTransformer extends Logging {
- def readExternal(in: ObjectInput): (java.util.HashMap[CharSequence, CharSequence],
- Array[Byte]) = {
- val bodyLength = in.readInt()
- val bodyBuff = new Array[Byte](bodyLength)
- in.readFully(bodyBuff)
-
- val numHeaders = in.readInt()
- val headers = new java.util.HashMap[CharSequence, CharSequence]
-
- for (i <- 0 until numHeaders) {
- val keyLength = in.readInt()
- val keyBuff = new Array[Byte](keyLength)
- in.readFully(keyBuff)
- val key: String = Utils.deserialize(keyBuff)
-
- val valLength = in.readInt()
- val valBuff = new Array[Byte](valLength)
- in.readFully(valBuff)
- val value: String = Utils.deserialize(valBuff)
-
- headers.put(key, value)
- }
- (headers, bodyBuff)
- }
-
- def writeExternal(out: ObjectOutput, headers: java.util.Map[CharSequence, CharSequence],
- body: Array[Byte]) {
- out.writeInt(body.length)
- out.write(body)
- val numHeaders = headers.size()
- out.writeInt(numHeaders)
- for ((k, v) <- headers.asScala) {
- val keyBuff = Utils.serialize(k.toString)
- out.writeInt(keyBuff.length)
- out.write(keyBuff)
- val valBuff = Utils.serialize(v.toString)
- out.writeInt(valBuff.length)
- out.write(valBuff)
- }
- }
-}
diff --git a/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeBatchFetcher.scala b/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeBatchFetcher.scala
deleted file mode 100644
index 3555fa68b6..0000000000
--- a/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeBatchFetcher.scala
+++ /dev/null
@@ -1,166 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.spark.streaming.flume
-
-import scala.collection.mutable.ArrayBuffer
-
-import com.google.common.base.Throwables
-
-import org.apache.spark.Logging
-import org.apache.spark.streaming.flume.sink._
-
-/**
- * This class implements the core functionality of [[FlumePollingReceiver]]. When started it
- * pulls data from Flume, stores it to Spark and then sends an Ack or Nack. This class should be
- * run via an [[java.util.concurrent.Executor]] as this implements [[Runnable]]
- *
- * @param receiver The receiver that owns this instance.
- */
-
-private[flume] class FlumeBatchFetcher(receiver: FlumePollingReceiver) extends Runnable with
- Logging {
-
- def run(): Unit = {
- while (!receiver.isStopped()) {
- val connection = receiver.getConnections.poll()
- val client = connection.client
- var batchReceived = false
- var seq: CharSequence = null
- try {
- getBatch(client) match {
- case Some(eventBatch) =>
- batchReceived = true
- seq = eventBatch.getSequenceNumber
- val events = toSparkFlumeEvents(eventBatch.getEvents)
- if (store(events)) {
- sendAck(client, seq)
- } else {
- sendNack(batchReceived, client, seq)
- }
- case None =>
- }
- } catch {
- case e: Exception =>
- Throwables.getRootCause(e) match {
- // If the cause was an InterruptedException, then check if the receiver is stopped -
- // if yes, just break out of the loop. Else send a Nack and log a warning.
- // In the unlikely case, the cause was not an Exception,
- // then just throw it out and exit.
- case interrupted: InterruptedException =>
- if (!receiver.isStopped()) {
- logWarning("Interrupted while receiving data from Flume", interrupted)
- sendNack(batchReceived, client, seq)
- }
- case exception: Exception =>
- logWarning("Error while receiving data from Flume", exception)
- sendNack(batchReceived, client, seq)
- }
- } finally {
- receiver.getConnections.add(connection)
- }
- }
- }
-
- /**
- * Gets a batch of events from the specified client. This method does not handle any exceptions
- * which will be propagated to the caller.
- * @param client Client to get events from
- * @return [[Some]] which contains the event batch if Flume sent any events back, else [[None]]
- */
- private def getBatch(client: SparkFlumeProtocol.Callback): Option[EventBatch] = {
- val eventBatch = client.getEventBatch(receiver.getMaxBatchSize)
- if (!SparkSinkUtils.isErrorBatch(eventBatch)) {
- // No error, proceed with processing data
- logDebug(s"Received batch of ${eventBatch.getEvents.size} events with sequence " +
- s"number: ${eventBatch.getSequenceNumber}")
- Some(eventBatch)
- } else {
- logWarning("Did not receive events from Flume agent due to error on the Flume agent: " +
- eventBatch.getErrorMsg)
- None
- }
- }
-
- /**
- * Store the events in the buffer to Spark. This method will not propagate any exceptions,
- * but will propagate any other errors.
- * @param buffer The buffer to store
- * @return true if the data was stored without any exception being thrown, else false
- */
- private def store(buffer: ArrayBuffer[SparkFlumeEvent]): Boolean = {
- try {
- receiver.store(buffer)
- true
- } catch {
- case e: Exception =>
- logWarning("Error while attempting to store data received from Flume", e)
- false
- }
- }
-
- /**
- * Send an ack to the client for the sequence number. This method does not handle any exceptions
- * which will be propagated to the caller.
- * @param client client to send the ack to
- * @param seq sequence number of the batch to be ack-ed.
- * @return
- */
- private def sendAck(client: SparkFlumeProtocol.Callback, seq: CharSequence): Unit = {
- logDebug("Sending ack for sequence number: " + seq)
- client.ack(seq)
- logDebug("Ack sent for sequence number: " + seq)
- }
-
- /**
- * This method sends a Nack if a batch was received to the client with the given sequence
- * number. Any exceptions thrown by the RPC call is simply thrown out as is - no effort is made
- * to handle it.
- * @param batchReceived true if a batch was received. If this is false, no nack is sent
- * @param client The client to which the nack should be sent
- * @param seq The sequence number of the batch that is being nack-ed.
- */
- private def sendNack(batchReceived: Boolean, client: SparkFlumeProtocol.Callback,
- seq: CharSequence): Unit = {
- if (batchReceived) {
- // Let Flume know that the events need to be pushed back into the channel.
- logDebug("Sending nack for sequence number: " + seq)
- client.nack(seq) // If the agent is down, even this could fail and throw
- logDebug("Nack sent for sequence number: " + seq)
- }
- }
-
- /**
- * Utility method to convert [[SparkSinkEvent]]s to [[SparkFlumeEvent]]s
- * @param events - Events to convert to SparkFlumeEvents
- * @return - The SparkFlumeEvent generated from SparkSinkEvent
- */
- private def toSparkFlumeEvents(events: java.util.List[SparkSinkEvent]):
- ArrayBuffer[SparkFlumeEvent] = {
- // Convert each Flume event to a serializable SparkFlumeEvent
- val buffer = new ArrayBuffer[SparkFlumeEvent](events.size())
- var j = 0
- while (j < events.size()) {
- val event = events.get(j)
- val sparkFlumeEvent = new SparkFlumeEvent()
- sparkFlumeEvent.event.setBody(event.getBody)
- sparkFlumeEvent.event.setHeaders(event.getHeaders)
- buffer += sparkFlumeEvent
- j += 1
- }
- buffer
- }
-}
diff --git a/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeInputDStream.scala b/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeInputDStream.scala
deleted file mode 100644
index 74bd0165c6..0000000000
--- a/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeInputDStream.scala
+++ /dev/null
@@ -1,205 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.streaming.flume
-
-import java.io.{Externalizable, ObjectInput, ObjectOutput}
-import java.net.InetSocketAddress
-import java.nio.ByteBuffer
-import java.util.concurrent.Executors
-
-import scala.collection.JavaConverters._
-import scala.reflect.ClassTag
-
-import org.apache.avro.ipc.NettyServer
-import org.apache.avro.ipc.specific.SpecificResponder
-import org.apache.flume.source.avro.{AvroFlumeEvent, AvroSourceProtocol, Status}
-import org.jboss.netty.channel.{ChannelPipeline, ChannelPipelineFactory, Channels}
-import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory
-import org.jboss.netty.handler.codec.compression._
-
-import org.apache.spark.Logging
-import org.apache.spark.storage.StorageLevel
-import org.apache.spark.streaming.StreamingContext
-import org.apache.spark.streaming.dstream._
-import org.apache.spark.streaming.receiver.Receiver
-import org.apache.spark.util.Utils
-
-private[streaming]
-class FlumeInputDStream[T: ClassTag](
- _ssc: StreamingContext,
- host: String,
- port: Int,
- storageLevel: StorageLevel,
- enableDecompression: Boolean
-) extends ReceiverInputDStream[SparkFlumeEvent](_ssc) {
-
- override def getReceiver(): Receiver[SparkFlumeEvent] = {
- new FlumeReceiver(host, port, storageLevel, enableDecompression)
- }
-}
-
-/**
- * A wrapper class for AvroFlumeEvent's with a custom serialization format.
- *
- * This is necessary because AvroFlumeEvent uses inner data structures
- * which are not serializable.
- */
-class SparkFlumeEvent() extends Externalizable {
- var event: AvroFlumeEvent = new AvroFlumeEvent()
-
- /* De-serialize from bytes. */
- def readExternal(in: ObjectInput): Unit = Utils.tryOrIOException {
- val bodyLength = in.readInt()
- val bodyBuff = new Array[Byte](bodyLength)
- in.readFully(bodyBuff)
-
- val numHeaders = in.readInt()
- val headers = new java.util.HashMap[CharSequence, CharSequence]
-
- for (i <- 0 until numHeaders) {
- val keyLength = in.readInt()
- val keyBuff = new Array[Byte](keyLength)
- in.readFully(keyBuff)
- val key: String = Utils.deserialize(keyBuff)
-
- val valLength = in.readInt()
- val valBuff = new Array[Byte](valLength)
- in.readFully(valBuff)
- val value: String = Utils.deserialize(valBuff)
-
- headers.put(key, value)
- }
-
- event.setBody(ByteBuffer.wrap(bodyBuff))
- event.setHeaders(headers)
- }
-
- /* Serialize to bytes. */
- def writeExternal(out: ObjectOutput): Unit = Utils.tryOrIOException {
- val body = event.getBody
- out.writeInt(body.remaining())
- Utils.writeByteBuffer(body, out)
-
- val numHeaders = event.getHeaders.size()
- out.writeInt(numHeaders)
- for ((k, v) <- event.getHeaders.asScala) {
- val keyBuff = Utils.serialize(k.toString)
- out.writeInt(keyBuff.length)
- out.write(keyBuff)
- val valBuff = Utils.serialize(v.toString)
- out.writeInt(valBuff.length)
- out.write(valBuff)
- }
- }
-}
-
-private[streaming] object SparkFlumeEvent {
- def fromAvroFlumeEvent(in: AvroFlumeEvent): SparkFlumeEvent = {
- val event = new SparkFlumeEvent
- event.event = in
- event
- }
-}
-
-/** A simple server that implements Flume's Avro protocol. */
-private[streaming]
-class FlumeEventServer(receiver: FlumeReceiver) extends AvroSourceProtocol {
- override def append(event: AvroFlumeEvent): Status = {
- receiver.store(SparkFlumeEvent.fromAvroFlumeEvent(event))
- Status.OK
- }
-
- override def appendBatch(events: java.util.List[AvroFlumeEvent]): Status = {
- events.asScala.foreach(event => receiver.store(SparkFlumeEvent.fromAvroFlumeEvent(event)))
- Status.OK
- }
-}
-
-/** A NetworkReceiver which listens for events using the
- * Flume Avro interface. */
-private[streaming]
-class FlumeReceiver(
- host: String,
- port: Int,
- storageLevel: StorageLevel,
- enableDecompression: Boolean
- ) extends Receiver[SparkFlumeEvent](storageLevel) with Logging {
-
- lazy val responder = new SpecificResponder(
- classOf[AvroSourceProtocol], new FlumeEventServer(this))
- var server: NettyServer = null
-
- private def initServer() = {
- if (enableDecompression) {
- val channelFactory = new NioServerSocketChannelFactory(Executors.newCachedThreadPool(),
- Executors.newCachedThreadPool())
- val channelPipelineFactory = new CompressionChannelPipelineFactory()
-
- new NettyServer(
- responder,
- new InetSocketAddress(host, port),
- channelFactory,
- channelPipelineFactory,
- null)
- } else {
- new NettyServer(responder, new InetSocketAddress(host, port))
- }
- }
-
- def onStart() {
- synchronized {
- if (server == null) {
- server = initServer()
- server.start()
- } else {
- logWarning("Flume receiver being asked to start more then once with out close")
- }
- }
- logInfo("Flume receiver started")
- }
-
- def onStop() {
- synchronized {
- if (server != null) {
- server.close()
- server = null
- }
- }
- logInfo("Flume receiver stopped")
- }
-
- override def preferredLocation: Option[String] = Option(host)
-
- /** A Netty Pipeline factory that will decompress incoming data from
- * and the Netty client and compress data going back to the client.
- *
- * The compression on the return is required because Flume requires
- * a successful response to indicate it can remove the event/batch
- * from the configured channel
- */
- private[streaming]
- class CompressionChannelPipelineFactory extends ChannelPipelineFactory {
- def getPipeline(): ChannelPipeline = {
- val pipeline = Channels.pipeline()
- val encoder = new ZlibEncoder(6)
- pipeline.addFirst("deflater", encoder)
- pipeline.addFirst("inflater", new ZlibDecoder())
- pipeline
- }
- }
-}
diff --git a/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumePollingInputDStream.scala b/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumePollingInputDStream.scala
deleted file mode 100644
index d9c25e8654..0000000000
--- a/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumePollingInputDStream.scala
+++ /dev/null
@@ -1,123 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.spark.streaming.flume
-
-
-import java.net.InetSocketAddress
-import java.util.concurrent.{Executors, LinkedBlockingQueue, TimeUnit}
-
-import scala.collection.JavaConverters._
-import scala.reflect.ClassTag
-
-import com.google.common.util.concurrent.ThreadFactoryBuilder
-import org.apache.avro.ipc.NettyTransceiver
-import org.apache.avro.ipc.specific.SpecificRequestor
-import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory
-
-import org.apache.spark.Logging
-import org.apache.spark.storage.StorageLevel
-import org.apache.spark.streaming.StreamingContext
-import org.apache.spark.streaming.dstream.ReceiverInputDStream
-import org.apache.spark.streaming.flume.sink._
-import org.apache.spark.streaming.receiver.Receiver
-
-/**
- * A [[ReceiverInputDStream]] that can be used to read data from several Flume agents running
- * [[org.apache.spark.streaming.flume.sink.SparkSink]]s.
- * @param _ssc Streaming context that will execute this input stream
- * @param addresses List of addresses at which SparkSinks are listening
- * @param maxBatchSize Maximum size of a batch
- * @param parallelism Number of parallel connections to open
- * @param storageLevel The storage level to use.
- * @tparam T Class type of the object of this stream
- */
-private[streaming] class FlumePollingInputDStream[T: ClassTag](
- _ssc: StreamingContext,
- val addresses: Seq[InetSocketAddress],
- val maxBatchSize: Int,
- val parallelism: Int,
- storageLevel: StorageLevel
- ) extends ReceiverInputDStream[SparkFlumeEvent](_ssc) {
-
- override def getReceiver(): Receiver[SparkFlumeEvent] = {
- new FlumePollingReceiver(addresses, maxBatchSize, parallelism, storageLevel)
- }
-}
-
-private[streaming] class FlumePollingReceiver(
- addresses: Seq[InetSocketAddress],
- maxBatchSize: Int,
- parallelism: Int,
- storageLevel: StorageLevel
- ) extends Receiver[SparkFlumeEvent](storageLevel) with Logging {
-
- lazy val channelFactoryExecutor =
- Executors.newCachedThreadPool(new ThreadFactoryBuilder().setDaemon(true).
- setNameFormat("Flume Receiver Channel Thread - %d").build())
-
- lazy val channelFactory =
- new NioClientSocketChannelFactory(channelFactoryExecutor, channelFactoryExecutor)
-
- lazy val receiverExecutor = Executors.newFixedThreadPool(parallelism,
- new ThreadFactoryBuilder().setDaemon(true).setNameFormat("Flume Receiver Thread - %d").build())
-
- private lazy val connections = new LinkedBlockingQueue[FlumeConnection]()
-
- override def onStart(): Unit = {
- // Create the connections to each Flume agent.
- addresses.foreach(host => {
- val transceiver = new NettyTransceiver(host, channelFactory)
- val client = SpecificRequestor.getClient(classOf[SparkFlumeProtocol.Callback], transceiver)
- connections.add(new FlumeConnection(transceiver, client))
- })
- for (i <- 0 until parallelism) {
- logInfo("Starting Flume Polling Receiver worker threads..")
- // Threads that pull data from Flume.
- receiverExecutor.submit(new FlumeBatchFetcher(this))
- }
- }
-
- override def onStop(): Unit = {
- logInfo("Shutting down Flume Polling Receiver")
- receiverExecutor.shutdown()
- // Wait upto a minute for the threads to die
- if (!receiverExecutor.awaitTermination(60, TimeUnit.SECONDS)) {
- receiverExecutor.shutdownNow()
- }
- connections.asScala.foreach(_.transceiver.close())
- channelFactory.releaseExternalResources()
- }
-
- private[flume] def getConnections: LinkedBlockingQueue[FlumeConnection] = {
- this.connections
- }
-
- private[flume] def getMaxBatchSize: Int = {
- this.maxBatchSize
- }
-}
-
-/**
- * A wrapper around the transceiver and the Avro IPC API.
- * @param transceiver The transceiver to use for communication with Flume
- * @param client The client that the callbacks are received on.
- */
-private[flume] class FlumeConnection(val transceiver: NettyTransceiver,
- val client: SparkFlumeProtocol.Callback)
-
-
-
diff --git a/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeTestUtils.scala b/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeTestUtils.scala
deleted file mode 100644
index 945cfa7295..0000000000
--- a/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeTestUtils.scala
+++ /dev/null
@@ -1,117 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.streaming.flume
-
-import java.net.{InetSocketAddress, ServerSocket}
-import java.nio.ByteBuffer
-import java.nio.charset.StandardCharsets
-import java.util.{List => JList}
-import java.util.Collections
-
-import scala.collection.JavaConverters._
-
-import org.apache.avro.ipc.NettyTransceiver
-import org.apache.avro.ipc.specific.SpecificRequestor
-import org.apache.commons.lang3.RandomUtils
-import org.apache.flume.source.avro
-import org.apache.flume.source.avro.{AvroFlumeEvent, AvroSourceProtocol}
-import org.jboss.netty.channel.ChannelPipeline
-import org.jboss.netty.channel.socket.SocketChannel
-import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory
-import org.jboss.netty.handler.codec.compression.{ZlibDecoder, ZlibEncoder}
-
-import org.apache.spark.util.Utils
-import org.apache.spark.SparkConf
-
-/**
- * Share codes for Scala and Python unit tests
- */
-private[flume] class FlumeTestUtils {
-
- private var transceiver: NettyTransceiver = null
-
- private val testPort: Int = findFreePort()
-
- def getTestPort(): Int = testPort
-
- /** Find a free port */
- private def findFreePort(): Int = {
- val candidatePort = RandomUtils.nextInt(1024, 65536)
- Utils.startServiceOnPort(candidatePort, (trialPort: Int) => {
- val socket = new ServerSocket(trialPort)
- socket.close()
- (null, trialPort)
- }, new SparkConf())._2
- }
-
- /** Send data to the flume receiver */
- def writeInput(input: JList[String], enableCompression: Boolean): Unit = {
- val testAddress = new InetSocketAddress("localhost", testPort)
-
- val inputEvents = input.asScala.map { item =>
- val event = new AvroFlumeEvent
- event.setBody(ByteBuffer.wrap(item.getBytes(StandardCharsets.UTF_8)))
- event.setHeaders(Collections.singletonMap("test", "header"))
- event
- }
-
- // if last attempted transceiver had succeeded, close it
- close()
-
- // Create transceiver
- transceiver = {
- if (enableCompression) {
- new NettyTransceiver(testAddress, new CompressionChannelFactory(6))
- } else {
- new NettyTransceiver(testAddress)
- }
- }
-
- // Create Avro client with the transceiver
- val client = SpecificRequestor.getClient(classOf[AvroSourceProtocol], transceiver)
- if (client == null) {
- throw new AssertionError("Cannot create client")
- }
-
- // Send data
- val status = client.appendBatch(inputEvents.asJava)
- if (status != avro.Status.OK) {
- throw new AssertionError("Sent events unsuccessfully")
- }
- }
-
- def close(): Unit = {
- if (transceiver != null) {
- transceiver.close()
- transceiver = null
- }
- }
-
- /** Class to create socket channel with compression */
- private class CompressionChannelFactory(compressionLevel: Int)
- extends NioClientSocketChannelFactory {
-
- override def newChannel(pipeline: ChannelPipeline): SocketChannel = {
- val encoder = new ZlibEncoder(compressionLevel)
- pipeline.addFirst("deflater", encoder)
- pipeline.addFirst("inflater", new ZlibDecoder())
- super.newChannel(pipeline)
- }
- }
-
-}
diff --git a/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeUtils.scala b/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeUtils.scala
deleted file mode 100644
index 3e3ed712f0..0000000000
--- a/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeUtils.scala
+++ /dev/null
@@ -1,311 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.streaming.flume
-
-import java.io.{ByteArrayOutputStream, DataOutputStream}
-import java.net.InetSocketAddress
-import java.util.{List => JList, Map => JMap}
-
-import scala.collection.JavaConverters._
-
-import org.apache.spark.api.java.function.PairFunction
-import org.apache.spark.api.python.PythonRDD
-import org.apache.spark.storage.StorageLevel
-import org.apache.spark.streaming.StreamingContext
-import org.apache.spark.streaming.api.java.{JavaPairDStream, JavaReceiverInputDStream, JavaStreamingContext}
-import org.apache.spark.streaming.dstream.ReceiverInputDStream
-
-object FlumeUtils {
- private val DEFAULT_POLLING_PARALLELISM = 5
- private val DEFAULT_POLLING_BATCH_SIZE = 1000
-
- /**
- * Create a input stream from a Flume source.
- * @param ssc StreamingContext object
- * @param hostname Hostname of the slave machine to which the flume data will be sent
- * @param port Port of the slave machine to which the flume data will be sent
- * @param storageLevel Storage level to use for storing the received objects
- */
- def createStream (
- ssc: StreamingContext,
- hostname: String,
- port: Int,
- storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
- ): ReceiverInputDStream[SparkFlumeEvent] = {
- createStream(ssc, hostname, port, storageLevel, false)
- }
-
- /**
- * Create a input stream from a Flume source.
- * @param ssc StreamingContext object
- * @param hostname Hostname of the slave machine to which the flume data will be sent
- * @param port Port of the slave machine to which the flume data will be sent
- * @param storageLevel Storage level to use for storing the received objects
- * @param enableDecompression should netty server decompress input stream
- */
- def createStream (
- ssc: StreamingContext,
- hostname: String,
- port: Int,
- storageLevel: StorageLevel,
- enableDecompression: Boolean
- ): ReceiverInputDStream[SparkFlumeEvent] = {
- val inputStream = new FlumeInputDStream[SparkFlumeEvent](
- ssc, hostname, port, storageLevel, enableDecompression)
-
- inputStream
- }
-
- /**
- * Creates a input stream from a Flume source.
- * Storage level of the data will be the default StorageLevel.MEMORY_AND_DISK_SER_2.
- * @param hostname Hostname of the slave machine to which the flume data will be sent
- * @param port Port of the slave machine to which the flume data will be sent
- */
- def createStream(
- jssc: JavaStreamingContext,
- hostname: String,
- port: Int
- ): JavaReceiverInputDStream[SparkFlumeEvent] = {
- createStream(jssc.ssc, hostname, port)
- }
-
- /**
- * Creates a input stream from a Flume source.
- * @param hostname Hostname of the slave machine to which the flume data will be sent
- * @param port Port of the slave machine to which the flume data will be sent
- * @param storageLevel Storage level to use for storing the received objects
- */
- def createStream(
- jssc: JavaStreamingContext,
- hostname: String,
- port: Int,
- storageLevel: StorageLevel
- ): JavaReceiverInputDStream[SparkFlumeEvent] = {
- createStream(jssc.ssc, hostname, port, storageLevel, false)
- }
-
- /**
- * Creates a input stream from a Flume source.
- * @param hostname Hostname of the slave machine to which the flume data will be sent
- * @param port Port of the slave machine to which the flume data will be sent
- * @param storageLevel Storage level to use for storing the received objects
- * @param enableDecompression should netty server decompress input stream
- */
- def createStream(
- jssc: JavaStreamingContext,
- hostname: String,
- port: Int,
- storageLevel: StorageLevel,
- enableDecompression: Boolean
- ): JavaReceiverInputDStream[SparkFlumeEvent] = {
- createStream(jssc.ssc, hostname, port, storageLevel, enableDecompression)
- }
-
- /**
- * Creates an input stream that is to be used with the Spark Sink deployed on a Flume agent.
- * This stream will poll the sink for data and will pull events as they are available.
- * This stream will use a batch size of 1000 events and run 5 threads to pull data.
- * @param hostname Address of the host on which the Spark Sink is running
- * @param port Port of the host at which the Spark Sink is listening
- * @param storageLevel Storage level to use for storing the received objects
- */
- def createPollingStream(
- ssc: StreamingContext,
- hostname: String,
- port: Int,
- storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
- ): ReceiverInputDStream[SparkFlumeEvent] = {
- createPollingStream(ssc, Seq(new InetSocketAddress(hostname, port)), storageLevel)
- }
-
- /**
- * Creates an input stream that is to be used with the Spark Sink deployed on a Flume agent.
- * This stream will poll the sink for data and will pull events as they are available.
- * This stream will use a batch size of 1000 events and run 5 threads to pull data.
- * @param addresses List of InetSocketAddresses representing the hosts to connect to.
- * @param storageLevel Storage level to use for storing the received objects
- */
- def createPollingStream(
- ssc: StreamingContext,
- addresses: Seq[InetSocketAddress],
- storageLevel: StorageLevel
- ): ReceiverInputDStream[SparkFlumeEvent] = {
- createPollingStream(ssc, addresses, storageLevel,
- DEFAULT_POLLING_BATCH_SIZE, DEFAULT_POLLING_PARALLELISM)
- }
-
- /**
- * Creates an input stream that is to be used with the Spark Sink deployed on a Flume agent.
- * This stream will poll the sink for data and will pull events as they are available.
- * @param addresses List of InetSocketAddresses representing the hosts to connect to.
- * @param maxBatchSize Maximum number of events to be pulled from the Spark sink in a
- * single RPC call
- * @param parallelism Number of concurrent requests this stream should send to the sink. Note
- * that having a higher number of requests concurrently being pulled will
- * result in this stream using more threads
- * @param storageLevel Storage level to use for storing the received objects
- */
- def createPollingStream(
- ssc: StreamingContext,
- addresses: Seq[InetSocketAddress],
- storageLevel: StorageLevel,
- maxBatchSize: Int,
- parallelism: Int
- ): ReceiverInputDStream[SparkFlumeEvent] = {
- new FlumePollingInputDStream[SparkFlumeEvent](ssc, addresses, maxBatchSize,
- parallelism, storageLevel)
- }
-
- /**
- * Creates an input stream that is to be used with the Spark Sink deployed on a Flume agent.
- * This stream will poll the sink for data and will pull events as they are available.
- * This stream will use a batch size of 1000 events and run 5 threads to pull data.
- * @param hostname Hostname of the host on which the Spark Sink is running
- * @param port Port of the host at which the Spark Sink is listening
- */
- def createPollingStream(
- jssc: JavaStreamingContext,
- hostname: String,
- port: Int
- ): JavaReceiverInputDStream[SparkFlumeEvent] = {
- createPollingStream(jssc, hostname, port, StorageLevel.MEMORY_AND_DISK_SER_2)
- }
-
- /**
- * Creates an input stream that is to be used with the Spark Sink deployed on a Flume agent.
- * This stream will poll the sink for data and will pull events as they are available.
- * This stream will use a batch size of 1000 events and run 5 threads to pull data.
- * @param hostname Hostname of the host on which the Spark Sink is running
- * @param port Port of the host at which the Spark Sink is listening
- * @param storageLevel Storage level to use for storing the received objects
- */
- def createPollingStream(
- jssc: JavaStreamingContext,
- hostname: String,
- port: Int,
- storageLevel: StorageLevel
- ): JavaReceiverInputDStream[SparkFlumeEvent] = {
- createPollingStream(jssc, Array(new InetSocketAddress(hostname, port)), storageLevel)
- }
-
- /**
- * Creates an input stream that is to be used with the Spark Sink deployed on a Flume agent.
- * This stream will poll the sink for data and will pull events as they are available.
- * This stream will use a batch size of 1000 events and run 5 threads to pull data.
- * @param addresses List of InetSocketAddresses on which the Spark Sink is running.
- * @param storageLevel Storage level to use for storing the received objects
- */
- def createPollingStream(
- jssc: JavaStreamingContext,
- addresses: Array[InetSocketAddress],
- storageLevel: StorageLevel
- ): JavaReceiverInputDStream[SparkFlumeEvent] = {
- createPollingStream(jssc, addresses, storageLevel,
- DEFAULT_POLLING_BATCH_SIZE, DEFAULT_POLLING_PARALLELISM)
- }
-
- /**
- * Creates an input stream that is to be used with the Spark Sink deployed on a Flume agent.
- * This stream will poll the sink for data and will pull events as they are available.
- * @param addresses List of InetSocketAddresses on which the Spark Sink is running
- * @param maxBatchSize The maximum number of events to be pulled from the Spark sink in a
- * single RPC call
- * @param parallelism Number of concurrent requests this stream should send to the sink. Note
- * that having a higher number of requests concurrently being pulled will
- * result in this stream using more threads
- * @param storageLevel Storage level to use for storing the received objects
- */
- def createPollingStream(
- jssc: JavaStreamingContext,
- addresses: Array[InetSocketAddress],
- storageLevel: StorageLevel,
- maxBatchSize: Int,
- parallelism: Int
- ): JavaReceiverInputDStream[SparkFlumeEvent] = {
- createPollingStream(jssc.ssc, addresses, storageLevel, maxBatchSize, parallelism)
- }
-}
-
-/**
- * This is a helper class that wraps the methods in FlumeUtils into more Python-friendly class and
- * function so that it can be easily instantiated and called from Python's FlumeUtils.
- */
-private[flume] class FlumeUtilsPythonHelper {
-
- def createStream(
- jssc: JavaStreamingContext,
- hostname: String,
- port: Int,
- storageLevel: StorageLevel,
- enableDecompression: Boolean
- ): JavaPairDStream[Array[Byte], Array[Byte]] = {
- val dstream = FlumeUtils.createStream(jssc, hostname, port, storageLevel, enableDecompression)
- FlumeUtilsPythonHelper.toByteArrayPairDStream(dstream)
- }
-
- def createPollingStream(
- jssc: JavaStreamingContext,
- hosts: JList[String],
- ports: JList[Int],
- storageLevel: StorageLevel,
- maxBatchSize: Int,
- parallelism: Int
- ): JavaPairDStream[Array[Byte], Array[Byte]] = {
- assert(hosts.size() == ports.size())
- val addresses = hosts.asScala.zip(ports.asScala).map {
- case (host, port) => new InetSocketAddress(host, port)
- }
- val dstream = FlumeUtils.createPollingStream(
- jssc.ssc, addresses, storageLevel, maxBatchSize, parallelism)
- FlumeUtilsPythonHelper.toByteArrayPairDStream(dstream)
- }
-
-}
-
-private object FlumeUtilsPythonHelper {
-
- private def stringMapToByteArray(map: JMap[CharSequence, CharSequence]): Array[Byte] = {
- val byteStream = new ByteArrayOutputStream()
- val output = new DataOutputStream(byteStream)
- try {
- output.writeInt(map.size)
- map.asScala.foreach { kv =>
- PythonRDD.writeUTF(kv._1.toString, output)
- PythonRDD.writeUTF(kv._2.toString, output)
- }
- byteStream.toByteArray
- }
- finally {
- output.close()
- }
- }
-
- private def toByteArrayPairDStream(dstream: JavaReceiverInputDStream[SparkFlumeEvent]):
- JavaPairDStream[Array[Byte], Array[Byte]] = {
- dstream.mapToPair(new PairFunction[SparkFlumeEvent, Array[Byte], Array[Byte]] {
- override def call(sparkEvent: SparkFlumeEvent): (Array[Byte], Array[Byte]) = {
- val event = sparkEvent.event
- val byteBuffer = event.getBody
- val body = new Array[Byte](byteBuffer.remaining())
- byteBuffer.get(body)
- (stringMapToByteArray(event.getHeaders), body)
- }
- })
- }
-}
diff --git a/external/flume/src/main/scala/org/apache/spark/streaming/flume/PollingFlumeTestUtils.scala b/external/flume/src/main/scala/org/apache/spark/streaming/flume/PollingFlumeTestUtils.scala
deleted file mode 100644
index 1a96df6e94..0000000000
--- a/external/flume/src/main/scala/org/apache/spark/streaming/flume/PollingFlumeTestUtils.scala
+++ /dev/null
@@ -1,209 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.streaming.flume
-
-import java.nio.charset.StandardCharsets
-import java.util.{Collections, List => JList, Map => JMap}
-import java.util.concurrent._
-
-import scala.collection.mutable.ArrayBuffer
-
-import org.apache.flume.event.EventBuilder
-import org.apache.flume.Context
-import org.apache.flume.channel.MemoryChannel
-import org.apache.flume.conf.Configurables
-
-import org.apache.spark.streaming.flume.sink.{SparkSink, SparkSinkConfig}
-
-/**
- * Share codes for Scala and Python unit tests
- */
-private[flume] class PollingFlumeTestUtils {
-
- private val batchCount = 5
- val eventsPerBatch = 100
- private val totalEventsPerChannel = batchCount * eventsPerBatch
- private val channelCapacity = 5000
-
- def getTotalEvents: Int = totalEventsPerChannel * channels.size
-
- private val channels = new ArrayBuffer[MemoryChannel]
- private val sinks = new ArrayBuffer[SparkSink]
-
- /**
- * Start a sink and return the port of this sink
- */
- def startSingleSink(): Int = {
- channels.clear()
- sinks.clear()
-
- // Start the channel and sink.
- val context = new Context()
- context.put("capacity", channelCapacity.toString)
- context.put("transactionCapacity", "1000")
- context.put("keep-alive", "0")
- val channel = new MemoryChannel()
- Configurables.configure(channel, context)
-
- val sink = new SparkSink()
- context.put(SparkSinkConfig.CONF_HOSTNAME, "localhost")
- context.put(SparkSinkConfig.CONF_PORT, String.valueOf(0))
- Configurables.configure(sink, context)
- sink.setChannel(channel)
- sink.start()
-
- channels += (channel)
- sinks += sink
-
- sink.getPort()
- }
-
- /**
- * Start 2 sinks and return the ports
- */
- def startMultipleSinks(): Seq[Int] = {
- channels.clear()
- sinks.clear()
-
- // Start the channel and sink.
- val context = new Context()
- context.put("capacity", channelCapacity.toString)
- context.put("transactionCapacity", "1000")
- context.put("keep-alive", "0")
- val channel = new MemoryChannel()
- Configurables.configure(channel, context)
-
- val channel2 = new MemoryChannel()
- Configurables.configure(channel2, context)
-
- val sink = new SparkSink()
- context.put(SparkSinkConfig.CONF_HOSTNAME, "localhost")
- context.put(SparkSinkConfig.CONF_PORT, String.valueOf(0))
- Configurables.configure(sink, context)
- sink.setChannel(channel)
- sink.start()
-
- val sink2 = new SparkSink()
- context.put(SparkSinkConfig.CONF_HOSTNAME, "localhost")
- context.put(SparkSinkConfig.CONF_PORT, String.valueOf(0))
- Configurables.configure(sink2, context)
- sink2.setChannel(channel2)
- sink2.start()
-
- sinks += sink
- sinks += sink2
- channels += channel
- channels += channel2
-
- sinks.map(_.getPort())
- }
-
- /**
- * Send data and wait until all data has been received
- */
- def sendDatAndEnsureAllDataHasBeenReceived(): Unit = {
- val executor = Executors.newCachedThreadPool()
- val executorCompletion = new ExecutorCompletionService[Void](executor)
-
- val latch = new CountDownLatch(batchCount * channels.size)
- sinks.foreach(_.countdownWhenBatchReceived(latch))
-
- channels.foreach(channel => {
- executorCompletion.submit(new TxnSubmitter(channel))
- })
-
- for (i <- 0 until channels.size) {
- executorCompletion.take()
- }
-
- latch.await(15, TimeUnit.SECONDS) // Ensure all data has been received.
- }
-
- /**
- * A Python-friendly method to assert the output
- */
- def assertOutput(
- outputHeaders: JList[JMap[String, String]], outputBodies: JList[String]): Unit = {
- require(outputHeaders.size == outputBodies.size)
- val eventSize = outputHeaders.size
- if (eventSize != totalEventsPerChannel * channels.size) {
- throw new AssertionError(
- s"Expected ${totalEventsPerChannel * channels.size} events, but was $eventSize")
- }
- var counter = 0
- for (k <- 0 until channels.size; i <- 0 until totalEventsPerChannel) {
- val eventBodyToVerify = s"${channels(k).getName}-$i"
- val eventHeaderToVerify: JMap[String, String] = Collections.singletonMap(s"test-$i", "header")
- var found = false
- var j = 0
- while (j < eventSize && !found) {
- if (eventBodyToVerify == outputBodies.get(j) &&
- eventHeaderToVerify == outputHeaders.get(j)) {
- found = true
- counter += 1
- }
- j += 1
- }
- }
- if (counter != totalEventsPerChannel * channels.size) {
- throw new AssertionError(
- s"111 Expected ${totalEventsPerChannel * channels.size} events, but was $counter")
- }
- }
-
- def assertChannelsAreEmpty(): Unit = {
- channels.foreach(assertChannelIsEmpty)
- }
-
- private def assertChannelIsEmpty(channel: MemoryChannel): Unit = {
- val queueRemaining = channel.getClass.getDeclaredField("queueRemaining")
- queueRemaining.setAccessible(true)
- val m = queueRemaining.get(channel).getClass.getDeclaredMethod("availablePermits")
- if (m.invoke(queueRemaining.get(channel)).asInstanceOf[Int] != 5000) {
- throw new AssertionError(s"Channel ${channel.getName} is not empty")
- }
- }
-
- def close(): Unit = {
- sinks.foreach(_.stop())
- sinks.clear()
- channels.foreach(_.stop())
- channels.clear()
- }
-
- private class TxnSubmitter(channel: MemoryChannel) extends Callable[Void] {
- override def call(): Void = {
- var t = 0
- for (i <- 0 until batchCount) {
- val tx = channel.getTransaction
- tx.begin()
- for (j <- 0 until eventsPerBatch) {
- channel.put(EventBuilder.withBody(
- s"${channel.getName}-$t".getBytes(StandardCharsets.UTF_8),
- Collections.singletonMap(s"test-$t", "header")))
- t += 1
- }
- tx.commit()
- tx.close()
- Thread.sleep(500) // Allow some time for the events to reach
- }
- null
- }
- }
-
-}
diff --git a/external/flume/src/main/scala/org/apache/spark/streaming/flume/package-info.java b/external/flume/src/main/scala/org/apache/spark/streaming/flume/package-info.java
deleted file mode 100644
index d31aa5f5c0..0000000000
--- a/external/flume/src/main/scala/org/apache/spark/streaming/flume/package-info.java
+++ /dev/null
@@ -1,21 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-/**
- * Spark streaming receiver for Flume.
- */
-package org.apache.spark.streaming.flume;
\ No newline at end of file
diff --git a/external/flume/src/main/scala/org/apache/spark/streaming/flume/package.scala b/external/flume/src/main/scala/org/apache/spark/streaming/flume/package.scala
deleted file mode 100644
index 9bfab68c4b..0000000000
--- a/external/flume/src/main/scala/org/apache/spark/streaming/flume/package.scala
+++ /dev/null
@@ -1,23 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.streaming
-
-/**
- * Spark streaming receiver for Flume.
- */
-package object flume
diff --git a/external/flume/src/test/java/org/apache/spark/streaming/LocalJavaStreamingContext.java b/external/flume/src/test/java/org/apache/spark/streaming/LocalJavaStreamingContext.java
deleted file mode 100644
index cfedb5a042..0000000000
--- a/external/flume/src/test/java/org/apache/spark/streaming/LocalJavaStreamingContext.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.streaming;
-
-import org.apache.spark.SparkConf;
-import org.apache.spark.streaming.api.java.JavaStreamingContext;
-import org.junit.After;
-import org.junit.Before;
-
-public abstract class LocalJavaStreamingContext {
-
- protected transient JavaStreamingContext ssc;
-
- @Before
- public void setUp() {
- SparkConf conf = new SparkConf()
- .setMaster("local[2]")
- .setAppName("test")
- .set("spark.streaming.clock", "org.apache.spark.util.ManualClock");
- ssc = new JavaStreamingContext(conf, new Duration(1000));
- ssc.checkpoint("checkpoint");
- }
-
- @After
- public void tearDown() {
- ssc.stop();
- ssc = null;
- }
-}
diff --git a/external/flume/src/test/java/org/apache/spark/streaming/flume/JavaFlumePollingStreamSuite.java b/external/flume/src/test/java/org/apache/spark/streaming/flume/JavaFlumePollingStreamSuite.java
deleted file mode 100644
index 79c5b91654..0000000000
--- a/external/flume/src/test/java/org/apache/spark/streaming/flume/JavaFlumePollingStreamSuite.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.streaming.flume;
-
-import java.net.InetSocketAddress;
-
-import org.apache.spark.storage.StorageLevel;
-import org.apache.spark.streaming.LocalJavaStreamingContext;
-
-import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
-import org.junit.Test;
-
-public class JavaFlumePollingStreamSuite extends LocalJavaStreamingContext {
- @Test
- public void testFlumeStream() {
- // tests the API, does not actually test data receiving
- InetSocketAddress[] addresses = new InetSocketAddress[] {
- new InetSocketAddress("localhost", 12345)
- };
- JavaReceiverInputDStream<SparkFlumeEvent> test1 =
- FlumeUtils.createPollingStream(ssc, "localhost", 12345);
- JavaReceiverInputDStream<SparkFlumeEvent> test2 = FlumeUtils.createPollingStream(
- ssc, "localhost", 12345, StorageLevel.MEMORY_AND_DISK_SER_2());
- JavaReceiverInputDStream<SparkFlumeEvent> test3 = FlumeUtils.createPollingStream(
- ssc, addresses, StorageLevel.MEMORY_AND_DISK_SER_2());
- JavaReceiverInputDStream<SparkFlumeEvent> test4 = FlumeUtils.createPollingStream(
- ssc, addresses, StorageLevel.MEMORY_AND_DISK_SER_2(), 100, 5);
- }
-}
diff --git a/external/flume/src/test/java/org/apache/spark/streaming/flume/JavaFlumeStreamSuite.java b/external/flume/src/test/java/org/apache/spark/streaming/flume/JavaFlumeStreamSuite.java
deleted file mode 100644
index 3b5e0c7746..0000000000
--- a/external/flume/src/test/java/org/apache/spark/streaming/flume/JavaFlumeStreamSuite.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.streaming.flume;
-
-import org.apache.spark.storage.StorageLevel;
-import org.apache.spark.streaming.LocalJavaStreamingContext;
-
-import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
-import org.junit.Test;
-
-public class JavaFlumeStreamSuite extends LocalJavaStreamingContext {
- @Test
- public void testFlumeStream() {
-    // Exercises the API only; it does not actually receive any data.
- JavaReceiverInputDStream<SparkFlumeEvent> test1 = FlumeUtils.createStream(ssc, "localhost", 12345);
- JavaReceiverInputDStream<SparkFlumeEvent> test2 = FlumeUtils.createStream(ssc, "localhost", 12345,
- StorageLevel.MEMORY_AND_DISK_SER_2());
- JavaReceiverInputDStream<SparkFlumeEvent> test3 = FlumeUtils.createStream(ssc, "localhost", 12345,
- StorageLevel.MEMORY_AND_DISK_SER_2(), false);
- }
-}
diff --git a/external/flume/src/test/resources/log4j.properties b/external/flume/src/test/resources/log4j.properties
deleted file mode 100644
index 75e3b53a09..0000000000
--- a/external/flume/src/test/resources/log4j.properties
+++ /dev/null
@@ -1,28 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-# Set everything to be logged to the file target/unit-tests.log
-log4j.rootCategory=INFO, file
-log4j.appender.file=org.apache.log4j.FileAppender
-log4j.appender.file.append=true
-log4j.appender.file.file=target/unit-tests.log
-log4j.appender.file.layout=org.apache.log4j.PatternLayout
-log4j.appender.file.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss.SSS} %t %p %c{1}: %m%n
-
-# Ignore messages below warning level from Jetty, because it's a bit verbose
-log4j.logger.org.spark-project.jetty=WARN
-
diff --git a/external/flume/src/test/scala/org/apache/spark/streaming/TestOutputStream.scala b/external/flume/src/test/scala/org/apache/spark/streaming/TestOutputStream.scala
deleted file mode 100644
index c97a27ca7c..0000000000
--- a/external/flume/src/test/scala/org/apache/spark/streaming/TestOutputStream.scala
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.streaming
-
-import java.io.{IOException, ObjectInputStream}
-import java.util.concurrent.ConcurrentLinkedQueue
-
-import scala.reflect.ClassTag
-
-import org.apache.spark.rdd.RDD
-import org.apache.spark.streaming.dstream.{DStream, ForEachDStream}
-import org.apache.spark.util.Utils
-
-/**
- * This is an output stream used only by the test suites. All the output is collected into a
- * ConcurrentLinkedQueue. This queue is wiped clean when the stream is restored from a checkpoint.
- *
- * The queue contains a sequence of RDDs, each represented as a sequence of collected items.
- */
-class TestOutputStream[T: ClassTag](parent: DStream[T],
- val output: ConcurrentLinkedQueue[Seq[T]] = new ConcurrentLinkedQueue[Seq[T]]())
- extends ForEachDStream[T](parent, (rdd: RDD[T], t: Time) => {
- val collected = rdd.collect()
- output.add(collected)
- }, false) {
-
-  // This is to clear the output buffer every time it is read from a checkpoint
- @throws(classOf[IOException])
- private def readObject(ois: ObjectInputStream): Unit = Utils.tryOrIOException {
- ois.defaultReadObject()
- output.clear()
- }
-}
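The private readObject above is the standard Java-serialization hook: the runtime invokes it reflectively whenever an instance is deserialized, which is exactly what happens when a DStream graph is restored from a checkpoint, so the collected output is wiped at that point. A stripped-down sketch of the same pattern on an unrelated class (names here are purely illustrative):

    import java.io.{IOException, ObjectInputStream}
    import java.util.concurrent.ConcurrentLinkedQueue

    class CollectedResults extends Serializable {
      val results = new ConcurrentLinkedQueue[String]()

      // Invoked reflectively by Java serialization on deserialization: restore
      // the fields as usual, then clear the queue so a recovered instance
      // starts from an empty buffer.
      @throws(classOf[IOException])
      private def readObject(in: ObjectInputStream): Unit = {
        in.defaultReadObject()
        results.clear()
      }
    }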
diff --git a/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumePollingStreamSuite.scala b/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumePollingStreamSuite.scala
deleted file mode 100644
index 10dcbf98bc..0000000000
--- a/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumePollingStreamSuite.scala
+++ /dev/null
@@ -1,129 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.streaming.flume
-
-import java.net.InetSocketAddress
-import java.util.concurrent.ConcurrentLinkedQueue
-
-import scala.collection.JavaConverters._
-import scala.concurrent.duration._
-import scala.language.postfixOps
-
-import org.scalatest.BeforeAndAfter
-import org.scalatest.concurrent.Eventually._
-
-import org.apache.spark.{Logging, SparkConf, SparkFunSuite}
-import org.apache.spark.network.util.JavaUtils
-import org.apache.spark.storage.StorageLevel
-import org.apache.spark.streaming.{Seconds, StreamingContext, TestOutputStream}
-import org.apache.spark.streaming.dstream.ReceiverInputDStream
-import org.apache.spark.util.{ManualClock, Utils}
-
-class FlumePollingStreamSuite extends SparkFunSuite with BeforeAndAfter with Logging {
-
- val maxAttempts = 5
- val batchDuration = Seconds(1)
-
- val conf = new SparkConf()
- .setMaster("local[2]")
- .setAppName(this.getClass.getSimpleName)
- .set("spark.streaming.clock", "org.apache.spark.util.ManualClock")
-
- val utils = new PollingFlumeTestUtils
-
- test("flume polling test") {
- testMultipleTimes(testFlumePolling)
- }
-
- test("flume polling test multiple hosts") {
- testMultipleTimes(testFlumePollingMultipleHost)
- }
-
- /**
-   * Run the given test until no more java.net.BindExceptions are thrown.
-   * Retry at most maxAttempts times.
- */
- private def testMultipleTimes(test: () => Unit): Unit = {
- var testPassed = false
- var attempt = 0
- while (!testPassed && attempt < maxAttempts) {
- try {
- test()
- testPassed = true
- } catch {
- case e: Exception if Utils.isBindCollision(e) =>
- logWarning("Exception when running flume polling test: " + e)
- attempt += 1
- }
- }
- assert(testPassed, s"Test failed after $attempt attempts!")
- }
-
- private def testFlumePolling(): Unit = {
- try {
- val port = utils.startSingleSink()
-
- writeAndVerify(Seq(port))
- utils.assertChannelsAreEmpty()
- } finally {
- utils.close()
- }
- }
-
- private def testFlumePollingMultipleHost(): Unit = {
- try {
- val ports = utils.startMultipleSinks()
- writeAndVerify(ports)
- utils.assertChannelsAreEmpty()
- } finally {
- utils.close()
- }
- }
-
- def writeAndVerify(sinkPorts: Seq[Int]): Unit = {
- // Set up the streaming context and input streams
- val ssc = new StreamingContext(conf, batchDuration)
- val addresses = sinkPorts.map(port => new InetSocketAddress("localhost", port))
- val flumeStream: ReceiverInputDStream[SparkFlumeEvent] =
- FlumeUtils.createPollingStream(ssc, addresses, StorageLevel.MEMORY_AND_DISK,
- utils.eventsPerBatch, 5)
- val outputQueue = new ConcurrentLinkedQueue[Seq[SparkFlumeEvent]]
- val outputStream = new TestOutputStream(flumeStream, outputQueue)
- outputStream.register()
-
- ssc.start()
- try {
- utils.sendDatAndEnsureAllDataHasBeenReceived()
- val clock = ssc.scheduler.clock.asInstanceOf[ManualClock]
- clock.advance(batchDuration.milliseconds)
-
-      // The eventually block is required to ensure that all data in the batch has been processed.
- eventually(timeout(10 seconds), interval(100 milliseconds)) {
- val flattenOutput = outputQueue.asScala.toSeq.flatten
- val headers = flattenOutput.map(_.event.getHeaders.asScala.map {
- case (key, value) => (key.toString, value.toString)
- }).map(_.asJava)
- val bodies = flattenOutput.map(e => JavaUtils.bytesToString(e.event.getBody))
- utils.assertOutput(headers.asJava, bodies.asJava)
- }
- } finally {
- ssc.stop()
- }
- }
-
-}
diff --git a/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumeStreamSuite.scala b/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumeStreamSuite.scala
deleted file mode 100644
index 38208c6518..0000000000
--- a/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumeStreamSuite.scala
+++ /dev/null
@@ -1,102 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.streaming.flume
-
-import java.util.concurrent.ConcurrentLinkedQueue
-
-import scala.collection.JavaConverters._
-import scala.concurrent.duration._
-import scala.language.postfixOps
-
-import org.jboss.netty.channel.ChannelPipeline
-import org.jboss.netty.channel.socket.SocketChannel
-import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory
-import org.jboss.netty.handler.codec.compression._
-import org.scalatest.{BeforeAndAfter, Matchers}
-import org.scalatest.concurrent.Eventually._
-
-import org.apache.spark.{Logging, SparkConf, SparkFunSuite}
-import org.apache.spark.network.util.JavaUtils
-import org.apache.spark.storage.StorageLevel
-import org.apache.spark.streaming.{Milliseconds, StreamingContext, TestOutputStream}
-
-class FlumeStreamSuite extends SparkFunSuite with BeforeAndAfter with Matchers with Logging {
- val conf = new SparkConf().setMaster("local[4]").setAppName("FlumeStreamSuite")
- var ssc: StreamingContext = null
-
- test("flume input stream") {
- testFlumeStream(testCompression = false)
- }
-
- test("flume input compressed stream") {
- testFlumeStream(testCompression = true)
- }
-
-  /** Run the test on a Flume stream, with or without compression. */
- private def testFlumeStream(testCompression: Boolean): Unit = {
- val input = (1 to 100).map { _.toString }
- val utils = new FlumeTestUtils
- try {
- val outputQueue = startContext(utils.getTestPort(), testCompression)
-
- eventually(timeout(10 seconds), interval(100 milliseconds)) {
- utils.writeInput(input.asJava, testCompression)
- }
-
- eventually(timeout(10 seconds), interval(100 milliseconds)) {
- val outputEvents = outputQueue.asScala.toSeq.flatten.map { _.event }
-      outputEvents.foreach { event =>
-        event.getHeaders.get("test") should be("header")
-      }
- val output = outputEvents.map(event => JavaUtils.bytesToString(event.getBody))
- output should be (input)
- }
- } finally {
- if (ssc != null) {
- ssc.stop()
- }
- utils.close()
- }
- }
-
-  /** Set up and start the streaming context */
- private def startContext(
- testPort: Int, testCompression: Boolean): (ConcurrentLinkedQueue[Seq[SparkFlumeEvent]]) = {
- ssc = new StreamingContext(conf, Milliseconds(200))
- val flumeStream = FlumeUtils.createStream(
- ssc, "localhost", testPort, StorageLevel.MEMORY_AND_DISK, testCompression)
- val outputQueue = new ConcurrentLinkedQueue[Seq[SparkFlumeEvent]]
- val outputStream = new TestOutputStream(flumeStream, outputQueue)
- outputStream.register()
- ssc.start()
- outputQueue
- }
-
- /** Class to create socket channel with compression */
- private class CompressionChannelFactory(compressionLevel: Int)
- extends NioClientSocketChannelFactory {
-
- override def newChannel(pipeline: ChannelPipeline): SocketChannel = {
- val encoder = new ZlibEncoder(compressionLevel)
- pipeline.addFirst("deflater", encoder)
- pipeline.addFirst("inflater", new ZlibDecoder())
- super.newChannel(pipeline)
- }
- }
-}
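The suite above covers the push-based half of the integration: FlumeUtils.createStream starts an Avro-source receiver inside the application, and the final flag turns on the zlib decompression that pairs with a compressing sender such as the CompressionChannelFactory used in these tests. A sketch with placeholder host and port:

    import org.apache.spark.SparkConf
    import org.apache.spark.storage.StorageLevel
    import org.apache.spark.streaming.{Milliseconds, StreamingContext}
    import org.apache.spark.streaming.flume.FlumeUtils

    val ssc = new StreamingContext(
      new SparkConf().setMaster("local[4]").setAppName("flume-push-sketch"), Milliseconds(200))

    // Listen on localhost:12345 for events pushed by a Flume Avro sink; the last
    // argument enables decompression of zlib-compressed connections.
    val stream = FlumeUtils.createStream(
      ssc, "localhost", 12345, StorageLevel.MEMORY_AND_DISK_SER_2, true)

    stream.count().map(n => "Received " + n + " flume events.").print()
    ssc.start()
    ssc.awaitTermination()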