author     Shixiong Zhu <shixiong@databricks.com>  2016-04-12 10:46:28 -0700
committer  Michael Armbrust <michael@databricks.com>  2016-04-12 10:46:28 -0700
commit     6bf692147c21dd74e91e2bd95845f11ef0a303e6 (patch)
tree       fb7cf4acded554c3a57807296b011120f54667f6 /sql/core/src
parent     da60b34d2f6eba19633e4f1b46504ce92cd6c179 (diff)
download   spark-6bf692147c21dd74e91e2bd95845f11ef0a303e6.tar.gz
           spark-6bf692147c21dd74e91e2bd95845f11ef0a303e6.tar.bz2
           spark-6bf692147c21dd74e91e2bd95845f11ef0a303e6.zip
[SPARK-14474][SQL] Move FileSource offset log into checkpointLocation
## What changes were proposed in this pull request?

Now that we have a single location for storing checkpointed state, this PR propagates the checkpoint location into FileStreamSource so that we don't have one random log off on its own.

## How was this patch tested?

test("metadataPath should be in checkpointLocation")

Author: Shixiong Zhu <shixiong@databricks.com>

Closes #12247 from zsxwing/file-source-log-location.
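For context, a minimal usage sketch (not part of the patch; the paths and the "memory" sink are assumptions) showing where the file source's log ends up once the checkpoint location is propagated:

```scala
// Minimal sketch, assuming the Spark 2.0-era streaming reader/writer API this
// patch touches; paths, query name and the "memory" sink are illustrative only.
import org.apache.spark.sql.SQLContext

def example(sqlContext: SQLContext): Unit = {
  val df = sqlContext.read
    .format("text")
    .stream("/data/input")                 // backed by FileStreamSource

  val query = df.write
    .format("memory")
    .queryName("myQuery")
    .option("checkpointLocation", "/checkpoints/myQuery")
    .startStream()

  // After this patch the file source keeps its offset log under
  //   /checkpoints/myQuery/sources/0
  // instead of a standalone <path>/_metadata directory of its own.
  query.stop()
}
```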
Diffstat (limited to 'sql/core/src')
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/ContinuousQueryManager.scala                 |  5
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala       | 62
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingRelation.scala  |  4
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala                     |  9
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/streaming/DataFrameReaderWriterSuite.scala   | 73
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala        | 10
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/streaming/MemorySinkSuite.scala              |  2
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala                  |  9
8 files changed, 141 insertions(+), 33 deletions(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/ContinuousQueryManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/ContinuousQueryManager.scala
index d7f71bd4b0..1343e81569 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/ContinuousQueryManager.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/ContinuousQueryManager.scala
@@ -178,10 +178,13 @@ class ContinuousQueryManager(sqlContext: SQLContext) {
throw new IllegalArgumentException(
s"Cannot start query with name $name as a query with that name is already active")
}
+ var nextSourceId = 0L
val logicalPlan = df.logicalPlan.transform {
case StreamingRelation(dataSource, _, output) =>
// Materialize source to avoid creating it in every batch
- val source = dataSource.createSource()
+ val metadataPath = s"$checkpointLocation/sources/$nextSourceId"
+ val source = dataSource.createSource(metadataPath)
+ nextSourceId += 1
// We still need to use the previous `output` instead of `source.schema` as attributes in
// "df.logicalPlan" has already used attributes of the previous `output`.
StreamingExecutionRelation(source, output)
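Not from the patch: a tiny sketch of the path convention the transform above establishes. Sources are numbered in the order their streaming relations appear in the logical plan, so a query with two sources gets two distinct offset logs, as the new test in DataFrameReaderWriterSuite verifies.

```scala
// Illustration only: the per-source metadata path convention used above.
def sourceMetadataPaths(checkpointLocation: String, numSources: Int): Seq[String] =
  (0 until numSources).map(id => s"$checkpointLocation/sources/$id")

// sourceMetadataPaths("/checkpoints/q1", 2)
//   == Seq("/checkpoints/q1/sources/0", "/checkpoints/q1/sources/1")
```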
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
index f55cedb1b6..10fde152ab 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
@@ -123,36 +123,58 @@ case class DataSource(
}
}
- /** Returns a source that can be used to continually read data. */
- def createSource(): Source = {
+ private def inferFileFormatSchema(format: FileFormat): StructType = {
+ val caseInsensitiveOptions = new CaseInsensitiveMap(options)
+ val allPaths = caseInsensitiveOptions.get("path")
+ val globbedPaths = allPaths.toSeq.flatMap { path =>
+ val hdfsPath = new Path(path)
+ val fs = hdfsPath.getFileSystem(sqlContext.sparkContext.hadoopConfiguration)
+ val qualified = hdfsPath.makeQualified(fs.getUri, fs.getWorkingDirectory)
+ SparkHadoopUtil.get.globPathIfNecessary(qualified)
+ }.toArray
+
+ val fileCatalog: FileCatalog = new HDFSFileCatalog(sqlContext, options, globbedPaths, None)
+ userSpecifiedSchema.orElse {
+ format.inferSchema(
+ sqlContext,
+ caseInsensitiveOptions,
+ fileCatalog.allFiles())
+ }.getOrElse {
+ throw new AnalysisException("Unable to infer schema. It must be specified manually.")
+ }
+ }
+
+ /** Returns the name and schema of the source that can be used to continually read data. */
+ def sourceSchema(): (String, StructType) = {
providingClass.newInstance() match {
case s: StreamSourceProvider =>
- s.createSource(sqlContext, userSpecifiedSchema, className, options)
+ s.sourceSchema(sqlContext, userSpecifiedSchema, className, options)
case format: FileFormat =>
val caseInsensitiveOptions = new CaseInsensitiveMap(options)
val path = caseInsensitiveOptions.getOrElse("path", {
throw new IllegalArgumentException("'path' is not specified")
})
- val metadataPath = caseInsensitiveOptions.getOrElse("metadataPath", s"$path/_metadata")
+ (s"FileSource[$path]", inferFileFormatSchema(format))
+ case _ =>
+ throw new UnsupportedOperationException(
+ s"Data source $className does not support streamed reading")
+ }
+ }
- val allPaths = caseInsensitiveOptions.get("path")
- val globbedPaths = allPaths.toSeq.flatMap { path =>
- val hdfsPath = new Path(path)
- val fs = hdfsPath.getFileSystem(sqlContext.sparkContext.hadoopConfiguration)
- val qualified = hdfsPath.makeQualified(fs.getUri, fs.getWorkingDirectory)
- SparkHadoopUtil.get.globPathIfNecessary(qualified)
- }.toArray
+ /** Returns a source that can be used to continually read data. */
+ def createSource(metadataPath: String): Source = {
+ providingClass.newInstance() match {
+ case s: StreamSourceProvider =>
+ s.createSource(sqlContext, metadataPath, userSpecifiedSchema, className, options)
- val fileCatalog: FileCatalog = new HDFSFileCatalog(sqlContext, options, globbedPaths, None)
- val dataSchema = userSpecifiedSchema.orElse {
- format.inferSchema(
- sqlContext,
- caseInsensitiveOptions,
- fileCatalog.allFiles())
- }.getOrElse {
- throw new AnalysisException("Unable to infer schema. It must be specified manually.")
- }
+ case format: FileFormat =>
+ val caseInsensitiveOptions = new CaseInsensitiveMap(options)
+ val path = caseInsensitiveOptions.getOrElse("path", {
+ throw new IllegalArgumentException("'path' is not specified")
+ })
+
+ val dataSchema = inferFileFormatSchema(format)
def dataFrameBuilder(files: Array[String]): DataFrame = {
Dataset.ofRows(
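A sketch of caller code (not from the diff; `planThenMaterialize` is a hypothetical helper) showing the two-step flow that results from splitting the old `createSource()` into `sourceSchema()` and `createSource(metadataPath)`:

```scala
// Assumes the DataSource API as modified above.
import org.apache.spark.sql.execution.datasources.DataSource
import org.apache.spark.sql.execution.streaming.Source

def planThenMaterialize(dataSource: DataSource, checkpointLocation: String): Source = {
  // Analysis time: only the name and schema are needed; no Source is created,
  // which is what StreamingRelation.apply now relies on.
  val (name, schema) = dataSource.sourceSchema()
  println(s"planning streaming relation $name with schema $schema")

  // Query start: ContinuousQueryManager materializes the source, passing a
  // per-source path under the query's checkpoint location.
  dataSource.createSource(s"$checkpointLocation/sources/0")
}
```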
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingRelation.scala
index f951dea735..d2872e49ce 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingRelation.scala
@@ -23,8 +23,8 @@ import org.apache.spark.sql.execution.datasources.DataSource
object StreamingRelation {
def apply(dataSource: DataSource): StreamingRelation = {
- val source = dataSource.createSource()
- StreamingRelation(dataSource, source.toString, source.schema.toAttributes)
+ val (name, schema) = dataSource.sourceSchema()
+ StreamingRelation(dataSource, name, schema.toAttributes)
}
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
index 65b1f61349..bea243a3be 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
@@ -129,8 +129,17 @@ trait SchemaRelationProvider {
* Implemented by objects that can produce a streaming [[Source]] for a specific format or system.
*/
trait StreamSourceProvider {
+
+ /** Returns the name and schema of the source that can be used to continually read data. */
+ def sourceSchema(
+ sqlContext: SQLContext,
+ schema: Option[StructType],
+ providerName: String,
+ parameters: Map[String, String]): (String, StructType)
+
def createSource(
sqlContext: SQLContext,
+ metadataPath: String,
schema: Option[StructType],
providerName: String,
parameters: Map[String, String]): Source
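Illustrative sketch (not part of this patch) of a third-party provider written against the extended trait. `ExampleProvider` and its schema are hypothetical; the point is that `createSource` now receives a metadataPath of the form `<checkpointLocation>/sources/<id>` under which a real source would keep its offset log, instead of choosing a location of its own.

```scala
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.execution.streaming.Source
import org.apache.spark.sql.sources.StreamSourceProvider
import org.apache.spark.sql.types.{StringType, StructField, StructType}

class ExampleProvider extends StreamSourceProvider {
  private val defaultSchema = StructType(StructField("value", StringType) :: Nil)

  // Called at analysis time: cheap, no Source is instantiated.
  override def sourceSchema(
      sqlContext: SQLContext,
      schema: Option[StructType],
      providerName: String,
      parameters: Map[String, String]): (String, StructType) =
    ("example", schema.getOrElse(defaultSchema))

  // Called once per query start, with a per-source path under the checkpoint.
  override def createSource(
      sqlContext: SQLContext,
      metadataPath: String,
      schema: Option[StructType],
      providerName: String,
      parameters: Map[String, String]): Source = {
    // A real implementation would persist its progress under metadataPath.
    ??? // construct and return the Source here
  }
}
```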
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/DataFrameReaderWriterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/DataFrameReaderWriterSuite.scala
index 28c558208f..00efe21d39 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/DataFrameReaderWriterSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/DataFrameReaderWriterSuite.scala
@@ -21,6 +21,7 @@ import java.util.concurrent.TimeUnit
import scala.concurrent.duration._
+import org.mockito.Mockito._
import org.scalatest.BeforeAndAfter
import org.apache.spark.sql._
@@ -31,22 +32,50 @@ import org.apache.spark.sql.types.{IntegerType, StructField, StructType}
import org.apache.spark.util.Utils
object LastOptions {
+
+ var mockStreamSourceProvider = mock(classOf[StreamSourceProvider])
+ var mockStreamSinkProvider = mock(classOf[StreamSinkProvider])
var parameters: Map[String, String] = null
var schema: Option[StructType] = null
var partitionColumns: Seq[String] = Nil
+
+ def clear(): Unit = {
+ parameters = null
+ schema = null
+ partitionColumns = null
+ reset(mockStreamSourceProvider)
+ reset(mockStreamSinkProvider)
+ }
}
/** Dummy provider: returns no-op source/sink and records options in [[LastOptions]]. */
class DefaultSource extends StreamSourceProvider with StreamSinkProvider {
+
+ private val fakeSchema = StructType(StructField("a", IntegerType) :: Nil)
+
+ override def sourceSchema(
+ sqlContext: SQLContext,
+ schema: Option[StructType],
+ providerName: String,
+ parameters: Map[String, String]): (String, StructType) = {
+ LastOptions.parameters = parameters
+ LastOptions.schema = schema
+ LastOptions.mockStreamSourceProvider.sourceSchema(sqlContext, schema, providerName, parameters)
+ ("dummySource", fakeSchema)
+ }
+
override def createSource(
sqlContext: SQLContext,
+ metadataPath: String,
schema: Option[StructType],
providerName: String,
parameters: Map[String, String]): Source = {
LastOptions.parameters = parameters
LastOptions.schema = schema
+ LastOptions.mockStreamSourceProvider.createSource(
+ sqlContext, metadataPath, schema, providerName, parameters)
new Source {
- override def schema: StructType = StructType(StructField("a", IntegerType) :: Nil)
+ override def schema: StructType = fakeSchema
override def getOffset: Option[Offset] = Some(new LongOffset(0))
@@ -64,6 +93,7 @@ class DefaultSource extends StreamSourceProvider with StreamSinkProvider {
partitionColumns: Seq[String]): Sink = {
LastOptions.parameters = parameters
LastOptions.partitionColumns = partitionColumns
+ LastOptions.mockStreamSinkProvider.createSink(sqlContext, parameters, partitionColumns)
new Sink {
override def addBatch(batchId: Long, data: DataFrame): Unit = {}
}
@@ -117,7 +147,7 @@ class DataFrameReaderWriterSuite extends StreamTest with SharedSQLContext with B
assert(LastOptions.parameters("opt2") == "2")
assert(LastOptions.parameters("opt3") == "3")
- LastOptions.parameters = null
+ LastOptions.clear()
df.write
.format("org.apache.spark.sql.streaming.test")
@@ -181,7 +211,7 @@ class DataFrameReaderWriterSuite extends StreamTest with SharedSQLContext with B
assert(LastOptions.parameters("path") == "/test")
- LastOptions.parameters = null
+ LastOptions.clear()
df.write
.format("org.apache.spark.sql.streaming.test")
@@ -204,7 +234,7 @@ class DataFrameReaderWriterSuite extends StreamTest with SharedSQLContext with B
assert(LastOptions.parameters("boolOpt") == "false")
assert(LastOptions.parameters("doubleOpt") == "6.7")
- LastOptions.parameters = null
+ LastOptions.clear()
df.write
.format("org.apache.spark.sql.streaming.test")
.option("intOpt", 56)
@@ -303,4 +333,39 @@ class DataFrameReaderWriterSuite extends StreamTest with SharedSQLContext with B
assert(q.asInstanceOf[StreamExecution].trigger == ProcessingTime(100000))
}
+
+ test("source metadataPath") {
+ LastOptions.clear()
+
+ val checkpointLocation = newMetadataDir
+
+ val df1 = sqlContext.read
+ .format("org.apache.spark.sql.streaming.test")
+ .stream()
+
+ val df2 = sqlContext.read
+ .format("org.apache.spark.sql.streaming.test")
+ .stream()
+
+ val q = df1.union(df2).write
+ .format("org.apache.spark.sql.streaming.test")
+ .option("checkpointLocation", checkpointLocation)
+ .trigger(ProcessingTime(10.seconds))
+ .startStream()
+ q.stop()
+
+ verify(LastOptions.mockStreamSourceProvider).createSource(
+ sqlContext,
+ checkpointLocation + "/sources/0",
+ None,
+ "org.apache.spark.sql.streaming.test",
+ Map.empty)
+
+ verify(LastOptions.mockStreamSourceProvider).createSource(
+ sqlContext,
+ checkpointLocation + "/sources/1",
+ None,
+ "org.apache.spark.sql.streaming.test",
+ Map.empty)
+ }
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
index 09daa7f81a..73d1b1b1d5 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
@@ -63,6 +63,7 @@ class FileStreamSourceTest extends StreamTest with SharedSQLContext {
format: String,
path: String,
schema: Option[StructType] = None): FileStreamSource = {
+ val checkpointLocation = Utils.createTempDir(namePrefix = "streaming.metadata").getCanonicalPath
val reader =
if (schema.isDefined) {
sqlContext.read.format(format).schema(schema.get)
@@ -72,7 +73,8 @@ class FileStreamSourceTest extends StreamTest with SharedSQLContext {
reader.stream(path)
.queryExecution.analyzed
.collect { case StreamingRelation(dataSource, _, _) =>
- dataSource.createSource().asInstanceOf[FileStreamSource]
+ // There is only one source in our tests so just set sourceId to 0
+ dataSource.createSource(s"$checkpointLocation/sources/0").asInstanceOf[FileStreamSource]
}.head
}
@@ -98,9 +100,8 @@ class FileStreamSourceSuite extends FileStreamSourceTest with SharedSQLContext {
}
df.queryExecution.analyzed
.collect { case StreamingRelation(dataSource, _, _) =>
- dataSource.createSource().asInstanceOf[FileStreamSource]
- }.head
- .schema
+ dataSource.sourceSchema()
+ }.head._2
}
test("FileStreamSource schema: no path") {
@@ -340,7 +341,6 @@ class FileStreamSourceSuite extends FileStreamSourceTest with SharedSQLContext {
Utils.deleteRecursively(src)
Utils.deleteRecursively(tmp)
}
-
}
class FileStreamSourceStressTestSuite extends FileStreamSourceTest with SharedSQLContext {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/MemorySinkSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/MemorySinkSuite.scala
index 5249aa28dd..1f28340545 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/MemorySinkSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/MemorySinkSuite.scala
@@ -59,7 +59,7 @@ class MemorySinkSuite extends StreamTest with SharedSQLContext {
}
test("error if attempting to resume specific checkpoint") {
- val location = Utils.createTempDir("steaming.checkpoint").getCanonicalPath
+ val location = Utils.createTempDir(namePrefix = "steaming.checkpoint").getCanonicalPath
val input = MemoryStream[Int]
val query = input.toDF().write
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
index e4ea555526..2bd27c7efd 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
@@ -115,8 +115,17 @@ class StreamSuite extends StreamTest with SharedSQLContext {
*/
class FakeDefaultSource extends StreamSourceProvider {
+ private val fakeSchema = StructType(StructField("a", IntegerType) :: Nil)
+
+ override def sourceSchema(
+ sqlContext: SQLContext,
+ schema: Option[StructType],
+ providerName: String,
+ parameters: Map[String, String]): (String, StructType) = ("fakeSource", fakeSchema)
+
override def createSource(
sqlContext: SQLContext,
+ metadataPath: String,
schema: Option[StructType],
providerName: String,
parameters: Map[String, String]): Source = {