author    windpiger <songjun@outlook.com>    2017-02-28 00:16:49 -0800
committer Wenchen Fan <wenchen@databricks.com>    2017-02-28 00:16:49 -0800
commit    a350bc16d36c58b48ac01f0258678ffcdb77e793 (patch)
tree      5d294d922463cc1e37ef7db78d103913fda25a19 /sql/core/src/test
parent    73530383538ad72fdc3dd4c670485192f12ebc4e (diff)
[SPARK-19748][SQL] refresh function invalidates the FileStatusCache and regenerates the in-memory vars for InMemoryFileIndex in the wrong order
## What changes were proposed in this pull request?

If we refresh an InMemoryFileIndex that is backed by a FileStatusCache, refresh() first uses the FileStatusCache to regenerate cachedLeafFiles and related state, and only then calls FileStatusCache.invalidateAll(). The order of these two actions is wrong: the regeneration reads stale entries from the cache before it is cleared, so the refresh does not take effect.

```
override def refresh(): Unit = {
  refresh0()
  fileStatusCache.invalidateAll()
}

private def refresh0(): Unit = {
  val files = listLeafFiles(rootPaths)
  cachedLeafFiles =
    new mutable.LinkedHashMap[Path, FileStatus]() ++= files.map(f => f.getPath -> f)
  cachedLeafDirToChildrenFiles = files.toArray.groupBy(_.getPath.getParent)
  cachedPartitionSpec = null
}
```

## How was this patch tested?

A unit test was added.

Author: windpiger <songjun@outlook.com>

Closes #17079 from windpiger/fixInMemoryFileIndexRefresh.
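For clarity, here is a minimal sketch of the corrected ordering implied by the description above (not a verbatim excerpt of the patch): invalidate the FileStatusCache first, then regenerate the in-memory state, so that listLeafFiles inside refresh0() re-lists the file system instead of being served stale cache entries. The member names are taken from the snippet above.

```
// Sketch of the fix: clear stale cache entries before rebuilding,
// so refresh0() re-lists the file system rather than reading the cache.
override def refresh(): Unit = {
  fileStatusCache.invalidateAll()
  refresh0()
}
```

With this ordering, the test added in the diff below observes the newly created file after calling refresh().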
Diffstat (limited to 'sql/core/src/test')
-rw-r--r-- sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileIndexSuite.scala | 26
1 file changed, 26 insertions(+), 0 deletions(-)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileIndexSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileIndexSuite.scala
index 2b4c9f3ed3..efbfc2417d 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileIndexSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileIndexSuite.scala
@@ -178,6 +178,32 @@ class FileIndexSuite extends SharedSQLContext {
       assert(catalog2.allFiles().nonEmpty)
     }
   }
+
+  test("refresh for InMemoryFileIndex with FileStatusCache") {
+    withTempDir { dir =>
+      val fileStatusCache = FileStatusCache.getOrCreate(spark)
+      val dirPath = new Path(dir.getAbsolutePath)
+      val fs = dirPath.getFileSystem(spark.sessionState.newHadoopConf())
+      val catalog =
+        new InMemoryFileIndex(spark, Seq(dirPath), Map.empty, None, fileStatusCache) {
+          def leafFilePaths: Seq[Path] = leafFiles.keys.toSeq
+          def leafDirPaths: Seq[Path] = leafDirToChildrenFiles.keys.toSeq
+        }
+
+      val file = new File(dir, "text.txt")
+      stringToFile(file, "text")
+      assert(catalog.leafDirPaths.isEmpty)
+      assert(catalog.leafFilePaths.isEmpty)
+
+      catalog.refresh()
+
+      assert(catalog.leafFilePaths.size == 1)
+      assert(catalog.leafFilePaths.head == fs.makeQualified(new Path(file.getAbsolutePath)))
+
+      assert(catalog.leafDirPaths.size == 1)
+      assert(catalog.leafDirPaths.head == fs.makeQualified(dirPath))
+    }
+  }
}
class FakeParentPathFileSystem extends RawLocalFileSystem {