aboutsummaryrefslogtreecommitdiff
path: root/sql/catalyst/src/main
diff options
context:
space:
mode:
authorDongjoon Hyun <dongjoon@apache.org>2016-07-04 01:57:45 +0800
committerWenchen Fan <wenchen@databricks.com>2016-07-04 01:57:45 +0800
commit88134e736829f5f93a82879c08cb191f175ff8af (patch)
treeb6795f4b148b595c2c1aedc2d61fd9f0bd04c130 /sql/catalyst/src/main
parent54b27c1797fcd32b3f3e9d44e1a149ae396a61e6 (diff)
downloadspark-88134e736829f5f93a82879c08cb191f175ff8af.tar.gz
spark-88134e736829f5f93a82879c08cb191f175ff8af.tar.bz2
spark-88134e736829f5f93a82879c08cb191f175ff8af.zip
[SPARK-16288][SQL] Implement inline table generating function
## What changes were proposed in this pull request? This PR implements `inline` table generating function. ## How was this patch tested? Pass the Jenkins tests with new testcase. Author: Dongjoon Hyun <dongjoon@apache.org> Closes #13976 from dongjoon-hyun/SPARK-16288.
Diffstat (limited to 'sql/catalyst/src/main')
-rw-r--r--sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala1
-rw-r--r--sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/generators.scala35
2 files changed, 36 insertions, 0 deletions
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
index e7f335f4fb..021bec7f5f 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
@@ -165,6 +165,7 @@ object FunctionRegistry {
expression[Explode]("explode"),
expression[Greatest]("greatest"),
expression[If]("if"),
+ expression[Inline]("inline"),
expression[IsNaN]("isnan"),
expression[IfNull]("ifnull"),
expression[IsNull]("isnull"),
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/generators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/generators.scala
index 4e91cc5aec..99b97c8ea2 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/generators.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/generators.scala
@@ -195,3 +195,38 @@ case class Explode(child: Expression) extends ExplodeBase(child, position = fals
extended = "> SELECT _FUNC_(array(10,20));\n 0\t10\n 1\t20")
// scalastyle:on line.size.limit
case class PosExplode(child: Expression) extends ExplodeBase(child, position = true)
+
+/**
+ * Explodes an array of structs into a table.
+ */
+@ExpressionDescription(
+ usage = "_FUNC_(a) - Explodes an array of structs into a table.",
+ extended = "> SELECT _FUNC_(array(struct(1, 'a'), struct(2, 'b')));\n [1,a]\n [2,b]")
+case class Inline(child: Expression) extends UnaryExpression with Generator with CodegenFallback {
+
+ override def children: Seq[Expression] = child :: Nil
+
+ override def checkInputDataTypes(): TypeCheckResult = child.dataType match {
+ case ArrayType(et, _) if et.isInstanceOf[StructType] =>
+ TypeCheckResult.TypeCheckSuccess
+ case _ =>
+ TypeCheckResult.TypeCheckFailure(
+ s"input to function $prettyName should be array of struct type, not ${child.dataType}")
+ }
+
+ override def elementSchema: StructType = child.dataType match {
+ case ArrayType(et : StructType, _) => et
+ }
+
+ private lazy val numFields = elementSchema.fields.length
+
+ override def eval(input: InternalRow): TraversableOnce[InternalRow] = {
+ val inputArray = child.eval(input).asInstanceOf[ArrayData]
+ if (inputArray == null) {
+ Nil
+ } else {
+ for (i <- 0 until inputArray.numElements())
+ yield inputArray.getStruct(i, numFields)
+ }
+ }
+}