author    Cheng Lian <lian@databricks.com>    2015-05-27 10:09:12 -0700
committer Yin Huai <yhuai@databricks.com>     2015-05-27 10:09:12 -0700
commit    15459db4f6867e95076cf53fade2fca833c4cf4e (patch)
tree      eeb3b1e7f4c732a5fca3df5a2d31a989883005dc /sql/core
parent    ff0ddff46935ae3d036b7dbc437fff8a6c19d6a4 (diff)
[SPARK-7847] [SQL] Fixes dynamic partition directory escaping
Please refer to [SPARK-7847] [1] for details.

[1]: https://issues.apache.org/jira/browse/SPARK-7847

Author: Cheng Lian <lian@databricks.com>

Closes #6389 from liancheng/spark-7847 and squashes the following commits:

935c652 [Cheng Lian] Adds test case for writing various data types as dynamic partition value
f4fc398 [Cheng Lian] Converts partition columns to Scala type when writing dynamic partitions
d0aeca0 [Cheng Lian] Fixes dynamic partition directory escaping
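The gist of the fix, for readers not following the full diff: dynamic partition values are percent-encoded before they become directory names, and decoded again during partition discovery, so characters such as '/' or '=' in a value cannot corrupt the path. Below is a minimal standalone sketch of that Hive-style escaping scheme. escapePathName and unescapePathName mirror the helpers this patch adds to PartitioningUtils; the PathEscaping wrapper object, the escape set written without the Windows-only extras, and the sample value are illustrative assumptions, not part of the patch.

// Standalone sketch (Scala) of the Hive-style partition path escaping used above.
object PathEscaping {
  // Characters that must be percent-encoded in partition directory names:
  // ASCII control characters plus the specials Hive's FileUtils treats as unsafe.
  private val charToEscape: java.util.BitSet = {
    val bitSet = new java.util.BitSet(128)
    (0x01 to 0x1F).foreach(i => bitSet.set(i))
    "\"#%'*/:=?\\{[]^".foreach(c => bitSet.set(c.toInt))
    bitSet.set(0x7F)
    bitSet
  }

  private def needsEscaping(c: Char): Boolean =
    c < charToEscape.size() && charToEscape.get(c)

  // Replace each special character with "%xx" (two lowercase hex digits).
  def escapePathName(path: String): String =
    path.map(c => if (needsEscaping(c)) f"%%${c.toInt}%02x" else c.toString).mkString

  // Reverse of escapePathName: decode every valid "%xx" back to its character.
  def unescapePathName(path: String): String = {
    val sb = new StringBuilder
    var i = 0
    while (i < path.length) {
      val c = path.charAt(i)
      if (c == '%' && i + 2 < path.length) {
        val code = try Integer.parseInt(path.substring(i + 1, i + 3), 16) catch {
          case _: NumberFormatException => -1
        }
        if (code >= 0) { sb.append(code.toChar); i += 3 }
        else { sb.append(c); i += 1 }
      } else {
        sb.append(c); i += 1
      }
    }
    sb.toString()
  }

  def main(args: Array[String]): Unit = {
    // A partition value containing '/' and '=' must not split the directory path.
    val value = "2015/05/27=a"
    val escaped = escapePathName(value)
    println(s"s=$escaped")                      // prints s=2015%2f05%2f27%3da
    assert(unescapePathName(escaped) == value)  // round trip is lossless
  }
}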
Diffstat (limited to 'sql/core')
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala                       22
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/sources/PartitioningUtils.scala                76
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala                         57
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetPartitionDiscoverySuite.scala   57
4 files changed, 152 insertions(+), 60 deletions(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
index cb1e60883d..8b3e1b2b59 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
@@ -17,6 +17,7 @@
package org.apache.spark.sql.parquet
+import java.net.URI
import java.util.{List => JList}
import scala.collection.JavaConversions._
@@ -282,21 +283,28 @@ private[sql] class ParquetRelation2(
val cacheMetadata = useMetadataCache
@transient val cachedStatuses = inputFiles.map { f =>
- // In order to encode the authority of a Path containing special characters such as /,
- // we need to use the string returned by the URI of the path to create a new Path.
- val pathWithAuthority = new Path(f.getPath.toUri.toString)
-
+ // In order to encode the authority of a Path containing special characters such as '/'
+ // (which does happen in some S3N credentials), we need to use the string returned by the
+ // URI of the path to create a new Path.
+ val pathWithEscapedAuthority = escapePathUserInfo(f.getPath)
new FileStatus(
f.getLen, f.isDir, f.getReplication, f.getBlockSize, f.getModificationTime,
- f.getAccessTime, f.getPermission, f.getOwner, f.getGroup, pathWithAuthority)
+ f.getAccessTime, f.getPermission, f.getOwner, f.getGroup, pathWithEscapedAuthority)
}.toSeq
@transient val cachedFooters = footers.map { f =>
// In order to encode the authority of a Path containing special characters such as /,
// we need to use the string returned by the URI of the path to create a new Path.
- new Footer(new Path(f.getFile.toUri.toString), f.getParquetMetadata)
+ new Footer(escapePathUserInfo(f.getFile), f.getParquetMetadata)
}.toSeq
+ private def escapePathUserInfo(path: Path): Path = {
+ val uri = path.toUri
+ new Path(new URI(
+ uri.getScheme, uri.getRawUserInfo, uri.getHost, uri.getPort, uri.getPath,
+ uri.getQuery, uri.getFragment))
+ }
+
// Overridden so we can inject our own cached files statuses.
override def getPartitions: Array[SparkPartition] = {
val inputFormat = if (cacheMetadata) {
@@ -377,7 +385,7 @@ private[sql] class ParquetRelation2(
.orElse(readSchema())
.orElse(maybeMetastoreSchema)
.getOrElse(sys.error("Failed to get the schema."))
-
+
// If this Parquet relation is converted from a Hive Metastore table, must reconcile case
// case insensitivity issue and possible schema mismatch (probably caused by schema
// evolution).
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/PartitioningUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/PartitioningUtils.scala
index e0ead23d78..dafdf0f8b4 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/sources/PartitioningUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/PartitioningUtils.scala
@@ -24,6 +24,7 @@ import scala.collection.mutable.ArrayBuffer
import scala.util.Try
import org.apache.hadoop.fs.Path
+import org.apache.hadoop.util.Shell
import org.apache.spark.sql.Row
import org.apache.spark.sql.catalyst.expressions.{Cast, Literal}
@@ -221,7 +222,7 @@ private[sql] object PartitioningUtils {
// Then falls back to string
.getOrElse {
if (raw == defaultPartitionName) Literal.create(null, NullType)
- else Literal.create(raw, StringType)
+ else Literal.create(unescapePathName(raw), StringType)
}
}
@@ -243,4 +244,77 @@ private[sql] object PartitioningUtils {
Literal.create(Cast(l, desiredType).eval(), desiredType)
}
}
+
+ //////////////////////////////////////////////////////////////////////////////////////////////////
+ // The following string escaping code is mainly copied from Hive (o.a.h.h.common.FileUtils).
+ //////////////////////////////////////////////////////////////////////////////////////////////////
+
+ val charToEscape = {
+ val bitSet = new java.util.BitSet(128)
+
+ /**
+ * ASCII 01-1F are HTTP control characters that need to be escaped.
+ * \u000A and \u000D are \n and \r, respectively.
+ */
+ val clist = Array(
+ '\u0001', '\u0002', '\u0003', '\u0004', '\u0005', '\u0006', '\u0007', '\u0008', '\u0009',
+ '\n', '\u000B', '\u000C', '\r', '\u000E', '\u000F', '\u0010', '\u0011', '\u0012', '\u0013',
+ '\u0014', '\u0015', '\u0016', '\u0017', '\u0018', '\u0019', '\u001A', '\u001B', '\u001C',
+ '\u001D', '\u001E', '\u001F', '"', '#', '%', '\'', '*', '/', ':', '=', '?', '\\', '\u007F',
+ '{', '[', ']', '^')
+
+ clist.foreach(bitSet.set(_))
+
+ if (Shell.WINDOWS) {
+ Array(' ', '<', '>', '|').foreach(bitSet.set(_))
+ }
+
+ bitSet
+ }
+
+ def needsEscaping(c: Char): Boolean = {
+ c >= 0 && c < charToEscape.size() && charToEscape.get(c)
+ }
+
+ def escapePathName(path: String): String = {
+ val builder = new StringBuilder()
+ path.foreach { c =>
+ if (needsEscaping(c)) {
+ builder.append('%')
+ builder.append(f"${c.asInstanceOf[Int]}%02x")
+ } else {
+ builder.append(c)
+ }
+ }
+
+ builder.toString()
+ }
+
+ def unescapePathName(path: String): String = {
+ val sb = new StringBuilder
+ var i = 0
+
+ while (i < path.length) {
+ val c = path.charAt(i)
+ if (c == '%' && i + 2 < path.length) {
+ val code: Int = try {
+ Integer.valueOf(path.substring(i + 1, i + 3), 16)
+ } catch { case e: Exception =>
+ -1: Integer
+ }
+ if (code >= 0) {
+ sb.append(code.asInstanceOf[Char])
+ i += 3
+ } else {
+ sb.append(c)
+ i += 1
+ }
+ } else {
+ sb.append(c)
+ i += 1
+ }
+ }
+
+ sb.toString()
+ }
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala
index fbd98ef038..3132067d56 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala
@@ -24,7 +24,6 @@ import scala.collection.mutable
import org.apache.hadoop.fs.Path
import org.apache.hadoop.mapreduce._
import org.apache.hadoop.mapreduce.lib.output.{FileOutputCommitter => MapReduceFileOutputCommitter, FileOutputFormat}
-import org.apache.hadoop.util.Shell
import parquet.hadoop.util.ContextUtil
import org.apache.spark._
@@ -35,7 +34,8 @@ import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.codegen.GenerateProjection
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.RunnableCommand
-import org.apache.spark.sql.{SQLConf, DataFrame, SQLContext, SaveMode}
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.{DataFrame, SQLConf, SQLContext, SaveMode}
private[sql] case class InsertIntoDataSource(
logicalRelation: LogicalRelation,
@@ -208,9 +208,11 @@ private[sql] case class InsertIntoHadoopFsRelation(
writerContainer.outputWriterForRow(partitionPart).write(convertedDataPart)
}
} else {
+ val partitionSchema = StructType.fromAttributes(partitionOutput)
+ val converter = CatalystTypeConverters.createToScalaConverter(partitionSchema)
while (iterator.hasNext) {
val row = iterator.next()
- val partitionPart = partitionProj(row)
+ val partitionPart = converter(partitionProj(row)).asInstanceOf[Row]
val dataPart = dataProj(row)
writerContainer.outputWriterForRow(partitionPart).write(dataPart)
}
@@ -416,7 +418,7 @@ private[sql] class DynamicPartitionWriterContainer(
val valueString = if (string == null || string.isEmpty) {
defaultPartitionName
} else {
- DynamicPartitionWriterContainer.escapePathName(string)
+ PartitioningUtils.escapePathName(string)
}
s"/$col=$valueString"
}.mkString.stripPrefix(Path.SEPARATOR)
@@ -448,50 +450,3 @@ private[sql] class DynamicPartitionWriterContainer(
}
}
}
-
-private[sql] object DynamicPartitionWriterContainer {
- //////////////////////////////////////////////////////////////////////////////////////////////////
- // The following string escaping code is mainly copied from Hive (o.a.h.h.common.FileUtils).
- //////////////////////////////////////////////////////////////////////////////////////////////////
-
- val charToEscape = {
- val bitSet = new java.util.BitSet(128)
-
- /**
- * ASCII 01-1F are HTTP control characters that need to be escaped.
- * \u000A and \u000D are \n and \r, respectively.
- */
- val clist = Array(
- '\u0001', '\u0002', '\u0003', '\u0004', '\u0005', '\u0006', '\u0007', '\u0008', '\u0009',
- '\n', '\u000B', '\u000C', '\r', '\u000E', '\u000F', '\u0010', '\u0011', '\u0012', '\u0013',
- '\u0014', '\u0015', '\u0016', '\u0017', '\u0018', '\u0019', '\u001A', '\u001B', '\u001C',
- '\u001D', '\u001E', '\u001F', '"', '#', '%', '\'', '*', '/', ':', '=', '?', '\\', '\u007F',
- '{', '[', ']', '^')
-
- clist.foreach(bitSet.set(_))
-
- if (Shell.WINDOWS) {
- Array(' ', '<', '>', '|').foreach(bitSet.set(_))
- }
-
- bitSet
- }
-
- def needsEscaping(c: Char): Boolean = {
- c >= 0 && c < charToEscape.size() && charToEscape.get(c)
- }
-
- def escapePathName(path: String): String = {
- val builder = new StringBuilder()
- path.foreach { c =>
- if (DynamicPartitionWriterContainer.needsEscaping(c)) {
- builder.append('%')
- builder.append(f"${c.asInstanceOf[Int]}%02x")
- } else {
- builder.append(c)
- }
- }
-
- builder.toString()
- }
-}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetPartitionDiscoverySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetPartitionDiscoverySuite.scala
index 90d4528efc..f231589e96 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetPartitionDiscoverySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetPartitionDiscoverySuite.scala
@@ -17,6 +17,8 @@
package org.apache.spark.sql.parquet
import java.io.File
+import java.math.BigInteger
+import java.sql.{Timestamp, Date}
import scala.collection.mutable.ArrayBuffer
@@ -27,7 +29,7 @@ import org.apache.spark.sql.sources.PartitioningUtils._
import org.apache.spark.sql.sources.{LogicalRelation, Partition, PartitionSpec}
import org.apache.spark.sql.test.TestSQLContext
import org.apache.spark.sql.types._
-import org.apache.spark.sql.{QueryTest, Row, SQLContext}
+import org.apache.spark.sql.{Column, QueryTest, Row, SQLContext}
// The data where the partitioning key exists only in the directory structure.
case class ParquetData(intField: Int, stringField: String)
@@ -377,4 +379,57 @@ class ParquetPartitionDiscoverySuite extends QueryTest with ParquetTest {
}
}
}
+
+ test("SPARK-7847: Dynamic partition directory path escaping and unescaping") {
+ withTempPath { dir =>
+ val df = Seq("/", "[]", "?").zipWithIndex.map(_.swap).toDF("i", "s")
+ df.write.format("parquet").partitionBy("s").save(dir.getCanonicalPath)
+ checkAnswer(read.parquet(dir.getCanonicalPath), df.collect())
+ }
+ }
+
+ test("Various partition value types") {
+ val row =
+ Row(
+ 100.toByte,
+ 40000.toShort,
+ Int.MaxValue,
+ Long.MaxValue,
+ 1.5.toFloat,
+ 4.5,
+ new java.math.BigDecimal(new BigInteger("212500"), 5),
+ new java.math.BigDecimal(2.125),
+ java.sql.Date.valueOf("2015-05-23"),
+ new Timestamp(0),
+ "This is a string, /[]?=:",
+ "This is not a partition column")
+
+ // BooleanType is not supported yet
+ val partitionColumnTypes =
+ Seq(
+ ByteType,
+ ShortType,
+ IntegerType,
+ LongType,
+ FloatType,
+ DoubleType,
+ DecimalType(10, 5),
+ DecimalType.Unlimited,
+ DateType,
+ TimestampType,
+ StringType)
+
+ val partitionColumns = partitionColumnTypes.zipWithIndex.map {
+ case (t, index) => StructField(s"p_$index", t)
+ }
+
+ val schema = StructType(partitionColumns :+ StructField(s"i", StringType))
+ val df = createDataFrame(sparkContext.parallelize(row :: Nil), schema)
+
+ withTempPath { dir =>
+ df.write.format("parquet").partitionBy(partitionColumns.map(_.name): _*).save(dir.toString)
+ val fields = schema.map(f => Column(f.name).cast(f.dataType))
+ checkAnswer(read.load(dir.toString).select(fields: _*), row)
+ }
+ }
}