path: root/sql/catalyst
author    Michael Armbrust <michael@databricks.com>  2016-03-14 19:21:12 -0700
committer Michael Armbrust <michael@databricks.com>  2016-03-14 19:21:12 -0700
commit    17eec0a71ba8713c559d641e3f43a1be726b037c (patch)
tree      6f2a6c5a7aef585ef58bb2d6fba4f63bc58f167a /sql/catalyst
parent    992142b87ed5b507493e4f9fac3f72ba14fafbbc (diff)
[SPARK-13664][SQL] Add a strategy for planning partitioned and bucketed scans of files
This PR adds a new strategy, `FileSourceStrategy`, that can be used for planning scans of collections of files that might be partitioned or bucketed.

Compared with the existing planning logic in `DataSourceStrategy`, this version has the following desirable properties:
 - It removes the need to have `RDD`, `broadcastedHadoopConf` and other distributed concerns in the public API of `org.apache.spark.sql.sources.FileFormat`.
 - Partition column appending is delegated to the format to avoid an extra copy / devectorization when appending partition columns.
 - It minimizes the amount of data that is shipped to each executor (i.e. it does not send the whole list of files to every worker in the form of a hadoop conf).
 - It natively supports bucketing files into partitions, and thus does not require coalescing / creating a `UnionRDD` with the correct partitioning.
 - Small files are automatically coalesced into fewer tasks using an approximate bin-packing algorithm (a rough sketch of the idea follows below).

Currently only a testing source is planned / tested using this strategy. In follow-up PRs we will port the existing formats to this API.

A stub for `FileScanRDD` is also added, but most methods remain unimplemented.

Other minor cleanups:
 - Partition pruning is pushed into `FileCatalog` so both the new and old code paths can use this logic. This will also allow future implementations to use indexes or other tricks (i.e. a MySQL metastore).
 - The partitions from the `FileCatalog` now propagate information about file sizes all the way up to the planner so we can intelligently spread files out.
 - `Array` -> `Seq` in some internal APIs to avoid unnecessary `toArray` calls.
 - Rename `Partition` to `PartitionDirectory` to differentiate partitions used earlier in pruning from those where we have already enumerated the files and their sizes.

Author: Michael Armbrust <michael@databricks.com>

Closes #11646 from marmbrus/fileStrategy.
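The "approximate bin-packing" mentioned above is not part of this catalyst diff, so the following is only an illustrative Scala sketch of the idea: a greedy first-fit-decreasing pass that groups (path, size) pairs into read tasks. `FileSlice`, `TaskBucket`, `packFiles` and `maxBytesPerTask` are hypothetical names for this sketch, not identifiers from the patch.

import scala.collection.mutable.ArrayBuffer

case class FileSlice(path: String, sizeInBytes: Long)
case class TaskBucket(files: Seq[FileSlice])

def packFiles(files: Seq[FileSlice], maxBytesPerTask: Long): Seq[TaskBucket] = {
  // Seed buckets with the largest files first, then drop each remaining file into
  // the first bucket that still has room; open a new bucket when none does.
  val buckets = ArrayBuffer.empty[ArrayBuffer[FileSlice]]
  files.sortBy(-_.sizeInBytes).foreach { f =>
    buckets.find(_.map(_.sizeInBytes).sum + f.sizeInBytes <= maxBytesPerTask) match {
      case Some(bucket) => bucket += f
      case None         => buckets += ArrayBuffer(f)
    }
  }
  buckets.map(b => TaskBucket(b.toSeq)).toSeq
}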
Diffstat (limited to 'sql/catalyst')
-rw-r--r--  sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystConf.scala              | 14
-rw-r--r--  sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala | 18
2 files changed, 32 insertions(+), 0 deletions(-)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystConf.scala
index 2c7c58e66b..35884139b6 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystConf.scala
@@ -17,8 +17,22 @@
package org.apache.spark.sql.catalyst
+import org.apache.spark.sql.catalyst.analysis._
+
private[spark] trait CatalystConf {
def caseSensitiveAnalysis: Boolean
+
+ /**
+ * Returns the [[Resolver]] for the current configuration, which can be used to determine if two
+ * identifiers are equal.
+ */
+ def resolver: Resolver = {
+ if (caseSensitiveAnalysis) {
+ caseSensitiveResolution
+ } else {
+ caseInsensitiveResolution
+ }
+ }
}
/**
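The new `resolver` simply picks between the two resolution functions already defined in the analysis package (`Resolver` is a `(String, String) => Boolean` alias). Below is a minimal usage sketch, assuming the existing `SimpleCatalystConf` test configuration defined later in this same file; the snippet itself is not part of the patch.

import org.apache.spark.sql.catalyst.SimpleCatalystConf

val insensitive = SimpleCatalystConf(caseSensitiveAnalysis = false)
val sensitive   = SimpleCatalystConf(caseSensitiveAnalysis = true)

// Resolver is just a function of two identifier strings, so comparison is a plain call.
assert(insensitive.resolver("ColumnA", "columna"))   // match under case-insensitive analysis
assert(!sensitive.resolver("ColumnA", "columna"))    // no match under case-sensitive analysis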
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala
index b32c7d0fcb..c8aadb2ed5 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala
@@ -23,6 +23,7 @@ import org.apache.spark.sql.catalyst.analysis._
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans.QueryPlan
import org.apache.spark.sql.catalyst.trees.{CurrentOrigin, TreeNode}
+import org.apache.spark.sql.types.StructType
abstract class LogicalPlan extends QueryPlan[LogicalPlan] with Logging {
@@ -117,6 +118,23 @@ abstract class LogicalPlan extends QueryPlan[LogicalPlan] with Logging {
override lazy val canonicalized: LogicalPlan = EliminateSubqueryAliases(this)
/**
+ * Resolves a given schema to concrete [[Attribute]] references in this query plan. This function
+ * should only be called on analyzed plans since it will throw [[AnalysisException]] for
+ * unresolved [[Attribute]]s.
+ */
+ def resolve(schema: StructType, resolver: Resolver): Seq[Attribute] = {
+ schema.map { field =>
+ resolveQuoted(field.name, resolver).map {
+ case a: AttributeReference => a
+ case other => sys.error(s"can not handle nested schema yet... plan $this")
+ }.getOrElse {
+ throw new AnalysisException(
+ s"Unable to resolve ${field.name} given [${output.map(_.name).mkString(", ")}]")
+ }
+ }
+ }
+
+ /**
* Optionally resolves the given strings to a [[NamedExpression]] using the input from all child
* nodes of this LogicalPlan. The attribute is expressed as
a string in the following form: `[scope].AttributeName.[nested].[fields]...`.
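To illustrate the new `resolve(schema, resolver)` method, here is a hedged sketch (not from the patch) that resolves a requested schema against a trivially analyzed `LocalRelation`; the attribute names and types are invented for the example.

import org.apache.spark.sql.catalyst.analysis.caseInsensitiveResolution
import org.apache.spark.sql.catalyst.expressions.AttributeReference
import org.apache.spark.sql.catalyst.plans.logical.LocalRelation
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

// A plan whose output attributes are already resolved, so resolve() may be called on it.
val relation = LocalRelation(
  AttributeReference("id", IntegerType)(),
  AttributeReference("name", StringType)())

// "ID" differs from "id" only by case; with a case-insensitive Resolver it still resolves.
val requestedSchema = StructType(Seq(
  StructField("ID", IntegerType),
  StructField("name", StringType)))

// Returns the matching AttributeReferences from the plan's output, or throws
// AnalysisException if a field name cannot be found.
val attributes = relation.resolve(requestedSchema, caseInsensitiveResolution)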