aboutsummaryrefslogtreecommitdiff
path: root/sql/hive
diff options
context:
space:
mode:
authorReynold Xin <rxin@databricks.com>2016-10-20 12:18:56 -0700
committerReynold Xin <rxin@databricks.com>2016-10-20 12:18:56 -0700
commit7f9ec19eae60abe589ffd22259a9065e7e353a57 (patch)
tree304e751a63b5ec83ec4e8fa918573020890f2ae5 /sql/hive
parent947f4f25273161dc4719419a35613a71c2e2a150 (diff)
downloadspark-7f9ec19eae60abe589ffd22259a9065e7e353a57.tar.gz
spark-7f9ec19eae60abe589ffd22259a9065e7e353a57.tar.bz2
spark-7f9ec19eae60abe589ffd22259a9065e7e353a57.zip
[SPARK-18021][SQL] Refactor file name specification for data sources
## What changes were proposed in this pull request? Currently each data source OutputWriter is responsible for specifying the entire file name for each file output. This, however, does not make any sense because we rely on file naming schemes for certain behaviors in Spark SQL, e.g. bucket id. The current approach allows individual data sources to break the implementation of bucketing. On the flip side, we also don't want to move file naming entirely out of data sources, because different data sources do want to specify different extensions. This patch divides file name specification into two parts: the first part is a prefix specified by the caller of OutputWriter (in WriteOutput), and the second part is the suffix that can be specified by the OutputWriter itself. Note that a side effect of this change is that now all file based data sources also support bucketing automatically. There are also some other minor cleanups: - Removed the UUID passed through generic Configuration string - Some minor rewrites for better clarity - Renamed "path" in multiple places to "stagingDir", to more accurately reflect its meaning ## How was this patch tested? This should be covered by existing data source tests. Author: Reynold Xin <rxin@databricks.com> Closes #15562 from rxin/SPARK-18021.
Diffstat (limited to 'sql/hive')
-rw-r--r--sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileFormat.scala21
-rw-r--r--sql/hive/src/test/scala/org/apache/spark/sql/sources/BucketedWriteSuite.scala5
-rw-r--r--sql/hive/src/test/scala/org/apache/spark/sql/sources/CommitFailureTestSource.scala6
-rw-r--r--sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextRelation.scala26
4 files changed, 24 insertions, 34 deletions
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileFormat.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileFormat.scala
index 1af3280e18..1ceacb458a 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileFormat.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileFormat.scala
@@ -83,11 +83,11 @@ class OrcFileFormat extends FileFormat with DataSourceRegister with Serializable
new OutputWriterFactory {
override def newInstance(
- path: String,
- bucketId: Option[Int],
+ stagingDir: String,
+ fileNamePrefix: String,
dataSchema: StructType,
context: TaskAttemptContext): OutputWriter = {
- new OrcOutputWriter(path, bucketId, dataSchema, context)
+ new OrcOutputWriter(stagingDir, fileNamePrefix, dataSchema, context)
}
}
}
@@ -210,8 +210,8 @@ private[orc] class OrcSerializer(dataSchema: StructType, conf: Configuration)
}
private[orc] class OrcOutputWriter(
- path: String,
- bucketId: Option[Int],
+ stagingDir: String,
+ fileNamePrefix: String,
dataSchema: StructType,
context: TaskAttemptContext)
extends OutputWriter {
@@ -226,10 +226,7 @@ private[orc] class OrcOutputWriter(
private lazy val recordWriter: RecordWriter[NullWritable, Writable] = {
recordWriterInstantiated = true
- val uniqueWriteJobId = conf.get(WriterContainer.DATASOURCE_WRITEJOBUUID)
- val taskAttemptId = context.getTaskAttemptID
- val partition = taskAttemptId.getTaskID.getId
- val bucketString = bucketId.map(BucketingUtils.bucketIdToString).getOrElse("")
+
val compressionExtension = {
val name = conf.get(OrcRelation.ORC_COMPRESSION)
OrcRelation.extensionsForCompressionCodecNames.getOrElse(name, "")
@@ -237,12 +234,12 @@ private[orc] class OrcOutputWriter(
// It has the `.orc` extension at the end because (de)compression tools
// such as gunzip would not be able to decompress this as the compression
// is not applied on this whole file but on each "stream" in ORC format.
- val filename = f"part-r-$partition%05d-$uniqueWriteJobId$bucketString$compressionExtension.orc"
+ val filename = s"$fileNamePrefix$compressionExtension.orc"
new OrcOutputFormat().getRecordWriter(
- new Path(path, filename).getFileSystem(conf),
+ new Path(stagingDir, filename).getFileSystem(conf),
conf.asInstanceOf[JobConf],
- new Path(path, filename).toString,
+ new Path(stagingDir, filename).toString,
Reporter.NULL
).asInstanceOf[RecordWriter[NullWritable, Writable]]
}
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/BucketedWriteSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/sources/BucketedWriteSuite.scala
index 997445114b..2eafe18b85 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/sources/BucketedWriteSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/sources/BucketedWriteSuite.scala
@@ -54,11 +54,6 @@ class BucketedWriteSuite extends QueryTest with SQLTestUtils with TestHiveSingle
intercept[AnalysisException](df.write.bucketBy(2, "i").sortBy("j").saveAsTable("tt"))
}
- test("write bucketed data to unsupported data source") {
- val df = Seq(Tuple1("a"), Tuple1("b")).toDF("i")
- intercept[SparkException](df.write.bucketBy(3, "i").format("text").saveAsTable("tt"))
- }
-
test("write bucketed data using save()") {
val df = Seq(1 -> "a", 2 -> "b").toDF("i", "j")
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/CommitFailureTestSource.scala b/sql/hive/src/test/scala/org/apache/spark/sql/sources/CommitFailureTestSource.scala
index 5a8a7f0ab5..d504468402 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/sources/CommitFailureTestSource.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/sources/CommitFailureTestSource.scala
@@ -39,11 +39,11 @@ class CommitFailureTestSource extends SimpleTextSource {
dataSchema: StructType): OutputWriterFactory =
new OutputWriterFactory {
override def newInstance(
- path: String,
- bucketId: Option[Int],
+ stagingDir: String,
+ fileNamePrefix: String,
dataSchema: StructType,
context: TaskAttemptContext): OutputWriter = {
- new SimpleTextOutputWriter(path, context) {
+ new SimpleTextOutputWriter(stagingDir, fileNamePrefix, context) {
var failed = false
TaskContext.get().addTaskFailureListener { (t: TaskContext, e: Throwable) =>
failed = true
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextRelation.scala b/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextRelation.scala
index 906de6bbcb..9e13b217ec 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextRelation.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextRelation.scala
@@ -23,7 +23,7 @@ import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileStatus, Path}
import org.apache.hadoop.io.{NullWritable, Text}
import org.apache.hadoop.mapreduce.{Job, RecordWriter, TaskAttemptContext}
-import org.apache.hadoop.mapreduce.lib.output.{FileOutputFormat, TextOutputFormat}
+import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat
import org.apache.spark.sql.{sources, Row, SparkSession}
import org.apache.spark.sql.catalyst.{expressions, InternalRow}
@@ -51,11 +51,11 @@ class SimpleTextSource extends TextBasedFileFormat with DataSourceRegister {
SimpleTextRelation.lastHadoopConf = Option(job.getConfiguration)
new OutputWriterFactory {
override def newInstance(
- path: String,
- bucketId: Option[Int],
+ stagingDir: String,
+ fileNamePrefix: String,
dataSchema: StructType,
context: TaskAttemptContext): OutputWriter = {
- new SimpleTextOutputWriter(path, context)
+ new SimpleTextOutputWriter(stagingDir, fileNamePrefix, context)
}
}
}
@@ -120,9 +120,11 @@ class SimpleTextSource extends TextBasedFileFormat with DataSourceRegister {
}
}
-class SimpleTextOutputWriter(path: String, context: TaskAttemptContext) extends OutputWriter {
+class SimpleTextOutputWriter(
+ stagingDir: String, fileNamePrefix: String, context: TaskAttemptContext)
+ extends OutputWriter {
private val recordWriter: RecordWriter[NullWritable, Text] =
- new AppendingTextOutputFormat(new Path(path)).getRecordWriter(context)
+ new AppendingTextOutputFormat(new Path(stagingDir), fileNamePrefix).getRecordWriter(context)
override def write(row: Row): Unit = {
val serialized = row.toSeq.map { v =>
@@ -136,19 +138,15 @@ class SimpleTextOutputWriter(path: String, context: TaskAttemptContext) extends
}
}
-class AppendingTextOutputFormat(outputFile: Path) extends TextOutputFormat[NullWritable, Text] {
- val numberFormat = NumberFormat.getInstance()
+class AppendingTextOutputFormat(stagingDir: Path, fileNamePrefix: String)
+ extends TextOutputFormat[NullWritable, Text] {
+ val numberFormat = NumberFormat.getInstance()
numberFormat.setMinimumIntegerDigits(5)
numberFormat.setGroupingUsed(false)
override def getDefaultWorkFile(context: TaskAttemptContext, extension: String): Path = {
- val configuration = context.getConfiguration
- val uniqueWriteJobId = configuration.get(WriterContainer.DATASOURCE_WRITEJOBUUID)
- val taskAttemptId = context.getTaskAttemptID
- val split = taskAttemptId.getTaskID.getId
- val name = FileOutputFormat.getOutputName(context)
- new Path(outputFile, s"$name-${numberFormat.format(split)}-$uniqueWriteJobId")
+ new Path(stagingDir, fileNamePrefix)
}
}