path: root/sql/core
author     Reynold Xin <rxin@databricks.com>    2016-03-14 19:22:11 -0700
committer  Reynold Xin <rxin@databricks.com>    2016-03-14 19:22:11 -0700
commit     4bf460979562a7d8cec403f0bd603f88517fdb2b (patch)
tree       2b6790b67f9597293dd75db5b256f4fd309a1836 /sql/core
parent     17eec0a71ba8713c559d641e3f43a1be726b037c (diff)
download   spark-4bf460979562a7d8cec403f0bd603f88517fdb2b.tar.gz
           spark-4bf460979562a7d8cec403f0bd603f88517fdb2b.tar.bz2
           spark-4bf460979562a7d8cec403f0bd603f88517fdb2b.zip
[SPARK-13882][SQL] Remove org.apache.spark.sql.execution.local
## What changes were proposed in this pull request?

We introduced some local operators in the org.apache.spark.sql.execution.local package but never fully wired the engine to actually use them. We still plan to implement a full local mode, but it will probably be fairly different from what the current iterator-based local mode looks like. Based on what we know right now, we might want a push-based columnar version of these operators. Let's just remove them for now; we can always re-introduce them in the future by looking at branch-1.6.

## How was this patch tested?

This is simply dead code removal.

Author: Reynold Xin <rxin@databricks.com>

Closes #11705 from rxin/SPARK-13882.
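For context, every operator removed here implements the iterator-style contract defined in the deleted LocalNode.scala further down (prepare/open/next/fetch/close). The sketch below is illustrative only: `SimpleLocalNode` and `drain` are hypothetical names, assuming a trimmed-down trait; it just shows how that contract is driven, much like the removed LocalNode.collect() consumed its rows.

```scala
import org.apache.spark.sql.catalyst.InternalRow

object LocalNodeSketch {
  // Hypothetical, simplified form of the removed iterator contract.
  trait SimpleLocalNode {
    def open(): Unit          // initialize state; must be called before next()
    def next(): Boolean       // advance to the next tuple; true if one is available
    def fetch(): InternalRow  // return the current tuple
    def close(): Unit         // release resources; should be idempotent
  }

  // Drain a node into a materialized sequence, mirroring how the removed
  // LocalNode.collect() consumed its input.
  def drain(node: SimpleLocalNode): Seq[InternalRow] = {
    val rows = scala.collection.mutable.ArrayBuffer.empty[InternalRow]
    node.open()
    try {
      while (node.next()) {
        // Copy the row, since operators may reuse the object returned by fetch().
        rows += node.fetch().copy()
      }
    } finally {
      node.close()
    }
    rows.toSeq
  }
}
```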
Diffstat (limited to 'sql/core')
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala  |   5
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/local/BinaryHashJoinNode.scala  |  71
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/local/BroadcastHashJoinNode.scala  |  58
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/local/ConvertToSafeNode.scala  |  40
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/local/ConvertToUnsafeNode.scala  |  40
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/local/ExpandNode.scala  |  60
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/local/FilterNode.scala  |  49
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/local/HashJoinNode.scala  | 111
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/local/IntersectNode.scala  |  63
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/local/LimitNode.scala  |  46
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/local/LocalNode.scala  | 157
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/local/NestedLoopJoinNode.scala  | 152
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/local/ProjectNode.scala  |  44
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/local/SampleNode.scala  |  78
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/local/SeqScanNode.scala  |  51
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/local/TakeOrderedAndProjectNode.scala  |  74
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/local/UnionNode.scala  |  73
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/execution/local/DummyNode.scala  |  68
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/execution/local/ExpandNodeSuite.scala  |  49
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/execution/local/FilterNodeSuite.scala  |  45
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/execution/local/HashJoinNodeSuite.scala  | 141
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/execution/local/IntersectNodeSuite.scala  |  37
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/execution/local/LimitNodeSuite.scala  |  41
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/execution/local/LocalNodeSuite.scala  |  73
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/execution/local/LocalNodeTest.scala  |  87
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/execution/local/NestedLoopJoinNodeSuite.scala  | 142
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/execution/local/ProjectNodeSuite.scala  |  49
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/execution/local/SampleNodeSuite.scala  |  51
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/execution/local/TakeOrderedAndProjectNodeSuite.scala  |  50
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/execution/local/UnionNodeSuite.scala  |  55
30 files changed, 0 insertions, 2060 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala
index 99f8841c87..6235897ed1 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala
@@ -27,7 +27,6 @@ import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans.physical.BroadcastMode
import org.apache.spark.sql.execution.SparkSqlSerializer
-import org.apache.spark.sql.execution.local.LocalNode
import org.apache.spark.unsafe.Platform
import org.apache.spark.unsafe.map.BytesToBytesMap
import org.apache.spark.util.{KnownSizeEstimation, SizeEstimator, Utils}
@@ -157,10 +156,6 @@ private[joins] class UniqueKeyHashedRelation(
private[execution] object HashedRelation {
- def apply(localNode: LocalNode, keyGenerator: Projection): HashedRelation = {
- apply(localNode.asIterator, keyGenerator)
- }
-
def apply(
input: Iterator[InternalRow],
keyGenerator: Projection,
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/local/BinaryHashJoinNode.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/local/BinaryHashJoinNode.scala
deleted file mode 100644
index 97f9358016..0000000000
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/local/BinaryHashJoinNode.scala
+++ /dev/null
@@ -1,71 +0,0 @@
-/*
-* Licensed to the Apache Software Foundation (ASF) under one or more
-* contributor license agreements. See the NOTICE file distributed with
-* this work for additional information regarding copyright ownership.
-* The ASF licenses this file to You under the Apache License, Version 2.0
-* (the "License"); you may not use this file except in compliance with
-* the License. You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-
-package org.apache.spark.sql.execution.local
-
-import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.execution.joins.{BuildLeft, BuildRight, BuildSide, HashedRelation}
-import org.apache.spark.sql.internal.SQLConf
-
-/**
- * A [[HashJoinNode]] that builds the [[HashedRelation]] according to the value of
- * `buildSide`. The actual work of this node is defined in [[HashJoinNode]].
- */
-case class BinaryHashJoinNode(
- conf: SQLConf,
- leftKeys: Seq[Expression],
- rightKeys: Seq[Expression],
- buildSide: BuildSide,
- left: LocalNode,
- right: LocalNode)
- extends BinaryLocalNode(conf) with HashJoinNode {
-
- protected override val (streamedNode, streamedKeys) = buildSide match {
- case BuildLeft => (right, rightKeys)
- case BuildRight => (left, leftKeys)
- }
-
- private val (buildNode, buildKeys) = buildSide match {
- case BuildLeft => (left, leftKeys)
- case BuildRight => (right, rightKeys)
- }
-
- override def output: Seq[Attribute] = left.output ++ right.output
-
- private def buildSideKeyGenerator: Projection = {
- // We expect the data types of buildKeys and streamedKeys to be the same.
- assert(buildKeys.map(_.dataType) == streamedKeys.map(_.dataType))
- UnsafeProjection.create(buildKeys, buildNode.output)
- }
-
- protected override def doOpen(): Unit = {
- buildNode.open()
- val hashedRelation = HashedRelation(buildNode, buildSideKeyGenerator)
- // We have built the HashedRelation. So, close buildNode.
- buildNode.close()
-
- streamedNode.open()
- // Set the HashedRelation used by the HashJoinNode.
- withHashedRelation(hashedRelation)
- }
-
- override def close(): Unit = {
- // Please note that we do not need to call the close method of our buildNode because
- // it has been called in this.open.
- streamedNode.close()
- }
-}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/local/BroadcastHashJoinNode.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/local/BroadcastHashJoinNode.scala
deleted file mode 100644
index 779f4833fa..0000000000
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/local/BroadcastHashJoinNode.scala
+++ /dev/null
@@ -1,58 +0,0 @@
-/*
-* Licensed to the Apache Software Foundation (ASF) under one or more
-* contributor license agreements. See the NOTICE file distributed with
-* this work for additional information regarding copyright ownership.
-* The ASF licenses this file to You under the Apache License, Version 2.0
-* (the "License"); you may not use this file except in compliance with
-* the License. You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-
-package org.apache.spark.sql.execution.local
-
-import org.apache.spark.broadcast.Broadcast
-import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.execution.joins.{BuildLeft, BuildRight, BuildSide, HashedRelation}
-import org.apache.spark.sql.internal.SQLConf
-
-/**
- * A [[HashJoinNode]] for broadcast join. It takes a streamedNode and a broadcast
- * [[HashedRelation]]. The actual work of this node is defined in [[HashJoinNode]].
- */
-case class BroadcastHashJoinNode(
- conf: SQLConf,
- streamedKeys: Seq[Expression],
- streamedNode: LocalNode,
- buildSide: BuildSide,
- buildOutput: Seq[Attribute],
- hashedRelation: Broadcast[HashedRelation])
- extends UnaryLocalNode(conf) with HashJoinNode {
-
- override val child = streamedNode
-
- // Because we do not pass in the buildNode, we take the output of buildNode to
- // create the inputSet properly.
- override def inputSet: AttributeSet = AttributeSet(child.output ++ buildOutput)
-
- override def output: Seq[Attribute] = buildSide match {
- case BuildRight => streamedNode.output ++ buildOutput
- case BuildLeft => buildOutput ++ streamedNode.output
- }
-
- protected override def doOpen(): Unit = {
- streamedNode.open()
- // Set the HashedRelation used by the HashJoinNode.
- withHashedRelation(hashedRelation.value)
- }
-
- override def close(): Unit = {
- streamedNode.close()
- }
-}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/local/ConvertToSafeNode.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/local/ConvertToSafeNode.scala
deleted file mode 100644
index f79d795a90..0000000000
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/local/ConvertToSafeNode.scala
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
-* Licensed to the Apache Software Foundation (ASF) under one or more
-* contributor license agreements. See the NOTICE file distributed with
-* this work for additional information regarding copyright ownership.
-* The ASF licenses this file to You under the Apache License, Version 2.0
-* (the "License"); you may not use this file except in compliance with
-* the License. You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-
-package org.apache.spark.sql.execution.local
-
-import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions.{Attribute, FromUnsafeProjection, Projection}
-import org.apache.spark.sql.internal.SQLConf
-
-case class ConvertToSafeNode(conf: SQLConf, child: LocalNode) extends UnaryLocalNode(conf) {
-
- override def output: Seq[Attribute] = child.output
-
- private[this] var convertToSafe: Projection = _
-
- override def open(): Unit = {
- child.open()
- convertToSafe = FromUnsafeProjection(child.schema)
- }
-
- override def next(): Boolean = child.next()
-
- override def fetch(): InternalRow = convertToSafe(child.fetch())
-
- override def close(): Unit = child.close()
-}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/local/ConvertToUnsafeNode.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/local/ConvertToUnsafeNode.scala
deleted file mode 100644
index f3fa474b0f..0000000000
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/local/ConvertToUnsafeNode.scala
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
-* Licensed to the Apache Software Foundation (ASF) under one or more
-* contributor license agreements. See the NOTICE file distributed with
-* this work for additional information regarding copyright ownership.
-* The ASF licenses this file to You under the Apache License, Version 2.0
-* (the "License"); you may not use this file except in compliance with
-* the License. You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-
-package org.apache.spark.sql.execution.local
-
-import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions.{Attribute, Projection, UnsafeProjection}
-import org.apache.spark.sql.internal.SQLConf
-
-case class ConvertToUnsafeNode(conf: SQLConf, child: LocalNode) extends UnaryLocalNode(conf) {
-
- override def output: Seq[Attribute] = child.output
-
- private[this] var convertToUnsafe: Projection = _
-
- override def open(): Unit = {
- child.open()
- convertToUnsafe = UnsafeProjection.create(child.schema)
- }
-
- override def next(): Boolean = child.next()
-
- override def fetch(): InternalRow = convertToUnsafe(child.fetch())
-
- override def close(): Unit = child.close()
-}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/local/ExpandNode.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/local/ExpandNode.scala
deleted file mode 100644
index 6ccd6db0e6..0000000000
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/local/ExpandNode.scala
+++ /dev/null
@@ -1,60 +0,0 @@
-/*
-* Licensed to the Apache Software Foundation (ASF) under one or more
-* contributor license agreements. See the NOTICE file distributed with
-* this work for additional information regarding copyright ownership.
-* The ASF licenses this file to You under the Apache License, Version 2.0
-* (the "License"); you may not use this file except in compliance with
-* the License. You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-
-package org.apache.spark.sql.execution.local
-
-import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.internal.SQLConf
-
-case class ExpandNode(
- conf: SQLConf,
- projections: Seq[Seq[Expression]],
- output: Seq[Attribute],
- child: LocalNode) extends UnaryLocalNode(conf) {
-
- assert(projections.size > 0)
-
- private[this] var result: InternalRow = _
- private[this] var idx: Int = _
- private[this] var input: InternalRow = _
- private[this] var groups: Array[Projection] = _
-
- override def open(): Unit = {
- child.open()
- groups = projections.map(ee => newMutableProjection(ee, child.output)()).toArray
- idx = groups.length
- }
-
- override def next(): Boolean = {
- if (idx >= groups.length) {
- if (child.next()) {
- input = child.fetch()
- idx = 0
- } else {
- return false
- }
- }
- result = groups(idx)(input)
- idx += 1
- true
- }
-
- override def fetch(): InternalRow = result
-
- override def close(): Unit = child.close()
-}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/local/FilterNode.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/local/FilterNode.scala
deleted file mode 100644
index c5eb33cef4..0000000000
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/local/FilterNode.scala
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
-* Licensed to the Apache Software Foundation (ASF) under one or more
-* contributor license agreements. See the NOTICE file distributed with
-* this work for additional information regarding copyright ownership.
-* The ASF licenses this file to You under the Apache License, Version 2.0
-* (the "License"); you may not use this file except in compliance with
-* the License. You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-
-package org.apache.spark.sql.execution.local
-
-import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression}
-import org.apache.spark.sql.catalyst.expressions.codegen.GeneratePredicate
-import org.apache.spark.sql.internal.SQLConf
-
-
-case class FilterNode(conf: SQLConf, condition: Expression, child: LocalNode)
- extends UnaryLocalNode(conf) {
-
- private[this] var predicate: (InternalRow) => Boolean = _
-
- override def output: Seq[Attribute] = child.output
-
- override def open(): Unit = {
- child.open()
- predicate = GeneratePredicate.generate(condition, child.output)
- }
-
- override def next(): Boolean = {
- var found = false
- while (!found && child.next()) {
- found = predicate.apply(child.fetch())
- }
- found
- }
-
- override def fetch(): InternalRow = child.fetch()
-
- override def close(): Unit = child.close()
-}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/local/HashJoinNode.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/local/HashJoinNode.scala
deleted file mode 100644
index fd7948ffa9..0000000000
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/local/HashJoinNode.scala
+++ /dev/null
@@ -1,111 +0,0 @@
-/*
-* Licensed to the Apache Software Foundation (ASF) under one or more
-* contributor license agreements. See the NOTICE file distributed with
-* this work for additional information regarding copyright ownership.
-* The ASF licenses this file to You under the Apache License, Version 2.0
-* (the "License"); you may not use this file except in compliance with
-* the License. You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-
-package org.apache.spark.sql.execution.local
-
-import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.execution.joins._
-
-/**
- * An abstract node for sharing common functionality among different implementations of
- * inner hash equi-join, notably [[BinaryHashJoinNode]] and [[BroadcastHashJoinNode]].
- *
- * Much of this code is similar to [[org.apache.spark.sql.execution.joins.HashJoin]].
- */
-trait HashJoinNode {
-
- self: LocalNode =>
-
- protected def streamedKeys: Seq[Expression]
- protected def streamedNode: LocalNode
- protected def buildSide: BuildSide
-
- private[this] var currentStreamedRow: InternalRow = _
- private[this] var currentHashMatches: Seq[InternalRow] = _
- private[this] var currentMatchPosition: Int = -1
-
- private[this] var joinRow: JoinedRow = _
- private[this] var resultProjection: (InternalRow) => InternalRow = _
-
- private[this] var hashed: HashedRelation = _
- private[this] var joinKeys: Projection = _
-
- private def streamSideKeyGenerator: Projection =
- UnsafeProjection.create(streamedKeys, streamedNode.output)
-
- /**
- * Sets the HashedRelation used by this node. This method needs to be called
- * before the first `next` gets called.
- */
- protected def withHashedRelation(hashedRelation: HashedRelation): Unit = {
- hashed = hashedRelation
- }
-
- /**
- * Custom open implementation to be overridden by subclasses.
- */
- protected def doOpen(): Unit
-
- override def open(): Unit = {
- doOpen()
- joinRow = new JoinedRow
- resultProjection = UnsafeProjection.create(schema)
- joinKeys = streamSideKeyGenerator
- }
-
- override def next(): Boolean = {
- currentMatchPosition += 1
- if (currentHashMatches == null || currentMatchPosition >= currentHashMatches.size) {
- fetchNextMatch()
- } else {
- true
- }
- }
-
- /**
- * Populate `currentHashMatches` with build-side rows matching the next streamed row.
- * @return whether matches are found such that subsequent calls to `fetch` are valid.
- */
- private def fetchNextMatch(): Boolean = {
- currentHashMatches = null
- currentMatchPosition = -1
-
- while (currentHashMatches == null && streamedNode.next()) {
- currentStreamedRow = streamedNode.fetch()
- val key = joinKeys(currentStreamedRow)
- if (!key.anyNull) {
- currentHashMatches = hashed.get(key)
- }
- }
-
- if (currentHashMatches == null) {
- false
- } else {
- currentMatchPosition = 0
- true
- }
- }
-
- override def fetch(): InternalRow = {
- val ret = buildSide match {
- case BuildRight => joinRow(currentStreamedRow, currentHashMatches(currentMatchPosition))
- case BuildLeft => joinRow(currentHashMatches(currentMatchPosition), currentStreamedRow)
- }
- resultProjection(ret)
- }
-}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/local/IntersectNode.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/local/IntersectNode.scala
deleted file mode 100644
index e594e132de..0000000000
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/local/IntersectNode.scala
+++ /dev/null
@@ -1,63 +0,0 @@
-/*
-* Licensed to the Apache Software Foundation (ASF) under one or more
-* contributor license agreements. See the NOTICE file distributed with
-* this work for additional information regarding copyright ownership.
-* The ASF licenses this file to You under the Apache License, Version 2.0
-* (the "License"); you may not use this file except in compliance with
-* the License. You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-
-package org.apache.spark.sql.execution.local
-
-import scala.collection.mutable
-
-import org.apache.spark.sql.catalyst.expressions.Attribute
-import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.internal.SQLConf
-
-case class IntersectNode(conf: SQLConf, left: LocalNode, right: LocalNode)
- extends BinaryLocalNode(conf) {
-
- override def output: Seq[Attribute] = left.output
-
- private[this] var leftRows: mutable.HashSet[InternalRow] = _
-
- private[this] var currentRow: InternalRow = _
-
- override def open(): Unit = {
- left.open()
- leftRows = mutable.HashSet[InternalRow]()
- while (left.next()) {
- leftRows += left.fetch().copy()
- }
- left.close()
- right.open()
- }
-
- override def next(): Boolean = {
- currentRow = null
- while (currentRow == null && right.next()) {
- currentRow = right.fetch()
- if (!leftRows.contains(currentRow)) {
- currentRow = null
- }
- }
- currentRow != null
- }
-
- override def fetch(): InternalRow = currentRow
-
- override def close(): Unit = {
- left.close()
- right.close()
- }
-
-}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/local/LimitNode.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/local/LimitNode.scala
deleted file mode 100644
index 9af45ac0aa..0000000000
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/local/LimitNode.scala
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
-* Licensed to the Apache Software Foundation (ASF) under one or more
-* contributor license agreements. See the NOTICE file distributed with
-* this work for additional information regarding copyright ownership.
-* The ASF licenses this file to You under the Apache License, Version 2.0
-* (the "License"); you may not use this file except in compliance with
-* the License. You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-
-package org.apache.spark.sql.execution.local
-
-import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions.Attribute
-import org.apache.spark.sql.internal.SQLConf
-
-
-case class LimitNode(conf: SQLConf, limit: Int, child: LocalNode) extends UnaryLocalNode(conf) {
-
- private[this] var count = 0
-
- override def output: Seq[Attribute] = child.output
-
- override def open(): Unit = child.open()
-
- override def close(): Unit = child.close()
-
- override def fetch(): InternalRow = child.fetch()
-
- override def next(): Boolean = {
- if (count < limit) {
- count += 1
- child.next()
- } else {
- false
- }
- }
-
-}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/local/LocalNode.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/local/LocalNode.scala
deleted file mode 100644
index a5d09691dc..0000000000
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/local/LocalNode.scala
+++ /dev/null
@@ -1,157 +0,0 @@
-/*
-* Licensed to the Apache Software Foundation (ASF) under one or more
-* contributor license agreements. See the NOTICE file distributed with
-* this work for additional information regarding copyright ownership.
-* The ASF licenses this file to You under the Apache License, Version 2.0
-* (the "License"); you may not use this file except in compliance with
-* the License. You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-
-package org.apache.spark.sql.execution.local
-
-import org.apache.spark.Logging
-import org.apache.spark.sql.Row
-import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow}
-import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.catalyst.expressions.codegen._
-import org.apache.spark.sql.catalyst.plans.QueryPlan
-import org.apache.spark.sql.internal.SQLConf
-import org.apache.spark.sql.types.StructType
-
-/**
- * A local physical operator, in the form of an iterator.
- *
- * Before consuming the iterator, the open function must be called.
- * After consuming the iterator, the close function must be called.
- */
-abstract class LocalNode(conf: SQLConf) extends QueryPlan[LocalNode] with Logging {
-
- private[this] lazy val isTesting: Boolean = sys.props.contains("spark.testing")
-
- /**
- * Called before open(). Prepare can be used to reserve memory needed. It must NOT consume
- * any input data.
- *
- * Implementations of this must also call the `prepare()` function of its children.
- */
- def prepare(): Unit = children.foreach(_.prepare())
-
- /**
- * Initializes the iterator state. Must be called before calling `next()`.
- *
- * Implementations of this must also call the `open()` function of its children.
- */
- def open(): Unit
-
- /**
- * Advances the iterator to the next tuple. Returns true if there is at least one more tuple.
- */
- def next(): Boolean
-
- /**
- * Returns the current tuple.
- */
- def fetch(): InternalRow
-
- /**
- * Closes the iterator and releases all resources. It should be idempotent.
- *
- * Implementations of this must also call the `close()` function of its children.
- */
- def close(): Unit
-
- /**
- * Returns the content through the [[Iterator]] interface.
- */
- final def asIterator: Iterator[InternalRow] = new LocalNodeIterator(this)
-
- /**
- * Returns the content of the iterator from the beginning to the end in the form of a Scala Seq.
- */
- final def collect(): Seq[Row] = {
- val converter = CatalystTypeConverters.createToScalaConverter(StructType.fromAttributes(output))
- val result = new scala.collection.mutable.ArrayBuffer[Row]
- open()
- try {
- while (next()) {
- result += converter.apply(fetch()).asInstanceOf[Row]
- }
- } finally {
- close()
- }
- result
- }
-
- protected def newMutableProjection(
- expressions: Seq[Expression],
- inputSchema: Seq[Attribute]): () => MutableProjection = {
- log.debug(
- s"Creating MutableProj: $expressions, inputSchema: $inputSchema")
- GenerateMutableProjection.generate(expressions, inputSchema)
- }
-
- protected def newPredicate(
- expression: Expression,
- inputSchema: Seq[Attribute]): (InternalRow) => Boolean = {
- GeneratePredicate.generate(expression, inputSchema)
- }
-}
-
-
-abstract class LeafLocalNode(conf: SQLConf) extends LocalNode(conf) {
- override def children: Seq[LocalNode] = Seq.empty
-}
-
-
-abstract class UnaryLocalNode(conf: SQLConf) extends LocalNode(conf) {
-
- def child: LocalNode
-
- override def children: Seq[LocalNode] = Seq(child)
-}
-
-abstract class BinaryLocalNode(conf: SQLConf) extends LocalNode(conf) {
-
- def left: LocalNode
-
- def right: LocalNode
-
- override def children: Seq[LocalNode] = Seq(left, right)
-}
-
-/**
- * A thin wrapper around a [[LocalNode]] that provides an `Iterator` interface.
- */
-private class LocalNodeIterator(localNode: LocalNode) extends Iterator[InternalRow] {
- private var nextRow: InternalRow = _
-
- override def hasNext: Boolean = {
- if (nextRow == null) {
- val res = localNode.next()
- if (res) {
- nextRow = localNode.fetch()
- }
- res
- } else {
- true
- }
- }
-
- override def next(): InternalRow = {
- if (hasNext) {
- val res = nextRow
- nextRow = null
- res
- } else {
- throw new NoSuchElementException
- }
- }
-}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/local/NestedLoopJoinNode.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/local/NestedLoopJoinNode.scala
deleted file mode 100644
index b5ea08325c..0000000000
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/local/NestedLoopJoinNode.scala
+++ /dev/null
@@ -1,152 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.execution.local
-
-import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.catalyst.plans.{FullOuter, JoinType, LeftOuter, RightOuter}
-import org.apache.spark.sql.execution.joins.{BuildLeft, BuildRight, BuildSide}
-import org.apache.spark.sql.internal.SQLConf
-import org.apache.spark.util.collection.{BitSet, CompactBuffer}
-
-case class NestedLoopJoinNode(
- conf: SQLConf,
- left: LocalNode,
- right: LocalNode,
- buildSide: BuildSide,
- joinType: JoinType,
- condition: Option[Expression]) extends BinaryLocalNode(conf) {
-
- override def output: Seq[Attribute] = {
- joinType match {
- case LeftOuter =>
- left.output ++ right.output.map(_.withNullability(true))
- case RightOuter =>
- left.output.map(_.withNullability(true)) ++ right.output
- case FullOuter =>
- left.output.map(_.withNullability(true)) ++ right.output.map(_.withNullability(true))
- case x =>
- throw new IllegalArgumentException(
- s"NestedLoopJoin should not take $x as the JoinType")
- }
- }
-
- private[this] def genResultProjection: InternalRow => InternalRow = {
- UnsafeProjection.create(schema)
- }
-
- private[this] var currentRow: InternalRow = _
-
- private[this] var iterator: Iterator[InternalRow] = _
-
- override def open(): Unit = {
- val (streamed, build) = buildSide match {
- case BuildRight => (left, right)
- case BuildLeft => (right, left)
- }
- build.open()
- val buildRelation = new CompactBuffer[InternalRow]
- while (build.next()) {
- buildRelation += build.fetch().copy()
- }
- build.close()
-
- val boundCondition =
- newPredicate(condition.getOrElse(Literal(true)), left.output ++ right.output)
-
- val leftNulls = new GenericMutableRow(left.output.size)
- val rightNulls = new GenericMutableRow(right.output.size)
- val joinedRow = new JoinedRow
- val matchedBuildTuples = new BitSet(buildRelation.size)
- val resultProj = genResultProjection
- streamed.open()
-
- // streamedRowMatches also contains null rows if using outer join
- val streamedRowMatches: Iterator[InternalRow] = streamed.asIterator.flatMap { streamedRow =>
- val matchedRows = new CompactBuffer[InternalRow]
-
- var i = 0
- var streamRowMatched = false
-
- // Scan the build relation to look for matches for each streamed row
- while (i < buildRelation.size) {
- val buildRow = buildRelation(i)
- buildSide match {
- case BuildRight => joinedRow(streamedRow, buildRow)
- case BuildLeft => joinedRow(buildRow, streamedRow)
- }
- if (boundCondition(joinedRow)) {
- matchedRows += resultProj(joinedRow).copy()
- streamRowMatched = true
- matchedBuildTuples.set(i)
- }
- i += 1
- }
-
- // If this row had no matches and we're using outer join, join it with the null rows
- if (!streamRowMatched) {
- (joinType, buildSide) match {
- case (LeftOuter | FullOuter, BuildRight) =>
- matchedRows += resultProj(joinedRow(streamedRow, rightNulls)).copy()
- case (RightOuter | FullOuter, BuildLeft) =>
- matchedRows += resultProj(joinedRow(leftNulls, streamedRow)).copy()
- case _ =>
- }
- }
-
- matchedRows.iterator
- }
-
- // If we're using outer join, find rows on the build side that didn't match anything
- // and join them with the null row
- lazy val unmatchedBuildRows: Iterator[InternalRow] = {
- var i = 0
- buildRelation.filter { row =>
- val r = !matchedBuildTuples.get(i)
- i += 1
- r
- }.iterator
- }
- iterator = (joinType, buildSide) match {
- case (RightOuter | FullOuter, BuildRight) =>
- streamedRowMatches ++
- unmatchedBuildRows.map { buildRow => resultProj(joinedRow(leftNulls, buildRow)) }
- case (LeftOuter | FullOuter, BuildLeft) =>
- streamedRowMatches ++
- unmatchedBuildRows.map { buildRow => resultProj(joinedRow(buildRow, rightNulls)) }
- case _ => streamedRowMatches
- }
- }
-
- override def next(): Boolean = {
- if (iterator.hasNext) {
- currentRow = iterator.next()
- true
- } else {
- false
- }
- }
-
- override def fetch(): InternalRow = currentRow
-
- override def close(): Unit = {
- left.close()
- right.close()
- }
-
-}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/local/ProjectNode.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/local/ProjectNode.scala
deleted file mode 100644
index 5fe068a13c..0000000000
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/local/ProjectNode.scala
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
-* Licensed to the Apache Software Foundation (ASF) under one or more
-* contributor license agreements. See the NOTICE file distributed with
-* this work for additional information regarding copyright ownership.
-* The ASF licenses this file to You under the Apache License, Version 2.0
-* (the "License"); you may not use this file except in compliance with
-* the License. You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-
-package org.apache.spark.sql.execution.local
-
-import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions.{Attribute, NamedExpression, UnsafeProjection}
-import org.apache.spark.sql.internal.SQLConf
-
-
-case class ProjectNode(conf: SQLConf, projectList: Seq[NamedExpression], child: LocalNode)
- extends UnaryLocalNode(conf) {
-
- private[this] var project: UnsafeProjection = _
-
- override def output: Seq[Attribute] = projectList.map(_.toAttribute)
-
- override def open(): Unit = {
- project = UnsafeProjection.create(projectList, child.output)
- child.open()
- }
-
- override def next(): Boolean = child.next()
-
- override def fetch(): InternalRow = {
- project.apply(child.fetch())
- }
-
- override def close(): Unit = child.close()
-}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/local/SampleNode.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/local/SampleNode.scala
deleted file mode 100644
index 078fb50deb..0000000000
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/local/SampleNode.scala
+++ /dev/null
@@ -1,78 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.execution.local
-
-import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions.Attribute
-import org.apache.spark.sql.internal.SQLConf
-import org.apache.spark.util.random.{BernoulliCellSampler, PoissonSampler}
-
-
-/**
- * Sample the dataset.
- *
- * @param conf the SQLConf
- * @param lowerBound Lower-bound of the sampling probability (usually 0.0)
- * @param upperBound Upper-bound of the sampling probability. The expected fraction sampled
- * will be ub - lb.
- * @param withReplacement Whether to sample with replacement.
- * @param seed the random seed
- * @param child the LocalNode
- */
-case class SampleNode(
- conf: SQLConf,
- lowerBound: Double,
- upperBound: Double,
- withReplacement: Boolean,
- seed: Long,
- child: LocalNode) extends UnaryLocalNode(conf) {
-
- override def output: Seq[Attribute] = child.output
-
- private[this] var iterator: Iterator[InternalRow] = _
-
- private[this] var currentRow: InternalRow = _
-
- override def open(): Unit = {
- child.open()
- val sampler =
- if (withReplacement) {
- // Disable gap sampling since the gap sampling method buffers two rows internally,
- // requiring us to copy the row, which is more expensive than the random number generator.
- new PoissonSampler[InternalRow](upperBound - lowerBound, useGapSamplingIfPossible = false)
- } else {
- new BernoulliCellSampler[InternalRow](lowerBound, upperBound)
- }
- sampler.setSeed(seed)
- iterator = sampler.sample(child.asIterator)
- }
-
- override def next(): Boolean = {
- if (iterator.hasNext) {
- currentRow = iterator.next()
- true
- } else {
- false
- }
- }
-
- override def fetch(): InternalRow = currentRow
-
- override def close(): Unit = child.close()
-
-}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/local/SeqScanNode.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/local/SeqScanNode.scala
deleted file mode 100644
index 8ebfe3a68b..0000000000
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/local/SeqScanNode.scala
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
-* Licensed to the Apache Software Foundation (ASF) under one or more
-* contributor license agreements. See the NOTICE file distributed with
-* this work for additional information regarding copyright ownership.
-* The ASF licenses this file to You under the Apache License, Version 2.0
-* (the "License"); you may not use this file except in compliance with
-* the License. You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-
-package org.apache.spark.sql.execution.local
-
-import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions.Attribute
-import org.apache.spark.sql.internal.SQLConf
-
-/**
- * An operator that scans some local data collection in the form of Scala Seq.
- */
-case class SeqScanNode(conf: SQLConf, output: Seq[Attribute], data: Seq[InternalRow])
- extends LeafLocalNode(conf) {
-
- private[this] var iterator: Iterator[InternalRow] = _
- private[this] var currentRow: InternalRow = _
-
- override def open(): Unit = {
- iterator = data.iterator
- }
-
- override def next(): Boolean = {
- if (iterator.hasNext) {
- currentRow = iterator.next()
- true
- } else {
- false
- }
- }
-
- override def fetch(): InternalRow = currentRow
-
- override def close(): Unit = {
- // Do nothing
- }
-}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/local/TakeOrderedAndProjectNode.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/local/TakeOrderedAndProjectNode.scala
deleted file mode 100644
index f52f5f7bb5..0000000000
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/local/TakeOrderedAndProjectNode.scala
+++ /dev/null
@@ -1,74 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.execution.local
-
-import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.catalyst.expressions.codegen.GenerateOrdering
-import org.apache.spark.sql.internal.SQLConf
-import org.apache.spark.util.BoundedPriorityQueue
-
-case class TakeOrderedAndProjectNode(
- conf: SQLConf,
- limit: Int,
- sortOrder: Seq[SortOrder],
- projectList: Option[Seq[NamedExpression]],
- child: LocalNode) extends UnaryLocalNode(conf) {
-
- private[this] var projection: Option[Projection] = _
- private[this] var ord: Ordering[InternalRow] = _
- private[this] var iterator: Iterator[InternalRow] = _
- private[this] var currentRow: InternalRow = _
-
- override def output: Seq[Attribute] = {
- val projectOutput = projectList.map(_.map(_.toAttribute))
- projectOutput.getOrElse(child.output)
- }
-
- override def open(): Unit = {
- child.open()
- projection = projectList.map(UnsafeProjection.create(_, child.output))
- ord = GenerateOrdering.generate(sortOrder, child.output)
- // Priority keeps the largest elements, so let's reverse the ordering.
- val queue = new BoundedPriorityQueue[InternalRow](limit)(ord.reverse)
- while (child.next()) {
- queue += child.fetch()
- }
- // Close it eagerly since we don't need it.
- child.close()
- iterator = queue.toArray.sorted(ord).iterator
- }
-
- override def next(): Boolean = {
- if (iterator.hasNext) {
- val _currentRow = iterator.next()
- currentRow = projection match {
- case Some(p) => p(_currentRow)
- case None => _currentRow
- }
- true
- } else {
- false
- }
- }
-
- override def fetch(): InternalRow = currentRow
-
- override def close(): Unit = child.close()
-
-}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/local/UnionNode.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/local/UnionNode.scala
deleted file mode 100644
index e53bc220d8..0000000000
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/local/UnionNode.scala
+++ /dev/null
@@ -1,73 +0,0 @@
-/*
-* Licensed to the Apache Software Foundation (ASF) under one or more
-* contributor license agreements. See the NOTICE file distributed with
-* this work for additional information regarding copyright ownership.
-* The ASF licenses this file to You under the Apache License, Version 2.0
-* (the "License"); you may not use this file except in compliance with
-* the License. You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-
-package org.apache.spark.sql.execution.local
-
-import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions.Attribute
-import org.apache.spark.sql.internal.SQLConf
-
-case class UnionNode(conf: SQLConf, children: Seq[LocalNode]) extends LocalNode(conf) {
-
- override def output: Seq[Attribute] = children.head.output
-
- private[this] var currentChild: LocalNode = _
-
- private[this] var nextChildIndex: Int = _
-
- override def open(): Unit = {
- currentChild = children.head
- currentChild.open()
- nextChildIndex = 1
- }
-
- private def advanceToNextChild(): Boolean = {
- var found = false
- var exit = false
- while (!exit && !found) {
- if (currentChild != null) {
- currentChild.close()
- }
- if (nextChildIndex >= children.size) {
- found = false
- exit = true
- } else {
- currentChild = children(nextChildIndex)
- nextChildIndex += 1
- currentChild.open()
- found = currentChild.next()
- }
- }
- found
- }
-
- override def close(): Unit = {
- if (currentChild != null) {
- currentChild.close()
- }
- }
-
- override def fetch(): InternalRow = currentChild.fetch()
-
- override def next(): Boolean = {
- if (currentChild.next()) {
- true
- } else {
- advanceToNextChild()
- }
- }
-}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/local/DummyNode.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/local/DummyNode.scala
deleted file mode 100644
index cd9277d3bc..0000000000
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/local/DummyNode.scala
+++ /dev/null
@@ -1,68 +0,0 @@
-/*
-* Licensed to the Apache Software Foundation (ASF) under one or more
-* contributor license agreements. See the NOTICE file distributed with
-* this work for additional information regarding copyright ownership.
-* The ASF licenses this file to You under the Apache License, Version 2.0
-* (the "License"); you may not use this file except in compliance with
-* the License. You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-
-package org.apache.spark.sql.execution.local
-
-import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions.Attribute
-import org.apache.spark.sql.catalyst.plans.logical.LocalRelation
-import org.apache.spark.sql.internal.SQLConf
-
-/**
- * A dummy [[LocalNode]] that just returns rows from a [[LocalRelation]].
- */
-private[local] case class DummyNode(
- output: Seq[Attribute],
- relation: LocalRelation,
- conf: SQLConf)
- extends LocalNode(conf) {
-
- import DummyNode._
-
- private var index: Int = CLOSED
- private val input: Seq[InternalRow] = relation.data
-
- def this(output: Seq[Attribute], data: Seq[Product], conf: SQLConf = new SQLConf) {
- this(output, LocalRelation.fromProduct(output, data), conf)
- }
-
- def isOpen: Boolean = index != CLOSED
-
- override def children: Seq[LocalNode] = Seq.empty
-
- override def open(): Unit = {
- index = -1
- }
-
- override def next(): Boolean = {
- index += 1
- index < input.size
- }
-
- override def fetch(): InternalRow = {
- assert(index >= 0 && index < input.size)
- input(index)
- }
-
- override def close(): Unit = {
- index = CLOSED
- }
-}
-
-private object DummyNode {
- val CLOSED: Int = Int.MinValue
-}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/local/ExpandNodeSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/local/ExpandNodeSuite.scala
deleted file mode 100644
index bbd94d8da2..0000000000
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/local/ExpandNodeSuite.scala
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
-* Licensed to the Apache Software Foundation (ASF) under one or more
-* contributor license agreements. See the NOTICE file distributed with
-* this work for additional information regarding copyright ownership.
-* The ASF licenses this file to You under the Apache License, Version 2.0
-* (the "License"); you may not use this file except in compliance with
-* the License. You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-
-package org.apache.spark.sql.execution.local
-
-import org.apache.spark.sql.catalyst.dsl.expressions._
-
-
-class ExpandNodeSuite extends LocalNodeTest {
-
- private def testExpand(inputData: Array[(Int, Int)] = Array.empty): Unit = {
- val inputNode = new DummyNode(kvIntAttributes, inputData)
- val projections = Seq(Seq('k + 'v, 'k - 'v), Seq('k * 'v, 'k / 'v))
- val expandNode = new ExpandNode(conf, projections, inputNode.output, inputNode)
- val resolvedNode = resolveExpressions(expandNode)
- val expectedOutput = {
- val firstHalf = inputData.map { case (k, v) => (k + v, k - v) }
- val secondHalf = inputData.map { case (k, v) => (k * v, k / v) }
- firstHalf ++ secondHalf
- }
- val actualOutput = resolvedNode.collect().map { case row =>
- (row.getInt(0), row.getInt(1))
- }
- assert(actualOutput.toSet === expectedOutput.toSet)
- }
-
- test("empty") {
- testExpand()
- }
-
- test("basic") {
- testExpand((1 to 100).map { i => (i, i * 1000) }.toArray)
- }
-
-}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/local/FilterNodeSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/local/FilterNodeSuite.scala
deleted file mode 100644
index 4eadce646d..0000000000
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/local/FilterNodeSuite.scala
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
-* Licensed to the Apache Software Foundation (ASF) under one or more
-* contributor license agreements. See the NOTICE file distributed with
-* this work for additional information regarding copyright ownership.
-* The ASF licenses this file to You under the Apache License, Version 2.0
-* (the "License"); you may not use this file except in compliance with
-* the License. You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-
-package org.apache.spark.sql.execution.local
-
-import org.apache.spark.sql.catalyst.dsl.expressions._
-
-
-class FilterNodeSuite extends LocalNodeTest {
-
- private def testFilter(inputData: Array[(Int, Int)] = Array.empty): Unit = {
- val cond = 'k % 2 === 0
- val inputNode = new DummyNode(kvIntAttributes, inputData)
- val filterNode = new FilterNode(conf, cond, inputNode)
- val resolvedNode = resolveExpressions(filterNode)
- val expectedOutput = inputData.filter { case (k, _) => k % 2 == 0 }
- val actualOutput = resolvedNode.collect().map { case row =>
- (row.getInt(0), row.getInt(1))
- }
- assert(actualOutput === expectedOutput)
- }
-
- test("empty") {
- testFilter()
- }
-
- test("basic") {
- testFilter((1 to 100).map { i => (i, i) }.toArray)
- }
-
-}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/local/HashJoinNodeSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/local/HashJoinNodeSuite.scala
deleted file mode 100644
index 74142ea598..0000000000
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/local/HashJoinNodeSuite.scala
+++ /dev/null
@@ -1,141 +0,0 @@
-/*
-* Licensed to the Apache Software Foundation (ASF) under one or more
-* contributor license agreements. See the NOTICE file distributed with
-* this work for additional information regarding copyright ownership.
-* The ASF licenses this file to You under the Apache License, Version 2.0
-* (the "License"); you may not use this file except in compliance with
-* the License. You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-
-package org.apache.spark.sql.execution.local
-
-import org.mockito.Mockito.{mock, when}
-
-import org.apache.spark.broadcast.TorrentBroadcast
-import org.apache.spark.sql.catalyst.dsl.expressions._
-import org.apache.spark.sql.catalyst.expressions.{Expression, UnsafeProjection}
-import org.apache.spark.sql.execution.joins.{BuildLeft, BuildRight, BuildSide, HashedRelation}
-import org.apache.spark.sql.internal.SQLConf
-
-class HashJoinNodeSuite extends LocalNodeTest {
-
- // Test all combinations of the two dimensions: with/out unsafe and build sides
- private val buildSides = Seq(BuildLeft, BuildRight)
- buildSides.foreach { buildSide =>
- testJoin(buildSide)
- }
-
- /**
- * Builds a [[HashedRelation]] based on a resolved `buildKeys`
- * and a resolved `buildNode`.
- */
- private def buildHashedRelation(
- conf: SQLConf,
- buildKeys: Seq[Expression],
- buildNode: LocalNode): HashedRelation = {
-
- val buildSideKeyGenerator = UnsafeProjection.create(buildKeys, buildNode.output)
- buildNode.prepare()
- buildNode.open()
- val hashedRelation = HashedRelation(buildNode, buildSideKeyGenerator)
- buildNode.close()
-
- hashedRelation
- }
-
- /**
- * Test inner hash join with varying degrees of matches.
- */
- private def testJoin(buildSide: BuildSide): Unit = {
- val testNamePrefix = buildSide
- val someData = (1 to 100).map { i => (i, "burger" + i) }.toArray
- val conf = new SQLConf
-
- // Actual test body
- def runTest(leftInput: Array[(Int, String)], rightInput: Array[(Int, String)]): Unit = {
- val rightInputMap = rightInput.toMap
- val leftNode = new DummyNode(joinNameAttributes, leftInput)
- val rightNode = new DummyNode(joinNicknameAttributes, rightInput)
- val makeBinaryHashJoinNode = (node1: LocalNode, node2: LocalNode) => {
- val binaryHashJoinNode =
- BinaryHashJoinNode(conf, Seq('id1), Seq('id2), buildSide, node1, node2)
- resolveExpressions(binaryHashJoinNode)
- }
- val makeBroadcastJoinNode = (node1: LocalNode, node2: LocalNode) => {
- val leftKeys = Seq('id1.attr)
- val rightKeys = Seq('id2.attr)
- // Figure out the build side and stream side.
- val (buildNode, buildKeys, streamedNode, streamedKeys) = buildSide match {
- case BuildLeft => (node1, leftKeys, node2, rightKeys)
- case BuildRight => (node2, rightKeys, node1, leftKeys)
- }
- // Resolve the expressions of the build side and then create a HashedRelation.
- val resolvedBuildNode = resolveExpressions(buildNode)
- val resolvedBuildKeys = resolveExpressions(buildKeys, resolvedBuildNode)
- val hashedRelation = buildHashedRelation(conf, resolvedBuildKeys, resolvedBuildNode)
- val broadcastHashedRelation = mock(classOf[TorrentBroadcast[HashedRelation]])
- when(broadcastHashedRelation.value).thenReturn(hashedRelation)
-
- val hashJoinNode =
- BroadcastHashJoinNode(
- conf,
- streamedKeys,
- streamedNode,
- buildSide,
- resolvedBuildNode.output,
- broadcastHashedRelation)
- resolveExpressions(hashJoinNode)
- }
-
- val expectedOutput = leftInput
- .filter { case (k, _) => rightInputMap.contains(k) }
- .map { case (k, v) => (k, v, k, rightInputMap(k)) }
-
- Seq(makeBinaryHashJoinNode, makeBroadcastJoinNode).foreach { makeNode =>
- val makeUnsafeNode = wrapForUnsafe(makeNode)
- val hashJoinNode = makeUnsafeNode(leftNode, rightNode)
-
- val actualOutput = hashJoinNode.collect().map { row =>
- // (id, name, id, nickname)
- (row.getInt(0), row.getString(1), row.getInt(2), row.getString(3))
- }
- assert(actualOutput === expectedOutput)
- }
- }
-
- test(s"$testNamePrefix: empty") {
- runTest(Array.empty, Array.empty)
- runTest(someData, Array.empty)
- runTest(Array.empty, someData)
- }
-
- test(s"$testNamePrefix: no matches") {
- val someIrrelevantData = (10000 to 100100).map { i => (i, "piper" + i) }.toArray
- runTest(someData, Array.empty)
- runTest(Array.empty, someData)
- runTest(someData, someIrrelevantData)
- runTest(someIrrelevantData, someData)
- }
-
- test(s"$testNamePrefix: partial matches") {
- val someOtherData = (50 to 150).map { i => (i, "finnegan" + i) }.toArray
- runTest(someData, someOtherData)
- runTest(someOtherData, someData)
- }
-
- test(s"$testNamePrefix: full matches") {
- val someSuperRelevantData = someData.map { case (k, v) => (k, "cooper" + v) }.toArray
- runTest(someData, someSuperRelevantData)
- runTest(someSuperRelevantData, someData)
- }
- }
-
-}
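The deleted suite above derives its expected inner-join rows in plain Scala before comparing them with the node's output. A minimal standalone sketch of that reference semantics, using hypothetical sample data rather than the suite's generators:

val left  = Array((1, "burger1"), (2, "burger2"), (3, "burger3"))
val right = Array((2, "piper2"), (3, "piper3"), (4, "piper4"))
val rightMap = right.toMap
// Inner hash join semantics: keep only left rows whose key also appears on the right.
val joined = left.collect { case (k, v) if rightMap.contains(k) => (k, v, k, rightMap(k)) }
// yields (2, "burger2", 2, "piper2") and (3, "burger3", 3, "piper3")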
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/local/IntersectNodeSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/local/IntersectNodeSuite.scala
deleted file mode 100644
index c0ad2021b2..0000000000
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/local/IntersectNodeSuite.scala
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
-* Licensed to the Apache Software Foundation (ASF) under one or more
-* contributor license agreements. See the NOTICE file distributed with
-* this work for additional information regarding copyright ownership.
-* The ASF licenses this file to You under the Apache License, Version 2.0
-* (the "License"); you may not use this file except in compliance with
-* the License. You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-
-package org.apache.spark.sql.execution.local
-
-
-class IntersectNodeSuite extends LocalNodeTest {
-
- test("basic") {
- val n = 100
- val leftData = (1 to n).filter { i => i % 2 == 0 }.map { i => (i, i) }.toArray
- val rightData = (1 to n).filter { i => i % 3 == 0 }.map { i => (i, i) }.toArray
- val leftNode = new DummyNode(kvIntAttributes, leftData)
- val rightNode = new DummyNode(kvIntAttributes, rightData)
- val intersectNode = new IntersectNode(conf, leftNode, rightNode)
- val expectedOutput = leftData.intersect(rightData)
- val actualOutput = intersectNode.collect().map { case row =>
- (row.getInt(0), row.getInt(1))
- }
- assert(actualOutput === expectedOutput)
- }
-
-}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/local/LimitNodeSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/local/LimitNodeSuite.scala
deleted file mode 100644
index fb790636a3..0000000000
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/local/LimitNodeSuite.scala
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
-* Licensed to the Apache Software Foundation (ASF) under one or more
-* contributor license agreements. See the NOTICE file distributed with
-* this work for additional information regarding copyright ownership.
-* The ASF licenses this file to You under the Apache License, Version 2.0
-* (the "License"); you may not use this file except in compliance with
-* the License. You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-
-package org.apache.spark.sql.execution.local
-
-
-class LimitNodeSuite extends LocalNodeTest {
-
- private def testLimit(inputData: Array[(Int, Int)] = Array.empty, limit: Int = 10): Unit = {
- val inputNode = new DummyNode(kvIntAttributes, inputData)
- val limitNode = new LimitNode(conf, limit, inputNode)
- val expectedOutput = inputData.take(limit)
- val actualOutput = limitNode.collect().map { case row =>
- (row.getInt(0), row.getInt(1))
- }
- assert(actualOutput === expectedOutput)
- }
-
- test("empty") {
- testLimit()
- }
-
- test("basic") {
- testLimit((1 to 100).map { i => (i, i) }.toArray, 20)
- }
-
-}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/local/LocalNodeSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/local/LocalNodeSuite.scala
deleted file mode 100644
index 0d1ed99eec..0000000000
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/local/LocalNodeSuite.scala
+++ /dev/null
@@ -1,73 +0,0 @@
-/*
-* Licensed to the Apache Software Foundation (ASF) under one or more
-* contributor license agreements. See the NOTICE file distributed with
-* this work for additional information regarding copyright ownership.
-* The ASF licenses this file to You under the Apache License, Version 2.0
-* (the "License"); you may not use this file except in compliance with
-* the License. You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-
-package org.apache.spark.sql.execution.local
-
-
-class LocalNodeSuite extends LocalNodeTest {
- private val data = (1 to 100).map { i => (i, i) }.toArray
-
- test("basic open, next, fetch, close") {
- val node = new DummyNode(kvIntAttributes, data)
- assert(!node.isOpen)
- node.open()
- assert(node.isOpen)
- data.foreach { case (k, v) =>
- assert(node.next())
- // fetch should be idempotent
- val fetched = node.fetch()
- assert(node.fetch() === fetched)
- assert(node.fetch() === fetched)
- assert(node.fetch().numFields === 2)
- assert(node.fetch().getInt(0) === k)
- assert(node.fetch().getInt(1) === v)
- }
- assert(!node.next())
- node.close()
- assert(!node.isOpen)
- }
-
- test("asIterator") {
- val node = new DummyNode(kvIntAttributes, data)
- val iter = node.asIterator
- node.open()
- data.foreach { case (k, v) =>
- // hasNext should be idempotent
- assert(iter.hasNext)
- assert(iter.hasNext)
- val item = iter.next()
- assert(item.numFields === 2)
- assert(item.getInt(0) === k)
- assert(item.getInt(1) === v)
- }
- intercept[NoSuchElementException] {
- iter.next()
- }
- node.close()
- }
-
- test("collect") {
- val node = new DummyNode(kvIntAttributes, data)
- node.open()
- val collected = node.collect()
- assert(collected.size === data.size)
- assert(collected.forall(_.size === 2))
- assert(collected.map { case row => (row.getInt(0), row.getInt(1)) } === data)
- node.close()
- }
-
-}
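LocalNodeSuite above pins down the iterator contract of the removed operators: open() before consuming, next() to advance, fetch() stable between calls to next(), close() when done. A minimal consumption-loop sketch, assuming the removed LocalNode API and that fetch() returns a catalyst InternalRow:

import scala.collection.mutable.ArrayBuffer
import org.apache.spark.sql.catalyst.InternalRow

def drain(node: LocalNode): Seq[InternalRow] = {
  val rows = ArrayBuffer.empty[InternalRow]
  node.open()                      // must precede next()/fetch()
  try {
    while (node.next()) {          // advance to the next row, if any
      rows += node.fetch().copy()  // fetch() is idempotent until next() is called again
    }
  } finally {
    node.close()                   // always release resources
  }
  rows
}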
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/local/LocalNodeTest.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/local/LocalNodeTest.scala
deleted file mode 100644
index cd67a66ebf..0000000000
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/local/LocalNodeTest.scala
+++ /dev/null
@@ -1,87 +0,0 @@
-/*
-* Licensed to the Apache Software Foundation (ASF) under one or more
-* contributor license agreements. See the NOTICE file distributed with
-* this work for additional information regarding copyright ownership.
-* The ASF licenses this file to You under the Apache License, Version 2.0
-* (the "License"); you may not use this file except in compliance with
-* the License. You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-
-package org.apache.spark.sql.execution.local
-
-import org.apache.spark.SparkFunSuite
-import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
-import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Expression}
-import org.apache.spark.sql.internal.SQLConf
-import org.apache.spark.sql.types.{IntegerType, StringType}
-
-class LocalNodeTest extends SparkFunSuite {
-
- protected val conf: SQLConf = new SQLConf
- protected val kvIntAttributes = Seq(
- AttributeReference("k", IntegerType)(),
- AttributeReference("v", IntegerType)())
- protected val joinNameAttributes = Seq(
- AttributeReference("id1", IntegerType)(),
- AttributeReference("name", StringType)())
- protected val joinNicknameAttributes = Seq(
- AttributeReference("id2", IntegerType)(),
- AttributeReference("nickname", StringType)())
-
- /**
- * Wrap a function processing two [[LocalNode]]s such that:
- * (1) all input rows are automatically converted to unsafe rows
- * (2) all output rows are automatically converted back to safe rows
- */
- protected def wrapForUnsafe(
- f: (LocalNode, LocalNode) => LocalNode): (LocalNode, LocalNode) => LocalNode = {
- (left: LocalNode, right: LocalNode) => {
- val _left = ConvertToUnsafeNode(conf, left)
- val _right = ConvertToUnsafeNode(conf, right)
- val r = f(_left, _right)
- ConvertToSafeNode(conf, r)
- }
- }
-
- /**
- * Recursively resolve all expressions in a [[LocalNode]] using the node's attributes.
- */
- protected def resolveExpressions(outputNode: LocalNode): LocalNode = {
- outputNode transform {
- case node: LocalNode =>
- val inputMap = node.output.map { a => (a.name, a) }.toMap
- node transformExpressions {
- case UnresolvedAttribute(Seq(u)) =>
- inputMap.getOrElse(u,
- sys.error(s"Invalid Test: Cannot resolve $u given input $inputMap"))
- }
- }
- }
-
- /**
- * Resolve all expressions in `expressions` based on the `output` of `localNode`.
- * It assumes that all expressions in the `localNode` are resolved.
- */
- protected def resolveExpressions(
- expressions: Seq[Expression],
- localNode: LocalNode): Seq[Expression] = {
- require(localNode.expressions.forall(_.resolved))
- val inputMap = localNode.output.map { a => (a.name, a) }.toMap
- expressions.map { expression =>
- expression.transformUp {
- case UnresolvedAttribute(Seq(u)) =>
- inputMap.getOrElse(u,
- sys.error(s"Invalid Test: Cannot resolve $u given input $inputMap"))
- }
- }
- }
-
-}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/local/NestedLoopJoinNodeSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/local/NestedLoopJoinNodeSuite.scala
deleted file mode 100644
index bcc87a9175..0000000000
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/local/NestedLoopJoinNodeSuite.scala
+++ /dev/null
@@ -1,142 +0,0 @@
-/*
-* Licensed to the Apache Software Foundation (ASF) under one or more
-* contributor license agreements. See the NOTICE file distributed with
-* this work for additional information regarding copyright ownership.
-* The ASF licenses this file to You under the Apache License, Version 2.0
-* (the "License"); you may not use this file except in compliance with
-* the License. You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-
-package org.apache.spark.sql.execution.local
-
-import org.apache.spark.sql.catalyst.dsl.expressions._
-import org.apache.spark.sql.catalyst.plans.{FullOuter, JoinType, LeftOuter, RightOuter}
-import org.apache.spark.sql.execution.joins.{BuildLeft, BuildRight, BuildSide}
-import org.apache.spark.sql.internal.SQLConf
-
-
-class NestedLoopJoinNodeSuite extends LocalNodeTest {
-
- // Test all combinations of the three dimensions: with/out unsafe, build sides, and join types
- private val buildSides = Seq(BuildLeft, BuildRight)
- private val joinTypes = Seq(LeftOuter, RightOuter, FullOuter)
- buildSides.foreach { buildSide =>
- joinTypes.foreach { joinType =>
- testJoin(buildSide, joinType)
- }
- }
-
- /**
- * Test outer nested loop joins with varying degrees of matches.
- */
- private def testJoin(buildSide: BuildSide, joinType: JoinType): Unit = {
- val testNamePrefix = s"$buildSide / $joinType"
- val someData = (1 to 100).map { i => (i, "burger" + i) }.toArray
- val conf = new SQLConf
-
- // Actual test body
- def runTest(
- joinType: JoinType,
- leftInput: Array[(Int, String)],
- rightInput: Array[(Int, String)]): Unit = {
- val leftNode = new DummyNode(joinNameAttributes, leftInput)
- val rightNode = new DummyNode(joinNicknameAttributes, rightInput)
- val cond = 'id1 === 'id2
- val makeNode = (node1: LocalNode, node2: LocalNode) => {
- resolveExpressions(
- new NestedLoopJoinNode(conf, node1, node2, buildSide, joinType, Some(cond)))
- }
- val makeUnsafeNode = wrapForUnsafe(makeNode)
- val hashJoinNode = makeUnsafeNode(leftNode, rightNode)
- val expectedOutput = generateExpectedOutput(leftInput, rightInput, joinType)
- val actualOutput = hashJoinNode.collect().map { row =>
- // (
- // id, name,
- // id, nickname
- // )
- (
- Option(row.get(0)).map(_.asInstanceOf[Int]), Option(row.getString(1)),
- Option(row.get(2)).map(_.asInstanceOf[Int]), Option(row.getString(3))
- )
- }
- assert(actualOutput.toSet === expectedOutput.toSet)
- }
-
- test(s"$testNamePrefix: empty") {
- runTest(joinType, Array.empty, Array.empty)
- }
-
- test(s"$testNamePrefix: no matches") {
- val someIrrelevantData = (10000 to 10100).map { i => (i, "piper" + i) }.toArray
- runTest(joinType, someData, Array.empty)
- runTest(joinType, Array.empty, someData)
- runTest(joinType, someData, someIrrelevantData)
- runTest(joinType, someIrrelevantData, someData)
- }
-
- test(s"$testNamePrefix: partial matches") {
- val someOtherData = (50 to 150).map { i => (i, "finnegan" + i) }.toArray
- runTest(joinType, someData, someOtherData)
- runTest(joinType, someOtherData, someData)
- }
-
- test(s"$testNamePrefix: full matches") {
- val someSuperRelevantData = someData.map { case (k, v) => (k, "cooper" + v) }
- runTest(joinType, someData, someSuperRelevantData)
- runTest(joinType, someSuperRelevantData, someData)
- }
- }
-
- /**
- * Helper method to generate the expected output of a test based on the join type.
- */
- private def generateExpectedOutput(
- leftInput: Array[(Int, String)],
- rightInput: Array[(Int, String)],
- joinType: JoinType): Array[(Option[Int], Option[String], Option[Int], Option[String])] = {
- joinType match {
- case LeftOuter =>
- val rightInputMap = rightInput.toMap
- leftInput.map { case (k, v) =>
- val rightKey = rightInputMap.get(k).map { _ => k }
- val rightValue = rightInputMap.get(k)
- (Some(k), Some(v), rightKey, rightValue)
- }
-
- case RightOuter =>
- val leftInputMap = leftInput.toMap
- rightInput.map { case (k, v) =>
- val leftKey = leftInputMap.get(k).map { _ => k }
- val leftValue = leftInputMap.get(k)
- (leftKey, leftValue, Some(k), Some(v))
- }
-
- case FullOuter =>
- val leftInputMap = leftInput.toMap
- val rightInputMap = rightInput.toMap
- val leftOutput = leftInput.map { case (k, v) =>
- val rightKey = rightInputMap.get(k).map { _ => k }
- val rightValue = rightInputMap.get(k)
- (Some(k), Some(v), rightKey, rightValue)
- }
- val rightOutput = rightInput.map { case (k, v) =>
- val leftKey = leftInputMap.get(k).map { _ => k }
- val leftValue = leftInputMap.get(k)
- (leftKey, leftValue, Some(k), Some(v))
- }
- (leftOutput ++ rightOutput).distinct
-
- case other =>
- throw new IllegalArgumentException(s"Join type $other is not applicable")
- }
- }
-
-}
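The generateExpectedOutput helper above encodes the outer-join reference semantics with Option-wrapped columns. A small worked example of the FullOuter branch, with hypothetical two-row inputs:

val leftInput  = Array((1, "a"), (2, "b"))
val rightInput = Array((2, "x"), (3, "y"))
// Applying the FullOuter rule above:
//   (Some(1), Some("a"), None,    None)       -- left key 1 has no match
//   (Some(2), Some("b"), Some(2), Some("x"))  -- the matched pair, emitted once thanks to .distinct
//   (None,    None,      Some(3), Some("y"))  -- right key 3 has no match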
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/local/ProjectNodeSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/local/ProjectNodeSuite.scala
deleted file mode 100644
index 02ecb23d34..0000000000
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/local/ProjectNodeSuite.scala
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
-* Licensed to the Apache Software Foundation (ASF) under one or more
-* contributor license agreements. See the NOTICE file distributed with
-* this work for additional information regarding copyright ownership.
-* The ASF licenses this file to You under the Apache License, Version 2.0
-* (the "License"); you may not use this file except in compliance with
-* the License. You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-
-package org.apache.spark.sql.execution.local
-
-import org.apache.spark.sql.catalyst.expressions.{AttributeReference, NamedExpression}
-import org.apache.spark.sql.types.{IntegerType, StringType}
-
-
-class ProjectNodeSuite extends LocalNodeTest {
- private val pieAttributes = Seq(
- AttributeReference("id", IntegerType)(),
- AttributeReference("age", IntegerType)(),
- AttributeReference("name", StringType)())
-
- private def testProject(inputData: Array[(Int, Int, String)] = Array.empty): Unit = {
- val inputNode = new DummyNode(pieAttributes, inputData)
- val columns = Seq[NamedExpression](inputNode.output(0), inputNode.output(2))
- val projectNode = new ProjectNode(conf, columns, inputNode)
- val expectedOutput = inputData.map { case (id, age, name) => (id, name) }
- val actualOutput = projectNode.collect().map { case row =>
- (row.getInt(0), row.getString(1))
- }
- assert(actualOutput === expectedOutput)
- }
-
- test("empty") {
- testProject()
- }
-
- test("basic") {
- testProject((1 to 100).map { i => (i, i + 1, "pie" + i) }.toArray)
- }
-
-}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/local/SampleNodeSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/local/SampleNodeSuite.scala
deleted file mode 100644
index a3e83bbd51..0000000000
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/local/SampleNodeSuite.scala
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.execution.local
-
-import org.apache.spark.util.random.{BernoulliCellSampler, PoissonSampler}
-
-
-class SampleNodeSuite extends LocalNodeTest {
-
- private def testSample(withReplacement: Boolean): Unit = {
- val seed = 0L
- val lowerb = 0.0
- val upperb = 0.3
- val maybeOut = if (withReplacement) "" else "out"
- test(s"with$maybeOut replacement") {
- val inputData = (1 to 1000).map { i => (i, i) }.toArray
- val inputNode = new DummyNode(kvIntAttributes, inputData)
- val sampleNode = new SampleNode(conf, lowerb, upperb, withReplacement, seed, inputNode)
- val sampler =
- if (withReplacement) {
- new PoissonSampler[(Int, Int)](upperb - lowerb, useGapSamplingIfPossible = false)
- } else {
- new BernoulliCellSampler[(Int, Int)](lowerb, upperb)
- }
- sampler.setSeed(seed)
- val expectedOutput = sampler.sample(inputData.iterator).toArray
- val actualOutput = sampleNode.collect().map { case row =>
- (row.getInt(0), row.getInt(1))
- }
- assert(actualOutput === expectedOutput)
- }
- }
-
- testSample(withReplacement = true)
- testSample(withReplacement = false)
-}
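SampleNodeSuite above relies on sampling being deterministic for a fixed seed: it rebuilds a sampler configured like the one SampleNode uses internally and treats its output as the expected rows. A quick standalone check of that assumption, reusing the test's hypothetical bounds and seed:

import org.apache.spark.util.random.BernoulliCellSampler

val data = (1 to 1000).map { i => (i, i) }
def sampleOnce(): Seq[(Int, Int)] = {
  val sampler = new BernoulliCellSampler[(Int, Int)](0.0, 0.3)  // same bounds as the test
  sampler.setSeed(0L)                                           // same seed as the test
  sampler.sample(data.iterator).toSeq
}
assert(sampleOnce() == sampleOnce())  // identical seeds give identical selections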
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/local/TakeOrderedAndProjectNodeSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/local/TakeOrderedAndProjectNodeSuite.scala
deleted file mode 100644
index 42ebc7bfca..0000000000
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/local/TakeOrderedAndProjectNodeSuite.scala
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.execution.local
-
-import scala.util.Random
-
-import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.catalyst.expressions.SortOrder
-
-
-class TakeOrderedAndProjectNodeSuite extends LocalNodeTest {
-
- private def testTakeOrderedAndProject(desc: Boolean): Unit = {
- val limit = 10
- val ascOrDesc = if (desc) "desc" else "asc"
- test(ascOrDesc) {
- val inputData = Random.shuffle((1 to 100).toList).map { i => (i, i) }.toArray
- val inputNode = new DummyNode(kvIntAttributes, inputData)
- val firstColumn = inputNode.output(0)
- val sortDirection = if (desc) Descending else Ascending
- val sortOrder = SortOrder(firstColumn, sortDirection)
- val takeOrderAndProjectNode = new TakeOrderedAndProjectNode(
- conf, limit, Seq(sortOrder), Some(Seq(firstColumn)), inputNode)
- val expectedOutput = inputData
- .map { case (k, _) => k }
- .sortBy { k => k * (if (desc) -1 else 1) }
- .take(limit)
- val actualOutput = takeOrderAndProjectNode.collect().map { row => row.getInt(0) }
- assert(actualOutput === expectedOutput)
- }
- }
-
- testTakeOrderedAndProject(desc = false)
- testTakeOrderedAndProject(desc = true)
-}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/local/UnionNodeSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/local/UnionNodeSuite.scala
deleted file mode 100644
index 666b0235c0..0000000000
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/local/UnionNodeSuite.scala
+++ /dev/null
@@ -1,55 +0,0 @@
-/*
-* Licensed to the Apache Software Foundation (ASF) under one or more
-* contributor license agreements. See the NOTICE file distributed with
-* this work for additional information regarding copyright ownership.
-* The ASF licenses this file to You under the Apache License, Version 2.0
-* (the "License"); you may not use this file except in compliance with
-* the License. You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-
-package org.apache.spark.sql.execution.local
-
-
-class UnionNodeSuite extends LocalNodeTest {
-
- private def testUnion(inputData: Seq[Array[(Int, Int)]]): Unit = {
- val inputNodes = inputData.map { data =>
- new DummyNode(kvIntAttributes, data)
- }
- val unionNode = new UnionNode(conf, inputNodes)
- val expectedOutput = inputData.flatten
- val actualOutput = unionNode.collect().map { case row =>
- (row.getInt(0), row.getInt(1))
- }
- assert(actualOutput === expectedOutput)
- }
-
- test("empty") {
- testUnion(Seq(Array.empty))
- testUnion(Seq(Array.empty, Array.empty))
- }
-
- test("self") {
- val data = (1 to 100).map { i => (i, i) }.toArray
- testUnion(Seq(data))
- testUnion(Seq(data, data))
- testUnion(Seq(data, data, data))
- }
-
- test("basic") {
- val zero = Array.empty[(Int, Int)]
- val one = (1 to 100).map { i => (i, i) }.toArray
- val two = (50 to 150).map { i => (i, i) }.toArray
- val three = (800 to 900).map { i => (i, i) }.toArray
- testUnion(Seq(zero, one, two, three))
- }
-
-}