-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala         |  61
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/sources/DataSourceStrategy.scala |  37
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala         | 104
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextRelation.scala |   6
4 files changed, 117 insertions(+), 91 deletions(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
index fea54a2514..7ca44f7b81 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
@@ -23,12 +23,11 @@ import scala.collection.JavaConversions._
import scala.util.Try
import com.google.common.base.Objects
-import org.apache.hadoop.fs.{FileStatus, FileSystem, Path}
+import org.apache.hadoop.fs.{FileStatus, Path}
import org.apache.hadoop.io.Writable
import org.apache.hadoop.mapreduce._
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat
import parquet.filter2.predicate.FilterApi
-import parquet.format.converter.ParquetMetadataConverter
import parquet.hadoop._
import parquet.hadoop.metadata.CompressionCodecName
import parquet.hadoop.util.ContextUtil
@@ -175,8 +174,8 @@ private[sql] class ParquetRelation2(
override def dataSchema: StructType = metadataCache.dataSchema
override private[sql] def refresh(): Unit = {
- metadataCache.refresh()
super.refresh()
+ metadataCache.refresh()
}
// Parquet data source always uses Catalyst internal representations.
@@ -234,15 +233,15 @@ private[sql] class ParquetRelation2(
override def buildScan(
requiredColumns: Array[String],
filters: Array[Filter],
- inputPaths: Array[String]): RDD[Row] = {
+ inputFiles: Array[FileStatus]): RDD[Row] = {
val job = new Job(SparkHadoopUtil.get.conf)
val conf = ContextUtil.getConfiguration(job)
ParquetInputFormat.setReadSupportClass(job, classOf[RowReadSupport])
- if (inputPaths.nonEmpty) {
- FileInputFormat.setInputPaths(job, inputPaths.map(new Path(_)): _*)
+ if (inputFiles.nonEmpty) {
+ FileInputFormat.setInputPaths(job, inputFiles.map(_.getPath): _*)
}
// Try to push down filters when filter push-down is enabled.
@@ -269,10 +268,7 @@ private[sql] class ParquetRelation2(
val useMetadataCache = sqlContext.getConf(SQLConf.PARQUET_CACHE_METADATA, "true").toBoolean
conf.set(SQLConf.PARQUET_CACHE_METADATA, useMetadataCache.toString)
- val inputFileStatuses =
- metadataCache.dataStatuses.filter(f => inputPaths.contains(f.getPath.toString))
-
- val footers = inputFileStatuses.map(metadataCache.footers)
+ val footers = inputFiles.map(f => metadataCache.footers(f.getPath))
// TODO Stop using `FilteringParquetRowInputFormat` and overriding `getPartition`.
// After upgrading to Parquet 1.6.0, we should be able to stop caching `FileStatus` objects and
@@ -287,7 +283,7 @@ private[sql] class ParquetRelation2(
val cacheMetadata = useMetadataCache
- @transient val cachedStatuses = inputFileStatuses.map { f =>
+ @transient val cachedStatuses = inputFiles.map { f =>
// In order to encode the authority of a Path containing special characters such as /,
// we need to use the string returned by the URI of the path to create a new Path.
val pathWithAuthority = new Path(f.getPath.toUri.toString)
@@ -333,7 +329,7 @@ private[sql] class ParquetRelation2(
private var commonMetadataStatuses: Array[FileStatus] = _
// Parquet footer cache.
- var footers: Map[FileStatus, Footer] = _
+ var footers: Map[Path, Footer] = _
// `FileStatus` objects of all data files (Parquet part-files).
var dataStatuses: Array[FileStatus] = _
@@ -349,35 +345,30 @@ private[sql] class ParquetRelation2(
* Refreshes `FileStatus`es, footers, partition spec, and table schema.
*/
def refresh(): Unit = {
- // Support either reading a collection of raw Parquet part-files, or a collection of folders
- // containing Parquet files (e.g. partitioned Parquet table).
- val baseStatuses = paths.distinct.flatMap { p =>
- val path = new Path(p)
- val fs = path.getFileSystem(sqlContext.sparkContext.hadoopConfiguration)
- val qualified = path.makeQualified(fs.getUri, fs.getWorkingDirectory)
- Try(fs.getFileStatus(qualified)).toOption
- }
- assert(baseStatuses.forall(!_.isDir) || baseStatuses.forall(_.isDir))
-
// Lists `FileStatus`es of all leaf nodes (files) under all base directories.
- val leaves = baseStatuses.flatMap { f =>
- val fs = FileSystem.get(f.getPath.toUri, SparkHadoopUtil.get.conf)
- SparkHadoopUtil.get.listLeafStatuses(fs, f.getPath).filter { f =>
- isSummaryFile(f.getPath) ||
- !(f.getPath.getName.startsWith("_") || f.getPath.getName.startsWith("."))
- }
- }
+ val leaves = cachedLeafStatuses().filter { f =>
+ isSummaryFile(f.getPath) ||
+ !(f.getPath.getName.startsWith("_") || f.getPath.getName.startsWith("."))
+ }.toArray
dataStatuses = leaves.filterNot(f => isSummaryFile(f.getPath))
metadataStatuses = leaves.filter(_.getPath.getName == ParquetFileWriter.PARQUET_METADATA_FILE)
commonMetadataStatuses =
leaves.filter(_.getPath.getName == ParquetFileWriter.PARQUET_COMMON_METADATA_FILE)
- footers = (dataStatuses ++ metadataStatuses ++ commonMetadataStatuses).par.map { f =>
- val parquetMetadata = ParquetFileReader.readFooter(
- SparkHadoopUtil.get.conf, f, ParquetMetadataConverter.NO_FILTER)
- f -> new Footer(f.getPath, parquetMetadata)
- }.seq.toMap
+ footers = {
+ val conf = SparkHadoopUtil.get.conf
+ val taskSideMetaData = conf.getBoolean(ParquetInputFormat.TASK_SIDE_METADATA, true)
+ val rawFooters = if (shouldMergeSchemas) {
+ ParquetFileReader.readAllFootersInParallel(
+ conf, seqAsJavaList(leaves), taskSideMetaData)
+ } else {
+ ParquetFileReader.readAllFootersInParallelUsingSummaryFiles(
+ conf, seqAsJavaList(leaves), taskSideMetaData)
+ }
+
+ rawFooters.map(footer => footer.getFile -> footer).toMap
+ }
// If we already have the schema, there's no need to re-compute it, since schema
// merging is time-consuming.
@@ -448,7 +439,7 @@ private[sql] class ParquetRelation2(
"No schema defined, " +
s"and no Parquet data file or summary file found under ${paths.mkString(", ")}.")
- ParquetRelation2.readSchema(filesToTouch.map(footers.apply), sqlContext)
+ ParquetRelation2.readSchema(filesToTouch.map(f => footers.apply(f.getPath)), sqlContext)
}
}
}
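
A minimal, standalone Scala sketch of the footer caching introduced above, building a `Map[Path, Footer]` with Parquet's parallel footer readers. The table directory (`/tmp/parquet-table`), the flat (non-recursive) listing, and the hard-coded `skipRowGroups` flag are assumptions for illustration; `ParquetRelation2` instead gets its leaves from `cachedLeafStatuses()` and derives the flag from `ParquetInputFormat.TASK_SIDE_METADATA`.

import scala.collection.JavaConversions._

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileStatus, Path}
import parquet.hadoop.{Footer, ParquetFileReader}

object FooterCacheSketch {
  def main(args: Array[String]): Unit = {
    val conf = new Configuration()
    // Hypothetical table directory; in ParquetRelation2 the leaves come from cachedLeafStatuses().
    val dir = new Path("/tmp/parquet-table")
    val fs = dir.getFileSystem(conf)
    val leaves: Seq[FileStatus] = fs.listStatus(dir).filterNot(_.isDir).toSeq

    // Summary files (_metadata / _common_metadata) are used when present; the boolean is the
    // skipRowGroups flag that the hunk above wires to task-side metadata handling.
    val rawFooters = ParquetFileReader.readAllFootersInParallelUsingSummaryFiles(
      conf, seqAsJavaList(leaves), true)

    // Key the cache by Path so callers can look up a footer from a FileStatus via getPath.
    val footers: Map[Path, Footer] = rawFooters.map(footer => footer.getFile -> footer).toMap
    footers.keys.foreach(println)
  }
}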
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/DataSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/DataSourceStrategy.scala
index e6324b20b3..1615a6dcbd 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/sources/DataSourceStrategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/DataSourceStrategy.scala
@@ -17,20 +17,16 @@
package org.apache.spark.sql.sources
-import org.apache.hadoop.fs.Path
-
import org.apache.spark.Logging
-import org.apache.spark.deploy.SparkHadoopUtil
-import org.apache.spark.rdd.{UnionRDD, RDD}
-import org.apache.spark.sql.Row
+import org.apache.spark.rdd.{RDD, UnionRDD}
import org.apache.spark.sql.catalyst.expressions
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.planning.PhysicalOperation
import org.apache.spark.sql.catalyst.plans.logical
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.SparkPlan
-import org.apache.spark.sql.types.{StructType, UTF8String, StringType}
-import org.apache.spark.sql._
+import org.apache.spark.sql.types.{StringType, StructType, UTF8String}
+import org.apache.spark.sql.{SaveMode, Strategy, execution, sources}
/**
* A Strategy for planning scans over data sources defined using the sources API.
@@ -58,7 +54,7 @@ private[sql] object DataSourceStrategy extends Strategy with Logging {
filters,
(a, _) => t.buildScan(a)) :: Nil
- // Scanning partitioned FSBasedRelation
+ // Scanning partitioned HadoopFsRelation
case PhysicalOperation(projectList, filters, l @ LogicalRelation(t: HadoopFsRelation))
if t.partitionSpec.partitionColumns.nonEmpty =>
val selectedPartitions = prunePartitions(filters, t.partitionSpec).toArray
@@ -86,22 +82,13 @@ private[sql] object DataSourceStrategy extends Strategy with Logging {
t.partitionSpec.partitionColumns,
selectedPartitions) :: Nil
- // Scanning non-partitioned FSBasedRelation
+ // Scanning non-partitioned HadoopFsRelation
case PhysicalOperation(projectList, filters, l @ LogicalRelation(t: HadoopFsRelation)) =>
- val inputPaths = t.paths.map(new Path(_)).flatMap { path =>
- val fs = path.getFileSystem(t.sqlContext.sparkContext.hadoopConfiguration)
- val qualifiedPath = path.makeQualified(fs.getUri, fs.getWorkingDirectory)
- SparkHadoopUtil.get.listLeafStatuses(fs, qualifiedPath).map(_.getPath).filterNot { path =>
- val name = path.getName
- name.startsWith("_") || name.startsWith(".")
- }.map(fs.makeQualified(_).toString)
- }
-
pruneFilterProject(
l,
projectList,
filters,
- (a, f) => t.buildScan(a, f, inputPaths)) :: Nil
+ (a, f) => t.buildScan(a, f, t.paths)) :: Nil
case l @ LogicalRelation(t: TableScan) =>
createPhysicalRDD(l.relation, l.output, t.buildScan()) :: Nil
@@ -130,16 +117,6 @@ private[sql] object DataSourceStrategy extends Strategy with Logging {
// Builds RDD[Row]s for each selected partition.
val perPartitionRows = partitions.map { case Partition(partitionValues, dir) =>
- // Paths to all data files within this partition
- val dataFilePaths = {
- val dirPath = new Path(dir)
- val fs = dirPath.getFileSystem(SparkHadoopUtil.get.conf)
- fs.listStatus(dirPath).map(_.getPath).filterNot { path =>
- val name = path.getName
- name.startsWith("_") || name.startsWith(".")
- }.map(fs.makeQualified(_).toString)
- }
-
// The table scan operator (PhysicalRDD) which retrieves required columns from data files.
// Notice that the schema of data files, represented by `relation.dataSchema`, may contain
// some partition column(s).
@@ -155,7 +132,7 @@ private[sql] object DataSourceStrategy extends Strategy with Logging {
// assuming partition columns data stored in data files are always consistent with those
// partition values encoded in partition directory paths.
val nonPartitionColumns = requiredColumns.filterNot(partitionColNames.contains)
- val dataRows = relation.buildScan(nonPartitionColumns, filters, dataFilePaths)
+ val dataRows = relation.buildScan(nonPartitionColumns, filters, Array(dir))
// Merges data values with partition values.
mergeWithPartitionValues(
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
index a82a6758d2..9b52d1be3d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
@@ -17,14 +17,14 @@
package org.apache.spark.sql.sources
+import scala.collection.mutable
import scala.util.Try
import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.{FileStatus, Path}
+import org.apache.hadoop.fs.{FileStatus, FileSystem, Path}
import org.apache.hadoop.mapreduce.{Job, TaskAttemptContext}
import org.apache.spark.annotation.{DeveloperApi, Experimental}
-import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.rdd.RDD
import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.expressions._
@@ -368,18 +368,61 @@ abstract class HadoopFsRelation private[sql](maybePartitionSpec: Option[Partitio
private var _partitionSpec: PartitionSpec = _
+ private class FileStatusCache {
+ var leafFiles = mutable.Map.empty[Path, FileStatus]
+
+ var leafDirToChildrenFiles = mutable.Map.empty[Path, Array[FileStatus]]
+
+ var leafDirs = mutable.Map.empty[Path, FileStatus]
+
+ def refresh(): Unit = {
+ def listLeafFilesAndDirs(fs: FileSystem, status: FileStatus): Set[FileStatus] = {
+ val (dirs, files) = fs.listStatus(status.getPath).partition(_.isDir)
+ val leafDirs = if (dirs.isEmpty) Set(status) else Set.empty[FileStatus]
+ files.toSet ++ leafDirs ++ dirs.flatMap(dir => listLeafFilesAndDirs(fs, dir))
+ }
+
+ leafDirs.clear()
+ leafFiles.clear()
+
+ // We don't filter files/directories like _temporary/_SUCCESS here, as specific data sources
+ // may take advantage of them (e.g. Parquet _metadata and _common_metadata files).
+ val statuses = paths.flatMap { path =>
+ val hdfsPath = new Path(path)
+ val fs = hdfsPath.getFileSystem(hadoopConf)
+ val qualified = hdfsPath.makeQualified(fs.getUri, fs.getWorkingDirectory)
+ Try(fs.getFileStatus(qualified)).toOption.toArray.flatMap(listLeafFilesAndDirs(fs, _))
+ }
+
+ val (dirs, files) = statuses.partition(_.isDir)
+ leafDirs ++= dirs.map(d => d.getPath -> d).toMap
+ leafFiles ++= files.map(f => f.getPath -> f).toMap
+ leafDirToChildrenFiles ++= files.groupBy(_.getPath.getParent)
+ }
+ }
+
+ private lazy val fileStatusCache = {
+ val cache = new FileStatusCache
+ cache.refresh()
+ cache
+ }
+
+ protected def cachedLeafStatuses(): Set[FileStatus] = {
+ fileStatusCache.leafFiles.values.toSet
+ }
+
final private[sql] def partitionSpec: PartitionSpec = {
if (_partitionSpec == null) {
_partitionSpec = maybePartitionSpec
.map(spec => spec.copy(partitionColumns = spec.partitionColumns.asNullable))
.orElse(userDefinedPartitionColumns.map(PartitionSpec(_, Array.empty[Partition])))
.getOrElse {
- if (sqlContext.conf.partitionDiscoveryEnabled()) {
- discoverPartitions()
- } else {
- PartitionSpec(StructType(Nil), Array.empty[Partition])
+ if (sqlContext.conf.partitionDiscoveryEnabled()) {
+ discoverPartitions()
+ } else {
+ PartitionSpec(StructType(Nil), Array.empty[Partition])
+ }
}
- }
}
_partitionSpec
}
@@ -409,20 +452,14 @@ abstract class HadoopFsRelation private[sql](maybePartitionSpec: Option[Partitio
def userDefinedPartitionColumns: Option[StructType] = None
private[sql] def refresh(): Unit = {
+ fileStatusCache.refresh()
if (sqlContext.conf.partitionDiscoveryEnabled()) {
_partitionSpec = discoverPartitions()
}
}
private def discoverPartitions(): PartitionSpec = {
- val basePaths = paths.map(new Path(_))
- val leafDirs = basePaths.flatMap { path =>
- val fs = path.getFileSystem(hadoopConf)
- Try(fs.getFileStatus(path.makeQualified(fs.getUri, fs.getWorkingDirectory)))
- .filter(_.isDir)
- .map(SparkHadoopUtil.get.listLeafDirStatuses(fs, _))
- .getOrElse(Seq.empty[FileStatus])
- }.map(_.getPath)
+ val leafDirs = fileStatusCache.leafDirs.keys.toSeq
if (leafDirs.nonEmpty) {
PartitioningUtils.parsePartitions(leafDirs, PartitioningUtils.DEFAULT_PARTITION_NAME)
@@ -444,6 +481,27 @@ abstract class HadoopFsRelation private[sql](maybePartitionSpec: Option[Partitio
})
}
+ private[sources] final def buildScan(
+ requiredColumns: Array[String],
+ filters: Array[Filter],
+ inputPaths: Array[String]): RDD[Row] = {
+ val inputStatuses = inputPaths.flatMap { input =>
+ val path = new Path(input)
+
+ // First assumes `input` is a directory path, and tries to get all files contained in it.
+ fileStatusCache.leafDirToChildrenFiles.getOrElse(
+ path,
+ // Otherwise, `input` might be a file path
+ fileStatusCache.leafFiles.get(path).toArray
+ ).filter { status =>
+ val name = status.getPath.getName
+ !name.startsWith("_") && !name.startsWith(".")
+ }
+ }
+
+ buildScan(requiredColumns, filters, inputStatuses)
+ }
+
/**
* Specifies schema of actual data files. For partitioned relations, if one or more partitioned
* columns are contained in the data files, they should also appear in `dataSchema`.
@@ -457,13 +515,13 @@ abstract class HadoopFsRelation private[sql](maybePartitionSpec: Option[Partitio
* this relation. For partitioned relations, this method is called for each selected partition,
* and builds an `RDD[Row]` containing all rows within that single partition.
*
- * @param inputPaths For a non-partitioned relation, it contains paths of all data files in the
+ * @param inputFiles For a non-partitioned relation, it contains paths of all data files in the
* relation. For a partitioned relation, it contains paths of all data files in a single
* selected partition.
*
* @since 1.4.0
*/
- def buildScan(inputPaths: Array[String]): RDD[Row] = {
+ def buildScan(inputFiles: Array[FileStatus]): RDD[Row] = {
throw new UnsupportedOperationException(
"At least one buildScan() method should be overridden to read the relation.")
}
@@ -474,13 +532,13 @@ abstract class HadoopFsRelation private[sql](maybePartitionSpec: Option[Partitio
* and builds an `RDD[Row]` containing all rows within that single partition.
*
* @param requiredColumns Required columns.
- * @param inputPaths For a non-partitioned relation, it contains paths of all data files in the
+ * @param inputFiles For a non-partitioned relation, it contains paths of all data files in the
* relation. For a partitioned relation, it contains paths of all data files in a single
* selected partition.
*
* @since 1.4.0
*/
- def buildScan(requiredColumns: Array[String], inputPaths: Array[String]): RDD[Row] = {
+ def buildScan(requiredColumns: Array[String], inputFiles: Array[FileStatus]): RDD[Row] = {
// Yeah, to workaround serialization...
val dataSchema = this.dataSchema
val codegenEnabled = this.codegenEnabled
@@ -490,7 +548,7 @@ abstract class HadoopFsRelation private[sql](maybePartitionSpec: Option[Partitio
BoundReference(dataSchema.fieldIndex(col), field.dataType, field.nullable)
}.toSeq
- buildScan(inputPaths).mapPartitions { rows =>
+ buildScan(inputFiles).mapPartitions { rows =>
val buildProjection = if (codegenEnabled) {
GenerateMutableProjection.generate(requiredOutput, dataSchema.toAttributes)
} else {
@@ -512,7 +570,7 @@ abstract class HadoopFsRelation private[sql](maybePartitionSpec: Option[Partitio
* of all `filters`. The pushed down filters are currently purely an optimization as they
* will all be evaluated again. This means it is safe to use them with methods that produce
* false positives such as filtering partitions based on a bloom filter.
- * @param inputPaths For a non-partitioned relation, it contains paths of all data files in the
+ * @param inputFiles For a non-partitioned relation, it contains paths of all data files in the
* relation. For a partitioned relation, it contains paths of all data files in a single
* selected partition.
*
@@ -521,8 +579,8 @@ abstract class HadoopFsRelation private[sql](maybePartitionSpec: Option[Partitio
def buildScan(
requiredColumns: Array[String],
filters: Array[Filter],
- inputPaths: Array[String]): RDD[Row] = {
- buildScan(requiredColumns, inputPaths)
+ inputFiles: Array[FileStatus]): RDD[Row] = {
+ buildScan(requiredColumns, inputFiles)
}
/**
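
A minimal, standalone sketch of the recursive listing performed by the new `FileStatusCache.refresh()` above, showing how a directory tree resolves into leaf files and leaf directories. The use of the local file system and the root path `/tmp/t` are assumptions for illustration.

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileStatus, FileSystem, Path}

object LeafListingSketch {
  // Collects all leaf files plus the deepest (leaf) directories, mirroring the
  // listLeafFilesAndDirs helper added to HadoopFsRelation above.
  def listLeafFilesAndDirs(fs: FileSystem, status: FileStatus): Set[FileStatus] = {
    val (dirs, files) = fs.listStatus(status.getPath).partition(_.isDir)
    val leafDirs = if (dirs.isEmpty) Set(status) else Set.empty[FileStatus]
    files.toSet ++ leafDirs ++ dirs.flatMap(dir => listLeafFilesAndDirs(fs, dir))
  }

  def main(args: Array[String]): Unit = {
    val fs = FileSystem.getLocal(new Configuration())
    // Hypothetical root, e.g. /tmp/t/year=2015/part-00000; fails if the path does not exist.
    val root = fs.getFileStatus(new Path("/tmp/t"))
    val (leafDirs, leafFiles) = listLeafFilesAndDirs(fs, root).partition(_.isDir)
    println(s"leaf dirs:  ${leafDirs.map(_.getPath).mkString(", ")}")
    println(s"leaf files: ${leafFiles.map(_.getPath).mkString(", ")}")
  }
}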
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextRelation.scala b/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextRelation.scala
index 29b21586f9..09eed6646c 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextRelation.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextRelation.scala
@@ -21,7 +21,7 @@ import java.text.NumberFormat
import java.util.UUID
import com.google.common.base.Objects
-import org.apache.hadoop.fs.Path
+import org.apache.hadoop.fs.{FileStatus, Path}
import org.apache.hadoop.io.{NullWritable, Text}
import org.apache.hadoop.mapreduce.lib.output.{FileOutputFormat, TextOutputFormat}
import org.apache.hadoop.mapreduce.{Job, RecordWriter, TaskAttemptContext}
@@ -101,10 +101,10 @@ class SimpleTextRelation(
override def hashCode(): Int =
Objects.hashCode(paths, maybeDataSchema, dataSchema)
- override def buildScan(inputPaths: Array[String]): RDD[Row] = {
+ override def buildScan(inputStatuses: Array[FileStatus]): RDD[Row] = {
val fields = dataSchema.map(_.dataType)
- sparkContext.textFile(inputPaths.mkString(",")).map { record =>
+ sparkContext.textFile(inputStatuses.map(_.getPath).mkString(",")).map { record =>
Row(record.split(",").zip(fields).map { case (value, dataType) =>
Cast(Literal(value), dataType).eval()
}: _*)
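
Taken together, `DataSourceStrategy` now hands the relation bare paths (`t.paths` for unpartitioned tables, the selected partition directory otherwise), and the final `buildScan` on `HadoopFsRelation` resolves them against the cached listings before delegating to the `FileStatus`-based overloads. A minimal sketch of that resolution step, with the cache maps passed in explicitly as parameters (an assumption for illustration; in the real class they live on `fileStatusCache`):

import org.apache.hadoop.fs.{FileStatus, Path}

object InputPathResolutionSketch {
  // Mirrors the resolution in HadoopFsRelation's final buildScan: each input path is
  // first treated as a leaf directory; if it is not one, it is treated as a single
  // leaf file. Hidden files (names starting with "_" or ".") are dropped either way.
  def resolve(
      inputPaths: Array[String],
      leafDirToChildrenFiles: Map[Path, Array[FileStatus]],
      leafFiles: Map[Path, FileStatus]): Array[FileStatus] = {
    inputPaths.flatMap { input =>
      val path = new Path(input)
      leafDirToChildrenFiles
        .getOrElse(path, leafFiles.get(path).toArray)
        .filter { status =>
          val name = status.getPath.getName
          !name.startsWith("_") && !name.startsWith(".")
        }
    }
  }
}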