author     zsxwing <zsxwing@gmail.com>  2015-09-14 15:00:27 -0700
committer  Andrew Or <andrew@databricks.com>  2015-09-14 15:00:27 -0700
commit     217e4964444f4e07b894b1bca768a0cbbe799ea0 (patch)
tree       5de332f6b3504eaea106900dc3b7edeeb5a5d4f4
parent     64f04154e3078ec7340da97e3c2b07cf24e89098 (diff)
[SPARK-9996] [SPARK-9997] [SQL] Add local expand and NestedLoopJoin operators
This PR is in conflict with #8535 and #8573. Will update this one when they are merged.

Author: zsxwing <zsxwing@gmail.com>

Closes #8642 from zsxwing/expand-nest-join.
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/local/ExpandNode.scala              |  60
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/local/LocalNode.scala               |  55
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/local/NestedLoopJoinNode.scala      | 156
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/execution/local/ExpandNodeSuite.scala         |  51
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/execution/local/HashJoinNodeSuite.scala       |  14
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/execution/local/LocalNodeTest.scala           |  14
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/execution/local/NestedLoopJoinNodeSuite.scala | 239
7 files changed, 574 insertions(+), 15 deletions(-)
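Both new operators implement the pull-based LocalNode interface used for local execution: open() prepares the node, next() advances to the next row, fetch() returns the current row, and close() releases resources. A minimal sketch of a driver loop for this protocol (the drain helper below is illustrative, not part of this patch):

    import org.apache.spark.sql.catalyst.InternalRow

    // Hypothetical helper, for illustration only: drains any LocalNode
    // through the open()/next()/fetch()/close() protocol shown in the diff.
    def drain(node: LocalNode): Seq[InternalRow] = {
      val rows = scala.collection.mutable.ArrayBuffer.empty[InternalRow]
      node.open()                      // prepare the operator and its children
      try {
        while (node.next()) {          // returns false once input is exhausted
          rows += node.fetch().copy()  // fetch() is only valid after next() returned true
        }
      } finally {
        node.close()                   // always release resources
      }
      rows
    }

Note that fetch() may return a mutable row that is reused across calls, which is why NestedLoopJoinNode below copies build-side rows before buffering them.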
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/local/ExpandNode.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/local/ExpandNode.scala
new file mode 100644
index 0000000000..2aff156d18
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/local/ExpandNode.scala
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.local
+
+import org.apache.spark.sql.SQLConf
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression, Projection}
+
+case class ExpandNode(
+    conf: SQLConf,
+    projections: Seq[Seq[Expression]],
+    output: Seq[Attribute],
+    child: LocalNode) extends UnaryLocalNode(conf) {
+
+  assert(projections.size > 0)
+
+  private[this] var result: InternalRow = _
+  private[this] var idx: Int = _
+  private[this] var input: InternalRow = _
+  private[this] var groups: Array[Projection] = _
+
+  override def open(): Unit = {
+    child.open()
+    groups = projections.map(ee => newProjection(ee, child.output)).toArray
+    idx = groups.length
+  }
+
+  override def next(): Boolean = {
+    if (idx >= groups.length) {
+      if (child.next()) {
+        input = child.fetch()
+        idx = 0
+      } else {
+        return false
+      }
+    }
+    result = groups(idx)(input)
+    idx += 1
+    true
+  }
+
+  override def fetch(): InternalRow = result
+
+  override def close(): Unit = child.close()
+}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/local/LocalNode.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/local/LocalNode.scala
index e540ef8555..9840080e16 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/local/LocalNode.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/local/LocalNode.scala
@@ -23,7 +23,7 @@ import org.apache.spark.Logging
import org.apache.spark.sql.{SQLConf, Row}
import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow}
import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.catalyst.expressions.codegen.GenerateMutableProjection
+import org.apache.spark.sql.catalyst.expressions.codegen._
import org.apache.spark.sql.catalyst.trees.TreeNode
import org.apache.spark.sql.types.StructType
@@ -69,6 +69,18 @@ abstract class LocalNode(conf: SQLConf) extends TreeNode[LocalNode] with Logging
   */
  def close(): Unit

+  /** Specifies whether this operator outputs UnsafeRows */
+  def outputsUnsafeRows: Boolean = false
+
+  /** Specifies whether this operator is capable of processing UnsafeRows */
+  def canProcessUnsafeRows: Boolean = false
+
+  /**
+   * Specifies whether this operator is capable of processing Java-object-based Rows (i.e. rows
+   * that are not UnsafeRows).
+   */
+  def canProcessSafeRows: Boolean = true
+
  /**
   * Returns the content through the [[Iterator]] interface.
   */
@@ -91,6 +103,28 @@ abstract class LocalNode(conf: SQLConf) extends TreeNode[LocalNode] with Logging
    result
  }

+  protected def newProjection(
+      expressions: Seq[Expression],
+      inputSchema: Seq[Attribute]): Projection = {
+    log.debug(
+      s"Creating Projection: $expressions, inputSchema: $inputSchema, codegen:$codegenEnabled")
+    if (codegenEnabled) {
+      try {
+        GenerateProjection.generate(expressions, inputSchema)
+      } catch {
+        case NonFatal(e) =>
+          if (isTesting) {
+            throw e
+          } else {
+            log.error("Failed to generate projection, fallback to interpret", e)
+            new InterpretedProjection(expressions, inputSchema)
+          }
+      }
+    } else {
+      new InterpretedProjection(expressions, inputSchema)
+    }
+  }
+
  protected def newMutableProjection(
      expressions: Seq[Expression],
      inputSchema: Seq[Attribute]): () => MutableProjection = {
@@ -113,6 +147,25 @@ abstract class LocalNode(conf: SQLConf) extends TreeNode[LocalNode] with Logging
    }
  }

+  protected def newPredicate(
+      expression: Expression,
+      inputSchema: Seq[Attribute]): (InternalRow) => Boolean = {
+    if (codegenEnabled) {
+      try {
+        GeneratePredicate.generate(expression, inputSchema)
+      } catch {
+        case NonFatal(e) =>
+          if (isTesting) {
+            throw e
+          } else {
+            log.error("Failed to generate predicate, fallback to interpreted", e)
+            InterpretedPredicate.create(expression, inputSchema)
+          }
+      }
+    } else {
+      InterpretedPredicate.create(expression, inputSchema)
+    }
+  }
}
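newProjection and newPredicate follow the same codegen-with-fallback pattern: compile generated code when codegen is enabled, rethrow failures under test so codegen bugs surface, and otherwise degrade to the interpreted implementation. A generic distillation of that pattern (the withCodegenFallback helper is illustrative, not part of this patch; codegenEnabled and isTesting mirror the fields used in LocalNode):

    import scala.util.control.NonFatal

    // Hypothetical sketch of the shared fallback logic above.
    def withCodegenFallback[T](codegenEnabled: Boolean, isTesting: Boolean)
        (generate: => T)(interpret: => T): T = {
      if (!codegenEnabled) {
        interpret                   // codegen disabled: interpreted path only
      } else {
        try generate catch {
          case NonFatal(e) =>
            if (isTesting) throw e  // fail loudly in tests
            else interpret          // in production, degrade gracefully
        }
      }
    }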
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/local/NestedLoopJoinNode.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/local/NestedLoopJoinNode.scala
new file mode 100644
index 0000000000..7321fc66b4
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/local/NestedLoopJoinNode.scala
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.local
+
+import org.apache.spark.sql.SQLConf
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.plans.{FullOuter, RightOuter, LeftOuter, JoinType}
+import org.apache.spark.sql.execution.joins.{BuildLeft, BuildRight, BuildSide}
+import org.apache.spark.util.collection.{BitSet, CompactBuffer}
+
+case class NestedLoopJoinNode(
+    conf: SQLConf,
+    left: LocalNode,
+    right: LocalNode,
+    buildSide: BuildSide,
+    joinType: JoinType,
+    condition: Option[Expression]) extends BinaryLocalNode(conf) {
+
+  override def output: Seq[Attribute] = {
+    joinType match {
+      case LeftOuter =>
+        left.output ++ right.output.map(_.withNullability(true))
+      case RightOuter =>
+        left.output.map(_.withNullability(true)) ++ right.output
+      case FullOuter =>
+        left.output.map(_.withNullability(true)) ++ right.output.map(_.withNullability(true))
+      case x =>
+        throw new IllegalArgumentException(
+          s"NestedLoopJoin should not take $x as the JoinType")
+    }
+  }
+
+  private[this] def genResultProjection: InternalRow => InternalRow = {
+    if (outputsUnsafeRows) {
+      UnsafeProjection.create(schema)
+    } else {
+      identity[InternalRow]
+    }
+  }
+
+  private[this] var currentRow: InternalRow = _
+
+  private[this] var iterator: Iterator[InternalRow] = _
+
+  override def open(): Unit = {
+    val (streamed, build) = buildSide match {
+      case BuildRight => (left, right)
+      case BuildLeft => (right, left)
+    }
+    build.open()
+    val buildRelation = new CompactBuffer[InternalRow]
+    while (build.next()) {
+      buildRelation += build.fetch().copy()
+    }
+    build.close()
+
+    val boundCondition =
+      newPredicate(condition.getOrElse(Literal(true)), left.output ++ right.output)
+
+    val leftNulls = new GenericMutableRow(left.output.size)
+    val rightNulls = new GenericMutableRow(right.output.size)
+    val joinedRow = new JoinedRow
+    val matchedBuildTuples = new BitSet(buildRelation.size)
+    val resultProj = genResultProjection
+    streamed.open()
+
+    // streamedRowMatches also contains null rows if using outer join
+    val streamedRowMatches: Iterator[InternalRow] = streamed.asIterator.flatMap { streamedRow =>
+      val matchedRows = new CompactBuffer[InternalRow]
+
+      var i = 0
+      var streamRowMatched = false
+
+      // Scan the build relation to look for matches for each streamed row
+      while (i < buildRelation.size) {
+        val buildRow = buildRelation(i)
+        buildSide match {
+          case BuildRight => joinedRow(streamedRow, buildRow)
+          case BuildLeft => joinedRow(buildRow, streamedRow)
+        }
+        if (boundCondition(joinedRow)) {
+          matchedRows += resultProj(joinedRow).copy()
+          streamRowMatched = true
+          matchedBuildTuples.set(i)
+        }
+        i += 1
+      }
+
+      // If this row had no matches and we're using outer join, join it with the null rows
+      if (!streamRowMatched) {
+        (joinType, buildSide) match {
+          case (LeftOuter | FullOuter, BuildRight) =>
+            matchedRows += resultProj(joinedRow(streamedRow, rightNulls)).copy()
+          case (RightOuter | FullOuter, BuildLeft) =>
+            matchedRows += resultProj(joinedRow(leftNulls, streamedRow)).copy()
+          case _ =>
+        }
+      }
+
+      matchedRows.iterator
+    }
+
+    // If we're using outer join, find rows on the build side that didn't match anything
+    // and join them with the null row
+    lazy val unmatchedBuildRows: Iterator[InternalRow] = {
+      var i = 0
+      buildRelation.filter { row =>
+        val r = !matchedBuildTuples.get(i)
+        i += 1
+        r
+      }.iterator
+    }
+    iterator = (joinType, buildSide) match {
+      case (RightOuter | FullOuter, BuildRight) =>
+        streamedRowMatches ++
+          unmatchedBuildRows.map { buildRow => resultProj(joinedRow(leftNulls, buildRow)) }
+      case (LeftOuter | FullOuter, BuildLeft) =>
+        streamedRowMatches ++
+          unmatchedBuildRows.map { buildRow => resultProj(joinedRow(buildRow, rightNulls)) }
+      case _ => streamedRowMatches
+    }
+  }
+
+  override def next(): Boolean = {
+    if (iterator.hasNext) {
+      currentRow = iterator.next()
+      true
+    } else {
+      false
+    }
+  }
+
+  override def fetch(): InternalRow = currentRow
+
+  override def close(): Unit = {
+    left.close()
+    right.close()
+  }
+
+}
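open() does all of the heavy lifting here: the build side is fully materialized into a CompactBuffer, every streamed row is compared against every buffered row, and the BitSet records which build rows matched so the outer-join cases can null-pad the leftovers afterwards. The cost is O(|streamed| * |build|) with the build side held in memory, so the smaller input should be chosen as the build side. A minimal usage sketch (assuming conf, two LocalNode inputs leftNode and rightNode, and a join condition cond are in scope):

    // Illustrative only: full outer join with the right input buffered.
    val join = NestedLoopJoinNode(
      conf,
      leftNode,           // streamed side (BuildRight streams the left input)
      rightNode,          // build side, copied into a CompactBuffer on open()
      BuildRight,
      FullOuter,          // unmatched rows on either side are null-padded
      Some(cond))
    join.open()           // buffers the build side and wires up the result iterator
    while (join.next()) {
      val row = join.fetch()  // valid until the next call to next()
    }
    join.close()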
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/local/ExpandNodeSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/local/ExpandNodeSuite.scala
new file mode 100644
index 0000000000..cfa7f3f6dc
--- /dev/null
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/local/ExpandNodeSuite.scala
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.local
+
+class ExpandNodeSuite extends LocalNodeTest {
+
+  import testImplicits._
+
+  test("expand") {
+    val input = Seq((1, 1), (2, 2), (3, 3), (4, 4), (5, 5)).toDF("key", "value")
+    checkAnswer(
+      input,
+      node =>
+        ExpandNode(conf, Seq(
+          Seq(
+            input.col("key") + input.col("value"), input.col("key") - input.col("value")
+          ).map(_.expr),
+          Seq(
+            input.col("key") * input.col("value"), input.col("key") / input.col("value")
+          ).map(_.expr)
+        ), node.output, node),
+      Seq(
+        (2, 0),
+        (1, 1),
+        (4, 0),
+        (4, 1),
+        (6, 0),
+        (9, 1),
+        (8, 0),
+        (16, 1),
+        (10, 0),
+        (25, 1)
+      ).toDF().collect()
+    )
+  }
+}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/local/HashJoinNodeSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/local/HashJoinNodeSuite.scala
index 43b6f06aea..78d891351f 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/local/HashJoinNodeSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/local/HashJoinNodeSuite.scala
@@ -24,20 +24,6 @@ class HashJoinNodeSuite extends LocalNodeTest {
  import testImplicits._

-  private def wrapForUnsafe(
-      f: (LocalNode, LocalNode) => LocalNode): (LocalNode, LocalNode) => LocalNode = {
-    if (conf.unsafeEnabled) {
-      (left: LocalNode, right: LocalNode) => {
-        val _left = ConvertToUnsafeNode(conf, left)
-        val _right = ConvertToUnsafeNode(conf, right)
-        val r = f(_left, _right)
-        ConvertToSafeNode(conf, r)
-      }
-    } else {
-      f
-    }
-  }
-
  def joinSuite(suiteName: String, confPairs: (String, String)*): Unit = {
    test(s"$suiteName: inner join with one match per row") {
      withSQLConf(confPairs: _*) {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/local/LocalNodeTest.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/local/LocalNodeTest.scala
index b95d4ea7f8..86dd28064c 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/local/LocalNodeTest.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/local/LocalNodeTest.scala
@@ -27,6 +27,20 @@ class LocalNodeTest extends SparkFunSuite with SharedSQLContext {
  def conf: SQLConf = sqlContext.conf

+  protected def wrapForUnsafe(
+      f: (LocalNode, LocalNode) => LocalNode): (LocalNode, LocalNode) => LocalNode = {
+    if (conf.unsafeEnabled) {
+      (left: LocalNode, right: LocalNode) => {
+        val _left = ConvertToUnsafeNode(conf, left)
+        val _right = ConvertToUnsafeNode(conf, right)
+        val r = f(_left, _right)
+        ConvertToSafeNode(conf, r)
+      }
+    } else {
+      f
+    }
+  }
+
  /**
   * Runs the LocalNode and makes sure the answer matches the expected result.
   * @param input the input data to be used.
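wrapForUnsafe moves unchanged from HashJoinNodeSuite into the shared LocalNodeTest base class so that NestedLoopJoinNodeSuite (below) can reuse it: when spark.sql.unsafe.enabled is on, it sandwiches the node under test between ConvertToUnsafeNode inputs and a ConvertToSafeNode output. A hedged usage sketch (conf and the join parameters are assumed in scope):

    // Illustrative only: the wrapped factory feeds UnsafeRows to the join
    // and converts its output back to safe rows for result comparison.
    val makeJoin: (LocalNode, LocalNode) => LocalNode =
      wrapForUnsafe { (left, right) =>
        NestedLoopJoinNode(conf, left, right, BuildLeft, LeftOuter, None)
      }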
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/local/NestedLoopJoinNodeSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/local/NestedLoopJoinNodeSuite.scala
new file mode 100644
index 0000000000..b1ef26ba82
--- /dev/null
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/local/NestedLoopJoinNodeSuite.scala
@@ -0,0 +1,239 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.local
+
+import org.apache.spark.sql.SQLConf
+import org.apache.spark.sql.catalyst.plans.{FullOuter, LeftOuter, RightOuter}
+import org.apache.spark.sql.execution.joins.{BuildLeft, BuildRight, BuildSide}
+
+class NestedLoopJoinNodeSuite extends LocalNodeTest {
+
+  import testImplicits._
+
+  private def joinSuite(
+      suiteName: String, buildSide: BuildSide, confPairs: (String, String)*): Unit = {
+    test(s"$suiteName: left outer join") {
+      withSQLConf(confPairs: _*) {
+        checkAnswer2(
+          upperCaseData,
+          lowerCaseData,
+          wrapForUnsafe(
+            (node1, node2) => NestedLoopJoinNode(
+              conf,
+              node1,
+              node2,
+              buildSide,
+              LeftOuter,
+              Some((upperCaseData.col("N") === lowerCaseData.col("n")).expr))
+          ),
+          upperCaseData.join(lowerCaseData, $"n" === $"N", "left").collect())
+
+        checkAnswer2(
+          upperCaseData,
+          lowerCaseData,
+          wrapForUnsafe(
+            (node1, node2) => NestedLoopJoinNode(
+              conf,
+              node1,
+              node2,
+              buildSide,
+              LeftOuter,
+              Some(
+                (upperCaseData.col("N") === lowerCaseData.col("n") &&
+                  lowerCaseData.col("n") > 1).expr))
+          ),
+          upperCaseData.join(lowerCaseData, $"n" === $"N" && $"n" > 1, "left").collect())
+
+        checkAnswer2(
+          upperCaseData,
+          lowerCaseData,
+          wrapForUnsafe(
+            (node1, node2) => NestedLoopJoinNode(
+              conf,
+              node1,
+              node2,
+              buildSide,
+              LeftOuter,
+              Some(
+                (upperCaseData.col("N") === lowerCaseData.col("n") &&
+                  upperCaseData.col("N") > 1).expr))
+          ),
+          upperCaseData.join(lowerCaseData, $"n" === $"N" && $"N" > 1, "left").collect())
+
+        checkAnswer2(
+          upperCaseData,
+          lowerCaseData,
+          wrapForUnsafe(
+            (node1, node2) => NestedLoopJoinNode(
+              conf,
+              node1,
+              node2,
+              buildSide,
+              LeftOuter,
+              Some(
+                (upperCaseData.col("N") === lowerCaseData.col("n") &&
+                  lowerCaseData.col("l") > upperCaseData.col("L")).expr))
+          ),
+          upperCaseData.join(lowerCaseData, $"n" === $"N" && $"l" > $"L", "left").collect())
+      }
+    }
+
+    test(s"$suiteName: right outer join") {
+      withSQLConf(confPairs: _*) {
+        checkAnswer2(
+          lowerCaseData,
+          upperCaseData,
+          wrapForUnsafe(
+            (node1, node2) => NestedLoopJoinNode(
+              conf,
+              node1,
+              node2,
+              buildSide,
+              RightOuter,
+              Some((lowerCaseData.col("n") === upperCaseData.col("N")).expr))
+          ),
+          lowerCaseData.join(upperCaseData, $"n" === $"N", "right").collect())
+
+        checkAnswer2(
+          lowerCaseData,
+          upperCaseData,
+          wrapForUnsafe(
+            (node1, node2) => NestedLoopJoinNode(
+              conf,
+              node1,
+              node2,
+              buildSide,
+              RightOuter,
+              Some((lowerCaseData.col("n") === upperCaseData.col("N") &&
+                lowerCaseData.col("n") > 1).expr))
+          ),
+          lowerCaseData.join(upperCaseData, $"n" === $"N" && $"n" > 1, "right").collect())
+
+        checkAnswer2(
+          lowerCaseData,
+          upperCaseData,
+          wrapForUnsafe(
+            (node1, node2) => NestedLoopJoinNode(
+              conf,
+              node1,
+              node2,
+              buildSide,
+              RightOuter,
+              Some((lowerCaseData.col("n") === upperCaseData.col("N") &&
+                upperCaseData.col("N") > 1).expr))
+          ),
+          lowerCaseData.join(upperCaseData, $"n" === $"N" && $"N" > 1, "right").collect())
+
+        checkAnswer2(
+          lowerCaseData,
+          upperCaseData,
+          wrapForUnsafe(
+            (node1, node2) => NestedLoopJoinNode(
+              conf,
+              node1,
+              node2,
+              buildSide,
+              RightOuter,
+              Some((lowerCaseData.col("n") === upperCaseData.col("N") &&
+                lowerCaseData.col("l") > upperCaseData.col("L")).expr))
+          ),
+          lowerCaseData.join(upperCaseData, $"n" === $"N" && $"l" > $"L", "right").collect())
+      }
+    }
+
+    test(s"$suiteName: full outer join") {
+      withSQLConf(confPairs: _*) {
+        checkAnswer2(
+          lowerCaseData,
+          upperCaseData,
+          wrapForUnsafe(
+            (node1, node2) => NestedLoopJoinNode(
+              conf,
+              node1,
+              node2,
+              buildSide,
+              FullOuter,
+              Some((lowerCaseData.col("n") === upperCaseData.col("N")).expr))
+          ),
+          lowerCaseData.join(upperCaseData, $"n" === $"N", "full").collect())
+
+        checkAnswer2(
+          lowerCaseData,
+          upperCaseData,
+          wrapForUnsafe(
+            (node1, node2) => NestedLoopJoinNode(
+              conf,
+              node1,
+              node2,
+              buildSide,
+              FullOuter,
+              Some((lowerCaseData.col("n") === upperCaseData.col("N") &&
+                lowerCaseData.col("n") > 1).expr))
+          ),
+          lowerCaseData.join(upperCaseData, $"n" === $"N" && $"n" > 1, "full").collect())
+
+        checkAnswer2(
+          lowerCaseData,
+          upperCaseData,
+          wrapForUnsafe(
+            (node1, node2) => NestedLoopJoinNode(
+              conf,
+              node1,
+              node2,
+              buildSide,
+              FullOuter,
+              Some((lowerCaseData.col("n") === upperCaseData.col("N") &&
+                upperCaseData.col("N") > 1).expr))
+          ),
+          lowerCaseData.join(upperCaseData, $"n" === $"N" && $"N" > 1, "full").collect())
+
+        checkAnswer2(
+          lowerCaseData,
+          upperCaseData,
+          wrapForUnsafe(
+            (node1, node2) => NestedLoopJoinNode(
+              conf,
+              node1,
+              node2,
+              buildSide,
+              FullOuter,
+              Some((lowerCaseData.col("n") === upperCaseData.col("N") &&
+                lowerCaseData.col("l") > upperCaseData.col("L")).expr))
+          ),
+          lowerCaseData.join(upperCaseData, $"n" === $"N" && $"l" > $"L", "full").collect())
+      }
+    }
+  }
+
+  joinSuite(
+    "general-build-left",
+    BuildLeft,
+    SQLConf.CODEGEN_ENABLED.key -> "false", SQLConf.UNSAFE_ENABLED.key -> "false")
+  joinSuite(
+    "general-build-right",
+    BuildRight,
+    SQLConf.CODEGEN_ENABLED.key -> "false", SQLConf.UNSAFE_ENABLED.key -> "false")
+  joinSuite(
+    "tungsten-build-left",
+    BuildLeft,
+    SQLConf.CODEGEN_ENABLED.key -> "true", SQLConf.UNSAFE_ENABLED.key -> "true")
+  joinSuite(
+    "tungsten-build-right",
+    BuildRight,
+    SQLConf.CODEGEN_ENABLED.key -> "true", SQLConf.UNSAFE_ENABLED.key -> "true")
+}