-rw-r--r--  core/src/main/scala/org/apache/spark/metrics/source/StaticSources.scala | 2
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala | 2
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/CatalogFileIndex.scala (renamed from sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/TableFileCatalog.scala) | 24
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala | 10
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileIndex.scala (renamed from sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileCatalog.scala) | 2
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/HadoopFsRelation.scala | 4
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala (renamed from sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ListingFileCatalog.scala) | 8
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala (renamed from sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileCatalog.scala) | 16
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PruneFileSourcePartitions.scala | 6
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLog.scala | 4
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala | 4
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MetadataLogFileCatalog.scala | 6
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileCatalogSuite.scala | 36
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala | 2
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala | 2
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala | 6
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala | 2
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala | 4
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala | 10
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/PartitionedTablePerfStatsSuite.scala | 2
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/PruneFileSourcePartitionsSuite.scala | 6
21 files changed, 79 insertions(+), 79 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/metrics/source/StaticSources.scala b/core/src/main/scala/org/apache/spark/metrics/source/StaticSources.scala
index b54885b7ff..3f7cfd9d2c 100644
--- a/core/src/main/scala/org/apache/spark/metrics/source/StaticSources.scala
+++ b/core/src/main/scala/org/apache/spark/metrics/source/StaticSources.scala
@@ -76,7 +76,7 @@ object HiveCatalogMetrics extends Source {
val METRIC_PARTITIONS_FETCHED = metricRegistry.counter(MetricRegistry.name("partitionsFetched"))
/**
- * Tracks the total number of files discovered off of the filesystem by ListingFileCatalog.
+ * Tracks the total number of files discovered off of the filesystem by InMemoryFileIndex.
*/
val METRIC_FILES_DISCOVERED = metricRegistry.counter(MetricRegistry.name("filesDiscovered"))
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala
index fb72c679e3..526623a36d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala
@@ -177,7 +177,7 @@ class CacheManager extends Logging {
/**
* Traverses a given `plan` and searches for the occurrences of `qualifiedPath` in the
- * [[org.apache.spark.sql.execution.datasources.FileCatalog]] of any [[HadoopFsRelation]] nodes
+ * [[org.apache.spark.sql.execution.datasources.FileIndex]] of any [[HadoopFsRelation]] nodes
* in the plan. If found, we refresh the metadata and return true. Otherwise, this method returns
* false.
*/
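This lookup-and-refresh traversal is what backs path-based cache refresh from user code. A minimal sketch of driving it through the public Catalog API (the path is hypothetical):

    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder().master("local[*]").getOrCreate()
    // Per the comment above: walk the cached plans, look for this qualified path
    // in each HadoopFsRelation's FileIndex, and refresh any entry that holds it.
    spark.catalog.refreshByPath("/data/warehouse/events")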
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/TableFileCatalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/CatalogFileIndex.scala
index b459df5734..092aabc89a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/TableFileCatalog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/CatalogFileIndex.scala
@@ -26,23 +26,23 @@ import org.apache.spark.sql.types.StructType
/**
- * A [[FileCatalog]] for a metastore catalog table.
+ * A [[FileIndex]] for a metastore catalog table.
*
* @param sparkSession a [[SparkSession]]
* @param table the metadata of the table
* @param sizeInBytes the table's data size in bytes
*/
-class TableFileCatalog(
+class CatalogFileIndex(
sparkSession: SparkSession,
val table: CatalogTable,
- override val sizeInBytes: Long) extends FileCatalog {
+ override val sizeInBytes: Long) extends FileIndex {
protected val hadoopConf = sparkSession.sessionState.newHadoopConf
private val fileStatusCache = FileStatusCache.newCache(sparkSession)
assert(table.identifier.database.isDefined,
- "The table identifier must be qualified in TableFileCatalog")
+ "The table identifier must be qualified in CatalogFileIndex")
private val baseLocation = table.storage.locationUri
@@ -57,12 +57,12 @@ class TableFileCatalog(
override def refresh(): Unit = fileStatusCache.invalidateAll()
/**
- * Returns a [[ListingFileCatalog]] for this table restricted to the subset of partitions
+ * Returns a [[InMemoryFileIndex]] for this table restricted to the subset of partitions
* specified by the given partition-pruning filters.
*
* @param filters partition-pruning filters
*/
- def filterPartitions(filters: Seq[Expression]): ListingFileCatalog = {
+ def filterPartitions(filters: Seq[Expression]): InMemoryFileIndex = {
if (table.partitionColumnNames.nonEmpty) {
val selectedPartitions = sparkSession.sessionState.catalog.listPartitionsByFilter(
table.identifier, filters)
@@ -70,20 +70,20 @@ class TableFileCatalog(
PartitionPath(p.toRow(partitionSchema), p.storage.locationUri.get)
}
val partitionSpec = PartitionSpec(partitionSchema, partitions)
- new PrunedTableFileCatalog(
+ new PrunedInMemoryFileIndex(
sparkSession, new Path(baseLocation.get), fileStatusCache, partitionSpec)
} else {
- new ListingFileCatalog(sparkSession, rootPaths, table.storage.properties, None)
+ new InMemoryFileIndex(sparkSession, rootPaths, table.storage.properties, None)
}
}
override def inputFiles: Array[String] = filterPartitions(Nil).inputFiles
- // `TableFileCatalog` may be a member of `HadoopFsRelation`, `HadoopFsRelation` may be a member
+ // `CatalogFileIndex` may be a member of `HadoopFsRelation`, `HadoopFsRelation` may be a member
// of `LogicalRelation`, and `LogicalRelation` may be used as the cache key. So we need to
// implement `equals` and `hashCode` here, to make it work with cache lookup.
override def equals(o: Any): Boolean = o match {
- case other: TableFileCatalog => this.table.identifier == other.table.identifier
+ case other: CatalogFileIndex => this.table.identifier == other.table.identifier
case _ => false
}
@@ -97,12 +97,12 @@ class TableFileCatalog(
* @param tableBasePath The default base path of the Hive metastore table
* @param partitionSpec The partition specifications from Hive metastore
*/
-private class PrunedTableFileCatalog(
+private class PrunedInMemoryFileIndex(
sparkSession: SparkSession,
tableBasePath: Path,
fileStatusCache: FileStatusCache,
override val partitionSpec: PartitionSpec)
- extends ListingFileCatalog(
+ extends InMemoryFileIndex(
sparkSession,
partitionSpec.partitions.map(_.path),
Map.empty,
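A usage sketch for the pruning path above, modeled on CachedTableSuite and PruneFileSourcePartitionsSuite later in this diff (the table name and partition predicate are made up; the catalyst DSL builds the filter expression):

    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.catalyst.dsl.expressions._
    import org.apache.spark.sql.execution.datasources.CatalogFileIndex

    val spark = SparkSession.builder().enableHiveSupport().getOrCreate()
    val tableMeta = spark.sharedState.externalCatalog.getTable("default", "test")
    val catalogFileIndex = new CatalogFileIndex(spark, tableMeta, 0L)

    // filterPartitions asks the metastore for partitions matching p = 1 and
    // returns an InMemoryFileIndex rooted only at those partitions' directories.
    val pruned = catalogFileIndex.filterPartitions(Seq('p.int === 1))
    println(pruned.inputFiles.length)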
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
index 5b8f05a396..996109865f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
@@ -202,7 +202,7 @@ case class DataSource(
val qualified = hdfsPath.makeQualified(fs.getUri, fs.getWorkingDirectory)
SparkHadoopUtil.get.globPathIfNecessary(qualified)
}.toArray
- val fileCatalog = new ListingFileCatalog(sparkSession, globbedPaths, options, None)
+ val fileCatalog = new InMemoryFileIndex(sparkSession, globbedPaths, options, None)
val partitionSchema = fileCatalog.partitionSpec().partitionColumns
val inferred = format.inferSchema(
sparkSession,
@@ -364,7 +364,7 @@ case class DataSource(
case (format: FileFormat, _)
if hasMetadata(caseInsensitiveOptions.get("path").toSeq ++ paths) =>
val basePath = new Path((caseInsensitiveOptions.get("path").toSeq ++ paths).head)
- val fileCatalog = new MetadataLogFileCatalog(sparkSession, basePath)
+ val fileCatalog = new MetadataLogFileIndex(sparkSession, basePath)
val dataSchema = userSpecifiedSchema.orElse {
format.inferSchema(
sparkSession,
@@ -417,12 +417,12 @@ case class DataSource(
val fileCatalog = if (sparkSession.sqlContext.conf.manageFilesourcePartitions &&
catalogTable.isDefined && catalogTable.get.partitionProviderIsHive) {
- new TableFileCatalog(
+ new CatalogFileIndex(
sparkSession,
catalogTable.get,
catalogTable.get.stats.map(_.sizeInBytes.toLong).getOrElse(0L))
} else {
- new ListingFileCatalog(
+ new InMemoryFileIndex(
sparkSession, globbedPaths, options, partitionSchema)
}
@@ -433,7 +433,7 @@ case class DataSource(
format.inferSchema(
sparkSession,
caseInsensitiveOptions,
- fileCatalog.asInstanceOf[ListingFileCatalog].allFiles())
+ fileCatalog.asInstanceOf[InMemoryFileIndex].allFiles())
}.getOrElse {
throw new AnalysisException(
s"Unable to infer schema for $format at ${allPaths.take(2).mkString(",")}. " +
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileCatalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileIndex.scala
index dba64624c3..277223d52e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileCatalog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileIndex.scala
@@ -33,7 +33,7 @@ case class PartitionDirectory(values: InternalRow, files: Seq[FileStatus])
* An interface for objects capable of enumerating the root paths of a relation as well as the
* partitions of a relation subject to some pruning expressions.
*/
-trait FileCatalog {
+trait FileIndex {
/**
* Returns the list of root input paths from which the catalog will get files. There may be a
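From the consumer side, the interface is usually reached through a HadoopFsRelation's location field. A sketch modeled on the ParquetPartitionDiscoverySuite match later in this diff (df stands for any DataFrame backed by a file source):

    import org.apache.spark.sql.DataFrame
    import org.apache.spark.sql.execution.datasources.{FileIndex, HadoopFsRelation, LogicalRelation}

    def describeFileIndex(df: DataFrame): Unit = {
      df.queryExecution.analyzed.collectFirst {
        case LogicalRelation(HadoopFsRelation(location: FileIndex, _, _, _, _, _), _, _) =>
          // rootPaths and inputFiles are the enumeration surface described above.
          println(location.rootPaths.mkString(", "))
          println(s"${location.inputFiles.length} files")
      }
    }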
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/HadoopFsRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/HadoopFsRelation.scala
index afad889808..014abd454f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/HadoopFsRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/HadoopFsRelation.scala
@@ -28,7 +28,7 @@ import org.apache.spark.sql.types.StructType
* Acts as a container for all of the metadata required to read from a datasource. All discovery,
* resolution and merging logic for schemas and partitions has been removed.
*
- * @param location A [[FileCatalog]] that can enumerate the locations of all the files that
+ * @param location A [[FileIndex]] that can enumerate the locations of all the files that
* comprise this relation.
* @param partitionSchema The schema of the columns (if any) that are used to partition the relation
* @param dataSchema The schema of any remaining columns. Note that if any partition columns are
@@ -38,7 +38,7 @@ import org.apache.spark.sql.types.StructType
* @param options Configuration used when reading / writing data.
*/
case class HadoopFsRelation(
- location: FileCatalog,
+ location: FileIndex,
partitionSchema: StructType,
dataSchema: StructType,
bucketSpec: Option[BucketSpec],
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ListingFileCatalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala
index d9d588388a..7531f0ae02 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ListingFileCatalog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala
@@ -26,7 +26,7 @@ import org.apache.spark.sql.types.StructType
/**
- * A [[FileCatalog]] that generates the list of files to process by recursively listing all the
+ * A [[FileIndex]] that generates the list of files to process by recursively listing all the
* files present in `paths`.
*
* @param rootPaths the list of root table paths to scan
@@ -34,13 +34,13 @@ import org.apache.spark.sql.types.StructType
* @param partitionSchema an optional partition schema that will be use to provide types for the
* discovered partitions
*/
-class ListingFileCatalog(
+class InMemoryFileIndex(
sparkSession: SparkSession,
override val rootPaths: Seq[Path],
parameters: Map[String, String],
partitionSchema: Option[StructType],
fileStatusCache: FileStatusCache = NoopCache)
- extends PartitioningAwareFileCatalog(
+ extends PartitioningAwareFileIndex(
sparkSession, parameters, partitionSchema, fileStatusCache) {
@volatile private var cachedLeafFiles: mutable.LinkedHashMap[Path, FileStatus] = _
@@ -79,7 +79,7 @@ class ListingFileCatalog(
}
override def equals(other: Any): Boolean = other match {
- case hdfs: ListingFileCatalog => rootPaths.toSet == hdfs.rootPaths.toSet
+ case hdfs: InMemoryFileIndex => rootPaths.toSet == hdfs.rootPaths.toSet
case _ => false
}
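A standalone construction sketch (the directory is hypothetical; the constructor shape is the one shown in the hunk above):

    import org.apache.hadoop.fs.Path
    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.execution.datasources.InMemoryFileIndex

    val spark = SparkSession.builder().master("local[*]").getOrCreate()
    // Lists everything under the root path up front and keeps the result in
    // memory; refresh() discards the cached listing and lists again.
    val index = new InMemoryFileIndex(
      spark, Seq(new Path("/tmp/some_table")), Map.empty, None)
    index.allFiles().foreach(f => println(f.getPath))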
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileCatalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala
index cc4049e925..a8a722dd3c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileCatalog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala
@@ -34,19 +34,19 @@ import org.apache.spark.sql.types.{StringType, StructType}
import org.apache.spark.util.SerializableConfiguration
/**
- * An abstract class that represents [[FileCatalog]]s that are aware of partitioned tables.
+ * An abstract class that represents [[FileIndex]]s that are aware of partitioned tables.
* It provides the necessary methods to parse partition data based on a set of files.
*
* @param parameters as set of options to control partition discovery
* @param userPartitionSchema an optional partition schema that will be use to provide types for
* the discovered partitions
*/
-abstract class PartitioningAwareFileCatalog(
+abstract class PartitioningAwareFileIndex(
sparkSession: SparkSession,
parameters: Map[String, String],
userPartitionSchema: Option[StructType],
- fileStatusCache: FileStatusCache = NoopCache) extends FileCatalog with Logging {
- import PartitioningAwareFileCatalog.BASE_PATH_PARAM
+ fileStatusCache: FileStatusCache = NoopCache) extends FileIndex with Logging {
+ import PartitioningAwareFileIndex.BASE_PATH_PARAM
/** Returns the specification of the partitions inferred from the data. */
def partitionSpec(): PartitionSpec
@@ -253,9 +253,9 @@ abstract class PartitioningAwareFileCatalog(
}
val discovered = if (pathsToFetch.length >=
sparkSession.sessionState.conf.parallelPartitionDiscoveryThreshold) {
- PartitioningAwareFileCatalog.listLeafFilesInParallel(pathsToFetch, hadoopConf, sparkSession)
+ PartitioningAwareFileIndex.listLeafFilesInParallel(pathsToFetch, hadoopConf, sparkSession)
} else {
- PartitioningAwareFileCatalog.listLeafFilesInSerial(pathsToFetch, hadoopConf)
+ PartitioningAwareFileIndex.listLeafFilesInSerial(pathsToFetch, hadoopConf)
}
discovered.foreach { case (path, leafFiles) =>
HiveCatalogMetrics.incrementFilesDiscovered(leafFiles.size)
@@ -266,7 +266,7 @@ abstract class PartitioningAwareFileCatalog(
}
}
-object PartitioningAwareFileCatalog extends Logging {
+object PartitioningAwareFileIndex extends Logging {
val BASE_PATH_PARAM = "basePath"
/** A serializable variant of HDFS's BlockLocation. */
@@ -383,7 +383,7 @@ object PartitioningAwareFileCatalog extends Logging {
if (shouldFilterOut(name)) {
Seq.empty[FileStatus]
} else {
- // [SPARK-17599] Prevent ListingFileCatalog from failing if path doesn't exist
+ // [SPARK-17599] Prevent InMemoryFileIndex from failing if path doesn't exist
// Note that statuses only include FileStatus for the files and dirs directly under path,
// and does not include anything else recursively.
val statuses = try fs.listStatus(path) catch {
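The serial-versus-parallel split above is driven by SQLConf.PARALLEL_PARTITION_DISCOVERY_THRESHOLD, the same knob FileStreamSourceSuite tunes further down. A sketch of raising it so small listings stay on the driver:

    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.internal.SQLConf

    val spark = SparkSession.builder().master("local[*]").getOrCreate()
    // Paths pending listing are counted against this threshold: below it the
    // driver lists serially, at or above it a Spark job lists them in parallel.
    spark.sessionState.conf.setConf(SQLConf.PARALLEL_PARTITION_DISCOVERY_THRESHOLD, 100)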
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PruneFileSourcePartitions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PruneFileSourcePartitions.scala
index 8689017c3e..8566a80610 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PruneFileSourcePartitions.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PruneFileSourcePartitions.scala
@@ -28,7 +28,7 @@ private[sql] object PruneFileSourcePartitions extends Rule[LogicalPlan] {
logicalRelation @
LogicalRelation(fsRelation @
HadoopFsRelation(
- tableFileCatalog: TableFileCatalog,
+ catalogFileIndex: CatalogFileIndex,
partitionSchema,
_,
_,
@@ -56,9 +56,9 @@ private[sql] object PruneFileSourcePartitions extends Rule[LogicalPlan] {
ExpressionSet(normalizedFilters.filter(_.references.subsetOf(partitionSet)))
if (partitionKeyFilters.nonEmpty) {
- val prunedFileCatalog = tableFileCatalog.filterPartitions(partitionKeyFilters.toSeq)
+ val prunedFileIndex = catalogFileIndex.filterPartitions(partitionKeyFilters.toSeq)
val prunedFsRelation =
- fsRelation.copy(location = prunedFileCatalog)(sparkSession)
+ fsRelation.copy(location = prunedFileIndex)(sparkSession)
val prunedLogicalRelation = logicalRelation.copy(
relation = prunedFsRelation,
expectedOutputAttributes = Some(logicalRelation.output))
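To exercise this rule in isolation, the shape used by PruneFileSourcePartitionsSuite at the end of this diff can be borrowed. Note the rule is private[sql], so this only compiles from inside the Spark SQL codebase (as the suite does); query stands for any DataFrame over a Hive-partition-managed table:

    import org.apache.spark.sql.DataFrame
    import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
    import org.apache.spark.sql.catalyst.rules.RuleExecutor
    import org.apache.spark.sql.execution.datasources.PruneFileSourcePartitions

    object Optimize extends RuleExecutor[LogicalPlan] {
      val batches = Batch("PruneFileSourcePartitions", Once, PruneFileSourcePartitions) :: Nil
    }

    // After execution, the matched relation's CatalogFileIndex has been swapped
    // for a pruned InMemoryFileIndex covering only the surviving partitions.
    def prune(query: DataFrame): LogicalPlan =
      Optimize.execute(query.queryExecution.analyzed)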
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLog.scala
index c14feea91e..b26edeeb04 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLog.scala
@@ -146,7 +146,7 @@ abstract class CompactibleFileStreamLog[T: ClassTag](
*/
def allFiles(): Array[T] = {
var latestId = getLatest().map(_._1).getOrElse(-1L)
- // There is a race condition when `FileStreamSink` is deleting old files and `StreamFileCatalog`
+ // There is a race condition when `FileStreamSink` is deleting old files and `StreamFileIndex`
// is calling this method. This loop will retry the reading to deal with the
// race condition.
while (true) {
@@ -158,7 +158,7 @@ abstract class CompactibleFileStreamLog[T: ClassTag](
} catch {
case e: IOException =>
// Another process using `CompactibleFileStreamLog` may delete the batch files when
- // `StreamFileCatalog` are reading. However, it only happens when a compaction is
+ // `StreamFileIndex` are reading. However, it only happens when a compaction is
// deleting old files. If so, let's try the next compaction batch and we should find it.
// Otherwise, this is a real IO issue and we should throw it.
latestId = nextCompactionBatchId(latestId, compactInterval)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
index a392b82999..680df01acc 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
@@ -24,7 +24,7 @@ import org.apache.hadoop.fs.Path
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.internal.Logging
import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}
-import org.apache.spark.sql.execution.datasources.{DataSource, ListingFileCatalog, LogicalRelation}
+import org.apache.spark.sql.execution.datasources.{DataSource, InMemoryFileIndex, LogicalRelation}
import org.apache.spark.sql.types.StructType
/**
@@ -156,7 +156,7 @@ class FileStreamSource(
private def fetchAllFiles(): Seq[(String, Long)] = {
val startTime = System.nanoTime
val globbedPaths = SparkHadoopUtil.get.globPathIfNecessary(qualifiedBasePath)
- val catalog = new ListingFileCatalog(sparkSession, globbedPaths, options, Some(new StructType))
+ val catalog = new InMemoryFileIndex(sparkSession, globbedPaths, options, Some(new StructType))
val files = catalog.allFiles().sortBy(_.getModificationTime).map { status =>
(status.getPath.toUri.toString, status.getModificationTime)
}
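This listing runs each time the source checks for new input. A user-level sketch that exercises it (paths are hypothetical):

    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder().master("local[*]").getOrCreate()
    // Each micro-batch, fetchAllFiles builds an InMemoryFileIndex over the
    // (globbed) input path and orders candidate files by modification time.
    val input = spark.readStream.text("/data/in")
    val query = input.writeStream
      .format("parquet")
      .option("checkpointLocation", "/data/chk")
      .start("/data/out")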
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MetadataLogFileCatalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MetadataLogFileCatalog.scala
index 82b67cb1ca..aeaa134736 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MetadataLogFileCatalog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MetadataLogFileCatalog.scala
@@ -26,11 +26,11 @@ import org.apache.spark.sql.execution.datasources._
/**
- * A [[FileCatalog]] that generates the list of files to processing by reading them from the
+ * A [[FileIndex]] that generates the list of files to processing by reading them from the
* metadata log files generated by the [[FileStreamSink]].
*/
-class MetadataLogFileCatalog(sparkSession: SparkSession, path: Path)
- extends PartitioningAwareFileCatalog(sparkSession, Map.empty, None) {
+class MetadataLogFileIndex(sparkSession: SparkSession, path: Path)
+ extends PartitioningAwareFileIndex(sparkSession, Map.empty, None) {
private val metadataDirectory = new Path(path, FileStreamSink.metadataDir)
logInfo(s"Reading streaming file log from $metadataDirectory")
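Downstream, the effect (per the DataSource.scala hunk earlier in this diff) is that a batch read of a file sink's output directory resolves through this index instead of a plain directory listing. Continuing the hypothetical paths from the previous sketch:

    // /data/out/_spark_metadata was written by the FileStreamSink, so this read
    // enumerates only the files recorded in that log.
    val committed = spark.read.parquet("/data/out")
    committed.show()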
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileCatalogSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileCatalogSuite.scala
index 9c43169cbf..56df1face6 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileCatalogSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileCatalogSuite.scala
@@ -28,15 +28,15 @@ import org.apache.hadoop.fs.{FileStatus, Path, RawLocalFileSystem}
import org.apache.spark.sql.catalyst.util._
import org.apache.spark.sql.test.SharedSQLContext
-class FileCatalogSuite extends SharedSQLContext {
+class FileIndexSuite extends SharedSQLContext {
- test("ListingFileCatalog: leaf files are qualified paths") {
+ test("InMemoryFileIndex: leaf files are qualified paths") {
withTempDir { dir =>
val file = new File(dir, "text.txt")
stringToFile(file, "text")
val path = new Path(file.getCanonicalPath)
- val catalog = new ListingFileCatalog(spark, Seq(path), Map.empty, None) {
+ val catalog = new InMemoryFileIndex(spark, Seq(path), Map.empty, None) {
def leafFilePaths: Seq[Path] = leafFiles.keys.toSeq
def leafDirPaths: Seq[Path] = leafDirToChildrenFiles.keys.toSeq
}
@@ -45,7 +45,7 @@ class FileCatalogSuite extends SharedSQLContext {
}
}
- test("ListingFileCatalog: input paths are converted to qualified paths") {
+ test("InMemoryFileIndex: input paths are converted to qualified paths") {
withTempDir { dir =>
val file = new File(dir, "text.txt")
stringToFile(file, "text")
@@ -59,42 +59,42 @@ class FileCatalogSuite extends SharedSQLContext {
val qualifiedFilePath = fs.makeQualified(new Path(file.getCanonicalPath))
require(qualifiedFilePath.toString.startsWith("file:"))
- val catalog1 = new ListingFileCatalog(
+ val catalog1 = new InMemoryFileIndex(
spark, Seq(unqualifiedDirPath), Map.empty, None)
assert(catalog1.allFiles.map(_.getPath) === Seq(qualifiedFilePath))
- val catalog2 = new ListingFileCatalog(
+ val catalog2 = new InMemoryFileIndex(
spark, Seq(unqualifiedFilePath), Map.empty, None)
assert(catalog2.allFiles.map(_.getPath) === Seq(qualifiedFilePath))
}
}
- test("ListingFileCatalog: folders that don't exist don't throw exceptions") {
+ test("InMemoryFileIndex: folders that don't exist don't throw exceptions") {
withTempDir { dir =>
val deletedFolder = new File(dir, "deleted")
assert(!deletedFolder.exists())
- val catalog1 = new ListingFileCatalog(
+ val catalog1 = new InMemoryFileIndex(
spark, Seq(new Path(deletedFolder.getCanonicalPath)), Map.empty, None)
// doesn't throw an exception
assert(catalog1.listLeafFiles(catalog1.rootPaths).isEmpty)
}
}
- test("PartitioningAwareFileCatalog - file filtering") {
- assert(!PartitioningAwareFileCatalog.shouldFilterOut("abcd"))
- assert(PartitioningAwareFileCatalog.shouldFilterOut(".ab"))
- assert(PartitioningAwareFileCatalog.shouldFilterOut("_cd"))
- assert(!PartitioningAwareFileCatalog.shouldFilterOut("_metadata"))
- assert(!PartitioningAwareFileCatalog.shouldFilterOut("_common_metadata"))
- assert(PartitioningAwareFileCatalog.shouldFilterOut("_ab_metadata"))
- assert(PartitioningAwareFileCatalog.shouldFilterOut("_cd_common_metadata"))
+ test("PartitioningAwareFileIndex - file filtering") {
+ assert(!PartitioningAwareFileIndex.shouldFilterOut("abcd"))
+ assert(PartitioningAwareFileIndex.shouldFilterOut(".ab"))
+ assert(PartitioningAwareFileIndex.shouldFilterOut("_cd"))
+ assert(!PartitioningAwareFileIndex.shouldFilterOut("_metadata"))
+ assert(!PartitioningAwareFileIndex.shouldFilterOut("_common_metadata"))
+ assert(PartitioningAwareFileIndex.shouldFilterOut("_ab_metadata"))
+ assert(PartitioningAwareFileIndex.shouldFilterOut("_cd_common_metadata"))
}
- test("SPARK-17613 - PartitioningAwareFileCatalog: base path w/o '/' at end") {
+ test("SPARK-17613 - PartitioningAwareFileIndex: base path w/o '/' at end") {
class MockCatalog(
override val rootPaths: Seq[Path])
- extends PartitioningAwareFileCatalog(spark, Map.empty, None) {
+ extends PartitioningAwareFileIndex(spark, Map.empty, None) {
override def refresh(): Unit = {}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala
index c32254d9df..d900ce7bb2 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala
@@ -393,7 +393,7 @@ class FileSourceStrategySuite extends QueryTest with SharedSQLContext with Predi
util.stringToFile(file, fileName)
}
- val fileCatalog = new ListingFileCatalog(
+ val fileCatalog = new InMemoryFileIndex(
sparkSession = spark,
rootPaths = Seq(new Path(tempDir)),
parameters = Map.empty[String, String],
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala
index f2a209e919..120a3a2ef3 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala
@@ -634,7 +634,7 @@ class ParquetPartitionDiscoverySuite extends QueryTest with ParquetTest with Sha
val queryExecution = spark.read.parquet(dir.getCanonicalPath).queryExecution
queryExecution.analyzed.collectFirst {
case LogicalRelation(
- HadoopFsRelation(location: PartitioningAwareFileCatalog, _, _, _, _, _), _, _) =>
+ HadoopFsRelation(location: PartitioningAwareFileIndex, _, _, _, _, _), _, _) =>
assert(location.partitionSpec() === PartitionSpec.emptySpec)
}.getOrElse {
fail(s"Expecting a matching HadoopFsRelation, but got:\n$queryExecution")
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala
index 19c89f5c41..18b42a81a0 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala
@@ -26,7 +26,7 @@ import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
import org.apache.spark.sql.execution.DataSourceScanExec
import org.apache.spark.sql.execution.datasources._
-import org.apache.spark.sql.execution.streaming.{FileStreamSinkWriter, MemoryStream, MetadataLogFileCatalog}
+import org.apache.spark.sql.execution.streaming.{FileStreamSinkWriter, MemoryStream, MetadataLogFileIndex}
import org.apache.spark.sql.functions._
import org.apache.spark.sql.test.SharedSQLContext
import org.apache.spark.sql.types.{IntegerType, StructField, StructType}
@@ -179,14 +179,14 @@ class FileStreamSinkSuite extends StreamTest {
.add(StructField("id", IntegerType))
assert(outputDf.schema === expectedSchema)
- // Verify that MetadataLogFileCatalog is being used and the correct partitioning schema has
+ // Verify that MetadataLogFileIndex is being used and the correct partitioning schema has
// been inferred
val hadoopdFsRelations = outputDf.queryExecution.analyzed.collect {
case LogicalRelation(baseRelation, _, _) if baseRelation.isInstanceOf[HadoopFsRelation] =>
baseRelation.asInstanceOf[HadoopFsRelation]
}
assert(hadoopdFsRelations.size === 1)
- assert(hadoopdFsRelations.head.location.isInstanceOf[MetadataLogFileCatalog])
+ assert(hadoopdFsRelations.head.location.isInstanceOf[MetadataLogFileIndex])
assert(hadoopdFsRelations.head.partitionSchema.exists(_.name == "id"))
assert(hadoopdFsRelations.head.dataSchema.exists(_.name == "value"))
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
index b9e9da9a1e..47018b3a3c 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
@@ -879,7 +879,7 @@ class FileStreamSourceSuite extends FileStreamSourceTest {
val numFiles = 10000
// This is to avoid running a spark job to list of files in parallel
- // by the ListingFileCatalog.
+ // by the InMemoryFileIndex.
spark.sessionState.conf.setConf(SQLConf.PARALLEL_PARTITION_DISCOVERY_THRESHOLD, numFiles * 2)
withTempDirs { case (root, tmp) =>
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
index d1de863ce3..624ab747e4 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
@@ -200,7 +200,7 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
val rootPaths: Seq[Path] = if (lazyPruningEnabled) {
Seq(metastoreRelation.hiveQlTable.getDataLocation)
} else {
- // By convention (for example, see TableFileCatalog), the definition of a
+ // By convention (for example, see CatalogFileIndex), the definition of a
// partitioned table's paths depends on whether that table has any actual partitions.
// Partitioned tables without partitions use the location of the table's base path.
// Partitioned tables with partitions use the locations of those partitions' data
@@ -227,7 +227,7 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
val logicalRelation = cached.getOrElse {
val sizeInBytes = metastoreRelation.statistics.sizeInBytes.toLong
val fileCatalog = {
- val catalog = new TableFileCatalog(
+ val catalog = new CatalogFileIndex(
sparkSession, metastoreRelation.catalogTable, sizeInBytes)
if (lazyPruningEnabled) {
catalog
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala
index ecdf4f14b3..fc35304c80 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala
@@ -23,7 +23,7 @@ import org.apache.spark.sql.{AnalysisException, Dataset, QueryTest, SaveMode}
import org.apache.spark.sql.catalyst.analysis.NoSuchTableException
import org.apache.spark.sql.catalyst.parser.ParseException
import org.apache.spark.sql.execution.columnar.InMemoryTableScanExec
-import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation, TableFileCatalog}
+import org.apache.spark.sql.execution.datasources.{CatalogFileIndex, HadoopFsRelation, LogicalRelation}
import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
import org.apache.spark.sql.hive.test.TestHiveSingleton
import org.apache.spark.sql.test.SQLTestUtils
@@ -321,17 +321,17 @@ class CachedTableSuite extends QueryTest with SQLTestUtils with TestHiveSingleto
sql("DROP TABLE cachedTable")
}
- test("cache a table using TableFileCatalog") {
+ test("cache a table using CatalogFileIndex") {
withTable("test") {
sql("CREATE TABLE test(i int) PARTITIONED BY (p int) STORED AS parquet")
val tableMeta = spark.sharedState.externalCatalog.getTable("default", "test")
- val tableFileCatalog = new TableFileCatalog(spark, tableMeta, 0)
+ val catalogFileIndex = new CatalogFileIndex(spark, tableMeta, 0)
val dataSchema = StructType(tableMeta.schema.filterNot { f =>
tableMeta.partitionColumnNames.contains(f.name)
})
val relation = HadoopFsRelation(
- location = tableFileCatalog,
+ location = catalogFileIndex,
partitionSchema = tableMeta.partitionSchema,
dataSchema = dataSchema,
bucketSpec = None,
@@ -343,7 +343,7 @@ class CachedTableSuite extends QueryTest with SQLTestUtils with TestHiveSingleto
assert(spark.sharedState.cacheManager.lookupCachedData(plan).isDefined)
- val sameCatalog = new TableFileCatalog(spark, tableMeta, 0)
+ val sameCatalog = new CatalogFileIndex(spark, tableMeta, 0)
val sameRelation = HadoopFsRelation(
location = sameCatalog,
partitionSchema = tableMeta.partitionSchema,
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/PartitionedTablePerfStatsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/PartitionedTablePerfStatsSuite.scala
index 476383a5b3..d8e31c4e39 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/PartitionedTablePerfStatsSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/PartitionedTablePerfStatsSuite.scala
@@ -256,7 +256,7 @@ class PartitionedTablePerfStatsSuite
// of doing plan cache validation based on the entire partition set.
HiveCatalogMetrics.reset()
assert(spark.sql("select * from test where partCol1 = 999").count() == 0)
- // 5 from table resolution, another 5 from ListingFileCatalog
+ // 5 from table resolution, another 5 from InMemoryFileIndex
assert(HiveCatalogMetrics.METRIC_PARTITIONS_FETCHED.getCount() == 10)
assert(HiveCatalogMetrics.METRIC_FILES_DISCOVERED.getCount() == 5)
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/PruneFileSourcePartitionsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/PruneFileSourcePartitionsSuite.scala
index 59639aacf3..cdbc26cd5c 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/PruneFileSourcePartitionsSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/PruneFileSourcePartitionsSuite.scala
@@ -22,7 +22,7 @@ import org.apache.spark.sql.catalyst.dsl.expressions._
import org.apache.spark.sql.catalyst.dsl.plans._
import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, Project}
import org.apache.spark.sql.catalyst.rules.RuleExecutor
-import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation, PruneFileSourcePartitions, TableFileCatalog}
+import org.apache.spark.sql.execution.datasources.{CatalogFileIndex, HadoopFsRelation, LogicalRelation, PruneFileSourcePartitions}
import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
import org.apache.spark.sql.hive.test.TestHiveSingleton
import org.apache.spark.sql.test.SQLTestUtils
@@ -45,13 +45,13 @@ class PruneFileSourcePartitionsSuite extends QueryTest with SQLTestUtils with Te
|LOCATION '${dir.getAbsolutePath}'""".stripMargin)
val tableMeta = spark.sharedState.externalCatalog.getTable("default", "test")
- val tableFileCatalog = new TableFileCatalog(spark, tableMeta, 0)
+ val catalogFileIndex = new CatalogFileIndex(spark, tableMeta, 0)
val dataSchema = StructType(tableMeta.schema.filterNot { f =>
tableMeta.partitionColumnNames.contains(f.name)
})
val relation = HadoopFsRelation(
- location = tableFileCatalog,
+ location = catalogFileIndex,
partitionSchema = tableMeta.partitionSchema,
dataSchema = dataSchema,
bucketSpec = None,