diff options
author | Tathagata Das <tathagata.das1565@gmail.com> | 2016-03-25 20:07:54 -0700 |
---|---|---|
committer | Tathagata Das <tathagata.das1565@gmail.com> | 2016-03-25 20:07:54 -0700 |
commit | 13945dd83bfa47ebd05181bda5a7c3e412feb5c0 (patch) | |
tree | 2503daf88389f98d6f04057f0ff69105bc27c32b /sql/core/src/test/scala/org/apache | |
parent | 24587ce433aa30f30a5d1ed6566365f24c222a27 (diff) | |
download | spark-13945dd83bfa47ebd05181bda5a7c3e412feb5c0.tar.gz spark-13945dd83bfa47ebd05181bda5a7c3e412feb5c0.tar.bz2 spark-13945dd83bfa47ebd05181bda5a7c3e412feb5c0.zip |
[SPARK-14109][SQL] Fix HDFSMetadataLog to fallback from FileContext to FileSystem API
## What changes were proposed in this pull request?
HDFSMetadataLog uses the newer FileContext API to achieve atomic renaming. However, FileContext implementations may not exist for many schemes for which there may be FileSystem implementations. In those cases, rather than failing completely, we should fall back to the FileSystem-based implementation, and log a warning that there may be file consistency issues in case the log directory is concurrently modified.
In addition, I have added more tests to increase the code coverage.
## How was this patch tested?
Unit test.
Tested on cluster with custom file system.
Author: Tathagata Das <tathagata.das1565@gmail.com>
Closes #11925 from tdas/SPARK-14109.
Diffstat (limited to 'sql/core/src/test/scala/org/apache')
3 files changed, 127 insertions, 8 deletions
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLogSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLogSuite.scala index 9ed5686d97..d5db9db36b 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLogSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLogSuite.scala @@ -17,21 +17,48 @@ package org.apache.spark.sql.execution.streaming +import java.io.{File, FileNotFoundException, IOException} +import java.net.URI import java.util.ConcurrentModificationException +import scala.util.Random + +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.fs._ import org.scalatest.concurrent.AsyncAssertions._ import org.scalatest.time.SpanSugar._ -import org.apache.spark.SparkFunSuite +import org.apache.spark.{SparkConf, SparkFunSuite} +import org.apache.spark.sql.execution.streaming.FakeFileSystem._ +import org.apache.spark.sql.execution.streaming.HDFSMetadataLog.{FileContextManager, FileManager, FileSystemManager} import org.apache.spark.sql.test.SharedSQLContext class HDFSMetadataLogSuite extends SparkFunSuite with SharedSQLContext { + /** To avoid caching of FS objects */ + override protected val sparkConf = + new SparkConf().set(s"spark.hadoop.fs.$scheme.impl.disable.cache", "true") + private implicit def toOption[A](a: A): Option[A] = Option(a) - test("basic") { + test("FileManager: FileContextManager") { withTempDir { temp => - val metadataLog = new HDFSMetadataLog[String](sqlContext, temp.getAbsolutePath) + val path = new Path(temp.getAbsolutePath) + testManager(path, new FileContextManager(path, new Configuration)) + } + } + + test("FileManager: FileSystemManager") { + withTempDir { temp => + val path = new Path(temp.getAbsolutePath) + testManager(path, new FileSystemManager(path, new Configuration)) + } + } + + test("HDFSMetadataLog: basic") { + withTempDir { temp => + val dir = new 
File(temp, "dir") // use non-existent directory to test whether log make the dir + val metadataLog = new HDFSMetadataLog[String](sqlContext, dir.getAbsolutePath) assert(metadataLog.add(0, "batch0")) assert(metadataLog.getLatest() === Some(0 -> "batch0")) assert(metadataLog.get(0) === Some("batch0")) @@ -53,7 +80,27 @@ class HDFSMetadataLogSuite extends SparkFunSuite with SharedSQLContext { } } - test("restart") { + testQuietly("HDFSMetadataLog: fallback from FileContext to FileSystem") { + sqlContext.sparkContext.hadoopConfiguration.set( + s"fs.$scheme.impl", + classOf[FakeFileSystem].getName) + withTempDir { temp => + val metadataLog = new HDFSMetadataLog[String](sqlContext, s"$scheme://$temp") + assert(metadataLog.add(0, "batch0")) + assert(metadataLog.getLatest() === Some(0 -> "batch0")) + assert(metadataLog.get(0) === Some("batch0")) + assert(metadataLog.get(None, 0) === Array(0 -> "batch0")) + + + val metadataLog2 = new HDFSMetadataLog[String](sqlContext, s"$scheme://$temp") + assert(metadataLog2.get(0) === Some("batch0")) + assert(metadataLog2.getLatest() === Some(0 -> "batch0")) + assert(metadataLog2.get(None, 0) === Array(0 -> "batch0")) + + } + } + + test("HDFSMetadataLog: restart") { withTempDir { temp => val metadataLog = new HDFSMetadataLog[String](sqlContext, temp.getAbsolutePath) assert(metadataLog.add(0, "batch0")) @@ -71,7 +118,7 @@ class HDFSMetadataLogSuite extends SparkFunSuite with SharedSQLContext { } } - test("metadata directory collision") { + test("HDFSMetadataLog: metadata directory collision") { withTempDir { temp => val waiter = new Waiter val maxBatchId = 100 @@ -102,4 +149,69 @@ class HDFSMetadataLogSuite extends SparkFunSuite with SharedSQLContext { assert(metadataLog.get(None, maxBatchId) === (0 to maxBatchId).map(i => (i, i.toString))) } } + + + def testManager(basePath: Path, fm: FileManager): Unit = { + // Mkdirs + val dir = new Path(s"$basePath/dir/subdir/subsubdir") + assert(!fm.exists(dir)) + fm.mkdirs(dir) + 
assert(fm.exists(dir)) + fm.mkdirs(dir) + + // List + val acceptAllFilter = new PathFilter { + override def accept(path: Path): Boolean = true + } + val rejectAllFilter = new PathFilter { + override def accept(path: Path): Boolean = false + } + assert(fm.list(basePath, acceptAllFilter).exists(_.getPath.getName == "dir")) + assert(fm.list(basePath, rejectAllFilter).length === 0) + + // Create + val path = new Path(s"$dir/file") + assert(!fm.exists(path)) + fm.create(path).close() + assert(fm.exists(path)) + intercept[IOException] { + fm.create(path) + } + + // Open and delete + fm.open(path) + fm.delete(path) + assert(!fm.exists(path)) + intercept[IOException] { + fm.open(path) + } + fm.delete(path) // should not throw exception + + // Rename + val path1 = new Path(s"$dir/file1") + val path2 = new Path(s"$dir/file2") + fm.create(path1).close() + assert(fm.exists(path1)) + fm.rename(path1, path2) + intercept[FileNotFoundException] { + fm.rename(path1, path2) + } + val path3 = new Path(s"$dir/file3") + fm.create(path3).close() + assert(fm.exists(path3)) + intercept[FileAlreadyExistsException] { + fm.rename(path2, path3) + } + } +} + +/** FakeFileSystem to test fallback of the HDFSMetadataLog from FileContext to FileSystem API */ +class FakeFileSystem extends RawLocalFileSystem { + override def getUri: URI = { + URI.create(s"$scheme:///") + } +} + +object FakeFileSystem { + val scheme = s"HDFSMetadataLogSuite${math.abs(Random.nextInt)}" } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSQLContext.scala b/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSQLContext.scala index c341191c70..914c6a5509 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSQLContext.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSQLContext.scala @@ -17,6 +17,7 @@ package org.apache.spark.sql.test +import org.apache.spark.{SparkConf, SparkContext} import org.apache.spark.sql.SQLContext @@ -25,6 +26,8 @@ import 
org.apache.spark.sql.SQLContext */ trait SharedSQLContext extends SQLTestUtils { + protected val sparkConf = new SparkConf() + /** * The [[TestSQLContext]] to use for all tests in this suite. * @@ -44,7 +47,7 @@ trait SharedSQLContext extends SQLTestUtils { protected override def beforeAll(): Unit = { SQLContext.clearSqlListener() if (_ctx == null) { - _ctx = new TestSQLContext + _ctx = new TestSQLContext(sparkConf) } // Ensure we have initialized the context before calling parent code super.beforeAll() diff --git a/sql/core/src/test/scala/org/apache/spark/sql/test/TestSQLContext.scala b/sql/core/src/test/scala/org/apache/spark/sql/test/TestSQLContext.scala index b3e146fba8..7ab79b12ce 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/test/TestSQLContext.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/test/TestSQLContext.scala @@ -26,9 +26,13 @@ import org.apache.spark.sql.internal.{SessionState, SQLConf} */ private[sql] class TestSQLContext(sc: SparkContext) extends SQLContext(sc) { self => - def this() { + def this(sparkConf: SparkConf) { this(new SparkContext("local[2]", "test-sql-context", - new SparkConf().set("spark.sql.testkey", "true"))) + sparkConf.set("spark.sql.testkey", "true"))) + } + + def this() { + this(new SparkConf) } @transient |