 sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala             | 43
 sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala               | 64
 sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileOperator.scala       |  9
 sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala           |  5
 sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala             |  2
 sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcSourceSuite.scala        | 28
 sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextRelation.scala     |  4
 sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala | 37
 8 files changed, 120 insertions(+), 72 deletions(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
index c9de45e0dd..e049d54bf5 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
@@ -42,7 +42,7 @@ import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.sources._
import org.apache.spark.sql.types.{DataType, StructType}
import org.apache.spark.util.{SerializableConfiguration, Utils}
-import org.apache.spark.{Logging, SparkException, Partition => SparkPartition}
+import org.apache.spark.{Logging, Partition => SparkPartition, SparkException}
private[sql] class DefaultSource extends HadoopFsRelationProvider {
override def createRelation(
@@ -60,50 +60,21 @@ private[sql] class ParquetOutputWriter(path: String, context: TaskAttemptContext
extends OutputWriter {
private val recordWriter: RecordWriter[Void, InternalRow] = {
- val conf = context.getConfiguration
val outputFormat = {
- // When appending new Parquet files to an existing Parquet file directory, to avoid
- // overwriting existing data files, we need to find out the max task ID encoded in these data
- // file names.
- // TODO Make this snippet a utility function for other data source developers
- val maxExistingTaskId = {
- // Note that `path` may point to a temporary location. Here we retrieve the real
- // destination path from the configuration
- val outputPath = new Path(conf.get("spark.sql.sources.output.path"))
- val fs = outputPath.getFileSystem(conf)
-
- if (fs.exists(outputPath)) {
- // Pattern used to match task ID in part file names, e.g.:
- //
- // part-r-00001.gz.parquet
- // ^~~~~
- val partFilePattern = """part-.-(\d{1,}).*""".r
-
- fs.listStatus(outputPath).map(_.getPath.getName).map {
- case partFilePattern(id) => id.toInt
- case name if name.startsWith("_") => 0
- case name if name.startsWith(".") => 0
- case name => throw new AnalysisException(
- s"Trying to write Parquet files to directory $outputPath, " +
- s"but found items with illegal name '$name'.")
- }.reduceOption(_ max _).getOrElse(0)
- } else {
- 0
- }
- }
-
new ParquetOutputFormat[InternalRow]() {
// Here we override `getDefaultWorkFile` for two reasons:
//
- // 1. To allow appending. We need to generate output file name based on the max available
- // task ID computed above.
+ // 1. To allow appending. We need to generate unique output file names to avoid
+ //    overwriting existing files (files that either existed before the write job or are
+ //    being written by other tasks within the same write job).
//
// 2. To allow dynamic partitioning. Default `getDefaultWorkFile` uses
// `FileOutputCommitter.getWorkPath()`, which points to the base directory of all
// partitions in the case of dynamic partitioning.
override def getDefaultWorkFile(context: TaskAttemptContext, extension: String): Path = {
- val split = context.getTaskAttemptID.getTaskID.getId + maxExistingTaskId + 1
- new Path(path, f"part-r-$split%05d$extension")
+ val uniqueWriteJobId = context.getConfiguration.get("spark.sql.sources.writeJobUUID")
+ val split = context.getTaskAttemptID.getTaskID.getId
+ new Path(path, f"part-r-$split%05d-$uniqueWriteJobId$extension")
}
}
}
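Other HadoopFsRelation data sources can follow the same naming scheme. Below is a minimal sketch that relies only on the `spark.sql.sources.writeJobUUID` property set by `InsertIntoHadoopFsRelation`; the class name `MyOutputWriter`, the method name `newWorkFile`, and the caller-provided extension are hypothetical placeholders, not part of the actual API.

    import org.apache.hadoop.fs.Path
    import org.apache.hadoop.mapreduce.TaskAttemptContext

    // Sketch only: `MyOutputWriter` and `newWorkFile` are hypothetical names.
    class MyOutputWriter(path: String) {
      def newWorkFile(context: TaskAttemptContext, extension: String): Path = {
        // The UUID is set on the driver by InsertIntoHadoopFsRelation and is identical for all
        // tasks of the same write job, including speculative attempts of the same task.
        val uniqueWriteJobId = context.getConfiguration.get("spark.sql.sources.writeJobUUID")
        // The task ID is unique within a job, so <task-id>-<job-uuid> cannot collide with files
        // written by other tasks of this job or by other (possibly concurrent) append jobs.
        val split = context.getTaskAttemptID.getTaskID.getId
        new Path(path, f"part-r-$split%05d-$uniqueWriteJobId$extension")
      }
    }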
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala
index c16bd9ae52..215e53c020 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala
@@ -17,14 +17,13 @@
package org.apache.spark.sql.sources
-import java.util.Date
+import java.util.{Date, UUID}
import scala.collection.mutable
import org.apache.hadoop.fs.Path
import org.apache.hadoop.mapreduce._
-import org.apache.hadoop.mapreduce.lib.output.{FileOutputFormat, FileOutputCommitter => MapReduceFileOutputCommitter}
-import org.apache.parquet.hadoop.util.ContextUtil
+import org.apache.hadoop.mapreduce.lib.output.{FileOutputCommitter => MapReduceFileOutputCommitter, FileOutputFormat}
import org.apache.spark._
import org.apache.spark.mapred.SparkHadoopMapRedUtil
@@ -59,6 +58,28 @@ private[sql] case class InsertIntoDataSource(
}
}
+/**
+ * A command for writing data to a [[HadoopFsRelation]]. Supports both overwriting and appending.
+ * Writing to dynamic partitions is also supported. Each [[InsertIntoHadoopFsRelation]] issues a
+ * single write job, and owns a UUID that identifies this job. Each concrete implementation of
+ * [[HadoopFsRelation]] should use this UUID together with the task ID to generate a unique file
+ * path for each task output file. This UUID is passed to the executor side via a property named
+ * `spark.sql.sources.writeJobUUID`.
+ *
+ * Two different writer containers, [[DefaultWriterContainer]] and
+ * [[DynamicPartitionWriterContainer]], are used to write to normal tables and tables with
+ * dynamic partitions, respectively.
+ *
+ * The basic workflow of this command is:
+ *
+ * 1. Driver-side setup, including output committer initialization and data source specific
+ *    preparation work for the write job to be issued.
+ * 2. Issues a write job consisting of one or more executor-side tasks, each of which writes all
+ *    rows within an RDD partition.
+ * 3. If no exception is thrown in a task, commits that task; otherwise aborts it. If any
+ *    exception is thrown while committing a task, that task is also aborted.
+ * 4. If all tasks are committed, commits the job; otherwise aborts it. If any exception is
+ *    thrown while committing the job, the job is also aborted.
+ */
private[sql] case class InsertIntoHadoopFsRelation(
@transient relation: HadoopFsRelation,
@transient query: LogicalPlan,
@@ -261,7 +282,14 @@ private[sql] abstract class BaseWriterContainer(
with Logging
with Serializable {
- protected val serializableConf = new SerializableConfiguration(ContextUtil.getConfiguration(job))
+ protected val serializableConf = new SerializableConfiguration(job.getConfiguration)
+
+ // This UUID is used to avoid output file name collision between different appending write jobs.
+ // These jobs may belong to different SparkContext instances. Concrete data source implementations
+ // may use this UUID to generate unique file names (e.g., `part-r-<task-id>-<job-uuid>.parquet`).
+ // The reason why this ID is used to identify a job rather than a single task output file is
+ // that speculative tasks must generate the same output file name as the original task.
+ private val uniqueWriteJobId = UUID.randomUUID()
// This is only used on driver side.
@transient private val jobContext: JobContext = job
@@ -290,6 +318,11 @@ private[sql] abstract class BaseWriterContainer(
setupIDs(0, 0, 0)
setupConf()
+ // This UUID is sent to the executor side together with the serialized `Configuration` object
+ // within the `Job` instance. `OutputWriters` on the executor side should use this UUID to
+ // generate unique task output files.
+ job.getConfiguration.set("spark.sql.sources.writeJobUUID", uniqueWriteJobId.toString)
+
// Order of the following two lines is important. For Hadoop 1, TaskAttemptContext constructor
// clones the Configuration object passed in. If we initialize the TaskAttemptContext first,
// configurations made in prepareJobForWrite(job) are not populated into the TaskAttemptContext.
@@ -417,15 +450,16 @@ private[sql] class DefaultWriterContainer(
assert(writer != null, "OutputWriter instance should have been initialized")
writer.close()
super.commitTask()
- } catch {
- case cause: Throwable =>
- super.abortTask()
- throw new RuntimeException("Failed to commit task", cause)
+ } catch { case cause: Throwable =>
+ // This exception will be handled in `InsertIntoHadoopFsRelation.insert$writeRows`, and will
+ // cause `abortTask()` to be invoked.
+ throw new RuntimeException("Failed to commit task", cause)
}
}
override def abortTask(): Unit = {
try {
+ // It's possible that the task fails before `writer` gets initialized
if (writer != null) {
writer.close()
}
@@ -469,21 +503,25 @@ private[sql] class DynamicPartitionWriterContainer(
})
}
- override def commitTask(): Unit = {
- try {
+ private def clearOutputWriters(): Unit = {
+ if (outputWriters.nonEmpty) {
outputWriters.values.foreach(_.close())
outputWriters.clear()
+ }
+ }
+
+ override def commitTask(): Unit = {
+ try {
+ clearOutputWriters()
super.commitTask()
} catch { case cause: Throwable =>
- super.abortTask()
throw new RuntimeException("Failed to commit task", cause)
}
}
override def abortTask(): Unit = {
try {
- outputWriters.values.foreach(_.close())
- outputWriters.clear()
+ clearOutputWriters()
} finally {
super.abortTask()
}
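The task-side control flow described in steps 2 and 3 of the class comment above boils down to: write all rows, commit on success, and abort on any failure, including a failure thrown from `commitTask()` itself. The simplified illustration below is not the actual Spark code; the `Committer` trait is a hypothetical stand-in for the writer containers.

    // Illustrative only; `Committer` stands in for BaseWriterContainer and its subclasses.
    trait Committer {
      def writeRow(row: String): Unit
      def commitTask(): Unit
      def abortTask(): Unit
    }

    def writeRows(rows: Iterator[String], committer: Committer): Unit = {
      try {
        rows.foreach(committer.writeRow)
        // Commit the task only if every row was written without an exception.
        committer.commitTask()
      } catch { case cause: Throwable =>
        // Any failure, including one thrown from commitTask(), aborts the task.
        committer.abortTask()
        throw new RuntimeException("Task failed while writing rows", cause)
      }
    }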
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileOperator.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileOperator.scala
index 1e51173a19..e3ab9442b4 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileOperator.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileOperator.scala
@@ -27,13 +27,13 @@ import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.sql.hive.HiveMetastoreTypes
import org.apache.spark.sql.types.StructType
-private[orc] object OrcFileOperator extends Logging{
+private[orc] object OrcFileOperator extends Logging {
def getFileReader(pathStr: String, config: Option[Configuration] = None ): Reader = {
val conf = config.getOrElse(new Configuration)
val fspath = new Path(pathStr)
val fs = fspath.getFileSystem(conf)
val orcFiles = listOrcFiles(pathStr, conf)
-
+ logDebug(s"Creating ORC Reader from ${orcFiles.head}")
// TODO Need to consider all files when schema evolution is taken into account.
OrcFile.createReader(fs, orcFiles.head)
}
@@ -42,6 +42,7 @@ private[orc] object OrcFileOperator extends Logging{
val reader = getFileReader(path, conf)
val readerInspector = reader.getObjectInspector.asInstanceOf[StructObjectInspector]
val schema = readerInspector.getTypeName
+ logDebug(s"Reading schema from file $path, got Hive schema string: $schema")
HiveMetastoreTypes.toDataType(schema).asInstanceOf[StructType]
}
@@ -52,14 +53,14 @@ private[orc] object OrcFileOperator extends Logging{
def listOrcFiles(pathStr: String, conf: Configuration): Seq[Path] = {
val origPath = new Path(pathStr)
val fs = origPath.getFileSystem(conf)
- val path = origPath.makeQualified(fs)
+ val path = origPath.makeQualified(fs.getUri, fs.getWorkingDirectory)
val paths = SparkHadoopUtil.get.listLeafStatuses(fs, origPath)
.filterNot(_.isDir)
.map(_.getPath)
.filterNot(_.getName.startsWith("_"))
.filterNot(_.getName.startsWith("."))
- if (paths == null || paths.size == 0) {
+ if (paths == null || paths.isEmpty) {
throw new IllegalArgumentException(
s"orcFileOperator: path $path does not have valid orc files matching the pattern")
}
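The change to `listOrcFiles` above replaces the single-argument `Path.makeQualified(FileSystem)`, which is deprecated in newer Hadoop versions, with the two-argument overload. A minimal sketch of the equivalent call; the directory path below is a hypothetical example.

    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.fs.Path

    // Qualify a path without using the deprecated makeQualified(FileSystem) overload.
    val conf = new Configuration()
    val origPath = new Path("/some/orc/dir")  // hypothetical path
    val fs = origPath.getFileSystem(conf)
    val qualified = origPath.makeQualified(fs.getUri, fs.getWorkingDirectory)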
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala
index dbce39f21d..705f48f1cd 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala
@@ -31,6 +31,7 @@ import org.apache.hadoop.mapred.{InputFormat => MapRedInputFormat, JobConf, Reco
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat
import org.apache.hadoop.mapreduce.{Job, TaskAttemptContext}
+import org.apache.spark.Logging
import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.mapred.SparkHadoopMapRedUtil
import org.apache.spark.rdd.{HadoopRDD, RDD}
@@ -39,7 +40,6 @@ import org.apache.spark.sql.hive.{HiveContext, HiveInspectors, HiveMetastoreType
import org.apache.spark.sql.sources.{Filter, _}
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.{Row, SQLContext}
-import org.apache.spark.{Logging}
import org.apache.spark.util.SerializableConfiguration
/* Implicit conversions */
@@ -105,8 +105,9 @@ private[orc] class OrcOutputWriter(
recordWriterInstantiated = true
val conf = context.getConfiguration
+ val uniqueWriteJobId = conf.get("spark.sql.sources.writeJobUUID")
val partition = context.getTaskAttemptID.getTaskID.getId
- val filename = f"part-r-$partition%05d-${System.currentTimeMillis}%015d.orc"
+ val filename = f"part-r-$partition%05d-$uniqueWriteJobId.orc"
new OrcOutputFormat().getRecordWriter(
new Path(path, filename).getFileSystem(conf),
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala
index f901bd8171..ea325cc93c 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala
@@ -49,7 +49,7 @@ import scala.collection.JavaConversions._
object TestHive
extends TestHiveContext(
new SparkContext(
- System.getProperty("spark.sql.test.master", "local[2]"),
+ System.getProperty("spark.sql.test.master", "local[32]"),
"TestSQLContext",
new SparkConf()
.set("spark.sql.test", "")
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcSourceSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcSourceSuite.scala
index 82e08caf46..a0cdd0db42 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcSourceSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcSourceSuite.scala
@@ -43,8 +43,14 @@ abstract class OrcSuite extends QueryTest with BeforeAndAfterAll {
orcTableDir.mkdir()
import org.apache.spark.sql.hive.test.TestHive.implicits._
+ // Originally we were using a 10-row RDD for testing. However, when default parallelism is
+ // greater than 10 (e.g., running on a node with 32 cores), this RDD contains empty partitions,
+ // which result in empty ORC files. Unfortunately, ORC doesn't handle empty files properly and
+ // this caused build failures on Jenkins, which happens to have 32 cores. Please refer to
+ // SPARK-8501 for more details. To work around this issue before SPARK-8501 is fixed, we simply
+ // increase the number of rows in this RDD to avoid empty partitions.
sparkContext
- .makeRDD(1 to 10)
+ .makeRDD(1 to 100)
.map(i => OrcData(i, s"part-$i"))
.toDF()
.registerTempTable(s"orc_temp_table")
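The empty-partition behavior mentioned in the comment above is easy to check: `makeRDD` without an explicit partition count uses `spark.default.parallelism`, so a 10-element RDD on a 32-core local context ends up with 22 empty partitions, each of which would previously have produced an empty ORC file. A small sketch, assuming an existing SparkContext `sc` running with 32 cores (the figure is illustrative):

    // Illustrative only; assumes `sc` is a SparkContext with spark.default.parallelism = 32.
    val rdd = sc.makeRDD(1 to 10)  // 32 partitions, only 10 of them non-empty
    val emptyPartitions = rdd.mapPartitions { iter =>
      if (iter.isEmpty) Iterator(1) else Iterator.empty
    }.count()
    println(s"Empty partitions: $emptyPartitions")  // 22 when default parallelism is 32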
@@ -70,35 +76,35 @@ abstract class OrcSuite extends QueryTest with BeforeAndAfterAll {
}
test("create temporary orc table") {
- checkAnswer(sql("SELECT COUNT(*) FROM normal_orc_source"), Row(10))
+ checkAnswer(sql("SELECT COUNT(*) FROM normal_orc_source"), Row(100))
checkAnswer(
sql("SELECT * FROM normal_orc_source"),
- (1 to 10).map(i => Row(i, s"part-$i")))
+ (1 to 100).map(i => Row(i, s"part-$i")))
checkAnswer(
sql("SELECT * FROM normal_orc_source where intField > 5"),
- (6 to 10).map(i => Row(i, s"part-$i")))
+ (6 to 100).map(i => Row(i, s"part-$i")))
checkAnswer(
sql("SELECT COUNT(intField), stringField FROM normal_orc_source GROUP BY stringField"),
- (1 to 10).map(i => Row(1, s"part-$i")))
+ (1 to 100).map(i => Row(1, s"part-$i")))
}
test("create temporary orc table as") {
- checkAnswer(sql("SELECT COUNT(*) FROM normal_orc_as_source"), Row(10))
+ checkAnswer(sql("SELECT COUNT(*) FROM normal_orc_as_source"), Row(100))
checkAnswer(
sql("SELECT * FROM normal_orc_source"),
- (1 to 10).map(i => Row(i, s"part-$i")))
+ (1 to 100).map(i => Row(i, s"part-$i")))
checkAnswer(
sql("SELECT * FROM normal_orc_source WHERE intField > 5"),
- (6 to 10).map(i => Row(i, s"part-$i")))
+ (6 to 100).map(i => Row(i, s"part-$i")))
checkAnswer(
sql("SELECT COUNT(intField), stringField FROM normal_orc_source GROUP BY stringField"),
- (1 to 10).map(i => Row(1, s"part-$i")))
+ (1 to 100).map(i => Row(1, s"part-$i")))
}
test("appending insert") {
@@ -106,7 +112,7 @@ abstract class OrcSuite extends QueryTest with BeforeAndAfterAll {
checkAnswer(
sql("SELECT * FROM normal_orc_source"),
- (1 to 5).map(i => Row(i, s"part-$i")) ++ (6 to 10).flatMap { i =>
+ (1 to 5).map(i => Row(i, s"part-$i")) ++ (6 to 100).flatMap { i =>
Seq.fill(2)(Row(i, s"part-$i"))
})
}
@@ -119,7 +125,7 @@ abstract class OrcSuite extends QueryTest with BeforeAndAfterAll {
checkAnswer(
sql("SELECT * FROM normal_orc_as_source"),
- (6 to 10).map(i => Row(i, s"part-$i")))
+ (6 to 100).map(i => Row(i, s"part-$i")))
}
}
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextRelation.scala b/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextRelation.scala
index 0f959b3d0b..5d7cd16c12 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextRelation.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextRelation.scala
@@ -53,9 +53,10 @@ class AppendingTextOutputFormat(outputFile: Path) extends TextOutputFormat[NullW
numberFormat.setGroupingUsed(false)
override def getDefaultWorkFile(context: TaskAttemptContext, extension: String): Path = {
+ val uniqueWriteJobId = context.getConfiguration.get("spark.sql.sources.writeJobUUID")
val split = context.getTaskAttemptID.getTaskID.getId
val name = FileOutputFormat.getOutputName(context)
- new Path(outputFile, s"$name-${numberFormat.format(split)}-${UUID.randomUUID()}")
+ new Path(outputFile, s"$name-${numberFormat.format(split)}-$uniqueWriteJobId")
}
}
@@ -156,6 +157,7 @@ class CommitFailureTestRelation(
context: TaskAttemptContext): OutputWriter = {
new SimpleTextOutputWriter(path, context) {
override def close(): Unit = {
+ super.close()
sys.error("Intentional task commitment failure for testing purpose.")
}
}
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala b/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala
index 76469d7a3d..e0d8277a8e 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala
@@ -35,7 +35,7 @@ abstract class HadoopFsRelationTest extends QueryTest with SQLTestUtils {
import sqlContext.sql
import sqlContext.implicits._
- val dataSourceName = classOf[SimpleTextSource].getCanonicalName
+ val dataSourceName: String
val dataSchema =
StructType(
@@ -470,6 +470,33 @@ abstract class HadoopFsRelationTest extends QueryTest with SQLTestUtils {
checkAnswer(sqlContext.table("t"), df.select('b, 'c, 'a).collect())
}
}
+
+ // NOTE: This test is not fully deterministic. On nodes with relatively few cores (4 or even
+ // 1), it's hard to reproduce the data loss issue. But on nodes with, for example, 8 or more
+ // cores, the issue can be reproduced reliably. Fortunately, our Jenkins builder meets this
+ // requirement. We probably want to move this test case to spark-integration-tests or spark-perf
+ // later.
+ test("SPARK-8406: Avoids name collision while writing Parquet files") {
+ withTempPath { dir =>
+ val path = dir.getCanonicalPath
+ sqlContext
+ .range(10000)
+ .repartition(250)
+ .write
+ .mode(SaveMode.Overwrite)
+ .format(dataSourceName)
+ .save(path)
+
+ assertResult(10000) {
+ sqlContext
+ .read
+ .format(dataSourceName)
+ .option("dataSchema", StructType(StructField("id", LongType) :: Nil).json)
+ .load(path)
+ .count()
+ }
+ }
+ }
}
class SimpleTextHadoopFsRelationSuite extends HadoopFsRelationTest {
@@ -502,15 +529,17 @@ class SimpleTextHadoopFsRelationSuite extends HadoopFsRelationTest {
}
class CommitFailureTestRelationSuite extends SparkFunSuite with SQLTestUtils {
- import TestHive.implicits._
-
override val sqlContext = TestHive
+ // When committing a task, `CommitFailureTestSource` throws an exception for testing purposes.
val dataSourceName: String = classOf[CommitFailureTestSource].getCanonicalName
test("SPARK-7684: commitTask() failure should fallback to abortTask()") {
withTempPath { file =>
- val df = (1 to 3).map(i => i -> s"val_$i").toDF("a", "b")
+ // Here we coalesce the number of partitions to 1 to ensure that only a single task is issued.
+ // This prevents a race condition that happens when FileOutputCommitter tries to remove the
+ // `_temporary` directory while committing/aborting the job. See SPARK-8513 for more details.
+ val df = sqlContext.range(0, 10).coalesce(1)
intercept[SparkException] {
df.write.format(dataSourceName).save(file.getCanonicalPath)
}