author    Reynold Xin <rxin@databricks.com>  2015-02-02 19:01:47 -0800
committer Reynold Xin <rxin@databricks.com>  2015-02-02 19:01:47 -0800
commit    554403fd913685da879cf6a280c58a9fad19448a (patch)
tree      b3a63382e7385fa1480b54707b348b0bde02190d /sql/hive
parent    eccb9fbb2d1bf6f7c65fb4f017e9205bb3034ec6 (diff)
[SQL] Improve DataFrame API error reporting
1. Throw UnsupportedOperationException if a Column is not computable.
2. Perform eager analysis on DataFrame so we can catch errors when they happen (not when an action is run).

Author: Reynold Xin <rxin@databricks.com>
Author: Davies Liu <davies@databricks.com>

Closes #4296 from rxin/col-computability and squashes the following commits:

6527b86 [Reynold Xin] Merge pull request #8 from davies/col-computability
fd92bc7 [Reynold Xin] Merge branch 'master' into col-computability
f79034c [Davies Liu] fix python tests
5afe1ff [Reynold Xin] Fix scala test.
17f6bae [Reynold Xin] Various fixes.
b932e86 [Reynold Xin] Added eager analysis for error reporting.
e6f00b8 [Reynold Xin] [SQL][API] ComputableColumn vs IncomputableColumn
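The second point above is the heart of the patch: analysis runs eagerly when a DataFrame is constructed, so a malformed query fails at the line that builds it rather than later, when an action forces execution. A minimal, self-contained Scala sketch of that pattern follows; every name in it (LogicalPlan, analyze, DataFrame, AnalysisException) is an illustrative stand-in, not Spark's actual internals.

object EagerAnalysisSketch {
  case class LogicalPlan(references: Seq[String])

  class AnalysisException(msg: String) extends RuntimeException(msg)

  // Hypothetical "analyzer": checks that every referenced column exists.
  def analyze(plan: LogicalPlan, knownColumns: Set[String]): Unit =
    plan.references.find(c => !knownColumns.contains(c)).foreach { missing =>
      throw new AnalysisException(s"cannot resolve column: $missing")
    }

  // Private constructor forces all construction through the factory below.
  class DataFrame private (val plan: LogicalPlan)

  object DataFrame {
    val knownColumns = Set("id", "name")

    // Factory method: analysis happens eagerly, so the caller sees the
    // failure at the line that built the bad plan, not at action time.
    def apply(plan: LogicalPlan): DataFrame = {
      analyze(plan, knownColumns)
      new DataFrame(plan)
    }
  }

  def main(args: Array[String]): Unit = {
    DataFrame(LogicalPlan(Seq("id")))          // fine
    DataFrame(LogicalPlan(Seq("no_such_col"))) // throws AnalysisException here
  }
}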
Diffstat (limited to 'sql/hive')
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala     3
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala  13
2 files changed, 9 insertions(+), 7 deletions(-)
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
index b746942cb1..5efc3b1e30 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
@@ -72,7 +72,8 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) {
     if (conf.dialect == "sql") {
       super.sql(substituted)
     } else if (conf.dialect == "hiveql") {
-      new DataFrame(this, ddlParser(sqlText, false).getOrElse(HiveQl.parseSql(substituted)))
+      DataFrame(this,
+        ddlParser(sqlText, exceptionOnError = false).getOrElse(HiveQl.parseSql(substituted)))
     } else {
       sys.error(s"Unsupported SQL dialect: ${conf.dialect}. Try 'sql' or 'hiveql'")
     }
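Two details of the hunk above are worth noting. First, construction now goes through the DataFrame companion-object factory rather than new DataFrame(...), so it passes through the eager-analysis entry point this patch introduces. Second, the bare boolean handed to ddlParser becomes the named argument exceptionOnError = false, which documents intent at the call site. A tiny sketch of the named-argument point, using a hypothetical parse method rather than Spark's ddlParser:

object NamedArgSketch extends App {
  // Hypothetical parser: returns None on failure unless asked to throw.
  def parse(sql: String, exceptionOnError: Boolean): Option[String] =
    if (sql.trim.nonEmpty) Some(sql)
    else if (exceptionOnError) sys.error("empty query")
    else None

  parse("SELECT 1", false)                    // compiles, but the boolean is opaque
  parse("SELECT 1", exceptionOnError = false) // same call, self-documenting
}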
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
index 83244ce1e3..fa997288a2 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
@@ -17,10 +17,12 @@

 package org.apache.spark.sql.hive

+import org.apache.spark.sql.catalyst.expressions.Row
+
 import scala.collection.JavaConversions._

 import org.apache.spark.annotation.Experimental
-import org.apache.spark.sql.{Column, DataFrame, SQLContext, Strategy}
+import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.expressions.codegen.GeneratePredicate
@@ -29,7 +31,6 @@ import org.apache.spark.sql.catalyst.plans._
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.execution.{DescribeCommand => RunnableDescribeCommand}
 import org.apache.spark.sql.execution._
-import org.apache.spark.sql.hive
 import org.apache.spark.sql.hive.execution._
 import org.apache.spark.sql.parquet.ParquetRelation
 import org.apache.spark.sql.sources.CreateTableUsing
@@ -56,14 +57,14 @@ private[hive] trait HiveStrategies {
   @Experimental
   object ParquetConversion extends Strategy {
     implicit class LogicalPlanHacks(s: DataFrame) {
-      def lowerCase = new DataFrame(s.sqlContext, s.logicalPlan)
+      def lowerCase = DataFrame(s.sqlContext, s.logicalPlan)

       def addPartitioningAttributes(attrs: Seq[Attribute]) = {
         // Don't add the partitioning key if it's already present in the data.
         if (attrs.map(_.name).toSet.subsetOf(s.logicalPlan.output.map(_.name).toSet)) {
           s
         } else {
-          new DataFrame(
+          DataFrame(
             s.sqlContext,
             s.logicalPlan transform {
               case p: ParquetRelation => p.copy(partitioningAttributes = attrs)
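LogicalPlanHacks above is Scala's implicit-class enrichment pattern: it bolts helper methods onto DataFrame without modifying the class itself. A self-contained sketch of the pattern, with an illustrative Doc type standing in for DataFrame:

object ImplicitClassSketch extends App {
  case class Doc(title: String)

  // Enrichment: adds lowerCase to Doc without touching its definition.
  implicit class DocOps(private val d: Doc) {
    def lowerCase: Doc = d.copy(title = d.title.toLowerCase)
  }

  println(Doc("HiveStrategies").lowerCase) // Doc(hivestrategies)
}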
@@ -96,13 +97,13 @@ private[hive] trait HiveStrategies {
         // We are going to throw the predicates and projection back at the whole optimization
         // sequence so let's unresolve all the attributes, allowing them to be rebound to the
         // matching parquet attributes.
-        val unresolvedOtherPredicates = new Column(otherPredicates.map(_ transform {
+        val unresolvedOtherPredicates = Column(otherPredicates.map(_ transform {
           case a: AttributeReference => UnresolvedAttribute(a.name)
         }).reduceOption(And).getOrElse(Literal(true)))

         val unresolvedProjection: Seq[Column] = projectList.map(_ transform {
           case a: AttributeReference => UnresolvedAttribute(a.name)
-        }).map(new Column(_))
+        }).map(Column(_))

         try {
           if (relation.hiveQlTable.isPartitioned) {
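The hunk above rewrites resolved AttributeReferences back into UnresolvedAttributes so the predicates and projection can be re-analyzed against the Parquet relation's own attributes. A sketch of that kind of bottom-up tree rewrite, with toy expression types standing in for Catalyst's:

object UnresolveSketch extends App {
  sealed trait Expr
  case class Resolved(name: String, id: Int) extends Expr
  case class Unresolved(name: String) extends Expr
  case class And(left: Expr, right: Expr) extends Expr

  // Bottom-up rewrite, analogous in spirit to Catalyst's `transform`:
  // recurse into children first, then apply the partial function if defined.
  def transform(e: Expr)(pf: PartialFunction[Expr, Expr]): Expr = {
    val rewritten = e match {
      case And(l, r) => And(transform(l)(pf), transform(r)(pf))
      case leaf      => leaf
    }
    if (pf.isDefinedAt(rewritten)) pf(rewritten) else rewritten
  }

  val pred = And(Resolved("year", 1), Resolved("month", 2))
  println(transform(pred) { case Resolved(n, _) => Unresolved(n) })
  // prints: And(Unresolved(year),Unresolved(month))
}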