Diffstat (limited to 'sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala')
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala | 109
1 file changed, 92 insertions(+), 17 deletions(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
index c07bd0e7b7..54d250867f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
@@ -29,8 +29,10 @@ import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoTable, Project}
import org.apache.spark.sql.execution.datasources.{BucketSpec, CreateTableUsingAsSelect, DataSource}
import org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils
-import org.apache.spark.sql.execution.streaming.StreamExecution
+import org.apache.spark.sql.execution.streaming.{MemoryPlan, MemorySink, StreamExecution}
+import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.sources.HadoopFsRelation
+import org.apache.spark.util.Utils
/**
* :: Experimental ::
@@ -78,6 +80,35 @@ final class DataFrameWriter private[sql](df: DataFrame) {
}
/**
+ * :: Experimental ::
+ * Set the trigger for the stream query. The default value is `ProcessingTime(0)`, which will
+ * run the query as fast as possible.
+ *
+ * Scala Example:
+ * {{{
+ * def.writer.trigger(ProcessingTime("10 seconds"))
+ *
+ * import scala.concurrent.duration._
+ * df.write.trigger(ProcessingTime(10.seconds))
+ * }}}
+ *
+ * Java Example:
+ * {{{
+ * def.writer.trigger(ProcessingTime.create("10 seconds"))
+ *
+ * import java.util.concurrent.TimeUnit
+ * df.write.trigger(ProcessingTime.create(10, TimeUnit.SECONDS))
+ * }}}
+ *
+ * @since 2.0.0
+ */
+ @Experimental
+ def trigger(trigger: Trigger): DataFrameWriter = {
+ this.trigger = trigger
+ this
+ }
+
+ /**
* Specifies the underlying output data source. Built-in options include "parquet", "json", etc.
*
* @since 1.4.0
@@ -246,22 +277,64 @@ final class DataFrameWriter private[sql](df: DataFrame) {
* @since 2.0.0
*/
def startStream(): ContinuousQuery = {
- val dataSource =
- DataSource(
- df.sqlContext,
- className = source,
- options = extraOptions.toMap,
- partitionColumns = normalizedParCols.getOrElse(Nil))
-
- val queryName = extraOptions.getOrElse("queryName", StreamExecution.nextName)
- val checkpointLocation = extraOptions.getOrElse("checkpointLocation", {
- new Path(df.sqlContext.conf.checkpointLocation, queryName).toUri.toString
- })
- df.sqlContext.sessionState.continuousQueryManager.startQuery(
- queryName,
- checkpointLocation,
- df,
- dataSource.createSink())
+ if (source == "memory") {
+ val queryName =
+ extraOptions.getOrElse(
+ "queryName", throw new AnalysisException("queryName must be specified for memory sink"))
+ val checkpointLocation = extraOptions.get("checkpointLocation").map { userSpecified =>
+ new Path(userSpecified).toUri.toString
+ }.orElse {
+ val checkpointConfig: Option[String] =
+ df.sqlContext.conf.getConf(
+ SQLConf.CHECKPOINT_LOCATION,
+ None)
+
+ checkpointConfig.map { location =>
+ new Path(location, queryName).toUri.toString
+ }
+ }.getOrElse {
+ Utils.createTempDir(namePrefix = "memory.stream").getCanonicalPath
+ }
+
+ // If offsets have already been created, we are trying to resume a query.
+ val checkpointPath = new Path(checkpointLocation, "offsets")
+ val fs = checkpointPath.getFileSystem(df.sqlContext.sparkContext.hadoopConfiguration)
+ if (fs.exists(checkpointPath)) {
+ throw new AnalysisException(
+ s"Unable to resume query written to memory sink. Delete $checkpointPath to start over.")
+ } else {
+ checkpointPath.toUri.toString
+ }
+
+ val sink = new MemorySink(df.schema)
+ val resultDf = Dataset.ofRows(df.sqlContext, new MemoryPlan(sink))
+ resultDf.registerTempTable(queryName)
+ val continuousQuery = df.sqlContext.sessionState.continuousQueryManager.startQuery(
+ queryName,
+ checkpointLocation,
+ df,
+ sink,
+ trigger)
+ continuousQuery
+ } else {
+ val dataSource =
+ DataSource(
+ df.sqlContext,
+ className = source,
+ options = extraOptions.toMap,
+ partitionColumns = normalizedParCols.getOrElse(Nil))
+
+ val queryName = extraOptions.getOrElse("queryName", StreamExecution.nextName)
+ val checkpointLocation = extraOptions.getOrElse("checkpointLocation", {
+ new Path(df.sqlContext.conf.checkpointLocation, queryName).toUri.toString
+ })
+ df.sqlContext.sessionState.continuousQueryManager.startQuery(
+ queryName,
+ checkpointLocation,
+ df,
+ dataSource.createSink(),
+ trigger)
+ }
}
/**
@@ -552,6 +625,8 @@ final class DataFrameWriter private[sql](df: DataFrame) {
private var mode: SaveMode = SaveMode.ErrorIfExists
+ private var trigger: Trigger = ProcessingTime(0L)
+
private var extraOptions = new scala.collection.mutable.HashMap[String, String]
private var partitioningColumns: Option[Seq[String]] = None
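
Usage sketch (not part of the patch): one way the memory sink and the new trigger(...) API added
above could be exercised from user code. The helper name `queryToMemory`, the input `streamingDf`,
the query name "memory_results", the 5-second interval, and the `sqlContext` in the trailing
comment are illustrative assumptions; `startStream()` returns the `ContinuousQuery` shown in the
diff, and `ContinuousQuery`, `DataFrame`, and `ProcessingTime` are assumed to live in
`org.apache.spark.sql` on this branch, as the patch uses them without adding imports.

{{{
import org.apache.spark.sql.{ContinuousQuery, DataFrame, ProcessingTime}

// Assumes `streamingDf` is a streaming DataFrame created elsewhere
// (e.g. via the DataFrameReader.stream API on the same branch).
def queryToMemory(streamingDf: DataFrame): ContinuousQuery = {
  streamingDf.write
    .format("memory")                       // route output to the new MemorySink
    .option("queryName", "memory_results")  // required by this patch; also names the temp table
    .trigger(ProcessingTime("5 seconds"))   // new trigger(...) API added in this change
    .startStream()
}

// Because startStream() registers the sink as a temp table under queryName,
// the accumulated results can be inspected with a normal SQL query, e.g.:
//   sqlContext.sql("SELECT * FROM memory_results").show()
}}}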