diff options
Diffstat (limited to 'core/src/main/scala/spark/api')
7 files changed, 95 insertions, 64 deletions
diff --git a/core/src/main/scala/spark/api/java/JavaDoubleRDD.scala b/core/src/main/scala/spark/api/java/JavaDoubleRDD.scala index 843e1bd18b..da3cb2cd31 100644 --- a/core/src/main/scala/spark/api/java/JavaDoubleRDD.scala +++ b/core/src/main/scala/spark/api/java/JavaDoubleRDD.scala @@ -44,7 +44,7 @@ class JavaDoubleRDD(val srdd: RDD[scala.Double]) extends JavaRDDLike[Double, Jav /** * Return a new RDD containing the distinct elements in this RDD. */ - def distinct(numSplits: Int): JavaDoubleRDD = fromRDD(srdd.distinct(numSplits)) + def distinct(numPartitions: Int): JavaDoubleRDD = fromRDD(srdd.distinct(numPartitions)) /** * Return a new RDD containing only the elements that satisfy a predicate. @@ -53,6 +53,11 @@ class JavaDoubleRDD(val srdd: RDD[scala.Double]) extends JavaRDDLike[Double, Jav fromRDD(srdd.filter(x => f(x).booleanValue())) /** + * Return a new RDD that is reduced into `numPartitions` partitions. + */ + def coalesce(numPartitions: Int): JavaDoubleRDD = fromRDD(srdd.coalesce(numPartitions)) + + /** * Return a sampled subset of this RDD. */ def sample(withReplacement: Boolean, fraction: Double, seed: Int): JavaDoubleRDD = diff --git a/core/src/main/scala/spark/api/java/JavaPairRDD.scala b/core/src/main/scala/spark/api/java/JavaPairRDD.scala index 8ce32e0e2f..df3af3817d 100644 --- a/core/src/main/scala/spark/api/java/JavaPairRDD.scala +++ b/core/src/main/scala/spark/api/java/JavaPairRDD.scala @@ -54,7 +54,7 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])(implicit val kManifest: ClassManif /** * Return a new RDD containing the distinct elements in this RDD. */ - def distinct(numSplits: Int): JavaPairRDD[K, V] = new JavaPairRDD[K, V](rdd.distinct(numSplits)) + def distinct(numPartitions: Int): JavaPairRDD[K, V] = new JavaPairRDD[K, V](rdd.distinct(numPartitions)) /** * Return a new RDD containing only the elements that satisfy a predicate. @@ -63,6 +63,11 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])(implicit val kManifest: ClassManif new JavaPairRDD[K, V](rdd.filter(x => f(x).booleanValue())) /** + * Return a new RDD that is reduced into `numPartitions` partitions. + */ + def coalesce(numPartitions: Int): JavaPairRDD[K, V] = new JavaPairRDD[K, V](rdd.coalesce(numPartitions)) + + /** * Return a sampled subset of this RDD. */ def sample(withReplacement: Boolean, fraction: Double, seed: Int): JavaPairRDD[K, V] = @@ -117,8 +122,8 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])(implicit val kManifest: ClassManif def combineByKey[C](createCombiner: JFunction[V, C], mergeValue: JFunction2[C, V, C], mergeCombiners: JFunction2[C, C, C], - numSplits: Int): JavaPairRDD[K, C] = - combineByKey(createCombiner, mergeValue, mergeCombiners, new HashPartitioner(numSplits)) + numPartitions: Int): JavaPairRDD[K, C] = + combineByKey(createCombiner, mergeValue, mergeCombiners, new HashPartitioner(numPartitions)) /** * Merge the values for each key using an associative reduce function. This will also perform @@ -157,10 +162,10 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])(implicit val kManifest: ClassManif /** * Merge the values for each key using an associative reduce function. This will also perform * the merging locally on each mapper before sending results to a reducer, similarly to a - * "combiner" in MapReduce. Output will be hash-partitioned with numSplits splits. + * "combiner" in MapReduce. Output will be hash-partitioned with numPartitions partitions. */ - def reduceByKey(func: JFunction2[V, V, V], numSplits: Int): JavaPairRDD[K, V] = - fromRDD(rdd.reduceByKey(func, numSplits)) + def reduceByKey(func: JFunction2[V, V, V], numPartitions: Int): JavaPairRDD[K, V] = + fromRDD(rdd.reduceByKey(func, numPartitions)) /** * Group the values for each key in the RDD into a single sequence. Allows controlling the @@ -171,10 +176,10 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])(implicit val kManifest: ClassManif /** * Group the values for each key in the RDD into a single sequence. Hash-partitions the - * resulting RDD with into `numSplits` partitions. + * resulting RDD with into `numPartitions` partitions. */ - def groupByKey(numSplits: Int): JavaPairRDD[K, JList[V]] = - fromRDD(groupByResultToJava(rdd.groupByKey(numSplits))) + def groupByKey(numPartitions: Int): JavaPairRDD[K, JList[V]] = + fromRDD(groupByResultToJava(rdd.groupByKey(numPartitions))) /** * Return a copy of the RDD partitioned using the specified partitioner. If `mapSideCombine` @@ -256,8 +261,8 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])(implicit val kManifest: ClassManif * pair of elements will be returned as a (k, (v1, v2)) tuple, where (k, v1) is in `this` and * (k, v2) is in `other`. Performs a hash join across the cluster. */ - def join[W](other: JavaPairRDD[K, W], numSplits: Int): JavaPairRDD[K, (V, W)] = - fromRDD(rdd.join(other, numSplits)) + def join[W](other: JavaPairRDD[K, W], numPartitions: Int): JavaPairRDD[K, (V, W)] = + fromRDD(rdd.join(other, numPartitions)) /** * Perform a left outer join of `this` and `other`. For each element (k, v) in `this`, the @@ -272,10 +277,10 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])(implicit val kManifest: ClassManif * Perform a left outer join of `this` and `other`. For each element (k, v) in `this`, the * resulting RDD will either contain all pairs (k, (v, Some(w))) for w in `other`, or the * pair (k, (v, None)) if no elements in `other` have key k. Hash-partitions the output - * into `numSplits` partitions. + * into `numPartitions` partitions. */ - def leftOuterJoin[W](other: JavaPairRDD[K, W], numSplits: Int): JavaPairRDD[K, (V, Option[W])] = - fromRDD(rdd.leftOuterJoin(other, numSplits)) + def leftOuterJoin[W](other: JavaPairRDD[K, W], numPartitions: Int): JavaPairRDD[K, (V, Option[W])] = + fromRDD(rdd.leftOuterJoin(other, numPartitions)) /** * Perform a right outer join of `this` and `other`. For each element (k, w) in `other`, the @@ -292,8 +297,8 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])(implicit val kManifest: ClassManif * pair (k, (None, w)) if no elements in `this` have key k. Hash-partitions the resulting * RDD into the given number of partitions. */ - def rightOuterJoin[W](other: JavaPairRDD[K, W], numSplits: Int): JavaPairRDD[K, (Option[V], W)] = - fromRDD(rdd.rightOuterJoin(other, numSplits)) + def rightOuterJoin[W](other: JavaPairRDD[K, W], numPartitions: Int): JavaPairRDD[K, (Option[V], W)] = + fromRDD(rdd.rightOuterJoin(other, numPartitions)) /** * Return the key-value pairs in this RDD to the master as a Map. @@ -357,16 +362,16 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])(implicit val kManifest: ClassManif * For each key k in `this` or `other`, return a resulting RDD that contains a tuple with the * list of values for that key in `this` as well as `other`. */ - def cogroup[W](other: JavaPairRDD[K, W], numSplits: Int): JavaPairRDD[K, (JList[V], JList[W])] - = fromRDD(cogroupResultToJava(rdd.cogroup(other, numSplits))) + def cogroup[W](other: JavaPairRDD[K, W], numPartitions: Int): JavaPairRDD[K, (JList[V], JList[W])] + = fromRDD(cogroupResultToJava(rdd.cogroup(other, numPartitions))) /** * For each key k in `this` or `other1` or `other2`, return a resulting RDD that contains a * tuple with the list of values for that key in `this`, `other1` and `other2`. */ - def cogroup[W1, W2](other1: JavaPairRDD[K, W1], other2: JavaPairRDD[K, W2], numSplits: Int) + def cogroup[W1, W2](other1: JavaPairRDD[K, W1], other2: JavaPairRDD[K, W2], numPartitions: Int) : JavaPairRDD[K, (JList[V], JList[W1], JList[W2])] = - fromRDD(cogroupResult2ToJava(rdd.cogroup(other1, other2, numSplits))) + fromRDD(cogroupResult2ToJava(rdd.cogroup(other1, other2, numPartitions))) /** Alias for cogroup. */ def groupWith[W](other: JavaPairRDD[K, W]): JavaPairRDD[K, (JList[V], JList[W])] = @@ -447,7 +452,7 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])(implicit val kManifest: ClassManif */ def sortByKey(ascending: Boolean): JavaPairRDD[K, V] = { val comp = com.google.common.collect.Ordering.natural().asInstanceOf[Comparator[K]] - sortByKey(comp, true) + sortByKey(comp, ascending) } /** diff --git a/core/src/main/scala/spark/api/java/JavaRDD.scala b/core/src/main/scala/spark/api/java/JavaRDD.scala index ac31350ec3..3ccd6f055e 100644 --- a/core/src/main/scala/spark/api/java/JavaRDD.scala +++ b/core/src/main/scala/spark/api/java/JavaRDD.scala @@ -30,7 +30,7 @@ JavaRDDLike[T, JavaRDD[T]] { /** * Return a new RDD containing the distinct elements in this RDD. */ - def distinct(numSplits: Int): JavaRDD[T] = wrapRDD(rdd.distinct(numSplits)) + def distinct(numPartitions: Int): JavaRDD[T] = wrapRDD(rdd.distinct(numPartitions)) /** * Return a new RDD containing only the elements that satisfy a predicate. @@ -39,6 +39,11 @@ JavaRDDLike[T, JavaRDD[T]] { wrapRDD(rdd.filter((x => f(x).booleanValue()))) /** + * Return a new RDD that is reduced into `numPartitions` partitions. + */ + def coalesce(numPartitions: Int): JavaRDD[T] = rdd.coalesce(numPartitions) + + /** * Return a sampled subset of this RDD. */ def sample(withReplacement: Boolean, fraction: Double, seed: Int): JavaRDD[T] = diff --git a/core/src/main/scala/spark/api/java/JavaRDDLike.scala b/core/src/main/scala/spark/api/java/JavaRDDLike.scala index 60025b459c..90b45cf875 100644 --- a/core/src/main/scala/spark/api/java/JavaRDDLike.scala +++ b/core/src/main/scala/spark/api/java/JavaRDDLike.scala @@ -4,7 +4,7 @@ import java.util.{List => JList} import scala.Tuple2 import scala.collection.JavaConversions._ -import spark.{SparkContext, Split, RDD, TaskContext} +import spark.{SparkContext, Partition, RDD, TaskContext} import spark.api.java.JavaPairRDD._ import spark.api.java.function.{Function2 => JFunction2, Function => JFunction, _} import spark.partial.{PartialResult, BoundedDouble} @@ -20,7 +20,7 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends PairFlatMapWorkaround def rdd: RDD[T] /** Set of partitions in this RDD. */ - def splits: JList[Split] = new java.util.ArrayList(rdd.splits.toSeq) + def splits: JList[Partition] = new java.util.ArrayList(rdd.partitions.toSeq) /** The [[spark.SparkContext]] that this RDD was created on. */ def context: SparkContext = rdd.context @@ -36,7 +36,7 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends PairFlatMapWorkaround * This should ''not'' be called by users directly, but is available for implementors of custom * subclasses of RDD. */ - def iterator(split: Split, taskContext: TaskContext): java.util.Iterator[T] = + def iterator(split: Partition, taskContext: TaskContext): java.util.Iterator[T] = asJavaIterator(rdd.iterator(split, taskContext)) // Transformations (return a new RDD) @@ -146,12 +146,12 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends PairFlatMapWorkaround * Return an RDD of grouped elements. Each group consists of a key and a sequence of elements * mapping to that key. */ - def groupBy[K](f: JFunction[T, K], numSplits: Int): JavaPairRDD[K, JList[T]] = { + def groupBy[K](f: JFunction[T, K], numPartitions: Int): JavaPairRDD[K, JList[T]] = { implicit val kcm: ClassManifest[K] = implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[K]] implicit val vcm: ClassManifest[JList[T]] = implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[JList[T]]] - JavaPairRDD.fromRDD(groupByResultToJava(rdd.groupBy(f, numSplits)(f.returnType)))(kcm, vcm) + JavaPairRDD.fromRDD(groupByResultToJava(rdd.groupBy(f, numPartitions)(f.returnType)))(kcm, vcm) } /** @@ -201,7 +201,7 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends PairFlatMapWorkaround } /** - * Reduces the elements of this RDD using the specified associative binary operator. + * Reduces the elements of this RDD using the specified commutative and associative binary operator. */ def reduce(f: JFunction2[T, T, T]): T = rdd.reduce(f) @@ -333,6 +333,6 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends PairFlatMapWorkaround /** A description of this RDD and its recursive dependencies for debugging. */ def toDebugString(): String = { - rdd.toDebugString() + rdd.toDebugString } } diff --git a/core/src/main/scala/spark/api/java/JavaSparkContext.scala b/core/src/main/scala/spark/api/java/JavaSparkContext.scala index 50b8970cd8..f75fc27c7b 100644 --- a/core/src/main/scala/spark/api/java/JavaSparkContext.scala +++ b/core/src/main/scala/spark/api/java/JavaSparkContext.scala @@ -23,41 +23,41 @@ class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWork /** * @param master Cluster URL to connect to (e.g. mesos://host:port, spark://host:port, local[4]). - * @param jobName A name for your job, to display on the cluster web UI + * @param appName A name for your application, to display on the cluster web UI */ - def this(master: String, jobName: String) = this(new SparkContext(master, jobName)) + def this(master: String, appName: String) = this(new SparkContext(master, appName)) /** * @param master Cluster URL to connect to (e.g. mesos://host:port, spark://host:port, local[4]). - * @param jobName A name for your job, to display on the cluster web UI + * @param appName A name for your application, to display on the cluster web UI * @param sparkHome The SPARK_HOME directory on the slave nodes * @param jars Collection of JARs to send to the cluster. These can be paths on the local file * system or HDFS, HTTP, HTTPS, or FTP URLs. */ - def this(master: String, jobName: String, sparkHome: String, jarFile: String) = - this(new SparkContext(master, jobName, sparkHome, Seq(jarFile))) + def this(master: String, appName: String, sparkHome: String, jarFile: String) = + this(new SparkContext(master, appName, sparkHome, Seq(jarFile))) /** * @param master Cluster URL to connect to (e.g. mesos://host:port, spark://host:port, local[4]). - * @param jobName A name for your job, to display on the cluster web UI + * @param appName A name for your application, to display on the cluster web UI * @param sparkHome The SPARK_HOME directory on the slave nodes * @param jars Collection of JARs to send to the cluster. These can be paths on the local file * system or HDFS, HTTP, HTTPS, or FTP URLs. */ - def this(master: String, jobName: String, sparkHome: String, jars: Array[String]) = - this(new SparkContext(master, jobName, sparkHome, jars.toSeq)) + def this(master: String, appName: String, sparkHome: String, jars: Array[String]) = + this(new SparkContext(master, appName, sparkHome, jars.toSeq)) /** * @param master Cluster URL to connect to (e.g. mesos://host:port, spark://host:port, local[4]). - * @param jobName A name for your job, to display on the cluster web UI + * @param appName A name for your application, to display on the cluster web UI * @param sparkHome The SPARK_HOME directory on the slave nodes * @param jars Collection of JARs to send to the cluster. These can be paths on the local file * system or HDFS, HTTP, HTTPS, or FTP URLs. * @param environment Environment variables to set on worker nodes */ - def this(master: String, jobName: String, sparkHome: String, jars: Array[String], + def this(master: String, appName: String, sparkHome: String, jars: Array[String], environment: JMap[String, String]) = - this(new SparkContext(master, jobName, sparkHome, jars.toSeq, environment)) + this(new SparkContext(master, appName, sparkHome, jars.toSeq, environment)) private[spark] val env = sc.env diff --git a/core/src/main/scala/spark/api/python/PythonPartitioner.scala b/core/src/main/scala/spark/api/python/PythonPartitioner.scala index 519e310323..d618c098c2 100644 --- a/core/src/main/scala/spark/api/python/PythonPartitioner.scala +++ b/core/src/main/scala/spark/api/python/PythonPartitioner.scala @@ -9,7 +9,7 @@ import java.util.Arrays * * Stores the unique id() of the Python-side partitioning function so that it is incorporated into * equality comparisons. Correctness requires that the id is a unique identifier for the - * lifetime of the job (i.e. that it is not re-used as the id of a different partitioning + * lifetime of the program (i.e. that it is not re-used as the id of a different partitioning * function). This can be ensured by using the Python id() function and maintaining a reference * to the Python partitioning function so that its id() is not reused. */ diff --git a/core/src/main/scala/spark/api/python/PythonRDD.scala b/core/src/main/scala/spark/api/python/PythonRDD.scala index f43a152ca7..8c73477384 100644 --- a/core/src/main/scala/spark/api/python/PythonRDD.scala +++ b/core/src/main/scala/spark/api/python/PythonRDD.scala @@ -32,11 +32,11 @@ private[spark] class PythonRDD[T: ClassManifest]( this(parent, PipedRDD.tokenize(command), envVars, preservePartitoning, pythonExec, broadcastVars, accumulator) - override def getSplits = parent.splits + override def getPartitions = parent.partitions override val partitioner = if (preservePartitoning) parent.partitioner else None - override def compute(split: Split, context: TaskContext): Iterator[Array[Byte]] = { + override def compute(split: Partition, context: TaskContext): Iterator[Array[Byte]] = { val SPARK_HOME = new ProcessBuilder().environment().get("SPARK_HOME") val pb = new ProcessBuilder(Seq(pythonExec, SPARK_HOME + "/python/pyspark/worker.py")) @@ -65,7 +65,7 @@ private[spark] class PythonRDD[T: ClassManifest]( SparkEnv.set(env) val out = new PrintWriter(proc.getOutputStream) val dOut = new DataOutputStream(proc.getOutputStream) - // Split index + // Partition index dOut.writeInt(split.index) // sparkFilesDir PythonRDD.writeAsPickle(SparkFiles.getRootDirectory, dOut) @@ -103,21 +103,27 @@ private[spark] class PythonRDD[T: ClassManifest]( private def read(): Array[Byte] = { try { - val length = stream.readInt() - if (length != -1) { - val obj = new Array[Byte](length) - stream.readFully(obj) - obj - } else { - // We've finished the data section of the output, but we can still read some - // accumulator updates; let's do that, breaking when we get EOFException - while (true) { - val len2 = stream.readInt() - val update = new Array[Byte](len2) - stream.readFully(update) - accumulator += Collections.singletonList(update) - } - new Array[Byte](0) + stream.readInt() match { + case length if length > 0 => + val obj = new Array[Byte](length) + stream.readFully(obj) + obj + case -2 => + // Signals that an exception has been thrown in python + val exLength = stream.readInt() + val obj = new Array[Byte](exLength) + stream.readFully(obj) + throw new PythonException(new String(obj)) + case -1 => + // We've finished the data section of the output, but we can still read some + // accumulator updates; let's do that, breaking when we get EOFException + while (true) { + val len2 = stream.readInt() + val update = new Array[Byte](len2) + stream.readFully(update) + accumulator += Collections.singletonList(update) + } + new Array[Byte](0) } } catch { case eof: EOFException => { @@ -140,14 +146,17 @@ private[spark] class PythonRDD[T: ClassManifest]( val asJavaRDD : JavaRDD[Array[Byte]] = JavaRDD.fromRDD(this) } +/** Thrown for exceptions in user Python code. */ +private class PythonException(msg: String) extends Exception(msg) + /** * Form an RDD[(Array[Byte], Array[Byte])] from key-value pairs returned from Python. * This is used by PySpark's shuffle operations. */ private class PairwiseRDD(prev: RDD[Array[Byte]]) extends RDD[(Array[Byte], Array[Byte])](prev) { - override def getSplits = prev.splits - override def compute(split: Split, context: TaskContext) = + override def getPartitions = prev.partitions + override def compute(split: Partition, context: TaskContext) = prev.iterator(split, context).grouped(2).map { case Seq(a, b) => (a, b) case x => throw new Exception("PairwiseRDD: unexpected value: " + x) @@ -229,6 +238,11 @@ private[spark] object PythonRDD { } def writeIteratorToPickleFile[T](items: java.util.Iterator[T], filename: String) { + import scala.collection.JavaConverters._ + writeIteratorToPickleFile(items.asScala, filename) + } + + def writeIteratorToPickleFile[T](items: Iterator[T], filename: String) { val file = new DataOutputStream(new FileOutputStream(filename)) for (item <- items) { writeAsPickle(item, file) @@ -236,8 +250,10 @@ private[spark] object PythonRDD { file.close() } - def takePartition[T](rdd: RDD[T], partition: Int): java.util.Iterator[T] = - rdd.context.runJob(rdd, ((x: Iterator[T]) => x), Seq(partition), true).head + def takePartition[T](rdd: RDD[T], partition: Int): Iterator[T] = { + implicit val cm : ClassManifest[T] = rdd.elementClassManifest + rdd.context.runJob(rdd, ((x: Iterator[T]) => x.toArray), Seq(partition), true).head.iterator + } } private object Pickle { |