author     Hari Shreedharan <hshreedharan@apache.org>  2014-10-24 11:44:48 -0700
committer  Tathagata Das <tathagata.das1565@gmail.com>  2014-10-24 11:44:48 -0700
commit     6a40a76848203d7266c134a26191579138c76903 (patch)
tree       45c1984e36fdf83aa4ae2a7130217b7f8a5e7701 /streaming
parent     7c89a8f0c81ecf91dba34c1f44393f45845d438c (diff)
[SPARK-4026][Streaming] Write ahead log management
As part of the effort to avoid data loss on Spark Streaming driver failure, we want to implement a write ahead log that can write received data to HDFS. This allows the received data to persist across driver failures, so when the streaming driver is restarted, it can find and reprocess all the data that was received but not processed. This was primarily implemented by @harishreedharan. This is still WIP, as he is going to improve the unit tests by using an HDFS mini cluster.

Author: Hari Shreedharan <hshreedharan@apache.org>
Author: Tathagata Das <tathagata.das1565@gmail.com>

Closes #2882 from tdas/driver-ha-wal and squashes the following commits:

e4bee20 [Tathagata Das] Removed synchronized, Path.getFileSystem is threadsafe
55514e2 [Tathagata Das] Minor changes based on PR comments.
d29fddd [Tathagata Das] Merge pull request #20 from harishreedharan/driver-ha-wal
a317a4d [Hari Shreedharan] Directory deletion should not fail tests
9514dc8 [Tathagata Das] Added unit tests to test reading of corrupted data and other minor edits
3881706 [Tathagata Das] Merge pull request #19 from harishreedharan/driver-ha-wal
4705fff [Hari Shreedharan] Sort listed files by name. Use local files for WAL tests.
eb356ca [Tathagata Das] Merge pull request #18 from harishreedharan/driver-ha-wal
82ce56e [Hari Shreedharan] Fix file ordering issue in WALManager tests
5ff90ee [Hari Shreedharan] Fix tests to not ignore ordering and also assert all data is present
ef8db09 [Tathagata Das] Merge pull request #17 from harishreedharan/driver-ha-wal
7e40e56 [Hari Shreedharan] Restore old build directory after tests
587b876 [Hari Shreedharan] Fix broken test. Call getFileSystem only from synchronized method.
b4be0c1 [Hari Shreedharan] Remove unused method
edcbee1 [Hari Shreedharan] Tests reading and writing data using writers now use Minicluster.
5c70d1f [Hari Shreedharan] Remove underlying stream from the WALWriter.
4ab602a [Tathagata Das] Refactored write ahead stuff from streaming.storage to streaming.util
b06be2b [Tathagata Das] Adding missing license.
5182ffb [Hari Shreedharan] Added documentation
172358d [Tathagata Das] Pulled WriteAheadLog-related stuff from tdas/spark/tree/driver-ha-working
Diffstat (limited to 'streaming')
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/util/HdfsUtils.scala                 72
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/util/WriteAheadLogFileSegment.scala   20
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/util/WriteAheadLogManager.scala      224
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/util/WriteAheadLogRandomReader.scala  55
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/util/WriteAheadLogReader.scala        82
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/util/WriteAheadLogWriter.scala        82
-rw-r--r--  streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala        357
7 files changed, 892 insertions, 0 deletions
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/util/HdfsUtils.scala b/streaming/src/main/scala/org/apache/spark/streaming/util/HdfsUtils.scala
new file mode 100644
index 0000000000..491f117557
--- /dev/null
+++ b/streaming/src/main/scala/org/apache/spark/streaming/util/HdfsUtils.scala
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.streaming.util
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs._
+
+private[streaming] object HdfsUtils {
+
+ def getOutputStream(path: String, conf: Configuration): FSDataOutputStream = {
+ val dfsPath = new Path(path)
+ val dfs = getFileSystemForPath(dfsPath, conf)
+ // If the file exists and we have append support, append instead of creating a new file
+ val stream: FSDataOutputStream = {
+ if (dfs.isFile(dfsPath)) {
+ if (conf.getBoolean("hdfs.append.support", false)) {
+ dfs.append(dfsPath)
+ } else {
+ throw new IllegalStateException("File exists and there is no append support!")
+ }
+ } else {
+ dfs.create(dfsPath)
+ }
+ }
+ stream
+ }
+
+ def getInputStream(path: String, conf: Configuration): FSDataInputStream = {
+ val dfsPath = new Path(path)
+ val dfs = getFileSystemForPath(dfsPath, conf)
+ val instream = dfs.open(dfsPath)
+ instream
+ }
+
+ def checkState(state: Boolean, errorMsg: => String) {
+ if (!state) {
+ throw new IllegalStateException(errorMsg)
+ }
+ }
+
+ def getBlockLocations(path: String, conf: Configuration): Option[Array[String]] = {
+ val dfsPath = new Path(path)
+ val dfs = getFileSystemForPath(dfsPath, conf)
+ val fileStatus = dfs.getFileStatus(dfsPath)
+ val blockLocs = Option(dfs.getFileBlockLocations(fileStatus, 0, fileStatus.getLen))
+ blockLocs.map(_.flatMap(_.getHosts))
+ }
+
+ def getFileSystemForPath(path: Path, conf: Configuration): FileSystem = {
+ // For local file systems, return the raw local file system so that calls to flush()
+ // actually flush the stream.
+ val fs = path.getFileSystem(conf)
+ fs match {
+ case localFs: LocalFileSystem => localFs.getRawFileSystem
+ case _ => fs
+ }
+ }
+}
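
For orientation, here is a minimal sketch (not part of this patch) of the round trip these helpers enable. The local path is hypothetical, and the example assumes it is compiled inside the org.apache.spark.streaming.util package, since HdfsUtils is private[streaming]:

package org.apache.spark.streaming.util

import org.apache.hadoop.conf.Configuration

object HdfsUtilsExample {
  def main(args: Array[String]): Unit = {
    val conf = new Configuration()
    val path = "file:///tmp/hdfs-utils-example/data" // hypothetical local path

    // Write a length-prefixed payload; create() is used because the file does not exist yet.
    val out = HdfsUtils.getOutputStream(path, conf)
    val payload = "hello".getBytes("UTF-8")
    out.writeInt(payload.length)
    out.write(payload)
    out.close()

    // Read it back through the matching input-stream helper and sanity-check the contents.
    val in = HdfsUtils.getInputStream(path, conf)
    val bytes = new Array[Byte](in.readInt())
    in.readFully(bytes)
    HdfsUtils.checkState(new String(bytes, "UTF-8") == "hello", "Round trip failed")
    in.close()
  }
}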
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/util/WriteAheadLogFileSegment.scala b/streaming/src/main/scala/org/apache/spark/streaming/util/WriteAheadLogFileSegment.scala
new file mode 100644
index 0000000000..1005a2c8ec
--- /dev/null
+++ b/streaming/src/main/scala/org/apache/spark/streaming/util/WriteAheadLogFileSegment.scala
@@ -0,0 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.streaming.util
+
+/** Class for representing a segment of data in a write ahead log file */
+private[streaming] case class WriteAheadLogFileSegment (path: String, offset: Long, length: Int)
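
For orientation (not part of this patch): a segment pins down one record by file path, byte offset, and length. A hypothetical instance would look like this:

// Hypothetical values: a 128-byte record starting at byte offset 2048 of this log file.
val segment = WriteAheadLogFileSegment("hdfs://namenode:8020/wal/log-1414000000-1414000060", 2048L, 128)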
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/util/WriteAheadLogManager.scala b/streaming/src/main/scala/org/apache/spark/streaming/util/WriteAheadLogManager.scala
new file mode 100644
index 0000000000..70d234320b
--- /dev/null
+++ b/streaming/src/main/scala/org/apache/spark/streaming/util/WriteAheadLogManager.scala
@@ -0,0 +1,224 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.streaming.util
+
+import java.nio.ByteBuffer
+
+import scala.collection.mutable.ArrayBuffer
+import scala.concurrent.{ExecutionContext, Future}
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.Path
+import org.apache.hadoop.fs.permission.FsPermission
+import org.apache.spark.Logging
+import org.apache.spark.util.Utils
+import WriteAheadLogManager._
+
+/**
+ * This class manages write ahead log files.
+ * - Writes records (bytebuffers) to periodically rotating log files.
+ * - Recovers the log files and reads the recovered records upon failures.
+ * - Cleans up old log files.
+ *
+ * Uses [[org.apache.spark.streaming.util.WriteAheadLogWriter]] to write
+ * and [[org.apache.spark.streaming.util.WriteAheadLogReader]] to read.
+ *
+ * @param logDirectory Directory where rotating log files will be created.
+ * @param hadoopConf Hadoop configuration for reading/writing log files.
+ * @param rollingIntervalSecs The interval in seconds with which logs will be rolled over.
+ * Default is one minute.
+ * @param maxFailures Max number of failures that are tolerated for every attempt to write to the log.
+ * Default is three.
+ * @param callerName Optional name of the class that is using this manager.
+ * @param clock Optional clock that is used to check for rotation interval.
+ */
+private[streaming] class WriteAheadLogManager(
+ logDirectory: String,
+ hadoopConf: Configuration,
+ rollingIntervalSecs: Int = 60,
+ maxFailures: Int = 3,
+ callerName: String = "",
+ clock: Clock = new SystemClock
+ ) extends Logging {
+
+ private val pastLogs = new ArrayBuffer[LogInfo]
+ private val callerNameTag =
+ if (callerName.nonEmpty) s" for $callerName" else ""
+ private val threadpoolName = s"WriteAheadLogManager $callerNameTag"
+ implicit private val executionContext = ExecutionContext.fromExecutorService(
+ Utils.newDaemonFixedThreadPool(1, threadpoolName))
+ override protected val logName = s"WriteAheadLogManager $callerNameTag"
+
+ private var currentLogPath: Option[String] = None
+ private var currentLogWriter: WriteAheadLogWriter = null
+ private var currentLogWriterStartTime: Long = -1L
+ private var currentLogWriterStopTime: Long = -1L
+
+ initializeOrRecover()
+
+ /**
+ * Write a byte buffer to the log file. This method synchronously writes the data in the
+ * ByteBuffer to HDFS. When this method returns, the data is guaranteed to have been flushed
+ * to HDFS, and will be available for readers to read.
+ */
+ def writeToLog(byteBuffer: ByteBuffer): WriteAheadLogFileSegment = synchronized {
+ var fileSegment: WriteAheadLogFileSegment = null
+ var failures = 0
+ var lastException: Exception = null
+ var succeeded = false
+ while (!succeeded && failures < maxFailures) {
+ try {
+ fileSegment = getLogWriter(clock.currentTime).write(byteBuffer)
+ succeeded = true
+ } catch {
+ case ex: Exception =>
+ lastException = ex
+ logWarning("Failed to write to write ahead log")
+ resetWriter()
+ failures += 1
+ }
+ }
+ if (fileSegment == null) {
+ logError(s"Failed to write to write ahead log after $failures failures")
+ throw lastException
+ }
+ fileSegment
+ }
+
+ /**
+ * Read all the existing logs from the log directory.
+ *
+ * Note that this is typically called when the caller is initializing and wants
+ * to recover past state from the write ahead logs (that is, before making any writes).
+ * If this is called after writes have been made using this manager, then it may not return
+ * the latest records. This does not deal with currently active log files, and
+ * hence the implementation is kept simple.
+ */
+ def readFromLog(): Iterator[ByteBuffer] = synchronized {
+ val logFilesToRead = pastLogs.map{ _.path} ++ currentLogPath
+ logInfo("Reading from the logs: " + logFilesToRead.mkString("\n"))
+ logFilesToRead.iterator.map { file =>
+ logDebug(s"Creating log reader with $file")
+ new WriteAheadLogReader(file, hadoopConf)
+ } flatMap { x => x }
+ }
+
+ /**
+ * Delete the log files that are older than the threshold time.
+ *
+ * It's important to note that the threshold time is based on the time stamps used in the log
+ * files, which are usually based on the local system time. So if any coordination is necessary
+ * between the node calculating the threshTime (say, the driver node) and the node whose local
+ * system time is used (say, a worker node), the caller has to take possible time skew into account.
+ */
+ def cleanupOldLogs(threshTime: Long): Unit = {
+ val oldLogFiles = synchronized { pastLogs.filter { _.endTime < threshTime } }
+ logInfo(s"Attempting to clear ${oldLogFiles.size} old log files in $logDirectory " +
+ s"older than $threshTime: ${oldLogFiles.map { _.path }.mkString("\n")}")
+
+ def deleteFiles() {
+ oldLogFiles.foreach { logInfo =>
+ try {
+ val path = new Path(logInfo.path)
+ val fs = HdfsUtils.getFileSystemForPath(path, hadoopConf)
+ fs.delete(path, true)
+ synchronized { pastLogs -= logInfo }
+ logDebug(s"Cleared log file $logInfo")
+ } catch {
+ case ex: Exception =>
+ logWarning(s"Error clearing write ahead log file $logInfo", ex)
+ }
+ }
+ logInfo(s"Cleared log files in $logDirectory older than $threshTime")
+ }
+ if (!executionContext.isShutdown) {
+ Future { deleteFiles() }
+ }
+ }
+
+ /** Stop the manager, close any open log writer */
+ def stop(): Unit = synchronized {
+ if (currentLogWriter != null) {
+ currentLogWriter.close()
+ }
+ executionContext.shutdown()
+ logInfo("Stopped write ahead log manager")
+ }
+
+ /** Get the current log writer while taking care of rotation */
+ private def getLogWriter(currentTime: Long): WriteAheadLogWriter = synchronized {
+ if (currentLogWriter == null || currentTime > currentLogWriterStopTime) {
+ resetWriter()
+ currentLogPath.foreach {
+ pastLogs += LogInfo(currentLogWriterStartTime, currentLogWriterStopTime, _)
+ }
+ currentLogWriterStartTime = currentTime
+ currentLogWriterStopTime = currentTime + (rollingIntervalSecs * 1000)
+ val newLogPath = new Path(logDirectory,
+ timeToLogFile(currentLogWriterStartTime, currentLogWriterStopTime))
+ currentLogPath = Some(newLogPath.toString)
+ currentLogWriter = new WriteAheadLogWriter(currentLogPath.get, hadoopConf)
+ }
+ currentLogWriter
+ }
+
+ /** Initialize the log directory or recover existing logs inside the directory */
+ private def initializeOrRecover(): Unit = synchronized {
+ val logDirectoryPath = new Path(logDirectory)
+ val fileSystem = HdfsUtils.getFileSystemForPath(logDirectoryPath, hadoopConf)
+
+ if (fileSystem.exists(logDirectoryPath) && fileSystem.getFileStatus(logDirectoryPath).isDir) {
+ val logFileInfo = logFilesTologInfo(fileSystem.listStatus(logDirectoryPath).map { _.getPath })
+ pastLogs.clear()
+ pastLogs ++= logFileInfo
+ logInfo(s"Recovered ${logFileInfo.size} write ahead log files from $logDirectory")
+ logDebug(s"Recovered files are:\n${logFileInfo.map(_.path).mkString("\n")}")
+ }
+ }
+
+ private def resetWriter(): Unit = synchronized {
+ if (currentLogWriter != null) {
+ currentLogWriter.close()
+ currentLogWriter = null
+ }
+ }
+}
+
+private[util] object WriteAheadLogManager {
+
+ case class LogInfo(startTime: Long, endTime: Long, path: String)
+
+ val logFileRegex = """log-(\d+)-(\d+)""".r
+
+ def timeToLogFile(startTime: Long, stopTime: Long): String = {
+ s"log-$startTime-$stopTime"
+ }
+
+ /** Convert a sequence of files to a sequence of sorted LogInfo objects */
+ def logFilesTologInfo(files: Seq[Path]): Seq[LogInfo] = {
+ files.flatMap { file =>
+ logFileRegex.findFirstIn(file.getName()) match {
+ case Some(logFileRegex(startTimeStr, stopTimeStr)) =>
+ val startTime = startTimeStr.toLong
+ val stopTime = stopTimeStr.toLong
+ Some(LogInfo(startTime, stopTime, file.toString))
+ case None =>
+ None
+ }
+ }.sortBy { _.startTime }
+ }
+}
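
The scaladoc above describes a write/recover/cleanup cycle; here is a minimal sketch of it (not part of this patch). The log directory is hypothetical, and the code assumes the org.apache.spark.streaming.util package since the class is private[streaming]:

package org.apache.spark.streaming.util

import java.nio.ByteBuffer

import org.apache.hadoop.conf.Configuration

object WriteAheadLogManagerExample {
  def main(args: Array[String]): Unit = {
    val conf = new Configuration()
    val logDir = "file:///tmp/wal-manager-example" // hypothetical log directory

    // Write a few records; a short rolling interval plus the sleeps forces the logs to rotate.
    val manager = new WriteAheadLogManager(logDir, conf, rollingIntervalSecs = 1)
    (1 to 5).foreach { i =>
      manager.writeToLog(ByteBuffer.wrap(s"record-$i".getBytes("UTF-8")))
      Thread.sleep(500)
    }
    manager.stop()

    // A fresh manager recovers the rotated files and replays every record in write order.
    val recovered = new WriteAheadLogManager(logDir, conf)
    recovered.readFromLog().foreach { buffer =>
      val bytes = new Array[Byte](buffer.remaining())
      buffer.get(bytes)
      println(new String(bytes, "UTF-8"))
    }

    // Asynchronously delete files whose end time is older than the threshold
    // (none are that old in this example; the call just shows the API).
    recovered.cleanupOldLogs(System.currentTimeMillis() - 60 * 60 * 1000)
    recovered.stop()
  }
}

Because the log file names embed their start and stop timestamps, recovery only has to list the directory and sort by start time; no separate index is needed.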
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/util/WriteAheadLogRandomReader.scala b/streaming/src/main/scala/org/apache/spark/streaming/util/WriteAheadLogRandomReader.scala
new file mode 100644
index 0000000000..92bad7a882
--- /dev/null
+++ b/streaming/src/main/scala/org/apache/spark/streaming/util/WriteAheadLogRandomReader.scala
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.streaming.util
+
+import java.io.Closeable
+import java.nio.ByteBuffer
+
+import org.apache.hadoop.conf.Configuration
+
+/**
+ * A random access reader for reading write ahead log files written using
+ * [[org.apache.spark.streaming.util.WriteAheadLogWriter]]. Given the file segment info,
+ * this reads the record (bytebuffer) from the log file.
+ */
+private[streaming] class WriteAheadLogRandomReader(path: String, conf: Configuration)
+ extends Closeable {
+
+ private val instream = HdfsUtils.getInputStream(path, conf)
+ private var closed = false
+
+ def read(segment: WriteAheadLogFileSegment): ByteBuffer = synchronized {
+ assertOpen()
+ instream.seek(segment.offset)
+ val nextLength = instream.readInt()
+ HdfsUtils.checkState(nextLength == segment.length,
+ s"Expected message length to be ${segment.length}, but was $nextLength")
+ val buffer = new Array[Byte](nextLength)
+ instream.readFully(buffer)
+ ByteBuffer.wrap(buffer)
+ }
+
+ override def close(): Unit = synchronized {
+ closed = true
+ instream.close()
+ }
+
+ private def assertOpen() {
+ HdfsUtils.checkState(!closed, "Stream is closed. Create a new Reader to read from the file.")
+ }
+}
+
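A minimal sketch (not part of this patch) of random access: write a couple of records with WriteAheadLogWriter, keep the returned segments, and read them back out of order. The file path is hypothetical and the code assumes the org.apache.spark.streaming.util package:

package org.apache.spark.streaming.util

import java.nio.ByteBuffer

import org.apache.hadoop.conf.Configuration

object RandomReadExample {
  def main(args: Array[String]): Unit = {
    val conf = new Configuration()
    val logFile = "file:///tmp/wal-random-read-example/log" // hypothetical log file

    // Write two records and remember the segment returned for each.
    val writer = new WriteAheadLogWriter(logFile, conf)
    val first = writer.write(ByteBuffer.wrap("first".getBytes("UTF-8")))
    val second = writer.write(ByteBuffer.wrap("second".getBytes("UTF-8")))
    writer.close()

    // Read the records back in reverse order by seeking to each segment's offset.
    val reader = new WriteAheadLogRandomReader(logFile, conf)
    Seq(second, first).foreach { segment =>
      val buffer = reader.read(segment)
      val bytes = new Array[Byte](buffer.remaining())
      buffer.get(bytes)
      println(new String(bytes, "UTF-8")) // prints "second", then "first"
    }
    reader.close()
  }
}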
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/util/WriteAheadLogReader.scala b/streaming/src/main/scala/org/apache/spark/streaming/util/WriteAheadLogReader.scala
new file mode 100644
index 0000000000..2afc0d1551
--- /dev/null
+++ b/streaming/src/main/scala/org/apache/spark/streaming/util/WriteAheadLogReader.scala
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.streaming.util
+
+import java.io.{Closeable, EOFException}
+import java.nio.ByteBuffer
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.spark.Logging
+
+/**
+ * A reader for reading write ahead log files written using
+ * [[org.apache.spark.streaming.util.WriteAheadLogWriter]]. This reads
+ * the records (bytebuffers) in the log file sequentially and returns them as an
+ * iterator of bytebuffers.
+ */
+private[streaming] class WriteAheadLogReader(path: String, conf: Configuration)
+ extends Iterator[ByteBuffer] with Closeable with Logging {
+
+ private val instream = HdfsUtils.getInputStream(path, conf)
+ private var closed = false
+ private var nextItem: Option[ByteBuffer] = None
+
+ override def hasNext: Boolean = synchronized {
+ if (closed) {
+ return false
+ }
+
+ if (nextItem.isDefined) { // handle the case where hasNext is called without calling next
+ true
+ } else {
+ try {
+ val length = instream.readInt()
+ val buffer = new Array[Byte](length)
+ instream.readFully(buffer)
+ nextItem = Some(ByteBuffer.wrap(buffer))
+ logTrace("Read next item " + nextItem.get)
+ true
+ } catch {
+ case e: EOFException =>
+ logDebug("Error reading next item, EOF reached", e)
+ close()
+ false
+ case e: Exception =>
+ logWarning("Error while trying to read data from HDFS.", e)
+ close()
+ throw e
+ }
+ }
+ }
+
+ override def next(): ByteBuffer = synchronized {
+ val data = nextItem.getOrElse {
+ close()
+ throw new IllegalStateException(
+ "next called without calling hasNext or after hasNext returned false")
+ }
+ nextItem = None // Ensure the next hasNext call loads new data.
+ data
+ }
+
+ override def close(): Unit = synchronized {
+ if (!closed) {
+ instream.close()
+ }
+ closed = true
+ }
+}
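
Since the reader is an Iterator[ByteBuffer], recovering a whole file is a plain iteration. A minimal sketch (not part of this patch); the path is hypothetical and the code assumes the org.apache.spark.streaming.util package:

package org.apache.spark.streaming.util

import java.nio.ByteBuffer

import org.apache.hadoop.conf.Configuration

object SequentialReadExample {
  def main(args: Array[String]): Unit = {
    val conf = new Configuration()
    val logFile = "file:///tmp/wal-sequential-read-example/log" // hypothetical log file

    // Produce a small log file to read back.
    val writer = new WriteAheadLogWriter(logFile, conf)
    (1 to 3).foreach { i => writer.write(ByteBuffer.wrap(s"record-$i".getBytes("UTF-8"))) }
    writer.close()

    // Iterate over every record in write order; hasNext closes the stream once EOF is reached.
    val reader = new WriteAheadLogReader(logFile, conf)
    reader.foreach { buffer =>
      val bytes = new Array[Byte](buffer.remaining())
      buffer.get(bytes)
      println(new String(bytes, "UTF-8"))
    }
    reader.close() // idempotent: safe even though EOF already closed the stream
  }
}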
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/util/WriteAheadLogWriter.scala b/streaming/src/main/scala/org/apache/spark/streaming/util/WriteAheadLogWriter.scala
new file mode 100644
index 0000000000..679f6a6dfd
--- /dev/null
+++ b/streaming/src/main/scala/org/apache/spark/streaming/util/WriteAheadLogWriter.scala
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.streaming.util
+
+import java.io._
+import java.net.URI
+import java.nio.ByteBuffer
+
+import scala.util.Try
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.{FSDataOutputStream, FileSystem}
+
+/**
+ * A writer for writing byte-buffers to a write ahead log file.
+ */
+private[streaming] class WriteAheadLogWriter(path: String, hadoopConf: Configuration)
+ extends Closeable {
+
+ private lazy val stream = HdfsUtils.getOutputStream(path, hadoopConf)
+
+ private lazy val hadoopFlushMethod = {
+ // Use reflection to get the right flush operation
+ val cls = classOf[FSDataOutputStream]
+ Try(cls.getMethod("hflush")).orElse(Try(cls.getMethod("sync"))).toOption
+ }
+
+ private var nextOffset = stream.getPos()
+ private var closed = false
+
+ /** Write the bytebuffer to the log file */
+ def write(data: ByteBuffer): WriteAheadLogFileSegment = synchronized {
+ assertOpen()
+ data.rewind() // Rewind to ensure all data in the buffer is retrieved
+ val lengthToWrite = data.remaining()
+ val segment = new WriteAheadLogFileSegment(path, nextOffset, lengthToWrite)
+ stream.writeInt(lengthToWrite)
+ if (data.hasArray) {
+ stream.write(data.array())
+ } else {
+ // If the buffer is not backed by an array, we transfer using temp array
+ // Note that despite the extra array copy, this should be faster than byte-by-byte copy
+ while (data.hasRemaining) {
+ val array = new Array[Byte](data.remaining)
+ data.get(array)
+ stream.write(array)
+ }
+ }
+ flush()
+ nextOffset = stream.getPos()
+ segment
+ }
+
+ override def close(): Unit = synchronized {
+ closed = true
+ stream.close()
+ }
+
+ private def flush() {
+ hadoopFlushMethod.foreach { _.invoke(stream) }
+ // Useful for local file system where hflush/sync does not work (HADOOP-7844)
+ stream.getWrappedStream.flush()
+ }
+
+ private def assertOpen() {
+ HdfsUtils.checkState(!closed, "Stream is closed. Create a new Writer to write to file.")
+ }
+}
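
The writer stores each record as a 4-byte length prefix followed by the payload, which is why the segment returned by a write is enough to locate the record again. A minimal sketch (not part of this patch) that makes the layout visible; the path is hypothetical and the code assumes the org.apache.spark.streaming.util package:

package org.apache.spark.streaming.util

import java.nio.ByteBuffer

import org.apache.hadoop.conf.Configuration

object WriterLayoutExample {
  def main(args: Array[String]): Unit = {
    val conf = new Configuration()
    val logFile = "file:///tmp/wal-writer-example/log" // hypothetical log file

    val writer = new WriteAheadLogWriter(logFile, conf)
    val a = writer.write(ByteBuffer.wrap(Array.fill[Byte](10)(1)))
    val b = writer.write(ByteBuffer.wrap(Array.fill[Byte](20)(2)))
    writer.close()

    // Each record occupies 4 bytes of length prefix plus its payload,
    // so the second segment starts 4 + 10 bytes after the first.
    assert(a.offset == 0L && a.length == 10)
    assert(b.offset == a.offset + 4 + a.length && b.length == 20)
  }
}

Because every write invokes hflush (or sync) via reflection and also flushes the wrapped stream, a record is readable as soon as write() returns, which is what the "syncing of data" test in the suite below relies on.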
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala
new file mode 100644
index 0000000000..5eba93c208
--- /dev/null
+++ b/streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala
@@ -0,0 +1,357 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.streaming.util
+
+import java.io._
+import java.nio.ByteBuffer
+
+import scala.collection.mutable.ArrayBuffer
+import scala.concurrent.duration._
+import scala.language.{implicitConversions, postfixOps}
+import scala.util.Random
+
+import WriteAheadLogSuite._
+import com.google.common.io.Files
+import org.apache.commons.io.FileUtils
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.Path
+import org.apache.spark.util.Utils
+import org.scalatest.{BeforeAndAfter, FunSuite}
+import org.scalatest.concurrent.Eventually._
+
+class WriteAheadLogSuite extends FunSuite with BeforeAndAfter {
+
+ val hadoopConf = new Configuration()
+ var tempDir: File = null
+ var testDir: String = null
+ var testFile: String = null
+ var manager: WriteAheadLogManager = null
+
+ before {
+ tempDir = Files.createTempDir()
+ testDir = tempDir.toString
+ testFile = new File(tempDir, Random.nextString(10)).toString
+ if (manager != null) {
+ manager.stop()
+ manager = null
+ }
+ }
+
+ after {
+ FileUtils.deleteQuietly(tempDir)
+ }
+
+ test("WriteAheadLogWriter - writing data") {
+ val dataToWrite = generateRandomData()
+ val segments = writeDataUsingWriter(testFile, dataToWrite)
+ val writtenData = readDataManually(testFile, segments)
+ assert(writtenData === dataToWrite)
+ }
+
+ test("WriteAheadLogWriter - syncing of data by writing and reading immediately") {
+ val dataToWrite = generateRandomData()
+ val writer = new WriteAheadLogWriter(testFile, hadoopConf)
+ dataToWrite.foreach { data =>
+ val segment = writer.write(stringToByteBuffer(data))
+ val dataRead = readDataManually(testFile, Seq(segment)).head
+ assert(data === dataRead)
+ }
+ writer.close()
+ }
+
+ test("WriteAheadLogReader - sequentially reading data") {
+ val writtenData = generateRandomData()
+ writeDataManually(writtenData, testFile)
+ val reader = new WriteAheadLogReader(testFile, hadoopConf)
+ val readData = reader.toSeq.map(byteBufferToString)
+ assert(readData === writtenData)
+ assert(reader.hasNext === false)
+ intercept[Exception] {
+ reader.next()
+ }
+ reader.close()
+ }
+
+ test("WriteAheadLogReader - sequentially reading data written with writer") {
+ val dataToWrite = generateRandomData()
+ writeDataUsingWriter(testFile, dataToWrite)
+ val readData = readDataUsingReader(testFile)
+ assert(readData === dataToWrite)
+ }
+
+ test("WriteAheadLogReader - reading data written with writer after corrupted write") {
+ // Write data manually for testing the sequential reader
+ val dataToWrite = generateRandomData()
+ writeDataUsingWriter(testFile, dataToWrite)
+ val fileLength = new File(testFile).length()
+
+ // Append some garbage data to get the effect of a corrupted write
+ val fw = new FileWriter(testFile, true)
+ fw.append("This line appended to file!")
+ fw.close()
+
+ // Verify the data can be read and is same as the one correctly written
+ assert(readDataUsingReader(testFile) === dataToWrite)
+
+ // Corrupt the last correctly written file
+ val raf = new FileOutputStream(testFile, true).getChannel()
+ raf.truncate(fileLength - 1)
+ raf.close()
+
+ // Verify all the data except the last can be read
+ assert(readDataUsingReader(testFile) === (dataToWrite.dropRight(1)))
+ }
+
+ test("WriteAheadLogRandomReader - reading data using random reader") {
+ // Write data manually for testing the random reader
+ val writtenData = generateRandomData()
+ val segments = writeDataManually(writtenData, testFile)
+
+ // Get a random order of these segments and read them back
+ val writtenDataAndSegments = writtenData.zip(segments).toSeq.permutations.take(10).flatten
+ val reader = new WriteAheadLogRandomReader(testFile, hadoopConf)
+ writtenDataAndSegments.foreach { case (data, segment) =>
+ assert(data === byteBufferToString(reader.read(segment)))
+ }
+ reader.close()
+ }
+
+ test("WriteAheadLogRandomReader - reading data using random reader written with writer") {
+ // Write data using writer for testing the random reader
+ val data = generateRandomData()
+ val segments = writeDataUsingWriter(testFile, data)
+
+ // Read a random sequence of segments and verify read data
+ val dataAndSegments = data.zip(segments).toSeq.permutations.take(10).flatten
+ val reader = new WriteAheadLogRandomReader(testFile, hadoopConf)
+ dataAndSegments.foreach { case (data, segment) =>
+ assert(data === byteBufferToString(reader.read(segment)))
+ }
+ reader.close()
+ }
+
+ test("WriteAheadLogManager - write rotating logs") {
+ // Write data using manager
+ val dataToWrite = generateRandomData()
+ writeDataUsingManager(testDir, dataToWrite)
+
+ // Read data manually to verify the written data
+ val logFiles = getLogFilesInDirectory(testDir)
+ assert(logFiles.size > 1)
+ val writtenData = logFiles.flatMap { file => readDataManually(file)}
+ assert(writtenData === dataToWrite)
+ }
+
+ test("WriteAheadLogManager - read rotating logs") {
+ // Write data manually for testing reading through manager
+ val writtenData = (1 to 10).map { i =>
+ val data = generateRandomData()
+ val file = testDir + s"/log-$i-$i"
+ writeDataManually(data, file)
+ data
+ }.flatten
+
+ val logDirectoryPath = new Path(testDir)
+ val fileSystem = HdfsUtils.getFileSystemForPath(logDirectoryPath, hadoopConf)
+ assert(fileSystem.exists(logDirectoryPath) === true)
+
+ // Read data using manager and verify
+ val readData = readDataUsingManager(testDir)
+ assert(readData === writtenData)
+ }
+
+ test("WriteAheadLogManager - recover past logs when creating new manager") {
+ // Write data with manager, recover with new manager and verify
+ val dataToWrite = generateRandomData()
+ writeDataUsingManager(testDir, dataToWrite)
+ val logFiles = getLogFilesInDirectory(testDir)
+ assert(logFiles.size > 1)
+ val readData = readDataUsingManager(testDir)
+ assert(dataToWrite === readData)
+ }
+
+ test("WriteAheadLogManager - cleanup old logs") {
+ // Write data with manager, recover with new manager and verify
+ val manualClock = new ManualClock
+ val dataToWrite = generateRandomData()
+ manager = writeDataUsingManager(testDir, dataToWrite, manualClock, stopManager = false)
+ val logFiles = getLogFilesInDirectory(testDir)
+ assert(logFiles.size > 1)
+ manager.cleanupOldLogs(manualClock.currentTime() / 2)
+ eventually(timeout(1 second), interval(10 milliseconds)) {
+ assert(getLogFilesInDirectory(testDir).size < logFiles.size)
+ }
+ }
+
+ test("WriteAheadLogManager - handling file errors while reading rotating logs") {
+ // Generate a set of log files
+ val manualClock = new ManualClock
+ val dataToWrite1 = generateRandomData()
+ writeDataUsingManager(testDir, dataToWrite1, manualClock)
+ val logFiles1 = getLogFilesInDirectory(testDir)
+ assert(logFiles1.size > 1)
+
+
+ // Recover old files and generate a second set of log files
+ val dataToWrite2 = generateRandomData()
+ manualClock.addToTime(100000)
+ writeDataUsingManager(testDir, dataToWrite2, manualClock)
+ val logFiles2 = getLogFilesInDirectory(testDir)
+ assert(logFiles2.size > logFiles1.size)
+
+ // Read the files and verify that all the written data can be read
+ val readData1 = readDataUsingManager(testDir)
+ assert(readData1 === (dataToWrite1 ++ dataToWrite2))
+
+ // Corrupt the first set of files so that they are basically unreadable
+ logFiles1.foreach { f =>
+ val raf = new FileOutputStream(f, true).getChannel()
+ raf.truncate(1)
+ raf.close()
+ }
+
+ // Verify that the corrupted files do not prevent reading of the second set of data
+ val readData = readDataUsingManager(testDir)
+ assert(readData === dataToWrite2)
+ }
+}
+
+object WriteAheadLogSuite {
+
+ private val hadoopConf = new Configuration()
+
+ /** Write data to a file directly and return an array of the file segments written. */
+ def writeDataManually(data: Seq[String], file: String): Seq[WriteAheadLogFileSegment] = {
+ val segments = new ArrayBuffer[WriteAheadLogFileSegment]()
+ val writer = HdfsUtils.getOutputStream(file, hadoopConf)
+ data.foreach { item =>
+ val offset = writer.getPos
+ val bytes = Utils.serialize(item)
+ writer.writeInt(bytes.size)
+ writer.write(bytes)
+ segments += WriteAheadLogFileSegment(file, offset, bytes.size)
+ }
+ writer.close()
+ segments
+ }
+
+ /**
+ * Write data to a file using the writer class and return an array of the file segments written.
+ */
+ def writeDataUsingWriter(filePath: String, data: Seq[String]): Seq[WriteAheadLogFileSegment] = {
+ val writer = new WriteAheadLogWriter(filePath, hadoopConf)
+ val segments = data.map {
+ item => writer.write(item)
+ }
+ writer.close()
+ segments
+ }
+
+ /** Write data to rotating files in log directory using the manager class. */
+ def writeDataUsingManager(
+ logDirectory: String,
+ data: Seq[String],
+ manualClock: ManualClock = new ManualClock,
+ stopManager: Boolean = true
+ ): WriteAheadLogManager = {
+ if (manualClock.currentTime < 100000) manualClock.setTime(10000)
+ val manager = new WriteAheadLogManager(logDirectory, hadoopConf,
+ rollingIntervalSecs = 1, callerName = "WriteAheadLogSuite", clock = manualClock)
+ // Ensure that 500 does not get sorted after 2000, so put a high base value.
+ data.foreach { item =>
+ manualClock.addToTime(500)
+ manager.writeToLog(item)
+ }
+ if (stopManager) manager.stop()
+ manager
+ }
+
+ /** Read data from segments of a log file directly and return the list of records. */
+ def readDataManually(file: String, segments: Seq[WriteAheadLogFileSegment]): Seq[String] = {
+ val reader = HdfsUtils.getInputStream(file, hadoopConf)
+ segments.map { x =>
+ reader.seek(x.offset)
+ val data = new Array[Byte](x.length)
+ reader.readInt()
+ reader.readFully(data)
+ Utils.deserialize[String](data)
+ }
+ }
+
+ /** Read all the data from a log file directly and return the list of records. */
+ def readDataManually(file: String): Seq[String] = {
+ val reader = HdfsUtils.getInputStream(file, hadoopConf)
+ val buffer = new ArrayBuffer[String]
+ try {
+ while (true) {
+ // Read till EOF is thrown
+ val length = reader.readInt()
+ val bytes = new Array[Byte](length)
+ reader.read(bytes)
+ buffer += Utils.deserialize[String](bytes)
+ }
+ } catch {
+ case ex: EOFException =>
+ } finally {
+ reader.close()
+ }
+ buffer
+ }
+
+ /** Read all the data from a log file using the reader class and return the list of records. */
+ def readDataUsingReader(file: String): Seq[String] = {
+ val reader = new WriteAheadLogReader(file, hadoopConf)
+ val readData = reader.toList.map(byteBufferToString)
+ reader.close()
+ readData
+ }
+
+ /** Read all the data in the log files in a directory using the manager class. */
+ def readDataUsingManager(logDirectory: String): Seq[String] = {
+ val manager = new WriteAheadLogManager(logDirectory, hadoopConf,
+ callerName = "WriteAheadLogSuite")
+ val data = manager.readFromLog().map(byteBufferToString).toSeq
+ manager.stop()
+ data
+ }
+
+ /** Get the log files in a directory. */
+ def getLogFilesInDirectory(directory: String): Seq[String] = {
+ val logDirectoryPath = new Path(directory)
+ val fileSystem = HdfsUtils.getFileSystemForPath(logDirectoryPath, hadoopConf)
+
+ if (fileSystem.exists(logDirectoryPath) && fileSystem.getFileStatus(logDirectoryPath).isDir) {
+ fileSystem.listStatus(logDirectoryPath).map {
+ _.getPath.toString.stripPrefix("file:")
+ }.sorted
+ } else {
+ Seq.empty
+ }
+ }
+
+ def generateRandomData(): Seq[String] = {
+ (1 to 100).map { _.toString }
+ }
+
+ implicit def stringToByteBuffer(str: String): ByteBuffer = {
+ ByteBuffer.wrap(Utils.serialize(str))
+ }
+
+ implicit def byteBufferToString(byteBuffer: ByteBuffer): String = {
+ Utils.deserialize[String](byteBuffer.array)
+ }
+}