aboutsummaryrefslogtreecommitdiff
path: root/sql/core/src/test/scala/org/apache
diff options
context:
space:
mode:
authorTathagata Das <tathagata.das1565@gmail.com>2016-03-25 20:07:54 -0700
committerTathagata Das <tathagata.das1565@gmail.com>2016-03-25 20:07:54 -0700
commit13945dd83bfa47ebd05181bda5a7c3e412feb5c0 (patch)
tree2503daf88389f98d6f04057f0ff69105bc27c32b /sql/core/src/test/scala/org/apache
parent24587ce433aa30f30a5d1ed6566365f24c222a27 (diff)
downloadspark-13945dd83bfa47ebd05181bda5a7c3e412feb5c0.tar.gz
spark-13945dd83bfa47ebd05181bda5a7c3e412feb5c0.tar.bz2
spark-13945dd83bfa47ebd05181bda5a7c3e412feb5c0.zip
[SPARK-14109][SQL] Fix HDFSMetadataLog to fallback from FileContext to FileSystem API
## What changes were proposed in this pull request? HDFSMetadataLog uses newer FileContext API to achieve atomic renaming. However, FileContext implementations may not exist for many scheme for which there may be FileSystem implementations. In those cases, rather than failing completely, we should fallback to the FileSystem based implementation, and log warning that there may be file consistency issues in case the log directory is concurrently modified. In addition I have also added more tests to increase the code coverage. ## How was this patch tested? Unit test. Tested on cluster with custom file system. Author: Tathagata Das <tathagata.das1565@gmail.com> Closes #11925 from tdas/SPARK-14109.
Diffstat (limited to 'sql/core/src/test/scala/org/apache')
-rw-r--r--sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLogSuite.scala122
-rw-r--r--sql/core/src/test/scala/org/apache/spark/sql/test/SharedSQLContext.scala5
-rw-r--r--sql/core/src/test/scala/org/apache/spark/sql/test/TestSQLContext.scala8
3 files changed, 127 insertions, 8 deletions
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLogSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLogSuite.scala
index 9ed5686d97..d5db9db36b 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLogSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLogSuite.scala
@@ -17,21 +17,48 @@
package org.apache.spark.sql.execution.streaming
+import java.io.{File, FileNotFoundException, IOException}
+import java.net.URI
import java.util.ConcurrentModificationException
+import scala.util.Random
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs._
import org.scalatest.concurrent.AsyncAssertions._
import org.scalatest.time.SpanSugar._
-import org.apache.spark.SparkFunSuite
+import org.apache.spark.{SparkConf, SparkFunSuite}
+import org.apache.spark.sql.execution.streaming.FakeFileSystem._
+import org.apache.spark.sql.execution.streaming.HDFSMetadataLog.{FileContextManager, FileManager, FileSystemManager}
import org.apache.spark.sql.test.SharedSQLContext
class HDFSMetadataLogSuite extends SparkFunSuite with SharedSQLContext {
+ /** To avoid caching of FS objects */
+ override protected val sparkConf =
+ new SparkConf().set(s"spark.hadoop.fs.$scheme.impl.disable.cache", "true")
+
private implicit def toOption[A](a: A): Option[A] = Option(a)
- test("basic") {
+ test("FileManager: FileContextManager") {
withTempDir { temp =>
- val metadataLog = new HDFSMetadataLog[String](sqlContext, temp.getAbsolutePath)
+ val path = new Path(temp.getAbsolutePath)
+ testManager(path, new FileContextManager(path, new Configuration))
+ }
+ }
+
+ test("FileManager: FileSystemManager") {
+ withTempDir { temp =>
+ val path = new Path(temp.getAbsolutePath)
+ testManager(path, new FileSystemManager(path, new Configuration))
+ }
+ }
+
+ test("HDFSMetadataLog: basic") {
+ withTempDir { temp =>
+ val dir = new File(temp, "dir") // use non-existent directory to test whether log make the dir
+ val metadataLog = new HDFSMetadataLog[String](sqlContext, dir.getAbsolutePath)
assert(metadataLog.add(0, "batch0"))
assert(metadataLog.getLatest() === Some(0 -> "batch0"))
assert(metadataLog.get(0) === Some("batch0"))
@@ -53,7 +80,27 @@ class HDFSMetadataLogSuite extends SparkFunSuite with SharedSQLContext {
}
}
- test("restart") {
+ testQuietly("HDFSMetadataLog: fallback from FileContext to FileSystem") {
+ sqlContext.sparkContext.hadoopConfiguration.set(
+ s"fs.$scheme.impl",
+ classOf[FakeFileSystem].getName)
+ withTempDir { temp =>
+ val metadataLog = new HDFSMetadataLog[String](sqlContext, s"$scheme://$temp")
+ assert(metadataLog.add(0, "batch0"))
+ assert(metadataLog.getLatest() === Some(0 -> "batch0"))
+ assert(metadataLog.get(0) === Some("batch0"))
+ assert(metadataLog.get(None, 0) === Array(0 -> "batch0"))
+
+
+ val metadataLog2 = new HDFSMetadataLog[String](sqlContext, s"$scheme://$temp")
+ assert(metadataLog2.get(0) === Some("batch0"))
+ assert(metadataLog2.getLatest() === Some(0 -> "batch0"))
+ assert(metadataLog2.get(None, 0) === Array(0 -> "batch0"))
+
+ }
+ }
+
+ test("HDFSMetadataLog: restart") {
withTempDir { temp =>
val metadataLog = new HDFSMetadataLog[String](sqlContext, temp.getAbsolutePath)
assert(metadataLog.add(0, "batch0"))
@@ -71,7 +118,7 @@ class HDFSMetadataLogSuite extends SparkFunSuite with SharedSQLContext {
}
}
- test("metadata directory collision") {
+ test("HDFSMetadataLog: metadata directory collision") {
withTempDir { temp =>
val waiter = new Waiter
val maxBatchId = 100
@@ -102,4 +149,69 @@ class HDFSMetadataLogSuite extends SparkFunSuite with SharedSQLContext {
assert(metadataLog.get(None, maxBatchId) === (0 to maxBatchId).map(i => (i, i.toString)))
}
}
+
+
+ def testManager(basePath: Path, fm: FileManager): Unit = {
+ // Mkdirs
+ val dir = new Path(s"$basePath/dir/subdir/subsubdir")
+ assert(!fm.exists(dir))
+ fm.mkdirs(dir)
+ assert(fm.exists(dir))
+ fm.mkdirs(dir)
+
+ // List
+ val acceptAllFilter = new PathFilter {
+ override def accept(path: Path): Boolean = true
+ }
+ val rejectAllFilter = new PathFilter {
+ override def accept(path: Path): Boolean = false
+ }
+ assert(fm.list(basePath, acceptAllFilter).exists(_.getPath.getName == "dir"))
+ assert(fm.list(basePath, rejectAllFilter).length === 0)
+
+ // Create
+ val path = new Path(s"$dir/file")
+ assert(!fm.exists(path))
+ fm.create(path).close()
+ assert(fm.exists(path))
+ intercept[IOException] {
+ fm.create(path)
+ }
+
+ // Open and delete
+ fm.open(path)
+ fm.delete(path)
+ assert(!fm.exists(path))
+ intercept[IOException] {
+ fm.open(path)
+ }
+ fm.delete(path) // should not throw exception
+
+ // Rename
+ val path1 = new Path(s"$dir/file1")
+ val path2 = new Path(s"$dir/file2")
+ fm.create(path1).close()
+ assert(fm.exists(path1))
+ fm.rename(path1, path2)
+ intercept[FileNotFoundException] {
+ fm.rename(path1, path2)
+ }
+ val path3 = new Path(s"$dir/file3")
+ fm.create(path3).close()
+ assert(fm.exists(path3))
+ intercept[FileAlreadyExistsException] {
+ fm.rename(path2, path3)
+ }
+ }
+}
+
+/** FakeFileSystem to test fallback of the HDFSMetadataLog from FileContext to FileSystem API */
+class FakeFileSystem extends RawLocalFileSystem {
+ override def getUri: URI = {
+ URI.create(s"$scheme:///")
+ }
+}
+
+object FakeFileSystem {
+ val scheme = s"HDFSMetadataLogSuite${math.abs(Random.nextInt)}"
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSQLContext.scala b/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSQLContext.scala
index c341191c70..914c6a5509 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSQLContext.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSQLContext.scala
@@ -17,6 +17,7 @@
package org.apache.spark.sql.test
+import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext
@@ -25,6 +26,8 @@ import org.apache.spark.sql.SQLContext
*/
trait SharedSQLContext extends SQLTestUtils {
+ protected val sparkConf = new SparkConf()
+
/**
* The [[TestSQLContext]] to use for all tests in this suite.
*
@@ -44,7 +47,7 @@ trait SharedSQLContext extends SQLTestUtils {
protected override def beforeAll(): Unit = {
SQLContext.clearSqlListener()
if (_ctx == null) {
- _ctx = new TestSQLContext
+ _ctx = new TestSQLContext(sparkConf)
}
// Ensure we have initialized the context before calling parent code
super.beforeAll()
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/test/TestSQLContext.scala b/sql/core/src/test/scala/org/apache/spark/sql/test/TestSQLContext.scala
index b3e146fba8..7ab79b12ce 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/test/TestSQLContext.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/test/TestSQLContext.scala
@@ -26,9 +26,13 @@ import org.apache.spark.sql.internal.{SessionState, SQLConf}
*/
private[sql] class TestSQLContext(sc: SparkContext) extends SQLContext(sc) { self =>
- def this() {
+ def this(sparkConf: SparkConf) {
this(new SparkContext("local[2]", "test-sql-context",
- new SparkConf().set("spark.sql.testkey", "true")))
+ sparkConf.set("spark.sql.testkey", "true")))
+ }
+
+ def this() {
+ this(new SparkConf)
}
@transient