21 files changed, 79 insertions, 79 deletions
diff --git a/core/src/main/scala/org/apache/spark/metrics/source/StaticSources.scala b/core/src/main/scala/org/apache/spark/metrics/source/StaticSources.scala index b54885b7ff..3f7cfd9d2c 100644 --- a/core/src/main/scala/org/apache/spark/metrics/source/StaticSources.scala +++ b/core/src/main/scala/org/apache/spark/metrics/source/StaticSources.scala @@ -76,7 +76,7 @@ object HiveCatalogMetrics extends Source { val METRIC_PARTITIONS_FETCHED = metricRegistry.counter(MetricRegistry.name("partitionsFetched")) /** - * Tracks the total number of files discovered off of the filesystem by ListingFileCatalog. + * Tracks the total number of files discovered off of the filesystem by InMemoryFileIndex. */ val METRIC_FILES_DISCOVERED = metricRegistry.counter(MetricRegistry.name("filesDiscovered")) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala index fb72c679e3..526623a36d 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala @@ -177,7 +177,7 @@ class CacheManager extends Logging { /** * Traverses a given `plan` and searches for the occurrences of `qualifiedPath` in the - * [[org.apache.spark.sql.execution.datasources.FileCatalog]] of any [[HadoopFsRelation]] nodes + * [[org.apache.spark.sql.execution.datasources.FileIndex]] of any [[HadoopFsRelation]] nodes * in the plan. If found, we refresh the metadata and return true. Otherwise, this method returns * false. */ diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/TableFileCatalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/CatalogFileIndex.scala index b459df5734..092aabc89a 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/TableFileCatalog.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/CatalogFileIndex.scala @@ -26,23 +26,23 @@ import org.apache.spark.sql.types.StructType /** - * A [[FileCatalog]] for a metastore catalog table. + * A [[FileIndex]] for a metastore catalog table. * * @param sparkSession a [[SparkSession]] * @param table the metadata of the table * @param sizeInBytes the table's data size in bytes */ -class TableFileCatalog( +class CatalogFileIndex( sparkSession: SparkSession, val table: CatalogTable, - override val sizeInBytes: Long) extends FileCatalog { + override val sizeInBytes: Long) extends FileIndex { protected val hadoopConf = sparkSession.sessionState.newHadoopConf private val fileStatusCache = FileStatusCache.newCache(sparkSession) assert(table.identifier.database.isDefined, - "The table identifier must be qualified in TableFileCatalog") + "The table identifier must be qualified in CatalogFileIndex") private val baseLocation = table.storage.locationUri @@ -57,12 +57,12 @@ class TableFileCatalog( override def refresh(): Unit = fileStatusCache.invalidateAll() /** - * Returns a [[ListingFileCatalog]] for this table restricted to the subset of partitions + * Returns a [[InMemoryFileIndex]] for this table restricted to the subset of partitions * specified by the given partition-pruning filters. 
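A hedged usage sketch of the filterPartitions method described here (its signature and body follow below). The table name default.events, the partition column p, and the zero size are illustrative, and a Hive-enabled SparkSession with filesource partition management is assumed:

    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.catalyst.expressions.{AttributeReference, EqualTo, Literal}
    import org.apache.spark.sql.execution.datasources.CatalogFileIndex
    import org.apache.spark.sql.types.IntegerType

    val spark = SparkSession.builder().enableHiveSupport().getOrCreate()

    // Hypothetical partitioned table; sizeInBytes = 0 is a placeholder where DataSource
    // would normally pass the table's statistics.
    val tableMeta = spark.sharedState.externalCatalog.getTable("default", "events")
    val index = new CatalogFileIndex(spark, tableMeta, sizeInBytes = 0L)

    // Keep only partitions satisfying p = 1; the result is an InMemoryFileIndex restricted
    // to those partitions' directories.
    val pruned = index.filterPartitions(
      Seq(EqualTo(AttributeReference("p", IntegerType)(), Literal(1))))
    println(pruned.inputFiles.length)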
* * @param filters partition-pruning filters */ - def filterPartitions(filters: Seq[Expression]): ListingFileCatalog = { + def filterPartitions(filters: Seq[Expression]): InMemoryFileIndex = { if (table.partitionColumnNames.nonEmpty) { val selectedPartitions = sparkSession.sessionState.catalog.listPartitionsByFilter( table.identifier, filters) @@ -70,20 +70,20 @@ class TableFileCatalog( PartitionPath(p.toRow(partitionSchema), p.storage.locationUri.get) } val partitionSpec = PartitionSpec(partitionSchema, partitions) - new PrunedTableFileCatalog( + new PrunedInMemoryFileIndex( sparkSession, new Path(baseLocation.get), fileStatusCache, partitionSpec) } else { - new ListingFileCatalog(sparkSession, rootPaths, table.storage.properties, None) + new InMemoryFileIndex(sparkSession, rootPaths, table.storage.properties, None) } } override def inputFiles: Array[String] = filterPartitions(Nil).inputFiles - // `TableFileCatalog` may be a member of `HadoopFsRelation`, `HadoopFsRelation` may be a member + // `CatalogFileIndex` may be a member of `HadoopFsRelation`, `HadoopFsRelation` may be a member // of `LogicalRelation`, and `LogicalRelation` may be used as the cache key. So we need to // implement `equals` and `hashCode` here, to make it work with cache lookup. override def equals(o: Any): Boolean = o match { - case other: TableFileCatalog => this.table.identifier == other.table.identifier + case other: CatalogFileIndex => this.table.identifier == other.table.identifier case _ => false } @@ -97,12 +97,12 @@ class TableFileCatalog( * @param tableBasePath The default base path of the Hive metastore table * @param partitionSpec The partition specifications from Hive metastore */ -private class PrunedTableFileCatalog( +private class PrunedInMemoryFileIndex( sparkSession: SparkSession, tableBasePath: Path, fileStatusCache: FileStatusCache, override val partitionSpec: PartitionSpec) - extends ListingFileCatalog( + extends InMemoryFileIndex( sparkSession, partitionSpec.partitions.map(_.path), Map.empty, diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala index 5b8f05a396..996109865f 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala @@ -202,7 +202,7 @@ case class DataSource( val qualified = hdfsPath.makeQualified(fs.getUri, fs.getWorkingDirectory) SparkHadoopUtil.get.globPathIfNecessary(qualified) }.toArray - val fileCatalog = new ListingFileCatalog(sparkSession, globbedPaths, options, None) + val fileCatalog = new InMemoryFileIndex(sparkSession, globbedPaths, options, None) val partitionSchema = fileCatalog.partitionSpec().partitionColumns val inferred = format.inferSchema( sparkSession, @@ -364,7 +364,7 @@ case class DataSource( case (format: FileFormat, _) if hasMetadata(caseInsensitiveOptions.get("path").toSeq ++ paths) => val basePath = new Path((caseInsensitiveOptions.get("path").toSeq ++ paths).head) - val fileCatalog = new MetadataLogFileCatalog(sparkSession, basePath) + val fileCatalog = new MetadataLogFileIndex(sparkSession, basePath) val dataSchema = userSpecifiedSchema.orElse { format.inferSchema( sparkSession, @@ -417,12 +417,12 @@ case class DataSource( val fileCatalog = if (sparkSession.sqlContext.conf.manageFilesourcePartitions && catalogTable.isDefined && catalogTable.get.partitionProviderIsHive) { - new TableFileCatalog( + new 
CatalogFileIndex( sparkSession, catalogTable.get, catalogTable.get.stats.map(_.sizeInBytes.toLong).getOrElse(0L)) } else { - new ListingFileCatalog( + new InMemoryFileIndex( sparkSession, globbedPaths, options, partitionSchema) } @@ -433,7 +433,7 @@ case class DataSource( format.inferSchema( sparkSession, caseInsensitiveOptions, - fileCatalog.asInstanceOf[ListingFileCatalog].allFiles()) + fileCatalog.asInstanceOf[InMemoryFileIndex].allFiles()) }.getOrElse { throw new AnalysisException( s"Unable to infer schema for $format at ${allPaths.take(2).mkString(",")}. " + diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileCatalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileIndex.scala index dba64624c3..277223d52e 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileCatalog.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileIndex.scala @@ -33,7 +33,7 @@ case class PartitionDirectory(values: InternalRow, files: Seq[FileStatus]) * An interface for objects capable of enumerating the root paths of a relation as well as the * partitions of a relation subject to some pruning expressions. */ -trait FileCatalog { +trait FileIndex { /** * Returns the list of root input paths from which the catalog will get files. There may be a diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/HadoopFsRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/HadoopFsRelation.scala index afad889808..014abd454f 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/HadoopFsRelation.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/HadoopFsRelation.scala @@ -28,7 +28,7 @@ import org.apache.spark.sql.types.StructType * Acts as a container for all of the metadata required to read from a datasource. All discovery, * resolution and merging logic for schemas and partitions has been removed. * - * @param location A [[FileCatalog]] that can enumerate the locations of all the files that + * @param location A [[FileIndex]] that can enumerate the locations of all the files that * comprise this relation. * @param partitionSchema The schema of the columns (if any) that are used to partition the relation * @param dataSchema The schema of any remaining columns. Note that if any partition columns are @@ -38,7 +38,7 @@ import org.apache.spark.sql.types.StructType * @param options Configuration used when reading / writing data. */ case class HadoopFsRelation( - location: FileCatalog, + location: FileIndex, partitionSchema: StructType, dataSchema: StructType, bucketSpec: Option[BucketSpec], diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ListingFileCatalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala index d9d588388a..7531f0ae02 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ListingFileCatalog.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala @@ -26,7 +26,7 @@ import org.apache.spark.sql.types.StructType /** - * A [[FileCatalog]] that generates the list of files to process by recursively listing all the + * A [[FileIndex]] that generates the list of files to process by recursively listing all the * files present in `paths`. 
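The class documented here is constructed directly in several places in this patch; a minimal sketch, assuming an existing SparkSession named spark (as in spark-shell) and an illustrative directory /tmp/events:

    import org.apache.hadoop.fs.Path
    import org.apache.spark.sql.execution.datasources.InMemoryFileIndex

    // Same four-argument form used in DataSource above: session, root paths, options map,
    // and no user-supplied partition schema (partition columns are then inferred from layout).
    val index = new InMemoryFileIndex(spark, Seq(new Path("/tmp/events")), Map.empty, None)

    index.allFiles().foreach(status => println(status.getPath))   // every discovered leaf file
    println(index.partitionSpec().partitionColumns)                // inferred partition columns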
* * @param rootPaths the list of root table paths to scan @@ -34,13 +34,13 @@ import org.apache.spark.sql.types.StructType * @param partitionSchema an optional partition schema that will be use to provide types for the * discovered partitions */ -class ListingFileCatalog( +class InMemoryFileIndex( sparkSession: SparkSession, override val rootPaths: Seq[Path], parameters: Map[String, String], partitionSchema: Option[StructType], fileStatusCache: FileStatusCache = NoopCache) - extends PartitioningAwareFileCatalog( + extends PartitioningAwareFileIndex( sparkSession, parameters, partitionSchema, fileStatusCache) { @volatile private var cachedLeafFiles: mutable.LinkedHashMap[Path, FileStatus] = _ @@ -79,7 +79,7 @@ class ListingFileCatalog( } override def equals(other: Any): Boolean = other match { - case hdfs: ListingFileCatalog => rootPaths.toSet == hdfs.rootPaths.toSet + case hdfs: InMemoryFileIndex => rootPaths.toSet == hdfs.rootPaths.toSet case _ => false } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileCatalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala index cc4049e925..a8a722dd3c 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileCatalog.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala @@ -34,19 +34,19 @@ import org.apache.spark.sql.types.{StringType, StructType} import org.apache.spark.util.SerializableConfiguration /** - * An abstract class that represents [[FileCatalog]]s that are aware of partitioned tables. + * An abstract class that represents [[FileIndex]]s that are aware of partitioned tables. * It provides the necessary methods to parse partition data based on a set of files. * * @param parameters as set of options to control partition discovery * @param userPartitionSchema an optional partition schema that will be use to provide types for * the discovered partitions */ -abstract class PartitioningAwareFileCatalog( +abstract class PartitioningAwareFileIndex( sparkSession: SparkSession, parameters: Map[String, String], userPartitionSchema: Option[StructType], - fileStatusCache: FileStatusCache = NoopCache) extends FileCatalog with Logging { - import PartitioningAwareFileCatalog.BASE_PATH_PARAM + fileStatusCache: FileStatusCache = NoopCache) extends FileIndex with Logging { + import PartitioningAwareFileIndex.BASE_PATH_PARAM /** Returns the specification of the partitions inferred from the data. */ def partitionSpec(): PartitionSpec @@ -253,9 +253,9 @@ abstract class PartitioningAwareFileCatalog( } val discovered = if (pathsToFetch.length >= sparkSession.sessionState.conf.parallelPartitionDiscoveryThreshold) { - PartitioningAwareFileCatalog.listLeafFilesInParallel(pathsToFetch, hadoopConf, sparkSession) + PartitioningAwareFileIndex.listLeafFilesInParallel(pathsToFetch, hadoopConf, sparkSession) } else { - PartitioningAwareFileCatalog.listLeafFilesInSerial(pathsToFetch, hadoopConf) + PartitioningAwareFileIndex.listLeafFilesInSerial(pathsToFetch, hadoopConf) } discovered.foreach { case (path, leafFiles) => HiveCatalogMetrics.incrementFilesDiscovered(leafFiles.size) @@ -266,7 +266,7 @@ abstract class PartitioningAwareFileCatalog( } } -object PartitioningAwareFileCatalog extends Logging { +object PartitioningAwareFileIndex extends Logging { val BASE_PATH_PARAM = "basePath" /** A serializable variant of HDFS's BlockLocation. 
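The listing logic above switches from driver-side (serial) to distributed (parallel) listing once the number of paths to fetch reaches the session's parallelPartitionDiscoveryThreshold; a small tuning sketch, with 32 chosen only as an example value (FileStreamSourceSuite later in this diff raises the same threshold to avoid spawning a listing job):

    // Below this many paths, leaf files are listed serially on the driver; at or above it,
    // PartitioningAwareFileIndex.listLeafFilesInParallel runs the listing as a Spark job.
    spark.conf.set("spark.sql.sources.parallelPartitionDiscovery.threshold", "32")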
*/ @@ -383,7 +383,7 @@ object PartitioningAwareFileCatalog extends Logging { if (shouldFilterOut(name)) { Seq.empty[FileStatus] } else { - // [SPARK-17599] Prevent ListingFileCatalog from failing if path doesn't exist + // [SPARK-17599] Prevent InMemoryFileIndex from failing if path doesn't exist // Note that statuses only include FileStatus for the files and dirs directly under path, // and does not include anything else recursively. val statuses = try fs.listStatus(path) catch { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PruneFileSourcePartitions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PruneFileSourcePartitions.scala index 8689017c3e..8566a80610 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PruneFileSourcePartitions.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PruneFileSourcePartitions.scala @@ -28,7 +28,7 @@ private[sql] object PruneFileSourcePartitions extends Rule[LogicalPlan] { logicalRelation @ LogicalRelation(fsRelation @ HadoopFsRelation( - tableFileCatalog: TableFileCatalog, + catalogFileIndex: CatalogFileIndex, partitionSchema, _, _, @@ -56,9 +56,9 @@ private[sql] object PruneFileSourcePartitions extends Rule[LogicalPlan] { ExpressionSet(normalizedFilters.filter(_.references.subsetOf(partitionSet))) if (partitionKeyFilters.nonEmpty) { - val prunedFileCatalog = tableFileCatalog.filterPartitions(partitionKeyFilters.toSeq) + val prunedFileIndex = catalogFileIndex.filterPartitions(partitionKeyFilters.toSeq) val prunedFsRelation = - fsRelation.copy(location = prunedFileCatalog)(sparkSession) + fsRelation.copy(location = prunedFileIndex)(sparkSession) val prunedLogicalRelation = logicalRelation.copy( relation = prunedFsRelation, expectedOutputAttributes = Some(logicalRelation.output)) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLog.scala index c14feea91e..b26edeeb04 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLog.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLog.scala @@ -146,7 +146,7 @@ abstract class CompactibleFileStreamLog[T: ClassTag]( */ def allFiles(): Array[T] = { var latestId = getLatest().map(_._1).getOrElse(-1L) - // There is a race condition when `FileStreamSink` is deleting old files and `StreamFileCatalog` + // There is a race condition when `FileStreamSink` is deleting old files and `StreamFileIndex` // is calling this method. This loop will retry the reading to deal with the // race condition. while (true) { @@ -158,7 +158,7 @@ abstract class CompactibleFileStreamLog[T: ClassTag]( } catch { case e: IOException => // Another process using `CompactibleFileStreamLog` may delete the batch files when - // `StreamFileCatalog` are reading. However, it only happens when a compaction is + // `StreamFileIndex` are reading. However, it only happens when a compaction is // deleting old files. If so, let's try the next compaction batch and we should find it. // Otherwise, this is a real IO issue and we should throw it. 
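The comment above describes retrying when a concurrent compaction deletes the batch file being read; a generic sketch of that pattern (not the actual CompactibleFileStreamLog code, which advances by the compaction interval):

    import java.io.IOException

    // If reading batch `id` fails because compaction removed it, try the next batch; once no
    // newer batch can exist, the IOException is a genuine error and is rethrown to the caller.
    def readWithRetry[T](id: Long, latestId: Long)(read: Long => T): T =
      try read(id) catch {
        case _: IOException if id < latestId => readWithRetry(id + 1, latestId)(read)
      }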
latestId = nextCompactionBatchId(latestId, compactInterval) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala index a392b82999..680df01acc 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala @@ -24,7 +24,7 @@ import org.apache.hadoop.fs.Path import org.apache.spark.deploy.SparkHadoopUtil import org.apache.spark.internal.Logging import org.apache.spark.sql.{DataFrame, Dataset, SparkSession} -import org.apache.spark.sql.execution.datasources.{DataSource, ListingFileCatalog, LogicalRelation} +import org.apache.spark.sql.execution.datasources.{DataSource, InMemoryFileIndex, LogicalRelation} import org.apache.spark.sql.types.StructType /** @@ -156,7 +156,7 @@ class FileStreamSource( private def fetchAllFiles(): Seq[(String, Long)] = { val startTime = System.nanoTime val globbedPaths = SparkHadoopUtil.get.globPathIfNecessary(qualifiedBasePath) - val catalog = new ListingFileCatalog(sparkSession, globbedPaths, options, Some(new StructType)) + val catalog = new InMemoryFileIndex(sparkSession, globbedPaths, options, Some(new StructType)) val files = catalog.allFiles().sortBy(_.getModificationTime).map { status => (status.getPath.toUri.toString, status.getModificationTime) } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MetadataLogFileCatalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MetadataLogFileCatalog.scala index 82b67cb1ca..aeaa134736 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MetadataLogFileCatalog.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MetadataLogFileCatalog.scala @@ -26,11 +26,11 @@ import org.apache.spark.sql.execution.datasources._ /** - * A [[FileCatalog]] that generates the list of files to processing by reading them from the + * A [[FileIndex]] that generates the list of files to processing by reading them from the * metadata log files generated by the [[FileStreamSink]]. 
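MetadataLogFileIndex, defined just below, lists files from the sink's metadata log instead of the filesystem. Per the DataSource change earlier in this diff, a batch read over a directory written by FileStreamSink (one that contains a _spark_metadata log) resolves through it automatically; the output path here is illustrative:

    import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation}
    import org.apache.spark.sql.execution.streaming.MetadataLogFileIndex

    val sinkOutput = spark.read.parquet("/tmp/stream-output")

    // FileStreamSinkSuite later in this diff asserts the same thing: the relation's location
    // is a MetadataLogFileIndex rather than a freshly listed InMemoryFileIndex.
    val relations = sinkOutput.queryExecution.analyzed.collect {
      case LogicalRelation(r: HadoopFsRelation, _, _) => r
    }
    assert(relations.head.location.isInstanceOf[MetadataLogFileIndex])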
*/ -class MetadataLogFileCatalog(sparkSession: SparkSession, path: Path) - extends PartitioningAwareFileCatalog(sparkSession, Map.empty, None) { +class MetadataLogFileIndex(sparkSession: SparkSession, path: Path) + extends PartitioningAwareFileIndex(sparkSession, Map.empty, None) { private val metadataDirectory = new Path(path, FileStreamSink.metadataDir) logInfo(s"Reading streaming file log from $metadataDirectory") diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileCatalogSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileCatalogSuite.scala index 9c43169cbf..56df1face6 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileCatalogSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileCatalogSuite.scala @@ -28,15 +28,15 @@ import org.apache.hadoop.fs.{FileStatus, Path, RawLocalFileSystem} import org.apache.spark.sql.catalyst.util._ import org.apache.spark.sql.test.SharedSQLContext -class FileCatalogSuite extends SharedSQLContext { +class FileIndexSuite extends SharedSQLContext { - test("ListingFileCatalog: leaf files are qualified paths") { + test("InMemoryFileIndex: leaf files are qualified paths") { withTempDir { dir => val file = new File(dir, "text.txt") stringToFile(file, "text") val path = new Path(file.getCanonicalPath) - val catalog = new ListingFileCatalog(spark, Seq(path), Map.empty, None) { + val catalog = new InMemoryFileIndex(spark, Seq(path), Map.empty, None) { def leafFilePaths: Seq[Path] = leafFiles.keys.toSeq def leafDirPaths: Seq[Path] = leafDirToChildrenFiles.keys.toSeq } @@ -45,7 +45,7 @@ class FileCatalogSuite extends SharedSQLContext { } } - test("ListingFileCatalog: input paths are converted to qualified paths") { + test("InMemoryFileIndex: input paths are converted to qualified paths") { withTempDir { dir => val file = new File(dir, "text.txt") stringToFile(file, "text") @@ -59,42 +59,42 @@ class FileCatalogSuite extends SharedSQLContext { val qualifiedFilePath = fs.makeQualified(new Path(file.getCanonicalPath)) require(qualifiedFilePath.toString.startsWith("file:")) - val catalog1 = new ListingFileCatalog( + val catalog1 = new InMemoryFileIndex( spark, Seq(unqualifiedDirPath), Map.empty, None) assert(catalog1.allFiles.map(_.getPath) === Seq(qualifiedFilePath)) - val catalog2 = new ListingFileCatalog( + val catalog2 = new InMemoryFileIndex( spark, Seq(unqualifiedFilePath), Map.empty, None) assert(catalog2.allFiles.map(_.getPath) === Seq(qualifiedFilePath)) } } - test("ListingFileCatalog: folders that don't exist don't throw exceptions") { + test("InMemoryFileIndex: folders that don't exist don't throw exceptions") { withTempDir { dir => val deletedFolder = new File(dir, "deleted") assert(!deletedFolder.exists()) - val catalog1 = new ListingFileCatalog( + val catalog1 = new InMemoryFileIndex( spark, Seq(new Path(deletedFolder.getCanonicalPath)), Map.empty, None) // doesn't throw an exception assert(catalog1.listLeafFiles(catalog1.rootPaths).isEmpty) } } - test("PartitioningAwareFileCatalog - file filtering") { - assert(!PartitioningAwareFileCatalog.shouldFilterOut("abcd")) - assert(PartitioningAwareFileCatalog.shouldFilterOut(".ab")) - assert(PartitioningAwareFileCatalog.shouldFilterOut("_cd")) - assert(!PartitioningAwareFileCatalog.shouldFilterOut("_metadata")) - assert(!PartitioningAwareFileCatalog.shouldFilterOut("_common_metadata")) - assert(PartitioningAwareFileCatalog.shouldFilterOut("_ab_metadata")) - 
assert(PartitioningAwareFileCatalog.shouldFilterOut("_cd_common_metadata")) + test("PartitioningAwareFileIndex - file filtering") { + assert(!PartitioningAwareFileIndex.shouldFilterOut("abcd")) + assert(PartitioningAwareFileIndex.shouldFilterOut(".ab")) + assert(PartitioningAwareFileIndex.shouldFilterOut("_cd")) + assert(!PartitioningAwareFileIndex.shouldFilterOut("_metadata")) + assert(!PartitioningAwareFileIndex.shouldFilterOut("_common_metadata")) + assert(PartitioningAwareFileIndex.shouldFilterOut("_ab_metadata")) + assert(PartitioningAwareFileIndex.shouldFilterOut("_cd_common_metadata")) } - test("SPARK-17613 - PartitioningAwareFileCatalog: base path w/o '/' at end") { + test("SPARK-17613 - PartitioningAwareFileIndex: base path w/o '/' at end") { class MockCatalog( override val rootPaths: Seq[Path]) - extends PartitioningAwareFileCatalog(spark, Map.empty, None) { + extends PartitioningAwareFileIndex(spark, Map.empty, None) { override def refresh(): Unit = {} diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala index c32254d9df..d900ce7bb2 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala @@ -393,7 +393,7 @@ class FileSourceStrategySuite extends QueryTest with SharedSQLContext with Predi util.stringToFile(file, fileName) } - val fileCatalog = new ListingFileCatalog( + val fileCatalog = new InMemoryFileIndex( sparkSession = spark, rootPaths = Seq(new Path(tempDir)), parameters = Map.empty[String, String], diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala index f2a209e919..120a3a2ef3 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala @@ -634,7 +634,7 @@ class ParquetPartitionDiscoverySuite extends QueryTest with ParquetTest with Sha val queryExecution = spark.read.parquet(dir.getCanonicalPath).queryExecution queryExecution.analyzed.collectFirst { case LogicalRelation( - HadoopFsRelation(location: PartitioningAwareFileCatalog, _, _, _, _, _), _, _) => + HadoopFsRelation(location: PartitioningAwareFileIndex, _, _, _, _, _), _, _) => assert(location.partitionSpec() === PartitionSpec.emptySpec) }.getOrElse { fail(s"Expecting a matching HadoopFsRelation, but got:\n$queryExecution") diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala index 19c89f5c41..18b42a81a0 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala @@ -26,7 +26,7 @@ import org.apache.spark.sql._ import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder import org.apache.spark.sql.execution.DataSourceScanExec import org.apache.spark.sql.execution.datasources._ -import org.apache.spark.sql.execution.streaming.{FileStreamSinkWriter, MemoryStream, MetadataLogFileCatalog} +import 
org.apache.spark.sql.execution.streaming.{FileStreamSinkWriter, MemoryStream, MetadataLogFileIndex} import org.apache.spark.sql.functions._ import org.apache.spark.sql.test.SharedSQLContext import org.apache.spark.sql.types.{IntegerType, StructField, StructType} @@ -179,14 +179,14 @@ class FileStreamSinkSuite extends StreamTest { .add(StructField("id", IntegerType)) assert(outputDf.schema === expectedSchema) - // Verify that MetadataLogFileCatalog is being used and the correct partitioning schema has + // Verify that MetadataLogFileIndex is being used and the correct partitioning schema has // been inferred val hadoopdFsRelations = outputDf.queryExecution.analyzed.collect { case LogicalRelation(baseRelation, _, _) if baseRelation.isInstanceOf[HadoopFsRelation] => baseRelation.asInstanceOf[HadoopFsRelation] } assert(hadoopdFsRelations.size === 1) - assert(hadoopdFsRelations.head.location.isInstanceOf[MetadataLogFileCatalog]) + assert(hadoopdFsRelations.head.location.isInstanceOf[MetadataLogFileIndex]) assert(hadoopdFsRelations.head.partitionSchema.exists(_.name == "id")) assert(hadoopdFsRelations.head.dataSchema.exists(_.name == "value")) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala index b9e9da9a1e..47018b3a3c 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala @@ -879,7 +879,7 @@ class FileStreamSourceSuite extends FileStreamSourceTest { val numFiles = 10000 // This is to avoid running a spark job to list of files in parallel - // by the ListingFileCatalog. + // by the InMemoryFileIndex. spark.sessionState.conf.setConf(SQLConf.PARALLEL_PARTITION_DISCOVERY_THRESHOLD, numFiles * 2) withTempDirs { case (root, tmp) => diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala index d1de863ce3..624ab747e4 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala @@ -200,7 +200,7 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log val rootPaths: Seq[Path] = if (lazyPruningEnabled) { Seq(metastoreRelation.hiveQlTable.getDataLocation) } else { - // By convention (for example, see TableFileCatalog), the definition of a + // By convention (for example, see CatalogFileIndex), the definition of a // partitioned table's paths depends on whether that table has any actual partitions. // Partitioned tables without partitions use the location of the table's base path. 
// Partitioned tables with partitions use the locations of those partitions' data @@ -227,7 +227,7 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log val logicalRelation = cached.getOrElse { val sizeInBytes = metastoreRelation.statistics.sizeInBytes.toLong val fileCatalog = { - val catalog = new TableFileCatalog( + val catalog = new CatalogFileIndex( sparkSession, metastoreRelation.catalogTable, sizeInBytes) if (lazyPruningEnabled) { catalog diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala index ecdf4f14b3..fc35304c80 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala @@ -23,7 +23,7 @@ import org.apache.spark.sql.{AnalysisException, Dataset, QueryTest, SaveMode} import org.apache.spark.sql.catalyst.analysis.NoSuchTableException import org.apache.spark.sql.catalyst.parser.ParseException import org.apache.spark.sql.execution.columnar.InMemoryTableScanExec -import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation, TableFileCatalog} +import org.apache.spark.sql.execution.datasources.{CatalogFileIndex, HadoopFsRelation, LogicalRelation} import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat import org.apache.spark.sql.hive.test.TestHiveSingleton import org.apache.spark.sql.test.SQLTestUtils @@ -321,17 +321,17 @@ class CachedTableSuite extends QueryTest with SQLTestUtils with TestHiveSingleto sql("DROP TABLE cachedTable") } - test("cache a table using TableFileCatalog") { + test("cache a table using CatalogFileIndex") { withTable("test") { sql("CREATE TABLE test(i int) PARTITIONED BY (p int) STORED AS parquet") val tableMeta = spark.sharedState.externalCatalog.getTable("default", "test") - val tableFileCatalog = new TableFileCatalog(spark, tableMeta, 0) + val catalogFileIndex = new CatalogFileIndex(spark, tableMeta, 0) val dataSchema = StructType(tableMeta.schema.filterNot { f => tableMeta.partitionColumnNames.contains(f.name) }) val relation = HadoopFsRelation( - location = tableFileCatalog, + location = catalogFileIndex, partitionSchema = tableMeta.partitionSchema, dataSchema = dataSchema, bucketSpec = None, @@ -343,7 +343,7 @@ class CachedTableSuite extends QueryTest with SQLTestUtils with TestHiveSingleto assert(spark.sharedState.cacheManager.lookupCachedData(plan).isDefined) - val sameCatalog = new TableFileCatalog(spark, tableMeta, 0) + val sameCatalog = new CatalogFileIndex(spark, tableMeta, 0) val sameRelation = HadoopFsRelation( location = sameCatalog, partitionSchema = tableMeta.partitionSchema, diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/PartitionedTablePerfStatsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/PartitionedTablePerfStatsSuite.scala index 476383a5b3..d8e31c4e39 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/PartitionedTablePerfStatsSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/PartitionedTablePerfStatsSuite.scala @@ -256,7 +256,7 @@ class PartitionedTablePerfStatsSuite // of doing plan cache validation based on the entire partition set. 
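The CachedTableSuite change above depends on the equals/hashCode contract noted on CatalogFileIndex earlier in this diff: two indexes built independently for the same table must compare equal so CacheManager finds the cached plan. A condensed sketch reusing the suite's default.test table:

    import org.apache.spark.sql.execution.datasources.CatalogFileIndex

    val tableMeta = spark.sharedState.externalCatalog.getTable("default", "test")
    val index1 = new CatalogFileIndex(spark, tableMeta, 0)
    val index2 = new CatalogFileIndex(spark, tableMeta, 0)

    // Equality is by table identifier only, so a relation built around index2 still hits the
    // cache entry keyed on the relation built around index1.
    assert(index1 == index2 && index1.hashCode == index2.hashCode)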
HiveCatalogMetrics.reset() assert(spark.sql("select * from test where partCol1 = 999").count() == 0) - // 5 from table resolution, another 5 from ListingFileCatalog + // 5 from table resolution, another 5 from InMemoryFileIndex assert(HiveCatalogMetrics.METRIC_PARTITIONS_FETCHED.getCount() == 10) assert(HiveCatalogMetrics.METRIC_FILES_DISCOVERED.getCount() == 5) diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/PruneFileSourcePartitionsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/PruneFileSourcePartitionsSuite.scala index 59639aacf3..cdbc26cd5c 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/PruneFileSourcePartitionsSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/PruneFileSourcePartitionsSuite.scala @@ -22,7 +22,7 @@ import org.apache.spark.sql.catalyst.dsl.expressions._ import org.apache.spark.sql.catalyst.dsl.plans._ import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, Project} import org.apache.spark.sql.catalyst.rules.RuleExecutor -import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation, PruneFileSourcePartitions, TableFileCatalog} +import org.apache.spark.sql.execution.datasources.{CatalogFileIndex, HadoopFsRelation, LogicalRelation, PruneFileSourcePartitions} import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat import org.apache.spark.sql.hive.test.TestHiveSingleton import org.apache.spark.sql.test.SQLTestUtils @@ -45,13 +45,13 @@ class PruneFileSourcePartitionsSuite extends QueryTest with SQLTestUtils with Te |LOCATION '${dir.getAbsolutePath}'""".stripMargin) val tableMeta = spark.sharedState.externalCatalog.getTable("default", "test") - val tableFileCatalog = new TableFileCatalog(spark, tableMeta, 0) + val catalogFileIndex = new CatalogFileIndex(spark, tableMeta, 0) val dataSchema = StructType(tableMeta.schema.filterNot { f => tableMeta.partitionColumnNames.contains(f.name) }) val relation = HadoopFsRelation( - location = tableFileCatalog, + location = catalogFileIndex, partitionSchema = tableMeta.partitionSchema, dataSchema = dataSchema, bucketSpec = None, |
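Finally, the file-filtering assertions in FileIndexSuite earlier in this diff pin down which names PartitioningAwareFileIndex.shouldFilterOut hides; a standalone restatement consistent with those assertions (a sketch, not Spark's implementation verbatim):

    // Hidden files ("."-prefixed) and most "_"-prefixed files are excluded from listings,
    // but the Parquet summary files _metadata and _common_metadata are kept.
    def shouldFilterOut(name: String): Boolean =
      (name.startsWith("_") || name.startsWith(".")) &&
        !name.startsWith("_metadata") && !name.startsWith("_common_metadata")

    assert(shouldFilterOut("_ab_metadata") && !shouldFilterOut("_metadata"))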