/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.streaming.kinesis

import org.scalatest.BeforeAndAfterEach

import org.apache.spark.{LocalSparkContext, SparkConf, SparkContext, SparkException}
import org.apache.spark.storage.{BlockId, BlockManager, StorageLevel, StreamBlockId}

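/**
 * Tests for KinesisBackedBlockRDD, exercised against a real Kinesis stream when the Kinesis
 * tests are enabled. Test data is pushed to the stream once in beforeAll(), and the individual
 * tests read it back either from blocks stored in the BlockManager, from Kinesis using
 * sequence number ranges, or from a mix of both.
 */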
abstract class KinesisBackedBlockRDDTests(aggregateTestData: Boolean)
  extends KinesisFunSuite with BeforeAndAfterEach with LocalSparkContext {

  private val testData = 1 to 8

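  // All of the following are initialized in beforeAll(), after the test data has been
  // pushed to the Kinesis stream.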
  private var testUtils: KinesisTestUtils = null
  private var shardIds: Seq[String] = null
  private var shardIdToData: Map[String, Seq[Int]] = null
  private var shardIdToSeqNumbers: Map[String, Seq[String]] = null
  private var shardIdToDataAndSeqNumbers: Map[String, Seq[(Int, String)]] = null
  private var shardIdToRange: Map[String, SequenceNumberRange] = null
  private var allRanges: Seq[SequenceNumberRange] = null

  private var blockManager: BlockManager = null

  override def beforeAll(): Unit = {
    super.beforeAll()
    runIfTestsEnabled("Prepare KinesisTestUtils") {
      testUtils = new KPLBasedKinesisTestUtils()
      testUtils.createStream()

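      // Push the test data, recording for each shard the (data, sequence number) pairs written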
      shardIdToDataAndSeqNumbers = testUtils.pushData(testData, aggregate = aggregateTestData)
      require(shardIdToDataAndSeqNumbers.size > 1, "Need data to be sent to multiple shards")

      shardIds = shardIdToDataAndSeqNumbers.keySet.toSeq
      shardIdToData = shardIdToDataAndSeqNumbers.mapValues { _.map { _._1 }}
      shardIdToSeqNumbers = shardIdToDataAndSeqNumbers.mapValues { _.map { _._2 }}
      shardIdToRange = shardIdToSeqNumbers.map { case (shardId, seqNumbers) =>
        val seqNumRange = SequenceNumberRange(
          testUtils.streamName, shardId, seqNumbers.head, seqNumbers.last)
        (shardId, seqNumRange)
      }
      allRanges = shardIdToRange.values.toSeq
    }
  }

  override def beforeEach(): Unit = {
    super.beforeEach()
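    // Each test gets a fresh SparkContext, and hence a fresh BlockManager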
    val conf = new SparkConf().setMaster("local[4]").setAppName("KinesisBackedBlockRDDSuite")
    sc = new SparkContext(conf)
    blockManager = sc.env.blockManager
  }

  override def afterAll(): Unit = {
    try {
      if (testUtils != null) {
        testUtils.deleteStream()
      }
    } finally {
      super.afterAll()
    }
  }

  testIfEnabled("Basic reading from Kinesis") {
    // Verify all data using multiple ranges in a single RDD partition
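    // (the single fake block id is never stored in the BlockManager, so all reads must go
    // through the Kinesis ranges)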
    val receivedData1 = new KinesisBackedBlockRDD[Array[Byte]](sc, testUtils.regionName,
      testUtils.endpointUrl, fakeBlockIds(1),
      Array(SequenceNumberRanges(allRanges.toArray))
    ).map { bytes => new String(bytes).toInt }.collect()
    assert(receivedData1.toSet === testData.toSet)

    // Verify all data using one range in each of the multiple RDD partitions
    val receivedData2 = new KinesisBackedBlockRDD[Array[Byte]](sc, testUtils.regionName,
      testUtils.endpointUrl, fakeBlockIds(allRanges.size),
      allRanges.map { range => SequenceNumberRanges(Array(range)) }.toArray
    ).map { bytes => new String(bytes).toInt }.collect()
    assert(receivedData2.toSet === testData.toSet)

    // Verify ordering within each partition
    val receivedData3 = new KinesisBackedBlockRDD[Array[Byte]](sc, testUtils.regionName,
      testUtils.endpointUrl, fakeBlockIds(allRanges.size),
      allRanges.map { range => SequenceNumberRanges(Array(range)) }.toArray
    ).map { bytes => new String(bytes).toInt }.collectPartitions()
    assert(receivedData3.length === allRanges.size)
    for (i <- 0 until allRanges.size) {
      assert(receivedData3(i).toSeq === shardIdToData(allRanges(i).shardId))
    }
  }

  testIfEnabled("Read data available in both block manager and Kinesis") {
    testRDD(numPartitions = 2, numPartitionsInBM = 2, numPartitionsInKinesis = 2)
  }

  testIfEnabled("Read data available only in block manager, not in Kinesis") {
    testRDD(numPartitions = 2, numPartitionsInBM = 2, numPartitionsInKinesis = 0)
  }

  testIfEnabled("Read data available only in Kinesis, not in block manager") {
    testRDD(numPartitions = 2, numPartitionsInBM = 0, numPartitionsInKinesis = 2)
  }

  testIfEnabled("Read data available partially in block manager, rest in Kinesis") {
    testRDD(numPartitions = 2, numPartitionsInBM = 1, numPartitionsInKinesis = 1)
  }

  testIfEnabled("Test isBlockValid skips block fetching from block manager") {
    testRDD(numPartitions = 2, numPartitionsInBM = 2, numPartitionsInKinesis = 0,
      testIsBlockValid = true)
  }

  testIfEnabled("Test whether RDD is valid after removing blocks from block manager") {
    testRDD(numPartitions = 2, numPartitionsInBM = 2, numPartitionsInKinesis = 2,
      testBlockRemove = true)
  }

  /**
   * Test the KinesisBackedBlockRDD, by writing some partitions of the test data to the block
   * manager, leaving the rest to be read from Kinesis, and then reading it all back using the
   * RDD. It can also check that setting isBlockValid to false skips block fetching, and that
   * the RDD can still be read from Kinesis after its blocks are removed from the block manager.
   *
   * Example with numPartitions = 5, numPartitionsInBM = 3, and numPartitionsInKinesis = 4
   *
   *   numPartitionsInBM = 3
   *   |------------------|
   *   |                  |
   *    0       1       2       3       4
   *           |                         |
   *           |-------------------------|
   *              numPartitionsInKinesis = 4
   *
   * @param numPartitions Number of partitions in the RDD
   * @param numPartitionsInBM Number of partitions to write to the BlockManager.
   *                          Partitions 0 to (numPartitionsInBM - 1) will be written to the
   *                          BlockManager
   * @param numPartitionsInKinesis Number of partitions to read from Kinesis.
   *                           Partitions (numPartitions - numPartitionsInKinesis) to
   *                           (numPartitions - 1) will be read from Kinesis
   * @param testIsBlockValid Test whether setting isBlockValid to false skips block fetching
   * @param testBlockRemove Test whether calling rdd.removeBlocks() leaves the RDD still usable,
   *                        with reads falling back to Kinesis
   */
  private def testRDD(
      numPartitions: Int,
      numPartitionsInBM: Int,
      numPartitionsInKinesis: Int,
      testIsBlockValid: Boolean = false,
      testBlockRemove: Boolean = false
    ): Unit = {
    require(shardIds.size > 1, "Need at least 2 shards to test")
    require(numPartitionsInBM <= shardIds.size,
      "Number of partitions in BlockManager cannot be more than the Kinesis test shards available")
    require(numPartitionsInKinesis <= shardIds.size,
      "Number of partitions in Kinesis cannot be more than the Kinesis test shards available")
    require(numPartitionsInBM <= numPartitions,
      "Number of partitions in BlockManager cannot be more than that in RDD")
    require(numPartitionsInKinesis <= numPartitions,
      "Number of partitions in Kinesis cannot be more than that in RDD")

    // Put necessary blocks in the block manager
    val blockIds = fakeBlockIds(numPartitions)
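    // Defensively remove any existing copies of these block ids before storing fresh data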
    blockIds.foreach(blockManager.removeBlock(_))
    (0 until numPartitionsInBM).foreach { i =>
      val blockData = shardIdToData(shardIds(i)).iterator.map { _.toString.getBytes() }
      blockManager.putIterator(blockIds(i), blockData, StorageLevel.MEMORY_ONLY)
    }

    // Create the necessary ranges to use in the RDD
    val fakeRanges = Array.fill(numPartitions - numPartitionsInKinesis)(
      SequenceNumberRanges(SequenceNumberRange("fakeStream", "fakeShardId", "xxx", "yyy")))
    val realRanges = Array.tabulate(numPartitionsInKinesis) { i =>
      val range = shardIdToRange(shardIds(i + (numPartitions - numPartitionsInKinesis)))
      SequenceNumberRanges(Array(range))
    }
    val ranges = fakeRanges ++ realRanges
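    // With the example above (numPartitions = 5, numPartitionsInKinesis = 4), ranges(0) would be
    // the single fake range and ranges(1) to ranges(4) would point at the real test stream.
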
    // Make sure that the first `numPartitionsInBM` blocks are in the block manager, and the
    // rest are not
    require(
      blockIds.take(numPartitionsInBM).forall(blockManager.get(_).nonEmpty),
      "Expected blocks not in BlockManager"
    )

    require(
      blockIds.drop(numPartitionsInBM).forall(blockManager.get(_).isEmpty),
      "Unexpected blocks in BlockManager"
    )

    // Make sure that the last `numPartitionsInKinesis` ranges point at the real stream, and the
    // others do not
    require(
      ranges.takeRight(numPartitionsInKinesis).forall {
        _.ranges.forall { _.streamName == testUtils.streamName }
      }, "Incorrect configuration of RDD, expected ranges not set"
    )

    require(
      ranges.dropRight(numPartitionsInKinesis).forall {
        _.ranges.forall { _.streamName != testUtils.streamName }
      }, "Incorrect configuration of RDD, unexpected ranges set"
    )

    val rdd = new KinesisBackedBlockRDD[Array[Byte]](
      sc, testUtils.regionName, testUtils.endpointUrl, blockIds, ranges)
    val collectedData = rdd.map { bytes =>
      new String(bytes).toInt
    }.collect()
    assert(collectedData.toSet === testData.toSet)

    // Verify that block fetching is skipped when isBlockValid is set to false.
    // This is done by using an RDD whose data is only in memory but is set to skip block fetching.
    // Using that RDD will throw an exception, as it skips block fetching even when the blocks
    // are in the BlockManager.
    if (testIsBlockValid) {
      require(numPartitionsInBM === numPartitions, "All partitions must be in BlockManager")
      require(numPartitionsInKinesis === 0, "No partition may be in Kinesis")
      val rdd2 = new KinesisBackedBlockRDD[Array[Byte]](
        sc, testUtils.regionName, testUtils.endpointUrl, blockIds.toArray, ranges,
        isBlockIdValid = Array.fill(blockIds.length)(false))
      intercept[SparkException] {
        rdd2.collect()
      }
    }

    // Verify that the RDD is still valid after the blocks are removed, and can still read data
    // from Kinesis
    if (testBlockRemove) {
      require(numPartitions === numPartitionsInKinesis,
        "All partitions must be in Kinesis for this test")
      require(numPartitionsInBM > 0, "Some partitions must be in BlockManager for this test")
      rdd.removeBlocks()
      assert(rdd.map { bytes => new String(bytes).toInt }.collect().toSet === testData.toSet)
    }
  }

  /** Generate fake block ids */
  private def fakeBlockIds(num: Int): Array[BlockId] = {
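    // StreamBlockId(streamId = 0, uniqueId = i), mimicking the ids generated for receiver blocks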
    Array.tabulate(num) { i => new StreamBlockId(0, i) }
  }
}

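// Run the full suite twice: once with KPL record aggregation enabled, and once without.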
class WithAggregationKinesisBackedBlockRDDSuite
  extends KinesisBackedBlockRDDTests(aggregateTestData = true)

class WithoutAggregationKinesisBackedBlockRDDSuite
  extends KinesisBackedBlockRDDTests(aggregateTestData = false)