diff options
author | Reynold Xin <rxin@databricks.com> | 2015-08-14 21:12:11 -0700 |
---|---|---|
committer | Reynold Xin <rxin@databricks.com> | 2015-08-14 21:12:11 -0700 |
commit | 609ce3c07d4962a9242e488ad0ed48c183896802 (patch) | |
tree | 0659fae47b6a0b9cb9ef5f1ccdb8c6e16ebfeb54 | |
parent | 6c4fdbec33af287d24cd0995ecbd7191545d05c9 (diff) | |
download | spark-609ce3c07d4962a9242e488ad0ed48c183896802.tar.gz spark-609ce3c07d4962a9242e488ad0ed48c183896802.tar.bz2 spark-609ce3c07d4962a9242e488ad0ed48c183896802.zip |
[SPARK-9984] [SQL] Create local physical operator interface.
This pull request creates a new operator interface that is more similar to traditional database query iterators (with open/close/next/get).
These local operators are not currently used anywhere, but will become the basis for SPARK-9983 (local physical operators for query execution).
cc zsxwing
Author: Reynold Xin <rxin@databricks.com>
Closes #8212 from rxin/SPARK-9984.
4 files changed, 224 insertions, 0 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/local/FilterNode.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/local/FilterNode.scala new file mode 100644 index 0000000000..a485a1a1d7 --- /dev/null +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/local/FilterNode.scala @@ -0,0 +1,47 @@ +/* +* Licensed to the Apache Software Foundation (ASF) under one or more +* contributor license agreements. See the NOTICE file distributed with +* this work for additional information regarding copyright ownership. +* The ASF licenses this file to You under the Apache License, Version 2.0 +* (the "License"); you may not use this file except in compliance with +* the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ + +package org.apache.spark.sql.execution.local + +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression} +import org.apache.spark.sql.catalyst.expressions.codegen.GeneratePredicate + + +case class FilterNode(condition: Expression, child: LocalNode) extends UnaryLocalNode { + + private[this] var predicate: (InternalRow) => Boolean = _ + + override def output: Seq[Attribute] = child.output + + override def open(): Unit = { + child.open() + predicate = GeneratePredicate.generate(condition, child.output) + } + + override def next(): Boolean = { + var found = false + while (child.next() && !found) { + found = predicate.apply(child.get()) + } + found + } + + override def get(): InternalRow = child.get() + + override def close(): Unit = child.close() +} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/local/LocalNode.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/local/LocalNode.scala new file mode 100644 index 0000000000..341c81438e --- /dev/null +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/local/LocalNode.scala @@ -0,0 +1,86 @@ +/* +* Licensed to the Apache Software Foundation (ASF) under one or more +* contributor license agreements. See the NOTICE file distributed with +* this work for additional information regarding copyright ownership. +* The ASF licenses this file to You under the Apache License, Version 2.0 +* (the "License"); you may not use this file except in compliance with +* the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ + +package org.apache.spark.sql.execution.local + +import org.apache.spark.sql.Row +import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow} +import org.apache.spark.sql.catalyst.expressions.Attribute +import org.apache.spark.sql.catalyst.trees.TreeNode +import org.apache.spark.sql.types.StructType + +/** + * A local physical operator, in the form of an iterator. + * + * Before consuming the iterator, open function must be called. + * After consuming the iterator, close function must be called. + */ +abstract class LocalNode extends TreeNode[LocalNode] { + + def output: Seq[Attribute] + + /** + * Initializes the iterator state. Must be called before calling `next()`. + * + * Implementations of this must also call the `open()` function of its children. + */ + def open(): Unit + + /** + * Advances the iterator to the next tuple. Returns true if there is at least one more tuple. + */ + def next(): Boolean + + /** + * Returns the current tuple. + */ + def get(): InternalRow + + /** + * Closes the iterator and releases all resources. + * + * Implementations of this must also call the `close()` function of its children. + */ + def close(): Unit + + /** + * Returns the content of the iterator from the beginning to the end in the form of a Scala Seq. + */ + def collect(): Seq[Row] = { + val converter = CatalystTypeConverters.createToScalaConverter(StructType.fromAttributes(output)) + val result = new scala.collection.mutable.ArrayBuffer[Row] + open() + while (next()) { + result += converter.apply(get()).asInstanceOf[Row] + } + close() + result + } +} + + +abstract class LeafLocalNode extends LocalNode { + override def children: Seq[LocalNode] = Seq.empty +} + + +abstract class UnaryLocalNode extends LocalNode { + + def child: LocalNode + + override def children: Seq[LocalNode] = Seq(child) +} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/local/ProjectNode.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/local/ProjectNode.scala new file mode 100644 index 0000000000..e574d1473c --- /dev/null +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/local/ProjectNode.scala @@ -0,0 +1,42 @@ +/* +* Licensed to the Apache Software Foundation (ASF) under one or more +* contributor license agreements. See the NOTICE file distributed with +* this work for additional information regarding copyright ownership. +* The ASF licenses this file to You under the Apache License, Version 2.0 +* (the "License"); you may not use this file except in compliance with +* the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ + +package org.apache.spark.sql.execution.local + +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.{UnsafeProjection, Attribute, NamedExpression} + + +case class ProjectNode(projectList: Seq[NamedExpression], child: LocalNode) extends UnaryLocalNode { + + private[this] var project: UnsafeProjection = _ + + override def output: Seq[Attribute] = projectList.map(_.toAttribute) + + override def open(): Unit = { + project = UnsafeProjection.create(projectList, child.output) + child.open() + } + + override def next(): Boolean = child.next() + + override def get(): InternalRow = { + project.apply(child.get()) + } + + override def close(): Unit = child.close() +} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/local/SeqScanNode.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/local/SeqScanNode.scala new file mode 100644 index 0000000000..994de8afa9 --- /dev/null +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/local/SeqScanNode.scala @@ -0,0 +1,49 @@ +/* +* Licensed to the Apache Software Foundation (ASF) under one or more +* contributor license agreements. See the NOTICE file distributed with +* this work for additional information regarding copyright ownership. +* The ASF licenses this file to You under the Apache License, Version 2.0 +* (the "License"); you may not use this file except in compliance with +* the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ + +package org.apache.spark.sql.execution.local + +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.Attribute + +/** + * An operator that scans some local data collection in the form of Scala Seq. + */ +case class SeqScanNode(output: Seq[Attribute], data: Seq[InternalRow]) extends LeafLocalNode { + + private[this] var iterator: Iterator[InternalRow] = _ + private[this] var currentRow: InternalRow = _ + + override def open(): Unit = { + iterator = data.iterator + } + + override def next(): Boolean = { + if (iterator.hasNext) { + currentRow = iterator.next() + true + } else { + false + } + } + + override def get(): InternalRow = currentRow + + override def close(): Unit = { + // Do nothing + } +} |