aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorTathagata Das <tathagata.das1565@gmail.com>2016-03-25 20:07:54 -0700
committerTathagata Das <tathagata.das1565@gmail.com>2016-03-25 20:07:54 -0700
commit13945dd83bfa47ebd05181bda5a7c3e412feb5c0 (patch)
tree2503daf88389f98d6f04057f0ff69105bc27c32b
parent24587ce433aa30f30a5d1ed6566365f24c222a27 (diff)
downloadspark-13945dd83bfa47ebd05181bda5a7c3e412feb5c0.tar.gz
spark-13945dd83bfa47ebd05181bda5a7c3e412feb5c0.tar.bz2
spark-13945dd83bfa47ebd05181bda5a7c3e412feb5c0.zip
[SPARK-14109][SQL] Fix HDFSMetadataLog to fallback from FileContext to FileSystem API
## What changes were proposed in this pull request? HDFSMetadataLog uses newer FileContext API to achieve atomic renaming. However, FileContext implementations may not exist for many scheme for which there may be FileSystem implementations. In those cases, rather than failing completely, we should fallback to the FileSystem based implementation, and log warning that there may be file consistency issues in case the log directory is concurrently modified. In addition I have also added more tests to increase the code coverage. ## How was this patch tested? Unit test. Tested on cluster with custom file system. Author: Tathagata Das <tathagata.das1565@gmail.com> Closes #11925 from tdas/SPARK-14109.
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala179
-rw-r--r--sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLogSuite.scala122
-rw-r--r--sql/core/src/test/scala/org/apache/spark/sql/test/SharedSQLContext.scala5
-rw-r--r--sql/core/src/test/scala/org/apache/spark/sql/test/TestSQLContext.scala8
4 files changed, 288 insertions, 26 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala
index f27d23b1cd..9663fee18d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala
@@ -19,11 +19,12 @@ package org.apache.spark.sql.execution.streaming
import java.io.{FileNotFoundException, IOException}
import java.nio.ByteBuffer
-import java.util.{ConcurrentModificationException, EnumSet}
+import java.util.{ConcurrentModificationException, EnumSet, UUID}
import scala.reflect.ClassTag
import org.apache.commons.io.IOUtils
+import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs._
import org.apache.hadoop.fs.permission.FsPermission
@@ -32,6 +33,7 @@ import org.apache.spark.network.util.JavaUtils
import org.apache.spark.serializer.JavaSerializer
import org.apache.spark.sql.SQLContext
+
/**
* A [[MetadataLog]] implementation based on HDFS. [[HDFSMetadataLog]] uses the specified `path`
* as the metadata storage.
@@ -47,17 +49,13 @@ class HDFSMetadataLog[T: ClassTag](sqlContext: SQLContext, path: String)
extends MetadataLog[T]
with Logging {
- private val metadataPath = new Path(path)
+ import HDFSMetadataLog._
- private val fc =
- if (metadataPath.toUri.getScheme == null) {
- FileContext.getFileContext(sqlContext.sparkContext.hadoopConfiguration)
- } else {
- FileContext.getFileContext(metadataPath.toUri, sqlContext.sparkContext.hadoopConfiguration)
- }
+ private val metadataPath = new Path(path)
+ private val fileManager = createFileManager()
- if (!fc.util().exists(metadataPath)) {
- fc.mkdir(metadataPath, FsPermission.getDirDefault, true)
+ if (!fileManager.exists(metadataPath)) {
+ fileManager.mkdirs(metadataPath)
}
/**
@@ -104,10 +102,9 @@ class HDFSMetadataLog[T: ClassTag](sqlContext: SQLContext, path: String)
// Use nextId to create a temp file
var nextId = 0
while (true) {
- val tempPath = new Path(metadataPath, s".${batchId}_$nextId.tmp")
- fc.deleteOnExit(tempPath)
+ val tempPath = new Path(metadataPath, s".${UUID.randomUUID.toString}.tmp")
try {
- val output = fc.create(tempPath, EnumSet.of(CreateFlag.CREATE))
+ val output = fileManager.create(tempPath)
try {
output.write(bytes)
} finally {
@@ -117,7 +114,7 @@ class HDFSMetadataLog[T: ClassTag](sqlContext: SQLContext, path: String)
// Try to commit the batch
// It will fail if there is an existing file (someone has committed the batch)
logDebug(s"Attempting to write log #${batchFile(batchId)}")
- fc.rename(tempPath, batchFile(batchId), Options.Rename.NONE)
+ fileManager.rename(tempPath, batchFile(batchId))
return
} catch {
case e: IOException if isFileAlreadyExistsException(e) =>
@@ -147,6 +144,8 @@ class HDFSMetadataLog[T: ClassTag](sqlContext: SQLContext, path: String)
// metadata path. In addition, the old Streaming also have this issue, people can create
// malicious checkpoint files to crash a Streaming application too.
nextId += 1
+ } finally {
+ fileManager.delete(tempPath)
}
}
}
@@ -160,8 +159,8 @@ class HDFSMetadataLog[T: ClassTag](sqlContext: SQLContext, path: String)
override def get(batchId: Long): Option[T] = {
val batchMetadataFile = batchFile(batchId)
- if (fc.util().exists(batchMetadataFile)) {
- val input = fc.open(batchMetadataFile)
+ if (fileManager.exists(batchMetadataFile)) {
+ val input = fileManager.open(batchMetadataFile)
val bytes = IOUtils.toByteArray(input)
Some(serializer.deserialize[T](ByteBuffer.wrap(bytes)))
} else {
@@ -171,7 +170,7 @@ class HDFSMetadataLog[T: ClassTag](sqlContext: SQLContext, path: String)
}
override def get(startId: Option[Long], endId: Option[Long]): Array[(Long, T)] = {
- val files = fc.util().listStatus(metadataPath, batchFilesFilter)
+ val files = fileManager.list(metadataPath, batchFilesFilter)
val batchIds = files
.map(_.getPath.getName.toLong)
.filter { batchId =>
@@ -184,7 +183,7 @@ class HDFSMetadataLog[T: ClassTag](sqlContext: SQLContext, path: String)
}
override def getLatest(): Option[(Long, T)] = {
- val batchIds = fc.util().listStatus(metadataPath, batchFilesFilter)
+ val batchIds = fileManager.list(metadataPath, batchFilesFilter)
.map(_.getPath.getName.toLong)
.sorted
.reverse
@@ -196,4 +195,148 @@ class HDFSMetadataLog[T: ClassTag](sqlContext: SQLContext, path: String)
}
None
}
+
+ private def createFileManager(): FileManager = {
+ val hadoopConf = sqlContext.sparkContext.hadoopConfiguration
+ try {
+ new FileContextManager(metadataPath, hadoopConf)
+ } catch {
+ case e: UnsupportedFileSystemException =>
+ logWarning("Could not use FileContext API for managing metadata log file. The log may be" +
+ "inconsistent under failures.", e)
+ new FileSystemManager(metadataPath, hadoopConf)
+ }
+ }
+}
+
+object HDFSMetadataLog {
+
+ /** A simple trait to abstract out the file management operations needed by HDFSMetadataLog. */
+ trait FileManager {
+
+ /** List the files in a path that matches a filter. */
+ def list(path: Path, filter: PathFilter): Array[FileStatus]
+
+ /** Make directory at the give path and all its parent directories as needed. */
+ def mkdirs(path: Path): Unit
+
+ /** Whether path exists */
+ def exists(path: Path): Boolean
+
+ /** Open a file for reading, or throw exception if it does not exist. */
+ def open(path: Path): FSDataInputStream
+
+ /** Create path, or throw exception if it already exists */
+ def create(path: Path): FSDataOutputStream
+
+ /**
+ * Atomically rename path, or throw exception if it cannot be done.
+ * Should throw FileNotFoundException if srcPath does not exist.
+ * Should throw FileAlreadyExistsException if destPath already exists.
+ */
+ def rename(srcPath: Path, destPath: Path): Unit
+
+ /** Recursively delete a path if it exists. Should not throw exception if file doesn't exist. */
+ def delete(path: Path): Unit
+ }
+
+ /**
+ * Default implementation of FileManager using newer FileContext API.
+ */
+ class FileContextManager(path: Path, hadoopConf: Configuration) extends FileManager {
+ private val fc = if (path.toUri.getScheme == null) {
+ FileContext.getFileContext(hadoopConf)
+ } else {
+ FileContext.getFileContext(path.toUri, hadoopConf)
+ }
+
+ override def list(path: Path, filter: PathFilter): Array[FileStatus] = {
+ fc.util.listStatus(path, filter)
+ }
+
+ override def rename(srcPath: Path, destPath: Path): Unit = {
+ fc.rename(srcPath, destPath)
+ }
+
+ override def mkdirs(path: Path): Unit = {
+ fc.mkdir(path, FsPermission.getDirDefault, true)
+ }
+
+ override def open(path: Path): FSDataInputStream = {
+ fc.open(path)
+ }
+
+ override def create(path: Path): FSDataOutputStream = {
+ fc.create(path, EnumSet.of(CreateFlag.CREATE))
+ }
+
+ override def exists(path: Path): Boolean = {
+ fc.util().exists(path)
+ }
+
+ override def delete(path: Path): Unit = {
+ try {
+ fc.delete(path, true)
+ } catch {
+ case e: FileNotFoundException =>
+ // ignore if file has already been deleted
+ }
+ }
+ }
+
+ /**
+ * Implementation of FileManager using older FileSystem API. Note that this implementation
+ * cannot provide atomic renaming of paths, hence can lead to consistency issues. This
+ * should be used only as a backup option, when FileContextManager cannot be used.
+ */
+ class FileSystemManager(path: Path, hadoopConf: Configuration) extends FileManager {
+ private val fs = path.getFileSystem(hadoopConf)
+
+ override def list(path: Path, filter: PathFilter): Array[FileStatus] = {
+ fs.listStatus(path, filter)
+ }
+
+ /**
+ * Rename a path. Note that this implementation is not atomic.
+ * @throws FileNotFoundException if source path does not exist.
+ * @throws FileAlreadyExistsException if destination path already exists.
+ * @throws IOException if renaming fails for some unknown reason.
+ */
+ override def rename(srcPath: Path, destPath: Path): Unit = {
+ if (!fs.exists(srcPath)) {
+ throw new FileNotFoundException(s"Source path does not exist: $srcPath")
+ }
+ if (fs.exists(destPath)) {
+ throw new FileAlreadyExistsException(s"Destination path already exists: $destPath")
+ }
+ if (!fs.rename(srcPath, destPath)) {
+ throw new IOException(s"Failed to rename $srcPath to $destPath")
+ }
+ }
+
+ override def mkdirs(path: Path): Unit = {
+ fs.mkdirs(path, FsPermission.getDirDefault)
+ }
+
+ override def open(path: Path): FSDataInputStream = {
+ fs.open(path)
+ }
+
+ override def create(path: Path): FSDataOutputStream = {
+ fs.create(path, false)
+ }
+
+ override def exists(path: Path): Boolean = {
+ fs.exists(path)
+ }
+
+ override def delete(path: Path): Unit = {
+ try {
+ fs.delete(path, true)
+ } catch {
+ case e: FileNotFoundException =>
+ // ignore if file has already been deleted
+ }
+ }
+ }
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLogSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLogSuite.scala
index 9ed5686d97..d5db9db36b 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLogSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLogSuite.scala
@@ -17,21 +17,48 @@
package org.apache.spark.sql.execution.streaming
+import java.io.{File, FileNotFoundException, IOException}
+import java.net.URI
import java.util.ConcurrentModificationException
+import scala.util.Random
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs._
import org.scalatest.concurrent.AsyncAssertions._
import org.scalatest.time.SpanSugar._
-import org.apache.spark.SparkFunSuite
+import org.apache.spark.{SparkConf, SparkFunSuite}
+import org.apache.spark.sql.execution.streaming.FakeFileSystem._
+import org.apache.spark.sql.execution.streaming.HDFSMetadataLog.{FileContextManager, FileManager, FileSystemManager}
import org.apache.spark.sql.test.SharedSQLContext
class HDFSMetadataLogSuite extends SparkFunSuite with SharedSQLContext {
+ /** To avoid caching of FS objects */
+ override protected val sparkConf =
+ new SparkConf().set(s"spark.hadoop.fs.$scheme.impl.disable.cache", "true")
+
private implicit def toOption[A](a: A): Option[A] = Option(a)
- test("basic") {
+ test("FileManager: FileContextManager") {
withTempDir { temp =>
- val metadataLog = new HDFSMetadataLog[String](sqlContext, temp.getAbsolutePath)
+ val path = new Path(temp.getAbsolutePath)
+ testManager(path, new FileContextManager(path, new Configuration))
+ }
+ }
+
+ test("FileManager: FileSystemManager") {
+ withTempDir { temp =>
+ val path = new Path(temp.getAbsolutePath)
+ testManager(path, new FileSystemManager(path, new Configuration))
+ }
+ }
+
+ test("HDFSMetadataLog: basic") {
+ withTempDir { temp =>
+ val dir = new File(temp, "dir") // use non-existent directory to test whether log make the dir
+ val metadataLog = new HDFSMetadataLog[String](sqlContext, dir.getAbsolutePath)
assert(metadataLog.add(0, "batch0"))
assert(metadataLog.getLatest() === Some(0 -> "batch0"))
assert(metadataLog.get(0) === Some("batch0"))
@@ -53,7 +80,27 @@ class HDFSMetadataLogSuite extends SparkFunSuite with SharedSQLContext {
}
}
- test("restart") {
+ testQuietly("HDFSMetadataLog: fallback from FileContext to FileSystem") {
+ sqlContext.sparkContext.hadoopConfiguration.set(
+ s"fs.$scheme.impl",
+ classOf[FakeFileSystem].getName)
+ withTempDir { temp =>
+ val metadataLog = new HDFSMetadataLog[String](sqlContext, s"$scheme://$temp")
+ assert(metadataLog.add(0, "batch0"))
+ assert(metadataLog.getLatest() === Some(0 -> "batch0"))
+ assert(metadataLog.get(0) === Some("batch0"))
+ assert(metadataLog.get(None, 0) === Array(0 -> "batch0"))
+
+
+ val metadataLog2 = new HDFSMetadataLog[String](sqlContext, s"$scheme://$temp")
+ assert(metadataLog2.get(0) === Some("batch0"))
+ assert(metadataLog2.getLatest() === Some(0 -> "batch0"))
+ assert(metadataLog2.get(None, 0) === Array(0 -> "batch0"))
+
+ }
+ }
+
+ test("HDFSMetadataLog: restart") {
withTempDir { temp =>
val metadataLog = new HDFSMetadataLog[String](sqlContext, temp.getAbsolutePath)
assert(metadataLog.add(0, "batch0"))
@@ -71,7 +118,7 @@ class HDFSMetadataLogSuite extends SparkFunSuite with SharedSQLContext {
}
}
- test("metadata directory collision") {
+ test("HDFSMetadataLog: metadata directory collision") {
withTempDir { temp =>
val waiter = new Waiter
val maxBatchId = 100
@@ -102,4 +149,69 @@ class HDFSMetadataLogSuite extends SparkFunSuite with SharedSQLContext {
assert(metadataLog.get(None, maxBatchId) === (0 to maxBatchId).map(i => (i, i.toString)))
}
}
+
+
+ def testManager(basePath: Path, fm: FileManager): Unit = {
+ // Mkdirs
+ val dir = new Path(s"$basePath/dir/subdir/subsubdir")
+ assert(!fm.exists(dir))
+ fm.mkdirs(dir)
+ assert(fm.exists(dir))
+ fm.mkdirs(dir)
+
+ // List
+ val acceptAllFilter = new PathFilter {
+ override def accept(path: Path): Boolean = true
+ }
+ val rejectAllFilter = new PathFilter {
+ override def accept(path: Path): Boolean = false
+ }
+ assert(fm.list(basePath, acceptAllFilter).exists(_.getPath.getName == "dir"))
+ assert(fm.list(basePath, rejectAllFilter).length === 0)
+
+ // Create
+ val path = new Path(s"$dir/file")
+ assert(!fm.exists(path))
+ fm.create(path).close()
+ assert(fm.exists(path))
+ intercept[IOException] {
+ fm.create(path)
+ }
+
+ // Open and delete
+ fm.open(path)
+ fm.delete(path)
+ assert(!fm.exists(path))
+ intercept[IOException] {
+ fm.open(path)
+ }
+ fm.delete(path) // should not throw exception
+
+ // Rename
+ val path1 = new Path(s"$dir/file1")
+ val path2 = new Path(s"$dir/file2")
+ fm.create(path1).close()
+ assert(fm.exists(path1))
+ fm.rename(path1, path2)
+ intercept[FileNotFoundException] {
+ fm.rename(path1, path2)
+ }
+ val path3 = new Path(s"$dir/file3")
+ fm.create(path3).close()
+ assert(fm.exists(path3))
+ intercept[FileAlreadyExistsException] {
+ fm.rename(path2, path3)
+ }
+ }
+}
+
+/** FakeFileSystem to test fallback of the HDFSMetadataLog from FileContext to FileSystem API */
+class FakeFileSystem extends RawLocalFileSystem {
+ override def getUri: URI = {
+ URI.create(s"$scheme:///")
+ }
+}
+
+object FakeFileSystem {
+ val scheme = s"HDFSMetadataLogSuite${math.abs(Random.nextInt)}"
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSQLContext.scala b/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSQLContext.scala
index c341191c70..914c6a5509 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSQLContext.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSQLContext.scala
@@ -17,6 +17,7 @@
package org.apache.spark.sql.test
+import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext
@@ -25,6 +26,8 @@ import org.apache.spark.sql.SQLContext
*/
trait SharedSQLContext extends SQLTestUtils {
+ protected val sparkConf = new SparkConf()
+
/**
* The [[TestSQLContext]] to use for all tests in this suite.
*
@@ -44,7 +47,7 @@ trait SharedSQLContext extends SQLTestUtils {
protected override def beforeAll(): Unit = {
SQLContext.clearSqlListener()
if (_ctx == null) {
- _ctx = new TestSQLContext
+ _ctx = new TestSQLContext(sparkConf)
}
// Ensure we have initialized the context before calling parent code
super.beforeAll()
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/test/TestSQLContext.scala b/sql/core/src/test/scala/org/apache/spark/sql/test/TestSQLContext.scala
index b3e146fba8..7ab79b12ce 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/test/TestSQLContext.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/test/TestSQLContext.scala
@@ -26,9 +26,13 @@ import org.apache.spark.sql.internal.{SessionState, SQLConf}
*/
private[sql] class TestSQLContext(sc: SparkContext) extends SQLContext(sc) { self =>
- def this() {
+ def this(sparkConf: SparkConf) {
this(new SparkContext("local[2]", "test-sql-context",
- new SparkConf().set("spark.sql.testkey", "true")))
+ sparkConf.set("spark.sql.testkey", "true")))
+ }
+
+ def this() {
+ this(new SparkConf)
}
@transient