From 5c8a0ec99bded2271481f8d6cf5443fea5da4bbd Mon Sep 17 00:00:00 2001 From: Reynold Xin Date: Sat, 23 Apr 2016 12:44:00 -0700 Subject: [SPARK-14872][SQL] Restructure command package ## What changes were proposed in this pull request? This patch restructures sql.execution.command package to break the commands into multiple files, in some logical organization: databases, tables, views, functions. I also renamed basicOperators.scala to basicLogicalOperators.scala and basicPhysicalOperators.scala. ## How was this patch tested? N/A - all I did was moving code around. Author: Reynold Xin Closes #12636 from rxin/SPARK-14872. --- .../plans/logical/basicLogicalOperators.scala | 709 +++++++++++++++++++++ .../catalyst/plans/logical/basicOperators.scala | 709 --------------------- .../spark/sql/execution/basicOperators.scala | 530 --------------- .../sql/execution/basicPhysicalOperators.scala | 530 +++++++++++++++ .../apache/spark/sql/execution/command/cache.scala | 70 ++ .../spark/sql/execution/command/commands.scala | 264 +------- .../spark/sql/execution/command/databases.scala | 64 ++ .../spark/sql/execution/command/functions.scala | 99 ++- .../spark/sql/execution/command/tables.scala | 77 ++- 9 files changed, 1556 insertions(+), 1496 deletions(-) create mode 100644 sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala delete mode 100644 sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala delete mode 100644 sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala create mode 100644 sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala create mode 100644 sql/core/src/main/scala/org/apache/spark/sql/execution/command/cache.scala create mode 100644 sql/core/src/main/scala/org/apache/spark/sql/execution/command/databases.scala (limited to 'sql') diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala new file mode 100644 index 0000000000..a445ce6947 --- /dev/null +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala @@ -0,0 +1,709 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.catalyst.plans.logical + +import scala.collection.mutable.ArrayBuffer + +import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation +import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression +import org.apache.spark.sql.catalyst.plans._ +import org.apache.spark.sql.types._ + +/** + * When planning take() or collect() operations, this special node that is inserted at the top of + * the logical plan before invoking the query planner. + * + * Rules can pattern-match on this node in order to apply transformations that only take effect + * at the top of the logical query plan. + */ +case class ReturnAnswer(child: LogicalPlan) extends UnaryNode { + override def output: Seq[Attribute] = child.output +} + +case class Project(projectList: Seq[NamedExpression], child: LogicalPlan) extends UnaryNode { + override def output: Seq[Attribute] = projectList.map(_.toAttribute) + override def maxRows: Option[Long] = child.maxRows + + override lazy val resolved: Boolean = { + val hasSpecialExpressions = projectList.exists ( _.collect { + case agg: AggregateExpression => agg + case generator: Generator => generator + case window: WindowExpression => window + }.nonEmpty + ) + + !expressions.exists(!_.resolved) && childrenResolved && !hasSpecialExpressions + } + + override def validConstraints: Set[Expression] = + child.constraints.union(getAliasedConstraints(projectList)) +} + +/** + * Applies a [[Generator]] to a stream of input rows, combining the + * output of each into a new stream of rows. This operation is similar to a `flatMap` in functional + * programming with one important additional feature, which allows the input rows to be joined with + * their output. + * + * @param generator the generator expression + * @param join when true, each output row is implicitly joined with the input tuple that produced + * it. + * @param outer when true, each input row will be output at least once, even if the output of the + * given `generator` is empty. `outer` has no effect when `join` is false. + * @param qualifier Qualifier for the attributes of generator(UDTF) + * @param generatorOutput The output schema of the Generator. + * @param child Children logical plan node + */ +case class Generate( + generator: Generator, + join: Boolean, + outer: Boolean, + qualifier: Option[String], + generatorOutput: Seq[Attribute], + child: LogicalPlan) + extends UnaryNode { + + /** The set of all attributes produced by this node. 
*/ + def generatedSet: AttributeSet = AttributeSet(generatorOutput) + + override lazy val resolved: Boolean = { + generator.resolved && + childrenResolved && + generator.elementTypes.length == generatorOutput.length && + generatorOutput.forall(_.resolved) + } + + override def producedAttributes: AttributeSet = AttributeSet(generatorOutput) + + def output: Seq[Attribute] = { + val qualified = qualifier.map(q => + // prepend the new qualifier to the existed one + generatorOutput.map(a => a.withQualifier(Some(q))) + ).getOrElse(generatorOutput) + + if (join) child.output ++ qualified else qualified + } +} + +case class Filter(condition: Expression, child: LogicalPlan) + extends UnaryNode with PredicateHelper { + override def output: Seq[Attribute] = child.output + + override def maxRows: Option[Long] = child.maxRows + + override protected def validConstraints: Set[Expression] = + child.constraints.union(splitConjunctivePredicates(condition).toSet) +} + +abstract class SetOperation(left: LogicalPlan, right: LogicalPlan) extends BinaryNode { + + protected def leftConstraints: Set[Expression] = left.constraints + + protected def rightConstraints: Set[Expression] = { + require(left.output.size == right.output.size) + val attributeRewrites = AttributeMap(right.output.zip(left.output)) + right.constraints.map(_ transform { + case a: Attribute => attributeRewrites(a) + }) + } +} + +private[sql] object SetOperation { + def unapply(p: SetOperation): Option[(LogicalPlan, LogicalPlan)] = Some((p.left, p.right)) +} + +case class Intersect(left: LogicalPlan, right: LogicalPlan) extends SetOperation(left, right) { + + def duplicateResolved: Boolean = left.outputSet.intersect(right.outputSet).isEmpty + + override def output: Seq[Attribute] = + left.output.zip(right.output).map { case (leftAttr, rightAttr) => + leftAttr.withNullability(leftAttr.nullable && rightAttr.nullable) + } + + override protected def validConstraints: Set[Expression] = + leftConstraints.union(rightConstraints) + + // Intersect are only resolved if they don't introduce ambiguous expression ids, + // since the Optimizer will convert Intersect to Join. + override lazy val resolved: Boolean = + childrenResolved && + left.output.length == right.output.length && + left.output.zip(right.output).forall { case (l, r) => l.dataType == r.dataType } && + duplicateResolved + + override def maxRows: Option[Long] = { + if (children.exists(_.maxRows.isEmpty)) { + None + } else { + Some(children.flatMap(_.maxRows).min) + } + } + + override def statistics: Statistics = { + val leftSize = left.statistics.sizeInBytes + val rightSize = right.statistics.sizeInBytes + val sizeInBytes = if (leftSize < rightSize) leftSize else rightSize + Statistics(sizeInBytes = sizeInBytes) + } +} + +case class Except(left: LogicalPlan, right: LogicalPlan) extends SetOperation(left, right) { + /** We don't use right.output because those rows get excluded from the set. */ + override def output: Seq[Attribute] = left.output + + override protected def validConstraints: Set[Expression] = leftConstraints + + override lazy val resolved: Boolean = + childrenResolved && + left.output.length == right.output.length && + left.output.zip(right.output).forall { case (l, r) => l.dataType == r.dataType } + + override def statistics: Statistics = { + Statistics(sizeInBytes = left.statistics.sizeInBytes) + } +} + +/** Factory for constructing new `Union` nodes. 
*/ +object Union { + def apply(left: LogicalPlan, right: LogicalPlan): Union = { + Union (left :: right :: Nil) + } +} + +case class Union(children: Seq[LogicalPlan]) extends LogicalPlan { + override def maxRows: Option[Long] = { + if (children.exists(_.maxRows.isEmpty)) { + None + } else { + Some(children.flatMap(_.maxRows).sum) + } + } + + // updating nullability to make all the children consistent + override def output: Seq[Attribute] = + children.map(_.output).transpose.map(attrs => + attrs.head.withNullability(attrs.exists(_.nullable))) + + override lazy val resolved: Boolean = { + // allChildrenCompatible needs to be evaluated after childrenResolved + def allChildrenCompatible: Boolean = + children.tail.forall( child => + // compare the attribute number with the first child + child.output.length == children.head.output.length && + // compare the data types with the first child + child.output.zip(children.head.output).forall { + case (l, r) => l.dataType == r.dataType } + ) + + children.length > 1 && childrenResolved && allChildrenCompatible + } + + override def statistics: Statistics = { + val sizeInBytes = children.map(_.statistics.sizeInBytes).sum + Statistics(sizeInBytes = sizeInBytes) + } + + /** + * Maps the constraints containing a given (original) sequence of attributes to those with a + * given (reference) sequence of attributes. Given the nature of union, we expect that the + * mapping between the original and reference sequences are symmetric. + */ + private def rewriteConstraints( + reference: Seq[Attribute], + original: Seq[Attribute], + constraints: Set[Expression]): Set[Expression] = { + require(reference.size == original.size) + val attributeRewrites = AttributeMap(original.zip(reference)) + constraints.map(_ transform { + case a: Attribute => attributeRewrites(a) + }) + } + + private def merge(a: Set[Expression], b: Set[Expression]): Set[Expression] = { + val common = a.intersect(b) + // The constraint with only one reference could be easily inferred as predicate + // Grouping the constraints by it's references so we can combine the constraints with same + // reference together + val othera = a.diff(common).filter(_.references.size == 1).groupBy(_.references.head) + val otherb = b.diff(common).filter(_.references.size == 1).groupBy(_.references.head) + // loose the constraints by: A1 && B1 || A2 && B2 -> (A1 || A2) && (B1 || B2) + val others = (othera.keySet intersect otherb.keySet).map { attr => + Or(othera(attr).reduceLeft(And), otherb(attr).reduceLeft(And)) + } + common ++ others + } + + override protected def validConstraints: Set[Expression] = { + children + .map(child => rewriteConstraints(children.head.output, child.output, child.constraints)) + .reduce(merge(_, _)) + } +} + +case class Join( + left: LogicalPlan, + right: LogicalPlan, + joinType: JoinType, + condition: Option[Expression]) + extends BinaryNode with PredicateHelper { + + override def output: Seq[Attribute] = { + joinType match { + case LeftExistence(_) => + left.output + case LeftOuter => + left.output ++ right.output.map(_.withNullability(true)) + case RightOuter => + left.output.map(_.withNullability(true)) ++ right.output + case FullOuter => + left.output.map(_.withNullability(true)) ++ right.output.map(_.withNullability(true)) + case _ => + left.output ++ right.output + } + } + + override protected def validConstraints: Set[Expression] = { + joinType match { + case Inner if condition.isDefined => + left.constraints + .union(right.constraints) + 
.union(splitConjunctivePredicates(condition.get).toSet) + case LeftSemi if condition.isDefined => + left.constraints + .union(splitConjunctivePredicates(condition.get).toSet) + case Inner => + left.constraints.union(right.constraints) + case LeftExistence(_) => + left.constraints + case LeftOuter => + left.constraints + case RightOuter => + right.constraints + case FullOuter => + Set.empty[Expression] + } + } + + def duplicateResolved: Boolean = left.outputSet.intersect(right.outputSet).isEmpty + + // Joins are only resolved if they don't introduce ambiguous expression ids. + // NaturalJoin should be ready for resolution only if everything else is resolved here + lazy val resolvedExceptNatural: Boolean = { + childrenResolved && + expressions.forall(_.resolved) && + duplicateResolved && + condition.forall(_.dataType == BooleanType) + } + + // if not a natural join, use `resolvedExceptNatural`. if it is a natural join or + // using join, we still need to eliminate natural or using before we mark it resolved. + override lazy val resolved: Boolean = joinType match { + case NaturalJoin(_) => false + case UsingJoin(_, _) => false + case _ => resolvedExceptNatural + } +} + +/** + * A hint for the optimizer that we should broadcast the `child` if used in a join operator. + */ +case class BroadcastHint(child: LogicalPlan) extends UnaryNode { + override def output: Seq[Attribute] = child.output + + // We manually set statistics of BroadcastHint to smallest value to make sure + // the plan wrapped by BroadcastHint will be considered to broadcast later. + override def statistics: Statistics = Statistics(sizeInBytes = 1) +} + +case class InsertIntoTable( + table: LogicalPlan, + partition: Map[String, Option[String]], + child: LogicalPlan, + overwrite: Boolean, + ifNotExists: Boolean) + extends LogicalPlan { + + override def children: Seq[LogicalPlan] = child :: Nil + override def output: Seq[Attribute] = Seq.empty + + assert(overwrite || !ifNotExists) + override lazy val resolved: Boolean = childrenResolved && child.output.zip(table.output).forall { + case (childAttr, tableAttr) => + DataType.equalsIgnoreCompatibleNullability(childAttr.dataType, tableAttr.dataType) + } +} + +/** + * A container for holding named common table expressions (CTEs) and a query plan. + * This operator will be removed during analysis and the relations will be substituted into child. + * + * @param child The final query of this CTE. + * @param cteRelations Queries that this CTE defined, + * key is the alias of the CTE definition, + * value is the CTE definition. + */ +case class With(child: LogicalPlan, cteRelations: Map[String, SubqueryAlias]) extends UnaryNode { + override def output: Seq[Attribute] = child.output +} + +case class WithWindowDefinition( + windowDefinitions: Map[String, WindowSpecDefinition], + child: LogicalPlan) extends UnaryNode { + override def output: Seq[Attribute] = child.output +} + +/** + * @param order The ordering expressions + * @param global True means global sorting apply for entire data set, + * False means sorting only apply within the partition. + * @param child Child logical plan + */ +case class Sort( + order: Seq[SortOrder], + global: Boolean, + child: LogicalPlan) extends UnaryNode { + override def output: Seq[Attribute] = child.output + override def maxRows: Option[Long] = child.maxRows +} + +/** Factory for constructing new `Range` nodes. 
*/ +object Range { + def apply(start: Long, end: Long, step: Long, numSlices: Int): Range = { + val output = StructType(StructField("id", LongType, nullable = false) :: Nil).toAttributes + new Range(start, end, step, numSlices, output) + } +} + +case class Range( + start: Long, + end: Long, + step: Long, + numSlices: Int, + output: Seq[Attribute]) extends LeafNode with MultiInstanceRelation { + require(step != 0, "step cannot be 0") + val numElements: BigInt = { + val safeStart = BigInt(start) + val safeEnd = BigInt(end) + if ((safeEnd - safeStart) % step == 0 || (safeEnd > safeStart) != (step > 0)) { + (safeEnd - safeStart) / step + } else { + // the remainder has the same sign with range, could add 1 more + (safeEnd - safeStart) / step + 1 + } + } + + override def newInstance(): Range = + Range(start, end, step, numSlices, output.map(_.newInstance())) + + override def statistics: Statistics = { + val sizeInBytes = LongType.defaultSize * numElements + Statistics( sizeInBytes = sizeInBytes ) + } +} + +case class Aggregate( + groupingExpressions: Seq[Expression], + aggregateExpressions: Seq[NamedExpression], + child: LogicalPlan) + extends UnaryNode { + + override lazy val resolved: Boolean = { + val hasWindowExpressions = aggregateExpressions.exists ( _.collect { + case window: WindowExpression => window + }.nonEmpty + ) + + !expressions.exists(!_.resolved) && childrenResolved && !hasWindowExpressions + } + + override def output: Seq[Attribute] = aggregateExpressions.map(_.toAttribute) + override def maxRows: Option[Long] = child.maxRows + + override def validConstraints: Set[Expression] = + child.constraints.union(getAliasedConstraints(aggregateExpressions)) + + override def statistics: Statistics = { + if (groupingExpressions.isEmpty) { + Statistics(sizeInBytes = 1) + } else { + super.statistics + } + } +} + +case class Window( + windowExpressions: Seq[NamedExpression], + partitionSpec: Seq[Expression], + orderSpec: Seq[SortOrder], + child: LogicalPlan) extends UnaryNode { + + override def output: Seq[Attribute] = + child.output ++ windowExpressions.map(_.toAttribute) + + def windowOutputSet: AttributeSet = AttributeSet(windowExpressions.map(_.toAttribute)) +} + +private[sql] object Expand { + /** + * Extract attribute set according to the grouping id. + * + * @param bitmask bitmask to represent the selected of the attribute sequence + * @param attrs the attributes in sequence + * @return the attributes of non selected specified via bitmask (with the bit set to 1) + */ + private def buildNonSelectAttrSet( + bitmask: Int, + attrs: Seq[Attribute]): AttributeSet = { + val nonSelect = new ArrayBuffer[Attribute]() + + var bit = attrs.length - 1 + while (bit >= 0) { + if (((bitmask >> bit) & 1) == 1) nonSelect += attrs(attrs.length - bit - 1) + bit -= 1 + } + + AttributeSet(nonSelect) + } + + /** + * Apply the all of the GroupExpressions to every input row, hence we will get + * multiple output rows for a input row. 
+ * + * @param bitmasks The bitmask set represents the grouping sets + * @param groupByAliases The aliased original group by expressions + * @param groupByAttrs The attributes of aliased group by expressions + * @param gid Attribute of the grouping id + * @param child Child operator + */ + def apply( + bitmasks: Seq[Int], + groupByAliases: Seq[Alias], + groupByAttrs: Seq[Attribute], + gid: Attribute, + child: LogicalPlan): Expand = { + // Create an array of Projections for the child projection, and replace the projections' + // expressions which equal GroupBy expressions with Literal(null), if those expressions + // are not set for this grouping set (according to the bit mask). + val projections = bitmasks.map { bitmask => + // get the non selected grouping attributes according to the bit mask + val nonSelectedGroupAttrSet = buildNonSelectAttrSet(bitmask, groupByAttrs) + + child.output ++ groupByAttrs.map { attr => + if (nonSelectedGroupAttrSet.contains(attr)) { + // if the input attribute in the Invalid Grouping Expression set of for this group + // replace it with constant null + Literal.create(null, attr.dataType) + } else { + attr + } + // groupingId is the last output, here we use the bit mask as the concrete value for it. + } :+ Literal.create(bitmask, IntegerType) + } + + // the `groupByAttrs` has different meaning in `Expand.output`, it could be the original + // grouping expression or null, so here we create new instance of it. + val output = child.output ++ groupByAttrs.map(_.newInstance) :+ gid + Expand(projections, output, Project(child.output ++ groupByAliases, child)) + } +} + +/** + * Apply a number of projections to every input row, hence we will get multiple output rows for + * a input row. + * + * @param projections to apply + * @param output of all projections. + * @param child operator. + */ +case class Expand( + projections: Seq[Seq[Expression]], + output: Seq[Attribute], + child: LogicalPlan) extends UnaryNode { + override def references: AttributeSet = + AttributeSet(projections.flatten.flatMap(_.references)) + + override def statistics: Statistics = { + val sizeInBytes = super.statistics.sizeInBytes * projections.length + Statistics(sizeInBytes = sizeInBytes) + } + + // This operator can reuse attributes (for example making them null when doing a roll up) so + // the contraints of the child may no longer be valid. + override protected def validConstraints: Set[Expression] = Set.empty[Expression] +} + +/** + * A GROUP BY clause with GROUPING SETS can generate a result set equivalent + * to generated by a UNION ALL of multiple simple GROUP BY clauses. + * + * We will transform GROUPING SETS into logical plan Aggregate(.., Expand) in Analyzer + * + * @param bitmasks A list of bitmasks, each of the bitmask indicates the selected + * GroupBy expressions + * @param groupByExprs The Group By expressions candidates, take effective only if the + * associated bit in the bitmask set to 1. + * @param child Child operator + * @param aggregations The Aggregation expressions, those non selected group by expressions + * will be considered as constant null if it appears in the expressions + */ +case class GroupingSets( + bitmasks: Seq[Int], + groupByExprs: Seq[Expression], + child: LogicalPlan, + aggregations: Seq[NamedExpression]) extends UnaryNode { + + override def output: Seq[Attribute] = aggregations.map(_.toAttribute) + + // Needs to be unresolved before its translated to Aggregate + Expand because output attributes + // will change in analysis. 
+ override lazy val resolved: Boolean = false +} + +case class Pivot( + groupByExprs: Seq[NamedExpression], + pivotColumn: Expression, + pivotValues: Seq[Literal], + aggregates: Seq[Expression], + child: LogicalPlan) extends UnaryNode { + override def output: Seq[Attribute] = groupByExprs.map(_.toAttribute) ++ aggregates match { + case agg :: Nil => pivotValues.map(value => AttributeReference(value.toString, agg.dataType)()) + case _ => pivotValues.flatMap{ value => + aggregates.map(agg => AttributeReference(value + "_" + agg.sql, agg.dataType)()) + } + } +} + +object Limit { + def apply(limitExpr: Expression, child: LogicalPlan): UnaryNode = { + GlobalLimit(limitExpr, LocalLimit(limitExpr, child)) + } + + def unapply(p: GlobalLimit): Option[(Expression, LogicalPlan)] = { + p match { + case GlobalLimit(le1, LocalLimit(le2, child)) if le1 == le2 => Some((le1, child)) + case _ => None + } + } +} + +case class GlobalLimit(limitExpr: Expression, child: LogicalPlan) extends UnaryNode { + override def output: Seq[Attribute] = child.output + override def maxRows: Option[Long] = { + limitExpr match { + case IntegerLiteral(limit) => Some(limit) + case _ => None + } + } + override lazy val statistics: Statistics = { + val limit = limitExpr.eval().asInstanceOf[Int] + val sizeInBytes = (limit: Long) * output.map(a => a.dataType.defaultSize).sum + Statistics(sizeInBytes = sizeInBytes) + } +} + +case class LocalLimit(limitExpr: Expression, child: LogicalPlan) extends UnaryNode { + override def output: Seq[Attribute] = child.output + override def maxRows: Option[Long] = { + limitExpr match { + case IntegerLiteral(limit) => Some(limit) + case _ => None + } + } + override lazy val statistics: Statistics = { + val limit = limitExpr.eval().asInstanceOf[Int] + val sizeInBytes = (limit: Long) * output.map(a => a.dataType.defaultSize).sum + Statistics(sizeInBytes = sizeInBytes) + } +} + +case class SubqueryAlias(alias: String, child: LogicalPlan) extends UnaryNode { + + override def output: Seq[Attribute] = child.output.map(_.withQualifier(Some(alias))) +} + +/** + * Sample the dataset. + * + * @param lowerBound Lower-bound of the sampling probability (usually 0.0) + * @param upperBound Upper-bound of the sampling probability. The expected fraction sampled + * will be ub - lb. + * @param withReplacement Whether to sample with replacement. + * @param seed the random seed + * @param child the LogicalPlan + * @param isTableSample Is created from TABLESAMPLE in the parser. + */ +case class Sample( + lowerBound: Double, + upperBound: Double, + withReplacement: Boolean, + seed: Long, + child: LogicalPlan)( + val isTableSample: java.lang.Boolean = false) extends UnaryNode { + + override def output: Seq[Attribute] = child.output + + override def statistics: Statistics = { + val ratio = upperBound - lowerBound + // BigInt can't multiply with Double + var sizeInBytes = child.statistics.sizeInBytes * (ratio * 100).toInt / 100 + if (sizeInBytes == 0) { + sizeInBytes = 1 + } + Statistics(sizeInBytes = sizeInBytes) + } + + override protected def otherCopyArgs: Seq[AnyRef] = isTableSample :: Nil +} + +/** + * Returns a new logical plan that dedups input rows. + */ +case class Distinct(child: LogicalPlan) extends UnaryNode { + override def maxRows: Option[Long] = child.maxRows + override def output: Seq[Attribute] = child.output +} + +/** + * Returns a new RDD that has exactly `numPartitions` partitions. 
Differs from + * [[RepartitionByExpression]] as this method is called directly by DataFrame's, because the user + * asked for `coalesce` or `repartition`. [[RepartitionByExpression]] is used when the consumer + * of the output requires some specific ordering or distribution of the data. + */ +case class Repartition(numPartitions: Int, shuffle: Boolean, child: LogicalPlan) + extends UnaryNode { + override def output: Seq[Attribute] = child.output +} + +/** + * A relation with one row. This is used in "SELECT ..." without a from clause. + */ +case object OneRowRelation extends LeafNode { + override def maxRows: Option[Long] = Some(1) + override def output: Seq[Attribute] = Nil + + /** + * Computes [[Statistics]] for this plan. The default implementation assumes the output + * cardinality is the product of of all child plan's cardinality, i.e. applies in the case + * of cartesian joins. + * + * [[LeafNode]]s must override this. + */ + override def statistics: Statistics = Statistics(sizeInBytes = 1) +} diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala deleted file mode 100644 index a445ce6947..0000000000 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala +++ /dev/null @@ -1,709 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.sql.catalyst.plans.logical - -import scala.collection.mutable.ArrayBuffer - -import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation -import org.apache.spark.sql.catalyst.expressions._ -import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression -import org.apache.spark.sql.catalyst.plans._ -import org.apache.spark.sql.types._ - -/** - * When planning take() or collect() operations, this special node that is inserted at the top of - * the logical plan before invoking the query planner. - * - * Rules can pattern-match on this node in order to apply transformations that only take effect - * at the top of the logical query plan. 
- */ -case class ReturnAnswer(child: LogicalPlan) extends UnaryNode { - override def output: Seq[Attribute] = child.output -} - -case class Project(projectList: Seq[NamedExpression], child: LogicalPlan) extends UnaryNode { - override def output: Seq[Attribute] = projectList.map(_.toAttribute) - override def maxRows: Option[Long] = child.maxRows - - override lazy val resolved: Boolean = { - val hasSpecialExpressions = projectList.exists ( _.collect { - case agg: AggregateExpression => agg - case generator: Generator => generator - case window: WindowExpression => window - }.nonEmpty - ) - - !expressions.exists(!_.resolved) && childrenResolved && !hasSpecialExpressions - } - - override def validConstraints: Set[Expression] = - child.constraints.union(getAliasedConstraints(projectList)) -} - -/** - * Applies a [[Generator]] to a stream of input rows, combining the - * output of each into a new stream of rows. This operation is similar to a `flatMap` in functional - * programming with one important additional feature, which allows the input rows to be joined with - * their output. - * - * @param generator the generator expression - * @param join when true, each output row is implicitly joined with the input tuple that produced - * it. - * @param outer when true, each input row will be output at least once, even if the output of the - * given `generator` is empty. `outer` has no effect when `join` is false. - * @param qualifier Qualifier for the attributes of generator(UDTF) - * @param generatorOutput The output schema of the Generator. - * @param child Children logical plan node - */ -case class Generate( - generator: Generator, - join: Boolean, - outer: Boolean, - qualifier: Option[String], - generatorOutput: Seq[Attribute], - child: LogicalPlan) - extends UnaryNode { - - /** The set of all attributes produced by this node. 
*/ - def generatedSet: AttributeSet = AttributeSet(generatorOutput) - - override lazy val resolved: Boolean = { - generator.resolved && - childrenResolved && - generator.elementTypes.length == generatorOutput.length && - generatorOutput.forall(_.resolved) - } - - override def producedAttributes: AttributeSet = AttributeSet(generatorOutput) - - def output: Seq[Attribute] = { - val qualified = qualifier.map(q => - // prepend the new qualifier to the existed one - generatorOutput.map(a => a.withQualifier(Some(q))) - ).getOrElse(generatorOutput) - - if (join) child.output ++ qualified else qualified - } -} - -case class Filter(condition: Expression, child: LogicalPlan) - extends UnaryNode with PredicateHelper { - override def output: Seq[Attribute] = child.output - - override def maxRows: Option[Long] = child.maxRows - - override protected def validConstraints: Set[Expression] = - child.constraints.union(splitConjunctivePredicates(condition).toSet) -} - -abstract class SetOperation(left: LogicalPlan, right: LogicalPlan) extends BinaryNode { - - protected def leftConstraints: Set[Expression] = left.constraints - - protected def rightConstraints: Set[Expression] = { - require(left.output.size == right.output.size) - val attributeRewrites = AttributeMap(right.output.zip(left.output)) - right.constraints.map(_ transform { - case a: Attribute => attributeRewrites(a) - }) - } -} - -private[sql] object SetOperation { - def unapply(p: SetOperation): Option[(LogicalPlan, LogicalPlan)] = Some((p.left, p.right)) -} - -case class Intersect(left: LogicalPlan, right: LogicalPlan) extends SetOperation(left, right) { - - def duplicateResolved: Boolean = left.outputSet.intersect(right.outputSet).isEmpty - - override def output: Seq[Attribute] = - left.output.zip(right.output).map { case (leftAttr, rightAttr) => - leftAttr.withNullability(leftAttr.nullable && rightAttr.nullable) - } - - override protected def validConstraints: Set[Expression] = - leftConstraints.union(rightConstraints) - - // Intersect are only resolved if they don't introduce ambiguous expression ids, - // since the Optimizer will convert Intersect to Join. - override lazy val resolved: Boolean = - childrenResolved && - left.output.length == right.output.length && - left.output.zip(right.output).forall { case (l, r) => l.dataType == r.dataType } && - duplicateResolved - - override def maxRows: Option[Long] = { - if (children.exists(_.maxRows.isEmpty)) { - None - } else { - Some(children.flatMap(_.maxRows).min) - } - } - - override def statistics: Statistics = { - val leftSize = left.statistics.sizeInBytes - val rightSize = right.statistics.sizeInBytes - val sizeInBytes = if (leftSize < rightSize) leftSize else rightSize - Statistics(sizeInBytes = sizeInBytes) - } -} - -case class Except(left: LogicalPlan, right: LogicalPlan) extends SetOperation(left, right) { - /** We don't use right.output because those rows get excluded from the set. */ - override def output: Seq[Attribute] = left.output - - override protected def validConstraints: Set[Expression] = leftConstraints - - override lazy val resolved: Boolean = - childrenResolved && - left.output.length == right.output.length && - left.output.zip(right.output).forall { case (l, r) => l.dataType == r.dataType } - - override def statistics: Statistics = { - Statistics(sizeInBytes = left.statistics.sizeInBytes) - } -} - -/** Factory for constructing new `Union` nodes. 
*/ -object Union { - def apply(left: LogicalPlan, right: LogicalPlan): Union = { - Union (left :: right :: Nil) - } -} - -case class Union(children: Seq[LogicalPlan]) extends LogicalPlan { - override def maxRows: Option[Long] = { - if (children.exists(_.maxRows.isEmpty)) { - None - } else { - Some(children.flatMap(_.maxRows).sum) - } - } - - // updating nullability to make all the children consistent - override def output: Seq[Attribute] = - children.map(_.output).transpose.map(attrs => - attrs.head.withNullability(attrs.exists(_.nullable))) - - override lazy val resolved: Boolean = { - // allChildrenCompatible needs to be evaluated after childrenResolved - def allChildrenCompatible: Boolean = - children.tail.forall( child => - // compare the attribute number with the first child - child.output.length == children.head.output.length && - // compare the data types with the first child - child.output.zip(children.head.output).forall { - case (l, r) => l.dataType == r.dataType } - ) - - children.length > 1 && childrenResolved && allChildrenCompatible - } - - override def statistics: Statistics = { - val sizeInBytes = children.map(_.statistics.sizeInBytes).sum - Statistics(sizeInBytes = sizeInBytes) - } - - /** - * Maps the constraints containing a given (original) sequence of attributes to those with a - * given (reference) sequence of attributes. Given the nature of union, we expect that the - * mapping between the original and reference sequences are symmetric. - */ - private def rewriteConstraints( - reference: Seq[Attribute], - original: Seq[Attribute], - constraints: Set[Expression]): Set[Expression] = { - require(reference.size == original.size) - val attributeRewrites = AttributeMap(original.zip(reference)) - constraints.map(_ transform { - case a: Attribute => attributeRewrites(a) - }) - } - - private def merge(a: Set[Expression], b: Set[Expression]): Set[Expression] = { - val common = a.intersect(b) - // The constraint with only one reference could be easily inferred as predicate - // Grouping the constraints by it's references so we can combine the constraints with same - // reference together - val othera = a.diff(common).filter(_.references.size == 1).groupBy(_.references.head) - val otherb = b.diff(common).filter(_.references.size == 1).groupBy(_.references.head) - // loose the constraints by: A1 && B1 || A2 && B2 -> (A1 || A2) && (B1 || B2) - val others = (othera.keySet intersect otherb.keySet).map { attr => - Or(othera(attr).reduceLeft(And), otherb(attr).reduceLeft(And)) - } - common ++ others - } - - override protected def validConstraints: Set[Expression] = { - children - .map(child => rewriteConstraints(children.head.output, child.output, child.constraints)) - .reduce(merge(_, _)) - } -} - -case class Join( - left: LogicalPlan, - right: LogicalPlan, - joinType: JoinType, - condition: Option[Expression]) - extends BinaryNode with PredicateHelper { - - override def output: Seq[Attribute] = { - joinType match { - case LeftExistence(_) => - left.output - case LeftOuter => - left.output ++ right.output.map(_.withNullability(true)) - case RightOuter => - left.output.map(_.withNullability(true)) ++ right.output - case FullOuter => - left.output.map(_.withNullability(true)) ++ right.output.map(_.withNullability(true)) - case _ => - left.output ++ right.output - } - } - - override protected def validConstraints: Set[Expression] = { - joinType match { - case Inner if condition.isDefined => - left.constraints - .union(right.constraints) - 
.union(splitConjunctivePredicates(condition.get).toSet) - case LeftSemi if condition.isDefined => - left.constraints - .union(splitConjunctivePredicates(condition.get).toSet) - case Inner => - left.constraints.union(right.constraints) - case LeftExistence(_) => - left.constraints - case LeftOuter => - left.constraints - case RightOuter => - right.constraints - case FullOuter => - Set.empty[Expression] - } - } - - def duplicateResolved: Boolean = left.outputSet.intersect(right.outputSet).isEmpty - - // Joins are only resolved if they don't introduce ambiguous expression ids. - // NaturalJoin should be ready for resolution only if everything else is resolved here - lazy val resolvedExceptNatural: Boolean = { - childrenResolved && - expressions.forall(_.resolved) && - duplicateResolved && - condition.forall(_.dataType == BooleanType) - } - - // if not a natural join, use `resolvedExceptNatural`. if it is a natural join or - // using join, we still need to eliminate natural or using before we mark it resolved. - override lazy val resolved: Boolean = joinType match { - case NaturalJoin(_) => false - case UsingJoin(_, _) => false - case _ => resolvedExceptNatural - } -} - -/** - * A hint for the optimizer that we should broadcast the `child` if used in a join operator. - */ -case class BroadcastHint(child: LogicalPlan) extends UnaryNode { - override def output: Seq[Attribute] = child.output - - // We manually set statistics of BroadcastHint to smallest value to make sure - // the plan wrapped by BroadcastHint will be considered to broadcast later. - override def statistics: Statistics = Statistics(sizeInBytes = 1) -} - -case class InsertIntoTable( - table: LogicalPlan, - partition: Map[String, Option[String]], - child: LogicalPlan, - overwrite: Boolean, - ifNotExists: Boolean) - extends LogicalPlan { - - override def children: Seq[LogicalPlan] = child :: Nil - override def output: Seq[Attribute] = Seq.empty - - assert(overwrite || !ifNotExists) - override lazy val resolved: Boolean = childrenResolved && child.output.zip(table.output).forall { - case (childAttr, tableAttr) => - DataType.equalsIgnoreCompatibleNullability(childAttr.dataType, tableAttr.dataType) - } -} - -/** - * A container for holding named common table expressions (CTEs) and a query plan. - * This operator will be removed during analysis and the relations will be substituted into child. - * - * @param child The final query of this CTE. - * @param cteRelations Queries that this CTE defined, - * key is the alias of the CTE definition, - * value is the CTE definition. - */ -case class With(child: LogicalPlan, cteRelations: Map[String, SubqueryAlias]) extends UnaryNode { - override def output: Seq[Attribute] = child.output -} - -case class WithWindowDefinition( - windowDefinitions: Map[String, WindowSpecDefinition], - child: LogicalPlan) extends UnaryNode { - override def output: Seq[Attribute] = child.output -} - -/** - * @param order The ordering expressions - * @param global True means global sorting apply for entire data set, - * False means sorting only apply within the partition. - * @param child Child logical plan - */ -case class Sort( - order: Seq[SortOrder], - global: Boolean, - child: LogicalPlan) extends UnaryNode { - override def output: Seq[Attribute] = child.output - override def maxRows: Option[Long] = child.maxRows -} - -/** Factory for constructing new `Range` nodes. 
*/ -object Range { - def apply(start: Long, end: Long, step: Long, numSlices: Int): Range = { - val output = StructType(StructField("id", LongType, nullable = false) :: Nil).toAttributes - new Range(start, end, step, numSlices, output) - } -} - -case class Range( - start: Long, - end: Long, - step: Long, - numSlices: Int, - output: Seq[Attribute]) extends LeafNode with MultiInstanceRelation { - require(step != 0, "step cannot be 0") - val numElements: BigInt = { - val safeStart = BigInt(start) - val safeEnd = BigInt(end) - if ((safeEnd - safeStart) % step == 0 || (safeEnd > safeStart) != (step > 0)) { - (safeEnd - safeStart) / step - } else { - // the remainder has the same sign with range, could add 1 more - (safeEnd - safeStart) / step + 1 - } - } - - override def newInstance(): Range = - Range(start, end, step, numSlices, output.map(_.newInstance())) - - override def statistics: Statistics = { - val sizeInBytes = LongType.defaultSize * numElements - Statistics( sizeInBytes = sizeInBytes ) - } -} - -case class Aggregate( - groupingExpressions: Seq[Expression], - aggregateExpressions: Seq[NamedExpression], - child: LogicalPlan) - extends UnaryNode { - - override lazy val resolved: Boolean = { - val hasWindowExpressions = aggregateExpressions.exists ( _.collect { - case window: WindowExpression => window - }.nonEmpty - ) - - !expressions.exists(!_.resolved) && childrenResolved && !hasWindowExpressions - } - - override def output: Seq[Attribute] = aggregateExpressions.map(_.toAttribute) - override def maxRows: Option[Long] = child.maxRows - - override def validConstraints: Set[Expression] = - child.constraints.union(getAliasedConstraints(aggregateExpressions)) - - override def statistics: Statistics = { - if (groupingExpressions.isEmpty) { - Statistics(sizeInBytes = 1) - } else { - super.statistics - } - } -} - -case class Window( - windowExpressions: Seq[NamedExpression], - partitionSpec: Seq[Expression], - orderSpec: Seq[SortOrder], - child: LogicalPlan) extends UnaryNode { - - override def output: Seq[Attribute] = - child.output ++ windowExpressions.map(_.toAttribute) - - def windowOutputSet: AttributeSet = AttributeSet(windowExpressions.map(_.toAttribute)) -} - -private[sql] object Expand { - /** - * Extract attribute set according to the grouping id. - * - * @param bitmask bitmask to represent the selected of the attribute sequence - * @param attrs the attributes in sequence - * @return the attributes of non selected specified via bitmask (with the bit set to 1) - */ - private def buildNonSelectAttrSet( - bitmask: Int, - attrs: Seq[Attribute]): AttributeSet = { - val nonSelect = new ArrayBuffer[Attribute]() - - var bit = attrs.length - 1 - while (bit >= 0) { - if (((bitmask >> bit) & 1) == 1) nonSelect += attrs(attrs.length - bit - 1) - bit -= 1 - } - - AttributeSet(nonSelect) - } - - /** - * Apply the all of the GroupExpressions to every input row, hence we will get - * multiple output rows for a input row. 
- * - * @param bitmasks The bitmask set represents the grouping sets - * @param groupByAliases The aliased original group by expressions - * @param groupByAttrs The attributes of aliased group by expressions - * @param gid Attribute of the grouping id - * @param child Child operator - */ - def apply( - bitmasks: Seq[Int], - groupByAliases: Seq[Alias], - groupByAttrs: Seq[Attribute], - gid: Attribute, - child: LogicalPlan): Expand = { - // Create an array of Projections for the child projection, and replace the projections' - // expressions which equal GroupBy expressions with Literal(null), if those expressions - // are not set for this grouping set (according to the bit mask). - val projections = bitmasks.map { bitmask => - // get the non selected grouping attributes according to the bit mask - val nonSelectedGroupAttrSet = buildNonSelectAttrSet(bitmask, groupByAttrs) - - child.output ++ groupByAttrs.map { attr => - if (nonSelectedGroupAttrSet.contains(attr)) { - // if the input attribute in the Invalid Grouping Expression set of for this group - // replace it with constant null - Literal.create(null, attr.dataType) - } else { - attr - } - // groupingId is the last output, here we use the bit mask as the concrete value for it. - } :+ Literal.create(bitmask, IntegerType) - } - - // the `groupByAttrs` has different meaning in `Expand.output`, it could be the original - // grouping expression or null, so here we create new instance of it. - val output = child.output ++ groupByAttrs.map(_.newInstance) :+ gid - Expand(projections, output, Project(child.output ++ groupByAliases, child)) - } -} - -/** - * Apply a number of projections to every input row, hence we will get multiple output rows for - * a input row. - * - * @param projections to apply - * @param output of all projections. - * @param child operator. - */ -case class Expand( - projections: Seq[Seq[Expression]], - output: Seq[Attribute], - child: LogicalPlan) extends UnaryNode { - override def references: AttributeSet = - AttributeSet(projections.flatten.flatMap(_.references)) - - override def statistics: Statistics = { - val sizeInBytes = super.statistics.sizeInBytes * projections.length - Statistics(sizeInBytes = sizeInBytes) - } - - // This operator can reuse attributes (for example making them null when doing a roll up) so - // the contraints of the child may no longer be valid. - override protected def validConstraints: Set[Expression] = Set.empty[Expression] -} - -/** - * A GROUP BY clause with GROUPING SETS can generate a result set equivalent - * to generated by a UNION ALL of multiple simple GROUP BY clauses. - * - * We will transform GROUPING SETS into logical plan Aggregate(.., Expand) in Analyzer - * - * @param bitmasks A list of bitmasks, each of the bitmask indicates the selected - * GroupBy expressions - * @param groupByExprs The Group By expressions candidates, take effective only if the - * associated bit in the bitmask set to 1. - * @param child Child operator - * @param aggregations The Aggregation expressions, those non selected group by expressions - * will be considered as constant null if it appears in the expressions - */ -case class GroupingSets( - bitmasks: Seq[Int], - groupByExprs: Seq[Expression], - child: LogicalPlan, - aggregations: Seq[NamedExpression]) extends UnaryNode { - - override def output: Seq[Attribute] = aggregations.map(_.toAttribute) - - // Needs to be unresolved before its translated to Aggregate + Expand because output attributes - // will change in analysis. 
- override lazy val resolved: Boolean = false -} - -case class Pivot( - groupByExprs: Seq[NamedExpression], - pivotColumn: Expression, - pivotValues: Seq[Literal], - aggregates: Seq[Expression], - child: LogicalPlan) extends UnaryNode { - override def output: Seq[Attribute] = groupByExprs.map(_.toAttribute) ++ aggregates match { - case agg :: Nil => pivotValues.map(value => AttributeReference(value.toString, agg.dataType)()) - case _ => pivotValues.flatMap{ value => - aggregates.map(agg => AttributeReference(value + "_" + agg.sql, agg.dataType)()) - } - } -} - -object Limit { - def apply(limitExpr: Expression, child: LogicalPlan): UnaryNode = { - GlobalLimit(limitExpr, LocalLimit(limitExpr, child)) - } - - def unapply(p: GlobalLimit): Option[(Expression, LogicalPlan)] = { - p match { - case GlobalLimit(le1, LocalLimit(le2, child)) if le1 == le2 => Some((le1, child)) - case _ => None - } - } -} - -case class GlobalLimit(limitExpr: Expression, child: LogicalPlan) extends UnaryNode { - override def output: Seq[Attribute] = child.output - override def maxRows: Option[Long] = { - limitExpr match { - case IntegerLiteral(limit) => Some(limit) - case _ => None - } - } - override lazy val statistics: Statistics = { - val limit = limitExpr.eval().asInstanceOf[Int] - val sizeInBytes = (limit: Long) * output.map(a => a.dataType.defaultSize).sum - Statistics(sizeInBytes = sizeInBytes) - } -} - -case class LocalLimit(limitExpr: Expression, child: LogicalPlan) extends UnaryNode { - override def output: Seq[Attribute] = child.output - override def maxRows: Option[Long] = { - limitExpr match { - case IntegerLiteral(limit) => Some(limit) - case _ => None - } - } - override lazy val statistics: Statistics = { - val limit = limitExpr.eval().asInstanceOf[Int] - val sizeInBytes = (limit: Long) * output.map(a => a.dataType.defaultSize).sum - Statistics(sizeInBytes = sizeInBytes) - } -} - -case class SubqueryAlias(alias: String, child: LogicalPlan) extends UnaryNode { - - override def output: Seq[Attribute] = child.output.map(_.withQualifier(Some(alias))) -} - -/** - * Sample the dataset. - * - * @param lowerBound Lower-bound of the sampling probability (usually 0.0) - * @param upperBound Upper-bound of the sampling probability. The expected fraction sampled - * will be ub - lb. - * @param withReplacement Whether to sample with replacement. - * @param seed the random seed - * @param child the LogicalPlan - * @param isTableSample Is created from TABLESAMPLE in the parser. - */ -case class Sample( - lowerBound: Double, - upperBound: Double, - withReplacement: Boolean, - seed: Long, - child: LogicalPlan)( - val isTableSample: java.lang.Boolean = false) extends UnaryNode { - - override def output: Seq[Attribute] = child.output - - override def statistics: Statistics = { - val ratio = upperBound - lowerBound - // BigInt can't multiply with Double - var sizeInBytes = child.statistics.sizeInBytes * (ratio * 100).toInt / 100 - if (sizeInBytes == 0) { - sizeInBytes = 1 - } - Statistics(sizeInBytes = sizeInBytes) - } - - override protected def otherCopyArgs: Seq[AnyRef] = isTableSample :: Nil -} - -/** - * Returns a new logical plan that dedups input rows. - */ -case class Distinct(child: LogicalPlan) extends UnaryNode { - override def maxRows: Option[Long] = child.maxRows - override def output: Seq[Attribute] = child.output -} - -/** - * Returns a new RDD that has exactly `numPartitions` partitions. 
Differs from - * [[RepartitionByExpression]] as this method is called directly by DataFrame's, because the user - * asked for `coalesce` or `repartition`. [[RepartitionByExpression]] is used when the consumer - * of the output requires some specific ordering or distribution of the data. - */ -case class Repartition(numPartitions: Int, shuffle: Boolean, child: LogicalPlan) - extends UnaryNode { - override def output: Seq[Attribute] = child.output -} - -/** - * A relation with one row. This is used in "SELECT ..." without a from clause. - */ -case object OneRowRelation extends LeafNode { - override def maxRows: Option[Long] = Some(1) - override def output: Seq[Attribute] = Nil - - /** - * Computes [[Statistics]] for this plan. The default implementation assumes the output - * cardinality is the product of of all child plan's cardinality, i.e. applies in the case - * of cartesian joins. - * - * [[LeafNode]]s must override this. - */ - override def statistics: Statistics = Statistics(sizeInBytes = 1) -} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala deleted file mode 100644 index 83f527f555..0000000000 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala +++ /dev/null @@ -1,530 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.sql.execution - -import org.apache.spark.rdd.{PartitionwiseSampledRDD, RDD} -import org.apache.spark.sql.catalyst.InternalRow -import org.apache.spark.sql.catalyst.expressions._ -import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, ExprCode, ExpressionCanonicalizer} -import org.apache.spark.sql.catalyst.plans.physical._ -import org.apache.spark.sql.execution.metric.SQLMetrics -import org.apache.spark.sql.types.LongType -import org.apache.spark.util.random.{BernoulliCellSampler, PoissonSampler} - -/** Physical plan for Project. */ -case class ProjectExec(projectList: Seq[NamedExpression], child: SparkPlan) - extends UnaryExecNode with CodegenSupport { - - override def output: Seq[Attribute] = projectList.map(_.toAttribute) - - override def inputRDDs(): Seq[RDD[InternalRow]] = { - child.asInstanceOf[CodegenSupport].inputRDDs() - } - - protected override def doProduce(ctx: CodegenContext): String = { - child.asInstanceOf[CodegenSupport].produce(ctx, this) - } - - override def usedInputs: AttributeSet = { - // only the attributes those are used at least twice should be evaluated before this plan, - // otherwise we could defer the evaluation until output attribute is actually used. 
- val usedExprIds = projectList.flatMap(_.collect { - case a: Attribute => a.exprId - }) - val usedMoreThanOnce = usedExprIds.groupBy(id => id).filter(_._2.size > 1).keySet - references.filter(a => usedMoreThanOnce.contains(a.exprId)) - } - - override def doConsume(ctx: CodegenContext, input: Seq[ExprCode], row: ExprCode): String = { - val exprs = projectList.map(x => - ExpressionCanonicalizer.execute(BindReferences.bindReference(x, child.output))) - ctx.currentVars = input - val resultVars = exprs.map(_.genCode(ctx)) - // Evaluation of non-deterministic expressions can't be deferred. - val nonDeterministicAttrs = projectList.filterNot(_.deterministic).map(_.toAttribute) - s""" - |${evaluateRequiredVariables(output, resultVars, AttributeSet(nonDeterministicAttrs))} - |${consume(ctx, resultVars)} - """.stripMargin - } - - protected override def doExecute(): RDD[InternalRow] = { - child.execute().mapPartitionsInternal { iter => - val project = UnsafeProjection.create(projectList, child.output, - subexpressionEliminationEnabled) - iter.map(project) - } - } - - override def outputOrdering: Seq[SortOrder] = child.outputOrdering -} - - -/** Physical plan for Filter. */ -case class FilterExec(condition: Expression, child: SparkPlan) - extends UnaryExecNode with CodegenSupport with PredicateHelper { - - // Split out all the IsNotNulls from condition. - private val (notNullPreds, otherPreds) = splitConjunctivePredicates(condition).partition { - case IsNotNull(a: NullIntolerant) if a.references.subsetOf(child.outputSet) => true - case _ => false - } - - // The columns that will filtered out by `IsNotNull` could be considered as not nullable. - private val notNullAttributes = notNullPreds.flatMap(_.references).distinct.map(_.exprId) - - // Mark this as empty. We'll evaluate the input during doConsume(). We don't want to evaluate - // all the variables at the beginning to take advantage of short circuiting. - override def usedInputs: AttributeSet = AttributeSet.empty - - override def output: Seq[Attribute] = { - child.output.map { a => - if (a.nullable && notNullAttributes.contains(a.exprId)) { - a.withNullability(false) - } else { - a - } - } - } - - private[sql] override lazy val metrics = Map( - "numOutputRows" -> SQLMetrics.createLongMetric(sparkContext, "number of output rows")) - - override def inputRDDs(): Seq[RDD[InternalRow]] = { - child.asInstanceOf[CodegenSupport].inputRDDs() - } - - protected override def doProduce(ctx: CodegenContext): String = { - child.asInstanceOf[CodegenSupport].produce(ctx, this) - } - - override def doConsume(ctx: CodegenContext, input: Seq[ExprCode], row: ExprCode): String = { - val numOutput = metricTerm(ctx, "numOutputRows") - - /** - * Generates code for `c`, using `in` for input attributes and `attrs` for nullability. - */ - def genPredicate(c: Expression, in: Seq[ExprCode], attrs: Seq[Attribute]): String = { - val bound = BindReferences.bindReference(c, attrs) - val evaluated = evaluateRequiredVariables(child.output, in, c.references) - - // Generate the code for the predicate. - val ev = ExpressionCanonicalizer.execute(bound).genCode(ctx) - val nullCheck = if (bound.nullable) { - s"${ev.isNull} || " - } else { - s"" - } - - s""" - |$evaluated - |${ev.code} - |if (${nullCheck}!${ev.value}) continue; - """.stripMargin - } - - ctx.currentVars = input - - // To generate the predicates we will follow this algorithm. - // For each predicate that is not IsNotNull, we will generate them one by one loading attributes - // as necessary. 
For each of both attributes, if there is a IsNotNull predicate we will generate - // that check *before* the predicate. After all of these predicates, we will generate the - // remaining IsNotNull checks that were not part of other predicates. - // This has the property of not doing redundant IsNotNull checks and taking better advantage of - // short-circuiting, not loading attributes until they are needed. - // This is very perf sensitive. - // TODO: revisit this. We can consider reordering predicates as well. - val generatedIsNotNullChecks = new Array[Boolean](notNullPreds.length) - val generated = otherPreds.map { c => - val nullChecks = c.references.map { r => - val idx = notNullPreds.indexWhere { n => n.asInstanceOf[IsNotNull].child.semanticEquals(r)} - if (idx != -1 && !generatedIsNotNullChecks(idx)) { - generatedIsNotNullChecks(idx) = true - // Use the child's output. The nullability is what the child produced. - genPredicate(notNullPreds(idx), input, child.output) - } else { - "" - } - }.mkString("\n").trim - - // Here we use *this* operator's output with this output's nullability since we already - // enforced them with the IsNotNull checks above. - s""" - |$nullChecks - |${genPredicate(c, input, output)} - """.stripMargin.trim - }.mkString("\n") - - val nullChecks = notNullPreds.zipWithIndex.map { case (c, idx) => - if (!generatedIsNotNullChecks(idx)) { - genPredicate(c, input, child.output) - } else { - "" - } - }.mkString("\n") - - // Reset the isNull to false for the not-null columns, then the followed operators could - // generate better code (remove dead branches). - val resultVars = input.zipWithIndex.map { case (ev, i) => - if (notNullAttributes.contains(child.output(i).exprId)) { - ev.isNull = "false" - } - ev - } - - s""" - |$generated - |$nullChecks - |$numOutput.add(1); - |${consume(ctx, resultVars)} - """.stripMargin - } - - protected override def doExecute(): RDD[InternalRow] = { - val numOutputRows = longMetric("numOutputRows") - child.execute().mapPartitionsInternal { iter => - val predicate = newPredicate(condition, child.output) - iter.filter { row => - val r = predicate(row) - if (r) numOutputRows += 1 - r - } - } - } - - override def outputOrdering: Seq[SortOrder] = child.outputOrdering -} - -/** - * Physical plan for sampling the dataset. - * - * @param lowerBound Lower-bound of the sampling probability (usually 0.0) - * @param upperBound Upper-bound of the sampling probability. The expected fraction sampled - * will be ub - lb. - * @param withReplacement Whether to sample with replacement. - * @param seed the random seed - * @param child the SparkPlan - */ -case class SampleExec( - lowerBound: Double, - upperBound: Double, - withReplacement: Boolean, - seed: Long, - child: SparkPlan) extends UnaryExecNode with CodegenSupport { - override def output: Seq[Attribute] = child.output - - private[sql] override lazy val metrics = Map( - "numOutputRows" -> SQLMetrics.createLongMetric(sparkContext, "number of output rows")) - - protected override def doExecute(): RDD[InternalRow] = { - if (withReplacement) { - // Disable gap sampling since the gap sampling method buffers two rows internally, - // requiring us to copy the row, which is more expensive than the random number generator. 
- new PartitionwiseSampledRDD[InternalRow, InternalRow]( - child.execute(), - new PoissonSampler[InternalRow](upperBound - lowerBound, useGapSamplingIfPossible = false), - preservesPartitioning = true, - seed) - } else { - child.execute().randomSampleWithRange(lowerBound, upperBound, seed) - } - } - - override def inputRDDs(): Seq[RDD[InternalRow]] = { - child.asInstanceOf[CodegenSupport].inputRDDs() - } - - protected override def doProduce(ctx: CodegenContext): String = { - child.asInstanceOf[CodegenSupport].produce(ctx, this) - } - - override def doConsume(ctx: CodegenContext, input: Seq[ExprCode], row: ExprCode): String = { - val numOutput = metricTerm(ctx, "numOutputRows") - val sampler = ctx.freshName("sampler") - - if (withReplacement) { - val samplerClass = classOf[PoissonSampler[UnsafeRow]].getName - val initSampler = ctx.freshName("initSampler") - ctx.addMutableState(s"$samplerClass", sampler, - s"$initSampler();") - - ctx.addNewFunction(initSampler, - s""" - | private void $initSampler() { - | $sampler = new $samplerClass($upperBound - $lowerBound, false); - | java.util.Random random = new java.util.Random(${seed}L); - | long randomSeed = random.nextLong(); - | int loopCount = 0; - | while (loopCount < partitionIndex) { - | randomSeed = random.nextLong(); - | loopCount += 1; - | } - | $sampler.setSeed(randomSeed); - | } - """.stripMargin.trim) - - val samplingCount = ctx.freshName("samplingCount") - s""" - | int $samplingCount = $sampler.sample(); - | while ($samplingCount-- > 0) { - | $numOutput.add(1); - | ${consume(ctx, input)} - | } - """.stripMargin.trim - } else { - val samplerClass = classOf[BernoulliCellSampler[UnsafeRow]].getName - ctx.addMutableState(s"$samplerClass", sampler, - s""" - | $sampler = new $samplerClass($lowerBound, $upperBound, false); - | $sampler.setSeed(${seed}L + partitionIndex); - """.stripMargin.trim) - - s""" - | if ($sampler.sample() == 0) continue; - | $numOutput.add(1); - | ${consume(ctx, input)} - """.stripMargin.trim - } - } -} - - -/** - * Physical plan for range (generating a range of 64 bit numbers. - * - * @param start first number in the range, inclusive. - * @param step size of the step increment. - * @param numSlices number of partitions. - * @param numElements total number of elements to output. - * @param output output attributes. 
- */ -case class RangeExec( - start: Long, - step: Long, - numSlices: Int, - numElements: BigInt, - output: Seq[Attribute]) - extends LeafExecNode with CodegenSupport { - - private[sql] override lazy val metrics = Map( - "numOutputRows" -> SQLMetrics.createLongMetric(sparkContext, "number of output rows")) - - // output attributes should not affect the results - override lazy val cleanArgs: Seq[Any] = Seq(start, step, numSlices, numElements) - - override def inputRDDs(): Seq[RDD[InternalRow]] = { - sqlContext.sparkContext.parallelize(0 until numSlices, numSlices) - .map(i => InternalRow(i)) :: Nil - } - - protected override def doProduce(ctx: CodegenContext): String = { - val numOutput = metricTerm(ctx, "numOutputRows") - - val initTerm = ctx.freshName("initRange") - ctx.addMutableState("boolean", initTerm, s"$initTerm = false;") - val partitionEnd = ctx.freshName("partitionEnd") - ctx.addMutableState("long", partitionEnd, s"$partitionEnd = 0L;") - val number = ctx.freshName("number") - ctx.addMutableState("long", number, s"$number = 0L;") - val overflow = ctx.freshName("overflow") - ctx.addMutableState("boolean", overflow, s"$overflow = false;") - - val value = ctx.freshName("value") - val ev = ExprCode("", "false", value) - val BigInt = classOf[java.math.BigInteger].getName - val checkEnd = if (step > 0) { - s"$number < $partitionEnd" - } else { - s"$number > $partitionEnd" - } - - ctx.addNewFunction("initRange", - s""" - | private void initRange(int idx) { - | $BigInt index = $BigInt.valueOf(idx); - | $BigInt numSlice = $BigInt.valueOf(${numSlices}L); - | $BigInt numElement = $BigInt.valueOf(${numElements.toLong}L); - | $BigInt step = $BigInt.valueOf(${step}L); - | $BigInt start = $BigInt.valueOf(${start}L); - | - | $BigInt st = index.multiply(numElement).divide(numSlice).multiply(step).add(start); - | if (st.compareTo($BigInt.valueOf(Long.MAX_VALUE)) > 0) { - | $number = Long.MAX_VALUE; - | } else if (st.compareTo($BigInt.valueOf(Long.MIN_VALUE)) < 0) { - | $number = Long.MIN_VALUE; - | } else { - | $number = st.longValue(); - | } - | - | $BigInt end = index.add($BigInt.ONE).multiply(numElement).divide(numSlice) - | .multiply(step).add(start); - | if (end.compareTo($BigInt.valueOf(Long.MAX_VALUE)) > 0) { - | $partitionEnd = Long.MAX_VALUE; - | } else if (end.compareTo($BigInt.valueOf(Long.MIN_VALUE)) < 0) { - | $partitionEnd = Long.MIN_VALUE; - | } else { - | $partitionEnd = end.longValue(); - | } - | - | $numOutput.add(($partitionEnd - $number) / ${step}L); - | } - """.stripMargin) - - val input = ctx.freshName("input") - // Right now, Range is only used when there is one upstream. 
- ctx.addMutableState("scala.collection.Iterator", input, s"$input = inputs[0];") - s""" - | // initialize Range - | if (!$initTerm) { - | $initTerm = true; - | initRange(partitionIndex); - | } - | - | while (!$overflow && $checkEnd) { - | long $value = $number; - | $number += ${step}L; - | if ($number < $value ^ ${step}L < 0) { - | $overflow = true; - | } - | ${consume(ctx, Seq(ev))} - | if (shouldStop()) return; - | } - """.stripMargin - } - - protected override def doExecute(): RDD[InternalRow] = { - val numOutputRows = longMetric("numOutputRows") - sqlContext - .sparkContext - .parallelize(0 until numSlices, numSlices) - .mapPartitionsWithIndex { (i, _) => - val partitionStart = (i * numElements) / numSlices * step + start - val partitionEnd = (((i + 1) * numElements) / numSlices) * step + start - def getSafeMargin(bi: BigInt): Long = - if (bi.isValidLong) { - bi.toLong - } else if (bi > 0) { - Long.MaxValue - } else { - Long.MinValue - } - val safePartitionStart = getSafeMargin(partitionStart) - val safePartitionEnd = getSafeMargin(partitionEnd) - val rowSize = UnsafeRow.calculateBitSetWidthInBytes(1) + LongType.defaultSize - val unsafeRow = UnsafeRow.createFromByteArray(rowSize, 1) - - new Iterator[InternalRow] { - private[this] var number: Long = safePartitionStart - private[this] var overflow: Boolean = false - - override def hasNext = - if (!overflow) { - if (step > 0) { - number < safePartitionEnd - } else { - number > safePartitionEnd - } - } else false - - override def next() = { - val ret = number - number += step - if (number < ret ^ step < 0) { - // we have Long.MaxValue + Long.MaxValue < Long.MaxValue - // and Long.MinValue + Long.MinValue > Long.MinValue, so iff the step causes a step - // back, we are pretty sure that we have an overflow. - overflow = true - } - - numOutputRows += 1 - unsafeRow.setLong(0, ret) - unsafeRow - } - } - } - } -} - -/** - * Physical plan for unioning two plans, without a distinct. This is UNION ALL in SQL. - */ -case class UnionExec(children: Seq[SparkPlan]) extends SparkPlan { - override def output: Seq[Attribute] = - children.map(_.output).transpose.map(attrs => - attrs.head.withNullability(attrs.exists(_.nullable))) - - protected override def doExecute(): RDD[InternalRow] = - sparkContext.union(children.map(_.execute())) -} - -/** - * Physical plan for returning a new RDD that has exactly `numPartitions` partitions. - * Similar to coalesce defined on an [[RDD]], this operation results in a narrow dependency, e.g. - * if you go from 1000 partitions to 100 partitions, there will not be a shuffle, instead each of - * the 100 new partitions will claim 10 of the current partitions. - */ -case class CoalesceExec(numPartitions: Int, child: SparkPlan) extends UnaryExecNode { - override def output: Seq[Attribute] = child.output - - override def outputPartitioning: Partitioning = { - if (numPartitions == 1) SinglePartition - else UnknownPartitioning(numPartitions) - } - - protected override def doExecute(): RDD[InternalRow] = { - child.execute().coalesce(numPartitions, shuffle = false) - } -} - -/** - * Physical plan for returning a table with the elements from left that are not in right using - * the built-in spark subtract function. 
- */ -case class ExceptExec(left: SparkPlan, right: SparkPlan) extends BinaryExecNode { - override def output: Seq[Attribute] = left.output - - protected override def doExecute(): RDD[InternalRow] = { - left.execute().map(_.copy()).subtract(right.execute().map(_.copy())) - } -} - -/** - * A plan node that does nothing but lie about the output of its child. Used to spice a - * (hopefully structurally equivalent) tree from a different optimization sequence into an already - * resolved tree. - */ -case class OutputFakerExec(output: Seq[Attribute], child: SparkPlan) extends SparkPlan { - def children: Seq[SparkPlan] = child :: Nil - - protected override def doExecute(): RDD[InternalRow] = child.execute() -} - -/** - * Physical plan for a subquery. - * - * This is used to generate tree string for SparkScalarSubquery. - */ -case class SubqueryExec(name: String, child: SparkPlan) extends UnaryExecNode { - override def output: Seq[Attribute] = child.output - override def outputPartitioning: Partitioning = child.outputPartitioning - override def outputOrdering: Seq[SortOrder] = child.outputOrdering - - protected override def doExecute(): RDD[InternalRow] = { - throw new UnsupportedOperationException - } -} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala new file mode 100644 index 0000000000..83f527f555 --- /dev/null +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala @@ -0,0 +1,530 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution + +import org.apache.spark.rdd.{PartitionwiseSampledRDD, RDD} +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, ExprCode, ExpressionCanonicalizer} +import org.apache.spark.sql.catalyst.plans.physical._ +import org.apache.spark.sql.execution.metric.SQLMetrics +import org.apache.spark.sql.types.LongType +import org.apache.spark.util.random.{BernoulliCellSampler, PoissonSampler} + +/** Physical plan for Project. 
*/ +case class ProjectExec(projectList: Seq[NamedExpression], child: SparkPlan) + extends UnaryExecNode with CodegenSupport { + + override def output: Seq[Attribute] = projectList.map(_.toAttribute) + + override def inputRDDs(): Seq[RDD[InternalRow]] = { + child.asInstanceOf[CodegenSupport].inputRDDs() + } + + protected override def doProduce(ctx: CodegenContext): String = { + child.asInstanceOf[CodegenSupport].produce(ctx, this) + } + + override def usedInputs: AttributeSet = { + // only the attributes those are used at least twice should be evaluated before this plan, + // otherwise we could defer the evaluation until output attribute is actually used. + val usedExprIds = projectList.flatMap(_.collect { + case a: Attribute => a.exprId + }) + val usedMoreThanOnce = usedExprIds.groupBy(id => id).filter(_._2.size > 1).keySet + references.filter(a => usedMoreThanOnce.contains(a.exprId)) + } + + override def doConsume(ctx: CodegenContext, input: Seq[ExprCode], row: ExprCode): String = { + val exprs = projectList.map(x => + ExpressionCanonicalizer.execute(BindReferences.bindReference(x, child.output))) + ctx.currentVars = input + val resultVars = exprs.map(_.genCode(ctx)) + // Evaluation of non-deterministic expressions can't be deferred. + val nonDeterministicAttrs = projectList.filterNot(_.deterministic).map(_.toAttribute) + s""" + |${evaluateRequiredVariables(output, resultVars, AttributeSet(nonDeterministicAttrs))} + |${consume(ctx, resultVars)} + """.stripMargin + } + + protected override def doExecute(): RDD[InternalRow] = { + child.execute().mapPartitionsInternal { iter => + val project = UnsafeProjection.create(projectList, child.output, + subexpressionEliminationEnabled) + iter.map(project) + } + } + + override def outputOrdering: Seq[SortOrder] = child.outputOrdering +} + + +/** Physical plan for Filter. */ +case class FilterExec(condition: Expression, child: SparkPlan) + extends UnaryExecNode with CodegenSupport with PredicateHelper { + + // Split out all the IsNotNulls from condition. + private val (notNullPreds, otherPreds) = splitConjunctivePredicates(condition).partition { + case IsNotNull(a: NullIntolerant) if a.references.subsetOf(child.outputSet) => true + case _ => false + } + + // The columns that will filtered out by `IsNotNull` could be considered as not nullable. + private val notNullAttributes = notNullPreds.flatMap(_.references).distinct.map(_.exprId) + + // Mark this as empty. We'll evaluate the input during doConsume(). We don't want to evaluate + // all the variables at the beginning to take advantage of short circuiting. + override def usedInputs: AttributeSet = AttributeSet.empty + + override def output: Seq[Attribute] = { + child.output.map { a => + if (a.nullable && notNullAttributes.contains(a.exprId)) { + a.withNullability(false) + } else { + a + } + } + } + + private[sql] override lazy val metrics = Map( + "numOutputRows" -> SQLMetrics.createLongMetric(sparkContext, "number of output rows")) + + override def inputRDDs(): Seq[RDD[InternalRow]] = { + child.asInstanceOf[CodegenSupport].inputRDDs() + } + + protected override def doProduce(ctx: CodegenContext): String = { + child.asInstanceOf[CodegenSupport].produce(ctx, this) + } + + override def doConsume(ctx: CodegenContext, input: Seq[ExprCode], row: ExprCode): String = { + val numOutput = metricTerm(ctx, "numOutputRows") + + /** + * Generates code for `c`, using `in` for input attributes and `attrs` for nullability. 
+ */ + def genPredicate(c: Expression, in: Seq[ExprCode], attrs: Seq[Attribute]): String = { + val bound = BindReferences.bindReference(c, attrs) + val evaluated = evaluateRequiredVariables(child.output, in, c.references) + + // Generate the code for the predicate. + val ev = ExpressionCanonicalizer.execute(bound).genCode(ctx) + val nullCheck = if (bound.nullable) { + s"${ev.isNull} || " + } else { + s"" + } + + s""" + |$evaluated + |${ev.code} + |if (${nullCheck}!${ev.value}) continue; + """.stripMargin + } + + ctx.currentVars = input + + // To generate the predicates we will follow this algorithm. + // For each predicate that is not IsNotNull, we will generate them one by one loading attributes + // as necessary. For each of both attributes, if there is a IsNotNull predicate we will generate + // that check *before* the predicate. After all of these predicates, we will generate the + // remaining IsNotNull checks that were not part of other predicates. + // This has the property of not doing redundant IsNotNull checks and taking better advantage of + // short-circuiting, not loading attributes until they are needed. + // This is very perf sensitive. + // TODO: revisit this. We can consider reordering predicates as well. + val generatedIsNotNullChecks = new Array[Boolean](notNullPreds.length) + val generated = otherPreds.map { c => + val nullChecks = c.references.map { r => + val idx = notNullPreds.indexWhere { n => n.asInstanceOf[IsNotNull].child.semanticEquals(r)} + if (idx != -1 && !generatedIsNotNullChecks(idx)) { + generatedIsNotNullChecks(idx) = true + // Use the child's output. The nullability is what the child produced. + genPredicate(notNullPreds(idx), input, child.output) + } else { + "" + } + }.mkString("\n").trim + + // Here we use *this* operator's output with this output's nullability since we already + // enforced them with the IsNotNull checks above. + s""" + |$nullChecks + |${genPredicate(c, input, output)} + """.stripMargin.trim + }.mkString("\n") + + val nullChecks = notNullPreds.zipWithIndex.map { case (c, idx) => + if (!generatedIsNotNullChecks(idx)) { + genPredicate(c, input, child.output) + } else { + "" + } + }.mkString("\n") + + // Reset the isNull to false for the not-null columns, then the followed operators could + // generate better code (remove dead branches). + val resultVars = input.zipWithIndex.map { case (ev, i) => + if (notNullAttributes.contains(child.output(i).exprId)) { + ev.isNull = "false" + } + ev + } + + s""" + |$generated + |$nullChecks + |$numOutput.add(1); + |${consume(ctx, resultVars)} + """.stripMargin + } + + protected override def doExecute(): RDD[InternalRow] = { + val numOutputRows = longMetric("numOutputRows") + child.execute().mapPartitionsInternal { iter => + val predicate = newPredicate(condition, child.output) + iter.filter { row => + val r = predicate(row) + if (r) numOutputRows += 1 + r + } + } + } + + override def outputOrdering: Seq[SortOrder] = child.outputOrdering +} + +/** + * Physical plan for sampling the dataset. + * + * @param lowerBound Lower-bound of the sampling probability (usually 0.0) + * @param upperBound Upper-bound of the sampling probability. The expected fraction sampled + * will be ub - lb. + * @param withReplacement Whether to sample with replacement. 
+ * @param seed the random seed + * @param child the SparkPlan + */ +case class SampleExec( + lowerBound: Double, + upperBound: Double, + withReplacement: Boolean, + seed: Long, + child: SparkPlan) extends UnaryExecNode with CodegenSupport { + override def output: Seq[Attribute] = child.output + + private[sql] override lazy val metrics = Map( + "numOutputRows" -> SQLMetrics.createLongMetric(sparkContext, "number of output rows")) + + protected override def doExecute(): RDD[InternalRow] = { + if (withReplacement) { + // Disable gap sampling since the gap sampling method buffers two rows internally, + // requiring us to copy the row, which is more expensive than the random number generator. + new PartitionwiseSampledRDD[InternalRow, InternalRow]( + child.execute(), + new PoissonSampler[InternalRow](upperBound - lowerBound, useGapSamplingIfPossible = false), + preservesPartitioning = true, + seed) + } else { + child.execute().randomSampleWithRange(lowerBound, upperBound, seed) + } + } + + override def inputRDDs(): Seq[RDD[InternalRow]] = { + child.asInstanceOf[CodegenSupport].inputRDDs() + } + + protected override def doProduce(ctx: CodegenContext): String = { + child.asInstanceOf[CodegenSupport].produce(ctx, this) + } + + override def doConsume(ctx: CodegenContext, input: Seq[ExprCode], row: ExprCode): String = { + val numOutput = metricTerm(ctx, "numOutputRows") + val sampler = ctx.freshName("sampler") + + if (withReplacement) { + val samplerClass = classOf[PoissonSampler[UnsafeRow]].getName + val initSampler = ctx.freshName("initSampler") + ctx.addMutableState(s"$samplerClass", sampler, + s"$initSampler();") + + ctx.addNewFunction(initSampler, + s""" + | private void $initSampler() { + | $sampler = new $samplerClass($upperBound - $lowerBound, false); + | java.util.Random random = new java.util.Random(${seed}L); + | long randomSeed = random.nextLong(); + | int loopCount = 0; + | while (loopCount < partitionIndex) { + | randomSeed = random.nextLong(); + | loopCount += 1; + | } + | $sampler.setSeed(randomSeed); + | } + """.stripMargin.trim) + + val samplingCount = ctx.freshName("samplingCount") + s""" + | int $samplingCount = $sampler.sample(); + | while ($samplingCount-- > 0) { + | $numOutput.add(1); + | ${consume(ctx, input)} + | } + """.stripMargin.trim + } else { + val samplerClass = classOf[BernoulliCellSampler[UnsafeRow]].getName + ctx.addMutableState(s"$samplerClass", sampler, + s""" + | $sampler = new $samplerClass($lowerBound, $upperBound, false); + | $sampler.setSeed(${seed}L + partitionIndex); + """.stripMargin.trim) + + s""" + | if ($sampler.sample() == 0) continue; + | $numOutput.add(1); + | ${consume(ctx, input)} + """.stripMargin.trim + } + } +} + + +/** + * Physical plan for range (generating a range of 64 bit numbers. + * + * @param start first number in the range, inclusive. + * @param step size of the step increment. + * @param numSlices number of partitions. + * @param numElements total number of elements to output. + * @param output output attributes. 
+ */ +case class RangeExec( + start: Long, + step: Long, + numSlices: Int, + numElements: BigInt, + output: Seq[Attribute]) + extends LeafExecNode with CodegenSupport { + + private[sql] override lazy val metrics = Map( + "numOutputRows" -> SQLMetrics.createLongMetric(sparkContext, "number of output rows")) + + // output attributes should not affect the results + override lazy val cleanArgs: Seq[Any] = Seq(start, step, numSlices, numElements) + + override def inputRDDs(): Seq[RDD[InternalRow]] = { + sqlContext.sparkContext.parallelize(0 until numSlices, numSlices) + .map(i => InternalRow(i)) :: Nil + } + + protected override def doProduce(ctx: CodegenContext): String = { + val numOutput = metricTerm(ctx, "numOutputRows") + + val initTerm = ctx.freshName("initRange") + ctx.addMutableState("boolean", initTerm, s"$initTerm = false;") + val partitionEnd = ctx.freshName("partitionEnd") + ctx.addMutableState("long", partitionEnd, s"$partitionEnd = 0L;") + val number = ctx.freshName("number") + ctx.addMutableState("long", number, s"$number = 0L;") + val overflow = ctx.freshName("overflow") + ctx.addMutableState("boolean", overflow, s"$overflow = false;") + + val value = ctx.freshName("value") + val ev = ExprCode("", "false", value) + val BigInt = classOf[java.math.BigInteger].getName + val checkEnd = if (step > 0) { + s"$number < $partitionEnd" + } else { + s"$number > $partitionEnd" + } + + ctx.addNewFunction("initRange", + s""" + | private void initRange(int idx) { + | $BigInt index = $BigInt.valueOf(idx); + | $BigInt numSlice = $BigInt.valueOf(${numSlices}L); + | $BigInt numElement = $BigInt.valueOf(${numElements.toLong}L); + | $BigInt step = $BigInt.valueOf(${step}L); + | $BigInt start = $BigInt.valueOf(${start}L); + | + | $BigInt st = index.multiply(numElement).divide(numSlice).multiply(step).add(start); + | if (st.compareTo($BigInt.valueOf(Long.MAX_VALUE)) > 0) { + | $number = Long.MAX_VALUE; + | } else if (st.compareTo($BigInt.valueOf(Long.MIN_VALUE)) < 0) { + | $number = Long.MIN_VALUE; + | } else { + | $number = st.longValue(); + | } + | + | $BigInt end = index.add($BigInt.ONE).multiply(numElement).divide(numSlice) + | .multiply(step).add(start); + | if (end.compareTo($BigInt.valueOf(Long.MAX_VALUE)) > 0) { + | $partitionEnd = Long.MAX_VALUE; + | } else if (end.compareTo($BigInt.valueOf(Long.MIN_VALUE)) < 0) { + | $partitionEnd = Long.MIN_VALUE; + | } else { + | $partitionEnd = end.longValue(); + | } + | + | $numOutput.add(($partitionEnd - $number) / ${step}L); + | } + """.stripMargin) + + val input = ctx.freshName("input") + // Right now, Range is only used when there is one upstream. 
+ ctx.addMutableState("scala.collection.Iterator", input, s"$input = inputs[0];") + s""" + | // initialize Range + | if (!$initTerm) { + | $initTerm = true; + | initRange(partitionIndex); + | } + | + | while (!$overflow && $checkEnd) { + | long $value = $number; + | $number += ${step}L; + | if ($number < $value ^ ${step}L < 0) { + | $overflow = true; + | } + | ${consume(ctx, Seq(ev))} + | if (shouldStop()) return; + | } + """.stripMargin + } + + protected override def doExecute(): RDD[InternalRow] = { + val numOutputRows = longMetric("numOutputRows") + sqlContext + .sparkContext + .parallelize(0 until numSlices, numSlices) + .mapPartitionsWithIndex { (i, _) => + val partitionStart = (i * numElements) / numSlices * step + start + val partitionEnd = (((i + 1) * numElements) / numSlices) * step + start + def getSafeMargin(bi: BigInt): Long = + if (bi.isValidLong) { + bi.toLong + } else if (bi > 0) { + Long.MaxValue + } else { + Long.MinValue + } + val safePartitionStart = getSafeMargin(partitionStart) + val safePartitionEnd = getSafeMargin(partitionEnd) + val rowSize = UnsafeRow.calculateBitSetWidthInBytes(1) + LongType.defaultSize + val unsafeRow = UnsafeRow.createFromByteArray(rowSize, 1) + + new Iterator[InternalRow] { + private[this] var number: Long = safePartitionStart + private[this] var overflow: Boolean = false + + override def hasNext = + if (!overflow) { + if (step > 0) { + number < safePartitionEnd + } else { + number > safePartitionEnd + } + } else false + + override def next() = { + val ret = number + number += step + if (number < ret ^ step < 0) { + // we have Long.MaxValue + Long.MaxValue < Long.MaxValue + // and Long.MinValue + Long.MinValue > Long.MinValue, so iff the step causes a step + // back, we are pretty sure that we have an overflow. + overflow = true + } + + numOutputRows += 1 + unsafeRow.setLong(0, ret) + unsafeRow + } + } + } + } +} + +/** + * Physical plan for unioning two plans, without a distinct. This is UNION ALL in SQL. + */ +case class UnionExec(children: Seq[SparkPlan]) extends SparkPlan { + override def output: Seq[Attribute] = + children.map(_.output).transpose.map(attrs => + attrs.head.withNullability(attrs.exists(_.nullable))) + + protected override def doExecute(): RDD[InternalRow] = + sparkContext.union(children.map(_.execute())) +} + +/** + * Physical plan for returning a new RDD that has exactly `numPartitions` partitions. + * Similar to coalesce defined on an [[RDD]], this operation results in a narrow dependency, e.g. + * if you go from 1000 partitions to 100 partitions, there will not be a shuffle, instead each of + * the 100 new partitions will claim 10 of the current partitions. + */ +case class CoalesceExec(numPartitions: Int, child: SparkPlan) extends UnaryExecNode { + override def output: Seq[Attribute] = child.output + + override def outputPartitioning: Partitioning = { + if (numPartitions == 1) SinglePartition + else UnknownPartitioning(numPartitions) + } + + protected override def doExecute(): RDD[InternalRow] = { + child.execute().coalesce(numPartitions, shuffle = false) + } +} + +/** + * Physical plan for returning a table with the elements from left that are not in right using + * the built-in spark subtract function. 
+ */ +case class ExceptExec(left: SparkPlan, right: SparkPlan) extends BinaryExecNode { + override def output: Seq[Attribute] = left.output + + protected override def doExecute(): RDD[InternalRow] = { + left.execute().map(_.copy()).subtract(right.execute().map(_.copy())) + } +} + +/** + * A plan node that does nothing but lie about the output of its child. Used to spice a + * (hopefully structurally equivalent) tree from a different optimization sequence into an already + * resolved tree. + */ +case class OutputFakerExec(output: Seq[Attribute], child: SparkPlan) extends SparkPlan { + def children: Seq[SparkPlan] = child :: Nil + + protected override def doExecute(): RDD[InternalRow] = child.execute() +} + +/** + * Physical plan for a subquery. + * + * This is used to generate tree string for SparkScalarSubquery. + */ +case class SubqueryExec(name: String, child: SparkPlan) extends UnaryExecNode { + override def output: Seq[Attribute] = child.output + override def outputPartitioning: Partitioning = child.outputPartitioning + override def outputOrdering: Seq[SortOrder] = child.outputOrdering + + protected override def doExecute(): RDD[InternalRow] = { + throw new UnsupportedOperationException + } +} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/cache.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/cache.scala new file mode 100644 index 0000000000..5be5d0c2b0 --- /dev/null +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/cache.scala @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.command + +import org.apache.spark.sql.{Dataset, Row, SQLContext} +import org.apache.spark.sql.catalyst.expressions.Attribute +import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan + + +case class CacheTableCommand( + tableName: String, + plan: Option[LogicalPlan], + isLazy: Boolean) + extends RunnableCommand { + + override def run(sqlContext: SQLContext): Seq[Row] = { + plan.foreach { logicalPlan => + sqlContext.registerDataFrameAsTable(Dataset.ofRows(sqlContext, logicalPlan), tableName) + } + sqlContext.cacheTable(tableName) + + if (!isLazy) { + // Performs eager caching + sqlContext.table(tableName).count() + } + + Seq.empty[Row] + } + + override def output: Seq[Attribute] = Seq.empty +} + + +case class UncacheTableCommand(tableName: String) extends RunnableCommand { + + override def run(sqlContext: SQLContext): Seq[Row] = { + sqlContext.table(tableName).unpersist(blocking = false) + Seq.empty[Row] + } + + override def output: Seq[Attribute] = Seq.empty +} + +/** + * Clear all cached data from the in-memory cache. 
+ */ +case object ClearCacheCommand extends RunnableCommand { + + override def run(sqlContext: SQLContext): Seq[Row] = { + sqlContext.clearCache() + Seq.empty[Row] + } + + override def output: Seq[Attribute] = Seq.empty +} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala index 971770a97b..0fd7fa92a3 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala @@ -78,6 +78,15 @@ private[sql] case class ExecutedCommandExec(cmd: RunnableCommand) extends SparkP * * Note that this command takes in a logical plan, runs the optimizer on the logical plan * (but do NOT actually execute it). + * + * {{{ + * EXPLAIN (EXTENDED|CODEGEN) SELECT * FROM ... + * }}} + * + * @param logicalPlan plan to explain + * @param output output schema + * @param extended whether to do extended explain or not + * @param codegen whether to output generated code from whole-stage codegen or not */ case class ExplainCommand( logicalPlan: LogicalPlan, @@ -89,7 +98,6 @@ case class ExplainCommand( // Run through the optimizer to generate the physical plan. override def run(sqlContext: SQLContext): Seq[Row] = try { - // TODO in Hive, the "extended" ExplainCommand prints the AST as well, and detailed properties. val queryExecution = sqlContext.executePlan(logicalPlan) val outputString = if (codegen) { @@ -104,257 +112,3 @@ case class ExplainCommand( ("Error occurred during query planning: \n" + cause.getMessage).split("\n").map(Row(_)) } } - - -case class CacheTableCommand( - tableName: String, - plan: Option[LogicalPlan], - isLazy: Boolean) - extends RunnableCommand { - - override def run(sqlContext: SQLContext): Seq[Row] = { - plan.foreach { logicalPlan => - sqlContext.registerDataFrameAsTable(Dataset.ofRows(sqlContext, logicalPlan), tableName) - } - sqlContext.cacheTable(tableName) - - if (!isLazy) { - // Performs eager caching - sqlContext.table(tableName).count() - } - - Seq.empty[Row] - } - - override def output: Seq[Attribute] = Seq.empty -} - - -case class UncacheTableCommand(tableName: String) extends RunnableCommand { - - override def run(sqlContext: SQLContext): Seq[Row] = { - sqlContext.table(tableName).unpersist(blocking = false) - Seq.empty[Row] - } - - override def output: Seq[Attribute] = Seq.empty -} - -/** - * Clear all cached data from the in-memory cache. - */ -case object ClearCacheCommand extends RunnableCommand { - - override def run(sqlContext: SQLContext): Seq[Row] = { - sqlContext.clearCache() - Seq.empty[Row] - } - - override def output: Seq[Attribute] = Seq.empty -} - - -/** - * A command for users to get tables in the given database. - * If a databaseName is not given, the current database will be used. - * The syntax of using this command in SQL is: - * {{{ - * SHOW TABLES [(IN|FROM) database_name] [[LIKE] 'identifier_with_wildcards']; - * }}} - */ -case class ShowTablesCommand( - databaseName: Option[String], - tableIdentifierPattern: Option[String]) extends RunnableCommand { - - // The result of SHOW TABLES has two columns, tableName and isTemporary. 
- override val output: Seq[Attribute] = { - AttributeReference("tableName", StringType, nullable = false)() :: - AttributeReference("isTemporary", BooleanType, nullable = false)() :: Nil - } - - override def run(sqlContext: SQLContext): Seq[Row] = { - // Since we need to return a Seq of rows, we will call getTables directly - // instead of calling tables in sqlContext. - val catalog = sqlContext.sessionState.catalog - val db = databaseName.getOrElse(catalog.getCurrentDatabase) - val tables = - tableIdentifierPattern.map(catalog.listTables(db, _)).getOrElse(catalog.listTables(db)) - tables.map { t => - val isTemp = t.database.isEmpty - Row(t.table, isTemp) - } - } -} - -/** - * A command for users to list the databases/schemas. - * If a databasePattern is supplied then the databases that only matches the - * pattern would be listed. - * The syntax of using this command in SQL is: - * {{{ - * SHOW (DATABASES|SCHEMAS) [LIKE 'identifier_with_wildcards']; - * }}} - */ -case class ShowDatabasesCommand(databasePattern: Option[String]) extends RunnableCommand { - - // The result of SHOW DATABASES has one column called 'result' - override val output: Seq[Attribute] = { - AttributeReference("result", StringType, nullable = false)() :: Nil - } - - override def run(sqlContext: SQLContext): Seq[Row] = { - val catalog = sqlContext.sessionState.catalog - val databases = - databasePattern.map(catalog.listDatabases(_)).getOrElse(catalog.listDatabases()) - databases.map { d => Row(d) } - } -} - -/** - * A command for users to list the properties for a table If propertyKey is specified, the value - * for the propertyKey is returned. If propertyKey is not specified, all the keys and their - * corresponding values are returned. - * The syntax of using this command in SQL is: - * {{{ - * SHOW TBLPROPERTIES table_name[('propertyKey')]; - * }}} - */ -case class ShowTablePropertiesCommand( - table: TableIdentifier, - propertyKey: Option[String]) extends RunnableCommand { - - override val output: Seq[Attribute] = { - val schema = AttributeReference("value", StringType, nullable = false)() :: Nil - propertyKey match { - case None => AttributeReference("key", StringType, nullable = false)() :: schema - case _ => schema - } - } - - override def run(sqlContext: SQLContext): Seq[Row] = { - val catalog = sqlContext.sessionState.catalog - - if (catalog.isTemporaryTable(table)) { - Seq.empty[Row] - } else { - val catalogTable = sqlContext.sessionState.catalog.getTableMetadata(table) - - propertyKey match { - case Some(p) => - val propValue = catalogTable - .properties - .getOrElse(p, s"Table ${catalogTable.qualifiedName} does not have property: $p") - Seq(Row(propValue)) - case None => - catalogTable.properties.map(p => Row(p._1, p._2)).toSeq - } - } - } -} - -/** - * A command for users to list all of the registered functions. - * The syntax of using this command in SQL is: - * {{{ - * SHOW FUNCTIONS [LIKE pattern] - * }}} - * For the pattern, '*' matches any sequence of characters (including no characters) and - * '|' is for alternation. - * For example, "show functions like 'yea*|windo*'" will return "window" and "year". 
- * - * TODO currently we are simply ignore the db - */ -case class ShowFunctions(db: Option[String], pattern: Option[String]) extends RunnableCommand { - override val output: Seq[Attribute] = { - val schema = StructType( - StructField("function", StringType, nullable = false) :: Nil) - - schema.toAttributes - } - - override def run(sqlContext: SQLContext): Seq[Row] = { - val dbName = db.getOrElse(sqlContext.sessionState.catalog.getCurrentDatabase) - // If pattern is not specified, we use '*', which is used to - // match any sequence of characters (including no characters). - val functionNames = - sqlContext.sessionState.catalog - .listFunctions(dbName, pattern.getOrElse("*")) - .map(_.unquotedString) - // The session catalog caches some persistent functions in the FunctionRegistry - // so there can be duplicates. - functionNames.distinct.sorted.map(Row(_)) - } -} - -/** - * A command for users to get the usage of a registered function. - * The syntax of using this command in SQL is - * {{{ - * DESCRIBE FUNCTION [EXTENDED] upper; - * }}} - */ -case class DescribeFunction( - functionName: String, - isExtended: Boolean) extends RunnableCommand { - - override val output: Seq[Attribute] = { - val schema = StructType( - StructField("function_desc", StringType, nullable = false) :: Nil) - - schema.toAttributes - } - - private def replaceFunctionName(usage: String, functionName: String): String = { - if (usage == null) { - "To be added." - } else { - usage.replaceAll("_FUNC_", functionName) - } - } - - override def run(sqlContext: SQLContext): Seq[Row] = { - // Hard code "<>", "!=", "between", and "case" for now as there is no corresponding functions. - functionName.toLowerCase match { - case "<>" => - Row(s"Function: $functionName") :: - Row(s"Usage: a <> b - Returns TRUE if a is not equal to b") :: Nil - case "!=" => - Row(s"Function: $functionName") :: - Row(s"Usage: a != b - Returns TRUE if a is not equal to b") :: Nil - case "between" => - Row(s"Function: between") :: - Row(s"Usage: a [NOT] BETWEEN b AND c - " + - s"evaluate if a is [not] in between b and c") :: Nil - case "case" => - Row(s"Function: case") :: - Row(s"Usage: CASE a WHEN b THEN c [WHEN d THEN e]* [ELSE f] END - " + - s"When a = b, returns c; when a = d, return e; else return f") :: Nil - case _ => sqlContext.sessionState.functionRegistry.lookupFunction(functionName) match { - case Some(info) => - val result = - Row(s"Function: ${info.getName}") :: - Row(s"Class: ${info.getClassName}") :: - Row(s"Usage: ${replaceFunctionName(info.getUsage(), info.getName)}") :: Nil - - if (isExtended) { - result :+ - Row(s"Extended Usage:\n${replaceFunctionName(info.getExtended, info.getName)}") - } else { - result - } - - case None => Seq(Row(s"Function: $functionName not found.")) - } - } - } -} - -case class SetDatabaseCommand(databaseName: String) extends RunnableCommand { - - override def run(sqlContext: SQLContext): Seq[Row] = { - sqlContext.sessionState.catalog.setCurrentDatabase(databaseName) - Seq.empty[Row] - } - - override val output: Seq[Attribute] = Seq.empty -} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/databases.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/databases.scala new file mode 100644 index 0000000000..33cc10d53a --- /dev/null +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/databases.scala @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.command + +import org.apache.spark.sql.{Row, SQLContext} +import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference} +import org.apache.spark.sql.types.StringType + + +/** + * A command for users to list the databases/schemas. + * If a databasePattern is supplied then the databases that only matches the + * pattern would be listed. + * The syntax of using this command in SQL is: + * {{{ + * SHOW (DATABASES|SCHEMAS) [LIKE 'identifier_with_wildcards']; + * }}} + */ +case class ShowDatabasesCommand(databasePattern: Option[String]) extends RunnableCommand { + + // The result of SHOW DATABASES has one column called 'result' + override val output: Seq[Attribute] = { + AttributeReference("result", StringType, nullable = false)() :: Nil + } + + override def run(sqlContext: SQLContext): Seq[Row] = { + val catalog = sqlContext.sessionState.catalog + val databases = + databasePattern.map(catalog.listDatabases(_)).getOrElse(catalog.listDatabases()) + databases.map { d => Row(d) } + } +} + + +/** + * Command for setting the current database. + * {{{ + * USE database_name; + * }}} + */ +case class SetDatabaseCommand(databaseName: String) extends RunnableCommand { + + override def run(sqlContext: SQLContext): Seq[Row] = { + sqlContext.sessionState.catalog.setCurrentDatabase(databaseName) + Seq.empty[Row] + } + + override val output: Seq[Attribute] = Seq.empty +} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/functions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/functions.scala index c6e601799f..89ccacdc73 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/functions.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/functions.scala @@ -20,7 +20,8 @@ package org.apache.spark.sql.execution.command import org.apache.spark.sql.{AnalysisException, Row, SQLContext} import org.apache.spark.sql.catalyst.FunctionIdentifier import org.apache.spark.sql.catalyst.catalog.CatalogFunction -import org.apache.spark.sql.catalyst.expressions.ExpressionInfo +import org.apache.spark.sql.catalyst.expressions.{Attribute, ExpressionInfo} +import org.apache.spark.sql.types.{StringType, StructField, StructType} /** @@ -73,6 +74,69 @@ case class CreateFunction( } } + +/** + * A command for users to get the usage of a registered function. 
+ * The syntax of using this command in SQL is + * {{{ + * DESCRIBE FUNCTION [EXTENDED] upper; + * }}} + */ +case class DescribeFunction( + functionName: String, + isExtended: Boolean) extends RunnableCommand { + + override val output: Seq[Attribute] = { + val schema = StructType(StructField("function_desc", StringType, nullable = false) :: Nil) + schema.toAttributes + } + + private def replaceFunctionName(usage: String, functionName: String): String = { + if (usage == null) { + "To be added." + } else { + usage.replaceAll("_FUNC_", functionName) + } + } + + override def run(sqlContext: SQLContext): Seq[Row] = { + // Hard code "<>", "!=", "between", and "case" for now as there is no corresponding functions. + functionName.toLowerCase match { + case "<>" => + Row(s"Function: $functionName") :: + Row(s"Usage: a <> b - Returns TRUE if a is not equal to b") :: Nil + case "!=" => + Row(s"Function: $functionName") :: + Row(s"Usage: a != b - Returns TRUE if a is not equal to b") :: Nil + case "between" => + Row(s"Function: between") :: + Row(s"Usage: a [NOT] BETWEEN b AND c - " + + s"evaluate if a is [not] in between b and c") :: Nil + case "case" => + Row(s"Function: case") :: + Row(s"Usage: CASE a WHEN b THEN c [WHEN d THEN e]* [ELSE f] END - " + + s"When a = b, returns c; when a = d, return e; else return f") :: Nil + case _ => sqlContext.sessionState.functionRegistry.lookupFunction(functionName) match { + case Some(info) => + val result = + Row(s"Function: ${info.getName}") :: + Row(s"Class: ${info.getClassName}") :: + Row(s"Usage: ${replaceFunctionName(info.getUsage(), info.getName)}") :: Nil + + if (isExtended) { + result :+ + Row(s"Extended Usage:\n${replaceFunctionName(info.getExtended, info.getName)}") + } else { + result + } + + case None => Seq(Row(s"Function: $functionName not found.")) + } + } + } +} + + /** * The DDL command that drops a function. * ifExists: returns an error if the function doesn't exist, unless this is true. @@ -103,3 +167,36 @@ case class DropFunction( Seq.empty[Row] } } + + +/** + * A command for users to list all of the registered functions. + * The syntax of using this command in SQL is: + * {{{ + * SHOW FUNCTIONS [LIKE pattern] + * }}} + * For the pattern, '*' matches any sequence of characters (including no characters) and + * '|' is for alternation. + * For example, "show functions like 'yea*|windo*'" will return "window" and "year". + * + * TODO currently we are simply ignore the db + */ +case class ShowFunctions(db: Option[String], pattern: Option[String]) extends RunnableCommand { + override val output: Seq[Attribute] = { + val schema = StructType(StructField("function", StringType, nullable = false) :: Nil) + schema.toAttributes + } + + override def run(sqlContext: SQLContext): Seq[Row] = { + val dbName = db.getOrElse(sqlContext.sessionState.catalog.getCurrentDatabase) + // If pattern is not specified, we use '*', which is used to + // match any sequence of characters (including no characters). + val functionNames = + sqlContext.sessionState.catalog + .listFunctions(dbName, pattern.getOrElse("*")) + .map(_.unquotedString) + // The session catalog caches some persistent functions in the FunctionRegistry + // so there can be duplicates. 
+ functionNames.distinct.sorted.map(Row(_)) + } +} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala index b7e3056f92..eae8fe8975 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala @@ -27,7 +27,7 @@ import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.catalyst.catalog.{CatalogRelation, CatalogTable, CatalogTableType, ExternalCatalog} import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference} import org.apache.spark.sql.catalyst.plans.logical.{Command, LogicalPlan, UnaryNode} -import org.apache.spark.sql.types.{MetadataBuilder, StringType} +import org.apache.spark.sql.types.{BooleanType, MetadataBuilder, StringType} import org.apache.spark.util.Utils case class CreateTableAsSelectLogicalPlan( @@ -313,3 +313,78 @@ case class DescribeTableCommand(table: TableIdentifier, isExtended: Boolean) result } } + + +/** + * A command for users to get tables in the given database. + * If a databaseName is not given, the current database will be used. + * The syntax of using this command in SQL is: + * {{{ + * SHOW TABLES [(IN|FROM) database_name] [[LIKE] 'identifier_with_wildcards']; + * }}} + */ +case class ShowTablesCommand( + databaseName: Option[String], + tableIdentifierPattern: Option[String]) extends RunnableCommand { + + // The result of SHOW TABLES has two columns, tableName and isTemporary. + override val output: Seq[Attribute] = { + AttributeReference("tableName", StringType, nullable = false)() :: + AttributeReference("isTemporary", BooleanType, nullable = false)() :: Nil + } + + override def run(sqlContext: SQLContext): Seq[Row] = { + // Since we need to return a Seq of rows, we will call getTables directly + // instead of calling tables in sqlContext. + val catalog = sqlContext.sessionState.catalog + val db = databaseName.getOrElse(catalog.getCurrentDatabase) + val tables = + tableIdentifierPattern.map(catalog.listTables(db, _)).getOrElse(catalog.listTables(db)) + tables.map { t => + val isTemp = t.database.isEmpty + Row(t.table, isTemp) + } + } +} + + +/** + * A command for users to list the properties for a table If propertyKey is specified, the value + * for the propertyKey is returned. If propertyKey is not specified, all the keys and their + * corresponding values are returned. 
+ * The syntax of using this command in SQL is: + * {{{ + * SHOW TBLPROPERTIES table_name[('propertyKey')]; + * }}} + */ +case class ShowTablePropertiesCommand(table: TableIdentifier, propertyKey: Option[String]) + extends RunnableCommand { + + override val output: Seq[Attribute] = { + val schema = AttributeReference("value", StringType, nullable = false)() :: Nil + propertyKey match { + case None => AttributeReference("key", StringType, nullable = false)() :: schema + case _ => schema + } + } + + override def run(sqlContext: SQLContext): Seq[Row] = { + val catalog = sqlContext.sessionState.catalog + + if (catalog.isTemporaryTable(table)) { + Seq.empty[Row] + } else { + val catalogTable = sqlContext.sessionState.catalog.getTableMetadata(table) + + propertyKey match { + case Some(p) => + val propValue = catalogTable + .properties + .getOrElse(p, s"Table ${catalogTable.qualifiedName} does not have property: $p") + Seq(Row(propValue)) + case None => + catalogTable.properties.map(p => Row(p._1, p._2)).toSeq + } + } + } +} -- cgit v1.2.3
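For reference, the SQL statements behind the commands regrouped above (cache.scala, databases.scala, functions.scala, tables.scala) are the ones quoted in their scaladocs. A minimal spark-shell style sketch of invoking them, assuming a SQLContext named sqlContext is in scope; the database, table, function and property-key names below are hypothetical:

{{{
  // databases.scala: ShowDatabasesCommand, SetDatabaseCommand
  sqlContext.sql("SHOW DATABASES LIKE 'dev*'").show()
  sqlContext.sql("USE devdb")

  // tables.scala: ShowTablesCommand, ShowTablePropertiesCommand
  sqlContext.sql("SHOW TABLES IN devdb LIKE 'event*'").show()
  sqlContext.sql("SHOW TBLPROPERTIES events('created.by')").show()

  // functions.scala: ShowFunctions, DescribeFunction
  sqlContext.sql("SHOW FUNCTIONS LIKE 'yea*|windo*'").show()
  sqlContext.sql("DESCRIBE FUNCTION EXTENDED upper").show()

  // cache.scala: CacheTableCommand, UncacheTableCommand, ClearCacheCommand
  sqlContext.sql("CACHE LAZY TABLE events")
  sqlContext.sql("UNCACHE TABLE events")
  sqlContext.sql("CLEAR CACHE")

  // commands.scala: ExplainCommand (EXPLAIN (EXTENDED|CODEGEN) ...)
  sqlContext.sql("EXPLAIN EXTENDED SELECT * FROM events").show()
}}}

Each statement parses into the corresponding RunnableCommand, which is executed through ExecutedCommandExec by calling RunnableCommand.run(sqlContext).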
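On the physical-operator side, CoalesceExec is the narrow-dependency plan described in its scaladoc, while the shuffle variant of the logical Repartition node goes through an exchange. A rough spark-shell sketch of the difference, again assuming a SQLContext named sqlContext; the mapping from Dataset.coalesce / repartition onto Repartition(shuffle = false / true) is believed to hold for this code base but is not shown in this patch:

{{{
  val df = sqlContext.range(0, 1000)        // planned as RangeExec
  println(df.rdd.getNumPartitions)          // partition count before coalescing

  val narrowed = df.coalesce(10)            // logical Repartition(10, shuffle = false, _)
  println(narrowed.rdd.getNumPartitions)    // 10, without a shuffle: CoalesceExec

  val shuffled = df.repartition(10)         // logical Repartition(10, shuffle = true, _)
  shuffled.explain(true)                    // prints the logical and physical plans
}}}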
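RangeExec splits the sequence of numElements values into numSlices contiguous chunks using partitionStart = (i * numElements) / numSlices * step + start (and the analogous expression for partitionEnd), clamped to the Long range, both in the generated initRange function and in doExecute. A small self-contained sketch of that arithmetic, with arbitrarily chosen numbers:

{{{
  object RangeBoundariesSketch {
    def main(args: Array[String]): Unit = {
      val start = 0L
      val step = 3L
      val numSlices = 4
      val numElements = BigInt(10)

      // Mirrors getSafeMargin in RangeExec.doExecute.
      def safe(bi: BigInt): Long =
        if (bi.isValidLong) bi.toLong
        else if (bi > 0) Long.MaxValue
        else Long.MinValue

      (0 until numSlices).foreach { i =>
        val partitionStart = safe(BigInt(i) * numElements / numSlices * step + start)
        val partitionEnd = safe(BigInt(i + 1) * numElements / numSlices * step + start)
        val values = Iterator.iterate(partitionStart)(_ + step)
          .takeWhile(v => if (step > 0) v < partitionEnd else v > partitionEnd)
          .toList
        println(s"partition $i: [$partitionStart, $partitionEnd) -> $values")
      }
      // Boundaries come out as [0,6), [6,15), [15,21), [21,30): 2 + 3 + 2 + 3 = 10 rows total,
      // so no element is lost or duplicated even though 10 does not divide evenly into 4 slices.
    }
  }
}}}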