author      Reynold Xin <rxin@apache.org>    2014-01-09 19:03:55 -0800
committer   Reynold Xin <rxin@apache.org>    2014-01-09 19:03:55 -0800
commit      4b074fac054848ebd3397a3cce0a3e7871d3860c (patch)
tree        23eb68dc8e20b0e3fc04d90dbee18e68f203bb6e /core/src/main
parent      a9d533333da63160c5b7507705b1b6e9c3dbf71e (diff)
parent      142921c6c0d0d9134d5fa0e307575d8f6749fb9a (diff)
Merge pull request #374 from mateiz/completeness

Add some missing Java API methods

These are primarily for setting job groups, canceling jobs, and setting
names on RDDs. Seemed like useful stuff to expose in Java.
Diffstat (limited to 'core/src/main')
-rw-r--r--  core/src/main/scala/org/apache/spark/SparkContext.scala                12
-rw-r--r--  core/src/main/scala/org/apache/spark/api/java/JavaDoubleRDD.scala      12
-rw-r--r--  core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala         6
-rw-r--r--  core/src/main/scala/org/apache/spark/api/java/JavaRDD.scala             6
-rw-r--r--  core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala         6
-rw-r--r--  core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala   54
6 files changed, 90 insertions(+), 6 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index f91392b351..66c226e491 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -244,6 +244,10 @@ class SparkContext(
localProperties.set(new Properties())
}
+ /**
+ * Set a local property that affects jobs submitted from this thread, such as the
+ * Spark fair scheduler pool.
+ */
def setLocalProperty(key: String, value: String) {
if (localProperties.get() == null) {
localProperties.set(new Properties())
@@ -255,6 +259,10 @@ class SparkContext(
}
}
+ /**
+ * Get a local property set in this thread, or null if it is missing. See
+ * [[org.apache.spark.SparkContext.setLocalProperty]].
+ */
def getLocalProperty(key: String): String =
Option(localProperties.get).map(_.getProperty(key)).getOrElse(null)
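
These thread-local properties are the hooks that the fair scheduler and the job-group machinery below build on. A minimal sketch of the set/get round trip from Java, using the JavaSparkContext wrapper added later in this commit (the class, app, and pool names are invented; "spark.scheduler.pool" is the property the fair scheduler reads):

    import org.apache.spark.api.java.JavaSparkContext;

    public class LocalPropertyExample {
      public static void main(String[] args) {
        JavaSparkContext sc = new JavaSparkContext("local", "local-property-example");

        // Jobs submitted from this thread now go to the "production" fair pool.
        sc.setLocalProperty("spark.scheduler.pool", "production");
        System.out.println(sc.getLocalProperty("spark.scheduler.pool")); // production

        // A key that was never set comes back as null rather than throwing.
        System.out.println(sc.getLocalProperty("no.such.key")); // null
        sc.stop();
      }
    }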
@@ -265,7 +273,7 @@ class SparkContext(
}
/**
- * Assigns a group id to all the jobs started by this thread until the group id is set to a
+ * Assigns a group ID to all the jobs started by this thread until the group ID is set to a
* different value or cleared.
*
* Often, a unit of execution in an application consists of multiple Spark actions or jobs.
@@ -288,7 +296,7 @@ class SparkContext(
setLocalProperty(SparkContext.SPARK_JOB_GROUP_ID, groupId)
}
- /** Clear the job group id and its description. */
+ /** Clear the current thread's job group ID and its description. */
def clearJobGroup() {
setLocalProperty(SparkContext.SPARK_JOB_DESCRIPTION, null)
setLocalProperty(SparkContext.SPARK_JOB_GROUP_ID, null)
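
Setting and clearing a group brackets one logical phase of an application; cancelling a group from a second thread is shown further down, in the JavaSparkContext section. A hedged sketch with invented group and data names:

    import java.util.Arrays;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.JavaSparkContext;
    import org.apache.spark.api.java.function.Function;

    public class JobGroupExample {
      public static void main(String[] args) {
        JavaSparkContext sc = new JavaSparkContext("local", "job-group-example");
        JavaRDD<Integer> nums = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5));

        // Both actions run under "phase-1", so the web UI groups them and
        // cancelJobGroup("phase-1") would stop whichever is still running.
        sc.setJobGroup("phase-1", "count and filter the sample data");
        long total = nums.count();
        long evens = nums.filter(new Function<Integer, Boolean>() {
          public Boolean call(Integer x) { return x % 2 == 0; }
        }).count();
        sc.clearJobGroup(); // later jobs from this thread are ungrouped again

        System.out.println(total + " elements, " + evens + " even");
        sc.stop();
      }
    }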
diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaDoubleRDD.scala b/core/src/main/scala/org/apache/spark/api/java/JavaDoubleRDD.scala
index da30cf619a..b0dedc6f4e 100644
--- a/core/src/main/scala/org/apache/spark/api/java/JavaDoubleRDD.scala
+++ b/core/src/main/scala/org/apache/spark/api/java/JavaDoubleRDD.scala
@@ -207,13 +207,13 @@ class JavaDoubleRDD(val srdd: RDD[scala.Double]) extends JavaRDDLike[Double, Jav
* e.g. for the array
* [1,10,20,50] the buckets are [1,10) [10,20) [20,50]
* e.g 1<=x<10 , 10<=x<20, 20<=x<50
- * And on the input of 1 and 50 we would have a histogram of 1,0,0
- *
+ * And on the input of 1 and 50 we would have a histogram of 1,0,0
+ *
* Note: if your histogram is evenly spaced (e.g. [0, 10, 20, 30]) this can be switched
* from an O(log n) inseration to O(1) per element. (where n = # buckets) if you set evenBuckets
* to true.
* buckets must be sorted and not contain any duplicates.
- * buckets array must be at least two elements
+ * buckets array must be at least two elements
* All NaN entries are treated the same. If you have a NaN bucket it must be
* the maximum value of the last position and all NaN entries will be counted
* in that bucket.
@@ -225,6 +225,12 @@ class JavaDoubleRDD(val srdd: RDD[scala.Double]) extends JavaRDDLike[Double, Jav
def histogram(buckets: Array[Double], evenBuckets: Boolean): Array[Long] = {
srdd.histogram(buckets.map(_.toDouble), evenBuckets)
}
+
+ /** Assign a name to this RDD */
+ def setName(name: String): JavaDoubleRDD = {
+ srdd.setName(name)
+ this
+ }
}
object JavaDoubleRDD {
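
Returning `this` lets setName chain naturally with other calls, and the name is what the web UI displays for the RDD. A small sketch (data and names invented) that also exercises the histogram method documented above:

    import java.util.Arrays;
    import org.apache.spark.api.java.JavaDoubleRDD;
    import org.apache.spark.api.java.JavaSparkContext;

    public class DoubleSetNameExample {
      public static void main(String[] args) {
        JavaSparkContext sc = new JavaSparkContext("local", "set-name-example");
        JavaDoubleRDD values = sc
            .parallelizeDoubles(Arrays.asList(1.0, 5.0, 12.0, 30.0))
            .setName("sample values"); // fluent: setName returns this

        // Buckets [1,10) [10,20) [20,50]; widths differ, so evenBuckets=false.
        long[] counts = values.histogram(new double[]{1, 10, 20, 50}, false);
        System.out.println(Arrays.toString(counts)); // [2, 1, 1]
        sc.stop();
      }
    }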
diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala b/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala
index 55c87450ac..0fb7e195b3 100644
--- a/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala
+++ b/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala
@@ -647,6 +647,12 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])(implicit val kClassTag: ClassTag[K
def countApproxDistinctByKey(relativeSD: Double, numPartitions: Int): JavaRDD[(K, Long)] = {
rdd.countApproxDistinctByKey(relativeSD, numPartitions)
}
+
+ /** Assign a name to this RDD */
+ def setName(name: String): JavaPairRDD[K, V] = {
+ rdd.setName(name)
+ this
+ }
}
object JavaPairRDD {
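
JavaPairRDD (and JavaRDD, in the next file) gets the same fluent setName; naming an RDD before a shuffle makes the resulting stages easy to spot in the UI. A sketch with invented data:

    import java.util.Arrays;
    import org.apache.spark.api.java.JavaPairRDD;
    import org.apache.spark.api.java.JavaSparkContext;
    import org.apache.spark.api.java.function.Function2;
    import scala.Tuple2;

    public class PairSetNameExample {
      public static void main(String[] args) {
        JavaSparkContext sc = new JavaSparkContext("local", "pair-set-name-example");
        JavaPairRDD<String, Integer> sums = sc
            .parallelizePairs(Arrays.asList(
                new Tuple2<String, Integer>("a", 1),
                new Tuple2<String, Integer>("b", 2),
                new Tuple2<String, Integer>("a", 3)))
            .setName("raw pairs") // shows up against this RDD in the web UI
            .reduceByKey(new Function2<Integer, Integer, Integer>() {
              public Integer call(Integer x, Integer y) { return x + y; }
            });
        System.out.println(sums.collect()); // [(a,4), (b,2)] in some order
        sc.stop();
      }
    }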
diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaRDD.scala b/core/src/main/scala/org/apache/spark/api/java/JavaRDD.scala
index 037cd1c774..7d48ce01cf 100644
--- a/core/src/main/scala/org/apache/spark/api/java/JavaRDD.scala
+++ b/core/src/main/scala/org/apache/spark/api/java/JavaRDD.scala
@@ -127,6 +127,12 @@ JavaRDDLike[T, JavaRDD[T]] {
wrapRDD(rdd.subtract(other, p))
override def toString = rdd.toString
+
+ /** Assign a name to this RDD */
+ def setName(name: String): JavaRDD[T] = {
+ rdd.setName(name)
+ this
+ }
}
object JavaRDD {
diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala b/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
index 924d8af060..ebbbbd8806 100644
--- a/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
+++ b/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
@@ -245,6 +245,11 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
}
/**
+ * Return an array that contains all of the elements in this RDD.
+ */
+ def toArray(): JList[T] = collect()
+
+ /**
* Return an array that contains all of the elements in a specific partition of this RDD.
*/
def collectPartitions(partitionIds: Array[Int]): Array[JList[T]] = {
@@ -455,4 +460,5 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
*/
def countApproxDistinct(relativeSD: Double = 0.05): Long = rdd.countApproxDistinct(relativeSD)
+ def name(): String = rdd.name
}
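
toArray() is a straight alias for collect() (both return a java.util.List), and name() reads back whatever setName stored. A short sketch, names invented:

    import java.util.Arrays;
    import java.util.List;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.JavaSparkContext;

    public class NameReadbackExample {
      public static void main(String[] args) {
        JavaSparkContext sc = new JavaSparkContext("local", "name-readback-example");
        JavaRDD<String> words =
            sc.parallelize(Arrays.asList("spark", "java", "api")).setName("words");

        System.out.println(words.name()); // words
        List<String> all = words.toArray(); // identical to words.collect()
        System.out.println(all); // [spark, java, api]
        sc.stop();
      }
    }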
diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala b/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala
index e93b10fd7e..7a6f044965 100644
--- a/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala
@@ -425,6 +425,51 @@ class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWork
def clearCallSite() {
sc.clearCallSite()
}
+
+ /**
+ * Set a local property that affects jobs submitted from this thread, such as the
+ * Spark fair scheduler pool.
+ */
+ def setLocalProperty(key: String, value: String): Unit = sc.setLocalProperty(key, value)
+
+ /**
+ * Get a local property set in this thread, or null if it is missing. See
+ * [[org.apache.spark.api.java.JavaSparkContext.setLocalProperty]].
+ */
+ def getLocalProperty(key: String): String = sc.getLocalProperty(key)
+
+ /**
+ * Assigns a group ID to all the jobs started by this thread until the group ID is set to a
+ * different value or cleared.
+ *
+ * Often, a unit of execution in an application consists of multiple Spark actions or jobs.
+ * Application programmers can use this method to group all those jobs together and give a
+ * group description. Once set, the Spark web UI will associate such jobs with this group.
+ *
+ * The application can also use [[org.apache.spark.api.java.JavaSparkContext.cancelJobGroup]]
+ * to cancel all running jobs in this group. For example,
+ * {{{
+ * // In the main thread:
+ * sc.setJobGroup("some_job_to_cancel", "some job description");
+ * rdd.map(...).count();
+ *
+ * // In a separate thread:
+ * sc.cancelJobGroup("some_job_to_cancel");
+ * }}}
+ */
+ def setJobGroup(groupId: String, description: String): Unit = sc.setJobGroup(groupId, description)
+
+ /** Clear the current thread's job group ID and its description. */
+ def clearJobGroup(): Unit = sc.clearJobGroup()
+
+ /**
+ * Cancel active jobs for the specified group. See
+ * [[org.apache.spark.api.java.JavaSparkContext.setJobGroup]] for more information.
+ */
+ def cancelJobGroup(groupId: String): Unit = sc.cancelJobGroup(groupId)
+
+ /** Cancel all jobs that have been scheduled or are running. */
+ def cancelAllJobs(): Unit = sc.cancelAllJobs()
}
object JavaSparkContext {
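
The {{{ }}} snippet in the scaladoc above compresses the pattern; below is a fuller, hedged sketch of cancelling a group from a second thread. The slow map and all names are invented, and the exact exception surfaced by the cancelled action may vary:

    import java.util.ArrayList;
    import java.util.List;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.JavaSparkContext;
    import org.apache.spark.api.java.function.Function;

    public class CancelJobGroupExample {
      public static void main(String[] args) {
        final JavaSparkContext sc = new JavaSparkContext("local[2]", "cancel-example");
        List<Integer> data = new ArrayList<Integer>();
        for (int i = 0; i < 100; i++) data.add(i);

        JavaRDD<Integer> slow = sc.parallelize(data, 100).map(
            new Function<Integer, Integer>() {
              public Integer call(Integer x) throws Exception {
                Thread.sleep(1000); // keep each task slow enough to cancel
                return x;
              }
            });

        // In a separate thread: wait a moment, then cancel the whole group.
        new Thread(new Runnable() {
          public void run() {
            try { Thread.sleep(2000); } catch (InterruptedException e) { return; }
            sc.cancelJobGroup("some_job_to_cancel"); // aborts the count() below
          }
        }).start();

        // In the main thread: tag the jobs, then run the long action.
        sc.setJobGroup("some_job_to_cancel", "some job description");
        try {
          slow.count();
        } catch (Exception e) {
          System.out.println("cancelled: " + e.getMessage());
        }
        sc.stop();
      }
    }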
@@ -436,5 +481,12 @@ object JavaSparkContext {
* Find the JAR from which a given class was loaded, to make it easy for users to pass
* their JARs to SparkContext.
*/
- def jarOfClass(cls: Class[_]) = SparkContext.jarOfClass(cls).toArray
+ def jarOfClass(cls: Class[_]): Array[String] = SparkContext.jarOfClass(cls).toArray
+
+ /**
+ * Find the JAR that contains the class of a particular object, to make it easy for users
+ * to pass their JARs to SparkContext. In most cases you can call jarOfObject(this) in
+ * your driver program.
+ */
+ def jarOfObject(obj: AnyRef): Array[String] = SparkContext.jarOfObject(obj).toArray
}
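
Both helpers let a driver locate its own JAR instead of hard-coding a path: jarOfClass works from a Class object, jarOfObject from any instance. A hedged sketch (the master URL and sparkHome are placeholders, and the returned array is empty when the class was not loaded from a JAR):

    import org.apache.spark.api.java.JavaSparkContext;

    public class JarLookupExample {
      public static void main(String[] args) {
        // Find the JAR that contains this driver class so executors can fetch it.
        String[] jars = JavaSparkContext.jarOfClass(JarLookupExample.class);

        JavaSparkContext sc = new JavaSparkContext(
            "spark://master:7077", "jar-lookup-example", "/opt/spark", jars);
        // Inside an instance method, JavaSparkContext.jarOfObject(this) is equivalent.
        sc.stop();
      }
    }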