diff options
author | Dongjoon Hyun <dongjoon@apache.org> | 2016-05-24 18:55:23 -0700 |
---|---|---|
committer | Reynold Xin <rxin@databricks.com> | 2016-05-24 18:55:23 -0700 |
commit | f08bf587b1913c6cc8ecb34c45331cf4750961c9 (patch) | |
tree | aaa71096ce0da7d546ae373aed76eab84ddc2f7f /sql | |
parent | e631b819fe348729aab062207a452b8f1d1511bd (diff) | |
download | spark-f08bf587b1913c6cc8ecb34c45331cf4750961c9.tar.gz spark-f08bf587b1913c6cc8ecb34c45331cf4750961c9.tar.bz2 spark-f08bf587b1913c6cc8ecb34c45331cf4750961c9.zip |
[SPARK-15512][CORE] repartition(0) should raise IllegalArgumentException
## What changes were proposed in this pull request?
Previously, SPARK-8893 added the constraints on positive number of partitions for repartition/coalesce operations in general. This PR adds one missing part for that and adds explicit two testcases.
**Before**
```scala
scala> sc.parallelize(1 to 5).coalesce(0)
java.lang.IllegalArgumentException: requirement failed: Number of partitions (0) must be positive.
...
scala> sc.parallelize(1 to 5).repartition(0).collect()
res1: Array[Int] = Array() // empty
scala> spark.sql("select 1").coalesce(0)
res2: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [1: int]
scala> spark.sql("select 1").coalesce(0).collect()
java.lang.IllegalArgumentException: requirement failed: Number of partitions (0) must be positive.
scala> spark.sql("select 1").repartition(0)
res3: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [1: int]
scala> spark.sql("select 1").repartition(0).collect()
res4: Array[org.apache.spark.sql.Row] = Array() // empty
```
**After**
```scala
scala> sc.parallelize(1 to 5).coalesce(0)
java.lang.IllegalArgumentException: requirement failed: Number of partitions (0) must be positive.
...
scala> sc.parallelize(1 to 5).repartition(0)
java.lang.IllegalArgumentException: requirement failed: Number of partitions (0) must be positive.
...
scala> spark.sql("select 1").coalesce(0)
java.lang.IllegalArgumentException: requirement failed: Number of partitions (0) must be positive.
...
scala> spark.sql("select 1").repartition(0)
java.lang.IllegalArgumentException: requirement failed: Number of partitions (0) must be positive.
...
```
## How was this patch tested?
Pass the Jenkins tests with new testcases.
Author: Dongjoon Hyun <dongjoon@apache.org>
Closes #13282 from dongjoon-hyun/SPARK-15512.
Diffstat (limited to 'sql')
4 files changed, 18 insertions, 1 deletions
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala index ca0096eeb2..0a9250b71f 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala @@ -734,6 +734,7 @@ case class Distinct(child: LogicalPlan) extends UnaryNode { */ case class Repartition(numPartitions: Int, shuffle: Boolean, child: LogicalPlan) extends UnaryNode { + require(numPartitions > 0, s"Number of partitions ($numPartitions) must be positive.") override def output: Seq[Attribute] = child.output } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/partitioning.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/partitioning.scala index a5bdee1b85..28cbce8748 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/partitioning.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/partitioning.scala @@ -43,7 +43,7 @@ case class RepartitionByExpression( child: LogicalPlan, numPartitions: Option[Int] = None) extends RedistributeData { numPartitions match { - case Some(n) => require(n > 0, "numPartitions must be greater than 0.") + case Some(n) => require(n > 0, s"Number of partitions ($n) must be positive.") case None => // Ok } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala index f573abf859..0614747352 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala @@ -259,12 +259,20 @@ class DataFrameSuite extends QueryTest with SharedSQLContext { } test("repartition") { + intercept[IllegalArgumentException] { + testData.select('key).repartition(0) + } + checkAnswer( testData.select('key).repartition(10).select('key), testData.select('key).collect().toSeq) } test("coalesce") { + intercept[IllegalArgumentException] { + testData.select('key).coalesce(0) + } + assert(testData.select('key).coalesce(1).rdd.partitions.size === 1) checkAnswer( diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala index 0ffbd6db12..05de79eb2f 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala @@ -81,6 +81,14 @@ class DatasetSuite extends QueryTest with SharedSQLContext { val data = (1 to 100).map(i => ClassData(i.toString, i)) val ds = data.toDS() + intercept[IllegalArgumentException] { + ds.coalesce(0) + } + + intercept[IllegalArgumentException] { + ds.repartition(0) + } + assert(ds.repartition(10).rdd.partitions.length == 10) checkDataset( ds.repartition(10), |