aboutsummaryrefslogtreecommitdiff
path: root/sql/hive
diff options
context:
space:
mode:
authorWenchen Fan <wenchen@databricks.com>2016-01-06 16:58:10 -0800
committerReynold Xin <rxin@databricks.com>2016-01-06 16:58:10 -0800
commit917d3fc069fb9ea1c1487119c9c12b373f4f9b77 (patch)
tree44041146c82d7be4270da08ac88afbba0d7c18c3 /sql/hive
parent6f7ba6409a39fd2e34865e3e7a84a3dd0b00d6a4 (diff)
downloadspark-917d3fc069fb9ea1c1487119c9c12b373f4f9b77.tar.gz
spark-917d3fc069fb9ea1c1487119c9c12b373f4f9b77.tar.bz2
spark-917d3fc069fb9ea1c1487119c9c12b373f4f9b77.zip
[SPARK-12539][SQL] support writing bucketed table
This PR adds bucket write support to Spark SQL. User can specify bucketing columns, numBuckets and sorting columns with or without partition columns. For example: ``` df.write.partitionBy("year").bucketBy(8, "country").sortBy("amount").saveAsTable("sales") ``` When bucketing is used, we will calculate bucket id for each record, and group the records by bucket id. For each group, we will create a file with bucket id in its name, and write data into it. For each bucket file, if sorting columns are specified, the data will be sorted before write. Note that there may be multiply files for one bucket, as the data is distributed. Currently we store the bucket metadata at hive metastore in a non-hive-compatible way. We use different bucketing hash function compared to hive, so we can't be compatible anyway. Limitations: * Can't write bucketed data without hive metastore. * Can't insert bucketed data into existing hive tables. Author: Wenchen Fan <wenchen@databricks.com> Closes #10498 from cloud-fan/bucket-write.
Diffstat (limited to 'sql/hive')
-rw-r--r--sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala23
-rw-r--r--sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala7
-rw-r--r--sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala15
-rw-r--r--sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala20
-rw-r--r--sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala1
-rw-r--r--sql/hive/src/test/scala/org/apache/spark/sql/sources/BucketedWriteSuite.scala169
6 files changed, 220 insertions, 15 deletions
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
index 1616c45952..43d84d507b 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
@@ -40,7 +40,7 @@ import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.rules._
import org.apache.spark.sql.catalyst.util.DataTypeParser
import org.apache.spark.sql.execution.{datasources, FileRelation}
-import org.apache.spark.sql.execution.datasources.{CreateTableUsingAsSelect, LogicalRelation, Partition => ParquetPartition, PartitionSpec, ResolvedDataSource}
+import org.apache.spark.sql.execution.datasources.{Partition => ParquetPartition, _}
import org.apache.spark.sql.execution.datasources.parquet.ParquetRelation
import org.apache.spark.sql.hive.client._
import org.apache.spark.sql.hive.execution.HiveNativeCommand
@@ -211,6 +211,7 @@ private[hive] class HiveMetastoreCatalog(val client: ClientInterface, hive: Hive
tableIdent: TableIdentifier,
userSpecifiedSchema: Option[StructType],
partitionColumns: Array[String],
+ bucketSpec: Option[BucketSpec],
provider: String,
options: Map[String, String],
isExternal: Boolean): Unit = {
@@ -240,6 +241,25 @@ private[hive] class HiveMetastoreCatalog(val client: ClientInterface, hive: Hive
}
}
+ if (userSpecifiedSchema.isDefined && bucketSpec.isDefined) {
+ val BucketSpec(numBuckets, bucketColumnNames, sortColumnNames) = bucketSpec.get
+
+ tableProperties.put("spark.sql.sources.schema.numBuckets", numBuckets.toString)
+ tableProperties.put("spark.sql.sources.schema.numBucketCols",
+ bucketColumnNames.length.toString)
+ bucketColumnNames.zipWithIndex.foreach { case (bucketCol, index) =>
+ tableProperties.put(s"spark.sql.sources.schema.bucketCol.$index", bucketCol)
+ }
+
+ if (sortColumnNames.nonEmpty) {
+ tableProperties.put("spark.sql.sources.schema.numSortCols",
+ sortColumnNames.length.toString)
+ sortColumnNames.zipWithIndex.foreach { case (sortCol, index) =>
+ tableProperties.put(s"spark.sql.sources.schema.sortCol.$index", sortCol)
+ }
+ }
+ }
+
if (userSpecifiedSchema.isEmpty && partitionColumns.length > 0) {
// The table does not have a specified schema, which means that the schema will be inferred
// when we load the table. So, we are not expecting partition columns and we will discover
@@ -596,6 +616,7 @@ private[hive] class HiveMetastoreCatalog(val client: ClientInterface, hive: Hive
conf.defaultDataSourceName,
temporary = false,
Array.empty[String],
+ bucketSpec = None,
mode,
options = Map.empty[String, String],
child
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
index 0b4f5a0fd6..3687dd6f5a 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
@@ -88,10 +88,9 @@ private[hive] trait HiveStrategies {
tableIdent, userSpecifiedSchema, provider, opts, allowExisting, managedIfNoPath)
ExecutedCommand(cmd) :: Nil
- case CreateTableUsingAsSelect(
- tableIdent, provider, false, partitionCols, mode, opts, query) =>
- val cmd =
- CreateMetastoreDataSourceAsSelect(tableIdent, provider, partitionCols, mode, opts, query)
+ case c: CreateTableUsingAsSelect =>
+ val cmd = CreateMetastoreDataSourceAsSelect(c.tableIdent, c.provider, c.partitionColumns,
+ c.bucketSpec, c.mode, c.options, c.child)
ExecutedCommand(cmd) :: Nil
case _ => Nil
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala
index 94210a5394..612f01cda8 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala
@@ -26,7 +26,7 @@ import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.util._
import org.apache.spark.sql.execution.RunnableCommand
-import org.apache.spark.sql.execution.datasources.{LogicalRelation, ResolvedDataSource}
+import org.apache.spark.sql.execution.datasources.{BucketSpec, LogicalRelation, ResolvedDataSource}
import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.sql.sources._
import org.apache.spark.sql.types._
@@ -151,6 +151,7 @@ case class CreateMetastoreDataSource(
tableIdent,
userSpecifiedSchema,
Array.empty[String],
+ bucketSpec = None,
provider,
optionsWithPath,
isExternal)
@@ -164,6 +165,7 @@ case class CreateMetastoreDataSourceAsSelect(
tableIdent: TableIdentifier,
provider: String,
partitionColumns: Array[String],
+ bucketSpec: Option[BucketSpec],
mode: SaveMode,
options: Map[String, String],
query: LogicalPlan) extends RunnableCommand {
@@ -254,8 +256,14 @@ case class CreateMetastoreDataSourceAsSelect(
}
// Create the relation based on the data of df.
- val resolved =
- ResolvedDataSource(sqlContext, provider, partitionColumns, mode, optionsWithPath, df)
+ val resolved = ResolvedDataSource(
+ sqlContext,
+ provider,
+ partitionColumns,
+ bucketSpec,
+ mode,
+ optionsWithPath,
+ df)
if (createMetastoreTable) {
// We will use the schema of resolved.relation as the schema of the table (instead of
@@ -265,6 +273,7 @@ case class CreateMetastoreDataSourceAsSelect(
tableIdent,
Some(resolved.relation.schema),
partitionColumns,
+ bucketSpec,
provider,
optionsWithPath,
isExternal)
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala
index 3538d642d5..14fa152c23 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala
@@ -37,13 +37,13 @@ import org.apache.spark.rdd.{HadoopRDD, RDD}
import org.apache.spark.sql.{Row, SQLContext}
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.execution.datasources.PartitionSpec
+import org.apache.spark.sql.execution.datasources._
import org.apache.spark.sql.hive.{HiveContext, HiveInspectors, HiveMetastoreTypes, HiveShim}
import org.apache.spark.sql.sources.{Filter, _}
import org.apache.spark.sql.types.StructType
import org.apache.spark.util.SerializableConfiguration
-private[sql] class DefaultSource extends HadoopFsRelationProvider with DataSourceRegister {
+private[sql] class DefaultSource extends BucketedHadoopFsRelationProvider with DataSourceRegister {
override def shortName(): String = "orc"
@@ -52,17 +52,19 @@ private[sql] class DefaultSource extends HadoopFsRelationProvider with DataSourc
paths: Array[String],
dataSchema: Option[StructType],
partitionColumns: Option[StructType],
+ bucketSpec: Option[BucketSpec],
parameters: Map[String, String]): HadoopFsRelation = {
assert(
sqlContext.isInstanceOf[HiveContext],
"The ORC data source can only be used with HiveContext.")
- new OrcRelation(paths, dataSchema, None, partitionColumns, parameters)(sqlContext)
+ new OrcRelation(paths, dataSchema, None, partitionColumns, bucketSpec, parameters)(sqlContext)
}
}
private[orc] class OrcOutputWriter(
path: String,
+ bucketId: Option[Int],
dataSchema: StructType,
context: TaskAttemptContext)
extends OutputWriter with HiveInspectors {
@@ -101,7 +103,8 @@ private[orc] class OrcOutputWriter(
val uniqueWriteJobId = conf.get("spark.sql.sources.writeJobUUID")
val taskAttemptId = context.getTaskAttemptID
val partition = taskAttemptId.getTaskID.getId
- val filename = f"part-r-$partition%05d-$uniqueWriteJobId.orc"
+ val bucketString = bucketId.map(id => f"-$id%05d").getOrElse("")
+ val filename = f"part-r-$partition%05d-$uniqueWriteJobId$bucketString.orc"
new OrcOutputFormat().getRecordWriter(
new Path(path, filename).getFileSystem(conf),
@@ -153,6 +156,7 @@ private[sql] class OrcRelation(
maybeDataSchema: Option[StructType],
maybePartitionSpec: Option[PartitionSpec],
override val userDefinedPartitionColumns: Option[StructType],
+ override val bucketSpec: Option[BucketSpec],
parameters: Map[String, String])(
@transient val sqlContext: SQLContext)
extends HadoopFsRelation(maybePartitionSpec, parameters)
@@ -169,6 +173,7 @@ private[sql] class OrcRelation(
maybeDataSchema,
maybePartitionSpec,
maybePartitionSpec.map(_.partitionColumns),
+ None,
parameters)(sqlContext)
}
@@ -205,7 +210,7 @@ private[sql] class OrcRelation(
OrcTableScan(output, this, filters, inputPaths).execute()
}
- override def prepareJobForWrite(job: Job): OutputWriterFactory = {
+ override def prepareJobForWrite(job: Job): BucketedOutputWriterFactory = {
job.getConfiguration match {
case conf: JobConf =>
conf.setOutputFormat(classOf[OrcOutputFormat])
@@ -216,12 +221,13 @@ private[sql] class OrcRelation(
classOf[MapRedOutputFormat[_, _]])
}
- new OutputWriterFactory {
+ new BucketedOutputWriterFactory {
override def newInstance(
path: String,
+ bucketId: Option[Int],
dataSchema: StructType,
context: TaskAttemptContext): OutputWriter = {
- new OrcOutputWriter(path, dataSchema, context)
+ new OrcOutputWriter(path, bucketId, dataSchema, context)
}
}
}
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
index e22dac3bc9..202851ae13 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
@@ -707,6 +707,7 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv
tableIdent = TableIdentifier("wide_schema"),
userSpecifiedSchema = Some(schema),
partitionColumns = Array.empty[String],
+ bucketSpec = None,
provider = "json",
options = Map("path" -> "just a dummy path"),
isExternal = false)
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/BucketedWriteSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/sources/BucketedWriteSuite.scala
new file mode 100644
index 0000000000..579da0291f
--- /dev/null
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/sources/BucketedWriteSuite.scala
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.sources
+
+import java.io.File
+
+import org.apache.spark.sql.functions._
+import org.apache.spark.sql.catalyst.expressions.UnsafeRow
+import org.apache.spark.sql.hive.test.TestHiveSingleton
+import org.apache.spark.sql.test.SQLTestUtils
+import org.apache.spark.sql.{AnalysisException, QueryTest}
+
+class BucketedWriteSuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
+ import testImplicits._
+
+ test("bucketed by non-existing column") {
+ val df = Seq(1 -> "a", 2 -> "b").toDF("i", "j")
+ intercept[AnalysisException](df.write.bucketBy(2, "k").saveAsTable("tt"))
+ }
+
+ test("numBuckets not greater than 0 or less than 100000") {
+ val df = Seq(1 -> "a", 2 -> "b").toDF("i", "j")
+ intercept[IllegalArgumentException](df.write.bucketBy(0, "i").saveAsTable("tt"))
+ intercept[IllegalArgumentException](df.write.bucketBy(100000, "i").saveAsTable("tt"))
+ }
+
+ test("specify sorting columns without bucketing columns") {
+ val df = Seq(1 -> "a", 2 -> "b").toDF("i", "j")
+ intercept[IllegalArgumentException](df.write.sortBy("j").saveAsTable("tt"))
+ }
+
+ test("sorting by non-orderable column") {
+ val df = Seq("a" -> Map(1 -> 1), "b" -> Map(2 -> 2)).toDF("i", "j")
+ intercept[AnalysisException](df.write.bucketBy(2, "i").sortBy("j").saveAsTable("tt"))
+ }
+
+ test("write bucketed data to unsupported data source") {
+ val df = Seq(Tuple1("a"), Tuple1("b")).toDF("i")
+ intercept[AnalysisException](df.write.bucketBy(3, "i").format("text").saveAsTable("tt"))
+ }
+
+ test("write bucketed data to non-hive-table or existing hive table") {
+ val df = Seq(1 -> "a", 2 -> "b").toDF("i", "j")
+ intercept[IllegalArgumentException](df.write.bucketBy(2, "i").parquet("/tmp/path"))
+ intercept[IllegalArgumentException](df.write.bucketBy(2, "i").json("/tmp/path"))
+ intercept[IllegalArgumentException](df.write.bucketBy(2, "i").insertInto("tt"))
+ }
+
+ private val testFileName = """.*-(\d+)$""".r
+ private val otherFileName = """.*-(\d+)\..*""".r
+ private def getBucketId(fileName: String): Int = {
+ fileName match {
+ case testFileName(bucketId) => bucketId.toInt
+ case otherFileName(bucketId) => bucketId.toInt
+ }
+ }
+
+ private def testBucketing(
+ dataDir: File,
+ source: String,
+ bucketCols: Seq[String],
+ sortCols: Seq[String] = Nil): Unit = {
+ val allBucketFiles = dataDir.listFiles().filterNot(f =>
+ f.getName.startsWith(".") || f.getName.startsWith("_")
+ )
+ val groupedBucketFiles = allBucketFiles.groupBy(f => getBucketId(f.getName))
+ assert(groupedBucketFiles.size <= 8)
+
+ for ((bucketId, bucketFiles) <- groupedBucketFiles) {
+ for (bucketFile <- bucketFiles) {
+ val df = sqlContext.read.format(source).load(bucketFile.getAbsolutePath)
+ .select((bucketCols ++ sortCols).map(col): _*)
+
+ if (sortCols.nonEmpty) {
+ checkAnswer(df.sort(sortCols.map(col): _*), df.collect())
+ }
+
+ val rows = df.select(bucketCols.map(col): _*).queryExecution.toRdd.map(_.copy()).collect()
+
+ for (row <- rows) {
+ assert(row.isInstanceOf[UnsafeRow])
+ val actualBucketId = (row.hashCode() % 8 + 8) % 8
+ assert(actualBucketId == bucketId)
+ }
+ }
+ }
+ }
+
+ private val df = (0 until 50).map(i => (i % 5, i % 13, i.toString)).toDF("i", "j", "k")
+
+ test("write bucketed data") {
+ for (source <- Seq("parquet", "json", "orc")) {
+ withTable("bucketed_table") {
+ df.write
+ .format(source)
+ .partitionBy("i")
+ .bucketBy(8, "j", "k")
+ .saveAsTable("bucketed_table")
+
+ val tableDir = new File(hiveContext.warehousePath, "bucketed_table")
+ for (i <- 0 until 5) {
+ testBucketing(new File(tableDir, s"i=$i"), source, Seq("j", "k"))
+ }
+ }
+ }
+ }
+
+ test("write bucketed data with sortBy") {
+ for (source <- Seq("parquet", "json", "orc")) {
+ withTable("bucketed_table") {
+ df.write
+ .format(source)
+ .partitionBy("i")
+ .bucketBy(8, "j")
+ .sortBy("k")
+ .saveAsTable("bucketed_table")
+
+ val tableDir = new File(hiveContext.warehousePath, "bucketed_table")
+ for (i <- 0 until 5) {
+ testBucketing(new File(tableDir, s"i=$i"), source, Seq("j"), Seq("k"))
+ }
+ }
+ }
+ }
+
+ test("write bucketed data without partitionBy") {
+ for (source <- Seq("parquet", "json", "orc")) {
+ withTable("bucketed_table") {
+ df.write
+ .format(source)
+ .bucketBy(8, "i", "j")
+ .saveAsTable("bucketed_table")
+
+ val tableDir = new File(hiveContext.warehousePath, "bucketed_table")
+ testBucketing(tableDir, source, Seq("i", "j"))
+ }
+ }
+ }
+
+ test("write bucketed data without partitionBy with sortBy") {
+ for (source <- Seq("parquet", "json", "orc")) {
+ withTable("bucketed_table") {
+ df.write
+ .format(source)
+ .bucketBy(8, "i", "j")
+ .sortBy("k")
+ .saveAsTable("bucketed_table")
+
+ val tableDir = new File(hiveContext.warehousePath, "bucketed_table")
+ testBucketing(tableDir, source, Seq("i", "j"), Seq("k"))
+ }
+ }
+ }
+}