author     Reynold Xin <rxin@databricks.com>      2015-11-04 12:33:47 -0800
committer  Yin Huai <yhuai@databricks.com>        2015-11-04 12:33:47 -0800
commit     abf5e4285d97b148a32cf22f5287511198175cb6
tree       8a83128b5ee1f77c34ae934c007793d474e4e02c /sql
parent     de289bf279e14e47859b5fbcd70e97b9d0759f14
[SPARK-11504][SQL] API audit for distributeBy and localSort
1. Renamed localSort -> sortWithinPartitions to avoid ambiguity in "local".
2. Renamed distributeBy -> repartition to match the existing repartition.

Author: Reynold Xin <rxin@databricks.com>

Closes #9470 from rxin/SPARK-11504.
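For reference, a minimal sketch of how the renamed API reads against the removed names (assuming a DataFrame named df with columns "key" and "value", and import sqlContext.implicits._ in scope for the $"..." column syntax):

    // Before this patch (removed names):
    //   df.distributeBy(Column("key") :: Nil, 5).localSort($"value".desc)
    // After this patch:
    val out = df
      .repartition(5, $"key")               // hash-partition by "key" into 5 partitions
      .sortWithinPartitions($"value".desc)  // sort rows within each partition; no global ordering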
Diffstat (limited to 'sql')
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala         | 132
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala  |  20
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala    |  44
3 files changed, 113 insertions(+), 83 deletions(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
index 5e9c7efbbf..d3a2249d70 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
@@ -241,18 +241,6 @@ class DataFrame private[sql](
sb.toString()
}
- private[sql] def sortInternal(global: Boolean, sortExprs: Seq[Column]): DataFrame = {
- val sortOrder: Seq[SortOrder] = sortExprs.map { col =>
- col.expr match {
- case expr: SortOrder =>
- expr
- case expr: Expression =>
- SortOrder(expr, Ascending)
- }
- }
- Sort(sortOrder, global = global, logicalPlan)
- }
-
override def toString: String = {
try {
schema.map(f => s"${f.name}: ${f.dataType.simpleString}").mkString("[", ", ", "]")
@@ -620,6 +608,32 @@ class DataFrame private[sql](
}
/**
+ * Returns a new [[DataFrame]] with each partition sorted by the given expressions.
+ *
+ * This is the same operation as "SORT BY" in SQL (Hive QL).
+ *
+ * @group dfops
+ * @since 1.6.0
+ */
+ @scala.annotation.varargs
+ def sortWithinPartitions(sortCol: String, sortCols: String*): DataFrame = {
+ sortWithinPartitions((sortCol +: sortCols).map(Column(_)) : _*)
+ }
+
+ /**
+ * Returns a new [[DataFrame]] with each partition sorted by the given expressions.
+ *
+ * This is the same operation as "SORT BY" in SQL (Hive QL).
+ *
+ * @group dfops
+ * @since 1.6.0
+ */
+ @scala.annotation.varargs
+ def sortWithinPartitions(sortExprs: Column*): DataFrame = {
+ sortInternal(global = false, sortExprs)
+ }
+
+ /**
* Returns a new [[DataFrame]] sorted by the specified column, all in ascending order.
* {{{
* // The following 3 are equivalent
@@ -645,7 +659,7 @@ class DataFrame private[sql](
*/
@scala.annotation.varargs
def sort(sortExprs: Column*): DataFrame = {
- sortInternal(true, sortExprs)
+ sortInternal(global = true, sortExprs)
}
/**
@@ -667,44 +681,6 @@ class DataFrame private[sql](
def orderBy(sortExprs: Column*): DataFrame = sort(sortExprs : _*)
/**
- * Returns a new [[DataFrame]] partitioned by the given partitioning expressions into
- * `numPartitions`. The resulting DataFrame is hash partitioned.
- * @group dfops
- * @since 1.6.0
- */
- def distributeBy(partitionExprs: Seq[Column], numPartitions: Int): DataFrame = {
- RepartitionByExpression(partitionExprs.map { _.expr }, logicalPlan, Some(numPartitions))
- }
-
- /**
- * Returns a new [[DataFrame]] partitioned by the given partitioning expressions preserving
- * the existing number of partitions. The resulting DataFrame is hash partitioned.
- * @group dfops
- * @since 1.6.0
- */
- def distributeBy(partitionExprs: Seq[Column]): DataFrame = {
- RepartitionByExpression(partitionExprs.map { _.expr }, logicalPlan, None)
- }
-
- /**
- * Returns a new [[DataFrame]] with each partition sorted by the given expressions.
- * @group dfops
- * @since 1.6.0
- */
- @scala.annotation.varargs
- def localSort(sortCol: String, sortCols: String*): DataFrame = localSort(sortCol, sortCols : _*)
-
- /**
- * Returns a new [[DataFrame]] with each partition sorted by the given expressions.
- * @group dfops
- * @since 1.6.0
- */
- @scala.annotation.varargs
- def localSort(sortExprs: Column*): DataFrame = {
- sortInternal(false, sortExprs)
- }
-
- /**
* Selects column based on the column name and return it as a [[Column]].
* Note that the column name can also reference to a nested column like `a.b`.
* @group dfops
@@ -798,7 +774,9 @@ class DataFrame private[sql](
* SQL expressions.
*
* {{{
+ * // The following are equivalent:
* df.selectExpr("colA", "colB as newName", "abs(colC)")
+ * df.select(expr("colA"), expr("colB as newName"), expr("abs(colC)"))
* }}}
* @group dfops
* @since 1.3.0
@@ -1524,7 +1502,7 @@ class DataFrame private[sql](
/**
* Returns a new [[DataFrame]] that has exactly `numPartitions` partitions.
- * @group rdd
+ * @group dfops
* @since 1.3.0
*/
def repartition(numPartitions: Int): DataFrame = {
@@ -1532,6 +1510,34 @@ class DataFrame private[sql](
}
/**
+ * Returns a new [[DataFrame]] partitioned by the given partitioning expressions into
+ * `numPartitions`. The resulting DataFrame is hash partitioned.
+ *
+ * This is the same operation as "DISTRIBUTE BY" in SQL (Hive QL).
+ *
+ * @group dfops
+ * @since 1.6.0
+ */
+ @scala.annotation.varargs
+ def repartition(numPartitions: Int, partitionExprs: Column*): DataFrame = {
+ RepartitionByExpression(partitionExprs.map(_.expr), logicalPlan, Some(numPartitions))
+ }
+
+ /**
+ * Returns a new [[DataFrame]] partitioned by the given partitioning expressions preserving
+ * the existing number of partitions. The resulting DataFrame is hash partitioned.
+ *
+ * This is the same operation as "DISTRIBUTE BY" in SQL (Hive QL).
+ *
+ * @group dfops
+ * @since 1.6.0
+ */
+ @scala.annotation.varargs
+ def repartition(partitionExprs: Column*): DataFrame = {
+ RepartitionByExpression(partitionExprs.map(_.expr), logicalPlan, numPartitions = None)
+ }
+
+ /**
* Returns a new [[DataFrame]] that has exactly `numPartitions` partitions.
* Similar to coalesce defined on an [[RDD]], this operation results in a narrow dependency, e.g.
* if you go from 1000 partitions to 100 partitions, there will not be a shuffle, instead each of
@@ -2016,6 +2022,12 @@ class DataFrame private[sql](
write.mode(SaveMode.Append).insertInto(tableName)
}
+ ////////////////////////////////////////////////////////////////////////////
+ ////////////////////////////////////////////////////////////////////////////
+ // End of deprecated methods
+ ////////////////////////////////////////////////////////////////////////////
+ ////////////////////////////////////////////////////////////////////////////
+
/**
* Wrap a DataFrame action to track all Spark jobs in the body so that we can connect them with
* an execution.
@@ -2045,10 +2057,16 @@ class DataFrame private[sql](
}
}
- ////////////////////////////////////////////////////////////////////////////
- ////////////////////////////////////////////////////////////////////////////
- // End of deprecated methods
- ////////////////////////////////////////////////////////////////////////////
- ////////////////////////////////////////////////////////////////////////////
+ private def sortInternal(global: Boolean, sortExprs: Seq[Column]): DataFrame = {
+ val sortOrder: Seq[SortOrder] = sortExprs.map { col =>
+ col.expr match {
+ case expr: SortOrder =>
+ expr
+ case expr: Expression =>
+ SortOrder(expr, Ascending)
+ }
+ }
+ Sort(sortOrder, global = global, logicalPlan)
+ }
}
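As the new Scaladoc notes, repartition with partition expressions is the DataFrame counterpart of "DISTRIBUTE BY" in SQL (Hive QL), and sortWithinPartitions is the counterpart of "SORT BY". A rough sketch of that correspondence, assuming the same df as above and a hypothetical temporary table name "t":

    // DataFrame API: hash-partition by "key", then sort rows within each partition by "value"
    val byApi = df.repartition($"key").sortWithinPartitions($"value")

    // Roughly equivalent SQL (Hive QL), assuming df.registerTempTable("t") has been called:
    val bySql = sqlContext.sql("SELECT * FROM t DISTRIBUTE BY key SORT BY value")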
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
index 605954b105..dbcb011f60 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
@@ -379,8 +379,8 @@ class CachedTableSuite extends QueryTest with SharedSQLContext {
// Set up two tables distributed in the same way. Try this with the data distributed into
// different number of partitions.
for (numPartitions <- 1 until 10 by 4) {
- testData.distributeBy(Column("key") :: Nil, numPartitions).registerTempTable("t1")
- testData2.distributeBy(Column("a") :: Nil, numPartitions).registerTempTable("t2")
+ testData.repartition(numPartitions, $"key").registerTempTable("t1")
+ testData2.repartition(numPartitions, $"a").registerTempTable("t2")
sqlContext.cacheTable("t1")
sqlContext.cacheTable("t2")
@@ -401,8 +401,20 @@ class CachedTableSuite extends QueryTest with SharedSQLContext {
}
// Distribute the tables into non-matching number of partitions. Need to shuffle.
- testData.distributeBy(Column("key") :: Nil, 6).registerTempTable("t1")
- testData2.distributeBy(Column("a") :: Nil, 3).registerTempTable("t2")
+ testData.repartition(6, $"key").registerTempTable("t1")
+ testData2.repartition(3, $"a").registerTempTable("t2")
+ sqlContext.cacheTable("t1")
+ sqlContext.cacheTable("t2")
+
+ verifyNumExchanges(sql("SELECT * FROM t1 t1 JOIN t2 t2 ON t1.key = t2.a"), 2)
+ sqlContext.uncacheTable("t1")
+ sqlContext.uncacheTable("t2")
+ sqlContext.dropTempTable("t1")
+ sqlContext.dropTempTable("t2")
+
+ // One side of join is not partitioned in the desired way. Need to shuffle.
+ testData.repartition(6, $"value").registerTempTable("t1")
+ testData2.repartition(6, $"a").registerTempTable("t2")
sqlContext.cacheTable("t1")
sqlContext.cacheTable("t2")
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
index a9e6413423..84a616d0b9 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
@@ -1044,79 +1044,79 @@ class DataFrameSuite extends QueryTest with SharedSQLContext {
test("distributeBy and localSort") {
val original = testData.repartition(1)
assert(original.rdd.partitions.length == 1)
- val df = original.distributeBy(Column("key") :: Nil, 5)
- assert(df.rdd.partitions.length == 5)
+ val df = original.repartition(5, $"key")
+ assert(df.rdd.partitions.length == 5)
checkAnswer(original.select(), df.select())
- val df2 = original.distributeBy(Column("key") :: Nil, 10)
- assert(df2.rdd.partitions.length == 10)
+ val df2 = original.repartition(10, $"key")
+ assert(df2.rdd.partitions.length == 10)
checkAnswer(original.select(), df2.select())
// Group by the column we are distributed by. This should generate a plan with no exchange
// between the aggregates
- val df3 = testData.distributeBy(Column("key") :: Nil).groupBy("key").count()
+ val df3 = testData.repartition($"key").groupBy("key").count()
verifyNonExchangingAgg(df3)
- verifyNonExchangingAgg(testData.distributeBy(Column("key") :: Column("value") :: Nil)
+ verifyNonExchangingAgg(testData.repartition($"key", $"value")
.groupBy("key", "value").count())
// Grouping by just the first distributeBy expr, need to exchange.
- verifyExchangingAgg(testData.distributeBy(Column("key") :: Column("value") :: Nil)
+ verifyExchangingAgg(testData.repartition($"key", $"value")
.groupBy("key").count())
val data = sqlContext.sparkContext.parallelize(
(1 to 100).map(i => TestData2(i % 10, i))).toDF()
// Distribute and order by.
- val df4 = data.distributeBy(Column("a") :: Nil).localSort($"b".desc)
+ val df4 = data.repartition($"a").sortWithinPartitions($"b".desc)
// Walk each partition and verify that it is sorted descending and does not contain all
// the values.
- df4.rdd.foreachPartition(p => {
+ df4.rdd.foreachPartition { p =>
var previousValue: Int = -1
var allSequential: Boolean = true
- p.foreach(r => {
+ p.foreach { r =>
val v: Int = r.getInt(1)
if (previousValue != -1) {
if (previousValue < v) throw new SparkException("Partition is not ordered.")
if (v + 1 != previousValue) allSequential = false
}
previousValue = v
- })
+ }
if (allSequential) throw new SparkException("Partition should not be globally ordered")
- })
+ }
// Distribute and order by with multiple order bys
- val df5 = data.distributeBy(Column("a") :: Nil, 2).localSort($"b".asc, $"a".asc)
+ val df5 = data.repartition(2, $"a").sortWithinPartitions($"b".asc, $"a".asc)
// Walk each partition and verify that it is sorted ascending
- df5.rdd.foreachPartition(p => {
+ df5.rdd.foreachPartition { p =>
var previousValue: Int = -1
var allSequential: Boolean = true
- p.foreach(r => {
+ p.foreach { r =>
val v: Int = r.getInt(1)
if (previousValue != -1) {
if (previousValue > v) throw new SparkException("Partition is not ordered.")
if (v - 1 != previousValue) allSequential = false
}
previousValue = v
- })
+ }
if (allSequential) throw new SparkException("Partition should not be all sequential")
- })
+ }
// Distribute into one partition and order by. This partition should contain all the values.
- val df6 = data.distributeBy(Column("a") :: Nil, 1).localSort($"b".asc)
+ val df6 = data.repartition(1, $"a").sortWithinPartitions($"b".asc)
// Walk each partition and verify that it is sorted descending and not globally sorted.
- df6.rdd.foreachPartition(p => {
+ df6.rdd.foreachPartition { p =>
var previousValue: Int = -1
var allSequential: Boolean = true
- p.foreach(r => {
+ p.foreach { r =>
val v: Int = r.getInt(1)
if (previousValue != -1) {
if (previousValue > v) throw new SparkException("Partition is not ordered.")
if (v - 1 != previousValue) allSequential = false
}
previousValue = v
- })
+ }
if (!allSequential) throw new SparkException("Partition should contain all sequential values")
- })
+ }
}
test("fix case sensitivity of partition by") {