author     Yin Huai <yhuai@databricks.com>   2015-11-18 18:41:40 -0800
committer  Yin Huai <yhuai@databricks.com>   2015-11-18 18:41:40 -0800
commit     9c0654d36c6d171dd273850c2cc2f415cc2a5a6b (patch)
tree       654a735d1c8f639e5ef5651b716a5f22423e9d39
parent     6d0848b53bbe6c5acdcf5c033cd396b1ae6e293d (diff)
Revert "[SPARK-11544][SQL] sqlContext doesn't use PathFilter"
This reverts commit 54db79702513e11335c33bcf3a03c59e965e6f16.
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala                    | 25
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala  | 36
2 files changed, 7 insertions(+), 54 deletions(-)
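
For reference, the listing logic being reverted pulled a PathFilter out of the Hadoop configuration through a throwaway JobConf and honoured it when listing input paths. A minimal standalone sketch of that pattern, assuming Hadoop is on the classpath (the object and method names here are illustrative, not part of Spark):

import scala.util.Try

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileStatus, Path}
import org.apache.hadoop.mapred.{FileInputFormat, JobConf}

object PathFilterListing {
  // List the children of `path`, applying any pathFilter class registered in the
  // Hadoop configuration (the behaviour the reverted commit had added).
  def listWithPathFilter(hadoopConf: Configuration, path: String): Array[FileStatus] = {
    val hdfsPath = new Path(path)
    val fs = hdfsPath.getFileSystem(hadoopConf)
    val qualified = hdfsPath.makeQualified(fs.getUri, fs.getWorkingDirectory)

    // A dummy JobConf is only a vehicle to reach the configured input path filter.
    val jobConf = new JobConf(hadoopConf, this.getClass)
    val pathFilter = FileInputFormat.getInputPathFilter(jobConf)

    if (pathFilter != null) {
      Try(fs.listStatus(qualified, pathFilter)).getOrElse(Array.empty)
    } else {
      Try(fs.listStatus(qualified)).getOrElse(Array.empty)
    }
  }
}
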
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
index f9465157c9..b3d3bdf50d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
@@ -21,8 +21,7 @@ import scala.collection.mutable
import scala.util.Try
import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.{PathFilter, FileStatus, FileSystem, Path}
-import org.apache.hadoop.mapred.{JobConf, FileInputFormat}
+import org.apache.hadoop.fs.{FileStatus, FileSystem, Path}
import org.apache.hadoop.mapreduce.{Job, TaskAttemptContext}
import org.apache.spark.{Logging, SparkContext}
@@ -448,15 +447,9 @@ abstract class HadoopFsRelation private[sql](
val hdfsPath = new Path(path)
val fs = hdfsPath.getFileSystem(hadoopConf)
val qualified = hdfsPath.makeQualified(fs.getUri, fs.getWorkingDirectory)
+
logInfo(s"Listing $qualified on driver")
- // Dummy jobconf to get to the pathFilter defined in configuration
- val jobConf = new JobConf(hadoopConf, this.getClass())
- val pathFilter = FileInputFormat.getInputPathFilter(jobConf)
- if (pathFilter != null) {
- Try(fs.listStatus(qualified, pathFilter)).getOrElse(Array.empty)
- } else {
- Try(fs.listStatus(qualified)).getOrElse(Array.empty)
- }
+ Try(fs.listStatus(qualified)).getOrElse(Array.empty)
}.filterNot { status =>
val name = status.getPath.getName
name.toLowerCase == "_temporary" || name.startsWith(".")
@@ -854,16 +847,8 @@ private[sql] object HadoopFsRelation extends Logging {
if (name == "_temporary" || name.startsWith(".")) {
Array.empty
} else {
- // Dummy jobconf to get to the pathFilter defined in configuration
- val jobConf = new JobConf(fs.getConf, this.getClass())
- val pathFilter = FileInputFormat.getInputPathFilter(jobConf)
- if (pathFilter != null) {
- val (dirs, files) = fs.listStatus(status.getPath, pathFilter).partition(_.isDir)
- files ++ dirs.flatMap(dir => listLeafFiles(fs, dir))
- } else {
- val (dirs, files) = fs.listStatus(status.getPath).partition(_.isDir)
- files ++ dirs.flatMap(dir => listLeafFiles(fs, dir))
- }
+ val (dirs, files) = fs.listStatus(status.getPath).partition(_.isDir)
+ files ++ dirs.flatMap(dir => listLeafFiles(fs, dir))
}
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
index f09b61e838..6042b1178a 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
@@ -19,27 +19,19 @@ package org.apache.spark.sql.execution.datasources.json
import java.io.{File, StringWriter}
import java.sql.{Date, Timestamp}
-import scala.collection.JavaConverters._
import com.fasterxml.jackson.core.JsonFactory
-import org.apache.commons.io.FileUtils
-import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.{Path, PathFilter}
+import org.apache.spark.rdd.RDD
import org.scalactic.Tolerance._
-import org.apache.spark.rdd.RDD
import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.util.DateTimeUtils
-import org.apache.spark.sql.execution.datasources.{LogicalRelation, ResolvedDataSource}
+import org.apache.spark.sql.execution.datasources.{ResolvedDataSource, LogicalRelation}
import org.apache.spark.sql.execution.datasources.json.InferSchema.compatibleType
import org.apache.spark.sql.test.SharedSQLContext
import org.apache.spark.sql.types._
import org.apache.spark.util.Utils
-class TestFileFilter extends PathFilter {
- override def accept(path: Path): Boolean = path.getParent.getName != "p=2"
-}
-
class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
import testImplicits._
@@ -1398,28 +1390,4 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
)
}
}
-
- test("SPARK-11544 test pathfilter") {
- withTempPath { dir =>
- val path = dir.getCanonicalPath
-
- val df = sqlContext.range(2)
- df.write.json(path + "/p=1")
- df.write.json(path + "/p=2")
- assert(sqlContext.read.json(path).count() === 4)
-
- val clonedConf = new Configuration(hadoopConfiguration)
- try {
- hadoopConfiguration.setClass(
- "mapreduce.input.pathFilter.class",
- classOf[TestFileFilter],
- classOf[PathFilter])
- assert(sqlContext.read.json(path).count() === 2)
- } finally {
- // Hadoop 1 doesn't have `Configuration.unset`
- hadoopConfiguration.clear()
- clonedConf.asScala.foreach(entry => hadoopConfiguration.set(entry.getKey, entry.getValue))
- }
- }
- }
}
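
The deleted test exercised that behaviour end to end: write two partition directories, register a PathFilter under mapreduce.input.pathFilter.class, and check that the filtered partition is no longer read. A hedged sketch of the user-facing side, reusing the filter class from the removed test and assuming an existing sqlContext:

import org.apache.hadoop.fs.{Path, PathFilter}

// Same filter as in the removed test: hide everything under the p=2 partition.
class TestFileFilter extends PathFilter {
  override def accept(path: Path): Boolean = path.getParent.getName != "p=2"
}

// With the reverted code in place, registering the filter changed what json() saw:
//   sqlContext.sparkContext.hadoopConfiguration.setClass(
//     "mapreduce.input.pathFilter.class", classOf[TestFileFilter], classOf[PathFilter])
//   sqlContext.read.json(path).count()   // 2 instead of 4: only the p=1 files remain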