author     Cheng Lian <lian@databricks.com>    2015-05-27 10:09:12 -0700
committer  Yin Huai <yhuai@databricks.com>     2015-05-27 10:09:20 -0700
commit     a25ce91f9685604cfb567a6860182ba467ceed8d
tree       ed10e05333b7b748860eb9adfc5fd350a1536605
parent     90525c9ba1ca8528567ea30e611511251d55f685
[SPARK-7847] [SQL] Fixes dynamic partition directory escaping
Please refer to [SPARK-7847] [1] for details.
[1]: https://issues.apache.org/jira/browse/SPARK-7847
Author: Cheng Lian <lian@databricks.com>
Closes #6389 from liancheng/spark-7847 and squashes the following commits:
935c652 [Cheng Lian] Adds test case for writing various data types as dynamic partition value
f4fc398 [Cheng Lian] Converts partition columns to Scala type when writing dynamic partitions
d0aeca0 [Cheng Lian] Fixes dynamic partition directory escaping
(cherry picked from commit 15459db4f6867e95076cf53fade2fca833c4cf4e)
Signed-off-by: Yin Huai <yhuai@databricks.com>
4 files changed, 152 insertions(+), 60 deletions(-)
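Before the diff, a minimal repro sketch of the bug this commit fixes, mirroring the regression test added below (hypothetical output path; assumes a Spark 1.4-era SQLContext named sqlContext with its implicits in scope):

    import sqlContext.implicits._

    // Partition values containing path-special characters such as '/', '[', ']' and '?'
    // used to be written into partition directory names unescaped.
    val df = Seq((0, "/"), (1, "[]"), (2, "?")).toDF("i", "s")
    df.write.format("parquet").partitionBy("s").save("/tmp/spark-7847")

    // With this fix, the escaped directories round-trip on read:
    sqlContext.read.parquet("/tmp/spark-7847").collect()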
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
index cb1e60883d..8b3e1b2b59 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
@@ -17,6 +17,7 @@
 
 package org.apache.spark.sql.parquet
 
+import java.net.URI
 import java.util.{List => JList}
 
 import scala.collection.JavaConversions._
@@ -282,21 +283,28 @@ private[sql] class ParquetRelation2(
     val cacheMetadata = useMetadataCache
 
     @transient val cachedStatuses = inputFiles.map { f =>
-      // In order to encode the authority of a Path containing special characters such as /,
-      // we need to use the string returned by the URI of the path to create a new Path.
-      val pathWithAuthority = new Path(f.getPath.toUri.toString)
-
+      // In order to encode the authority of a Path containing special characters such as '/'
+      // (which does happen in some S3N credentials), we need to use the string returned by the
+      // URI of the path to create a new Path.
+      val pathWithEscapedAuthority = escapePathUserInfo(f.getPath)
       new FileStatus(
         f.getLen, f.isDir, f.getReplication, f.getBlockSize, f.getModificationTime,
-        f.getAccessTime, f.getPermission, f.getOwner, f.getGroup, pathWithAuthority)
+        f.getAccessTime, f.getPermission, f.getOwner, f.getGroup, pathWithEscapedAuthority)
     }.toSeq
 
     @transient val cachedFooters = footers.map { f =>
       // In order to encode the authority of a Path containing special characters such as /,
       // we need to use the string returned by the URI of the path to create a new Path.
-      new Footer(new Path(f.getFile.toUri.toString), f.getParquetMetadata)
+      new Footer(escapePathUserInfo(f.getFile), f.getParquetMetadata)
     }.toSeq
 
+    private def escapePathUserInfo(path: Path): Path = {
+      val uri = path.toUri
+      new Path(new URI(
+        uri.getScheme, uri.getRawUserInfo, uri.getHost, uri.getPort, uri.getPath,
+        uri.getQuery, uri.getFragment))
+    }
+
     // Overridden so we can inject our own cached files statuses.
     override def getPartitions: Array[SparkPartition] = {
       val inputFormat = if (cacheMetadata) {
@@ -377,7 +385,7 @@ private[sql] class ParquetRelation2(
       .orElse(readSchema())
       .orElse(maybeMetastoreSchema)
       .getOrElse(sys.error("Failed to get the schema."))
-
+
     // If this Parquet relation is converted from a Hive Metastore table, must reconcile case
     // case insensitivity issue and possible schema mismatch (probably caused by schema
     // evolution).
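The escapePathUserInfo helper above rebuilds the URI from its components, using getRawUserInfo rather than getUserInfo. A small sketch of why that distinction matters (illustrative credential values; this is plain java.net.URI behavior):

    import java.net.URI

    // S3N-style credentials embedded in the authority may contain an escaped '/'.
    val uri = new URI("s3n://access%2Fkey:secret@bucket/path")
    uri.getUserInfo     // "access/key:secret"   -- decoded, the escaping is lost
    uri.getRawUserInfo  // "access%2Fkey:secret" -- percent-escapes preserved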
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/PartitioningUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/PartitioningUtils.scala
index e0ead23d78..dafdf0f8b4 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/sources/PartitioningUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/PartitioningUtils.scala
@@ -24,6 +24,7 @@ import scala.collection.mutable.ArrayBuffer
 import scala.util.Try
 
 import org.apache.hadoop.fs.Path
+import org.apache.hadoop.util.Shell
 
 import org.apache.spark.sql.Row
 import org.apache.spark.sql.catalyst.expressions.{Cast, Literal}
@@ -221,7 +222,7 @@ private[sql] object PartitioningUtils {
       // Then falls back to string
       .getOrElse {
         if (raw == defaultPartitionName) Literal.create(null, NullType)
-        else Literal.create(raw, StringType)
+        else Literal.create(unescapePathName(raw), StringType)
       }
   }
 
@@ -243,4 +244,77 @@ private[sql] object PartitioningUtils {
       Literal.create(Cast(l, desiredType).eval(), desiredType)
     }
   }
+
+  //////////////////////////////////////////////////////////////////////////////////////////////////
+  // The following string escaping code is mainly copied from Hive (o.a.h.h.common.FileUtils).
+  //////////////////////////////////////////////////////////////////////////////////////////////////
+
+  val charToEscape = {
+    val bitSet = new java.util.BitSet(128)
+
+    /**
+     * ASCII 01-1F are HTTP control characters that need to be escaped.
+     * \u000A and \u000D are \n and \r, respectively.
+     */
+    val clist = Array(
+      '\u0001', '\u0002', '\u0003', '\u0004', '\u0005', '\u0006', '\u0007', '\u0008', '\u0009',
+      '\n', '\u000B', '\u000C', '\r', '\u000E', '\u000F', '\u0010', '\u0011', '\u0012', '\u0013',
+      '\u0014', '\u0015', '\u0016', '\u0017', '\u0018', '\u0019', '\u001A', '\u001B', '\u001C',
+      '\u001D', '\u001E', '\u001F', '"', '#', '%', '\'', '*', '/', ':', '=', '?', '\\', '\u007F',
+      '{', '[', ']', '^')
+
+    clist.foreach(bitSet.set(_))
+
+    if (Shell.WINDOWS) {
+      Array(' ', '<', '>', '|').foreach(bitSet.set(_))
+    }
+
+    bitSet
+  }
+
+  def needsEscaping(c: Char): Boolean = {
+    c >= 0 && c < charToEscape.size() && charToEscape.get(c)
+  }
+
+  def escapePathName(path: String): String = {
+    val builder = new StringBuilder()
+    path.foreach { c =>
+      if (needsEscaping(c)) {
+        builder.append('%')
+        builder.append(f"${c.asInstanceOf[Int]}%02x")
+      } else {
+        builder.append(c)
+      }
+    }
+
+    builder.toString()
+  }
+
+  def unescapePathName(path: String): String = {
+    val sb = new StringBuilder
+    var i = 0
+
+    while (i < path.length) {
+      val c = path.charAt(i)
+      if (c == '%' && i + 2 < path.length) {
+        val code: Int = try {
+          Integer.valueOf(path.substring(i + 1, i + 3), 16)
+        } catch { case e: Exception =>
+          -1: Integer
+        }
+        if (code >= 0) {
+          sb.append(code.asInstanceOf[Char])
+          i += 3
+        } else {
+          sb.append(c)
+          i += 1
+        }
+      } else {
+        sb.append(c)
+        i += 1
+      }
+    }
+
+    sb.toString()
+  }
 }
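As a quick sanity check of the two helpers just added: escapePathName emits lowercase two-digit hex, so '/' (0x2f) and '=' (0x3d) escape as shown below (illustrative REPL-style snippet; note PartitioningUtils is private[sql], so this is not a public API):

    // Round trip: every escaped character comes back unchanged.
    assert(PartitioningUtils.escapePathName("a/b=c") == "a%2fb%3dc")
    assert(PartitioningUtils.unescapePathName("a%2fb%3dc") == "a/b=c")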
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala
index fbd98ef038..3132067d56 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala
@@ -24,7 +24,6 @@ import scala.collection.mutable
 import org.apache.hadoop.fs.Path
 import org.apache.hadoop.mapreduce._
 import org.apache.hadoop.mapreduce.lib.output.{FileOutputCommitter => MapReduceFileOutputCommitter, FileOutputFormat}
-import org.apache.hadoop.util.Shell
 import parquet.hadoop.util.ContextUtil
 
 import org.apache.spark._
@@ -35,7 +34,8 @@ import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.expressions.codegen.GenerateProjection
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.execution.RunnableCommand
-import org.apache.spark.sql.{SQLConf, DataFrame, SQLContext, SaveMode}
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.{DataFrame, SQLConf, SQLContext, SaveMode}
 
 private[sql] case class InsertIntoDataSource(
     logicalRelation: LogicalRelation,
@@ -208,9 +208,11 @@ private[sql] case class InsertIntoHadoopFsRelation(
           writerContainer.outputWriterForRow(partitionPart).write(convertedDataPart)
         }
       } else {
+        val partitionSchema = StructType.fromAttributes(partitionOutput)
+        val converter = CatalystTypeConverters.createToScalaConverter(partitionSchema)
         while (iterator.hasNext) {
           val row = iterator.next()
-          val partitionPart = partitionProj(row)
+          val partitionPart = converter(partitionProj(row)).asInstanceOf[Row]
           val dataPart = dataProj(row)
           writerContainer.outputWriterForRow(partitionPart).write(dataPart)
         }
@@ -416,7 +418,7 @@ private[sql] class DynamicPartitionWriterContainer(
       val valueString = if (string == null || string.isEmpty) {
         defaultPartitionName
       } else {
-        DynamicPartitionWriterContainer.escapePathName(string)
+        PartitioningUtils.escapePathName(string)
      }
       s"/$col=$valueString"
     }.mkString.stripPrefix(Path.SEPARATOR)
@@ -448,50 +450,3 @@ private[sql] class DynamicPartitionWriterContainer(
     }
   }
 }
-
-private[sql] object DynamicPartitionWriterContainer {
-  //////////////////////////////////////////////////////////////////////////////////////////////////
-  // The following string escaping code is mainly copied from Hive (o.a.h.h.common.FileUtils).
-  //////////////////////////////////////////////////////////////////////////////////////////////////
-
-  val charToEscape = {
-    val bitSet = new java.util.BitSet(128)
-
-    /**
-     * ASCII 01-1F are HTTP control characters that need to be escaped.
-     * \u000A and \u000D are \n and \r, respectively.
-     */
-    val clist = Array(
-      '\u0001', '\u0002', '\u0003', '\u0004', '\u0005', '\u0006', '\u0007', '\u0008', '\u0009',
-      '\n', '\u000B', '\u000C', '\r', '\u000E', '\u000F', '\u0010', '\u0011', '\u0012', '\u0013',
-      '\u0014', '\u0015', '\u0016', '\u0017', '\u0018', '\u0019', '\u001A', '\u001B', '\u001C',
-      '\u001D', '\u001E', '\u001F', '"', '#', '%', '\'', '*', '/', ':', '=', '?', '\\', '\u007F',
-      '{', '[', ']', '^')
-
-    clist.foreach(bitSet.set(_))
-
-    if (Shell.WINDOWS) {
-      Array(' ', '<', '>', '|').foreach(bitSet.set(_))
-    }
-
-    bitSet
-  }
-
-  def needsEscaping(c: Char): Boolean = {
-    c >= 0 && c < charToEscape.size() && charToEscape.get(c)
-  }
-
-  def escapePathName(path: String): String = {
-    val builder = new StringBuilder()
-    path.foreach { c =>
-      if (DynamicPartitionWriterContainer.needsEscaping(c)) {
-        builder.append('%')
-        builder.append(f"${c.asInstanceOf[Int]}%02x")
-      } else {
-        builder.append(c)
-      }
-    }
-
-    builder.toString()
-  }
-}
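For reference, this is roughly how DynamicPartitionWriterContainer assembles a partition directory fragment after the change (simplified sketch; col and string are assumed inputs, and defaultPartitionName resolves to Hive's __HIVE_DEFAULT_PARTITION__):

    val col = "s"
    val string = "/"
    // Null/empty values fall back to the default partition name; everything
    // else goes through the shared escaping helper.
    val valueString =
      if (string == null || string.isEmpty) defaultPartitionName
      else PartitioningUtils.escapePathName(string)
    s"/$col=$valueString"  // "/s=%2f"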
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetPartitionDiscoverySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetPartitionDiscoverySuite.scala
index 90d4528efc..f231589e96 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetPartitionDiscoverySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetPartitionDiscoverySuite.scala
@@ -17,6 +17,8 @@
 
 package org.apache.spark.sql.parquet
 
 import java.io.File
+import java.math.BigInteger
+import java.sql.{Timestamp, Date}
 
 import scala.collection.mutable.ArrayBuffer
@@ -27,7 +29,7 @@ import org.apache.spark.sql.sources.PartitioningUtils._
 import org.apache.spark.sql.sources.{LogicalRelation, Partition, PartitionSpec}
 import org.apache.spark.sql.test.TestSQLContext
 import org.apache.spark.sql.types._
-import org.apache.spark.sql.{QueryTest, Row, SQLContext}
+import org.apache.spark.sql.{Column, QueryTest, Row, SQLContext}
 
 // The data where the partitioning key exists only in the directory structure.
 case class ParquetData(intField: Int, stringField: String)
@@ -377,4 +379,57 @@ class ParquetPartitionDiscoverySuite extends QueryTest with ParquetTest {
       }
     }
   }
+
+  test("SPARK-7847: Dynamic partition directory path escaping and unescaping") {
+    withTempPath { dir =>
+      val df = Seq("/", "[]", "?").zipWithIndex.map(_.swap).toDF("i", "s")
+      df.write.format("parquet").partitionBy("s").save(dir.getCanonicalPath)
+      checkAnswer(read.parquet(dir.getCanonicalPath), df.collect())
+    }
+  }
+
+  test("Various partition value types") {
+    val row =
+      Row(
+        100.toByte,
+        40000.toShort,
+        Int.MaxValue,
+        Long.MaxValue,
+        1.5.toFloat,
+        4.5,
+        new java.math.BigDecimal(new BigInteger("212500"), 5),
+        new java.math.BigDecimal(2.125),
+        java.sql.Date.valueOf("2015-05-23"),
+        new Timestamp(0),
+        "This is a string, /[]?=:",
+        "This is not a partition column")
+
+    // BooleanType is not supported yet
+    val partitionColumnTypes =
+      Seq(
+        ByteType,
+        ShortType,
+        IntegerType,
+        LongType,
+        FloatType,
+        DoubleType,
+        DecimalType(10, 5),
+        DecimalType.Unlimited,
+        DateType,
+        TimestampType,
+        StringType)
+
+    val partitionColumns = partitionColumnTypes.zipWithIndex.map {
+      case (t, index) => StructField(s"p_$index", t)
+    }
+
+    val schema = StructType(partitionColumns :+ StructField(s"i", StringType))
+    val df = createDataFrame(sparkContext.parallelize(row :: Nil), schema)
+
+    withTempPath { dir =>
+      df.write.format("parquet").partitionBy(partitionColumns.map(_.name): _*).save(dir.toString)
+      val fields = schema.map(f => Column(f.name).cast(f.dataType))
+      checkAnswer(read.load(dir.toString).select(fields: _*), row)
+    }
+  }
 }
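Under the escaping rules above, the three partition values in the SPARK-7847 test should land in directories named roughly as follows (illustrative; the exact on-disk layout also depends on the Hadoop filesystem in use):

    s=%2f      <- "/"
    s=%5b%5d   <- "[]"
    s=%3f      <- "?"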