From 6ffa9bb226ac9ceec4a34f0011c35d2d9710f8f8 Mon Sep 17 00:00:00 2001
From: Patrick Wendell
Date: Sun, 29 Dec 2013 11:26:56 -0800
Subject: Documentation and adding supervise option

---
 docs/spark-standalone.md | 38 +++++++++++++++++++++++++++++++++-----
 1 file changed, 33 insertions(+), 5 deletions(-)

(limited to 'docs')

diff --git a/docs/spark-standalone.md b/docs/spark-standalone.md
index b822265b5a..59adbce156 100644
--- a/docs/spark-standalone.md
+++ b/docs/spark-standalone.md
@@ -10,11 +10,7 @@ In addition to running on the Mesos or YARN cluster managers, Spark also provide
 
 # Installing Spark Standalone to a Cluster
 
-The easiest way to deploy Spark is by running the `./make-distribution.sh` script to create a binary distribution.
-This distribution can be deployed to any machine with the Java runtime installed; there is no need to install Scala.
-
-The recommended procedure is to deploy and start the master on one node first, get the master spark URL,
-then modify `conf/spark-env.sh` in the `dist/` directory before deploying to all the other nodes.
+To install Spark Standalone mode, you simply place a compiled version of Spark on each node in the cluster. You can obtain pre-built versions of Spark with each release or [build it yourself](index.html#building).
 
 # Starting a Cluster Manually
 
@@ -150,6 +146,38 @@ automatically set MASTER from the `SPARK_MASTER_IP` and `SPARK_MASTER_PORT` vari
 You can also pass an option `-c ` to control the number of cores that spark-shell uses on the cluster.
 
+# Launching Applications Inside the Cluster
+
+You may also run your application entirely inside of the cluster by submitting your application driver using the submission client. The syntax for submitting applications is as follows:
+
+
+    ./spark-class org.apache.spark.deploy.client.DriverClient launch
+       [client-options] \
+       <cluster-url> <application-jar-url> <main-class> \
+       [application-options]
+
+    cluster-url: The URL of the master node.
+    application-jar-url: Path to a bundled jar including your application and all dependencies.
+                         Accepts hdfs://, file://, and http:// paths.
+    main-class: The entry point for your application.
+
+    Client Options:
+      --memory (amount of memory, in MB, allocated for your driver program)
+      --cores (number of cores allocated for your driver program)
+      --supervise (whether to automatically restart your driver on application or node failure)
+
+Keep in mind that your driver program will be executed on a remote worker machine. You can control the execution environment in the following ways:
+
+ * _Environment variables_: These will be captured from the environment in which you launch the client and applied when launching the driver program.
+ * _Java options_: You can add java options by setting `SPARK_JAVA_OPTS` in the environment in which you launch the submission client.
+ * _Dependencies_: You'll still need to call `sc.addJar` inside of your driver program to add your application jar and any dependencies. If you submit a local application jar to the client (e.g one with a `file://` URL), it will be uploaded into the working directory of your driver program. Then, you can add it using `sc.addJar("jar-name.jar")`.
+
+Once you submit a driver program, it will appear in the cluster management UI at port 8080 and
+be assigned an identifier. If you'd like to prematurely terminate the program, you can do so using
+the same client:
+
+    ./spark-class org.apache.spark.deploy.client.DriverClient kill
+
 # Resource Scheduling
 
 The standalone cluster mode currently only supports a simple FIFO scheduler across applications. 
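As a rough illustration of the _Dependencies_ note in the patch above, the following is a minimal sketch of a driver program that registers its own uploaded jar with `sc.addJar`. The object name, master URL, and jar file name are invented for the example and are not part of this patch.

{% highlight scala %}
import org.apache.spark.SparkContext

// Hypothetical driver class, passed to the submission client as <main-class>.
object ExampleDriver {
  def main(args: Array[String]) {
    // Placeholder master URL; in practice this matches the <cluster-url>
    // given to the submission client.
    val sc = new SparkContext("spark://master:7077", "ExampleDriver")

    // A locally submitted (file://) application jar is uploaded into the
    // driver's working directory, so it can be registered here by file name.
    sc.addJar("example-app-assembly.jar")

    // ... application logic ...
    sc.stop()
  }
}
{% endhighlight %}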
-- cgit v1.2.3 From b72cceba2727586c1e1f89c58b66417628e1afa7 Mon Sep 17 00:00:00 2001 From: Patrick Wendell Date: Mon, 6 Jan 2014 22:05:53 -0800 Subject: Some doc fixes --- docs/spark-standalone.md | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) (limited to 'docs') diff --git a/docs/spark-standalone.md b/docs/spark-standalone.md index f426db0141..7da64749b7 100644 --- a/docs/spark-standalone.md +++ b/docs/spark-standalone.md @@ -157,8 +157,7 @@ You may also run your application entirely inside of the cluster by submitting y [application-options] cluster-url: The URL of the master node. - application-jar-url: Path to a bundled jar including your application and all dependencies. - Accepts hdfs://, file://, and http:// paths. + application-jar-url: Path to a bundled jar including your application and all dependencies. Currently, the URL must be visible from inside of your cluster, for instance, in an HDFS directory. main-class: The entry point for your application. Client Options: @@ -170,7 +169,7 @@ Keep in mind that your driver program will be executed on a remote worker machin * _Environment variables_: These will be captured from the environment in which you launch the client and applied when launching the driver program. * _Java options_: You can add java options by setting `SPARK_JAVA_OPTS` in the environment in which you launch the submission client. - * _Dependencies_: You'll still need to call `sc.addJar` inside of your driver program to add your application jar and any dependencies. If you submit a local application jar to the client (e.g one with a `file://` URL), it will be uploaded into the working directory of your driver program. Then, you can add it using `sc.addJar("jar-name.jar")`. + * _Dependencies_: You'll still need to call `sc.addJar` inside of your program to make your bundled application jar visible on all worker nodes. Once you submit a driver program, it will appear in the cluster management UI at port 8080 and be assigned an identifier. If you'd like to prematurely terminate the program, you can do so using -- cgit v1.2.3 From 82a1d38aea3b10930a2659b9c0e7ad2fb2c2ab4a Mon Sep 17 00:00:00 2001 From: Patrick Wendell Date: Tue, 7 Jan 2014 17:46:02 -0800 Subject: Simplify and fix pyspark script. This patch removes compatibility for IPython < 1.0 but fixes the launch script and makes it much simpler. I tested this using the three commands in the PySpark documentation page: 1. IPYTHON=1 ./pyspark 2. IPYTHON_OPTS="notebook" ./pyspark 3. IPYTHON_OPTS="notebook --pylab inline" ./pyspark There are two changes: - We rely on PYTHONSTARTUP env var to start PySpark - Removed the quotes around $IPYTHON_OPTS... having quotes gloms them together as a single argument passed to `exec` which seemed to cause ipython to fail (it instead expects them as multiple arguments). --- bin/pyspark | 8 +------- docs/python-programming-guide.md | 5 +++-- 2 files changed, 4 insertions(+), 9 deletions(-) (limited to 'docs') diff --git a/bin/pyspark b/bin/pyspark index d6810f4686..a70da5674e 100755 --- a/bin/pyspark +++ b/bin/pyspark @@ -51,7 +51,6 @@ export PYSPARK_PYTHON export PYTHONPATH=$SPARK_HOME/python/:$PYTHONPATH # Load the PySpark shell.py script when ./pyspark is used interactively: -export OLD_PYTHONSTARTUP=$PYTHONSTARTUP export PYTHONSTARTUP=$FWDIR/python/pyspark/shell.py if [ -n "$IPYTHON_OPTS" ]; then @@ -59,12 +58,7 @@ if [ -n "$IPYTHON_OPTS" ]; then fi if [[ "$IPYTHON" = "1" ]] ; then - # IPython <1.0.0 doesn't honor PYTHONSTARTUP, while 1.0.0+ does. 
- # Hence we clear PYTHONSTARTUP and use the -c "%run $IPYTHONSTARTUP" command which works on all versions - # We also force interactive mode with "-i" - IPYTHONSTARTUP=$PYTHONSTARTUP - PYTHONSTARTUP= - exec ipython "$IPYTHON_OPTS" -i -c "%run $IPYTHONSTARTUP" + exec ipython $IPYTHON_OPTS else exec "$PYSPARK_PYTHON" "$@" fi diff --git a/docs/python-programming-guide.md b/docs/python-programming-guide.md index dc187b3efe..c4236f8312 100644 --- a/docs/python-programming-guide.md +++ b/docs/python-programming-guide.md @@ -99,8 +99,9 @@ $ MASTER=local[4] ./bin/pyspark ## IPython -It is also possible to launch PySpark in [IPython](http://ipython.org), the enhanced Python interpreter. -To do this, set the `IPYTHON` variable to `1` when running `bin/pyspark`: +It is also possible to launch PySpark in [IPython](http://ipython.org), the +enhanced Python interpreter. PySpark works with IPython 1.0.0 and later. To +use IPython, set the `IPYTHON` variable to `1` when running `bin/pyspark`: {% highlight bash %} $ IPYTHON=1 ./bin/pyspark -- cgit v1.2.3 From c78b381e91c9902a1510a2ed4ec5c898b51adfe8 Mon Sep 17 00:00:00 2001 From: Patrick Wendell Date: Tue, 7 Jan 2014 23:56:04 -0800 Subject: Fixes --- core/src/main/scala/org/apache/spark/deploy/Client.scala | 2 +- .../main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala | 2 +- docs/spark-standalone.md | 5 +++-- 3 files changed, 5 insertions(+), 4 deletions(-) (limited to 'docs') diff --git a/core/src/main/scala/org/apache/spark/deploy/Client.scala b/core/src/main/scala/org/apache/spark/deploy/Client.scala index 0475bb17c0..43b9b1cff9 100644 --- a/core/src/main/scala/org/apache/spark/deploy/Client.scala +++ b/core/src/main/scala/org/apache/spark/deploy/Client.scala @@ -55,7 +55,7 @@ class DriverActor(master: String, response: Promise[(Boolean, String)]) extends /** * Executable utility for starting and terminating drivers inside of a standalone cluster. */ -object DriverClient { +object Client { def main(args: Array[String]) { val driverArgs = new ClientArguments(args) diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala b/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala index a9cb998cc2..18885d7ca6 100644 --- a/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala +++ b/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala @@ -100,7 +100,7 @@ private[spark] class ExecutorRunner( def getCommandSeq = { val command = Command(appDesc.command.mainClass, - appDesc.command.arguments.map(substituteVariables), appDesc.command.environment) + appDesc.command.arguments.map(substituteVariables) ++ Seq(appId), appDesc.command.environment) CommandUtils.buildCommandSeq(command, memory, sparkHome.getAbsolutePath) } diff --git a/docs/spark-standalone.md b/docs/spark-standalone.md index 7da64749b7..ecd642cc60 100644 --- a/docs/spark-standalone.md +++ b/docs/spark-standalone.md @@ -151,19 +151,20 @@ You can also pass an option `-c ` to control the number of cores that You may also run your application entirely inside of the cluster by submitting your application driver using the submission client. The syntax for submitting applications is as follows: - ./spark-class org.apache.spark.deploy.client.DriverClient launch + ./spark-class org.apache.spark.deploy.Client launch [client-options] \ \ [application-options] cluster-url: The URL of the master node. - application-jar-url: Path to a bundled jar including your application and all dependencies. 
Currently, the URL must be visible from inside of your cluster, for instance, in an HDFS directory. + application-jar-url: Path to a bundled jar including your application and all dependencies. Currently, the URL must be globally visible inside of your cluster, for instance, an `hdfs://` path or a `file://` path that is present on all nodes. main-class: The entry point for your application. Client Options: --memory (amount of memory, in MB, allocated for your driver program) --cores (number of cores allocated for your driver program) --supervise (whether to automatically restart your driver on application or node failure) + --verbose (prints increased logging output) Keep in mind that your driver program will be executed on a remote worker machine. You can control the execution environment in the following ways: -- cgit v1.2.3 From 112c0a1776bbc866a1026a9579c6f72f293414c4 Mon Sep 17 00:00:00 2001 From: Patrick Wendell Date: Wed, 8 Jan 2014 21:16:16 -0800 Subject: Fixing config option "retained_stages" => "retainedStages". This is a very esoteric option and it's out of sync with the style we use. So it seems fitting to fix it for 0.9.0. --- core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala | 2 +- docs/configuration.md | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) (limited to 'docs') diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala index b7b87250b9..d6d9f0cedf 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala @@ -33,7 +33,7 @@ import org.apache.spark.scheduler._ */ private[spark] class JobProgressListener(val sc: SparkContext) extends SparkListener { // How many stages to remember - val RETAINED_STAGES = sc.conf.get("spark.ui.retained_stages", "1000").toInt + val RETAINED_STAGES = sc.conf.get("spark.ui.retainedStages", "1000").toInt val DEFAULT_POOL_NAME = "default" val stageIdToPool = new HashMap[Int, String]() diff --git a/docs/configuration.md b/docs/configuration.md index 1d6c3d1633..6717757781 100644 --- a/docs/configuration.md +++ b/docs/configuration.md @@ -130,7 +130,7 @@ Apart from these, the following properties are also available, and may be useful - spark.ui.retained_stages + spark.ui.retainedStages 1000 How many stages the Spark UI remembers before garbage collecting. 
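As a quick illustration of the renamed setting above, here is a minimal sketch of an application raising the limit through `SparkConf` (the configuration API used elsewhere in this patch series). The master URL, application name, and the value 2000 are arbitrary placeholders.

{% highlight scala %}
import org.apache.spark.{SparkConf, SparkContext}

object RetainedStagesExample {
  def main(args: Array[String]) {
    val conf = new SparkConf()
      .setMaster("local[4]")                 // placeholder master URL
      .setAppName("RetainedStagesExample")   // placeholder application name
      // Note the new camelCase key; the old "spark.ui.retained_stages" name is no longer read.
      .set("spark.ui.retainedStages", "2000")

    val sc = new SparkContext(conf)
    // ... run jobs; the application web UI now keeps up to 2000 completed stages ...
    sc.stop()
  }
}
{% endhighlight %}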
-- cgit v1.2.3 From c617083e478e3cfbddc4232060aa7b7a0c5812d4 Mon Sep 17 00:00:00 2001 From: Thomas Graves Date: Thu, 9 Jan 2014 09:53:51 -0600 Subject: yarn-client addJar fix and misc other --- .../main/scala/org/apache/spark/SparkContext.scala | 8 ++-- docs/running-on-yarn.md | 15 ++++++- .../apache/spark/deploy/yarn/WorkerLauncher.scala | 29 +++++++++++-- .../cluster/YarnClientSchedulerBackend.scala | 50 ++++++++++++---------- .../apache/spark/deploy/yarn/WorkerLauncher.scala | 29 +++++++++++-- 5 files changed, 94 insertions(+), 37 deletions(-) (limited to 'docs') diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index fce8f2d48c..f1695c936d 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -669,10 +669,10 @@ class SparkContext( key = uri.getScheme match { // A JAR file which exists only on the driver node case null | "file" => - if (SparkHadoopUtil.get.isYarnMode()) { - // In order for this to work on yarn the user must specify the --addjars option to - // the client to upload the file into the distributed cache to make it show up in the - // current working directory. + if (SparkHadoopUtil.get.isYarnMode() && master == "yarn-standalone") { + // In order for this to work in yarn standalone mode the user must specify the + // --addjars option to the client to upload the file into the distributed cache + // of the AM to make it show up in the current working directory. val fileName = new Path(uri.getPath).getName() try { env.httpFileServer.addJar(new File(fileName)) diff --git a/docs/running-on-yarn.md b/docs/running-on-yarn.md index b206270107..3bd62646ba 100644 --- a/docs/running-on-yarn.md +++ b/docs/running-on-yarn.md @@ -101,7 +101,19 @@ With this mode, your application is actually run on the remote machine where the With yarn-client mode, the application will be launched locally. Just like running application or spark-shell on Local / Mesos / Standalone mode. The launch method is also the similar with them, just make sure that when you need to specify a master url, use "yarn-client" instead. And you also need to export the env value for SPARK_JAR and SPARK_YARN_APP_JAR -In order to tune worker core/number/memory etc. You need to export SPARK_WORKER_CORES, SPARK_WORKER_MEMORY, SPARK_WORKER_INSTANCES e.g. by ./conf/spark-env.sh +Configuration in yarn-client mode: + +In order to tune worker core/number/memory etc. You need to export environment variables or add them to the spark configuration file (./conf/spark_env.sh). The following are the list of options. + +* `SPARK_YARN_APP_JAR`, Path to your application's JAR file (required) +* `SPARK_WORKER_INSTANCES`, Number of workers to start (Default: 2) +* `SPARK_WORKER_CORES`, Number of cores for the workers (Default: 1). +* `SPARK_WORKER_MEMORY`, Memory per Worker (e.g. 1000M, 2G) (Default: 1G) +* `SPARK_MASTER_MEMORY`, Memory for Master (e.g. 1000M, 2G) (Default: 512 Mb) +* `SPARK_YARN_APP_NAME`, The name of your application (Default: Spark) +* `SPARK_YARN_QUEUE`, The hadoop queue to use for allocation requests (Default: 'default') +* `SPARK_YARN_DIST_FILES`, Comma separated list of files to be distributed with the job. +* `SPARK_YARN_DIST_ARCHIVES`, Comma separated list of archives to be distributed with the job. 
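As a rough sketch of using yarn-client mode from a standalone application rather than `spark-shell`, assuming the variables above, in particular `SPARK_YARN_APP_JAR`, have already been exported in the launching environment; the object and application names are placeholders. The existing `spark-shell` example below shows the same variables in use.

{% highlight scala %}
import org.apache.spark.SparkContext

object YarnClientExample {
  def main(args: Array[String]) {
    // "yarn-client" as the master URL runs the driver locally while requesting
    // workers from YARN according to the SPARK_WORKER_* settings above.
    val sc = new SparkContext("yarn-client", "YarnClientExample")
    println(sc.parallelize(1 to 1000).count())
    sc.stop()
  }
}
{% endhighlight %}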
For example: @@ -114,7 +126,6 @@ For example: SPARK_YARN_APP_JAR=examples/target/scala-{{site.SCALA_VERSION}}/spark-examples-assembly-{{site.SPARK_VERSION}}.jar \ MASTER=yarn-client ./bin/spark-shell -You can also send extra files to yarn cluster for worker to use by exporting SPARK_YARN_DIST_FILES=file1,file2... etc. # Building Spark for Hadoop/YARN 2.2.x diff --git a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/WorkerLauncher.scala b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/WorkerLauncher.scala index ddfec1a4ac..66e38ee840 100644 --- a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/WorkerLauncher.scala +++ b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/WorkerLauncher.scala @@ -76,6 +76,10 @@ class WorkerLauncher(args: ApplicationMasterArguments, conf: Configuration, spar def run() { + // Setup the directories so things go to yarn approved directories rather + // then user specified and /tmp. + System.setProperty("spark.local.dir", getLocalDirs()) + appAttemptId = getApplicationAttemptId() resourceManager = registerWithResourceManager() val appMasterResponse: RegisterApplicationMasterResponse = registerApplicationMaster() @@ -103,10 +107,12 @@ class WorkerLauncher(args: ApplicationMasterArguments, conf: Configuration, spar // ensure that progress is sent before YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS elapse. val timeoutInterval = yarnConf.getInt(YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS, 120000) - // must be <= timeoutInterval/ 2. - // On other hand, also ensure that we are reasonably responsive without causing too many requests to RM. - // so atleast 1 minute or timeoutInterval / 10 - whichever is higher. - val interval = math.min(timeoutInterval / 2, math.max(timeoutInterval/ 10, 60000L)) + // we want to be reasonably responsive without causing too many requests to RM. + val schedulerInterval = + System.getProperty("spark.yarn.scheduler.heartbeat.interval-ms", "5000").toLong + // must be <= timeoutInterval / 2. + val interval = math.min(timeoutInterval / 2, schedulerInterval) + reporterThread = launchReporterThread(interval) // Wait for the reporter thread to Finish. @@ -119,6 +125,21 @@ class WorkerLauncher(args: ApplicationMasterArguments, conf: Configuration, spar System.exit(0) } + /** Get the Yarn approved local directories. */ + private def getLocalDirs(): String = { + // Hadoop 0.23 and 2.x have different Environment variable names for the + // local dirs, so lets check both. We assume one of the 2 is set. 
+ // LOCAL_DIRS => 2.X, YARN_LOCAL_DIRS => 0.23.X + val localDirs = Option(System.getenv("YARN_LOCAL_DIRS")) + .getOrElse(Option(System.getenv("LOCAL_DIRS")) + .getOrElse("")) + + if (localDirs.isEmpty()) { + throw new Exception("Yarn Local dirs can't be empty") + } + localDirs + } + private def getApplicationAttemptId(): ApplicationAttemptId = { val envs = System.getenv() val containerIdString = envs.get(ApplicationConstants.AM_CONTAINER_ID_ENV) diff --git a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala index 4b1b5da048..22e55e0c60 100644 --- a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala +++ b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala @@ -22,6 +22,8 @@ import org.apache.spark.{SparkException, Logging, SparkContext} import org.apache.spark.deploy.yarn.{Client, ClientArguments} import org.apache.spark.scheduler.TaskSchedulerImpl +import scala.collection.mutable.ArrayBuffer + private[spark] class YarnClientSchedulerBackend( scheduler: TaskSchedulerImpl, sc: SparkContext) @@ -31,45 +33,47 @@ private[spark] class YarnClientSchedulerBackend( var client: Client = null var appId: ApplicationId = null + private[spark] def addArg(optionName: String, optionalParam: String, arrayBuf: ArrayBuffer[String]) { + Option(System.getenv(optionalParam)) foreach { + optParam => { + arrayBuf += (optionName, optParam) + } + } + } + override def start() { super.start() - val defalutWorkerCores = "2" - val defalutWorkerMemory = "512m" - val defaultWorkerNumber = "1" - val userJar = System.getenv("SPARK_YARN_APP_JAR") - val distFiles = System.getenv("SPARK_YARN_DIST_FILES") - var workerCores = System.getenv("SPARK_WORKER_CORES") - var workerMemory = System.getenv("SPARK_WORKER_MEMORY") - var workerNumber = System.getenv("SPARK_WORKER_INSTANCES") - if (userJar == null) throw new SparkException("env SPARK_YARN_APP_JAR is not set") - if (workerCores == null) - workerCores = defalutWorkerCores - if (workerMemory == null) - workerMemory = defalutWorkerMemory - if (workerNumber == null) - workerNumber = defaultWorkerNumber - val driverHost = conf.get("spark.driver.host") val driverPort = conf.get("spark.driver.port") val hostport = driverHost + ":" + driverPort - val argsArray = Array[String]( + val argsArrayBuf = new ArrayBuffer[String]() + argsArrayBuf += ( "--class", "notused", "--jar", userJar, "--args", hostport, - "--worker-memory", workerMemory, - "--worker-cores", workerCores, - "--num-workers", workerNumber, - "--master-class", "org.apache.spark.deploy.yarn.WorkerLauncher", - "--files", distFiles + "--master-class", "org.apache.spark.deploy.yarn.WorkerLauncher" ) - val args = new ClientArguments(argsArray, conf) + // process any optional arguments, use the defaults already defined in ClientArguments + // if things aren't specified + Map("--master-memory" -> "SPARK_MASTER_MEMORY", + "--num-workers" -> "SPARK_WORKER_INSTANCES", + "--worker-memory" -> "SPARK_WORKER_MEMORY", + "--worker-cores" -> "SPARK_WORKER_CORES", + "--queue" -> "SPARK_YARN_QUEUE", + "--name" -> "SPARK_YARN_APP_NAME", + "--files" -> "SPARK_YARN_DIST_FILES", + "--archives" -> "SPARK_YARN_DIST_ARCHIVES") + .foreach { case (optName, optParam) => addArg(optName, optParam, argsArrayBuf) } + + logDebug("ClientArguments called with: " + argsArrayBuf) + val args = new ClientArguments(argsArrayBuf.toArray, 
conf) client = new Client(args, conf) appId = client.runApp() waitForApp() diff --git a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/WorkerLauncher.scala b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/WorkerLauncher.scala index 49248a8516..3e3a4672b4 100644 --- a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/WorkerLauncher.scala +++ b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/WorkerLauncher.scala @@ -78,6 +78,10 @@ class WorkerLauncher(args: ApplicationMasterArguments, conf: Configuration, spar def run() { + // Setup the directories so things go to yarn approved directories rather + // then user specified and /tmp. + System.setProperty("spark.local.dir", getLocalDirs()) + amClient = AMRMClient.createAMRMClient() amClient.init(yarnConf) amClient.start() @@ -94,10 +98,12 @@ class WorkerLauncher(args: ApplicationMasterArguments, conf: Configuration, spar // ensure that progress is sent before YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS elapse. val timeoutInterval = yarnConf.getInt(YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS, 120000) - // must be <= timeoutInterval/ 2. - // On other hand, also ensure that we are reasonably responsive without causing too many requests to RM. - // so atleast 1 minute or timeoutInterval / 10 - whichever is higher. - val interval = math.min(timeoutInterval / 2, math.max(timeoutInterval / 10, 60000L)) + // we want to be reasonably responsive without causing too many requests to RM. + val schedulerInterval = + System.getProperty("spark.yarn.scheduler.heartbeat.interval-ms", "5000").toLong + // must be <= timeoutInterval / 2. + val interval = math.min(timeoutInterval / 2, schedulerInterval) + reporterThread = launchReporterThread(interval) // Wait for the reporter thread to Finish. @@ -110,6 +116,21 @@ class WorkerLauncher(args: ApplicationMasterArguments, conf: Configuration, spar System.exit(0) } + /** Get the Yarn approved local directories. */ + private def getLocalDirs(): String = { + // Hadoop 0.23 and 2.x have different Environment variable names for the + // local dirs, so lets check both. We assume one of the 2 is set. + // LOCAL_DIRS => 2.X, YARN_LOCAL_DIRS => 0.23.X + val localDirs = Option(System.getenv("YARN_LOCAL_DIRS")) + .getOrElse(Option(System.getenv("LOCAL_DIRS")) + .getOrElse("")) + + if (localDirs.isEmpty()) { + throw new Exception("Yarn Local dirs can't be empty") + } + localDirs + } + private def getApplicationAttemptId(): ApplicationAttemptId = { val envs = System.getenv() val containerIdString = envs.get(ApplicationConstants.Environment.CONTAINER_ID.name()) -- cgit v1.2.3 From 460f655cc60b1a1759b6f85ae8860595fc7586b4 Mon Sep 17 00:00:00 2001 From: Patrick Wendell Date: Thu, 9 Jan 2014 22:42:50 -0800 Subject: Enable shuffle consolidation by default. Bump this to being enabled for 0.9.0. --- core/src/main/scala/org/apache/spark/storage/ShuffleBlockManager.scala | 2 +- docs/configuration.md | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) (limited to 'docs') diff --git a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockManager.scala b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockManager.scala index e2b24298a5..6e0ff143b7 100644 --- a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockManager.scala @@ -64,7 +64,7 @@ class ShuffleBlockManager(blockManager: BlockManager) { // Turning off shuffle file consolidation causes all shuffle Blocks to get their own file. 
// TODO: Remove this once the shuffle file consolidation feature is stable. val consolidateShuffleFiles = - conf.getBoolean("spark.shuffle.consolidateFiles", false) + conf.getBoolean("spark.shuffle.consolidateFiles", true) private val bufferSize = conf.getInt("spark.shuffle.file.buffer.kb", 100) * 1024 diff --git a/docs/configuration.md b/docs/configuration.md index 6717757781..b1a0e19167 100644 --- a/docs/configuration.md +++ b/docs/configuration.md @@ -371,7 +371,7 @@ Apart from these, the following properties are also available, and may be useful spark.shuffle.consolidateFiles - false + true If set to "true", consolidates intermediate files created during a shuffle. Creating fewer files can improve filesystem performance for shuffles with large numbers of reduce tasks. It is recommended to set this to "true" when using ext4 or xfs filesystems. On ext3, this option might degrade performance on machines with many (>8) cores due to filesystem limitations. -- cgit v1.2.3 From e4c51d21135978908f7f4a46683f70ef98b720ec Mon Sep 17 00:00:00 2001 From: Andrew Or Date: Fri, 10 Jan 2014 15:09:51 -0800 Subject: Address Patrick's and Reynold's comments Aside from trivial formatting changes, use nulls instead of Options for DiskMapIterator, and add documentation for spark.shuffle.externalSorting and spark.shuffle.memoryFraction. Also, set spark.shuffle.memoryFraction to 0.3, and spark.storage.memoryFraction = 0.6. --- .../main/scala/org/apache/spark/Aggregator.scala | 2 +- .../scala/org/apache/spark/rdd/CoGroupedRDD.scala | 3 +- .../org/apache/spark/storage/BlockManager.scala | 2 +- .../util/collection/ExternalAppendOnlyMap.scala | 89 ++++++++++++---------- docs/configuration.md | 24 +++++- 5 files changed, 73 insertions(+), 47 deletions(-) (limited to 'docs') diff --git a/core/src/main/scala/org/apache/spark/Aggregator.scala b/core/src/main/scala/org/apache/spark/Aggregator.scala index 08a96b0c34..8b30cd4bfe 100644 --- a/core/src/main/scala/org/apache/spark/Aggregator.scala +++ b/core/src/main/scala/org/apache/spark/Aggregator.scala @@ -32,7 +32,7 @@ case class Aggregator[K, V, C] ( mergeCombiners: (C, C) => C) { private val sparkConf = SparkEnv.get.conf - private val externalSorting = sparkConf.get("spark.shuffle.externalSorting", "false").toBoolean + private val externalSorting = sparkConf.getBoolean("spark.shuffle.externalSorting", true) def combineValuesByKey(iter: Iterator[_ <: Product2[K, V]]) : Iterator[(K, C)] = { if (!externalSorting) { diff --git a/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala b/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala index b7c7773e58..a73714abca 100644 --- a/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala @@ -106,8 +106,7 @@ class CoGroupedRDD[K](@transient var rdds: Seq[RDD[_ <: Product2[K, _]]], part: override val partitioner = Some(part) override def compute(s: Partition, context: TaskContext): Iterator[(K, CoGroupCombiner)] = { - - val externalSorting = sparkConf.get("spark.shuffle.externalSorting", "false").toBoolean + val externalSorting = sparkConf.getBoolean("spark.shuffle.externalSorting", true) val split = s.asInstanceOf[CoGroupPartition] val numRdds = split.deps.size diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala index c56e2ca2df..56cae6f6b9 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala +++ 
b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala @@ -864,7 +864,7 @@ private[spark] object BlockManager extends Logging { val ID_GENERATOR = new IdGenerator def getMaxMemory(conf: SparkConf): Long = { - val memoryFraction = conf.getDouble("spark.storage.memoryFraction", 0.66) + val memoryFraction = conf.getDouble("spark.storage.memoryFraction", 0.6) (Runtime.getRuntime.maxMemory * memoryFraction).toLong } diff --git a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala index 50f05351eb..e3bcd895aa 100644 --- a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala +++ b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala @@ -71,21 +71,24 @@ private[spark] class ExternalAppendOnlyMap[K, V, C]( // Collective memory threshold shared across all running tasks private val maxMemoryThreshold = { - val memoryFraction = sparkConf.getDouble("spark.shuffle.memoryFraction", 0.75) + val memoryFraction = sparkConf.getDouble("spark.shuffle.memoryFraction", 0.3) val safetyFraction = sparkConf.getDouble("spark.shuffle.safetyFraction", 0.8) (Runtime.getRuntime.maxMemory * memoryFraction * safetyFraction).toLong } - // How many inserts into this map before tracking its shuffle memory usage - private val initialInsertThreshold = - sparkConf.getLong("spark.shuffle.initialInsertThreshold", 1000) + // Number of pairs in the in-memory map + private var numPairsInMemory = 0 + + // Number of in-memory pairs inserted before tracking the map's shuffle memory usage + private val trackMemoryThreshold = 1000 + + // How many times we have spilled so far + private var spillCount = 0 private val fileBufferSize = sparkConf.getInt("spark.shuffle.file.buffer.kb", 100) * 1024 - private val syncWrites = sparkConf.get("spark.shuffle.sync", "false").toBoolean + private val syncWrites = sparkConf.getBoolean("spark.shuffle.sync", false) private val comparator = new KCComparator[K, C] private val ser = serializer.newInstance() - private var insertCount = 0 - private var spillCount = 0 /** * Insert the given key and value into the map. @@ -94,14 +97,13 @@ private[spark] class ExternalAppendOnlyMap[K, V, C]( * enough room for this to happen. If so, allocate the memory required to grow the map; * otherwise, spill the in-memory map to disk. * - * The shuffle memory usage of the first initialInsertThreshold entries is not tracked. + * The shuffle memory usage of the first trackMemoryThreshold entries is not tracked. 
*/ def insert(key: K, value: V) { - insertCount += 1 val update: (Boolean, C) => C = (hadVal, oldVal) => { if (hadVal) mergeValue(oldVal, value) else createCombiner(value) } - if (insertCount > initialInsertThreshold && currentMap.atGrowThreshold) { + if (numPairsInMemory > trackMemoryThreshold && currentMap.atGrowThreshold) { val mapSize = currentMap.estimateSize() var shouldSpill = false val shuffleMemoryMap = SparkEnv.get.shuffleMemoryMap @@ -114,7 +116,7 @@ private[spark] class ExternalAppendOnlyMap[K, V, C]( val availableMemory = maxMemoryThreshold - (shuffleMemoryMap.values.sum - previouslyOccupiedMemory.getOrElse(0L)) - // Assume map grow factor is 2x + // Assume map growth factor is 2x shouldSpill = availableMemory < mapSize * 2 if (!shouldSpill) { shuffleMemoryMap(threadId) = mapSize * 2 @@ -126,6 +128,7 @@ private[spark] class ExternalAppendOnlyMap[K, V, C]( } } currentMap.changeValue(key, update) + numPairsInMemory += 1 } /** @@ -133,7 +136,7 @@ private[spark] class ExternalAppendOnlyMap[K, V, C]( */ private def spill(mapSize: Long) { spillCount += 1 - logWarning("* Spilling in-memory map of %d MB to disk (%d time%s so far)" + logWarning("Spilling in-memory map of %d MB to disk (%d time%s so far)" .format(mapSize / (1024 * 1024), spillCount, if (spillCount > 1) "s" else "")) val (blockId, file) = diskBlockManager.createTempBlock() val writer = @@ -157,9 +160,13 @@ private[spark] class ExternalAppendOnlyMap[K, V, C]( shuffleMemoryMap.synchronized { shuffleMemoryMap(Thread.currentThread().getId) = 0 } - insertCount = 0 + numPairsInMemory = 0 } + /** + * Return an iterator that merges the in-memory map with the spilled maps. + * If no spill has occurred, simply return the in-memory map's iterator. + */ override def iterator: Iterator[(K, C)] = { if (spilledMaps.isEmpty) { currentMap.iterator @@ -168,7 +175,9 @@ private[spark] class ExternalAppendOnlyMap[K, V, C]( } } - /** An iterator that sort-merges (K, C) pairs from the in-memory and on-disk maps */ + /** + * An iterator that sort-merges (K, C) pairs from the in-memory map and the spilled maps + */ private class ExternalIterator extends Iterator[(K, C)] { // A fixed-size queue that maintains a buffer for each stream we are currently merging @@ -179,7 +188,7 @@ private[spark] class ExternalAppendOnlyMap[K, V, C]( val sortedMap = currentMap.destructiveSortedIterator(comparator) val inputStreams = Seq(sortedMap) ++ spilledMaps - inputStreams.foreach{ it => + inputStreams.foreach { it => val kcPairs = getMorePairs(it) mergeHeap.enqueue(StreamBuffer(it, kcPairs)) } @@ -187,6 +196,7 @@ private[spark] class ExternalAppendOnlyMap[K, V, C]( /** * Fetch from the given iterator until a key of different hash is retrieved. In the * event of key hash collisions, this ensures no pairs are hidden from being merged. + * Assume the given iterator is in sorted order. */ def getMorePairs(it: Iterator[(K, C)]): ArrayBuffer[(K, C)] = { val kcPairs = new ArrayBuffer[(K, C)] @@ -219,17 +229,16 @@ private[spark] class ExternalAppendOnlyMap[K, V, C]( baseCombiner } - override def hasNext: Boolean = { - mergeHeap.foreach{ buffer => - if (!buffer.pairs.isEmpty) { - return true - } - } - false - } + /** + * Return true if there exists an input stream that still has unvisited pairs + */ + override def hasNext: Boolean = mergeHeap.exists(!_.pairs.isEmpty) + /** + * Select a key with the minimum hash, then combine all values with the same key from all input streams. 
+ */ override def next(): (K, C) = { - // Select a return key from the StreamBuffer that holds the lowest key hash + // Select a key from the StreamBuffer that holds the lowest key hash val minBuffer = mergeHeap.dequeue() val (minPairs, minHash) = (minBuffer.pairs, minBuffer.minKeyHash) if (minPairs.length == 0) { @@ -285,45 +294,43 @@ private[spark] class ExternalAppendOnlyMap[K, V, C]( } } - // Iterate through (K, C) pairs in sorted order from an on-disk map + /** + * An iterator that returns (K, C) pairs in sorted order from an on-disk map + */ private class DiskMapIterator(file: File) extends Iterator[(K, C)] { val fileStream = new FileInputStream(file) val bufferedStream = new FastBufferedInputStream(fileStream) val deserializeStream = ser.deserializeStream(bufferedStream) - var nextItem: Option[(K, C)] = None + var nextItem: (K, C) = null var eof = false - def readNextItem(): Option[(K, C)] = { + def readNextItem(): (K, C) = { if (!eof) { try { - return Some(deserializeStream.readObject().asInstanceOf[(K, C)]) + return deserializeStream.readObject().asInstanceOf[(K, C)] } catch { case e: EOFException => eof = true cleanup() } } - None + null } override def hasNext: Boolean = { - nextItem match { - case Some(item) => true - case None => - nextItem = readNextItem() - nextItem.isDefined + if (nextItem == null) { + nextItem = readNextItem() } + nextItem != null } override def next(): (K, C) = { - nextItem match { - case Some(item) => - nextItem = None - item - case None => - val item = readNextItem() - item.getOrElse(throw new NoSuchElementException) + val item = if (nextItem == null) readNextItem() else nextItem + if (item == null) { + throw new NoSuchElementException } + nextItem = null + item } // TODO: Ensure this gets called even if the iterator isn't drained. diff --git a/docs/configuration.md b/docs/configuration.md index 6717757781..c1158491f0 100644 --- a/docs/configuration.md +++ b/docs/configuration.md @@ -104,13 +104,24 @@ Apart from these, the following properties are also available, and may be useful spark.storage.memoryFraction - 0.66 + 0.6 Fraction of Java heap to use for Spark's memory cache. This should not be larger than the "old" - generation of objects in the JVM, which by default is given 2/3 of the heap, but you can increase + generation of objects in the JVM, which by default is given 0.6 of the heap, but you can increase it if you configure your own old generation size. + + spark.shuffle.memoryFraction + 0.3 + + Fraction of Java heap to use for aggregation and cogroups during shuffles, if + spark.shuffle.externalSorting is enabled. At any given time, the collective size of + all in-memory maps used for shuffles is bounded by this limit, beyond which the contents will + begin to spill to disk. If spills are often, consider increasing this value at the expense of + spark.storage.memoryFraction. + + spark.mesos.coarse false @@ -376,6 +387,15 @@ Apart from these, the following properties are also available, and may be useful If set to "true", consolidates intermediate files created during a shuffle. Creating fewer files can improve filesystem performance for shuffles with large numbers of reduce tasks. It is recommended to set this to "true" when using ext4 or xfs filesystems. On ext3, this option might degrade performance on machines with many (>8) cores due to filesystem limitations. + + spark.shuffle.externalSorting + true + + If set to "true", spills in-memory maps used for shuffles to disk when a memory threshold is reached. 
This + threshold is specified by spark.shuffle.memoryFraction. Enable this especially for memory-intensive + applications. + + spark.speculation false -- cgit v1.2.3 From 2e393cd5fdfbf3a85fced370b5c42315e86dad49 Mon Sep 17 00:00:00 2001 From: Andrew Or Date: Fri, 10 Jan 2014 15:45:38 -0800 Subject: Update documentation for externalSorting --- docs/configuration.md | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) (limited to 'docs') diff --git a/docs/configuration.md b/docs/configuration.md index c1158491f0..40a57c4bc6 100644 --- a/docs/configuration.md +++ b/docs/configuration.md @@ -391,9 +391,8 @@ Apart from these, the following properties are also available, and may be useful spark.shuffle.externalSorting true - If set to "true", spills in-memory maps used for shuffles to disk when a memory threshold is reached. This - threshold is specified by spark.shuffle.memoryFraction. Enable this especially for memory-intensive - applications. + If set to "true", limits the amount of memory used during reduces by spilling data out to disk. This spilling + threshold is specified by spark.shuffle.memoryFraction. -- cgit v1.2.3 From 448aef6790caa3728bcc43f518afb69807597c39 Mon Sep 17 00:00:00 2001 From: Tathagata Das Date: Sun, 12 Jan 2014 11:31:54 -0800 Subject: Moved DStream, DStreamCheckpointData and PairDStream from org.apache.spark.streaming to org.apache.spark.streaming.dstream. --- docs/streaming-programming-guide.md | 2 +- .../apache/spark/streaming/flume/FlumeUtils.scala | 3 +- .../apache/spark/streaming/kafka/KafkaUtils.scala | 3 +- .../apache/spark/streaming/mqtt/MQTTUtils.scala | 3 +- .../spark/streaming/twitter/TwitterUtils.scala | 3 +- .../spark/streaming/zeromq/ZeroMQUtils.scala | 3 +- .../scala/org/apache/spark/streaming/DStream.scala | 741 -------------------- .../spark/streaming/DStreamCheckpointData.scala | 128 ---- .../org/apache/spark/streaming/DStreamGraph.scala | 4 +- .../spark/streaming/PairDStreamFunctions.scala | 621 ----------------- .../spark/streaming/api/java/JavaDStream.scala | 3 +- .../spark/streaming/api/java/JavaDStreamLike.scala | 1 + .../spark/streaming/api/java/JavaPairDStream.scala | 1 + .../streaming/api/java/JavaStreamingContext.scala | 1 + .../apache/spark/streaming/dstream/DStream.scala | 742 +++++++++++++++++++++ .../streaming/dstream/DStreamCheckpointData.scala | 126 ++++ .../spark/streaming/dstream/FileInputDStream.scala | 2 +- .../spark/streaming/dstream/FilteredDStream.scala | 2 +- .../streaming/dstream/FlatMapValuedDStream.scala | 2 +- .../streaming/dstream/FlatMappedDStream.scala | 2 +- .../spark/streaming/dstream/ForEachDStream.scala | 2 +- .../spark/streaming/dstream/GlommedDStream.scala | 2 +- .../spark/streaming/dstream/InputDStream.scala | 2 +- .../streaming/dstream/MapPartitionedDStream.scala | 2 +- .../spark/streaming/dstream/MapValuedDStream.scala | 2 +- .../spark/streaming/dstream/MappedDStream.scala | 2 +- .../streaming/dstream/PairDStreamFunctions.scala | 622 +++++++++++++++++ .../streaming/dstream/ReducedWindowedDStream.scala | 2 +- .../spark/streaming/dstream/ShuffledDStream.scala | 2 +- .../spark/streaming/dstream/StateDStream.scala | 2 +- .../streaming/dstream/TransformedDStream.scala | 2 +- .../spark/streaming/dstream/UnionDStream.scala | 3 +- .../spark/streaming/util/MasterFailureTest.scala | 2 +- .../spark/streaming/BasicOperationsSuite.scala | 1 + .../apache/spark/streaming/CheckpointSuite.scala | 2 +- .../spark/streaming/StreamingContextSuite.scala | 1 + .../spark/streaming/StreamingListenerSuite.scala | 1 + 
.../org/apache/spark/streaming/TestSuiteBase.scala | 2 +- .../spark/streaming/WindowOperationsSuite.scala | 1 + .../spark/tools/JavaAPICompletenessChecker.scala | 50 +- 40 files changed, 1555 insertions(+), 1543 deletions(-) delete mode 100644 streaming/src/main/scala/org/apache/spark/streaming/DStream.scala delete mode 100644 streaming/src/main/scala/org/apache/spark/streaming/DStreamCheckpointData.scala delete mode 100644 streaming/src/main/scala/org/apache/spark/streaming/PairDStreamFunctions.scala create mode 100644 streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala create mode 100644 streaming/src/main/scala/org/apache/spark/streaming/dstream/DStreamCheckpointData.scala create mode 100644 streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala (limited to 'docs') diff --git a/docs/streaming-programming-guide.md b/docs/streaming-programming-guide.md index 1c9ece6270..cec1b75baf 100644 --- a/docs/streaming-programming-guide.md +++ b/docs/streaming-programming-guide.md @@ -167,7 +167,7 @@ Spark Streaming features windowed computations, which allow you to apply transfo -A complete list of DStream operations is available in the API documentation of [DStream](api/streaming/index.html#org.apache.spark.streaming.DStream) and [PairDStreamFunctions](api/streaming/index.html#org.apache.spark.streaming.PairDStreamFunctions). +A complete list of DStream operations is available in the API documentation of [DStream](api/streaming/index.html#org.apache.spark.streaming.dstream.DStream) and [PairDStreamFunctions](api/streaming/index.html#org.apache.spark.streaming.dstream.PairDStreamFunctions). ## Output Operations When an output operator is called, it triggers the computation of a stream. Currently the following output operators are defined: diff --git a/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeUtils.scala b/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeUtils.scala index 834b775d4f..a01c17ac5d 100644 --- a/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeUtils.scala +++ b/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeUtils.scala @@ -18,8 +18,9 @@ package org.apache.spark.streaming.flume import org.apache.spark.storage.StorageLevel -import org.apache.spark.streaming.{StreamingContext, DStream} +import org.apache.spark.streaming.{StreamingContext} import org.apache.spark.streaming.api.java.{JavaStreamingContext, JavaDStream} +import org.apache.spark.streaming.dstream.DStream object FlumeUtils { /** diff --git a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala index c2d851f943..df4ecac8d1 100644 --- a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala +++ b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala @@ -26,8 +26,9 @@ import java.util.{Map => JMap} import kafka.serializer.{Decoder, StringDecoder} import org.apache.spark.storage.StorageLevel -import org.apache.spark.streaming.{StreamingContext, DStream} +import org.apache.spark.streaming.{StreamingContext} import org.apache.spark.streaming.api.java.{JavaStreamingContext, JavaPairDStream} +import org.apache.spark.streaming.dstream.DStream object KafkaUtils { diff --git a/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTUtils.scala b/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTUtils.scala index 
0e6c25dbee..eacb26f6c5 100644 --- a/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTUtils.scala +++ b/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTUtils.scala @@ -18,9 +18,10 @@ package org.apache.spark.streaming.mqtt import org.apache.spark.storage.StorageLevel -import org.apache.spark.streaming.{StreamingContext, DStream} +import org.apache.spark.streaming.{StreamingContext} import org.apache.spark.streaming.api.java.{JavaStreamingContext, JavaDStream} import scala.reflect.ClassTag +import org.apache.spark.streaming.dstream.DStream object MQTTUtils { /** diff --git a/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/TwitterUtils.scala b/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/TwitterUtils.scala index 5e506ffabc..8ea52c4e5b 100644 --- a/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/TwitterUtils.scala +++ b/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/TwitterUtils.scala @@ -20,8 +20,9 @@ package org.apache.spark.streaming.twitter import twitter4j.Status import twitter4j.auth.Authorization import org.apache.spark.storage.StorageLevel -import org.apache.spark.streaming.{StreamingContext, DStream} +import org.apache.spark.streaming.{StreamingContext} import org.apache.spark.streaming.api.java.{JavaDStream, JavaStreamingContext} +import org.apache.spark.streaming.dstream.DStream object TwitterUtils { /** diff --git a/external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/ZeroMQUtils.scala b/external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/ZeroMQUtils.scala index 546d9df3b5..669eb0d9c9 100644 --- a/external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/ZeroMQUtils.scala +++ b/external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/ZeroMQUtils.scala @@ -25,8 +25,9 @@ import akka.zeromq.Subscribe import org.apache.spark.api.java.function.{Function => JFunction} import org.apache.spark.storage.StorageLevel import org.apache.spark.streaming.receivers.ReceiverSupervisorStrategy -import org.apache.spark.streaming.{StreamingContext, DStream} +import org.apache.spark.streaming.{StreamingContext} import org.apache.spark.streaming.api.java.{JavaStreamingContext, JavaDStream} +import org.apache.spark.streaming.dstream.DStream object ZeroMQUtils { /** diff --git a/streaming/src/main/scala/org/apache/spark/streaming/DStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/DStream.scala deleted file mode 100644 index d59146e069..0000000000 --- a/streaming/src/main/scala/org/apache/spark/streaming/DStream.scala +++ /dev/null @@ -1,741 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.spark.streaming - -import StreamingContext._ -import org.apache.spark.streaming.dstream._ -import org.apache.spark.streaming.scheduler.Job -import org.apache.spark.Logging -import org.apache.spark.rdd.RDD -import org.apache.spark.storage.StorageLevel -import org.apache.spark.util.MetadataCleaner - -import scala.collection.mutable.HashMap -import scala.reflect.ClassTag - -import java.io.{ObjectInputStream, IOException, ObjectOutputStream} - - -/** - * A Discretized Stream (DStream), the basic abstraction in Spark Streaming, is a continuous - * sequence of RDDs (of the same type) representing a continuous stream of data (see [[org.apache.spark.rdd.RDD]] - * for more details on RDDs). DStreams can either be created from live data (such as, data from - * HDFS, Kafka or Flume) or it can be generated by transformation existing DStreams using operations - * such as `map`, `window` and `reduceByKeyAndWindow`. While a Spark Streaming program is running, each - * DStream periodically generates a RDD, either from live data or by transforming the RDD generated - * by a parent DStream. - * - * This class contains the basic operations available on all DStreams, such as `map`, `filter` and - * `window`. In addition, [[org.apache.spark.streaming.PairDStreamFunctions]] contains operations available - * only on DStreams of key-value pairs, such as `groupByKeyAndWindow` and `join`. These operations - * are automatically available on any DStream of the right type (e.g., DStream[(Int, Int)] through - * implicit conversions when `spark.streaming.StreamingContext._` is imported. - * - * DStreams internally is characterized by a few basic properties: - * - A list of other DStreams that the DStream depends on - * - A time interval at which the DStream generates an RDD - * - A function that is used to generate an RDD after each time interval - */ - -abstract class DStream[T: ClassTag] ( - @transient private[streaming] var ssc: StreamingContext - ) extends Serializable with Logging { - - // ======================================================================= - // Methods that should be implemented by subclasses of DStream - // ======================================================================= - - /** Time interval after which the DStream generates a RDD */ - def slideDuration: Duration - - /** List of parent DStreams on which this DStream depends on */ - def dependencies: List[DStream[_]] - - /** Method that generates a RDD for the given time */ - def compute (validTime: Time): Option[RDD[T]] - - // ======================================================================= - // Methods and fields available on all DStreams - // ======================================================================= - - // RDDs generated, marked as private[streaming] so that testsuites can access it - @transient - private[streaming] var generatedRDDs = new HashMap[Time, RDD[T]] () - - // Time zero for the DStream - private[streaming] var zeroTime: Time = null - - // Duration for which the DStream will remember each RDD created - private[streaming] var rememberDuration: Duration = null - - // Storage level of the RDDs in the stream - private[streaming] var storageLevel: StorageLevel = StorageLevel.NONE - - // Checkpoint details - private[streaming] val mustCheckpoint = false - private[streaming] var checkpointDuration: Duration = null - private[streaming] val checkpointData = new DStreamCheckpointData(this) - - // Reference to whole DStream graph - private[streaming] var graph: DStreamGraph = null - - 
private[streaming] def isInitialized = (zeroTime != null) - - // Duration for which the DStream requires its parent DStream to remember each RDD created - private[streaming] def parentRememberDuration = rememberDuration - - /** Return the StreamingContext associated with this DStream */ - def context = ssc - - /** Persist the RDDs of this DStream with the given storage level */ - def persist(level: StorageLevel): DStream[T] = { - if (this.isInitialized) { - throw new UnsupportedOperationException( - "Cannot change storage level of an DStream after streaming context has started") - } - this.storageLevel = level - this - } - - /** Persist RDDs of this DStream with the default storage level (MEMORY_ONLY_SER) */ - def persist(): DStream[T] = persist(StorageLevel.MEMORY_ONLY_SER) - - /** Persist RDDs of this DStream with the default storage level (MEMORY_ONLY_SER) */ - def cache(): DStream[T] = persist() - - /** - * Enable periodic checkpointing of RDDs of this DStream - * @param interval Time interval after which generated RDD will be checkpointed - */ - def checkpoint(interval: Duration): DStream[T] = { - if (isInitialized) { - throw new UnsupportedOperationException( - "Cannot change checkpoint interval of an DStream after streaming context has started") - } - persist() - checkpointDuration = interval - this - } - - /** - * Initialize the DStream by setting the "zero" time, based on which - * the validity of future times is calculated. This method also recursively initializes - * its parent DStreams. - */ - private[streaming] def initialize(time: Time) { - if (zeroTime != null && zeroTime != time) { - throw new Exception("ZeroTime is already initialized to " + zeroTime - + ", cannot initialize it again to " + time) - } - zeroTime = time - - // Set the checkpoint interval to be slideDuration or 10 seconds, which ever is larger - if (mustCheckpoint && checkpointDuration == null) { - checkpointDuration = slideDuration * math.ceil(Seconds(10) / slideDuration).toInt - logInfo("Checkpoint interval automatically set to " + checkpointDuration) - } - - // Set the minimum value of the rememberDuration if not already set - var minRememberDuration = slideDuration - if (checkpointDuration != null && minRememberDuration <= checkpointDuration) { - minRememberDuration = checkpointDuration * 2 // times 2 just to be sure that the latest checkpoint is not forgetten - } - if (rememberDuration == null || rememberDuration < minRememberDuration) { - rememberDuration = minRememberDuration - } - - // Initialize the dependencies - dependencies.foreach(_.initialize(zeroTime)) - } - - private[streaming] def validate() { - assert(rememberDuration != null, "Remember duration is set to null") - - assert( - !mustCheckpoint || checkpointDuration != null, - "The checkpoint interval for " + this.getClass.getSimpleName + " has not been set." + - " Please use DStream.checkpoint() to set the interval." - ) - - assert( - checkpointDuration == null || context.sparkContext.checkpointDir.isDefined, - "The checkpoint directory has not been set. Please use StreamingContext.checkpoint()" + - " or SparkContext.checkpoint() to set the checkpoint directory." - ) - - assert( - checkpointDuration == null || checkpointDuration >= slideDuration, - "The checkpoint interval for " + this.getClass.getSimpleName + " has been set to " + - checkpointDuration + " which is lower than its slide time (" + slideDuration + "). " + - "Please set it to at least " + slideDuration + "." 
- ) - - assert( - checkpointDuration == null || checkpointDuration.isMultipleOf(slideDuration), - "The checkpoint interval for " + this.getClass.getSimpleName + " has been set to " + - checkpointDuration + " which not a multiple of its slide time (" + slideDuration + "). " + - "Please set it to a multiple " + slideDuration + "." - ) - - assert( - checkpointDuration == null || storageLevel != StorageLevel.NONE, - "" + this.getClass.getSimpleName + " has been marked for checkpointing but the storage " + - "level has not been set to enable persisting. Please use DStream.persist() to set the " + - "storage level to use memory for better checkpointing performance." - ) - - assert( - checkpointDuration == null || rememberDuration > checkpointDuration, - "The remember duration for " + this.getClass.getSimpleName + " has been set to " + - rememberDuration + " which is not more than the checkpoint interval (" + - checkpointDuration + "). Please set it to higher than " + checkpointDuration + "." - ) - - val metadataCleanerDelay = MetadataCleaner.getDelaySeconds(ssc.conf) - logInfo("metadataCleanupDelay = " + metadataCleanerDelay) - assert( - metadataCleanerDelay < 0 || rememberDuration.milliseconds < metadataCleanerDelay * 1000, - "It seems you are doing some DStream window operation or setting a checkpoint interval " + - "which requires " + this.getClass.getSimpleName + " to remember generated RDDs for more " + - "than " + rememberDuration.milliseconds / 1000 + " seconds. But Spark's metadata cleanup" + - "delay is set to " + metadataCleanerDelay + " seconds, which is not sufficient. Please " + - "set the Java property 'spark.cleaner.delay' to more than " + - math.ceil(rememberDuration.milliseconds / 1000.0).toInt + " seconds." - ) - - dependencies.foreach(_.validate()) - - logInfo("Slide time = " + slideDuration) - logInfo("Storage level = " + storageLevel) - logInfo("Checkpoint interval = " + checkpointDuration) - logInfo("Remember duration = " + rememberDuration) - logInfo("Initialized and validated " + this) - } - - private[streaming] def setContext(s: StreamingContext) { - if (ssc != null && ssc != s) { - throw new Exception("Context is already set in " + this + ", cannot set it again") - } - ssc = s - logInfo("Set context for " + this) - dependencies.foreach(_.setContext(ssc)) - } - - private[streaming] def setGraph(g: DStreamGraph) { - if (graph != null && graph != g) { - throw new Exception("Graph is already set in " + this + ", cannot set it again") - } - graph = g - dependencies.foreach(_.setGraph(graph)) - } - - private[streaming] def remember(duration: Duration) { - if (duration != null && duration > rememberDuration) { - rememberDuration = duration - logInfo("Duration for remembering RDDs set to " + rememberDuration + " for " + this) - } - dependencies.foreach(_.remember(parentRememberDuration)) - } - - /** Checks whether the 'time' is valid wrt slideDuration for generating RDD */ - private[streaming] def isTimeValid(time: Time): Boolean = { - if (!isInitialized) { - throw new Exception (this + " has not been initialized") - } else if (time <= zeroTime || ! (time - zeroTime).isMultipleOf(slideDuration)) { - logInfo("Time " + time + " is invalid as zeroTime is " + zeroTime + " and slideDuration is " + slideDuration + " and difference is " + (time - zeroTime)) - false - } else { - logDebug("Time " + time + " is valid") - true - } - } - - /** - * Retrieve a precomputed RDD of this DStream, or computes the RDD. This is an internal - * method that should not be called directly. 
- */ - private[streaming] def getOrCompute(time: Time): Option[RDD[T]] = { - // If this DStream was not initialized (i.e., zeroTime not set), then do it - // If RDD was already generated, then retrieve it from HashMap - generatedRDDs.get(time) match { - - // If an RDD was already generated and is being reused, then - // probably all RDDs in this DStream will be reused and hence should be cached - case Some(oldRDD) => Some(oldRDD) - - // if RDD was not generated, and if the time is valid - // (based on sliding time of this DStream), then generate the RDD - case None => { - if (isTimeValid(time)) { - compute(time) match { - case Some(newRDD) => - if (storageLevel != StorageLevel.NONE) { - newRDD.persist(storageLevel) - logInfo("Persisting RDD " + newRDD.id + " for time " + time + " to " + storageLevel + " at time " + time) - } - if (checkpointDuration != null && (time - zeroTime).isMultipleOf(checkpointDuration)) { - newRDD.checkpoint() - logInfo("Marking RDD " + newRDD.id + " for time " + time + " for checkpointing at time " + time) - } - generatedRDDs.put(time, newRDD) - Some(newRDD) - case None => - None - } - } else { - None - } - } - } - } - - /** - * Generate a SparkStreaming job for the given time. This is an internal method that - * should not be called directly. This default implementation creates a job - * that materializes the corresponding RDD. Subclasses of DStream may override this - * to generate their own jobs. - */ - private[streaming] def generateJob(time: Time): Option[Job] = { - getOrCompute(time) match { - case Some(rdd) => { - val jobFunc = () => { - val emptyFunc = { (iterator: Iterator[T]) => {} } - context.sparkContext.runJob(rdd, emptyFunc) - } - Some(new Job(time, jobFunc)) - } - case None => None - } - } - - /** - * Clear metadata that are older than `rememberDuration` of this DStream. - * This is an internal method that should not be called directly. This default - * implementation clears the old generated RDDs. Subclasses of DStream may override - * this to clear their own metadata along with the generated RDDs. - */ - private[streaming] def clearMetadata(time: Time) { - val oldRDDs = generatedRDDs.filter(_._1 <= (time - rememberDuration)) - generatedRDDs --= oldRDDs.keys - logDebug("Cleared " + oldRDDs.size + " RDDs that were older than " + - (time - rememberDuration) + ": " + oldRDDs.keys.mkString(", ")) - dependencies.foreach(_.clearMetadata(time)) - } - - /* Adds metadata to the Stream while it is running. - * This method should be overwritten by sublcasses of InputDStream. - */ - private[streaming] def addMetadata(metadata: Any) { - if (metadata != null) { - logInfo("Dropping Metadata: " + metadata.toString) - } - } - - /** - * Refresh the list of checkpointed RDDs that will be saved along with checkpoint of - * this stream. This is an internal method that should not be called directly. This is - * a default implementation that saves only the file names of the checkpointed RDDs to - * checkpointData. Subclasses of DStream (especially those of InputDStream) may override - * this method to save custom checkpoint data. 
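// The checkpoint data maintained above is consumed when a failed driver is restarted. A
// sketch of the user-facing side, assuming the checkpoint-directory constructor of
// StreamingContext is available and the same placeholder path as in the earlier sketch:
import org.apache.spark.streaming.StreamingContext

val recovered = new StreamingContext("/tmp/streaming-checkpoint")  // rebuilds the DStream graph
recovered.start()                                                  // generated RDDs are restored from checkpoint files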
- */ - private[streaming] def updateCheckpointData(currentTime: Time) { - logInfo("Updating checkpoint data for time " + currentTime) - checkpointData.update(currentTime) - dependencies.foreach(_.updateCheckpointData(currentTime)) - logDebug("Updated checkpoint data for time " + currentTime + ": " + checkpointData) - } - - private[streaming] def clearCheckpointData(time: Time) { - logInfo("Clearing checkpoint data") - checkpointData.cleanup(time) - dependencies.foreach(_.clearCheckpointData(time)) - logInfo("Cleared checkpoint data") - } - - /** - * Restore the RDDs in generatedRDDs from the checkpointData. This is an internal method - * that should not be called directly. This is a default implementation that recreates RDDs - * from the checkpoint file names stored in checkpointData. Subclasses of DStream that - * override the updateCheckpointData() method would also need to override this method. - */ - private[streaming] def restoreCheckpointData() { - // Create RDDs from the checkpoint data - logInfo("Restoring checkpoint data") - checkpointData.restore() - dependencies.foreach(_.restoreCheckpointData()) - logInfo("Restored checkpoint data") - } - - @throws(classOf[IOException]) - private def writeObject(oos: ObjectOutputStream) { - logDebug(this.getClass().getSimpleName + ".writeObject used") - if (graph != null) { - graph.synchronized { - if (graph.checkpointInProgress) { - oos.defaultWriteObject() - } else { - val msg = "Object of " + this.getClass.getName + " is being serialized " + - " possibly as a part of closure of an RDD operation. This is because " + - " the DStream object is being referred to from within the closure. " + - " Please rewrite the RDD operation inside this DStream to avoid this. " + - " This has been enforced to avoid bloating of Spark tasks " + - " with unnecessary objects." - throw new java.io.NotSerializableException(msg) - } - } - } else { - throw new java.io.NotSerializableException("Graph is unexpectedly null when DStream is being serialized.") - } - } - - @throws(classOf[IOException]) - private def readObject(ois: ObjectInputStream) { - logDebug(this.getClass().getSimpleName + ".readObject used") - ois.defaultReadObject() - generatedRDDs = new HashMap[Time, RDD[T]] () - } - - // ======================================================================= - // DStream operations - // ======================================================================= - - /** Return a new DStream by applying a function to all elements of this DStream. */ - def map[U: ClassTag](mapFunc: T => U): DStream[U] = { - new MappedDStream(this, context.sparkContext.clean(mapFunc)) - } - - /** - * Return a new DStream by applying a function to all elements of this DStream, - * and then flattening the results - */ - def flatMap[U: ClassTag](flatMapFunc: T => Traversable[U]): DStream[U] = { - new FlatMappedDStream(this, context.sparkContext.clean(flatMapFunc)) - } - - /** Return a new DStream containing only the elements that satisfy a predicate. */ - def filter(filterFunc: T => Boolean): DStream[T] = new FilteredDStream(this, filterFunc) - - /** - * Return a new DStream in which each RDD is generated by applying glom() to each RDD of - * this DStream. Applying glom() to an RDD coalesces all elements within each partition into - * an array. - */ - def glom(): DStream[Array[T]] = new GlommedDStream(this) - - - /** - * Return a new DStream with an increased or decreased level of parallelism. Each RDD in the - * returned DStream has exactly numPartitions partitions. 
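// A short sketch of the element-wise operations documented above (map, flatMap, filter),
// assuming `lines: DStream[String]` from the earlier checkpoint sketch:
val words  = lines.flatMap(_.split(" "))        // one output record per word
val longer = words.filter(_.length > 3)         // keep only words longer than 3 characters
val upper  = longer.map(_.toUpperCase)          // transform each element
upper.print()                                   // output operator: first ten elements per batch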
- */ - def repartition(numPartitions: Int): DStream[T] = this.transform(_.repartition(numPartitions)) - - /** - * Return a new DStream in which each RDD is generated by applying mapPartitions() to each RDDs - * of this DStream. Applying mapPartitions() to an RDD applies a function to each partition - * of the RDD. - */ - def mapPartitions[U: ClassTag]( - mapPartFunc: Iterator[T] => Iterator[U], - preservePartitioning: Boolean = false - ): DStream[U] = { - new MapPartitionedDStream(this, context.sparkContext.clean(mapPartFunc), preservePartitioning) - } - - /** - * Return a new DStream in which each RDD has a single element generated by reducing each RDD - * of this DStream. - */ - def reduce(reduceFunc: (T, T) => T): DStream[T] = - this.map(x => (null, x)).reduceByKey(reduceFunc, 1).map(_._2) - - /** - * Return a new DStream in which each RDD has a single element generated by counting each RDD - * of this DStream. - */ - def count(): DStream[Long] = { - this.map(_ => (null, 1L)) - .transform(_.union(context.sparkContext.makeRDD(Seq((null, 0L)), 1))) - .reduceByKey(_ + _) - .map(_._2) - } - - /** - * Return a new DStream in which each RDD contains the counts of each distinct value in - * each RDD of this DStream. Hash partitioning is used to generate - * the RDDs with `numPartitions` partitions (Spark's default number of partitions if - * `numPartitions` not specified). - */ - def countByValue(numPartitions: Int = ssc.sc.defaultParallelism): DStream[(T, Long)] = - this.map(x => (x, 1L)).reduceByKey((x: Long, y: Long) => x + y, numPartitions) - - /** - * Apply a function to each RDD in this DStream. This is an output operator, so - * 'this' DStream will be registered as an output stream and therefore materialized. - */ - def foreach(foreachFunc: RDD[T] => Unit) { - this.foreach((r: RDD[T], t: Time) => foreachFunc(r)) - } - - /** - * Apply a function to each RDD in this DStream. This is an output operator, so - * 'this' DStream will be registered as an output stream and therefore materialized. - */ - def foreach(foreachFunc: (RDD[T], Time) => Unit) { - ssc.registerOutputStream(new ForEachDStream(this, context.sparkContext.clean(foreachFunc))) - } - - /** - * Return a new DStream in which each RDD is generated by applying a function - * on each RDD of 'this' DStream. - */ - def transform[U: ClassTag](transformFunc: RDD[T] => RDD[U]): DStream[U] = { - transform((r: RDD[T], t: Time) => context.sparkContext.clean(transformFunc(r))) - } - - /** - * Return a new DStream in which each RDD is generated by applying a function - * on each RDD of 'this' DStream. - */ - def transform[U: ClassTag](transformFunc: (RDD[T], Time) => RDD[U]): DStream[U] = { - //new TransformedDStream(this, context.sparkContext.clean(transformFunc)) - val cleanedF = context.sparkContext.clean(transformFunc) - val realTransformFunc = (rdds: Seq[RDD[_]], time: Time) => { - assert(rdds.length == 1) - cleanedF(rdds.head.asInstanceOf[RDD[T]], time) - } - new TransformedDStream[U](Seq(this), realTransformFunc) - } - - /** - * Return a new DStream in which each RDD is generated by applying a function - * on each RDD of 'this' DStream and 'other' DStream. 
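// A sketch combining the per-RDD operators above, assuming `words: DStream[String]` as in
// the previous sketch; `transform` applies an arbitrary RDD-to-RDD function to each batch:
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.Time

val distinctWords = words.transform(rdd => rdd.distinct())   // arbitrary RDD operation per batch
val totals        = words.count()                            // one Long per batch
totals.print()
words.countByValue().foreach((rdd: RDD[(String, Long)], time: Time) =>
  println("Batch " + time + " sample counts: " + rdd.take(5).mkString(", ")))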
- */ - def transformWith[U: ClassTag, V: ClassTag]( - other: DStream[U], transformFunc: (RDD[T], RDD[U]) => RDD[V] - ): DStream[V] = { - val cleanedF = ssc.sparkContext.clean(transformFunc) - transformWith(other, (rdd1: RDD[T], rdd2: RDD[U], time: Time) => cleanedF(rdd1, rdd2)) - } - - /** - * Return a new DStream in which each RDD is generated by applying a function - * on each RDD of 'this' DStream and 'other' DStream. - */ - def transformWith[U: ClassTag, V: ClassTag]( - other: DStream[U], transformFunc: (RDD[T], RDD[U], Time) => RDD[V] - ): DStream[V] = { - val cleanedF = ssc.sparkContext.clean(transformFunc) - val realTransformFunc = (rdds: Seq[RDD[_]], time: Time) => { - assert(rdds.length == 2) - val rdd1 = rdds(0).asInstanceOf[RDD[T]] - val rdd2 = rdds(1).asInstanceOf[RDD[U]] - cleanedF(rdd1, rdd2, time) - } - new TransformedDStream[V](Seq(this, other), realTransformFunc) - } - - /** - * Print the first ten elements of each RDD generated in this DStream. This is an output - * operator, so this DStream will be registered as an output stream and there materialized. - */ - def print() { - def foreachFunc = (rdd: RDD[T], time: Time) => { - val first11 = rdd.take(11) - println ("-------------------------------------------") - println ("Time: " + time) - println ("-------------------------------------------") - first11.take(10).foreach(println) - if (first11.size > 10) println("...") - println() - } - val newStream = new ForEachDStream(this, context.sparkContext.clean(foreachFunc)) - ssc.registerOutputStream(newStream) - } - - /** - * Return a new DStream in which each RDD contains all the elements in seen in a - * sliding window of time over this DStream. The new DStream generates RDDs with - * the same interval as this DStream. - * @param windowDuration width of the window; must be a multiple of this DStream's interval. - */ - def window(windowDuration: Duration): DStream[T] = window(windowDuration, this.slideDuration) - - /** - * Return a new DStream in which each RDD contains all the elements in seen in a - * sliding window of time over this DStream. - * @param windowDuration width of the window; must be a multiple of this DStream's - * batching interval - * @param slideDuration sliding interval of the window (i.e., the interval after which - * the new DStream will generate RDDs); must be a multiple of this - * DStream's batching interval - */ - def window(windowDuration: Duration, slideDuration: Duration): DStream[T] = { - new WindowedDStream(this, windowDuration, slideDuration) - } - - /** - * Return a new DStream in which each RDD has a single element generated by reducing all - * elements in a sliding window over this DStream. - * @param reduceFunc associative reduce function - * @param windowDuration width of the window; must be a multiple of this DStream's - * batching interval - * @param slideDuration sliding interval of the window (i.e., the interval after which - * the new DStream will generate RDDs); must be a multiple of this - * DStream's batching interval - */ - def reduceByWindow( - reduceFunc: (T, T) => T, - windowDuration: Duration, - slideDuration: Duration - ): DStream[T] = { - this.reduce(reduceFunc).window(windowDuration, slideDuration).reduce(reduceFunc) - } - - /** - * Return a new DStream in which each RDD has a single element generated by reducing all - * elements in a sliding window over this DStream. However, the reduction is done incrementally - * using the old window's reduced value : - * 1. 
reduce the new values that entered the window (e.g., adding new counts) - * 2. "inverse reduce" the old values that left the window (e.g., subtracting old counts) - * This is more efficient than reduceByWindow without "inverse reduce" function. - * However, it is applicable to only "invertible reduce functions". - * @param reduceFunc associative reduce function - * @param invReduceFunc inverse reduce function - * @param windowDuration width of the window; must be a multiple of this DStream's - * batching interval - * @param slideDuration sliding interval of the window (i.e., the interval after which - * the new DStream will generate RDDs); must be a multiple of this - * DStream's batching interval - */ - def reduceByWindow( - reduceFunc: (T, T) => T, - invReduceFunc: (T, T) => T, - windowDuration: Duration, - slideDuration: Duration - ): DStream[T] = { - this.map(x => (1, x)) - .reduceByKeyAndWindow(reduceFunc, invReduceFunc, windowDuration, slideDuration, 1) - .map(_._2) - } - - /** - * Return a new DStream in which each RDD has a single element generated by counting the number - * of elements in a sliding window over this DStream. Hash partitioning is used to generate the RDDs with - * Spark's default number of partitions. - * @param windowDuration width of the window; must be a multiple of this DStream's - * batching interval - * @param slideDuration sliding interval of the window (i.e., the interval after which - * the new DStream will generate RDDs); must be a multiple of this - * DStream's batching interval - */ - def countByWindow(windowDuration: Duration, slideDuration: Duration): DStream[Long] = { - this.map(_ => 1L).reduceByWindow(_ + _, _ - _, windowDuration, slideDuration) - } - - /** - * Return a new DStream in which each RDD contains the count of distinct elements in - * RDDs in a sliding window over this DStream. Hash partitioning is used to generate - * the RDDs with `numPartitions` partitions (Spark's default number of partitions if - * `numPartitions` not specified). - * @param windowDuration width of the window; must be a multiple of this DStream's - * batching interval - * @param slideDuration sliding interval of the window (i.e., the interval after which - * the new DStream will generate RDDs); must be a multiple of this - * DStream's batching interval - * @param numPartitions number of partitions of each RDD in the new DStream. - */ - def countByValueAndWindow( - windowDuration: Duration, - slideDuration: Duration, - numPartitions: Int = ssc.sc.defaultParallelism - ): DStream[(T, Long)] = { - - this.map(x => (x, 1L)).reduceByKeyAndWindow( - (x: Long, y: Long) => x + y, - (x: Long, y: Long) => x - y, - windowDuration, - slideDuration, - numPartitions, - (x: (T, Long)) => x._2 != 0L - ) - } - - /** - * Return a new DStream by unifying data of another DStream with this DStream. - * @param that Another DStream having the same slideDuration as this DStream. 
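// A sliding-window sketch for the operators above, assuming `words: DStream[String]` and a
// 1-second batch interval; window width and slide must both be multiples of that interval.
// countByWindow uses the inverse-reduce path, so a checkpoint directory should be set.
import org.apache.spark.streaming.Seconds

val last30s      = words.window(Seconds(30), Seconds(10))         // all words of the last 30 s, every 10 s
val countLast30s = words.countByWindow(Seconds(30), Seconds(10))  // incremental count over the window
val charsInWindow = words.map(_.length)
  .reduceByWindow(_ + _, Seconds(30), Seconds(10))                // non-incremental reduce over the window
countLast30s.print()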
- */ - def union(that: DStream[T]): DStream[T] = new UnionDStream[T](Array(this, that)) - - /** - * Return all the RDDs defined by the Interval object (both end times included) - */ - def slice(interval: Interval): Seq[RDD[T]] = { - slice(interval.beginTime, interval.endTime) - } - - /** - * Return all the RDDs between 'fromTime' to 'toTime' (both included) - */ - def slice(fromTime: Time, toTime: Time): Seq[RDD[T]] = { - if (!(fromTime - zeroTime).isMultipleOf(slideDuration)) { - logWarning("fromTime (" + fromTime + ") is not a multiple of slideDuration (" + slideDuration + ")") - } - if (!(toTime - zeroTime).isMultipleOf(slideDuration)) { - logWarning("toTime (" + fromTime + ") is not a multiple of slideDuration (" + slideDuration + ")") - } - val alignedToTime = toTime.floor(slideDuration) - val alignedFromTime = fromTime.floor(slideDuration) - - logInfo("Slicing from " + fromTime + " to " + toTime + - " (aligned to " + alignedFromTime + " and " + alignedToTime + ")") - - alignedFromTime.to(alignedToTime, slideDuration).flatMap(time => { - if (time >= zeroTime) getOrCompute(time) else None - }) - } - - /** - * Save each RDD in this DStream as a Sequence file of serialized objects. - * The file name at each batch interval is generated based on `prefix` and - * `suffix`: "prefix-TIME_IN_MS.suffix". - */ - def saveAsObjectFiles(prefix: String, suffix: String = "") { - val saveFunc = (rdd: RDD[T], time: Time) => { - val file = rddToFileName(prefix, suffix, time) - rdd.saveAsObjectFile(file) - } - this.foreach(saveFunc) - } - - /** - * Save each RDD in this DStream as at text file, using string representation - * of elements. The file name at each batch interval is generated based on - * `prefix` and `suffix`: "prefix-TIME_IN_MS.suffix". - */ - def saveAsTextFiles(prefix: String, suffix: String = "") { - val saveFunc = (rdd: RDD[T], time: Time) => { - val file = rddToFileName(prefix, suffix, time) - rdd.saveAsTextFile(file) - } - this.foreach(saveFunc) - } - - def register() { - ssc.registerOutputStream(this) - } -} diff --git a/streaming/src/main/scala/org/apache/spark/streaming/DStreamCheckpointData.scala b/streaming/src/main/scala/org/apache/spark/streaming/DStreamCheckpointData.scala deleted file mode 100644 index 671f7bbce7..0000000000 --- a/streaming/src/main/scala/org/apache/spark/streaming/DStreamCheckpointData.scala +++ /dev/null @@ -1,128 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
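// A sketch of the save operators above, assuming `words: DStream[String]`; each batch is
// written to a path named "prefix-TIME_IN_MS.suffix" (both paths below are placeholders):
words.saveAsTextFiles("hdfs://namenode:8020/streams/words", "txt")
words.saveAsObjectFiles("/tmp/streams/words-objects")             // suffix defaults to ""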
- */ - -package org.apache.spark.streaming - -import scala.collection.mutable.{HashMap, HashSet} -import scala.reflect.ClassTag - -import org.apache.hadoop.fs.Path -import org.apache.hadoop.fs.FileSystem - -import org.apache.spark.Logging - -import java.io.{ObjectInputStream, IOException} - -private[streaming] -class DStreamCheckpointData[T: ClassTag] (dstream: DStream[T]) - extends Serializable with Logging { - protected val data = new HashMap[Time, AnyRef]() - - // Mapping of the batch time to the checkpointed RDD file of that time - @transient private var timeToCheckpointFile = new HashMap[Time, String] - // Mapping of the batch time to the time of the oldest checkpointed RDD - // in that batch's checkpoint data - @transient private var timeToOldestCheckpointFileTime = new HashMap[Time, Time] - - @transient private var fileSystem : FileSystem = null - protected[streaming] def currentCheckpointFiles = data.asInstanceOf[HashMap[Time, String]] - - /** - * Updates the checkpoint data of the DStream. This gets called every time - * the graph checkpoint is initiated. Default implementation records the - * checkpoint files to which the generate RDDs of the DStream has been saved. - */ - def update(time: Time) { - - // Get the checkpointed RDDs from the generated RDDs - val checkpointFiles = dstream.generatedRDDs.filter(_._2.getCheckpointFile.isDefined) - .map(x => (x._1, x._2.getCheckpointFile.get)) - logDebug("Current checkpoint files:\n" + checkpointFiles.toSeq.mkString("\n")) - - // Add the checkpoint files to the data to be serialized - if (!checkpointFiles.isEmpty) { - currentCheckpointFiles.clear() - currentCheckpointFiles ++= checkpointFiles - // Add the current checkpoint files to the map of all checkpoint files - // This will be used to delete old checkpoint files - timeToCheckpointFile ++= currentCheckpointFiles - // Remember the time of the oldest checkpoint RDD in current state - timeToOldestCheckpointFileTime(time) = currentCheckpointFiles.keys.min(Time.ordering) - } - } - - /** - * Cleanup old checkpoint data. This gets called after a checkpoint of `time` has been - * written to the checkpoint directory. - */ - def cleanup(time: Time) { - // Get the time of the oldest checkpointed RDD that was written as part of the - // checkpoint of `time` - timeToOldestCheckpointFileTime.remove(time) match { - case Some(lastCheckpointFileTime) => - // Find all the checkpointed RDDs (i.e. files) that are older than `lastCheckpointFileTime` - // This is because checkpointed RDDs older than this are not going to be needed - // even after master fails, as the checkpoint data of `time` does not refer to those files - val filesToDelete = timeToCheckpointFile.filter(_._1 < lastCheckpointFileTime) - logDebug("Files to delete:\n" + filesToDelete.mkString(",")) - filesToDelete.foreach { - case (time, file) => - try { - val path = new Path(file) - if (fileSystem == null) { - fileSystem = path.getFileSystem(dstream.ssc.sparkContext.hadoopConfiguration) - } - fileSystem.delete(path, true) - timeToCheckpointFile -= time - logInfo("Deleted checkpoint file '" + file + "' for time " + time) - } catch { - case e: Exception => - logWarning("Error deleting old checkpoint file '" + file + "' for time " + time, e) - fileSystem = null - } - } - case None => - logInfo("Nothing to delete") - } - } - - /** - * Restore the checkpoint data. This gets called once when the DStream graph - * (along with its DStreams) are being restored from a graph checkpoint file. 
- * Default implementation restores the RDDs from their checkpoint files. - */ - def restore() { - // Create RDDs from the checkpoint data - currentCheckpointFiles.foreach { - case(time, file) => { - logInfo("Restoring checkpointed RDD for time " + time + " from file '" + file + "'") - dstream.generatedRDDs += ((time, dstream.context.sparkContext.checkpointFile[T](file))) - } - } - } - - override def toString() = { - "[\n" + currentCheckpointFiles.size + " checkpoint files \n" + currentCheckpointFiles.mkString("\n") + "\n]" - } - - @throws(classOf[IOException]) - private def readObject(ois: ObjectInputStream) { - ois.defaultReadObject() - timeToOldestCheckpointFileTime = new HashMap[Time, Time] - timeToCheckpointFile = new HashMap[Time, String] - } -} diff --git a/streaming/src/main/scala/org/apache/spark/streaming/DStreamGraph.scala b/streaming/src/main/scala/org/apache/spark/streaming/DStreamGraph.scala index 668e5324e6..31038a06b8 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/DStreamGraph.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/DStreamGraph.scala @@ -17,11 +17,11 @@ package org.apache.spark.streaming -import org.apache.spark.streaming.dstream.{NetworkInputDStream, InputDStream} +import scala.collection.mutable.ArrayBuffer import java.io.{ObjectInputStream, IOException, ObjectOutputStream} -import collection.mutable.ArrayBuffer import org.apache.spark.Logging import org.apache.spark.streaming.scheduler.Job +import org.apache.spark.streaming.dstream.{DStream, NetworkInputDStream, InputDStream} final private[streaming] class DStreamGraph extends Serializable with Logging { diff --git a/streaming/src/main/scala/org/apache/spark/streaming/PairDStreamFunctions.scala b/streaming/src/main/scala/org/apache/spark/streaming/PairDStreamFunctions.scala deleted file mode 100644 index 56dbcbda23..0000000000 --- a/streaming/src/main/scala/org/apache/spark/streaming/PairDStreamFunctions.scala +++ /dev/null @@ -1,621 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.spark.streaming - -import org.apache.spark.streaming.StreamingContext._ -import org.apache.spark.streaming.dstream._ - -import org.apache.spark.{Partitioner, HashPartitioner} -import org.apache.spark.SparkContext._ -import org.apache.spark.rdd.{ClassTags, RDD, PairRDDFunctions} -import org.apache.spark.storage.StorageLevel - -import scala.collection.mutable.ArrayBuffer -import scala.reflect.{ClassTag, classTag} - -import org.apache.hadoop.mapred.{JobConf, OutputFormat} -import org.apache.hadoop.mapreduce.{OutputFormat => NewOutputFormat} -import org.apache.hadoop.mapred.OutputFormat -import org.apache.hadoop.security.UserGroupInformation -import org.apache.hadoop.conf.Configuration - -class PairDStreamFunctions[K: ClassTag, V: ClassTag](self: DStream[(K,V)]) -extends Serializable { - - private[streaming] def ssc = self.ssc - - private[streaming] def defaultPartitioner(numPartitions: Int = self.ssc.sc.defaultParallelism) = { - new HashPartitioner(numPartitions) - } - - /** - * Return a new DStream by applying `groupByKey` to each RDD. Hash partitioning is used to - * generate the RDDs with Spark's default number of partitions. - */ - def groupByKey(): DStream[(K, Seq[V])] = { - groupByKey(defaultPartitioner()) - } - - /** - * Return a new DStream by applying `groupByKey` to each RDD. Hash partitioning is used to - * generate the RDDs with `numPartitions` partitions. - */ - def groupByKey(numPartitions: Int): DStream[(K, Seq[V])] = { - groupByKey(defaultPartitioner(numPartitions)) - } - - /** - * Return a new DStream by applying `groupByKey` on each RDD. The supplied [[org.apache.spark.Partitioner]] - * is used to control the partitioning of each RDD. - */ - def groupByKey(partitioner: Partitioner): DStream[(K, Seq[V])] = { - val createCombiner = (v: V) => ArrayBuffer[V](v) - val mergeValue = (c: ArrayBuffer[V], v: V) => (c += v) - val mergeCombiner = (c1: ArrayBuffer[V], c2: ArrayBuffer[V]) => (c1 ++ c2) - combineByKey(createCombiner, mergeValue, mergeCombiner, partitioner) - .asInstanceOf[DStream[(K, Seq[V])]] - } - - /** - * Return a new DStream by applying `reduceByKey` to each RDD. The values for each key are - * merged using the associative reduce function. Hash partitioning is used to generate the RDDs - * with Spark's default number of partitions. - */ - def reduceByKey(reduceFunc: (V, V) => V): DStream[(K, V)] = { - reduceByKey(reduceFunc, defaultPartitioner()) - } - - /** - * Return a new DStream by applying `reduceByKey` to each RDD. The values for each key are - * merged using the supplied reduce function. Hash partitioning is used to generate the RDDs - * with `numPartitions` partitions. - */ - def reduceByKey(reduceFunc: (V, V) => V, numPartitions: Int): DStream[(K, V)] = { - reduceByKey(reduceFunc, defaultPartitioner(numPartitions)) - } - - /** - * Return a new DStream by applying `reduceByKey` to each RDD. The values for each key are - * merged using the supplied reduce function. [[org.apache.spark.Partitioner]] is used to control the - * partitioning of each RDD. - */ - def reduceByKey(reduceFunc: (V, V) => V, partitioner: Partitioner): DStream[(K, V)] = { - val cleanedReduceFunc = ssc.sc.clean(reduceFunc) - combineByKey((v: V) => v, cleanedReduceFunc, cleanedReduceFunc, partitioner) - } - - /** - * Combine elements of each key in DStream's RDDs using custom functions. This is similar to the - * combineByKey for RDDs. Please refer to combineByKey in - * [[org.apache.spark.rdd.PairRDDFunctions]] for more information. 
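// The pair operations above become available on DStream[(K, V)] through the implicit
// conversion imported from StreamingContext._; a word-count sketch, assuming `words` from
// the earlier sketches:
import org.apache.spark.streaming.StreamingContext._

val pairs      = words.map(word => (word, 1))
val wordCounts = pairs.reduceByKey(_ + _)            // per-batch counts, default partitioning
val grouped    = pairs.groupByKey(4)                 // values collected per key, 4 partitions
wordCounts.print()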
- */ - def combineByKey[C: ClassTag]( - createCombiner: V => C, - mergeValue: (C, V) => C, - mergeCombiner: (C, C) => C, - partitioner: Partitioner, - mapSideCombine: Boolean = true): DStream[(K, C)] = { - new ShuffledDStream[K, V, C](self, createCombiner, mergeValue, mergeCombiner, partitioner, mapSideCombine) - } - - /** - * Return a new DStream by applying `groupByKey` over a sliding window. This is similar to - * `DStream.groupByKey()` but applies it over a sliding window. The new DStream generates RDDs - * with the same interval as this DStream. Hash partitioning is used to generate the RDDs with - * Spark's default number of partitions. - * @param windowDuration width of the window; must be a multiple of this DStream's - * batching interval - */ - def groupByKeyAndWindow(windowDuration: Duration): DStream[(K, Seq[V])] = { - groupByKeyAndWindow(windowDuration, self.slideDuration, defaultPartitioner()) - } - - /** - * Return a new DStream by applying `groupByKey` over a sliding window. Similar to - * `DStream.groupByKey()`, but applies it over a sliding window. Hash partitioning is used to - * generate the RDDs with Spark's default number of partitions. - * @param windowDuration width of the window; must be a multiple of this DStream's - * batching interval - * @param slideDuration sliding interval of the window (i.e., the interval after which - * the new DStream will generate RDDs); must be a multiple of this - * DStream's batching interval - */ - def groupByKeyAndWindow(windowDuration: Duration, slideDuration: Duration): DStream[(K, Seq[V])] = { - groupByKeyAndWindow(windowDuration, slideDuration, defaultPartitioner()) - } - - /** - * Return a new DStream by applying `groupByKey` over a sliding window on `this` DStream. - * Similar to `DStream.groupByKey()`, but applies it over a sliding window. - * Hash partitioning is used to generate the RDDs with `numPartitions` partitions. - * @param windowDuration width of the window; must be a multiple of this DStream's - * batching interval - * @param slideDuration sliding interval of the window (i.e., the interval after which - * the new DStream will generate RDDs); must be a multiple of this - * DStream's batching interval - * @param numPartitions number of partitions of each RDD in the new DStream; if not specified - * then Spark's default number of partitions will be used - */ - def groupByKeyAndWindow( - windowDuration: Duration, - slideDuration: Duration, - numPartitions: Int - ): DStream[(K, Seq[V])] = { - groupByKeyAndWindow(windowDuration, slideDuration, defaultPartitioner(numPartitions)) - } - - /** - * Create a new DStream by applying `groupByKey` over a sliding window on `this` DStream. - * Similar to `DStream.groupByKey()`, but applies it over a sliding window. - * @param windowDuration width of the window; must be a multiple of this DStream's - * batching interval - * @param slideDuration sliding interval of the window (i.e., the interval after which - * the new DStream will generate RDDs); must be a multiple of this - * DStream's batching interval - * @param partitioner partitioner for controlling the partitioning of each RDD in the new DStream. 
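// A combineByKey sketch, assuming a hypothetical `events: DStream[(String, Double)]` of
// (key, value) pairs and the HashPartitioner used elsewhere in this patch; it accumulates a
// per-key (sum, count) and derives an average:
import org.apache.spark.HashPartitioner

val sumAndCount = events.combineByKey[(Double, Long)](
  v => (v, 1L),                                      // createCombiner
  (acc, v) => (acc._1 + v, acc._2 + 1),              // mergeValue
  (a, b) => (a._1 + b._1, a._2 + b._2),              // mergeCombiner
  new HashPartitioner(4))
val averages = sumAndCount.mapValues { case (sum, n) => sum / n }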
- */ - def groupByKeyAndWindow( - windowDuration: Duration, - slideDuration: Duration, - partitioner: Partitioner - ): DStream[(K, Seq[V])] = { - val createCombiner = (v: Seq[V]) => new ArrayBuffer[V] ++= v - val mergeValue = (buf: ArrayBuffer[V], v: Seq[V]) => buf ++= v - val mergeCombiner = (buf1: ArrayBuffer[V], buf2: ArrayBuffer[V]) => buf1 ++= buf2 - self.groupByKey(partitioner) - .window(windowDuration, slideDuration) - .combineByKey[ArrayBuffer[V]](createCombiner, mergeValue, mergeCombiner, partitioner) - .asInstanceOf[DStream[(K, Seq[V])]] - } - - /** - * Return a new DStream by applying `reduceByKey` over a sliding window on `this` DStream. - * Similar to `DStream.reduceByKey()`, but applies it over a sliding window. The new DStream - * generates RDDs with the same interval as this DStream. Hash partitioning is used to generate - * the RDDs with Spark's default number of partitions. - * @param reduceFunc associative reduce function - * @param windowDuration width of the window; must be a multiple of this DStream's - * batching interval - */ - def reduceByKeyAndWindow( - reduceFunc: (V, V) => V, - windowDuration: Duration - ): DStream[(K, V)] = { - reduceByKeyAndWindow(reduceFunc, windowDuration, self.slideDuration, defaultPartitioner()) - } - - /** - * Return a new DStream by applying `reduceByKey` over a sliding window. This is similar to - * `DStream.reduceByKey()` but applies it over a sliding window. Hash partitioning is used to - * generate the RDDs with Spark's default number of partitions. - * @param reduceFunc associative reduce function - * @param windowDuration width of the window; must be a multiple of this DStream's - * batching interval - * @param slideDuration sliding interval of the window (i.e., the interval after which - * the new DStream will generate RDDs); must be a multiple of this - * DStream's batching interval - */ - def reduceByKeyAndWindow( - reduceFunc: (V, V) => V, - windowDuration: Duration, - slideDuration: Duration - ): DStream[(K, V)] = { - reduceByKeyAndWindow(reduceFunc, windowDuration, slideDuration, defaultPartitioner()) - } - - /** - * Return a new DStream by applying `reduceByKey` over a sliding window. This is similar to - * `DStream.reduceByKey()` but applies it over a sliding window. Hash partitioning is used to - * generate the RDDs with `numPartitions` partitions. - * @param reduceFunc associative reduce function - * @param windowDuration width of the window; must be a multiple of this DStream's - * batching interval - * @param slideDuration sliding interval of the window (i.e., the interval after which - * the new DStream will generate RDDs); must be a multiple of this - * DStream's batching interval - * @param numPartitions number of partitions of each RDD in the new DStream. - */ - def reduceByKeyAndWindow( - reduceFunc: (V, V) => V, - windowDuration: Duration, - slideDuration: Duration, - numPartitions: Int - ): DStream[(K, V)] = { - reduceByKeyAndWindow(reduceFunc, windowDuration, slideDuration, defaultPartitioner(numPartitions)) - } - - /** - * Return a new DStream by applying `reduceByKey` over a sliding window. Similar to - * `DStream.reduceByKey()`, but applies it over a sliding window. 
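// A windowed word-count sketch for the variants above, assuming `pairs: DStream[(String, Int)]`
// from the earlier word-count sketch and a 1-second batch interval:
import org.apache.spark.streaming.Seconds

val windowedCounts = pairs.reduceByKeyAndWindow(_ + _, Seconds(30), Seconds(10))
windowedCounts.print()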
- * @param reduceFunc associative reduce function - * @param windowDuration width of the window; must be a multiple of this DStream's - * batching interval - * @param slideDuration sliding interval of the window (i.e., the interval after which - * the new DStream will generate RDDs); must be a multiple of this - * DStream's batching interval - * @param partitioner partitioner for controlling the partitioning of each RDD - * in the new DStream. - */ - def reduceByKeyAndWindow( - reduceFunc: (V, V) => V, - windowDuration: Duration, - slideDuration: Duration, - partitioner: Partitioner - ): DStream[(K, V)] = { - val cleanedReduceFunc = ssc.sc.clean(reduceFunc) - self.reduceByKey(cleanedReduceFunc, partitioner) - .window(windowDuration, slideDuration) - .reduceByKey(cleanedReduceFunc, partitioner) - } - - /** - * Return a new DStream by applying incremental `reduceByKey` over a sliding window. - * The reduced value of over a new window is calculated using the old window's reduced value : - * 1. reduce the new values that entered the window (e.g., adding new counts) - * - * 2. "inverse reduce" the old values that left the window (e.g., subtracting old counts) - * - * This is more efficient than reduceByKeyAndWindow without "inverse reduce" function. - * However, it is applicable to only "invertible reduce functions". - * Hash partitioning is used to generate the RDDs with Spark's default number of partitions. - * @param reduceFunc associative reduce function - * @param invReduceFunc inverse reduce function - * @param windowDuration width of the window; must be a multiple of this DStream's - * batching interval - * @param slideDuration sliding interval of the window (i.e., the interval after which - * the new DStream will generate RDDs); must be a multiple of this - * DStream's batching interval - * @param filterFunc Optional function to filter expired key-value pairs; - * only pairs that satisfy the function are retained - */ - def reduceByKeyAndWindow( - reduceFunc: (V, V) => V, - invReduceFunc: (V, V) => V, - windowDuration: Duration, - slideDuration: Duration = self.slideDuration, - numPartitions: Int = ssc.sc.defaultParallelism, - filterFunc: ((K, V)) => Boolean = null - ): DStream[(K, V)] = { - - reduceByKeyAndWindow( - reduceFunc, invReduceFunc, windowDuration, - slideDuration, defaultPartitioner(numPartitions), filterFunc - ) - } - - /** - * Return a new DStream by applying incremental `reduceByKey` over a sliding window. - * The reduced value of over a new window is calculated using the old window's reduced value : - * 1. reduce the new values that entered the window (e.g., adding new counts) - * 2. "inverse reduce" the old values that left the window (e.g., subtracting old counts) - * This is more efficient than reduceByKeyAndWindow without "inverse reduce" function. - * However, it is applicable to only "invertible reduce functions". - * @param reduceFunc associative reduce function - * @param invReduceFunc inverse reduce function - * @param windowDuration width of the window; must be a multiple of this DStream's - * batching interval - * @param slideDuration sliding interval of the window (i.e., the interval after which - * the new DStream will generate RDDs); must be a multiple of this - * DStream's batching interval - * @param partitioner partitioner for controlling the partitioning of each RDD in the new DStream. 
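// The incremental form above subtracts values that leave the window instead of recomputing
// it from scratch; a sketch, assuming `pairs: DStream[(String, Int)]` and a checkpoint
// directory already set on the context:
import org.apache.spark.streaming.Seconds

val runningWindowCounts = pairs.reduceByKeyAndWindow(
  (a: Int, b: Int) => a + b,                  // reduce: add counts entering the window
  (a: Int, b: Int) => a - b,                  // inverse reduce: subtract counts leaving it
  Seconds(30), Seconds(10), 4,
  (kv: (String, Int)) => kv._2 != 0)          // filterFunc: drop keys whose count reaches zero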
- * @param filterFunc Optional function to filter expired key-value pairs; - * only pairs that satisfy the function are retained - */ - def reduceByKeyAndWindow( - reduceFunc: (V, V) => V, - invReduceFunc: (V, V) => V, - windowDuration: Duration, - slideDuration: Duration, - partitioner: Partitioner, - filterFunc: ((K, V)) => Boolean - ): DStream[(K, V)] = { - - val cleanedReduceFunc = ssc.sc.clean(reduceFunc) - val cleanedInvReduceFunc = ssc.sc.clean(invReduceFunc) - val cleanedFilterFunc = if (filterFunc != null) Some(ssc.sc.clean(filterFunc)) else None - new ReducedWindowedDStream[K, V]( - self, cleanedReduceFunc, cleanedInvReduceFunc, cleanedFilterFunc, - windowDuration, slideDuration, partitioner - ) - } - - /** - * Return a new "state" DStream where the state for each key is updated by applying - * the given function on the previous state of the key and the new values of each key. - * Hash partitioning is used to generate the RDDs with Spark's default number of partitions. - * @param updateFunc State update function. If `this` function returns None, then - * corresponding state key-value pair will be eliminated. - * @tparam S State type - */ - def updateStateByKey[S: ClassTag]( - updateFunc: (Seq[V], Option[S]) => Option[S] - ): DStream[(K, S)] = { - updateStateByKey(updateFunc, defaultPartitioner()) - } - - /** - * Return a new "state" DStream where the state for each key is updated by applying - * the given function on the previous state of the key and the new values of each key. - * Hash partitioning is used to generate the RDDs with `numPartitions` partitions. - * @param updateFunc State update function. If `this` function returns None, then - * corresponding state key-value pair will be eliminated. - * @param numPartitions Number of partitions of each RDD in the new DStream. - * @tparam S State type - */ - def updateStateByKey[S: ClassTag]( - updateFunc: (Seq[V], Option[S]) => Option[S], - numPartitions: Int - ): DStream[(K, S)] = { - updateStateByKey(updateFunc, defaultPartitioner(numPartitions)) - } - - /** - * Return a new "state" DStream where the state for each key is updated by applying - * the given function on the previous state of the key and the new values of the key. - * [[org.apache.spark.Partitioner]] is used to control the partitioning of each RDD. - * @param updateFunc State update function. If `this` function returns None, then - * corresponding state key-value pair will be eliminated. - * @param partitioner Partitioner for controlling the partitioning of each RDD in the new DStream. - * @tparam S State type - */ - def updateStateByKey[S: ClassTag]( - updateFunc: (Seq[V], Option[S]) => Option[S], - partitioner: Partitioner - ): DStream[(K, S)] = { - val newUpdateFunc = (iterator: Iterator[(K, Seq[V], Option[S])]) => { - iterator.flatMap(t => updateFunc(t._2, t._3).map(s => (t._1, s))) - } - updateStateByKey(newUpdateFunc, partitioner, true) - } - - /** - * Return a new "state" DStream where the state for each key is updated by applying - * the given function on the previous state of the key and the new values of each key. - * [[org.apache.spark.Partitioner]] is used to control the partitioning of each RDD. - * @param updateFunc State update function. If `this` function returns None, then - * corresponding state key-value pair will be eliminated. Note, that - * this function may generate a different a tuple with a different key - * than the input key. It is up to the developer to decide whether to - * remember the partitioner despite the key being changed. 
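// A running-count sketch for updateStateByKey above, assuming `pairs: DStream[(String, Int)]`
// and that checkpointing is enabled on the context (stateful operators require it):
val updateFunc = (newValues: Seq[Int], state: Option[Int]) => {
  val newCount = state.getOrElse(0) + newValues.sum
  Some(newCount)                               // returning None would drop the key
}
val runningCounts = pairs.updateStateByKey[Int](updateFunc)
runningCounts.print()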
- * @param partitioner Partitioner for controlling the partitioning of each RDD in the new DStream. - * @param rememberPartitioner Whether to remember the paritioner object in the generated RDDs. - * @tparam S State type - */ - def updateStateByKey[S: ClassTag]( - updateFunc: (Iterator[(K, Seq[V], Option[S])]) => Iterator[(K, S)], - partitioner: Partitioner, - rememberPartitioner: Boolean - ): DStream[(K, S)] = { - new StateDStream(self, ssc.sc.clean(updateFunc), partitioner, rememberPartitioner) - } - - /** - * Return a new DStream by applying a map function to the value of each key-value pairs in - * 'this' DStream without changing the key. - */ - def mapValues[U: ClassTag](mapValuesFunc: V => U): DStream[(K, U)] = { - new MapValuedDStream[K, V, U](self, mapValuesFunc) - } - - /** - * Return a new DStream by applying a flatmap function to the value of each key-value pairs in - * 'this' DStream without changing the key. - */ - def flatMapValues[U: ClassTag]( - flatMapValuesFunc: V => TraversableOnce[U] - ): DStream[(K, U)] = { - new FlatMapValuedDStream[K, V, U](self, flatMapValuesFunc) - } - - /** - * Return a new DStream by applying 'cogroup' between RDDs of `this` DStream and `other` DStream. - * Hash partitioning is used to generate the RDDs with Spark's default number - * of partitions. - */ - def cogroup[W: ClassTag](other: DStream[(K, W)]): DStream[(K, (Seq[V], Seq[W]))] = { - cogroup(other, defaultPartitioner()) - } - - /** - * Return a new DStream by applying 'cogroup' between RDDs of `this` DStream and `other` DStream. - * Hash partitioning is used to generate the RDDs with `numPartitions` partitions. - */ - def cogroup[W: ClassTag](other: DStream[(K, W)], numPartitions: Int): DStream[(K, (Seq[V], Seq[W]))] = { - cogroup(other, defaultPartitioner(numPartitions)) - } - - /** - * Return a new DStream by applying 'cogroup' between RDDs of `this` DStream and `other` DStream. - * The supplied [[org.apache.spark.Partitioner]] is used to partition the generated RDDs. - */ - def cogroup[W: ClassTag]( - other: DStream[(K, W)], - partitioner: Partitioner - ): DStream[(K, (Seq[V], Seq[W]))] = { - self.transformWith( - other, - (rdd1: RDD[(K, V)], rdd2: RDD[(K, W)]) => rdd1.cogroup(rdd2, partitioner) - ) - } - - /** - * Return a new DStream by applying 'join' between RDDs of `this` DStream and `other` DStream. - * Hash partitioning is used to generate the RDDs with Spark's default number of partitions. - */ - def join[W: ClassTag](other: DStream[(K, W)]): DStream[(K, (V, W))] = { - join[W](other, defaultPartitioner()) - } - - /** - * Return a new DStream by applying 'join' between RDDs of `this` DStream and `other` DStream. - * Hash partitioning is used to generate the RDDs with `numPartitions` partitions. - */ - def join[W: ClassTag](other: DStream[(K, W)], numPartitions: Int): DStream[(K, (V, W))] = { - join[W](other, defaultPartitioner(numPartitions)) - } - - /** - * Return a new DStream by applying 'join' between RDDs of `this` DStream and `other` DStream. - * The supplied [[org.apache.spark.Partitioner]] is used to control the partitioning of each RDD. - */ - def join[W: ClassTag]( - other: DStream[(K, W)], - partitioner: Partitioner - ): DStream[(K, (V, W))] = { - self.transformWith( - other, - (rdd1: RDD[(K, V)], rdd2: RDD[(K, W)]) => rdd1.join(rdd2, partitioner) - ) - } - - /** - * Return a new DStream by applying 'left outer join' between RDDs of `this` DStream and - * `other` DStream. Hash partitioning is used to generate the RDDs with Spark's default - * number of partitions. 
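// A join sketch for the operators above, assuming two pair streams on the same context and
// batch interval; the sources, ports and keying below are placeholders:
import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.streaming.dstream.DStream

val views:  DStream[(String, Int)] = ssc.socketTextStream("localhost", 9999).map(url => (url, 1))
val clicks: DStream[(String, Int)] = ssc.socketTextStream("localhost", 9998).map(url => (url, 1))

val joined  = views.join(clicks)               // DStream[(String, (Int, Int))], inner join per batch
val grouped = views.cogroup(clicks)            // DStream[(String, (Seq[Int], Seq[Int]))]
val ratios  = joined.mapValues { case (v, c) => c.toDouble / v }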
- */ - def leftOuterJoin[W: ClassTag](other: DStream[(K, W)]): DStream[(K, (V, Option[W]))] = { - leftOuterJoin[W](other, defaultPartitioner()) - } - - /** - * Return a new DStream by applying 'left outer join' between RDDs of `this` DStream and - * `other` DStream. Hash partitioning is used to generate the RDDs with `numPartitions` - * partitions. - */ - def leftOuterJoin[W: ClassTag]( - other: DStream[(K, W)], - numPartitions: Int - ): DStream[(K, (V, Option[W]))] = { - leftOuterJoin[W](other, defaultPartitioner(numPartitions)) - } - - /** - * Return a new DStream by applying 'left outer join' between RDDs of `this` DStream and - * `other` DStream. The supplied [[org.apache.spark.Partitioner]] is used to control - * the partitioning of each RDD. - */ - def leftOuterJoin[W: ClassTag]( - other: DStream[(K, W)], - partitioner: Partitioner - ): DStream[(K, (V, Option[W]))] = { - self.transformWith( - other, - (rdd1: RDD[(K, V)], rdd2: RDD[(K, W)]) => rdd1.leftOuterJoin(rdd2, partitioner) - ) - } - - /** - * Return a new DStream by applying 'right outer join' between RDDs of `this` DStream and - * `other` DStream. Hash partitioning is used to generate the RDDs with Spark's default - * number of partitions. - */ - def rightOuterJoin[W: ClassTag](other: DStream[(K, W)]): DStream[(K, (Option[V], W))] = { - rightOuterJoin[W](other, defaultPartitioner()) - } - - /** - * Return a new DStream by applying 'right outer join' between RDDs of `this` DStream and - * `other` DStream. Hash partitioning is used to generate the RDDs with `numPartitions` - * partitions. - */ - def rightOuterJoin[W: ClassTag]( - other: DStream[(K, W)], - numPartitions: Int - ): DStream[(K, (Option[V], W))] = { - rightOuterJoin[W](other, defaultPartitioner(numPartitions)) - } - - /** - * Return a new DStream by applying 'right outer join' between RDDs of `this` DStream and - * `other` DStream. The supplied [[org.apache.spark.Partitioner]] is used to control - * the partitioning of each RDD. - */ - def rightOuterJoin[W: ClassTag]( - other: DStream[(K, W)], - partitioner: Partitioner - ): DStream[(K, (Option[V], W))] = { - self.transformWith( - other, - (rdd1: RDD[(K, V)], rdd2: RDD[(K, W)]) => rdd1.rightOuterJoin(rdd2, partitioner) - ) - } - - /** - * Save each RDD in `this` DStream as a Hadoop file. The file name at each batch interval - * is generated based on `prefix` and `suffix`: "prefix-TIME_IN_MS.suffix" - */ - def saveAsHadoopFiles[F <: OutputFormat[K, V]]( - prefix: String, - suffix: String - )(implicit fm: ClassTag[F]) { - saveAsHadoopFiles(prefix, suffix, getKeyClass, getValueClass, fm.runtimeClass.asInstanceOf[Class[F]]) - } - - /** - * Save each RDD in `this` DStream as a Hadoop file. The file name at each batch interval - * is generated based on `prefix` and `suffix`: "prefix-TIME_IN_MS.suffix" - */ - def saveAsHadoopFiles( - prefix: String, - suffix: String, - keyClass: Class[_], - valueClass: Class[_], - outputFormatClass: Class[_ <: OutputFormat[_, _]], - conf: JobConf = new JobConf - ) { - val saveFunc = (rdd: RDD[(K, V)], time: Time) => { - val file = rddToFileName(prefix, suffix, time) - rdd.saveAsHadoopFile(file, keyClass, valueClass, outputFormatClass, conf) - } - self.foreach(saveFunc) - } - - /** - * Save each RDD in `this` DStream as a Hadoop file. The file name at each batch interval is - * generated based on `prefix` and `suffix`: "prefix-TIME_IN_MS.suffix". 
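// A left outer join sketch for the operators above, assuming `views` and `clicks` from the
// previous sketch; keys without a match appear with None on the right-hand side:
val withOptionalClicks = views.leftOuterJoin(clicks)   // DStream[(String, (Int, Option[Int]))]
val clickCounts = withOptionalClicks.mapValues { case (_, clickOpt) => clickOpt.getOrElse(0) }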
- */ - def saveAsNewAPIHadoopFiles[F <: NewOutputFormat[K, V]]( - prefix: String, - suffix: String - )(implicit fm: ClassTag[F]) { - saveAsNewAPIHadoopFiles(prefix, suffix, getKeyClass, getValueClass, fm.runtimeClass.asInstanceOf[Class[F]]) - } - - /** - * Save each RDD in `this` DStream as a Hadoop file. The file name at each batch interval is - * generated based on `prefix` and `suffix`: "prefix-TIME_IN_MS.suffix". - */ - def saveAsNewAPIHadoopFiles( - prefix: String, - suffix: String, - keyClass: Class[_], - valueClass: Class[_], - outputFormatClass: Class[_ <: NewOutputFormat[_, _]], - conf: Configuration = new Configuration - ) { - val saveFunc = (rdd: RDD[(K, V)], time: Time) => { - val file = rddToFileName(prefix, suffix, time) - rdd.saveAsNewAPIHadoopFile(file, keyClass, valueClass, outputFormatClass, conf) - } - self.foreach(saveFunc) - } - - private def getKeyClass() = implicitly[ClassTag[K]].runtimeClass - - private def getValueClass() = implicitly[ClassTag[V]].runtimeClass -} diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStream.scala index d29033df32..c92854ccd9 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStream.scala @@ -17,13 +17,14 @@ package org.apache.spark.streaming.api.java -import org.apache.spark.streaming.{Duration, Time, DStream} +import org.apache.spark.streaming.{Duration, Time} import org.apache.spark.api.java.function.{Function => JFunction} import org.apache.spark.api.java.JavaRDD import org.apache.spark.storage.StorageLevel import org.apache.spark.rdd.RDD import scala.reflect.ClassTag +import org.apache.spark.streaming.dstream.DStream /** * A Discretized Stream (DStream), the basic abstraction in Spark Streaming, is a continuous diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStreamLike.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStreamLike.scala index 64f38ce1c0..d3cd52ad7c 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStreamLike.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStreamLike.scala @@ -30,6 +30,7 @@ import org.apache.spark.api.java.function.{Function3 => JFunction3, _} import java.util import org.apache.spark.rdd.RDD import JavaDStream._ +import org.apache.spark.streaming.dstream.DStream trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This, R], R <: JavaRDDLike[T, R]] extends Serializable { diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala index 6c3467d405..6bb985ca54 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala @@ -35,6 +35,7 @@ import org.apache.spark.storage.StorageLevel import com.google.common.base.Optional import org.apache.spark.rdd.RDD import org.apache.spark.rdd.PairRDDFunctions +import org.apache.spark.streaming.dstream.DStream class JavaPairDStream[K, V](val dstream: DStream[(K, V)])( implicit val kManifest: ClassTag[K], diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala index 
ea7f7da6f3..03b422333f 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala @@ -36,6 +36,7 @@ import org.apache.spark.storage.StorageLevel import org.apache.spark.streaming._ import org.apache.spark.streaming.scheduler.StreamingListener import org.apache.hadoop.conf.Configuration +import org.apache.spark.streaming.dstream.DStream /** * A StreamingContext is the main entry point for Spark Streaming functionality. Besides the basic diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala new file mode 100644 index 0000000000..fd72ebc3d8 --- /dev/null +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala @@ -0,0 +1,742 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.streaming.dstream + +import scala.collection.mutable.HashMap +import scala.reflect.ClassTag + +import java.io.{ObjectInputStream, IOException, ObjectOutputStream} + +import org.apache.spark.Logging +import org.apache.spark.rdd.RDD +import org.apache.spark.storage.StorageLevel +import org.apache.spark.util.MetadataCleaner +import org.apache.spark.streaming._ +import org.apache.spark.streaming.StreamingContext._ +import org.apache.spark.streaming.scheduler.Job +import org.apache.spark.streaming.Duration + + +/** + * A Discretized Stream (DStream), the basic abstraction in Spark Streaming, is a continuous + * sequence of RDDs (of the same type) representing a continuous stream of data (see [[org.apache.spark.rdd.RDD]] + * for more details on RDDs). DStreams can either be created from live data (such as, data from + * HDFS, Kafka or Flume) or it can be generated by transformation existing DStreams using operations + * such as `map`, `window` and `reduceByKeyAndWindow`. While a Spark Streaming program is running, each + * DStream periodically generates a RDD, either from live data or by transforming the RDD generated + * by a parent DStream. + * + * This class contains the basic operations available on all DStreams, such as `map`, `filter` and + * `window`. In addition, [[org.apache.spark.streaming.dstream.PairDStreamFunctions]] contains operations available + * only on DStreams of key-value pairs, such as `groupByKeyAndWindow` and `join`. These operations + * are automatically available on any DStream of the right type (e.g., DStream[(Int, Int)] through + * implicit conversions when `spark.streaming.StreamingContext._` is imported. 
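// An end-to-end sketch of the usage pattern this scaladoc describes, assuming the
// (master, appName, batchDuration) constructor and a socket source on a placeholder port:
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.StreamingContext._

object NetworkWordCountSketch {
  def main(args: Array[String]) {
    val ssc = new StreamingContext("local[2]", "NetworkWordCountSketch", Seconds(1))
    val words = ssc.socketTextStream("localhost", 9999).flatMap(_.split(" "))
    words.map((_, 1)).reduceByKey(_ + _).print()   // one RDD of counts per 1-second batch
    ssc.start()
    ssc.awaitTermination()                         // run until stopped
  }
}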
+ * + * DStreams internally is characterized by a few basic properties: + * - A list of other DStreams that the DStream depends on + * - A time interval at which the DStream generates an RDD + * - A function that is used to generate an RDD after each time interval + */ + +abstract class DStream[T: ClassTag] ( + @transient private[streaming] var ssc: StreamingContext + ) extends Serializable with Logging { + + // ======================================================================= + // Methods that should be implemented by subclasses of DStream + // ======================================================================= + + /** Time interval after which the DStream generates a RDD */ + def slideDuration: Duration + + /** List of parent DStreams on which this DStream depends on */ + def dependencies: List[DStream[_]] + + /** Method that generates a RDD for the given time */ + def compute (validTime: Time): Option[RDD[T]] + + // ======================================================================= + // Methods and fields available on all DStreams + // ======================================================================= + + // RDDs generated, marked as private[streaming] so that testsuites can access it + @transient + private[streaming] var generatedRDDs = new HashMap[Time, RDD[T]] () + + // Time zero for the DStream + private[streaming] var zeroTime: Time = null + + // Duration for which the DStream will remember each RDD created + private[streaming] var rememberDuration: Duration = null + + // Storage level of the RDDs in the stream + private[streaming] var storageLevel: StorageLevel = StorageLevel.NONE + + // Checkpoint details + private[streaming] val mustCheckpoint = false + private[streaming] var checkpointDuration: Duration = null + private[streaming] val checkpointData = new DStreamCheckpointData(this) + + // Reference to whole DStream graph + private[streaming] var graph: DStreamGraph = null + + private[streaming] def isInitialized = (zeroTime != null) + + // Duration for which the DStream requires its parent DStream to remember each RDD created + private[streaming] def parentRememberDuration = rememberDuration + + /** Return the StreamingContext associated with this DStream */ + def context = ssc + + /** Persist the RDDs of this DStream with the given storage level */ + def persist(level: StorageLevel): DStream[T] = { + if (this.isInitialized) { + throw new UnsupportedOperationException( + "Cannot change storage level of an DStream after streaming context has started") + } + this.storageLevel = level + this + } + + /** Persist RDDs of this DStream with the default storage level (MEMORY_ONLY_SER) */ + def persist(): DStream[T] = persist(StorageLevel.MEMORY_ONLY_SER) + + /** Persist RDDs of this DStream with the default storage level (MEMORY_ONLY_SER) */ + def cache(): DStream[T] = persist() + + /** + * Enable periodic checkpointing of RDDs of this DStream + * @param interval Time interval after which generated RDD will be checkpointed + */ + def checkpoint(interval: Duration): DStream[T] = { + if (isInitialized) { + throw new UnsupportedOperationException( + "Cannot change checkpoint interval of an DStream after streaming context has started") + } + persist() + checkpointDuration = interval + this + } + + /** + * Initialize the DStream by setting the "zero" time, based on which + * the validity of future times is calculated. This method also recursively initializes + * its parent DStreams. 
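// A sketch of how a concrete subclass wires the three abstract members above, modelled on the
// mapped and filtered DStreams elsewhere in this patch; it is a hypothetical class placed in
// the same package so that the private[streaming] members (ssc, getOrCompute) are accessible:
package org.apache.spark.streaming.dstream

import scala.reflect.ClassTag
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{Duration, Time}

private[streaming] class IdentityDStream[T: ClassTag](parent: DStream[T])
  extends DStream[T](parent.ssc) {

  override def dependencies: List[DStream[_]] = List(parent)     // a single parent DStream
  override def slideDuration: Duration = parent.slideDuration    // same batching interval as the parent
  override def compute(validTime: Time): Option[RDD[T]] =
    parent.getOrCompute(validTime)                               // pass the parent's RDD through unchanged
}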
+ */ + private[streaming] def initialize(time: Time) { + if (zeroTime != null && zeroTime != time) { + throw new Exception("ZeroTime is already initialized to " + zeroTime + + ", cannot initialize it again to " + time) + } + zeroTime = time + + // Set the checkpoint interval to be slideDuration or 10 seconds, which ever is larger + if (mustCheckpoint && checkpointDuration == null) { + checkpointDuration = slideDuration * math.ceil(Seconds(10) / slideDuration).toInt + logInfo("Checkpoint interval automatically set to " + checkpointDuration) + } + + // Set the minimum value of the rememberDuration if not already set + var minRememberDuration = slideDuration + if (checkpointDuration != null && minRememberDuration <= checkpointDuration) { + minRememberDuration = checkpointDuration * 2 // times 2 just to be sure that the latest checkpoint is not forgetten + } + if (rememberDuration == null || rememberDuration < minRememberDuration) { + rememberDuration = minRememberDuration + } + + // Initialize the dependencies + dependencies.foreach(_.initialize(zeroTime)) + } + + private[streaming] def validate() { + assert(rememberDuration != null, "Remember duration is set to null") + + assert( + !mustCheckpoint || checkpointDuration != null, + "The checkpoint interval for " + this.getClass.getSimpleName + " has not been set." + + " Please use DStream.checkpoint() to set the interval." + ) + + assert( + checkpointDuration == null || context.sparkContext.checkpointDir.isDefined, + "The checkpoint directory has not been set. Please use StreamingContext.checkpoint()" + + " or SparkContext.checkpoint() to set the checkpoint directory." + ) + + assert( + checkpointDuration == null || checkpointDuration >= slideDuration, + "The checkpoint interval for " + this.getClass.getSimpleName + " has been set to " + + checkpointDuration + " which is lower than its slide time (" + slideDuration + "). " + + "Please set it to at least " + slideDuration + "." + ) + + assert( + checkpointDuration == null || checkpointDuration.isMultipleOf(slideDuration), + "The checkpoint interval for " + this.getClass.getSimpleName + " has been set to " + + checkpointDuration + " which not a multiple of its slide time (" + slideDuration + "). " + + "Please set it to a multiple " + slideDuration + "." + ) + + assert( + checkpointDuration == null || storageLevel != StorageLevel.NONE, + "" + this.getClass.getSimpleName + " has been marked for checkpointing but the storage " + + "level has not been set to enable persisting. Please use DStream.persist() to set the " + + "storage level to use memory for better checkpointing performance." + ) + + assert( + checkpointDuration == null || rememberDuration > checkpointDuration, + "The remember duration for " + this.getClass.getSimpleName + " has been set to " + + rememberDuration + " which is not more than the checkpoint interval (" + + checkpointDuration + "). Please set it to higher than " + checkpointDuration + "." + ) + + val metadataCleanerDelay = MetadataCleaner.getDelaySeconds(ssc.conf) + logInfo("metadataCleanupDelay = " + metadataCleanerDelay) + assert( + metadataCleanerDelay < 0 || rememberDuration.milliseconds < metadataCleanerDelay * 1000, + "It seems you are doing some DStream window operation or setting a checkpoint interval " + + "which requires " + this.getClass.getSimpleName + " to remember generated RDDs for more " + + "than " + rememberDuration.milliseconds / 1000 + " seconds. 
But Spark's metadata cleanup" + + "delay is set to " + metadataCleanerDelay + " seconds, which is not sufficient. Please " + + "set the Java property 'spark.cleaner.delay' to more than " + + math.ceil(rememberDuration.milliseconds / 1000.0).toInt + " seconds." + ) + + dependencies.foreach(_.validate()) + + logInfo("Slide time = " + slideDuration) + logInfo("Storage level = " + storageLevel) + logInfo("Checkpoint interval = " + checkpointDuration) + logInfo("Remember duration = " + rememberDuration) + logInfo("Initialized and validated " + this) + } + + private[streaming] def setContext(s: StreamingContext) { + if (ssc != null && ssc != s) { + throw new Exception("Context is already set in " + this + ", cannot set it again") + } + ssc = s + logInfo("Set context for " + this) + dependencies.foreach(_.setContext(ssc)) + } + + private[streaming] def setGraph(g: DStreamGraph) { + if (graph != null && graph != g) { + throw new Exception("Graph is already set in " + this + ", cannot set it again") + } + graph = g + dependencies.foreach(_.setGraph(graph)) + } + + private[streaming] def remember(duration: Duration) { + if (duration != null && duration > rememberDuration) { + rememberDuration = duration + logInfo("Duration for remembering RDDs set to " + rememberDuration + " for " + this) + } + dependencies.foreach(_.remember(parentRememberDuration)) + } + + /** Checks whether the 'time' is valid wrt slideDuration for generating RDD */ + private[streaming] def isTimeValid(time: Time): Boolean = { + if (!isInitialized) { + throw new Exception (this + " has not been initialized") + } else if (time <= zeroTime || ! (time - zeroTime).isMultipleOf(slideDuration)) { + logInfo("Time " + time + " is invalid as zeroTime is " + zeroTime + " and slideDuration is " + slideDuration + " and difference is " + (time - zeroTime)) + false + } else { + logDebug("Time " + time + " is valid") + true + } + } + + /** + * Retrieve a precomputed RDD of this DStream, or computes the RDD. This is an internal + * method that should not be called directly. + */ + private[streaming] def getOrCompute(time: Time): Option[RDD[T]] = { + // If this DStream was not initialized (i.e., zeroTime not set), then do it + // If RDD was already generated, then retrieve it from HashMap + generatedRDDs.get(time) match { + + // If an RDD was already generated and is being reused, then + // probably all RDDs in this DStream will be reused and hence should be cached + case Some(oldRDD) => Some(oldRDD) + + // if RDD was not generated, and if the time is valid + // (based on sliding time of this DStream), then generate the RDD + case None => { + if (isTimeValid(time)) { + compute(time) match { + case Some(newRDD) => + if (storageLevel != StorageLevel.NONE) { + newRDD.persist(storageLevel) + logInfo("Persisting RDD " + newRDD.id + " for time " + time + " to " + storageLevel + " at time " + time) + } + if (checkpointDuration != null && (time - zeroTime).isMultipleOf(checkpointDuration)) { + newRDD.checkpoint() + logInfo("Marking RDD " + newRDD.id + " for time " + time + " for checkpointing at time " + time) + } + generatedRDDs.put(time, newRDD) + Some(newRDD) + case None => + None + } + } else { + None + } + } + } + } + + /** + * Generate a SparkStreaming job for the given time. This is an internal method that + * should not be called directly. This default implementation creates a job + * that materializes the corresponding RDD. Subclasses of DStream may override this + * to generate their own jobs. 
+ */ + private[streaming] def generateJob(time: Time): Option[Job] = { + getOrCompute(time) match { + case Some(rdd) => { + val jobFunc = () => { + val emptyFunc = { (iterator: Iterator[T]) => {} } + context.sparkContext.runJob(rdd, emptyFunc) + } + Some(new Job(time, jobFunc)) + } + case None => None + } + } + + /** + * Clear metadata that are older than `rememberDuration` of this DStream. + * This is an internal method that should not be called directly. This default + * implementation clears the old generated RDDs. Subclasses of DStream may override + * this to clear their own metadata along with the generated RDDs. + */ + private[streaming] def clearMetadata(time: Time) { + val oldRDDs = generatedRDDs.filter(_._1 <= (time - rememberDuration)) + generatedRDDs --= oldRDDs.keys + logDebug("Cleared " + oldRDDs.size + " RDDs that were older than " + + (time - rememberDuration) + ": " + oldRDDs.keys.mkString(", ")) + dependencies.foreach(_.clearMetadata(time)) + } + + /* Adds metadata to the Stream while it is running. + * This method should be overwritten by sublcasses of InputDStream. + */ + private[streaming] def addMetadata(metadata: Any) { + if (metadata != null) { + logInfo("Dropping Metadata: " + metadata.toString) + } + } + + /** + * Refresh the list of checkpointed RDDs that will be saved along with checkpoint of + * this stream. This is an internal method that should not be called directly. This is + * a default implementation that saves only the file names of the checkpointed RDDs to + * checkpointData. Subclasses of DStream (especially those of InputDStream) may override + * this method to save custom checkpoint data. + */ + private[streaming] def updateCheckpointData(currentTime: Time) { + logInfo("Updating checkpoint data for time " + currentTime) + checkpointData.update(currentTime) + dependencies.foreach(_.updateCheckpointData(currentTime)) + logDebug("Updated checkpoint data for time " + currentTime + ": " + checkpointData) + } + + private[streaming] def clearCheckpointData(time: Time) { + logInfo("Clearing checkpoint data") + checkpointData.cleanup(time) + dependencies.foreach(_.clearCheckpointData(time)) + logInfo("Cleared checkpoint data") + } + + /** + * Restore the RDDs in generatedRDDs from the checkpointData. This is an internal method + * that should not be called directly. This is a default implementation that recreates RDDs + * from the checkpoint file names stored in checkpointData. Subclasses of DStream that + * override the updateCheckpointData() method would also need to override this method. + */ + private[streaming] def restoreCheckpointData() { + // Create RDDs from the checkpoint data + logInfo("Restoring checkpoint data") + checkpointData.restore() + dependencies.foreach(_.restoreCheckpointData()) + logInfo("Restored checkpoint data") + } + + @throws(classOf[IOException]) + private def writeObject(oos: ObjectOutputStream) { + logDebug(this.getClass().getSimpleName + ".writeObject used") + if (graph != null) { + graph.synchronized { + if (graph.checkpointInProgress) { + oos.defaultWriteObject() + } else { + val msg = "Object of " + this.getClass.getName + " is being serialized " + + " possibly as a part of closure of an RDD operation. This is because " + + " the DStream object is being referred to from within the closure. " + + " Please rewrite the RDD operation inside this DStream to avoid this. " + + " This has been enforced to avoid bloating of Spark tasks " + + " with unnecessary objects." 
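+          // For example, an RDD operation such as
+          //   dstream.foreach(rdd => rdd.map(x => (x, dstream.slideDuration)).count())
+          // captures the DStream in the map closure and hits this exception when that closure
+          // is serialized; copying the needed value into a local val beforehand avoids it.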
+ throw new java.io.NotSerializableException(msg) + } + } + } else { + throw new java.io.NotSerializableException("Graph is unexpectedly null when DStream is being serialized.") + } + } + + @throws(classOf[IOException]) + private def readObject(ois: ObjectInputStream) { + logDebug(this.getClass().getSimpleName + ".readObject used") + ois.defaultReadObject() + generatedRDDs = new HashMap[Time, RDD[T]] () + } + + // ======================================================================= + // DStream operations + // ======================================================================= + + /** Return a new DStream by applying a function to all elements of this DStream. */ + def map[U: ClassTag](mapFunc: T => U): DStream[U] = { + new MappedDStream(this, context.sparkContext.clean(mapFunc)) + } + + /** + * Return a new DStream by applying a function to all elements of this DStream, + * and then flattening the results + */ + def flatMap[U: ClassTag](flatMapFunc: T => Traversable[U]): DStream[U] = { + new FlatMappedDStream(this, context.sparkContext.clean(flatMapFunc)) + } + + /** Return a new DStream containing only the elements that satisfy a predicate. */ + def filter(filterFunc: T => Boolean): DStream[T] = new FilteredDStream(this, filterFunc) + + /** + * Return a new DStream in which each RDD is generated by applying glom() to each RDD of + * this DStream. Applying glom() to an RDD coalesces all elements within each partition into + * an array. + */ + def glom(): DStream[Array[T]] = new GlommedDStream(this) + + + /** + * Return a new DStream with an increased or decreased level of parallelism. Each RDD in the + * returned DStream has exactly numPartitions partitions. + */ + def repartition(numPartitions: Int): DStream[T] = this.transform(_.repartition(numPartitions)) + + /** + * Return a new DStream in which each RDD is generated by applying mapPartitions() to each RDDs + * of this DStream. Applying mapPartitions() to an RDD applies a function to each partition + * of the RDD. + */ + def mapPartitions[U: ClassTag]( + mapPartFunc: Iterator[T] => Iterator[U], + preservePartitioning: Boolean = false + ): DStream[U] = { + new MapPartitionedDStream(this, context.sparkContext.clean(mapPartFunc), preservePartitioning) + } + + /** + * Return a new DStream in which each RDD has a single element generated by reducing each RDD + * of this DStream. + */ + def reduce(reduceFunc: (T, T) => T): DStream[T] = + this.map(x => (null, x)).reduceByKey(reduceFunc, 1).map(_._2) + + /** + * Return a new DStream in which each RDD has a single element generated by counting each RDD + * of this DStream. + */ + def count(): DStream[Long] = { + this.map(_ => (null, 1L)) + .transform(_.union(context.sparkContext.makeRDD(Seq((null, 0L)), 1))) + .reduceByKey(_ + _) + .map(_._2) + } + + /** + * Return a new DStream in which each RDD contains the counts of each distinct value in + * each RDD of this DStream. Hash partitioning is used to generate + * the RDDs with `numPartitions` partitions (Spark's default number of partitions if + * `numPartitions` not specified). + */ + def countByValue(numPartitions: Int = ssc.sc.defaultParallelism): DStream[(T, Long)] = + this.map(x => (x, 1L)).reduceByKey((x: Long, y: Long) => x + y, numPartitions) + + /** + * Apply a function to each RDD in this DStream. This is an output operator, so + * 'this' DStream will be registered as an output stream and therefore materialized. 
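+ *
+ * A minimal sketch, where `stream` stands for any DStream:
+ * {{{
+ *   stream.foreach(rdd => println("Records in batch: " + rdd.count()))
+ * }}}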
+ */ + def foreach(foreachFunc: RDD[T] => Unit) { + this.foreach((r: RDD[T], t: Time) => foreachFunc(r)) + } + + /** + * Apply a function to each RDD in this DStream. This is an output operator, so + * 'this' DStream will be registered as an output stream and therefore materialized. + */ + def foreach(foreachFunc: (RDD[T], Time) => Unit) { + ssc.registerOutputStream(new ForEachDStream(this, context.sparkContext.clean(foreachFunc))) + } + + /** + * Return a new DStream in which each RDD is generated by applying a function + * on each RDD of 'this' DStream. + */ + def transform[U: ClassTag](transformFunc: RDD[T] => RDD[U]): DStream[U] = { + transform((r: RDD[T], t: Time) => context.sparkContext.clean(transformFunc(r))) + } + + /** + * Return a new DStream in which each RDD is generated by applying a function + * on each RDD of 'this' DStream. + */ + def transform[U: ClassTag](transformFunc: (RDD[T], Time) => RDD[U]): DStream[U] = { + //new TransformedDStream(this, context.sparkContext.clean(transformFunc)) + val cleanedF = context.sparkContext.clean(transformFunc) + val realTransformFunc = (rdds: Seq[RDD[_]], time: Time) => { + assert(rdds.length == 1) + cleanedF(rdds.head.asInstanceOf[RDD[T]], time) + } + new TransformedDStream[U](Seq(this), realTransformFunc) + } + + /** + * Return a new DStream in which each RDD is generated by applying a function + * on each RDD of 'this' DStream and 'other' DStream. + */ + def transformWith[U: ClassTag, V: ClassTag]( + other: DStream[U], transformFunc: (RDD[T], RDD[U]) => RDD[V] + ): DStream[V] = { + val cleanedF = ssc.sparkContext.clean(transformFunc) + transformWith(other, (rdd1: RDD[T], rdd2: RDD[U], time: Time) => cleanedF(rdd1, rdd2)) + } + + /** + * Return a new DStream in which each RDD is generated by applying a function + * on each RDD of 'this' DStream and 'other' DStream. + */ + def transformWith[U: ClassTag, V: ClassTag]( + other: DStream[U], transformFunc: (RDD[T], RDD[U], Time) => RDD[V] + ): DStream[V] = { + val cleanedF = ssc.sparkContext.clean(transformFunc) + val realTransformFunc = (rdds: Seq[RDD[_]], time: Time) => { + assert(rdds.length == 2) + val rdd1 = rdds(0).asInstanceOf[RDD[T]] + val rdd2 = rdds(1).asInstanceOf[RDD[U]] + cleanedF(rdd1, rdd2, time) + } + new TransformedDStream[V](Seq(this, other), realTransformFunc) + } + + /** + * Print the first ten elements of each RDD generated in this DStream. This is an output + * operator, so this DStream will be registered as an output stream and there materialized. + */ + def print() { + def foreachFunc = (rdd: RDD[T], time: Time) => { + val first11 = rdd.take(11) + println ("-------------------------------------------") + println ("Time: " + time) + println ("-------------------------------------------") + first11.take(10).foreach(println) + if (first11.size > 10) println("...") + println() + } + val newStream = new ForEachDStream(this, context.sparkContext.clean(foreachFunc)) + ssc.registerOutputStream(newStream) + } + + /** + * Return a new DStream in which each RDD contains all the elements in seen in a + * sliding window of time over this DStream. The new DStream generates RDDs with + * the same interval as this DStream. + * @param windowDuration width of the window; must be a multiple of this DStream's interval. + */ + def window(windowDuration: Duration): DStream[T] = window(windowDuration, this.slideDuration) + + /** + * Return a new DStream in which each RDD contains all the elements in seen in a + * sliding window of time over this DStream. 
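+ * For example, assuming `stream` has a batch interval of 10 seconds:
+ * {{{
+ *   // every 10 seconds, an RDD covering the last 30 seconds of data
+ *   val last30s = stream.window(Seconds(30), Seconds(10))
+ * }}}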
+ * @param windowDuration width of the window; must be a multiple of this DStream's + * batching interval + * @param slideDuration sliding interval of the window (i.e., the interval after which + * the new DStream will generate RDDs); must be a multiple of this + * DStream's batching interval + */ + def window(windowDuration: Duration, slideDuration: Duration): DStream[T] = { + new WindowedDStream(this, windowDuration, slideDuration) + } + + /** + * Return a new DStream in which each RDD has a single element generated by reducing all + * elements in a sliding window over this DStream. + * @param reduceFunc associative reduce function + * @param windowDuration width of the window; must be a multiple of this DStream's + * batching interval + * @param slideDuration sliding interval of the window (i.e., the interval after which + * the new DStream will generate RDDs); must be a multiple of this + * DStream's batching interval + */ + def reduceByWindow( + reduceFunc: (T, T) => T, + windowDuration: Duration, + slideDuration: Duration + ): DStream[T] = { + this.reduce(reduceFunc).window(windowDuration, slideDuration).reduce(reduceFunc) + } + + /** + * Return a new DStream in which each RDD has a single element generated by reducing all + * elements in a sliding window over this DStream. However, the reduction is done incrementally + * using the old window's reduced value : + * 1. reduce the new values that entered the window (e.g., adding new counts) + * 2. "inverse reduce" the old values that left the window (e.g., subtracting old counts) + * This is more efficient than reduceByWindow without "inverse reduce" function. + * However, it is applicable to only "invertible reduce functions". + * @param reduceFunc associative reduce function + * @param invReduceFunc inverse reduce function + * @param windowDuration width of the window; must be a multiple of this DStream's + * batching interval + * @param slideDuration sliding interval of the window (i.e., the interval after which + * the new DStream will generate RDDs); must be a multiple of this + * DStream's batching interval + */ + def reduceByWindow( + reduceFunc: (T, T) => T, + invReduceFunc: (T, T) => T, + windowDuration: Duration, + slideDuration: Duration + ): DStream[T] = { + this.map(x => (1, x)) + .reduceByKeyAndWindow(reduceFunc, invReduceFunc, windowDuration, slideDuration, 1) + .map(_._2) + } + + /** + * Return a new DStream in which each RDD has a single element generated by counting the number + * of elements in a sliding window over this DStream. Hash partitioning is used to generate the RDDs with + * Spark's default number of partitions. + * @param windowDuration width of the window; must be a multiple of this DStream's + * batching interval + * @param slideDuration sliding interval of the window (i.e., the interval after which + * the new DStream will generate RDDs); must be a multiple of this + * DStream's batching interval + */ + def countByWindow(windowDuration: Duration, slideDuration: Duration): DStream[Long] = { + this.map(_ => 1L).reduceByWindow(_ + _, _ - _, windowDuration, slideDuration) + } + + /** + * Return a new DStream in which each RDD contains the count of distinct elements in + * RDDs in a sliding window over this DStream. Hash partitioning is used to generate + * the RDDs with `numPartitions` partitions (Spark's default number of partitions if + * `numPartitions` not specified). 
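+ * A sketch, assuming `words` is a DStream[String] with a 10 second batch interval and a
+ * checkpoint directory has been set (the incremental window reduce used underneath needs one):
+ * {{{
+ *   val counts = words.countByValueAndWindow(Seconds(60), Seconds(10))
+ * }}}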
+ * @param windowDuration width of the window; must be a multiple of this DStream's + * batching interval + * @param slideDuration sliding interval of the window (i.e., the interval after which + * the new DStream will generate RDDs); must be a multiple of this + * DStream's batching interval + * @param numPartitions number of partitions of each RDD in the new DStream. + */ + def countByValueAndWindow( + windowDuration: Duration, + slideDuration: Duration, + numPartitions: Int = ssc.sc.defaultParallelism + ): DStream[(T, Long)] = { + + this.map(x => (x, 1L)).reduceByKeyAndWindow( + (x: Long, y: Long) => x + y, + (x: Long, y: Long) => x - y, + windowDuration, + slideDuration, + numPartitions, + (x: (T, Long)) => x._2 != 0L + ) + } + + /** + * Return a new DStream by unifying data of another DStream with this DStream. + * @param that Another DStream having the same slideDuration as this DStream. + */ + def union(that: DStream[T]): DStream[T] = new UnionDStream[T](Array(this, that)) + + /** + * Return all the RDDs defined by the Interval object (both end times included) + */ + def slice(interval: Interval): Seq[RDD[T]] = { + slice(interval.beginTime, interval.endTime) + } + + /** + * Return all the RDDs between 'fromTime' to 'toTime' (both included) + */ + def slice(fromTime: Time, toTime: Time): Seq[RDD[T]] = { + if (!(fromTime - zeroTime).isMultipleOf(slideDuration)) { + logWarning("fromTime (" + fromTime + ") is not a multiple of slideDuration (" + slideDuration + ")") + } + if (!(toTime - zeroTime).isMultipleOf(slideDuration)) { + logWarning("toTime (" + fromTime + ") is not a multiple of slideDuration (" + slideDuration + ")") + } + val alignedToTime = toTime.floor(slideDuration) + val alignedFromTime = fromTime.floor(slideDuration) + + logInfo("Slicing from " + fromTime + " to " + toTime + + " (aligned to " + alignedFromTime + " and " + alignedToTime + ")") + + alignedFromTime.to(alignedToTime, slideDuration).flatMap(time => { + if (time >= zeroTime) getOrCompute(time) else None + }) + } + + /** + * Save each RDD in this DStream as a Sequence file of serialized objects. + * The file name at each batch interval is generated based on `prefix` and + * `suffix`: "prefix-TIME_IN_MS.suffix". + */ + def saveAsObjectFiles(prefix: String, suffix: String = "") { + val saveFunc = (rdd: RDD[T], time: Time) => { + val file = rddToFileName(prefix, suffix, time) + rdd.saveAsObjectFile(file) + } + this.foreach(saveFunc) + } + + /** + * Save each RDD in this DStream as at text file, using string representation + * of elements. The file name at each batch interval is generated based on + * `prefix` and `suffix`: "prefix-TIME_IN_MS.suffix". + */ + def saveAsTextFiles(prefix: String, suffix: String = "") { + val saveFunc = (rdd: RDD[T], time: Time) => { + val file = rddToFileName(prefix, suffix, time) + rdd.saveAsTextFile(file) + } + this.foreach(saveFunc) + } + + def register() { + ssc.registerOutputStream(this) + } +} diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStreamCheckpointData.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStreamCheckpointData.scala new file mode 100644 index 0000000000..2da4127f47 --- /dev/null +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStreamCheckpointData.scala @@ -0,0 +1,126 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.streaming.dstream + +import scala.collection.mutable.HashMap +import scala.reflect.ClassTag +import java.io.{ObjectInputStream, IOException} +import org.apache.hadoop.fs.Path +import org.apache.hadoop.fs.FileSystem +import org.apache.spark.Logging +import org.apache.spark.streaming.Time + +private[streaming] +class DStreamCheckpointData[T: ClassTag] (dstream: DStream[T]) + extends Serializable with Logging { + protected val data = new HashMap[Time, AnyRef]() + + // Mapping of the batch time to the checkpointed RDD file of that time + @transient private var timeToCheckpointFile = new HashMap[Time, String] + // Mapping of the batch time to the time of the oldest checkpointed RDD + // in that batch's checkpoint data + @transient private var timeToOldestCheckpointFileTime = new HashMap[Time, Time] + + @transient private var fileSystem : FileSystem = null + protected[streaming] def currentCheckpointFiles = data.asInstanceOf[HashMap[Time, String]] + + /** + * Updates the checkpoint data of the DStream. This gets called every time + * the graph checkpoint is initiated. Default implementation records the + * checkpoint files to which the generate RDDs of the DStream has been saved. + */ + def update(time: Time) { + + // Get the checkpointed RDDs from the generated RDDs + val checkpointFiles = dstream.generatedRDDs.filter(_._2.getCheckpointFile.isDefined) + .map(x => (x._1, x._2.getCheckpointFile.get)) + logDebug("Current checkpoint files:\n" + checkpointFiles.toSeq.mkString("\n")) + + // Add the checkpoint files to the data to be serialized + if (!checkpointFiles.isEmpty) { + currentCheckpointFiles.clear() + currentCheckpointFiles ++= checkpointFiles + // Add the current checkpoint files to the map of all checkpoint files + // This will be used to delete old checkpoint files + timeToCheckpointFile ++= currentCheckpointFiles + // Remember the time of the oldest checkpoint RDD in current state + timeToOldestCheckpointFileTime(time) = currentCheckpointFiles.keys.min(Time.ordering) + } + } + + /** + * Cleanup old checkpoint data. This gets called after a checkpoint of `time` has been + * written to the checkpoint directory. + */ + def cleanup(time: Time) { + // Get the time of the oldest checkpointed RDD that was written as part of the + // checkpoint of `time` + timeToOldestCheckpointFileTime.remove(time) match { + case Some(lastCheckpointFileTime) => + // Find all the checkpointed RDDs (i.e. 
files) that are older than `lastCheckpointFileTime` + // This is because checkpointed RDDs older than this are not going to be needed + // even after master fails, as the checkpoint data of `time` does not refer to those files + val filesToDelete = timeToCheckpointFile.filter(_._1 < lastCheckpointFileTime) + logDebug("Files to delete:\n" + filesToDelete.mkString(",")) + filesToDelete.foreach { + case (time, file) => + try { + val path = new Path(file) + if (fileSystem == null) { + fileSystem = path.getFileSystem(dstream.ssc.sparkContext.hadoopConfiguration) + } + fileSystem.delete(path, true) + timeToCheckpointFile -= time + logInfo("Deleted checkpoint file '" + file + "' for time " + time) + } catch { + case e: Exception => + logWarning("Error deleting old checkpoint file '" + file + "' for time " + time, e) + fileSystem = null + } + } + case None => + logInfo("Nothing to delete") + } + } + + /** + * Restore the checkpoint data. This gets called once when the DStream graph + * (along with its DStreams) are being restored from a graph checkpoint file. + * Default implementation restores the RDDs from their checkpoint files. + */ + def restore() { + // Create RDDs from the checkpoint data + currentCheckpointFiles.foreach { + case(time, file) => { + logInfo("Restoring checkpointed RDD for time " + time + " from file '" + file + "'") + dstream.generatedRDDs += ((time, dstream.context.sparkContext.checkpointFile[T](file))) + } + } + } + + override def toString() = { + "[\n" + currentCheckpointFiles.size + " checkpoint files \n" + currentCheckpointFiles.mkString("\n") + "\n]" + } + + @throws(classOf[IOException]) + private def readObject(ois: ObjectInputStream) { + ois.defaultReadObject() + timeToOldestCheckpointFileTime = new HashMap[Time, Time] + timeToCheckpointFile = new HashMap[Time, String] + } +} diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala index 1f0f31c4b1..012fbb0711 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala @@ -25,7 +25,7 @@ import org.apache.hadoop.conf.Configuration import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat} import org.apache.spark.rdd.RDD import org.apache.spark.rdd.UnionRDD -import org.apache.spark.streaming.{DStreamCheckpointData, StreamingContext, Time} +import org.apache.spark.streaming.{StreamingContext, Time} import org.apache.spark.util.TimeStampedHashMap diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FilteredDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FilteredDStream.scala index db2e0a4cee..c81534ae58 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FilteredDStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FilteredDStream.scala @@ -17,7 +17,7 @@ package org.apache.spark.streaming.dstream -import org.apache.spark.streaming.{Duration, DStream, Time} +import org.apache.spark.streaming.{Duration, Time} import org.apache.spark.rdd.RDD import scala.reflect.ClassTag diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FlatMapValuedDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FlatMapValuedDStream.scala index 244dc3ee4f..6586234554 100644 --- 
a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FlatMapValuedDStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FlatMapValuedDStream.scala @@ -17,7 +17,7 @@ package org.apache.spark.streaming.dstream -import org.apache.spark.streaming.{Duration, DStream, Time} +import org.apache.spark.streaming.{Duration, Time} import org.apache.spark.rdd.RDD import org.apache.spark.SparkContext._ import scala.reflect.ClassTag diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FlatMappedDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FlatMappedDStream.scala index 336c4b7a92..c7bb2833ea 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FlatMappedDStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FlatMappedDStream.scala @@ -17,7 +17,7 @@ package org.apache.spark.streaming.dstream -import org.apache.spark.streaming.{Duration, DStream, Time} +import org.apache.spark.streaming.{Duration, Time} import org.apache.spark.rdd.RDD import scala.reflect.ClassTag diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/ForEachDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/ForEachDStream.scala index 364abcde68..905bc723f6 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/ForEachDStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/ForEachDStream.scala @@ -18,7 +18,7 @@ package org.apache.spark.streaming.dstream import org.apache.spark.rdd.RDD -import org.apache.spark.streaming.{Duration, DStream, Time} +import org.apache.spark.streaming.{Duration, Time} import org.apache.spark.streaming.scheduler.Job import scala.reflect.ClassTag diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/GlommedDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/GlommedDStream.scala index 23136f44fa..a9bb51f054 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/GlommedDStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/GlommedDStream.scala @@ -17,7 +17,7 @@ package org.apache.spark.streaming.dstream -import org.apache.spark.streaming.{Duration, DStream, Time} +import org.apache.spark.streaming.{Duration, Time} import org.apache.spark.rdd.RDD import scala.reflect.ClassTag diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/InputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/InputDStream.scala index 8f84232cab..a1075ad304 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/InputDStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/InputDStream.scala @@ -17,7 +17,7 @@ package org.apache.spark.streaming.dstream -import org.apache.spark.streaming.{Time, Duration, StreamingContext, DStream} +import org.apache.spark.streaming.{Time, Duration, StreamingContext} import scala.reflect.ClassTag diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/MapPartitionedDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/MapPartitionedDStream.scala index 8a04060e5b..3d8ee29df1 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/MapPartitionedDStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/MapPartitionedDStream.scala @@ -17,7 +17,7 @@ package org.apache.spark.streaming.dstream -import org.apache.spark.streaming.{Duration, DStream, Time} +import 
org.apache.spark.streaming.{Duration, Time} import org.apache.spark.rdd.RDD import scala.reflect.ClassTag diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/MapValuedDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/MapValuedDStream.scala index 0ce364fd46..7aea1f945d 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/MapValuedDStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/MapValuedDStream.scala @@ -17,7 +17,7 @@ package org.apache.spark.streaming.dstream -import org.apache.spark.streaming.{Duration, DStream, Time} +import org.apache.spark.streaming.{Duration, Time} import org.apache.spark.rdd.RDD import org.apache.spark.SparkContext._ import scala.reflect.ClassTag diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/MappedDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/MappedDStream.scala index c0b7491d09..02704a8d1c 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/MappedDStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/MappedDStream.scala @@ -17,7 +17,7 @@ package org.apache.spark.streaming.dstream -import org.apache.spark.streaming.{Duration, DStream, Time} +import org.apache.spark.streaming.{Duration, Time} import org.apache.spark.rdd.RDD import scala.reflect.ClassTag diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala new file mode 100644 index 0000000000..f71dd17b2f --- /dev/null +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala @@ -0,0 +1,622 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.streaming.dstream + +import org.apache.spark.streaming.StreamingContext._ +import org.apache.spark.streaming.dstream._ + +import org.apache.spark.{Partitioner, HashPartitioner} +import org.apache.spark.SparkContext._ +import org.apache.spark.rdd.{ClassTags, RDD, PairRDDFunctions} +import org.apache.spark.storage.StorageLevel + +import scala.collection.mutable.ArrayBuffer +import scala.reflect.{ClassTag, classTag} + +import org.apache.hadoop.mapred.{JobConf, OutputFormat} +import org.apache.hadoop.mapreduce.{OutputFormat => NewOutputFormat} +import org.apache.hadoop.mapred.OutputFormat +import org.apache.hadoop.security.UserGroupInformation +import org.apache.hadoop.conf.Configuration +import org.apache.spark.streaming.{Time, Duration} + +class PairDStreamFunctions[K: ClassTag, V: ClassTag](self: DStream[(K,V)]) +extends Serializable { + + private[streaming] def ssc = self.ssc + + private[streaming] def defaultPartitioner(numPartitions: Int = self.ssc.sc.defaultParallelism) = { + new HashPartitioner(numPartitions) + } + + /** + * Return a new DStream by applying `groupByKey` to each RDD. Hash partitioning is used to + * generate the RDDs with Spark's default number of partitions. + */ + def groupByKey(): DStream[(K, Seq[V])] = { + groupByKey(defaultPartitioner()) + } + + /** + * Return a new DStream by applying `groupByKey` to each RDD. Hash partitioning is used to + * generate the RDDs with `numPartitions` partitions. + */ + def groupByKey(numPartitions: Int): DStream[(K, Seq[V])] = { + groupByKey(defaultPartitioner(numPartitions)) + } + + /** + * Return a new DStream by applying `groupByKey` on each RDD. The supplied [[org.apache.spark.Partitioner]] + * is used to control the partitioning of each RDD. + */ + def groupByKey(partitioner: Partitioner): DStream[(K, Seq[V])] = { + val createCombiner = (v: V) => ArrayBuffer[V](v) + val mergeValue = (c: ArrayBuffer[V], v: V) => (c += v) + val mergeCombiner = (c1: ArrayBuffer[V], c2: ArrayBuffer[V]) => (c1 ++ c2) + combineByKey(createCombiner, mergeValue, mergeCombiner, partitioner) + .asInstanceOf[DStream[(K, Seq[V])]] + } + + /** + * Return a new DStream by applying `reduceByKey` to each RDD. The values for each key are + * merged using the associative reduce function. Hash partitioning is used to generate the RDDs + * with Spark's default number of partitions. + */ + def reduceByKey(reduceFunc: (V, V) => V): DStream[(K, V)] = { + reduceByKey(reduceFunc, defaultPartitioner()) + } + + /** + * Return a new DStream by applying `reduceByKey` to each RDD. The values for each key are + * merged using the supplied reduce function. Hash partitioning is used to generate the RDDs + * with `numPartitions` partitions. + */ + def reduceByKey(reduceFunc: (V, V) => V, numPartitions: Int): DStream[(K, V)] = { + reduceByKey(reduceFunc, defaultPartitioner(numPartitions)) + } + + /** + * Return a new DStream by applying `reduceByKey` to each RDD. The values for each key are + * merged using the supplied reduce function. [[org.apache.spark.Partitioner]] is used to control the + * partitioning of each RDD. + */ + def reduceByKey(reduceFunc: (V, V) => V, partitioner: Partitioner): DStream[(K, V)] = { + val cleanedReduceFunc = ssc.sc.clean(reduceFunc) + combineByKey((v: V) => v, cleanedReduceFunc, cleanedReduceFunc, partitioner) + } + + /** + * Combine elements of each key in DStream's RDDs using custom functions. This is similar to the + * combineByKey for RDDs. 
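+ * A sketch that keeps a per-key running (sum, count), e.g. to derive averages; `prices` is an
+ * assumed DStream[(String, Double)]:
+ * {{{
+ *   val sumAndCount = prices.combineByKey[(Double, Long)](
+ *     v => (v, 1L),
+ *     (acc: (Double, Long), v: Double) => (acc._1 + v, acc._2 + 1L),
+ *     (a: (Double, Long), b: (Double, Long)) => (a._1 + b._1, a._2 + b._2),
+ *     new HashPartitioner(2))
+ * }}}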
Please refer to combineByKey in + * [[org.apache.spark.rdd.PairRDDFunctions]] for more information. + */ + def combineByKey[C: ClassTag]( + createCombiner: V => C, + mergeValue: (C, V) => C, + mergeCombiner: (C, C) => C, + partitioner: Partitioner, + mapSideCombine: Boolean = true): DStream[(K, C)] = { + new ShuffledDStream[K, V, C](self, createCombiner, mergeValue, mergeCombiner, partitioner, mapSideCombine) + } + + /** + * Return a new DStream by applying `groupByKey` over a sliding window. This is similar to + * `DStream.groupByKey()` but applies it over a sliding window. The new DStream generates RDDs + * with the same interval as this DStream. Hash partitioning is used to generate the RDDs with + * Spark's default number of partitions. + * @param windowDuration width of the window; must be a multiple of this DStream's + * batching interval + */ + def groupByKeyAndWindow(windowDuration: Duration): DStream[(K, Seq[V])] = { + groupByKeyAndWindow(windowDuration, self.slideDuration, defaultPartitioner()) + } + + /** + * Return a new DStream by applying `groupByKey` over a sliding window. Similar to + * `DStream.groupByKey()`, but applies it over a sliding window. Hash partitioning is used to + * generate the RDDs with Spark's default number of partitions. + * @param windowDuration width of the window; must be a multiple of this DStream's + * batching interval + * @param slideDuration sliding interval of the window (i.e., the interval after which + * the new DStream will generate RDDs); must be a multiple of this + * DStream's batching interval + */ + def groupByKeyAndWindow(windowDuration: Duration, slideDuration: Duration): DStream[(K, Seq[V])] = { + groupByKeyAndWindow(windowDuration, slideDuration, defaultPartitioner()) + } + + /** + * Return a new DStream by applying `groupByKey` over a sliding window on `this` DStream. + * Similar to `DStream.groupByKey()`, but applies it over a sliding window. + * Hash partitioning is used to generate the RDDs with `numPartitions` partitions. + * @param windowDuration width of the window; must be a multiple of this DStream's + * batching interval + * @param slideDuration sliding interval of the window (i.e., the interval after which + * the new DStream will generate RDDs); must be a multiple of this + * DStream's batching interval + * @param numPartitions number of partitions of each RDD in the new DStream; if not specified + * then Spark's default number of partitions will be used + */ + def groupByKeyAndWindow( + windowDuration: Duration, + slideDuration: Duration, + numPartitions: Int + ): DStream[(K, Seq[V])] = { + groupByKeyAndWindow(windowDuration, slideDuration, defaultPartitioner(numPartitions)) + } + + /** + * Create a new DStream by applying `groupByKey` over a sliding window on `this` DStream. + * Similar to `DStream.groupByKey()`, but applies it over a sliding window. + * @param windowDuration width of the window; must be a multiple of this DStream's + * batching interval + * @param slideDuration sliding interval of the window (i.e., the interval after which + * the new DStream will generate RDDs); must be a multiple of this + * DStream's batching interval + * @param partitioner partitioner for controlling the partitioning of each RDD in the new DStream. 
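+ *
+ * A sketch, assuming `pairs` is a DStream[(String, Int)] with a 10 second batch interval:
+ * {{{
+ *   val grouped = pairs.groupByKeyAndWindow(Minutes(1), Seconds(10), new HashPartitioner(4))
+ * }}}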
+ */ + def groupByKeyAndWindow( + windowDuration: Duration, + slideDuration: Duration, + partitioner: Partitioner + ): DStream[(K, Seq[V])] = { + val createCombiner = (v: Seq[V]) => new ArrayBuffer[V] ++= v + val mergeValue = (buf: ArrayBuffer[V], v: Seq[V]) => buf ++= v + val mergeCombiner = (buf1: ArrayBuffer[V], buf2: ArrayBuffer[V]) => buf1 ++= buf2 + self.groupByKey(partitioner) + .window(windowDuration, slideDuration) + .combineByKey[ArrayBuffer[V]](createCombiner, mergeValue, mergeCombiner, partitioner) + .asInstanceOf[DStream[(K, Seq[V])]] + } + + /** + * Return a new DStream by applying `reduceByKey` over a sliding window on `this` DStream. + * Similar to `DStream.reduceByKey()`, but applies it over a sliding window. The new DStream + * generates RDDs with the same interval as this DStream. Hash partitioning is used to generate + * the RDDs with Spark's default number of partitions. + * @param reduceFunc associative reduce function + * @param windowDuration width of the window; must be a multiple of this DStream's + * batching interval + */ + def reduceByKeyAndWindow( + reduceFunc: (V, V) => V, + windowDuration: Duration + ): DStream[(K, V)] = { + reduceByKeyAndWindow(reduceFunc, windowDuration, self.slideDuration, defaultPartitioner()) + } + + /** + * Return a new DStream by applying `reduceByKey` over a sliding window. This is similar to + * `DStream.reduceByKey()` but applies it over a sliding window. Hash partitioning is used to + * generate the RDDs with Spark's default number of partitions. + * @param reduceFunc associative reduce function + * @param windowDuration width of the window; must be a multiple of this DStream's + * batching interval + * @param slideDuration sliding interval of the window (i.e., the interval after which + * the new DStream will generate RDDs); must be a multiple of this + * DStream's batching interval + */ + def reduceByKeyAndWindow( + reduceFunc: (V, V) => V, + windowDuration: Duration, + slideDuration: Duration + ): DStream[(K, V)] = { + reduceByKeyAndWindow(reduceFunc, windowDuration, slideDuration, defaultPartitioner()) + } + + /** + * Return a new DStream by applying `reduceByKey` over a sliding window. This is similar to + * `DStream.reduceByKey()` but applies it over a sliding window. Hash partitioning is used to + * generate the RDDs with `numPartitions` partitions. + * @param reduceFunc associative reduce function + * @param windowDuration width of the window; must be a multiple of this DStream's + * batching interval + * @param slideDuration sliding interval of the window (i.e., the interval after which + * the new DStream will generate RDDs); must be a multiple of this + * DStream's batching interval + * @param numPartitions number of partitions of each RDD in the new DStream. + */ + def reduceByKeyAndWindow( + reduceFunc: (V, V) => V, + windowDuration: Duration, + slideDuration: Duration, + numPartitions: Int + ): DStream[(K, V)] = { + reduceByKeyAndWindow(reduceFunc, windowDuration, slideDuration, defaultPartitioner(numPartitions)) + } + + /** + * Return a new DStream by applying `reduceByKey` over a sliding window. Similar to + * `DStream.reduceByKey()`, but applies it over a sliding window. 
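+ *
+ * For example, assuming `wordCounts` is a DStream[(String, Int)] with a 10 second batch
+ * interval (the second, incremental form is described further below and also needs a
+ * checkpoint directory to be set):
+ * {{{
+ *   val windowed = wordCounts.reduceByKeyAndWindow(
+ *     (a: Int, b: Int) => a + b, Seconds(60), Seconds(10), new HashPartitioner(4))
+ *
+ *   val incremental = wordCounts.reduceByKeyAndWindow(
+ *     (a: Int, b: Int) => a + b, (a: Int, b: Int) => a - b, Seconds(60), Seconds(10))
+ * }}}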
+ * @param reduceFunc associative reduce function + * @param windowDuration width of the window; must be a multiple of this DStream's + * batching interval + * @param slideDuration sliding interval of the window (i.e., the interval after which + * the new DStream will generate RDDs); must be a multiple of this + * DStream's batching interval + * @param partitioner partitioner for controlling the partitioning of each RDD + * in the new DStream. + */ + def reduceByKeyAndWindow( + reduceFunc: (V, V) => V, + windowDuration: Duration, + slideDuration: Duration, + partitioner: Partitioner + ): DStream[(K, V)] = { + val cleanedReduceFunc = ssc.sc.clean(reduceFunc) + self.reduceByKey(cleanedReduceFunc, partitioner) + .window(windowDuration, slideDuration) + .reduceByKey(cleanedReduceFunc, partitioner) + } + + /** + * Return a new DStream by applying incremental `reduceByKey` over a sliding window. + * The reduced value of over a new window is calculated using the old window's reduced value : + * 1. reduce the new values that entered the window (e.g., adding new counts) + * + * 2. "inverse reduce" the old values that left the window (e.g., subtracting old counts) + * + * This is more efficient than reduceByKeyAndWindow without "inverse reduce" function. + * However, it is applicable to only "invertible reduce functions". + * Hash partitioning is used to generate the RDDs with Spark's default number of partitions. + * @param reduceFunc associative reduce function + * @param invReduceFunc inverse reduce function + * @param windowDuration width of the window; must be a multiple of this DStream's + * batching interval + * @param slideDuration sliding interval of the window (i.e., the interval after which + * the new DStream will generate RDDs); must be a multiple of this + * DStream's batching interval + * @param filterFunc Optional function to filter expired key-value pairs; + * only pairs that satisfy the function are retained + */ + def reduceByKeyAndWindow( + reduceFunc: (V, V) => V, + invReduceFunc: (V, V) => V, + windowDuration: Duration, + slideDuration: Duration = self.slideDuration, + numPartitions: Int = ssc.sc.defaultParallelism, + filterFunc: ((K, V)) => Boolean = null + ): DStream[(K, V)] = { + + reduceByKeyAndWindow( + reduceFunc, invReduceFunc, windowDuration, + slideDuration, defaultPartitioner(numPartitions), filterFunc + ) + } + + /** + * Return a new DStream by applying incremental `reduceByKey` over a sliding window. + * The reduced value of over a new window is calculated using the old window's reduced value : + * 1. reduce the new values that entered the window (e.g., adding new counts) + * 2. "inverse reduce" the old values that left the window (e.g., subtracting old counts) + * This is more efficient than reduceByKeyAndWindow without "inverse reduce" function. + * However, it is applicable to only "invertible reduce functions". + * @param reduceFunc associative reduce function + * @param invReduceFunc inverse reduce function + * @param windowDuration width of the window; must be a multiple of this DStream's + * batching interval + * @param slideDuration sliding interval of the window (i.e., the interval after which + * the new DStream will generate RDDs); must be a multiple of this + * DStream's batching interval + * @param partitioner partitioner for controlling the partitioning of each RDD in the new DStream. 
+ * @param filterFunc Optional function to filter expired key-value pairs; + * only pairs that satisfy the function are retained + */ + def reduceByKeyAndWindow( + reduceFunc: (V, V) => V, + invReduceFunc: (V, V) => V, + windowDuration: Duration, + slideDuration: Duration, + partitioner: Partitioner, + filterFunc: ((K, V)) => Boolean + ): DStream[(K, V)] = { + + val cleanedReduceFunc = ssc.sc.clean(reduceFunc) + val cleanedInvReduceFunc = ssc.sc.clean(invReduceFunc) + val cleanedFilterFunc = if (filterFunc != null) Some(ssc.sc.clean(filterFunc)) else None + new ReducedWindowedDStream[K, V]( + self, cleanedReduceFunc, cleanedInvReduceFunc, cleanedFilterFunc, + windowDuration, slideDuration, partitioner + ) + } + + /** + * Return a new "state" DStream where the state for each key is updated by applying + * the given function on the previous state of the key and the new values of each key. + * Hash partitioning is used to generate the RDDs with Spark's default number of partitions. + * @param updateFunc State update function. If `this` function returns None, then + * corresponding state key-value pair will be eliminated. + * @tparam S State type + */ + def updateStateByKey[S: ClassTag]( + updateFunc: (Seq[V], Option[S]) => Option[S] + ): DStream[(K, S)] = { + updateStateByKey(updateFunc, defaultPartitioner()) + } + + /** + * Return a new "state" DStream where the state for each key is updated by applying + * the given function on the previous state of the key and the new values of each key. + * Hash partitioning is used to generate the RDDs with `numPartitions` partitions. + * @param updateFunc State update function. If `this` function returns None, then + * corresponding state key-value pair will be eliminated. + * @param numPartitions Number of partitions of each RDD in the new DStream. + * @tparam S State type + */ + def updateStateByKey[S: ClassTag]( + updateFunc: (Seq[V], Option[S]) => Option[S], + numPartitions: Int + ): DStream[(K, S)] = { + updateStateByKey(updateFunc, defaultPartitioner(numPartitions)) + } + + /** + * Return a new "state" DStream where the state for each key is updated by applying + * the given function on the previous state of the key and the new values of the key. + * [[org.apache.spark.Partitioner]] is used to control the partitioning of each RDD. + * @param updateFunc State update function. If `this` function returns None, then + * corresponding state key-value pair will be eliminated. + * @param partitioner Partitioner for controlling the partitioning of each RDD in the new DStream. + * @tparam S State type + */ + def updateStateByKey[S: ClassTag]( + updateFunc: (Seq[V], Option[S]) => Option[S], + partitioner: Partitioner + ): DStream[(K, S)] = { + val newUpdateFunc = (iterator: Iterator[(K, Seq[V], Option[S])]) => { + iterator.flatMap(t => updateFunc(t._2, t._3).map(s => (t._1, s))) + } + updateStateByKey(newUpdateFunc, partitioner, true) + } + + /** + * Return a new "state" DStream where the state for each key is updated by applying + * the given function on the previous state of the key and the new values of each key. + * [[org.apache.spark.Partitioner]] is used to control the partitioning of each RDD. + * @param updateFunc State update function. If `this` function returns None, then + * corresponding state key-value pair will be eliminated. Note, that + * this function may generate a different a tuple with a different key + * than the input key. It is up to the developer to decide whether to + * remember the partitioner despite the key being changed. 
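+ *
+ * As an illustration with the simpler `(Seq[V], Option[S]) => Option[S]` overload above, a
+ * running count per key (`events` is an assumed DStream[(String, Int)]; updateStateByKey also
+ * requires a checkpoint directory to be set):
+ * {{{
+ *   val runningCounts = events.updateStateByKey[Int] { (newValues, count) =>
+ *     Some(count.getOrElse(0) + newValues.sum)
+ *   }
+ * }}}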
+ * @param partitioner Partitioner for controlling the partitioning of each RDD in the new DStream. + * @param rememberPartitioner Whether to remember the paritioner object in the generated RDDs. + * @tparam S State type + */ + def updateStateByKey[S: ClassTag]( + updateFunc: (Iterator[(K, Seq[V], Option[S])]) => Iterator[(K, S)], + partitioner: Partitioner, + rememberPartitioner: Boolean + ): DStream[(K, S)] = { + new StateDStream(self, ssc.sc.clean(updateFunc), partitioner, rememberPartitioner) + } + + /** + * Return a new DStream by applying a map function to the value of each key-value pairs in + * 'this' DStream without changing the key. + */ + def mapValues[U: ClassTag](mapValuesFunc: V => U): DStream[(K, U)] = { + new MapValuedDStream[K, V, U](self, mapValuesFunc) + } + + /** + * Return a new DStream by applying a flatmap function to the value of each key-value pairs in + * 'this' DStream without changing the key. + */ + def flatMapValues[U: ClassTag]( + flatMapValuesFunc: V => TraversableOnce[U] + ): DStream[(K, U)] = { + new FlatMapValuedDStream[K, V, U](self, flatMapValuesFunc) + } + + /** + * Return a new DStream by applying 'cogroup' between RDDs of `this` DStream and `other` DStream. + * Hash partitioning is used to generate the RDDs with Spark's default number + * of partitions. + */ + def cogroup[W: ClassTag](other: DStream[(K, W)]): DStream[(K, (Seq[V], Seq[W]))] = { + cogroup(other, defaultPartitioner()) + } + + /** + * Return a new DStream by applying 'cogroup' between RDDs of `this` DStream and `other` DStream. + * Hash partitioning is used to generate the RDDs with `numPartitions` partitions. + */ + def cogroup[W: ClassTag](other: DStream[(K, W)], numPartitions: Int): DStream[(K, (Seq[V], Seq[W]))] = { + cogroup(other, defaultPartitioner(numPartitions)) + } + + /** + * Return a new DStream by applying 'cogroup' between RDDs of `this` DStream and `other` DStream. + * The supplied [[org.apache.spark.Partitioner]] is used to partition the generated RDDs. + */ + def cogroup[W: ClassTag]( + other: DStream[(K, W)], + partitioner: Partitioner + ): DStream[(K, (Seq[V], Seq[W]))] = { + self.transformWith( + other, + (rdd1: RDD[(K, V)], rdd2: RDD[(K, W)]) => rdd1.cogroup(rdd2, partitioner) + ) + } + + /** + * Return a new DStream by applying 'join' between RDDs of `this` DStream and `other` DStream. + * Hash partitioning is used to generate the RDDs with Spark's default number of partitions. + */ + def join[W: ClassTag](other: DStream[(K, W)]): DStream[(K, (V, W))] = { + join[W](other, defaultPartitioner()) + } + + /** + * Return a new DStream by applying 'join' between RDDs of `this` DStream and `other` DStream. + * Hash partitioning is used to generate the RDDs with `numPartitions` partitions. + */ + def join[W: ClassTag](other: DStream[(K, W)], numPartitions: Int): DStream[(K, (V, W))] = { + join[W](other, defaultPartitioner(numPartitions)) + } + + /** + * Return a new DStream by applying 'join' between RDDs of `this` DStream and `other` DStream. + * The supplied [[org.apache.spark.Partitioner]] is used to control the partitioning of each RDD. + */ + def join[W: ClassTag]( + other: DStream[(K, W)], + partitioner: Partitioner + ): DStream[(K, (V, W))] = { + self.transformWith( + other, + (rdd1: RDD[(K, V)], rdd2: RDD[(K, W)]) => rdd1.join(rdd2, partitioner) + ) + } + + /** + * Return a new DStream by applying 'left outer join' between RDDs of `this` DStream and + * `other` DStream. Hash partitioning is used to generate the RDDs with Spark's default + * number of partitions. 
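+ *
+ * A sketch, assuming `views` and `clicks` are both DStream[(String, Int)] with the same
+ * batch interval:
+ * {{{
+ *   val joined  = views.join(clicks)           // DStream[(String, (Int, Int))]
+ *   val withAll = views.leftOuterJoin(clicks)  // DStream[(String, (Int, Option[Int]))]
+ * }}}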
+ */ + def leftOuterJoin[W: ClassTag](other: DStream[(K, W)]): DStream[(K, (V, Option[W]))] = { + leftOuterJoin[W](other, defaultPartitioner()) + } + + /** + * Return a new DStream by applying 'left outer join' between RDDs of `this` DStream and + * `other` DStream. Hash partitioning is used to generate the RDDs with `numPartitions` + * partitions. + */ + def leftOuterJoin[W: ClassTag]( + other: DStream[(K, W)], + numPartitions: Int + ): DStream[(K, (V, Option[W]))] = { + leftOuterJoin[W](other, defaultPartitioner(numPartitions)) + } + + /** + * Return a new DStream by applying 'left outer join' between RDDs of `this` DStream and + * `other` DStream. The supplied [[org.apache.spark.Partitioner]] is used to control + * the partitioning of each RDD. + */ + def leftOuterJoin[W: ClassTag]( + other: DStream[(K, W)], + partitioner: Partitioner + ): DStream[(K, (V, Option[W]))] = { + self.transformWith( + other, + (rdd1: RDD[(K, V)], rdd2: RDD[(K, W)]) => rdd1.leftOuterJoin(rdd2, partitioner) + ) + } + + /** + * Return a new DStream by applying 'right outer join' between RDDs of `this` DStream and + * `other` DStream. Hash partitioning is used to generate the RDDs with Spark's default + * number of partitions. + */ + def rightOuterJoin[W: ClassTag](other: DStream[(K, W)]): DStream[(K, (Option[V], W))] = { + rightOuterJoin[W](other, defaultPartitioner()) + } + + /** + * Return a new DStream by applying 'right outer join' between RDDs of `this` DStream and + * `other` DStream. Hash partitioning is used to generate the RDDs with `numPartitions` + * partitions. + */ + def rightOuterJoin[W: ClassTag]( + other: DStream[(K, W)], + numPartitions: Int + ): DStream[(K, (Option[V], W))] = { + rightOuterJoin[W](other, defaultPartitioner(numPartitions)) + } + + /** + * Return a new DStream by applying 'right outer join' between RDDs of `this` DStream and + * `other` DStream. The supplied [[org.apache.spark.Partitioner]] is used to control + * the partitioning of each RDD. + */ + def rightOuterJoin[W: ClassTag]( + other: DStream[(K, W)], + partitioner: Partitioner + ): DStream[(K, (Option[V], W))] = { + self.transformWith( + other, + (rdd1: RDD[(K, V)], rdd2: RDD[(K, W)]) => rdd1.rightOuterJoin(rdd2, partitioner) + ) + } + + /** + * Save each RDD in `this` DStream as a Hadoop file. The file name at each batch interval + * is generated based on `prefix` and `suffix`: "prefix-TIME_IN_MS.suffix" + */ + def saveAsHadoopFiles[F <: OutputFormat[K, V]]( + prefix: String, + suffix: String + )(implicit fm: ClassTag[F]) { + saveAsHadoopFiles(prefix, suffix, getKeyClass, getValueClass, fm.runtimeClass.asInstanceOf[Class[F]]) + } + + /** + * Save each RDD in `this` DStream as a Hadoop file. The file name at each batch interval + * is generated based on `prefix` and `suffix`: "prefix-TIME_IN_MS.suffix" + */ + def saveAsHadoopFiles( + prefix: String, + suffix: String, + keyClass: Class[_], + valueClass: Class[_], + outputFormatClass: Class[_ <: OutputFormat[_, _]], + conf: JobConf = new JobConf + ) { + val saveFunc = (rdd: RDD[(K, V)], time: Time) => { + val file = rddToFileName(prefix, suffix, time) + rdd.saveAsHadoopFile(file, keyClass, valueClass, outputFormatClass, conf) + } + self.foreach(saveFunc) + } + + /** + * Save each RDD in `this` DStream as a Hadoop file. The file name at each batch interval is + * generated based on `prefix` and `suffix`: "prefix-TIME_IN_MS.suffix". 
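+ * For example, a sketch (assumes a DStream[(Text, IntWritable)] named `counts` and the
+ * new-API `TextOutputFormat`; these names are illustrative, not part of this change):
+ * {{{
+ *   counts.saveAsNewAPIHadoopFiles[TextOutputFormat[Text, IntWritable]]("hdfs:///out/counts", "txt")
+ * }}}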
+ */ + def saveAsNewAPIHadoopFiles[F <: NewOutputFormat[K, V]]( + prefix: String, + suffix: String + )(implicit fm: ClassTag[F]) { + saveAsNewAPIHadoopFiles(prefix, suffix, getKeyClass, getValueClass, fm.runtimeClass.asInstanceOf[Class[F]]) + } + + /** + * Save each RDD in `this` DStream as a Hadoop file. The file name at each batch interval is + * generated based on `prefix` and `suffix`: "prefix-TIME_IN_MS.suffix". + */ + def saveAsNewAPIHadoopFiles( + prefix: String, + suffix: String, + keyClass: Class[_], + valueClass: Class[_], + outputFormatClass: Class[_ <: NewOutputFormat[_, _]], + conf: Configuration = new Configuration + ) { + val saveFunc = (rdd: RDD[(K, V)], time: Time) => { + val file = rddToFileName(prefix, suffix, time) + rdd.saveAsNewAPIHadoopFile(file, keyClass, valueClass, outputFormatClass, conf) + } + self.foreach(saveFunc) + } + + private def getKeyClass() = implicitly[ClassTag[K]].runtimeClass + + private def getValueClass() = implicitly[ClassTag[V]].runtimeClass +} diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReducedWindowedDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReducedWindowedDStream.scala index db56345ca8..7a6b1ea35e 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReducedWindowedDStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReducedWindowedDStream.scala @@ -26,7 +26,7 @@ import org.apache.spark.SparkContext._ import org.apache.spark.storage.StorageLevel import scala.collection.mutable.ArrayBuffer -import org.apache.spark.streaming.{Duration, Interval, Time, DStream} +import org.apache.spark.streaming.{Duration, Interval, Time} import scala.collection.mutable.ArrayBuffer import scala.reflect.ClassTag diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/ShuffledDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/ShuffledDStream.scala index 84e69f277b..880a89bc36 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/ShuffledDStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/ShuffledDStream.scala @@ -20,7 +20,7 @@ package org.apache.spark.streaming.dstream import org.apache.spark.Partitioner import org.apache.spark.rdd.RDD import org.apache.spark.SparkContext._ -import org.apache.spark.streaming.{Duration, DStream, Time} +import org.apache.spark.streaming.{Duration, Time} import scala.reflect.ClassTag private[streaming] diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/StateDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/StateDStream.scala index e0ff3ccba4..cc583295a1 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/StateDStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/StateDStream.scala @@ -21,7 +21,7 @@ import org.apache.spark.rdd.RDD import org.apache.spark.Partitioner import org.apache.spark.SparkContext._ import org.apache.spark.storage.StorageLevel -import org.apache.spark.streaming.{Duration, Time, DStream} +import org.apache.spark.streaming.{Duration, Time} import scala.reflect.ClassTag diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/TransformedDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/TransformedDStream.scala index aeea060df7..7cd4554282 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/TransformedDStream.scala +++ 
b/streaming/src/main/scala/org/apache/spark/streaming/dstream/TransformedDStream.scala @@ -18,7 +18,7 @@ package org.apache.spark.streaming.dstream import org.apache.spark.rdd.RDD -import org.apache.spark.streaming.{Duration, DStream, Time} +import org.apache.spark.streaming.{Duration, Time} import scala.reflect.ClassTag private[streaming] diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/UnionDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/UnionDStream.scala index 0d84ec84f2..4ecba03ab5 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/UnionDStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/UnionDStream.scala @@ -17,9 +17,8 @@ package org.apache.spark.streaming.dstream -import org.apache.spark.streaming.{Duration, DStream, Time} +import org.apache.spark.streaming.{Duration, Time} import org.apache.spark.rdd.RDD -import collection.mutable.ArrayBuffer import org.apache.spark.rdd.UnionRDD import scala.collection.mutable.ArrayBuffer diff --git a/streaming/src/main/scala/org/apache/spark/streaming/util/MasterFailureTest.scala b/streaming/src/main/scala/org/apache/spark/streaming/util/MasterFailureTest.scala index 162b19d7f0..e7403b5f1e 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/util/MasterFailureTest.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/util/MasterFailureTest.scala @@ -20,7 +20,7 @@ package org.apache.spark.streaming.util import org.apache.spark.Logging import org.apache.spark.rdd.RDD import org.apache.spark.streaming._ -import org.apache.spark.streaming.dstream.ForEachDStream +import org.apache.spark.streaming.dstream.{DStream, ForEachDStream} import StreamingContext._ import scala.util.Random diff --git a/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala index 2e3a1e66ad..d293d20644 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala @@ -24,6 +24,7 @@ import org.apache.spark.SparkContext._ import util.ManualClock import org.apache.spark.{SparkContext, SparkConf} +import org.apache.spark.streaming.dstream.DStream class BasicOperationsSuite extends TestSuiteBase { test("map") { diff --git a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala index 9590bca989..21a72e7cea 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala @@ -26,7 +26,7 @@ import com.google.common.io.Files import org.apache.hadoop.fs.{Path, FileSystem} import org.apache.hadoop.conf.Configuration import org.apache.spark.streaming.StreamingContext._ -import org.apache.spark.streaming.dstream.FileInputDStream +import org.apache.spark.streaming.dstream.{DStream, FileInputDStream} import org.apache.spark.streaming.util.ManualClock import org.apache.spark.util.Utils import org.apache.spark.SparkConf diff --git a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala index 9eb9b3684c..e0232c70a8 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala +++ 
b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala @@ -23,6 +23,7 @@ import org.scalatest.concurrent.Timeouts import org.scalatest.time.SpanSugar._ import org.apache.spark.{SparkException, SparkConf, SparkContext} import org.apache.spark.util.{Utils, MetadataCleaner} +import org.apache.spark.streaming.dstream.DStream class StreamingContextSuite extends FunSuite with BeforeAndAfter with Timeouts { diff --git a/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala index fa64142096..9e0f2c900e 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala @@ -20,6 +20,7 @@ package org.apache.spark.streaming import org.apache.spark.streaming.scheduler._ import scala.collection.mutable.ArrayBuffer import org.scalatest.matchers.ShouldMatchers +import org.apache.spark.streaming.dstream.DStream class StreamingListenerSuite extends TestSuiteBase with ShouldMatchers { diff --git a/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala b/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala index 3569624d51..75093d6106 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala @@ -17,7 +17,7 @@ package org.apache.spark.streaming -import org.apache.spark.streaming.dstream.{InputDStream, ForEachDStream} +import org.apache.spark.streaming.dstream.{DStream, InputDStream, ForEachDStream} import org.apache.spark.streaming.util.ManualClock import scala.collection.mutable.ArrayBuffer diff --git a/streaming/src/test/scala/org/apache/spark/streaming/WindowOperationsSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/WindowOperationsSuite.scala index c39abfc21b..8f3c2dd86c 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/WindowOperationsSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/WindowOperationsSuite.scala @@ -18,6 +18,7 @@ package org.apache.spark.streaming import org.apache.spark.streaming.StreamingContext._ +import org.apache.spark.streaming.dstream.DStream class WindowOperationsSuite extends TestSuiteBase { diff --git a/tools/src/main/scala/org/apache/spark/tools/JavaAPICompletenessChecker.scala b/tools/src/main/scala/org/apache/spark/tools/JavaAPICompletenessChecker.scala index f670f65bf5..475569c872 100644 --- a/tools/src/main/scala/org/apache/spark/tools/JavaAPICompletenessChecker.scala +++ b/tools/src/main/scala/org/apache/spark/tools/JavaAPICompletenessChecker.scala @@ -24,8 +24,9 @@ import scala.collection.mutable.ArrayBuffer import org.apache.spark._ import org.apache.spark.api.java._ import org.apache.spark.rdd.{RDD, DoubleRDDFunctions, PairRDDFunctions, OrderedRDDFunctions} -import org.apache.spark.streaming.{PairDStreamFunctions, DStream, StreamingContext} +import org.apache.spark.streaming.{StreamingContext} import org.apache.spark.streaming.api.java.{JavaPairDStream, JavaDStream, JavaStreamingContext} +import org.apache.spark.streaming.dstream.{DStream, PairDStreamFunctions} private[spark] abstract class SparkType(val name: String) @@ -147,7 +148,7 @@ object JavaAPICompletenessChecker { } else { ParameterizedType(classOf[JavaRDD[_]].getName, parameters.map(applySubs)) } - case "org.apache.spark.streaming.DStream" => + case 
"org.apache.spark.streaming.dstream.DStream" => if (parameters(0).name == classOf[Tuple2[_, _]].getName) { val tupleParams = parameters(0).asInstanceOf[ParameterizedType].parameters.map(applySubs) @@ -248,30 +249,29 @@ object JavaAPICompletenessChecker { "org.apache.spark.SparkContext.getSparkHome", "org.apache.spark.SparkContext.executorMemoryRequested", "org.apache.spark.SparkContext.getExecutorStorageStatus", - "org.apache.spark.streaming.DStream.generatedRDDs", - "org.apache.spark.streaming.DStream.zeroTime", - "org.apache.spark.streaming.DStream.rememberDuration", - "org.apache.spark.streaming.DStream.storageLevel", - "org.apache.spark.streaming.DStream.mustCheckpoint", - "org.apache.spark.streaming.DStream.checkpointDuration", - "org.apache.spark.streaming.DStream.checkpointData", - "org.apache.spark.streaming.DStream.graph", - "org.apache.spark.streaming.DStream.isInitialized", - "org.apache.spark.streaming.DStream.parentRememberDuration", - "org.apache.spark.streaming.DStream.initialize", - "org.apache.spark.streaming.DStream.validate", - "org.apache.spark.streaming.DStream.setContext", - "org.apache.spark.streaming.DStream.setGraph", - "org.apache.spark.streaming.DStream.remember", - "org.apache.spark.streaming.DStream.getOrCompute", - "org.apache.spark.streaming.DStream.generateJob", - "org.apache.spark.streaming.DStream.clearOldMetadata", - "org.apache.spark.streaming.DStream.addMetadata", - "org.apache.spark.streaming.DStream.updateCheckpointData", - "org.apache.spark.streaming.DStream.restoreCheckpointData", - "org.apache.spark.streaming.DStream.isTimeValid", + "org.apache.spark.streaming.dstream.DStream.generatedRDDs", + "org.apache.spark.streaming.dstream.DStream.zeroTime", + "org.apache.spark.streaming.dstream.DStream.rememberDuration", + "org.apache.spark.streaming.dstream.DStream.storageLevel", + "org.apache.spark.streaming.dstream.DStream.mustCheckpoint", + "org.apache.spark.streaming.dstream.DStream.checkpointDuration", + "org.apache.spark.streaming.dstream.DStream.checkpointData", + "org.apache.spark.streaming.dstream.DStream.graph", + "org.apache.spark.streaming.dstream.DStream.isInitialized", + "org.apache.spark.streaming.dstream.DStream.parentRememberDuration", + "org.apache.spark.streaming.dstream.DStream.initialize", + "org.apache.spark.streaming.dstream.DStream.validate", + "org.apache.spark.streaming.dstream.DStream.setContext", + "org.apache.spark.streaming.dstream.DStream.setGraph", + "org.apache.spark.streaming.dstream.DStream.remember", + "org.apache.spark.streaming.dstream.DStream.getOrCompute", + "org.apache.spark.streaming.dstream.DStream.generateJob", + "org.apache.spark.streaming.dstream.DStream.clearOldMetadata", + "org.apache.spark.streaming.dstream.DStream.addMetadata", + "org.apache.spark.streaming.dstream.DStream.updateCheckpointData", + "org.apache.spark.streaming.dstream.DStream.restoreCheckpointData", + "org.apache.spark.streaming.dstream.DStream.isTimeValid", "org.apache.spark.streaming.StreamingContext.nextNetworkInputStreamId", - "org.apache.spark.streaming.StreamingContext.networkInputTracker", "org.apache.spark.streaming.StreamingContext.checkpointDir", "org.apache.spark.streaming.StreamingContext.checkpointDuration", "org.apache.spark.streaming.StreamingContext.receiverJobThread", -- cgit v1.2.3 From f4d77f8cb8a9eab43bea35e8e6c9bc0d2c2b53a8 Mon Sep 17 00:00:00 2001 From: Patrick Wendell Date: Sat, 11 Jan 2014 10:50:14 -0800 Subject: Rename DStream.foreach to DStream.foreachRDD `foreachRDD` makes it clear that the granularity of this 
operator is per-RDD. As it stands, `foreach` is inconsistent with with `map`, `filter`, and the other DStream operators which get pushed down to individual records within each RDD. --- docs/streaming-programming-guide.md | 4 ++-- .../org/apache/spark/streaming/examples/RawNetworkGrep.scala | 2 +- .../apache/spark/streaming/examples/TwitterAlgebirdCMS.scala | 4 ++-- .../apache/spark/streaming/examples/TwitterAlgebirdHLL.scala | 4 ++-- .../apache/spark/streaming/examples/TwitterPopularTags.scala | 4 ++-- .../spark/streaming/examples/clickstream/PageViewStream.scala | 2 +- .../src/main/scala/org/apache/spark/streaming/DStream.scala | 10 +++++----- .../org/apache/spark/streaming/PairDStreamFunctions.scala | 4 ++-- .../org/apache/spark/streaming/api/java/JavaDStreamLike.scala | 8 ++++---- .../org/apache/spark/streaming/BasicOperationsSuite.scala | 2 +- 10 files changed, 22 insertions(+), 22 deletions(-) (limited to 'docs') diff --git a/docs/streaming-programming-guide.md b/docs/streaming-programming-guide.md index 1c9ece6270..3273817c78 100644 --- a/docs/streaming-programming-guide.md +++ b/docs/streaming-programming-guide.md @@ -175,7 +175,7 @@ When an output operator is called, it triggers the computation of a stream. Curr - + @@ -375,7 +375,7 @@ There are two failure behaviors based on which input sources are used. 1. _Using HDFS files as input source_ - Since the data is reliably stored on HDFS, all data can re-computed and therefore no data will be lost due to any failure. 1. _Using any input source that receives data through a network_ - For network-based data sources like Kafka and Flume, the received input data is replicated in memory between nodes of the cluster (default replication factor is 2). So if a worker node fails, then the system can recompute the lost from the the left over copy of the input data. However, if the worker node where a network receiver was running fails, then a tiny bit of data may be lost, that is, the data received by the system but not yet replicated to other node(s). The receiver will be started on a different node and it will continue to receive data. -Since all data is modeled as RDDs with their lineage of deterministic operations, any recomputation always leads to the same result. As a result, all DStream transformations are guaranteed to have _exactly-once_ semantics. That is, the final transformed result will be same even if there were was a worker node failure. However, output operations (like `foreach`) have _at-least once_ semantics, that is, the transformed data may get written to an external entity more than once in the event of a worker failure. While this is acceptable for saving to HDFS using the `saveAs*Files` operations (as the file will simply get over-written by the same data), additional transactions-like mechanisms may be necessary to achieve exactly-once semantics for output operations. +Since all data is modeled as RDDs with their lineage of deterministic operations, any recomputation always leads to the same result. As a result, all DStream transformations are guaranteed to have _exactly-once_ semantics. That is, the final transformed result will be same even if there were was a worker node failure. However, output operations (like `foreachRDD`) have _at-least once_ semantics, that is, the transformed data may get written to an external entity more than once in the event of a worker failure. 
While this is acceptable for saving to HDFS using the `saveAs*Files` operations (as the file will simply get over-written by the same data), additional transactions-like mechanisms may be necessary to achieve exactly-once semantics for output operations. ## Failure of the Driver Node A system that is required to operate 24/7 needs to be able tolerate the failure of the driver node as well. Spark Streaming does this by saving the state of the DStream computation periodically to a HDFS file, that can be used to restart the streaming computation in the event of a failure of the driver node. This checkpointing is enabled by setting a HDFS directory for checkpointing using `ssc.checkpoint()` as described [earlier](#rdd-checkpointing-within-dstreams). To elaborate, the following state is periodically saved to a file. diff --git a/examples/src/main/scala/org/apache/spark/streaming/examples/RawNetworkGrep.scala b/examples/src/main/scala/org/apache/spark/streaming/examples/RawNetworkGrep.scala index 3d08d86567..99b79c3949 100644 --- a/examples/src/main/scala/org/apache/spark/streaming/examples/RawNetworkGrep.scala +++ b/examples/src/main/scala/org/apache/spark/streaming/examples/RawNetworkGrep.scala @@ -58,7 +58,7 @@ object RawNetworkGrep { val rawStreams = (1 to numStreams).map(_ => ssc.rawSocketStream[String](host, port, StorageLevel.MEMORY_ONLY_SER_2)).toArray val union = ssc.union(rawStreams) - union.filter(_.contains("the")).count().foreach(r => + union.filter(_.contains("the")).count().foreachRDD(r => println("Grep count: " + r.collect().mkString)) ssc.start() } diff --git a/examples/src/main/scala/org/apache/spark/streaming/examples/TwitterAlgebirdCMS.scala b/examples/src/main/scala/org/apache/spark/streaming/examples/TwitterAlgebirdCMS.scala index 80b5a98b14..483c4d3118 100644 --- a/examples/src/main/scala/org/apache/spark/streaming/examples/TwitterAlgebirdCMS.scala +++ b/examples/src/main/scala/org/apache/spark/streaming/examples/TwitterAlgebirdCMS.scala @@ -81,7 +81,7 @@ object TwitterAlgebirdCMS { val exactTopUsers = users.map(id => (id, 1)) .reduceByKey((a, b) => a + b) - approxTopUsers.foreach(rdd => { + approxTopUsers.foreachRDD(rdd => { if (rdd.count() != 0) { val partial = rdd.first() val partialTopK = partial.heavyHitters.map(id => @@ -96,7 +96,7 @@ object TwitterAlgebirdCMS { } }) - exactTopUsers.foreach(rdd => { + exactTopUsers.foreachRDD(rdd => { if (rdd.count() != 0) { val partialMap = rdd.collect().toMap val partialTopK = rdd.map( diff --git a/examples/src/main/scala/org/apache/spark/streaming/examples/TwitterAlgebirdHLL.scala b/examples/src/main/scala/org/apache/spark/streaming/examples/TwitterAlgebirdHLL.scala index cb2f2c51a0..94c2bf29ac 100644 --- a/examples/src/main/scala/org/apache/spark/streaming/examples/TwitterAlgebirdHLL.scala +++ b/examples/src/main/scala/org/apache/spark/streaming/examples/TwitterAlgebirdHLL.scala @@ -67,7 +67,7 @@ object TwitterAlgebirdHLL { val exactUsers = users.map(id => Set(id)).reduce(_ ++ _) - approxUsers.foreach(rdd => { + approxUsers.foreachRDD(rdd => { if (rdd.count() != 0) { val partial = rdd.first() globalHll += partial @@ -76,7 +76,7 @@ object TwitterAlgebirdHLL { } }) - exactUsers.foreach(rdd => { + exactUsers.foreachRDD(rdd => { if (rdd.count() != 0) { val partial = rdd.first() userSet ++= partial diff --git a/examples/src/main/scala/org/apache/spark/streaming/examples/TwitterPopularTags.scala b/examples/src/main/scala/org/apache/spark/streaming/examples/TwitterPopularTags.scala index 16c10feaba..8a70d4a978 100644 --- 
a/examples/src/main/scala/org/apache/spark/streaming/examples/TwitterPopularTags.scala +++ b/examples/src/main/scala/org/apache/spark/streaming/examples/TwitterPopularTags.scala @@ -56,13 +56,13 @@ object TwitterPopularTags { // Print popular hashtags - topCounts60.foreach(rdd => { + topCounts60.foreachRDD(rdd => { val topList = rdd.take(5) println("\nPopular topics in last 60 seconds (%s total):".format(rdd.count())) topList.foreach{case (count, tag) => println("%s (%s tweets)".format(tag, count))} }) - topCounts10.foreach(rdd => { + topCounts10.foreachRDD(rdd => { val topList = rdd.take(5) println("\nPopular topics in last 10 seconds (%s total):".format(rdd.count())) topList.foreach{case (count, tag) => println("%s (%s tweets)".format(tag, count))} diff --git a/examples/src/main/scala/org/apache/spark/streaming/examples/clickstream/PageViewStream.scala b/examples/src/main/scala/org/apache/spark/streaming/examples/clickstream/PageViewStream.scala index da6b67bcce..bb44bc3d06 100644 --- a/examples/src/main/scala/org/apache/spark/streaming/examples/clickstream/PageViewStream.scala +++ b/examples/src/main/scala/org/apache/spark/streaming/examples/clickstream/PageViewStream.scala @@ -91,7 +91,7 @@ object PageViewStream { case "popularUsersSeen" => // Look for users in our existing dataset and print it out if we have a match pageViews.map(view => (view.userID, 1)) - .foreach((rdd, time) => rdd.join(userList) + .foreachRDD((rdd, time) => rdd.join(userList) .map(_._2._2) .take(10) .foreach(u => println("Saw user %s at time %s".format(u, time)))) diff --git a/streaming/src/main/scala/org/apache/spark/streaming/DStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/DStream.scala index b98f4a5101..93d57db494 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/DStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/DStream.scala @@ -487,15 +487,15 @@ abstract class DStream[T: ClassTag] ( * Apply a function to each RDD in this DStream. This is an output operator, so * 'this' DStream will be registered as an output stream and therefore materialized. */ - def foreach(foreachFunc: RDD[T] => Unit) { - this.foreach((r: RDD[T], t: Time) => foreachFunc(r)) + def foreachRDD(foreachFunc: RDD[T] => Unit) { + this.foreachRDD((r: RDD[T], t: Time) => foreachFunc(r)) } /** * Apply a function to each RDD in this DStream. This is an output operator, so * 'this' DStream will be registered as an output stream and therefore materialized. 
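 * For example, an illustrative sketch (not present in the original comment) that prints
 * the size of each batch together with its batch time:
 * {{{
 *   stream.foreachRDD { (rdd, time) =>
 *     println("Batch at " + time + " contained " + rdd.count() + " records")
 *   }
 * }}}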
*/ - def foreach(foreachFunc: (RDD[T], Time) => Unit) { + def foreachRDD(foreachFunc: (RDD[T], Time) => Unit) { ssc.registerOutputStream(new ForEachDStream(this, context.sparkContext.clean(foreachFunc))) } @@ -719,7 +719,7 @@ abstract class DStream[T: ClassTag] ( val file = rddToFileName(prefix, suffix, time) rdd.saveAsObjectFile(file) } - this.foreach(saveFunc) + this.foreachRDD(saveFunc) } /** @@ -732,7 +732,7 @@ abstract class DStream[T: ClassTag] ( val file = rddToFileName(prefix, suffix, time) rdd.saveAsTextFile(file) } - this.foreach(saveFunc) + this.foreachRDD(saveFunc) } def register() { diff --git a/streaming/src/main/scala/org/apache/spark/streaming/PairDStreamFunctions.scala b/streaming/src/main/scala/org/apache/spark/streaming/PairDStreamFunctions.scala index 56dbcbda23..69d80c3711 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/PairDStreamFunctions.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/PairDStreamFunctions.scala @@ -582,7 +582,7 @@ extends Serializable { val file = rddToFileName(prefix, suffix, time) rdd.saveAsHadoopFile(file, keyClass, valueClass, outputFormatClass, conf) } - self.foreach(saveFunc) + self.foreachRDD(saveFunc) } /** @@ -612,7 +612,7 @@ extends Serializable { val file = rddToFileName(prefix, suffix, time) rdd.saveAsNewAPIHadoopFile(file, keyClass, valueClass, outputFormatClass, conf) } - self.foreach(saveFunc) + self.foreachRDD(saveFunc) } private def getKeyClass() = implicitly[ClassTag[K]].runtimeClass diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStreamLike.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStreamLike.scala index 64f38ce1c0..4b5d5ece52 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStreamLike.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStreamLike.scala @@ -244,16 +244,16 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This, R], R <: JavaRDDLike[T * Apply a function to each RDD in this DStream. This is an output operator, so * 'this' DStream will be registered as an output stream and therefore materialized. */ - def foreach(foreachFunc: JFunction[R, Void]) { - dstream.foreach(rdd => foreachFunc.call(wrapRDD(rdd))) + def foreachRDD(foreachFunc: JFunction[R, Void]) { + dstream.foreachRDD(rdd => foreachFunc.call(wrapRDD(rdd))) } /** * Apply a function to each RDD in this DStream. This is an output operator, so * 'this' DStream will be registered as an output stream and therefore materialized. 
*/ - def foreach(foreachFunc: JFunction2[R, Time, Void]) { - dstream.foreach((rdd, time) => foreachFunc.call(wrapRDD(rdd), time)) + def foreachRDD(foreachFunc: JFunction2[R, Time, Void]) { + dstream.foreachRDD((rdd, time) => foreachFunc.call(wrapRDD(rdd), time)) } /** diff --git a/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala index ee6b433d1f..9a187ce031 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala @@ -383,7 +383,7 @@ class BasicOperationsSuite extends TestSuiteBase { val input = Seq(Seq(1), Seq(2), Seq(3), Seq(4)) val stream = new TestInputStream[Int](ssc, input, 2) ssc.registerInputStream(stream) - stream.foreach(_ => {}) // Dummy output stream + stream.foreachRDD(_ => {}) // Dummy output stream ssc.start() Thread.sleep(2000) def getInputFromSlice(fromMillis: Long, toMillis: Long) = { -- cgit v1.2.3 From 2802cc80bcf267fd19a4cb43da505b82af675e08 Mon Sep 17 00:00:00 2001 From: Patrick Wendell Date: Sun, 12 Jan 2014 19:16:43 -0800 Subject: Disable shuffle file consolidation by default --- core/src/main/scala/org/apache/spark/storage/ShuffleBlockManager.scala | 2 +- docs/configuration.md | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) (limited to 'docs') diff --git a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockManager.scala b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockManager.scala index 6e0ff143b7..e2b24298a5 100644 --- a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockManager.scala @@ -64,7 +64,7 @@ class ShuffleBlockManager(blockManager: BlockManager) { // Turning off shuffle file consolidation causes all shuffle Blocks to get their own file. // TODO: Remove this once the shuffle file consolidation feature is stable. val consolidateShuffleFiles = - conf.getBoolean("spark.shuffle.consolidateFiles", true) + conf.getBoolean("spark.shuffle.consolidateFiles", false) private val bufferSize = conf.getInt("spark.shuffle.file.buffer.kb", 100) * 1024 diff --git a/docs/configuration.md b/docs/configuration.md index ad75e06fc7..40a57c4bc6 100644 --- a/docs/configuration.md +++ b/docs/configuration.md @@ -382,7 +382,7 @@ Apart from these, the following properties are also available, and may be useful - + -- cgit v1.2.3
 <tr><th>Operator</th><th>Meaning</th></tr>
 <tr>
-  <td> foreach(func) </td>
+  <td> foreachRDD(func) </td>
   <td> The fundamental output operator. Applies a function, func, to each RDD generated from the stream. This function should have side effects, such as printing output, saving the RDD to external files, or writing it over the network to an external system. </td>
 </tr>
 <tr>
   <td>spark.shuffle.consolidateFiles</td>
-  <td>true</td>
+  <td>false</td>
   <td>
     If set to "true", consolidates intermediate files created during a shuffle. Creating fewer files can improve filesystem performance for shuffles with large numbers of reduce tasks. It is recommended to set this to "true" when using ext4 or xfs filesystems. On ext3, this option might degrade performance on machines with many (>8) cores due to filesystem limitations.
   </td>
 </tr>
-- cgit v1.2.3
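Applications that want to keep the previous behavior after this change can simply re-enable the flag through SparkConf; a minimal sketch (the application name, master URL, and filesystem note below are illustrative, not part of the patch):

    import org.apache.spark.{SparkConf, SparkContext}

    val conf = new SparkConf()
      .setMaster("local[2]")
      .setAppName("ShuffleHeavyJob")
      .set("spark.shuffle.consolidateFiles", "true") // opt back in, e.g. on ext4/xfs
    val sc = new SparkContext(conf)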