author     Yin Huai <yhuai@databricks.com>            2015-04-02 16:02:31 -0700
committer  Michael Armbrust <michael@databricks.com>  2015-04-02 16:02:40 -0700
commit     aecec07d6bbcfe1fc5a7c299735664c80b8323a1 (patch)
tree       d40974c85815670f38d03d039d8e5d74397eab82
parent     78ba24584b0a04c3e2106834dcc110d7d5051de3 (diff)
[SPARK-6655][SQL] We need to read the schema of a data source table stored in spark.sql.sources.schema property
https://issues.apache.org/jira/browse/SPARK-6655
Author: Yin Huai <yhuai@databricks.com>
Closes #5313 from yhuai/SPARK-6655 and squashes the following commits:
1e00c03 [Yin Huai] Unnecessary change.
f131bd9 [Yin Huai] Fix.
f1218c1 [Yin Huai] Failed test.
(cherry picked from commit 251698fb7335a3bb465f1cd0c29e7e74e0361f4a)
Signed-off-by: Michael Armbrust <michael@databricks.com>
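
For context, a data source table's schema is persisted in the metastore as the JSON form of a StructType and parsed back with DataType.fromJson when the table is loaded. A minimal sketch of that round trip, assuming Spark SQL (1.3-era org.apache.spark.sql.types) on the classpath; the object name is ours, not Spark's:

import org.apache.spark.sql.types.{DataType, IntegerType, StructField, StructType}

object SchemaJsonRoundTrip {
  def main(args: Array[String]): Unit = {
    val schema = StructType(StructField("int", IntegerType, nullable = true) :: Nil)

    // Serialize the schema to its JSON representation, the string that ends up
    // in the spark.sql.sources.schema table property.
    val json = schema.json

    // Parse it back, as HiveMetastoreCatalog does when loading the table.
    val restored = DataType.fromJson(json).asInstanceOf[StructType]
    assert(restored == schema)
  }
}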
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala      | 18
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala | 23
2 files changed, 37 insertions(+), 4 deletions(-)
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
index f0076cef13..14cdb42073 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
@@ -70,7 +70,8 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with
       val table = synchronized {
         client.getTable(in.database, in.name)
       }
-      val userSpecifiedSchema =
+
+      def schemaStringFromParts: Option[String] = {
         Option(table.getProperty("spark.sql.sources.schema.numParts")).map { numParts =>
           val parts = (0 until numParts.toInt).map { index =>
             val part = table.getProperty(s"spark.sql.sources.schema.part.${index}")
@@ -82,10 +83,19 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with
 
             part
           }
-          // Stick all parts back to a single schema string in the JSON representation
-          // and convert it back to a StructType.
-          DataType.fromJson(parts.mkString).asInstanceOf[StructType]
+          // Stick all parts back together into a single schema string.
+          parts.mkString
         }
+      }
+
+      // Originally, we used spark.sql.sources.schema to store the schema of a data source table.
+      // After SPARK-6024, we removed this property.
+      // Although we no longer write spark.sql.sources.schema, we still need to support reading it.
+      val schemaString =
+        Option(table.getProperty("spark.sql.sources.schema")).orElse(schemaStringFromParts)
+
+      val userSpecifiedSchema =
+        schemaString.map(s => DataType.fromJson(s).asInstanceOf[StructType])
 
       // It does not appear that the ql client for the metastore has a way to enumerate all the
       // SerDe properties directly...
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
index 8857ccd7d2..be8bd95802 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
@@ -23,6 +23,8 @@ import org.scalatest.BeforeAndAfterEach
 
 import org.apache.commons.io.FileUtils
 import org.apache.hadoop.fs.Path
+import org.apache.hadoop.hive.metastore.TableType
+import org.apache.hadoop.hive.ql.metadata.Table
 import org.apache.hadoop.mapred.InvalidInputException
 
 import org.apache.spark.sql.catalyst.util
@@ -682,6 +684,27 @@ class MetastoreDataSourcesSuite extends QueryTest with BeforeAndAfterEach {
     assert(schema === actualSchema)
   }
 
+  test("SPARK-6655 still support a schema stored in spark.sql.sources.schema") {
+    val tableName = "spark6655"
+    val schema = StructType(StructField("int", IntegerType, true) :: Nil)
+    // Manually create the metadata in metastore.
+    val tbl = new Table("default", tableName)
+    tbl.setProperty("spark.sql.sources.provider", "json")
+    tbl.setProperty("spark.sql.sources.schema", schema.json)
+    tbl.setProperty("EXTERNAL", "FALSE")
+    tbl.setTableType(TableType.MANAGED_TABLE)
+    tbl.setSerdeParam("path", catalog.hiveDefaultTableFilePath(tableName))
+    catalog.synchronized {
+      catalog.client.createTable(tbl)
+    }
+
+    invalidateTable(tableName)
+    val actualSchema = table(tableName).schema
+    assert(schema === actualSchema)
+    sql(s"drop table $tableName")
+  }
+
+
   test("insert into a table") {
     def createDF(from: Int, to: Int): DataFrame =
       createDataFrame((from to to).map(i => Tuple2(i, s"str$i"))).toDF("c1", "c2")
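
The heart of the fix is the lookup order: prefer the legacy single spark.sql.sources.schema property, and only fall back to reassembling the spark.sql.sources.schema.part.N chunks (the post-SPARK-6024 layout, which splits long schema strings across multiple properties). A self-contained sketch of that logic, using a plain Map to stand in for the Hive table properties; getProperty and readSchemaString are our own names for illustration, not Spark's:

object SchemaLookupSketch {
  // Hypothetical stand-in for table.getProperty, which returns null when absent.
  def getProperty(props: Map[String, String], key: String): String =
    props.getOrElse(key, null)

  def readSchemaString(props: Map[String, String]): Option[String] = {
    // Reassemble the schema JSON from numbered part properties, if present.
    def schemaStringFromParts: Option[String] =
      Option(getProperty(props, "spark.sql.sources.schema.numParts")).map { numParts =>
        val parts = (0 until numParts.toInt).map { index =>
          val part = getProperty(props, s"spark.sql.sources.schema.part.$index")
          require(part != null, s"missing part $index of the schema")
          part
        }
        parts.mkString
      }

    // Legacy single-property layout wins; otherwise fall back to the parts.
    Option(getProperty(props, "spark.sql.sources.schema")).orElse(schemaStringFromParts)
  }

  def main(args: Array[String]): Unit = {
    val oldLayout = Map("spark.sql.sources.schema" -> """{"type":"struct","fields":[]}""")
    val newLayout = Map(
      "spark.sql.sources.schema.numParts" -> "2",
      "spark.sql.sources.schema.part.0" -> """{"type":"struct",""",
      "spark.sql.sources.schema.part.1" -> """"fields":[]}""")
    // Both layouts must yield the same schema string.
    assert(readSchemaString(oldLayout) == readSchemaString(newLayout))
  }
}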