author     Sean Owen <sowen@cloudera.com>  2016-03-09 18:27:44 +0000
committer  Sean Owen <sowen@cloudera.com>  2016-03-09 18:27:44 +0000
commit     256704c771d301700af9ebf0d180c1ba7c4116c0 (patch)
tree       f9be79919b5c6ec4847c24a086fa844555e2cd12 /external
parent     7791d0c3a9bdfe73e071266846f9ab1491fce50c (diff)
download   spark-256704c771d301700af9ebf0d180c1ba7c4116c0.tar.gz
           spark-256704c771d301700af9ebf0d180c1ba7c4116c0.tar.bz2
           spark-256704c771d301700af9ebf0d180c1ba7c4116c0.zip
[SPARK-13595][BUILD] Move docker, extras modules into external
## What changes were proposed in this pull request?

Move `docker` dirs out of top level into `external/`; move `extras/*` into `external/`

## How was this patch tested?

This is tested with Jenkins tests.

Author: Sean Owen <sowen@cloudera.com>

Closes #11523 from srowen/SPARK-13595.
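For illustration only: the moved suites keep their packages, so the invocations already quoted inside this patch (the Oracle suite comment and the java8-tests README) still run unchanged from the repository root; only the module directories move under `external/`. A minimal sketch, assuming a running Docker daemon and a local Java 8 JDK at the path shown:

    # Docker JDBC integration suites, now under external/docker-integration-tests
    ./build/sbt "test-only org.apache.spark.sql.jdbc.MySQLIntegrationSuite"

    # Java 8 API suites, now under external/java8-tests
    JAVA_HOME=/opt/jdk1.8.0/ build/sbt clean "test-only org.apache.spark.Java8APISuite"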
Diffstat (limited to 'external')
-rw-r--r--  external/docker-integration-tests/pom.xml  184
-rw-r--r--  external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/DockerJDBCIntegrationSuite.scala  160
-rw-r--r--  external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/MySQLIntegrationSuite.scala  153
-rw-r--r--  external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/OracleIntegrationSuite.scala  78
-rw-r--r--  external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/PostgresIntegrationSuite.scala  105
-rw-r--r--  external/docker-integration-tests/src/test/scala/org/apache/spark/util/DockerUtils.scala  68
-rw-r--r--  external/docker/README.md  7
-rwxr-xr-x  external/docker/build  22
-rw-r--r--  external/docker/spark-mesos/Dockerfile  30
-rw-r--r--  external/docker/spark-test/README.md  11
-rw-r--r--  external/docker/spark-test/base/Dockerfile  37
-rwxr-xr-x  external/docker/spark-test/build  22
-rw-r--r--  external/docker/spark-test/master/Dockerfile  21
-rwxr-xr-x  external/docker/spark-test/master/default_cmd  28
-rw-r--r--  external/docker/spark-test/worker/Dockerfile  22
-rwxr-xr-x  external/docker/spark-test/worker/default_cmd  28
-rw-r--r--  external/java8-tests/README.md  24
-rw-r--r--  external/java8-tests/pom.xml  161
-rw-r--r--  external/java8-tests/src/test/java/org/apache/spark/Java8APISuite.java  393
-rw-r--r--  external/java8-tests/src/test/java/org/apache/spark/streaming/Java8APISuite.java  905
-rw-r--r--  external/java8-tests/src/test/resources/log4j.properties  28
-rw-r--r--  external/java8-tests/src/test/scala/org/apache/spark/JDK8ScalaSuite.scala  27
-rw-r--r--  external/kinesis-asl-assembly/pom.xml  181
-rw-r--r--  external/kinesis-asl/pom.xml  87
-rw-r--r--  external/kinesis-asl/src/main/java/org/apache/spark/examples/streaming/JavaKinesisWordCountASL.java  189
-rw-r--r--  external/kinesis-asl/src/main/python/examples/streaming/kinesis_wordcount_asl.py  83
-rw-r--r--  external/kinesis-asl/src/main/resources/log4j.properties  37
-rw-r--r--  external/kinesis-asl/src/main/scala/org/apache/spark/examples/streaming/KinesisWordCountASL.scala  276
-rw-r--r--  external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisBackedBlockRDD.scala  288
-rw-r--r--  external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisCheckpointer.scala  133
-rw-r--r--  external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisInputDStream.scala  76
-rw-r--r--  external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisReceiver.scala  361
-rw-r--r--  external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisRecordProcessor.scala  177
-rw-r--r--  external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisTestUtils.scala  260
-rw-r--r--  external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisUtils.scala  560
-rw-r--r--  external/kinesis-asl/src/test/java/org/apache/spark/streaming/kinesis/JavaKinesisStreamSuite.java  62
-rw-r--r--  external/kinesis-asl/src/test/resources/log4j.properties  27
-rw-r--r--  external/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KPLBasedKinesisTestUtils.scala  72
-rw-r--r--  external/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisBackedBlockRDDSuite.scala  259
-rw-r--r--  external/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisCheckpointerSuite.scala  152
-rw-r--r--  external/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisFunSuite.scala  46
-rw-r--r--  external/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisReceiverSuite.scala  210
-rw-r--r--  external/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisStreamSuite.scala  297
-rw-r--r--  external/spark-ganglia-lgpl/pom.xml  49
-rw-r--r--  external/spark-ganglia-lgpl/src/main/scala/org/apache/spark/metrics/sink/GangliaSink.scala  90
45 files changed, 6486 insertions, 0 deletions
diff --git a/external/docker-integration-tests/pom.xml b/external/docker-integration-tests/pom.xml
new file mode 100644
index 0000000000..1764aa9465
--- /dev/null
+++ b/external/docker-integration-tests/pom.xml
@@ -0,0 +1,184 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one or more
+ ~ contributor license agreements. See the NOTICE file distributed with
+ ~ this work for additional information regarding copyright ownership.
+ ~ The ASF licenses this file to You under the Apache License, Version 2.0
+ ~ (the "License"); you may not use this file except in compliance with
+ ~ the License. You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ~ See the License for the specific language governing permissions and
+ ~ limitations under the License.
+ -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <parent>
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-parent_2.11</artifactId>
+ <version>2.0.0-SNAPSHOT</version>
+ <relativePath>../../pom.xml</relativePath>
+ </parent>
+
+ <artifactId>spark-docker-integration-tests_2.11</artifactId>
+ <packaging>jar</packaging>
+ <name>Spark Project Docker Integration Tests</name>
+ <url>http://spark.apache.org/</url>
+ <properties>
+ <sbt.project.name>docker-integration-tests</sbt.project.name>
+ </properties>
+
+ <dependencies>
+ <dependency>
+ <groupId>com.spotify</groupId>
+ <artifactId>docker-client</artifactId>
+ <classifier>shaded</classifier>
+ <scope>test</scope>
+ <!--
+ See https://github.com/spotify/docker-client/pull/272#issuecomment-155249101
+ for an explanation of why these exclusions are (necessarily) a mess.
+ -->
+ <exclusions>
+ <exclusion>
+ <groupId>com.fasterxml.jackson.jaxrs</groupId>
+ <artifactId>jackson-jaxrs-json-provider</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>com.fasterxml.jackson.datatype</groupId>
+ <artifactId>jackson-datatype-guava</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>com.fasterxml.jackson.core</groupId>
+ <artifactId>jackson-databind</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.glassfish.jersey.core</groupId>
+ <artifactId>jersey-client</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.glassfish.jersey.connectors</groupId>
+ <artifactId>jersey-apache-connector</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.glassfish.jersey.media</groupId>
+ <artifactId>jersey-media-json-jackson</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.httpcomponents</groupId>
+ <artifactId>httpclient</artifactId>
+ <version>4.5</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.httpcomponents</groupId>
+ <artifactId>httpcore</artifactId>
+ <version>4.4.1</version>
+ <scope>test</scope>
+ </dependency>
+ <!-- Necessary in order to avoid errors in log messages: -->
+ <dependency>
+ <groupId>com.google.guava</groupId>
+ <artifactId>guava</artifactId>
+ <version>18.0</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-core_${scala.binary.version}</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-core_${scala.binary.version}</artifactId>
+ <version>${project.version}</version>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-sql_${scala.binary.version}</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-sql_${scala.binary.version}</artifactId>
+ <version>${project.version}</version>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-test-tags_${scala.binary.version}</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>mysql</groupId>
+ <artifactId>mysql-connector-java</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.postgresql</groupId>
+ <artifactId>postgresql</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <!-- Oracle ojdbc jar, used by the Oracle integration suite for Docker testing.
+ See https://github.com/apache/spark/pull/11306 for background on why an ojdbc jar
+ is needed for the test case. The Maven dependency here is commented out because
+ the Maven repository does not currently contain the ojdbc jar mentioned.
+ Once the jar is available in Maven, this could be uncommented. -->
+ <!--
+ <dependency>
+ <groupId>com.oracle</groupId>
+ <artifactId>ojdbc6</artifactId>
+ <version>11.2.0.2.0</version>
+ <scope>test</scope>
+ </dependency>
+ -->
+ <!-- Jersey dependencies, used to override version.
+ See https://github.com/apache/spark/pull/9503#issuecomment-154369560 for
+ background on why we need to use a newer Jersey only in this test module;
+ we can remove this once https://github.com/spotify/docker-client/pull/272 is
+ merged and a new docker-client release is published. -->
+ <dependency>
+ <groupId>com.sun.jersey</groupId>
+ <artifactId>jersey-server</artifactId>
+ <version>1.19</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>com.sun.jersey</groupId>
+ <artifactId>jersey-core</artifactId>
+ <version>1.19</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>com.sun.jersey</groupId>
+ <artifactId>jersey-servlet</artifactId>
+ <version>1.19</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>com.sun.jersey</groupId>
+ <artifactId>jersey-json</artifactId>
+ <version>1.19</version>
+ <scope>test</scope>
+ <exclusions>
+ <exclusion>
+ <groupId>stax</groupId>
+ <artifactId>stax-api</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <!-- End Jersey dependencies -->
+ </dependencies>
+</project>
diff --git a/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/DockerJDBCIntegrationSuite.scala b/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/DockerJDBCIntegrationSuite.scala
new file mode 100644
index 0000000000..f73231fc80
--- /dev/null
+++ b/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/DockerJDBCIntegrationSuite.scala
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.jdbc
+
+import java.net.ServerSocket
+import java.sql.Connection
+
+import scala.collection.JavaConverters._
+import scala.util.control.NonFatal
+
+import com.spotify.docker.client._
+import com.spotify.docker.client.messages.{ContainerConfig, HostConfig, PortBinding}
+import org.scalatest.BeforeAndAfterAll
+import org.scalatest.concurrent.Eventually
+import org.scalatest.time.SpanSugar._
+
+import org.apache.spark.SparkFunSuite
+import org.apache.spark.sql.test.SharedSQLContext
+import org.apache.spark.util.DockerUtils
+
+abstract class DatabaseOnDocker {
+ /**
+ * The docker image to be pulled.
+ */
+ val imageName: String
+
+ /**
+ * Environment variables to set inside of the Docker container while launching it.
+ */
+ val env: Map[String, String]
+
+ /**
+ * The container-internal JDBC port that the database listens on.
+ */
+ val jdbcPort: Int
+
+ /**
+ * Return a JDBC URL that connects to the database running at the given IP address and port.
+ */
+ def getJdbcUrl(ip: String, port: Int): String
+}
+
+abstract class DockerJDBCIntegrationSuite
+ extends SparkFunSuite
+ with BeforeAndAfterAll
+ with Eventually
+ with SharedSQLContext {
+
+ val db: DatabaseOnDocker
+
+ private var docker: DockerClient = _
+ private var containerId: String = _
+ protected var jdbcUrl: String = _
+
+ override def beforeAll() {
+ super.beforeAll()
+ try {
+ docker = DefaultDockerClient.fromEnv.build()
+ // Check that Docker is actually up
+ try {
+ docker.ping()
+ } catch {
+ case NonFatal(e) =>
+ log.error("Exception while connecting to Docker. Check whether Docker is running.")
+ throw e
+ }
+ // Ensure that the Docker image is installed:
+ try {
+ docker.inspectImage(db.imageName)
+ } catch {
+ case e: ImageNotFoundException =>
+ log.warn(s"Docker image ${db.imageName} not found; pulling image from registry")
+ docker.pull(db.imageName)
+ }
+ // Configure networking (necessary for boot2docker / Docker Machine)
+ val externalPort: Int = {
+ val sock = new ServerSocket(0)
+ val port = sock.getLocalPort
+ sock.close()
+ port
+ }
+ val dockerIp = DockerUtils.getDockerIp()
+ val hostConfig: HostConfig = HostConfig.builder()
+ .networkMode("bridge")
+ .portBindings(
+ Map(s"${db.jdbcPort}/tcp" -> List(PortBinding.of(dockerIp, externalPort)).asJava).asJava)
+ .build()
+ // Create the database container:
+ val config = ContainerConfig.builder()
+ .image(db.imageName)
+ .networkDisabled(false)
+ .env(db.env.map { case (k, v) => s"$k=$v" }.toSeq.asJava)
+ .hostConfig(hostConfig)
+ .exposedPorts(s"${db.jdbcPort}/tcp")
+ .build()
+ containerId = docker.createContainer(config).id
+ // Start the container and wait until the database can accept JDBC connections:
+ docker.startContainer(containerId)
+ jdbcUrl = db.getJdbcUrl(dockerIp, externalPort)
+ eventually(timeout(60.seconds), interval(1.seconds)) {
+ val conn = java.sql.DriverManager.getConnection(jdbcUrl)
+ conn.close()
+ }
+ // Run any setup queries:
+ val conn: Connection = java.sql.DriverManager.getConnection(jdbcUrl)
+ try {
+ dataPreparation(conn)
+ } finally {
+ conn.close()
+ }
+ } catch {
+ case NonFatal(e) =>
+ try {
+ afterAll()
+ } finally {
+ throw e
+ }
+ }
+ }
+
+ override def afterAll() {
+ try {
+ if (docker != null) {
+ try {
+ if (containerId != null) {
+ docker.killContainer(containerId)
+ docker.removeContainer(containerId)
+ }
+ } catch {
+ case NonFatal(e) =>
+ logWarning(s"Could not stop container $containerId", e)
+ } finally {
+ docker.close()
+ }
+ }
+ } finally {
+ super.afterAll()
+ }
+ }
+
+ /**
+ * Prepare databases and tables for testing.
+ */
+ def dataPreparation(connection: Connection): Unit
+}
diff --git a/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/MySQLIntegrationSuite.scala b/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/MySQLIntegrationSuite.scala
new file mode 100644
index 0000000000..c68e4dc493
--- /dev/null
+++ b/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/MySQLIntegrationSuite.scala
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.jdbc
+
+import java.math.BigDecimal
+import java.sql.{Connection, Date, Timestamp}
+import java.util.Properties
+
+import org.apache.spark.tags.DockerTest
+
+@DockerTest
+class MySQLIntegrationSuite extends DockerJDBCIntegrationSuite {
+ override val db = new DatabaseOnDocker {
+ override val imageName = "mysql:5.7.9"
+ override val env = Map(
+ "MYSQL_ROOT_PASSWORD" -> "rootpass"
+ )
+ override val jdbcPort: Int = 3306
+ override def getJdbcUrl(ip: String, port: Int): String =
+ s"jdbc:mysql://$ip:$port/mysql?user=root&password=rootpass"
+ }
+
+ override def dataPreparation(conn: Connection): Unit = {
+ conn.prepareStatement("CREATE DATABASE foo").executeUpdate()
+ conn.prepareStatement("CREATE TABLE tbl (x INTEGER, y TEXT(8))").executeUpdate()
+ conn.prepareStatement("INSERT INTO tbl VALUES (42,'fred')").executeUpdate()
+ conn.prepareStatement("INSERT INTO tbl VALUES (17,'dave')").executeUpdate()
+
+ conn.prepareStatement("CREATE TABLE numbers (onebit BIT(1), tenbits BIT(10), "
+ + "small SMALLINT, med MEDIUMINT, nor INT, big BIGINT, deci DECIMAL(40,20), flt FLOAT, "
+ + "dbl DOUBLE)").executeUpdate()
+ conn.prepareStatement("INSERT INTO numbers VALUES (b'0', b'1000100101', "
+ + "17, 77777, 123456789, 123456789012345, 123456789012345.123456789012345, "
+ + "42.75, 1.0000000000000002)").executeUpdate()
+
+ conn.prepareStatement("CREATE TABLE dates (d DATE, t TIME, dt DATETIME, ts TIMESTAMP, "
+ + "yr YEAR)").executeUpdate()
+ conn.prepareStatement("INSERT INTO dates VALUES ('1991-11-09', '13:31:24', "
+ + "'1996-01-01 01:23:45', '2009-02-13 23:31:30', '2001')").executeUpdate()
+
+ // TODO: Test locale conversion for strings.
+ conn.prepareStatement("CREATE TABLE strings (a CHAR(10), b VARCHAR(10), c TINYTEXT, "
+ + "d TEXT, e MEDIUMTEXT, f LONGTEXT, g BINARY(4), h VARBINARY(10), i BLOB)"
+ ).executeUpdate()
+ conn.prepareStatement("INSERT INTO strings VALUES ('the', 'quick', 'brown', 'fox', " +
+ "'jumps', 'over', 'the', 'lazy', 'dog')").executeUpdate()
+ }
+
+ test("Basic test") {
+ val df = sqlContext.read.jdbc(jdbcUrl, "tbl", new Properties)
+ val rows = df.collect()
+ assert(rows.length == 2)
+ val types = rows(0).toSeq.map(x => x.getClass.toString)
+ assert(types.length == 2)
+ assert(types(0).equals("class java.lang.Integer"))
+ assert(types(1).equals("class java.lang.String"))
+ }
+
+ test("Numeric types") {
+ val df = sqlContext.read.jdbc(jdbcUrl, "numbers", new Properties)
+ val rows = df.collect()
+ assert(rows.length == 1)
+ val types = rows(0).toSeq.map(x => x.getClass.toString)
+ assert(types.length == 9)
+ assert(types(0).equals("class java.lang.Boolean"))
+ assert(types(1).equals("class java.lang.Long"))
+ assert(types(2).equals("class java.lang.Integer"))
+ assert(types(3).equals("class java.lang.Integer"))
+ assert(types(4).equals("class java.lang.Integer"))
+ assert(types(5).equals("class java.lang.Long"))
+ assert(types(6).equals("class java.math.BigDecimal"))
+ assert(types(7).equals("class java.lang.Double"))
+ assert(types(8).equals("class java.lang.Double"))
+ assert(rows(0).getBoolean(0) == false)
+ assert(rows(0).getLong(1) == 0x225)
+ assert(rows(0).getInt(2) == 17)
+ assert(rows(0).getInt(3) == 77777)
+ assert(rows(0).getInt(4) == 123456789)
+ assert(rows(0).getLong(5) == 123456789012345L)
+ val bd = new BigDecimal("123456789012345.12345678901234500000")
+ assert(rows(0).getAs[BigDecimal](6).equals(bd))
+ assert(rows(0).getDouble(7) == 42.75)
+ assert(rows(0).getDouble(8) == 1.0000000000000002)
+ }
+
+ test("Date types") {
+ val df = sqlContext.read.jdbc(jdbcUrl, "dates", new Properties)
+ val rows = df.collect()
+ assert(rows.length == 1)
+ val types = rows(0).toSeq.map(x => x.getClass.toString)
+ assert(types.length == 5)
+ assert(types(0).equals("class java.sql.Date"))
+ assert(types(1).equals("class java.sql.Timestamp"))
+ assert(types(2).equals("class java.sql.Timestamp"))
+ assert(types(3).equals("class java.sql.Timestamp"))
+ assert(types(4).equals("class java.sql.Date"))
+ assert(rows(0).getAs[Date](0).equals(Date.valueOf("1991-11-09")))
+ assert(rows(0).getAs[Timestamp](1).equals(Timestamp.valueOf("1970-01-01 13:31:24")))
+ assert(rows(0).getAs[Timestamp](2).equals(Timestamp.valueOf("1996-01-01 01:23:45")))
+ assert(rows(0).getAs[Timestamp](3).equals(Timestamp.valueOf("2009-02-13 23:31:30")))
+ assert(rows(0).getAs[Date](4).equals(Date.valueOf("2001-01-01")))
+ }
+
+ test("String types") {
+ val df = sqlContext.read.jdbc(jdbcUrl, "strings", new Properties)
+ val rows = df.collect()
+ assert(rows.length == 1)
+ val types = rows(0).toSeq.map(x => x.getClass.toString)
+ assert(types.length == 9)
+ assert(types(0).equals("class java.lang.String"))
+ assert(types(1).equals("class java.lang.String"))
+ assert(types(2).equals("class java.lang.String"))
+ assert(types(3).equals("class java.lang.String"))
+ assert(types(4).equals("class java.lang.String"))
+ assert(types(5).equals("class java.lang.String"))
+ assert(types(6).equals("class [B"))
+ assert(types(7).equals("class [B"))
+ assert(types(8).equals("class [B"))
+ assert(rows(0).getString(0).equals("the"))
+ assert(rows(0).getString(1).equals("quick"))
+ assert(rows(0).getString(2).equals("brown"))
+ assert(rows(0).getString(3).equals("fox"))
+ assert(rows(0).getString(4).equals("jumps"))
+ assert(rows(0).getString(5).equals("over"))
+ assert(java.util.Arrays.equals(rows(0).getAs[Array[Byte]](6), Array[Byte](116, 104, 101, 0)))
+ assert(java.util.Arrays.equals(rows(0).getAs[Array[Byte]](7), Array[Byte](108, 97, 122, 121)))
+ assert(java.util.Arrays.equals(rows(0).getAs[Array[Byte]](8), Array[Byte](100, 111, 103)))
+ }
+
+ test("Basic write test") {
+ val df1 = sqlContext.read.jdbc(jdbcUrl, "numbers", new Properties)
+ val df2 = sqlContext.read.jdbc(jdbcUrl, "dates", new Properties)
+ val df3 = sqlContext.read.jdbc(jdbcUrl, "strings", new Properties)
+ df1.write.jdbc(jdbcUrl, "numberscopy", new Properties)
+ df2.write.jdbc(jdbcUrl, "datescopy", new Properties)
+ df3.write.jdbc(jdbcUrl, "stringscopy", new Properties)
+ }
+}
diff --git a/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/OracleIntegrationSuite.scala b/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/OracleIntegrationSuite.scala
new file mode 100644
index 0000000000..8a0f938f7e
--- /dev/null
+++ b/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/OracleIntegrationSuite.scala
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.jdbc
+
+import java.sql.Connection
+import java.util.Properties
+
+import org.apache.spark.sql.test.SharedSQLContext
+import org.apache.spark.tags.DockerTest
+
+/**
+ * This integration suite was written and tested against the Oracle Docker image. The
+ * ojdbc6-11.2.0.2.0.jar would normally be pulled from the Maven repository, but since no
+ * such JDBC jar is published there, the jar was downloaded manually from the Oracle site
+ * and installed into the local Maven repository for testing. For a SparkQA test run, the
+ * ojdbc jar would likewise have to be placed manually in the local Maven repository
+ * (com/oracle/ojdbc6/11.2.0.2.0).
+ *
+ * The steps to test this suite are:
+ * 1. Pull the Oracle 11g image - docker pull wnameless/oracle-xe-11g
+ * 2. Start Docker - sudo service docker start
+ * 3. Download the Oracle 11g driver jar and put it in the local Maven repo:
+ *    (com/oracle/ojdbc6/11.2.0.2.0/ojdbc6-11.2.0.2.0.jar)
+ * 4. Increase the timeout and interval parameters for the Oracle test from 60,1 to high
+ *    values in DockerJDBCIntegrationSuite.scala (locally tested and passing with 200,200).
+ * 5. Run the Spark test - ./build/sbt "test-only org.apache.spark.sql.jdbc.OracleIntegrationSuite"
+ *
+ * All tests in this suite are ignored because of the dependency on the Oracle jar from the
+ * Maven repository.
+ */
+@DockerTest
+class OracleIntegrationSuite extends DockerJDBCIntegrationSuite with SharedSQLContext {
+ import testImplicits._
+
+ override val db = new DatabaseOnDocker {
+ override val imageName = "wnameless/oracle-xe-11g:latest"
+ override val env = Map(
+ "ORACLE_ROOT_PASSWORD" -> "oracle"
+ )
+ override val jdbcPort: Int = 1521
+ override def getJdbcUrl(ip: String, port: Int): String =
+ s"jdbc:oracle:thin:system/oracle@//$ip:$port/xe"
+ }
+
+ override def dataPreparation(conn: Connection): Unit = {
+ }
+
+ ignore("SPARK-12941: String datatypes to be mapped to Varchar in Oracle") {
+ // create a sample dataframe with string type
+ val df1 = sparkContext.parallelize(Seq(("foo"))).toDF("x")
+ // write the dataframe to the oracle table tbl
+ df1.write.jdbc(jdbcUrl, "tbl2", new Properties)
+ // read the table from the oracle
+ val dfRead = sqlContext.read.jdbc(jdbcUrl, "tbl2", new Properties)
+ // get the rows
+ val rows = dfRead.collect()
+ // verify the data type is inserted
+ val types = rows(0).toSeq.map(x => x.getClass.toString)
+ assert(types(0).equals("class java.lang.String"))
+ // verify the value is the inserted correct or not
+ assert(rows(0).getString(0).equals("foo"))
+ }
+}
diff --git a/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/PostgresIntegrationSuite.scala b/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/PostgresIntegrationSuite.scala
new file mode 100644
index 0000000000..d55cdcf28b
--- /dev/null
+++ b/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/PostgresIntegrationSuite.scala
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.jdbc
+
+import java.sql.Connection
+import java.util.Properties
+
+import org.apache.spark.sql.Column
+import org.apache.spark.sql.catalyst.expressions.Literal
+import org.apache.spark.sql.types.{ArrayType, DecimalType}
+import org.apache.spark.tags.DockerTest
+
+@DockerTest
+class PostgresIntegrationSuite extends DockerJDBCIntegrationSuite {
+ override val db = new DatabaseOnDocker {
+ override val imageName = "postgres:9.4.5"
+ override val env = Map(
+ "POSTGRES_PASSWORD" -> "rootpass"
+ )
+ override val jdbcPort = 5432
+ override def getJdbcUrl(ip: String, port: Int): String =
+ s"jdbc:postgresql://$ip:$port/postgres?user=postgres&password=rootpass"
+ }
+
+ override def dataPreparation(conn: Connection): Unit = {
+ conn.prepareStatement("CREATE DATABASE foo").executeUpdate()
+ conn.setCatalog("foo")
+ conn.prepareStatement("CREATE TYPE enum_type AS ENUM ('d1', 'd2')").executeUpdate()
+ conn.prepareStatement("CREATE TABLE bar (c0 text, c1 integer, c2 double precision, c3 bigint, "
+ + "c4 bit(1), c5 bit(10), c6 bytea, c7 boolean, c8 inet, c9 cidr, "
+ + "c10 integer[], c11 text[], c12 real[], c13 numeric(2,2)[], c14 enum_type)").executeUpdate()
+ conn.prepareStatement("INSERT INTO bar VALUES ('hello', 42, 1.25, 123456789012345, B'0', "
+ + "B'1000100101', E'\\\\xDEADBEEF', true, '172.16.0.42', '192.168.0.0/16', "
+ + """'{1, 2}', '{"a", null, "b"}', '{0.11, 0.22}', '{0.11, 0.22}', 'd1')""").executeUpdate()
+ }
+
+ test("Type mapping for various types") {
+ val df = sqlContext.read.jdbc(jdbcUrl, "bar", new Properties)
+ val rows = df.collect()
+ assert(rows.length == 1)
+ val types = rows(0).toSeq.map(x => x.getClass)
+ assert(types.length == 15)
+ assert(classOf[String].isAssignableFrom(types(0)))
+ assert(classOf[java.lang.Integer].isAssignableFrom(types(1)))
+ assert(classOf[java.lang.Double].isAssignableFrom(types(2)))
+ assert(classOf[java.lang.Long].isAssignableFrom(types(3)))
+ assert(classOf[java.lang.Boolean].isAssignableFrom(types(4)))
+ assert(classOf[Array[Byte]].isAssignableFrom(types(5)))
+ assert(classOf[Array[Byte]].isAssignableFrom(types(6)))
+ assert(classOf[java.lang.Boolean].isAssignableFrom(types(7)))
+ assert(classOf[String].isAssignableFrom(types(8)))
+ assert(classOf[String].isAssignableFrom(types(9)))
+ assert(classOf[Seq[Int]].isAssignableFrom(types(10)))
+ assert(classOf[Seq[String]].isAssignableFrom(types(11)))
+ assert(classOf[Seq[Double]].isAssignableFrom(types(12)))
+ assert(classOf[Seq[BigDecimal]].isAssignableFrom(types(13)))
+ assert(classOf[String].isAssignableFrom(types(14)))
+ assert(rows(0).getString(0).equals("hello"))
+ assert(rows(0).getInt(1) == 42)
+ assert(rows(0).getDouble(2) == 1.25)
+ assert(rows(0).getLong(3) == 123456789012345L)
+ assert(!rows(0).getBoolean(4))
+ // BIT(10) values come back as ASCII strings of ten '0' and '1' characters...
+ assert(java.util.Arrays.equals(rows(0).getAs[Array[Byte]](5),
+ Array[Byte](49, 48, 48, 48, 49, 48, 48, 49, 48, 49)))
+ assert(java.util.Arrays.equals(rows(0).getAs[Array[Byte]](6),
+ Array[Byte](0xDE.toByte, 0xAD.toByte, 0xBE.toByte, 0xEF.toByte)))
+ assert(rows(0).getBoolean(7))
+ assert(rows(0).getString(8) == "172.16.0.42")
+ assert(rows(0).getString(9) == "192.168.0.0/16")
+ assert(rows(0).getSeq(10) == Seq(1, 2))
+ assert(rows(0).getSeq(11) == Seq("a", null, "b"))
+ assert(rows(0).getSeq(12).toSeq == Seq(0.11f, 0.22f))
+ assert(rows(0).getSeq(13) == Seq("0.11", "0.22").map(BigDecimal(_).bigDecimal))
+ assert(rows(0).getString(14) == "d1")
+ }
+
+ test("Basic write test") {
+ val df = sqlContext.read.jdbc(jdbcUrl, "bar", new Properties)
+ // Test only that it doesn't crash.
+ df.write.jdbc(jdbcUrl, "public.barcopy", new Properties)
+ // Test that written numeric type has same DataType as input
+ assert(sqlContext.read.jdbc(jdbcUrl, "public.barcopy", new Properties).schema(13).dataType ==
+ ArrayType(DecimalType(2, 2), true))
+ // Test write null values.
+ df.select(df.queryExecution.analyzed.output.map { a =>
+ Column(Literal.create(null, a.dataType)).as(a.name)
+ }: _*).write.jdbc(jdbcUrl, "public.barcopy2", new Properties)
+ }
+}
diff --git a/external/docker-integration-tests/src/test/scala/org/apache/spark/util/DockerUtils.scala b/external/docker-integration-tests/src/test/scala/org/apache/spark/util/DockerUtils.scala
new file mode 100644
index 0000000000..fda377e032
--- /dev/null
+++ b/external/docker-integration-tests/src/test/scala/org/apache/spark/util/DockerUtils.scala
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.util
+
+import java.net.{Inet4Address, InetAddress, NetworkInterface}
+
+import scala.collection.JavaConverters._
+import scala.sys.process._
+import scala.util.Try
+
+private[spark] object DockerUtils {
+
+ def getDockerIp(): String = {
+ /** If docker-machine is set up on this box, attempts to find the IP from it. */
+ def findFromDockerMachine(): Option[String] = {
+ sys.env.get("DOCKER_MACHINE_NAME").flatMap { name =>
+ Try(Seq("/bin/bash", "-c", s"docker-machine ip $name 2>/dev/null").!!.trim).toOption
+ }
+ }
+ sys.env.get("DOCKER_IP")
+ .orElse(findFromDockerMachine())
+ .orElse(Try(Seq("/bin/bash", "-c", "boot2docker ip 2>/dev/null").!!.trim).toOption)
+ .getOrElse {
+ // This block of code is based on Utils.findLocalInetAddress(), but is modified to blacklist
+ // certain interfaces.
+ val address = InetAddress.getLocalHost
+ // Address resolves to something like 127.0.1.1, which happens on Debian; try to find
+ // a better address using the local network interfaces
+ // getNetworkInterfaces returns interfaces in reverse order compared to ifconfig output
+ // order on unix-like systems. On Windows, it returns them in index order.
+ // It is more correct to pick the IP address following the system output order.
+ val blackListedIFs = Seq(
+ "vboxnet0", // Mac
+ "docker0" // Linux
+ )
+ val activeNetworkIFs = NetworkInterface.getNetworkInterfaces.asScala.toSeq.filter { i =>
+ !blackListedIFs.contains(i.getName)
+ }
+ val reOrderedNetworkIFs = activeNetworkIFs.reverse
+ for (ni <- reOrderedNetworkIFs) {
+ val addresses = ni.getInetAddresses.asScala
+ .filterNot(addr => addr.isLinkLocalAddress || addr.isLoopbackAddress).toSeq
+ if (addresses.nonEmpty) {
+ val addr = addresses.find(_.isInstanceOf[Inet4Address]).getOrElse(addresses.head)
+ // because Inet6Address.toHostName may add the interface name at the end if it knows about it
+ val strippedAddress = InetAddress.getByAddress(addr.getAddress)
+ return strippedAddress.getHostAddress
+ }
+ }
+ address.getHostAddress
+ }
+ }
+}
diff --git a/external/docker/README.md b/external/docker/README.md
new file mode 100644
index 0000000000..40ba9c3065
--- /dev/null
+++ b/external/docker/README.md
@@ -0,0 +1,7 @@
+Spark docker files
+===========
+
+Drawn from Matt Massie's docker files (https://github.com/massie/dockerfiles),
+as well as some updates from Andre Schumacher (https://github.com/AndreSchumacher/docker).
+
+Tested with Docker version 0.8.1.
diff --git a/external/docker/build b/external/docker/build
new file mode 100755
index 0000000000..253a2fc8dd
--- /dev/null
+++ b/external/docker/build
@@ -0,0 +1,22 @@
+#!/bin/bash
+
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+docker images > /dev/null || { echo Please install docker in non-sudo mode. ; exit; }
+
+./spark-test/build
\ No newline at end of file
diff --git a/external/docker/spark-mesos/Dockerfile b/external/docker/spark-mesos/Dockerfile
new file mode 100644
index 0000000000..fb3f267fe5
--- /dev/null
+++ b/external/docker/spark-mesos/Dockerfile
@@ -0,0 +1,30 @@
+# This is an example Dockerfile for creating a Spark image which can be
+# referenced by the Spark property 'spark.mesos.executor.docker.image'
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+FROM mesosphere/mesos:0.20.1
+
+# Update the base ubuntu image with dependencies needed for Spark
+RUN apt-get update && \
+ apt-get install -y python libnss3 openjdk-7-jre-headless curl
+
+RUN mkdir /opt/spark && \
+ curl http://www.apache.org/dyn/closer.lua/spark/spark-1.4.0/spark-1.4.0-bin-hadoop2.4.tgz \
+ | tar -xzC /opt
+ENV SPARK_HOME /opt/spark
+ENV MESOS_NATIVE_JAVA_LIBRARY /usr/local/lib/libmesos.so
diff --git a/external/docker/spark-test/README.md b/external/docker/spark-test/README.md
new file mode 100644
index 0000000000..ec0baf6e6d
--- /dev/null
+++ b/external/docker/spark-test/README.md
@@ -0,0 +1,11 @@
+Spark Docker files usable for testing and development purposes.
+
+These images are intended to be run like so:
+
+ docker run -v $SPARK_HOME:/opt/spark spark-test-master
+ docker run -v $SPARK_HOME:/opt/spark spark-test-worker spark://<master_ip>:7077
+
+Using this configuration, the containers will have their Spark directories
+mounted to your actual `SPARK_HOME`, allowing you to modify and recompile
+your Spark source and have the changes immediately usable in the docker images
+(without rebuilding them).
diff --git a/external/docker/spark-test/base/Dockerfile b/external/docker/spark-test/base/Dockerfile
new file mode 100644
index 0000000000..76f550f886
--- /dev/null
+++ b/external/docker/spark-test/base/Dockerfile
@@ -0,0 +1,37 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+FROM ubuntu:precise
+
+# Upgrade package index
+# install a few other useful packages plus Open Jdk 7
+# Remove unneeded /var/lib/apt/lists/* after install to reduce the
+# docker image size (by ~30MB)
+RUN apt-get update && \
+ apt-get install -y less openjdk-7-jre-headless net-tools vim-tiny sudo openssh-server && \
+ rm -rf /var/lib/apt/lists/*
+
+ENV SCALA_VERSION 2.11.7
+ENV CDH_VERSION cdh4
+ENV SCALA_HOME /opt/scala-$SCALA_VERSION
+ENV SPARK_HOME /opt/spark
+ENV PATH $SPARK_HOME:$SCALA_HOME/bin:$PATH
+
+# Install Scala
+ADD http://www.scala-lang.org/files/archive/scala-$SCALA_VERSION.tgz /
+RUN (cd / && gunzip < scala-$SCALA_VERSION.tgz)|(cd /opt && tar -xvf -)
+RUN rm /scala-$SCALA_VERSION.tgz
diff --git a/external/docker/spark-test/build b/external/docker/spark-test/build
new file mode 100755
index 0000000000..6f9e197433
--- /dev/null
+++ b/external/docker/spark-test/build
@@ -0,0 +1,22 @@
+#!/bin/bash
+
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+docker build -t spark-test-base spark-test/base/
+docker build -t spark-test-master spark-test/master/
+docker build -t spark-test-worker spark-test/worker/
diff --git a/external/docker/spark-test/master/Dockerfile b/external/docker/spark-test/master/Dockerfile
new file mode 100644
index 0000000000..f729534ab6
--- /dev/null
+++ b/external/docker/spark-test/master/Dockerfile
@@ -0,0 +1,21 @@
+# Spark Master
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+FROM spark-test-base
+ADD default_cmd /root/
+CMD ["/root/default_cmd"]
diff --git a/external/docker/spark-test/master/default_cmd b/external/docker/spark-test/master/default_cmd
new file mode 100755
index 0000000000..5a7da3446f
--- /dev/null
+++ b/external/docker/spark-test/master/default_cmd
@@ -0,0 +1,28 @@
+#!/bin/bash
+
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+IP=$(ip -o -4 addr list eth0 | perl -n -e 'if (m{inet\s([\d\.]+)\/\d+\s}xms) { print $1 }')
+echo "CONTAINER_IP=$IP"
+export SPARK_LOCAL_IP=$IP
+export SPARK_PUBLIC_DNS=$IP
+
+# Avoid the default Docker behavior of mapping our IP address to an unreachable host name
+umount /etc/hosts
+
+/opt/spark/bin/spark-class org.apache.spark.deploy.master.Master -i $IP
diff --git a/external/docker/spark-test/worker/Dockerfile b/external/docker/spark-test/worker/Dockerfile
new file mode 100644
index 0000000000..890febe7b6
--- /dev/null
+++ b/external/docker/spark-test/worker/Dockerfile
@@ -0,0 +1,22 @@
+# Spark Worker
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+FROM spark-test-base
+ENV SPARK_WORKER_PORT 8888
+ADD default_cmd /root/
+ENTRYPOINT ["/root/default_cmd"]
diff --git a/external/docker/spark-test/worker/default_cmd b/external/docker/spark-test/worker/default_cmd
new file mode 100755
index 0000000000..31b06cb0eb
--- /dev/null
+++ b/external/docker/spark-test/worker/default_cmd
@@ -0,0 +1,28 @@
+#!/bin/bash
+
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+IP=$(ip -o -4 addr list eth0 | perl -n -e 'if (m{inet\s([\d\.]+)\/\d+\s}xms) { print $1 }')
+echo "CONTAINER_IP=$IP"
+export SPARK_LOCAL_IP=$IP
+export SPARK_PUBLIC_DNS=$IP
+
+# Avoid the default Docker behavior of mapping our IP address to an unreachable host name
+umount /etc/hosts
+
+/opt/spark/bin/spark-class org.apache.spark.deploy.worker.Worker $1
diff --git a/external/java8-tests/README.md b/external/java8-tests/README.md
new file mode 100644
index 0000000000..dc9e87f2ee
--- /dev/null
+++ b/external/java8-tests/README.md
@@ -0,0 +1,24 @@
+# Java 8 Test Suites
+
+These tests require having Java 8 installed and are isolated from the main Spark build.
+If Java 8 is not your system's default Java version, you will need to point Spark's build
+to your Java location. The set-up depends a bit on the build system:
+
+* Sbt users can either set JAVA_HOME to the location of a Java 8 JDK or explicitly pass
+ `-java-home` to the sbt launch script. If a Java 8 JDK is detected sbt will automatically
+ include the Java 8 test project.
+
+ `$ JAVA_HOME=/opt/jdk1.8.0/ build/sbt clean "test-only org.apache.spark.Java8APISuite"`
+
+* For Maven users,
+
+ Maven can also pick up the Java 8 JDK via JAVA_HOME. However, Maven will not
+ automatically detect the presence of a Java 8 JDK, so the special build profile `-Pjava8-tests`
+ must be used.
+
+ `$ JAVA_HOME=/opt/jdk1.8.0/ mvn clean install -DskipTests`
+ `$ JAVA_HOME=/opt/jdk1.8.0/ mvn test -Pjava8-tests -DwildcardSuites=org.apache.spark.Java8APISuite`
+
+ Note that the above command can only be run from the project root directory since this module
+ depends on core and the test-jars of core and streaming. This means an install step is
+ required to make the test dependencies visible to the Java 8 sub-project.
diff --git a/external/java8-tests/pom.xml b/external/java8-tests/pom.xml
new file mode 100644
index 0000000000..0ad9c5303a
--- /dev/null
+++ b/external/java8-tests/pom.xml
@@ -0,0 +1,161 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+~ Licensed to the Apache Software Foundation (ASF) under one or more
+~ contributor license agreements. See the NOTICE file distributed with
+~ this work for additional information regarding copyright ownership.
+~ The ASF licenses this file to You under the Apache License, Version 2.0
+~ (the "License"); you may not use this file except in compliance with
+~ the License. You may obtain a copy of the License at
+~
+~ http://www.apache.org/licenses/LICENSE-2.0
+~
+~ Unless required by applicable law or agreed to in writing, software
+~ distributed under the License is distributed on an "AS IS" BASIS,
+~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+~ See the License for the specific language governing permissions and
+~ limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <parent>
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-parent_2.11</artifactId>
+ <version>2.0.0-SNAPSHOT</version>
+ <relativePath>../../pom.xml</relativePath>
+ </parent>
+
+ <groupId>org.apache.spark</groupId>
+ <artifactId>java8-tests_2.11</artifactId>
+ <packaging>pom</packaging>
+ <name>Spark Project Java8 Tests POM</name>
+
+ <properties>
+ <sbt.project.name>java8-tests</sbt.project.name>
+ </properties>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-core_${scala.binary.version}</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-core_${scala.binary.version}</artifactId>
+ <version>${project.version}</version>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-streaming_${scala.binary.version}</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-streaming_${scala.binary.version}</artifactId>
+ <version>${project.version}</version>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-test-tags_${scala.binary.version}</artifactId>
+ </dependency>
+ </dependencies>
+
+ <profiles>
+ <profile>
+ <id>java8-tests</id>
+ </profile>
+ </profiles>
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-deploy-plugin</artifactId>
+ <configuration>
+ <skip>true</skip>
+ </configuration>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-install-plugin</artifactId>
+ <configuration>
+ <skip>true</skip>
+ </configuration>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-surefire-plugin</artifactId>
+ <executions>
+ <execution>
+ <id>test</id>
+ <goals>
+ <goal>test</goal>
+ </goals>
+ </execution>
+ </executions>
+ <configuration>
+ <systemPropertyVariables>
+ <!-- For some reason surefire isn't setting this log4j file on the
+ test classpath automatically. So we add it manually. -->
+ <log4j.configuration>
+ file:src/test/resources/log4j.properties
+ </log4j.configuration>
+ </systemPropertyVariables>
+ <skipTests>false</skipTests>
+ <includes>
+ <include>**/Suite*.java</include>
+ <include>**/*Suite.java</include>
+ </includes>
+ </configuration>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-compiler-plugin</artifactId>
+ <executions>
+ <execution>
+ <id>test-compile-first</id>
+ <phase>process-test-resources</phase>
+ <goals>
+ <goal>testCompile</goal>
+ </goals>
+ </execution>
+ </executions>
+ <configuration>
+ <fork>true</fork>
+ <verbose>true</verbose>
+ <forceJavacCompilerUse>true</forceJavacCompilerUse>
+ <source>1.8</source>
+ <compilerVersion>1.8</compilerVersion>
+ <target>1.8</target>
+ <encoding>UTF-8</encoding>
+ <maxmem>1024m</maxmem>
+ </configuration>
+ </plugin>
+ <plugin>
+ <!-- disabled -->
+ <groupId>net.alchim31.maven</groupId>
+ <artifactId>scala-maven-plugin</artifactId>
+ <executions>
+ <execution>
+ <phase>none</phase>
+ </execution>
+ <execution>
+ <id>scala-compile-first</id>
+ <phase>none</phase>
+ </execution>
+ <execution>
+ <id>scala-test-compile-first</id>
+ <phase>none</phase>
+ </execution>
+ <execution>
+ <id>attach-scaladocs</id>
+ <phase>none</phase>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+</project>
diff --git a/external/java8-tests/src/test/java/org/apache/spark/Java8APISuite.java b/external/java8-tests/src/test/java/org/apache/spark/Java8APISuite.java
new file mode 100644
index 0000000000..c0b58e713f
--- /dev/null
+++ b/external/java8-tests/src/test/java/org/apache/spark/Java8APISuite.java
@@ -0,0 +1,393 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark;
+
+import java.io.File;
+import java.io.Serializable;
+import java.util.*;
+
+import scala.Tuple2;
+
+import com.google.common.collect.Iterables;
+import com.google.common.io.Files;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.SequenceFileOutputFormat;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import org.apache.spark.api.java.JavaDoubleRDD;
+import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.api.java.Optional;
+import org.apache.spark.api.java.function.*;
+import org.apache.spark.util.Utils;
+
+/**
+ * Most of these tests replicate org.apache.spark.JavaAPISuite using java 8
+ * lambda syntax.
+ */
+public class Java8APISuite implements Serializable {
+ static int foreachCalls = 0;
+ private transient JavaSparkContext sc;
+
+ @Before
+ public void setUp() {
+ sc = new JavaSparkContext("local", "JavaAPISuite");
+ }
+
+ @After
+ public void tearDown() {
+ sc.stop();
+ sc = null;
+ }
+
+ @Test
+ public void foreachWithAnonymousClass() {
+ foreachCalls = 0;
+ JavaRDD<String> rdd = sc.parallelize(Arrays.asList("Hello", "World"));
+ rdd.foreach(new VoidFunction<String>() {
+ @Override
+ public void call(String s) {
+ foreachCalls++;
+ }
+ });
+ Assert.assertEquals(2, foreachCalls);
+ }
+
+ @Test
+ public void foreach() {
+ foreachCalls = 0;
+ JavaRDD<String> rdd = sc.parallelize(Arrays.asList("Hello", "World"));
+ rdd.foreach(x -> foreachCalls++);
+ Assert.assertEquals(2, foreachCalls);
+ }
+
+ @Test
+ public void groupBy() {
+ JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 1, 2, 3, 5, 8, 13));
+ Function<Integer, Boolean> isEven = x -> x % 2 == 0;
+ JavaPairRDD<Boolean, Iterable<Integer>> oddsAndEvens = rdd.groupBy(isEven);
+ Assert.assertEquals(2, oddsAndEvens.count());
+ Assert.assertEquals(2, Iterables.size(oddsAndEvens.lookup(true).get(0))); // Evens
+ Assert.assertEquals(5, Iterables.size(oddsAndEvens.lookup(false).get(0))); // Odds
+
+ oddsAndEvens = rdd.groupBy(isEven, 1);
+ Assert.assertEquals(2, oddsAndEvens.count());
+ Assert.assertEquals(2, Iterables.size(oddsAndEvens.lookup(true).get(0))); // Evens
+ Assert.assertEquals(5, Iterables.size(oddsAndEvens.lookup(false).get(0))); // Odds
+ }
+
+ @Test
+ public void leftOuterJoin() {
+ JavaPairRDD<Integer, Integer> rdd1 = sc.parallelizePairs(Arrays.asList(
+ new Tuple2<>(1, 1),
+ new Tuple2<>(1, 2),
+ new Tuple2<>(2, 1),
+ new Tuple2<>(3, 1)
+ ));
+ JavaPairRDD<Integer, Character> rdd2 = sc.parallelizePairs(Arrays.asList(
+ new Tuple2<>(1, 'x'),
+ new Tuple2<>(2, 'y'),
+ new Tuple2<>(2, 'z'),
+ new Tuple2<>(4, 'w')
+ ));
+ List<Tuple2<Integer, Tuple2<Integer, Optional<Character>>>> joined =
+ rdd1.leftOuterJoin(rdd2).collect();
+ Assert.assertEquals(5, joined.size());
+ Tuple2<Integer, Tuple2<Integer, Optional<Character>>> firstUnmatched =
+ rdd1.leftOuterJoin(rdd2).filter(tup -> !tup._2()._2().isPresent()).first();
+ Assert.assertEquals(3, firstUnmatched._1().intValue());
+ }
+
+ @Test
+ public void foldReduce() {
+ JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 1, 2, 3, 5, 8, 13));
+ Function2<Integer, Integer, Integer> add = (a, b) -> a + b;
+
+ int sum = rdd.fold(0, add);
+ Assert.assertEquals(33, sum);
+
+ sum = rdd.reduce(add);
+ Assert.assertEquals(33, sum);
+ }
+
+ @Test
+ public void foldByKey() {
+ List<Tuple2<Integer, Integer>> pairs = Arrays.asList(
+ new Tuple2<>(2, 1),
+ new Tuple2<>(2, 1),
+ new Tuple2<>(1, 1),
+ new Tuple2<>(3, 2),
+ new Tuple2<>(3, 1)
+ );
+ JavaPairRDD<Integer, Integer> rdd = sc.parallelizePairs(pairs);
+ JavaPairRDD<Integer, Integer> sums = rdd.foldByKey(0, (a, b) -> a + b);
+ Assert.assertEquals(1, sums.lookup(1).get(0).intValue());
+ Assert.assertEquals(2, sums.lookup(2).get(0).intValue());
+ Assert.assertEquals(3, sums.lookup(3).get(0).intValue());
+ }
+
+ @Test
+ public void reduceByKey() {
+ List<Tuple2<Integer, Integer>> pairs = Arrays.asList(
+ new Tuple2<>(2, 1),
+ new Tuple2<>(2, 1),
+ new Tuple2<>(1, 1),
+ new Tuple2<>(3, 2),
+ new Tuple2<>(3, 1)
+ );
+ JavaPairRDD<Integer, Integer> rdd = sc.parallelizePairs(pairs);
+ JavaPairRDD<Integer, Integer> counts = rdd.reduceByKey((a, b) -> a + b);
+ Assert.assertEquals(1, counts.lookup(1).get(0).intValue());
+ Assert.assertEquals(2, counts.lookup(2).get(0).intValue());
+ Assert.assertEquals(3, counts.lookup(3).get(0).intValue());
+
+ Map<Integer, Integer> localCounts = counts.collectAsMap();
+ Assert.assertEquals(1, localCounts.get(1).intValue());
+ Assert.assertEquals(2, localCounts.get(2).intValue());
+ Assert.assertEquals(3, localCounts.get(3).intValue());
+
+ localCounts = rdd.reduceByKeyLocally((a, b) -> a + b);
+ Assert.assertEquals(1, localCounts.get(1).intValue());
+ Assert.assertEquals(2, localCounts.get(2).intValue());
+ Assert.assertEquals(3, localCounts.get(3).intValue());
+ }
+
+ @Test
+ public void map() {
+ JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5));
+ JavaDoubleRDD doubles = rdd.mapToDouble(x -> 1.0 * x).cache();
+ doubles.collect();
+ JavaPairRDD<Integer, Integer> pairs = rdd.mapToPair(x -> new Tuple2<>(x, x))
+ .cache();
+ pairs.collect();
+ JavaRDD<String> strings = rdd.map(Object::toString).cache();
+ strings.collect();
+ }
+
+ @Test
+ public void flatMap() {
+ JavaRDD<String> rdd = sc.parallelize(Arrays.asList("Hello World!",
+ "The quick brown fox jumps over the lazy dog."));
+ JavaRDD<String> words = rdd.flatMap(x -> Arrays.asList(x.split(" ")));
+
+ Assert.assertEquals("Hello", words.first());
+ Assert.assertEquals(11, words.count());
+
+ JavaPairRDD<String, String> pairs = rdd.flatMapToPair(s -> {
+ List<Tuple2<String, String>> pairs2 = new LinkedList<>();
+ for (String word : s.split(" ")) {
+ pairs2.add(new Tuple2<>(word, word));
+ }
+ return pairs2;
+ });
+
+ Assert.assertEquals(new Tuple2<>("Hello", "Hello"), pairs.first());
+ Assert.assertEquals(11, pairs.count());
+
+ JavaDoubleRDD doubles = rdd.flatMapToDouble(s -> {
+ List<Double> lengths = new LinkedList<>();
+ for (String word : s.split(" ")) {
+ lengths.add((double) word.length());
+ }
+ return lengths;
+ });
+
+ Assert.assertEquals(5.0, doubles.first(), 0.01);
+ Assert.assertEquals(11, doubles.count());
+ }
+
+ @Test
+ public void mapsFromPairsToPairs() {
+ List<Tuple2<Integer, String>> pairs = Arrays.asList(
+ new Tuple2<>(1, "a"),
+ new Tuple2<>(2, "aa"),
+ new Tuple2<>(3, "aaa")
+ );
+ JavaPairRDD<Integer, String> pairRDD = sc.parallelizePairs(pairs);
+
+ // Regression test for SPARK-668:
+ JavaPairRDD<String, Integer> swapped =
+ pairRDD.flatMapToPair(x -> Collections.singletonList(x.swap()));
+ swapped.collect();
+
+ // There was never a bug here, but it's worth testing:
+ pairRDD.map(Tuple2::swap).collect();
+ }
+
+ @Test
+ public void mapPartitions() {
+ JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4), 2);
+ JavaRDD<Integer> partitionSums = rdd.mapPartitions(iter -> {
+ int sum = 0;
+ while (iter.hasNext()) {
+ sum += iter.next();
+ }
+ return Collections.singletonList(sum);
+ });
+
+ Assert.assertEquals("[3, 7]", partitionSums.collect().toString());
+ }
+
+ @Test
+ public void sequenceFile() {
+ File tempDir = Files.createTempDir();
+ tempDir.deleteOnExit();
+ String outputDir = new File(tempDir, "output").getAbsolutePath();
+ List<Tuple2<Integer, String>> pairs = Arrays.asList(
+ new Tuple2<>(1, "a"),
+ new Tuple2<>(2, "aa"),
+ new Tuple2<>(3, "aaa")
+ );
+ JavaPairRDD<Integer, String> rdd = sc.parallelizePairs(pairs);
+
+ rdd.mapToPair(pair -> new Tuple2<>(new IntWritable(pair._1()), new Text(pair._2())))
+ .saveAsHadoopFile(outputDir, IntWritable.class, Text.class, SequenceFileOutputFormat.class);
+
+ // Try reading the output back as an object file
+ JavaPairRDD<Integer, String> readRDD = sc.sequenceFile(outputDir, IntWritable.class, Text.class)
+ .mapToPair(pair -> new Tuple2<>(pair._1().get(), pair._2().toString()));
+ Assert.assertEquals(pairs, readRDD.collect());
+ Utils.deleteRecursively(tempDir);
+ }
+
+ @Test
+ public void zip() {
+ JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5));
+ JavaDoubleRDD doubles = rdd.mapToDouble(x -> 1.0 * x);
+ JavaPairRDD<Integer, Double> zipped = rdd.zip(doubles);
+ zipped.count();
+ }
+
+ @Test
+ public void zipPartitions() {
+ JavaRDD<Integer> rdd1 = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5, 6), 2);
+ JavaRDD<String> rdd2 = sc.parallelize(Arrays.asList("1", "2", "3", "4"), 2);
+ FlatMapFunction2<Iterator<Integer>, Iterator<String>, Integer> sizesFn =
+ (Iterator<Integer> i, Iterator<String> s) -> {
+ int sizeI = 0;
+ while (i.hasNext()) {
+ sizeI += 1;
+ i.next();
+ }
+ int sizeS = 0;
+ while (s.hasNext()) {
+ sizeS += 1;
+ s.next();
+ }
+ return Arrays.asList(sizeI, sizeS).iterator();
+ };
+ JavaRDD<Integer> sizes = rdd1.zipPartitions(rdd2, sizesFn);
+ Assert.assertEquals("[3, 2, 3, 2]", sizes.collect().toString());
+ }
+
+ @Test
+ public void accumulators() {
+ JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5));
+
+ Accumulator<Integer> intAccum = sc.intAccumulator(10);
+ rdd.foreach(intAccum::add);
+ Assert.assertEquals((Integer) 25, intAccum.value());
+
+ Accumulator<Double> doubleAccum = sc.doubleAccumulator(10.0);
+ rdd.foreach(x -> doubleAccum.add((double) x));
+ Assert.assertEquals((Double) 25.0, doubleAccum.value());
+
+ // Try a custom accumulator type
+ AccumulatorParam<Float> floatAccumulatorParam = new AccumulatorParam<Float>() {
+ @Override
+ public Float addInPlace(Float r, Float t) {
+ return r + t;
+ }
+ @Override
+ public Float addAccumulator(Float r, Float t) {
+ return r + t;
+ }
+ @Override
+ public Float zero(Float initialValue) {
+ return 0.0f;
+ }
+ };
+
+ Accumulator<Float> floatAccum = sc.accumulator(10.0f, floatAccumulatorParam);
+ rdd.foreach(x -> floatAccum.add((float) x));
+ Assert.assertEquals((Float) 25.0f, floatAccum.value());
+
+ // Test the setValue method
+ floatAccum.setValue(5.0f);
+ Assert.assertEquals((Float) 5.0f, floatAccum.value());
+ }
+
+ @Test
+ public void keyBy() {
+ JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2));
+ List<Tuple2<String, Integer>> s = rdd.keyBy(Object::toString).collect();
+ Assert.assertEquals(new Tuple2<>("1", 1), s.get(0));
+ Assert.assertEquals(new Tuple2<>("2", 2), s.get(1));
+ }
+
+ @Test
+ public void mapOnPairRDD() {
+ JavaRDD<Integer> rdd1 = sc.parallelize(Arrays.asList(1, 2, 3, 4));
+ JavaPairRDD<Integer, Integer> rdd2 =
+ rdd1.mapToPair(i -> new Tuple2<>(i, i % 2));
+ JavaPairRDD<Integer, Integer> rdd3 =
+ rdd2.mapToPair(in -> new Tuple2<>(in._2(), in._1()));
+ Assert.assertEquals(Arrays.asList(
+ new Tuple2<>(1, 1),
+ new Tuple2<>(0, 2),
+ new Tuple2<>(1, 3),
+ new Tuple2<>(0, 4)), rdd3.collect());
+ }
+
+ @Test
+ public void collectPartitions() {
+ JavaRDD<Integer> rdd1 = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5, 6, 7), 3);
+
+ JavaPairRDD<Integer, Integer> rdd2 =
+ rdd1.mapToPair(i -> new Tuple2<>(i, i % 2));
+ List<Integer>[] parts = rdd1.collectPartitions(new int[]{0});
+ Assert.assertEquals(Arrays.asList(1, 2), parts[0]);
+
+ parts = rdd1.collectPartitions(new int[]{1, 2});
+ Assert.assertEquals(Arrays.asList(3, 4), parts[0]);
+ Assert.assertEquals(Arrays.asList(5, 6, 7), parts[1]);
+
+ Assert.assertEquals(Arrays.asList(new Tuple2<>(1, 1), new Tuple2<>(2, 0)),
+ rdd2.collectPartitions(new int[]{0})[0]);
+
+ List<Tuple2<Integer, Integer>>[] parts2 = rdd2.collectPartitions(new int[]{1, 2});
+ Assert.assertEquals(Arrays.asList(new Tuple2<>(3, 1), new Tuple2<>(4, 0)), parts2[0]);
+ Assert.assertEquals(Arrays.asList(new Tuple2<>(5, 1), new Tuple2<>(6, 0), new Tuple2<>(7, 1)),
+ parts2[1]);
+ }
+
+ @Test
+ public void collectAsMapWithIntArrayValues() {
+ // Regression test for SPARK-1040
+ JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1));
+ JavaPairRDD<Integer, int[]> pairRDD =
+ rdd.mapToPair(x -> new Tuple2<>(x, new int[]{x}));
+ pairRDD.collect(); // Works fine
+ pairRDD.collectAsMap(); // Used to crash with ClassCastException
+ }
+}
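
For context, here is a minimal standalone sketch of the pattern the suite above exercises: the same Spark transformation written once as an anonymous inner class and once as a Java 8 lambda. It is illustrative only and not part of the patch; the class and variable names are hypothetical.

import java.util.Arrays;

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;

public class LambdaVsAnonymousClassExample {
  public static void main(String[] args) {
    JavaSparkContext sc = new JavaSparkContext("local", "LambdaVsAnonymousClassExample");
    JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5));

    // Pre-Java-8 style: an anonymous inner class implementing Function.
    JavaRDD<Integer> squaredOld = rdd.map(new Function<Integer, Integer>() {
      @Override
      public Integer call(Integer x) {
        return x * x;
      }
    });

    // Java 8 style: the same transformation as a lambda, which is what the
    // tests above verify compiles and behaves identically.
    JavaRDD<Integer> squaredNew = rdd.map(x -> x * x);

    System.out.println(squaredOld.collect()); // [1, 4, 9, 16, 25]
    System.out.println(squaredNew.collect()); // [1, 4, 9, 16, 25]
    sc.stop();
  }
}
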
diff --git a/external/java8-tests/src/test/java/org/apache/spark/streaming/Java8APISuite.java b/external/java8-tests/src/test/java/org/apache/spark/streaming/Java8APISuite.java
new file mode 100644
index 0000000000..604d818ef1
--- /dev/null
+++ b/external/java8-tests/src/test/java/org/apache/spark/streaming/Java8APISuite.java
@@ -0,0 +1,905 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming;
+
+import java.io.Serializable;
+import java.util.*;
+
+import scala.Tuple2;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+import org.junit.Assert;
+import org.junit.Test;
+
+import org.apache.spark.Accumulator;
+import org.apache.spark.HashPartitioner;
+import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.Optional;
+import org.apache.spark.api.java.function.PairFunction;
+import org.apache.spark.streaming.api.java.JavaDStream;
+import org.apache.spark.streaming.api.java.JavaPairDStream;
+import org.apache.spark.streaming.api.java.JavaMapWithStateDStream;
+
+/**
+ * Most of these tests replicate org.apache.spark.streaming.JavaAPISuite using Java 8
+ * lambda syntax.
+ */
+@SuppressWarnings("unchecked")
+public class Java8APISuite extends LocalJavaStreamingContext implements Serializable {
+
+ @Test
+ public void testMap() {
+ List<List<String>> inputData = Arrays.asList(
+ Arrays.asList("hello", "world"),
+ Arrays.asList("goodnight", "moon"));
+
+ List<List<Integer>> expected = Arrays.asList(
+ Arrays.asList(5, 5),
+ Arrays.asList(9, 4));
+
+ JavaDStream<String> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
+ JavaDStream<Integer> letterCount = stream.map(String::length);
+ JavaTestUtils.attachTestOutputStream(letterCount);
+ List<List<Integer>> result = JavaTestUtils.runStreams(ssc, 2, 2);
+
+ assertOrderInvariantEquals(expected, result);
+ }
+
+ @Test
+ public void testFilter() {
+ List<List<String>> inputData = Arrays.asList(
+ Arrays.asList("giants", "dodgers"),
+ Arrays.asList("yankees", "red sox"));
+
+ List<List<String>> expected = Arrays.asList(
+ Arrays.asList("giants"),
+ Arrays.asList("yankees"));
+
+ JavaDStream<String> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
+ JavaDStream<String> filtered = stream.filter(s -> s.contains("a"));
+ JavaTestUtils.attachTestOutputStream(filtered);
+ List<List<String>> result = JavaTestUtils.runStreams(ssc, 2, 2);
+
+ assertOrderInvariantEquals(expected, result);
+ }
+
+ @Test
+ public void testMapPartitions() {
+ List<List<String>> inputData = Arrays.asList(
+ Arrays.asList("giants", "dodgers"),
+ Arrays.asList("yankees", "red sox"));
+
+ List<List<String>> expected = Arrays.asList(
+ Arrays.asList("GIANTSDODGERS"),
+ Arrays.asList("YANKEESRED SOX"));
+
+ JavaDStream<String> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
+ JavaDStream<String> mapped = stream.mapPartitions(in -> {
+ String out = "";
+ while (in.hasNext()) {
+ out = out + in.next().toUpperCase();
+ }
+ return Lists.newArrayList(out);
+ });
+ JavaTestUtils.attachTestOutputStream(mapped);
+ List<List<String>> result = JavaTestUtils.runStreams(ssc, 2, 2);
+
+ Assert.assertEquals(expected, result);
+ }
+
+ @Test
+ public void testReduce() {
+ List<List<Integer>> inputData = Arrays.asList(
+ Arrays.asList(1, 2, 3),
+ Arrays.asList(4, 5, 6),
+ Arrays.asList(7, 8, 9));
+
+ List<List<Integer>> expected = Arrays.asList(
+ Arrays.asList(6),
+ Arrays.asList(15),
+ Arrays.asList(24));
+
+ JavaDStream<Integer> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
+ JavaDStream<Integer> reduced = stream.reduce((x, y) -> x + y);
+ JavaTestUtils.attachTestOutputStream(reduced);
+ List<List<Integer>> result = JavaTestUtils.runStreams(ssc, 3, 3);
+
+ Assert.assertEquals(expected, result);
+ }
+
+ @Test
+ public void testReduceByWindow() {
+ List<List<Integer>> inputData = Arrays.asList(
+ Arrays.asList(1, 2, 3),
+ Arrays.asList(4, 5, 6),
+ Arrays.asList(7, 8, 9));
+
+ List<List<Integer>> expected = Arrays.asList(
+ Arrays.asList(6),
+ Arrays.asList(21),
+ Arrays.asList(39),
+ Arrays.asList(24));
+
+ JavaDStream<Integer> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
+ JavaDStream<Integer> reducedWindowed = stream.reduceByWindow((x, y) -> x + y,
+ (x, y) -> x - y, new Duration(2000), new Duration(1000));
+ JavaTestUtils.attachTestOutputStream(reducedWindowed);
+ List<List<Integer>> result = JavaTestUtils.runStreams(ssc, 4, 4);
+
+ Assert.assertEquals(expected, result);
+ }
+
+ @Test
+ public void testTransform() {
+ List<List<Integer>> inputData = Arrays.asList(
+ Arrays.asList(1, 2, 3),
+ Arrays.asList(4, 5, 6),
+ Arrays.asList(7, 8, 9));
+
+ List<List<Integer>> expected = Arrays.asList(
+ Arrays.asList(3, 4, 5),
+ Arrays.asList(6, 7, 8),
+ Arrays.asList(9, 10, 11));
+
+ JavaDStream<Integer> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
+ JavaDStream<Integer> transformed = stream.transform(in -> in.map(i -> i + 2));
+
+ JavaTestUtils.attachTestOutputStream(transformed);
+ List<List<Integer>> result = JavaTestUtils.runStreams(ssc, 3, 3);
+
+ assertOrderInvariantEquals(expected, result);
+ }
+
+ @Test
+ public void testVariousTransform() {
+ // tests whether all variations of transform can be called from Java
+
+ List<List<Integer>> inputData = Arrays.asList(Arrays.asList(1));
+ JavaDStream<Integer> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
+
+ List<List<Tuple2<String, Integer>>> pairInputData =
+ Arrays.asList(Arrays.asList(new Tuple2<>("x", 1)));
+ JavaPairDStream<String, Integer> pairStream = JavaPairDStream.fromJavaDStream(
+ JavaTestUtils.attachTestInputStream(ssc, pairInputData, 1));
+
+ JavaDStream<Integer> transformed1 = stream.transform(in -> null);
+ JavaDStream<Integer> transformed2 = stream.transform((x, time) -> null);
+ JavaPairDStream<String, Integer> transformed3 = stream.transformToPair(x -> null);
+ JavaPairDStream<String, Integer> transformed4 = stream.transformToPair((x, time) -> null);
+ JavaDStream<Integer> pairTransformed1 = pairStream.transform(x -> null);
+ JavaDStream<Integer> pairTransformed2 = pairStream.transform((x, time) -> null);
+ JavaPairDStream<String, String> pairTransformed3 = pairStream.transformToPair(x -> null);
+ JavaPairDStream<String, String> pairTransformed4 =
+ pairStream.transformToPair((x, time) -> null);
+
+ }
+
+ @Test
+ public void testTransformWith() {
+ List<List<Tuple2<String, String>>> stringStringKVStream1 = Arrays.asList(
+ Arrays.asList(
+ new Tuple2<>("california", "dodgers"),
+ new Tuple2<>("new york", "yankees")),
+ Arrays.asList(
+ new Tuple2<>("california", "sharks"),
+ new Tuple2<>("new york", "rangers")));
+
+ List<List<Tuple2<String, String>>> stringStringKVStream2 = Arrays.asList(
+ Arrays.asList(
+ new Tuple2<>("california", "giants"),
+ new Tuple2<>("new york", "mets")),
+ Arrays.asList(
+ new Tuple2<>("california", "ducks"),
+ new Tuple2<>("new york", "islanders")));
+
+
+ List<Set<Tuple2<String, Tuple2<String, String>>>> expected = Arrays.asList(
+ Sets.newHashSet(
+ new Tuple2<>("california",
+ new Tuple2<>("dodgers", "giants")),
+ new Tuple2<>("new york",
+ new Tuple2<>("yankees", "mets"))),
+ Sets.newHashSet(
+ new Tuple2<>("california",
+ new Tuple2<>("sharks", "ducks")),
+ new Tuple2<>("new york",
+ new Tuple2<>("rangers", "islanders"))));
+
+ JavaDStream<Tuple2<String, String>> stream1 = JavaTestUtils.attachTestInputStream(
+ ssc, stringStringKVStream1, 1);
+ JavaPairDStream<String, String> pairStream1 = JavaPairDStream.fromJavaDStream(stream1);
+
+ JavaDStream<Tuple2<String, String>> stream2 = JavaTestUtils.attachTestInputStream(
+ ssc, stringStringKVStream2, 1);
+ JavaPairDStream<String, String> pairStream2 = JavaPairDStream.fromJavaDStream(stream2);
+
+ JavaPairDStream<String, Tuple2<String, String>> joined =
+ pairStream1.transformWithToPair(pairStream2, (x, y, z) -> x.join(y));
+
+ JavaTestUtils.attachTestOutputStream(joined);
+ List<List<Tuple2<String, Tuple2<String, String>>>> result = JavaTestUtils.runStreams(ssc, 2, 2);
+ List<Set<Tuple2<String, Tuple2<String, String>>>> unorderedResult = Lists.newArrayList();
+ for (List<Tuple2<String, Tuple2<String, String>>> res : result) {
+ unorderedResult.add(Sets.newHashSet(res));
+ }
+
+ Assert.assertEquals(expected, unorderedResult);
+ }
+
+
+ @Test
+ public void testVariousTransformWith() {
+ // tests whether all variations of transformWith can be called from Java
+
+ List<List<Integer>> inputData1 = Arrays.asList(Arrays.asList(1));
+ List<List<String>> inputData2 = Arrays.asList(Arrays.asList("x"));
+ JavaDStream<Integer> stream1 = JavaTestUtils.attachTestInputStream(ssc, inputData1, 1);
+ JavaDStream<String> stream2 = JavaTestUtils.attachTestInputStream(ssc, inputData2, 1);
+
+ List<List<Tuple2<String, Integer>>> pairInputData1 =
+ Arrays.asList(Arrays.asList(new Tuple2<>("x", 1)));
+ List<List<Tuple2<Double, Character>>> pairInputData2 =
+ Arrays.asList(Arrays.asList(new Tuple2<>(1.0, 'x')));
+ JavaPairDStream<String, Integer> pairStream1 = JavaPairDStream.fromJavaDStream(
+ JavaTestUtils.attachTestInputStream(ssc, pairInputData1, 1));
+ JavaPairDStream<Double, Character> pairStream2 = JavaPairDStream.fromJavaDStream(
+ JavaTestUtils.attachTestInputStream(ssc, pairInputData2, 1));
+
+ JavaDStream<Double> transformed1 = stream1.transformWith(stream2, (x, y, z) -> null);
+ JavaDStream<Double> transformed2 = stream1.transformWith(pairStream1, (x, y, z) -> null);
+
+ JavaPairDStream<Double, Double> transformed3 =
+ stream1.transformWithToPair(stream2, (x, y, z) -> null);
+
+ JavaPairDStream<Double, Double> transformed4 =
+ stream1.transformWithToPair(pairStream1, (x, y, z) -> null);
+
+ JavaDStream<Double> pairTransformed1 = pairStream1.transformWith(stream2, (x, y, z) -> null);
+
+ JavaDStream<Double> pairTransformed2_ =
+ pairStream1.transformWith(pairStream1, (x, y, z) -> null);
+
+ JavaPairDStream<Double, Double> pairTransformed3 =
+ pairStream1.transformWithToPair(stream2, (x, y, z) -> null);
+
+ JavaPairDStream<Double, Double> pairTransformed4 =
+ pairStream1.transformWithToPair(pairStream2, (x, y, z) -> null);
+ }
+
+ @Test
+ public void testStreamingContextTransform() {
+ List<List<Integer>> stream1input = Arrays.asList(
+ Arrays.asList(1),
+ Arrays.asList(2)
+ );
+
+ List<List<Integer>> stream2input = Arrays.asList(
+ Arrays.asList(3),
+ Arrays.asList(4)
+ );
+
+ List<List<Tuple2<Integer, String>>> pairStream1input = Arrays.asList(
+ Arrays.asList(new Tuple2<>(1, "x")),
+ Arrays.asList(new Tuple2<>(2, "y"))
+ );
+
+ List<List<Tuple2<Integer, Tuple2<Integer, String>>>> expected = Arrays.asList(
+ Arrays.asList(new Tuple2<>(1, new Tuple2<>(1, "x"))),
+ Arrays.asList(new Tuple2<>(2, new Tuple2<>(2, "y")))
+ );
+
+ JavaDStream<Integer> stream1 = JavaTestUtils.attachTestInputStream(ssc, stream1input, 1);
+ JavaDStream<Integer> stream2 = JavaTestUtils.attachTestInputStream(ssc, stream2input, 1);
+ JavaPairDStream<Integer, String> pairStream1 = JavaPairDStream.fromJavaDStream(
+ JavaTestUtils.attachTestInputStream(ssc, pairStream1input, 1));
+
+ List<JavaDStream<?>> listOfDStreams1 = Arrays.<JavaDStream<?>>asList(stream1, stream2);
+
+ // This is just to test whether this transform to JavaStream compiles
+ JavaDStream<Long> transformed1 = ssc.transform(
+ listOfDStreams1, (List<JavaRDD<?>> listOfRDDs, Time time) -> {
+ Assert.assertEquals(2, listOfRDDs.size());
+ return null;
+ });
+
+ List<JavaDStream<?>> listOfDStreams2 =
+ Arrays.<JavaDStream<?>>asList(stream1, stream2, pairStream1.toJavaDStream());
+
+ JavaPairDStream<Integer, Tuple2<Integer, String>> transformed2 = ssc.transformToPair(
+ listOfDStreams2, (List<JavaRDD<?>> listOfRDDs, Time time) -> {
+ Assert.assertEquals(3, listOfRDDs.size());
+ JavaRDD<Integer> rdd1 = (JavaRDD<Integer>) listOfRDDs.get(0);
+ JavaRDD<Integer> rdd2 = (JavaRDD<Integer>) listOfRDDs.get(1);
+ JavaRDD<Tuple2<Integer, String>> rdd3 = (JavaRDD<Tuple2<Integer, String>>) listOfRDDs.get(2);
+ JavaPairRDD<Integer, String> prdd3 = JavaPairRDD.fromJavaRDD(rdd3);
+ PairFunction<Integer, Integer, Integer> mapToTuple =
+ (Integer i) -> new Tuple2<>(i, i);
+ return rdd1.union(rdd2).mapToPair(mapToTuple).join(prdd3);
+ });
+ JavaTestUtils.attachTestOutputStream(transformed2);
+ List<List<Tuple2<Integer, Tuple2<Integer, String>>>> result =
+ JavaTestUtils.runStreams(ssc, 2, 2);
+ Assert.assertEquals(expected, result);
+ }
+
+ @Test
+ public void testFlatMap() {
+ List<List<String>> inputData = Arrays.asList(
+ Arrays.asList("go", "giants"),
+ Arrays.asList("boo", "dodgers"),
+ Arrays.asList("athletics"));
+
+ List<List<String>> expected = Arrays.asList(
+ Arrays.asList("g", "o", "g", "i", "a", "n", "t", "s"),
+ Arrays.asList("b", "o", "o", "d", "o", "d", "g", "e", "r", "s"),
+ Arrays.asList("a", "t", "h", "l", "e", "t", "i", "c", "s"));
+
+ JavaDStream<String> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
+ JavaDStream<String> flatMapped = stream.flatMap(s -> Lists.newArrayList(s.split("(?!^)")));
+ JavaTestUtils.attachTestOutputStream(flatMapped);
+ List<List<String>> result = JavaTestUtils.runStreams(ssc, 3, 3);
+
+ assertOrderInvariantEquals(expected, result);
+ }
+
+ @Test
+ public void testForeachRDD() {
+ final Accumulator<Integer> accumRdd = ssc.sc().accumulator(0);
+ final Accumulator<Integer> accumEle = ssc.sc().accumulator(0);
+ List<List<Integer>> inputData = Arrays.asList(
+ Arrays.asList(1,1,1),
+ Arrays.asList(1,1,1));
+
+ JavaDStream<Integer> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
+ JavaTestUtils.attachTestOutputStream(stream.count()); // dummy output
+
+ stream.foreachRDD(rdd -> {
+ accumRdd.add(1);
+ rdd.foreach(x -> accumEle.add(1));
+ });
+
+ // This is a test to make sure foreachRDD(VoidFunction2) can be called from Java
+ stream.foreachRDD((rdd, time) -> null);
+
+ JavaTestUtils.runStreams(ssc, 2, 2);
+
+ Assert.assertEquals(2, accumRdd.value().intValue());
+ Assert.assertEquals(6, accumEle.value().intValue());
+ }
+
+ @Test
+ public void testPairFlatMap() {
+ List<List<String>> inputData = Arrays.asList(
+ Arrays.asList("giants"),
+ Arrays.asList("dodgers"),
+ Arrays.asList("athletics"));
+
+ List<List<Tuple2<Integer, String>>> expected = Arrays.asList(
+ Arrays.asList(
+ new Tuple2<>(6, "g"),
+ new Tuple2<>(6, "i"),
+ new Tuple2<>(6, "a"),
+ new Tuple2<>(6, "n"),
+ new Tuple2<>(6, "t"),
+ new Tuple2<>(6, "s")),
+ Arrays.asList(
+ new Tuple2<>(7, "d"),
+ new Tuple2<>(7, "o"),
+ new Tuple2<>(7, "d"),
+ new Tuple2<>(7, "g"),
+ new Tuple2<>(7, "e"),
+ new Tuple2<>(7, "r"),
+ new Tuple2<>(7, "s")),
+ Arrays.asList(
+ new Tuple2<>(9, "a"),
+ new Tuple2<>(9, "t"),
+ new Tuple2<>(9, "h"),
+ new Tuple2<>(9, "l"),
+ new Tuple2<>(9, "e"),
+ new Tuple2<>(9, "t"),
+ new Tuple2<>(9, "i"),
+ new Tuple2<>(9, "c"),
+ new Tuple2<>(9, "s")));
+
+ JavaDStream<String> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
+ JavaPairDStream<Integer, String> flatMapped = stream.flatMapToPair(s -> {
+ List<Tuple2<Integer, String>> out = Lists.newArrayList();
+ for (String letter : s.split("(?!^)")) {
+ out.add(new Tuple2<>(s.length(), letter));
+ }
+ return out;
+ });
+
+ JavaTestUtils.attachTestOutputStream(flatMapped);
+ List<List<Tuple2<Integer, String>>> result = JavaTestUtils.runStreams(ssc, 3, 3);
+
+ Assert.assertEquals(expected, result);
+ }
+
+ /*
+ * Performs an order-invariant comparison of lists representing two RDD streams. This allows
+ * us to account for ordering variation within individual RDD's which occurs during windowing.
+ */
+ public static <T extends Comparable<T>> void assertOrderInvariantEquals(
+ List<List<T>> expected, List<List<T>> actual) {
+ expected.forEach(list -> Collections.sort(list));
+ List<List<T>> sortedActual = new ArrayList<>();
+ actual.forEach(list -> {
+ List<T> sortedList = new ArrayList<>(list);
+ Collections.sort(sortedList);
+ sortedActual.add(sortedList);
+ });
+ Assert.assertEquals(expected, sortedActual);
+ }
+
+ @Test
+ public void testPairFilter() {
+ List<List<String>> inputData = Arrays.asList(
+ Arrays.asList("giants", "dodgers"),
+ Arrays.asList("yankees", "red sox"));
+
+ List<List<Tuple2<String, Integer>>> expected = Arrays.asList(
+ Arrays.asList(new Tuple2<>("giants", 6)),
+ Arrays.asList(new Tuple2<>("yankees", 7)));
+
+ JavaDStream<String> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
+ JavaPairDStream<String, Integer> pairStream =
+ stream.mapToPair(x -> new Tuple2<>(x, x.length()));
+ JavaPairDStream<String, Integer> filtered = pairStream.filter(x -> x._1().contains("a"));
+ JavaTestUtils.attachTestOutputStream(filtered);
+ List<List<Tuple2<String, Integer>>> result = JavaTestUtils.runStreams(ssc, 2, 2);
+
+ Assert.assertEquals(expected, result);
+ }
+
+ List<List<Tuple2<String, String>>> stringStringKVStream = Arrays.asList(
+ Arrays.asList(new Tuple2<>("california", "dodgers"),
+ new Tuple2<>("california", "giants"),
+ new Tuple2<>("new york", "yankees"),
+ new Tuple2<>("new york", "mets")),
+ Arrays.asList(new Tuple2<>("california", "sharks"),
+ new Tuple2<>("california", "ducks"),
+ new Tuple2<>("new york", "rangers"),
+ new Tuple2<>("new york", "islanders")));
+
+ List<List<Tuple2<String, Integer>>> stringIntKVStream = Arrays.asList(
+ Arrays.asList(
+ new Tuple2<>("california", 1),
+ new Tuple2<>("california", 3),
+ new Tuple2<>("new york", 4),
+ new Tuple2<>("new york", 1)),
+ Arrays.asList(
+ new Tuple2<>("california", 5),
+ new Tuple2<>("california", 5),
+ new Tuple2<>("new york", 3),
+ new Tuple2<>("new york", 1)));
+
+ @Test
+ public void testPairMap() { // Maps pair -> pair of different type
+ List<List<Tuple2<String, Integer>>> inputData = stringIntKVStream;
+
+ List<List<Tuple2<Integer, String>>> expected = Arrays.asList(
+ Arrays.asList(
+ new Tuple2<>(1, "california"),
+ new Tuple2<>(3, "california"),
+ new Tuple2<>(4, "new york"),
+ new Tuple2<>(1, "new york")),
+ Arrays.asList(
+ new Tuple2<>(5, "california"),
+ new Tuple2<>(5, "california"),
+ new Tuple2<>(3, "new york"),
+ new Tuple2<>(1, "new york")));
+
+ JavaDStream<Tuple2<String, Integer>> stream =
+ JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
+ JavaPairDStream<String, Integer> pairStream = JavaPairDStream.fromJavaDStream(stream);
+ JavaPairDStream<Integer, String> reversed = pairStream.mapToPair(x -> x.swap());
+ JavaTestUtils.attachTestOutputStream(reversed);
+ List<List<Tuple2<Integer, String>>> result = JavaTestUtils.runStreams(ssc, 2, 2);
+
+ Assert.assertEquals(expected, result);
+ }
+
+ @Test
+ public void testPairMapPartitions() { // Maps pair -> pair of different type
+ List<List<Tuple2<String, Integer>>> inputData = stringIntKVStream;
+
+ List<List<Tuple2<Integer, String>>> expected = Arrays.asList(
+ Arrays.asList(
+ new Tuple2<>(1, "california"),
+ new Tuple2<>(3, "california"),
+ new Tuple2<>(4, "new york"),
+ new Tuple2<>(1, "new york")),
+ Arrays.asList(
+ new Tuple2<>(5, "california"),
+ new Tuple2<>(5, "california"),
+ new Tuple2<>(3, "new york"),
+ new Tuple2<>(1, "new york")));
+
+ JavaDStream<Tuple2<String, Integer>> stream =
+ JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
+ JavaPairDStream<String, Integer> pairStream = JavaPairDStream.fromJavaDStream(stream);
+ JavaPairDStream<Integer, String> reversed = pairStream.mapPartitionsToPair(in -> {
+ LinkedList<Tuple2<Integer, String>> out = new LinkedList<>();
+ while (in.hasNext()) {
+ Tuple2<String, Integer> next = in.next();
+ out.add(next.swap());
+ }
+ return out;
+ });
+
+ JavaTestUtils.attachTestOutputStream(reversed);
+ List<List<Tuple2<Integer, String>>> result = JavaTestUtils.runStreams(ssc, 2, 2);
+
+ Assert.assertEquals(expected, result);
+ }
+
+ @Test
+ public void testPairMap2() { // Maps pair -> single
+ List<List<Tuple2<String, Integer>>> inputData = stringIntKVStream;
+
+ List<List<Integer>> expected = Arrays.asList(
+ Arrays.asList(1, 3, 4, 1),
+ Arrays.asList(5, 5, 3, 1));
+
+ JavaDStream<Tuple2<String, Integer>> stream =
+ JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
+ JavaPairDStream<String, Integer> pairStream = JavaPairDStream.fromJavaDStream(stream);
+ JavaDStream<Integer> reversed = pairStream.map(in -> in._2());
+ JavaTestUtils.attachTestOutputStream(reversed);
+ List<List<Integer>> result = JavaTestUtils.runStreams(ssc, 2, 2);
+
+ Assert.assertEquals(expected, result);
+ }
+
+ @Test
+ public void testPairToPairFlatMapWithChangingTypes() { // Maps pair -> pair
+ List<List<Tuple2<String, Integer>>> inputData = Arrays.asList(
+ Arrays.asList(
+ new Tuple2<>("hi", 1),
+ new Tuple2<>("ho", 2)),
+ Arrays.asList(
+ new Tuple2<>("hi", 1),
+ new Tuple2<>("ho", 2)));
+
+ List<List<Tuple2<Integer, String>>> expected = Arrays.asList(
+ Arrays.asList(
+ new Tuple2<>(1, "h"),
+ new Tuple2<>(1, "i"),
+ new Tuple2<>(2, "h"),
+ new Tuple2<>(2, "o")),
+ Arrays.asList(
+ new Tuple2<>(1, "h"),
+ new Tuple2<>(1, "i"),
+ new Tuple2<>(2, "h"),
+ new Tuple2<>(2, "o")));
+
+ JavaDStream<Tuple2<String, Integer>> stream =
+ JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
+ JavaPairDStream<String, Integer> pairStream = JavaPairDStream.fromJavaDStream(stream);
+ JavaPairDStream<Integer, String> flatMapped = pairStream.flatMapToPair(in -> {
+ List<Tuple2<Integer, String>> out = new LinkedList<>();
+ for (Character s : in._1().toCharArray()) {
+ out.add(new Tuple2<>(in._2(), s.toString()));
+ }
+ return out;
+ });
+
+ JavaTestUtils.attachTestOutputStream(flatMapped);
+ List<List<Tuple2<Integer, String>>> result = JavaTestUtils.runStreams(ssc, 2, 2);
+
+ Assert.assertEquals(expected, result);
+ }
+
+ @Test
+ public void testPairReduceByKey() {
+ List<List<Tuple2<String, Integer>>> inputData = stringIntKVStream;
+
+ List<List<Tuple2<String, Integer>>> expected = Arrays.asList(
+ Arrays.asList(
+ new Tuple2<>("california", 4),
+ new Tuple2<>("new york", 5)),
+ Arrays.asList(
+ new Tuple2<>("california", 10),
+ new Tuple2<>("new york", 4)));
+
+ JavaDStream<Tuple2<String, Integer>> stream = JavaTestUtils.attachTestInputStream(
+ ssc, inputData, 1);
+ JavaPairDStream<String, Integer> pairStream = JavaPairDStream.fromJavaDStream(stream);
+
+ JavaPairDStream<String, Integer> reduced = pairStream.reduceByKey((x, y) -> x + y);
+
+ JavaTestUtils.attachTestOutputStream(reduced);
+ List<List<Tuple2<String, Integer>>> result = JavaTestUtils.runStreams(ssc, 2, 2);
+
+ Assert.assertEquals(expected, result);
+ }
+
+ @Test
+ public void testCombineByKey() {
+ List<List<Tuple2<String, Integer>>> inputData = stringIntKVStream;
+
+ List<List<Tuple2<String, Integer>>> expected = Arrays.asList(
+ Arrays.asList(
+ new Tuple2<>("california", 4),
+ new Tuple2<>("new york", 5)),
+ Arrays.asList(
+ new Tuple2<>("california", 10),
+ new Tuple2<>("new york", 4)));
+
+ JavaDStream<Tuple2<String, Integer>> stream = JavaTestUtils.attachTestInputStream(
+ ssc, inputData, 1);
+ JavaPairDStream<String, Integer> pairStream = JavaPairDStream.fromJavaDStream(stream);
+
+ JavaPairDStream<String, Integer> combined = pairStream.<Integer>combineByKey(i -> i,
+ (x, y) -> x + y, (x, y) -> x + y, new HashPartitioner(2));
+
+ JavaTestUtils.attachTestOutputStream(combined);
+ List<List<Tuple2<String, Integer>>> result = JavaTestUtils.runStreams(ssc, 2, 2);
+
+ Assert.assertEquals(expected, result);
+ }
+
+ @Test
+ public void testReduceByKeyAndWindow() {
+ List<List<Tuple2<String, Integer>>> inputData = stringIntKVStream;
+
+ List<List<Tuple2<String, Integer>>> expected = Arrays.asList(
+ Arrays.asList(new Tuple2<>("california", 4),
+ new Tuple2<>("new york", 5)),
+ Arrays.asList(new Tuple2<>("california", 14),
+ new Tuple2<>("new york", 9)),
+ Arrays.asList(new Tuple2<>("california", 10),
+ new Tuple2<>("new york", 4)));
+
+ JavaDStream<Tuple2<String, Integer>> stream =
+ JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
+ JavaPairDStream<String, Integer> pairStream = JavaPairDStream.fromJavaDStream(stream);
+
+ JavaPairDStream<String, Integer> reduceWindowed =
+ pairStream.reduceByKeyAndWindow((x, y) -> x + y, new Duration(2000), new Duration(1000));
+ JavaTestUtils.attachTestOutputStream(reduceWindowed);
+ List<List<Tuple2<String, Integer>>> result = JavaTestUtils.runStreams(ssc, 3, 3);
+
+ Assert.assertEquals(expected, result);
+ }
+
+ @Test
+ public void testUpdateStateByKey() {
+ List<List<Tuple2<String, Integer>>> inputData = stringIntKVStream;
+
+ List<List<Tuple2<String, Integer>>> expected = Arrays.asList(
+ Arrays.asList(new Tuple2<>("california", 4),
+ new Tuple2<>("new york", 5)),
+ Arrays.asList(new Tuple2<>("california", 14),
+ new Tuple2<>("new york", 9)),
+ Arrays.asList(new Tuple2<>("california", 14),
+ new Tuple2<>("new york", 9)));
+
+ JavaDStream<Tuple2<String, Integer>> stream =
+ JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
+ JavaPairDStream<String, Integer> pairStream = JavaPairDStream.fromJavaDStream(stream);
+
+ JavaPairDStream<String, Integer> updated = pairStream.updateStateByKey((values, state) -> {
+ int out = 0;
+ if (state.isPresent()) {
+ out = out + state.get();
+ }
+ for (Integer v : values) {
+ out = out + v;
+ }
+ return Optional.of(out);
+ });
+
+ JavaTestUtils.attachTestOutputStream(updated);
+ List<List<Tuple2<String, Integer>>> result = JavaTestUtils.runStreams(ssc, 3, 3);
+
+ Assert.assertEquals(expected, result);
+ }
+
+ @Test
+ public void testReduceByKeyAndWindowWithInverse() {
+ List<List<Tuple2<String, Integer>>> inputData = stringIntKVStream;
+
+ List<List<Tuple2<String, Integer>>> expected = Arrays.asList(
+ Arrays.asList(new Tuple2<>("california", 4),
+ new Tuple2<>("new york", 5)),
+ Arrays.asList(new Tuple2<>("california", 14),
+ new Tuple2<>("new york", 9)),
+ Arrays.asList(new Tuple2<>("california", 10),
+ new Tuple2<>("new york", 4)));
+
+ JavaDStream<Tuple2<String, Integer>> stream =
+ JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
+ JavaPairDStream<String, Integer> pairStream = JavaPairDStream.fromJavaDStream(stream);
+
+ JavaPairDStream<String, Integer> reduceWindowed =
+ pairStream.reduceByKeyAndWindow((x, y) -> x + y, (x, y) -> x - y, new Duration(2000),
+ new Duration(1000));
+ JavaTestUtils.attachTestOutputStream(reduceWindowed);
+ List<List<Tuple2<String, Integer>>> result = JavaTestUtils.runStreams(ssc, 3, 3);
+
+ Assert.assertEquals(expected, result);
+ }
+
+ @Test
+ public void testPairTransform() {
+ List<List<Tuple2<Integer, Integer>>> inputData = Arrays.asList(
+ Arrays.asList(
+ new Tuple2<>(3, 5),
+ new Tuple2<>(1, 5),
+ new Tuple2<>(4, 5),
+ new Tuple2<>(2, 5)),
+ Arrays.asList(
+ new Tuple2<>(2, 5),
+ new Tuple2<>(3, 5),
+ new Tuple2<>(4, 5),
+ new Tuple2<>(1, 5)));
+
+ List<List<Tuple2<Integer, Integer>>> expected = Arrays.asList(
+ Arrays.asList(
+ new Tuple2<>(1, 5),
+ new Tuple2<>(2, 5),
+ new Tuple2<>(3, 5),
+ new Tuple2<>(4, 5)),
+ Arrays.asList(
+ new Tuple2<>(1, 5),
+ new Tuple2<>(2, 5),
+ new Tuple2<>(3, 5),
+ new Tuple2<>(4, 5)));
+
+ JavaDStream<Tuple2<Integer, Integer>> stream = JavaTestUtils.attachTestInputStream(
+ ssc, inputData, 1);
+ JavaPairDStream<Integer, Integer> pairStream = JavaPairDStream.fromJavaDStream(stream);
+
+ JavaPairDStream<Integer, Integer> sorted = pairStream.transformToPair(in -> in.sortByKey());
+
+ JavaTestUtils.attachTestOutputStream(sorted);
+ List<List<Tuple2<Integer, Integer>>> result = JavaTestUtils.runStreams(ssc, 2, 2);
+
+ Assert.assertEquals(expected, result);
+ }
+
+ @Test
+ public void testPairToNormalRDDTransform() {
+ List<List<Tuple2<Integer, Integer>>> inputData = Arrays.asList(
+ Arrays.asList(
+ new Tuple2<>(3, 5),
+ new Tuple2<>(1, 5),
+ new Tuple2<>(4, 5),
+ new Tuple2<>(2, 5)),
+ Arrays.asList(
+ new Tuple2<>(2, 5),
+ new Tuple2<>(3, 5),
+ new Tuple2<>(4, 5),
+ new Tuple2<>(1, 5)));
+
+ List<List<Integer>> expected = Arrays.asList(
+ Arrays.asList(3, 1, 4, 2),
+ Arrays.asList(2, 3, 4, 1));
+
+ JavaDStream<Tuple2<Integer, Integer>> stream = JavaTestUtils.attachTestInputStream(
+ ssc, inputData, 1);
+ JavaPairDStream<Integer, Integer> pairStream = JavaPairDStream.fromJavaDStream(stream);
+ JavaDStream<Integer> firstParts = pairStream.transform(in -> in.map(x -> x._1()));
+ JavaTestUtils.attachTestOutputStream(firstParts);
+ List<List<Integer>> result = JavaTestUtils.runStreams(ssc, 2, 2);
+
+ Assert.assertEquals(expected, result);
+ }
+
+ @Test
+ public void testMapValues() {
+ List<List<Tuple2<String, String>>> inputData = stringStringKVStream;
+
+ List<List<Tuple2<String, String>>> expected = Arrays.asList(
+ Arrays.asList(new Tuple2<>("california", "DODGERS"),
+ new Tuple2<>("california", "GIANTS"),
+ new Tuple2<>("new york", "YANKEES"),
+ new Tuple2<>("new york", "METS")),
+ Arrays.asList(new Tuple2<>("california", "SHARKS"),
+ new Tuple2<>("california", "DUCKS"),
+ new Tuple2<>("new york", "RANGERS"),
+ new Tuple2<>("new york", "ISLANDERS")));
+
+ JavaDStream<Tuple2<String, String>> stream = JavaTestUtils.attachTestInputStream(
+ ssc, inputData, 1);
+ JavaPairDStream<String, String> pairStream = JavaPairDStream.fromJavaDStream(stream);
+
+ JavaPairDStream<String, String> mapped = pairStream.mapValues(String::toUpperCase);
+ JavaTestUtils.attachTestOutputStream(mapped);
+ List<List<Tuple2<String, String>>> result = JavaTestUtils.runStreams(ssc, 2, 2);
+
+ Assert.assertEquals(expected, result);
+ }
+
+ @Test
+ public void testFlatMapValues() {
+ List<List<Tuple2<String, String>>> inputData = stringStringKVStream;
+
+ List<List<Tuple2<String, String>>> expected = Arrays.asList(
+ Arrays.asList(new Tuple2<>("california", "dodgers1"),
+ new Tuple2<>("california", "dodgers2"),
+ new Tuple2<>("california", "giants1"),
+ new Tuple2<>("california", "giants2"),
+ new Tuple2<>("new york", "yankees1"),
+ new Tuple2<>("new york", "yankees2"),
+ new Tuple2<>("new york", "mets1"),
+ new Tuple2<>("new york", "mets2")),
+ Arrays.asList(new Tuple2<>("california", "sharks1"),
+ new Tuple2<>("california", "sharks2"),
+ new Tuple2<>("california", "ducks1"),
+ new Tuple2<>("california", "ducks2"),
+ new Tuple2<>("new york", "rangers1"),
+ new Tuple2<>("new york", "rangers2"),
+ new Tuple2<>("new york", "islanders1"),
+ new Tuple2<>("new york", "islanders2")));
+
+ JavaDStream<Tuple2<String, String>> stream = JavaTestUtils.attachTestInputStream(
+ ssc, inputData, 1);
+ JavaPairDStream<String, String> pairStream = JavaPairDStream.fromJavaDStream(stream);
+
+ JavaPairDStream<String, String> flatMapped =
+ pairStream.flatMapValues(in -> Arrays.asList(in + "1", in + "2"));
+ JavaTestUtils.attachTestOutputStream(flatMapped);
+ List<List<Tuple2<String, String>>> result = JavaTestUtils.runStreams(ssc, 2, 2);
+ Assert.assertEquals(expected, result);
+ }
+
+ /**
+ * This test is only for testing the APIs. It's not necessary to run it.
+ */
+ public void testMapWithStateAPI() {
+ JavaPairRDD<String, Boolean> initialRDD = null;
+ JavaPairDStream<String, Integer> wordsDstream = null;
+
+ JavaMapWithStateDStream<String, Integer, Boolean, Double> stateDstream =
+ wordsDstream.mapWithState(
+ StateSpec.<String, Integer, Boolean, Double> function((time, key, value, state) -> {
+ // Use all State's methods here
+ state.exists();
+ state.get();
+ state.isTimingOut();
+ state.remove();
+ state.update(true);
+ return Optional.of(2.0);
+ }).initialState(initialRDD)
+ .numPartitions(10)
+ .partitioner(new HashPartitioner(10))
+ .timeout(Durations.seconds(10)));
+
+ JavaPairDStream<String, Boolean> emittedRecords = stateDstream.stateSnapshots();
+
+ JavaMapWithStateDStream<String, Integer, Boolean, Double> stateDstream2 =
+ wordsDstream.mapWithState(
+ StateSpec.<String, Integer, Boolean, Double>function((key, value, state) -> {
+ state.exists();
+ state.get();
+ state.isTimingOut();
+ state.remove();
+ state.update(true);
+ return 2.0;
+ }).initialState(initialRDD)
+ .numPartitions(10)
+ .partitioner(new HashPartitioner(10))
+ .timeout(Durations.seconds(10)));
+
+ JavaPairDStream<String, Boolean> mappedDStream = stateDstream2.stateSnapshots();
+ }
+}
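
The streaming tests above all share one harness pattern: attach a list of per-batch input data to the test StreamingContext, apply the lambda-based transformation under test, attach an output collector, and run a fixed number of batches. Below is a minimal sketch of that pattern, assuming the spark-streaming test helpers used by the suite itself (JavaTestUtils, LocalJavaStreamingContext) are on the test classpath; the class name and sample data are illustrative only.

package org.apache.spark.streaming;

import java.io.Serializable;
import java.util.Arrays;
import java.util.List;

import org.junit.Assert;
import org.junit.Test;

import org.apache.spark.streaming.api.java.JavaDStream;

// Hypothetical test class, not part of this patch.
public class HarnessPatternSketch extends LocalJavaStreamingContext implements Serializable {

  @Test
  public void mapToUpperCase() {
    // One inner list per batch interval.
    List<List<String>> input = Arrays.asList(
        Arrays.asList("spark", "streaming"),
        Arrays.asList("java", "eight"));

    // 1. Attach the input batches to the shared streaming context `ssc`.
    JavaDStream<String> stream = JavaTestUtils.attachTestInputStream(ssc, input, 1);

    // 2. Apply the lambda-based transformation under test.
    JavaDStream<String> upper = stream.map(String::toUpperCase);

    // 3. Register an output collector, then run two batches and expect two output batches.
    JavaTestUtils.attachTestOutputStream(upper);
    List<List<String>> result = JavaTestUtils.runStreams(ssc, 2, 2);

    Assert.assertEquals(Arrays.asList(
        Arrays.asList("SPARK", "STREAMING"),
        Arrays.asList("JAVA", "EIGHT")), result);
  }
}
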
diff --git a/external/java8-tests/src/test/resources/log4j.properties b/external/java8-tests/src/test/resources/log4j.properties
new file mode 100644
index 0000000000..eb3b1999eb
--- /dev/null
+++ b/external/java8-tests/src/test/resources/log4j.properties
@@ -0,0 +1,28 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# Set everything to be logged to the file target/unit-tests.log
+log4j.rootCategory=INFO, file
+log4j.appender.file=org.apache.log4j.FileAppender
+log4j.appender.file.append=true
+log4j.appender.file.file=target/unit-tests.log
+log4j.appender.file.layout=org.apache.log4j.PatternLayout
+log4j.appender.file.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss.SSS} %t %p %c{1}: %m%n
+
+# Ignore messages below warning level from Jetty, because it's a bit verbose
+log4j.logger.org.spark-project.jetty=WARN
+org.spark-project.jetty.LEVEL=WARN
diff --git a/external/java8-tests/src/test/scala/org/apache/spark/JDK8ScalaSuite.scala b/external/java8-tests/src/test/scala/org/apache/spark/JDK8ScalaSuite.scala
new file mode 100644
index 0000000000..fa0681db41
--- /dev/null
+++ b/external/java8-tests/src/test/scala/org/apache/spark/JDK8ScalaSuite.scala
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark
+
+/**
+ * Test cases where JDK8-compiled Scala user code is used with Spark.
+ */
+class JDK8ScalaSuite extends SparkFunSuite with SharedSparkContext {
+ test("basic RDD closure test (SPARK-6152)") {
+ sc.parallelize(1 to 1000).map(x => x * x).count()
+ }
+}
diff --git a/external/kinesis-asl-assembly/pom.xml b/external/kinesis-asl-assembly/pom.xml
new file mode 100644
index 0000000000..d1c38c7ca5
--- /dev/null
+++ b/external/kinesis-asl-assembly/pom.xml
@@ -0,0 +1,181 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one or more
+ ~ contributor license agreements. See the NOTICE file distributed with
+ ~ this work for additional information regarding copyright ownership.
+ ~ The ASF licenses this file to You under the Apache License, Version 2.0
+ ~ (the "License"); you may not use this file except in compliance with
+ ~ the License. You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ~ See the License for the specific language governing permissions and
+ ~ limitations under the License.
+ -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <parent>
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-parent_2.11</artifactId>
+ <version>2.0.0-SNAPSHOT</version>
+ <relativePath>../../pom.xml</relativePath>
+ </parent>
+
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-streaming-kinesis-asl-assembly_2.11</artifactId>
+ <packaging>jar</packaging>
+ <name>Spark Project Kinesis Assembly</name>
+ <url>http://spark.apache.org/</url>
+
+ <properties>
+ <sbt.project.name>streaming-kinesis-asl-assembly</sbt.project.name>
+ </properties>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-streaming-kinesis-asl_${scala.binary.version}</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-streaming_${scala.binary.version}</artifactId>
+ <version>${project.version}</version>
+ <scope>provided</scope>
+ </dependency>
+ <!--
+ Demote dependencies that are already included in the Spark assembly to 'provided' scope.
+ -->
+ <dependency>
+ <groupId>com.fasterxml.jackson.core</groupId>
+ <artifactId>jackson-databind</artifactId>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>commons-lang</groupId>
+ <artifactId>commons-lang</artifactId>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>com.google.protobuf</groupId>
+ <artifactId>protobuf-java</artifactId>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>com.sun.jersey</groupId>
+ <artifactId>jersey-server</artifactId>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>com.sun.jersey</groupId>
+ <artifactId>jersey-core</artifactId>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>log4j</groupId>
+ <artifactId>log4j</artifactId>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>net.java.dev.jets3t</groupId>
+ <artifactId>jets3t</artifactId>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-client</artifactId>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.avro</groupId>
+ <artifactId>avro-ipc</artifactId>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.avro</groupId>
+ <artifactId>avro-mapred</artifactId>
+ <classifier>${avro.mapred.classifier}</classifier>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.curator</groupId>
+ <artifactId>curator-recipes</artifactId>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.zookeeper</groupId>
+ <artifactId>zookeeper</artifactId>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-api</artifactId>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-log4j12</artifactId>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.xerial.snappy</groupId>
+ <artifactId>snappy-java</artifactId>
+ <scope>provided</scope>
+ </dependency>
+ </dependencies>
+
+ <build>
+ <outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory>
+ <testOutputDirectory>target/scala-${scala.binary.version}/test-classes</testOutputDirectory>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-shade-plugin</artifactId>
+ <configuration>
+ <shadedArtifactAttached>false</shadedArtifactAttached>
+ <artifactSet>
+ <includes>
+ <include>*:*</include>
+ </includes>
+ </artifactSet>
+ <filters>
+ <filter>
+ <artifact>*:*</artifact>
+ <excludes>
+ <exclude>META-INF/*.SF</exclude>
+ <exclude>META-INF/*.DSA</exclude>
+ <exclude>META-INF/*.RSA</exclude>
+ </excludes>
+ </filter>
+ </filters>
+ </configuration>
+ <executions>
+ <execution>
+ <phase>package</phase>
+ <goals>
+ <goal>shade</goal>
+ </goals>
+ <configuration>
+ <transformers>
+ <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
+ <transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
+ <resource>reference.conf</resource>
+ </transformer>
+ <transformer implementation="org.apache.maven.plugins.shade.resource.DontIncludeResourceTransformer">
+ <resource>log4j.properties</resource>
+ </transformer>
+ <transformer implementation="org.apache.maven.plugins.shade.resource.ApacheLicenseResourceTransformer"/>
+ <transformer implementation="org.apache.maven.plugins.shade.resource.ApacheNoticeResourceTransformer"/>
+ </transformers>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+</project>
+
diff --git a/external/kinesis-asl/pom.xml b/external/kinesis-asl/pom.xml
new file mode 100644
index 0000000000..935155eb5d
--- /dev/null
+++ b/external/kinesis-asl/pom.xml
@@ -0,0 +1,87 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+~ Licensed to the Apache Software Foundation (ASF) under one or more
+~ contributor license agreements. See the NOTICE file distributed with
+~ this work for additional information regarding copyright ownership.
+~ The ASF licenses this file to You under the Apache License, Version 2.0
+~ (the "License"); you may not use this file except in compliance with
+~ the License. You may obtain a copy of the License at
+~
+~ http://www.apache.org/licenses/LICENSE-2.0
+~
+~ Unless required by applicable law or agreed to in writing, software
+~ distributed under the License is distributed on an "AS IS" BASIS,
+~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+~ See the License for the specific language governing permissions and
+~ limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <parent>
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-parent_2.11</artifactId>
+ <version>2.0.0-SNAPSHOT</version>
+ <relativePath>../../pom.xml</relativePath>
+ </parent>
+
+ <!-- Kinesis integration is not included by default due to ASL-licensed code. -->
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-streaming-kinesis-asl_2.11</artifactId>
+ <packaging>jar</packaging>
+ <name>Spark Kinesis Integration</name>
+
+ <properties>
+ <sbt.project.name>streaming-kinesis-asl</sbt.project.name>
+ </properties>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-streaming_${scala.binary.version}</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-core_${scala.binary.version}</artifactId>
+ <version>${project.version}</version>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-streaming_${scala.binary.version}</artifactId>
+ <version>${project.version}</version>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>com.amazonaws</groupId>
+ <artifactId>amazon-kinesis-client</artifactId>
+ <version>${aws.kinesis.client.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>com.amazonaws</groupId>
+ <artifactId>amazon-kinesis-producer</artifactId>
+ <version>${aws.kinesis.producer.version}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.mockito</groupId>
+ <artifactId>mockito-core</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.scalacheck</groupId>
+ <artifactId>scalacheck_${scala.binary.version}</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-test-tags_${scala.binary.version}</artifactId>
+ </dependency>
+ </dependencies>
+ <build>
+ <outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory>
+ <testOutputDirectory>target/scala-${scala.binary.version}/test-classes</testOutputDirectory>
+ </build>
+</project>
diff --git a/external/kinesis-asl/src/main/java/org/apache/spark/examples/streaming/JavaKinesisWordCountASL.java b/external/kinesis-asl/src/main/java/org/apache/spark/examples/streaming/JavaKinesisWordCountASL.java
new file mode 100644
index 0000000000..5dc825dfdc
--- /dev/null
+++ b/external/kinesis-asl/src/main/java/org/apache/spark/examples/streaming/JavaKinesisWordCountASL.java
@@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.examples.streaming;
+
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.List;
+import java.util.regex.Pattern;
+
+import com.amazonaws.regions.RegionUtils;
+import org.apache.log4j.Logger;
+import org.apache.spark.SparkConf;
+import org.apache.spark.api.java.function.FlatMapFunction;
+import org.apache.spark.api.java.function.Function2;
+import org.apache.spark.api.java.function.PairFunction;
+import org.apache.spark.storage.StorageLevel;
+import org.apache.spark.streaming.Duration;
+import org.apache.spark.streaming.api.java.JavaDStream;
+import org.apache.spark.streaming.api.java.JavaPairDStream;
+import org.apache.spark.streaming.api.java.JavaStreamingContext;
+import org.apache.spark.streaming.kinesis.KinesisUtils;
+
+import scala.Tuple2;
+
+import com.amazonaws.auth.DefaultAWSCredentialsProviderChain;
+import com.amazonaws.services.kinesis.AmazonKinesisClient;
+import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream;
+
+/**
+ * Consumes messages from an Amazon Kinesis stream and does word count.
+ *
+ * This example spins up 1 Kinesis Receiver per shard for the given stream.
+ * It then starts pulling from the last checkpointed sequence number of the given stream.
+ *
+ * Usage: JavaKinesisWordCountASL [app-name] [stream-name] [endpoint-url]
+ * [app-name] is the name of the consumer app, used to track the read data in DynamoDB
+ * [stream-name] name of the Kinesis stream (e.g. mySparkStream)
+ * [endpoint-url] endpoint of the Kinesis service
+ * (e.g. https://kinesis.us-east-1.amazonaws.com)
+ *
+ *
+ * Example:
+ * # export AWS keys if necessary
+ * $ export AWS_ACCESS_KEY_ID=<your-access-key>
+ * $ export AWS_SECRET_KEY=<your-secret-key>
+ *
+ * # run the example
+ * $ SPARK_HOME/bin/run-example streaming.JavaKinesisWordCountASL myAppName mySparkStream \
+ * https://kinesis.us-east-1.amazonaws.com
+ *
+ * There is a companion helper class called KinesisWordProducerASL which puts dummy data
+ * onto the Kinesis stream.
+ *
+ * This code uses the DefaultAWSCredentialsProviderChain to find credentials
+ * in the following order:
+ * Environment Variables - AWS_ACCESS_KEY_ID and AWS_SECRET_KEY
+ * Java System Properties - aws.accessKeyId and aws.secretKey
+ * Credential profiles file - default location (~/.aws/credentials) shared by all AWS SDKs
+ * Instance profile credentials - delivered through the Amazon EC2 metadata service
+ * For more information, see
+ * http://docs.aws.amazon.com/AWSSdkDocsJava/latest/DeveloperGuide/credentials.html
+ *
+ * See http://spark.apache.org/docs/latest/streaming-kinesis-integration.html for more details on
+ * the Kinesis Spark Streaming integration.
+ */
+public final class JavaKinesisWordCountASL { // needs to be public for access from run-example
+ private static final Pattern WORD_SEPARATOR = Pattern.compile(" ");
+ private static final Logger logger = Logger.getLogger(JavaKinesisWordCountASL.class);
+
+ public static void main(String[] args) {
+ // Check that all required args were passed in.
+ if (args.length != 3) {
+ System.err.println(
+ "Usage: JavaKinesisWordCountASL <stream-name> <endpoint-url>\n\n" +
+ " <app-name> is the name of the app, used to track the read data in DynamoDB\n" +
+ " <stream-name> is the name of the Kinesis stream\n" +
+ " <endpoint-url> is the endpoint of the Kinesis service\n" +
+ " (e.g. https://kinesis.us-east-1.amazonaws.com)\n" +
+ "Generate data for the Kinesis stream using the example KinesisWordProducerASL.\n" +
+ "See http://spark.apache.org/docs/latest/streaming-kinesis-integration.html for more\n" +
+ "details.\n"
+ );
+ System.exit(1);
+ }
+
+ // Set default log4j logging level to WARN to hide Spark logs
+ StreamingExamples.setStreamingLogLevels();
+
+ // Populate the appropriate variables from the given args
+ String kinesisAppName = args[0];
+ String streamName = args[1];
+ String endpointUrl = args[2];
+
+ // Create a Kinesis client in order to determine the number of shards for the given stream
+ AmazonKinesisClient kinesisClient =
+ new AmazonKinesisClient(new DefaultAWSCredentialsProviderChain());
+ kinesisClient.setEndpoint(endpointUrl);
+ int numShards =
+ kinesisClient.describeStream(streamName).getStreamDescription().getShards().size();
+
+
+ // In this example, we're going to create 1 Kinesis Receiver/input DStream for each shard.
+ // This is not a necessity; if there are fewer receivers/DStreams than the number of shards,
+ // then the shards will be automatically distributed among the receivers and each receiver
+ // will receive data from multiple shards.
+ int numStreams = numShards;
+
+ // Spark Streaming batch interval
+ Duration batchInterval = new Duration(2000);
+
+ // Kinesis checkpoint interval. Same as batchInterval for this example.
+ Duration kinesisCheckpointInterval = batchInterval;
+
+ // Get the region name from the endpoint URL to save Kinesis Client Library metadata in
+ // DynamoDB of the same region as the Kinesis stream
+ String regionName = RegionUtils.getRegionByEndpoint(endpointUrl).getName();
+
+ // Setup the Spark config and StreamingContext
+ SparkConf sparkConfig = new SparkConf().setAppName("JavaKinesisWordCountASL");
+ JavaStreamingContext jssc = new JavaStreamingContext(sparkConfig, batchInterval);
+
+ // Create the Kinesis DStreams
+ List<JavaDStream<byte[]>> streamsList = new ArrayList<>(numStreams);
+ for (int i = 0; i < numStreams; i++) {
+ streamsList.add(
+ KinesisUtils.createStream(jssc, kinesisAppName, streamName, endpointUrl, regionName,
+ InitialPositionInStream.LATEST, kinesisCheckpointInterval, StorageLevel.MEMORY_AND_DISK_2())
+ );
+ }
+
+ // Union all the streams if there is more than 1 stream
+ JavaDStream<byte[]> unionStreams;
+ if (streamsList.size() > 1) {
+ unionStreams = jssc.union(streamsList.get(0), streamsList.subList(1, streamsList.size()));
+ } else {
+ // Otherwise, just use the 1 stream
+ unionStreams = streamsList.get(0);
+ }
+
+ // Convert each line of Array[Byte] to String, and split into words
+ JavaDStream<String> words = unionStreams.flatMap(new FlatMapFunction<byte[], String>() {
+ @Override
+ public Iterator<String> call(byte[] line) {
+ String s = new String(line, StandardCharsets.UTF_8);
+ return Arrays.asList(WORD_SEPARATOR.split(s)).iterator();
+ }
+ });
+
+ // Map each word to a (word, 1) tuple so we can reduce by key to count the words
+ JavaPairDStream<String, Integer> wordCounts = words.mapToPair(
+ new PairFunction<String, String, Integer>() {
+ @Override
+ public Tuple2<String, Integer> call(String s) {
+ return new Tuple2<String, Integer>(s, 1);
+ }
+ }
+ ).reduceByKey(
+ new Function2<Integer, Integer, Integer>() {
+ @Override
+ public Integer call(Integer i1, Integer i2) {
+ return i1 + i2;
+ }
+ }
+ );
+
+ // Print the first 10 wordCounts
+ wordCounts.print();
+
+ // Start the streaming context and await termination
+ jssc.start();
+ jssc.awaitTermination();
+ }
+}
diff --git a/external/kinesis-asl/src/main/python/examples/streaming/kinesis_wordcount_asl.py b/external/kinesis-asl/src/main/python/examples/streaming/kinesis_wordcount_asl.py
new file mode 100644
index 0000000000..4d7fc9a549
--- /dev/null
+++ b/external/kinesis-asl/src/main/python/examples/streaming/kinesis_wordcount_asl.py
@@ -0,0 +1,83 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""
+ Consumes messages from an Amazon Kinesis stream and does word count.
+
+ This example spins up 1 Kinesis Receiver per shard for the given stream.
+ It then starts pulling from the last checkpointed sequence number of the given stream.
+
+ Usage: kinesis_wordcount_asl.py <app-name> <stream-name> <endpoint-url> <region-name>
+ <app-name> is the name of the consumer app, used to track the read data in DynamoDB
+ <stream-name> name of the Kinesis stream (e.g. mySparkStream)
+ <endpoint-url> endpoint of the Kinesis service
+ (e.g. https://kinesis.us-east-1.amazonaws.com)
+
+
+ Example:
+ # export AWS keys if necessary
+ $ export AWS_ACCESS_KEY_ID=<your-access-key>
+ $ export AWS_SECRET_KEY=<your-secret-key>
+
+ # run the example
+ $ bin/spark-submit --jars external/kinesis-asl/target/scala-*/\
+ spark-streaming-kinesis-asl-assembly_*.jar \
+ external/kinesis-asl/src/main/python/examples/streaming/kinesis_wordcount_asl.py \
+ myAppName mySparkStream https://kinesis.us-east-1.amazonaws.com
+
+ There is a companion helper class called KinesisWordProducerASL which puts dummy data
+ onto the Kinesis stream.
+
+ This code uses the DefaultAWSCredentialsProviderChain to find credentials
+ in the following order:
+ Environment Variables - AWS_ACCESS_KEY_ID and AWS_SECRET_KEY
+ Java System Properties - aws.accessKeyId and aws.secretKey
+ Credential profiles file - default location (~/.aws/credentials) shared by all AWS SDKs
+ Instance profile credentials - delivered through the Amazon EC2 metadata service
+ For more information, see
+ http://docs.aws.amazon.com/AWSSdkDocsJava/latest/DeveloperGuide/credentials.html
+
+ See http://spark.apache.org/docs/latest/streaming-kinesis-integration.html for more details on
+ the Kinesis Spark Streaming integration.
+"""
+from __future__ import print_function
+
+import sys
+
+from pyspark import SparkContext
+from pyspark.streaming import StreamingContext
+from pyspark.streaming.kinesis import KinesisUtils, InitialPositionInStream
+
+if __name__ == "__main__":
+ if len(sys.argv) != 5:
+ print(
+ "Usage: kinesis_wordcount_asl.py <app-name> <stream-name> <endpoint-url> <region-name>",
+ file=sys.stderr)
+ sys.exit(-1)
+
+ sc = SparkContext(appName="PythonStreamingKinesisWordCountAsl")
+ ssc = StreamingContext(sc, 1)
+ appName, streamName, endpointUrl, regionName = sys.argv[1:]
+ lines = KinesisUtils.createStream(
+ ssc, appName, streamName, endpointUrl, regionName, InitialPositionInStream.LATEST, 2)
+ counts = lines.flatMap(lambda line: line.split(" ")) \
+ .map(lambda word: (word, 1)) \
+ .reduceByKey(lambda a, b: a+b)
+ counts.pprint()
+
+ ssc.start()
+ ssc.awaitTermination()
diff --git a/external/kinesis-asl/src/main/resources/log4j.properties b/external/kinesis-asl/src/main/resources/log4j.properties
new file mode 100644
index 0000000000..6cdc9286c5
--- /dev/null
+++ b/external/kinesis-asl/src/main/resources/log4j.properties
@@ -0,0 +1,37 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+log4j.rootCategory=WARN, console
+
+# File appender
+log4j.appender.file=org.apache.log4j.FileAppender
+log4j.appender.file.append=false
+log4j.appender.file.file=target/unit-tests.log
+log4j.appender.file.layout=org.apache.log4j.PatternLayout
+log4j.appender.file.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss.SSS} %p %c{1}: %m%n
+
+# Console appender
+log4j.appender.console=org.apache.log4j.ConsoleAppender
+log4j.appender.console.target=System.out
+log4j.appender.console.layout=org.apache.log4j.PatternLayout
+log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n
+
+# Settings to quiet third party logs that are too verbose
+log4j.logger.org.spark-project.jetty=WARN
+log4j.logger.org.spark-project.jetty.util.component.AbstractLifeCycle=ERROR
+log4j.logger.org.apache.spark.repl.SparkIMain$exprTyper=INFO
+log4j.logger.org.apache.spark.repl.SparkILoop$SparkILoopInterpreter=INFO \ No newline at end of file
diff --git a/external/kinesis-asl/src/main/scala/org/apache/spark/examples/streaming/KinesisWordCountASL.scala b/external/kinesis-asl/src/main/scala/org/apache/spark/examples/streaming/KinesisWordCountASL.scala
new file mode 100644
index 0000000000..6a73bc0e30
--- /dev/null
+++ b/external/kinesis-asl/src/main/scala/org/apache/spark/examples/streaming/KinesisWordCountASL.scala
@@ -0,0 +1,276 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+// scalastyle:off println
+package org.apache.spark.examples.streaming
+
+import java.nio.ByteBuffer
+
+import scala.util.Random
+
+import com.amazonaws.auth.{BasicAWSCredentials, DefaultAWSCredentialsProviderChain}
+import com.amazonaws.regions.RegionUtils
+import com.amazonaws.services.kinesis.AmazonKinesisClient
+import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream
+import com.amazonaws.services.kinesis.model.PutRecordRequest
+import org.apache.log4j.{Level, Logger}
+
+import org.apache.spark.{Logging, SparkConf}
+import org.apache.spark.storage.StorageLevel
+import org.apache.spark.streaming.{Milliseconds, StreamingContext}
+import org.apache.spark.streaming.dstream.DStream.toPairDStreamFunctions
+import org.apache.spark.streaming.kinesis.KinesisUtils
+
+
+/**
+ * Consumes messages from an Amazon Kinesis stream and does word count.
+ *
+ * This example spins up 1 Kinesis Receiver per shard for the given stream.
+ * It then starts pulling from the last checkpointed sequence number of the given stream.
+ *
+ * Usage: KinesisWordCountASL <app-name> <stream-name> <endpoint-url>
+ * <app-name> is the name of the consumer app, used to track the read data in DynamoDB
+ * <stream-name> name of the Kinesis stream (e.g. mySparkStream)
+ * <endpoint-url> endpoint of the Kinesis service
+ * (e.g. https://kinesis.us-east-1.amazonaws.com)
+ *
+ *
+ * Example:
+ * # export AWS keys if necessary
+ * $ export AWS_ACCESS_KEY_ID=<your-access-key>
+ * $ export AWS_SECRET_KEY=<your-secret-key>
+ *
+ * # run the example
+ * $ SPARK_HOME/bin/run-example streaming.KinesisWordCountASL myAppName mySparkStream \
+ * https://kinesis.us-east-1.amazonaws.com
+ *
+ * There is a companion helper class called KinesisWordProducerASL which puts dummy data
+ * onto the Kinesis stream.
+ *
+ * This code uses the DefaultAWSCredentialsProviderChain to find credentials
+ * in the following order:
+ * Environment Variables - AWS_ACCESS_KEY_ID and AWS_SECRET_KEY
+ * Java System Properties - aws.accessKeyId and aws.secretKey
+ * Credential profiles file - default location (~/.aws/credentials) shared by all AWS SDKs
+ * Instance profile credentials - delivered through the Amazon EC2 metadata service
+ * For more information, see
+ * http://docs.aws.amazon.com/AWSSdkDocsJava/latest/DeveloperGuide/credentials.html
+ *
+ * See http://spark.apache.org/docs/latest/streaming-kinesis-integration.html for more details on
+ * the Kinesis Spark Streaming integration.
+ */
+object KinesisWordCountASL extends Logging {
+ def main(args: Array[String]) {
+ // Check that all required args were passed in.
+ if (args.length != 3) {
+ System.err.println(
+ """
+ |Usage: KinesisWordCountASL <app-name> <stream-name> <endpoint-url>
+ |
+ | <app-name> is the name of the consumer app, used to track the read data in DynamoDB
+ | <stream-name> is the name of the Kinesis stream
+ | <endpoint-url> is the endpoint of the Kinesis service
+ | (e.g. https://kinesis.us-east-1.amazonaws.com)
+ |
+ |Generate input data for Kinesis stream using the example KinesisWordProducerASL.
+ |See http://spark.apache.org/docs/latest/streaming-kinesis-integration.html for more
+ |details.
+ """.stripMargin)
+ System.exit(1)
+ }
+
+ StreamingExamples.setStreamingLogLevels()
+
+ // Populate the appropriate variables from the given args
+ val Array(appName, streamName, endpointUrl) = args
+
+
+ // Determine the number of shards from the stream using the low-level Kinesis Client
+ // from the AWS Java SDK.
+ val credentials = new DefaultAWSCredentialsProviderChain().getCredentials()
+ require(credentials != null,
+ "No AWS credentials found. Please specify credentials using one of the methods specified " +
+ "in http://docs.aws.amazon.com/AWSSdkDocsJava/latest/DeveloperGuide/credentials.html")
+ val kinesisClient = new AmazonKinesisClient(credentials)
+ kinesisClient.setEndpoint(endpointUrl)
+ val numShards = kinesisClient.describeStream(streamName).getStreamDescription().getShards().size
+
+
+ // In this example, we're going to create 1 Kinesis Receiver/input DStream for each shard.
+ // This is not a necessity; if there are fewer receivers/DStreams than the number of shards,
+ // then the shards will be automatically distributed among the receivers and each receiver
+ // will receive data from multiple shards.
+ val numStreams = numShards
+
+ // Spark Streaming batch interval
+ val batchInterval = Milliseconds(2000)
+
+ // Kinesis checkpoint interval is the interval at which the DynamoDB is updated with information
+ // on sequence number of records that have been received. Same as batchInterval for this
+ // example.
+ val kinesisCheckpointInterval = batchInterval
+
+ // Get the region name from the endpoint URL to save Kinesis Client Library metadata in
+ // DynamoDB of the same region as the Kinesis stream
+ val regionName = RegionUtils.getRegionByEndpoint(endpointUrl).getName()
+
+ // Setup the SparkConfig and StreamingContext
+ val sparkConfig = new SparkConf().setAppName("KinesisWordCountASL")
+ val ssc = new StreamingContext(sparkConfig, batchInterval)
+
+ // Create the Kinesis DStreams
+ val kinesisStreams = (0 until numStreams).map { i =>
+ KinesisUtils.createStream(ssc, appName, streamName, endpointUrl, regionName,
+ InitialPositionInStream.LATEST, kinesisCheckpointInterval, StorageLevel.MEMORY_AND_DISK_2)
+ }
+
+ // Union all the streams
+ val unionStreams = ssc.union(kinesisStreams)
+
+ // Convert each line of Array[Byte] to String, and split into words
+ val words = unionStreams.flatMap(byteArray => new String(byteArray).split(" "))
+
+ // Map each word to a (word, 1) tuple so we can reduce by key to count the words
+ val wordCounts = words.map(word => (word, 1)).reduceByKey(_ + _)
+
+ // Print the first 10 wordCounts
+ wordCounts.print()
+
+ // Start the streaming context and await termination
+ ssc.start()
+ ssc.awaitTermination()
+ }
+}
+
+/**
+ * Usage: KinesisWordProducerASL <stream-name> <endpoint-url> \
+ * <records-per-sec> <words-per-record>
+ *
+ * <stream-name> is the name of the Kinesis stream (e.g. mySparkStream)
+ * <endpoint-url> is the endpoint of the Kinesis service
+ * (e.g. https://kinesis.us-east-1.amazonaws.com)
+ * <records-per-sec> is the rate of records per second to put onto the stream
+ * <words-per-record> is the number of words per record
+ *
+ * Example:
+ * $ SPARK_HOME/bin/run-example streaming.KinesisWordProducerASL mySparkStream \
+ * https://kinesis.us-east-1.amazonaws.com 10 5
+ */
+object KinesisWordProducerASL {
+ def main(args: Array[String]) {
+ if (args.length != 4) {
+ System.err.println(
+ """
+ |Usage: KinesisWordProducerASL <stream-name> <endpoint-url> <records-per-sec>
+ | <words-per-record>
+ |
+ | <stream-name> is the name of the Kinesis stream
+ | <endpoint-url> is the endpoint of the Kinesis service
+ | (e.g. https://kinesis.us-east-1.amazonaws.com)
+ | <records-per-sec> is the rate of records per second to put onto the stream
+ | <words-per-record> is the number of words per record
+ |
+ """.stripMargin)
+
+ System.exit(1)
+ }
+
+ // Set default log4j logging level to WARN to hide Spark logs
+ StreamingExamples.setStreamingLogLevels()
+
+ // Populate the appropriate variables from the given args
+ val Array(stream, endpoint, recordsPerSecond, wordsPerRecord) = args
+
+ // Generate the records and return the totals
+ val totals = generate(stream, endpoint, recordsPerSecond.toInt,
+ wordsPerRecord.toInt)
+
+ // Print the array of (word, total) tuples
+ println("Totals for the words sent")
+ totals.foreach(println(_))
+ }
+
+ def generate(stream: String,
+ endpoint: String,
+ recordsPerSecond: Int,
+ wordsPerRecord: Int): Seq[(String, Int)] = {
+
+ val randomWords = List("spark", "you", "are", "my", "father")
+ val totals = scala.collection.mutable.Map[String, Int]()
+
+ // Create the low-level Kinesis Client from the AWS Java SDK.
+ val kinesisClient = new AmazonKinesisClient(new DefaultAWSCredentialsProviderChain())
+ kinesisClient.setEndpoint(endpoint)
+
+ println(s"Putting records onto stream $stream and endpoint $endpoint at a rate of" +
+ s" $recordsPerSecond records per second and $wordsPerRecord words per record")
+
+ // Iterate and put records onto the stream per the given recordsPerSecond and wordsPerRecord
+ for (i <- 1 to 10) {
+ // Generate recordsPerSecond records to put onto the stream
+ (1 to recordsPerSecond).foreach { recordNum =>
+ // Randomly generate wordsPerRecord number of words
+ val data = (1 to wordsPerRecord).map(x => {
+ // Get a random index to a word
+ val randomWordIdx = Random.nextInt(randomWords.size)
+ val randomWord = randomWords(randomWordIdx)
+
+ // Increment total count to compare to server counts later
+ totals(randomWord) = totals.getOrElse(randomWord, 0) + 1
+
+ randomWord
+ }).mkString(" ")
+
+ // Create a partitionKey based on recordNum
+ val partitionKey = s"partitionKey-$recordNum"
+
+ // Create a PutRecordRequest with an Array[Byte] version of the data
+ val putRecordRequest = new PutRecordRequest().withStreamName(stream)
+ .withPartitionKey(partitionKey)
+ .withData(ByteBuffer.wrap(data.getBytes()))
+
+ // Put the record onto the stream and capture the PutRecordResult
+ val putRecordResult = kinesisClient.putRecord(putRecordRequest)
+ }
+
+ // Sleep for a second
+ Thread.sleep(1000)
+ println("Sent " + recordsPerSecond + " records")
+ }
+ // Convert the totals to (word, total) tuples, sorted by word
+ totals.toSeq.sortBy(_._1)
+ }
+}
+
+/**
+ * Utility functions for Spark Streaming examples.
+ * This has been lifted from the examples/ project to remove the circular dependency.
+ */
+private[streaming] object StreamingExamples extends Logging {
+ // Set reasonable logging levels for streaming if the user has not configured log4j.
+ def setStreamingLogLevels() {
+ val log4jInitialized = Logger.getRootLogger.getAllAppenders.hasMoreElements
+ if (!log4jInitialized) {
+ // We first log something to initialize Spark's default logging, then we override the
+ // logging level.
+ logInfo("Setting log level to [WARN] for streaming example." +
+ " To override add a custom log4j.properties to the classpath.")
+ Logger.getRootLogger.setLevel(Level.WARN)
+ }
+ }
+}
+// scalastyle:on println
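As a rough illustration of how the consumer and producer above fit together, here is a hedged sketch that pushes a little dummy data with KinesisWordProducerASL.generate and reads it back through the same KinesisUtils.createStream call used in the example. The stream, app, endpoint and region names are placeholders, and AWS credentials are assumed to be available via the default provider chain; this is not part of the patch.

import org.apache.spark.SparkConf
import org.apache.spark.examples.streaming.KinesisWordProducerASL
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Milliseconds, StreamingContext}
import org.apache.spark.streaming.kinesis.KinesisUtils
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream

object KinesisExampleSmokeTest {
  def main(args: Array[String]): Unit = {
    val endpoint = "https://kinesis.us-east-1.amazonaws.com"  // placeholder endpoint

    // Push ~10 seconds of dummy data: 10 records/second, 5 words/record
    val sentTotals = KinesisWordProducerASL.generate("mySparkStream", endpoint, 10, 5)
    sentTotals.foreach { case (word, count) => println(s"sent '$word' $count times") }

    // Read it back with a single receiver and print the raw lines
    val ssc = new StreamingContext(
      new SparkConf().setAppName("KinesisExampleSmokeTest"), Milliseconds(2000))
    val lines = KinesisUtils.createStream(ssc, "kinesisExampleSmokeTest", "mySparkStream",
      endpoint, "us-east-1", InitialPositionInStream.TRIM_HORIZON, Milliseconds(2000),
      StorageLevel.MEMORY_AND_DISK_2)
    lines.map(bytes => new String(bytes)).print()

    ssc.start()
    ssc.awaitTerminationOrTimeout(30 * 1000)
    ssc.stop()
  }
}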
diff --git a/external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisBackedBlockRDD.scala b/external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisBackedBlockRDD.scala
new file mode 100644
index 0000000000..3996f168e6
--- /dev/null
+++ b/external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisBackedBlockRDD.scala
@@ -0,0 +1,288 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.kinesis
+
+import scala.collection.JavaConverters._
+import scala.reflect.ClassTag
+import scala.util.control.NonFatal
+
+import com.amazonaws.auth.{AWSCredentials, DefaultAWSCredentialsProviderChain}
+import com.amazonaws.services.kinesis.AmazonKinesisClient
+import com.amazonaws.services.kinesis.clientlibrary.types.UserRecord
+import com.amazonaws.services.kinesis.model._
+
+import org.apache.spark._
+import org.apache.spark.rdd.{BlockRDD, BlockRDDPartition}
+import org.apache.spark.storage.BlockId
+import org.apache.spark.util.NextIterator
+
+
+/** Class representing a range of Kinesis sequence numbers. Both sequence numbers are inclusive. */
+private[kinesis]
+case class SequenceNumberRange(
+ streamName: String, shardId: String, fromSeqNumber: String, toSeqNumber: String)
+
+/** Class representing an array of Kinesis sequence number ranges */
+private[kinesis]
+case class SequenceNumberRanges(ranges: Seq[SequenceNumberRange]) {
+ def isEmpty(): Boolean = ranges.isEmpty
+
+ def nonEmpty(): Boolean = ranges.nonEmpty
+
+ override def toString(): String = ranges.mkString("SequenceNumberRanges(", ", ", ")")
+}
+
+private[kinesis]
+object SequenceNumberRanges {
+ def apply(range: SequenceNumberRange): SequenceNumberRanges = {
+ new SequenceNumberRanges(Seq(range))
+ }
+}
+
+
+/** Partition storing the information of the ranges of Kinesis sequence numbers to read */
+private[kinesis]
+class KinesisBackedBlockRDDPartition(
+ idx: Int,
+ blockId: BlockId,
+ val isBlockIdValid: Boolean,
+ val seqNumberRanges: SequenceNumberRanges
+ ) extends BlockRDDPartition(blockId, idx)
+
+/**
+ * A BlockRDD where the block data is backed by Kinesis, which can be accessed using the
+ * sequence numbers of the corresponding blocks.
+ */
+private[kinesis]
+class KinesisBackedBlockRDD[T: ClassTag](
+ sc: SparkContext,
+ val regionName: String,
+ val endpointUrl: String,
+ @transient private val _blockIds: Array[BlockId],
+ @transient val arrayOfseqNumberRanges: Array[SequenceNumberRanges],
+ @transient private val isBlockIdValid: Array[Boolean] = Array.empty,
+ val retryTimeoutMs: Int = 10000,
+ val messageHandler: Record => T = KinesisUtils.defaultMessageHandler _,
+ val awsCredentialsOption: Option[SerializableAWSCredentials] = None
+ ) extends BlockRDD[T](sc, _blockIds) {
+
+ require(_blockIds.length == arrayOfseqNumberRanges.length,
+ "Number of blockIds is not equal to the number of sequence number ranges")
+
+ override def isValid(): Boolean = true
+
+ override def getPartitions: Array[Partition] = {
+ Array.tabulate(_blockIds.length) { i =>
+ val isValid = if (isBlockIdValid.length == 0) true else isBlockIdValid(i)
+ new KinesisBackedBlockRDDPartition(i, _blockIds(i), isValid, arrayOfseqNumberRanges(i))
+ }
+ }
+
+ override def compute(split: Partition, context: TaskContext): Iterator[T] = {
+ val blockManager = SparkEnv.get.blockManager
+ val partition = split.asInstanceOf[KinesisBackedBlockRDDPartition]
+ val blockId = partition.blockId
+
+ def getBlockFromBlockManager(): Option[Iterator[T]] = {
+ logDebug(s"Read partition data of $this from block manager, block $blockId")
+ blockManager.get(blockId).map(_.data.asInstanceOf[Iterator[T]])
+ }
+
+ def getBlockFromKinesis(): Iterator[T] = {
+ val credentials = awsCredentialsOption.getOrElse {
+ new DefaultAWSCredentialsProviderChain().getCredentials()
+ }
+ partition.seqNumberRanges.ranges.iterator.flatMap { range =>
+ new KinesisSequenceRangeIterator(credentials, endpointUrl, regionName,
+ range, retryTimeoutMs).map(messageHandler)
+ }
+ }
+ if (partition.isBlockIdValid) {
+ getBlockFromBlockManager().getOrElse { getBlockFromKinesis() }
+ } else {
+ getBlockFromKinesis()
+ }
+ }
+}
+
+
+/**
+ * An iterator that returns the Kinesis data based on the given range of sequence numbers.
+ * Internally, it repeatedly fetches sets of records starting from the fromSequenceNumber,
+ * until the endSequenceNumber is reached.
+ */
+private[kinesis]
+class KinesisSequenceRangeIterator(
+ credentials: AWSCredentials,
+ endpointUrl: String,
+ regionId: String,
+ range: SequenceNumberRange,
+ retryTimeoutMs: Int) extends NextIterator[Record] with Logging {
+
+ private val client = new AmazonKinesisClient(credentials)
+ private val streamName = range.streamName
+ private val shardId = range.shardId
+
+ private var toSeqNumberReceived = false
+ private var lastSeqNumber: String = null
+ private var internalIterator: Iterator[Record] = null
+
+ client.setEndpoint(endpointUrl, "kinesis", regionId)
+
+ override protected def getNext(): Record = {
+ var nextRecord: Record = null
+ if (toSeqNumberReceived) {
+ finished = true
+ } else {
+
+ if (internalIterator == null) {
+
+ // If the internal iterator has not been initialized,
+ // then fetch records from starting sequence number
+ internalIterator = getRecords(ShardIteratorType.AT_SEQUENCE_NUMBER, range.fromSeqNumber)
+ } else if (!internalIterator.hasNext) {
+
+ // If the internal iterator does not have any more records,
+ // then fetch more records after the last consumed sequence number
+ internalIterator = getRecords(ShardIteratorType.AFTER_SEQUENCE_NUMBER, lastSeqNumber)
+ }
+
+ if (!internalIterator.hasNext) {
+
+ // If the internal iterator still does not have any data, then throw exception
+ // and terminate this iterator
+ finished = true
+ throw new SparkException(
+ s"Could not read until the end sequence number of the range: $range")
+ } else {
+
+ // Get the record, copy the data into a byte array and remember its sequence number
+ nextRecord = internalIterator.next()
+ lastSeqNumber = nextRecord.getSequenceNumber()
+
+ // If this record's sequence number matches the stopping sequence number, then make sure
+ // the iterator is marked finished next time getNext() is called
+ if (nextRecord.getSequenceNumber == range.toSeqNumber) {
+ toSeqNumberReceived = true
+ }
+ }
+ }
+ nextRecord
+ }
+
+ override protected def close(): Unit = {
+ client.shutdown()
+ }
+
+ /**
+ * Get records starting from or after the given sequence number.
+ */
+ private def getRecords(iteratorType: ShardIteratorType, seqNum: String): Iterator[Record] = {
+ val shardIterator = getKinesisIterator(iteratorType, seqNum)
+ val result = getRecordsAndNextKinesisIterator(shardIterator)
+ result._1
+ }
+
+ /**
+ * Get records using a Kinesis shard iterator (a progress handle for reading records
+ * from Kinesis), and also get the next shard iterator for the next fetch.
+ */
+ private def getRecordsAndNextKinesisIterator(
+ shardIterator: String): (Iterator[Record], String) = {
+ val getRecordsRequest = new GetRecordsRequest
+ getRecordsRequest.setRequestCredentials(credentials)
+ getRecordsRequest.setShardIterator(shardIterator)
+ val getRecordsResult = retryOrTimeout[GetRecordsResult](
+ s"getting records using shard iterator") {
+ client.getRecords(getRecordsRequest)
+ }
+ // De-aggregate records, if KPL was used in producing the records. The KCL automatically
+ // handles de-aggregation during regular operation. This code path is used during recovery
+ val recordIterator = UserRecord.deaggregate(getRecordsResult.getRecords)
+ (recordIterator.iterator().asScala, getRecordsResult.getNextShardIterator)
+ }
+
+ /**
+ * Get the Kinesis shard iterator for getting records starting from or after the given
+ * sequence number.
+ */
+ private def getKinesisIterator(
+ iteratorType: ShardIteratorType,
+ sequenceNumber: String): String = {
+ val getShardIteratorRequest = new GetShardIteratorRequest
+ getShardIteratorRequest.setRequestCredentials(credentials)
+ getShardIteratorRequest.setStreamName(streamName)
+ getShardIteratorRequest.setShardId(shardId)
+ getShardIteratorRequest.setShardIteratorType(iteratorType.toString)
+ getShardIteratorRequest.setStartingSequenceNumber(sequenceNumber)
+ val getShardIteratorResult = retryOrTimeout[GetShardIteratorResult](
+ s"getting shard iterator from sequence number $sequenceNumber") {
+ client.getShardIterator(getShardIteratorRequest)
+ }
+ getShardIteratorResult.getShardIterator
+ }
+
+ /** Helper method to retry Kinesis API request with exponential backoff and timeouts */
+ private def retryOrTimeout[T](message: String)(body: => T): T = {
+ import KinesisSequenceRangeIterator._
+
+ var startTimeMs = System.currentTimeMillis()
+ var retryCount = 0
+ var waitTimeMs = MIN_RETRY_WAIT_TIME_MS
+ var result: Option[T] = None
+ var lastError: Throwable = null
+
+ def isTimedOut = (System.currentTimeMillis() - startTimeMs) >= retryTimeoutMs
+ def isMaxRetryDone = retryCount >= MAX_RETRIES
+
+ while (result.isEmpty && !isTimedOut && !isMaxRetryDone) {
+ if (retryCount > 0) { // wait only if this is a retry
+ Thread.sleep(waitTimeMs)
+ waitTimeMs *= 2 // if you have waited, then double wait time for next round
+ }
+ try {
+ result = Some(body)
+ } catch {
+ case NonFatal(t) =>
+ lastError = t
+ t match {
+ case ptee: ProvisionedThroughputExceededException =>
+ logWarning(s"Error while $message [attempt = ${retryCount + 1}]", ptee)
+ case e: Throwable =>
+ throw new SparkException(s"Error while $message", e)
+ }
+ }
+ retryCount += 1
+ }
+ result.getOrElse {
+ if (isTimedOut) {
+ throw new SparkException(
+ s"Timed out after $retryTimeoutMs ms while $message, last exception: ", lastError)
+ } else {
+ throw new SparkException(
+ s"Gave up after $retryCount retries while $message, last exception: ", lastError)
+ }
+ }
+ }
+}
+
+private[streaming]
+object KinesisSequenceRangeIterator {
+ val MAX_RETRIES = 3
+ val MIN_RETRY_WAIT_TIME_MS = 100
+}
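The retry policy inside retryOrTimeout above (a bounded number of retries, a doubling wait between attempts, and an overall timeout) is generic enough to be worth spelling out on its own. The following standalone sketch shows the same idea independent of Kinesis; names and default values here are illustrative, not part of the patch.

import scala.util.control.NonFatal

object RetryWithBackoffSketch {
  /** Run `body`, retrying on non-fatal errors with a doubling wait, bounded by retries and time. */
  def retryOrTimeout[T](maxRetries: Int = 3, minWaitMs: Long = 100, timeoutMs: Long = 10000)
      (body: => T): T = {
    val startMs = System.currentTimeMillis()
    var attempt = 0
    var waitMs = minWaitMs
    var result: Option[T] = None
    var lastError: Throwable = null
    while (result.isEmpty && attempt < maxRetries &&
        System.currentTimeMillis() - startMs < timeoutMs) {
      if (attempt > 0) {          // wait only before a retry, not before the first attempt
        Thread.sleep(waitMs)
        waitMs *= 2               // exponential backoff
      }
      try {
        result = Some(body)
      } catch {
        case NonFatal(t) => lastError = t
      }
      attempt += 1
    }
    result.getOrElse(
      throw new RuntimeException(s"Gave up after $attempt attempt(s)", lastError))
  }

  def main(args: Array[String]): Unit = {
    // Example: a call that fails twice before succeeding on the third attempt
    var calls = 0
    val value = retryOrTimeout() { calls += 1; if (calls < 3) sys.error("flaky") else 42 }
    println(s"succeeded with $value after $calls calls")
  }
}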
diff --git a/external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisCheckpointer.scala b/external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisCheckpointer.scala
new file mode 100644
index 0000000000..1ca6d4302c
--- /dev/null
+++ b/external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisCheckpointer.scala
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.streaming.kinesis
+
+import java.util.concurrent._
+
+import scala.util.control.NonFatal
+
+import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorCheckpointer
+import com.amazonaws.services.kinesis.clientlibrary.types.ShutdownReason
+
+import org.apache.spark.Logging
+import org.apache.spark.streaming.Duration
+import org.apache.spark.streaming.util.RecurringTimer
+import org.apache.spark.util.{Clock, SystemClock, ThreadUtils}
+
+/**
+ * This is a helper class for managing Kinesis checkpointing.
+ *
+ * @param receiver The receiver that keeps track of which sequence numbers we can checkpoint
+ * @param checkpointInterval How frequently we will checkpoint to DynamoDB
+ * @param workerId Worker Id of KCL worker for logging purposes
+ * @param clock Clock to use; allows a ManualClock to be injected for testing
+ */
+private[kinesis] class KinesisCheckpointer(
+ receiver: KinesisReceiver[_],
+ checkpointInterval: Duration,
+ workerId: String,
+ clock: Clock = new SystemClock) extends Logging {
+
+ // a map from shardId's to checkpointers
+ private val checkpointers = new ConcurrentHashMap[String, IRecordProcessorCheckpointer]()
+
+ private val lastCheckpointedSeqNums = new ConcurrentHashMap[String, String]()
+
+ private val checkpointerThread: RecurringTimer = startCheckpointerThread()
+
+ /** Update the checkpointer instance to the most recent one for the given shardId. */
+ def setCheckpointer(shardId: String, checkpointer: IRecordProcessorCheckpointer): Unit = {
+ checkpointers.put(shardId, checkpointer)
+ }
+
+ /**
+ * Stop tracking the specified shardId.
+ *
+ * If a checkpointer is provided, e.g. on IRecordProcessor.shutdown [[ShutdownReason.TERMINATE]],
+ * we will use that to make the final checkpoint. If `null` is provided, we will not make the
+ * checkpoint, e.g. in case of [[ShutdownReason.ZOMBIE]].
+ */
+ def removeCheckpointer(shardId: String, checkpointer: IRecordProcessorCheckpointer): Unit = {
+ synchronized {
+ checkpointers.remove(shardId)
+ checkpoint(shardId, checkpointer)
+ }
+ }
+
+ /** Perform the checkpoint. */
+ private def checkpoint(shardId: String, checkpointer: IRecordProcessorCheckpointer): Unit = {
+ try {
+ if (checkpointer != null) {
+ receiver.getLatestSeqNumToCheckpoint(shardId).foreach { latestSeqNum =>
+ val lastSeqNum = lastCheckpointedSeqNums.get(shardId)
+ // Kinesis sequence numbers are monotonically increasing strings, therefore we can
+ // safely do the string comparison
+ if (lastSeqNum == null || latestSeqNum > lastSeqNum) {
+ /* Perform the checkpoint */
+ KinesisRecordProcessor.retryRandom(checkpointer.checkpoint(latestSeqNum), 4, 100)
+ logDebug(s"Checkpoint: WorkerId $workerId completed checkpoint at sequence number" +
+ s" $latestSeqNum for shardId $shardId")
+ lastCheckpointedSeqNums.put(shardId, latestSeqNum)
+ }
+ }
+ } else {
+ logDebug(s"Checkpointing skipped for shardId $shardId. Checkpointer not set.")
+ }
+ } catch {
+ case NonFatal(e) =>
+ logWarning(s"Failed to checkpoint shardId $shardId to DynamoDB.", e)
+ }
+ }
+
+ /** Checkpoint the latest saved sequence numbers for all active shardId's. */
+ private def checkpointAll(): Unit = synchronized {
+ // if this method throws an exception, then the scheduled task will not run again
+ try {
+ val shardIds = checkpointers.keys()
+ while (shardIds.hasMoreElements) {
+ val shardId = shardIds.nextElement()
+ checkpoint(shardId, checkpointers.get(shardId))
+ }
+ } catch {
+ case NonFatal(e) =>
+ logWarning("Failed to checkpoint to DynamoDB.", e)
+ }
+ }
+
+ /**
+ * Start the checkpointer thread with the given checkpoint duration.
+ */
+ private def startCheckpointerThread(): RecurringTimer = {
+ val period = checkpointInterval.milliseconds
+ val threadName = s"Kinesis Checkpointer - Worker $workerId"
+ val timer = new RecurringTimer(clock, period, _ => checkpointAll(), threadName)
+ timer.start()
+ logDebug(s"Started checkpointer thread: $threadName")
+ timer
+ }
+
+ /**
+ * Shutdown the checkpointer. Should be called on the onStop of the Receiver.
+ */
+ def shutdown(): Unit = {
+ // the recurring timer checkpoints for us one last time.
+ checkpointerThread.stop(interruptTimer = false)
+ checkpointers.clear()
+ lastCheckpointedSeqNums.clear()
+ logInfo("Successfully shutdown Kinesis Checkpointer.")
+ }
+}
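KinesisCheckpointer relies on Spark's internal RecurringTimer to call checkpointAll() every checkpointInterval. For readers outside Spark internals, the same scheduling behaviour can be sketched with a plain ScheduledExecutorService; this is illustrative only and is not the class above.

import java.util.concurrent.{Executors, ScheduledExecutorService, TimeUnit}
import scala.util.control.NonFatal

object PeriodicCheckpointSketch {
  /** Invoke `checkpointAll` every `intervalMs` on a single background thread. */
  def start(intervalMs: Long)(checkpointAll: () => Unit): ScheduledExecutorService = {
    val executor = Executors.newSingleThreadScheduledExecutor()
    executor.scheduleAtFixedRate(new Runnable {
      override def run(): Unit = {
        // An exception escaping here would cancel the schedule, so catch and log instead,
        // mirroring the try/catch around checkpointAll() in KinesisCheckpointer.
        try checkpointAll() catch { case NonFatal(e) => println(s"checkpoint failed: $e") }
      }
    }, intervalMs, intervalMs, TimeUnit.MILLISECONDS)
    executor
  }
}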
diff --git a/external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisInputDStream.scala b/external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisInputDStream.scala
new file mode 100644
index 0000000000..5223c81a8e
--- /dev/null
+++ b/external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisInputDStream.scala
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.kinesis
+
+import scala.reflect.ClassTag
+
+import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream
+import com.amazonaws.services.kinesis.model.Record
+
+import org.apache.spark.rdd.RDD
+import org.apache.spark.storage.{BlockId, StorageLevel}
+import org.apache.spark.streaming.{Duration, StreamingContext, Time}
+import org.apache.spark.streaming.dstream.ReceiverInputDStream
+import org.apache.spark.streaming.receiver.Receiver
+import org.apache.spark.streaming.scheduler.ReceivedBlockInfo
+
+private[kinesis] class KinesisInputDStream[T: ClassTag](
+ _ssc: StreamingContext,
+ streamName: String,
+ endpointUrl: String,
+ regionName: String,
+ initialPositionInStream: InitialPositionInStream,
+ checkpointAppName: String,
+ checkpointInterval: Duration,
+ storageLevel: StorageLevel,
+ messageHandler: Record => T,
+ awsCredentialsOption: Option[SerializableAWSCredentials]
+ ) extends ReceiverInputDStream[T](_ssc) {
+
+ private[streaming]
+ override def createBlockRDD(time: Time, blockInfos: Seq[ReceivedBlockInfo]): RDD[T] = {
+
+ // This returns true even when blockInfos is empty
+ val allBlocksHaveRanges = blockInfos.map { _.metadataOption }.forall(_.nonEmpty)
+
+ if (allBlocksHaveRanges) {
+ // Create a KinesisBackedBlockRDD, even when there are no blocks
+ val blockIds = blockInfos.map { _.blockId.asInstanceOf[BlockId] }.toArray
+ val seqNumRanges = blockInfos.map {
+ _.metadataOption.get.asInstanceOf[SequenceNumberRanges] }.toArray
+ val isBlockIdValid = blockInfos.map { _.isBlockIdValid() }.toArray
+ logDebug(s"Creating KinesisBackedBlockRDD for $time with ${seqNumRanges.length} " +
+ s"seq number ranges: ${seqNumRanges.mkString(", ")} ")
+ new KinesisBackedBlockRDD(
+ context.sc, regionName, endpointUrl, blockIds, seqNumRanges,
+ isBlockIdValid = isBlockIdValid,
+ retryTimeoutMs = ssc.graph.batchDuration.milliseconds.toInt,
+ messageHandler = messageHandler,
+ awsCredentialsOption = awsCredentialsOption)
+ } else {
+ logWarning("Kinesis sequence number information was not present with some block metadata," +
+ " it may not be possible to recover from failures")
+ super.createBlockRDD(time, blockInfos)
+ }
+ }
+
+ override def getReceiver(): Receiver[T] = {
+ new KinesisReceiver(streamName, endpointUrl, regionName, initialPositionInStream,
+ checkpointAppName, checkpointInterval, storageLevel, messageHandler, awsCredentialsOption)
+ }
+}
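The contract between the receiver and this DStream is carried entirely by block metadata: each received block is tagged with a SequenceNumberRanges value, and createBlockRDD only builds a KinesisBackedBlockRDD when every block has one. A tiny sketch of what that metadata looks like; the values are placeholders, and since the case classes are private[kinesis] this snippet would have to live in the org.apache.spark.streaming.kinesis package.

object SequenceMetadataSketch {
  def main(args: Array[String]): Unit = {
    // Illustrative values only -- real sequence numbers are opaque strings assigned by Kinesis.
    val range = SequenceNumberRange(
      streamName = "mySparkStream",
      shardId = "shardId-000000000000",
      fromSeqNumber = "11111111111111111111111111",
      toSeqNumber = "22222222222222222222222222")

    // One SequenceNumberRange per addRecords() call that contributed to the block.
    val blockMetadata = SequenceNumberRanges(Seq(range))
    assert(blockMetadata.nonEmpty())
    println(blockMetadata)
  }
}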
diff --git a/external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisReceiver.scala b/external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisReceiver.scala
new file mode 100644
index 0000000000..48ee2a9597
--- /dev/null
+++ b/external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisReceiver.scala
@@ -0,0 +1,361 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.streaming.kinesis
+
+import java.util.UUID
+import java.util.concurrent.ConcurrentHashMap
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+import scala.util.control.NonFatal
+
+import com.amazonaws.auth.{AWSCredentials, AWSCredentialsProvider, DefaultAWSCredentialsProviderChain}
+import com.amazonaws.services.kinesis.clientlibrary.interfaces.{IRecordProcessor, IRecordProcessorCheckpointer, IRecordProcessorFactory}
+import com.amazonaws.services.kinesis.clientlibrary.lib.worker.{InitialPositionInStream, KinesisClientLibConfiguration, Worker}
+import com.amazonaws.services.kinesis.model.Record
+
+import org.apache.spark.storage.{StorageLevel, StreamBlockId}
+import org.apache.spark.streaming.Duration
+import org.apache.spark.streaming.receiver.{BlockGenerator, BlockGeneratorListener, Receiver}
+import org.apache.spark.util.Utils
+import org.apache.spark.Logging
+
+private[kinesis]
+case class SerializableAWSCredentials(accessKeyId: String, secretKey: String)
+ extends AWSCredentials {
+ override def getAWSAccessKeyId: String = accessKeyId
+ override def getAWSSecretKey: String = secretKey
+}
+
+/**
+ * Custom AWS Kinesis-specific implementation of Spark Streaming's Receiver.
+ * This implementation relies on the Kinesis Client Library (KCL) Worker as described here:
+ * https://github.com/awslabs/amazon-kinesis-client
+ *
+ * The way this Receiver works is as follows:
+ *
+ * - The receiver starts a KCL Worker, which essentially runs a threadpool of multiple
+ * KinesisRecordProcessors
+ * - Each KinesisRecordProcessor receives data from a Kinesis shard in batches. Each batch is
+ * inserted into a Block Generator, and the corresponding range of sequence numbers is recorded.
+ * - When the block generator defines a block, the sequence number ranges that were
+ * inserted into the block are recorded separately for later use.
+ * - When the block is ready to be pushed, the block is pushed and the ranges are reported as
+ * metadata of the block. In addition, the ranges are used to find out the latest sequence
+ * number for each shard that can be checkpointed through DynamoDB.
+ * - Periodically, each KinesisRecordProcessor checkpoints the latest successfully stored sequence
+ * number for its own shard.
+ *
+ * @param streamName Kinesis stream name
+ * @param endpointUrl Url of Kinesis service (e.g., https://kinesis.us-east-1.amazonaws.com)
+ * @param regionName Region name used by the Kinesis Client Library for
+ * DynamoDB (lease coordination and checkpointing) and CloudWatch (metrics)
+ * @param initialPositionInStream In the absence of Kinesis checkpoint info, this is the
+ * worker's initial starting position in the stream.
+ * The values are either the beginning of the stream
+ * per Kinesis' limit of 24 hours
+ * (InitialPositionInStream.TRIM_HORIZON) or
+ * the tip of the stream (InitialPositionInStream.LATEST).
+ * @param checkpointAppName Kinesis application name. Kinesis Apps are mapped to Kinesis Streams
+ * by the Kinesis Client Library. If you change the App name or Stream name,
+ * the KCL will throw errors. This usually requires deleting the backing
+ * DynamoDB table with the same name as this Kinesis application.
+ * @param checkpointInterval Checkpoint interval for Kinesis checkpointing.
+ * See the Kinesis Spark Streaming documentation for more
+ * details on the different types of checkpoints.
+ * @param storageLevel Storage level to use for storing the received objects
+ * @param awsCredentialsOption Optional AWS credentials, used when user directly specifies
+ * the credentials
+ */
+private[kinesis] class KinesisReceiver[T](
+ val streamName: String,
+ endpointUrl: String,
+ regionName: String,
+ initialPositionInStream: InitialPositionInStream,
+ checkpointAppName: String,
+ checkpointInterval: Duration,
+ storageLevel: StorageLevel,
+ messageHandler: Record => T,
+ awsCredentialsOption: Option[SerializableAWSCredentials])
+ extends Receiver[T](storageLevel) with Logging { receiver =>
+
+ /*
+ * =================================================================================
+ * The following vars are initialized in the onStart() method, which executes in the
+ * Spark worker after this Receiver is serialized and shipped to the worker.
+ * =================================================================================
+ */
+
+ /**
+ * workerId is used by the KCL; it should be based on the IP address of the actual Spark worker
+ * where this code runs (not the driver's IP address).
+ */
+ @volatile private var workerId: String = null
+
+ /**
+ * Worker is the core client abstraction from the Kinesis Client Library (KCL).
+ * A worker can process more than one shard from the given stream.
+ * Each shard is assigned its own IRecordProcessor and the worker runs multiple such
+ * processors.
+ */
+ @volatile private var worker: Worker = null
+ @volatile private var workerThread: Thread = null
+
+ /** BlockGenerator used to generate blocks out of Kinesis data */
+ @volatile private var blockGenerator: BlockGenerator = null
+
+ /**
+ * Sequence number ranges added to the current block being generated.
+ * Accessing and updating of this map is synchronized by locks in BlockGenerator.
+ */
+ private val seqNumRangesInCurrentBlock = new mutable.ArrayBuffer[SequenceNumberRange]
+
+ /** Sequence number ranges of data added to each generated block */
+ private val blockIdToSeqNumRanges = new ConcurrentHashMap[StreamBlockId, SequenceNumberRanges]
+
+ /**
+ * The centralized kinesisCheckpointer that checkpoints based on the given checkpointInterval.
+ */
+ @volatile private var kinesisCheckpointer: KinesisCheckpointer = null
+
+ /**
+ * Latest sequence numbers that have been successfully stored, one per shard.
+ * This is used for checkpointing through the KCL.
+ */
+ private val shardIdToLatestStoredSeqNum = new ConcurrentHashMap[String, String]
+
+ /**
+ * This is called when the KinesisReceiver starts and must be non-blocking.
+ * The KCL creates and manages the receiving/processing thread pool through Worker.run().
+ */
+ override def onStart() {
+ blockGenerator = supervisor.createBlockGenerator(new GeneratedBlockHandler)
+
+ workerId = Utils.localHostName() + ":" + UUID.randomUUID()
+
+ kinesisCheckpointer = new KinesisCheckpointer(receiver, checkpointInterval, workerId)
+ // KCL config instance
+ val awsCredProvider = resolveAWSCredentialsProvider()
+ val kinesisClientLibConfiguration =
+ new KinesisClientLibConfiguration(checkpointAppName, streamName, awsCredProvider, workerId)
+ .withKinesisEndpoint(endpointUrl)
+ .withInitialPositionInStream(initialPositionInStream)
+ .withTaskBackoffTimeMillis(500)
+ .withRegionName(regionName)
+
+ /*
+ * RecordProcessorFactory creates impls of IRecordProcessor.
+ * IRecordProcessor adapts the KCL to our Spark KinesisReceiver via the
+ * IRecordProcessor.processRecords() method.
+ * We're using our custom KinesisRecordProcessor in this case.
+ */
+ val recordProcessorFactory = new IRecordProcessorFactory {
+ override def createProcessor: IRecordProcessor =
+ new KinesisRecordProcessor(receiver, workerId)
+ }
+
+ worker = new Worker(recordProcessorFactory, kinesisClientLibConfiguration)
+ workerThread = new Thread() {
+ override def run(): Unit = {
+ try {
+ worker.run()
+ } catch {
+ case NonFatal(e) =>
+ restart("Error running the KCL worker in Receiver", e)
+ }
+ }
+ }
+
+ blockIdToSeqNumRanges.clear()
+ blockGenerator.start()
+
+ workerThread.setName(s"Kinesis Receiver ${streamId}")
+ workerThread.setDaemon(true)
+ workerThread.start()
+
+ logInfo(s"Started receiver with workerId $workerId")
+ }
+
+ /**
+ * This is called when the KinesisReceiver stops.
+ * The KCL worker.shutdown() method stops the receiving/processing threads.
+ * The KCL will do its best to drain and checkpoint any in-flight records upon shutdown.
+ */
+ override def onStop() {
+ if (workerThread != null) {
+ if (worker != null) {
+ worker.shutdown()
+ worker = null
+ }
+ workerThread.join()
+ workerThread = null
+ logInfo(s"Stopped receiver for workerId $workerId")
+ }
+ workerId = null
+ if (kinesisCheckpointer != null) {
+ kinesisCheckpointer.shutdown()
+ kinesisCheckpointer = null
+ }
+ }
+
+ /** Add records of the given shard to the current block being generated */
+ private[kinesis] def addRecords(shardId: String, records: java.util.List[Record]): Unit = {
+ if (records.size > 0) {
+ val dataIterator = records.iterator().asScala.map(messageHandler)
+ val metadata = SequenceNumberRange(streamName, shardId,
+ records.get(0).getSequenceNumber(), records.get(records.size() - 1).getSequenceNumber())
+ blockGenerator.addMultipleDataWithCallback(dataIterator, metadata)
+ }
+ }
+
+ /** Get the latest sequence number for the given shard that can be checkpointed through KCL */
+ private[kinesis] def getLatestSeqNumToCheckpoint(shardId: String): Option[String] = {
+ Option(shardIdToLatestStoredSeqNum.get(shardId))
+ }
+
+ /**
+ * Set the checkpointer that will be used to checkpoint sequence numbers to DynamoDB for the
+ * given shardId.
+ */
+ def setCheckpointer(shardId: String, checkpointer: IRecordProcessorCheckpointer): Unit = {
+ assert(kinesisCheckpointer != null, "Kinesis Checkpointer not initialized!")
+ kinesisCheckpointer.setCheckpointer(shardId, checkpointer)
+ }
+
+ /**
+ * Remove the checkpointer for the given shardId. The provided checkpointer will be used to
+ * checkpoint one last time for the given shard. If `checkpointer` is `null`, then we will not
+ * checkpoint.
+ */
+ def removeCheckpointer(shardId: String, checkpointer: IRecordProcessorCheckpointer): Unit = {
+ assert(kinesisCheckpointer != null, "Kinesis Checkpointer not initialized!")
+ kinesisCheckpointer.removeCheckpointer(shardId, checkpointer)
+ }
+
+ /**
+ * Remember the range of sequence numbers that was added to the currently active block.
+ * Internally, this is synchronized with `finalizeRangesForCurrentBlock()`.
+ */
+ private def rememberAddedRange(range: SequenceNumberRange): Unit = {
+ seqNumRangesInCurrentBlock += range
+ }
+
+ /**
+ * Finalize the ranges added to the block that was active and prepare the ranges buffer
+ * for next block. Internally, this is synchronized with `rememberAddedRange()`.
+ */
+ private def finalizeRangesForCurrentBlock(blockId: StreamBlockId): Unit = {
+ blockIdToSeqNumRanges.put(blockId, SequenceNumberRanges(seqNumRangesInCurrentBlock.toArray))
+ seqNumRangesInCurrentBlock.clear()
+ logDebug(s"Generated block $blockId has $blockIdToSeqNumRanges")
+ }
+
+ /** Store the block along with its associated ranges */
+ private def storeBlockWithRanges(
+ blockId: StreamBlockId, arrayBuffer: mutable.ArrayBuffer[T]): Unit = {
+ val rangesToReportOption = Option(blockIdToSeqNumRanges.remove(blockId))
+ if (rangesToReportOption.isEmpty) {
+ stop("Error while storing block into Spark, could not find sequence number ranges " +
+ s"for block $blockId")
+ return
+ }
+
+ val rangesToReport = rangesToReportOption.get
+ var attempt = 0
+ var stored = false
+ var throwable: Throwable = null
+ while (!stored && attempt <= 3) {
+ try {
+ store(arrayBuffer, rangesToReport)
+ stored = true
+ } catch {
+ case NonFatal(th) =>
+ attempt += 1
+ throwable = th
+ }
+ }
+ if (!stored) {
+ stop("Error while storing block into Spark", throwable)
+ }
+
+ // Update the latest sequence number that has been successfully stored for each shard
+ // Note that we are doing this sequentially because the array of sequence number ranges
+ // is assumed to be in the order the records were added, so the last range seen for a
+ // shard carries its latest sequence number.
+ rangesToReport.ranges.foreach { range =>
+ shardIdToLatestStoredSeqNum.put(range.shardId, range.toSeqNumber)
+ }
+ }
+
+ /**
+ * If AWS credentials are provided, return an AWSCredentialsProvider returning those credentials.
+ * Otherwise, return the DefaultAWSCredentialsProviderChain.
+ */
+ private def resolveAWSCredentialsProvider(): AWSCredentialsProvider = {
+ awsCredentialsOption match {
+ case Some(awsCredentials) =>
+ logInfo("Using provided AWS credentials")
+ new AWSCredentialsProvider {
+ override def getCredentials: AWSCredentials = awsCredentials
+ override def refresh(): Unit = { }
+ }
+ case None =>
+ logInfo("Using DefaultAWSCredentialsProviderChain")
+ new DefaultAWSCredentialsProviderChain()
+ }
+ }
+
+
+ /**
+ * Class to handle blocks generated by this receiver's block generator. Specifically, in
+ * the context of the Kinesis Receiver, this handler does the following.
+ *
+ * - When an array of records is added to the current active block in the block generator,
+ * this handler keeps track of the corresponding sequence number range.
+ * - When the currently active block is ready to be sealed (no more records will be added),
+ * this handler keeps track of the list of ranges added into this block in another map,
+ * keyed by the block id.
+ */
+ private class GeneratedBlockHandler extends BlockGeneratorListener {
+
+ /**
+ * Callback method called after a data item is added into the BlockGenerator.
+ * The data addition, block generation, and calls to onAddData and onGenerateBlock
+ * are all synchronized through the same lock.
+ */
+ def onAddData(data: Any, metadata: Any): Unit = {
+ rememberAddedRange(metadata.asInstanceOf[SequenceNumberRange])
+ }
+
+ /**
+ * Callback method called after a block has been generated.
+ * The data addition, block generation, and calls to onAddData and onGenerateBlock
+ * are all synchronized through the same lock.
+ */
+ def onGenerateBlock(blockId: StreamBlockId): Unit = {
+ finalizeRangesForCurrentBlock(blockId)
+ }
+
+ /** Callback method called when a block is ready to be pushed / stored. */
+ def onPushBlock(blockId: StreamBlockId, arrayBuffer: mutable.ArrayBuffer[_]): Unit = {
+ storeBlockWithRanges(blockId,
+ arrayBuffer.asInstanceOf[mutable.ArrayBuffer[T]])
+ }
+
+ /** Callback method called in case of any error in the internals of the BlockGenerator. */
+ def onError(message: String, throwable: Throwable): Unit = {
+ reportError(message, throwable)
+ }
+ }
+}
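For reference, storeBlockWithRanges above retries the store a bounded number of times (the loop allows up to four attempts) and only then stops the receiver with the last error. Below is a minimal, self-contained sketch of that bounded-retry pattern; the object and helper names (BoundedRetrySketch, retryUpTo) are illustrative and not part of the receiver API.

import scala.util.control.NonFatal

// Illustrative helper mirroring the retry loop in storeBlockWithRanges; not part of Spark.
object BoundedRetrySketch {
  // Run `body` up to `maxAttempts` times (maxAttempts > 0), rethrowing the last failure
  // if every attempt fails.
  def retryUpTo[A](maxAttempts: Int)(body: => A): A = {
    var attempt = 0
    var lastError: Throwable = null
    while (attempt < maxAttempts) {
      try {
        return body
      } catch {
        case NonFatal(e) =>
          lastError = e
          attempt += 1
      }
    }
    throw lastError
  }
}

// Usage sketch: BoundedRetrySketch.retryUpTo(4) { store(arrayBuffer, rangesToReport) }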
diff --git a/external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisRecordProcessor.scala b/external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisRecordProcessor.scala
new file mode 100644
index 0000000000..b5b76cb92d
--- /dev/null
+++ b/external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisRecordProcessor.scala
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.streaming.kinesis
+
+import java.util.List
+
+import scala.util.Random
+import scala.util.control.NonFatal
+
+import com.amazonaws.services.kinesis.clientlibrary.exceptions.{InvalidStateException, KinesisClientLibDependencyException, ShutdownException, ThrottlingException}
+import com.amazonaws.services.kinesis.clientlibrary.interfaces.{IRecordProcessor, IRecordProcessorCheckpointer}
+import com.amazonaws.services.kinesis.clientlibrary.types.ShutdownReason
+import com.amazonaws.services.kinesis.model.Record
+
+import org.apache.spark.Logging
+import org.apache.spark.streaming.Duration
+
+/**
+ * Spark Streaming-specific implementation of the Kinesis Client Library (KCL) IRecordProcessor.
+ * This implementation passes the records fetched by the KCL to the associated KinesisReceiver.
+ * The Kinesis Worker creates an instance of this KinesisRecordProcessor for each
+ * shard in the Kinesis stream upon startup. This is normally done in separate threads,
+ * but the KCLs within the KinesisReceivers will balance themselves out if you create
+ * multiple Receivers.
+ *
+ * @param receiver Kinesis receiver
+ * @param workerId for logging purposes
+ */
+private[kinesis] class KinesisRecordProcessor[T](receiver: KinesisReceiver[T], workerId: String)
+ extends IRecordProcessor with Logging {
+
+ // shardId populated during initialize()
+ @volatile
+ private var shardId: String = _
+
+ /**
+ * The Kinesis Client Library calls this method during IRecordProcessor initialization.
+ *
+ * @param shardId assigned by the KCL to this particular RecordProcessor.
+ */
+ override def initialize(shardId: String) {
+ this.shardId = shardId
+ logInfo(s"Initialized workerId $workerId with shardId $shardId")
+ }
+
+ /**
+ * This method is called by the KCL when a batch of records is pulled from the Kinesis stream.
+ * This is the record-processing bridge between the KCL's IRecordProcessor.processRecords()
+ * and Spark Streaming's Receiver.store().
+ *
+ * @param batch list of records from the Kinesis stream shard
+ * @param checkpointer used to update Kinesis when this batch has been processed/stored
+ * in the DStream
+ */
+ override def processRecords(batch: List[Record], checkpointer: IRecordProcessorCheckpointer) {
+ if (!receiver.isStopped()) {
+ try {
+ receiver.addRecords(shardId, batch)
+ logDebug(s"Stored: Worker $workerId stored ${batch.size} records for shardId $shardId")
+ receiver.setCheckpointer(shardId, checkpointer)
+ } catch {
+ case NonFatal(e) => {
+ /*
+ * If there is a failure within the batch, the batch will not be checkpointed.
+ * This will potentially cause records since the last checkpoint to be processed
+ * more than once.
+ */
+ logError(s"Exception: WorkerId $workerId encountered an exception while storing " +
+ s"or checkpointing a batch for workerId $workerId and shardId $shardId.", e)
+
+ /* Rethrow the exception to the Kinesis Worker that is managing this RecordProcessor. */
+ throw e
+ }
+ }
+ } else {
+ /* RecordProcessor has been stopped. */
+ logInfo(s"Stopped: KinesisReceiver has stopped for workerId $workerId" +
+ s" and shardId $shardId. No more records will be processed.")
+ }
+ }
+
+ /**
+ * Kinesis Client Library is shutting down this Worker for 1 of 2 reasons:
+ * 1) the stream is resharding by splitting or merging adjacent shards
+ * (ShutdownReason.TERMINATE)
+ * 2) the failed or latent Worker has stopped sending heartbeats for whatever reason
+ * (ShutdownReason.ZOMBIE)
+ *
+ * @param checkpointer used to perform a Kinesis checkpoint for ShutdownReason.TERMINATE
+ * @param reason for shutdown (ShutdownReason.TERMINATE or ShutdownReason.ZOMBIE)
+ */
+ override def shutdown(checkpointer: IRecordProcessorCheckpointer, reason: ShutdownReason) {
+ logInfo(s"Shutdown: Shutting down workerId $workerId with reason $reason")
+ reason match {
+ /*
+ * TERMINATE Use Case. Checkpoint.
+ * Checkpoint to indicate that all records from the shard have been drained and processed.
+ * It's now OK to read from the new shards that resulted from a resharding event.
+ */
+ case ShutdownReason.TERMINATE =>
+ receiver.removeCheckpointer(shardId, checkpointer)
+
+ /*
+ * ZOMBIE Use Case or Unknown reason. NoOp.
+ * No checkpoint because other workers may have taken over and already started processing
+ * the same records.
+ * This may lead to records being processed more than once.
+ */
+ case _ =>
+ receiver.removeCheckpointer(shardId, null) // pass null so that we don't checkpoint
+ }
+
+ }
+}
+
+private[kinesis] object KinesisRecordProcessor extends Logging {
+ /**
+ * Retry the given expression the given number of times, with a random backoff time (millis)
+ * less than the given maxBackOffMillis between attempts.
+ *
+ * @param expression expression to evaluate
+ * @param numRetriesLeft number of retries left
+ * @param maxBackOffMillis max millis between retries
+ *
+ * @return evaluation of the given expression
+ * @throws Exception an unretryable exception, an unexpected exception,
+ * or any exception that persists after numRetriesLeft reaches 0
+ */
+ @annotation.tailrec
+ def retryRandom[T](expression: => T, numRetriesLeft: Int, maxBackOffMillis: Int): T = {
+ util.Try { expression } match {
+ /* If the function succeeded, evaluate to x. */
+ case util.Success(x) => x
+ /* If the function failed, either retry or throw the exception */
+ case util.Failure(e) => e match {
+ /* Retry: Throttling or other Retryable exception has occurred */
+ case _: ThrottlingException | _: KinesisClientLibDependencyException if numRetriesLeft > 1
+ => {
+ val backOffMillis = Random.nextInt(maxBackOffMillis)
+ Thread.sleep(backOffMillis)
+ logError(s"Retryable Exception: Random backOffMillis=${backOffMillis}", e)
+ retryRandom(expression, numRetriesLeft - 1, maxBackOffMillis)
+ }
+ /* Throw: Shutdown has been requested by the Kinesis Client Library. */
+ case _: ShutdownException => {
+ logError(s"ShutdownException: Caught shutdown exception, skipping checkpoint.", e)
+ throw e
+ }
+ /* Throw: Non-retryable exception has occurred with the Kinesis Client Library */
+ case _: InvalidStateException => {
+ logError(s"InvalidStateException: Cannot save checkpoint to the DynamoDB table used" +
+ s" by the Amazon Kinesis Client Library. Table likely doesn't exist.", e)
+ throw e
+ }
+ /* Throw: Unexpected exception has occurred */
+ case _ => {
+ logError(s"Unexpected, non-retryable exception.", e)
+ throw e
+ }
+ }
+ }
+ }
+}
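As a usage sketch for retryRandom: a KCL checkpoint call can be retried with a random backoff as shown below. Only retryRandom's signature and IRecordProcessorCheckpointer.checkpoint() come from the code above; the object name and the retry count / backoff bound are illustrative, and the sketch sits in the same package because retryRandom is private[kinesis].

package org.apache.spark.streaming.kinesis

import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorCheckpointer

// Illustrative call site: checkpoint through the KCL, retrying throttling / dependency
// failures up to 4 times with a random backoff below 500 ms; shutdown and invalid-state
// exceptions are rethrown immediately by retryRandom.
object CheckpointRetrySketch {
  def checkpointWithRetry(checkpointer: IRecordProcessorCheckpointer): Unit = {
    KinesisRecordProcessor.retryRandom(checkpointer.checkpoint(), 4, 500)
  }
}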
diff --git a/external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisTestUtils.scala b/external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisTestUtils.scala
new file mode 100644
index 0000000000..0ace453ee9
--- /dev/null
+++ b/external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisTestUtils.scala
@@ -0,0 +1,260 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.kinesis
+
+import java.nio.ByteBuffer
+import java.util.concurrent.TimeUnit
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+import scala.collection.mutable.ArrayBuffer
+import scala.util.{Failure, Random, Success, Try}
+
+import com.amazonaws.auth.{AWSCredentials, DefaultAWSCredentialsProviderChain}
+import com.amazonaws.regions.RegionUtils
+import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClient
+import com.amazonaws.services.dynamodbv2.document.DynamoDB
+import com.amazonaws.services.kinesis.AmazonKinesisClient
+import com.amazonaws.services.kinesis.model._
+
+import org.apache.spark.Logging
+
+/**
+ * Shared utility methods for performing Kinesis tests that actually transfer data.
+ *
+ * PLEASE KEEP THIS FILE UNDER src/main AS PYTHON TESTS NEED ACCESS TO THIS FILE!
+ */
+private[kinesis] class KinesisTestUtils extends Logging {
+
+ val endpointUrl = KinesisTestUtils.endpointUrl
+ val regionName = RegionUtils.getRegionByEndpoint(endpointUrl).getName()
+ val streamShardCount = 2
+
+ private val createStreamTimeoutSeconds = 300
+ private val describeStreamPollTimeSeconds = 1
+
+ @volatile
+ private var streamCreated = false
+
+ @volatile
+ private var _streamName: String = _
+
+ protected lazy val kinesisClient = {
+ val client = new AmazonKinesisClient(KinesisTestUtils.getAWSCredentials())
+ client.setEndpoint(endpointUrl)
+ client
+ }
+
+ private lazy val dynamoDB = {
+ val dynamoDBClient = new AmazonDynamoDBClient(new DefaultAWSCredentialsProviderChain())
+ dynamoDBClient.setRegion(RegionUtils.getRegion(regionName))
+ new DynamoDB(dynamoDBClient)
+ }
+
+ protected def getProducer(aggregate: Boolean): KinesisDataGenerator = {
+ if (!aggregate) {
+ new SimpleDataGenerator(kinesisClient)
+ } else {
+ throw new UnsupportedOperationException("Aggregation is not supported through this code path")
+ }
+ }
+
+ def streamName: String = {
+ require(streamCreated, "Stream not yet created, call createStream() to create one")
+ _streamName
+ }
+
+ def createStream(): Unit = {
+ require(!streamCreated, "Stream already created")
+ _streamName = findNonExistentStreamName()
+
+ // Create a stream. The number of shards determines the provisioned throughput.
+ logInfo(s"Creating stream ${_streamName}")
+ val createStreamRequest = new CreateStreamRequest()
+ createStreamRequest.setStreamName(_streamName)
+ createStreamRequest.setShardCount(2)
+ kinesisClient.createStream(createStreamRequest)
+
+ // The stream is now being created. Wait for it to become active.
+ waitForStreamToBeActive(_streamName)
+ streamCreated = true
+ logInfo(s"Created stream ${_streamName}")
+ }
+
+ /**
+ * Push data to Kinesis stream and return a map of
+ * shardId -> seq of (data, seq number) pushed to corresponding shard
+ */
+ def pushData(testData: Seq[Int], aggregate: Boolean): Map[String, Seq[(Int, String)]] = {
+ require(streamCreated, "Stream not yet created, call createStream() to create one")
+ val producer = getProducer(aggregate)
+ val shardIdToSeqNumbers = producer.sendData(streamName, testData)
+ logInfo(s"Pushed $testData:\n\t ${shardIdToSeqNumbers.mkString("\n\t")}")
+ shardIdToSeqNumbers.toMap
+ }
+
+ /**
+ * Expose a Python-friendly API.
+ */
+ def pushData(testData: java.util.List[Int]): Unit = {
+ pushData(testData.asScala, aggregate = false)
+ }
+
+ def deleteStream(): Unit = {
+ try {
+ if (streamCreated) {
+ kinesisClient.deleteStream(streamName)
+ }
+ } catch {
+ case e: Exception =>
+ logWarning(s"Could not delete stream $streamName")
+ }
+ }
+
+ def deleteDynamoDBTable(tableName: String): Unit = {
+ try {
+ val table = dynamoDB.getTable(tableName)
+ table.delete()
+ table.waitForDelete()
+ } catch {
+ case e: Exception =>
+ logWarning(s"Could not delete DynamoDB table $tableName")
+ }
+ }
+
+ private def describeStream(streamNameToDescribe: String): Option[StreamDescription] = {
+ try {
+ val describeStreamRequest = new DescribeStreamRequest().withStreamName(streamNameToDescribe)
+ val desc = kinesisClient.describeStream(describeStreamRequest).getStreamDescription()
+ Some(desc)
+ } catch {
+ case rnfe: ResourceNotFoundException =>
+ None
+ }
+ }
+
+ private def findNonExistentStreamName(): String = {
+ var testStreamName: String = null
+ do {
+ Thread.sleep(TimeUnit.SECONDS.toMillis(describeStreamPollTimeSeconds))
+ testStreamName = s"KinesisTestUtils-${math.abs(Random.nextLong())}"
+ } while (describeStream(testStreamName).nonEmpty)
+ testStreamName
+ }
+
+ private def waitForStreamToBeActive(streamNameToWaitFor: String): Unit = {
+ val startTime = System.currentTimeMillis()
+ val endTime = startTime + TimeUnit.SECONDS.toMillis(createStreamTimeoutSeconds)
+ while (System.currentTimeMillis() < endTime) {
+ Thread.sleep(TimeUnit.SECONDS.toMillis(describeStreamPollTimeSeconds))
+ describeStream(streamNameToWaitFor).foreach { description =>
+ val streamStatus = description.getStreamStatus()
+ logDebug(s"\t- current state: $streamStatus\n")
+ if ("ACTIVE".equals(streamStatus)) {
+ return
+ }
+ }
+ }
+ require(false, s"Stream $streamNameToWaitFor never became active")
+ }
+}
+
+private[kinesis] object KinesisTestUtils {
+
+ val envVarNameForEnablingTests = "ENABLE_KINESIS_TESTS"
+ val endVarNameForEndpoint = "KINESIS_TEST_ENDPOINT_URL"
+ val defaultEndpointUrl = "https://kinesis.us-west-2.amazonaws.com"
+
+ lazy val shouldRunTests = {
+ val isEnvSet = sys.env.get(envVarNameForEnablingTests) == Some("1")
+ if (isEnvSet) {
+ // scalastyle:off println
+ // Print this so that it is easily visible on the console and not hidden in the log4j logs.
+ println(
+ s"""
+ |Kinesis tests that actually send data have been enabled by setting the environment
+ |variable $envVarNameForEnablingTests to 1. This will create Kinesis Streams and
+ |DynamoDB tables in AWS. Please be aware that this may incur some AWS costs.
+ |By default, the tests use the endpoint URL $defaultEndpointUrl to create Kinesis streams.
+ |To change this endpoint URL to a different region, you can set the environment variable
+ |$endVarNameForEndpoint to the desired endpoint URL
+ |(e.g. $endVarNameForEndpoint="https://kinesis.us-west-2.amazonaws.com").
+ """.stripMargin)
+ // scalastyle:on println
+ }
+ isEnvSet
+ }
+
+ lazy val endpointUrl = {
+ val url = sys.env.getOrElse(endVarNameForEndpoint, defaultEndpointUrl)
+ // scalastyle:off println
+ // Print this so that it is easily visible on the console and not hidden in the log4j logs.
+ println(s"Using endpoint URL $url for creating Kinesis streams for tests.")
+ // scalastyle:on println
+ url
+ }
+
+ def isAWSCredentialsPresent: Boolean = {
+ Try { new DefaultAWSCredentialsProviderChain().getCredentials() }.isSuccess
+ }
+
+ def getAWSCredentials(): AWSCredentials = {
+ assert(shouldRunTests,
+ "Kinesis test not enabled, should not attempt to get AWS credentials")
+ Try { new DefaultAWSCredentialsProviderChain().getCredentials() } match {
+ case Success(cred) => cred
+ case Failure(e) =>
+ throw new Exception(
+ s"""
+ |Kinesis tests enabled using environment variable $envVarNameForEnablingTests
+ |but could not find AWS credentials. Please follow instructions in AWS documentation
+ |to set the credentials in your system such that the DefaultAWSCredentialsProviderChain
+ |can find the credentials.
+ """.stripMargin)
+ }
+ }
+}
+
+/** A wrapper interface that will allow us to consolidate the code for synthetic data generation. */
+private[kinesis] trait KinesisDataGenerator {
+ /** Sends the data to Kinesis and returns the metadata for everything that has been sent. */
+ def sendData(streamName: String, data: Seq[Int]): Map[String, Seq[(Int, String)]]
+}
+
+private[kinesis] class SimpleDataGenerator(
+ client: AmazonKinesisClient) extends KinesisDataGenerator {
+ override def sendData(streamName: String, data: Seq[Int]): Map[String, Seq[(Int, String)]] = {
+ val shardIdToSeqNumbers = new mutable.HashMap[String, ArrayBuffer[(Int, String)]]()
+ data.foreach { num =>
+ val str = num.toString
+ val data = ByteBuffer.wrap(str.getBytes())
+ val putRecordRequest = new PutRecordRequest().withStreamName(streamName)
+ .withData(data)
+ .withPartitionKey(str)
+
+ val putRecordResult = client.putRecord(putRecordRequest)
+ val shardId = putRecordResult.getShardId
+ val seqNumber = putRecordResult.getSequenceNumber()
+ val sentSeqNumbers = shardIdToSeqNumbers.getOrElseUpdate(shardId,
+ new ArrayBuffer[(Int, String)]())
+ sentSeqNumbers += ((num, seqNumber))
+ }
+
+ shardIdToSeqNumbers.toMap
+ }
+}
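For orientation, a test that exercises these utilities typically creates a throwaway stream, pushes data, and cleans up. A hedged sketch under those assumptions follows; the object name and the assertion are illustrative, the class is private[kinesis] so the sketch lives in that package, and running it requires ENABLE_KINESIS_TESTS=1, resolvable AWS credentials, and real (billable) AWS resources.

package org.apache.spark.streaming.kinesis

// Illustrative test flow, not part of the patch.
object KinesisTestUtilsSketch {
  def main(args: Array[String]): Unit = {
    val testUtils = new KinesisTestUtils()
    testUtils.createStream()  // creates a uniquely named two-shard test stream
    try {
      val shardIdToData = testUtils.pushData(1 to 10, aggregate = false)
      // Every pushed value should come back paired with a sequence number on some shard.
      assert(shardIdToData.values.flatten.map(_._1).toSet == (1 to 10).toSet)
    } finally {
      testUtils.deleteStream()
    }
  }
}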
diff --git a/external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisUtils.scala b/external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisUtils.scala
new file mode 100644
index 0000000000..15ac588b82
--- /dev/null
+++ b/external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisUtils.scala
@@ -0,0 +1,560 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.streaming.kinesis
+
+import scala.reflect.ClassTag
+
+import com.amazonaws.regions.RegionUtils
+import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream
+import com.amazonaws.services.kinesis.model.Record
+
+import org.apache.spark.api.java.function.{Function => JFunction}
+import org.apache.spark.storage.StorageLevel
+import org.apache.spark.streaming.{Duration, StreamingContext}
+import org.apache.spark.streaming.api.java.{JavaReceiverInputDStream, JavaStreamingContext}
+import org.apache.spark.streaming.dstream.ReceiverInputDStream
+
+object KinesisUtils {
+ /**
+ * Create an input stream that pulls messages from a Kinesis stream.
+ * This uses the Kinesis Client Library (KCL) to pull messages from Kinesis.
+ *
+ * Note: The AWS credentials will be discovered using the DefaultAWSCredentialsProviderChain
+ * on the workers. See AWS documentation to understand how DefaultAWSCredentialsProviderChain
+ * gets the AWS credentials.
+ *
+ * @param ssc StreamingContext object
+ * @param kinesisAppName Kinesis application name used by the Kinesis Client Library
+ * (KCL) to update DynamoDB
+ * @param streamName Kinesis stream name
+ * @param endpointUrl Url of Kinesis service (e.g., https://kinesis.us-east-1.amazonaws.com)
+ * @param regionName Name of region used by the Kinesis Client Library (KCL) to update
+ * DynamoDB (lease coordination and checkpointing) and CloudWatch (metrics)
+ * @param initialPositionInStream In the absence of Kinesis checkpoint info, this is the
+ * worker's initial starting position in the stream.
+ * The values are either the beginning of the stream
+ * per Kinesis' limit of 24 hours
+ * (InitialPositionInStream.TRIM_HORIZON) or
+ * the tip of the stream (InitialPositionInStream.LATEST).
+ * @param checkpointInterval Checkpoint interval for Kinesis checkpointing.
+ * See the Kinesis Spark Streaming documentation for more
+ * details on the different types of checkpoints.
+ * @param storageLevel Storage level to use for storing the received objects.
+ * StorageLevel.MEMORY_AND_DISK_2 is recommended.
+ * @param messageHandler A custom message handler that can generate a generic output from a
+ * Kinesis `Record`, which contains both message data, and metadata.
+ */
+ def createStream[T: ClassTag](
+ ssc: StreamingContext,
+ kinesisAppName: String,
+ streamName: String,
+ endpointUrl: String,
+ regionName: String,
+ initialPositionInStream: InitialPositionInStream,
+ checkpointInterval: Duration,
+ storageLevel: StorageLevel,
+ messageHandler: Record => T): ReceiverInputDStream[T] = {
+ val cleanedHandler = ssc.sc.clean(messageHandler)
+ // Setting scope to override receiver stream's scope of "receiver stream"
+ ssc.withNamedScope("kinesis stream") {
+ new KinesisInputDStream[T](ssc, streamName, endpointUrl, validateRegion(regionName),
+ initialPositionInStream, kinesisAppName, checkpointInterval, storageLevel,
+ cleanedHandler, None)
+ }
+ }
+
+ /**
+ * Create an input stream that pulls messages from a Kinesis stream.
+ * This uses the Kinesis Client Library (KCL) to pull messages from Kinesis.
+ *
+ * Note:
+ * The given AWS credentials will get saved in DStream checkpoints if checkpointing
+ * is enabled. Make sure that your checkpoint directory is secure.
+ *
+ * @param ssc StreamingContext object
+ * @param kinesisAppName Kinesis application name used by the Kinesis Client Library
+ * (KCL) to update DynamoDB
+ * @param streamName Kinesis stream name
+ * @param endpointUrl Url of Kinesis service (e.g., https://kinesis.us-east-1.amazonaws.com)
+ * @param regionName Name of region used by the Kinesis Client Library (KCL) to update
+ * DynamoDB (lease coordination and checkpointing) and CloudWatch (metrics)
+ * @param initialPositionInStream In the absence of Kinesis checkpoint info, this is the
+ * worker's initial starting position in the stream.
+ * The values are either the beginning of the stream
+ * per Kinesis' limit of 24 hours
+ * (InitialPositionInStream.TRIM_HORIZON) or
+ * the tip of the stream (InitialPositionInStream.LATEST).
+ * @param checkpointInterval Checkpoint interval for Kinesis checkpointing.
+ * See the Kinesis Spark Streaming documentation for more
+ * details on the different types of checkpoints.
+ * @param storageLevel Storage level to use for storing the received objects.
+ * StorageLevel.MEMORY_AND_DISK_2 is recommended.
+ * @param messageHandler A custom message handler that can generate a generic output from a
+ * Kinesis `Record`, which contains both message data, and metadata.
+ * @param awsAccessKeyId AWS AccessKeyId (if null, will use DefaultAWSCredentialsProviderChain)
+ * @param awsSecretKey AWS SecretKey (if null, will use DefaultAWSCredentialsProviderChain)
+ */
+ // scalastyle:off
+ def createStream[T: ClassTag](
+ ssc: StreamingContext,
+ kinesisAppName: String,
+ streamName: String,
+ endpointUrl: String,
+ regionName: String,
+ initialPositionInStream: InitialPositionInStream,
+ checkpointInterval: Duration,
+ storageLevel: StorageLevel,
+ messageHandler: Record => T,
+ awsAccessKeyId: String,
+ awsSecretKey: String): ReceiverInputDStream[T] = {
+ // scalastyle:on
+ val cleanedHandler = ssc.sc.clean(messageHandler)
+ ssc.withNamedScope("kinesis stream") {
+ new KinesisInputDStream[T](ssc, streamName, endpointUrl, validateRegion(regionName),
+ initialPositionInStream, kinesisAppName, checkpointInterval, storageLevel,
+ cleanedHandler, Some(SerializableAWSCredentials(awsAccessKeyId, awsSecretKey)))
+ }
+ }
+
+ /**
+ * Create an input stream that pulls messages from a Kinesis stream.
+ * This uses the Kinesis Client Library (KCL) to pull messages from Kinesis.
+ *
+ * Note: The AWS credentials will be discovered using the DefaultAWSCredentialsProviderChain
+ * on the workers. See AWS documentation to understand how DefaultAWSCredentialsProviderChain
+ * gets the AWS credentials.
+ *
+ * @param ssc StreamingContext object
+ * @param kinesisAppName Kinesis application name used by the Kinesis Client Library
+ * (KCL) to update DynamoDB
+ * @param streamName Kinesis stream name
+ * @param endpointUrl Url of Kinesis service (e.g., https://kinesis.us-east-1.amazonaws.com)
+ * @param regionName Name of region used by the Kinesis Client Library (KCL) to update
+ * DynamoDB (lease coordination and checkpointing) and CloudWatch (metrics)
+ * @param initialPositionInStream In the absence of Kinesis checkpoint info, this is the
+ * worker's initial starting position in the stream.
+ * The values are either the beginning of the stream
+ * per Kinesis' limit of 24 hours
+ * (InitialPositionInStream.TRIM_HORIZON) or
+ * the tip of the stream (InitialPositionInStream.LATEST).
+ * @param checkpointInterval Checkpoint interval for Kinesis checkpointing.
+ * See the Kinesis Spark Streaming documentation for more
+ * details on the different types of checkpoints.
+ * @param storageLevel Storage level to use for storing the received objects.
+ * StorageLevel.MEMORY_AND_DISK_2 is recommended.
+ */
+ def createStream(
+ ssc: StreamingContext,
+ kinesisAppName: String,
+ streamName: String,
+ endpointUrl: String,
+ regionName: String,
+ initialPositionInStream: InitialPositionInStream,
+ checkpointInterval: Duration,
+ storageLevel: StorageLevel): ReceiverInputDStream[Array[Byte]] = {
+ // Setting scope to override receiver stream's scope of "receiver stream"
+ ssc.withNamedScope("kinesis stream") {
+ new KinesisInputDStream[Array[Byte]](ssc, streamName, endpointUrl, validateRegion(regionName),
+ initialPositionInStream, kinesisAppName, checkpointInterval, storageLevel,
+ defaultMessageHandler, None)
+ }
+ }
+
+ /**
+ * Create an input stream that pulls messages from a Kinesis stream.
+ * This uses the Kinesis Client Library (KCL) to pull messages from Kinesis.
+ *
+ * Note:
+ * The given AWS credentials will get saved in DStream checkpoints if checkpointing
+ * is enabled. Make sure that your checkpoint directory is secure.
+ *
+ * @param ssc StreamingContext object
+ * @param kinesisAppName Kinesis application name used by the Kinesis Client Library
+ * (KCL) to update DynamoDB
+ * @param streamName Kinesis stream name
+ * @param endpointUrl Url of Kinesis service (e.g., https://kinesis.us-east-1.amazonaws.com)
+ * @param regionName Name of region used by the Kinesis Client Library (KCL) to update
+ * DynamoDB (lease coordination and checkpointing) and CloudWatch (metrics)
+ * @param initialPositionInStream In the absence of Kinesis checkpoint info, this is the
+ * worker's initial starting position in the stream.
+ * The values are either the beginning of the stream
+ * per Kinesis' limit of 24 hours
+ * (InitialPositionInStream.TRIM_HORIZON) or
+ * the tip of the stream (InitialPositionInStream.LATEST).
+ * @param checkpointInterval Checkpoint interval for Kinesis checkpointing.
+ * See the Kinesis Spark Streaming documentation for more
+ * details on the different types of checkpoints.
+ * @param storageLevel Storage level to use for storing the received objects.
+ * StorageLevel.MEMORY_AND_DISK_2 is recommended.
+ * @param awsAccessKeyId AWS AccessKeyId (if null, will use DefaultAWSCredentialsProviderChain)
+ * @param awsSecretKey AWS SecretKey (if null, will use DefaultAWSCredentialsProviderChain)
+ */
+ def createStream(
+ ssc: StreamingContext,
+ kinesisAppName: String,
+ streamName: String,
+ endpointUrl: String,
+ regionName: String,
+ initialPositionInStream: InitialPositionInStream,
+ checkpointInterval: Duration,
+ storageLevel: StorageLevel,
+ awsAccessKeyId: String,
+ awsSecretKey: String): ReceiverInputDStream[Array[Byte]] = {
+ ssc.withNamedScope("kinesis stream") {
+ new KinesisInputDStream[Array[Byte]](ssc, streamName, endpointUrl, validateRegion(regionName),
+ initialPositionInStream, kinesisAppName, checkpointInterval, storageLevel,
+ defaultMessageHandler, Some(SerializableAWSCredentials(awsAccessKeyId, awsSecretKey)))
+ }
+ }
+
+ /**
+ * Create an input stream that pulls messages from a Kinesis stream.
+ * This uses the Kinesis Client Library (KCL) to pull messages from Kinesis.
+ *
+ * Note:
+ *
+ * - The AWS credentials will be discovered using the DefaultAWSCredentialsProviderChain
+ * on the workers. See AWS documentation to understand how DefaultAWSCredentialsProviderChain
+ * gets AWS credentials.
+ * - The region of the `endpointUrl` will be used for DynamoDB and CloudWatch.
+ * - The Kinesis application name used by the Kinesis Client Library (KCL) will be the app name
+ * in [[org.apache.spark.SparkConf]].
+ *
+ * @param ssc StreamingContext object
+ * @param streamName Kinesis stream name
+ * @param endpointUrl Endpoint url of Kinesis service
+ * (e.g., https://kinesis.us-east-1.amazonaws.com)
+ * @param checkpointInterval Checkpoint interval for Kinesis checkpointing.
+ * See the Kinesis Spark Streaming documentation for more
+ * details on the different types of checkpoints.
+ * @param initialPositionInStream In the absence of Kinesis checkpoint info, this is the
+ * worker's initial starting position in the stream.
+ * The values are either the beginning of the stream
+ * per Kinesis' limit of 24 hours
+ * (InitialPositionInStream.TRIM_HORIZON) or
+ * the tip of the stream (InitialPositionInStream.LATEST).
+ * @param storageLevel Storage level to use for storing the received objects
+ * StorageLevel.MEMORY_AND_DISK_2 is recommended.
+ */
+ @deprecated("use other forms of createStream", "1.4.0")
+ def createStream(
+ ssc: StreamingContext,
+ streamName: String,
+ endpointUrl: String,
+ checkpointInterval: Duration,
+ initialPositionInStream: InitialPositionInStream,
+ storageLevel: StorageLevel
+ ): ReceiverInputDStream[Array[Byte]] = {
+ ssc.withNamedScope("kinesis stream") {
+ new KinesisInputDStream[Array[Byte]](ssc, streamName, endpointUrl,
+ getRegionByEndpoint(endpointUrl), initialPositionInStream, ssc.sc.appName,
+ checkpointInterval, storageLevel, defaultMessageHandler, None)
+ }
+ }
+
+ /**
+ * Create an input stream that pulls messages from a Kinesis stream.
+ * This uses the Kinesis Client Library (KCL) to pull messages from Kinesis.
+ *
+ * Note: The AWS credentials will be discovered using the DefaultAWSCredentialsProviderChain
+ * on the workers. See AWS documentation to understand how DefaultAWSCredentialsProviderChain
+ * gets the AWS credentials.
+ *
+ * @param jssc Java StreamingContext object
+ * @param kinesisAppName Kinesis application name used by the Kinesis Client Library
+ * (KCL) to update DynamoDB
+ * @param streamName Kinesis stream name
+ * @param endpointUrl Url of Kinesis service (e.g., https://kinesis.us-east-1.amazonaws.com)
+ * @param regionName Name of region used by the Kinesis Client Library (KCL) to update
+ * DynamoDB (lease coordination and checkpointing) and CloudWatch (metrics)
+ * @param initialPositionInStream In the absence of Kinesis checkpoint info, this is the
+ * worker's initial starting position in the stream.
+ * The values are either the beginning of the stream
+ * per Kinesis' limit of 24 hours
+ * (InitialPositionInStream.TRIM_HORIZON) or
+ * the tip of the stream (InitialPositionInStream.LATEST).
+ * @param checkpointInterval Checkpoint interval for Kinesis checkpointing.
+ * See the Kinesis Spark Streaming documentation for more
+ * details on the different types of checkpoints.
+ * @param storageLevel Storage level to use for storing the received objects.
+ * StorageLevel.MEMORY_AND_DISK_2 is recommended.
+ * @param messageHandler A custom message handler that can generate a generic output from a
+ * Kinesis `Record`, which contains both message data, and metadata.
+ * @param recordClass Class of the records in DStream
+ */
+ def createStream[T](
+ jssc: JavaStreamingContext,
+ kinesisAppName: String,
+ streamName: String,
+ endpointUrl: String,
+ regionName: String,
+ initialPositionInStream: InitialPositionInStream,
+ checkpointInterval: Duration,
+ storageLevel: StorageLevel,
+ messageHandler: JFunction[Record, T],
+ recordClass: Class[T]): JavaReceiverInputDStream[T] = {
+ implicit val recordCmt: ClassTag[T] = ClassTag(recordClass)
+ val cleanedHandler = jssc.sparkContext.clean(messageHandler.call(_))
+ createStream[T](jssc.ssc, kinesisAppName, streamName, endpointUrl, regionName,
+ initialPositionInStream, checkpointInterval, storageLevel, cleanedHandler)
+ }
+
+ /**
+ * Create an input stream that pulls messages from a Kinesis stream.
+ * This uses the Kinesis Client Library (KCL) to pull messages from Kinesis.
+ *
+ * Note:
+ * The given AWS credentials will get saved in DStream checkpoints if checkpointing
+ * is enabled. Make sure that your checkpoint directory is secure.
+ *
+ * @param jssc Java StreamingContext object
+ * @param kinesisAppName Kinesis application name used by the Kinesis Client Library
+ * (KCL) to update DynamoDB
+ * @param streamName Kinesis stream name
+ * @param endpointUrl Url of Kinesis service (e.g., https://kinesis.us-east-1.amazonaws.com)
+ * @param regionName Name of region used by the Kinesis Client Library (KCL) to update
+ * DynamoDB (lease coordination and checkpointing) and CloudWatch (metrics)
+ * @param initialPositionInStream In the absence of Kinesis checkpoint info, this is the
+ * worker's initial starting position in the stream.
+ * The values are either the beginning of the stream
+ * per Kinesis' limit of 24 hours
+ * (InitialPositionInStream.TRIM_HORIZON) or
+ * the tip of the stream (InitialPositionInStream.LATEST).
+ * @param checkpointInterval Checkpoint interval for Kinesis checkpointing.
+ * See the Kinesis Spark Streaming documentation for more
+ * details on the different types of checkpoints.
+ * @param storageLevel Storage level to use for storing the received objects.
+ * StorageLevel.MEMORY_AND_DISK_2 is recommended.
+ * @param messageHandler A custom message handler that can generate a generic output from a
+ * Kinesis `Record`, which contains both message data, and metadata.
+ * @param recordClass Class of the records in DStream
+ * @param awsAccessKeyId AWS AccessKeyId (if null, will use DefaultAWSCredentialsProviderChain)
+ * @param awsSecretKey AWS SecretKey (if null, will use DefaultAWSCredentialsProviderChain)
+ */
+ // scalastyle:off
+ def createStream[T](
+ jssc: JavaStreamingContext,
+ kinesisAppName: String,
+ streamName: String,
+ endpointUrl: String,
+ regionName: String,
+ initialPositionInStream: InitialPositionInStream,
+ checkpointInterval: Duration,
+ storageLevel: StorageLevel,
+ messageHandler: JFunction[Record, T],
+ recordClass: Class[T],
+ awsAccessKeyId: String,
+ awsSecretKey: String): JavaReceiverInputDStream[T] = {
+ // scalastyle:on
+ implicit val recordCmt: ClassTag[T] = ClassTag(recordClass)
+ val cleanedHandler = jssc.sparkContext.clean(messageHandler.call(_))
+ createStream[T](jssc.ssc, kinesisAppName, streamName, endpointUrl, regionName,
+ initialPositionInStream, checkpointInterval, storageLevel, cleanedHandler,
+ awsAccessKeyId, awsSecretKey)
+ }
+
+ /**
+ * Create an input stream that pulls messages from a Kinesis stream.
+ * This uses the Kinesis Client Library (KCL) to pull messages from Kinesis.
+ *
+ * Note: The AWS credentials will be discovered using the DefaultAWSCredentialsProviderChain
+ * on the workers. See AWS documentation to understand how DefaultAWSCredentialsProviderChain
+ * gets the AWS credentials.
+ *
+ * @param jssc Java StreamingContext object
+ * @param kinesisAppName Kinesis application name used by the Kinesis Client Library
+ * (KCL) to update DynamoDB
+ * @param streamName Kinesis stream name
+ * @param endpointUrl Url of Kinesis service (e.g., https://kinesis.us-east-1.amazonaws.com)
+ * @param regionName Name of region used by the Kinesis Client Library (KCL) to update
+ * DynamoDB (lease coordination and checkpointing) and CloudWatch (metrics)
+ * @param initialPositionInStream In the absence of Kinesis checkpoint info, this is the
+ * worker's initial starting position in the stream.
+ * The values are either the beginning of the stream
+ * per Kinesis' limit of 24 hours
+ * (InitialPositionInStream.TRIM_HORIZON) or
+ * the tip of the stream (InitialPositionInStream.LATEST).
+ * @param checkpointInterval Checkpoint interval for Kinesis checkpointing.
+ * See the Kinesis Spark Streaming documentation for more
+ * details on the different types of checkpoints.
+ * @param storageLevel Storage level to use for storing the received objects.
+ * StorageLevel.MEMORY_AND_DISK_2 is recommended.
+ */
+ def createStream(
+ jssc: JavaStreamingContext,
+ kinesisAppName: String,
+ streamName: String,
+ endpointUrl: String,
+ regionName: String,
+ initialPositionInStream: InitialPositionInStream,
+ checkpointInterval: Duration,
+ storageLevel: StorageLevel
+ ): JavaReceiverInputDStream[Array[Byte]] = {
+ createStream[Array[Byte]](jssc.ssc, kinesisAppName, streamName, endpointUrl, regionName,
+ initialPositionInStream, checkpointInterval, storageLevel, defaultMessageHandler(_))
+ }
+
+ /**
+ * Create an input stream that pulls messages from a Kinesis stream.
+ * This uses the Kinesis Client Library (KCL) to pull messages from Kinesis.
+ *
+ * Note:
+ * The given AWS credentials will get saved in DStream checkpoints if checkpointing
+ * is enabled. Make sure that your checkpoint directory is secure.
+ *
+ * @param jssc Java StreamingContext object
+ * @param kinesisAppName Kinesis application name used by the Kinesis Client Library
+ * (KCL) to update DynamoDB
+ * @param streamName Kinesis stream name
+ * @param endpointUrl Url of Kinesis service (e.g., https://kinesis.us-east-1.amazonaws.com)
+ * @param regionName Name of region used by the Kinesis Client Library (KCL) to update
+ * DynamoDB (lease coordination and checkpointing) and CloudWatch (metrics)
+ * @param initialPositionInStream In the absence of Kinesis checkpoint info, this is the
+ * worker's initial starting position in the stream.
+ * The values are either the beginning of the stream
+ * per Kinesis' limit of 24 hours
+ * (InitialPositionInStream.TRIM_HORIZON) or
+ * the tip of the stream (InitialPositionInStream.LATEST).
+ * @param checkpointInterval Checkpoint interval for Kinesis checkpointing.
+ * See the Kinesis Spark Streaming documentation for more
+ * details on the different types of checkpoints.
+ * @param storageLevel Storage level to use for storing the received objects.
+ * StorageLevel.MEMORY_AND_DISK_2 is recommended.
+ * @param awsAccessKeyId AWS AccessKeyId (if null, will use DefaultAWSCredentialsProviderChain)
+ * @param awsSecretKey AWS SecretKey (if null, will use DefaultAWSCredentialsProviderChain)
+ */
+ def createStream(
+ jssc: JavaStreamingContext,
+ kinesisAppName: String,
+ streamName: String,
+ endpointUrl: String,
+ regionName: String,
+ initialPositionInStream: InitialPositionInStream,
+ checkpointInterval: Duration,
+ storageLevel: StorageLevel,
+ awsAccessKeyId: String,
+ awsSecretKey: String): JavaReceiverInputDStream[Array[Byte]] = {
+ createStream[Array[Byte]](jssc.ssc, kinesisAppName, streamName, endpointUrl, regionName,
+ initialPositionInStream, checkpointInterval, storageLevel,
+ defaultMessageHandler(_), awsAccessKeyId, awsSecretKey)
+ }
+
+ /**
+ * Create an input stream that pulls messages from a Kinesis stream.
+ * This uses the Kinesis Client Library (KCL) to pull messages from Kinesis.
+ *
+ * Note:
+ * - The AWS credentials will be discovered using the DefaultAWSCredentialsProviderChain
+ * on the workers. See AWS documentation to understand how DefaultAWSCredentialsProviderChain
+ * gets AWS credentials.
+ * - The region of the `endpointUrl` will be used for DynamoDB and CloudWatch.
+ * - The Kinesis application name used by the Kinesis Client Library (KCL) will be the app name in
+ * [[org.apache.spark.SparkConf]].
+ *
+ * @param jssc Java StreamingContext object
+ * @param streamName Kinesis stream name
+ * @param endpointUrl Endpoint url of Kinesis service
+ * (e.g., https://kinesis.us-east-1.amazonaws.com)
+ * @param checkpointInterval Checkpoint interval for Kinesis checkpointing.
+ * See the Kinesis Spark Streaming documentation for more
+ * details on the different types of checkpoints.
+ * @param initialPositionInStream In the absence of Kinesis checkpoint info, this is the
+ * worker's initial starting position in the stream.
+ * The values are either the beginning of the stream
+ * per Kinesis' limit of 24 hours
+ * (InitialPositionInStream.TRIM_HORIZON) or
+ * the tip of the stream (InitialPositionInStream.LATEST).
+ * @param storageLevel Storage level to use for storing the received objects
+ * StorageLevel.MEMORY_AND_DISK_2 is recommended.
+ */
+ @deprecated("use other forms of createStream", "1.4.0")
+ def createStream(
+ jssc: JavaStreamingContext,
+ streamName: String,
+ endpointUrl: String,
+ checkpointInterval: Duration,
+ initialPositionInStream: InitialPositionInStream,
+ storageLevel: StorageLevel
+ ): JavaReceiverInputDStream[Array[Byte]] = {
+ createStream(
+ jssc.ssc, streamName, endpointUrl, checkpointInterval, initialPositionInStream, storageLevel)
+ }
+
+ private def getRegionByEndpoint(endpointUrl: String): String = {
+ RegionUtils.getRegionByEndpoint(endpointUrl).getName()
+ }
+
+ private def validateRegion(regionName: String): String = {
+ Option(RegionUtils.getRegion(regionName)).map { _.getName }.getOrElse {
+ throw new IllegalArgumentException(s"Region name '$regionName' is not valid")
+ }
+ }
+
+ private[kinesis] def defaultMessageHandler(record: Record): Array[Byte] = {
+ if (record == null) return null
+ val byteBuffer = record.getData()
+ val byteArray = new Array[Byte](byteBuffer.remaining())
+ byteBuffer.get(byteArray)
+ byteArray
+ }
+}
+
+/**
+ * This is a helper class that wraps the methods in KinesisUtils in a more Python-friendly class
+ * and function so that they can be easily instantiated and called from Python's KinesisUtils.
+ */
+private class KinesisUtilsPythonHelper {
+
+ def getInitialPositionInStream(initialPositionInStream: Int): InitialPositionInStream = {
+ initialPositionInStream match {
+ case 0 => InitialPositionInStream.LATEST
+ case 1 => InitialPositionInStream.TRIM_HORIZON
+ case _ => throw new IllegalArgumentException(
+ "Illegal InitialPositionInStream. Please use " +
+ "InitialPositionInStream.LATEST or InitialPositionInStream.TRIM_HORIZON")
+ }
+ }
+
+ def createStream(
+ jssc: JavaStreamingContext,
+ kinesisAppName: String,
+ streamName: String,
+ endpointUrl: String,
+ regionName: String,
+ initialPositionInStream: Int,
+ checkpointInterval: Duration,
+ storageLevel: StorageLevel,
+ awsAccessKeyId: String,
+ awsSecretKey: String
+ ): JavaReceiverInputDStream[Array[Byte]] = {
+ if (awsAccessKeyId == null && awsSecretKey != null) {
+ throw new IllegalArgumentException("awsSecretKey is set but awsAccessKeyId is null")
+ }
+ if (awsAccessKeyId != null && awsSecretKey == null) {
+ throw new IllegalArgumentException("awsAccessKeyId is set but awsSecretKey is null")
+ }
+ if (awsAccessKeyId == null && awsSecretKey == null) {
+ KinesisUtils.createStream(jssc, kinesisAppName, streamName, endpointUrl, regionName,
+ getInitialPositionInStream(initialPositionInStream), checkpointInterval, storageLevel)
+ } else {
+ KinesisUtils.createStream(jssc, kinesisAppName, streamName, endpointUrl, regionName,
+ getInitialPositionInStream(initialPositionInStream), checkpointInterval, storageLevel,
+ awsAccessKeyId, awsSecretKey)
+ }
+ }
+
+}
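Tying the scaladoc above together, here is a minimal sketch of the byte-array form of createStream. The application, stream, endpoint, and region names are placeholders, and credentials are assumed to come from the DefaultAWSCredentialsProviderChain on the workers.

import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream

import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kinesis.KinesisUtils

// Illustrative driver: print the received records as strings every two seconds.
object KinesisStreamSketch {
  def main(args: Array[String]): Unit = {
    val ssc = new StreamingContext(new SparkConf().setAppName("KinesisStreamSketch"), Seconds(2))

    val byteStream = KinesisUtils.createStream(
      ssc, "myKinesisApp", "myKinesisStream",
      "https://kinesis.us-west-2.amazonaws.com", "us-west-2",
      InitialPositionInStream.LATEST, Seconds(2), StorageLevel.MEMORY_AND_DISK_2)

    byteStream.map(bytes => new String(bytes, "UTF-8")).print()

    ssc.start()
    ssc.awaitTermination()
  }
}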
diff --git a/external/kinesis-asl/src/test/java/org/apache/spark/streaming/kinesis/JavaKinesisStreamSuite.java b/external/kinesis-asl/src/test/java/org/apache/spark/streaming/kinesis/JavaKinesisStreamSuite.java
new file mode 100644
index 0000000000..5c2371c543
--- /dev/null
+++ b/external/kinesis-asl/src/test/java/org/apache/spark/streaming/kinesis/JavaKinesisStreamSuite.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.kinesis;
+
+import com.amazonaws.services.kinesis.model.Record;
+import org.junit.Test;
+
+import org.apache.spark.api.java.function.Function;
+import org.apache.spark.storage.StorageLevel;
+import org.apache.spark.streaming.Duration;
+import org.apache.spark.streaming.LocalJavaStreamingContext;
+import org.apache.spark.streaming.api.java.JavaDStream;
+
+import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream;
+
+/**
+ * Demonstrate the use of the KinesisUtils Java API
+ */
+public class JavaKinesisStreamSuite extends LocalJavaStreamingContext {
+ @Test
+ public void testKinesisStream() {
+ // Tests the API, does not actually test data receiving
+ JavaDStream<byte[]> kinesisStream = KinesisUtils.createStream(ssc, "mySparkStream",
+ "https://kinesis.us-west-2.amazonaws.com", new Duration(2000),
+ InitialPositionInStream.LATEST, StorageLevel.MEMORY_AND_DISK_2());
+
+ ssc.stop();
+ }
+
+
+ private static Function<Record, String> handler = new Function<Record, String>() {
+ @Override
+ public String call(Record record) {
+ return record.getPartitionKey() + "-" + record.getSequenceNumber();
+ }
+ };
+
+ @Test
+ public void testCustomHandler() {
+ // Tests the API, does not actually test data receiving
+ JavaDStream<String> kinesisStream = KinesisUtils.createStream(ssc, "testApp", "mySparkStream",
+ "https://kinesis.us-west-2.amazonaws.com", "us-west-2", InitialPositionInStream.LATEST,
+ new Duration(2000), StorageLevel.MEMORY_AND_DISK_2(), handler, String.class);
+
+ ssc.stop();
+ }
+}
diff --git a/external/kinesis-asl/src/test/resources/log4j.properties b/external/kinesis-asl/src/test/resources/log4j.properties
new file mode 100644
index 0000000000..edbecdae92
--- /dev/null
+++ b/external/kinesis-asl/src/test/resources/log4j.properties
@@ -0,0 +1,27 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# Set everything to be logged to the file target/unit-tests.log
+log4j.rootCategory=INFO, file
+log4j.appender.file=org.apache.log4j.FileAppender
+log4j.appender.file.append=true
+log4j.appender.file.file=target/unit-tests.log
+log4j.appender.file.layout=org.apache.log4j.PatternLayout
+log4j.appender.file.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss.SSS} %t %p %c{1}: %m%n
+
+# Ignore messages below warning level from Jetty, because it's a bit verbose
+log4j.logger.org.spark-project.jetty=WARN
diff --git a/external/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KPLBasedKinesisTestUtils.scala b/external/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KPLBasedKinesisTestUtils.scala
new file mode 100644
index 0000000000..fdb270eaad
--- /dev/null
+++ b/external/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KPLBasedKinesisTestUtils.scala
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.streaming.kinesis
+
+import java.nio.ByteBuffer
+
+import scala.collection.mutable
+import scala.collection.mutable.ArrayBuffer
+
+import com.amazonaws.services.kinesis.producer.{KinesisProducer => KPLProducer, KinesisProducerConfiguration, UserRecordResult}
+import com.google.common.util.concurrent.{FutureCallback, Futures}
+
+private[kinesis] class KPLBasedKinesisTestUtils extends KinesisTestUtils {
+ override protected def getProducer(aggregate: Boolean): KinesisDataGenerator = {
+ if (!aggregate) {
+ new SimpleDataGenerator(kinesisClient)
+ } else {
+ new KPLDataGenerator(regionName)
+ }
+ }
+}
+
+/** A wrapper for the KinesisProducer provided in the KPL. */
+private[kinesis] class KPLDataGenerator(regionName: String) extends KinesisDataGenerator {
+
+ private lazy val producer: KPLProducer = {
+ val conf = new KinesisProducerConfiguration()
+ .setRecordMaxBufferedTime(1000)
+ .setMaxConnections(1)
+ .setRegion(regionName)
+ .setMetricsLevel("none")
+
+ new KPLProducer(conf)
+ }
+
+ override def sendData(streamName: String, data: Seq[Int]): Map[String, Seq[(Int, String)]] = {
+ val shardIdToSeqNumbers = new mutable.HashMap[String, ArrayBuffer[(Int, String)]]()
+ data.foreach { num =>
+ val str = num.toString
+ val data = ByteBuffer.wrap(str.getBytes())
+ val future = producer.addUserRecord(streamName, str, data)
+ val kinesisCallBack = new FutureCallback[UserRecordResult]() {
+ override def onFailure(t: Throwable): Unit = {} // do nothing
+
+ override def onSuccess(result: UserRecordResult): Unit = {
+ val shardId = result.getShardId
+ val seqNumber = result.getSequenceNumber()
+ val sentSeqNumbers = shardIdToSeqNumbers.getOrElseUpdate(shardId,
+ new ArrayBuffer[(Int, String)]())
+ sentSeqNumbers += ((num, seqNumber))
+ }
+ }
+ Futures.addCallback(future, kinesisCallBack)
+ }
+ producer.flushSync()
+ shardIdToSeqNumbers.toMap
+ }
+}
diff --git a/external/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisBackedBlockRDDSuite.scala b/external/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisBackedBlockRDDSuite.scala
new file mode 100644
index 0000000000..2555332d22
--- /dev/null
+++ b/external/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisBackedBlockRDDSuite.scala
@@ -0,0 +1,259 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.kinesis
+
+import org.scalatest.BeforeAndAfterEach
+
+import org.apache.spark.{LocalSparkContext, SparkConf, SparkContext, SparkException}
+import org.apache.spark.storage.{BlockId, BlockManager, StorageLevel, StreamBlockId}
+
+abstract class KinesisBackedBlockRDDTests(aggregateTestData: Boolean)
+ extends KinesisFunSuite with BeforeAndAfterEach with LocalSparkContext {
+
+ private val testData = 1 to 8
+
+ private var testUtils: KinesisTestUtils = null
+ private var shardIds: Seq[String] = null
+ private var shardIdToData: Map[String, Seq[Int]] = null
+ private var shardIdToSeqNumbers: Map[String, Seq[String]] = null
+ private var shardIdToDataAndSeqNumbers: Map[String, Seq[(Int, String)]] = null
+ private var shardIdToRange: Map[String, SequenceNumberRange] = null
+ private var allRanges: Seq[SequenceNumberRange] = null
+
+ private var blockManager: BlockManager = null
+
+ override def beforeAll(): Unit = {
+ super.beforeAll()
+ runIfTestsEnabled("Prepare KinesisTestUtils") {
+ testUtils = new KPLBasedKinesisTestUtils()
+ testUtils.createStream()
+
+ shardIdToDataAndSeqNumbers = testUtils.pushData(testData, aggregate = aggregateTestData)
+ require(shardIdToDataAndSeqNumbers.size > 1, "Need data to be sent to multiple shards")
+
+ shardIds = shardIdToDataAndSeqNumbers.keySet.toSeq
+ shardIdToData = shardIdToDataAndSeqNumbers.mapValues { _.map { _._1 }}
+ shardIdToSeqNumbers = shardIdToDataAndSeqNumbers.mapValues { _.map { _._2 }}
+ shardIdToRange = shardIdToSeqNumbers.map { case (shardId, seqNumbers) =>
+ val seqNumRange = SequenceNumberRange(
+ testUtils.streamName, shardId, seqNumbers.head, seqNumbers.last)
+ (shardId, seqNumRange)
+ }
+ allRanges = shardIdToRange.values.toSeq
+ }
+ }
+
+ override def beforeEach(): Unit = {
+ super.beforeEach()
+ val conf = new SparkConf().setMaster("local[4]").setAppName("KinesisBackedBlockRDDSuite")
+ sc = new SparkContext(conf)
+ blockManager = sc.env.blockManager
+ }
+
+ override def afterAll(): Unit = {
+ try {
+ if (testUtils != null) {
+ testUtils.deleteStream()
+ }
+ } finally {
+ super.afterAll()
+ }
+ }
+
+ testIfEnabled("Basic reading from Kinesis") {
+ // Verify all data using multiple ranges in a single RDD partition
+ val receivedData1 = new KinesisBackedBlockRDD[Array[Byte]](sc, testUtils.regionName,
+ testUtils.endpointUrl, fakeBlockIds(1),
+ Array(SequenceNumberRanges(allRanges.toArray))
+ ).map { bytes => new String(bytes).toInt }.collect()
+ assert(receivedData1.toSet === testData.toSet)
+
+ // Verify all data using one range in each of the multiple RDD partitions
+ val receivedData2 = new KinesisBackedBlockRDD[Array[Byte]](sc, testUtils.regionName,
+ testUtils.endpointUrl, fakeBlockIds(allRanges.size),
+ allRanges.map { range => SequenceNumberRanges(Array(range)) }.toArray
+ ).map { bytes => new String(bytes).toInt }.collect()
+ assert(receivedData2.toSet === testData.toSet)
+
+ // Verify ordering within each partition
+ val receivedData3 = new KinesisBackedBlockRDD[Array[Byte]](sc, testUtils.regionName,
+ testUtils.endpointUrl, fakeBlockIds(allRanges.size),
+ allRanges.map { range => SequenceNumberRanges(Array(range)) }.toArray
+ ).map { bytes => new String(bytes).toInt }.collectPartitions()
+ assert(receivedData3.length === allRanges.size)
+ for (i <- 0 until allRanges.size) {
+ assert(receivedData3(i).toSeq === shardIdToData(allRanges(i).shardId))
+ }
+ }
+
+ testIfEnabled("Read data available in both block manager and Kinesis") {
+ testRDD(numPartitions = 2, numPartitionsInBM = 2, numPartitionsInKinesis = 2)
+ }
+
+ testIfEnabled("Read data available only in block manager, not in Kinesis") {
+ testRDD(numPartitions = 2, numPartitionsInBM = 2, numPartitionsInKinesis = 0)
+ }
+
+ testIfEnabled("Read data available only in Kinesis, not in block manager") {
+ testRDD(numPartitions = 2, numPartitionsInBM = 0, numPartitionsInKinesis = 2)
+ }
+
+ testIfEnabled("Read data available partially in block manager, rest in Kinesis") {
+ testRDD(numPartitions = 2, numPartitionsInBM = 1, numPartitionsInKinesis = 1)
+ }
+
+ testIfEnabled("Test isBlockValid skips block fetching from block manager") {
+ testRDD(numPartitions = 2, numPartitionsInBM = 2, numPartitionsInKinesis = 0,
+ testIsBlockValid = true)
+ }
+
+ testIfEnabled("Test whether RDD is valid after removing blocks from block anager") {
+ testRDD(numPartitions = 2, numPartitionsInBM = 2, numPartitionsInKinesis = 2,
+ testBlockRemove = true)
+ }
+
+  /**
+   * Test the KinesisBackedBlockRDD by writing some partitions of the data to the block manager
+   * and the rest to Kinesis, and then reading it all back using the RDD. It can also test
+   * whether the partitions that were read from Kinesis were stored in the block manager again.
+   *
+   * @param numPartitions Number of partitions in the RDD
+   * @param numPartitionsInBM Number of partitions to write to the BlockManager.
+   *                          Partitions 0 to (numPartitionsInBM - 1) will be written to the
+   *                          BlockManager
+   * @param numPartitionsInKinesis Number of partitions to write to Kinesis.
+   *                               Partitions (numPartitions - numPartitionsInKinesis) to
+   *                               (numPartitions - 1) will be written to Kinesis
+   * @param testIsBlockValid Test whether setting isBlockValid to false skips block fetching
+   * @param testBlockRemove Test whether calling rdd.removeBlocks() keeps the RDD usable, with
+   *                        reads falling back to Kinesis
+   *
+   * Example with numPartitions = 5, numPartitionsInBM = 3, and numPartitionsInKinesis = 4
+   *
+   *   numPartitionsInBM = 3
+   *   |------------------|
+   *   |                  |
+   *    0       1       2       3       4
+   *           |                         |
+   *           |-------------------------|
+   *          numPartitionsInKinesis = 4
+   */
+ private def testRDD(
+ numPartitions: Int,
+ numPartitionsInBM: Int,
+ numPartitionsInKinesis: Int,
+ testIsBlockValid: Boolean = false,
+ testBlockRemove: Boolean = false
+ ): Unit = {
+ require(shardIds.size > 1, "Need at least 2 shards to test")
+ require(numPartitionsInBM <= shardIds.size,
+ "Number of partitions in BlockManager cannot be more than the Kinesis test shards available")
+ require(numPartitionsInKinesis <= shardIds.size,
+ "Number of partitions in Kinesis cannot be more than the Kinesis test shards available")
+ require(numPartitionsInBM <= numPartitions,
+ "Number of partitions in BlockManager cannot be more than that in RDD")
+ require(numPartitionsInKinesis <= numPartitions,
+ "Number of partitions in Kinesis cannot be more than that in RDD")
+
+ // Put necessary blocks in the block manager
+ val blockIds = fakeBlockIds(numPartitions)
+ blockIds.foreach(blockManager.removeBlock(_))
+ (0 until numPartitionsInBM).foreach { i =>
+ val blockData = shardIdToData(shardIds(i)).iterator.map { _.toString.getBytes() }
+ blockManager.putIterator(blockIds(i), blockData, StorageLevel.MEMORY_ONLY)
+ }
+
+ // Create the necessary ranges to use in the RDD
+ val fakeRanges = Array.fill(numPartitions - numPartitionsInKinesis)(
+ SequenceNumberRanges(SequenceNumberRange("fakeStream", "fakeShardId", "xxx", "yyy")))
+ val realRanges = Array.tabulate(numPartitionsInKinesis) { i =>
+ val range = shardIdToRange(shardIds(i + (numPartitions - numPartitionsInKinesis)))
+ SequenceNumberRanges(Array(range))
+ }
+    val ranges = fakeRanges ++ realRanges
+
+    // Make sure the first `numPartitionsInBM` blocks are in the block manager, and the rest are not
+ require(
+ blockIds.take(numPartitionsInBM).forall(blockManager.get(_).nonEmpty),
+ "Expected blocks not in BlockManager"
+ )
+
+ require(
+ blockIds.drop(numPartitionsInBM).forall(blockManager.get(_).isEmpty),
+ "Unexpected blocks in BlockManager"
+ )
+
+    // Make sure the last `numPartitionsInKinesis` ranges point to the real stream, and the rest
+    // do not
+    require(
+      ranges.takeRight(numPartitionsInKinesis).forall {
+        _.ranges.forall { _.streamName == testUtils.streamName }
+      }, "Incorrect configuration of RDD, expected ranges not set"
+    )
+
+ require(
+ ranges.dropRight(numPartitionsInKinesis).forall {
+ _.ranges.forall { _.streamName != testUtils.streamName }
+ }, "Incorrect configuration of RDD, unexpected ranges set"
+ )
+
+ val rdd = new KinesisBackedBlockRDD[Array[Byte]](
+ sc, testUtils.regionName, testUtils.endpointUrl, blockIds, ranges)
+ val collectedData = rdd.map { bytes =>
+ new String(bytes).toInt
+ }.collect()
+ assert(collectedData.toSet === testData.toSet)
+
+    // Verify that block fetching is skipped when isBlockValid is set to false.
+    // This is done by using an RDD whose data is only in memory but which is set to skip block
+    // fetching. Using that RDD will throw an exception, as it skips block fetching even though
+    // the blocks are in the BlockManager.
+ if (testIsBlockValid) {
+ require(numPartitionsInBM === numPartitions, "All partitions must be in BlockManager")
+ require(numPartitionsInKinesis === 0, "No partitions must be in Kinesis")
+ val rdd2 = new KinesisBackedBlockRDD[Array[Byte]](
+ sc, testUtils.regionName, testUtils.endpointUrl, blockIds.toArray, ranges,
+ isBlockIdValid = Array.fill(blockIds.length)(false))
+ intercept[SparkException] {
+ rdd2.collect()
+ }
+ }
+
+    // Verify that the RDD is still valid after the blocks are removed, and that it can still
+    // read data from Kinesis
+ if (testBlockRemove) {
+ require(numPartitions === numPartitionsInKinesis,
+ "All partitions must be in WAL for this test")
+ require(numPartitionsInBM > 0, "Some partitions must be in BlockManager for this test")
+ rdd.removeBlocks()
+ assert(rdd.map { bytes => new String(bytes).toInt }.collect().toSet === testData.toSet)
+ }
+ }
+
+ /** Generate fake block ids */
+ private def fakeBlockIds(num: Int): Array[BlockId] = {
+ Array.tabulate(num) { i => new StreamBlockId(0, i) }
+ }
+}
+
+class WithAggregationKinesisBackedBlockRDDSuite
+ extends KinesisBackedBlockRDDTests(aggregateTestData = true)
+
+class WithoutAggregationKinesisBackedBlockRDDSuite
+ extends KinesisBackedBlockRDDTests(aggregateTestData = false)
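For readers following the testRDD scaladoc above, here is a small self-contained sketch (not part of the patch) of the partition layout it describes, using the same example values:

    // Worked example of testRDD's partition layout (illustrative only).
    // Partitions [0, numPartitionsInBM) have their blocks in the BlockManager;
    // partitions [numPartitions - numPartitionsInKinesis, numPartitions) carry
    // real Kinesis sequence-number ranges; the overlap can be read from either.
    object PartitionLayoutSketch extends App {
      val numPartitions = 5
      val numPartitionsInBM = 3
      val numPartitionsInKinesis = 4

      val inBlockManager = (0 until numPartitionsInBM).toSet
      val inKinesis = ((numPartitions - numPartitionsInKinesis) until numPartitions).toSet

      assert(inBlockManager == Set(0, 1, 2))
      assert(inKinesis == Set(1, 2, 3, 4))
      assert((inBlockManager intersect inKinesis) == Set(1, 2)) // readable from both sources
    }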
diff --git a/external/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisCheckpointerSuite.scala b/external/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisCheckpointerSuite.scala
new file mode 100644
index 0000000000..e1499a8220
--- /dev/null
+++ b/external/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisCheckpointerSuite.scala
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.kinesis
+
+import java.util.concurrent.{ExecutorService, TimeoutException}
+
+import scala.concurrent.{Await, ExecutionContext, Future}
+import scala.concurrent.duration._
+import scala.language.postfixOps
+
+import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorCheckpointer
+import org.mockito.Matchers._
+import org.mockito.Mockito._
+import org.mockito.invocation.InvocationOnMock
+import org.mockito.stubbing.Answer
+import org.scalatest.{BeforeAndAfterEach, PrivateMethodTester}
+import org.scalatest.concurrent.Eventually
+import org.scalatest.concurrent.Eventually._
+import org.scalatest.mock.MockitoSugar
+
+import org.apache.spark.streaming.{Duration, TestSuiteBase}
+import org.apache.spark.util.ManualClock
+
+class KinesisCheckpointerSuite extends TestSuiteBase
+ with MockitoSugar
+ with BeforeAndAfterEach
+ with PrivateMethodTester
+ with Eventually {
+
+ private val workerId = "dummyWorkerId"
+ private val shardId = "dummyShardId"
+ private val seqNum = "123"
+ private val otherSeqNum = "245"
+ private val checkpointInterval = Duration(10)
+ private val someSeqNum = Some(seqNum)
+ private val someOtherSeqNum = Some(otherSeqNum)
+
+ private var receiverMock: KinesisReceiver[Array[Byte]] = _
+ private var checkpointerMock: IRecordProcessorCheckpointer = _
+ private var kinesisCheckpointer: KinesisCheckpointer = _
+ private var clock: ManualClock = _
+
+ private val checkpoint = PrivateMethod[Unit]('checkpoint)
+
+ override def beforeEach(): Unit = {
+ receiverMock = mock[KinesisReceiver[Array[Byte]]]
+ checkpointerMock = mock[IRecordProcessorCheckpointer]
+ clock = new ManualClock()
+ kinesisCheckpointer = new KinesisCheckpointer(receiverMock, checkpointInterval, workerId, clock)
+ }
+
+ test("checkpoint is not called twice for the same sequence number") {
+ when(receiverMock.getLatestSeqNumToCheckpoint(shardId)).thenReturn(someSeqNum)
+ kinesisCheckpointer.invokePrivate(checkpoint(shardId, checkpointerMock))
+ kinesisCheckpointer.invokePrivate(checkpoint(shardId, checkpointerMock))
+
+ verify(checkpointerMock, times(1)).checkpoint(anyString())
+ }
+
+ test("checkpoint is called after sequence number increases") {
+ when(receiverMock.getLatestSeqNumToCheckpoint(shardId))
+ .thenReturn(someSeqNum).thenReturn(someOtherSeqNum)
+ kinesisCheckpointer.invokePrivate(checkpoint(shardId, checkpointerMock))
+ kinesisCheckpointer.invokePrivate(checkpoint(shardId, checkpointerMock))
+
+ verify(checkpointerMock, times(1)).checkpoint(seqNum)
+ verify(checkpointerMock, times(1)).checkpoint(otherSeqNum)
+ }
+
+ test("should checkpoint if we have exceeded the checkpoint interval") {
+ when(receiverMock.getLatestSeqNumToCheckpoint(shardId))
+ .thenReturn(someSeqNum).thenReturn(someOtherSeqNum)
+
+ kinesisCheckpointer.setCheckpointer(shardId, checkpointerMock)
+ clock.advance(5 * checkpointInterval.milliseconds)
+
+ eventually(timeout(1 second)) {
+ verify(checkpointerMock, times(1)).checkpoint(seqNum)
+ verify(checkpointerMock, times(1)).checkpoint(otherSeqNum)
+ }
+ }
+
+ test("shouldn't checkpoint if we have not exceeded the checkpoint interval") {
+ when(receiverMock.getLatestSeqNumToCheckpoint(shardId)).thenReturn(someSeqNum)
+
+ kinesisCheckpointer.setCheckpointer(shardId, checkpointerMock)
+ clock.advance(checkpointInterval.milliseconds / 2)
+
+ verify(checkpointerMock, never()).checkpoint(anyString())
+ }
+
+ test("should not checkpoint for the same sequence number") {
+ when(receiverMock.getLatestSeqNumToCheckpoint(shardId)).thenReturn(someSeqNum)
+
+ kinesisCheckpointer.setCheckpointer(shardId, checkpointerMock)
+
+ clock.advance(checkpointInterval.milliseconds * 5)
+ eventually(timeout(1 second)) {
+ verify(checkpointerMock, atMost(1)).checkpoint(anyString())
+ }
+ }
+
+ test("removing checkpointer checkpoints one last time") {
+ when(receiverMock.getLatestSeqNumToCheckpoint(shardId)).thenReturn(someSeqNum)
+
+ kinesisCheckpointer.removeCheckpointer(shardId, checkpointerMock)
+ verify(checkpointerMock, times(1)).checkpoint(anyString())
+ }
+
+ test("if checkpointing is going on, wait until finished before removing and checkpointing") {
+ when(receiverMock.getLatestSeqNumToCheckpoint(shardId))
+ .thenReturn(someSeqNum).thenReturn(someOtherSeqNum)
+ when(checkpointerMock.checkpoint(anyString)).thenAnswer(new Answer[Unit] {
+ override def answer(invocations: InvocationOnMock): Unit = {
+ clock.waitTillTime(clock.getTimeMillis() + checkpointInterval.milliseconds / 2)
+ }
+ })
+
+ kinesisCheckpointer.setCheckpointer(shardId, checkpointerMock)
+ clock.advance(checkpointInterval.milliseconds)
+ eventually(timeout(1 second)) {
+ verify(checkpointerMock, times(1)).checkpoint(anyString())
+ }
+ // don't block test thread
+ val f = Future(kinesisCheckpointer.removeCheckpointer(shardId, checkpointerMock))(
+ ExecutionContext.global)
+
+ intercept[TimeoutException] {
+ Await.ready(f, 50 millis)
+ }
+
+ clock.advance(checkpointInterval.milliseconds / 2)
+ eventually(timeout(1 second)) {
+ verify(checkpointerMock, times(2)).checkpoint(anyString())
+ }
+ }
+}
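The checkpointer suite leans on two Mockito idioms: consecutive stubbing (thenReturn(a).thenReturn(b)) and invocation counting with verify. A minimal sketch of both, built around a hypothetical Foo trait that is not part of the patch:

    import org.mockito.Mockito._

    object MockitoIdiomsSketch extends App {
      trait Foo { def next(): Int } // hypothetical trait, for illustration only

      val foo = mock(classOf[Foo])
      when(foo.next()).thenReturn(1).thenReturn(2) // consecutive stubbing

      assert(foo.next() == 1)       // first call returns the first stubbed value
      assert(foo.next() == 2)       // second call returns the second
      verify(foo, times(2)).next()  // exactly two invocations were recorded
      verify(foo, atMost(2)).next() // and no more than two
    }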
diff --git a/external/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisFunSuite.scala b/external/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisFunSuite.scala
new file mode 100644
index 0000000000..ee428f31d6
--- /dev/null
+++ b/external/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisFunSuite.scala
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.kinesis
+
+import org.apache.spark.SparkFunSuite
+
+/**
+ * Helper trait that runs the Kinesis real-data-transfer tests, or ignores
+ * them, depending on whether the enabling environment variable is set.
+ */
+trait KinesisFunSuite extends SparkFunSuite {
+ import KinesisTestUtils._
+
+  /** Run the test if the enabling environment variable is set, otherwise mark it as ignored */
+ def testIfEnabled(testName: String)(testBody: => Unit) {
+ if (shouldRunTests) {
+ test(testName)(testBody)
+ } else {
+ ignore(s"$testName [enable by setting env var $envVarNameForEnablingTests=1]")(testBody)
+ }
+ }
+
+  /** Run the given body of code only if Kinesis tests are enabled */
+ def runIfTestsEnabled(message: String)(body: => Unit): Unit = {
+ if (shouldRunTests) {
+ body
+ } else {
+ ignore(s"$message [enable by setting env var $envVarNameForEnablingTests=1]")()
+ }
+ }
+}
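A hedged sketch of how a suite might use these helpers; the suite name and test body below are illustrative only, and the class is placed in the same package so it can see the (Spark-private) base suite:

    package org.apache.spark.streaming.kinesis

    // Hypothetical suite; not part of the patch.
    class MyKinesisRoundTripSuite extends KinesisFunSuite {
      testIfEnabled("pushes and reads back a record") {
        // Runs only when the enabling environment variable is set;
        // otherwise the test is reported as ignored.
        assert("42".toInt === 42)
      }
    }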
diff --git a/external/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisReceiverSuite.scala b/external/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisReceiverSuite.scala
new file mode 100644
index 0000000000..fd15b6ccdc
--- /dev/null
+++ b/external/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisReceiverSuite.scala
@@ -0,0 +1,210 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.streaming.kinesis
+
+import java.nio.ByteBuffer
+import java.nio.charset.StandardCharsets
+import java.util.Arrays
+
+import com.amazonaws.services.kinesis.clientlibrary.exceptions._
+import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorCheckpointer
+import com.amazonaws.services.kinesis.clientlibrary.types.ShutdownReason
+import com.amazonaws.services.kinesis.model.Record
+import org.mockito.Matchers._
+import org.mockito.Matchers.{eq => meq}
+import org.mockito.Mockito._
+import org.scalatest.{BeforeAndAfter, Matchers}
+import org.scalatest.mock.MockitoSugar
+
+import org.apache.spark.streaming.{Duration, TestSuiteBase}
+import org.apache.spark.util.Utils
+
+/**
+ * Suite of Kinesis streaming receiver tests focusing mostly on the KinesisRecordProcessor
+ */
+class KinesisReceiverSuite extends TestSuiteBase with Matchers with BeforeAndAfter
+ with MockitoSugar {
+
+ val app = "TestKinesisReceiver"
+ val stream = "mySparkStream"
+ val endpoint = "endpoint-url"
+ val workerId = "dummyWorkerId"
+ val shardId = "dummyShardId"
+ val seqNum = "dummySeqNum"
+ val checkpointInterval = Duration(10)
+ val someSeqNum = Some(seqNum)
+
+ val record1 = new Record()
+ record1.setData(ByteBuffer.wrap("Spark In Action".getBytes(StandardCharsets.UTF_8)))
+ val record2 = new Record()
+ record2.setData(ByteBuffer.wrap("Learning Spark".getBytes(StandardCharsets.UTF_8)))
+ val batch = Arrays.asList(record1, record2)
+
+ var receiverMock: KinesisReceiver[Array[Byte]] = _
+ var checkpointerMock: IRecordProcessorCheckpointer = _
+
+ override def beforeFunction(): Unit = {
+ receiverMock = mock[KinesisReceiver[Array[Byte]]]
+ checkpointerMock = mock[IRecordProcessorCheckpointer]
+ }
+
+ test("check serializability of SerializableAWSCredentials") {
+ Utils.deserialize[SerializableAWSCredentials](
+ Utils.serialize(new SerializableAWSCredentials("x", "y")))
+ }
+
+ test("process records including store and set checkpointer") {
+ when(receiverMock.isStopped()).thenReturn(false)
+
+ val recordProcessor = new KinesisRecordProcessor(receiverMock, workerId)
+ recordProcessor.initialize(shardId)
+ recordProcessor.processRecords(batch, checkpointerMock)
+
+ verify(receiverMock, times(1)).isStopped()
+ verify(receiverMock, times(1)).addRecords(shardId, batch)
+ verify(receiverMock, times(1)).setCheckpointer(shardId, checkpointerMock)
+ }
+
+ test("shouldn't store and update checkpointer when receiver is stopped") {
+ when(receiverMock.isStopped()).thenReturn(true)
+
+ val recordProcessor = new KinesisRecordProcessor(receiverMock, workerId)
+ recordProcessor.processRecords(batch, checkpointerMock)
+
+ verify(receiverMock, times(1)).isStopped()
+ verify(receiverMock, never).addRecords(anyString, anyListOf(classOf[Record]))
+ verify(receiverMock, never).setCheckpointer(anyString, meq(checkpointerMock))
+ }
+
+ test("shouldn't update checkpointer when exception occurs during store") {
+ when(receiverMock.isStopped()).thenReturn(false)
+ when(
+ receiverMock.addRecords(anyString, anyListOf(classOf[Record]))
+ ).thenThrow(new RuntimeException())
+
+ intercept[RuntimeException] {
+ val recordProcessor = new KinesisRecordProcessor(receiverMock, workerId)
+ recordProcessor.initialize(shardId)
+ recordProcessor.processRecords(batch, checkpointerMock)
+ }
+
+ verify(receiverMock, times(1)).isStopped()
+ verify(receiverMock, times(1)).addRecords(shardId, batch)
+ verify(receiverMock, never).setCheckpointer(anyString, meq(checkpointerMock))
+ }
+
+ test("shutdown should checkpoint if the reason is TERMINATE") {
+ when(receiverMock.getLatestSeqNumToCheckpoint(shardId)).thenReturn(someSeqNum)
+
+ val recordProcessor = new KinesisRecordProcessor(receiverMock, workerId)
+ recordProcessor.initialize(shardId)
+ recordProcessor.shutdown(checkpointerMock, ShutdownReason.TERMINATE)
+
+ verify(receiverMock, times(1)).removeCheckpointer(meq(shardId), meq(checkpointerMock))
+ }
+
+
+ test("shutdown should not checkpoint if the reason is something other than TERMINATE") {
+ when(receiverMock.getLatestSeqNumToCheckpoint(shardId)).thenReturn(someSeqNum)
+
+ val recordProcessor = new KinesisRecordProcessor(receiverMock, workerId)
+ recordProcessor.initialize(shardId)
+ recordProcessor.shutdown(checkpointerMock, ShutdownReason.ZOMBIE)
+ recordProcessor.shutdown(checkpointerMock, null)
+
+ verify(receiverMock, times(2)).removeCheckpointer(meq(shardId),
+ meq[IRecordProcessorCheckpointer](null))
+ }
+
+ test("retry success on first attempt") {
+ val expectedIsStopped = false
+ when(receiverMock.isStopped()).thenReturn(expectedIsStopped)
+
+ val actualVal = KinesisRecordProcessor.retryRandom(receiverMock.isStopped(), 2, 100)
+ assert(actualVal == expectedIsStopped)
+
+ verify(receiverMock, times(1)).isStopped()
+ }
+
+ test("retry success on second attempt after a Kinesis throttling exception") {
+ val expectedIsStopped = false
+ when(receiverMock.isStopped())
+ .thenThrow(new ThrottlingException("error message"))
+ .thenReturn(expectedIsStopped)
+
+ val actualVal = KinesisRecordProcessor.retryRandom(receiverMock.isStopped(), 2, 100)
+ assert(actualVal == expectedIsStopped)
+
+ verify(receiverMock, times(2)).isStopped()
+ }
+
+ test("retry success on second attempt after a Kinesis dependency exception") {
+ val expectedIsStopped = false
+ when(receiverMock.isStopped())
+ .thenThrow(new KinesisClientLibDependencyException("error message"))
+ .thenReturn(expectedIsStopped)
+
+ val actualVal = KinesisRecordProcessor.retryRandom(receiverMock.isStopped(), 2, 100)
+ assert(actualVal == expectedIsStopped)
+
+ verify(receiverMock, times(2)).isStopped()
+ }
+
+ test("retry failed after a shutdown exception") {
+ when(checkpointerMock.checkpoint()).thenThrow(new ShutdownException("error message"))
+
+ intercept[ShutdownException] {
+ KinesisRecordProcessor.retryRandom(checkpointerMock.checkpoint(), 2, 100)
+ }
+
+ verify(checkpointerMock, times(1)).checkpoint()
+ }
+
+ test("retry failed after an invalid state exception") {
+ when(checkpointerMock.checkpoint()).thenThrow(new InvalidStateException("error message"))
+
+ intercept[InvalidStateException] {
+ KinesisRecordProcessor.retryRandom(checkpointerMock.checkpoint(), 2, 100)
+ }
+
+ verify(checkpointerMock, times(1)).checkpoint()
+ }
+
+ test("retry failed after unexpected exception") {
+ when(checkpointerMock.checkpoint()).thenThrow(new RuntimeException("error message"))
+
+ intercept[RuntimeException] {
+ KinesisRecordProcessor.retryRandom(checkpointerMock.checkpoint(), 2, 100)
+ }
+
+ verify(checkpointerMock, times(1)).checkpoint()
+ }
+
+ test("retry failed after exhausing all retries") {
+ val expectedErrorMessage = "final try error message"
+ when(checkpointerMock.checkpoint())
+ .thenThrow(new ThrottlingException("error message"))
+ .thenThrow(new ThrottlingException(expectedErrorMessage))
+
+ val exception = intercept[RuntimeException] {
+ KinesisRecordProcessor.retryRandom(checkpointerMock.checkpoint(), 2, 100)
+ }
+ exception.getMessage().shouldBe(expectedErrorMessage)
+
+ verify(checkpointerMock, times(2)).checkpoint()
+ }
+}
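The retry tests above hinge on retryRandom taking its first argument by name, so every attempt re-evaluates the expression. A simplified sketch of that pattern, which does not reproduce the real helper's backoff or exception handling:

    import scala.util.{Failure, Success, Try}

    object RetrySketch {
      // Simplified call-by-name retry; the real KinesisRecordProcessor.retryRandom
      // adds randomized backoff and, per the tests above, only retries the
      // throttling and dependency exceptions.
      def retry[T](expr: => T, attemptsLeft: Int): T = Try(expr) match {
        case Success(value)                 => value
        case Failure(_) if attemptsLeft > 1 => retry(expr, attemptsLeft - 1) // re-evaluates expr
        case Failure(e)                     => throw e
      }
    }

    // Usage: RetrySketch.retry(checkpointerMock.checkpoint(), 2) re-issues the
    // checkpoint call on every attempt because the argument is passed by name,
    // which is exactly what the mocked verifications above are counting.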
diff --git a/external/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisStreamSuite.scala b/external/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisStreamSuite.scala
new file mode 100644
index 0000000000..ca5d13da46
--- /dev/null
+++ b/external/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisStreamSuite.scala
@@ -0,0 +1,297 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.kinesis
+
+import scala.collection.mutable
+import scala.concurrent.duration._
+import scala.language.postfixOps
+import scala.util.Random
+
+import com.amazonaws.regions.RegionUtils
+import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream
+import com.amazonaws.services.kinesis.model.Record
+import org.scalatest.{BeforeAndAfter, BeforeAndAfterAll}
+import org.scalatest.Matchers._
+import org.scalatest.concurrent.Eventually
+
+import org.apache.spark.{SparkConf, SparkContext}
+import org.apache.spark.network.util.JavaUtils
+import org.apache.spark.rdd.RDD
+import org.apache.spark.storage.{StorageLevel, StreamBlockId}
+import org.apache.spark.streaming._
+import org.apache.spark.streaming.dstream.ReceiverInputDStream
+import org.apache.spark.streaming.kinesis.KinesisTestUtils._
+import org.apache.spark.streaming.receiver.BlockManagerBasedStoreResult
+import org.apache.spark.streaming.scheduler.ReceivedBlockInfo
+import org.apache.spark.util.Utils
+
+abstract class KinesisStreamTests(aggregateTestData: Boolean) extends KinesisFunSuite
+ with Eventually with BeforeAndAfter with BeforeAndAfterAll {
+
+ // This is the name that KCL will use to save metadata to DynamoDB
+ private val appName = s"KinesisStreamSuite-${math.abs(Random.nextLong())}"
+ private val batchDuration = Seconds(1)
+
+ // Dummy parameters for API testing
+ private val dummyEndpointUrl = defaultEndpointUrl
+ private val dummyRegionName = RegionUtils.getRegionByEndpoint(dummyEndpointUrl).getName()
+ private val dummyAWSAccessKey = "dummyAccessKey"
+ private val dummyAWSSecretKey = "dummySecretKey"
+
+ private var testUtils: KinesisTestUtils = null
+ private var ssc: StreamingContext = null
+ private var sc: SparkContext = null
+
+ override def beforeAll(): Unit = {
+ val conf = new SparkConf()
+ .setMaster("local[4]")
+ .setAppName("KinesisStreamSuite") // Setting Spark app name to Kinesis app name
+ sc = new SparkContext(conf)
+
+ runIfTestsEnabled("Prepare KinesisTestUtils") {
+ testUtils = new KPLBasedKinesisTestUtils()
+ testUtils.createStream()
+ }
+ }
+
+ override def afterAll(): Unit = {
+ if (ssc != null) {
+ ssc.stop()
+ }
+ if (sc != null) {
+ sc.stop()
+ }
+ if (testUtils != null) {
+ // Delete the Kinesis stream as well as the DynamoDB table generated by
+ // Kinesis Client Library when consuming the stream
+ testUtils.deleteStream()
+ testUtils.deleteDynamoDBTable(appName)
+ }
+ }
+
+ before {
+ ssc = new StreamingContext(sc, batchDuration)
+ }
+
+ after {
+ if (ssc != null) {
+ ssc.stop(stopSparkContext = false)
+ ssc = null
+ }
+ if (testUtils != null) {
+ testUtils.deleteDynamoDBTable(appName)
+ }
+ }
+
+ test("KinesisUtils API") {
+ // Tests the API, does not actually test data receiving
+ val kinesisStream1 = KinesisUtils.createStream(ssc, "mySparkStream",
+ dummyEndpointUrl, Seconds(2),
+ InitialPositionInStream.LATEST, StorageLevel.MEMORY_AND_DISK_2)
+    val kinesisStream2 = KinesisUtils.createStream(ssc, "myAppName", "mySparkStream",
+ dummyEndpointUrl, dummyRegionName,
+ InitialPositionInStream.LATEST, Seconds(2), StorageLevel.MEMORY_AND_DISK_2)
+    val kinesisStream3 = KinesisUtils.createStream(ssc, "myAppName", "mySparkStream",
+ dummyEndpointUrl, dummyRegionName,
+ InitialPositionInStream.LATEST, Seconds(2), StorageLevel.MEMORY_AND_DISK_2,
+ dummyAWSAccessKey, dummyAWSSecretKey)
+ }
+
+ test("RDD generation") {
+ val inputStream = KinesisUtils.createStream(ssc, appName, "dummyStream",
+ dummyEndpointUrl, dummyRegionName, InitialPositionInStream.LATEST, Seconds(2),
+ StorageLevel.MEMORY_AND_DISK_2, dummyAWSAccessKey, dummyAWSSecretKey)
+ assert(inputStream.isInstanceOf[KinesisInputDStream[Array[Byte]]])
+
+ val kinesisStream = inputStream.asInstanceOf[KinesisInputDStream[Array[Byte]]]
+ val time = Time(1000)
+
+ // Generate block info data for testing
+ val seqNumRanges1 = SequenceNumberRanges(
+ SequenceNumberRange("fakeStream", "fakeShardId", "xxx", "yyy"))
+ val blockId1 = StreamBlockId(kinesisStream.id, 123)
+ val blockInfo1 = ReceivedBlockInfo(
+ 0, None, Some(seqNumRanges1), new BlockManagerBasedStoreResult(blockId1, None))
+
+ val seqNumRanges2 = SequenceNumberRanges(
+ SequenceNumberRange("fakeStream", "fakeShardId", "aaa", "bbb"))
+ val blockId2 = StreamBlockId(kinesisStream.id, 345)
+ val blockInfo2 = ReceivedBlockInfo(
+ 0, None, Some(seqNumRanges2), new BlockManagerBasedStoreResult(blockId2, None))
+
+    // Verify that the generated KinesisBackedBlockRDD has all the right information
+ val blockInfos = Seq(blockInfo1, blockInfo2)
+ val nonEmptyRDD = kinesisStream.createBlockRDD(time, blockInfos)
+ nonEmptyRDD shouldBe a [KinesisBackedBlockRDD[_]]
+ val kinesisRDD = nonEmptyRDD.asInstanceOf[KinesisBackedBlockRDD[_]]
+ assert(kinesisRDD.regionName === dummyRegionName)
+ assert(kinesisRDD.endpointUrl === dummyEndpointUrl)
+ assert(kinesisRDD.retryTimeoutMs === batchDuration.milliseconds)
+ assert(kinesisRDD.awsCredentialsOption ===
+ Some(SerializableAWSCredentials(dummyAWSAccessKey, dummyAWSSecretKey)))
+ assert(nonEmptyRDD.partitions.size === blockInfos.size)
+ nonEmptyRDD.partitions.foreach { _ shouldBe a [KinesisBackedBlockRDDPartition] }
+ val partitions = nonEmptyRDD.partitions.map {
+ _.asInstanceOf[KinesisBackedBlockRDDPartition] }.toSeq
+ assert(partitions.map { _.seqNumberRanges } === Seq(seqNumRanges1, seqNumRanges2))
+ assert(partitions.map { _.blockId } === Seq(blockId1, blockId2))
+ assert(partitions.forall { _.isBlockIdValid === true })
+
+ // Verify that KinesisBackedBlockRDD is generated even when there are no blocks
+ val emptyRDD = kinesisStream.createBlockRDD(time, Seq.empty)
+ emptyRDD shouldBe a [KinesisBackedBlockRDD[Array[Byte]]]
+ emptyRDD.partitions shouldBe empty
+
+ // Verify that the KinesisBackedBlockRDD has isBlockValid = false when blocks are invalid
+ blockInfos.foreach { _.setBlockIdInvalid() }
+ kinesisStream.createBlockRDD(time, blockInfos).partitions.foreach { partition =>
+ assert(partition.asInstanceOf[KinesisBackedBlockRDDPartition].isBlockIdValid === false)
+ }
+ }
+
+
+  /**
+   * Test the stream by sending data to a Kinesis stream and receiving it back.
+   * This test is not run by default, as it requires AWS credentials that the test
+   * environment may not have. Even if AWS credentials are available, the user may
+   * not want to run these tests, to avoid the Kinesis costs. To enable this test,
+   * you must have AWS credentials available through the default AWS provider chain,
+   * and you have to set the system environment variable RUN_KINESIS_TESTS=1.
+   */
+ testIfEnabled("basic operation") {
+ val awsCredentials = KinesisTestUtils.getAWSCredentials()
+ val stream = KinesisUtils.createStream(ssc, appName, testUtils.streamName,
+ testUtils.endpointUrl, testUtils.regionName, InitialPositionInStream.LATEST,
+ Seconds(10), StorageLevel.MEMORY_ONLY,
+ awsCredentials.getAWSAccessKeyId, awsCredentials.getAWSSecretKey)
+
+ val collected = new mutable.HashSet[Int] with mutable.SynchronizedSet[Int]
+ stream.map { bytes => new String(bytes).toInt }.foreachRDD { rdd =>
+ collected ++= rdd.collect()
+ logInfo("Collected = " + collected.mkString(", "))
+ }
+ ssc.start()
+
+ val testData = 1 to 10
+ eventually(timeout(120 seconds), interval(10 second)) {
+ testUtils.pushData(testData, aggregateTestData)
+ assert(collected === testData.toSet, "\nData received does not match data sent")
+ }
+ ssc.stop(stopSparkContext = false)
+ }
+
+ testIfEnabled("custom message handling") {
+ val awsCredentials = KinesisTestUtils.getAWSCredentials()
+ def addFive(r: Record): Int = JavaUtils.bytesToString(r.getData).toInt + 5
+ val stream = KinesisUtils.createStream(ssc, appName, testUtils.streamName,
+ testUtils.endpointUrl, testUtils.regionName, InitialPositionInStream.LATEST,
+ Seconds(10), StorageLevel.MEMORY_ONLY, addFive,
+ awsCredentials.getAWSAccessKeyId, awsCredentials.getAWSSecretKey)
+
+ stream shouldBe a [ReceiverInputDStream[_]]
+
+ val collected = new mutable.HashSet[Int] with mutable.SynchronizedSet[Int]
+ stream.foreachRDD { rdd =>
+ collected ++= rdd.collect()
+ logInfo("Collected = " + collected.mkString(", "))
+ }
+ ssc.start()
+
+ val testData = 1 to 10
+ eventually(timeout(120 seconds), interval(10 second)) {
+ testUtils.pushData(testData, aggregateTestData)
+ val modData = testData.map(_ + 5)
+ assert(collected === modData.toSet, "\nData received does not match data sent")
+ }
+ ssc.stop(stopSparkContext = false)
+ }
+
+ testIfEnabled("failure recovery") {
+ val sparkConf = new SparkConf().setMaster("local[4]").setAppName(this.getClass.getSimpleName)
+ val checkpointDir = Utils.createTempDir().getAbsolutePath
+
+ ssc = new StreamingContext(sc, Milliseconds(1000))
+ ssc.checkpoint(checkpointDir)
+
+ val awsCredentials = KinesisTestUtils.getAWSCredentials()
+ val collectedData = new mutable.HashMap[Time, (Array[SequenceNumberRanges], Seq[Int])]
+
+ val kinesisStream = KinesisUtils.createStream(ssc, appName, testUtils.streamName,
+ testUtils.endpointUrl, testUtils.regionName, InitialPositionInStream.LATEST,
+ Seconds(10), StorageLevel.MEMORY_ONLY,
+ awsCredentials.getAWSAccessKeyId, awsCredentials.getAWSSecretKey)
+
+ // Verify that the generated RDDs are KinesisBackedBlockRDDs, and collect the data in each batch
+ kinesisStream.foreachRDD((rdd: RDD[Array[Byte]], time: Time) => {
+ val kRdd = rdd.asInstanceOf[KinesisBackedBlockRDD[Array[Byte]]]
+ val data = rdd.map { bytes => new String(bytes).toInt }.collect().toSeq
+ collectedData.synchronized {
+ collectedData(time) = (kRdd.arrayOfseqNumberRanges, data)
+ }
+ })
+
+ ssc.remember(Minutes(60)) // remember all the batches so that they are all saved in checkpoint
+ ssc.start()
+
+ def numBatchesWithData: Int =
+ collectedData.synchronized { collectedData.count(_._2._2.nonEmpty) }
+
+ def isCheckpointPresent: Boolean = Checkpoint.getCheckpointFiles(checkpointDir).nonEmpty
+
+    // Run until there are more than 10 batches with some data in them.
+    // If this times out because numBatchesWithData stays at zero, it is likely that the
+    // foreachRDD function failed with exceptions and nothing got added to `collectedData`
+ eventually(timeout(2 minutes), interval(1 seconds)) {
+ testUtils.pushData(1 to 5, aggregateTestData)
+ assert(isCheckpointPresent && numBatchesWithData > 10)
+ }
+ ssc.stop(stopSparkContext = true) // stop the SparkContext so that the blocks are not reused
+
+    // Restart the context from the checkpoint and verify whether the recovered stream
+    // regenerates the same RDDs
+ logInfo("Restarting from checkpoint")
+ ssc = new StreamingContext(checkpointDir)
+ ssc.start()
+ val recoveredKinesisStream = ssc.graph.getInputStreams().head
+
+ // Verify that the recomputed RDDs are KinesisBackedBlockRDDs with the same sequence ranges
+ // and return the same data
+ collectedData.synchronized {
+ val times = collectedData.keySet
+ times.foreach { time =>
+ val (arrayOfSeqNumRanges, data) = collectedData(time)
+ val rdd = recoveredKinesisStream.getOrCompute(time).get.asInstanceOf[RDD[Array[Byte]]]
+ rdd shouldBe a[KinesisBackedBlockRDD[_]]
+
+ // Verify the recovered sequence ranges
+ val kRdd = rdd.asInstanceOf[KinesisBackedBlockRDD[Array[Byte]]]
+ assert(kRdd.arrayOfseqNumberRanges.size === arrayOfSeqNumRanges.size)
+ arrayOfSeqNumRanges.zip(kRdd.arrayOfseqNumberRanges).foreach { case (expected, found) =>
+ assert(expected.ranges.toSeq === found.ranges.toSeq)
+ }
+
+ // Verify the recovered data
+ assert(rdd.map { bytes => new String(bytes).toInt }.collect().toSeq === data)
+ }
+ }
+ ssc.stop()
+ }
+}
+
+class WithAggregationKinesisStreamSuite extends KinesisStreamTests(aggregateTestData = true)
+
+class WithoutAggregationKinesisStreamSuite extends KinesisStreamTests(aggregateTestData = false)
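The data-receiving tests above poll with ScalaTest's Eventually until the pushed records show up. A self-contained sketch of that idiom, assuming only ScalaTest on the classpath (the deadline condition is a stand-in for "records have arrived"):

    import org.scalatest.concurrent.Eventually._
    import org.scalatest.time.SpanSugar._

    object EventuallySketch extends App {
      val deadline = System.currentTimeMillis() + 300

      // The block is retried every `interval` until it passes or `timeout` elapses,
      // which is how the suites wait for pushed records to appear in foreachRDD output.
      eventually(timeout(5.seconds), interval(100.millis)) {
        assert(System.currentTimeMillis() >= deadline, "not there yet; eventually retries")
      }
    }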
diff --git a/external/spark-ganglia-lgpl/pom.xml b/external/spark-ganglia-lgpl/pom.xml
new file mode 100644
index 0000000000..bfb92791de
--- /dev/null
+++ b/external/spark-ganglia-lgpl/pom.xml
@@ -0,0 +1,49 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+~ Licensed to the Apache Software Foundation (ASF) under one or more
+~ contributor license agreements. See the NOTICE file distributed with
+~ this work for additional information regarding copyright ownership.
+~ The ASF licenses this file to You under the Apache License, Version 2.0
+~ (the "License"); you may not use this file except in compliance with
+~ the License. You may obtain a copy of the License at
+~
+~ http://www.apache.org/licenses/LICENSE-2.0
+~
+~ Unless required by applicable law or agreed to in writing, software
+~ distributed under the License is distributed on an "AS IS" BASIS,
+~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+~ See the License for the specific language governing permissions and
+~ limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <parent>
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-parent_2.11</artifactId>
+ <version>2.0.0-SNAPSHOT</version>
+ <relativePath>../../pom.xml</relativePath>
+ </parent>
+
+ <!-- Ganglia integration is not included by default due to LGPL-licensed code -->
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-ganglia-lgpl_2.11</artifactId>
+ <packaging>jar</packaging>
+ <name>Spark Ganglia Integration</name>
+
+ <properties>
+ <sbt.project.name>ganglia-lgpl</sbt.project.name>
+ </properties>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-core_${scala.binary.version}</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>io.dropwizard.metrics</groupId>
+ <artifactId>metrics-ganglia</artifactId>
+ </dependency>
+ </dependencies>
+</project>
diff --git a/external/spark-ganglia-lgpl/src/main/scala/org/apache/spark/metrics/sink/GangliaSink.scala b/external/spark-ganglia-lgpl/src/main/scala/org/apache/spark/metrics/sink/GangliaSink.scala
new file mode 100644
index 0000000000..3b1880e143
--- /dev/null
+++ b/external/spark-ganglia-lgpl/src/main/scala/org/apache/spark/metrics/sink/GangliaSink.scala
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.metrics.sink
+
+import java.util.Properties
+import java.util.concurrent.TimeUnit
+
+import com.codahale.metrics.MetricRegistry
+import com.codahale.metrics.ganglia.GangliaReporter
+import info.ganglia.gmetric4j.gmetric.GMetric
+import info.ganglia.gmetric4j.gmetric.GMetric.UDPAddressingMode
+
+import org.apache.spark.SecurityManager
+import org.apache.spark.metrics.MetricsSystem
+
+class GangliaSink(val property: Properties, val registry: MetricRegistry,
+ securityMgr: SecurityManager) extends Sink {
+ val GANGLIA_KEY_PERIOD = "period"
+ val GANGLIA_DEFAULT_PERIOD = 10
+
+ val GANGLIA_KEY_UNIT = "unit"
+ val GANGLIA_DEFAULT_UNIT: TimeUnit = TimeUnit.SECONDS
+
+ val GANGLIA_KEY_MODE = "mode"
+ val GANGLIA_DEFAULT_MODE: UDPAddressingMode = GMetric.UDPAddressingMode.MULTICAST
+
+  // TTL for multicast messages. Listeners X hops away in the network need a TTL of at least X.
+ val GANGLIA_KEY_TTL = "ttl"
+ val GANGLIA_DEFAULT_TTL = 1
+
+ val GANGLIA_KEY_HOST = "host"
+ val GANGLIA_KEY_PORT = "port"
+
+ def propertyToOption(prop: String): Option[String] = Option(property.getProperty(prop))
+
+ if (!propertyToOption(GANGLIA_KEY_HOST).isDefined) {
+ throw new Exception("Ganglia sink requires 'host' property.")
+ }
+
+ if (!propertyToOption(GANGLIA_KEY_PORT).isDefined) {
+ throw new Exception("Ganglia sink requires 'port' property.")
+ }
+
+ val host = propertyToOption(GANGLIA_KEY_HOST).get
+ val port = propertyToOption(GANGLIA_KEY_PORT).get.toInt
+ val ttl = propertyToOption(GANGLIA_KEY_TTL).map(_.toInt).getOrElse(GANGLIA_DEFAULT_TTL)
+ val mode: UDPAddressingMode = propertyToOption(GANGLIA_KEY_MODE)
+ .map(u => GMetric.UDPAddressingMode.valueOf(u.toUpperCase)).getOrElse(GANGLIA_DEFAULT_MODE)
+ val pollPeriod = propertyToOption(GANGLIA_KEY_PERIOD).map(_.toInt)
+ .getOrElse(GANGLIA_DEFAULT_PERIOD)
+ val pollUnit: TimeUnit = propertyToOption(GANGLIA_KEY_UNIT)
+ .map(u => TimeUnit.valueOf(u.toUpperCase))
+ .getOrElse(GANGLIA_DEFAULT_UNIT)
+
+ MetricsSystem.checkMinimalPollingPeriod(pollUnit, pollPeriod)
+
+ val ganglia = new GMetric(host, port, mode, ttl)
+ val reporter: GangliaReporter = GangliaReporter.forRegistry(registry)
+ .convertDurationsTo(TimeUnit.MILLISECONDS)
+ .convertRatesTo(TimeUnit.SECONDS)
+ .build(ganglia)
+
+ override def start() {
+ reporter.start(pollPeriod, pollUnit)
+ }
+
+ override def stop() {
+ reporter.stop()
+ }
+
+ override def report() {
+ reporter.report()
+ }
+}
+
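GangliaSink reads its settings from a plain java.util.Properties object; in a deployed application these options are typically supplied through Spark's metrics configuration for a sink named ganglia, and the module itself is normally built only with the spark-ganglia-lgpl profile because of the LGPL dependency. A sketch of the properties the constructor above expects, with placeholder host and port values:

    import java.util.Properties

    object GangliaSinkPropsSketch {
      // Keys match the GANGLIA_KEY_* constants above; only host and port are
      // mandatory, the rest fall back to the sink's defaults.
      val gangliaProps: Properties = new Properties()
      gangliaProps.setProperty("host", "239.2.11.71") // placeholder multicast group
      gangliaProps.setProperty("port", "8649")        // placeholder gmond port
      gangliaProps.setProperty("mode", "multicast")   // parsed via UDPAddressingMode.valueOf
      gangliaProps.setProperty("ttl", "1")
      gangliaProps.setProperty("period", "10")
      gangliaProps.setProperty("unit", "seconds")     // parsed via TimeUnit.valueOf
    }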