-rw-r--r--  R/pkg/R/DataFrame.R | 2
-rw-r--r--  python/pyspark/sql/dataframe.py | 2
-rw-r--r--  sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 | 3
-rw-r--r--  sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystConf.scala | 7
-rw-r--r--  sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala | 4
-rw-r--r--  sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala | 2
-rw-r--r--  sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala | 49
-rw-r--r--  sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/PropagateEmptyRelation.scala | 2
-rw-r--r--  sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala | 25
-rw-r--r--  sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala | 1
-rw-r--r--  sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala | 27
-rw-r--r--  sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/joinTypes.scala | 20
-rw-r--r--  sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala | 4
-rw-r--r--  sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisErrorSuite.scala | 8
-rw-r--r--  sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala | 4
-rw-r--r--  sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/JoinOptimizationSuite.scala | 60
-rw-r--r--  sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/PropagateEmptyRelationSuite.scala | 4
-rw-r--r--  sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/PlanParserSuite.scala | 2
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala | 18
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala | 9
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoinExec.scala | 4
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastNestedLoopJoinExec.scala | 16
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/joins/CartesianProductExec.scala | 9
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashJoin.scala | 4
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala | 9
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala | 7
-rw-r--r--  sql/core/src/test/resources/sql-tests/inputs/cross-join.sql | 35
-rw-r--r--  sql/core/src/test/resources/sql-tests/inputs/cte.sql | 2
-rw-r--r--  sql/core/src/test/resources/sql-tests/inputs/outer-join.sql | 5
-rw-r--r--  sql/core/src/test/resources/sql-tests/results/cross-join.sql.out | 129
-rw-r--r--  sql/core/src/test/resources/sql-tests/results/cte.sql.out | 2
-rw-r--r--  sql/core/src/test/resources/sql-tests/results/outer-join.sql.out | 22
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/DataFrameJoinSuite.scala | 19
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala | 4
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala | 8
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala | 37
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/execution/SQLWindowFunctionSuite.scala | 3
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/execution/joins/InnerJoinSuite.scala | 8
-rw-r--r--  sql/hive/src/test/resources/sqlgen/join_2_tables.sql | 4
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/catalyst/LogicalPlanToSQLSuite.scala | 4
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala | 4
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala | 4
42 files changed, 465 insertions(+), 127 deletions(-)
diff --git a/R/pkg/R/DataFrame.R b/R/pkg/R/DataFrame.R
index a92450274e..d7686972d2 100644
--- a/R/pkg/R/DataFrame.R
+++ b/R/pkg/R/DataFrame.R
@@ -2276,7 +2276,7 @@ setMethod("join",
signature(x = "SparkDataFrame", y = "SparkDataFrame"),
function(x, y, joinExpr = NULL, joinType = NULL) {
if (is.null(joinExpr)) {
- sdf <- callJMethod(x@sdf, "join", y@sdf)
+ sdf <- callJMethod(x@sdf, "crossJoin", y@sdf)
} else {
if (class(joinExpr) != "Column") stop("joinExpr must be a Column")
if (is.null(joinType)) {
diff --git a/python/pyspark/sql/dataframe.py b/python/pyspark/sql/dataframe.py
index a986092f5d..e5eac918a9 100644
--- a/python/pyspark/sql/dataframe.py
+++ b/python/pyspark/sql/dataframe.py
@@ -644,7 +644,7 @@ class DataFrame(object):
on = [on]
if on is None or len(on) == 0:
- jdf = self._jdf.join(other._jdf)
+ jdf = self._jdf.crossJoin(other._jdf)
elif isinstance(on[0], basestring):
if how is None:
jdf = self._jdf.join(other._jdf, self._jseq(on), "inner")
diff --git a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4
index a8af840c1e..0447436ea7 100644
--- a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4
+++ b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4
@@ -375,7 +375,7 @@ setQuantifier
relation
: left=relation
- ((CROSS | joinType) JOIN right=relation joinCriteria?
+ (joinType JOIN right=relation joinCriteria?
| NATURAL joinType JOIN right=relation
) #joinRelation
| relationPrimary #relationDefault
@@ -383,6 +383,7 @@ relation
joinType
: INNER?
+ | CROSS
| LEFT OUTER?
| LEFT SEMI
| RIGHT OUTER?
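
With CROSS folded into joinType, the parser accepts an optional joinCriteria after CROSS JOIN like any other join. A minimal sketch of the resulting SQL surface, assuming (for illustration only) a SparkSession named spark and the nt1/nt2 temporary views defined in the new cross-join.sql test:

    // Plain cartesian product, always allowed because CROSS is explicit.
    spark.sql("SELECT * FROM nt1 CROSS JOIN nt2").show()
    // The grammar change also lets an explicit CROSS JOIN carry a join condition.
    spark.sql("SELECT * FROM nt1 CROSS JOIN nt2 ON (nt1.k = nt2.k)").show()
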
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystConf.scala
index 4df100c2a8..75ae588c18 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystConf.scala
@@ -36,6 +36,12 @@ trait CatalystConf {
def warehousePath: String
+ /** If true, cartesian products between relations will be allowed for all
+ * join types (inner, (left|right|full) outer).
+ * If false, cartesian products will require explicit CROSS JOIN syntax.
+ */
+ def crossJoinEnabled: Boolean
+
/**
* Returns the [[Resolver]] for the current configuration, which can be used to determine if two
* identifiers are equal.
@@ -55,5 +61,6 @@ case class SimpleCatalystConf(
optimizerInSetConversionThreshold: Int = 10,
maxCaseBranchesForCodegen: Int = 20,
runSQLonFile: Boolean = true,
+ crossJoinEnabled: Boolean = false,
warehousePath: String = "/user/hive/warehouse")
extends CatalystConf
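
A small sketch of how catalyst-level code can consult the new flag through SimpleCatalystConf; this is not part of the patch, and it assumes SimpleCatalystConf's usual caseSensitiveAnalysis parameter. The only behavior guaranteed above is the default of false:

    import org.apache.spark.sql.catalyst.SimpleCatalystConf

    // crossJoinEnabled defaults to false, so cartesian products require CROSS JOIN syntax.
    val conf = SimpleCatalystConf(caseSensitiveAnalysis = true)
    assert(!conf.crossJoinEnabled)
    // Tests (or a permissive configuration) can opt back in by overriding the flag.
    val permissive = conf.copy(crossJoinEnabled = true)
    assert(permissive.crossJoinEnabled)
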
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index e559f235c5..18f814d6cd 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -1003,7 +1003,7 @@ class Analyzer(
failOnOuterReference(j)
failOnOuterReferenceInSubTree(left, "a RIGHT OUTER JOIN")
j
- case j @ Join(_, right, jt, _) if jt != Inner =>
+ case j @ Join(_, right, jt, _) if !jt.isInstanceOf[InnerLike] =>
failOnOuterReference(j)
failOnOuterReferenceInSubTree(right, "a LEFT (OUTER) JOIN")
j
@@ -1899,7 +1899,7 @@ class Analyzer(
joinedCols ++
lUniqueOutput.map(_.withNullability(true)) ++
rUniqueOutput.map(_.withNullability(true))
- case Inner =>
+ case _ : InnerLike =>
leftKeys ++ lUniqueOutput ++ rUniqueOutput
case _ =>
sys.error("Unsupported natural join type " + joinType)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala
index f6e32e29eb..e81370c504 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala
@@ -94,7 +94,7 @@ object UnsupportedOperationChecker {
joinType match {
- case Inner =>
+ case _: InnerLike =>
if (left.isStreaming && right.isStreaming) {
throwError("Inner join between two streaming DataFrames/Datasets is not supported")
}
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
index 7617d34261..d2f0c97989 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
@@ -22,6 +22,7 @@ import scala.collection.immutable.HashSet
import scala.collection.mutable.ArrayBuffer
import org.apache.spark.api.java.function.FilterFunction
+import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.{CatalystConf, SimpleCatalystConf}
import org.apache.spark.sql.catalyst.analysis._
import org.apache.spark.sql.catalyst.catalog.{InMemoryCatalog, SessionCatalog}
@@ -107,6 +108,8 @@ abstract class Optimizer(sessionCatalog: SessionCatalog, conf: CatalystConf)
RewriteCorrelatedScalarSubquery,
EliminateSerialization,
RemoveAliasOnlyProject) ::
+ Batch("Check Cartesian Products", Once,
+ CheckCartesianProducts(conf)) ::
Batch("Decimal Optimizations", fixedPoint,
DecimalAggregates) ::
Batch("Typed Filter Optimization", fixedPoint,
@@ -838,7 +841,7 @@ object PushPredicateThroughJoin extends Rule[LogicalPlan] with PredicateHelper {
val (leftFilterConditions, rightFilterConditions, commonFilterCondition) =
split(splitConjunctivePredicates(filterCondition), left, right)
joinType match {
- case Inner =>
+ case _: InnerLike =>
// push down the single side `where` condition into respective sides
val newLeft = leftFilterConditions.
reduceLeftOption(And).map(Filter(_, left)).getOrElse(left)
@@ -848,7 +851,7 @@ object PushPredicateThroughJoin extends Rule[LogicalPlan] with PredicateHelper {
commonFilterCondition.partition(e => !SubqueryExpression.hasCorrelatedSubquery(e))
val newJoinCond = (newJoinConditions ++ joinCondition).reduceLeftOption(And)
- val join = Join(newLeft, newRight, Inner, newJoinCond)
+ val join = Join(newLeft, newRight, joinType, newJoinCond)
if (others.nonEmpty) {
Filter(others.reduceLeft(And), join)
} else {
@@ -885,7 +888,7 @@ object PushPredicateThroughJoin extends Rule[LogicalPlan] with PredicateHelper {
split(joinCondition.map(splitConjunctivePredicates).getOrElse(Nil), left, right)
joinType match {
- case Inner | LeftExistence(_) =>
+ case _: InnerLike | LeftExistence(_) =>
// push down the single side only join filter for both sides sub queries
val newLeft = leftJoinConditions.
reduceLeftOption(And).map(Filter(_, left)).getOrElse(left)
@@ -933,6 +936,46 @@ object CombineLimits extends Rule[LogicalPlan] {
}
/**
+ * Check if there are any cartesian products between joins of any type in the optimized plan tree.
+ * Throw an error if a cartesian product is found without an explicit cross join specified.
+ * This rule is effectively disabled if the CROSS_JOINS_ENABLED flag is true.
+ *
+ * This rule must be run AFTER the ReorderJoin rule since the join conditions for each join must be
+ * collected before checking if it is a cartesian product. If you have
+ * SELECT * from R, S where R.r = S.s,
+ * the join between R and S is not a cartesian product and therefore should be allowed.
+ * The predicate R.r = S.s is not recognized as a join condition until the ReorderJoin rule.
+ */
+case class CheckCartesianProducts(conf: CatalystConf)
+ extends Rule[LogicalPlan] with PredicateHelper {
+ /**
+ * Check if a join is a cartesian product. Returns true if
+ * there are no join conditions involving references from both left and right.
+ */
+ def isCartesianProduct(join: Join): Boolean = {
+ val conditions = join.condition.map(splitConjunctivePredicates).getOrElse(Nil)
+ !conditions.map(_.references).exists(refs => refs.exists(join.left.outputSet.contains)
+ && refs.exists(join.right.outputSet.contains))
+ }
+
+ def apply(plan: LogicalPlan): LogicalPlan =
+ if (conf.crossJoinEnabled) {
+ plan
+ } else plan transform {
+ case j @ Join(left, right, Inner | LeftOuter | RightOuter | FullOuter, condition)
+ if isCartesianProduct(j) =>
+ throw new AnalysisException(
+ s"""Detected cartesian product for ${j.joinType.sql} join between logical plans
+ |${left.treeString(false).trim}
+ |and
+ |${right.treeString(false).trim}
+ |Join condition is missing or trivial.
+ |Use the CROSS JOIN syntax to allow cartesian products between these relations."""
+ .stripMargin)
+ }
+}
+
+/**
* Speeds up aggregates on fixed-precision decimals by executing them on unscaled Long values.
*
* This uses the same rules for increasing the precision and scale of the output as
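
The rule only fires when no conjunct of the join condition references both sides, so an equi-predicate written in the WHERE clause still counts once ReorderJoin and predicate pushdown have turned it into a join condition. A hedged sketch of the user-visible effect (not part of the patch), assuming a SparkSession spark with the default spark.sql.crossJoin.enabled=false and the testData/testData2 views used in JoinSuite:

    // Allowed: the predicate references both sides, so by the time this rule runs it has
    // become a join condition and the join is not a cartesian product.
    spark.sql("SELECT * FROM testData JOIN testData2 WHERE testData.key = testData2.a").collect()

    // Rejected: both conjuncts touch only one side, so the join stays condition-free and
    // planning fails with "Detected cartesian product for INNER join between logical plans ...".
    // spark.sql("SELECT * FROM testData, testData2 " +
    //   "WHERE testData.key = 1 AND testData2.a = 22").collect()
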
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/PropagateEmptyRelation.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/PropagateEmptyRelation.scala
index 50076b1a41..7400a01918 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/PropagateEmptyRelation.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/PropagateEmptyRelation.scala
@@ -50,7 +50,7 @@ object PropagateEmptyRelation extends Rule[LogicalPlan] with PredicateHelper {
empty(p)
case p @ Join(_, _, joinType, _) if p.children.exists(isEmptyLocalRelation) => joinType match {
- case Inner => empty(p)
+ case _: InnerLike => empty(p)
// Intersect is handled as LeftSemi by `ReplaceIntersectWithSemiJoin` rule.
// Except is handled as LeftAnti by `ReplaceExceptWithAntiJoin` rule.
case LeftOuter | LeftSemi | LeftAnti if isEmptyLocalRelation(p.left) => empty(p)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala
index 158ad3d91f..1621bffd61 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala
@@ -25,7 +25,6 @@ import org.apache.spark.sql.catalyst.plans._
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.rules._
-
/**
* Reorder the joins and push all the conditions into join, so that the bottom ones have at least
* one condition.
@@ -39,39 +38,46 @@ object ReorderJoin extends Rule[LogicalPlan] with PredicateHelper {
*
 * The joined plans are picked from left to right; prefer those that have at least one join condition.
*
- * @param input a list of LogicalPlans to join.
+ * @param input a list of LogicalPlans to inner join and the type of inner join.
* @param conditions a list of condition for join.
*/
@tailrec
- def createOrderedJoin(input: Seq[LogicalPlan], conditions: Seq[Expression]): LogicalPlan = {
+ def createOrderedJoin(input: Seq[(LogicalPlan, InnerLike)], conditions: Seq[Expression])
+ : LogicalPlan = {
assert(input.size >= 2)
if (input.size == 2) {
val (joinConditions, others) = conditions.partition(
e => !SubqueryExpression.hasCorrelatedSubquery(e))
- val join = Join(input(0), input(1), Inner, joinConditions.reduceLeftOption(And))
+ val ((left, leftJoinType), (right, rightJoinType)) = (input(0), input(1))
+ val innerJoinType = (leftJoinType, rightJoinType) match {
+ case (Inner, Inner) => Inner
+ case (_, _) => Cross
+ }
+ val join = Join(left, right, innerJoinType, joinConditions.reduceLeftOption(And))
if (others.nonEmpty) {
Filter(others.reduceLeft(And), join)
} else {
join
}
} else {
- val left :: rest = input.toList
+ val (left, _) :: rest = input.toList
// find out the first join that have at least one join condition
- val conditionalJoin = rest.find { plan =>
+ val conditionalJoin = rest.find { planJoinPair =>
+ val plan = planJoinPair._1
val refs = left.outputSet ++ plan.outputSet
conditions.filterNot(canEvaluate(_, left)).filterNot(canEvaluate(_, plan))
.exists(_.references.subsetOf(refs))
}
// pick the next one if no condition left
- val right = conditionalJoin.getOrElse(rest.head)
+ val (right, innerJoinType) = conditionalJoin.getOrElse(rest.head)
val joinedRefs = left.outputSet ++ right.outputSet
val (joinConditions, others) = conditions.partition(
e => e.references.subsetOf(joinedRefs) && !SubqueryExpression.hasCorrelatedSubquery(e))
- val joined = Join(left, right, Inner, joinConditions.reduceLeftOption(And))
+ val joined = Join(left, right, innerJoinType, joinConditions.reduceLeftOption(And))
// should not have reference to same logical plan
- createOrderedJoin(Seq(joined) ++ rest.filterNot(_ eq right), others)
+ createOrderedJoin(Seq((joined, Inner)) ++ rest.filterNot(_._1 eq right), others)
}
}
@@ -82,7 +88,6 @@ object ReorderJoin extends Rule[LogicalPlan] with PredicateHelper {
}
}
-
/**
* Elimination of outer joins, if the predicates can restrict the result sets so that
* all null-supplying rows are eliminated
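
When two flattened inputs are merged above, the chosen join type is Inner only if neither side came from an explicit cross join; otherwise the combined join is marked Cross so CheckCartesianProducts will not reject it later. A minimal illustration of that decision as a standalone helper (the name combine is hypothetical):

    import org.apache.spark.sql.catalyst.plans.{Cross, Inner, InnerLike}

    def combine(left: InnerLike, right: InnerLike): InnerLike = (left, right) match {
      case (Inner, Inner) => Inner  // neither input involved an explicit CROSS JOIN
      case _              => Cross  // any explicit CROSS JOIN makes the merged join a Cross
    }

    assert(combine(Inner, Inner) == Inner)
    assert(combine(Cross, Inner) == Cross && combine(Inner, Cross) == Cross)
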
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
index 42fbc16d03..e4cb9f0161 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
@@ -539,6 +539,7 @@ class AstBuilder extends SqlBaseBaseVisitor[AnyRef] with Logging {
def join(ctx: JoinRelationContext, left: LogicalPlan, right: LogicalPlan): Join = {
val baseJoinType = ctx.joinType match {
case null => Inner
+ case jt if jt.CROSS != null => Cross
case jt if jt.FULL != null => FullOuter
case jt if jt.SEMI != null => LeftSemi
case jt if jt.ANTI != null => LeftAnti
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala
index 476c66af76..41cabb8cb3 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala
@@ -159,23 +159,30 @@ object ExtractEquiJoinKeys extends Logging with PredicateHelper {
*/
object ExtractFiltersAndInnerJoins extends PredicateHelper {
- // flatten all inner joins, which are next to each other
- def flattenJoin(plan: LogicalPlan): (Seq[LogicalPlan], Seq[Expression]) = plan match {
- case Join(left, right, Inner, cond) =>
- val (plans, conditions) = flattenJoin(left)
- (plans ++ Seq(right), conditions ++ cond.toSeq)
+ /**
+ * Flatten all inner joins, which are next to each other.
+ * Return a list of logical plans to be joined, each paired with the InnerLike join type that
+ * indicates whether it was involved in an explicit cross join. Also returns the entire list
+ * of join conditions for the left-deep tree.
+ */
+ def flattenJoin(plan: LogicalPlan, parentJoinType: InnerLike = Inner)
+ : (Seq[(LogicalPlan, InnerLike)], Seq[Expression]) = plan match {
+ case Join(left, right, joinType: InnerLike, cond) =>
+ val (plans, conditions) = flattenJoin(left, joinType)
+ (plans ++ Seq((right, joinType)), conditions ++ cond.toSeq)
- case Filter(filterCondition, j @ Join(left, right, Inner, joinCondition)) =>
+ case Filter(filterCondition, j @ Join(left, right, _: InnerLike, joinCondition)) =>
val (plans, conditions) = flattenJoin(j)
(plans, conditions ++ splitConjunctivePredicates(filterCondition))
- case _ => (Seq(plan), Seq())
+ case _ => (Seq((plan, parentJoinType)), Seq())
}
- def unapply(plan: LogicalPlan): Option[(Seq[LogicalPlan], Seq[Expression])] = plan match {
- case f @ Filter(filterCondition, j @ Join(_, _, Inner, _)) =>
+ def unapply(plan: LogicalPlan): Option[(Seq[(LogicalPlan, InnerLike)], Seq[Expression])]
+ = plan match {
+ case f @ Filter(filterCondition, j @ Join(_, _, joinType: InnerLike, _)) =>
Some(flattenJoin(f))
- case j @ Join(_, _, Inner, _) =>
+ case j @ Join(_, _, joinType, _) =>
Some(flattenJoin(j))
case _ => None
}
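
A small sketch (not part of the patch) of what the reworked extractor returns, using the same catalyst test DSL as JoinOptimizationSuite; the relation shapes are illustrative:

    import org.apache.spark.sql.catalyst.dsl.expressions._
    import org.apache.spark.sql.catalyst.dsl.plans._
    import org.apache.spark.sql.catalyst.planning.ExtractFiltersAndInnerJoins
    import org.apache.spark.sql.catalyst.plans.Cross
    import org.apache.spark.sql.catalyst.plans.logical.LocalRelation

    val x = LocalRelation('a.int, 'b.int).subquery('x)
    val y = LocalRelation('c.int, 'd.int).subquery('y)

    // Each flattened plan is now paired with the inner join type it participated in, so
    // downstream rules can tell explicit cross joins apart from ordinary inner joins.
    val Some((inputs, conditions)) = ExtractFiltersAndInnerJoins.unapply(x.join(y, Cross))
    assert(inputs == Seq((x, Cross), (y, Cross)) && conditions.isEmpty)
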
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/joinTypes.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/joinTypes.scala
index 80674d9b4b..61e083e6fc 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/joinTypes.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/joinTypes.scala
@@ -28,6 +28,7 @@ object JoinType {
case "rightouter" | "right" => RightOuter
case "leftsemi" => LeftSemi
case "leftanti" => LeftAnti
+ case "cross" => Cross
case _ =>
val supported = Seq(
"inner",
@@ -35,7 +36,8 @@ object JoinType {
"leftouter", "left",
"rightouter", "right",
"leftsemi",
- "leftanti")
+ "leftanti",
+ "cross")
throw new IllegalArgumentException(s"Unsupported join type '$typ'. " +
"Supported join types include: " + supported.mkString("'", "', '", "'") + ".")
@@ -46,10 +48,24 @@ sealed abstract class JoinType {
def sql: String
}
-case object Inner extends JoinType {
+/**
+ * The explicitCartesian flag indicates if the inner join was constructed with a CROSS join
+ * indicating a cartesian product has been explicitly requested.
+ */
+sealed abstract class InnerLike extends JoinType {
+ def explicitCartesian: Boolean
+}
+
+case object Inner extends InnerLike {
+ override def explicitCartesian: Boolean = false
override def sql: String = "INNER"
}
+case object Cross extends InnerLike {
+ override def explicitCartesian: Boolean = true
+ override def sql: String = "CROSS"
+}
+
case object LeftOuter extends JoinType {
override def sql: String = "LEFT OUTER"
}
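
A short sketch of the new hierarchy: "cross" is accepted by JoinType's string factory, and rules that previously matched Inner can match the shared InnerLike parent while still distinguishing explicit cartesian products:

    import org.apache.spark.sql.catalyst.plans.{Cross, Inner, InnerLike, JoinType}

    // The string factory now understands "cross".
    assert(JoinType("cross") == Cross)

    // Inner and Cross behave identically for planning purposes; only the flag differs.
    def explicitlyCartesian(jt: JoinType): Boolean = jt match {
      case i: InnerLike => i.explicitCartesian
      case _            => false
    }
    assert(!explicitlyCartesian(Inner) && explicitlyCartesian(Cross))
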
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
index 010aec7ba1..d2d33e40a8 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
@@ -293,7 +293,7 @@ case class Join(
override protected def validConstraints: Set[Expression] = {
joinType match {
- case Inner if condition.isDefined =>
+ case _: InnerLike if condition.isDefined =>
left.constraints
.union(right.constraints)
.union(splitConjunctivePredicates(condition.get).toSet)
@@ -302,7 +302,7 @@ case class Join(
.union(splitConjunctivePredicates(condition.get).toSet)
case j: ExistenceJoin =>
left.constraints
- case Inner =>
+ case _: InnerLike =>
left.constraints.union(right.constraints)
case LeftExistence(_) =>
left.constraints
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisErrorSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisErrorSuite.scala
index 13bf034f83..e7c8615bc5 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisErrorSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisErrorSuite.scala
@@ -24,7 +24,7 @@ import org.apache.spark.sql.catalyst.dsl.expressions._
import org.apache.spark.sql.catalyst.dsl.plans._
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.aggregate.{AggregateExpression, Complete, Count, Max}
-import org.apache.spark.sql.catalyst.plans.{Inner, LeftOuter, RightOuter}
+import org.apache.spark.sql.catalyst.plans.{Cross, Inner, LeftOuter, RightOuter}
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.util.{ArrayBasedMapData, GenericArrayData, MapData}
import org.apache.spark.sql.types._
@@ -396,7 +396,7 @@ class AnalysisErrorSuite extends AnalysisTest {
}
test("error test for self-join") {
- val join = Join(testRelation, testRelation, Inner, None)
+ val join = Join(testRelation, testRelation, Cross, None)
val error = intercept[AnalysisException] {
SimpleAnalyzer.checkAnalysis(join)
}
@@ -475,7 +475,7 @@ class AnalysisErrorSuite extends AnalysisTest {
LocalRelation(
AttributeReference("c", BinaryType)(exprId = ExprId(4)),
AttributeReference("d", IntegerType)(exprId = ExprId(3))),
- Inner,
+ Cross,
Some(EqualTo(AttributeReference("a", BinaryType)(exprId = ExprId(2)),
AttributeReference("c", BinaryType)(exprId = ExprId(4)))))
@@ -489,7 +489,7 @@ class AnalysisErrorSuite extends AnalysisTest {
LocalRelation(
AttributeReference("c", MapType(IntegerType, StringType))(exprId = ExprId(4)),
AttributeReference("d", IntegerType)(exprId = ExprId(3))),
- Inner,
+ Cross,
Some(EqualTo(AttributeReference("a", MapType(IntegerType, StringType))(exprId = ExprId(2)),
AttributeReference("c", MapType(IntegerType, StringType))(exprId = ExprId(4)))))
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala
index 8971edc7d3..50ebad25cd 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala
@@ -21,7 +21,7 @@ import org.apache.spark.sql.catalyst.{SimpleCatalystConf, TableIdentifier}
import org.apache.spark.sql.catalyst.dsl.expressions._
import org.apache.spark.sql.catalyst.dsl.plans._
import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.catalyst.plans.Inner
+import org.apache.spark.sql.catalyst.plans.{Cross, Inner}
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.types._
@@ -341,7 +341,7 @@ class AnalysisSuite extends AnalysisTest {
Join(
Project(Seq($"x.key"), SubqueryAlias("x", input, None)),
Project(Seq($"y.key"), SubqueryAlias("y", input, None)),
- Inner, None))
+ Cross, None))
assertAnalysisSuccess(query)
}
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/JoinOptimizationSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/JoinOptimizationSuite.scala
index dbb3e6a527..087718b3ec 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/JoinOptimizationSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/JoinOptimizationSuite.scala
@@ -23,7 +23,7 @@ import org.apache.spark.sql.catalyst.dsl.expressions._
import org.apache.spark.sql.catalyst.dsl.plans._
import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.catalyst.planning.ExtractFiltersAndInnerJoins
-import org.apache.spark.sql.catalyst.plans.{Inner, PlanTest}
+import org.apache.spark.sql.catalyst.plans.{Cross, Inner, InnerLike, PlanTest}
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.rules.RuleExecutor
@@ -54,6 +54,18 @@ class JoinOptimizationSuite extends PlanTest {
val z = testRelation.subquery('z)
def testExtract(plan: LogicalPlan, expected: Option[(Seq[LogicalPlan], Seq[Expression])]) {
+ val expectedNoCross = expected map {
+ seq_pair => {
+ val plans = seq_pair._1
+ val noCartesian = plans map { plan => (plan, Inner) }
+ (noCartesian, seq_pair._2)
+ }
+ }
+ testExtractCheckCross(plan, expectedNoCross)
+ }
+
+ def testExtractCheckCross
+ (plan: LogicalPlan, expected: Option[(Seq[(LogicalPlan, InnerLike)], Seq[Expression])]) {
assert(ExtractFiltersAndInnerJoins.unapply(plan) === expected)
}
@@ -70,6 +82,16 @@ class JoinOptimizationSuite extends PlanTest {
testExtract(x.join(y).join(x.join(z)), Some(Seq(x, y, x.join(z)), Seq()))
testExtract(x.join(y).join(x.join(z)).where("x.b".attr === "y.d".attr),
Some(Seq(x, y, x.join(z)), Seq("x.b".attr === "y.d".attr)))
+
+ testExtractCheckCross(x.join(y, Cross), Some(Seq((x, Cross), (y, Cross)), Seq()))
+ testExtractCheckCross(x.join(y, Cross).join(z, Cross),
+ Some(Seq((x, Cross), (y, Cross), (z, Cross)), Seq()))
+ testExtractCheckCross(x.join(y, Cross, Some("x.b".attr === "y.d".attr)).join(z, Cross),
+ Some(Seq((x, Cross), (y, Cross), (z, Cross)), Seq("x.b".attr === "y.d".attr)))
+ testExtractCheckCross(x.join(y, Inner, Some("x.b".attr === "y.d".attr)).join(z, Cross),
+ Some(Seq((x, Inner), (y, Inner), (z, Cross)), Seq("x.b".attr === "y.d".attr)))
+ testExtractCheckCross(x.join(y, Cross, Some("x.b".attr === "y.d".attr)).join(z, Inner),
+ Some(Seq((x, Cross), (y, Cross), (z, Inner)), Seq("x.b".attr === "y.d".attr)))
}
test("reorder inner joins") {
@@ -77,18 +99,28 @@ class JoinOptimizationSuite extends PlanTest {
val y = testRelation1.subquery('y)
val z = testRelation.subquery('z)
- val originalQuery = {
- x.join(y).join(z)
- .where(("x.b".attr === "z.b".attr) && ("y.d".attr === "z.a".attr))
+ val queryAnswers = Seq(
+ (
+ x.join(y).join(z).where(("x.b".attr === "z.b".attr) && ("y.d".attr === "z.a".attr)),
+ x.join(z, condition = Some("x.b".attr === "z.b".attr))
+ .join(y, condition = Some("y.d".attr === "z.a".attr))
+ ),
+ (
+ x.join(y, Cross).join(z, Cross)
+ .where(("x.b".attr === "z.b".attr) && ("y.d".attr === "z.a".attr)),
+ x.join(z, Cross, Some("x.b".attr === "z.b".attr))
+ .join(y, Cross, Some("y.d".attr === "z.a".attr))
+ ),
+ (
+ x.join(y, Inner).join(z, Cross).where("x.b".attr === "z.a".attr),
+ x.join(z, Cross, Some("x.b".attr === "z.a".attr)).join(y, Inner)
+ )
+ )
+
+ queryAnswers foreach { queryAnswerPair =>
+ val optimized = Optimize.execute(queryAnswerPair._1.analyze)
+ comparePlans(optimized, analysis.EliminateSubqueryAliases(queryAnswerPair._2.analyze))
}
-
- val optimized = Optimize.execute(originalQuery.analyze)
- val correctAnswer =
- x.join(z, condition = Some("x.b".attr === "z.b".attr))
- .join(y, condition = Some("y.d".attr === "z.a".attr))
- .analyze
-
- comparePlans(optimized, analysis.EliminateSubqueryAliases(correctAnswer))
}
test("broadcasthint sets relation statistics to smallest value") {
@@ -98,7 +130,7 @@ class JoinOptimizationSuite extends PlanTest {
Project(Seq($"x.key", $"y.key"),
Join(
SubqueryAlias("x", input, None),
- BroadcastHint(SubqueryAlias("y", input, None)), Inner, None)).analyze
+ BroadcastHint(SubqueryAlias("y", input, None)), Cross, None)).analyze
val optimized = Optimize.execute(query)
@@ -106,7 +138,7 @@ class JoinOptimizationSuite extends PlanTest {
Join(
Project(Seq($"x.key"), SubqueryAlias("x", input, None)),
BroadcastHint(Project(Seq($"y.key"), SubqueryAlias("y", input, None))),
- Inner, None).analyze
+ Cross, None).analyze
comparePlans(optimized, expected)
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/PropagateEmptyRelationSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/PropagateEmptyRelationSuite.scala
index c549832ef3..908dde7a66 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/PropagateEmptyRelationSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/PropagateEmptyRelationSuite.scala
@@ -67,6 +67,7 @@ class PropagateEmptyRelationSuite extends PlanTest {
// Note that `None` is used to compare with OptimizeWithoutPropagateEmptyRelation.
val testcases = Seq(
(true, true, Inner, None),
+ (true, true, Cross, None),
(true, true, LeftOuter, None),
(true, true, RightOuter, None),
(true, true, FullOuter, None),
@@ -74,6 +75,7 @@ class PropagateEmptyRelationSuite extends PlanTest {
(true, true, LeftSemi, None),
(true, false, Inner, Some(LocalRelation('a.int, 'b.int))),
+ (true, false, Cross, Some(LocalRelation('a.int, 'b.int))),
(true, false, LeftOuter, None),
(true, false, RightOuter, Some(LocalRelation('a.int, 'b.int))),
(true, false, FullOuter, None),
@@ -81,6 +83,7 @@ class PropagateEmptyRelationSuite extends PlanTest {
(true, false, LeftSemi, None),
(false, true, Inner, Some(LocalRelation('a.int, 'b.int))),
+ (false, true, Cross, Some(LocalRelation('a.int, 'b.int))),
(false, true, LeftOuter, Some(LocalRelation('a.int, 'b.int))),
(false, true, RightOuter, None),
(false, true, FullOuter, None),
@@ -88,6 +91,7 @@ class PropagateEmptyRelationSuite extends PlanTest {
(false, true, LeftSemi, Some(LocalRelation('a.int))),
(false, false, Inner, Some(LocalRelation('a.int, 'b.int))),
+ (false, false, Cross, Some(LocalRelation('a.int, 'b.int))),
(false, false, LeftOuter, Some(LocalRelation('a.int, 'b.int))),
(false, false, RightOuter, Some(LocalRelation('a.int, 'b.int))),
(false, false, FullOuter, Some(LocalRelation('a.int, 'b.int))),
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/PlanParserSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/PlanParserSuite.scala
index 2fcbfc7067..faaea17b64 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/PlanParserSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/PlanParserSuite.scala
@@ -346,7 +346,7 @@ class PlanParserSuite extends PlanTest {
def test(sql: String, jt: JoinType, tests: Seq[(String, JoinType) => Unit]): Unit = {
tests.foreach(_(sql, jt))
}
- test("cross join", Inner, Seq(testUnconditionalJoin))
+ test("cross join", Cross, Seq(testUnconditionalJoin))
test(",", Inner, Seq(testUnconditionalJoin))
test("join", Inner, testAll)
test("inner join", Inner, testAll)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
index e7dcf0f51f..3b3cb82078 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
@@ -589,9 +589,9 @@ class Dataset[T] private[sql](
def stat: DataFrameStatFunctions = new DataFrameStatFunctions(toDF())
/**
- * Cartesian join with another [[DataFrame]].
+ * Join with another [[DataFrame]].
*
- * Note that cartesian joins are very expensive without an extra filter that can be pushed down.
+ * Behaves as an INNER JOIN and requires a subsequent join predicate.
*
* @param right Right side of the join operation.
*
@@ -764,6 +764,20 @@ class Dataset[T] private[sql](
}
/**
+ * Explicit cartesian join with another [[DataFrame]].
+ *
+ * Note that cartesian joins are very expensive without an extra filter that can be pushed down.
+ *
+ * @param right Right side of the join operation.
+ *
+ * @group untypedrel
+ * @since 2.0.0
+ */
+ def crossJoin(right: Dataset[_]): DataFrame = withPlan {
+ Join(logicalPlan, right.logicalPlan, joinType = Cross, None)
+ }
+
+ /**
* :: Experimental ::
* Joins this Dataset returning a [[Tuple2]] for each pair where `condition` evaluates to
* true.
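
A hedged usage sketch of the new API, mirroring the DataFrameJoinSuite test added below and assuming a SparkSession spark with default settings:

    import spark.implicits._

    val df1 = Seq((1, "1"), (3, "3")).toDF("int", "str")
    val df2 = Seq((2, "2"), (4, "4")).toDF("int", "str")

    // Explicitly requested cartesian product: four rows, allowed regardless of the
    // spark.sql.crossJoin.enabled setting.
    df1.crossJoin(df2).show()

    // A condition-less join(...) is now planned as a plain inner join and, with the default
    // configuration, rejected by CheckCartesianProducts when the query is planned:
    // df1.join(df2).show()  // AnalysisException: Detected cartesian product ...
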
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
index b4899ad688..c389593b4f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
@@ -140,13 +140,13 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
}
private def canBuildRight(joinType: JoinType): Boolean = joinType match {
- case Inner | LeftOuter | LeftSemi | LeftAnti => true
+ case _: InnerLike | LeftOuter | LeftSemi | LeftAnti => true
case j: ExistenceJoin => true
case _ => false
}
private def canBuildLeft(joinType: JoinType): Boolean = joinType match {
- case Inner | RightOuter => true
+ case _: InnerLike | RightOuter => true
case _ => false
}
@@ -200,7 +200,7 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
planLater(left), planLater(right), BuildLeft, joinType, condition) :: Nil
// Pick CartesianProduct for InnerJoin
- case logical.Join(left, right, Inner, condition) =>
+ case logical.Join(left, right, _: InnerLike, condition) =>
joins.CartesianProductExec(planLater(left), planLater(right), condition) :: Nil
case logical.Join(left, right, joinType, condition) =>
@@ -212,8 +212,7 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
}
// This join could be very slow or OOM
joins.BroadcastNestedLoopJoinExec(
- planLater(left), planLater(right), buildSide, joinType, condition,
- withinBroadcastThreshold = false) :: Nil
+ planLater(left), planLater(right), buildSide, joinType, condition) :: Nil
// --- Cases where this strategy does not apply ---------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoinExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoinExec.scala
index 0f24baacd1..0bc261d593 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoinExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoinExec.scala
@@ -79,7 +79,7 @@ case class BroadcastHashJoinExec(
override def doConsume(ctx: CodegenContext, input: Seq[ExprCode], row: ExprCode): String = {
joinType match {
- case Inner => codegenInner(ctx, input)
+ case _: InnerLike => codegenInner(ctx, input)
case LeftOuter | RightOuter => codegenOuter(ctx, input)
case LeftSemi => codegenSemi(ctx, input)
case LeftAnti => codegenAnti(ctx, input)
@@ -134,7 +134,7 @@ case class BroadcastHashJoinExec(
ctx.INPUT_ROW = matched
buildPlan.output.zipWithIndex.map { case (a, i) =>
val ev = BoundReference(i, a.dataType, a.nullable).genCode(ctx)
- if (joinType == Inner) {
+ if (joinType.isInstanceOf[InnerLike]) {
ev
} else {
// the variables are needed even there is no matched rows
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastNestedLoopJoinExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastNestedLoopJoinExec.scala
index 6a9965f1a2..43cdce7de8 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastNestedLoopJoinExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastNestedLoopJoinExec.scala
@@ -34,8 +34,7 @@ case class BroadcastNestedLoopJoinExec(
right: SparkPlan,
buildSide: BuildSide,
joinType: JoinType,
- condition: Option[Expression],
- withinBroadcastThreshold: Boolean = true) extends BinaryExecNode {
+ condition: Option[Expression]) extends BinaryExecNode {
override lazy val metrics = Map(
"numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"))
@@ -65,7 +64,7 @@ case class BroadcastNestedLoopJoinExec(
override def output: Seq[Attribute] = {
joinType match {
- case Inner =>
+ case _: InnerLike =>
left.output ++ right.output
case LeftOuter =>
left.output ++ right.output.map(_.withNullability(true))
@@ -340,20 +339,11 @@ case class BroadcastNestedLoopJoinExec(
)
}
- protected override def doPrepare(): Unit = {
- if (!withinBroadcastThreshold && !sqlContext.conf.crossJoinEnabled) {
- throw new AnalysisException("Both sides of this join are outside the broadcasting " +
- "threshold and computing it could be prohibitively expensive. To explicitly enable it, " +
- s"please set ${SQLConf.CROSS_JOINS_ENABLED.key} = true")
- }
- super.doPrepare()
- }
-
protected override def doExecute(): RDD[InternalRow] = {
val broadcastedRelation = broadcast.executeBroadcast[Array[InternalRow]]()
val resultRdd = (joinType, buildSide) match {
- case (Inner, _) =>
+ case (_: InnerLike, _) =>
innerJoin(broadcastedRelation)
case (LeftOuter, BuildRight) | (RightOuter, BuildLeft) =>
outerJoin(broadcastedRelation)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/CartesianProductExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/CartesianProductExec.scala
index 57866df90d..15dc9b4066 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/CartesianProductExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/CartesianProductExec.scala
@@ -91,15 +91,6 @@ case class CartesianProductExec(
override lazy val metrics = Map(
"numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"))
- protected override def doPrepare(): Unit = {
- if (!sqlContext.conf.crossJoinEnabled) {
- throw new AnalysisException("Cartesian joins could be prohibitively expensive and are " +
- "disabled by default. To explicitly enable them, please set " +
- s"${SQLConf.CROSS_JOINS_ENABLED.key} = true")
- }
- super.doPrepare()
- }
-
protected override def doExecute(): RDD[InternalRow] = {
val numOutputRows = longMetric("numOutputRows")
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashJoin.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashJoin.scala
index d46a80423f..fb6bfa7b27 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashJoin.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashJoin.scala
@@ -38,7 +38,7 @@ trait HashJoin {
override def output: Seq[Attribute] = {
joinType match {
- case Inner =>
+ case _: InnerLike =>
left.output ++ right.output
case LeftOuter =>
left.output ++ right.output.map(_.withNullability(true))
@@ -225,7 +225,7 @@ trait HashJoin {
numOutputRows: SQLMetric): Iterator[InternalRow] = {
val joinedIter = joinType match {
- case Inner =>
+ case _: InnerLike =>
innerJoin(streamedIter, hashed)
case LeftOuter | RightOuter =>
outerJoin(streamedIter, hashed)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala
index 5c9c1e6062..b46af2a99a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala
@@ -45,7 +45,7 @@ case class SortMergeJoinExec(
override def output: Seq[Attribute] = {
joinType match {
- case Inner =>
+ case _: InnerLike =>
left.output ++ right.output
case LeftOuter =>
left.output ++ right.output.map(_.withNullability(true))
@@ -64,7 +64,8 @@ case class SortMergeJoinExec(
}
override def outputPartitioning: Partitioning = joinType match {
- case Inner => PartitioningCollection(Seq(left.outputPartitioning, right.outputPartitioning))
+ case _: InnerLike =>
+ PartitioningCollection(Seq(left.outputPartitioning, right.outputPartitioning))
// For left and right outer joins, the output is partitioned by the streamed input's join keys.
case LeftOuter => left.outputPartitioning
case RightOuter => right.outputPartitioning
@@ -111,7 +112,7 @@ case class SortMergeJoinExec(
val resultProj: InternalRow => InternalRow = UnsafeProjection.create(output, output)
joinType match {
- case Inner =>
+ case _: InnerLike =>
new RowIterator {
private[this] var currentLeftRow: InternalRow = _
private[this] var currentRightMatches: ArrayBuffer[InternalRow] = _
@@ -318,7 +319,7 @@ case class SortMergeJoinExec(
}
override def supportCodegen: Boolean = {
- joinType == Inner
+ joinType.isInstanceOf[InnerLike]
}
override def inputRDDs(): Seq[RDD[InternalRow]] = {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index a54342f82e..1d6ca5a965 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -362,7 +362,8 @@ object SQLConf {
.createWithDefault(true)
val CROSS_JOINS_ENABLED = SQLConfigBuilder("spark.sql.crossJoin.enabled")
- .doc("When false, we will throw an error if a query contains a cross join")
+ .doc("When false, we will throw an error if a query contains a cartesian product without " +
+ "explicit CROSS JOIN syntax.")
.booleanConf
.createWithDefault(false)
@@ -683,8 +684,6 @@ private[sql] class SQLConf extends Serializable with CatalystConf with Logging {
def bucketingEnabled: Boolean = getConf(SQLConf.BUCKETING_ENABLED)
- def crossJoinEnabled: Boolean = getConf(SQLConf.CROSS_JOINS_ENABLED)
-
// Do not use a value larger than 4000 as the default value of this property.
// See the comments of SCHEMA_STRING_LENGTH_THRESHOLD above for more information.
def schemaStringLengthThreshold: Int = getConf(SCHEMA_STRING_LENGTH_THRESHOLD)
@@ -709,6 +708,8 @@ private[sql] class SQLConf extends Serializable with CatalystConf with Logging {
override def orderByOrdinal: Boolean = getConf(ORDER_BY_ORDINAL)
override def groupByOrdinal: Boolean = getConf(GROUP_BY_ORDINAL)
+
+ override def crossJoinEnabled: Boolean = getConf(SQLConf.CROSS_JOINS_ENABLED)
/** ********************** SQLConf functionality methods ************ */
/** Set Spark SQL configuration properties. */
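
Because the flag is now exposed through CatalystConf, setting it per session re-enables implicit cartesian products everywhere the optimizer check runs. A minimal sketch, assuming a SparkSession spark and the nt1/nt2 views from the cross-join tests:

    // Opt back into implicit cartesian products for this session only.
    spark.conf.set("spark.sql.crossJoin.enabled", "true")
    spark.sql("SELECT * FROM nt1 JOIN nt2").count()   // no longer rejected

    // Restore the default so unintended cartesian products fail fast again.
    spark.conf.set("spark.sql.crossJoin.enabled", "false")
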
diff --git a/sql/core/src/test/resources/sql-tests/inputs/cross-join.sql b/sql/core/src/test/resources/sql-tests/inputs/cross-join.sql
new file mode 100644
index 0000000000..aa73124374
--- /dev/null
+++ b/sql/core/src/test/resources/sql-tests/inputs/cross-join.sql
@@ -0,0 +1,35 @@
+-- Cross join detection and error checking is done in JoinSuite since explain output is
+-- used in the error message and the ids are not stable. Only positive cases are checked here.
+
+create temporary view nt1 as select * from values
+ ("one", 1),
+ ("two", 2),
+ ("three", 3)
+ as nt1(k, v1);
+
+create temporary view nt2 as select * from values
+ ("one", 1),
+ ("two", 22),
+ ("one", 5)
+ as nt2(k, v2);
+
+-- Cross joins with and without predicates
+SELECT * FROM nt1 cross join nt2;
+SELECT * FROM nt1 cross join nt2 where nt1.k = nt2.k;
+SELECT * FROM nt1 cross join nt2 on (nt1.k = nt2.k);
+SELECT * FROM nt1 cross join nt2 where nt1.v1 = 1 and nt2.v2 = 22;
+
+SELECT a.key, b.key FROM
+(SELECT k key FROM nt1 WHERE v1 < 2) a
+CROSS JOIN
+(SELECT k key FROM nt2 WHERE v2 = 22) b;
+
+-- Join reordering
+create temporary view A(a, va) as select * from nt1;
+create temporary view B(b, vb) as select * from nt1;
+create temporary view C(c, vc) as select * from nt1;
+create temporary view D(d, vd) as select * from nt1;
+
+-- Allowed since cross join with C is explicit
+select * from ((A join B on (a = b)) cross join C) join D on (a = d);
+
diff --git a/sql/core/src/test/resources/sql-tests/inputs/cte.sql b/sql/core/src/test/resources/sql-tests/inputs/cte.sql
index 10d34deff4..3914db2691 100644
--- a/sql/core/src/test/resources/sql-tests/inputs/cte.sql
+++ b/sql/core/src/test/resources/sql-tests/inputs/cte.sql
@@ -11,4 +11,4 @@ WITH t AS (SELECT 1 FROM t) SELECT * FROM t;
WITH s1 AS (SELECT 1 FROM s2), s2 AS (SELECT 1 FROM s1) SELECT * FROM s1, s2;
-- WITH clause should reference the previous CTE
-WITH t1 AS (SELECT * FROM t2), t2 AS (SELECT 2 FROM t1) SELECT * FROM t1, t2;
+WITH t1 AS (SELECT * FROM t2), t2 AS (SELECT 2 FROM t1) SELECT * FROM t1 cross join t2;
diff --git a/sql/core/src/test/resources/sql-tests/inputs/outer-join.sql b/sql/core/src/test/resources/sql-tests/inputs/outer-join.sql
index f50f1ebad9..cdc6c81e10 100644
--- a/sql/core/src/test/resources/sql-tests/inputs/outer-join.sql
+++ b/sql/core/src/test/resources/sql-tests/inputs/outer-join.sql
@@ -24,6 +24,9 @@ CREATE OR REPLACE TEMPORARY VIEW t1 AS SELECT * FROM VALUES (97) as t1(int_col1)
CREATE OR REPLACE TEMPORARY VIEW t2 AS SELECT * FROM VALUES (0) as t2(int_col1);
+-- Set the cross join enabled flag for the LEFT JOIN test since there's no join condition.
+-- Ultimately the join should be optimized away.
+set spark.sql.crossJoin.enabled = true;
SELECT *
FROM (
SELECT
@@ -31,6 +34,6 @@ SELECT
FROM t1
LEFT JOIN t2 ON false
) t where (t.int_col) is not null;
-
+set spark.sql.crossJoin.enabled = false;
diff --git a/sql/core/src/test/resources/sql-tests/results/cross-join.sql.out b/sql/core/src/test/resources/sql-tests/results/cross-join.sql.out
new file mode 100644
index 0000000000..562e174fc0
--- /dev/null
+++ b/sql/core/src/test/resources/sql-tests/results/cross-join.sql.out
@@ -0,0 +1,129 @@
+-- Automatically generated by SQLQueryTestSuite
+-- Number of queries: 12
+
+
+-- !query 0
+create temporary view nt1 as select * from values
+ ("one", 1),
+ ("two", 2),
+ ("three", 3)
+ as nt1(k, v1)
+-- !query 0 schema
+struct<>
+-- !query 0 output
+
+
+
+-- !query 1
+create temporary view nt2 as select * from values
+ ("one", 1),
+ ("two", 22),
+ ("one", 5)
+ as nt2(k, v2)
+-- !query 1 schema
+struct<>
+-- !query 1 output
+
+
+
+-- !query 2
+SELECT * FROM nt1 cross join nt2
+-- !query 2 schema
+struct<k:string,v1:int,k:string,v2:int>
+-- !query 2 output
+one 1 one 1
+one 1 one 5
+one 1 two 22
+three 3 one 1
+three 3 one 5
+three 3 two 22
+two 2 one 1
+two 2 one 5
+two 2 two 22
+
+
+-- !query 3
+SELECT * FROM nt1 cross join nt2 where nt1.k = nt2.k
+-- !query 3 schema
+struct<k:string,v1:int,k:string,v2:int>
+-- !query 3 output
+one 1 one 1
+one 1 one 5
+two 2 two 22
+
+
+-- !query 4
+SELECT * FROM nt1 cross join nt2 on (nt1.k = nt2.k)
+-- !query 4 schema
+struct<k:string,v1:int,k:string,v2:int>
+-- !query 4 output
+one 1 one 1
+one 1 one 5
+two 2 two 22
+
+
+-- !query 5
+SELECT * FROM nt1 cross join nt2 where nt1.v1 = 1 and nt2.v2 = 22
+-- !query 5 schema
+struct<k:string,v1:int,k:string,v2:int>
+-- !query 5 output
+one 1 two 22
+
+
+-- !query 6
+SELECT a.key, b.key FROM
+(SELECT k key FROM nt1 WHERE v1 < 2) a
+CROSS JOIN
+(SELECT k key FROM nt2 WHERE v2 = 22) b
+-- !query 6 schema
+struct<key:string,key:string>
+-- !query 6 output
+one two
+
+
+-- !query 7
+create temporary view A(a, va) as select * from nt1
+-- !query 7 schema
+struct<>
+-- !query 7 output
+
+
+
+-- !query 8
+create temporary view B(b, vb) as select * from nt1
+-- !query 8 schema
+struct<>
+-- !query 8 output
+
+
+
+-- !query 9
+create temporary view C(c, vc) as select * from nt1
+-- !query 9 schema
+struct<>
+-- !query 9 output
+
+
+
+-- !query 10
+create temporary view D(d, vd) as select * from nt1
+-- !query 10 schema
+struct<>
+-- !query 10 output
+
+
+
+-- !query 11
+select * from ((A join B on (a = b)) cross join C) join D on (a = d)
+-- !query 11 schema
+struct<a:string,va:int,b:string,vb:int,c:string,vc:int,d:string,vd:int>
+-- !query 11 output
+one 1 one 1 one 1 one 1
+one 1 one 1 three 3 one 1
+one 1 one 1 two 2 one 1
+three 3 three 3 one 1 three 3
+three 3 three 3 three 3 three 3
+three 3 three 3 two 2 three 3
+two 2 two 2 one 1 two 2
+two 2 two 2 three 3 two 2
+two 2 two 2 two 2 two 2
diff --git a/sql/core/src/test/resources/sql-tests/results/cte.sql.out b/sql/core/src/test/resources/sql-tests/results/cte.sql.out
index ddee5bf2d4..9fbad8f380 100644
--- a/sql/core/src/test/resources/sql-tests/results/cte.sql.out
+++ b/sql/core/src/test/resources/sql-tests/results/cte.sql.out
@@ -47,7 +47,7 @@ Table or view not found: s2; line 1 pos 26
-- !query 5
-WITH t1 AS (SELECT * FROM t2), t2 AS (SELECT 2 FROM t1) SELECT * FROM t1, t2
+WITH t1 AS (SELECT * FROM t2), t2 AS (SELECT 2 FROM t1) SELECT * FROM t1 cross join t2
-- !query 5 schema
struct<id:int,2:int>
-- !query 5 output
diff --git a/sql/core/src/test/resources/sql-tests/results/outer-join.sql.out b/sql/core/src/test/resources/sql-tests/results/outer-join.sql.out
index b39fdb0e58..cc50b9444b 100644
--- a/sql/core/src/test/resources/sql-tests/results/outer-join.sql.out
+++ b/sql/core/src/test/resources/sql-tests/results/outer-join.sql.out
@@ -1,5 +1,5 @@
-- Automatically generated by SQLQueryTestSuite
--- Number of queries: 6
+-- Number of queries: 8
-- !query 0
@@ -59,6 +59,14 @@ struct<>
-- !query 5
+set spark.sql.crossJoin.enabled = true
+-- !query 5 schema
+struct<key:string,value:string>
+-- !query 5 output
+spark.sql.crossJoin.enabled
+
+
+-- !query 6
SELECT *
FROM (
SELECT
@@ -66,7 +74,15 @@ SELECT
FROM t1
LEFT JOIN t2 ON false
) t where (t.int_col) is not null
--- !query 5 schema
+-- !query 6 schema
struct<int_col:int>
--- !query 5 output
+-- !query 6 output
97
+
+
+-- !query 7
+set spark.sql.crossJoin.enabled = false
+-- !query 7 schema
+struct<key:string,value:string>
+-- !query 7 output
+spark.sql.crossJoin.enabled
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameJoinSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameJoinSuite.scala
index 4abf5e42b9..541ffb58e7 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameJoinSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameJoinSuite.scala
@@ -104,6 +104,21 @@ class DataFrameJoinSuite extends QueryTest with SharedSQLContext {
.collect().toSeq)
}
+ test("join - cross join") {
+ val df1 = Seq((1, "1"), (3, "3")).toDF("int", "str")
+ val df2 = Seq((2, "2"), (4, "4")).toDF("int", "str")
+
+ checkAnswer(
+ df1.crossJoin(df2),
+ Row(1, "1", 2, "2") :: Row(1, "1", 4, "4") ::
+ Row(3, "3", 2, "2") :: Row(3, "3", 4, "4") :: Nil)
+
+ checkAnswer(
+ df2.crossJoin(df1),
+ Row(2, "2", 1, "1") :: Row(2, "2", 3, "3") ::
+ Row(4, "4", 1, "1") :: Row(4, "4", 3, "3") :: Nil)
+ }
+
test("join - using aliases after self join") {
val df = Seq(1, 2, 3).map(i => (i, i.toString)).toDF("int", "str")
checkAnswer(
@@ -145,7 +160,7 @@ class DataFrameJoinSuite extends QueryTest with SharedSQLContext {
assert(plan1.collect { case p: BroadcastHashJoinExec => p }.size === 1)
// no join key -- should not be a broadcast join
- val plan2 = df1.join(broadcast(df2)).queryExecution.sparkPlan
+ val plan2 = df1.crossJoin(broadcast(df2)).queryExecution.sparkPlan
assert(plan2.collect { case p: BroadcastHashJoinExec => p }.size === 0)
// planner should not crash without a join
@@ -155,7 +170,7 @@ class DataFrameJoinSuite extends QueryTest with SharedSQLContext {
withTempPath { path =>
df1.write.parquet(path.getCanonicalPath)
val pf1 = spark.read.parquet(path.getCanonicalPath)
- assert(df1.join(broadcast(pf1)).count() === 4)
+ assert(df1.crossJoin(broadcast(pf1)).count() === 4)
}
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
index f89951760f..c2d256bdd3 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
@@ -626,9 +626,9 @@ class DataFrameSuite extends QueryTest with SharedSQLContext {
test("drop(name: String) search and drop all top level columns that matchs the name") {
val df1 = Seq((1, 2)).toDF("a", "b")
val df2 = Seq((3, 4)).toDF("a", "b")
- checkAnswer(df1.join(df2), Row(1, 2, 3, 4))
+ checkAnswer(df1.crossJoin(df2), Row(1, 2, 3, 4))
// Finds and drops all columns that match the name (case insensitive).
- checkAnswer(df1.join(df2).drop("A"), Row(2, 4))
+ checkAnswer(df1.crossJoin(df2).drop("A"), Row(2, 4))
}
test("withColumnRenamed") {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
index 8ce6ea66b6..3243f352a5 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
@@ -466,7 +466,7 @@ class DatasetSuite extends QueryTest with SharedSQLContext {
test("self join") {
val ds = Seq("1", "2").toDS().as("a")
- val joined = ds.joinWith(ds, lit(true))
+ val joined = ds.joinWith(ds, lit(true), "cross")
checkDataset(joined, ("1", "1"), ("1", "2"), ("2", "1"), ("2", "2"))
}
@@ -486,7 +486,7 @@ class DatasetSuite extends QueryTest with SharedSQLContext {
test("Kryo encoder self join") {
implicit val kryoEncoder = Encoders.kryo[KryoData]
val ds = Seq(KryoData(1), KryoData(2)).toDS()
- assert(ds.joinWith(ds, lit(true)).collect().toSet ==
+ assert(ds.joinWith(ds, lit(true), "cross").collect().toSet ==
Set(
(KryoData(1), KryoData(1)),
(KryoData(1), KryoData(2)),
@@ -514,7 +514,7 @@ class DatasetSuite extends QueryTest with SharedSQLContext {
test("Java encoder self join") {
implicit val kryoEncoder = Encoders.javaSerialization[JavaData]
val ds = Seq(JavaData(1), JavaData(2)).toDS()
- assert(ds.joinWith(ds, lit(true)).collect().toSet ==
+ assert(ds.joinWith(ds, lit(true), "cross").collect().toSet ==
Set(
(JavaData(1), JavaData(1)),
(JavaData(1), JavaData(2)),
@@ -532,7 +532,7 @@ class DatasetSuite extends QueryTest with SharedSQLContext {
val ds2 = Seq((nullInt, "1"), (new java.lang.Integer(22), "2")).toDS()
checkDataset(
- ds1.joinWith(ds2, lit(true)),
+ ds1.joinWith(ds2, lit(true), "cross"),
((nullInt, "1"), (nullInt, "1")),
((nullInt, "1"), (new java.lang.Integer(22), "2")),
((new java.lang.Integer(22), "2"), (nullInt, "1")),
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
index 44889d92ee..913b2ae976 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
@@ -225,8 +225,8 @@ class JoinSuite extends QueryTest with SharedSQLContext {
Row(2, 2, 1, null) ::
Row(2, 2, 2, 2) :: Nil)
}
- assert(e.getMessage.contains("Cartesian joins could be prohibitively expensive and are " +
- "disabled by default"))
+ assert(e.getMessage.contains("Detected cartesian product for INNER join " +
+ "between logical plans"))
}
}
@@ -482,7 +482,8 @@ class JoinSuite extends QueryTest with SharedSQLContext {
// we set the threshold is greater than statistic of the cached table testData
withSQLConf(
- SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> (sizeInByteOfTestData + 1).toString()) {
+ SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> (sizeInByteOfTestData + 1).toString(),
+ SQLConf.CROSS_JOINS_ENABLED.key -> "true") {
assert(statisticSizeInByte(spark.table("testData2")) >
spark.conf.get(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD))
@@ -573,4 +574,34 @@ class JoinSuite extends QueryTest with SharedSQLContext {
Row(3, 1) ::
Row(3, 2) :: Nil)
}
+
+ test("cross join detection") {
+ testData.createOrReplaceTempView("A")
+ testData.createOrReplaceTempView("B")
+ testData2.createOrReplaceTempView("C")
+ testData3.createOrReplaceTempView("D")
+ upperCaseData.where('N >= 3).createOrReplaceTempView("`right`")
+ val cartesianQueries = Seq(
+ /** The following should error out since there is no explicit cross join */
+ "SELECT * FROM testData inner join testData2",
+ "SELECT * FROM testData left outer join testData2",
+ "SELECT * FROM testData right outer join testData2",
+ "SELECT * FROM testData full outer join testData2",
+ "SELECT * FROM testData, testData2",
+ "SELECT * FROM testData, testData2 where testData.key = 1 and testData2.a = 22",
+ /** The following should fail because after reordering there are cartesian products */
+ "select * from (A join B on (A.key = B.key)) join D on (A.key=D.a) join C",
+ "select * from ((A join B on (A.key = B.key)) join C) join D on (A.key = D.a)",
+ /** Cartesian product involving C, which is not involved in a CROSS join */
+ "select * from ((A join B on (A.key = B.key)) cross join D) join C on (A.key = D.a)");
+
+ def checkCartesianDetection(query: String): Unit = {
+ val e = intercept[Exception] {
+ checkAnswer(sql(query), Nil);
+ }
+ assert(e.getMessage.contains("Detected cartesian product"))
+ }
+
+ cartesianQueries.foreach(checkCartesianDetection)
+ }
}
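The new "cross join detection" test relies on the error raised when the optimizer finds an implicit cartesian product, and the cached-table test opts out via SQLConf.CROSS_JOINS_ENABLED. A sketch of the session-level toggle, assuming the conf key string backing that constant is spark.sql.crossJoin.enabled:

    // Implicit cartesian products fail analysis by default:
    // spark.sql("SELECT * FROM testData, testData2")
    //   => "Detected cartesian product for INNER join between logical plans ..."

    // ...unless the escape hatch is switched on for the session.
    spark.conf.set("spark.sql.crossJoin.enabled", "true")
    spark.sql("SELECT * FROM testData, testData2").count()   // allowed again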
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLWindowFunctionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLWindowFunctionSuite.scala
index d3cfa953a3..afd47897ed 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLWindowFunctionSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLWindowFunctionSuite.scala
@@ -361,7 +361,8 @@ class SQLWindowFunctionSuite extends QueryTest with SharedSQLContext {
|with
| v0 as (select 0 as key, 1 as value),
| v1 as (select key, count(value) over (partition by key) cnt_val from v0),
- | v2 as (select v1.key, v1_lag.cnt_val from v1, v1 v1_lag where v1.key = v1_lag.key)
+ | v2 as (select v1.key, v1_lag.cnt_val from v1 cross join v1 v1_lag
+ | where v1.key = v1_lag.key)
| select key, cnt_val from v2 order by key limit 1
""".stripMargin), Row(0, 1))
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/InnerJoinSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/InnerJoinSuite.scala
index 35dab63672..4408ece112 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/InnerJoinSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/InnerJoinSuite.scala
@@ -109,8 +109,8 @@ class InnerJoinSuite extends SparkPlanTest with SharedSQLContext {
leftPlan: SparkPlan,
rightPlan: SparkPlan,
side: BuildSide) = {
- val shuffledHashJoin =
- joins.ShuffledHashJoinExec(leftKeys, rightKeys, Inner, side, None, leftPlan, rightPlan)
+ val shuffledHashJoin = joins.ShuffledHashJoinExec(leftKeys, rightKeys, Inner,
+ side, None, leftPlan, rightPlan)
val filteredJoin =
boundCondition.map(FilterExec(_, shuffledHashJoin)).getOrElse(shuffledHashJoin)
EnsureRequirements(spark.sessionState.conf).apply(filteredJoin)
@@ -122,8 +122,8 @@ class InnerJoinSuite extends SparkPlanTest with SharedSQLContext {
boundCondition: Option[Expression],
leftPlan: SparkPlan,
rightPlan: SparkPlan) = {
- val sortMergeJoin =
- joins.SortMergeJoinExec(leftKeys, rightKeys, Inner, boundCondition, leftPlan, rightPlan)
+ val sortMergeJoin = joins.SortMergeJoinExec(leftKeys, rightKeys, Inner, boundCondition,
+ leftPlan, rightPlan)
EnsureRequirements(spark.sessionState.conf).apply(sortMergeJoin)
}
diff --git a/sql/hive/src/test/resources/sqlgen/join_2_tables.sql b/sql/hive/src/test/resources/sqlgen/join_2_tables.sql
index 9dd200c3c0..0f033a04ae 100644
--- a/sql/hive/src/test/resources/sqlgen/join_2_tables.sql
+++ b/sql/hive/src/test/resources/sqlgen/join_2_tables.sql
@@ -1,7 +1,7 @@
-- This file is automatically generated by LogicalPlanToSQLSuite.
SELECT COUNT(a.value), b.KEY, a.KEY
-FROM parquet_t1 a, parquet_t1 b
+FROM parquet_t1 a CROSS JOIN parquet_t1 b
GROUP BY a.KEY, b.KEY
HAVING MAX(a.KEY) > 0
--------------------------------------------------------------------------------
-SELECT `gen_attr_0` AS `count(value)`, `gen_attr_1` AS `KEY`, `gen_attr_2` AS `KEY` FROM (SELECT `gen_attr_0`, `gen_attr_1`, `gen_attr_2` FROM (SELECT count(`gen_attr_4`) AS `gen_attr_0`, `gen_attr_1`, `gen_attr_2`, max(`gen_attr_2`) AS `gen_attr_3` FROM (SELECT `key` AS `gen_attr_2`, `value` AS `gen_attr_4` FROM `default`.`parquet_t1`) AS gen_subquery_0 INNER JOIN (SELECT `key` AS `gen_attr_1`, `value` AS `gen_attr_5` FROM `default`.`parquet_t1`) AS gen_subquery_1 GROUP BY `gen_attr_2`, `gen_attr_1` HAVING (`gen_attr_3` > CAST(0 AS BIGINT))) AS gen_subquery_2) AS gen_subquery_3
+SELECT `gen_attr_0` AS `count(value)`, `gen_attr_1` AS `KEY`, `gen_attr_2` AS `KEY` FROM (SELECT `gen_attr_0`, `gen_attr_1`, `gen_attr_2` FROM (SELECT count(`gen_attr_4`) AS `gen_attr_0`, `gen_attr_1`, `gen_attr_2`, max(`gen_attr_2`) AS `gen_attr_3` FROM (SELECT `key` AS `gen_attr_2`, `value` AS `gen_attr_4` FROM `default`.`parquet_t1`) AS gen_subquery_0 CROSS JOIN (SELECT `key` AS `gen_attr_1`, `value` AS `gen_attr_5` FROM `default`.`parquet_t1`) AS gen_subquery_1 GROUP BY `gen_attr_2`, `gen_attr_1` HAVING (`gen_attr_3` > CAST(0 AS BIGINT))) AS gen_subquery_2) AS gen_subquery_3
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/catalyst/LogicalPlanToSQLSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/catalyst/LogicalPlanToSQLSuite.scala
index 9c6da6a628..3e0fdc1f8b 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/catalyst/LogicalPlanToSQLSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/catalyst/LogicalPlanToSQLSuite.scala
@@ -642,7 +642,7 @@ class LogicalPlanToSQLSuite extends SQLBuilderTest with SQLTestUtils {
checkColumnNames(
"""SELECT x.a, y.a, x.b, y.b
|FROM (SELECT 1 AS a, 2 AS b) x
- |INNER JOIN (SELECT 1 AS a, 2 AS b) y
+ |CROSS JOIN (SELECT 1 AS a, 2 AS b) y
|ON x.a = y.a
""".stripMargin,
"a", "a", "b", "b"
@@ -810,7 +810,7 @@ class LogicalPlanToSQLSuite extends SQLBuilderTest with SQLTestUtils {
checkSQL(
"""
|SELECT COUNT(a.value), b.KEY, a.KEY
- |FROM parquet_t1 a, parquet_t1 b
+ |FROM parquet_t1 a CROSS JOIN parquet_t1 b
|GROUP BY a.KEY, b.KEY
|HAVING MAX(a.KEY) > 0
""".stripMargin,
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala
index 3c7dbb449c..1d1a958d3f 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala
@@ -318,10 +318,6 @@ class HiveQuerySuite extends HiveComparisonTest with BeforeAndAfter {
createQueryTest("trivial join ON clause",
"SELECT * FROM src a JOIN src b ON a.key = b.key")
- createQueryTest("small.cartesian",
- "SELECT a.key, b.key FROM (SELECT key FROM src WHERE key < 1) a JOIN " +
- "(SELECT key FROM src WHERE key = 2) b")
-
createQueryTest("length.udf",
"SELECT length(\"test\") FROM src LIMIT 1")
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala
index e92bbdea75..2f6d9fb96b 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala
@@ -592,9 +592,9 @@ class ParquetMetastoreSuite extends ParquetPartitioningTest {
test("self-join") {
val table = spark.table("normal_parquet")
- val selfJoin = table.as("t1").join(table.as("t2"))
+ val selfJoin = table.as("t1").crossJoin(table.as("t2"))
checkAnswer(selfJoin,
- sql("SELECT * FROM normal_parquet x JOIN normal_parquet y"))
+ sql("SELECT * FROM normal_parquet x CROSS JOIN normal_parquet y"))
}
}
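The last hunk rewrites both halves of a self-join equivalence check: the DataFrame crossJoin call on one side and the SQL CROSS JOIN keyword on the other. A sketch of that equivalence, assuming `spark` and a registered temp view named t:

    val t = spark.table("t")

    // The DataFrame API and the SQL keyword spell the same explicit
    // cartesian product, so the two results can be checked against each other.
    val viaApi = t.as("t1").crossJoin(t.as("t2"))
    val viaSql = spark.sql("SELECT * FROM t x CROSS JOIN t y")

    assert(viaApi.collect().toSet == viaSql.collect().toSet)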