author     Srinath Shankar <srinath@databricks.com>        2016-09-03 00:20:43 +0200
committer  Herman van Hovell <hvanhovell@databricks.com>   2016-09-03 00:20:43 +0200
commit     e6132a6cf10df8b12af8dd8d1a2c563792b5cc5a (patch)
tree       d706ac4d4091a7ae31eda5c7d62c2d8c2c4a7414
parent     a2c9acb0e54b2e38cb8ee6431f1ea0e0b4cd959a (diff)
[SPARK-17298][SQL] Require explicit CROSS join for cartesian products
## What changes were proposed in this pull request?

Require the use of CROSS JOIN syntax in SQL (and a new crossJoin DataFrame API) to specify explicit cartesian products between relations. By cartesian product we mean a join between relations R and S where there is no join condition involving columns from both R and S. If a cartesian product is detected in the absence of an explicit CROSS join, an error is thrown. Turning on the "spark.sql.crossJoin.enabled" configuration flag disables this check and allows cartesian products without an explicit CROSS join.

The new crossJoin DataFrame API must be used to specify explicit cross joins. The existing join(DataFrame) method produces an INNER join that requires a subsequent join condition; that is, df1.join(df2) is equivalent to "select * from df1, df2".

## How was this patch tested?

Added cross-join.sql to the SQLQueryTestSuite to test the check for cartesian products. Added a couple of tests to the DataFrameJoinSuite to test the crossJoin API. Modified various other test suites to explicitly specify a cross join where an INNER join or a comma-separated list was previously used.

Author: Srinath Shankar <srinath@databricks.com>

Closes #14866 from srinathshankar/crossjoin.
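A minimal sketch of the new DataFrame-side behavior, assuming a local `SparkSession`; the `spark` variable, data, and column names are illustrative and not part of this patch:

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]").appName("cross-join-sketch").getOrCreate()
import spark.implicits._

val df1 = Seq((1, "a"), (2, "b")).toDF("id", "v1")
val df2 = Seq((1, "x"), (3, "y")).toDF("id", "v2")

// Explicit cartesian product: always allowed after this change.
df1.crossJoin(df2).show()

// INNER join with a condition referencing both sides: allowed, not a cartesian product.
df1.join(df2, df1("id") === df2("id")).show()

// df1.join(df2) with no condition spanning both sides is planned as an INNER join
// with a trivial condition, so it is rejected at planning time unless
// spark.sql.crossJoin.enabled is set to true.
```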
-rw-r--r--  R/pkg/R/DataFrame.R | 2
-rw-r--r--  python/pyspark/sql/dataframe.py | 2
-rw-r--r--  sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 | 3
-rw-r--r--  sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystConf.scala | 7
-rw-r--r--  sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala | 4
-rw-r--r--  sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala | 2
-rw-r--r--  sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala | 49
-rw-r--r--  sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/PropagateEmptyRelation.scala | 2
-rw-r--r--  sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala | 25
-rw-r--r--  sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala | 1
-rw-r--r--  sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala | 27
-rw-r--r--  sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/joinTypes.scala | 20
-rw-r--r--  sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala | 4
-rw-r--r--  sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisErrorSuite.scala | 8
-rw-r--r--  sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala | 4
-rw-r--r--  sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/JoinOptimizationSuite.scala | 60
-rw-r--r--  sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/PropagateEmptyRelationSuite.scala | 4
-rw-r--r--  sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/PlanParserSuite.scala | 2
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala | 18
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala | 9
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoinExec.scala | 4
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastNestedLoopJoinExec.scala | 16
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/joins/CartesianProductExec.scala | 9
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashJoin.scala | 4
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala | 9
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala | 7
-rw-r--r--  sql/core/src/test/resources/sql-tests/inputs/cross-join.sql | 35
-rw-r--r--  sql/core/src/test/resources/sql-tests/inputs/cte.sql | 2
-rw-r--r--  sql/core/src/test/resources/sql-tests/inputs/outer-join.sql | 5
-rw-r--r--  sql/core/src/test/resources/sql-tests/results/cross-join.sql.out | 129
-rw-r--r--  sql/core/src/test/resources/sql-tests/results/cte.sql.out | 2
-rw-r--r--  sql/core/src/test/resources/sql-tests/results/outer-join.sql.out | 22
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/DataFrameJoinSuite.scala | 19
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala | 4
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala | 8
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala | 37
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/execution/SQLWindowFunctionSuite.scala | 3
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/execution/joins/InnerJoinSuite.scala | 8
-rw-r--r--  sql/hive/src/test/resources/sqlgen/join_2_tables.sql | 4
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/catalyst/LogicalPlanToSQLSuite.scala | 4
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala | 4
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala | 4
42 files changed, 465 insertions(+), 127 deletions(-)
diff --git a/R/pkg/R/DataFrame.R b/R/pkg/R/DataFrame.R
index a92450274e..d7686972d2 100644
--- a/R/pkg/R/DataFrame.R
+++ b/R/pkg/R/DataFrame.R
@@ -2276,7 +2276,7 @@ setMethod("join",
signature(x = "SparkDataFrame", y = "SparkDataFrame"),
function(x, y, joinExpr = NULL, joinType = NULL) {
if (is.null(joinExpr)) {
- sdf <- callJMethod(x@sdf, "join", y@sdf)
+ sdf <- callJMethod(x@sdf, "crossJoin", y@sdf)
} else {
if (class(joinExpr) != "Column") stop("joinExpr must be a Column")
if (is.null(joinType)) {
diff --git a/python/pyspark/sql/dataframe.py b/python/pyspark/sql/dataframe.py
index a986092f5d..e5eac918a9 100644
--- a/python/pyspark/sql/dataframe.py
+++ b/python/pyspark/sql/dataframe.py
@@ -644,7 +644,7 @@ class DataFrame(object):
on = [on]
if on is None or len(on) == 0:
- jdf = self._jdf.join(other._jdf)
+ jdf = self._jdf.crossJoin(other._jdf)
elif isinstance(on[0], basestring):
if how is None:
jdf = self._jdf.join(other._jdf, self._jseq(on), "inner")
diff --git a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4
index a8af840c1e..0447436ea7 100644
--- a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4
+++ b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4
@@ -375,7 +375,7 @@ setQuantifier
relation
: left=relation
- ((CROSS | joinType) JOIN right=relation joinCriteria?
+ (joinType JOIN right=relation joinCriteria?
| NATURAL joinType JOIN right=relation
) #joinRelation
| relationPrimary #relationDefault
@@ -383,6 +383,7 @@ relation
joinType
: INNER?
+ | CROSS
| LEFT OUTER?
| LEFT SEMI
| RIGHT OUTER?
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystConf.scala
index 4df100c2a8..75ae588c18 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystConf.scala
@@ -36,6 +36,12 @@ trait CatalystConf {
def warehousePath: String
+ /** If true, cartesian products between relations will be allowed for all
+ * join types(inner, (left|right|full) outer).
+ * If false, cartesian products will require explicit CROSS JOIN syntax.
+ */
+ def crossJoinEnabled: Boolean
+
/**
* Returns the [[Resolver]] for the current configuration, which can be used to determine if two
* identifiers are equal.
@@ -55,5 +61,6 @@ case class SimpleCatalystConf(
optimizerInSetConversionThreshold: Int = 10,
maxCaseBranchesForCodegen: Int = 20,
runSQLonFile: Boolean = true,
+ crossJoinEnabled: Boolean = false,
warehousePath: String = "/user/hive/warehouse")
extends CatalystConf
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index e559f235c5..18f814d6cd 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -1003,7 +1003,7 @@ class Analyzer(
failOnOuterReference(j)
failOnOuterReferenceInSubTree(left, "a RIGHT OUTER JOIN")
j
- case j @ Join(_, right, jt, _) if jt != Inner =>
+ case j @ Join(_, right, jt, _) if !jt.isInstanceOf[InnerLike] =>
failOnOuterReference(j)
failOnOuterReferenceInSubTree(right, "a LEFT (OUTER) JOIN")
j
@@ -1899,7 +1899,7 @@ class Analyzer(
joinedCols ++
lUniqueOutput.map(_.withNullability(true)) ++
rUniqueOutput.map(_.withNullability(true))
- case Inner =>
+ case _ : InnerLike =>
leftKeys ++ lUniqueOutput ++ rUniqueOutput
case _ =>
sys.error("Unsupported natural join type " + joinType)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala
index f6e32e29eb..e81370c504 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala
@@ -94,7 +94,7 @@ object UnsupportedOperationChecker {
joinType match {
- case Inner =>
+ case _: InnerLike =>
if (left.isStreaming && right.isStreaming) {
throwError("Inner join between two streaming DataFrames/Datasets is not supported")
}
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
index 7617d34261..d2f0c97989 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
@@ -22,6 +22,7 @@ import scala.collection.immutable.HashSet
import scala.collection.mutable.ArrayBuffer
import org.apache.spark.api.java.function.FilterFunction
+import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.{CatalystConf, SimpleCatalystConf}
import org.apache.spark.sql.catalyst.analysis._
import org.apache.spark.sql.catalyst.catalog.{InMemoryCatalog, SessionCatalog}
@@ -107,6 +108,8 @@ abstract class Optimizer(sessionCatalog: SessionCatalog, conf: CatalystConf)
RewriteCorrelatedScalarSubquery,
EliminateSerialization,
RemoveAliasOnlyProject) ::
+ Batch("Check Cartesian Products", Once,
+ CheckCartesianProducts(conf)) ::
Batch("Decimal Optimizations", fixedPoint,
DecimalAggregates) ::
Batch("Typed Filter Optimization", fixedPoint,
@@ -838,7 +841,7 @@ object PushPredicateThroughJoin extends Rule[LogicalPlan] with PredicateHelper {
val (leftFilterConditions, rightFilterConditions, commonFilterCondition) =
split(splitConjunctivePredicates(filterCondition), left, right)
joinType match {
- case Inner =>
+ case _: InnerLike =>
// push down the single side `where` condition into respective sides
val newLeft = leftFilterConditions.
reduceLeftOption(And).map(Filter(_, left)).getOrElse(left)
@@ -848,7 +851,7 @@ object PushPredicateThroughJoin extends Rule[LogicalPlan] with PredicateHelper {
commonFilterCondition.partition(e => !SubqueryExpression.hasCorrelatedSubquery(e))
val newJoinCond = (newJoinConditions ++ joinCondition).reduceLeftOption(And)
- val join = Join(newLeft, newRight, Inner, newJoinCond)
+ val join = Join(newLeft, newRight, joinType, newJoinCond)
if (others.nonEmpty) {
Filter(others.reduceLeft(And), join)
} else {
@@ -885,7 +888,7 @@ object PushPredicateThroughJoin extends Rule[LogicalPlan] with PredicateHelper {
split(joinCondition.map(splitConjunctivePredicates).getOrElse(Nil), left, right)
joinType match {
- case Inner | LeftExistence(_) =>
+ case _: InnerLike | LeftExistence(_) =>
// push down the single side only join filter for both sides sub queries
val newLeft = leftJoinConditions.
reduceLeftOption(And).map(Filter(_, left)).getOrElse(left)
@@ -933,6 +936,46 @@ object CombineLimits extends Rule[LogicalPlan] {
}
/**
+ * Check if there any cartesian products between joins of any type in the optimized plan tree.
+ * Throw an error if a cartesian product is found without an explicit cross join specified.
+ * This rule is effectively disabled if the CROSS_JOINS_ENABLED flag is true.
+ *
+ * This rule must be run AFTER the ReorderJoin rule since the join conditions for each join must be
+ * collected before checking if it is a cartesian product. If you have
+ * SELECT * from R, S where R.r = S.s,
+ * the join between R and S is not a cartesian product and therefore should be allowed.
+ * The predicate R.r = S.s is not recognized as a join condition until the ReorderJoin rule.
+ */
+case class CheckCartesianProducts(conf: CatalystConf)
+ extends Rule[LogicalPlan] with PredicateHelper {
+ /**
+ * Check if a join is a cartesian product. Returns true if
+ * there are no join conditions involving references from both left and right.
+ */
+ def isCartesianProduct(join: Join): Boolean = {
+ val conditions = join.condition.map(splitConjunctivePredicates).getOrElse(Nil)
+ !conditions.map(_.references).exists(refs => refs.exists(join.left.outputSet.contains)
+ && refs.exists(join.right.outputSet.contains))
+ }
+
+ def apply(plan: LogicalPlan): LogicalPlan =
+ if (conf.crossJoinEnabled) {
+ plan
+ } else plan transform {
+ case j @ Join(left, right, Inner | LeftOuter | RightOuter | FullOuter, condition)
+ if isCartesianProduct(j) =>
+ throw new AnalysisException(
+ s"""Detected cartesian product for ${j.joinType.sql} join between logical plans
+ |${left.treeString(false).trim}
+ |and
+ |${right.treeString(false).trim}
+ |Join condition is missing or trivial.
+ |Use the CROSS JOIN syntax to allow cartesian products between these relations."""
+ .stripMargin)
+ }
+}
+
+/**
* Speeds up aggregates on fixed-precision decimals by executing them on unscaled Long values.
*
* This uses the same rules for increasing the precision and scale of the output as
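The heart of the new rule is the `isCartesianProduct` test above: a join is treated as a cartesian product when no conjunct of its condition references columns from both sides. A self-contained sketch of that idea over plain attribute-name sets (a simplification for illustration, not the Catalyst implementation):

```scala
// Simplified model: each condition is represented only by the set of attribute
// names it references. A join is a cartesian product when no condition touches
// both the left and the right output.
def isCartesianProduct(
    leftOutput: Set[String],
    rightOutput: Set[String],
    conditions: Seq[Set[String]]): Boolean =
  !conditions.exists(refs =>
    refs.exists(leftOutput.contains) && refs.exists(rightOutput.contains))

// SELECT * FROM R, S WHERE R.r = S.s  -- the predicate spans both sides
assert(!isCartesianProduct(Set("R.r"), Set("S.s"), Seq(Set("R.r", "S.s"))))
// SELECT * FROM R, S WHERE R.r = 1    -- no predicate spans both sides
assert(isCartesianProduct(Set("R.r"), Set("S.s"), Seq(Set("R.r"))))
```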
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/PropagateEmptyRelation.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/PropagateEmptyRelation.scala
index 50076b1a41..7400a01918 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/PropagateEmptyRelation.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/PropagateEmptyRelation.scala
@@ -50,7 +50,7 @@ object PropagateEmptyRelation extends Rule[LogicalPlan] with PredicateHelper {
empty(p)
case p @ Join(_, _, joinType, _) if p.children.exists(isEmptyLocalRelation) => joinType match {
- case Inner => empty(p)
+ case _: InnerLike => empty(p)
// Intersect is handled as LeftSemi by `ReplaceIntersectWithSemiJoin` rule.
// Except is handled as LeftAnti by `ReplaceExceptWithAntiJoin` rule.
case LeftOuter | LeftSemi | LeftAnti if isEmptyLocalRelation(p.left) => empty(p)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala
index 158ad3d91f..1621bffd61 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala
@@ -25,7 +25,6 @@ import org.apache.spark.sql.catalyst.plans._
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.rules._
-
/**
* Reorder the joins and push all the conditions into join, so that the bottom ones have at least
* one condition.
@@ -39,39 +38,46 @@ object ReorderJoin extends Rule[LogicalPlan] with PredicateHelper {
*
* The joined plan are picked from left to right, prefer those has at least one join condition.
*
- * @param input a list of LogicalPlans to join.
+ * @param input a list of LogicalPlans to inner join and the type of inner join.
* @param conditions a list of condition for join.
*/
@tailrec
- def createOrderedJoin(input: Seq[LogicalPlan], conditions: Seq[Expression]): LogicalPlan = {
+ def createOrderedJoin(input: Seq[(LogicalPlan, InnerLike)], conditions: Seq[Expression])
+ : LogicalPlan = {
assert(input.size >= 2)
if (input.size == 2) {
val (joinConditions, others) = conditions.partition(
e => !SubqueryExpression.hasCorrelatedSubquery(e))
- val join = Join(input(0), input(1), Inner, joinConditions.reduceLeftOption(And))
+ val ((left, leftJoinType), (right, rightJoinType)) = (input(0), input(1))
+ val innerJoinType = (leftJoinType, rightJoinType) match {
+ case (Inner, Inner) => Inner
+ case (_, _) => Cross
+ }
+ val join = Join(left, right, innerJoinType, joinConditions.reduceLeftOption(And))
if (others.nonEmpty) {
Filter(others.reduceLeft(And), join)
} else {
join
}
} else {
- val left :: rest = input.toList
+ val (left, _) :: rest = input.toList
// find out the first join that have at least one join condition
- val conditionalJoin = rest.find { plan =>
+ val conditionalJoin = rest.find { planJoinPair =>
+ val plan = planJoinPair._1
val refs = left.outputSet ++ plan.outputSet
conditions.filterNot(canEvaluate(_, left)).filterNot(canEvaluate(_, plan))
.exists(_.references.subsetOf(refs))
}
// pick the next one if no condition left
- val right = conditionalJoin.getOrElse(rest.head)
+ val (right, innerJoinType) = conditionalJoin.getOrElse(rest.head)
val joinedRefs = left.outputSet ++ right.outputSet
val (joinConditions, others) = conditions.partition(
e => e.references.subsetOf(joinedRefs) && !SubqueryExpression.hasCorrelatedSubquery(e))
- val joined = Join(left, right, Inner, joinConditions.reduceLeftOption(And))
+ val joined = Join(left, right, innerJoinType, joinConditions.reduceLeftOption(And))
// should not have reference to same logical plan
- createOrderedJoin(Seq(joined) ++ rest.filterNot(_ eq right), others)
+ createOrderedJoin(Seq((joined, Inner)) ++ rest.filterNot(_._1 eq right), others)
}
}
@@ -82,7 +88,6 @@ object ReorderJoin extends Rule[LogicalPlan] with PredicateHelper {
}
}
-
/**
* Elimination of outer joins, if the predicates can restrict the result sets so that
* all null-supplying rows are eliminated
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
index 42fbc16d03..e4cb9f0161 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
@@ -539,6 +539,7 @@ class AstBuilder extends SqlBaseBaseVisitor[AnyRef] with Logging {
def join(ctx: JoinRelationContext, left: LogicalPlan, right: LogicalPlan): Join = {
val baseJoinType = ctx.joinType match {
case null => Inner
+ case jt if jt.CROSS != null => Cross
case jt if jt.FULL != null => FullOuter
case jt if jt.SEMI != null => LeftSemi
case jt if jt.ANTI != null => LeftAnti
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala
index 476c66af76..41cabb8cb3 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala
@@ -159,23 +159,30 @@ object ExtractEquiJoinKeys extends Logging with PredicateHelper {
*/
object ExtractFiltersAndInnerJoins extends PredicateHelper {
- // flatten all inner joins, which are next to each other
- def flattenJoin(plan: LogicalPlan): (Seq[LogicalPlan], Seq[Expression]) = plan match {
- case Join(left, right, Inner, cond) =>
- val (plans, conditions) = flattenJoin(left)
- (plans ++ Seq(right), conditions ++ cond.toSeq)
+ /**
+ * Flatten all inner joins, which are next to each other.
+ * Return a list of logical plans to be joined with a boolean for each plan indicating if it
+ * was involved in an explicit cross join. Also returns the entire list of join conditions for
+ * the left-deep tree.
+ */
+ def flattenJoin(plan: LogicalPlan, parentJoinType: InnerLike = Inner)
+ : (Seq[(LogicalPlan, InnerLike)], Seq[Expression]) = plan match {
+ case Join(left, right, joinType: InnerLike, cond) =>
+ val (plans, conditions) = flattenJoin(left, joinType)
+ (plans ++ Seq((right, joinType)), conditions ++ cond.toSeq)
- case Filter(filterCondition, j @ Join(left, right, Inner, joinCondition)) =>
+ case Filter(filterCondition, j @ Join(left, right, _: InnerLike, joinCondition)) =>
val (plans, conditions) = flattenJoin(j)
(plans, conditions ++ splitConjunctivePredicates(filterCondition))
- case _ => (Seq(plan), Seq())
+ case _ => (Seq((plan, parentJoinType)), Seq())
}
- def unapply(plan: LogicalPlan): Option[(Seq[LogicalPlan], Seq[Expression])] = plan match {
- case f @ Filter(filterCondition, j @ Join(_, _, Inner, _)) =>
+ def unapply(plan: LogicalPlan): Option[(Seq[(LogicalPlan, InnerLike)], Seq[Expression])]
+ = plan match {
+ case f @ Filter(filterCondition, j @ Join(_, _, joinType: InnerLike, _)) =>
Some(flattenJoin(f))
- case j @ Join(_, _, Inner, _) =>
+ case j @ Join(_, _, joinType, _) =>
Some(flattenJoin(j))
case _ => None
}
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/joinTypes.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/joinTypes.scala
index 80674d9b4b..61e083e6fc 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/joinTypes.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/joinTypes.scala
@@ -28,6 +28,7 @@ object JoinType {
case "rightouter" | "right" => RightOuter
case "leftsemi" => LeftSemi
case "leftanti" => LeftAnti
+ case "cross" => Cross
case _ =>
val supported = Seq(
"inner",
@@ -35,7 +36,8 @@ object JoinType {
"leftouter", "left",
"rightouter", "right",
"leftsemi",
- "leftanti")
+ "leftanti",
+ "cross")
throw new IllegalArgumentException(s"Unsupported join type '$typ'. " +
"Supported join types include: " + supported.mkString("'", "', '", "'") + ".")
@@ -46,10 +48,24 @@ sealed abstract class JoinType {
def sql: String
}
-case object Inner extends JoinType {
+/**
+ * The explicitCartesian flag indicates if the inner join was constructed with a CROSS join
+ * indicating a cartesian product has been explicitly requested.
+ */
+sealed abstract class InnerLike extends JoinType {
+ def explicitCartesian: Boolean
+}
+
+case object Inner extends InnerLike {
+ override def explicitCartesian: Boolean = false
override def sql: String = "INNER"
}
+case object Cross extends InnerLike {
+ override def explicitCartesian: Boolean = true
+ override def sql: String = "CROSS"
+}
+
case object LeftOuter extends JoinType {
override def sql: String = "LEFT OUTER"
}
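A short sketch of what the new `InnerLike` parent buys: pattern matches that previously named `Inner` can now accept both `Inner` and `Cross`, which is the recurring change throughout the rest of this patch. The `treatAsInner` helper below is illustrative only:

```scala
import org.apache.spark.sql.catalyst.plans.{Cross, Inner, InnerLike, JoinType, LeftOuter}

// Illustrative helper: Inner and Cross are both "inner-like" and can share code paths.
def treatAsInner(joinType: JoinType): Boolean = joinType match {
  case _: InnerLike => true
  case _            => false
}

assert(treatAsInner(Inner) && treatAsInner(Cross) && !treatAsInner(LeftOuter))
// Only Cross records that the cartesian product was explicitly requested.
assert(Cross.explicitCartesian && !Inner.explicitCartesian)
```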
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
index 010aec7ba1..d2d33e40a8 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
@@ -293,7 +293,7 @@ case class Join(
override protected def validConstraints: Set[Expression] = {
joinType match {
- case Inner if condition.isDefined =>
+ case _: InnerLike if condition.isDefined =>
left.constraints
.union(right.constraints)
.union(splitConjunctivePredicates(condition.get).toSet)
@@ -302,7 +302,7 @@ case class Join(
.union(splitConjunctivePredicates(condition.get).toSet)
case j: ExistenceJoin =>
left.constraints
- case Inner =>
+ case _: InnerLike =>
left.constraints.union(right.constraints)
case LeftExistence(_) =>
left.constraints
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisErrorSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisErrorSuite.scala
index 13bf034f83..e7c8615bc5 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisErrorSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisErrorSuite.scala
@@ -24,7 +24,7 @@ import org.apache.spark.sql.catalyst.dsl.expressions._
import org.apache.spark.sql.catalyst.dsl.plans._
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.aggregate.{AggregateExpression, Complete, Count, Max}
-import org.apache.spark.sql.catalyst.plans.{Inner, LeftOuter, RightOuter}
+import org.apache.spark.sql.catalyst.plans.{Cross, Inner, LeftOuter, RightOuter}
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.util.{ArrayBasedMapData, GenericArrayData, MapData}
import org.apache.spark.sql.types._
@@ -396,7 +396,7 @@ class AnalysisErrorSuite extends AnalysisTest {
}
test("error test for self-join") {
- val join = Join(testRelation, testRelation, Inner, None)
+ val join = Join(testRelation, testRelation, Cross, None)
val error = intercept[AnalysisException] {
SimpleAnalyzer.checkAnalysis(join)
}
@@ -475,7 +475,7 @@ class AnalysisErrorSuite extends AnalysisTest {
LocalRelation(
AttributeReference("c", BinaryType)(exprId = ExprId(4)),
AttributeReference("d", IntegerType)(exprId = ExprId(3))),
- Inner,
+ Cross,
Some(EqualTo(AttributeReference("a", BinaryType)(exprId = ExprId(2)),
AttributeReference("c", BinaryType)(exprId = ExprId(4)))))
@@ -489,7 +489,7 @@ class AnalysisErrorSuite extends AnalysisTest {
LocalRelation(
AttributeReference("c", MapType(IntegerType, StringType))(exprId = ExprId(4)),
AttributeReference("d", IntegerType)(exprId = ExprId(3))),
- Inner,
+ Cross,
Some(EqualTo(AttributeReference("a", MapType(IntegerType, StringType))(exprId = ExprId(2)),
AttributeReference("c", MapType(IntegerType, StringType))(exprId = ExprId(4)))))
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala
index 8971edc7d3..50ebad25cd 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala
@@ -21,7 +21,7 @@ import org.apache.spark.sql.catalyst.{SimpleCatalystConf, TableIdentifier}
import org.apache.spark.sql.catalyst.dsl.expressions._
import org.apache.spark.sql.catalyst.dsl.plans._
import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.catalyst.plans.Inner
+import org.apache.spark.sql.catalyst.plans.{Cross, Inner}
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.types._
@@ -341,7 +341,7 @@ class AnalysisSuite extends AnalysisTest {
Join(
Project(Seq($"x.key"), SubqueryAlias("x", input, None)),
Project(Seq($"y.key"), SubqueryAlias("y", input, None)),
- Inner, None))
+ Cross, None))
assertAnalysisSuccess(query)
}
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/JoinOptimizationSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/JoinOptimizationSuite.scala
index dbb3e6a527..087718b3ec 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/JoinOptimizationSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/JoinOptimizationSuite.scala
@@ -23,7 +23,7 @@ import org.apache.spark.sql.catalyst.dsl.expressions._
import org.apache.spark.sql.catalyst.dsl.plans._
import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.catalyst.planning.ExtractFiltersAndInnerJoins
-import org.apache.spark.sql.catalyst.plans.{Inner, PlanTest}
+import org.apache.spark.sql.catalyst.plans.{Cross, Inner, InnerLike, PlanTest}
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.rules.RuleExecutor
@@ -54,6 +54,18 @@ class JoinOptimizationSuite extends PlanTest {
val z = testRelation.subquery('z)
def testExtract(plan: LogicalPlan, expected: Option[(Seq[LogicalPlan], Seq[Expression])]) {
+ val expectedNoCross = expected map {
+ seq_pair => {
+ val plans = seq_pair._1
+ val noCartesian = plans map { plan => (plan, Inner) }
+ (noCartesian, seq_pair._2)
+ }
+ }
+ testExtractCheckCross(plan, expectedNoCross)
+ }
+
+ def testExtractCheckCross
+ (plan: LogicalPlan, expected: Option[(Seq[(LogicalPlan, InnerLike)], Seq[Expression])]) {
assert(ExtractFiltersAndInnerJoins.unapply(plan) === expected)
}
@@ -70,6 +82,16 @@ class JoinOptimizationSuite extends PlanTest {
testExtract(x.join(y).join(x.join(z)), Some(Seq(x, y, x.join(z)), Seq()))
testExtract(x.join(y).join(x.join(z)).where("x.b".attr === "y.d".attr),
Some(Seq(x, y, x.join(z)), Seq("x.b".attr === "y.d".attr)))
+
+ testExtractCheckCross(x.join(y, Cross), Some(Seq((x, Cross), (y, Cross)), Seq()))
+ testExtractCheckCross(x.join(y, Cross).join(z, Cross),
+ Some(Seq((x, Cross), (y, Cross), (z, Cross)), Seq()))
+ testExtractCheckCross(x.join(y, Cross, Some("x.b".attr === "y.d".attr)).join(z, Cross),
+ Some(Seq((x, Cross), (y, Cross), (z, Cross)), Seq("x.b".attr === "y.d".attr)))
+ testExtractCheckCross(x.join(y, Inner, Some("x.b".attr === "y.d".attr)).join(z, Cross),
+ Some(Seq((x, Inner), (y, Inner), (z, Cross)), Seq("x.b".attr === "y.d".attr)))
+ testExtractCheckCross(x.join(y, Cross, Some("x.b".attr === "y.d".attr)).join(z, Inner),
+ Some(Seq((x, Cross), (y, Cross), (z, Inner)), Seq("x.b".attr === "y.d".attr)))
}
test("reorder inner joins") {
@@ -77,18 +99,28 @@ class JoinOptimizationSuite extends PlanTest {
val y = testRelation1.subquery('y)
val z = testRelation.subquery('z)
- val originalQuery = {
- x.join(y).join(z)
- .where(("x.b".attr === "z.b".attr) && ("y.d".attr === "z.a".attr))
+ val queryAnswers = Seq(
+ (
+ x.join(y).join(z).where(("x.b".attr === "z.b".attr) && ("y.d".attr === "z.a".attr)),
+ x.join(z, condition = Some("x.b".attr === "z.b".attr))
+ .join(y, condition = Some("y.d".attr === "z.a".attr))
+ ),
+ (
+ x.join(y, Cross).join(z, Cross)
+ .where(("x.b".attr === "z.b".attr) && ("y.d".attr === "z.a".attr)),
+ x.join(z, Cross, Some("x.b".attr === "z.b".attr))
+ .join(y, Cross, Some("y.d".attr === "z.a".attr))
+ ),
+ (
+ x.join(y, Inner).join(z, Cross).where("x.b".attr === "z.a".attr),
+ x.join(z, Cross, Some("x.b".attr === "z.a".attr)).join(y, Inner)
+ )
+ )
+
+ queryAnswers foreach { queryAnswerPair =>
+ val optimized = Optimize.execute(queryAnswerPair._1.analyze)
+ comparePlans(optimized, analysis.EliminateSubqueryAliases(queryAnswerPair._2.analyze))
}
-
- val optimized = Optimize.execute(originalQuery.analyze)
- val correctAnswer =
- x.join(z, condition = Some("x.b".attr === "z.b".attr))
- .join(y, condition = Some("y.d".attr === "z.a".attr))
- .analyze
-
- comparePlans(optimized, analysis.EliminateSubqueryAliases(correctAnswer))
}
test("broadcasthint sets relation statistics to smallest value") {
@@ -98,7 +130,7 @@ class JoinOptimizationSuite extends PlanTest {
Project(Seq($"x.key", $"y.key"),
Join(
SubqueryAlias("x", input, None),
- BroadcastHint(SubqueryAlias("y", input, None)), Inner, None)).analyze
+ BroadcastHint(SubqueryAlias("y", input, None)), Cross, None)).analyze
val optimized = Optimize.execute(query)
@@ -106,7 +138,7 @@ class JoinOptimizationSuite extends PlanTest {
Join(
Project(Seq($"x.key"), SubqueryAlias("x", input, None)),
BroadcastHint(Project(Seq($"y.key"), SubqueryAlias("y", input, None))),
- Inner, None).analyze
+ Cross, None).analyze
comparePlans(optimized, expected)
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/PropagateEmptyRelationSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/PropagateEmptyRelationSuite.scala
index c549832ef3..908dde7a66 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/PropagateEmptyRelationSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/PropagateEmptyRelationSuite.scala
@@ -67,6 +67,7 @@ class PropagateEmptyRelationSuite extends PlanTest {
// Note that `None` is used to compare with OptimizeWithoutPropagateEmptyRelation.
val testcases = Seq(
(true, true, Inner, None),
+ (true, true, Cross, None),
(true, true, LeftOuter, None),
(true, true, RightOuter, None),
(true, true, FullOuter, None),
@@ -74,6 +75,7 @@ class PropagateEmptyRelationSuite extends PlanTest {
(true, true, LeftSemi, None),
(true, false, Inner, Some(LocalRelation('a.int, 'b.int))),
+ (true, false, Cross, Some(LocalRelation('a.int, 'b.int))),
(true, false, LeftOuter, None),
(true, false, RightOuter, Some(LocalRelation('a.int, 'b.int))),
(true, false, FullOuter, None),
@@ -81,6 +83,7 @@ class PropagateEmptyRelationSuite extends PlanTest {
(true, false, LeftSemi, None),
(false, true, Inner, Some(LocalRelation('a.int, 'b.int))),
+ (false, true, Cross, Some(LocalRelation('a.int, 'b.int))),
(false, true, LeftOuter, Some(LocalRelation('a.int, 'b.int))),
(false, true, RightOuter, None),
(false, true, FullOuter, None),
@@ -88,6 +91,7 @@ class PropagateEmptyRelationSuite extends PlanTest {
(false, true, LeftSemi, Some(LocalRelation('a.int))),
(false, false, Inner, Some(LocalRelation('a.int, 'b.int))),
+ (false, false, Cross, Some(LocalRelation('a.int, 'b.int))),
(false, false, LeftOuter, Some(LocalRelation('a.int, 'b.int))),
(false, false, RightOuter, Some(LocalRelation('a.int, 'b.int))),
(false, false, FullOuter, Some(LocalRelation('a.int, 'b.int))),
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/PlanParserSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/PlanParserSuite.scala
index 2fcbfc7067..faaea17b64 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/PlanParserSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/PlanParserSuite.scala
@@ -346,7 +346,7 @@ class PlanParserSuite extends PlanTest {
def test(sql: String, jt: JoinType, tests: Seq[(String, JoinType) => Unit]): Unit = {
tests.foreach(_(sql, jt))
}
- test("cross join", Inner, Seq(testUnconditionalJoin))
+ test("cross join", Cross, Seq(testUnconditionalJoin))
test(",", Inner, Seq(testUnconditionalJoin))
test("join", Inner, testAll)
test("inner join", Inner, testAll)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
index e7dcf0f51f..3b3cb82078 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
@@ -589,9 +589,9 @@ class Dataset[T] private[sql](
def stat: DataFrameStatFunctions = new DataFrameStatFunctions(toDF())
/**
- * Cartesian join with another [[DataFrame]].
+ * Join with another [[DataFrame]].
*
- * Note that cartesian joins are very expensive without an extra filter that can be pushed down.
+ * Behaves as an INNER JOIN and requires a subsequent join predicate.
*
* @param right Right side of the join operation.
*
@@ -764,6 +764,20 @@ class Dataset[T] private[sql](
}
/**
+ * Explicit cartesian join with another [[DataFrame]].
+ *
+ * Note that cartesian joins are very expensive without an extra filter that can be pushed down.
+ *
+ * @param right Right side of the join operation.
+ *
+ * @group untypedrel
+ * @since 2.0.0
+ */
+ def crossJoin(right: Dataset[_]): DataFrame = withPlan {
+ Join(logicalPlan, right.logicalPlan, joinType = Cross, None)
+ }
+
+ /**
* :: Experimental ::
* Joins this Dataset returning a [[Tuple2]] for each pair where `condition` evaluates to
* true.
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
index b4899ad688..c389593b4f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
@@ -140,13 +140,13 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
}
private def canBuildRight(joinType: JoinType): Boolean = joinType match {
- case Inner | LeftOuter | LeftSemi | LeftAnti => true
+ case _: InnerLike | LeftOuter | LeftSemi | LeftAnti => true
case j: ExistenceJoin => true
case _ => false
}
private def canBuildLeft(joinType: JoinType): Boolean = joinType match {
- case Inner | RightOuter => true
+ case _: InnerLike | RightOuter => true
case _ => false
}
@@ -200,7 +200,7 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
planLater(left), planLater(right), BuildLeft, joinType, condition) :: Nil
// Pick CartesianProduct for InnerJoin
- case logical.Join(left, right, Inner, condition) =>
+ case logical.Join(left, right, _: InnerLike, condition) =>
joins.CartesianProductExec(planLater(left), planLater(right), condition) :: Nil
case logical.Join(left, right, joinType, condition) =>
@@ -212,8 +212,7 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
}
// This join could be very slow or OOM
joins.BroadcastNestedLoopJoinExec(
- planLater(left), planLater(right), buildSide, joinType, condition,
- withinBroadcastThreshold = false) :: Nil
+ planLater(left), planLater(right), buildSide, joinType, condition) :: Nil
// --- Cases where this strategy does not apply ---------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoinExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoinExec.scala
index 0f24baacd1..0bc261d593 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoinExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoinExec.scala
@@ -79,7 +79,7 @@ case class BroadcastHashJoinExec(
override def doConsume(ctx: CodegenContext, input: Seq[ExprCode], row: ExprCode): String = {
joinType match {
- case Inner => codegenInner(ctx, input)
+ case _: InnerLike => codegenInner(ctx, input)
case LeftOuter | RightOuter => codegenOuter(ctx, input)
case LeftSemi => codegenSemi(ctx, input)
case LeftAnti => codegenAnti(ctx, input)
@@ -134,7 +134,7 @@ case class BroadcastHashJoinExec(
ctx.INPUT_ROW = matched
buildPlan.output.zipWithIndex.map { case (a, i) =>
val ev = BoundReference(i, a.dataType, a.nullable).genCode(ctx)
- if (joinType == Inner) {
+ if (joinType.isInstanceOf[InnerLike]) {
ev
} else {
// the variables are needed even there is no matched rows
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastNestedLoopJoinExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastNestedLoopJoinExec.scala
index 6a9965f1a2..43cdce7de8 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastNestedLoopJoinExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastNestedLoopJoinExec.scala
@@ -34,8 +34,7 @@ case class BroadcastNestedLoopJoinExec(
right: SparkPlan,
buildSide: BuildSide,
joinType: JoinType,
- condition: Option[Expression],
- withinBroadcastThreshold: Boolean = true) extends BinaryExecNode {
+ condition: Option[Expression]) extends BinaryExecNode {
override lazy val metrics = Map(
"numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"))
@@ -65,7 +64,7 @@ case class BroadcastNestedLoopJoinExec(
override def output: Seq[Attribute] = {
joinType match {
- case Inner =>
+ case _: InnerLike =>
left.output ++ right.output
case LeftOuter =>
left.output ++ right.output.map(_.withNullability(true))
@@ -340,20 +339,11 @@ case class BroadcastNestedLoopJoinExec(
)
}
- protected override def doPrepare(): Unit = {
- if (!withinBroadcastThreshold && !sqlContext.conf.crossJoinEnabled) {
- throw new AnalysisException("Both sides of this join are outside the broadcasting " +
- "threshold and computing it could be prohibitively expensive. To explicitly enable it, " +
- s"please set ${SQLConf.CROSS_JOINS_ENABLED.key} = true")
- }
- super.doPrepare()
- }
-
protected override def doExecute(): RDD[InternalRow] = {
val broadcastedRelation = broadcast.executeBroadcast[Array[InternalRow]]()
val resultRdd = (joinType, buildSide) match {
- case (Inner, _) =>
+ case (_: InnerLike, _) =>
innerJoin(broadcastedRelation)
case (LeftOuter, BuildRight) | (RightOuter, BuildLeft) =>
outerJoin(broadcastedRelation)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/CartesianProductExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/CartesianProductExec.scala
index 57866df90d..15dc9b4066 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/CartesianProductExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/CartesianProductExec.scala
@@ -91,15 +91,6 @@ case class CartesianProductExec(
override lazy val metrics = Map(
"numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"))
- protected override def doPrepare(): Unit = {
- if (!sqlContext.conf.crossJoinEnabled) {
- throw new AnalysisException("Cartesian joins could be prohibitively expensive and are " +
- "disabled by default. To explicitly enable them, please set " +
- s"${SQLConf.CROSS_JOINS_ENABLED.key} = true")
- }
- super.doPrepare()
- }
-
protected override def doExecute(): RDD[InternalRow] = {
val numOutputRows = longMetric("numOutputRows")
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashJoin.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashJoin.scala
index d46a80423f..fb6bfa7b27 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashJoin.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashJoin.scala
@@ -38,7 +38,7 @@ trait HashJoin {
override def output: Seq[Attribute] = {
joinType match {
- case Inner =>
+ case _: InnerLike =>
left.output ++ right.output
case LeftOuter =>
left.output ++ right.output.map(_.withNullability(true))
@@ -225,7 +225,7 @@ trait HashJoin {
numOutputRows: SQLMetric): Iterator[InternalRow] = {
val joinedIter = joinType match {
- case Inner =>
+ case _: InnerLike =>
innerJoin(streamedIter, hashed)
case LeftOuter | RightOuter =>
outerJoin(streamedIter, hashed)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala
index 5c9c1e6062..b46af2a99a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala
@@ -45,7 +45,7 @@ case class SortMergeJoinExec(
override def output: Seq[Attribute] = {
joinType match {
- case Inner =>
+ case _: InnerLike =>
left.output ++ right.output
case LeftOuter =>
left.output ++ right.output.map(_.withNullability(true))
@@ -64,7 +64,8 @@ case class SortMergeJoinExec(
}
override def outputPartitioning: Partitioning = joinType match {
- case Inner => PartitioningCollection(Seq(left.outputPartitioning, right.outputPartitioning))
+ case _: InnerLike =>
+ PartitioningCollection(Seq(left.outputPartitioning, right.outputPartitioning))
// For left and right outer joins, the output is partitioned by the streamed input's join keys.
case LeftOuter => left.outputPartitioning
case RightOuter => right.outputPartitioning
@@ -111,7 +112,7 @@ case class SortMergeJoinExec(
val resultProj: InternalRow => InternalRow = UnsafeProjection.create(output, output)
joinType match {
- case Inner =>
+ case _: InnerLike =>
new RowIterator {
private[this] var currentLeftRow: InternalRow = _
private[this] var currentRightMatches: ArrayBuffer[InternalRow] = _
@@ -318,7 +319,7 @@ case class SortMergeJoinExec(
}
override def supportCodegen: Boolean = {
- joinType == Inner
+ joinType.isInstanceOf[InnerLike]
}
override def inputRDDs(): Seq[RDD[InternalRow]] = {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index a54342f82e..1d6ca5a965 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -362,7 +362,8 @@ object SQLConf {
.createWithDefault(true)
val CROSS_JOINS_ENABLED = SQLConfigBuilder("spark.sql.crossJoin.enabled")
- .doc("When false, we will throw an error if a query contains a cross join")
+ .doc("When false, we will throw an error if a query contains a cartesian product without " +
+ "explicit CROSS JOIN syntax.")
.booleanConf
.createWithDefault(false)
@@ -683,8 +684,6 @@ private[sql] class SQLConf extends Serializable with CatalystConf with Logging {
def bucketingEnabled: Boolean = getConf(SQLConf.BUCKETING_ENABLED)
- def crossJoinEnabled: Boolean = getConf(SQLConf.CROSS_JOINS_ENABLED)
-
// Do not use a value larger than 4000 as the default value of this property.
// See the comments of SCHEMA_STRING_LENGTH_THRESHOLD above for more information.
def schemaStringLengthThreshold: Int = getConf(SCHEMA_STRING_LENGTH_THRESHOLD)
@@ -709,6 +708,8 @@ private[sql] class SQLConf extends Serializable with CatalystConf with Logging {
override def orderByOrdinal: Boolean = getConf(ORDER_BY_ORDINAL)
override def groupByOrdinal: Boolean = getConf(GROUP_BY_ORDINAL)
+
+ override def crossJoinEnabled: Boolean = getConf(SQLConf.CROSS_JOINS_ENABLED)
/** ********************** SQLConf functionality methods ************ */
/** Set Spark SQL configuration properties. */
diff --git a/sql/core/src/test/resources/sql-tests/inputs/cross-join.sql b/sql/core/src/test/resources/sql-tests/inputs/cross-join.sql
new file mode 100644
index 0000000000..aa73124374
--- /dev/null
+++ b/sql/core/src/test/resources/sql-tests/inputs/cross-join.sql
@@ -0,0 +1,35 @@
+-- Cross join detection and error checking is done in JoinSuite since explain output is
+-- used in the error message and the ids are not stable. Only positive cases are checked here.
+
+create temporary view nt1 as select * from values
+ ("one", 1),
+ ("two", 2),
+ ("three", 3)
+ as nt1(k, v1);
+
+create temporary view nt2 as select * from values
+ ("one", 1),
+ ("two", 22),
+ ("one", 5)
+ as nt2(k, v2);
+
+-- Cross joins with and without predicates
+SELECT * FROM nt1 cross join nt2;
+SELECT * FROM nt1 cross join nt2 where nt1.k = nt2.k;
+SELECT * FROM nt1 cross join nt2 on (nt1.k = nt2.k);
+SELECT * FROM nt1 cross join nt2 where nt1.v1 = 1 and nt2.v2 = 22;
+
+SELECT a.key, b.key FROM
+(SELECT k key FROM nt1 WHERE v1 < 2) a
+CROSS JOIN
+(SELECT k key FROM nt2 WHERE v2 = 22) b;
+
+-- Join reordering
+create temporary view A(a, va) as select * from nt1;
+create temporary view B(b, vb) as select * from nt1;
+create temporary view C(c, vc) as select * from nt1;
+create temporary view D(d, vd) as select * from nt1;
+
+-- Allowed since cross join with C is explicit
+select * from ((A join B on (a = b)) cross join C) join D on (a = d);
+
diff --git a/sql/core/src/test/resources/sql-tests/inputs/cte.sql b/sql/core/src/test/resources/sql-tests/inputs/cte.sql
index 10d34deff4..3914db2691 100644
--- a/sql/core/src/test/resources/sql-tests/inputs/cte.sql
+++ b/sql/core/src/test/resources/sql-tests/inputs/cte.sql
@@ -11,4 +11,4 @@ WITH t AS (SELECT 1 FROM t) SELECT * FROM t;
WITH s1 AS (SELECT 1 FROM s2), s2 AS (SELECT 1 FROM s1) SELECT * FROM s1, s2;
-- WITH clause should reference the previous CTE
-WITH t1 AS (SELECT * FROM t2), t2 AS (SELECT 2 FROM t1) SELECT * FROM t1, t2;
+WITH t1 AS (SELECT * FROM t2), t2 AS (SELECT 2 FROM t1) SELECT * FROM t1 cross join t2;
diff --git a/sql/core/src/test/resources/sql-tests/inputs/outer-join.sql b/sql/core/src/test/resources/sql-tests/inputs/outer-join.sql
index f50f1ebad9..cdc6c81e10 100644
--- a/sql/core/src/test/resources/sql-tests/inputs/outer-join.sql
+++ b/sql/core/src/test/resources/sql-tests/inputs/outer-join.sql
@@ -24,6 +24,9 @@ CREATE OR REPLACE TEMPORARY VIEW t1 AS SELECT * FROM VALUES (97) as t1(int_col1)
CREATE OR REPLACE TEMPORARY VIEW t2 AS SELECT * FROM VALUES (0) as t2(int_col1);
+-- Set the cross join enabled flag for the LEFT JOIN test since there's no join condition.
+-- Ultimately the join should be optimized away.
+set spark.sql.crossJoin.enabled = true;
SELECT *
FROM (
SELECT
@@ -31,6 +34,6 @@ SELECT
FROM t1
LEFT JOIN t2 ON false
) t where (t.int_col) is not null;
-
+set spark.sql.crossJoin.enabled = false;
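The flag toggled around the query above can also be set programmatically; a sketch assuming an existing `SparkSession` named `spark`:

```scala
// Disable the check for the current session: implicit cartesian products are allowed.
spark.conf.set("spark.sql.crossJoin.enabled", "true")

// ... run queries whose joins lack a condition spanning both sides ...

// Restore the default: reject cartesian products that are not written as CROSS JOIN.
spark.conf.set("spark.sql.crossJoin.enabled", "false")
```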
diff --git a/sql/core/src/test/resources/sql-tests/results/cross-join.sql.out b/sql/core/src/test/resources/sql-tests/results/cross-join.sql.out
new file mode 100644
index 0000000000..562e174fc0
--- /dev/null
+++ b/sql/core/src/test/resources/sql-tests/results/cross-join.sql.out
@@ -0,0 +1,129 @@
+-- Automatically generated by SQLQueryTestSuite
+-- Number of queries: 12
+
+
+-- !query 0
+create temporary view nt1 as select * from values
+ ("one", 1),
+ ("two", 2),
+ ("three", 3)
+ as nt1(k, v1)
+-- !query 0 schema
+struct<>
+-- !query 0 output
+
+
+
+-- !query 1
+create temporary view nt2 as select * from values
+ ("one", 1),
+ ("two", 22),
+ ("one", 5)
+ as nt2(k, v2)
+-- !query 1 schema
+struct<>
+-- !query 1 output
+
+
+
+-- !query 2
+SELECT * FROM nt1 cross join nt2
+-- !query 2 schema
+struct<k:string,v1:int,k:string,v2:int>
+-- !query 2 output
+one 1 one 1
+one 1 one 5
+one 1 two 22
+three 3 one 1
+three 3 one 5
+three 3 two 22
+two 2 one 1
+two 2 one 5
+two 2 two 22
+
+
+-- !query 3
+SELECT * FROM nt1 cross join nt2 where nt1.k = nt2.k
+-- !query 3 schema
+struct<k:string,v1:int,k:string,v2:int>
+-- !query 3 output
+one 1 one 1
+one 1 one 5
+two 2 two 22
+
+
+-- !query 4
+SELECT * FROM nt1 cross join nt2 on (nt1.k = nt2.k)
+-- !query 4 schema
+struct<k:string,v1:int,k:string,v2:int>
+-- !query 4 output
+one 1 one 1
+one 1 one 5
+two 2 two 22
+
+
+-- !query 5
+SELECT * FROM nt1 cross join nt2 where nt1.v1 = 1 and nt2.v2 = 22
+-- !query 5 schema
+struct<k:string,v1:int,k:string,v2:int>
+-- !query 5 output
+one 1 two 22
+
+
+-- !query 6
+SELECT a.key, b.key FROM
+(SELECT k key FROM nt1 WHERE v1 < 2) a
+CROSS JOIN
+(SELECT k key FROM nt2 WHERE v2 = 22) b
+-- !query 6 schema
+struct<key:string,key:string>
+-- !query 6 output
+one two
+
+
+-- !query 7
+create temporary view A(a, va) as select * from nt1
+-- !query 7 schema
+struct<>
+-- !query 7 output
+
+
+
+-- !query 8
+create temporary view B(b, vb) as select * from nt1
+-- !query 8 schema
+struct<>
+-- !query 8 output
+
+
+
+-- !query 9
+create temporary view C(c, vc) as select * from nt1
+-- !query 9 schema
+struct<>
+-- !query 9 output
+
+
+
+-- !query 10
+create temporary view D(d, vd) as select * from nt1
+-- !query 10 schema
+struct<>
+-- !query 10 output
+
+
+
+-- !query 11
+select * from ((A join B on (a = b)) cross join C) join D on (a = d)
+-- !query 11 schema
+struct<a:string,va:int,b:string,vb:int,c:string,vc:int,d:string,vd:int>
+-- !query 11 output
+one 1 one 1 one 1 one 1
+one 1 one 1 three 3 one 1
+one 1 one 1 two 2 one 1
+three 3 three 3 one 1 three 3
+three 3 three 3 three 3 three 3
+three 3 three 3 two 2 three 3
+two 2 two 2 one 1 two 2
+two 2 two 2 three 3 two 2
+two 2 two 2 two 2 two 2
diff --git a/sql/core/src/test/resources/sql-tests/results/cte.sql.out b/sql/core/src/test/resources/sql-tests/results/cte.sql.out
index ddee5bf2d4..9fbad8f380 100644
--- a/sql/core/src/test/resources/sql-tests/results/cte.sql.out
+++ b/sql/core/src/test/resources/sql-tests/results/cte.sql.out
@@ -47,7 +47,7 @@ Table or view not found: s2; line 1 pos 26
-- !query 5
-WITH t1 AS (SELECT * FROM t2), t2 AS (SELECT 2 FROM t1) SELECT * FROM t1, t2
+WITH t1 AS (SELECT * FROM t2), t2 AS (SELECT 2 FROM t1) SELECT * FROM t1 cross join t2
-- !query 5 schema
struct<id:int,2:int>
-- !query 5 output
diff --git a/sql/core/src/test/resources/sql-tests/results/outer-join.sql.out b/sql/core/src/test/resources/sql-tests/results/outer-join.sql.out
index b39fdb0e58..cc50b9444b 100644
--- a/sql/core/src/test/resources/sql-tests/results/outer-join.sql.out
+++ b/sql/core/src/test/resources/sql-tests/results/outer-join.sql.out
@@ -1,5 +1,5 @@
-- Automatically generated by SQLQueryTestSuite
--- Number of queries: 6
+-- Number of queries: 8
-- !query 0
@@ -59,6 +59,14 @@ struct<>
-- !query 5
+set spark.sql.crossJoin.enabled = true
+-- !query 5 schema
+struct<key:string,value:string>
+-- !query 5 output
+spark.sql.crossJoin.enabled
+
+
+-- !query 6
SELECT *
FROM (
SELECT
@@ -66,7 +74,15 @@ SELECT
FROM t1
LEFT JOIN t2 ON false
) t where (t.int_col) is not null
--- !query 5 schema
+-- !query 6 schema
struct<int_col:int>
--- !query 5 output
+-- !query 6 output
97
+
+
+-- !query 7
+set spark.sql.crossJoin.enabled = false
+-- !query 7 schema
+struct<key:string,value:string>
+-- !query 7 output
+spark.sql.crossJoin.enabled
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameJoinSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameJoinSuite.scala
index 4abf5e42b9..541ffb58e7 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameJoinSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameJoinSuite.scala
@@ -104,6 +104,21 @@ class DataFrameJoinSuite extends QueryTest with SharedSQLContext {
.collect().toSeq)
}
+ test("join - cross join") {
+ val df1 = Seq((1, "1"), (3, "3")).toDF("int", "str")
+ val df2 = Seq((2, "2"), (4, "4")).toDF("int", "str")
+
+ checkAnswer(
+ df1.crossJoin(df2),
+ Row(1, "1", 2, "2") :: Row(1, "1", 4, "4") ::
+ Row(3, "3", 2, "2") :: Row(3, "3", 4, "4") :: Nil)
+
+ checkAnswer(
+ df2.crossJoin(df1),
+ Row(2, "2", 1, "1") :: Row(2, "2", 3, "3") ::
+ Row(4, "4", 1, "1") :: Row(4, "4", 3, "3") :: Nil)
+ }
+
test("join - using aliases after self join") {
val df = Seq(1, 2, 3).map(i => (i, i.toString)).toDF("int", "str")
checkAnswer(
@@ -145,7 +160,7 @@ class DataFrameJoinSuite extends QueryTest with SharedSQLContext {
assert(plan1.collect { case p: BroadcastHashJoinExec => p }.size === 1)
// no join key -- should not be a broadcast join
- val plan2 = df1.join(broadcast(df2)).queryExecution.sparkPlan
+ val plan2 = df1.crossJoin(broadcast(df2)).queryExecution.sparkPlan
assert(plan2.collect { case p: BroadcastHashJoinExec => p }.size === 0)
// planner should not crash without a join
@@ -155,7 +170,7 @@ class DataFrameJoinSuite extends QueryTest with SharedSQLContext {
withTempPath { path =>
df1.write.parquet(path.getCanonicalPath)
val pf1 = spark.read.parquet(path.getCanonicalPath)
- assert(df1.join(broadcast(pf1)).count() === 4)
+ assert(df1.crossJoin(broadcast(pf1)).count() === 4)
}
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
index f89951760f..c2d256bdd3 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
@@ -626,9 +626,9 @@ class DataFrameSuite extends QueryTest with SharedSQLContext {
test("drop(name: String) search and drop all top level columns that matchs the name") {
val df1 = Seq((1, 2)).toDF("a", "b")
val df2 = Seq((3, 4)).toDF("a", "b")
- checkAnswer(df1.join(df2), Row(1, 2, 3, 4))
+ checkAnswer(df1.crossJoin(df2), Row(1, 2, 3, 4))
// Finds and drops all columns that match the name (case insensitive).
- checkAnswer(df1.join(df2).drop("A"), Row(2, 4))
+ checkAnswer(df1.crossJoin(df2).drop("A"), Row(2, 4))
}
test("withColumnRenamed") {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
index 8ce6ea66b6..3243f352a5 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
@@ -466,7 +466,7 @@ class DatasetSuite extends QueryTest with SharedSQLContext {
test("self join") {
val ds = Seq("1", "2").toDS().as("a")
- val joined = ds.joinWith(ds, lit(true))
+ val joined = ds.joinWith(ds, lit(true), "cross")
checkDataset(joined, ("1", "1"), ("1", "2"), ("2", "1"), ("2", "2"))
}
@@ -486,7 +486,7 @@ class DatasetSuite extends QueryTest with SharedSQLContext {
test("Kryo encoder self join") {
implicit val kryoEncoder = Encoders.kryo[KryoData]
val ds = Seq(KryoData(1), KryoData(2)).toDS()
- assert(ds.joinWith(ds, lit(true)).collect().toSet ==
+ assert(ds.joinWith(ds, lit(true), "cross").collect().toSet ==
Set(
(KryoData(1), KryoData(1)),
(KryoData(1), KryoData(2)),
@@ -514,7 +514,7 @@ class DatasetSuite extends QueryTest with SharedSQLContext {
test("Java encoder self join") {
implicit val kryoEncoder = Encoders.javaSerialization[JavaData]
val ds = Seq(JavaData(1), JavaData(2)).toDS()
- assert(ds.joinWith(ds, lit(true)).collect().toSet ==
+ assert(ds.joinWith(ds, lit(true), "cross").collect().toSet ==
Set(
(JavaData(1), JavaData(1)),
(JavaData(1), JavaData(2)),
@@ -532,7 +532,7 @@ class DatasetSuite extends QueryTest with SharedSQLContext {
val ds2 = Seq((nullInt, "1"), (new java.lang.Integer(22), "2")).toDS()
checkDataset(
- ds1.joinWith(ds2, lit(true)),
+ ds1.joinWith(ds2, lit(true), "cross"),
((nullInt, "1"), (nullInt, "1")),
((nullInt, "1"), (new java.lang.Integer(22), "2")),
((new java.lang.Integer(22), "2"), (nullInt, "1")),
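The DatasetSuite changes rely on passing "cross" as the join-type string to joinWith. A minimal sketch, reusing the session and implicits assumed in the earlier sketch:

    import org.apache.spark.sql.functions.lit

    val ds = Seq("1", "2").toDS().as("a")

    // lit(true) carries no join keys, so this self join is a cartesian product;
    // passing "cross" as the join type marks it as intentional.
    val joined = ds.joinWith(ds, lit(true), "cross")
    joined.show()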
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
index 44889d92ee..913b2ae976 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
@@ -225,8 +225,8 @@ class JoinSuite extends QueryTest with SharedSQLContext {
Row(2, 2, 1, null) ::
Row(2, 2, 2, 2) :: Nil)
}
- assert(e.getMessage.contains("Cartesian joins could be prohibitively expensive and are " +
- "disabled by default"))
+ assert(e.getMessage.contains("Detected cartesian product for INNER join " +
+ "between logical plans"))
}
}
@@ -482,7 +482,8 @@ class JoinSuite extends QueryTest with SharedSQLContext {
// we set the threshold is greater than statistic of the cached table testData
withSQLConf(
- SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> (sizeInByteOfTestData + 1).toString()) {
+ SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> (sizeInByteOfTestData + 1).toString(),
+ SQLConf.CROSS_JOINS_ENABLED.key -> "true") {
assert(statisticSizeInByte(spark.table("testData2")) >
spark.conf.get(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD))
@@ -573,4 +574,34 @@ class JoinSuite extends QueryTest with SharedSQLContext {
Row(3, 1) ::
Row(3, 2) :: Nil)
}
+
+ test("cross join detection") {
+ testData.createOrReplaceTempView("A")
+ testData.createOrReplaceTempView("B")
+ testData2.createOrReplaceTempView("C")
+ testData3.createOrReplaceTempView("D")
+ upperCaseData.where('N >= 3).createOrReplaceTempView("`right`")
+ val cartesianQueries = Seq(
+ /** The following should error out since there is no explicit cross join */
+ "SELECT * FROM testData inner join testData2",
+ "SELECT * FROM testData left outer join testData2",
+ "SELECT * FROM testData right outer join testData2",
+ "SELECT * FROM testData full outer join testData2",
+ "SELECT * FROM testData, testData2",
+ "SELECT * FROM testData, testData2 where testData.key = 1 and testData2.a = 22",
+ /** The following should fail because after reordering there are cartesian products */
+ "select * from (A join B on (A.key = B.key)) join D on (A.key=D.a) join C",
+ "select * from ((A join B on (A.key = B.key)) join C) join D on (A.key = D.a)",
+ /** Cartesian product involving C, which is not involved in a CROSS join */
+ "select * from ((A join B on (A.key = B.key)) cross join D) join C on (A.key = D.a)");
+
+ def checkCartesianDetection(query: String): Unit = {
+ val e = intercept[Exception] {
+ checkAnswer(sql(query), Nil);
+ }
+ assert(e.getMessage.contains("Detected cartesian product"))
+ }
+
+ cartesianQueries.foreach(checkCartesianDetection)
+ }
}
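The detection exercised by these JoinSuite cases can also be switched off globally, as the commit message describes. A hedged sketch of the escape hatch, assuming the testData/testData2 temp views registered by the surrounding suite:

    // Sketch: opting out of the cartesian-product check for the whole session.
    spark.conf.set("spark.sql.crossJoin.enabled", "true")
    spark.sql("SELECT * FROM testData, testData2").show()   // implicit cartesian product now allowed

    // With the flag at its default of false, the same query is rejected with
    // "Detected cartesian product for INNER join between logical plans".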
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLWindowFunctionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLWindowFunctionSuite.scala
index d3cfa953a3..afd47897ed 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLWindowFunctionSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLWindowFunctionSuite.scala
@@ -361,7 +361,8 @@ class SQLWindowFunctionSuite extends QueryTest with SharedSQLContext {
|with
| v0 as (select 0 as key, 1 as value),
| v1 as (select key, count(value) over (partition by key) cnt_val from v0),
- | v2 as (select v1.key, v1_lag.cnt_val from v1, v1 v1_lag where v1.key = v1_lag.key)
+ | v2 as (select v1.key, v1_lag.cnt_val from v1 cross join v1 v1_lag
+ | where v1.key = v1_lag.key)
| select key, cnt_val from v2 order by key limit 1
""".stripMargin), Row(0, 1))
}
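In SQL, the comma-separated self-reference of v1 above is now spelled as an explicit CROSS JOIN. A minimal sketch of the syntax, trimmed from the rewritten test query and run against the illustrative session from the earlier sketches:

    // Sketch: explicit CROSS JOIN inside a CTE, mirroring the rewritten query.
    spark.sql(
      """with v0 as (select 0 as key, 1 as value),
        |     v1 as (select key, count(value) over (partition by key) cnt_val from v0)
        |select v1.key, v1_lag.cnt_val
        |from v1 cross join v1 v1_lag
        |where v1.key = v1_lag.key
        |""".stripMargin).show()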
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/InnerJoinSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/InnerJoinSuite.scala
index 35dab63672..4408ece112 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/InnerJoinSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/InnerJoinSuite.scala
@@ -109,8 +109,8 @@ class InnerJoinSuite extends SparkPlanTest with SharedSQLContext {
leftPlan: SparkPlan,
rightPlan: SparkPlan,
side: BuildSide) = {
- val shuffledHashJoin =
- joins.ShuffledHashJoinExec(leftKeys, rightKeys, Inner, side, None, leftPlan, rightPlan)
+ val shuffledHashJoin = joins.ShuffledHashJoinExec(leftKeys, rightKeys, Inner,
+ side, None, leftPlan, rightPlan)
val filteredJoin =
boundCondition.map(FilterExec(_, shuffledHashJoin)).getOrElse(shuffledHashJoin)
EnsureRequirements(spark.sessionState.conf).apply(filteredJoin)
@@ -122,8 +122,8 @@ class InnerJoinSuite extends SparkPlanTest with SharedSQLContext {
boundCondition: Option[Expression],
leftPlan: SparkPlan,
rightPlan: SparkPlan) = {
- val sortMergeJoin =
- joins.SortMergeJoinExec(leftKeys, rightKeys, Inner, boundCondition, leftPlan, rightPlan)
+ val sortMergeJoin = joins.SortMergeJoinExec(leftKeys, rightKeys, Inner, boundCondition,
+ leftPlan, rightPlan)
EnsureRequirements(spark.sessionState.conf).apply(sortMergeJoin)
}
diff --git a/sql/hive/src/test/resources/sqlgen/join_2_tables.sql b/sql/hive/src/test/resources/sqlgen/join_2_tables.sql
index 9dd200c3c0..0f033a04ae 100644
--- a/sql/hive/src/test/resources/sqlgen/join_2_tables.sql
+++ b/sql/hive/src/test/resources/sqlgen/join_2_tables.sql
@@ -1,7 +1,7 @@
-- This file is automatically generated by LogicalPlanToSQLSuite.
SELECT COUNT(a.value), b.KEY, a.KEY
-FROM parquet_t1 a, parquet_t1 b
+FROM parquet_t1 a CROSS JOIN parquet_t1 b
GROUP BY a.KEY, b.KEY
HAVING MAX(a.KEY) > 0
--------------------------------------------------------------------------------
-SELECT `gen_attr_0` AS `count(value)`, `gen_attr_1` AS `KEY`, `gen_attr_2` AS `KEY` FROM (SELECT `gen_attr_0`, `gen_attr_1`, `gen_attr_2` FROM (SELECT count(`gen_attr_4`) AS `gen_attr_0`, `gen_attr_1`, `gen_attr_2`, max(`gen_attr_2`) AS `gen_attr_3` FROM (SELECT `key` AS `gen_attr_2`, `value` AS `gen_attr_4` FROM `default`.`parquet_t1`) AS gen_subquery_0 INNER JOIN (SELECT `key` AS `gen_attr_1`, `value` AS `gen_attr_5` FROM `default`.`parquet_t1`) AS gen_subquery_1 GROUP BY `gen_attr_2`, `gen_attr_1` HAVING (`gen_attr_3` > CAST(0 AS BIGINT))) AS gen_subquery_2) AS gen_subquery_3
+SELECT `gen_attr_0` AS `count(value)`, `gen_attr_1` AS `KEY`, `gen_attr_2` AS `KEY` FROM (SELECT `gen_attr_0`, `gen_attr_1`, `gen_attr_2` FROM (SELECT count(`gen_attr_4`) AS `gen_attr_0`, `gen_attr_1`, `gen_attr_2`, max(`gen_attr_2`) AS `gen_attr_3` FROM (SELECT `key` AS `gen_attr_2`, `value` AS `gen_attr_4` FROM `default`.`parquet_t1`) AS gen_subquery_0 CROSS JOIN (SELECT `key` AS `gen_attr_1`, `value` AS `gen_attr_5` FROM `default`.`parquet_t1`) AS gen_subquery_1 GROUP BY `gen_attr_2`, `gen_attr_1` HAVING (`gen_attr_3` > CAST(0 AS BIGINT))) AS gen_subquery_2) AS gen_subquery_3
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/catalyst/LogicalPlanToSQLSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/catalyst/LogicalPlanToSQLSuite.scala
index 9c6da6a628..3e0fdc1f8b 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/catalyst/LogicalPlanToSQLSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/catalyst/LogicalPlanToSQLSuite.scala
@@ -642,7 +642,7 @@ class LogicalPlanToSQLSuite extends SQLBuilderTest with SQLTestUtils {
checkColumnNames(
"""SELECT x.a, y.a, x.b, y.b
|FROM (SELECT 1 AS a, 2 AS b) x
- |INNER JOIN (SELECT 1 AS a, 2 AS b) y
+ |CROSS JOIN (SELECT 1 AS a, 2 AS b) y
|ON x.a = y.a
""".stripMargin,
"a", "a", "b", "b"
@@ -810,7 +810,7 @@ class LogicalPlanToSQLSuite extends SQLBuilderTest with SQLTestUtils {
checkSQL(
"""
|SELECT COUNT(a.value), b.KEY, a.KEY
- |FROM parquet_t1 a, parquet_t1 b
+ |FROM parquet_t1 a CROSS JOIN parquet_t1 b
|GROUP BY a.KEY, b.KEY
|HAVING MAX(a.KEY) > 0
""".stripMargin,
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala
index 3c7dbb449c..1d1a958d3f 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala
@@ -318,10 +318,6 @@ class HiveQuerySuite extends HiveComparisonTest with BeforeAndAfter {
createQueryTest("trivial join ON clause",
"SELECT * FROM src a JOIN src b ON a.key = b.key")
- createQueryTest("small.cartesian",
- "SELECT a.key, b.key FROM (SELECT key FROM src WHERE key < 1) a JOIN " +
- "(SELECT key FROM src WHERE key = 2) b")
-
createQueryTest("length.udf",
"SELECT length(\"test\") FROM src LIMIT 1")
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala
index e92bbdea75..2f6d9fb96b 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala
@@ -592,9 +592,9 @@ class ParquetMetastoreSuite extends ParquetPartitioningTest {
test("self-join") {
val table = spark.table("normal_parquet")
- val selfJoin = table.as("t1").join(table.as("t2"))
+ val selfJoin = table.as("t1").crossJoin(table.as("t2"))
checkAnswer(selfJoin,
- sql("SELECT * FROM normal_parquet x JOIN normal_parquet y"))
+ sql("SELECT * FROM normal_parquet x CROSS JOIN normal_parquet y"))
}
}