aboutsummaryrefslogtreecommitdiff
path: root/mllib/src/main/scala
diff options
context:
space:
mode:
authorLianhui Wang <lianhuiwang09@gmail.com>2016-06-01 08:30:38 -0500
committerSean Owen <sowen@cloudera.com>2016-06-01 08:30:38 -0500
commit6563d72b168c39115376e73788b48a2d60803d4e (patch)
treeaf6ffec045d492385f83036598f73f748bce7728 /mllib/src/main/scala
parente4ce1bc4f3ca088365ff199e563f23a552dc88ef (diff)
downloadspark-6563d72b168c39115376e73788b48a2d60803d4e.tar.gz
spark-6563d72b168c39115376e73788b48a2d60803d4e.tar.bz2
spark-6563d72b168c39115376e73788b48a2d60803d4e.zip
[SPARK-15664][MLLIB] Replace FileSystem.get(conf) with path.getFileSystem(conf) when removing CheckpointFile in MLlib
## What changes were proposed in this pull request? If SparkContext sets the checkpoint directory to a location that is not on the default FileSystem, an exception is thrown when removing checkpoint files in MLlib. So we should always obtain the FileSystem from the Path to avoid the wrong-FS problem. ## How was this patch tested? N/A Author: Lianhui Wang <lianhuiwang09@gmail.com> Closes #13408 from lianhuiwang/SPARK-15664.
Diffstat (limited to 'mllib/src/main/scala')
-rw-r--r--mllib/src/main/scala/org/apache/spark/ml/clustering/LDA.scala6
-rw-r--r--mllib/src/main/scala/org/apache/spark/ml/tree/impl/NodeIdCache.scala14
-rw-r--r--mllib/src/main/scala/org/apache/spark/mllib/impl/PeriodicCheckpointer.scala15
3 files changed, 21 insertions, 14 deletions
diff --git a/mllib/src/main/scala/org/apache/spark/ml/clustering/LDA.scala b/mllib/src/main/scala/org/apache/spark/ml/clustering/LDA.scala
index ec60991af6..5aec692c98 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/clustering/LDA.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/clustering/LDA.scala
@@ -17,7 +17,7 @@
package org.apache.spark.ml.clustering
-import org.apache.hadoop.fs.{FileSystem, Path}
+import org.apache.hadoop.fs.Path
import org.apache.spark.annotation.{DeveloperApi, Experimental, Since}
import org.apache.spark.internal.Logging
@@ -696,8 +696,8 @@ class DistributedLDAModel private[ml] (
@DeveloperApi
@Since("2.0.0")
def deleteCheckpointFiles(): Unit = {
- val fs = FileSystem.get(sparkSession.sparkContext.hadoopConfiguration)
- _checkpointFiles.foreach(PeriodicCheckpointer.removeCheckpointFile(_, fs))
+ val hadoopConf = sparkSession.sparkContext.hadoopConfiguration
+ _checkpointFiles.foreach(PeriodicCheckpointer.removeCheckpointFile(_, hadoopConf))
_checkpointFiles = Array.empty[String]
}
diff --git a/mllib/src/main/scala/org/apache/spark/ml/tree/impl/NodeIdCache.scala b/mllib/src/main/scala/org/apache/spark/ml/tree/impl/NodeIdCache.scala
index 800430f96c..a7c5f489de 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/tree/impl/NodeIdCache.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/tree/impl/NodeIdCache.scala
@@ -21,7 +21,7 @@ import java.io.IOException
import scala.collection.mutable
-import org.apache.hadoop.fs.{FileSystem, Path}
+import org.apache.hadoop.fs.Path
import org.apache.spark.internal.Logging
import org.apache.spark.ml.tree.{LearningNode, Split}
@@ -77,8 +77,8 @@ private[spark] class NodeIdCache(
// Indicates whether we can checkpoint
private val canCheckpoint = nodeIdsForInstances.sparkContext.getCheckpointDir.nonEmpty
- // FileSystem instance for deleting checkpoints as needed
- private val fs = FileSystem.get(nodeIdsForInstances.sparkContext.hadoopConfiguration)
+ // Hadoop Configuration for deleting checkpoints as needed
+ private val hadoopConf = nodeIdsForInstances.sparkContext.hadoopConfiguration
/**
* Update the node index values in the cache.
@@ -130,7 +130,9 @@ private[spark] class NodeIdCache(
val old = checkpointQueue.dequeue()
// Since the old checkpoint is not deleted by Spark, we'll manually delete it here.
try {
- fs.delete(new Path(old.getCheckpointFile.get), true)
+ val path = new Path(old.getCheckpointFile.get)
+ val fs = path.getFileSystem(hadoopConf)
+ fs.delete(path, true)
} catch {
case e: IOException =>
logError("Decision Tree learning using cacheNodeIds failed to remove checkpoint" +
@@ -154,7 +156,9 @@ private[spark] class NodeIdCache(
val old = checkpointQueue.dequeue()
if (old.getCheckpointFile.isDefined) {
try {
- fs.delete(new Path(old.getCheckpointFile.get), true)
+ val path = new Path(old.getCheckpointFile.get)
+ val fs = path.getFileSystem(hadoopConf)
+ fs.delete(path, true)
} catch {
case e: IOException =>
logError("Decision Tree learning using cacheNodeIds failed to remove checkpoint" +
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/impl/PeriodicCheckpointer.scala b/mllib/src/main/scala/org/apache/spark/mllib/impl/PeriodicCheckpointer.scala
index 5c12c9305b..4dd498cd91 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/impl/PeriodicCheckpointer.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/impl/PeriodicCheckpointer.scala
@@ -19,7 +19,8 @@ package org.apache.spark.mllib.impl
import scala.collection.mutable
-import org.apache.hadoop.fs.{FileSystem, Path}
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.Path
import org.apache.spark.SparkContext
import org.apache.spark.internal.Logging
@@ -160,21 +161,23 @@ private[mllib] abstract class PeriodicCheckpointer[T](
private def removeCheckpointFile(): Unit = {
val old = checkpointQueue.dequeue()
// Since the old checkpoint is not deleted by Spark, we manually delete it.
- val fs = FileSystem.get(sc.hadoopConfiguration)
- getCheckpointFiles(old).foreach(PeriodicCheckpointer.removeCheckpointFile(_, fs))
+ getCheckpointFiles(old).foreach(
+ PeriodicCheckpointer.removeCheckpointFile(_, sc.hadoopConfiguration))
}
}
private[spark] object PeriodicCheckpointer extends Logging {
/** Delete a checkpoint file, and log a warning if deletion fails. */
- def removeCheckpointFile(path: String, fs: FileSystem): Unit = {
+ def removeCheckpointFile(checkpointFile: String, conf: Configuration): Unit = {
try {
- fs.delete(new Path(path), true)
+ val path = new Path(checkpointFile)
+ val fs = path.getFileSystem(conf)
+ fs.delete(path, true)
} catch {
case e: Exception =>
logWarning("PeriodicCheckpointer could not remove old checkpoint file: " +
- path)
+ checkpointFile)
}
}
}