author     Dilip Biswal <dbiswal@us.ibm.com>    2015-11-19 19:46:10 -0800
committer  Yin Huai <yhuai@databricks.com>      2015-11-19 19:46:10 -0800
commit     7ee7d5a3c4ff77d2cee2afce36ff41f6302e6315 (patch)
tree       9a16e6b28be590b8065141dd50a0f911e6cf4c6f
parent     ee21407747fb00db2f26d1119446ccbb20c19232 (diff)
[SPARK-11544][SQL][TEST-HADOOP1.0] sqlContext doesn't use PathFilter
Apply the user-supplied PathFilter while retrieving files from the file system.

Author: Dilip Biswal <dbiswal@us.ibm.com>

Closes #9830 from dilipbiswal/spark-11544.
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala                      25
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala    41
2 files changed, 59 insertions(+), 7 deletions(-)
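For context, a minimal usage sketch (not part of the patch) of what this change enables: a PathFilter registered in the Hadoop configuration is honoured when a HadoopFsRelation lists its input files. The filter class and input path below are hypothetical; the two configuration keys are the ones exercised by the new test, set twice because the property name changed between Hadoop 1 and Hadoop 2.

import org.apache.hadoop.fs.{Path, PathFilter}

// Hypothetical filter: skip every file whose parent directory is named "p=2".
class SkipP2Filter extends PathFilter {
  override def accept(path: Path): Boolean = path.getParent.getName != "p=2"
}

// Register the filter under both the old and new property names, then read as usual;
// files rejected by the filter are no longer picked up when listing input files.
val conf = sqlContext.sparkContext.hadoopConfiguration
conf.setClass("mapred.input.pathFilter.class", classOf[SkipP2Filter], classOf[PathFilter])
conf.setClass("mapreduce.input.pathFilter.class", classOf[SkipP2Filter], classOf[PathFilter])
val df = sqlContext.read.json("/tmp/input")  // hypothetical path; the filtered listing applies here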
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
index b3d3bdf50d..f9465157c9 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
@@ -21,7 +21,8 @@ import scala.collection.mutable
import scala.util.Try
import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.{FileStatus, FileSystem, Path}
+import org.apache.hadoop.fs.{PathFilter, FileStatus, FileSystem, Path}
+import org.apache.hadoop.mapred.{JobConf, FileInputFormat}
import org.apache.hadoop.mapreduce.{Job, TaskAttemptContext}
import org.apache.spark.{Logging, SparkContext}
@@ -447,9 +448,15 @@ abstract class HadoopFsRelation private[sql](
val hdfsPath = new Path(path)
val fs = hdfsPath.getFileSystem(hadoopConf)
val qualified = hdfsPath.makeQualified(fs.getUri, fs.getWorkingDirectory)
-
logInfo(s"Listing $qualified on driver")
- Try(fs.listStatus(qualified)).getOrElse(Array.empty)
+ // Dummy jobconf to get to the pathFilter defined in configuration
+ val jobConf = new JobConf(hadoopConf, this.getClass())
+ val pathFilter = FileInputFormat.getInputPathFilter(jobConf)
+ if (pathFilter != null) {
+ Try(fs.listStatus(qualified, pathFilter)).getOrElse(Array.empty)
+ } else {
+ Try(fs.listStatus(qualified)).getOrElse(Array.empty)
+ }
}.filterNot { status =>
val name = status.getPath.getName
name.toLowerCase == "_temporary" || name.startsWith(".")
@@ -847,8 +854,16 @@ private[sql] object HadoopFsRelation extends Logging {
if (name == "_temporary" || name.startsWith(".")) {
Array.empty
} else {
- val (dirs, files) = fs.listStatus(status.getPath).partition(_.isDir)
- files ++ dirs.flatMap(dir => listLeafFiles(fs, dir))
+ // Dummy jobconf to get to the pathFilter defined in configuration
+ val jobConf = new JobConf(fs.getConf, this.getClass())
+ val pathFilter = FileInputFormat.getInputPathFilter(jobConf)
+ if (pathFilter != null) {
+ val (dirs, files) = fs.listStatus(status.getPath, pathFilter).partition(_.isDir)
+ files ++ dirs.flatMap(dir => listLeafFiles(fs, dir))
+ } else {
+ val (dirs, files) = fs.listStatus(status.getPath).partition(_.isDir)
+ files ++ dirs.flatMap(dir => listLeafFiles(fs, dir))
+ }
}
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
index 6042b1178a..ba7718c864 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
@@ -19,19 +19,27 @@ package org.apache.spark.sql.execution.datasources.json
import java.io.{File, StringWriter}
import java.sql.{Date, Timestamp}
+import scala.collection.JavaConverters._
import com.fasterxml.jackson.core.JsonFactory
-import org.apache.spark.rdd.RDD
+import org.apache.commons.io.FileUtils
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.{Path, PathFilter}
import org.scalactic.Tolerance._
+import org.apache.spark.rdd.RDD
import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.util.DateTimeUtils
-import org.apache.spark.sql.execution.datasources.{ResolvedDataSource, LogicalRelation}
+import org.apache.spark.sql.execution.datasources.{LogicalRelation, ResolvedDataSource}
import org.apache.spark.sql.execution.datasources.json.InferSchema.compatibleType
import org.apache.spark.sql.test.SharedSQLContext
import org.apache.spark.sql.types._
import org.apache.spark.util.Utils
+class TestFileFilter extends PathFilter {
+ override def accept(path: Path): Boolean = path.getParent.getName != "p=2"
+}
+
class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
import testImplicits._
@@ -1390,4 +1398,33 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
)
}
}
+
+ test("SPARK-11544 test pathfilter") {
+ withTempPath { dir =>
+ val path = dir.getCanonicalPath
+
+ val df = sqlContext.range(2)
+ df.write.json(path + "/p=1")
+ df.write.json(path + "/p=2")
+ assert(sqlContext.read.json(path).count() === 4)
+
+ val clonedConf = new Configuration(hadoopConfiguration)
+ try {
+ // Setting it twice as the name of the property has changed between Hadoop versions.
+ hadoopConfiguration.setClass(
+ "mapred.input.pathFilter.class",
+ classOf[TestFileFilter],
+ classOf[PathFilter])
+ hadoopConfiguration.setClass(
+ "mapreduce.input.pathFilter.class",
+ classOf[TestFileFilter],
+ classOf[PathFilter])
+ assert(sqlContext.read.json(path).count() === 2)
+ } finally {
+ // Hadoop 1 doesn't have `Configuration.unset`
+ hadoopConfiguration.clear()
+ clonedConf.asScala.foreach(entry => hadoopConfiguration.set(entry.getKey, entry.getValue))
+ }
+ }
+ }
}