aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorReynold Xin <rxin@databricks.com>2015-08-14 21:12:11 -0700
committerReynold Xin <rxin@databricks.com>2015-08-14 21:12:11 -0700
commit609ce3c07d4962a9242e488ad0ed48c183896802 (patch)
tree0659fae47b6a0b9cb9ef5f1ccdb8c6e16ebfeb54
parent6c4fdbec33af287d24cd0995ecbd7191545d05c9 (diff)
downloadspark-609ce3c07d4962a9242e488ad0ed48c183896802.tar.gz
spark-609ce3c07d4962a9242e488ad0ed48c183896802.tar.bz2
spark-609ce3c07d4962a9242e488ad0ed48c183896802.zip
[SPARK-9984] [SQL] Create local physical operator interface.
This pull request creates a new operator interface that is more similar to traditional database query iterators (with open/close/next/get). These local operators are not currently used anywhere, but will become the basis for SPARK-9983 (local physical operators for query execution). cc zsxwing Author: Reynold Xin <rxin@databricks.com> Closes #8212 from rxin/SPARK-9984.
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/execution/local/FilterNode.scala47
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/execution/local/LocalNode.scala86
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/execution/local/ProjectNode.scala42
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/execution/local/SeqScanNode.scala49
4 files changed, 224 insertions, 0 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/local/FilterNode.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/local/FilterNode.scala
new file mode 100644
index 0000000000..a485a1a1d7
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/local/FilterNode.scala
@@ -0,0 +1,47 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements. See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to You under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.spark.sql.execution.local
+
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression}
+import org.apache.spark.sql.catalyst.expressions.codegen.GeneratePredicate
+
+
+case class FilterNode(condition: Expression, child: LocalNode) extends UnaryLocalNode {
+
+ private[this] var predicate: (InternalRow) => Boolean = _
+
+ override def output: Seq[Attribute] = child.output
+
+ override def open(): Unit = {
+ child.open()
+ predicate = GeneratePredicate.generate(condition, child.output)
+ }
+
+ override def next(): Boolean = {
+ var found = false
+ while (child.next() && !found) {
+ found = predicate.apply(child.get())
+ }
+ found
+ }
+
+ override def get(): InternalRow = child.get()
+
+ override def close(): Unit = child.close()
+}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/local/LocalNode.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/local/LocalNode.scala
new file mode 100644
index 0000000000..341c81438e
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/local/LocalNode.scala
@@ -0,0 +1,86 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements. See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to You under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.spark.sql.execution.local
+
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow}
+import org.apache.spark.sql.catalyst.expressions.Attribute
+import org.apache.spark.sql.catalyst.trees.TreeNode
+import org.apache.spark.sql.types.StructType
+
+/**
+ * A local physical operator, in the form of an iterator.
+ *
+ * Before consuming the iterator, open function must be called.
+ * After consuming the iterator, close function must be called.
+ */
+abstract class LocalNode extends TreeNode[LocalNode] {
+
+ def output: Seq[Attribute]
+
+ /**
+ * Initializes the iterator state. Must be called before calling `next()`.
+ *
+ * Implementations of this must also call the `open()` function of its children.
+ */
+ def open(): Unit
+
+ /**
+ * Advances the iterator to the next tuple. Returns true if there is at least one more tuple.
+ */
+ def next(): Boolean
+
+ /**
+ * Returns the current tuple.
+ */
+ def get(): InternalRow
+
+ /**
+ * Closes the iterator and releases all resources.
+ *
+ * Implementations of this must also call the `close()` function of its children.
+ */
+ def close(): Unit
+
+ /**
+ * Returns the content of the iterator from the beginning to the end in the form of a Scala Seq.
+ */
+ def collect(): Seq[Row] = {
+ val converter = CatalystTypeConverters.createToScalaConverter(StructType.fromAttributes(output))
+ val result = new scala.collection.mutable.ArrayBuffer[Row]
+ open()
+ while (next()) {
+ result += converter.apply(get()).asInstanceOf[Row]
+ }
+ close()
+ result
+ }
+}
+
+
+abstract class LeafLocalNode extends LocalNode {
+ override def children: Seq[LocalNode] = Seq.empty
+}
+
+
+abstract class UnaryLocalNode extends LocalNode {
+
+ def child: LocalNode
+
+ override def children: Seq[LocalNode] = Seq(child)
+}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/local/ProjectNode.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/local/ProjectNode.scala
new file mode 100644
index 0000000000..e574d1473c
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/local/ProjectNode.scala
@@ -0,0 +1,42 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements. See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to You under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.spark.sql.execution.local
+
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.{UnsafeProjection, Attribute, NamedExpression}
+
+
+case class ProjectNode(projectList: Seq[NamedExpression], child: LocalNode) extends UnaryLocalNode {
+
+ private[this] var project: UnsafeProjection = _
+
+ override def output: Seq[Attribute] = projectList.map(_.toAttribute)
+
+ override def open(): Unit = {
+ project = UnsafeProjection.create(projectList, child.output)
+ child.open()
+ }
+
+ override def next(): Boolean = child.next()
+
+ override def get(): InternalRow = {
+ project.apply(child.get())
+ }
+
+ override def close(): Unit = child.close()
+}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/local/SeqScanNode.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/local/SeqScanNode.scala
new file mode 100644
index 0000000000..994de8afa9
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/local/SeqScanNode.scala
@@ -0,0 +1,49 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements. See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to You under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.spark.sql.execution.local
+
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.Attribute
+
+/**
+ * An operator that scans some local data collection in the form of Scala Seq.
+ */
+case class SeqScanNode(output: Seq[Attribute], data: Seq[InternalRow]) extends LeafLocalNode {
+
+ private[this] var iterator: Iterator[InternalRow] = _
+ private[this] var currentRow: InternalRow = _
+
+ override def open(): Unit = {
+ iterator = data.iterator
+ }
+
+ override def next(): Boolean = {
+ if (iterator.hasNext) {
+ currentRow = iterator.next()
+ true
+ } else {
+ false
+ }
+ }
+
+ override def get(): InternalRow = currentRow
+
+ override def close(): Unit = {
+ // Do nothing
+ }
+}