author     Josh Rosen <joshrosen@databricks.com>      2015-08-29 13:36:25 -0700
committer  Michael Armbrust <michael@databricks.com>  2015-08-29 13:36:25 -0700
commit     6a6f3c91ee1f63dd464eb03d156d02c1a5887d88
tree       905f27e0e5432108208972998091f6fb3ee48131
parent     277148b285748e863f2b9fdf6cf12963977f91ca
[SPARK-10330] Use SparkHadoopUtil TaskAttemptContext reflection methods in more places
SparkHadoopUtil contains methods that use reflection to work around TaskAttemptContext binary incompatibilities between Hadoop 1.x and 2.x. We should use these methods in more places.

Author: Josh Rosen <joshrosen@databricks.com>

Closes #8499 from JoshRosen/use-hadoop-reflection-in-more-places.
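For context, the incompatibility arises because `TaskAttemptContext` (and `JobContext`) is a class in Hadoop 1.x but an interface in Hadoop 2.x, so code compiled against one version can fail at runtime against the other. The snippet below is a minimal sketch of how such reflection-based accessors can be written; the method names match those used in this patch, but the bodies are illustrative and are not copied from Spark's actual SparkHadoopUtil.

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.mapreduce.{JobContext, TaskAttemptContext, TaskAttemptID}

// Sketch only: look up the accessor on the runtime class and invoke it
// reflectively, so the compiled call site does not depend on whether
// TaskAttemptContext is a class (Hadoop 1.x) or an interface (Hadoop 2.x).
object HadoopReflectionSketch {
  def getConfigurationFromJobContext(context: JobContext): Configuration = {
    val method = context.getClass.getMethod("getConfiguration")
    method.invoke(context).asInstanceOf[Configuration]
  }

  def getTaskAttemptIDFromTaskAttemptContext(context: TaskAttemptContext): TaskAttemptID = {
    val method = context.getClass.getMethod("getTaskAttemptID")
    method.invoke(context).asInstanceOf[TaskAttemptID]
  }
}
```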
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala        | 10
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONRelation.scala      |  7
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala |  7
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala                          |  9
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextRelation.scala                    |  7
5 files changed, 28 insertions(+), 12 deletions(-)
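Each file below applies the same substitution: direct `context.getConfiguration` and `context.getTaskAttemptID` calls are routed through `SparkHadoopUtil.get`. As a hedged illustration of the caller-side pattern (assembled from the added lines in this diff; it assumes compilation inside the Spark source tree, since SparkHadoopUtil is not a stable public API):

```scala
import org.apache.hadoop.mapreduce.TaskAttemptContext
import org.apache.spark.deploy.SparkHadoopUtil

// Sketch of the pattern used by the output writers in this patch: read the
// write-job UUID and the task id via SparkHadoopUtil's reflection helpers
// instead of calling the TaskAttemptContext accessors directly.
object WriterNamingSketch {
  def partFileName(context: TaskAttemptContext, extension: String): String = {
    val configuration = SparkHadoopUtil.get.getConfigurationFromJobContext(context)
    val uniqueWriteJobId = configuration.get("spark.sql.sources.writeJobUUID")
    val taskAttemptId = SparkHadoopUtil.get.getTaskAttemptIDFromTaskAttemptContext(context)
    val split = taskAttemptId.getTaskID.getId
    f"part-r-$split%05d-$uniqueWriteJobId$extension"
  }
}
```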
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala
index 78f48a5cd7..879fd69863 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala
@@ -25,6 +25,7 @@ import org.apache.hadoop.fs.Path
import org.apache.hadoop.mapreduce._
import org.apache.hadoop.mapreduce.lib.output.{FileOutputCommitter => MapReduceFileOutputCommitter}
import org.apache.spark._
+import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.mapred.SparkHadoopMapRedUtil
import org.apache.spark.mapreduce.SparkHadoopMapReduceUtil
import org.apache.spark.sql._
@@ -145,7 +146,8 @@ private[sql] abstract class BaseWriterContainer(
"because spark.speculation is configured to be true.")
defaultOutputCommitter
} else {
- val committerClass = context.getConfiguration.getClass(
+ val configuration = SparkHadoopUtil.get.getConfigurationFromJobContext(context)
+ val committerClass = configuration.getClass(
SQLConf.OUTPUT_COMMITTER_CLASS.key, null, classOf[OutputCommitter])
Option(committerClass).map { clazz =>
@@ -227,7 +229,8 @@ private[sql] class DefaultWriterContainer(
def writeRows(taskContext: TaskContext, iterator: Iterator[InternalRow]): Unit = {
executorSideSetup(taskContext)
- taskAttemptContext.getConfiguration.set("spark.sql.sources.output.path", outputPath)
+ val configuration = SparkHadoopUtil.get.getConfigurationFromJobContext(taskAttemptContext)
+ configuration.set("spark.sql.sources.output.path", outputPath)
val writer = outputWriterFactory.newInstance(getWorkPath, dataSchema, taskAttemptContext)
writer.initConverter(dataSchema)
@@ -395,7 +398,8 @@ private[sql] class DynamicPartitionWriterContainer(
def newOutputWriter(key: InternalRow): OutputWriter = {
val partitionPath = getPartitionString(key).getString(0)
val path = new Path(getWorkPath, partitionPath)
- taskAttemptContext.getConfiguration.set(
+ val configuration = SparkHadoopUtil.get.getConfigurationFromJobContext(taskAttemptContext)
+ configuration.set(
"spark.sql.sources.output.path", new Path(outputPath, partitionPath).toString)
val newWriter = outputWriterFactory.newInstance(path.toString, dataSchema, taskAttemptContext)
newWriter.initConverter(dataSchema)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONRelation.scala
index ab8ca5f748..7a49157d9e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONRelation.scala
@@ -30,6 +30,7 @@ import org.apache.hadoop.mapreduce.{Job, RecordWriter, TaskAttemptContext}
import org.apache.spark.Logging
import org.apache.spark.broadcast.Broadcast
+import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.mapred.SparkHadoopMapRedUtil
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
@@ -169,8 +170,10 @@ private[json] class JsonOutputWriter(
private val recordWriter: RecordWriter[NullWritable, Text] = {
new TextOutputFormat[NullWritable, Text]() {
override def getDefaultWorkFile(context: TaskAttemptContext, extension: String): Path = {
- val uniqueWriteJobId = context.getConfiguration.get("spark.sql.sources.writeJobUUID")
- val split = context.getTaskAttemptID.getTaskID.getId
+ val configuration = SparkHadoopUtil.get.getConfigurationFromJobContext(context)
+ val uniqueWriteJobId = configuration.get("spark.sql.sources.writeJobUUID")
+ val taskAttemptId = SparkHadoopUtil.get.getTaskAttemptIDFromTaskAttemptContext(context)
+ val split = taskAttemptId.getTaskID.getId
new Path(path, f"part-r-$split%05d-$uniqueWriteJobId$extension")
}
}.getRecordWriter(context)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala
index 64982f37cf..c6bbc392ca 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala
@@ -40,6 +40,7 @@ import org.apache.parquet.{Log => ApacheParquetLog}
import org.slf4j.bridge.SLF4JBridgeHandler
import org.apache.spark.broadcast.Broadcast
+import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.rdd.{RDD, SqlNewHadoopPartition, SqlNewHadoopRDD}
import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.InternalRow
@@ -81,8 +82,10 @@ private[sql] class ParquetOutputWriter(path: String, context: TaskAttemptContext
// `FileOutputCommitter.getWorkPath()`, which points to the base directory of all
// partitions in the case of dynamic partitioning.
override def getDefaultWorkFile(context: TaskAttemptContext, extension: String): Path = {
- val uniqueWriteJobId = context.getConfiguration.get("spark.sql.sources.writeJobUUID")
- val split = context.getTaskAttemptID.getTaskID.getId
+ val configuration = SparkHadoopUtil.get.getConfigurationFromJobContext(context)
+ val uniqueWriteJobId = configuration.get("spark.sql.sources.writeJobUUID")
+ val taskAttemptId = SparkHadoopUtil.get.getTaskAttemptIDFromTaskAttemptContext(context)
+ val split = taskAttemptId.getTaskID.getId
new Path(path, f"part-r-$split%05d-$uniqueWriteJobId$extension")
}
}
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala
index 1cff5cf9c3..4eeca9aec1 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala
@@ -34,6 +34,7 @@ import org.apache.hadoop.mapreduce.lib.input.FileInputFormat
import org.apache.hadoop.mapreduce.{Job, TaskAttemptContext}
import org.apache.spark.Logging
+import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.mapred.SparkHadoopMapRedUtil
import org.apache.spark.rdd.{HadoopRDD, RDD}
import org.apache.spark.sql.catalyst.InternalRow
@@ -77,7 +78,8 @@ private[orc] class OrcOutputWriter(
}.mkString(":"))
val serde = new OrcSerde
- serde.initialize(context.getConfiguration, table)
+ val configuration = SparkHadoopUtil.get.getConfigurationFromJobContext(context)
+ serde.initialize(configuration, table)
serde
}
@@ -109,9 +111,10 @@ private[orc] class OrcOutputWriter(
private lazy val recordWriter: RecordWriter[NullWritable, Writable] = {
recordWriterInstantiated = true
- val conf = context.getConfiguration
+ val conf = SparkHadoopUtil.get.getConfigurationFromJobContext(context)
val uniqueWriteJobId = conf.get("spark.sql.sources.writeJobUUID")
- val partition = context.getTaskAttemptID.getTaskID.getId
+ val taskAttemptId = SparkHadoopUtil.get.getTaskAttemptIDFromTaskAttemptContext(context)
+ val partition = taskAttemptId.getTaskID.getId
val filename = f"part-r-$partition%05d-$uniqueWriteJobId.orc"
new OrcOutputFormat().getRecordWriter(
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextRelation.scala b/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextRelation.scala
index e8141923a9..527ca7a81c 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextRelation.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextRelation.scala
@@ -27,6 +27,7 @@ import org.apache.hadoop.mapreduce.lib.output.{FileOutputFormat, TextOutputForma
import org.apache.hadoop.mapreduce.{Job, RecordWriter, TaskAttemptContext}
import org.apache.spark.rdd.RDD
+import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.sql.catalyst.CatalystTypeConverters
import org.apache.spark.sql.catalyst.expressions.{Cast, Literal}
import org.apache.spark.sql.types.{DataType, StructType}
@@ -53,8 +54,10 @@ class AppendingTextOutputFormat(outputFile: Path) extends TextOutputFormat[NullW
numberFormat.setGroupingUsed(false)
override def getDefaultWorkFile(context: TaskAttemptContext, extension: String): Path = {
- val uniqueWriteJobId = context.getConfiguration.get("spark.sql.sources.writeJobUUID")
- val split = context.getTaskAttemptID.getTaskID.getId
+ val configuration = SparkHadoopUtil.get.getConfigurationFromJobContext(context)
+ val uniqueWriteJobId = configuration.get("spark.sql.sources.writeJobUUID")
+ val taskAttemptId = SparkHadoopUtil.get.getTaskAttemptIDFromTaskAttemptContext(context)
+ val split = taskAttemptId.getTaskID.getId
val name = FileOutputFormat.getOutputName(context)
new Path(outputFile, s"$name-${numberFormat.format(split)}-$uniqueWriteJobId")
}