author    lazymam500 <lazyman500@gmail.com>  2015-04-11 18:33:14 -0700
committer Michael Armbrust <michael@databricks.com>  2015-04-11 18:33:14 -0700
commit    1f39a61118184e136f38381a9f3ba0b2d5d589d9 (patch)
tree      b43bc2f15ee24525e9c78111bf1a9098f671cc7b /sql
parent    2f53588738e95a2191f9844818e47f0d2ebbfd54 (diff)
[SPARK-5068][SQL] Fix querying data when a partition path doesn't exist for HiveContext

This PR follows up on PRs #3907, #3891, and #4356. Per marmbrus's and liancheng's comments, it uses fs.globStatus to retrieve all FileStatus objects under the path(s) and then does the filtering locally:

[1] Derive a pathPattern from each partition path and add it to pathPatternSet (hdfs://cluster/user/demo/2016/08/12 -> hdfs://cluster/user/demo/*/*/*).
[2] Retrieve all FileStatus objects matching the pattern and cache them by updating existPathSet.
[3] Do the filtering locally against existPathSet.
[4] Whenever a new pathPattern appears, repeat steps 1 and 2 (an external table may have more than one partition pathPattern).

cc chenghao-intel jeanlyn

Author: lazymam500 <lazyman500@gmail.com>
Author: lazyman <lazyman500@gmail.com>

Closes #5059 from lazyman500/SPARK-5068 and squashes the following commits:

5bfcbfd [lazyman] move spark.sql.hive.verifyPartitionPath to SQLConf, fix scala style
e1d6386 [lazymam500] fix scala style
f23133f [lazymam500] bug fix
47e0023 [lazymam500] fix scala style, add config flag, break the chaining
04c443c [lazyman] SPARK-5068: fix bug when partition path doesn't exist #2
41f60ce [lazymam500] Merge pull request #1 from apache/master
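Stripped of the surrounding TableReader plumbing, the strategy itself is compact. The following is a minimal, self-contained Scala sketch of it; the helper name verifyExistingPartitions and the explicit partDepth parameter are illustrative inventions, not from the patch (the real code derives the depth from the Hive partition spec):

    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.fs.Path

    // Keep only the partition paths that actually exist on the file system,
    // issuing one globStatus call per distinct widened pattern.
    def verifyExistingPartitions(
        partPaths: Seq[Path], partDepth: Int, conf: Configuration): Seq[Path] = {
      val existing = collection.mutable.Set[String]()
      val seenPatterns = collection.mutable.Set[String]()
      partPaths.filter { partPath =>
        // widen hdfs://cluster/user/demo/2016/08/12 (depth 3)
        // to    hdfs://cluster/user/demo/*/*/*
        var base = partPath
        (1 to partDepth).foreach(_ => base = base.getParent)
        val pattern = base.toString + Seq.fill(partDepth)("*").mkString("/", "/", "")
        if (seenPatterns.add(pattern)) {
          val fs = partPath.getFileSystem(conf)
          // globStatus returns null when nothing matches, hence the Option
          Option(fs.globStatus(new Path(pattern))).toSeq.flatten
            .foreach(status => existing += status.getPath.toString)
        }
        existing.contains(partPath.toString)
      }
    }

The point of the pattern cache is that sibling partitions share one pattern, so a table with thousands of partitions needs only a handful of globStatus round-trips rather than one existence check per partition.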
Diffstat (limited to 'sql')
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala                   |  6
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala          | 41
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/QueryPartitionSuite.scala  | 64
3 files changed, 110 insertions(+), 1 deletion(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
index 4815620c6f..ee641bdfeb 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
@@ -39,6 +39,8 @@ private[spark] object SQLConf {
val PARQUET_FILTER_PUSHDOWN_ENABLED = "spark.sql.parquet.filterPushdown"
val PARQUET_USE_DATA_SOURCE_API = "spark.sql.parquet.useDataSourceApi"
+ val HIVE_VERIFY_PARTITIONPATH = "spark.sql.hive.verifyPartitionPath"
+
val COLUMN_NAME_OF_CORRUPT_RECORD = "spark.sql.columnNameOfCorruptRecord"
val BROADCAST_TIMEOUT = "spark.sql.broadcastTimeout"
@@ -119,6 +121,10 @@ private[sql] class SQLConf extends Serializable {
private[spark] def parquetUseDataSourceApi =
getConf(PARQUET_USE_DATA_SOURCE_API, "true").toBoolean
+ /** When true, uses verifyPartitionPath to prune partition paths that do not exist. */
+ private[spark] def verifyPartitionPath =
+ getConf(HIVE_VERIFY_PARTITIONPATH, "true").toBoolean
+
/** When true the planner will use the external sort, which may spill to disk. */
private[spark] def externalSortEnabled: Boolean = getConf(EXTERNAL_SORT, "false").toBoolean
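As the hunk above shows, the new flag defaults to true. Each distinct partition pattern costs one globStatus round-trip, so users whose partition directories are guaranteed to exist can switch the check off. A minimal usage sketch, assuming a Spark 1.x HiveContext instance named hiveContext:

    // disable the partition-path existence check for this session
    hiveContext.setConf("spark.sql.hive.verifyPartitionPath", "false")
    // or, equivalently, through SQL
    hiveContext.sql("SET spark.sql.hive.verifyPartitionPath=false")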
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala
index 3563472c7a..d35291543c 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala
@@ -142,7 +142,46 @@ class HadoopTableReader(
partitionToDeserializer: Map[HivePartition,
Class[_ <: Deserializer]],
filterOpt: Option[PathFilter]): RDD[Row] = {
- val hivePartitionRDDs = partitionToDeserializer.map { case (partition, partDeserializer) =>
+
+ // SPARK-5068: get FileStatus and do the filtering locally when the path does not exist
+ def verifyPartitionPath(
+ partitionToDeserializer: Map[HivePartition, Class[_ <: Deserializer]]):
+ Map[HivePartition, Class[_ <: Deserializer]] = {
+ if (!sc.conf.verifyPartitionPath) {
+ partitionToDeserializer
+ } else {
+ val existPathSet = collection.mutable.Set[String]()
+ val pathPatternSet = collection.mutable.Set[String]()
+ partitionToDeserializer.filter {
+ case (partition, partDeserializer) =>
+ def updateExistPathSetByPathPattern(pathPatternStr: String) {
+ val pathPattern = new Path(pathPatternStr)
+ val fs = pathPattern.getFileSystem(sc.hiveconf)
+ val matches = fs.globStatus(pathPattern)
+ matches.foreach(fileStatus => existPathSet += fileStatus.getPath.toString)
+ }
+ // convert /demo/data/year/month/day to /demo/data/*/*/*/
+ def getPathPatternByPath(parNum: Int, tempPath: Path): String = {
+ var path = tempPath
+ for (i <- (1 to parNum)) path = path.getParent
+ val tails = (1 to parNum).map(_ => "*").mkString("/", "/", "/")
+ path.toString + tails
+ }
+
+ val partPath = HiveShim.getDataLocationPath(partition)
+ val partNum = Utilities.getPartitionDesc(partition).getPartSpec.size()
+ val pathPatternStr = getPathPatternByPath(partNum, partPath)
+ if (!pathPatternSet.contains(pathPatternStr)) {
+ pathPatternSet += pathPatternStr
+ updateExistPathSetByPathPattern(pathPatternStr)
+ }
+ existPathSet.contains(partPath.toString)
+ }
+ }
+ }
+
+ val hivePartitionRDDs = verifyPartitionPath(partitionToDeserializer)
+ .map { case (partition, partDeserializer) =>
val partDesc = Utilities.getPartitionDesc(partition)
val partPath = HiveShim.getDataLocationPath(partition)
val inputPathStr = applyFilterIfNeeded(partPath, filterOpt)
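To make getPathPatternByPath concrete: for a partition with three partition columns stored at hdfs://cluster/user/demo/2016/08/12, it walks up three parents and appends three wildcards. An illustrative REPL-style snippet (the helper is local to the method shown above, so calling it like this is hypothetical):

    val partPath = new Path("hdfs://cluster/user/demo/2016/08/12")
    getPathPatternByPath(3, partPath)
    // res0: String = hdfs://cluster/user/demo/*/*/*/
    // the trailing slash comes from mkString's `end` argument; the Path
    // constructor in updateExistPathSetByPathPattern normalizes it away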
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/QueryPartitionSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/QueryPartitionSuite.scala
new file mode 100644
index 0000000000..83f97128c5
--- /dev/null
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/QueryPartitionSuite.scala
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive
+
+import java.io.File
+
+import com.google.common.io.Files
+import org.apache.spark.sql.{QueryTest, _}
+import org.apache.spark.sql.hive.test.TestHive
+import org.apache.spark.util.Utils
+/* Implicits */
+import org.apache.spark.sql.hive.test.TestHive._
+
+
+
+class QueryPartitionSuite extends QueryTest {
+ import org.apache.spark.sql.hive.test.TestHive.implicits._
+
+ test("SPARK-5068: query data when path doesn't exists"){
+ val testData = TestHive.sparkContext.parallelize(
+ (1 to 10).map(i => TestData(i, i.toString))).toDF()
+ testData.registerTempTable("testData")
+
+ val tmpDir = Files.createTempDir()
+ // create the table used by the test
+ sql(s"CREATE TABLE table_with_partition(key int,value string) PARTITIONED by (ds string) location '${tmpDir.toURI.toString}' ")
+ sql("INSERT OVERWRITE TABLE table_with_partition partition (ds='1') SELECT key,value FROM testData")
+ sql("INSERT OVERWRITE TABLE table_with_partition partition (ds='2') SELECT key,value FROM testData")
+ sql("INSERT OVERWRITE TABLE table_with_partition partition (ds='3') SELECT key,value FROM testData")
+ sql("INSERT OVERWRITE TABLE table_with_partition partition (ds='4') SELECT key,value FROM testData")
+
+ // query while all partition paths exist
+ checkAnswer(sql("select key,value from table_with_partition"),
+ testData.toSchemaRDD.collect ++ testData.toSchemaRDD.collect
+ ++ testData.toSchemaRDD.collect ++ testData.toSchemaRDD.collect)
+
+ // delete the path of one partition
+ val folders = tmpDir.listFiles.filter(_.isDirectory)
+ Utils.deleteRecursively(folders(0))
+
+ // query again after deleting the path
+ checkAnswer(sql("select key,value from table_with_partition"),
+ testData.toSchemaRDD.collect ++ testData.toSchemaRDD.collect
+ ++ testData.toSchemaRDD.collect)
+
+ sql("DROP TABLE table_with_partition")
+ sql("DROP TABLE createAndInsertTest")
+ }
+}