aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorCheng Lian <lian@databricks.com>2015-06-22 10:03:57 -0700
committerYin Huai <yhuai@databricks.com>2015-06-22 10:03:57 -0700
commit0818fdec3733ec5c0a9caa48a9c0f2cd25f84d13 (patch)
treea36710eceeac3eec994270cde2a9ad8f1a55fa56
parent47c1d5629373566df9d12fdc4ceb22f38b869482 (diff)
downloadspark-0818fdec3733ec5c0a9caa48a9c0f2cd25f84d13.tar.gz
spark-0818fdec3733ec5c0a9caa48a9c0f2cd25f84d13.tar.bz2
spark-0818fdec3733ec5c0a9caa48a9c0f2cd25f84d13.zip
[SPARK-8406] [SQL] Adding UUID to output file name to avoid accidental overwriting
This PR fixes a Parquet output file name collision bug which may cause data loss. Changes made: 1. Identify each write job issued by `InsertIntoHadoopFsRelation` with a UUID All concrete data sources which extend `HadoopFsRelation` (Parquet and ORC for now) must use this UUID to generate task output file path to avoid name collision. 2. Make `TestHive` use a local mode `SparkContext` with 32 threads to increase parallelism The major reason for this is that, the original parallelism of 2 is too low to reproduce the data loss issue. Also, higher concurrency may potentially caught more concurrency bugs during testing phase. (It did help us spotted SPARK-8501.) 3. `OrcSourceSuite` was updated to workaround SPARK-8501, which we detected along the way. NOTE: This PR is made a little bit more complicated than expected because we hit two other bugs on the way and have to work them around. See [SPARK-8501] [1] and [SPARK-8513] [2]. [1]: https://github.com/liancheng/spark/tree/spark-8501 [2]: https://github.com/liancheng/spark/tree/spark-8513 ---- Some background and a summary of offline discussion with yhuai about this issue for better understanding: In 1.4.0, we added `HadoopFsRelation` to abstract partition support of all data sources that are based on Hadoop `FileSystem` interface. Specifically, this makes partition discovery, partition pruning, and writing dynamic partitions for data sources much easier. To support appending, the Parquet data source tries to find out the max part number of part-files in the destination directory (i.e., `<id>` in output file name `part-r-<id>.gz.parquet`) at the beginning of the write job. In 1.3.0, this step happens on driver side before any files are written. However, in 1.4.0, this is moved to task side. Unfortunately, for tasks scheduled later, they may see wrong max part number generated of files newly written by other finished tasks within the same job. This actually causes a race condition. In most cases, this only causes nonconsecutive part numbers in output file names. But when the DataFrame contains thousands of RDD partitions, it's likely that two tasks may choose the same part number, then one of them gets overwritten by the other. Before `HadoopFsRelation`, Spark SQL already supports appending data to Hive tables. From a user's perspective, these two look similar. However, they differ a lot internally. When data are inserted into Hive tables via Spark SQL, `InsertIntoHiveTable` simulates Hive's behaviors: 1. Write data to a temporary location 2. Move data in the temporary location to the final destination location using - `Hive.loadTable()` for non-partitioned table - `Hive.loadPartition()` for static partitions - `Hive.loadDynamicPartitions()` for dynamic partitions The important part is that, `Hive.copyFiles()` is invoked in step 2 to move the data to the destination directory (I found the name is kinda confusing since no "copying" occurs here, we are just moving and renaming stuff). If a file in the source directory and another file in the destination directory happen to have the same name, say `part-r-00001.parquet`, the former is moved to the destination directory and renamed with a `_copy_N` postfix (`part-r-00001_copy_1.parquet`). That's how Hive handles appending and avoids name collision between different write jobs. Some alternatives fixes considered for this issue: 1. Use a similar approach as Hive This approach is not preferred in Spark 1.4.0 mainly because file metadata operations in S3 tend to be slow, especially for tables with lots of file and/or partitions. That's why `InsertIntoHadoopFsRelation` just inserts to destination directory directly, and is often used together with `DirectParquetOutputCommitter` to reduce latency when working with S3. This means, we don't have the chance to do renaming, and must avoid name collision from the very beginning. 2. Same as 1.3, just move max part number detection back to driver side This isn't doable because unlike 1.3, 1.4 also takes dynamic partitioning into account. When inserting into dynamic partitions, we don't know which partition directories will be touched on driver side before issuing the write job. Checking all partition directories is simply too expensive for tables with thousands of partitions. 3. Add extra component to output file names to avoid name collision This seems to be the only reasonable solution for now. To be more specific, we need a JOB level unique identifier to identify all write jobs issued by `InsertIntoHadoopFile`. Notice that TASK level unique identifiers can NOT be used. Because in this way a speculative task will write to a different output file from the original task. If both tasks succeed, duplicate output will be left behind. Currently, the ORC data source adds `System.currentTimeMillis` to the output file name for uniqueness. This doesn't work because of exactly the same reason. That's why this PR adds a job level random UUID in `BaseWriterContainer` (which is used by `InsertIntoHadoopFsRelation` to issue write jobs). The drawback is that record order is not preserved any more (output files of a later job may be listed before those of a earlier job). However, we never promise to preserve record order when writing data, and Hive doesn't promise this either because the `_copy_N` trick breaks the order. Author: Cheng Lian <lian@databricks.com> Closes #6864 from liancheng/spark-8406 and squashes the following commits: db7a46a [Cheng Lian] More comments f5c1133 [Cheng Lian] Addresses comments 85c478e [Cheng Lian] Workarounds SPARK-8513 088c76c [Cheng Lian] Adds comment about SPARK-8501 99a5e7e [Cheng Lian] Uses job level UUID in SimpleTextRelation and avoids double task abortion 4088226 [Cheng Lian] Works around SPARK-8501 1d7d206 [Cheng Lian] Adds more logs 8966bbb [Cheng Lian] Fixes Scala style issue 18b7003 [Cheng Lian] Uses job level UUID to take speculative tasks into account 3806190 [Cheng Lian] Lets TestHive use all cores by default 748dbd7 [Cheng Lian] Adding UUID to output file name to avoid accidental overwriting
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala43
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala64
-rw-r--r--sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileOperator.scala9
-rw-r--r--sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala5
-rw-r--r--sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala2
-rw-r--r--sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcSourceSuite.scala28
-rw-r--r--sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextRelation.scala4
-rw-r--r--sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala37
8 files changed, 120 insertions, 72 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
index c9de45e0dd..e049d54bf5 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
@@ -42,7 +42,7 @@ import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.sources._
import org.apache.spark.sql.types.{DataType, StructType}
import org.apache.spark.util.{SerializableConfiguration, Utils}
-import org.apache.spark.{Logging, SparkException, Partition => SparkPartition}
+import org.apache.spark.{Logging, Partition => SparkPartition, SparkException}
private[sql] class DefaultSource extends HadoopFsRelationProvider {
override def createRelation(
@@ -60,50 +60,21 @@ private[sql] class ParquetOutputWriter(path: String, context: TaskAttemptContext
extends OutputWriter {
private val recordWriter: RecordWriter[Void, InternalRow] = {
- val conf = context.getConfiguration
val outputFormat = {
- // When appending new Parquet files to an existing Parquet file directory, to avoid
- // overwriting existing data files, we need to find out the max task ID encoded in these data
- // file names.
- // TODO Make this snippet a utility function for other data source developers
- val maxExistingTaskId = {
- // Note that `path` may point to a temporary location. Here we retrieve the real
- // destination path from the configuration
- val outputPath = new Path(conf.get("spark.sql.sources.output.path"))
- val fs = outputPath.getFileSystem(conf)
-
- if (fs.exists(outputPath)) {
- // Pattern used to match task ID in part file names, e.g.:
- //
- // part-r-00001.gz.parquet
- // ^~~~~
- val partFilePattern = """part-.-(\d{1,}).*""".r
-
- fs.listStatus(outputPath).map(_.getPath.getName).map {
- case partFilePattern(id) => id.toInt
- case name if name.startsWith("_") => 0
- case name if name.startsWith(".") => 0
- case name => throw new AnalysisException(
- s"Trying to write Parquet files to directory $outputPath, " +
- s"but found items with illegal name '$name'.")
- }.reduceOption(_ max _).getOrElse(0)
- } else {
- 0
- }
- }
-
new ParquetOutputFormat[InternalRow]() {
// Here we override `getDefaultWorkFile` for two reasons:
//
- // 1. To allow appending. We need to generate output file name based on the max available
- // task ID computed above.
+ // 1. To allow appending. We need to generate unique output file names to avoid
+ // overwriting existing files (either exist before the write job, or are just written
+ // by other tasks within the same write job).
//
// 2. To allow dynamic partitioning. Default `getDefaultWorkFile` uses
// `FileOutputCommitter.getWorkPath()`, which points to the base directory of all
// partitions in the case of dynamic partitioning.
override def getDefaultWorkFile(context: TaskAttemptContext, extension: String): Path = {
- val split = context.getTaskAttemptID.getTaskID.getId + maxExistingTaskId + 1
- new Path(path, f"part-r-$split%05d$extension")
+ val uniqueWriteJobId = context.getConfiguration.get("spark.sql.sources.writeJobUUID")
+ val split = context.getTaskAttemptID.getTaskID.getId
+ new Path(path, f"part-r-$split%05d-$uniqueWriteJobId$extension")
}
}
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala
index c16bd9ae52..215e53c020 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala
@@ -17,14 +17,13 @@
package org.apache.spark.sql.sources
-import java.util.Date
+import java.util.{Date, UUID}
import scala.collection.mutable
import org.apache.hadoop.fs.Path
import org.apache.hadoop.mapreduce._
-import org.apache.hadoop.mapreduce.lib.output.{FileOutputFormat, FileOutputCommitter => MapReduceFileOutputCommitter}
-import org.apache.parquet.hadoop.util.ContextUtil
+import org.apache.hadoop.mapreduce.lib.output.{FileOutputCommitter => MapReduceFileOutputCommitter, FileOutputFormat}
import org.apache.spark._
import org.apache.spark.mapred.SparkHadoopMapRedUtil
@@ -59,6 +58,28 @@ private[sql] case class InsertIntoDataSource(
}
}
+/**
+ * A command for writing data to a [[HadoopFsRelation]]. Supports both overwriting and appending.
+ * Writing to dynamic partitions is also supported. Each [[InsertIntoHadoopFsRelation]] issues a
+ * single write job, and owns a UUID that identifies this job. Each concrete implementation of
+ * [[HadoopFsRelation]] should use this UUID together with task id to generate unique file path for
+ * each task output file. This UUID is passed to executor side via a property named
+ * `spark.sql.sources.writeJobUUID`.
+ *
+ * Different writer containers, [[DefaultWriterContainer]] and [[DynamicPartitionWriterContainer]]
+ * are used to write to normal tables and tables with dynamic partitions.
+ *
+ * Basic work flow of this command is:
+ *
+ * 1. Driver side setup, including output committer initialization and data source specific
+ * preparation work for the write job to be issued.
+ * 2. Issues a write job consists of one or more executor side tasks, each of which writes all
+ * rows within an RDD partition.
+ * 3. If no exception is thrown in a task, commits that task, otherwise aborts that task; If any
+ * exception is thrown during task commitment, also aborts that task.
+ * 4. If all tasks are committed, commit the job, otherwise aborts the job; If any exception is
+ * thrown during job commitment, also aborts the job.
+ */
private[sql] case class InsertIntoHadoopFsRelation(
@transient relation: HadoopFsRelation,
@transient query: LogicalPlan,
@@ -261,7 +282,14 @@ private[sql] abstract class BaseWriterContainer(
with Logging
with Serializable {
- protected val serializableConf = new SerializableConfiguration(ContextUtil.getConfiguration(job))
+ protected val serializableConf = new SerializableConfiguration(job.getConfiguration)
+
+ // This UUID is used to avoid output file name collision between different appending write jobs.
+ // These jobs may belong to different SparkContext instances. Concrete data source implementations
+ // may use this UUID to generate unique file names (e.g., `part-r-<task-id>-<job-uuid>.parquet`).
+ // The reason why this ID is used to identify a job rather than a single task output file is
+ // that, speculative tasks must generate the same output file name as the original task.
+ private val uniqueWriteJobId = UUID.randomUUID()
// This is only used on driver side.
@transient private val jobContext: JobContext = job
@@ -290,6 +318,11 @@ private[sql] abstract class BaseWriterContainer(
setupIDs(0, 0, 0)
setupConf()
+ // This UUID is sent to executor side together with the serialized `Configuration` object within
+ // the `Job` instance. `OutputWriters` on the executor side should use this UUID to generate
+ // unique task output files.
+ job.getConfiguration.set("spark.sql.sources.writeJobUUID", uniqueWriteJobId.toString)
+
// Order of the following two lines is important. For Hadoop 1, TaskAttemptContext constructor
// clones the Configuration object passed in. If we initialize the TaskAttemptContext first,
// configurations made in prepareJobForWrite(job) are not populated into the TaskAttemptContext.
@@ -417,15 +450,16 @@ private[sql] class DefaultWriterContainer(
assert(writer != null, "OutputWriter instance should have been initialized")
writer.close()
super.commitTask()
- } catch {
- case cause: Throwable =>
- super.abortTask()
- throw new RuntimeException("Failed to commit task", cause)
+ } catch { case cause: Throwable =>
+ // This exception will be handled in `InsertIntoHadoopFsRelation.insert$writeRows`, and will
+ // cause `abortTask()` to be invoked.
+ throw new RuntimeException("Failed to commit task", cause)
}
}
override def abortTask(): Unit = {
try {
+ // It's possible that the task fails before `writer` gets initialized
if (writer != null) {
writer.close()
}
@@ -469,21 +503,25 @@ private[sql] class DynamicPartitionWriterContainer(
})
}
- override def commitTask(): Unit = {
- try {
+ private def clearOutputWriters(): Unit = {
+ if (outputWriters.nonEmpty) {
outputWriters.values.foreach(_.close())
outputWriters.clear()
+ }
+ }
+
+ override def commitTask(): Unit = {
+ try {
+ clearOutputWriters()
super.commitTask()
} catch { case cause: Throwable =>
- super.abortTask()
throw new RuntimeException("Failed to commit task", cause)
}
}
override def abortTask(): Unit = {
try {
- outputWriters.values.foreach(_.close())
- outputWriters.clear()
+ clearOutputWriters()
} finally {
super.abortTask()
}
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileOperator.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileOperator.scala
index 1e51173a19..e3ab9442b4 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileOperator.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileOperator.scala
@@ -27,13 +27,13 @@ import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.sql.hive.HiveMetastoreTypes
import org.apache.spark.sql.types.StructType
-private[orc] object OrcFileOperator extends Logging{
+private[orc] object OrcFileOperator extends Logging {
def getFileReader(pathStr: String, config: Option[Configuration] = None ): Reader = {
val conf = config.getOrElse(new Configuration)
val fspath = new Path(pathStr)
val fs = fspath.getFileSystem(conf)
val orcFiles = listOrcFiles(pathStr, conf)
-
+ logDebug(s"Creating ORC Reader from ${orcFiles.head}")
// TODO Need to consider all files when schema evolution is taken into account.
OrcFile.createReader(fs, orcFiles.head)
}
@@ -42,6 +42,7 @@ private[orc] object OrcFileOperator extends Logging{
val reader = getFileReader(path, conf)
val readerInspector = reader.getObjectInspector.asInstanceOf[StructObjectInspector]
val schema = readerInspector.getTypeName
+ logDebug(s"Reading schema from file $path, got Hive schema string: $schema")
HiveMetastoreTypes.toDataType(schema).asInstanceOf[StructType]
}
@@ -52,14 +53,14 @@ private[orc] object OrcFileOperator extends Logging{
def listOrcFiles(pathStr: String, conf: Configuration): Seq[Path] = {
val origPath = new Path(pathStr)
val fs = origPath.getFileSystem(conf)
- val path = origPath.makeQualified(fs)
+ val path = origPath.makeQualified(fs.getUri, fs.getWorkingDirectory)
val paths = SparkHadoopUtil.get.listLeafStatuses(fs, origPath)
.filterNot(_.isDir)
.map(_.getPath)
.filterNot(_.getName.startsWith("_"))
.filterNot(_.getName.startsWith("."))
- if (paths == null || paths.size == 0) {
+ if (paths == null || paths.isEmpty) {
throw new IllegalArgumentException(
s"orcFileOperator: path $path does not have valid orc files matching the pattern")
}
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala
index dbce39f21d..705f48f1cd 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala
@@ -31,6 +31,7 @@ import org.apache.hadoop.mapred.{InputFormat => MapRedInputFormat, JobConf, Reco
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat
import org.apache.hadoop.mapreduce.{Job, TaskAttemptContext}
+import org.apache.spark.Logging
import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.mapred.SparkHadoopMapRedUtil
import org.apache.spark.rdd.{HadoopRDD, RDD}
@@ -39,7 +40,6 @@ import org.apache.spark.sql.hive.{HiveContext, HiveInspectors, HiveMetastoreType
import org.apache.spark.sql.sources.{Filter, _}
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.{Row, SQLContext}
-import org.apache.spark.{Logging}
import org.apache.spark.util.SerializableConfiguration
/* Implicit conversions */
@@ -105,8 +105,9 @@ private[orc] class OrcOutputWriter(
recordWriterInstantiated = true
val conf = context.getConfiguration
+ val uniqueWriteJobId = conf.get("spark.sql.sources.writeJobUUID")
val partition = context.getTaskAttemptID.getTaskID.getId
- val filename = f"part-r-$partition%05d-${System.currentTimeMillis}%015d.orc"
+ val filename = f"part-r-$partition%05d-$uniqueWriteJobId.orc"
new OrcOutputFormat().getRecordWriter(
new Path(path, filename).getFileSystem(conf),
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala
index f901bd8171..ea325cc93c 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala
@@ -49,7 +49,7 @@ import scala.collection.JavaConversions._
object TestHive
extends TestHiveContext(
new SparkContext(
- System.getProperty("spark.sql.test.master", "local[2]"),
+ System.getProperty("spark.sql.test.master", "local[32]"),
"TestSQLContext",
new SparkConf()
.set("spark.sql.test", "")
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcSourceSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcSourceSuite.scala
index 82e08caf46..a0cdd0db42 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcSourceSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcSourceSuite.scala
@@ -43,8 +43,14 @@ abstract class OrcSuite extends QueryTest with BeforeAndAfterAll {
orcTableDir.mkdir()
import org.apache.spark.sql.hive.test.TestHive.implicits._
+ // Originally we were using a 10-row RDD for testing. However, when default parallelism is
+ // greater than 10 (e.g., running on a node with 32 cores), this RDD contains empty partitions,
+ // which result in empty ORC files. Unfortunately, ORC doesn't handle empty files properly and
+ // causes build failure on Jenkins, which happens to have 32 cores. Please refer to SPARK-8501
+ // for more details. To workaround this issue before fixing SPARK-8501, we simply increase row
+ // number in this RDD to avoid empty partitions.
sparkContext
- .makeRDD(1 to 10)
+ .makeRDD(1 to 100)
.map(i => OrcData(i, s"part-$i"))
.toDF()
.registerTempTable(s"orc_temp_table")
@@ -70,35 +76,35 @@ abstract class OrcSuite extends QueryTest with BeforeAndAfterAll {
}
test("create temporary orc table") {
- checkAnswer(sql("SELECT COUNT(*) FROM normal_orc_source"), Row(10))
+ checkAnswer(sql("SELECT COUNT(*) FROM normal_orc_source"), Row(100))
checkAnswer(
sql("SELECT * FROM normal_orc_source"),
- (1 to 10).map(i => Row(i, s"part-$i")))
+ (1 to 100).map(i => Row(i, s"part-$i")))
checkAnswer(
sql("SELECT * FROM normal_orc_source where intField > 5"),
- (6 to 10).map(i => Row(i, s"part-$i")))
+ (6 to 100).map(i => Row(i, s"part-$i")))
checkAnswer(
sql("SELECT COUNT(intField), stringField FROM normal_orc_source GROUP BY stringField"),
- (1 to 10).map(i => Row(1, s"part-$i")))
+ (1 to 100).map(i => Row(1, s"part-$i")))
}
test("create temporary orc table as") {
- checkAnswer(sql("SELECT COUNT(*) FROM normal_orc_as_source"), Row(10))
+ checkAnswer(sql("SELECT COUNT(*) FROM normal_orc_as_source"), Row(100))
checkAnswer(
sql("SELECT * FROM normal_orc_source"),
- (1 to 10).map(i => Row(i, s"part-$i")))
+ (1 to 100).map(i => Row(i, s"part-$i")))
checkAnswer(
sql("SELECT * FROM normal_orc_source WHERE intField > 5"),
- (6 to 10).map(i => Row(i, s"part-$i")))
+ (6 to 100).map(i => Row(i, s"part-$i")))
checkAnswer(
sql("SELECT COUNT(intField), stringField FROM normal_orc_source GROUP BY stringField"),
- (1 to 10).map(i => Row(1, s"part-$i")))
+ (1 to 100).map(i => Row(1, s"part-$i")))
}
test("appending insert") {
@@ -106,7 +112,7 @@ abstract class OrcSuite extends QueryTest with BeforeAndAfterAll {
checkAnswer(
sql("SELECT * FROM normal_orc_source"),
- (1 to 5).map(i => Row(i, s"part-$i")) ++ (6 to 10).flatMap { i =>
+ (1 to 5).map(i => Row(i, s"part-$i")) ++ (6 to 100).flatMap { i =>
Seq.fill(2)(Row(i, s"part-$i"))
})
}
@@ -119,7 +125,7 @@ abstract class OrcSuite extends QueryTest with BeforeAndAfterAll {
checkAnswer(
sql("SELECT * FROM normal_orc_as_source"),
- (6 to 10).map(i => Row(i, s"part-$i")))
+ (6 to 100).map(i => Row(i, s"part-$i")))
}
}
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextRelation.scala b/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextRelation.scala
index 0f959b3d0b..5d7cd16c12 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextRelation.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextRelation.scala
@@ -53,9 +53,10 @@ class AppendingTextOutputFormat(outputFile: Path) extends TextOutputFormat[NullW
numberFormat.setGroupingUsed(false)
override def getDefaultWorkFile(context: TaskAttemptContext, extension: String): Path = {
+ val uniqueWriteJobId = context.getConfiguration.get("spark.sql.sources.writeJobUUID")
val split = context.getTaskAttemptID.getTaskID.getId
val name = FileOutputFormat.getOutputName(context)
- new Path(outputFile, s"$name-${numberFormat.format(split)}-${UUID.randomUUID()}")
+ new Path(outputFile, s"$name-${numberFormat.format(split)}-$uniqueWriteJobId")
}
}
@@ -156,6 +157,7 @@ class CommitFailureTestRelation(
context: TaskAttemptContext): OutputWriter = {
new SimpleTextOutputWriter(path, context) {
override def close(): Unit = {
+ super.close()
sys.error("Intentional task commitment failure for testing purpose.")
}
}
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala b/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala
index 76469d7a3d..e0d8277a8e 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala
@@ -35,7 +35,7 @@ abstract class HadoopFsRelationTest extends QueryTest with SQLTestUtils {
import sqlContext.sql
import sqlContext.implicits._
- val dataSourceName = classOf[SimpleTextSource].getCanonicalName
+ val dataSourceName: String
val dataSchema =
StructType(
@@ -470,6 +470,33 @@ abstract class HadoopFsRelationTest extends QueryTest with SQLTestUtils {
checkAnswer(sqlContext.table("t"), df.select('b, 'c, 'a).collect())
}
}
+
+ // NOTE: This test suite is not super deterministic. On nodes with only relatively few cores
+ // (4 or even 1), it's hard to reproduce the data loss issue. But on nodes with for example 8 or
+ // more cores, the issue can be reproduced steadily. Fortunately our Jenkins builder meets this
+ // requirement. We probably want to move this test case to spark-integration-tests or spark-perf
+ // later.
+ test("SPARK-8406: Avoids name collision while writing Parquet files") {
+ withTempPath { dir =>
+ val path = dir.getCanonicalPath
+ sqlContext
+ .range(10000)
+ .repartition(250)
+ .write
+ .mode(SaveMode.Overwrite)
+ .format(dataSourceName)
+ .save(path)
+
+ assertResult(10000) {
+ sqlContext
+ .read
+ .format(dataSourceName)
+ .option("dataSchema", StructType(StructField("id", LongType) :: Nil).json)
+ .load(path)
+ .count()
+ }
+ }
+ }
}
class SimpleTextHadoopFsRelationSuite extends HadoopFsRelationTest {
@@ -502,15 +529,17 @@ class SimpleTextHadoopFsRelationSuite extends HadoopFsRelationTest {
}
class CommitFailureTestRelationSuite extends SparkFunSuite with SQLTestUtils {
- import TestHive.implicits._
-
override val sqlContext = TestHive
+ // When committing a task, `CommitFailureTestSource` throws an exception for testing purpose.
val dataSourceName: String = classOf[CommitFailureTestSource].getCanonicalName
test("SPARK-7684: commitTask() failure should fallback to abortTask()") {
withTempPath { file =>
- val df = (1 to 3).map(i => i -> s"val_$i").toDF("a", "b")
+ // Here we coalesce partition number to 1 to ensure that only a single task is issued. This
+ // prevents race condition happened when FileOutputCommitter tries to remove the `_temporary`
+ // directory while committing/aborting the job. See SPARK-8513 for more details.
+ val df = sqlContext.range(0, 10).coalesce(1)
intercept[SparkException] {
df.write.format(dataSourceName).save(file.getCanonicalPath)
}