Diffstat (limited to 'sql')
4 files changed, 112 insertions, 46 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala
index 86129fa87f..1ef9d52713 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala
@@ -87,7 +87,7 @@ private[sql] class SharedState(val sparkContext: SparkContext) extends Logging {
   /**
    * A catalog that interacts with external systems.
    */
-  val externalCatalog: ExternalCatalog =
+  lazy val externalCatalog: ExternalCatalog =
     SharedState.reflect[ExternalCatalog, SparkConf, Configuration](
       SharedState.externalCatalogClassName(sparkContext.conf),
       sparkContext.conf,
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
index fd633869dd..33802ae623 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
@@ -62,7 +62,7 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configuration)
   /**
    * A Hive client used to interact with the metastore.
    */
-  val client: HiveClient = {
+  lazy val client: HiveClient = {
     HiveUtils.newClientForMetadata(conf, hadoopConf)
   }
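Both hunks above make the same one-word change, and it is what the rest of the patch hangs on: a Scala `lazy val` is initialized on first access and can be overridden by a `lazy val` in a subclass, so a test subclass can substitute its own metastore client before the expensive production one is ever built. With a plain `val`, the right-hand side would run in the superclass constructor and connect to the real metastore even when a test overrides the field. A minimal, self-contained sketch of the idiom (all names here are invented for illustration, not taken from the patch):

// Illustrative sketch of the lazy-override pattern used above.
trait Client { def version: String }

class State {
  // Production path: build the client from configuration.
  protected def buildClient(): Client = {
    println("building the expensive production client")
    new Client { def version = "production" }
  }

  // Deferred until first read; a plain `val` would evaluate
  // buildClient() in State's constructor even under an override.
  lazy val client: Client = buildClient()
}

class TestState(injected: Option[Client]) extends State {
  // Mirrors TestHiveExternalCatalog.client below: use the injected
  // client if present, otherwise fall back to the production path.
  override lazy val client: Client = injected.getOrElse(buildClient())
}

object LazyOverrideDemo extends App {
  val fake = new Client { def version = "fake" }
  println(new TestState(Some(fake)).client.version)  // prints "fake"; buildClient() never runs
}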
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala
index 076c40d459..b63ed76967 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala
@@ -24,23 +24,24 @@ import scala.collection.JavaConverters._
 import scala.collection.mutable
 import scala.language.implicitConversions
 
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.Path
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars
 import org.apache.hadoop.hive.ql.exec.FunctionRegistry
 import org.apache.hadoop.hive.serde2.`lazy`.LazySimpleSerDe
 
 import org.apache.spark.{SparkConf, SparkContext}
 import org.apache.spark.internal.Logging
-import org.apache.spark.sql.{ExperimentalMethods, SparkSession, SQLContext}
-import org.apache.spark.sql.catalyst.analysis.{Analyzer, UnresolvedRelation}
-import org.apache.spark.sql.catalyst.parser.ParserInterface
+import org.apache.spark.sql.{SparkSession, SQLContext}
+import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
+import org.apache.spark.sql.catalyst.catalog.ExternalCatalog
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
-import org.apache.spark.sql.execution.{QueryExecution, SparkPlanner}
+import org.apache.spark.sql.execution.QueryExecution
 import org.apache.spark.sql.execution.command.CacheTableCommand
 import org.apache.spark.sql.hive._
 import org.apache.spark.sql.hive.client.HiveClient
 import org.apache.spark.sql.internal.{SessionState, SharedState, SQLConf}
 import org.apache.spark.sql.internal.StaticSQLConf.CATALOG_IMPLEMENTATION
-import org.apache.spark.sql.streaming.StreamingQueryManager
 import org.apache.spark.util.{ShutdownHookManager, Utils}
 
 // SPARK-3729: Test key required to check for initialization errors with config.
@@ -58,6 +59,37 @@ object TestHive
         .set("spark.ui.enabled", "false")))
 
 
+case class TestHiveVersion(hiveClient: HiveClient)
+  extends TestHiveContext(TestHive.sparkContext, hiveClient)
+
+
+private[hive] class TestHiveExternalCatalog(
+    conf: SparkConf,
+    hadoopConf: Configuration,
+    hiveClient: Option[HiveClient] = None)
+  extends HiveExternalCatalog(conf, hadoopConf) with Logging {
+
+  override lazy val client: HiveClient =
+    hiveClient.getOrElse {
+      HiveUtils.newClientForMetadata(conf, hadoopConf)
+    }
+}
+
+
+private[hive] class TestHiveSharedState(
+    sc: SparkContext,
+    hiveClient: Option[HiveClient] = None)
+  extends SharedState(sc) {
+
+  override lazy val externalCatalog: ExternalCatalog = {
+    new TestHiveExternalCatalog(
+      sc.conf,
+      sc.hadoopConfiguration,
+      hiveClient)
+  }
+}
+
+
 /**
  * A locally running test instance of Spark's Hive execution engine.
  *
@@ -81,6 +113,12 @@ class TestHiveContext(
     this(new TestHiveSparkSession(HiveUtils.withHiveExternalCatalog(sc), loadTestTables))
   }
 
+  def this(sc: SparkContext, hiveClient: HiveClient) {
+    this(new TestHiveSparkSession(HiveUtils.withHiveExternalCatalog(sc),
+      hiveClient,
+      loadTestTables = false))
+  }
+
   override def newSession(): TestHiveContext = {
     new TestHiveContext(sparkSession.newSession())
   }
@@ -115,7 +153,7 @@ class TestHiveContext(
  */
 private[hive] class TestHiveSparkSession(
     @transient private val sc: SparkContext,
-    @transient private val existingSharedState: Option[SharedState],
+    @transient private val existingSharedState: Option[TestHiveSharedState],
     private val loadTestTables: Boolean)
   extends SparkSession(sc) with Logging { self =>
 
@@ -126,6 +164,13 @@ private[hive] class TestHiveSparkSession(
       loadTestTables)
   }
 
+  def this(sc: SparkContext, hiveClient: HiveClient, loadTestTables: Boolean) {
+    this(
+      sc,
+      existingSharedState = Some(new TestHiveSharedState(sc, Some(hiveClient))),
+      loadTestTables)
+  }
+
   { // set the metastore temporary configuration
     val metastoreTempConf = HiveUtils.newTemporaryConfiguration(useInMemoryDerby = false) ++ Map(
       ConfVars.METASTORE_INTEGER_JDO_PUSHDOWN.varname -> "true",
@@ -141,8 +186,8 @@ private[hive] class TestHiveSparkSession(
   assume(sc.conf.get(CATALOG_IMPLEMENTATION) == "hive")
 
   @transient
-  override lazy val sharedState: SharedState = {
-    existingSharedState.getOrElse(new SharedState(sc))
+  override lazy val sharedState: TestHiveSharedState = {
+    existingSharedState.getOrElse(new TestHiveSharedState(sc))
   }
 
   @transient
@@ -463,6 +508,14 @@ private[hive] class TestHiveSparkSession(
       FunctionRegistry.getFunctionNames.asScala.filterNot(originalUDFs.contains(_)).
         foreach { udfName => FunctionRegistry.unregisterTemporaryUDF(udfName) }
 
+      // HDFS root scratch dir requires the write all (733) permission. For each connecting user,
+      // an HDFS scratch dir: ${hive.exec.scratchdir}/<username> is created, with
+      // ${hive.scratch.dir.permission}. To resolve the permission issue, the simplest way is to
+      // delete it. Later, it will be re-created with the right permission.
+      val location = new Path(sc.hadoopConfiguration.get(ConfVars.SCRATCHDIR.varname))
+      val fs = location.getFileSystem(sc.hadoopConfiguration)
+      fs.delete(location, true)
+
       // Some tests corrupt this value on purpose, which breaks the RESET call below.
       sessionState.conf.setConfString("fs.defaultFS", new File(".").toURI.toString)
 
       // It is important that we RESET first as broken hooks that might have been set could break
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala
index 6025f8adbc..cb13861110 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala
@@ -21,21 +20,20 @@ import java.io.{ByteArrayOutputStream, File, PrintStream}
 import java.net.URI
 
 import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.Path
 import org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
 import org.apache.hadoop.hive.serde2.`lazy`.LazySimpleSerDe
 import org.apache.hadoop.mapred.TextInputFormat
 
+import org.apache.spark.SparkFunSuite
 import org.apache.spark.internal.Logging
-import org.apache.spark.sql.{AnalysisException, QueryTest, Row}
+import org.apache.spark.sql.{AnalysisException, Row}
 import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier}
 import org.apache.spark.sql.catalyst.analysis.{NoSuchDatabaseException, NoSuchPermanentFunctionException}
 import org.apache.spark.sql.catalyst.catalog._
 import org.apache.spark.sql.catalyst.expressions.{AttributeReference, EqualTo, Literal}
 import org.apache.spark.sql.catalyst.util.quietly
-import org.apache.spark.sql.hive.HiveUtils
-import org.apache.spark.sql.hive.test.TestHiveSingleton
-import org.apache.spark.sql.test.SQLTestUtils
+import org.apache.spark.sql.hive.{HiveExternalCatalog, HiveUtils}
+import org.apache.spark.sql.hive.test.TestHiveVersion
 import org.apache.spark.sql.types.IntegerType
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.tags.ExtendedHiveTest
@@ -48,11 +47,31 @@ import org.apache.spark.util.{MutableURLClassLoader, Utils}
  * is not fully tested.
  */
 @ExtendedHiveTest
-class VersionsSuite extends QueryTest with SQLTestUtils with TestHiveSingleton with Logging {
+class VersionsSuite extends SparkFunSuite with Logging {
 
   private val clientBuilder = new HiveClientBuilder
   import clientBuilder.buildClient
 
+  /**
+   * Creates a temporary directory, which is then passed to `f` and will be deleted after `f`
+   * returns.
+   */
+  protected def withTempDir(f: File => Unit): Unit = {
+    val dir = Utils.createTempDir().getCanonicalFile
+    try f(dir) finally Utils.deleteRecursively(dir)
+  }
+
+  /**
+   * Drops table `tableName` after calling `f`.
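Taken together, the test-only classes above form an injection chain: the new `TestHiveContext(sc, hiveClient)` constructor passes a pre-built client through `TestHiveSparkSession` into `TestHiveSharedState`, whose `TestHiveExternalCatalog` overrides the now-lazy `client`; passing `None` falls back to the production construction path. A condensed usage sketch (hypothetical test code, not from the patch; `HiveClient` is package-private, so anything like this has to live under `org.apache.spark.sql.hive`):

package org.apache.spark.sql.hive.test

import org.apache.spark.SparkContext
import org.apache.spark.sql.hive.client.HiveClient

object InjectedClientExample {
  // `client` must be built elsewhere; for example, VersionsSuite obtains
  // one per Hive version via HiveClientBuilder's buildClient (next file).
  def makeContext(sc: SparkContext, client: HiveClient): TestHiveContext = {
    // The new auxiliary constructor wires `client` through
    // TestHiveSparkSession -> TestHiveSharedState -> TestHiveExternalCatalog
    // and skips loading the test tables (loadTestTables = false).
    new TestHiveContext(sc, client)
  }
}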
+   */
+  protected def withTable(tableNames: String*)(f: => Unit): Unit = {
+    try f finally {
+      tableNames.foreach { name =>
+        versionSpark.sql(s"DROP TABLE IF EXISTS $name")
+      }
+    }
+  }
+
   test("success sanity check") {
     val badClient = buildClient(HiveUtils.hiveExecutionVersion, new Configuration())
     val db = new CatalogDatabase("default", "desc", new URI("loc"), Map())
@@ -93,6 +112,8 @@ class VersionsSuite extends QueryTest with SQLTestUtils with TestHiveSingleton with Logging {
 
   private var client: HiveClient = null
 
+  private var versionSpark: TestHiveVersion = null
+
   versions.foreach { version =>
     test(s"$version: create client") {
       client = null
@@ -105,6 +126,10 @@ class VersionsSuite extends QueryTest with SQLTestUtils with TestHiveSingleton with Logging {
         hadoopConf.set("datanucleus.schema.autoCreateAll", "true")
       }
       client = buildClient(version, hadoopConf, HiveUtils.hiveClientConfigurations(hadoopConf))
+      if (versionSpark != null) versionSpark.reset()
+      versionSpark = TestHiveVersion(client)
+      assert(versionSpark.sharedState.externalCatalog.asInstanceOf[HiveExternalCatalog].client
+        .version.fullVersion.startsWith(version))
     }
 
     def table(database: String, tableName: String): CatalogTable = {
@@ -545,22 +570,22 @@ class VersionsSuite extends QueryTest with SQLTestUtils with TestHiveSingleton with Logging {
 
     test(s"$version: CREATE TABLE AS SELECT") {
       withTable("tbl") {
-        spark.sql("CREATE TABLE tbl AS SELECT 1 AS a")
-        assert(spark.table("tbl").collect().toSeq == Seq(Row(1)))
+        versionSpark.sql("CREATE TABLE tbl AS SELECT 1 AS a")
+        assert(versionSpark.table("tbl").collect().toSeq == Seq(Row(1)))
       }
     }
 
     test(s"$version: Delete the temporary staging directory and files after each insert") {
       withTempDir { tmpDir =>
         withTable("tab") {
-          spark.sql(
+          versionSpark.sql(
             s"""
                |CREATE TABLE tab(c1 string)
                |location '${tmpDir.toURI.toString}'
              """.stripMargin)
 
           (1 to 3).map { i =>
-            spark.sql(s"INSERT OVERWRITE TABLE tab SELECT '$i'")
+            versionSpark.sql(s"INSERT OVERWRITE TABLE tab SELECT '$i'")
           }
           def listFiles(path: File): List[String] = {
             val dir = path.listFiles()
@@ -569,7 +594,9 @@ class VersionsSuite extends QueryTest with SQLTestUtils with TestHiveSingleton with Logging {
             folders.flatMap(listFiles) ++: filePaths
           }
           // expect 2 files left: `.part-00000-random-uuid.crc` and `part-00000-random-uuid`
-          assert(listFiles(tmpDir).length == 2)
+          // 0.12, 0.13, 1.0 and 1.1 also leave two more files: ._SUCCESS.crc and _SUCCESS
+          val metadataFiles = Seq("._SUCCESS.crc", "_SUCCESS")
+          assert(listFiles(tmpDir).filterNot(metadataFiles.contains).length == 2)
         }
       }
     }
@@ -609,7 +636,7 @@ class VersionsSuite extends QueryTest with SQLTestUtils with TestHiveSingleton with Logging {
 
       withTable(tableName, tempTableName) {
         // Creates the external partitioned Avro table to be tested.
-        sql(
+        versionSpark.sql(
           s"""CREATE EXTERNAL TABLE $tableName
              |PARTITIONED BY (ds STRING)
             |ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.avro.AvroSerDe'
@@ -622,7 +649,7 @@ class VersionsSuite extends QueryTest with SQLTestUtils with TestHiveSingleton with Logging {
         )
 
         // Creates an temporary Avro table used to prepare testing Avro file.
-        sql(
+        versionSpark.sql(
           s"""CREATE EXTERNAL TABLE $tempTableName
              |ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.avro.AvroSerDe'
             |STORED AS
@@ -634,43 +661,29 @@ class VersionsSuite extends QueryTest with SQLTestUtils with TestHiveSingleton with Logging {
         )
 
         // Generates Avro data.
- sql(s"INSERT OVERWRITE TABLE $tempTableName SELECT 1, STRUCT(2, 2.5)") + versionSpark.sql(s"INSERT OVERWRITE TABLE $tempTableName SELECT 1, STRUCT(2, 2.5)") // Adds generated Avro data as a new partition to the testing table. - sql(s"ALTER TABLE $tableName ADD PARTITION (ds = 'foo') LOCATION '$path/$tempTableName'") + versionSpark.sql( + s"ALTER TABLE $tableName ADD PARTITION (ds = 'foo') LOCATION '$path/$tempTableName'") // The following query fails before SPARK-13709 is fixed. This is because when reading // data from table partitions, Avro deserializer needs the Avro schema, which is defined // in table property "avro.schema.literal". However, we only initializes the deserializer // using partition properties, which doesn't include the wanted property entry. Merging // two sets of properties solves the problem. - checkAnswer( - sql(s"SELECT * FROM $tableName"), - Row(1, Row(2, 2.5D), "foo") - ) + assert(versionSpark.sql(s"SELECT * FROM $tableName").collect() === + Array(Row(1, Row(2, 2.5D), "foo"))) } } } test(s"$version: CTAS for managed data source tables") { withTable("t", "t1") { - import spark.implicits._ - - val tPath = new Path(spark.sessionState.conf.warehousePath, "t") - Seq("1").toDF("a").write.saveAsTable("t") - val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t")) - - assert(table.location == makeQualifiedPath(tPath.toString)) - assert(tPath.getFileSystem(spark.sessionState.newHadoopConf()).exists(tPath)) - checkAnswer(spark.table("t"), Row("1") :: Nil) - - val t1Path = new Path(spark.sessionState.conf.warehousePath, "t1") - spark.sql("create table t1 using parquet as select 2 as a") - val table1 = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t1")) - - assert(table1.location == makeQualifiedPath(t1Path.toString)) - assert(t1Path.getFileSystem(spark.sessionState.newHadoopConf()).exists(t1Path)) - checkAnswer(spark.table("t1"), Row(2) :: Nil) + versionSpark.range(1).write.saveAsTable("t") + assert(versionSpark.table("t").collect() === Array(Row(0))) + versionSpark.sql("create table t1 using parquet as select 2 as a") + assert(versionSpark.table("t1").collect() === Array(Row(2))) } } // TODO: add more tests. |