author     gatorsmile <gatorsmile@gmail.com>  2016-03-16 13:11:11 -0700
committer  Yin Huai <yhuai@databricks.com>    2016-03-16 13:11:11 -0700
commit     c4bd57602c0b14188d364bb475631bf473d25082 (patch)
tree       d5c081e53719b8305f1fcb0061b2454462fb3d25
parent     1d1de28a3c3c3a4bc37dc7565b9178a712df493a (diff)
[SPARK-12721][SQL] SQL Generation for Script Transformation
#### What changes were proposed in this pull request?

This PR adds SQL generation for analyzed logical plans that contain the operator `ScriptTransformation`. For example, below is SQL containing `Transform`:

```
SELECT TRANSFORM (a, b, c, d) USING 'cat' FROM parquet_t2
```

Its logical plan looks like:

```
ScriptTransformation [a#210L,b#211L,c#212L,d#213L], cat, [key#208,value#209], HiveScriptIOSchema(List(),List(),Some(org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe),Some(org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe),List((field.delim, )),List((field.delim, )),Some(org.apache.hadoop.hive.ql.exec.TextRecordReader),Some(org.apache.hadoop.hive.ql.exec.TextRecordWriter),true)
+- SubqueryAlias parquet_t2
   +- Relation[a#210L,b#211L,c#212L,d#213L] ParquetRelation
```

The generated SQL will be like:

```
SELECT TRANSFORM (`parquet_t2`.`a`, `parquet_t2`.`b`, `parquet_t2`.`c`, `parquet_t2`.`d`)
USING 'cat' AS (`key` string, `value` string)
FROM `default`.`parquet_t2`
```

#### How was this patch tested?

Seven test cases are added to `LogicalPlanToSQLSuite`.

Author: gatorsmile <gatorsmile@gmail.com>
Author: xiaoli <lixiao1983@gmail.com>
Author: Xiao Li <xiaoli@Xiaos-MacBook-Pro.local>

Closes #11503 from gatorsmile/transformToSQL.
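For context, here is a minimal sketch of how this generation can be exercised end to end. It assumes a HiveContext with `parquet_t2` registered and uses the `SQLBuilder(DataFrame)` convenience constructor that also appears in `LogicalPlanToSQLSuite` below; treat it as an illustration, not part of this patch:

```scala
import org.apache.spark.sql.hive.SQLBuilder

// Analyze a query containing a TRANSFORM clause...
val df = sqlContext.sql("SELECT TRANSFORM (a, b, c, d) USING 'cat' FROM parquet_t2")

// ...and regenerate SQL from its analyzed logical plan. ScriptTransformation
// nodes are now handled by the new scriptTransformationToSQL case; plans this
// builder cannot handle still raise UnsupportedOperationException.
val regenerated: String = new SQLBuilder(df).toSQL
// e.g. SELECT TRANSFORM (`parquet_t2`.`a`, ...) USING 'cat'
//      AS (`key` string, `value` string) FROM `default`.`parquet_t2`
```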
 sql/hive/src/main/scala/org/apache/spark/sql/hive/SQLBuilder.scala                     | 29 +
 sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformation.scala | 48 +
 sql/hive/src/test/scala/org/apache/spark/sql/hive/LogicalPlanToSQLSuite.scala          | 57 +
 3 files changed, 134 insertions(+), 0 deletions(-)
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/SQLBuilder.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/SQLBuilder.scala
index 3bc8e9a5e0..f3446a364b 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/SQLBuilder.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/SQLBuilder.scala
@@ -31,6 +31,7 @@ import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.rules.{Rule, RuleExecutor}
import org.apache.spark.sql.catalyst.util.quoteIdentifier
import org.apache.spark.sql.execution.datasources.LogicalRelation
+import org.apache.spark.sql.hive.execution.HiveScriptIOSchema
import org.apache.spark.sql.types.{ByteType, DataType, IntegerType, NullType}
/**
@@ -184,6 +185,9 @@ class SQLBuilder(logicalPlan: LogicalPlan, sqlContext: SQLContext) extends Loggi
p.partitionExpressions.map(_.sql).mkString(", ")
)
+ case p: ScriptTransformation =>
+ scriptTransformationToSQL(p)
+
case OneRowRelation =>
""
@@ -209,6 +213,31 @@ class SQLBuilder(logicalPlan: LogicalPlan, sqlContext: SQLContext) extends Loggi
)
}
+ private def scriptTransformationToSQL(plan: ScriptTransformation): String = {
+ val ioSchema = plan.ioschema.asInstanceOf[HiveScriptIOSchema]
+ val inputRowFormatSQL = ioSchema.inputRowFormatSQL.getOrElse(
+ throw new UnsupportedOperationException(
+ s"unsupported row format ${ioSchema.inputRowFormat}"))
+ val outputRowFormatSQL = ioSchema.outputRowFormatSQL.getOrElse(
+ throw new UnsupportedOperationException(
+ s"unsupported row format ${ioSchema.outputRowFormat}"))
+
+ val outputSchema = plan.output.map { attr =>
+ s"${attr.sql} ${attr.dataType.simpleString}"
+ }.mkString(", ")
+
+ build(
+ "SELECT TRANSFORM",
+ "(" + plan.input.map(_.sql).mkString(", ") + ")",
+ inputRowFormatSQL,
+ s"USING \'${plan.script}\'",
+ "AS (" + outputSchema + ")",
+ outputRowFormatSQL,
+ if (plan.child == OneRowRelation) "" else "FROM",
+ toSQL(plan.child)
+ )
+ }
+
private def aggregateToSQL(plan: Aggregate): String = {
val groupingSQL = plan.groupingExpressions.map(_.sql).mkString(", ")
build(
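One detail worth noting outside the hunk above: `scriptTransformationToSQL` passes empty strings to `build` (for a schemaless row format, or when the child is `OneRowRelation`). That is safe only if `build` joins non-blank segments; a sketch of its assumed contract (the helper exists in SQLBuilder.scala but is not shown in this diff):

```scala
// Assumed behavior of the existing build(...) helper: trim each SQL fragment
// and drop the blank ones, so optional clauses can simply be passed as "".
private def build(segments: String*): String =
  segments.map(_.trim).filter(_.nonEmpty).mkString(" ")
```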
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformation.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformation.scala
index 3b53716898..62e7c1223c 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformation.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformation.scala
@@ -400,4 +400,52 @@ case class HiveScriptIOSchema (
instance
}
}
+
+ def inputRowFormatSQL: Option[String] =
+ getRowFormatSQL(inputRowFormat, inputSerdeClass, inputSerdeProps)
+
+ def outputRowFormatSQL: Option[String] =
+ getRowFormatSQL(outputRowFormat, outputSerdeClass, outputSerdeProps)
+
+ /**
+ * Get the row format specification
+ * Note:
+ * 1. Changes are needed when readerClause and writerClause are supported.
+ * 2. Changes are needed when "ESCAPED BY" is supported.
+ */
+ private def getRowFormatSQL(
+ rowFormat: Seq[(String, String)],
+ serdeClass: Option[String],
+ serdeProps: Seq[(String, String)]): Option[String] = {
+ if (schemaLess) return Some("")
+
+ val rowFormatDelimited =
+ rowFormat.map {
+ case ("TOK_TABLEROWFORMATFIELD", value) =>
+ "FIELDS TERMINATED BY " + value
+ case ("TOK_TABLEROWFORMATCOLLITEMS", value) =>
+ "COLLECTION ITEMS TERMINATED BY " + value
+ case ("TOK_TABLEROWFORMATMAPKEYS", value) =>
+ "MAP KEYS TERMINATED BY " + value
+ case ("TOK_TABLEROWFORMATLINES", value) =>
+ "LINES TERMINATED BY " + value
+ case ("TOK_TABLEROWFORMATNULL", value) =>
+ "NULL DEFINED AS " + value
+ case o => return None
+ }
+
+ val serdeClassSQL = serdeClass.map("'" + _ + "'").getOrElse("")
+ val serdePropsSQL =
+ if (serdeClass.nonEmpty) {
+ val props = serdeProps.map{p => s"'${p._1}' = '${p._2}'"}.mkString(", ")
+ if (props.nonEmpty) " WITH SERDEPROPERTIES(" + props + ")" else ""
+ } else {
+ ""
+ }
+ if (rowFormat.nonEmpty) {
+ Some("ROW FORMAT DELIMITED " + rowFormatDelimited.mkString(" "))
+ } else {
+ Some("ROW FORMAT SERDE " + serdeClassSQL + serdePropsSQL)
+ }
+ }
}
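For illustration, the token-to-clause mapping in `getRowFormatSQL` can be read as a standalone function. A minimal sketch with a hypothetical name (`rowFormatClause`), covering only the DELIMITED branch of the logic above:

```scala
// Maps Hive AST row-format tokens to their ROW FORMAT DELIMITED clauses;
// any unrecognized token aborts SQL generation by returning None.
def rowFormatClause(rowFormat: Seq[(String, String)]): Option[String] = {
  val clauses = rowFormat.map {
    case ("TOK_TABLEROWFORMATFIELD", v)     => s"FIELDS TERMINATED BY $v"
    case ("TOK_TABLEROWFORMATCOLLITEMS", v) => s"COLLECTION ITEMS TERMINATED BY $v"
    case ("TOK_TABLEROWFORMATMAPKEYS", v)   => s"MAP KEYS TERMINATED BY $v"
    case ("TOK_TABLEROWFORMATLINES", v)     => s"LINES TERMINATED BY $v"
    case ("TOK_TABLEROWFORMATNULL", v)      => s"NULL DEFINED AS $v"
    case _                                  => return None // unsupported token
  }
  Some("ROW FORMAT DELIMITED " + clauses.mkString(" "))
}

// rowFormatClause(Seq(("TOK_TABLEROWFORMATFIELD", "','")))
//   == Some("ROW FORMAT DELIMITED FIELDS TERMINATED BY ','")
```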
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/LogicalPlanToSQLSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/LogicalPlanToSQLSuite.scala
index f02ecb48d5..ca46c229f1 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/LogicalPlanToSQLSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/LogicalPlanToSQLSuite.scala
@@ -383,6 +383,63 @@ class LogicalPlanToSQLSuite extends SQLBuilderTest with SQLTestUtils {
}
}
+ test("script transformation - schemaless") {
+ checkHiveQl("SELECT TRANSFORM (a, b, c, d) USING 'cat' FROM parquet_t2")
+ checkHiveQl("SELECT TRANSFORM (*) USING 'cat' FROM parquet_t2")
+ }
+
+ test("script transformation - alias list") {
+ checkHiveQl("SELECT TRANSFORM (a, b, c, d) USING 'cat' AS (d1, d2, d3, d4) FROM parquet_t2")
+ }
+
+ test("script transformation - alias list with type") {
+ checkHiveQl(
+ """FROM
+ |(FROM parquet_t1 SELECT TRANSFORM(key, value) USING 'cat' AS (thing1 int, thing2 string)) t
+ |SELECT thing1 + 1
+ """.stripMargin)
+ }
+
+ test("script transformation - row format delimited clause with only one format property") {
+ checkHiveQl(
+ """SELECT TRANSFORM (key) ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'
+ |USING 'cat' AS (tKey) ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'
+ |FROM parquet_t1
+ """.stripMargin)
+ }
+
+ test("script transformation - row format delimited clause with multiple format properties") {
+ checkHiveQl(
+ """SELECT TRANSFORM (key)
+ |ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t' LINES TERMINATED BY '\t'
+ |USING 'cat' AS (tKey)
+ |ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t' LINES TERMINATED BY '\t'
+ |FROM parquet_t1
+ """.stripMargin)
+ }
+
+ test("script transformation - row format serde clauses with SERDEPROPERTIES") {
+ checkHiveQl(
+ """SELECT TRANSFORM (key, value)
+ |ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe'
+ |WITH SERDEPROPERTIES('field.delim' = '|')
+ |USING 'cat' AS (tKey, tValue)
+ |ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe'
+ |WITH SERDEPROPERTIES('field.delim' = '|')
+ |FROM parquet_t1
+ """.stripMargin)
+ }
+
+ test("script transformation - row format serde clauses without SERDEPROPERTIES") {
+ checkHiveQl(
+ """SELECT TRANSFORM (key, value)
+ |ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe'
+ |USING 'cat' AS (tKey, tValue)
+ |ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe'
+ |FROM parquet_t1
+ """.stripMargin)
+ }
+
test("plans with non-SQL expressions") {
sqlContext.udf.register("foo", (_: Int) * 2)
intercept[UnsupportedOperationException](new SQLBuilder(sql("SELECT foo(id) FROM t0")).toSQL)