author     Michael Armbrust <michael@databricks.com>   2015-08-07 16:24:50 -0700
committer  Michael Armbrust <michael@databricks.com>   2015-08-07 16:24:50 -0700
commit     49702bd738de681255a7177339510e0e1b25a8db (patch)
tree       4f982b881a15175228b659f917eb1e12e345d6ff /sql
parent     902334fd55bbe40a57c1de2a9bdb25eddf1c8cf6 (diff)
[SPARK-8890] [SQL] Fallback on sorting when writing many dynamic partitions
Previously, we would open a new file for each new dynamic partition written out using `HadoopFsRelation`. For formats like Parquet this is very costly due to the buffers required to get good compression. In this PR I refactor the code, allowing us to fall back on an external sort when many partitions are seen. As such, each task will open no more than `spark.sql.sources.maxConcurrentWrites` files. I also did the following cleanup:

- Instead of keying the file HashMap on an expensive-to-compute string representation of the partition, we now use a fairly cheap UnsafeProjection that avoids heap allocations.
- The control flow for instantiating and invoking a writer container has been simplified. Now, instead of switching in two places based on the use of partitioning, the specific writer container must implement a single method, `writeRows`, that is invoked using `runJob`.
- `OutputWriterInternal` has been removed. Instead we have a `private[sql]` method, `writeInternal`, that converts and calls the public method. This method can be overridden by internal data sources to avoid the conversion. This change removes a lot of code duplication and per-row `asInstanceOf` checks.
- `commands.scala` has been split up.

Author: Michael Armbrust <michael@databricks.com>

Closes #8010 from marmbrus/fsWriting and squashes the following commits:

00804fe [Michael Armbrust] use shuffleMemoryManager.pageSizeBytes
775cc49 [Michael Armbrust] Merge remote-tracking branch 'origin/master' into fsWriting
17b690e [Michael Armbrust] remove comment
40f0372 [Michael Armbrust] address comments
f5675bd [Michael Armbrust] char -> string
7e2d0a4 [Michael Armbrust] make sure we close current writer
8100100 [Michael Armbrust] delete empty commands.scala
71cc717 [Michael Armbrust] update comment
8ec75ac [Michael Armbrust] [SPARK-8890][SQL] Fallback on sorting when writing many dynamic partitions
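For context, here is a minimal usage sketch of the code path this patch changes, written against the 1.5-era DataFrame writer API (the master URL, application name, output path, and column names are illustrative assumptions). Each task sees many distinct partition values, which is exactly the case that now falls back on sorting instead of holding one open writer per partition:

    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.sql.SQLContext
    import org.apache.spark.sql.functions._

    val sc = new SparkContext(
      new SparkConf().setMaster("local[2]").setAppName("dynamic-partition-write"))
    val sqlContext = new SQLContext(sc)
    import sqlContext.implicits._

    // 100 distinct values of `bucket` per task easily exceeds the default open-file limit,
    // so each task sorts the remaining rows rather than keeping 100 writers open.
    val df = sqlContext.range(1000).select(($"id" % 100).as("bucket"), lit(1).as("data"))
    df.write.partitionBy("bucket").parquet("/tmp/partitioned-output")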
Diffstat (limited to 'sql')
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala | 8
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoDataSource.scala | 64
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelation.scala | 165
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala | 404
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/commands.scala | 606
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/json/JSONRelation.scala | 6
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetRelation.scala | 6
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala | 17
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/sources/PartitionedWriteSuite.scala | 56
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala | 6
10 files changed, 715 insertions, 623 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
index 45d3d8c863..e9de14f025 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
@@ -366,17 +366,21 @@ private[spark] object SQLConf {
"storing additional schema information in Hive's metastore.",
isPublic = false)
- // Whether to perform partition discovery when loading external data sources. Default to true.
val PARTITION_DISCOVERY_ENABLED = booleanConf("spark.sql.sources.partitionDiscovery.enabled",
defaultValue = Some(true),
doc = "When true, automtically discover data partitions.")
- // Whether to perform partition column type inference. Default to true.
val PARTITION_COLUMN_TYPE_INFERENCE =
booleanConf("spark.sql.sources.partitionColumnTypeInference.enabled",
defaultValue = Some(true),
doc = "When true, automatically infer the data types for partitioned columns.")
+ val PARTITION_MAX_FILES =
+ intConf("spark.sql.sources.maxConcurrentWrites",
+ defaultValue = Some(5),
+ doc = "The maximum number of concurent files to open before falling back on sorting when " +
+ "writing out files using dynamic partitioning.")
+
// The output committer class used by HadoopFsRelation. The specified class needs to be a
// subclass of org.apache.hadoop.mapreduce.OutputCommitter.
//
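A short, hedged sketch of working with the new setting from user code (it assumes an existing SQLContext named `sqlContext`; the override value of 10 is arbitrary). Internally, the write path reads the same entry through `SQLConf.PARTITION_MAX_FILES`, as shown further down in this diff:

    // Per-session override and read-back via the public, string-keyed SQLContext API.
    sqlContext.setConf("spark.sql.sources.maxConcurrentWrites", "10")
    val maxOpenFiles = sqlContext.getConf("spark.sql.sources.maxConcurrentWrites").toInt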
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoDataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoDataSource.scala
new file mode 100644
index 0000000000..6ccde7693b
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoDataSource.scala
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources
+
+import java.io.IOException
+import java.util.{Date, UUID}
+
+import scala.collection.JavaConversions.asScalaIterator
+
+import org.apache.hadoop.fs.Path
+import org.apache.hadoop.mapreduce._
+import org.apache.hadoop.mapreduce.lib.output.{FileOutputCommitter => MapReduceFileOutputCommitter, FileOutputFormat}
+import org.apache.spark._
+import org.apache.spark.mapred.SparkHadoopMapRedUtil
+import org.apache.spark.mapreduce.SparkHadoopMapReduceUtil
+import org.apache.spark.sql._
+import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.expressions.codegen.GenerateProjection
+import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project}
+import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow}
+import org.apache.spark.sql.execution.{RunnableCommand, SQLExecution}
+import org.apache.spark.sql.sources._
+import org.apache.spark.sql.types.StringType
+import org.apache.spark.util.{Utils, SerializableConfiguration}
+
+
+/**
+ * Inserts the results of `query` into a relation that extends [[InsertableRelation]].
+ */
+private[sql] case class InsertIntoDataSource(
+ logicalRelation: LogicalRelation,
+ query: LogicalPlan,
+ overwrite: Boolean)
+ extends RunnableCommand {
+
+ override def run(sqlContext: SQLContext): Seq[Row] = {
+ val relation = logicalRelation.relation.asInstanceOf[InsertableRelation]
+ val data = DataFrame(sqlContext, query)
+ // Apply the schema of the existing table to the new data.
+ val df = sqlContext.internalCreateDataFrame(data.queryExecution.toRdd, logicalRelation.schema)
+ relation.insert(df, overwrite)
+
+ // Invalidate the cache.
+ sqlContext.cacheManager.invalidateCache(logicalRelation)
+
+ Seq.empty[Row]
+ }
+}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelation.scala
new file mode 100644
index 0000000000..735d52f808
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelation.scala
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources
+
+import java.io.IOException
+
+import org.apache.hadoop.fs.Path
+import org.apache.hadoop.mapreduce._
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat
+import org.apache.spark._
+import org.apache.spark.sql._
+import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
+import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project}
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.execution.{RunnableCommand, SQLExecution}
+import org.apache.spark.sql.sources._
+import org.apache.spark.util.Utils
+
+
+/**
+ * A command for writing data to a [[HadoopFsRelation]]. Supports both overwriting and appending.
+ * Writing to dynamic partitions is also supported. Each [[InsertIntoHadoopFsRelation]] issues a
+ * single write job, and owns a UUID that identifies this job. Each concrete implementation of
+ * [[HadoopFsRelation]] should use this UUID together with the task id to generate a unique file
+ * path for each task output file. This UUID is passed to the executor side via a property named
+ * `spark.sql.sources.writeJobUUID`.
+ *
+ * Different writer containers, [[DefaultWriterContainer]] and [[DynamicPartitionWriterContainer]],
+ * are used to write to normal tables and to tables with dynamic partitions.
+ *
+ * The basic workflow of this command is:
+ *
+ * 1. Driver-side setup, including output committer initialization and data source specific
+ *    preparation work for the write job to be issued.
+ * 2. Issues a write job that consists of one or more executor-side tasks, each of which writes
+ *    all rows within an RDD partition.
+ * 3. If no exception is thrown in a task, commits that task; otherwise aborts it. If any
+ *    exception is thrown during the task commit, also aborts that task.
+ * 4. If all tasks are committed, commits the job; otherwise aborts it. If any exception is
+ *    thrown during the job commit, also aborts the job.
+ */
+private[sql] case class InsertIntoHadoopFsRelation(
+ @transient relation: HadoopFsRelation,
+ @transient query: LogicalPlan,
+ mode: SaveMode)
+ extends RunnableCommand {
+
+ override def run(sqlContext: SQLContext): Seq[Row] = {
+ require(
+ relation.paths.length == 1,
+ s"Cannot write to multiple destinations: ${relation.paths.mkString(",")}")
+
+ val hadoopConf = sqlContext.sparkContext.hadoopConfiguration
+ val outputPath = new Path(relation.paths.head)
+ val fs = outputPath.getFileSystem(hadoopConf)
+ val qualifiedOutputPath = outputPath.makeQualified(fs.getUri, fs.getWorkingDirectory)
+
+ val pathExists = fs.exists(qualifiedOutputPath)
+ val doInsertion = (mode, pathExists) match {
+ case (SaveMode.ErrorIfExists, true) =>
+ throw new AnalysisException(s"path $qualifiedOutputPath already exists.")
+ case (SaveMode.Overwrite, true) =>
+ Utils.tryOrIOException {
+ if (!fs.delete(qualifiedOutputPath, true /* recursively */)) {
+ throw new IOException(s"Unable to clear output " +
+ s"directory $qualifiedOutputPath prior to writing to it")
+ }
+ }
+ true
+ case (SaveMode.Append, _) | (SaveMode.Overwrite, _) | (SaveMode.ErrorIfExists, false) =>
+ true
+ case (SaveMode.Ignore, exists) =>
+ !exists
+ case (s, exists) =>
+ throw new IllegalStateException(s"unsupported save mode $s ($exists)")
+ }
+ // If we are appending data to an existing dir.
+ val isAppend = pathExists && (mode == SaveMode.Append)
+
+ if (doInsertion) {
+ val job = new Job(hadoopConf)
+ job.setOutputKeyClass(classOf[Void])
+ job.setOutputValueClass(classOf[InternalRow])
+ FileOutputFormat.setOutputPath(job, qualifiedOutputPath)
+
+ // A partitioned relation's schema can be different from that of the input logicalPlan, since
+ // partition columns are all moved after the data columns. We add a Project to adjust the ordering.
+ // TODO: this belongs in the analyzer.
+ val project = Project(
+ relation.schema.map(field => UnresolvedAttribute.quoted(field.name)), query)
+ val queryExecution = DataFrame(sqlContext, project).queryExecution
+
+ SQLExecution.withNewExecutionId(sqlContext, queryExecution) {
+ val df = sqlContext.internalCreateDataFrame(queryExecution.toRdd, relation.schema)
+ val partitionColumns = relation.partitionColumns.fieldNames
+
+ // Some pre-flight checks.
+ require(
+ df.schema == relation.schema,
+ s"""DataFrame must have the same schema as the relation to which is inserted.
+ |DataFrame schema: ${df.schema}
+ |Relation schema: ${relation.schema}
+ """.stripMargin)
+ val partitionColumnsInSpec = relation.partitionColumns.fieldNames
+ require(
+ partitionColumnsInSpec.sameElements(partitionColumns),
+ s"""Partition columns mismatch.
+ |Expected: ${partitionColumnsInSpec.mkString(", ")}
+ |Actual: ${partitionColumns.mkString(", ")}
+ """.stripMargin)
+
+ val writerContainer = if (partitionColumns.isEmpty) {
+ new DefaultWriterContainer(relation, job, isAppend)
+ } else {
+ val output = df.queryExecution.executedPlan.output
+ val (partitionOutput, dataOutput) =
+ output.partition(a => partitionColumns.contains(a.name))
+
+ new DynamicPartitionWriterContainer(
+ relation,
+ job,
+ partitionOutput,
+ dataOutput,
+ output,
+ PartitioningUtils.DEFAULT_PARTITION_NAME,
+ sqlContext.conf.getConf(SQLConf.PARTITION_MAX_FILES),
+ isAppend)
+ }
+
+ // This call shouldn't be put into the `try` block below because it only initializes and
+ // prepares the job, any exception thrown from here shouldn't cause abortJob() to be called.
+ writerContainer.driverSideSetup()
+
+ try {
+ sqlContext.sparkContext.runJob(df.queryExecution.toRdd, writerContainer.writeRows _)
+ writerContainer.commitJob()
+ relation.refresh()
+ } catch { case cause: Throwable =>
+ logError("Aborting job.", cause)
+ writerContainer.abortJob()
+ throw new SparkException("Job aborted.", cause)
+ }
+ }
+ } else {
+ logInfo("Skipping insertion into a relation that already exists.")
+ }
+
+ Seq.empty[Row]
+ }
+}
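The `runJob(df.queryExecution.toRdd, writerContainer.writeRows _)` call above relies on the general `SparkContext.runJob` contract: a function of `(TaskContext, Iterator[T]) => U` is executed once per RDD partition and its results are returned to the driver. A self-contained sketch of that contract, unrelated to file writing (the SparkContext parameter and the row counting are illustrative):

    import org.apache.spark.{SparkContext, TaskContext}

    // One function invocation per partition, results collected on the driver; this is the
    // same mechanism used to dispatch `writeRows` above. `sc` is an assumed SparkContext.
    def rowsPerPartition(sc: SparkContext): Array[Long] = {
      val rdd = sc.parallelize(1 to 1000, numSlices = 4)
      sc.runJob(rdd, (_: TaskContext, it: Iterator[Int]) => it.size.toLong)
    }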
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala
new file mode 100644
index 0000000000..2f11f40422
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala
@@ -0,0 +1,404 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources
+
+import java.util.{Date, UUID}
+
+import scala.collection.JavaConverters._
+
+import org.apache.hadoop.fs.Path
+import org.apache.hadoop.mapreduce._
+import org.apache.hadoop.mapreduce.lib.output.{FileOutputCommitter => MapReduceFileOutputCommitter}
+import org.apache.spark._
+import org.apache.spark.mapred.SparkHadoopMapRedUtil
+import org.apache.spark.mapreduce.SparkHadoopMapReduceUtil
+import org.apache.spark.sql._
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.execution.UnsafeKVExternalSorter
+import org.apache.spark.sql.sources.{HadoopFsRelation, OutputWriter, OutputWriterFactory}
+import org.apache.spark.sql.types.{StructType, StringType}
+import org.apache.spark.util.SerializableConfiguration
+
+
+private[sql] abstract class BaseWriterContainer(
+ @transient val relation: HadoopFsRelation,
+ @transient job: Job,
+ isAppend: Boolean)
+ extends SparkHadoopMapReduceUtil
+ with Logging
+ with Serializable {
+
+ protected val dataSchema = relation.dataSchema
+
+ protected val serializableConf = new SerializableConfiguration(job.getConfiguration)
+
+ // This UUID is used to avoid output file name collision between different appending write jobs.
+ // These jobs may belong to different SparkContext instances. Concrete data source implementations
+ // may use this UUID to generate unique file names (e.g., `part-r-<task-id>-<job-uuid>.parquet`).
+ // The reason why this ID is used to identify a job rather than a single task output file is
+ // that, speculative tasks must generate the same output file name as the original task.
+ private val uniqueWriteJobId = UUID.randomUUID()
+
+ // This is only used on driver side.
+ @transient private val jobContext: JobContext = job
+
+ // The following fields are initialized and used on both driver and executor side.
+ @transient protected var outputCommitter: OutputCommitter = _
+ @transient private var jobId: JobID = _
+ @transient private var taskId: TaskID = _
+ @transient private var taskAttemptId: TaskAttemptID = _
+ @transient protected var taskAttemptContext: TaskAttemptContext = _
+
+ protected val outputPath: String = {
+ assert(
+ relation.paths.length == 1,
+ s"Cannot write to multiple destinations: ${relation.paths.mkString(",")}")
+ relation.paths.head
+ }
+
+ protected var outputWriterFactory: OutputWriterFactory = _
+
+ private var outputFormatClass: Class[_ <: OutputFormat[_, _]] = _
+
+ def writeRows(taskContext: TaskContext, iterator: Iterator[InternalRow]): Unit
+
+ def driverSideSetup(): Unit = {
+ setupIDs(0, 0, 0)
+ setupConf()
+
+ // This UUID is sent to executor side together with the serialized `Configuration` object within
+ // the `Job` instance. `OutputWriters` on the executor side should use this UUID to generate
+ // unique task output files.
+ job.getConfiguration.set("spark.sql.sources.writeJobUUID", uniqueWriteJobId.toString)
+
+ // Order of the following two lines is important. For Hadoop 1, TaskAttemptContext constructor
+ // clones the Configuration object passed in. If we initialize the TaskAttemptContext first,
+ // configurations made in prepareJobForWrite(job) are not populated into the TaskAttemptContext.
+ //
+ // Also, the `prepareJobForWrite` call must happen before initializing output format and output
+ // committer, since their initialization involve the job configuration, which can be potentially
+ // decorated in `prepareJobForWrite`.
+ outputWriterFactory = relation.prepareJobForWrite(job)
+ taskAttemptContext = newTaskAttemptContext(serializableConf.value, taskAttemptId)
+
+ outputFormatClass = job.getOutputFormatClass
+ outputCommitter = newOutputCommitter(taskAttemptContext)
+ outputCommitter.setupJob(jobContext)
+ }
+
+ def executorSideSetup(taskContext: TaskContext): Unit = {
+ setupIDs(taskContext.stageId(), taskContext.partitionId(), taskContext.attemptNumber())
+ setupConf()
+ taskAttemptContext = newTaskAttemptContext(serializableConf.value, taskAttemptId)
+ outputCommitter = newOutputCommitter(taskAttemptContext)
+ outputCommitter.setupTask(taskAttemptContext)
+ }
+
+ protected def getWorkPath: String = {
+ outputCommitter match {
+ // FileOutputCommitter writes to a temporary location returned by `getWorkPath`.
+ case f: MapReduceFileOutputCommitter => f.getWorkPath.toString
+ case _ => outputPath
+ }
+ }
+
+ private def newOutputCommitter(context: TaskAttemptContext): OutputCommitter = {
+ val defaultOutputCommitter = outputFormatClass.newInstance().getOutputCommitter(context)
+
+ if (isAppend) {
+ // If we are appending data to an existing dir, we will only use the output committer
+ // associated with the file output format since it is not safe to use a custom
+ // committer for appending. For example, in S3, direct parquet output committer may
+ // leave partial data in the destination dir when the appending job fails.
+ logInfo(
+ s"Using output committer class ${defaultOutputCommitter.getClass.getCanonicalName} " +
+ "for appending.")
+ defaultOutputCommitter
+ } else {
+ val committerClass = context.getConfiguration.getClass(
+ SQLConf.OUTPUT_COMMITTER_CLASS.key, null, classOf[OutputCommitter])
+
+ Option(committerClass).map { clazz =>
+ logInfo(s"Using user defined output committer class ${clazz.getCanonicalName}")
+
+ // Every output format based on org.apache.hadoop.mapreduce.lib.output.OutputFormat
+ // has an associated output committer. To override this output committer,
+ // we will first try to use the output committer set in SQLConf.OUTPUT_COMMITTER_CLASS.
+ // If a data source needs to override the output committer, it needs to set the
+ // output committer in prepareForWrite method.
+ if (classOf[MapReduceFileOutputCommitter].isAssignableFrom(clazz)) {
+ // The specified output committer is a FileOutputCommitter.
+ // So, we will use the FileOutputCommitter-specified constructor.
+ val ctor = clazz.getDeclaredConstructor(classOf[Path], classOf[TaskAttemptContext])
+ ctor.newInstance(new Path(outputPath), context)
+ } else {
+ // The specified output committer is just an OutputCommitter.
+ // So, we will use the no-argument constructor.
+ val ctor = clazz.getDeclaredConstructor()
+ ctor.newInstance()
+ }
+ }.getOrElse {
+ // If output committer class is not set, we will use the one associated with the
+ // file output format.
+ logInfo(
+ s"Using output committer class ${defaultOutputCommitter.getClass.getCanonicalName}")
+ defaultOutputCommitter
+ }
+ }
+ }
+
+ private def setupIDs(jobId: Int, splitId: Int, attemptId: Int): Unit = {
+ this.jobId = SparkHadoopWriter.createJobID(new Date, jobId)
+ this.taskId = new TaskID(this.jobId, true, splitId)
+ this.taskAttemptId = new TaskAttemptID(taskId, attemptId)
+ }
+
+ private def setupConf(): Unit = {
+ serializableConf.value.set("mapred.job.id", jobId.toString)
+ serializableConf.value.set("mapred.tip.id", taskAttemptId.getTaskID.toString)
+ serializableConf.value.set("mapred.task.id", taskAttemptId.toString)
+ serializableConf.value.setBoolean("mapred.task.is.map", true)
+ serializableConf.value.setInt("mapred.task.partition", 0)
+ }
+
+ def commitTask(): Unit = {
+ SparkHadoopMapRedUtil.commitTask(
+ outputCommitter, taskAttemptContext, jobId.getId, taskId.getId, taskAttemptId.getId)
+ }
+
+ def abortTask(): Unit = {
+ if (outputCommitter != null) {
+ outputCommitter.abortTask(taskAttemptContext)
+ }
+ logError(s"Task attempt $taskAttemptId aborted.")
+ }
+
+ def commitJob(): Unit = {
+ outputCommitter.commitJob(jobContext)
+ logInfo(s"Job $jobId committed.")
+ }
+
+ def abortJob(): Unit = {
+ if (outputCommitter != null) {
+ outputCommitter.abortJob(jobContext, JobStatus.State.FAILED)
+ }
+ logError(s"Job $jobId aborted.")
+ }
+}
+
+/**
+ * A writer that writes all of the rows in a partition to a single file.
+ */
+private[sql] class DefaultWriterContainer(
+ @transient relation: HadoopFsRelation,
+ @transient job: Job,
+ isAppend: Boolean)
+ extends BaseWriterContainer(relation, job, isAppend) {
+
+ def writeRows(taskContext: TaskContext, iterator: Iterator[InternalRow]): Unit = {
+ executorSideSetup(taskContext)
+ taskAttemptContext.getConfiguration.set("spark.sql.sources.output.path", outputPath)
+ val writer = outputWriterFactory.newInstance(getWorkPath, dataSchema, taskAttemptContext)
+ writer.initConverter(dataSchema)
+
+ // If anything below fails, we should abort the task.
+ try {
+ while (iterator.hasNext) {
+ val internalRow = iterator.next()
+ writer.writeInternal(internalRow)
+ }
+
+ commitTask()
+ } catch {
+ case cause: Throwable =>
+ logError("Aborting task.", cause)
+ abortTask()
+ throw new SparkException("Task failed while writing rows.", cause)
+ }
+
+ def commitTask(): Unit = {
+ try {
+ assert(writer != null, "OutputWriter instance should have been initialized")
+ writer.close()
+ super.commitTask()
+ } catch {
+ case cause: Throwable =>
+ // This exception will be handled in `InsertIntoHadoopFsRelation.insert$writeRows`, and
+ // will cause `abortTask()` to be invoked.
+ throw new RuntimeException("Failed to commit task", cause)
+ }
+ }
+
+ def abortTask(): Unit = {
+ try {
+ writer.close()
+ } finally {
+ super.abortTask()
+ }
+ }
+ }
+}
+
+/**
+ * A writer that dynamically opens files based on the given partition columns. Internally this is
+ * done by maintaining a HashMap of open files until `maxOpenFiles` is reached. Once that happens,
+ * the writer externally sorts the remaining rows and then writes them out one file at a time.
+ */
+private[sql] class DynamicPartitionWriterContainer(
+ @transient relation: HadoopFsRelation,
+ @transient job: Job,
+ partitionColumns: Seq[Attribute],
+ dataColumns: Seq[Attribute],
+ inputSchema: Seq[Attribute],
+ defaultPartitionName: String,
+ maxOpenFiles: Int,
+ isAppend: Boolean)
+ extends BaseWriterContainer(relation, job, isAppend) {
+
+ def writeRows(taskContext: TaskContext, iterator: Iterator[InternalRow]): Unit = {
+ val outputWriters = new java.util.HashMap[InternalRow, OutputWriter]
+ executorSideSetup(taskContext)
+
+ // Returns the partition key given an input row
+ val getPartitionKey = UnsafeProjection.create(partitionColumns, inputSchema)
+ // Returns the data columns to be written given an input row
+ val getOutputRow = UnsafeProjection.create(dataColumns, inputSchema)
+
+ // Expressions that given a partition key build a string like: col1=val/col2=val/...
+ val partitionStringExpression = partitionColumns.zipWithIndex.flatMap { case (c, i) =>
+ val escaped =
+ ScalaUDF(
+ PartitioningUtils.escapePathName _, StringType, Seq(Cast(c, StringType)), Seq(StringType))
+ val str = If(IsNull(c), Literal(defaultPartitionName), escaped)
+ val partitionName = Literal(c.name + "=") :: str :: Nil
+ if (i == 0) partitionName else Literal(Path.SEPARATOR_CHAR.toString) :: partitionName
+ }
+
+ // Returns the partition path given a partition key.
+ val getPartitionString =
+ UnsafeProjection.create(Concat(partitionStringExpression) :: Nil, partitionColumns)
+
+ // If anything below fails, we should abort the task.
+ try {
+ // This will be filled in if we have to fall back on sorting.
+ var sorter: UnsafeKVExternalSorter = null
+ while (iterator.hasNext && sorter == null) {
+ val inputRow = iterator.next()
+ val currentKey = getPartitionKey(inputRow)
+ var currentWriter = outputWriters.get(currentKey)
+
+ if (currentWriter == null) {
+ if (outputWriters.size < maxOpenFiles) {
+ currentWriter = newOutputWriter(currentKey)
+ outputWriters.put(currentKey.copy(), currentWriter)
+ currentWriter.writeInternal(getOutputRow(inputRow))
+ } else {
+ logInfo(s"Maximum partitions reached, falling back on sorting.")
+ sorter = new UnsafeKVExternalSorter(
+ StructType.fromAttributes(partitionColumns),
+ StructType.fromAttributes(dataColumns),
+ SparkEnv.get.blockManager,
+ SparkEnv.get.shuffleMemoryManager,
+ SparkEnv.get.shuffleMemoryManager.pageSizeBytes)
+ sorter.insertKV(currentKey, getOutputRow(inputRow))
+ }
+ } else {
+ currentWriter.writeInternal(getOutputRow(inputRow))
+ }
+ }
+
+ // If the sorter is not null, we reached maxOpenFiles above and need to finish using the
+ // external sort.
+ if (sorter != null) {
+ while (iterator.hasNext) {
+ val currentRow = iterator.next()
+ sorter.insertKV(getPartitionKey(currentRow), getOutputRow(currentRow))
+ }
+
+ logInfo(s"Sorting complete. Writing out partition files one at a time.")
+
+ val sortedIterator = sorter.sortedIterator()
+ var currentKey: InternalRow = null
+ var currentWriter: OutputWriter = null
+ try {
+ while (sortedIterator.next()) {
+ if (currentKey != sortedIterator.getKey) {
+ if (currentWriter != null) {
+ currentWriter.close()
+ }
+ currentKey = sortedIterator.getKey.copy()
+ logDebug(s"Writing partition: $currentKey")
+
+ // Either use an existing file from before, or open a new one.
+ currentWriter = outputWriters.remove(currentKey)
+ if (currentWriter == null) {
+ currentWriter = newOutputWriter(currentKey)
+ }
+ }
+
+ currentWriter.writeInternal(sortedIterator.getValue)
+ }
+ } finally {
+ if (currentWriter != null) { currentWriter.close() }
+ }
+ }
+
+ commitTask()
+ } catch {
+ case cause: Throwable =>
+ logError("Aborting task.", cause)
+ abortTask()
+ throw new SparkException("Task failed while writing rows.", cause)
+ }
+
+ /** Opens and returns a new OutputWriter for the given partition key. */
+ def newOutputWriter(key: InternalRow): OutputWriter = {
+ val partitionPath = getPartitionString(key).getString(0)
+ val path = new Path(getWorkPath, partitionPath)
+ taskAttemptContext.getConfiguration.set(
+ "spark.sql.sources.output.path", new Path(outputPath, partitionPath).toString)
+ val newWriter = outputWriterFactory.newInstance(path.toString, dataSchema, taskAttemptContext)
+ newWriter.initConverter(dataSchema)
+ newWriter
+ }
+
+ def clearOutputWriters(): Unit = {
+ outputWriters.asScala.values.foreach(_.close())
+ outputWriters.clear()
+ }
+
+ def commitTask(): Unit = {
+ try {
+ clearOutputWriters()
+ super.commitTask()
+ } catch {
+ case cause: Throwable =>
+ throw new RuntimeException("Failed to commit task", cause)
+ }
+ }
+
+ def abortTask(): Unit = {
+ try {
+ clearOutputWriters()
+ } finally {
+ super.abortTask()
+ }
+ }
+ }
+}
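To make the control flow above easier to follow, here is a simplified, self-contained model of the same strategy in plain Scala. Every name is illustrative, there are no Spark types, and the real container additionally handles task setup, commit, and abort; the point is only the shape of the algorithm: keep at most `maxOpen` writers in a map, and once the limit is hit, buffer the remaining rows, sort them by key, and write one partition at a time.

    import scala.collection.mutable

    trait Writer[V] { def write(v: V): Unit; def close(): Unit }

    def writePartitioned[K: Ordering, V](
        rows: Iterator[(K, V)],
        maxOpen: Int,
        newWriter: K => Writer[V]): Unit = {
      val open = mutable.HashMap.empty[K, Writer[V]]
      val buffered = mutable.ArrayBuffer.empty[(K, V)]
      var spilling = false

      rows.foreach { case (k, v) =>
        if (spilling) {
          buffered += ((k, v))
        } else open.get(k) match {
          case Some(w) => w.write(v)
          case None if open.size < maxOpen =>
            val w = newWriter(k); open(k) = w; w.write(v)
          case None =>
            spilling = true                  // too many open files: fall back on sorting
            buffered += ((k, v))
        }
      }

      // Write the buffered rows grouped by key, holding at most one extra writer open at a time.
      var current: Option[(K, Writer[V])] = None
      buffered.sortBy(_._1).foreach { case (k, v) =>
        if (!current.exists(_._1 == k)) {
          current.foreach(_._2.close())
          current = Some(k -> open.remove(k).getOrElse(newWriter(k)))
        }
        current.get._2.write(v)
      }
      current.foreach(_._2.close())
      open.values.foreach(_.close())         // close any writers opened before the fallback
    }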
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/commands.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/commands.scala
deleted file mode 100644
index 42668979c9..0000000000
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/commands.scala
+++ /dev/null
@@ -1,606 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.execution.datasources
-
-import java.io.IOException
-import java.util.{Date, UUID}
-
-import scala.collection.JavaConversions.asScalaIterator
-
-import org.apache.hadoop.fs.Path
-import org.apache.hadoop.mapreduce._
-import org.apache.hadoop.mapreduce.lib.output.{FileOutputCommitter => MapReduceFileOutputCommitter, FileOutputFormat}
-import org.apache.spark._
-import org.apache.spark.mapred.SparkHadoopMapRedUtil
-import org.apache.spark.mapreduce.SparkHadoopMapReduceUtil
-import org.apache.spark.sql._
-import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
-import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.catalyst.expressions.codegen.GenerateProjection
-import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project}
-import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow}
-import org.apache.spark.sql.execution.{RunnableCommand, SQLExecution}
-import org.apache.spark.sql.sources._
-import org.apache.spark.sql.types.StringType
-import org.apache.spark.util.{Utils, SerializableConfiguration}
-
-
-private[sql] case class InsertIntoDataSource(
- logicalRelation: LogicalRelation,
- query: LogicalPlan,
- overwrite: Boolean)
- extends RunnableCommand {
-
- override def run(sqlContext: SQLContext): Seq[Row] = {
- val relation = logicalRelation.relation.asInstanceOf[InsertableRelation]
- val data = DataFrame(sqlContext, query)
- // Apply the schema of the existing table to the new data.
- val df = sqlContext.internalCreateDataFrame(data.queryExecution.toRdd, logicalRelation.schema)
- relation.insert(df, overwrite)
-
- // Invalidate the cache.
- sqlContext.cacheManager.invalidateCache(logicalRelation)
-
- Seq.empty[Row]
- }
-}
-
-/**
- * A command for writing data to a [[HadoopFsRelation]]. Supports both overwriting and appending.
- * Writing to dynamic partitions is also supported. Each [[InsertIntoHadoopFsRelation]] issues a
- * single write job, and owns a UUID that identifies this job. Each concrete implementation of
- * [[HadoopFsRelation]] should use this UUID together with task id to generate unique file path for
- * each task output file. This UUID is passed to executor side via a property named
- * `spark.sql.sources.writeJobUUID`.
- *
- * Different writer containers, [[DefaultWriterContainer]] and [[DynamicPartitionWriterContainer]]
- * are used to write to normal tables and tables with dynamic partitions.
- *
- * Basic work flow of this command is:
- *
- * 1. Driver side setup, including output committer initialization and data source specific
- * preparation work for the write job to be issued.
- * 2. Issues a write job consists of one or more executor side tasks, each of which writes all
- * rows within an RDD partition.
- * 3. If no exception is thrown in a task, commits that task, otherwise aborts that task; If any
- * exception is thrown during task commitment, also aborts that task.
- * 4. If all tasks are committed, commit the job, otherwise aborts the job; If any exception is
- * thrown during job commitment, also aborts the job.
- */
-private[sql] case class InsertIntoHadoopFsRelation(
- @transient relation: HadoopFsRelation,
- @transient query: LogicalPlan,
- mode: SaveMode)
- extends RunnableCommand {
-
- override def run(sqlContext: SQLContext): Seq[Row] = {
- require(
- relation.paths.length == 1,
- s"Cannot write to multiple destinations: ${relation.paths.mkString(",")}")
-
- val hadoopConf = sqlContext.sparkContext.hadoopConfiguration
- val outputPath = new Path(relation.paths.head)
- val fs = outputPath.getFileSystem(hadoopConf)
- val qualifiedOutputPath = outputPath.makeQualified(fs.getUri, fs.getWorkingDirectory)
-
- val pathExists = fs.exists(qualifiedOutputPath)
- val doInsertion = (mode, pathExists) match {
- case (SaveMode.ErrorIfExists, true) =>
- throw new AnalysisException(s"path $qualifiedOutputPath already exists.")
- case (SaveMode.Overwrite, true) =>
- Utils.tryOrIOException {
- if (!fs.delete(qualifiedOutputPath, true /* recursively */)) {
- throw new IOException(s"Unable to clear output " +
- s"directory $qualifiedOutputPath prior to writing to it")
- }
- }
- true
- case (SaveMode.Append, _) | (SaveMode.Overwrite, _) | (SaveMode.ErrorIfExists, false) =>
- true
- case (SaveMode.Ignore, exists) =>
- !exists
- case (s, exists) =>
- throw new IllegalStateException(s"unsupported save mode $s ($exists)")
- }
- // If we are appending data to an existing dir.
- val isAppend = pathExists && (mode == SaveMode.Append)
-
- if (doInsertion) {
- val job = new Job(hadoopConf)
- job.setOutputKeyClass(classOf[Void])
- job.setOutputValueClass(classOf[InternalRow])
- FileOutputFormat.setOutputPath(job, qualifiedOutputPath)
-
- // We create a DataFrame by applying the schema of relation to the data to make sure.
- // We are writing data based on the expected schema,
-
- // For partitioned relation r, r.schema's column ordering can be different from the column
- // ordering of data.logicalPlan (partition columns are all moved after data column). We
- // need a Project to adjust the ordering, so that inside InsertIntoHadoopFsRelation, we can
- // safely apply the schema of r.schema to the data.
- val project = Project(
- relation.schema.map(field => new UnresolvedAttribute(Seq(field.name))), query)
-
- val queryExecution = DataFrame(sqlContext, project).queryExecution
- SQLExecution.withNewExecutionId(sqlContext, queryExecution) {
- val df = sqlContext.internalCreateDataFrame(queryExecution.toRdd, relation.schema)
-
- val partitionColumns = relation.partitionColumns.fieldNames
- if (partitionColumns.isEmpty) {
- insert(new DefaultWriterContainer(relation, job, isAppend), df)
- } else {
- val writerContainer = new DynamicPartitionWriterContainer(
- relation, job, partitionColumns, PartitioningUtils.DEFAULT_PARTITION_NAME, isAppend)
- insertWithDynamicPartitions(sqlContext, writerContainer, df, partitionColumns)
- }
- }
- }
-
- Seq.empty[Row]
- }
-
- /**
- * Inserts the content of the [[DataFrame]] into a table without any partitioning columns.
- */
- private def insert(writerContainer: BaseWriterContainer, df: DataFrame): Unit = {
- // Uses local vals for serialization
- val needsConversion = relation.needConversion
- val dataSchema = relation.dataSchema
-
- // This call shouldn't be put into the `try` block below because it only initializes and
- // prepares the job, any exception thrown from here shouldn't cause abortJob() to be called.
- writerContainer.driverSideSetup()
-
- try {
- df.sqlContext.sparkContext.runJob(df.queryExecution.toRdd, writeRows _)
- writerContainer.commitJob()
- relation.refresh()
- } catch { case cause: Throwable =>
- logError("Aborting job.", cause)
- writerContainer.abortJob()
- throw new SparkException("Job aborted.", cause)
- }
-
- def writeRows(taskContext: TaskContext, iterator: Iterator[InternalRow]): Unit = {
- // If anything below fails, we should abort the task.
- try {
- writerContainer.executorSideSetup(taskContext)
-
- if (needsConversion) {
- val converter = CatalystTypeConverters.createToScalaConverter(dataSchema)
- .asInstanceOf[InternalRow => Row]
- while (iterator.hasNext) {
- val internalRow = iterator.next()
- writerContainer.outputWriterForRow(internalRow).write(converter(internalRow))
- }
- } else {
- while (iterator.hasNext) {
- val internalRow = iterator.next()
- writerContainer.outputWriterForRow(internalRow)
- .asInstanceOf[OutputWriterInternal].writeInternal(internalRow)
- }
- }
-
- writerContainer.commitTask()
- } catch { case cause: Throwable =>
- logError("Aborting task.", cause)
- writerContainer.abortTask()
- throw new SparkException("Task failed while writing rows.", cause)
- }
- }
- }
-
- /**
- * Inserts the content of the [[DataFrame]] into a table with partitioning columns.
- */
- private def insertWithDynamicPartitions(
- sqlContext: SQLContext,
- writerContainer: BaseWriterContainer,
- df: DataFrame,
- partitionColumns: Array[String]): Unit = {
- // Uses a local val for serialization
- val needsConversion = relation.needConversion
- val dataSchema = relation.dataSchema
-
- require(
- df.schema == relation.schema,
- s"""DataFrame must have the same schema as the relation to which is inserted.
- |DataFrame schema: ${df.schema}
- |Relation schema: ${relation.schema}
- """.stripMargin)
-
- val partitionColumnsInSpec = relation.partitionColumns.fieldNames
- require(
- partitionColumnsInSpec.sameElements(partitionColumns),
- s"""Partition columns mismatch.
- |Expected: ${partitionColumnsInSpec.mkString(", ")}
- |Actual: ${partitionColumns.mkString(", ")}
- """.stripMargin)
-
- val output = df.queryExecution.executedPlan.output
- val (partitionOutput, dataOutput) = output.partition(a => partitionColumns.contains(a.name))
- val codegenEnabled = df.sqlContext.conf.codegenEnabled
-
- // This call shouldn't be put into the `try` block below because it only initializes and
- // prepares the job, any exception thrown from here shouldn't cause abortJob() to be called.
- writerContainer.driverSideSetup()
-
- try {
- df.sqlContext.sparkContext.runJob(df.queryExecution.toRdd, writeRows _)
- writerContainer.commitJob()
- relation.refresh()
- } catch { case cause: Throwable =>
- logError("Aborting job.", cause)
- writerContainer.abortJob()
- throw new SparkException("Job aborted.", cause)
- }
-
- def writeRows(taskContext: TaskContext, iterator: Iterator[InternalRow]): Unit = {
- // If anything below fails, we should abort the task.
- try {
- writerContainer.executorSideSetup(taskContext)
-
- // Projects all partition columns and casts them to strings to build partition directories.
- val partitionCasts = partitionOutput.map(Cast(_, StringType))
- val partitionProj = newProjection(codegenEnabled, partitionCasts, output)
- val dataProj = newProjection(codegenEnabled, dataOutput, output)
-
- if (needsConversion) {
- val converter = CatalystTypeConverters.createToScalaConverter(dataSchema)
- .asInstanceOf[InternalRow => Row]
- while (iterator.hasNext) {
- val internalRow = iterator.next()
- val partitionPart = partitionProj(internalRow)
- val dataPart = converter(dataProj(internalRow))
- writerContainer.outputWriterForRow(partitionPart).write(dataPart)
- }
- } else {
- while (iterator.hasNext) {
- val internalRow = iterator.next()
- val partitionPart = partitionProj(internalRow)
- val dataPart = dataProj(internalRow)
- writerContainer.outputWriterForRow(partitionPart)
- .asInstanceOf[OutputWriterInternal].writeInternal(dataPart)
- }
- }
-
- writerContainer.commitTask()
- } catch { case cause: Throwable =>
- logError("Aborting task.", cause)
- writerContainer.abortTask()
- throw new SparkException("Task failed while writing rows.", cause)
- }
- }
- }
-
- // This is copied from SparkPlan, probably should move this to a more general place.
- private def newProjection(
- codegenEnabled: Boolean,
- expressions: Seq[Expression],
- inputSchema: Seq[Attribute]): Projection = {
- log.debug(
- s"Creating Projection: $expressions, inputSchema: $inputSchema, codegen:$codegenEnabled")
- if (codegenEnabled) {
-
- try {
- GenerateProjection.generate(expressions, inputSchema)
- } catch {
- case e: Exception =>
- if (sys.props.contains("spark.testing")) {
- throw e
- } else {
- log.error("failed to generate projection, fallback to interpreted", e)
- new InterpretedProjection(expressions, inputSchema)
- }
- }
- } else {
- new InterpretedProjection(expressions, inputSchema)
- }
- }
-}
-
-private[sql] abstract class BaseWriterContainer(
- @transient val relation: HadoopFsRelation,
- @transient job: Job,
- isAppend: Boolean)
- extends SparkHadoopMapReduceUtil
- with Logging
- with Serializable {
-
- protected val serializableConf = new SerializableConfiguration(job.getConfiguration)
-
- // This UUID is used to avoid output file name collision between different appending write jobs.
- // These jobs may belong to different SparkContext instances. Concrete data source implementations
- // may use this UUID to generate unique file names (e.g., `part-r-<task-id>-<job-uuid>.parquet`).
- // The reason why this ID is used to identify a job rather than a single task output file is
- // that, speculative tasks must generate the same output file name as the original task.
- private val uniqueWriteJobId = UUID.randomUUID()
-
- // This is only used on driver side.
- @transient private val jobContext: JobContext = job
-
- // The following fields are initialized and used on both driver and executor side.
- @transient protected var outputCommitter: OutputCommitter = _
- @transient private var jobId: JobID = _
- @transient private var taskId: TaskID = _
- @transient private var taskAttemptId: TaskAttemptID = _
- @transient protected var taskAttemptContext: TaskAttemptContext = _
-
- protected val outputPath: String = {
- assert(
- relation.paths.length == 1,
- s"Cannot write to multiple destinations: ${relation.paths.mkString(",")}")
- relation.paths.head
- }
-
- protected val dataSchema = relation.dataSchema
-
- protected var outputWriterFactory: OutputWriterFactory = _
-
- private var outputFormatClass: Class[_ <: OutputFormat[_, _]] = _
-
- def driverSideSetup(): Unit = {
- setupIDs(0, 0, 0)
- setupConf()
-
- // This UUID is sent to executor side together with the serialized `Configuration` object within
- // the `Job` instance. `OutputWriters` on the executor side should use this UUID to generate
- // unique task output files.
- job.getConfiguration.set("spark.sql.sources.writeJobUUID", uniqueWriteJobId.toString)
-
- // Order of the following two lines is important. For Hadoop 1, TaskAttemptContext constructor
- // clones the Configuration object passed in. If we initialize the TaskAttemptContext first,
- // configurations made in prepareJobForWrite(job) are not populated into the TaskAttemptContext.
- //
- // Also, the `prepareJobForWrite` call must happen before initializing output format and output
- // committer, since their initialization involve the job configuration, which can be potentially
- // decorated in `prepareJobForWrite`.
- outputWriterFactory = relation.prepareJobForWrite(job)
- taskAttemptContext = newTaskAttemptContext(serializableConf.value, taskAttemptId)
-
- outputFormatClass = job.getOutputFormatClass
- outputCommitter = newOutputCommitter(taskAttemptContext)
- outputCommitter.setupJob(jobContext)
- }
-
- def executorSideSetup(taskContext: TaskContext): Unit = {
- setupIDs(taskContext.stageId(), taskContext.partitionId(), taskContext.attemptNumber())
- setupConf()
- taskAttemptContext = newTaskAttemptContext(serializableConf.value, taskAttemptId)
- outputCommitter = newOutputCommitter(taskAttemptContext)
- outputCommitter.setupTask(taskAttemptContext)
- initWriters()
- }
-
- protected def getWorkPath: String = {
- outputCommitter match {
- // FileOutputCommitter writes to a temporary location returned by `getWorkPath`.
- case f: MapReduceFileOutputCommitter => f.getWorkPath.toString
- case _ => outputPath
- }
- }
-
- private def newOutputCommitter(context: TaskAttemptContext): OutputCommitter = {
- val defaultOutputCommitter = outputFormatClass.newInstance().getOutputCommitter(context)
-
- if (isAppend) {
- // If we are appending data to an existing dir, we will only use the output committer
- // associated with the file output format since it is not safe to use a custom
- // committer for appending. For example, in S3, direct parquet output committer may
- // leave partial data in the destination dir when the the appending job fails.
- logInfo(
- s"Using output committer class ${defaultOutputCommitter.getClass.getCanonicalName} " +
- "for appending.")
- defaultOutputCommitter
- } else {
- val committerClass = context.getConfiguration.getClass(
- SQLConf.OUTPUT_COMMITTER_CLASS.key, null, classOf[OutputCommitter])
-
- Option(committerClass).map { clazz =>
- logInfo(s"Using user defined output committer class ${clazz.getCanonicalName}")
-
- // Every output format based on org.apache.hadoop.mapreduce.lib.output.OutputFormat
- // has an associated output committer. To override this output committer,
- // we will first try to use the output committer set in SQLConf.OUTPUT_COMMITTER_CLASS.
- // If a data source needs to override the output committer, it needs to set the
- // output committer in prepareForWrite method.
- if (classOf[MapReduceFileOutputCommitter].isAssignableFrom(clazz)) {
- // The specified output committer is a FileOutputCommitter.
- // So, we will use the FileOutputCommitter-specified constructor.
- val ctor = clazz.getDeclaredConstructor(classOf[Path], classOf[TaskAttemptContext])
- ctor.newInstance(new Path(outputPath), context)
- } else {
- // The specified output committer is just a OutputCommitter.
- // So, we will use the no-argument constructor.
- val ctor = clazz.getDeclaredConstructor()
- ctor.newInstance()
- }
- }.getOrElse {
- // If output committer class is not set, we will use the one associated with the
- // file output format.
- logInfo(
- s"Using output committer class ${defaultOutputCommitter.getClass.getCanonicalName}")
- defaultOutputCommitter
- }
- }
- }
-
- private def setupIDs(jobId: Int, splitId: Int, attemptId: Int): Unit = {
- this.jobId = SparkHadoopWriter.createJobID(new Date, jobId)
- this.taskId = new TaskID(this.jobId, true, splitId)
- this.taskAttemptId = new TaskAttemptID(taskId, attemptId)
- }
-
- private def setupConf(): Unit = {
- serializableConf.value.set("mapred.job.id", jobId.toString)
- serializableConf.value.set("mapred.tip.id", taskAttemptId.getTaskID.toString)
- serializableConf.value.set("mapred.task.id", taskAttemptId.toString)
- serializableConf.value.setBoolean("mapred.task.is.map", true)
- serializableConf.value.setInt("mapred.task.partition", 0)
- }
-
- // Called on executor side when writing rows
- def outputWriterForRow(row: InternalRow): OutputWriter
-
- protected def initWriters(): Unit
-
- def commitTask(): Unit = {
- SparkHadoopMapRedUtil.commitTask(
- outputCommitter, taskAttemptContext, jobId.getId, taskId.getId, taskAttemptId.getId)
- }
-
- def abortTask(): Unit = {
- if (outputCommitter != null) {
- outputCommitter.abortTask(taskAttemptContext)
- }
- logError(s"Task attempt $taskAttemptId aborted.")
- }
-
- def commitJob(): Unit = {
- outputCommitter.commitJob(jobContext)
- logInfo(s"Job $jobId committed.")
- }
-
- def abortJob(): Unit = {
- if (outputCommitter != null) {
- outputCommitter.abortJob(jobContext, JobStatus.State.FAILED)
- }
- logError(s"Job $jobId aborted.")
- }
-}
-
-private[sql] class DefaultWriterContainer(
- @transient relation: HadoopFsRelation,
- @transient job: Job,
- isAppend: Boolean)
- extends BaseWriterContainer(relation, job, isAppend) {
-
- @transient private var writer: OutputWriter = _
-
- override protected def initWriters(): Unit = {
- taskAttemptContext.getConfiguration.set("spark.sql.sources.output.path", outputPath)
- writer = outputWriterFactory.newInstance(getWorkPath, dataSchema, taskAttemptContext)
- }
-
- override def outputWriterForRow(row: InternalRow): OutputWriter = writer
-
- override def commitTask(): Unit = {
- try {
- assert(writer != null, "OutputWriter instance should have been initialized")
- writer.close()
- super.commitTask()
- } catch { case cause: Throwable =>
- // This exception will be handled in `InsertIntoHadoopFsRelation.insert$writeRows`, and will
- // cause `abortTask()` to be invoked.
- throw new RuntimeException("Failed to commit task", cause)
- }
- }
-
- override def abortTask(): Unit = {
- try {
- // It's possible that the task fails before `writer` gets initialized
- if (writer != null) {
- writer.close()
- }
- } finally {
- super.abortTask()
- }
- }
-}
-
-private[sql] class DynamicPartitionWriterContainer(
- @transient relation: HadoopFsRelation,
- @transient job: Job,
- partitionColumns: Array[String],
- defaultPartitionName: String,
- isAppend: Boolean)
- extends BaseWriterContainer(relation, job, isAppend) {
-
- // All output writers are created on executor side.
- @transient protected var outputWriters: java.util.HashMap[String, OutputWriter] = _
-
- override protected def initWriters(): Unit = {
- outputWriters = new java.util.HashMap[String, OutputWriter]
- }
-
- // The `row` argument is supposed to only contain partition column values which have been casted
- // to strings.
- override def outputWriterForRow(row: InternalRow): OutputWriter = {
- val partitionPath = {
- val partitionPathBuilder = new StringBuilder
- var i = 0
-
- while (i < partitionColumns.length) {
- val col = partitionColumns(i)
- val partitionValueString = {
- val string = row.getUTF8String(i)
- if (string.eq(null)) {
- defaultPartitionName
- } else {
- PartitioningUtils.escapePathName(string.toString)
- }
- }
-
- if (i > 0) {
- partitionPathBuilder.append(Path.SEPARATOR_CHAR)
- }
-
- partitionPathBuilder.append(s"$col=$partitionValueString")
- i += 1
- }
-
- partitionPathBuilder.toString()
- }
-
- val writer = outputWriters.get(partitionPath)
- if (writer.eq(null)) {
- val path = new Path(getWorkPath, partitionPath)
- taskAttemptContext.getConfiguration.set(
- "spark.sql.sources.output.path", new Path(outputPath, partitionPath).toString)
- val newWriter = outputWriterFactory.newInstance(path.toString, dataSchema, taskAttemptContext)
- outputWriters.put(partitionPath, newWriter)
- newWriter
- } else {
- writer
- }
- }
-
- private def clearOutputWriters(): Unit = {
- if (!outputWriters.isEmpty) {
- asScalaIterator(outputWriters.values().iterator()).foreach(_.close())
- outputWriters.clear()
- }
- }
-
- override def commitTask(): Unit = {
- try {
- clearOutputWriters()
- super.commitTask()
- } catch { case cause: Throwable =>
- throw new RuntimeException("Failed to commit task", cause)
- }
- }
-
- override def abortTask(): Unit = {
- try {
- clearOutputWriters()
- } finally {
- super.abortTask()
- }
- }
-}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/json/JSONRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/json/JSONRelation.scala
index 5d37140287..10f1367e69 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/json/JSONRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/json/JSONRelation.scala
@@ -152,7 +152,7 @@ private[json] class JsonOutputWriter(
path: String,
dataSchema: StructType,
context: TaskAttemptContext)
- extends OutputWriterInternal with SparkHadoopMapRedUtil with Logging {
+ extends OutputWriter with SparkHadoopMapRedUtil with Logging {
val writer = new CharArrayWriter()
// create the Generator without separator inserted between 2 records
@@ -170,7 +170,9 @@ private[json] class JsonOutputWriter(
}.getRecordWriter(context)
}
- override def writeInternal(row: InternalRow): Unit = {
+ override def write(row: Row): Unit = throw new UnsupportedOperationException("call writeInternal")
+
+ override protected[sql] def writeInternal(row: InternalRow): Unit = {
JacksonGenerator(dataSchema, gen, row)
gen.flush()
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetRelation.scala
index 29c388c22e..48009b2fd0 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetRelation.scala
@@ -62,7 +62,7 @@ private[sql] class DefaultSource extends HadoopFsRelationProvider {
// NOTE: This class is instantiated and used on executor side only, no need to be serializable.
private[sql] class ParquetOutputWriter(path: String, context: TaskAttemptContext)
- extends OutputWriterInternal {
+ extends OutputWriter {
private val recordWriter: RecordWriter[Void, InternalRow] = {
val outputFormat = {
@@ -87,7 +87,9 @@ private[sql] class ParquetOutputWriter(path: String, context: TaskAttemptContext
outputFormat.getRecordWriter(context)
}
- override def writeInternal(row: InternalRow): Unit = recordWriter.write(null, row)
+ override def write(row: Row): Unit = throw new UnsupportedOperationException("call writeInternal")
+
+ override protected[sql] def writeInternal(row: InternalRow): Unit = recordWriter.write(null, row)
override def close(): Unit = recordWriter.close(context)
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
index 0b2929661b..c5b7ee73eb 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
@@ -342,18 +342,17 @@ abstract class OutputWriter {
* @since 1.4.0
*/
def close(): Unit
-}
-/**
- * This is an internal, private version of [[OutputWriter]] with an writeInternal method that
- * accepts an [[InternalRow]] rather than an [[Row]]. Data sources that return this must have
- * the conversion flag set to false.
- */
-private[sql] abstract class OutputWriterInternal extends OutputWriter {
+ private var converter: InternalRow => Row = _
- override def write(row: Row): Unit = throw new UnsupportedOperationException
+ protected[sql] def initConverter(dataSchema: StructType) = {
+ converter =
+ CatalystTypeConverters.createToScalaConverter(dataSchema).asInstanceOf[InternalRow => Row]
+ }
- def writeInternal(row: InternalRow): Unit
+ protected[sql] def writeInternal(row: InternalRow): Unit = {
+ write(converter(row))
+ }
}
/**
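With `OutputWriterInternal` gone, an external data source implements only the public API: the engine installs a converter via `initConverter` and routes rows through `writeInternal`, which by default converts each `InternalRow` and delegates to `write(Row)`. A hedged sketch of such an external writer (the class name, constructor argument, and CSV-like formatting are purely illustrative; real writers are created by an `OutputWriterFactory` with a path, schema, and task context):

    import org.apache.spark.sql.Row
    import org.apache.spark.sql.sources.OutputWriter

    // Illustrative external writer: it overrides only the public methods, so it receives
    // already-converted Rows through the default writeInternal -> write(Row) path above.
    class SimpleTextOutputWriter(out: java.io.Writer) extends OutputWriter {
      override def write(row: Row): Unit = out.write(row.mkString(",") + "\n")
      override def close(): Unit = out.close()
    }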
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/PartitionedWriteSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/PartitionedWriteSuite.scala
new file mode 100644
index 0000000000..c86ddd7c83
--- /dev/null
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/PartitionedWriteSuite.scala
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.sources
+
+import org.apache.spark.sql.{Row, QueryTest}
+import org.apache.spark.sql.functions._
+import org.apache.spark.sql.test.TestSQLContext
+import org.apache.spark.util.Utils
+
+class PartitionedWriteSuite extends QueryTest {
+ import TestSQLContext.implicits._
+
+ test("write many partitions") {
+ val path = Utils.createTempDir()
+ path.delete()
+
+ val df = TestSQLContext.range(100).select($"id", lit(1).as("data"))
+ df.write.partitionBy("id").save(path.getCanonicalPath)
+
+ checkAnswer(
+ TestSQLContext.read.load(path.getCanonicalPath),
+ (0 to 99).map(Row(1, _)).toSeq)
+
+ Utils.deleteRecursively(path)
+ }
+
+ test("write many partitions with repeats") {
+ val path = Utils.createTempDir()
+ path.delete()
+
+ val base = TestSQLContext.range(100)
+ val df = base.unionAll(base).select($"id", lit(1).as("data"))
+ df.write.partitionBy("id").save(path.getCanonicalPath)
+
+ checkAnswer(
+ TestSQLContext.read.load(path.getCanonicalPath),
+ (0 to 99).map(Row(1, _)).toSeq ++ (0 to 99).map(Row(1, _)).toSeq)
+
+ Utils.deleteRecursively(path)
+ }
+}
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala
index 4a310ff4e9..7c8704b47f 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala
@@ -66,7 +66,7 @@ private[orc] class OrcOutputWriter(
path: String,
dataSchema: StructType,
context: TaskAttemptContext)
- extends OutputWriterInternal with SparkHadoopMapRedUtil with HiveInspectors {
+ extends OutputWriter with SparkHadoopMapRedUtil with HiveInspectors {
private val serializer = {
val table = new Properties()
@@ -120,7 +120,9 @@ private[orc] class OrcOutputWriter(
).asInstanceOf[RecordWriter[NullWritable, Writable]]
}
- override def writeInternal(row: InternalRow): Unit = {
+ override def write(row: Row): Unit = throw new UnsupportedOperationException("call writeInternal")
+
+ override protected[sql] def writeInternal(row: InternalRow): Unit = {
var i = 0
while (i < row.numFields) {
reusableOutputBuffer(i) = wrappers(i)(row.get(i, dataSchema(i).dataType))