From 5013fb64b27e46bbc5daf4f06fdc70938c06cf29 Mon Sep 17 00:00:00 2001 From: Binh Nguyen Date: Tue, 10 Dec 2013 00:38:16 -0800 Subject: Expose numPartitions parameter in JavaPairRDD.sortByKey() This change make Java and Scala API on sortByKey() the same. --- .../main/scala/org/apache/spark/api/java/JavaPairRDD.scala | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) (limited to 'core') diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala b/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala index 2142fd7327..a191dfd1dc 100644 --- a/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala +++ b/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala @@ -583,12 +583,20 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])(implicit val kManifest: ClassManif * (in the `save` case, they will be written to multiple `part-X` files in the filesystem, in * order of the keys). */ - def sortByKey(comp: Comparator[K], ascending: Boolean): JavaPairRDD[K, V] = { + def sortByKey(comp: Comparator[K], ascending: Boolean): JavaPairRDD[K, V] = sortByKey(comp, ascending, rdd.partitions.size) + + /** + * Sort the RDD by key, so that each partition contains a sorted range of the elements. Calling + * `collect` or `save` on the resulting RDD will return or output an ordered list of records + * (in the `save` case, they will be written to multiple `part-X` files in the filesystem, in + * order of the keys). + */ + def sortByKey(comp: Comparator[K], ascending: Boolean, numPartitions: Int): JavaPairRDD[K, V] = { class KeyOrdering(val a: K) extends Ordered[K] { override def compare(b: K) = comp.compare(a, b) } implicit def toOrdered(x: K): Ordered[K] = new KeyOrdering(x) - fromRDD(new OrderedRDDFunctions[K, V, (K, V)](rdd).sortByKey(ascending)) + fromRDD(new OrderedRDDFunctions[K, V, (K, V)](rdd).sortByKey(ascending, numPartitions)) } /** -- cgit v1.2.3 From c82d4f079bf84da06801419defe644f574d0b8d1 Mon Sep 17 00:00:00 2001 From: Binh Nguyen Date: Tue, 10 Dec 2013 01:04:52 -0800 Subject: Use braces to shorten the line. --- core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) (limited to 'core') diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala b/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala index a191dfd1dc..36bd3e673f 100644 --- a/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala +++ b/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala @@ -583,7 +583,9 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])(implicit val kManifest: ClassManif * (in the `save` case, they will be written to multiple `part-X` files in the filesystem, in * order of the keys). */ - def sortByKey(comp: Comparator[K], ascending: Boolean): JavaPairRDD[K, V] = sortByKey(comp, ascending, rdd.partitions.size) + def sortByKey(comp: Comparator[K], ascending: Boolean): JavaPairRDD[K, V] = { + sortByKey(comp, ascending, rdd.partitions.size) + } /** * Sort the RDD by key, so that each partition contains a sorted range of the elements. Calling -- cgit v1.2.3 From e85af507671d417724c28ee2db499fc019feb1d8 Mon Sep 17 00:00:00 2001 From: Binh Nguyen Date: Tue, 10 Dec 2013 11:01:56 -0800 Subject: Leave default value of numPartitions to Scala code. --- .../src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) (limited to 'core') diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala b/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala index 36bd3e673f..2d2b3847de 100644 --- a/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala +++ b/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala @@ -584,7 +584,9 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])(implicit val kManifest: ClassManif * order of the keys). */ def sortByKey(comp: Comparator[K], ascending: Boolean): JavaPairRDD[K, V] = { - sortByKey(comp, ascending, rdd.partitions.size) + // numPartitions should never be negative in practice so we can use -1 here to indicate that + // we want to use implementation's default value. + sortByKey(comp, ascending, -1) } /** @@ -598,7 +600,11 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])(implicit val kManifest: ClassManif override def compare(b: K) = comp.compare(a, b) } implicit def toOrdered(x: K): Ordered[K] = new KeyOrdering(x) - fromRDD(new OrderedRDDFunctions[K, V, (K, V)](rdd).sortByKey(ascending, numPartitions)) + if (numPartitions < 0) { + fromRDD(new OrderedRDDFunctions[K, V, (K, V)](rdd).sortByKey(ascending)) + } else { + fromRDD(new OrderedRDDFunctions[K, V, (K, V)](rdd).sortByKey(ascending, numPartitions)) + } } /** -- cgit v1.2.3 From 0b494f7db47cf1de35aaed046f21bbb3592c3d97 Mon Sep 17 00:00:00 2001 From: Binh Nguyen Date: Tue, 10 Dec 2013 11:17:52 -0800 Subject: Hook directly to Scala API --- .../main/scala/org/apache/spark/api/java/JavaPairRDD.scala | 14 ++++++-------- 1 file changed, 6 insertions(+), 8 deletions(-) (limited to 'core') diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala b/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala index 2d2b3847de..d0ca289a6c 100644 --- a/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala +++ b/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala @@ -584,9 +584,11 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])(implicit val kManifest: ClassManif * order of the keys). */ def sortByKey(comp: Comparator[K], ascending: Boolean): JavaPairRDD[K, V] = { - // numPartitions should never be negative in practice so we can use -1 here to indicate that - // we want to use implementation's default value. - sortByKey(comp, ascending, -1) + class KeyOrdering(val a: K) extends Ordered[K] { + override def compare(b: K) = comp.compare(a, b) + } + implicit def toOrdered(x: K): Ordered[K] = new KeyOrdering(x) + fromRDD(new OrderedRDDFunctions[K, V, (K, V)](rdd).sortByKey(ascending)) } /** @@ -600,11 +602,7 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])(implicit val kManifest: ClassManif override def compare(b: K) = comp.compare(a, b) } implicit def toOrdered(x: K): Ordered[K] = new KeyOrdering(x) - if (numPartitions < 0) { - fromRDD(new OrderedRDDFunctions[K, V, (K, V)](rdd).sortByKey(ascending)) - } else { - fromRDD(new OrderedRDDFunctions[K, V, (K, V)](rdd).sortByKey(ascending, numPartitions)) - } + fromRDD(new OrderedRDDFunctions[K, V, (K, V)](rdd).sortByKey(ascending, numPartitions)) } /** -- cgit v1.2.3