diff options
5 files changed, 9 insertions, 7 deletions
diff --git a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala index d5afb33c71..2bd4a46e16 100644 --- a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala +++ b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala @@ -353,7 +353,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) * the name of the file being compressed. */ def zipFileToStream(file: Path, entryName: String, outputStream: ZipOutputStream): Unit = { - val fs = FileSystem.get(hadoopConf) + val fs = file.getFileSystem(hadoopConf) val inputStream = fs.open(file, 1 * 1024 * 1024) // 1MB Buffer try { outputStream.putNextEntry(new ZipEntry(entryName)) @@ -372,7 +372,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) attempt.attemptId.isEmpty || attemptId.isEmpty || attempt.attemptId.get == attemptId.get }.foreach { attempt => val logPath = new Path(logDir, attempt.logPath) - zipFileToStream(new Path(logDir, attempt.logPath), attempt.logPath, zipStream) + zipFileToStream(logPath, attempt.logPath, zipStream) } } finally { zipStream.close() diff --git a/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala b/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala index 36dce01590..728b4d1598 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala @@ -640,7 +640,8 @@ object ALS extends DefaultParamsReadable[ALS] with Logging { val deletePreviousCheckpointFile: () => Unit = () => previousCheckpointFile.foreach { file => try { - FileSystem.get(sc.hadoopConfiguration).delete(new Path(file), true) + val checkpointFile = new Path(file) + checkpointFile.getFileSystem(sc.hadoopConfiguration).delete(checkpointFile, true) } catch { case e: IOException => logWarning(s"Cannot delete checkpoint file $file:", e) diff --git a/mllib/src/test/scala/org/apache/spark/ml/clustering/LDASuite.scala b/mllib/src/test/scala/org/apache/spark/ml/clustering/LDASuite.scala index 17d6e9fc2e..6cb07aecb9 100644 --- a/mllib/src/test/scala/org/apache/spark/ml/clustering/LDASuite.scala +++ b/mllib/src/test/scala/org/apache/spark/ml/clustering/LDASuite.scala @@ -273,8 +273,9 @@ class LDASuite extends SparkFunSuite with MLlibTestSparkContext with DefaultRead // There should be 1 checkpoint remaining. assert(model.getCheckpointFiles.length === 1) - val fs = FileSystem.get(sqlContext.sparkContext.hadoopConfiguration) - assert(fs.exists(new Path(model.getCheckpointFiles.head))) + val checkpointFile = new Path(model.getCheckpointFiles.head) + val fs = checkpointFile.getFileSystem(sqlContext.sparkContext.hadoopConfiguration) + assert(fs.exists(checkpointFile)) model.deleteCheckpointFiles() assert(model.getCheckpointFiles.isEmpty) } diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/SpecificParquetRecordReaderBase.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/SpecificParquetRecordReaderBase.java index 5c257bc260..b224a86845 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/SpecificParquetRecordReaderBase.java +++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/SpecificParquetRecordReaderBase.java @@ -178,7 +178,7 @@ public abstract class SpecificParquetRecordReaderBase<T> extends RecordReader<Vo config.set("spark.sql.parquet.writeLegacyFormat", "false"); this.file = new Path(path); - long length = FileSystem.get(config).getFileStatus(this.file).getLen(); + long length = this.file.getFileSystem(config).getFileStatus(this.file).getLen(); ParquetMetadata footer = readFooter(config, file, range(0, length)); List<BlockMetaData> blocks = footer.getBlocks(); diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala index 1b70055f34..6448cb6e90 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala @@ -39,7 +39,7 @@ class FileStreamSource( providerName: String, dataFrameBuilder: Array[String] => DataFrame) extends Source with Logging { - private val fs = FileSystem.get(sqlContext.sparkContext.hadoopConfiguration) + private val fs = new Path(path).getFileSystem(sqlContext.sparkContext.hadoopConfiguration) private val metadataLog = new HDFSMetadataLog[Seq[String]](sqlContext, metadataPath) private var maxBatchId = metadataLog.getLatest().map(_._1).getOrElse(-1L) |