From 760823d3937822ea4a6d6f476815442711c605fa Mon Sep 17 00:00:00 2001 From: Patrick Wendell Date: Tue, 24 Dec 2013 23:20:34 -0800 Subject: Adding better option parsing --- .../spark/examples/DriverSubmissionTest.scala | 45 ++++++++++++++++++++++ 1 file changed, 45 insertions(+) create mode 100644 examples/src/main/scala/org/apache/spark/examples/DriverSubmissionTest.scala (limited to 'examples/src') diff --git a/examples/src/main/scala/org/apache/spark/examples/DriverSubmissionTest.scala b/examples/src/main/scala/org/apache/spark/examples/DriverSubmissionTest.scala new file mode 100644 index 0000000000..9055ce7a39 --- /dev/null +++ b/examples/src/main/scala/org/apache/spark/examples/DriverSubmissionTest.scala @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.examples + +import scala.collection.JavaConversions._ + +/** Prints out environmental information, sleeps, and then exits. Made to + * test driver submission in the standalone scheduler. */ +object DriverSubmissionTest { + def main(args: Array[String]) { + if (args.size < 1) { + println("Usage: DriverSubmissionTest ") + System.exit(0) + } + val numSecondsToSleep = args(0).toInt + + val env = System.getenv() + val properties = System.getProperties() + + println("Environment variables containing SPARK_TEST:") + env.filter{case (k, v) => k.contains("SPARK_TEST")}.foreach(println) + + println("System properties containing spark.test:") + properties.filter{case (k, v) => k.toString.contains("spark.test")}.foreach(println) + + for (i <- 1 until numSecondsToSleep) { + Thread.sleep(1000) + } + } +} -- cgit v1.2.3 From c9c0f745afcf00c17fa073e4ca6dd9433400be95 Mon Sep 17 00:00:00 2001 From: Patrick Wendell Date: Wed, 25 Dec 2013 00:54:34 -0800 Subject: Minor style clean-up --- .../apache/spark/deploy/client/DriverClient.scala | 2 ++ .../org/apache/spark/deploy/master/Master.scala | 3 +-- .../apache/spark/deploy/master/ui/IndexPage.scala | 22 +++++++++++----------- .../apache/spark/deploy/worker/DriverRunner.scala | 4 ++-- .../org/apache/spark/deploy/worker/Worker.scala | 3 --- .../spark/examples/DriverSubmissionTest.scala | 1 + 6 files changed, 17 insertions(+), 18 deletions(-) (limited to 'examples/src') diff --git a/core/src/main/scala/org/apache/spark/deploy/client/DriverClient.scala b/core/src/main/scala/org/apache/spark/deploy/client/DriverClient.scala index 9c0a626204..28c851bcfe 100644 --- a/core/src/main/scala/org/apache/spark/deploy/client/DriverClient.scala +++ b/core/src/main/scala/org/apache/spark/deploy/client/DriverClient.scala @@ -101,6 +101,8 @@ object DriverClient { def main(args: Array[String]) { val driverArgs = new DriverClientArguments(args) + // TODO: See if we can initialize akka so return messages are sent back using the same TCP 
+ // flow. Else, this (sadly) requires the DriverClient be routable from the Master. val (actorSystem, boundPort) = AkkaUtils.createActorSystem( "driverClient", Utils.localHostName(), 0) diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala index f5d6fdab5f..0528ef43a1 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala @@ -185,7 +185,7 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act schedule() // TODO: It might be good to instead have the submission client poll the master to determine - // the current status of the driver. Since we may already want to expose this. + // the current status of the driver. For now it's simply "fire and forget". sender ! SubmitDriverResponse(true, "Driver successfully submitted") } @@ -611,7 +611,6 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act } } - /** Generate a new driver ID given a driver's submission date */ def newDriverId(submitDate: Date): String = { val appId = "driver-%s-%04d".format(DATE_FORMAT.format(submitDate), nextDriverNumber) nextDriverNumber += 1 diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ui/IndexPage.scala b/core/src/main/scala/org/apache/spark/deploy/master/ui/IndexPage.scala index 6a99d7ac02..3c6fca3780 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/ui/IndexPage.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/ui/IndexPage.scala @@ -106,20 +106,20 @@ private[spark] class IndexPage(parent: MasterWebUI) { -
        [hunk restructures and re-indents the <div class="row-fluid">/<div class="span12">/<h4>
         markup around the "Active Drivers" ({activeDriversTable}) and "Completed Drivers"
         ({completedDriversTable}) tables; the XML tags themselves are not preserved in this hunk]
; UIUtils.basicSparkPage(content, "Spark Master at " + state.uri) } diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala b/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala index b030d6041a..28d4297299 100644 --- a/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala +++ b/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala @@ -32,7 +32,7 @@ import org.apache.spark.deploy.master.DriverState import org.apache.spark.util.Utils /** - * Manages the execution of one driver process. + * Manages the execution of one driver, including automatically restarting the driver on failure. */ private[spark] class DriverRunner( val driverId: String, @@ -133,7 +133,7 @@ private[spark] class DriverRunner( localJarFilename } - /** Continue launching the supplied command until it exits zero. */ + /** Continue launching the supplied command until it exits zero or is killed. */ def runCommandWithRetry(command: Seq[String], envVars: Seq[(String, String)], baseDir: File) = { // Time to wait between submission retries. var waitSeconds = 1 diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala index b6a84fc371..42c28cf22d 100644 --- a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala +++ b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala @@ -254,17 +254,14 @@ private[spark] class Worker( case KillDriver(driverId) => { logInfo(s"Asked to kill driver $driverId") - drivers.find(_._1 == driverId) match { case Some((id, runner)) => runner.kill() case None => logError(s"Asked to kill unknown driver $driverId") } - } - case DriverStateChanged(driverId, state, exception) => { state match { case DriverState.FAILED => diff --git a/examples/src/main/scala/org/apache/spark/examples/DriverSubmissionTest.scala b/examples/src/main/scala/org/apache/spark/examples/DriverSubmissionTest.scala index 9055ce7a39..65251e9319 100644 --- a/examples/src/main/scala/org/apache/spark/examples/DriverSubmissionTest.scala +++ b/examples/src/main/scala/org/apache/spark/examples/DriverSubmissionTest.scala @@ -39,6 +39,7 @@ object DriverSubmissionTest { properties.filter{case (k, v) => k.toString.contains("spark.test")}.foreach(println) for (i <- 1 until numSecondsToSleep) { + println(s"Alive for $i out of $numSecondsToSleep seconds") Thread.sleep(1000) } } -- cgit v1.2.3 From a1b8dd53e3474dae2c49b30bc9719c7f6b98c7cc Mon Sep 17 00:00:00 2001 From: Tathagata Das Date: Thu, 2 Jan 2014 19:07:22 -0800 Subject: Added StreamingContext.getOrCreate to for automatic recovery, and added RecoverableNetworkWordCount example to use it. 
--- .../cluster/SparkDeploySchedulerBackend.scala | 2 +- .../examples/RecoverableNetworkWordCount.scala | 58 ++++++++++++++++++++++ .../org/apache/spark/streaming/Checkpoint.scala | 12 +++-- .../apache/spark/streaming/StreamingContext.scala | 18 ++++++- 4 files changed, 85 insertions(+), 5 deletions(-) create mode 100644 examples/src/main/scala/org/apache/spark/streaming/examples/RecoverableNetworkWordCount.scala (limited to 'examples/src') diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala index 921b887a89..0615f7b565 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala @@ -47,7 +47,7 @@ private[spark] class SparkDeploySchedulerBackend( val driverUrl = "akka.tcp://spark@%s:%s/user/%s".format( System.getProperty("spark.driver.host"), System.getProperty("spark.driver.port"), CoarseGrainedSchedulerBackend.ACTOR_NAME) - val args = Seq(driverUrl, "{{WORKER_URL}}", "{{EXECUTOR_ID}}", "{{HOSTNAME}}", "{{CORES}}") + val args = Seq(driverUrl, "{{EXECUTOR_ID}}", "{{HOSTNAME}}", "{{CORES}}", "{{WORKER_URL}}") val command = Command( "org.apache.spark.executor.CoarseGrainedExecutorBackend", args, sc.executorEnvs) val sparkHome = sc.getSparkHome().getOrElse(null) diff --git a/examples/src/main/scala/org/apache/spark/streaming/examples/RecoverableNetworkWordCount.scala b/examples/src/main/scala/org/apache/spark/streaming/examples/RecoverableNetworkWordCount.scala new file mode 100644 index 0000000000..0e5f39f772 --- /dev/null +++ b/examples/src/main/scala/org/apache/spark/streaming/examples/RecoverableNetworkWordCount.scala @@ -0,0 +1,58 @@ +package org.apache.spark.streaming.examples + +import org.apache.spark.streaming.{Time, Seconds, StreamingContext} +import org.apache.spark.streaming.StreamingContext._ +import org.apache.spark.util.IntParam +import java.io.File +import org.apache.spark.rdd.RDD +import com.google.common.io.Files +import java.nio.charset.Charset + +object RecoverableNetworkWordCount { + + def createContext(master: String, ip: String, port: Int, outputPath: String) = { + + val outputFile = new File(outputPath) + if (outputFile.exists()) outputFile.delete() + + // Create the context with a 1 second batch size + println("Creating new context") + val ssc = new StreamingContext(master, "RecoverableNetworkWordCount", Seconds(1), + System.getenv("SPARK_HOME"), Seq(System.getenv("SPARK_EXAMPLES_JAR"))) + + // Create a NetworkInputDStream on target ip:port and count the + // words in input stream of \n delimited test (eg. 
generated by 'nc') + val lines = ssc.socketTextStream(ip, port) + val words = lines.flatMap(_.split(" ")) + val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _) + wordCounts.foreach((rdd: RDD[(String, Int)], time: Time) => { + val counts = "Counts at time " + time + " " + rdd.collect().mkString("[", ", ", "]") + println(counts) + println("Appending to " + outputFile.getAbsolutePath) + Files.append(counts + "\n", outputFile, Charset.defaultCharset()) + }) + ssc + } + + def main(args: Array[String]) { + if (args.length != 5) { + System.err.println("You arguments were " + args.mkString("[", ", ", "]")) + System.err.println( + """ + |Usage: RecoverableNetworkWordCount + | + |In local mode, should be 'local[n]' with n > 1 + |Both and should be full paths + """.stripMargin + ) + System.exit(1) + } + val Array(master, ip, IntParam(port), checkpointDirectory, outputPath) = args + val ssc = StreamingContext.getOrCreate(checkpointDirectory, + () => { + createContext(master, ip, port, outputPath) + }) + ssc.start() + + } +} diff --git a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala index 7b343d2376..139e2c08b5 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala @@ -24,7 +24,7 @@ import java.util.concurrent.RejectedExecutionException import org.apache.hadoop.fs.Path import org.apache.hadoop.conf.Configuration -import org.apache.spark.Logging +import org.apache.spark.{SparkException, Logging} import org.apache.spark.io.CompressionCodec import org.apache.spark.util.MetadataCleaner @@ -141,9 +141,15 @@ class CheckpointWriter(checkpointDir: String) extends Logging { private[streaming] object CheckpointReader extends Logging { + def doesCheckpointExist(path: String): Boolean = { + val attempts = Seq(new Path(path, "graph"), new Path(path, "graph.bk")) + val fs = new Path(path).getFileSystem(new Configuration()) + (attempts.count(p => fs.exists(p)) > 1) + } + def read(path: String): Checkpoint = { val fs = new Path(path).getFileSystem(new Configuration()) - val attempts = Seq(new Path(path, "graph"), new Path(path, "graph.bk"), new Path(path), new Path(path + ".bk")) + val attempts = Seq(new Path(path, "graph"), new Path(path, "graph.bk")) val compressionCodec = CompressionCodec.createCodec() @@ -175,7 +181,7 @@ object CheckpointReader extends Logging { } }) - throw new Exception("Could not read checkpoint from path '" + path + "'") + throw new SparkException("Could not read checkpoint from path '" + path + "'") } } diff --git a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala index 41da028a3c..01b213ab42 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala @@ -570,12 +570,28 @@ class StreamingContext private ( } -object StreamingContext { +object StreamingContext extends Logging { implicit def toPairDStreamFunctions[K: ClassTag, V: ClassTag](stream: DStream[(K,V)]) = { new PairDStreamFunctions[K, V](stream) } + def getOrCreate( + checkpointPath: String, + creatingFunc: () => StreamingContext, + createOnCheckpointError: Boolean = false + ): StreamingContext = { + if (CheckpointReader.doesCheckpointExist(checkpointPath)) { + logInfo("Creating streaming context from checkpoint file") + new 
StreamingContext(checkpointPath) + } else { + logInfo("Creating new streaming context") + val ssc = creatingFunc() + ssc.checkpoint(checkpointPath) + ssc + } + } + protected[streaming] def createNewSparkContext( master: String, appName: String, -- cgit v1.2.3 From 35f80da21aaea8c6fde089754ef3a86bc78e0428 Mon Sep 17 00:00:00 2001 From: Patrick Wendell Date: Wed, 8 Jan 2014 20:54:24 -0800 Subject: Set default logging to WARN for Spark streaming examples. This programatically sets the log level to WARN by default for streaming tests. If the user has already specified a log4j.properties file, the user's file will take precedence over this default. --- .../streaming/examples/JavaFlumeEventCount.java | 2 ++ .../streaming/examples/JavaKafkaWordCount.java | 2 ++ .../streaming/examples/JavaNetworkWordCount.java | 2 ++ .../spark/streaming/examples/JavaQueueStream.java | 2 ++ .../spark/streaming/examples/ActorWordCount.scala | 12 +++++------- .../spark/streaming/examples/FlumeEventCount.scala | 4 +++- .../spark/streaming/examples/HdfsWordCount.scala | 3 ++- .../spark/streaming/examples/KafkaWordCount.scala | 5 +++-- .../spark/streaming/examples/MQTTWordCount.scala | 8 +++----- .../spark/streaming/examples/NetworkWordCount.scala | 2 ++ .../spark/streaming/examples/QueueStream.scala | 8 +++++--- .../spark/streaming/examples/RawNetworkGrep.scala | 5 +++-- .../examples/StatefulNetworkWordCount.scala | 2 ++ .../streaming/examples/StreamingExamples.scala | 21 +++++++++++++++++++++ .../streaming/examples/TwitterAlgebirdCMS.scala | 10 ++++++---- .../streaming/examples/TwitterAlgebirdHLL.scala | 9 ++++++--- .../streaming/examples/TwitterPopularTags.scala | 2 ++ .../spark/streaming/examples/ZeroMQWordCount.scala | 1 + .../examples/clickstream/PageViewStream.scala | 4 +++- .../scala/org/apache/spark/streaming/DStream.scala | 2 +- .../org/apache/spark/streaming/DStreamGraph.scala | 8 ++++---- 21 files changed, 80 insertions(+), 34 deletions(-) create mode 100644 examples/src/main/scala/org/apache/spark/streaming/examples/StreamingExamples.scala (limited to 'examples/src') diff --git a/examples/src/main/java/org/apache/spark/streaming/examples/JavaFlumeEventCount.java b/examples/src/main/java/org/apache/spark/streaming/examples/JavaFlumeEventCount.java index b11cfa667e..7b5a243e26 100644 --- a/examples/src/main/java/org/apache/spark/streaming/examples/JavaFlumeEventCount.java +++ b/examples/src/main/java/org/apache/spark/streaming/examples/JavaFlumeEventCount.java @@ -47,6 +47,8 @@ public final class JavaFlumeEventCount { System.exit(1); } + StreamingExamples.setStreamingLogLevels(); + String master = args[0]; String host = args[1]; int port = Integer.parseInt(args[2]); diff --git a/examples/src/main/java/org/apache/spark/streaming/examples/JavaKafkaWordCount.java b/examples/src/main/java/org/apache/spark/streaming/examples/JavaKafkaWordCount.java index 16b8a948e6..04f62ee204 100644 --- a/examples/src/main/java/org/apache/spark/streaming/examples/JavaKafkaWordCount.java +++ b/examples/src/main/java/org/apache/spark/streaming/examples/JavaKafkaWordCount.java @@ -59,6 +59,8 @@ public final class JavaKafkaWordCount { System.exit(1); } + StreamingExamples.setStreamingLogLevels(); + // Create the context with a 1 second batch size JavaStreamingContext jssc = new JavaStreamingContext(args[0], "KafkaWordCount", new Duration(2000), System.getenv("SPARK_HOME"), diff --git a/examples/src/main/java/org/apache/spark/streaming/examples/JavaNetworkWordCount.java 
b/examples/src/main/java/org/apache/spark/streaming/examples/JavaNetworkWordCount.java index 1e2efd359c..c37b0cacc9 100644 --- a/examples/src/main/java/org/apache/spark/streaming/examples/JavaNetworkWordCount.java +++ b/examples/src/main/java/org/apache/spark/streaming/examples/JavaNetworkWordCount.java @@ -53,6 +53,8 @@ public final class JavaNetworkWordCount { System.exit(1); } + StreamingExamples.setStreamingLogLevels(); + // Create the context with a 1 second batch size JavaStreamingContext ssc = new JavaStreamingContext(args[0], "NetworkWordCount", new Duration(1000), System.getenv("SPARK_HOME"), diff --git a/examples/src/main/java/org/apache/spark/streaming/examples/JavaQueueStream.java b/examples/src/main/java/org/apache/spark/streaming/examples/JavaQueueStream.java index e05551ab83..7ef9c6c8f4 100644 --- a/examples/src/main/java/org/apache/spark/streaming/examples/JavaQueueStream.java +++ b/examples/src/main/java/org/apache/spark/streaming/examples/JavaQueueStream.java @@ -41,6 +41,8 @@ public final class JavaQueueStream { System.exit(1); } + StreamingExamples.setStreamingLogLevels(); + // Create the context JavaStreamingContext ssc = new JavaStreamingContext(args[0], "QueueStream", new Duration(1000), System.getenv("SPARK_HOME"), JavaStreamingContext.jarOfClass(JavaQueueStream.class)); diff --git a/examples/src/main/scala/org/apache/spark/streaming/examples/ActorWordCount.scala b/examples/src/main/scala/org/apache/spark/streaming/examples/ActorWordCount.scala index 4e0058cd70..57e1b1f806 100644 --- a/examples/src/main/scala/org/apache/spark/streaming/examples/ActorWordCount.scala +++ b/examples/src/main/scala/org/apache/spark/streaming/examples/ActorWordCount.scala @@ -18,17 +18,13 @@ package org.apache.spark.streaming.examples import scala.collection.mutable.LinkedList -import scala.util.Random import scala.reflect.ClassTag +import scala.util.Random -import akka.actor.Actor -import akka.actor.ActorRef -import akka.actor.Props -import akka.actor.actorRef2Scala +import akka.actor.{Actor, ActorRef, Props, actorRef2Scala} import org.apache.spark.SparkConf -import org.apache.spark.streaming.Seconds -import org.apache.spark.streaming.StreamingContext +import org.apache.spark.streaming.{Seconds, StreamingContext} import org.apache.spark.streaming.StreamingContext.toPairDStreamFunctions import org.apache.spark.streaming.receivers.Receiver import org.apache.spark.util.AkkaUtils @@ -147,6 +143,8 @@ object ActorWordCount { System.exit(1) } + StreamingExamples.setStreamingLogLevels() + val Seq(master, host, port) = args.toSeq // Create the context and set the batch size diff --git a/examples/src/main/scala/org/apache/spark/streaming/examples/FlumeEventCount.scala b/examples/src/main/scala/org/apache/spark/streaming/examples/FlumeEventCount.scala index ae3709b3d9..a59be7899d 100644 --- a/examples/src/main/scala/org/apache/spark/streaming/examples/FlumeEventCount.scala +++ b/examples/src/main/scala/org/apache/spark/streaming/examples/FlumeEventCount.scala @@ -17,10 +17,10 @@ package org.apache.spark.streaming.examples -import org.apache.spark.util.IntParam import org.apache.spark.storage.StorageLevel import org.apache.spark.streaming._ import org.apache.spark.streaming.flume._ +import org.apache.spark.util.IntParam /** * Produces a count of events received from Flume. 
@@ -44,6 +44,8 @@ object FlumeEventCount { System.exit(1) } + StreamingExamples.setStreamingLogLevels() + val Array(master, host, IntParam(port)) = args val batchInterval = Milliseconds(2000) diff --git a/examples/src/main/scala/org/apache/spark/streaming/examples/HdfsWordCount.scala b/examples/src/main/scala/org/apache/spark/streaming/examples/HdfsWordCount.scala index ea6ea67419..704b315ef8 100644 --- a/examples/src/main/scala/org/apache/spark/streaming/examples/HdfsWordCount.scala +++ b/examples/src/main/scala/org/apache/spark/streaming/examples/HdfsWordCount.scala @@ -20,7 +20,6 @@ package org.apache.spark.streaming.examples import org.apache.spark.streaming.{Seconds, StreamingContext} import org.apache.spark.streaming.StreamingContext._ - /** * Counts words in new text files created in the given directory * Usage: HdfsWordCount @@ -38,6 +37,8 @@ object HdfsWordCount { System.exit(1) } + StreamingExamples.setStreamingLogLevels() + // Create the context val ssc = new StreamingContext(args(0), "HdfsWordCount", Seconds(2), System.getenv("SPARK_HOME"), StreamingContext.jarOfClass(this.getClass)) diff --git a/examples/src/main/scala/org/apache/spark/streaming/examples/KafkaWordCount.scala b/examples/src/main/scala/org/apache/spark/streaming/examples/KafkaWordCount.scala index 31a94bd224..4a3d81c09a 100644 --- a/examples/src/main/scala/org/apache/spark/streaming/examples/KafkaWordCount.scala +++ b/examples/src/main/scala/org/apache/spark/streaming/examples/KafkaWordCount.scala @@ -23,8 +23,8 @@ import kafka.producer._ import org.apache.spark.streaming._ import org.apache.spark.streaming.StreamingContext._ -import org.apache.spark.streaming.util.RawTextHelper._ import org.apache.spark.streaming.kafka._ +import org.apache.spark.streaming.util.RawTextHelper._ /** * Consumes messages from one or more topics in Kafka and does wordcount. 
@@ -40,12 +40,13 @@ import org.apache.spark.streaming.kafka._ */ object KafkaWordCount { def main(args: Array[String]) { - if (args.length < 5) { System.err.println("Usage: KafkaWordCount ") System.exit(1) } + StreamingExamples.setStreamingLogLevels() + val Array(master, zkQuorum, group, topics, numThreads) = args val ssc = new StreamingContext(master, "KafkaWordCount", Seconds(2), diff --git a/examples/src/main/scala/org/apache/spark/streaming/examples/MQTTWordCount.scala b/examples/src/main/scala/org/apache/spark/streaming/examples/MQTTWordCount.scala index 325290b66f..78b49fdcf1 100644 --- a/examples/src/main/scala/org/apache/spark/streaming/examples/MQTTWordCount.scala +++ b/examples/src/main/scala/org/apache/spark/streaming/examples/MQTTWordCount.scala @@ -17,12 +17,8 @@ package org.apache.spark.streaming.examples -import org.eclipse.paho.client.mqttv3.MqttClient -import org.eclipse.paho.client.mqttv3.MqttClientPersistence +import org.eclipse.paho.client.mqttv3.{MqttClient, MqttClientPersistence, MqttException, MqttMessage, MqttTopic} import org.eclipse.paho.client.mqttv3.persist.MqttDefaultFilePersistence -import org.eclipse.paho.client.mqttv3.MqttException -import org.eclipse.paho.client.mqttv3.MqttMessage -import org.eclipse.paho.client.mqttv3.MqttTopic import org.apache.spark.storage.StorageLevel import org.apache.spark.streaming.{Seconds, StreamingContext} @@ -43,6 +39,8 @@ object MQTTPublisher { System.exit(1) } + StreamingExamples.setStreamingLogLevels() + val Seq(brokerUrl, topic) = args.toSeq try { diff --git a/examples/src/main/scala/org/apache/spark/streaming/examples/NetworkWordCount.scala b/examples/src/main/scala/org/apache/spark/streaming/examples/NetworkWordCount.scala index 6a32c75373..c12139b3ec 100644 --- a/examples/src/main/scala/org/apache/spark/streaming/examples/NetworkWordCount.scala +++ b/examples/src/main/scala/org/apache/spark/streaming/examples/NetworkWordCount.scala @@ -39,6 +39,8 @@ object NetworkWordCount { System.exit(1) } + StreamingExamples.setStreamingLogLevels() + // Create the context with a 1 second batch size val ssc = new StreamingContext(args(0), "NetworkWordCount", Seconds(1), System.getenv("SPARK_HOME"), StreamingContext.jarOfClass(this.getClass)) diff --git a/examples/src/main/scala/org/apache/spark/streaming/examples/QueueStream.scala b/examples/src/main/scala/org/apache/spark/streaming/examples/QueueStream.scala index 9d640e716b..4d4968ba6a 100644 --- a/examples/src/main/scala/org/apache/spark/streaming/examples/QueueStream.scala +++ b/examples/src/main/scala/org/apache/spark/streaming/examples/QueueStream.scala @@ -17,12 +17,12 @@ package org.apache.spark.streaming.examples +import scala.collection.mutable.SynchronizedQueue + import org.apache.spark.rdd.RDD import org.apache.spark.streaming.{Seconds, StreamingContext} import org.apache.spark.streaming.StreamingContext._ -import scala.collection.mutable.SynchronizedQueue - object QueueStream { def main(args: Array[String]) { @@ -30,7 +30,9 @@ object QueueStream { System.err.println("Usage: QueueStream ") System.exit(1) } - + + StreamingExamples.setStreamingLogLevels() + // Create the context val ssc = new StreamingContext(args(0), "QueueStream", Seconds(1), System.getenv("SPARK_HOME"), StreamingContext.jarOfClass(this.getClass)) diff --git a/examples/src/main/scala/org/apache/spark/streaming/examples/RawNetworkGrep.scala b/examples/src/main/scala/org/apache/spark/streaming/examples/RawNetworkGrep.scala index c0706d0724..3d08d86567 100644 --- 
a/examples/src/main/scala/org/apache/spark/streaming/examples/RawNetworkGrep.scala +++ b/examples/src/main/scala/org/apache/spark/streaming/examples/RawNetworkGrep.scala @@ -17,11 +17,10 @@ package org.apache.spark.streaming.examples -import org.apache.spark.util.IntParam import org.apache.spark.storage.StorageLevel - import org.apache.spark.streaming._ import org.apache.spark.streaming.util.RawTextHelper +import org.apache.spark.util.IntParam /** * Receives text from multiple rawNetworkStreams and counts how many '\n' delimited @@ -45,6 +44,8 @@ object RawNetworkGrep { System.exit(1) } + StreamingExamples.setStreamingLogLevels() + val Array(master, IntParam(numStreams), host, IntParam(port), IntParam(batchMillis)) = args // Create the context diff --git a/examples/src/main/scala/org/apache/spark/streaming/examples/StatefulNetworkWordCount.scala b/examples/src/main/scala/org/apache/spark/streaming/examples/StatefulNetworkWordCount.scala index 002db57d59..1183eba846 100644 --- a/examples/src/main/scala/org/apache/spark/streaming/examples/StatefulNetworkWordCount.scala +++ b/examples/src/main/scala/org/apache/spark/streaming/examples/StatefulNetworkWordCount.scala @@ -39,6 +39,8 @@ object StatefulNetworkWordCount { System.exit(1) } + StreamingExamples.setStreamingLogLevels() + val updateFunc = (values: Seq[Int], state: Option[Int]) => { val currentCount = values.foldLeft(0)(_ + _) diff --git a/examples/src/main/scala/org/apache/spark/streaming/examples/StreamingExamples.scala b/examples/src/main/scala/org/apache/spark/streaming/examples/StreamingExamples.scala new file mode 100644 index 0000000000..d41d84a980 --- /dev/null +++ b/examples/src/main/scala/org/apache/spark/streaming/examples/StreamingExamples.scala @@ -0,0 +1,21 @@ +package org.apache.spark.streaming.examples + +import org.apache.spark.Logging + +import org.apache.log4j.{Level, Logger} + +/** Utility functions for Spark Streaming examples. */ +object StreamingExamples extends Logging { + + /** Set reasonable logging levels for streaming if the user has not configured log4j. */ + def setStreamingLogLevels() { + val log4jInitialized = Logger.getRootLogger.getAllAppenders.hasMoreElements + if (!log4jInitialized) { + // We first log something to initialize Spark's default logging, then we override the + // logging level. + logInfo("Setting log level to [WARN] for streaming example." 
+ + " To override add a custom log4j.properties to the classpath.") + Logger.getRootLogger.setLevel(Level.WARN) + } + } +} diff --git a/examples/src/main/scala/org/apache/spark/streaming/examples/TwitterAlgebirdCMS.scala b/examples/src/main/scala/org/apache/spark/streaming/examples/TwitterAlgebirdCMS.scala index 3ccdc908e2..80b5a98b14 100644 --- a/examples/src/main/scala/org/apache/spark/streaming/examples/TwitterAlgebirdCMS.scala +++ b/examples/src/main/scala/org/apache/spark/streaming/examples/TwitterAlgebirdCMS.scala @@ -17,12 +17,12 @@ package org.apache.spark.streaming.examples -import org.apache.spark.streaming.{Seconds, StreamingContext} -import org.apache.spark.storage.StorageLevel import com.twitter.algebird._ -import org.apache.spark.streaming.StreamingContext._ -import org.apache.spark.SparkContext._ +import org.apache.spark.SparkContext._ +import org.apache.spark.storage.StorageLevel +import org.apache.spark.streaming.{Seconds, StreamingContext} +import org.apache.spark.streaming.StreamingContext._ import org.apache.spark.streaming.twitter._ /** @@ -51,6 +51,8 @@ object TwitterAlgebirdCMS { System.exit(1) } + StreamingExamples.setStreamingLogLevels() + // CMS parameters val DELTA = 1E-3 val EPS = 0.01 diff --git a/examples/src/main/scala/org/apache/spark/streaming/examples/TwitterAlgebirdHLL.scala b/examples/src/main/scala/org/apache/spark/streaming/examples/TwitterAlgebirdHLL.scala index c7e83e76b0..cb2f2c51a0 100644 --- a/examples/src/main/scala/org/apache/spark/streaming/examples/TwitterAlgebirdHLL.scala +++ b/examples/src/main/scala/org/apache/spark/streaming/examples/TwitterAlgebirdHLL.scala @@ -17,10 +17,11 @@ package org.apache.spark.streaming.examples -import org.apache.spark.streaming.{Seconds, StreamingContext} -import org.apache.spark.storage.StorageLevel -import com.twitter.algebird.HyperLogLog._ import com.twitter.algebird.HyperLogLogMonoid +import com.twitter.algebird.HyperLogLog._ + +import org.apache.spark.storage.StorageLevel +import org.apache.spark.streaming.{Seconds, StreamingContext} import org.apache.spark.streaming.twitter._ /** @@ -44,6 +45,8 @@ object TwitterAlgebirdHLL { System.exit(1) } + StreamingExamples.setStreamingLogLevels() + /** Bit size parameter for HyperLogLog, trades off accuracy vs size */ val BIT_SIZE = 12 val (master, filters) = (args.head, args.tail) diff --git a/examples/src/main/scala/org/apache/spark/streaming/examples/TwitterPopularTags.scala b/examples/src/main/scala/org/apache/spark/streaming/examples/TwitterPopularTags.scala index e2b0418d55..16c10feaba 100644 --- a/examples/src/main/scala/org/apache/spark/streaming/examples/TwitterPopularTags.scala +++ b/examples/src/main/scala/org/apache/spark/streaming/examples/TwitterPopularTags.scala @@ -36,6 +36,8 @@ object TwitterPopularTags { System.exit(1) } + StreamingExamples.setStreamingLogLevels() + val (master, filters) = (args.head, args.tail) val ssc = new StreamingContext(master, "TwitterPopularTags", Seconds(2), diff --git a/examples/src/main/scala/org/apache/spark/streaming/examples/ZeroMQWordCount.scala b/examples/src/main/scala/org/apache/spark/streaming/examples/ZeroMQWordCount.scala index 03902ec353..12d2a1084f 100644 --- a/examples/src/main/scala/org/apache/spark/streaming/examples/ZeroMQWordCount.scala +++ b/examples/src/main/scala/org/apache/spark/streaming/examples/ZeroMQWordCount.scala @@ -76,6 +76,7 @@ object ZeroMQWordCount { "In local mode, should be 'local[n]' with n > 1") System.exit(1) } + StreamingExamples.setStreamingLogLevels() val Seq(master, url, topic) = 
args.toSeq // Create the context and set the batch size diff --git a/examples/src/main/scala/org/apache/spark/streaming/examples/clickstream/PageViewStream.scala b/examples/src/main/scala/org/apache/spark/streaming/examples/clickstream/PageViewStream.scala index 807af199f4..da6b67bcce 100644 --- a/examples/src/main/scala/org/apache/spark/streaming/examples/clickstream/PageViewStream.scala +++ b/examples/src/main/scala/org/apache/spark/streaming/examples/clickstream/PageViewStream.scala @@ -17,9 +17,10 @@ package org.apache.spark.streaming.examples.clickstream +import org.apache.spark.SparkContext._ import org.apache.spark.streaming.{Seconds, StreamingContext} import org.apache.spark.streaming.StreamingContext._ -import org.apache.spark.SparkContext._ +import org.apache.spark.streaming.examples.StreamingExamples /** Analyses a streaming dataset of web page views. This class demonstrates several types of * operators available in Spark streaming. @@ -36,6 +37,7 @@ object PageViewStream { " errorRatePerZipCode, activeUserCount, popularUsersSeen") System.exit(1) } + StreamingExamples.setStreamingLogLevels() val metric = args(0) val host = args(1) val port = args(2).toInt diff --git a/streaming/src/main/scala/org/apache/spark/streaming/DStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/DStream.scala index 00671ba520..837f1ea1d8 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/DStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/DStream.scala @@ -333,7 +333,7 @@ abstract class DStream[T: ClassTag] ( var numForgotten = 0 val oldRDDs = generatedRDDs.filter(_._1 <= (time - rememberDuration)) generatedRDDs --= oldRDDs.keys - logInfo("Cleared " + oldRDDs.size + " RDDs that were older than " + + logDebug("Cleared " + oldRDDs.size + " RDDs that were older than " + (time - rememberDuration) + ": " + oldRDDs.keys.mkString(", ")) dependencies.foreach(_.clearOldMetadata(time)) } diff --git a/streaming/src/main/scala/org/apache/spark/streaming/DStreamGraph.scala b/streaming/src/main/scala/org/apache/spark/streaming/DStreamGraph.scala index a09b891956..62d07b22c6 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/DStreamGraph.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/DStreamGraph.scala @@ -105,18 +105,18 @@ final private[streaming] class DStreamGraph extends Serializable with Logging { def generateJobs(time: Time): Seq[Job] = { this.synchronized { - logInfo("Generating jobs for time " + time) + logDebug("Generating jobs for time " + time) val jobs = outputStreams.flatMap(outputStream => outputStream.generateJob(time)) - logInfo("Generated " + jobs.length + " jobs for time " + time) + logDebug("Generated " + jobs.length + " jobs for time " + time) jobs } } def clearOldMetadata(time: Time) { this.synchronized { - logInfo("Clearing old metadata for time " + time) + logDebug("Clearing old metadata for time " + time) outputStreams.foreach(_.clearOldMetadata(time)) - logInfo("Cleared old metadata for time " + time) + logDebug("Cleared old metadata for time " + time) } } -- cgit v1.2.3 From 6f713e2a3e56185368b66fb087637dec112a1f5d Mon Sep 17 00:00:00 2001 From: Tathagata Das Date: Thu, 9 Jan 2014 13:42:04 -0800 Subject: Changed the way StreamingContext finds and reads checkpoint files, and added JavaStreamingContext.getOrCreate. 
--- conf/slaves | 6 +- .../main/scala/org/apache/spark/SparkContext.scala | 2 +- .../streaming/examples/JavaNetworkWordCount.java | 7 +- .../streaming/examples/NetworkWordCount.scala | 2 +- .../examples/RecoverableNetworkWordCount.scala | 43 ++++++++-- .../org/apache/spark/streaming/Checkpoint.scala | 98 ++++++++++++---------- .../spark/streaming/DStreamCheckpointData.scala | 57 ++++--------- .../org/apache/spark/streaming/DStreamGraph.scala | 4 +- .../apache/spark/streaming/StreamingContext.scala | 64 ++++++++++---- .../streaming/api/java/JavaStreamingContext.scala | 96 +++++++++++++++++++-- 10 files changed, 254 insertions(+), 125 deletions(-) (limited to 'examples/src') diff --git a/conf/slaves b/conf/slaves index 30ea300e07..2fbb50c4a8 100644 --- a/conf/slaves +++ b/conf/slaves @@ -1,5 +1 @@ -ec2-54-221-59-252.compute-1.amazonaws.com -ec2-67-202-26-243.compute-1.amazonaws.com -ec2-23-22-220-97.compute-1.amazonaws.com -ec2-50-16-98-100.compute-1.amazonaws.com -ec2-54-234-164-206.compute-1.amazonaws.com +localhost diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index 7514ce58fb..304e85f1c0 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -29,7 +29,7 @@ import scala.collection.mutable.HashMap import scala.reflect.{ClassTag, classTag} import org.apache.hadoop.conf.Configuration -import org.apache.hadoop.fs.Path +import org.apache.hadoop.fs.{LocalFileSystem, Path} import org.apache.hadoop.io.ArrayWritable import org.apache.hadoop.io.BooleanWritable import org.apache.hadoop.io.BytesWritable diff --git a/examples/src/main/java/org/apache/spark/streaming/examples/JavaNetworkWordCount.java b/examples/src/main/java/org/apache/spark/streaming/examples/JavaNetworkWordCount.java index def87c199b..d8d6046914 100644 --- a/examples/src/main/java/org/apache/spark/streaming/examples/JavaNetworkWordCount.java +++ b/examples/src/main/java/org/apache/spark/streaming/examples/JavaNetworkWordCount.java @@ -41,17 +41,17 @@ import org.apache.spark.streaming.api.java.JavaStreamingContext; public class JavaNetworkWordCount { public static void main(String[] args) { if (args.length < 3) { - System.err.println("Usage: NetworkWordCount \n" + + System.err.println("Usage: JavaNetworkWordCount \n" + "In local mode, should be 'local[n]' with n > 1"); System.exit(1); } // Create the context with a 1 second batch size - JavaStreamingContext ssc = new JavaStreamingContext(args[0], "NetworkWordCount", + JavaStreamingContext ssc = new JavaStreamingContext(args[0], "JavaNetworkWordCount", new Duration(1000), System.getenv("SPARK_HOME"), System.getenv("SPARK_EXAMPLES_JAR")); // Create a NetworkInputDStream on target ip:port and count the - // words in input stream of \n delimited test (eg. generated by 'nc') + // words in input stream of \n delimited text (eg. 
generated by 'nc') JavaDStream lines = ssc.socketTextStream(args[1], Integer.parseInt(args[2])); JavaDStream words = lines.flatMap(new FlatMapFunction() { @Override @@ -74,6 +74,5 @@ public class JavaNetworkWordCount { wordCounts.print(); ssc.start(); - } } diff --git a/examples/src/main/scala/org/apache/spark/streaming/examples/NetworkWordCount.scala b/examples/src/main/scala/org/apache/spark/streaming/examples/NetworkWordCount.scala index e2487dca5f..5ad4875980 100644 --- a/examples/src/main/scala/org/apache/spark/streaming/examples/NetworkWordCount.scala +++ b/examples/src/main/scala/org/apache/spark/streaming/examples/NetworkWordCount.scala @@ -44,7 +44,7 @@ object NetworkWordCount { System.getenv("SPARK_HOME"), Seq(System.getenv("SPARK_EXAMPLES_JAR"))) // Create a NetworkInputDStream on target ip:port and count the - // words in input stream of \n delimited test (eg. generated by 'nc') + // words in input stream of \n delimited text (eg. generated by 'nc') val lines = ssc.socketTextStream(args(1), args(2).toInt) val words = lines.flatMap(_.split(" ")) val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _) diff --git a/examples/src/main/scala/org/apache/spark/streaming/examples/RecoverableNetworkWordCount.scala b/examples/src/main/scala/org/apache/spark/streaming/examples/RecoverableNetworkWordCount.scala index 0e5f39f772..739f805e87 100644 --- a/examples/src/main/scala/org/apache/spark/streaming/examples/RecoverableNetworkWordCount.scala +++ b/examples/src/main/scala/org/apache/spark/streaming/examples/RecoverableNetworkWordCount.scala @@ -1,3 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.apache.spark.streaming.examples import org.apache.spark.streaming.{Time, Seconds, StreamingContext} @@ -8,20 +25,37 @@ import org.apache.spark.rdd.RDD import com.google.common.io.Files import java.nio.charset.Charset +/** + * Counts words in UTF8 encoded, '\n' delimited text received from the network every second. + * Usage: NetworkWordCount + * is the Spark master URL. In local mode, should be 'local[n]' with n > 1. + * and describe the TCP server that Spark Streaming would connect to receive data. 
+ * directory in a Hadoop compatible file system to which checkpoint + * data will be saved to; this must be a fault-tolerant file system + * like HDFS for the system to recover from driver failures + * (x, 1)).reduceByKey(_ + _) @@ -39,10 +73,10 @@ object RecoverableNetworkWordCount { System.err.println("You arguments were " + args.mkString("[", ", ", "]")) System.err.println( """ - |Usage: RecoverableNetworkWordCount + |Usage: RecoverableNetworkWordCount | |In local mode, should be 'local[n]' with n > 1 - |Both and should be full paths + |Both and should be full paths """.stripMargin ) System.exit(1) @@ -53,6 +87,5 @@ object RecoverableNetworkWordCount { createContext(master, ip, port, outputPath) }) ssc.start() - } } diff --git a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala index 155d5bc02e..a32e4852c5 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala @@ -82,22 +82,28 @@ class CheckpointWriter(jobGenerator: JobGenerator, checkpointDir: String, hadoop attempts += 1 try { logInfo("Saving checkpoint for time " + checkpointTime + " to file '" + file + "'") - // This is inherently thread unsafe, so alleviating it by writing to '.new' and + // This is inherently thread unsafe, so alleviating it by writing to '.next' and // then moving it to the final file val fos = fs.create(writeFile) fos.write(bytes) fos.close() + + // Back up existing checkpoint if it exists if (fs.exists(file) && fs.rename(file, bakFile)) { logDebug("Moved existing checkpoint file to " + bakFile) } - // paranoia - fs.delete(file, false) - fs.rename(writeFile, file) - - val finishTime = System.currentTimeMillis() - logInfo("Checkpoint for time " + checkpointTime + " saved to file '" + file + - "', took " + bytes.length + " bytes and " + (finishTime - startTime) + " milliseconds") - jobGenerator.onCheckpointCompletion(checkpointTime) + fs.delete(file, false) // paranoia + + // Rename temp written file to the right location + if (fs.rename(writeFile, file)) { + val finishTime = System.currentTimeMillis() + logInfo("Checkpoint for time " + checkpointTime + " saved to file '" + file + + "', took " + bytes.length + " bytes and " + (finishTime - startTime) + " ms") + jobGenerator.onCheckpointCompletion(checkpointTime) + } else { + throw new SparkException("Failed to rename checkpoint file from " + + writeFile + " to " + file) + } return } catch { case ioe: IOException => @@ -154,47 +160,47 @@ class CheckpointWriter(jobGenerator: JobGenerator, checkpointDir: String, hadoop private[streaming] object CheckpointReader extends Logging { - def doesCheckpointExist(path: String): Boolean = { - val attempts = Seq(new Path(path, "graph"), new Path(path, "graph.bk")) - val fs = new Path(path).getFileSystem(new Configuration()) - (attempts.count(p => fs.exists(p)) > 1) - } + private val graphFileNames = Seq("graph", "graph.bk") + + def read(checkpointDir: String, hadoopConf: Configuration): Option[Checkpoint] = { + val checkpointPath = new Path(checkpointDir) + def fs = checkpointPath.getFileSystem(hadoopConf) + val existingFiles = graphFileNames.map(new Path(checkpointPath, _)).filter(fs.exists) + + // Log the file listing if graph checkpoint file was not found + if (existingFiles.isEmpty) { + logInfo("Could not find graph file in " + checkpointDir + ", which contains the files:\n" + + fs.listStatus(checkpointPath).mkString("\n")) + return 
None + } + logInfo("Checkpoint files found: " + existingFiles.mkString(",")) - def read(path: String): Checkpoint = { - val fs = new Path(path).getFileSystem(new Configuration()) - val attempts = Seq(new Path(path, "graph"), new Path(path, "graph.bk")) val compressionCodec = CompressionCodec.createCodec() - - attempts.foreach(file => { - if (fs.exists(file)) { - logInfo("Attempting to load checkpoint from file '" + file + "'") - try { - val fis = fs.open(file) - // ObjectInputStream uses the last defined user-defined class loader in the stack - // to find classes, which maybe the wrong class loader. Hence, a inherited version - // of ObjectInputStream is used to explicitly use the current thread's default class - // loader to find and load classes. This is a well know Java issue and has popped up - // in other places (e.g., http://jira.codehaus.org/browse/GROOVY-1627) - val zis = compressionCodec.compressedInputStream(fis) - val ois = new ObjectInputStreamWithLoader(zis, - Thread.currentThread().getContextClassLoader) - val cp = ois.readObject.asInstanceOf[Checkpoint] - ois.close() - fs.close() - cp.validate() - logInfo("Checkpoint successfully loaded from file '" + file + "'") - logInfo("Checkpoint was generated at time " + cp.checkpointTime) - return cp - } catch { - case e: Exception => - logError("Error loading checkpoint from file '" + file + "'", e) - } - } else { - logWarning("Could not read checkpoint from file '" + file + "' as it does not exist") + existingFiles.foreach(file => { + logInfo("Attempting to load checkpoint from file '" + file + "'") + try { + val fis = fs.open(file) + // ObjectInputStream uses the last defined user-defined class loader in the stack + // to find classes, which maybe the wrong class loader. Hence, a inherited version + // of ObjectInputStream is used to explicitly use the current thread's default class + // loader to find and load classes. 
This is a well know Java issue and has popped up + // in other places (e.g., http://jira.codehaus.org/browse/GROOVY-1627) + val zis = compressionCodec.compressedInputStream(fis) + val ois = new ObjectInputStreamWithLoader(zis, + Thread.currentThread().getContextClassLoader) + val cp = ois.readObject.asInstanceOf[Checkpoint] + ois.close() + fs.close() + cp.validate() + logInfo("Checkpoint successfully loaded from file '" + file + "'") + logInfo("Checkpoint was generated at time " + cp.checkpointTime) + return Some(cp) + } catch { + case e: Exception => + logWarning("Error reading checkpoint from file '" + file + "'", e) } - }) - throw new SparkException("Could not read checkpoint from path '" + path + "'") + throw new SparkException("Failed to read checkpoint from directory '" + checkpointDir + "'") } } diff --git a/streaming/src/main/scala/org/apache/spark/streaming/DStreamCheckpointData.scala b/streaming/src/main/scala/org/apache/spark/streaming/DStreamCheckpointData.scala index e0567a1c19..1081d3c807 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/DStreamCheckpointData.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/DStreamCheckpointData.scala @@ -27,18 +27,16 @@ import org.apache.spark.Logging import java.io.{ObjectInputStream, IOException} - private[streaming] class DStreamCheckpointData[T: ClassTag] (dstream: DStream[T]) extends Serializable with Logging { protected val data = new HashMap[Time, AnyRef]() - @transient private var allCheckpointFiles = new HashMap[Time, String] - @transient private var timeToLastCheckpointFileTime = new HashMap[Time, Time] + // Mapping of the batch time to the checkpointed RDD file of that time + @transient private var timeToCheckpointFile = new HashMap[Time, String] + // Mapping of the batch time to the time of the oldest checkpointed RDD in that batch's checkpoint data + @transient private var timeToOldestCheckpointFileTime = new HashMap[Time, Time] @transient private var fileSystem : FileSystem = null - - //@transient private var lastCheckpointFiles: HashMap[Time, String] = null - protected[streaming] def currentCheckpointFiles = data.asInstanceOf[HashMap[Time, String]] /** @@ -51,17 +49,14 @@ class DStreamCheckpointData[T: ClassTag] (dstream: DStream[T]) // Get the checkpointed RDDs from the generated RDDs val checkpointFiles = dstream.generatedRDDs.filter(_._2.getCheckpointFile.isDefined) .map(x => (x._1, x._2.getCheckpointFile.get)) + logDebug("Current checkpoint files:\n" + checkpointFiles.toSeq.mkString("\n")) - logInfo("Current checkpoint files:\n" + checkpointFiles.toSeq.mkString("\n")) - // Make a copy of the existing checkpoint data (checkpointed RDDs) - // lastCheckpointFiles = checkpointFiles.clone() - - // If the new checkpoint data has checkpoints then replace existing with the new one + // Add the checkpoint files to the data to be serialized if (!currentCheckpointFiles.isEmpty) { currentCheckpointFiles.clear() currentCheckpointFiles ++= checkpointFiles - allCheckpointFiles ++= currentCheckpointFiles - timeToLastCheckpointFileTime(time) = currentCheckpointFiles.keys.min(Time.ordering) + timeToCheckpointFile ++= currentCheckpointFiles + timeToOldestCheckpointFileTime(time) = currentCheckpointFiles.keys.min(Time.ordering) } } @@ -71,32 +66,10 @@ class DStreamCheckpointData[T: ClassTag] (dstream: DStream[T]) * implementation, cleans up old checkpoint files. 
*/ def cleanup(time: Time) { - /* - // If there is at least on checkpoint file in the current checkpoint files, - // then delete the old checkpoint files. - if (checkpointFiles.size > 0 && lastCheckpointFiles != null) { - (lastCheckpointFiles -- checkpointFiles.keySet).foreach { - case (time, file) => { - try { - val path = new Path(file) - if (fileSystem == null) { - fileSystem = path.getFileSystem(new Configuration()) - } - fileSystem.delete(path, true) - logInfo("Deleted checkpoint file '" + file + "' for time " + time) - } catch { - case e: Exception => - logWarning("Error deleting old checkpoint file '" + file + "' for time " + time, e) - } - } - } - } - */ - timeToLastCheckpointFileTime.remove(time) match { + timeToOldestCheckpointFileTime.remove(time) match { case Some(lastCheckpointFileTime) => - logInfo("Deleting all files before " + time) - val filesToDelete = allCheckpointFiles.filter(_._1 < lastCheckpointFileTime) - logInfo("Files to delete:\n" + filesToDelete.mkString(",")) + val filesToDelete = timeToCheckpointFile.filter(_._1 < lastCheckpointFileTime) + logDebug("Files to delete:\n" + filesToDelete.mkString(",")) filesToDelete.foreach { case (time, file) => try { @@ -105,11 +78,12 @@ class DStreamCheckpointData[T: ClassTag] (dstream: DStream[T]) fileSystem = path.getFileSystem(dstream.ssc.sparkContext.hadoopConfiguration) } fileSystem.delete(path, true) - allCheckpointFiles -= time + timeToCheckpointFile -= time logInfo("Deleted checkpoint file '" + file + "' for time " + time) } catch { case e: Exception => logWarning("Error deleting old checkpoint file '" + file + "' for time " + time, e) + fileSystem = null } } case None => @@ -138,7 +112,8 @@ class DStreamCheckpointData[T: ClassTag] (dstream: DStream[T]) @throws(classOf[IOException]) private def readObject(ois: ObjectInputStream) { - timeToLastCheckpointFileTime = new HashMap[Time, Time] - allCheckpointFiles = new HashMap[Time, String] + ois.defaultReadObject() + timeToOldestCheckpointFileTime = new HashMap[Time, Time] + timeToCheckpointFile = new HashMap[Time, String] } } diff --git a/streaming/src/main/scala/org/apache/spark/streaming/DStreamGraph.scala b/streaming/src/main/scala/org/apache/spark/streaming/DStreamGraph.scala index bfedef2e4e..34919d315c 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/DStreamGraph.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/DStreamGraph.scala @@ -130,11 +130,11 @@ final private[streaming] class DStreamGraph extends Serializable with Logging { } def clearCheckpointData(time: Time) { - logInfo("Restoring checkpoint data") + logInfo("Clearing checkpoint data for time " + time) this.synchronized { outputStreams.foreach(_.clearCheckpointData(time)) } - logInfo("Restored checkpoint data") + logInfo("Cleared checkpoint data for time " + time) } def restoreCheckpointData() { diff --git a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala index 59d2d546e6..30deba417e 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala @@ -45,10 +45,11 @@ import org.apache.hadoop.io.LongWritable import org.apache.hadoop.io.Text import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat} import org.apache.hadoop.mapreduce.lib.input.TextInputFormat -import org.apache.hadoop.fs.Path +import org.apache.hadoop.fs.{LocalFileSystem, Path} import twitter4j.Status 
import twitter4j.auth.Authorization +import org.apache.hadoop.conf.Configuration /** * A StreamingContext is the main entry point for Spark Streaming functionality. Besides the basic @@ -89,10 +90,12 @@ class StreamingContext private ( /** * Re-create a StreamingContext from a checkpoint file. - * @param path Path either to the directory that was specified as the checkpoint directory, or - * to the checkpoint file 'graph' or 'graph.bk'. + * @param path Path to the directory that was specified as the checkpoint directory + * @param hadoopConf Optional, configuration object if necessary for reading from + * HDFS compatible filesystems */ - def this(path: String) = this(null, CheckpointReader.read(path), null) + def this(path: String, hadoopConf: Configuration = new Configuration) = + this(null, CheckpointReader.read(path, hadoopConf).get, null) initLogging() @@ -170,8 +173,9 @@ class StreamingContext private ( /** * Set the context to periodically checkpoint the DStream operations for master - * fault-tolerance. The graph will be checkpointed every batch interval. - * @param directory HDFS-compatible directory where the checkpoint data will be reliably stored + * fault-tolerance. + * @param directory HDFS-compatible directory where the checkpoint data will be reliably stored. + * Note that this must be a fault-tolerant file system like HDFS for */ def checkpoint(directory: String) { if (directory != null) { @@ -577,6 +581,10 @@ class StreamingContext private ( } } +/** + * StreamingContext object contains a number of utility functions related to the + * StreamingContext class. + */ object StreamingContext extends Logging { @@ -584,19 +592,45 @@ object StreamingContext extends Logging { new PairDStreamFunctions[K, V](stream) } + /** + * Either recreate a StreamingContext from checkpoint data or create a new StreamingContext. + * If checkpoint data exists in the provided `checkpointPath`, then StreamingContext will be + * recreated from the checkpoint data. If the data does not exist, then the StreamingContext + * will be created by called the provided `creatingFunc`. + * + * @param checkpointPath Checkpoint directory used in an earlier StreamingContext program + * @param creatingFunc Function to create a new StreamingContext + * @param hadoopConf Optional Hadoop configuration if necessary for reading from the + * file system + * @param createOnError Optional, whether to create a new StreamingContext if there is an + * error in reading checkpoint data. By default, an exception will be + * thrown on error. 
+ */ def getOrCreate( checkpointPath: String, creatingFunc: () => StreamingContext, - createOnCheckpointError: Boolean = false + hadoopConf: Configuration = new Configuration(), + createOnError: Boolean = false ): StreamingContext = { - if (CheckpointReader.doesCheckpointExist(checkpointPath)) { - logInfo("Creating streaming context from checkpoint file") - new StreamingContext(checkpointPath) - } else { - logInfo("Creating new streaming context") - val ssc = creatingFunc() - ssc.checkpoint(checkpointPath) - ssc + + try { + CheckpointReader.read(checkpointPath, hadoopConf) match { + case Some(checkpoint) => + return new StreamingContext(null, checkpoint, null) + case None => + logInfo("Creating new StreamingContext") + return creatingFunc() + } + } catch { + case e: Exception => + if (createOnError) { + logWarning("Error reading checkpoint", e) + logInfo("Creating new StreamingContext") + return creatingFunc() + } else { + logError("Error reading checkpoint", e) + throw e + } } } diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala index aad0d931e7..f38d145317 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala @@ -40,6 +40,7 @@ import org.apache.spark.api.java.{JavaPairRDD, JavaSparkContext, JavaRDD} import org.apache.spark.streaming._ import org.apache.spark.streaming.dstream._ import org.apache.spark.streaming.scheduler.StreamingListener +import org.apache.hadoop.conf.Configuration /** * A StreamingContext is the main entry point for Spark Streaming functionality. Besides the basic @@ -125,10 +126,16 @@ class JavaStreamingContext(val ssc: StreamingContext) { /** * Re-creates a StreamingContext from a checkpoint file. - * @param path Path either to the directory that was specified as the checkpoint directory, or - * to the checkpoint file 'graph' or 'graph.bk'. + * @param path Path to the directory that was specified as the checkpoint directory */ - def this(path: String) = this (new StreamingContext(path)) + def this(path: String) = this(new StreamingContext(path)) + + /** + * Re-creates a StreamingContext from a checkpoint file. + * @param path Path to the directory that was specified as the checkpoint directory + * + */ + def this(path: String, hadoopConf: Configuration) = this(new StreamingContext(path, hadoopConf)) /** The underlying SparkContext */ val sc: JavaSparkContext = new JavaSparkContext(ssc.sc) @@ -699,13 +706,92 @@ class JavaStreamingContext(val ssc: StreamingContext) { } /** - * Starts the execution of the streams. + * Start the execution of the streams. */ def start() = ssc.start() /** - * Sstops the execution of the streams. + * Stop the execution of the streams. */ def stop() = ssc.stop() +} + +/** + * JavaStreamingContext object contains a number of static utility functions. + */ +object JavaStreamingContext { + + /** + * Either recreate a StreamingContext from checkpoint data or create a new StreamingContext. + * If checkpoint data exists in the provided `checkpointPath`, then StreamingContext will be + * recreated from the checkpoint data. If the data does not exist, then the StreamingContext + * will be created by called the provided `creatingFunc`. 
+ * + * @param checkpointPath Checkpoint directory used in an earlier JavaStreamingContext program + * @param factory JavaStreamingContextFactory object to create a new JavaStreamingContext + */ + def getOrCreate( + checkpointPath: String, + factory: JavaStreamingContextFactory + ): JavaStreamingContext = { + val ssc = StreamingContext.getOrCreate(checkpointPath, () => { + factory.create.ssc + }) + new JavaStreamingContext(ssc) + } + + /** + * Either recreate a StreamingContext from checkpoint data or create a new StreamingContext. + * If checkpoint data exists in the provided `checkpointPath`, then StreamingContext will be + * recreated from the checkpoint data. If the data does not exist, then the StreamingContext + * will be created by called the provided `creatingFunc`. + * + * @param checkpointPath Checkpoint directory used in an earlier StreamingContext program + * @param factory JavaStreamingContextFactory object to create a new JavaStreamingContext + * @param hadoopConf Hadoop configuration if necessary for reading from any HDFS compatible + * file system + */ + def getOrCreate( + checkpointPath: String, + hadoopConf: Configuration, + factory: JavaStreamingContextFactory + ): JavaStreamingContext = { + val ssc = StreamingContext.getOrCreate(checkpointPath, () => { + factory.create.ssc + }, hadoopConf) + new JavaStreamingContext(ssc) + } + + /** + * Either recreate a StreamingContext from checkpoint data or create a new StreamingContext. + * If checkpoint data exists in the provided `checkpointPath`, then StreamingContext will be + * recreated from the checkpoint data. If the data does not exist, then the StreamingContext + * will be created by called the provided `creatingFunc`. + * + * @param checkpointPath Checkpoint directory used in an earlier StreamingContext program + * @param factory JavaStreamingContextFactory object to create a new JavaStreamingContext + * @param hadoopConf Hadoop configuration if necessary for reading from any HDFS compatible + * file system + * @param createOnError Whether to create a new JavaStreamingContext if there is an + * error in reading checkpoint data. 
+ */ + def getOrCreate( + checkpointPath: String, + hadoopConf: Configuration, + factory: JavaStreamingContextFactory, + createOnError: Boolean + ): JavaStreamingContext = { + val ssc = StreamingContext.getOrCreate(checkpointPath, () => { + factory.create.ssc + }, hadoopConf, createOnError) + new JavaStreamingContext(ssc) + } +} + +/** + * Factory interface for creating a new JavaStreamingContext + */ +trait JavaStreamingContextFactory { + def create(): JavaStreamingContext } -- cgit v1.2.3 From 7b748b83a124a2b9c692da4a0c9285f4efa431b2 Mon Sep 17 00:00:00 2001 From: Patrick Wendell Date: Thu, 9 Jan 2014 20:42:48 -0800 Subject: Minor clean-up --- .../java/org/apache/spark/streaming/examples/JavaNetworkWordCount.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'examples/src') diff --git a/examples/src/main/java/org/apache/spark/streaming/examples/JavaNetworkWordCount.java b/examples/src/main/java/org/apache/spark/streaming/examples/JavaNetworkWordCount.java index c37b0cacc9..2e616b1ab2 100644 --- a/examples/src/main/java/org/apache/spark/streaming/examples/JavaNetworkWordCount.java +++ b/examples/src/main/java/org/apache/spark/streaming/examples/JavaNetworkWordCount.java @@ -38,7 +38,7 @@ import java.util.regex.Pattern; * To run this on your local machine, you need to first run a Netcat server * `$ nc -lk 9999` * and then run the example - * `$ ./run spark.streaming.examples.JavaNetworkWordCount local[2] localhost 9999` + * `$ ./run org.apache.spark.streaming.examples.JavaNetworkWordCount local[2] localhost 9999` */ public final class JavaNetworkWordCount { private static final Pattern SPACE = Pattern.compile(" "); -- cgit v1.2.3 From e4bb845238d0df48f8258e925caf9af5a107af46 Mon Sep 17 00:00:00 2001 From: Tathagata Das Date: Fri, 10 Jan 2014 12:17:09 -0800 Subject: Updated docs based on Patrick's comments in PR 383. --- .../org/apache/spark/util/TimeStampedHashMap.scala | 4 +- .../streaming/examples/NetworkWordCount.scala | 3 +- .../examples/RecoverableNetworkWordCount.scala | 49 +++++++++++++++++----- .../org/apache/spark/streaming/Checkpoint.scala | 13 ++++-- .../streaming/api/java/JavaStreamingContext.scala | 14 +++---- 5 files changed, 58 insertions(+), 25 deletions(-) (limited to 'examples/src') diff --git a/core/src/main/scala/org/apache/spark/util/TimeStampedHashMap.scala b/core/src/main/scala/org/apache/spark/util/TimeStampedHashMap.scala index dde504fc52..8e07a0f29a 100644 --- a/core/src/main/scala/org/apache/spark/util/TimeStampedHashMap.scala +++ b/core/src/main/scala/org/apache/spark/util/TimeStampedHashMap.scala @@ -27,8 +27,8 @@ import org.apache.spark.Logging /** * This is a custom implementation of scala.collection.mutable.Map which stores the insertion * timestamp along with each key-value pair. If specified, the timestamp of each pair can be - * updated every it is accessed. Key-value pairs whose timestamp are older than a particular - * threshold time can them be removed using the clearOldValues method. This is intended to + * updated every time it is accessed. Key-value pairs whose timestamp are older than a particular + * threshold time can then be removed using the clearOldValues method. This is intended to * be a drop-in replacement of scala.collection.mutable.HashMap. 
* @param updateTimeStampOnGet When enabled, the timestamp of a pair will be * updated when it is accessed diff --git a/examples/src/main/scala/org/apache/spark/streaming/examples/NetworkWordCount.scala b/examples/src/main/scala/org/apache/spark/streaming/examples/NetworkWordCount.scala index aba1704825..4b896eaccb 100644 --- a/examples/src/main/scala/org/apache/spark/streaming/examples/NetworkWordCount.scala +++ b/examples/src/main/scala/org/apache/spark/streaming/examples/NetworkWordCount.scala @@ -21,7 +21,8 @@ import org.apache.spark.streaming.{Seconds, StreamingContext} import org.apache.spark.streaming.StreamingContext._ /** - * Counts words in UTF8 encoded, '\n' delimited text received from the network every second. + * Counts words in text encoded with UTF8 received from the network every second. + * * Usage: NetworkWordCount * is the Spark master URL. In local mode, should be 'local[n]' with n > 1. * and describe the TCP server that Spark Streaming would connect to receive data. diff --git a/examples/src/main/scala/org/apache/spark/streaming/examples/RecoverableNetworkWordCount.scala b/examples/src/main/scala/org/apache/spark/streaming/examples/RecoverableNetworkWordCount.scala index 739f805e87..d51e6e9418 100644 --- a/examples/src/main/scala/org/apache/spark/streaming/examples/RecoverableNetworkWordCount.scala +++ b/examples/src/main/scala/org/apache/spark/streaming/examples/RecoverableNetworkWordCount.scala @@ -26,18 +26,41 @@ import com.google.common.io.Files import java.nio.charset.Charset /** - * Counts words in UTF8 encoded, '\n' delimited text received from the network every second. - * Usage: NetworkWordCount + * Counts words in text encoded with UTF8 received from the network every second. + * + * Usage: NetworkWordCount * is the Spark master URL. In local mode, should be 'local[n]' with n > 1. * and describe the TCP server that Spark Streaming would connect to receive data. - * directory in a Hadoop compatible file system to which checkpoint - * data will be saved to; this must be a fault-tolerant file system - * like HDFS for the system to recover from driver failures - * directory to HDFS-compatible file system which checkpoint data + * file to which the word counts will be appended + * + * In local mode, should be 'local[n]' with n > 1 + * and must be absolute paths + * + * * To run this on your local machine, you need to first run a Netcat server - * `$ nc -lk 9999` - * and then run the example - * `$ ./run-example org.apache.spark.streaming.examples.NetworkWordCount local[2] localhost 9999` + * + * `$ nc -lk 9999` + * + * and run the example as + * + * `$ ./run-example org.apache.spark.streaming.examples.RecoverableNetworkWordCount \ + * local[2] localhost 9999 ~/checkpoint/ ~/out` + * + * If the directory ~/checkpoint/ does not exist (e.g. running for the first time), it will create + * a new StreamingContext (will print "Creating new context" to the console). Otherwise, if + * checkpoint data exists in ~/checkpoint/, then it will create StreamingContext from + * the checkpoint data. + * + * To run this example in a local standalone cluster with automatic driver recovery, + * + * `$ ./spark-class org.apache.spark.deploy.Client -s launch \ + * org.apache.spark.streaming.examples.RecoverableNetworkWordCount \ + * localhost 9999 ~/checkpoint ~/out` + * + * would typically be /examples/target/scala-XX/spark-examples....jar + * + * Refer to the online documentation for more details. 
*/ object RecoverableNetworkWordCount { @@ -52,7 +75,7 @@ object RecoverableNetworkWordCount { // Create the context with a 1 second batch size val ssc = new StreamingContext(master, "RecoverableNetworkWordCount", Seconds(1), - System.getenv("SPARK_HOME"), Seq(System.getenv("SPARK_EXAMPLES_JAR"))) + System.getenv("SPARK_HOME"), StreamingContext.jarOfClass(this.getClass)) // Create a NetworkInputDStream on target ip:port and count the // words in input stream of \n delimited text (eg. generated by 'nc') @@ -74,9 +97,13 @@ object RecoverableNetworkWordCount { System.err.println( """ |Usage: RecoverableNetworkWordCount + | is the Spark master URL. In local mode, should be 'local[n]' with n > 1. + | and describe the TCP server that Spark Streaming would connect to receive data. + | directory to HDFS-compatible file system which checkpoint data + | file to which the word counts will be appended | |In local mode, should be 'local[n]' with n > 1 - |Both and should be full paths + |Both and must be absolute paths """.stripMargin ) System.exit(1) diff --git a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala index 62b225382e..1249ef4c3d 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala @@ -43,8 +43,9 @@ class Checkpoint(@transient ssc: StreamingContext, val checkpointTime: Time) val pendingTimes = ssc.scheduler.getPendingTimes() val delaySeconds = MetadataCleaner.getDelaySeconds(ssc.conf) val sparkConf = ssc.conf - - // do not save these configurations + + // These should be unset when a checkpoint is deserialized, + // otherwise the SparkContext won't initialize correctly. sparkConf.remove("spark.hostPort").remove("spark.driver.host").remove("spark.driver.port") def validate() { @@ -102,8 +103,12 @@ object Checkpoint extends Logging { * Convenience class to handle the writing of graph checkpoint to file */ private[streaming] -class CheckpointWriter(jobGenerator: JobGenerator, conf: SparkConf, checkpointDir: String, hadoopConf: Configuration) - extends Logging { +class CheckpointWriter( + jobGenerator: JobGenerator, + conf: SparkConf, + checkpointDir: String, + hadoopConf: Configuration + ) extends Logging { val MAX_ATTEMPTS = 3 val executor = Executors.newFixedThreadPool(1) val compressionCodec = CompressionCodec.createCodec(conf) diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala index d96e9ac7b7..523173d45a 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala @@ -489,15 +489,15 @@ class JavaStreamingContext(val ssc: StreamingContext) { } /** - * JavaStreamingContext object contains a number of static utility functions. + * JavaStreamingContext object contains a number of utility functions. */ object JavaStreamingContext { /** * Either recreate a StreamingContext from checkpoint data or create a new StreamingContext. * If checkpoint data exists in the provided `checkpointPath`, then StreamingContext will be - * recreated from the checkpoint data. If the data does not exist, then the StreamingContext - * will be created by called the provided `creatingFunc`. + * recreated from the checkpoint data. 
If the data does not exist, then the provided factory + * will be used to create a JavaStreamingContext. * * @param checkpointPath Checkpoint directory used in an earlier JavaStreamingContext program * @param factory JavaStreamingContextFactory object to create a new JavaStreamingContext @@ -515,8 +515,8 @@ object JavaStreamingContext { /** * Either recreate a StreamingContext from checkpoint data or create a new StreamingContext. * If checkpoint data exists in the provided `checkpointPath`, then StreamingContext will be - * recreated from the checkpoint data. If the data does not exist, then the StreamingContext - * will be created by called the provided `creatingFunc`. + * recreated from the checkpoint data. If the data does not exist, then the provided factory + * will be used to create a JavaStreamingContext. * * @param checkpointPath Checkpoint directory used in an earlier StreamingContext program * @param factory JavaStreamingContextFactory object to create a new JavaStreamingContext @@ -537,8 +537,8 @@ object JavaStreamingContext { /** * Either recreate a StreamingContext from checkpoint data or create a new StreamingContext. * If checkpoint data exists in the provided `checkpointPath`, then StreamingContext will be - * recreated from the checkpoint data. If the data does not exist, then the StreamingContext - * will be created by called the provided `creatingFunc`. + * recreated from the checkpoint data. If the data does not exist, then the provided factory + * will be used to create a JavaStreamingContext. * * @param checkpointPath Checkpoint directory used in an earlier StreamingContext program * @param factory JavaStreamingContextFactory object to create a new JavaStreamingContext -- cgit v1.2.3 From 93a65e5fde64ffed3dbd2a050c1007e077ecd004 Mon Sep 17 00:00:00 2001 From: Henry Saputra Date: Sun, 12 Jan 2014 10:30:04 -0800 Subject: Remove simple redundant return statement for Scala methods/functions: -) Only change simple return statements at the end of method -) Ignore the complex if-else check -) Ignore the ones inside synchronized --- .../main/scala/org/apache/spark/Accumulators.scala | 2 +- .../main/scala/org/apache/spark/CacheManager.scala | 4 +-- .../scala/org/apache/spark/HttpFileServer.scala | 6 ++--- core/src/main/scala/org/apache/spark/Logging.scala | 2 +- .../scala/org/apache/spark/MapOutputTracker.scala | 4 +-- .../main/scala/org/apache/spark/Partitioner.scala | 4 +-- .../main/scala/org/apache/spark/SparkContext.scala | 11 +++++--- .../scala/org/apache/spark/SparkHadoopWriter.scala | 13 +++++----- .../org/apache/spark/api/python/PythonRDD.scala | 2 +- .../apache/spark/broadcast/TorrentBroadcast.scala | 6 ++--- .../org/apache/spark/network/BufferMessage.scala | 2 +- .../org/apache/spark/network/Connection.scala | 6 ++--- .../scala/org/apache/spark/network/Message.scala | 6 ++--- .../apache/spark/network/netty/ShuffleSender.scala | 2 +- .../scala/org/apache/spark/rdd/CoalescedRDD.scala | 4 +-- .../scala/org/apache/spark/rdd/HadoopRDD.scala | 8 +++--- .../main/scala/org/apache/spark/rdd/PipedRDD.scala | 2 +- core/src/main/scala/org/apache/spark/rdd/RDD.scala | 4 +-- .../apache/spark/scheduler/InputFormatInfo.scala | 8 +++--- .../scala/org/apache/spark/scheduler/Pool.scala | 8 +++--- .../spark/scheduler/SchedulingAlgorithm.scala | 11 ++++---- .../apache/spark/scheduler/SparkListenerBus.scala | 2 +- .../scala/org/apache/spark/scheduler/Stage.scala | 2 +- .../org/apache/spark/scheduler/TaskResult.scala | 2 +- .../apache/spark/scheduler/TaskResultGetter.scala | 2 +- 
.../apache/spark/scheduler/TaskSetManager.scala | 14 +++++------ .../mesos/CoarseMesosSchedulerBackend.scala | 2 +- .../cluster/mesos/MesosSchedulerBackend.scala | 6 ++--- .../org/apache/spark/storage/BlockManager.scala | 2 +- .../apache/spark/storage/BlockManagerWorker.scala | 14 +++++------ .../org/apache/spark/storage/BlockMessage.scala | 2 +- .../apache/spark/storage/BlockMessageArray.scala | 2 +- .../org/apache/spark/storage/MemoryStore.scala | 2 +- .../org/apache/spark/storage/StorageLevel.scala | 2 +- .../org/apache/spark/util/AppendOnlyMap.scala | 2 +- .../org/apache/spark/util/ClosureCleaner.scala | 10 ++++---- .../org/apache/spark/util/SizeEstimator.scala | 10 ++++---- .../main/scala/org/apache/spark/util/Utils.scala | 19 +++++++------- .../main/scala/org/apache/spark/util/Vector.scala | 12 ++++----- .../spark/scheduler/ClusterSchedulerSuite.scala | 9 ++++--- .../apache/spark/scheduler/DAGSchedulerSuite.scala | 2 +- .../apache/spark/scheduler/JobLoggerSuite.scala | 2 +- .../apache/spark/util/ClosureCleanerSuite.scala | 14 +++++------ .../scala/org/apache/spark/examples/LocalALS.scala | 8 +++--- .../org/apache/spark/examples/LocalFileLR.scala | 2 +- .../org/apache/spark/examples/LocalKMeans.scala | 2 +- .../scala/org/apache/spark/examples/SparkALS.scala | 6 ++--- .../org/apache/spark/examples/SparkHdfsLR.scala | 2 +- .../org/apache/spark/examples/SparkKMeans.scala | 12 ++++----- .../examples/clickstream/PageViewGenerator.scala | 2 +- .../spark/mllib/api/python/PythonMLLibAPI.scala | 29 +++++++++++----------- .../org/apache/spark/streaming/Checkpoint.scala | 2 +- .../spark/streaming/dstream/FileInputDStream.scala | 2 +- .../spark/streaming/dstream/StateDStream.scala | 8 +++--- .../streaming/scheduler/StreamingListenerBus.scala | 2 +- .../org/apache/spark/streaming/util/Clock.scala | 4 +-- .../spark/streaming/util/RawTextHelper.scala | 2 +- .../org/apache/spark/deploy/yarn/Client.scala | 9 ++++--- .../apache/spark/deploy/yarn/WorkerLauncher.scala | 8 +++--- .../apache/spark/deploy/yarn/WorkerRunnable.scala | 7 +++--- .../yarn/ClientDistributedCacheManager.scala | 10 ++++---- .../yarn/ClientDistributedCacheManagerSuite.scala | 2 +- .../org/apache/spark/deploy/yarn/Client.scala | 5 ++-- 63 files changed, 187 insertions(+), 186 deletions(-) (limited to 'examples/src') diff --git a/core/src/main/scala/org/apache/spark/Accumulators.scala b/core/src/main/scala/org/apache/spark/Accumulators.scala index 5f73d234aa..e89ac28b8e 100644 --- a/core/src/main/scala/org/apache/spark/Accumulators.scala +++ b/core/src/main/scala/org/apache/spark/Accumulators.scala @@ -218,7 +218,7 @@ private object Accumulators { def newId: Long = synchronized { lastId += 1 - return lastId + lastId } def register(a: Accumulable[_, _], original: Boolean): Unit = synchronized { diff --git a/core/src/main/scala/org/apache/spark/CacheManager.scala b/core/src/main/scala/org/apache/spark/CacheManager.scala index 519ecde50a..8e5dd8a850 100644 --- a/core/src/main/scala/org/apache/spark/CacheManager.scala +++ b/core/src/main/scala/org/apache/spark/CacheManager.scala @@ -38,7 +38,7 @@ private[spark] class CacheManager(blockManager: BlockManager) extends Logging { blockManager.get(key) match { case Some(values) => // Partition is already materialized, so just return its values - return new InterruptibleIterator(context, values.asInstanceOf[Iterator[T]]) + new InterruptibleIterator(context, values.asInstanceOf[Iterator[T]]) case None => // Mark the split as loading (unless someone else marks it first) @@ -74,7 +74,7 @@ 
private[spark] class CacheManager(blockManager: BlockManager) extends Logging { val elements = new ArrayBuffer[Any] elements ++= computedValues blockManager.put(key, elements, storageLevel, tellMaster = true) - return elements.iterator.asInstanceOf[Iterator[T]] + elements.iterator.asInstanceOf[Iterator[T]] } finally { loading.synchronized { loading.remove(key) diff --git a/core/src/main/scala/org/apache/spark/HttpFileServer.scala b/core/src/main/scala/org/apache/spark/HttpFileServer.scala index ad1ee20045..a885898ad4 100644 --- a/core/src/main/scala/org/apache/spark/HttpFileServer.scala +++ b/core/src/main/scala/org/apache/spark/HttpFileServer.scala @@ -47,17 +47,17 @@ private[spark] class HttpFileServer extends Logging { def addFile(file: File) : String = { addFileToDir(file, fileDir) - return serverUri + "/files/" + file.getName + serverUri + "/files/" + file.getName } def addJar(file: File) : String = { addFileToDir(file, jarDir) - return serverUri + "/jars/" + file.getName + serverUri + "/jars/" + file.getName } def addFileToDir(file: File, dir: File) : String = { Files.copy(file, new File(dir, file.getName)) - return dir + "/" + file.getName + dir + "/" + file.getName } } diff --git a/core/src/main/scala/org/apache/spark/Logging.scala b/core/src/main/scala/org/apache/spark/Logging.scala index 4a34989e50..9063cae87e 100644 --- a/core/src/main/scala/org/apache/spark/Logging.scala +++ b/core/src/main/scala/org/apache/spark/Logging.scala @@ -41,7 +41,7 @@ trait Logging { } log_ = LoggerFactory.getLogger(className) } - return log_ + log_ } // Log methods that take only a String diff --git a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala index 77b8ca1cce..57bdf22b9c 100644 --- a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala +++ b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala @@ -139,7 +139,7 @@ private[spark] class MapOutputTracker(conf: SparkConf) extends Logging { return MapOutputTracker.convertMapStatuses(shuffleId, reduceId, fetchedStatuses) } } - else{ + else { throw new FetchFailedException(null, shuffleId, -1, reduceId, new Exception("Missing all output locations for shuffle " + shuffleId)) } @@ -185,7 +185,7 @@ private[spark] class MapOutputTracker(conf: SparkConf) extends Logging { private[spark] class MapOutputTrackerMaster(conf: SparkConf) extends MapOutputTracker(conf) { - // Cache a serialized version of the output statuses for each shuffle to send them out faster + // Cache a serialized version of the output statuses for each shuffle to send them out faster return private var cacheEpoch = epoch private val cachedSerializedStatuses = new TimeStampedHashMap[Int, Array[Byte]] diff --git a/core/src/main/scala/org/apache/spark/Partitioner.scala b/core/src/main/scala/org/apache/spark/Partitioner.scala index 9b043f06dd..fc0a749882 100644 --- a/core/src/main/scala/org/apache/spark/Partitioner.scala +++ b/core/src/main/scala/org/apache/spark/Partitioner.scala @@ -53,9 +53,9 @@ object Partitioner { return r.partitioner.get } if (rdd.context.conf.contains("spark.default.parallelism")) { - return new HashPartitioner(rdd.context.defaultParallelism) + new HashPartitioner(rdd.context.defaultParallelism) } else { - return new HashPartitioner(bySize.head.partitions.size) + new HashPartitioner(bySize.head.partitions.size) } } } diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index f91392b351..3d82bfc019 100644 --- 
a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -756,8 +756,11 @@ class SparkContext( private[spark] def getCallSite(): String = { val callSite = getLocalProperty("externalCallSite") - if (callSite == null) return Utils.formatSparkCallSite - callSite + if (callSite == null) { + Utils.formatSparkCallSite + } else { + callSite + } } /** @@ -907,7 +910,7 @@ class SparkContext( */ private[spark] def clean[F <: AnyRef](f: F): F = { ClosureCleaner.clean(f) - return f + f } /** @@ -919,7 +922,7 @@ class SparkContext( val path = new Path(dir, UUID.randomUUID().toString) val fs = path.getFileSystem(hadoopConfiguration) fs.mkdirs(path) - fs.getFileStatus(path).getPath().toString + fs.getFileStatus(path).getPath.toString } } diff --git a/core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala b/core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala index 618d95015f..bba873a0b6 100644 --- a/core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala +++ b/core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala @@ -134,28 +134,28 @@ class SparkHadoopWriter(@transient jobConf: JobConf) format = conf.value.getOutputFormat() .asInstanceOf[OutputFormat[AnyRef,AnyRef]] } - return format + format } private def getOutputCommitter(): OutputCommitter = { if (committer == null) { committer = conf.value.getOutputCommitter } - return committer + committer } private def getJobContext(): JobContext = { if (jobContext == null) { jobContext = newJobContext(conf.value, jID.value) } - return jobContext + jobContext } private def getTaskContext(): TaskAttemptContext = { if (taskContext == null) { taskContext = newTaskAttemptContext(conf.value, taID.value) } - return taskContext + taskContext } private def setIDs(jobid: Int, splitid: Int, attemptid: Int) { @@ -182,7 +182,7 @@ object SparkHadoopWriter { def createJobID(time: Date, id: Int): JobID = { val formatter = new SimpleDateFormat("yyyyMMddHHmm") val jobtrackerID = formatter.format(new Date()) - return new JobID(jobtrackerID, id) + new JobID(jobtrackerID, id) } def createPathFromString(path: String, conf: JobConf): Path = { @@ -194,7 +194,6 @@ object SparkHadoopWriter { if (outputPath == null || fs == null) { throw new IllegalArgumentException("Incorrectly formatted output path") } - outputPath = outputPath.makeQualified(fs) - return outputPath + outputPath.makeQualified(fs) } } diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala index 40c519b5bd..8830de7273 100644 --- a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala +++ b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala @@ -95,7 +95,7 @@ private[spark] class PythonRDD[T: ClassTag]( // Return an iterator that read lines from the process's stdout val stream = new DataInputStream(new BufferedInputStream(worker.getInputStream, bufferSize)) - return new Iterator[Array[Byte]] { + new Iterator[Array[Byte]] { def next(): Array[Byte] = { val obj = _nextObj if (hasNext) { diff --git a/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala b/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala index fdf92eca4f..1d295c62bc 100644 --- a/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala +++ b/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala @@ -203,16 +203,16 @@ extends Logging { } bais.close() - var tInfo = TorrentInfo(retVal, blockNum, 
byteArray.length) + val tInfo = TorrentInfo(retVal, blockNum, byteArray.length) tInfo.hasBlocks = blockNum - return tInfo + tInfo } def unBlockifyObject[T](arrayOfBlocks: Array[TorrentBlock], totalBytes: Int, totalBlocks: Int): T = { - var retByteArray = new Array[Byte](totalBytes) + val retByteArray = new Array[Byte](totalBytes) for (i <- 0 until totalBlocks) { System.arraycopy(arrayOfBlocks(i).byteArray, 0, retByteArray, i * BLOCK_SIZE, arrayOfBlocks(i).byteArray.length) diff --git a/core/src/main/scala/org/apache/spark/network/BufferMessage.scala b/core/src/main/scala/org/apache/spark/network/BufferMessage.scala index f736bb3713..fb4c65909a 100644 --- a/core/src/main/scala/org/apache/spark/network/BufferMessage.scala +++ b/core/src/main/scala/org/apache/spark/network/BufferMessage.scala @@ -46,7 +46,7 @@ class BufferMessage(id_ : Int, val buffers: ArrayBuffer[ByteBuffer], var ackId: throw new Exception("Max chunk size is " + maxChunkSize) } - if (size == 0 && gotChunkForSendingOnce == false) { + if (size == 0 && !gotChunkForSendingOnce) { val newChunk = new MessageChunk( new MessageChunkHeader(typ, id, 0, 0, ackId, senderAddress), null) gotChunkForSendingOnce = true diff --git a/core/src/main/scala/org/apache/spark/network/Connection.scala b/core/src/main/scala/org/apache/spark/network/Connection.scala index 95cb0206ac..cba8477ed5 100644 --- a/core/src/main/scala/org/apache/spark/network/Connection.scala +++ b/core/src/main/scala/org/apache/spark/network/Connection.scala @@ -330,7 +330,7 @@ class SendingConnection(val address: InetSocketAddress, selector_ : Selector, // Is highly unlikely unless there was an unclean close of socket, etc registerInterest() logInfo("Connected to [" + address + "], " + outbox.messages.size + " messages pending") - return true + true } catch { case e: Exception => { logWarning("Error finishing connection to " + address, e) @@ -385,7 +385,7 @@ class SendingConnection(val address: InetSocketAddress, selector_ : Selector, } } // should not happen - to keep scala compiler happy - return true + true } // This is a hack to determine if remote socket was closed or not. 
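The clean-up applied throughout this commit leans on the fact that a Scala method returns the value of its last expression, so a trailing `return` adds nothing. A small standalone illustration (a hypothetical helper, not taken from the patch):

// Hypothetical example: the last expression is the method's result.
def greeting(name: String): String = {
  val trimmed = name.trim
  "Hello, " + trimmed + "!"   // no explicit `return` needed here
}
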
@@ -559,7 +559,7 @@ private[spark] class ReceivingConnection(channel_ : SocketChannel, selector_ : S } } // should not happen - to keep scala compiler happy - return true + true } def onReceive(callback: (Connection, Message) => Unit) {onReceiveCallback = callback} diff --git a/core/src/main/scala/org/apache/spark/network/Message.scala b/core/src/main/scala/org/apache/spark/network/Message.scala index f2ecc6d439..2612884bdb 100644 --- a/core/src/main/scala/org/apache/spark/network/Message.scala +++ b/core/src/main/scala/org/apache/spark/network/Message.scala @@ -61,7 +61,7 @@ private[spark] object Message { if (dataBuffers.exists(_ == null)) { throw new Exception("Attempting to create buffer message with null buffer") } - return new BufferMessage(getNewId(), new ArrayBuffer[ByteBuffer] ++= dataBuffers, ackId) + new BufferMessage(getNewId(), new ArrayBuffer[ByteBuffer] ++= dataBuffers, ackId) } def createBufferMessage(dataBuffers: Seq[ByteBuffer]): BufferMessage = @@ -69,9 +69,9 @@ private[spark] object Message { def createBufferMessage(dataBuffer: ByteBuffer, ackId: Int): BufferMessage = { if (dataBuffer == null) { - return createBufferMessage(Array(ByteBuffer.allocate(0)), ackId) + createBufferMessage(Array(ByteBuffer.allocate(0)), ackId) } else { - return createBufferMessage(Array(dataBuffer), ackId) + createBufferMessage(Array(dataBuffer), ackId) } } diff --git a/core/src/main/scala/org/apache/spark/network/netty/ShuffleSender.scala b/core/src/main/scala/org/apache/spark/network/netty/ShuffleSender.scala index 546d921067..44204a8c46 100644 --- a/core/src/main/scala/org/apache/spark/network/netty/ShuffleSender.scala +++ b/core/src/main/scala/org/apache/spark/network/netty/ShuffleSender.scala @@ -64,7 +64,7 @@ private[spark] object ShuffleSender { val subDirId = (hash / localDirs.length) % subDirsPerLocalDir val subDir = new File(localDirs(dirId), "%02x".format(subDirId)) val file = new File(subDir, blockId.name) - return new FileSegment(file, 0, file.length()) + new FileSegment(file, 0, file.length()) } } val sender = new ShuffleSender(port, pResovler) diff --git a/core/src/main/scala/org/apache/spark/rdd/CoalescedRDD.scala b/core/src/main/scala/org/apache/spark/rdd/CoalescedRDD.scala index 98da35763b..a5394a28e0 100644 --- a/core/src/main/scala/org/apache/spark/rdd/CoalescedRDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/CoalescedRDD.scala @@ -296,9 +296,9 @@ private[spark] class PartitionCoalescer(maxPartitions: Int, prev: RDD[_], balanc val prefPartActual = prefPart.get if (minPowerOfTwo.size + slack <= prefPartActual.size) // more imbalance than the slack allows - return minPowerOfTwo // prefer balance over locality + minPowerOfTwo // prefer balance over locality else { - return prefPartActual // prefer locality over balance + prefPartActual // prefer locality over balance } } diff --git a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala index 53f77a38f5..20db7db5ed 100644 --- a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala @@ -99,11 +99,11 @@ class HadoopRDD[K, V]( val conf: Configuration = broadcastedConf.value.value if (conf.isInstanceOf[JobConf]) { // A user-broadcasted JobConf was provided to the HadoopRDD, so always use it. 
- return conf.asInstanceOf[JobConf] + conf.asInstanceOf[JobConf] } else if (HadoopRDD.containsCachedMetadata(jobConfCacheKey)) { // getJobConf() has been called previously, so there is already a local cache of the JobConf // needed by this RDD. - return HadoopRDD.getCachedMetadata(jobConfCacheKey).asInstanceOf[JobConf] + HadoopRDD.getCachedMetadata(jobConfCacheKey).asInstanceOf[JobConf] } else { // Create a JobConf that will be cached and used across this RDD's getJobConf() calls in the // local process. The local cache is accessed through HadoopRDD.putCachedMetadata(). @@ -111,7 +111,7 @@ class HadoopRDD[K, V]( val newJobConf = new JobConf(broadcastedConf.value.value) initLocalJobConfFuncOpt.map(f => f(newJobConf)) HadoopRDD.putCachedMetadata(jobConfCacheKey, newJobConf) - return newJobConf + newJobConf } } @@ -127,7 +127,7 @@ class HadoopRDD[K, V]( newInputFormat.asInstanceOf[Configurable].setConf(conf) } HadoopRDD.putCachedMetadata(inputFormatCacheKey, newInputFormat) - return newInputFormat + newInputFormat } override def getPartitions: Array[Partition] = { diff --git a/core/src/main/scala/org/apache/spark/rdd/PipedRDD.scala b/core/src/main/scala/org/apache/spark/rdd/PipedRDD.scala index 1dbbe39898..d4f396afb5 100644 --- a/core/src/main/scala/org/apache/spark/rdd/PipedRDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/PipedRDD.scala @@ -96,7 +96,7 @@ class PipedRDD[T: ClassTag]( // Return an iterator that read lines from the process's stdout val lines = Source.fromInputStream(proc.getInputStream).getLines - return new Iterator[String] { + new Iterator[String] { def next() = lines.next() def hasNext = { if (lines.hasNext) { diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala index f9dc12eee3..edd4f381db 100644 --- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala @@ -764,7 +764,7 @@ abstract class RDD[T: ClassTag]( val entry = iter.next() m1.put(entry.getKey, m1.getLong(entry.getKey) + entry.getLongValue) } - return m1 + m1 } val myResult = mapPartitions(countPartition).reduce(mergeMaps) myResult.asInstanceOf[java.util.Map[T, Long]] // Will be wrapped as a Scala mutable Map @@ -842,7 +842,7 @@ abstract class RDD[T: ClassTag]( partsScanned += numPartsToTry } - return buf.toArray + buf.toArray } /** diff --git a/core/src/main/scala/org/apache/spark/scheduler/InputFormatInfo.scala b/core/src/main/scala/org/apache/spark/scheduler/InputFormatInfo.scala index 90eb8a747f..cc10cc0849 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/InputFormatInfo.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/InputFormatInfo.scala @@ -103,7 +103,7 @@ class InputFormatInfo(val configuration: Configuration, val inputFormatClazz: Cl retval ++= SplitInfo.toSplitInfo(inputFormatClazz, path, split) } - return retval.toSet + retval.toSet } // This method does not expect failures, since validate has already passed ... 
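Several of the hunks above (for example in Partitioner and the scheduling classes) also rely on a trailing if/else being an expression in its own right, so its branches need no `return` either. A hypothetical sketch of that shape:

// Hypothetical example: a trailing if/else is itself an expression,
// so neither branch needs an explicit `return`.
def clamp(value: Int, max: Int): Int = {
  if (value > max) {
    max
  } else {
    value
  }
}
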
@@ -121,18 +121,18 @@ class InputFormatInfo(val configuration: Configuration, val inputFormatClazz: Cl elem => retval ++= SplitInfo.toSplitInfo(inputFormatClazz, path, elem) ) - return retval.toSet + retval.toSet } private def findPreferredLocations(): Set[SplitInfo] = { logDebug("mapreduceInputFormat : " + mapreduceInputFormat + ", mapredInputFormat : " + mapredInputFormat + ", inputFormatClazz : " + inputFormatClazz) if (mapreduceInputFormat) { - return prefLocsFromMapreduceInputFormat() + prefLocsFromMapreduceInputFormat() } else { assert(mapredInputFormat) - return prefLocsFromMapredInputFormat() + prefLocsFromMapredInputFormat() } } } diff --git a/core/src/main/scala/org/apache/spark/scheduler/Pool.scala b/core/src/main/scala/org/apache/spark/scheduler/Pool.scala index 1791242215..4bc13c23d9 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/Pool.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/Pool.scala @@ -75,12 +75,12 @@ private[spark] class Pool( return schedulableNameToSchedulable(schedulableName) } for (schedulable <- schedulableQueue) { - var sched = schedulable.getSchedulableByName(schedulableName) + val sched = schedulable.getSchedulableByName(schedulableName) if (sched != null) { return sched } } - return null + null } override def executorLost(executorId: String, host: String) { @@ -92,7 +92,7 @@ private[spark] class Pool( for (schedulable <- schedulableQueue) { shouldRevive |= schedulable.checkSpeculatableTasks() } - return shouldRevive + shouldRevive } override def getSortedTaskSetQueue(): ArrayBuffer[TaskSetManager] = { @@ -101,7 +101,7 @@ private[spark] class Pool( for (schedulable <- sortedSchedulableQueue) { sortedTaskSetQueue ++= schedulable.getSortedTaskSetQueue() } - return sortedTaskSetQueue + sortedTaskSetQueue } def increaseRunningTasks(taskNum: Int) { diff --git a/core/src/main/scala/org/apache/spark/scheduler/SchedulingAlgorithm.scala b/core/src/main/scala/org/apache/spark/scheduler/SchedulingAlgorithm.scala index 3418640b8c..5e62c8468f 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/SchedulingAlgorithm.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/SchedulingAlgorithm.scala @@ -37,9 +37,9 @@ private[spark] class FIFOSchedulingAlgorithm extends SchedulingAlgorithm { res = math.signum(stageId1 - stageId2) } if (res < 0) { - return true + true } else { - return false + false } } } @@ -56,7 +56,6 @@ private[spark] class FairSchedulingAlgorithm extends SchedulingAlgorithm { val minShareRatio2 = runningTasks2.toDouble / math.max(minShare2, 1.0).toDouble val taskToWeightRatio1 = runningTasks1.toDouble / s1.weight.toDouble val taskToWeightRatio2 = runningTasks2.toDouble / s2.weight.toDouble - var res:Boolean = true var compare:Int = 0 if (s1Needy && !s2Needy) { @@ -70,11 +69,11 @@ private[spark] class FairSchedulingAlgorithm extends SchedulingAlgorithm { } if (compare < 0) { - return true + true } else if (compare > 0) { - return false + false } else { - return s1.name < s2.name + s1.name < s2.name } } } diff --git a/core/src/main/scala/org/apache/spark/scheduler/SparkListenerBus.scala b/core/src/main/scala/org/apache/spark/scheduler/SparkListenerBus.scala index e7defd768b..e551c11f72 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/SparkListenerBus.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/SparkListenerBus.scala @@ -88,6 +88,6 @@ private[spark] class SparkListenerBus() extends Logging { * add overhead in the general case. 
*/ Thread.sleep(10) } - return true + true } } diff --git a/core/src/main/scala/org/apache/spark/scheduler/Stage.scala b/core/src/main/scala/org/apache/spark/scheduler/Stage.scala index 7cb3fe46e5..c60e9896de 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/Stage.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/Stage.scala @@ -96,7 +96,7 @@ private[spark] class Stage( def newAttemptId(): Int = { val id = nextAttemptId nextAttemptId += 1 - return id + id } val name = callSite.getOrElse(rdd.origin) diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskResult.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskResult.scala index e80cc6b0f6..9d3e615826 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskResult.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskResult.scala @@ -74,6 +74,6 @@ class DirectTaskResult[T](var valueBytes: ByteBuffer, var accumUpdates: Map[Long def value(): T = { val resultSer = SparkEnv.get.serializer.newInstance() - return resultSer.deserialize(valueBytes) + resultSer.deserialize(valueBytes) } } diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskResultGetter.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskResultGetter.scala index c52d6175d2..35e9544718 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskResultGetter.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskResultGetter.scala @@ -37,7 +37,7 @@ private[spark] class TaskResultGetter(sparkEnv: SparkEnv, scheduler: TaskSchedul protected val serializer = new ThreadLocal[SerializerInstance] { override def initialValue(): SerializerInstance = { - return sparkEnv.closureSerializer.newInstance() + sparkEnv.closureSerializer.newInstance() } } diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala index a10e5397ad..fc0ee07089 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala @@ -228,7 +228,7 @@ private[spark] class TaskSetManager( return Some(index) } } - return None + None } /** Check whether a task is currently running an attempt on a given host */ @@ -291,7 +291,7 @@ private[spark] class TaskSetManager( } } - return None + None } /** @@ -332,7 +332,7 @@ private[spark] class TaskSetManager( } // Finally, if all else has failed, find a speculative task - return findSpeculativeTask(execId, host, locality) + findSpeculativeTask(execId, host, locality) } /** @@ -387,7 +387,7 @@ private[spark] class TaskSetManager( case _ => } } - return None + None } /** @@ -584,7 +584,7 @@ private[spark] class TaskSetManager( } override def getSchedulableByName(name: String): Schedulable = { - return null + null } override def addSchedulable(schedulable: Schedulable) {} @@ -594,7 +594,7 @@ private[spark] class TaskSetManager( override def getSortedTaskSetQueue(): ArrayBuffer[TaskSetManager] = { var sortedTaskSetQueue = ArrayBuffer[TaskSetManager](this) sortedTaskSetQueue += this - return sortedTaskSetQueue + sortedTaskSetQueue } /** Called by TaskScheduler when an executor is lost so we can re-enqueue our tasks */ @@ -669,7 +669,7 @@ private[spark] class TaskSetManager( } } } - return foundTasks + foundTasks } private def getLocalityWait(level: TaskLocality.TaskLocality): Long = { diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala 
b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala index e16d60c54c..c27049bdb5 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala @@ -140,7 +140,7 @@ private[spark] class CoarseMesosSchedulerBackend( .format(basename, driverUrl, offer.getSlaveId.getValue, offer.getHostname, numCores)) command.addUris(CommandInfo.URI.newBuilder().setValue(uri)) } - return command.build() + command.build() } override def offerRescinded(d: SchedulerDriver, o: OfferID) {} diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala index b428c82a48..49781485d9 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala @@ -141,13 +141,13 @@ private[spark] class MesosSchedulerBackend( // Serialize the map as an array of (String, String) pairs execArgs = Utils.serialize(props.toArray) } - return execArgs + execArgs } private def setClassLoader(): ClassLoader = { val oldClassLoader = Thread.currentThread.getContextClassLoader Thread.currentThread.setContextClassLoader(classLoader) - return oldClassLoader + oldClassLoader } private def restoreClassLoader(oldClassLoader: ClassLoader) { @@ -255,7 +255,7 @@ private[spark] class MesosSchedulerBackend( .setType(Value.Type.SCALAR) .setScalar(Value.Scalar.newBuilder().setValue(1).build()) .build() - return MesosTaskInfo.newBuilder() + MesosTaskInfo.newBuilder() .setTaskId(taskId) .setSlaveId(SlaveID.newBuilder().setValue(slaveId).build()) .setExecutor(createExecutorInfo(slaveId)) diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala index c56e2ca2df..a716b1d577 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala @@ -412,7 +412,7 @@ private[spark] class BlockManager( logDebug("The value of block " + blockId + " is null") } logDebug("Block " + blockId + " not found") - return None + None } /** diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerWorker.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerWorker.scala index 21f003609b..a36abe0670 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManagerWorker.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerWorker.scala @@ -42,7 +42,7 @@ private[spark] class BlockManagerWorker(val blockManager: BlockManager) extends val blockMessages = BlockMessageArray.fromBufferMessage(bufferMessage) logDebug("Parsed as a block message array") val responseMessages = blockMessages.map(processBlockMessage).filter(_ != None).map(_.get) - return Some(new BlockMessageArray(responseMessages).toBufferMessage) + Some(new BlockMessageArray(responseMessages).toBufferMessage) } catch { case e: Exception => logError("Exception handling buffer message", e) return None @@ -50,7 +50,7 @@ private[spark] class BlockManagerWorker(val blockManager: BlockManager) extends } case otherMessage: Any => { logError("Unknown type message received: " + otherMessage) - return None + None } } } @@ -61,7 +61,7 @@ private[spark] class BlockManagerWorker(val 
blockManager: BlockManager) extends val pB = PutBlock(blockMessage.getId, blockMessage.getData, blockMessage.getLevel) logDebug("Received [" + pB + "]") putBlock(pB.id, pB.data, pB.level) - return None + None } case BlockMessage.TYPE_GET_BLOCK => { val gB = new GetBlock(blockMessage.getId) @@ -70,9 +70,9 @@ private[spark] class BlockManagerWorker(val blockManager: BlockManager) extends if (buffer == null) { return None } - return Some(BlockMessage.fromGotBlock(GotBlock(gB.id, buffer))) + Some(BlockMessage.fromGotBlock(GotBlock(gB.id, buffer))) } - case _ => return None + case _ => None } } @@ -93,7 +93,7 @@ private[spark] class BlockManagerWorker(val blockManager: BlockManager) extends } logDebug("GetBlock " + id + " used " + Utils.getUsedTimeMs(startTimeMs) + " and got buffer " + buffer) - return buffer + buffer } } @@ -132,6 +132,6 @@ private[spark] object BlockManagerWorker extends Logging { } case None => logDebug("No response message received"); return null } - return null + null } } diff --git a/core/src/main/scala/org/apache/spark/storage/BlockMessage.scala b/core/src/main/scala/org/apache/spark/storage/BlockMessage.scala index 80dcb5a207..fbafcf79d2 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockMessage.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockMessage.scala @@ -154,7 +154,7 @@ private[spark] class BlockMessage() { println() */ val finishTime = System.currentTimeMillis - return Message.createBufferMessage(buffers) + Message.createBufferMessage(buffers) } override def toString: String = { diff --git a/core/src/main/scala/org/apache/spark/storage/BlockMessageArray.scala b/core/src/main/scala/org/apache/spark/storage/BlockMessageArray.scala index a06f50a0ac..59329361f3 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockMessageArray.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockMessageArray.scala @@ -96,7 +96,7 @@ class BlockMessageArray(var blockMessages: Seq[BlockMessage]) extends Seq[BlockM println() println() */ - return Message.createBufferMessage(buffers) + Message.createBufferMessage(buffers) } } diff --git a/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala b/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala index 05f676c6e2..27f057b9f2 100644 --- a/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala +++ b/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala @@ -245,7 +245,7 @@ private class MemoryStore(blockManager: BlockManager, maxMemory: Long) return false } } - return true + true } override def contains(blockId: BlockId): Boolean = { diff --git a/core/src/main/scala/org/apache/spark/storage/StorageLevel.scala b/core/src/main/scala/org/apache/spark/storage/StorageLevel.scala index b5596dffd3..0f84810d6b 100644 --- a/core/src/main/scala/org/apache/spark/storage/StorageLevel.scala +++ b/core/src/main/scala/org/apache/spark/storage/StorageLevel.scala @@ -74,7 +74,7 @@ class StorageLevel private( if (deserialized_) { ret |= 1 } - return ret + ret } override def writeExternal(out: ObjectOutput) { diff --git a/core/src/main/scala/org/apache/spark/util/AppendOnlyMap.scala b/core/src/main/scala/org/apache/spark/util/AppendOnlyMap.scala index 8bb4ee3bfa..edfa58b2d9 100644 --- a/core/src/main/scala/org/apache/spark/util/AppendOnlyMap.scala +++ b/core/src/main/scala/org/apache/spark/util/AppendOnlyMap.scala @@ -67,7 +67,7 @@ class AppendOnlyMap[K, V](initialCapacity: Int = 64) extends Iterable[(K, V)] wi i += 1 } } - return null.asInstanceOf[V] + null.asInstanceOf[V] } /** 
Set the value for a key */ diff --git a/core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala b/core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala index 7108595e3e..1df6b87fb0 100644 --- a/core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala +++ b/core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala @@ -61,7 +61,7 @@ private[spark] object ClosureCleaner extends Logging { return f.getType :: Nil // Stop at the first $outer that is not a closure } } - return Nil + Nil } // Get a list of the outer objects for a given closure object. @@ -74,7 +74,7 @@ private[spark] object ClosureCleaner extends Logging { return f.get(obj) :: Nil // Stop at the first $outer that is not a closure } } - return Nil + Nil } private def getInnerClasses(obj: AnyRef): List[Class[_]] = { @@ -174,7 +174,7 @@ private[spark] object ClosureCleaner extends Logging { field.setAccessible(true) field.set(obj, outer) } - return obj + obj } } } @@ -182,7 +182,7 @@ private[spark] object ClosureCleaner extends Logging { private[spark] class FieldAccessFinder(output: Map[Class[_], Set[String]]) extends ClassVisitor(ASM4) { override def visitMethod(access: Int, name: String, desc: String, sig: String, exceptions: Array[String]): MethodVisitor = { - return new MethodVisitor(ASM4) { + new MethodVisitor(ASM4) { override def visitFieldInsn(op: Int, owner: String, name: String, desc: String) { if (op == GETFIELD) { for (cl <- output.keys if cl.getName == owner.replace('/', '.')) { @@ -215,7 +215,7 @@ private[spark] class InnerClosureFinder(output: Set[Class[_]]) extends ClassVisi override def visitMethod(access: Int, name: String, desc: String, sig: String, exceptions: Array[String]): MethodVisitor = { - return new MethodVisitor(ASM4) { + new MethodVisitor(ASM4) { override def visitMethodInsn(op: Int, owner: String, name: String, desc: String) { val argTypes = Type.getArgumentTypes(desc) diff --git a/core/src/main/scala/org/apache/spark/util/SizeEstimator.scala b/core/src/main/scala/org/apache/spark/util/SizeEstimator.scala index bddb3bb735..3cf94892e9 100644 --- a/core/src/main/scala/org/apache/spark/util/SizeEstimator.scala +++ b/core/src/main/scala/org/apache/spark/util/SizeEstimator.scala @@ -108,7 +108,7 @@ private[spark] object SizeEstimator extends Logging { val bean = ManagementFactory.newPlatformMXBeanProxy(server, hotSpotMBeanName, hotSpotMBeanClass) // TODO: We could use reflection on the VMOption returned ? 
- return getVMMethod.invoke(bean, "UseCompressedOops").toString.contains("true") + getVMMethod.invoke(bean, "UseCompressedOops").toString.contains("true") } catch { case e: Exception => { // Guess whether they've enabled UseCompressedOops based on whether maxMemory < 32 GB @@ -141,7 +141,7 @@ private[spark] object SizeEstimator extends Logging { def dequeue(): AnyRef = { val elem = stack.last stack.trimEnd(1) - return elem + elem } } @@ -162,7 +162,7 @@ private[spark] object SizeEstimator extends Logging { while (!state.isFinished) { visitSingleObject(state.dequeue(), state) } - return state.size + state.size } private def visitSingleObject(obj: AnyRef, state: SearchState) { @@ -276,11 +276,11 @@ private[spark] object SizeEstimator extends Logging { // Create and cache a new ClassInfo val newInfo = new ClassInfo(shellSize, pointerFields) classInfos.put(cls, newInfo) - return newInfo + newInfo } private def alignSize(size: Long): Long = { val rem = size % ALIGN_SIZE - return if (rem == 0) size else (size + ALIGN_SIZE - rem) + if (rem == 0) size else (size + ALIGN_SIZE - rem) } } diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala index 5f1253100b..f80ed290ab 100644 --- a/core/src/main/scala/org/apache/spark/util/Utils.scala +++ b/core/src/main/scala/org/apache/spark/util/Utils.scala @@ -49,14 +49,14 @@ private[spark] object Utils extends Logging { val oos = new ObjectOutputStream(bos) oos.writeObject(o) oos.close() - return bos.toByteArray + bos.toByteArray } /** Deserialize an object using Java serialization */ def deserialize[T](bytes: Array[Byte]): T = { val bis = new ByteArrayInputStream(bytes) val ois = new ObjectInputStream(bis) - return ois.readObject.asInstanceOf[T] + ois.readObject.asInstanceOf[T] } /** Deserialize an object using Java serialization and the given ClassLoader */ @@ -66,7 +66,7 @@ private[spark] object Utils extends Logging { override def resolveClass(desc: ObjectStreamClass) = Class.forName(desc.getName, false, loader) } - return ois.readObject.asInstanceOf[T] + ois.readObject.asInstanceOf[T] } /** Deserialize a Long value (used for {@link org.apache.spark.api.python.PythonPartitioner}) */ @@ -144,7 +144,7 @@ private[spark] object Utils extends Logging { i += 1 } } - return buf + buf } private val shutdownDeletePaths = new scala.collection.mutable.HashSet[String]() @@ -428,7 +428,7 @@ private[spark] object Utils extends Logging { def parseHostPort(hostPort: String): (String, Int) = { { // Check cache first. - var cached = hostPortParseResults.get(hostPort) + val cached = hostPortParseResults.get(hostPort) if (cached != null) return cached } @@ -731,7 +731,7 @@ private[spark] object Utils extends Logging { } catch { case ise: IllegalStateException => return true } - return false + false } def isSpace(c: Char): Boolean = { @@ -748,7 +748,7 @@ private[spark] object Utils extends Logging { var inWord = false var inSingleQuote = false var inDoubleQuote = false - var curWord = new StringBuilder + val curWord = new StringBuilder def endWord() { buf += curWord.toString curWord.clear() @@ -794,7 +794,7 @@ private[spark] object Utils extends Logging { if (inWord || inDoubleQuote || inSingleQuote) { endWord() } - return buf + buf } /* Calculates 'x' modulo 'mod', takes to consideration sign of x, @@ -822,8 +822,7 @@ private[spark] object Utils extends Logging { /** Returns a copy of the system properties that is thread-safe to iterator over. 
*/ def getSystemProperties(): Map[String, String] = { - return System.getProperties().clone() - .asInstanceOf[java.util.Properties].toMap[String, String] + System.getProperties.clone().asInstanceOf[java.util.Properties].toMap[String, String] } /** diff --git a/core/src/main/scala/org/apache/spark/util/Vector.scala b/core/src/main/scala/org/apache/spark/util/Vector.scala index fe710c58ac..094edcde7e 100644 --- a/core/src/main/scala/org/apache/spark/util/Vector.scala +++ b/core/src/main/scala/org/apache/spark/util/Vector.scala @@ -25,7 +25,7 @@ class Vector(val elements: Array[Double]) extends Serializable { def + (other: Vector): Vector = { if (length != other.length) throw new IllegalArgumentException("Vectors of different length") - return Vector(length, i => this(i) + other(i)) + Vector(length, i => this(i) + other(i)) } def add(other: Vector) = this + other @@ -33,7 +33,7 @@ class Vector(val elements: Array[Double]) extends Serializable { def - (other: Vector): Vector = { if (length != other.length) throw new IllegalArgumentException("Vectors of different length") - return Vector(length, i => this(i) - other(i)) + Vector(length, i => this(i) - other(i)) } def subtract(other: Vector) = this - other @@ -47,7 +47,7 @@ class Vector(val elements: Array[Double]) extends Serializable { ans += this(i) * other(i) i += 1 } - return ans + ans } /** @@ -67,7 +67,7 @@ class Vector(val elements: Array[Double]) extends Serializable { ans += (this(i) + plus(i)) * other(i) i += 1 } - return ans + ans } def += (other: Vector): Vector = { @@ -102,7 +102,7 @@ class Vector(val elements: Array[Double]) extends Serializable { ans += (this(i) - other(i)) * (this(i) - other(i)) i += 1 } - return ans + ans } def dist(other: Vector): Double = math.sqrt(squaredDist(other)) @@ -117,7 +117,7 @@ object Vector { def apply(length: Int, initializer: Int => Double): Vector = { val elements: Array[Double] = Array.tabulate(length)(initializer) - return new Vector(elements) + new Vector(elements) } def zeros(length: Int) = new Vector(new Array[Double](length)) diff --git a/core/src/test/scala/org/apache/spark/scheduler/ClusterSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/ClusterSchedulerSuite.scala index 7bf2020fe3..235d31709a 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/ClusterSchedulerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/ClusterSchedulerSuite.scala @@ -64,7 +64,7 @@ class FakeTaskSetManager( } override def getSchedulableByName(name: String): Schedulable = { - return null + null } override def executorLost(executorId: String, host: String): Unit = { @@ -79,13 +79,14 @@ class FakeTaskSetManager( { if (tasksSuccessful + runningTasks < numTasks) { increaseRunningTasks(1) - return Some(new TaskDescription(0, execId, "task 0:0", 0, null)) + Some(new TaskDescription(0, execId, "task 0:0", 0, null)) + } else { + None } - return None } override def checkSpeculatableTasks(): Boolean = { - return true + true } def taskFinished() { diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala index 2aa259daf3..14f89d50b7 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala @@ -122,7 +122,7 @@ class DAGSchedulerSuite extends FunSuite with BeforeAndAfter with LocalSparkCont locations: Seq[Seq[String]] = Nil ): MyRDD = { val maxPartition = numPartitions - 1 - return new 
MyRDD(sc, dependencies) { + new MyRDD(sc, dependencies) { override def compute(split: Partition, context: TaskContext): Iterator[(Int, Int)] = throw new RuntimeException("should not be reached") override def getPartitions = (0 to maxPartition).map(i => new Partition { diff --git a/core/src/test/scala/org/apache/spark/scheduler/JobLoggerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/JobLoggerSuite.scala index 5cc48ee00a..3880e68725 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/JobLoggerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/JobLoggerSuite.scala @@ -47,7 +47,7 @@ class JobLoggerSuite extends FunSuite with LocalSparkContext with ShouldMatchers dependencies: List[Dependency[_]] ): MyRDD = { val maxPartition = numPartitions - 1 - return new MyRDD(sc, dependencies) { + new MyRDD(sc, dependencies) { override def compute(split: Partition, context: TaskContext): Iterator[(Int, Int)] = throw new RuntimeException("should not be reached") override def getPartitions = (0 to maxPartition).map(i => new Partition { diff --git a/core/src/test/scala/org/apache/spark/util/ClosureCleanerSuite.scala b/core/src/test/scala/org/apache/spark/util/ClosureCleanerSuite.scala index 0ed366fb70..de4871d043 100644 --- a/core/src/test/scala/org/apache/spark/util/ClosureCleanerSuite.scala +++ b/core/src/test/scala/org/apache/spark/util/ClosureCleanerSuite.scala @@ -61,8 +61,8 @@ class NonSerializable {} object TestObject { def run(): Int = { var nonSer = new NonSerializable - var x = 5 - return withSpark(new SparkContext("local", "test")) { sc => + val x = 5 + withSpark(new SparkContext("local", "test")) { sc => val nums = sc.parallelize(Array(1, 2, 3, 4)) nums.map(_ + x).reduce(_ + _) } @@ -76,7 +76,7 @@ class TestClass extends Serializable { def run(): Int = { var nonSer = new NonSerializable - return withSpark(new SparkContext("local", "test")) { sc => + withSpark(new SparkContext("local", "test")) { sc => val nums = sc.parallelize(Array(1, 2, 3, 4)) nums.map(_ + getX).reduce(_ + _) } @@ -88,7 +88,7 @@ class TestClassWithoutDefaultConstructor(x: Int) extends Serializable { def run(): Int = { var nonSer = new NonSerializable - return withSpark(new SparkContext("local", "test")) { sc => + withSpark(new SparkContext("local", "test")) { sc => val nums = sc.parallelize(Array(1, 2, 3, 4)) nums.map(_ + getX).reduce(_ + _) } @@ -103,7 +103,7 @@ class TestClassWithoutFieldAccess { def run(): Int = { var nonSer2 = new NonSerializable var x = 5 - return withSpark(new SparkContext("local", "test")) { sc => + withSpark(new SparkContext("local", "test")) { sc => val nums = sc.parallelize(Array(1, 2, 3, 4)) nums.map(_ + x).reduce(_ + _) } @@ -115,7 +115,7 @@ object TestObjectWithNesting { def run(): Int = { var nonSer = new NonSerializable var answer = 0 - return withSpark(new SparkContext("local", "test")) { sc => + withSpark(new SparkContext("local", "test")) { sc => val nums = sc.parallelize(Array(1, 2, 3, 4)) var y = 1 for (i <- 1 to 4) { @@ -134,7 +134,7 @@ class TestClassWithNesting(val y: Int) extends Serializable { def run(): Int = { var nonSer = new NonSerializable var answer = 0 - return withSpark(new SparkContext("local", "test")) { sc => + withSpark(new SparkContext("local", "test")) { sc => val nums = sc.parallelize(Array(1, 2, 3, 4)) for (i <- 1 to 4) { var nonSer2 = new NonSerializable diff --git a/examples/src/main/scala/org/apache/spark/examples/LocalALS.scala b/examples/src/main/scala/org/apache/spark/examples/LocalALS.scala index 83db8b9e26..c8ecbb8e41 100644 --- 
a/examples/src/main/scala/org/apache/spark/examples/LocalALS.scala +++ b/examples/src/main/scala/org/apache/spark/examples/LocalALS.scala @@ -43,7 +43,7 @@ object LocalALS { def generateR(): DoubleMatrix2D = { val mh = factory2D.random(M, F) val uh = factory2D.random(U, F) - return algebra.mult(mh, algebra.transpose(uh)) + algebra.mult(mh, algebra.transpose(uh)) } def rmse(targetR: DoubleMatrix2D, ms: Array[DoubleMatrix1D], @@ -56,7 +56,7 @@ object LocalALS { //println("R: " + r) blas.daxpy(-1, targetR, r) val sumSqs = r.aggregate(Functions.plus, Functions.square) - return sqrt(sumSqs / (M * U)) + sqrt(sumSqs / (M * U)) } def updateMovie(i: Int, m: DoubleMatrix1D, us: Array[DoubleMatrix1D], @@ -80,7 +80,7 @@ object LocalALS { val ch = new CholeskyDecomposition(XtX) val Xty2D = factory2D.make(Xty.toArray, F) val solved2D = ch.solve(Xty2D) - return solved2D.viewColumn(0) + solved2D.viewColumn(0) } def updateUser(j: Int, u: DoubleMatrix1D, ms: Array[DoubleMatrix1D], @@ -104,7 +104,7 @@ object LocalALS { val ch = new CholeskyDecomposition(XtX) val Xty2D = factory2D.make(Xty.toArray, F) val solved2D = ch.solve(Xty2D) - return solved2D.viewColumn(0) + solved2D.viewColumn(0) } def main(args: Array[String]) { diff --git a/examples/src/main/scala/org/apache/spark/examples/LocalFileLR.scala b/examples/src/main/scala/org/apache/spark/examples/LocalFileLR.scala index fb130ea198..9ab5f5a486 100644 --- a/examples/src/main/scala/org/apache/spark/examples/LocalFileLR.scala +++ b/examples/src/main/scala/org/apache/spark/examples/LocalFileLR.scala @@ -28,7 +28,7 @@ object LocalFileLR { def parsePoint(line: String): DataPoint = { val nums = line.split(' ').map(_.toDouble) - return DataPoint(new Vector(nums.slice(1, D+1)), nums(0)) + DataPoint(new Vector(nums.slice(1, D+1)), nums(0)) } def main(args: Array[String]) { diff --git a/examples/src/main/scala/org/apache/spark/examples/LocalKMeans.scala b/examples/src/main/scala/org/apache/spark/examples/LocalKMeans.scala index f90ea35cd4..a730464ea1 100644 --- a/examples/src/main/scala/org/apache/spark/examples/LocalKMeans.scala +++ b/examples/src/main/scala/org/apache/spark/examples/LocalKMeans.scala @@ -55,7 +55,7 @@ object LocalKMeans { } } - return bestIndex + bestIndex } def main(args: Array[String]) { diff --git a/examples/src/main/scala/org/apache/spark/examples/SparkALS.scala b/examples/src/main/scala/org/apache/spark/examples/SparkALS.scala index 30c86d83e6..17bafc2218 100644 --- a/examples/src/main/scala/org/apache/spark/examples/SparkALS.scala +++ b/examples/src/main/scala/org/apache/spark/examples/SparkALS.scala @@ -44,7 +44,7 @@ object SparkALS { def generateR(): DoubleMatrix2D = { val mh = factory2D.random(M, F) val uh = factory2D.random(U, F) - return algebra.mult(mh, algebra.transpose(uh)) + algebra.mult(mh, algebra.transpose(uh)) } def rmse(targetR: DoubleMatrix2D, ms: Array[DoubleMatrix1D], @@ -57,7 +57,7 @@ object SparkALS { //println("R: " + r) blas.daxpy(-1, targetR, r) val sumSqs = r.aggregate(Functions.plus, Functions.square) - return sqrt(sumSqs / (M * U)) + sqrt(sumSqs / (M * U)) } def update(i: Int, m: DoubleMatrix1D, us: Array[DoubleMatrix1D], @@ -83,7 +83,7 @@ object SparkALS { val ch = new CholeskyDecomposition(XtX) val Xty2D = factory2D.make(Xty.toArray, F) val solved2D = ch.solve(Xty2D) - return solved2D.viewColumn(0) + solved2D.viewColumn(0) } def main(args: Array[String]) { diff --git a/examples/src/main/scala/org/apache/spark/examples/SparkHdfsLR.scala b/examples/src/main/scala/org/apache/spark/examples/SparkHdfsLR.scala index 
ff72532db1..39819064ed 100644 --- a/examples/src/main/scala/org/apache/spark/examples/SparkHdfsLR.scala +++ b/examples/src/main/scala/org/apache/spark/examples/SparkHdfsLR.scala @@ -43,7 +43,7 @@ object SparkHdfsLR { while (i < D) { x(i) = tok.nextToken.toDouble; i += 1 } - return DataPoint(new Vector(x), y) + DataPoint(new Vector(x), y) } def main(args: Array[String]) { diff --git a/examples/src/main/scala/org/apache/spark/examples/SparkKMeans.scala b/examples/src/main/scala/org/apache/spark/examples/SparkKMeans.scala index 8c99025eaa..9fe2465235 100644 --- a/examples/src/main/scala/org/apache/spark/examples/SparkKMeans.scala +++ b/examples/src/main/scala/org/apache/spark/examples/SparkKMeans.scala @@ -30,7 +30,7 @@ object SparkKMeans { val rand = new Random(42) def parseVector(line: String): Vector = { - return new Vector(line.split(' ').map(_.toDouble)) + new Vector(line.split(' ').map(_.toDouble)) } def closestPoint(p: Vector, centers: Array[Vector]): Int = { @@ -46,7 +46,7 @@ object SparkKMeans { } } - return bestIndex + bestIndex } def main(args: Array[String]) { @@ -61,15 +61,15 @@ object SparkKMeans { val K = args(2).toInt val convergeDist = args(3).toDouble - var kPoints = data.takeSample(false, K, 42).toArray + val kPoints = data.takeSample(withReplacement = false, K, 42).toArray var tempDist = 1.0 while(tempDist > convergeDist) { - var closest = data.map (p => (closestPoint(p, kPoints), (p, 1))) + val closest = data.map (p => (closestPoint(p, kPoints), (p, 1))) - var pointStats = closest.reduceByKey{case ((x1, y1), (x2, y2)) => (x1 + x2, y1 + y2)} + val pointStats = closest.reduceByKey{case ((x1, y1), (x2, y2)) => (x1 + x2, y1 + y2)} - var newPoints = pointStats.map {pair => (pair._1, pair._2._1 / pair._2._2)}.collectAsMap() + val newPoints = pointStats.map {pair => (pair._1, pair._2._1 / pair._2._2)}.collectAsMap() tempDist = 0.0 for (i <- 0 until K) { diff --git a/examples/src/main/scala/org/apache/spark/streaming/examples/clickstream/PageViewGenerator.scala b/examples/src/main/scala/org/apache/spark/streaming/examples/clickstream/PageViewGenerator.scala index 4fe57de4a4..a2600989ca 100644 --- a/examples/src/main/scala/org/apache/spark/streaming/examples/clickstream/PageViewGenerator.scala +++ b/examples/src/main/scala/org/apache/spark/streaming/examples/clickstream/PageViewGenerator.scala @@ -65,7 +65,7 @@ object PageViewGenerator { return item } } - return inputMap.take(1).head._1 // Shouldn't get here if probabilities add up to 1.0 + inputMap.take(1).head._1 // Shouldn't get here if probabilities add up to 1.0 } def getNextClickEvent() : String = { diff --git a/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala b/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala index 2d8623392e..c972a71349 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala @@ -48,7 +48,7 @@ class PythonMLLibAPI extends Serializable { val db = bb.asDoubleBuffer() val ans = new Array[Double](length.toInt) db.get(ans) - return ans + ans } private def serializeDoubleVector(doubles: Array[Double]): Array[Byte] = { @@ -60,7 +60,7 @@ class PythonMLLibAPI extends Serializable { bb.putLong(len) val db = bb.asDoubleBuffer() db.put(doubles) - return bytes + bytes } private def deserializeDoubleMatrix(bytes: Array[Byte]): Array[Array[Double]] = { @@ -86,7 +86,7 @@ class PythonMLLibAPI extends Serializable { ans(i) = new Array[Double](cols.toInt) 
db.get(ans(i)) } - return ans + ans } private def serializeDoubleMatrix(doubles: Array[Array[Double]]): Array[Byte] = { @@ -102,11 +102,10 @@ class PythonMLLibAPI extends Serializable { bb.putLong(rows) bb.putLong(cols) val db = bb.asDoubleBuffer() - var i = 0 for (i <- 0 until rows) { db.put(doubles(i)) } - return bytes + bytes } private def trainRegressionModel(trainFunc: (RDD[LabeledPoint], Array[Double]) => GeneralizedLinearModel, @@ -121,7 +120,7 @@ class PythonMLLibAPI extends Serializable { val ret = new java.util.LinkedList[java.lang.Object]() ret.add(serializeDoubleVector(model.weights)) ret.add(model.intercept: java.lang.Double) - return ret + ret } /** @@ -130,7 +129,7 @@ class PythonMLLibAPI extends Serializable { def trainLinearRegressionModelWithSGD(dataBytesJRDD: JavaRDD[Array[Byte]], numIterations: Int, stepSize: Double, miniBatchFraction: Double, initialWeightsBA: Array[Byte]): java.util.List[java.lang.Object] = { - return trainRegressionModel((data, initialWeights) => + trainRegressionModel((data, initialWeights) => LinearRegressionWithSGD.train(data, numIterations, stepSize, miniBatchFraction, initialWeights), dataBytesJRDD, initialWeightsBA) @@ -142,7 +141,7 @@ class PythonMLLibAPI extends Serializable { def trainLassoModelWithSGD(dataBytesJRDD: JavaRDD[Array[Byte]], numIterations: Int, stepSize: Double, regParam: Double, miniBatchFraction: Double, initialWeightsBA: Array[Byte]): java.util.List[java.lang.Object] = { - return trainRegressionModel((data, initialWeights) => + trainRegressionModel((data, initialWeights) => LassoWithSGD.train(data, numIterations, stepSize, regParam, miniBatchFraction, initialWeights), dataBytesJRDD, initialWeightsBA) @@ -154,7 +153,7 @@ class PythonMLLibAPI extends Serializable { def trainRidgeModelWithSGD(dataBytesJRDD: JavaRDD[Array[Byte]], numIterations: Int, stepSize: Double, regParam: Double, miniBatchFraction: Double, initialWeightsBA: Array[Byte]): java.util.List[java.lang.Object] = { - return trainRegressionModel((data, initialWeights) => + trainRegressionModel((data, initialWeights) => RidgeRegressionWithSGD.train(data, numIterations, stepSize, regParam, miniBatchFraction, initialWeights), dataBytesJRDD, initialWeightsBA) @@ -166,7 +165,7 @@ class PythonMLLibAPI extends Serializable { def trainSVMModelWithSGD(dataBytesJRDD: JavaRDD[Array[Byte]], numIterations: Int, stepSize: Double, regParam: Double, miniBatchFraction: Double, initialWeightsBA: Array[Byte]): java.util.List[java.lang.Object] = { - return trainRegressionModel((data, initialWeights) => + trainRegressionModel((data, initialWeights) => SVMWithSGD.train(data, numIterations, stepSize, regParam, miniBatchFraction, initialWeights), dataBytesJRDD, initialWeightsBA) @@ -178,7 +177,7 @@ class PythonMLLibAPI extends Serializable { def trainLogisticRegressionModelWithSGD(dataBytesJRDD: JavaRDD[Array[Byte]], numIterations: Int, stepSize: Double, miniBatchFraction: Double, initialWeightsBA: Array[Byte]): java.util.List[java.lang.Object] = { - return trainRegressionModel((data, initialWeights) => + trainRegressionModel((data, initialWeights) => LogisticRegressionWithSGD.train(data, numIterations, stepSize, miniBatchFraction, initialWeights), dataBytesJRDD, initialWeightsBA) @@ -194,7 +193,7 @@ class PythonMLLibAPI extends Serializable { val model = KMeans.train(data, k, maxIterations, runs, initializationMode) val ret = new java.util.LinkedList[java.lang.Object]() ret.add(serializeDoubleMatrix(model.clusterCenters)) - return ret + ret } /** Unpack a Rating object from an array of 
bytes */ @@ -204,7 +203,7 @@ class PythonMLLibAPI extends Serializable { val user = bb.getInt() val product = bb.getInt() val rating = bb.getDouble() - return new Rating(user, product, rating) + new Rating(user, product, rating) } /** Unpack a tuple of Ints from an array of bytes */ @@ -245,7 +244,7 @@ class PythonMLLibAPI extends Serializable { def trainALSModel(ratingsBytesJRDD: JavaRDD[Array[Byte]], rank: Int, iterations: Int, lambda: Double, blocks: Int): MatrixFactorizationModel = { val ratings = ratingsBytesJRDD.rdd.map(unpackRating) - return ALS.train(ratings, rank, iterations, lambda, blocks) + ALS.train(ratings, rank, iterations, lambda, blocks) } /** @@ -257,6 +256,6 @@ class PythonMLLibAPI extends Serializable { def trainImplicitALSModel(ratingsBytesJRDD: JavaRDD[Array[Byte]], rank: Int, iterations: Int, lambda: Double, blocks: Int, alpha: Double): MatrixFactorizationModel = { val ratings = ratingsBytesJRDD.rdd.map(unpackRating) - return ALS.trainImplicit(ratings, rank, iterations, lambda, blocks, alpha) + ALS.trainImplicit(ratings, rank, iterations, lambda, blocks, alpha) } } diff --git a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala index ca0115f90e..ebfb8dba8e 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala @@ -203,6 +203,6 @@ class ObjectInputStreamWithLoader(inputStream_ : InputStream, loader: ClassLoade } catch { case e: Exception => } - return super.resolveClass(desc) + super.resolveClass(desc) } } diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala index fb9eda8996..a7ba2339c0 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala @@ -219,7 +219,7 @@ class FileInputDStream[K: ClassTag, V: ClassTag, F <: NewInputFormat[K,V] : Clas reset() return false } - return true + true } } } diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/StateDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/StateDStream.scala index e0ff3ccba4..b34ba7b9b4 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/StateDStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/StateDStream.scala @@ -65,7 +65,7 @@ class StateDStream[K: ClassTag, V: ClassTag, S: ClassTag]( val cogroupedRDD = parentRDD.cogroup(prevStateRDD, partitioner) val stateRDD = cogroupedRDD.mapPartitions(finalFunc, preservePartitioning) //logDebug("Generating state RDD for time " + validTime) - return Some(stateRDD) + Some(stateRDD) } case None => { // If parent RDD does not exist @@ -76,7 +76,7 @@ class StateDStream[K: ClassTag, V: ClassTag, S: ClassTag]( updateFuncLocal(i) } val stateRDD = prevStateRDD.mapPartitions(finalFunc, preservePartitioning) - return Some(stateRDD) + Some(stateRDD) } } } @@ -98,11 +98,11 @@ class StateDStream[K: ClassTag, V: ClassTag, S: ClassTag]( val groupedRDD = parentRDD.groupByKey(partitioner) val sessionRDD = groupedRDD.mapPartitions(finalFunc, preservePartitioning) //logDebug("Generating state RDD for time " + validTime + " (first)") - return Some(sessionRDD) + Some(sessionRDD) } case None => { // If parent RDD does not exist, then nothing to do! 
//logDebug("Not generating state RDD (no previous state, no parent)") - return None + None } } } diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListenerBus.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListenerBus.scala index 110a20f282..73dc52023a 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListenerBus.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListenerBus.scala @@ -76,6 +76,6 @@ private[spark] class StreamingListenerBus() extends Logging { * add overhead in the general case. */ Thread.sleep(10) } - return true + true } } diff --git a/streaming/src/main/scala/org/apache/spark/streaming/util/Clock.scala b/streaming/src/main/scala/org/apache/spark/streaming/util/Clock.scala index f67bb2f6ac..c3a849d276 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/util/Clock.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/util/Clock.scala @@ -66,7 +66,7 @@ class SystemClock() extends Clock { } Thread.sleep(sleepTime) } - return -1 + -1 } } @@ -96,6 +96,6 @@ class ManualClock() extends Clock { this.wait(100) } } - return currentTime() + currentTime() } } diff --git a/streaming/src/main/scala/org/apache/spark/streaming/util/RawTextHelper.scala b/streaming/src/main/scala/org/apache/spark/streaming/util/RawTextHelper.scala index 4e6ce6eabd..5b6c048a39 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/util/RawTextHelper.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/util/RawTextHelper.scala @@ -90,7 +90,7 @@ object RawTextHelper { } } } - return taken.toIterator + taken.toIterator } /** diff --git a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/Client.scala index 23781ea35c..e1fe09e3e2 100644 --- a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/Client.scala +++ b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/Client.scala @@ -158,7 +158,7 @@ class Client(args: ClientArguments, conf: Configuration, sparkConf: SparkConf) val appContext = Records.newRecord(classOf[ApplicationSubmissionContext]) appContext.setApplicationId(appId) appContext.setApplicationName(args.appName) - return appContext + appContext } /** See if two file systems are the same or not. */ @@ -191,9 +191,10 @@ class Client(args: ClientArguments, conf: Configuration, sparkConf: SparkConf) } //check for ports if (srcUri.getPort() != dstUri.getPort()) { - return false + false + } else { + true } - return true } /** Copy the file into HDFS if needed. 
*/ @@ -299,7 +300,7 @@ class Client(args: ClientArguments, conf: Configuration, sparkConf: SparkConf) } UserGroupInformation.getCurrentUser().addCredentials(credentials) - return localResources + localResources } def setupLaunchEnv( diff --git a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/WorkerLauncher.scala b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/WorkerLauncher.scala index ddfec1a4ac..0138d7ade1 100644 --- a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/WorkerLauncher.scala +++ b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/WorkerLauncher.scala @@ -125,7 +125,7 @@ class WorkerLauncher(args: ApplicationMasterArguments, conf: Configuration, spar val containerId = ConverterUtils.toContainerId(containerIdString) val appAttemptId = containerId.getApplicationAttemptId() logInfo("ApplicationAttemptId: " + appAttemptId) - return appAttemptId + appAttemptId } private def registerWithResourceManager(): AMRMProtocol = { @@ -133,7 +133,7 @@ class WorkerLauncher(args: ApplicationMasterArguments, conf: Configuration, spar YarnConfiguration.RM_SCHEDULER_ADDRESS, YarnConfiguration.DEFAULT_RM_SCHEDULER_ADDRESS)) logInfo("Connecting to ResourceManager at " + rmAddress) - return rpc.getProxy(classOf[AMRMProtocol], rmAddress, conf).asInstanceOf[AMRMProtocol] + rpc.getProxy(classOf[AMRMProtocol], rmAddress, conf).asInstanceOf[AMRMProtocol] } private def registerApplicationMaster(): RegisterApplicationMasterResponse = { @@ -147,7 +147,7 @@ class WorkerLauncher(args: ApplicationMasterArguments, conf: Configuration, spar appMasterRequest.setRpcPort(0) // What do we provide here ? Might make sense to expose something sensible later ? appMasterRequest.setTrackingUrl("") - return resourceManager.registerApplicationMaster(appMasterRequest) + resourceManager.registerApplicationMaster(appMasterRequest) } private def waitForSparkMaster() { @@ -220,7 +220,7 @@ class WorkerLauncher(args: ApplicationMasterArguments, conf: Configuration, spar t.setDaemon(true) t.start() logInfo("Started progress reporter thread - sleep time : " + sleepTime) - return t + t } private def sendProgress() { diff --git a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/WorkerRunnable.scala b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/WorkerRunnable.scala index 132630e5ef..d32cdcc879 100644 --- a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/WorkerRunnable.scala +++ b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/WorkerRunnable.scala @@ -195,7 +195,7 @@ class WorkerRunnable( } logInfo("Prepared Local resources " + localResources) - return localResources + localResources } def prepareEnvironment: HashMap[String, String] = { @@ -207,7 +207,7 @@ class WorkerRunnable( Apps.setEnvFromInputString(env, System.getenv("SPARK_YARN_USER_ENV")) System.getenv().filterKeys(_.startsWith("SPARK")).foreach { case (k,v) => env(k) = v } - return env + env } def connectToCM: ContainerManager = { @@ -226,8 +226,7 @@ class WorkerRunnable( val proxy = user .doAs(new PrivilegedExceptionAction[ContainerManager] { def run: ContainerManager = { - return rpc.getProxy(classOf[ContainerManager], - cmAddress, conf).asInstanceOf[ContainerManager] + rpc.getProxy(classOf[ContainerManager], cmAddress, conf).asInstanceOf[ContainerManager] } }) proxy diff --git a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientDistributedCacheManager.scala b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientDistributedCacheManager.scala index 5f159b073f..535abbfb7f 100644 --- 
a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientDistributedCacheManager.scala +++ b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientDistributedCacheManager.scala @@ -143,7 +143,7 @@ class ClientDistributedCacheManager() extends Logging { if (isPublic(conf, uri, statCache)) { return LocalResourceVisibility.PUBLIC } - return LocalResourceVisibility.PRIVATE + LocalResourceVisibility.PRIVATE } /** @@ -161,7 +161,7 @@ class ClientDistributedCacheManager() extends Logging { if (!checkPermissionOfOther(fs, current, FsAction.READ, statCache)) { return false } - return ancestorsHaveExecutePermissions(fs, current.getParent(), statCache) + ancestorsHaveExecutePermissions(fs, current.getParent(), statCache) } /** @@ -183,7 +183,7 @@ class ClientDistributedCacheManager() extends Logging { } current = current.getParent() } - return true + true } /** @@ -203,7 +203,7 @@ class ClientDistributedCacheManager() extends Logging { if (otherAction.implies(action)) { return true } - return false + false } /** @@ -223,6 +223,6 @@ class ClientDistributedCacheManager() extends Logging { statCache.put(uri, newStat) newStat } - return stat + stat } } diff --git a/yarn/common/src/test/scala/org/apache/spark/deploy/yarn/ClientDistributedCacheManagerSuite.scala b/yarn/common/src/test/scala/org/apache/spark/deploy/yarn/ClientDistributedCacheManagerSuite.scala index 2941356bc5..458df4fa3c 100644 --- a/yarn/common/src/test/scala/org/apache/spark/deploy/yarn/ClientDistributedCacheManagerSuite.scala +++ b/yarn/common/src/test/scala/org/apache/spark/deploy/yarn/ClientDistributedCacheManagerSuite.scala @@ -42,7 +42,7 @@ class ClientDistributedCacheManagerSuite extends FunSuite with MockitoSugar { class MockClientDistributedCacheManager extends ClientDistributedCacheManager { override def getVisibility(conf: Configuration, uri: URI, statCache: Map[URI, FileStatus]): LocalResourceVisibility = { - return LocalResourceVisibility.PRIVATE + LocalResourceVisibility.PRIVATE } } diff --git a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/Client.scala index be323d7783..efeee31acd 100644 --- a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/Client.scala +++ b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/Client.scala @@ -205,9 +205,10 @@ class Client(args: ClientArguments, conf: Configuration, sparkConf: SparkConf) } //check for ports if (srcUri.getPort() != dstUri.getPort()) { - return false + false + } else { + true } - return true } /** Copy the file into HDFS if needed. */ -- cgit v1.2.3 From f4d77f8cb8a9eab43bea35e8e6c9bc0d2c2b53a8 Mon Sep 17 00:00:00 2001 From: Patrick Wendell Date: Sat, 11 Jan 2014 10:50:14 -0800 Subject: Rename DStream.foreach to DStream.foreachRDD `foreachRDD` makes it clear that the granularity of this operator is per-RDD. As it stands, `foreach` is inconsistent with with `map`, `filter`, and the other DStream operators which get pushed down to individual records within each RDD. 
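For context, the call-site change is mechanical: the function passed to the operator still runs once per generated RDD, only the new name makes that granularity explicit. A minimal sketch mirroring the RawNetworkGrep change below, assuming a local master and a socket source on localhost:9999 (both placeholders):

    import org.apache.spark.streaming.{Seconds, StreamingContext}

    object ForeachRDDRename {
      def main(args: Array[String]) {
        val ssc = new StreamingContext("local[2]", "ForeachRDDRename", Seconds(1))
        val lines = ssc.socketTextStream("localhost", 9999)
        val counts = lines.filter(_.contains("the")).count()

        // Before this patch the per-RDD output hook was spelled foreach:
        //   counts.foreach(rdd => println("Grep count: " + rdd.collect().mkString))
        // After the rename the same output operator is foreachRDD:
        counts.foreachRDD(rdd => println("Grep count: " + rdd.collect().mkString))

        ssc.start()
      }
    }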
--- docs/streaming-programming-guide.md | 4 ++-- .../org/apache/spark/streaming/examples/RawNetworkGrep.scala | 2 +- .../apache/spark/streaming/examples/TwitterAlgebirdCMS.scala | 4 ++-- .../apache/spark/streaming/examples/TwitterAlgebirdHLL.scala | 4 ++-- .../apache/spark/streaming/examples/TwitterPopularTags.scala | 4 ++-- .../spark/streaming/examples/clickstream/PageViewStream.scala | 2 +- .../src/main/scala/org/apache/spark/streaming/DStream.scala | 10 +++++----- .../org/apache/spark/streaming/PairDStreamFunctions.scala | 4 ++-- .../org/apache/spark/streaming/api/java/JavaDStreamLike.scala | 8 ++++---- .../org/apache/spark/streaming/BasicOperationsSuite.scala | 2 +- 10 files changed, 22 insertions(+), 22 deletions(-) (limited to 'examples/src') diff --git a/docs/streaming-programming-guide.md b/docs/streaming-programming-guide.md index 1c9ece6270..3273817c78 100644 --- a/docs/streaming-programming-guide.md +++ b/docs/streaming-programming-guide.md @@ -175,7 +175,7 @@ When an output operator is called, it triggers the computation of a stream. Curr - + @@ -375,7 +375,7 @@ There are two failure behaviors based on which input sources are used. 1. _Using HDFS files as input source_ - Since the data is reliably stored on HDFS, all data can re-computed and therefore no data will be lost due to any failure. 1. _Using any input source that receives data through a network_ - For network-based data sources like Kafka and Flume, the received input data is replicated in memory between nodes of the cluster (default replication factor is 2). So if a worker node fails, then the system can recompute the lost from the the left over copy of the input data. However, if the worker node where a network receiver was running fails, then a tiny bit of data may be lost, that is, the data received by the system but not yet replicated to other node(s). The receiver will be started on a different node and it will continue to receive data. -Since all data is modeled as RDDs with their lineage of deterministic operations, any recomputation always leads to the same result. As a result, all DStream transformations are guaranteed to have _exactly-once_ semantics. That is, the final transformed result will be same even if there were was a worker node failure. However, output operations (like `foreach`) have _at-least once_ semantics, that is, the transformed data may get written to an external entity more than once in the event of a worker failure. While this is acceptable for saving to HDFS using the `saveAs*Files` operations (as the file will simply get over-written by the same data), additional transactions-like mechanisms may be necessary to achieve exactly-once semantics for output operations. +Since all data is modeled as RDDs with their lineage of deterministic operations, any recomputation always leads to the same result. As a result, all DStream transformations are guaranteed to have _exactly-once_ semantics. That is, the final transformed result will be same even if there were was a worker node failure. However, output operations (like `foreachRDD`) have _at-least once_ semantics, that is, the transformed data may get written to an external entity more than once in the event of a worker failure. While this is acceptable for saving to HDFS using the `saveAs*Files` operations (as the file will simply get over-written by the same data), additional transactions-like mechanisms may be necessary to achieve exactly-once semantics for output operations. 
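A common way to live with the at-least-once behaviour of output operations is to make the write idempotent by keying it on the batch time, so a retried batch targets the same deterministic location rather than producing an extra copy. A hedged sketch using the two-argument foreachRDD renamed in this patch; the master URL, socket source, and HDFS path are placeholders:

    import org.apache.spark.streaming.{Seconds, StreamingContext, Time}

    object IdempotentOutput {
      def main(args: Array[String]) {
        val ssc = new StreamingContext("local[2]", "IdempotentOutput", Seconds(10))
        val words = ssc.socketTextStream("localhost", 9999).flatMap(_.split(" "))

        // The output directory is a pure function of the batch time, so a retried
        // batch writes to the same place instead of creating an unrelated duplicate.
        words.foreachRDD((rdd, time: Time) =>
          rdd.saveAsTextFile("hdfs://namenode:8020/words/batch-" + time.milliseconds))

        ssc.start()
      }
    }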
## Failure of the Driver Node A system that is required to operate 24/7 needs to be able tolerate the failure of the driver node as well. Spark Streaming does this by saving the state of the DStream computation periodically to a HDFS file, that can be used to restart the streaming computation in the event of a failure of the driver node. This checkpointing is enabled by setting a HDFS directory for checkpointing using `ssc.checkpoint()` as described [earlier](#rdd-checkpointing-within-dstreams). To elaborate, the following state is periodically saved to a file. diff --git a/examples/src/main/scala/org/apache/spark/streaming/examples/RawNetworkGrep.scala b/examples/src/main/scala/org/apache/spark/streaming/examples/RawNetworkGrep.scala index 3d08d86567..99b79c3949 100644 --- a/examples/src/main/scala/org/apache/spark/streaming/examples/RawNetworkGrep.scala +++ b/examples/src/main/scala/org/apache/spark/streaming/examples/RawNetworkGrep.scala @@ -58,7 +58,7 @@ object RawNetworkGrep { val rawStreams = (1 to numStreams).map(_ => ssc.rawSocketStream[String](host, port, StorageLevel.MEMORY_ONLY_SER_2)).toArray val union = ssc.union(rawStreams) - union.filter(_.contains("the")).count().foreach(r => + union.filter(_.contains("the")).count().foreachRDD(r => println("Grep count: " + r.collect().mkString)) ssc.start() } diff --git a/examples/src/main/scala/org/apache/spark/streaming/examples/TwitterAlgebirdCMS.scala b/examples/src/main/scala/org/apache/spark/streaming/examples/TwitterAlgebirdCMS.scala index 80b5a98b14..483c4d3118 100644 --- a/examples/src/main/scala/org/apache/spark/streaming/examples/TwitterAlgebirdCMS.scala +++ b/examples/src/main/scala/org/apache/spark/streaming/examples/TwitterAlgebirdCMS.scala @@ -81,7 +81,7 @@ object TwitterAlgebirdCMS { val exactTopUsers = users.map(id => (id, 1)) .reduceByKey((a, b) => a + b) - approxTopUsers.foreach(rdd => { + approxTopUsers.foreachRDD(rdd => { if (rdd.count() != 0) { val partial = rdd.first() val partialTopK = partial.heavyHitters.map(id => @@ -96,7 +96,7 @@ object TwitterAlgebirdCMS { } }) - exactTopUsers.foreach(rdd => { + exactTopUsers.foreachRDD(rdd => { if (rdd.count() != 0) { val partialMap = rdd.collect().toMap val partialTopK = rdd.map( diff --git a/examples/src/main/scala/org/apache/spark/streaming/examples/TwitterAlgebirdHLL.scala b/examples/src/main/scala/org/apache/spark/streaming/examples/TwitterAlgebirdHLL.scala index cb2f2c51a0..94c2bf29ac 100644 --- a/examples/src/main/scala/org/apache/spark/streaming/examples/TwitterAlgebirdHLL.scala +++ b/examples/src/main/scala/org/apache/spark/streaming/examples/TwitterAlgebirdHLL.scala @@ -67,7 +67,7 @@ object TwitterAlgebirdHLL { val exactUsers = users.map(id => Set(id)).reduce(_ ++ _) - approxUsers.foreach(rdd => { + approxUsers.foreachRDD(rdd => { if (rdd.count() != 0) { val partial = rdd.first() globalHll += partial @@ -76,7 +76,7 @@ object TwitterAlgebirdHLL { } }) - exactUsers.foreach(rdd => { + exactUsers.foreachRDD(rdd => { if (rdd.count() != 0) { val partial = rdd.first() userSet ++= partial diff --git a/examples/src/main/scala/org/apache/spark/streaming/examples/TwitterPopularTags.scala b/examples/src/main/scala/org/apache/spark/streaming/examples/TwitterPopularTags.scala index 16c10feaba..8a70d4a978 100644 --- a/examples/src/main/scala/org/apache/spark/streaming/examples/TwitterPopularTags.scala +++ b/examples/src/main/scala/org/apache/spark/streaming/examples/TwitterPopularTags.scala @@ -56,13 +56,13 @@ object TwitterPopularTags { // Print popular hashtags - 
topCounts60.foreach(rdd => { + topCounts60.foreachRDD(rdd => { val topList = rdd.take(5) println("\nPopular topics in last 60 seconds (%s total):".format(rdd.count())) topList.foreach{case (count, tag) => println("%s (%s tweets)".format(tag, count))} }) - topCounts10.foreach(rdd => { + topCounts10.foreachRDD(rdd => { val topList = rdd.take(5) println("\nPopular topics in last 10 seconds (%s total):".format(rdd.count())) topList.foreach{case (count, tag) => println("%s (%s tweets)".format(tag, count))} diff --git a/examples/src/main/scala/org/apache/spark/streaming/examples/clickstream/PageViewStream.scala b/examples/src/main/scala/org/apache/spark/streaming/examples/clickstream/PageViewStream.scala index da6b67bcce..bb44bc3d06 100644 --- a/examples/src/main/scala/org/apache/spark/streaming/examples/clickstream/PageViewStream.scala +++ b/examples/src/main/scala/org/apache/spark/streaming/examples/clickstream/PageViewStream.scala @@ -91,7 +91,7 @@ object PageViewStream { case "popularUsersSeen" => // Look for users in our existing dataset and print it out if we have a match pageViews.map(view => (view.userID, 1)) - .foreach((rdd, time) => rdd.join(userList) + .foreachRDD((rdd, time) => rdd.join(userList) .map(_._2._2) .take(10) .foreach(u => println("Saw user %s at time %s".format(u, time)))) diff --git a/streaming/src/main/scala/org/apache/spark/streaming/DStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/DStream.scala index b98f4a5101..93d57db494 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/DStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/DStream.scala @@ -487,15 +487,15 @@ abstract class DStream[T: ClassTag] ( * Apply a function to each RDD in this DStream. This is an output operator, so * 'this' DStream will be registered as an output stream and therefore materialized. */ - def foreach(foreachFunc: RDD[T] => Unit) { - this.foreach((r: RDD[T], t: Time) => foreachFunc(r)) + def foreachRDD(foreachFunc: RDD[T] => Unit) { + this.foreachRDD((r: RDD[T], t: Time) => foreachFunc(r)) } /** * Apply a function to each RDD in this DStream. This is an output operator, so * 'this' DStream will be registered as an output stream and therefore materialized. 
*/ - def foreach(foreachFunc: (RDD[T], Time) => Unit) { + def foreachRDD(foreachFunc: (RDD[T], Time) => Unit) { ssc.registerOutputStream(new ForEachDStream(this, context.sparkContext.clean(foreachFunc))) } @@ -719,7 +719,7 @@ abstract class DStream[T: ClassTag] ( val file = rddToFileName(prefix, suffix, time) rdd.saveAsObjectFile(file) } - this.foreach(saveFunc) + this.foreachRDD(saveFunc) } /** @@ -732,7 +732,7 @@ abstract class DStream[T: ClassTag] ( val file = rddToFileName(prefix, suffix, time) rdd.saveAsTextFile(file) } - this.foreach(saveFunc) + this.foreachRDD(saveFunc) } def register() { diff --git a/streaming/src/main/scala/org/apache/spark/streaming/PairDStreamFunctions.scala b/streaming/src/main/scala/org/apache/spark/streaming/PairDStreamFunctions.scala index 56dbcbda23..69d80c3711 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/PairDStreamFunctions.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/PairDStreamFunctions.scala @@ -582,7 +582,7 @@ extends Serializable { val file = rddToFileName(prefix, suffix, time) rdd.saveAsHadoopFile(file, keyClass, valueClass, outputFormatClass, conf) } - self.foreach(saveFunc) + self.foreachRDD(saveFunc) } /** @@ -612,7 +612,7 @@ extends Serializable { val file = rddToFileName(prefix, suffix, time) rdd.saveAsNewAPIHadoopFile(file, keyClass, valueClass, outputFormatClass, conf) } - self.foreach(saveFunc) + self.foreachRDD(saveFunc) } private def getKeyClass() = implicitly[ClassTag[K]].runtimeClass diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStreamLike.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStreamLike.scala index 64f38ce1c0..4b5d5ece52 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStreamLike.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStreamLike.scala @@ -244,16 +244,16 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This, R], R <: JavaRDDLike[T * Apply a function to each RDD in this DStream. This is an output operator, so * 'this' DStream will be registered as an output stream and therefore materialized. */ - def foreach(foreachFunc: JFunction[R, Void]) { - dstream.foreach(rdd => foreachFunc.call(wrapRDD(rdd))) + def foreachRDD(foreachFunc: JFunction[R, Void]) { + dstream.foreachRDD(rdd => foreachFunc.call(wrapRDD(rdd))) } /** * Apply a function to each RDD in this DStream. This is an output operator, so * 'this' DStream will be registered as an output stream and therefore materialized. 
*/ - def foreach(foreachFunc: JFunction2[R, Time, Void]) { - dstream.foreach((rdd, time) => foreachFunc.call(wrapRDD(rdd), time)) + def foreachRDD(foreachFunc: JFunction2[R, Time, Void]) { + dstream.foreachRDD((rdd, time) => foreachFunc.call(wrapRDD(rdd), time)) } /** diff --git a/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala index ee6b433d1f..9a187ce031 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala @@ -383,7 +383,7 @@ class BasicOperationsSuite extends TestSuiteBase { val input = Seq(Seq(1), Seq(2), Seq(3), Seq(4)) val stream = new TestInputStream[Int](ssc, input, 2) ssc.registerInputStream(stream) - stream.foreach(_ => {}) // Dummy output stream + stream.foreachRDD(_ => {}) // Dummy output stream ssc.start() Thread.sleep(2000) def getInputFromSlice(fromMillis: Long, toMillis: Long) = { -- cgit v1.2.3
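Stepping back to the style clean-up earlier in this series: in Scala the last expression of a method body, or of each branch of an if/else, is already the method's result, so the dropped return keywords change nothing at runtime. A small self-contained sketch of the two recurring rewrites; the shapes mirror SizeEstimator.alignSize and the yarn Client port check above, but the object and names here are illustrative only:

    object ReturnStyle {
      private val ALIGN_SIZE = 8L

      // Before: the value is computed and then explicitly returned.
      def alignSizeOld(size: Long): Long = {
        val rem = size % ALIGN_SIZE
        return if (rem == 0) size else size + ALIGN_SIZE - rem
      }

      // After: the if expression is the last expression, hence the result.
      def alignSize(size: Long): Long = {
        val rem = size % ALIGN_SIZE
        if (rem == 0) size else size + ALIGN_SIZE - rem
      }

      // Early returns become an if/else whose branches are the result.
      def samePort(srcPort: Int, dstPort: Int): Boolean = {
        if (srcPort != dstPort) {
          false
        } else {
          true
        }
      }
    }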
streaming-programming-guide.md, output operator table (foreach renamed to foreachRDD):

  Operator: foreachRDD(func)
  Meaning:  The fundamental output operator. Applies a function, func, to each RDD generated from
            the stream. This function should have side effects, such as printing output, saving the
            RDD to external files, or writing it over the network to an external system.
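When that external system is reached over the network, the usual pattern is to create the connection inside the partition closure on the executors rather than on the driver. A hedged sketch; ExternalSink and its methods are stand-ins rather than a Spark or third-party API, and the DStream import path is the one in use at the time of this patch (later releases moved DStream into the dstream package):

    import org.apache.spark.streaming.DStream

    // Stand-in client for whatever store the records are pushed to.
    class ExternalSink {
      def open(): Unit = {}
      def send(record: String): Unit = {}
      def close(): Unit = {}
    }

    object PushToExternalSystem {
      // One connection per partition, opened on the executor, so no
      // non-serializable client object is captured in the driver-side closure.
      def push(stream: DStream[String]) {
        stream.foreachRDD { rdd =>
          rdd.foreachPartition { records =>
            val sink = new ExternalSink
            sink.open()
            records.foreach(sink.send)
            sink.close()
          }
        }
      }
    }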