From caea15214571d9b12dcf1553e5c1cc8b83a8ba5b Mon Sep 17 00:00:00 2001 From: Michael Armbrust Date: Tue, 22 Mar 2016 10:18:42 -0700 Subject: [SPARK-13985][SQL] Deterministic batches with ids This PR relaxes the requirements of a `Sink` for structured streaming to only require idempotent appending of data. Previously the `Sink` needed to be able to transactionally append data while recording an opaque offset indicating how far in a stream we have processed. In order to do this, a new write-ahead-log has been added to stream execution, which records the offsets that are present in each batch. The log is created in the newly added `checkpointLocation`, which defaults to `${spark.sql.streaming.checkpointLocation}/${queryName}` but can be overridden by setting `checkpointLocation` in `DataFrameWriter`. In addition to making sinks easier to write, the addition of batchIds and a checkpoint location is done in anticipation of integration with the `StateStore` (#11645). Author: Michael Armbrust Closes #11804 from marmbrus/batchIds. --- .../apache/spark/sql/ContinuousQueryManager.scala | 8 +- .../org/apache/spark/sql/DataFrameWriter.scala | 11 +- .../scala/org/apache/spark/sql/SinkStatus.scala | 2 +- .../sql/execution/datasources/DataSource.scala | 3 +- .../sql/execution/streaming/CompositeOffset.scala | 12 ++ .../sql/execution/streaming/FileStreamSource.scala | 24 ++- .../sql/execution/streaming/HDFSMetadataLog.scala | 7 +- .../spark/sql/execution/streaming/Sink.scala | 30 +--- .../spark/sql/execution/streaming/Source.scala | 10 +- .../sql/execution/streaming/StreamExecution.scala | 193 ++++++++++++++------- .../sql/execution/streaming/StreamProgress.scala | 52 ++---- .../spark/sql/execution/streaming/memory.scala | 85 ++++----- .../org/apache/spark/sql/internal/SQLConf.scala | 7 + .../scala/org/apache/spark/sql/StreamTest.scala | 18 +- .../streaming/ContinuousQueryManagerSuite.scala | 8 +- .../spark/sql/streaming/ContinuousQuerySuite.scala | 11 +- .../sql/streaming/DataFrameReaderWriterSuite.scala | 55 ++++-- .../sql/streaming/FileStreamSourceSuite.scala | 18 -- .../sql/util/ContinuousQueryListenerSuite.scala | 6 +- 19 files changed, 319 insertions(+), 241 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/ContinuousQueryManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/ContinuousQueryManager.scala index 0a156ea99a..fa8219bbed 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/ContinuousQueryManager.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/ContinuousQueryManager.scala @@ -164,13 +164,17 @@ class ContinuousQueryManager(sqlContext: SQLContext) { } /** Start a query */ - private[sql] def startQuery(name: String, df: DataFrame, sink: Sink): ContinuousQuery = { + private[sql] def startQuery( + name: String, + checkpointLocation: String, + df: DataFrame, + sink: Sink): ContinuousQuery = { activeQueriesLock.synchronized { if (activeQueries.contains(name)) { throw new IllegalArgumentException( s"Cannot start query with name $name as a query with that name is already active") } - val query = new StreamExecution(sqlContext, name, df.logicalPlan, sink) + val query = new StreamExecution(sqlContext, name, checkpointLocation, df.logicalPlan, sink) query.start() activeQueries.put(name, query) query diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala index 7ed1c51360..c07bd0e7b7 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala @@ -21,6 +21,8 @@ import java.util.Properties import scala.collection.JavaConverters._ +import org.apache.hadoop.fs.Path + import org.apache.spark.annotation.Experimental import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation @@ -251,8 +253,15 @@ final class DataFrameWriter private[sql](df: DataFrame) { options = extraOptions.toMap, partitionColumns = normalizedParCols.getOrElse(Nil)) + val queryName = extraOptions.getOrElse("queryName", StreamExecution.nextName) + val checkpointLocation = extraOptions.getOrElse("checkpointLocation", { + new Path(df.sqlContext.conf.checkpointLocation, queryName).toUri.toString + }) df.sqlContext.sessionState.continuousQueryManager.startQuery( - extraOptions.getOrElse("queryName", StreamExecution.nextName), df, dataSource.createSink()) + queryName, + checkpointLocation, + df, + dataSource.createSink()) } /** diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SinkStatus.scala b/sql/core/src/main/scala/org/apache/spark/sql/SinkStatus.scala index ce21451b2c..5a9852809c 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/SinkStatus.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/SinkStatus.scala @@ -31,4 +31,4 @@ import org.apache.spark.sql.execution.streaming.{Offset, Sink} @Experimental class SinkStatus private[sql]( val description: String, - val offset: Option[Offset]) + val offset: Offset) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala index e2a14edc54..fac2a64726 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala @@ -162,7 +162,8 @@ case class DataSource( paths = files, userSpecifiedSchema = Some(dataSchema), className = className, - options = options.filterKeys(_ != "path")).resolveRelation())) + options = + new CaseInsensitiveMap(options.filterKeys(_ != "path"))).resolveRelation())) } new FileStreamSource( diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CompositeOffset.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CompositeOffset.scala index 59a52a3d59..e48ac59892 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CompositeOffset.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CompositeOffset.scala @@ -52,6 +52,18 @@ case class CompositeOffset(offsets: Seq[Option[Offset]]) extends Offset { case i if i == 0 => 0 case i if i > 0 => 1 } + + /** + * Unpacks an offset into [[StreamProgress]] by associating each offset with the ordered list of + * sources. + * + * This method is typically used to associate a serialized offset with actual sources (which + * cannot be serialized). 
+ */ + def toStreamProgress(sources: Seq[Source]): StreamProgress = { + assert(sources.size == offsets.size) + new StreamProgress ++ sources.zip(offsets).collect { case (s, Some(o)) => (s, o) } + } } object CompositeOffset { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala index 787e93f543..d13b1a6166 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala @@ -109,20 +109,16 @@ class FileStreamSource( /** * Returns the next batch of data that is available after `start`, if any is available. */ - override def getNextBatch(start: Option[Offset]): Option[Batch] = { + override def getBatch(start: Option[Offset], end: Offset): DataFrame = { val startId = start.map(_.asInstanceOf[LongOffset].offset).getOrElse(-1L) - val end = fetchMaxOffset() - val endId = end.offset - - if (startId + 1 <= endId) { - val files = metadataLog.get(Some(startId + 1), endId).map(_._2).flatten - logDebug(s"Return files from batches ${startId + 1}:$endId") - logDebug(s"Streaming ${files.mkString(", ")}") - Some(new Batch(end, dataFrameBuilder(files))) - } - else { - None - } + val endId = end.asInstanceOf[LongOffset].offset + + assert(startId <= endId) + val files = metadataLog.get(Some(startId + 1), endId).map(_._2).flatten + logDebug(s"Return files from batches ${startId + 1}:$endId") + logDebug(s"Streaming ${files.mkString(", ")}") + dataFrameBuilder(files) + } private def fetchAllFiles(): Seq[String] = { @@ -130,4 +126,6 @@ class FileStreamSource( .filterNot(_.getPath.getName.startsWith("_")) .map(_.getPath.toUri.toString) } + + override def getOffset: Option[Offset] = Some(fetchMaxOffset()).filterNot(_.offset == -1) } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala index ac2842b6d5..298b5d292e 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala @@ -27,6 +27,7 @@ import org.apache.commons.io.IOUtils import org.apache.hadoop.fs._ import org.apache.hadoop.fs.permission.FsPermission +import org.apache.spark.internal.Logging import org.apache.spark.network.util.JavaUtils import org.apache.spark.serializer.JavaSerializer import org.apache.spark.sql.SQLContext @@ -42,7 +43,9 @@ import org.apache.spark.sql.SQLContext * Note: [[HDFSMetadataLog]] doesn't support S3-like file systems as they don't guarantee listing * files in a directory always shows the latest files. 
*/ -class HDFSMetadataLog[T: ClassTag](sqlContext: SQLContext, path: String) extends MetadataLog[T] { +class HDFSMetadataLog[T: ClassTag](sqlContext: SQLContext, path: String) + extends MetadataLog[T] + with Logging { private val metadataPath = new Path(path) @@ -113,6 +116,7 @@ class HDFSMetadataLog[T: ClassTag](sqlContext: SQLContext, path: String) extends try { // Try to commit the batch // It will fail if there is an existing file (someone has committed the batch) + logDebug(s"Attempting to write log #${batchFile(batchId)}") fc.rename(tempPath, batchFile(batchId), Options.Rename.NONE) return } catch { @@ -161,6 +165,7 @@ class HDFSMetadataLog[T: ClassTag](sqlContext: SQLContext, path: String) extends val bytes = IOUtils.toByteArray(input) Some(serializer.deserialize[T](ByteBuffer.wrap(bytes))) } else { + logDebug(s"Unable to find batch $batchMetadataFile") None } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/Sink.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/Sink.scala index e3b2d2f67e..25015d58f7 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/Sink.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/Sink.scala @@ -17,31 +17,19 @@ package org.apache.spark.sql.execution.streaming +import org.apache.spark.sql.DataFrame + /** - * An interface for systems that can collect the results of a streaming query. - * - * When new data is produced by a query, a [[Sink]] must be able to transactionally collect the - * data and update the [[Offset]]. In the case of a failure, the sink will be recreated - * and must be able to return the [[Offset]] for all of the data that is made durable. - * This contract allows Spark to process data with exactly-once semantics, even in the case - * of failures that require the computation to be restarted. + * An interface for systems that can collect the results of a streaming query. In order to preserve + * exactly once semantics a sink must be idempotent in the face of multiple attempts to add the same + * batch. */ trait Sink { - /** - * Returns the [[Offset]] for all data that is currently present in the sink, if any. This - * function will be called by Spark when restarting execution in order to determine at which point - * in the input stream computation should be resumed from. - */ - def currentOffset: Option[Offset] /** - * Accepts a new batch of data as well as a [[Offset]] that denotes how far in the input - * data computation has progressed to. When computation restarts after a failure, it is important - * that a [[Sink]] returns the same [[Offset]] as the most recent batch of data that - * has been persisted durably. Note that this does not necessarily have to be the - * [[Offset]] for the most recent batch of data that was given to the sink. For example, - * it is valid to buffer data before persisting, as long as the [[Offset]] is stored - * transactionally as data is eventually persisted. + * Adds a batch of data to this sink. The data for a given `batchId` is deterministic and if + * this method is called more than once with the same batchId (which will happen in the case of + * failures), then `data` should only be added once. 
*/ - def addBatch(batch: Batch): Unit + def addBatch(batchId: Long, data: DataFrame): Unit } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/Source.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/Source.scala index 25922979ac..6457f928ed 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/Source.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/Source.scala @@ -17,6 +17,7 @@ package org.apache.spark.sql.execution.streaming +import org.apache.spark.sql.DataFrame import org.apache.spark.sql.types.StructType /** @@ -29,8 +30,13 @@ trait Source { /** Returns the schema of the data from this source */ def schema: StructType + /** Returns the maximum available offset for this source. */ + def getOffset: Option[Offset] + /** - * Returns the next batch of data that is available after `start`, if any is available. + * Returns the data that is between the offsets (`start`, `end`]. When `start` is `None` then + * the batch should begin with the first available record. This method must always return the + * same data for a particular `start` and `end` pair. */ - def getNextBatch(start: Option[Offset]): Option[Batch] + def getBatch(start: Option[Offset], end: Offset): DataFrame } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala index 0062b7fc75..c5fefb5346 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala @@ -23,6 +23,8 @@ import java.util.concurrent.atomic.AtomicInteger import scala.collection.mutable.ArrayBuffer import scala.util.control.NonFatal +import org.apache.hadoop.fs.Path + import org.apache.spark.internal.Logging import org.apache.spark.sql._ import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap} @@ -41,6 +43,7 @@ import org.apache.spark.sql.util.ContinuousQueryListener._ class StreamExecution( val sqlContext: SQLContext, override val name: String, + val checkpointRoot: String, private[sql] val logicalPlan: LogicalPlan, val sink: Sink) extends ContinuousQuery with Logging { @@ -52,13 +55,28 @@ class StreamExecution( /** Minimum amount of time in between the start of each batch. */ private val minBatchTime = 10 - /** Tracks how much data we have processed from each input source. */ - private[sql] val streamProgress = new StreamProgress + /** + * Tracks how much data we have processed and committed to the sink or state store from each + * input source. + */ + private[sql] var committedOffsets = new StreamProgress + + /** + * Tracks the offsets that are available to be processed, but have not yet been committed to the + * sink. + */ + private var availableOffsets = new StreamProgress + + /** The current batchId or -1 if execution has not yet been initialized. */ + private var currentBatchId: Long = -1 /** All stream sources present the query plan. */ private val sources = logicalPlan.collect { case s: StreamingRelation => s.source } + /** A list of unique sources in the query plan. 
*/ + private val uniqueSources = sources.distinct + /** Defines the internal state of execution */ @volatile private var state: State = INITIALIZED @@ -74,20 +92,34 @@ class StreamExecution( override def run(): Unit = { runBatches() } } + /** + * A write-ahead-log that records the offsets that are present in each batch. In order to ensure + * that a given batch will always consist of the same data, we write to this log *before* any + * processing is done. Thus, the Nth record in this log indicates data that is currently being + * processed and the N-1th entry indicates which offsets have been durably committed to the sink. + */ + private val offsetLog = + new HDFSMetadataLog[CompositeOffset](sqlContext, checkpointFile("offsets")) + /** Whether the query is currently active or not */ override def isActive: Boolean = state == ACTIVE /** Returns current status of all the sources. */ override def sourceStatuses: Array[SourceStatus] = { - sources.map(s => new SourceStatus(s.toString, streamProgress.get(s))).toArray + sources.map(s => new SourceStatus(s.toString, availableOffsets.get(s))).toArray } /** Returns current status of the sink. */ - override def sinkStatus: SinkStatus = new SinkStatus(sink.toString, sink.currentOffset) + override def sinkStatus: SinkStatus = + new SinkStatus(sink.toString, committedOffsets.toCompositeOffset(sources)) /** Returns the [[ContinuousQueryException]] if the query was terminated by an exception. */ override def exception: Option[ContinuousQueryException] = Option(streamDeathCause) + /** Returns the path of a file with `name` in the checkpoint directory. */ + private def checkpointFile(name: String): String = + new Path(new Path(checkpointRoot), name).toUri.toString + /** * Starts the execution. This returns only after the thread has started and [[QueryStarted]] event * has been posted to all the listeners. @@ -102,7 +134,7 @@ class StreamExecution( * Repeatedly attempts to run batches as data arrives. * * Note that this method ensures that [[QueryStarted]] and [[QueryTerminated]] events are posted - so that listeners are guaranteed to get former event before the latter. Furthermore, this + such that listeners are guaranteed to get a start event before a termination. Furthermore, this * method also ensures that [[QueryStarted]] event is posted before the `start()` method returns. */ private def runBatches(): Unit = { @@ -118,9 +150,10 @@ class StreamExecution( // While active, repeatedly attempt to run batches. SQLContext.setActive(sqlContext) populateStartOffsets() - logInfo(s"Stream running at $streamProgress") + logDebug(s"Stream running from $committedOffsets to $availableOffsets") while (isActive) { - attemptBatch() + if (dataAvailable) runBatch() + commitAndConstructNextBatch() Thread.sleep(minBatchTime) // TODO: Could be tighter } } catch { @@ -130,7 +163,7 @@ class StreamExecution( this, s"Query $name terminated with exception: ${e.getMessage}", e, - Some(streamProgress.toCompositeOffset(sources))) + Some(committedOffsets.toCompositeOffset(sources))) logError(s"Query $name terminated with error", e) } finally { state = TERMINATED @@ -142,48 +175,99 @@ class StreamExecution( /** * Populate the start offsets to start the execution at the current offsets stored in the sink - * (i.e. avoid reprocessing data that we have already processed). 
This function must be called + * before any processing occurs and will populate the following fields: + * - currentBatchId + * - committedOffsets + * - availableOffsets */ private def populateStartOffsets(): Unit = { - sink.currentOffset match { - case Some(c: CompositeOffset) => - val storedProgress = c.offsets - val sources = logicalPlan collect { - case StreamingRelation(source, _) => source + offsetLog.getLatest() match { + case Some((batchId, nextOffsets)) => + logInfo(s"Resuming continuous query, starting with batch $batchId") + currentBatchId = batchId + 1 + availableOffsets = nextOffsets.toStreamProgress(sources) + logDebug(s"Found possibly uncommitted offsets $availableOffsets") + + offsetLog.get(batchId - 1).foreach { + case lastOffsets => + committedOffsets = lastOffsets.toStreamProgress(sources) + logDebug(s"Resuming with committed offsets: $committedOffsets") } - assert(sources.size == storedProgress.size) - sources.zip(storedProgress).foreach { case (source, offset) => - offset.foreach(streamProgress.update(source, _)) - } case None => // We are starting this stream for the first time. - case _ => throw new IllegalArgumentException("Expected composite offset from sink") + logInfo(s"Starting new continuous query.") + currentBatchId = 0 + commitAndConstructNextBatch() } } /** - * Checks to see if any new data is present in any of the sources. When new data is available, - * a batch is executed and passed to the sink, updating the currentOffsets. + * Returns true if there is any new data available to be processed. */ - private def attemptBatch(): Unit = { + private def dataAvailable: Boolean = { + availableOffsets.exists { + case (source, available) => + committedOffsets + .get(source) + .map(committed => committed < available) + .getOrElse(true) + } + } + + /** + * Queries all of the sources to see if any new data is available. When there is new data the + * batchId counter is incremented and a new log entry is written with the newest offsets. + * + * Note that committing the offsets for a new batch implicitly marks the previous batch as + * finished and thus this method should only be called when all currently available data + * has been written to the sink. + */ + private def commitAndConstructNextBatch(): Boolean = { + // Update committed offsets. + committedOffsets ++= availableOffsets + + // Check to see what new data is available. + val newData = uniqueSources.flatMap(s => s.getOffset.map(o => s -> o)) + availableOffsets ++= newData + + if (dataAvailable) { + assert( + offsetLog.add(currentBatchId, availableOffsets.toCompositeOffset(sources)), + s"Concurrent update to the log. Multiple streaming jobs detected for $currentBatchId") + currentBatchId += 1 + logInfo(s"Committed offsets for batch $currentBatchId.") + true + } else { + false + } + } + + /** + * Processes any data available between `availableOffsets` and `committedOffsets`. + */ + private def runBatch(): Unit = { val startTime = System.nanoTime() - // A list of offsets that need to be updated if this batch is successful. - // Populated while walking the tree. - val newOffsets = new ArrayBuffer[(Source, Offset)] + // Request unprocessed data from all sources. 
+ val newData = availableOffsets.flatMap { + case (source, available) if committedOffsets.get(source).map(_ < available).getOrElse(true) => + val current = committedOffsets.get(source) + val batch = source.getBatch(current, available) + logDebug(s"Retrieving data from $source: $current -> $available") + Some(source -> batch) + case _ => None + }.toMap + // A list of attributes that will need to be updated. var replacements = new ArrayBuffer[(Attribute, Attribute)] // Replace sources in the logical plan with data that has arrived since the last batch. val withNewSources = logicalPlan transform { case StreamingRelation(source, output) => - val prevOffset = streamProgress.get(source) - val newBatch = source.getNextBatch(prevOffset) - - newBatch.map { batch => - newOffsets += ((source, batch.end)) - val newPlan = batch.data.logicalPlan - - assert(output.size == newPlan.output.size) + newData.get(source).map { data => + val newPlan = data.logicalPlan + assert(output.size == newPlan.output.size, + s"Invalid batch: ${output.mkString(",")} != ${newPlan.output.mkString(",")}") replacements ++= output.zip(newPlan.output) newPlan }.getOrElse { @@ -197,35 +281,24 @@ class StreamExecution( case a: Attribute if replacementMap.contains(a) => replacementMap(a) } - if (newOffsets.nonEmpty) { - val optimizerStart = System.nanoTime() - - lastExecution = new QueryExecution(sqlContext, newPlan) - val executedPlan = lastExecution.executedPlan - val optimizerTime = (System.nanoTime() - optimizerStart).toDouble / 1000000 - logDebug(s"Optimized batch in ${optimizerTime}ms") + val optimizerStart = System.nanoTime() - streamProgress.synchronized { - // Update the offsets and calculate a new composite offset - newOffsets.foreach(streamProgress.update) + lastExecution = new QueryExecution(sqlContext, newPlan) + val executedPlan = lastExecution.executedPlan + val optimizerTime = (System.nanoTime() - optimizerStart).toDouble / 1000000 + logDebug(s"Optimized batch in ${optimizerTime}ms") - // Construct the batch and send it to the sink. - val batchOffset = streamProgress.toCompositeOffset(sources) - val nextBatch = new Batch(batchOffset, Dataset.newDataFrame(sqlContext, newPlan)) - sink.addBatch(nextBatch) - } - - awaitBatchLock.synchronized { - // Wake up any threads that are waiting for the stream to progress. - awaitBatchLock.notifyAll() - } + val nextBatch = Dataset.newDataFrame(sqlContext, newPlan) + sink.addBatch(currentBatchId - 1, nextBatch) - val batchTime = (System.nanoTime() - startTime).toDouble / 1000000 - logInfo(s"Completed up to $newOffsets in ${batchTime}ms") - postEvent(new QueryProgress(this)) + awaitBatchLock.synchronized { + // Wake up any threads that are waiting for the stream to progress. + awaitBatchLock.notifyAll() } - logDebug(s"Waiting for data, current: $streamProgress") + val batchTime = (System.nanoTime() - startTime).toDouble / 1000000 + logInfo(s"Completed up to $availableOffsets in ${batchTime}ms") + postEvent(new QueryProgress(this)) } private def postEvent(event: ContinuousQueryListener.Event) { @@ -252,9 +325,7 @@ class StreamExecution( * least the given `Offset`. This method is indented for use primarily when writing tests. 
*/ def awaitOffset(source: Source, newOffset: Offset): Unit = { - def notDone = streamProgress.synchronized { - !streamProgress.contains(source) || streamProgress(source) < newOffset - } + def notDone = !committedOffsets.contains(source) || committedOffsets(source) < newOffset while (notDone) { logInfo(s"Waiting until $newOffset at $source") @@ -297,7 +368,7 @@ class StreamExecution( s""" |=== Continuous Query === |Name: $name - |Current Offsets: $streamProgress + |Current Offsets: $committedOffsets | |Current State: $state |Thread State: ${microBatchThread.getState} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamProgress.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamProgress.scala index d45b9bd983..405a5f0387 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamProgress.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamProgress.scala @@ -17,55 +17,31 @@ package org.apache.spark.sql.execution.streaming -import scala.collection.mutable +import scala.collection.{immutable, GenTraversableOnce} /** * A helper class that looks like a Map[Source, Offset]. */ -class StreamProgress { - private val currentOffsets = new mutable.HashMap[Source, Offset] +class StreamProgress( + val baseMap: immutable.Map[Source, Offset] = new immutable.HashMap[Source, Offset]) + extends scala.collection.immutable.Map[Source, Offset] { - private[streaming] def update(source: Source, newOffset: Offset): Unit = { - currentOffsets.get(source).foreach(old => - assert(newOffset > old, s"Stream going backwards $newOffset -> $old")) - currentOffsets.put(source, newOffset) + private[sql] def toCompositeOffset(source: Seq[Source]): CompositeOffset = { + CompositeOffset(source.map(get)) } - private[streaming] def update(newOffset: (Source, Offset)): Unit = - update(newOffset._1, newOffset._2) + override def toString: String = + baseMap.map { case (k, v) => s"$k: $v"}.mkString("{", ",", "}") - private[streaming] def apply(source: Source): Offset = currentOffsets(source) - private[streaming] def get(source: Source): Option[Offset] = currentOffsets.get(source) - private[streaming] def contains(source: Source): Boolean = currentOffsets.contains(source) + override def +[B1 >: Offset](kv: (Source, B1)): Map[Source, B1] = baseMap + kv - private[streaming] def ++(updates: Map[Source, Offset]): StreamProgress = { - val updated = new StreamProgress - currentOffsets.foreach(updated.update) - updates.foreach(updated.update) - updated - } + override def get(key: Source): Option[Offset] = baseMap.get(key) - /** - * Used to create a new copy of this [[StreamProgress]]. While this class is currently mutable, - * it should be copied before being passed to user code. 
- */ - private[streaming] def copy(): StreamProgress = { - val copied = new StreamProgress - currentOffsets.foreach(copied.update) - copied - } + override def iterator: Iterator[(Source, Offset)] = baseMap.iterator - private[sql] def toCompositeOffset(source: Seq[Source]): CompositeOffset = { - CompositeOffset(source.map(get)) - } + override def -(key: Source): Map[Source, Offset] = baseMap - key - override def toString: String = - currentOffsets.map { case (k, v) => s"$k: $v"}.mkString("{", ",", "}") - - override def equals(other: Any): Boolean = other match { - case s: StreamProgress => currentOffsets == s.currentOffsets - case _ => false + def ++(updates: GenTraversableOnce[(Source, Offset)]): StreamProgress = { + new StreamProgress(baseMap ++ updates) } - - override def hashCode: Int = currentOffsets.hashCode() } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala index a6504cd088..8c2bb4abd5 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala @@ -51,8 +51,6 @@ case class MemoryStream[A : Encoder](id: Int, sqlContext: SQLContext) protected var currentOffset: LongOffset = new LongOffset(-1) - protected def blockManager = SparkEnv.get.blockManager - def schema: StructType = encoder.schema def toDS()(implicit sqlContext: SQLContext): Dataset[A] = { @@ -78,25 +76,32 @@ case class MemoryStream[A : Encoder](id: Int, sqlContext: SQLContext) } } - override def getNextBatch(start: Option[Offset]): Option[Batch] = synchronized { - val newBlocks = - batches.drop( - start.map(_.asInstanceOf[LongOffset]).getOrElse(LongOffset(-1)).offset.toInt + 1) - - if (newBlocks.nonEmpty) { - logDebug(s"Running [$start, $currentOffset] on blocks ${newBlocks.mkString(", ")}") - val df = newBlocks - .map(_.toDF()) - .reduceOption(_ unionAll _) - .getOrElse(sqlContext.emptyDataFrame) + override def toString: String = s"MemoryStream[${output.mkString(",")}]" - Some(new Batch(currentOffset, df)) - } else { - None - } + override def getOffset: Option[Offset] = if (batches.isEmpty) { + None + } else { + Some(currentOffset) } - override def toString: String = s"MemoryStream[${output.mkString(",")}]" + /** + * Returns the next batch of data that is available after `start`, if any is available. + */ + override def getBatch(start: Option[Offset], end: Offset): DataFrame = { + val startOrdinal = + start.map(_.asInstanceOf[LongOffset]).getOrElse(LongOffset(-1)).offset.toInt + 1 + val endOrdinal = end.asInstanceOf[LongOffset].offset.toInt + 1 + val newBlocks = batches.slice(startOrdinal, endOrdinal) + + logDebug( + s"MemoryBatch [$startOrdinal, $endOrdinal]: ${newBlocks.flatMap(_.collect()).mkString(", ")}") + newBlocks + .map(_.toDF()) + .reduceOption(_ unionAll _) + .getOrElse { + sys.error("No data selected!") + } + } } /** @@ -105,45 +110,29 @@ case class MemoryStream[A : Encoder](id: Int, sqlContext: SQLContext) */ class MemorySink(schema: StructType) extends Sink with Logging { /** An order list of batches that have been written to this [[Sink]]. */ - private var batches = new ArrayBuffer[Batch]() - - /** Used to convert an [[InternalRow]] to an external [[Row]] for comparison in testing. 
*/ - private val externalRowConverter = RowEncoder(schema) - - override def currentOffset: Option[Offset] = synchronized { - batches.lastOption.map(_.end) - } - - override def addBatch(nextBatch: Batch): Unit = synchronized { - nextBatch.data.collect() // 'compute' the batch's data and record the batch - batches.append(nextBatch) - } + private val batches = new ArrayBuffer[Array[Row]]() /** Returns all rows that are stored in this [[Sink]]. */ def allData: Seq[Row] = synchronized { - batches - .map(_.data) - .reduceOption(_ unionAll _) - .map(_.collect().toSeq) - .getOrElse(Seq.empty) - } - - /** - * Atomically drops the most recent `num` batches and resets the [[StreamProgress]] to the - * corresponding point in the input. This function can be used when testing to simulate data - * that has been lost due to buffering. - */ - def dropBatches(num: Int): Unit = synchronized { - batches.dropRight(num) + batches.flatten } def toDebugString: String = synchronized { - batches.map { b => - val dataStr = try b.data.collect().mkString(" ") catch { + batches.zipWithIndex.map { case (b, i) => + val dataStr = try b.mkString(" ") catch { case NonFatal(e) => "[Error converting to string]" } - s"${b.end}: $dataStr" + s"$i: $dataStr" }.mkString("\n") } + + override def addBatch(batchId: Long, data: DataFrame): Unit = { + if (batchId == batches.size) { + logDebug(s"Committing batch $batchId") + batches.append(data.collect()) + } else { + logDebug(s"Skipping already committed batch: $batchId") + } + } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index 70d1a8b071..fd1d77f514 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -524,6 +524,11 @@ object SQLConf { doc = "When true, the planner will try to find out duplicated exchanges and re-use them.", isPublic = false) + val CHECKPOINT_LOCATION = stringConf("spark.sql.streaming.checkpointLocation", + defaultValue = None, + doc = "The default location for storing checkpoint data for continuously executing queries.", + isPublic = true) + object Deprecated { val MAPRED_REDUCE_TASKS = "mapred.reduce.tasks" val EXTERNAL_SORT = "spark.sql.planner.externalSort" @@ -554,6 +559,8 @@ class SQLConf extends Serializable with CatalystConf with ParserConf with Loggin /** ************************ Spark SQL Params/Hints ******************* */ + def checkpointLocation: String = getConf(CHECKPOINT_LOCATION) + def filesMaxPartitionBytes: Long = getConf(FILES_MAX_PARTITION_BYTES) def useCompression: Boolean = getConf(COMPRESS_CACHED) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/StreamTest.scala b/sql/core/src/test/scala/org/apache/spark/sql/StreamTest.scala index 81078dc6a0..f356cde9cf 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/StreamTest.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/StreamTest.scala @@ -37,6 +37,7 @@ import org.apache.spark.sql.catalyst.encoders.{encoderFor, ExpressionEncoder, Ro import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import org.apache.spark.sql.catalyst.util._ import org.apache.spark.sql.execution.streaming._ +import org.apache.spark.util.Utils /** * A framework for implementing tests for streaming queries and sources. 
@@ -64,6 +65,12 @@ import org.apache.spark.sql.execution.streaming._ */ trait StreamTest extends QueryTest with Timeouts { + implicit class RichContinuousQuery(cq: ContinuousQuery) { + def stopQuietly(): Unit = quietly { + cq.stop() + } + } + implicit class RichSource(s: Source) { def toDF(): DataFrame = Dataset.newDataFrame(sqlContext, StreamingRelation(s)) @@ -126,8 +133,6 @@ trait StreamTest extends QueryTest with Timeouts { override def toString: String = s"CheckAnswer: ${expectedAnswer.mkString(",")}" } - case class DropBatches(num: Int) extends StreamAction - /** Stops the stream. It must currently be running. */ case object StopStream extends StreamAction with StreamMustBeRunning @@ -202,7 +207,7 @@ trait StreamTest extends QueryTest with Timeouts { }.mkString("\n") def currentOffsets = - if (currentStream != null) currentStream.streamProgress.toString else "not started" + if (currentStream != null) currentStream.committedOffsets.toString else "not started" def threadState = if (currentStream != null && currentStream.microBatchThread.isAlive) "alive" else "dead" @@ -266,6 +271,7 @@ trait StreamTest extends QueryTest with Timeouts { } val testThread = Thread.currentThread() + val metadataRoot = Utils.createTempDir("streaming.metadata").getCanonicalPath try { startedTest.foreach { action => @@ -276,7 +282,7 @@ trait StreamTest extends QueryTest with Timeouts { currentStream = sqlContext .streams - .startQuery(StreamExecution.nextName, stream, sink) + .startQuery(StreamExecution.nextName, metadataRoot, stream, sink) .asInstanceOf[StreamExecution] currentStream.microBatchThread.setUncaughtExceptionHandler( new UncaughtExceptionHandler { @@ -308,10 +314,6 @@ trait StreamTest extends QueryTest with Timeouts { currentStream = null } - case DropBatches(num) => - verify(currentStream == null, "dropping batches while running leads to corruption") - sink.dropBatches(num) - case ef: ExpectFailure[_] => verify(currentStream != null, "can not expect failure when stream is not running") try failAfter(streamingTimeout) { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/ContinuousQueryManagerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/ContinuousQueryManagerSuite.scala index 45e824ad63..54ce98d195 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/ContinuousQueryManagerSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/ContinuousQueryManagerSuite.scala @@ -31,6 +31,7 @@ import org.apache.spark.SparkException import org.apache.spark.sql.{ContinuousQuery, Dataset, StreamTest} import org.apache.spark.sql.execution.streaming.{MemorySink, MemoryStream, StreamExecution, StreamingRelation} import org.apache.spark.sql.test.SharedSQLContext +import org.apache.spark.util.Utils class ContinuousQueryManagerSuite extends StreamTest with SharedSQLContext with BeforeAndAfter { @@ -235,9 +236,14 @@ class ContinuousQueryManagerSuite extends StreamTest with SharedSQLContext with @volatile var query: StreamExecution = null try { val df = ds.toDF + val metadataRoot = Utils.createTempDir("streaming.metadata").getCanonicalPath query = sqlContext .streams - .startQuery(StreamExecution.nextName, df, new MemorySink(df.schema)) + .startQuery( + StreamExecution.nextName, + metadataRoot, + df, + new MemorySink(df.schema)) .asInstanceOf[StreamExecution] } catch { case NonFatal(e) => diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/ContinuousQuerySuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/ContinuousQuerySuite.scala index 84ed017a9d..3be0ea481d 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/ContinuousQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/ContinuousQuerySuite.scala @@ -54,7 +54,8 @@ class ContinuousQuerySuite extends StreamTest with SharedSQLContext { TestAwaitTermination(ExpectException[SparkException], timeoutMs = 2000), TestAwaitTermination(ExpectException[SparkException], timeoutMs = 10), AssertOnQuery( - q => q.exception.get.startOffset.get === q.streamProgress.toCompositeOffset(Seq(inputData)), + q => + q.exception.get.startOffset.get === q.committedOffsets.toCompositeOffset(Seq(inputData)), "incorrect start offset on exception") ) } @@ -68,19 +69,19 @@ class ContinuousQuerySuite extends StreamTest with SharedSQLContext { AssertOnQuery(_.sourceStatuses(0).description.contains("Memory")), AssertOnQuery(_.sourceStatuses(0).offset === None), AssertOnQuery(_.sinkStatus.description.contains("Memory")), - AssertOnQuery(_.sinkStatus.offset === None), + AssertOnQuery(_.sinkStatus.offset === new CompositeOffset(None :: Nil)), AddData(inputData, 1, 2), CheckAnswer(6, 3), AssertOnQuery(_.sourceStatuses(0).offset === Some(LongOffset(0))), - AssertOnQuery(_.sinkStatus.offset === Some(CompositeOffset.fill(LongOffset(0)))), + AssertOnQuery(_.sinkStatus.offset === CompositeOffset.fill(LongOffset(0))), AddData(inputData, 1, 2), CheckAnswer(6, 3, 6, 3), AssertOnQuery(_.sourceStatuses(0).offset === Some(LongOffset(1))), - AssertOnQuery(_.sinkStatus.offset === Some(CompositeOffset.fill(LongOffset(1)))), + AssertOnQuery(_.sinkStatus.offset === CompositeOffset.fill(LongOffset(1))), AddData(inputData, 0), ExpectFailure[SparkException], AssertOnQuery(_.sourceStatuses(0).offset === Some(LongOffset(2))), - AssertOnQuery(_.sinkStatus.offset === Some(CompositeOffset.fill(LongOffset(1)))) + AssertOnQuery(_.sinkStatus.offset === CompositeOffset.fill(LongOffset(1))) ) } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/DataFrameReaderWriterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/DataFrameReaderWriterSuite.scala index 0878277811..e485aa837b 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/DataFrameReaderWriterSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/DataFrameReaderWriterSuite.scala @@ -19,11 +19,12 @@ package org.apache.spark.sql.streaming.test import org.scalatest.BeforeAndAfter -import org.apache.spark.sql.{AnalysisException, ContinuousQuery, SQLContext, StreamTest} -import org.apache.spark.sql.execution.streaming.{Batch, Offset, Sink, Source} +import org.apache.spark.sql._ +import org.apache.spark.sql.execution.streaming._ import org.apache.spark.sql.sources.{StreamSinkProvider, StreamSourceProvider} import org.apache.spark.sql.test.SharedSQLContext import org.apache.spark.sql.types.{IntegerType, StructField, StructType} +import org.apache.spark.util.Utils object LastOptions { var parameters: Map[String, String] = null @@ -41,8 +42,15 @@ class DefaultSource extends StreamSourceProvider with StreamSinkProvider { LastOptions.parameters = parameters LastOptions.schema = schema new Source { - override def getNextBatch(start: Option[Offset]): Option[Batch] = None override def schema: StructType = StructType(StructField("a", IntegerType) :: Nil) + + override def getOffset: Option[Offset] = Some(new LongOffset(0)) + + override def getBatch(start: Option[Offset], end: Offset): DataFrame = { + 
import sqlContext.implicits._ + + Seq[Int]().toDS().toDF() + } } } @@ -53,8 +61,7 @@ class DefaultSource extends StreamSourceProvider with StreamSinkProvider { LastOptions.parameters = parameters LastOptions.partitionColumns = partitionColumns new Sink { - override def addBatch(batch: Batch): Unit = {} - override def currentOffset: Option[Offset] = None + override def addBatch(batchId: Long, data: DataFrame): Unit = {} } } } @@ -62,8 +69,10 @@ class DefaultSource extends StreamSourceProvider with StreamSinkProvider { class DataFrameReaderWriterSuite extends StreamTest with SharedSQLContext with BeforeAndAfter { import testImplicits._ + private def newMetadataDir = Utils.createTempDir("streaming.metadata").getCanonicalPath + after { - sqlContext.streams.active.foreach(_.stop()) + sqlContext.streams.active.foreach(_.stopQuietly()) } test("resolve default source") { @@ -72,8 +81,9 @@ class DataFrameReaderWriterSuite extends StreamTest with SharedSQLContext with B .stream() .write .format("org.apache.spark.sql.streaming.test") + .option("checkpointLocation", newMetadataDir) .startStream() - .stop() + .stopQuietly() } test("resolve full class") { @@ -82,8 +92,9 @@ class DataFrameReaderWriterSuite extends StreamTest with SharedSQLContext with B .stream() .write .format("org.apache.spark.sql.streaming.test") + .option("checkpointLocation", newMetadataDir) .startStream() - .stop() + .stopQuietly() } test("options") { @@ -108,8 +119,9 @@ class DataFrameReaderWriterSuite extends StreamTest with SharedSQLContext with B .option("opt1", "1") .options(Map("opt2" -> "2")) .options(map) + .option("checkpointLocation", newMetadataDir) .startStream() - .stop() + .stopQuietly() assert(LastOptions.parameters("opt1") == "1") assert(LastOptions.parameters("opt2") == "2") @@ -123,38 +135,43 @@ class DataFrameReaderWriterSuite extends StreamTest with SharedSQLContext with B df.write .format("org.apache.spark.sql.streaming.test") + .option("checkpointLocation", newMetadataDir) .startStream() - .stop() + .stopQuietly() assert(LastOptions.partitionColumns == Nil) df.write .format("org.apache.spark.sql.streaming.test") + .option("checkpointLocation", newMetadataDir) .partitionBy("a") .startStream() - .stop() + .stopQuietly() assert(LastOptions.partitionColumns == Seq("a")) withSQLConf("spark.sql.caseSensitive" -> "false") { df.write .format("org.apache.spark.sql.streaming.test") + .option("checkpointLocation", newMetadataDir) .partitionBy("A") .startStream() - .stop() + .stopQuietly() assert(LastOptions.partitionColumns == Seq("a")) } intercept[AnalysisException] { df.write .format("org.apache.spark.sql.streaming.test") + .option("checkpointLocation", newMetadataDir) .partitionBy("b") .startStream() - .stop() + .stopQuietly() } } test("stream paths") { val df = sqlContext.read .format("org.apache.spark.sql.streaming.test") + .option("checkpointLocation", newMetadataDir) .stream("/test") assert(LastOptions.parameters("path") == "/test") @@ -163,8 +180,9 @@ class DataFrameReaderWriterSuite extends StreamTest with SharedSQLContext with B df.write .format("org.apache.spark.sql.streaming.test") + .option("checkpointLocation", newMetadataDir) .startStream("/test") - .stop() + .stopQuietly() assert(LastOptions.parameters("path") == "/test") } @@ -187,8 +205,9 @@ class DataFrameReaderWriterSuite extends StreamTest with SharedSQLContext with B .option("intOpt", 56) .option("boolOpt", false) .option("doubleOpt", 6.7) + .option("checkpointLocation", newMetadataDir) .startStream("/test") - .stop() + .stopQuietly() 
assert(LastOptions.parameters("intOpt") == "56") assert(LastOptions.parameters("boolOpt") == "false") @@ -204,6 +223,7 @@ class DataFrameReaderWriterSuite extends StreamTest with SharedSQLContext with B .stream("/test") .write .format("org.apache.spark.sql.streaming.test") + .option("checkpointLocation", newMetadataDir) .queryName(name) .startStream() } @@ -215,6 +235,7 @@ class DataFrameReaderWriterSuite extends StreamTest with SharedSQLContext with B .stream("/test") .write .format("org.apache.spark.sql.streaming.test") + .option("checkpointLocation", newMetadataDir) .startStream() } @@ -248,9 +269,9 @@ class DataFrameReaderWriterSuite extends StreamTest with SharedSQLContext with B } // Should be able to start query with that name after stopping the previous query - q1.stop() + q1.stopQuietly() val q5 = startQueryWithName("name") assert(activeStreamNames.contains("name")) - sqlContext.streams.active.foreach(_.stop()) + sqlContext.streams.active.foreach(_.stopQuietly()) } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala index 4c18e38db8..89de15acf5 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala @@ -318,16 +318,6 @@ class FileStreamSourceSuite extends FileStreamSourceTest with SharedSQLContext { } test("fault tolerance") { - def assertBatch(batch1: Option[Batch], batch2: Option[Batch]): Unit = { - (batch1, batch2) match { - case (Some(b1), Some(b2)) => - assert(b1.end === b2.end) - assert(b1.data.as[String].collect() === b2.data.as[String].collect()) - case (None, None) => - case _ => fail(s"batch ($batch1) is not equal to batch ($batch2)") - } - } - val src = Utils.createTempDir("streaming.src") val tmp = Utils.createTempDir("streaming.tmp") @@ -345,14 +335,6 @@ class FileStreamSourceSuite extends FileStreamSourceTest with SharedSQLContext { CheckAnswer("keep2", "keep3", "keep5", "keep6", "keep8", "keep9") ) - val textSource2 = createFileStreamSource("text", src.getCanonicalPath) - assert(textSource2.currentOffset === textSource.currentOffset) - assertBatch(textSource2.getNextBatch(None), textSource.getNextBatch(None)) - for (f <- 0L to textSource.currentOffset.offset) { - val offset = LongOffset(f) - assertBatch(textSource2.getNextBatch(Some(offset)), textSource.getNextBatch(Some(offset))) - } - Utils.deleteRecursively(src) Utils.deleteRecursively(tmp) } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/util/ContinuousQueryListenerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/util/ContinuousQueryListenerSuite.scala index 52783281ab..d04783ecac 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/util/ContinuousQueryListenerSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/util/ContinuousQueryListenerSuite.scala @@ -61,7 +61,7 @@ class ContinuousQueryListenerSuite extends StreamTest with SharedSQLContext with // The source and sink offsets must be None as this must be called before the // batches have started assert(status.sourceStatuses(0).offset === None) - assert(status.sinkStatus.offset === None) + assert(status.sinkStatus.offset === CompositeOffset(None :: Nil)) // No progress events or termination events assert(listener.progressStatuses.isEmpty) @@ -78,7 +78,7 @@ class ContinuousQueryListenerSuite extends StreamTest with SharedSQLContext with assert(status != null) 
assert(status.active == true) assert(status.sourceStatuses(0).offset === Some(LongOffset(0))) - assert(status.sinkStatus.offset === Some(CompositeOffset.fill(LongOffset(0)))) + assert(status.sinkStatus.offset === CompositeOffset.fill(LongOffset(0))) // No termination events assert(listener.terminationStatus === null) @@ -92,7 +92,7 @@ class ContinuousQueryListenerSuite extends StreamTest with SharedSQLContext with assert(status.active === false) // must be inactive by the time onQueryTerm is called assert(status.sourceStatuses(0).offset === Some(LongOffset(0))) - assert(status.sinkStatus.offset === Some(CompositeOffset.fill(LongOffset(0)))) + assert(status.sinkStatus.offset === CompositeOffset.fill(LongOffset(0))) } listener.checkAsyncErrors() } -- cgit v1.2.3
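For readers of this patch, a minimal sketch of a sink written against the revised contract is shown below. The class name is hypothetical and not part of the diff; the idempotence-by-batchId pattern simply mirrors the MemorySink changes above, where a replayed batch id is skipped rather than appended twice.

import scala.collection.mutable.ArrayBuffer

import org.apache.spark.sql.{DataFrame, Row}
import org.apache.spark.sql.execution.streaming.Sink

// Hypothetical example sink that keeps every committed batch in memory.
class ExampleIdempotentSink extends Sink {
  private val committed = new ArrayBuffer[Array[Row]]()

  override def addBatch(batchId: Long, data: DataFrame): Unit = synchronized {
    if (batchId == committed.size) {
      // First time this batchId is seen: materialize the batch and record it.
      committed.append(data.collect())
    }
    // A replayed batchId (delivered again after a failure and restart) is a no-op,
    // so the same data is never added twice and results stay exactly-once.
  }
}

When a query is started through DataFrameWriter, the directory used by the new offset log can be pinned with .option("checkpointLocation", dir) before startStream(), as exercised in DataFrameReaderWriterSuite above; otherwise it defaults to ${spark.sql.streaming.checkpointLocation}/${queryName}.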