author    Yin Huai <yhuai@databricks.com>    2015-02-17 15:47:59 -0800
committer Michael Armbrust <michael@databricks.com>    2015-02-17 15:47:59 -0800
commit    117121a4ecaadda156a82255333670775e7727db (patch)
tree      d20cb0ce4d7e58623a5526a6e5fd749cdb1ee95a /sql/core
parent    4d4cc760fa9687ce563320094557ef9144488676 (diff)
download  spark-117121a4ecaadda156a82255333670775e7727db.tar.gz
          spark-117121a4ecaadda156a82255333670775e7727db.tar.bz2
          spark-117121a4ecaadda156a82255333670775e7727db.zip
[SPARK-5852][SQL]Fail to convert a newly created empty metastore parquet table to a data source parquet table.
The problem is that after we create an empty Hive metastore parquet table (e.g. `CREATE TABLE test (a int) STORED AS PARQUET`), Hive creates an empty directory for us, which causes our data source `ParquetRelation2` to fail to get the schema of the table. See the JIRA for the steps to reproduce the bug and the resulting exception. This PR is based on #4562 from chenghao-intel.

JIRA: https://issues.apache.org/jira/browse/SPARK-5852

Author: Yin Huai <yhuai@databricks.com>
Author: Cheng Hao <hao.cheng@intel.com>

Closes #4655 from yhuai/CTASParquet and squashes the following commits:

b8b3450 [Yin Huai] Update tests.
2ac94f7 [Yin Huai] Update tests.
3db3d20 [Yin Huai] Minor update.
d7e2308 [Yin Huai] Revert changes in HiveMetastoreCatalog.scala.
36978d1 [Cheng Hao] Update the code as feedback
a04930b [Cheng Hao] fix bug of scan an empty parquet based table
442ffe0 [Cheng Hao] passdown the schema for Parquet File in HiveContext
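A minimal sketch of the reproduction described above, assuming a Spark 1.x build with Hive support and the default metastore Parquet conversion enabled; the object and application names are illustrative and not part of the patch:

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.hive.HiveContext

object EmptyParquetTableRepro {
  def main(args: Array[String]): Unit = {
    // Assumption: local mode with an embedded metastore is enough to reproduce.
    val sc = new SparkContext(new SparkConf().setMaster("local[2]").setAppName("SPARK-5852 repro"))
    val hiveContext = new HiveContext(sc)

    // Hive creates an empty directory for the new table, so there are no
    // Parquet footers from which a schema could be read yet.
    hiveContext.sql("CREATE TABLE test (a INT) STORED AS PARQUET")

    // Before this patch, converting the metastore table to ParquetRelation2
    // failed here, because readSchema() had no files to infer a schema from.
    hiveContext.sql("SELECT * FROM test").collect()

    sc.stop()
  }
}
```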
Diffstat (limited to 'sql/core')
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala  18
1 file changed, 14 insertions(+), 4 deletions(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
index 95bea92011..16b771344b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
@@ -287,7 +287,16 @@ private[sql] case class ParquetRelation2(
}
}
- parquetSchema = maybeSchema.getOrElse(readSchema())
+ // To get the schema, we first try the schema defined in maybeSchema.
+ // If maybeSchema is not defined, we try to read the schema from existing parquet data
+ // (through readSchema). If no data exists, we fall back to the schema defined in
+ // maybeMetastoreSchema (passed in through the options of the data source).
+ // Finally, if we still cannot determine a schema, we throw an error.
+ parquetSchema =
+ maybeSchema
+ .orElse(readSchema())
+ .orElse(maybeMetastoreSchema)
+ .getOrElse(sys.error("Failed to get the schema."))
partitionKeysIncludedInParquetSchema =
isPartitioned &&
@@ -308,7 +317,7 @@ private[sql] case class ParquetRelation2(
}
}
- private def readSchema(): StructType = {
+ private def readSchema(): Option[StructType] = {
// Sees which file(s) we need to touch in order to figure out the schema.
val filesToTouch =
// Always tries the summary files first if users don't require a merged schema. In this case,
@@ -611,7 +620,8 @@ private[sql] object ParquetRelation2 {
// internally.
private[sql] val METASTORE_SCHEMA = "metastoreSchema"
- private[parquet] def readSchema(footers: Seq[Footer], sqlContext: SQLContext): StructType = {
+ private[parquet] def readSchema(
+ footers: Seq[Footer], sqlContext: SQLContext): Option[StructType] = {
footers.map { footer =>
val metadata = footer.getParquetMetadata.getFileMetaData
val parquetSchema = metadata.getSchema
@@ -630,7 +640,7 @@ private[sql] object ParquetRelation2 {
sqlContext.conf.isParquetBinaryAsString,
sqlContext.conf.isParquetINT96AsTimestamp))
}
- }.reduce { (left, right) =>
+ }.reduceOption { (left, right) =>
try left.merge(right) catch { case e: Throwable =>
throw new SparkException(s"Failed to merge incompatible schemas $left and $right", e)
}
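For reference, a self-contained sketch of the Option-based fallback pattern this patch adopts, using a plain String as a stand-in for StructType; the parameter names mirror, but are not, the actual ParquetRelation2 members. It also shows why reduceOption matters: reduce throws on an empty collection, while reduceOption returns None, which is what lets readSchema() report "no data" for an empty table directory instead of failing.

```scala
object SchemaFallbackSketch {
  type Schema = String // stand-in for org.apache.spark.sql.types.StructType

  def resolveSchema(
      maybeSchema: Option[Schema],          // schema supplied directly by the caller
      readSchema: () => Option[Schema],     // schema read from existing Parquet footers
      maybeMetastoreSchema: Option[Schema]  // schema passed in via the data source options
    ): Schema =
    maybeSchema
      .orElse(readSchema())           // None when the table directory holds no Parquet files
      .orElse(maybeMetastoreSchema)   // fall back to the metastore-provided schema
      .getOrElse(sys.error("Failed to get the schema."))

  def main(args: Array[String]): Unit = {
    // reduceOption yields None on an empty collection instead of throwing.
    def readFromFooters(footers: Seq[Schema]): Option[Schema] =
      footers.reduceOption((left, right) => s"merge($left, $right)")

    // Empty table plus a metastore schema: the metastore schema wins.
    println(resolveSchema(None, () => readFromFooters(Seq.empty), Some("a: int")))
  }
}
```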