-rw-r--r--  mllib/src/main/scala/org/apache/spark/ml/clustering/LDA.scala                         |  6 +++---
-rw-r--r--  mllib/src/main/scala/org/apache/spark/ml/tree/impl/NodeIdCache.scala                  | 14 +++++++++-----
-rw-r--r--  mllib/src/main/scala/org/apache/spark/mllib/impl/PeriodicCheckpointer.scala           | 15 +++++++++------
-rw-r--r--  mllib/src/test/scala/org/apache/spark/mllib/impl/PeriodicGraphCheckpointerSuite.scala |  8 +++++---
-rw-r--r--  mllib/src/test/scala/org/apache/spark/mllib/impl/PeriodicRDDCheckpointerSuite.scala   |  8 +++++---
5 files changed, 31 insertions(+), 20 deletions(-)
diff --git a/mllib/src/main/scala/org/apache/spark/ml/clustering/LDA.scala b/mllib/src/main/scala/org/apache/spark/ml/clustering/LDA.scala
index ec60991af6..5aec692c98 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/clustering/LDA.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/clustering/LDA.scala
@@ -17,7 +17,7 @@
package org.apache.spark.ml.clustering
-import org.apache.hadoop.fs.{FileSystem, Path}
+import org.apache.hadoop.fs.Path
import org.apache.spark.annotation.{DeveloperApi, Experimental, Since}
import org.apache.spark.internal.Logging
@@ -696,8 +696,8 @@ class DistributedLDAModel private[ml] (
@DeveloperApi
@Since("2.0.0")
def deleteCheckpointFiles(): Unit = {
- val fs = FileSystem.get(sparkSession.sparkContext.hadoopConfiguration)
- _checkpointFiles.foreach(PeriodicCheckpointer.removeCheckpointFile(_, fs))
+ val hadoopConf = sparkSession.sparkContext.hadoopConfiguration
+ _checkpointFiles.foreach(PeriodicCheckpointer.removeCheckpointFile(_, hadoopConf))
_checkpointFiles = Array.empty[String]
}
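
The substantive change, repeated in each file below, is the same: FileSystem.get(conf) always returns the filesystem named by fs.defaultFS, so deleting a checkpoint fails whenever the checkpoint directory lives on a filesystem other than the default one. Path.getFileSystem(conf) resolves the filesystem from the path's own scheme instead. A minimal, self-contained sketch of the contrast (the local path below is illustrative, not from the patch):

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

object FsResolutionSketch extends App {
  val conf = new Configuration() // fs.defaultFS falls back to file:///

  // Old pattern: one handle for whatever filesystem fs.defaultFS names,
  // regardless of where the checkpoint actually lives.
  val defaultFs: FileSystem = FileSystem.get(conf)

  // New pattern: the handle is derived from the path's own scheme, so a
  // checkpoint on a non-default filesystem still resolves correctly.
  val checkpoint = new Path("file:///tmp/checkpoints/rdd-42") // illustrative path
  val fs: FileSystem = checkpoint.getFileSystem(conf)

  fs.delete(checkpoint, true) // returns false if the path does not exist
}
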
diff --git a/mllib/src/main/scala/org/apache/spark/ml/tree/impl/NodeIdCache.scala b/mllib/src/main/scala/org/apache/spark/ml/tree/impl/NodeIdCache.scala
index 800430f96c..a7c5f489de 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/tree/impl/NodeIdCache.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/tree/impl/NodeIdCache.scala
@@ -21,7 +21,7 @@ import java.io.IOException
import scala.collection.mutable
-import org.apache.hadoop.fs.{FileSystem, Path}
+import org.apache.hadoop.fs.Path
import org.apache.spark.internal.Logging
import org.apache.spark.ml.tree.{LearningNode, Split}
@@ -77,8 +77,8 @@ private[spark] class NodeIdCache(
// Indicates whether we can checkpoint
private val canCheckpoint = nodeIdsForInstances.sparkContext.getCheckpointDir.nonEmpty
- // FileSystem instance for deleting checkpoints as needed
- private val fs = FileSystem.get(nodeIdsForInstances.sparkContext.hadoopConfiguration)
+ // Hadoop Configuration for deleting checkpoints as needed
+ private val hadoopConf = nodeIdsForInstances.sparkContext.hadoopConfiguration
/**
* Update the node index values in the cache.
@@ -130,7 +130,9 @@ private[spark] class NodeIdCache(
val old = checkpointQueue.dequeue()
// Since the old checkpoint is not deleted by Spark, we'll manually delete it here.
try {
- fs.delete(new Path(old.getCheckpointFile.get), true)
+ val path = new Path(old.getCheckpointFile.get)
+ val fs = path.getFileSystem(hadoopConf)
+ fs.delete(path, true)
} catch {
case e: IOException =>
logError("Decision Tree learning using cacheNodeIds failed to remove checkpoint" +
@@ -154,7 +156,9 @@ private[spark] class NodeIdCache(
val old = checkpointQueue.dequeue()
if (old.getCheckpointFile.isDefined) {
try {
- fs.delete(new Path(old.getCheckpointFile.get), true)
+ val path = new Path(old.getCheckpointFile.get)
+ val fs = path.getFileSystem(hadoopConf)
+ fs.delete(path, true)
} catch {
case e: IOException =>
logError("Decision Tree learning using cacheNodeIds failed to remove checkpoint" +
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/impl/PeriodicCheckpointer.scala b/mllib/src/main/scala/org/apache/spark/mllib/impl/PeriodicCheckpointer.scala
index 5c12c9305b..4dd498cd91 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/impl/PeriodicCheckpointer.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/impl/PeriodicCheckpointer.scala
@@ -19,7 +19,8 @@ package org.apache.spark.mllib.impl
import scala.collection.mutable
-import org.apache.hadoop.fs.{FileSystem, Path}
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.Path
import org.apache.spark.SparkContext
import org.apache.spark.internal.Logging
@@ -160,21 +161,23 @@ private[mllib] abstract class PeriodicCheckpointer[T](
private def removeCheckpointFile(): Unit = {
val old = checkpointQueue.dequeue()
// Since the old checkpoint is not deleted by Spark, we manually delete it.
- val fs = FileSystem.get(sc.hadoopConfiguration)
- getCheckpointFiles(old).foreach(PeriodicCheckpointer.removeCheckpointFile(_, fs))
+ getCheckpointFiles(old).foreach(
+ PeriodicCheckpointer.removeCheckpointFile(_, sc.hadoopConfiguration))
}
}
private[spark] object PeriodicCheckpointer extends Logging {
/** Delete a checkpoint file, and log a warning if deletion fails. */
- def removeCheckpointFile(path: String, fs: FileSystem): Unit = {
+ def removeCheckpointFile(checkpointFile: String, conf: Configuration): Unit = {
try {
- fs.delete(new Path(path), true)
+ val path = new Path(checkpointFile)
+ val fs = path.getFileSystem(conf)
+ fs.delete(path, true)
} catch {
case e: Exception =>
logWarning("PeriodicCheckpointer could not remove old checkpoint file: " +
- path)
+ checkpointFile)
}
}
}
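
After this hunk the shared helper takes a Hadoop Configuration rather than a pre-resolved FileSystem, and resolves the filesystem per file. Since PeriodicCheckpointer is private[spark], a call site must sit inside an org.apache.spark package; a hedged sketch of such a caller (the object name and method are made up for illustration):

package org.apache.spark.mllib.impl

import org.apache.spark.SparkContext

// Illustrative caller only; not part of the patch.
object CheckpointCleanupSketch {
  // Remove each listed checkpoint file via the filesystem its path names.
  def dropAll(sc: SparkContext, files: Seq[String]): Unit =
    files.foreach(PeriodicCheckpointer.removeCheckpointFile(_, sc.hadoopConfiguration))
}
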
diff --git a/mllib/src/test/scala/org/apache/spark/mllib/impl/PeriodicGraphCheckpointerSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/impl/PeriodicGraphCheckpointerSuite.scala
index e331c75989..a13e7f63a9 100644
--- a/mllib/src/test/scala/org/apache/spark/mllib/impl/PeriodicGraphCheckpointerSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/mllib/impl/PeriodicGraphCheckpointerSuite.scala
@@ -17,7 +17,7 @@
package org.apache.spark.mllib.impl
-import org.apache.hadoop.fs.{FileSystem, Path}
+import org.apache.hadoop.fs.Path
import org.apache.spark.{SparkContext, SparkFunSuite}
import org.apache.spark.graphx.{Edge, Graph}
@@ -140,9 +140,11 @@ private object PeriodicGraphCheckpointerSuite {
// Instead, we check for the presence of the checkpoint files.
// This test should continue to work even after this graph.isCheckpointed issue
// is fixed (though it can then be simplified and not look for the files).
- val fs = FileSystem.get(graph.vertices.sparkContext.hadoopConfiguration)
+ val hadoopConf = graph.vertices.sparkContext.hadoopConfiguration
graph.getCheckpointFiles.foreach { checkpointFile =>
- assert(!fs.exists(new Path(checkpointFile)),
+ val path = new Path(checkpointFile)
+ val fs = path.getFileSystem(hadoopConf)
+ assert(!fs.exists(path),
"Graph checkpoint file should have been removed")
}
}
diff --git a/mllib/src/test/scala/org/apache/spark/mllib/impl/PeriodicRDDCheckpointerSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/impl/PeriodicRDDCheckpointerSuite.scala
index b2a459a68b..14adf8c29f 100644
--- a/mllib/src/test/scala/org/apache/spark/mllib/impl/PeriodicRDDCheckpointerSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/mllib/impl/PeriodicRDDCheckpointerSuite.scala
@@ -17,7 +17,7 @@
package org.apache.spark.mllib.impl
-import org.apache.hadoop.fs.{FileSystem, Path}
+import org.apache.hadoop.fs.Path
import org.apache.spark.{SparkContext, SparkFunSuite}
import org.apache.spark.mllib.util.MLlibTestSparkContext
@@ -127,9 +127,11 @@ private object PeriodicRDDCheckpointerSuite {
// Instead, we check for the presence of the checkpoint files.
// This test should continue to work even after this rdd.isCheckpointed issue
// is fixed (though it can then be simplified and not look for the files).
- val fs = FileSystem.get(rdd.sparkContext.hadoopConfiguration)
+ val hadoopConf = rdd.sparkContext.hadoopConfiguration
rdd.getCheckpointFile.foreach { checkpointFile =>
- assert(!fs.exists(new Path(checkpointFile)), "RDD checkpoint file should have been removed")
+ val path = new Path(checkpointFile)
+ val fs = path.getFileSystem(hadoopConf)
+ assert(!fs.exists(path), "RDD checkpoint file should have been removed")
}
}
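
Both test suites now assert non-existence through the filesystem that owns each checkpoint path, mirroring the production change. The check reduces to this small helper (a sketch with assumed names, not code from the patch):

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path

// True once the checkpoint file is gone from its own filesystem.
def checkpointRemoved(checkpointFile: String, conf: Configuration): Boolean = {
  val path = new Path(checkpointFile)
  !path.getFileSystem(conf).exists(path)
}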