From 59272dad77eb95c5ae8e0652e00d02a2675cda53 Mon Sep 17 00:00:00 2001 From: wangfei Date: Tue, 10 Feb 2015 11:54:30 -0800 Subject: [SPARK-5592][SQL] java.net.URISyntaxException when insert data to a partitioned table flowing sql get URISyntaxException: ``` create table sc as select * from (select '2011-01-11', '2011-01-11+14:18:26' from src tablesample (1 rows) union all select '2011-01-11', '2011-01-11+15:18:26' from src tablesample (1 rows) union all select '2011-01-11', '2011-01-11+16:18:26' from src tablesample (1 rows) ) s; create table sc_part (key string) partitioned by (ts string) stored as rcfile; set hive.exec.dynamic.partition=true; set hive.exec.dynamic.partition.mode=nonstrict; insert overwrite table sc_part partition(ts) select * from sc; ``` java.net.URISyntaxException: Relative path in absolute URI: ts=2011-01-11+15:18:26 at org.apache.hadoop.fs.Path.initialize(Path.java:206) at org.apache.hadoop.fs.Path.(Path.java:172) at org.apache.hadoop.fs.Path.(Path.java:94) at org.apache.spark.sql.hive.SparkHiveDynamicPartitionWriterContainer.org$apache$spark$sql$hive$SparkHiveDynamicPartitionWriterContainer$$newWriter$1(hiveWriterContainers.scala:230) at org.apache.spark.sql.hive.SparkHiveDynamicPartitionWriterContainer$$anonfun$getLocalFileWriter$1.apply(hiveWriterContainers.scala:243) at org.apache.spark.sql.hive.SparkHiveDynamicPartitionWriterContainer$$anonfun$getLocalFileWriter$1.apply(hiveWriterContainers.scala:243) at scala.collection.mutable.MapLike$class.getOrElseUpdate(MapLike.scala:189) at scala.collection.mutable.AbstractMap.getOrElseUpdate(Map.scala:91) at org.apache.spark.sql.hive.SparkHiveDynamicPartitionWriterContainer.getLocalFileWriter(hiveWriterContainers.scala:243) at org.apache.spark.sql.hive.execution.InsertIntoHiveTable$$anonfun$org$apache$spark$sql$hive$execution$InsertIntoHiveTable$$writeToFile$1$1.apply(InsertIntoHiveTable.scala:113) at org.apache.spark.sql.hive.execution.InsertIntoHiveTable$$anonfun$org$apache$spark$sql$hive$execution$InsertIntoHiveTable$$writeToFile$1$1.apply(InsertIntoHiveTable.scala:105) at scala.collection.Iterator$class.foreach(Iterator.scala:727) at scala.collection.AbstractIterator.foreach(Iterator.scala:1157) at org.apache.spark.sql.hive.execution.InsertIntoHiveTable.org$apache$spark$sql$hive$execution$InsertIntoHiveTable$$writeToFile$1(InsertIntoHiveTable.scala:105) at org.apache.spark.sql.hive.execution.InsertIntoHiveTable$$anonfun$saveAsHiveFile$3.apply(InsertIntoHiveTable.scala:87) at org.apache.spark.sql.hive.execution.InsertIntoHiveTable$$anonfun$saveAsHiveFile$3.apply(InsertIntoHiveTable.scala:87) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61) at org.apache.spark.scheduler.Task.run(Task.scala:64) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:194) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1110) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:603) at java.lang.Thread.run(Thread.java:722) Caused by: java.net.URISyntaxException: Relative path in absolute URI: ts=2011-01-11+15:18:26 at java.net.URI.checkPath(URI.java:1804) at java.net.URI.(URI.java:752) at org.apache.hadoop.fs.Path.initialize(Path.java:203) Author: wangfei Author: Fei Wang Closes #4368 from scwf/SPARK-5592 and squashes the following commits: aa55ef4 [Fei Wang] comments addressed f8f8bb1 [wangfei] added test case f24624f [wangfei] Merge branch 'master' of https://github.com/apache/spark into SPARK-5592 9998177 [wangfei] added test case ea81daf [wangfei] fix URISyntaxException --- .../org/apache/spark/sql/hive/hiveWriterContainers.scala | 12 +++++++++--- .../apache/spark/sql/hive/execution/HiveQuerySuite.scala | 16 ++++++++++++++++ 2 files changed, 25 insertions(+), 3 deletions(-) (limited to 'sql') diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveWriterContainers.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveWriterContainers.scala index aae175e426..f136e43acc 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveWriterContainers.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveWriterContainers.scala @@ -30,6 +30,7 @@ import org.apache.hadoop.hive.ql.io.{HiveFileFormatUtils, HiveOutputFormat} import org.apache.hadoop.hive.ql.plan.{PlanUtils, TableDesc} import org.apache.hadoop.io.Writable import org.apache.hadoop.mapred._ +import org.apache.hadoop.hive.common.FileUtils import org.apache.spark.mapred.SparkHadoopMapRedUtil import org.apache.spark.sql.Row @@ -212,9 +213,14 @@ private[spark] class SparkHiveDynamicPartitionWriterContainer( .zip(row.toSeq.takeRight(dynamicPartColNames.length)) .map { case (col, rawVal) => val string = if (rawVal == null) null else String.valueOf(rawVal) - s"/$col=${if (string == null || string.isEmpty) defaultPartName else string}" - } - .mkString + val colString = + if (string == null || string.isEmpty) { + defaultPartName + } else { + FileUtils.escapePathName(string) + } + s"/$col=$colString" + }.mkString def newWriter = { val newFileSinkDesc = new FileSinkDesc( diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala index 27047ce4b1..405b200d05 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala @@ -859,6 +859,22 @@ class HiveQuerySuite extends HiveComparisonTest with BeforeAndAfter { } } + test("SPARK-5592: get java.net.URISyntaxException when dynamic partitioning") { + sql(""" + |create table sc as select * + |from (select '2011-01-11', '2011-01-11+14:18:26' from src tablesample (1 rows) + |union all + |select '2011-01-11', '2011-01-11+15:18:26' from src tablesample (1 rows) + |union all + |select '2011-01-11', '2011-01-11+16:18:26' from src tablesample (1 rows) ) s + """.stripMargin) + sql("create table sc_part (key string) partitioned by (ts string) stored as rcfile") + sql("set hive.exec.dynamic.partition=true") + sql("set hive.exec.dynamic.partition.mode=nonstrict") + sql("insert overwrite table sc_part partition(ts) select * from sc") + sql("drop table sc_part") + } + test("Partition spec validation") { sql("DROP TABLE IF EXISTS dp_test") sql("CREATE TABLE dp_test(key INT, value STRING) PARTITIONED BY (dp INT, sp INT)") -- cgit v1.2.3