author	Reynold Xin <rxin@databricks.com>	2016-11-03 02:45:54 -0700
committer	Reynold Xin <rxin@databricks.com>	2016-11-03 02:45:54 -0700
commit	0ea5d5b24c1f7b29efeac0e72d271aba279523f7 (patch)
tree	4988c9ae3ce68ee6bfe8eb22627ce91fae48dcd4 /sql
parent	937af592e65f4dd878aafcabf8fe2cfe7fa3d9b3 (diff)
[SQL] minor - internal doc improvement for InsertIntoTable.
## What changes were proposed in this pull request?

I was reading this part of the code and was really confused by the "partition" parameter. This patch adds some documentation for it to reduce confusion in the future.

I also looked around other logical plans, but most of them are either already documented, or pretty self-evident to people that know Spark SQL.

## How was this patch tested?

N/A - doc change only.

Author: Reynold Xin <rxin@databricks.com>

Closes #15749 from rxin/doc-improvement.
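To make the documented semantics concrete, here is a minimal, self-contained Scala sketch (not part of the patch) of the `Map[String, Option[String]]` values the `partition` parameter holds for a fully static versus a partially dynamic insert:

```scala
// Minimal sketch (not part of this patch) of the documented `partition` values.

// INSERT INTO tbl PARTITION (a=1, b=2) ...   -- fully static partition spec
val staticSpec: Map[String, Option[String]] =
  Map("a" -> Some("1"), "b" -> Some("2"))

// INSERT INTO tbl PARTITION (a=1, b) ...     -- `b` has no value, so it is dynamic
val dynamicSpec: Map[String, Option[String]] =
  Map("a" -> Some("1"), "b" -> None)
```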
Diffstat (limited to 'sql')
-rw-r--r--	sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala	16
-rw-r--r--	sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala	31
2 files changed, 42 insertions, 5 deletions
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
index 7a15c2285d..65ceab2ce2 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
@@ -360,6 +360,22 @@ case class OverwriteOptions(
}
}
+/**
+ * Insert some data into a table.
+ *
+ * @param table the logical plan representing the table. In the future this should be a
+ * [[org.apache.spark.sql.catalyst.catalog.CatalogTable]] once we converge Hive tables
+ * and data source tables.
+ * @param partition a map from the partition key to the partition value (optional). If a partition
+ * value is None, a dynamic partition insert will be performed for that column.
+ * As an example, `INSERT INTO tbl PARTITION (a=1, b=2) AS ...` would have
+ * Map('a' -> Some('1'), 'b' -> Some('2')),
+ * and `INSERT INTO tbl PARTITION (a=1, b) AS ...`
+ * would have Map('a' -> Some('1'), 'b' -> None).
+ * @param child the logical plan representing the data to be written.
+ * @param overwrite whether to overwrite the existing table or partitions.
+ * @param ifNotExists If true, only write if the table or partition does not exist.
+ */
case class InsertIntoTable(
table: LogicalPlan,
partition: Map[String, Option[String]],
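A consumer of this node typically has to separate the static part of the partition spec from the dynamic partition columns. The sketch below shows one way to do that split; the value names are illustrative and not Spark internals:

```scala
// Hedged sketch (not from this patch): splitting the documented
// `partition: Map[String, Option[String]]` into static and dynamic parts.
val partition: Map[String, Option[String]] =
  Map("a" -> Some("1"), "b" -> None)   // INSERT INTO tbl PARTITION (a=1, b) ...

// Static part: keys whose value was given in the PARTITION clause.
val staticPartitionSpec: Map[String, String] =
  partition.collect { case (key, Some(value)) => key -> value }

// Dynamic part: keys whose values come from the query output at write time.
val dynamicPartitionColumns: Seq[String] =
  partition.collect { case (key, None) => key }.toSeq

// staticPartitionSpec == Map("a" -> "1"); dynamicPartitionColumns == Seq("b")
```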
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
index 05164d774c..15be12cfc0 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
@@ -35,13 +35,35 @@ import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.catalyst.plans.physical.Partitioning
import org.apache.spark.sql.execution.{SparkPlan, UnaryExecNode}
-import org.apache.spark.sql.execution.command.{AlterTableAddPartitionCommand, AlterTableDropPartitionCommand}
import org.apache.spark.sql.hive._
import org.apache.spark.sql.hive.HiveShim.{ShimFileSinkDesc => FileSinkDesc}
import org.apache.spark.SparkException
import org.apache.spark.util.SerializableJobConf
+/**
+ * Command for writing data out to a Hive table.
+ *
+ * This class is mostly a mess, for legacy reasons (since it evolved organically and had to
+ * follow Hive's internal implementations closely, which were themselves a mess too). Please
+ * don't blame Reynold for this! He was just moving code around!
+ *
+ * In the future we should converge the write path for Hive with the normal data source write path,
+ * as defined in [[org.apache.spark.sql.execution.datasources.FileFormatWriter]].
+ *
+ * @param table the logical plan representing the table. In the future this should be a
+ * [[org.apache.spark.sql.catalyst.catalog.CatalogTable]] once we converge Hive tables
+ * and data source tables.
+ * @param partition a map from the partition key to the partition value (optional). If a partition
+ * value is None, a dynamic partition insert will be performed for that column.
+ * As an example, `INSERT INTO tbl PARTITION (a=1, b=2) AS ...` would have
+ * Map('a' -> Some('1'), 'b' -> Some('2')),
+ * and `INSERT INTO tbl PARTITION (a=1, b) AS ...`
+ * would have Map('a' -> Some('1'), 'b' -> None).
+ * @param child the logical plan representing the data to be written.
+ * @param overwrite whether to overwrite the existing table or partitions.
+ * @param ifNotExists If true, only write if the table or partition does not exist.
+ */
case class InsertIntoHiveTable(
table: MetastoreRelation,
partition: Map[String, Option[String]],
@@ -81,8 +103,7 @@ case class InsertIntoHiveTable(
throw new IllegalStateException("Cannot create staging directory '" + dir.toString + "'")
}
fs.deleteOnExit(dir)
- }
- catch {
+ } catch {
case e: IOException =>
throw new RuntimeException(
"Cannot create staging directory '" + dir.toString + "': " + e.getMessage, e)
@@ -123,7 +144,7 @@ case class InsertIntoHiveTable(
FileOutputFormat.setOutputPath(
conf.value,
- SparkHiveWriterContainer.createPathFromString(fileSinkConf.getDirName, conf.value))
+ SparkHiveWriterContainer.createPathFromString(fileSinkConf.getDirName(), conf.value))
log.debug("Saving as hadoop file of type " + valueClass.getSimpleName)
writerContainer.driverSideSetup()
sqlContext.sparkContext.runJob(rdd, writerContainer.writeToFile _)
@@ -263,7 +284,7 @@ case class InsertIntoHiveTable(
// version and we may not want to catch up new Hive version every time. We delete the
// Hive partition first and then load data file into the Hive partition.
if (oldPart.nonEmpty && overwrite) {
- oldPart.get.storage.locationUri.map { uri =>
+ oldPart.get.storage.locationUri.foreach { uri =>
val partitionPath = new Path(uri)
val fs = partitionPath.getFileSystem(hadoopConf)
if (fs.exists(partitionPath)) {
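The `map` -> `foreach` change above is purely about intent: the closure runs only for its side effect (checking and deleting the old partition path), so `foreach` avoids building an `Option` result that is never read. A standalone sketch of the difference:

```scala
// Standalone sketch (not from this patch) of the map -> foreach change.
val locationUri: Option[String] = Some("/warehouse/tbl/a=1")

// Before: `map` wraps the side effect's result in an Option[Unit] that nobody reads.
val unused: Option[Unit] = locationUri.map { uri => println(s"would delete $uri") }

// After: `foreach` runs the side effect and returns Unit, matching the intent.
locationUri.foreach { uri => println(s"would delete $uri") }
```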