author     Cheng Lian <lian@databricks.com>        2016-04-19 17:32:23 -0700
committer  Yin Huai <yhuai@databricks.com>         2016-04-19 17:32:23 -0700
commit     10f273d8db999cdc2e6c73bdbe98757de5d11676 (patch)
tree       09150dc6ea97e6959b92aacdaed303c01207c611 /sql/core/src/main/scala/org
parent     3664142350afb6bf40a8bcb3508b56670603dae4 (diff)
[SPARK-14407][SQL] Hides HadoopFsRelation related data source API into execution/datasources package #12178
## What changes were proposed in this pull request?

This PR moves `HadoopFsRelation` related data source API into `execution/datasources` package.

Note that to avoid conflicts, this PR is based on #12153. Effective changes for this PR only consist of the last three commits. Will rebase after merging #12153.

## How was this patch tested?

Existing tests.

Author: Yin Huai <yhuai@databricks.com>
Author: Cheng Lian <lian@databricks.com>

Closes #12361 from liancheng/spark-14407-hide-hadoop-fs-relation.
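For client code the visible effect of this move is the import path. A minimal before/after sketch (illustrative only; the same change shows up in the hunks below):

// Before this patch: HadoopFsRelation lived in the public sources package
//   import org.apache.spark.sql.sources.HadoopFsRelation
// After this patch it is internal to the datasources package:
import org.apache.spark.sql.execution.datasources.HadoopFsRelation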
Diffstat (limited to 'sql/core/src/main/scala/org')
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala | 3
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala | 5
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala | 8
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala | 1
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVRelation.scala | 3
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/DefaultSource.scala | 6
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/fileSourceInterfaces.scala | 530
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala | 2
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/DefaultSource.scala | 2
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala | 2
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamFileCatalog.scala | 3
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala | 512
12 files changed, 547 insertions, 530 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
index 54d250867f..1deeb8a2d2 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
@@ -27,11 +27,10 @@ import org.apache.spark.annotation.Experimental
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoTable, Project}
-import org.apache.spark.sql.execution.datasources.{BucketSpec, CreateTableUsingAsSelect, DataSource}
+import org.apache.spark.sql.execution.datasources.{BucketSpec, CreateTableUsingAsSelect, DataSource, HadoopFsRelation}
import org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils
import org.apache.spark.sql.execution.streaming.{MemoryPlan, MemorySink, StreamExecution}
import org.apache.spark.sql.internal.SQLConf
-import org.apache.spark.sql.sources.HadoopFsRelation
import org.apache.spark.util.Utils
/**
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
index 1dc1b51e94..12d03a7df8 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
@@ -26,11 +26,12 @@ import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, ExprCo
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Statistics}
import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, Partitioning, UnknownPartitioning}
import org.apache.spark.sql.catalyst.util.toCommentSafeString
+import org.apache.spark.sql.execution.datasources.HadoopFsRelation
import org.apache.spark.sql.execution.datasources.parquet.{DefaultSource => ParquetSource}
import org.apache.spark.sql.execution.metric.SQLMetrics
import org.apache.spark.sql.internal.SQLConf
-import org.apache.spark.sql.sources.{BaseRelation, HadoopFsRelation}
-import org.apache.spark.sql.types.{AtomicType, DataType}
+import org.apache.spark.sql.sources.BaseRelation
+import org.apache.spark.sql.types.DataType
object RDDConversions {
def productToRowRdd[A <: Product](data: RDD[A], outputTypes: Seq[DataType]): RDD[InternalRow] = {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala
index f86911e002..90694d9af4 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala
@@ -17,7 +17,7 @@
package org.apache.spark.sql.execution.datasources
-import org.apache.spark.{Partition, TaskContext}
+import org.apache.spark.{Partition => RDDPartition, TaskContext}
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.rdd.{InputFileNameHolder, RDD}
import org.apache.spark.sql.SQLContext
@@ -45,7 +45,7 @@ case class PartitionedFile(
*
* TODO: This currently does not take locality information about the files into account.
*/
-case class FilePartition(index: Int, files: Seq[PartitionedFile]) extends Partition
+case class FilePartition(index: Int, files: Seq[PartitionedFile]) extends RDDPartition
class FileScanRDD(
@transient val sqlContext: SQLContext,
@@ -53,7 +53,7 @@ class FileScanRDD(
@transient val filePartitions: Seq[FilePartition])
extends RDD[InternalRow](sqlContext.sparkContext, Nil) {
- override def compute(split: Partition, context: TaskContext): Iterator[InternalRow] = {
+ override def compute(split: RDDPartition, context: TaskContext): Iterator[InternalRow] = {
val iterator = new Iterator[Object] with AutoCloseable {
private val inputMetrics = context.taskMetrics().inputMetrics
private val existingBytesRead = inputMetrics.bytesRead
@@ -130,5 +130,5 @@ class FileScanRDD(
iterator.asInstanceOf[Iterator[InternalRow]] // This is an erasure hack.
}
- override protected def getPartitions: Array[Partition] = filePartitions.toArray
+ override protected def getPartitions: Array[RDDPartition] = filePartitions.toArray
}
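The rename of org.apache.spark.Partition to RDDPartition in this file appears to avoid a name clash with the Partition case class that this patch moves into the same execution.datasources package. A hypothetical illustration of the ambiguity being avoided:

import org.apache.spark.{Partition => RDDPartition}            // an RDD split
import org.apache.spark.sql.execution.datasources.Partition    // partition values + their files

// Inside the datasources package an unqualified `Partition` would otherwise
// resolve to the case class rather than to the RDD split type.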
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala
index 815d1d01ef..b9527db6d0 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala
@@ -33,7 +33,6 @@ import org.apache.spark.sql.catalyst.plans.physical.HashPartitioning
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.execution.UnsafeKVExternalSorter
import org.apache.spark.sql.internal.SQLConf
-import org.apache.spark.sql.sources.{OutputWriter, OutputWriterFactory}
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
import org.apache.spark.util.{SerializableConfiguration, Utils}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVRelation.scala
index 54fb03b6d3..ed40cd0c81 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVRelation.scala
@@ -30,8 +30,7 @@ import org.apache.spark.rdd.RDD
import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.GenericMutableRow
-import org.apache.spark.sql.execution.datasources.PartitionedFile
-import org.apache.spark.sql.sources._
+import org.apache.spark.sql.execution.datasources.{OutputWriter, OutputWriterFactory, PartitionedFile}
import org.apache.spark.sql.types._
object CSVRelation extends Logging {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/DefaultSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/DefaultSource.scala
index 06a371b88b..34db10f822 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/DefaultSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/DefaultSource.scala
@@ -25,17 +25,15 @@ import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapred.TextInputFormat
import org.apache.hadoop.mapreduce._
-import org.apache.spark.broadcast.Broadcast
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions.{JoinedRow, UnsafeProjection}
+import org.apache.spark.sql.catalyst.expressions.JoinedRow
import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection
-import org.apache.spark.sql.execution.datasources.{CompressionCodecs, HadoopFileLinesReader, PartitionedFile}
+import org.apache.spark.sql.execution.datasources._
import org.apache.spark.sql.sources._
import org.apache.spark.sql.types.{StringType, StructField, StructType}
import org.apache.spark.util.SerializableConfiguration
-import org.apache.spark.util.collection.BitSet
/**
* Provides access to CSV data from pure SQL statements.
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/fileSourceInterfaces.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/fileSourceInterfaces.scala
new file mode 100644
index 0000000000..d37a939b54
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/fileSourceInterfaces.scala
@@ -0,0 +1,530 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources
+
+import scala.collection.mutable
+import scala.util.Try
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.{FileStatus, FileSystem, Path}
+import org.apache.hadoop.mapred.{FileInputFormat, JobConf}
+import org.apache.hadoop.mapreduce.{Job, TaskAttemptContext}
+
+import org.apache.spark.SparkContext
+import org.apache.spark.annotation.Experimental
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql._
+import org.apache.spark.sql.catalyst.{expressions, CatalystTypeConverters, InternalRow}
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.execution.FileRelation
+import org.apache.spark.sql.sources.{BaseRelation, Filter}
+import org.apache.spark.sql.types.{StringType, StructType}
+import org.apache.spark.util.SerializableConfiguration
+
+/**
+ * ::Experimental::
+ * A factory that produces [[OutputWriter]]s. A new [[OutputWriterFactory]] is created on driver
+ * side for each write job issued when writing to a [[HadoopFsRelation]], and then gets serialized
+ * to executor side to create actual [[OutputWriter]]s on the fly.
+ *
+ * @since 1.4.0
+ */
+@Experimental
+abstract class OutputWriterFactory extends Serializable {
+ /**
+ * When writing to a [[HadoopFsRelation]], this method gets called by each task on executor side
+ * to instantiate new [[OutputWriter]]s.
+ *
+ * @param path Path of the file to which this [[OutputWriter]] is supposed to write. Note that
+ * this may not point to the final output file. For example, `FileOutputFormat` writes to
+ * temporary directories and then merges written files back to the final destination. In
+ * this case, `path` points to a temporary output file under the temporary directory.
+ * @param dataSchema Schema of the rows to be written. Partition columns are not included in the
+ * schema if the relation being written is partitioned.
+ * @param context The Hadoop MapReduce task context.
+ * @since 1.4.0
+ */
+ private[sql] def newInstance(
+ path: String,
+ bucketId: Option[Int], // TODO: This doesn't belong here...
+ dataSchema: StructType,
+ context: TaskAttemptContext): OutputWriter
+}
+
+/**
+ * ::Experimental::
+ * [[OutputWriter]] is used together with [[HadoopFsRelation]] for persisting rows to the
+ * underlying file system. Subclasses of [[OutputWriter]] must provide a zero-argument constructor.
+ * An [[OutputWriter]] instance is created and initialized when a new output file is opened on
+ * executor side. This instance is used to persist rows to this single output file.
+ *
+ * @since 1.4.0
+ */
+@Experimental
+abstract class OutputWriter {
+ /**
+ * Persists a single row. Invoked on the executor side. When writing to dynamically partitioned
+ * tables, dynamic partition columns are not included in rows to be written.
+ *
+ * @since 1.4.0
+ */
+ def write(row: Row): Unit
+
+ /**
+ * Closes the [[OutputWriter]]. Invoked on the executor side after all rows are persisted, before
+ * the task output is committed.
+ *
+ * @since 1.4.0
+ */
+ def close(): Unit
+
+ private var converter: InternalRow => Row = _
+
+ protected[sql] def initConverter(dataSchema: StructType) = {
+ converter =
+ CatalystTypeConverters.createToScalaConverter(dataSchema).asInstanceOf[InternalRow => Row]
+ }
+
+ protected[sql] def writeInternal(row: InternalRow): Unit = {
+ write(converter(row))
+ }
+}
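A minimal sketch of a concrete OutputWriter, assuming a hypothetical SimpleTextOutputWriter that renders each Row as one comma-separated line. A real implementation would write to the path and TaskAttemptContext supplied by OutputWriterFactory.newInstance; a local temp file keeps this sketch self-contained:

import java.io.{File, PrintWriter}

import org.apache.spark.sql.Row
import org.apache.spark.sql.execution.datasources.OutputWriter

class SimpleTextOutputWriter extends OutputWriter {
  // Stand-in for the per-task output file that the factory would normally provide.
  private val out = new PrintWriter(File.createTempFile("simple-text", ".csv"))

  // Partition columns are already stripped from `row` when the relation is partitioned.
  override def write(row: Row): Unit = out.println(row.toSeq.mkString(","))

  // Called once on the executor after all rows for this file have been written.
  override def close(): Unit = out.close()
}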
+
+/**
+ * Acts as a container for all of the metadata required to read from a datasource. All discovery,
+ * resolution and merging logic for schemas and partitions has been removed.
+ *
+ * @param location A [[FileCatalog]] that can enumerate the locations of all the files that comprise
+ * this relation.
+ * @param partitionSchema The schema of the columns (if any) that are used to partition the relation
+ * @param dataSchema The schema of any remaining columns. Note that if any partition columns are
+ * present in the actual data files as well, they are preserved.
+ * @param bucketSpec Describes the bucketing (hash-partitioning of the files by some column values).
+ * @param fileFormat A file format that can be used to read and write the data in files.
+ * @param options Configuration used when reading / writing data.
+ */
+case class HadoopFsRelation(
+ sqlContext: SQLContext,
+ location: FileCatalog,
+ partitionSchema: StructType,
+ dataSchema: StructType,
+ bucketSpec: Option[BucketSpec],
+ fileFormat: FileFormat,
+ options: Map[String, String]) extends BaseRelation with FileRelation {
+
+ val schema: StructType = {
+ val dataSchemaColumnNames = dataSchema.map(_.name.toLowerCase).toSet
+ StructType(dataSchema ++ partitionSchema.filterNot { column =>
+ dataSchemaColumnNames.contains(column.name.toLowerCase)
+ })
+ }
+
+ def partitionSchemaOption: Option[StructType] =
+ if (partitionSchema.isEmpty) None else Some(partitionSchema)
+ def partitionSpec: PartitionSpec = location.partitionSpec()
+
+ def refresh(): Unit = location.refresh()
+
+ override def toString: String =
+ s"HadoopFiles"
+
+ /** Returns the list of files that will be read when scanning this relation. */
+ override def inputFiles: Array[String] =
+ location.allFiles().map(_.getPath.toUri.toString).toArray
+
+ override def sizeInBytes: Long = location.allFiles().map(_.getLen).sum
+}
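The merged `schema` above keeps the data columns first and appends only those partition columns that are not already present in the data files, compared case-insensitively. A small standalone illustration of that rule, with made-up column names:

import org.apache.spark.sql.types._

val dataSchema      = StructType(Seq(StructField("id", LongType), StructField("Year", IntegerType)))
val partitionSchema = StructType(Seq(StructField("year", IntegerType), StructField("month", IntegerType)))

val dataColumnNames = dataSchema.map(_.name.toLowerCase).toSet
val schema = StructType(dataSchema ++ partitionSchema.filterNot { column =>
  dataColumnNames.contains(column.name.toLowerCase)
})
// Resulting field names: id, Year, month -- the duplicated "year"/"Year" column
// is taken from the data files rather than from the partition directories.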
+
+/**
+ * Used to read and write data stored in files to/from the [[InternalRow]] format.
+ */
+trait FileFormat {
+ /**
+ * When possible, this method should return the schema of the given `files`. When the format
+ * does not support inference, or no valid files are given, this should return None. In these cases
+ * Spark will require that the user specify the schema manually.
+ */
+ def inferSchema(
+ sqlContext: SQLContext,
+ options: Map[String, String],
+ files: Seq[FileStatus]): Option[StructType]
+
+ /**
+ * Prepares a read job and returns a potentially updated data source option [[Map]]. This method
+ * can be useful for collecting necessary global information for scanning input data.
+ */
+ def prepareRead(
+ sqlContext: SQLContext,
+ options: Map[String, String],
+ files: Seq[FileStatus]): Map[String, String] = options
+
+ /**
+ * Prepares a write job and returns an [[OutputWriterFactory]]. Client side job preparation can
+ * be put here. For example, a user-defined output committer can be configured here
+ * by setting the output committer class via the `spark.sql.sources.outputCommitterClass` configuration.
+ */
+ def prepareWrite(
+ sqlContext: SQLContext,
+ job: Job,
+ options: Map[String, String],
+ dataSchema: StructType): OutputWriterFactory
+
+ /**
+ * Returns whether this format supports returning a columnar batch or not.
+ *
+ * TODO: we should just have different traits for the different formats.
+ */
+ def supportBatch(sqlContext: SQLContext, dataSchema: StructType): Boolean = {
+ false
+ }
+
+ /**
+ * Returns a function that can be used to read a single file in as an Iterator of InternalRow.
+ *
+ * @param dataSchema The global data schema. It can be either specified by the user, or
+ * reconciled/merged from all underlying data files. If any partition columns
+ * are contained in the files, they are preserved in this schema.
+ * @param partitionSchema The schema of the partition column row that will be present in each
+ * PartitionedFile. These columns should be appended to the rows that
+ * are produced by the iterator.
+ * @param requiredSchema The schema of the data that should be output for each row. This may be a
+ * subset of the columns that are present in the file if column pruning has
+ * occurred.
+ * @param filters A set of filters that can optionally be used to reduce the number of rows output
+ * @param options A set of string -> string configuration options.
+ * @return
+ */
+ def buildReader(
+ sqlContext: SQLContext,
+ dataSchema: StructType,
+ partitionSchema: StructType,
+ requiredSchema: StructType,
+ filters: Seq[Filter],
+ options: Map[String, String]): PartitionedFile => Iterator[InternalRow] = {
+ // TODO: Remove this default implementation when the other formats have been ported
+ // Until then we guard in [[FileSourceStrategy]] to only call this method on supported formats.
+ throw new UnsupportedOperationException(s"buildReader is not supported for $this")
+ }
+}
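A hypothetical skeleton of a FileFormat implementation (the names are illustrative, not part of this patch). Only schema inference is fleshed out; buildReader keeps the default UnsupportedOperationException until a format is ported to the new scan path, and the write path is left unimplemented in this sketch:

import org.apache.hadoop.fs.FileStatus
import org.apache.hadoop.mapreduce.Job

import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.execution.datasources.{FileFormat, OutputWriterFactory}
import org.apache.spark.sql.types.{StringType, StructField, StructType}

class SingleColumnTextFormat extends FileFormat {
  // Every file is exposed as a single string column named "value".
  override def inferSchema(
      sqlContext: SQLContext,
      options: Map[String, String],
      files: Seq[FileStatus]): Option[StructType] =
    Some(StructType(StructField("value", StringType) :: Nil))

  // Write support is out of scope for this sketch.
  override def prepareWrite(
      sqlContext: SQLContext,
      job: Job,
      options: Map[String, String],
      dataSchema: StructType): OutputWriterFactory =
    throw new UnsupportedOperationException("write path not sketched")
}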
+
+/**
+ * A collection of data files from a partitioned relation, along with the partition values in the
+ * form of an [[InternalRow]].
+ */
+case class Partition(values: InternalRow, files: Seq[FileStatus])
+
+/**
+ * An interface for objects capable of enumerating the files that comprise a relation as well
+ * as the partitioning characteristics of those files.
+ */
+trait FileCatalog {
+ def paths: Seq[Path]
+
+ def partitionSpec(): PartitionSpec
+
+ /**
+ * Returns all valid files grouped into partitions when the data is partitioned. If the data is
+ * unpartitioned, this will return a single partition with no partition values.
+ *
+ * @param filters the filters used to prune which partitions are returned. These filters must
+ * only refer to partition columns and this method will only return files
+ * where these predicates are guaranteed to evaluate to `true`. Thus, these
+ * filters will not need to be evaluated again on the returned data.
+ */
+ def listFiles(filters: Seq[Expression]): Seq[Partition]
+
+ def allFiles(): Seq[FileStatus]
+
+ def getStatus(path: Path): Array[FileStatus]
+
+ def refresh(): Unit
+}
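A deliberately small, hypothetical FileCatalog over a fixed file list with no partitioning -- roughly the shape the StreamFileCatalog touched later in this patch takes. It is kept in the datasources package itself so that FileCatalog, Partition and PartitionSpec resolve without qualification:

package org.apache.spark.sql.execution.datasources

import org.apache.hadoop.fs.{FileStatus, Path}

import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.types.StructType

class FixedFileCatalog(override val paths: Seq[Path], files: Seq[FileStatus]) extends FileCatalog {
  // No partition columns, no partition directories.
  override def partitionSpec(): PartitionSpec = PartitionSpec(StructType(Nil), Seq.empty)

  // Everything lands in a single partition with an empty partition-value row.
  override def listFiles(filters: Seq[Expression]): Seq[Partition] =
    Partition(InternalRow.empty, files) :: Nil

  override def allFiles(): Seq[FileStatus] = files

  override def getStatus(path: Path): Array[FileStatus] =
    files.filter(_.getPath.getParent == path).toArray

  // The file list is fixed, so there is nothing to refresh.
  override def refresh(): Unit = ()
}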
+
+/**
+ * A file catalog that caches metadata gathered by scanning all the files present in `paths`
+ * recursively.
+ *
+ * @param parameters a set of options to control discovery
+ * @param paths a list of paths to scan
+ * @param partitionSchema an optional partition schema that will be used to provide types for the
+ * discovered partitions
+ */
+class HDFSFileCatalog(
+ val sqlContext: SQLContext,
+ val parameters: Map[String, String],
+ val paths: Seq[Path],
+ val partitionSchema: Option[StructType])
+ extends FileCatalog with Logging {
+
+ private val hadoopConf = new Configuration(sqlContext.sparkContext.hadoopConfiguration)
+
+ var leafFiles = mutable.LinkedHashMap.empty[Path, FileStatus]
+ var leafDirToChildrenFiles = mutable.Map.empty[Path, Array[FileStatus]]
+ var cachedPartitionSpec: PartitionSpec = _
+
+ def partitionSpec(): PartitionSpec = {
+ if (cachedPartitionSpec == null) {
+ cachedPartitionSpec = inferPartitioning(partitionSchema)
+ }
+
+ cachedPartitionSpec
+ }
+
+ refresh()
+
+ override def listFiles(filters: Seq[Expression]): Seq[Partition] = {
+ if (partitionSpec().partitionColumns.isEmpty) {
+ Partition(InternalRow.empty, allFiles().filterNot(_.getPath.getName startsWith "_")) :: Nil
+ } else {
+ prunePartitions(filters, partitionSpec()).map {
+ case PartitionDirectory(values, path) =>
+ Partition(
+ values,
+ getStatus(path).filterNot(_.getPath.getName startsWith "_"))
+ }
+ }
+ }
+
+ protected def prunePartitions(
+ predicates: Seq[Expression],
+ partitionSpec: PartitionSpec): Seq[PartitionDirectory] = {
+ val PartitionSpec(partitionColumns, partitions) = partitionSpec
+ val partitionColumnNames = partitionColumns.map(_.name).toSet
+ val partitionPruningPredicates = predicates.filter {
+ _.references.map(_.name).toSet.subsetOf(partitionColumnNames)
+ }
+
+ if (partitionPruningPredicates.nonEmpty) {
+ val predicate = partitionPruningPredicates.reduce(expressions.And)
+
+ val boundPredicate = InterpretedPredicate.create(predicate.transform {
+ case a: AttributeReference =>
+ val index = partitionColumns.indexWhere(a.name == _.name)
+ BoundReference(index, partitionColumns(index).dataType, nullable = true)
+ })
+
+ val selected = partitions.filter {
+ case PartitionDirectory(values, _) => boundPredicate(values)
+ }
+ logInfo {
+ val total = partitions.length
+ val selectedSize = selected.length
+ val percentPruned = (1 - selectedSize.toDouble / total.toDouble) * 100
+ s"Selected $selectedSize partitions out of $total, pruned $percentPruned% partitions."
+ }
+
+ selected
+ } else {
+ partitions
+ }
+ }
+
+ def allFiles(): Seq[FileStatus] = leafFiles.values.toSeq
+
+ def getStatus(path: Path): Array[FileStatus] = leafDirToChildrenFiles(path)
+
+ private def listLeafFiles(paths: Seq[Path]): mutable.LinkedHashSet[FileStatus] = {
+ if (paths.length >= sqlContext.conf.parallelPartitionDiscoveryThreshold) {
+ HadoopFsRelation.listLeafFilesInParallel(paths, hadoopConf, sqlContext.sparkContext)
+ } else {
+ val statuses = paths.flatMap { path =>
+ val fs = path.getFileSystem(hadoopConf)
+ logInfo(s"Listing $path on driver")
+ // Dummy jobconf to get to the pathFilter defined in configuration
+ val jobConf = new JobConf(hadoopConf, this.getClass())
+ val pathFilter = FileInputFormat.getInputPathFilter(jobConf)
+ if (pathFilter != null) {
+ Try(fs.listStatus(path, pathFilter)).getOrElse(Array.empty)
+ } else {
+ Try(fs.listStatus(path)).getOrElse(Array.empty)
+ }
+ }.filterNot { status =>
+ val name = status.getPath.getName
+ HadoopFsRelation.shouldFilterOut(name)
+ }
+
+ val (dirs, files) = statuses.partition(_.isDirectory)
+
+ // It uses [[LinkedHashSet]] since the order of files can affect the results. (SPARK-11500)
+ if (dirs.isEmpty) {
+ mutable.LinkedHashSet(files: _*)
+ } else {
+ mutable.LinkedHashSet(files: _*) ++ listLeafFiles(dirs.map(_.getPath))
+ }
+ }
+ }
+
+ def inferPartitioning(schema: Option[StructType]): PartitionSpec = {
+ // We use leaf dirs containing data files to discover the schema.
+ val leafDirs = leafDirToChildrenFiles.keys.toSeq
+ schema match {
+ case Some(userProvidedSchema) if userProvidedSchema.nonEmpty =>
+ val spec = PartitioningUtils.parsePartitions(
+ leafDirs,
+ PartitioningUtils.DEFAULT_PARTITION_NAME,
+ typeInference = false,
+ basePaths = basePaths)
+
+ // Without auto inference, all of the values in the `row` should be null or of StringType,
+ // so we need to cast them into the data types that the user specified.
+ def castPartitionValuesToUserSchema(row: InternalRow) = {
+ InternalRow((0 until row.numFields).map { i =>
+ Cast(
+ Literal.create(row.getUTF8String(i), StringType),
+ userProvidedSchema.fields(i).dataType).eval()
+ }: _*)
+ }
+
+ PartitionSpec(userProvidedSchema, spec.partitions.map { part =>
+ part.copy(values = castPartitionValuesToUserSchema(part.values))
+ })
+ case _ =>
+ PartitioningUtils.parsePartitions(
+ leafDirs,
+ PartitioningUtils.DEFAULT_PARTITION_NAME,
+ typeInference = sqlContext.conf.partitionColumnTypeInferenceEnabled(),
+ basePaths = basePaths)
+ }
+ }
+
+ /**
+ * Contains a set of paths that are considered as the base dirs of the input datasets.
+ * The partitioning discovery logic will make sure it will stop when it reaches any
+ * base path. By default, the paths of the dataset provided by users will be base paths.
+ * For example, if a user uses `sqlContext.read.parquet("/path/something=true/")`, the base path
+ * will be `/path/something=true/`, and the returned DataFrame will not contain a column of
+ * `something`. If users want to override the basePath, they can set `basePath` in the options
+ * to pass the new base path to the data source.
+ * For the above example, if the user-provided base path is `/path/`, the returned
+ * DataFrame will have the column of `something`.
+ */
+ private def basePaths: Set[Path] = {
+ val userDefinedBasePath = parameters.get("basePath").map(basePath => Set(new Path(basePath)))
+ userDefinedBasePath.getOrElse {
+ // If the user does not provide basePath, we will just use paths.
+ paths.toSet
+ }.map { hdfsPath =>
+ // Make the path qualified (consistent with listLeafFiles and listLeafFilesInParallel).
+ val fs = hdfsPath.getFileSystem(hadoopConf)
+ hdfsPath.makeQualified(fs.getUri, fs.getWorkingDirectory)
+ }
+ }
+
+ def refresh(): Unit = {
+ val files = listLeafFiles(paths)
+
+ leafFiles.clear()
+ leafDirToChildrenFiles.clear()
+
+ leafFiles ++= files.map(f => f.getPath -> f)
+ leafDirToChildrenFiles ++= files.toArray.groupBy(_.getPath.getParent)
+
+ cachedPartitionSpec = null
+ }
+
+ override def equals(other: Any): Boolean = other match {
+ case hdfs: HDFSFileCatalog => paths.toSet == hdfs.paths.toSet
+ case _ => false
+ }
+
+ override def hashCode(): Int = paths.toSet.hashCode()
+}
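The basePaths behaviour documented above is driven by the reader-side "basePath" option. A short usage sketch, assuming a SQLContext named sqlContext is in scope (the paths are made up):

// Partition value is folded into the base directory, so no `something` column:
val df1 = sqlContext.read.parquet("/path/something=true/")

// With an explicit base path, discovery stops at /path/ and `something`
// surfaces as a partition column:
val df2 = sqlContext.read
  .option("basePath", "/path/")
  .parquet("/path/something=true/")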
+
+/**
+ * Helper methods for gathering metadata from HDFS.
+ */
+private[sql] object HadoopFsRelation extends Logging {
+
+ /** Checks if we should filter out this path name. */
+ def shouldFilterOut(pathName: String): Boolean = {
+ // TODO: We should try to filter out all files/dirs starting with "." or "_".
+ // The only reason that we are not doing it now is that Parquet needs to find those
+ // metadata files from leaf files returned by this method. We should refactor
+ // this logic to not mix metadata files with data files.
+ pathName == "_SUCCESS" || pathName == "_temporary" || pathName.startsWith(".")
+ }
+
+ // We don't filter files/directories whose names start with "_" (except "_temporary") here, as
+ // specific data sources may take advantage of them (e.g. Parquet _metadata and
+ // _common_metadata files). "_temporary" directories are explicitly ignored since failed
+ // tasks/jobs may leave partial/corrupted data files there. Files and directories whose names
+ // start with "." are also ignored.
+ def listLeafFiles(fs: FileSystem, status: FileStatus): Array[FileStatus] = {
+ logInfo(s"Listing ${status.getPath}")
+ val name = status.getPath.getName.toLowerCase
+ if (shouldFilterOut(name)) {
+ Array.empty
+ } else {
+ // Dummy jobconf to get to the pathFilter defined in configuration
+ val jobConf = new JobConf(fs.getConf, this.getClass())
+ val pathFilter = FileInputFormat.getInputPathFilter(jobConf)
+ val statuses =
+ if (pathFilter != null) {
+ val (dirs, files) = fs.listStatus(status.getPath, pathFilter).partition(_.isDirectory)
+ files ++ dirs.flatMap(dir => listLeafFiles(fs, dir))
+ } else {
+ val (dirs, files) = fs.listStatus(status.getPath).partition(_.isDirectory)
+ files ++ dirs.flatMap(dir => listLeafFiles(fs, dir))
+ }
+ statuses.filterNot(status => shouldFilterOut(status.getPath.getName))
+ }
+ }
+
+ // `FileStatus` is Writable but not serializable. What makes it worse, somehow it doesn't play
+ // well with `SerializableWritable`. So there seems to be no way to serialize a `FileStatus`.
+ // Here we use `FakeFileStatus` to extract key components of a `FileStatus` to serialize it from
+ // executor side and reconstruct it on driver side.
+ case class FakeFileStatus(
+ path: String,
+ length: Long,
+ isDir: Boolean,
+ blockReplication: Short,
+ blockSize: Long,
+ modificationTime: Long,
+ accessTime: Long)
+
+ def listLeafFilesInParallel(
+ paths: Seq[Path],
+ hadoopConf: Configuration,
+ sparkContext: SparkContext): mutable.LinkedHashSet[FileStatus] = {
+ logInfo(s"Listing leaf files and directories in parallel under: ${paths.mkString(", ")}")
+
+ val serializableConfiguration = new SerializableConfiguration(hadoopConf)
+ val serializedPaths = paths.map(_.toString)
+
+ val fakeStatuses = sparkContext.parallelize(serializedPaths).map(new Path(_)).flatMap { path =>
+ val fs = path.getFileSystem(serializableConfiguration.value)
+ Try(listLeafFiles(fs, fs.getFileStatus(path))).getOrElse(Array.empty)
+ }.map { status =>
+ FakeFileStatus(
+ status.getPath.toString,
+ status.getLen,
+ status.isDirectory,
+ status.getReplication,
+ status.getBlockSize,
+ status.getModificationTime,
+ status.getAccessTime)
+ }.collect()
+
+ val hadoopFakeStatuses = fakeStatuses.map { f =>
+ new FileStatus(
+ f.length, f.isDir, f.blockReplication, f.blockSize, f.modificationTime, new Path(f.path))
+ }
+ mutable.LinkedHashSet(hadoopFakeStatuses: _*)
+ }
+}
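A quick illustration of the filtering rule implemented by shouldFilterOut above (callable as written only from within the sql package, since the HadoopFsRelation object is private[sql]):

HadoopFsRelation.shouldFilterOut("_SUCCESS")     // true  -- commit marker
HadoopFsRelation.shouldFilterOut("_temporary")   // true  -- partial task output
HadoopFsRelation.shouldFilterOut(".part.crc")    // true  -- hidden checksum file
HadoopFsRelation.shouldFilterOut("_metadata")    // false -- Parquet summary files are kept
HadoopFsRelation.shouldFilterOut("part-00000")   // false -- ordinary data file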
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
index 28ac4583e9..5b8dc4a3ee 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
@@ -25,7 +25,7 @@ import org.apache.spark.sql.catalyst.plans.logical
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.internal.SQLConf
-import org.apache.spark.sql.sources.{BaseRelation, HadoopFsRelation, InsertableRelation}
+import org.apache.spark.sql.sources.{BaseRelation, InsertableRelation}
/**
* Try to replaces [[UnresolvedRelation]]s with [[ResolvedDataSource]].
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/DefaultSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/DefaultSource.scala
index 94ecb7a286..fa0df61ca5 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/DefaultSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/DefaultSource.scala
@@ -27,7 +27,7 @@ import org.apache.spark.sql.{AnalysisException, Row, SQLContext}
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.UnsafeRow
import org.apache.spark.sql.catalyst.expressions.codegen.{BufferHolder, UnsafeRowWriter}
-import org.apache.spark.sql.execution.datasources.{CompressionCodecs, HadoopFileLinesReader, PartitionedFile}
+import org.apache.spark.sql.execution.datasources._
import org.apache.spark.sql.sources._
import org.apache.spark.sql.types.{StringType, StructType}
import org.apache.spark.util.SerializableConfiguration
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala
index 6921ae584d..f3c1cc5ef5 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala
@@ -23,7 +23,7 @@ import org.apache.hadoop.fs.Path
import org.apache.spark.internal.Logging
import org.apache.spark.sql.{DataFrame, SQLContext}
-import org.apache.spark.sql.sources.FileFormat
+import org.apache.spark.sql.execution.datasources.FileFormat
object FileStreamSink {
// The name of the subdirectory that is used to store metadata about which files are valid.
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamFileCatalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamFileCatalog.scala
index b8d69b1845..b1f93a9159 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamFileCatalog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamFileCatalog.scala
@@ -23,8 +23,7 @@ import org.apache.spark.internal.Logging
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.Expression
-import org.apache.spark.sql.execution.datasources.PartitionSpec
-import org.apache.spark.sql.sources.{FileCatalog, Partition}
+import org.apache.spark.sql.execution.datasources.{FileCatalog, Partition, PartitionSpec}
import org.apache.spark.sql.types.StructType
class StreamFileCatalog(sqlContext: SQLContext, path: Path) extends FileCatalog with Logging {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
index 4b9bf8daae..26285bde31 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
@@ -17,28 +17,13 @@
package org.apache.spark.sql.sources
-import scala.collection.mutable
-import scala.util.Try
-
-import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.{FileStatus, FileSystem, Path}
-import org.apache.hadoop.mapred.{FileInputFormat, JobConf}
-import org.apache.hadoop.mapreduce.{Job, TaskAttemptContext}
-
-import org.apache.spark.SparkContext
import org.apache.spark.annotation.{DeveloperApi, Experimental}
-import org.apache.spark.broadcast.Broadcast
-import org.apache.spark.internal.Logging
import org.apache.spark.rdd.RDD
import org.apache.spark.sql._
-import org.apache.spark.sql.catalyst.{expressions, CatalystTypeConverters, InternalRow}
+import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.execution.FileRelation
-import org.apache.spark.sql.execution.datasources._
import org.apache.spark.sql.execution.streaming.{Sink, Source}
-import org.apache.spark.sql.types.{StringType, StructType}
-import org.apache.spark.util.SerializableConfiguration
-import org.apache.spark.util.collection.BitSet
+import org.apache.spark.sql.types.StructType
/**
* ::DeveloperApi::
@@ -318,496 +303,3 @@ trait InsertableRelation {
trait CatalystScan {
def buildScan(requiredColumns: Seq[Attribute], filters: Seq[Expression]): RDD[Row]
}
-
-/**
- * ::Experimental::
- * A factory that produces [[OutputWriter]]s. A new [[OutputWriterFactory]] is created on driver
- * side for each write job issued when writing to a [[HadoopFsRelation]], and then gets serialized
- * to executor side to create actual [[OutputWriter]]s on the fly.
- *
- * @since 1.4.0
- */
-@Experimental
-abstract class OutputWriterFactory extends Serializable {
- /**
- * When writing to a [[HadoopFsRelation]], this method gets called by each task on executor side
- * to instantiate new [[OutputWriter]]s.
- *
- * @param path Path of the file to which this [[OutputWriter]] is supposed to write. Note that
- * this may not point to the final output file. For example, `FileOutputFormat` writes to
- * temporary directories and then merges written files back to the final destination. In
- * this case, `path` points to a temporary output file under the temporary directory.
- * @param dataSchema Schema of the rows to be written. Partition columns are not included in the
- * schema if the relation being written is partitioned.
- * @param context The Hadoop MapReduce task context.
- * @since 1.4.0
- */
- private[sql] def newInstance(
- path: String,
- bucketId: Option[Int], // TODO: This doesn't belong here...
- dataSchema: StructType,
- context: TaskAttemptContext): OutputWriter
-}
-
-/**
- * ::Experimental::
- * [[OutputWriter]] is used together with [[HadoopFsRelation]] for persisting rows to the
- * underlying file system. Subclasses of [[OutputWriter]] must provide a zero-argument constructor.
- * An [[OutputWriter]] instance is created and initialized when a new output file is opened on
- * executor side. This instance is used to persist rows to this single output file.
- *
- * @since 1.4.0
- */
-@Experimental
-abstract class OutputWriter {
- /**
- * Persists a single row. Invoked on the executor side. When writing to dynamically partitioned
- * tables, dynamic partition columns are not included in rows to be written.
- *
- * @since 1.4.0
- */
- def write(row: Row): Unit
-
- /**
- * Closes the [[OutputWriter]]. Invoked on the executor side after all rows are persisted, before
- * the task output is committed.
- *
- * @since 1.4.0
- */
- def close(): Unit
-
- private var converter: InternalRow => Row = _
-
- protected[sql] def initConverter(dataSchema: StructType) = {
- converter =
- CatalystTypeConverters.createToScalaConverter(dataSchema).asInstanceOf[InternalRow => Row]
- }
-
- protected[sql] def writeInternal(row: InternalRow): Unit = {
- write(converter(row))
- }
-}
-
-/**
- * Acts as a container for all of the metadata required to read from a datasource. All discovery,
- * resolution and merging logic for schemas and partitions has been removed.
- *
- * @param location A [[FileCatalog]] that can enumerate the locations of all the files that comprise
- * this relation.
- * @param partitionSchema The schema of the columns (if any) that are used to partition the relation
- * @param dataSchema The schema of any remaining columns. Note that if any partition columns are
- * present in the actual data files as well, they are preserved.
- * @param bucketSpec Describes the bucketing (hash-partitioning of the files by some column values).
- * @param fileFormat A file format that can be used to read and write the data in files.
- * @param options Configuration used when reading / writing data.
- */
-case class HadoopFsRelation(
- sqlContext: SQLContext,
- location: FileCatalog,
- partitionSchema: StructType,
- dataSchema: StructType,
- bucketSpec: Option[BucketSpec],
- fileFormat: FileFormat,
- options: Map[String, String]) extends BaseRelation with FileRelation {
-
- val schema: StructType = {
- val dataSchemaColumnNames = dataSchema.map(_.name.toLowerCase).toSet
- StructType(dataSchema ++ partitionSchema.filterNot { column =>
- dataSchemaColumnNames.contains(column.name.toLowerCase)
- })
- }
-
- def partitionSchemaOption: Option[StructType] =
- if (partitionSchema.isEmpty) None else Some(partitionSchema)
- def partitionSpec: PartitionSpec = location.partitionSpec()
-
- def refresh(): Unit = location.refresh()
-
- override def toString: String =
- s"HadoopFiles"
-
- /** Returns the list of files that will be read when scanning this relation. */
- override def inputFiles: Array[String] =
- location.allFiles().map(_.getPath.toUri.toString).toArray
-
- override def sizeInBytes: Long = location.allFiles().map(_.getLen).sum
-}
-
-/**
- * Used to read and write data stored in files to/from the [[InternalRow]] format.
- */
-trait FileFormat {
- /**
- * When possible, this method should return the schema of the given `files`. When the format
- * does not support inference, or no valid files are given, this should return None. In these cases
- * Spark will require that the user specify the schema manually.
- */
- def inferSchema(
- sqlContext: SQLContext,
- options: Map[String, String],
- files: Seq[FileStatus]): Option[StructType]
-
- /**
- * Prepares a read job and returns a potentially updated data source option [[Map]]. This method
- * can be useful for collecting necessary global information for scanning input data.
- */
- def prepareRead(
- sqlContext: SQLContext,
- options: Map[String, String],
- files: Seq[FileStatus]): Map[String, String] = options
-
- /**
- * Prepares a write job and returns an [[OutputWriterFactory]]. Client side job preparation can
- * be put here. For example, a user-defined output committer can be configured here
- * by setting the output committer class via the `spark.sql.sources.outputCommitterClass` configuration.
- */
- def prepareWrite(
- sqlContext: SQLContext,
- job: Job,
- options: Map[String, String],
- dataSchema: StructType): OutputWriterFactory
-
- /**
- * Returns whether this format supports returning a columnar batch or not.
- *
- * TODO: we should just have different traits for the different formats.
- */
- def supportBatch(sqlContext: SQLContext, dataSchema: StructType): Boolean = {
- false
- }
-
- /**
- * Returns a function that can be used to read a single file in as an Iterator of InternalRow.
- *
- * @param dataSchema The global data schema. It can be either specified by the user, or
- * reconciled/merged from all underlying data files. If any partition columns
- * are contained in the files, they are preserved in this schema.
- * @param partitionSchema The schema of the partition column row that will be present in each
- * PartitionedFile. These columns should be appended to the rows that
- * are produced by the iterator.
- * @param requiredSchema The schema of the data that should be output for each row. This may be a
- * subset of the columns that are present in the file if column pruning has
- * occurred.
- * @param filters A set of filters that can optionally be used to reduce the number of rows output
- * @param options A set of string -> string configuration options.
- * @return
- */
- def buildReader(
- sqlContext: SQLContext,
- dataSchema: StructType,
- partitionSchema: StructType,
- requiredSchema: StructType,
- filters: Seq[Filter],
- options: Map[String, String]): PartitionedFile => Iterator[InternalRow] = {
- // TODO: Remove this default implementation when the other formats have been ported
- // Until then we guard in [[FileSourceStrategy]] to only call this method on supported formats.
- throw new UnsupportedOperationException(s"buildReader is not supported for $this")
- }
-}
-
-/**
- * A collection of data files from a partitioned relation, along with the partition values in the
- * form of an [[InternalRow]].
- */
-case class Partition(values: InternalRow, files: Seq[FileStatus])
-
-/**
- * An interface for objects capable of enumerating the files that comprise a relation as well
- * as the partitioning characteristics of those files.
- */
-trait FileCatalog {
- def paths: Seq[Path]
-
- def partitionSpec(): PartitionSpec
-
- /**
- * Returns all valid files grouped into partitions when the data is partitioned. If the data is
- * unpartitioned, this will return a single partition with no partition values.
- *
- * @param filters the filters used to prune which partitions are returned. These filters must
- * only refer to partition columns and this method will only return files
- * where these predicates are guaranteed to evaluate to `true`. Thus, these
- * filters will not need to be evaluated again on the returned data.
- */
- def listFiles(filters: Seq[Expression]): Seq[Partition]
-
- def allFiles(): Seq[FileStatus]
-
- def getStatus(path: Path): Array[FileStatus]
-
- def refresh(): Unit
-}
-
-/**
- * A file catalog that caches metadata gathered by scanning all the files present in `paths`
- * recursively.
- *
- * @param parameters a set of options to control discovery
- * @param paths a list of paths to scan
- * @param partitionSchema an optional partition schema that will be used to provide types for the
- * discovered partitions
- */
-class HDFSFileCatalog(
- val sqlContext: SQLContext,
- val parameters: Map[String, String],
- val paths: Seq[Path],
- val partitionSchema: Option[StructType])
- extends FileCatalog with Logging {
-
- private val hadoopConf = new Configuration(sqlContext.sparkContext.hadoopConfiguration)
-
- var leafFiles = mutable.LinkedHashMap.empty[Path, FileStatus]
- var leafDirToChildrenFiles = mutable.Map.empty[Path, Array[FileStatus]]
- var cachedPartitionSpec: PartitionSpec = _
-
- def partitionSpec(): PartitionSpec = {
- if (cachedPartitionSpec == null) {
- cachedPartitionSpec = inferPartitioning(partitionSchema)
- }
-
- cachedPartitionSpec
- }
-
- refresh()
-
- override def listFiles(filters: Seq[Expression]): Seq[Partition] = {
- if (partitionSpec().partitionColumns.isEmpty) {
- Partition(InternalRow.empty, allFiles().filterNot(_.getPath.getName startsWith "_")) :: Nil
- } else {
- prunePartitions(filters, partitionSpec()).map {
- case PartitionDirectory(values, path) =>
- Partition(
- values,
- getStatus(path).filterNot(_.getPath.getName startsWith "_"))
- }
- }
- }
-
- protected def prunePartitions(
- predicates: Seq[Expression],
- partitionSpec: PartitionSpec): Seq[PartitionDirectory] = {
- val PartitionSpec(partitionColumns, partitions) = partitionSpec
- val partitionColumnNames = partitionColumns.map(_.name).toSet
- val partitionPruningPredicates = predicates.filter {
- _.references.map(_.name).toSet.subsetOf(partitionColumnNames)
- }
-
- if (partitionPruningPredicates.nonEmpty) {
- val predicate = partitionPruningPredicates.reduce(expressions.And)
-
- val boundPredicate = InterpretedPredicate.create(predicate.transform {
- case a: AttributeReference =>
- val index = partitionColumns.indexWhere(a.name == _.name)
- BoundReference(index, partitionColumns(index).dataType, nullable = true)
- })
-
- val selected = partitions.filter {
- case PartitionDirectory(values, _) => boundPredicate(values)
- }
- logInfo {
- val total = partitions.length
- val selectedSize = selected.length
- val percentPruned = (1 - selectedSize.toDouble / total.toDouble) * 100
- s"Selected $selectedSize partitions out of $total, pruned $percentPruned% partitions."
- }
-
- selected
- } else {
- partitions
- }
- }
-
- def allFiles(): Seq[FileStatus] = leafFiles.values.toSeq
-
- def getStatus(path: Path): Array[FileStatus] = leafDirToChildrenFiles(path)
-
- private def listLeafFiles(paths: Seq[Path]): mutable.LinkedHashSet[FileStatus] = {
- if (paths.length >= sqlContext.conf.parallelPartitionDiscoveryThreshold) {
- HadoopFsRelation.listLeafFilesInParallel(paths, hadoopConf, sqlContext.sparkContext)
- } else {
- val statuses = paths.flatMap { path =>
- val fs = path.getFileSystem(hadoopConf)
- logInfo(s"Listing $path on driver")
- // Dummy jobconf to get to the pathFilter defined in configuration
- val jobConf = new JobConf(hadoopConf, this.getClass())
- val pathFilter = FileInputFormat.getInputPathFilter(jobConf)
- if (pathFilter != null) {
- Try(fs.listStatus(path, pathFilter)).getOrElse(Array.empty)
- } else {
- Try(fs.listStatus(path)).getOrElse(Array.empty)
- }
- }.filterNot { status =>
- val name = status.getPath.getName
- HadoopFsRelation.shouldFilterOut(name)
- }
-
- val (dirs, files) = statuses.partition(_.isDirectory)
-
- // It uses [[LinkedHashSet]] since the order of files can affect the results. (SPARK-11500)
- if (dirs.isEmpty) {
- mutable.LinkedHashSet(files: _*)
- } else {
- mutable.LinkedHashSet(files: _*) ++ listLeafFiles(dirs.map(_.getPath))
- }
- }
- }
-
- def inferPartitioning(schema: Option[StructType]): PartitionSpec = {
- // We use leaf dirs containing data files to discover the schema.
- val leafDirs = leafDirToChildrenFiles.keys.toSeq
- schema match {
- case Some(userProvidedSchema) if userProvidedSchema.nonEmpty =>
- val spec = PartitioningUtils.parsePartitions(
- leafDirs,
- PartitioningUtils.DEFAULT_PARTITION_NAME,
- typeInference = false,
- basePaths = basePaths)
-
- // Without auto inference, all of the values in the `row` should be null or of StringType,
- // so we need to cast them into the data types that the user specified.
- def castPartitionValuesToUserSchema(row: InternalRow) = {
- InternalRow((0 until row.numFields).map { i =>
- Cast(
- Literal.create(row.getUTF8String(i), StringType),
- userProvidedSchema.fields(i).dataType).eval()
- }: _*)
- }
-
- PartitionSpec(userProvidedSchema, spec.partitions.map { part =>
- part.copy(values = castPartitionValuesToUserSchema(part.values))
- })
- case _ =>
- PartitioningUtils.parsePartitions(
- leafDirs,
- PartitioningUtils.DEFAULT_PARTITION_NAME,
- typeInference = sqlContext.conf.partitionColumnTypeInferenceEnabled(),
- basePaths = basePaths)
- }
- }
-
- /**
- * Contains a set of paths that are considered as the base dirs of the input datasets.
- * The partitioning discovery logic will make sure it will stop when it reaches any
- * base path. By default, the paths of the dataset provided by users will be base paths.
- * For example, if a user uses `sqlContext.read.parquet("/path/something=true/")`, the base path
- * will be `/path/something=true/`, and the returned DataFrame will not contain a column of
- * `something`. If users want to override the basePath, they can set `basePath` in the options
- * to pass the new base path to the data source.
- * For the above example, if the user-provided base path is `/path/`, the returned
- * DataFrame will have the column of `something`.
- */
- private def basePaths: Set[Path] = {
- val userDefinedBasePath = parameters.get("basePath").map(basePath => Set(new Path(basePath)))
- userDefinedBasePath.getOrElse {
- // If the user does not provide basePath, we will just use paths.
- paths.toSet
- }.map { hdfsPath =>
- // Make the path qualified (consistent with listLeafFiles and listLeafFilesInParallel).
- val fs = hdfsPath.getFileSystem(hadoopConf)
- hdfsPath.makeQualified(fs.getUri, fs.getWorkingDirectory)
- }
- }
-
- def refresh(): Unit = {
- val files = listLeafFiles(paths)
-
- leafFiles.clear()
- leafDirToChildrenFiles.clear()
-
- leafFiles ++= files.map(f => f.getPath -> f)
- leafDirToChildrenFiles ++= files.toArray.groupBy(_.getPath.getParent)
-
- cachedPartitionSpec = null
- }
-
- override def equals(other: Any): Boolean = other match {
- case hdfs: HDFSFileCatalog => paths.toSet == hdfs.paths.toSet
- case _ => false
- }
-
- override def hashCode(): Int = paths.toSet.hashCode()
-}
-
-/**
- * Helper methods for gathering metadata from HDFS.
- */
-private[sql] object HadoopFsRelation extends Logging {
-
- /** Checks if we should filter out this path name. */
- def shouldFilterOut(pathName: String): Boolean = {
- // TODO: We should try to filter out all files/dirs starting with "." or "_".
- // The only reason that we are not doing it now is that Parquet needs to find those
- // metadata files from leaf files returned by this method. We should refactor
- // this logic to not mix metadata files with data files.
- pathName == "_SUCCESS" || pathName == "_temporary" || pathName.startsWith(".")
- }
-
- // We don't filter files/directories whose names start with "_" (except "_temporary") here, as
- // specific data sources may take advantage of them (e.g. Parquet _metadata and
- // _common_metadata files). "_temporary" directories are explicitly ignored since failed
- // tasks/jobs may leave partial/corrupted data files there. Files and directories whose names
- // start with "." are also ignored.
- def listLeafFiles(fs: FileSystem, status: FileStatus): Array[FileStatus] = {
- logInfo(s"Listing ${status.getPath}")
- val name = status.getPath.getName.toLowerCase
- if (shouldFilterOut(name)) {
- Array.empty
- } else {
- // Dummy jobconf to get to the pathFilter defined in configuration
- val jobConf = new JobConf(fs.getConf, this.getClass())
- val pathFilter = FileInputFormat.getInputPathFilter(jobConf)
- val statuses =
- if (pathFilter != null) {
- val (dirs, files) = fs.listStatus(status.getPath, pathFilter).partition(_.isDirectory)
- files ++ dirs.flatMap(dir => listLeafFiles(fs, dir))
- } else {
- val (dirs, files) = fs.listStatus(status.getPath).partition(_.isDirectory)
- files ++ dirs.flatMap(dir => listLeafFiles(fs, dir))
- }
- statuses.filterNot(status => shouldFilterOut(status.getPath.getName))
- }
- }
-
- // `FileStatus` is Writable but not serializable. What makes it worse, somehow it doesn't play
- // well with `SerializableWritable`. So there seems to be no way to serialize a `FileStatus`.
- // Here we use `FakeFileStatus` to extract key components of a `FileStatus` to serialize it from
- // executor side and reconstruct it on driver side.
- case class FakeFileStatus(
- path: String,
- length: Long,
- isDir: Boolean,
- blockReplication: Short,
- blockSize: Long,
- modificationTime: Long,
- accessTime: Long)
-
- def listLeafFilesInParallel(
- paths: Seq[Path],
- hadoopConf: Configuration,
- sparkContext: SparkContext): mutable.LinkedHashSet[FileStatus] = {
- logInfo(s"Listing leaf files and directories in parallel under: ${paths.mkString(", ")}")
-
- val serializableConfiguration = new SerializableConfiguration(hadoopConf)
- val serializedPaths = paths.map(_.toString)
-
- val fakeStatuses = sparkContext.parallelize(serializedPaths).map(new Path(_)).flatMap { path =>
- val fs = path.getFileSystem(serializableConfiguration.value)
- Try(listLeafFiles(fs, fs.getFileStatus(path))).getOrElse(Array.empty)
- }.map { status =>
- FakeFileStatus(
- status.getPath.toString,
- status.getLen,
- status.isDirectory,
- status.getReplication,
- status.getBlockSize,
- status.getModificationTime,
- status.getAccessTime)
- }.collect()
-
- val hadoopFakeStatuses = fakeStatuses.map { f =>
- new FileStatus(
- f.length, f.isDir, f.blockReplication, f.blockSize, f.modificationTime, new Path(f.path))
- }
- mutable.LinkedHashSet(hadoopFakeStatuses: _*)
- }
-}