author     Xiao Li <gatorsmile@gmail.com>           2017-03-14 14:19:02 +0800
committer  Wenchen Fan <wenchen@databricks.com>     2017-03-14 14:19:02 +0800
commit     415f9f3423aacc395097e40427364c921a2ed7f1 (patch)
tree       f84d5421f41d1ecb41e32f269790ffafe37ee312 /sql
parent     4dc3a8171c31e11aafa85200d3928b1745aa32bd (diff)
download   spark-415f9f3423aacc395097e40427364c921a2ed7f1.tar.gz
           spark-415f9f3423aacc395097e40427364c921a2ed7f1.tar.bz2
           spark-415f9f3423aacc395097e40427364c921a2ed7f1.zip
[SPARK-19921][SQL][TEST] Enable end-to-end testing using different Hive metastore versions.
### What changes were proposed in this pull request?

To improve the quality of Spark SQL across different Hive metastore versions, this PR enables end-to-end testing against those versions. It allows the test cases in sql/hive to pass an existing Hive client into the creation of a SparkSession.

- Since Derby does not allow concurrent connections, the pre-built Hive clients use a different database from the one used by TestHive's built-in 1.2.1 client.
- Since the test cases in sql/hive can only create a single SparkContext per JVM, the newly created SparkSession shares that SparkContext with TestHive's corresponding SparkSession.

### How was this patch tested?

Fixed the existing test cases.

Author: Xiao Li <gatorsmile@gmail.com>

Closes #17260 from gatorsmile/versionSuite.
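As an illustration, the sketch below shows how a suite can combine the pieces this patch adds: build a metastore client for a specific Hive version and wrap it in the new `TestHiveVersion`, which reuses TestHive's SparkContext while routing catalog calls through that client. This is a minimal sketch, not part of the patch: it assumes it lives next to `VersionsSuite` in the Spark test sources (package `org.apache.spark.sql.hive.client`), since `HiveClientBuilder` and `sharedState` are package-private test/internal members, and "0.13" is just an example version string.

```scala
// Sketch only: assumes placement in package org.apache.spark.sql.hive.client,
// alongside VersionsSuite, where the package-private helpers below are visible.
import org.apache.hadoop.conf.Configuration

import org.apache.spark.sql.Row
import org.apache.spark.sql.hive.{HiveExternalCatalog, HiveUtils}
import org.apache.spark.sql.hive.test.TestHiveVersion

// Build a metastore client for one Hive version ("0.13" is just an example).
// Each pre-built client uses its own Derby database, so it does not collide with
// TestHive's built-in 1.2.1 client (Derby rejects concurrent connections).
val hadoopConf = new Configuration()
val clientBuilder = new HiveClientBuilder
val client = clientBuilder.buildClient(
  "0.13", hadoopConf, HiveUtils.hiveClientConfigurations(hadoopConf))

// TestHiveVersion reuses TestHive's single SparkContext, but its external catalog
// talks to the version-specific client built above.
val versionSpark = TestHiveVersion(client)
assert(versionSpark.sharedState.externalCatalog.asInstanceOf[HiveExternalCatalog]
  .client.version.fullVersion.startsWith("0.13"))

// End-to-end query against the 0.13 metastore.
versionSpark.sql("CREATE TABLE tbl AS SELECT 1 AS a")
assert(versionSpark.table("tbl").collect().toSeq == Seq(Row(1)))

// Reset catalog state before another version is exercised.
versionSpark.reset()
```

The `lazy val` changes to `SharedState.externalCatalog` and `HiveExternalCatalog.client` in the diff below are what make this possible: in Scala a lazy val can only be overridden by another lazy val, so the new `TestHiveSharedState` and `TestHiveExternalCatalog` can substitute the pre-built client before either member is initialized.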
Diffstat (limited to 'sql')
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala      |  2
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala  |  2
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala        | 69
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala | 85
4 files changed, 112 insertions(+), 46 deletions(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala
index 86129fa87f..1ef9d52713 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala
@@ -87,7 +87,7 @@ private[sql] class SharedState(val sparkContext: SparkContext) extends Logging {
/**
* A catalog that interacts with external systems.
*/
- val externalCatalog: ExternalCatalog =
+ lazy val externalCatalog: ExternalCatalog =
SharedState.reflect[ExternalCatalog, SparkConf, Configuration](
SharedState.externalCatalogClassName(sparkContext.conf),
sparkContext.conf,
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
index fd633869dd..33802ae623 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
@@ -62,7 +62,7 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
/**
* A Hive client used to interact with the metastore.
*/
- val client: HiveClient = {
+ lazy val client: HiveClient = {
HiveUtils.newClientForMetadata(conf, hadoopConf)
}
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala
index 076c40d459..b63ed76967 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala
@@ -24,23 +24,24 @@ import scala.collection.JavaConverters._
import scala.collection.mutable
import scala.language.implicitConversions
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.Path
import org.apache.hadoop.hive.conf.HiveConf.ConfVars
import org.apache.hadoop.hive.ql.exec.FunctionRegistry
import org.apache.hadoop.hive.serde2.`lazy`.LazySimpleSerDe
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.internal.Logging
-import org.apache.spark.sql.{ExperimentalMethods, SparkSession, SQLContext}
-import org.apache.spark.sql.catalyst.analysis.{Analyzer, UnresolvedRelation}
-import org.apache.spark.sql.catalyst.parser.ParserInterface
+import org.apache.spark.sql.{SparkSession, SQLContext}
+import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
+import org.apache.spark.sql.catalyst.catalog.ExternalCatalog
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
-import org.apache.spark.sql.execution.{QueryExecution, SparkPlanner}
+import org.apache.spark.sql.execution.QueryExecution
import org.apache.spark.sql.execution.command.CacheTableCommand
import org.apache.spark.sql.hive._
import org.apache.spark.sql.hive.client.HiveClient
import org.apache.spark.sql.internal.{SessionState, SharedState, SQLConf}
import org.apache.spark.sql.internal.StaticSQLConf.CATALOG_IMPLEMENTATION
-import org.apache.spark.sql.streaming.StreamingQueryManager
import org.apache.spark.util.{ShutdownHookManager, Utils}
// SPARK-3729: Test key required to check for initialization errors with config.
@@ -58,6 +59,37 @@ object TestHive
.set("spark.ui.enabled", "false")))
+case class TestHiveVersion(hiveClient: HiveClient)
+ extends TestHiveContext(TestHive.sparkContext, hiveClient)
+
+
+private[hive] class TestHiveExternalCatalog(
+ conf: SparkConf,
+ hadoopConf: Configuration,
+ hiveClient: Option[HiveClient] = None)
+ extends HiveExternalCatalog(conf, hadoopConf) with Logging {
+
+ override lazy val client: HiveClient =
+ hiveClient.getOrElse {
+ HiveUtils.newClientForMetadata(conf, hadoopConf)
+ }
+}
+
+
+private[hive] class TestHiveSharedState(
+ sc: SparkContext,
+ hiveClient: Option[HiveClient] = None)
+ extends SharedState(sc) {
+
+ override lazy val externalCatalog: ExternalCatalog = {
+ new TestHiveExternalCatalog(
+ sc.conf,
+ sc.hadoopConfiguration,
+ hiveClient)
+ }
+}
+
+
/**
* A locally running test instance of Spark's Hive execution engine.
*
@@ -81,6 +113,12 @@ class TestHiveContext(
this(new TestHiveSparkSession(HiveUtils.withHiveExternalCatalog(sc), loadTestTables))
}
+ def this(sc: SparkContext, hiveClient: HiveClient) {
+ this(new TestHiveSparkSession(HiveUtils.withHiveExternalCatalog(sc),
+ hiveClient,
+ loadTestTables = false))
+ }
+
override def newSession(): TestHiveContext = {
new TestHiveContext(sparkSession.newSession())
}
@@ -115,7 +153,7 @@ class TestHiveContext(
*/
private[hive] class TestHiveSparkSession(
@transient private val sc: SparkContext,
- @transient private val existingSharedState: Option[SharedState],
+ @transient private val existingSharedState: Option[TestHiveSharedState],
private val loadTestTables: Boolean)
extends SparkSession(sc) with Logging { self =>
@@ -126,6 +164,13 @@ private[hive] class TestHiveSparkSession(
loadTestTables)
}
+ def this(sc: SparkContext, hiveClient: HiveClient, loadTestTables: Boolean) {
+ this(
+ sc,
+ existingSharedState = Some(new TestHiveSharedState(sc, Some(hiveClient))),
+ loadTestTables)
+ }
+
{ // set the metastore temporary configuration
val metastoreTempConf = HiveUtils.newTemporaryConfiguration(useInMemoryDerby = false) ++ Map(
ConfVars.METASTORE_INTEGER_JDO_PUSHDOWN.varname -> "true",
@@ -141,8 +186,8 @@ private[hive] class TestHiveSparkSession(
assume(sc.conf.get(CATALOG_IMPLEMENTATION) == "hive")
@transient
- override lazy val sharedState: SharedState = {
- existingSharedState.getOrElse(new SharedState(sc))
+ override lazy val sharedState: TestHiveSharedState = {
+ existingSharedState.getOrElse(new TestHiveSharedState(sc))
}
@transient
@@ -463,6 +508,14 @@ private[hive] class TestHiveSparkSession(
FunctionRegistry.getFunctionNames.asScala.filterNot(originalUDFs.contains(_)).
foreach { udfName => FunctionRegistry.unregisterTemporaryUDF(udfName) }
+ // HDFS root scratch dir requires the write all (733) permission. For each connecting user,
+ // an HDFS scratch dir: ${hive.exec.scratchdir}/<username> is created, with
+ // ${hive.scratch.dir.permission}. To resolve the permission issue, the simplest way is to
+ // delete it. Later, it will be re-created with the right permission.
+ val location = new Path(sc.hadoopConfiguration.get(ConfVars.SCRATCHDIR.varname))
+ val fs = location.getFileSystem(sc.hadoopConfiguration)
+ fs.delete(location, true)
+
// Some tests corrupt this value on purpose, which breaks the RESET call below.
sessionState.conf.setConfString("fs.defaultFS", new File(".").toURI.toString)
// It is important that we RESET first as broken hooks that might have been set could break
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala
index 6025f8adbc..cb13861110 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala
@@ -21,21 +21,20 @@ import java.io.{ByteArrayOutputStream, File, PrintStream}
import java.net.URI
import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.Path
import org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
import org.apache.hadoop.hive.serde2.`lazy`.LazySimpleSerDe
import org.apache.hadoop.mapred.TextInputFormat
+import org.apache.spark.SparkFunSuite
import org.apache.spark.internal.Logging
-import org.apache.spark.sql.{AnalysisException, QueryTest, Row}
+import org.apache.spark.sql.{AnalysisException, Row}
import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier}
import org.apache.spark.sql.catalyst.analysis.{NoSuchDatabaseException, NoSuchPermanentFunctionException}
import org.apache.spark.sql.catalyst.catalog._
import org.apache.spark.sql.catalyst.expressions.{AttributeReference, EqualTo, Literal}
import org.apache.spark.sql.catalyst.util.quietly
-import org.apache.spark.sql.hive.HiveUtils
-import org.apache.spark.sql.hive.test.TestHiveSingleton
-import org.apache.spark.sql.test.SQLTestUtils
+import org.apache.spark.sql.hive.{HiveExternalCatalog, HiveUtils}
+import org.apache.spark.sql.hive.test.TestHiveVersion
import org.apache.spark.sql.types.IntegerType
import org.apache.spark.sql.types.StructType
import org.apache.spark.tags.ExtendedHiveTest
@@ -48,11 +47,31 @@ import org.apache.spark.util.{MutableURLClassLoader, Utils}
* is not fully tested.
*/
@ExtendedHiveTest
-class VersionsSuite extends QueryTest with SQLTestUtils with TestHiveSingleton with Logging {
+class VersionsSuite extends SparkFunSuite with Logging {
private val clientBuilder = new HiveClientBuilder
import clientBuilder.buildClient
+ /**
+ * Creates a temporary directory, which is then passed to `f` and will be deleted after `f`
+ * returns.
+ */
+ protected def withTempDir(f: File => Unit): Unit = {
+ val dir = Utils.createTempDir().getCanonicalFile
+ try f(dir) finally Utils.deleteRecursively(dir)
+ }
+
+ /**
+ * Drops table `tableName` after calling `f`.
+ */
+ protected def withTable(tableNames: String*)(f: => Unit): Unit = {
+ try f finally {
+ tableNames.foreach { name =>
+ versionSpark.sql(s"DROP TABLE IF EXISTS $name")
+ }
+ }
+ }
+
test("success sanity check") {
val badClient = buildClient(HiveUtils.hiveExecutionVersion, new Configuration())
val db = new CatalogDatabase("default", "desc", new URI("loc"), Map())
@@ -93,6 +112,8 @@ class VersionsSuite extends QueryTest with SQLTestUtils with TestHiveSingleton w
private var client: HiveClient = null
+ private var versionSpark: TestHiveVersion = null
+
versions.foreach { version =>
test(s"$version: create client") {
client = null
@@ -105,6 +126,10 @@ class VersionsSuite extends QueryTest with SQLTestUtils with TestHiveSingleton w
hadoopConf.set("datanucleus.schema.autoCreateAll", "true")
}
client = buildClient(version, hadoopConf, HiveUtils.hiveClientConfigurations(hadoopConf))
+ if (versionSpark != null) versionSpark.reset()
+ versionSpark = TestHiveVersion(client)
+ assert(versionSpark.sharedState.externalCatalog.asInstanceOf[HiveExternalCatalog].client
+ .version.fullVersion.startsWith(version))
}
def table(database: String, tableName: String): CatalogTable = {
@@ -545,22 +570,22 @@ class VersionsSuite extends QueryTest with SQLTestUtils with TestHiveSingleton w
test(s"$version: CREATE TABLE AS SELECT") {
withTable("tbl") {
- spark.sql("CREATE TABLE tbl AS SELECT 1 AS a")
- assert(spark.table("tbl").collect().toSeq == Seq(Row(1)))
+ versionSpark.sql("CREATE TABLE tbl AS SELECT 1 AS a")
+ assert(versionSpark.table("tbl").collect().toSeq == Seq(Row(1)))
}
}
test(s"$version: Delete the temporary staging directory and files after each insert") {
withTempDir { tmpDir =>
withTable("tab") {
- spark.sql(
+ versionSpark.sql(
s"""
|CREATE TABLE tab(c1 string)
|location '${tmpDir.toURI.toString}'
""".stripMargin)
(1 to 3).map { i =>
- spark.sql(s"INSERT OVERWRITE TABLE tab SELECT '$i'")
+ versionSpark.sql(s"INSERT OVERWRITE TABLE tab SELECT '$i'")
}
def listFiles(path: File): List[String] = {
val dir = path.listFiles()
@@ -569,7 +594,9 @@ class VersionsSuite extends QueryTest with SQLTestUtils with TestHiveSingleton w
folders.flatMap(listFiles) ++: filePaths
}
// expect 2 files left: `.part-00000-random-uuid.crc` and `part-00000-random-uuid`
- assert(listFiles(tmpDir).length == 2)
+ // Hive 0.12, 0.13, 1.0 and 1.1 also leave two extra files: ._SUCCESS.crc and _SUCCESS
+ val metadataFiles = Seq("._SUCCESS.crc", "_SUCCESS")
+ assert(listFiles(tmpDir).filterNot(metadataFiles.contains).length == 2)
}
}
}
@@ -609,7 +636,7 @@ class VersionsSuite extends QueryTest with SQLTestUtils with TestHiveSingleton w
withTable(tableName, tempTableName) {
// Creates the external partitioned Avro table to be tested.
- sql(
+ versionSpark.sql(
s"""CREATE EXTERNAL TABLE $tableName
|PARTITIONED BY (ds STRING)
|ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.avro.AvroSerDe'
@@ -622,7 +649,7 @@ class VersionsSuite extends QueryTest with SQLTestUtils with TestHiveSingleton w
)
// Creates an temporary Avro table used to prepare testing Avro file.
- sql(
+ versionSpark.sql(
s"""CREATE EXTERNAL TABLE $tempTableName
|ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.avro.AvroSerDe'
|STORED AS
@@ -634,43 +661,29 @@ class VersionsSuite extends QueryTest with SQLTestUtils with TestHiveSingleton w
)
// Generates Avro data.
- sql(s"INSERT OVERWRITE TABLE $tempTableName SELECT 1, STRUCT(2, 2.5)")
+ versionSpark.sql(s"INSERT OVERWRITE TABLE $tempTableName SELECT 1, STRUCT(2, 2.5)")
// Adds generated Avro data as a new partition to the testing table.
- sql(s"ALTER TABLE $tableName ADD PARTITION (ds = 'foo') LOCATION '$path/$tempTableName'")
+ versionSpark.sql(
+ s"ALTER TABLE $tableName ADD PARTITION (ds = 'foo') LOCATION '$path/$tempTableName'")
// The following query fails before SPARK-13709 is fixed. This is because when reading
// data from table partitions, Avro deserializer needs the Avro schema, which is defined
// in table property "avro.schema.literal". However, we only initializes the deserializer
// using partition properties, which doesn't include the wanted property entry. Merging
// two sets of properties solves the problem.
- checkAnswer(
- sql(s"SELECT * FROM $tableName"),
- Row(1, Row(2, 2.5D), "foo")
- )
+ assert(versionSpark.sql(s"SELECT * FROM $tableName").collect() ===
+ Array(Row(1, Row(2, 2.5D), "foo")))
}
}
}
test(s"$version: CTAS for managed data source tables") {
withTable("t", "t1") {
- import spark.implicits._
-
- val tPath = new Path(spark.sessionState.conf.warehousePath, "t")
- Seq("1").toDF("a").write.saveAsTable("t")
- val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t"))
-
- assert(table.location == makeQualifiedPath(tPath.toString))
- assert(tPath.getFileSystem(spark.sessionState.newHadoopConf()).exists(tPath))
- checkAnswer(spark.table("t"), Row("1") :: Nil)
-
- val t1Path = new Path(spark.sessionState.conf.warehousePath, "t1")
- spark.sql("create table t1 using parquet as select 2 as a")
- val table1 = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t1"))
-
- assert(table1.location == makeQualifiedPath(t1Path.toString))
- assert(t1Path.getFileSystem(spark.sessionState.newHadoopConf()).exists(t1Path))
- checkAnswer(spark.table("t1"), Row(2) :: Nil)
+ versionSpark.range(1).write.saveAsTable("t")
+ assert(versionSpark.table("t").collect() === Array(Row(0)))
+ versionSpark.sql("create table t1 using parquet as select 2 as a")
+ assert(versionSpark.table("t1").collect() === Array(Row(2)))
}
}
// TODO: add more tests.