author      Josh Rosen <joshrosen@databricks.com>  2015-08-29 13:36:25 -0700
committer   Michael Armbrust <michael@databricks.com>  2015-08-29 13:36:25 -0700
commit      6a6f3c91ee1f63dd464eb03d156d02c1a5887d88 (patch)
tree        905f27e0e5432108208972998091f6fb3ee48131 /sql/hive
parent      277148b285748e863f2b9fdf6cf12963977f91ca (diff)
[SPARK-10330] Use SparkHadoopUtil TaskAttemptContext reflection methods in more places
SparkHadoopUtil contains methods that use reflection to work around TaskAttemptContext binary incompatibilities between Hadoop 1.x and 2.x. We should use these methods in more places.

Author: Josh Rosen <joshrosen@databricks.com>

Closes #8499 from JoshRosen/use-hadoop-reflection-in-more-places.
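For context, a minimal sketch (in the spirit of SparkHadoopUtil, not its exact source) of how such a reflection shim sidesteps the incompatibility: TaskAttemptContext is a class in Hadoop 1.x and an interface in Hadoop 2.x, so a direct `context.getConfiguration` call compiled against one version can fail at runtime against the other with an IncompatibleClassChangeError. Looking the accessor up on the runtime class avoids binding to either flavour at compile time.

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.mapreduce.{JobContext, TaskAttemptContext, TaskAttemptID}

// Hypothetical shim object for illustration only; the real methods live on SparkHadoopUtil.
object TaskAttemptContextShim {
  // Resolve getConfiguration reflectively so the call works whether JobContext
  // is the Hadoop 1.x class or the Hadoop 2.x interface.
  def getConfigurationFromJobContext(context: JobContext): Configuration = {
    val method = context.getClass.getMethod("getConfiguration")
    method.invoke(context).asInstanceOf[Configuration]
  }

  // Same trick for the task attempt ID accessor used when deriving output file names.
  def getTaskAttemptIDFromTaskAttemptContext(context: TaskAttemptContext): TaskAttemptID = {
    val method = context.getClass.getMethod("getTaskAttemptID")
    method.invoke(context).asInstanceOf[TaskAttemptID]
  }
}

The diff below applies exactly this pattern: direct calls on the context are replaced with the SparkHadoopUtil accessors.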
Diffstat (limited to 'sql/hive')
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala       | 9
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextRelation.scala | 7
2 files changed, 11 insertions, 5 deletions
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala
index 1cff5cf9c3..4eeca9aec1 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala
@@ -34,6 +34,7 @@ import org.apache.hadoop.mapreduce.lib.input.FileInputFormat
import org.apache.hadoop.mapreduce.{Job, TaskAttemptContext}
import org.apache.spark.Logging
+import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.mapred.SparkHadoopMapRedUtil
import org.apache.spark.rdd.{HadoopRDD, RDD}
import org.apache.spark.sql.catalyst.InternalRow
@@ -77,7 +78,8 @@ private[orc] class OrcOutputWriter(
}.mkString(":"))
val serde = new OrcSerde
- serde.initialize(context.getConfiguration, table)
+ val configuration = SparkHadoopUtil.get.getConfigurationFromJobContext(context)
+ serde.initialize(configuration, table)
serde
}
@@ -109,9 +111,10 @@ private[orc] class OrcOutputWriter(
private lazy val recordWriter: RecordWriter[NullWritable, Writable] = {
recordWriterInstantiated = true
- val conf = context.getConfiguration
+ val conf = SparkHadoopUtil.get.getConfigurationFromJobContext(context)
val uniqueWriteJobId = conf.get("spark.sql.sources.writeJobUUID")
- val partition = context.getTaskAttemptID.getTaskID.getId
+ val taskAttemptId = SparkHadoopUtil.get.getTaskAttemptIDFromTaskAttemptContext(context)
+ val partition = taskAttemptId.getTaskID.getId
val filename = f"part-r-$partition%05d-$uniqueWriteJobId.orc"
new OrcOutputFormat().getRecordWriter(
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextRelation.scala b/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextRelation.scala
index e8141923a9..527ca7a81c 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextRelation.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextRelation.scala
@@ -27,6 +27,7 @@ import org.apache.hadoop.mapreduce.lib.output.{FileOutputFormat, TextOutputFormat}
import org.apache.hadoop.mapreduce.{Job, RecordWriter, TaskAttemptContext}
import org.apache.spark.rdd.RDD
+import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.sql.catalyst.CatalystTypeConverters
import org.apache.spark.sql.catalyst.expressions.{Cast, Literal}
import org.apache.spark.sql.types.{DataType, StructType}
@@ -53,8 +54,10 @@ class AppendingTextOutputFormat(outputFile: Path) extends TextOutputFormat[NullWritable, Text] {
numberFormat.setGroupingUsed(false)
override def getDefaultWorkFile(context: TaskAttemptContext, extension: String): Path = {
- val uniqueWriteJobId = context.getConfiguration.get("spark.sql.sources.writeJobUUID")
- val split = context.getTaskAttemptID.getTaskID.getId
+ val configuration = SparkHadoopUtil.get.getConfigurationFromJobContext(context)
+ val uniqueWriteJobId = configuration.get("spark.sql.sources.writeJobUUID")
+ val taskAttemptId = SparkHadoopUtil.get.getTaskAttemptIDFromTaskAttemptContext(context)
+ val split = taskAttemptId.getTaskID.getId
val name = FileOutputFormat.getOutputName(context)
new Path(outputFile, s"$name-${numberFormat.format(split)}-$uniqueWriteJobId")
}