author     Shixiong Zhu <shixiong@databricks.com>        2016-03-23 00:00:35 -0700
committer  Tathagata Das <tathagata.das1565@gmail.com>   2016-03-23 00:00:35 -0700
commit     abacf5f258e9bc5c9218ddbee3909dfe5c08d0ea (patch)
tree       d127515428920b70aeb962ba8ea0b1273a77c58f
parent     926a93e54b83f1ee596096f3301fef015705b627 (diff)
[HOTFIX][SQL] Don't stop ContinuousQuery inside `quietly`
## What changes were proposed in this pull request?

Try to fix a flaky test hang: remove the `stopQuietly` helper from `StreamTest` and call `ContinuousQuery.stop()` directly, so a `stop()` that hangs fails the responsible test instead of being logged and ignored.

## How was this patch tested?

Existing Jenkins tests.

Author: Shixiong Zhu <shixiong@databricks.com>

Closes #11909 from zsxwing/hotfix2.
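For context on the patch below: the removed helper wrapped `ContinuousQuery.stop()` in ScalaTest's `failAfter` and caught the resulting `TestFailedDueToTimeoutException`, only logging it, all inside a log-suppressing `quietly` block. A minimal sketch of the two shutdown styles, using a stand-in `ContinuousQuery` trait and illustrative names (`StopStyles`, `stopQuietly`, `stopLoudly` are not Spark APIs; the `quietly` log suppression is omitted here):

import org.scalatest.concurrent.Timeouts
import org.scalatest.exceptions.TestFailedDueToTimeoutException
import org.scalatest.time.SpanSugar._

// Stand-in for the org.apache.spark.sql.ContinuousQuery interface; illustrative only.
trait ContinuousQuery { def stop(): Unit }

object StopStyles extends Timeouts {
  // Before this patch: a hung stop() becomes a TestFailedDueToTimeoutException
  // that is caught and merely printed, so the owning test still passes while
  // the query thread may keep running and wedge a later test.
  def stopQuietly(cq: ContinuousQuery): Unit = {
    try {
      failAfter(10.seconds) { cq.stop() }
    } catch {
      case e: TestFailedDueToTimeoutException =>
        Console.err.println(s"ignored hang in stop(): ${e.getMessage}")
    }
  }

  // After this patch: stop() is called directly, so a hang blocks (and is
  // reported against) the test that actually owns the query.
  def stopLoudly(cq: ContinuousQuery): Unit = cq.stop()
}

Calling `stop()` directly means a wedged query fails the test that started it, which is easier to localize on Jenkins than a later, unrelated hang.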
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/StreamTest.scala                            | 13
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/streaming/DataFrameReaderWriterSuite.scala  | 24
2 files changed, 12 insertions(+), 25 deletions(-)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/StreamTest.scala b/sql/core/src/test/scala/org/apache/spark/sql/StreamTest.scala
index 62dc492d60..2dd6416853 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/StreamTest.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/StreamTest.scala
@@ -65,19 +65,6 @@ import org.apache.spark.util.Utils
  */
 trait StreamTest extends QueryTest with Timeouts {
 
-  implicit class RichContinuousQuery(cq: ContinuousQuery) {
-    def stopQuietly(): Unit = quietly {
-      try {
-        failAfter(10.seconds) {
-          cq.stop()
-        }
-      } catch {
-        case e: TestFailedDueToTimeoutException =>
-          logError(e.getMessage(), e)
-      }
-    }
-  }
-
   implicit class RichSource(s: Source) {
     def toDF(): DataFrame = Dataset.newDataFrame(sqlContext, StreamingRelation(s))
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/DataFrameReaderWriterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/DataFrameReaderWriterSuite.scala
index e485aa837b..c1bab9b577 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/DataFrameReaderWriterSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/DataFrameReaderWriterSuite.scala
@@ -72,7 +72,7 @@ class DataFrameReaderWriterSuite extends StreamTest with SharedSQLContext with B
   private def newMetadataDir = Utils.createTempDir("streaming.metadata").getCanonicalPath
 
   after {
-    sqlContext.streams.active.foreach(_.stopQuietly())
+    sqlContext.streams.active.foreach(_.stop())
   }
 
   test("resolve default source") {
@@ -83,7 +83,7 @@ class DataFrameReaderWriterSuite extends StreamTest with SharedSQLContext with B
       .format("org.apache.spark.sql.streaming.test")
       .option("checkpointLocation", newMetadataDir)
       .startStream()
-      .stopQuietly()
+      .stop()
   }
 
   test("resolve full class") {
@@ -94,7 +94,7 @@ class DataFrameReaderWriterSuite extends StreamTest with SharedSQLContext with B
       .format("org.apache.spark.sql.streaming.test")
       .option("checkpointLocation", newMetadataDir)
       .startStream()
-      .stopQuietly()
+      .stop()
   }
 
   test("options") {
@@ -121,7 +121,7 @@ class DataFrameReaderWriterSuite extends StreamTest with SharedSQLContext with B
       .options(map)
       .option("checkpointLocation", newMetadataDir)
       .startStream()
-      .stopQuietly()
+      .stop()
 
     assert(LastOptions.parameters("opt1") == "1")
     assert(LastOptions.parameters("opt2") == "2")
@@ -137,7 +137,7 @@ class DataFrameReaderWriterSuite extends StreamTest with SharedSQLContext with B
       .format("org.apache.spark.sql.streaming.test")
       .option("checkpointLocation", newMetadataDir)
       .startStream()
-      .stopQuietly()
+      .stop()
     assert(LastOptions.partitionColumns == Nil)
 
     df.write
@@ -145,7 +145,7 @@ class DataFrameReaderWriterSuite extends StreamTest with SharedSQLContext with B
       .option("checkpointLocation", newMetadataDir)
       .partitionBy("a")
       .startStream()
-      .stopQuietly()
+      .stop()
     assert(LastOptions.partitionColumns == Seq("a"))
 
     withSQLConf("spark.sql.caseSensitive" -> "false") {
@@ -154,7 +154,7 @@ class DataFrameReaderWriterSuite extends StreamTest with SharedSQLContext with B
         .option("checkpointLocation", newMetadataDir)
         .partitionBy("A")
         .startStream()
-        .stopQuietly()
+        .stop()
       assert(LastOptions.partitionColumns == Seq("a"))
     }
 
@@ -164,7 +164,7 @@ class DataFrameReaderWriterSuite extends StreamTest with SharedSQLContext with B
         .option("checkpointLocation", newMetadataDir)
         .partitionBy("b")
         .startStream()
-        .stopQuietly()
+        .stop()
     }
   }
 
@@ -182,7 +182,7 @@ class DataFrameReaderWriterSuite extends StreamTest with SharedSQLContext with B
       .format("org.apache.spark.sql.streaming.test")
       .option("checkpointLocation", newMetadataDir)
       .startStream("/test")
-      .stopQuietly()
+      .stop()
 
     assert(LastOptions.parameters("path") == "/test")
   }
@@ -207,7 +207,7 @@ class DataFrameReaderWriterSuite extends StreamTest with SharedSQLContext with B
       .option("doubleOpt", 6.7)
       .option("checkpointLocation", newMetadataDir)
       .startStream("/test")
-      .stopQuietly()
+      .stop()
 
     assert(LastOptions.parameters("intOpt") == "56")
     assert(LastOptions.parameters("boolOpt") == "false")
@@ -269,9 +269,9 @@ class DataFrameReaderWriterSuite extends StreamTest with SharedSQLContext with B
     }
 
     // Should be able to start query with that name after stopping the previous query
-    q1.stopQuietly()
+    q1.stop()
     val q5 = startQueryWithName("name")
     assert(activeStreamNames.contains("name"))
-    sqlContext.streams.active.foreach(_.stopQuietly())
+    sqlContext.streams.active.foreach(_.stop())
   }
 }