author     Juliusz Sompolski <julek@databricks.com>   2017-04-21 09:49:42 +0800
committer  Wenchen Fan <wenchen@databricks.com>       2017-04-21 09:49:42 +0800
commit     0368eb9d86634c83b3140ce3190cb9e0d0b7fd86
tree       5e065ab20c9b78b738d082eedc9d5b287fefa44c
parent     592f5c89349f3c5b6ec0531c6514b8f7d95ad8da
[SPARK-20367] Properly unescape column names of partitioning columns parsed from paths.
## What changes were proposed in this pull request?

When inferring the partitioning schema from paths, the column name in parsePartitionColumn should be unescaped with unescapePathName, just as is already done in, e.g., parsePathFragmentAsSeq.

## How was this patch tested?

Added a test to FileIndexSuite.

Author: Juliusz Sompolski <julek@databricks.com>

Closes #17703 from juliuszsompolski/SPARK-20367.
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala   2
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileIndexSuite.scala     12
2 files changed, 13 insertions(+), 1 deletion(-)
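To make the escaping behaviour concrete, the sketch below round-trips a partition column name containing special characters through Hive-style percent-escaping. It is a simplified, self-contained illustration: `PartitionPathSketch`, its character set, and its `escapePathName`/`unescapePathName` helpers are stand-ins written for this example, not Spark's actual utilities used by PartitioningUtils.

```scala
// Illustrative sketch only: simplified stand-ins for Spark's escapePathName /
// unescapePathName helpers, showing why the column name must be unescaped
// when a partition path fragment like "Column%2F%23%25%27%3F=0" is parsed.
object PartitionPathSketch {

  // Assumed (reduced) set of characters that get percent-escaped in a path segment.
  private val charsToEscape: Set[Char] = Set('"', '#', '%', '\'', '*', '/', ':', '=', '?', '\\')

  def escapePathName(name: String): String =
    name.flatMap { c =>
      if (charsToEscape.contains(c)) f"%%${c.toInt}%02X" else c.toString
    }

  // Assumes well-formed %XX sequences; real code would validate them.
  def unescapePathName(name: String): String = {
    val sb = new StringBuilder
    var i = 0
    while (i < name.length) {
      if (name(i) == '%' && i + 2 < name.length) {
        sb.append(Integer.parseInt(name.substring(i + 1, i + 3), 16).toChar)
        i += 3
      } else {
        sb.append(name(i))
        i += 1
      }
    }
    sb.toString
  }

  def main(args: Array[String]): Unit = {
    val columnName = "Column/#%'?"
    // Writing a partitioned dataset escapes the column name into the directory name.
    val columnSpec = s"${escapePathName(columnName)}=0"
    println(columnSpec)                    // Column%2F%23%25%27%3F=0

    // Before the fix, only the value side was unescaped while parsing the spec,
    // so the inferred schema carried the escaped column name:
    val equalSignIndex = columnSpec.indexOf('=')
    val rawName = columnSpec.take(equalSignIndex)
    println(rawName)                       // Column%2F%23%25%27%3F

    // With the fix, the name round-trips back to the original:
    println(unescapePathName(rawName))     // Column/#%'?
  }
}
```

The `write.partitionBy` / `spark.read.parquet` round trip in the new FileIndexSuite test below exercises exactly this path: the directory name is written in escaped form (roughly `Column%2F%23%25%27%3F=0`), and reading it back must recover `Column/#%'?` as the column name.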
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala
index c3583209ef..2d70172487 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala
@@ -243,7 +243,7 @@ object PartitioningUtils {
     if (equalSignIndex == -1) {
       None
     } else {
-      val columnName = columnSpec.take(equalSignIndex)
+      val columnName = unescapePathName(columnSpec.take(equalSignIndex))
       assert(columnName.nonEmpty, s"Empty partition column name in '$columnSpec'")

       val rawColumnValue = columnSpec.drop(equalSignIndex + 1)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileIndexSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileIndexSuite.scala
index a9511cbd9e..b4616826e4 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileIndexSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileIndexSuite.scala
@@ -27,6 +27,7 @@ import org.apache.hadoop.fs.{FileStatus, Path, RawLocalFileSystem}

 import org.apache.spark.metrics.source.HiveCatalogMetrics
 import org.apache.spark.sql.catalyst.util._
+import org.apache.spark.sql.functions.col
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.test.SharedSQLContext
 import org.apache.spark.util.{KnownSizeEstimation, SizeEstimator}
@@ -236,6 +237,17 @@ class FileIndexSuite extends SharedSQLContext {
     val fileStatusCache = FileStatusCache.getOrCreate(spark)
     fileStatusCache.putLeafFiles(new Path("/tmp", "abc"), files.toArray)
   }
+
+  test("SPARK-20367 - properly unescape column names in inferPartitioning") {
+    withTempPath { path =>
+      val colToUnescape = "Column/#%'?"
+      spark
+        .range(1)
+        .select(col("id").as(colToUnescape), col("id"))
+        .write.partitionBy(colToUnescape).parquet(path.getAbsolutePath)
+      assert(spark.read.parquet(path.getAbsolutePath).schema.exists(_.name == colToUnescape))
+    }
+  }
 }

 class FakeParentPathFileSystem extends RawLocalFileSystem {