author     Xiao Li <gatorsmile@gmail.com>          2017-02-22 17:26:56 -0800
committer  Wenchen Fan <wenchen@databricks.com>    2017-02-22 17:26:56 -0800
commit     dc005ed53c87216efff50268009217ba26e34a10 (patch)
tree       a60d01a64d761433eee31b8e60e07f9a1085b336 /sql/core/src/main/scala/org
parent     4661d30b988bf773ab45a15b143efb2908d33743 (diff)
[SPARK-19658][SQL] Set NumPartitions of RepartitionByExpression In Parser
### What changes were proposed in this pull request?

Currently, if `numPartitions` is not set in `RepartitionByExpression`, it is filled in from `spark.sql.shuffle.partitions` during physical planning. However, this does not follow the general resolution process. This PR sets it in the `Parser` instead, so the `Optimizer` can use the value for plan optimization.

### How was this patch tested?

Added a test case.

Author: Xiao Li <gatorsmile@gmail.com>

Closes #16988 from gatorsmile/resolveRepartition.
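The behavioral effect is easiest to see through the Dataset API. Below is a minimal, self-contained sketch, not part of this patch: the `RepartitionExample` object name, the local master, and the value `8` for `spark.sql.shuffle.partitions` are illustrative choices.

```scala
import org.apache.spark.sql.SparkSession

object RepartitionExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("RepartitionExample")
      .config("spark.sql.shuffle.partitions", "8")
      .getOrCreate()
    import spark.implicits._

    val df = Seq((1, "a"), (2, "b"), (3, "c")).toDF("id", "value")

    // Without an explicit count, repartition-by-expression picks up
    // spark.sql.shuffle.partitions (8 here) when the logical plan is
    // built, rather than later during physical planning.
    println(df.repartition($"id").rdd.getNumPartitions) // 8

    // An explicit count still takes precedence.
    println(df.repartition(4, $"id").rdd.getNumPartitions) // 4

    spark.stop()
  }
}
```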
Diffstat (limited to 'sql/core/src/main/scala/org')
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala                    |  5
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala   | 17
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala  |  6
3 files changed, 19 insertions(+), 9 deletions(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
index 38a24cc8ed..1ebc53d2bb 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
@@ -2410,7 +2410,7 @@ class Dataset[T] private[sql](
*/
@scala.annotation.varargs
def repartition(numPartitions: Int, partitionExprs: Column*): Dataset[T] = withTypedPlan {
- RepartitionByExpression(partitionExprs.map(_.expr), logicalPlan, Some(numPartitions))
+ RepartitionByExpression(partitionExprs.map(_.expr), logicalPlan, numPartitions)
}
/**
@@ -2425,7 +2425,8 @@ class Dataset[T] private[sql](
*/
@scala.annotation.varargs
def repartition(partitionExprs: Column*): Dataset[T] = withTypedPlan {
- RepartitionByExpression(partitionExprs.map(_.expr), logicalPlan, numPartitions = None)
+ RepartitionByExpression(
+ partitionExprs.map(_.expr), logicalPlan, sparkSession.sessionState.conf.numShufflePartitions)
}
/**
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
index d508002352..1340aebc1d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
@@ -22,16 +22,17 @@ import scala.collection.JavaConverters._
import org.antlr.v4.runtime.{ParserRuleContext, Token}
import org.antlr.v4.runtime.tree.TerminalNode
-import org.apache.spark.sql.{AnalysisException, SaveMode}
+import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier}
import org.apache.spark.sql.catalyst.catalog._
+import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.catalyst.parser._
import org.apache.spark.sql.catalyst.parser.SqlBaseParser._
-import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, OneRowRelation, ScriptInputOutputSchema}
+import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.execution.command._
import org.apache.spark.sql.execution.datasources.{CreateTable, _}
import org.apache.spark.sql.internal.{HiveSerDe, SQLConf, VariableSubstitution}
-import org.apache.spark.sql.types.{StructField, StructType}
+import org.apache.spark.sql.types.StructType
/**
* Concrete parser for Spark SQL statements.
@@ -1441,4 +1442,14 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
reader, writer,
schemaLess)
}
+
+ /**
+ * Create a clause for DISTRIBUTE BY.
+ */
+ override protected def withRepartitionByExpression(
+ ctx: QueryOrganizationContext,
+ expressions: Seq[Expression],
+ query: LogicalPlan): LogicalPlan = {
+ RepartitionByExpression(expressions, query, conf.numShufflePartitions)
+ }
}
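On the SQL side, the new `withRepartitionByExpression` override means a `DISTRIBUTE BY` clause now parses directly to a `RepartitionByExpression` carrying a concrete partition count. A hedged sketch, reusing the `spark` session and the shuffle-partition setting of 8 from the example above (the `explain` output shown in the comment is indicative, not verbatim):

```scala
// DISTRIBUTE BY resolves its shuffle-partition count at parse time.
val q = spark.sql("SELECT id FROM VALUES (1), (2), (3) AS t(id) DISTRIBUTE BY id")
q.explain(extended = true)
// The logical plan should contain a node along the lines of:
//   RepartitionByExpression [id#0], 8
```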
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
index 557181ebd9..0e3d5595df 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
@@ -332,8 +332,6 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
// Can we automate these 'pass through' operations?
object BasicOperators extends Strategy {
- def numPartitions: Int = self.numPartitions
-
def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
case r: RunnableCommand => ExecutedCommandExec(r) :: Nil
@@ -414,9 +412,9 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
execution.RDDScanExec(Nil, singleRowRdd, "OneRowRelation") :: Nil
case r: logical.Range =>
execution.RangeExec(r) :: Nil
- case logical.RepartitionByExpression(expressions, child, nPartitions) =>
+ case logical.RepartitionByExpression(expressions, child, numPartitions) =>
exchange.ShuffleExchange(HashPartitioning(
- expressions, nPartitions.getOrElse(numPartitions)), planLater(child)) :: Nil
+ expressions, numPartitions), planLater(child)) :: Nil
case ExternalRDD(outputObjAttr, rdd) => ExternalRDDScanExec(outputObjAttr, rdd) :: Nil
case r: LogicalRDD =>
RDDScanExec(r.output, r.rdd, "ExistingRDD", r.outputPartitioning, r.outputOrdering) :: Nil
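For reference, here is the shape of the Catalyst signature change this diff builds on, as a self-contained sketch. The stub `Expression` and `LogicalPlan` traits stand in for the real Catalyst classes and exist only to make the snippet compile; the field names follow the diff above.

```scala
object SignatureSketch {
  sealed trait Expression
  sealed trait LogicalPlan

  // Before SPARK-19658 the count was optional and BasicOperators filled it
  // in with nPartitions.getOrElse(numPartitions) at planning time:
  //   case class RepartitionByExpression(
  //       partitionExpressions: Seq[Expression],
  //       child: LogicalPlan,
  //       numPartitions: Option[Int] = None)

  // After: the parser and Dataset API always supply a concrete count, so
  // the strategy can pass numPartitions to HashPartitioning directly.
  final case class RepartitionByExpression(
      partitionExpressions: Seq[Expression],
      child: LogicalPlan,
      numPartitions: Int)
}
```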