aboutsummaryrefslogtreecommitdiff
path: root/external/flume-sink
diff options
context:
space:
mode:
authorShixiong Zhu <shixiong@databricks.com>2016-03-25 17:37:16 -0700
committerReynold Xin <rxin@databricks.com>2016-03-25 17:37:16 -0700
commit24587ce433aa30f30a5d1ed6566365f24c222a27 (patch)
treeb1ff8ffa17b643d3e833be33debecf209d20ff6d /external/flume-sink
parent54d13bed87fcf2968f77e1f1153e85184ec91d78 (diff)
downloadspark-24587ce433aa30f30a5d1ed6566365f24c222a27.tar.gz
spark-24587ce433aa30f30a5d1ed6566365f24c222a27.tar.bz2
spark-24587ce433aa30f30a5d1ed6566365f24c222a27.zip
[SPARK-14073][STREAMING][TEST-MAVEN] Move flume back to Spark
## What changes were proposed in this pull request? This PR moves flume back to Spark as per the discussion in the dev mail-list. ## How was this patch tested? Existing Jenkins tests. Author: Shixiong Zhu <shixiong@databricks.com> Closes #11895 from zsxwing/move-flume-back.
Diffstat (limited to 'external/flume-sink')
-rw-r--r--external/flume-sink/pom.xml129
-rw-r--r--external/flume-sink/src/main/avro/sparkflume.avdl40
-rw-r--r--external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/Logging.scala127
-rw-r--r--external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/SparkAvroCallbackHandler.scala166
-rw-r--r--external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/SparkSink.scala171
-rw-r--r--external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/SparkSinkThreadFactory.scala35
-rw-r--r--external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/SparkSinkUtils.scala28
-rw-r--r--external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/TransactionProcessor.scala252
-rw-r--r--external/flume-sink/src/test/resources/log4j.properties28
-rw-r--r--external/flume-sink/src/test/scala/org/apache/spark/streaming/flume/sink/SparkSinkSuite.scala218
10 files changed, 1194 insertions, 0 deletions
diff --git a/external/flume-sink/pom.xml b/external/flume-sink/pom.xml
new file mode 100644
index 0000000000..e4effe158c
--- /dev/null
+++ b/external/flume-sink/pom.xml
@@ -0,0 +1,129 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one or more
+ ~ contributor license agreements. See the NOTICE file distributed with
+ ~ this work for additional information regarding copyright ownership.
+ ~ The ASF licenses this file to You under the Apache License, Version 2.0
+ ~ (the "License"); you may not use this file except in compliance with
+ ~ the License. You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ~ See the License for the specific language governing permissions and
+ ~ limitations under the License.
+ -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <parent>
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-parent_2.11</artifactId>
+ <version>2.0.0-SNAPSHOT</version>
+ <relativePath>../../pom.xml</relativePath>
+ </parent>
+
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-streaming-flume-sink_2.11</artifactId>
+ <properties>
+ <sbt.project.name>streaming-flume-sink</sbt.project.name>
+ </properties>
+ <packaging>jar</packaging>
+ <name>Spark Project External Flume Sink</name>
+ <url>http://spark.apache.org/</url>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.flume</groupId>
+ <artifactId>flume-ng-sdk</artifactId>
+ <exclusions>
+ <!-- Guava is excluded to avoid its use in this module. -->
+ <exclusion>
+ <groupId>com.google.guava</groupId>
+ <artifactId>guava</artifactId>
+ </exclusion>
+ <!--
+ Exclude libthrift since the flume poms seem to confuse sbt, which fails to find the
+ dependency.
+ -->
+ <exclusion>
+ <groupId>org.apache.thrift</groupId>
+ <artifactId>libthrift</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.flume</groupId>
+ <artifactId>flume-ng-core</artifactId>
+ <exclusions>
+ <exclusion>
+ <groupId>com.google.guava</groupId>
+ <artifactId>guava</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.apache.thrift</groupId>
+ <artifactId>libthrift</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>org.scala-lang</groupId>
+ <artifactId>scala-library</artifactId>
+ </dependency>
+ <dependency>
+ <!-- Add Guava in test scope since flume actually needs it. -->
+ <groupId>com.google.guava</groupId>
+ <artifactId>guava</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <!--
+ Netty explicitly added in test as it has been excluded from
+ Flume dependency (to avoid runtime problems when running with
+ Spark) but unit tests need it. Version of Netty on which
+ Flume 1.4.0 depends on is "3.4.0.Final" .
+ -->
+ <groupId>io.netty</groupId>
+ <artifactId>netty</artifactId>
+ <version>3.4.0.Final</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-test-tags_${scala.binary.version}</artifactId>
+ </dependency>
+ </dependencies>
+ <build>
+ <outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory>
+ <testOutputDirectory>target/scala-${scala.binary.version}/test-classes</testOutputDirectory>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.avro</groupId>
+ <artifactId>avro-maven-plugin</artifactId>
+ <version>${avro.version}</version>
+ <configuration>
+ <!-- Generate the output in the same directory as the sbt-avro-plugin -->
+ <outputDirectory>${project.basedir}/target/scala-${scala.binary.version}/src_managed/main/compiled_avro</outputDirectory>
+ </configuration>
+ <executions>
+ <execution>
+ <phase>generate-sources</phase>
+ <goals>
+ <goal>idl-protocol</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-shade-plugin</artifactId>
+ <configuration>
+ <!-- Disable all relocations defined in the parent pom. -->
+ <relocations combine.self="override" />
+ </configuration>
+ </plugin>
+ </plugins>
+ </build>
+</project>
diff --git a/external/flume-sink/src/main/avro/sparkflume.avdl b/external/flume-sink/src/main/avro/sparkflume.avdl
new file mode 100644
index 0000000000..8806e863ac
--- /dev/null
+++ b/external/flume-sink/src/main/avro/sparkflume.avdl
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+@namespace("org.apache.spark.streaming.flume.sink")
+
+protocol SparkFlumeProtocol {
+
+ record SparkSinkEvent {
+ map<string> headers;
+ bytes body;
+ }
+
+ record EventBatch {
+ string errorMsg = ""; // If this is empty it is a valid message, else it represents an error
+ string sequenceNumber;
+ array<SparkSinkEvent> events;
+ }
+
+ EventBatch getEventBatch (int n);
+
+ void ack (string sequenceNumber);
+
+ void nack (string sequenceNumber);
+}
diff --git a/external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/Logging.scala b/external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/Logging.scala
new file mode 100644
index 0000000000..09d3fe91e4
--- /dev/null
+++ b/external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/Logging.scala
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.streaming.flume.sink
+
+import org.slf4j.{Logger, LoggerFactory}
+
+/**
+ * Copy of the org.apache.spark.Logging for being used in the Spark Sink.
+ * The org.apache.spark.Logging is not used so that all of Spark is not brought
+ * in as a dependency.
+ */
+private[sink] trait Logging {
+ // Make the log field transient so that objects with Logging can
+ // be serialized and used on another machine
+ @transient private var _log: Logger = null
+
+ // Method to get or create the logger for this object
+ protected def log: Logger = {
+ if (_log == null) {
+ initializeIfNecessary()
+ var className = this.getClass.getName
+ // Ignore trailing $'s in the class names for Scala objects
+ if (className.endsWith("$")) {
+ className = className.substring(0, className.length - 1)
+ }
+ _log = LoggerFactory.getLogger(className)
+ }
+ _log
+ }
+
+ // Log methods that take only a String
+ protected def logInfo(msg: => String) {
+ if (log.isInfoEnabled) log.info(msg)
+ }
+
+ protected def logDebug(msg: => String) {
+ if (log.isDebugEnabled) log.debug(msg)
+ }
+
+ protected def logTrace(msg: => String) {
+ if (log.isTraceEnabled) log.trace(msg)
+ }
+
+ protected def logWarning(msg: => String) {
+ if (log.isWarnEnabled) log.warn(msg)
+ }
+
+ protected def logError(msg: => String) {
+ if (log.isErrorEnabled) log.error(msg)
+ }
+
+ // Log methods that take Throwables (Exceptions/Errors) too
+ protected def logInfo(msg: => String, throwable: Throwable) {
+ if (log.isInfoEnabled) log.info(msg, throwable)
+ }
+
+ protected def logDebug(msg: => String, throwable: Throwable) {
+ if (log.isDebugEnabled) log.debug(msg, throwable)
+ }
+
+ protected def logTrace(msg: => String, throwable: Throwable) {
+ if (log.isTraceEnabled) log.trace(msg, throwable)
+ }
+
+ protected def logWarning(msg: => String, throwable: Throwable) {
+ if (log.isWarnEnabled) log.warn(msg, throwable)
+ }
+
+ protected def logError(msg: => String, throwable: Throwable) {
+ if (log.isErrorEnabled) log.error(msg, throwable)
+ }
+
+ protected def isTraceEnabled(): Boolean = {
+ log.isTraceEnabled
+ }
+
+ private def initializeIfNecessary() {
+ if (!Logging.initialized) {
+ Logging.initLock.synchronized {
+ if (!Logging.initialized) {
+ initializeLogging()
+ }
+ }
+ }
+ }
+
+ private def initializeLogging() {
+ Logging.initialized = true
+
+ // Force a call into slf4j to initialize it. Avoids this happening from multiple threads
+ // and triggering this: http://mailman.qos.ch/pipermail/slf4j-dev/2010-April/002956.html
+ log
+ }
+}
+
+private[sink] object Logging {
+ @volatile private var initialized = false
+ val initLock = new Object()
+ try {
+ // We use reflection here to handle the case where users remove the
+ // slf4j-to-jul bridge order to route their logs to JUL.
+ // scalastyle:off classforname
+ val bridgeClass = Class.forName("org.slf4j.bridge.SLF4JBridgeHandler")
+ // scalastyle:on classforname
+ bridgeClass.getMethod("removeHandlersForRootLogger").invoke(null)
+ val installed = bridgeClass.getMethod("isInstalled").invoke(null).asInstanceOf[Boolean]
+ if (!installed) {
+ bridgeClass.getMethod("install").invoke(null)
+ }
+ } catch {
+ case e: ClassNotFoundException => // can't log anything yet so just fail silently
+ }
+}
diff --git a/external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/SparkAvroCallbackHandler.scala b/external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/SparkAvroCallbackHandler.scala
new file mode 100644
index 0000000000..719fca0938
--- /dev/null
+++ b/external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/SparkAvroCallbackHandler.scala
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.streaming.flume.sink
+
+import java.util.UUID
+import java.util.concurrent.{CountDownLatch, Executors}
+import java.util.concurrent.atomic.AtomicLong
+
+import scala.collection.mutable
+
+import org.apache.flume.Channel
+
+/**
+ * Class that implements the SparkFlumeProtocol, that is used by the Avro Netty Server to process
+ * requests. Each getEvents, ack and nack call is forwarded to an instance of this class.
+ * @param threads Number of threads to use to process requests.
+ * @param channel The channel that the sink pulls events from
+ * @param transactionTimeout Timeout in millis after which the transaction if not acked by Spark
+ * is rolled back.
+ */
+// Flume forces transactions to be thread-local. So each transaction *must* be committed, or
+// rolled back from the thread it was originally created in. So each getEvents call from Spark
+// creates a TransactionProcessor which runs in a new thread, in which the transaction is created
+// and events are pulled off the channel. Once the events are sent to spark,
+// that thread is blocked and the TransactionProcessor is saved in a map,
+// until an ACK or NACK comes back or the transaction times out (after the specified timeout).
+// When the response comes or a timeout is hit, the TransactionProcessor is retrieved and then
+// unblocked, at which point the transaction is committed or rolled back.
+
+private[flume] class SparkAvroCallbackHandler(val threads: Int, val channel: Channel,
+ val transactionTimeout: Int, val backOffInterval: Int) extends SparkFlumeProtocol with Logging {
+ val transactionExecutorOpt = Option(Executors.newFixedThreadPool(threads,
+ new SparkSinkThreadFactory("Spark Sink Processor Thread - %d")))
+ // Protected by `sequenceNumberToProcessor`
+ private val sequenceNumberToProcessor = mutable.HashMap[CharSequence, TransactionProcessor]()
+ // This sink will not persist sequence numbers and reuses them if it gets restarted.
+ // So it is possible to commit a transaction which may have been meant for the sink before the
+ // restart.
+ // Since the new txn may not have the same sequence number we must guard against accidentally
+ // committing a new transaction. To reduce the probability of that happening a random string is
+ // prepended to the sequence number. Does not change for life of sink
+ private val seqBase = UUID.randomUUID().toString.substring(0, 8)
+ private val seqCounter = new AtomicLong(0)
+
+ // Protected by `sequenceNumberToProcessor`
+ private var stopped = false
+
+ @volatile private var isTest = false
+ private var testLatch: CountDownLatch = null
+
+ /**
+ * Returns a bunch of events to Spark over Avro RPC.
+ * @param n Maximum number of events to return in a batch
+ * @return [[EventBatch]] instance that has a sequence number and an array of at most n events
+ */
+ override def getEventBatch(n: Int): EventBatch = {
+ logDebug("Got getEventBatch call from Spark.")
+ val sequenceNumber = seqBase + seqCounter.incrementAndGet()
+ createProcessor(sequenceNumber, n) match {
+ case Some(processor) =>
+ transactionExecutorOpt.foreach(_.submit(processor))
+ // Wait until a batch is available - will be an error if error message is non-empty
+ val batch = processor.getEventBatch
+ if (SparkSinkUtils.isErrorBatch(batch)) {
+ // Remove the processor if it is an error batch since no ACK is sent.
+ removeAndGetProcessor(sequenceNumber)
+ logWarning("Received an error batch - no events were received from channel! ")
+ }
+ batch
+ case None =>
+ new EventBatch("Spark sink has been stopped!", "", java.util.Collections.emptyList())
+ }
+ }
+
+ private def createProcessor(seq: String, n: Int): Option[TransactionProcessor] = {
+ sequenceNumberToProcessor.synchronized {
+ if (!stopped) {
+ val processor = new TransactionProcessor(
+ channel, seq, n, transactionTimeout, backOffInterval, this)
+ sequenceNumberToProcessor.put(seq, processor)
+ if (isTest) {
+ processor.countDownWhenBatchAcked(testLatch)
+ }
+ Some(processor)
+ } else {
+ None
+ }
+ }
+ }
+
+ /**
+ * Called by Spark to indicate successful commit of a batch
+ * @param sequenceNumber The sequence number of the event batch that was successful
+ */
+ override def ack(sequenceNumber: CharSequence): Void = {
+ logDebug("Received Ack for batch with sequence number: " + sequenceNumber)
+ completeTransaction(sequenceNumber, success = true)
+ null
+ }
+
+ /**
+ * Called by Spark to indicate failed commit of a batch
+ * @param sequenceNumber The sequence number of the event batch that failed
+ * @return
+ */
+ override def nack(sequenceNumber: CharSequence): Void = {
+ completeTransaction(sequenceNumber, success = false)
+ logInfo("Spark failed to commit transaction. Will reattempt events.")
+ null
+ }
+
+ /**
+ * Helper method to commit or rollback a transaction.
+ * @param sequenceNumber The sequence number of the batch that was completed
+ * @param success Whether the batch was successful or not.
+ */
+ private def completeTransaction(sequenceNumber: CharSequence, success: Boolean) {
+ removeAndGetProcessor(sequenceNumber).foreach(processor => {
+ processor.batchProcessed(success)
+ })
+ }
+
+ /**
+ * Helper method to remove the TxnProcessor for a Sequence Number. Can be used to avoid a leak.
+ * @param sequenceNumber
+ * @return An `Option` of the transaction processor for the corresponding batch. Note that this
+ * instance is no longer tracked and the caller is responsible for that txn processor.
+ */
+ private[sink] def removeAndGetProcessor(sequenceNumber: CharSequence):
+ Option[TransactionProcessor] = {
+ sequenceNumberToProcessor.synchronized {
+ sequenceNumberToProcessor.remove(sequenceNumber.toString)
+ }
+ }
+
+ private[sink] def countDownWhenBatchAcked(latch: CountDownLatch) {
+ testLatch = latch
+ isTest = true
+ }
+
+ /**
+ * Shuts down the executor used to process transactions.
+ */
+ def shutdown() {
+ logInfo("Shutting down Spark Avro Callback Handler")
+ sequenceNumberToProcessor.synchronized {
+ stopped = true
+ sequenceNumberToProcessor.values.foreach(_.shutdown())
+ }
+ transactionExecutorOpt.foreach(_.shutdownNow())
+ }
+}
diff --git a/external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/SparkSink.scala b/external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/SparkSink.scala
new file mode 100644
index 0000000000..14dffb15fe
--- /dev/null
+++ b/external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/SparkSink.scala
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.streaming.flume.sink
+
+import java.net.InetSocketAddress
+import java.util.concurrent._
+
+import org.apache.avro.ipc.NettyServer
+import org.apache.avro.ipc.specific.SpecificResponder
+import org.apache.flume.Context
+import org.apache.flume.Sink.Status
+import org.apache.flume.conf.{Configurable, ConfigurationException}
+import org.apache.flume.sink.AbstractSink
+
+/**
+ * A sink that uses Avro RPC to run a server that can be polled by Spark's
+ * FlumePollingInputDStream. This sink has the following configuration parameters:
+ *
+ * hostname - The hostname to bind to. Default: 0.0.0.0
+ * port - The port to bind to. (No default - mandatory)
+ * timeout - Time in seconds after which a transaction is rolled back,
+ * if an ACK is not received from Spark within that time
+ * threads - Number of threads to use to receive requests from Spark (Default: 10)
+ *
+ * This sink is unlike other Flume sinks in the sense that it does not push data,
+ * instead the process method in this sink simply blocks the SinkRunner the first time it is
+ * called. This sink starts up an Avro IPC server that uses the SparkFlumeProtocol.
+ *
+ * Each time a getEventBatch call comes, creates a transaction and reads events
+ * from the channel. When enough events are read, the events are sent to the Spark receiver and
+ * the thread itself is blocked and a reference to it saved off.
+ *
+ * When the ack for that batch is received,
+ * the thread which created the transaction is is retrieved and it commits the transaction with the
+ * channel from the same thread it was originally created in (since Flume transactions are
+ * thread local). If a nack is received instead, the sink rolls back the transaction. If no ack
+ * is received within the specified timeout, the transaction is rolled back too. If an ack comes
+ * after that, it is simply ignored and the events get re-sent.
+ *
+ */
+
+class SparkSink extends AbstractSink with Logging with Configurable {
+
+ // Size of the pool to use for holding transaction processors.
+ private var poolSize: Integer = SparkSinkConfig.DEFAULT_THREADS
+
+ // Timeout for each transaction. If spark does not respond in this much time,
+ // rollback the transaction
+ private var transactionTimeout = SparkSinkConfig.DEFAULT_TRANSACTION_TIMEOUT
+
+ // Address info to bind on
+ private var hostname: String = SparkSinkConfig.DEFAULT_HOSTNAME
+ private var port: Int = 0
+
+ private var backOffInterval: Int = 200
+
+ // Handle to the server
+ private var serverOpt: Option[NettyServer] = None
+
+ // The handler that handles the callback from Avro
+ private var handler: Option[SparkAvroCallbackHandler] = None
+
+ // Latch that blocks off the Flume framework from wasting 1 thread.
+ private val blockingLatch = new CountDownLatch(1)
+
+ override def start() {
+ logInfo("Starting Spark Sink: " + getName + " on port: " + port + " and interface: " +
+ hostname + " with " + "pool size: " + poolSize + " and transaction timeout: " +
+ transactionTimeout + ".")
+ handler = Option(new SparkAvroCallbackHandler(poolSize, getChannel, transactionTimeout,
+ backOffInterval))
+ val responder = new SpecificResponder(classOf[SparkFlumeProtocol], handler.get)
+ // Using the constructor that takes specific thread-pools requires bringing in netty
+ // dependencies which are being excluded in the build. In practice,
+ // Netty dependencies are already available on the JVM as Flume would have pulled them in.
+ serverOpt = Option(new NettyServer(responder, new InetSocketAddress(hostname, port)))
+ serverOpt.foreach(server => {
+ logInfo("Starting Avro server for sink: " + getName)
+ server.start()
+ })
+ super.start()
+ }
+
+ override def stop() {
+ logInfo("Stopping Spark Sink: " + getName)
+ handler.foreach(callbackHandler => {
+ callbackHandler.shutdown()
+ })
+ serverOpt.foreach(server => {
+ logInfo("Stopping Avro Server for sink: " + getName)
+ server.close()
+ server.join()
+ })
+ blockingLatch.countDown()
+ super.stop()
+ }
+
+ override def configure(ctx: Context) {
+ import SparkSinkConfig._
+ hostname = ctx.getString(CONF_HOSTNAME, DEFAULT_HOSTNAME)
+ port = Option(ctx.getInteger(CONF_PORT)).
+ getOrElse(throw new ConfigurationException("The port to bind to must be specified"))
+ poolSize = ctx.getInteger(THREADS, DEFAULT_THREADS)
+ transactionTimeout = ctx.getInteger(CONF_TRANSACTION_TIMEOUT, DEFAULT_TRANSACTION_TIMEOUT)
+ backOffInterval = ctx.getInteger(CONF_BACKOFF_INTERVAL, DEFAULT_BACKOFF_INTERVAL)
+ logInfo("Configured Spark Sink with hostname: " + hostname + ", port: " + port + ", " +
+ "poolSize: " + poolSize + ", transactionTimeout: " + transactionTimeout + ", " +
+ "backoffInterval: " + backOffInterval)
+ }
+
+ override def process(): Status = {
+ // This method is called in a loop by the Flume framework - block it until the sink is
+ // stopped to save CPU resources. The sink runner will interrupt this thread when the sink is
+ // being shut down.
+ logInfo("Blocking Sink Runner, sink will continue to run..")
+ blockingLatch.await()
+ Status.BACKOFF
+ }
+
+ private[flume] def getPort(): Int = {
+ serverOpt
+ .map(_.getPort)
+ .getOrElse(
+ throw new RuntimeException("Server was not started!")
+ )
+ }
+
+ /**
+ * Pass in a [[CountDownLatch]] for testing purposes. This batch is counted down when each
+ * batch is received. The test can simply call await on this latch till the expected number of
+ * batches are received.
+ * @param latch
+ */
+ private[flume] def countdownWhenBatchReceived(latch: CountDownLatch) {
+ handler.foreach(_.countDownWhenBatchAcked(latch))
+ }
+}
+
+/**
+ * Configuration parameters and their defaults.
+ */
+private[flume]
+object SparkSinkConfig {
+ val THREADS = "threads"
+ val DEFAULT_THREADS = 10
+
+ val CONF_TRANSACTION_TIMEOUT = "timeout"
+ val DEFAULT_TRANSACTION_TIMEOUT = 60
+
+ val CONF_HOSTNAME = "hostname"
+ val DEFAULT_HOSTNAME = "0.0.0.0"
+
+ val CONF_PORT = "port"
+
+ val CONF_BACKOFF_INTERVAL = "backoffInterval"
+ val DEFAULT_BACKOFF_INTERVAL = 200
+}
diff --git a/external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/SparkSinkThreadFactory.scala b/external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/SparkSinkThreadFactory.scala
new file mode 100644
index 0000000000..845fc8debd
--- /dev/null
+++ b/external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/SparkSinkThreadFactory.scala
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.streaming.flume.sink
+
+import java.util.concurrent.ThreadFactory
+import java.util.concurrent.atomic.AtomicLong
+
+/**
+ * Thread factory that generates daemon threads with a specified name format.
+ */
+private[sink] class SparkSinkThreadFactory(nameFormat: String) extends ThreadFactory {
+
+ private val threadId = new AtomicLong()
+
+ override def newThread(r: Runnable): Thread = {
+ val t = new Thread(r, nameFormat.format(threadId.incrementAndGet()))
+ t.setDaemon(true)
+ t
+ }
+
+}
diff --git a/external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/SparkSinkUtils.scala b/external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/SparkSinkUtils.scala
new file mode 100644
index 0000000000..47c0e294d6
--- /dev/null
+++ b/external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/SparkSinkUtils.scala
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.streaming.flume.sink
+
+private[flume] object SparkSinkUtils {
+ /**
+ * This method determines if this batch represents an error or not.
+ * @param batch - The batch to check
+ * @return - true if the batch represents an error
+ */
+ def isErrorBatch(batch: EventBatch): Boolean = {
+ !batch.getErrorMsg.toString.equals("") // If there is an error message, it is an error batch.
+ }
+}
diff --git a/external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/TransactionProcessor.scala b/external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/TransactionProcessor.scala
new file mode 100644
index 0000000000..b15c2097e5
--- /dev/null
+++ b/external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/TransactionProcessor.scala
@@ -0,0 +1,252 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.streaming.flume.sink
+
+import java.nio.ByteBuffer
+import java.util
+import java.util.concurrent.{Callable, CountDownLatch, TimeUnit}
+
+import scala.util.control.Breaks
+
+import org.apache.flume.{Channel, Transaction}
+
+// Flume forces transactions to be thread-local (horrible, I know!)
+// So the sink basically spawns a new thread to pull the events out within a transaction.
+// The thread fills in the event batch object that is set before the thread is scheduled.
+// After filling it in, the thread waits on a condition - which is released only
+// when the success message comes back for the specific sequence number for that event batch.
+/**
+ * This class represents a transaction on the Flume channel. This class runs a separate thread
+ * which owns the transaction. The thread is blocked until the success call for that transaction
+ * comes back with an ACK or NACK.
+ * @param channel The channel from which to pull events
+ * @param seqNum The sequence number to use for the transaction. Must be unique
+ * @param maxBatchSize The maximum number of events to process per batch
+ * @param transactionTimeout Time in seconds after which a transaction must be rolled back
+ * without waiting for an ACK from Spark
+ * @param parent The parent [[SparkAvroCallbackHandler]] instance, for reporting timeouts
+ */
+private class TransactionProcessor(val channel: Channel, val seqNum: String,
+ var maxBatchSize: Int, val transactionTimeout: Int, val backOffInterval: Int,
+ val parent: SparkAvroCallbackHandler) extends Callable[Void] with Logging {
+
+ // If a real batch is not returned, we always have to return an error batch.
+ @volatile private var eventBatch: EventBatch = new EventBatch("Unknown Error", "",
+ util.Collections.emptyList())
+
+ // Synchronization primitives
+ val batchGeneratedLatch = new CountDownLatch(1)
+ val batchAckLatch = new CountDownLatch(1)
+
+ // Sanity check to ensure we don't loop like crazy
+ val totalAttemptsToRemoveFromChannel = Int.MaxValue / 2
+
+ // OK to use volatile, since the change would only make this true (otherwise it will be
+ // changed to false - we never apply a negation operation to this) - which means the transaction
+ // succeeded.
+ @volatile private var batchSuccess = false
+
+ @volatile private var stopped = false
+
+ @volatile private var isTest = false
+
+ private var testLatch: CountDownLatch = null
+
+ // The transaction that this processor would handle
+ var txOpt: Option[Transaction] = None
+
+ /**
+ * Get an event batch from the channel. This method will block until a batch of events is
+ * available from the channel. If no events are available after a large number of attempts of
+ * polling the channel, this method will return an [[EventBatch]] with a non-empty error message
+ *
+ * @return An [[EventBatch]] instance with sequence number set to seqNum, filled with a
+ * maximum of maxBatchSize events
+ */
+ def getEventBatch: EventBatch = {
+ batchGeneratedLatch.await()
+ eventBatch
+ }
+
+ /**
+ * This method is to be called by the sink when it receives an ACK or NACK from Spark. This
+ * method is a no-op if it is called after transactionTimeout has expired since
+ * getEventBatch returned a batch of events.
+ * @param success True if an ACK was received and the transaction should be committed, else false.
+ */
+ def batchProcessed(success: Boolean) {
+ logDebug("Batch processed for sequence number: " + seqNum)
+ batchSuccess = success
+ batchAckLatch.countDown()
+ }
+
+ private[flume] def shutdown(): Unit = {
+ logDebug("Shutting down transaction processor")
+ stopped = true
+ }
+
+ /**
+ * Populates events into the event batch. If the batch cannot be populated,
+ * this method will not set the events into the event batch, but it sets an error message.
+ */
+ private def populateEvents() {
+ try {
+ txOpt = Option(channel.getTransaction)
+ if(txOpt.isEmpty) {
+ eventBatch.setErrorMsg("Something went wrong. Channel was " +
+ "unable to create a transaction!")
+ }
+ txOpt.foreach(tx => {
+ tx.begin()
+ val events = new util.ArrayList[SparkSinkEvent](maxBatchSize)
+ val loop = new Breaks
+ var gotEventsInThisTxn = false
+ var loopCounter: Int = 0
+ loop.breakable {
+ while (!stopped && events.size() < maxBatchSize
+ && loopCounter < totalAttemptsToRemoveFromChannel) {
+ loopCounter += 1
+ Option(channel.take()) match {
+ case Some(event) =>
+ events.add(new SparkSinkEvent(toCharSequenceMap(event.getHeaders),
+ ByteBuffer.wrap(event.getBody)))
+ gotEventsInThisTxn = true
+ case None =>
+ if (!gotEventsInThisTxn && !stopped) {
+ logDebug("Sleeping for " + backOffInterval + " millis as no events were read in" +
+ " the current transaction")
+ TimeUnit.MILLISECONDS.sleep(backOffInterval)
+ } else {
+ loop.break()
+ }
+ }
+ }
+ }
+ if (!gotEventsInThisTxn && !stopped) {
+ val msg = "Tried several times, " +
+ "but did not get any events from the channel!"
+ logWarning(msg)
+ eventBatch.setErrorMsg(msg)
+ } else {
+ // At this point, the events are available, so fill them into the event batch
+ eventBatch = new EventBatch("", seqNum, events)
+ }
+ })
+ } catch {
+ case interrupted: InterruptedException =>
+ // Don't pollute logs if the InterruptedException came from this being stopped
+ if (!stopped) {
+ logWarning("Error while processing transaction.", interrupted)
+ }
+ case e: Exception =>
+ logWarning("Error while processing transaction.", e)
+ eventBatch.setErrorMsg(e.getMessage)
+ try {
+ txOpt.foreach(tx => {
+ rollbackAndClose(tx, close = true)
+ })
+ } finally {
+ txOpt = None
+ }
+ } finally {
+ batchGeneratedLatch.countDown()
+ }
+ }
+
+ /**
+ * Waits for upto transactionTimeout seconds for an ACK. If an ACK comes in
+ * this method commits the transaction with the channel. If the ACK does not come in within
+ * that time or a NACK comes in, this method rolls back the transaction.
+ */
+ private def processAckOrNack() {
+ batchAckLatch.await(transactionTimeout, TimeUnit.SECONDS)
+ txOpt.foreach(tx => {
+ if (batchSuccess) {
+ try {
+ logDebug("Committing transaction")
+ tx.commit()
+ } catch {
+ case e: Exception =>
+ logWarning("Error while attempting to commit transaction. Transaction will be rolled " +
+ "back", e)
+ rollbackAndClose(tx, close = false) // tx will be closed later anyway
+ } finally {
+ tx.close()
+ if (isTest) {
+ testLatch.countDown()
+ }
+ }
+ } else {
+ logWarning("Spark could not commit transaction, NACK received. Rolling back transaction.")
+ rollbackAndClose(tx, close = true)
+ // This might have been due to timeout or a NACK. Either way the following call does not
+ // cause issues. This is required to ensure the TransactionProcessor instance is not leaked
+ parent.removeAndGetProcessor(seqNum)
+ }
+ })
+ }
+
+ /**
+ * Helper method to rollback and optionally close a transaction
+ * @param tx The transaction to rollback
+ * @param close Whether the transaction should be closed or not after rolling back
+ */
+ private def rollbackAndClose(tx: Transaction, close: Boolean) {
+ try {
+ logWarning("Spark was unable to successfully process the events. Transaction is being " +
+ "rolled back.")
+ tx.rollback()
+ } catch {
+ case e: Exception =>
+ logError("Error rolling back transaction. Rollback may have failed!", e)
+ } finally {
+ if (close) {
+ tx.close()
+ }
+ }
+ }
+
+ /**
+ * Helper method to convert a Map[String, String] to Map[CharSequence, CharSequence]
+ * @param inMap The map to be converted
+ * @return The converted map
+ */
+ private def toCharSequenceMap(inMap: java.util.Map[String, String]): java.util.Map[CharSequence,
+ CharSequence] = {
+ val charSeqMap = new util.HashMap[CharSequence, CharSequence](inMap.size())
+ charSeqMap.putAll(inMap)
+ charSeqMap
+ }
+
+ /**
+ * When the thread is started it sets as many events as the batch size or less (if enough
+ * events aren't available) into the eventBatch and object and lets any threads waiting on the
+ * [[getEventBatch]] method to proceed. Then this thread waits for acks or nacks to come in,
+ * or for a specified timeout and commits or rolls back the transaction.
+ * @return
+ */
+ override def call(): Void = {
+ populateEvents()
+ processAckOrNack()
+ null
+ }
+
+ private[sink] def countDownWhenBatchAcked(latch: CountDownLatch) {
+ testLatch = latch
+ isTest = true
+ }
+}
diff --git a/external/flume-sink/src/test/resources/log4j.properties b/external/flume-sink/src/test/resources/log4j.properties
new file mode 100644
index 0000000000..42df8792f1
--- /dev/null
+++ b/external/flume-sink/src/test/resources/log4j.properties
@@ -0,0 +1,28 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# Set everything to be logged to the file streaming/target/unit-tests.log
+log4j.rootCategory=INFO, file
+log4j.appender.file=org.apache.log4j.FileAppender
+log4j.appender.file.append=true
+log4j.appender.file.file=target/unit-tests.log
+log4j.appender.file.layout=org.apache.log4j.PatternLayout
+log4j.appender.file.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss.SSS} %t %p %c{1}: %m%n
+
+# Ignore messages below warning level from Jetty, because it's a bit verbose
+log4j.logger.org.spark-project.jetty=WARN
+
diff --git a/external/flume-sink/src/test/scala/org/apache/spark/streaming/flume/sink/SparkSinkSuite.scala b/external/flume-sink/src/test/scala/org/apache/spark/streaming/flume/sink/SparkSinkSuite.scala
new file mode 100644
index 0000000000..e8ca1e7163
--- /dev/null
+++ b/external/flume-sink/src/test/scala/org/apache/spark/streaming/flume/sink/SparkSinkSuite.scala
@@ -0,0 +1,218 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.streaming.flume.sink
+
+import java.net.InetSocketAddress
+import java.nio.charset.StandardCharsets
+import java.util.concurrent.{CountDownLatch, Executors, TimeUnit}
+import java.util.concurrent.atomic.AtomicInteger
+
+import scala.collection.JavaConverters._
+import scala.concurrent.{ExecutionContext, Future}
+import scala.util.{Failure, Success}
+
+import org.apache.avro.ipc.NettyTransceiver
+import org.apache.avro.ipc.specific.SpecificRequestor
+import org.apache.flume.Context
+import org.apache.flume.channel.MemoryChannel
+import org.apache.flume.event.EventBuilder
+import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory
+
+// Due to MNG-1378, there is not a way to include test dependencies transitively.
+// We cannot include Spark core tests as a dependency here because it depends on
+// Spark core main, which has too many dependencies to require here manually.
+// For this reason, we continue to use FunSuite and ignore the scalastyle checks
+// that fail if this is detected.
+// scalastyle:off
+import org.scalatest.FunSuite
+
+class SparkSinkSuite extends FunSuite {
+// scalastyle:on
+
+ val eventsPerBatch = 1000
+ val channelCapacity = 5000
+
+ test("Success with ack") {
+ val (channel, sink, latch) = initializeChannelAndSink()
+ channel.start()
+ sink.start()
+
+ putEvents(channel, eventsPerBatch)
+
+ val port = sink.getPort
+ val address = new InetSocketAddress("0.0.0.0", port)
+
+ val (transceiver, client) = getTransceiverAndClient(address, 1)(0)
+ val events = client.getEventBatch(1000)
+ client.ack(events.getSequenceNumber)
+ assert(events.getEvents.size() === 1000)
+ latch.await(1, TimeUnit.SECONDS)
+ assertChannelIsEmpty(channel)
+ sink.stop()
+ channel.stop()
+ transceiver.close()
+ }
+
+ test("Failure with nack") {
+ val (channel, sink, latch) = initializeChannelAndSink()
+ channel.start()
+ sink.start()
+ putEvents(channel, eventsPerBatch)
+
+ val port = sink.getPort
+ val address = new InetSocketAddress("0.0.0.0", port)
+
+ val (transceiver, client) = getTransceiverAndClient(address, 1)(0)
+ val events = client.getEventBatch(1000)
+ assert(events.getEvents.size() === 1000)
+ client.nack(events.getSequenceNumber)
+ latch.await(1, TimeUnit.SECONDS)
+ assert(availableChannelSlots(channel) === 4000)
+ sink.stop()
+ channel.stop()
+ transceiver.close()
+ }
+
+ test("Failure with timeout") {
+ val (channel, sink, latch) = initializeChannelAndSink(Map(SparkSinkConfig
+ .CONF_TRANSACTION_TIMEOUT -> 1.toString))
+ channel.start()
+ sink.start()
+ putEvents(channel, eventsPerBatch)
+ val port = sink.getPort
+ val address = new InetSocketAddress("0.0.0.0", port)
+
+ val (transceiver, client) = getTransceiverAndClient(address, 1)(0)
+ val events = client.getEventBatch(1000)
+ assert(events.getEvents.size() === 1000)
+ latch.await(1, TimeUnit.SECONDS)
+ assert(availableChannelSlots(channel) === 4000)
+ sink.stop()
+ channel.stop()
+ transceiver.close()
+ }
+
+ test("Multiple consumers") {
+ testMultipleConsumers(failSome = false)
+ }
+
+ test("Multiple consumers with some failures") {
+ testMultipleConsumers(failSome = true)
+ }
+
+ def testMultipleConsumers(failSome: Boolean): Unit = {
+ implicit val executorContext = ExecutionContext
+ .fromExecutorService(Executors.newFixedThreadPool(5))
+ val (channel, sink, latch) = initializeChannelAndSink(Map.empty, 5)
+ channel.start()
+ sink.start()
+ (1 to 5).foreach(_ => putEvents(channel, eventsPerBatch))
+ val port = sink.getPort
+ val address = new InetSocketAddress("0.0.0.0", port)
+ val transceiversAndClients = getTransceiverAndClient(address, 5)
+ val batchCounter = new CountDownLatch(5)
+ val counter = new AtomicInteger(0)
+ transceiversAndClients.foreach(x => {
+ Future {
+ val client = x._2
+ val events = client.getEventBatch(1000)
+ if (!failSome || counter.getAndIncrement() % 2 == 0) {
+ client.ack(events.getSequenceNumber)
+ } else {
+ client.nack(events.getSequenceNumber)
+ throw new RuntimeException("Sending NACK for failure!")
+ }
+ events
+ }.onComplete {
+ case Success(events) =>
+ assert(events.getEvents.size() === 1000)
+ batchCounter.countDown()
+ case Failure(t) =>
+ // Don't re-throw the exception, causes a nasty unnecessary stack trace on stdout
+ batchCounter.countDown()
+ }
+ })
+ batchCounter.await()
+ latch.await(1, TimeUnit.SECONDS)
+ executorContext.shutdown()
+ if(failSome) {
+ assert(availableChannelSlots(channel) === 3000)
+ } else {
+ assertChannelIsEmpty(channel)
+ }
+ sink.stop()
+ channel.stop()
+ transceiversAndClients.foreach(x => x._1.close())
+ }
+
+ private def initializeChannelAndSink(overrides: Map[String, String] = Map.empty,
+ batchCounter: Int = 1): (MemoryChannel, SparkSink, CountDownLatch) = {
+ val channel = new MemoryChannel()
+ val channelContext = new Context()
+
+ channelContext.put("capacity", channelCapacity.toString)
+ channelContext.put("transactionCapacity", 1000.toString)
+ channelContext.put("keep-alive", 0.toString)
+ channelContext.putAll(overrides.asJava)
+ channel.setName(scala.util.Random.nextString(10))
+ channel.configure(channelContext)
+
+ val sink = new SparkSink()
+ val sinkContext = new Context()
+ sinkContext.put(SparkSinkConfig.CONF_HOSTNAME, "0.0.0.0")
+ sinkContext.put(SparkSinkConfig.CONF_PORT, 0.toString)
+ sink.configure(sinkContext)
+ sink.setChannel(channel)
+ val latch = new CountDownLatch(batchCounter)
+ sink.countdownWhenBatchReceived(latch)
+ (channel, sink, latch)
+ }
+
+ private def putEvents(ch: MemoryChannel, count: Int): Unit = {
+ val tx = ch.getTransaction
+ tx.begin()
+ (1 to count).foreach(x =>
+ ch.put(EventBuilder.withBody(x.toString.getBytes(StandardCharsets.UTF_8))))
+ tx.commit()
+ tx.close()
+ }
+
+ private def getTransceiverAndClient(address: InetSocketAddress,
+ count: Int): Seq[(NettyTransceiver, SparkFlumeProtocol.Callback)] = {
+
+ (1 to count).map(_ => {
+ lazy val channelFactoryExecutor = Executors.newCachedThreadPool(
+ new SparkSinkThreadFactory("Flume Receiver Channel Thread - %d"))
+ lazy val channelFactory =
+ new NioClientSocketChannelFactory(channelFactoryExecutor, channelFactoryExecutor)
+ val transceiver = new NettyTransceiver(address, channelFactory)
+ val client = SpecificRequestor.getClient(classOf[SparkFlumeProtocol.Callback], transceiver)
+ (transceiver, client)
+ })
+ }
+
+ private def assertChannelIsEmpty(channel: MemoryChannel): Unit = {
+ assert(availableChannelSlots(channel) === channelCapacity)
+ }
+
+ private def availableChannelSlots(channel: MemoryChannel): Int = {
+ val queueRemaining = channel.getClass.getDeclaredField("queueRemaining")
+ queueRemaining.setAccessible(true)
+ val m = queueRemaining.get(channel).getClass.getDeclaredMethod("availablePermits")
+ m.invoke(queueRemaining.get(channel)).asInstanceOf[Int]
+ }
+}