authorLiwei Lin <lwlin7@gmail.com>2016-04-20 11:28:51 +0100
committerSean Owen <sowen@cloudera.com>2016-04-20 11:28:51 +0100
commit17db4bfeaa0074298db622db38a5b0459518c4a9 (patch)
treed3f8b90be7a3ca83f3e01ad47ff50865a784060b
parenta3451119d951949f24f3a4c5e33a5daea615dfed (diff)
[SPARK-14687][CORE][SQL][MLLIB] Call path.getFileSystem(conf) instead of FileSystem.get(conf)
## What changes were proposed in this pull request?

- replaced `FileSystem.get(conf)` calls with `path.getFileSystem(conf)`

## How was this patch tested?

N/A

Author: Liwei Lin <lwlin7@gmail.com>

Closes #12450 from lw-lin/fix-fs-get.
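For context, the difference matters whenever a path does not live on the cluster's default filesystem: `FileSystem.get(conf)` always resolves against `fs.defaultFS`, while `path.getFileSystem(conf)` resolves the filesystem from the path's own scheme and authority. Below is a minimal sketch of that behavior; the namenode address, bucket name, and paths are made-up examples, and actually instantiating the s3a filesystem would additionally require the hadoop-aws connector on the classpath.

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

// Illustrative values only: fs.defaultFS and the s3a path are hypothetical.
val conf = new Configuration()
conf.set("fs.defaultFS", "hdfs://namenode:8020")

// FileSystem.get(conf) resolves against fs.defaultFS, so it hands back the
// HDFS client here regardless of which path we actually intend to read.
val defaultFs = FileSystem.get(conf)          // URI: hdfs://namenode:8020

// Path.getFileSystem(conf) resolves against the path's own scheme/authority,
// so a path on another store gets the matching FileSystem implementation.
val logPath = new Path("s3a://some-bucket/eventlogs/app-1")
val pathFs  = logPath.getFileSystem(conf)     // URI: s3a://some-bucket

// defaultFs.open(logPath) would typically fail the filesystem's checkPath
// with a "Wrong FS" error, whereas pathFs.open(logPath) targets the
// filesystem the path actually lives on.
```

Each hunk below applies the same substitution at a call site that previously assumed the default filesystem.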
-rw-r--r-- core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala | 4
-rw-r--r-- mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala | 3
-rw-r--r-- mllib/src/test/scala/org/apache/spark/ml/clustering/LDASuite.scala | 5
-rw-r--r-- sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/SpecificParquetRecordReaderBase.java | 2
-rw-r--r-- sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala | 2
5 files changed, 9 insertions, 7 deletions
diff --git a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
index d5afb33c71..2bd4a46e16 100644
--- a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
@@ -353,7 +353,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
* the name of the file being compressed.
*/
def zipFileToStream(file: Path, entryName: String, outputStream: ZipOutputStream): Unit = {
- val fs = FileSystem.get(hadoopConf)
+ val fs = file.getFileSystem(hadoopConf)
val inputStream = fs.open(file, 1 * 1024 * 1024) // 1MB Buffer
try {
outputStream.putNextEntry(new ZipEntry(entryName))
@@ -372,7 +372,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
attempt.attemptId.isEmpty || attemptId.isEmpty || attempt.attemptId.get == attemptId.get
}.foreach { attempt =>
val logPath = new Path(logDir, attempt.logPath)
- zipFileToStream(new Path(logDir, attempt.logPath), attempt.logPath, zipStream)
+ zipFileToStream(logPath, attempt.logPath, zipStream)
}
} finally {
zipStream.close()
diff --git a/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala b/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala
index 36dce01590..728b4d1598 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala
@@ -640,7 +640,8 @@ object ALS extends DefaultParamsReadable[ALS] with Logging {
val deletePreviousCheckpointFile: () => Unit = () =>
previousCheckpointFile.foreach { file =>
try {
- FileSystem.get(sc.hadoopConfiguration).delete(new Path(file), true)
+ val checkpointFile = new Path(file)
+ checkpointFile.getFileSystem(sc.hadoopConfiguration).delete(checkpointFile, true)
} catch {
case e: IOException =>
logWarning(s"Cannot delete checkpoint file $file:", e)
diff --git a/mllib/src/test/scala/org/apache/spark/ml/clustering/LDASuite.scala b/mllib/src/test/scala/org/apache/spark/ml/clustering/LDASuite.scala
index 17d6e9fc2e..6cb07aecb9 100644
--- a/mllib/src/test/scala/org/apache/spark/ml/clustering/LDASuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/ml/clustering/LDASuite.scala
@@ -273,8 +273,9 @@ class LDASuite extends SparkFunSuite with MLlibTestSparkContext with DefaultRead
// There should be 1 checkpoint remaining.
assert(model.getCheckpointFiles.length === 1)
- val fs = FileSystem.get(sqlContext.sparkContext.hadoopConfiguration)
- assert(fs.exists(new Path(model.getCheckpointFiles.head)))
+ val checkpointFile = new Path(model.getCheckpointFiles.head)
+ val fs = checkpointFile.getFileSystem(sqlContext.sparkContext.hadoopConfiguration)
+ assert(fs.exists(checkpointFile))
model.deleteCheckpointFiles()
assert(model.getCheckpointFiles.isEmpty)
}
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/SpecificParquetRecordReaderBase.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/SpecificParquetRecordReaderBase.java
index 5c257bc260..b224a86845 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/SpecificParquetRecordReaderBase.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/SpecificParquetRecordReaderBase.java
@@ -178,7 +178,7 @@ public abstract class SpecificParquetRecordReaderBase<T> extends RecordReader<Vo
config.set("spark.sql.parquet.writeLegacyFormat", "false");
this.file = new Path(path);
- long length = FileSystem.get(config).getFileStatus(this.file).getLen();
+ long length = this.file.getFileSystem(config).getFileStatus(this.file).getLen();
ParquetMetadata footer = readFooter(config, file, range(0, length));
List<BlockMetaData> blocks = footer.getBlocks();
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
index 1b70055f34..6448cb6e90 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
@@ -39,7 +39,7 @@ class FileStreamSource(
providerName: String,
dataFrameBuilder: Array[String] => DataFrame) extends Source with Logging {
- private val fs = FileSystem.get(sqlContext.sparkContext.hadoopConfiguration)
+ private val fs = new Path(path).getFileSystem(sqlContext.sparkContext.hadoopConfiguration)
private val metadataLog = new HDFSMetadataLog[Seq[String]](sqlContext, metadataPath)
private var maxBatchId = metadataLog.getLatest().map(_._1).getOrElse(-1L)