diff options
author | Dongjoon Hyun <dongjoon@apache.org> | 2016-07-04 01:57:45 +0800 |
---|---|---|
committer | Wenchen Fan <wenchen@databricks.com> | 2016-07-04 01:57:45 +0800 |
commit | 88134e736829f5f93a82879c08cb191f175ff8af (patch) | |
tree | b6795f4b148b595c2c1aedc2d61fd9f0bd04c130 /sql/catalyst/src/main | |
parent | 54b27c1797fcd32b3f3e9d44e1a149ae396a61e6 (diff) | |
download | spark-88134e736829f5f93a82879c08cb191f175ff8af.tar.gz spark-88134e736829f5f93a82879c08cb191f175ff8af.tar.bz2 spark-88134e736829f5f93a82879c08cb191f175ff8af.zip |
[SPARK-16288][SQL] Implement inline table generating function
## What changes were proposed in this pull request?
This PR implements `inline` table generating function.
## How was this patch tested?
Pass the Jenkins tests with new testcase.
Author: Dongjoon Hyun <dongjoon@apache.org>
Closes #13976 from dongjoon-hyun/SPARK-16288.
Diffstat (limited to 'sql/catalyst/src/main')
-rw-r--r-- | sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala | 1 | ||||
-rw-r--r-- | sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/generators.scala | 35 |
2 files changed, 36 insertions, 0 deletions
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala index e7f335f4fb..021bec7f5f 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala @@ -165,6 +165,7 @@ object FunctionRegistry { expression[Explode]("explode"), expression[Greatest]("greatest"), expression[If]("if"), + expression[Inline]("inline"), expression[IsNaN]("isnan"), expression[IfNull]("ifnull"), expression[IsNull]("isnull"), diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/generators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/generators.scala index 4e91cc5aec..99b97c8ea2 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/generators.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/generators.scala @@ -195,3 +195,38 @@ case class Explode(child: Expression) extends ExplodeBase(child, position = fals extended = "> SELECT _FUNC_(array(10,20));\n 0\t10\n 1\t20") // scalastyle:on line.size.limit case class PosExplode(child: Expression) extends ExplodeBase(child, position = true) + +/** + * Explodes an array of structs into a table. + */ +@ExpressionDescription( + usage = "_FUNC_(a) - Explodes an array of structs into a table.", + extended = "> SELECT _FUNC_(array(struct(1, 'a'), struct(2, 'b')));\n [1,a]\n [2,b]") +case class Inline(child: Expression) extends UnaryExpression with Generator with CodegenFallback { + + override def children: Seq[Expression] = child :: Nil + + override def checkInputDataTypes(): TypeCheckResult = child.dataType match { + case ArrayType(et, _) if et.isInstanceOf[StructType] => + TypeCheckResult.TypeCheckSuccess + case _ => + TypeCheckResult.TypeCheckFailure( + s"input to function $prettyName should be array of struct type, not ${child.dataType}") + } + + override def elementSchema: StructType = child.dataType match { + case ArrayType(et : StructType, _) => et + } + + private lazy val numFields = elementSchema.fields.length + + override def eval(input: InternalRow): TraversableOnce[InternalRow] = { + val inputArray = child.eval(input).asInstanceOf[ArrayData] + if (inputArray == null) { + Nil + } else { + for (i <- 0 until inputArray.numElements()) + yield inputArray.getStruct(i, numFields) + } + } +} |