aboutsummaryrefslogtreecommitdiff
path: root/sql/core
diff options
context:
space:
mode:
authorCheng Hao <hao.cheng@intel.com>2015-05-14 00:14:59 +0800
committerCheng Lian <lian@databricks.com>2015-05-14 00:14:59 +0800
commit0da254fb2903c01e059fa7d0dc81df5740312b35 (patch)
treec99831581e4987626b950f0df96ce36a26e0ee53 /sql/core
parentaa6ba3f2166edcc8bcda3abc70482fa8605e83b7 (diff)
downloadspark-0da254fb2903c01e059fa7d0dc81df5740312b35.tar.gz
spark-0da254fb2903c01e059fa7d0dc81df5740312b35.tar.bz2
spark-0da254fb2903c01e059fa7d0dc81df5740312b35.zip
[SPARK-6734] [SQL] Add UDTF.close support in Generate
Some third-party UDTF extensions generate additional rows in the "GenericUDTF.close()" method, which is supported / documented by Hive. https://cwiki.apache.org/confluence/display/Hive/DeveloperGuide+UDTF However, Spark SQL ignores the "GenericUDTF.close()", and it causes bug while porting job from Hive to Spark SQL. Author: Cheng Hao <hao.cheng@intel.com> Closes #5383 from chenghao-intel/udtf_close and squashes the following commits: 98b4e4b [Cheng Hao] Support UDTF.close
Diffstat (limited to 'sql/core')
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/execution/Generate.scala38
1 files changed, 28 insertions, 10 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/Generate.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/Generate.scala
index 08d9079335..dd02c1f457 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/Generate.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/Generate.scala
@@ -22,6 +22,18 @@ import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.expressions._
/**
+ * For lazy computing, be sure the generator.terminate() called in the very last
+ * TODO reusing the CompletionIterator?
+ */
+private[execution] sealed case class LazyIterator(func: () => TraversableOnce[Row])
+ extends Iterator[Row] {
+
+ lazy val results = func().toIterator
+ override def hasNext: Boolean = results.hasNext
+ override def next(): Row = results.next()
+}
+
+/**
* :: DeveloperApi ::
* Applies a [[catalyst.expressions.Generator Generator]] to a stream of input rows, combining the
* output of each into a new stream of rows. This operation is similar to a `flatMap` in functional
@@ -47,27 +59,33 @@ case class Generate(
val boundGenerator = BindReferences.bindReference(generator, child.output)
protected override def doExecute(): RDD[Row] = {
+ // boundGenerator.terminate() should be triggered after all of the rows in the partition
if (join) {
child.execute().mapPartitions { iter =>
- val nullValues = Seq.fill(generator.elementTypes.size)(Literal(null))
- // Used to produce rows with no matches when outer = true.
- val outerProjection =
- newProjection(child.output ++ nullValues, child.output)
-
- val joinProjection = newProjection(output, output)
+ val generatorNullRow = Row.fromSeq(Seq.fill[Any](generator.elementTypes.size)(null))
val joinedRow = new JoinedRow
- iter.flatMap {row =>
+ iter.flatMap { row =>
+ // we should always set the left (child output)
+ joinedRow.withLeft(row)
val outputRows = boundGenerator.eval(row)
if (outer && outputRows.isEmpty) {
- outerProjection(row) :: Nil
+ joinedRow.withRight(generatorNullRow) :: Nil
} else {
- outputRows.map(or => joinProjection(joinedRow(row, or)))
+ outputRows.map(or => joinedRow.withRight(or))
}
+ } ++ LazyIterator(() => boundGenerator.terminate()).map { row =>
+ // we leave the left side as the last element of its child output
+ // keep it the same as Hive does
+ joinedRow.withRight(row)
}
}
} else {
- child.execute().mapPartitions(iter => iter.flatMap(row => boundGenerator.eval(row)))
+ child.execute().mapPartitions { iter =>
+ iter.flatMap(row => boundGenerator.eval(row)) ++
+ LazyIterator(() => boundGenerator.terminate())
+ }
}
}
}
+