Diffstat (limited to 'external/flume-sink')
 external/flume-sink/pom.xml                                                                              | 129 -
 external/flume-sink/src/main/avro/sparkflume.avdl                                                        |  40 -
 external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/Logging.scala                   | 127 -
 external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/SparkAvroCallbackHandler.scala  | 166 -
 external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/SparkSink.scala                 | 171 -
 external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/SparkSinkThreadFactory.scala    |  35 -
 external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/SparkSinkUtils.scala            |  28 -
 external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/TransactionProcessor.scala      | 252 -
 external/flume-sink/src/test/resources/log4j.properties                                                  |  28 -
 external/flume-sink/src/test/scala/org/apache/spark/streaming/flume/sink/SparkSinkSuite.scala            | 218 -
 10 files changed, 0 insertions, 1194 deletions
diff --git a/external/flume-sink/pom.xml b/external/flume-sink/pom.xml
deleted file mode 100644
index e4effe158c..0000000000
--- a/external/flume-sink/pom.xml
+++ /dev/null
@@ -1,129 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
-  ~ Licensed to the Apache Software Foundation (ASF) under one or more
-  ~ contributor license agreements. See the NOTICE file distributed with
-  ~ this work for additional information regarding copyright ownership.
-  ~ The ASF licenses this file to You under the Apache License, Version 2.0
-  ~ (the "License"); you may not use this file except in compliance with
-  ~ the License. You may obtain a copy of the License at
-  ~
-  ~    http://www.apache.org/licenses/LICENSE-2.0
-  ~
-  ~ Unless required by applicable law or agreed to in writing, software
-  ~ distributed under the License is distributed on an "AS IS" BASIS,
-  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  ~ See the License for the specific language governing permissions and
-  ~ limitations under the License.
-  -->
-
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
-  <modelVersion>4.0.0</modelVersion>
-  <parent>
-    <groupId>org.apache.spark</groupId>
-    <artifactId>spark-parent_2.11</artifactId>
-    <version>2.0.0-SNAPSHOT</version>
-    <relativePath>../../pom.xml</relativePath>
-  </parent>
-
-  <groupId>org.apache.spark</groupId>
-  <artifactId>spark-streaming-flume-sink_2.11</artifactId>
-  <properties>
-    <sbt.project.name>streaming-flume-sink</sbt.project.name>
-  </properties>
-  <packaging>jar</packaging>
-  <name>Spark Project External Flume Sink</name>
-  <url>http://spark.apache.org/</url>
-
-  <dependencies>
-    <dependency>
-      <groupId>org.apache.flume</groupId>
-      <artifactId>flume-ng-sdk</artifactId>
-      <exclusions>
-        <!-- Guava is excluded to avoid its use in this module. -->
-        <exclusion>
-          <groupId>com.google.guava</groupId>
-          <artifactId>guava</artifactId>
-        </exclusion>
-        <!--
-          Exclude libthrift since the flume poms seem to confuse sbt, which fails to find the
-          dependency.
-        -->
-        <exclusion>
-          <groupId>org.apache.thrift</groupId>
-          <artifactId>libthrift</artifactId>
-        </exclusion>
-      </exclusions>
-    </dependency>
-    <dependency>
-      <groupId>org.apache.flume</groupId>
-      <artifactId>flume-ng-core</artifactId>
-      <exclusions>
-        <exclusion>
-          <groupId>com.google.guava</groupId>
-          <artifactId>guava</artifactId>
-        </exclusion>
-        <exclusion>
-          <groupId>org.apache.thrift</groupId>
-          <artifactId>libthrift</artifactId>
-        </exclusion>
-      </exclusions>
-    </dependency>
-    <dependency>
-      <groupId>org.scala-lang</groupId>
-      <artifactId>scala-library</artifactId>
-    </dependency>
-    <dependency>
-      <!-- Add Guava in test scope since flume actually needs it. -->
-      <groupId>com.google.guava</groupId>
-      <artifactId>guava</artifactId>
-      <scope>test</scope>
-    </dependency>
-    <dependency>
-      <!--
-        Netty explicitly added in test as it has been excluded from
-        Flume dependency (to avoid runtime problems when running with
-        Spark) but unit tests need it. The version of Netty on which
-        Flume 1.4.0 depends is "3.4.0.Final".
-      -->
-      <groupId>io.netty</groupId>
-      <artifactId>netty</artifactId>
-      <version>3.4.0.Final</version>
-      <scope>test</scope>
-    </dependency>
-    <dependency>
-      <groupId>org.apache.spark</groupId>
-      <artifactId>spark-test-tags_${scala.binary.version}</artifactId>
-    </dependency>
-  </dependencies>
-  <build>
-    <outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory>
-    <testOutputDirectory>target/scala-${scala.binary.version}/test-classes</testOutputDirectory>
-    <plugins>
-      <plugin>
-        <groupId>org.apache.avro</groupId>
-        <artifactId>avro-maven-plugin</artifactId>
-        <version>${avro.version}</version>
-        <configuration>
-          <!-- Generate the output in the same directory as the sbt-avro-plugin -->
-          <outputDirectory>${project.basedir}/target/scala-${scala.binary.version}/src_managed/main/compiled_avro</outputDirectory>
-        </configuration>
-        <executions>
-          <execution>
-            <phase>generate-sources</phase>
-            <goals>
-              <goal>idl-protocol</goal>
-            </goals>
-          </execution>
-        </executions>
-      </plugin>
-      <plugin>
-        <groupId>org.apache.maven.plugins</groupId>
-        <artifactId>maven-shade-plugin</artifactId>
-        <configuration>
-          <!-- Disable all relocations defined in the parent pom. -->
-          <relocations combine.self="override" />
-        </configuration>
-      </plugin>
-    </plugins>
-  </build>
-</project>
diff --git a/external/flume-sink/src/main/avro/sparkflume.avdl b/external/flume-sink/src/main/avro/sparkflume.avdl
deleted file mode 100644
index 8806e863ac..0000000000
--- a/external/flume-sink/src/main/avro/sparkflume.avdl
+++ /dev/null
@@ -1,40 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-@namespace("org.apache.spark.streaming.flume.sink")
-
-protocol SparkFlumeProtocol {
-
-  record SparkSinkEvent {
-    map<string> headers;
-    bytes body;
-  }
-
-  record EventBatch {
-    string errorMsg = ""; // If this is empty it is a valid message, else it represents an error
-    string sequenceNumber;
-    array<SparkSinkEvent> events;
-  }
-
-  EventBatch getEventBatch (int n);
-
-  void ack (string sequenceNumber);
-
-  void nack (string sequenceNumber);
-}
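The IDL above is compiled by the avro-maven-plugin configured in the pom, producing the SparkFlumeProtocol interface that the Scala classes below implement. For orientation, a minimal sketch of a client driving that protocol over Avro IPC, much as the test suite at the end of this diff does; the host and port are placeholder values:

    import java.net.InetSocketAddress

    import org.apache.avro.ipc.NettyTransceiver
    import org.apache.avro.ipc.specific.SpecificRequestor

    // Placeholder endpoint; a real caller would use the sink's configured host/port.
    val transceiver = new NettyTransceiver(new InetSocketAddress("sink-host", 9999))
    val client = SpecificRequestor.getClient(classOf[SparkFlumeProtocol.Callback], transceiver)

    val batch = client.getEventBatch(1000)
    // An empty errorMsg marks a valid batch (see the EventBatch record above).
    if (batch.getErrorMsg.toString.isEmpty) {
      try {
        // ... process batch.getEvents here ...
        client.ack(batch.getSequenceNumber)    // commits the sink-side transaction
      } catch {
        case e: Exception =>
          client.nack(batch.getSequenceNumber) // rolls the transaction back
      }
    }
    transceiver.close()
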
diff --git a/external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/Logging.scala b/external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/Logging.scala
deleted file mode 100644
index 09d3fe91e4..0000000000
--- a/external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/Logging.scala
+++ /dev/null
@@ -1,127 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.spark.streaming.flume.sink
-
-import org.slf4j.{Logger, LoggerFactory}
-
-/**
- * Copy of org.apache.spark.Logging for use in the Spark Sink.
- * org.apache.spark.Logging is not used directly so that this module does not
- * bring in all of Spark as a dependency.
- */
-private[sink] trait Logging {
-  // Make the log field transient so that objects with Logging can
-  // be serialized and used on another machine
-  @transient private var _log: Logger = null
-
-  // Method to get or create the logger for this object
-  protected def log: Logger = {
-    if (_log == null) {
-      initializeIfNecessary()
-      var className = this.getClass.getName
-      // Ignore trailing $'s in the class names for Scala objects
-      if (className.endsWith("$")) {
-        className = className.substring(0, className.length - 1)
-      }
-      _log = LoggerFactory.getLogger(className)
-    }
-    _log
-  }
-
-  // Log methods that take only a String
-  protected def logInfo(msg: => String) {
-    if (log.isInfoEnabled) log.info(msg)
-  }
-
-  protected def logDebug(msg: => String) {
-    if (log.isDebugEnabled) log.debug(msg)
-  }
-
-  protected def logTrace(msg: => String) {
-    if (log.isTraceEnabled) log.trace(msg)
-  }
-
-  protected def logWarning(msg: => String) {
-    if (log.isWarnEnabled) log.warn(msg)
-  }
-
-  protected def logError(msg: => String) {
-    if (log.isErrorEnabled) log.error(msg)
-  }
-
-  // Log methods that take Throwables (Exceptions/Errors) too
-  protected def logInfo(msg: => String, throwable: Throwable) {
-    if (log.isInfoEnabled) log.info(msg, throwable)
-  }
-
-  protected def logDebug(msg: => String, throwable: Throwable) {
-    if (log.isDebugEnabled) log.debug(msg, throwable)
-  }
-
-  protected def logTrace(msg: => String, throwable: Throwable) {
-    if (log.isTraceEnabled) log.trace(msg, throwable)
-  }
-
-  protected def logWarning(msg: => String, throwable: Throwable) {
-    if (log.isWarnEnabled) log.warn(msg, throwable)
-  }
-
-  protected def logError(msg: => String, throwable: Throwable) {
-    if (log.isErrorEnabled) log.error(msg, throwable)
-  }
-
-  protected def isTraceEnabled(): Boolean = {
-    log.isTraceEnabled
-  }
-
-  private def initializeIfNecessary() {
-    if (!Logging.initialized) {
-      Logging.initLock.synchronized {
-        if (!Logging.initialized) {
-          initializeLogging()
-        }
-      }
-    }
-  }
-
-  private def initializeLogging() {
-    Logging.initialized = true
-
-    // Force a call into slf4j to initialize it. Avoids this happening from multiple threads
-    // and triggering this: http://mailman.qos.ch/pipermail/slf4j-dev/2010-April/002956.html
-    log
-  }
-}
-
-private[sink] object Logging {
-  @volatile private var initialized = false
-  val initLock = new Object()
-  try {
-    // We use reflection here to handle the case where users remove the
-    // slf4j-to-jul bridge in order to route their logs to JUL.
-    // scalastyle:off classforname
-    val bridgeClass = Class.forName("org.slf4j.bridge.SLF4JBridgeHandler")
-    // scalastyle:on classforname
-    bridgeClass.getMethod("removeHandlersForRootLogger").invoke(null)
-    val installed = bridgeClass.getMethod("isInstalled").invoke(null).asInstanceOf[Boolean]
-    if (!installed) {
-      bridgeClass.getMethod("install").invoke(null)
-    }
-  } catch {
-    case e: ClassNotFoundException => // can't log anything yet so just fail silently
-  }
-}
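Note that every log method takes its message by name (msg: => String), so the message string is only built when the corresponding level is enabled. A small illustration, with a hypothetical class mixing in the trait:

    class ExampleProcessor extends Logging { // hypothetical user of the trait
      def process(events: Seq[Array[Byte]]): Unit = {
        // The interpolation below only runs if DEBUG is enabled, because
        // logDebug evaluates its by-name argument lazily.
        logDebug(s"Processing batch of ${events.size} events")
      }
    }
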
diff --git a/external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/SparkAvroCallbackHandler.scala b/external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/SparkAvroCallbackHandler.scala
deleted file mode 100644
index 719fca0938..0000000000
--- a/external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/SparkAvroCallbackHandler.scala
+++ /dev/null
@@ -1,166 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.spark.streaming.flume.sink
-
-import java.util.UUID
-import java.util.concurrent.{CountDownLatch, Executors}
-import java.util.concurrent.atomic.AtomicLong
-
-import scala.collection.mutable
-
-import org.apache.flume.Channel
-
-/**
- * Class that implements the SparkFlumeProtocol; it is used by the Avro Netty server to process
- * requests. Each getEvents, ack and nack call is forwarded to an instance of this class.
- * @param threads Number of threads to use to process requests.
- * @param channel The channel that the sink pulls events from
- * @param transactionTimeout Timeout in seconds after which a transaction is rolled back if it
- *                           is not ACKed by Spark
- * @param backOffInterval Time in milliseconds to back off when the channel yields no events
- */
-// Flume forces transactions to be thread-local. So each transaction *must* be committed or
-// rolled back from the thread it was originally created in. So each getEvents call from Spark
-// creates a TransactionProcessor which runs in a new thread, in which the transaction is created
-// and events are pulled off the channel. Once the events are sent to Spark,
-// that thread is blocked and the TransactionProcessor is saved in a map,
-// until an ACK or NACK comes back or the transaction times out (after the specified timeout).
-// When the response comes or a timeout is hit, the TransactionProcessor is retrieved and then
-// unblocked, at which point the transaction is committed or rolled back.
-
-private[flume] class SparkAvroCallbackHandler(val threads: Int, val channel: Channel,
-    val transactionTimeout: Int, val backOffInterval: Int) extends SparkFlumeProtocol with Logging {
-  val transactionExecutorOpt = Option(Executors.newFixedThreadPool(threads,
-    new SparkSinkThreadFactory("Spark Sink Processor Thread - %d")))
-  // Protected by `sequenceNumberToProcessor`
-  private val sequenceNumberToProcessor = mutable.HashMap[CharSequence, TransactionProcessor]()
-  // This sink will not persist sequence numbers and reuses them if it gets restarted.
-  // So it is possible to commit a transaction which may have been meant for the sink before the
-  // restart.
-  // Since the new txn may not have the same sequence number we must guard against accidentally
-  // committing a new transaction. To reduce the probability of that happening a random string is
-  // prepended to the sequence number. This prefix does not change for the lifetime of the sink.
-  private val seqBase = UUID.randomUUID().toString.substring(0, 8)
-  private val seqCounter = new AtomicLong(0)
-
-  // Protected by `sequenceNumberToProcessor`
-  private var stopped = false
-
-  @volatile private var isTest = false
-  private var testLatch: CountDownLatch = null
-
-  /**
-   * Returns a batch of events to Spark over Avro RPC.
-   * @param n Maximum number of events to return in a batch
-   * @return [[EventBatch]] instance that has a sequence number and an array of at most n events
-   */
-  override def getEventBatch(n: Int): EventBatch = {
-    logDebug("Got getEventBatch call from Spark.")
-    val sequenceNumber = seqBase + seqCounter.incrementAndGet()
-    createProcessor(sequenceNumber, n) match {
-      case Some(processor) =>
-        transactionExecutorOpt.foreach(_.submit(processor))
-        // Wait until a batch is available - will be an error if error message is non-empty
-        val batch = processor.getEventBatch
-        if (SparkSinkUtils.isErrorBatch(batch)) {
-          // Remove the processor if it is an error batch since no ACK is sent.
-          removeAndGetProcessor(sequenceNumber)
-          logWarning("Received an error batch - no events were received from channel!")
-        }
-        batch
-      case None =>
-        new EventBatch("Spark sink has been stopped!", "", java.util.Collections.emptyList())
-    }
-  }
-
-  private def createProcessor(seq: String, n: Int): Option[TransactionProcessor] = {
-    sequenceNumberToProcessor.synchronized {
-      if (!stopped) {
-        val processor = new TransactionProcessor(
-          channel, seq, n, transactionTimeout, backOffInterval, this)
-        sequenceNumberToProcessor.put(seq, processor)
-        if (isTest) {
-          processor.countDownWhenBatchAcked(testLatch)
-        }
-        Some(processor)
-      } else {
-        None
-      }
-    }
-  }
-
-  /**
-   * Called by Spark to indicate successful commit of a batch
-   * @param sequenceNumber The sequence number of the event batch that was successful
-   */
-  override def ack(sequenceNumber: CharSequence): Void = {
-    logDebug("Received Ack for batch with sequence number: " + sequenceNumber)
-    completeTransaction(sequenceNumber, success = true)
-    null
-  }
-
-  /**
-   * Called by Spark to indicate failed commit of a batch
-   * @param sequenceNumber The sequence number of the event batch that failed
-   */
-  override def nack(sequenceNumber: CharSequence): Void = {
-    completeTransaction(sequenceNumber, success = false)
-    logInfo("Spark failed to commit transaction. Will reattempt events.")
-    null
-  }
-
-  /**
-   * Helper method to commit or rollback a transaction.
-   * @param sequenceNumber The sequence number of the batch that was completed
-   * @param success Whether the batch was successful or not.
-   */
-  private def completeTransaction(sequenceNumber: CharSequence, success: Boolean) {
-    removeAndGetProcessor(sequenceNumber).foreach(processor => {
-      processor.batchProcessed(success)
-    })
-  }
-
-  /**
-   * Helper method to remove the TxnProcessor for a Sequence Number. Can be used to avoid a leak.
-   * @param sequenceNumber The sequence number of the batch whose processor should be removed
-   * @return An `Option` of the transaction processor for the corresponding batch. Note that this
-   *         instance is no longer tracked and the caller is responsible for that txn processor.
-   */
-  private[sink] def removeAndGetProcessor(sequenceNumber: CharSequence):
-      Option[TransactionProcessor] = {
-    sequenceNumberToProcessor.synchronized {
-      sequenceNumberToProcessor.remove(sequenceNumber.toString)
-    }
-  }
-
-  private[sink] def countDownWhenBatchAcked(latch: CountDownLatch) {
-    testLatch = latch
-    isTest = true
-  }
-
-  /**
-   * Shuts down the executor used to process transactions.
-   */
-  def shutdown() {
-    logInfo("Shutting down Spark Avro Callback Handler")
-    sequenceNumberToProcessor.synchronized {
-      stopped = true
-      sequenceNumberToProcessor.values.foreach(_.shutdown())
-    }
-    transactionExecutorOpt.foreach(_.shutdownNow())
-  }
-}
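The random seqBase prefix above is the restart guard: counters may repeat across sink incarnations, but the prefixed sequence numbers will not, so a stale ACK can never match a live transaction. A condensed sketch of just that scheme, under a hypothetical helper name:

    import java.util.UUID
    import java.util.concurrent.atomic.AtomicLong

    class SequenceNumbers { // hypothetical stand-alone version of the scheme
      // Fresh random prefix per instance; survives for the lifetime of the sink.
      private val seqBase = UUID.randomUUID().toString.substring(0, 8)
      private val seqCounter = new AtomicLong(0)
      def next(): String = seqBase + seqCounter.incrementAndGet()
    }
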
diff --git a/external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/SparkSink.scala b/external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/SparkSink.scala
deleted file mode 100644
index 14dffb15fe..0000000000
--- a/external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/SparkSink.scala
+++ /dev/null
@@ -1,171 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.spark.streaming.flume.sink
-
-import java.net.InetSocketAddress
-import java.util.concurrent._
-
-import org.apache.avro.ipc.NettyServer
-import org.apache.avro.ipc.specific.SpecificResponder
-import org.apache.flume.Context
-import org.apache.flume.Sink.Status
-import org.apache.flume.conf.{Configurable, ConfigurationException}
-import org.apache.flume.sink.AbstractSink
-
-/**
- * A sink that uses Avro RPC to run a server that can be polled by Spark's
- * FlumePollingInputDStream. This sink has the following configuration parameters:
- *
- * hostname - The hostname to bind to. Default: 0.0.0.0
- * port - The port to bind to. (No default - mandatory)
- * timeout - Time in seconds after which a transaction is rolled back,
- *           if an ACK is not received from Spark within that time
- * threads - Number of threads to use to receive requests from Spark (Default: 10)
- *
- * This sink is unlike other Flume sinks in the sense that it does not push data;
- * instead, the process method in this sink simply blocks the SinkRunner the first time it is
- * called. This sink starts up an Avro IPC server that uses the SparkFlumeProtocol.
- *
- * Each time a getEventBatch call comes in, the sink creates a transaction and reads events
- * from the channel. When enough events are read, the events are sent to the Spark receiver and
- * the thread itself is blocked and a reference to it is saved off.
- *
- * When the ack for that batch is received,
- * the thread which created the transaction is retrieved and it commits the transaction with the
- * channel from the same thread it was originally created in (since Flume transactions are
- * thread local). If a nack is received instead, the sink rolls back the transaction. If no ack
- * is received within the specified timeout, the transaction is rolled back too. If an ack comes
- * after that, it is simply ignored and the events get re-sent.
- *
- */
-
-class SparkSink extends AbstractSink with Logging with Configurable {
-
-  // Size of the pool to use for holding transaction processors.
-  private var poolSize: Integer = SparkSinkConfig.DEFAULT_THREADS
-
-  // Timeout for each transaction. If spark does not respond in this much time,
-  // rollback the transaction
-  private var transactionTimeout = SparkSinkConfig.DEFAULT_TRANSACTION_TIMEOUT
-
-  // Address info to bind on
-  private var hostname: String = SparkSinkConfig.DEFAULT_HOSTNAME
-  private var port: Int = 0
-
-  private var backOffInterval: Int = 200
-
-  // Handle to the server
-  private var serverOpt: Option[NettyServer] = None
-
-  // The handler that handles the callback from Avro
-  private var handler: Option[SparkAvroCallbackHandler] = None
-
-  // Latch that blocks off the Flume framework from wasting 1 thread.
-  private val blockingLatch = new CountDownLatch(1)
-
-  override def start() {
-    logInfo("Starting Spark Sink: " + getName + " on port: " + port + " and interface: " +
-      hostname + " with " + "pool size: " + poolSize + " and transaction timeout: " +
-      transactionTimeout + ".")
-    handler = Option(new SparkAvroCallbackHandler(poolSize, getChannel, transactionTimeout,
-      backOffInterval))
-    val responder = new SpecificResponder(classOf[SparkFlumeProtocol], handler.get)
-    // Using the constructor that takes specific thread-pools requires bringing in netty
-    // dependencies which are being excluded in the build. In practice,
-    // Netty dependencies are already available on the JVM as Flume would have pulled them in.
-    serverOpt = Option(new NettyServer(responder, new InetSocketAddress(hostname, port)))
-    serverOpt.foreach(server => {
-      logInfo("Starting Avro server for sink: " + getName)
-      server.start()
-    })
-    super.start()
-  }
-
-  override def stop() {
-    logInfo("Stopping Spark Sink: " + getName)
-    handler.foreach(callbackHandler => {
-      callbackHandler.shutdown()
-    })
-    serverOpt.foreach(server => {
-      logInfo("Stopping Avro Server for sink: " + getName)
-      server.close()
-      server.join()
-    })
-    blockingLatch.countDown()
-    super.stop()
-  }
-
-  override def configure(ctx: Context) {
-    import SparkSinkConfig._
-    hostname = ctx.getString(CONF_HOSTNAME, DEFAULT_HOSTNAME)
-    port = Option(ctx.getInteger(CONF_PORT)).
-      getOrElse(throw new ConfigurationException("The port to bind to must be specified"))
-    poolSize = ctx.getInteger(THREADS, DEFAULT_THREADS)
-    transactionTimeout = ctx.getInteger(CONF_TRANSACTION_TIMEOUT, DEFAULT_TRANSACTION_TIMEOUT)
-    backOffInterval = ctx.getInteger(CONF_BACKOFF_INTERVAL, DEFAULT_BACKOFF_INTERVAL)
-    logInfo("Configured Spark Sink with hostname: " + hostname + ", port: " + port + ", " +
-      "poolSize: " + poolSize + ", transactionTimeout: " + transactionTimeout + ", " +
-      "backoffInterval: " + backOffInterval)
-  }
-
-  override def process(): Status = {
-    // This method is called in a loop by the Flume framework - block it until the sink is
-    // stopped to save CPU resources. The sink runner will interrupt this thread when the sink is
-    // being shut down.
-    logInfo("Blocking Sink Runner, sink will continue to run..")
-    blockingLatch.await()
-    Status.BACKOFF
-  }
-
-  private[flume] def getPort(): Int = {
-    serverOpt
-      .map(_.getPort)
-      .getOrElse(
-        throw new RuntimeException("Server was not started!")
-      )
-  }
-
-  /**
-   * Pass in a [[CountDownLatch]] for testing purposes. This latch is counted down when each
-   * batch is received. The test can simply call await on this latch till the expected number of
-   * batches are received.
-   * @param latch The latch to count down on each received batch
-   */
-  private[flume] def countdownWhenBatchReceived(latch: CountDownLatch) {
-    handler.foreach(_.countDownWhenBatchAcked(latch))
-  }
-}
-
-/**
- * Configuration parameters and their defaults.
- */
-private[flume]
-object SparkSinkConfig {
-  val THREADS = "threads"
-  val DEFAULT_THREADS = 10
-
-  val CONF_TRANSACTION_TIMEOUT = "timeout"
-  val DEFAULT_TRANSACTION_TIMEOUT = 60
-
-  val CONF_HOSTNAME = "hostname"
-  val DEFAULT_HOSTNAME = "0.0.0.0"
-
-  val CONF_PORT = "port"
-
-  val CONF_BACKOFF_INTERVAL = "backoffInterval"
-  val DEFAULT_BACKOFF_INTERVAL = 200
-}
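In a Flume agent, this sink is wired up like any other; the fragment below follows the pattern from the Spark streaming-flume integration docs, with placeholder agent, channel, and endpoint names:

    agent.sinks = spark
    agent.sinks.spark.type = org.apache.spark.streaming.flume.sink.SparkSink
    agent.sinks.spark.hostname = localhost
    agent.sinks.spark.port = 9999
    agent.sinks.spark.channel = memoryChannel
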
diff --git a/external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/SparkSinkThreadFactory.scala b/external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/SparkSinkThreadFactory.scala
deleted file mode 100644
index 845fc8debd..0000000000
--- a/external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/SparkSinkThreadFactory.scala
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.spark.streaming.flume.sink
-
-import java.util.concurrent.ThreadFactory
-import java.util.concurrent.atomic.AtomicLong
-
-/**
- * Thread factory that generates daemon threads with a specified name format.
- */
-private[sink] class SparkSinkThreadFactory(nameFormat: String) extends ThreadFactory {
-
-  private val threadId = new AtomicLong()
-
-  override def newThread(r: Runnable): Thread = {
-    val t = new Thread(r, nameFormat.format(threadId.incrementAndGet()))
-    t.setDaemon(true)
-    t
-  }
-
-}
diff --git a/external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/SparkSinkUtils.scala b/external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/SparkSinkUtils.scala
deleted file mode 100644
index 47c0e294d6..0000000000
--- a/external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/SparkSinkUtils.scala
+++ /dev/null
@@ -1,28 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.spark.streaming.flume.sink
-
-private[flume] object SparkSinkUtils {
-  /**
-   * This method determines if this batch represents an error or not.
-   * @param batch - The batch to check
-   * @return - true if the batch represents an error
-   */
-  def isErrorBatch(batch: EventBatch): Boolean = {
-    !batch.getErrorMsg.toString.equals("") // If there is an error message, it is an error batch.
-  }
-}
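SparkSinkThreadFactory drops straight into the standard java.util.concurrent machinery; this is essentially how SparkAvroCallbackHandler builds its pool (a sketch, with a literal pool size in place of the configured one):

    import java.util.concurrent.Executors

    // Produces daemon threads named "Spark Sink Processor Thread - 1", "- 2", ...
    // Daemon status matters: sink threads must not keep the Flume agent alive
    // after the sink itself has been stopped.
    val pool = Executors.newFixedThreadPool(
      10, new SparkSinkThreadFactory("Spark Sink Processor Thread - %d"))
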
diff --git a/external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/TransactionProcessor.scala b/external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/TransactionProcessor.scala
deleted file mode 100644
index b15c2097e5..0000000000
--- a/external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/TransactionProcessor.scala
+++ /dev/null
@@ -1,252 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.spark.streaming.flume.sink
-
-import java.nio.ByteBuffer
-import java.util
-import java.util.concurrent.{Callable, CountDownLatch, TimeUnit}
-
-import scala.util.control.Breaks
-
-import org.apache.flume.{Channel, Transaction}
-
-// Flume forces transactions to be thread-local (horrible, I know!)
-// So the sink basically spawns a new thread to pull the events out within a transaction.
-// The thread fills in the event batch object that is set before the thread is scheduled.
-// After filling it in, the thread waits on a condition - which is released only
-// when the success message comes back for the specific sequence number for that event batch.
-/**
- * This class represents a transaction on the Flume channel. This class runs a separate thread
- * which owns the transaction. The thread is blocked until the success call for that transaction
- * comes back with an ACK or NACK.
- * @param channel The channel from which to pull events
- * @param seqNum The sequence number to use for the transaction. Must be unique
- * @param maxBatchSize The maximum number of events to process per batch
- * @param transactionTimeout Time in seconds after which a transaction must be rolled back
- *                           without waiting for an ACK from Spark
- * @param backOffInterval Time in milliseconds to sleep between polls of an empty channel
- * @param parent The parent [[SparkAvroCallbackHandler]] instance, for reporting timeouts
- */
-private class TransactionProcessor(val channel: Channel, val seqNum: String,
-    var maxBatchSize: Int, val transactionTimeout: Int, val backOffInterval: Int,
-    val parent: SparkAvroCallbackHandler) extends Callable[Void] with Logging {
-
-  // If a real batch is not returned, we always have to return an error batch.
-  @volatile private var eventBatch: EventBatch = new EventBatch("Unknown Error", "",
-    util.Collections.emptyList())
-
-  // Synchronization primitives
-  val batchGeneratedLatch = new CountDownLatch(1)
-  val batchAckLatch = new CountDownLatch(1)
-
-  // Sanity check to ensure we don't loop like crazy
-  val totalAttemptsToRemoveFromChannel = Int.MaxValue / 2
-
-  // OK to use volatile, since the change would only make this true (otherwise it will be
-  // changed to false - we never apply a negation operation to this) - which means the transaction
-  // succeeded.
-  @volatile private var batchSuccess = false
-
-  @volatile private var stopped = false
-
-  @volatile private var isTest = false
-
-  private var testLatch: CountDownLatch = null
-
-  // The transaction that this processor would handle
-  var txOpt: Option[Transaction] = None
-
-  /**
-   * Get an event batch from the channel. This method will block until a batch of events is
-   * available from the channel. If no events are available after a large number of attempts of
-   * polling the channel, this method will return an [[EventBatch]] with a non-empty error message
-   *
-   * @return An [[EventBatch]] instance with sequence number set to seqNum, filled with a
-   *         maximum of maxBatchSize events
-   */
-  def getEventBatch: EventBatch = {
-    batchGeneratedLatch.await()
-    eventBatch
-  }
-
-  /**
-   * This method is to be called by the sink when it receives an ACK or NACK from Spark. This
-   * method is a no-op if it is called after transactionTimeout has expired since
-   * getEventBatch returned a batch of events.
-   * @param success True if an ACK was received and the transaction should be committed, else false.
-   */
-  def batchProcessed(success: Boolean) {
-    logDebug("Batch processed for sequence number: " + seqNum)
-    batchSuccess = success
-    batchAckLatch.countDown()
-  }
-
-  private[flume] def shutdown(): Unit = {
-    logDebug("Shutting down transaction processor")
-    stopped = true
-  }
-
-  /**
-   * Populates events into the event batch. If the batch cannot be populated,
-   * this method will not set the events into the event batch, but it sets an error message.
-   */
-  private def populateEvents() {
-    try {
-      txOpt = Option(channel.getTransaction)
-      if (txOpt.isEmpty) {
-        eventBatch.setErrorMsg("Something went wrong. Channel was " +
-          "unable to create a transaction!")
-      }
-      txOpt.foreach(tx => {
-        tx.begin()
-        val events = new util.ArrayList[SparkSinkEvent](maxBatchSize)
-        val loop = new Breaks
-        var gotEventsInThisTxn = false
-        var loopCounter: Int = 0
-        loop.breakable {
-          while (!stopped && events.size() < maxBatchSize
-            && loopCounter < totalAttemptsToRemoveFromChannel) {
-            loopCounter += 1
-            Option(channel.take()) match {
-              case Some(event) =>
-                events.add(new SparkSinkEvent(toCharSequenceMap(event.getHeaders),
-                  ByteBuffer.wrap(event.getBody)))
-                gotEventsInThisTxn = true
-              case None =>
-                if (!gotEventsInThisTxn && !stopped) {
-                  logDebug("Sleeping for " + backOffInterval + " millis as no events were read in" +
-                    " the current transaction")
-                  TimeUnit.MILLISECONDS.sleep(backOffInterval)
-                } else {
-                  loop.break()
-                }
-            }
-          }
-        }
-        if (!gotEventsInThisTxn && !stopped) {
-          val msg = "Tried several times, " +
-            "but did not get any events from the channel!"
-          logWarning(msg)
-          eventBatch.setErrorMsg(msg)
-        } else {
-          // At this point, the events are available, so fill them into the event batch
-          eventBatch = new EventBatch("", seqNum, events)
-        }
-      })
-    } catch {
-      case interrupted: InterruptedException =>
-        // Don't pollute logs if the InterruptedException came from this being stopped
-        if (!stopped) {
-          logWarning("Error while processing transaction.", interrupted)
-        }
-      case e: Exception =>
-        logWarning("Error while processing transaction.", e)
-        eventBatch.setErrorMsg(e.getMessage)
-        try {
-          txOpt.foreach(tx => {
-            rollbackAndClose(tx, close = true)
-          })
-        } finally {
-          txOpt = None
-        }
-    } finally {
-      batchGeneratedLatch.countDown()
-    }
-  }
-
-  /**
-   * Waits for up to transactionTimeout seconds for an ACK. If an ACK comes in,
-   * this method commits the transaction with the channel. If the ACK does not come in within
-   * that time or a NACK comes in, this method rolls back the transaction.
-   */
-  private def processAckOrNack() {
-    batchAckLatch.await(transactionTimeout, TimeUnit.SECONDS)
-    txOpt.foreach(tx => {
-      if (batchSuccess) {
-        try {
-          logDebug("Committing transaction")
-          tx.commit()
-        } catch {
-          case e: Exception =>
-            logWarning("Error while attempting to commit transaction. Transaction will be rolled " +
-              "back", e)
-            rollbackAndClose(tx, close = false) // tx will be closed later anyway
-        } finally {
-          tx.close()
-          if (isTest) {
-            testLatch.countDown()
-          }
-        }
-      } else {
-        logWarning("Spark could not commit transaction, NACK received. Rolling back transaction.")
-        rollbackAndClose(tx, close = true)
-        // This might have been due to timeout or a NACK. Either way the following call does not
-        // cause issues. This is required to ensure the TransactionProcessor instance is not leaked
-        parent.removeAndGetProcessor(seqNum)
-      }
-    })
-  }
-
-  /**
-   * Helper method to rollback and optionally close a transaction
-   * @param tx The transaction to rollback
-   * @param close Whether the transaction should be closed or not after rolling back
-   */
-  private def rollbackAndClose(tx: Transaction, close: Boolean) {
-    try {
-      logWarning("Spark was unable to successfully process the events. Transaction is being " +
-        "rolled back.")
-      tx.rollback()
-    } catch {
-      case e: Exception =>
-        logError("Error rolling back transaction. Rollback may have failed!", e)
-    } finally {
-      if (close) {
-        tx.close()
-      }
-    }
-  }
-
-  /**
-   * Helper method to convert a Map[String, String] to Map[CharSequence, CharSequence]
-   * @param inMap The map to be converted
-   * @return The converted map
-   */
-  private def toCharSequenceMap(inMap: java.util.Map[String, String]): java.util.Map[CharSequence,
-    CharSequence] = {
-    val charSeqMap = new util.HashMap[CharSequence, CharSequence](inMap.size())
-    charSeqMap.putAll(inMap)
-    charSeqMap
-  }
-
-  /**
-   * When the thread is started it sets as many events as the batch size or less (if enough
-   * events aren't available) into the eventBatch object and lets any threads waiting on the
-   * [[getEventBatch]] method proceed. Then this thread waits for ACKs or NACKs to come in,
-   * or for a specified timeout, and commits or rolls back the transaction.
-   */
-  override def call(): Void = {
-    populateEvents()
-    processAckOrNack()
-    null
-  }
-
-  private[sink] def countDownWhenBatchAcked(latch: CountDownLatch) {
-    testLatch = latch
-    isTest = true
-  }
-}
diff --git a/external/flume-sink/src/test/resources/log4j.properties b/external/flume-sink/src/test/resources/log4j.properties
deleted file mode 100644
index 42df8792f1..0000000000
--- a/external/flume-sink/src/test/resources/log4j.properties
+++ /dev/null
@@ -1,28 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-# Set everything to be logged to the file streaming/target/unit-tests.log
-log4j.rootCategory=INFO, file
-log4j.appender.file=org.apache.log4j.FileAppender
-log4j.appender.file.append=true
-log4j.appender.file.file=target/unit-tests.log
-log4j.appender.file.layout=org.apache.log4j.PatternLayout
-log4j.appender.file.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss.SSS} %t %p %c{1}: %m%n
-
-# Ignore messages below warning level from Jetty, because it's a bit verbose
-log4j.logger.org.spark-project.jetty=WARN
-
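The heart of processAckOrNack above is a timed latch wait that decides between commit and rollback; stripped to its essentials (a sketch with hypothetical names, not the class's actual API):

    import java.util.concurrent.{CountDownLatch, TimeUnit}

    import org.apache.flume.Transaction

    // batchProcessed() counts the latch down when Spark's ACK/NACK arrives and
    // records the verdict; a timed-out await leaves `acked` false, forcing rollback.
    def completeTransaction(tx: Transaction, ackLatch: CountDownLatch,
        acked: => Boolean, timeoutSecs: Long): Unit = {
      ackLatch.await(timeoutSecs, TimeUnit.SECONDS)
      if (acked) tx.commit() else tx.rollback()
      tx.close()
    }
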
diff --git a/external/flume-sink/src/test/scala/org/apache/spark/streaming/flume/sink/SparkSinkSuite.scala b/external/flume-sink/src/test/scala/org/apache/spark/streaming/flume/sink/SparkSinkSuite.scala
deleted file mode 100644
index e8ca1e7163..0000000000
--- a/external/flume-sink/src/test/scala/org/apache/spark/streaming/flume/sink/SparkSinkSuite.scala
+++ /dev/null
@@ -1,218 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.spark.streaming.flume.sink
-
-import java.net.InetSocketAddress
-import java.nio.charset.StandardCharsets
-import java.util.concurrent.{CountDownLatch, Executors, TimeUnit}
-import java.util.concurrent.atomic.AtomicInteger
-
-import scala.collection.JavaConverters._
-import scala.concurrent.{ExecutionContext, Future}
-import scala.util.{Failure, Success}
-
-import org.apache.avro.ipc.NettyTransceiver
-import org.apache.avro.ipc.specific.SpecificRequestor
-import org.apache.flume.Context
-import org.apache.flume.channel.MemoryChannel
-import org.apache.flume.event.EventBuilder
-import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory
-
-// Due to MNG-1378, there is not a way to include test dependencies transitively.
-// We cannot include Spark core tests as a dependency here because it depends on
-// Spark core main, which has too many dependencies to require here manually.
-// For this reason, we continue to use FunSuite and ignore the scalastyle checks
-// that fail if this is detected.
-// scalastyle:off
-import org.scalatest.FunSuite
-
-class SparkSinkSuite extends FunSuite {
-// scalastyle:on
-
-  val eventsPerBatch = 1000
-  val channelCapacity = 5000
-
-  test("Success with ack") {
-    val (channel, sink, latch) = initializeChannelAndSink()
-    channel.start()
-    sink.start()
-
-    putEvents(channel, eventsPerBatch)
-
-    val port = sink.getPort
-    val address = new InetSocketAddress("0.0.0.0", port)
-
-    val (transceiver, client) = getTransceiverAndClient(address, 1)(0)
-    val events = client.getEventBatch(1000)
-    client.ack(events.getSequenceNumber)
-    assert(events.getEvents.size() === 1000)
-    latch.await(1, TimeUnit.SECONDS)
-    assertChannelIsEmpty(channel)
-    sink.stop()
-    channel.stop()
-    transceiver.close()
-  }
-
-  test("Failure with nack") {
-    val (channel, sink, latch) = initializeChannelAndSink()
-    channel.start()
-    sink.start()
-    putEvents(channel, eventsPerBatch)
-
-    val port = sink.getPort
-    val address = new InetSocketAddress("0.0.0.0", port)
-
-    val (transceiver, client) = getTransceiverAndClient(address, 1)(0)
-    val events = client.getEventBatch(1000)
-    assert(events.getEvents.size() === 1000)
-    client.nack(events.getSequenceNumber)
-    latch.await(1, TimeUnit.SECONDS)
-    assert(availableChannelSlots(channel) === 4000)
-    sink.stop()
-    channel.stop()
-    transceiver.close()
-  }
-
-  test("Failure with timeout") {
-    val (channel, sink, latch) = initializeChannelAndSink(Map(SparkSinkConfig
-      .CONF_TRANSACTION_TIMEOUT -> 1.toString))
-    channel.start()
-    sink.start()
-    putEvents(channel, eventsPerBatch)
-    val port = sink.getPort
-    val address = new InetSocketAddress("0.0.0.0", port)
-
-    val (transceiver, client) = getTransceiverAndClient(address, 1)(0)
-    val events = client.getEventBatch(1000)
-    assert(events.getEvents.size() === 1000)
-    latch.await(1, TimeUnit.SECONDS)
-    assert(availableChannelSlots(channel) === 4000)
-    sink.stop()
-    channel.stop()
-    transceiver.close()
-  }
-
-  test("Multiple consumers") {
-    testMultipleConsumers(failSome = false)
-  }
-
-  test("Multiple consumers with some failures") {
-    testMultipleConsumers(failSome = true)
-  }
-
-  def testMultipleConsumers(failSome: Boolean): Unit = {
-    implicit val executorContext = ExecutionContext
-      .fromExecutorService(Executors.newFixedThreadPool(5))
-    val (channel, sink, latch) = initializeChannelAndSink(Map.empty, 5)
-    channel.start()
-    sink.start()
-    (1 to 5).foreach(_ => putEvents(channel, eventsPerBatch))
-    val port = sink.getPort
-    val address = new InetSocketAddress("0.0.0.0", port)
-    val transceiversAndClients = getTransceiverAndClient(address, 5)
-    val batchCounter = new CountDownLatch(5)
-    val counter = new AtomicInteger(0)
-    transceiversAndClients.foreach(x => {
-      Future {
-        val client = x._2
-        val events = client.getEventBatch(1000)
-        if (!failSome || counter.getAndIncrement() % 2 == 0) {
-          client.ack(events.getSequenceNumber)
-        } else {
-          client.nack(events.getSequenceNumber)
-          throw new RuntimeException("Sending NACK for failure!")
-        }
-        events
-      }.onComplete {
-        case Success(events) =>
-          assert(events.getEvents.size() === 1000)
-          batchCounter.countDown()
-        case Failure(t) =>
-          // Don't re-throw the exception, causes a nasty unnecessary stack trace on stdout
-          batchCounter.countDown()
-      }
-    })
-    batchCounter.await()
-    latch.await(1, TimeUnit.SECONDS)
-    executorContext.shutdown()
-    if (failSome) {
-      assert(availableChannelSlots(channel) === 3000)
-    } else {
-      assertChannelIsEmpty(channel)
-    }
-    sink.stop()
-    channel.stop()
-    transceiversAndClients.foreach(x => x._1.close())
-  }
-
-  private def initializeChannelAndSink(overrides: Map[String, String] = Map.empty,
-      batchCounter: Int = 1): (MemoryChannel, SparkSink, CountDownLatch) = {
-    val channel = new MemoryChannel()
-    val channelContext = new Context()
-
-    channelContext.put("capacity", channelCapacity.toString)
-    channelContext.put("transactionCapacity", 1000.toString)
-    channelContext.put("keep-alive", 0.toString)
-    channelContext.putAll(overrides.asJava)
-    channel.setName(scala.util.Random.nextString(10))
-    channel.configure(channelContext)
-
-    val sink = new SparkSink()
-    val sinkContext = new Context()
-    sinkContext.put(SparkSinkConfig.CONF_HOSTNAME, "0.0.0.0")
-    sinkContext.put(SparkSinkConfig.CONF_PORT, 0.toString)
-    sink.configure(sinkContext)
-    sink.setChannel(channel)
-    val latch = new CountDownLatch(batchCounter)
-    sink.countdownWhenBatchReceived(latch)
-    (channel, sink, latch)
-  }
-
-  private def putEvents(ch: MemoryChannel, count: Int): Unit = {
-    val tx = ch.getTransaction
-    tx.begin()
-    (1 to count).foreach(x =>
-      ch.put(EventBuilder.withBody(x.toString.getBytes(StandardCharsets.UTF_8))))
-    tx.commit()
-    tx.close()
-  }
-
-  private def getTransceiverAndClient(address: InetSocketAddress,
-      count: Int): Seq[(NettyTransceiver, SparkFlumeProtocol.Callback)] = {
-
-    (1 to count).map(_ => {
-      lazy val channelFactoryExecutor = Executors.newCachedThreadPool(
-        new SparkSinkThreadFactory("Flume Receiver Channel Thread - %d"))
-      lazy val channelFactory =
-        new NioClientSocketChannelFactory(channelFactoryExecutor, channelFactoryExecutor)
-      val transceiver = new NettyTransceiver(address, channelFactory)
-      val client = SpecificRequestor.getClient(classOf[SparkFlumeProtocol.Callback], transceiver)
-      (transceiver, client)
-    })
-  }
-
-  private def assertChannelIsEmpty(channel: MemoryChannel): Unit = {
-    assert(availableChannelSlots(channel) === channelCapacity)
-  }
-
-  private def availableChannelSlots(channel: MemoryChannel): Int = {
-    val queueRemaining = channel.getClass.getDeclaredField("queueRemaining")
-    queueRemaining.setAccessible(true)
-    val m =
-      queueRemaining.get(channel).getClass.getDeclaredMethod("availablePermits")
-    m.invoke(queueRemaining.get(channel)).asInstanceOf[Int]
-  }
-}
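For completeness, the consumer these tests emulate: in Spark releases that still shipped the flume connector, a streaming job polled this sink via FlumeUtils.createPollingStream, roughly as follows (host, port, and app name are placeholders):

    import org.apache.spark.SparkConf
    import org.apache.spark.storage.StorageLevel
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import org.apache.spark.streaming.flume.FlumeUtils

    val ssc = new StreamingContext(new SparkConf().setAppName("FlumePoller"), Seconds(10))
    // Connects to the SparkSink's Avro server and pulls batches, ACKing on success.
    val stream = FlumeUtils.createPollingStream(
      ssc, "sink-host", 9999, StorageLevel.MEMORY_AND_DISK_SER_2)
    stream.map(e => new String(e.event.getBody.array())).print()
    ssc.start()
    ssc.awaitTermination()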