author    Reynold Xin <rxin@apache.org>    2013-10-19 23:16:44 -0700
committer Reynold Xin <rxin@apache.org>    2013-10-19 23:16:44 -0700
commit    eb9bf694620ec437c07f01b6245e7b47ceb9ea89 (patch)
tree      052e98b06451d1c22f25d222fe4432558843ba4a /core
parent    f628804c023a44bf7506bdca017d9a895dd7eb32 (diff)
Added documentation for setJobGroup. Also some minor cleanup in SparkContext.
Diffstat (limited to 'core')
-rw-r--r--    core/src/main/scala/org/apache/spark/SparkContext.scala    55
1 file changed, 36 insertions(+), 19 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 48bbc78795..0ceb580913 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -51,25 +51,20 @@ import org.apache.hadoop.mapreduce.lib.input.{FileInputFormat => NewFileInputFor
import org.apache.mesos.MesosNativeLibrary
-import org.apache.spark.broadcast.Broadcast
import org.apache.spark.deploy.LocalSparkCluster
import org.apache.spark.partial.{ApproximateEvaluator, PartialResult}
import org.apache.spark.rdd._
import org.apache.spark.scheduler._
-import org.apache.spark.scheduler.cluster.{CoarseGrainedSchedulerBackend, SparkDeploySchedulerBackend,
- ClusterScheduler}
-import org.apache.spark.scheduler.local.LocalScheduler
+import org.apache.spark.scheduler.cluster.{CoarseGrainedSchedulerBackend,
+ SparkDeploySchedulerBackend, ClusterScheduler}
import org.apache.spark.scheduler.cluster.mesos.{CoarseMesosSchedulerBackend, MesosSchedulerBackend}
-import org.apache.spark.storage.{StorageUtils, BlockManagerSource}
+import org.apache.spark.scheduler.local.LocalScheduler
+import org.apache.spark.storage.{BlockManagerSource, RDDInfo, StorageStatus, StorageUtils}
import org.apache.spark.ui.SparkUI
-import org.apache.spark.util._
-import org.apache.spark.scheduler.StageInfo
-import org.apache.spark.storage.RDDInfo
-import org.apache.spark.storage.StorageStatus
-import scala.Some
-import org.apache.spark.scheduler.StageInfo
-import org.apache.spark.storage.RDDInfo
-import org.apache.spark.storage.StorageStatus
+import org.apache.spark.util.{ClosureCleaner, MetadataCleaner, MetadataCleanerType,
+ TimeStampedHashMap, Utils}
+
+
/**
* Main entry point for Spark functionality. A SparkContext represents the connection to a Spark
@@ -125,7 +120,7 @@ class SparkContext(
private[spark] val persistentRdds = new TimeStampedHashMap[Int, RDD[_]]
private[spark] val metadataCleaner = new MetadataCleaner(MetadataCleanerType.SPARK_CONTEXT, this.cleanup)
- // Initalize the Spark UI
+ // Initialize the Spark UI
private[spark] val ui = new SparkUI(this)
ui.bind()
@@ -292,11 +287,31 @@ class SparkContext(
setJobGroup("", value)
}
+ /**
+ * Assigns a group id to all the jobs started by this thread until the group id is set to a
+ * different value or cleared.
+ *
+ * Often, a unit of execution in an application consists of multiple Spark actions or jobs.
+ * Application programmers can use this method to group all those jobs together and give the
+ * group a description. Once set, the Spark web UI will associate such jobs with this group.
+ *
+ * The application can also use [[org.apache.spark.SparkContext.cancelJobGroup]] to cancel all
+ * running jobs in this group. For example,
+ * {{{
+ * // In the main thread:
+ * sc.setJobGroup("some_job_to_cancel", "some job description")
+ * sc.parallelize(1 to 10000, 2).map { i => Thread.sleep(10); i }.count()
+ *
+ * // In a separate thread:
+ * sc.cancelJobGroup("some_job_to_cancel")
+ * }}}
+ */
def setJobGroup(groupId: String, description: String) {
setLocalProperty(SparkContext.SPARK_JOB_DESCRIPTION, description)
setLocalProperty(SparkContext.SPARK_JOB_GROUP_ID, groupId)
}
+ /** Clear the job group id and its description. */
def clearJobGroup() {
setLocalProperty(SparkContext.SPARK_JOB_DESCRIPTION, null)
setLocalProperty(SparkContext.SPARK_JOB_GROUP_ID, null)
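
For illustration, a minimal sketch of how the new setJobGroup/clearJobGroup pair might be driven from application code. This is not part of the patch: it assumes an already-constructed SparkContext named sc, and the group id "demo_group" and its description are made up.
{{{
// Sketch only: assumes an existing SparkContext `sc`; "demo_group" is hypothetical.
// Tag every job submitted from this thread with a group id and description.
sc.setJobGroup("demo_group", "nightly aggregation jobs")

// Both actions below run as jobs in "demo_group" and are shown together in the web UI.
val doubledSum = sc.parallelize(1 to 1000, 4).map(_ * 2).reduce(_ + _)
val rowCount   = sc.parallelize(1 to 1000, 4).count()

// Stop tagging: later jobs from this thread carry no group id.
sc.clearJobGroup()
}}}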
@@ -881,13 +896,15 @@ class SparkContext(
new SimpleFutureAction(waiter, resultFunc)
}
+ /**
+ * Cancel active jobs for the specified group. See [[org.apache.spark.SparkContext.setJobGroup]]
+ * for more information.
+ */
def cancelJobGroup(groupId: String) {
dagScheduler.cancelJobGroup(groupId)
}
- /**
- * Cancel all jobs that have been scheduled or are running.
- */
+ /** Cancel all jobs that have been scheduled or are running. */
def cancelAllJobs() {
dagScheduler.cancelAllJobs()
}
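
A hedged sketch of the cancellation side, pairing cancelJobGroup with a group set as above; again this assumes an existing SparkContext named sc, with the group id "slow_group" and the watchdog thread invented for the example.
{{{
// Sketch only: assumes an existing SparkContext `sc`; "slow_group" is hypothetical.
sc.setJobGroup("slow_group", "deliberately slow job")

// Watchdog thread: cancel the group after one second.
new Thread(new Runnable {
  def run() {
    Thread.sleep(1000)
    sc.cancelJobGroup("slow_group")  // cancels only jobs tagged "slow_group"
    // sc.cancelAllJobs() would instead abort every scheduled or running job
  }
}).start()

// This action is interrupted once the watchdog fires; the cancelled job
// surfaces as a SparkException in the calling thread.
sc.parallelize(1 to 10000, 2).map { i => Thread.sleep(10); i }.count()
}}}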
@@ -949,9 +966,9 @@ class SparkContext(
*/
object SparkContext {
- val SPARK_JOB_DESCRIPTION = "spark.job.description"
+ private[spark] val SPARK_JOB_DESCRIPTION = "spark.job.description"
- val SPARK_JOB_GROUP_ID = "spark.jobGroup.id"
+ private[spark] val SPARK_JOB_GROUP_ID = "spark.jobGroup.id"
implicit object DoubleAccumulatorParam extends AccumulatorParam[Double] {
def addInPlace(t1: Double, t2: Double): Double = t1 + t2
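
The implicit DoubleAccumulatorParam above is what lets a Double accumulator be created without passing an AccumulatorParam explicitly. A minimal sketch, once more assuming an existing SparkContext named sc:
{{{
// Sketch only: assumes an existing SparkContext `sc`.
// The implicit DoubleAccumulatorParam is resolved automatically here.
val sum = sc.accumulator(0.0)

sc.parallelize(Seq(1.5, 2.5, 3.0)).foreach(x => sum += x)

// Partial values are merged with addInPlace(t1, t2) = t1 + t2, so sum.value == 7.0.
println(sum.value)
}}}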