-rw-r--r--  core/src/main/scala/org/apache/spark/SparkContext.scala                                              |  6
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala                                    |  4
-rw-r--r--  core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala                                      |  8
-rw-r--r--  core/src/main/scala/org/apache/spark/rdd/SqlNewHadoopRDD.scala                                       |  2
-rw-r--r--  core/src/test/scala/org/apache/spark/FileSuite.scala                                                  |  6
-rw-r--r--  examples/src/main/scala/org/apache/spark/examples/CassandraCQLTest.scala                              |  3
-rw-r--r--  examples/src/main/scala/org/apache/spark/examples/CassandraTest.scala                                 |  2
-rw-r--r--  scalastyle-config.xml                                                                                 |  8
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala              |  8
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONRelation.scala            |  2
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystReadSupport.scala  |  6
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/DirectParquetOutputCommitter.scala |  6
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala      | 10
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetTypesConverter.scala |  6
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala                               |  4
15 files changed, 61 insertions, 20 deletions
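Every hunk in this commit applies the same change: JobContext and TaskAttemptContext are concrete classes in Hadoop 1.x but interfaces in Hadoop 2.x, so calling getConfiguration() or getTaskAttemptID() on them directly compiles to version-specific bytecode, and a jar built against one Hadoop line fails against the other. SparkHadoopUtil's reflective accessors avoid that binding. A minimal sketch of the before/after shape (illustrative only; assumes a live SparkContext named sc):

    import org.apache.hadoop.mapreduce.Job
    import org.apache.spark.deploy.SparkHadoopUtil

    val job = new Job(sc.hadoopConfiguration)

    // Before: bound at compile time to a class method (Hadoop 1.x) or an
    // interface method (Hadoop 2.x), so the bytecode differs per version.
    // val conf = job.getConfiguration

    // After: the helper looks getConfiguration up reflectively at runtime,
    // so a single build runs against either Hadoop line.
    val conf = SparkHadoopUtil.get.getConfigurationFromJobContext(job)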
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index cbfe8bf31c..e27b3c4962 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -858,7 +858,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
// Use setInputPaths so that wholeTextFiles aligns with hadoopFile/textFile in taking
// comma separated files as input. (see SPARK-7155)
NewFileInputFormat.setInputPaths(job, path)
- val updateConf = job.getConfiguration
+ val updateConf = SparkHadoopUtil.get.getConfigurationFromJobContext(job)
new WholeTextFileRDD(
this,
classOf[WholeTextFileInputFormat],
@@ -910,7 +910,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
// Use setInputPaths so that binaryFiles aligns with hadoopFile/textFile in taking
// comma separated files as input. (see SPARK-7155)
NewFileInputFormat.setInputPaths(job, path)
- val updateConf = job.getConfiguration
+ val updateConf = SparkHadoopUtil.get.getConfigurationFromJobContext(job)
new BinaryFileRDD(
this,
classOf[StreamInputFormat],
@@ -1092,7 +1092,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
// Use setInputPaths so that newAPIHadoopFile aligns with hadoopFile/textFile in taking
// comma separated files as input. (see SPARK-7155)
NewFileInputFormat.setInputPaths(job, path)
- val updatedConf = job.getConfiguration
+ val updatedConf = SparkHadoopUtil.get.getConfigurationFromJobContext(job)
new NewHadoopRDD(this, fClass, kClass, vClass, updatedConf).setName(path)
}
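The SPARK-7155 comments in the hunks above refer to wholeTextFiles, binaryFiles and newAPIHadoopFile accepting a comma separated list of paths, matching hadoopFile/textFile. A hedged usage sketch (paths are hypothetical; sc is an existing SparkContext):

    import org.apache.hadoop.io.{LongWritable, Text}
    import org.apache.hadoop.mapreduce.lib.input.TextInputFormat

    // Each call reads every path in the comma separated list.
    val whole = sc.wholeTextFiles("/data/logs/a.txt,/data/logs/b.txt")
    val lines = sc.newAPIHadoopFile(
      "/data/part1,/data/part2",
      classOf[TextInputFormat],
      classOf[LongWritable],
      classOf[Text])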
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
index f7723ef5bd..a0b7365df9 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
@@ -192,7 +192,9 @@ class SparkHadoopUtil extends Logging {
* while it's interface in Hadoop 2.+.
*/
def getConfigurationFromJobContext(context: JobContext): Configuration = {
+ // scalastyle:off jobcontext
val method = context.getClass.getMethod("getConfiguration")
+ // scalastyle:on jobcontext
method.invoke(context).asInstanceOf[Configuration]
}
@@ -204,7 +206,9 @@ class SparkHadoopUtil extends Logging {
*/
def getTaskAttemptIDFromTaskAttemptContext(
context: MapReduceTaskAttemptContext): MapReduceTaskAttemptID = {
+ // scalastyle:off jobcontext
val method = context.getClass.getMethod("getTaskAttemptID")
+ // scalastyle:on jobcontext
method.invoke(context).asInstanceOf[MapReduceTaskAttemptID]
}
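Callers never touch the reflection themselves; they go through the SparkHadoopUtil singleton. A minimal caller sketch (illustrative, not part of this commit):

    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.mapreduce.{JobContext, TaskAttemptContext}
    import org.apache.spark.deploy.SparkHadoopUtil

    // Both helpers resolve the method on the runtime class, so the same
    // bytecode works whether the context type is a class (Hadoop 1.x) or
    // an interface (Hadoop 2.x).
    def confOf(ctx: JobContext): Configuration =
      SparkHadoopUtil.get.getConfigurationFromJobContext(ctx)

    def attemptIdOf(ctx: TaskAttemptContext) =
      SparkHadoopUtil.get.getTaskAttemptIDFromTaskAttemptContext(ctx)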
diff --git a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
index c59f0d4aa7..199d79b811 100644
--- a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
@@ -996,8 +996,9 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
job.setOutputKeyClass(keyClass)
job.setOutputValueClass(valueClass)
job.setOutputFormatClass(outputFormatClass)
- job.getConfiguration.set("mapred.output.dir", path)
- saveAsNewAPIHadoopDataset(job.getConfiguration)
+ val jobConfiguration = SparkHadoopUtil.get.getConfigurationFromJobContext(job)
+ jobConfiguration.set("mapred.output.dir", path)
+ saveAsNewAPIHadoopDataset(jobConfiguration)
}
/**
@@ -1064,7 +1065,8 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
val formatter = new SimpleDateFormat("yyyyMMddHHmm")
val jobtrackerID = formatter.format(new Date())
val stageId = self.id
- val wrappedConf = new SerializableConfiguration(job.getConfiguration)
+ val jobConfiguration = SparkHadoopUtil.get.getConfigurationFromJobContext(job)
+ val wrappedConf = new SerializableConfiguration(jobConfiguration)
val outfmt = job.getOutputFormatClass
val jobFormat = outfmt.newInstance
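The public behaviour of saveAsNewAPIHadoopFile is unchanged by the rewrite above: it still writes the output path into mapred.output.dir and delegates to saveAsNewAPIHadoopDataset, only now the configuration is fetched through SparkHadoopUtil. A hedged usage sketch (output path and data are made up):

    import org.apache.hadoop.io.{IntWritable, Text}
    import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat

    val counts = sc.parallelize(Seq("spark" -> 1, "hadoop" -> 2)).map {
      case (k, v) => (new Text(k), new IntWritable(v))
    }
    // New-API output format; the Job configuration behind this call is the
    // one obtained via getConfigurationFromJobContext in the hunk above.
    counts.saveAsNewAPIHadoopFile[TextOutputFormat[Text, IntWritable]]("/tmp/word-counts")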
diff --git a/core/src/main/scala/org/apache/spark/rdd/SqlNewHadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/SqlNewHadoopRDD.scala
index 9babe56267..0228c54e05 100644
--- a/core/src/main/scala/org/apache/spark/rdd/SqlNewHadoopRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/SqlNewHadoopRDD.scala
@@ -86,7 +86,7 @@ private[spark] class SqlNewHadoopRDD[V: ClassTag](
if (isDriverSide) {
initDriverSideJobFuncOpt.map(f => f(job))
}
- job.getConfiguration
+ SparkHadoopUtil.get.getConfigurationFromJobContext(job)
}
private val jobTrackerId: String = {
diff --git a/core/src/test/scala/org/apache/spark/FileSuite.scala b/core/src/test/scala/org/apache/spark/FileSuite.scala
index 418763f4e5..fdb00aafc4 100644
--- a/core/src/test/scala/org/apache/spark/FileSuite.scala
+++ b/core/src/test/scala/org/apache/spark/FileSuite.scala
@@ -19,6 +19,7 @@ package org.apache.spark
import java.io.{File, FileWriter}
+import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.input.PortableDataStream
import org.apache.spark.storage.StorageLevel
@@ -506,8 +507,9 @@ class FileSuite extends SparkFunSuite with LocalSparkContext {
job.setOutputKeyClass(classOf[String])
job.setOutputValueClass(classOf[String])
job.setOutputFormatClass(classOf[NewTextOutputFormat[String, String]])
- job.getConfiguration.set("mapred.output.dir", tempDir.getPath + "/outputDataset_new")
- randomRDD.saveAsNewAPIHadoopDataset(job.getConfiguration)
+ val jobConfig = SparkHadoopUtil.get.getConfigurationFromJobContext(job)
+ jobConfig.set("mapred.output.dir", tempDir.getPath + "/outputDataset_new")
+ randomRDD.saveAsNewAPIHadoopDataset(jobConfig)
assert(new File(tempDir.getPath + "/outputDataset_new/part-r-00000").exists() === true)
}
diff --git a/examples/src/main/scala/org/apache/spark/examples/CassandraCQLTest.scala b/examples/src/main/scala/org/apache/spark/examples/CassandraCQLTest.scala
index fa07c1e501..d1b9b8d398 100644
--- a/examples/src/main/scala/org/apache/spark/examples/CassandraCQLTest.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/CassandraCQLTest.scala
@@ -16,6 +16,7 @@
*/
// scalastyle:off println
+ // scalastyle:off jobcontext
package org.apache.spark.examples
import java.nio.ByteBuffer
@@ -81,6 +82,7 @@ object CassandraCQLTest {
val job = new Job()
job.setInputFormatClass(classOf[CqlPagingInputFormat])
+ val configuration = job.getConfiguration
ConfigHelper.setInputInitialAddress(job.getConfiguration(), cHost)
ConfigHelper.setInputRpcPort(job.getConfiguration(), cPort)
ConfigHelper.setInputColumnFamily(job.getConfiguration(), KeySpace, InputColumnFamily)
@@ -135,3 +137,4 @@ object CassandraCQLTest {
}
}
// scalastyle:on println
+// scalastyle:on jobcontext
diff --git a/examples/src/main/scala/org/apache/spark/examples/CassandraTest.scala b/examples/src/main/scala/org/apache/spark/examples/CassandraTest.scala
index 2e56d24c60..1e679bfb55 100644
--- a/examples/src/main/scala/org/apache/spark/examples/CassandraTest.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/CassandraTest.scala
@@ -16,6 +16,7 @@
*/
// scalastyle:off println
+// scalastyle:off jobcontext
package org.apache.spark.examples
import java.nio.ByteBuffer
@@ -130,6 +131,7 @@ object CassandraTest {
}
}
// scalastyle:on println
+// scalastyle:on jobcontext
/*
create keyspace casDemo;
diff --git a/scalastyle-config.xml b/scalastyle-config.xml
index 68fdb4141c..64a0c71bbe 100644
--- a/scalastyle-config.xml
+++ b/scalastyle-config.xml
@@ -168,6 +168,14 @@ This file is divided into 3 sections:
scala.collection.JavaConverters._ and use .asScala / .asJava methods</customMessage>
</check>
+ <!-- As of SPARK-10330 JobContext methods should not be called directly -->
+ <check customId="jobcontext" level="error" class="org.scalastyle.scalariform.TokenChecker" enabled="true">
+ <parameters><parameter name="regex">^getConfiguration$|^getTaskAttemptID$</parameter></parameters>
+ <customMessage>Instead of calling .getConfiguration() or .getTaskAttemptID() directly,
+ use SparkHadoopUtil's getConfigurationFromJobContext() and getTaskAttemptIDFromTaskAttemptContext() methods.
+ </customMessage>
+ </check>
+
<!-- ================================================================================ -->
<!-- rules we'd like to enforce, but haven't cleaned up the codebase yet -->
<!-- ================================================================================ -->
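The check is a TokenChecker, so it flags the bare tokens getConfiguration and getTaskAttemptID wherever they appear, including calls routed through third-party helpers such as Parquet's ContextUtil. Code that must keep a direct call therefore wraps it in an explicit suppression, which is the pattern the sql/ hunks below use. Sketch (assumes a mapreduce Job named job in scope):

    // ContextUtil.getConfiguration is a Parquet helper rather than a direct
    // JobContext call, but the token still matches the checker's regex, so
    // the deliberate use is wrapped in an explicit, reviewable suppression.
    // scalastyle:off jobcontext
    val conf = ContextUtil.getConfiguration(job)
    // scalastyle:on jobcontext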
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala
index 9a573db0c0..f8ef674ed2 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala
@@ -47,7 +47,8 @@ private[sql] abstract class BaseWriterContainer(
protected val dataSchema = relation.dataSchema
- protected val serializableConf = new SerializableConfiguration(job.getConfiguration)
+ protected val serializableConf =
+ new SerializableConfiguration(SparkHadoopUtil.get.getConfigurationFromJobContext(job))
// This UUID is used to avoid output file name collision between different appending write jobs.
// These jobs may belong to different SparkContext instances. Concrete data source implementations
@@ -89,7 +90,8 @@ private[sql] abstract class BaseWriterContainer(
// This UUID is sent to executor side together with the serialized `Configuration` object within
// the `Job` instance. `OutputWriters` on the executor side should use this UUID to generate
// unique task output files.
- job.getConfiguration.set("spark.sql.sources.writeJobUUID", uniqueWriteJobId.toString)
+ SparkHadoopUtil.get.getConfigurationFromJobContext(job).
+ set("spark.sql.sources.writeJobUUID", uniqueWriteJobId.toString)
// Order of the following two lines is important. For Hadoop 1, TaskAttemptContext constructor
// clones the Configuration object passed in. If we initialize the TaskAttemptContext first,
@@ -182,7 +184,9 @@ private[sql] abstract class BaseWriterContainer(
private def setupIDs(jobId: Int, splitId: Int, attemptId: Int): Unit = {
this.jobId = SparkHadoopWriter.createJobID(new Date, jobId)
this.taskId = new TaskID(this.jobId, true, splitId)
+ // scalastyle:off jobcontext
this.taskAttemptId = new TaskAttemptID(taskId, attemptId)
+ // scalastyle:on jobcontext
}
private def setupConf(): Unit = {
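The writeJobUUID handling described in the comments above can be illustrated standalone (a sketch only; SerializableConfiguration is Spark-internal, so this compiles only inside an org.apache.spark package):

    import java.util.UUID
    import org.apache.hadoop.conf.Configuration
    import org.apache.spark.util.SerializableConfiguration

    // Driver side: tag the job configuration with a UUID for this write job.
    val conf = new Configuration()
    val writeJobUUID = UUID.randomUUID()
    conf.set("spark.sql.sources.writeJobUUID", writeJobUUID.toString)

    // The wrapped configuration is what actually ships to the executors.
    val wrapped = new SerializableConfiguration(conf)

    // Executor side: read the UUID back and fold it into output file names
    // so concurrent appending jobs never collide.
    val recovered = wrapped.value.get("spark.sql.sources.writeJobUUID")
    assert(recovered == writeJobUUID.toString)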
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONRelation.scala
index 7a49157d9e..8ee0127c3b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONRelation.scala
@@ -81,7 +81,7 @@ private[sql] class JSONRelation(
private def createBaseRdd(inputPaths: Array[FileStatus]): RDD[String] = {
val job = new Job(sqlContext.sparkContext.hadoopConfiguration)
- val conf = job.getConfiguration
+ val conf = SparkHadoopUtil.get.getConfigurationFromJobContext(job)
val paths = inputPaths.map(_.getPath)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystReadSupport.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystReadSupport.scala
index 5a8166fac5..8c819f1a48 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystReadSupport.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystReadSupport.scala
@@ -72,7 +72,11 @@ private[parquet] class CatalystReadSupport extends ReadSupport[InternalRow] with
// Called before `prepareForRead()` when initializing Parquet record reader.
override def init(context: InitContext): ReadContext = {
- val conf = context.getConfiguration
+ val conf = {
+ // scalastyle:off jobcontext
+ context.getConfiguration
+ // scalastyle:on jobcontext
+ }
// If the target file was written by Spark SQL, we should be able to find a serialized Catalyst
// schema of this file from its metadata.
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/DirectParquetOutputCommitter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/DirectParquetOutputCommitter.scala
index 2c6b914328..de1fd0166a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/DirectParquetOutputCommitter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/DirectParquetOutputCommitter.scala
@@ -53,7 +53,11 @@ private[parquet] class DirectParquetOutputCommitter(outputPath: Path, context: T
override def setupTask(taskContext: TaskAttemptContext): Unit = {}
override def commitJob(jobContext: JobContext) {
- val configuration = ContextUtil.getConfiguration(jobContext)
+ val configuration = {
+ // scalastyle:off jobcontext
+ ContextUtil.getConfiguration(jobContext)
+ // scalastyle:on jobcontext
+ }
val fileSystem = outputPath.getFileSystem(configuration)
if (configuration.getBoolean(ParquetOutputFormat.ENABLE_JOB_SUMMARY, true)) {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala
index c6bbc392ca..953fcab126 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala
@@ -211,7 +211,11 @@ private[sql] class ParquetRelation(
override def sizeInBytes: Long = metadataCache.dataStatuses.map(_.getLen).sum
override def prepareJobForWrite(job: Job): OutputWriterFactory = {
- val conf = ContextUtil.getConfiguration(job)
+ val conf = {
+ // scalastyle:off jobcontext
+ ContextUtil.getConfiguration(job)
+ // scalastyle:on jobcontext
+ }
// SPARK-9849 DirectParquetOutputCommitter qualified name should be backward compatible
val committerClassname = conf.get(SQLConf.PARQUET_OUTPUT_COMMITTER_CLASS.key)
@@ -528,7 +532,7 @@ private[sql] object ParquetRelation extends Logging {
assumeBinaryIsString: Boolean,
assumeInt96IsTimestamp: Boolean,
followParquetFormatSpec: Boolean)(job: Job): Unit = {
- val conf = job.getConfiguration
+ val conf = SparkHadoopUtil.get.getConfigurationFromJobContext(job)
conf.set(ParquetInputFormat.READ_SUPPORT_CLASS, classOf[CatalystReadSupport].getName)
// Try to push down filters when filter push-down is enabled.
@@ -572,7 +576,7 @@ private[sql] object ParquetRelation extends Logging {
FileInputFormat.setInputPaths(job, inputFiles.map(_.getPath): _*)
}
- overrideMinSplitSize(parquetBlockSize, job.getConfiguration)
+ overrideMinSplitSize(parquetBlockSize, SparkHadoopUtil.get.getConfigurationFromJobContext(job))
}
private[parquet] def readSchema(
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetTypesConverter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetTypesConverter.scala
index 142301fe87..b647bb6116 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetTypesConverter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetTypesConverter.scala
@@ -123,7 +123,11 @@ private[parquet] object ParquetTypesConverter extends Logging {
throw new IllegalArgumentException("Unable to read Parquet metadata: path is null")
}
val job = new Job()
- val conf = configuration.getOrElse(ContextUtil.getConfiguration(job))
+ val conf = {
+ // scalastyle:off jobcontext
+ configuration.getOrElse(ContextUtil.getConfiguration(job))
+ // scalastyle:on jobcontext
+ }
val fs: FileSystem = origPath.getFileSystem(conf)
if (fs == null) {
throw new IllegalArgumentException(s"Incorrectly formatted Parquet metadata path $origPath")
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala
index 7e89109259..d1f30e188e 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala
@@ -208,7 +208,7 @@ private[sql] class OrcRelation(
}
override def prepareJobForWrite(job: Job): OutputWriterFactory = {
- job.getConfiguration match {
+ SparkHadoopUtil.get.getConfigurationFromJobContext(job) match {
case conf: JobConf =>
conf.setOutputFormat(classOf[OrcOutputFormat])
case conf =>
@@ -289,7 +289,7 @@ private[orc] case class OrcTableScan(
def execute(): RDD[InternalRow] = {
val job = new Job(sqlContext.sparkContext.hadoopConfiguration)
- val conf = job.getConfiguration
+ val conf = SparkHadoopUtil.get.getConfigurationFromJobContext(job)
// Tries to push down filters if ORC filter push-down is enabled
if (sqlContext.conf.orcFilterPushDown) {