Diffstat (limited to 'sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala')
-rw-r--r-- | sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala | 109
1 file changed, 92 insertions, 17 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
index c07bd0e7b7..54d250867f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
@@ -29,8 +29,10 @@ import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
 import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoTable, Project}
 import org.apache.spark.sql.execution.datasources.{BucketSpec, CreateTableUsingAsSelect, DataSource}
 import org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils
-import org.apache.spark.sql.execution.streaming.StreamExecution
+import org.apache.spark.sql.execution.streaming.{MemoryPlan, MemorySink, StreamExecution}
+import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.sources.HadoopFsRelation
+import org.apache.spark.util.Utils

 /**
  * :: Experimental ::
@@ -78,6 +80,35 @@ final class DataFrameWriter private[sql](df: DataFrame) {
   }

   /**
+   * :: Experimental ::
+   * Set the trigger for the stream query. The default value is `ProcessingTime(0)`, which runs
+   * the query as fast as possible.
+   *
+   * Scala Example:
+   * {{{
+   *   df.write.trigger(ProcessingTime("10 seconds"))
+   *
+   *   import scala.concurrent.duration._
+   *   df.write.trigger(ProcessingTime(10.seconds))
+   * }}}
+   *
+   * Java Example:
+   * {{{
+   *   df.write.trigger(ProcessingTime.create("10 seconds"))
+   *
+   *   import java.util.concurrent.TimeUnit
+   *   df.write.trigger(ProcessingTime.create(10, TimeUnit.SECONDS))
+   * }}}
+   *
+   * @since 2.0.0
+   */
+  @Experimental
+  def trigger(trigger: Trigger): DataFrameWriter = {
+    this.trigger = trigger
+    this
+  }
+
+  /**
    * Specifies the underlying output data source. Built-in options include "parquet", "json", etc.
    *
    * @since 1.4.0
@@ -246,22 +277,64 @@ final class DataFrameWriter private[sql](df: DataFrame) {
    * @since 2.0.0
    */
   def startStream(): ContinuousQuery = {
-    val dataSource =
-      DataSource(
-        df.sqlContext,
-        className = source,
-        options = extraOptions.toMap,
-        partitionColumns = normalizedParCols.getOrElse(Nil))
-
-    val queryName = extraOptions.getOrElse("queryName", StreamExecution.nextName)
-    val checkpointLocation = extraOptions.getOrElse("checkpointLocation", {
-      new Path(df.sqlContext.conf.checkpointLocation, queryName).toUri.toString
-    })
-    df.sqlContext.sessionState.continuousQueryManager.startQuery(
-      queryName,
-      checkpointLocation,
-      df,
-      dataSource.createSink())
+    if (source == "memory") {
+      val queryName =
+        extraOptions.getOrElse(
+          "queryName", throw new AnalysisException("queryName must be specified for memory sink"))
+      val checkpointLocation = extraOptions.get("checkpointLocation").map { userSpecified =>
+        new Path(userSpecified).toUri.toString
+      }.orElse {
+        val checkpointConfig: Option[String] =
+          df.sqlContext.conf.getConf(
+            SQLConf.CHECKPOINT_LOCATION,
+            None)
+
+        checkpointConfig.map { location =>
+          new Path(location, queryName).toUri.toString
+        }
+      }.getOrElse {
+        Utils.createTempDir(namePrefix = "memory.stream").getCanonicalPath
+      }
+
+      // If offsets have already been created, we are trying to resume a query.
+      val checkpointPath = new Path(checkpointLocation, "offsets")
+      val fs = checkpointPath.getFileSystem(df.sqlContext.sparkContext.hadoopConfiguration)
+      if (fs.exists(checkpointPath)) {
+        throw new AnalysisException(
+          s"Unable to resume query written to memory sink. Delete $checkpointPath to start over.")
+      } else {
+        checkpointPath.toUri.toString
+      }
+
+      val sink = new MemorySink(df.schema)
+      val resultDf = Dataset.ofRows(df.sqlContext, new MemoryPlan(sink))
+      resultDf.registerTempTable(queryName)
+      val continuousQuery = df.sqlContext.sessionState.continuousQueryManager.startQuery(
+        queryName,
+        checkpointLocation,
+        df,
+        sink,
+        trigger)
+      continuousQuery
+    } else {
+      val dataSource =
+        DataSource(
+          df.sqlContext,
+          className = source,
+          options = extraOptions.toMap,
+          partitionColumns = normalizedParCols.getOrElse(Nil))
+
+      val queryName = extraOptions.getOrElse("queryName", StreamExecution.nextName)
+      val checkpointLocation = extraOptions.getOrElse("checkpointLocation", {
+        new Path(df.sqlContext.conf.checkpointLocation, queryName).toUri.toString
+      })
+      df.sqlContext.sessionState.continuousQueryManager.startQuery(
+        queryName,
+        checkpointLocation,
+        df,
+        dataSource.createSink(),
+        trigger)
+    }
   }

   /**
@@ -552,6 +625,8 @@ final class DataFrameWriter private[sql](df: DataFrame) {

   private var mode: SaveMode = SaveMode.ErrorIfExists

+  private var trigger: Trigger = ProcessingTime(0L)
+
   private var extraOptions = new scala.collection.mutable.HashMap[String, String]

   private var partitioningColumns: Option[Seq[String]] = None
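
For orientation, a minimal usage sketch of the trigger method added above, together with the new default field `ProcessingTime(0L)` from the last hunk. The sink format, option values, and paths are illustrative assumptions, not part of this commit; only trigger(), option(), startStream(), and ProcessingTime come from the patched API:

    // Sketch: fire a batch at most every 10 seconds instead of the default
    // ProcessingTime(0L), which starts each batch as soon as the previous one ends.
    // Assumes a streaming DataFrame `df` at this Spark 2.0 snapshot, where
    // ProcessingTime lives in the org.apache.spark.sql package.
    import org.apache.spark.sql.ProcessingTime

    val query = df.write
      .format("parquet")                                    // illustrative sink
      .option("checkpointLocation", "/tmp/demo-checkpoint") // hypothetical path
      .option("path", "/tmp/demo-output")                   // hypothetical path
      .trigger(ProcessingTime("10 seconds"))
      .startStream()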
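Similarly, a hedged sketch of the new memory-sink branch of startStream(): with format("memory"), a query name is mandatory (it becomes the temp table registered over the MemoryPlan), and the checkpoint location falls back from the "checkpointLocation" option to SQLConf.CHECKPOINT_LOCATION and finally to a temp directory. The table name here is illustrative:

    // Sketch: stream into the in-memory sink and read the results back as a table.
    val query = df.write
      .format("memory")
      .option("queryName", "streamResults") // required; AnalysisException if missing
      .startStream()

    // startStream() registered "streamResults" as a temp table backed by MemoryPlan:
    sqlContext.sql("SELECT * FROM streamResults").show()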