aboutsummaryrefslogtreecommitdiff
path: root/sql
diff options
context:
space:
mode:
authorWenchen Fan <cloud0fan@outlook.com>2015-05-13 12:47:48 -0700
committerReynold Xin <rxin@databricks.com>2015-05-13 12:47:48 -0700
commit213a6f30fee4a1c416ea76b678c71877fd36ef18 (patch)
tree10d0647738d67c28ebdb2f4748454430da778b21 /sql
parent7ff16e8abef9fbf4a4855e23c256b22e62e560a6 (diff)
downloadspark-213a6f30fee4a1c416ea76b678c71877fd36ef18.tar.gz
spark-213a6f30fee4a1c416ea76b678c71877fd36ef18.tar.bz2
spark-213a6f30fee4a1c416ea76b678c71877fd36ef18.zip
[SPARK-7551][DataFrame] support backticks for DataFrame attribute resolution
Author: Wenchen Fan <cloud0fan@outlook.com> Closes #6074 from cloud-fan/7551 and squashes the following commits: e6f579e [Wenchen Fan] allow space 2b86699 [Wenchen Fan] handle blank e218d99 [Wenchen Fan] address comments 54c4209 [Wenchen Fan] fix 7551
Diffstat (limited to 'sql')
-rw-r--r--sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala55
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala4
-rw-r--r--sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala27
3 files changed, 82 insertions, 4 deletions
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala
index dbb12d56f9..dba69659af 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala
@@ -105,7 +105,7 @@ abstract class LogicalPlan extends QueryPlan[LogicalPlan] with Logging {
}
/**
- * Optionally resolves the given string to a [[NamedExpression]] using the input from all child
+ * Optionally resolves the given strings to a [[NamedExpression]] using the input from all child
* nodes of this LogicalPlan. The attribute is expressed as
* a string in the following form: `[scope].AttributeName.[nested].[fields]...`.
*/
@@ -116,7 +116,7 @@ abstract class LogicalPlan extends QueryPlan[LogicalPlan] with Logging {
resolve(nameParts, children.flatMap(_.output), resolver, throwErrors)
/**
- * Optionally resolves the given string to a [[NamedExpression]] based on the output of this
+ * Optionally resolves the given strings to a [[NamedExpression]] based on the output of this
* LogicalPlan. The attribute is expressed as a string in the following form:
* `[scope].AttributeName.[nested].[fields]...`.
*/
@@ -127,6 +127,57 @@ abstract class LogicalPlan extends QueryPlan[LogicalPlan] with Logging {
resolve(nameParts, output, resolver, throwErrors)
/**
+ * Given an attribute name, split it to name parts by dot, but
+ * don't split the name parts quoted by backticks, for example,
+ * `ab.cd`.`efg` should be split into two parts "ab.cd" and "efg".
+ */
+ def resolveQuoted(
+ name: String,
+ resolver: Resolver): Option[NamedExpression] = {
+ resolve(parseAttributeName(name), resolver, true)
+ }
+
+ /**
+ * Internal method, used to split attribute name by dot with backticks rule.
+ * Backticks must appear in pairs, and the quoted string must be a complete name part,
+ * which means `ab..c`e.f is not allowed.
+ * Escape character is not supported now, so we can't use backtick inside name part.
+ */
+ private def parseAttributeName(name: String): Seq[String] = {
+ val e = new AnalysisException(s"syntax error in attribute name: $name")
+ val nameParts = scala.collection.mutable.ArrayBuffer.empty[String]
+ val tmp = scala.collection.mutable.ArrayBuffer.empty[Char]
+ var inBacktick = false
+ var i = 0
+ while (i < name.length) {
+ val char = name(i)
+ if (inBacktick) {
+ if (char == '`') {
+ inBacktick = false
+ if (i + 1 < name.length && name(i + 1) != '.') throw e
+ } else {
+ tmp += char
+ }
+ } else {
+ if (char == '`') {
+ if (tmp.nonEmpty) throw e
+ inBacktick = true
+ } else if (char == '.') {
+ if (tmp.isEmpty) throw e
+ nameParts += tmp.mkString
+ tmp.clear()
+ } else {
+ tmp += char
+ }
+ }
+ i += 1
+ }
+ if (tmp.isEmpty || inBacktick) throw e
+ nameParts += tmp.mkString
+ nameParts.toSeq
+ }
+
+ /**
* Resolve the given `name` string against the given attribute, returning either 0 or 1 match.
*
* This assumes `name` has multiple parts, where the 1st part is a qualifier
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
index c820a67357..4fd5105c27 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
@@ -160,7 +160,7 @@ class DataFrame private[sql](
}
protected[sql] def resolve(colName: String): NamedExpression = {
- queryExecution.analyzed.resolve(colName.split("\\."), sqlContext.analyzer.resolver).getOrElse {
+ queryExecution.analyzed.resolveQuoted(colName, sqlContext.analyzer.resolver).getOrElse {
throw new AnalysisException(
s"""Cannot resolve column name "$colName" among (${schema.fieldNames.mkString(", ")})""")
}
@@ -168,7 +168,7 @@ class DataFrame private[sql](
protected[sql] def numericColumns: Seq[Expression] = {
schema.fields.filter(_.dataType.isInstanceOf[NumericType]).map { n =>
- queryExecution.analyzed.resolve(n.name.split("\\."), sqlContext.analyzer.resolver).get
+ queryExecution.analyzed.resolveQuoted(n.name, sqlContext.analyzer.resolver).get
}
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
index 52aa1f6558..1d5f6b3aad 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
@@ -459,6 +459,33 @@ class DataFrameSuite extends QueryTest {
assert(complexData.filter(complexData("m")(complexData("s")("value")) === 1).count() == 1)
}
+ test("SPARK-7551: support backticks for DataFrame attribute resolution") {
+ val df = TestSQLContext.jsonRDD(TestSQLContext.sparkContext.makeRDD(
+ """{"a.b": {"c": {"d..e": {"f": 1}}}}""" :: Nil))
+ checkAnswer(
+ df.select(df("`a.b`.c.`d..e`.`f`")),
+ Row(1)
+ )
+
+ val df2 = TestSQLContext.jsonRDD(TestSQLContext.sparkContext.makeRDD(
+ """{"a b": {"c": {"d e": {"f": 1}}}}""" :: Nil))
+ checkAnswer(
+ df2.select(df2("`a b`.c.d e.f")),
+ Row(1)
+ )
+
+ def checkError(testFun: => Unit): Unit = {
+ val e = intercept[org.apache.spark.sql.AnalysisException] {
+ testFun
+ }
+ assert(e.getMessage.contains("syntax error in attribute name:"))
+ }
+ checkError(df("`abc.`c`"))
+ checkError(df("`abc`..d"))
+ checkError(df("`a`.b."))
+ checkError(df("`a.b`.c.`d"))
+ }
+
test("SPARK-7324 dropDuplicates") {
val testData = TestSQLContext.sparkContext.parallelize(
(2, 1, 2) :: (1, 1, 1) ::