author     Yin Huai <yhuai@databricks.com>      2015-02-26 20:46:05 -0800
committer  Reynold Xin <rxin@databricks.com>    2015-02-26 20:46:05 -0800
commit     5e5ad6558d60cfbf360708584e883e80d363e33e (patch)
tree       fbd88acd39179dd93d38f9b1fb48696c3a8aa638 /sql/hive
parent     4ad5153f5449319a7e82c9013ccff4494ab58ef1 (diff)
[SPARK-6024][SQL] When a data source table has too many columns, its schema cannot be stored in the metastore.
JIRA: https://issues.apache.org/jira/browse/SPARK-6024

Author: Yin Huai <yhuai@databricks.com>

Closes #4795 from yhuai/wideSchema and squashes the following commits:

4882e6f [Yin Huai] Address comments.
73e71b4 [Yin Huai] Address comments.
143927a [Yin Huai] Simplify code.
cc1d472 [Yin Huai] Make the schema wider.
12bacae [Yin Huai] If the JSON string of a schema is too large, split it before storing it in metastore.
e9b4f70 [Yin Huai] Failed test.
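Taken together, the patch chunks the schema's JSON representation on write so that no single table property exceeds the metastore's length limit, and concatenates the chunks back in index order on read before parsing. Below is a minimal, self-contained sketch of that round trip; the spark.sql.sources.* keys and the use of DataType.fromJson come from the patch itself, while storeSchema, readSchema, and the setProperty/getProperty callbacks are illustrative stand-ins for the Hive table API:

import org.apache.spark.sql.types.{DataType, StructType}

// Write path: split the schema JSON into chunks no longer than the threshold
// and store each chunk as its own table property.
def storeSchema(schema: StructType, threshold: Int, setProperty: (String, String) => Unit): Unit = {
  val parts = schema.json.grouped(threshold).toSeq
  setProperty("spark.sql.sources.schema.numParts", parts.size.toString)
  parts.zipWithIndex.foreach { case (part, index) =>
    setProperty(s"spark.sql.sources.schema.part.$index", part)
  }
}

// Read path: concatenate the stored chunks in order and parse the result.
// Returns None when the table carries no Spark SQL schema at all.
def readSchema(getProperty: String => String): Option[StructType] =
  Option(getProperty("spark.sql.sources.schema.numParts")).map { numParts =>
    val parts = (0 until numParts.toInt).map { index =>
      Option(getProperty(s"spark.sql.sources.schema.part.$index"))
        .getOrElse(sys.error(s"missing part $index of the schema")) // the real code throws AnalysisException
    }
    DataType.fromJson(parts.mkString).asInstanceOf[StructType]
  }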
Diffstat (limited to 'sql/hive')
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala       29
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala  21
2 files changed, 44 insertions, 6 deletions
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
index 8af5a4848f..d3ad364328 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
@@ -69,13 +69,23 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with
val table = synchronized {
client.getTable(in.database, in.name)
}
- val schemaString = table.getProperty("spark.sql.sources.schema")
val userSpecifiedSchema =
- if (schemaString == null) {
- None
- } else {
- Some(DataType.fromJson(schemaString).asInstanceOf[StructType])
+ Option(table.getProperty("spark.sql.sources.schema.numParts")).map { numParts =>
+ val parts = (0 until numParts.toInt).map { index =>
+ val part = table.getProperty(s"spark.sql.sources.schema.part.${index}")
+ if (part == null) {
+ throw new AnalysisException(
+ s"Could not read schema from the metastore because it is corrupted " +
+ s"(missing part ${index} of the schema).")
+ }
+
+ part
+ }
+ // Stick all parts back to a single schema string in the JSON representation
+ // and convert it back to a StructType.
+ DataType.fromJson(parts.mkString).asInstanceOf[StructType]
}
+
// It does not appear that the ql client for the metastore has a way to enumerate all the
// SerDe properties directly...
val options = table.getTTable.getSd.getSerdeInfo.getParameters.toMap
@@ -119,7 +129,14 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with
tbl.setProperty("spark.sql.sources.provider", provider)
if (userSpecifiedSchema.isDefined) {
- tbl.setProperty("spark.sql.sources.schema", userSpecifiedSchema.get.json)
+ val threshold = hive.conf.schemaStringLengthThreshold
+ val schemaJsonString = userSpecifiedSchema.get.json
+ // Split the JSON string.
+ val parts = schemaJsonString.grouped(threshold).toSeq
+ tbl.setProperty("spark.sql.sources.schema.numParts", parts.size.toString)
+ parts.zipWithIndex.foreach { case (part, index) =>
+ tbl.setProperty(s"spark.sql.sources.schema.part.${index}", part)
+ }
}
options.foreach { case (key, value) => tbl.setSerdeParam(key, value) }
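For a wide table, the result in the metastore is a set of numbered properties rather than one oversized value. Assuming a threshold of 4000 characters (the value the test comment below references) and a schema whose JSON runs to roughly 10,000 characters, the stored table properties would look something like this (values abbreviated; the exact JSON depends on the schema):

spark.sql.sources.provider           json
spark.sql.sources.schema.numParts    3
spark.sql.sources.schema.part.0      {"type":"struct","fields":[...   (first 4000 characters of the schema JSON)
spark.sql.sources.schema.part.1      ...                              (next 4000 characters)
spark.sql.sources.schema.part.2      ...                              (remaining characters)

Reading the table back requires only that numParts and every part.N it implies are present; a missing part now raises the AnalysisException above rather than yielding a corrupted schema.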
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
index 0bd82773f3..00306f1cd7 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
@@ -591,4 +591,25 @@ class MetastoreDataSourcesSuite extends QueryTest with BeforeAndAfterEach {
setConf(SQLConf.PARQUET_USE_DATA_SOURCE_API, originalUseDataSource)
}
}
+
+ test("SPARK-6024 wide schema support") {
+ // We will need 80 splits for this schema if the threshold is 4000.
+ val schema = StructType((1 to 5000).map(i => StructField(s"c_${i}", StringType, true)))
+ assert(
+ schema.json.size > conf.schemaStringLengthThreshold,
+ "To correctly test the fix of SPARK-6024, the value of " +
+ s"spark.sql.sources.schemaStringLengthThreshold needs to be less than ${schema.json.size}")
+ // Manually create a metastore data source table.
+ catalog.createDataSourceTable(
+ tableName = "wide_schema",
+ userSpecifiedSchema = Some(schema),
+ provider = "json",
+ options = Map("path" -> "just a dummy path"),
+ isExternal = false)
+
+ invalidateTable("wide_schema")
+
+ val actualSchema = table("wide_schema").schema
+ assert(schema === actualSchema)
+ }
}
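The new test builds a 5,000-column schema so that its JSON is guaranteed to exceed the threshold, writes it through createDataSourceTable, and checks that the schema read back is identical. The fix ultimately hinges on grouped/mkString being a lossless round trip for the JSON string; a standalone sketch of that check follows, where the 4000-character threshold is an assumption mirroring the comment in the test rather than a value read from the configuration:

import org.apache.spark.sql.types.{DataType, StringType, StructField, StructType}

val schema = StructType((1 to 5000).map(i => StructField(s"c_$i", StringType, nullable = true)))
val threshold = 4000  // assumed; the real limit comes from spark.sql.sources.schemaStringLengthThreshold

val parts = schema.json.grouped(threshold).toSeq
assert(parts.forall(_.length <= threshold))                                    // every stored part fits under the limit
assert(DataType.fromJson(parts.mkString).asInstanceOf[StructType] == schema)   // reassembly recovers the schema exactly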