Diffstat (limited to 'core/src/main/scala/spark/api')
-rw-r--r--  core/src/main/scala/spark/api/java/JavaDoubleRDD.scala        |  7
-rw-r--r--  core/src/main/scala/spark/api/java/JavaPairRDD.scala          | 47
-rw-r--r--  core/src/main/scala/spark/api/java/JavaRDD.scala              |  7
-rw-r--r--  core/src/main/scala/spark/api/java/JavaRDDLike.scala          | 14
-rw-r--r--  core/src/main/scala/spark/api/java/JavaSparkContext.scala     | 22
-rw-r--r--  core/src/main/scala/spark/api/python/PythonPartitioner.scala  |  2
-rw-r--r--  core/src/main/scala/spark/api/python/PythonRDD.scala          | 60
7 files changed, 95 insertions, 64 deletions
diff --git a/core/src/main/scala/spark/api/java/JavaDoubleRDD.scala b/core/src/main/scala/spark/api/java/JavaDoubleRDD.scala
index 843e1bd18b..da3cb2cd31 100644
--- a/core/src/main/scala/spark/api/java/JavaDoubleRDD.scala
+++ b/core/src/main/scala/spark/api/java/JavaDoubleRDD.scala
@@ -44,7 +44,7 @@ class JavaDoubleRDD(val srdd: RDD[scala.Double]) extends JavaRDDLike[Double, Jav
/**
* Return a new RDD containing the distinct elements in this RDD.
*/
- def distinct(numSplits: Int): JavaDoubleRDD = fromRDD(srdd.distinct(numSplits))
+ def distinct(numPartitions: Int): JavaDoubleRDD = fromRDD(srdd.distinct(numPartitions))
/**
* Return a new RDD containing only the elements that satisfy a predicate.
@@ -53,6 +53,11 @@ class JavaDoubleRDD(val srdd: RDD[scala.Double]) extends JavaRDDLike[Double, Jav
fromRDD(srdd.filter(x => f(x).booleanValue()))
/**
+ * Return a new RDD that is reduced into `numPartitions` partitions.
+ */
+ def coalesce(numPartitions: Int): JavaDoubleRDD = fromRDD(srdd.coalesce(numPartitions))
+
+ /**
* Return a sampled subset of this RDD.
*/
def sample(withReplacement: Boolean, fraction: Double, seed: Int): JavaDoubleRDD =
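
Editor's note: the new `coalesce` overload simply delegates to the Scala `RDD.coalesce` and wraps the result back into a `JavaDoubleRDD`. A minimal usage sketch (names, sizes, and the local master URL are illustrative, not part of this patch):

    import spark.SparkContext
    import spark.api.java.JavaDoubleRDD

    val sc = new SparkContext("local[4]", "coalesce-example")
    // Build a JavaDoubleRDD over 100 partitions, then shrink it to 4.
    val doubles = new JavaDoubleRDD(sc.parallelize(1 to 1000, 100).map(_.toDouble))
    val compacted = doubles.coalesce(4)
    println(compacted.splits.size())   // expected: 4
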
diff --git a/core/src/main/scala/spark/api/java/JavaPairRDD.scala b/core/src/main/scala/spark/api/java/JavaPairRDD.scala
index 8ce32e0e2f..df3af3817d 100644
--- a/core/src/main/scala/spark/api/java/JavaPairRDD.scala
+++ b/core/src/main/scala/spark/api/java/JavaPairRDD.scala
@@ -54,7 +54,7 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])(implicit val kManifest: ClassManif
/**
* Return a new RDD containing the distinct elements in this RDD.
*/
- def distinct(numSplits: Int): JavaPairRDD[K, V] = new JavaPairRDD[K, V](rdd.distinct(numSplits))
+ def distinct(numPartitions: Int): JavaPairRDD[K, V] = new JavaPairRDD[K, V](rdd.distinct(numPartitions))
/**
* Return a new RDD containing only the elements that satisfy a predicate.
@@ -63,6 +63,11 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])(implicit val kManifest: ClassManif
new JavaPairRDD[K, V](rdd.filter(x => f(x).booleanValue()))
/**
+ * Return a new RDD that is reduced into `numPartitions` partitions.
+ */
+ def coalesce(numPartitions: Int): JavaPairRDD[K, V] = new JavaPairRDD[K, V](rdd.coalesce(numPartitions))
+
+ /**
* Return a sampled subset of this RDD.
*/
def sample(withReplacement: Boolean, fraction: Double, seed: Int): JavaPairRDD[K, V] =
@@ -117,8 +122,8 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])(implicit val kManifest: ClassManif
def combineByKey[C](createCombiner: JFunction[V, C],
mergeValue: JFunction2[C, V, C],
mergeCombiners: JFunction2[C, C, C],
- numSplits: Int): JavaPairRDD[K, C] =
- combineByKey(createCombiner, mergeValue, mergeCombiners, new HashPartitioner(numSplits))
+ numPartitions: Int): JavaPairRDD[K, C] =
+ combineByKey(createCombiner, mergeValue, mergeCombiners, new HashPartitioner(numPartitions))
/**
* Merge the values for each key using an associative reduce function. This will also perform
@@ -157,10 +162,10 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])(implicit val kManifest: ClassManif
/**
* Merge the values for each key using an associative reduce function. This will also perform
* the merging locally on each mapper before sending results to a reducer, similarly to a
- * "combiner" in MapReduce. Output will be hash-partitioned with numSplits splits.
+ * "combiner" in MapReduce. Output will be hash-partitioned with numPartitions partitions.
*/
- def reduceByKey(func: JFunction2[V, V, V], numSplits: Int): JavaPairRDD[K, V] =
- fromRDD(rdd.reduceByKey(func, numSplits))
+ def reduceByKey(func: JFunction2[V, V, V], numPartitions: Int): JavaPairRDD[K, V] =
+ fromRDD(rdd.reduceByKey(func, numPartitions))
/**
* Group the values for each key in the RDD into a single sequence. Allows controlling the
@@ -171,10 +176,10 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])(implicit val kManifest: ClassManif
/**
* Group the values for each key in the RDD into a single sequence. Hash-partitions the
- * resulting RDD with into `numSplits` partitions.
+ * resulting RDD into `numPartitions` partitions.
*/
- def groupByKey(numSplits: Int): JavaPairRDD[K, JList[V]] =
- fromRDD(groupByResultToJava(rdd.groupByKey(numSplits)))
+ def groupByKey(numPartitions: Int): JavaPairRDD[K, JList[V]] =
+ fromRDD(groupByResultToJava(rdd.groupByKey(numPartitions)))
/**
* Return a copy of the RDD partitioned using the specified partitioner. If `mapSideCombine`
@@ -256,8 +261,8 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])(implicit val kManifest: ClassManif
* pair of elements will be returned as a (k, (v1, v2)) tuple, where (k, v1) is in `this` and
* (k, v2) is in `other`. Performs a hash join across the cluster.
*/
- def join[W](other: JavaPairRDD[K, W], numSplits: Int): JavaPairRDD[K, (V, W)] =
- fromRDD(rdd.join(other, numSplits))
+ def join[W](other: JavaPairRDD[K, W], numPartitions: Int): JavaPairRDD[K, (V, W)] =
+ fromRDD(rdd.join(other, numPartitions))
/**
* Perform a left outer join of `this` and `other`. For each element (k, v) in `this`, the
@@ -272,10 +277,10 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])(implicit val kManifest: ClassManif
* Perform a left outer join of `this` and `other`. For each element (k, v) in `this`, the
* resulting RDD will either contain all pairs (k, (v, Some(w))) for w in `other`, or the
* pair (k, (v, None)) if no elements in `other` have key k. Hash-partitions the output
- * into `numSplits` partitions.
+ * into `numPartitions` partitions.
*/
- def leftOuterJoin[W](other: JavaPairRDD[K, W], numSplits: Int): JavaPairRDD[K, (V, Option[W])] =
- fromRDD(rdd.leftOuterJoin(other, numSplits))
+ def leftOuterJoin[W](other: JavaPairRDD[K, W], numPartitions: Int): JavaPairRDD[K, (V, Option[W])] =
+ fromRDD(rdd.leftOuterJoin(other, numPartitions))
/**
* Perform a right outer join of `this` and `other`. For each element (k, w) in `other`, the
@@ -292,8 +297,8 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])(implicit val kManifest: ClassManif
* pair (k, (None, w)) if no elements in `this` have key k. Hash-partitions the resulting
* RDD into the given number of partitions.
*/
- def rightOuterJoin[W](other: JavaPairRDD[K, W], numSplits: Int): JavaPairRDD[K, (Option[V], W)] =
- fromRDD(rdd.rightOuterJoin(other, numSplits))
+ def rightOuterJoin[W](other: JavaPairRDD[K, W], numPartitions: Int): JavaPairRDD[K, (Option[V], W)] =
+ fromRDD(rdd.rightOuterJoin(other, numPartitions))
/**
* Return the key-value pairs in this RDD to the master as a Map.
@@ -357,16 +362,16 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])(implicit val kManifest: ClassManif
* For each key k in `this` or `other`, return a resulting RDD that contains a tuple with the
* list of values for that key in `this` as well as `other`.
*/
- def cogroup[W](other: JavaPairRDD[K, W], numSplits: Int): JavaPairRDD[K, (JList[V], JList[W])]
- = fromRDD(cogroupResultToJava(rdd.cogroup(other, numSplits)))
+ def cogroup[W](other: JavaPairRDD[K, W], numPartitions: Int): JavaPairRDD[K, (JList[V], JList[W])]
+ = fromRDD(cogroupResultToJava(rdd.cogroup(other, numPartitions)))
/**
* For each key k in `this` or `other1` or `other2`, return a resulting RDD that contains a
* tuple with the list of values for that key in `this`, `other1` and `other2`.
*/
- def cogroup[W1, W2](other1: JavaPairRDD[K, W1], other2: JavaPairRDD[K, W2], numSplits: Int)
+ def cogroup[W1, W2](other1: JavaPairRDD[K, W1], other2: JavaPairRDD[K, W2], numPartitions: Int)
: JavaPairRDD[K, (JList[V], JList[W1], JList[W2])] =
- fromRDD(cogroupResult2ToJava(rdd.cogroup(other1, other2, numSplits)))
+ fromRDD(cogroupResult2ToJava(rdd.cogroup(other1, other2, numPartitions)))
/** Alias for cogroup. */
def groupWith[W](other: JavaPairRDD[K, W]): JavaPairRDD[K, (JList[V], JList[W])] =
@@ -447,7 +452,7 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])(implicit val kManifest: ClassManif
*/
def sortByKey(ascending: Boolean): JavaPairRDD[K, V] = {
val comp = com.google.common.collect.Ordering.natural().asInstanceOf[Comparator[K]]
- sortByKey(comp, true)
+ sortByKey(comp, ascending)
}
/**
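
Editor's note: two things stand out in this file. Every `numSplits` parameter is renamed to `numPartitions` with no behavioural change, and `sortByKey(ascending)` previously hard-coded `true`, so the flag was silently ignored; descending sorts now work through the Java API. A short sketch under assumed names (`sc` is an existing spark.SparkContext):

    import spark.api.java.JavaPairRDD

    val pairs = new JavaPairRDD(sc.parallelize(Seq(("a", 1), ("b", 2), ("a", 3))))
    val grouped  = pairs.groupByKey(8)      // hash-partitioned into 8 partitions (the parameter was `numSplits`)
    val reversed = pairs.sortByKey(false)   // descending order; the flag used to be ignored
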
diff --git a/core/src/main/scala/spark/api/java/JavaRDD.scala b/core/src/main/scala/spark/api/java/JavaRDD.scala
index ac31350ec3..3ccd6f055e 100644
--- a/core/src/main/scala/spark/api/java/JavaRDD.scala
+++ b/core/src/main/scala/spark/api/java/JavaRDD.scala
@@ -30,7 +30,7 @@ JavaRDDLike[T, JavaRDD[T]] {
/**
* Return a new RDD containing the distinct elements in this RDD.
*/
- def distinct(numSplits: Int): JavaRDD[T] = wrapRDD(rdd.distinct(numSplits))
+ def distinct(numPartitions: Int): JavaRDD[T] = wrapRDD(rdd.distinct(numPartitions))
/**
* Return a new RDD containing only the elements that satisfy a predicate.
@@ -39,6 +39,11 @@ JavaRDDLike[T, JavaRDD[T]] {
wrapRDD(rdd.filter((x => f(x).booleanValue())))
/**
+ * Return a new RDD that is reduced into `numPartitions` partitions.
+ */
+ def coalesce(numPartitions: Int): JavaRDD[T] = rdd.coalesce(numPartitions)
+
+ /**
* Return a sampled subset of this RDD.
*/
def sample(withReplacement: Boolean, fraction: Double, seed: Int): JavaRDD[T] =
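
Editor's note: unlike the `JavaDoubleRDD` and `JavaPairRDD` variants, this overload returns `rdd.coalesce(numPartitions)` without an explicit `wrapRDD` call, presumably relying on the implicit RDD-to-JavaRDD conversion in `JavaRDD`'s companion object. From the caller's side usage is the same; a sketch with placeholder names and path (`jsc` is an existing JavaSparkContext):

    val lines = jsc.textFile("hdfs:///data/input.txt", 200)   // JavaRDD[String]
    val fewer = lines.coalesce(10)                            // reduced to 10 partitions
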
diff --git a/core/src/main/scala/spark/api/java/JavaRDDLike.scala b/core/src/main/scala/spark/api/java/JavaRDDLike.scala
index 60025b459c..90b45cf875 100644
--- a/core/src/main/scala/spark/api/java/JavaRDDLike.scala
+++ b/core/src/main/scala/spark/api/java/JavaRDDLike.scala
@@ -4,7 +4,7 @@ import java.util.{List => JList}
import scala.Tuple2
import scala.collection.JavaConversions._
-import spark.{SparkContext, Split, RDD, TaskContext}
+import spark.{SparkContext, Partition, RDD, TaskContext}
import spark.api.java.JavaPairRDD._
import spark.api.java.function.{Function2 => JFunction2, Function => JFunction, _}
import spark.partial.{PartialResult, BoundedDouble}
@@ -20,7 +20,7 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends PairFlatMapWorkaround
def rdd: RDD[T]
/** Set of partitions in this RDD. */
- def splits: JList[Split] = new java.util.ArrayList(rdd.splits.toSeq)
+ def splits: JList[Partition] = new java.util.ArrayList(rdd.partitions.toSeq)
/** The [[spark.SparkContext]] that this RDD was created on. */
def context: SparkContext = rdd.context
@@ -36,7 +36,7 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends PairFlatMapWorkaround
* This should ''not'' be called by users directly, but is available for implementors of custom
* subclasses of RDD.
*/
- def iterator(split: Split, taskContext: TaskContext): java.util.Iterator[T] =
+ def iterator(split: Partition, taskContext: TaskContext): java.util.Iterator[T] =
asJavaIterator(rdd.iterator(split, taskContext))
// Transformations (return a new RDD)
@@ -146,12 +146,12 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends PairFlatMapWorkaround
* Return an RDD of grouped elements. Each group consists of a key and a sequence of elements
* mapping to that key.
*/
- def groupBy[K](f: JFunction[T, K], numSplits: Int): JavaPairRDD[K, JList[T]] = {
+ def groupBy[K](f: JFunction[T, K], numPartitions: Int): JavaPairRDD[K, JList[T]] = {
implicit val kcm: ClassManifest[K] =
implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[K]]
implicit val vcm: ClassManifest[JList[T]] =
implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[JList[T]]]
- JavaPairRDD.fromRDD(groupByResultToJava(rdd.groupBy(f, numSplits)(f.returnType)))(kcm, vcm)
+ JavaPairRDD.fromRDD(groupByResultToJava(rdd.groupBy(f, numPartitions)(f.returnType)))(kcm, vcm)
}
/**
@@ -201,7 +201,7 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends PairFlatMapWorkaround
}
/**
- * Reduces the elements of this RDD using the specified associative binary operator.
+ * Reduces the elements of this RDD using the specified commutative and associative binary operator.
*/
def reduce(f: JFunction2[T, T, T]): T = rdd.reduce(f)
@@ -333,6 +333,6 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends PairFlatMapWorkaround
/** A description of this RDD and its recursive dependencies for debugging. */
def toDebugString(): String = {
- rdd.toDebugString()
+ rdd.toDebugString
}
}
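
Editor's note: the Java wrappers follow the core rename of `Split` to `Partition` (and of `rdd.splits` to `rdd.partitions`); the `splits` accessor keeps its old name but now hands back `Partition` objects. A small sketch of inspecting them, assuming an existing `JavaRDD[String]` named `javaRdd`:

    import spark.Partition

    val parts: java.util.List[Partition] = javaRdd.splits
    for (i <- 0 until parts.size()) {
      // Partition exposes its index within the parent RDD.
      println("partition " + i + " has index " + parts.get(i).index)
    }
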
diff --git a/core/src/main/scala/spark/api/java/JavaSparkContext.scala b/core/src/main/scala/spark/api/java/JavaSparkContext.scala
index 50b8970cd8..f75fc27c7b 100644
--- a/core/src/main/scala/spark/api/java/JavaSparkContext.scala
+++ b/core/src/main/scala/spark/api/java/JavaSparkContext.scala
@@ -23,41 +23,41 @@ class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWork
/**
* @param master Cluster URL to connect to (e.g. mesos://host:port, spark://host:port, local[4]).
- * @param jobName A name for your job, to display on the cluster web UI
+ * @param appName A name for your application, to display on the cluster web UI
*/
- def this(master: String, jobName: String) = this(new SparkContext(master, jobName))
+ def this(master: String, appName: String) = this(new SparkContext(master, appName))
/**
* @param master Cluster URL to connect to (e.g. mesos://host:port, spark://host:port, local[4]).
- * @param jobName A name for your job, to display on the cluster web UI
+ * @param appName A name for your application, to display on the cluster web UI
* @param sparkHome The SPARK_HOME directory on the slave nodes
* @param jars Collection of JARs to send to the cluster. These can be paths on the local file
* system or HDFS, HTTP, HTTPS, or FTP URLs.
*/
- def this(master: String, jobName: String, sparkHome: String, jarFile: String) =
- this(new SparkContext(master, jobName, sparkHome, Seq(jarFile)))
+ def this(master: String, appName: String, sparkHome: String, jarFile: String) =
+ this(new SparkContext(master, appName, sparkHome, Seq(jarFile)))
/**
* @param master Cluster URL to connect to (e.g. mesos://host:port, spark://host:port, local[4]).
- * @param jobName A name for your job, to display on the cluster web UI
+ * @param appName A name for your application, to display on the cluster web UI
* @param sparkHome The SPARK_HOME directory on the slave nodes
* @param jars Collection of JARs to send to the cluster. These can be paths on the local file
* system or HDFS, HTTP, HTTPS, or FTP URLs.
*/
- def this(master: String, jobName: String, sparkHome: String, jars: Array[String]) =
- this(new SparkContext(master, jobName, sparkHome, jars.toSeq))
+ def this(master: String, appName: String, sparkHome: String, jars: Array[String]) =
+ this(new SparkContext(master, appName, sparkHome, jars.toSeq))
/**
* @param master Cluster URL to connect to (e.g. mesos://host:port, spark://host:port, local[4]).
- * @param jobName A name for your job, to display on the cluster web UI
+ * @param appName A name for your application, to display on the cluster web UI
* @param sparkHome The SPARK_HOME directory on the slave nodes
* @param jars Collection of JARs to send to the cluster. These can be paths on the local file
* system or HDFS, HTTP, HTTPS, or FTP URLs.
* @param environment Environment variables to set on worker nodes
*/
- def this(master: String, jobName: String, sparkHome: String, jars: Array[String],
+ def this(master: String, appName: String, sparkHome: String, jars: Array[String],
environment: JMap[String, String]) =
- this(new SparkContext(master, jobName, sparkHome, jars.toSeq, environment))
+ this(new SparkContext(master, appName, sparkHome, jars.toSeq, environment))
private[spark] val env = sc.env
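
Editor's note: the constructor parameters are only renamed (`jobName` becomes `appName`); existing call sites keep working. For reference, a sketch of the four-argument form with placeholder values:

    import spark.api.java.JavaSparkContext

    val jsc = new JavaSparkContext(
      "spark://master:7077",          // master
      "My Application",               // appName, shown on the cluster web UI
      "/opt/spark",                   // sparkHome on the slave nodes
      Array("target/my-app.jar"))     // jars to ship to the cluster
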
diff --git a/core/src/main/scala/spark/api/python/PythonPartitioner.scala b/core/src/main/scala/spark/api/python/PythonPartitioner.scala
index 519e310323..d618c098c2 100644
--- a/core/src/main/scala/spark/api/python/PythonPartitioner.scala
+++ b/core/src/main/scala/spark/api/python/PythonPartitioner.scala
@@ -9,7 +9,7 @@ import java.util.Arrays
*
* Stores the unique id() of the Python-side partitioning function so that it is incorporated into
* equality comparisons. Correctness requires that the id is a unique identifier for the
- * lifetime of the job (i.e. that it is not re-used as the id of a different partitioning
+ * lifetime of the program (i.e. that it is not re-used as the id of a different partitioning
* function). This can be ensured by using the Python id() function and maintaining a reference
* to the Python partitioning function so that its id() is not reused.
*/
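
Editor's note: only the comment changes here (the id() must stay unique for the lifetime of the program, not just one job), but the contract it describes is easy to miss: the partitioner's equality incorporates the Python function's id(), so partitionings are only treated as compatible when both the partition count and the function id match. A simplified, illustrative sketch of that shape (not the actual class body, which this diff does not show):

    class SketchPythonPartitioner(val numPartitions: Int, val pyFunctionId: Long)
      extends spark.Partitioner {

      def getPartition(key: Any): Int = {
        val mod = key.hashCode % numPartitions
        if (mod < 0) mod + numPartitions else mod   // keep the result non-negative
      }

      override def equals(other: Any): Boolean = other match {
        case p: SketchPythonPartitioner =>
          p.numPartitions == numPartitions && p.pyFunctionId == pyFunctionId
        case _ => false
      }

      override def hashCode: Int = 31 * numPartitions + pyFunctionId.hashCode
    }
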
diff --git a/core/src/main/scala/spark/api/python/PythonRDD.scala b/core/src/main/scala/spark/api/python/PythonRDD.scala
index f43a152ca7..8c73477384 100644
--- a/core/src/main/scala/spark/api/python/PythonRDD.scala
+++ b/core/src/main/scala/spark/api/python/PythonRDD.scala
@@ -32,11 +32,11 @@ private[spark] class PythonRDD[T: ClassManifest](
this(parent, PipedRDD.tokenize(command), envVars, preservePartitoning, pythonExec,
broadcastVars, accumulator)
- override def getSplits = parent.splits
+ override def getPartitions = parent.partitions
override val partitioner = if (preservePartitoning) parent.partitioner else None
- override def compute(split: Split, context: TaskContext): Iterator[Array[Byte]] = {
+ override def compute(split: Partition, context: TaskContext): Iterator[Array[Byte]] = {
val SPARK_HOME = new ProcessBuilder().environment().get("SPARK_HOME")
val pb = new ProcessBuilder(Seq(pythonExec, SPARK_HOME + "/python/pyspark/worker.py"))
@@ -65,7 +65,7 @@ private[spark] class PythonRDD[T: ClassManifest](
SparkEnv.set(env)
val out = new PrintWriter(proc.getOutputStream)
val dOut = new DataOutputStream(proc.getOutputStream)
- // Split index
+ // Partition index
dOut.writeInt(split.index)
// sparkFilesDir
PythonRDD.writeAsPickle(SparkFiles.getRootDirectory, dOut)
@@ -103,21 +103,27 @@ private[spark] class PythonRDD[T: ClassManifest](
private def read(): Array[Byte] = {
try {
- val length = stream.readInt()
- if (length != -1) {
- val obj = new Array[Byte](length)
- stream.readFully(obj)
- obj
- } else {
- // We've finished the data section of the output, but we can still read some
- // accumulator updates; let's do that, breaking when we get EOFException
- while (true) {
- val len2 = stream.readInt()
- val update = new Array[Byte](len2)
- stream.readFully(update)
- accumulator += Collections.singletonList(update)
- }
- new Array[Byte](0)
+ stream.readInt() match {
+ case length if length > 0 =>
+ val obj = new Array[Byte](length)
+ stream.readFully(obj)
+ obj
+ case -2 =>
+ // Signals that an exception has been thrown in python
+ val exLength = stream.readInt()
+ val obj = new Array[Byte](exLength)
+ stream.readFully(obj)
+ throw new PythonException(new String(obj))
+ case -1 =>
+ // We've finished the data section of the output, but we can still read some
+ // accumulator updates; let's do that, breaking when we get EOFException
+ while (true) {
+ val len2 = stream.readInt()
+ val update = new Array[Byte](len2)
+ stream.readFully(update)
+ accumulator += Collections.singletonList(update)
+ }
+ new Array[Byte](0)
}
} catch {
case eof: EOFException => {
@@ -140,14 +146,17 @@ private[spark] class PythonRDD[T: ClassManifest](
val asJavaRDD : JavaRDD[Array[Byte]] = JavaRDD.fromRDD(this)
}
+/** Thrown for exceptions in user Python code. */
+private class PythonException(msg: String) extends Exception(msg)
+
/**
* Form an RDD[(Array[Byte], Array[Byte])] from key-value pairs returned from Python.
* This is used by PySpark's shuffle operations.
*/
private class PairwiseRDD(prev: RDD[Array[Byte]]) extends
RDD[(Array[Byte], Array[Byte])](prev) {
- override def getSplits = prev.splits
- override def compute(split: Split, context: TaskContext) =
+ override def getPartitions = prev.partitions
+ override def compute(split: Partition, context: TaskContext) =
prev.iterator(split, context).grouped(2).map {
case Seq(a, b) => (a, b)
case x => throw new Exception("PairwiseRDD: unexpected value: " + x)
@@ -229,6 +238,11 @@ private[spark] object PythonRDD {
}
def writeIteratorToPickleFile[T](items: java.util.Iterator[T], filename: String) {
+ import scala.collection.JavaConverters._
+ writeIteratorToPickleFile(items.asScala, filename)
+ }
+
+ def writeIteratorToPickleFile[T](items: Iterator[T], filename: String) {
val file = new DataOutputStream(new FileOutputStream(filename))
for (item <- items) {
writeAsPickle(item, file)
@@ -236,8 +250,10 @@ private[spark] object PythonRDD {
file.close()
}
- def takePartition[T](rdd: RDD[T], partition: Int): java.util.Iterator[T] =
- rdd.context.runJob(rdd, ((x: Iterator[T]) => x), Seq(partition), true).head
+ def takePartition[T](rdd: RDD[T], partition: Int): Iterator[T] = {
+ implicit val cm : ClassManifest[T] = rdd.elementClassManifest
+ rdd.context.runJob(rdd, ((x: Iterator[T]) => x.toArray), Seq(partition), true).head.iterator
+ }
}
private object Pickle {
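
Editor's note: the reworked `read()` turns the length prefix into a small framing protocol: a positive value is the byte length of the next pickled record, -1 marks the end of the data section (accumulator updates follow until EOF), and -2 signals that the Python worker raised an exception, whose message follows as a length-prefixed byte string and is rethrown here as `PythonException`. For clarity, a writer-side sketch of that framing (illustrative only; the real producer is the Python worker script, which is not part of this diff):

    import java.io.DataOutputStream

    object FramingSketch {
      def writeRecord(out: DataOutputStream, payload: Array[Byte]) {
        out.writeInt(payload.length)   // positive length, then the pickled bytes
        out.write(payload)
      }

      def writeException(out: DataOutputStream, message: String) {
        out.writeInt(-2)               // -2: an exception follows
        val bytes = message.getBytes
        out.writeInt(bytes.length)
        out.write(bytes)
      }

      def writeEndOfData(out: DataOutputStream) {
        out.writeInt(-1)               // -1: data section finished; accumulator updates follow
      }
    }
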