about summary refs log tree commit diff
diff options
context:
space:
mode:
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTypes.scala | 14
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala | 16
2 files changed, 30 insertions, 0 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTypes.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTypes.scala
index da668f0686..60e1bec4db 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTypes.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTypes.scala
@@ -390,6 +390,7 @@ private[parquet] object ParquetTypesConverter extends Logging {
def convertFromAttributes(attributes: Seq[Attribute],
toThriftSchemaNames: Boolean = false): MessageType = {
+ checkSpecialCharacters(attributes)
val fields = attributes.map(
attribute =>
fromDataType(attribute.dataType, attribute.name, attribute.nullable,
@@ -404,7 +405,20 @@ private[parquet] object ParquetTypesConverter extends Logging {
}
}
+ private def checkSpecialCharacters(schema: Seq[Attribute]) = {
+ // ,;{}()\n\t= and space character are special characters in Parquet schema
+ schema.map(_.name).foreach { name =>
+ if (name.matches(".*[ ,;{}()\n\t=].*")) {
+ sys.error(
+ s"""Attribute name "$name" contains invalid character(s) among " ,;{}()\n\t=".
+ |Please use alias to rename it.
+ """.stripMargin.split("\n").mkString(" "))
+ }
+ }
+ }
+
def convertToString(schema: Seq[Attribute]): String = {
+ checkSpecialCharacters(schema)
StructType.fromAttributes(schema).json
}
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala
index 1319c81dfc..5f71e1bbc2 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala
@@ -688,6 +688,22 @@ class ParquetDataSourceOnSourceSuite extends ParquetSourceSuiteBase {
sql("DROP TABLE alwaysNullable")
}
+
+ test("Aggregation attribute names can't contain special chars \" ,;{}()\\n\\t=\"") {
+ val tempDir = Utils.createTempDir()
+ val filePath = new File(tempDir, "testParquet").getCanonicalPath
+ val filePath2 = new File(tempDir, "testParquet2").getCanonicalPath
+
+ val df = Seq(1,2,3).map(i => (i, i.toString)).toDF("int", "str")
+ val df2 = df.as('x).join(df.as('y), $"x.str" === $"y.str").groupBy("y.str").max("y.int")
+ intercept[RuntimeException](df2.saveAsParquetFile(filePath))
+
+ val df3 = df2.toDF("str", "max_int")
+ df3.saveAsParquetFile(filePath2)
+ val df4 = parquetFile(filePath2)
+ checkAnswer(df4, Row("1", 1) :: Row("2", 2) :: Row("3", 3) :: Nil)
+ assert(df4.columns === Array("str", "max_int"))
+ }
}
class ParquetDataSourceOffSourceSuite extends ParquetSourceSuiteBase {