diff options
author | Sean Owen <sowen@cloudera.com> | 2016-01-02 13:15:53 +0000 |
---|---|---|
committer | Sean Owen <sowen@cloudera.com> | 2016-01-02 13:15:53 +0000 |
commit | 15bd73627e04591fd13667b4838c9098342db965 (patch) | |
tree | 5b23cdd4e75138f38ad51ebb17dcf45305c9e3c0 /streaming | |
parent | 94f7a12b3c8e4a6ecd969893e562feb7ffba4c24 (diff) | |
download | spark-15bd73627e04591fd13667b4838c9098342db965.tar.gz spark-15bd73627e04591fd13667b4838c9098342db965.tar.bz2 spark-15bd73627e04591fd13667b4838c9098342db965.zip |
[SPARK-12481][CORE][STREAMING][SQL] Remove usage of Hadoop deprecated APIs and reflection that supported 1.x
Remove use of deprecated Hadoop APIs now that 2.2+ is required
Author: Sean Owen <sowen@cloudera.com>
Closes #10446 from srowen/SPARK-12481.
Diffstat (limited to 'streaming')
3 files changed, 5 insertions, 11 deletions
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLog.scala b/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLog.scala index 9418beec0d..15ad2e27d3 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLog.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLog.scala @@ -224,7 +224,8 @@ private[streaming] class FileBasedWriteAheadLog( val logDirectoryPath = new Path(logDirectory) val fileSystem = HdfsUtils.getFileSystemForPath(logDirectoryPath, hadoopConf) - if (fileSystem.exists(logDirectoryPath) && fileSystem.getFileStatus(logDirectoryPath).isDir) { + if (fileSystem.exists(logDirectoryPath) && + fileSystem.getFileStatus(logDirectoryPath).isDirectory) { val logFileInfo = logFilesTologInfo(fileSystem.listStatus(logDirectoryPath).map { _.getPath }) pastLogs.clear() pastLogs ++= logFileInfo diff --git a/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLogWriter.scala b/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLogWriter.scala index 1185f30265..1f5c1d4369 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLogWriter.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLogWriter.scala @@ -19,10 +19,7 @@ package org.apache.spark.streaming.util import java.io._ import java.nio.ByteBuffer -import scala.util.Try - import org.apache.hadoop.conf.Configuration -import org.apache.hadoop.fs.FSDataOutputStream import org.apache.spark.util.Utils @@ -34,11 +31,6 @@ private[streaming] class FileBasedWriteAheadLogWriter(path: String, hadoopConf: private lazy val stream = HdfsUtils.getOutputStream(path, hadoopConf) - private lazy val hadoopFlushMethod = { - // Use reflection to get the right flush operation - val cls = classOf[FSDataOutputStream] - Try(cls.getMethod("hflush")).orElse(Try(cls.getMethod("sync"))).toOption - } private var nextOffset = stream.getPos() private var closed = false @@ -62,7 +54,7 @@ private[streaming] class FileBasedWriteAheadLogWriter(path: String, hadoopConf: } private def flush() { - hadoopFlushMethod.foreach { _.invoke(stream) } + stream.hflush() // Useful for local file system where hflush/sync does not work (HADOOP-7844) stream.getWrappedStream.flush() } diff --git a/streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala index beaae34535..a670c7d638 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala @@ -705,7 +705,8 @@ object WriteAheadLogSuite { val logDirectoryPath = new Path(directory) val fileSystem = HdfsUtils.getFileSystemForPath(logDirectoryPath, hadoopConf) - if (fileSystem.exists(logDirectoryPath) && fileSystem.getFileStatus(logDirectoryPath).isDir) { + if (fileSystem.exists(logDirectoryPath) && + fileSystem.getFileStatus(logDirectoryPath).isDirectory) { fileSystem.listStatus(logDirectoryPath).map { _.getPath() }.sortBy { _.getName().split("-")(1).toLong }.map { |