aboutsummaryrefslogtreecommitdiff
path: root/external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/SparkSink.scala
blob: 41f27e937662f5f6bd9d49fc44e12fb0af3d83a2 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.spark.streaming.flume.sink

import java.net.InetSocketAddress
import java.util.concurrent._

import org.apache.avro.ipc.NettyServer
import org.apache.avro.ipc.specific.SpecificResponder
import org.apache.flume.Context
import org.apache.flume.Sink.Status
import org.apache.flume.conf.{Configurable, ConfigurationException}
import org.apache.flume.sink.AbstractSink

/**
 * A sink that uses Avro RPC to run a server that can be polled by Spark's
 * FlumePollingInputDStream. This sink has the following configuration parameters:
 *
 * hostname - The hostname to bind to. Default: 0.0.0.0
 * port - The port to bind to. (No default - mandatory)
 * timeout - Time in seconds after which a transaction is rolled back,
 * if an ACK is not received from Spark within that time
 * threads - Number of threads to use to receive requests from Spark (Default: 10)
 *
 * This sink is unlike other Flume sinks in the sense that it does not push data,
 * instead the process method in this sink simply blocks the SinkRunner the first time it is
 * called. This sink starts up an Avro IPC server that uses the SparkFlumeProtocol.
 *
 * Each time a getEventBatch call comes, creates a transaction and reads events
 * from the channel. When enough events are read, the events are sent to the Spark receiver and
 * the thread itself is blocked and a reference to it saved off.
 *
 * When the ack for that batch is received,
 * the thread which created the transaction is is retrieved and it commits the transaction with the
 * channel from the same thread it was originally created in (since Flume transactions are
 * thread local). If a nack is received instead, the sink rolls back the transaction. If no ack
 * is received within the specified timeout, the transaction is rolled back too. If an ack comes
 * after that, it is simply ignored and the events get re-sent.
 *
 */

class SparkSink extends AbstractSink with Logging with Configurable {

  // Size of the pool to use for holding transaction processors.
  private var poolSize: Integer = SparkSinkConfig.DEFAULT_THREADS

  // Timeout for each transaction. If spark does not respond in this much time,
  // rollback the transaction
  private var transactionTimeout = SparkSinkConfig.DEFAULT_TRANSACTION_TIMEOUT

  // Address info to bind on
  private var hostname: String = SparkSinkConfig.DEFAULT_HOSTNAME
  private var port: Int = 0

  private var backOffInterval: Int = 200

  // Handle to the server
  private var serverOpt: Option[NettyServer] = None

  // The handler that handles the callback from Avro
  private var handler: Option[SparkAvroCallbackHandler] = None

  // Latch that blocks off the Flume framework from wasting 1 thread.
  private val blockingLatch = new CountDownLatch(1)

  override def start() {
    logInfo("Starting Spark Sink: " + getName + " on port: " + port + " and interface: " +
      hostname + " with " + "pool size: " + poolSize + " and transaction timeout: " +
      transactionTimeout + ".")
    handler = Option(new SparkAvroCallbackHandler(poolSize, getChannel, transactionTimeout,
      backOffInterval))
    val responder = new SpecificResponder(classOf[SparkFlumeProtocol], handler.get)
    // Using the constructor that takes specific thread-pools requires bringing in netty
    // dependencies which are being excluded in the build. In practice,
    // Netty dependencies are already available on the JVM as Flume would have pulled them in.
    serverOpt = Option(new NettyServer(responder, new InetSocketAddress(hostname, port)))
    serverOpt.foreach { server =>
      logInfo("Starting Avro server for sink: " + getName)
      server.start()
    }
    super.start()
  }

  override def stop() {
    logInfo("Stopping Spark Sink: " + getName)
    handler.foreach { callbackHandler =>
      callbackHandler.shutdown()
    }
    serverOpt.foreach { server =>
      logInfo("Stopping Avro Server for sink: " + getName)
      server.close()
      server.join()
    }
    blockingLatch.countDown()
    super.stop()
  }

  override def configure(ctx: Context) {
    import SparkSinkConfig._
    hostname = ctx.getString(CONF_HOSTNAME, DEFAULT_HOSTNAME)
    port = Option(ctx.getInteger(CONF_PORT)).
      getOrElse(throw new ConfigurationException("The port to bind to must be specified"))
    poolSize = ctx.getInteger(THREADS, DEFAULT_THREADS)
    transactionTimeout = ctx.getInteger(CONF_TRANSACTION_TIMEOUT, DEFAULT_TRANSACTION_TIMEOUT)
    backOffInterval = ctx.getInteger(CONF_BACKOFF_INTERVAL, DEFAULT_BACKOFF_INTERVAL)
    logInfo("Configured Spark Sink with hostname: " + hostname + ", port: " + port + ", " +
      "poolSize: " + poolSize + ", transactionTimeout: " + transactionTimeout + ", " +
      "backoffInterval: " + backOffInterval)
  }

  override def process(): Status = {
    // This method is called in a loop by the Flume framework - block it until the sink is
    // stopped to save CPU resources. The sink runner will interrupt this thread when the sink is
    // being shut down.
    logInfo("Blocking Sink Runner, sink will continue to run..")
    blockingLatch.await()
    Status.BACKOFF
  }

  private[flume] def getPort(): Int = {
    serverOpt
      .map(_.getPort)
      .getOrElse(
        throw new RuntimeException("Server was not started!")
      )
  }

  /**
   * Pass in a [[CountDownLatch]] for testing purposes. This batch is counted down when each
   * batch is received. The test can simply call await on this latch till the expected number of
   * batches are received.
   * @param latch
   */
  private[flume] def countdownWhenBatchReceived(latch: CountDownLatch) {
    handler.foreach(_.countDownWhenBatchAcked(latch))
  }
}

/**
 * Configuration parameters and their defaults.
 */
private[flume]
object SparkSinkConfig {
  val THREADS = "threads"
  val DEFAULT_THREADS = 10

  val CONF_TRANSACTION_TIMEOUT = "timeout"
  val DEFAULT_TRANSACTION_TIMEOUT = 60

  val CONF_HOSTNAME = "hostname"
  val DEFAULT_HOSTNAME = "0.0.0.0"

  val CONF_PORT = "port"

  val CONF_BACKOFF_INTERVAL = "backoffInterval"
  val DEFAULT_BACKOFF_INTERVAL = 200
}