aboutsummaryrefslogtreecommitdiff
path: root/sql/catalyst
diff options
context:
space:
mode:
authorMichael Armbrust <michael@databricks.com>2014-11-02 15:08:35 -0800
committerMichael Armbrust <michael@databricks.com>2014-11-02 15:08:35 -0800
commit9c0eb57c737dd7d97d2cbd4516ddd2cf5d06e4b2 (patch)
tree285005822bdff25308e471738bda5e5e157cd9b8 /sql/catalyst
parentf0a4b630abf0766cc0c41e682691e0d435caca04 (diff)
downloadspark-9c0eb57c737dd7d97d2cbd4516ddd2cf5d06e4b2.tar.gz
spark-9c0eb57c737dd7d97d2cbd4516ddd2cf5d06e4b2.tar.bz2
spark-9c0eb57c737dd7d97d2cbd4516ddd2cf5d06e4b2.zip
[SPARK-3247][SQL] An API for adding data sources to Spark SQL
This PR introduces a new set of APIs to Spark SQL to allow other developers to add support for reading data from new sources in `org.apache.spark.sql.sources`. New sources must implement the interface `BaseRelation`, which is responsible for describing the schema of the data. BaseRelations have three `Scan` subclasses, which are responsible for producing an RDD containing row objects. The [various Scan interfaces](https://github.com/marmbrus/spark/blob/foreign/sql/core/src/main/scala/org/apache/spark/sql/sources/package.scala#L50) allow for optimizations such as column pruning and filter push down, when the underlying data source can handle these operations. By implementing a class that inherits from RelationProvider these data sources can be accessed using using pure SQL. I've used the functionality to update the JSON support so it can now be used in this way as follows: ```sql CREATE TEMPORARY TABLE jsonTableSQL USING org.apache.spark.sql.json OPTIONS ( path '/home/michael/data.json' ) ``` Further example usage can be found in the test cases: https://github.com/marmbrus/spark/tree/foreign/sql/core/src/test/scala/org/apache/spark/sql/sources There is also a library that uses this new API to read avro data available here: https://github.com/marmbrus/sql-avro Author: Michael Armbrust <michael@databricks.com> Closes #2475 from marmbrus/foreign and squashes the following commits: 1ed6010 [Michael Armbrust] Merge remote-tracking branch 'origin/master' into foreign ab2c31f [Michael Armbrust] fix test 1d41bb5 [Michael Armbrust] unify argument names 5b47901 [Michael Armbrust] Remove sealed, more filter types fab154a [Michael Armbrust] Merge remote-tracking branch 'origin/master' into foreign e3e690e [Michael Armbrust] Add hook for extraStrategies a70d602 [Michael Armbrust] Fix style, more tests, FilteredSuite => PrunedFilteredSuite 70da6d9 [Michael Armbrust] Modify API to ease binary compatibility and interop with Java 7d948ae [Michael Armbrust] Fix equality of AttributeReference. 5545491 [Michael Armbrust] Address comments 5031ac3 [Michael Armbrust] Merge remote-tracking branch 'origin/master' into foreign 22963ef [Michael Armbrust] package objects compile wierdly... b069146 [Michael Armbrust] traits => abstract classes 34f836a [Michael Armbrust] Make @DeveloperApi 0d74bcf [Michael Armbrust] Add documention on object life cycle 3e06776 [Michael Armbrust] remove line wraps de3b68c [Michael Armbrust] Remove empty file 360cb30 [Michael Armbrust] style and java api 2957875 [Michael Armbrust] add override 0fd3a07 [Michael Armbrust] Draft of data sources API
Diffstat (limited to 'sql/catalyst')
-rw-r--r--sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala2
-rw-r--r--sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/package.scala4
-rw-r--r--sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/QueryPlanner.scala20
-rw-r--r--sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/types/dataTypes.scala2
4 files changed, 16 insertions, 12 deletions
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala
index 3310566087..fc90a54a58 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala
@@ -134,7 +134,7 @@ case class AttributeReference(
val qualifiers: Seq[String] = Nil) extends Attribute with trees.LeafNode[Expression] {
override def equals(other: Any) = other match {
- case ar: AttributeReference => exprId == ar.exprId && dataType == ar.dataType
+ case ar: AttributeReference => name == ar.name && exprId == ar.exprId && dataType == ar.dataType
case _ => false
}
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/package.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/package.scala
index bdd07bbeb2..a38079ced3 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/package.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/package.scala
@@ -17,6 +17,10 @@
package org.apache.spark.sql
+/**
+ * Catalyst is a library for manipulating relational query plans. All classes in catalyst are
+ * considered an internal API to Spark SQL and are subject to change between minor releases.
+ */
package object catalyst {
/**
* A JVM-global lock that should be used to prevent thread safety issues when using things in
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/QueryPlanner.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/QueryPlanner.scala
index 5839c9f7c4..51b5699aff 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/QueryPlanner.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/QueryPlanner.scala
@@ -22,6 +22,15 @@ import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.trees.TreeNode
/**
+ * Given a [[plans.logical.LogicalPlan LogicalPlan]], returns a list of `PhysicalPlan`s that can
+ * be used for execution. If this strategy does not apply to the give logical operation then an
+ * empty list should be returned.
+ */
+abstract class GenericStrategy[PhysicalPlan <: TreeNode[PhysicalPlan]] extends Logging {
+ def apply(plan: LogicalPlan): Seq[PhysicalPlan]
+}
+
+/**
* Abstract class for transforming [[plans.logical.LogicalPlan LogicalPlan]]s into physical plans.
* Child classes are responsible for specifying a list of [[Strategy]] objects that each of which
* can return a list of possible physical plan options. If a given strategy is unable to plan all
@@ -35,16 +44,7 @@ import org.apache.spark.sql.catalyst.trees.TreeNode
*/
abstract class QueryPlanner[PhysicalPlan <: TreeNode[PhysicalPlan]] {
/** A list of execution strategies that can be used by the planner */
- def strategies: Seq[Strategy]
-
- /**
- * Given a [[plans.logical.LogicalPlan LogicalPlan]], returns a list of `PhysicalPlan`s that can
- * be used for execution. If this strategy does not apply to the give logical operation then an
- * empty list should be returned.
- */
- abstract protected class Strategy extends Logging {
- def apply(plan: LogicalPlan): Seq[PhysicalPlan]
- }
+ def strategies: Seq[GenericStrategy[PhysicalPlan]]
/**
* Returns a placeholder for a physical plan that executes `plan`. This placeholder will be
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/types/dataTypes.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/types/dataTypes.scala
index 8dda0b1828..d25f3a619d 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/types/dataTypes.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/types/dataTypes.scala
@@ -455,7 +455,7 @@ case class ArrayType(elementType: DataType, containsNull: Boolean) extends DataT
case class StructField(
name: String,
dataType: DataType,
- nullable: Boolean,
+ nullable: Boolean = true,
metadata: Metadata = Metadata.empty) {
private[sql] def buildFormattedString(prefix: String, builder: StringBuilder): Unit = {