| author | Michael Armbrust <michael@databricks.com> | 2014-11-02 15:08:35 -0800 |
|---|---|---|
| committer | Michael Armbrust <michael@databricks.com> | 2014-11-02 15:08:35 -0800 |
| commit | 9c0eb57c737dd7d97d2cbd4516ddd2cf5d06e4b2 (patch) | |
| tree | 285005822bdff25308e471738bda5e5e157cd9b8 /sql/catalyst | |
| parent | f0a4b630abf0766cc0c41e682691e0d435caca04 (diff) | |
[SPARK-3247][SQL] An API for adding data sources to Spark SQL
This PR introduces a new set of APIs to Spark SQL to allow other developers to add support for reading data from new sources in `org.apache.spark.sql.sources`.
New sources must implement the interface `BaseRelation`, which is responsible for describing the schema of the data. `BaseRelation`s have three `Scan` subclasses, which are responsible for producing an RDD containing row objects. The [various Scan interfaces](https://github.com/marmbrus/spark/blob/foreign/sql/core/src/main/scala/org/apache/spark/sql/sources/package.scala#L50) allow for optimizations such as column pruning and filter pushdown when the underlying data source can handle these operations.
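For a concrete sense of the shape of the API, here is a minimal sketch of a full table scan, modeled on the test cases linked below; the `SimpleScan` relation and its `from`/`to` options are invented for illustration, and the imports assume the type aliases exposed by the `org.apache.spark.sql` package object at this point in time:

```scala
import org.apache.spark.rdd.RDD
import org.apache.spark.sql._
import org.apache.spark.sql.sources._

// The provider is what a USING clause names; OPTIONS arrive as a string map.
class SimpleScanSource extends RelationProvider {
  override def createRelation(
      sqlContext: SQLContext,
      parameters: Map[String, String]): BaseRelation =
    SimpleScan(parameters("from").toInt, parameters("to").toInt)(sqlContext)
}

// A TableScan relation describes its schema and produces an RDD of Rows.
case class SimpleScan(from: Int, to: Int)(@transient val sqlContext: SQLContext)
  extends TableScan {

  // A single non-nullable integer column named "i".
  override def schema =
    StructType(StructField("i", IntegerType, nullable = false) :: Nil)

  // Produce the rows; a real source would read from external storage instead.
  override def buildScan(): RDD[Row] =
    sqlContext.sparkContext.parallelize(from to to).map(Row(_))
}
```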
By implementing a class that inherits from `RelationProvider`, these data sources can be accessed using pure SQL. I've used this functionality to update the JSON support so it can now be used as follows:
```sql
CREATE TEMPORARY TABLE jsonTableSQL
USING org.apache.spark.sql.json
OPTIONS (
  path '/home/michael/data.json'
)
```
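Once registered, the temporary table can be queried through the normal `sql` entry point. A minimal sketch, assuming an existing `SQLContext` named `sqlContext`:

```scala
// Run the DDL above, then query the registered temporary table as usual.
sqlContext.sql("""
  |CREATE TEMPORARY TABLE jsonTableSQL
  |USING org.apache.spark.sql.json
  |OPTIONS (
  |  path '/home/michael/data.json'
  |)""".stripMargin)

sqlContext.sql("SELECT * FROM jsonTableSQL").collect().foreach(println)
```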
Further example usage can be found in the test cases: https://github.com/marmbrus/spark/tree/foreign/sql/core/src/test/scala/org/apache/spark/sql/sources
There is also a library that uses this new API to read Avro data, available here:
https://github.com/marmbrus/sql-avro
Author: Michael Armbrust <michael@databricks.com>
Closes #2475 from marmbrus/foreign and squashes the following commits:
1ed6010 [Michael Armbrust] Merge remote-tracking branch 'origin/master' into foreign
ab2c31f [Michael Armbrust] fix test
1d41bb5 [Michael Armbrust] unify argument names
5b47901 [Michael Armbrust] Remove sealed, more filter types
fab154a [Michael Armbrust] Merge remote-tracking branch 'origin/master' into foreign
e3e690e [Michael Armbrust] Add hook for extraStrategies
a70d602 [Michael Armbrust] Fix style, more tests, FilteredSuite => PrunedFilteredSuite
70da6d9 [Michael Armbrust] Modify API to ease binary compatibility and interop with Java
7d948ae [Michael Armbrust] Fix equality of AttributeReference.
5545491 [Michael Armbrust] Address comments
5031ac3 [Michael Armbrust] Merge remote-tracking branch 'origin/master' into foreign
22963ef [Michael Armbrust] package objects compile weirdly...
b069146 [Michael Armbrust] traits => abstract classes
34f836a [Michael Armbrust] Make @DeveloperApi
0d74bcf [Michael Armbrust] Add documentation on object life cycle
3e06776 [Michael Armbrust] remove line wraps
de3b68c [Michael Armbrust] Remove empty file
360cb30 [Michael Armbrust] style and java api
2957875 [Michael Armbrust] add override
0fd3a07 [Michael Armbrust] Draft of data sources API
Diffstat (limited to 'sql/catalyst')
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala | 2 +-
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/package.scala | 4 ++++
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/QueryPlanner.scala | 20 ++++++++++----------
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/types/dataTypes.scala | 2 +-
4 files changed, 16 insertions(+), 12 deletions(-)
```diff
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala
index 3310566087..fc90a54a58 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala
@@ -134,7 +134,7 @@ case class AttributeReference(
     val qualifiers: Seq[String] = Nil) extends Attribute with trees.LeafNode[Expression] {
 
   override def equals(other: Any) = other match {
-    case ar: AttributeReference => exprId == ar.exprId && dataType == ar.dataType
+    case ar: AttributeReference => name == ar.name && exprId == ar.exprId && dataType == ar.dataType
     case _ => false
   }
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/package.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/package.scala
index bdd07bbeb2..a38079ced3 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/package.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/package.scala
@@ -17,6 +17,10 @@
 
 package org.apache.spark.sql
 
+/**
+ * Catalyst is a library for manipulating relational query plans. All classes in catalyst are
+ * considered an internal API to Spark SQL and are subject to change between minor releases.
+ */
 package object catalyst {
   /**
    * A JVM-global lock that should be used to prevent thread safety issues when using things in
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/QueryPlanner.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/QueryPlanner.scala
index 5839c9f7c4..51b5699aff 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/QueryPlanner.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/QueryPlanner.scala
@@ -22,6 +22,15 @@ import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.catalyst.trees.TreeNode
 
 /**
+ * Given a [[plans.logical.LogicalPlan LogicalPlan]], returns a list of `PhysicalPlan`s that can
+ * be used for execution. If this strategy does not apply to the give logical operation then an
+ * empty list should be returned.
+ */
+abstract class GenericStrategy[PhysicalPlan <: TreeNode[PhysicalPlan]] extends Logging {
+  def apply(plan: LogicalPlan): Seq[PhysicalPlan]
+}
+
+/**
  * Abstract class for transforming [[plans.logical.LogicalPlan LogicalPlan]]s into physical plans.
  * Child classes are responsible for specifying a list of [[Strategy]] objects that each of which
  * can return a list of possible physical plan options. If a given strategy is unable to plan all
@@ -35,16 +44,7 @@ import org.apache.spark.sql.catalyst.trees.TreeNode
  */
 abstract class QueryPlanner[PhysicalPlan <: TreeNode[PhysicalPlan]] {
   /** A list of execution strategies that can be used by the planner */
-  def strategies: Seq[Strategy]
-
-  /**
-   * Given a [[plans.logical.LogicalPlan LogicalPlan]], returns a list of `PhysicalPlan`s that can
-   * be used for execution. If this strategy does not apply to the give logical operation then an
-   * empty list should be returned.
-   */
-  abstract protected class Strategy extends Logging {
-    def apply(plan: LogicalPlan): Seq[PhysicalPlan]
-  }
+  def strategies: Seq[GenericStrategy[PhysicalPlan]]
 
   /**
    * Returns a placeholder for a physical plan that executes `plan`. This placeholder will be
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/types/dataTypes.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/types/dataTypes.scala
index 8dda0b1828..d25f3a619d 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/types/dataTypes.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/types/dataTypes.scala
@@ -455,7 +455,7 @@ case class ArrayType(elementType: DataType, containsNull: Boolean) extends DataT
 case class StructField(
     name: String,
     dataType: DataType,
-    nullable: Boolean,
+    nullable: Boolean = true,
     metadata: Metadata = Metadata.empty) {
 
   private[sql] def buildFormattedString(prefix: String, builder: StringBuilder): Unit = {
```
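One consequence of the `dataTypes.scala` change above: `StructField.nullable` now defaults to `true`, so schemas only need to spell it out for non-nullable columns. A small illustration (the column names are invented, and the imports assume the `org.apache.spark.sql` type aliases of this era):

```scala
import org.apache.spark.sql._

// nullable defaults to true, so it is only written out when a field
// must be non-nullable.
val schema = StructType(
  StructField("name", StringType) ::
  StructField("age", IntegerType, nullable = false) :: Nil)
```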