-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala  10
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONRelation.scala  7
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala  7
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala  9
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextRelation.scala  7
5 files changed, 28 insertions, 12 deletions
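
Every hunk in this change applies the same pattern: rather than calling context.getConfiguration and context.getTaskAttemptID directly on a TaskAttemptContext, the writers go through SparkHadoopUtil.get.getConfigurationFromJobContext(...) and SparkHadoopUtil.get.getTaskAttemptIDFromTaskAttemptContext(...). As a rough, self-contained sketch of the rewritten shape (ExampleOutputFormat and its path parameter are invented for illustration; the two accessor methods and the part-file name format are taken from the diff itself):

import org.apache.hadoop.fs.Path
import org.apache.hadoop.io.{NullWritable, Text}
import org.apache.hadoop.mapreduce.TaskAttemptContext
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat
import org.apache.spark.deploy.SparkHadoopUtil

// Illustrative only: mirrors the getDefaultWorkFile overrides touched below.
class ExampleOutputFormat(path: Path) extends TextOutputFormat[NullWritable, Text] {
  override def getDefaultWorkFile(context: TaskAttemptContext, extension: String): Path = {
    // Read the Hadoop Configuration through SparkHadoopUtil instead of
    // calling context.getConfiguration directly.
    val configuration = SparkHadoopUtil.get.getConfigurationFromJobContext(context)
    val uniqueWriteJobId = configuration.get("spark.sql.sources.writeJobUUID")
    // Likewise for the task attempt ID.
    val taskAttemptId = SparkHadoopUtil.get.getTaskAttemptIDFromTaskAttemptContext(context)
    val split = taskAttemptId.getTaskID.getId
    new Path(path, f"part-r-$split%05d-$uniqueWriteJobId$extension")
  }
}
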
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala
index 78f48a5cd7..879fd69863 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala
@@ -25,6 +25,7 @@ import org.apache.hadoop.fs.Path
import org.apache.hadoop.mapreduce._
import org.apache.hadoop.mapreduce.lib.output.{FileOutputCommitter => MapReduceFileOutputCommitter}
import org.apache.spark._
+import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.mapred.SparkHadoopMapRedUtil
import org.apache.spark.mapreduce.SparkHadoopMapReduceUtil
import org.apache.spark.sql._
@@ -145,7 +146,8 @@ private[sql] abstract class BaseWriterContainer(
"because spark.speculation is configured to be true.")
defaultOutputCommitter
} else {
- val committerClass = context.getConfiguration.getClass(
+ val configuration = SparkHadoopUtil.get.getConfigurationFromJobContext(context)
+ val committerClass = configuration.getClass(
SQLConf.OUTPUT_COMMITTER_CLASS.key, null, classOf[OutputCommitter])
Option(committerClass).map { clazz =>
@@ -227,7 +229,8 @@ private[sql] class DefaultWriterContainer(
def writeRows(taskContext: TaskContext, iterator: Iterator[InternalRow]): Unit = {
executorSideSetup(taskContext)
- taskAttemptContext.getConfiguration.set("spark.sql.sources.output.path", outputPath)
+ val configuration = SparkHadoopUtil.get.getConfigurationFromJobContext(taskAttemptContext)
+ configuration.set("spark.sql.sources.output.path", outputPath)
val writer = outputWriterFactory.newInstance(getWorkPath, dataSchema, taskAttemptContext)
writer.initConverter(dataSchema)
@@ -395,7 +398,8 @@ private[sql] class DynamicPartitionWriterContainer(
def newOutputWriter(key: InternalRow): OutputWriter = {
val partitionPath = getPartitionString(key).getString(0)
val path = new Path(getWorkPath, partitionPath)
- taskAttemptContext.getConfiguration.set(
+ val configuration = SparkHadoopUtil.get.getConfigurationFromJobContext(taskAttemptContext)
+ configuration.set(
"spark.sql.sources.output.path", new Path(outputPath, partitionPath).toString)
val newWriter = outputWriterFactory.newInstance(path.toString, dataSchema, taskAttemptContext)
newWriter.initConverter(dataSchema)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONRelation.scala
index ab8ca5f748..7a49157d9e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONRelation.scala
@@ -30,6 +30,7 @@ import org.apache.hadoop.mapreduce.{Job, RecordWriter, TaskAttemptContext}
import org.apache.spark.Logging
import org.apache.spark.broadcast.Broadcast
+import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.mapred.SparkHadoopMapRedUtil
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
@@ -169,8 +170,10 @@ private[json] class JsonOutputWriter(
private val recordWriter: RecordWriter[NullWritable, Text] = {
new TextOutputFormat[NullWritable, Text]() {
override def getDefaultWorkFile(context: TaskAttemptContext, extension: String): Path = {
- val uniqueWriteJobId = context.getConfiguration.get("spark.sql.sources.writeJobUUID")
- val split = context.getTaskAttemptID.getTaskID.getId
+ val configuration = SparkHadoopUtil.get.getConfigurationFromJobContext(context)
+ val uniqueWriteJobId = configuration.get("spark.sql.sources.writeJobUUID")
+ val taskAttemptId = SparkHadoopUtil.get.getTaskAttemptIDFromTaskAttemptContext(context)
+ val split = taskAttemptId.getTaskID.getId
new Path(path, f"part-r-$split%05d-$uniqueWriteJobId$extension")
}
}.getRecordWriter(context)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala
index 64982f37cf..c6bbc392ca 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala
@@ -40,6 +40,7 @@ import org.apache.parquet.{Log => ApacheParquetLog}
import org.slf4j.bridge.SLF4JBridgeHandler
import org.apache.spark.broadcast.Broadcast
+import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.rdd.{RDD, SqlNewHadoopPartition, SqlNewHadoopRDD}
import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.InternalRow
@@ -81,8 +82,10 @@ private[sql] class ParquetOutputWriter(path: String, context: TaskAttemptContext
// `FileOutputCommitter.getWorkPath()`, which points to the base directory of all
// partitions in the case of dynamic partitioning.
override def getDefaultWorkFile(context: TaskAttemptContext, extension: String): Path = {
- val uniqueWriteJobId = context.getConfiguration.get("spark.sql.sources.writeJobUUID")
- val split = context.getTaskAttemptID.getTaskID.getId
+ val configuration = SparkHadoopUtil.get.getConfigurationFromJobContext(context)
+ val uniqueWriteJobId = configuration.get("spark.sql.sources.writeJobUUID")
+ val taskAttemptId = SparkHadoopUtil.get.getTaskAttemptIDFromTaskAttemptContext(context)
+ val split = taskAttemptId.getTaskID.getId
new Path(path, f"part-r-$split%05d-$uniqueWriteJobId$extension")
}
}
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala
index 1cff5cf9c3..4eeca9aec1 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala
@@ -34,6 +34,7 @@ import org.apache.hadoop.mapreduce.lib.input.FileInputFormat
import org.apache.hadoop.mapreduce.{Job, TaskAttemptContext}
import org.apache.spark.Logging
+import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.mapred.SparkHadoopMapRedUtil
import org.apache.spark.rdd.{HadoopRDD, RDD}
import org.apache.spark.sql.catalyst.InternalRow
@@ -77,7 +78,8 @@ private[orc] class OrcOutputWriter(
}.mkString(":"))
val serde = new OrcSerde
- serde.initialize(context.getConfiguration, table)
+ val configuration = SparkHadoopUtil.get.getConfigurationFromJobContext(context)
+ serde.initialize(configuration, table)
serde
}
@@ -109,9 +111,10 @@ private[orc] class OrcOutputWriter(
private lazy val recordWriter: RecordWriter[NullWritable, Writable] = {
recordWriterInstantiated = true
- val conf = context.getConfiguration
+ val conf = SparkHadoopUtil.get.getConfigurationFromJobContext(context)
val uniqueWriteJobId = conf.get("spark.sql.sources.writeJobUUID")
- val partition = context.getTaskAttemptID.getTaskID.getId
+ val taskAttemptId = SparkHadoopUtil.get.getTaskAttemptIDFromTaskAttemptContext(context)
+ val partition = taskAttemptId.getTaskID.getId
val filename = f"part-r-$partition%05d-$uniqueWriteJobId.orc"
new OrcOutputFormat().getRecordWriter(
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextRelation.scala b/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextRelation.scala
index e8141923a9..527ca7a81c 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextRelation.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextRelation.scala
@@ -27,6 +27,7 @@ import org.apache.hadoop.mapreduce.lib.output.{FileOutputFormat, TextOutputForma
import org.apache.hadoop.mapreduce.{Job, RecordWriter, TaskAttemptContext}
import org.apache.spark.rdd.RDD
+import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.sql.catalyst.CatalystTypeConverters
import org.apache.spark.sql.catalyst.expressions.{Cast, Literal}
import org.apache.spark.sql.types.{DataType, StructType}
@@ -53,8 +54,10 @@ class AppendingTextOutputFormat(outputFile: Path) extends TextOutputFormat[NullW
numberFormat.setGroupingUsed(false)
override def getDefaultWorkFile(context: TaskAttemptContext, extension: String): Path = {
- val uniqueWriteJobId = context.getConfiguration.get("spark.sql.sources.writeJobUUID")
- val split = context.getTaskAttemptID.getTaskID.getId
+ val configuration = SparkHadoopUtil.get.getConfigurationFromJobContext(context)
+ val uniqueWriteJobId = configuration.get("spark.sql.sources.writeJobUUID")
+ val taskAttemptId = SparkHadoopUtil.get.getTaskAttemptIDFromTaskAttemptContext(context)
+ val split = taskAttemptId.getTaskID.getId
val name = FileOutputFormat.getOutputName(context)
new Path(outputFile, s"$name-${numberFormat.format(split)}-$uniqueWriteJobId")
}