aboutsummaryrefslogtreecommitdiff
path: root/sql/core/src/test
diff options
context:
space:
mode:
authorTathagata Das <tathagata.das1565@gmail.com>2016-07-25 16:09:22 -0700
committerTathagata Das <tathagata.das1565@gmail.com>2016-07-25 16:09:22 -0700
commitc979c8bba02bc89cb9ad81b212f085a8a5490a07 (patch)
treef2e19b86c36d5be38cfebefecaa1c94de0e924c5 /sql/core/src/test
parent12f490b5c85cdee26d47eb70ad1a1edd00504f21 (diff)
downloadspark-c979c8bba02bc89cb9ad81b212f085a8a5490a07.tar.gz
spark-c979c8bba02bc89cb9ad81b212f085a8a5490a07.tar.bz2
spark-c979c8bba02bc89cb9ad81b212f085a8a5490a07.zip
[SPARK-14131][STREAMING] SQL Improved fix for avoiding potential deadlocks in HDFSMetadataLog
## What changes were proposed in this pull request? Current fix for deadlock disables interrupts in the StreamExecution which getting offsets for all sources, and when writing to any metadata log, to avoid potential deadlocks in HDFSMetadataLog(see JIRA for more details). However, disabling interrupts can have unintended consequences in other sources. So I am making the fix more narrow, by disabling interrupt it only in the HDFSMetadataLog. This is a narrower fix for something risky like disabling interrupt. ## How was this patch tested? Existing tests. Author: Tathagata Das <tathagata.das1565@gmail.com> Closes #14292 from tdas/SPARK-14131.
Diffstat (limited to 'sql/core/src/test')
-rw-r--r--sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLogSuite.scala4
-rw-r--r--sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLogSuite.scala10
-rw-r--r--sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala43
3 files changed, 50 insertions, 7 deletions
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLogSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLogSuite.scala
index a7b2cfe7d0..39fd1f0cd3 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLogSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLogSuite.scala
@@ -190,7 +190,7 @@ class FileStreamSinkLogSuite extends SparkFunSuite with SharedSQLContext {
}
}
- test("compact") {
+ testWithUninterruptibleThread("compact") {
withSQLConf(SQLConf.FILE_SINK_LOG_COMPACT_INTERVAL.key -> "3") {
withFileStreamSinkLog { sinkLog =>
for (batchId <- 0 to 10) {
@@ -210,7 +210,7 @@ class FileStreamSinkLogSuite extends SparkFunSuite with SharedSQLContext {
}
}
- test("delete expired file") {
+ testWithUninterruptibleThread("delete expired file") {
// Set FILE_SINK_LOG_CLEANUP_DELAY to 0 so that we can detect the deleting behaviour
// deterministically
withSQLConf(
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLogSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLogSuite.scala
index ef2b479a56..ab5a2d253b 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLogSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLogSuite.scala
@@ -33,6 +33,7 @@ import org.apache.spark.{SparkConf, SparkFunSuite}
import org.apache.spark.sql.execution.streaming.FakeFileSystem._
import org.apache.spark.sql.execution.streaming.HDFSMetadataLog.{FileContextManager, FileManager, FileSystemManager}
import org.apache.spark.sql.test.SharedSQLContext
+import org.apache.spark.util.UninterruptibleThread
class HDFSMetadataLogSuite extends SparkFunSuite with SharedSQLContext {
@@ -56,7 +57,7 @@ class HDFSMetadataLogSuite extends SparkFunSuite with SharedSQLContext {
}
}
- test("HDFSMetadataLog: basic") {
+ testWithUninterruptibleThread("HDFSMetadataLog: basic") {
withTempDir { temp =>
val dir = new File(temp, "dir") // use non-existent directory to test whether log make the dir
val metadataLog = new HDFSMetadataLog[String](spark, dir.getAbsolutePath)
@@ -81,7 +82,8 @@ class HDFSMetadataLogSuite extends SparkFunSuite with SharedSQLContext {
}
}
- testQuietly("HDFSMetadataLog: fallback from FileContext to FileSystem") {
+ testWithUninterruptibleThread(
+ "HDFSMetadataLog: fallback from FileContext to FileSystem", quietly = true) {
spark.conf.set(
s"fs.$scheme.impl",
classOf[FakeFileSystem].getName)
@@ -101,7 +103,7 @@ class HDFSMetadataLogSuite extends SparkFunSuite with SharedSQLContext {
}
}
- test("HDFSMetadataLog: restart") {
+ testWithUninterruptibleThread("HDFSMetadataLog: restart") {
withTempDir { temp =>
val metadataLog = new HDFSMetadataLog[String](spark, temp.getAbsolutePath)
assert(metadataLog.add(0, "batch0"))
@@ -124,7 +126,7 @@ class HDFSMetadataLogSuite extends SparkFunSuite with SharedSQLContext {
val waiter = new Waiter
val maxBatchId = 100
for (id <- 0 until 10) {
- new Thread() {
+ new UninterruptibleThread(s"HDFSMetadataLog: metadata directory collision - thread $id") {
override def run(): Unit = waiter {
val metadataLog =
new HDFSMetadataLog[String](spark, temp.getAbsolutePath)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala b/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala
index 5286ee5bc2..d4d8e3e4e8 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala
@@ -22,6 +22,7 @@ import java.util.UUID
import scala.language.implicitConversions
import scala.util.Try
+import scala.util.control.NonFatal
import org.apache.hadoop.conf.Configuration
import org.scalatest.BeforeAndAfterAll
@@ -34,7 +35,7 @@ import org.apache.spark.sql.catalyst.FunctionIdentifier
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.util._
import org.apache.spark.sql.execution.FilterExec
-import org.apache.spark.util.Utils
+import org.apache.spark.util.{UninterruptibleThread, Utils}
/**
* Helper trait that should be extended by all SQL test suites.
@@ -247,6 +248,46 @@ private[sql] trait SQLTestUtils
}
}
}
+
+ /** Run a test on a separate [[UninterruptibleThread]]. */
+ protected def testWithUninterruptibleThread(name: String, quietly: Boolean = false)
+ (body: => Unit): Unit = {
+ val timeoutMillis = 10000
+ @transient var ex: Throwable = null
+
+ def runOnThread(): Unit = {
+ val thread = new UninterruptibleThread(s"Testing thread for test $name") {
+ override def run(): Unit = {
+ try {
+ body
+ } catch {
+ case NonFatal(e) =>
+ ex = e
+ }
+ }
+ }
+ thread.setDaemon(true)
+ thread.start()
+ thread.join(timeoutMillis)
+ if (thread.isAlive) {
+ thread.interrupt()
+ // If this interrupt does not work, then this thread is most likely running something that
+ // is not interruptible. There is not much point to wait for the thread to termniate, and
+ // we rather let the JVM terminate the thread on exit.
+ fail(
+ s"Test '$name' running on o.a.s.util.UninterruptibleThread timed out after" +
+ s" $timeoutMillis ms")
+ } else if (ex != null) {
+ throw ex
+ }
+ }
+
+ if (quietly) {
+ testQuietly(name) { runOnThread() }
+ } else {
+ test(name) { runOnThread() }
+ }
+ }
}
private[sql] object SQLTestUtils {