Diffstat (limited to 'sql')
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala  344
1 file changed, 184 insertions(+), 160 deletions(-)
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala
index b45be0251d..7f892047c7 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala
@@ -73,8 +73,12 @@ class TestHiveContext(
@transient override val sparkSession: TestHiveSparkSession)
extends SQLContext(sparkSession) {
- def this(sc: SparkContext) {
- this(new TestHiveSparkSession(HiveUtils.withHiveExternalCatalog(sc)))
+ /**
+ * If loadTestTables is false, no test tables are loaded. Note that this flag can only be true
+ * when running in the JVM, i.e. it needs to be false when calling from Python.
+ */
+ def this(sc: SparkContext, loadTestTables: Boolean = true) {
+ this(new TestHiveSparkSession(HiveUtils.withHiveExternalCatalog(sc), loadTestTables))
}
override def newSession(): TestHiveContext = {
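
The default of `loadTestTables = true` keeps existing JVM callers source-compatible, while Python callers must opt out because the table-loading commands are JVM-only closures. A minimal usage sketch, assuming a local-mode SparkContext; the master URL and app name are illustrative:

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.hive.test.TestHiveContext

// Illustrative setup: the constructor wraps sc with
// HiveUtils.withHiveExternalCatalog, which switches the catalog
// implementation to "hive" before the session asserts on it.
val conf = new SparkConf()
  .setMaster("local[2]")
  .setAppName("TestHiveContextSketch")
val sc = new SparkContext(conf)

// JVM test suites keep the default and get the Hive QTestUtil tables.
val fullContext = new TestHiveContext(sc)

// Callers that cannot execute the JVM-only loading closures (e.g. a
// Python process driving the JVM through Py4J) must pass false.
val bareContext = new TestHiveContext(sc, loadTestTables = false)
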
@@ -103,13 +107,24 @@ class TestHiveContext(
}
-
+/**
+ * A [[SparkSession]] used in [[TestHiveContext]].
+ *
+ * @param sc SparkContext
+ * @param warehousePath path to the Hive warehouse directory
+ * @param scratchDirPath scratch directory used by Hive's metastore client
+ * @param metastoreTemporaryConf configuration options for Hive's metastore
+ * @param existingSharedState optional [[TestHiveSharedState]]
+ * @param loadTestTables if true, load the test tables. They can only be loaded when running
+ * in the JVM, i.e. when calling from Python this flag has to be false.
+ */
private[hive] class TestHiveSparkSession(
@transient private val sc: SparkContext,
val warehousePath: File,
scratchDirPath: File,
metastoreTemporaryConf: Map[String, String],
- @transient private val existingSharedState: Option[TestHiveSharedState])
+ @transient private val existingSharedState: Option[TestHiveSharedState],
+ private val loadTestTables: Boolean)
extends SparkSession(sc) with Logging { self =>
// TODO: We need to set the temp warehouse path to sc's conf.
@@ -118,13 +133,14 @@ private[hive] class TestHiveSparkSession(
// when we create metadataHive. This flow is not easy to follow and can introduce
// confusion when a developer is debugging an issue. We need to refactor this part
// to just set the temp warehouse path in sc's conf.
- def this(sc: SparkContext) {
+ def this(sc: SparkContext, loadTestTables: Boolean) {
this(
sc,
Utils.createTempDir(namePrefix = "warehouse"),
TestHiveContext.makeScratchDir(),
HiveUtils.newTemporaryConfiguration(useInMemoryDerby = false),
- None)
+ None,
+ loadTestTables)
}
assume(sc.conf.get(CATALOG_IMPLEMENTATION) == "hive")
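
The TODO above concerns the warehouse path traveling through `metastoreTemporaryConf` rather than through `sc`'s conf. A hedged sketch of that flow; the conf key below is the standard Hive warehouse setting, and the helper name is hypothetical:

import java.io.File

// Hypothetical helper illustrating the flow the comment describes: the
// temp warehouse dir rides along in the metastore client conf, so it
// only takes effect when metadataHive is created, not via sc.conf.
def confWithTempWarehouse(
    metastoreTemporaryConf: Map[String, String],
    warehousePath: File): Map[String, String] =
  metastoreTemporaryConf +
    ("hive.metastore.warehouse.dir" -> warehousePath.toURI.toString)
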
@@ -144,7 +160,7 @@ private[hive] class TestHiveSparkSession(
override def newSession(): TestHiveSparkSession = {
new TestHiveSparkSession(
- sc, warehousePath, scratchDirPath, metastoreTemporaryConf, Some(sharedState))
+ sc, warehousePath, scratchDirPath, metastoreTemporaryConf, Some(sharedState), loadTestTables)
}
private var cacheTables: Boolean = false
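
With the final argument added above, `loadTestTables` also survives `newSession()`: a derived session reuses the parent's warehouse, scratch dir, metastore conf, and shared state, and keeps the parent's decision about test tables instead of reverting to a default. A sketch of the expected invariants (the assertion is illustrative, and this only compiles inside the `hive` package since the class is `private[hive]`):

// 'sc' is a SparkContext configured as in the earlier sketch.
val parent = new TestHiveSparkSession(sc, loadTestTables = false)
val child  = parent.newSession()

// Derived sessions share the parent's on-disk layout and SharedState...
assert(child.warehousePath == parent.warehousePath)
// ...and inherit loadTestTables, so a session derived from a
// Python-driven parent never starts loading JVM-only test tables.
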
@@ -204,165 +220,173 @@ private[hive] class TestHiveSparkSession(
testTables += (testTable.name -> testTable)
}
- // The test tables that are defined in the Hive QTestUtil.
- // /itests/util/src/main/java/org/apache/hadoop/hive/ql/QTestUtil.java
- // https://github.com/apache/hive/blob/branch-0.13/data/scripts/q_test_init.sql
- @transient
- val hiveQTestUtilTables = Seq(
- TestTable("src",
- "CREATE TABLE src (key INT, value STRING)".cmd,
- s"LOAD DATA LOCAL INPATH '${getHiveFile("data/files/kv1.txt")}' INTO TABLE src".cmd),
- TestTable("src1",
- "CREATE TABLE src1 (key INT, value STRING)".cmd,
- s"LOAD DATA LOCAL INPATH '${getHiveFile("data/files/kv3.txt")}' INTO TABLE src1".cmd),
- TestTable("srcpart", () => {
- sql(
- "CREATE TABLE srcpart (key INT, value STRING) PARTITIONED BY (ds STRING, hr STRING)")
- for (ds <- Seq("2008-04-08", "2008-04-09"); hr <- Seq("11", "12")) {
+ if (loadTestTables) {
+ // The test tables that are defined in the Hive QTestUtil.
+ // /itests/util/src/main/java/org/apache/hadoop/hive/ql/QTestUtil.java
+ // https://github.com/apache/hive/blob/branch-0.13/data/scripts/q_test_init.sql
+ @transient
+ val hiveQTestUtilTables: Seq[TestTable] = Seq(
+ TestTable("src",
+ "CREATE TABLE src (key INT, value STRING)".cmd,
+ s"LOAD DATA LOCAL INPATH '${getHiveFile("data/files/kv1.txt")}' INTO TABLE src".cmd),
+ TestTable("src1",
+ "CREATE TABLE src1 (key INT, value STRING)".cmd,
+ s"LOAD DATA LOCAL INPATH '${getHiveFile("data/files/kv3.txt")}' INTO TABLE src1".cmd),
+ TestTable("srcpart", () => {
sql(
- s"""LOAD DATA LOCAL INPATH '${getHiveFile("data/files/kv1.txt")}'
- |OVERWRITE INTO TABLE srcpart PARTITION (ds='$ds',hr='$hr')
- """.stripMargin)
- }
- }),
- TestTable("srcpart1", () => {
- sql(
- "CREATE TABLE srcpart1 (key INT, value STRING) PARTITIONED BY (ds STRING, hr INT)")
- for (ds <- Seq("2008-04-08", "2008-04-09"); hr <- 11 to 12) {
+ "CREATE TABLE srcpart (key INT, value STRING) PARTITIONED BY (ds STRING, hr STRING)")
+ for (ds <- Seq("2008-04-08", "2008-04-09"); hr <- Seq("11", "12")) {
+ sql(
+ s"""LOAD DATA LOCAL INPATH '${getHiveFile("data/files/kv1.txt")}'
+ |OVERWRITE INTO TABLE srcpart PARTITION (ds='$ds',hr='$hr')
+ """.stripMargin)
+ }
+ }),
+ TestTable("srcpart1", () => {
sql(
- s"""LOAD DATA LOCAL INPATH '${getHiveFile("data/files/kv1.txt")}'
- |OVERWRITE INTO TABLE srcpart1 PARTITION (ds='$ds',hr='$hr')
- """.stripMargin)
- }
- }),
- TestTable("src_thrift", () => {
- import org.apache.hadoop.hive.serde2.thrift.ThriftDeserializer
- import org.apache.hadoop.mapred.{SequenceFileInputFormat, SequenceFileOutputFormat}
- import org.apache.thrift.protocol.TBinaryProtocol
+ "CREATE TABLE srcpart1 (key INT, value STRING) PARTITIONED BY (ds STRING, hr INT)")
+ for (ds <- Seq("2008-04-08", "2008-04-09"); hr <- 11 to 12) {
+ sql(
+ s"""LOAD DATA LOCAL INPATH '${getHiveFile("data/files/kv1.txt")}'
+ |OVERWRITE INTO TABLE srcpart1 PARTITION (ds='$ds',hr='$hr')
+ """.stripMargin)
+ }
+ }),
+ TestTable("src_thrift", () => {
+ import org.apache.hadoop.hive.serde2.thrift.ThriftDeserializer
+ import org.apache.hadoop.mapred.{SequenceFileInputFormat, SequenceFileOutputFormat}
+ import org.apache.thrift.protocol.TBinaryProtocol
- sql(
+ sql(
+ s"""
+ |CREATE TABLE src_thrift(fake INT)
+ |ROW FORMAT SERDE '${classOf[ThriftDeserializer].getName}'
+ |WITH SERDEPROPERTIES(
+ | 'serialization.class'='org.apache.spark.sql.hive.test.Complex',
+ | 'serialization.format'='${classOf[TBinaryProtocol].getName}'
+ |)
+ |STORED AS
+ |INPUTFORMAT '${classOf[SequenceFileInputFormat[_, _]].getName}'
+ |OUTPUTFORMAT '${classOf[SequenceFileOutputFormat[_, _]].getName}'
+ """.stripMargin)
+
+ sql(
+ s"""
+ |LOAD DATA LOCAL INPATH '${getHiveFile("data/files/complex.seq")}'
+ |INTO TABLE src_thrift
+ """.stripMargin)
+ }),
+ TestTable("serdeins",
+ s"""CREATE TABLE serdeins (key INT, value STRING)
+ |ROW FORMAT SERDE '${classOf[LazySimpleSerDe].getCanonicalName}'
+ |WITH SERDEPROPERTIES ('field.delim'='\\t')
+ """.stripMargin.cmd,
+ "INSERT OVERWRITE TABLE serdeins SELECT * FROM src".cmd),
+ TestTable("episodes",
+ s"""CREATE TABLE episodes (title STRING, air_date STRING, doctor INT)
+ |STORED AS avro
+ |TBLPROPERTIES (
+ | 'avro.schema.literal'='{
+ | "type": "record",
+ | "name": "episodes",
+ | "namespace": "testing.hive.avro.serde",
+ | "fields": [
+ | {
+ | "name": "title",
+ | "type": "string",
+ | "doc": "episode title"
+ | },
+ | {
+ | "name": "air_date",
+ | "type": "string",
+ | "doc": "initial date"
+ | },
+ | {
+ | "name": "doctor",
+ | "type": "int",
+ | "doc": "main actor playing the Doctor in episode"
+ | }
+ | ]
+ | }'
+ |)
+ """.stripMargin.cmd,
s"""
- |CREATE TABLE src_thrift(fake INT)
- |ROW FORMAT SERDE '${classOf[ThriftDeserializer].getName}'
- |WITH SERDEPROPERTIES(
- | 'serialization.class'='org.apache.spark.sql.hive.test.Complex',
- | 'serialization.format'='${classOf[TBinaryProtocol].getName}'
- |)
- |STORED AS
- |INPUTFORMAT '${classOf[SequenceFileInputFormat[_, _]].getName}'
- |OUTPUTFORMAT '${classOf[SequenceFileOutputFormat[_, _]].getName}'
- """.stripMargin)
-
- sql(
- s"LOAD DATA LOCAL INPATH '${getHiveFile("data/files/complex.seq")}' INTO TABLE src_thrift")
- }),
- TestTable("serdeins",
- s"""CREATE TABLE serdeins (key INT, value STRING)
- |ROW FORMAT SERDE '${classOf[LazySimpleSerDe].getCanonicalName}'
- |WITH SERDEPROPERTIES ('field.delim'='\\t')
- """.stripMargin.cmd,
- "INSERT OVERWRITE TABLE serdeins SELECT * FROM src".cmd),
- TestTable("episodes",
- s"""CREATE TABLE episodes (title STRING, air_date STRING, doctor INT)
- |STORED AS avro
- |TBLPROPERTIES (
- | 'avro.schema.literal'='{
- | "type": "record",
- | "name": "episodes",
- | "namespace": "testing.hive.avro.serde",
- | "fields": [
- | {
- | "name": "title",
- | "type": "string",
- | "doc": "episode title"
- | },
- | {
- | "name": "air_date",
- | "type": "string",
- | "doc": "initial date"
- | },
- | {
- | "name": "doctor",
- | "type": "int",
- | "doc": "main actor playing the Doctor in episode"
- | }
- | ]
- | }'
- |)
- """.stripMargin.cmd,
- s"LOAD DATA LOCAL INPATH '${getHiveFile("data/files/episodes.avro")}' INTO TABLE episodes".cmd
- ),
- // THIS TABLE IS NOT THE SAME AS THE HIVE TEST TABLE episodes_partitioned AS DYNAMIC
- // PARTITIONING IS NOT YET SUPPORTED
- TestTable("episodes_part",
- s"""CREATE TABLE episodes_part (title STRING, air_date STRING, doctor INT)
- |PARTITIONED BY (doctor_pt INT)
- |STORED AS avro
- |TBLPROPERTIES (
- | 'avro.schema.literal'='{
- | "type": "record",
- | "name": "episodes",
- | "namespace": "testing.hive.avro.serde",
- | "fields": [
- | {
- | "name": "title",
- | "type": "string",
- | "doc": "episode title"
- | },
- | {
- | "name": "air_date",
- | "type": "string",
- | "doc": "initial date"
- | },
- | {
- | "name": "doctor",
- | "type": "int",
- | "doc": "main actor playing the Doctor in episode"
- | }
- | ]
- | }'
- |)
- """.stripMargin.cmd,
- // WORKAROUND: Required to pass schema to SerDe for partitioned tables.
- // TODO: Pass this automatically from the table to partitions.
- s"""
- |ALTER TABLE episodes_part SET SERDEPROPERTIES (
- | 'avro.schema.literal'='{
- | "type": "record",
- | "name": "episodes",
- | "namespace": "testing.hive.avro.serde",
- | "fields": [
- | {
- | "name": "title",
- | "type": "string",
- | "doc": "episode title"
- | },
- | {
- | "name": "air_date",
- | "type": "string",
- | "doc": "initial date"
- | },
- | {
- | "name": "doctor",
- | "type": "int",
- | "doc": "main actor playing the Doctor in episode"
- | }
- | ]
- | }'
- |)
- """.stripMargin.cmd,
- s"""
- INSERT OVERWRITE TABLE episodes_part PARTITION (doctor_pt=1)
- SELECT title, air_date, doctor FROM episodes
- """.cmd
+ |LOAD DATA LOCAL INPATH '${getHiveFile("data/files/episodes.avro")}'
+ |INTO TABLE episodes
+ """.stripMargin.cmd
),
- TestTable("src_json",
- s"""CREATE TABLE src_json (json STRING) STORED AS TEXTFILE
- """.stripMargin.cmd,
- s"LOAD DATA LOCAL INPATH '${getHiveFile("data/files/json.txt")}' INTO TABLE src_json".cmd)
- )
+ // THIS TABLE IS NOT THE SAME AS THE HIVE TEST TABLE episodes_partitioned AS DYNAMIC
+ // PARTITIONING IS NOT YET SUPPORTED
+ TestTable("episodes_part",
+ s"""CREATE TABLE episodes_part (title STRING, air_date STRING, doctor INT)
+ |PARTITIONED BY (doctor_pt INT)
+ |STORED AS avro
+ |TBLPROPERTIES (
+ | 'avro.schema.literal'='{
+ | "type": "record",
+ | "name": "episodes",
+ | "namespace": "testing.hive.avro.serde",
+ | "fields": [
+ | {
+ | "name": "title",
+ | "type": "string",
+ | "doc": "episode title"
+ | },
+ | {
+ | "name": "air_date",
+ | "type": "string",
+ | "doc": "initial date"
+ | },
+ | {
+ | "name": "doctor",
+ | "type": "int",
+ | "doc": "main actor playing the Doctor in episode"
+ | }
+ | ]
+ | }'
+ |)
+ """.stripMargin.cmd,
+ // WORKAROUND: Required to pass schema to SerDe for partitioned tables.
+ // TODO: Pass this automatically from the table to partitions.
+ s"""
+ |ALTER TABLE episodes_part SET SERDEPROPERTIES (
+ | 'avro.schema.literal'='{
+ | "type": "record",
+ | "name": "episodes",
+ | "namespace": "testing.hive.avro.serde",
+ | "fields": [
+ | {
+ | "name": "title",
+ | "type": "string",
+ | "doc": "episode title"
+ | },
+ | {
+ | "name": "air_date",
+ | "type": "string",
+ | "doc": "initial date"
+ | },
+ | {
+ | "name": "doctor",
+ | "type": "int",
+ | "doc": "main actor playing the Doctor in episode"
+ | }
+ | ]
+ | }'
+ |)
+ """.stripMargin.cmd,
+ s"""
+ INSERT OVERWRITE TABLE episodes_part PARTITION (doctor_pt=1)
+ SELECT title, air_date, doctor FROM episodes
+ """.cmd
+ ),
+ TestTable("src_json",
+ s"""CREATE TABLE src_json (json STRING) STORED AS TEXTFILE
+ """.stripMargin.cmd,
+ s"LOAD DATA LOCAL INPATH '${getHiveFile("data/files/json.txt")}' INTO TABLE src_json".cmd)
+ )
- hiveQTestUtilTables.foreach(registerTestTable)
+ hiveQTestUtilTables.foreach(registerTestTable)
+ }
private val loadedTables = new collection.mutable.HashSet[String]
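
Registration is deliberately separate from loading: `registerTestTable` only records a table's setup commands, and `loadedTables` tracks which tables have actually been materialized so each is created at most once. A usage sketch, assuming the `loadTestTable(name)` helper this file defines for running a registered table's commands on first use:

// 'spark' is a TestHiveSparkSession built with loadTestTables = true,
// so the QTestUtil tables are registered but not yet created.
spark.loadTestTable("src")   // runs CREATE TABLE + LOAD DATA exactly once
spark.sql("SELECT count(*) FROM src").show()

// With loadTestTables = false nothing was registered, so the same call
// fails fast rather than executing JVM-only closures from Python.
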