From 5e5ad6558d60cfbf360708584e883e80d363e33e Mon Sep 17 00:00:00 2001 From: Yin Huai Date: Thu, 26 Feb 2015 20:46:05 -0800 Subject: [SPARK-6024][SQL] When a data source table has too many columns, it's schema cannot be stored in metastore. JIRA: https://issues.apache.org/jira/browse/SPARK-6024 Author: Yin Huai Closes #4795 from yhuai/wideSchema and squashes the following commits: 4882e6f [Yin Huai] Address comments. 73e71b4 [Yin Huai] Address comments. 143927a [Yin Huai] Simplify code. cc1d472 [Yin Huai] Make the schema wider. 12bacae [Yin Huai] If the JSON string of a schema is too large, split it before storing it in metastore. e9b4f70 [Yin Huai] Failed test. --- .../main/scala/org/apache/spark/sql/SQLConf.scala | 10 ++++++++ .../spark/sql/hive/HiveMetastoreCatalog.scala | 29 +++++++++++++++++----- .../spark/sql/hive/MetastoreDataSourcesSuite.scala | 21 ++++++++++++++++ 3 files changed, 54 insertions(+), 6 deletions(-) (limited to 'sql') diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala index a08c0f5ce3..4815620c6f 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala @@ -51,6 +51,11 @@ private[spark] object SQLConf { // This is used to set the default data source val DEFAULT_DATA_SOURCE_NAME = "spark.sql.sources.default" + // This is used to control the when we will split a schema's JSON string to multiple pieces + // in order to fit the JSON string in metastore's table property (by default, the value has + // a length restriction of 4000 characters). We will split the JSON string of a schema + // to its length exceeds the threshold. + val SCHEMA_STRING_LENGTH_THRESHOLD = "spark.sql.sources.schemaStringLengthThreshold" // Whether to perform eager analysis when constructing a dataframe. // Set to false when debugging requires the ability to look at invalid query plans. @@ -177,6 +182,11 @@ private[sql] class SQLConf extends Serializable { private[spark] def defaultDataSourceName: String = getConf(DEFAULT_DATA_SOURCE_NAME, "org.apache.spark.sql.parquet") + // Do not use a value larger than 4000 as the default value of this property. + // See the comments of SCHEMA_STRING_LENGTH_THRESHOLD above for more information. + private[spark] def schemaStringLengthThreshold: Int = + getConf(SCHEMA_STRING_LENGTH_THRESHOLD, "4000").toInt + private[spark] def dataFrameEagerAnalysis: Boolean = getConf(DATAFRAME_EAGER_ANALYSIS, "true").toBoolean diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala index 8af5a4848f..d3ad364328 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala @@ -69,13 +69,23 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with val table = synchronized { client.getTable(in.database, in.name) } - val schemaString = table.getProperty("spark.sql.sources.schema") val userSpecifiedSchema = - if (schemaString == null) { - None - } else { - Some(DataType.fromJson(schemaString).asInstanceOf[StructType]) + Option(table.getProperty("spark.sql.sources.schema.numParts")).map { numParts => + val parts = (0 until numParts.toInt).map { index => + val part = table.getProperty(s"spark.sql.sources.schema.part.${index}") + if (part == null) { + throw new AnalysisException( + s"Could not read schema from the metastore because it is corrupted " + + s"(missing part ${index} of the schema).") + } + + part + } + // Stick all parts back to a single schema string in the JSON representation + // and convert it back to a StructType. + DataType.fromJson(parts.mkString).asInstanceOf[StructType] } + // It does not appear that the ql client for the metastore has a way to enumerate all the // SerDe properties directly... val options = table.getTTable.getSd.getSerdeInfo.getParameters.toMap @@ -119,7 +129,14 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with tbl.setProperty("spark.sql.sources.provider", provider) if (userSpecifiedSchema.isDefined) { - tbl.setProperty("spark.sql.sources.schema", userSpecifiedSchema.get.json) + val threshold = hive.conf.schemaStringLengthThreshold + val schemaJsonString = userSpecifiedSchema.get.json + // Split the JSON string. + val parts = schemaJsonString.grouped(threshold).toSeq + tbl.setProperty("spark.sql.sources.schema.numParts", parts.size.toString) + parts.zipWithIndex.foreach { case (part, index) => + tbl.setProperty(s"spark.sql.sources.schema.part.${index}", part) + } } options.foreach { case (key, value) => tbl.setSerdeParam(key, value) } diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala index 0bd82773f3..00306f1cd7 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala @@ -591,4 +591,25 @@ class MetastoreDataSourcesSuite extends QueryTest with BeforeAndAfterEach { setConf(SQLConf.PARQUET_USE_DATA_SOURCE_API, originalUseDataSource) } } + + test("SPARK-6024 wide schema support") { + // We will need 80 splits for this schema if the threshold is 4000. + val schema = StructType((1 to 5000).map(i => StructField(s"c_${i}", StringType, true))) + assert( + schema.json.size > conf.schemaStringLengthThreshold, + "To correctly test the fix of SPARK-6024, the value of " + + s"spark.sql.sources.schemaStringLengthThreshold needs to be less than ${schema.json.size}") + // Manually create a metastore data source table. + catalog.createDataSourceTable( + tableName = "wide_schema", + userSpecifiedSchema = Some(schema), + provider = "json", + options = Map("path" -> "just a dummy path"), + isExternal = false) + + invalidateTable("wide_schema") + + val actualSchema = table("wide_schema").schema + assert(schema === actualSchema) + } } -- cgit v1.2.3