Diffstat (limited to 'streaming')
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/util/HdfsUtils.scala                  72
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/util/WriteAheadLogFileSegment.scala    20
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/util/WriteAheadLogManager.scala       224
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/util/WriteAheadLogRandomReader.scala    55
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/util/WriteAheadLogReader.scala          82
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/util/WriteAheadLogWriter.scala          82
-rw-r--r--  streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala          357
7 files changed, 892 insertions, 0 deletions
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/util/HdfsUtils.scala b/streaming/src/main/scala/org/apache/spark/streaming/util/HdfsUtils.scala
new file mode 100644
index 0000000000..491f117557
--- /dev/null
+++ b/streaming/src/main/scala/org/apache/spark/streaming/util/HdfsUtils.scala
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.streaming.util
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs._
+
+private[streaming] object HdfsUtils {
+
+ def getOutputStream(path: String, conf: Configuration): FSDataOutputStream = {
+ val dfsPath = new Path(path)
+ val dfs = getFileSystemForPath(dfsPath, conf)
+ // If the file exists and we have append support, append instead of creating a new file
+ val stream: FSDataOutputStream = {
+ if (dfs.isFile(dfsPath)) {
+ if (conf.getBoolean("hdfs.append.support", false)) {
+ dfs.append(dfsPath)
+ } else {
+ throw new IllegalStateException("File exists and there is no append support!")
+ }
+ } else {
+ dfs.create(dfsPath)
+ }
+ }
+ stream
+ }
+
+ def getInputStream(path: String, conf: Configuration): FSDataInputStream = {
+ val dfsPath = new Path(path)
+ val dfs = getFileSystemForPath(dfsPath, conf)
+ val instream = dfs.open(dfsPath)
+ instream
+ }
+
+ def checkState(state: Boolean, errorMsg: => String) {
+ if (!state) {
+ throw new IllegalStateException(errorMsg)
+ }
+ }
+
+ def getBlockLocations(path: String, conf: Configuration): Option[Array[String]] = {
+ val dfsPath = new Path(path)
+ val dfs = getFileSystemForPath(dfsPath, conf)
+ val fileStatus = dfs.getFileStatus(dfsPath)
+ val blockLocs = Option(dfs.getFileBlockLocations(fileStatus, 0, fileStatus.getLen))
+ blockLocs.map(_.flatMap(_.getHosts))
+ }
+
+ def getFileSystemForPath(path: Path, conf: Configuration): FileSystem = {
+ // For local file systems, return the raw local file system so that calls to flush()
+ // actually flush the stream.
+ val fs = path.getFileSystem(conf)
+ fs match {
+ case localFs: LocalFileSystem => localFs.getRawFileSystem
+ case _ => fs
+ }
+ }
+}
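
A minimal usage sketch of HdfsUtils (not part of the patch above): it writes one length-prefixed record the way the WAL writer does, then reads it back. The object name and path are hypothetical, and the package clause is only there because HdfsUtils is private[streaming].

package org.apache.spark.streaming.util

import org.apache.hadoop.conf.Configuration

object HdfsUtilsSketch {
  def main(args: Array[String]): Unit = {
    val conf = new Configuration()
    val path = "/tmp/hdfs-utils-sketch/log-0-0" // hypothetical local path

    // Write a single length-prefixed record, the same framing WriteAheadLogWriter uses.
    val out = HdfsUtils.getOutputStream(path, conf)
    val payload = "hello".getBytes("UTF-8")
    out.writeInt(payload.length)
    out.write(payload)
    out.close()

    // Read the record back.
    val in = HdfsUtils.getInputStream(path, conf)
    val buf = new Array[Byte](in.readInt())
    in.readFully(buf)
    in.close()
    assert(new String(buf, "UTF-8") == "hello")
  }
}

Running this twice against the same path would trip the append-support check in getOutputStream, since hdfs.append.support defaults to false.
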
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/util/WriteAheadLogFileSegment.scala b/streaming/src/main/scala/org/apache/spark/streaming/util/WriteAheadLogFileSegment.scala
new file mode 100644
index 0000000000..1005a2c8ec
--- /dev/null
+++ b/streaming/src/main/scala/org/apache/spark/streaming/util/WriteAheadLogFileSegment.scala
@@ -0,0 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.streaming.util
+
+/** Class for representing a segment of data in a write ahead log file */
+private[streaming] case class WriteAheadLogFileSegment (path: String, offset: Long, length: Int)
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/util/WriteAheadLogManager.scala b/streaming/src/main/scala/org/apache/spark/streaming/util/WriteAheadLogManager.scala
new file mode 100644
index 0000000000..70d234320b
--- /dev/null
+++ b/streaming/src/main/scala/org/apache/spark/streaming/util/WriteAheadLogManager.scala
@@ -0,0 +1,224 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.streaming.util
+
+import java.nio.ByteBuffer
+
+import scala.collection.mutable.ArrayBuffer
+import scala.concurrent.{ExecutionContext, Future}
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.Path
+import org.apache.hadoop.fs.permission.FsPermission
+import org.apache.spark.Logging
+import org.apache.spark.util.Utils
+import WriteAheadLogManager._
+
+/**
+ * This class manages write ahead log files.
+ * - Writes records (bytebuffers) to periodically rotating log files.
+ * - Recovers the log files and reads the recovered records upon failure.
+ * - Cleans up old log files.
+ *
+ * Uses [[org.apache.spark.streaming.util.WriteAheadLogWriter]] to write
+ * and [[org.apache.spark.streaming.util.WriteAheadLogReader]] to read.
+ *
+ * @param logDirectory Directory where rotating log files will be created.
+ * @param hadoopConf Hadoop configuration for reading/writing log files.
+ * @param rollingIntervalSecs The interval in seconds at which logs will be rolled over.
+ * Default is one minute.
+ * @param maxFailures Max number of failures tolerated for each attempt to write to the log.
+ * Default is three.
+ * @param callerName Optional name of the class that is using this manager.
+ * @param clock Optional clock that is used to check for rotation interval.
+ */
+private[streaming] class WriteAheadLogManager(
+ logDirectory: String,
+ hadoopConf: Configuration,
+ rollingIntervalSecs: Int = 60,
+ maxFailures: Int = 3,
+ callerName: String = "",
+ clock: Clock = new SystemClock
+ ) extends Logging {
+
+ private val pastLogs = new ArrayBuffer[LogInfo]
+ private val callerNameTag =
+ if (callerName.nonEmpty) s" for $callerName" else ""
+ private val threadpoolName = s"WriteAheadLogManager $callerNameTag"
+ implicit private val executionContext = ExecutionContext.fromExecutorService(
+ Utils.newDaemonFixedThreadPool(1, threadpoolName))
+ override protected val logName = s"WriteAheadLogManager $callerNameTag"
+
+ private var currentLogPath: Option[String] = None
+ private var currentLogWriter: WriteAheadLogWriter = null
+ private var currentLogWriterStartTime: Long = -1L
+ private var currentLogWriterStopTime: Long = -1L
+
+ initializeOrRecover()
+
+ /**
+ * Write a byte buffer to the log file. This method synchronously writes the data in the
+ * ByteBuffer to HDFS. When this method returns, the data is guaranteed to have been flushed
+ * to HDFS, and will be available for readers to read.
+ */
+ def writeToLog(byteBuffer: ByteBuffer): WriteAheadLogFileSegment = synchronized {
+ var fileSegment: WriteAheadLogFileSegment = null
+ var failures = 0
+ var lastException: Exception = null
+ var succeeded = false
+ while (!succeeded && failures < maxFailures) {
+ try {
+ fileSegment = getLogWriter(clock.currentTime).write(byteBuffer)
+ succeeded = true
+ } catch {
+ case ex: Exception =>
+ lastException = ex
+ logWarning("Failed to write to write ahead log")
+ resetWriter()
+ failures += 1
+ }
+ }
+ if (fileSegment == null) {
+ logError(s"Failed to write to write ahead log after $failures failures")
+ throw lastException
+ }
+ fileSegment
+ }
+
+ /**
+ * Read all the existing logs from the log directory.
+ *
+ * Note that this is typically called when the caller is initializing and wants
+ * to recover past state from the write ahead logs (that is, before making any writes).
+ * If this is called after writes have been made using this manager, then it may not return
+ * the latest the records. This does not deal with currently active log files, and
+ * hence the implementation is kept simple.
+ */
+ def readFromLog(): Iterator[ByteBuffer] = synchronized {
+ val logFilesToRead = pastLogs.map{ _.path} ++ currentLogPath
+ logInfo("Reading from the logs: " + logFilesToRead.mkString("\n"))
+ logFilesToRead.iterator.map { file =>
+ logDebug(s"Creating log reader with $file")
+ new WriteAheadLogReader(file, hadoopConf)
+ } flatMap { x => x }
+ }
+
+ /**
+ * Delete the log files that are older than the threshold time.
+ *
+ * It's important to note that the threshold time is based on the timestamps used in the log
+ * file names, which are usually based on the local system time. So if coordination is needed
+ * between the node that calculates the threshTime (say, the driver) and the nodes whose local
+ * system time is used (say, the workers), the caller has to account for possible clock skew.
+ */
+ def cleanupOldLogs(threshTime: Long): Unit = {
+ val oldLogFiles = synchronized { pastLogs.filter { _.endTime < threshTime } }
+ logInfo(s"Attempting to clear ${oldLogFiles.size} old log files in $logDirectory " +
+ s"older than $threshTime: ${oldLogFiles.map { _.path }.mkString("\n")}")
+
+ def deleteFiles() {
+ oldLogFiles.foreach { logInfo =>
+ try {
+ val path = new Path(logInfo.path)
+ val fs = HdfsUtils.getFileSystemForPath(path, hadoopConf)
+ fs.delete(path, true)
+ synchronized { pastLogs -= logInfo }
+ logDebug(s"Cleared log file $logInfo")
+ } catch {
+ case ex: Exception =>
+ logWarning(s"Error clearing write ahead log file $logInfo", ex)
+ }
+ }
+ logInfo(s"Cleared log files in $logDirectory older than $threshTime")
+ }
+ if (!executionContext.isShutdown) {
+ Future { deleteFiles() }
+ }
+ }
+
+ /** Stop the manager, close any open log writer */
+ def stop(): Unit = synchronized {
+ if (currentLogWriter != null) {
+ currentLogWriter.close()
+ }
+ executionContext.shutdown()
+ logInfo("Stopped write ahead log manager")
+ }
+
+ /** Get the current log writer while taking care of rotation */
+ private def getLogWriter(currentTime: Long): WriteAheadLogWriter = synchronized {
+ if (currentLogWriter == null || currentTime > currentLogWriterStopTime) {
+ resetWriter()
+ currentLogPath.foreach {
+ pastLogs += LogInfo(currentLogWriterStartTime, currentLogWriterStopTime, _)
+ }
+ currentLogWriterStartTime = currentTime
+ currentLogWriterStopTime = currentTime + (rollingIntervalSecs * 1000)
+ val newLogPath = new Path(logDirectory,
+ timeToLogFile(currentLogWriterStartTime, currentLogWriterStopTime))
+ currentLogPath = Some(newLogPath.toString)
+ currentLogWriter = new WriteAheadLogWriter(currentLogPath.get, hadoopConf)
+ }
+ currentLogWriter
+ }
+
+ /** Initialize the log directory or recover existing logs inside the directory */
+ private def initializeOrRecover(): Unit = synchronized {
+ val logDirectoryPath = new Path(logDirectory)
+ val fileSystem = HdfsUtils.getFileSystemForPath(logDirectoryPath, hadoopConf)
+
+ if (fileSystem.exists(logDirectoryPath) && fileSystem.getFileStatus(logDirectoryPath).isDir) {
+ val logFileInfo = logFilesTologInfo(fileSystem.listStatus(logDirectoryPath).map { _.getPath })
+ pastLogs.clear()
+ pastLogs ++= logFileInfo
+ logInfo(s"Recovered ${logFileInfo.size} write ahead log files from $logDirectory")
+ logDebug(s"Recovered files are:\n${logFileInfo.map(_.path).mkString("\n")}")
+ }
+ }
+
+ private def resetWriter(): Unit = synchronized {
+ if (currentLogWriter != null) {
+ currentLogWriter.close()
+ currentLogWriter = null
+ }
+ }
+}
+
+private[util] object WriteAheadLogManager {
+
+ case class LogInfo(startTime: Long, endTime: Long, path: String)
+
+ val logFileRegex = """log-(\d+)-(\d+)""".r
+
+ def timeToLogFile(startTime: Long, stopTime: Long): String = {
+ s"log-$startTime-$stopTime"
+ }
+
+ /** Convert a sequence of files to a sequence of sorted LogInfo objects */
+ def logFilesTologInfo(files: Seq[Path]): Seq[LogInfo] = {
+ files.flatMap { file =>
+ logFileRegex.findFirstIn(file.getName()) match {
+ case Some(logFileRegex(startTimeStr, stopTimeStr)) =>
+ val startTime = startTimeStr.toLong
+ val stopTime = stopTimeStr.toLong
+ Some(LogInfo(startTime, stopTime, file.toString))
+ case None =>
+ None
+ }
+ }.sortBy { _.startTime }
+ }
+}
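
A minimal sketch of the manager's write / recover / cleanup cycle (not part of the patch): one manager writes a few records into rolling files, a fresh manager then recovers and replays them, and cleanupOldLogs schedules deletion of expired files. The directory and object names are hypothetical; the package clause is needed because the class is private[streaming].

package org.apache.spark.streaming.util

import java.nio.ByteBuffer

import org.apache.hadoop.conf.Configuration

object WriteAheadLogManagerSketch {
  def main(args: Array[String]): Unit = {
    val conf = new Configuration()
    val logDir = "/tmp/wal-manager-sketch" // hypothetical log directory

    // Write a few records; each write goes to the current rolling log file and
    // returns a segment that the caller could keep for random access later.
    val writeManager = new WriteAheadLogManager(logDir, conf,
      rollingIntervalSecs = 1, callerName = "Sketch")
    (1 to 5).foreach { i =>
      writeManager.writeToLog(ByteBuffer.wrap(s"record-$i".getBytes("UTF-8")))
    }
    writeManager.stop()

    // Recovery-style read with a fresh manager: it lists the directory and
    // replays every record from the recovered log files.
    val recoveryManager = new WriteAheadLogManager(logDir, conf, callerName = "Sketch")
    val recovered = recoveryManager.readFromLog().map { bb =>
      new String(bb.array(), "UTF-8")
    }.toList
    println(recovered)

    // Schedule deletion of log files whose end timestamp is older than the threshold.
    recoveryManager.cleanupOldLogs(System.currentTimeMillis() - 60 * 1000)
    recoveryManager.stop()
  }
}

Note that cleanupOldLogs only submits the deletion to the manager's single-threaded executor, so it returns before any files are actually removed.
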
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/util/WriteAheadLogRandomReader.scala b/streaming/src/main/scala/org/apache/spark/streaming/util/WriteAheadLogRandomReader.scala
new file mode 100644
index 0000000000..92bad7a882
--- /dev/null
+++ b/streaming/src/main/scala/org/apache/spark/streaming/util/WriteAheadLogRandomReader.scala
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.streaming.util
+
+import java.io.Closeable
+import java.nio.ByteBuffer
+
+import org.apache.hadoop.conf.Configuration
+
+/**
+ * A random access reader for reading write ahead log files written using
+ * [[org.apache.spark.streaming.util.WriteAheadLogWriter]]. Given the file segment info,
+ * this reads the record (bytebuffer) from the log file.
+ */
+private[streaming] class WriteAheadLogRandomReader(path: String, conf: Configuration)
+ extends Closeable {
+
+ private val instream = HdfsUtils.getInputStream(path, conf)
+ private var closed = false
+
+ def read(segment: WriteAheadLogFileSegment): ByteBuffer = synchronized {
+ assertOpen()
+ instream.seek(segment.offset)
+ val nextLength = instream.readInt()
+ HdfsUtils.checkState(nextLength == segment.length,
+ s"Expected message length to be ${segment.length}, but was $nextLength")
+ val buffer = new Array[Byte](nextLength)
+ instream.readFully(buffer)
+ ByteBuffer.wrap(buffer)
+ }
+
+ override def close(): Unit = synchronized {
+ closed = true
+ instream.close()
+ }
+
+ private def assertOpen() {
+ HdfsUtils.checkState(!closed, "Stream is closed. Create a new Reader to read from the file.")
+ }
+}
+
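
A minimal sketch (not part of the patch) of the writer / random-reader pairing: the segment returned by write() is all the random reader needs to seek to the record and re-read it. The path and object name are hypothetical; the package clause is needed because both classes are private[streaming].

package org.apache.spark.streaming.util

import java.nio.ByteBuffer

import org.apache.hadoop.conf.Configuration

object RandomReadSketch {
  def main(args: Array[String]): Unit = {
    val conf = new Configuration()
    val path = "/tmp/wal-random-read-sketch" // hypothetical log file path

    // Write one record and keep the returned segment (path, offset, length).
    val writer = new WriteAheadLogWriter(path, conf)
    val segment = writer.write(ByteBuffer.wrap("payload".getBytes("UTF-8")))
    writer.close()

    // Seek straight to the segment's offset and read back just that record.
    val reader = new WriteAheadLogRandomReader(path, conf)
    val read = new String(reader.read(segment).array(), "UTF-8")
    reader.close()
    assert(read == "payload")
  }
}
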
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/util/WriteAheadLogReader.scala b/streaming/src/main/scala/org/apache/spark/streaming/util/WriteAheadLogReader.scala
new file mode 100644
index 0000000000..2afc0d1551
--- /dev/null
+++ b/streaming/src/main/scala/org/apache/spark/streaming/util/WriteAheadLogReader.scala
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.streaming.util
+
+import java.io.{Closeable, EOFException}
+import java.nio.ByteBuffer
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.spark.Logging
+
+/**
+ * A reader for reading write ahead log files written using
+ * [[org.apache.spark.streaming.util.WriteAheadLogWriter]]. This reads
+ * the records (bytebuffers) in the log file sequentially and returns them as an
+ * iterator of bytebuffers.
+ */
+private[streaming] class WriteAheadLogReader(path: String, conf: Configuration)
+ extends Iterator[ByteBuffer] with Closeable with Logging {
+
+ private val instream = HdfsUtils.getInputStream(path, conf)
+ private var closed = false
+ private var nextItem: Option[ByteBuffer] = None
+
+ override def hasNext: Boolean = synchronized {
+ if (closed) {
+ return false
+ }
+
+ if (nextItem.isDefined) { // handle the case where hasNext is called without calling next
+ true
+ } else {
+ try {
+ val length = instream.readInt()
+ val buffer = new Array[Byte](length)
+ instream.readFully(buffer)
+ nextItem = Some(ByteBuffer.wrap(buffer))
+ logTrace("Read next item " + nextItem.get)
+ true
+ } catch {
+ case e: EOFException =>
+ logDebug("Error reading next item, EOF reached", e)
+ close()
+ false
+ case e: Exception =>
+ logWarning("Error while trying to read data from HDFS.", e)
+ close()
+ throw e
+ }
+ }
+ }
+
+ override def next(): ByteBuffer = synchronized {
+ val data = nextItem.getOrElse {
+ close()
+ throw new IllegalStateException(
+ "next called without calling hasNext or after hasNext returned false")
+ }
+ nextItem = None // Ensure the next hasNext call loads new data.
+ data
+ }
+
+ override def close(): Unit = synchronized {
+ if (!closed) {
+ instream.close()
+ }
+ closed = true
+ }
+}
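
A minimal sketch (not part of the patch) of sequential iteration with WriteAheadLogReader over a log file such as the one produced in the previous sketch. hasNext performs the actual reads, so the iterator closes the underlying stream once it reaches EOF. The path and object name are hypothetical; the package clause is needed because the class is private[streaming].

package org.apache.spark.streaming.util

import org.apache.hadoop.conf.Configuration

object SequentialReadSketch {
  def main(args: Array[String]): Unit = {
    // Iterate over every record in a previously written log file.
    val reader = new WriteAheadLogReader("/tmp/wal-random-read-sketch", new Configuration())
    reader.foreach { bb =>
      println(new String(bb.array(), "UTF-8"))
    }
    reader.close() // safe even after EOF; close() is guarded by the closed flag
  }
}
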
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/util/WriteAheadLogWriter.scala b/streaming/src/main/scala/org/apache/spark/streaming/util/WriteAheadLogWriter.scala
new file mode 100644
index 0000000000..679f6a6dfd
--- /dev/null
+++ b/streaming/src/main/scala/org/apache/spark/streaming/util/WriteAheadLogWriter.scala
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.streaming.util
+
+import java.io._
+import java.net.URI
+import java.nio.ByteBuffer
+
+import scala.util.Try
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.{FSDataOutputStream, FileSystem}
+
+/**
+ * A writer for writing byte-buffers to a write ahead log file.
+ */
+private[streaming] class WriteAheadLogWriter(path: String, hadoopConf: Configuration)
+ extends Closeable {
+
+ private lazy val stream = HdfsUtils.getOutputStream(path, hadoopConf)
+
+ private lazy val hadoopFlushMethod = {
+ // Use reflection to get the right flush operation
+ val cls = classOf[FSDataOutputStream]
+ Try(cls.getMethod("hflush")).orElse(Try(cls.getMethod("sync"))).toOption
+ }
+
+ private var nextOffset = stream.getPos()
+ private var closed = false
+
+ /** Write the bytebuffer to the log file */
+ def write(data: ByteBuffer): WriteAheadLogFileSegment = synchronized {
+ assertOpen()
+ data.rewind() // Rewind to ensure all data in the buffer is retrieved
+ val lengthToWrite = data.remaining()
+ val segment = new WriteAheadLogFileSegment(path, nextOffset, lengthToWrite)
+ stream.writeInt(lengthToWrite)
+ if (data.hasArray) {
+ stream.write(data.array())
+ } else {
+ // If the buffer is not backed by an array, we copy it out through a temporary array.
+ // Note that despite the extra array copy, this should be faster than a byte-by-byte copy.
+ while (data.hasRemaining) {
+ val array = new Array[Byte](data.remaining)
+ data.get(array)
+ stream.write(array)
+ }
+ }
+ flush()
+ nextOffset = stream.getPos()
+ segment
+ }
+
+ override def close(): Unit = synchronized {
+ closed = true
+ stream.close()
+ }
+
+ private def flush() {
+ hadoopFlushMethod.foreach { _.invoke(stream) }
+ // Useful for local file system where hflush/sync does not work (HADOOP-7844)
+ stream.getWrappedStream.flush()
+ }
+
+ private def assertOpen() {
+ HdfsUtils.checkState(!closed, "Stream is closed. Create a new Writer to write to file.")
+ }
+}
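
A minimal sketch (not part of the patch) exercising the writer's non-array-backed branch: a direct ByteBuffer has no backing array, so write() copies it out through a temporary array before writing. The path and object name are hypothetical; the package clause is needed because the class is private[streaming].

package org.apache.spark.streaming.util

import java.nio.ByteBuffer

import org.apache.hadoop.conf.Configuration

object DirectBufferWriteSketch {
  def main(args: Array[String]): Unit = {
    val writer = new WriteAheadLogWriter("/tmp/wal-direct-buffer-sketch", new Configuration())

    // A direct buffer reports hasArray == false, so write() takes the temporary-array path.
    val direct = ByteBuffer.allocateDirect(64)
    direct.put("direct-bytes".getBytes("UTF-8"))
    direct.flip() // expose only the bytes just written

    val segment = writer.write(direct)
    writer.close()
    println(s"wrote ${segment.length} bytes at offset ${segment.offset}")
  }
}
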
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala
new file mode 100644
index 0000000000..5eba93c208
--- /dev/null
+++ b/streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala
@@ -0,0 +1,357 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.streaming.util
+
+import java.io._
+import java.nio.ByteBuffer
+
+import scala.collection.mutable.ArrayBuffer
+import scala.concurrent.duration._
+import scala.language.{implicitConversions, postfixOps}
+import scala.util.Random
+
+import WriteAheadLogSuite._
+import com.google.common.io.Files
+import org.apache.commons.io.FileUtils
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.Path
+import org.apache.spark.util.Utils
+import org.scalatest.{BeforeAndAfter, FunSuite}
+import org.scalatest.concurrent.Eventually._
+
+class WriteAheadLogSuite extends FunSuite with BeforeAndAfter {
+
+ val hadoopConf = new Configuration()
+ var tempDir: File = null
+ var testDir: String = null
+ var testFile: String = null
+ var manager: WriteAheadLogManager = null
+
+ before {
+ tempDir = Files.createTempDir()
+ testDir = tempDir.toString
+ testFile = new File(tempDir, Random.nextString(10)).toString
+ if (manager != null) {
+ manager.stop()
+ manager = null
+ }
+ }
+
+ after {
+ FileUtils.deleteQuietly(tempDir)
+ }
+
+ test("WriteAheadLogWriter - writing data") {
+ val dataToWrite = generateRandomData()
+ val segments = writeDataUsingWriter(testFile, dataToWrite)
+ val writtenData = readDataManually(testFile, segments)
+ assert(writtenData === dataToWrite)
+ }
+
+ test("WriteAheadLogWriter - syncing of data by writing and reading immediately") {
+ val dataToWrite = generateRandomData()
+ val writer = new WriteAheadLogWriter(testFile, hadoopConf)
+ dataToWrite.foreach { data =>
+ val segment = writer.write(stringToByteBuffer(data))
+ val dataRead = readDataManually(testFile, Seq(segment)).head
+ assert(data === dataRead)
+ }
+ writer.close()
+ }
+
+ test("WriteAheadLogReader - sequentially reading data") {
+ val writtenData = generateRandomData()
+ writeDataManually(writtenData, testFile)
+ val reader = new WriteAheadLogReader(testFile, hadoopConf)
+ val readData = reader.toSeq.map(byteBufferToString)
+ assert(readData === writtenData)
+ assert(reader.hasNext === false)
+ intercept[Exception] {
+ reader.next()
+ }
+ reader.close()
+ }
+
+ test("WriteAheadLogReader - sequentially reading data written with writer") {
+ val dataToWrite = generateRandomData()
+ writeDataUsingWriter(testFile, dataToWrite)
+ val readData = readDataUsingReader(testFile)
+ assert(readData === dataToWrite)
+ }
+
+ test("WriteAheadLogReader - reading data written with writer after corrupted write") {
+ // Write data using the writer for testing the sequential reader
+ val dataToWrite = generateRandomData()
+ writeDataUsingWriter(testFile, dataToWrite)
+ val fileLength = new File(testFile).length()
+
+ // Append some garbage data to get the effect of a corrupted write
+ val fw = new FileWriter(testFile, true)
+ fw.append("This line appended to file!")
+ fw.close()
+
+ // Verify the data can be read and is same as the one correctly written
+ assert(readDataUsingReader(testFile) === dataToWrite)
+
+ // Corrupt the last correctly written file
+ val raf = new FileOutputStream(testFile, true).getChannel()
+ raf.truncate(fileLength - 1)
+ raf.close()
+
+ // Verify all the data except the last can be read
+ assert(readDataUsingReader(testFile) === (dataToWrite.dropRight(1)))
+ }
+
+ test("WriteAheadLogRandomReader - reading data using random reader") {
+ // Write data manually for testing the random reader
+ val writtenData = generateRandomData()
+ val segments = writeDataManually(writtenData, testFile)
+
+ // Get a random order of these segments and read them back
+ val writtenDataAndSegments = writtenData.zip(segments).toSeq.permutations.take(10).flatten
+ val reader = new WriteAheadLogRandomReader(testFile, hadoopConf)
+ writtenDataAndSegments.foreach { case (data, segment) =>
+ assert(data === byteBufferToString(reader.read(segment)))
+ }
+ reader.close()
+ }
+
+ test("WriteAheadLogRandomReader - reading data using random reader written with writer") {
+ // Write data using writer for testing the random reader
+ val data = generateRandomData()
+ val segments = writeDataUsingWriter(testFile, data)
+
+ // Read a random sequence of segments and verify read data
+ val dataAndSegments = data.zip(segments).toSeq.permutations.take(10).flatten
+ val reader = new WriteAheadLogRandomReader(testFile, hadoopConf)
+ dataAndSegments.foreach { case (data, segment) =>
+ assert(data === byteBufferToString(reader.read(segment)))
+ }
+ reader.close()
+ }
+
+ test("WriteAheadLogManager - write rotating logs") {
+ // Write data using manager
+ val dataToWrite = generateRandomData()
+ writeDataUsingManager(testDir, dataToWrite)
+
+ // Read data manually to verify the written data
+ val logFiles = getLogFilesInDirectory(testDir)
+ assert(logFiles.size > 1)
+ val writtenData = logFiles.flatMap { file => readDataManually(file)}
+ assert(writtenData === dataToWrite)
+ }
+
+ test("WriteAheadLogManager - read rotating logs") {
+ // Write data manually for testing reading through manager
+ val writtenData = (1 to 10).map { i =>
+ val data = generateRandomData()
+ val file = testDir + s"/log-$i-$i"
+ writeDataManually(data, file)
+ data
+ }.flatten
+
+ val logDirectoryPath = new Path(testDir)
+ val fileSystem = HdfsUtils.getFileSystemForPath(logDirectoryPath, hadoopConf)
+ assert(fileSystem.exists(logDirectoryPath) === true)
+
+ // Read data using manager and verify
+ val readData = readDataUsingManager(testDir)
+ assert(readData === writtenData)
+ }
+
+ test("WriteAheadLogManager - recover past logs when creating new manager") {
+ // Write data with manager, recover with new manager and verify
+ val dataToWrite = generateRandomData()
+ writeDataUsingManager(testDir, dataToWrite)
+ val logFiles = getLogFilesInDirectory(testDir)
+ assert(logFiles.size > 1)
+ val readData = readDataUsingManager(testDir)
+ assert(dataToWrite === readData)
+ }
+
+ test("WriteAheadLogManager - cleanup old logs") {
+ // Write data with the manager, then clean up the older log files and verify they are deleted
+ val manualClock = new ManualClock
+ val dataToWrite = generateRandomData()
+ manager = writeDataUsingManager(testDir, dataToWrite, manualClock, stopManager = false)
+ val logFiles = getLogFilesInDirectory(testDir)
+ assert(logFiles.size > 1)
+ manager.cleanupOldLogs(manualClock.currentTime() / 2)
+ eventually(timeout(1 second), interval(10 milliseconds)) {
+ assert(getLogFilesInDirectory(testDir).size < logFiles.size)
+ }
+ }
+
+ test("WriteAheadLogManager - handling file errors while reading rotating logs") {
+ // Generate a set of log files
+ val manualClock = new ManualClock
+ val dataToWrite1 = generateRandomData()
+ writeDataUsingManager(testDir, dataToWrite1, manualClock)
+ val logFiles1 = getLogFilesInDirectory(testDir)
+ assert(logFiles1.size > 1)
+
+
+ // Recover old files and generate a second set of log files
+ val dataToWrite2 = generateRandomData()
+ manualClock.addToTime(100000)
+ writeDataUsingManager(testDir, dataToWrite2, manualClock)
+ val logFiles2 = getLogFilesInDirectory(testDir)
+ assert(logFiles2.size > logFiles1.size)
+
+ // Read the files and verify that all the written data can be read
+ val readData1 = readDataUsingManager(testDir)
+ assert(readData1 === (dataToWrite1 ++ dataToWrite2))
+
+ // Corrupt the first set of files so that they are basically unreadable
+ logFiles1.foreach { f =>
+ val raf = new FileOutputStream(f, true).getChannel()
+ raf.truncate(1)
+ raf.close()
+ }
+
+ // Verify that the corrupted files do not prevent reading of the second set of data
+ val readData = readDataUsingManager(testDir)
+ assert(readData === dataToWrite2)
+ }
+}
+
+object WriteAheadLogSuite {
+
+ private val hadoopConf = new Configuration()
+
+ /** Write data to a file directly and return the sequence of file segments written. */
+ def writeDataManually(data: Seq[String], file: String): Seq[WriteAheadLogFileSegment] = {
+ val segments = new ArrayBuffer[WriteAheadLogFileSegment]()
+ val writer = HdfsUtils.getOutputStream(file, hadoopConf)
+ data.foreach { item =>
+ val offset = writer.getPos
+ val bytes = Utils.serialize(item)
+ writer.writeInt(bytes.size)
+ writer.write(bytes)
+ segments += WriteAheadLogFileSegment(file, offset, bytes.size)
+ }
+ writer.close()
+ segments
+ }
+
+ /**
+ * Write data to a file using the writer class and return the sequence of file segments written.
+ */
+ def writeDataUsingWriter(filePath: String, data: Seq[String]): Seq[WriteAheadLogFileSegment] = {
+ val writer = new WriteAheadLogWriter(filePath, hadoopConf)
+ val segments = data.map {
+ item => writer.write(item)
+ }
+ writer.close()
+ segments
+ }
+
+ /** Write data to rotating files in log directory using the manager class. */
+ def writeDataUsingManager(
+ logDirectory: String,
+ data: Seq[String],
+ manualClock: ManualClock = new ManualClock,
+ stopManager: Boolean = true
+ ): WriteAheadLogManager = {
+ // Ensure that 500 does not get sorted after 2000, so put a high base value.
+ if (manualClock.currentTime < 100000) manualClock.setTime(10000)
+ val manager = new WriteAheadLogManager(logDirectory, hadoopConf,
+ rollingIntervalSecs = 1, callerName = "WriteAheadLogSuite", clock = manualClock)
+ data.foreach { item =>
+ manualClock.addToTime(500)
+ manager.writeToLog(item)
+ }
+ if (stopManager) manager.stop()
+ manager
+ }
+
+ /** Read data from the given segments of a log file directly and return the list of records. */
+ def readDataManually(file: String, segments: Seq[WriteAheadLogFileSegment]): Seq[String] = {
+ val reader = HdfsUtils.getInputStream(file, hadoopConf)
+ segments.map { x =>
+ reader.seek(x.offset)
+ val data = new Array[Byte](x.length)
+ reader.readInt()
+ reader.readFully(data)
+ Utils.deserialize[String](data)
+ }
+ }
+
+ /** Read all the data from a log file directly and return the list of byte buffers. */
+ def readDataManually(file: String): Seq[String] = {
+ val reader = HdfsUtils.getInputStream(file, hadoopConf)
+ val buffer = new ArrayBuffer[String]
+ try {
+ while (true) {
+ // Read till EOF is thrown
+ val length = reader.readInt()
+ val bytes = new Array[Byte](length)
+ reader.readFully(bytes)
+ buffer += Utils.deserialize[String](bytes)
+ }
+ } catch {
+ case ex: EOFException =>
+ } finally {
+ reader.close()
+ }
+ buffer
+ }
+
+ /** Read all the data from a log file using the reader class and return the list of records. */
+ def readDataUsingReader(file: String): Seq[String] = {
+ val reader = new WriteAheadLogReader(file, hadoopConf)
+ val readData = reader.toList.map(byteBufferToString)
+ reader.close()
+ readData
+ }
+
+ /** Read all the data in the log files in a directory using the manager class. */
+ def readDataUsingManager(logDirectory: String): Seq[String] = {
+ val manager = new WriteAheadLogManager(logDirectory, hadoopConf,
+ callerName = "WriteAheadLogSuite")
+ val data = manager.readFromLog().map(byteBufferToString).toSeq
+ manager.stop()
+ data
+ }
+
+ /** Get the log files in a directory. */
+ def getLogFilesInDirectory(directory: String): Seq[String] = {
+ val logDirectoryPath = new Path(directory)
+ val fileSystem = HdfsUtils.getFileSystemForPath(logDirectoryPath, hadoopConf)
+
+ if (fileSystem.exists(logDirectoryPath) && fileSystem.getFileStatus(logDirectoryPath).isDir) {
+ fileSystem.listStatus(logDirectoryPath).map {
+ _.getPath.toString.stripPrefix("file:")
+ }.sorted
+ } else {
+ Seq.empty
+ }
+ }
+
+ def generateRandomData(): Seq[String] = {
+ (1 to 100).map { _.toString }
+ }
+
+ implicit def stringToByteBuffer(str: String): ByteBuffer = {
+ ByteBuffer.wrap(Utils.serialize(str))
+ }
+
+ implicit def byteBufferToString(byteBuffer: ByteBuffer): String = {
+ Utils.deserialize[String](byteBuffer.array)
+ }
+}