-rw-r--r--  python/pyspark/sql/streaming.py | 3
-rw-r--r--  python/pyspark/sql/utils.py | 2
-rw-r--r--  sql/catalyst/src/main/java/org/apache/spark/sql/streaming/OutputMode.java (renamed from sql/catalyst/src/main/java/org/apache/spark/sql/OutputMode.java) | 3
-rw-r--r--  sql/catalyst/src/main/scala/org/apache/spark/sql/InternalOutputModes.scala | 2
-rw-r--r--  sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala | 3
-rw-r--r--  sql/catalyst/src/test/java/org/apache/spark/sql/streaming/JavaOutputModeSuite.java (renamed from sql/catalyst/src/test/java/org/apache/spark/sql/JavaOutputModeSuite.java) | 2
-rw-r--r--  sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala | 3
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala | 1
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala | 1
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala | 6
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala | 7
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala | 3
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala | 1
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ContinuousQueryListenerBus.scala | 11
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala | 3
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala | 5
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TriggerExecutor.scala | 2
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/console.scala | 3
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala | 1
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala | 3
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala | 1
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/streaming/ContinuousQuery.scala (renamed from sql/core/src/main/scala/org/apache/spark/sql/ContinuousQuery.scala) | 3
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/streaming/ContinuousQueryException.scala (renamed from sql/core/src/main/scala/org/apache/spark/sql/ContinuousQueryException.scala) | 2
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/streaming/ContinuousQueryListener.scala (renamed from sql/core/src/main/scala/org/apache/spark/sql/util/ContinuousQueryListener.scala) | 36
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/streaming/ContinuousQueryManager.scala (renamed from sql/core/src/main/scala/org/apache/spark/sql/ContinuousQueryManager.scala) | 9
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/streaming/SinkStatus.scala (renamed from sql/core/src/main/scala/org/apache/spark/sql/SinkStatus.scala) | 2
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/streaming/SourceStatus.scala (renamed from sql/core/src/main/scala/org/apache/spark/sql/SourceStatus.scala) | 2
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/streaming/Trigger.scala (renamed from sql/core/src/main/scala/org/apache/spark/sql/Trigger.scala) | 18
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/ProcessingTimeSuite.scala | 1
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/ProcessingTimeExecutorSuite.scala | 2
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/streaming/ContinuousQueryListenerSuite.scala (renamed from sql/core/src/test/scala/org/apache/spark/sql/util/ContinuousQueryListenerSuite.scala) | 9
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/streaming/ContinuousQueryManagerSuite.scala | 5
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/streaming/ContinuousQuerySuite.scala | 5
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala | 2
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala | 4
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStressSuite.scala | 4
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/streaming/MemorySinkSuite.scala | 2
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/streaming/MemorySourceStressSuite.scala | 4
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala | 2
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala (renamed from sql/core/src/test/scala/org/apache/spark/sql/StreamTest.scala) | 8
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationSuite.scala | 5
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataFrameReaderWriterSuite.scala | 4
42 files changed, 121 insertions(+), 74 deletions(-)
diff --git a/python/pyspark/sql/streaming.py b/python/pyspark/sql/streaming.py
index 8238b8e7cd..cd75622ced 100644
--- a/python/pyspark/sql/streaming.py
+++ b/python/pyspark/sql/streaming.py
@@ -201,7 +201,8 @@ class ProcessingTime(Trigger):
self.interval = interval
def _to_java_trigger(self, sqlContext):
- return sqlContext._sc._jvm.org.apache.spark.sql.ProcessingTime.create(self.interval)
+ return sqlContext._sc._jvm.org.apache.spark.sql.streaming.ProcessingTime.create(
+ self.interval)
def _test():
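The Python ProcessingTime trigger is only a thin Py4J wrapper, so this hunk just retargets it at the relocated JVM class. For reference, a minimal Scala sketch of the factory it now resolves, using the same create(String) overload shown in the Trigger.scala hunk further down:

    import org.apache.spark.sql.streaming.ProcessingTime

    // The JVM-side factory the Python wrapper reaches through sqlContext._sc._jvm.
    val trigger = ProcessingTime.create("5 seconds")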
diff --git a/python/pyspark/sql/utils.py b/python/pyspark/sql/utils.py
index 8c8768f50b..9ddaf78acf 100644
--- a/python/pyspark/sql/utils.py
+++ b/python/pyspark/sql/utils.py
@@ -71,7 +71,7 @@ def capture_sql_exception(f):
raise AnalysisException(s.split(': ', 1)[1], stackTrace)
if s.startswith('org.apache.spark.sql.catalyst.parser.ParseException: '):
raise ParseException(s.split(': ', 1)[1], stackTrace)
- if s.startswith('org.apache.spark.sql.ContinuousQueryException: '):
+ if s.startswith('org.apache.spark.sql.streaming.ContinuousQueryException: '):
raise ContinuousQueryException(s.split(': ', 1)[1], stackTrace)
if s.startswith('org.apache.spark.sql.execution.QueryExecutionException: '):
raise QueryExecutionException(s.split(': ', 1)[1], stackTrace)
diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/OutputMode.java b/sql/catalyst/src/main/java/org/apache/spark/sql/streaming/OutputMode.java
index 1936d53e5e..41e2582921 100644
--- a/sql/catalyst/src/main/java/org/apache/spark/sql/OutputMode.java
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/streaming/OutputMode.java
@@ -15,9 +15,10 @@
* limitations under the License.
*/
-package org.apache.spark.sql;
+package org.apache.spark.sql.streaming;
import org.apache.spark.annotation.Experimental;
+import org.apache.spark.sql.InternalOutputModes;
/**
* :: Experimental ::
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/InternalOutputModes.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/InternalOutputModes.scala
index 8ef5d9a653..153f9f57fa 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/InternalOutputModes.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/InternalOutputModes.scala
@@ -17,6 +17,8 @@
package org.apache.spark.sql
+import org.apache.spark.sql.streaming.OutputMode
+
/**
 * Internal helper class to generate objects representing various [[OutputMode]]s.
*/
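With OutputMode now public under org.apache.spark.sql.streaming, this internal helper keeps its old package and imports the type instead. A hedged sketch of the shape that import supports; the Append/Complete members are inferred from the InternalOutputModes._ imports in the test hunks below, so treat the exact member list as an assumption:

    package org.apache.spark.sql

    import org.apache.spark.sql.streaming.OutputMode

    private[sql] object InternalOutputModes {
      // Only rows appended since the last trigger are written to the sink.
      case object Append extends OutputMode
      // The full updated result table is written on every trigger.
      case object Complete extends OutputMode
    }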
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala
index f4c0347609..8373fa336d 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala
@@ -17,9 +17,10 @@
package org.apache.spark.sql.catalyst.analysis
-import org.apache.spark.sql.{AnalysisException, InternalOutputModes, OutputMode}
+import org.apache.spark.sql.{AnalysisException, InternalOutputModes}
import org.apache.spark.sql.catalyst.plans._
import org.apache.spark.sql.catalyst.plans.logical._
+import org.apache.spark.sql.streaming.OutputMode
/**
* Analyzes the presence of unsupported operations in a logical plan.
diff --git a/sql/catalyst/src/test/java/org/apache/spark/sql/JavaOutputModeSuite.java b/sql/catalyst/src/test/java/org/apache/spark/sql/streaming/JavaOutputModeSuite.java
index 1764f3348d..e0a54fe30a 100644
--- a/sql/catalyst/src/test/java/org/apache/spark/sql/JavaOutputModeSuite.java
+++ b/sql/catalyst/src/test/java/org/apache/spark/sql/streaming/JavaOutputModeSuite.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.spark.sql;
+package org.apache.spark.sql.streaming;
import org.junit.Test;
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala
index c2e3d47450..378cca3644 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala
@@ -18,7 +18,7 @@
package org.apache.spark.sql.catalyst.analysis
import org.apache.spark.SparkFunSuite
-import org.apache.spark.sql.{AnalysisException, OutputMode}
+import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.InternalOutputModes._
import org.apache.spark.sql.catalyst.dsl.expressions._
import org.apache.spark.sql.catalyst.dsl.plans._
@@ -27,6 +27,7 @@ import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference,
import org.apache.spark.sql.catalyst.expressions.aggregate.Count
import org.apache.spark.sql.catalyst.plans._
import org.apache.spark.sql.catalyst.plans.logical._
+import org.apache.spark.sql.streaming.OutputMode
import org.apache.spark.sql.types.IntegerType
/** A dummy command for testing unsupported operations. */
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
index 291b8250c9..25678e938d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
@@ -31,6 +31,7 @@ import org.apache.spark.sql.execution.datasources.{BucketSpec, CreateTableUsingA
import org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils
import org.apache.spark.sql.execution.streaming.{MemoryPlan, MemorySink, StreamExecution}
import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.streaming.{ContinuousQuery, OutputMode, ProcessingTime, Trigger}
import org.apache.spark.util.Utils
/**
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
index 7be49b1749..3a6ec4595e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
@@ -49,6 +49,7 @@ import org.apache.spark.sql.execution.command.{CreateViewCommand, ExplainCommand
import org.apache.spark.sql.execution.datasources.{CreateTableUsingAsSelect, LogicalRelation}
import org.apache.spark.sql.execution.datasources.json.JacksonGenerator
import org.apache.spark.sql.execution.python.EvaluatePython
+import org.apache.spark.sql.streaming.ContinuousQuery
import org.apache.spark.sql.types._
import org.apache.spark.storage.StorageLevel
import org.apache.spark.util.Utils
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
index 0dc70c0b1c..2e14c5d486 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
@@ -30,13 +30,11 @@ import org.apache.spark.internal.Logging
import org.apache.spark.internal.config.ConfigEntry
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst._
-import org.apache.spark.sql.catalyst.catalog._
import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
-import org.apache.spark.sql.execution._
import org.apache.spark.sql.execution.command.ShowTablesCommand
import org.apache.spark.sql.internal.{SessionState, SharedState, SQLConf}
import org.apache.spark.sql.sources.BaseRelation
+import org.apache.spark.sql.streaming.ContinuousQueryManager
import org.apache.spark.sql.types._
import org.apache.spark.sql.util.ExecutionListenerManager
@@ -645,7 +643,7 @@ class SQLContext private[sql](val sparkSession: SparkSession)
/**
* Returns a [[ContinuousQueryManager]] that allows managing all the
- * [[org.apache.spark.sql.ContinuousQuery ContinuousQueries]] active on `this` context.
+ * [[org.apache.spark.sql.streaming.ContinuousQuery ContinuousQueries]] active on `this` context.
*
* @since 2.0.0
*/
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala b/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala
index dc4b72a6fb..52bedf9dbd 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala
@@ -25,7 +25,7 @@ import scala.reflect.ClassTag
import scala.reflect.runtime.universe.TypeTag
import scala.util.control.NonFatal
-import org.apache.spark.{SparkConf, SparkContext, SparkException}
+import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.annotation.{DeveloperApi, Experimental}
import org.apache.spark.api.java.JavaRDD
import org.apache.spark.internal.Logging
@@ -40,8 +40,9 @@ import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, Range}
import org.apache.spark.sql.execution._
import org.apache.spark.sql.execution.datasources.LogicalRelation
import org.apache.spark.sql.execution.ui.SQLListener
-import org.apache.spark.sql.internal.{CatalogImpl, SessionState, SharedState, SQLConf}
+import org.apache.spark.sql.internal.{CatalogImpl, SessionState, SharedState}
import org.apache.spark.sql.sources.BaseRelation
+import org.apache.spark.sql.streaming._
import org.apache.spark.sql.types.{DataType, LongType, StructType}
import org.apache.spark.sql.util.ExecutionListenerManager
import org.apache.spark.util.Utils
@@ -182,7 +183,7 @@ class SparkSession private(
/**
* :: Experimental ::
* Returns a [[ContinuousQueryManager]] that allows managing all the
- * [[org.apache.spark.sql.ContinuousQuery ContinuousQueries]] active on `this`.
+ * [[ContinuousQuery ContinuousQueries]] active on `this`.
*
* @group basic
* @since 2.0.0
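Grounded in this accessor (and in the spark.streams.active.foreach(_.stop()) cleanup the listener suite uses below), a small usage sketch, assuming `spark` is an active SparkSession:

    // Enumerate and stop every streaming query managed by this session.
    spark.streams.active.foreach { query =>
      println(s"stopping query: ${query.name}")
      query.stop()
    }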
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
index e40525287a..7e3e45e56e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
@@ -33,6 +33,7 @@ import org.apache.spark.sql.execution.exchange.ShuffleExchange
import org.apache.spark.sql.execution.joins.{BuildLeft, BuildRight}
import org.apache.spark.sql.execution.streaming.MemoryPlan
import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.streaming.ContinuousQuery
private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
self: SparkPlanner =>
@@ -201,7 +202,7 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
/**
* Used to plan aggregation queries that are computed incrementally as part of a
- * [[org.apache.spark.sql.ContinuousQuery]]. Currently this rule is injected into the planner
+ * [[ContinuousQuery]]. Currently this rule is injected into the planner
* on-demand, only when planning in a [[org.apache.spark.sql.execution.streaming.StreamExecution]]
*/
object StatefulAggregationStrategy extends Strategy {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
index 814880b0e0..93f1ad01bf 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
@@ -37,6 +37,7 @@ import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
import org.apache.spark.sql.execution.streaming._
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.sources._
+import org.apache.spark.sql.streaming.OutputMode
import org.apache.spark.sql.types.{CalendarIntervalType, StructType}
import org.apache.spark.util.Utils
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ContinuousQueryListenerBus.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ContinuousQueryListenerBus.scala
index b1d24b6cfc..2a1be09693 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ContinuousQueryListenerBus.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ContinuousQueryListenerBus.scala
@@ -18,8 +18,7 @@
package org.apache.spark.sql.execution.streaming
import org.apache.spark.scheduler.{LiveListenerBus, SparkListener, SparkListenerEvent}
-import org.apache.spark.sql.util.ContinuousQueryListener
-import org.apache.spark.sql.util.ContinuousQueryListener._
+import org.apache.spark.sql.streaming.ContinuousQueryListener
import org.apache.spark.util.ListenerBus
/**
@@ -30,7 +29,10 @@ import org.apache.spark.util.ListenerBus
* dispatch them to ContinuousQueryListener.
*/
class ContinuousQueryListenerBus(sparkListenerBus: LiveListenerBus)
- extends SparkListener with ListenerBus[ContinuousQueryListener, ContinuousQueryListener.Event] {
+ extends SparkListener
+ with ListenerBus[ContinuousQueryListener, ContinuousQueryListener.Event] {
+
+ import ContinuousQueryListener._
sparkListenerBus.addListener(this)
@@ -74,7 +76,8 @@ class ContinuousQueryListenerBus(sparkListenerBus: LiveListenerBus)
* listener bus.
*/
private case class WrappedContinuousQueryListenerEvent(
- streamingListenerEvent: ContinuousQueryListener.Event) extends SparkListenerEvent {
+ streamingListenerEvent: ContinuousQueryListener.Event)
+ extends SparkListenerEvent {
// Do not log streaming events in event log as history server does not support these events.
protected[spark] override def logEvent: Boolean = false
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala
index 5c86049851..bc0e443ca7 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala
@@ -17,10 +17,11 @@
package org.apache.spark.sql.execution.streaming
-import org.apache.spark.sql.{InternalOutputModes, OutputMode, SparkSession}
+import org.apache.spark.sql.{InternalOutputModes, SparkSession}
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.execution.{QueryExecution, SparkPlan, SparkPlanner, UnaryExecNode}
+import org.apache.spark.sql.streaming.OutputMode
/**
* A variant of [[QueryExecution]] that allows the execution of the given [[LogicalPlan]]
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
index ab0900d7f6..16d38a2f7d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
@@ -33,8 +33,7 @@ import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap}
import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan}
import org.apache.spark.sql.catalyst.util._
import org.apache.spark.sql.execution.QueryExecution
-import org.apache.spark.sql.util.ContinuousQueryListener
-import org.apache.spark.sql.util.ContinuousQueryListener._
+import org.apache.spark.sql.streaming._
import org.apache.spark.util.{Clock, UninterruptibleThread, Utils}
/**
@@ -54,6 +53,8 @@ class StreamExecution(
val outputMode: OutputMode)
extends ContinuousQuery with Logging {
+ import org.apache.spark.sql.streaming.ContinuousQueryListener._
+
/**
* A lock used to wait/notify when batches complete. Use a fair lock to avoid thread starvation.
*/
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TriggerExecutor.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TriggerExecutor.scala
index 569907b369..ac510df209 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TriggerExecutor.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TriggerExecutor.scala
@@ -18,7 +18,7 @@
package org.apache.spark.sql.execution.streaming
import org.apache.spark.internal.Logging
-import org.apache.spark.sql.ProcessingTime
+import org.apache.spark.sql.streaming.ProcessingTime
import org.apache.spark.util.{Clock, SystemClock}
trait TriggerExecutor {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/console.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/console.scala
index 391f1e54b7..2ec2a3c3c4 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/console.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/console.scala
@@ -18,8 +18,9 @@
package org.apache.spark.sql.execution.streaming
import org.apache.spark.internal.Logging
-import org.apache.spark.sql.{DataFrame, OutputMode, SQLContext}
+import org.apache.spark.sql.{DataFrame, SQLContext}
import org.apache.spark.sql.sources.{DataSourceRegister, StreamSinkProvider}
+import org.apache.spark.sql.streaming.OutputMode
class ConsoleSink(options: Map[String, String]) extends Sink with Logging {
// Number of rows to display, by default 20 rows
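Since ConsoleSink is wired up through DataSourceRegister and StreamSinkProvider, it can be exercised straight from the writer API. A hedged sketch: the "console" format name follows the register convention, and startStream() matches the writer method referenced in the ContinuousQueryListener scaladoc below; both are assumptions about this snapshot of the API:

    // Print each streaming batch to stdout, 20 rows per batch by default.
    val query = df.write
      .format("console")
      .startStream()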
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala
index e4a95e7335..4496f41615 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala
@@ -28,6 +28,7 @@ import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.encoders.encoderFor
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.catalyst.plans.logical.LeafNode
+import org.apache.spark.sql.streaming.OutputMode
import org.apache.spark.sql.types.StructType
object MemoryStream {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala
index 4c7bbf04bc..b2db377ec7 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala
@@ -31,6 +31,7 @@ import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution._
import org.apache.spark.sql.execution.command.AnalyzeTableCommand
import org.apache.spark.sql.execution.datasources.{DataSourceAnalysis, FindDataSourceTable, PreInsertCastAndRename, ResolveDataSource}
+import org.apache.spark.sql.streaming.{ContinuousQuery, ContinuousQueryManager}
import org.apache.spark.sql.util.ExecutionListenerManager
@@ -142,7 +143,7 @@ private[sql] class SessionState(sparkSession: SparkSession) {
lazy val listenerManager: ExecutionListenerManager = new ExecutionListenerManager
/**
- * Interface to start and stop [[org.apache.spark.sql.ContinuousQuery]]s.
+ * Interface to start and stop [[ContinuousQuery]]s.
*/
lazy val continuousQueryManager: ContinuousQueryManager = {
new ContinuousQueryManager(sparkSession)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
index 3d4edbb93d..d2077a07f4 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
@@ -23,6 +23,7 @@ import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.execution.streaming.{Sink, Source}
+import org.apache.spark.sql.streaming.OutputMode
import org.apache.spark.sql.types.StructType
/**
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/ContinuousQuery.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/ContinuousQuery.scala
index 4d5afe2eb5..451cfd85e3 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/ContinuousQuery.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/ContinuousQuery.scala
@@ -15,9 +15,10 @@
* limitations under the License.
*/
-package org.apache.spark.sql
+package org.apache.spark.sql.streaming
import org.apache.spark.annotation.Experimental
+import org.apache.spark.sql.SparkSession
/**
* :: Experimental ::
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/ContinuousQueryException.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/ContinuousQueryException.scala
index fec38629d9..5196c5a537 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/ContinuousQueryException.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/ContinuousQueryException.scala
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.spark.sql
+package org.apache.spark.sql.streaming
import org.apache.spark.annotation.Experimental
import org.apache.spark.sql.execution.streaming.{Offset, StreamExecution}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/util/ContinuousQueryListener.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/ContinuousQueryListener.scala
index ba1facf11b..6bdd513288 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/util/ContinuousQueryListener.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/ContinuousQueryListener.scala
@@ -15,20 +15,22 @@
* limitations under the License.
*/
-package org.apache.spark.sql.util
+package org.apache.spark.sql.streaming
import org.apache.spark.annotation.Experimental
-import org.apache.spark.sql.ContinuousQuery
-import org.apache.spark.sql.util.ContinuousQueryListener._
/**
* :: Experimental ::
* Interface for listening to events related to [[ContinuousQuery ContinuousQueries]].
* @note The methods are not thread-safe as they may be called from different threads.
+ *
+ * @since 2.0.0
*/
@Experimental
abstract class ContinuousQueryListener {
+ import ContinuousQueryListener._
+
/**
* Called when a query is started.
* @note This is called synchronously with
@@ -36,6 +38,7 @@ abstract class ContinuousQueryListener {
* that is, `onQueryStart` will be called on all listeners before
* `DataFrameWriter.startStream()` returns the corresponding [[ContinuousQuery]]. Please
* don't block this method as it will block your query.
+ * @since 2.0.0
*/
def onQueryStarted(queryStarted: QueryStarted): Unit
@@ -46,10 +49,14 @@ abstract class ContinuousQueryListener {
* latest no matter when this method is called. Therefore, the status of [[ContinuousQuery]]
* may be changed before/when you process the event. E.g., you may find [[ContinuousQuery]]
* is terminated when you are processing [[QueryProgress]].
+ * @since 2.0.0
*/
def onQueryProgress(queryProgress: QueryProgress): Unit
- /** Called when a query is stopped, with or without error */
+ /**
+ * Called when a query is stopped, with or without error.
+ * @since 2.0.0
+ */
def onQueryTerminated(queryTerminated: QueryTerminated): Unit
}
@@ -57,19 +64,32 @@ abstract class ContinuousQueryListener {
/**
* :: Experimental ::
* Companion object of [[ContinuousQueryListener]] that defines the listener events.
+ * @since 2.0.0
*/
@Experimental
object ContinuousQueryListener {
- /** Base type of [[ContinuousQueryListener]] events */
+ /**
+ * Base type of [[ContinuousQueryListener]] events.
+ * @since 2.0.0
+ */
trait Event
- /** Event representing the start of a query */
+ /**
+ * Event representing the start of a query.
+ * @since 2.0.0
+ */
class QueryStarted private[sql](val query: ContinuousQuery) extends Event
- /** Event representing any progress updates in a query */
+ /**
+ * Event representing any progress updates in a query.
+ * @since 2.0.0
+ */
class QueryProgress private[sql](val query: ContinuousQuery) extends Event
- /** Event representing that termination of a query */
+ /**
+ * Event representing the termination of a query.
+ * @since 2.0.0
+ */
class QueryTerminated private[sql](val query: ContinuousQuery) extends Event
}
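The three callbacks and their event payloads make the listener contract concrete. A minimal sketch of a subclass under the new package; the class name and logging bodies are illustrative only:

    import org.apache.spark.sql.streaming.ContinuousQueryListener
    import org.apache.spark.sql.streaming.ContinuousQueryListener.{QueryProgress, QueryStarted, QueryTerminated}

    class LoggingListener extends ContinuousQueryListener {
      // Called synchronously with DataFrameWriter.startStream(); keep it cheap.
      override def onQueryStarted(event: QueryStarted): Unit =
        println(s"started: ${event.query.name}")

      // Delivered asynchronously; the query may already have terminated by now.
      override def onQueryProgress(event: QueryProgress): Unit =
        println(s"progress: ${event.query.name}")

      override def onQueryTerminated(event: QueryTerminated): Unit =
        println(s"terminated: ${event.query.name}")
    }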
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/ContinuousQueryManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/ContinuousQueryManager.scala
index c686400150..1bfdd2da4e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/ContinuousQueryManager.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/ContinuousQueryManager.scala
@@ -15,22 +15,21 @@
* limitations under the License.
*/
-package org.apache.spark.sql
+package org.apache.spark.sql.streaming
import scala.collection.mutable
import org.apache.spark.annotation.Experimental
+import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker
import org.apache.spark.sql.execution.streaming._
import org.apache.spark.sql.execution.streaming.state.StateStoreCoordinatorRef
import org.apache.spark.sql.internal.SQLConf
-import org.apache.spark.sql.util.ContinuousQueryListener
import org.apache.spark.util.{Clock, SystemClock}
/**
* :: Experimental ::
- * A class to manage all the [[org.apache.spark.sql.ContinuousQuery ContinuousQueries]] active
- * on a [[SparkSession]].
+ * A class to manage all the [[ContinuousQuery]] active on a [[SparkSession]].
*
* @since 2.0.0
*/
@@ -147,7 +146,7 @@ class ContinuousQueryManager private[sql] (sparkSession: SparkSession) {
/**
* Register a [[ContinuousQueryListener]] to receive up-calls for life cycle events of
- * [[org.apache.spark.sql.ContinuousQuery ContinuousQueries]].
+ * [[ContinuousQuery]].
*
* @since 2.0.0
*/
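Pairing this with the LoggingListener sketched above, registration goes through the manager; assuming the method this scaladoc documents is named addListener, which matches the ContinuousQueryManager API of this era:

    spark.streams.addListener(new LoggingListener)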
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SinkStatus.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/SinkStatus.scala
index 5a9852809c..79ddf01042 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SinkStatus.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/SinkStatus.scala
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.spark.sql
+package org.apache.spark.sql.streaming
import org.apache.spark.annotation.Experimental
import org.apache.spark.sql.execution.streaming.{Offset, Sink}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SourceStatus.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/SourceStatus.scala
index 2479e67e36..8fccd5b7a3 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SourceStatus.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/SourceStatus.scala
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.spark.sql
+package org.apache.spark.sql.streaming
import org.apache.spark.annotation.Experimental
import org.apache.spark.sql.execution.streaming.{Offset, Source}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Trigger.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/Trigger.scala
index 256e8a47a4..d3fdbac576 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/Trigger.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/Trigger.scala
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.spark.sql
+package org.apache.spark.sql.streaming
import java.util.concurrent.TimeUnit
@@ -29,9 +29,11 @@ import org.apache.spark.unsafe.types.CalendarInterval
/**
* :: Experimental ::
* Used to indicate how often results should be produced by a [[ContinuousQuery]].
+ *
+ * @since 2.0.0
*/
@Experimental
-sealed trait Trigger {}
+sealed trait Trigger
/**
* :: Experimental ::
@@ -53,6 +55,8 @@ sealed trait Trigger {}
* import java.util.concurrent.TimeUnit
* df.write.trigger(ProcessingTime.create(10, TimeUnit.SECONDS))
* }}}
+ *
+ * @since 2.0.0
*/
@Experimental
case class ProcessingTime(intervalMs: Long) extends Trigger {
@@ -62,6 +66,8 @@ case class ProcessingTime(intervalMs: Long) extends Trigger {
/**
* :: Experimental ::
* Used to create [[ProcessingTime]] triggers for [[ContinuousQuery]]s.
+ *
+ * @since 2.0.0
*/
@Experimental
object ProcessingTime {
@@ -73,6 +79,8 @@ object ProcessingTime {
* {{{
* df.write.trigger(ProcessingTime("10 seconds"))
* }}}
+ *
+ * @since 2.0.0
*/
def apply(interval: String): ProcessingTime = {
if (StringUtils.isBlank(interval)) {
@@ -101,6 +109,8 @@ object ProcessingTime {
* import scala.concurrent.duration._
* df.write.trigger(ProcessingTime(10.seconds))
* }}}
+ *
+ * @since 2.0.0
*/
def apply(interval: Duration): ProcessingTime = {
new ProcessingTime(interval.toMillis)
@@ -113,6 +123,8 @@ object ProcessingTime {
* {{{
* df.write.trigger(ProcessingTime.create("10 seconds"))
* }}}
+ *
+ * @since 2.0.0
*/
def create(interval: String): ProcessingTime = {
apply(interval)
@@ -126,6 +138,8 @@ object ProcessingTime {
* import java.util.concurrent.TimeUnit
* df.write.trigger(ProcessingTime.create(10, TimeUnit.SECONDS))
* }}}
+ *
+ * @since 2.0.0
*/
def create(interval: Long, unit: TimeUnit): ProcessingTime = {
new ProcessingTime(unit.toMillis(interval))
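The scaladoc examples above show four equivalent ways to build the same trigger; collected in one sketch for contrast:

    import java.util.concurrent.TimeUnit
    import scala.concurrent.duration._
    import org.apache.spark.sql.streaming.ProcessingTime

    // All four yield ProcessingTime(10000), i.e. a 10-second interval.
    val a = ProcessingTime("10 seconds")
    val b = ProcessingTime(10.seconds)
    val c = ProcessingTime.create("10 seconds")
    val d = ProcessingTime.create(10, TimeUnit.SECONDS)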
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/ProcessingTimeSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/ProcessingTimeSuite.scala
index 0d18a645f6..52c200796c 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/ProcessingTimeSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/ProcessingTimeSuite.scala
@@ -22,6 +22,7 @@ import java.util.concurrent.TimeUnit
import scala.concurrent.duration._
import org.apache.spark.SparkFunSuite
+import org.apache.spark.sql.streaming.ProcessingTime
class ProcessingTimeSuite extends SparkFunSuite {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/ProcessingTimeExecutorSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/ProcessingTimeExecutorSuite.scala
index 7f99d303ba..00d5e051de 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/ProcessingTimeExecutorSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/ProcessingTimeExecutorSuite.scala
@@ -20,7 +20,7 @@ package org.apache.spark.sql.execution.streaming
import java.util.concurrent.{CountDownLatch, TimeUnit}
import org.apache.spark.SparkFunSuite
-import org.apache.spark.sql.ProcessingTime
+import org.apache.spark.sql.streaming.ProcessingTime
import org.apache.spark.util.{Clock, ManualClock, SystemClock}
class ProcessingTimeExecutorSuite extends SparkFunSuite {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/util/ContinuousQueryListenerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/ContinuousQueryListenerSuite.scala
index 8788898fc8..cdd97da8ae 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/util/ContinuousQueryListenerSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/ContinuousQueryListenerSuite.scala
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.spark.sql.util
+package org.apache.spark.sql.streaming
import java.util.concurrent.ConcurrentLinkedQueue
@@ -26,14 +26,13 @@ import org.scalatest.concurrent.Eventually._
import org.scalatest.concurrent.PatienceConfiguration.Timeout
import org.scalatest.time.SpanSugar._
-import org.apache.spark.sql._
import org.apache.spark.sql.execution.streaming._
-import org.apache.spark.sql.test.SharedSQLContext
-import org.apache.spark.sql.util.ContinuousQueryListener.{QueryProgress, QueryStarted, QueryTerminated}
-class ContinuousQueryListenerSuite extends StreamTest with SharedSQLContext with BeforeAndAfter {
+
+class ContinuousQueryListenerSuite extends StreamTest with BeforeAndAfter {
import testImplicits._
+ import ContinuousQueryListener._
after {
spark.streams.active.foreach(_.stop())
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/ContinuousQueryManagerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/ContinuousQueryManagerSuite.scala
index b75c3ea106..c1e4970b3a 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/ContinuousQueryManagerSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/ContinuousQueryManagerSuite.scala
@@ -28,12 +28,11 @@ import org.scalatest.time.Span
import org.scalatest.time.SpanSugar._
import org.apache.spark.SparkException
-import org.apache.spark.sql.{ContinuousQuery, Dataset, OutputMode, StreamTest}
+import org.apache.spark.sql.Dataset
import org.apache.spark.sql.execution.streaming._
-import org.apache.spark.sql.test.SharedSQLContext
import org.apache.spark.util.Utils
-class ContinuousQueryManagerSuite extends StreamTest with SharedSQLContext with BeforeAndAfter {
+class ContinuousQueryManagerSuite extends StreamTest with BeforeAndAfter {
import AwaitTerminationTester._
import testImplicits._
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/ContinuousQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/ContinuousQuerySuite.scala
index f469cde6be..e4ca86d9d5 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/ContinuousQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/ContinuousQuerySuite.scala
@@ -18,11 +18,10 @@
package org.apache.spark.sql.streaming
import org.apache.spark.SparkException
-import org.apache.spark.sql.StreamTest
import org.apache.spark.sql.execution.streaming.{CompositeOffset, LongOffset, MemoryStream, StreamExecution}
-import org.apache.spark.sql.test.SharedSQLContext
-class ContinuousQuerySuite extends StreamTest with SharedSQLContext {
+
+class ContinuousQuerySuite extends StreamTest {
import AwaitTerminationTester._
import testImplicits._
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala
index 3d8dcaf5a5..1c73208736 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala
@@ -32,7 +32,7 @@ import org.apache.spark.sql.test.SharedSQLContext
import org.apache.spark.sql.types.{IntegerType, StructField, StructType}
import org.apache.spark.util.Utils
-class FileStreamSinkSuite extends StreamTest with SharedSQLContext {
+class FileStreamSinkSuite extends StreamTest {
import testImplicits._
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
index 1d784f1f4e..f681b8878d 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
@@ -137,7 +137,7 @@ class FileStreamSourceTest extends StreamTest with SharedSQLContext {
val valueSchema = new StructType().add("value", StringType)
}
-class FileStreamSourceSuite extends FileStreamSourceTest with SharedSQLContext {
+class FileStreamSourceSuite extends FileStreamSourceTest {
import testImplicits._
@@ -594,7 +594,7 @@ class FileStreamSourceSuite extends FileStreamSourceTest with SharedSQLContext {
}
}
-class FileStreamSourceStressTestSuite extends FileStreamSourceTest with SharedSQLContext {
+class FileStreamSourceStressTestSuite extends FileStreamSourceTest {
import testImplicits._
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStressSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStressSuite.scala
index 4efb7cf52d..1c0fb34dd0 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStressSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStressSuite.scala
@@ -23,9 +23,7 @@ import java.util.UUID
import scala.util.Random
import scala.util.control.NonFatal
-import org.apache.spark.sql.{ContinuousQuery, ContinuousQueryException, StreamTest}
import org.apache.spark.sql.catalyst.util._
-import org.apache.spark.sql.test.SharedSQLContext
import org.apache.spark.util.Utils
/**
@@ -38,7 +36,7 @@ import org.apache.spark.util.Utils
*
* At the end, the resulting files are loaded and the answer is checked.
*/
-class FileStressSuite extends StreamTest with SharedSQLContext {
+class FileStressSuite extends StreamTest {
import testImplicits._
testQuietly("fault tolerance stress test - unpartitioned output") {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/MemorySinkSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/MemorySinkSuite.scala
index e5bd0b4744..df76499fa2 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/MemorySinkSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/MemorySinkSuite.scala
@@ -27,7 +27,7 @@ import org.apache.spark.sql.test.SharedSQLContext
import org.apache.spark.sql.types.{IntegerType, StructField, StructType}
import org.apache.spark.util.Utils
-class MemorySinkSuite extends StreamTest with SharedSQLContext with BeforeAndAfter {
+class MemorySinkSuite extends StreamTest with BeforeAndAfter {
import testImplicits._
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/MemorySourceStressSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/MemorySourceStressSuite.scala
index 81760d2aa8..7f2972edea 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/MemorySourceStressSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/MemorySourceStressSuite.scala
@@ -17,11 +17,9 @@
package org.apache.spark.sql.streaming
-import org.apache.spark.sql.StreamTest
import org.apache.spark.sql.execution.streaming._
-import org.apache.spark.sql.test.SharedSQLContext
-class MemorySourceStressSuite extends StreamTest with SharedSQLContext {
+class MemorySourceStressSuite extends StreamTest {
import testImplicits._
test("memory stress test") {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
index c17cb1de6c..9414b1ce40 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
@@ -24,7 +24,7 @@ import org.apache.spark.sql.test.SharedSQLContext
import org.apache.spark.sql.types.{IntegerType, StructField, StructType}
import org.apache.spark.util.ManualClock
-class StreamSuite extends StreamTest with SharedSQLContext {
+class StreamSuite extends StreamTest {
import testImplicits._
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/StreamTest.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala
index b033725f18..dd8672aa64 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/StreamTest.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.spark.sql
+package org.apache.spark.sql.streaming
import java.lang.Thread.UncaughtExceptionHandler
@@ -33,10 +33,12 @@ import org.scalatest.exceptions.TestFailedDueToTimeoutException
import org.scalatest.time.Span
import org.scalatest.time.SpanSugar._
+import org.apache.spark.sql.{Dataset, Encoder, QueryTest, Row}
import org.apache.spark.sql.catalyst.encoders.{encoderFor, ExpressionEncoder, RowEncoder}
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.util._
import org.apache.spark.sql.execution.streaming._
+import org.apache.spark.sql.test.SharedSQLContext
import org.apache.spark.util.{Clock, ManualClock, SystemClock, Utils}
/**
@@ -63,7 +65,7 @@ import org.apache.spark.util.{Clock, ManualClock, SystemClock, Utils}
* avoid hanging forever in the case of failures. However, individual suites can change this
* by overriding `streamingTimeout`.
*/
-trait StreamTest extends QueryTest with Timeouts {
+trait StreamTest extends QueryTest with SharedSQLContext with Timeouts {
/** How long to wait for an active stream to catch up when checking a result. */
val streamingTimeout = 10.seconds
@@ -523,7 +525,7 @@ trait StreamTest extends QueryTest with Timeouts {
case class ExpectException[E <: Exception]()(implicit val t: ClassTag[E])
extends ExpectedBehavior
- private val DEFAULT_TEST_TIMEOUT = 1 second
+ private val DEFAULT_TEST_TIMEOUT = 1.second
def test(
expectedBehavior: ExpectedBehavior,
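Because StreamTest now mixes in SharedSQLContext itself, the suites below simply drop that mixin. A minimal sketch of the resulting test pattern; the suite and test names are hypothetical, while testStream, AddData, and CheckAnswer come from the harness itself:

    import org.apache.spark.sql.execution.streaming.MemoryStream
    import org.apache.spark.sql.streaming.StreamTest

    class IncrementSuite extends StreamTest {
      import testImplicits._

      test("increments each value") {
        val input = MemoryStream[Int]
        testStream(input.toDS().map(_ + 1))(
          AddData(input, 1, 2, 3),
          CheckAnswer(2, 3, 4))
      }
    }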
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationSuite.scala
index 322bbb9ea0..1f174aee8c 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationSuite.scala
@@ -20,19 +20,18 @@ package org.apache.spark.sql.streaming
import org.scalatest.BeforeAndAfterAll
import org.apache.spark.SparkException
-import org.apache.spark.sql.{AnalysisException, StreamTest}
+import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.InternalOutputModes._
import org.apache.spark.sql.execution.streaming._
import org.apache.spark.sql.execution.streaming.state.StateStore
import org.apache.spark.sql.expressions.scalalang.typed
import org.apache.spark.sql.functions._
-import org.apache.spark.sql.test.SharedSQLContext
object FailureSinglton {
var firstTime = true
}
-class StreamingAggregationSuite extends StreamTest with SharedSQLContext with BeforeAndAfterAll {
+class StreamingAggregationSuite extends StreamTest with BeforeAndAfterAll {
override def afterAll(): Unit = {
super.afterAll()
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataFrameReaderWriterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataFrameReaderWriterSuite.scala
index 38a0534ab6..a2aac69064 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataFrameReaderWriterSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataFrameReaderWriterSuite.scala
@@ -27,7 +27,7 @@ import org.scalatest.BeforeAndAfter
import org.apache.spark.sql._
import org.apache.spark.sql.execution.streaming._
import org.apache.spark.sql.sources.{StreamSinkProvider, StreamSourceProvider}
-import org.apache.spark.sql.test.SharedSQLContext
+import org.apache.spark.sql.streaming.{ContinuousQuery, OutputMode, ProcessingTime, StreamTest}
import org.apache.spark.sql.types.{IntegerType, StructField, StructType}
import org.apache.spark.util.Utils
@@ -101,7 +101,7 @@ class DefaultSource extends StreamSourceProvider with StreamSinkProvider {
}
}
-class DataFrameReaderWriterSuite extends StreamTest with SharedSQLContext with BeforeAndAfter {
+class DataFrameReaderWriterSuite extends StreamTest with BeforeAndAfter {
private def newMetadataDir =
Utils.createTempDir(namePrefix = "streaming.metadata").getCanonicalPath