path: root/extras/kinesis-asl/src/main/scala
author    zsxwing <zsxwing@gmail.com>  2015-07-31 12:09:48 -0700
committer Tathagata Das <tathagata.das1565@gmail.com>  2015-07-31 12:09:48 -0700
commit    3afc1de89cb4de9f8ea74003dd1e6b5b006d06f0 (patch)
tree      5f362cf13352f4a06ea05e2f3221674147587e75 /extras/kinesis-asl/src/main/scala
parent    39ab199a3f735b7658ab3331d3e2fb03441aec13 (diff)
download  spark-3afc1de89cb4de9f8ea74003dd1e6b5b006d06f0.tar.gz
          spark-3afc1de89cb4de9f8ea74003dd1e6b5b006d06f0.tar.bz2
          spark-3afc1de89cb4de9f8ea74003dd1e6b5b006d06f0.zip
[SPARK-8564] [STREAMING] Add the Python API for Kinesis
This PR adds the Python API for Kinesis, including a Python example and a simple unit test.

Author: zsxwing <zsxwing@gmail.com>

Closes #6955 from zsxwing/kinesis-python and squashes the following commits:

e42e471 [zsxwing] Merge branch 'master' into kinesis-python
455f7ea [zsxwing] Remove streaming_kinesis_asl_assembly module and simply add the source folder to streaming_kinesis_asl module
32e6451 [zsxwing] Merge remote-tracking branch 'origin/master' into kinesis-python
5082d28 [zsxwing] Fix the syntax error for Python 2.6
fca416b [zsxwing] Fix wrong comparison
96670ff [zsxwing] Fix the compilation error after merging master
756a128 [zsxwing] Merge branch 'master' into kinesis-python
6c37395 [zsxwing] Print stack trace for debug
7c5cfb0 [zsxwing] RUN_KINESIS_TESTS -> ENABLE_KINESIS_TESTS
cc9d071 [zsxwing] Fix the python test errors
466b425 [zsxwing] Add python tests for Kinesis
e33d505 [zsxwing] Merge remote-tracking branch 'origin/master' into kinesis-python
3da2601 [zsxwing] Fix the kinesis folder
687446b [zsxwing] Fix the error message and the maven output path
add2beb [zsxwing] Merge branch 'master' into kinesis-python
4957c0b [zsxwing] Add the Python API for Kinesis
Diffstat (limited to 'extras/kinesis-asl/src/main/scala')
-rw-r--r--  extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisTestUtils.scala | 19
-rw-r--r--  extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisUtils.scala     | 78
2 files changed, 78 insertions, 19 deletions
diff --git a/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisTestUtils.scala b/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisTestUtils.scala
index ca39358b75..255ac27f79 100644
--- a/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisTestUtils.scala
+++ b/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisTestUtils.scala
@@ -36,9 +36,15 @@ import org.apache.spark.Logging
/**
* Shared utility methods for performing Kinesis tests that actually transfer data
*/
-private class KinesisTestUtils(
- val endpointUrl: String = "https://kinesis.us-west-2.amazonaws.com",
- _regionName: String = "") extends Logging {
+private class KinesisTestUtils(val endpointUrl: String, _regionName: String) extends Logging {
+
+ def this() {
+ this("https://kinesis.us-west-2.amazonaws.com", "")
+ }
+
+ def this(endpointUrl: String) {
+ this(endpointUrl, "")
+ }
val regionName = if (_regionName.length == 0) {
RegionUtils.getRegionByEndpoint(endpointUrl).getName()
@@ -117,6 +123,13 @@ private class KinesisTestUtils(
shardIdToSeqNumbers.toMap
}
+ /**
+ * Expose a Python friendly API.
+ */
+ def pushData(testData: java.util.List[Int]): Unit = {
+ pushData(scala.collection.JavaConversions.asScalaBuffer(testData))
+ }
+
def deleteStream(): Unit = {
try {
if (streamCreated) {
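The two auxiliary constructors and the java.util.List overload above exist because Scala default arguments and Scala collections are awkward to call from Java or over Py4J. A minimal Scala sketch of how a test in the same package might use them (KinesisTestUtils is package-private, and the step that actually creates the test stream is not shown in this hunk, so treat this as illustrative only):

package org.apache.spark.streaming.kinesis

import java.util.Arrays

object KinesisTestUtilsSketch {
  def main(args: Array[String]): Unit = {
    // No-arg constructor: default us-west-2 endpoint, region derived from it.
    val defaultUtils = new KinesisTestUtils()

    // One-arg constructor: explicit endpoint, region still derived from the endpoint.
    val utils = new KinesisTestUtils("https://kinesis.us-west-2.amazonaws.com")

    // Assumes the test stream has already been set up by the utility (not shown here).
    // The new overload takes a java.util.List, so Java and Py4J callers can push
    // records without building a Scala Seq first.
    utils.pushData(Arrays.asList(1, 2, 3))

    utils.deleteStream()
  }
}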
diff --git a/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisUtils.scala b/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisUtils.scala
index e5acab5018..7dab17eba8 100644
--- a/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisUtils.scala
+++ b/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisUtils.scala
@@ -86,19 +86,19 @@ object KinesisUtils {
* @param endpointUrl Url of Kinesis service (e.g., https://kinesis.us-east-1.amazonaws.com)
* @param regionName Name of region used by the Kinesis Client Library (KCL) to update
* DynamoDB (lease coordination and checkpointing) and CloudWatch (metrics)
- * @param awsAccessKeyId AWS AccessKeyId (if null, will use DefaultAWSCredentialsProviderChain)
- * @param awsSecretKey AWS SecretKey (if null, will use DefaultAWSCredentialsProviderChain)
- * @param checkpointInterval Checkpoint interval for Kinesis checkpointing.
- * See the Kinesis Spark Streaming documentation for more
- * details on the different types of checkpoints.
* @param initialPositionInStream In the absence of Kinesis checkpoint info, this is the
* worker's initial starting position in the stream.
* The values are either the beginning of the stream
* per Kinesis' limit of 24 hours
* (InitialPositionInStream.TRIM_HORIZON) or
* the tip of the stream (InitialPositionInStream.LATEST).
+ * @param checkpointInterval Checkpoint interval for Kinesis checkpointing.
+ * See the Kinesis Spark Streaming documentation for more
+ * details on the different types of checkpoints.
* @param storageLevel Storage level to use for storing the received objects.
* StorageLevel.MEMORY_AND_DISK_2 is recommended.
+ * @param awsAccessKeyId AWS AccessKeyId (if null, will use DefaultAWSCredentialsProviderChain)
+ * @param awsSecretKey AWS SecretKey (if null, will use DefaultAWSCredentialsProviderChain)
*/
def createStream(
ssc: StreamingContext,
@@ -130,7 +130,7 @@ object KinesisUtils {
* - The Kinesis application name used by the Kinesis Client Library (KCL) will be the app name in
* [[org.apache.spark.SparkConf]].
*
- * @param ssc Java StreamingContext object
+ * @param ssc StreamingContext object
* @param streamName Kinesis stream name
* @param endpointUrl Endpoint url of Kinesis service
* (e.g., https://kinesis.us-east-1.amazonaws.com)
@@ -175,15 +175,15 @@ object KinesisUtils {
* @param endpointUrl Url of Kinesis service (e.g., https://kinesis.us-east-1.amazonaws.com)
* @param regionName Name of region used by the Kinesis Client Library (KCL) to update
* DynamoDB (lease coordination and checkpointing) and CloudWatch (metrics)
- * @param checkpointInterval Checkpoint interval for Kinesis checkpointing.
- * See the Kinesis Spark Streaming documentation for more
- * details on the different types of checkpoints.
* @param initialPositionInStream In the absence of Kinesis checkpoint info, this is the
* worker's initial starting position in the stream.
* The values are either the beginning of the stream
* per Kinesis' limit of 24 hours
* (InitialPositionInStream.TRIM_HORIZON) or
* the tip of the stream (InitialPositionInStream.LATEST).
+ * @param checkpointInterval Checkpoint interval for Kinesis checkpointing.
+ * See the Kinesis Spark Streaming documentation for more
+ * details on the different types of checkpoints.
* @param storageLevel Storage level to use for storing the received objects.
* StorageLevel.MEMORY_AND_DISK_2 is recommended.
*/
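The scaladoc moves above only reorder the @param tags so they match the actual parameter order of createStream. For reference, a minimal Scala sketch of a call in that order (the application and stream names are hypothetical; the credential-taking overload adds awsAccessKeyId and awsSecretKey after storageLevel, as documented in the first hunk above):

import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream

import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.ReceiverInputDStream
import org.apache.spark.streaming.kinesis.KinesisUtils

object CreateStreamSketch {
  // Argument order follows the corrected scaladoc:
  // ... regionName, initialPositionInStream, checkpointInterval, storageLevel
  def build(ssc: StreamingContext): ReceiverInputDStream[Array[Byte]] = {
    KinesisUtils.createStream(
      ssc,
      "myKinesisApp",                            // kinesisAppName (hypothetical)
      "myStream",                                // streamName (hypothetical)
      "https://kinesis.us-east-1.amazonaws.com", // endpointUrl
      "us-east-1",                               // regionName
      InitialPositionInStream.LATEST,            // initialPositionInStream
      Seconds(10),                               // checkpointInterval
      StorageLevel.MEMORY_AND_DISK_2)            // storageLevel (recommended in the doc)
  }
}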
@@ -206,8 +206,8 @@ object KinesisUtils {
* This uses the Kinesis Client Library (KCL) to pull messages from Kinesis.
*
* Note:
- * The given AWS credentials will get saved in DStream checkpoints if checkpointing
- * is enabled. Make sure that your checkpoint directory is secure.
+ * The given AWS credentials will get saved in DStream checkpoints if checkpointing
+ * is enabled. Make sure that your checkpoint directory is secure.
*
* @param jssc Java StreamingContext object
* @param kinesisAppName Kinesis application name used by the Kinesis Client Library
@@ -216,19 +216,19 @@ object KinesisUtils {
* @param endpointUrl Url of Kinesis service (e.g., https://kinesis.us-east-1.amazonaws.com)
* @param regionName Name of region used by the Kinesis Client Library (KCL) to update
* DynamoDB (lease coordination and checkpointing) and CloudWatch (metrics)
- * @param awsAccessKeyId AWS AccessKeyId (if null, will use DefaultAWSCredentialsProviderChain)
- * @param awsSecretKey AWS SecretKey (if null, will use DefaultAWSCredentialsProviderChain)
- * @param checkpointInterval Checkpoint interval for Kinesis checkpointing.
- * See the Kinesis Spark Streaming documentation for more
- * details on the different types of checkpoints.
* @param initialPositionInStream In the absence of Kinesis checkpoint info, this is the
* worker's initial starting position in the stream.
* The values are either the beginning of the stream
* per Kinesis' limit of 24 hours
* (InitialPositionInStream.TRIM_HORIZON) or
* the tip of the stream (InitialPositionInStream.LATEST).
+ * @param checkpointInterval Checkpoint interval for Kinesis checkpointing.
+ * See the Kinesis Spark Streaming documentation for more
+ * details on the different types of checkpoints.
* @param storageLevel Storage level to use for storing the received objects.
* StorageLevel.MEMORY_AND_DISK_2 is recommended.
+ * @param awsAccessKeyId AWS AccessKeyId (if null, will use DefaultAWSCredentialsProviderChain)
+ * @param awsSecretKey AWS SecretKey (if null, will use DefaultAWSCredentialsProviderChain)
*/
def createStream(
jssc: JavaStreamingContext,
@@ -297,3 +297,49 @@ object KinesisUtils {
}
}
}
+
+/**
+ * This is a helper class that wraps the methods in KinesisUtils into a more Python-friendly class
+ * and function so that they can be easily instantiated and called from Python's KinesisUtils.
+ */
+private class KinesisUtilsPythonHelper {
+
+ def getInitialPositionInStream(initialPositionInStream: Int): InitialPositionInStream = {
+ initialPositionInStream match {
+ case 0 => InitialPositionInStream.LATEST
+ case 1 => InitialPositionInStream.TRIM_HORIZON
+ case _ => throw new IllegalArgumentException(
+ "Illegal InitialPositionInStream. Please use " +
+ "InitialPositionInStream.LATEST or InitialPositionInStream.TRIM_HORIZON")
+ }
+ }
+
+ def createStream(
+ jssc: JavaStreamingContext,
+ kinesisAppName: String,
+ streamName: String,
+ endpointUrl: String,
+ regionName: String,
+ initialPositionInStream: Int,
+ checkpointInterval: Duration,
+ storageLevel: StorageLevel,
+ awsAccessKeyId: String,
+ awsSecretKey: String
+ ): JavaReceiverInputDStream[Array[Byte]] = {
+ if (awsAccessKeyId == null && awsSecretKey != null) {
+ throw new IllegalArgumentException("awsSecretKey is set but awsAccessKeyId is null")
+ }
+ if (awsAccessKeyId != null && awsSecretKey == null) {
+ throw new IllegalArgumentException("awsAccessKeyId is set but awsSecretKey is null")
+ }
+ if (awsAccessKeyId == null && awsSecretKey == null) {
+ KinesisUtils.createStream(jssc, kinesisAppName, streamName, endpointUrl, regionName,
+ getInitialPositionInStream(initialPositionInStream), checkpointInterval, storageLevel)
+ } else {
+ KinesisUtils.createStream(jssc, kinesisAppName, streamName, endpointUrl, regionName,
+ getInitialPositionInStream(initialPositionInStream), checkpointInterval, storageLevel,
+ awsAccessKeyId, awsSecretKey)
+ }
+ }
+
+}
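The helper deliberately avoids Scala features that Py4J handles poorly: instead of default arguments or overload selection on the Python side, it takes an Int position code and possibly-null credential strings, then dispatches to the right KinesisUtils overload itself. A minimal Scala sketch of an equivalent direct call (the class is package-private, so this would have to live in the same package; the app and stream names are hypothetical, and the PySpark wrapper that drives this class over the Py4J gateway is not part of this diff):

package org.apache.spark.streaming.kinesis

import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.Seconds
import org.apache.spark.streaming.api.java.{JavaReceiverInputDStream, JavaStreamingContext}

object PythonHelperSketch {
  def build(jssc: JavaStreamingContext): JavaReceiverInputDStream[Array[Byte]] = {
    val helper = new KinesisUtilsPythonHelper()
    helper.createStream(
      jssc,
      "myKinesisApp",                            // kinesisAppName (hypothetical)
      "myStream",                                // streamName (hypothetical)
      "https://kinesis.us-east-1.amazonaws.com", // endpointUrl
      "us-east-1",                               // regionName
      0,                                         // 0 => InitialPositionInStream.LATEST, 1 => TRIM_HORIZON
      Seconds(10),                               // checkpointInterval
      StorageLevel.MEMORY_AND_DISK_2,            // storageLevel
      null,                                      // awsAccessKeyId: null => DefaultAWSCredentialsProviderChain
      null)                                      // awsSecretKey
  }
}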