| author | Wenchen Fan <wenchen@databricks.com> | 2017-01-17 23:37:59 -0800 |
|---|---|---|
| committer | gatorsmile <gatorsmile@gmail.com> | 2017-01-17 23:37:59 -0800 |
| commit | 4494cd9716d64a6c7cfa548abadb5dd0c4c143a6 (patch) | |
| tree | 6c06c19fb977106fc819a883889e9aa2ffefdcb9 /core/src | |
| parent | e7f982b20d8a1c0db711e0dcfe26b2f39f98dd64 (diff) | |
[SPARK-18243][SQL] Port Hive writing to use FileFormat interface
## What changes were proposed in this pull request?
Inserting data into Hive tables has its own implementation, distinct from the data source write path: `InsertIntoHiveTable`, `SparkHiveWriterContainer` and `SparkHiveDynamicPartitionWriterContainer`.
One other major difference is that data source tables write directly to the final destination, without a staging directory, and Spark itself then adds the partitions/tables to the catalog. Hive tables write to a staging directory first, and then call the Hive metastore's loadPartition/loadTable functions to load the data in. So we still need to keep `InsertIntoHiveTable` to hold this special logic. In the future, we should consider writing to the Hive table location directly, so that we don't need to call `loadTable`/`loadPartition` at the end and can remove `InsertIntoHiveTable`.
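To make the contrast concrete, here is a minimal runnable sketch of the two write flows. `writeFiles` and `loadTable` are hypothetical stand-ins, not the actual Spark or metastore APIs; the real code paths are `InsertIntoHiveTable` and the Hive client's loadTable/loadPartition calls.

```scala
import java.nio.file.{Files, Path, Paths}

// Illustrative stand-ins only; not the real Spark code paths.
object WriteFlowSketch {
  // Stand-in for producing the actual data files at `dest`.
  def writeFiles(dest: Path): Unit = Files.createDirectories(dest)

  // Stand-in for the metastore call that moves staged files into the table.
  def loadTable(staging: Path, table: String): Unit =
    println(s"metastore loads files from $staging into table $table")

  def main(args: Array[String]): Unit = {
    // Data source table: write straight to the final destination;
    // Spark itself then registers the partitions/table with the catalog.
    writeFiles(Paths.get("/tmp/warehouse/datasource_table"))

    // Hive table: write to a staging directory first, then ask the
    // metastore to load the staged files into the table.
    val staging = Paths.get("/tmp/hive-staging/hive_table")
    writeFiles(staging)
    loadTable(staging, "hive_table")
  }
}
```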
This PR removes `SparkHiveWriterContainer` and `SparkHiveDynamicPartitionWriterContainer`, and creates a `HiveFileFormat` to implement the write logic. In the future, we should also implement the read logic in `HiveFileFormat`.
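For reference, a rough sketch of the shape a write-only `FileFormat` implementation takes. The `inferSchema`/`prepareWrite` signatures follow Spark's `FileFormat` trait from this era; the class body is illustrative and is not the actual `HiveFileFormat` from the PR.

```scala
import org.apache.hadoop.fs.FileStatus
import org.apache.hadoop.mapreduce.Job
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.execution.datasources.{FileFormat, OutputWriterFactory}
import org.apache.spark.sql.types.StructType

// Sketch of a write-only FileFormat, as this PR introduces for Hive writes.
class HiveFileFormatSketch extends FileFormat {

  // Reading is not ported yet, so schema inference stays unsupported.
  override def inferSchema(
      sparkSession: SparkSession,
      options: Map[String, String],
      files: Seq[FileStatus]): Option[StructType] =
    throw new UnsupportedOperationException(
      "inferSchema is not supported by this write-only format")

  // All format-specific write setup (SerDe, output format, job config)
  // happens here; the returned factory creates one OutputWriter per task.
  override def prepareWrite(
      sparkSession: SparkSession,
      job: Job,
      options: Map[String, String],
      dataSchema: StructType): OutputWriterFactory = {
    // ... configure `job` from the table's SerDe and output format ...
    ???
  }
}
```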
## How was this patch tested?
Existing tests.
Author: Wenchen Fan <wenchen@databricks.com>
Closes #16517 from cloud-fan/insert-hive.
Diffstat (limited to 'core/src')
core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala | 2
1 file changed, 1 insertion, 1 deletion
```diff
diff --git a/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala b/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala
index b2d9b8d2a0..2f33f2e4ff 100644
--- a/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala
+++ b/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala
@@ -99,7 +99,7 @@ class HadoopMapReduceCommitProtocol(jobId: String, path: String)
   }
 
   private def getFilename(taskContext: TaskAttemptContext, ext: String): String = {
-    // The file name looks like part-r-00000-2dd664f9-d2c4-4ffe-878f-c6c70c1fb0cb_00003.gz.parquet
+    // The file name looks like part-00000-2dd664f9-d2c4-4ffe-878f-c6c70c1fb0cb_00003-c000.parquet
     // Note that %05d does not truncate the split number, so if we have more than 100000 tasks,
     // the file name is fine and won't overflow.
     val split = taskContext.getTaskAttemptID.getTaskID.getId
```
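As a side note on the updated comment, a small sketch of why `%05d` is safe: it zero-pads the split number to five digits but never truncates it, so split ids beyond 99999 still produce distinct, valid names. The format string here approximates the one in `getFilename`.

```scala
// Demonstrates that %05d pads without truncating the split number.
object FilenameDemo extends App {
  def filename(split: Int, jobId: String, ext: String): String =
    f"part-$split%05d-$jobId$ext"

  println(filename(3, "2dd664f9-d2c4-4ffe-878f-c6c70c1fb0cb", ".parquet"))
  // part-00003-2dd664f9-d2c4-4ffe-878f-c6c70c1fb0cb.parquet
  println(filename(123456, "2dd664f9-d2c4-4ffe-878f-c6c70c1fb0cb", ".parquet"))
  // part-123456-2dd664f9-d2c4-4ffe-878f-c6c70c1fb0cb.parquet (six digits, not truncated)
}
```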