aboutsummaryrefslogtreecommitdiff
path: root/sql/catalyst
diff options
context:
space:
mode:
authorMichael Armbrust <michael@databricks.com>2015-02-12 15:19:19 -0800
committerMichael Armbrust <michael@databricks.com>2015-02-12 15:19:19 -0800
commitee04a8b19be8330bfc48f470ef365622162c915f (patch)
tree9224e8284d7e991f9f310fe1d1e8d4299908497f /sql/catalyst
parentc352ffbdb9112714c176a747edff6115e9369e58 (diff)
downloadspark-ee04a8b19be8330bfc48f470ef365622162c915f.tar.gz
spark-ee04a8b19be8330bfc48f470ef365622162c915f.tar.bz2
spark-ee04a8b19be8330bfc48f470ef365622162c915f.zip
[SPARK-5573][SQL] Add explode to dataframes
Author: Michael Armbrust <michael@databricks.com> Closes #4546 from marmbrus/explode and squashes the following commits: eefd33a [Michael Armbrust] whitespace a8d496c [Michael Armbrust] Merge remote-tracking branch 'apache/master' into explode 4af740e [Michael Armbrust] Merge remote-tracking branch 'origin/master' into explode dc86a5c [Michael Armbrust] simple version d633d01 [Michael Armbrust] add scala specific 950707a [Michael Armbrust] fix comments ba8854c [Michael Armbrust] [SPARK-5573][SQL] Add explode to dataframes
Diffstat (limited to 'sql/catalyst')
-rw-r--r--sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/generators.scala19
1 files changed, 19 insertions, 0 deletions
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/generators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/generators.scala
index 43b6482c01..0983d274de 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/generators.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/generators.scala
@@ -74,6 +74,25 @@ abstract class Generator extends Expression {
}
/**
+ * A generator that produces its output using the provided lambda function.
+ */
+case class UserDefinedGenerator(
+ schema: Seq[Attribute],
+ function: Row => TraversableOnce[Row],
+ children: Seq[Expression])
+ extends Generator{
+
+ override protected def makeOutput(): Seq[Attribute] = schema
+
+ override def eval(input: Row): TraversableOnce[Row] = {
+ val inputRow = new InterpretedProjection(children)
+ function(inputRow(input))
+ }
+
+ override def toString = s"UserDefinedGenerator(${children.mkString(",")})"
+}
+
+/**
* Given an input array produces a sequence of rows for each value in the array.
*/
case class Explode(attributeNames: Seq[String], child: Expression)