From 6bf692147c21dd74e91e2bd95845f11ef0a303e6 Mon Sep 17 00:00:00 2001
From: Shixiong Zhu
Date: Tue, 12 Apr 2016 10:46:28 -0700
Subject: [SPARK-14474][SQL] Move FileSource offset log into checkpointLocation

## What changes were proposed in this pull request?

Now that we have a single location for storing checkpointed state, this PR propagates the checkpoint location into FileStreamSource so that we don't have one random log off on its own.

## How was this patch tested?

test("metadataPath should be in checkpointLocation")

Author: Shixiong Zhu

Closes #12247 from zsxwing/file-source-log-location.
---
 .../apache/spark/sql/ContinuousQueryManager.scala  |  5 +-
 .../sql/execution/datasources/DataSource.scala     | 62 ++++++++++++------
 .../execution/streaming/StreamingRelation.scala    |  4 +-
 .../org/apache/spark/sql/sources/interfaces.scala  |  9 +++
 .../sql/streaming/DataFrameReaderWriterSuite.scala | 73 ++++++++++++++++++++--
 .../sql/streaming/FileStreamSourceSuite.scala      | 10 +--
 .../spark/sql/streaming/MemorySinkSuite.scala      |  2 +-
 .../apache/spark/sql/streaming/StreamSuite.scala   |  9 +++
 8 files changed, 141 insertions(+), 33 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/ContinuousQueryManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/ContinuousQueryManager.scala
index d7f71bd4b0..1343e81569 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/ContinuousQueryManager.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/ContinuousQueryManager.scala
@@ -178,10 +178,13 @@ class ContinuousQueryManager(sqlContext: SQLContext) {
       throw new IllegalArgumentException(
         s"Cannot start query with name $name as a query with that name is already active")
     }
+    var nextSourceId = 0L
     val logicalPlan = df.logicalPlan.transform {
       case StreamingRelation(dataSource, _, output) =>
         // Materialize source to avoid creating it in every batch
-        val source = dataSource.createSource()
+        val metadataPath = s"$checkpointLocation/sources/$nextSourceId"
+        val source = dataSource.createSource(metadataPath)
+        nextSourceId += 1
         // We still need to use the previous `output` instead of `source.schema` as attributes in
         // "df.logicalPlan" has already used attributes of the previous `output`.
         StreamingExecutionRelation(source, output)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
index f55cedb1b6..10fde152ab 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
@@ -123,36 +123,58 @@ case class DataSource(
     }
   }
 
-  /** Returns a source that can be used to continually read data. */
-  def createSource(): Source = {
+  private def inferFileFormatSchema(format: FileFormat): StructType = {
+    val caseInsensitiveOptions = new CaseInsensitiveMap(options)
+    val allPaths = caseInsensitiveOptions.get("path")
+    val globbedPaths = allPaths.toSeq.flatMap { path =>
+      val hdfsPath = new Path(path)
+      val fs = hdfsPath.getFileSystem(sqlContext.sparkContext.hadoopConfiguration)
+      val qualified = hdfsPath.makeQualified(fs.getUri, fs.getWorkingDirectory)
+      SparkHadoopUtil.get.globPathIfNecessary(qualified)
+    }.toArray
+
+    val fileCatalog: FileCatalog = new HDFSFileCatalog(sqlContext, options, globbedPaths, None)
+    userSpecifiedSchema.orElse {
+      format.inferSchema(
+        sqlContext,
+        caseInsensitiveOptions,
+        fileCatalog.allFiles())
+    }.getOrElse {
+      throw new AnalysisException("Unable to infer schema. It must be specified manually.")
+    }
+  }
+
+  /** Returns the name and schema of the source that can be used to continually read data. */
+  def sourceSchema(): (String, StructType) = {
     providingClass.newInstance() match {
       case s: StreamSourceProvider =>
-        s.createSource(sqlContext, userSpecifiedSchema, className, options)
+        s.sourceSchema(sqlContext, userSpecifiedSchema, className, options)
 
       case format: FileFormat =>
         val caseInsensitiveOptions = new CaseInsensitiveMap(options)
         val path = caseInsensitiveOptions.getOrElse("path", {
           throw new IllegalArgumentException("'path' is not specified")
         })
-        val metadataPath = caseInsensitiveOptions.getOrElse("metadataPath", s"$path/_metadata")
+        (s"FileSource[$path]", inferFileFormatSchema(format))
+      case _ =>
+        throw new UnsupportedOperationException(
+          s"Data source $className does not support streamed reading")
+    }
+  }
 
-        val allPaths = caseInsensitiveOptions.get("path")
-        val globbedPaths = allPaths.toSeq.flatMap { path =>
-          val hdfsPath = new Path(path)
-          val fs = hdfsPath.getFileSystem(sqlContext.sparkContext.hadoopConfiguration)
-          val qualified = hdfsPath.makeQualified(fs.getUri, fs.getWorkingDirectory)
-          SparkHadoopUtil.get.globPathIfNecessary(qualified)
-        }.toArray
+  /** Returns a source that can be used to continually read data. */
+  def createSource(metadataPath: String): Source = {
+    providingClass.newInstance() match {
+      case s: StreamSourceProvider =>
+        s.createSource(sqlContext, metadataPath, userSpecifiedSchema, className, options)
 
-        val fileCatalog: FileCatalog = new HDFSFileCatalog(sqlContext, options, globbedPaths, None)
-        val dataSchema = userSpecifiedSchema.orElse {
-          format.inferSchema(
-            sqlContext,
-            caseInsensitiveOptions,
-            fileCatalog.allFiles())
-        }.getOrElse {
-          throw new AnalysisException("Unable to infer schema. It must be specified manually.")
It must be specified manually.") - } + case format: FileFormat => + val caseInsensitiveOptions = new CaseInsensitiveMap(options) + val path = caseInsensitiveOptions.getOrElse("path", { + throw new IllegalArgumentException("'path' is not specified") + }) + + val dataSchema = inferFileFormatSchema(format) def dataFrameBuilder(files: Array[String]): DataFrame = { Dataset.ofRows( diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingRelation.scala index f951dea735..d2872e49ce 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingRelation.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingRelation.scala @@ -23,8 +23,8 @@ import org.apache.spark.sql.execution.datasources.DataSource object StreamingRelation { def apply(dataSource: DataSource): StreamingRelation = { - val source = dataSource.createSource() - StreamingRelation(dataSource, source.toString, source.schema.toAttributes) + val (name, schema) = dataSource.sourceSchema() + StreamingRelation(dataSource, name, schema.toAttributes) } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala index 65b1f61349..bea243a3be 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala @@ -129,8 +129,17 @@ trait SchemaRelationProvider { * Implemented by objects that can produce a streaming [[Source]] for a specific format or system. */ trait StreamSourceProvider { + + /** Returns the name and schema of the source that can be used to continually read data. */ + def sourceSchema( + sqlContext: SQLContext, + schema: Option[StructType], + providerName: String, + parameters: Map[String, String]): (String, StructType) + def createSource( sqlContext: SQLContext, + metadataPath: String, schema: Option[StructType], providerName: String, parameters: Map[String, String]): Source diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/DataFrameReaderWriterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/DataFrameReaderWriterSuite.scala index 28c558208f..00efe21d39 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/DataFrameReaderWriterSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/DataFrameReaderWriterSuite.scala @@ -21,6 +21,7 @@ import java.util.concurrent.TimeUnit import scala.concurrent.duration._ +import org.mockito.Mockito._ import org.scalatest.BeforeAndAfter import org.apache.spark.sql._ @@ -31,22 +32,50 @@ import org.apache.spark.sql.types.{IntegerType, StructField, StructType} import org.apache.spark.util.Utils object LastOptions { + + var mockStreamSourceProvider = mock(classOf[StreamSourceProvider]) + var mockStreamSinkProvider = mock(classOf[StreamSinkProvider]) var parameters: Map[String, String] = null var schema: Option[StructType] = null var partitionColumns: Seq[String] = Nil + + def clear(): Unit = { + parameters = null + schema = null + partitionColumns = null + reset(mockStreamSourceProvider) + reset(mockStreamSinkProvider) + } } /** Dummy provider: returns no-op source/sink and records options in [[LastOptions]]. 
 class DefaultSource extends StreamSourceProvider with StreamSinkProvider {
+
+  private val fakeSchema = StructType(StructField("a", IntegerType) :: Nil)
+
+  override def sourceSchema(
+      sqlContext: SQLContext,
+      schema: Option[StructType],
+      providerName: String,
+      parameters: Map[String, String]): (String, StructType) = {
+    LastOptions.parameters = parameters
+    LastOptions.schema = schema
+    LastOptions.mockStreamSourceProvider.sourceSchema(sqlContext, schema, providerName, parameters)
+    ("dummySource", fakeSchema)
+  }
+
   override def createSource(
       sqlContext: SQLContext,
+      metadataPath: String,
       schema: Option[StructType],
       providerName: String,
       parameters: Map[String, String]): Source = {
     LastOptions.parameters = parameters
     LastOptions.schema = schema
+    LastOptions.mockStreamSourceProvider.createSource(
+      sqlContext, metadataPath, schema, providerName, parameters)
     new Source {
-      override def schema: StructType = StructType(StructField("a", IntegerType) :: Nil)
+      override def schema: StructType = fakeSchema
 
       override def getOffset: Option[Offset] = Some(new LongOffset(0))
 
@@ -64,6 +93,7 @@ class DefaultSource extends StreamSourceProvider with StreamSinkProvider {
       partitionColumns: Seq[String]): Sink = {
     LastOptions.parameters = parameters
     LastOptions.partitionColumns = partitionColumns
+    LastOptions.mockStreamSinkProvider.createSink(sqlContext, parameters, partitionColumns)
     new Sink {
       override def addBatch(batchId: Long, data: DataFrame): Unit = {}
     }
@@ -117,7 +147,7 @@ class DataFrameReaderWriterSuite extends StreamTest with SharedSQLContext with B
     assert(LastOptions.parameters("opt2") == "2")
     assert(LastOptions.parameters("opt3") == "3")
 
-    LastOptions.parameters = null
+    LastOptions.clear()
 
     df.write
       .format("org.apache.spark.sql.streaming.test")
@@ -181,7 +211,7 @@ class DataFrameReaderWriterSuite extends StreamTest with SharedSQLContext with B
 
     assert(LastOptions.parameters("path") == "/test")
 
-    LastOptions.parameters = null
+    LastOptions.clear()
 
     df.write
       .format("org.apache.spark.sql.streaming.test")
@@ -204,7 +234,7 @@ class DataFrameReaderWriterSuite extends StreamTest with SharedSQLContext with B
     assert(LastOptions.parameters("boolOpt") == "false")
     assert(LastOptions.parameters("doubleOpt") == "6.7")
 
-    LastOptions.parameters = null
+    LastOptions.clear()
     df.write
       .format("org.apache.spark.sql.streaming.test")
      .option("intOpt", 56)
@@ -303,4 +333,39 @@ class DataFrameReaderWriterSuite extends StreamTest with SharedSQLContext with B
 
     assert(q.asInstanceOf[StreamExecution].trigger == ProcessingTime(100000))
   }
+
+  test("source metadataPath") {
+    LastOptions.clear()
+
+    val checkpointLocation = newMetadataDir
+
+    val df1 = sqlContext.read
+      .format("org.apache.spark.sql.streaming.test")
+      .stream()
+
+    val df2 = sqlContext.read
+      .format("org.apache.spark.sql.streaming.test")
+      .stream()
+
+    val q = df1.union(df2).write
+      .format("org.apache.spark.sql.streaming.test")
+      .option("checkpointLocation", checkpointLocation)
+      .trigger(ProcessingTime(10.seconds))
+      .startStream()
+    q.stop()
+
+    verify(LastOptions.mockStreamSourceProvider).createSource(
+      sqlContext,
+      checkpointLocation + "/sources/0",
+      None,
+      "org.apache.spark.sql.streaming.test",
+      Map.empty)
+
+    verify(LastOptions.mockStreamSourceProvider).createSource(
+      sqlContext,
+      checkpointLocation + "/sources/1",
+      None,
+      "org.apache.spark.sql.streaming.test",
+      Map.empty)
+  }
 }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
index 09daa7f81a..73d1b1b1d5 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
@@ -63,6 +63,7 @@ class FileStreamSourceTest extends StreamTest with SharedSQLContext {
       format: String,
       path: String,
       schema: Option[StructType] = None): FileStreamSource = {
+    val checkpointLocation = Utils.createTempDir(namePrefix = "streaming.metadata").getCanonicalPath
     val reader =
       if (schema.isDefined) {
         sqlContext.read.format(format).schema(schema.get)
@@ -72,7 +73,8 @@ class FileStreamSourceTest extends StreamTest with SharedSQLContext {
     reader.stream(path)
       .queryExecution.analyzed
       .collect { case StreamingRelation(dataSource, _, _) =>
-        dataSource.createSource().asInstanceOf[FileStreamSource]
+        // There is only one source in our tests so just set sourceId to 0
+        dataSource.createSource(s"$checkpointLocation/sources/0").asInstanceOf[FileStreamSource]
       }.head
   }
 
@@ -98,9 +100,8 @@ class FileStreamSourceSuite extends FileStreamSourceTest with SharedSQLContext {
     }
     df.queryExecution.analyzed
       .collect { case StreamingRelation(dataSource, _, _) =>
-        dataSource.createSource().asInstanceOf[FileStreamSource]
-      }.head
-      .schema
+        dataSource.sourceSchema()
+      }.head._2
   }
 
   test("FileStreamSource schema: no path") {
@@ -340,7 +341,6 @@ class FileStreamSourceSuite extends FileStreamSourceTest with SharedSQLContext {
     Utils.deleteRecursively(src)
     Utils.deleteRecursively(tmp)
   }
-
 }
 
 class FileStreamSourceStressTestSuite extends FileStreamSourceTest with SharedSQLContext {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/MemorySinkSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/MemorySinkSuite.scala
index 5249aa28dd..1f28340545 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/MemorySinkSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/MemorySinkSuite.scala
@@ -59,7 +59,7 @@ class MemorySinkSuite extends StreamTest with SharedSQLContext {
   }
 
   test("error if attempting to resume specific checkpoint") {
-    val location = Utils.createTempDir("steaming.checkpoint").getCanonicalPath
+    val location = Utils.createTempDir(namePrefix = "steaming.checkpoint").getCanonicalPath
 
     val input = MemoryStream[Int]
     val query = input.toDF().write
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
index e4ea555526..2bd27c7efd 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
@@ -115,8 +115,17 @@ class StreamSuite extends StreamTest with SharedSQLContext {
  */
 class FakeDefaultSource extends StreamSourceProvider {
 
+  private val fakeSchema = StructType(StructField("a", IntegerType) :: Nil)
+
+  override def sourceSchema(
+      sqlContext: SQLContext,
+      schema: Option[StructType],
+      providerName: String,
+      parameters: Map[String, String]): (String, StructType) = ("fakeSource", fakeSchema)
+
   override def createSource(
       sqlContext: SQLContext,
+      metadataPath: String,
       schema: Option[StructType],
       providerName: String,
       parameters: Map[String, String]): Source = {
--
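
Note (illustrative, not part of the patch): after this change a streaming provider implements two methods — `sourceSchema`, which resolves a name and schema without instantiating the source, and `createSource`, which now receives a per-source `metadataPath` under `$checkpointLocation/sources/$sourceId` where the source can keep its own metadata (FileStreamSource stores its offset log there). A minimal sketch of the updated `StreamSourceProvider` contract follows; `ExampleSourceProvider`, `exampleSchema`, and the returned source name are hypothetical, and the actual source construction is elided.

```scala
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.execution.streaming.Source
import org.apache.spark.sql.sources.StreamSourceProvider
import org.apache.spark.sql.types.{IntegerType, StructField, StructType}

// Hypothetical third-party provider, shown only to illustrate the new API shape.
class ExampleSourceProvider extends StreamSourceProvider {

  private val exampleSchema = StructType(StructField("value", IntegerType) :: Nil)

  // New: name/schema resolution no longer needs to create the source.
  override def sourceSchema(
      sqlContext: SQLContext,
      schema: Option[StructType],
      providerName: String,
      parameters: Map[String, String]): (String, StructType) =
    ("exampleSource", schema.getOrElse(exampleSchema))

  // New `metadataPath` argument: a directory reserved for this source under
  // "$checkpointLocation/sources/$sourceId", assigned by ContinuousQueryManager.
  override def createSource(
      sqlContext: SQLContext,
      metadataPath: String,
      schema: Option[StructType],
      providerName: String,
      parameters: Map[String, String]): Source = {
    // Elided: build the actual Source, persisting any bookkeeping (e.g. an
    // offset log) under `metadataPath` rather than next to the data path.
    ???
  }
}
```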