aboutsummaryrefslogtreecommitdiff
path: root/streaming/src/main
diff options
context:
space:
mode:
Diffstat (limited to 'streaming/src/main')
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLog.scala3
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLogWriter.scala10
2 files changed, 3 insertions, 10 deletions
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLog.scala b/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLog.scala
index 9418beec0d..15ad2e27d3 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLog.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLog.scala
@@ -224,7 +224,8 @@ private[streaming] class FileBasedWriteAheadLog(
val logDirectoryPath = new Path(logDirectory)
val fileSystem = HdfsUtils.getFileSystemForPath(logDirectoryPath, hadoopConf)
- if (fileSystem.exists(logDirectoryPath) && fileSystem.getFileStatus(logDirectoryPath).isDir) {
+ if (fileSystem.exists(logDirectoryPath) &&
+ fileSystem.getFileStatus(logDirectoryPath).isDirectory) {
val logFileInfo = logFilesTologInfo(fileSystem.listStatus(logDirectoryPath).map { _.getPath })
pastLogs.clear()
pastLogs ++= logFileInfo
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLogWriter.scala b/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLogWriter.scala
index 1185f30265..1f5c1d4369 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLogWriter.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLogWriter.scala
@@ -19,10 +19,7 @@ package org.apache.spark.streaming.util
import java.io._
import java.nio.ByteBuffer
-import scala.util.Try
-
import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.FSDataOutputStream
import org.apache.spark.util.Utils
@@ -34,11 +31,6 @@ private[streaming] class FileBasedWriteAheadLogWriter(path: String, hadoopConf:
private lazy val stream = HdfsUtils.getOutputStream(path, hadoopConf)
- private lazy val hadoopFlushMethod = {
- // Use reflection to get the right flush operation
- val cls = classOf[FSDataOutputStream]
- Try(cls.getMethod("hflush")).orElse(Try(cls.getMethod("sync"))).toOption
- }
private var nextOffset = stream.getPos()
private var closed = false
@@ -62,7 +54,7 @@ private[streaming] class FileBasedWriteAheadLogWriter(path: String, hadoopConf:
}
private def flush() {
- hadoopFlushMethod.foreach { _.invoke(stream) }
+ stream.hflush()
// Useful for local file system where hflush/sync does not work (HADOOP-7844)
stream.getWrappedStream.flush()
}