diff options
Diffstat (limited to 'sql/core/src/test/scala/org/apache/spark/sql/streaming/ContinuousQueryManagerSuite.scala')
-rw-r--r-- | sql/core/src/test/scala/org/apache/spark/sql/streaming/ContinuousQueryManagerSuite.scala | 13 |
1 files changed, 7 insertions, 6 deletions
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/ContinuousQueryManagerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/ContinuousQueryManagerSuite.scala index 54ce98d195..3d69c8a187 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/ContinuousQueryManagerSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/ContinuousQueryManagerSuite.scala @@ -29,7 +29,7 @@ import org.scalatest.time.SpanSugar._ import org.apache.spark.SparkException import org.apache.spark.sql.{ContinuousQuery, Dataset, StreamTest} -import org.apache.spark.sql.execution.streaming.{MemorySink, MemoryStream, StreamExecution, StreamingRelation} +import org.apache.spark.sql.execution.streaming._ import org.apache.spark.sql.test.SharedSQLContext import org.apache.spark.util.Utils @@ -185,8 +185,8 @@ class ContinuousQueryManagerSuite extends StreamTest with SharedSQLContext with val q2 = stopRandomQueryAsync(100 milliseconds, withError = true) testAwaitAnyTermination( ExpectException[SparkException], - awaitTimeout = 1 seconds, - testBehaviorFor = 2 seconds) + awaitTimeout = 4 seconds, + testBehaviorFor = 6 seconds) require(!q2.isActive) // should be inactive by the time the prev awaitAnyTerm returned // All subsequent calls to awaitAnyTermination should throw the exception @@ -236,7 +236,8 @@ class ContinuousQueryManagerSuite extends StreamTest with SharedSQLContext with @volatile var query: StreamExecution = null try { val df = ds.toDF - val metadataRoot = Utils.createTempDir("streaming.metadata").getCanonicalPath + val metadataRoot = + Utils.createTempDir(namePrefix = "streaming.metadata").getCanonicalPath query = sqlContext .streams .startQuery( @@ -293,8 +294,8 @@ class ContinuousQueryManagerSuite extends StreamTest with SharedSQLContext with if (withError) { logDebug(s"Terminating query ${queryToStop.name} with error") queryToStop.asInstanceOf[StreamExecution].logicalPlan.collect { - case StreamingRelation(memoryStream, _) => - memoryStream.asInstanceOf[MemoryStream[Int]].addData(0) + case StreamingExecutionRelation(source, _) => + source.asInstanceOf[MemoryStream[Int]].addData(0) } } else { logDebug(s"Stopping query ${queryToStop.name}") |