Diffstat (limited to 'sql/core')
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala  5
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/local/BinaryHashJoinNode.scala  71
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/local/BroadcastHashJoinNode.scala  58
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/local/ConvertToSafeNode.scala  40
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/local/ConvertToUnsafeNode.scala  40
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/local/ExpandNode.scala  60
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/local/FilterNode.scala  49
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/local/HashJoinNode.scala  111
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/local/IntersectNode.scala  63
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/local/LimitNode.scala  46
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/local/LocalNode.scala  157
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/local/NestedLoopJoinNode.scala  152
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/local/ProjectNode.scala  44
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/local/SampleNode.scala  78
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/local/SeqScanNode.scala  51
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/local/TakeOrderedAndProjectNode.scala  74
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/local/UnionNode.scala  73
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/execution/local/DummyNode.scala  68
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/execution/local/ExpandNodeSuite.scala  49
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/execution/local/FilterNodeSuite.scala  45
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/execution/local/HashJoinNodeSuite.scala  141
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/execution/local/IntersectNodeSuite.scala  37
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/execution/local/LimitNodeSuite.scala  41
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/execution/local/LocalNodeSuite.scala  73
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/execution/local/LocalNodeTest.scala  87
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/execution/local/NestedLoopJoinNodeSuite.scala  142
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/execution/local/ProjectNodeSuite.scala  49
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/execution/local/SampleNodeSuite.scala  51
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/execution/local/TakeOrderedAndProjectNodeSuite.scala  50
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/execution/local/UnionNodeSuite.scala  55
30 files changed, 0 insertions, 2060 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala
index 99f8841c87..6235897ed1 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala
@@ -27,7 +27,6 @@ import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans.physical.BroadcastMode
import org.apache.spark.sql.execution.SparkSqlSerializer
-import org.apache.spark.sql.execution.local.LocalNode
import org.apache.spark.unsafe.Platform
import org.apache.spark.unsafe.map.BytesToBytesMap
import org.apache.spark.util.{KnownSizeEstimation, SizeEstimator, Utils}
@@ -157,10 +156,6 @@ private[joins] class UniqueKeyHashedRelation(
private[execution] object HashedRelation {
- def apply(localNode: LocalNode, keyGenerator: Projection): HashedRelation = {
- apply(localNode.asIterator, keyGenerator)
- }
-
def apply(
input: Iterator[InternalRow],
keyGenerator: Projection,
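
A note on the call-site migration implied by this hunk: the removed overload only delegated to the surviving Iterator-based `apply` through `asIterator`, so a hypothetical caller still holding a LocalNode would migrate as sketched below (comment-only sketch; the names are taken from the removed code, not from live call sites):

    // Before this commit (overload removed above):
    //   val relation = HashedRelation(buildNode, keyGenerator)
    // After, using the surviving Iterator-based apply, which is exactly
    // what the deleted overload delegated to:
    //   val relation = HashedRelation(buildNode.asIterator, keyGenerator)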
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/local/BinaryHashJoinNode.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/local/BinaryHashJoinNode.scala
deleted file mode 100644
index 97f9358016..0000000000
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/local/BinaryHashJoinNode.scala
+++ /dev/null
@@ -1,71 +0,0 @@
-/*
-* Licensed to the Apache Software Foundation (ASF) under one or more
-* contributor license agreements. See the NOTICE file distributed with
-* this work for additional information regarding copyright ownership.
-* The ASF licenses this file to You under the Apache License, Version 2.0
-* (the "License"); you may not use this file except in compliance with
-* the License. You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-
-package org.apache.spark.sql.execution.local
-
-import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.execution.joins.{BuildLeft, BuildRight, BuildSide, HashedRelation}
-import org.apache.spark.sql.internal.SQLConf
-
-/**
- * A [[HashJoinNode]] that builds the [[HashedRelation]] according to the value of
- * `buildSide`. The actual work of this node is defined in [[HashJoinNode]].
- */
-case class BinaryHashJoinNode(
- conf: SQLConf,
- leftKeys: Seq[Expression],
- rightKeys: Seq[Expression],
- buildSide: BuildSide,
- left: LocalNode,
- right: LocalNode)
- extends BinaryLocalNode(conf) with HashJoinNode {
-
- protected override val (streamedNode, streamedKeys) = buildSide match {
- case BuildLeft => (right, rightKeys)
- case BuildRight => (left, leftKeys)
- }
-
- private val (buildNode, buildKeys) = buildSide match {
- case BuildLeft => (left, leftKeys)
- case BuildRight => (right, rightKeys)
- }
-
- override def output: Seq[Attribute] = left.output ++ right.output
-
- private def buildSideKeyGenerator: Projection = {
- // We expect the data types of buildKeys and streamedKeys to be the same.
- assert(buildKeys.map(_.dataType) == streamedKeys.map(_.dataType))
- UnsafeProjection.create(buildKeys, buildNode.output)
- }
-
- protected override def doOpen(): Unit = {
- buildNode.open()
- val hashedRelation = HashedRelation(buildNode, buildSideKeyGenerator)
- // The HashedRelation has been built, so close buildNode.
- buildNode.close()
-
- streamedNode.open()
- // Set the HashedRelation used by the HashJoinNode.
- withHashedRelation(hashedRelation)
- }
-
- override def close(): Unit = {
- // Note that we do not need to close buildNode here because it was
- // already closed in doOpen().
- streamedNode.close()
- }
-}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/local/BroadcastHashJoinNode.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/local/BroadcastHashJoinNode.scala
deleted file mode 100644
index 779f4833fa..0000000000
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/local/BroadcastHashJoinNode.scala
+++ /dev/null
@@ -1,58 +0,0 @@
-/*
-* Licensed to the Apache Software Foundation (ASF) under one or more
-* contributor license agreements. See the NOTICE file distributed with
-* this work for additional information regarding copyright ownership.
-* The ASF licenses this file to You under the Apache License, Version 2.0
-* (the "License"); you may not use this file except in compliance with
-* the License. You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-
-package org.apache.spark.sql.execution.local
-
-import org.apache.spark.broadcast.Broadcast
-import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.execution.joins.{BuildLeft, BuildRight, BuildSide, HashedRelation}
-import org.apache.spark.sql.internal.SQLConf
-
-/**
- * A [[HashJoinNode]] for broadcast join. It takes a streamedNode and a broadcast
- * [[HashedRelation]]. The actual work of this node is defined in [[HashJoinNode]].
- */
-case class BroadcastHashJoinNode(
- conf: SQLConf,
- streamedKeys: Seq[Expression],
- streamedNode: LocalNode,
- buildSide: BuildSide,
- buildOutput: Seq[Attribute],
- hashedRelation: Broadcast[HashedRelation])
- extends UnaryLocalNode(conf) with HashJoinNode {
-
- override val child = streamedNode
-
- // Because the buildNode itself is not passed in, we use buildOutput to
- // construct the inputSet properly.
- override def inputSet: AttributeSet = AttributeSet(child.output ++ buildOutput)
-
- override def output: Seq[Attribute] = buildSide match {
- case BuildRight => streamedNode.output ++ buildOutput
- case BuildLeft => buildOutput ++ streamedNode.output
- }
-
- protected override def doOpen(): Unit = {
- streamedNode.open()
- // Set the HashedRelation used by the HashJoinNode.
- withHashedRelation(hashedRelation.value)
- }
-
- override def close(): Unit = {
- streamedNode.close()
- }
-}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/local/ConvertToSafeNode.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/local/ConvertToSafeNode.scala
deleted file mode 100644
index f79d795a90..0000000000
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/local/ConvertToSafeNode.scala
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
-* Licensed to the Apache Software Foundation (ASF) under one or more
-* contributor license agreements. See the NOTICE file distributed with
-* this work for additional information regarding copyright ownership.
-* The ASF licenses this file to You under the Apache License, Version 2.0
-* (the "License"); you may not use this file except in compliance with
-* the License. You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-
-package org.apache.spark.sql.execution.local
-
-import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions.{Attribute, FromUnsafeProjection, Projection}
-import org.apache.spark.sql.internal.SQLConf
-
-case class ConvertToSafeNode(conf: SQLConf, child: LocalNode) extends UnaryLocalNode(conf) {
-
- override def output: Seq[Attribute] = child.output
-
- private[this] var convertToSafe: Projection = _
-
- override def open(): Unit = {
- child.open()
- convertToSafe = FromUnsafeProjection(child.schema)
- }
-
- override def next(): Boolean = child.next()
-
- override def fetch(): InternalRow = convertToSafe(child.fetch())
-
- override def close(): Unit = child.close()
-}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/local/ConvertToUnsafeNode.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/local/ConvertToUnsafeNode.scala
deleted file mode 100644
index f3fa474b0f..0000000000
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/local/ConvertToUnsafeNode.scala
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
-* Licensed to the Apache Software Foundation (ASF) under one or more
-* contributor license agreements. See the NOTICE file distributed with
-* this work for additional information regarding copyright ownership.
-* The ASF licenses this file to You under the Apache License, Version 2.0
-* (the "License"); you may not use this file except in compliance with
-* the License. You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-
-package org.apache.spark.sql.execution.local
-
-import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions.{Attribute, Projection, UnsafeProjection}
-import org.apache.spark.sql.internal.SQLConf
-
-case class ConvertToUnsafeNode(conf: SQLConf, child: LocalNode) extends UnaryLocalNode(conf) {
-
- override def output: Seq[Attribute] = child.output
-
- private[this] var convertToUnsafe: Projection = _
-
- override def open(): Unit = {
- child.open()
- convertToUnsafe = UnsafeProjection.create(child.schema)
- }
-
- override def next(): Boolean = child.next()
-
- override def fetch(): InternalRow = convertToUnsafe(child.fetch())
-
- override def close(): Unit = child.close()
-}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/local/ExpandNode.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/local/ExpandNode.scala
deleted file mode 100644
index 6ccd6db0e6..0000000000
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/local/ExpandNode.scala
+++ /dev/null
@@ -1,60 +0,0 @@
-/*
-* Licensed to the Apache Software Foundation (ASF) under one or more
-* contributor license agreements. See the NOTICE file distributed with
-* this work for additional information regarding copyright ownership.
-* The ASF licenses this file to You under the Apache License, Version 2.0
-* (the "License"); you may not use this file except in compliance with
-* the License. You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-
-package org.apache.spark.sql.execution.local
-
-import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.internal.SQLConf
-
-case class ExpandNode(
- conf: SQLConf,
- projections: Seq[Seq[Expression]],
- output: Seq[Attribute],
- child: LocalNode) extends UnaryLocalNode(conf) {
-
- assert(projections.size > 0)
-
- private[this] var result: InternalRow = _
- private[this] var idx: Int = _
- private[this] var input: InternalRow = _
- private[this] var groups: Array[Projection] = _
-
- override def open(): Unit = {
- child.open()
- groups = projections.map(ee => newMutableProjection(ee, child.output)()).toArray
- idx = groups.length
- }
-
- override def next(): Boolean = {
- if (idx >= groups.length) {
- if (child.next()) {
- input = child.fetch()
- idx = 0
- } else {
- return false
- }
- }
- result = groups(idx)(input)
- idx += 1
- true
- }
-
- override def fetch(): InternalRow = result
-
- override def close(): Unit = child.close()
-}
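
ExpandNode emits every input row once per projection, cycling `idx` through the compiled projections before pulling the next child row. Functionally this is a flatMap, as in the Spark-free sketch below (names and types are assumed; the real node also reuses mutable row buffers, which the sketch ignores):

    object ExpandSketch {
      type Row = Seq[Int]
      type Projection = Row => Row

      // Every input row is emitted once per projection, mirroring next()/fetch().
      def expand(input: Iterator[Row], projections: Seq[Projection]): Iterator[Row] =
        input.flatMap(row => projections.iterator.map(p => p(row)))

      def main(args: Array[String]): Unit = {
        val projections: Seq[Projection] =
          Seq(r => Seq(r(0) + r(1)), r => Seq(r(0) - r(1)))
        // Two input rows x two projections = four output rows.
        expand(Iterator(Seq(1, 2), Seq(3, 4)), projections).foreach(println)
      }
    }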
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/local/FilterNode.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/local/FilterNode.scala
deleted file mode 100644
index c5eb33cef4..0000000000
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/local/FilterNode.scala
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
-* Licensed to the Apache Software Foundation (ASF) under one or more
-* contributor license agreements. See the NOTICE file distributed with
-* this work for additional information regarding copyright ownership.
-* The ASF licenses this file to You under the Apache License, Version 2.0
-* (the "License"); you may not use this file except in compliance with
-* the License. You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-
-package org.apache.spark.sql.execution.local
-
-import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression}
-import org.apache.spark.sql.catalyst.expressions.codegen.GeneratePredicate
-import org.apache.spark.sql.internal.SQLConf
-
-
-case class FilterNode(conf: SQLConf, condition: Expression, child: LocalNode)
- extends UnaryLocalNode(conf) {
-
- private[this] var predicate: (InternalRow) => Boolean = _
-
- override def output: Seq[Attribute] = child.output
-
- override def open(): Unit = {
- child.open()
- predicate = GeneratePredicate.generate(condition, child.output)
- }
-
- override def next(): Boolean = {
- var found = false
- while (!found && child.next()) {
- found = predicate.apply(child.fetch())
- }
- found
- }
-
- override def fetch(): InternalRow = child.fetch()
-
- override def close(): Unit = child.close()
-}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/local/HashJoinNode.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/local/HashJoinNode.scala
deleted file mode 100644
index fd7948ffa9..0000000000
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/local/HashJoinNode.scala
+++ /dev/null
@@ -1,111 +0,0 @@
-/*
-* Licensed to the Apache Software Foundation (ASF) under one or more
-* contributor license agreements. See the NOTICE file distributed with
-* this work for additional information regarding copyright ownership.
-* The ASF licenses this file to You under the Apache License, Version 2.0
-* (the "License"); you may not use this file except in compliance with
-* the License. You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-
-package org.apache.spark.sql.execution.local
-
-import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.execution.joins._
-
-/**
- * An abstract node for sharing common functionality among different implementations of
- * inner hash equi-join, notably [[BinaryHashJoinNode]] and [[BroadcastHashJoinNode]].
- *
- * Much of this code is similar to [[org.apache.spark.sql.execution.joins.HashJoin]].
- */
-trait HashJoinNode {
-
- self: LocalNode =>
-
- protected def streamedKeys: Seq[Expression]
- protected def streamedNode: LocalNode
- protected def buildSide: BuildSide
-
- private[this] var currentStreamedRow: InternalRow = _
- private[this] var currentHashMatches: Seq[InternalRow] = _
- private[this] var currentMatchPosition: Int = -1
-
- private[this] var joinRow: JoinedRow = _
- private[this] var resultProjection: (InternalRow) => InternalRow = _
-
- private[this] var hashed: HashedRelation = _
- private[this] var joinKeys: Projection = _
-
- private def streamSideKeyGenerator: Projection =
- UnsafeProjection.create(streamedKeys, streamedNode.output)
-
- /**
- * Sets the HashedRelation used by this node. This method needs to be called
- * before the first `next` gets called.
- */
- protected def withHashedRelation(hashedRelation: HashedRelation): Unit = {
- hashed = hashedRelation
- }
-
- /**
- * Custom open implementation to be overridden by subclasses.
- */
- protected def doOpen(): Unit
-
- override def open(): Unit = {
- doOpen()
- joinRow = new JoinedRow
- resultProjection = UnsafeProjection.create(schema)
- joinKeys = streamSideKeyGenerator
- }
-
- override def next(): Boolean = {
- currentMatchPosition += 1
- if (currentHashMatches == null || currentMatchPosition >= currentHashMatches.size) {
- fetchNextMatch()
- } else {
- true
- }
- }
-
- /**
- * Populate `currentHashMatches` with build-side rows matching the next streamed row.
- * @return whether matches are found such that subsequent calls to `fetch` are valid.
- */
- private def fetchNextMatch(): Boolean = {
- currentHashMatches = null
- currentMatchPosition = -1
-
- while (currentHashMatches == null && streamedNode.next()) {
- currentStreamedRow = streamedNode.fetch()
- val key = joinKeys(currentStreamedRow)
- if (!key.anyNull) {
- currentHashMatches = hashed.get(key)
- }
- }
-
- if (currentHashMatches == null) {
- false
- } else {
- currentMatchPosition = 0
- true
- }
- }
-
- override def fetch(): InternalRow = {
- val ret = buildSide match {
- case BuildRight => joinRow(currentStreamedRow, currentHashMatches(currentMatchPosition))
- case BuildLeft => joinRow(currentHashMatches(currentMatchPosition), currentStreamedRow)
- }
- resultProjection(ret)
- }
-}
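
The stateful next()/fetchNextMatch() pair above is an unrolled flatMap over the streamed side: for each streamed row, look up its join key in the hash table and emit one joined row per match. A Spark-free sketch with the build side pre-materialized as a Map (all names and types are assumed):

    object HashProbeSketch {
      type Row = (Int, String) // (join key, payload)

      def probe(streamed: Iterator[Row], built: Map[Int, Seq[Row]]): Iterator[(Row, Row)] =
        streamed.flatMap { s =>
          // A key with no entry produces nothing, mirroring the
          // `currentHashMatches == null` case in fetchNextMatch().
          built.getOrElse(s._1, Seq.empty).iterator.map(b => (s, b))
        }

      def main(args: Array[String]): Unit = {
        val built = Map(1 -> Seq((1, "a")), 2 -> Seq((2, "b"), (2, "c")))
        probe(Iterator((1, "x"), (2, "y"), (3, "z")), built).foreach(println)
        // Key 1 joins once, key 2 joins twice, key 3 is dropped.
      }
    }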
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/local/IntersectNode.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/local/IntersectNode.scala
deleted file mode 100644
index e594e132de..0000000000
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/local/IntersectNode.scala
+++ /dev/null
@@ -1,63 +0,0 @@
-/*
-* Licensed to the Apache Software Foundation (ASF) under one or more
-* contributor license agreements. See the NOTICE file distributed with
-* this work for additional information regarding copyright ownership.
-* The ASF licenses this file to You under the Apache License, Version 2.0
-* (the "License"); you may not use this file except in compliance with
-* the License. You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-
-package org.apache.spark.sql.execution.local
-
-import scala.collection.mutable
-
-import org.apache.spark.sql.catalyst.expressions.Attribute
-import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.internal.SQLConf
-
-case class IntersectNode(conf: SQLConf, left: LocalNode, right: LocalNode)
- extends BinaryLocalNode(conf) {
-
- override def output: Seq[Attribute] = left.output
-
- private[this] var leftRows: mutable.HashSet[InternalRow] = _
-
- private[this] var currentRow: InternalRow = _
-
- override def open(): Unit = {
- left.open()
- leftRows = mutable.HashSet[InternalRow]()
- while (left.next()) {
- leftRows += left.fetch().copy()
- }
- left.close()
- right.open()
- }
-
- override def next(): Boolean = {
- currentRow = null
- while (currentRow == null && right.next()) {
- currentRow = right.fetch()
- if (!leftRows.contains(currentRow)) {
- currentRow = null
- }
- }
- currentRow != null
- }
-
- override def fetch(): InternalRow = currentRow
-
- override def close(): Unit = {
- left.close()
- right.close()
- }
-
-}
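
IntersectNode materializes the left side into a hash set, then streams the right side and keeps rows found in the set. A Spark-free analogue (note that, exactly like the node above, duplicate matching rows on the right are each emitted, because nothing is ever removed from the set):

    object IntersectSketch {
      def intersect[A](left: Iterator[A], right: Iterator[A]): Iterator[A] = {
        val leftRows = left.toSet        // corresponds to draining left in open()
        right.filter(leftRows.contains)  // corresponds to the loop in next()
      }

      def main(args: Array[String]): Unit =
        println(intersect(Iterator(1, 2, 3), Iterator(2, 3, 3, 4)).toList) // List(2, 3, 3)
    }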
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/local/LimitNode.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/local/LimitNode.scala
deleted file mode 100644
index 9af45ac0aa..0000000000
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/local/LimitNode.scala
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
-* Licensed to the Apache Software Foundation (ASF) under one or more
-* contributor license agreements. See the NOTICE file distributed with
-* this work for additional information regarding copyright ownership.
-* The ASF licenses this file to You under the Apache License, Version 2.0
-* (the "License"); you may not use this file except in compliance with
-* the License. You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-
-package org.apache.spark.sql.execution.local
-
-import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions.Attribute
-import org.apache.spark.sql.internal.SQLConf
-
-
-case class LimitNode(conf: SQLConf, limit: Int, child: LocalNode) extends UnaryLocalNode(conf) {
-
- private[this] var count = 0
-
- override def output: Seq[Attribute] = child.output
-
- override def open(): Unit = child.open()
-
- override def close(): Unit = child.close()
-
- override def fetch(): InternalRow = child.fetch()
-
- override def next(): Boolean = {
- if (count < limit) {
- count += 1
- child.next()
- } else {
- false
- }
- }
-
-}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/local/LocalNode.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/local/LocalNode.scala
deleted file mode 100644
index a5d09691dc..0000000000
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/local/LocalNode.scala
+++ /dev/null
@@ -1,157 +0,0 @@
-/*
-* Licensed to the Apache Software Foundation (ASF) under one or more
-* contributor license agreements. See the NOTICE file distributed with
-* this work for additional information regarding copyright ownership.
-* The ASF licenses this file to You under the Apache License, Version 2.0
-* (the "License"); you may not use this file except in compliance with
-* the License. You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-
-package org.apache.spark.sql.execution.local
-
-import org.apache.spark.Logging
-import org.apache.spark.sql.Row
-import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow}
-import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.catalyst.expressions.codegen._
-import org.apache.spark.sql.catalyst.plans.QueryPlan
-import org.apache.spark.sql.internal.SQLConf
-import org.apache.spark.sql.types.StructType
-
-/**
- * A local physical operator, in the form of an iterator.
- *
- * Before consuming the iterator, the open function must be called.
- * After consuming the iterator, the close function must be called.
- */
-abstract class LocalNode(conf: SQLConf) extends QueryPlan[LocalNode] with Logging {
-
- private[this] lazy val isTesting: Boolean = sys.props.contains("spark.testing")
-
- /**
- * Called before open(). Prepare can be used to reserve memory needed. It must NOT consume
- * any input data.
- *
- * Implementations of this must also call the `prepare()` function of their children.
- */
- def prepare(): Unit = children.foreach(_.prepare())
-
- /**
- * Initializes the iterator state. Must be called before calling `next()`.
- *
- * Implementations of this must also call the `open()` function of their children.
- */
- def open(): Unit
-
- /**
- * Advances the iterator to the next tuple. Returns true if there is at least one more tuple.
- */
- def next(): Boolean
-
- /**
- * Returns the current tuple.
- */
- def fetch(): InternalRow
-
- /**
- * Closes the iterator and releases all resources. It should be idempotent.
- *
- * Implementations of this must also call the `close()` function of their children.
- */
- def close(): Unit
-
- /**
- * Returns the content through the [[Iterator]] interface.
- */
- final def asIterator: Iterator[InternalRow] = new LocalNodeIterator(this)
-
- /**
- * Returns the content of the iterator from the beginning to the end in the form of a Scala Seq.
- */
- final def collect(): Seq[Row] = {
- val converter = CatalystTypeConverters.createToScalaConverter(StructType.fromAttributes(output))
- val result = new scala.collection.mutable.ArrayBuffer[Row]
- open()
- try {
- while (next()) {
- result += converter.apply(fetch()).asInstanceOf[Row]
- }
- } finally {
- close()
- }
- result
- }
-
- protected def newMutableProjection(
- expressions: Seq[Expression],
- inputSchema: Seq[Attribute]): () => MutableProjection = {
- log.debug(
- s"Creating MutableProj: $expressions, inputSchema: $inputSchema")
- GenerateMutableProjection.generate(expressions, inputSchema)
- }
-
- protected def newPredicate(
- expression: Expression,
- inputSchema: Seq[Attribute]): (InternalRow) => Boolean = {
- GeneratePredicate.generate(expression, inputSchema)
- }
-}
-
-
-abstract class LeafLocalNode(conf: SQLConf) extends LocalNode(conf) {
- override def children: Seq[LocalNode] = Seq.empty
-}
-
-
-abstract class UnaryLocalNode(conf: SQLConf) extends LocalNode(conf) {
-
- def child: LocalNode
-
- override def children: Seq[LocalNode] = Seq(child)
-}
-
-abstract class BinaryLocalNode(conf: SQLConf) extends LocalNode(conf) {
-
- def left: LocalNode
-
- def right: LocalNode
-
- override def children: Seq[LocalNode] = Seq(left, right)
-}
-
-/**
- * A thin wrapper around a [[LocalNode]] that provides an `Iterator` interface.
- */
-private class LocalNodeIterator(localNode: LocalNode) extends Iterator[InternalRow] {
- private var nextRow: InternalRow = _
-
- override def hasNext: Boolean = {
- if (nextRow == null) {
- val res = localNode.next()
- if (res) {
- nextRow = localNode.fetch()
- }
- res
- } else {
- true
- }
- }
-
- override def next(): InternalRow = {
- if (hasNext) {
- val res = nextRow
- nextRow = null
- res
- } else {
- throw new NoSuchElementException
- }
- }
-}
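
The docstring above defines the pull-based contract (prepare, open, next, fetch, close) that every operator in this package implements, and LocalNodeIterator shows its one subtle point: hasNext must cache the fetched row so that calling it twice does not advance the node. A minimal Spark-free rendition of both, under assumed simplified types:

    // Assumed stand-in for the LocalNode contract; not the Spark API.
    trait PullNode[T] {
      def open(): Unit
      def next(): Boolean // advance; true if a tuple is available
      def fetch(): T      // current tuple; valid only after next() returned true
      def close(): Unit
    }

    object PullNodeSketch {
      // The adapter caches the fetched row so repeated hasNext calls are
      // side-effect free, the same subtlety LocalNodeIterator handles above.
      def asIterator[T](node: PullNode[T]): Iterator[T] = new Iterator[T] {
        private var cached: Option[T] = None
        def hasNext: Boolean = cached.isDefined || {
          if (node.next()) { cached = Some(node.fetch()); true } else false
        }
        def next(): T = {
          if (!hasNext) throw new NoSuchElementException
          val row = cached.get
          cached = None
          row
        }
      }
    }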
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/local/NestedLoopJoinNode.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/local/NestedLoopJoinNode.scala
deleted file mode 100644
index b5ea08325c..0000000000
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/local/NestedLoopJoinNode.scala
+++ /dev/null
@@ -1,152 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.execution.local
-
-import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.catalyst.plans.{FullOuter, JoinType, LeftOuter, RightOuter}
-import org.apache.spark.sql.execution.joins.{BuildLeft, BuildRight, BuildSide}
-import org.apache.spark.sql.internal.SQLConf
-import org.apache.spark.util.collection.{BitSet, CompactBuffer}
-
-case class NestedLoopJoinNode(
- conf: SQLConf,
- left: LocalNode,
- right: LocalNode,
- buildSide: BuildSide,
- joinType: JoinType,
- condition: Option[Expression]) extends BinaryLocalNode(conf) {
-
- override def output: Seq[Attribute] = {
- joinType match {
- case LeftOuter =>
- left.output ++ right.output.map(_.withNullability(true))
- case RightOuter =>
- left.output.map(_.withNullability(true)) ++ right.output
- case FullOuter =>
- left.output.map(_.withNullability(true)) ++ right.output.map(_.withNullability(true))
- case x =>
- throw new IllegalArgumentException(
- s"NestedLoopJoin should not take $x as the JoinType")
- }
- }
-
- private[this] def genResultProjection: InternalRow => InternalRow = {
- UnsafeProjection.create(schema)
- }
-
- private[this] var currentRow: InternalRow = _
-
- private[this] var iterator: Iterator[InternalRow] = _
-
- override def open(): Unit = {
- val (streamed, build) = buildSide match {
- case BuildRight => (left, right)
- case BuildLeft => (right, left)
- }
- build.open()
- val buildRelation = new CompactBuffer[InternalRow]
- while (build.next()) {
- buildRelation += build.fetch().copy()
- }
- build.close()
-
- val boundCondition =
- newPredicate(condition.getOrElse(Literal(true)), left.output ++ right.output)
-
- val leftNulls = new GenericMutableRow(left.output.size)
- val rightNulls = new GenericMutableRow(right.output.size)
- val joinedRow = new JoinedRow
- val matchedBuildTuples = new BitSet(buildRelation.size)
- val resultProj = genResultProjection
- streamed.open()
-
- // streamedRowMatches also contains null-padded rows when this is an outer join
- val streamedRowMatches: Iterator[InternalRow] = streamed.asIterator.flatMap { streamedRow =>
- val matchedRows = new CompactBuffer[InternalRow]
-
- var i = 0
- var streamRowMatched = false
-
- // Scan the build relation to look for matches for each streamed row
- while (i < buildRelation.size) {
- val buildRow = buildRelation(i)
- buildSide match {
- case BuildRight => joinedRow(streamedRow, buildRow)
- case BuildLeft => joinedRow(buildRow, streamedRow)
- }
- if (boundCondition(joinedRow)) {
- matchedRows += resultProj(joinedRow).copy()
- streamRowMatched = true
- matchedBuildTuples.set(i)
- }
- i += 1
- }
-
- // If this row had no matches and we're using outer join, join it with the null rows
- if (!streamRowMatched) {
- (joinType, buildSide) match {
- case (LeftOuter | FullOuter, BuildRight) =>
- matchedRows += resultProj(joinedRow(streamedRow, rightNulls)).copy()
- case (RightOuter | FullOuter, BuildLeft) =>
- matchedRows += resultProj(joinedRow(leftNulls, streamedRow)).copy()
- case _ =>
- }
- }
-
- matchedRows.iterator
- }
-
- // If we're using outer join, find rows on the build side that didn't match anything
- // and join them with the null row
- lazy val unmatchedBuildRows: Iterator[InternalRow] = {
- var i = 0
- buildRelation.filter { row =>
- val r = !matchedBuildTuples.get(i)
- i += 1
- r
- }.iterator
- }
- iterator = (joinType, buildSide) match {
- case (RightOuter | FullOuter, BuildRight) =>
- streamedRowMatches ++
- unmatchedBuildRows.map { buildRow => resultProj(joinedRow(leftNulls, buildRow)) }
- case (LeftOuter | FullOuter, BuildLeft) =>
- streamedRowMatches ++
- unmatchedBuildRows.map { buildRow => resultProj(joinedRow(buildRow, rightNulls)) }
- case _ => streamedRowMatches
- }
- }
-
- override def next(): Boolean = {
- if (iterator.hasNext) {
- currentRow = iterator.next()
- true
- } else {
- false
- }
- }
-
- override def fetch(): InternalRow = currentRow
-
- override def close(): Unit = {
- left.close()
- right.close()
- }
-
-}
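
The bookkeeping above is the classic nested-loop outer-join completion: remember which build rows matched at least once, then append the never-matched ones padded with nulls. A compact Spark-free sketch of the same idea for a full outer join, using Option instead of null rows (all names are assumed):

    object NestedLoopSketch {
      def fullOuterJoin[A, B](
          streamed: Seq[A],
          build: Seq[B],
          matches: (A, B) => Boolean): Seq[(Option[A], Option[B])] = {
        // Plays the role of matchedBuildTuples (the BitSet) above.
        val matched = Array.fill(build.size)(false)
        val primary: Seq[(Option[A], Option[B])] = streamed.flatMap { s =>
          val hits: Seq[(Option[A], Option[B])] = build.zipWithIndex.collect {
            case (b, i) if matches(s, b) => matched(i) = true; (Some(s), Some(b))
          }
          if (hits.nonEmpty) hits else Seq((Some(s), None)) // null-pad the build side
        }
        val leftovers = build.zipWithIndex.collect {
          case (b, i) if !matched(i) => (None, Some(b))     // null-pad the streamed side
        }
        primary ++ leftovers
      }
    }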
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/local/ProjectNode.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/local/ProjectNode.scala
deleted file mode 100644
index 5fe068a13c..0000000000
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/local/ProjectNode.scala
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
-* Licensed to the Apache Software Foundation (ASF) under one or more
-* contributor license agreements. See the NOTICE file distributed with
-* this work for additional information regarding copyright ownership.
-* The ASF licenses this file to You under the Apache License, Version 2.0
-* (the "License"); you may not use this file except in compliance with
-* the License. You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-
-package org.apache.spark.sql.execution.local
-
-import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions.{Attribute, NamedExpression, UnsafeProjection}
-import org.apache.spark.sql.internal.SQLConf
-
-
-case class ProjectNode(conf: SQLConf, projectList: Seq[NamedExpression], child: LocalNode)
- extends UnaryLocalNode(conf) {
-
- private[this] var project: UnsafeProjection = _
-
- override def output: Seq[Attribute] = projectList.map(_.toAttribute)
-
- override def open(): Unit = {
- project = UnsafeProjection.create(projectList, child.output)
- child.open()
- }
-
- override def next(): Boolean = child.next()
-
- override def fetch(): InternalRow = {
- project.apply(child.fetch())
- }
-
- override def close(): Unit = child.close()
-}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/local/SampleNode.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/local/SampleNode.scala
deleted file mode 100644
index 078fb50deb..0000000000
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/local/SampleNode.scala
+++ /dev/null
@@ -1,78 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.execution.local
-
-import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions.Attribute
-import org.apache.spark.sql.internal.SQLConf
-import org.apache.spark.util.random.{BernoulliCellSampler, PoissonSampler}
-
-
-/**
- * Sample the dataset.
- *
- * @param conf the SQLConf
- * @param lowerBound Lower-bound of the sampling probability (usually 0.0)
- * @param upperBound Upper-bound of the sampling probability. The expected fraction sampled
- * will be ub - lb.
- * @param withReplacement Whether to sample with replacement.
- * @param seed the random seed
- * @param child the LocalNode
- */
-case class SampleNode(
- conf: SQLConf,
- lowerBound: Double,
- upperBound: Double,
- withReplacement: Boolean,
- seed: Long,
- child: LocalNode) extends UnaryLocalNode(conf) {
-
- override def output: Seq[Attribute] = child.output
-
- private[this] var iterator: Iterator[InternalRow] = _
-
- private[this] var currentRow: InternalRow = _
-
- override def open(): Unit = {
- child.open()
- val sampler =
- if (withReplacement) {
- // Disable gap sampling since the gap sampling method buffers two rows internally,
- // requiring us to copy the row, which is more expensive than the random number generator.
- new PoissonSampler[InternalRow](upperBound - lowerBound, useGapSamplingIfPossible = false)
- } else {
- new BernoulliCellSampler[InternalRow](lowerBound, upperBound)
- }
- sampler.setSeed(seed)
- iterator = sampler.sample(child.asIterator)
- }
-
- override def next(): Boolean = {
- if (iterator.hasNext) {
- currentRow = iterator.next()
- true
- } else {
- false
- }
- }
-
- override def fetch(): InternalRow = currentRow
-
- override def close(): Unit = child.close()
-
-}
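
The sampler choice in open() follows the usual rule: without replacement, a row is kept when a uniform draw falls in [lowerBound, upperBound); with replacement, each row is emitted a Poisson(fraction)-distributed number of times. A Spark-free sketch under those assumptions (Knuth's multiplication method below is an assumed stand-in for Spark's PoissonSampler):

    import scala.util.Random

    object SampleSketch {
      // Without replacement: BernoulliCellSampler-style acceptance test.
      def bernoulli[T](rows: Iterator[T], lb: Double, ub: Double, seed: Long): Iterator[T] = {
        val rng = new Random(seed)
        rows.filter { _ => val x = rng.nextDouble(); x >= lb && x < ub }
      }

      // Knuth's multiplication method for a Poisson(lambda) draw.
      def poissonCount(lambda: Double, rng: Random): Int = {
        val limit = math.exp(-lambda)
        var k = 0
        var p = 1.0
        do { k += 1; p *= rng.nextDouble() } while (p > limit)
        k - 1
      }

      // With replacement: each row appears 0, 1, 2, ... times.
      def withReplacement[T](rows: Iterator[T], fraction: Double, seed: Long): Iterator[T] = {
        val rng = new Random(seed)
        rows.flatMap(r => Iterator.fill(poissonCount(fraction, rng))(r))
      }
    }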
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/local/SeqScanNode.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/local/SeqScanNode.scala
deleted file mode 100644
index 8ebfe3a68b..0000000000
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/local/SeqScanNode.scala
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
-* Licensed to the Apache Software Foundation (ASF) under one or more
-* contributor license agreements. See the NOTICE file distributed with
-* this work for additional information regarding copyright ownership.
-* The ASF licenses this file to You under the Apache License, Version 2.0
-* (the "License"); you may not use this file except in compliance with
-* the License. You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-
-package org.apache.spark.sql.execution.local
-
-import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions.Attribute
-import org.apache.spark.sql.internal.SQLConf
-
-/**
- * An operator that scans a local data collection in the form of a Scala Seq.
- */
-case class SeqScanNode(conf: SQLConf, output: Seq[Attribute], data: Seq[InternalRow])
- extends LeafLocalNode(conf) {
-
- private[this] var iterator: Iterator[InternalRow] = _
- private[this] var currentRow: InternalRow = _
-
- override def open(): Unit = {
- iterator = data.iterator
- }
-
- override def next(): Boolean = {
- if (iterator.hasNext) {
- currentRow = iterator.next()
- true
- } else {
- false
- }
- }
-
- override def fetch(): InternalRow = currentRow
-
- override def close(): Unit = {
- // Do nothing
- }
-}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/local/TakeOrderedAndProjectNode.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/local/TakeOrderedAndProjectNode.scala
deleted file mode 100644
index f52f5f7bb5..0000000000
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/local/TakeOrderedAndProjectNode.scala
+++ /dev/null
@@ -1,74 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.execution.local
-
-import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.catalyst.expressions.codegen.GenerateOrdering
-import org.apache.spark.sql.internal.SQLConf
-import org.apache.spark.util.BoundedPriorityQueue
-
-case class TakeOrderedAndProjectNode(
- conf: SQLConf,
- limit: Int,
- sortOrder: Seq[SortOrder],
- projectList: Option[Seq[NamedExpression]],
- child: LocalNode) extends UnaryLocalNode(conf) {
-
- private[this] var projection: Option[Projection] = _
- private[this] var ord: Ordering[InternalRow] = _
- private[this] var iterator: Iterator[InternalRow] = _
- private[this] var currentRow: InternalRow = _
-
- override def output: Seq[Attribute] = {
- val projectOutput = projectList.map(_.map(_.toAttribute))
- projectOutput.getOrElse(child.output)
- }
-
- override def open(): Unit = {
- child.open()
- projection = projectList.map(UnsafeProjection.create(_, child.output))
- ord = GenerateOrdering.generate(sortOrder, child.output)
- // The bounded priority queue keeps the largest elements, so reverse the ordering.
- val queue = new BoundedPriorityQueue[InternalRow](limit)(ord.reverse)
- while (child.next()) {
- queue += child.fetch()
- }
- // Close it eagerly since we don't need it.
- child.close()
- iterator = queue.toArray.sorted(ord).iterator
- }
-
- override def next(): Boolean = {
- if (iterator.hasNext) {
- val _currentRow = iterator.next()
- currentRow = projection match {
- case Some(p) => p(_currentRow)
- case None => _currentRow
- }
- true
- } else {
- false
- }
- }
-
- override def fetch(): InternalRow = currentRow
-
- override def close(): Unit = child.close()
-
-}
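
The top-K technique above, a size-bounded priority queue filled under a reversed ordering and then sorted once at the end, avoids sorting the full input. The Spark-free sketch below uses scala.collection.mutable.PriorityQueue as an assumed substitute for Spark's BoundedPriorityQueue, evicting the largest retained element whenever a smaller one arrives:

    import scala.collection.mutable

    object TopKSketch {
      def takeOrdered[T](input: Iterator[T], limit: Int)(implicit ord: Ordering[T]): Seq[T] =
        if (limit <= 0) Seq.empty
        else {
          // A max-heap under `ord`: the head is the worst element retained so far.
          val queue = mutable.PriorityQueue.empty[T](ord)
          input.foreach { x =>
            if (queue.size < limit) queue.enqueue(x)
            else if (ord.lt(x, queue.head)) { queue.dequeue(); queue.enqueue(x) }
          }
          // One final sort of at most `limit` elements, as in open() above.
          queue.toSeq.sorted(ord)
        }

      def main(args: Array[String]): Unit =
        println(takeOrdered(Iterator(5, 1, 4, 2, 8, 3), 3)) // List(1, 2, 3)
    }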
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/local/UnionNode.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/local/UnionNode.scala
deleted file mode 100644
index e53bc220d8..0000000000
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/local/UnionNode.scala
+++ /dev/null
@@ -1,73 +0,0 @@
-/*
-* Licensed to the Apache Software Foundation (ASF) under one or more
-* contributor license agreements. See the NOTICE file distributed with
-* this work for additional information regarding copyright ownership.
-* The ASF licenses this file to You under the Apache License, Version 2.0
-* (the "License"); you may not use this file except in compliance with
-* the License. You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-
-package org.apache.spark.sql.execution.local
-
-import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions.Attribute
-import org.apache.spark.sql.internal.SQLConf
-
-case class UnionNode(conf: SQLConf, children: Seq[LocalNode]) extends LocalNode(conf) {
-
- override def output: Seq[Attribute] = children.head.output
-
- private[this] var currentChild: LocalNode = _
-
- private[this] var nextChildIndex: Int = _
-
- override def open(): Unit = {
- currentChild = children.head
- currentChild.open()
- nextChildIndex = 1
- }
-
- private def advanceToNextChild(): Boolean = {
- var found = false
- var exit = false
- while (!exit && !found) {
- if (currentChild != null) {
- currentChild.close()
- }
- if (nextChildIndex >= children.size) {
- found = false
- exit = true
- } else {
- currentChild = children(nextChildIndex)
- nextChildIndex += 1
- currentChild.open()
- found = currentChild.next()
- }
- }
- found
- }
-
- override def close(): Unit = {
- if (currentChild != null) {
- currentChild.close()
- }
- }
-
- override def fetch(): InternalRow = currentChild.fetch()
-
- override def next(): Boolean = {
- if (currentChild.next()) {
- true
- } else {
- advanceToNextChild()
- }
- }
-}
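
UnionNode streams its children strictly in sequence, closing each child before opening the next; the rows it produces are plain iterator concatenation, as in this Spark-free analogue:

    object UnionSketch {
      def union[T](children: Seq[Iterator[T]]): Iterator[T] =
        children.iterator.flatMap(identity) // lazily chains the children in order

      def main(args: Array[String]): Unit =
        println(union(Seq(Iterator(1, 2), Iterator.empty, Iterator(3))).toList) // List(1, 2, 3)
    }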
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/local/DummyNode.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/local/DummyNode.scala
deleted file mode 100644
index cd9277d3bc..0000000000
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/local/DummyNode.scala
+++ /dev/null
@@ -1,68 +0,0 @@
-/*
-* Licensed to the Apache Software Foundation (ASF) under one or more
-* contributor license agreements. See the NOTICE file distributed with
-* this work for additional information regarding copyright ownership.
-* The ASF licenses this file to You under the Apache License, Version 2.0
-* (the "License"); you may not use this file except in compliance with
-* the License. You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-
-package org.apache.spark.sql.execution.local
-
-import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions.Attribute
-import org.apache.spark.sql.catalyst.plans.logical.LocalRelation
-import org.apache.spark.sql.internal.SQLConf
-
-/**
- * A dummy [[LocalNode]] that just returns rows from a [[LocalRelation]].
- */
-private[local] case class DummyNode(
- output: Seq[Attribute],
- relation: LocalRelation,
- conf: SQLConf)
- extends LocalNode(conf) {
-
- import DummyNode._
-
- private var index: Int = CLOSED
- private val input: Seq[InternalRow] = relation.data
-
- def this(output: Seq[Attribute], data: Seq[Product], conf: SQLConf = new SQLConf) {
- this(output, LocalRelation.fromProduct(output, data), conf)
- }
-
- def isOpen: Boolean = index != CLOSED
-
- override def children: Seq[LocalNode] = Seq.empty
-
- override def open(): Unit = {
- index = -1
- }
-
- override def next(): Boolean = {
- index += 1
- index < input.size
- }
-
- override def fetch(): InternalRow = {
- assert(index >= 0 && index < input.size)
- input(index)
- }
-
- override def close(): Unit = {
- index = CLOSED
- }
-}
-
-private object DummyNode {
- val CLOSED: Int = Int.MinValue
-}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/local/ExpandNodeSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/local/ExpandNodeSuite.scala
deleted file mode 100644
index bbd94d8da2..0000000000
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/local/ExpandNodeSuite.scala
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
-* Licensed to the Apache Software Foundation (ASF) under one or more
-* contributor license agreements. See the NOTICE file distributed with
-* this work for additional information regarding copyright ownership.
-* The ASF licenses this file to You under the Apache License, Version 2.0
-* (the "License"); you may not use this file except in compliance with
-* the License. You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-
-package org.apache.spark.sql.execution.local
-
-import org.apache.spark.sql.catalyst.dsl.expressions._
-
-
-class ExpandNodeSuite extends LocalNodeTest {
-
- private def testExpand(inputData: Array[(Int, Int)] = Array.empty): Unit = {
- val inputNode = new DummyNode(kvIntAttributes, inputData)
- val projections = Seq(Seq('k + 'v, 'k - 'v), Seq('k * 'v, 'k / 'v))
- val expandNode = new ExpandNode(conf, projections, inputNode.output, inputNode)
- val resolvedNode = resolveExpressions(expandNode)
- val expectedOutput = {
- val firstHalf = inputData.map { case (k, v) => (k + v, k - v) }
- val secondHalf = inputData.map { case (k, v) => (k * v, k / v) }
- firstHalf ++ secondHalf
- }
- val actualOutput = resolvedNode.collect().map { case row =>
- (row.getInt(0), row.getInt(1))
- }
- assert(actualOutput.toSet === expectedOutput.toSet)
- }
-
- test("empty") {
- testExpand()
- }
-
- test("basic") {
- testExpand((1 to 100).map { i => (i, i * 1000) }.toArray)
- }
-
-}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/local/FilterNodeSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/local/FilterNodeSuite.scala
deleted file mode 100644
index 4eadce646d..0000000000
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/local/FilterNodeSuite.scala
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
-* Licensed to the Apache Software Foundation (ASF) under one or more
-* contributor license agreements. See the NOTICE file distributed with
-* this work for additional information regarding copyright ownership.
-* The ASF licenses this file to You under the Apache License, Version 2.0
-* (the "License"); you may not use this file except in compliance with
-* the License. You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-
-package org.apache.spark.sql.execution.local
-
-import org.apache.spark.sql.catalyst.dsl.expressions._
-
-
-class FilterNodeSuite extends LocalNodeTest {
-
- private def testFilter(inputData: Array[(Int, Int)] = Array.empty): Unit = {
- val cond = 'k % 2 === 0
- val inputNode = new DummyNode(kvIntAttributes, inputData)
- val filterNode = new FilterNode(conf, cond, inputNode)
- val resolvedNode = resolveExpressions(filterNode)
- val expectedOutput = inputData.filter { case (k, _) => k % 2 == 0 }
- val actualOutput = resolvedNode.collect().map { case row =>
- (row.getInt(0), row.getInt(1))
- }
- assert(actualOutput === expectedOutput)
- }
-
- test("empty") {
- testFilter()
- }
-
- test("basic") {
- testFilter((1 to 100).map { i => (i, i) }.toArray)
- }
-
-}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/local/HashJoinNodeSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/local/HashJoinNodeSuite.scala
deleted file mode 100644
index 74142ea598..0000000000
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/local/HashJoinNodeSuite.scala
+++ /dev/null
@@ -1,141 +0,0 @@
-/*
-* Licensed to the Apache Software Foundation (ASF) under one or more
-* contributor license agreements. See the NOTICE file distributed with
-* this work for additional information regarding copyright ownership.
-* The ASF licenses this file to You under the Apache License, Version 2.0
-* (the "License"); you may not use this file except in compliance with
-* the License. You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-
-package org.apache.spark.sql.execution.local
-
-import org.mockito.Mockito.{mock, when}
-
-import org.apache.spark.broadcast.TorrentBroadcast
-import org.apache.spark.sql.catalyst.dsl.expressions._
-import org.apache.spark.sql.catalyst.expressions.{Expression, UnsafeProjection}
-import org.apache.spark.sql.execution.joins.{BuildLeft, BuildRight, BuildSide, HashedRelation}
-import org.apache.spark.sql.internal.SQLConf
-
-class HashJoinNodeSuite extends LocalNodeTest {
-
- // Test both build sides
- private val buildSides = Seq(BuildLeft, BuildRight)
- buildSides.foreach { buildSide =>
- testJoin(buildSide)
- }
-
- /**
- * Builds a [[HashedRelation]] from resolved `buildKeys`
- * and a resolved `buildNode`.
- */
- private def buildHashedRelation(
- conf: SQLConf,
- buildKeys: Seq[Expression],
- buildNode: LocalNode): HashedRelation = {
-
- val buildSideKeyGenerator = UnsafeProjection.create(buildKeys, buildNode.output)
- buildNode.prepare()
- buildNode.open()
- val hashedRelation = HashedRelation(buildNode, buildSideKeyGenerator)
- buildNode.close()
-
- hashedRelation
- }
-
- /**
- * Test inner hash join with varying degrees of matches.
- */
- private def testJoin(buildSide: BuildSide): Unit = {
- val testNamePrefix = buildSide
- val someData = (1 to 100).map { i => (i, "burger" + i) }.toArray
- val conf = new SQLConf
-
- // Actual test body
- def runTest(leftInput: Array[(Int, String)], rightInput: Array[(Int, String)]): Unit = {
- val rightInputMap = rightInput.toMap
- val leftNode = new DummyNode(joinNameAttributes, leftInput)
- val rightNode = new DummyNode(joinNicknameAttributes, rightInput)
- val makeBinaryHashJoinNode = (node1: LocalNode, node2: LocalNode) => {
- val binaryHashJoinNode =
- BinaryHashJoinNode(conf, Seq('id1), Seq('id2), buildSide, node1, node2)
- resolveExpressions(binaryHashJoinNode)
- }
- val makeBroadcastJoinNode = (node1: LocalNode, node2: LocalNode) => {
- val leftKeys = Seq('id1.attr)
- val rightKeys = Seq('id2.attr)
- // Figure out the build side and stream side.
- val (buildNode, buildKeys, streamedNode, streamedKeys) = buildSide match {
- case BuildLeft => (node1, leftKeys, node2, rightKeys)
- case BuildRight => (node2, rightKeys, node1, leftKeys)
- }
- // Resolve the expressions of the build side and then create a HashedRelation.
- val resolvedBuildNode = resolveExpressions(buildNode)
- val resolvedBuildKeys = resolveExpressions(buildKeys, resolvedBuildNode)
- val hashedRelation = buildHashedRelation(conf, resolvedBuildKeys, resolvedBuildNode)
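- // Mock the broadcast wrapper so the test can run without a SparkContext;
- // the mock simply hands back the locally built relation.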
- val broadcastHashedRelation = mock(classOf[TorrentBroadcast[HashedRelation]])
- when(broadcastHashedRelation.value).thenReturn(hashedRelation)
-
- val hashJoinNode =
- BroadcastHashJoinNode(
- conf,
- streamedKeys,
- streamedNode,
- buildSide,
- resolvedBuildNode.output,
- broadcastHashedRelation)
- resolveExpressions(hashJoinNode)
- }
-
- val expectedOutput = leftInput
- .filter { case (k, _) => rightInputMap.contains(k) }
- .map { case (k, v) => (k, v, k, rightInputMap(k)) }
-
- Seq(makeBinaryHashJoinNode, makeBroadcastJoinNode).foreach { makeNode =>
- val makeUnsafeNode = wrapForUnsafe(makeNode)
- val hashJoinNode = makeUnsafeNode(leftNode, rightNode)
-
- val actualOutput = hashJoinNode.collect().map { row =>
- // (id, name, id, nickname)
- (row.getInt(0), row.getString(1), row.getInt(2), row.getString(3))
- }
- assert(actualOutput === expectedOutput)
- }
- }
-
- test(s"$testNamePrefix: empty") {
- runTest(Array.empty, Array.empty)
- runTest(someData, Array.empty)
- runTest(Array.empty, someData)
- }
-
- test(s"$testNamePrefix: no matches") {
- val someIrrelevantData = (10000 to 100100).map { i => (i, "piper" + i) }.toArray
- runTest(someData, Array.empty)
- runTest(Array.empty, someData)
- runTest(someData, someIrrelevantData)
- runTest(someIrrelevantData, someData)
- }
-
- test(s"$testNamePrefix: partial matches") {
- val someOtherData = (50 to 150).map { i => (i, "finnegan" + i) }.toArray
- runTest(someData, someOtherData)
- runTest(someOtherData, someData)
- }
-
- test(s"$testNamePrefix: full matches") {
- val someSuperRelevantData = someData.map { case (k, v) => (k, "cooper" + v) }
- runTest(someData, someSuperRelevantData)
- runTest(someSuperRelevantData, someData)
- }
- }
-
-}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/local/IntersectNodeSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/local/IntersectNodeSuite.scala
deleted file mode 100644
index c0ad2021b2..0000000000
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/local/IntersectNodeSuite.scala
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.execution.local
-
-
-class IntersectNodeSuite extends LocalNodeTest {
-
- test("basic") {
- val n = 100
- val leftData = (1 to n).filter { i => i % 2 == 0 }.map { i => (i, i) }.toArray
- val rightData = (1 to n).filter { i => i % 3 == 0 }.map { i => (i, i) }.toArray
- val leftNode = new DummyNode(kvIntAttributes, leftData)
- val rightNode = new DummyNode(kvIntAttributes, rightData)
- val intersectNode = new IntersectNode(conf, leftNode, rightNode)
- val expectedOutput = leftData.intersect(rightData)
- val actualOutput = intersectNode.collect().map { row =>
- (row.getInt(0), row.getInt(1))
- }
- assert(actualOutput === expectedOutput)
- }
-
-}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/local/LimitNodeSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/local/LimitNodeSuite.scala
deleted file mode 100644
index fb790636a3..0000000000
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/local/LimitNodeSuite.scala
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.execution.local
-
-
-class LimitNodeSuite extends LocalNodeTest {
-
- private def testLimit(inputData: Array[(Int, Int)] = Array.empty, limit: Int = 10): Unit = {
- val inputNode = new DummyNode(kvIntAttributes, inputData)
- val limitNode = new LimitNode(conf, limit, inputNode)
- val expectedOutput = inputData.take(limit)
- val actualOutput = limitNode.collect().map { row =>
- (row.getInt(0), row.getInt(1))
- }
- assert(actualOutput === expectedOutput)
- }
-
- test("empty") {
- testLimit()
- }
-
- test("basic") {
- testLimit((1 to 100).map { i => (i, i) }.toArray, 20)
- }
-
-}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/local/LocalNodeSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/local/LocalNodeSuite.scala
deleted file mode 100644
index 0d1ed99eec..0000000000
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/local/LocalNodeSuite.scala
+++ /dev/null
@@ -1,73 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.execution.local
-
-
-class LocalNodeSuite extends LocalNodeTest {
- private val data = (1 to 100).map { i => (i, i) }.toArray
-
- test("basic open, next, fetch, close") {
- val node = new DummyNode(kvIntAttributes, data)
- assert(!node.isOpen)
- node.open()
- assert(node.isOpen)
- data.foreach { case (k, v) =>
- assert(node.next())
- // fetch should be idempotent
- val fetched = node.fetch()
- assert(node.fetch() === fetched)
- assert(node.fetch() === fetched)
- assert(node.fetch().numFields === 2)
- assert(node.fetch().getInt(0) === k)
- assert(node.fetch().getInt(1) === v)
- }
- assert(!node.next())
- node.close()
- assert(!node.isOpen)
- }
-
- test("asIterator") {
- val node = new DummyNode(kvIntAttributes, data)
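- // asIterator adapts the node's open()/next()/fetch() protocol to a standard Scala Iterator.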
- val iter = node.asIterator
- node.open()
- data.foreach { case (k, v) =>
- // hasNext should be idempotent
- assert(iter.hasNext)
- assert(iter.hasNext)
- val item = iter.next()
- assert(item.numFields === 2)
- assert(item.getInt(0) === k)
- assert(item.getInt(1) === v)
- }
- intercept[NoSuchElementException] {
- iter.next()
- }
- node.close()
- }
-
- test("collect") {
- val node = new DummyNode(kvIntAttributes, data)
- node.open()
- val collected = node.collect()
- assert(collected.size === data.size)
- assert(collected.forall(_.size === 2))
- assert(collected.map { row => (row.getInt(0), row.getInt(1)) } === data)
- node.close()
- }
-
-}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/local/LocalNodeTest.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/local/LocalNodeTest.scala
deleted file mode 100644
index cd67a66ebf..0000000000
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/local/LocalNodeTest.scala
+++ /dev/null
@@ -1,87 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.execution.local
-
-import org.apache.spark.SparkFunSuite
-import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
-import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Expression}
-import org.apache.spark.sql.internal.SQLConf
-import org.apache.spark.sql.types.{IntegerType, StringType}
-
-class LocalNodeTest extends SparkFunSuite {
-
- protected val conf: SQLConf = new SQLConf
- protected val kvIntAttributes = Seq(
- AttributeReference("k", IntegerType)(),
- AttributeReference("v", IntegerType)())
- protected val joinNameAttributes = Seq(
- AttributeReference("id1", IntegerType)(),
- AttributeReference("name", StringType)())
- protected val joinNicknameAttributes = Seq(
- AttributeReference("id2", IntegerType)(),
- AttributeReference("nickname", StringType)())
-
- /**
- * Wrap a function processing two [[LocalNode]]s such that:
- * (1) all input rows are automatically converted to unsafe rows
- * (2) all output rows are automatically converted back to safe rows
- */
- protected def wrapForUnsafe(
- f: (LocalNode, LocalNode) => LocalNode): (LocalNode, LocalNode) => LocalNode = {
- (left: LocalNode, right: LocalNode) => {
- val _left = ConvertToUnsafeNode(conf, left)
- val _right = ConvertToUnsafeNode(conf, right)
- val r = f(_left, _right)
- ConvertToSafeNode(conf, r)
- }
- }
-
- /**
- * Recursively resolve all expressions in a [[LocalNode]] using the node's attributes.
- */
- protected def resolveExpressions(outputNode: LocalNode): LocalNode = {
- outputNode transform {
- case node: LocalNode =>
- val inputMap = node.output.map { a => (a.name, a) }.toMap
- node transformExpressions {
- case UnresolvedAttribute(Seq(u)) =>
- inputMap.getOrElse(u,
- sys.error(s"Invalid Test: Cannot resolve $u given input $inputMap"))
- }
- }
- }
-
- /**
- * Resolve all expressions in `expressions` based on the `output` of `localNode`.
- * It assumes that all expressions in `localNode` are already resolved.
- */
- protected def resolveExpressions(
- expressions: Seq[Expression],
- localNode: LocalNode): Seq[Expression] = {
- require(localNode.expressions.forall(_.resolved))
- val inputMap = localNode.output.map { a => (a.name, a) }.toMap
- expressions.map { expression =>
- expression.transformUp {
- case UnresolvedAttribute(Seq(u)) =>
- inputMap.getOrElse(u,
- sys.error(s"Invalid Test: Cannot resolve $u given input $inputMap"))
- }
- }
- }
-
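- /**
- * Illustrative sketch (not part of the original helpers): how the pieces
- * above compose in the suites. `makeJoin` stands for any hypothetical node
- * factory, e.g. one of the join constructors; collect() is assumed to
- * return safe [[org.apache.spark.sql.Row]]s, as in the deleted LocalNode.scala.
- */
- protected def collectWithUnsafe(
- makeJoin: (LocalNode, LocalNode) => LocalNode,
- left: LocalNode,
- right: LocalNode): Seq[org.apache.spark.sql.Row] = {
- // Resolve the factory's expressions, then wrap both inputs in unsafe
- // converters and the output in a safe converter before collecting.
- val makeResolved = (l: LocalNode, r: LocalNode) => resolveExpressions(makeJoin(l, r))
- wrapForUnsafe(makeResolved)(left, right).collect()
- }
-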
-}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/local/NestedLoopJoinNodeSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/local/NestedLoopJoinNodeSuite.scala
deleted file mode 100644
index bcc87a9175..0000000000
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/local/NestedLoopJoinNodeSuite.scala
+++ /dev/null
@@ -1,142 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.execution.local
-
-import org.apache.spark.sql.catalyst.dsl.expressions._
-import org.apache.spark.sql.catalyst.plans.{FullOuter, JoinType, LeftOuter, RightOuter}
-import org.apache.spark.sql.execution.joins.{BuildLeft, BuildRight, BuildSide}
-import org.apache.spark.sql.internal.SQLConf
-
-
-class NestedLoopJoinNodeSuite extends LocalNodeTest {
-
- // Test all combinations of build side and join type. Rows are always routed through unsafe conversions via wrapForUnsafe.
- private val buildSides = Seq(BuildLeft, BuildRight)
- private val joinTypes = Seq(LeftOuter, RightOuter, FullOuter)
- buildSides.foreach { buildSide =>
- joinTypes.foreach { joinType =>
- testJoin(buildSide, joinType)
- }
- }
-
- /**
- * Test outer nested loop joins with varying degrees of matches.
- */
- private def testJoin(buildSide: BuildSide, joinType: JoinType): Unit = {
- val testNamePrefix = s"$buildSide / $joinType"
- val someData = (1 to 100).map { i => (i, "burger" + i) }.toArray
- val conf = new SQLConf
-
- // Actual test body
- def runTest(
- joinType: JoinType,
- leftInput: Array[(Int, String)],
- rightInput: Array[(Int, String)]): Unit = {
- val leftNode = new DummyNode(joinNameAttributes, leftInput)
- val rightNode = new DummyNode(joinNicknameAttributes, rightInput)
- val cond = 'id1 === 'id2
- val makeNode = (node1: LocalNode, node2: LocalNode) => {
- resolveExpressions(
- new NestedLoopJoinNode(conf, node1, node2, buildSide, joinType, Some(cond)))
- }
- val makeUnsafeNode = wrapForUnsafe(makeNode)
- val joinNode = makeUnsafeNode(leftNode, rightNode)
- val expectedOutput = generateExpectedOutput(leftInput, rightInput, joinType)
- val actualOutput = joinNode.collect().map { row =>
- // (id, name, id, nickname)
- (
- Option(row.get(0)).map(_.asInstanceOf[Int]), Option(row.getString(1)),
- Option(row.get(2)).map(_.asInstanceOf[Int]), Option(row.getString(3))
- )
- }
- assert(actualOutput.toSet === expectedOutput.toSet)
- }
-
- test(s"$testNamePrefix: empty") {
- runTest(joinType, Array.empty, Array.empty)
- }
-
- test(s"$testNamePrefix: no matches") {
- val someIrrelevantData = (10000 to 10100).map { i => (i, "piper" + i) }.toArray
- runTest(joinType, someData, Array.empty)
- runTest(joinType, Array.empty, someData)
- runTest(joinType, someData, someIrrelevantData)
- runTest(joinType, someIrrelevantData, someData)
- }
-
- test(s"$testNamePrefix: partial matches") {
- val someOtherData = (50 to 150).map { i => (i, "finnegan" + i) }.toArray
- runTest(joinType, someData, someOtherData)
- runTest(joinType, someOtherData, someData)
- }
-
- test(s"$testNamePrefix: full matches") {
- val someSuperRelevantData = someData.map { case (k, v) => (k, "cooper" + v) }
- runTest(joinType, someData, someSuperRelevantData)
- runTest(joinType, someSuperRelevantData, someData)
- }
- }
-
- /**
- * Helper method to generate the expected output of a test based on the join type.
- */
- private def generateExpectedOutput(
- leftInput: Array[(Int, String)],
- rightInput: Array[(Int, String)],
- joinType: JoinType): Array[(Option[Int], Option[String], Option[Int], Option[String])] = {
- joinType match {
- case LeftOuter =>
- val rightInputMap = rightInput.toMap
- leftInput.map { case (k, v) =>
- val rightKey = rightInputMap.get(k).map { _ => k }
- val rightValue = rightInputMap.get(k)
- (Some(k), Some(v), rightKey, rightValue)
- }
-
- case RightOuter =>
- val leftInputMap = leftInput.toMap
- rightInput.map { case (k, v) =>
- val leftKey = leftInputMap.get(k).map { _ => k }
- val leftValue = leftInputMap.get(k)
- (leftKey, leftValue, Some(k), Some(v))
- }
-
- case FullOuter =>
- val leftInputMap = leftInput.toMap
- val rightInputMap = rightInput.toMap
- val leftOutput = leftInput.map { case (k, v) =>
- val rightKey = rightInputMap.get(k).map { _ => k }
- val rightValue = rightInputMap.get(k)
- (Some(k), Some(v), rightKey, rightValue)
- }
- val rightOutput = rightInput.map { case (k, v) =>
- val leftKey = leftInputMap.get(k).map { _ => k }
- val leftValue = leftInputMap.get(k)
- (leftKey, leftValue, Some(k), Some(v))
- }
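- // Rows matched on both sides appear identically in both leftOutput and
- // rightOutput, so de-duplicate the concatenation below.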
- (leftOutput ++ rightOutput).distinct
-
- case other =>
- throw new IllegalArgumentException(s"Join type $other is not applicable")
- }
- }
-
-}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/local/ProjectNodeSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/local/ProjectNodeSuite.scala
deleted file mode 100644
index 02ecb23d34..0000000000
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/local/ProjectNodeSuite.scala
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.execution.local
-
-import org.apache.spark.sql.catalyst.expressions.{AttributeReference, NamedExpression}
-import org.apache.spark.sql.types.{IntegerType, StringType}
-
-
-class ProjectNodeSuite extends LocalNodeTest {
- private val pieAttributes = Seq(
- AttributeReference("id", IntegerType)(),
- AttributeReference("age", IntegerType)(),
- AttributeReference("name", StringType)())
-
- private def testProject(inputData: Array[(Int, Int, String)] = Array.empty): Unit = {
- val inputNode = new DummyNode(pieAttributes, inputData)
- val columns = Seq[NamedExpression](inputNode.output(0), inputNode.output(2))
- val projectNode = new ProjectNode(conf, columns, inputNode)
- val expectedOutput = inputData.map { case (id, _, name) => (id, name) }
- val actualOutput = projectNode.collect().map { row =>
- (row.getInt(0), row.getString(1))
- }
- assert(actualOutput === expectedOutput)
- }
-
- test("empty") {
- testProject()
- }
-
- test("basic") {
- testProject((1 to 100).map { i => (i, i + 1, "pie" + i) }.toArray)
- }
-
-}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/local/SampleNodeSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/local/SampleNodeSuite.scala
deleted file mode 100644
index a3e83bbd51..0000000000
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/local/SampleNodeSuite.scala
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.execution.local
-
-import org.apache.spark.util.random.{BernoulliCellSampler, PoissonSampler}
-
-
-class SampleNodeSuite extends LocalNodeTest {
-
- private def testSample(withReplacement: Boolean): Unit = {
- val seed = 0L
- val lowerb = 0.0
- val upperb = 0.3
- val maybeOut = if (withReplacement) "" else "out"
- test(s"with$maybeOut replacement") {
- val inputData = (1 to 1000).map { i => (i, i) }.toArray
- val inputNode = new DummyNode(kvIntAttributes, inputData)
- val sampleNode = new SampleNode(conf, lowerb, upperb, withReplacement, seed, inputNode)
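- // Build an equivalent sampler by hand; given the same seed, its output is
- // expected to match SampleNode's exactly.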
- val sampler =
- if (withReplacement) {
- new PoissonSampler[(Int, Int)](upperb - lowerb, useGapSamplingIfPossible = false)
- } else {
- new BernoulliCellSampler[(Int, Int)](lowerb, upperb)
- }
- sampler.setSeed(seed)
- val expectedOutput = sampler.sample(inputData.iterator).toArray
- val actualOutput = sampleNode.collect().map { row =>
- (row.getInt(0), row.getInt(1))
- }
- assert(actualOutput === expectedOutput)
- }
- }
-
- testSample(withReplacement = true)
- testSample(withReplacement = false)
-}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/local/TakeOrderedAndProjectNodeSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/local/TakeOrderedAndProjectNodeSuite.scala
deleted file mode 100644
index 42ebc7bfca..0000000000
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/local/TakeOrderedAndProjectNodeSuite.scala
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.execution.local
-
-import scala.util.Random
-
-import org.apache.spark.sql.catalyst.expressions._
-
-
-class TakeOrderedAndProjectNodeSuite extends LocalNodeTest {
-
- private def testTakeOrderedAndProject(desc: Boolean): Unit = {
- val limit = 10
- val ascOrDesc = if (desc) "desc" else "asc"
- test(ascOrDesc) {
- val inputData = Random.shuffle((1 to 100).toList).map { i => (i, i) }.toArray
- val inputNode = new DummyNode(kvIntAttributes, inputData)
- val firstColumn = inputNode.output(0)
- val sortDirection = if (desc) Descending else Ascending
- val sortOrder = SortOrder(firstColumn, sortDirection)
- val takeOrderedAndProjectNode = new TakeOrderedAndProjectNode(
- conf, limit, Seq(sortOrder), Some(Seq(firstColumn)), inputNode)
- val expectedOutput = inputData
- .map { case (k, _) => k }
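- // Negating the key reverses the natural ordering, emulating a descending sort.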
- .sortBy { k => k * (if (desc) -1 else 1) }
- .take(limit)
- val actualOutput = takeOrderedAndProjectNode.collect().map { row => row.getInt(0) }
- assert(actualOutput === expectedOutput)
- }
- }
-
- testTakeOrderedAndProject(desc = false)
- testTakeOrderedAndProject(desc = true)
-}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/local/UnionNodeSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/local/UnionNodeSuite.scala
deleted file mode 100644
index 666b0235c0..0000000000
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/local/UnionNodeSuite.scala
+++ /dev/null
@@ -1,55 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.execution.local
-
-
-class UnionNodeSuite extends LocalNodeTest {
-
- private def testUnion(inputData: Seq[Array[(Int, Int)]]): Unit = {
- val inputNodes = inputData.map { data =>
- new DummyNode(kvIntAttributes, data)
- }
- val unionNode = new UnionNode(conf, inputNodes)
- val expectedOutput = inputData.flatten
- val actualOutput = unionNode.collect().map { row =>
- (row.getInt(0), row.getInt(1))
- }
- assert(actualOutput === expectedOutput)
- }
-
- test("empty") {
- testUnion(Seq(Array.empty))
- testUnion(Seq(Array.empty, Array.empty))
- }
-
- test("self") {
- val data = (1 to 100).map { i => (i, i) }.toArray
- testUnion(Seq(data))
- testUnion(Seq(data, data))
- testUnion(Seq(data, data, data))
- }
-
- test("basic") {
- val zero = Array.empty[(Int, Int)]
- val one = (1 to 100).map { i => (i, i) }.toArray
- val two = (50 to 150).map { i => (i, i) }.toArray
- val three = (800 to 900).map { i => (i, i) }.toArray
- testUnion(Seq(zero, one, two, three))
- }
-
-}