aboutsummaryrefslogtreecommitdiff
path: root/sql/core
diff options
context:
space:
mode:
authorCheng Lian <lian@databricks.com>2015-06-22 10:03:57 -0700
committerYin Huai <yhuai@databricks.com>2015-06-22 10:03:57 -0700
commit0818fdec3733ec5c0a9caa48a9c0f2cd25f84d13 (patch)
treea36710eceeac3eec994270cde2a9ad8f1a55fa56 /sql/core
parent47c1d5629373566df9d12fdc4ceb22f38b869482 (diff)
downloadspark-0818fdec3733ec5c0a9caa48a9c0f2cd25f84d13.tar.gz
spark-0818fdec3733ec5c0a9caa48a9c0f2cd25f84d13.tar.bz2
spark-0818fdec3733ec5c0a9caa48a9c0f2cd25f84d13.zip
[SPARK-8406] [SQL] Adding UUID to output file name to avoid accidental overwriting
This PR fixes a Parquet output file name collision bug which may cause data loss. Changes made: 1. Identify each write job issued by `InsertIntoHadoopFsRelation` with a UUID All concrete data sources which extend `HadoopFsRelation` (Parquet and ORC for now) must use this UUID to generate task output file path to avoid name collision. 2. Make `TestHive` use a local mode `SparkContext` with 32 threads to increase parallelism The major reason for this is that, the original parallelism of 2 is too low to reproduce the data loss issue. Also, higher concurrency may potentially caught more concurrency bugs during testing phase. (It did help us spotted SPARK-8501.) 3. `OrcSourceSuite` was updated to workaround SPARK-8501, which we detected along the way. NOTE: This PR is made a little bit more complicated than expected because we hit two other bugs on the way and have to work them around. See [SPARK-8501] [1] and [SPARK-8513] [2]. [1]: https://github.com/liancheng/spark/tree/spark-8501 [2]: https://github.com/liancheng/spark/tree/spark-8513 ---- Some background and a summary of offline discussion with yhuai about this issue for better understanding: In 1.4.0, we added `HadoopFsRelation` to abstract partition support of all data sources that are based on Hadoop `FileSystem` interface. Specifically, this makes partition discovery, partition pruning, and writing dynamic partitions for data sources much easier. To support appending, the Parquet data source tries to find out the max part number of part-files in the destination directory (i.e., `<id>` in output file name `part-r-<id>.gz.parquet`) at the beginning of the write job. In 1.3.0, this step happens on driver side before any files are written. However, in 1.4.0, this is moved to task side. Unfortunately, for tasks scheduled later, they may see wrong max part number generated of files newly written by other finished tasks within the same job. This actually causes a race condition. In most cases, this only causes nonconsecutive part numbers in output file names. But when the DataFrame contains thousands of RDD partitions, it's likely that two tasks may choose the same part number, then one of them gets overwritten by the other. Before `HadoopFsRelation`, Spark SQL already supports appending data to Hive tables. From a user's perspective, these two look similar. However, they differ a lot internally. When data are inserted into Hive tables via Spark SQL, `InsertIntoHiveTable` simulates Hive's behaviors: 1. Write data to a temporary location 2. Move data in the temporary location to the final destination location using - `Hive.loadTable()` for non-partitioned table - `Hive.loadPartition()` for static partitions - `Hive.loadDynamicPartitions()` for dynamic partitions The important part is that, `Hive.copyFiles()` is invoked in step 2 to move the data to the destination directory (I found the name is kinda confusing since no "copying" occurs here, we are just moving and renaming stuff). If a file in the source directory and another file in the destination directory happen to have the same name, say `part-r-00001.parquet`, the former is moved to the destination directory and renamed with a `_copy_N` postfix (`part-r-00001_copy_1.parquet`). That's how Hive handles appending and avoids name collision between different write jobs. Some alternatives fixes considered for this issue: 1. Use a similar approach as Hive This approach is not preferred in Spark 1.4.0 mainly because file metadata operations in S3 tend to be slow, especially for tables with lots of file and/or partitions. That's why `InsertIntoHadoopFsRelation` just inserts to destination directory directly, and is often used together with `DirectParquetOutputCommitter` to reduce latency when working with S3. This means, we don't have the chance to do renaming, and must avoid name collision from the very beginning. 2. Same as 1.3, just move max part number detection back to driver side This isn't doable because unlike 1.3, 1.4 also takes dynamic partitioning into account. When inserting into dynamic partitions, we don't know which partition directories will be touched on driver side before issuing the write job. Checking all partition directories is simply too expensive for tables with thousands of partitions. 3. Add extra component to output file names to avoid name collision This seems to be the only reasonable solution for now. To be more specific, we need a JOB level unique identifier to identify all write jobs issued by `InsertIntoHadoopFile`. Notice that TASK level unique identifiers can NOT be used. Because in this way a speculative task will write to a different output file from the original task. If both tasks succeed, duplicate output will be left behind. Currently, the ORC data source adds `System.currentTimeMillis` to the output file name for uniqueness. This doesn't work because of exactly the same reason. That's why this PR adds a job level random UUID in `BaseWriterContainer` (which is used by `InsertIntoHadoopFsRelation` to issue write jobs). The drawback is that record order is not preserved any more (output files of a later job may be listed before those of a earlier job). However, we never promise to preserve record order when writing data, and Hive doesn't promise this either because the `_copy_N` trick breaks the order. Author: Cheng Lian <lian@databricks.com> Closes #6864 from liancheng/spark-8406 and squashes the following commits: db7a46a [Cheng Lian] More comments f5c1133 [Cheng Lian] Addresses comments 85c478e [Cheng Lian] Workarounds SPARK-8513 088c76c [Cheng Lian] Adds comment about SPARK-8501 99a5e7e [Cheng Lian] Uses job level UUID in SimpleTextRelation and avoids double task abortion 4088226 [Cheng Lian] Works around SPARK-8501 1d7d206 [Cheng Lian] Adds more logs 8966bbb [Cheng Lian] Fixes Scala style issue 18b7003 [Cheng Lian] Uses job level UUID to take speculative tasks into account 3806190 [Cheng Lian] Lets TestHive use all cores by default 748dbd7 [Cheng Lian] Adding UUID to output file name to avoid accidental overwriting
Diffstat (limited to 'sql/core')
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala43
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala64
2 files changed, 58 insertions, 49 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
index c9de45e0dd..e049d54bf5 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
@@ -42,7 +42,7 @@ import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.sources._
import org.apache.spark.sql.types.{DataType, StructType}
import org.apache.spark.util.{SerializableConfiguration, Utils}
-import org.apache.spark.{Logging, SparkException, Partition => SparkPartition}
+import org.apache.spark.{Logging, Partition => SparkPartition, SparkException}
private[sql] class DefaultSource extends HadoopFsRelationProvider {
override def createRelation(
@@ -60,50 +60,21 @@ private[sql] class ParquetOutputWriter(path: String, context: TaskAttemptContext
extends OutputWriter {
private val recordWriter: RecordWriter[Void, InternalRow] = {
- val conf = context.getConfiguration
val outputFormat = {
- // When appending new Parquet files to an existing Parquet file directory, to avoid
- // overwriting existing data files, we need to find out the max task ID encoded in these data
- // file names.
- // TODO Make this snippet a utility function for other data source developers
- val maxExistingTaskId = {
- // Note that `path` may point to a temporary location. Here we retrieve the real
- // destination path from the configuration
- val outputPath = new Path(conf.get("spark.sql.sources.output.path"))
- val fs = outputPath.getFileSystem(conf)
-
- if (fs.exists(outputPath)) {
- // Pattern used to match task ID in part file names, e.g.:
- //
- // part-r-00001.gz.parquet
- // ^~~~~
- val partFilePattern = """part-.-(\d{1,}).*""".r
-
- fs.listStatus(outputPath).map(_.getPath.getName).map {
- case partFilePattern(id) => id.toInt
- case name if name.startsWith("_") => 0
- case name if name.startsWith(".") => 0
- case name => throw new AnalysisException(
- s"Trying to write Parquet files to directory $outputPath, " +
- s"but found items with illegal name '$name'.")
- }.reduceOption(_ max _).getOrElse(0)
- } else {
- 0
- }
- }
-
new ParquetOutputFormat[InternalRow]() {
// Here we override `getDefaultWorkFile` for two reasons:
//
- // 1. To allow appending. We need to generate output file name based on the max available
- // task ID computed above.
+ // 1. To allow appending. We need to generate unique output file names to avoid
+ // overwriting existing files (either exist before the write job, or are just written
+ // by other tasks within the same write job).
//
// 2. To allow dynamic partitioning. Default `getDefaultWorkFile` uses
// `FileOutputCommitter.getWorkPath()`, which points to the base directory of all
// partitions in the case of dynamic partitioning.
override def getDefaultWorkFile(context: TaskAttemptContext, extension: String): Path = {
- val split = context.getTaskAttemptID.getTaskID.getId + maxExistingTaskId + 1
- new Path(path, f"part-r-$split%05d$extension")
+ val uniqueWriteJobId = context.getConfiguration.get("spark.sql.sources.writeJobUUID")
+ val split = context.getTaskAttemptID.getTaskID.getId
+ new Path(path, f"part-r-$split%05d-$uniqueWriteJobId$extension")
}
}
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala
index c16bd9ae52..215e53c020 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala
@@ -17,14 +17,13 @@
package org.apache.spark.sql.sources
-import java.util.Date
+import java.util.{Date, UUID}
import scala.collection.mutable
import org.apache.hadoop.fs.Path
import org.apache.hadoop.mapreduce._
-import org.apache.hadoop.mapreduce.lib.output.{FileOutputFormat, FileOutputCommitter => MapReduceFileOutputCommitter}
-import org.apache.parquet.hadoop.util.ContextUtil
+import org.apache.hadoop.mapreduce.lib.output.{FileOutputCommitter => MapReduceFileOutputCommitter, FileOutputFormat}
import org.apache.spark._
import org.apache.spark.mapred.SparkHadoopMapRedUtil
@@ -59,6 +58,28 @@ private[sql] case class InsertIntoDataSource(
}
}
+/**
+ * A command for writing data to a [[HadoopFsRelation]]. Supports both overwriting and appending.
+ * Writing to dynamic partitions is also supported. Each [[InsertIntoHadoopFsRelation]] issues a
+ * single write job, and owns a UUID that identifies this job. Each concrete implementation of
+ * [[HadoopFsRelation]] should use this UUID together with task id to generate unique file path for
+ * each task output file. This UUID is passed to executor side via a property named
+ * `spark.sql.sources.writeJobUUID`.
+ *
+ * Different writer containers, [[DefaultWriterContainer]] and [[DynamicPartitionWriterContainer]]
+ * are used to write to normal tables and tables with dynamic partitions.
+ *
+ * Basic work flow of this command is:
+ *
+ * 1. Driver side setup, including output committer initialization and data source specific
+ * preparation work for the write job to be issued.
+ * 2. Issues a write job consists of one or more executor side tasks, each of which writes all
+ * rows within an RDD partition.
+ * 3. If no exception is thrown in a task, commits that task, otherwise aborts that task; If any
+ * exception is thrown during task commitment, also aborts that task.
+ * 4. If all tasks are committed, commit the job, otherwise aborts the job; If any exception is
+ * thrown during job commitment, also aborts the job.
+ */
private[sql] case class InsertIntoHadoopFsRelation(
@transient relation: HadoopFsRelation,
@transient query: LogicalPlan,
@@ -261,7 +282,14 @@ private[sql] abstract class BaseWriterContainer(
with Logging
with Serializable {
- protected val serializableConf = new SerializableConfiguration(ContextUtil.getConfiguration(job))
+ protected val serializableConf = new SerializableConfiguration(job.getConfiguration)
+
+ // This UUID is used to avoid output file name collision between different appending write jobs.
+ // These jobs may belong to different SparkContext instances. Concrete data source implementations
+ // may use this UUID to generate unique file names (e.g., `part-r-<task-id>-<job-uuid>.parquet`).
+ // The reason why this ID is used to identify a job rather than a single task output file is
+ // that, speculative tasks must generate the same output file name as the original task.
+ private val uniqueWriteJobId = UUID.randomUUID()
// This is only used on driver side.
@transient private val jobContext: JobContext = job
@@ -290,6 +318,11 @@ private[sql] abstract class BaseWriterContainer(
setupIDs(0, 0, 0)
setupConf()
+ // This UUID is sent to executor side together with the serialized `Configuration` object within
+ // the `Job` instance. `OutputWriters` on the executor side should use this UUID to generate
+ // unique task output files.
+ job.getConfiguration.set("spark.sql.sources.writeJobUUID", uniqueWriteJobId.toString)
+
// Order of the following two lines is important. For Hadoop 1, TaskAttemptContext constructor
// clones the Configuration object passed in. If we initialize the TaskAttemptContext first,
// configurations made in prepareJobForWrite(job) are not populated into the TaskAttemptContext.
@@ -417,15 +450,16 @@ private[sql] class DefaultWriterContainer(
assert(writer != null, "OutputWriter instance should have been initialized")
writer.close()
super.commitTask()
- } catch {
- case cause: Throwable =>
- super.abortTask()
- throw new RuntimeException("Failed to commit task", cause)
+ } catch { case cause: Throwable =>
+ // This exception will be handled in `InsertIntoHadoopFsRelation.insert$writeRows`, and will
+ // cause `abortTask()` to be invoked.
+ throw new RuntimeException("Failed to commit task", cause)
}
}
override def abortTask(): Unit = {
try {
+ // It's possible that the task fails before `writer` gets initialized
if (writer != null) {
writer.close()
}
@@ -469,21 +503,25 @@ private[sql] class DynamicPartitionWriterContainer(
})
}
- override def commitTask(): Unit = {
- try {
+ private def clearOutputWriters(): Unit = {
+ if (outputWriters.nonEmpty) {
outputWriters.values.foreach(_.close())
outputWriters.clear()
+ }
+ }
+
+ override def commitTask(): Unit = {
+ try {
+ clearOutputWriters()
super.commitTask()
} catch { case cause: Throwable =>
- super.abortTask()
throw new RuntimeException("Failed to commit task", cause)
}
}
override def abortTask(): Unit = {
try {
- outputWriters.values.foreach(_.close())
- outputWriters.clear()
+ clearOutputWriters()
} finally {
super.abortTask()
}