author     Michael Armbrust <michael@databricks.com>    2016-04-06 10:05:02 -0700
committer  Shixiong Zhu <shixiong@databricks.com>       2016-04-06 10:05:02 -0700
commit     59236e5c5b9d24f90fcf8d09b23ae8b06355657e (patch)
tree       77eadd5250a126cbaad7f175765c5643735aab73 /sql
parent     5e64dab868be1a0d415fb6d6dd3463e7171fdd1a (diff)
[SPARK-14288][SQL] Memory Sink for streaming
This PR exposes the internal testing `MemorySink` through the data source API. This will allow users to easily test streaming applications in the Spark shell or other local tests.

Usage:
```scala
inputStream.write
  .format("memory")
  .queryName("memStream")
  .startStream()

// Now you can query the result of the stream here.
sqlContext.table("memStream")
```

The most complicated part of the logic is choosing the checkpoint directory. There are a few requirements we are attempting to satisfy here:
 - when working in the shell locally, it should just work with no extra configuration.
 - when working on a cluster, you should be able to easily have it create the checkpoint on a distributed file system so you can test aggregation (state checkpoints are also stored in this directory and must be accessible from workers).
 - it should be clear that you can't resume, since the data is just in memory.

The chosen algorithm proceeds as follows:
 - if the user gives a checkpoint directory, use it
 - if the conf has a checkpoint location, use `$location/$queryName`
 - if neither, create a local directory
 - always check to make sure there are no offsets written to the directory

Author: Michael Armbrust <michael@databricks.com>

Closes #12119 from marmbrus/memorySink.
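The resolution order in the list above boils down to a small piece of logic. The following is a minimal, self-contained sketch of that order, not the code from this PR; `resolveCheckpoint`, `userOption`, and `confLocation` are hypothetical stand-ins for the `checkpointLocation` option, the `SQLConf.CHECKPOINT_LOCATION` setting, and the query name:

```scala
import java.nio.file.Files
import org.apache.hadoop.fs.Path

// Hedged sketch of the resolution order described above; all names here are
// hypothetical stand-ins, not the identifiers used in DataFrameWriter.
def resolveCheckpoint(
    userOption: Option[String],      // the "checkpointLocation" option, if given
    confLocation: Option[String],    // the checkpoint location conf, if set
    queryName: String): String = {
  userOption
    .map(loc => new Path(loc).toUri.toString)                                  // 1. user-specified directory
    .orElse(confLocation.map(loc => new Path(loc, queryName).toUri.toString))  // 2. $location/$queryName
    .getOrElse(                                                                // 3. fall back to a local temp dir
      Files.createTempDirectory("memory.stream").toFile.getCanonicalPath)
}
```

The final rule in the list, refusing to resume when offsets already exist, is enforced separately by checking for an existing `offsets` directory under the resolved location, as shown in the DataFrameWriter diff below.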
Diffstat (limited to 'sql')
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala             | 79
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala   |  6
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala  |  8
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/QueryTest.scala                   |  2
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/streaming/MemorySinkSuite.scala   | 82
5 files changed, 159 insertions, 18 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
index 3332a997cd..54d250867f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
@@ -29,8 +29,10 @@ import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoTable, Project}
import org.apache.spark.sql.execution.datasources.{BucketSpec, CreateTableUsingAsSelect, DataSource}
import org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils
-import org.apache.spark.sql.execution.streaming.StreamExecution
+import org.apache.spark.sql.execution.streaming.{MemoryPlan, MemorySink, StreamExecution}
+import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.sources.HadoopFsRelation
+import org.apache.spark.util.Utils
/**
* :: Experimental ::
@@ -275,23 +277,64 @@ final class DataFrameWriter private[sql](df: DataFrame) {
* @since 2.0.0
*/
def startStream(): ContinuousQuery = {
- val dataSource =
- DataSource(
- df.sqlContext,
- className = source,
- options = extraOptions.toMap,
- partitionColumns = normalizedParCols.getOrElse(Nil))
-
- val queryName = extraOptions.getOrElse("queryName", StreamExecution.nextName)
- val checkpointLocation = extraOptions.getOrElse("checkpointLocation", {
- new Path(df.sqlContext.conf.checkpointLocation, queryName).toUri.toString
- })
- df.sqlContext.sessionState.continuousQueryManager.startQuery(
- queryName,
- checkpointLocation,
- df,
- dataSource.createSink(),
- trigger)
+ if (source == "memory") {
+ val queryName =
+ extraOptions.getOrElse(
+ "queryName", throw new AnalysisException("queryName must be specified for memory sink"))
+ val checkpointLocation = extraOptions.get("checkpointLocation").map { userSpecified =>
+ new Path(userSpecified).toUri.toString
+ }.orElse {
+ val checkpointConfig: Option[String] =
+ df.sqlContext.conf.getConf(
+ SQLConf.CHECKPOINT_LOCATION,
+ None)
+
+ checkpointConfig.map { location =>
+ new Path(location, queryName).toUri.toString
+ }
+ }.getOrElse {
+ Utils.createTempDir(namePrefix = "memory.stream").getCanonicalPath
+ }
+
+ // If offsets have already been created, we are trying to resume a query.
+ val checkpointPath = new Path(checkpointLocation, "offsets")
+ val fs = checkpointPath.getFileSystem(df.sqlContext.sparkContext.hadoopConfiguration)
+ if (fs.exists(checkpointPath)) {
+ throw new AnalysisException(
+ s"Unable to resume query written to memory sink. Delete $checkpointPath to start over.")
+ } else {
+ checkpointPath.toUri.toString
+ }
+
+ val sink = new MemorySink(df.schema)
+ val resultDf = Dataset.ofRows(df.sqlContext, new MemoryPlan(sink))
+ resultDf.registerTempTable(queryName)
+ val continuousQuery = df.sqlContext.sessionState.continuousQueryManager.startQuery(
+ queryName,
+ checkpointLocation,
+ df,
+ sink,
+ trigger)
+ continuousQuery
+ } else {
+ val dataSource =
+ DataSource(
+ df.sqlContext,
+ className = source,
+ options = extraOptions.toMap,
+ partitionColumns = normalizedParCols.getOrElse(Nil))
+
+ val queryName = extraOptions.getOrElse("queryName", StreamExecution.nextName)
+ val checkpointLocation = extraOptions.getOrElse("checkpointLocation", {
+ new Path(df.sqlContext.conf.checkpointLocation, queryName).toUri.toString
+ })
+ df.sqlContext.sessionState.continuousQueryManager.startQuery(
+ queryName,
+ checkpointLocation,
+ df,
+ dataSource.createSink(),
+ trigger)
+ }
}
/**
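For reference, a hedged usage sketch of the new memory-sink branch, assuming a streaming DataFrame `inputStream` and a `sqlContext` in scope (as in the spark shell); the query name and checkpoint path are arbitrary example values:

```scala
// `inputStream` is assumed to be a streaming DataFrame; "memStream" and the
// checkpoint path are arbitrary example values.
val query = inputStream.write
  .format("memory")
  .queryName("memStream")                                // required for the memory sink
  .option("checkpointLocation", "/tmp/memStream-ckpt")   // optional; defaults as described above
  .startStream()

// The sink's contents are registered as a temporary table under the query name.
sqlContext.table("memStream").show()

query.stop()
```

Restarting with the same `checkpointLocation` after offsets have been written fails with the `AnalysisException` above, since results held only in memory cannot be resumed.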
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
index 5f3128d8e4..d77aba7260 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
@@ -19,6 +19,7 @@ package org.apache.spark.sql.execution
import org.apache.spark.sql.Strategy
import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.encoders.RowEncoder
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.planning._
import org.apache.spark.sql.catalyst.plans._
@@ -30,6 +31,7 @@ import org.apache.spark.sql.execution.command.{DescribeCommand => RunnableDescri
import org.apache.spark.sql.execution.datasources.{DescribeCommand => LogicalDescribeCommand, _}
import org.apache.spark.sql.execution.exchange.ShuffleExchange
import org.apache.spark.sql.execution.joins.{BuildLeft, BuildRight}
+import org.apache.spark.sql.execution.streaming.MemoryPlan
import org.apache.spark.sql.internal.SQLConf
private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
@@ -332,6 +334,10 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
case r: RunnableCommand => ExecutedCommand(r) :: Nil
+ case MemoryPlan(sink, output) =>
+ val encoder = RowEncoder(sink.schema)
+ LocalTableScan(output, sink.allData.map(r => encoder.toRow(r).copy())) :: Nil
+
case logical.Distinct(child) =>
throw new IllegalStateException(
"logical distinct operator should have been replaced by aggregate in the optimizer")
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala
index b652530d7c..351ef404a8 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala
@@ -25,6 +25,8 @@ import scala.util.control.NonFatal
import org.apache.spark.internal.Logging
import org.apache.spark.sql.{DataFrame, Dataset, Encoder, Row, SQLContext}
import org.apache.spark.sql.catalyst.encoders.encoderFor
+import org.apache.spark.sql.catalyst.expressions.Attribute
+import org.apache.spark.sql.catalyst.plans.logical.LeafNode
import org.apache.spark.sql.types.StructType
object MemoryStream {
@@ -136,3 +138,9 @@ class MemorySink(val schema: StructType) extends Sink with Logging {
}
}
+/**
+ * Used to query the data that has been written into a [[MemorySink]].
+ */
+case class MemoryPlan(sink: MemorySink, output: Seq[Attribute]) extends LeafNode {
+ def this(sink: MemorySink) = this(sink, sink.schema.toAttributes)
+}
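The auxiliary constructor derives the plan's output attributes from the sink's schema, so callers only need the sink itself. A small hedged sketch, using a hypothetical one-column schema:

```scala
import org.apache.spark.sql.execution.streaming.{MemoryPlan, MemorySink}
import org.apache.spark.sql.types.{IntegerType, StructField, StructType}

val schema = StructType(StructField("value", IntegerType) :: Nil)  // hypothetical schema
val sink = new MemorySink(schema)
val plan = new MemoryPlan(sink)   // output defaults to sink.schema.toAttributes
```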
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/QueryTest.scala b/sql/core/src/test/scala/org/apache/spark/sql/QueryTest.scala
index 4e62fac919..48a077d0e5 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/QueryTest.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/QueryTest.scala
@@ -31,6 +31,7 @@ import org.apache.spark.sql.catalyst.util._
import org.apache.spark.sql.execution.LogicalRDD
import org.apache.spark.sql.execution.columnar.InMemoryRelation
import org.apache.spark.sql.execution.datasources.LogicalRelation
+import org.apache.spark.sql.execution.streaming.MemoryPlan
abstract class QueryTest extends PlanTest {
@@ -200,6 +201,7 @@ abstract class QueryTest extends PlanTest {
logicalPlan.transform {
case _: ObjectOperator => return
case _: LogicalRelation => return
+ case _: MemoryPlan => return
}.transformAllExpressions {
case a: ImperativeAggregate => return
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/MemorySinkSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/MemorySinkSuite.scala
new file mode 100644
index 0000000000..5249aa28dd
--- /dev/null
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/MemorySinkSuite.scala
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.streaming
+
+import org.apache.spark.sql.{AnalysisException, Row, StreamTest}
+import org.apache.spark.sql.execution.streaming._
+import org.apache.spark.sql.test.SharedSQLContext
+import org.apache.spark.util.Utils
+
+class MemorySinkSuite extends StreamTest with SharedSQLContext {
+ import testImplicits._
+
+ test("registering as a table") {
+ val input = MemoryStream[Int]
+ val query = input.toDF().write
+ .format("memory")
+ .queryName("memStream")
+ .startStream()
+ input.addData(1, 2, 3)
+ query.processAllAvailable()
+
+ checkDataset(
+ sqlContext.table("memStream").as[Int],
+ 1, 2, 3)
+
+ input.addData(4, 5, 6)
+ query.processAllAvailable()
+ checkDataset(
+ sqlContext.table("memStream").as[Int],
+ 1, 2, 3, 4, 5, 6)
+
+ query.stop()
+ }
+
+ test("error when no name is specified") {
+ val error = intercept[AnalysisException] {
+ val input = MemoryStream[Int]
+ val query = input.toDF().write
+ .format("memory")
+ .startStream()
+ }
+
+ assert(error.message contains "queryName must be specified")
+ }
+
+ test("error if attempting to resume specific checkpoint") {
+ val location = Utils.createTempDir(namePrefix = "streaming.checkpoint").getCanonicalPath
+
+ val input = MemoryStream[Int]
+ val query = input.toDF().write
+ .format("memory")
+ .queryName("memStream")
+ .option("checkpointLocation", location)
+ .startStream()
+ input.addData(1, 2, 3)
+ query.processAllAvailable()
+ query.stop()
+
+ intercept[AnalysisException] {
+ input.toDF().write
+ .format("memory")
+ .queryName("memStream")
+ .option("checkpointLocation", location)
+ .startStream()
+ }
+ }
+}