aboutsummaryrefslogtreecommitdiff
path: root/external/flume-sink
diff options
context:
space:
mode:
authorHari Shreedharan <harishreedharan@gmail.com>2014-07-29 11:11:29 -0700
committerTathagata Das <tathagata.das1565@gmail.com>2014-07-29 11:11:29 -0700
commit800ecff4b1127d9042d5a8a746348fb4d45aa34b (patch)
tree4cf35f6858fa9cf6e3db120b4785295103f60386 /external/flume-sink
parentfc4d05700026f4ee9cc5544cf493d900039c38f3 (diff)
downloadspark-800ecff4b1127d9042d5a8a746348fb4d45aa34b.tar.gz
spark-800ecff4b1127d9042d5a8a746348fb4d45aa34b.tar.bz2
spark-800ecff4b1127d9042d5a8a746348fb4d45aa34b.zip
[STREAMING] SPARK-1729. Make Flume pull data from source, rather than the current pu...
...sh model Currently Spark uses Flume's internal Avro Protocol to ingest data from Flume. If the executor running the receiver fails, it currently has to be restarted on the same node to be able to receive data. This commit adds a new Sink which can be deployed to a Flume agent. This sink can be polled by a new DStream that is also included in this commit. This model ensures that data can be pulled into Spark from Flume even if the receiver is restarted on a new node. This also allows the receiver to receive data on multiple threads for better performance. Author: Hari Shreedharan <harishreedharan@gmail.com> Author: Hari Shreedharan <hshreedharan@apache.org> Author: Tathagata Das <tathagata.das1565@gmail.com> Author: harishreedharan <hshreedharan@cloudera.com> Closes #807 from harishreedharan/master and squashes the following commits: e7f70a3 [Hari Shreedharan] Merge remote-tracking branch 'asf-git/master' 96cfb6f [Hari Shreedharan] Merge remote-tracking branch 'asf/master' e48d785 [Hari Shreedharan] Documenting flume-sink being ignored for Mima checks. 5f212ce [Hari Shreedharan] Ignore Spark Sink from mima. 981bf62 [Hari Shreedharan] Merge remote-tracking branch 'asf/master' 7a1bc6e [Hari Shreedharan] Fix SparkBuild.scala a082eb3 [Hari Shreedharan] Merge remote-tracking branch 'asf/master' 1f47364 [Hari Shreedharan] Minor fixes. 73d6f6d [Hari Shreedharan] Cleaned up tests a bit. Added some docs in multiple places. 65b76b4 [Hari Shreedharan] Fixing the unit test. e59cc20 [Hari Shreedharan] Use SparkFlumeEvent instead of the new type. Also, Flume Polling Receiver now uses the store(ArrayBuffer) method. f3c99d1 [Hari Shreedharan] Merge remote-tracking branch 'asf/master' 3572180 [Hari Shreedharan] Adding a license header, making Jenkins happy. 799509f [Hari Shreedharan] Fix a compile issue. 3c5194c [Hari Shreedharan] Merge remote-tracking branch 'asf/master' d248d22 [harishreedharan] Merge pull request #1 from tdas/flume-polling 10b6214 [Tathagata Das] Changed public API, changed sink package, and added java unit test to make sure Java API is callable from Java. 1edc806 [Hari Shreedharan] SPARK-1729. Update logging in Spark Sink. 8c00289 [Hari Shreedharan] More debug messages 393bd94 [Hari Shreedharan] SPARK-1729. Use LinkedBlockingQueue instead of ArrayBuffer to keep track of connections. 120e2a1 [Hari Shreedharan] SPARK-1729. Some test changes and changes to utils classes. 9fd0da7 [Hari Shreedharan] SPARK-1729. Use foreach instead of map for all Options. 8136aa6 [Hari Shreedharan] Adding TransactionProcessor to map on returning batch of data 86aa274 [Hari Shreedharan] Merge remote-tracking branch 'asf/master' 205034d [Hari Shreedharan] Merging master in 4b0c7fc [Hari Shreedharan] FLUME-1729. New Flume-Spark integration. bda01fc [Hari Shreedharan] FLUME-1729. Flume-Spark integration. 0d69604 [Hari Shreedharan] FLUME-1729. Better Flume-Spark integration. 3c23c18 [Hari Shreedharan] SPARK-1729. New Spark-Flume integration. 70bcc2a [Hari Shreedharan] SPARK-1729. New Flume-Spark integration. d6fa3aa [Hari Shreedharan] SPARK-1729. New Flume-Spark integration. e7da512 [Hari Shreedharan] SPARK-1729. Fixing import order 9741683 [Hari Shreedharan] SPARK-1729. Fixes based on review. c604a3c [Hari Shreedharan] SPARK-1729. Optimize imports. 0f10788 [Hari Shreedharan] SPARK-1729. Make Flume pull data from source, rather than the current push model 87775aa [Hari Shreedharan] SPARK-1729. Make Flume pull data from source, rather than the current push model 8df37e4 [Hari Shreedharan] SPARK-1729. Make Flume pull data from source, rather than the current push model 03d6c1c [Hari Shreedharan] SPARK-1729. Make Flume pull data from source, rather than the current push model 08176ad [Hari Shreedharan] SPARK-1729. Make Flume pull data from source, rather than the current push model d24d9d4 [Hari Shreedharan] SPARK-1729. Make Flume pull data from source, rather than the current push model 6d6776a [Hari Shreedharan] SPARK-1729. Make Flume pull data from source, rather than the current push model
Diffstat (limited to 'external/flume-sink')
-rw-r--r--external/flume-sink/pom.xml100
-rw-r--r--external/flume-sink/src/main/avro/sparkflume.avdl40
-rw-r--r--external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/Logging.scala125
-rw-r--r--external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/SparkAvroCallbackHandler.scala131
-rw-r--r--external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/SparkSink.scala154
-rw-r--r--external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/SparkSinkUtils.scala28
-rw-r--r--external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/TransactionProcessor.scala228
7 files changed, 806 insertions, 0 deletions
diff --git a/external/flume-sink/pom.xml b/external/flume-sink/pom.xml
new file mode 100644
index 0000000000..d11129ce8d
--- /dev/null
+++ b/external/flume-sink/pom.xml
@@ -0,0 +1,100 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one or more
+ ~ contributor license agreements. See the NOTICE file distributed with
+ ~ this work for additional information regarding copyright ownership.
+ ~ The ASF licenses this file to You under the Apache License, Version 2.0
+ ~ (the "License"); you may not use this file except in compliance with
+ ~ the License. You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ~ See the License for the specific language governing permissions and
+ ~ limitations under the License.
+ -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <parent>
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-parent</artifactId>
+ <version>1.1.0-SNAPSHOT</version>
+ <relativePath>../../pom.xml</relativePath>
+ </parent>
+
+ <artifactId>spark-streaming-flume-sink_2.10</artifactId>
+ <properties>
+ <sbt.project.name>streaming-flume-sink</sbt.project.name>
+ </properties>
+
+ <packaging>jar</packaging>
+ <name>Spark Project External Flume Sink</name>
+ <url>http://spark.apache.org/</url>
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.flume</groupId>
+ <artifactId>flume-ng-sdk</artifactId>
+ <version>1.4.0</version>
+ <exclusions>
+ <exclusion>
+ <groupId>io.netty</groupId>
+ <artifactId>netty</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.apache.thrift</groupId>
+ <artifactId>libthrift</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.flume</groupId>
+ <artifactId>flume-ng-core</artifactId>
+ <version>1.4.0</version>
+ <exclusions>
+ <exclusion>
+ <groupId>io.netty</groupId>
+ <artifactId>netty</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.apache.thrift</groupId>
+ <artifactId>libthrift</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>org.scala-lang</groupId>
+ <artifactId>scala-library</artifactId>
+ <version>2.10.4</version>
+ </dependency>
+ </dependencies>
+ <build>
+ <outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory>
+ <testOutputDirectory>target/scala-${scala.binary.version}/test-classes</testOutputDirectory>
+ <plugins>
+ <plugin>
+ <groupId>org.scalatest</groupId>
+ <artifactId>scalatest-maven-plugin</artifactId>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.avro</groupId>
+ <artifactId>avro-maven-plugin</artifactId>
+ <version>1.7.3</version>
+ <configuration>
+ <!-- Generate the output in the same directory as the sbt-avro-plugin -->
+ <outputDirectory>${project.basedir}/target/scala-${scala.binary.version}/src_managed/main/compiled_avro</outputDirectory>
+ </configuration>
+ <executions>
+ <execution>
+ <phase>generate-sources</phase>
+ <goals>
+ <goal>idl-protocol</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+</project>
diff --git a/external/flume-sink/src/main/avro/sparkflume.avdl b/external/flume-sink/src/main/avro/sparkflume.avdl
new file mode 100644
index 0000000000..8806e863ac
--- /dev/null
+++ b/external/flume-sink/src/main/avro/sparkflume.avdl
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+@namespace("org.apache.spark.streaming.flume.sink")
+
+protocol SparkFlumeProtocol {
+
+ record SparkSinkEvent {
+ map<string> headers;
+ bytes body;
+ }
+
+ record EventBatch {
+ string errorMsg = ""; // If this is empty it is a valid message, else it represents an error
+ string sequenceNumber;
+ array<SparkSinkEvent> events;
+ }
+
+ EventBatch getEventBatch (int n);
+
+ void ack (string sequenceNumber);
+
+ void nack (string sequenceNumber);
+}
diff --git a/external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/Logging.scala b/external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/Logging.scala
new file mode 100644
index 0000000000..17cbc6707b
--- /dev/null
+++ b/external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/Logging.scala
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.streaming.flume.sink
+
+import org.slf4j.{Logger, LoggerFactory}
+
+/**
+ * Copy of the org.apache.spark.Logging for being used in the Spark Sink.
+ * The org.apache.spark.Logging is not used so that all of Spark is not brought
+ * in as a dependency.
+ */
+private[sink] trait Logging {
+ // Make the log field transient so that objects with Logging can
+ // be serialized and used on another machine
+ @transient private var log_ : Logger = null
+
+ // Method to get or create the logger for this object
+ protected def log: Logger = {
+ if (log_ == null) {
+ initializeIfNecessary()
+ var className = this.getClass.getName
+ // Ignore trailing $'s in the class names for Scala objects
+ if (className.endsWith("$")) {
+ className = className.substring(0, className.length - 1)
+ }
+ log_ = LoggerFactory.getLogger(className)
+ }
+ log_
+ }
+
+ // Log methods that take only a String
+ protected def logInfo(msg: => String) {
+ if (log.isInfoEnabled) log.info(msg)
+ }
+
+ protected def logDebug(msg: => String) {
+ if (log.isDebugEnabled) log.debug(msg)
+ }
+
+ protected def logTrace(msg: => String) {
+ if (log.isTraceEnabled) log.trace(msg)
+ }
+
+ protected def logWarning(msg: => String) {
+ if (log.isWarnEnabled) log.warn(msg)
+ }
+
+ protected def logError(msg: => String) {
+ if (log.isErrorEnabled) log.error(msg)
+ }
+
+ // Log methods that take Throwables (Exceptions/Errors) too
+ protected def logInfo(msg: => String, throwable: Throwable) {
+ if (log.isInfoEnabled) log.info(msg, throwable)
+ }
+
+ protected def logDebug(msg: => String, throwable: Throwable) {
+ if (log.isDebugEnabled) log.debug(msg, throwable)
+ }
+
+ protected def logTrace(msg: => String, throwable: Throwable) {
+ if (log.isTraceEnabled) log.trace(msg, throwable)
+ }
+
+ protected def logWarning(msg: => String, throwable: Throwable) {
+ if (log.isWarnEnabled) log.warn(msg, throwable)
+ }
+
+ protected def logError(msg: => String, throwable: Throwable) {
+ if (log.isErrorEnabled) log.error(msg, throwable)
+ }
+
+ protected def isTraceEnabled(): Boolean = {
+ log.isTraceEnabled
+ }
+
+ private def initializeIfNecessary() {
+ if (!Logging.initialized) {
+ Logging.initLock.synchronized {
+ if (!Logging.initialized) {
+ initializeLogging()
+ }
+ }
+ }
+ }
+
+ private def initializeLogging() {
+ Logging.initialized = true
+
+ // Force a call into slf4j to initialize it. Avoids this happening from mutliple threads
+ // and triggering this: http://mailman.qos.ch/pipermail/slf4j-dev/2010-April/002956.html
+ log
+ }
+}
+
+private[sink] object Logging {
+ @volatile private var initialized = false
+ val initLock = new Object()
+ try {
+ // We use reflection here to handle the case where users remove the
+ // slf4j-to-jul bridge order to route their logs to JUL.
+ val bridgeClass = Class.forName("org.slf4j.bridge.SLF4JBridgeHandler")
+ bridgeClass.getMethod("removeHandlersForRootLogger").invoke(null)
+ val installed = bridgeClass.getMethod("isInstalled").invoke(null).asInstanceOf[Boolean]
+ if (!installed) {
+ bridgeClass.getMethod("install").invoke(null)
+ }
+ } catch {
+ case e: ClassNotFoundException => // can't log anything yet so just fail silently
+ }
+}
diff --git a/external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/SparkAvroCallbackHandler.scala b/external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/SparkAvroCallbackHandler.scala
new file mode 100644
index 0000000000..7da8eb3e35
--- /dev/null
+++ b/external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/SparkAvroCallbackHandler.scala
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.streaming.flume.sink
+
+import java.util.concurrent.{ConcurrentHashMap, Executors}
+import java.util.concurrent.atomic.AtomicLong
+
+import org.apache.flume.Channel
+import org.apache.commons.lang.RandomStringUtils
+import com.google.common.util.concurrent.ThreadFactoryBuilder
+
+/**
+ * Class that implements the SparkFlumeProtocol, that is used by the Avro Netty Server to process
+ * requests. Each getEvents, ack and nack call is forwarded to an instance of this class.
+ * @param threads Number of threads to use to process requests.
+ * @param channel The channel that the sink pulls events from
+ * @param transactionTimeout Timeout in millis after which the transaction if not acked by Spark
+ * is rolled back.
+ */
+// Flume forces transactions to be thread-local. So each transaction *must* be committed, or
+// rolled back from the thread it was originally created in. So each getEvents call from Spark
+// creates a TransactionProcessor which runs in a new thread, in which the transaction is created
+// and events are pulled off the channel. Once the events are sent to spark,
+// that thread is blocked and the TransactionProcessor is saved in a map,
+// until an ACK or NACK comes back or the transaction times out (after the specified timeout).
+// When the response comes or a timeout is hit, the TransactionProcessor is retrieved and then
+// unblocked, at which point the transaction is committed or rolled back.
+
+private[flume] class SparkAvroCallbackHandler(val threads: Int, val channel: Channel,
+ val transactionTimeout: Int, val backOffInterval: Int) extends SparkFlumeProtocol with Logging {
+ val transactionExecutorOpt = Option(Executors.newFixedThreadPool(threads,
+ new ThreadFactoryBuilder().setDaemon(true)
+ .setNameFormat("Spark Sink Processor Thread - %d").build()))
+ private val processorMap = new ConcurrentHashMap[CharSequence, TransactionProcessor]()
+ // This sink will not persist sequence numbers and reuses them if it gets restarted.
+ // So it is possible to commit a transaction which may have been meant for the sink before the
+ // restart.
+ // Since the new txn may not have the same sequence number we must guard against accidentally
+ // committing a new transaction. To reduce the probability of that happening a random string is
+ // prepended to the sequence number. Does not change for life of sink
+ private val seqBase = RandomStringUtils.randomAlphanumeric(8)
+ private val seqCounter = new AtomicLong(0)
+
+ /**
+ * Returns a bunch of events to Spark over Avro RPC.
+ * @param n Maximum number of events to return in a batch
+ * @return [[EventBatch]] instance that has a sequence number and an array of at most n events
+ */
+ override def getEventBatch(n: Int): EventBatch = {
+ logDebug("Got getEventBatch call from Spark.")
+ val sequenceNumber = seqBase + seqCounter.incrementAndGet()
+ val processor = new TransactionProcessor(channel, sequenceNumber,
+ n, transactionTimeout, backOffInterval, this)
+ transactionExecutorOpt.foreach(executor => {
+ executor.submit(processor)
+ })
+ // Wait until a batch is available - will be an error if error message is non-empty
+ val batch = processor.getEventBatch
+ if (!SparkSinkUtils.isErrorBatch(batch)) {
+ processorMap.put(sequenceNumber.toString, processor)
+ logDebug("Sending event batch with sequence number: " + sequenceNumber)
+ }
+ batch
+ }
+
+ /**
+ * Called by Spark to indicate successful commit of a batch
+ * @param sequenceNumber The sequence number of the event batch that was successful
+ */
+ override def ack(sequenceNumber: CharSequence): Void = {
+ logDebug("Received Ack for batch with sequence number: " + sequenceNumber)
+ completeTransaction(sequenceNumber, success = true)
+ null
+ }
+
+ /**
+ * Called by Spark to indicate failed commit of a batch
+ * @param sequenceNumber The sequence number of the event batch that failed
+ * @return
+ */
+ override def nack(sequenceNumber: CharSequence): Void = {
+ completeTransaction(sequenceNumber, success = false)
+ logInfo("Spark failed to commit transaction. Will reattempt events.")
+ null
+ }
+
+ /**
+ * Helper method to commit or rollback a transaction.
+ * @param sequenceNumber The sequence number of the batch that was completed
+ * @param success Whether the batch was successful or not.
+ */
+ private def completeTransaction(sequenceNumber: CharSequence, success: Boolean) {
+ Option(removeAndGetProcessor(sequenceNumber)).foreach(processor => {
+ processor.batchProcessed(success)
+ })
+ }
+
+ /**
+ * Helper method to remove the TxnProcessor for a Sequence Number. Can be used to avoid a leak.
+ * @param sequenceNumber
+ * @return The transaction processor for the corresponding batch. Note that this instance is no
+ * longer tracked and the caller is responsible for that txn processor.
+ */
+ private[sink] def removeAndGetProcessor(sequenceNumber: CharSequence): TransactionProcessor = {
+ processorMap.remove(sequenceNumber.toString) // The toString is required!
+ }
+
+ /**
+ * Shuts down the executor used to process transactions.
+ */
+ def shutdown() {
+ logInfo("Shutting down Spark Avro Callback Handler")
+ transactionExecutorOpt.foreach(executor => {
+ executor.shutdownNow()
+ })
+ }
+}
diff --git a/external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/SparkSink.scala b/external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/SparkSink.scala
new file mode 100644
index 0000000000..7b735133e3
--- /dev/null
+++ b/external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/SparkSink.scala
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.streaming.flume.sink
+
+import java.net.InetSocketAddress
+import java.util.concurrent._
+
+import org.apache.avro.ipc.NettyServer
+import org.apache.avro.ipc.specific.SpecificResponder
+import org.apache.flume.Context
+import org.apache.flume.Sink.Status
+import org.apache.flume.conf.{Configurable, ConfigurationException}
+import org.apache.flume.sink.AbstractSink
+
+/**
+ * A sink that uses Avro RPC to run a server that can be polled by Spark's
+ * FlumePollingInputDStream. This sink has the following configuration parameters:
+ *
+ * hostname - The hostname to bind to. Default: 0.0.0.0
+ * port - The port to bind to. (No default - mandatory)
+ * timeout - Time in seconds after which a transaction is rolled back,
+ * if an ACK is not received from Spark within that time
+ * threads - Number of threads to use to receive requests from Spark (Default: 10)
+ *
+ * This sink is unlike other Flume sinks in the sense that it does not push data,
+ * instead the process method in this sink simply blocks the SinkRunner the first time it is
+ * called. This sink starts up an Avro IPC server that uses the SparkFlumeProtocol.
+ *
+ * Each time a getEventBatch call comes, creates a transaction and reads events
+ * from the channel. When enough events are read, the events are sent to the Spark receiver and
+ * the thread itself is blocked and a reference to it saved off.
+ *
+ * When the ack for that batch is received,
+ * the thread which created the transaction is is retrieved and it commits the transaction with the
+ * channel from the same thread it was originally created in (since Flume transactions are
+ * thread local). If a nack is received instead, the sink rolls back the transaction. If no ack
+ * is received within the specified timeout, the transaction is rolled back too. If an ack comes
+ * after that, it is simply ignored and the events get re-sent.
+ *
+ */
+
+private[flume]
+class SparkSink extends AbstractSink with Logging with Configurable {
+
+ // Size of the pool to use for holding transaction processors.
+ private var poolSize: Integer = SparkSinkConfig.DEFAULT_THREADS
+
+ // Timeout for each transaction. If spark does not respond in this much time,
+ // rollback the transaction
+ private var transactionTimeout = SparkSinkConfig.DEFAULT_TRANSACTION_TIMEOUT
+
+ // Address info to bind on
+ private var hostname: String = SparkSinkConfig.DEFAULT_HOSTNAME
+ private var port: Int = 0
+
+ private var backOffInterval: Int = 200
+
+ // Handle to the server
+ private var serverOpt: Option[NettyServer] = None
+
+ // The handler that handles the callback from Avro
+ private var handler: Option[SparkAvroCallbackHandler] = None
+
+ // Latch that blocks off the Flume framework from wasting 1 thread.
+ private val blockingLatch = new CountDownLatch(1)
+
+ override def start() {
+ logInfo("Starting Spark Sink: " + getName + " on port: " + port + " and interface: " +
+ hostname + " with " + "pool size: " + poolSize + " and transaction timeout: " +
+ transactionTimeout + ".")
+ handler = Option(new SparkAvroCallbackHandler(poolSize, getChannel, transactionTimeout,
+ backOffInterval))
+ val responder = new SpecificResponder(classOf[SparkFlumeProtocol], handler.get)
+ // Using the constructor that takes specific thread-pools requires bringing in netty
+ // dependencies which are being excluded in the build. In practice,
+ // Netty dependencies are already available on the JVM as Flume would have pulled them in.
+ serverOpt = Option(new NettyServer(responder, new InetSocketAddress(hostname, port)))
+ serverOpt.foreach(server => {
+ logInfo("Starting Avro server for sink: " + getName)
+ server.start()
+ })
+ super.start()
+ }
+
+ override def stop() {
+ logInfo("Stopping Spark Sink: " + getName)
+ handler.foreach(callbackHandler => {
+ callbackHandler.shutdown()
+ })
+ serverOpt.foreach(server => {
+ logInfo("Stopping Avro Server for sink: " + getName)
+ server.close()
+ server.join()
+ })
+ blockingLatch.countDown()
+ super.stop()
+ }
+
+ override def configure(ctx: Context) {
+ import SparkSinkConfig._
+ hostname = ctx.getString(CONF_HOSTNAME, DEFAULT_HOSTNAME)
+ port = Option(ctx.getInteger(CONF_PORT)).
+ getOrElse(throw new ConfigurationException("The port to bind to must be specified"))
+ poolSize = ctx.getInteger(THREADS, DEFAULT_THREADS)
+ transactionTimeout = ctx.getInteger(CONF_TRANSACTION_TIMEOUT, DEFAULT_TRANSACTION_TIMEOUT)
+ backOffInterval = ctx.getInteger(CONF_BACKOFF_INTERVAL, DEFAULT_BACKOFF_INTERVAL)
+ logInfo("Configured Spark Sink with hostname: " + hostname + ", port: " + port + ", " +
+ "poolSize: " + poolSize + ", transactionTimeout: " + transactionTimeout + ", " +
+ "backoffInterval: " + backOffInterval)
+ }
+
+ override def process(): Status = {
+ // This method is called in a loop by the Flume framework - block it until the sink is
+ // stopped to save CPU resources. The sink runner will interrupt this thread when the sink is
+ // being shut down.
+ logInfo("Blocking Sink Runner, sink will continue to run..")
+ blockingLatch.await()
+ Status.BACKOFF
+ }
+}
+
+/**
+ * Configuration parameters and their defaults.
+ */
+private[flume]
+object SparkSinkConfig {
+ val THREADS = "threads"
+ val DEFAULT_THREADS = 10
+
+ val CONF_TRANSACTION_TIMEOUT = "timeout"
+ val DEFAULT_TRANSACTION_TIMEOUT = 60
+
+ val CONF_HOSTNAME = "hostname"
+ val DEFAULT_HOSTNAME = "0.0.0.0"
+
+ val CONF_PORT = "port"
+
+ val CONF_BACKOFF_INTERVAL = "backoffInterval"
+ val DEFAULT_BACKOFF_INTERVAL = 200
+}
diff --git a/external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/SparkSinkUtils.scala b/external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/SparkSinkUtils.scala
new file mode 100644
index 0000000000..47c0e294d6
--- /dev/null
+++ b/external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/SparkSinkUtils.scala
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.streaming.flume.sink
+
+private[flume] object SparkSinkUtils {
+ /**
+ * This method determines if this batch represents an error or not.
+ * @param batch - The batch to check
+ * @return - true if the batch represents an error
+ */
+ def isErrorBatch(batch: EventBatch): Boolean = {
+ !batch.getErrorMsg.toString.equals("") // If there is an error message, it is an error batch.
+ }
+}
diff --git a/external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/TransactionProcessor.scala b/external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/TransactionProcessor.scala
new file mode 100644
index 0000000000..b9e3c786eb
--- /dev/null
+++ b/external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/TransactionProcessor.scala
@@ -0,0 +1,228 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.streaming.flume.sink
+
+import java.nio.ByteBuffer
+import java.util
+import java.util.concurrent.{Callable, CountDownLatch, TimeUnit}
+
+import scala.util.control.Breaks
+
+import org.apache.flume.{Transaction, Channel}
+
+// Flume forces transactions to be thread-local (horrible, I know!)
+// So the sink basically spawns a new thread to pull the events out within a transaction.
+// The thread fills in the event batch object that is set before the thread is scheduled.
+// After filling it in, the thread waits on a condition - which is released only
+// when the success message comes back for the specific sequence number for that event batch.
+/**
+ * This class represents a transaction on the Flume channel. This class runs a separate thread
+ * which owns the transaction. The thread is blocked until the success call for that transaction
+ * comes back with an ACK or NACK.
+ * @param channel The channel from which to pull events
+ * @param seqNum The sequence number to use for the transaction. Must be unique
+ * @param maxBatchSize The maximum number of events to process per batch
+ * @param transactionTimeout Time in seconds after which a transaction must be rolled back
+ * without waiting for an ACK from Spark
+ * @param parent The parent [[SparkAvroCallbackHandler]] instance, for reporting timeouts
+ */
+private class TransactionProcessor(val channel: Channel, val seqNum: String,
+ var maxBatchSize: Int, val transactionTimeout: Int, val backOffInterval: Int,
+ val parent: SparkAvroCallbackHandler) extends Callable[Void] with Logging {
+
+ // If a real batch is not returned, we always have to return an error batch.
+ @volatile private var eventBatch: EventBatch = new EventBatch("Unknown Error", "",
+ util.Collections.emptyList())
+
+ // Synchronization primitives
+ val batchGeneratedLatch = new CountDownLatch(1)
+ val batchAckLatch = new CountDownLatch(1)
+
+ // Sanity check to ensure we don't loop like crazy
+ val totalAttemptsToRemoveFromChannel = Int.MaxValue / 2
+
+ // OK to use volatile, since the change would only make this true (otherwise it will be
+ // changed to false - we never apply a negation operation to this) - which means the transaction
+ // succeeded.
+ @volatile private var batchSuccess = false
+
+ // The transaction that this processor would handle
+ var txOpt: Option[Transaction] = None
+
+ /**
+ * Get an event batch from the channel. This method will block until a batch of events is
+ * available from the channel. If no events are available after a large number of attempts of
+ * polling the channel, this method will return an [[EventBatch]] with a non-empty error message
+ *
+ * @return An [[EventBatch]] instance with sequence number set to seqNum, filled with a
+ * maximum of maxBatchSize events
+ */
+ def getEventBatch: EventBatch = {
+ batchGeneratedLatch.await()
+ eventBatch
+ }
+
+ /**
+ * This method is to be called by the sink when it receives an ACK or NACK from Spark. This
+ * method is a no-op if it is called after transactionTimeout has expired since
+ * getEventBatch returned a batch of events.
+ * @param success True if an ACK was received and the transaction should be committed, else false.
+ */
+ def batchProcessed(success: Boolean) {
+ logDebug("Batch processed for sequence number: " + seqNum)
+ batchSuccess = success
+ batchAckLatch.countDown()
+ }
+
+ /**
+ * Populates events into the event batch. If the batch cannot be populated,
+ * this method will not set the events into the event batch, but it sets an error message.
+ */
+ private def populateEvents() {
+ try {
+ txOpt = Option(channel.getTransaction)
+ if(txOpt.isEmpty) {
+ eventBatch.setErrorMsg("Something went wrong. Channel was " +
+ "unable to create a transaction!")
+ }
+ txOpt.foreach(tx => {
+ tx.begin()
+ val events = new util.ArrayList[SparkSinkEvent](maxBatchSize)
+ val loop = new Breaks
+ var gotEventsInThisTxn = false
+ var loopCounter: Int = 0
+ loop.breakable {
+ while (events.size() < maxBatchSize
+ && loopCounter < totalAttemptsToRemoveFromChannel) {
+ loopCounter += 1
+ Option(channel.take()) match {
+ case Some(event) =>
+ events.add(new SparkSinkEvent(toCharSequenceMap(event.getHeaders),
+ ByteBuffer.wrap(event.getBody)))
+ gotEventsInThisTxn = true
+ case None =>
+ if (!gotEventsInThisTxn) {
+ logDebug("Sleeping for " + backOffInterval + " millis as no events were read in" +
+ " the current transaction")
+ TimeUnit.MILLISECONDS.sleep(backOffInterval)
+ } else {
+ loop.break()
+ }
+ }
+ }
+ }
+ if (!gotEventsInThisTxn) {
+ val msg = "Tried several times, " +
+ "but did not get any events from the channel!"
+ logWarning(msg)
+ eventBatch.setErrorMsg(msg)
+ } else {
+ // At this point, the events are available, so fill them into the event batch
+ eventBatch = new EventBatch("",seqNum, events)
+ }
+ })
+ } catch {
+ case e: Exception =>
+ logWarning("Error while processing transaction.", e)
+ eventBatch.setErrorMsg(e.getMessage)
+ try {
+ txOpt.foreach(tx => {
+ rollbackAndClose(tx, close = true)
+ })
+ } finally {
+ txOpt = None
+ }
+ } finally {
+ batchGeneratedLatch.countDown()
+ }
+ }
+
+ /**
+ * Waits for upto transactionTimeout seconds for an ACK. If an ACK comes in
+ * this method commits the transaction with the channel. If the ACK does not come in within
+ * that time or a NACK comes in, this method rolls back the transaction.
+ */
+ private def processAckOrNack() {
+ batchAckLatch.await(transactionTimeout, TimeUnit.SECONDS)
+ txOpt.foreach(tx => {
+ if (batchSuccess) {
+ try {
+ logDebug("Committing transaction")
+ tx.commit()
+ } catch {
+ case e: Exception =>
+ logWarning("Error while attempting to commit transaction. Transaction will be rolled " +
+ "back", e)
+ rollbackAndClose(tx, close = false) // tx will be closed later anyway
+ } finally {
+ tx.close()
+ }
+ } else {
+ logWarning("Spark could not commit transaction, NACK received. Rolling back transaction.")
+ rollbackAndClose(tx, close = true)
+ // This might have been due to timeout or a NACK. Either way the following call does not
+ // cause issues. This is required to ensure the TransactionProcessor instance is not leaked
+ parent.removeAndGetProcessor(seqNum)
+ }
+ })
+ }
+
+ /**
+ * Helper method to rollback and optionally close a transaction
+ * @param tx The transaction to rollback
+ * @param close Whether the transaction should be closed or not after rolling back
+ */
+ private def rollbackAndClose(tx: Transaction, close: Boolean) {
+ try {
+ logWarning("Spark was unable to successfully process the events. Transaction is being " +
+ "rolled back.")
+ tx.rollback()
+ } catch {
+ case e: Exception =>
+ logError("Error rolling back transaction. Rollback may have failed!", e)
+ } finally {
+ if (close) {
+ tx.close()
+ }
+ }
+ }
+
+ /**
+ * Helper method to convert a Map[String, String] to Map[CharSequence, CharSequence]
+ * @param inMap The map to be converted
+ * @return The converted map
+ */
+ private def toCharSequenceMap(inMap: java.util.Map[String, String]): java.util.Map[CharSequence,
+ CharSequence] = {
+ val charSeqMap = new util.HashMap[CharSequence, CharSequence](inMap.size())
+ charSeqMap.putAll(inMap)
+ charSeqMap
+ }
+
+ /**
+ * When the thread is started it sets as many events as the batch size or less (if enough
+ * events aren't available) into the eventBatch and object and lets any threads waiting on the
+ * [[getEventBatch]] method to proceed. Then this thread waits for acks or nacks to come in,
+ * or for a specified timeout and commits or rolls back the transaction.
+ * @return
+ */
+ override def call(): Void = {
+ populateEvents()
+ processAckOrNack()
+ null
+ }
+}