aboutsummaryrefslogtreecommitdiff
path: root/sql
diff options
context:
space:
mode:
authorLiang-Chi Hsieh <viirya@gmail.com>2015-04-05 00:20:43 +0800
committerCheng Lian <lian@databricks.com>2015-04-05 00:20:43 +0800
commit7bca62f79056e592cf07b49d8b8d04c59dea25fc (patch)
treedd8775fb9a5f8cf7f4d7784fbdfcd4d509888944 /sql
parentda25c86d64ff9ce80f88186ba083f6c21dd9a568 (diff)
downloadspark-7bca62f79056e592cf07b49d8b8d04c59dea25fc.tar.gz
spark-7bca62f79056e592cf07b49d8b8d04c59dea25fc.tar.bz2
spark-7bca62f79056e592cf07b49d8b8d04c59dea25fc.zip
[SPARK-6607][SQL] Check invalid characters for Parquet schema and show error messages
'(' and ')' are special characters used in Parquet schema for type annotation. When we run an aggregation query, we will obtain attribute name such as "MAX(a)". If we directly store the generated DataFrame as Parquet file, it causes failure when reading and parsing the stored schema string. Several methods can be adopted to solve this. This pr uses a simplest one to just replace attribute names before generating Parquet schema based on these attributes. Another possible method might be modifying all aggregation expression names from "func(column)" to "func[column]". Author: Liang-Chi Hsieh <viirya@gmail.com> Closes #5263 from viirya/parquet_aggregation_name and squashes the following commits: 2d70542 [Liang-Chi Hsieh] Address comment. 463dff4 [Liang-Chi Hsieh] Instead of replacing special chars, showing error message to user to suggest using Alias. 1de001d [Liang-Chi Hsieh] Replace special characters '(' and ')' of Parquet schema.
Diffstat (limited to 'sql')
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTypes.scala14
-rw-r--r--sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala16
2 files changed, 30 insertions, 0 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTypes.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTypes.scala
index da668f0686..60e1bec4db 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTypes.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTypes.scala
@@ -390,6 +390,7 @@ private[parquet] object ParquetTypesConverter extends Logging {
def convertFromAttributes(attributes: Seq[Attribute],
toThriftSchemaNames: Boolean = false): MessageType = {
+ checkSpecialCharacters(attributes)
val fields = attributes.map(
attribute =>
fromDataType(attribute.dataType, attribute.name, attribute.nullable,
@@ -404,7 +405,20 @@ private[parquet] object ParquetTypesConverter extends Logging {
}
}
+ private def checkSpecialCharacters(schema: Seq[Attribute]) = {
+ // ,;{}()\n\t= and space character are special characters in Parquet schema
+ schema.map(_.name).foreach { name =>
+ if (name.matches(".*[ ,;{}()\n\t=].*")) {
+ sys.error(
+ s"""Attribute name "$name" contains invalid character(s) among " ,;{}()\n\t=".
+ |Please use alias to rename it.
+ """.stripMargin.split("\n").mkString(" "))
+ }
+ }
+ }
+
def convertToString(schema: Seq[Attribute]): String = {
+ checkSpecialCharacters(schema)
StructType.fromAttributes(schema).json
}
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala
index 1319c81dfc..5f71e1bbc2 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala
@@ -688,6 +688,22 @@ class ParquetDataSourceOnSourceSuite extends ParquetSourceSuiteBase {
sql("DROP TABLE alwaysNullable")
}
+
+ test("Aggregation attribute names can't contain special chars \" ,;{}()\\n\\t=\"") {
+ val tempDir = Utils.createTempDir()
+ val filePath = new File(tempDir, "testParquet").getCanonicalPath
+ val filePath2 = new File(tempDir, "testParquet2").getCanonicalPath
+
+ val df = Seq(1,2,3).map(i => (i, i.toString)).toDF("int", "str")
+ val df2 = df.as('x).join(df.as('y), $"x.str" === $"y.str").groupBy("y.str").max("y.int")
+ intercept[RuntimeException](df2.saveAsParquetFile(filePath))
+
+ val df3 = df2.toDF("str", "max_int")
+ df3.saveAsParquetFile(filePath2)
+ val df4 = parquetFile(filePath2)
+ checkAnswer(df4, Row("1", 1) :: Row("2", 2) :: Row("3", 3) :: Nil)
+ assert(df4.columns === Array("str", "max_int"))
+ }
}
class ParquetDataSourceOffSourceSuite extends ParquetSourceSuiteBase {