diff options
author | Shixiong Zhu <shixiong@databricks.com> | 2016-03-23 00:00:35 -0700 |
---|---|---|
committer | Tathagata Das <tathagata.das1565@gmail.com> | 2016-03-23 00:00:35 -0700 |
commit | abacf5f258e9bc5c9218ddbee3909dfe5c08d0ea (patch) | |
tree | d127515428920b70aeb962ba8ea0b1273a77c58f /sql/core/src | |
parent | 926a93e54b83f1ee596096f3301fef015705b627 (diff) | |
download | spark-abacf5f258e9bc5c9218ddbee3909dfe5c08d0ea.tar.gz spark-abacf5f258e9bc5c9218ddbee3909dfe5c08d0ea.tar.bz2 spark-abacf5f258e9bc5c9218ddbee3909dfe5c08d0ea.zip |
[HOTFIX][SQL] Don't stop ContinuousQuery in quietly
## What changes were proposed in this pull request?
Try to fix a flaky hang
## How was this patch tested?
Existing Jenkins test
Author: Shixiong Zhu <shixiong@databricks.com>
Closes #11909 from zsxwing/hotfix2.
Diffstat (limited to 'sql/core/src')
-rw-r--r-- | sql/core/src/test/scala/org/apache/spark/sql/StreamTest.scala | 13 | ||||
-rw-r--r-- | sql/core/src/test/scala/org/apache/spark/sql/streaming/DataFrameReaderWriterSuite.scala | 24 |
2 files changed, 12 insertions, 25 deletions
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/StreamTest.scala b/sql/core/src/test/scala/org/apache/spark/sql/StreamTest.scala index 62dc492d60..2dd6416853 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/StreamTest.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/StreamTest.scala @@ -65,19 +65,6 @@ import org.apache.spark.util.Utils */ trait StreamTest extends QueryTest with Timeouts { - implicit class RichContinuousQuery(cq: ContinuousQuery) { - def stopQuietly(): Unit = quietly { - try { - failAfter(10.seconds) { - cq.stop() - } - } catch { - case e: TestFailedDueToTimeoutException => - logError(e.getMessage(), e) - } - } - } - implicit class RichSource(s: Source) { def toDF(): DataFrame = Dataset.newDataFrame(sqlContext, StreamingRelation(s)) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/DataFrameReaderWriterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/DataFrameReaderWriterSuite.scala index e485aa837b..c1bab9b577 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/DataFrameReaderWriterSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/DataFrameReaderWriterSuite.scala @@ -72,7 +72,7 @@ class DataFrameReaderWriterSuite extends StreamTest with SharedSQLContext with B private def newMetadataDir = Utils.createTempDir("streaming.metadata").getCanonicalPath after { - sqlContext.streams.active.foreach(_.stopQuietly()) + sqlContext.streams.active.foreach(_.stop()) } test("resolve default source") { @@ -83,7 +83,7 @@ class DataFrameReaderWriterSuite extends StreamTest with SharedSQLContext with B .format("org.apache.spark.sql.streaming.test") .option("checkpointLocation", newMetadataDir) .startStream() - .stopQuietly() + .stop() } test("resolve full class") { @@ -94,7 +94,7 @@ class DataFrameReaderWriterSuite extends StreamTest with SharedSQLContext with B .format("org.apache.spark.sql.streaming.test") .option("checkpointLocation", newMetadataDir) .startStream() - .stopQuietly() + .stop() } test("options") { @@ -121,7 +121,7 @@ class DataFrameReaderWriterSuite extends StreamTest with SharedSQLContext with B .options(map) .option("checkpointLocation", newMetadataDir) .startStream() - .stopQuietly() + .stop() assert(LastOptions.parameters("opt1") == "1") assert(LastOptions.parameters("opt2") == "2") @@ -137,7 +137,7 @@ class DataFrameReaderWriterSuite extends StreamTest with SharedSQLContext with B .format("org.apache.spark.sql.streaming.test") .option("checkpointLocation", newMetadataDir) .startStream() - .stopQuietly() + .stop() assert(LastOptions.partitionColumns == Nil) df.write @@ -145,7 +145,7 @@ class DataFrameReaderWriterSuite extends StreamTest with SharedSQLContext with B .option("checkpointLocation", newMetadataDir) .partitionBy("a") .startStream() - .stopQuietly() + .stop() assert(LastOptions.partitionColumns == Seq("a")) withSQLConf("spark.sql.caseSensitive" -> "false") { @@ -154,7 +154,7 @@ class DataFrameReaderWriterSuite extends StreamTest with SharedSQLContext with B .option("checkpointLocation", newMetadataDir) .partitionBy("A") .startStream() - .stopQuietly() + .stop() assert(LastOptions.partitionColumns == Seq("a")) } @@ -164,7 +164,7 @@ class DataFrameReaderWriterSuite extends StreamTest with SharedSQLContext with B .option("checkpointLocation", newMetadataDir) .partitionBy("b") .startStream() - .stopQuietly() + .stop() } } @@ -182,7 +182,7 @@ class DataFrameReaderWriterSuite extends StreamTest with SharedSQLContext with B .format("org.apache.spark.sql.streaming.test") .option("checkpointLocation", newMetadataDir) .startStream("/test") - .stopQuietly() + .stop() assert(LastOptions.parameters("path") == "/test") } @@ -207,7 +207,7 @@ class DataFrameReaderWriterSuite extends StreamTest with SharedSQLContext with B .option("doubleOpt", 6.7) .option("checkpointLocation", newMetadataDir) .startStream("/test") - .stopQuietly() + .stop() assert(LastOptions.parameters("intOpt") == "56") assert(LastOptions.parameters("boolOpt") == "false") @@ -269,9 +269,9 @@ class DataFrameReaderWriterSuite extends StreamTest with SharedSQLContext with B } // Should be able to start query with that name after stopping the previous query - q1.stopQuietly() + q1.stop() val q5 = startQueryWithName("name") assert(activeStreamNames.contains("name")) - sqlContext.streams.active.foreach(_.stopQuietly()) + sqlContext.streams.active.foreach(_.stop()) } } |