aboutsummaryrefslogtreecommitdiff
path: root/external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisReceiver.scala
blob: 1026d0fcb59bd3700c8085edf11e3e92c756b412 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.spark.streaming.kinesis

import java.util.UUID
import java.util.concurrent.ConcurrentHashMap

import scala.collection.JavaConverters._
import scala.collection.mutable
import scala.util.control.NonFatal

import com.amazonaws.services.kinesis.clientlibrary.interfaces.{IRecordProcessor, IRecordProcessorCheckpointer, IRecordProcessorFactory}
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.{InitialPositionInStream, KinesisClientLibConfiguration, Worker}
import com.amazonaws.services.kinesis.model.Record

import org.apache.spark.internal.Logging
import org.apache.spark.storage.{StorageLevel, StreamBlockId}
import org.apache.spark.streaming.Duration
import org.apache.spark.streaming.receiver.{BlockGenerator, BlockGeneratorListener, Receiver}
import org.apache.spark.util.Utils

/**
 * Custom AWS Kinesis-specific implementation of Spark Streaming's Receiver.
 * This implementation relies on the Kinesis Client Library (KCL) Worker as described here:
 * https://github.com/awslabs/amazon-kinesis-client
 *
 * The way this Receiver works is as follows:
 *
 *  - The receiver starts a KCL Worker, which essentially runs a threadpool of multiple
 *    KinesisRecordProcessors.
 *  - Each KinesisRecordProcessor receives data from a Kinesis shard in batches. Each batch is
 *    inserted into a Block Generator, and the corresponding range of sequence numbers is recorded.
 *  - When the block generator defines a block, then the recorded sequence number ranges that were
 *    inserted into the block are recorded separately for being used later.
 *  - When the block is ready to be pushed, the block is pushed and the ranges are reported as
 *    metadata of the block. In addition, the ranges are used to find out the latest sequence
 *    number for each shard that can be checkpointed through the DynamoDB.
 *  - Periodically, each KinesisRecordProcessor checkpoints the latest successfully stored sequence
 *    number for its own shard.
 *
 * @param streamName   Kinesis stream name
 * @param endpointUrl  Url of Kinesis service (e.g., https://kinesis.us-east-1.amazonaws.com)
 * @param regionName  Region name used by the Kinesis Client Library for
 *                    DynamoDB (lease coordination and checkpointing) and CloudWatch (metrics)
 * @param initialPositionInStream  In the absence of Kinesis checkpoint info, this is the
 *                                 worker's initial starting position in the stream.
 *                                 The values are either the beginning of the stream
 *                                 per Kinesis' limit of 24 hours
 *                                 (InitialPositionInStream.TRIM_HORIZON) or
 *                                 the tip of the stream (InitialPositionInStream.LATEST).
 * @param checkpointAppName  Kinesis application name. Kinesis Apps are mapped to Kinesis Streams
 *                 by the Kinesis Client Library.  If you change the App name or Stream name,
 *                 the KCL will throw errors.  This usually requires deleting the backing
 *                 DynamoDB table with the same name as this Kinesis application.
 * @param checkpointInterval  Checkpoint interval for Kinesis checkpointing.
 *                            See the Kinesis Spark Streaming documentation for more
 *                            details on the different types of checkpoints.
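 * @param storageLevel Storage level to use for storing the received objects
 * @param messageHandler Function applied to each received Kinesis Record to produce the
 *                       element of type T that is added to the block generator.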
 * @param kinesisCreds SparkAWSCredentials instance that will be used to generate the
 *                     AWSCredentialsProvider passed to the KCL to authorize Kinesis API calls.
 * @param cloudWatchCreds Optional SparkAWSCredentials instance that will be used to generate the
 *                        AWSCredentialsProvider passed to the KCL to authorize CloudWatch API
 *                        calls. Will use kinesisCreds if value is None.
 * @param dynamoDBCreds Optional SparkAWSCredentials instance that will be used to generate the
 *                      AWSCredentialsProvider passed to the KCL to authorize DynamoDB API calls.
 *                      Will use kinesisCreds if value is None.
 */
private[kinesis] class KinesisReceiver[T](
    val streamName: String,
    endpointUrl: String,
    regionName: String,
    initialPositionInStream: InitialPositionInStream,
    checkpointAppName: String,
    checkpointInterval: Duration,
    storageLevel: StorageLevel,
    messageHandler: Record => T,
    kinesisCreds: SparkAWSCredentials,
    dynamoDBCreds: Option[SparkAWSCredentials],
    cloudWatchCreds: Option[SparkAWSCredentials])
  extends Receiver[T](storageLevel) with Logging { receiver =>

  /*
   * =================================================================================
   * The following vars are initialized in the onStart() method which executes in the
   * Spark worker after this Receiver is serialized and shipped to the worker.
   * =================================================================================
   */

  /**
   * workerId is used by the KCL and should be based on the ip address of the actual Spark Worker
   * where this code runs (not the driver's IP address.)
   */
  @volatile private var workerId: String = null

  /**
   * Worker is the core client abstraction from the Kinesis Client Library (KCL).
   * A worker can process more than one shard from the given stream.
   * Each shard is assigned its own IRecordProcessor and the worker runs multiple such
   * processors.
   */
  @volatile private var worker: Worker = null

  /** Thread on which `worker.run()` is executed; created and started in onStart(). */
  @volatile private var workerThread: Thread = null

  /** BlockGenerator used to generate blocks out of Kinesis data */
  @volatile private var blockGenerator: BlockGenerator = null

  /**
   * Sequence number ranges added to the current block being generated.
   * Accessing and updating of this buffer is synchronized by locks in BlockGenerator.
   */
  private val seqNumRangesInCurrentBlock = new mutable.ArrayBuffer[SequenceNumberRange]

  /** Sequence number ranges of data added to each generated block */
  private val blockIdToSeqNumRanges = new ConcurrentHashMap[StreamBlockId, SequenceNumberRanges]

  /**
   * The centralized kinesisCheckpointer that checkpoints based on the given checkpointInterval.
   */
  @volatile private var kinesisCheckpointer: KinesisCheckpointer = null

  /**
   * Latest sequence number, per shard, that has been stored successfully.
   * This is used for checkpointing through KCL.
   */
  private val shardIdToLatestStoredSeqNum = new ConcurrentHashMap[String, String]

  /**
   * This is called when the KinesisReceiver starts and must be non-blocking.
   * The KCL creates and manages the receiving/processing thread pool through Worker.run().
   *
   * Creates the block generator, the worker id, the checkpointer and the KCL worker, then
   * starts the block generator and runs the KCL worker on a dedicated daemon thread.
   */
  override def onStart(): Unit = {
    blockGenerator = supervisor.createBlockGenerator(new GeneratedBlockHandler)

    workerId = Utils.localHostName() + ":" + UUID.randomUUID()

    kinesisCheckpointer = new KinesisCheckpointer(receiver, checkpointInterval, workerId)
    val kinesisProvider = kinesisCreds.provider
    // DynamoDB and CloudWatch fall back to the Kinesis credentials when none are given.
    val kinesisClientLibConfiguration = new KinesisClientLibConfiguration(
          checkpointAppName,
          streamName,
          kinesisProvider,
          dynamoDBCreds.map(_.provider).getOrElse(kinesisProvider),
          cloudWatchCreds.map(_.provider).getOrElse(kinesisProvider),
          workerId)
        .withKinesisEndpoint(endpointUrl)
        .withInitialPositionInStream(initialPositionInStream)
        .withTaskBackoffTimeMillis(500)
        .withRegionName(regionName)

   /*
    *  RecordProcessorFactory creates impls of IRecordProcessor.
    *  IRecordProcessor adapts the KCL to our Spark KinesisReceiver via the
    *  IRecordProcessor.processRecords() method.
    *  We're using our custom KinesisRecordProcessor in this case.
    */
    val recordProcessorFactory = new IRecordProcessorFactory {
      override def createProcessor: IRecordProcessor =
        new KinesisRecordProcessor(receiver, workerId)
    }

    // Capture the worker in a local so that the thread below never dereferences the `worker`
    // field after onStop() has reset it to null.
    val kclWorker = new Worker(recordProcessorFactory, kinesisClientLibConfiguration)
    worker = kclWorker
    workerThread = new Thread() {
      override def run(): Unit = {
        try {
          kclWorker.run()
        } catch {
          case NonFatal(e) =>
            restart("Error running the KCL worker in Receiver", e)
        }
      }
    }

    // Drop any ranges left over from a previous run of this receiver instance.
    blockIdToSeqNumRanges.clear()
    blockGenerator.start()

    workerThread.setName(s"Kinesis Receiver ${streamId}")
    workerThread.setDaemon(true)
    workerThread.start()

    logInfo(s"Started receiver with workerId $workerId")
  }

  /**
   * This is called when the KinesisReceiver stops.
   * The KCL worker.shutdown() method stops the receiving/processing threads.
   * The KCL will do its best to drain and checkpoint any in-flight records upon shutdown.
   *
   * Blocks until the worker thread has terminated, then shuts down the checkpointer.
   */
  override def onStop(): Unit = {
    if (workerThread != null) {
      if (worker != null) {
        worker.shutdown()
        worker = null
      }
      workerThread.join()
      workerThread = null
      logInfo(s"Stopped receiver for workerId $workerId")
    }
    workerId = null
    if (kinesisCheckpointer != null) {
      kinesisCheckpointer.shutdown()
      kinesisCheckpointer = null
    }
  }

  /**
   * Add records of the given shard to the current block being generated.
   *
   * Each record is converted with `messageHandler`; the whole batch is described by a single
   * [[SequenceNumberRange]] spanning the first to the last record. Empty batches are ignored.
   *
   * @param shardId  id of the shard the records were read from
   * @param records  batch of records handed over by the record processor
   */
  private[kinesis] def addRecords(shardId: String, records: java.util.List[Record]): Unit = {
    if (records.size > 0) {
      val dataIterator = records.iterator().asScala.map(messageHandler)
      val metadata = SequenceNumberRange(streamName, shardId,
        records.get(0).getSequenceNumber(), records.get(records.size() - 1).getSequenceNumber(),
        records.size())
      blockGenerator.addMultipleDataWithCallback(dataIterator, metadata)
    }
  }

  /**
   * Return the current rate limit defined in [[BlockGenerator]], capped at `Int.MaxValue`.
   * Must only be called after onStart() has created the block generator.
   */
  private[kinesis] def getCurrentLimit: Int = {
    assert(blockGenerator != null)
    math.min(blockGenerator.getCurrentLimit, Int.MaxValue).toInt
  }

  /**
   * Get the latest sequence number for the given shard that can be checkpointed through KCL.
   *
   * @return `None` if no block containing data of this shard has been stored yet
   */
  private[kinesis] def getLatestSeqNumToCheckpoint(shardId: String): Option[String] = {
    Option(shardIdToLatestStoredSeqNum.get(shardId))
  }

  /**
   * Set the checkpointer that will be used to checkpoint sequence numbers to DynamoDB for the
   * given shardId.
   */
  def setCheckpointer(shardId: String, checkpointer: IRecordProcessorCheckpointer): Unit = {
    assert(kinesisCheckpointer != null, "Kinesis Checkpointer not initialized!")
    kinesisCheckpointer.setCheckpointer(shardId, checkpointer)
  }

  /**
   * Remove the checkpointer for the given shardId. The provided checkpointer will be used to
   * checkpoint one last time for the given shard. If `checkpointer` is `null`, then we will not
   * checkpoint.
   */
  def removeCheckpointer(shardId: String, checkpointer: IRecordProcessorCheckpointer): Unit = {
    assert(kinesisCheckpointer != null, "Kinesis Checkpointer not initialized!")
    kinesisCheckpointer.removeCheckpointer(shardId, checkpointer)
  }

  /**
   * Remember the range of sequence numbers that was added to the currently active block.
   * Internally, this is synchronized with `finalizeRangesForCurrentBlock()`.
   */
  private def rememberAddedRange(range: SequenceNumberRange): Unit = {
    seqNumRangesInCurrentBlock += range
  }

  /**
   * Finalize the ranges added to the block that was active and prepare the ranges buffer
   * for next block. Internally, this is synchronized with `rememberAddedRange()`.
   */
  private def finalizeRangesForCurrentBlock(blockId: StreamBlockId): Unit = {
    // Snapshot the buffer (toArray copies) before clearing it for the next block.
    val rangesInBlock = SequenceNumberRanges(seqNumRangesInCurrentBlock.toArray)
    blockIdToSeqNumRanges.put(blockId, rangesInBlock)
    seqNumRangesInCurrentBlock.clear()
    // Log only the ranges of this block rather than the whole block-to-ranges map.
    logDebug(s"Generated block $blockId has $rangesInBlock")
  }

  /**
   * Store the block along with its associated ranges.
   *
   * The ranges recorded for `blockId` are removed from `blockIdToSeqNumRanges` and passed as
   * metadata to `store`. Storing is attempted up to four times. Only when the block has been
   * stored are the per-shard latest sequence numbers advanced, so that data which never made it
   * into Spark does not become eligible for checkpointing. If no ranges are found for the block,
   * or if all store attempts fail, the receiver is stopped.
   *
   * @param blockId      id of the block being pushed
   * @param arrayBuffer  the block's data
   */
  private def storeBlockWithRanges(
      blockId: StreamBlockId, arrayBuffer: mutable.ArrayBuffer[T]): Unit = {
    Option(blockIdToSeqNumRanges.remove(blockId)) match {
      case None =>
        stop("Error while storing block into Spark, could not find sequence number ranges " +
          s"for block $blockId")

      case Some(rangesToReport) =>
        // One initial attempt plus up to three retries.
        val maxAttempts = 4
        var attempt = 0
        var stored = false
        var lastError: Throwable = null
        while (!stored && attempt < maxAttempts) {
          try {
            store(arrayBuffer, rangesToReport)
            stored = true
          } catch {
            case NonFatal(th) =>
              attempt += 1
              lastError = th
          }
        }

        if (stored) {
          // Update the latest sequence number that has been successfully stored for each shard.
          // Ranges are applied in array order, so for a shard that appears more than once the
          // last range in the array determines the recorded sequence number.
          rangesToReport.ranges.foreach { range =>
            shardIdToLatestStoredSeqNum.put(range.shardId, range.toSeqNumber)
          }
        } else {
          stop("Error while storing block into Spark", lastError)
        }
    }
  }

  /**
   * Class to handle blocks generated by this receiver's block generator. Specifically, in
   * the context of the Kinesis Receiver, this handler does the following.
   *
   * - When an array of records is added to the current active block in the block generator,
   *   this handler keeps track of the corresponding sequence number range.
   * - When the currently active block is ready to be sealed (no more records), this handler
   *   keeps track of the list of ranges added into this block in a separate map keyed by
   *   block id (`blockIdToSeqNumRanges`).
   * - When a block is ready to be pushed, this handler stores it together with its ranges.
   */
  private class GeneratedBlockHandler extends BlockGeneratorListener {

    /**
     * Callback method called after a data item is added into the BlockGenerator.
     * The data addition, block generation, and calls to onAddData and onGenerateBlock
     * are all synchronized through the same lock.
     */
    def onAddData(data: Any, metadata: Any): Unit = {
      rememberAddedRange(metadata.asInstanceOf[SequenceNumberRange])
    }

    /**
     * Callback method called after a block has been generated.
     * The data addition, block generation, and calls to onAddData and onGenerateBlock
     * are all synchronized through the same lock.
     */
    def onGenerateBlock(blockId: StreamBlockId): Unit = {
      finalizeRangesForCurrentBlock(blockId)
    }

    /** Callback method called when a block is ready to be pushed / stored. */
    def onPushBlock(blockId: StreamBlockId, arrayBuffer: mutable.ArrayBuffer[_]): Unit = {
      storeBlockWithRanges(blockId,
        arrayBuffer.asInstanceOf[mutable.ArrayBuffer[T]])
    }

    /** Callback called in case of any error in internal of the BlockGenerator */
    def onError(message: String, throwable: Throwable): Unit = {
      reportError(message, throwable)
    }
  }
}