aboutsummaryrefslogtreecommitdiff
path: root/streaming/src/main
diff options
context:
space:
mode:
authorSean Owen <sowen@cloudera.com>2016-01-02 13:15:53 +0000
committerSean Owen <sowen@cloudera.com>2016-01-02 13:15:53 +0000
commit15bd73627e04591fd13667b4838c9098342db965 (patch)
tree5b23cdd4e75138f38ad51ebb17dcf45305c9e3c0 /streaming/src/main
parent94f7a12b3c8e4a6ecd969893e562feb7ffba4c24 (diff)
downloadspark-15bd73627e04591fd13667b4838c9098342db965.tar.gz
spark-15bd73627e04591fd13667b4838c9098342db965.tar.bz2
spark-15bd73627e04591fd13667b4838c9098342db965.zip
[SPARK-12481][CORE][STREAMING][SQL] Remove usage of Hadoop deprecated APIs and reflection that supported 1.x
Remove use of deprecated Hadoop APIs now that 2.2+ is required Author: Sean Owen <sowen@cloudera.com> Closes #10446 from srowen/SPARK-12481.
Diffstat (limited to 'streaming/src/main')
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLog.scala3
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLogWriter.scala10
2 files changed, 3 insertions, 10 deletions
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLog.scala b/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLog.scala
index 9418beec0d..15ad2e27d3 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLog.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLog.scala
@@ -224,7 +224,8 @@ private[streaming] class FileBasedWriteAheadLog(
val logDirectoryPath = new Path(logDirectory)
val fileSystem = HdfsUtils.getFileSystemForPath(logDirectoryPath, hadoopConf)
- if (fileSystem.exists(logDirectoryPath) && fileSystem.getFileStatus(logDirectoryPath).isDir) {
+ if (fileSystem.exists(logDirectoryPath) &&
+ fileSystem.getFileStatus(logDirectoryPath).isDirectory) {
val logFileInfo = logFilesTologInfo(fileSystem.listStatus(logDirectoryPath).map { _.getPath })
pastLogs.clear()
pastLogs ++= logFileInfo
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLogWriter.scala b/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLogWriter.scala
index 1185f30265..1f5c1d4369 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLogWriter.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLogWriter.scala
@@ -19,10 +19,7 @@ package org.apache.spark.streaming.util
import java.io._
import java.nio.ByteBuffer
-import scala.util.Try
-
import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.FSDataOutputStream
import org.apache.spark.util.Utils
@@ -34,11 +31,6 @@ private[streaming] class FileBasedWriteAheadLogWriter(path: String, hadoopConf:
private lazy val stream = HdfsUtils.getOutputStream(path, hadoopConf)
- private lazy val hadoopFlushMethod = {
- // Use reflection to get the right flush operation
- val cls = classOf[FSDataOutputStream]
- Try(cls.getMethod("hflush")).orElse(Try(cls.getMethod("sync"))).toOption
- }
private var nextOffset = stream.getPos()
private var closed = false
@@ -62,7 +54,7 @@ private[streaming] class FileBasedWriteAheadLogWriter(path: String, hadoopConf:
}
private def flush() {
- hadoopFlushMethod.foreach { _.invoke(stream) }
+ stream.hflush()
// Useful for local file system where hflush/sync does not work (HADOOP-7844)
stream.getWrappedStream.flush()
}