author    Reynold Xin <rxin@databricks.com>    2015-11-04 12:33:47 -0800
committer Yin Huai <yhuai@databricks.com>      2015-11-04 12:33:47 -0800
commit    abf5e4285d97b148a32cf22f5287511198175cb6 (patch)
tree      8a83128b5ee1f77c34ae934c007793d474e4e02c /sql
parent    de289bf279e14e47859b5fbcd70e97b9d0759f14 (diff)
[SPARK-11504][SQL] API audit for distributeBy and localSort
1. Renamed localSort -> sortWithinPartitions to avoid ambiguity in "local"
2. Renamed distributeBy -> repartition to match the existing repartition.
Author: Reynold Xin <rxin@databricks.com>
Closes #9470 from rxin/SPARK-11504.
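In practice the renamed API reads as follows. A minimal sketch, assuming a SQLContext named `sqlContext` and a DataFrame `df` with columns "key" and "value" (both names are illustrative, not from this patch):

    import sqlContext.implicits._  // enables the $"colName" column syntax

    // Formerly df.distributeBy(Column("key") :: Nil, 5):
    // hash-partition df into 5 partitions by the "key" column.
    val distributed = df.repartition(5, $"key")

    // Formerly distributed.localSort($"value".desc):
    // sort rows within each partition only; no global ordering is imposed.
    val sorted = distributed.sortWithinPartitions($"value".desc)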
Diffstat (limited to 'sql')
3 files changed, 113 insertions, 83 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
index 5e9c7efbbf..d3a2249d70 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
@@ -241,18 +241,6 @@ class DataFrame private[sql](
     sb.toString()
   }
 
-  private[sql] def sortInternal(global: Boolean, sortExprs: Seq[Column]): DataFrame = {
-    val sortOrder: Seq[SortOrder] = sortExprs.map { col =>
-      col.expr match {
-        case expr: SortOrder =>
-          expr
-        case expr: Expression =>
-          SortOrder(expr, Ascending)
-      }
-    }
-    Sort(sortOrder, global = global, logicalPlan)
-  }
-
   override def toString: String = {
     try {
       schema.map(f => s"${f.name}: ${f.dataType.simpleString}").mkString("[", ", ", "]")
@@ -620,6 +608,32 @@ class DataFrame private[sql](
   }
 
   /**
+   * Returns a new [[DataFrame]] with each partition sorted by the given expressions.
+   *
+   * This is the same operation as "SORT BY" in SQL (Hive QL).
+   *
+   * @group dfops
+   * @since 1.6.0
+   */
+  @scala.annotation.varargs
+  def sortWithinPartitions(sortCol: String, sortCols: String*): DataFrame = {
+    sortWithinPartitions((sortCol +: sortCols).map(Column(_)) : _*)
+  }
+
+  /**
+   * Returns a new [[DataFrame]] with each partition sorted by the given expressions.
+   *
+   * This is the same operation as "SORT BY" in SQL (Hive QL).
+   *
+   * @group dfops
+   * @since 1.6.0
+   */
+  @scala.annotation.varargs
+  def sortWithinPartitions(sortExprs: Column*): DataFrame = {
+    sortInternal(global = false, sortExprs)
+  }
+
+  /**
    * Returns a new [[DataFrame]] sorted by the specified column, all in ascending order.
    * {{{
    *   // The following 3 are equivalent
@@ -645,7 +659,7 @@ class DataFrame private[sql](
    */
   @scala.annotation.varargs
   def sort(sortExprs: Column*): DataFrame = {
-    sortInternal(true, sortExprs)
+    sortInternal(global = true, sortExprs)
   }
 
   /**
@@ -667,44 +681,6 @@ class DataFrame private[sql](
   def orderBy(sortExprs: Column*): DataFrame = sort(sortExprs : _*)
 
   /**
-   * Returns a new [[DataFrame]] partitioned by the given partitioning expressions into
-   * `numPartitions`. The resulting DataFrame is hash partitioned.
-   * @group dfops
-   * @since 1.6.0
-   */
-  def distributeBy(partitionExprs: Seq[Column], numPartitions: Int): DataFrame = {
-    RepartitionByExpression(partitionExprs.map { _.expr }, logicalPlan, Some(numPartitions))
-  }
-
-  /**
-   * Returns a new [[DataFrame]] partitioned by the given partitioning expressions preserving
-   * the existing number of partitions. The resulting DataFrame is hash partitioned.
-   * @group dfops
-   * @since 1.6.0
-   */
-  def distributeBy(partitionExprs: Seq[Column]): DataFrame = {
-    RepartitionByExpression(partitionExprs.map { _.expr }, logicalPlan, None)
-  }
-
-  /**
-   * Returns a new [[DataFrame]] with each partition sorted by the given expressions.
-   * @group dfops
-   * @since 1.6.0
-   */
-  @scala.annotation.varargs
-  def localSort(sortCol: String, sortCols: String*): DataFrame = localSort(sortCol, sortCols : _*)
-
-  /**
-   * Returns a new [[DataFrame]] with each partition sorted by the given expressions.
-   * @group dfops
-   * @since 1.6.0
-   */
-  @scala.annotation.varargs
-  def localSort(sortExprs: Column*): DataFrame = {
-    sortInternal(false, sortExprs)
-  }
-
-  /**
    * Selects column based on the column name and return it as a [[Column]].
    * Note that the column name can also reference to a nested column like `a.b`.
    * @group dfops
@@ -798,7 +774,9 @@ class DataFrame private[sql](
    * SQL expressions.
    *
    * {{{
+   *   // The following are equivalent:
    *   df.selectExpr("colA", "colB as newName", "abs(colC)")
+   *   df.select(expr("colA"), expr("colB as newName"), expr("abs(colC)"))
    * }}}
    * @group dfops
    * @since 1.3.0
@@ -1524,7 +1502,7 @@ class DataFrame private[sql](
 
   /**
    * Returns a new [[DataFrame]] that has exactly `numPartitions` partitions.
-   * @group rdd
+   * @group dfops
    * @since 1.3.0
    */
   def repartition(numPartitions: Int): DataFrame = {
@@ -1532,6 +1510,34 @@ class DataFrame private[sql](
   }
 
   /**
+   * Returns a new [[DataFrame]] partitioned by the given partitioning expressions into
+   * `numPartitions`. The resulting DataFrame is hash partitioned.
+   *
+   * This is the same operation as "DISTRIBUTE BY" in SQL (Hive QL).
+   *
+   * @group dfops
+   * @since 1.6.0
+   */
+  @scala.annotation.varargs
+  def repartition(numPartitions: Int, partitionExprs: Column*): DataFrame = {
+    RepartitionByExpression(partitionExprs.map(_.expr), logicalPlan, Some(numPartitions))
+  }
+
+  /**
+   * Returns a new [[DataFrame]] partitioned by the given partitioning expressions preserving
+   * the existing number of partitions. The resulting DataFrame is hash partitioned.
+   *
+   * This is the same operation as "DISTRIBUTE BY" in SQL (Hive QL).
+   *
+   * @group dfops
+   * @since 1.6.0
+   */
+  @scala.annotation.varargs
+  def repartition(partitionExprs: Column*): DataFrame = {
+    RepartitionByExpression(partitionExprs.map(_.expr), logicalPlan, numPartitions = None)
+  }
+
+  /**
    * Returns a new [[DataFrame]] that has exactly `numPartitions` partitions.
    * Similar to coalesce defined on an [[RDD]], this operation results in a narrow dependency, e.g.
    * if you go from 1000 partitions to 100 partitions, there will not be a shuffle, instead each of
@@ -2016,6 +2022,12 @@ class DataFrame private[sql](
     write.mode(SaveMode.Append).insertInto(tableName)
   }
 
+  ////////////////////////////////////////////////////////////////////////////
+  ////////////////////////////////////////////////////////////////////////////
+  // End of deprecated methods
+  ////////////////////////////////////////////////////////////////////////////
+  ////////////////////////////////////////////////////////////////////////////
+
   /**
    * Wrap a DataFrame action to track all Spark jobs in the body so that we can connect them with
    * an execution.
@@ -2045,10 +2057,16 @@ class DataFrame private[sql](
     }
   }
 
-  ////////////////////////////////////////////////////////////////////////////
-  ////////////////////////////////////////////////////////////////////////////
-  // End of deprecated methods
-  ////////////////////////////////////////////////////////////////////////////
-  ////////////////////////////////////////////////////////////////////////////
+  private def sortInternal(global: Boolean, sortExprs: Seq[Column]): DataFrame = {
+    val sortOrder: Seq[SortOrder] = sortExprs.map { col =>
+      col.expr match {
+        case expr: SortOrder =>
+          expr
+        case expr: Expression =>
+          SortOrder(expr, Ascending)
+      }
+    }
+    Sort(sortOrder, global = global, logicalPlan)
+  }
 }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
index 605954b105..dbcb011f60 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
@@ -379,8 +379,8 @@ class CachedTableSuite extends QueryTest with SharedSQLContext {
 
     // Set up two tables distributed in the same way. Try this with the data distributed into
     // different number of partitions.
     for (numPartitions <- 1 until 10 by 4) {
-      testData.distributeBy(Column("key") :: Nil, numPartitions).registerTempTable("t1")
-      testData2.distributeBy(Column("a") :: Nil, numPartitions).registerTempTable("t2")
+      testData.repartition(numPartitions, $"key").registerTempTable("t1")
+      testData2.repartition(numPartitions, $"a").registerTempTable("t2")
       sqlContext.cacheTable("t1")
       sqlContext.cacheTable("t2")
 
@@ -401,8 +401,20 @@ class CachedTableSuite extends QueryTest with SharedSQLContext {
     }
 
     // Distribute the tables into non-matching number of partitions. Need to shuffle.
-    testData.distributeBy(Column("key") :: Nil, 6).registerTempTable("t1")
-    testData2.distributeBy(Column("a") :: Nil, 3).registerTempTable("t2")
+    testData.repartition(6, $"key").registerTempTable("t1")
+    testData2.repartition(3, $"a").registerTempTable("t2")
+    sqlContext.cacheTable("t1")
+    sqlContext.cacheTable("t2")
+
+    verifyNumExchanges(sql("SELECT * FROM t1 t1 JOIN t2 t2 ON t1.key = t2.a"), 2)
+    sqlContext.uncacheTable("t1")
+    sqlContext.uncacheTable("t2")
+    sqlContext.dropTempTable("t1")
+    sqlContext.dropTempTable("t2")
+
+    // One side of join is not partitioned in the desired way. Need to shuffle.
+    testData.repartition(6, $"value").registerTempTable("t1")
+    testData2.repartition(6, $"a").registerTempTable("t2")
     sqlContext.cacheTable("t1")
     sqlContext.cacheTable("t2")
 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
index a9e6413423..84a616d0b9 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
@@ -1044,79 +1044,79 @@ class DataFrameSuite extends QueryTest with SharedSQLContext {
   test("distributeBy and localSort") {
     val original = testData.repartition(1)
     assert(original.rdd.partitions.length == 1)
-    val df = original.distributeBy(Column("key") :: Nil, 5)
-    assert(df.rdd.partitions.length == 5)
+    val df = original.repartition(5, $"key")
+    assert(df.rdd.partitions.length == 5)
     checkAnswer(original.select(), df.select())
 
-    val df2 = original.distributeBy(Column("key") :: Nil, 10)
-    assert(df2.rdd.partitions.length == 10)
+    val df2 = original.repartition(10, $"key")
+    assert(df2.rdd.partitions.length == 10)
     checkAnswer(original.select(), df2.select())
 
     // Group by the column we are distributed by. This should generate a plan with no exchange
     // between the aggregates
-    val df3 = testData.distributeBy(Column("key") :: Nil).groupBy("key").count()
+    val df3 = testData.repartition($"key").groupBy("key").count()
     verifyNonExchangingAgg(df3)
-    verifyNonExchangingAgg(testData.distributeBy(Column("key") :: Column("value") :: Nil)
+    verifyNonExchangingAgg(testData.repartition($"key", $"value")
       .groupBy("key", "value").count())
 
     // Grouping by just the first distributeBy expr, need to exchange.
-    verifyExchangingAgg(testData.distributeBy(Column("key") :: Column("value") :: Nil)
+    verifyExchangingAgg(testData.repartition($"key", $"value")
       .groupBy("key").count())
 
     val data = sqlContext.sparkContext.parallelize(
       (1 to 100).map(i => TestData2(i % 10, i))).toDF()
 
     // Distribute and order by.
-    val df4 = data.distributeBy(Column("a") :: Nil).localSort($"b".desc)
+    val df4 = data.repartition($"a").sortWithinPartitions($"b".desc)
     // Walk each partition and verify that it is sorted descending and does not contain all
     // the values.
-    df4.rdd.foreachPartition(p => {
+    df4.rdd.foreachPartition { p =>
       var previousValue: Int = -1
       var allSequential: Boolean = true
-      p.foreach(r => {
+      p.foreach { r =>
         val v: Int = r.getInt(1)
         if (previousValue != -1) {
           if (previousValue < v) throw new SparkException("Partition is not ordered.")
           if (v + 1 != previousValue) allSequential = false
         }
         previousValue = v
-      })
+      }
       if (allSequential) throw new SparkException("Partition should not be globally ordered")
-    })
+    }
 
     // Distribute and order by with multiple order bys
-    val df5 = data.distributeBy(Column("a") :: Nil, 2).localSort($"b".asc, $"a".asc)
+    val df5 = data.repartition(2, $"a").sortWithinPartitions($"b".asc, $"a".asc)
     // Walk each partition and verify that it is sorted ascending
-    df5.rdd.foreachPartition(p => {
+    df5.rdd.foreachPartition { p =>
       var previousValue: Int = -1
       var allSequential: Boolean = true
-      p.foreach(r => {
+      p.foreach { r =>
         val v: Int = r.getInt(1)
         if (previousValue != -1) {
           if (previousValue > v) throw new SparkException("Partition is not ordered.")
           if (v - 1 != previousValue) allSequential = false
         }
         previousValue = v
-      })
+      }
       if (allSequential) throw new SparkException("Partition should not be all sequential")
-    })
+    }
 
     // Distribute into one partition and order by. This partition should contain all the values.
-    val df6 = data.distributeBy(Column("a") :: Nil, 1).localSort($"b".asc)
+    val df6 = data.repartition(1, $"a").sortWithinPartitions($"b".asc)
     // Walk each partition and verify that it is sorted descending and not globally sorted.
-    df6.rdd.foreachPartition(p => {
+    df6.rdd.foreachPartition { p =>
       var previousValue: Int = -1
       var allSequential: Boolean = true
-      p.foreach(r => {
+      p.foreach { r =>
         val v: Int = r.getInt(1)
         if (previousValue != -1) {
           if (previousValue > v) throw new SparkException("Partition is not ordered.")
           if (v - 1 != previousValue) allSequential = false
         }
         previousValue = v
-      })
+      }
       if (!allSequential) throw new SparkException("Partition should contain all sequential values")
-    })
+    }
   }
 
   test("fix case sensitivity of partition by") {
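As the updated javadocs note, the two new methods line up with Hive QL clauses. A hedged sketch of that equivalence, assuming a SQLContext named `sqlContext` and a registered temp table "data" with columns a and b (names illustrative, not from this patch):

    import sqlContext.implicits._

    // SQL form: DISTRIBUTE BY hash-partitions, SORT BY orders within each partition.
    val viaSql = sqlContext.sql("SELECT * FROM data DISTRIBUTE BY a SORT BY b")

    // Equivalent DataFrame form after this change; repartition without a partition
    // count preserves the current number of partitions, matching DISTRIBUTE BY.
    val viaApi = sqlContext.table("data").repartition($"a").sortWithinPartitions($"b")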