From c4aa10154ec7270360328b19303b24806e6b4244 Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Tue, 23 Oct 2012 13:49:52 -0700 Subject: Fix minor typos in quick start guide. --- docs/quick-start.md | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/docs/quick-start.md b/docs/quick-start.md index defdb34836..dbc232b6e0 100644 --- a/docs/quick-start.md +++ b/docs/quick-start.md @@ -6,7 +6,7 @@ title: Quick Start * This will become a table of contents (this text will be scraped). {:toc} -This tutorial provides a quick introduction to using Spark. We will first introduce the API through Spark's interactive Scala shell (don't worry if you don't know Scala -- you will need much for this), then show how to write standalone jobs in Scala and Java. See the [programming guide](scala-programming-guide.html) for a fuller reference. +This tutorial provides a quick introduction to using Spark. We will first introduce the API through Spark's interactive Scala shell (don't worry if you don't know Scala -- you will not need much for this), then show how to write standalone jobs in Scala and Java. See the [programming guide](scala-programming-guide.html) for a more complete reference. To follow along with this guide, you only need to have successfully built Spark on one machine. Simply go into your Spark directory and run: @@ -60,7 +60,7 @@ scala> textFile.map(line => line.split(" ").size).reduce((a, b) => if (a > b) a res4: Long = 16 {% endhighlight %} -This first maps a line to an integer value, creating a new RDD. `reduce` is called on that RDD to find the largest line count. The arguments to `map` and `reduce` are Scala function literals (closures), and can use any language feature or Scala/Java library. For example, we can easily call functions declared elsewhere. We'll use `Math.max()` function to make this code easier to understand: +This first maps a line to an integer value, creating a new RDD. `reduce` is called on that RDD to find the largest line count. The arguments to `map` and `reduce` are Scala function literals (closures), and can use any language feature or Scala/Java library. For example, we can easily call functions declared elsewhere. We'll use `Math.max()` function to make this code easier to understand: {% highlight scala %} scala> import java.lang.Math @@ -98,10 +98,10 @@ scala> linesWithSpark.count() res9: Long = 15 {% endhighlight %} -It may seem silly to use a Spark to explore and cache a 30-line text file. The interesting part is that these same functions can be used on very large data sets, even when they are striped across tens or hundreds of nodes. You can also do this interactively by connecting `spark-shell` to a cluster, as described in the [programming guide](scala-programming-guide.html#initializing-spark). +It may seem silly to use Spark to explore and cache a 30-line text file. The interesting part is that these same functions can be used on very large data sets, even when they are striped across tens or hundreds of nodes. You can also do this interactively by connecting `spark-shell` to a cluster, as described in the [programming guide](scala-programming-guide.html#initializing-spark). # A Standalone Job in Scala -Now say we wanted to write a standalone job using the Spark API. We will walk through a simple job in both Scala (with sbt) and Java (with maven). If you using other build systems, consider using the Spark assembly JAR described in the developer guide. +Now say we wanted to write a standalone job using the Spark API. 
We will walk through a simple job in both Scala (with sbt) and Java (with maven). If you are using other build systems, consider using the Spark assembly JAR described in the developer guide. We'll create a very simple Spark job in Scala. So simple, in fact, that it's named `SimpleJob.scala`: @@ -112,7 +112,7 @@ import SparkContext._ object SimpleJob extends Application { val logFile = "/var/log/syslog" // Should be some file on your system - val sc = new SparkContext("local", "Simple Job", "$YOUR_SPARK_HOME", + val sc = new SparkContext("local", "Simple Job", "$YOUR_SPARK_HOME", "target/scala-{{site.SCALA_VERSION}}/simple-project_{{site.SCALA_VERSION}}-1.0.jar") val logData = sc.textFile(logFile, 2).cache() val numAs = logData.filter(line => line.contains("a")).count() @@ -139,10 +139,10 @@ resolvers ++= Seq( "Spray Repository" at "http://repo.spray.cc/") {% endhighlight %} -Of course, for sbt to work correctly, we'll need to layout `SimpleJob.scala` and `simple.sbt` according to the typical directory structure. Once that is in place, we can create a JAR package containing the job's code, then use `sbt run` to execute our example job. +Of course, for sbt to work correctly, we'll need to layout `SimpleJob.scala` and `simple.sbt` according to the typical directory structure. Once that is in place, we can create a JAR package containing the job's code, then use `sbt run` to execute our example job. {% highlight bash %} -$ find . +$ find . . ./simple.sbt ./src @@ -159,7 +159,7 @@ Lines with a: 8422, Lines with b: 1836 This example only runs the job locally; for a tutorial on running jobs across several machines, see the [Standalone Mode](spark-standalone.html) documentation, and consider using a distributed input source, such as HDFS. # A Standalone Job In Java -Now say we wanted to write a standalone job using the Java API. We will walk through doing this with Maven. If you using other build systems, consider using the Spark assembly JAR described in the developer guide. +Now say we wanted to write a standalone job using the Java API. We will walk through doing this with Maven. If you are using other build systems, consider using the Spark assembly JAR described in the developer guide. 
We'll create a very simple Spark job, `SimpleJob.java`: @@ -171,7 +171,7 @@ import spark.api.java.function.Function; public class SimpleJob { public static void main(String[] args) { String logFile = "/var/log/syslog"; // Should be some file on your system - JavaSparkContext sc = new JavaSparkContext("local", "Simple Job", + JavaSparkContext sc = new JavaSparkContext("local", "Simple Job", "$YOUR_SPARK_HOME", "target/simple-project-1.0.jar"); JavaRDD logData = sc.textFile(logFile).cache(); -- cgit v1.2.3 From e782187b4af3b2ffe83e67fee7c783b5dfcd09e5 Mon Sep 17 00:00:00 2001 From: root Date: Fri, 26 Oct 2012 07:31:08 +0000 Subject: Don't throw an error in the block manager when a block is cached on the master due to a locally computed operation Conflicts: core/src/main/scala/spark/storage/BlockManagerMaster.scala --- core/src/main/scala/spark/storage/BlockManagerMaster.scala | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/core/src/main/scala/spark/storage/BlockManagerMaster.scala b/core/src/main/scala/spark/storage/BlockManagerMaster.scala index b3345623b3..ace27e758c 100644 --- a/core/src/main/scala/spark/storage/BlockManagerMaster.scala +++ b/core/src/main/scala/spark/storage/BlockManagerMaster.scala @@ -243,6 +243,12 @@ private[spark] class BlockManagerMasterActor(val isLocal: Boolean) extends Actor val startTimeMs = System.currentTimeMillis() val tmp = " " + blockManagerId + " " + blockId + " " + if (!blockManagerInfo.contains(blockManagerId)) { + // Can happen if this is from a locally cached partition on the master + sender ! true + return + } + if (blockId == null) { blockManagerInfo(blockManagerId).updateLastSeenMs() logDebug("Got in heartBeat 1" + tmp + " used " + Utils.getUsedTimeMs(startTimeMs)) -- cgit v1.2.3 From 33bea24f8e5e7b26cc40cef5836b58401845487c Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Fri, 26 Oct 2012 15:01:28 -0700 Subject: Fix Spark groupId in Scala Programming Guide. --- docs/scala-programming-guide.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/scala-programming-guide.md b/docs/scala-programming-guide.md index 73f8b123be..7350eca837 100644 --- a/docs/scala-programming-guide.md +++ b/docs/scala-programming-guide.md @@ -19,7 +19,7 @@ This guide shows each of these features and walks through some samples. It assum To write a Spark application, you will need to add both Spark and its dependencies to your CLASSPATH. If you use sbt or Maven, Spark is available through Maven Central at: - groupId = org.spark_project + groupId = org.spark-project artifactId = spark-core_{{site.SCALA_VERSION}} version = {{site.SPARK_VERSION}} -- cgit v1.2.3 From 96c9bcfd8d3f45fe43b3857a80fa1a42f983970b Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Tue, 30 Oct 2012 23:32:38 -0700 Subject: Cancel spot instance requests when exiting spark-ec2. --- ec2/spark_ec2.py | 52 +++++++++++++++++++++++++++++++--------------------- 1 file changed, 31 insertions(+), 21 deletions(-) diff --git a/ec2/spark_ec2.py b/ec2/spark_ec2.py index 6a3647b218..c0926e214f 100755 --- a/ec2/spark_ec2.py +++ b/ec2/spark_ec2.py @@ -233,27 +233,37 @@ def launch_cluster(conn, opts, cluster_name): block_device_map = block_map) my_req_ids = [req.id for req in slave_reqs] print "Waiting for spot instances to be granted..." 
- while True: - time.sleep(10) - reqs = conn.get_all_spot_instance_requests() - id_to_req = {} - for r in reqs: - id_to_req[r.id] = r - active = 0 - instance_ids = [] - for i in my_req_ids: - if id_to_req[i].state == "active": - active += 1 - instance_ids.append(id_to_req[i].instance_id) - if active == opts.slaves: - print "All %d slaves granted" % opts.slaves - reservations = conn.get_all_instances(instance_ids) - slave_nodes = [] - for r in reservations: - slave_nodes += r.instances - break - else: - print "%d of %d slaves granted, waiting longer" % (active, opts.slaves) + try: + while True: + time.sleep(10) + reqs = conn.get_all_spot_instance_requests() + id_to_req = {} + for r in reqs: + id_to_req[r.id] = r + active_instance_ids = [] + for i in my_req_ids: + if i in id_to_req and id_to_req[i].state == "active": + active_instance_ids.append(id_to_req[i].instance_id) + if len(active_instance_ids) == opts.slaves: + print "All %d slaves granted" % opts.slaves + reservations = conn.get_all_instances(active_instance_ids) + slave_nodes = [] + for r in reservations: + slave_nodes += r.instances + break + else: + print "%d of %d slaves granted, waiting longer" % ( + len(active_instance_ids), opts.slaves) + except: + print "Canceling spot instance requests" + conn.cancel_spot_instance_requests(my_req_ids) + # Log a warning if any of these requests actually launched instances: + (master_nodes, slave_nodes, zoo_nodes) = get_existing_cluster( + conn, opts, cluster_name, die_on_error=False) + running = len(master_nodes) + len(slave_nodes) + len(zoo_nodes) + if running: + print >> stderr, ("WARNING: %d instances are still running" % running) + sys.exit(0) else: # Launch non-spot instances slave_res = image.run(key_name = opts.key_pair, -- cgit v1.2.3 From a7d967a1cad1a1786b77553eb9ae218423f0dfce Mon Sep 17 00:00:00 2001 From: Shivaram Venkataraman Date: Thu, 1 Nov 2012 10:46:38 -0700 Subject: Remove unnecessary hash-map put in MemoryStore --- core/src/main/scala/spark/storage/MemoryStore.scala | 3 --- 1 file changed, 3 deletions(-) diff --git a/core/src/main/scala/spark/storage/MemoryStore.scala b/core/src/main/scala/spark/storage/MemoryStore.scala index 773970446a..074ca2b8a4 100644 --- a/core/src/main/scala/spark/storage/MemoryStore.scala +++ b/core/src/main/scala/spark/storage/MemoryStore.scala @@ -37,9 +37,6 @@ private class MemoryStore(blockManager: BlockManager, maxMemory: Long) val sizeEstimate = SizeEstimator.estimate(elements.asInstanceOf[AnyRef]) tryToPut(blockId, elements, sizeEstimate, true) } else { - val entry = new Entry(bytes, bytes.limit, false) - ensureFreeSpace(blockId, bytes.limit) - synchronized { entries.put(blockId, entry) } tryToPut(blockId, bytes, bytes.limit, false) } } -- cgit v1.2.3 From 594eed31c43ecbefed069d827b388cdd54456277 Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Sat, 3 Nov 2012 17:02:47 -0700 Subject: Fix check for existing instances during EC2 launch. --- ec2/spark_ec2.py | 16 ++++++---------- 1 file changed, 6 insertions(+), 10 deletions(-) diff --git a/ec2/spark_ec2.py b/ec2/spark_ec2.py index c0926e214f..2ca4d8020d 100755 --- a/ec2/spark_ec2.py +++ b/ec2/spark_ec2.py @@ -180,16 +180,12 @@ def launch_cluster(conn, opts, cluster_name): zoo_group.authorize('tcp', 3888, 3888, '0.0.0.0/0') # Check if instances are already running in our groups - print "Checking for running cluster..." 
- reservations = conn.get_all_instances() - for res in reservations: - group_names = [g.id for g in res.groups] - if master_group.name in group_names or slave_group.name in group_names or zoo_group.name in group_names: - active = [i for i in res.instances if is_active(i)] - if len(active) > 0: - print >> stderr, ("ERROR: There are already instances running in " + - "group %s, %s or %s" % (master_group.name, slave_group.name, zoo_group.name)) - sys.exit(1) + active_nodes = get_existing_cluster(conn, opts, cluster_name, + die_on_error=False) + if any(active_nodes): + print >> stderr, ("ERROR: There are already instances running in " + + "group %s, %s or %s" % (master_group.name, slave_group.name, zoo_group.name)) + sys.exit(1) # Figure out the latest AMI from our static URL if opts.ami == "latest": -- cgit v1.2.3 From e2b8477487fd6edfabfbaaea8dca97bffb6d0d40 Mon Sep 17 00:00:00 2001 From: Matei Zaharia Date: Tue, 6 Nov 2012 15:57:38 -0800 Subject: Made Akka timeout and message frame size configurable, and upped the defaults --- core/src/main/scala/spark/util/AkkaUtils.scala | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/core/src/main/scala/spark/util/AkkaUtils.scala b/core/src/main/scala/spark/util/AkkaUtils.scala index b466b5239c..e67cb0336d 100644 --- a/core/src/main/scala/spark/util/AkkaUtils.scala +++ b/core/src/main/scala/spark/util/AkkaUtils.scala @@ -25,6 +25,8 @@ private[spark] object AkkaUtils { def createActorSystem(name: String, host: String, port: Int): (ActorSystem, Int) = { val akkaThreads = System.getProperty("spark.akka.threads", "4").toInt val akkaBatchSize = System.getProperty("spark.akka.batchSize", "15").toInt + val akkaTimeout = System.getProperty("spark.akka.timeout", "20").toInt + val akkaFrameSize = System.getProperty("spark.akka.frameSize", "10").toInt val akkaConf = ConfigFactory.parseString(""" akka.daemonic = on akka.event-handlers = ["akka.event.slf4j.Slf4jEventHandler"] @@ -32,10 +34,11 @@ private[spark] object AkkaUtils { akka.remote.transport = "akka.remote.netty.NettyRemoteTransport" akka.remote.netty.hostname = "%s" akka.remote.netty.port = %d - akka.remote.netty.connection-timeout = 1s + akka.remote.netty.connection-timeout = %ds + akka.remote.netty.message-frame-size = %d MiB akka.remote.netty.execution-pool-size = %d akka.actor.default-dispatcher.throughput = %d - """.format(host, port, akkaThreads, akkaBatchSize)) + """.format(host, port, akkaTimeout, akkaFrameSize, akkaThreads, akkaBatchSize)) val actorSystem = ActorSystem("spark", akkaConf, getClass.getClassLoader) -- cgit v1.2.3 From bb1bce79240da22c2677d9f8159683cdf73158c2 Mon Sep 17 00:00:00 2001 From: Matei Zaharia Date: Sat, 20 Oct 2012 23:33:37 -0700 Subject: Various fixes to standalone mode and web UI: - Don't report a job as finishing multiple times - Don't show state of workers as LOADING when they're running - Show start and finish times in web UI - Sort web UI tables by ID and time by default --- .../main/scala/spark/deploy/DeployMessage.scala | 6 ++-- core/src/main/scala/spark/deploy/WebUI.scala | 30 ++++++++++++++++++++ .../main/scala/spark/deploy/master/JobInfo.scala | 23 +++++++++++++-- .../main/scala/spark/deploy/master/Master.scala | 33 ++++++++++++---------- .../scala/spark/deploy/master/MasterWebUI.scala | 2 +- .../main/scala/spark/deploy/worker/Worker.scala | 2 +- .../twirl/spark/deploy/master/index.scala.html | 15 ++++++---- .../twirl/spark/deploy/master/job_row.scala.html | 11 ++++++-- .../twirl/spark/deploy/master/job_table.scala.html | 9 +++--- 
.../spark/deploy/master/worker_row.scala.html | 6 ++-- .../spark/deploy/master/worker_table.scala.html | 4 +-- .../spark/deploy/worker/executor_row.scala.html | 8 +++--- .../twirl/spark/deploy/worker/index.scala.html | 6 ++-- 13 files changed, 110 insertions(+), 45 deletions(-) create mode 100644 core/src/main/scala/spark/deploy/WebUI.scala diff --git a/core/src/main/scala/spark/deploy/DeployMessage.scala b/core/src/main/scala/spark/deploy/DeployMessage.scala index d2b63d6e0d..7a1089c816 100644 --- a/core/src/main/scala/spark/deploy/DeployMessage.scala +++ b/core/src/main/scala/spark/deploy/DeployMessage.scala @@ -67,8 +67,8 @@ private[spark] case object RequestMasterState // Master to MasterWebUI private[spark] -case class MasterState(uri : String, workers: List[WorkerInfo], activeJobs: List[JobInfo], - completedJobs: List[JobInfo]) +case class MasterState(uri: String, workers: Array[WorkerInfo], activeJobs: Array[JobInfo], + completedJobs: Array[JobInfo]) // WorkerWebUI to Worker private[spark] case object RequestWorkerState @@ -78,4 +78,4 @@ private[spark] case object RequestWorkerState private[spark] case class WorkerState(uri: String, workerId: String, executors: List[ExecutorRunner], finishedExecutors: List[ExecutorRunner], masterUrl: String, cores: Int, memory: Int, - coresUsed: Int, memoryUsed: Int, masterWebUiUrl: String) \ No newline at end of file + coresUsed: Int, memoryUsed: Int, masterWebUiUrl: String) diff --git a/core/src/main/scala/spark/deploy/WebUI.scala b/core/src/main/scala/spark/deploy/WebUI.scala new file mode 100644 index 0000000000..ad1a1092b2 --- /dev/null +++ b/core/src/main/scala/spark/deploy/WebUI.scala @@ -0,0 +1,30 @@ +package spark.deploy + +import java.text.SimpleDateFormat +import java.util.Date + +/** + * Utilities used throughout the web UI. 
+ */ +private[spark] object WebUI { + val DATE_FORMAT = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss") + + def formatDate(date: Date): String = DATE_FORMAT.format(date) + + def formatDate(timestamp: Long): String = DATE_FORMAT.format(new Date(timestamp)) + + def formatDuration(milliseconds: Long): String = { + val seconds = milliseconds.toDouble / 1000 + if (seconds < 60) { + return "%.0f s".format(seconds) + } + val minutes = seconds / 60 + if (minutes < 10) { + return "%.1f min".format(minutes) + } else if (minutes < 60) { + return "%.0f min".format(minutes) + } + val hours = minutes / 60 + return "%.1f h".format(hours) + } +} diff --git a/core/src/main/scala/spark/deploy/master/JobInfo.scala b/core/src/main/scala/spark/deploy/master/JobInfo.scala index 8795c09cc1..130b031a2a 100644 --- a/core/src/main/scala/spark/deploy/master/JobInfo.scala +++ b/core/src/main/scala/spark/deploy/master/JobInfo.scala @@ -5,11 +5,17 @@ import java.util.Date import akka.actor.ActorRef import scala.collection.mutable -private[spark] -class JobInfo(val id: String, val desc: JobDescription, val submitDate: Date, val actor: ActorRef) { +private[spark] class JobInfo( + val startTime: Long, + val id: String, + val desc: JobDescription, + val submitDate: Date, + val actor: ActorRef) +{ var state = JobState.WAITING var executors = new mutable.HashMap[Int, ExecutorInfo] var coresGranted = 0 + var endTime = -1L private var nextExecutorId = 0 @@ -41,4 +47,17 @@ class JobInfo(val id: String, val desc: JobDescription, val submitDate: Date, va _retryCount += 1 _retryCount } + + def markFinished(endState: JobState.Value) { + state = endState + endTime = System.currentTimeMillis() + } + + def duration: Long = { + if (endTime != -1) { + endTime - startTime + } else { + System.currentTimeMillis() - startTime + } + } } diff --git a/core/src/main/scala/spark/deploy/master/Master.scala b/core/src/main/scala/spark/deploy/master/Master.scala index 6010f7cff2..5ef7411f4d 100644 --- a/core/src/main/scala/spark/deploy/master/Master.scala +++ b/core/src/main/scala/spark/deploy/master/Master.scala @@ -123,7 +123,7 @@ private[spark] class Master(ip: String, port: Int, webUiPort: Int) extends Actor } case RequestMasterState => { - sender ! MasterState(ip + ":" + port, workers.toList, jobs.toList, completedJobs.toList) + sender ! MasterState(ip + ":" + port, workers.toArray, jobs.toArray, completedJobs.toArray) } } @@ -179,8 +179,9 @@ private[spark] class Master(ip: String, port: Int, webUiPort: Int) extends Actor } def addJob(desc: JobDescription, actor: ActorRef): JobInfo = { - val date = new Date - val job = new JobInfo(newJobId(date), desc, date, actor) + val now = System.currentTimeMillis() + val date = new Date(now) + val job = new JobInfo(now, newJobId(date), desc, date, actor) jobs += job idToJob(job.id) = job actorToJob(sender) = job @@ -189,19 +190,21 @@ private[spark] class Master(ip: String, port: Int, webUiPort: Int) extends Actor } def removeJob(job: JobInfo) { - logInfo("Removing job " + job.id) - jobs -= job - idToJob -= job.id - actorToJob -= job.actor - addressToWorker -= job.actor.path.address - completedJobs += job // Remember it in our history - waitingJobs -= job - for (exec <- job.executors.values) { - exec.worker.removeExecutor(exec) - exec.worker.actor ! 
KillExecutor(exec.job.id, exec.id) + if (jobs.contains(job)) { + logInfo("Removing job " + job.id) + jobs -= job + idToJob -= job.id + actorToJob -= job.actor + addressToWorker -= job.actor.path.address + completedJobs += job // Remember it in our history + waitingJobs -= job + for (exec <- job.executors.values) { + exec.worker.removeExecutor(exec) + exec.worker.actor ! KillExecutor(exec.job.id, exec.id) + } + job.markFinished(JobState.FINISHED) // TODO: Mark it as FAILED if it failed + schedule() } - job.state = JobState.FINISHED - schedule() } /** Generate a new job ID given a job's submission date */ diff --git a/core/src/main/scala/spark/deploy/master/MasterWebUI.scala b/core/src/main/scala/spark/deploy/master/MasterWebUI.scala index 700a41c770..3cdd3721f5 100644 --- a/core/src/main/scala/spark/deploy/master/MasterWebUI.scala +++ b/core/src/main/scala/spark/deploy/master/MasterWebUI.scala @@ -36,7 +36,7 @@ class MasterWebUI(val actorSystem: ActorSystem, master: ActorRef) extends Direct // A bit ugly an inefficient, but we won't have a number of jobs // so large that it will make a significant difference. - (masterState.activeJobs ::: masterState.completedJobs).find(_.id == jobId) match { + (masterState.activeJobs ++ masterState.completedJobs).find(_.id == jobId) match { case Some(job) => spark.deploy.master.html.job_details.render(job) case _ => null } diff --git a/core/src/main/scala/spark/deploy/worker/Worker.scala b/core/src/main/scala/spark/deploy/worker/Worker.scala index 474c9364fd..67d41dda29 100644 --- a/core/src/main/scala/spark/deploy/worker/Worker.scala +++ b/core/src/main/scala/spark/deploy/worker/Worker.scala @@ -123,7 +123,7 @@ private[spark] class Worker( manager.start() coresUsed += cores_ memoryUsed += memory_ - master ! ExecutorStateChanged(jobId, execId, ExecutorState.LOADING, None) + master ! ExecutorStateChanged(jobId, execId, ExecutorState.RUNNING, None) case ExecutorStateChanged(jobId, execId, state, message) => master ! ExecutorStateChanged(jobId, execId, state, message) diff --git a/core/src/main/twirl/spark/deploy/master/index.scala.html b/core/src/main/twirl/spark/deploy/master/index.scala.html index 7562076b00..18c32e5a1f 100644 --- a/core/src/main/twirl/spark/deploy/master/index.scala.html +++ b/core/src/main/twirl/spark/deploy/master/index.scala.html @@ -1,5 +1,6 @@ @(state: spark.deploy.MasterState) @import spark.deploy.master._ +@import spark.Utils @spark.deploy.common.html.layout(title = "Spark Master on " + state.uri) { @@ -8,9 +9,11 @@
  • URL: spark://@(state.uri)
  • -
  • Number of Workers: @state.workers.size
  • -
  • Cores: @state.workers.map(_.cores).sum Total, @state.workers.map(_.coresUsed).sum Used
  • -
  • Memory: @state.workers.map(_.memory).sum Total, @state.workers.map(_.memoryUsed).sum Used
  • +
  • Workers: @state.workers.size
  • +
  • Cores: @{state.workers.map(_.cores).sum} Total, + @{state.workers.map(_.coresUsed).sum} Used
  • +
  • Memory: @{Utils.memoryMegabytesToString(state.workers.map(_.memory).sum)} Total, + @{Utils.memoryMegabytesToString(state.workers.map(_.memoryUsed).sum)} Used
  • Jobs: @state.activeJobs.size Running, @state.completedJobs.size Completed
@@ -21,7 +24,7 @@

Cluster Summary


- @worker_table(state.workers) + @worker_table(state.workers.sortBy(_.id))
@@ -32,7 +35,7 @@

Running Jobs


- @job_table(state.activeJobs) + @job_table(state.activeJobs.sortBy(_.startTime).reverse)
@@ -43,7 +46,7 @@

Completed Jobs


- @job_table(state.completedJobs) + @job_table(state.completedJobs.sortBy(_.endTime).reverse)
diff --git a/core/src/main/twirl/spark/deploy/master/job_row.scala.html b/core/src/main/twirl/spark/deploy/master/job_row.scala.html index 7c4865bb6e..fff7953e7d 100644 --- a/core/src/main/twirl/spark/deploy/master/job_row.scala.html +++ b/core/src/main/twirl/spark/deploy/master/job_row.scala.html @@ -1,5 +1,9 @@ @(job: spark.deploy.master.JobInfo) +@import spark.Utils +@import spark.deploy.WebUI.formatDate +@import spark.deploy.WebUI.formatDuration + @job.id @@ -13,8 +17,9 @@ , @job.coresLeft } - @job.desc.memoryPerSlave - @job.submitDate + @Utils.memoryMegabytesToString(job.desc.memoryPerSlave) + @formatDate(job.submitDate) @job.desc.user @job.state.toString() - \ No newline at end of file + @formatDuration(job.duration) + diff --git a/core/src/main/twirl/spark/deploy/master/job_table.scala.html b/core/src/main/twirl/spark/deploy/master/job_table.scala.html index 52bad6c4b8..d267d6e85e 100644 --- a/core/src/main/twirl/spark/deploy/master/job_table.scala.html +++ b/core/src/main/twirl/spark/deploy/master/job_table.scala.html @@ -1,4 +1,4 @@ -@(jobs: List[spark.deploy.master.JobInfo]) +@(jobs: Array[spark.deploy.master.JobInfo]) @@ -6,10 +6,11 @@ - - + + + @@ -17,4 +18,4 @@ @job_row(j) } -
JobID Description CoresMemory per SlaveSubmit DateMemory per NodeSubmit Time User StateDuration
\ No newline at end of file + diff --git a/core/src/main/twirl/spark/deploy/master/worker_row.scala.html b/core/src/main/twirl/spark/deploy/master/worker_row.scala.html index 017cc4859e..3dcba3a545 100644 --- a/core/src/main/twirl/spark/deploy/master/worker_row.scala.html +++ b/core/src/main/twirl/spark/deploy/master/worker_row.scala.html @@ -1,11 +1,13 @@ @(worker: spark.deploy.master.WorkerInfo) +@import spark.Utils + @worker.id @{worker.host}:@{worker.port} @worker.cores (@worker.coresUsed Used) - @{spark.Utils.memoryMegabytesToString(worker.memory)} - (@{spark.Utils.memoryMegabytesToString(worker.memoryUsed)} Used) + @{Utils.memoryMegabytesToString(worker.memory)} + (@{Utils.memoryMegabytesToString(worker.memoryUsed)} Used) diff --git a/core/src/main/twirl/spark/deploy/master/worker_table.scala.html b/core/src/main/twirl/spark/deploy/master/worker_table.scala.html index 2028842297..fad1af41dc 100644 --- a/core/src/main/twirl/spark/deploy/master/worker_table.scala.html +++ b/core/src/main/twirl/spark/deploy/master/worker_table.scala.html @@ -1,4 +1,4 @@ -@(workers: List[spark.deploy.master.WorkerInfo]) +@(workers: Array[spark.deploy.master.WorkerInfo]) @@ -14,4 +14,4 @@ @worker_row(w) } -
\ No newline at end of file + diff --git a/core/src/main/twirl/spark/deploy/worker/executor_row.scala.html b/core/src/main/twirl/spark/deploy/worker/executor_row.scala.html index c3842dbf85..ea9542461e 100644 --- a/core/src/main/twirl/spark/deploy/worker/executor_row.scala.html +++ b/core/src/main/twirl/spark/deploy/worker/executor_row.scala.html @@ -1,20 +1,20 @@ @(executor: spark.deploy.worker.ExecutorRunner) +@import spark.Utils + @executor.execId @executor.cores - @executor.memory + @Utils.memoryMegabytesToString(executor.memory)
  • ID: @executor.jobId
  • Name: @executor.jobDesc.name
  • User: @executor.jobDesc.user
  • -
  • Cores: @executor.jobDesc.cores
  • -
  • Memory per Slave: @executor.jobDesc.memoryPerSlave
stdout stderr - \ No newline at end of file + diff --git a/core/src/main/twirl/spark/deploy/worker/index.scala.html b/core/src/main/twirl/spark/deploy/worker/index.scala.html index 69746ed02c..b247307dab 100644 --- a/core/src/main/twirl/spark/deploy/worker/index.scala.html +++ b/core/src/main/twirl/spark/deploy/worker/index.scala.html @@ -1,5 +1,7 @@ @(worker: spark.deploy.WorkerState) +@import spark.Utils + @spark.deploy.common.html.layout(title = "Spark Worker on " + worker.uri) { @@ -12,8 +14,8 @@ (WebUI at @worker.masterWebUiUrl)
  • Cores: @worker.cores (@worker.coresUsed Used)
  • -
  • Memory: @{spark.Utils.memoryMegabytesToString(worker.memory)} - (@{spark.Utils.memoryMegabytesToString(worker.memoryUsed)} Used)
  • +
  • Memory: @{Utils.memoryMegabytesToString(worker.memory)} + (@{Utils.memoryMegabytesToString(worker.memoryUsed)} Used)
  • -- cgit v1.2.3 From 809b2bb1fe92c8ce733ce082c5f6e31316e05a61 Mon Sep 17 00:00:00 2001 From: Imran Rashid Date: Wed, 7 Nov 2012 15:35:51 -0800 Subject: fix bug in getting slave id out of mesos --- core/src/main/scala/spark/scheduler/mesos/MesosSchedulerBackend.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/scala/spark/scheduler/mesos/MesosSchedulerBackend.scala b/core/src/main/scala/spark/scheduler/mesos/MesosSchedulerBackend.scala index cdfe1f2563..814443fa52 100644 --- a/core/src/main/scala/spark/scheduler/mesos/MesosSchedulerBackend.scala +++ b/core/src/main/scala/spark/scheduler/mesos/MesosSchedulerBackend.scala @@ -272,7 +272,7 @@ private[spark] class MesosSchedulerBackend( synchronized { slaveIdsWithExecutors -= slaveId.getValue } - scheduler.slaveLost(slaveId.toString) + scheduler.slaveLost(slaveId.getValue) } override def executorLost(d: SchedulerDriver, e: ExecutorID, s: SlaveID, status: Int) { -- cgit v1.2.3 From 66cbdee941ee12eac5eea38709d542938bba575a Mon Sep 17 00:00:00 2001 From: Matei Zaharia Date: Thu, 8 Nov 2012 09:53:40 -0800 Subject: Fix for connections not being reused (from Josh Rosen) --- core/src/main/scala/spark/network/ConnectionManager.scala | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/core/src/main/scala/spark/network/ConnectionManager.scala b/core/src/main/scala/spark/network/ConnectionManager.scala index da39108164..642fa4b525 100644 --- a/core/src/main/scala/spark/network/ConnectionManager.scala +++ b/core/src/main/scala/spark/network/ConnectionManager.scala @@ -304,7 +304,8 @@ private[spark] class ConnectionManager(port: Int) extends Logging { connectionRequests += newConnection newConnection } - val connection = connectionsById.getOrElse(connectionManagerId, startNewConnection()) + val lookupKey = ConnectionManagerId.fromSocketAddress(connectionManagerId.toSocketAddress) + val connection = connectionsById.getOrElse(lookupKey, startNewConnection()) message.senderAddress = id.toSocketAddress() logDebug("Sending [" + message + "] to [" + connectionManagerId + "]") /*connection.send(message)*/ -- cgit v1.2.3 From 6607f546ccadf307b0a862f1b52ab0b12316420d Mon Sep 17 00:00:00 2001 From: Matei Zaharia Date: Thu, 8 Nov 2012 23:13:12 -0800 Subject: Added an option to spread out jobs in the standalone mode. --- .../main/scala/spark/deploy/master/Master.scala | 63 +++++++++++++++++----- .../scala/spark/deploy/master/WorkerInfo.scala | 4 ++ .../twirl/spark/deploy/master/job_row.scala.html | 7 +-- 3 files changed, 56 insertions(+), 18 deletions(-) diff --git a/core/src/main/scala/spark/deploy/master/Master.scala b/core/src/main/scala/spark/deploy/master/Master.scala index 5ef7411f4d..7e5cd6b171 100644 --- a/core/src/main/scala/spark/deploy/master/Master.scala +++ b/core/src/main/scala/spark/deploy/master/Master.scala @@ -31,6 +31,11 @@ private[spark] class Master(ip: String, port: Int, webUiPort: Int) extends Actor val waitingJobs = new ArrayBuffer[JobInfo] val completedJobs = new ArrayBuffer[JobInfo] + // As a temporary workaround before better ways of configuring memory, we allow users to set + // a flag that will perform round-robin scheduling across the nodes (spreading out each job + // among all the nodes) instead of trying to consolidate each job onto a small # of nodes. 
+ val spreadOutJobs = System.getProperty("spark.deploy.spreadOut", "false").toBoolean + override def preStart() { logInfo("Starting Spark master at spark://" + ip + ":" + port) // Listen for remote client disconnection events, since they don't go through Akka's watch() @@ -127,24 +132,58 @@ private[spark] class Master(ip: String, port: Int, webUiPort: Int) extends Actor } } + /** + * Can a job use the given worker? True if the worker has enough memory and we haven't already + * launched an executor for the job on it (right now the standalone backend doesn't like having + * two executors on the same worker). + */ + def canUse(job: JobInfo, worker: WorkerInfo): Boolean = { + worker.memoryFree >= job.desc.memoryPerSlave && !worker.hasExecutor(job) + } + /** * Schedule the currently available resources among waiting jobs. This method will be called * every time a new job joins or resource availability changes. */ def schedule() { - // Right now this is a very simple FIFO scheduler. We keep looking through the jobs - // in order of submission time and launching the first one that fits on each node. - for (worker <- workers if worker.coresFree > 0) { - for (job <- waitingJobs.clone()) { - val jobMemory = job.desc.memoryPerSlave - if (worker.memoryFree >= jobMemory) { - val coresToUse = math.min(worker.coresFree, job.coresLeft) - val exec = job.addExecutor(worker, coresToUse) - launchExecutor(worker, exec) + // Right now this is a very simple FIFO scheduler. We keep trying to fit in the first job + // in the queue, then the second job, etc. + if (spreadOutJobs) { + // Try to spread out each job among all the nodes, until it has all its cores + for (job <- waitingJobs if job.coresLeft > 0) { + val usableWorkers = workers.toArray.filter(canUse(job, _)).sortBy(_.coresFree).reverse + val numUsable = usableWorkers.length + val assigned = new Array[Int](numUsable) // Number of cores to give on each node + var toAssign = math.min(job.coresLeft, usableWorkers.map(_.coresFree).sum) + var pos = 0 + while (toAssign > 0) { + if (usableWorkers(pos).coresFree - assigned(pos) > 0) { + toAssign -= 1 + assigned(pos) += 1 + } + pos = (pos + 1) % numUsable } - if (job.coresLeft == 0) { - waitingJobs -= job - job.state = JobState.RUNNING + // Now that we've decided how many cores to give on each node, let's actually give them + for (pos <- 0 until numUsable) { + if (assigned(pos) > 0) { + val exec = job.addExecutor(usableWorkers(pos), assigned(pos)) + launchExecutor(usableWorkers(pos), exec) + job.state = JobState.RUNNING + } + } + } + } else { + // Pack each job into as few nodes as possible until we've assigned all its cores + for (worker <- workers if worker.coresFree > 0) { + for (job <- waitingJobs if job.coresLeft > 0) { + if (canUse(job, worker)) { + val coresToUse = math.min(worker.coresFree, job.coresLeft) + if (coresToUse > 0) { + val exec = job.addExecutor(worker, coresToUse) + launchExecutor(worker, exec) + job.state = JobState.RUNNING + } + } } } } diff --git a/core/src/main/scala/spark/deploy/master/WorkerInfo.scala b/core/src/main/scala/spark/deploy/master/WorkerInfo.scala index 16b3f9b653..706b1453aa 100644 --- a/core/src/main/scala/spark/deploy/master/WorkerInfo.scala +++ b/core/src/main/scala/spark/deploy/master/WorkerInfo.scala @@ -33,6 +33,10 @@ private[spark] class WorkerInfo( memoryUsed -= exec.memory } } + + def hasExecutor(job: JobInfo): Boolean = { + executors.values.exists(_.job == job) + } def webUiAddress : String = { "http://" + this.host + ":" + this.webUiPort diff --git 
a/core/src/main/twirl/spark/deploy/master/job_row.scala.html b/core/src/main/twirl/spark/deploy/master/job_row.scala.html index fff7953e7d..7c466a6a2c 100644 --- a/core/src/main/twirl/spark/deploy/master/job_row.scala.html +++ b/core/src/main/twirl/spark/deploy/master/job_row.scala.html @@ -10,12 +10,7 @@ @job.desc.name - @job.coresGranted Granted - @if(job.desc.cores == Integer.MAX_VALUE) { - - } else { - , @job.coresLeft - } + @job.coresGranted @Utils.memoryMegabytesToString(job.desc.memoryPerSlave) @formatDate(job.submitDate) -- cgit v1.2.3 From de00bc63dbc8db334f28fcb428e578919a9df7a1 Mon Sep 17 00:00:00 2001 From: Tathagata Das Date: Fri, 9 Nov 2012 14:09:37 -0800 Subject: Fixed deadlock in BlockManager. 1. Changed the lock structure of BlockManager by replacing the 337 coarse-grained locks to use BlockInfo objects as per-block fine-grained locks. 2. Changed the MemoryStore lock structure by making the block putting threads lock on a different object (not the memory store) thus making sure putting threads minimally blocks to the getting treads. 3. Added spark.storage.ThreadingTest to stress test the BlockManager using 5 block producer and 5 block consumer threads. --- .../main/scala/spark/storage/BlockManager.scala | 111 ++++++++++----------- .../src/main/scala/spark/storage/MemoryStore.scala | 79 +++++++++------ .../main/scala/spark/storage/ThreadingTest.scala | 77 ++++++++++++++ 3 files changed, 180 insertions(+), 87 deletions(-) create mode 100644 core/src/main/scala/spark/storage/ThreadingTest.scala diff --git a/core/src/main/scala/spark/storage/BlockManager.scala b/core/src/main/scala/spark/storage/BlockManager.scala index bd9155ef29..bf52b510b4 100644 --- a/core/src/main/scala/spark/storage/BlockManager.scala +++ b/core/src/main/scala/spark/storage/BlockManager.scala @@ -50,16 +50,6 @@ private[spark] case class BlockException(blockId: String, message: String, ex: Exception = null) extends Exception(message) - -private[spark] class BlockLocker(numLockers: Int) { - private val hashLocker = Array.fill(numLockers)(new Object()) - - def getLock(blockId: String): Object = { - return hashLocker(math.abs(blockId.hashCode % numLockers)) - } -} - - private[spark] class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, maxMemory: Long) extends Logging { @@ -87,10 +77,7 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m } } - private val NUM_LOCKS = 337 - private val locker = new BlockLocker(NUM_LOCKS) - - private val blockInfo = new ConcurrentHashMap[String, BlockInfo]() + private val blockInfo = new ConcurrentHashMap[String, BlockInfo](1000) private[storage] val memoryStore: BlockStore = new MemoryStore(this, maxMemory) private[storage] val diskStore: BlockStore = @@ -110,7 +97,9 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m val maxBytesInFlight = System.getProperty("spark.reducer.maxMbInFlight", "48").toLong * 1024 * 1024 + // Whether to compress broadcast variables that are stored val compressBroadcast = System.getProperty("spark.broadcast.compress", "true").toBoolean + // Whether to compress shuffle output that are stored val compressShuffle = System.getProperty("spark.shuffle.compress", "true").toBoolean // Whether to compress RDD partitions that are stored serialized val compressRdds = System.getProperty("spark.rdd.compress", "false").toBoolean @@ -150,28 +139,28 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m * For example, a block with MEMORY_AND_DISK 
set might have fallen out to be only on disk. */ def reportBlockStatus(blockId: String) { - locker.getLock(blockId).synchronized { - val curLevel = blockInfo.get(blockId) match { - case null => - StorageLevel.NONE - case info => + + val (curLevel, inMemSize, onDiskSize) = blockInfo.get(blockId) match { + case null => + (StorageLevel.NONE, 0L, 0L) + case info => + info.synchronized { info.level match { case null => - StorageLevel.NONE + (StorageLevel.NONE, 0L, 0L) case level => val inMem = level.useMemory && memoryStore.contains(blockId) val onDisk = level.useDisk && diskStore.contains(blockId) - new StorageLevel(onDisk, inMem, level.deserialized, level.replication) + ( + new StorageLevel(onDisk, inMem, level.deserialized, level.replication), + if (inMem) memoryStore.getSize(blockId) else 0L, + if (onDisk) diskStore.getSize(blockId) else 0L + ) } - } - master.mustHeartBeat(HeartBeat( - blockManagerId, - blockId, - curLevel, - if (curLevel.useMemory) memoryStore.getSize(blockId) else 0L, - if (curLevel.useDisk) diskStore.getSize(blockId) else 0L)) - logDebug("Told master about block " + blockId) + } } + master.mustHeartBeat(HeartBeat(blockManagerId, blockId, curLevel, inMemSize, onDiskSize)) + logDebug("Told master about block " + blockId) } /** @@ -213,9 +202,9 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m } } - locker.getLock(blockId).synchronized { - val info = blockInfo.get(blockId) - if (info != null) { + val info = blockInfo.get(blockId) + if (info != null) { + info.synchronized { info.waitForReady() // In case the block is still being put() by another thread val level = info.level logDebug("Level for block " + blockId + " is " + level) @@ -273,9 +262,9 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m } } } - } else { - logDebug("Block " + blockId + " not registered locally") } + } else { + logDebug("Block " + blockId + " not registered locally") } return None } @@ -298,9 +287,9 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m } } - locker.getLock(blockId).synchronized { - val info = blockInfo.get(blockId) - if (info != null) { + val info = blockInfo.get(blockId) + if (info != null) { + info.synchronized { info.waitForReady() // In case the block is still being put() by another thread val level = info.level logDebug("Level for block " + blockId + " is " + level) @@ -338,9 +327,9 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m throw new Exception("Block " + blockId + " not found on disk, though it should be") } } - } else { - logDebug("Block " + blockId + " not registered locally") } + } else { + logDebug("Block " + blockId + " not registered locally") } return None } @@ -583,7 +572,7 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m // Size of the block in bytes (to return to caller) var size = 0L - locker.getLock(blockId).synchronized { + myInfo.synchronized { logDebug("Put for block " + blockId + " took " + Utils.getUsedTimeMs(startTimeMs) + " to get into synchronized block") @@ -681,7 +670,7 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m null } - locker.getLock(blockId).synchronized { + myInfo.synchronized { logDebug("PutBytes for block " + blockId + " took " + Utils.getUsedTimeMs(startTimeMs) + " to get into synchronized block") @@ -779,26 +768,30 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m */ def dropFromMemory(blockId: String, 
data: Either[ArrayBuffer[Any], ByteBuffer]) { logInfo("Dropping block " + blockId + " from memory") - locker.getLock(blockId).synchronized { - val info = blockInfo.get(blockId) - val level = info.level - if (level.useDisk && !diskStore.contains(blockId)) { - logInfo("Writing block " + blockId + " to disk") - data match { - case Left(elements) => - diskStore.putValues(blockId, elements, level, false) - case Right(bytes) => - diskStore.putBytes(blockId, bytes, level) + val info = blockInfo.get(blockId) + if (info != null) { + info.synchronized { + val level = info.level + if (level.useDisk && !diskStore.contains(blockId)) { + logInfo("Writing block " + blockId + " to disk") + data match { + case Left(elements) => + diskStore.putValues(blockId, elements, level, false) + case Right(bytes) => + diskStore.putBytes(blockId, bytes, level) + } + } + memoryStore.remove(blockId) + if (info.tellMaster) { + reportBlockStatus(blockId) + } + if (!level.useDisk) { + // The block is completely gone from this node; forget it so we can put() it again later. + blockInfo.remove(blockId) } } - memoryStore.remove(blockId) - if (info.tellMaster) { - reportBlockStatus(blockId) - } - if (!level.useDisk) { - // The block is completely gone from this node; forget it so we can put() it again later. - blockInfo.remove(blockId) - } + } else { + // The block has already been dropped } } diff --git a/core/src/main/scala/spark/storage/MemoryStore.scala b/core/src/main/scala/spark/storage/MemoryStore.scala index 074ca2b8a4..241200c07f 100644 --- a/core/src/main/scala/spark/storage/MemoryStore.scala +++ b/core/src/main/scala/spark/storage/MemoryStore.scala @@ -18,12 +18,16 @@ private class MemoryStore(blockManager: BlockManager, maxMemory: Long) private val entries = new LinkedHashMap[String, Entry](32, 0.75f, true) private var currentMemory = 0L + // Object used to ensure that only one thread is putting blocks and if necessary, dropping + // blocks from the memory store. + private val putLock = new Object() + logInfo("MemoryStore started with capacity %s.".format(Utils.memoryBytesToString(maxMemory))) def freeMemory: Long = maxMemory - currentMemory override def getSize(blockId: String): Long = { - synchronized { + entries.synchronized { entries.get(blockId).size } } @@ -60,7 +64,7 @@ private class MemoryStore(blockManager: BlockManager, maxMemory: Long) } override def getBytes(blockId: String): Option[ByteBuffer] = { - val entry = synchronized { + val entry = entries.synchronized { entries.get(blockId) } if (entry == null) { @@ -73,7 +77,7 @@ private class MemoryStore(blockManager: BlockManager, maxMemory: Long) } override def getValues(blockId: String): Option[Iterator[Any]] = { - val entry = synchronized { + val entry = entries.synchronized { entries.get(blockId) } if (entry == null) { @@ -87,7 +91,7 @@ private class MemoryStore(blockManager: BlockManager, maxMemory: Long) } override def remove(blockId: String) { - synchronized { + entries.synchronized { val entry = entries.get(blockId) if (entry != null) { entries.remove(blockId) @@ -101,7 +105,7 @@ private class MemoryStore(blockManager: BlockManager, maxMemory: Long) } override def clear() { - synchronized { + entries.synchronized { entries.clear() } logInfo("MemoryStore cleared") @@ -122,12 +126,22 @@ private class MemoryStore(blockManager: BlockManager, maxMemory: Long) * Try to put in a set of values, if we can free up enough space. The value should either be * an ArrayBuffer if deserialized is true or a ByteBuffer otherwise. 
Its (possibly estimated) * size must also be passed by the caller. + * + * Locks on the object putLock to ensure that all the put requests and its associated block + * dropping is done by only on thread at a time. Otherwise while one thread is dropping + * blocks to free memory for one block, another thread may use up the freed space for + * another block. */ private def tryToPut(blockId: String, value: Any, size: Long, deserialized: Boolean): Boolean = { - synchronized { + // TODO: Its possible to optimize the locking by locking entries only when selecting blocks + // to be dropped. Once the to-be-dropped blocks have been selected, and lock on entries has been + // released, it must be ensured that those to-be-dropped blocks are not double counted for + // freeing up more space for another block that needs to be put. Only then the actually dropping + // of blocks (and writing to disk if necessary) can proceed in parallel. + putLock.synchronized { if (ensureFreeSpace(blockId, size)) { val entry = new Entry(value, size, deserialized) - entries.put(blockId, entry) + entries.synchronized { entries.put(blockId, entry) } currentMemory += size if (deserialized) { logInfo("Block %s stored as values to memory (estimated size %s, free %s)".format( @@ -157,10 +171,11 @@ private class MemoryStore(blockManager: BlockManager, maxMemory: Long) * block from the same RDD (which leads to a wasteful cyclic replacement pattern for RDDs that * don't fit into memory that we want to avoid). * - * Assumes that a lock on the MemoryStore is held by the caller. (Otherwise, the freed space - * might fill up before the caller puts in their new value.) + * Assumes that a lock is held by the caller to ensure only one thread is dropping blocks. + * Otherwise, the freed space may fill up before the caller puts in their new value. */ private def ensureFreeSpace(blockIdToAdd: String, space: Long): Boolean = { + logInfo("ensureFreeSpace(%d) called with curMem=%d, maxMem=%d".format( space, currentMemory, maxMemory)) @@ -169,36 +184,44 @@ private class MemoryStore(blockManager: BlockManager, maxMemory: Long) return false } - // TODO: This should relinquish the lock on the MemoryStore while flushing out old blocks - // in order to allow parallelism in writing to disk if (maxMemory - currentMemory < space) { val rddToAdd = getRddId(blockIdToAdd) val selectedBlocks = new ArrayBuffer[String]() var selectedMemory = 0L - val iterator = entries.entrySet().iterator() - while (maxMemory - (currentMemory - selectedMemory) < space && iterator.hasNext) { - val pair = iterator.next() - val blockId = pair.getKey - if (rddToAdd != null && rddToAdd == getRddId(blockId)) { - logInfo("Will not store " + blockIdToAdd + " as it would require dropping another " + - "block from the same RDD") - return false + // This is synchronized to ensure that the set of entries is not changed + // (because of getValue or getBytes) while traversing the iterator, as that + // can lead to exceptions. 
+ entries.synchronized { + val iterator = entries.entrySet().iterator() + while (maxMemory - (currentMemory - selectedMemory) < space && iterator.hasNext) { + val pair = iterator.next() + val blockId = pair.getKey + if (rddToAdd != null && rddToAdd == getRddId(blockId)) { + logInfo("Will not store " + blockIdToAdd + " as it would require dropping another " + + "block from the same RDD") + return false + } + selectedBlocks += blockId + selectedMemory += pair.getValue.size } - selectedBlocks += blockId - selectedMemory += pair.getValue.size } if (maxMemory - (currentMemory - selectedMemory) >= space) { logInfo(selectedBlocks.size + " blocks selected for dropping") for (blockId <- selectedBlocks) { - val entry = entries.get(blockId) - val data = if (entry.deserialized) { - Left(entry.value.asInstanceOf[ArrayBuffer[Any]]) - } else { - Right(entry.value.asInstanceOf[ByteBuffer].duplicate()) + val entry = entries.synchronized { entries.get(blockId) } + // This should never be null as only one thread should be dropping + // blocks and removing entries. However the check is still here for + // future safety. + if (entries != null) { + val data = if (entry.deserialized) { + Left(entry.value.asInstanceOf[ArrayBuffer[Any]]) + } else { + Right(entry.value.asInstanceOf[ByteBuffer].duplicate()) + } + blockManager.dropFromMemory(blockId, data) } - blockManager.dropFromMemory(blockId, data) } return true } else { @@ -209,7 +232,7 @@ private class MemoryStore(blockManager: BlockManager, maxMemory: Long) } override def contains(blockId: String): Boolean = { - synchronized { entries.containsKey(blockId) } + entries.synchronized { entries.containsKey(blockId) } } } diff --git a/core/src/main/scala/spark/storage/ThreadingTest.scala b/core/src/main/scala/spark/storage/ThreadingTest.scala new file mode 100644 index 0000000000..13e2f20e64 --- /dev/null +++ b/core/src/main/scala/spark/storage/ThreadingTest.scala @@ -0,0 +1,77 @@ +package spark.storage + +import akka.actor._ + +import spark.KryoSerializer +import java.util.concurrent.ArrayBlockingQueue +import util.Random + +/** + * This class tests the BlockManager and MemoryStore for thread safety and + * deadlocks. It spawns a number of producer and consumer threads. Producer + * threads continuously pushes blocks into the BlockManager and consumer + * threads continuously retrieves the blocks form the BlockManager and tests + * whether the block is correct or not. 
+ */ +private[spark] object ThreadingTest { + + val numProducers = 5 + val numBlocksPerProducer = 10000 + + private[spark] class ProducerThread(manager: BlockManager, id: Int) extends Thread { + val queue = new ArrayBlockingQueue[(String, Seq[Int])](100) + + override def run() { + for (i <- 1 to numBlocksPerProducer) { + val blockId = "b-" + id + "-" + i + val blockSize = Random.nextInt(1000) + val block = (1 to blockSize).map(_ => Random.nextInt()) + val level = if (Random.nextBoolean()) StorageLevel.MEMORY_ONLY_SER else StorageLevel.MEMORY_AND_DISK + val startTime = System.currentTimeMillis() + manager.put(blockId, block.iterator, level, true) + println("Pushed block " + blockId + " in " + (System.currentTimeMillis - startTime) + " ms") + queue.add((blockId, block)) + } + println("Producer thread " + id + " terminated") + } + } + + private[spark] class ConsumerThread(manager: BlockManager, queue: ArrayBlockingQueue[(String, Seq[Int])]) extends Thread { + var numBlockConsumed = 0 + + override def run() { + println("Consumer thread started") + while(numBlockConsumed < numBlocksPerProducer) { + val (blockId, block) = queue.take() + val startTime = System.currentTimeMillis() + manager.get(blockId) match { + case Some(retrievedBlock) => + assert(retrievedBlock.toList.asInstanceOf[List[Int]] == block.toList, "Block " + blockId + " did not match") + println("Got block " + blockId + " in " + (System.currentTimeMillis - startTime) + " ms") + case None => + assert(false, "Block " + blockId + " could not be retrieved") + } + numBlockConsumed += 1 + } + println("Consumer thread terminated") + } + } + + def main(args: Array[String]) { + System.setProperty("spark.kryoserializer.buffer.mb", "1") + val actorSystem = ActorSystem("test") + val serializer = new KryoSerializer + val blockManagerMaster = new BlockManagerMaster(actorSystem, true, true) + val blockManager = new BlockManager(blockManagerMaster, serializer, 1024 * 1024) + val producers = (1 to numProducers).map(i => new ProducerThread(blockManager, i)) + val consumers = producers.map(p => new ConsumerThread(blockManager, p.queue)) + producers.foreach(_.start) + consumers.foreach(_.start) + producers.foreach(_.join) + consumers.foreach(_.join) + blockManager.stop() + blockManagerMaster.stop() + actorSystem.shutdown() + actorSystem.awaitTermination() + } +} -- cgit v1.2.3 From 9915989bfa242a6f82a7b847ad25e434067da5cf Mon Sep 17 00:00:00 2001 From: Tathagata Das Date: Fri, 9 Nov 2012 15:46:15 -0800 Subject: Incorporated Matei's suggestions. Tested with 5 producer(consumer) threads each doing 50k puts (gets), took 15 minutes to run, no errors or deadlocks. --- core/src/main/scala/spark/storage/MemoryStore.scala | 2 +- .../src/main/scala/spark/storage/ThreadingTest.scala | 20 +++++++++++++++++--- 2 files changed, 18 insertions(+), 4 deletions(-) diff --git a/core/src/main/scala/spark/storage/MemoryStore.scala b/core/src/main/scala/spark/storage/MemoryStore.scala index 241200c07f..02098b82fe 100644 --- a/core/src/main/scala/spark/storage/MemoryStore.scala +++ b/core/src/main/scala/spark/storage/MemoryStore.scala @@ -214,7 +214,7 @@ private class MemoryStore(blockManager: BlockManager, maxMemory: Long) // This should never be null as only one thread should be dropping // blocks and removing entries. However the check is still here for // future safety. 
- if (entries != null) { + if (entry != null) { val data = if (entry.deserialized) { Left(entry.value.asInstanceOf[ArrayBuffer[Any]]) } else { diff --git a/core/src/main/scala/spark/storage/ThreadingTest.scala b/core/src/main/scala/spark/storage/ThreadingTest.scala index 13e2f20e64..e4a5b8ffdf 100644 --- a/core/src/main/scala/spark/storage/ThreadingTest.scala +++ b/core/src/main/scala/spark/storage/ThreadingTest.scala @@ -16,7 +16,7 @@ import util.Random private[spark] object ThreadingTest { val numProducers = 5 - val numBlocksPerProducer = 10000 + val numBlocksPerProducer = 20000 private[spark] class ProducerThread(manager: BlockManager, id: Int) extends Thread { val queue = new ArrayBlockingQueue[(String, Seq[Int])](100) @@ -26,7 +26,7 @@ private[spark] object ThreadingTest { val blockId = "b-" + id + "-" + i val blockSize = Random.nextInt(1000) val block = (1 to blockSize).map(_ => Random.nextInt()) - val level = if (Random.nextBoolean()) StorageLevel.MEMORY_ONLY_SER else StorageLevel.MEMORY_AND_DISK + val level = randomLevel() val startTime = System.currentTimeMillis() manager.put(blockId, block.iterator, level, true) println("Pushed block " + blockId + " in " + (System.currentTimeMillis - startTime) + " ms") @@ -34,9 +34,21 @@ private[spark] object ThreadingTest { } println("Producer thread " + id + " terminated") } + + def randomLevel(): StorageLevel = { + math.abs(Random.nextInt()) % 4 match { + case 0 => StorageLevel.MEMORY_ONLY + case 1 => StorageLevel.MEMORY_ONLY_SER + case 2 => StorageLevel.MEMORY_AND_DISK + case 3 => StorageLevel.MEMORY_AND_DISK_SER + } + } } - private[spark] class ConsumerThread(manager: BlockManager, queue: ArrayBlockingQueue[(String, Seq[Int])]) extends Thread { + private[spark] class ConsumerThread( + manager: BlockManager, + queue: ArrayBlockingQueue[(String, Seq[Int])] + ) extends Thread { var numBlockConsumed = 0 override def run() { @@ -73,5 +85,7 @@ private[spark] object ThreadingTest { blockManagerMaster.stop() actorSystem.shutdown() actorSystem.awaitTermination() + println("Everything stopped.") + println("It will take sometime for the JVM to clean all temporary files and shutdown. 
Sit tight.") } } -- cgit v1.2.3 From acf827232458e87773a71a38f88cb7ba9a6ab77e Mon Sep 17 00:00:00 2001 From: root Date: Sun, 11 Nov 2012 07:05:22 +0000 Subject: Fix K-means example a little --- core/src/main/scala/spark/util/Vector.scala | 3 ++- .../main/scala/spark/examples/SparkKMeans.scala | 27 +++++++++------------- 2 files changed, 13 insertions(+), 17 deletions(-) diff --git a/core/src/main/scala/spark/util/Vector.scala b/core/src/main/scala/spark/util/Vector.scala index 4e95ac2ac6..03559751bc 100644 --- a/core/src/main/scala/spark/util/Vector.scala +++ b/core/src/main/scala/spark/util/Vector.scala @@ -49,7 +49,7 @@ class Vector(val elements: Array[Double]) extends Serializable { return ans } - def +=(other: Vector) { + def += (other: Vector): Vector = { if (length != other.length) throw new IllegalArgumentException("Vectors of different length") var ans = 0.0 @@ -58,6 +58,7 @@ class Vector(val elements: Array[Double]) extends Serializable { elements(i) += other(i) i += 1 } + this } def * (scale: Double): Vector = Vector(length, i => this(i) * scale) diff --git a/examples/src/main/scala/spark/examples/SparkKMeans.scala b/examples/src/main/scala/spark/examples/SparkKMeans.scala index adce551322..6375961390 100644 --- a/examples/src/main/scala/spark/examples/SparkKMeans.scala +++ b/examples/src/main/scala/spark/examples/SparkKMeans.scala @@ -15,14 +15,13 @@ object SparkKMeans { return new Vector(line.split(' ').map(_.toDouble)) } - def closestPoint(p: Vector, centers: HashMap[Int, Vector]): Int = { + def closestPoint(p: Vector, centers: Array[Vector]): Int = { var index = 0 var bestIndex = 0 var closest = Double.PositiveInfinity - for (i <- 1 to centers.size) { - val vCurr = centers.get(i).get - val tempDist = p.squaredDist(vCurr) + for (i <- 0 until centers.length) { + val tempDist = p.squaredDist(centers(i)) if (tempDist < closest) { closest = tempDist bestIndex = i @@ -43,32 +42,28 @@ object SparkKMeans { val K = args(2).toInt val convergeDist = args(3).toDouble - var points = data.takeSample(false, K, 42) - var kPoints = new HashMap[Int, Vector] + var kPoints = data.takeSample(false, K, 42).toArray var tempDist = 1.0 - - for (i <- 1 to points.size) { - kPoints.put(i, points(i-1)) - } while(tempDist > convergeDist) { var closest = data.map (p => (closestPoint(p, kPoints), (p, 1))) - var pointStats = closest.reduceByKey {case ((x1, y1), (x2, y2)) => (x1 + x2, y1 + y2)} + var pointStats = closest.reduceByKey{case ((x1, y1), (x2, y2)) => (x1 + x2, y1 + y2)} - var newPoints = pointStats.map {pair => (pair._1, pair._2._1 / pair._2._2)}.collect() + var newPoints = pointStats.map {pair => (pair._1, pair._2._1 / pair._2._2)}.collectAsMap() tempDist = 0.0 - for (pair <- newPoints) { - tempDist += kPoints.get(pair._1).get.squaredDist(pair._2) + for (i <- 0 until K) { + tempDist += kPoints(i).squaredDist(newPoints(i)) } for (newP <- newPoints) { - kPoints.put(newP._1, newP._2) + kPoints(newP._1) = newP._2 } } - println("Final centers: " + kPoints) + println("Final centers:") + kPoints.foreach(println) System.exit(0) } } -- cgit v1.2.3 From 173e0354c0fc95d63112c7ff7121d8ff39f961b7 Mon Sep 17 00:00:00 2001 From: Matei Zaharia Date: Sun, 11 Nov 2012 21:06:57 -0800 Subject: Detect correctly when one has disconnected from a standalone cluster. 
SPARK-617 #resolve --- core/src/main/scala/spark/deploy/client/Client.scala | 14 +++++++++++++- 1 file changed, 13 insertions(+), 1 deletion(-) diff --git a/core/src/main/scala/spark/deploy/client/Client.scala b/core/src/main/scala/spark/deploy/client/Client.scala index e51b0c5c15..c57a1d33e9 100644 --- a/core/src/main/scala/spark/deploy/client/Client.scala +++ b/core/src/main/scala/spark/deploy/client/Client.scala @@ -35,6 +35,7 @@ private[spark] class Client( class ClientActor extends Actor with Logging { var master: ActorRef = null + var masterAddress: Address = null var alreadyDisconnected = false // To avoid calling listener.disconnected() multiple times override def preStart() { @@ -43,6 +44,7 @@ private[spark] class Client( val akkaUrl = "akka://spark@%s:%s/user/Master".format(masterHost, masterPort) try { master = context.actorFor(akkaUrl) + masterAddress = master.path.address master ! RegisterJob(jobDescription) context.system.eventStream.subscribe(self, classOf[RemoteClientLifeCycleEvent]) context.watch(master) // Doesn't work with remote actors, but useful for testing @@ -72,7 +74,17 @@ private[spark] class Client( listener.executorRemoved(fullId, message.getOrElse("")) } - case Terminated(_) | RemoteClientDisconnected(_, _) | RemoteClientShutdown(_, _) => + case Terminated(actor_) if actor_ == master => + logError("Connection to master failed; stopping client") + markDisconnected() + context.stop(self) + + case RemoteClientDisconnected(transport, address) if address == masterAddress => + logError("Connection to master failed; stopping client") + markDisconnected() + context.stop(self) + + case RemoteClientShutdown(transport, address) if address == masterAddress => logError("Connection to master failed; stopping client") markDisconnected() context.stop(self) -- cgit v1.2.3 From 59e648c08190056605002f5b79bc09b12919934a Mon Sep 17 00:00:00 2001 From: Matei Zaharia Date: Wed, 14 Nov 2012 22:37:05 -0800 Subject: Fix Java/Scala home having spaces on Windows --- run2.cmd | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/run2.cmd b/run2.cmd index 097718b526..333d0506b0 100644 --- a/run2.cmd +++ b/run2.cmd @@ -63,5 +63,5 @@ if "%SPARK_LAUNCH_WITH_SCALA%" NEQ 1 goto java_runner set EXTRA_ARGS=%JAVA_OPTS% :run_spark -%RUNNER% -cp "%CLASSPATH%" %EXTRA_ARGS% %* +"%RUNNER%" -cp "%CLASSPATH%" %EXTRA_ARGS% %* :exit -- cgit v1.2.3 From c23a74df0ab1ab105a3ad6b70e93bc0aa614771d Mon Sep 17 00:00:00 2001 From: Matei Zaharia Date: Thu, 15 Nov 2012 00:10:52 -0800 Subject: Use DNS names instead of IP addresses in standalone mode, to allow matching with data locality hints from storage systems. --- core/src/main/scala/spark/deploy/master/MasterArguments.scala | 4 ++-- core/src/main/scala/spark/deploy/worker/WorkerArguments.scala | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/core/src/main/scala/spark/deploy/master/MasterArguments.scala b/core/src/main/scala/spark/deploy/master/MasterArguments.scala index 1b1c3dd0ad..4ceab3fc03 100644 --- a/core/src/main/scala/spark/deploy/master/MasterArguments.scala +++ b/core/src/main/scala/spark/deploy/master/MasterArguments.scala @@ -7,7 +7,7 @@ import spark.Utils * Command-line parser for the master. 
*/ private[spark] class MasterArguments(args: Array[String]) { - var ip = Utils.localIpAddress() + var ip = Utils.localHostName() var port = 7077 var webUiPort = 8080 @@ -59,4 +59,4 @@ private[spark] class MasterArguments(args: Array[String]) { " --webui-port PORT Port for web UI (default: 8080)") System.exit(exitCode) } -} \ No newline at end of file +} diff --git a/core/src/main/scala/spark/deploy/worker/WorkerArguments.scala b/core/src/main/scala/spark/deploy/worker/WorkerArguments.scala index 60dc107a4c..340920025b 100644 --- a/core/src/main/scala/spark/deploy/worker/WorkerArguments.scala +++ b/core/src/main/scala/spark/deploy/worker/WorkerArguments.scala @@ -9,7 +9,7 @@ import java.lang.management.ManagementFactory * Command-line parser for the master. */ private[spark] class WorkerArguments(args: Array[String]) { - var ip = Utils.localIpAddress() + var ip = Utils.localHostName() var port = 0 var webUiPort = 8081 var cores = inferDefaultCores() @@ -110,4 +110,4 @@ private[spark] class WorkerArguments(args: Array[String]) { // Leave out 1 GB for the operating system, but don't return a negative memory size math.max(totalMb - 1024, 512) } -} \ No newline at end of file +} -- cgit v1.2.3 From 1f5a7e0e647c15be54a8cce0e2f5f3f83d4ea541 Mon Sep 17 00:00:00 2001 From: mbautin Date: Thu, 15 Nov 2012 13:44:13 -0800 Subject: SPARK-624: make the default local IP customizable --- core/src/main/scala/spark/Utils.scala | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/core/src/main/scala/spark/Utils.scala b/core/src/main/scala/spark/Utils.scala index 567c4b1475..9805105ea8 100644 --- a/core/src/main/scala/spark/Utils.scala +++ b/core/src/main/scala/spark/Utils.scala @@ -199,7 +199,13 @@ private object Utils extends Logging { /** * Get the local host's IP address in dotted-quad format (e.g. 1.2.3.4). */ - def localIpAddress(): String = InetAddress.getLocalHost.getHostAddress + def localIpAddress(): String = { + val defaultIpOverride = System.getenv("SPARK_DEFAULT_LOCAL_IP") + if (defaultIpOverride != null) + defaultIpOverride + else + InetAddress.getLocalHost.getHostAddress + } private var customHostname: Option[String] = None -- cgit v1.2.3 From 6d22f7ccb80f21f0622a3740d8fb3acd66a5b29e Mon Sep 17 00:00:00 2001 From: Peter Sankauskas Date: Fri, 16 Nov 2012 14:02:43 -0800 Subject: Delete security groups when deleting the cluster. As many operations are done on instances in specific security groups, this seems like a reasonable thing to clean up. --- ec2/spark_ec2.py | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/ec2/spark_ec2.py b/ec2/spark_ec2.py index 2ca4d8020d..17276db6e5 100755 --- a/ec2/spark_ec2.py +++ b/ec2/spark_ec2.py @@ -509,6 +509,20 @@ def main(): print "Terminating zoo..." 
for inst in zoo_nodes: inst.terminate() + # Delete security groups as well + group_names = [cluster_name + "-master", cluster_name + "-slaves", cluster_name + "-zoo"] + groups = conn.get_all_security_groups() + for group in groups: + if group.name in group_names: + print "Deleting security group " + group.name + # Delete individual rules before deleting group to remove dependencies + for rule in group.rules: + for grant in rule.grants: + group.revoke(ip_protocol=rule.ip_protocol, + from_port=rule.from_port, + to_port=rule.to_port, + src_group=grant) + conn.delete_security_group(group.name) elif action == "login": (master_nodes, slave_nodes, zoo_nodes) = get_existing_cluster( -- cgit v1.2.3 From 32442ee1e109d834d2359506f0161df8df8caf03 Mon Sep 17 00:00:00 2001 From: Peter Sankauskas Date: Fri, 16 Nov 2012 17:25:28 -0800 Subject: Giving the Spark EC2 script the ability to launch instances spread across multiple availability zones in order to make the cluster more resilient to failure --- ec2/spark_ec2.py | 80 ++++++++++++++++++++++++++++++++++++++++---------------- 1 file changed, 58 insertions(+), 22 deletions(-) diff --git a/ec2/spark_ec2.py b/ec2/spark_ec2.py index 2ca4d8020d..a3138d6ef7 100755 --- a/ec2/spark_ec2.py +++ b/ec2/spark_ec2.py @@ -61,7 +61,8 @@ def parse_args(): parser.add_option("-r", "--region", default="us-east-1", help="EC2 region zone to launch instances in") parser.add_option("-z", "--zone", default="", - help="Availability zone to launch instances in") + help="Availability zone to launch instances in, or 'all' to spread " + + "slaves across multiple") parser.add_option("-a", "--ami", default="latest", help="Amazon Machine Image ID to use, or 'latest' to use latest " + "available AMI (default: latest)") @@ -217,17 +218,25 @@ def launch_cluster(conn, opts, cluster_name): # Launch spot instances with the requested price print ("Requesting %d slaves as spot instances with price $%.3f" % (opts.slaves, opts.spot_price)) - slave_reqs = conn.request_spot_instances( - price = opts.spot_price, - image_id = opts.ami, - launch_group = "launch-group-%s" % cluster_name, - placement = opts.zone, - count = opts.slaves, - key_name = opts.key_pair, - security_groups = [slave_group], - instance_type = opts.instance_type, - block_device_map = block_map) - my_req_ids = [req.id for req in slave_reqs] + zones = get_zones(conn, opts) + num_zones = len(zones) + i = 0 + my_req_ids = [] + for zone in zones: + num_slaves_this_zone = get_partition(opts.slaves, num_zones, i) + slave_reqs = conn.request_spot_instances( + price = opts.spot_price, + image_id = opts.ami, + launch_group = "launch-group-%s" % cluster_name, + placement = zone, + count = num_slaves_this_zone, + key_name = opts.key_pair, + security_groups = [slave_group], + instance_type = opts.instance_type, + block_device_map = block_map) + my_req_ids += [req.id for req in slave_reqs] + i += 1 + print "Waiting for spot instances to be granted..." 
try: while True: @@ -262,20 +271,30 @@ def launch_cluster(conn, opts, cluster_name): sys.exit(0) else: # Launch non-spot instances - slave_res = image.run(key_name = opts.key_pair, - security_groups = [slave_group], - instance_type = opts.instance_type, - placement = opts.zone, - min_count = opts.slaves, - max_count = opts.slaves, - block_device_map = block_map) - slave_nodes = slave_res.instances - print "Launched slaves, regid = " + slave_res.id + zones = get_zones(conn, opts) + num_zones = len(zones) + i = 0 + slave_nodes = [] + for zone in zones: + num_slaves_this_zone = get_partition(opts.slaves, num_zones, i) + slave_res = image.run(key_name = opts.key_pair, + security_groups = [slave_group], + instance_type = opts.instance_type, + placement = zone, + min_count = num_slaves_this_zone, + max_count = num_slaves_this_zone, + block_device_map = block_map) + slave_nodes += slave_res.instances + print "Launched %d slaves in %s, regid = %s" % (num_slaves_this_zone, + zone, slave_res.id) + i += 1 # Launch masters master_type = opts.master_instance_type if master_type == "": master_type = opts.instance_type + if opts.zone == 'all': + opts.zone = random.choice(conn.get_all_zones()).name master_res = image.run(key_name = opts.key_pair, security_groups = [master_group], instance_type = master_type, @@ -284,7 +303,7 @@ def launch_cluster(conn, opts, cluster_name): max_count = 1, block_device_map = block_map) master_nodes = master_res.instances - print "Launched master, regid = " + master_res.id + print "Launched master in %s, regid = %s" % (zone, master_res.id) zoo_nodes = [] @@ -474,6 +493,23 @@ def ssh(host, opts, command): (opts.identity_file, opts.user, host, command), shell=True) +# Gets a list of zones to launch instances in +def get_zones(conn, opts): + if opts.zone == 'all': + zones = [z.name for z in conn.get_all_zones()] + else: + zones = [opts.zone] + return zones + + +# Gets the number of items in a partition +def get_partition(total, num_partitions, current_partitions): + num_slaves_this_zone = total / num_partitions + if (total % num_partitions) - current_partitions > 0: + num_slaves_this_zone += 1 + return num_slaves_this_zone + + def main(): (opts, action, cluster_name) = parse_args() conn = boto.ec2.connect_to_region(opts.region) -- cgit v1.2.3 From 12c24e786c9f2eec02131a2bc7a5bb463797aa2a Mon Sep 17 00:00:00 2001 From: Charles Reiss Date: Thu, 15 Nov 2012 16:43:17 -0800 Subject: Set default uncaught exception handler to exit. Among other things, should prevent OutOfMemoryErrors in some daemon threads (such as the network manager) from causing a spark executor to enter a state where it cannot make progress but does not report an error. 
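For readers skimming the diff that follows, here is a minimal, self-contained Scala sketch of the mechanism this commit installs: a JVM-wide default handler that turns an uncaught exception in any thread into a process exit. The object and helper names below are invented for illustration and are not part of the patch.

// Illustrative sketch only -- names are assumptions, not the patch's API.
// It mirrors the idea the commit adds to the executor: any thread dying from
// an uncaught Throwable terminates the whole JVM instead of stalling silently.
object UncaughtExitSketch {
  def install() {
    Thread.setDefaultUncaughtExceptionHandler(
      new Thread.UncaughtExceptionHandler {
        override def uncaughtException(thread: Thread, exception: Throwable) {
          try {
            // The real executor logs through its logging trait; stderr keeps
            // this sketch dependency-free.
            System.err.println("Uncaught exception in thread " + thread + ": " + exception)
            System.exit(1)
          } catch {
            // If even the error handling throws, exit with a distinct code.
            case t: Throwable => System.exit(2)
          }
        }
      }
    )
  }

  def main(args: Array[String]) {
    install()
    // Demonstration: a background thread's failure now brings the JVM down
    // instead of leaving a half-alive process behind.
    new Thread(new Runnable {
      def run() { throw new RuntimeException("simulated daemon thread failure") }
    }).start()
    Thread.sleep(1000)
  }
}

The actual patch below registers this kind of handler during the executor's initialization, before the Spark environment is created.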
--- core/src/main/scala/spark/SparkEnv.scala | 1 - core/src/main/scala/spark/executor/Executor.scala | 15 +++++++++++++++ 2 files changed, 15 insertions(+), 1 deletion(-) diff --git a/core/src/main/scala/spark/SparkEnv.scala b/core/src/main/scala/spark/SparkEnv.scala index 4c6ec6cc6e..9f2b0c42c7 100644 --- a/core/src/main/scala/spark/SparkEnv.scala +++ b/core/src/main/scala/spark/SparkEnv.scala @@ -68,7 +68,6 @@ object SparkEnv extends Logging { isMaster: Boolean, isLocal: Boolean ) : SparkEnv = { - val (actorSystem, boundPort) = AkkaUtils.createActorSystem("spark", hostname, port) // Bit of a hack: If this is the master and our port was 0 (meaning bind to any free port), diff --git a/core/src/main/scala/spark/executor/Executor.scala b/core/src/main/scala/spark/executor/Executor.scala index dfdb22024e..cb29a6b8b4 100644 --- a/core/src/main/scala/spark/executor/Executor.scala +++ b/core/src/main/scala/spark/executor/Executor.scala @@ -43,6 +43,21 @@ private[spark] class Executor extends Logging { urlClassLoader = createClassLoader() Thread.currentThread.setContextClassLoader(urlClassLoader) + // Make any thread terminations due to uncaught exceptions kill the entire + // executor process to avoid surprising stalls. + Thread.setDefaultUncaughtExceptionHandler( + new Thread.UncaughtExceptionHandler { + override def uncaughtException(thread: Thread, exception: Throwable) { + try { + logError("Uncaught exception in thread " + thread, exception) + System.exit(1) + } catch { + case t: Throwable => System.exit(2) + } + } + } + ) + // Initialize Spark environment (using system properties read above) env = SparkEnv.createFromSystemProperties(slaveHostname, 0, false, false) SparkEnv.set(env) -- cgit v1.2.3 From 6adc7c965f35ede8fb09452e278b2f17981ff600 Mon Sep 17 00:00:00 2001 From: Matei Zaharia Date: Fri, 16 Nov 2012 20:48:35 -0800 Subject: Doc fix --- docs/running-on-mesos.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/running-on-mesos.md b/docs/running-on-mesos.md index 97564d7426..f4a3eb667c 100644 --- a/docs/running-on-mesos.md +++ b/docs/running-on-mesos.md @@ -15,7 +15,7 @@ Spark can run on private clusters managed by the [Apache Mesos](http://incubator 6. Copy Spark and Mesos to the _same_ paths on all the nodes in the cluster (or, for Mesos, `make install` on every node). 7. Configure Mesos for deployment: * On your master node, edit `/var/mesos/deploy/masters` to list your master and `/var/mesos/deploy/slaves` to list the slaves, where `` is the prefix where you installed Mesos (`/usr/local` by default). - * On all nodes, edit `/var/mesos/deploy/mesos.conf` and add the line `master=HOST:5050`, where HOST is your master node. + * On all nodes, edit `/var/mesos/conf/mesos.conf` and add the line `master=HOST:5050`, where HOST is your master node. * Run `/sbin/mesos-start-cluster.sh` on your master to start Mesos. If all goes well, you should see Mesos's web UI on port 8080 of the master machine. * See Mesos's README file for more information on deploying it. 8. To run a Spark job against the cluster, when you create your `SparkContext`, pass the string `mesos://HOST:5050` as the first parameter, where `HOST` is the machine running your Mesos master. In addition, pass the location of Spark on your nodes as the third parameter, and a list of JAR files containing your JAR's code as the fourth (these will automatically get copied to the workers). 
For example: -- cgit v1.2.3 From 606d252d264b75943983915b20a8d0e7a8a7d20f Mon Sep 17 00:00:00 2001 From: Peter Sankauskas Date: Sat, 17 Nov 2012 23:09:11 -0800 Subject: Adding comment about additional bandwidth charges --- ec2/spark_ec2.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/ec2/spark_ec2.py b/ec2/spark_ec2.py index a3138d6ef7..2f48439549 100755 --- a/ec2/spark_ec2.py +++ b/ec2/spark_ec2.py @@ -62,7 +62,8 @@ def parse_args(): help="EC2 region zone to launch instances in") parser.add_option("-z", "--zone", default="", help="Availability zone to launch instances in, or 'all' to spread " + - "slaves across multiple") + "slaves across multiple (an additional $0.01/Gb for bandwidth" + + "between zones applies)") parser.add_option("-a", "--ami", default="latest", help="Amazon Machine Image ID to use, or 'latest' to use latest " + "available AMI (default: latest)") -- cgit v1.2.3 From 00f4e3ff9c5d7cf36c00ea66c9610d457670d2a0 Mon Sep 17 00:00:00 2001 From: mbautin Date: Mon, 19 Nov 2012 11:52:10 -0800 Subject: Addressing Matei's comment: SPARK_LOCAL_IP environment variable --- core/src/main/scala/spark/Utils.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/scala/spark/Utils.scala b/core/src/main/scala/spark/Utils.scala index 9805105ea8..c8799e6de3 100644 --- a/core/src/main/scala/spark/Utils.scala +++ b/core/src/main/scala/spark/Utils.scala @@ -200,7 +200,7 @@ private object Utils extends Logging { * Get the local host's IP address in dotted-quad format (e.g. 1.2.3.4). */ def localIpAddress(): String = { - val defaultIpOverride = System.getenv("SPARK_DEFAULT_LOCAL_IP") + val defaultIpOverride = System.getenv("SPARK_LOCAL_IP") if (defaultIpOverride != null) defaultIpOverride else -- cgit v1.2.3 From dc2fb3c4b69cd2c5b6a11a08f642d72330b294d4 Mon Sep 17 00:00:00 2001 From: Peter Sankauskas Date: Mon, 19 Nov 2012 14:21:16 -0800 Subject: Allow Boto to use the other config options it supports, and gracefully handling Boto connection exceptions (like AuthFailure) --- ec2/spark_ec2.py | 31 ++++++++++++++++++++----------- 1 file changed, 20 insertions(+), 11 deletions(-) diff --git a/ec2/spark_ec2.py b/ec2/spark_ec2.py index 17276db6e5..05c06d32bf 100755 --- a/ec2/spark_ec2.py +++ b/ec2/spark_ec2.py @@ -19,7 +19,6 @@ from __future__ import with_statement -import boto import logging import os import random @@ -32,7 +31,7 @@ import urllib2 from optparse import OptionParser from sys import stderr from boto.ec2.blockdevicemapping import BlockDeviceMapping, EBSBlockDeviceType - +from boto import ec2 # A static URL from which to figure out the latest Mesos EC2 AMI LATEST_AMI_URL = "https://s3.amazonaws.com/mesos-images/ids/latest-spark-0.6" @@ -97,14 +96,20 @@ def parse_args(): if opts.cluster_type not in ["mesos", "standalone"] and action == "launch": print >> stderr, ("ERROR: Invalid cluster type: " + opts.cluster_type) sys.exit(1) - if os.getenv('AWS_ACCESS_KEY_ID') == None: - print >> stderr, ("ERROR: The environment variable AWS_ACCESS_KEY_ID " + - "must be set") - sys.exit(1) - if os.getenv('AWS_SECRET_ACCESS_KEY') == None: - print >> stderr, ("ERROR: The environment variable AWS_SECRET_ACCESS_KEY " + - "must be set") - sys.exit(1) + + # Boto config check + # http://boto.cloudhackers.com/en/latest/boto_config_tut.html + home_dir = os.getenv('HOME') + if home_dir == None or not os.path.isfile(home_dir + '/.boto'): + if not os.path.isfile('/etc/boto.cfg'): + if os.getenv('AWS_ACCESS_KEY_ID') == None: + print >> stderr, ("ERROR: The 
environment variable AWS_ACCESS_KEY_ID " + + "must be set") + sys.exit(1) + if os.getenv('AWS_SECRET_ACCESS_KEY') == None: + print >> stderr, ("ERROR: The environment variable AWS_SECRET_ACCESS_KEY " + + "must be set") + sys.exit(1) return (opts, action, cluster_name) @@ -476,7 +481,11 @@ def ssh(host, opts, command): def main(): (opts, action, cluster_name) = parse_args() - conn = boto.ec2.connect_to_region(opts.region) + try: + conn = ec2.connect_to_region(opts.region) + except Exception as e: + print >> stderr, (e) + sys.exit(1) # Select an AZ at random if it was not specified. if opts.zone == "": -- cgit v1.2.3 From c97ebf64377e853ab7c616a103869a4417f25954 Mon Sep 17 00:00:00 2001 From: Tathagata Das Date: Mon, 19 Nov 2012 23:22:07 +0000 Subject: Fixed bug in the number of splits in RDD after checkpointing. Modified reduceByKeyAndWindow (naive) computation from window+reduceByKey to reduceByKey+window+reduceByKey. --- conf/streaming-env.sh.template | 2 +- core/src/main/scala/spark/RDD.scala | 3 ++- streaming/src/main/scala/spark/streaming/DStream.scala | 3 ++- streaming/src/main/scala/spark/streaming/PairDStreamFunctions.scala | 6 +++++- streaming/src/main/scala/spark/streaming/Scheduler.scala | 2 +- streaming/src/main/scala/spark/streaming/WindowedDStream.scala | 3 +++ 6 files changed, 14 insertions(+), 5 deletions(-) diff --git a/conf/streaming-env.sh.template b/conf/streaming-env.sh.template index 6b4094c515..1ea9ba5541 100755 --- a/conf/streaming-env.sh.template +++ b/conf/streaming-env.sh.template @@ -11,7 +11,7 @@ SPARK_JAVA_OPTS+=" -XX:+UseConcMarkSweepGC" -# Using of Kryo serialization can improve serialization performance +# Using Kryo serialization can improve serialization performance # and therefore the throughput of the Spark Streaming programs. However, # using Kryo serialization with custom classes may required you to # register the classes with Kryo. 
Refer to the Spark documentation diff --git a/core/src/main/scala/spark/RDD.scala b/core/src/main/scala/spark/RDD.scala index 6af8c377b5..8af6c9bd6a 100644 --- a/core/src/main/scala/spark/RDD.scala +++ b/core/src/main/scala/spark/RDD.scala @@ -222,12 +222,13 @@ abstract class RDD[T: ClassManifest]( rdd.checkpointFile = new Path(context.checkpointDir, "rdd-" + id).toString rdd.saveAsObjectFile(checkpointFile) rdd.synchronized { - rdd.checkpointRDD = context.objectFile[T](checkpointFile) + rdd.checkpointRDD = context.objectFile[T](checkpointFile, rdd.splits.size) rdd.checkpointRDDSplits = rdd.checkpointRDD.splits rdd.changeDependencies(rdd.checkpointRDD) rdd.shouldCheckpoint = false rdd.isCheckpointInProgress = false rdd.isCheckpointed = true + println("Done checkpointing RDD " + rdd.id + ", " + rdd) } } } diff --git a/streaming/src/main/scala/spark/streaming/DStream.scala b/streaming/src/main/scala/spark/streaming/DStream.scala index 13770aa8fd..26d5ce9198 100644 --- a/streaming/src/main/scala/spark/streaming/DStream.scala +++ b/streaming/src/main/scala/spark/streaming/DStream.scala @@ -321,7 +321,8 @@ extends Serializable with Logging { } } } - logInfo("Updated checkpoint data for time " + currentTime) + logInfo("Updated checkpoint data for time " + currentTime + ", " + checkpointData.size + " checkpoints, " + + "[" + checkpointData.mkString(",") + "]") } /** diff --git a/streaming/src/main/scala/spark/streaming/PairDStreamFunctions.scala b/streaming/src/main/scala/spark/streaming/PairDStreamFunctions.scala index e09d27d34f..720e63bba0 100644 --- a/streaming/src/main/scala/spark/streaming/PairDStreamFunctions.scala +++ b/streaming/src/main/scala/spark/streaming/PairDStreamFunctions.scala @@ -4,6 +4,7 @@ import spark.streaming.StreamingContext._ import spark.{Manifests, RDD, Partitioner, HashPartitioner} import spark.SparkContext._ +import spark.storage.StorageLevel import scala.collection.mutable.ArrayBuffer @@ -115,7 +116,10 @@ extends Serializable { slideTime: Time, partitioner: Partitioner ): DStream[(K, V)] = { - self.window(windowTime, slideTime).reduceByKey(ssc.sc.clean(reduceFunc), partitioner) + val cleanedReduceFunc = ssc.sc.clean(reduceFunc) + self.reduceByKey(cleanedReduceFunc, partitioner) + .window(windowTime, slideTime) + .reduceByKey(cleanedReduceFunc, partitioner) } // This method is the efficient sliding window reduce operation, diff --git a/streaming/src/main/scala/spark/streaming/Scheduler.scala b/streaming/src/main/scala/spark/streaming/Scheduler.scala index e2dca91179..014021be61 100644 --- a/streaming/src/main/scala/spark/streaming/Scheduler.scala +++ b/streaming/src/main/scala/spark/streaming/Scheduler.scala @@ -17,7 +17,7 @@ extends Logging { val graph = ssc.graph - val concurrentJobs = System.getProperty("spark.stream.concurrentJobs", "1").toInt + val concurrentJobs = System.getProperty("spark.streaming.concurrentJobs", "1").toInt val jobManager = new JobManager(ssc, concurrentJobs) val checkpointWriter = if (ssc.checkpointInterval != null && ssc.checkpointDir != null) { diff --git a/streaming/src/main/scala/spark/streaming/WindowedDStream.scala b/streaming/src/main/scala/spark/streaming/WindowedDStream.scala index ce89a3f99b..e4d2a634f5 100644 --- a/streaming/src/main/scala/spark/streaming/WindowedDStream.scala +++ b/streaming/src/main/scala/spark/streaming/WindowedDStream.scala @@ -2,6 +2,7 @@ package spark.streaming import spark.RDD import spark.rdd.UnionRDD +import spark.storage.StorageLevel class WindowedDStream[T: ClassManifest]( @@ -18,6 +19,8 @@ class 
WindowedDStream[T: ClassManifest]( throw new Exception("The slide duration of WindowedDStream (" + _slideTime + ") " + "must be multiple of the slide duration of parent DStream (" + parent.slideTime + ")") + parent.persist(StorageLevel.MEMORY_ONLY_SER) + def windowTime: Time = _windowTime override def dependencies = List(parent) -- cgit v1.2.3 From fd11d23bb3a817dabd414bceddebc35ad731f626 Mon Sep 17 00:00:00 2001 From: Tathagata Das Date: Mon, 19 Nov 2012 19:04:39 -0800 Subject: Modified StreamingContext API to make constructor accept the batch size (since it is always needed, Patrick's suggestion). Added description to DStream and StreamingContext. --- .../src/main/scala/spark/streaming/DStream.scala | 26 ++++++++++-- .../main/scala/spark/streaming/DStreamGraph.scala | 4 +- .../scala/spark/streaming/StreamingContext.scala | 49 +++++++++++++++------- .../scala/spark/streaming/examples/CountRaw.scala | 32 -------------- .../spark/streaming/examples/FileStream.scala | 7 ++-- .../examples/FileStreamWithCheckpoint.scala | 5 +-- .../scala/spark/streaming/examples/GrepRaw.scala | 7 ++-- .../spark/streaming/examples/QueueStream.scala | 10 ++--- .../streaming/examples/TopKWordCountRaw.scala | 7 ++-- .../spark/streaming/examples/WordCountHdfs.scala | 5 +-- .../streaming/examples/WordCountNetwork.scala | 6 +-- .../spark/streaming/examples/WordCountRaw.scala | 7 ++-- .../examples/clickstream/PageViewStream.scala | 5 +-- .../spark/streaming/BasicOperationsSuite.scala | 2 +- .../scala/spark/streaming/InputStreamsSuite.scala | 12 ++---- .../test/scala/spark/streaming/TestSuiteBase.scala | 6 +-- 16 files changed, 92 insertions(+), 98 deletions(-) delete mode 100644 streaming/src/main/scala/spark/streaming/examples/CountRaw.scala diff --git a/streaming/src/main/scala/spark/streaming/DStream.scala b/streaming/src/main/scala/spark/streaming/DStream.scala index 26d5ce9198..8efda2074d 100644 --- a/streaming/src/main/scala/spark/streaming/DStream.scala +++ b/streaming/src/main/scala/spark/streaming/DStream.scala @@ -17,6 +17,26 @@ import java.io.{ObjectInputStream, IOException, ObjectOutputStream} import org.apache.hadoop.fs.Path import org.apache.hadoop.conf.Configuration +/** + * A Discretized Stream (DStream), the basic abstraction in Spark Streaming, is a continuous + * sequence of RDDs (of the same type) representing a continuous stream of data (see [[spark.RDD]] + * for more details on RDDs). DStreams can either be created from live data (such as, data from + * HDFS. Kafka or Flume) or it can be generated by transformation existing DStreams using operations + * such as `map`, `window` and `reduceByKeyAndWindow`. While a Spark Streaming program is running, each + * DStream periodically generates a RDD, either from live data or by transforming the RDD generated + * by a parent DStream. + * + * This class contains the basic operations available on all DStreams, such as `map`, `filter` and + * `window`. In addition, [[spark.streaming.PairDStreamFunctions]] contains operations available + * only on DStreams of key-value pairs, such as `groupByKeyAndWindow` and `join`. These operations + * are automatically available on any DStream of the right type (e.g., DStream[(Int, Int)] through + * implicit conversions when `spark.streaming.StreamingContext._` is imported. 
+ * + * DStreams internally is characterized by a few basic properties: + * - A list of other DStreams that the DStream depends on + * - A time interval at which the DStream generates an RDD + * - A function that is used to generate an RDD after each time interval + */ abstract class DStream[T: ClassManifest] (@transient var ssc: StreamingContext) extends Serializable with Logging { @@ -28,7 +48,7 @@ extends Serializable with Logging { * ---------------------------------------------- */ - // Time by which the window slides in this DStream + // Time interval at which the DStream generates an RDD def slideTime: Time // List of parent DStreams on which this DStream depends on @@ -186,12 +206,12 @@ extends Serializable with Logging { dependencies.foreach(_.setGraph(graph)) } - protected[streaming] def setRememberDuration(duration: Time) { + protected[streaming] def remember(duration: Time) { if (duration != null && duration > rememberDuration) { rememberDuration = duration logInfo("Duration for remembering RDDs set to " + rememberDuration + " for " + this) } - dependencies.foreach(_.setRememberDuration(parentRememberDuration)) + dependencies.foreach(_.remember(parentRememberDuration)) } /** This method checks whether the 'time' is valid wrt slideTime for generating RDD */ diff --git a/streaming/src/main/scala/spark/streaming/DStreamGraph.scala b/streaming/src/main/scala/spark/streaming/DStreamGraph.scala index bd8c033eab..d0a9ade61d 100644 --- a/streaming/src/main/scala/spark/streaming/DStreamGraph.scala +++ b/streaming/src/main/scala/spark/streaming/DStreamGraph.scala @@ -22,7 +22,7 @@ final private[streaming] class DStreamGraph extends Serializable with Logging { } zeroTime = time outputStreams.foreach(_.initialize(zeroTime)) - outputStreams.foreach(_.setRememberDuration(rememberDuration)) + outputStreams.foreach(_.remember(rememberDuration)) outputStreams.foreach(_.validate) inputStreams.par.foreach(_.start()) } @@ -50,7 +50,7 @@ final private[streaming] class DStreamGraph extends Serializable with Logging { batchDuration = duration } - private[streaming] def setRememberDuration(duration: Time) { + private[streaming] def remember(duration: Time) { this.synchronized { if (rememberDuration != null) { throw new Exception("Batch duration already set as " + batchDuration + diff --git a/streaming/src/main/scala/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/spark/streaming/StreamingContext.scala index 7a9a71f303..4a41f2f516 100644 --- a/streaming/src/main/scala/spark/streaming/StreamingContext.scala +++ b/streaming/src/main/scala/spark/streaming/StreamingContext.scala @@ -18,19 +18,39 @@ import org.apache.hadoop.mapreduce.lib.input.TextInputFormat import org.apache.hadoop.fs.Path import java.util.UUID -final class StreamingContext ( +/** + * A StreamingContext is the main entry point for Spark Streaming functionality. Besides the basic + * information (such as, cluster URL and job name) to internally create a SparkContext, it provides + * methods used to create DStream from various input sources. + */ +class StreamingContext private ( sc_ : SparkContext, - cp_ : Checkpoint + cp_ : Checkpoint, + batchDur_ : Time ) extends Logging { - def this(sparkContext: SparkContext) = this(sparkContext, null) - - def this(master: String, frameworkName: String, sparkHome: String = null, jars: Seq[String] = Nil) = - this(new SparkContext(master, frameworkName, sparkHome, jars), null) + /** + * Creates a StreamingContext using an existing SparkContext. 
+ * @param sparkContext Existing SparkContext + * @param batchDuration The time interval at which streaming data will be divided into batches + */ + def this(sparkContext: SparkContext, batchDuration: Time) = this(sparkContext, null, batchDuration) - def this(path: String) = this(null, CheckpointReader.read(path)) + /** + * Creates a StreamingContext by providing the details necessary for creating a new SparkContext. + * @param master Cluster URL to connect to (e.g. mesos://host:port, spark://host:port, local[4]). + * @param frameworkName A name for your job, to display on the cluster web UI + * @param batchDuration The time interval at which streaming data will be divided into batches + */ + def this(master: String, frameworkName: String, batchDuration: Time) = + this(new SparkContext(master, frameworkName), null, batchDuration) - def this(cp_ : Checkpoint) = this(null, cp_) + /** + * Recreates the StreamingContext from a checkpoint file. + * @param path Path either to the directory that was specified as the checkpoint directory, or + * to the checkpoint file 'graph' or 'graph.bk'. + */ + def this(path: String) = this(null, CheckpointReader.read(path), null) initLogging() @@ -57,7 +77,10 @@ final class StreamingContext ( cp_.graph.restoreCheckpointData() cp_.graph } else { - new DStreamGraph() + assert(batchDur_ != null, "Batch duration for streaming context cannot be null") + val newGraph = new DStreamGraph() + newGraph.setBatchDuration(batchDur_) + newGraph } } @@ -77,12 +100,8 @@ final class StreamingContext ( private[streaming] var receiverJobThread: Thread = null private[streaming] var scheduler: Scheduler = null - def setBatchDuration(duration: Time) { - graph.setBatchDuration(duration) - } - - def setRememberDuration(duration: Time) { - graph.setRememberDuration(duration) + def remember(duration: Time) { + graph.remember(duration) } def checkpoint(dir: String, interval: Time = null) { diff --git a/streaming/src/main/scala/spark/streaming/examples/CountRaw.scala b/streaming/src/main/scala/spark/streaming/examples/CountRaw.scala deleted file mode 100644 index d2fdabd659..0000000000 --- a/streaming/src/main/scala/spark/streaming/examples/CountRaw.scala +++ /dev/null @@ -1,32 +0,0 @@ -package spark.streaming.examples - -import spark.util.IntParam -import spark.storage.StorageLevel -import spark.streaming._ -import spark.streaming.StreamingContext._ - -object CountRaw { - def main(args: Array[String]) { - if (args.length != 5) { - System.err.println("Usage: CountRaw ") - System.exit(1) - } - - val Array(master, IntParam(numStreams), host, IntParam(port), IntParam(batchMillis)) = args - - // Create the context and set the batch size - val ssc = new StreamingContext(master, "CountRaw") - ssc.setBatchDuration(Milliseconds(batchMillis)) - - // Make sure some tasks have started on each node - ssc.sc.parallelize(1 to 1000, 1000).count() - ssc.sc.parallelize(1 to 1000, 1000).count() - ssc.sc.parallelize(1 to 1000, 1000).count() - - val rawStreams = (1 to numStreams).map(_ => - ssc.rawNetworkStream[String](host, port, StorageLevel.MEMORY_ONLY_2)).toArray - val union = new UnionDStream(rawStreams) - union.map(_.length + 2).reduce(_ + _).foreachRDD(r => println("Byte count: " + r.collect().mkString)) - ssc.start() - } -} diff --git a/streaming/src/main/scala/spark/streaming/examples/FileStream.scala b/streaming/src/main/scala/spark/streaming/examples/FileStream.scala index d68611abd6..81938d30d4 100644 --- a/streaming/src/main/scala/spark/streaming/examples/FileStream.scala +++ 
b/streaming/src/main/scala/spark/streaming/examples/FileStream.scala @@ -14,10 +14,9 @@ object FileStream { System.exit(1) } - // Create the context and set the batch size - val ssc = new StreamingContext(args(0), "FileStream") - ssc.setBatchDuration(Seconds(2)) - + // Create the context + val ssc = new StreamingContext(args(0), "FileStream", Seconds(1)) + // Create the new directory val directory = new Path(args(1)) val fs = directory.getFileSystem(new Configuration()) diff --git a/streaming/src/main/scala/spark/streaming/examples/FileStreamWithCheckpoint.scala b/streaming/src/main/scala/spark/streaming/examples/FileStreamWithCheckpoint.scala index 21a83c0fde..b7bc15a1d5 100644 --- a/streaming/src/main/scala/spark/streaming/examples/FileStreamWithCheckpoint.scala +++ b/streaming/src/main/scala/spark/streaming/examples/FileStreamWithCheckpoint.scala @@ -32,9 +32,8 @@ object FileStreamWithCheckpoint { if (!fs.exists(directory)) fs.mkdirs(directory) // Create new streaming context - val ssc_ = new StreamingContext(args(0), "FileStreamWithCheckpoint") - ssc_.setBatchDuration(Seconds(1)) - ssc_.checkpoint(checkpointDir, Seconds(1)) + val ssc_ = new StreamingContext(args(0), "FileStreamWithCheckpoint", Seconds(1)) + ssc_.checkpoint(checkpointDir) // Setup the streaming computation val inputStream = ssc_.textFileStream(directory.toString) diff --git a/streaming/src/main/scala/spark/streaming/examples/GrepRaw.scala b/streaming/src/main/scala/spark/streaming/examples/GrepRaw.scala index ffbea6e55d..6cb2b4c042 100644 --- a/streaming/src/main/scala/spark/streaming/examples/GrepRaw.scala +++ b/streaming/src/main/scala/spark/streaming/examples/GrepRaw.scala @@ -16,9 +16,10 @@ object GrepRaw { val Array(master, IntParam(numStreams), host, IntParam(port), IntParam(batchMillis)) = args - // Create the context and set the batch size - val ssc = new StreamingContext(master, "GrepRaw") - ssc.setBatchDuration(Milliseconds(batchMillis)) + // Create the context + val ssc = new StreamingContext(master, "GrepRaw", Milliseconds(batchMillis)) + + // Warm up the JVMs on master and slave for JIT compilation to kick in warmUp(ssc.sc) diff --git a/streaming/src/main/scala/spark/streaming/examples/QueueStream.scala b/streaming/src/main/scala/spark/streaming/examples/QueueStream.scala index 2af51bad28..2a265d021d 100644 --- a/streaming/src/main/scala/spark/streaming/examples/QueueStream.scala +++ b/streaming/src/main/scala/spark/streaming/examples/QueueStream.scala @@ -1,9 +1,8 @@ package spark.streaming.examples import spark.RDD -import spark.streaming.StreamingContext +import spark.streaming.{Seconds, StreamingContext} import spark.streaming.StreamingContext._ -import spark.streaming.Seconds import scala.collection.mutable.SynchronizedQueue @@ -15,10 +14,9 @@ object QueueStream { System.exit(1) } - // Create the context and set the batch size - val ssc = new StreamingContext(args(0), "QueueStream") - ssc.setBatchDuration(Seconds(1)) - + // Create the context + val ssc = new StreamingContext(args(0), "QueueStream", Seconds(1)) + // Create the queue through which RDDs can be pushed to // a QueueInputDStream val rddQueue = new SynchronizedQueue[RDD[Int]]() diff --git a/streaming/src/main/scala/spark/streaming/examples/TopKWordCountRaw.scala b/streaming/src/main/scala/spark/streaming/examples/TopKWordCountRaw.scala index 0411bde1a7..fe4c2bf155 100644 --- a/streaming/src/main/scala/spark/streaming/examples/TopKWordCountRaw.scala +++ b/streaming/src/main/scala/spark/streaming/examples/TopKWordCountRaw.scala @@ -20,12 +20,11 
@@ object TopKWordCountRaw { val Array(master, IntParam(numStreams), IntParam(port), checkpointDir) = args val k = 10 - // Create the context, set the batch size and checkpoint directory. + // Create the context, and set the checkpoint directory. // Checkpoint directory is necessary for achieving fault-tolerance, by saving counts // periodically to HDFS - val ssc = new StreamingContext(master, "TopKWordCountRaw") - ssc.setBatchDuration(Seconds(1)) - ssc.checkpoint(checkpointDir + "/" + UUID.randomUUID.toString, Seconds(1)) + val ssc = new StreamingContext(master, "TopKWordCountRaw", Seconds(1)) + ssc.checkpoint(checkpointDir + "/" + UUID.randomUUID.toString, Seconds(1)) // Warm up the JVMs on master and slave for JIT compilation to kick in /*warmUp(ssc.sc)*/ diff --git a/streaming/src/main/scala/spark/streaming/examples/WordCountHdfs.scala b/streaming/src/main/scala/spark/streaming/examples/WordCountHdfs.scala index 591cb141c3..867a8f42c4 100644 --- a/streaming/src/main/scala/spark/streaming/examples/WordCountHdfs.scala +++ b/streaming/src/main/scala/spark/streaming/examples/WordCountHdfs.scala @@ -10,9 +10,8 @@ object WordCountHdfs { System.exit(1) } - // Create the context and set the batch size - val ssc = new StreamingContext(args(0), "WordCountHdfs") - ssc.setBatchDuration(Seconds(2)) + // Create the context + val ssc = new StreamingContext(args(0), "WordCountHdfs", Seconds(2)) // Create the FileInputDStream on the directory and use the // stream to count words in new files created diff --git a/streaming/src/main/scala/spark/streaming/examples/WordCountNetwork.scala b/streaming/src/main/scala/spark/streaming/examples/WordCountNetwork.scala index ba1bd1de7c..eadda60563 100644 --- a/streaming/src/main/scala/spark/streaming/examples/WordCountNetwork.scala +++ b/streaming/src/main/scala/spark/streaming/examples/WordCountNetwork.scala @@ -6,13 +6,13 @@ import spark.streaming.StreamingContext._ object WordCountNetwork { def main(args: Array[String]) { if (args.length < 2) { - System.err.println("Usage: WordCountNetwork ") + System.err.println("Usage: WordCountNetwork \n" + + "In local mode, should be 'local[n]' with n > 1") System.exit(1) } // Create the context and set the batch size - val ssc = new StreamingContext(args(0), "WordCountNetwork") - ssc.setBatchDuration(Seconds(2)) + val ssc = new StreamingContext(args(0), "WordCountNetwork", Seconds(1)) // Create a NetworkInputDStream on target ip:port and count the // words in input stream of \n delimited test (eg. generated by 'nc') diff --git a/streaming/src/main/scala/spark/streaming/examples/WordCountRaw.scala b/streaming/src/main/scala/spark/streaming/examples/WordCountRaw.scala index 571428c0fe..a29c81d437 100644 --- a/streaming/src/main/scala/spark/streaming/examples/WordCountRaw.scala +++ b/streaming/src/main/scala/spark/streaming/examples/WordCountRaw.scala @@ -19,12 +19,11 @@ object WordCountRaw { val Array(master, IntParam(numStreams), IntParam(port), checkpointDir) = args - // Create the context, set the batch size and checkpoint directory. + // Create the context, and set the checkpoint directory. 
// Checkpoint directory is necessary for achieving fault-tolerance, by saving counts // periodically to HDFS - val ssc = new StreamingContext(master, "WordCountRaw") - ssc.setBatchDuration(Seconds(1)) - ssc.checkpoint(checkpointDir + "/" + UUID.randomUUID.toString, Seconds(1)) + val ssc = new StreamingContext(master, "WordCountRaw", Seconds(1)) + ssc.checkpoint(checkpointDir + "/" + UUID.randomUUID.toString, Seconds(1)) // Warm up the JVMs on master and slave for JIT compilation to kick in warmUp(ssc.sc) diff --git a/streaming/src/main/scala/spark/streaming/examples/clickstream/PageViewStream.scala b/streaming/src/main/scala/spark/streaming/examples/clickstream/PageViewStream.scala index 1a51fb66cd..68be6b7893 100644 --- a/streaming/src/main/scala/spark/streaming/examples/clickstream/PageViewStream.scala +++ b/streaming/src/main/scala/spark/streaming/examples/clickstream/PageViewStream.scala @@ -23,9 +23,8 @@ object PageViewStream { val host = args(1) val port = args(2).toInt - // Create the context and set the batch size - val ssc = new StreamingContext("local[2]", "PageViewStream") - ssc.setBatchDuration(Seconds(1)) + // Create the context + val ssc = new StreamingContext("local[2]", "PageViewStream", Seconds(1)) // Create a NetworkInputDStream on target host:port and convert each line to a PageView val pageViews = ssc.networkTextStream(host, port) diff --git a/streaming/src/test/scala/spark/streaming/BasicOperationsSuite.scala b/streaming/src/test/scala/spark/streaming/BasicOperationsSuite.scala index d0aaac0f2e..dc38ef4912 100644 --- a/streaming/src/test/scala/spark/streaming/BasicOperationsSuite.scala +++ b/streaming/src/test/scala/spark/streaming/BasicOperationsSuite.scala @@ -175,7 +175,7 @@ class BasicOperationsSuite extends TestSuiteBase { } val ssc = setupStreams(input, operation _) - ssc.setRememberDuration(rememberDuration) + ssc.remember(rememberDuration) runStreams[(Int, Int)](ssc, input.size, input.size / 2) val windowedStream2 = ssc.graph.getOutputStreams().head.dependencies.head diff --git a/streaming/src/test/scala/spark/streaming/InputStreamsSuite.scala b/streaming/src/test/scala/spark/streaming/InputStreamsSuite.scala index 3e99440226..e98c096725 100644 --- a/streaming/src/test/scala/spark/streaming/InputStreamsSuite.scala +++ b/streaming/src/test/scala/spark/streaming/InputStreamsSuite.scala @@ -40,8 +40,7 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter { testServer.start() // Set up the streaming context and input streams - val ssc = new StreamingContext(master, framework) - ssc.setBatchDuration(batchDuration) + val ssc = new StreamingContext(master, framework, batchDuration) val networkStream = ssc.networkTextStream("localhost", testPort, StorageLevel.MEMORY_AND_DISK) val outputBuffer = new ArrayBuffer[Seq[String]] with SynchronizedBuffer[Seq[String ]] val outputStream = new TestOutputStream(networkStream, outputBuffer) @@ -89,8 +88,7 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter { testServer.start() // Set up the streaming context and input streams - var ssc = new StreamingContext(master, framework) - ssc.setBatchDuration(batchDuration) + var ssc = new StreamingContext(master, framework, batchDuration) ssc.checkpoint(checkpointDir, checkpointInterval) val networkStream = ssc.networkTextStream("localhost", testPort, StorageLevel.MEMORY_AND_DISK) var outputStream = new TestOutputStream(networkStream, new ArrayBuffer[Seq[String]]) @@ -137,8 +135,7 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter { } // Set 
up the streaming context and input streams - val ssc = new StreamingContext(master, framework) - ssc.setBatchDuration(batchDuration) + val ssc = new StreamingContext(master, framework, batchDuration) val filestream = ssc.textFileStream(testDir.toString) val outputBuffer = new ArrayBuffer[Seq[String]] with SynchronizedBuffer[Seq[String]] def output = outputBuffer.flatMap(x => x) @@ -198,8 +195,7 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter { } // Set up the streaming context and input streams - var ssc = new StreamingContext(master, framework) - ssc.setBatchDuration(batchDuration) + var ssc = new StreamingContext(master, framework, batchDuration) ssc.checkpoint(checkpointDir, checkpointInterval) val filestream = ssc.textFileStream(testDir.toString) var outputStream = new TestOutputStream(filestream, new ArrayBuffer[Seq[String]]) diff --git a/streaming/src/test/scala/spark/streaming/TestSuiteBase.scala b/streaming/src/test/scala/spark/streaming/TestSuiteBase.scala index 5fb5cc504c..8cc2f8ccfc 100644 --- a/streaming/src/test/scala/spark/streaming/TestSuiteBase.scala +++ b/streaming/src/test/scala/spark/streaming/TestSuiteBase.scala @@ -76,8 +76,7 @@ trait TestSuiteBase extends FunSuite with Logging { ): StreamingContext = { // Create StreamingContext - val ssc = new StreamingContext(master, framework) - ssc.setBatchDuration(batchDuration) + val ssc = new StreamingContext(master, framework, batchDuration) if (checkpointDir != null) { ssc.checkpoint(checkpointDir, checkpointInterval) } @@ -98,8 +97,7 @@ trait TestSuiteBase extends FunSuite with Logging { ): StreamingContext = { // Create StreamingContext - val ssc = new StreamingContext(master, framework) - ssc.setBatchDuration(batchDuration) + val ssc = new StreamingContext(master, framework, batchDuration) if (checkpointDir != null) { ssc.checkpoint(checkpointDir, checkpointInterval) } -- cgit v1.2.3 From b18d70870a33a4783c6b3b787bef9b0eec30bce0 Mon Sep 17 00:00:00 2001 From: Tathagata Das Date: Tue, 27 Nov 2012 15:08:49 -0800 Subject: Modified bunch HashMaps in Spark to use TimeStampedHashMap and made various modules use CleanupTask to periodically clean up metadata. 
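Before the diff itself, here is a minimal, self-contained Scala sketch of the idea this commit introduces; the class and method names below are assumptions for illustration only, not the patch's exact API. The pattern is a map that stamps each insertion with a time, plus a daemon timer that periodically drops entries older than a configurable delay.

// Illustrative sketch only -- TimestampedMapSketch and PeriodicCleanupSketch
// are invented names; the real classes appear in the diff that follows.
import java.util.{Timer, TimerTask}
import scala.collection.mutable

class TimestampedMapSketch[K, V] {
  // Each value is stored together with its insertion time in milliseconds.
  private val entries = new mutable.HashMap[K, (V, Long)]()

  def put(key: K, value: V) {
    synchronized { entries(key) = (value, System.currentTimeMillis()) }
  }

  def get(key: K): Option[V] = synchronized { entries.get(key).map(_._1) }

  // Drop every entry that was inserted before threshTime.
  def cleanup(threshTime: Long) {
    synchronized { entries.retain { case (_, (_, insertTime)) => insertTime >= threshTime } }
  }
}

object PeriodicCleanupSketch {
  // Invoke cleanupFunc every periodMs with a cutoff of (now - delayMs).
  def schedule(periodMs: Long, delayMs: Long, cleanupFunc: Long => Unit): Timer = {
    val timer = new Timer("cleanup-sketch", true)  // daemon timer thread
    timer.schedule(new TimerTask {
      def run() { cleanupFunc(System.currentTimeMillis() - delayMs) }
    }, periodMs, periodMs)
    timer
  }
}

In the patch, the delay and period come from system properties and each subsystem (CacheTracker, MapOutputTracker, DAGScheduler, ShuffleMapTask) registers its own timestamped map with a CleanupTask.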
--- core/src/main/scala/spark/CacheTracker.scala | 6 +- core/src/main/scala/spark/MapOutputTracker.scala | 27 ++++--- .../main/scala/spark/scheduler/DAGScheduler.scala | 13 +++- .../scala/spark/scheduler/ShuffleMapTask.scala | 6 +- core/src/main/scala/spark/util/CleanupTask.scala | 31 ++++++++ .../main/scala/spark/util/TimeStampedHashMap.scala | 87 ++++++++++++++++++++++ .../scala/spark/streaming/StreamingContext.scala | 13 +++- 7 files changed, 165 insertions(+), 18 deletions(-) create mode 100644 core/src/main/scala/spark/util/CleanupTask.scala create mode 100644 core/src/main/scala/spark/util/TimeStampedHashMap.scala diff --git a/core/src/main/scala/spark/CacheTracker.scala b/core/src/main/scala/spark/CacheTracker.scala index c5db6ce63a..0ee59bee0f 100644 --- a/core/src/main/scala/spark/CacheTracker.scala +++ b/core/src/main/scala/spark/CacheTracker.scala @@ -14,6 +14,7 @@ import scala.collection.mutable.HashSet import spark.storage.BlockManager import spark.storage.StorageLevel +import util.{CleanupTask, TimeStampedHashMap} private[spark] sealed trait CacheTrackerMessage @@ -30,7 +31,7 @@ private[spark] case object StopCacheTracker extends CacheTrackerMessage private[spark] class CacheTrackerActor extends Actor with Logging { // TODO: Should probably store (String, CacheType) tuples - private val locs = new HashMap[Int, Array[List[String]]] + private val locs = new TimeStampedHashMap[Int, Array[List[String]]] /** * A map from the slave's host name to its cache size. @@ -38,6 +39,8 @@ private[spark] class CacheTrackerActor extends Actor with Logging { private val slaveCapacity = new HashMap[String, Long] private val slaveUsage = new HashMap[String, Long] + private val cleanupTask = new CleanupTask("CacheTracker", locs.cleanup) + private def getCacheUsage(host: String): Long = slaveUsage.getOrElse(host, 0L) private def getCacheCapacity(host: String): Long = slaveCapacity.getOrElse(host, 0L) private def getCacheAvailable(host: String): Long = getCacheCapacity(host) - getCacheUsage(host) @@ -86,6 +89,7 @@ private[spark] class CacheTrackerActor extends Actor with Logging { case StopCacheTracker => logInfo("Stopping CacheTrackerActor") sender ! true + cleanupTask.cancel() context.stop(self) } } diff --git a/core/src/main/scala/spark/MapOutputTracker.scala b/core/src/main/scala/spark/MapOutputTracker.scala index 45441aa5e5..d0be1bb913 100644 --- a/core/src/main/scala/spark/MapOutputTracker.scala +++ b/core/src/main/scala/spark/MapOutputTracker.scala @@ -17,6 +17,7 @@ import scala.collection.mutable.HashSet import scheduler.MapStatus import spark.storage.BlockManagerId import java.util.zip.{GZIPInputStream, GZIPOutputStream} +import util.{CleanupTask, TimeStampedHashMap} private[spark] sealed trait MapOutputTrackerMessage private[spark] case class GetMapOutputStatuses(shuffleId: Int, requester: String) @@ -43,7 +44,7 @@ private[spark] class MapOutputTracker(actorSystem: ActorSystem, isMaster: Boolea val timeout = 10.seconds - var mapStatuses = new ConcurrentHashMap[Int, Array[MapStatus]] + var mapStatuses = new TimeStampedHashMap[Int, Array[MapStatus]] // Incremented every time a fetch fails so that client nodes know to clear // their cache of map output locations if this happens. 
@@ -52,7 +53,7 @@ private[spark] class MapOutputTracker(actorSystem: ActorSystem, isMaster: Boolea // Cache a serialized version of the output statuses for each shuffle to send them out faster var cacheGeneration = generation - val cachedSerializedStatuses = new HashMap[Int, Array[Byte]] + val cachedSerializedStatuses = new TimeStampedHashMap[Int, Array[Byte]] var trackerActor: ActorRef = if (isMaster) { val actor = actorSystem.actorOf(Props(new MapOutputTrackerActor(this)), name = actorName) @@ -63,6 +64,8 @@ private[spark] class MapOutputTracker(actorSystem: ActorSystem, isMaster: Boolea actorSystem.actorFor(url) } + val cleanupTask = new CleanupTask("MapOutputTracker", this.cleanup) + // Send a message to the trackerActor and get its result within a default timeout, or // throw a SparkException if this fails. def askTracker(message: Any): Any = { @@ -83,14 +86,14 @@ private[spark] class MapOutputTracker(actorSystem: ActorSystem, isMaster: Boolea } def registerShuffle(shuffleId: Int, numMaps: Int) { - if (mapStatuses.get(shuffleId) != null) { + if (mapStatuses.get(shuffleId) != None) { throw new IllegalArgumentException("Shuffle ID " + shuffleId + " registered twice") } mapStatuses.put(shuffleId, new Array[MapStatus](numMaps)) } def registerMapOutput(shuffleId: Int, mapId: Int, status: MapStatus) { - var array = mapStatuses.get(shuffleId) + var array = mapStatuses(shuffleId) array.synchronized { array(mapId) = status } @@ -107,7 +110,7 @@ private[spark] class MapOutputTracker(actorSystem: ActorSystem, isMaster: Boolea } def unregisterMapOutput(shuffleId: Int, mapId: Int, bmAddress: BlockManagerId) { - var array = mapStatuses.get(shuffleId) + var array = mapStatuses(shuffleId) if (array != null) { array.synchronized { if (array(mapId).address == bmAddress) { @@ -125,7 +128,7 @@ private[spark] class MapOutputTracker(actorSystem: ActorSystem, isMaster: Boolea // Called on possibly remote nodes to get the server URIs and output sizes for a given shuffle def getServerStatuses(shuffleId: Int, reduceId: Int): Array[(BlockManagerId, Long)] = { - val statuses = mapStatuses.get(shuffleId) + val statuses = mapStatuses.get(shuffleId).orNull if (statuses == null) { logInfo("Don't have map outputs for shuffle " + shuffleId + ", fetching them") fetching.synchronized { @@ -138,7 +141,7 @@ private[spark] class MapOutputTracker(actorSystem: ActorSystem, isMaster: Boolea case e: InterruptedException => } } - return mapStatuses.get(shuffleId).map(status => + return mapStatuses(shuffleId).map(status => (status.address, MapOutputTracker.decompressSize(status.compressedSizes(reduceId)))) } else { fetching += shuffleId @@ -164,9 +167,15 @@ private[spark] class MapOutputTracker(actorSystem: ActorSystem, isMaster: Boolea } } + def cleanup(cleanupTime: Long) { + mapStatuses.cleanup(cleanupTime) + cachedSerializedStatuses.cleanup(cleanupTime) + } + def stop() { communicate(StopMapOutputTracker) mapStatuses.clear() + cleanupTask.cancel() trackerActor = null } @@ -192,7 +201,7 @@ private[spark] class MapOutputTracker(actorSystem: ActorSystem, isMaster: Boolea generationLock.synchronized { if (newGen > generation) { logInfo("Updating generation to " + newGen + " and clearing cache") - mapStatuses = new ConcurrentHashMap[Int, Array[MapStatus]] + mapStatuses = new TimeStampedHashMap[Int, Array[MapStatus]] generation = newGen } } @@ -210,7 +219,7 @@ private[spark] class MapOutputTracker(actorSystem: ActorSystem, isMaster: Boolea case Some(bytes) => return bytes case None => - statuses = mapStatuses.get(shuffleId) + 
statuses = mapStatuses(shuffleId) generationGotten = generation } } diff --git a/core/src/main/scala/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/spark/scheduler/DAGScheduler.scala index aaaed59c4a..3af877b817 100644 --- a/core/src/main/scala/spark/scheduler/DAGScheduler.scala +++ b/core/src/main/scala/spark/scheduler/DAGScheduler.scala @@ -14,6 +14,7 @@ import spark.partial.ApproximateEvaluator import spark.partial.PartialResult import spark.storage.BlockManagerMaster import spark.storage.BlockManagerId +import util.{CleanupTask, TimeStampedHashMap} /** * A Scheduler subclass that implements stage-oriented scheduling. It computes a DAG of stages for @@ -61,9 +62,9 @@ class DAGScheduler(taskSched: TaskScheduler) extends TaskSchedulerListener with val nextStageId = new AtomicInteger(0) - val idToStage = new HashMap[Int, Stage] + val idToStage = new TimeStampedHashMap[Int, Stage] - val shuffleToMapStage = new HashMap[Int, Stage] + val shuffleToMapStage = new TimeStampedHashMap[Int, Stage] var cacheLocs = new HashMap[Int, Array[List[String]]] @@ -83,6 +84,8 @@ class DAGScheduler(taskSched: TaskScheduler) extends TaskSchedulerListener with val activeJobs = new HashSet[ActiveJob] val resultStageToJob = new HashMap[Stage, ActiveJob] + val cleanupTask = new CleanupTask("DAGScheduler", this.cleanup) + // Start a thread to run the DAGScheduler event loop new Thread("DAGScheduler") { setDaemon(true) @@ -591,8 +594,14 @@ class DAGScheduler(taskSched: TaskScheduler) extends TaskSchedulerListener with return Nil } + def cleanup(cleanupTime: Long) { + idToStage.cleanup(cleanupTime) + shuffleToMapStage.cleanup(cleanupTime) + } + def stop() { eventQueue.put(StopDAGScheduler) + cleanupTask.cancel() taskSched.stop() } } diff --git a/core/src/main/scala/spark/scheduler/ShuffleMapTask.scala b/core/src/main/scala/spark/scheduler/ShuffleMapTask.scala index 60105c42b6..fbf618c906 100644 --- a/core/src/main/scala/spark/scheduler/ShuffleMapTask.scala +++ b/core/src/main/scala/spark/scheduler/ShuffleMapTask.scala @@ -14,17 +14,19 @@ import com.ning.compress.lzf.LZFOutputStream import spark._ import spark.storage._ +import util.{TimeStampedHashMap, CleanupTask} private[spark] object ShuffleMapTask { // A simple map between the stage id to the serialized byte array of a task. // Served as a cache for task serialization because serialization can be // expensive on the master node if it needs to launch thousands of tasks. 
- val serializedInfoCache = new JHashMap[Int, Array[Byte]] + val serializedInfoCache = new TimeStampedHashMap[Int, Array[Byte]] + val cleanupTask = new CleanupTask("ShuffleMapTask", serializedInfoCache.cleanup) def serializeInfo(stageId: Int, rdd: RDD[_], dep: ShuffleDependency[_,_]): Array[Byte] = { synchronized { - val old = serializedInfoCache.get(stageId) + val old = serializedInfoCache.get(stageId).orNull if (old != null) { return old } else { diff --git a/core/src/main/scala/spark/util/CleanupTask.scala b/core/src/main/scala/spark/util/CleanupTask.scala new file mode 100644 index 0000000000..ccc28803e0 --- /dev/null +++ b/core/src/main/scala/spark/util/CleanupTask.scala @@ -0,0 +1,31 @@ +package spark.util + +import java.util.concurrent.{TimeUnit, ScheduledFuture, Executors} +import java.util.{TimerTask, Timer} +import spark.Logging + +class CleanupTask(name: String, cleanupFunc: (Long) => Unit) extends Logging { + val delayMins = System.getProperty("spark.cleanup.delay", "-100").toInt + val periodMins = System.getProperty("spark.cleanup.period", (delayMins / 10).toString).toInt + val timer = new Timer(name + " cleanup timer", true) + val task = new TimerTask { + def run() { + try { + if (delayMins > 0) { + + cleanupFunc(System.currentTimeMillis() - (delayMins * 60 * 1000)) + logInfo("Ran cleanup task for " + name) + } + } catch { + case e: Exception => logError("Error running cleanup task for " + name, e) + } + } + } + if (periodMins > 0) { + timer.schedule(task, periodMins * 60 * 1000, periodMins * 60 * 1000) + } + + def cancel() { + timer.cancel() + } +} diff --git a/core/src/main/scala/spark/util/TimeStampedHashMap.scala b/core/src/main/scala/spark/util/TimeStampedHashMap.scala new file mode 100644 index 0000000000..7a22b80a20 --- /dev/null +++ b/core/src/main/scala/spark/util/TimeStampedHashMap.scala @@ -0,0 +1,87 @@ +package spark.util + +import scala.collection.JavaConversions._ +import scala.collection.mutable.{HashMap, Map} +import java.util.concurrent.ConcurrentHashMap + +/** + * This is a custom implementation of scala.collection.mutable.Map which stores the insertion + * time stamp along with each key-value pair. Key-value pairs that are older than a particular + * threshold time can them be removed using the cleanup method. This is intended to be a drop-in + * replacement of scala.collection.mutable.HashMap. 
+ */ +class TimeStampedHashMap[A, B] extends Map[A, B]() { + val internalMap = new ConcurrentHashMap[A, (B, Long)]() + + def get(key: A): Option[B] = { + val value = internalMap.get(key) + if (value != null) Some(value._1) else None + } + + def iterator: Iterator[(A, B)] = { + val jIterator = internalMap.entrySet().iterator() + jIterator.map(kv => (kv.getKey, kv.getValue._1)) + } + + override def + [B1 >: B](kv: (A, B1)): Map[A, B1] = { + val newMap = new TimeStampedHashMap[A, B1] + newMap.internalMap.putAll(this.internalMap) + newMap.internalMap.put(kv._1, (kv._2, currentTime)) + newMap + } + + override def - (key: A): Map[A, B] = { + internalMap.remove(key) + this + } + + override def += (kv: (A, B)): this.type = { + internalMap.put(kv._1, (kv._2, currentTime)) + this + } + + override def -= (key: A): this.type = { + internalMap.remove(key) + this + } + + override def update(key: A, value: B) { + this += ((key, value)) + } + + override def apply(key: A): B = { + val value = internalMap.get(key) + if (value == null) throw new NoSuchElementException() + value._1 + } + + override def filter(p: ((A, B)) => Boolean): Map[A, B] = { + internalMap.map(kv => (kv._1, kv._2._1)).filter(p) + } + + override def empty: Map[A, B] = new TimeStampedHashMap[A, B]() + + override def size(): Int = internalMap.size() + + override def foreach[U](f: ((A, B)) => U): Unit = { + val iterator = internalMap.entrySet().iterator() + while(iterator.hasNext) { + val entry = iterator.next() + val kv = (entry.getKey, entry.getValue._1) + f(kv) + } + } + + def cleanup(threshTime: Long) { + val iterator = internalMap.entrySet().iterator() + while(iterator.hasNext) { + val entry = iterator.next() + if (entry.getValue._2 < threshTime) { + iterator.remove() + } + } + } + + private def currentTime: Long = System.currentTimeMillis() + +} diff --git a/streaming/src/main/scala/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/spark/streaming/StreamingContext.scala index 4a41f2f516..58123dc82c 100644 --- a/streaming/src/main/scala/spark/streaming/StreamingContext.scala +++ b/streaming/src/main/scala/spark/streaming/StreamingContext.scala @@ -43,7 +43,7 @@ class StreamingContext private ( * @param batchDuration The time interval at which streaming data will be divided into batches */ def this(master: String, frameworkName: String, batchDuration: Time) = - this(new SparkContext(master, frameworkName), null, batchDuration) + this(StreamingContext.createNewSparkContext(master, frameworkName), null, batchDuration) /** * Recreates the StreamingContext from a checkpoint file. @@ -214,11 +214,8 @@ class StreamingContext private ( "Checkpoint directory has been set, but the graph checkpointing interval has " + "not been set. Please use StreamingContext.checkpoint() to set the interval." ) - - } - /** * This function starts the execution of the streams. 
*/ @@ -265,6 +262,14 @@ class StreamingContext private ( object StreamingContext { + + def createNewSparkContext(master: String, frameworkName: String): SparkContext = { + if (System.getProperty("spark.cleanup.delay", "-1").toInt < 0) { + System.setProperty("spark.cleanup.delay", "60") + } + new SparkContext(master, frameworkName) + } + implicit def toPairDStreamFunctions[K: ClassManifest, V: ClassManifest](stream: DStream[(K,V)]) = { new PairDStreamFunctions[K, V](stream) } -- cgit v1.2.3 From d5e7aad039603a8a02d11f9ebda001422ca4c341 Mon Sep 17 00:00:00 2001 From: Tathagata Das Date: Wed, 28 Nov 2012 08:36:55 +0000 Subject: Bug fixes --- core/src/main/scala/spark/scheduler/DAGScheduler.scala | 11 ++++++++++- core/src/main/scala/spark/util/CleanupTask.scala | 17 +++++++++-------- .../main/scala/spark/streaming/StreamingContext.scala | 2 +- 3 files changed, 20 insertions(+), 10 deletions(-) diff --git a/core/src/main/scala/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/spark/scheduler/DAGScheduler.scala index 3af877b817..affacb43ca 100644 --- a/core/src/main/scala/spark/scheduler/DAGScheduler.scala +++ b/core/src/main/scala/spark/scheduler/DAGScheduler.scala @@ -78,7 +78,7 @@ class DAGScheduler(taskSched: TaskScheduler) extends TaskSchedulerListener with val waiting = new HashSet[Stage] // Stages we need to run whose parents aren't done val running = new HashSet[Stage] // Stages we are running right now val failed = new HashSet[Stage] // Stages that must be resubmitted due to fetch failures - val pendingTasks = new HashMap[Stage, HashSet[Task[_]]] // Missing tasks from each stage + val pendingTasks = new TimeStampedHashMap[Stage, HashSet[Task[_]]] // Missing tasks from each stage var lastFetchFailureTime: Long = 0 // Used to wait a bit to avoid repeated resubmits val activeJobs = new HashSet[ActiveJob] @@ -595,8 +595,17 @@ class DAGScheduler(taskSched: TaskScheduler) extends TaskSchedulerListener with } def cleanup(cleanupTime: Long) { + var sizeBefore = idToStage.size idToStage.cleanup(cleanupTime) + logInfo("idToStage " + sizeBefore + " --> " + idToStage.size) + + sizeBefore = shuffleToMapStage.size shuffleToMapStage.cleanup(cleanupTime) + logInfo("shuffleToMapStage " + sizeBefore + " --> " + shuffleToMapStage.size) + + sizeBefore = pendingTasks.size + pendingTasks.cleanup(cleanupTime) + logInfo("pendingTasks " + sizeBefore + " --> " + pendingTasks.size) } def stop() { diff --git a/core/src/main/scala/spark/util/CleanupTask.scala b/core/src/main/scala/spark/util/CleanupTask.scala index ccc28803e0..a4357c62c6 100644 --- a/core/src/main/scala/spark/util/CleanupTask.scala +++ b/core/src/main/scala/spark/util/CleanupTask.scala @@ -5,24 +5,25 @@ import java.util.{TimerTask, Timer} import spark.Logging class CleanupTask(name: String, cleanupFunc: (Long) => Unit) extends Logging { - val delayMins = System.getProperty("spark.cleanup.delay", "-100").toInt - val periodMins = System.getProperty("spark.cleanup.period", (delayMins / 10).toString).toInt + val delaySeconds = (System.getProperty("spark.cleanup.delay", "-100").toDouble * 60).toInt + val periodSeconds = math.max(10, delaySeconds / 10) val timer = new Timer(name + " cleanup timer", true) val task = new TimerTask { def run() { try { - if (delayMins > 0) { - - cleanupFunc(System.currentTimeMillis() - (delayMins * 60 * 1000)) + if (delaySeconds > 0) { + cleanupFunc(System.currentTimeMillis() - (delaySeconds * 1000)) logInfo("Ran cleanup task for " + name) - } + } } catch { case e: Exception => logError("Error running cleanup task for 
" + name, e) } } } - if (periodMins > 0) { - timer.schedule(task, periodMins * 60 * 1000, periodMins * 60 * 1000) + if (periodSeconds > 0) { + logInfo("Starting cleanup task for " + name + " with delay of " + delaySeconds + " seconds and " + + "period of " + periodSeconds + " secs") + timer.schedule(task, periodSeconds * 1000, periodSeconds * 1000) } def cancel() { diff --git a/streaming/src/main/scala/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/spark/streaming/StreamingContext.scala index 58123dc82c..90dd560752 100644 --- a/streaming/src/main/scala/spark/streaming/StreamingContext.scala +++ b/streaming/src/main/scala/spark/streaming/StreamingContext.scala @@ -264,7 +264,7 @@ class StreamingContext private ( object StreamingContext { def createNewSparkContext(master: String, frameworkName: String): SparkContext = { - if (System.getProperty("spark.cleanup.delay", "-1").toInt < 0) { + if (System.getProperty("spark.cleanup.delay", "-1").toDouble < 0) { System.setProperty("spark.cleanup.delay", "60") } new SparkContext(master, frameworkName) -- cgit v1.2.3 From e463ae492068d2922e1d50c051a87f8010953dff Mon Sep 17 00:00:00 2001 From: Tathagata Das Date: Wed, 28 Nov 2012 14:05:01 -0800 Subject: Modified StorageLevel and BlockManagerId to cache common objects and use cached object while deserializing. --- .../main/scala/spark/storage/BlockManager.scala | 28 +------------ .../main/scala/spark/storage/BlockManagerId.scala | 48 ++++++++++++++++++++++ .../main/scala/spark/storage/StorageLevel.scala | 28 ++++++++++++- .../scala/spark/storage/BlockManagerSuite.scala | 26 ++++++++++++ 4 files changed, 101 insertions(+), 29 deletions(-) create mode 100644 core/src/main/scala/spark/storage/BlockManagerId.scala diff --git a/core/src/main/scala/spark/storage/BlockManager.scala b/core/src/main/scala/spark/storage/BlockManager.scala index 70d6d8369d..e4aa9247a3 100644 --- a/core/src/main/scala/spark/storage/BlockManager.scala +++ b/core/src/main/scala/spark/storage/BlockManager.scala @@ -20,33 +20,7 @@ import com.ning.compress.lzf.{LZFInputStream, LZFOutputStream} import sun.nio.ch.DirectBuffer -private[spark] class BlockManagerId(var ip: String, var port: Int) extends Externalizable { - def this() = this(null, 0) // For deserialization only - - def this(in: ObjectInput) = this(in.readUTF(), in.readInt()) - - override def writeExternal(out: ObjectOutput) { - out.writeUTF(ip) - out.writeInt(port) - } - - override def readExternal(in: ObjectInput) { - ip = in.readUTF() - port = in.readInt() - } - - override def toString = "BlockManagerId(" + ip + ", " + port + ")" - - override def hashCode = ip.hashCode * 41 + port - - override def equals(that: Any) = that match { - case id: BlockManagerId => port == id.port && ip == id.ip - case _ => false - } -} - - -private[spark] +private[spark] case class BlockException(blockId: String, message: String, ex: Exception = null) extends Exception(message) diff --git a/core/src/main/scala/spark/storage/BlockManagerId.scala b/core/src/main/scala/spark/storage/BlockManagerId.scala new file mode 100644 index 0000000000..4933cc6606 --- /dev/null +++ b/core/src/main/scala/spark/storage/BlockManagerId.scala @@ -0,0 +1,48 @@ +package spark.storage + +import java.io.{IOException, ObjectOutput, ObjectInput, Externalizable} +import java.util.concurrent.ConcurrentHashMap + +private[spark] class BlockManagerId(var ip: String, var port: Int) extends Externalizable { + def this() = this(null, 0) // For deserialization only + + def this(in: ObjectInput) = 
this(in.readUTF(), in.readInt()) + + override def writeExternal(out: ObjectOutput) { + out.writeUTF(ip) + out.writeInt(port) + } + + override def readExternal(in: ObjectInput) { + ip = in.readUTF() + port = in.readInt() + } + + @throws(classOf[IOException]) + private def readResolve(): Object = { + BlockManagerId.getCachedBlockManagerId(this) + } + + + override def toString = "BlockManagerId(" + ip + ", " + port + ")" + + override def hashCode = ip.hashCode * 41 + port + + override def equals(that: Any) = that match { + case id: BlockManagerId => port == id.port && ip == id.ip + case _ => false + } +} + +object BlockManagerId { + val blockManagerIdCache = new ConcurrentHashMap[BlockManagerId, BlockManagerId]() + + def getCachedBlockManagerId(id: BlockManagerId): BlockManagerId = { + if (blockManagerIdCache.containsKey(id)) { + blockManagerIdCache.get(id) + } else { + blockManagerIdCache.put(id, id) + id + } + } +} \ No newline at end of file diff --git a/core/src/main/scala/spark/storage/StorageLevel.scala b/core/src/main/scala/spark/storage/StorageLevel.scala index c497f03e0c..eb88eb2759 100644 --- a/core/src/main/scala/spark/storage/StorageLevel.scala +++ b/core/src/main/scala/spark/storage/StorageLevel.scala @@ -1,6 +1,9 @@ package spark.storage -import java.io.{Externalizable, ObjectInput, ObjectOutput} +import java.io.{IOException, Externalizable, ObjectInput, ObjectOutput} +import collection.mutable +import util.Random +import collection.mutable.ArrayBuffer /** * Flags for controlling the storage of an RDD. Each StorageLevel records whether to use memory, @@ -17,7 +20,8 @@ class StorageLevel( extends Externalizable { // TODO: Also add fields for caching priority, dataset ID, and flushing. - + assert(replication < 40, "Replication restricted to be less than 40 for calculating hashcodes") + def this(flags: Int, replication: Int) { this((flags & 4) != 0, (flags & 2) != 0, (flags & 1) != 0, replication) } @@ -27,6 +31,10 @@ class StorageLevel( override def clone(): StorageLevel = new StorageLevel( this.useDisk, this.useMemory, this.deserialized, this.replication) + override def hashCode(): Int = { + toInt * 41 + replication + } + override def equals(other: Any): Boolean = other match { case s: StorageLevel => s.useDisk == useDisk && @@ -66,6 +74,11 @@ class StorageLevel( replication = in.readByte() } + @throws(classOf[IOException]) + private def readResolve(): Object = { + StorageLevel.getCachedStorageLevel(this) + } + override def toString: String = "StorageLevel(%b, %b, %b, %d)".format(useDisk, useMemory, deserialized, replication) } @@ -82,4 +95,15 @@ object StorageLevel { val MEMORY_AND_DISK_2 = new StorageLevel(true, true, true, 2) val MEMORY_AND_DISK_SER = new StorageLevel(true, true, false) val MEMORY_AND_DISK_SER_2 = new StorageLevel(true, true, false, 2) + + val storageLevelCache = new java.util.concurrent.ConcurrentHashMap[StorageLevel, StorageLevel]() + + def getCachedStorageLevel(level: StorageLevel): StorageLevel = { + if (storageLevelCache.containsKey(level)) { + storageLevelCache.get(level) + } else { + storageLevelCache.put(level, level) + level + } + } } diff --git a/core/src/test/scala/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/spark/storage/BlockManagerSuite.scala index 0e78228134..a2d5e39859 100644 --- a/core/src/test/scala/spark/storage/BlockManagerSuite.scala +++ b/core/src/test/scala/spark/storage/BlockManagerSuite.scala @@ -57,6 +57,32 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT } } + test("StorageLevel 
object caching") { + val level1 = new StorageLevel(false, false, false, 3) + val level2 = new StorageLevel(false, false, false, 3) + val bytes1 = spark.Utils.serialize(level1) + val level1_ = spark.Utils.deserialize[StorageLevel](bytes1) + val bytes2 = spark.Utils.serialize(level2) + val level2_ = spark.Utils.deserialize[StorageLevel](bytes2) + assert(level1_ === level1, "Deserialized level1 not same as original level1") + assert(level2_ === level2, "Deserialized level2 not same as original level1") + assert(level1_ === level2_, "Deserialized level1 not same as deserialized level2") + assert(level2_.eq(level1_), "Deserialized level2 not the same object as deserialized level1") + } + + test("BlockManagerId object caching") { + val id1 = new StorageLevel(false, false, false, 3) + val id2 = new StorageLevel(false, false, false, 3) + val bytes1 = spark.Utils.serialize(id1) + val id1_ = spark.Utils.deserialize[StorageLevel](bytes1) + val bytes2 = spark.Utils.serialize(id2) + val id2_ = spark.Utils.deserialize[StorageLevel](bytes2) + assert(id1_ === id1, "Deserialized id1 not same as original id1") + assert(id2_ === id2, "Deserialized id2 not same as original id1") + assert(id1_ === id2_, "Deserialized id1 not same as deserialized id2") + assert(id2_.eq(id1_), "Deserialized id2 not the same object as deserialized level1") + } + test("master + 1 manager interaction") { store = new BlockManager(master, serializer, 2000) val a1 = new Array[Byte](400) -- cgit v1.2.3 From 9e9e9e1d898387a1996e4c57128bafadb5938a9b Mon Sep 17 00:00:00 2001 From: Tathagata Das Date: Wed, 28 Nov 2012 18:48:14 -0800 Subject: Renamed CleanupTask to MetadataCleaner. --- core/src/main/scala/spark/CacheTracker.scala | 6 ++-- core/src/main/scala/spark/MapOutputTracker.scala | 6 ++-- .../main/scala/spark/scheduler/DAGScheduler.scala | 6 ++-- .../scala/spark/scheduler/ShuffleMapTask.scala | 5 ++-- core/src/main/scala/spark/util/CleanupTask.scala | 32 ---------------------- .../main/scala/spark/util/MetadataCleaner.scala | 32 ++++++++++++++++++++++ 6 files changed, 44 insertions(+), 43 deletions(-) delete mode 100644 core/src/main/scala/spark/util/CleanupTask.scala create mode 100644 core/src/main/scala/spark/util/MetadataCleaner.scala diff --git a/core/src/main/scala/spark/CacheTracker.scala b/core/src/main/scala/spark/CacheTracker.scala index 0ee59bee0f..9888f061d9 100644 --- a/core/src/main/scala/spark/CacheTracker.scala +++ b/core/src/main/scala/spark/CacheTracker.scala @@ -14,7 +14,7 @@ import scala.collection.mutable.HashSet import spark.storage.BlockManager import spark.storage.StorageLevel -import util.{CleanupTask, TimeStampedHashMap} +import util.{MetadataCleaner, TimeStampedHashMap} private[spark] sealed trait CacheTrackerMessage @@ -39,7 +39,7 @@ private[spark] class CacheTrackerActor extends Actor with Logging { private val slaveCapacity = new HashMap[String, Long] private val slaveUsage = new HashMap[String, Long] - private val cleanupTask = new CleanupTask("CacheTracker", locs.cleanup) + private val metadataCleaner = new MetadataCleaner("CacheTracker", locs.cleanup) private def getCacheUsage(host: String): Long = slaveUsage.getOrElse(host, 0L) private def getCacheCapacity(host: String): Long = slaveCapacity.getOrElse(host, 0L) @@ -89,7 +89,7 @@ private[spark] class CacheTrackerActor extends Actor with Logging { case StopCacheTracker => logInfo("Stopping CacheTrackerActor") sender ! 
true - cleanupTask.cancel() + metadataCleaner.cancel() context.stop(self) } } diff --git a/core/src/main/scala/spark/MapOutputTracker.scala b/core/src/main/scala/spark/MapOutputTracker.scala index d0be1bb913..20ff5431af 100644 --- a/core/src/main/scala/spark/MapOutputTracker.scala +++ b/core/src/main/scala/spark/MapOutputTracker.scala @@ -17,7 +17,7 @@ import scala.collection.mutable.HashSet import scheduler.MapStatus import spark.storage.BlockManagerId import java.util.zip.{GZIPInputStream, GZIPOutputStream} -import util.{CleanupTask, TimeStampedHashMap} +import util.{MetadataCleaner, TimeStampedHashMap} private[spark] sealed trait MapOutputTrackerMessage private[spark] case class GetMapOutputStatuses(shuffleId: Int, requester: String) @@ -64,7 +64,7 @@ private[spark] class MapOutputTracker(actorSystem: ActorSystem, isMaster: Boolea actorSystem.actorFor(url) } - val cleanupTask = new CleanupTask("MapOutputTracker", this.cleanup) + val metadataCleaner = new MetadataCleaner("MapOutputTracker", this.cleanup) // Send a message to the trackerActor and get its result within a default timeout, or // throw a SparkException if this fails. @@ -175,7 +175,7 @@ private[spark] class MapOutputTracker(actorSystem: ActorSystem, isMaster: Boolea def stop() { communicate(StopMapOutputTracker) mapStatuses.clear() - cleanupTask.cancel() + metadataCleaner.cancel() trackerActor = null } diff --git a/core/src/main/scala/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/spark/scheduler/DAGScheduler.scala index affacb43ca..4b2570fa2b 100644 --- a/core/src/main/scala/spark/scheduler/DAGScheduler.scala +++ b/core/src/main/scala/spark/scheduler/DAGScheduler.scala @@ -14,7 +14,7 @@ import spark.partial.ApproximateEvaluator import spark.partial.PartialResult import spark.storage.BlockManagerMaster import spark.storage.BlockManagerId -import util.{CleanupTask, TimeStampedHashMap} +import util.{MetadataCleaner, TimeStampedHashMap} /** * A Scheduler subclass that implements stage-oriented scheduling. It computes a DAG of stages for @@ -84,7 +84,7 @@ class DAGScheduler(taskSched: TaskScheduler) extends TaskSchedulerListener with val activeJobs = new HashSet[ActiveJob] val resultStageToJob = new HashMap[Stage, ActiveJob] - val cleanupTask = new CleanupTask("DAGScheduler", this.cleanup) + val metadataCleaner = new MetadataCleaner("DAGScheduler", this.cleanup) // Start a thread to run the DAGScheduler event loop new Thread("DAGScheduler") { @@ -610,7 +610,7 @@ class DAGScheduler(taskSched: TaskScheduler) extends TaskSchedulerListener with def stop() { eventQueue.put(StopDAGScheduler) - cleanupTask.cancel() + metadataCleaner.cancel() taskSched.stop() } } diff --git a/core/src/main/scala/spark/scheduler/ShuffleMapTask.scala b/core/src/main/scala/spark/scheduler/ShuffleMapTask.scala index fbf618c906..683f5ebec3 100644 --- a/core/src/main/scala/spark/scheduler/ShuffleMapTask.scala +++ b/core/src/main/scala/spark/scheduler/ShuffleMapTask.scala @@ -14,7 +14,7 @@ import com.ning.compress.lzf.LZFOutputStream import spark._ import spark.storage._ -import util.{TimeStampedHashMap, CleanupTask} +import util.{TimeStampedHashMap, MetadataCleaner} private[spark] object ShuffleMapTask { @@ -22,7 +22,8 @@ private[spark] object ShuffleMapTask { // Served as a cache for task serialization because serialization can be // expensive on the master node if it needs to launch thousands of tasks. 
val serializedInfoCache = new TimeStampedHashMap[Int, Array[Byte]] - val cleanupTask = new CleanupTask("ShuffleMapTask", serializedInfoCache.cleanup) + + val metadataCleaner = new MetadataCleaner("ShuffleMapTask", serializedInfoCache.cleanup) def serializeInfo(stageId: Int, rdd: RDD[_], dep: ShuffleDependency[_,_]): Array[Byte] = { synchronized { diff --git a/core/src/main/scala/spark/util/CleanupTask.scala b/core/src/main/scala/spark/util/CleanupTask.scala deleted file mode 100644 index a4357c62c6..0000000000 --- a/core/src/main/scala/spark/util/CleanupTask.scala +++ /dev/null @@ -1,32 +0,0 @@ -package spark.util - -import java.util.concurrent.{TimeUnit, ScheduledFuture, Executors} -import java.util.{TimerTask, Timer} -import spark.Logging - -class CleanupTask(name: String, cleanupFunc: (Long) => Unit) extends Logging { - val delaySeconds = (System.getProperty("spark.cleanup.delay", "-100").toDouble * 60).toInt - val periodSeconds = math.max(10, delaySeconds / 10) - val timer = new Timer(name + " cleanup timer", true) - val task = new TimerTask { - def run() { - try { - if (delaySeconds > 0) { - cleanupFunc(System.currentTimeMillis() - (delaySeconds * 1000)) - logInfo("Ran cleanup task for " + name) - } - } catch { - case e: Exception => logError("Error running cleanup task for " + name, e) - } - } - } - if (periodSeconds > 0) { - logInfo("Starting cleanup task for " + name + " with delay of " + delaySeconds + " seconds and " - + "period of " + periodSeconds + " secs") - timer.schedule(task, periodSeconds * 1000, periodSeconds * 1000) - } - - def cancel() { - timer.cancel() - } -} diff --git a/core/src/main/scala/spark/util/MetadataCleaner.scala b/core/src/main/scala/spark/util/MetadataCleaner.scala new file mode 100644 index 0000000000..71ac39864e --- /dev/null +++ b/core/src/main/scala/spark/util/MetadataCleaner.scala @@ -0,0 +1,32 @@ +package spark.util + +import java.util.concurrent.{TimeUnit, ScheduledFuture, Executors} +import java.util.{TimerTask, Timer} +import spark.Logging + +class MetadataCleaner(name: String, cleanupFunc: (Long) => Unit) extends Logging { + val delaySeconds = (System.getProperty("spark.cleanup.delay", "-100").toDouble * 60).toInt + val periodSeconds = math.max(10, delaySeconds / 10) + val timer = new Timer(name + " cleanup timer", true) + val task = new TimerTask { + def run() { + try { + if (delaySeconds > 0) { + cleanupFunc(System.currentTimeMillis() - (delaySeconds * 1000)) + logInfo("Ran metadata cleaner for " + name) + } + } catch { + case e: Exception => logError("Error running cleanup task for " + name, e) + } + } + } + if (periodSeconds > 0) { + logInfo("Starting metadata cleaner for " + name + " with delay of " + delaySeconds + " seconds and " + + "period of " + periodSeconds + " secs") + timer.schedule(task, periodSeconds * 1000, periodSeconds * 1000) + } + + def cancel() { + timer.cancel() + } +} -- cgit v1.2.3 From c9789751bfc496d24e8369a0035d57f0ed8dcb58 Mon Sep 17 00:00:00 2001 From: Tathagata Das Date: Wed, 28 Nov 2012 23:18:24 -0800 Subject: Added metadata cleaner to BlockManager to remove old blocks completely. 
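The commit below attaches a metadata cleaner to BlockManager so that old block metadata (and the blocks behind it) get dropped over time. The cleaner itself is just a daemon java.util.Timer that periodically calls a cleanup function with "now minus delay" as the threshold. A minimal, self-contained sketch of that pattern follows; the class and parameter names are illustrative, not the actual spark.util.MetadataCleaner API, and the intervals are shortened for the demo.

    import java.util.{Timer, TimerTask}

    // Illustrative periodic cleaner in the spirit of spark.util.MetadataCleaner:
    // every periodMillis it asks cleanupFunc to drop state older than delayMillis.
    class PeriodicCleaner(name: String, delayMillis: Long, periodMillis: Long,
                          cleanupFunc: Long => Unit) {
      private val timer = new Timer(name + " cleanup timer", true)  // daemon thread
      private val task = new TimerTask {
        def run() {
          try {
            cleanupFunc(System.currentTimeMillis() - delayMillis)
          } catch {
            case e: Exception => println("Cleanup for " + name + " failed: " + e)
          }
        }
      }
      if (delayMillis > 0) {
        timer.schedule(task, periodMillis, periodMillis)
      }

      def cancel() { timer.cancel() }
    }

    object PeriodicCleanerDemo {
      def main(args: Array[String]) {
        val cleaner = new PeriodicCleaner("demo", 5000, 1000,
          threshold => println("would drop entries older than " + threshold))
        Thread.sleep(3500)   // let the timer fire a few times
        cleaner.cancel()
      }
    }

The explicit cancel() mirrors the cleanupTask.cancel() / metadataCleaner.cancel() calls the patches add to the various stop() methods, so the timer stops firing once the owning component shuts down.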
--- .../main/scala/spark/storage/BlockManager.scala | 47 ++++++++++++++++------ .../scala/spark/storage/BlockManagerMaster.scala | 1 + 2 files changed, 36 insertions(+), 12 deletions(-) diff --git a/core/src/main/scala/spark/storage/BlockManager.scala b/core/src/main/scala/spark/storage/BlockManager.scala index e4aa9247a3..1e36578e1a 100644 --- a/core/src/main/scala/spark/storage/BlockManager.scala +++ b/core/src/main/scala/spark/storage/BlockManager.scala @@ -10,12 +10,12 @@ import java.nio.{MappedByteBuffer, ByteBuffer} import java.util.concurrent.{ConcurrentHashMap, LinkedBlockingQueue} import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet, Queue} -import scala.collection.JavaConversions._ import spark.{CacheTracker, Logging, SizeEstimator, SparkException, Utils} import spark.network._ import spark.serializer.Serializer -import spark.util.ByteBufferInputStream +import spark.util.{MetadataCleaner, TimeStampedHashMap, ByteBufferInputStream} + import com.ning.compress.lzf.{LZFInputStream, LZFOutputStream} import sun.nio.ch.DirectBuffer @@ -51,7 +51,7 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m } } - private val blockInfo = new ConcurrentHashMap[String, BlockInfo](1000) + private val blockInfo = new TimeStampedHashMap[String, BlockInfo]() private[storage] val memoryStore: BlockStore = new MemoryStore(this, maxMemory) private[storage] val diskStore: BlockStore = @@ -80,6 +80,7 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m val host = System.getProperty("spark.hostname", Utils.localHostName()) + val metadataCleaner = new MetadataCleaner("BlockManager", this.dropOldBlocks) initialize() /** @@ -102,8 +103,7 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m * Get storage level of local block. If no info exists for the block, then returns null. 
*/ def getLevel(blockId: String): StorageLevel = { - val info = blockInfo.get(blockId) - if (info != null) info.level else null + blockInfo.get(blockId).map(_.level).orNull } /** @@ -113,9 +113,9 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m */ def reportBlockStatus(blockId: String) { val (curLevel, inMemSize, onDiskSize) = blockInfo.get(blockId) match { - case null => + case None => (StorageLevel.NONE, 0L, 0L) - case info => + case Some(info) => info.synchronized { info.level match { case null => @@ -173,7 +173,7 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m } } - val info = blockInfo.get(blockId) + val info = blockInfo.get(blockId).orNull if (info != null) { info.synchronized { info.waitForReady() // In case the block is still being put() by another thread @@ -258,7 +258,7 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m } } - val info = blockInfo.get(blockId) + val info = blockInfo.get(blockId).orNull if (info != null) { info.synchronized { info.waitForReady() // In case the block is still being put() by another thread @@ -517,7 +517,7 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m throw new IllegalArgumentException("Storage level is null or invalid") } - val oldBlock = blockInfo.get(blockId) + val oldBlock = blockInfo.get(blockId).orNull if (oldBlock != null) { logWarning("Block " + blockId + " already exists on this machine; not re-adding it") oldBlock.waitForReady() @@ -618,7 +618,7 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m throw new IllegalArgumentException("Storage level is null or invalid") } - if (blockInfo.containsKey(blockId)) { + if (blockInfo.contains(blockId)) { logWarning("Block " + blockId + " already exists on this machine; not re-adding it") return } @@ -740,7 +740,7 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m */ def dropFromMemory(blockId: String, data: Either[ArrayBuffer[Any], ByteBuffer]) { logInfo("Dropping block " + blockId + " from memory") - val info = blockInfo.get(blockId) + val info = blockInfo.get(blockId).orNull if (info != null) { info.synchronized { val level = info.level @@ -767,6 +767,29 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m } } + def dropOldBlocks(cleanupTime: Long) { + logInfo("Dropping blocks older than " + cleanupTime) + val iterator = blockInfo.internalMap.entrySet().iterator() + while(iterator.hasNext) { + val entry = iterator.next() + val (id, info, time) = (entry.getKey, entry.getValue._1, entry.getValue._2) + if (time < cleanupTime) { + info.synchronized { + val level = info.level + if (level.useMemory) { + memoryStore.remove(id) + } + if (level.useDisk) { + diskStore.remove(id) + } + iterator.remove() + logInfo("Dropped block " + id) + } + reportBlockStatus(id) + } + } + } + def shouldCompress(blockId: String): Boolean = { if (blockId.startsWith("shuffle_")) { compressShuffle diff --git a/core/src/main/scala/spark/storage/BlockManagerMaster.scala b/core/src/main/scala/spark/storage/BlockManagerMaster.scala index 397395a65b..af15663621 100644 --- a/core/src/main/scala/spark/storage/BlockManagerMaster.scala +++ b/core/src/main/scala/spark/storage/BlockManagerMaster.scala @@ -341,6 +341,7 @@ private[spark] class BlockManagerMasterActor(val isLocal: Boolean) extends Actor throw new Exception("Self index for " + blockManagerId + " not found") } + // Note that this logic will 
select the same node multiple times if there aren't enough peers var index = selfIndex while (res.size < size) { index += 1 -- cgit v1.2.3 From 6fcd09f499dca66d255aa7196839156433aae442 Mon Sep 17 00:00:00 2001 From: Tathagata Das Date: Thu, 29 Nov 2012 02:06:33 -0800 Subject: Added TimeStampedHashSet and used that to cleanup the list of registered RDD IDs in CacheTracker. --- core/src/main/scala/spark/CacheTracker.scala | 10 +++- .../main/scala/spark/util/TimeStampedHashMap.scala | 14 +++-- .../main/scala/spark/util/TimeStampedHashSet.scala | 66 ++++++++++++++++++++++ 3 files changed, 81 insertions(+), 9 deletions(-) create mode 100644 core/src/main/scala/spark/util/TimeStampedHashSet.scala diff --git a/core/src/main/scala/spark/CacheTracker.scala b/core/src/main/scala/spark/CacheTracker.scala index 9888f061d9..cb54e12257 100644 --- a/core/src/main/scala/spark/CacheTracker.scala +++ b/core/src/main/scala/spark/CacheTracker.scala @@ -14,7 +14,7 @@ import scala.collection.mutable.HashSet import spark.storage.BlockManager import spark.storage.StorageLevel -import util.{MetadataCleaner, TimeStampedHashMap} +import util.{TimeStampedHashSet, MetadataCleaner, TimeStampedHashMap} private[spark] sealed trait CacheTrackerMessage @@ -39,7 +39,7 @@ private[spark] class CacheTrackerActor extends Actor with Logging { private val slaveCapacity = new HashMap[String, Long] private val slaveUsage = new HashMap[String, Long] - private val metadataCleaner = new MetadataCleaner("CacheTracker", locs.cleanup) + private val metadataCleaner = new MetadataCleaner("CacheTrackerActor", locs.cleanup) private def getCacheUsage(host: String): Long = slaveUsage.getOrElse(host, 0L) private def getCacheCapacity(host: String): Long = slaveCapacity.getOrElse(host, 0L) @@ -113,11 +113,15 @@ private[spark] class CacheTracker(actorSystem: ActorSystem, isMaster: Boolean, b actorSystem.actorFor(url) } - val registeredRddIds = new HashSet[Int] + // TODO: Consider removing this HashSet completely as locs CacheTrackerActor already + // keeps track of registered RDDs + val registeredRddIds = new TimeStampedHashSet[Int] // Remembers which splits are currently being loaded (on worker nodes) val loading = new HashSet[String] + val metadataCleaner = new MetadataCleaner("CacheTracker", registeredRddIds.cleanup) + // Send a message to the trackerActor and get its result within a default timeout, or // throw a SparkException if this fails. 
def askTracker(message: Any): Any = { diff --git a/core/src/main/scala/spark/util/TimeStampedHashMap.scala b/core/src/main/scala/spark/util/TimeStampedHashMap.scala index 7a22b80a20..9bcc9245c0 100644 --- a/core/src/main/scala/spark/util/TimeStampedHashMap.scala +++ b/core/src/main/scala/spark/util/TimeStampedHashMap.scala @@ -1,7 +1,7 @@ package spark.util -import scala.collection.JavaConversions._ -import scala.collection.mutable.{HashMap, Map} +import scala.collection.JavaConversions +import scala.collection.mutable.Map import java.util.concurrent.ConcurrentHashMap /** @@ -20,7 +20,7 @@ class TimeStampedHashMap[A, B] extends Map[A, B]() { def iterator: Iterator[(A, B)] = { val jIterator = internalMap.entrySet().iterator() - jIterator.map(kv => (kv.getKey, kv.getValue._1)) + JavaConversions.asScalaIterator(jIterator).map(kv => (kv.getKey, kv.getValue._1)) } override def + [B1 >: B](kv: (A, B1)): Map[A, B1] = { @@ -31,8 +31,10 @@ class TimeStampedHashMap[A, B] extends Map[A, B]() { } override def - (key: A): Map[A, B] = { - internalMap.remove(key) - this + val newMap = new TimeStampedHashMap[A, B] + newMap.internalMap.putAll(this.internalMap) + newMap.internalMap.remove(key) + newMap } override def += (kv: (A, B)): this.type = { @@ -56,7 +58,7 @@ class TimeStampedHashMap[A, B] extends Map[A, B]() { } override def filter(p: ((A, B)) => Boolean): Map[A, B] = { - internalMap.map(kv => (kv._1, kv._2._1)).filter(p) + JavaConversions.asScalaConcurrentMap(internalMap).map(kv => (kv._1, kv._2._1)).filter(p) } override def empty: Map[A, B] = new TimeStampedHashMap[A, B]() diff --git a/core/src/main/scala/spark/util/TimeStampedHashSet.scala b/core/src/main/scala/spark/util/TimeStampedHashSet.scala new file mode 100644 index 0000000000..539dd75844 --- /dev/null +++ b/core/src/main/scala/spark/util/TimeStampedHashSet.scala @@ -0,0 +1,66 @@ +package spark.util + +import scala.collection.mutable.Set +import scala.collection.JavaConversions +import java.util.concurrent.ConcurrentHashMap + + +class TimeStampedHashSet[A] extends Set[A] { + val internalMap = new ConcurrentHashMap[A, Long]() + + def contains(key: A): Boolean = { + internalMap.contains(key) + } + + def iterator: Iterator[A] = { + val jIterator = internalMap.entrySet().iterator() + JavaConversions.asScalaIterator(jIterator).map(_.getKey) + } + + override def + (elem: A): Set[A] = { + val newSet = new TimeStampedHashSet[A] + newSet ++= this + newSet += elem + newSet + } + + override def - (elem: A): Set[A] = { + val newSet = new TimeStampedHashSet[A] + newSet ++= this + newSet -= elem + newSet + } + + override def += (key: A): this.type = { + internalMap.put(key, currentTime) + this + } + + override def -= (key: A): this.type = { + internalMap.remove(key) + this + } + + override def empty: Set[A] = new TimeStampedHashSet[A]() + + override def size(): Int = internalMap.size() + + override def foreach[U](f: (A) => U): Unit = { + val iterator = internalMap.entrySet().iterator() + while(iterator.hasNext) { + f(iterator.next.getKey) + } + } + + def cleanup(threshTime: Long) { + val iterator = internalMap.entrySet().iterator() + while(iterator.hasNext) { + val entry = iterator.next() + if (entry.getValue < threshTime) { + iterator.remove() + } + } + } + + private def currentTime: Long = System.currentTimeMillis() +} -- cgit v1.2.3 From 62965c5d8e3f4f0246ac2c8814ac75ea82b3f238 Mon Sep 17 00:00:00 2001 From: Tathagata Das Date: Sat, 1 Dec 2012 08:26:10 -0800 Subject: Added ssc.union --- 
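Before this commit's diff, a quick usage note on the TimeStampedHashSet introduced just above: every += records the insertion time, and cleanup(threshTime) drops the elements inserted before that threshold. The sketch below assumes the class from the previous commit is on the classpath, and the sleep is only there to separate the timestamps. It sticks to size rather than contains, since contains above delegates to java.util.concurrent.ConcurrentHashMap.contains, which tests values rather than keys.

    import spark.util.TimeStampedHashSet

    object TimeStampedHashSetExample {
      def main(args: Array[String]) {
        val registered = new TimeStampedHashSet[Int]
        registered += 1                // timestamped at insertion
        Thread.sleep(100)
        val cutoff = System.currentTimeMillis()
        registered += 2                // inserted after the cutoff
        println(registered.size)       // 2
        registered.cleanup(cutoff)     // drops everything inserted before the cutoff
        println(registered.size)       // 1 -- only the post-cutoff element remains
      }
    }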
streaming/src/main/scala/spark/streaming/ReducedWindowedDStream.scala | 3 ++- streaming/src/main/scala/spark/streaming/StreamingContext.scala | 4 ++++ 2 files changed, 6 insertions(+), 1 deletion(-) diff --git a/streaming/src/main/scala/spark/streaming/ReducedWindowedDStream.scala b/streaming/src/main/scala/spark/streaming/ReducedWindowedDStream.scala index 8b484e6acf..bb852cbcca 100644 --- a/streaming/src/main/scala/spark/streaming/ReducedWindowedDStream.scala +++ b/streaming/src/main/scala/spark/streaming/ReducedWindowedDStream.scala @@ -118,7 +118,8 @@ class ReducedWindowedDStream[K: ClassManifest, V: ClassManifest]( if (seqOfValues(0).isEmpty) { // If previous window's reduce value does not exist, then at least new values should exist if (newValues.isEmpty) { - throw new Exception("Neither previous window has value for key, nor new values found") + val info = "seqOfValues =\n" + seqOfValues.map(x => "[" + x.mkString(",") + "]").mkString("\n") + throw new Exception("Neither previous window has value for key, nor new values found\n" + info) } // Reduce the new values newValues.reduce(reduceF) // return diff --git a/streaming/src/main/scala/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/spark/streaming/StreamingContext.scala index 90dd560752..63d8766749 100644 --- a/streaming/src/main/scala/spark/streaming/StreamingContext.scala +++ b/streaming/src/main/scala/spark/streaming/StreamingContext.scala @@ -189,6 +189,10 @@ class StreamingContext private ( inputStream } + def union[T: ClassManifest](streams: Seq[DStream[T]]): DStream[T] = { + new UnionDStream[T](streams.toArray) + } + /** * This function registers a InputDStream as an input stream that will be * started (InputDStream.start() called) to get the input data streams. -- cgit v1.2.3 From 477de94894b7d8eeed281d33c12bcb2269d117c7 Mon Sep 17 00:00:00 2001 From: Tathagata Das Date: Sat, 1 Dec 2012 13:15:06 -0800 Subject: Minor modifications. 
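Among the changes below, MetadataCleaner gains getDelaySeconds/setDelaySeconds so the rest of the code stops parsing the spark.cleaner.delay property by hand. A small sketch of that kind of property-backed knob, written as a consistent standalone variant rather than the committed code: the property is interpreted as minutes, a negative value disables cleanup, and the object and method names here are mine.

    object CleanerDelaySketch {
      // "spark.cleaner.delay" holds minutes; a negative value means cleanup is disabled.
      def delaySeconds: Int =
        (System.getProperty("spark.cleaner.delay", "-1").toDouble * 60).toInt

      def setDelayMinutes(minutes: Long) {
        System.setProperty("spark.cleaner.delay", minutes.toString)
      }

      def main(args: Array[String]) {
        println(delaySeconds)   // -60: disabled by default in this sketch
        setDelayMinutes(60)     // an hour, as StreamingContext sets by default
        println(delaySeconds)   // 3600
      }
    }

Centralizing the parse in one place is also what makes the later units fix in this series (comparing a DStream's remember duration against the delay) a small, local change.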
--- core/src/main/scala/spark/util/MetadataCleaner.scala | 7 ++++++- streaming/src/main/scala/spark/streaming/DStream.scala | 15 ++++++++++++++- .../scala/spark/streaming/ReducedWindowedDStream.scala | 4 ++-- .../src/main/scala/spark/streaming/StreamingContext.scala | 8 ++++++-- 4 files changed, 28 insertions(+), 6 deletions(-) diff --git a/core/src/main/scala/spark/util/MetadataCleaner.scala b/core/src/main/scala/spark/util/MetadataCleaner.scala index 71ac39864e..2541b26255 100644 --- a/core/src/main/scala/spark/util/MetadataCleaner.scala +++ b/core/src/main/scala/spark/util/MetadataCleaner.scala @@ -5,7 +5,7 @@ import java.util.{TimerTask, Timer} import spark.Logging class MetadataCleaner(name: String, cleanupFunc: (Long) => Unit) extends Logging { - val delaySeconds = (System.getProperty("spark.cleanup.delay", "-100").toDouble * 60).toInt + val delaySeconds = MetadataCleaner.getDelaySeconds val periodSeconds = math.max(10, delaySeconds / 10) val timer = new Timer(name + " cleanup timer", true) val task = new TimerTask { @@ -30,3 +30,8 @@ class MetadataCleaner(name: String, cleanupFunc: (Long) => Unit) extends Logging timer.cancel() } } + +object MetadataCleaner { + def getDelaySeconds = (System.getProperty("spark.cleaner.delay", "-100").toDouble * 60).toInt + def setDelaySeconds(delay: Long) { System.setProperty("spark.cleaner.delay", delay.toString) } +} diff --git a/streaming/src/main/scala/spark/streaming/DStream.scala b/streaming/src/main/scala/spark/streaming/DStream.scala index 8efda2074d..28a3e2dfc7 100644 --- a/streaming/src/main/scala/spark/streaming/DStream.scala +++ b/streaming/src/main/scala/spark/streaming/DStream.scala @@ -146,6 +146,8 @@ extends Serializable with Logging { } protected[streaming] def validate() { + assert(rememberDuration != null, "Remember duration is set to null") + assert( !mustCheckpoint || checkpointInterval != null, "The checkpoint interval for " + this.getClass.getSimpleName + " has not been set. " + @@ -180,13 +182,24 @@ extends Serializable with Logging { checkpointInterval + "). Please set it to higher than " + checkpointInterval + "." ) + val metadataCleanupDelay = System.getProperty("spark.cleanup.delay", "-1").toDouble + assert( + metadataCleanupDelay < 0 || rememberDuration < metadataCleanupDelay * 60 * 1000, + "It seems you are doing some DStream window operation or setting a checkpoint interval " + + "which requires " + this.getClass.getSimpleName + " to remember generated RDDs for more " + + "than " + rememberDuration.milliseconds + " milliseconds. But the Spark's metadata cleanup" + + "delay is set to " + metadataCleanupDelay + " minutes, which is not sufficient. Please set " + + "the Java property 'spark.cleanup.delay' to more than " + + math.ceil(rememberDuration.millis.toDouble / 60000.0).toInt + " minutes." 
+ ) + dependencies.foreach(_.validate()) logInfo("Slide time = " + slideTime) logInfo("Storage level = " + storageLevel) logInfo("Checkpoint interval = " + checkpointInterval) logInfo("Remember duration = " + rememberDuration) - logInfo("Initialized " + this) + logInfo("Initialized and validated " + this) } protected[streaming] def setContext(s: StreamingContext) { diff --git a/streaming/src/main/scala/spark/streaming/ReducedWindowedDStream.scala b/streaming/src/main/scala/spark/streaming/ReducedWindowedDStream.scala index bb852cbcca..f63a9e0011 100644 --- a/streaming/src/main/scala/spark/streaming/ReducedWindowedDStream.scala +++ b/streaming/src/main/scala/spark/streaming/ReducedWindowedDStream.scala @@ -118,8 +118,8 @@ class ReducedWindowedDStream[K: ClassManifest, V: ClassManifest]( if (seqOfValues(0).isEmpty) { // If previous window's reduce value does not exist, then at least new values should exist if (newValues.isEmpty) { - val info = "seqOfValues =\n" + seqOfValues.map(x => "[" + x.mkString(",") + "]").mkString("\n") - throw new Exception("Neither previous window has value for key, nor new values found\n" + info) + throw new Exception("Neither previous window has value for key, nor new values found. " + + "Are you sure your key class hashes consistently?") } // Reduce the new values newValues.reduce(reduceF) // return diff --git a/streaming/src/main/scala/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/spark/streaming/StreamingContext.scala index 63d8766749..9c19f6588d 100644 --- a/streaming/src/main/scala/spark/streaming/StreamingContext.scala +++ b/streaming/src/main/scala/spark/streaming/StreamingContext.scala @@ -17,6 +17,7 @@ import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat} import org.apache.hadoop.mapreduce.lib.input.TextInputFormat import org.apache.hadoop.fs.Path import java.util.UUID +import spark.util.MetadataCleaner /** * A StreamingContext is the main entry point for Spark Streaming functionality. Besides the basic @@ -268,8 +269,11 @@ class StreamingContext private ( object StreamingContext { def createNewSparkContext(master: String, frameworkName: String): SparkContext = { - if (System.getProperty("spark.cleanup.delay", "-1").toDouble < 0) { - System.setProperty("spark.cleanup.delay", "60") + + // Set the default cleaner delay to an hour if not already set. + // This should be sufficient for even 1 second interval. + if (MetadataCleaner.getDelaySeconds < 0) { + MetadataCleaner.setDelaySeconds(60) } new SparkContext(master, frameworkName) } -- cgit v1.2.3 From b4dba55f78b0dfda728cf69c9c17e4863010d28d Mon Sep 17 00:00:00 2001 From: Tathagata Das Date: Sun, 2 Dec 2012 02:03:05 +0000 Subject: Made RDD checkpoint not create a new thread. Fixed bug in detecting when spark.cleaner.delay is insufficient. 
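The second fix in the commit below is a units mix-up in DStream.validate(): the remember duration is in milliseconds while the cleaner delay is now exposed in seconds, so the guard must scale by 1000 rather than 60 * 1000. A tiny standalone sketch of the corrected comparison, with illustrative values (the helper name is mine):

    object RememberDurationCheck {
      // A DStream's remembered RDDs must not out-live the metadata cleaner delay.
      def sufficient(rememberDurationMs: Long, cleanerDelaySeconds: Long): Boolean =
        cleanerDelaySeconds < 0 || rememberDurationMs < cleanerDelaySeconds * 1000

      def main(args: Array[String]) {
        println(sufficient(30 * 60 * 1000, 3600))   // 30 min window, 1 h delay -> true
        println(sufficient(2 * 3600 * 1000, 3600))  // 2 h window, 1 h delay    -> false
        println(sufficient(60 * 1000, -1))          // cleaner disabled         -> true
      }
    }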
--- core/src/main/scala/spark/RDD.scala | 31 +++++++--------------- .../main/scala/spark/util/TimeStampedHashMap.scala | 3 ++- .../src/main/scala/spark/streaming/DStream.scala | 9 ++++--- 3 files changed, 17 insertions(+), 26 deletions(-) diff --git a/core/src/main/scala/spark/RDD.scala b/core/src/main/scala/spark/RDD.scala index 8af6c9bd6a..fbfcfbd704 100644 --- a/core/src/main/scala/spark/RDD.scala +++ b/core/src/main/scala/spark/RDD.scala @@ -211,28 +211,17 @@ abstract class RDD[T: ClassManifest]( if (startCheckpoint) { val rdd = this - val env = SparkEnv.get - - // Spawn a new thread to do the checkpoint as it takes sometime to write the RDD to file - val th = new Thread() { - override def run() { - // Save the RDD to a file, create a new HadoopRDD from it, - // and change the dependencies from the original parents to the new RDD - SparkEnv.set(env) - rdd.checkpointFile = new Path(context.checkpointDir, "rdd-" + id).toString - rdd.saveAsObjectFile(checkpointFile) - rdd.synchronized { - rdd.checkpointRDD = context.objectFile[T](checkpointFile, rdd.splits.size) - rdd.checkpointRDDSplits = rdd.checkpointRDD.splits - rdd.changeDependencies(rdd.checkpointRDD) - rdd.shouldCheckpoint = false - rdd.isCheckpointInProgress = false - rdd.isCheckpointed = true - println("Done checkpointing RDD " + rdd.id + ", " + rdd) - } - } + rdd.checkpointFile = new Path(context.checkpointDir, "rdd-" + id).toString + rdd.saveAsObjectFile(checkpointFile) + rdd.synchronized { + rdd.checkpointRDD = context.objectFile[T](checkpointFile, rdd.splits.size) + rdd.checkpointRDDSplits = rdd.checkpointRDD.splits + rdd.changeDependencies(rdd.checkpointRDD) + rdd.shouldCheckpoint = false + rdd.isCheckpointInProgress = false + rdd.isCheckpointed = true + println("Done checkpointing RDD " + rdd.id + ", " + rdd + ", created RDD " + rdd.checkpointRDD.id + ", " + rdd.checkpointRDD) } - th.start() } else { // Recursively call doCheckpoint() to perform checkpointing on parent RDD if they are marked dependencies.foreach(_.rdd.doCheckpoint()) diff --git a/core/src/main/scala/spark/util/TimeStampedHashMap.scala b/core/src/main/scala/spark/util/TimeStampedHashMap.scala index 9bcc9245c0..52f03784db 100644 --- a/core/src/main/scala/spark/util/TimeStampedHashMap.scala +++ b/core/src/main/scala/spark/util/TimeStampedHashMap.scala @@ -10,7 +10,7 @@ import java.util.concurrent.ConcurrentHashMap * threshold time can them be removed using the cleanup method. This is intended to be a drop-in * replacement of scala.collection.mutable.HashMap. */ -class TimeStampedHashMap[A, B] extends Map[A, B]() { +class TimeStampedHashMap[A, B] extends Map[A, B]() with spark.Logging { val internalMap = new ConcurrentHashMap[A, (B, Long)]() def get(key: A): Option[B] = { @@ -79,6 +79,7 @@ class TimeStampedHashMap[A, B] extends Map[A, B]() { while(iterator.hasNext) { val entry = iterator.next() if (entry.getValue._2 < threshTime) { + logDebug("Removing key " + entry.getKey) iterator.remove() } } diff --git a/streaming/src/main/scala/spark/streaming/DStream.scala b/streaming/src/main/scala/spark/streaming/DStream.scala index 28a3e2dfc7..d2e9de110e 100644 --- a/streaming/src/main/scala/spark/streaming/DStream.scala +++ b/streaming/src/main/scala/spark/streaming/DStream.scala @@ -182,14 +182,15 @@ extends Serializable with Logging { checkpointInterval + "). Please set it to higher than " + checkpointInterval + "." 
) - val metadataCleanupDelay = System.getProperty("spark.cleanup.delay", "-1").toDouble + val metadataCleanerDelay = spark.util.MetadataCleaner.getDelaySeconds + logInfo("metadataCleanupDelay = " + metadataCleanerDelay) assert( - metadataCleanupDelay < 0 || rememberDuration < metadataCleanupDelay * 60 * 1000, + metadataCleanerDelay < 0 || rememberDuration < metadataCleanerDelay * 1000, "It seems you are doing some DStream window operation or setting a checkpoint interval " + "which requires " + this.getClass.getSimpleName + " to remember generated RDDs for more " + "than " + rememberDuration.milliseconds + " milliseconds. But the Spark's metadata cleanup" + - "delay is set to " + metadataCleanupDelay + " minutes, which is not sufficient. Please set " + - "the Java property 'spark.cleanup.delay' to more than " + + "delay is set to " + (metadataCleanerDelay / 60.0) + " minutes, which is not sufficient. Please set " + + "the Java property 'spark.cleaner.delay' to more than " + math.ceil(rememberDuration.millis.toDouble / 60000.0).toInt + " minutes." ) -- cgit v1.2.3 From 609e00d599d3f429a838f598b3f32c5fdbd7ec5e Mon Sep 17 00:00:00 2001 From: Tathagata Das Date: Sun, 2 Dec 2012 02:39:08 +0000 Subject: Minor mods --- streaming/src/main/scala/spark/streaming/NetworkInputTracker.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/streaming/src/main/scala/spark/streaming/NetworkInputTracker.scala b/streaming/src/main/scala/spark/streaming/NetworkInputTracker.scala index d0fef70f7e..ae6692290e 100644 --- a/streaming/src/main/scala/spark/streaming/NetworkInputTracker.scala +++ b/streaming/src/main/scala/spark/streaming/NetworkInputTracker.scala @@ -58,7 +58,7 @@ class NetworkInputTracker( throw new Exception("Register received for unexpected id " + streamId) } receiverInfo += ((streamId, receiverActor)) - logInfo("Registered receiver for network stream " + streamId) + logInfo("Registered receiver for network stream " + streamId + " from " + sender.path.address) sender ! true } case AddBlocks(streamId, blockIds) => { -- cgit v1.2.3 From a69a82be2682148f5d1ebbdede15a47c90eea73d Mon Sep 17 00:00:00 2001 From: Tathagata Das Date: Mon, 3 Dec 2012 22:37:31 -0800 Subject: Added metadata cleaner to HttpBroadcast to clean up old broacast files. 
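The commit below records every broadcast file it writes in a TimeStampedHashSet and periodically deletes the ones older than the cleanup threshold. A stripped-down sketch of that pruning loop, using a plain ConcurrentHashMap of path -> write time instead of the Spark classes; the names and the fake timestamps are illustrative.

    import java.io.File
    import java.util.concurrent.ConcurrentHashMap

    object BroadcastFileCleanupSketch {
      // path -> time the file was written (stand-in for the timestamped set of files)
      val files = new ConcurrentHashMap[String, Long]()

      def cleanup(cleanupTime: Long) {
        val iterator = files.entrySet().iterator()
        while (iterator.hasNext) {
          val entry = iterator.next()
          if (entry.getValue < cleanupTime) {
            try {
              iterator.remove()                    // forget the path first
              new File(entry.getKey).delete()      // then best-effort delete the file
            } catch {
              case e: Exception => println("Could not delete " + entry.getKey + ": " + e)
            }
          }
        }
      }

      def main(args: Array[String]) {
        val tmp = File.createTempFile("broadcast-", ".bin")
        files.put(tmp.getAbsolutePath, System.currentTimeMillis() - 60000)  // pretend it is a minute old
        cleanup(System.currentTimeMillis() - 30000)                         // drop anything older than 30 s
        println(tmp.exists())                                               // false -- deleted
      }
    }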
--- .../main/scala/spark/broadcast/HttpBroadcast.scala | 24 ++++++++++++++++++++++ 1 file changed, 24 insertions(+) diff --git a/core/src/main/scala/spark/broadcast/HttpBroadcast.scala b/core/src/main/scala/spark/broadcast/HttpBroadcast.scala index 7eb4ddb74f..fef264aab1 100644 --- a/core/src/main/scala/spark/broadcast/HttpBroadcast.scala +++ b/core/src/main/scala/spark/broadcast/HttpBroadcast.scala @@ -11,6 +11,7 @@ import it.unimi.dsi.fastutil.io.FastBufferedOutputStream import spark._ import spark.storage.StorageLevel +import util.{MetadataCleaner, TimeStampedHashSet} private[spark] class HttpBroadcast[T](@transient var value_ : T, isLocal: Boolean, id: Long) extends Broadcast[T](id) with Logging with Serializable { @@ -64,6 +65,10 @@ private object HttpBroadcast extends Logging { private var serverUri: String = null private var server: HttpServer = null + private val files = new TimeStampedHashSet[String] + private val cleaner = new MetadataCleaner("HttpBroadcast", cleanup) + + def initialize(isMaster: Boolean) { synchronized { if (!initialized) { @@ -85,6 +90,7 @@ private object HttpBroadcast extends Logging { server = null } initialized = false + cleaner.cancel() } } @@ -108,6 +114,7 @@ private object HttpBroadcast extends Logging { val serOut = ser.serializeStream(out) serOut.writeObject(value) serOut.close() + files += file.getAbsolutePath } def read[T](id: Long): T = { @@ -123,4 +130,21 @@ private object HttpBroadcast extends Logging { serIn.close() obj } + + def cleanup(cleanupTime: Long) { + val iterator = files.internalMap.entrySet().iterator() + while(iterator.hasNext) { + val entry = iterator.next() + val (file, time) = (entry.getKey, entry.getValue) + if (time < cleanupTime) { + try { + iterator.remove() + new File(file.toString).delete() + logInfo("Deleted broadcast file '" + file + "'") + } catch { + case e: Exception => logWarning("Could not delete broadcast file '" + file + "'", e) + } + } + } + } } -- cgit v1.2.3
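One more sketch, referring back to the StorageLevel/BlockManagerId commit earlier in this series: both classes add a private readResolve() so that, after deserialization, the JVM swaps the freshly built instance for a cached canonical one, which is what the new BlockManagerSuite tests assert with eq. The standalone sketch below shows the idiom with plain java.io serialization instead of Externalizable; every name in it is illustrative rather than Spark's.

    import java.io._
    import java.util.concurrent.ConcurrentHashMap

    // Sketch of the readResolve() interning idiom: equal deserialized instances
    // collapse to a single cached object.
    class InternedId(val ip: String, val port: Int) extends Serializable {
      override def hashCode = ip.hashCode * 41 + port
      override def equals(that: Any) = that match {
        case id: InternedId => id.ip == ip && id.port == port
        case _ => false
      }
      @throws(classOf[ObjectStreamException])
      private def readResolve(): Object = InternedId.cached(this)
    }

    object InternedId {
      private val cache = new ConcurrentHashMap[InternedId, InternedId]()
      def cached(id: InternedId): InternedId = {
        val prev = cache.putIfAbsent(id, id)   // atomic variant of the check-then-put in the patch
        if (prev == null) id else prev
      }
    }

    object InternedIdDemo {
      private def roundTrip(id: InternedId): InternedId = {
        val bytes = new ByteArrayOutputStream()
        val out = new ObjectOutputStream(bytes)
        out.writeObject(id)
        out.close()
        val in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray))
        in.readObject().asInstanceOf[InternedId]
      }

      def main(args: Array[String]) {
        val a = roundTrip(new InternedId("10.0.0.1", 7077))
        val b = roundTrip(new InternedId("10.0.0.1", 7077))
        println(a eq b)   // true: readResolve routed both copies through the cache
      }
    }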