aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala16
-rw-r--r--sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala31
2 files changed, 42 insertions, 5 deletions
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
index 7a15c2285d..65ceab2ce2 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
@@ -360,6 +360,22 @@ case class OverwriteOptions(
}
}
+/**
+ * Insert some data into a table.
+ *
+ * @param table the logical plan representing the table. In the future this should be a
+ * [[org.apache.spark.sql.catalyst.catalog.CatalogTable]] once we converge Hive tables
+ * and data source tables.
+ * @param partition a map from the partition key to the partition value (optional). If the partition
+ * value is optional, dynamic partition insert will be performed.
+ * As an example, `INSERT INTO tbl PARTITION (a=1, b=2) AS ...` would have
+ * Map('a' -> Some('1'), 'b' -> Some('2')),
+ * and `INSERT INTO tbl PARTITION (a=1, b) AS ...`
+ * would have Map('a' -> Some('1'), 'b' -> None).
+ * @param child the logical plan representing data to write to.
+ * @param overwrite overwrite existing table or partitions.
+ * @param ifNotExists If true, only write if the table or partition does not exist.
+ */
case class InsertIntoTable(
table: LogicalPlan,
partition: Map[String, Option[String]],
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
index 05164d774c..15be12cfc0 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
@@ -35,13 +35,35 @@ import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.catalyst.plans.physical.Partitioning
import org.apache.spark.sql.execution.{SparkPlan, UnaryExecNode}
-import org.apache.spark.sql.execution.command.{AlterTableAddPartitionCommand, AlterTableDropPartitionCommand}
import org.apache.spark.sql.hive._
import org.apache.spark.sql.hive.HiveShim.{ShimFileSinkDesc => FileSinkDesc}
import org.apache.spark.SparkException
import org.apache.spark.util.SerializableJobConf
+/**
+ * Command for writing data out to a Hive table.
+ *
+ * This class is mostly a mess, for legacy reasons (since it evolved in organic ways and had to
+ * follow Hive's internal implementations closely, which itself was a mess too). Please don't
+ * blame Reynold for this! He was just moving code around!
+ *
+ * In the future we should converge the write path for Hive with the normal data source write path,
+ * as defined in [[org.apache.spark.sql.execution.datasources.FileFormatWriter]].
+ *
+ * @param table the logical plan representing the table. In the future this should be a
+ * [[org.apache.spark.sql.catalyst.catalog.CatalogTable]] once we converge Hive tables
+ * and data source tables.
+ * @param partition a map from the partition key to the partition value (optional). If the partition
+ * value is optional, dynamic partition insert will be performed.
+ * As an example, `INSERT INTO tbl PARTITION (a=1, b=2) AS ...` would have
+ * Map('a' -> Some('1'), 'b' -> Some('2')),
+ * and `INSERT INTO tbl PARTITION (a=1, b) AS ...`
+ * would have Map('a' -> Some('1'), 'b' -> None).
+ * @param child the logical plan representing data to write to.
+ * @param overwrite overwrite existing table or partitions.
+ * @param ifNotExists If true, only write if the table or partition does not exist.
+ */
case class InsertIntoHiveTable(
table: MetastoreRelation,
partition: Map[String, Option[String]],
@@ -81,8 +103,7 @@ case class InsertIntoHiveTable(
throw new IllegalStateException("Cannot create staging directory '" + dir.toString + "'")
}
fs.deleteOnExit(dir)
- }
- catch {
+ } catch {
case e: IOException =>
throw new RuntimeException(
"Cannot create staging directory '" + dir.toString + "': " + e.getMessage, e)
@@ -123,7 +144,7 @@ case class InsertIntoHiveTable(
FileOutputFormat.setOutputPath(
conf.value,
- SparkHiveWriterContainer.createPathFromString(fileSinkConf.getDirName, conf.value))
+ SparkHiveWriterContainer.createPathFromString(fileSinkConf.getDirName(), conf.value))
log.debug("Saving as hadoop file of type " + valueClass.getSimpleName)
writerContainer.driverSideSetup()
sqlContext.sparkContext.runJob(rdd, writerContainer.writeToFile _)
@@ -263,7 +284,7 @@ case class InsertIntoHiveTable(
// version and we may not want to catch up new Hive version every time. We delete the
// Hive partition first and then load data file into the Hive partition.
if (oldPart.nonEmpty && overwrite) {
- oldPart.get.storage.locationUri.map { uri =>
+ oldPart.get.storage.locationUri.foreach { uri =>
val partitionPath = new Path(uri)
val fs = partitionPath.getFileSystem(hadoopConf)
if (fs.exists(partitionPath)) {