aboutsummaryrefslogtreecommitdiff
path: root/sql/core
diff options
context:
space:
mode:
authorWenchen Fan <wenchen@databricks.com>2016-01-15 17:20:01 -0800
committerReynold Xin <rxin@databricks.com>2016-01-15 17:20:01 -0800
commit3b5ccb12b8d33d99df0f206fecf00f51c2b88fdb (patch)
tree9be77df8147a125cadc46e9bc6da4641669b58da /sql/core
parent8dbbf3e75e70e98391b4a1705472caddd129945a (diff)
downloadspark-3b5ccb12b8d33d99df0f206fecf00f51c2b88fdb.tar.gz
spark-3b5ccb12b8d33d99df0f206fecf00f51c2b88fdb.tar.bz2
spark-3b5ccb12b8d33d99df0f206fecf00f51c2b88fdb.zip
[SPARK-12649][SQL] support reading bucketed table
This PR adds the support to read bucketed tables, and correctly populate `outputPartitioning`, so that we can avoid shuffle for some cases. TODO(follow-up PRs): * bucket pruning * avoid shuffle for bucketed table join when use any super-set of the bucketing key. (we should re-visit it after https://issues.apache.org/jira/browse/SPARK-12704 is fixed) * recognize hive bucketed table Author: Wenchen Fan <wenchen@databricks.com> Closes #10604 from cloud-fan/bucket-read.
Diffstat (limited to 'sql/core')
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala1
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala6
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala28
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelation.scala2
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ResolvedDataSource.scala4
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala2
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/bucket.scala21
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala2
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONRelation.scala4
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala2
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala1
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala55
-rw-r--r--sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala2
13 files changed, 111 insertions, 19 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
index 8f852e5216..634c1bd473 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
@@ -109,6 +109,7 @@ class DataFrameReader private[sql](sqlContext: SQLContext) extends Logging {
sqlContext,
userSpecifiedSchema = userSpecifiedSchema,
partitionColumns = Array.empty[String],
+ bucketSpec = None,
provider = source,
options = extraOptions.toMap)
DataFrame(sqlContext, LogicalRelation(resolved.relation))
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
index 7976795ff5..4e3662724c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
@@ -422,6 +422,10 @@ private[spark] object SQLConf {
doc = "The maximum number of concurrent files to open before falling back on sorting when " +
"writing out files using dynamic partitioning.")
+ val BUCKETING_ENABLED = booleanConf("spark.sql.sources.bucketing.enabled",
+ defaultValue = Some(true),
+ doc = "When false, we will treat bucketed table as normal table")
+
// The output committer class used by HadoopFsRelation. The specified class needs to be a
// subclass of org.apache.hadoop.mapreduce.OutputCommitter.
//
@@ -590,6 +594,8 @@ private[sql] class SQLConf extends Serializable with CatalystConf with ParserCon
private[spark] def parallelPartitionDiscoveryThreshold: Int =
getConf(SQLConf.PARALLEL_PARTITION_DISCOVERY_THRESHOLD)
+ private[spark] def bucketingEnabled(): Boolean = getConf(SQLConf.BUCKETING_ENABLED)
+
// Do not use a value larger than 4000 as the default value of this property.
// See the comments of SCHEMA_STRING_LENGTH_THRESHOLD above for more information.
private[spark] def schemaStringLengthThreshold: Int = getConf(SCHEMA_STRING_LENGTH_THRESHOLD)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
index 569a21feaa..92cfd5f841 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
@@ -18,11 +18,12 @@
package org.apache.spark.sql.execution
import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.{Row, SQLContext}
+import org.apache.spark.sql.{AnalysisException, Row, SQLContext}
import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow}
import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation
-import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeSet, GenericMutableRow, UnsafeProjection}
+import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Statistics}
+import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, Partitioning, UnknownPartitioning}
import org.apache.spark.sql.sources.{BaseRelation, HadoopFsRelation}
import org.apache.spark.sql.types.DataType
@@ -98,7 +99,8 @@ private[sql] case class PhysicalRDD(
rdd: RDD[InternalRow],
override val nodeName: String,
override val metadata: Map[String, String] = Map.empty,
- isUnsafeRow: Boolean = false)
+ isUnsafeRow: Boolean = false,
+ override val outputPartitioning: Partitioning = UnknownPartitioning(0))
extends LeafNode {
protected override def doExecute(): RDD[InternalRow] = {
@@ -130,6 +132,24 @@ private[sql] object PhysicalRDD {
metadata: Map[String, String] = Map.empty): PhysicalRDD = {
// All HadoopFsRelations output UnsafeRows
val outputUnsafeRows = relation.isInstanceOf[HadoopFsRelation]
- PhysicalRDD(output, rdd, relation.toString, metadata, outputUnsafeRows)
+
+ val bucketSpec = relation match {
+ case r: HadoopFsRelation => r.getBucketSpec
+ case _ => None
+ }
+
+ def toAttribute(colName: String): Attribute = output.find(_.name == colName).getOrElse {
+ throw new AnalysisException(s"bucket column $colName not found in existing columns " +
+ s"(${output.map(_.name).mkString(", ")})")
+ }
+
+ bucketSpec.map { spec =>
+ val numBuckets = spec.numBuckets
+ val bucketColumns = spec.bucketColumnNames.map(toAttribute)
+ val partitioning = HashPartitioning(bucketColumns, numBuckets)
+ PhysicalRDD(output, rdd, relation.toString, metadata, outputUnsafeRows, partitioning)
+ }.getOrElse {
+ PhysicalRDD(output, rdd, relation.toString, metadata, outputUnsafeRows)
+ }
}
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelation.scala
index 7a8691e7cb..314c957d57 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelation.scala
@@ -125,7 +125,7 @@ private[sql] case class InsertIntoHadoopFsRelation(
|Actual: ${partitionColumns.mkString(", ")}
""".stripMargin)
- val writerContainer = if (partitionColumns.isEmpty && relation.bucketSpec.isEmpty) {
+ val writerContainer = if (partitionColumns.isEmpty && relation.getBucketSpec.isEmpty) {
new DefaultWriterContainer(relation, job, isAppend)
} else {
val output = df.queryExecution.executedPlan.output
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ResolvedDataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ResolvedDataSource.scala
index ece9b8a9a9..cc8dcf5930 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ResolvedDataSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ResolvedDataSource.scala
@@ -97,6 +97,7 @@ object ResolvedDataSource extends Logging {
sqlContext: SQLContext,
userSpecifiedSchema: Option[StructType],
partitionColumns: Array[String],
+ bucketSpec: Option[BucketSpec],
provider: String,
options: Map[String, String]): ResolvedDataSource = {
val clazz: Class[_] = lookupDataSource(provider)
@@ -142,6 +143,7 @@ object ResolvedDataSource extends Logging {
paths,
Some(dataSchema),
maybePartitionsSchema,
+ bucketSpec,
caseInsensitiveOptions)
case dataSource: org.apache.spark.sql.sources.RelationProvider =>
throw new AnalysisException(s"$className does not allow user-specified schemas.")
@@ -173,7 +175,7 @@ object ResolvedDataSource extends Logging {
SparkHadoopUtil.get.globPathIfNecessary(qualified).map(_.toString)
}
}
- dataSource.createRelation(sqlContext, paths, None, None, caseInsensitiveOptions)
+ dataSource.createRelation(sqlContext, paths, None, None, None, caseInsensitiveOptions)
case dataSource: org.apache.spark.sql.sources.SchemaRelationProvider =>
throw new AnalysisException(
s"A schema needs to be specified when using $className.")
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala
index fc77529b7d..563fd9eefc 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala
@@ -311,7 +311,7 @@ private[sql] class DynamicPartitionWriterContainer(
isAppend: Boolean)
extends BaseWriterContainer(relation, job, isAppend) {
- private val bucketSpec = relation.bucketSpec
+ private val bucketSpec = relation.getBucketSpec
private val bucketColumns: Seq[Attribute] = bucketSpec.toSeq.flatMap {
spec => spec.bucketColumnNames.map(c => inputSchema.find(_.name == c).get)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/bucket.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/bucket.scala
index 9976829638..c7ecd6125d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/bucket.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/bucket.scala
@@ -44,9 +44,7 @@ private[sql] trait BucketedHadoopFsRelationProvider extends HadoopFsRelationProv
dataSchema: Option[StructType],
partitionColumns: Option[StructType],
parameters: Map[String, String]): HadoopFsRelation =
- // TODO: throw exception here as we won't call this method during execution, after bucketed read
- // support is finished.
- createRelation(sqlContext, paths, dataSchema, partitionColumns, bucketSpec = None, parameters)
+ throw new UnsupportedOperationException("use the overload version with bucketSpec parameter")
}
private[sql] abstract class BucketedOutputWriterFactory extends OutputWriterFactory {
@@ -54,5 +52,20 @@ private[sql] abstract class BucketedOutputWriterFactory extends OutputWriterFact
path: String,
dataSchema: StructType,
context: TaskAttemptContext): OutputWriter =
- throw new UnsupportedOperationException("use bucket version")
+ throw new UnsupportedOperationException("use the overload version with bucketSpec parameter")
+}
+
+private[sql] object BucketingUtils {
+ // The file name of bucketed data should have 3 parts:
+ // 1. some other information in the head of file name, ends with `-`
+ // 2. bucket id part, some numbers
+ // 3. optional file extension part, in the tail of file name, starts with `.`
+ // An example of bucketed parquet file name with bucket id 3:
+ // part-r-00000-2dd664f9-d2c4-4ffe-878f-c6c70c1fb0cb-00003.gz.parquet
+ private val bucketedFileName = """.*-(\d+)(?:\..*)?$""".r
+
+ def getBucketId(fileName: String): Option[Int] = fileName match {
+ case bucketedFileName(bucketId) => Some(bucketId.toInt)
+ case other => None
+ }
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala
index 0897fcadbc..c3603936df 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala
@@ -91,7 +91,7 @@ case class CreateTempTableUsing(
def run(sqlContext: SQLContext): Seq[Row] = {
val resolved = ResolvedDataSource(
- sqlContext, userSpecifiedSchema, Array.empty[String], provider, options)
+ sqlContext, userSpecifiedSchema, Array.empty[String], bucketSpec = None, provider, options)
sqlContext.catalog.registerTable(
tableIdent,
DataFrame(sqlContext, LogicalRelation(resolved.relation)).logicalPlan)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONRelation.scala
index 8a6fa4aeeb..20c60b9c43 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONRelation.scala
@@ -57,7 +57,7 @@ class DefaultSource extends BucketedHadoopFsRelationProvider with DataSourceRegi
maybeDataSchema = dataSchema,
maybePartitionSpec = None,
userDefinedPartitionColumns = partitionColumns,
- bucketSpec = bucketSpec,
+ maybeBucketSpec = bucketSpec,
paths = paths,
parameters = parameters)(sqlContext)
}
@@ -68,7 +68,7 @@ private[sql] class JSONRelation(
val maybeDataSchema: Option[StructType],
val maybePartitionSpec: Option[PartitionSpec],
override val userDefinedPartitionColumns: Option[StructType],
- override val bucketSpec: Option[BucketSpec] = None,
+ override val maybeBucketSpec: Option[BucketSpec] = None,
override val paths: Array[String] = Array.empty[String],
parameters: Map[String, String] = Map.empty[String, String])
(@transient val sqlContext: SQLContext)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala
index 991a5d5aef..30ddec686c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala
@@ -112,7 +112,7 @@ private[sql] class ParquetRelation(
// This is for metastore conversion.
private val maybePartitionSpec: Option[PartitionSpec],
override val userDefinedPartitionColumns: Option[StructType],
- override val bucketSpec: Option[BucketSpec],
+ override val maybeBucketSpec: Option[BucketSpec],
parameters: Map[String, String])(
val sqlContext: SQLContext)
extends HadoopFsRelation(maybePartitionSpec, parameters)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
index dd3e66d8a9..9358c9c37b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
@@ -36,6 +36,7 @@ private[sql] class ResolveDataSource(sqlContext: SQLContext) extends Rule[Logica
sqlContext,
userSpecifiedSchema = None,
partitionColumns = Array(),
+ bucketSpec = None,
provider = u.tableIdentifier.database.get,
options = Map("path" -> u.tableIdentifier.table))
val plan = LogicalRelation(resolved.relation)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
index 9f3607369c..7800776fa1 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
@@ -28,13 +28,13 @@ import org.apache.hadoop.mapreduce.{Job, TaskAttemptContext}
import org.apache.spark.{Logging, SparkContext}
import org.apache.spark.annotation.{DeveloperApi, Experimental}
import org.apache.spark.broadcast.Broadcast
-import org.apache.spark.rdd.RDD
+import org.apache.spark.rdd.{RDD, UnionRDD}
import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow}
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.codegen.GenerateMutableProjection
import org.apache.spark.sql.execution.{FileRelation, RDDConversions}
-import org.apache.spark.sql.execution.datasources.{BucketSpec, Partition, PartitioningUtils, PartitionSpec}
+import org.apache.spark.sql.execution.datasources._
import org.apache.spark.sql.types.{StringType, StructType}
import org.apache.spark.util.SerializableConfiguration
@@ -458,7 +458,12 @@ abstract class HadoopFsRelation private[sql](
private var _partitionSpec: PartitionSpec = _
- private[sql] def bucketSpec: Option[BucketSpec] = None
+ private[this] var malformedBucketFile = false
+
+ private[sql] def maybeBucketSpec: Option[BucketSpec] = None
+
+ final private[sql] def getBucketSpec: Option[BucketSpec] =
+ maybeBucketSpec.filter(_ => sqlContext.conf.bucketingEnabled() && !malformedBucketFile)
private class FileStatusCache {
var leafFiles = mutable.LinkedHashMap.empty[Path, FileStatus]
@@ -664,6 +669,35 @@ abstract class HadoopFsRelation private[sql](
})
}
+ /**
+ * Groups the input files by bucket id, if bucketing is enabled and this data source is bucketed.
+ * Returns None if there exists any malformed bucket files.
+ */
+ private def groupBucketFiles(
+ files: Array[FileStatus]): Option[scala.collection.Map[Int, Array[FileStatus]]] = {
+ malformedBucketFile = false
+ if (getBucketSpec.isDefined) {
+ val groupedBucketFiles = mutable.HashMap.empty[Int, mutable.ArrayBuffer[FileStatus]]
+ var i = 0
+ while (!malformedBucketFile && i < files.length) {
+ val bucketId = BucketingUtils.getBucketId(files(i).getPath.getName)
+ if (bucketId.isEmpty) {
+ logError(s"File ${files(i).getPath} is expected to be a bucket file, but there is no " +
+ "bucket id information in file name. Fall back to non-bucketing mode.")
+ malformedBucketFile = true
+ } else {
+ val bucketFiles =
+ groupedBucketFiles.getOrElseUpdate(bucketId.get, mutable.ArrayBuffer.empty)
+ bucketFiles += files(i)
+ }
+ i += 1
+ }
+ if (malformedBucketFile) None else Some(groupedBucketFiles.mapValues(_.toArray))
+ } else {
+ None
+ }
+ }
+
final private[sql] def buildInternalScan(
requiredColumns: Array[String],
filters: Array[Filter],
@@ -683,7 +717,20 @@ abstract class HadoopFsRelation private[sql](
}
}
- buildInternalScan(requiredColumns, filters, inputStatuses, broadcastedConf)
+ groupBucketFiles(inputStatuses).map { groupedBucketFiles =>
+ // For each bucket id, firstly we get all files belong to this bucket, by detecting bucket
+ // id from file name. Then read these files into a RDD(use one-partition empty RDD for empty
+ // bucket), and coalesce it to one partition. Finally union all bucket RDDs to one result.
+ val perBucketRows = (0 until maybeBucketSpec.get.numBuckets).map { bucketId =>
+ groupedBucketFiles.get(bucketId).map { inputStatuses =>
+ buildInternalScan(requiredColumns, filters, inputStatuses, broadcastedConf).coalesce(1)
+ }.getOrElse(sqlContext.emptyResult)
+ }
+
+ new UnionRDD(sqlContext.sparkContext, perBucketRows)
+ }.getOrElse {
+ buildInternalScan(requiredColumns, filters, inputStatuses, broadcastedConf)
+ }
}
/**
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
index e70eb2a060..8de8ba355e 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
@@ -1223,6 +1223,7 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
sqlContext,
userSpecifiedSchema = None,
partitionColumns = Array.empty[String],
+ bucketSpec = None,
provider = classOf[DefaultSource].getCanonicalName,
options = Map("path" -> path))
@@ -1230,6 +1231,7 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
sqlContext,
userSpecifiedSchema = None,
partitionColumns = Array.empty[String],
+ bucketSpec = None,
provider = classOf[DefaultSource].getCanonicalName,
options = Map("path" -> path))
assert(d1 === d2)