path: root/external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/TransactionProcessor.scala
blob: 19e736f01697770086687abbd30e54e3b871d755
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.spark.streaming.flume.sink

import java.nio.ByteBuffer
import java.util
import java.util.concurrent.{Callable, CountDownLatch, TimeUnit}

import scala.util.control.Breaks

import org.apache.flume.{Channel, Transaction}

// Flume forces transactions to be thread-local (horrible, I know!)
// So the sink basically spawns a new thread to pull the events out within a transaction.
// The thread fills in the event batch object that is set before the thread is scheduled.
// After filling it in, the thread waits on a condition - which is released only
// when the success message comes back for the specific sequence number for that event batch.
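//
// A hedged sketch of the intended lifecycle (the executor and handler names below are
// illustrative, not defined in this file):
//
//   val processor = new TransactionProcessor(channel, seqNum, maxBatchSize,
//     transactionTimeout, backOffInterval, callbackHandler)
//   executor.submit(processor)               // runs call() on a dedicated thread
//   val batch = processor.getEventBatch      // blocks until populateEvents() completes
//   // ... ship the batch to Spark; once the ACK/NACK for seqNum arrives:
//   processor.batchProcessed(success = true) // lets the thread commit (or roll back)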
/**
 * This class represents a transaction on the Flume channel. This class runs a separate thread
 * which owns the transaction. The thread is blocked until the success call for that transaction
 * comes back with an ACK or NACK.
 * @param channel The channel from which to pull events
 * @param seqNum The sequence number to use for the transaction. Must be unique
 * @param maxBatchSize The maximum number of events to process per batch
 * @param transactionTimeout Time in seconds after which a transaction must be rolled back
 *                           without waiting for an ACK from Spark
 * @param backOffInterval Time in milliseconds to back off before retrying a channel poll
 *                        that returned no events
 * @param parent The parent [[SparkAvroCallbackHandler]] instance, for reporting timeouts
 */
private class TransactionProcessor(val channel: Channel, val seqNum: String,
  var maxBatchSize: Int, val transactionTimeout: Int, val backOffInterval: Int,
  val parent: SparkAvroCallbackHandler) extends Callable[Void] with Logging {

  // Start with an error batch so that, if a real batch is never produced, callers of
  // getEventBatch still receive a batch carrying an error message.
  @volatile private var eventBatch: EventBatch = new EventBatch("Unknown Error", "",
    util.Collections.emptyList())

  // Synchronization primitives
  val batchGeneratedLatch = new CountDownLatch(1)
  val batchAckLatch = new CountDownLatch(1)

  // Sanity bound so the polling loop below cannot spin forever
  val totalAttemptsToRemoveFromChannel = Int.MaxValue / 2

  // A volatile is sufficient here: this flag is only ever written by batchProcessed()
  // (true on ACK, false on NACK) and is never read-modified-written, so there is no
  // compound update to guard with a lock. A value of true means the transaction succeeded.
  @volatile private var batchSuccess = false

  @volatile private var stopped = false

  @volatile private var isTest = false

  private var testLatch: CountDownLatch = null

  // The transaction that this processor would handle
  var txOpt: Option[Transaction] = None

  /**
   * Get an event batch from the channel. This method will block until a batch of events is
   * available from the channel. If no events are available after a large number of polling
   * attempts, this method will return an [[EventBatch]] with a non-empty error message.
   *
   * @return An [[EventBatch]] instance with sequence number set to seqNum, filled with a
   *         maximum of maxBatchSize events
   */
  def getEventBatch: EventBatch = {
    batchGeneratedLatch.await()
    eventBatch
  }

  /**
   * This method is to be called by the sink when it receives an ACK or NACK from Spark. This
   * method is a no-op if it is called after transactionTimeout has expired since
   * getEventBatch returned a batch of events.
   * @param success True if an ACK was received and the transaction should be committed, else false.
   */
  def batchProcessed(success: Boolean) {
    logDebug("Batch processed for sequence number: " + seqNum)
    batchSuccess = success
    batchAckLatch.countDown()
  }

  private[flume] def shutdown(): Unit = {
    logDebug("Shutting down transaction processor")
    stopped = true
  }

  /**
   * Populates events into the event batch. If the batch cannot be populated,
   * this method leaves the events unset and instead records an error message on the batch.
   */
  private def populateEvents() {
    try {
      txOpt = Option(channel.getTransaction)
      if (txOpt.isEmpty) {
        eventBatch.setErrorMsg("Something went wrong. Channel was " +
          "unable to create a transaction!")
      }
      txOpt.foreach { tx =>
        tx.begin()
        val events = new util.ArrayList[SparkSinkEvent](maxBatchSize)
        val loop = new Breaks
        var gotEventsInThisTxn = false
        var loopCounter: Int = 0
        loop.breakable {
          while (!stopped && events.size() < maxBatchSize
            && loopCounter < totalAttemptsToRemoveFromChannel) {
            loopCounter += 1
            Option(channel.take()) match {
              case Some(event) =>
                events.add(new SparkSinkEvent(toCharSequenceMap(event.getHeaders),
                  ByteBuffer.wrap(event.getBody)))
                gotEventsInThisTxn = true
              case None =>
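                // Channel is empty: back off and retry if nothing has been read yet in
                // this transaction; otherwise the partial batch is good enough, so stop.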
                if (!gotEventsInThisTxn && !stopped) {
                  logDebug("Sleeping for " + backOffInterval + " millis as no events were read in" +
                    " the current transaction")
                  TimeUnit.MILLISECONDS.sleep(backOffInterval)
                } else {
                  loop.break()
                }
            }
          }
        }
        if (!gotEventsInThisTxn && !stopped) {
          val msg = "Tried several times, " +
            "but did not get any events from the channel!"
          logWarning(msg)
          eventBatch.setErrorMsg(msg)
        } else {
          // At this point, the events are available, so fill them into the event batch
          eventBatch = new EventBatch("", seqNum, events)
        }
      }
    } catch {
      case interrupted: InterruptedException =>
        // Don't pollute logs if the InterruptedException came from this being stopped
        if (!stopped) {
          logWarning("Error while processing transaction.", interrupted)
        }
      case e: Exception =>
        logWarning("Error while processing transaction.", e)
        eventBatch.setErrorMsg(e.getMessage)
        try {
          txOpt.foreach { tx =>
            rollbackAndClose(tx, close = true)
          }
        } finally {
          txOpt = None
        }
    } finally {
      batchGeneratedLatch.countDown()
    }
  }

  /**
   * Waits for up to transactionTimeout seconds for an ACK. If an ACK comes in,
   * this method commits the transaction with the channel. If the ACK does not come in within
   * that time, or a NACK comes in, this method rolls back the transaction.
   */
  private def processAckOrNack() {
    batchAckLatch.await(transactionTimeout, TimeUnit.SECONDS)
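    // If the await above timed out, batchSuccess is still false, so the batch is handled
    // below exactly as if a NACK had been received.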
    txOpt.foreach { tx =>
      if (batchSuccess) {
        try {
          logDebug("Committing transaction")
          tx.commit()
        } catch {
          case e: Exception =>
            logWarning("Error while attempting to commit transaction. Transaction will be rolled " +
              "back", e)
            rollbackAndClose(tx, close = false) // tx will be closed later anyway
        } finally {
          tx.close()
          if (isTest) {
            testLatch.countDown()
          }
        }
      } else {
        logWarning("Spark could not commit transaction, NACK received. Rolling back transaction.")
        rollbackAndClose(tx, close = true)
        // This might have been due to timeout or a NACK. Either way the following call does not
        // cause issues. This is required to ensure the TransactionProcessor instance is not leaked
        parent.removeAndGetProcessor(seqNum)
      }
    }
  }

  /**
   * Helper method to rollback and optionally close a transaction
   * @param tx The transaction to rollback
   * @param close Whether the transaction should be closed or not after rolling back
   */
  private def rollbackAndClose(tx: Transaction, close: Boolean) {
    try {
      logWarning("Spark was unable to successfully process the events. Transaction is being " +
        "rolled back.")
      tx.rollback()
    } catch {
      case e: Exception =>
        logError("Error rolling back transaction. Rollback may have failed!", e)
    } finally {
      if (close) {
        tx.close()
      }
    }
  }

  /**
   * Helper method to convert a Map[String, String] to a Map[CharSequence, CharSequence],
   * as expected by the Avro-generated [[SparkSinkEvent]] for its headers
   * @param inMap The map to be converted
   * @return The converted map
   */
  private def toCharSequenceMap(inMap: java.util.Map[String, String]): java.util.Map[CharSequence,
    CharSequence] = {
    val charSeqMap = new util.HashMap[CharSequence, CharSequence](inMap.size())
    charSeqMap.putAll(inMap)
    charSeqMap
  }

  /**
   * When the thread is started, it fills the eventBatch object with up to maxBatchSize events
   * (fewer if not enough are available) and lets any threads waiting on [[getEventBatch]]
   * proceed. This thread then waits for an ACK or NACK to come in, or for the transaction
   * timeout to expire, and commits or rolls back the transaction accordingly.
   * @return Always null; this Callable is run only for its side effects
   */
  override def call(): Void = {
    populateEvents()
    processAckOrNack()
    null
  }

  private[sink] def countDownWhenBatchAcked(latch: CountDownLatch) {
    testLatch = latch
    isTest = true
  }
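
  // A hedged sketch of how a test in this package might use the hook above (the names
  // are illustrative):
  //
  //   val committed = new CountDownLatch(1)
  //   processor.countDownWhenBatchAcked(committed)
  //   processor.batchProcessed(success = true)
  //   committed.await()  // returns once the ACKed transaction is committed and closed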
}