author     petermaxlee <petermaxlee@gmail.com>  2016-06-30 16:49:59 -0700
committer  Reynold Xin <rxin@databricks.com>    2016-06-30 16:49:59 -0700
commit     fb41670c9263a89ec233861cc91a19cf1bb19073
tree       48bef5a493fccc003c70aa7df323323c727b3fc1
parent     5d00a7bc19ddeb1b5247733b55095a03ee7b1a30
[SPARK-16336][SQL] Suggest doing table refresh upon FileNotFoundException
## What changes were proposed in this pull request?

This patch appends a message suggesting that users run REFRESH TABLE or recreate the Dataset/DataFrame involved when Spark hits a FileNotFoundException caused by stale, cached file metadata.

## How was this patch tested?

Added a unit test for this in MetadataCacheSuite.

Author: petermaxlee <petermaxlee@gmail.com>

Closes #14003 from petermaxlee/SPARK-16336.
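For users who hit the new error message, a minimal sketch of the recovery paths it points at; the table name `my_table` and the path `/data/events` are illustrative placeholders, not part of this patch:

```scala
import org.apache.spark.sql.SparkSession

object RefreshAfterStaleMetadata {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("refresh-example").getOrCreate()

    // Option 1: invalidate the cached file listing for a catalog table via SQL.
    spark.sql("REFRESH TABLE my_table")

    // Option 2: the same invalidation through the programmatic catalog API.
    spark.catalog.refreshTable("my_table")

    // Option 3: recreate the DataFrame so its file listing is rebuilt from scratch.
    val df = spark.read.parquet("/data/events")
    println(df.count())

    spark.stop()
  }
}
```

REFRESH TABLE (or `spark.catalog.refreshTable`) only applies to data registered in the catalog; for an ad-hoc DataFrame, recreating it is what rebuilds the cached file listing, which is exactly the advice the appended message gives.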
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala  15
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/MetadataCacheSuite.scala                  88
2 files changed, 102 insertions(+), 1 deletion(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala
index 1443057d5c..c66da3a831 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala
@@ -117,7 +117,20 @@ class FileScanRDD(
           currentFile = files.next()
           logInfo(s"Reading File $currentFile")
           InputFileNameHolder.setInputFileName(currentFile.filePath)
-          currentIterator = readFunction(currentFile)
+
+          try {
+            currentIterator = readFunction(currentFile)
+          } catch {
+            case e: java.io.FileNotFoundException =>
+              throw new java.io.FileNotFoundException(
+                e.getMessage + "\n" +
+                  "It is possible the underlying files have been updated. " +
+                  "You can explicitly invalidate the cache in Spark by " +
+                  "running 'REFRESH TABLE tableName' command in SQL or " +
+                  "by recreating the Dataset/DataFrame involved."
+              )
+          }
+
           hasNext
         } else {
           currentFile = null
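The hunk above is a wrap-and-rethrow: the per-file read is attempted, and any FileNotFoundException is re-raised with a remediation hint appended. A self-contained sketch of the same pattern, assuming a hypothetical `readOneFile` thunk in place of FileScanRDD's actual read function:

```scala
import java.io.FileNotFoundException

object RefreshHintWrapper {
  // Runs a per-file read and, if the file has vanished, rethrows with a remediation hint.
  def withRefreshHint[T](readOneFile: => T): T = {
    try {
      readOneFile
    } catch {
      case e: FileNotFoundException =>
        throw new FileNotFoundException(
          e.getMessage + "\n" +
            "It is possible the underlying files have been updated. " +
            "Run 'REFRESH TABLE tableName' in SQL or recreate the Dataset/DataFrame involved.")
    }
  }

  def main(args: Array[String]): Unit = {
    try {
      // Reading a file that does not exist triggers the augmented exception.
      withRefreshHint(scala.io.Source.fromFile("/no/such/file.parquet").mkString)
    } catch {
      case e: FileNotFoundException => println(e.getMessage)
    }
  }
}
```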
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/MetadataCacheSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/MetadataCacheSuite.scala
new file mode 100644
index 0000000000..d872f4baa6
--- /dev/null
+++ b/sql/core/src/test/scala/org/apache/spark/sql/MetadataCacheSuite.scala
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+
+import java.io.File
+
+import org.apache.spark.SparkException
+import org.apache.spark.sql.test.SharedSQLContext
+
+/**
+ * Test suite for metadata cache related behavior.
+ */
+class MetadataCacheSuite extends QueryTest with SharedSQLContext {
+
+  /** Removes one data file in the given directory. */
+  private def deleteOneFileInDirectory(dir: File): Unit = {
+    assert(dir.isDirectory)
+    val oneFile = dir.listFiles().find { file =>
+      !file.getName.startsWith("_") && !file.getName.startsWith(".")
+    }
+    assert(oneFile.isDefined)
+    oneFile.foreach(_.delete())
+  }
+
+ test("SPARK-16336 Suggest doing table refresh when encountering FileNotFoundException") {
+ withTempPath { (location: File) =>
+ // Create a Parquet directory
+ spark.range(start = 0, end = 100, step = 1, numPartitions = 3)
+ .write.parquet(location.getAbsolutePath)
+
+ // Read the directory in
+ val df = spark.read.parquet(location.getAbsolutePath)
+ assert(df.count() == 100)
+
+ // Delete a file
+ deleteOneFileInDirectory(location)
+
+ // Read it again and now we should see a FileNotFoundException
+ val e = intercept[SparkException] {
+ df.count()
+ }
+ assert(e.getMessage.contains("FileNotFoundException"))
+ assert(e.getMessage.contains("REFRESH"))
+ }
+ }
+
+ ignore("SPARK-16337 temporary view refresh") {
+ withTempPath { (location: File) =>
+ // Create a Parquet directory
+ spark.range(start = 0, end = 100, step = 1, numPartitions = 3)
+ .write.parquet(location.getAbsolutePath)
+
+ // Read the directory in
+ spark.read.parquet(location.getAbsolutePath).createOrReplaceTempView("view_refresh")
+ assert(sql("select count(*) from view_refresh").first().getLong(0) == 100)
+
+ // Delete a file
+ deleteOneFileInDirectory(location)
+
+ // Read it again and now we should see a FileNotFoundException
+ val e = intercept[SparkException] {
+ sql("select count(*) from view_refresh").first()
+ }
+ assert(e.getMessage.contains("FileNotFoundException"))
+ assert(e.getMessage.contains("refresh()"))
+
+ // Refresh and we should be able to read it again.
+ spark.catalog.refreshTable("view_refresh")
+ val newCount = sql("select count(*) from view_refresh").first().getLong(0)
+ assert(newCount > 0 && newCount < 100)
+ }
+ }
+}
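Outside the test harness, the temporary-view scenario that the ignored SPARK-16337 test exercises would look roughly like the sketch below; the path is an illustrative placeholder, and whether `refreshTable` fully recovers a temporary view depends on SPARK-16337 being resolved:

```scala
import org.apache.spark.sql.SparkSession

object TempViewRefreshSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("temp-view-refresh").getOrCreate()
    val path = "/tmp/view_refresh_data"  // illustrative location

    // Write a small Parquet dataset and register it as a temporary view.
    spark.range(start = 0, end = 100, step = 1, numPartitions = 3).write.parquet(path)
    spark.read.parquet(path).createOrReplaceTempView("view_refresh")
    println(spark.sql("select count(*) from view_refresh").first().getLong(0))  // 100

    // ... some external process deletes or rewrites files under `path` here ...

    // After refreshing, subsequent scans should re-list the files instead of failing
    // with FileNotFoundException (the behavior tracked by SPARK-16337).
    spark.catalog.refreshTable("view_refresh")
    println(spark.sql("select count(*) from view_refresh").first().getLong(0))

    spark.stop()
  }
}
```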