aboutsummaryrefslogtreecommitdiff
path: root/sql/hive/src
diff options
context:
space:
mode:
authorReynold Xin <rxin@databricks.com>2016-06-30 19:02:35 -0700
committerReynold Xin <rxin@databricks.com>2016-06-30 19:02:35 -0700
commit38f4d6f44eaa03bdc703662e4a7be9c09ba86e16 (patch)
tree900b3dccac6929d6cb2fd2f0fc10c29a78779024 /sql/hive/src
parent4a981dc870a31d8b90aac5f6cb22884e02f6fbc6 (diff)
downloadspark-38f4d6f44eaa03bdc703662e4a7be9c09ba86e16.tar.gz
spark-38f4d6f44eaa03bdc703662e4a7be9c09ba86e16.tar.bz2
spark-38f4d6f44eaa03bdc703662e4a7be9c09ba86e16.zip
[SPARK-15954][SQL] Disable loading test tables in Python tests
## What changes were proposed in this pull request? This patch introduces a flag to disable loading test tables in TestHiveSparkSession and disables that in Python. This fixes an issue in which python/run-tests would fail due to failure to load test tables. Note that these test tables are not used outside of HiveCompatibilitySuite. In the long run we should probably decouple the loading of test tables from the test Hive setup. ## How was this patch tested? This is a test only change. Author: Reynold Xin <rxin@databricks.com> Closes #14005 from rxin/SPARK-15954.
Diffstat (limited to 'sql/hive/src')
-rw-r--r--sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala344
1 files changed, 184 insertions, 160 deletions
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala
index b45be0251d..7f892047c7 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala
@@ -73,8 +73,12 @@ class TestHiveContext(
@transient override val sparkSession: TestHiveSparkSession)
extends SQLContext(sparkSession) {
- def this(sc: SparkContext) {
- this(new TestHiveSparkSession(HiveUtils.withHiveExternalCatalog(sc)))
+ /**
+ * If loadTestTables is false, no test tables are loaded. Note that this flag can only be true
+ * when running in the JVM, i.e. it needs to be false when calling from Python.
+ */
+ def this(sc: SparkContext, loadTestTables: Boolean = true) {
+ this(new TestHiveSparkSession(HiveUtils.withHiveExternalCatalog(sc), loadTestTables))
}
override def newSession(): TestHiveContext = {
@@ -103,13 +107,24 @@ class TestHiveContext(
}
-
+/**
+ * A [[SparkSession]] used in [[TestHiveContext]].
+ *
+ * @param sc SparkContext
+ * @param warehousePath path to the Hive warehouse directory
+ * @param scratchDirPath scratch directory used by Hive's metastore client
+ * @param metastoreTemporaryConf configuration options for Hive's metastore
+ * @param existingSharedState optional [[TestHiveSharedState]]
+ * @param loadTestTables if true, load the test tables. They can only be loaded when running
+ * in the JVM, i.e when calling from Python this flag has to be false.
+ */
private[hive] class TestHiveSparkSession(
@transient private val sc: SparkContext,
val warehousePath: File,
scratchDirPath: File,
metastoreTemporaryConf: Map[String, String],
- @transient private val existingSharedState: Option[TestHiveSharedState])
+ @transient private val existingSharedState: Option[TestHiveSharedState],
+ private val loadTestTables: Boolean)
extends SparkSession(sc) with Logging { self =>
// TODO: We need to set the temp warehouse path to sc's conf.
@@ -118,13 +133,14 @@ private[hive] class TestHiveSparkSession(
// when we creating metadataHive. This flow is not easy to follow and can introduce
// confusion when a developer is debugging an issue. We need to refactor this part
// to just set the temp warehouse path in sc's conf.
- def this(sc: SparkContext) {
+ def this(sc: SparkContext, loadTestTables: Boolean) {
this(
sc,
Utils.createTempDir(namePrefix = "warehouse"),
TestHiveContext.makeScratchDir(),
HiveUtils.newTemporaryConfiguration(useInMemoryDerby = false),
- None)
+ None,
+ loadTestTables)
}
assume(sc.conf.get(CATALOG_IMPLEMENTATION) == "hive")
@@ -144,7 +160,7 @@ private[hive] class TestHiveSparkSession(
override def newSession(): TestHiveSparkSession = {
new TestHiveSparkSession(
- sc, warehousePath, scratchDirPath, metastoreTemporaryConf, Some(sharedState))
+ sc, warehousePath, scratchDirPath, metastoreTemporaryConf, Some(sharedState), loadTestTables)
}
private var cacheTables: Boolean = false
@@ -204,165 +220,173 @@ private[hive] class TestHiveSparkSession(
testTables += (testTable.name -> testTable)
}
- // The test tables that are defined in the Hive QTestUtil.
- // /itests/util/src/main/java/org/apache/hadoop/hive/ql/QTestUtil.java
- // https://github.com/apache/hive/blob/branch-0.13/data/scripts/q_test_init.sql
- @transient
- val hiveQTestUtilTables = Seq(
- TestTable("src",
- "CREATE TABLE src (key INT, value STRING)".cmd,
- s"LOAD DATA LOCAL INPATH '${getHiveFile("data/files/kv1.txt")}' INTO TABLE src".cmd),
- TestTable("src1",
- "CREATE TABLE src1 (key INT, value STRING)".cmd,
- s"LOAD DATA LOCAL INPATH '${getHiveFile("data/files/kv3.txt")}' INTO TABLE src1".cmd),
- TestTable("srcpart", () => {
- sql(
- "CREATE TABLE srcpart (key INT, value STRING) PARTITIONED BY (ds STRING, hr STRING)")
- for (ds <- Seq("2008-04-08", "2008-04-09"); hr <- Seq("11", "12")) {
+ if (loadTestTables) {
+ // The test tables that are defined in the Hive QTestUtil.
+ // /itests/util/src/main/java/org/apache/hadoop/hive/ql/QTestUtil.java
+ // https://github.com/apache/hive/blob/branch-0.13/data/scripts/q_test_init.sql
+ @transient
+ val hiveQTestUtilTables: Seq[TestTable] = Seq(
+ TestTable("src",
+ "CREATE TABLE src (key INT, value STRING)".cmd,
+ s"LOAD DATA LOCAL INPATH '${getHiveFile("data/files/kv1.txt")}' INTO TABLE src".cmd),
+ TestTable("src1",
+ "CREATE TABLE src1 (key INT, value STRING)".cmd,
+ s"LOAD DATA LOCAL INPATH '${getHiveFile("data/files/kv3.txt")}' INTO TABLE src1".cmd),
+ TestTable("srcpart", () => {
sql(
- s"""LOAD DATA LOCAL INPATH '${getHiveFile("data/files/kv1.txt")}'
- |OVERWRITE INTO TABLE srcpart PARTITION (ds='$ds',hr='$hr')
- """.stripMargin)
- }
- }),
- TestTable("srcpart1", () => {
- sql(
- "CREATE TABLE srcpart1 (key INT, value STRING) PARTITIONED BY (ds STRING, hr INT)")
- for (ds <- Seq("2008-04-08", "2008-04-09"); hr <- 11 to 12) {
+ "CREATE TABLE srcpart (key INT, value STRING) PARTITIONED BY (ds STRING, hr STRING)")
+ for (ds <- Seq("2008-04-08", "2008-04-09"); hr <- Seq("11", "12")) {
+ sql(
+ s"""LOAD DATA LOCAL INPATH '${getHiveFile("data/files/kv1.txt")}'
+ |OVERWRITE INTO TABLE srcpart PARTITION (ds='$ds',hr='$hr')
+ """.stripMargin)
+ }
+ }),
+ TestTable("srcpart1", () => {
sql(
- s"""LOAD DATA LOCAL INPATH '${getHiveFile("data/files/kv1.txt")}'
- |OVERWRITE INTO TABLE srcpart1 PARTITION (ds='$ds',hr='$hr')
- """.stripMargin)
- }
- }),
- TestTable("src_thrift", () => {
- import org.apache.hadoop.hive.serde2.thrift.ThriftDeserializer
- import org.apache.hadoop.mapred.{SequenceFileInputFormat, SequenceFileOutputFormat}
- import org.apache.thrift.protocol.TBinaryProtocol
+ "CREATE TABLE srcpart1 (key INT, value STRING) PARTITIONED BY (ds STRING, hr INT)")
+ for (ds <- Seq("2008-04-08", "2008-04-09"); hr <- 11 to 12) {
+ sql(
+ s"""LOAD DATA LOCAL INPATH '${getHiveFile("data/files/kv1.txt")}'
+ |OVERWRITE INTO TABLE srcpart1 PARTITION (ds='$ds',hr='$hr')
+ """.stripMargin)
+ }
+ }),
+ TestTable("src_thrift", () => {
+ import org.apache.hadoop.hive.serde2.thrift.ThriftDeserializer
+ import org.apache.hadoop.mapred.{SequenceFileInputFormat, SequenceFileOutputFormat}
+ import org.apache.thrift.protocol.TBinaryProtocol
- sql(
+ sql(
+ s"""
+ |CREATE TABLE src_thrift(fake INT)
+ |ROW FORMAT SERDE '${classOf[ThriftDeserializer].getName}'
+ |WITH SERDEPROPERTIES(
+ | 'serialization.class'='org.apache.spark.sql.hive.test.Complex',
+ | 'serialization.format'='${classOf[TBinaryProtocol].getName}'
+ |)
+ |STORED AS
+ |INPUTFORMAT '${classOf[SequenceFileInputFormat[_, _]].getName}'
+ |OUTPUTFORMAT '${classOf[SequenceFileOutputFormat[_, _]].getName}'
+ """.stripMargin)
+
+ sql(
+ s"""
+ |LOAD DATA LOCAL INPATH '${getHiveFile("data/files/complex.seq")}'
+ |INTO TABLE src_thrift
+ """.stripMargin)
+ }),
+ TestTable("serdeins",
+ s"""CREATE TABLE serdeins (key INT, value STRING)
+ |ROW FORMAT SERDE '${classOf[LazySimpleSerDe].getCanonicalName}'
+ |WITH SERDEPROPERTIES ('field.delim'='\\t')
+ """.stripMargin.cmd,
+ "INSERT OVERWRITE TABLE serdeins SELECT * FROM src".cmd),
+ TestTable("episodes",
+ s"""CREATE TABLE episodes (title STRING, air_date STRING, doctor INT)
+ |STORED AS avro
+ |TBLPROPERTIES (
+ | 'avro.schema.literal'='{
+ | "type": "record",
+ | "name": "episodes",
+ | "namespace": "testing.hive.avro.serde",
+ | "fields": [
+ | {
+ | "name": "title",
+ | "type": "string",
+ | "doc": "episode title"
+ | },
+ | {
+ | "name": "air_date",
+ | "type": "string",
+ | "doc": "initial date"
+ | },
+ | {
+ | "name": "doctor",
+ | "type": "int",
+ | "doc": "main actor playing the Doctor in episode"
+ | }
+ | ]
+ | }'
+ |)
+ """.stripMargin.cmd,
s"""
- |CREATE TABLE src_thrift(fake INT)
- |ROW FORMAT SERDE '${classOf[ThriftDeserializer].getName}'
- |WITH SERDEPROPERTIES(
- | 'serialization.class'='org.apache.spark.sql.hive.test.Complex',
- | 'serialization.format'='${classOf[TBinaryProtocol].getName}'
- |)
- |STORED AS
- |INPUTFORMAT '${classOf[SequenceFileInputFormat[_, _]].getName}'
- |OUTPUTFORMAT '${classOf[SequenceFileOutputFormat[_, _]].getName}'
- """.stripMargin)
-
- sql(
- s"LOAD DATA LOCAL INPATH '${getHiveFile("data/files/complex.seq")}' INTO TABLE src_thrift")
- }),
- TestTable("serdeins",
- s"""CREATE TABLE serdeins (key INT, value STRING)
- |ROW FORMAT SERDE '${classOf[LazySimpleSerDe].getCanonicalName}'
- |WITH SERDEPROPERTIES ('field.delim'='\\t')
- """.stripMargin.cmd,
- "INSERT OVERWRITE TABLE serdeins SELECT * FROM src".cmd),
- TestTable("episodes",
- s"""CREATE TABLE episodes (title STRING, air_date STRING, doctor INT)
- |STORED AS avro
- |TBLPROPERTIES (
- | 'avro.schema.literal'='{
- | "type": "record",
- | "name": "episodes",
- | "namespace": "testing.hive.avro.serde",
- | "fields": [
- | {
- | "name": "title",
- | "type": "string",
- | "doc": "episode title"
- | },
- | {
- | "name": "air_date",
- | "type": "string",
- | "doc": "initial date"
- | },
- | {
- | "name": "doctor",
- | "type": "int",
- | "doc": "main actor playing the Doctor in episode"
- | }
- | ]
- | }'
- |)
- """.stripMargin.cmd,
- s"LOAD DATA LOCAL INPATH '${getHiveFile("data/files/episodes.avro")}' INTO TABLE episodes".cmd
- ),
- // THIS TABLE IS NOT THE SAME AS THE HIVE TEST TABLE episodes_partitioned AS DYNAMIC
- // PARTITIONING IS NOT YET SUPPORTED
- TestTable("episodes_part",
- s"""CREATE TABLE episodes_part (title STRING, air_date STRING, doctor INT)
- |PARTITIONED BY (doctor_pt INT)
- |STORED AS avro
- |TBLPROPERTIES (
- | 'avro.schema.literal'='{
- | "type": "record",
- | "name": "episodes",
- | "namespace": "testing.hive.avro.serde",
- | "fields": [
- | {
- | "name": "title",
- | "type": "string",
- | "doc": "episode title"
- | },
- | {
- | "name": "air_date",
- | "type": "string",
- | "doc": "initial date"
- | },
- | {
- | "name": "doctor",
- | "type": "int",
- | "doc": "main actor playing the Doctor in episode"
- | }
- | ]
- | }'
- |)
- """.stripMargin.cmd,
- // WORKAROUND: Required to pass schema to SerDe for partitioned tables.
- // TODO: Pass this automatically from the table to partitions.
- s"""
- |ALTER TABLE episodes_part SET SERDEPROPERTIES (
- | 'avro.schema.literal'='{
- | "type": "record",
- | "name": "episodes",
- | "namespace": "testing.hive.avro.serde",
- | "fields": [
- | {
- | "name": "title",
- | "type": "string",
- | "doc": "episode title"
- | },
- | {
- | "name": "air_date",
- | "type": "string",
- | "doc": "initial date"
- | },
- | {
- | "name": "doctor",
- | "type": "int",
- | "doc": "main actor playing the Doctor in episode"
- | }
- | ]
- | }'
- |)
- """.stripMargin.cmd,
- s"""
- INSERT OVERWRITE TABLE episodes_part PARTITION (doctor_pt=1)
- SELECT title, air_date, doctor FROM episodes
- """.cmd
+ |LOAD DATA LOCAL INPATH '${getHiveFile("data/files/episodes.avro")}'
+ |INTO TABLE episodes
+ """.stripMargin.cmd
),
- TestTable("src_json",
- s"""CREATE TABLE src_json (json STRING) STORED AS TEXTFILE
- """.stripMargin.cmd,
- s"LOAD DATA LOCAL INPATH '${getHiveFile("data/files/json.txt")}' INTO TABLE src_json".cmd)
- )
+ // THIS TABLE IS NOT THE SAME AS THE HIVE TEST TABLE episodes_partitioned AS DYNAMIC
+ // PARTITIONING IS NOT YET SUPPORTED
+ TestTable("episodes_part",
+ s"""CREATE TABLE episodes_part (title STRING, air_date STRING, doctor INT)
+ |PARTITIONED BY (doctor_pt INT)
+ |STORED AS avro
+ |TBLPROPERTIES (
+ | 'avro.schema.literal'='{
+ | "type": "record",
+ | "name": "episodes",
+ | "namespace": "testing.hive.avro.serde",
+ | "fields": [
+ | {
+ | "name": "title",
+ | "type": "string",
+ | "doc": "episode title"
+ | },
+ | {
+ | "name": "air_date",
+ | "type": "string",
+ | "doc": "initial date"
+ | },
+ | {
+ | "name": "doctor",
+ | "type": "int",
+ | "doc": "main actor playing the Doctor in episode"
+ | }
+ | ]
+ | }'
+ |)
+ """.stripMargin.cmd,
+ // WORKAROUND: Required to pass schema to SerDe for partitioned tables.
+ // TODO: Pass this automatically from the table to partitions.
+ s"""
+ |ALTER TABLE episodes_part SET SERDEPROPERTIES (
+ | 'avro.schema.literal'='{
+ | "type": "record",
+ | "name": "episodes",
+ | "namespace": "testing.hive.avro.serde",
+ | "fields": [
+ | {
+ | "name": "title",
+ | "type": "string",
+ | "doc": "episode title"
+ | },
+ | {
+ | "name": "air_date",
+ | "type": "string",
+ | "doc": "initial date"
+ | },
+ | {
+ | "name": "doctor",
+ | "type": "int",
+ | "doc": "main actor playing the Doctor in episode"
+ | }
+ | ]
+ | }'
+ |)
+ """.stripMargin.cmd,
+ s"""
+ INSERT OVERWRITE TABLE episodes_part PARTITION (doctor_pt=1)
+ SELECT title, air_date, doctor FROM episodes
+ """.cmd
+ ),
+ TestTable("src_json",
+ s"""CREATE TABLE src_json (json STRING) STORED AS TEXTFILE
+ """.stripMargin.cmd,
+ s"LOAD DATA LOCAL INPATH '${getHiveFile("data/files/json.txt")}' INTO TABLE src_json".cmd)
+ )
- hiveQTestUtilTables.foreach(registerTestTable)
+ hiveQTestUtilTables.foreach(registerTestTable)
+ }
private val loadedTables = new collection.mutable.HashSet[String]