diff options
author | Yin Huai <yhuai@databricks.com> | 2015-02-26 20:46:05 -0800 |
---|---|---|
committer | Reynold Xin <rxin@databricks.com> | 2015-02-26 20:46:05 -0800 |
commit | 5e5ad6558d60cfbf360708584e883e80d363e33e (patch) | |
tree | fbd88acd39179dd93d38f9b1fb48696c3a8aa638 /sql/hive | |
parent | 4ad5153f5449319a7e82c9013ccff4494ab58ef1 (diff) | |
download | spark-5e5ad6558d60cfbf360708584e883e80d363e33e.tar.gz spark-5e5ad6558d60cfbf360708584e883e80d363e33e.tar.bz2 spark-5e5ad6558d60cfbf360708584e883e80d363e33e.zip |
[SPARK-6024][SQL] When a data source table has too many columns, it's schema cannot be stored in metastore.
JIRA: https://issues.apache.org/jira/browse/SPARK-6024
Author: Yin Huai <yhuai@databricks.com>
Closes #4795 from yhuai/wideSchema and squashes the following commits:
4882e6f [Yin Huai] Address comments.
73e71b4 [Yin Huai] Address comments.
143927a [Yin Huai] Simplify code.
cc1d472 [Yin Huai] Make the schema wider.
12bacae [Yin Huai] If the JSON string of a schema is too large, split it before storing it in metastore.
e9b4f70 [Yin Huai] Failed test.
Diffstat (limited to 'sql/hive')
-rw-r--r-- | sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala | 29 | ||||
-rw-r--r-- | sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala | 21 |
2 files changed, 44 insertions, 6 deletions
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala index 8af5a4848f..d3ad364328 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala @@ -69,13 +69,23 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with val table = synchronized { client.getTable(in.database, in.name) } - val schemaString = table.getProperty("spark.sql.sources.schema") val userSpecifiedSchema = - if (schemaString == null) { - None - } else { - Some(DataType.fromJson(schemaString).asInstanceOf[StructType]) + Option(table.getProperty("spark.sql.sources.schema.numParts")).map { numParts => + val parts = (0 until numParts.toInt).map { index => + val part = table.getProperty(s"spark.sql.sources.schema.part.${index}") + if (part == null) { + throw new AnalysisException( + s"Could not read schema from the metastore because it is corrupted " + + s"(missing part ${index} of the schema).") + } + + part + } + // Stick all parts back to a single schema string in the JSON representation + // and convert it back to a StructType. + DataType.fromJson(parts.mkString).asInstanceOf[StructType] } + // It does not appear that the ql client for the metastore has a way to enumerate all the // SerDe properties directly... val options = table.getTTable.getSd.getSerdeInfo.getParameters.toMap @@ -119,7 +129,14 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with tbl.setProperty("spark.sql.sources.provider", provider) if (userSpecifiedSchema.isDefined) { - tbl.setProperty("spark.sql.sources.schema", userSpecifiedSchema.get.json) + val threshold = hive.conf.schemaStringLengthThreshold + val schemaJsonString = userSpecifiedSchema.get.json + // Split the JSON string. + val parts = schemaJsonString.grouped(threshold).toSeq + tbl.setProperty("spark.sql.sources.schema.numParts", parts.size.toString) + parts.zipWithIndex.foreach { case (part, index) => + tbl.setProperty(s"spark.sql.sources.schema.part.${index}", part) + } } options.foreach { case (key, value) => tbl.setSerdeParam(key, value) } diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala index 0bd82773f3..00306f1cd7 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala @@ -591,4 +591,25 @@ class MetastoreDataSourcesSuite extends QueryTest with BeforeAndAfterEach { setConf(SQLConf.PARQUET_USE_DATA_SOURCE_API, originalUseDataSource) } } + + test("SPARK-6024 wide schema support") { + // We will need 80 splits for this schema if the threshold is 4000. + val schema = StructType((1 to 5000).map(i => StructField(s"c_${i}", StringType, true))) + assert( + schema.json.size > conf.schemaStringLengthThreshold, + "To correctly test the fix of SPARK-6024, the value of " + + s"spark.sql.sources.schemaStringLengthThreshold needs to be less than ${schema.json.size}") + // Manually create a metastore data source table. + catalog.createDataSourceTable( + tableName = "wide_schema", + userSpecifiedSchema = Some(schema), + provider = "json", + options = Map("path" -> "just a dummy path"), + isExternal = false) + + invalidateTable("wide_schema") + + val actualSchema = table("wide_schema").schema + assert(schema === actualSchema) + } } |