Diffstat (limited to 'external/flume')
17 files changed, 0 insertions, 1756 deletions
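For orientation, the removed module provided two receivers for Spark Streaming: a push-based one, in which a Flume "avro" sink sends events to an Avro server run by Spark, and a pull-based one, in which Spark polls a SparkSink running inside the Flume agent. A minimal, hypothetical sketch of the push-based API deleted below (host, port, and app name are placeholders):

import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.flume.FlumeUtils

object FlumePushSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[2]").setAppName("FlumePushSketch")
    val ssc = new StreamingContext(conf, Seconds(1))
    // Spark runs an Avro server on localhost:9999 (placeholder); a Flume "avro" sink
    // configured in the agent pushes events to this address.
    val stream = FlumeUtils.createStream(ssc, "localhost", 9999, StorageLevel.MEMORY_AND_DISK_SER_2)
    // Each record is a SparkFlumeEvent wrapping an AvroFlumeEvent (headers + body).
    stream.map(e => new String(e.event.getBody.array())).print()
    ssc.start()
    ssc.awaitTermination()
  }
}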
diff --git a/external/flume/pom.xml b/external/flume/pom.xml deleted file mode 100644 index d650dd034d..0000000000 --- a/external/flume/pom.xml +++ /dev/null @@ -1,78 +0,0 @@ -<?xml version="1.0" encoding="UTF-8"?> -<!-- - ~ Licensed to the Apache Software Foundation (ASF) under one or more - ~ contributor license agreements. See the NOTICE file distributed with - ~ this work for additional information regarding copyright ownership. - ~ The ASF licenses this file to You under the Apache License, Version 2.0 - ~ (the "License"); you may not use this file except in compliance with - ~ the License. You may obtain a copy of the License at - ~ - ~ http://www.apache.org/licenses/LICENSE-2.0 - ~ - ~ Unless required by applicable law or agreed to in writing, software - ~ distributed under the License is distributed on an "AS IS" BASIS, - ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - ~ See the License for the specific language governing permissions and - ~ limitations under the License. - --> - -<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> - <modelVersion>4.0.0</modelVersion> - <parent> - <groupId>org.apache.spark</groupId> - <artifactId>spark-parent_2.11</artifactId> - <version>2.0.0-SNAPSHOT</version> - <relativePath>../../pom.xml</relativePath> - </parent> - - <groupId>org.apache.spark</groupId> - <artifactId>spark-streaming-flume_2.11</artifactId> - <properties> - <sbt.project.name>streaming-flume</sbt.project.name> - </properties> - <packaging>jar</packaging> - <name>Spark Project External Flume</name> - <url>http://spark.apache.org/</url> - - <dependencies> - <dependency> - <groupId>org.apache.spark</groupId> - <artifactId>spark-streaming_${scala.binary.version}</artifactId> - <version>${project.version}</version> - <scope>provided</scope> - </dependency> - <dependency> - <groupId>org.apache.spark</groupId> - <artifactId>spark-core_${scala.binary.version}</artifactId> - <version>${project.version}</version> - <type>test-jar</type> - <scope>test</scope> - </dependency> - <dependency> - <groupId>org.apache.spark</groupId> - <artifactId>spark-streaming-flume-sink_${scala.binary.version}</artifactId> - <version>${project.version}</version> - </dependency> - <dependency> - <groupId>org.apache.flume</groupId> - <artifactId>flume-ng-core</artifactId> - </dependency> - <dependency> - <groupId>org.apache.flume</groupId> - <artifactId>flume-ng-sdk</artifactId> - </dependency> - <dependency> - <groupId>org.scalacheck</groupId> - <artifactId>scalacheck_${scala.binary.version}</artifactId> - <scope>test</scope> - </dependency> - <dependency> - <groupId>org.apache.spark</groupId> - <artifactId>spark-test-tags_${scala.binary.version}</artifactId> - </dependency> - </dependencies> - <build> - <outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory> - <testOutputDirectory>target/scala-${scala.binary.version}/test-classes</testOutputDirectory> - </build> -</project> diff --git a/external/flume/src/main/scala/org/apache/spark/streaming/flume/EventTransformer.scala b/external/flume/src/main/scala/org/apache/spark/streaming/flume/EventTransformer.scala deleted file mode 100644 index 5c773d4b07..0000000000 --- a/external/flume/src/main/scala/org/apache/spark/streaming/flume/EventTransformer.scala +++ /dev/null @@ -1,72 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * 
contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.streaming.flume - -import java.io.{ObjectInput, ObjectOutput} - -import scala.collection.JavaConverters._ - -import org.apache.spark.Logging -import org.apache.spark.util.Utils - -/** - * A simple object that provides the implementation of readExternal and writeExternal for both - * the wrapper classes for Flume-style Events. - */ -private[streaming] object EventTransformer extends Logging { - def readExternal(in: ObjectInput): (java.util.HashMap[CharSequence, CharSequence], - Array[Byte]) = { - val bodyLength = in.readInt() - val bodyBuff = new Array[Byte](bodyLength) - in.readFully(bodyBuff) - - val numHeaders = in.readInt() - val headers = new java.util.HashMap[CharSequence, CharSequence] - - for (i <- 0 until numHeaders) { - val keyLength = in.readInt() - val keyBuff = new Array[Byte](keyLength) - in.readFully(keyBuff) - val key: String = Utils.deserialize(keyBuff) - - val valLength = in.readInt() - val valBuff = new Array[Byte](valLength) - in.readFully(valBuff) - val value: String = Utils.deserialize(valBuff) - - headers.put(key, value) - } - (headers, bodyBuff) - } - - def writeExternal(out: ObjectOutput, headers: java.util.Map[CharSequence, CharSequence], - body: Array[Byte]) { - out.writeInt(body.length) - out.write(body) - val numHeaders = headers.size() - out.writeInt(numHeaders) - for ((k, v) <- headers.asScala) { - val keyBuff = Utils.serialize(k.toString) - out.writeInt(keyBuff.length) - out.write(keyBuff) - val valBuff = Utils.serialize(v.toString) - out.writeInt(valBuff.length) - out.write(valBuff) - } - } -} diff --git a/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeBatchFetcher.scala b/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeBatchFetcher.scala deleted file mode 100644 index 3555fa68b6..0000000000 --- a/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeBatchFetcher.scala +++ /dev/null @@ -1,166 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.spark.streaming.flume - -import scala.collection.mutable.ArrayBuffer - -import com.google.common.base.Throwables - -import org.apache.spark.Logging -import org.apache.spark.streaming.flume.sink._ - -/** - * This class implements the core functionality of [[FlumePollingReceiver]]. When started it - * pulls data from Flume, stores it to Spark and then sends an Ack or Nack. This class should be - * run via an [[java.util.concurrent.Executor]] as this implements [[Runnable]]. - * - * @param receiver The receiver that owns this instance. - */ - -private[flume] class FlumeBatchFetcher(receiver: FlumePollingReceiver) extends Runnable with - Logging { - - def run(): Unit = { - while (!receiver.isStopped()) { - val connection = receiver.getConnections.poll() - val client = connection.client - var batchReceived = false - var seq: CharSequence = null - try { - getBatch(client) match { - case Some(eventBatch) => - batchReceived = true - seq = eventBatch.getSequenceNumber - val events = toSparkFlumeEvents(eventBatch.getEvents) - if (store(events)) { - sendAck(client, seq) - } else { - sendNack(batchReceived, client, seq) - } - case None => - } - } catch { - case e: Exception => - Throwables.getRootCause(e) match { - // If the cause was an InterruptedException, then check if the receiver is stopped - - // if yes, just break out of the loop. Else send a Nack and log a warning. - // In the unlikely case that the cause was not an Exception, - // just throw it out and exit. - case interrupted: InterruptedException => - if (!receiver.isStopped()) { - logWarning("Interrupted while receiving data from Flume", interrupted) - sendNack(batchReceived, client, seq) - } - case exception: Exception => - logWarning("Error while receiving data from Flume", exception) - sendNack(batchReceived, client, seq) - } - } finally { - receiver.getConnections.add(connection) - } - } - } - - /** - * Gets a batch of events from the specified client. This method does not handle any exceptions, - * which will be propagated to the caller. - * @param client Client to get events from - * @return [[Some]] which contains the event batch if Flume sent any events back, else [[None]] - */ - private def getBatch(client: SparkFlumeProtocol.Callback): Option[EventBatch] = { - val eventBatch = client.getEventBatch(receiver.getMaxBatchSize) - if (!SparkSinkUtils.isErrorBatch(eventBatch)) { - // No error, proceed with processing data - logDebug(s"Received batch of ${eventBatch.getEvents.size} events with sequence " + - s"number: ${eventBatch.getSequenceNumber}") - Some(eventBatch) - } else { - logWarning("Did not receive events from Flume agent due to error on the Flume agent: " + - eventBatch.getErrorMsg) - None - } - } - - /** - * Store the events in the buffer to Spark. This method will not propagate any exceptions, - * but will propagate any other errors. - * @param buffer The buffer to store - * @return true if the data was stored without any exception being thrown, else false - */ - private def store(buffer: ArrayBuffer[SparkFlumeEvent]): Boolean = { - try { - receiver.store(buffer) - true - } catch { - case e: Exception => - logWarning("Error while attempting to store data received from Flume", e) - false - } - } - - /** - * Send an ack to the client for the sequence number. This method does not handle any exceptions, - * which will be propagated to the caller. - * @param client client to send the ack to - * @param seq sequence number of the batch to be ack-ed.
- */ - private def sendAck(client: SparkFlumeProtocol.Callback, seq: CharSequence): Unit = { - logDebug("Sending ack for sequence number: " + seq) - client.ack(seq) - logDebug("Ack sent for sequence number: " + seq) - } - - /** - * This method sends a Nack if a batch was received to the client with the given sequence - * number. Any exceptions thrown by the RPC call are simply propagated as-is - no effort is made - * to handle them. - * @param batchReceived true if a batch was received. If this is false, no nack is sent - * @param client The client to which the nack should be sent - * @param seq The sequence number of the batch that is being nack-ed. - */ - private def sendNack(batchReceived: Boolean, client: SparkFlumeProtocol.Callback, - seq: CharSequence): Unit = { - if (batchReceived) { - // Let Flume know that the events need to be pushed back into the channel. - logDebug("Sending nack for sequence number: " + seq) - client.nack(seq) // If the agent is down, even this could fail and throw - logDebug("Nack sent for sequence number: " + seq) - } - } - - /** - * Utility method to convert [[SparkSinkEvent]]s to [[SparkFlumeEvent]]s - * @param events - Events to convert to SparkFlumeEvents - * @return - The SparkFlumeEvents generated from the SparkSinkEvents - */ - private def toSparkFlumeEvents(events: java.util.List[SparkSinkEvent]): - ArrayBuffer[SparkFlumeEvent] = { - // Convert each Flume event to a serializable SparkFlumeEvent - val buffer = new ArrayBuffer[SparkFlumeEvent](events.size()) - var j = 0 - while (j < events.size()) { - val event = events.get(j) - val sparkFlumeEvent = new SparkFlumeEvent() - sparkFlumeEvent.event.setBody(event.getBody) - sparkFlumeEvent.event.setHeaders(event.getHeaders) - buffer += sparkFlumeEvent - j += 1 - } - buffer - } -} diff --git a/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeInputDStream.scala b/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeInputDStream.scala deleted file mode 100644 index 74bd0165c6..0000000000 --- a/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeInputDStream.scala +++ /dev/null @@ -1,205 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License.
- */ - -package org.apache.spark.streaming.flume - -import java.io.{Externalizable, ObjectInput, ObjectOutput} -import java.net.InetSocketAddress -import java.nio.ByteBuffer -import java.util.concurrent.Executors - -import scala.collection.JavaConverters._ -import scala.reflect.ClassTag - -import org.apache.avro.ipc.NettyServer -import org.apache.avro.ipc.specific.SpecificResponder -import org.apache.flume.source.avro.{AvroFlumeEvent, AvroSourceProtocol, Status} -import org.jboss.netty.channel.{ChannelPipeline, ChannelPipelineFactory, Channels} -import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory -import org.jboss.netty.handler.codec.compression._ - -import org.apache.spark.Logging -import org.apache.spark.storage.StorageLevel -import org.apache.spark.streaming.StreamingContext -import org.apache.spark.streaming.dstream._ -import org.apache.spark.streaming.receiver.Receiver -import org.apache.spark.util.Utils - -private[streaming] -class FlumeInputDStream[T: ClassTag]( - _ssc: StreamingContext, - host: String, - port: Int, - storageLevel: StorageLevel, - enableDecompression: Boolean -) extends ReceiverInputDStream[SparkFlumeEvent](_ssc) { - - override def getReceiver(): Receiver[SparkFlumeEvent] = { - new FlumeReceiver(host, port, storageLevel, enableDecompression) - } -} - -/** - * A wrapper class for AvroFlumeEvent's with a custom serialization format. - * - * This is necessary because AvroFlumeEvent uses inner data structures - * which are not serializable. - */ -class SparkFlumeEvent() extends Externalizable { - var event: AvroFlumeEvent = new AvroFlumeEvent() - - /* De-serialize from bytes. */ - def readExternal(in: ObjectInput): Unit = Utils.tryOrIOException { - val bodyLength = in.readInt() - val bodyBuff = new Array[Byte](bodyLength) - in.readFully(bodyBuff) - - val numHeaders = in.readInt() - val headers = new java.util.HashMap[CharSequence, CharSequence] - - for (i <- 0 until numHeaders) { - val keyLength = in.readInt() - val keyBuff = new Array[Byte](keyLength) - in.readFully(keyBuff) - val key: String = Utils.deserialize(keyBuff) - - val valLength = in.readInt() - val valBuff = new Array[Byte](valLength) - in.readFully(valBuff) - val value: String = Utils.deserialize(valBuff) - - headers.put(key, value) - } - - event.setBody(ByteBuffer.wrap(bodyBuff)) - event.setHeaders(headers) - } - - /* Serialize to bytes. */ - def writeExternal(out: ObjectOutput): Unit = Utils.tryOrIOException { - val body = event.getBody - out.writeInt(body.remaining()) - Utils.writeByteBuffer(body, out) - - val numHeaders = event.getHeaders.size() - out.writeInt(numHeaders) - for ((k, v) <- event.getHeaders.asScala) { - val keyBuff = Utils.serialize(k.toString) - out.writeInt(keyBuff.length) - out.write(keyBuff) - val valBuff = Utils.serialize(v.toString) - out.writeInt(valBuff.length) - out.write(valBuff) - } - } -} - -private[streaming] object SparkFlumeEvent { - def fromAvroFlumeEvent(in: AvroFlumeEvent): SparkFlumeEvent = { - val event = new SparkFlumeEvent - event.event = in - event - } -} - -/** A simple server that implements Flume's Avro protocol. 
*/ -private[streaming] -class FlumeEventServer(receiver: FlumeReceiver) extends AvroSourceProtocol { - override def append(event: AvroFlumeEvent): Status = { - receiver.store(SparkFlumeEvent.fromAvroFlumeEvent(event)) - Status.OK - } - - override def appendBatch(events: java.util.List[AvroFlumeEvent]): Status = { - events.asScala.foreach(event => receiver.store(SparkFlumeEvent.fromAvroFlumeEvent(event))) - Status.OK - } -} - -/** A NetworkReceiver which listens for events using the - * Flume Avro interface. */ -private[streaming] -class FlumeReceiver( - host: String, - port: Int, - storageLevel: StorageLevel, - enableDecompression: Boolean - ) extends Receiver[SparkFlumeEvent](storageLevel) with Logging { - - lazy val responder = new SpecificResponder( - classOf[AvroSourceProtocol], new FlumeEventServer(this)) - var server: NettyServer = null - - private def initServer() = { - if (enableDecompression) { - val channelFactory = new NioServerSocketChannelFactory(Executors.newCachedThreadPool(), - Executors.newCachedThreadPool()) - val channelPipelineFactory = new CompressionChannelPipelineFactory() - - new NettyServer( - responder, - new InetSocketAddress(host, port), - channelFactory, - channelPipelineFactory, - null) - } else { - new NettyServer(responder, new InetSocketAddress(host, port)) - } - } - - def onStart() { - synchronized { - if (server == null) { - server = initServer() - server.start() - } else { - logWarning("Flume receiver being asked to start more than once without close") - } - } - logInfo("Flume receiver started") - } - - def onStop() { - synchronized { - if (server != null) { - server.close() - server = null - } - } - logInfo("Flume receiver stopped") - } - - override def preferredLocation: Option[String] = Option(host) - - /** A Netty Pipeline factory that will decompress incoming data from - * the Netty client and compress data going back to the client. - * - * The compression on the return is required because Flume requires - * a successful response to indicate it can remove the event/batch - * from the configured channel. - */ - private[streaming] - class CompressionChannelPipelineFactory extends ChannelPipelineFactory { - def getPipeline(): ChannelPipeline = { - val pipeline = Channels.pipeline() - val encoder = new ZlibEncoder(6) - pipeline.addFirst("deflater", encoder) - pipeline.addFirst("inflater", new ZlibDecoder()) - pipeline - } - } -} diff --git a/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumePollingInputDStream.scala b/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumePollingInputDStream.scala deleted file mode 100644 index d9c25e8654..0000000000 --- a/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumePollingInputDStream.scala +++ /dev/null @@ -1,123 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.spark.streaming.flume - - -import java.net.InetSocketAddress -import java.util.concurrent.{Executors, LinkedBlockingQueue, TimeUnit} - -import scala.collection.JavaConverters._ -import scala.reflect.ClassTag - -import com.google.common.util.concurrent.ThreadFactoryBuilder -import org.apache.avro.ipc.NettyTransceiver -import org.apache.avro.ipc.specific.SpecificRequestor -import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory - -import org.apache.spark.Logging -import org.apache.spark.storage.StorageLevel -import org.apache.spark.streaming.StreamingContext -import org.apache.spark.streaming.dstream.ReceiverInputDStream -import org.apache.spark.streaming.flume.sink._ -import org.apache.spark.streaming.receiver.Receiver - -/** - * A [[ReceiverInputDStream]] that can be used to read data from several Flume agents running - * [[org.apache.spark.streaming.flume.sink.SparkSink]]s. - * @param _ssc Streaming context that will execute this input stream - * @param addresses List of addresses at which SparkSinks are listening - * @param maxBatchSize Maximum size of a batch - * @param parallelism Number of parallel connections to open - * @param storageLevel The storage level to use. - * @tparam T Class type of the object of this stream - */ -private[streaming] class FlumePollingInputDStream[T: ClassTag]( - _ssc: StreamingContext, - val addresses: Seq[InetSocketAddress], - val maxBatchSize: Int, - val parallelism: Int, - storageLevel: StorageLevel - ) extends ReceiverInputDStream[SparkFlumeEvent](_ssc) { - - override def getReceiver(): Receiver[SparkFlumeEvent] = { - new FlumePollingReceiver(addresses, maxBatchSize, parallelism, storageLevel) - } -} - -private[streaming] class FlumePollingReceiver( - addresses: Seq[InetSocketAddress], - maxBatchSize: Int, - parallelism: Int, - storageLevel: StorageLevel - ) extends Receiver[SparkFlumeEvent](storageLevel) with Logging { - - lazy val channelFactoryExecutor = - Executors.newCachedThreadPool(new ThreadFactoryBuilder().setDaemon(true). - setNameFormat("Flume Receiver Channel Thread - %d").build()) - - lazy val channelFactory = - new NioClientSocketChannelFactory(channelFactoryExecutor, channelFactoryExecutor) - - lazy val receiverExecutor = Executors.newFixedThreadPool(parallelism, - new ThreadFactoryBuilder().setDaemon(true).setNameFormat("Flume Receiver Thread - %d").build()) - - private lazy val connections = new LinkedBlockingQueue[FlumeConnection]() - - override def onStart(): Unit = { - // Create the connections to each Flume agent. - addresses.foreach(host => { - val transceiver = new NettyTransceiver(host, channelFactory) - val client = SpecificRequestor.getClient(classOf[SparkFlumeProtocol.Callback], transceiver) - connections.add(new FlumeConnection(transceiver, client)) - }) - for (i <- 0 until parallelism) { - logInfo("Starting Flume Polling Receiver worker threads..") - // Threads that pull data from Flume. 
- receiverExecutor.submit(new FlumeBatchFetcher(this)) - } - } - - override def onStop(): Unit = { - logInfo("Shutting down Flume Polling Receiver") - receiverExecutor.shutdown() - // Wait up to a minute for the threads to die - if (!receiverExecutor.awaitTermination(60, TimeUnit.SECONDS)) { - receiverExecutor.shutdownNow() - } - connections.asScala.foreach(_.transceiver.close()) - channelFactory.releaseExternalResources() - } - - private[flume] def getConnections: LinkedBlockingQueue[FlumeConnection] = { - this.connections - } - - private[flume] def getMaxBatchSize: Int = { - this.maxBatchSize - } -} - -/** - * A wrapper around the transceiver and the Avro IPC API. - * @param transceiver The transceiver to use for communication with Flume - * @param client The client that the callbacks are received on. - */ -private[flume] class FlumeConnection(val transceiver: NettyTransceiver, - val client: SparkFlumeProtocol.Callback) - - - diff --git a/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeTestUtils.scala b/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeTestUtils.scala deleted file mode 100644 index 945cfa7295..0000000000 --- a/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeTestUtils.scala +++ /dev/null @@ -1,117 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License.
- */ - -package org.apache.spark.streaming.flume - -import java.net.{InetSocketAddress, ServerSocket} -import java.nio.ByteBuffer -import java.nio.charset.StandardCharsets -import java.util.{List => JList} -import java.util.Collections - -import scala.collection.JavaConverters._ - -import org.apache.avro.ipc.NettyTransceiver -import org.apache.avro.ipc.specific.SpecificRequestor -import org.apache.commons.lang3.RandomUtils -import org.apache.flume.source.avro -import org.apache.flume.source.avro.{AvroFlumeEvent, AvroSourceProtocol} -import org.jboss.netty.channel.ChannelPipeline -import org.jboss.netty.channel.socket.SocketChannel -import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory -import org.jboss.netty.handler.codec.compression.{ZlibDecoder, ZlibEncoder} - -import org.apache.spark.util.Utils -import org.apache.spark.SparkConf - -/** - * Shared code for the Scala and Python unit tests - */ -private[flume] class FlumeTestUtils { - - private var transceiver: NettyTransceiver = null - - private val testPort: Int = findFreePort() - - def getTestPort(): Int = testPort - - /** Find a free port */ - private def findFreePort(): Int = { - val candidatePort = RandomUtils.nextInt(1024, 65536) - Utils.startServiceOnPort(candidatePort, (trialPort: Int) => { - val socket = new ServerSocket(trialPort) - socket.close() - (null, trialPort) - }, new SparkConf())._2 - } - - /** Send data to the Flume receiver */ - def writeInput(input: JList[String], enableCompression: Boolean): Unit = { - val testAddress = new InetSocketAddress("localhost", testPort) - - val inputEvents = input.asScala.map { item => - val event = new AvroFlumeEvent - event.setBody(ByteBuffer.wrap(item.getBytes(StandardCharsets.UTF_8))) - event.setHeaders(Collections.singletonMap("test", "header")) - event - } - - // If the previous transceiver is still open, close it first - close() - - // Create transceiver - transceiver = { - if (enableCompression) { - new NettyTransceiver(testAddress, new CompressionChannelFactory(6)) - } else { - new NettyTransceiver(testAddress) - } - } - - // Create Avro client with the transceiver - val client = SpecificRequestor.getClient(classOf[AvroSourceProtocol], transceiver) - if (client == null) { - throw new AssertionError("Cannot create client") - } - - // Send data - val status = client.appendBatch(inputEvents.asJava) - if (status != avro.Status.OK) { - throw new AssertionError("Failed to send events") - } - } - - def close(): Unit = { - if (transceiver != null) { - transceiver.close() - transceiver = null - } - } - - /** Class to create socket channel with compression */ - private class CompressionChannelFactory(compressionLevel: Int) - extends NioClientSocketChannelFactory { - - override def newChannel(pipeline: ChannelPipeline): SocketChannel = { - val encoder = new ZlibEncoder(compressionLevel) - pipeline.addFirst("deflater", encoder) - pipeline.addFirst("inflater", new ZlibDecoder()) - super.newChannel(pipeline) - } - } - -} diff --git a/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeUtils.scala b/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeUtils.scala deleted file mode 100644 index 3e3ed712f0..0000000000 --- a/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeUtils.scala +++ /dev/null @@ -1,311 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.streaming.flume - -import java.io.{ByteArrayOutputStream, DataOutputStream} -import java.net.InetSocketAddress -import java.util.{List => JList, Map => JMap} - -import scala.collection.JavaConverters._ - -import org.apache.spark.api.java.function.PairFunction -import org.apache.spark.api.python.PythonRDD -import org.apache.spark.storage.StorageLevel -import org.apache.spark.streaming.StreamingContext -import org.apache.spark.streaming.api.java.{JavaPairDStream, JavaReceiverInputDStream, JavaStreamingContext} -import org.apache.spark.streaming.dstream.ReceiverInputDStream - -object FlumeUtils { - private val DEFAULT_POLLING_PARALLELISM = 5 - private val DEFAULT_POLLING_BATCH_SIZE = 1000 - - /** - * Create an input stream from a Flume source. - * @param ssc StreamingContext object - * @param hostname Hostname of the slave machine to which the flume data will be sent - * @param port Port of the slave machine to which the flume data will be sent - * @param storageLevel Storage level to use for storing the received objects - */ - def createStream ( - ssc: StreamingContext, - hostname: String, - port: Int, - storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2 - ): ReceiverInputDStream[SparkFlumeEvent] = { - createStream(ssc, hostname, port, storageLevel, false) - } - - /** - * Create an input stream from a Flume source. - * @param ssc StreamingContext object - * @param hostname Hostname of the slave machine to which the flume data will be sent - * @param port Port of the slave machine to which the flume data will be sent - * @param storageLevel Storage level to use for storing the received objects - * @param enableDecompression whether the Netty server should decompress the input stream - */ - def createStream ( - ssc: StreamingContext, - hostname: String, - port: Int, - storageLevel: StorageLevel, - enableDecompression: Boolean - ): ReceiverInputDStream[SparkFlumeEvent] = { - val inputStream = new FlumeInputDStream[SparkFlumeEvent]( - ssc, hostname, port, storageLevel, enableDecompression) - - inputStream - } - - /** - * Creates an input stream from a Flume source. - * Storage level of the data will be the default StorageLevel.MEMORY_AND_DISK_SER_2. - * @param hostname Hostname of the slave machine to which the flume data will be sent - * @param port Port of the slave machine to which the flume data will be sent - */ - def createStream( - jssc: JavaStreamingContext, - hostname: String, - port: Int - ): JavaReceiverInputDStream[SparkFlumeEvent] = { - createStream(jssc.ssc, hostname, port) - } - - /** - * Creates an input stream from a Flume source.
- * @param hostname Hostname of the slave machine to which the flume data will be sent - * @param port Port of the slave machine to which the flume data will be sent - * @param storageLevel Storage level to use for storing the received objects - */ - def createStream( - jssc: JavaStreamingContext, - hostname: String, - port: Int, - storageLevel: StorageLevel - ): JavaReceiverInputDStream[SparkFlumeEvent] = { - createStream(jssc.ssc, hostname, port, storageLevel, false) - } - - /** - * Creates an input stream from a Flume source. - * @param hostname Hostname of the slave machine to which the flume data will be sent - * @param port Port of the slave machine to which the flume data will be sent - * @param storageLevel Storage level to use for storing the received objects - * @param enableDecompression whether the Netty server should decompress the input stream - */ - def createStream( - jssc: JavaStreamingContext, - hostname: String, - port: Int, - storageLevel: StorageLevel, - enableDecompression: Boolean - ): JavaReceiverInputDStream[SparkFlumeEvent] = { - createStream(jssc.ssc, hostname, port, storageLevel, enableDecompression) - } - - /** - * Creates an input stream that is to be used with the Spark Sink deployed on a Flume agent. - * This stream will poll the sink for data and will pull events as they are available. - * This stream will use a batch size of 1000 events and run 5 threads to pull data. - * @param hostname Address of the host on which the Spark Sink is running - * @param port Port of the host at which the Spark Sink is listening - * @param storageLevel Storage level to use for storing the received objects - */ - def createPollingStream( - ssc: StreamingContext, - hostname: String, - port: Int, - storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2 - ): ReceiverInputDStream[SparkFlumeEvent] = { - createPollingStream(ssc, Seq(new InetSocketAddress(hostname, port)), storageLevel) - } - - /** - * Creates an input stream that is to be used with the Spark Sink deployed on a Flume agent. - * This stream will poll the sink for data and will pull events as they are available. - * This stream will use a batch size of 1000 events and run 5 threads to pull data. - * @param addresses List of InetSocketAddresses representing the hosts to connect to. - * @param storageLevel Storage level to use for storing the received objects - */ - def createPollingStream( - ssc: StreamingContext, - addresses: Seq[InetSocketAddress], - storageLevel: StorageLevel - ): ReceiverInputDStream[SparkFlumeEvent] = { - createPollingStream(ssc, addresses, storageLevel, - DEFAULT_POLLING_BATCH_SIZE, DEFAULT_POLLING_PARALLELISM) - } - - /** - * Creates an input stream that is to be used with the Spark Sink deployed on a Flume agent. - * This stream will poll the sink for data and will pull events as they are available. - * @param addresses List of InetSocketAddresses representing the hosts to connect to. - * @param maxBatchSize Maximum number of events to be pulled from the Spark sink in a - * single RPC call - * @param parallelism Number of concurrent requests this stream should send to the sink.
Note - * that having a higher number of requests concurrently being pulled will - * result in this stream using more threads - * @param storageLevel Storage level to use for storing the received objects - */ - def createPollingStream( - ssc: StreamingContext, - addresses: Seq[InetSocketAddress], - storageLevel: StorageLevel, - maxBatchSize: Int, - parallelism: Int - ): ReceiverInputDStream[SparkFlumeEvent] = { - new FlumePollingInputDStream[SparkFlumeEvent](ssc, addresses, maxBatchSize, - parallelism, storageLevel) - } - - /** - * Creates an input stream that is to be used with the Spark Sink deployed on a Flume agent. - * This stream will poll the sink for data and will pull events as they are available. - * This stream will use a batch size of 1000 events and run 5 threads to pull data. - * @param hostname Hostname of the host on which the Spark Sink is running - * @param port Port of the host at which the Spark Sink is listening - */ - def createPollingStream( - jssc: JavaStreamingContext, - hostname: String, - port: Int - ): JavaReceiverInputDStream[SparkFlumeEvent] = { - createPollingStream(jssc, hostname, port, StorageLevel.MEMORY_AND_DISK_SER_2) - } - - /** - * Creates an input stream that is to be used with the Spark Sink deployed on a Flume agent. - * This stream will poll the sink for data and will pull events as they are available. - * This stream will use a batch size of 1000 events and run 5 threads to pull data. - * @param hostname Hostname of the host on which the Spark Sink is running - * @param port Port of the host at which the Spark Sink is listening - * @param storageLevel Storage level to use for storing the received objects - */ - def createPollingStream( - jssc: JavaStreamingContext, - hostname: String, - port: Int, - storageLevel: StorageLevel - ): JavaReceiverInputDStream[SparkFlumeEvent] = { - createPollingStream(jssc, Array(new InetSocketAddress(hostname, port)), storageLevel) - } - - /** - * Creates an input stream that is to be used with the Spark Sink deployed on a Flume agent. - * This stream will poll the sink for data and will pull events as they are available. - * This stream will use a batch size of 1000 events and run 5 threads to pull data. - * @param addresses List of InetSocketAddresses on which the Spark Sink is running. - * @param storageLevel Storage level to use for storing the received objects - */ - def createPollingStream( - jssc: JavaStreamingContext, - addresses: Array[InetSocketAddress], - storageLevel: StorageLevel - ): JavaReceiverInputDStream[SparkFlumeEvent] = { - createPollingStream(jssc, addresses, storageLevel, - DEFAULT_POLLING_BATCH_SIZE, DEFAULT_POLLING_PARALLELISM) - } - - /** - * Creates an input stream that is to be used with the Spark Sink deployed on a Flume agent. - * This stream will poll the sink for data and will pull events as they are available. - * @param addresses List of InetSocketAddresses on which the Spark Sink is running - * @param maxBatchSize The maximum number of events to be pulled from the Spark sink in a - * single RPC call - * @param parallelism Number of concurrent requests this stream should send to the sink. 
Note - * that having a higher number of requests concurrently being pulled will - * result in this stream using more threads - * @param storageLevel Storage level to use for storing the received objects - */ - def createPollingStream( - jssc: JavaStreamingContext, - addresses: Array[InetSocketAddress], - storageLevel: StorageLevel, - maxBatchSize: Int, - parallelism: Int - ): JavaReceiverInputDStream[SparkFlumeEvent] = { - createPollingStream(jssc.ssc, addresses, storageLevel, maxBatchSize, parallelism) - } -} - -/** - * This is a helper class that wraps the methods in FlumeUtils in a more Python-friendly class - * and functions so that they can be easily instantiated and called from Python's FlumeUtils. - */ -private[flume] class FlumeUtilsPythonHelper { - - def createStream( - jssc: JavaStreamingContext, - hostname: String, - port: Int, - storageLevel: StorageLevel, - enableDecompression: Boolean - ): JavaPairDStream[Array[Byte], Array[Byte]] = { - val dstream = FlumeUtils.createStream(jssc, hostname, port, storageLevel, enableDecompression) - FlumeUtilsPythonHelper.toByteArrayPairDStream(dstream) - } - - def createPollingStream( - jssc: JavaStreamingContext, - hosts: JList[String], - ports: JList[Int], - storageLevel: StorageLevel, - maxBatchSize: Int, - parallelism: Int - ): JavaPairDStream[Array[Byte], Array[Byte]] = { - assert(hosts.size() == ports.size()) - val addresses = hosts.asScala.zip(ports.asScala).map { - case (host, port) => new InetSocketAddress(host, port) - } - val dstream = FlumeUtils.createPollingStream( - jssc.ssc, addresses, storageLevel, maxBatchSize, parallelism) - FlumeUtilsPythonHelper.toByteArrayPairDStream(dstream) - } - -} - -private object FlumeUtilsPythonHelper { - - private def stringMapToByteArray(map: JMap[CharSequence, CharSequence]): Array[Byte] = { - val byteStream = new ByteArrayOutputStream() - val output = new DataOutputStream(byteStream) - try { - output.writeInt(map.size) - map.asScala.foreach { kv => - PythonRDD.writeUTF(kv._1.toString, output) - PythonRDD.writeUTF(kv._2.toString, output) - } - byteStream.toByteArray - } - finally { - output.close() - } - } - - private def toByteArrayPairDStream(dstream: JavaReceiverInputDStream[SparkFlumeEvent]): - JavaPairDStream[Array[Byte], Array[Byte]] = { - dstream.mapToPair(new PairFunction[SparkFlumeEvent, Array[Byte], Array[Byte]] { - override def call(sparkEvent: SparkFlumeEvent): (Array[Byte], Array[Byte]) = { - val event = sparkEvent.event - val byteBuffer = event.getBody - val body = new Array[Byte](byteBuffer.remaining()) - byteBuffer.get(body) - (stringMapToByteArray(event.getHeaders), body) - } - }) - } -} diff --git a/external/flume/src/main/scala/org/apache/spark/streaming/flume/PollingFlumeTestUtils.scala b/external/flume/src/main/scala/org/apache/spark/streaming/flume/PollingFlumeTestUtils.scala deleted file mode 100644 index 1a96df6e94..0000000000 --- a/external/flume/src/main/scala/org/apache/spark/streaming/flume/PollingFlumeTestUtils.scala +++ /dev/null @@ -1,209 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License.
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.streaming.flume - -import java.nio.charset.StandardCharsets -import java.util.{Collections, List => JList, Map => JMap} -import java.util.concurrent._ - -import scala.collection.mutable.ArrayBuffer - -import org.apache.flume.event.EventBuilder -import org.apache.flume.Context -import org.apache.flume.channel.MemoryChannel -import org.apache.flume.conf.Configurables - -import org.apache.spark.streaming.flume.sink.{SparkSink, SparkSinkConfig} - -/** - * Shared code for the Scala and Python unit tests - */ -private[flume] class PollingFlumeTestUtils { - - private val batchCount = 5 - val eventsPerBatch = 100 - private val totalEventsPerChannel = batchCount * eventsPerBatch - private val channelCapacity = 5000 - - def getTotalEvents: Int = totalEventsPerChannel * channels.size - - private val channels = new ArrayBuffer[MemoryChannel] - private val sinks = new ArrayBuffer[SparkSink] - - /** - * Start a sink and return the port of this sink - */ - def startSingleSink(): Int = { - channels.clear() - sinks.clear() - - // Start the channel and sink. - val context = new Context() - context.put("capacity", channelCapacity.toString) - context.put("transactionCapacity", "1000") - context.put("keep-alive", "0") - val channel = new MemoryChannel() - Configurables.configure(channel, context) - - val sink = new SparkSink() - context.put(SparkSinkConfig.CONF_HOSTNAME, "localhost") - context.put(SparkSinkConfig.CONF_PORT, String.valueOf(0)) - Configurables.configure(sink, context) - sink.setChannel(channel) - sink.start() - - channels += (channel) - sinks += sink - - sink.getPort() - } - - /** - * Start two sinks and return their ports - */ - def startMultipleSinks(): Seq[Int] = { - channels.clear() - sinks.clear() - - // Start the channels and sinks.
- val context = new Context() - context.put("capacity", channelCapacity.toString) - context.put("transactionCapacity", "1000") - context.put("keep-alive", "0") - val channel = new MemoryChannel() - Configurables.configure(channel, context) - - val channel2 = new MemoryChannel() - Configurables.configure(channel2, context) - - val sink = new SparkSink() - context.put(SparkSinkConfig.CONF_HOSTNAME, "localhost") - context.put(SparkSinkConfig.CONF_PORT, String.valueOf(0)) - Configurables.configure(sink, context) - sink.setChannel(channel) - sink.start() - - val sink2 = new SparkSink() - context.put(SparkSinkConfig.CONF_HOSTNAME, "localhost") - context.put(SparkSinkConfig.CONF_PORT, String.valueOf(0)) - Configurables.configure(sink2, context) - sink2.setChannel(channel2) - sink2.start() - - sinks += sink - sinks += sink2 - channels += channel - channels += channel2 - - sinks.map(_.getPort()) - } - - /** - * Send data and wait until all data has been received - */ - def sendDatAndEnsureAllDataHasBeenReceived(): Unit = { - val executor = Executors.newCachedThreadPool() - val executorCompletion = new ExecutorCompletionService[Void](executor) - - val latch = new CountDownLatch(batchCount * channels.size) - sinks.foreach(_.countdownWhenBatchReceived(latch)) - - channels.foreach(channel => { - executorCompletion.submit(new TxnSubmitter(channel)) - }) - - for (i <- 0 until channels.size) { - executorCompletion.take() - } - - latch.await(15, TimeUnit.SECONDS) // Ensure all data has been received. - } - - /** - * A Python-friendly method to assert the output - */ - def assertOutput( - outputHeaders: JList[JMap[String, String]], outputBodies: JList[String]): Unit = { - require(outputHeaders.size == outputBodies.size) - val eventSize = outputHeaders.size - if (eventSize != totalEventsPerChannel * channels.size) { - throw new AssertionError( - s"Expected ${totalEventsPerChannel * channels.size} events, but was $eventSize") - } - var counter = 0 - for (k <- 0 until channels.size; i <- 0 until totalEventsPerChannel) { - val eventBodyToVerify = s"${channels(k).getName}-$i" - val eventHeaderToVerify: JMap[String, String] = Collections.singletonMap(s"test-$i", "header") - var found = false - var j = 0 - while (j < eventSize && !found) { - if (eventBodyToVerify == outputBodies.get(j) && - eventHeaderToVerify == outputHeaders.get(j)) { - found = true - counter += 1 - } - j += 1 - } - } - if (counter != totalEventsPerChannel * channels.size) { - throw new AssertionError( - s"Expected ${totalEventsPerChannel * channels.size} events, but was $counter") - } - } - - def assertChannelsAreEmpty(): Unit = { - channels.foreach(assertChannelIsEmpty) - } - - private def assertChannelIsEmpty(channel: MemoryChannel): Unit = { - val queueRemaining = channel.getClass.getDeclaredField("queueRemaining") - queueRemaining.setAccessible(true) - val m = queueRemaining.get(channel).getClass.getDeclaredMethod("availablePermits") - if (m.invoke(queueRemaining.get(channel)).asInstanceOf[Int] != 5000) { - throw new AssertionError(s"Channel ${channel.getName} is not empty") - } - } - - def close(): Unit = { - sinks.foreach(_.stop()) - sinks.clear() - channels.foreach(_.stop()) - channels.clear() - } - - private class TxnSubmitter(channel: MemoryChannel) extends Callable[Void] { - override def call(): Void = { - var t = 0 - for (i <- 0 until batchCount) { - val tx = channel.getTransaction - tx.begin() - for (j <- 0 until eventsPerBatch) { - channel.put(EventBuilder.withBody( - s"${channel.getName}-$t".getBytes(StandardCharsets.UTF_8),
Collections.singletonMap(s"test-$t", "header"))) - t += 1 - } - tx.commit() - tx.close() - Thread.sleep(500) // Allow some time for the events to reach the sink - } - null - } - } - -} diff --git a/external/flume/src/main/scala/org/apache/spark/streaming/flume/package-info.java b/external/flume/src/main/scala/org/apache/spark/streaming/flume/package-info.java deleted file mode 100644 index d31aa5f5c0..0000000000 --- a/external/flume/src/main/scala/org/apache/spark/streaming/flume/package-info.java +++ /dev/null @@ -1,21 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -/** - * Spark streaming receiver for Flume. - */ -package org.apache.spark.streaming.flume;
\ No newline at end of file diff --git a/external/flume/src/main/scala/org/apache/spark/streaming/flume/package.scala b/external/flume/src/main/scala/org/apache/spark/streaming/flume/package.scala deleted file mode 100644 index 9bfab68c4b..0000000000 --- a/external/flume/src/main/scala/org/apache/spark/streaming/flume/package.scala +++ /dev/null @@ -1,23 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.streaming - -/** - * Spark streaming receiver for Flume. - */ -package object flume diff --git a/external/flume/src/test/java/org/apache/spark/streaming/LocalJavaStreamingContext.java b/external/flume/src/test/java/org/apache/spark/streaming/LocalJavaStreamingContext.java deleted file mode 100644 index cfedb5a042..0000000000 --- a/external/flume/src/test/java/org/apache/spark/streaming/LocalJavaStreamingContext.java +++ /dev/null @@ -1,44 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.spark.streaming; - -import org.apache.spark.SparkConf; -import org.apache.spark.streaming.api.java.JavaStreamingContext; -import org.junit.After; -import org.junit.Before; - -public abstract class LocalJavaStreamingContext { - - protected transient JavaStreamingContext ssc; - - @Before - public void setUp() { - SparkConf conf = new SparkConf() - .setMaster("local[2]") - .setAppName("test") - .set("spark.streaming.clock", "org.apache.spark.util.ManualClock"); - ssc = new JavaStreamingContext(conf, new Duration(1000)); - ssc.checkpoint("checkpoint"); - } - - @After - public void tearDown() { - ssc.stop(); - ssc = null; - } -} diff --git a/external/flume/src/test/java/org/apache/spark/streaming/flume/JavaFlumePollingStreamSuite.java b/external/flume/src/test/java/org/apache/spark/streaming/flume/JavaFlumePollingStreamSuite.java deleted file mode 100644 index 79c5b91654..0000000000 --- a/external/flume/src/test/java/org/apache/spark/streaming/flume/JavaFlumePollingStreamSuite.java +++ /dev/null @@ -1,44 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.streaming.flume; - -import java.net.InetSocketAddress; - -import org.apache.spark.storage.StorageLevel; -import org.apache.spark.streaming.LocalJavaStreamingContext; - -import org.apache.spark.streaming.api.java.JavaReceiverInputDStream; -import org.junit.Test; - -public class JavaFlumePollingStreamSuite extends LocalJavaStreamingContext { - @Test - public void testFlumeStream() { - // tests the API, does not actually test data receiving - InetSocketAddress[] addresses = new InetSocketAddress[] { - new InetSocketAddress("localhost", 12345) - }; - JavaReceiverInputDStream<SparkFlumeEvent> test1 = - FlumeUtils.createPollingStream(ssc, "localhost", 12345); - JavaReceiverInputDStream<SparkFlumeEvent> test2 = FlumeUtils.createPollingStream( - ssc, "localhost", 12345, StorageLevel.MEMORY_AND_DISK_SER_2()); - JavaReceiverInputDStream<SparkFlumeEvent> test3 = FlumeUtils.createPollingStream( - ssc, addresses, StorageLevel.MEMORY_AND_DISK_SER_2()); - JavaReceiverInputDStream<SparkFlumeEvent> test4 = FlumeUtils.createPollingStream( - ssc, addresses, StorageLevel.MEMORY_AND_DISK_SER_2(), 100, 5); - } -} diff --git a/external/flume/src/test/java/org/apache/spark/streaming/flume/JavaFlumeStreamSuite.java b/external/flume/src/test/java/org/apache/spark/streaming/flume/JavaFlumeStreamSuite.java deleted file mode 100644 index 3b5e0c7746..0000000000 --- a/external/flume/src/test/java/org/apache/spark/streaming/flume/JavaFlumeStreamSuite.java +++ /dev/null @@ -1,36 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. 
See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.streaming.flume; - -import org.apache.spark.storage.StorageLevel; -import org.apache.spark.streaming.LocalJavaStreamingContext; - -import org.apache.spark.streaming.api.java.JavaReceiverInputDStream; -import org.junit.Test; - -public class JavaFlumeStreamSuite extends LocalJavaStreamingContext { - @Test - public void testFlumeStream() { - // tests the API, does not actually test data receiving - JavaReceiverInputDStream<SparkFlumeEvent> test1 = FlumeUtils.createStream(ssc, "localhost", 12345); - JavaReceiverInputDStream<SparkFlumeEvent> test2 = FlumeUtils.createStream(ssc, "localhost", 12345, - StorageLevel.MEMORY_AND_DISK_SER_2()); - JavaReceiverInputDStream<SparkFlumeEvent> test3 = FlumeUtils.createStream(ssc, "localhost", 12345, - StorageLevel.MEMORY_AND_DISK_SER_2(), false); - } -} diff --git a/external/flume/src/test/resources/log4j.properties b/external/flume/src/test/resources/log4j.properties deleted file mode 100644 index 75e3b53a09..0000000000 --- a/external/flume/src/test/resources/log4j.properties +++ /dev/null @@ -1,28 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# - -# Set everything to be logged to the file target/unit-tests.log -log4j.rootCategory=INFO, file -log4j.appender.file=org.apache.log4j.FileAppender -log4j.appender.file.append=true -log4j.appender.file.file=target/unit-tests.log -log4j.appender.file.layout=org.apache.log4j.PatternLayout -log4j.appender.file.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss.SSS} %t %p %c{1}: %m%n - -# Ignore messages below warning level from Jetty, because it's a bit verbose -log4j.logger.org.spark-project.jetty=WARN - diff --git a/external/flume/src/test/scala/org/apache/spark/streaming/TestOutputStream.scala b/external/flume/src/test/scala/org/apache/spark/streaming/TestOutputStream.scala deleted file mode 100644 index c97a27ca7c..0000000000 --- a/external/flume/src/test/scala/org/apache/spark/streaming/TestOutputStream.scala +++ /dev/null @@ -1,48 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. 
diff --git a/external/flume/src/test/scala/org/apache/spark/streaming/TestOutputStream.scala b/external/flume/src/test/scala/org/apache/spark/streaming/TestOutputStream.scala
deleted file mode 100644
index c97a27ca7c..0000000000
--- a/external/flume/src/test/scala/org/apache/spark/streaming/TestOutputStream.scala
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.streaming
-
-import java.io.{IOException, ObjectInputStream}
-import java.util.concurrent.ConcurrentLinkedQueue
-
-import scala.reflect.ClassTag
-
-import org.apache.spark.rdd.RDD
-import org.apache.spark.streaming.dstream.{DStream, ForEachDStream}
-import org.apache.spark.util.Utils
-
-/**
- * This is an output stream just for the test suites. All the output is collected into a
- * ConcurrentLinkedQueue. The queue is wiped clean when the stream is restored from a checkpoint.
- *
- * The queue contains a sequence of RDDs, each represented as a sequence of items.
- */
-class TestOutputStream[T: ClassTag](parent: DStream[T],
-    val output: ConcurrentLinkedQueue[Seq[T]] = new ConcurrentLinkedQueue[Seq[T]]())
-  extends ForEachDStream[T](parent, (rdd: RDD[T], t: Time) => {
-    val collected = rdd.collect()
-    output.add(collected)
-  }, false) {
-
-  // This clears the output queue every time this stream is read back from a checkpoint
-  @throws(classOf[IOException])
-  private def readObject(ois: ObjectInputStream): Unit = Utils.tryOrIOException {
-    ois.defaultReadObject()
-    output.clear()
-  }
-}
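
TestOutputStream above is the collection half of every assertion in the suites that follow. A short sketch of the pattern, where someStream stands in for any DStream under test:

    import java.util.concurrent.ConcurrentLinkedQueue

    import scala.reflect.ClassTag

    import org.apache.spark.streaming.TestOutputStream
    import org.apache.spark.streaming.dstream.DStream

    def captureOutput[T: ClassTag](someStream: DStream[T]): ConcurrentLinkedQueue[Seq[T]] = {
      val outputQueue = new ConcurrentLinkedQueue[Seq[T]]()
      // register() marks the stream as an output, so the scheduler runs its
      // collect-into-queue job once per batch.
      new TestOutputStream(someStream, outputQueue).register()
      outputQueue
    }

After ssc.start() and a few batches, outputQueue.asScala.toSeq.flatten yields everything received so far, which is exactly how the suites below drain it.
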
diff --git a/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumePollingStreamSuite.scala b/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumePollingStreamSuite.scala
deleted file mode 100644
index 10dcbf98bc..0000000000
--- a/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumePollingStreamSuite.scala
+++ /dev/null
@@ -1,129 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.streaming.flume
-
-import java.net.InetSocketAddress
-import java.util.concurrent.ConcurrentLinkedQueue
-
-import scala.collection.JavaConverters._
-import scala.concurrent.duration._
-import scala.language.postfixOps
-
-import org.scalatest.BeforeAndAfter
-import org.scalatest.concurrent.Eventually._
-
-import org.apache.spark.{Logging, SparkConf, SparkFunSuite}
-import org.apache.spark.network.util.JavaUtils
-import org.apache.spark.storage.StorageLevel
-import org.apache.spark.streaming.{Seconds, StreamingContext, TestOutputStream}
-import org.apache.spark.streaming.dstream.ReceiverInputDStream
-import org.apache.spark.util.{ManualClock, Utils}
-
-class FlumePollingStreamSuite extends SparkFunSuite with BeforeAndAfter with Logging {
-
-  val maxAttempts = 5
-  val batchDuration = Seconds(1)
-
-  val conf = new SparkConf()
-    .setMaster("local[2]")
-    .setAppName(this.getClass.getSimpleName)
-    .set("spark.streaming.clock", "org.apache.spark.util.ManualClock")
-
-  val utils = new PollingFlumeTestUtils
-
-  test("flume polling test") {
-    testMultipleTimes(testFlumePolling)
-  }
-
-  test("flume polling test multiple hosts") {
-    testMultipleTimes(testFlumePollingMultipleHost)
-  }
-
-  /**
-   * Run the given test until no more java.net.BindExceptions are thrown.
-   * Do this only up to a certain attempt limit.
-   */
-  private def testMultipleTimes(test: () => Unit): Unit = {
-    var testPassed = false
-    var attempt = 0
-    while (!testPassed && attempt < maxAttempts) {
-      try {
-        test()
-        testPassed = true
-      } catch {
-        case e: Exception if Utils.isBindCollision(e) =>
-          logWarning("Exception when running flume polling test: " + e)
-          attempt += 1
-      }
-    }
-    assert(testPassed, s"Test failed after $attempt attempts!")
-  }
-
-  private def testFlumePolling(): Unit = {
-    try {
-      val port = utils.startSingleSink()
-
-      writeAndVerify(Seq(port))
-      utils.assertChannelsAreEmpty()
-    } finally {
-      utils.close()
-    }
-  }
-
-  private def testFlumePollingMultipleHost(): Unit = {
-    try {
-      val ports = utils.startMultipleSinks()
-      writeAndVerify(ports)
-      utils.assertChannelsAreEmpty()
-    } finally {
-      utils.close()
-    }
-  }
-
-  def writeAndVerify(sinkPorts: Seq[Int]): Unit = {
-    // Set up the streaming context and input streams
-    val ssc = new StreamingContext(conf, batchDuration)
-    val addresses = sinkPorts.map(port => new InetSocketAddress("localhost", port))
-    val flumeStream: ReceiverInputDStream[SparkFlumeEvent] =
-      FlumeUtils.createPollingStream(ssc, addresses, StorageLevel.MEMORY_AND_DISK,
-        utils.eventsPerBatch, 5)
-    val outputQueue = new ConcurrentLinkedQueue[Seq[SparkFlumeEvent]]
-    val outputStream = new TestOutputStream(flumeStream, outputQueue)
-    outputStream.register()
-
-    ssc.start()
-    try {
-      utils.sendDatAndEnsureAllDataHasBeenReceived()
-      val clock = ssc.scheduler.clock.asInstanceOf[ManualClock]
-      clock.advance(batchDuration.milliseconds)
-
-      // The eventually block below is required to ensure that all data in the batch has been
-      // processed.
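      // (Note on the clock: "spark.streaming.clock" above selects Spark's ManualClock,
      // which never advances on its own, so clock.advance(batchDuration.milliseconds)
      // triggers exactly one batch. The path from receiver to block manager to batch job
      // is asynchronous, which is why the polling loop below is still needed.)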
-      eventually(timeout(10 seconds), interval(100 milliseconds)) {
-        val flattenedOutput = outputQueue.asScala.toSeq.flatten
-        val headers = flattenedOutput.map(_.event.getHeaders.asScala.map {
-          case (key, value) => (key.toString, value.toString)
-        }).map(_.asJava)
-        val bodies = flattenedOutput.map(e => JavaUtils.bytesToString(e.event.getBody))
-        utils.assertOutput(headers.asJava, bodies.asJava)
-      }
-    } finally {
-      ssc.stop()
-    }
-  }
-
-}
diff --git a/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumeStreamSuite.scala b/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumeStreamSuite.scala
deleted file mode 100644
index 38208c6518..0000000000
--- a/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumeStreamSuite.scala
+++ /dev/null
@@ -1,102 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.streaming.flume
-
-import java.util.concurrent.ConcurrentLinkedQueue
-
-import scala.collection.JavaConverters._
-import scala.concurrent.duration._
-import scala.language.postfixOps
-
-import org.jboss.netty.channel.ChannelPipeline
-import org.jboss.netty.channel.socket.SocketChannel
-import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory
-import org.jboss.netty.handler.codec.compression._
-import org.scalatest.{BeforeAndAfter, Matchers}
-import org.scalatest.concurrent.Eventually._
-
-import org.apache.spark.{Logging, SparkConf, SparkFunSuite}
-import org.apache.spark.network.util.JavaUtils
-import org.apache.spark.storage.StorageLevel
-import org.apache.spark.streaming.{Milliseconds, StreamingContext, TestOutputStream}
-
-class FlumeStreamSuite extends SparkFunSuite with BeforeAndAfter with Matchers with Logging {
-  val conf = new SparkConf().setMaster("local[4]").setAppName("FlumeStreamSuite")
-  var ssc: StreamingContext = null
-
-  test("flume input stream") {
-    testFlumeStream(testCompression = false)
-  }
-
-  test("flume input compressed stream") {
-    testFlumeStream(testCompression = true)
-  }
-
-  /** Run the test on a Flume stream. */
-  private def testFlumeStream(testCompression: Boolean): Unit = {
-    val input = (1 to 100).map { _.toString }
-    val utils = new FlumeTestUtils
-    try {
-      val outputQueue = startContext(utils.getTestPort(), testCompression)
-
-      eventually(timeout(10 seconds), interval(100 milliseconds)) {
-        utils.writeInput(input.asJava, testCompression)
-      }
-
-      eventually(timeout(10 seconds), interval(100 milliseconds)) {
-        val outputEvents = outputQueue.asScala.toSeq.flatten.map { _.event }
-        outputEvents.foreach {
-          event =>
-            event.getHeaders.get("test") should be("header")
-        }
-        val output = outputEvents.map(event => JavaUtils.bytesToString(event.getBody))
-        output should be (input)
-      }
-    } finally {
-      if (ssc != null) {
-        ssc.stop()
-      }
-      utils.close()
-    }
-  }
-
-  /** Set up and start the streaming context. */
-  private def startContext(
-      testPort: Int, testCompression: Boolean): ConcurrentLinkedQueue[Seq[SparkFlumeEvent]] = {
-    ssc = new StreamingContext(conf, Milliseconds(200))
-    val flumeStream = FlumeUtils.createStream(
-      ssc, "localhost", testPort, StorageLevel.MEMORY_AND_DISK, testCompression)
-    val outputQueue = new ConcurrentLinkedQueue[Seq[SparkFlumeEvent]]
-    val outputStream = new TestOutputStream(flumeStream, outputQueue)
-    outputStream.register()
-    ssc.start()
-    outputQueue
-  }
-
-  /** Channel factory that creates socket channels with zlib compression handlers installed. */
-  private class CompressionChannelFactory(compressionLevel: Int)
-    extends NioClientSocketChannelFactory {
-
-    override def newChannel(pipeline: ChannelPipeline): SocketChannel = {
-      val encoder = new ZlibEncoder(compressionLevel)
-      pipeline.addFirst("deflater", encoder)
-      pipeline.addFirst("inflater", new ZlibDecoder())
-      super.newChannel(pipeline)
-    }
-  }
-}
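
To close the loop on the compression path: CompressionChannelFactory above compresses on the client side, and a receiver created with enableDecompression = true installs the mirror-image handlers on the server side. A standalone sketch of the same Netty 3 handler arrangement (the default level 6 is an arbitrary illustrative choice; ZlibEncoder accepts 1 through 9):

    import org.jboss.netty.channel.{ChannelPipeline, Channels}
    import org.jboss.netty.handler.codec.compression.{ZlibDecoder, ZlibEncoder}

    def zlibPipeline(compressionLevel: Int = 6): ChannelPipeline = {
      val pipeline = Channels.pipeline()
      // Prepending both handlers keeps the Avro RPC layer unaware of the
      // compression: outbound bytes are deflated, inbound bytes inflated.
      pipeline.addFirst("deflater", new ZlibEncoder(compressionLevel))
      pipeline.addFirst("inflater", new ZlibDecoder())
      pipeline
    }
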