Diffstat (limited to 'sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala')
-rw-r--r-- | sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala | 41
1 file changed, 28 insertions, 13 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala
index 0f91e59e04..3820968324 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala
@@ -18,15 +18,16 @@
 package org.apache.spark.sql.execution.streaming
 
 import java.util.concurrent.atomic.AtomicInteger
+import javax.annotation.concurrent.GuardedBy
 
 import scala.collection.mutable.ArrayBuffer
 import scala.util.control.NonFatal
 
-import org.apache.spark.SparkEnv
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.{DataFrame, Dataset, Encoder, Row, SQLContext}
-import org.apache.spark.sql.catalyst.encoders.{encoderFor, RowEncoder}
-import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.encoders.encoderFor
+import org.apache.spark.sql.catalyst.expressions.Attribute
+import org.apache.spark.sql.catalyst.plans.logical.LeafNode
 import org.apache.spark.sql.types.StructType
 
 object MemoryStream {
@@ -45,10 +46,13 @@ object MemoryStream {
 
 case class MemoryStream[A : Encoder](id: Int, sqlContext: SQLContext)
     extends Source with Logging {
   protected val encoder = encoderFor[A]
-  protected val logicalPlan = StreamingRelation(this)
+  protected val logicalPlan = StreamingExecutionRelation(this)
   protected val output = logicalPlan.output
+
+  @GuardedBy("this")
   protected val batches = new ArrayBuffer[Dataset[A]]
+
+  @GuardedBy("this")
   protected var currentOffset: LongOffset = new LongOffset(-1)
 
   def schema: StructType = encoder.schema
@@ -67,10 +71,10 @@ case class MemoryStream[A : Encoder](id: Int, sqlContext: SQLContext)
 
   def addData(data: TraversableOnce[A]): Offset = {
     import sqlContext.implicits._
+    val ds = data.toVector.toDS()
+    logDebug(s"Adding ds: $ds")
     this.synchronized {
       currentOffset = currentOffset + 1
-      val ds = data.toVector.toDS()
-      logDebug(s"Adding ds: $ds")
       batches.append(ds)
       currentOffset
     }
@@ -78,10 +82,12 @@ case class MemoryStream[A : Encoder](id: Int, sqlContext: SQLContext)
 
   override def toString: String = s"MemoryStream[${output.mkString(",")}]"
 
-  override def getOffset: Option[Offset] = if (batches.isEmpty) {
-    None
-  } else {
-    Some(currentOffset)
+  override def getOffset: Option[Offset] = synchronized {
+    if (batches.isEmpty) {
+      None
+    } else {
+      Some(currentOffset)
+    }
   }
 
   /**
@@ -91,7 +97,7 @@ case class MemoryStream[A : Encoder](id: Int, sqlContext: SQLContext)
     val startOrdinal =
       start.map(_.asInstanceOf[LongOffset]).getOrElse(LongOffset(-1)).offset.toInt + 1
     val endOrdinal = end.asInstanceOf[LongOffset].offset.toInt + 1
-    val newBlocks = batches.slice(startOrdinal, endOrdinal)
+    val newBlocks = synchronized { batches.slice(startOrdinal, endOrdinal) }
 
     logDebug(
       s"MemoryBatch [$startOrdinal, $endOrdinal]: ${newBlocks.flatMap(_.collect()).mkString(", ")}")
@@ -108,8 +114,9 @@ case class MemoryStream[A : Encoder](id: Int, sqlContext: SQLContext)
  * A sink that stores the results in memory. This [[Sink]] is primarily intended for use in unit
  * tests and does not provide durability.
  */
-class MemorySink(schema: StructType) extends Sink with Logging {
+class MemorySink(val schema: StructType) extends Sink with Logging {
   /** An order list of batches that have been written to this [[Sink]]. */
+  @GuardedBy("this")
   private val batches = new ArrayBuffer[Array[Row]]()
 
   /** Returns all rows that are stored in this [[Sink]]. */
@@ -117,6 +124,8 @@ class MemorySink(schema: StructType) extends Sink with Logging {
     batches.flatten
   }
 
+  def lastBatch: Seq[Row] = synchronized { batches.last }
+
   def toDebugString: String = synchronized {
     batches.zipWithIndex.map { case (b, i) =>
       val dataStr = try b.mkString(" ") catch {
@@ -126,7 +135,7 @@ class MemorySink(schema: StructType) extends Sink with Logging {
     }.mkString("\n")
   }
 
-  override def addBatch(batchId: Long, data: DataFrame): Unit = {
+  override def addBatch(batchId: Long, data: DataFrame): Unit = synchronized {
     if (batchId == batches.size) {
       logDebug(s"Committing batch $batchId")
      batches.append(data.collect())
@@ -136,3 +145,9 @@ class MemorySink(schema: StructType) extends Sink with Logging {
     }
   }
 }
+/**
+ * Used to query the data that has been written into a [[MemorySink]].
+ */
+case class MemoryPlan(sink: MemorySink, output: Seq[Attribute]) extends LeafNode {
+  def this(sink: MemorySink) = this(sink, sink.schema.toAttributes)
+}
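
The substance of the patch is concurrency hardening: `batches` and `currentOffset` are shared between the thread adding data (typically a test) and the stream-execution thread, so they are now annotated @GuardedBy("this") and every access goes through synchronized, while the Dataset construction in addData is hoisted out of the lock. Below is a minimal sketch, not part of this commit, of the interleaving the new locks make safe; the object name, thread bodies, and local-mode session are invented for illustration, while MemoryStream's companion apply, addData, and getOffset are the real entry points touched by the diff.

import org.apache.spark.sql.{SQLContext, SparkSession}
import org.apache.spark.sql.execution.streaming.MemoryStream

object MemoryStreamRaceSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[2]").appName("sketch").getOrCreate()
    import spark.implicits._
    implicit val sqlContext: SQLContext = spark.sqlContext

    val stream = MemoryStream[Int]  // companion apply assigns a fresh stream id

    // Writer: each addData() builds the Dataset outside the lock, then appends
    // the batch and bumps currentOffset inside stream.synchronized.
    val writer = new Thread(new Runnable {
      override def run(): Unit = (0 until 10).foreach(i => stream.addData(Seq(i)))
    })

    // Reader: getOffset is now synchronized as well, so it can never observe
    // currentOffset advanced before the matching batch has been appended.
    val reader = new Thread(new Runnable {
      override def run(): Unit =
        (1 to 10).foreach(_ => stream.getOffset.foreach(o => println(s"latest offset: $o")))
    })

    writer.start(); reader.start()
    writer.join(); reader.join()
    spark.stop()
  }
}

Before the change, getOffset read batches and currentOffset with no lock at all, so a reader could see an offset whose batch addData had not yet appended; the same reasoning motivates the synchronized slice in getBatch and the synchronized addBatch in MemorySink.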