aboutsummaryrefslogtreecommitdiff
path: root/sql/core/src/test/scala/org/apache/spark/sql/streaming/ContinuousQueryManagerSuite.scala
diff options
context:
space:
mode:
Diffstat (limited to 'sql/core/src/test/scala/org/apache/spark/sql/streaming/ContinuousQueryManagerSuite.scala')
-rw-r--r--sql/core/src/test/scala/org/apache/spark/sql/streaming/ContinuousQueryManagerSuite.scala13
1 files changed, 7 insertions, 6 deletions
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/ContinuousQueryManagerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/ContinuousQueryManagerSuite.scala
index 54ce98d195..3d69c8a187 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/ContinuousQueryManagerSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/ContinuousQueryManagerSuite.scala
@@ -29,7 +29,7 @@ import org.scalatest.time.SpanSugar._
import org.apache.spark.SparkException
import org.apache.spark.sql.{ContinuousQuery, Dataset, StreamTest}
-import org.apache.spark.sql.execution.streaming.{MemorySink, MemoryStream, StreamExecution, StreamingRelation}
+import org.apache.spark.sql.execution.streaming._
import org.apache.spark.sql.test.SharedSQLContext
import org.apache.spark.util.Utils
@@ -185,8 +185,8 @@ class ContinuousQueryManagerSuite extends StreamTest with SharedSQLContext with
val q2 = stopRandomQueryAsync(100 milliseconds, withError = true)
testAwaitAnyTermination(
ExpectException[SparkException],
- awaitTimeout = 1 seconds,
- testBehaviorFor = 2 seconds)
+ awaitTimeout = 4 seconds,
+ testBehaviorFor = 6 seconds)
require(!q2.isActive) // should be inactive by the time the prev awaitAnyTerm returned
// All subsequent calls to awaitAnyTermination should throw the exception
@@ -236,7 +236,8 @@ class ContinuousQueryManagerSuite extends StreamTest with SharedSQLContext with
@volatile var query: StreamExecution = null
try {
val df = ds.toDF
- val metadataRoot = Utils.createTempDir("streaming.metadata").getCanonicalPath
+ val metadataRoot =
+ Utils.createTempDir(namePrefix = "streaming.metadata").getCanonicalPath
query = sqlContext
.streams
.startQuery(
@@ -293,8 +294,8 @@ class ContinuousQueryManagerSuite extends StreamTest with SharedSQLContext with
if (withError) {
logDebug(s"Terminating query ${queryToStop.name} with error")
queryToStop.asInstanceOf[StreamExecution].logicalPlan.collect {
- case StreamingRelation(memoryStream, _) =>
- memoryStream.asInstanceOf[MemoryStream[Int]].addData(0)
+ case StreamingExecutionRelation(source, _) =>
+ source.asInstanceOf[MemoryStream[Int]].addData(0)
}
} else {
logDebug(s"Stopping query ${queryToStop.name}")