blob: 1c130654f3f95d34d83cb4e7acfc22ec59ebac60 (
plain) (
blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
|
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.streaming.kinesis
import java.lang.IllegalArgumentException
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream
import org.scalatest.BeforeAndAfterEach
import org.scalatest.mock.MockitoSugar
import org.apache.spark.SparkFunSuite
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext, TestSuiteBase}
/**
 * Tests for the builder obtained from `KinesisInputDStream.builder`.
 *
 * The suite checks three things:
 *  - `build()` throws `IllegalArgumentException` when the streaming context, the stream name or
 *    the checkpoint application name has not been supplied;
 *  - the required values and the builder's defaults end up on the built DStream;
 *  - every optional setting that is overridden on the builder ends up on the built DStream.
 */
class KinesisInputDStreamBuilderSuite extends TestSuiteBase with BeforeAndAfterEach
  with MockitoSugar {
  import KinesisInputDStream._

  // A single StreamingContext is shared by all tests in this suite; it is stopped in afterAll().
  private val ssc = new StreamingContext(conf, batchDuration)
  private val streamName = "a-very-nice-kinesis-stream-name"
  private val checkpointAppName = "a-very-nice-kcl-app-name"

  /** A fresh builder with nothing configured. */
  private def baseBuilder = KinesisInputDStream.builder

  /** A fresh builder with only the three values required by the tests below already set. */
  private def builder = baseBuilder.streamingContext(ssc)
    .streamName(streamName)
    .checkpointAppName(checkpointAppName)

  /**
   * Stops the shared StreamingContext once the whole suite has finished.
   *
   * `super.afterAll()` is invoked in a `finally` block so that the teardown defined by the parent
   * suite traits still runs, even if stopping the context throws.
   */
  override def afterAll(): Unit = {
    try {
      ssc.stop()
    } finally {
      super.afterAll()
    }
  }

  test("should raise an exception if the StreamingContext is missing") {
    intercept[IllegalArgumentException] {
      baseBuilder.streamName(streamName).checkpointAppName(checkpointAppName).build()
    }
  }

  test("should raise an exception if the stream name is missing") {
    intercept[IllegalArgumentException] {
      baseBuilder.streamingContext(ssc).checkpointAppName(checkpointAppName).build()
    }
  }

  test("should raise an exception if the checkpoint app name is missing") {
    intercept[IllegalArgumentException] {
      baseBuilder.streamingContext(ssc).streamName(streamName).build()
    }
  }

  test("should propagate required values to KinesisInputDStream") {
    val dstream = builder.build()
    assert(dstream.context == ssc)
    assert(dstream.streamName == streamName)
    assert(dstream.checkpointAppName == checkpointAppName)
  }

  test("should propagate default values to KinesisInputDStream") {
    // Nothing optional is set here, so every optional field must carry its default.
    val dstream = builder.build()
    assert(dstream.endpointUrl == DEFAULT_KINESIS_ENDPOINT_URL)
    assert(dstream.regionName == DEFAULT_KINESIS_REGION_NAME)
    assert(dstream.initialPositionInStream == DEFAULT_INITIAL_POSITION_IN_STREAM)
    // The checkpoint interval is expected to fall back to the batch duration used for ssc.
    assert(dstream.checkpointInterval == batchDuration)
    assert(dstream._storageLevel == DEFAULT_STORAGE_LEVEL)
    assert(dstream.kinesisCreds == DefaultCredentials)
    assert(dstream.dynamoDBCreds == None)
    assert(dstream.cloudWatchCreds == None)
  }

  // NOTE(review): despite "non-auth" in the name, this test also overrides and checks the three
  // credential settings. The name is left unchanged so existing test filters keep matching.
  test("should propagate custom non-auth values to KinesisInputDStream") {
    val customEndpointUrl = "https://kinesis.us-west-2.amazonaws.com"
    val customRegion = "us-west-2"
    val customInitialPosition = InitialPositionInStream.TRIM_HORIZON
    val customAppName = "a-very-nice-kinesis-app"
    val customCheckpointInterval = Seconds(30)
    val customStorageLevel = StorageLevel.MEMORY_ONLY
    val customKinesisCreds = mock[SparkAWSCredentials]
    val customDynamoDBCreds = mock[SparkAWSCredentials]
    val customCloudWatchCreds = mock[SparkAWSCredentials]

    // `builder` already set checkpointAppName; setting it again must replace the earlier value.
    val dstream = builder
      .endpointUrl(customEndpointUrl)
      .regionName(customRegion)
      .initialPositionInStream(customInitialPosition)
      .checkpointAppName(customAppName)
      .checkpointInterval(customCheckpointInterval)
      .storageLevel(customStorageLevel)
      .kinesisCredentials(customKinesisCreds)
      .dynamoDBCredentials(customDynamoDBCreds)
      .cloudWatchCredentials(customCloudWatchCreds)
      .build()

    assert(dstream.endpointUrl == customEndpointUrl)
    assert(dstream.regionName == customRegion)
    assert(dstream.initialPositionInStream == customInitialPosition)
    assert(dstream.checkpointAppName == customAppName)
    assert(dstream.checkpointInterval == customCheckpointInterval)
    assert(dstream._storageLevel == customStorageLevel)
    assert(dstream.kinesisCreds == customKinesisCreds)
    // The DynamoDB and CloudWatch credentials are exposed as Options on the DStream.
    assert(dstream.dynamoDBCreds == Option(customDynamoDBCreds))
    assert(dstream.cloudWatchCreds == Option(customCloudWatchCreds))
  }
}
|