author     Michael Armbrust <michael@databricks.com>    2016-03-14 19:21:12 -0700
committer  Michael Armbrust <michael@databricks.com>    2016-03-14 19:21:12 -0700
commit     17eec0a71ba8713c559d641e3f43a1be726b037c (patch)
tree       6f2a6c5a7aef585ef58bb2d6fba4f63bc58f167a /sql/hive
parent     992142b87ed5b507493e4f9fac3f72ba14fafbbc (diff)
[SPARK-13664][SQL] Add a strategy for planning partitioned and bucketed scans of files
This PR adds a new strategy, `FileSourceStrategy`, that can be used for planning scans of collections of files that might be partitioned or bucketed.

Compared with the existing planning logic in `DataSourceStrategy`, this version has the following desirable properties:
- It removes the need to have `RDD`, `broadcastedHadoopConf` and other distributed concerns in the public API of `org.apache.spark.sql.sources.FileFormat`.
- Partition column appending is delegated to the format to avoid an extra copy / devectorization when appending partition columns.
- It minimizes the amount of data that is shipped to each executor (i.e. it does not send the whole list of files to every worker in the form of a Hadoop conf).
- It natively supports bucketing files into partitions, and thus does not require coalescing / creating a `UnionRDD` with the correct partitioning.
- Small files are automatically coalesced into fewer tasks using an approximate bin-packing algorithm (see the sketch below).

Currently only a testing source is planned / tested using this strategy. In follow-up PRs we will port the existing formats to this API. A stub for `FileScanRDD` is also added, but most methods remain unimplemented.

Other minor cleanups:
- Partition pruning is pushed into `FileCatalog` so both the new and old code paths can use this logic. This will also allow future implementations to use indexes or other tricks (i.e. a MySQL metastore).
- The partitions from the `FileCatalog` now propagate information about file sizes all the way up to the planner so we can intelligently spread files out.
- `Array` -> `Seq` in some internal APIs to avoid unnecessary `toArray` calls.
- Rename `Partition` to `PartitionDirectory` to differentiate partitions used earlier in pruning from those where we have already enumerated the files and their sizes.

Author: Michael Armbrust <michael@databricks.com>

Closes #11646 from marmbrus/fileStrategy.
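To make the bin-packing bullet concrete, the following is a minimal, self-contained Scala sketch of the general idea rather than Spark's actual implementation: files are sorted by size and greedily packed into task "bins" up to a per-task byte budget, so many small files end up sharing a task. The names `FileSlice`, `TaskBin`, `packIntoTasks`, and `maxBytesPerTask` are hypothetical and used only for illustration.

```scala
// Hypothetical sketch of approximate bin-packing of files into tasks.
case class FileSlice(path: String, sizeInBytes: Long)

case class TaskBin(files: Vector[FileSlice] = Vector.empty, bytes: Long = 0L) {
  def add(f: FileSlice): TaskBin = TaskBin(files :+ f, bytes + f.sizeInBytes)
}

object FilePacking {
  /** Greedy first-fit-decreasing packing: each bin holds at most `maxBytesPerTask` bytes. */
  def packIntoTasks(files: Seq[FileSlice], maxBytesPerTask: Long): Seq[TaskBin] = {
    val sorted = files.sortBy(-_.sizeInBytes)           // largest files first
    sorted.foldLeft(Vector.empty[TaskBin]) { (bins, file) =>
      bins.indexWhere(_.bytes + file.sizeInBytes <= maxBytesPerTask) match {
        case -1 => bins :+ TaskBin().add(file)          // no bin has room: open a new task
        case i  => bins.updated(i, bins(i).add(file))   // pack into the first bin that fits
      }
    }
  }

  def main(args: Array[String]): Unit = {
    val files = Seq(
      FileSlice("part-00000", 120L << 20),
      FileSlice("part-00001", 5L << 20),
      FileSlice("part-00002", 7L << 20),
      FileSlice("part-00003", 64L << 20))
    // With a 128 MB budget per task, the 7 MB file rides along with the 120 MB file and the
    // 5 MB file joins the 64 MB file, so four files become two tasks instead of four.
    packIntoTasks(files, maxBytesPerTask = 128L << 20).zipWithIndex.foreach { case (bin, i) =>
      println(s"task $i: ${bin.files.map(_.path).mkString(", ")} (${bin.bytes >> 20} MB)")
    }
  }
}
```

The actual strategy additionally has to respect bucketing and partition boundaries and carry file-size information from the `FileCatalog` (as described above); this sketch only shows the greedy packing step.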
Diffstat (limited to 'sql/hive')
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala | 12
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala     |  1
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala      |  4
3 files changed, 9 insertions, 8 deletions
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
index 8f6cd66f1f..c70510b483 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
@@ -41,11 +41,11 @@ import org.apache.spark.sql.catalyst.plans.logical
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.rules._
import org.apache.spark.sql.execution.{datasources, FileRelation}
-import org.apache.spark.sql.execution.datasources.{Partition => ParquetPartition, _}
+import org.apache.spark.sql.execution.datasources._
import org.apache.spark.sql.execution.datasources.parquet.{DefaultSource, ParquetRelation}
import org.apache.spark.sql.hive.client._
import org.apache.spark.sql.hive.execution.HiveNativeCommand
-import org.apache.spark.sql.sources._
+import org.apache.spark.sql.sources.{HadoopFsRelation, HDFSFileCatalog}
import org.apache.spark.sql.types._
private[hive] case class HiveSerDe(
@@ -469,7 +469,7 @@ private[hive] class HiveMetastoreCatalog(val client: HiveClient, hive: HiveConte
parquetRelation.location.paths.map(_.toString).toSet == pathsInMetastore.toSet &&
logical.schema.sameType(metastoreSchema) &&
parquetRelation.partitionSpec == partitionSpecInMetastore.getOrElse {
- PartitionSpec(StructType(Nil), Array.empty[datasources.Partition])
+ PartitionSpec(StructType(Nil), Array.empty[datasources.PartitionDirectory])
}
if (useCached) {
@@ -499,7 +499,7 @@ private[hive] class HiveMetastoreCatalog(val client: HiveClient, hive: HiveConte
val values = InternalRow.fromSeq(p.getValues.asScala.zip(partitionColumnDataTypes).map {
case (rawValue, dataType) => Cast(Literal(rawValue), dataType).eval(null)
})
- ParquetPartition(values, location)
+ PartitionDirectory(values, location)
}
val partitionSpec = PartitionSpec(partitionSchema, partitions)
@@ -753,7 +753,7 @@ class MetaStoreFileCatalog(
hive: HiveContext,
paths: Seq[Path],
partitionSpecFromHive: PartitionSpec)
- extends HDFSFileCatalog(hive, Map.empty, paths) {
+ extends HDFSFileCatalog(hive, Map.empty, paths, Some(partitionSpecFromHive.partitionColumns)) {
override def getStatus(path: Path): Array[FileStatus] = {
@@ -761,7 +761,7 @@ class MetaStoreFileCatalog(
fs.listStatus(path)
}
- override def partitionSpec(schema: Option[StructType]): PartitionSpec = partitionSpecFromHive
+ override def partitionSpec(): PartitionSpec = partitionSpecFromHive
}
/**
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala
index 614f9e05d7..cbb6333336 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala
@@ -79,6 +79,7 @@ private[hive] class HiveSessionState(ctx: HiveContext) extends SessionState(ctx)
override def strategies: Seq[Strategy] = {
ctx.experimental.extraStrategies ++ Seq(
+ FileSourceStrategy,
DataSourceStrategy,
HiveCommandStrategy(ctx),
HiveDDLStrategy,
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala
index 8a39d95fc5..ae041c5137 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala
@@ -111,7 +111,7 @@ private[sql] class DefaultSource extends FileFormat with DataSourceRegister {
requiredColumns: Array[String],
filters: Array[Filter],
bucketSet: Option[BitSet],
- inputFiles: Array[FileStatus],
+ inputFiles: Seq[FileStatus],
broadcastedConf: Broadcast[SerializableConfiguration],
options: Map[String, String]): RDD[InternalRow] = {
val output = StructType(requiredColumns.map(dataSchema(_))).toAttributes
@@ -221,7 +221,7 @@ private[orc] case class OrcTableScan(
@transient sqlContext: SQLContext,
attributes: Seq[Attribute],
filters: Array[Filter],
- @transient inputPaths: Array[FileStatus])
+ @transient inputPaths: Seq[FileStatus])
extends Logging
with HiveInspectors {