author     Josh Rosen <joshrosen@databricks.com>    2015-09-12 16:23:55 -0700
committer  Josh Rosen <joshrosen@databricks.com>    2015-09-12 16:23:55 -0700
commit     b3a7480ab0821ab38f710de96e3ac4a13f62dbca (patch)
tree       2a286b61a1a7e22cc0f0306b0c073b0ae3ac9e89 /sql
parent     f4a22808e03fa12bfe1bfc82cf713cfda7e063a9 (diff)
[SPARK-10330] Add Scalastyle rule to require use of SparkHadoopUtil JobContext methods
This is a followup to #8499 which adds a Scalastyle rule to mandate the use of SparkHadoopUtil's JobContext accessor methods and fixes the existing violations.

Author: Josh Rosen <joshrosen@databricks.com>

Closes #8521 from JoshRosen/SPARK-10330-part2.
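For reference, a minimal sketch of the pattern the rule enforces, distilled from the changes below (this is Spark-internal code: SparkHadoopUtil lives in org.apache.spark.deploy and is not a public API, so the snippet only compiles inside the Spark source tree):

    import org.apache.hadoop.mapreduce.Job
    import org.apache.spark.deploy.SparkHadoopUtil

    val job = new Job()

    // Flagged by the new Scalastyle rule: a direct JobContext accessor call.
    //   val conf = job.getConfiguration

    // Preferred: route the call through SparkHadoopUtil's accessor.
    val conf = SparkHadoopUtil.get.getConfigurationFromJobContext(job)

    // Where a direct call is unavoidable, the rule is switched off locally:
    // scalastyle:off jobcontext
    val directConf = job.getConfiguration
    // scalastyle:on jobcontext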
Diffstat (limited to 'sql')
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala                      8
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONRelation.scala                    2
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystReadSupport.scala          6
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/DirectParquetOutputCommitter.scala 6
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala              10
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetTypesConverter.scala        6
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala                                       4
7 files changed, 31 insertions, 11 deletions
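Why an accessor layer exists at all is not stated in this commit; the likely motivation (an assumption here) is the Hadoop 1.x/2.x binary incompatibility: JobContext and TaskAttemptContext changed from concrete classes to interfaces between the two lines, so a direct getConfiguration call compiled against one does not link against the other. A hypothetical compat helper along these lines could bridge that gap via reflection:

    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.mapreduce.JobContext

    // Hypothetical helper, sketched for illustration only; Spark's actual
    // accessor is SparkHadoopUtil.getConfigurationFromJobContext.
    object JobContextCompat {
      // Resolve getConfiguration reflectively so the same bytecode works whether
      // JobContext is a concrete class (Hadoop 1.x) or an interface (Hadoop 2.x).
      def getConfiguration(context: JobContext): Configuration = {
        context.getClass
          .getMethod("getConfiguration")
          .invoke(context)
          .asInstanceOf[Configuration]
      }
    }

Funneling every call through one helper also gives Scalastyle a single place to whitelist, instead of direct calls scattered across the files listed above; the scalastyle:off jobcontext / scalastyle:on jobcontext markers in the hunks below handle the few remaining spots where the direct call cannot be avoided.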
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala
index 9a573db0c0..f8ef674ed2 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala
@@ -47,7 +47,8 @@ private[sql] abstract class BaseWriterContainer(
protected val dataSchema = relation.dataSchema
- protected val serializableConf = new SerializableConfiguration(job.getConfiguration)
+ protected val serializableConf =
+ new SerializableConfiguration(SparkHadoopUtil.get.getConfigurationFromJobContext(job))
// This UUID is used to avoid output file name collision between different appending write jobs.
// These jobs may belong to different SparkContext instances. Concrete data source implementations
@@ -89,7 +90,8 @@ private[sql] abstract class BaseWriterContainer(
// This UUID is sent to executor side together with the serialized `Configuration` object within
// the `Job` instance. `OutputWriters` on the executor side should use this UUID to generate
// unique task output files.
- job.getConfiguration.set("spark.sql.sources.writeJobUUID", uniqueWriteJobId.toString)
+ SparkHadoopUtil.get.getConfigurationFromJobContext(job).
+ set("spark.sql.sources.writeJobUUID", uniqueWriteJobId.toString)
// Order of the following two lines is important. For Hadoop 1, TaskAttemptContext constructor
// clones the Configuration object passed in. If we initialize the TaskAttemptContext first,
@@ -182,7 +184,9 @@ private[sql] abstract class BaseWriterContainer(
private def setupIDs(jobId: Int, splitId: Int, attemptId: Int): Unit = {
this.jobId = SparkHadoopWriter.createJobID(new Date, jobId)
this.taskId = new TaskID(this.jobId, true, splitId)
+ // scalastyle:off jobcontext
this.taskAttemptId = new TaskAttemptID(taskId, attemptId)
+ // scalastyle:on jobcontext
}
private def setupConf(): Unit = {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONRelation.scala
index 7a49157d9e..8ee0127c3b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONRelation.scala
@@ -81,7 +81,7 @@ private[sql] class JSONRelation(
private def createBaseRdd(inputPaths: Array[FileStatus]): RDD[String] = {
val job = new Job(sqlContext.sparkContext.hadoopConfiguration)
- val conf = job.getConfiguration
+ val conf = SparkHadoopUtil.get.getConfigurationFromJobContext(job)
val paths = inputPaths.map(_.getPath)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystReadSupport.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystReadSupport.scala
index 5a8166fac5..8c819f1a48 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystReadSupport.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystReadSupport.scala
@@ -72,7 +72,11 @@ private[parquet] class CatalystReadSupport extends ReadSupport[InternalRow] with
// Called before `prepareForRead()` when initializing Parquet record reader.
override def init(context: InitContext): ReadContext = {
- val conf = context.getConfiguration
+ val conf = {
+ // scalastyle:off jobcontext
+ context.getConfiguration
+ // scalastyle:on jobcontext
+ }
// If the target file was written by Spark SQL, we should be able to find a serialized Catalyst
// schema of this file from its metadata.
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/DirectParquetOutputCommitter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/DirectParquetOutputCommitter.scala
index 2c6b914328..de1fd0166a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/DirectParquetOutputCommitter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/DirectParquetOutputCommitter.scala
@@ -53,7 +53,11 @@ private[parquet] class DirectParquetOutputCommitter(outputPath: Path, context: T
override def setupTask(taskContext: TaskAttemptContext): Unit = {}
override def commitJob(jobContext: JobContext) {
- val configuration = ContextUtil.getConfiguration(jobContext)
+ val configuration = {
+ // scalastyle:off jobcontext
+ ContextUtil.getConfiguration(jobContext)
+ // scalastyle:on jobcontext
+ }
val fileSystem = outputPath.getFileSystem(configuration)
if (configuration.getBoolean(ParquetOutputFormat.ENABLE_JOB_SUMMARY, true)) {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala
index c6bbc392ca..953fcab126 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala
@@ -211,7 +211,11 @@ private[sql] class ParquetRelation(
override def sizeInBytes: Long = metadataCache.dataStatuses.map(_.getLen).sum
override def prepareJobForWrite(job: Job): OutputWriterFactory = {
- val conf = ContextUtil.getConfiguration(job)
+ val conf = {
+ // scalastyle:off jobcontext
+ ContextUtil.getConfiguration(job)
+ // scalastyle:on jobcontext
+ }
// SPARK-9849 DirectParquetOutputCommitter qualified name should be backward compatible
val committerClassname = conf.get(SQLConf.PARQUET_OUTPUT_COMMITTER_CLASS.key)
@@ -528,7 +532,7 @@ private[sql] object ParquetRelation extends Logging {
assumeBinaryIsString: Boolean,
assumeInt96IsTimestamp: Boolean,
followParquetFormatSpec: Boolean)(job: Job): Unit = {
- val conf = job.getConfiguration
+ val conf = SparkHadoopUtil.get.getConfigurationFromJobContext(job)
conf.set(ParquetInputFormat.READ_SUPPORT_CLASS, classOf[CatalystReadSupport].getName)
// Try to push down filters when filter push-down is enabled.
@@ -572,7 +576,7 @@ private[sql] object ParquetRelation extends Logging {
FileInputFormat.setInputPaths(job, inputFiles.map(_.getPath): _*)
}
- overrideMinSplitSize(parquetBlockSize, job.getConfiguration)
+ overrideMinSplitSize(parquetBlockSize, SparkHadoopUtil.get.getConfigurationFromJobContext(job))
}
private[parquet] def readSchema(
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetTypesConverter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetTypesConverter.scala
index 142301fe87..b647bb6116 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetTypesConverter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetTypesConverter.scala
@@ -123,7 +123,11 @@ private[parquet] object ParquetTypesConverter extends Logging {
throw new IllegalArgumentException("Unable to read Parquet metadata: path is null")
}
val job = new Job()
- val conf = configuration.getOrElse(ContextUtil.getConfiguration(job))
+ val conf = {
+ // scalastyle:off jobcontext
+ configuration.getOrElse(ContextUtil.getConfiguration(job))
+ // scalastyle:on jobcontext
+ }
val fs: FileSystem = origPath.getFileSystem(conf)
if (fs == null) {
throw new IllegalArgumentException(s"Incorrectly formatted Parquet metadata path $origPath")
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala
index 7e89109259..d1f30e188e 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala
@@ -208,7 +208,7 @@ private[sql] class OrcRelation(
}
override def prepareJobForWrite(job: Job): OutputWriterFactory = {
- job.getConfiguration match {
+ SparkHadoopUtil.get.getConfigurationFromJobContext(job) match {
case conf: JobConf =>
conf.setOutputFormat(classOf[OrcOutputFormat])
case conf =>
@@ -289,7 +289,7 @@ private[orc] case class OrcTableScan(
def execute(): RDD[InternalRow] = {
val job = new Job(sqlContext.sparkContext.hadoopConfiguration)
- val conf = job.getConfiguration
+ val conf = SparkHadoopUtil.get.getConfigurationFromJobContext(job)
// Tries to push down filters if ORC filter push-down is enabled
if (sqlContext.conf.orcFilterPushDown) {