aboutsummaryrefslogtreecommitdiff
path: root/sql/core
diff options
context:
space:
mode:
authorYin Huai <yhuai@databricks.com>2015-01-13 13:01:27 -0800
committerMichael Armbrust <michael@databricks.com>2015-01-13 13:01:27 -0800
commit6463e0b9e8067cce70602c5c9006a2546856a9d6 (patch)
tree1cd9f4f118659296334ad3c2278dfb24eb1144ac /sql/core
parent8ead999fd627b12837fb2f082a0e76e9d121d269 (diff)
downloadspark-6463e0b9e8067cce70602c5c9006a2546856a9d6.tar.gz
spark-6463e0b9e8067cce70602c5c9006a2546856a9d6.tar.bz2
spark-6463e0b9e8067cce70602c5c9006a2546856a9d6.zip
[SPARK-4912][SQL] Persistent tables for the Spark SQL data sources api
With changes in this PR, users can persist metadata of tables created based on the data source API in metastore through DDLs. Author: Yin Huai <yhuai@databricks.com> Author: Michael Armbrust <michael@databricks.com> Closes #3960 from yhuai/persistantTablesWithSchema2 and squashes the following commits: 069c235 [Yin Huai] Make exception messages user friendly. c07cbc6 [Yin Huai] Get the location of test file in a correct way. 4456e98 [Yin Huai] Test data. 5315dfc [Yin Huai] rxin's comments. 7fc4b56 [Yin Huai] Add DDLStrategy and HiveDDLStrategy to plan DDLs based on the data source API. aeaf4b3 [Yin Huai] Add comments. 06f9b0c [Yin Huai] Revert unnecessary changes. feb88aa [Yin Huai] Merge remote-tracking branch 'apache/master' into persistantTablesWithSchema2 172db80 [Yin Huai] Fix unit test. 49bf1ac [Yin Huai] Unit tests. 8f8f1a1 [Yin Huai] [SPARK-4574][SQL] Adding support for defining schema in foreign DDL commands. #3431 f47fda1 [Yin Huai] Unit tests. 2b59723 [Michael Armbrust] Set external when creating tables c00bb1b [Michael Armbrust] Don't use reflection to read options 1ea6e7b [Michael Armbrust] Don't fail when trying to uncache a table that doesn't exist 6edc710 [Michael Armbrust] Add tests. d7da491 [Michael Armbrust] First draft of persistent tables.
Diffstat (limited to 'sql/core')
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala1
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala14
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala3
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala53
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala17
-rw-r--r--sql/core/src/test/scala/org/apache/spark/sql/sources/TableScanSuite.scala30
6 files changed, 94 insertions, 24 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
index 6c575dd727..e7021cc336 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
@@ -330,6 +330,7 @@ class SQLContext(@transient val sparkContext: SparkContext)
def strategies: Seq[Strategy] =
extraStrategies ++ (
DataSourceStrategy ::
+ DDLStrategy ::
TakeOrdered ::
HashAggregation ::
LeftSemiJoin ::
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
index 99b6611d3b..d91b1fbc69 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
@@ -17,6 +17,7 @@
package org.apache.spark.sql.execution
+import org.apache.spark.sql.sources.{CreateTempTableUsing, CreateTableUsing}
import org.apache.spark.sql.{SQLContext, Strategy, execution}
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.planning._
@@ -310,4 +311,17 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
case _ => Nil
}
}
+
+ object DDLStrategy extends Strategy {
+ def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
+ case CreateTableUsing(tableName, userSpecifiedSchema, provider, true, options) =>
+ ExecutedCommand(
+ CreateTempTableUsing(tableName, userSpecifiedSchema, provider, options)) :: Nil
+
+ case CreateTableUsing(tableName, userSpecifiedSchema, provider, false, options) =>
+ sys.error("Tables created with SQLContext must be TEMPORARY. Use a HiveContext instead.")
+
+ case _ => Nil
+ }
+ }
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala
index 0d765c4c92..df8e616151 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala
@@ -177,7 +177,6 @@ case class DescribeCommand(
override val output: Seq[Attribute]) extends RunnableCommand {
override def run(sqlContext: SQLContext) = {
- Row("# Registered as a temporary table", null, null) +:
- child.output.map(field => Row(field.name, field.dataType.toString, null))
+ child.output.map(field => Row(field.name, field.dataType.toString, null))
}
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala
index fe2c4d8436..f8741e0082 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala
@@ -92,21 +92,21 @@ private[sql] class DDLParser extends StandardTokenParsers with PackratParsers wi
protected lazy val ddl: Parser[LogicalPlan] = createTable
/**
- * `CREATE TEMPORARY TABLE avroTable
+ * `CREATE [TEMPORARY] TABLE avroTable
* USING org.apache.spark.sql.avro
* OPTIONS (path "../hive/src/test/resources/data/files/episodes.avro")`
* or
- * `CREATE TEMPORARY TABLE avroTable(intField int, stringField string...)
+ * `CREATE [TEMPORARY] TABLE avroTable(intField int, stringField string...)
* USING org.apache.spark.sql.avro
* OPTIONS (path "../hive/src/test/resources/data/files/episodes.avro")`
*/
protected lazy val createTable: Parser[LogicalPlan] =
(
- CREATE ~ TEMPORARY ~ TABLE ~> ident
+ (CREATE ~> TEMPORARY.? <~ TABLE) ~ ident
~ (tableCols).? ~ (USING ~> className) ~ (OPTIONS ~> options) ^^ {
- case tableName ~ columns ~ provider ~ opts =>
+ case temp ~ tableName ~ columns ~ provider ~ opts =>
val userSpecifiedSchema = columns.flatMap(fields => Some(StructType(fields)))
- CreateTableUsing(tableName, userSpecifiedSchema, provider, opts)
+ CreateTableUsing(tableName, userSpecifiedSchema, provider, temp.isDefined, opts)
}
)
@@ -175,13 +175,12 @@ private[sql] class DDLParser extends StandardTokenParsers with PackratParsers wi
primitiveType
}
-private[sql] case class CreateTableUsing(
- tableName: String,
- userSpecifiedSchema: Option[StructType],
- provider: String,
- options: Map[String, String]) extends RunnableCommand {
-
- def run(sqlContext: SQLContext) = {
+object ResolvedDataSource {
+ def apply(
+ sqlContext: SQLContext,
+ userSpecifiedSchema: Option[StructType],
+ provider: String,
+ options: Map[String, String]): ResolvedDataSource = {
val loader = Utils.getContextOrSparkClassLoader
val clazz: Class[_] = try loader.loadClass(provider) catch {
case cnf: java.lang.ClassNotFoundException =>
@@ -199,22 +198,44 @@ private[sql] case class CreateTableUsing(
.asInstanceOf[org.apache.spark.sql.sources.SchemaRelationProvider]
.createRelation(sqlContext, new CaseInsensitiveMap(options), schema)
case _ =>
- sys.error(s"${clazz.getCanonicalName} should extend SchemaRelationProvider.")
+ sys.error(s"${clazz.getCanonicalName} does not allow user-specified schemas.")
}
}
case None => {
clazz.newInstance match {
- case dataSource: org.apache.spark.sql.sources.RelationProvider =>
+ case dataSource: org.apache.spark.sql.sources.RelationProvider =>
dataSource
.asInstanceOf[org.apache.spark.sql.sources.RelationProvider]
.createRelation(sqlContext, new CaseInsensitiveMap(options))
case _ =>
- sys.error(s"${clazz.getCanonicalName} should extend RelationProvider.")
+ sys.error(s"A schema needs to be specified when using ${clazz.getCanonicalName}.")
}
}
}
- sqlContext.baseRelationToSchemaRDD(relation).registerTempTable(tableName)
+ new ResolvedDataSource(clazz, relation)
+ }
+}
+
+private[sql] case class ResolvedDataSource(provider: Class[_], relation: BaseRelation)
+
+private[sql] case class CreateTableUsing(
+ tableName: String,
+ userSpecifiedSchema: Option[StructType],
+ provider: String,
+ temporary: Boolean,
+ options: Map[String, String]) extends Command
+
+private [sql] case class CreateTempTableUsing(
+ tableName: String,
+ userSpecifiedSchema: Option[StructType],
+ provider: String,
+ options: Map[String, String]) extends RunnableCommand {
+
+ def run(sqlContext: SQLContext) = {
+ val resolved = ResolvedDataSource(sqlContext, userSpecifiedSchema, provider, options)
+
+ sqlContext.baseRelationToSchemaRDD(resolved.relation).registerTempTable(tableName)
Seq.empty
}
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
index 990f7e0e74..2a7be23e37 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
@@ -24,8 +24,8 @@ import org.apache.spark.sql.catalyst.expressions.{Expression, Attribute}
/**
* ::DeveloperApi::
* Implemented by objects that produce relations for a specific kind of data source. When
- * Spark SQL is given a DDL operation with a USING clause specified, this interface is used to
- * pass in the parameters specified by a user.
+ * Spark SQL is given a DDL operation with a USING clause specified (to specify the implemented
+ * RelationProvider), this interface is used to pass in the parameters specified by a user.
*
* Users may specify the fully qualified class name of a given data source. When that class is
* not found Spark SQL will append the class name `DefaultSource` to the path, allowing for
@@ -46,10 +46,10 @@ trait RelationProvider {
/**
* ::DeveloperApi::
- * Implemented by objects that produce relations for a specific kind of data source. When
- * Spark SQL is given a DDL operation with
- * 1. USING clause: to specify the implemented SchemaRelationProvider
- * 2. User defined schema: users can define schema optionally when create table
+ * Implemented by objects that produce relations for a specific kind of data source
+ * with a given schema. When Spark SQL is given a DDL operation with a USING clause specified (
+ * to specify the implemented SchemaRelationProvider) and a user defined schema, this interface
+ * is used to pass in the parameters specified by a user.
*
* Users may specify the fully qualified class name of a given data source. When that class is
* not found Spark SQL will append the class name `DefaultSource` to the path, allowing for
@@ -57,6 +57,11 @@ trait RelationProvider {
* data source 'org.apache.spark.sql.json.DefaultSource'
*
* A new instance of this class with be instantiated each time a DDL call is made.
+ *
+ * The difference between a [[RelationProvider]] and a [[SchemaRelationProvider]] is that
+ * users need to provide a schema when using a SchemaRelationProvider.
+ * A relation provider can inherits both [[RelationProvider]] and [[SchemaRelationProvider]]
+ * if it can support both schema inference and user-specified schemas.
*/
@DeveloperApi
trait SchemaRelationProvider {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/TableScanSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/TableScanSuite.scala
index 605190f5ae..a1d2468b25 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/TableScanSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/TableScanSuite.scala
@@ -314,4 +314,34 @@ class TableScanSuite extends DataSourceTest {
sql("SELECT * FROM oneToTenDef"),
(1 to 10).map(Row(_)).toSeq)
}
+
+ test("exceptions") {
+ // Make sure we do throw correct exception when users use a relation provider that
+ // only implements the RelationProvier or the SchemaRelationProvider.
+ val schemaNotAllowed = intercept[Exception] {
+ sql(
+ """
+ |CREATE TEMPORARY TABLE relationProvierWithSchema (i int)
+ |USING org.apache.spark.sql.sources.SimpleScanSource
+ |OPTIONS (
+ | From '1',
+ | To '10'
+ |)
+ """.stripMargin)
+ }
+ assert(schemaNotAllowed.getMessage.contains("does not allow user-specified schemas"))
+
+ val schemaNeeded = intercept[Exception] {
+ sql(
+ """
+ |CREATE TEMPORARY TABLE schemaRelationProvierWithoutSchema
+ |USING org.apache.spark.sql.sources.AllDataTypesScanSource
+ |OPTIONS (
+ | From '1',
+ | To '10'
+ |)
+ """.stripMargin)
+ }
+ assert(schemaNeeded.getMessage.contains("A schema needs to be specified when using"))
+ }
}