author     Yin Huai <yhuai@databricks.com>              2015-04-02 16:02:31 -0700
committer  Michael Armbrust <michael@databricks.com>    2015-04-02 16:02:40 -0700
commit     aecec07d6bbcfe1fc5a7c299735664c80b8323a1 (patch)
tree       d40974c85815670f38d03d039d8e5d74397eab82
parent     78ba24584b0a04c3e2106834dcc110d7d5051de3 (diff)
download   spark-aecec07d6bbcfe1fc5a7c299735664c80b8323a1.tar.gz
           spark-aecec07d6bbcfe1fc5a7c299735664c80b8323a1.tar.bz2
           spark-aecec07d6bbcfe1fc5a7c299735664c80b8323a1.zip
[SPARK-6655][SQL] We need to read the schema of a data source table stored in the spark.sql.sources.schema property
https://issues.apache.org/jira/browse/SPARK-6655

Author: Yin Huai <yhuai@databricks.com>

Closes #5313 from yhuai/SPARK-6655 and squashes the following commits:

1e00c03 [Yin Huai] Unnecessary change.
f131bd9 [Yin Huai] Fix.
f1218c1 [Yin Huai] Failed test.

(cherry picked from commit 251698fb7335a3bb465f1cd0c29e7e74e0361f4a)
Signed-off-by: Michael Armbrust <michael@databricks.com>
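In short: after SPARK-6024 the schema JSON is stored split across spark.sql.sources.schema.part.${index} properties, but tables created before that change still carry the whole JSON in a single spark.sql.sources.schema property, so the catalog has to accept either layout. A minimal sketch of that fallback logic, with the metastore table properties modelled as a plain Map (the helper name and Map-based signature are illustrative, not the patch's API):

import org.apache.spark.sql.types.{DataType, StructType}

// Illustrative only: metastore table properties modelled as a Map
// instead of a Hive Table object.
def readSchema(props: Map[String, String]): Option[StructType] = {
  // Post-SPARK-6024 layout: schema JSON split across numbered parts.
  def schemaStringFromParts: Option[String] =
    props.get("spark.sql.sources.schema.numParts").map { numParts =>
      (0 until numParts.toInt)
        .map(i => props(s"spark.sql.sources.schema.part.$i"))
        .mkString
    }

  // Old layout: the whole schema JSON in one property; otherwise fall back to the parts.
  val schemaString = props.get("spark.sql.sources.schema").orElse(schemaStringFromParts)
  schemaString.map(s => DataType.fromJson(s).asInstanceOf[StructType])
}

If neither property is present, the result is None and, as before, the data source resolves its own schema.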
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala       | 18
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala  | 23
2 files changed, 37 insertions(+), 4 deletions(-)
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
index f0076cef13..14cdb42073 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
@@ -70,7 +70,8 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with
         val table = synchronized {
           client.getTable(in.database, in.name)
         }
-        val userSpecifiedSchema =
+
+        def schemaStringFromParts: Option[String] = {
           Option(table.getProperty("spark.sql.sources.schema.numParts")).map { numParts =>
             val parts = (0 until numParts.toInt).map { index =>
               val part = table.getProperty(s"spark.sql.sources.schema.part.${index}")
@@ -82,10 +83,19 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with
               part
             }
-            // Stick all parts back to a single schema string in the JSON representation
-            // and convert it back to a StructType.
-            DataType.fromJson(parts.mkString).asInstanceOf[StructType]
+            // Stick all parts back to a single schema string.
+            parts.mkString
           }
+        }
+
+        // Originally, we used spark.sql.sources.schema to store the schema of a data source table.
+        // After SPARK-6024, we removed this property.
+        // Although we are not using spark.sql.sources.schema any more, we still need to support it.
+        val schemaString =
+          Option(table.getProperty("spark.sql.sources.schema")).orElse(schemaStringFromParts)
+
+        val userSpecifiedSchema =
+          schemaString.map(s => DataType.fromJson(s).asInstanceOf[StructType])
         // It does not appear that the ql client for the metastore has a way to enumerate all the
         // SerDe properties directly...
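For context on the hunk above: SPARK-6024 started splitting a table's schema JSON across spark.sql.sources.schema.numParts and spark.sql.sources.schema.part.${index} because a single metastore property value can be too small to hold a large schema. A rough sketch of how such parts might be produced on the write side (the 4000-character threshold and the helper name are assumptions for illustration; the write path is not part of this patch):

import org.apache.spark.sql.types.StructType

// Hypothetical sketch: split a schema's JSON representation into
// metastore-sized chunks, keyed the way the read path expects them.
def schemaToParts(schema: StructType, threshold: Int = 4000): Map[String, String] = {
  val parts = schema.json.grouped(threshold).toSeq
  val indexed = parts.zipWithIndex.map { case (part, i) =>
    s"spark.sql.sources.schema.part.$i" -> part
  }
  (indexed :+ ("spark.sql.sources.schema.numParts" -> parts.size.toString)).toMap
}

Reading the parts back in index order and concatenating them recovers the original JSON, which is what schemaStringFromParts in the hunk above does.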
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
index 8857ccd7d2..be8bd95802 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
@@ -23,6 +23,8 @@ import org.scalatest.BeforeAndAfterEach
 import org.apache.commons.io.FileUtils
 import org.apache.hadoop.fs.Path
+import org.apache.hadoop.hive.metastore.TableType
+import org.apache.hadoop.hive.ql.metadata.Table
 import org.apache.hadoop.mapred.InvalidInputException
 
 import org.apache.spark.sql.catalyst.util
@@ -682,6 +684,27 @@ class MetastoreDataSourcesSuite extends QueryTest with BeforeAndAfterEach {
     assert(schema === actualSchema)
   }
 
+  test("SPARK-6655 still support a schema stored in spark.sql.sources.schema") {
+    val tableName = "spark6655"
+    val schema = StructType(StructField("int", IntegerType, true) :: Nil)
+    // Manually create the metadata in metastore.
+    val tbl = new Table("default", tableName)
+    tbl.setProperty("spark.sql.sources.provider", "json")
+    tbl.setProperty("spark.sql.sources.schema", schema.json)
+    tbl.setProperty("EXTERNAL", "FALSE")
+    tbl.setTableType(TableType.MANAGED_TABLE)
+    tbl.setSerdeParam("path", catalog.hiveDefaultTableFilePath(tableName))
+    catalog.synchronized {
+      catalog.client.createTable(tbl)
+    }
+
+    invalidateTable(tableName)
+    val actualSchema = table(tableName).schema
+    assert(schema === actualSchema)
+    sql(s"drop table $tableName")
+  }
+
+
   test("insert into a table") {
     def createDF(from: Int, to: Int): DataFrame =
       createDataFrame((from to to).map(i => Tuple2(i, s"str$i"))).toDF("c1", "c2")