diff options
author | Michael Armbrust <michael@databricks.com> | 2015-02-16 12:32:56 -0800 |
---|---|---|
committer | Michael Armbrust <michael@databricks.com> | 2015-02-16 12:32:56 -0800 |
commit | 104b2c45805ce0a9c86e2823f402de6e9f0aee81 (patch) | |
tree | af1fbfad7997e5560c730f5ff3058d91cd085c5a /sql/catalyst | |
parent | 275a0c08134dea1896eab73a8e017256900fb1db (diff) | |
download | spark-104b2c45805ce0a9c86e2823f402de6e9f0aee81.tar.gz spark-104b2c45805ce0a9c86e2823f402de6e9f0aee81.tar.bz2 spark-104b2c45805ce0a9c86e2823f402de6e9f0aee81.zip |
[SQL] Initial support for reporting location of error in sql string
Author: Michael Armbrust <michael@databricks.com>
Closes #4587 from marmbrus/position and squashes the following commits:
0810052 [Michael Armbrust] fix tests
395c019 [Michael Armbrust] Merge remote-tracking branch 'marmbrus/position' into position
e155dce [Michael Armbrust] more errors
f3efa51 [Michael Armbrust] Update AnalysisException.scala
d45ff60 [Michael Armbrust] [SQL] Initial support for reporting location of error in sql string
Diffstat (limited to 'sql/catalyst')
6 files changed, 111 insertions, 15 deletions
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/AnalysisException.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/AnalysisException.scala index 871d560b9d..15add84878 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/AnalysisException.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/AnalysisException.scala @@ -17,7 +17,22 @@ package org.apache.spark.sql +import org.apache.spark.annotation.DeveloperApi + /** + * :: DeveloperApi :: * Thrown when a query fails to analyze, usually because the query itself is invalid. */ -class AnalysisException(message: String) extends Exception(message) with Serializable +@DeveloperApi +class AnalysisException protected[sql] ( + val message: String, + val line: Option[Int] = None, + val startPosition: Option[Int] = None) + extends Exception with Serializable { + + override def getMessage: String = { + val lineAnnotation = line.map(l => s" line $l").getOrElse("") + val positionAnnotation = startPosition.map(p => s" pos $p").getOrElse("") + s"$message;$lineAnnotation$positionAnnotation" + } +} diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala index 58a7003977..aa4320bd58 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala @@ -85,7 +85,7 @@ class Analyzer(catalog: Catalog, operator transformExpressionsUp { case a: Attribute if !a.resolved => val from = operator.inputSet.map(_.name).mkString(", ") - failAnalysis(s"cannot resolve '${a.prettyString}' given input columns $from") + a.failAnalysis(s"cannot resolve '${a.prettyString}' given input columns $from") case c: Cast if !c.resolved => failAnalysis( @@ -246,12 +246,21 @@ class Analyzer(catalog: Catalog, * Replaces [[UnresolvedRelation]]s with concrete relations from the catalog. */ object ResolveRelations extends Rule[LogicalPlan] { + def getTable(u: UnresolvedRelation) = { + try { + catalog.lookupRelation(u.tableIdentifier, u.alias) + } catch { + case _: NoSuchTableException => + u.failAnalysis(s"no such table ${u.tableIdentifier}") + } + } + def apply(plan: LogicalPlan): LogicalPlan = plan transform { - case i @ InsertIntoTable(UnresolvedRelation(tableIdentifier, alias), _, _, _) => + case i @ InsertIntoTable(u: UnresolvedRelation, _, _, _) => i.copy( - table = EliminateSubQueries(catalog.lookupRelation(tableIdentifier, alias))) - case UnresolvedRelation(tableIdentifier, alias) => - catalog.lookupRelation(tableIdentifier, alias) + table = EliminateSubQueries(getTable(u))) + case u: UnresolvedRelation => + getTable(u) } } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Catalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Catalog.scala index f57eab2460..bf97215ee6 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Catalog.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Catalog.scala @@ -22,6 +22,12 @@ import scala.collection.mutable import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Subquery} /** + * Thrown by a catalog when a table cannot be found. The analzyer will rethrow the exception + * as an AnalysisException with the correct position information. + */ +class NoSuchTableException extends Exception + +/** * An interface for looking up relations by name. Used by an [[Analyzer]]. */ trait Catalog { diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/package.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/package.scala index 5dc9d0e566..e95f19e69e 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/package.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/package.scala @@ -17,6 +17,9 @@ package org.apache.spark.sql.catalyst +import org.apache.spark.sql.AnalysisException +import org.apache.spark.sql.catalyst.trees.TreeNode + /** * Provides a logical query plan [[Analyzer]] and supporting classes for performing analysis. * Analysis consists of translating [[UnresolvedAttribute]]s and [[UnresolvedRelation]]s @@ -32,4 +35,11 @@ package object analysis { val caseInsensitiveResolution = (a: String, b: String) => a.equalsIgnoreCase(b) val caseSensitiveResolution = (a: String, b: String) => a == b + + implicit class AnalysisErrorAt(t: TreeNode[_]) { + /** Fails the analysis at the point where a specific tree node was parsed. */ + def failAnalysis(msg: String) = { + throw new AnalysisException(msg, t.origin.line, t.origin.startPosition) + } + } } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala index e0930b056d..109671bdca 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala @@ -22,9 +22,42 @@ import org.apache.spark.sql.catalyst.errors._ /** Used by [[TreeNode.getNodeNumbered]] when traversing the tree for a given number */ private class MutableInt(var i: Int) +case class Origin( + line: Option[Int] = None, + startPosition: Option[Int] = None) + +/** + * Provides a location for TreeNodes to ask about the context of their origin. For example, which + * line of code is currently being parsed. + */ +object CurrentOrigin { + private val value = new ThreadLocal[Origin]() { + override def initialValue: Origin = Origin() + } + + def get = value.get() + def set(o: Origin) = value.set(o) + + def reset() = value.set(Origin()) + + def setPosition(line: Int, start: Int) = { + value.set( + value.get.copy(line = Some(line), startPosition = Some(start))) + } + + def withOrigin[A](o: Origin)(f: => A): A = { + set(o) + val ret = try f finally { reset() } + reset() + ret + } +} + abstract class TreeNode[BaseType <: TreeNode[BaseType]] { self: BaseType with Product => + val origin = CurrentOrigin.get + /** Returns a Seq of the children of this node */ def children: Seq[BaseType] @@ -150,7 +183,10 @@ abstract class TreeNode[BaseType <: TreeNode[BaseType]] { * @param rule the function used to transform this nodes children */ def transformDown(rule: PartialFunction[BaseType, BaseType]): BaseType = { - val afterRule = rule.applyOrElse(this, identity[BaseType]) + val afterRule = CurrentOrigin.withOrigin(origin) { + rule.applyOrElse(this, identity[BaseType]) + } + // Check if unchanged and then possibly return old copy to avoid gc churn. if (this fastEquals afterRule) { transformChildrenDown(rule) @@ -210,9 +246,13 @@ abstract class TreeNode[BaseType <: TreeNode[BaseType]] { def transformUp(rule: PartialFunction[BaseType, BaseType]): BaseType = { val afterRuleOnChildren = transformChildrenUp(rule); if (this fastEquals afterRuleOnChildren) { - rule.applyOrElse(this, identity[BaseType]) + CurrentOrigin.withOrigin(origin) { + rule.applyOrElse(this, identity[BaseType]) + } } else { - rule.applyOrElse(afterRuleOnChildren, identity[BaseType]) + CurrentOrigin.withOrigin(origin) { + rule.applyOrElse(afterRuleOnChildren, identity[BaseType]) + } } } @@ -268,12 +308,14 @@ abstract class TreeNode[BaseType <: TreeNode[BaseType]] { */ def makeCopy(newArgs: Array[AnyRef]): this.type = attachTree(this, "makeCopy") { try { - // Skip no-arg constructors that are just there for kryo. - val defaultCtor = getClass.getConstructors.find(_.getParameterTypes.size != 0).head - if (otherCopyArgs.isEmpty) { - defaultCtor.newInstance(newArgs: _*).asInstanceOf[this.type] - } else { - defaultCtor.newInstance((newArgs ++ otherCopyArgs).toArray: _*).asInstanceOf[this.type] + CurrentOrigin.withOrigin(origin) { + // Skip no-arg constructors that are just there for kryo. + val defaultCtor = getClass.getConstructors.find(_.getParameterTypes.size != 0).head + if (otherCopyArgs.isEmpty) { + defaultCtor.newInstance(newArgs: _*).asInstanceOf[this.type] + } else { + defaultCtor.newInstance((newArgs ++ otherCopyArgs).toArray: _*).asInstanceOf[this.type] + } } } catch { case e: java.lang.IllegalArgumentException => diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/trees/TreeNodeSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/trees/TreeNodeSuite.scala index cdb843f959..e7ce92a216 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/trees/TreeNodeSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/trees/TreeNodeSuite.scala @@ -104,4 +104,18 @@ class TreeNodeSuite extends FunSuite { assert(actual === Dummy(None)) } + test("preserves origin") { + CurrentOrigin.setPosition(1,1) + val add = Add(Literal(1), Literal(1)) + CurrentOrigin.reset() + + val transformed = add transform { + case Literal(1, _) => Literal(2) + } + + assert(transformed.origin.line.isDefined) + assert(transformed.origin.startPosition.isDefined) + } + + } |