-rw-r--r--  core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeSorterSpillReader.java  7
-rw-r--r--  core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala  7
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/RPackageUtils.scala  10
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala  8
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/master/FileSystemPersistenceEngine.scala  5
-rw-r--r--  core/src/main/scala/org/apache/spark/rdd/ReliableCheckpointRDD.scala  4
-rw-r--r--  core/src/main/scala/org/apache/spark/rdd/ReliableRDDCheckpointData.scala  6
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala  8
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/cluster/SimrSchedulerBackend.scala  4
-rw-r--r--  core/src/main/scala/org/apache/spark/shuffle/FileShuffleBlockResolver.scala  5
-rw-r--r--  core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala  13
-rw-r--r--  core/src/main/scala/org/apache/spark/storage/DiskStore.scala  10
-rw-r--r--  core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala  8
-rw-r--r--  core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala  4
-rw-r--r--  network/shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolver.java  8
15 files changed, 80 insertions, 27 deletions
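Every hunk below applies the same pattern: check the boolean returned by delete() and log a warning when it reports failure, instead of silently discarding the result. A minimal Scala sketch of that pattern follows for reference; the deleteWithWarning helper is illustrative only and is not part of this patch, though it uses the same Logging trait and logWarning call as the changed files.

import java.io.File

import org.apache.spark.Logging

// Illustrative helper, not part of the patch: java.io.File.delete() signals
// failure by returning false rather than throwing, so the result has to be
// checked explicitly if the failure is to show up in the logs.
object FileCleanup extends Logging {
  def deleteWithWarning(file: File): Boolean = {
    val deleted = file.delete()
    // Warn only if the file is still present; another cleanup path may have
    // removed it first (the same guard the UnsafeSorterSpillReader hunk
    // below adds).
    if (!deleted && file.exists()) {
      logWarning(s"Error deleting ${file.getPath}")
    }
    deleted
  }
}

The HDFS-backed call sites (FsHistoryProvider, EventLoggingListener, the checkpoint RDDs, SimrSchedulerBackend) apply the same check to Hadoop's FileSystem.delete, which likewise returns a boolean rather than throwing on failure.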
diff --git a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeSorterSpillReader.java b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeSorterSpillReader.java
index 4989b05d63..501dfe77d1 100644
--- a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeSorterSpillReader.java
+++ b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeSorterSpillReader.java
@@ -24,12 +24,15 @@ import com.google.common.io.ByteStreams;
import org.apache.spark.storage.BlockId;
import org.apache.spark.storage.BlockManager;
import org.apache.spark.unsafe.Platform;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* Reads spill files written by {@link UnsafeSorterSpillWriter} (see that class for a description
* of the file format).
*/
final class UnsafeSorterSpillReader extends UnsafeSorterIterator {
+ private static final Logger logger = LoggerFactory.getLogger(UnsafeSorterSpillReader.class);
private final File file;
private InputStream in;
@@ -73,7 +76,9 @@ final class UnsafeSorterSpillReader extends UnsafeSorterIterator {
numRecordsRemaining--;
if (numRecordsRemaining == 0) {
in.close();
- file.delete();
+ if (!file.delete() && file.exists()) {
+ logger.warn("Unable to delete spill file {}", file.getPath());
+ }
in = null;
din = null;
}
diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
index 3788d18297..19be093903 100644
--- a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
@@ -871,7 +871,8 @@ private class PythonAccumulatorParam(@transient private val serverHost: String,
* write the data into disk after deserialization, then Python can read it from disks.
*/
// scalastyle:off no.finalize
-private[spark] class PythonBroadcast(@transient var path: String) extends Serializable {
+private[spark] class PythonBroadcast(@transient var path: String) extends Serializable
+ with Logging {
/**
* Read data from disks, then copy it to `out`
@@ -907,7 +908,9 @@ private[spark] class PythonBroadcast(@transient var path: String) extends Serial
if (!path.isEmpty) {
val file = new File(path)
if (file.exists()) {
- file.delete()
+ if (!file.delete()) {
+ logWarning(s"Error deleting ${file.getPath}")
+ }
}
}
}
diff --git a/core/src/main/scala/org/apache/spark/deploy/RPackageUtils.scala b/core/src/main/scala/org/apache/spark/deploy/RPackageUtils.scala
index 4b28866dca..7d160b6790 100644
--- a/core/src/main/scala/org/apache/spark/deploy/RPackageUtils.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/RPackageUtils.scala
@@ -175,8 +175,10 @@ private[deploy] object RPackageUtils extends Logging {
print(s"ERROR: Failed to build R package in $file.", printStream)
print(RJarDoc, printStream)
}
- } finally {
- rSource.delete() // clean up
+ } finally { // clean up
+ if (!rSource.delete()) {
+ logWarning(s"Error deleting ${rSource.getPath()}")
+ }
}
} else {
if (verbose) {
@@ -211,7 +213,9 @@ private[deploy] object RPackageUtils extends Logging {
val filesToBundle = listFilesRecursively(dir, Seq(".zip"))
// create a zip file from scratch, do not append to existing file.
val zipFile = new File(dir, name)
- zipFile.delete()
+ if (!zipFile.delete()) {
+ logWarning(s"Error deleting ${zipFile.getPath()}")
+ }
val zipOutputStream = new ZipOutputStream(new FileOutputStream(zipFile, false))
try {
filesToBundle.foreach { file =>
diff --git a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
index 8eb2ba1e86..5eb8adf97d 100644
--- a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
@@ -242,7 +242,9 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
logError("Exception encountered when attempting to update last scan time", e)
lastScanTime
} finally {
- fs.delete(path)
+ if (!fs.delete(path)) {
+ logWarning(s"Error deleting ${path}")
+ }
}
}
@@ -405,7 +407,9 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
try {
val path = new Path(logDir, attempt.logPath)
if (fs.exists(path)) {
- fs.delete(path, true)
+ if (!fs.delete(path, true)) {
+ logWarning(s"Error deleting ${path}")
+ }
}
} catch {
case e: AccessControlException =>
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/FileSystemPersistenceEngine.scala b/core/src/main/scala/org/apache/spark/deploy/master/FileSystemPersistenceEngine.scala
index aa379d4cd6..1aa8cd5013 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/FileSystemPersistenceEngine.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/FileSystemPersistenceEngine.scala
@@ -45,7 +45,10 @@ private[master] class FileSystemPersistenceEngine(
}
override def unpersist(name: String): Unit = {
- new File(dir + File.separator + name).delete()
+ val f = new File(dir + File.separator + name)
+ if (!f.delete()) {
+ logWarning(s"Error deleting ${f.getPath()}")
+ }
}
override def read[T: ClassTag](prefix: String): Seq[T] = {
diff --git a/core/src/main/scala/org/apache/spark/rdd/ReliableCheckpointRDD.scala b/core/src/main/scala/org/apache/spark/rdd/ReliableCheckpointRDD.scala
index 1c3b5da19c..a69be6a068 100644
--- a/core/src/main/scala/org/apache/spark/rdd/ReliableCheckpointRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/ReliableCheckpointRDD.scala
@@ -144,7 +144,9 @@ private[spark] object ReliableCheckpointRDD extends Logging {
} else {
// Some other copy of this task must've finished before us and renamed it
logInfo(s"Final output path $finalOutputPath already exists; not overwriting it")
- fs.delete(tempOutputPath, false)
+ if (!fs.delete(tempOutputPath, false)) {
+ logWarning(s"Error deleting ${tempOutputPath}")
+ }
}
}
}
diff --git a/core/src/main/scala/org/apache/spark/rdd/ReliableRDDCheckpointData.scala b/core/src/main/scala/org/apache/spark/rdd/ReliableRDDCheckpointData.scala
index e9f6060301..91cad6662e 100644
--- a/core/src/main/scala/org/apache/spark/rdd/ReliableRDDCheckpointData.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/ReliableRDDCheckpointData.scala
@@ -89,7 +89,7 @@ private[spark] class ReliableRDDCheckpointData[T: ClassTag](@transient private v
}
-private[spark] object ReliableRDDCheckpointData {
+private[spark] object ReliableRDDCheckpointData extends Logging {
/** Return the path of the directory to which this RDD's checkpoint data is written. */
def checkpointPath(sc: SparkContext, rddId: Int): Option[Path] = {
@@ -101,7 +101,9 @@ private[spark] object ReliableRDDCheckpointData {
checkpointPath(sc, rddId).foreach { path =>
val fs = path.getFileSystem(sc.hadoopConfiguration)
if (fs.exists(path)) {
- fs.delete(path, true)
+ if (!fs.delete(path, true)) {
+ logWarning(s"Error deleting ${path.toString()}")
+ }
}
}
}
diff --git a/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala b/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala
index 5a06ef02f5..000a021a52 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala
@@ -109,7 +109,9 @@ private[spark] class EventLoggingListener(
if (shouldOverwrite && fileSystem.exists(path)) {
logWarning(s"Event log $path already exists. Overwriting...")
- fileSystem.delete(path, true)
+ if (!fileSystem.delete(path, true)) {
+ logWarning(s"Error deleting $path")
+ }
}
/* The Hadoop LocalFileSystem (r1.0.4) has known issues with syncing (HADOOP-7844).
@@ -216,7 +218,9 @@ private[spark] class EventLoggingListener(
if (fileSystem.exists(target)) {
if (shouldOverwrite) {
logWarning(s"Event log $target already exists. Overwriting...")
- fileSystem.delete(target, true)
+ if (!fileSystem.delete(target, true)) {
+ logWarning(s"Error deleting $target")
+ }
} else {
throw new IOException("Target log file already exists (%s)".format(logPath))
}
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/SimrSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/SimrSchedulerBackend.scala
index 0324c9dab9..641638a77d 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/SimrSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/SimrSchedulerBackend.scala
@@ -65,7 +65,9 @@ private[spark] class SimrSchedulerBackend(
override def stop() {
val conf = SparkHadoopUtil.get.newConfiguration(sc.conf)
val fs = FileSystem.get(conf)
- fs.delete(new Path(driverFilePath), false)
+ if (!fs.delete(new Path(driverFilePath), false)) {
+ logWarning(s"error deleting ${driverFilePath}")
+ }
super.stop()
}
diff --git a/core/src/main/scala/org/apache/spark/shuffle/FileShuffleBlockResolver.scala b/core/src/main/scala/org/apache/spark/shuffle/FileShuffleBlockResolver.scala
index d9902f96df..cd253a78c2 100644
--- a/core/src/main/scala/org/apache/spark/shuffle/FileShuffleBlockResolver.scala
+++ b/core/src/main/scala/org/apache/spark/shuffle/FileShuffleBlockResolver.scala
@@ -127,7 +127,10 @@ private[spark] class FileShuffleBlockResolver(conf: SparkConf)
case Some(state) =>
for (mapId <- state.completedMapTasks.asScala; reduceId <- 0 until state.numReducers) {
val blockId = new ShuffleBlockId(shuffleId, mapId, reduceId)
- blockManager.diskBlockManager.getFile(blockId).delete()
+ val file = blockManager.diskBlockManager.getFile(blockId)
+ if (!file.delete()) {
+ logWarning(s"Error deleting ${file.getPath()}")
+ }
}
logInfo("Deleted all files for shuffle " + shuffleId)
true
diff --git a/core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala b/core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala
index d0163d326d..65887d119d 100644
--- a/core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala
+++ b/core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala
@@ -21,7 +21,7 @@ import java.io._
import com.google.common.io.ByteStreams
-import org.apache.spark.{SparkConf, SparkEnv}
+import org.apache.spark.{SparkConf, SparkEnv, Logging}
import org.apache.spark.network.buffer.{FileSegmentManagedBuffer, ManagedBuffer}
import org.apache.spark.network.netty.SparkTransportConf
import org.apache.spark.storage._
@@ -40,7 +40,8 @@ import IndexShuffleBlockResolver.NOOP_REDUCE_ID
*/
// Note: Changes to the format in this file should be kept in sync with
// org.apache.spark.network.shuffle.ExternalShuffleBlockResolver#getSortBasedShuffleBlockData().
-private[spark] class IndexShuffleBlockResolver(conf: SparkConf) extends ShuffleBlockResolver {
+private[spark] class IndexShuffleBlockResolver(conf: SparkConf) extends ShuffleBlockResolver
+ with Logging {
private lazy val blockManager = SparkEnv.get.blockManager
@@ -60,12 +61,16 @@ private[spark] class IndexShuffleBlockResolver(conf: SparkConf) extends ShuffleB
def removeDataByMap(shuffleId: Int, mapId: Int): Unit = {
var file = getDataFile(shuffleId, mapId)
if (file.exists()) {
- file.delete()
+ if (!file.delete()) {
+ logWarning(s"Error deleting data ${file.getPath()}")
+ }
}
file = getIndexFile(shuffleId, mapId)
if (file.exists()) {
- file.delete()
+ if (!file.delete()) {
+ logWarning(s"Error deleting index ${file.getPath()}")
+ }
}
}
diff --git a/core/src/main/scala/org/apache/spark/storage/DiskStore.scala b/core/src/main/scala/org/apache/spark/storage/DiskStore.scala
index feb9533604..c008b9dc16 100644
--- a/core/src/main/scala/org/apache/spark/storage/DiskStore.scala
+++ b/core/src/main/scala/org/apache/spark/storage/DiskStore.scala
@@ -86,7 +86,9 @@ private[spark] class DiskStore(blockManager: BlockManager, diskManager: DiskBloc
} catch {
case e: Throwable =>
if (file.exists()) {
- file.delete()
+ if (!file.delete()) {
+ logWarning(s"Error deleting ${file}")
+ }
}
throw e
}
@@ -155,7 +157,11 @@ private[spark] class DiskStore(blockManager: BlockManager, diskManager: DiskBloc
override def remove(blockId: BlockId): Boolean = {
val file = diskManager.getFile(blockId.name)
if (file.exists()) {
- file.delete()
+ val ret = file.delete()
+ if (!ret) {
+ logWarning(s"Error deleting ${file.getPath()}")
+ }
+ ret
} else {
false
}
diff --git a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
index f929b12606..29c5732f5a 100644
--- a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
+++ b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
@@ -208,7 +208,9 @@ class ExternalAppendOnlyMap[K, V, C](
writer.revertPartialWritesAndClose()
}
if (file.exists()) {
- file.delete()
+ if (!file.delete()) {
+ logWarning(s"Error deleting ${file}")
+ }
}
}
}
@@ -489,7 +491,9 @@ class ExternalAppendOnlyMap[K, V, C](
fileStream = null
}
if (file.exists()) {
- file.delete()
+ if (!file.delete()) {
+ logWarning(s"Error deleting ${file}")
+ }
}
}
diff --git a/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala b/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala
index 2a30f751ff..749be34d8e 100644
--- a/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala
+++ b/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala
@@ -318,7 +318,9 @@ private[spark] class ExternalSorter[K, V, C](
writer.revertPartialWritesAndClose()
}
if (file.exists()) {
- file.delete()
+ if (!file.delete()) {
+ logWarning(s"Error deleting ${file}")
+ }
}
}
}
diff --git a/network/shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolver.java b/network/shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolver.java
index c5f93bb47f..0d4dd6afac 100644
--- a/network/shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolver.java
+++ b/network/shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolver.java
@@ -114,10 +114,14 @@ public class ExternalShuffleBlockResolver {
"recover state for existing applications", registeredExecutorFile, e);
if (registeredExecutorFile.isDirectory()) {
for (File f : registeredExecutorFile.listFiles()) {
- f.delete();
+ if (!f.delete()) {
+ logger.warn("error deleting {}", f.getPath());
+ }
}
}
- registeredExecutorFile.delete();
+ if (!registeredExecutorFile.delete()) {
+ logger.warn("error deleting {}", registeredExecutorFile.getPath());
+ }
options.createIfMissing(true);
try {
tmpDb = JniDBFactory.factory.open(registeredExecutorFile, options);