author     Michael Armbrust <michael@databricks.com>  2016-03-22 10:18:42 -0700
committer  Michael Armbrust <michael@databricks.com>  2016-03-22 10:18:42 -0700
commit     caea15214571d9b12dcf1553e5c1cc8b83a8ba5b (patch)
tree       bc3e49ee19c98636bd249685011fe1ae3879cebc
parent     c632bdc01f51bb253fa3dc258ffa7fdecf814d35 (diff)
download   spark-caea15214571d9b12dcf1553e5c1cc8b83a8ba5b.tar.gz
           spark-caea15214571d9b12dcf1553e5c1cc8b83a8ba5b.tar.bz2
           spark-caea15214571d9b12dcf1553e5c1cc8b83a8ba5b.zip
[SPARK-13985][SQL] Deterministic batches with ids
This PR relaxes the requirements of a `Sink` for structured streaming to only require idempotent appending of data. Previously the `Sink` needed to be able to transactionally append data while recording an opaque offset that indicated how far into the stream we had processed.

To support this, a new write-ahead log has been added to stream execution, which records the offsets that are present in each batch. The log is created in the newly added `checkpointLocation`, which defaults to `${spark.sql.streaming.checkpointLocation}/${queryName}` but can be overridden by setting `checkpointLocation` in `DataFrameWriter`.

In addition to making sinks easier to write, the addition of batchIds and a checkpoint location is done in anticipation of integration with the `StateStore` (#11645).

Author: Michael Armbrust <michael@databricks.com>

Closes #11804 from marmbrus/batchIds.
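As a usage sketch (not part of this patch; the format name and paths are placeholders, and a `sqlContext` is assumed to be in scope), a query can either rely on the global default checkpoint root or pass an explicit `checkpointLocation` per query through `DataFrameWriter`:

```scala
// Hedged sketch: the format name and paths below are illustrative only.
val df = sqlContext.read
  .format("org.apache.spark.sql.streaming.test")
  .stream("/test/input")

// Global default root; per-query offset logs land under <root>/<queryName>.
sqlContext.setConf("spark.sql.streaming.checkpointLocation", "/tmp/spark/checkpoints")

val query = df.write
  .format("org.apache.spark.sql.streaming.test")
  .queryName("aggregates")                                           // also names the default checkpoint dir
  .option("checkpointLocation", "/tmp/spark/checkpoints/aggregates") // explicit per-query override
  .startStream()
```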
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/ContinuousQueryManager.scala | 8
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala | 11
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/SinkStatus.scala | 2
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala | 3
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CompositeOffset.scala | 12
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala | 24
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala | 7
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/Sink.scala | 30
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/Source.scala | 10
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala | 193
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamProgress.scala | 52
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala | 85
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala | 7
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/StreamTest.scala | 18
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/streaming/ContinuousQueryManagerSuite.scala | 8
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/streaming/ContinuousQuerySuite.scala | 11
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/streaming/DataFrameReaderWriterSuite.scala | 55
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala | 18
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/util/ContinuousQueryListenerSuite.scala | 6
19 files changed, 319 insertions, 241 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/ContinuousQueryManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/ContinuousQueryManager.scala
index 0a156ea99a..fa8219bbed 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/ContinuousQueryManager.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/ContinuousQueryManager.scala
@@ -164,13 +164,17 @@ class ContinuousQueryManager(sqlContext: SQLContext) {
}
/** Start a query */
- private[sql] def startQuery(name: String, df: DataFrame, sink: Sink): ContinuousQuery = {
+ private[sql] def startQuery(
+ name: String,
+ checkpointLocation: String,
+ df: DataFrame,
+ sink: Sink): ContinuousQuery = {
activeQueriesLock.synchronized {
if (activeQueries.contains(name)) {
throw new IllegalArgumentException(
s"Cannot start query with name $name as a query with that name is already active")
}
- val query = new StreamExecution(sqlContext, name, df.logicalPlan, sink)
+ val query = new StreamExecution(sqlContext, name, checkpointLocation, df.logicalPlan, sink)
query.start()
activeQueries.put(name, query)
query
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
index 7ed1c51360..c07bd0e7b7 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
@@ -21,6 +21,8 @@ import java.util.Properties
import scala.collection.JavaConverters._
+import org.apache.hadoop.fs.Path
+
import org.apache.spark.annotation.Experimental
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
@@ -251,8 +253,15 @@ final class DataFrameWriter private[sql](df: DataFrame) {
options = extraOptions.toMap,
partitionColumns = normalizedParCols.getOrElse(Nil))
+ val queryName = extraOptions.getOrElse("queryName", StreamExecution.nextName)
+ val checkpointLocation = extraOptions.getOrElse("checkpointLocation", {
+ new Path(df.sqlContext.conf.checkpointLocation, queryName).toUri.toString
+ })
df.sqlContext.sessionState.continuousQueryManager.startQuery(
- extraOptions.getOrElse("queryName", StreamExecution.nextName), df, dataSource.createSink())
+ queryName,
+ checkpointLocation,
+ df,
+ dataSource.createSink())
}
/**
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SinkStatus.scala b/sql/core/src/main/scala/org/apache/spark/sql/SinkStatus.scala
index ce21451b2c..5a9852809c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SinkStatus.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SinkStatus.scala
@@ -31,4 +31,4 @@ import org.apache.spark.sql.execution.streaming.{Offset, Sink}
@Experimental
class SinkStatus private[sql](
val description: String,
- val offset: Option[Offset])
+ val offset: Offset)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
index e2a14edc54..fac2a64726 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
@@ -162,7 +162,8 @@ case class DataSource(
paths = files,
userSpecifiedSchema = Some(dataSchema),
className = className,
- options = options.filterKeys(_ != "path")).resolveRelation()))
+ options =
+ new CaseInsensitiveMap(options.filterKeys(_ != "path"))).resolveRelation()))
}
new FileStreamSource(
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CompositeOffset.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CompositeOffset.scala
index 59a52a3d59..e48ac59892 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CompositeOffset.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CompositeOffset.scala
@@ -52,6 +52,18 @@ case class CompositeOffset(offsets: Seq[Option[Offset]]) extends Offset {
case i if i == 0 => 0
case i if i > 0 => 1
}
+
+ /**
+ * Unpacks an offset into [[StreamProgress]] by associating each offset with the ordered list of
+ * sources.
+ *
+ * This method is typically used to associate a serialized offset with actual sources (which
+ * cannot be serialized).
+ */
+ def toStreamProgress(sources: Seq[Source]): StreamProgress = {
+ assert(sources.size == offsets.size)
+ new StreamProgress ++ sources.zip(offsets).collect { case (s, Some(o)) => (s, o) }
+ }
}
object CompositeOffset {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
index 787e93f543..d13b1a6166 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
@@ -109,20 +109,16 @@ class FileStreamSource(
/**
* Returns the next batch of data that is available after `start`, if any is available.
*/
- override def getNextBatch(start: Option[Offset]): Option[Batch] = {
+ override def getBatch(start: Option[Offset], end: Offset): DataFrame = {
val startId = start.map(_.asInstanceOf[LongOffset].offset).getOrElse(-1L)
- val end = fetchMaxOffset()
- val endId = end.offset
-
- if (startId + 1 <= endId) {
- val files = metadataLog.get(Some(startId + 1), endId).map(_._2).flatten
- logDebug(s"Return files from batches ${startId + 1}:$endId")
- logDebug(s"Streaming ${files.mkString(", ")}")
- Some(new Batch(end, dataFrameBuilder(files)))
- }
- else {
- None
- }
+ val endId = end.asInstanceOf[LongOffset].offset
+
+ assert(startId <= endId)
+ val files = metadataLog.get(Some(startId + 1), endId).map(_._2).flatten
+ logDebug(s"Return files from batches ${startId + 1}:$endId")
+ logDebug(s"Streaming ${files.mkString(", ")}")
+ dataFrameBuilder(files)
+
}
private def fetchAllFiles(): Seq[String] = {
@@ -130,4 +126,6 @@ class FileStreamSource(
.filterNot(_.getPath.getName.startsWith("_"))
.map(_.getPath.toUri.toString)
}
+
+ override def getOffset: Option[Offset] = Some(fetchMaxOffset()).filterNot(_.offset == -1)
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala
index ac2842b6d5..298b5d292e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala
@@ -27,6 +27,7 @@ import org.apache.commons.io.IOUtils
import org.apache.hadoop.fs._
import org.apache.hadoop.fs.permission.FsPermission
+import org.apache.spark.internal.Logging
import org.apache.spark.network.util.JavaUtils
import org.apache.spark.serializer.JavaSerializer
import org.apache.spark.sql.SQLContext
@@ -42,7 +43,9 @@ import org.apache.spark.sql.SQLContext
* Note: [[HDFSMetadataLog]] doesn't support S3-like file systems as they don't guarantee listing
* files in a directory always shows the latest files.
*/
-class HDFSMetadataLog[T: ClassTag](sqlContext: SQLContext, path: String) extends MetadataLog[T] {
+class HDFSMetadataLog[T: ClassTag](sqlContext: SQLContext, path: String)
+ extends MetadataLog[T]
+ with Logging {
private val metadataPath = new Path(path)
@@ -113,6 +116,7 @@ class HDFSMetadataLog[T: ClassTag](sqlContext: SQLContext, path: String) extends
try {
// Try to commit the batch
// It will fail if there is an existing file (someone has committed the batch)
+ logDebug(s"Attempting to write log #${batchFile(batchId)}")
fc.rename(tempPath, batchFile(batchId), Options.Rename.NONE)
return
} catch {
@@ -161,6 +165,7 @@ class HDFSMetadataLog[T: ClassTag](sqlContext: SQLContext, path: String) extends
val bytes = IOUtils.toByteArray(input)
Some(serializer.deserialize[T](ByteBuffer.wrap(bytes)))
} else {
+ logDebug(s"Unable to find batch $batchMetadataFile")
None
}
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/Sink.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/Sink.scala
index e3b2d2f67e..25015d58f7 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/Sink.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/Sink.scala
@@ -17,31 +17,19 @@
package org.apache.spark.sql.execution.streaming
+import org.apache.spark.sql.DataFrame
+
/**
- * An interface for systems that can collect the results of a streaming query.
- *
- * When new data is produced by a query, a [[Sink]] must be able to transactionally collect the
- * data and update the [[Offset]]. In the case of a failure, the sink will be recreated
- * and must be able to return the [[Offset]] for all of the data that is made durable.
- * This contract allows Spark to process data with exactly-once semantics, even in the case
- * of failures that require the computation to be restarted.
+ * An interface for systems that can collect the results of a streaming query. In order to preserve
+ * exactly once semantics a sink must be idempotent in the face of multiple attempts to add the same
+ * batch.
*/
trait Sink {
- /**
- * Returns the [[Offset]] for all data that is currently present in the sink, if any. This
- * function will be called by Spark when restarting execution in order to determine at which point
- * in the input stream computation should be resumed from.
- */
- def currentOffset: Option[Offset]
/**
- * Accepts a new batch of data as well as a [[Offset]] that denotes how far in the input
- * data computation has progressed to. When computation restarts after a failure, it is important
- * that a [[Sink]] returns the same [[Offset]] as the most recent batch of data that
- * has been persisted durably. Note that this does not necessarily have to be the
- * [[Offset]] for the most recent batch of data that was given to the sink. For example,
- * it is valid to buffer data before persisting, as long as the [[Offset]] is stored
- * transactionally as data is eventually persisted.
+ * Adds a batch of data to this sink. The data for a given `batchId` is deterministic and if
+ * this method is called more than once with the same batchId (which will happen in the case of
+ * failures), then `data` should only be added once.
*/
- def addBatch(batch: Batch): Unit
+ def addBatch(batchId: Long, data: DataFrame): Unit
}
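To make the relaxed contract concrete, below is a hedged sketch (not part of this patch; `writeBatchToStorage` is a placeholder, and recovering `lastCommittedBatchId` from storage on restart is elided) of a sink that preserves exactly-once output by skipping batch ids it has already made durable:

```scala
import org.apache.spark.sql.{DataFrame, Row}
import org.apache.spark.sql.execution.streaming.Sink

// Hedged sketch of an idempotent Sink under the new contract; storage details are placeholders.
class DurableSink extends Sink {
  // In a real sink this would be recovered from durable storage on restart.
  @volatile private var lastCommittedBatchId: Long = -1L

  override def addBatch(batchId: Long, data: DataFrame): Unit = synchronized {
    if (batchId <= lastCommittedBatchId) {
      // Replay after a failure: data for a given batchId is deterministic, so it is safe to skip.
    } else {
      writeBatchToStorage(batchId, data.collect()) // placeholder for a durable, atomic write
      lastCommittedBatchId = batchId
    }
  }

  private def writeBatchToStorage(batchId: Long, rows: Array[Row]): Unit = ()
}
```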
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/Source.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/Source.scala
index 25922979ac..6457f928ed 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/Source.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/Source.scala
@@ -17,6 +17,7 @@
package org.apache.spark.sql.execution.streaming
+import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.types.StructType
/**
@@ -29,8 +30,13 @@ trait Source {
/** Returns the schema of the data from this source */
def schema: StructType
+ /** Returns the maximum available offset for this source. */
+ def getOffset: Option[Offset]
+
/**
- * Returns the next batch of data that is available after `start`, if any is available.
+ * Returns the data that is between the offsets (`start`, `end`]. When `start` is `None` then
+ * the batch should begin with the first available record. This method must always return the
+ * same data for a particular `start` and `end` pair.
*/
- def getNextBatch(start: Option[Offset]): Option[Batch]
+ def getBatch(start: Option[Offset], end: Offset): DataFrame
}
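For illustration, a hedged sketch (not part of this patch) of a source backed by an in-memory, append-only buffer of DataFrames, one per offset; `MemoryStream` later in this patch is the real in-tree example of the `(start, end]` contract:

```scala
import scala.collection.mutable.ArrayBuffer

import org.apache.spark.sql.{DataFrame, SQLContext}
import org.apache.spark.sql.execution.streaming.{LongOffset, Offset, Source}
import org.apache.spark.sql.types.StructType

// Hedged sketch of a Source whose offsets are indexes into an append-only buffer.
class BufferedSource(sqlContext: SQLContext, override val schema: StructType) extends Source {
  private val buffer = new ArrayBuffer[DataFrame]()

  def addData(data: DataFrame): Unit = synchronized { buffer += data }

  // The maximum offset with data available, or None before anything has arrived.
  override def getOffset: Option[Offset] = synchronized {
    if (buffer.isEmpty) None else Some(LongOffset(buffer.size - 1))
  }

  // Returns the data in (start, end]; a `start` of None means "from the first record".
  // Must return the same data every time it is called with the same (start, end) pair.
  override def getBatch(start: Option[Offset], end: Offset): DataFrame = synchronized {
    val from = start.map(_.asInstanceOf[LongOffset].offset.toInt + 1).getOrElse(0)
    val to = end.asInstanceOf[LongOffset].offset.toInt + 1
    buffer.slice(from, to).reduceOption(_ unionAll _).getOrElse(sqlContext.emptyDataFrame)
  }
}
```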
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
index 0062b7fc75..c5fefb5346 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
@@ -23,6 +23,8 @@ import java.util.concurrent.atomic.AtomicInteger
import scala.collection.mutable.ArrayBuffer
import scala.util.control.NonFatal
+import org.apache.hadoop.fs.Path
+
import org.apache.spark.internal.Logging
import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap}
@@ -41,6 +43,7 @@ import org.apache.spark.sql.util.ContinuousQueryListener._
class StreamExecution(
val sqlContext: SQLContext,
override val name: String,
+ val checkpointRoot: String,
private[sql] val logicalPlan: LogicalPlan,
val sink: Sink) extends ContinuousQuery with Logging {
@@ -52,13 +55,28 @@ class StreamExecution(
/** Minimum amount of time in between the start of each batch. */
private val minBatchTime = 10
- /** Tracks how much data we have processed from each input source. */
- private[sql] val streamProgress = new StreamProgress
+ /**
+ * Tracks how much data we have processed and committed to the sink or state store from each
+ * input source.
+ */
+ private[sql] var committedOffsets = new StreamProgress
+
+ /**
+ * Tracks the offsets that are available to be processed, but have not yet been committed to the
+ * sink.
+ */
+ private var availableOffsets = new StreamProgress
+
+ /** The current batchId or -1 if execution has not yet been initialized. */
+ private var currentBatchId: Long = -1
/** All stream sources present in the query plan. */
private val sources =
logicalPlan.collect { case s: StreamingRelation => s.source }
+ /** A list of unique sources in the query plan. */
+ private val uniqueSources = sources.distinct
+
/** Defines the internal state of execution */
@volatile
private var state: State = INITIALIZED
@@ -74,20 +92,34 @@ class StreamExecution(
override def run(): Unit = { runBatches() }
}
+ /**
+ * A write-ahead-log that records the offsets that are present in each batch. In order to ensure
+ * that a given batch will always consist of the same data, we write to this log *before* any
+ * processing is done. Thus, the Nth record in this log indicates data that is currently being
+ * processed and the N-1th entry indicates which offsets have been durably committed to the sink.
+ */
+ private val offsetLog =
+ new HDFSMetadataLog[CompositeOffset](sqlContext, checkpointFile("offsets"))
+
/** Whether the query is currently active or not */
override def isActive: Boolean = state == ACTIVE
/** Returns current status of all the sources. */
override def sourceStatuses: Array[SourceStatus] = {
- sources.map(s => new SourceStatus(s.toString, streamProgress.get(s))).toArray
+ sources.map(s => new SourceStatus(s.toString, availableOffsets.get(s))).toArray
}
/** Returns current status of the sink. */
- override def sinkStatus: SinkStatus = new SinkStatus(sink.toString, sink.currentOffset)
+ override def sinkStatus: SinkStatus =
+ new SinkStatus(sink.toString, committedOffsets.toCompositeOffset(sources))
/** Returns the [[ContinuousQueryException]] if the query was terminated by an exception. */
override def exception: Option[ContinuousQueryException] = Option(streamDeathCause)
+ /** Returns the path of a file with `name` in the checkpoint directory. */
+ private def checkpointFile(name: String): String =
+ new Path(new Path(checkpointRoot), name).toUri.toString
+
/**
* Starts the execution. This returns only after the thread has started and [[QueryStarted]] event
* has been posted to all the listeners.
@@ -102,7 +134,7 @@ class StreamExecution(
* Repeatedly attempts to run batches as data arrives.
*
* Note that this method ensures that [[QueryStarted]] and [[QueryTerminated]] events are posted
- * so that listeners are guaranteed to get former event before the latter. Furthermore, this
+ * such that listeners are guaranteed to get a start event before a termination. Furthermore, this
* method also ensures that [[QueryStarted]] event is posted before the `start()` method returns.
*/
private def runBatches(): Unit = {
@@ -118,9 +150,10 @@ class StreamExecution(
// While active, repeatedly attempt to run batches.
SQLContext.setActive(sqlContext)
populateStartOffsets()
- logInfo(s"Stream running at $streamProgress")
+ logDebug(s"Stream running from $committedOffsets to $availableOffsets")
while (isActive) {
- attemptBatch()
+ if (dataAvailable) runBatch()
+ commitAndConstructNextBatch()
Thread.sleep(minBatchTime) // TODO: Could be tighter
}
} catch {
@@ -130,7 +163,7 @@ class StreamExecution(
this,
s"Query $name terminated with exception: ${e.getMessage}",
e,
- Some(streamProgress.toCompositeOffset(sources)))
+ Some(committedOffsets.toCompositeOffset(sources)))
logError(s"Query $name terminated with error", e)
} finally {
state = TERMINATED
@@ -142,48 +175,99 @@ class StreamExecution(
/**
* Populate the start offsets to start the execution at the current offsets stored in the sink
- * (i.e. avoid reprocessing data that we have already processed).
+ * (i.e. avoid reprocessing data that we have already processed). This function must be called
+ * before any processing occurs and will populate the following fields:
+ * - currentBatchId
+ * - committedOffsets
+ * - availableOffsets
*/
private def populateStartOffsets(): Unit = {
- sink.currentOffset match {
- case Some(c: CompositeOffset) =>
- val storedProgress = c.offsets
- val sources = logicalPlan collect {
- case StreamingRelation(source, _) => source
+ offsetLog.getLatest() match {
+ case Some((batchId, nextOffsets)) =>
+ logInfo(s"Resuming continuous query, starting with batch $batchId")
+ currentBatchId = batchId + 1
+ availableOffsets = nextOffsets.toStreamProgress(sources)
+ logDebug(s"Found possibly uncommitted offsets $availableOffsets")
+
+ offsetLog.get(batchId - 1).foreach {
+ case lastOffsets =>
+ committedOffsets = lastOffsets.toStreamProgress(sources)
+ logDebug(s"Resuming with committed offsets: $committedOffsets")
}
- assert(sources.size == storedProgress.size)
- sources.zip(storedProgress).foreach { case (source, offset) =>
- offset.foreach(streamProgress.update(source, _))
- }
case None => // We are starting this stream for the first time.
- case _ => throw new IllegalArgumentException("Expected composite offset from sink")
+ logInfo(s"Starting new continuous query.")
+ currentBatchId = 0
+ commitAndConstructNextBatch()
}
}
/**
- * Checks to see if any new data is present in any of the sources. When new data is available,
- * a batch is executed and passed to the sink, updating the currentOffsets.
+ * Returns true if there is any new data available to be processed.
*/
- private def attemptBatch(): Unit = {
+ private def dataAvailable: Boolean = {
+ availableOffsets.exists {
+ case (source, available) =>
+ committedOffsets
+ .get(source)
+ .map(committed => committed < available)
+ .getOrElse(true)
+ }
+ }
+
+ /**
+ * Queries all of the sources to see if any new data is available. When there is new data the
+ * batchId counter is incremented and a new log entry is written with the newest offsets.
+ *
+ * Note that committing the offsets for a new batch implicitly marks the previous batch as
+ * finished and thus this method should only be called when all currently available data
+ * has been written to the sink.
+ */
+ private def commitAndConstructNextBatch(): Boolean = {
+ // Update committed offsets.
+ committedOffsets ++= availableOffsets
+
+ // Check to see what new data is available.
+ val newData = uniqueSources.flatMap(s => s.getOffset.map(o => s -> o))
+ availableOffsets ++= newData
+
+ if (dataAvailable) {
+ assert(
+ offsetLog.add(currentBatchId, availableOffsets.toCompositeOffset(sources)),
+ s"Concurrent update to the log. Multiple streaming jobs detected for $currentBatchId")
+ currentBatchId += 1
+ logInfo(s"Committed offsets for batch $currentBatchId.")
+ true
+ } else {
+ false
+ }
+ }
+
+ /**
+ * Processes any data available between `availableOffsets` and `committedOffsets`.
+ */
+ private def runBatch(): Unit = {
val startTime = System.nanoTime()
- // A list of offsets that need to be updated if this batch is successful.
- // Populated while walking the tree.
- val newOffsets = new ArrayBuffer[(Source, Offset)]
+ // Request unprocessed data from all sources.
+ val newData = availableOffsets.flatMap {
+ case (source, available) if committedOffsets.get(source).map(_ < available).getOrElse(true) =>
+ val current = committedOffsets.get(source)
+ val batch = source.getBatch(current, available)
+ logDebug(s"Retrieving data from $source: $current -> $available")
+ Some(source -> batch)
+ case _ => None
+ }.toMap
+
// A list of attributes that will need to be updated.
var replacements = new ArrayBuffer[(Attribute, Attribute)]
// Replace sources in the logical plan with data that has arrived since the last batch.
val withNewSources = logicalPlan transform {
case StreamingRelation(source, output) =>
- val prevOffset = streamProgress.get(source)
- val newBatch = source.getNextBatch(prevOffset)
-
- newBatch.map { batch =>
- newOffsets += ((source, batch.end))
- val newPlan = batch.data.logicalPlan
-
- assert(output.size == newPlan.output.size)
+ newData.get(source).map { data =>
+ val newPlan = data.logicalPlan
+ assert(output.size == newPlan.output.size,
+ s"Invalid batch: ${output.mkString(",")} != ${newPlan.output.mkString(",")}")
replacements ++= output.zip(newPlan.output)
newPlan
}.getOrElse {
@@ -197,35 +281,24 @@ class StreamExecution(
case a: Attribute if replacementMap.contains(a) => replacementMap(a)
}
- if (newOffsets.nonEmpty) {
- val optimizerStart = System.nanoTime()
-
- lastExecution = new QueryExecution(sqlContext, newPlan)
- val executedPlan = lastExecution.executedPlan
- val optimizerTime = (System.nanoTime() - optimizerStart).toDouble / 1000000
- logDebug(s"Optimized batch in ${optimizerTime}ms")
+ val optimizerStart = System.nanoTime()
- streamProgress.synchronized {
- // Update the offsets and calculate a new composite offset
- newOffsets.foreach(streamProgress.update)
+ lastExecution = new QueryExecution(sqlContext, newPlan)
+ val executedPlan = lastExecution.executedPlan
+ val optimizerTime = (System.nanoTime() - optimizerStart).toDouble / 1000000
+ logDebug(s"Optimized batch in ${optimizerTime}ms")
- // Construct the batch and send it to the sink.
- val batchOffset = streamProgress.toCompositeOffset(sources)
- val nextBatch = new Batch(batchOffset, Dataset.newDataFrame(sqlContext, newPlan))
- sink.addBatch(nextBatch)
- }
-
- awaitBatchLock.synchronized {
- // Wake up any threads that are waiting for the stream to progress.
- awaitBatchLock.notifyAll()
- }
+ val nextBatch = Dataset.newDataFrame(sqlContext, newPlan)
+ sink.addBatch(currentBatchId - 1, nextBatch)
- val batchTime = (System.nanoTime() - startTime).toDouble / 1000000
- logInfo(s"Completed up to $newOffsets in ${batchTime}ms")
- postEvent(new QueryProgress(this))
+ awaitBatchLock.synchronized {
+ // Wake up any threads that are waiting for the stream to progress.
+ awaitBatchLock.notifyAll()
}
- logDebug(s"Waiting for data, current: $streamProgress")
+ val batchTime = (System.nanoTime() - startTime).toDouble / 1000000
+ logInfo(s"Completed up to $availableOffsets in ${batchTime}ms")
+ postEvent(new QueryProgress(this))
}
private def postEvent(event: ContinuousQueryListener.Event) {
@@ -252,9 +325,7 @@ class StreamExecution(
* least the given `Offset`. This method is intended for use primarily when writing tests.
*/
def awaitOffset(source: Source, newOffset: Offset): Unit = {
- def notDone = streamProgress.synchronized {
- !streamProgress.contains(source) || streamProgress(source) < newOffset
- }
+ def notDone = !committedOffsets.contains(source) || committedOffsets(source) < newOffset
while (notDone) {
logInfo(s"Waiting until $newOffset at $source")
@@ -297,7 +368,7 @@ class StreamExecution(
s"""
|=== Continuous Query ===
|Name: $name
- |Current Offsets: $streamProgress
+ |Current Offsets: $committedOffsets
|
|Current State: $state
|Thread State: ${microBatchThread.getState}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamProgress.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamProgress.scala
index d45b9bd983..405a5f0387 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamProgress.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamProgress.scala
@@ -17,55 +17,31 @@
package org.apache.spark.sql.execution.streaming
-import scala.collection.mutable
+import scala.collection.{immutable, GenTraversableOnce}
/**
* A helper class that looks like a Map[Source, Offset].
*/
-class StreamProgress {
- private val currentOffsets = new mutable.HashMap[Source, Offset]
+class StreamProgress(
+ val baseMap: immutable.Map[Source, Offset] = new immutable.HashMap[Source, Offset])
+ extends scala.collection.immutable.Map[Source, Offset] {
- private[streaming] def update(source: Source, newOffset: Offset): Unit = {
- currentOffsets.get(source).foreach(old =>
- assert(newOffset > old, s"Stream going backwards $newOffset -> $old"))
- currentOffsets.put(source, newOffset)
+ private[sql] def toCompositeOffset(source: Seq[Source]): CompositeOffset = {
+ CompositeOffset(source.map(get))
}
- private[streaming] def update(newOffset: (Source, Offset)): Unit =
- update(newOffset._1, newOffset._2)
+ override def toString: String =
+ baseMap.map { case (k, v) => s"$k: $v"}.mkString("{", ",", "}")
- private[streaming] def apply(source: Source): Offset = currentOffsets(source)
- private[streaming] def get(source: Source): Option[Offset] = currentOffsets.get(source)
- private[streaming] def contains(source: Source): Boolean = currentOffsets.contains(source)
+ override def +[B1 >: Offset](kv: (Source, B1)): Map[Source, B1] = baseMap + kv
- private[streaming] def ++(updates: Map[Source, Offset]): StreamProgress = {
- val updated = new StreamProgress
- currentOffsets.foreach(updated.update)
- updates.foreach(updated.update)
- updated
- }
+ override def get(key: Source): Option[Offset] = baseMap.get(key)
- /**
- * Used to create a new copy of this [[StreamProgress]]. While this class is currently mutable,
- * it should be copied before being passed to user code.
- */
- private[streaming] def copy(): StreamProgress = {
- val copied = new StreamProgress
- currentOffsets.foreach(copied.update)
- copied
- }
+ override def iterator: Iterator[(Source, Offset)] = baseMap.iterator
- private[sql] def toCompositeOffset(source: Seq[Source]): CompositeOffset = {
- CompositeOffset(source.map(get))
- }
+ override def -(key: Source): Map[Source, Offset] = baseMap - key
- override def toString: String =
- currentOffsets.map { case (k, v) => s"$k: $v"}.mkString("{", ",", "}")
-
- override def equals(other: Any): Boolean = other match {
- case s: StreamProgress => currentOffsets == s.currentOffsets
- case _ => false
+ def ++(updates: GenTraversableOnce[(Source, Offset)]): StreamProgress = {
+ new StreamProgress(baseMap ++ updates)
}
-
- override def hashCode: Int = currentOffsets.hashCode()
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala
index a6504cd088..8c2bb4abd5 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala
@@ -51,8 +51,6 @@ case class MemoryStream[A : Encoder](id: Int, sqlContext: SQLContext)
protected var currentOffset: LongOffset = new LongOffset(-1)
- protected def blockManager = SparkEnv.get.blockManager
-
def schema: StructType = encoder.schema
def toDS()(implicit sqlContext: SQLContext): Dataset[A] = {
@@ -78,25 +76,32 @@ case class MemoryStream[A : Encoder](id: Int, sqlContext: SQLContext)
}
}
- override def getNextBatch(start: Option[Offset]): Option[Batch] = synchronized {
- val newBlocks =
- batches.drop(
- start.map(_.asInstanceOf[LongOffset]).getOrElse(LongOffset(-1)).offset.toInt + 1)
-
- if (newBlocks.nonEmpty) {
- logDebug(s"Running [$start, $currentOffset] on blocks ${newBlocks.mkString(", ")}")
- val df = newBlocks
- .map(_.toDF())
- .reduceOption(_ unionAll _)
- .getOrElse(sqlContext.emptyDataFrame)
+ override def toString: String = s"MemoryStream[${output.mkString(",")}]"
- Some(new Batch(currentOffset, df))
- } else {
- None
- }
+ override def getOffset: Option[Offset] = if (batches.isEmpty) {
+ None
+ } else {
+ Some(currentOffset)
}
- override def toString: String = s"MemoryStream[${output.mkString(",")}]"
+ /**
+ * Returns the next batch of data that is available after `start`, if any is available.
+ */
+ override def getBatch(start: Option[Offset], end: Offset): DataFrame = {
+ val startOrdinal =
+ start.map(_.asInstanceOf[LongOffset]).getOrElse(LongOffset(-1)).offset.toInt + 1
+ val endOrdinal = end.asInstanceOf[LongOffset].offset.toInt + 1
+ val newBlocks = batches.slice(startOrdinal, endOrdinal)
+
+ logDebug(
+ s"MemoryBatch [$startOrdinal, $endOrdinal]: ${newBlocks.flatMap(_.collect()).mkString(", ")}")
+ newBlocks
+ .map(_.toDF())
+ .reduceOption(_ unionAll _)
+ .getOrElse {
+ sys.error("No data selected!")
+ }
+ }
}
/**
@@ -105,45 +110,29 @@ case class MemoryStream[A : Encoder](id: Int, sqlContext: SQLContext)
*/
class MemorySink(schema: StructType) extends Sink with Logging {
/** An ordered list of batches that have been written to this [[Sink]]. */
- private var batches = new ArrayBuffer[Batch]()
-
- /** Used to convert an [[InternalRow]] to an external [[Row]] for comparison in testing. */
- private val externalRowConverter = RowEncoder(schema)
-
- override def currentOffset: Option[Offset] = synchronized {
- batches.lastOption.map(_.end)
- }
-
- override def addBatch(nextBatch: Batch): Unit = synchronized {
- nextBatch.data.collect() // 'compute' the batch's data and record the batch
- batches.append(nextBatch)
- }
+ private val batches = new ArrayBuffer[Array[Row]]()
/** Returns all rows that are stored in this [[Sink]]. */
def allData: Seq[Row] = synchronized {
- batches
- .map(_.data)
- .reduceOption(_ unionAll _)
- .map(_.collect().toSeq)
- .getOrElse(Seq.empty)
- }
-
- /**
- * Atomically drops the most recent `num` batches and resets the [[StreamProgress]] to the
- * corresponding point in the input. This function can be used when testing to simulate data
- * that has been lost due to buffering.
- */
- def dropBatches(num: Int): Unit = synchronized {
- batches.dropRight(num)
+ batches.flatten
}
def toDebugString: String = synchronized {
- batches.map { b =>
- val dataStr = try b.data.collect().mkString(" ") catch {
+ batches.zipWithIndex.map { case (b, i) =>
+ val dataStr = try b.mkString(" ") catch {
case NonFatal(e) => "[Error converting to string]"
}
- s"${b.end}: $dataStr"
+ s"$i: $dataStr"
}.mkString("\n")
}
+
+ override def addBatch(batchId: Long, data: DataFrame): Unit = {
+ if (batchId == batches.size) {
+ logDebug(s"Committing batch $batchId")
+ batches.append(data.collect())
+ } else {
+ logDebug(s"Skipping already committed batch: $batchId")
+ }
+ }
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 70d1a8b071..fd1d77f514 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -524,6 +524,11 @@ object SQLConf {
doc = "When true, the planner will try to find out duplicated exchanges and re-use them.",
isPublic = false)
+ val CHECKPOINT_LOCATION = stringConf("spark.sql.streaming.checkpointLocation",
+ defaultValue = None,
+ doc = "The default location for storing checkpoint data for continuously executing queries.",
+ isPublic = true)
+
object Deprecated {
val MAPRED_REDUCE_TASKS = "mapred.reduce.tasks"
val EXTERNAL_SORT = "spark.sql.planner.externalSort"
@@ -554,6 +559,8 @@ class SQLConf extends Serializable with CatalystConf with ParserConf with Loggin
/** ************************ Spark SQL Params/Hints ******************* */
+ def checkpointLocation: String = getConf(CHECKPOINT_LOCATION)
+
def filesMaxPartitionBytes: Long = getConf(FILES_MAX_PARTITION_BYTES)
def useCompression: Boolean = getConf(COMPRESS_CACHED)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/StreamTest.scala b/sql/core/src/test/scala/org/apache/spark/sql/StreamTest.scala
index 81078dc6a0..f356cde9cf 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/StreamTest.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/StreamTest.scala
@@ -37,6 +37,7 @@ import org.apache.spark.sql.catalyst.encoders.{encoderFor, ExpressionEncoder, Ro
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.util._
import org.apache.spark.sql.execution.streaming._
+import org.apache.spark.util.Utils
/**
* A framework for implementing tests for streaming queries and sources.
@@ -64,6 +65,12 @@ import org.apache.spark.sql.execution.streaming._
*/
trait StreamTest extends QueryTest with Timeouts {
+ implicit class RichContinuousQuery(cq: ContinuousQuery) {
+ def stopQuietly(): Unit = quietly {
+ cq.stop()
+ }
+ }
+
implicit class RichSource(s: Source) {
def toDF(): DataFrame = Dataset.newDataFrame(sqlContext, StreamingRelation(s))
@@ -126,8 +133,6 @@ trait StreamTest extends QueryTest with Timeouts {
override def toString: String = s"CheckAnswer: ${expectedAnswer.mkString(",")}"
}
- case class DropBatches(num: Int) extends StreamAction
-
/** Stops the stream. It must currently be running. */
case object StopStream extends StreamAction with StreamMustBeRunning
@@ -202,7 +207,7 @@ trait StreamTest extends QueryTest with Timeouts {
}.mkString("\n")
def currentOffsets =
- if (currentStream != null) currentStream.streamProgress.toString else "not started"
+ if (currentStream != null) currentStream.committedOffsets.toString else "not started"
def threadState =
if (currentStream != null && currentStream.microBatchThread.isAlive) "alive" else "dead"
@@ -266,6 +271,7 @@ trait StreamTest extends QueryTest with Timeouts {
}
val testThread = Thread.currentThread()
+ val metadataRoot = Utils.createTempDir("streaming.metadata").getCanonicalPath
try {
startedTest.foreach { action =>
@@ -276,7 +282,7 @@ trait StreamTest extends QueryTest with Timeouts {
currentStream =
sqlContext
.streams
- .startQuery(StreamExecution.nextName, stream, sink)
+ .startQuery(StreamExecution.nextName, metadataRoot, stream, sink)
.asInstanceOf[StreamExecution]
currentStream.microBatchThread.setUncaughtExceptionHandler(
new UncaughtExceptionHandler {
@@ -308,10 +314,6 @@ trait StreamTest extends QueryTest with Timeouts {
currentStream = null
}
- case DropBatches(num) =>
- verify(currentStream == null, "dropping batches while running leads to corruption")
- sink.dropBatches(num)
-
case ef: ExpectFailure[_] =>
verify(currentStream != null, "can not expect failure when stream is not running")
try failAfter(streamingTimeout) {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/ContinuousQueryManagerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/ContinuousQueryManagerSuite.scala
index 45e824ad63..54ce98d195 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/ContinuousQueryManagerSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/ContinuousQueryManagerSuite.scala
@@ -31,6 +31,7 @@ import org.apache.spark.SparkException
import org.apache.spark.sql.{ContinuousQuery, Dataset, StreamTest}
import org.apache.spark.sql.execution.streaming.{MemorySink, MemoryStream, StreamExecution, StreamingRelation}
import org.apache.spark.sql.test.SharedSQLContext
+import org.apache.spark.util.Utils
class ContinuousQueryManagerSuite extends StreamTest with SharedSQLContext with BeforeAndAfter {
@@ -235,9 +236,14 @@ class ContinuousQueryManagerSuite extends StreamTest with SharedSQLContext with
@volatile var query: StreamExecution = null
try {
val df = ds.toDF
+ val metadataRoot = Utils.createTempDir("streaming.metadata").getCanonicalPath
query = sqlContext
.streams
- .startQuery(StreamExecution.nextName, df, new MemorySink(df.schema))
+ .startQuery(
+ StreamExecution.nextName,
+ metadataRoot,
+ df,
+ new MemorySink(df.schema))
.asInstanceOf[StreamExecution]
} catch {
case NonFatal(e) =>
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/ContinuousQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/ContinuousQuerySuite.scala
index 84ed017a9d..3be0ea481d 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/ContinuousQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/ContinuousQuerySuite.scala
@@ -54,7 +54,8 @@ class ContinuousQuerySuite extends StreamTest with SharedSQLContext {
TestAwaitTermination(ExpectException[SparkException], timeoutMs = 2000),
TestAwaitTermination(ExpectException[SparkException], timeoutMs = 10),
AssertOnQuery(
- q => q.exception.get.startOffset.get === q.streamProgress.toCompositeOffset(Seq(inputData)),
+ q =>
+ q.exception.get.startOffset.get === q.committedOffsets.toCompositeOffset(Seq(inputData)),
"incorrect start offset on exception")
)
}
@@ -68,19 +69,19 @@ class ContinuousQuerySuite extends StreamTest with SharedSQLContext {
AssertOnQuery(_.sourceStatuses(0).description.contains("Memory")),
AssertOnQuery(_.sourceStatuses(0).offset === None),
AssertOnQuery(_.sinkStatus.description.contains("Memory")),
- AssertOnQuery(_.sinkStatus.offset === None),
+ AssertOnQuery(_.sinkStatus.offset === new CompositeOffset(None :: Nil)),
AddData(inputData, 1, 2),
CheckAnswer(6, 3),
AssertOnQuery(_.sourceStatuses(0).offset === Some(LongOffset(0))),
- AssertOnQuery(_.sinkStatus.offset === Some(CompositeOffset.fill(LongOffset(0)))),
+ AssertOnQuery(_.sinkStatus.offset === CompositeOffset.fill(LongOffset(0))),
AddData(inputData, 1, 2),
CheckAnswer(6, 3, 6, 3),
AssertOnQuery(_.sourceStatuses(0).offset === Some(LongOffset(1))),
- AssertOnQuery(_.sinkStatus.offset === Some(CompositeOffset.fill(LongOffset(1)))),
+ AssertOnQuery(_.sinkStatus.offset === CompositeOffset.fill(LongOffset(1))),
AddData(inputData, 0),
ExpectFailure[SparkException],
AssertOnQuery(_.sourceStatuses(0).offset === Some(LongOffset(2))),
- AssertOnQuery(_.sinkStatus.offset === Some(CompositeOffset.fill(LongOffset(1))))
+ AssertOnQuery(_.sinkStatus.offset === CompositeOffset.fill(LongOffset(1)))
)
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/DataFrameReaderWriterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/DataFrameReaderWriterSuite.scala
index 0878277811..e485aa837b 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/DataFrameReaderWriterSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/DataFrameReaderWriterSuite.scala
@@ -19,11 +19,12 @@ package org.apache.spark.sql.streaming.test
import org.scalatest.BeforeAndAfter
-import org.apache.spark.sql.{AnalysisException, ContinuousQuery, SQLContext, StreamTest}
-import org.apache.spark.sql.execution.streaming.{Batch, Offset, Sink, Source}
+import org.apache.spark.sql._
+import org.apache.spark.sql.execution.streaming._
import org.apache.spark.sql.sources.{StreamSinkProvider, StreamSourceProvider}
import org.apache.spark.sql.test.SharedSQLContext
import org.apache.spark.sql.types.{IntegerType, StructField, StructType}
+import org.apache.spark.util.Utils
object LastOptions {
var parameters: Map[String, String] = null
@@ -41,8 +42,15 @@ class DefaultSource extends StreamSourceProvider with StreamSinkProvider {
LastOptions.parameters = parameters
LastOptions.schema = schema
new Source {
- override def getNextBatch(start: Option[Offset]): Option[Batch] = None
override def schema: StructType = StructType(StructField("a", IntegerType) :: Nil)
+
+ override def getOffset: Option[Offset] = Some(new LongOffset(0))
+
+ override def getBatch(start: Option[Offset], end: Offset): DataFrame = {
+ import sqlContext.implicits._
+
+ Seq[Int]().toDS().toDF()
+ }
}
}
@@ -53,8 +61,7 @@ class DefaultSource extends StreamSourceProvider with StreamSinkProvider {
LastOptions.parameters = parameters
LastOptions.partitionColumns = partitionColumns
new Sink {
- override def addBatch(batch: Batch): Unit = {}
- override def currentOffset: Option[Offset] = None
+ override def addBatch(batchId: Long, data: DataFrame): Unit = {}
}
}
}
@@ -62,8 +69,10 @@ class DefaultSource extends StreamSourceProvider with StreamSinkProvider {
class DataFrameReaderWriterSuite extends StreamTest with SharedSQLContext with BeforeAndAfter {
import testImplicits._
+ private def newMetadataDir = Utils.createTempDir("streaming.metadata").getCanonicalPath
+
after {
- sqlContext.streams.active.foreach(_.stop())
+ sqlContext.streams.active.foreach(_.stopQuietly())
}
test("resolve default source") {
@@ -72,8 +81,9 @@ class DataFrameReaderWriterSuite extends StreamTest with SharedSQLContext with B
.stream()
.write
.format("org.apache.spark.sql.streaming.test")
+ .option("checkpointLocation", newMetadataDir)
.startStream()
- .stop()
+ .stopQuietly()
}
test("resolve full class") {
@@ -82,8 +92,9 @@ class DataFrameReaderWriterSuite extends StreamTest with SharedSQLContext with B
.stream()
.write
.format("org.apache.spark.sql.streaming.test")
+ .option("checkpointLocation", newMetadataDir)
.startStream()
- .stop()
+ .stopQuietly()
}
test("options") {
@@ -108,8 +119,9 @@ class DataFrameReaderWriterSuite extends StreamTest with SharedSQLContext with B
.option("opt1", "1")
.options(Map("opt2" -> "2"))
.options(map)
+ .option("checkpointLocation", newMetadataDir)
.startStream()
- .stop()
+ .stopQuietly()
assert(LastOptions.parameters("opt1") == "1")
assert(LastOptions.parameters("opt2") == "2")
@@ -123,38 +135,43 @@ class DataFrameReaderWriterSuite extends StreamTest with SharedSQLContext with B
df.write
.format("org.apache.spark.sql.streaming.test")
+ .option("checkpointLocation", newMetadataDir)
.startStream()
- .stop()
+ .stopQuietly()
assert(LastOptions.partitionColumns == Nil)
df.write
.format("org.apache.spark.sql.streaming.test")
+ .option("checkpointLocation", newMetadataDir)
.partitionBy("a")
.startStream()
- .stop()
+ .stopQuietly()
assert(LastOptions.partitionColumns == Seq("a"))
withSQLConf("spark.sql.caseSensitive" -> "false") {
df.write
.format("org.apache.spark.sql.streaming.test")
+ .option("checkpointLocation", newMetadataDir)
.partitionBy("A")
.startStream()
- .stop()
+ .stopQuietly()
assert(LastOptions.partitionColumns == Seq("a"))
}
intercept[AnalysisException] {
df.write
.format("org.apache.spark.sql.streaming.test")
+ .option("checkpointLocation", newMetadataDir)
.partitionBy("b")
.startStream()
- .stop()
+ .stopQuietly()
}
}
test("stream paths") {
val df = sqlContext.read
.format("org.apache.spark.sql.streaming.test")
+ .option("checkpointLocation", newMetadataDir)
.stream("/test")
assert(LastOptions.parameters("path") == "/test")
@@ -163,8 +180,9 @@ class DataFrameReaderWriterSuite extends StreamTest with SharedSQLContext with B
df.write
.format("org.apache.spark.sql.streaming.test")
+ .option("checkpointLocation", newMetadataDir)
.startStream("/test")
- .stop()
+ .stopQuietly()
assert(LastOptions.parameters("path") == "/test")
}
@@ -187,8 +205,9 @@ class DataFrameReaderWriterSuite extends StreamTest with SharedSQLContext with B
.option("intOpt", 56)
.option("boolOpt", false)
.option("doubleOpt", 6.7)
+ .option("checkpointLocation", newMetadataDir)
.startStream("/test")
- .stop()
+ .stopQuietly()
assert(LastOptions.parameters("intOpt") == "56")
assert(LastOptions.parameters("boolOpt") == "false")
@@ -204,6 +223,7 @@ class DataFrameReaderWriterSuite extends StreamTest with SharedSQLContext with B
.stream("/test")
.write
.format("org.apache.spark.sql.streaming.test")
+ .option("checkpointLocation", newMetadataDir)
.queryName(name)
.startStream()
}
@@ -215,6 +235,7 @@ class DataFrameReaderWriterSuite extends StreamTest with SharedSQLContext with B
.stream("/test")
.write
.format("org.apache.spark.sql.streaming.test")
+ .option("checkpointLocation", newMetadataDir)
.startStream()
}
@@ -248,9 +269,9 @@ class DataFrameReaderWriterSuite extends StreamTest with SharedSQLContext with B
}
// Should be able to start query with that name after stopping the previous query
- q1.stop()
+ q1.stopQuietly()
val q5 = startQueryWithName("name")
assert(activeStreamNames.contains("name"))
- sqlContext.streams.active.foreach(_.stop())
+ sqlContext.streams.active.foreach(_.stopQuietly())
}
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
index 4c18e38db8..89de15acf5 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
@@ -318,16 +318,6 @@ class FileStreamSourceSuite extends FileStreamSourceTest with SharedSQLContext {
}
test("fault tolerance") {
- def assertBatch(batch1: Option[Batch], batch2: Option[Batch]): Unit = {
- (batch1, batch2) match {
- case (Some(b1), Some(b2)) =>
- assert(b1.end === b2.end)
- assert(b1.data.as[String].collect() === b2.data.as[String].collect())
- case (None, None) =>
- case _ => fail(s"batch ($batch1) is not equal to batch ($batch2)")
- }
- }
-
val src = Utils.createTempDir("streaming.src")
val tmp = Utils.createTempDir("streaming.tmp")
@@ -345,14 +335,6 @@ class FileStreamSourceSuite extends FileStreamSourceTest with SharedSQLContext {
CheckAnswer("keep2", "keep3", "keep5", "keep6", "keep8", "keep9")
)
- val textSource2 = createFileStreamSource("text", src.getCanonicalPath)
- assert(textSource2.currentOffset === textSource.currentOffset)
- assertBatch(textSource2.getNextBatch(None), textSource.getNextBatch(None))
- for (f <- 0L to textSource.currentOffset.offset) {
- val offset = LongOffset(f)
- assertBatch(textSource2.getNextBatch(Some(offset)), textSource.getNextBatch(Some(offset)))
- }
-
Utils.deleteRecursively(src)
Utils.deleteRecursively(tmp)
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/util/ContinuousQueryListenerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/util/ContinuousQueryListenerSuite.scala
index 52783281ab..d04783ecac 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/util/ContinuousQueryListenerSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/util/ContinuousQueryListenerSuite.scala
@@ -61,7 +61,7 @@ class ContinuousQueryListenerSuite extends StreamTest with SharedSQLContext with
// The source and sink offsets must be None as this must be called before the
// batches have started
assert(status.sourceStatuses(0).offset === None)
- assert(status.sinkStatus.offset === None)
+ assert(status.sinkStatus.offset === CompositeOffset(None :: Nil))
// No progress events or termination events
assert(listener.progressStatuses.isEmpty)
@@ -78,7 +78,7 @@ class ContinuousQueryListenerSuite extends StreamTest with SharedSQLContext with
assert(status != null)
assert(status.active == true)
assert(status.sourceStatuses(0).offset === Some(LongOffset(0)))
- assert(status.sinkStatus.offset === Some(CompositeOffset.fill(LongOffset(0))))
+ assert(status.sinkStatus.offset === CompositeOffset.fill(LongOffset(0)))
// No termination events
assert(listener.terminationStatus === null)
@@ -92,7 +92,7 @@ class ContinuousQueryListenerSuite extends StreamTest with SharedSQLContext with
assert(status.active === false) // must be inactive by the time onQueryTerm is called
assert(status.sourceStatuses(0).offset === Some(LongOffset(0)))
- assert(status.sinkStatus.offset === Some(CompositeOffset.fill(LongOffset(0))))
+ assert(status.sinkStatus.offset === CompositeOffset.fill(LongOffset(0)))
}
listener.checkAsyncErrors()
}