author     Jacky Li <jacky.likun@huawei.com>          2015-05-08 15:25:54 -0700
committer  Michael Armbrust <michael@databricks.com>  2015-05-08 15:25:54 -0700
commit     6dad76e5eba3c2925bfc9d142f31f7c2dc649886 (patch)
tree       264c07242b5a7b555a7f58b14c2361f2df49db1d /sql
parent     90527f560462cc2d693176bd961b02767e460e06 (diff)
[SPARK-4699] [SQL] Make caseSensitive configurable in spark sql analyzer
based on #3558

Author: Jacky Li <jacky.likun@huawei.com>
Author: wangfei <wangfei1@huawei.com>
Author: scwf <wangfei1@huawei.com>

Closes #5806 from scwf/case and squashes the following commits:

cd51712 [wangfei] fix compile
d4b724f [wangfei] address michael's comment
af512c7 [wangfei] fix conflicts
4ef1be7 [wangfei] fix conflicts
269cf21 [scwf] fix conflicts
b73df6c [scwf] style issue
9e11752 [scwf] improve SimpleCatalystConf
b35529e [scwf] minor style
a3f7659 [scwf] remove unsed imports
2a56515 [scwf] fix conflicts
6db4bf5 [scwf] also fix for HiveContext
7fc4a98 [scwf] fix test case
d5a9933 [wangfei] fix style
eee75ba [wangfei] fix EmptyConf
6ef31cf [wangfei] revert pom changes
5d7c456 [wangfei] set CASE_SENSITIVE false in TestHive
966e719 [wangfei] set CASE_SENSITIVE false in hivecontext
fd30e25 [wangfei] added override
69b3b70 [wangfei] fix AnalysisSuite
5472b08 [wangfei] fix compile issue
56034ca [wangfei] fix conflicts and improve for catalystconf
664d1e9 [Jacky Li] Merge branch 'master' of https://github.com/apache/spark into case
12eca9a [Jacky Li] solve conflict with master
39e369c [Jacky Li] fix confilct after DataFrame PR
dee56e9 [Jacky Li] fix test case failure
05b09a3 [Jacky Li] fix conflict base on the latest master branch
73c16b1 [Jacky Li] fix bug in sql/hive
9bf4cc7 [Jacky Li] fix bug in catalyst
005c56d [Jacky Li] make SQLContext caseSensitivity configurable
6332e0f [Jacky Li] fix bug
fcbf0d9 [Jacky Li] fix scalastyle check
e7bca31 [Jacky Li] make caseSensitive configuration in Analyzer and Catalog
91b1b96 [Jacky Li] make caseSensitive configurable in Analyzer
f57f15c [Jacky Li] add testcase
578d167 [Jacky Li] make caseSensitive configurable
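With this patch, case sensitivity is driven by the new spark.sql.caseSensitive conf key instead of a hard-coded constructor flag. A minimal shell-style Scala sketch of the intended usage, mirroring the new test added to SQLQuerySuite below (the SparkContext setup is illustrative boilerplate, not part of the patch):

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

val sc = new SparkContext(new SparkConf().setAppName("case-demo").setMaster("local[2]"))
val sqlContext = new SQLContext(sc)
import sqlContext.implicits._

sc.parallelize(Seq((1, "val_1"), (2, "val_2"))).toDF("key", "value")
  .registerTempTable("testTable1")

// SQLContext stays case sensitive by default; flip the new flag to relax it.
sqlContext.setConf("spark.sql.caseSensitive", "false")
sqlContext.sql("SELECT VALUE FROM TESTTABLE1 WHERE KEY = 1").show()  // resolves despite casing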
Diffstat (limited to 'sql')
-rw-r--r--  sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystConf.scala                   | 35
-rw-r--r--  sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala              | 16
-rw-r--r--  sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Catalog.scala               | 20
-rw-r--r--  sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala         | 14
-rw-r--r--  sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/DecimalPrecisionSuite.scala |  9
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala                                     | 13
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala                                  |  4
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala                               | 10
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/sources/DataSourceTest.scala                      | 20
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala                            |  5
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala                   | 32
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala                          |  8
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala               |  1
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala                |  6
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala                          |  4
15 files changed, 127 insertions, 70 deletions
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystConf.scala
new file mode 100644
index 0000000000..3f351b07b3
--- /dev/null
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystConf.scala
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst
+
+private[spark] trait CatalystConf {
+ def caseSensitiveAnalysis: Boolean
+}
+
+/**
+ * A trivial conf that is empty. Used for testing when all
+ * relations are already filled in and the analyser needs only to resolve attribute references.
+ */
+object EmptyConf extends CatalystConf {
+ override def caseSensitiveAnalysis: Boolean = {
+ throw new UnsupportedOperationException
+ }
+}
+
+/** A CatalystConf that can be used for local testing. */
+case class SimpleCatalystConf(caseSensitiveAnalysis: Boolean) extends CatalystConf
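The contract introduced above is deliberately tiny: anything that needs to know about case sensitivity takes a CatalystConf and branches on caseSensitiveAnalysis, with SimpleCatalystConf for tests and EmptyConf as a fail-fast placeholder. A hedged sketch of a consumer (normalizeName is a hypothetical helper; since the trait is private[spark], real call sites live inside Spark's own packages, as here):

package org.apache.spark.sql.catalyst

object CaseSensitivityDemo {
  // Hypothetical helper showing how a CatalystConf consumer branches.
  def normalizeName(name: String, conf: CatalystConf): String =
    if (conf.caseSensitiveAnalysis) name else name.toLowerCase

  def main(args: Array[String]): Unit = {
    println(normalizeName("TableA", SimpleCatalystConf(false)))  // tablea
    println(normalizeName("TableA", SimpleCatalystConf(true)))   // TableA
    // EmptyConf.caseSensitiveAnalysis throws UnsupportedOperationException by design.
  }
}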
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index ecbac57ea4..a4c61149dd 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -19,19 +19,21 @@ package org.apache.spark.sql.catalyst.analysis
import scala.collection.mutable.ArrayBuffer
-import org.apache.spark.util.collection.OpenHashSet
import org.apache.spark.sql.AnalysisException
+import org.apache.spark.sql.catalyst.{SimpleCatalystConf, CatalystConf}
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.rules._
import org.apache.spark.sql.types._
+import org.apache.spark.util.collection.OpenHashSet
/**
* A trivial [[Analyzer]] with an [[EmptyCatalog]] and [[EmptyFunctionRegistry]]. Used for testing
* when all relations are already filled in and the analyzer needs only to resolve attribute
* references.
*/
-object SimpleAnalyzer extends Analyzer(EmptyCatalog, EmptyFunctionRegistry, true)
+object SimpleAnalyzer
+ extends Analyzer(EmptyCatalog, EmptyFunctionRegistry, new SimpleCatalystConf(true))
/**
* Provides a logical query plan analyzer, which translates [[UnresolvedAttribute]]s and
@@ -41,11 +43,17 @@ object SimpleAnalyzer extends Analyzer(EmptyCatalog, EmptyFunctionRegistry, true
class Analyzer(
catalog: Catalog,
registry: FunctionRegistry,
- caseSensitive: Boolean,
+ conf: CatalystConf,
maxIterations: Int = 100)
extends RuleExecutor[LogicalPlan] with HiveTypeCoercion with CheckAnalysis {
- val resolver = if (caseSensitive) caseSensitiveResolution else caseInsensitiveResolution
+ def resolver: Resolver = {
+ if (conf.caseSensitiveAnalysis) {
+ caseSensitiveResolution
+ } else {
+ caseInsensitiveResolution
+ }
+ }
val fixedPoint = FixedPoint(maxIterations)
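Two details of this hunk are worth noting: the constructor now threads a CatalystConf through instead of a bare boolean, and resolver became a def, so it re-reads the conf on each access rather than freezing the choice at construction time. A sketch of wiring an analyzer up for local testing, modeled on the new AnalysisSuite code further down (assumes compilation inside Spark's source tree, where these catalyst types are visible):

import org.apache.spark.sql.catalyst.SimpleCatalystConf
import org.apache.spark.sql.catalyst.analysis.{Analyzer, EmptyFunctionRegistry, SimpleCatalog}

val conf = SimpleCatalystConf(caseSensitiveAnalysis = false)
// Catalog and Analyzer share one conf, so both resolve identifiers consistently.
val catalog = new SimpleCatalog(conf)
val analyzer = new Analyzer(catalog, EmptyFunctionRegistry, conf)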
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Catalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Catalog.scala
index 18c24b6519..208021c421 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Catalog.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Catalog.scala
@@ -19,6 +19,8 @@ package org.apache.spark.sql.catalyst.analysis
import scala.collection.mutable
+import org.apache.spark.sql.catalyst.CatalystConf
+import org.apache.spark.sql.catalyst.EmptyConf
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Subquery}
/**
@@ -34,7 +36,7 @@ class NoSuchDatabaseException extends Exception
*/
trait Catalog {
- def caseSensitive: Boolean
+ val conf: CatalystConf
def tableExists(tableIdentifier: Seq[String]): Boolean
@@ -57,10 +59,10 @@ trait Catalog {
def unregisterAllTables(): Unit
protected def processTableIdentifier(tableIdentifier: Seq[String]): Seq[String] = {
- if (!caseSensitive) {
- tableIdentifier.map(_.toLowerCase)
- } else {
+ if (conf.caseSensitiveAnalysis) {
tableIdentifier
+ } else {
+ tableIdentifier.map(_.toLowerCase)
}
}
@@ -78,7 +80,7 @@ trait Catalog {
}
}
-class SimpleCatalog(val caseSensitive: Boolean) extends Catalog {
+class SimpleCatalog(val conf: CatalystConf) extends Catalog {
val tables = new mutable.HashMap[String, LogicalPlan]()
override def registerTable(
@@ -164,10 +166,10 @@ trait OverrideCatalog extends Catalog {
}
abstract override def getTables(databaseName: Option[String]): Seq[(String, Boolean)] = {
- val dbName = if (!caseSensitive) {
- if (databaseName.isDefined) Some(databaseName.get.toLowerCase) else None
- } else {
+ val dbName = if (conf.caseSensitiveAnalysis) {
databaseName
+ } else {
+ if (databaseName.isDefined) Some(databaseName.get.toLowerCase) else None
}
val temporaryTables = overrides.filter {
@@ -207,7 +209,7 @@ trait OverrideCatalog extends Catalog {
*/
object EmptyCatalog extends Catalog {
- override val caseSensitive: Boolean = true
+ override val conf: CatalystConf = EmptyConf
override def tableExists(tableIdentifier: Seq[String]): Boolean = {
throw new UnsupportedOperationException
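The net effect of the Catalog change is that identifier normalization now follows the shared conf: under case-insensitive analysis, table identifiers are lowercased before lookup, otherwise they pass through untouched. A standalone rendition of the processTableIdentifier logic above, for illustration only:

import org.apache.spark.sql.catalyst.{CatalystConf, SimpleCatalystConf}

def processTableIdentifier(id: Seq[String], conf: CatalystConf): Seq[String] =
  if (conf.caseSensitiveAnalysis) id else id.map(_.toLowerCase)

processTableIdentifier(Seq("DB1", "MyTable"), SimpleCatalystConf(false))  // Seq("db1", "mytable")
processTableIdentifier(Seq("DB1", "MyTable"), SimpleCatalystConf(true))   // unchanged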
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala
index 971e1ff5ec..6f2f35564d 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala
@@ -23,24 +23,26 @@ import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.types._
-
+import org.apache.spark.sql.catalyst.SimpleCatalystConf
import org.apache.spark.sql.catalyst.dsl.expressions._
import org.apache.spark.sql.catalyst.dsl.plans._
class AnalysisSuite extends FunSuite with BeforeAndAfter {
- val caseSensitiveCatalog = new SimpleCatalog(true)
- val caseInsensitiveCatalog = new SimpleCatalog(false)
+ val caseSensitiveConf = new SimpleCatalystConf(true)
+ val caseInsensitiveConf = new SimpleCatalystConf(false)
+
+ val caseSensitiveCatalog = new SimpleCatalog(caseSensitiveConf)
+ val caseInsensitiveCatalog = new SimpleCatalog(caseInsensitiveConf)
val caseSensitiveAnalyzer =
- new Analyzer(caseSensitiveCatalog, EmptyFunctionRegistry, caseSensitive = true) {
+ new Analyzer(caseSensitiveCatalog, EmptyFunctionRegistry, caseSensitiveConf) {
override val extendedResolutionRules = EliminateSubQueries :: Nil
}
val caseInsensitiveAnalyzer =
- new Analyzer(caseInsensitiveCatalog, EmptyFunctionRegistry, caseSensitive = false) {
+ new Analyzer(caseInsensitiveCatalog, EmptyFunctionRegistry, caseInsensitiveConf) {
override val extendedResolutionRules = EliminateSubQueries :: Nil
}
-
def caseSensitiveAnalyze(plan: LogicalPlan): Unit =
caseSensitiveAnalyzer.checkAnalysis(caseSensitiveAnalyzer.execute(plan))
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/DecimalPrecisionSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/DecimalPrecisionSuite.scala
index 36b03d1c65..565b1cfe01 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/DecimalPrecisionSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/DecimalPrecisionSuite.scala
@@ -17,14 +17,17 @@
package org.apache.spark.sql.catalyst.analysis
+import org.scalatest.{BeforeAndAfter, FunSuite}
+
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans.logical.{Union, Project, LocalRelation}
import org.apache.spark.sql.types._
-import org.scalatest.{BeforeAndAfter, FunSuite}
+import org.apache.spark.sql.catalyst.SimpleCatalystConf
class DecimalPrecisionSuite extends FunSuite with BeforeAndAfter {
- val catalog = new SimpleCatalog(false)
- val analyzer = new Analyzer(catalog, EmptyFunctionRegistry, caseSensitive = false)
+ val conf = new SimpleCatalystConf(true)
+ val catalog = new SimpleCatalog(conf)
+ val analyzer = new Analyzer(catalog, EmptyFunctionRegistry, conf)
val relation = LocalRelation(
AttributeReference("i", IntegerType)(),
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
index bfaddd0f2c..98a75bb4ed 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
@@ -17,10 +17,12 @@
package org.apache.spark.sql
+import java.util.Properties
+
import scala.collection.immutable
import scala.collection.JavaConversions._
-import java.util.Properties
+import org.apache.spark.sql.catalyst.CatalystConf
private[spark] object SQLConf {
val COMPRESS_CACHED = "spark.sql.inMemoryColumnarStorage.compressed"
@@ -32,6 +34,7 @@ private[spark] object SQLConf {
val CODEGEN_ENABLED = "spark.sql.codegen"
val UNSAFE_ENABLED = "spark.sql.unsafe.enabled"
val DIALECT = "spark.sql.dialect"
+ val CASE_SENSITIVE = "spark.sql.caseSensitive"
val PARQUET_BINARY_AS_STRING = "spark.sql.parquet.binaryAsString"
val PARQUET_INT96_AS_TIMESTAMP = "spark.sql.parquet.int96AsTimestamp"
@@ -89,7 +92,8 @@ private[spark] object SQLConf {
*
* SQLConf is thread-safe (internally synchronized, so safe to be used in multiple threads).
*/
-private[sql] class SQLConf extends Serializable {
+
+private[sql] class SQLConf extends Serializable with CatalystConf {
import SQLConf._
/** Only low degree of contention is expected for conf, thus NOT using ConcurrentHashMap. */
@@ -159,6 +163,11 @@ private[sql] class SQLConf extends Serializable {
private[spark] def codegenEnabled: Boolean = getConf(CODEGEN_ENABLED, "false").toBoolean
/**
+ * caseSensitive analysis true by default
+ */
+ def caseSensitiveAnalysis: Boolean = getConf(SQLConf.CASE_SENSITIVE, "true").toBoolean
+
+ /**
* When set to true, Spark SQL will use managed memory for certain operations. This option only
* takes effect if codegen is enabled.
*
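Since SQLConf now mixes in CatalystConf, the same object that holds runtime settings also answers the analyzer's caseSensitiveAnalysis question, and the getConf(key, default) lookup means a plain SQLContext defaults to case-sensitive until the key is set. A self-contained sketch of those default-on semantics (the settings map stands in for SQLConf's internal store):

// Absent an explicit setting, getConf(CASE_SENSITIVE, "true") yields true.
val settings = scala.collection.mutable.Map[String, String]()
def getConf(key: String, default: String): String = settings.getOrElse(key, default)

assert(getConf("spark.sql.caseSensitive", "true").toBoolean)    // default: sensitive
settings("spark.sql.caseSensitive") = "false"
assert(!getConf("spark.sql.caseSensitive", "true").toBoolean)   // now insensitive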
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
index 0ac0936f0f..28fc9d0443 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
@@ -148,7 +148,7 @@ class SQLContext(@transient val sparkContext: SparkContext)
// TODO how to handle the temp table per user session?
@transient
- protected[sql] lazy val catalog: Catalog = new SimpleCatalog(true)
+ protected[sql] lazy val catalog: Catalog = new SimpleCatalog(conf)
// TODO how to handle the temp function per user session?
@transient
@@ -156,7 +156,7 @@ class SQLContext(@transient val sparkContext: SparkContext)
@transient
protected[sql] lazy val analyzer: Analyzer =
- new Analyzer(catalog, functionRegistry, caseSensitive = true) {
+ new Analyzer(catalog, functionRegistry, conf) {
override val extendedResolutionRules =
ExtractPythonUdfs ::
sources.PreInsertCastAndRename ::
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
index 77be3b8b20..b44eb223c8 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
@@ -22,6 +22,7 @@ import org.scalatest.BeforeAndAfterAll
import org.apache.spark.sql.catalyst.errors.DialectException
import org.apache.spark.sql.execution.GeneratedAggregate
import org.apache.spark.sql.functions._
+import org.apache.spark.sql.catalyst.CatalystConf
import org.apache.spark.sql.TestData._
import org.apache.spark.sql.test.TestSQLContext
import org.apache.spark.sql.test.TestSQLContext.{udf => _, _}
@@ -1277,6 +1278,15 @@ class SQLQuerySuite extends QueryTest with BeforeAndAfterAll {
checkAnswer(sql("SELECT COUNT(DISTINCT key,value) FROM distinctData"), Row(2))
}
+ test("SPARK-4699 case sensitivity SQL query") {
+ setConf(SQLConf.CASE_SENSITIVE, "false")
+ val data = TestData(1, "val_1") :: TestData(2, "val_2") :: Nil
+ val rdd = sparkContext.parallelize((0 to 1).map(i => data(i)))
+ rdd.toDF().registerTempTable("testTable1")
+ checkAnswer(sql("SELECT VALUE FROM TESTTABLE1 where KEY = 1"), Row("val_1"))
+ setConf(SQLConf.CASE_SENSITIVE, "true")
+ }
+
test("SPARK-6145: ORDER BY test for nested fields") {
jsonRDD(sparkContext.makeRDD("""{"a": {"b": 1, "a": {"a": 1}}, "c": [{"d": 1}]}""" :: Nil))
.registerTempTable("nestedOrder")
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/DataSourceTest.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/DataSourceTest.scala
index 33c6735596..9d3090c19b 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/DataSourceTest.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/DataSourceTest.scala
@@ -18,25 +18,13 @@
package org.apache.spark.sql.sources
import org.apache.spark.sql._
-import org.apache.spark.sql.catalyst.analysis.Analyzer
+import org.apache.spark.sql.catalyst.CatalystConf
import org.apache.spark.sql.test.TestSQLContext
import org.scalatest.BeforeAndAfter
abstract class DataSourceTest extends QueryTest with BeforeAndAfter {
- // Case sensitivity is not configurable yet, but we want to test some edge cases.
- // TODO: Remove when it is configurable
- implicit val caseInsensisitiveContext = new SQLContext(TestSQLContext.sparkContext) {
- @transient
- override protected[sql] lazy val analyzer: Analyzer =
- new Analyzer(catalog, functionRegistry, caseSensitive = false) {
- override val extendedResolutionRules =
- PreInsertCastAndRename ::
- Nil
+ // We want to test some edge cases.
+ implicit val caseInsensisitiveContext = new SQLContext(TestSQLContext.sparkContext)
- override val extendedCheckRules = Seq(
- sources.PreWriteCheck(catalog)
- )
- }
- }
+ caseInsensisitiveContext.setConf(SQLConf.CASE_SENSITIVE, "false")
}
-
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
index 538c6c7f0a..3bab648e31 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
@@ -46,6 +46,7 @@ import org.apache.spark.sql.execution.{ExecutedCommand, ExtractPythonUdfs, Query
import org.apache.spark.sql.hive.client._
import org.apache.spark.sql.hive.execution.{DescribeHiveTableCommand, HiveNativeCommand}
import org.apache.spark.sql.sources.{DDLParser, DataSourceStrategy}
+import org.apache.spark.sql.catalyst.CatalystConf
import org.apache.spark.sql.types._
import org.apache.spark.util.Utils
@@ -329,7 +330,7 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) {
/* An analyzer that uses the Hive metastore. */
@transient
override protected[sql] lazy val analyzer =
- new Analyzer(catalog, functionRegistry, caseSensitive = false) {
+ new Analyzer(catalog, functionRegistry, conf) {
override val extendedResolutionRules =
catalog.ParquetConversions ::
catalog.CreateTables ::
@@ -350,6 +351,8 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) {
protected[hive] class SQLSession extends super.SQLSession {
protected[sql] override lazy val conf: SQLConf = new SQLConf {
override def dialect: String = getConf(SQLConf.DIALECT, "hiveql")
+ override def caseSensitiveAnalysis: Boolean =
+ getConf(SQLConf.CASE_SENSITIVE, "false").toBoolean
}
/**
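Note the asymmetry this override creates: HiveContext's SQLSession defaults caseSensitiveAnalysis to "false" (matching Hive's own case-insensitive identifiers), while plain SQLContext keeps "true". A sketch of probing and overriding that default (sc is an existing SparkContext, assumed):

import org.apache.spark.sql.hive.HiveContext

val hiveContext = new HiveContext(sc)
// Case insensitive by default, so these refer to the same table and column;
// compare the new "SPARK-4699 HiveContext should be case insensitive" test below.
hiveContext.sql("SELECT KEY FROM Src ORDER BY value").show()
// Opt back into strict casing if desired:
hiveContext.setConf("spark.sql.caseSensitive", "true")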
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
index 8fcdf3d0ab..f5398605bc 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
@@ -17,26 +17,18 @@
package org.apache.spark.sql.hive
-import java.io.IOException
-import java.util.{List => JList}
-
import com.google.common.base.Objects
import com.google.common.cache.{CacheBuilder, CacheLoader, LoadingCache}
import org.apache.hadoop.fs.Path
-import org.apache.hadoop.hive.metastore.api.{FieldSchema, Partition => TPartition, Table => TTable}
-import org.apache.hadoop.hive.metastore.{TableType, Warehouse}
+import org.apache.hadoop.hive.metastore.api.FieldSchema
+import org.apache.hadoop.hive.metastore.Warehouse
import org.apache.hadoop.hive.ql.metadata._
-import org.apache.hadoop.hive.ql.plan.CreateTableDesc
-import org.apache.hadoop.hive.serde.serdeConstants
-import org.apache.hadoop.hive.serde2.`lazy`.LazySimpleSerDe
-import org.apache.hadoop.hive.serde2.{Deserializer, SerDeException}
-import org.apache.hadoop.util.ReflectionUtils
+import org.apache.hadoop.hive.serde2.Deserializer
import org.apache.spark.Logging
-import org.apache.spark.sql.hive.client.IsolatedClientLoader
import org.apache.spark.sql.{SaveMode, AnalysisException, SQLContext}
-import org.apache.spark.sql.catalyst.analysis.{MultiInstanceRelation, NoSuchTableException, Catalog, OverrideCatalog}
+import org.apache.spark.sql.catalyst.analysis.{MultiInstanceRelation, Catalog, OverrideCatalog}
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.planning.PhysicalOperation
import org.apache.spark.sql.catalyst.plans.logical
@@ -44,7 +36,7 @@ import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.rules._
import org.apache.spark.sql.hive.client._
import org.apache.spark.sql.parquet.{ParquetRelation2, Partition => ParquetPartition, PartitionSpec}
-import org.apache.spark.sql.sources.{CreateTableUsingAsSelect, DDLParser, LogicalRelation, ResolvedDataSource}
+import org.apache.spark.sql.sources.{CreateTableUsingAsSelect, LogicalRelation, ResolvedDataSource}
import org.apache.spark.sql.types._
import org.apache.spark.util.Utils
@@ -54,7 +46,7 @@ import scala.collection.JavaConversions._
private[hive] class HiveMetastoreCatalog(val client: ClientInterface, hive: HiveContext)
extends Catalog with Logging {
- import org.apache.spark.sql.hive.HiveMetastoreTypes._
+ val conf = hive.conf
/** Usages should lock on `this`. */
protected[hive] lazy val hiveWarehouse = new Warehouse(hive.hiveconf)
@@ -148,7 +140,7 @@ private[hive] class HiveMetastoreCatalog(val client: ClientInterface, hive: Hive
val tableProperties = new scala.collection.mutable.HashMap[String, String]
tableProperties.put("spark.sql.sources.provider", provider)
if (userSpecifiedSchema.isDefined) {
- val threshold = hive.conf.schemaStringLengthThreshold
+ val threshold = conf.schemaStringLengthThreshold
val schemaJsonString = userSpecifiedSchema.get.json
// Split the JSON string.
val parts = schemaJsonString.grouped(threshold).toSeq
@@ -355,7 +347,7 @@ private[hive] class HiveMetastoreCatalog(val client: ClientInterface, hive: Hive
// Inserting into partitioned table is not supported in Parquet data source (yet).
if !relation.hiveQlTable.isPartitioned &&
hive.convertMetastoreParquet &&
- hive.conf.parquetUseDataSourceApi &&
+ conf.parquetUseDataSourceApi &&
relation.tableDesc.getSerdeClassName.toLowerCase.contains("parquet") =>
val parquetRelation = convertToParquetRelation(relation)
val attributedRewrites = relation.output.zip(parquetRelation.output)
@@ -366,7 +358,7 @@ private[hive] class HiveMetastoreCatalog(val client: ClientInterface, hive: Hive
// Inserting into partitioned table is not supported in Parquet data source (yet).
if !relation.hiveQlTable.isPartitioned &&
hive.convertMetastoreParquet &&
- hive.conf.parquetUseDataSourceApi &&
+ conf.parquetUseDataSourceApi &&
relation.tableDesc.getSerdeClassName.toLowerCase.contains("parquet") =>
val parquetRelation = convertToParquetRelation(relation)
val attributedRewrites = relation.output.zip(parquetRelation.output)
@@ -375,7 +367,7 @@ private[hive] class HiveMetastoreCatalog(val client: ClientInterface, hive: Hive
// Read path
case p @ PhysicalOperation(_, _, relation: MetastoreRelation)
if hive.convertMetastoreParquet &&
- hive.conf.parquetUseDataSourceApi &&
+ conf.parquetUseDataSourceApi &&
relation.tableDesc.getSerdeClassName.toLowerCase.contains("parquet") =>
val parquetRelation = convertToParquetRelation(relation)
val attributedRewrites = relation.output.zip(parquetRelation.output)
@@ -435,7 +427,7 @@ private[hive] class HiveMetastoreCatalog(val client: ClientInterface, hive: Hive
val mode = if (allowExisting) SaveMode.Ignore else SaveMode.ErrorIfExists
CreateTableUsingAsSelect(
desc.name,
- hive.conf.defaultDataSourceName,
+ conf.defaultDataSourceName,
temporary = false,
mode,
options = Map.empty[String, String],
@@ -464,7 +456,7 @@ private[hive] class HiveMetastoreCatalog(val client: ClientInterface, hive: Hive
val mode = if (allowExisting) SaveMode.Ignore else SaveMode.ErrorIfExists
CreateTableUsingAsSelect(
tblName,
- hive.conf.defaultDataSourceName,
+ conf.defaultDataSourceName,
temporary = false,
mode,
options = Map.empty[String, String],
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala
index 1f40a5340c..1598d4bd47 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala
@@ -26,15 +26,16 @@ import org.apache.hadoop.hive.ql.io.avro.{AvroContainerInputFormat, AvroContaine
import org.apache.hadoop.hive.ql.metadata.Table
import org.apache.hadoop.hive.ql.parse.VariableSubstitution
import org.apache.hadoop.hive.ql.processors._
-import org.apache.hadoop.hive.serde2.RegexSerDe
import org.apache.hadoop.hive.serde2.`lazy`.LazySimpleSerDe
import org.apache.hadoop.hive.serde2.avro.AvroSerDe
-import org.apache.spark.sql.SQLConf
+
+import org.apache.spark.sql.catalyst.CatalystConf
import org.apache.spark.sql.catalyst.analysis._
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.CacheTableCommand
import org.apache.spark.sql.hive._
import org.apache.spark.sql.hive.execution.HiveNativeCommand
+import org.apache.spark.sql.SQLConf
import org.apache.spark.util.Utils
import org.apache.spark.{SparkConf, SparkContext}
@@ -103,10 +104,11 @@ class TestHiveContext(sc: SparkContext) extends HiveContext(sc) {
/** Fewer partitions to speed up testing. */
protected[sql] override lazy val conf: SQLConf = new SQLConf {
override def numShufflePartitions: Int = getConf(SQLConf.SHUFFLE_PARTITIONS, "5").toInt
-
// TODO as in unit test, conf.clear() probably be called, all of the value will be cleared.
// The super.getConf(SQLConf.DIALECT) is "sql" by default, we need to set it as "hiveql"
override def dialect: String = super.getConf(SQLConf.DIALECT, "hiveql")
+ override def caseSensitiveAnalysis: Boolean =
+ getConf(SQLConf.CASE_SENSITIVE, "false").toBoolean
}
}
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala
index 7d728fe87b..2c9c08a9f3 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala
@@ -29,7 +29,6 @@ import org.apache.hadoop.hive.conf.HiveConf.ConfVars
import org.apache.spark.{SparkFiles, SparkException}
import org.apache.spark.sql.{AnalysisException, DataFrame, Row}
import org.apache.spark.sql.catalyst.plans.logical.Project
-import org.apache.spark.sql.functions._
import org.apache.spark.sql.hive._
import org.apache.spark.sql.hive.test.TestHive
import org.apache.spark.sql.hive.test.TestHive._
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
index c605f10175..1353802604 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
@@ -493,6 +493,12 @@ class SQLQuerySuite extends QueryTest {
}
}
+ test("SPARK-4699 HiveContext should be case insensitive by default") {
+ checkAnswer(
+ sql("SELECT KEY FROM Src ORDER BY value"),
+ sql("SELECT key FROM src ORDER BY value").collect().toSeq)
+ }
+
test("SPARK-5284 Insert into Hive throws NPE when a inner complex type field has a null value") {
val schema = StructType(
StructField("s",
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala
index d5dd0bf58e..bf1121ddf0 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala
@@ -21,14 +21,12 @@ import java.io.File
import org.scalatest.BeforeAndAfterAll
-import org.apache.spark.sql.{QueryTest, SQLConf, SaveMode}
+import org.apache.spark.sql.{QueryTest, SQLConf}
import org.apache.spark.sql.catalyst.expressions.Row
import org.apache.spark.sql.execution.{ExecutedCommand, PhysicalRDD}
import org.apache.spark.sql.hive.execution.HiveTableScan
-import org.apache.spark.sql.hive.test.TestHive
import org.apache.spark.sql.hive.test.TestHive._
import org.apache.spark.sql.hive.test.TestHive.implicits._
-import org.apache.spark.sql.json.JSONRelation
import org.apache.spark.sql.sources.{InsertIntoDataSource, LogicalRelation}
import org.apache.spark.sql.parquet.{ParquetRelation2, ParquetTableScan}
import org.apache.spark.sql.SaveMode