path: root/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala
Diffstat (limited to 'sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala')
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala  41
1 file changed, 28 insertions, 13 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala
index 0f91e59e04..3820968324 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala
@@ -18,15 +18,16 @@
package org.apache.spark.sql.execution.streaming
import java.util.concurrent.atomic.AtomicInteger
+import javax.annotation.concurrent.GuardedBy
import scala.collection.mutable.ArrayBuffer
import scala.util.control.NonFatal
-import org.apache.spark.SparkEnv
import org.apache.spark.internal.Logging
import org.apache.spark.sql.{DataFrame, Dataset, Encoder, Row, SQLContext}
-import org.apache.spark.sql.catalyst.encoders.{encoderFor, RowEncoder}
-import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.encoders.encoderFor
+import org.apache.spark.sql.catalyst.expressions.Attribute
+import org.apache.spark.sql.catalyst.plans.logical.LeafNode
import org.apache.spark.sql.types.StructType
object MemoryStream {
@@ -45,10 +46,13 @@ object MemoryStream {
case class MemoryStream[A : Encoder](id: Int, sqlContext: SQLContext)
extends Source with Logging {
protected val encoder = encoderFor[A]
- protected val logicalPlan = StreamingRelation(this)
+ protected val logicalPlan = StreamingExecutionRelation(this)
protected val output = logicalPlan.output
+
+ @GuardedBy("this")
protected val batches = new ArrayBuffer[Dataset[A]]
+ @GuardedBy("this")
protected var currentOffset: LongOffset = new LongOffset(-1)
def schema: StructType = encoder.schema
@@ -67,10 +71,10 @@ case class MemoryStream[A : Encoder](id: Int, sqlContext: SQLContext)
def addData(data: TraversableOnce[A]): Offset = {
import sqlContext.implicits._
+ val ds = data.toVector.toDS()
+ logDebug(s"Adding ds: $ds")
this.synchronized {
currentOffset = currentOffset + 1
- val ds = data.toVector.toDS()
- logDebug(s"Adding ds: $ds")
batches.append(ds)
currentOffset
}
@@ -78,10 +82,12 @@ case class MemoryStream[A : Encoder](id: Int, sqlContext: SQLContext)
override def toString: String = s"MemoryStream[${output.mkString(",")}]"
- override def getOffset: Option[Offset] = if (batches.isEmpty) {
- None
- } else {
- Some(currentOffset)
+ override def getOffset: Option[Offset] = synchronized {
+ if (batches.isEmpty) {
+ None
+ } else {
+ Some(currentOffset)
+ }
}
/**
@@ -91,7 +97,7 @@ case class MemoryStream[A : Encoder](id: Int, sqlContext: SQLContext)
val startOrdinal =
start.map(_.asInstanceOf[LongOffset]).getOrElse(LongOffset(-1)).offset.toInt + 1
val endOrdinal = end.asInstanceOf[LongOffset].offset.toInt + 1
- val newBlocks = batches.slice(startOrdinal, endOrdinal)
+ val newBlocks = synchronized { batches.slice(startOrdinal, endOrdinal) }
logDebug(
s"MemoryBatch [$startOrdinal, $endOrdinal]: ${newBlocks.flatMap(_.collect()).mkString(", ")}")
@@ -108,8 +114,9 @@ case class MemoryStream[A : Encoder](id: Int, sqlContext: SQLContext)
* A sink that stores the results in memory. This [[Sink]] is primarily intended for use in unit
* tests and does not provide durability.
*/
-class MemorySink(schema: StructType) extends Sink with Logging {
+class MemorySink(val schema: StructType) extends Sink with Logging {
/** An order list of batches that have been written to this [[Sink]]. */
+ @GuardedBy("this")
private val batches = new ArrayBuffer[Array[Row]]()
/** Returns all rows that are stored in this [[Sink]]. */
@@ -117,6 +124,8 @@ class MemorySink(schema: StructType) extends Sink with Logging {
batches.flatten
}
+ def lastBatch: Seq[Row] = synchronized { batches.last }
+
def toDebugString: String = synchronized {
batches.zipWithIndex.map { case (b, i) =>
val dataStr = try b.mkString(" ") catch {
@@ -126,7 +135,7 @@ class MemorySink(schema: StructType) extends Sink with Logging {
}.mkString("\n")
}
- override def addBatch(batchId: Long, data: DataFrame): Unit = {
+ override def addBatch(batchId: Long, data: DataFrame): Unit = synchronized {
if (batchId == batches.size) {
logDebug(s"Committing batch $batchId")
batches.append(data.collect())
@@ -136,3 +145,9 @@ class MemorySink(schema: StructType) extends Sink with Logging {
}
}
+/**
+ * Used to query the data that has been written into a [[MemorySink]].
+ */
+case class MemoryPlan(sink: MemorySink, output: Seq[Attribute]) extends LeafNode {
+ def this(sink: MemorySink) = this(sink, sink.schema.toAttributes)
+}
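
For reference, a minimal sketch (not part of this commit) of how the patched MemoryStream and MemorySink can be driven directly in a test, bypassing a running StreamExecution. It assumes an in-scope SQLContext; the demo wrapper and the sample values are hypothetical.

import org.apache.spark.sql.{Row, SQLContext}
import org.apache.spark.sql.execution.streaming.{MemorySink, MemoryStream}

def demo(implicit sqlContext: SQLContext): Unit = {
  import sqlContext.implicits._

  // Source side: addData now builds the Dataset before taking the lock, then
  // appends the batch and advances currentOffset under `this.synchronized`.
  val input = MemoryStream[Int]
  val end = input.addData(Seq(1, 2, 3))

  // What the engine would do on a trigger: fetch everything up to `end`.
  // getOffset and the slice of `batches` are now read under the same lock.
  val batch = input.getBatch(None, end)

  // Sink side: addBatch is now synchronized, and the new lastBatch method
  // exposes the most recently committed batch as rows.
  val sink = new MemorySink(input.schema)
  sink.addBatch(0, batch)
  val rows: Seq[Row] = sink.lastBatch
  println(sink.toDebugString)
}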