-rw-r--r--  core/src/main/scala/org/apache/spark/SparkContext.scala | 16
-rw-r--r--  core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala | 20
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala | 41
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala | 16
-rw-r--r--  core/src/main/scala/org/apache/spark/input/FixedLengthBinaryInputFormat.scala | 3
-rw-r--r--  core/src/main/scala/org/apache/spark/input/FixedLengthBinaryRecordReader.scala | 7
-rw-r--r--  core/src/main/scala/org/apache/spark/input/PortableDataStream.scala | 7
-rw-r--r--  core/src/main/scala/org/apache/spark/input/WholeTextFileInputFormat.scala | 2
-rw-r--r--  core/src/main/scala/org/apache/spark/input/WholeTextFileRecordReader.scala | 5
-rw-r--r--  core/src/main/scala/org/apache/spark/mapred/SparkHadoopMapRedUtil.scala | 51
-rw-r--r--  core/src/main/scala/org/apache/spark/mapreduce/SparkHadoopMapReduceUtil.scala | 68
-rw-r--r--  core/src/main/scala/org/apache/spark/rdd/BinaryFileRDD.scala | 4
-rw-r--r--  core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala | 3
-rw-r--r--  core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala | 12
-rw-r--r--  core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala | 26
-rw-r--r--  core/src/main/scala/org/apache/spark/rdd/ReliableCheckpointRDD.scala | 3
-rw-r--r--  core/src/main/scala/org/apache/spark/rdd/WholeTextFileRDD.scala | 3
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala | 12
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/InputFormatInfo.scala | 2
-rw-r--r--  core/src/main/scala/org/apache/spark/util/ShutdownHookManager.scala | 19
-rw-r--r--  core/src/test/java/org/apache/spark/JavaAPISuite.java | 2
-rw-r--r--  core/src/test/scala/org/apache/spark/FileSuite.scala | 9
-rw-r--r--  core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala | 4
-rw-r--r--  core/src/test/scala/org/apache/spark/scheduler/ReplayListenerSuite.scala | 3
-rw-r--r--  examples/src/main/scala/org/apache/spark/examples/CassandraCQLTest.scala | 4
-rw-r--r--  examples/src/main/scala/org/apache/spark/examples/CassandraTest.scala | 4
-rw-r--r--  project/MimaExcludes.scala | 5
-rw-r--r--  scalastyle-config.xml | 8
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelation.scala | 2
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SqlNewHadoopRDD.scala | 18
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala | 27
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONRelation.scala | 12
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystReadSupport.scala | 2
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/DirectParquetOutputCommitter.scala | 6
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala | 20
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/DefaultSource.scala | 13
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala | 8
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala | 2
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/client/ClientWrapper.scala | 70
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveWriterContainers.scala | 13
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileOperator.scala | 2
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala | 16
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextRelation.scala | 5
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLog.scala | 3
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLogWriter.scala | 10
-rw-r--r--  streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala | 3
46 files changed, 150 insertions, 441 deletions
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index bbdc9158d8..77e44ee026 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -874,11 +874,11 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
path: String,
minPartitions: Int = defaultMinPartitions): RDD[(String, String)] = withScope {
assertNotStopped()
- val job = new NewHadoopJob(hadoopConfiguration)
+ val job = NewHadoopJob.getInstance(hadoopConfiguration)
// Use setInputPaths so that wholeTextFiles aligns with hadoopFile/textFile in taking
// comma separated files as input. (see SPARK-7155)
NewFileInputFormat.setInputPaths(job, path)
- val updateConf = SparkHadoopUtil.get.getConfigurationFromJobContext(job)
+ val updateConf = job.getConfiguration
new WholeTextFileRDD(
this,
classOf[WholeTextFileInputFormat],
@@ -923,11 +923,11 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
path: String,
minPartitions: Int = defaultMinPartitions): RDD[(String, PortableDataStream)] = withScope {
assertNotStopped()
- val job = new NewHadoopJob(hadoopConfiguration)
+ val job = NewHadoopJob.getInstance(hadoopConfiguration)
// Use setInputPaths so that binaryFiles aligns with hadoopFile/textFile in taking
// comma separated files as input. (see SPARK-7155)
NewFileInputFormat.setInputPaths(job, path)
- val updateConf = SparkHadoopUtil.get.getConfigurationFromJobContext(job)
+ val updateConf = job.getConfiguration
new BinaryFileRDD(
this,
classOf[StreamInputFormat],
@@ -1100,13 +1100,13 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
vClass: Class[V],
conf: Configuration = hadoopConfiguration): RDD[(K, V)] = withScope {
assertNotStopped()
- // The call to new NewHadoopJob automatically adds security credentials to conf,
+ // The call to NewHadoopJob automatically adds security credentials to conf,
// so we don't need to explicitly add them ourselves
- val job = new NewHadoopJob(conf)
+ val job = NewHadoopJob.getInstance(conf)
// Use setInputPaths so that newAPIHadoopFile aligns with hadoopFile/textFile in taking
// comma separated files as input. (see SPARK-7155)
NewFileInputFormat.setInputPaths(job, path)
- val updatedConf = SparkHadoopUtil.get.getConfigurationFromJobContext(job)
+ val updatedConf = job.getConfiguration
new NewHadoopRDD(this, fClass, kClass, vClass, updatedConf).setName(path)
}
@@ -1369,7 +1369,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
if (!fs.exists(hadoopPath)) {
throw new FileNotFoundException(s"Added file $hadoopPath does not exist.")
}
- val isDir = fs.getFileStatus(hadoopPath).isDir
+ val isDir = fs.getFileStatus(hadoopPath).isDirectory
if (!isLocal && scheme == "file" && isDir) {
throw new SparkException(s"addFile does not support local directories when not running " +
"local mode.")
diff --git a/core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala b/core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala
index ac6eaab20d..dd400b8ae8 100644
--- a/core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala
+++ b/core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala
@@ -25,6 +25,7 @@ import java.util.Date
import org.apache.hadoop.mapred._
import org.apache.hadoop.fs.FileSystem
import org.apache.hadoop.fs.Path
+import org.apache.hadoop.mapreduce.TaskType
import org.apache.spark.mapred.SparkHadoopMapRedUtil
import org.apache.spark.rdd.HadoopRDD
@@ -37,10 +38,7 @@ import org.apache.spark.util.SerializableJobConf
* a filename to write to, etc, exactly like in a Hadoop MapReduce job.
*/
private[spark]
-class SparkHadoopWriter(jobConf: JobConf)
- extends Logging
- with SparkHadoopMapRedUtil
- with Serializable {
+class SparkHadoopWriter(jobConf: JobConf) extends Logging with Serializable {
private val now = new Date()
private val conf = new SerializableJobConf(jobConf)
@@ -131,7 +129,7 @@ class SparkHadoopWriter(jobConf: JobConf)
private def getJobContext(): JobContext = {
if (jobContext == null) {
- jobContext = newJobContext(conf.value, jID.value)
+ jobContext = new JobContextImpl(conf.value, jID.value)
}
jobContext
}
@@ -143,6 +141,12 @@ class SparkHadoopWriter(jobConf: JobConf)
taskContext
}
+ protected def newTaskAttemptContext(
+ conf: JobConf,
+ attemptId: TaskAttemptID): TaskAttemptContext = {
+ new TaskAttemptContextImpl(conf, attemptId)
+ }
+
private def setIDs(jobid: Int, splitid: Int, attemptid: Int) {
jobID = jobid
splitID = splitid
@@ -150,7 +154,7 @@ class SparkHadoopWriter(jobConf: JobConf)
jID = new SerializableWritable[JobID](SparkHadoopWriter.createJobID(now, jobid))
taID = new SerializableWritable[TaskAttemptID](
- new TaskAttemptID(new TaskID(jID.value, true, splitID), attemptID))
+ new TaskAttemptID(new TaskID(jID.value, TaskType.MAP, splitID), attemptID))
}
}
@@ -168,9 +172,9 @@ object SparkHadoopWriter {
}
val outputPath = new Path(path)
val fs = outputPath.getFileSystem(conf)
- if (outputPath == null || fs == null) {
+ if (fs == null) {
throw new IllegalArgumentException("Incorrectly formatted output path")
}
- outputPath.makeQualified(fs)
+ outputPath.makeQualified(fs.getUri, fs.getWorkingDirectory)
}
}
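Pattern note: with Hadoop 1 support gone, the old-API (org.apache.hadoop.mapred) context classes can be constructed directly and task IDs carry an explicit TaskType. A hedged sketch of the calls the rewritten writer relies on; the IDs and paths below are invented for illustration:

import org.apache.hadoop.fs.Path
import org.apache.hadoop.mapred.{JobConf, JobContextImpl, JobID, TaskAttemptContextImpl, TaskAttemptID, TaskID}
import org.apache.hadoop.mapreduce.TaskType

// Invented identifiers for illustration.
val conf = new JobConf()
val jobId = new JobID("201512210000", 0)
val attemptId = new TaskAttemptID(new TaskID(jobId, TaskType.MAP, 0), 0)

// Direct construction replaces the reflective lookup previously provided by the
// SparkHadoopMapRedUtil trait.
val jobContext = new JobContextImpl(conf, jobId)
val taskContext = new TaskAttemptContextImpl(conf, attemptId)

// makeQualified(FileSystem) is deprecated; the (URI, workingDirectory) form is used instead.
val outputPath = new Path("/tmp/output")
val outputFs = outputPath.getFileSystem(conf)
val qualified = outputPath.makeQualified(outputFs.getUri, outputFs.getWorkingDirectory)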
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
index 59e90564b3..4bd94f13e5 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
@@ -33,9 +33,6 @@ import org.apache.hadoop.fs.FileSystem.Statistics
import org.apache.hadoop.fs.{FileStatus, FileSystem, Path, PathFilter}
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier
import org.apache.hadoop.mapred.JobConf
-import org.apache.hadoop.mapreduce.JobContext
-import org.apache.hadoop.mapreduce.{TaskAttemptContext => MapReduceTaskAttemptContext}
-import org.apache.hadoop.mapreduce.{TaskAttemptID => MapReduceTaskAttemptID}
import org.apache.hadoop.security.{Credentials, UserGroupInformation}
import org.apache.spark.annotation.DeveloperApi
@@ -76,9 +73,6 @@ class SparkHadoopUtil extends Logging {
}
}
- @deprecated("use newConfiguration with SparkConf argument", "1.2.0")
- def newConfiguration(): Configuration = newConfiguration(null)
-
/**
* Return an appropriate (subclass) of Configuration. Creating config can initializes some Hadoop
* subsystems.
@@ -191,33 +185,6 @@ class SparkHadoopUtil extends Logging {
}
/**
- * Using reflection to get the Configuration from JobContext/TaskAttemptContext. If we directly
- * call `JobContext/TaskAttemptContext.getConfiguration`, it will generate different byte codes
- * for Hadoop 1.+ and Hadoop 2.+ because JobContext/TaskAttemptContext is class in Hadoop 1.+
- * while it's interface in Hadoop 2.+.
- */
- def getConfigurationFromJobContext(context: JobContext): Configuration = {
- // scalastyle:off jobconfig
- val method = context.getClass.getMethod("getConfiguration")
- // scalastyle:on jobconfig
- method.invoke(context).asInstanceOf[Configuration]
- }
-
- /**
- * Using reflection to call `getTaskAttemptID` from TaskAttemptContext. If we directly
- * call `TaskAttemptContext.getTaskAttemptID`, it will generate different byte codes
- * for Hadoop 1.+ and Hadoop 2.+ because TaskAttemptContext is class in Hadoop 1.+
- * while it's interface in Hadoop 2.+.
- */
- def getTaskAttemptIDFromTaskAttemptContext(
- context: MapReduceTaskAttemptContext): MapReduceTaskAttemptID = {
- // scalastyle:off jobconfig
- val method = context.getClass.getMethod("getTaskAttemptID")
- // scalastyle:on jobconfig
- method.invoke(context).asInstanceOf[MapReduceTaskAttemptID]
- }
-
- /**
* Get [[FileStatus]] objects for all leaf children (files) under the given base path. If the
* given path points to a file, return a single-element collection containing [[FileStatus]] of
* that file.
@@ -233,11 +200,11 @@ class SparkHadoopUtil extends Logging {
*/
def listLeafStatuses(fs: FileSystem, baseStatus: FileStatus): Seq[FileStatus] = {
def recurse(status: FileStatus): Seq[FileStatus] = {
- val (directories, leaves) = fs.listStatus(status.getPath).partition(_.isDir)
+ val (directories, leaves) = fs.listStatus(status.getPath).partition(_.isDirectory)
leaves ++ directories.flatMap(f => listLeafStatuses(fs, f))
}
- if (baseStatus.isDir) recurse(baseStatus) else Seq(baseStatus)
+ if (baseStatus.isDirectory) recurse(baseStatus) else Seq(baseStatus)
}
def listLeafDirStatuses(fs: FileSystem, basePath: Path): Seq[FileStatus] = {
@@ -246,12 +213,12 @@ class SparkHadoopUtil extends Logging {
def listLeafDirStatuses(fs: FileSystem, baseStatus: FileStatus): Seq[FileStatus] = {
def recurse(status: FileStatus): Seq[FileStatus] = {
- val (directories, files) = fs.listStatus(status.getPath).partition(_.isDir)
+ val (directories, files) = fs.listStatus(status.getPath).partition(_.isDirectory)
val leaves = if (directories.isEmpty) Seq(status) else Seq.empty[FileStatus]
leaves ++ directories.flatMap(dir => listLeafDirStatuses(fs, dir))
}
- assert(baseStatus.isDir)
+ assert(baseStatus.isDirectory)
recurse(baseStatus)
}
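Pattern note: under Hadoop 2, JobContext and TaskAttemptContext are interfaces with stable bytecode, so callers can invoke the accessors directly instead of going through the two reflective helpers removed above. A minimal sketch; the helper names are invented:

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.mapreduce.{TaskAttemptContext, TaskAttemptID}

// Direct calls replace getConfigurationFromJobContext / getTaskAttemptIDFromTaskAttemptContext.
def configOf(context: TaskAttemptContext): Configuration = context.getConfiguration
def attemptIdOf(context: TaskAttemptContext): TaskAttemptID = context.getTaskAttemptID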
diff --git a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
index 6e91d73b6e..c93bc8c127 100644
--- a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
@@ -28,6 +28,7 @@ import com.google.common.io.ByteStreams
import com.google.common.util.concurrent.{MoreExecutors, ThreadFactoryBuilder}
import org.apache.hadoop.fs.{FileStatus, FileSystem, Path}
import org.apache.hadoop.hdfs.DistributedFileSystem
+import org.apache.hadoop.hdfs.protocol.HdfsConstants
import org.apache.hadoop.security.AccessControlException
import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkException}
@@ -167,7 +168,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
}
throw new IllegalArgumentException(msg)
}
- if (!fs.getFileStatus(path).isDir) {
+ if (!fs.getFileStatus(path).isDirectory) {
throw new IllegalArgumentException(
"Logging directory specified is not a directory: %s".format(logDir))
}
@@ -304,7 +305,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
logError("Exception encountered when attempting to update last scan time", e)
lastScanTime
} finally {
- if (!fs.delete(path)) {
+ if (!fs.delete(path, true)) {
logWarning(s"Error deleting ${path}")
}
}
@@ -603,7 +604,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
* As of Spark 1.3, these files are consolidated into a single one that replaces the directory.
* See SPARK-2261 for more detail.
*/
- private def isLegacyLogDirectory(entry: FileStatus): Boolean = entry.isDir()
+ private def isLegacyLogDirectory(entry: FileStatus): Boolean = entry.isDirectory
/**
* Returns the modification time of the given event log. If the status points at an empty
@@ -648,8 +649,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
}
/**
- * Checks whether HDFS is in safe mode. The API is slightly different between hadoop 1 and 2,
- * so we have to resort to ugly reflection (as usual...).
+ * Checks whether HDFS is in safe mode.
*
* Note that DistributedFileSystem is a `@LimitedPrivate` class, which for all practical reasons
* makes it more public than not.
@@ -663,11 +663,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
// For testing.
private[history] def isFsInSafeMode(dfs: DistributedFileSystem): Boolean = {
- val hadoop2Class = "org.apache.hadoop.hdfs.protocol.HdfsConstants$SafeModeAction"
- val actionClass: Class[_] = getClass().getClassLoader().loadClass(hadoop2Class)
- val action = actionClass.getField("SAFEMODE_GET").get(null)
- val method = dfs.getClass().getMethod("setSafeMode", action.getClass())
- method.invoke(dfs, action).asInstanceOf[Boolean]
+ dfs.setSafeMode(HdfsConstants.SafeModeAction.SAFEMODE_GET)
}
}
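Pattern note: the safe-mode probe no longer needs reflection because HdfsConstants.SafeModeAction is always present on Hadoop 2. A sketch of the direct call, mirroring the hunk above:

import org.apache.hadoop.hdfs.DistributedFileSystem
import org.apache.hadoop.hdfs.protocol.HdfsConstants

// SAFEMODE_GET only queries the current state; it does not toggle safe mode.
def isFsInSafeMode(dfs: DistributedFileSystem): Boolean =
  dfs.setSafeMode(HdfsConstants.SafeModeAction.SAFEMODE_GET)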
diff --git a/core/src/main/scala/org/apache/spark/input/FixedLengthBinaryInputFormat.scala b/core/src/main/scala/org/apache/spark/input/FixedLengthBinaryInputFormat.scala
index 532850dd57..30431a9b98 100644
--- a/core/src/main/scala/org/apache/spark/input/FixedLengthBinaryInputFormat.scala
+++ b/core/src/main/scala/org/apache/spark/input/FixedLengthBinaryInputFormat.scala
@@ -23,7 +23,6 @@ import org.apache.hadoop.mapreduce.lib.input.FileInputFormat
import org.apache.hadoop.mapreduce.{InputSplit, JobContext, RecordReader, TaskAttemptContext}
import org.apache.spark.Logging
-import org.apache.spark.deploy.SparkHadoopUtil
/**
* Custom Input Format for reading and splitting flat binary files that contain records,
@@ -36,7 +35,7 @@ private[spark] object FixedLengthBinaryInputFormat {
/** Retrieves the record length property from a Hadoop configuration */
def getRecordLength(context: JobContext): Int = {
- SparkHadoopUtil.get.getConfigurationFromJobContext(context).get(RECORD_LENGTH_PROPERTY).toInt
+ context.getConfiguration.get(RECORD_LENGTH_PROPERTY).toInt
}
}
diff --git a/core/src/main/scala/org/apache/spark/input/FixedLengthBinaryRecordReader.scala b/core/src/main/scala/org/apache/spark/input/FixedLengthBinaryRecordReader.scala
index 67a96925da..25596a15d9 100644
--- a/core/src/main/scala/org/apache/spark/input/FixedLengthBinaryRecordReader.scala
+++ b/core/src/main/scala/org/apache/spark/input/FixedLengthBinaryRecordReader.scala
@@ -24,7 +24,6 @@ import org.apache.hadoop.io.compress.CompressionCodecFactory
import org.apache.hadoop.io.{BytesWritable, LongWritable}
import org.apache.hadoop.mapreduce.{InputSplit, RecordReader, TaskAttemptContext}
import org.apache.hadoop.mapreduce.lib.input.FileSplit
-import org.apache.spark.deploy.SparkHadoopUtil
/**
* FixedLengthBinaryRecordReader is returned by FixedLengthBinaryInputFormat.
@@ -83,16 +82,16 @@ private[spark] class FixedLengthBinaryRecordReader
// the actual file we will be reading from
val file = fileSplit.getPath
// job configuration
- val job = SparkHadoopUtil.get.getConfigurationFromJobContext(context)
+ val conf = context.getConfiguration
// check compression
- val codec = new CompressionCodecFactory(job).getCodec(file)
+ val codec = new CompressionCodecFactory(conf).getCodec(file)
if (codec != null) {
throw new IOException("FixedLengthRecordReader does not support reading compressed files")
}
// get the record length
recordLength = FixedLengthBinaryInputFormat.getRecordLength(context)
// get the filesystem
- val fs = file.getFileSystem(job)
+ val fs = file.getFileSystem(conf)
// open the File
fileInputStream = fs.open(file)
// seek to the splitStart position
diff --git a/core/src/main/scala/org/apache/spark/input/PortableDataStream.scala b/core/src/main/scala/org/apache/spark/input/PortableDataStream.scala
index 280e7a5fe8..cb76e3c344 100644
--- a/core/src/main/scala/org/apache/spark/input/PortableDataStream.scala
+++ b/core/src/main/scala/org/apache/spark/input/PortableDataStream.scala
@@ -27,8 +27,6 @@ import org.apache.hadoop.fs.Path
import org.apache.hadoop.mapreduce.{InputSplit, JobContext, RecordReader, TaskAttemptContext}
import org.apache.hadoop.mapreduce.lib.input.{CombineFileInputFormat, CombineFileRecordReader, CombineFileSplit}
-import org.apache.spark.deploy.SparkHadoopUtil
-
/**
* A general format for reading whole files in as streams, byte arrays,
* or other functions to be added
@@ -44,7 +42,7 @@ private[spark] abstract class StreamFileInputFormat[T]
*/
def setMinPartitions(context: JobContext, minPartitions: Int) {
val files = listStatus(context).asScala
- val totalLen = files.map(file => if (file.isDir) 0L else file.getLen).sum
+ val totalLen = files.map(file => if (file.isDirectory) 0L else file.getLen).sum
val maxSplitSize = Math.ceil(totalLen * 1.0 / files.size).toLong
super.setMaxSplitSize(maxSplitSize)
}
@@ -135,8 +133,7 @@ class PortableDataStream(
private val confBytes = {
val baos = new ByteArrayOutputStream()
- SparkHadoopUtil.get.getConfigurationFromJobContext(context).
- write(new DataOutputStream(baos))
+ context.getConfiguration.write(new DataOutputStream(baos))
baos.toByteArray
}
diff --git a/core/src/main/scala/org/apache/spark/input/WholeTextFileInputFormat.scala b/core/src/main/scala/org/apache/spark/input/WholeTextFileInputFormat.scala
index 413408723b..fa34f1e886 100644
--- a/core/src/main/scala/org/apache/spark/input/WholeTextFileInputFormat.scala
+++ b/core/src/main/scala/org/apache/spark/input/WholeTextFileInputFormat.scala
@@ -53,7 +53,7 @@ private[spark] class WholeTextFileInputFormat
*/
def setMinPartitions(context: JobContext, minPartitions: Int) {
val files = listStatus(context).asScala
- val totalLen = files.map(file => if (file.isDir) 0L else file.getLen).sum
+ val totalLen = files.map(file => if (file.isDirectory) 0L else file.getLen).sum
val maxSplitSize = Math.ceil(totalLen * 1.0 /
(if (minPartitions == 0) 1 else minPartitions)).toLong
super.setMaxSplitSize(maxSplitSize)
diff --git a/core/src/main/scala/org/apache/spark/input/WholeTextFileRecordReader.scala b/core/src/main/scala/org/apache/spark/input/WholeTextFileRecordReader.scala
index b56b2aa88a..998c898a3f 100644
--- a/core/src/main/scala/org/apache/spark/input/WholeTextFileRecordReader.scala
+++ b/core/src/main/scala/org/apache/spark/input/WholeTextFileRecordReader.scala
@@ -26,8 +26,6 @@ import org.apache.hadoop.mapreduce.InputSplit
import org.apache.hadoop.mapreduce.lib.input.{CombineFileSplit, CombineFileRecordReader}
import org.apache.hadoop.mapreduce.RecordReader
import org.apache.hadoop.mapreduce.TaskAttemptContext
-import org.apache.spark.deploy.SparkHadoopUtil
-
/**
* A trait to implement [[org.apache.hadoop.conf.Configurable Configurable]] interface.
@@ -52,8 +50,7 @@ private[spark] class WholeTextFileRecordReader(
extends RecordReader[Text, Text] with Configurable {
private[this] val path = split.getPath(index)
- private[this] val fs = path.getFileSystem(
- SparkHadoopUtil.get.getConfigurationFromJobContext(context))
+ private[this] val fs = path.getFileSystem(context.getConfiguration)
// True means the current file has been processed, then skip it.
private[this] var processed = false
diff --git a/core/src/main/scala/org/apache/spark/mapred/SparkHadoopMapRedUtil.scala b/core/src/main/scala/org/apache/spark/mapred/SparkHadoopMapRedUtil.scala
index f7298e8d5c..249bdf5994 100644
--- a/core/src/main/scala/org/apache/spark/mapred/SparkHadoopMapRedUtil.scala
+++ b/core/src/main/scala/org/apache/spark/mapred/SparkHadoopMapRedUtil.scala
@@ -18,61 +18,12 @@
package org.apache.spark.mapred
import java.io.IOException
-import java.lang.reflect.Modifier
-import org.apache.hadoop.mapred._
import org.apache.hadoop.mapreduce.{TaskAttemptContext => MapReduceTaskAttemptContext}
import org.apache.hadoop.mapreduce.{OutputCommitter => MapReduceOutputCommitter}
-import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.executor.CommitDeniedException
import org.apache.spark.{Logging, SparkEnv, TaskContext}
-import org.apache.spark.util.{Utils => SparkUtils}
-
-private[spark]
-trait SparkHadoopMapRedUtil {
- def newJobContext(conf: JobConf, jobId: JobID): JobContext = {
- val klass = firstAvailableClass("org.apache.hadoop.mapred.JobContextImpl",
- "org.apache.hadoop.mapred.JobContext")
- val ctor = klass.getDeclaredConstructor(classOf[JobConf],
- classOf[org.apache.hadoop.mapreduce.JobID])
- // In Hadoop 1.0.x, JobContext is an interface, and JobContextImpl is package private.
- // Make it accessible if it's not in order to access it.
- if (!Modifier.isPublic(ctor.getModifiers)) {
- ctor.setAccessible(true)
- }
- ctor.newInstance(conf, jobId).asInstanceOf[JobContext]
- }
-
- def newTaskAttemptContext(conf: JobConf, attemptId: TaskAttemptID): TaskAttemptContext = {
- val klass = firstAvailableClass("org.apache.hadoop.mapred.TaskAttemptContextImpl",
- "org.apache.hadoop.mapred.TaskAttemptContext")
- val ctor = klass.getDeclaredConstructor(classOf[JobConf], classOf[TaskAttemptID])
- // See above
- if (!Modifier.isPublic(ctor.getModifiers)) {
- ctor.setAccessible(true)
- }
- ctor.newInstance(conf, attemptId).asInstanceOf[TaskAttemptContext]
- }
-
- def newTaskAttemptID(
- jtIdentifier: String,
- jobId: Int,
- isMap: Boolean,
- taskId: Int,
- attemptId: Int): TaskAttemptID = {
- new TaskAttemptID(jtIdentifier, jobId, isMap, taskId, attemptId)
- }
-
- private def firstAvailableClass(first: String, second: String): Class[_] = {
- try {
- SparkUtils.classForName(first)
- } catch {
- case e: ClassNotFoundException =>
- SparkUtils.classForName(second)
- }
- }
-}
object SparkHadoopMapRedUtil extends Logging {
/**
@@ -93,7 +44,7 @@ object SparkHadoopMapRedUtil extends Logging {
jobId: Int,
splitId: Int): Unit = {
- val mrTaskAttemptID = SparkHadoopUtil.get.getTaskAttemptIDFromTaskAttemptContext(mrTaskContext)
+ val mrTaskAttemptID = mrTaskContext.getTaskAttemptID
// Called after we have decided to commit
def performCommit(): Unit = {
diff --git a/core/src/main/scala/org/apache/spark/mapreduce/SparkHadoopMapReduceUtil.scala b/core/src/main/scala/org/apache/spark/mapreduce/SparkHadoopMapReduceUtil.scala
deleted file mode 100644
index 82d807fad8..0000000000
--- a/core/src/main/scala/org/apache/spark/mapreduce/SparkHadoopMapReduceUtil.scala
+++ /dev/null
@@ -1,68 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.mapreduce
-
-import java.lang.{Boolean => JBoolean, Integer => JInteger}
-
-import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.mapreduce.{JobContext, JobID, TaskAttemptContext, TaskAttemptID}
-import org.apache.spark.util.Utils
-
-private[spark]
-trait SparkHadoopMapReduceUtil {
- def newJobContext(conf: Configuration, jobId: JobID): JobContext = {
- val klass = Utils.classForName("org.apache.hadoop.mapreduce.task.JobContextImpl")
- val ctor = klass.getDeclaredConstructor(classOf[Configuration], classOf[JobID])
- ctor.newInstance(conf, jobId).asInstanceOf[JobContext]
- }
-
- def newTaskAttemptContext(conf: Configuration, attemptId: TaskAttemptID): TaskAttemptContext = {
- val klass = Utils.classForName("org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl")
- val ctor = klass.getDeclaredConstructor(classOf[Configuration], classOf[TaskAttemptID])
- ctor.newInstance(conf, attemptId).asInstanceOf[TaskAttemptContext]
- }
-
- def newTaskAttemptID(
- jtIdentifier: String,
- jobId: Int,
- isMap: Boolean,
- taskId: Int,
- attemptId: Int): TaskAttemptID = {
- val klass = Utils.classForName("org.apache.hadoop.mapreduce.TaskAttemptID")
- try {
- // First, attempt to use the old-style constructor that takes a boolean isMap
- // (not available in YARN)
- val ctor = klass.getDeclaredConstructor(classOf[String], classOf[Int], classOf[Boolean],
- classOf[Int], classOf[Int])
- ctor.newInstance(jtIdentifier, new JInteger(jobId), new JBoolean(isMap), new JInteger(taskId),
- new JInteger(attemptId)).asInstanceOf[TaskAttemptID]
- } catch {
- case exc: NoSuchMethodException => {
- // If that failed, look for the new constructor that takes a TaskType (not available in 1.x)
- val taskTypeClass = Utils.classForName("org.apache.hadoop.mapreduce.TaskType")
- .asInstanceOf[Class[Enum[_]]]
- val taskType = taskTypeClass.getMethod("valueOf", classOf[String]).invoke(
- taskTypeClass, if (isMap) "MAP" else "REDUCE")
- val ctor = klass.getDeclaredConstructor(classOf[String], classOf[Int], taskTypeClass,
- classOf[Int], classOf[Int])
- ctor.newInstance(jtIdentifier, new JInteger(jobId), taskType, new JInteger(taskId),
- new JInteger(attemptId)).asInstanceOf[TaskAttemptID]
- }
- }
- }
-}
diff --git a/core/src/main/scala/org/apache/spark/rdd/BinaryFileRDD.scala b/core/src/main/scala/org/apache/spark/rdd/BinaryFileRDD.scala
index aedced7408..2bf2337d49 100644
--- a/core/src/main/scala/org/apache/spark/rdd/BinaryFileRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/BinaryFileRDD.scala
@@ -20,6 +20,8 @@ package org.apache.spark.rdd
import org.apache.hadoop.conf.{ Configurable, Configuration }
import org.apache.hadoop.io.Writable
import org.apache.hadoop.mapreduce._
+import org.apache.hadoop.mapreduce.task.JobContextImpl
+
import org.apache.spark.input.StreamFileInputFormat
import org.apache.spark.{ Partition, SparkContext }
@@ -40,7 +42,7 @@ private[spark] class BinaryFileRDD[T](
configurable.setConf(conf)
case _ =>
}
- val jobContext = newJobContext(conf, jobId)
+ val jobContext = new JobContextImpl(conf, jobId)
inputFormat.setMinPartitions(jobContext, minPartitions)
val rawSplits = inputFormat.getSplits(jobContext).toArray
val result = new Array[Partition](rawSplits.size)
diff --git a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
index f37c95bedc..920d3bf219 100644
--- a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
@@ -36,6 +36,7 @@ import org.apache.hadoop.mapred.JobID
import org.apache.hadoop.mapred.TaskAttemptID
import org.apache.hadoop.mapred.TaskID
import org.apache.hadoop.mapred.lib.CombineFileSplit
+import org.apache.hadoop.mapreduce.TaskType
import org.apache.hadoop.util.ReflectionUtils
import org.apache.spark._
@@ -357,7 +358,7 @@ private[spark] object HadoopRDD extends Logging {
def addLocalConfiguration(jobTrackerId: String, jobId: Int, splitId: Int, attemptId: Int,
conf: JobConf) {
val jobID = new JobID(jobTrackerId, jobId)
- val taId = new TaskAttemptID(new TaskID(jobID, true, splitId), attemptId)
+ val taId = new TaskAttemptID(new TaskID(jobID, TaskType.MAP, splitId), attemptId)
conf.set("mapred.tip.id", taId.getTaskID.toString)
conf.set("mapred.task.id", taId.toString)
diff --git a/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
index 86f38ae836..8b330a34c3 100644
--- a/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
@@ -26,11 +26,11 @@ import org.apache.hadoop.conf.{Configurable, Configuration}
import org.apache.hadoop.io.Writable
import org.apache.hadoop.mapreduce._
import org.apache.hadoop.mapreduce.lib.input.{CombineFileSplit, FileSplit}
+import org.apache.hadoop.mapreduce.task.{JobContextImpl, TaskAttemptContextImpl}
import org.apache.spark.annotation.DeveloperApi
import org.apache.spark._
import org.apache.spark.executor.DataReadMethod
-import org.apache.spark.mapreduce.SparkHadoopMapReduceUtil
import org.apache.spark.rdd.NewHadoopRDD.NewHadoopMapPartitionsWithSplitRDD
import org.apache.spark.util.{SerializableConfiguration, ShutdownHookManager}
import org.apache.spark.deploy.SparkHadoopUtil
@@ -66,9 +66,7 @@ class NewHadoopRDD[K, V](
keyClass: Class[K],
valueClass: Class[V],
@transient private val _conf: Configuration)
- extends RDD[(K, V)](sc, Nil)
- with SparkHadoopMapReduceUtil
- with Logging {
+ extends RDD[(K, V)](sc, Nil) with Logging {
// A Hadoop Configuration can be about 10 KB, which is pretty big, so broadcast it
private val confBroadcast = sc.broadcast(new SerializableConfiguration(_conf))
@@ -109,7 +107,7 @@ class NewHadoopRDD[K, V](
configurable.setConf(_conf)
case _ =>
}
- val jobContext = newJobContext(_conf, jobId)
+ val jobContext = new JobContextImpl(_conf, jobId)
val rawSplits = inputFormat.getSplits(jobContext).toArray
val result = new Array[Partition](rawSplits.size)
for (i <- 0 until rawSplits.size) {
@@ -144,8 +142,8 @@ class NewHadoopRDD[K, V](
configurable.setConf(conf)
case _ =>
}
- val attemptId = newTaskAttemptID(jobTrackerId, id, isMap = true, split.index, 0)
- val hadoopAttemptContext = newTaskAttemptContext(conf, attemptId)
+ val attemptId = new TaskAttemptID(jobTrackerId, id, TaskType.MAP, split.index, 0)
+ val hadoopAttemptContext = new TaskAttemptContextImpl(conf, attemptId)
private var reader = format.createRecordReader(
split.serializableHadoopSplit.value, hadoopAttemptContext)
reader.initialize(split.serializableHadoopSplit.value, hadoopAttemptContext)
diff --git a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
index 44d195587a..b872301425 100644
--- a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
@@ -33,15 +33,14 @@ import org.apache.hadoop.fs.FileSystem
import org.apache.hadoop.io.SequenceFile.CompressionType
import org.apache.hadoop.io.compress.CompressionCodec
import org.apache.hadoop.mapred.{FileOutputCommitter, FileOutputFormat, JobConf, OutputFormat}
-import org.apache.hadoop.mapreduce.{Job => NewAPIHadoopJob, OutputFormat => NewOutputFormat,
- RecordWriter => NewRecordWriter}
+import org.apache.hadoop.mapreduce.{Job => NewAPIHadoopJob, OutputFormat => NewOutputFormat, RecordWriter => NewRecordWriter, TaskType, TaskAttemptID}
+import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
import org.apache.spark._
import org.apache.spark.Partitioner.defaultPartitioner
import org.apache.spark.annotation.Experimental
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.executor.{DataWriteMethod, OutputMetrics}
-import org.apache.spark.mapreduce.SparkHadoopMapReduceUtil
import org.apache.spark.partial.{BoundedDouble, PartialResult}
import org.apache.spark.serializer.Serializer
import org.apache.spark.util.{SerializableConfiguration, Utils}
@@ -53,10 +52,7 @@ import org.apache.spark.util.random.StratifiedSamplingUtils
*/
class PairRDDFunctions[K, V](self: RDD[(K, V)])
(implicit kt: ClassTag[K], vt: ClassTag[V], ord: Ordering[K] = null)
- extends Logging
- with SparkHadoopMapReduceUtil
- with Serializable
-{
+ extends Logging with Serializable {
/**
* :: Experimental ::
@@ -985,11 +981,11 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
conf: Configuration = self.context.hadoopConfiguration): Unit = self.withScope {
// Rename this as hadoopConf internally to avoid shadowing (see SPARK-2038).
val hadoopConf = conf
- val job = new NewAPIHadoopJob(hadoopConf)
+ val job = NewAPIHadoopJob.getInstance(hadoopConf)
job.setOutputKeyClass(keyClass)
job.setOutputValueClass(valueClass)
job.setOutputFormatClass(outputFormatClass)
- val jobConfiguration = SparkHadoopUtil.get.getConfigurationFromJobContext(job)
+ val jobConfiguration = job.getConfiguration
jobConfiguration.set("mapred.output.dir", path)
saveAsNewAPIHadoopDataset(jobConfiguration)
}
@@ -1074,11 +1070,11 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
def saveAsNewAPIHadoopDataset(conf: Configuration): Unit = self.withScope {
// Rename this as hadoopConf internally to avoid shadowing (see SPARK-2038).
val hadoopConf = conf
- val job = new NewAPIHadoopJob(hadoopConf)
+ val job = NewAPIHadoopJob.getInstance(hadoopConf)
val formatter = new SimpleDateFormat("yyyyMMddHHmm")
val jobtrackerID = formatter.format(new Date())
val stageId = self.id
- val jobConfiguration = SparkHadoopUtil.get.getConfigurationFromJobContext(job)
+ val jobConfiguration = job.getConfiguration
val wrappedConf = new SerializableConfiguration(jobConfiguration)
val outfmt = job.getOutputFormatClass
val jobFormat = outfmt.newInstance
@@ -1091,9 +1087,9 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
val writeShard = (context: TaskContext, iter: Iterator[(K, V)]) => {
val config = wrappedConf.value
/* "reduce task" <split #> <attempt # = spark task #> */
- val attemptId = newTaskAttemptID(jobtrackerID, stageId, isMap = false, context.partitionId,
+ val attemptId = new TaskAttemptID(jobtrackerID, stageId, TaskType.REDUCE, context.partitionId,
context.attemptNumber)
- val hadoopContext = newTaskAttemptContext(config, attemptId)
+ val hadoopContext = new TaskAttemptContextImpl(config, attemptId)
val format = outfmt.newInstance
format match {
case c: Configurable => c.setConf(config)
@@ -1125,8 +1121,8 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
1
} : Int
- val jobAttemptId = newTaskAttemptID(jobtrackerID, stageId, isMap = true, 0, 0)
- val jobTaskContext = newTaskAttemptContext(wrappedConf.value, jobAttemptId)
+ val jobAttemptId = new TaskAttemptID(jobtrackerID, stageId, TaskType.MAP, 0, 0)
+ val jobTaskContext = new TaskAttemptContextImpl(wrappedConf.value, jobAttemptId)
val jobCommitter = jobFormat.getOutputCommitter(jobTaskContext)
// When speculation is on and output committer class name contains "Direct", we should warn
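Pattern note: for the new-API save path, task attempt IDs are now built with an explicit TaskType and attempt contexts are constructed directly from org.apache.hadoop.mapreduce.task. A sketch with invented identifiers; `config`, `jobtrackerID`, `stageId`, and the partition/attempt numbers are placeholders:

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.mapreduce.{TaskAttemptID, TaskType}
import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl

// Placeholder values for illustration.
val config = new Configuration()
val jobtrackerID = "201512210000"
val stageId = 0

// A REDUCE-typed attempt for each output partition and a MAP-typed attempt for
// the job-level committer, matching the hunks above.
val writeAttemptId = new TaskAttemptID(jobtrackerID, stageId, TaskType.REDUCE, 0, 0)
val jobAttemptId = new TaskAttemptID(jobtrackerID, stageId, TaskType.MAP, 0, 0)
val hadoopContext = new TaskAttemptContextImpl(config, writeAttemptId)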
diff --git a/core/src/main/scala/org/apache/spark/rdd/ReliableCheckpointRDD.scala b/core/src/main/scala/org/apache/spark/rdd/ReliableCheckpointRDD.scala
index fa71b8c262..a9b3d52bbe 100644
--- a/core/src/main/scala/org/apache/spark/rdd/ReliableCheckpointRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/ReliableCheckpointRDD.scala
@@ -174,7 +174,8 @@ private[spark] object ReliableCheckpointRDD extends Logging {
fs.create(tempOutputPath, false, bufferSize)
} else {
// This is mainly for testing purpose
- fs.create(tempOutputPath, false, bufferSize, fs.getDefaultReplication, blockSize)
+ fs.create(tempOutputPath, false, bufferSize,
+ fs.getDefaultReplication(fs.getWorkingDirectory), blockSize)
}
val serializer = env.serializer.newInstance()
val serializeStream = serializer.serializeStream(fileOutputStream)
diff --git a/core/src/main/scala/org/apache/spark/rdd/WholeTextFileRDD.scala b/core/src/main/scala/org/apache/spark/rdd/WholeTextFileRDD.scala
index e3f14fe7ef..8e1baae796 100644
--- a/core/src/main/scala/org/apache/spark/rdd/WholeTextFileRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/WholeTextFileRDD.scala
@@ -20,6 +20,7 @@ package org.apache.spark.rdd
import org.apache.hadoop.conf.{Configurable, Configuration}
import org.apache.hadoop.io.{Text, Writable}
import org.apache.hadoop.mapreduce.InputSplit
+import org.apache.hadoop.mapreduce.task.JobContextImpl
import org.apache.spark.{Partition, SparkContext}
import org.apache.spark.input.WholeTextFileInputFormat
@@ -44,7 +45,7 @@ private[spark] class WholeTextFileRDD(
configurable.setConf(conf)
case _ =>
}
- val jobContext = newJobContext(conf, jobId)
+ val jobContext = new JobContextImpl(conf, jobId)
inputFormat.setMinPartitions(jobContext, minPartitions)
val rawSplits = inputFormat.getSplits(jobContext).toArray
val result = new Array[Partition](rawSplits.size)
diff --git a/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala b/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala
index eaa07acc51..68792c58c9 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala
@@ -77,14 +77,6 @@ private[spark] class EventLoggingListener(
// Only defined if the file system scheme is not local
private var hadoopDataStream: Option[FSDataOutputStream] = None
- // The Hadoop APIs have changed over time, so we use reflection to figure out
- // the correct method to use to flush a hadoop data stream. See SPARK-1518
- // for details.
- private val hadoopFlushMethod = {
- val cls = classOf[FSDataOutputStream]
- scala.util.Try(cls.getMethod("hflush")).getOrElse(cls.getMethod("sync"))
- }
-
private var writer: Option[PrintWriter] = None
// For testing. Keep track of all JSON serialized events that have been logged.
@@ -97,7 +89,7 @@ private[spark] class EventLoggingListener(
* Creates the log file in the configured log directory.
*/
def start() {
- if (!fileSystem.getFileStatus(new Path(logBaseDir)).isDir) {
+ if (!fileSystem.getFileStatus(new Path(logBaseDir)).isDirectory) {
throw new IllegalArgumentException(s"Log directory $logBaseDir does not exist.")
}
@@ -147,7 +139,7 @@ private[spark] class EventLoggingListener(
// scalastyle:on println
if (flushLogger) {
writer.foreach(_.flush())
- hadoopDataStream.foreach(hadoopFlushMethod.invoke(_))
+ hadoopDataStream.foreach(_.hflush())
}
if (testing) {
loggedEvents += eventJson
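Pattern note: every Hadoop 2 FSDataOutputStream supports hflush, so the reflective fallback to the legacy sync() method (SPARK-1518) is gone. A one-line sketch; the helper name is invented:

import org.apache.hadoop.fs.FSDataOutputStream

// Flush buffered event-log data to the underlying filesystem.
def flushLog(stream: FSDataOutputStream): Unit = stream.hflush()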
diff --git a/core/src/main/scala/org/apache/spark/scheduler/InputFormatInfo.scala b/core/src/main/scala/org/apache/spark/scheduler/InputFormatInfo.scala
index 0e438ab436..8235b10245 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/InputFormatInfo.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/InputFormatInfo.scala
@@ -103,7 +103,7 @@ class InputFormatInfo(val configuration: Configuration, val inputFormatClazz: Cl
val instance: org.apache.hadoop.mapreduce.InputFormat[_, _] =
ReflectionUtils.newInstance(inputFormatClazz.asInstanceOf[Class[_]], conf).asInstanceOf[
org.apache.hadoop.mapreduce.InputFormat[_, _]]
- val job = new Job(conf)
+ val job = Job.getInstance(conf)
val retval = new ArrayBuffer[SplitInfo]()
val list = instance.getSplits(job)
diff --git a/core/src/main/scala/org/apache/spark/util/ShutdownHookManager.scala b/core/src/main/scala/org/apache/spark/util/ShutdownHookManager.scala
index 0065b1fc66..acc24ca0fb 100644
--- a/core/src/main/scala/org/apache/spark/util/ShutdownHookManager.scala
+++ b/core/src/main/scala/org/apache/spark/util/ShutdownHookManager.scala
@@ -20,7 +20,7 @@ package org.apache.spark.util
import java.io.File
import java.util.PriorityQueue
-import scala.util.{Failure, Success, Try}
+import scala.util.Try
import org.apache.hadoop.fs.FileSystem
import org.apache.spark.Logging
@@ -177,21 +177,8 @@ private [util] class SparkShutdownHookManager {
val hookTask = new Runnable() {
override def run(): Unit = runAll()
}
- Try(Utils.classForName("org.apache.hadoop.util.ShutdownHookManager")) match {
- case Success(shmClass) =>
- val fsPriority = classOf[FileSystem]
- .getField("SHUTDOWN_HOOK_PRIORITY")
- .get(null) // static field, the value is not used
- .asInstanceOf[Int]
- val shm = shmClass.getMethod("get").invoke(null)
- shm.getClass().getMethod("addShutdownHook", classOf[Runnable], classOf[Int])
- .invoke(shm, hookTask, Integer.valueOf(fsPriority + 30))
-
- case Failure(_) =>
- // scalastyle:off runtimeaddshutdownhook
- Runtime.getRuntime.addShutdownHook(new Thread(hookTask, "Spark Shutdown Hook"));
- // scalastyle:on runtimeaddshutdownhook
- }
+ org.apache.hadoop.util.ShutdownHookManager.get().addShutdownHook(
+ hookTask, FileSystem.SHUTDOWN_HOOK_PRIORITY + 30)
}
def runAll(): Unit = {
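Pattern note: Hadoop's ShutdownHookManager is always available on Hadoop 2, so the hook is registered directly rather than located by reflection with a Runtime.addShutdownHook fallback. A sketch with an invented hook body:

import org.apache.hadoop.fs.FileSystem
import org.apache.hadoop.util.ShutdownHookManager

// Placeholder hook body for illustration.
val hookTask = new Runnable {
  override def run(): Unit = println("running Spark shutdown hooks")
}

// The priority is offset from the FileSystem hook so filesystems remain usable
// while these hooks run.
ShutdownHookManager.get().addShutdownHook(hookTask, FileSystem.SHUTDOWN_HOOK_PRIORITY + 30)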
diff --git a/core/src/test/java/org/apache/spark/JavaAPISuite.java b/core/src/test/java/org/apache/spark/JavaAPISuite.java
index 11f1248c24..d91948e446 100644
--- a/core/src/test/java/org/apache/spark/JavaAPISuite.java
+++ b/core/src/test/java/org/apache/spark/JavaAPISuite.java
@@ -1246,7 +1246,7 @@ public class JavaAPISuite implements Serializable {
JavaPairRDD<IntWritable, Text> output = sc.newAPIHadoopFile(outputDir,
org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat.class,
- IntWritable.class, Text.class, new Job().getConfiguration());
+ IntWritable.class, Text.class, Job.getInstance().getConfiguration());
Assert.assertEquals(pairs.toString(), output.map(new Function<Tuple2<IntWritable, Text>, String>() {
@Override
public String call(Tuple2<IntWritable, Text> x) {
diff --git a/core/src/test/scala/org/apache/spark/FileSuite.scala b/core/src/test/scala/org/apache/spark/FileSuite.scala
index f6a7f4375f..2e47801aaf 100644
--- a/core/src/test/scala/org/apache/spark/FileSuite.scala
+++ b/core/src/test/scala/org/apache/spark/FileSuite.scala
@@ -19,12 +19,11 @@ package org.apache.spark
import java.io.{File, FileWriter}
-import org.apache.spark.deploy.SparkHadoopUtil
+import scala.io.Source
+
import org.apache.spark.input.PortableDataStream
import org.apache.spark.storage.StorageLevel
-import scala.io.Source
-
import org.apache.hadoop.io._
import org.apache.hadoop.io.compress.DefaultCodec
import org.apache.hadoop.mapred.{JobConf, FileAlreadyExistsException, FileSplit, TextInputFormat, TextOutputFormat}
@@ -506,11 +505,11 @@ class FileSuite extends SparkFunSuite with LocalSparkContext {
sc = new SparkContext("local", "test")
val randomRDD = sc.parallelize(
Array(("key1", "a"), ("key2", "a"), ("key3", "b"), ("key4", "c")), 1)
- val job = new Job(sc.hadoopConfiguration)
+ val job = Job.getInstance(sc.hadoopConfiguration)
job.setOutputKeyClass(classOf[String])
job.setOutputValueClass(classOf[String])
job.setOutputFormatClass(classOf[NewTextOutputFormat[String, String]])
- val jobConfig = SparkHadoopUtil.get.getConfigurationFromJobContext(job)
+ val jobConfig = job.getConfiguration
jobConfig.set("mapred.output.dir", tempDir.getPath + "/outputDataset_new")
randomRDD.saveAsNewAPIHadoopDataset(jobConfig)
assert(new File(tempDir.getPath + "/outputDataset_new/part-r-00000").exists() === true)
diff --git a/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala
index 5cb2d4225d..43da6fc5b5 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala
@@ -67,11 +67,11 @@ class EventLoggingListenerSuite extends SparkFunSuite with LocalSparkContext wit
val logPath = new Path(eventLogger.logPath + EventLoggingListener.IN_PROGRESS)
assert(fileSystem.exists(logPath))
val logStatus = fileSystem.getFileStatus(logPath)
- assert(!logStatus.isDir)
+ assert(!logStatus.isDirectory)
// Verify log is renamed after stop()
eventLogger.stop()
- assert(!fileSystem.getFileStatus(new Path(eventLogger.logPath)).isDir)
+ assert(!fileSystem.getFileStatus(new Path(eventLogger.logPath)).isDirectory)
}
test("Basic event logging") {
diff --git a/core/src/test/scala/org/apache/spark/scheduler/ReplayListenerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/ReplayListenerSuite.scala
index 103fc19369..761e82e6cf 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/ReplayListenerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/ReplayListenerSuite.scala
@@ -23,7 +23,6 @@ import java.net.URI
import org.json4s.jackson.JsonMethods._
import org.scalatest.BeforeAndAfter
-import org.apache.spark.{SparkConf, SparkContext, SPARK_VERSION}
import org.apache.spark.{SparkConf, SparkContext, SparkFunSuite}
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.io.CompressionCodec
@@ -115,7 +114,7 @@ class ReplayListenerSuite extends SparkFunSuite with BeforeAndAfter {
val applications = fileSystem.listStatus(logDirPath)
assert(applications != null && applications.size > 0)
val eventLog = applications.sortBy(_.getModificationTime).last
- assert(!eventLog.isDir)
+ assert(!eventLog.isDirectory)
// Replay events
val logData = EventLoggingListener.openEventLog(eventLog.getPath(), fileSystem)
diff --git a/examples/src/main/scala/org/apache/spark/examples/CassandraCQLTest.scala b/examples/src/main/scala/org/apache/spark/examples/CassandraCQLTest.scala
index d1b9b8d398..5a80985a49 100644
--- a/examples/src/main/scala/org/apache/spark/examples/CassandraCQLTest.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/CassandraCQLTest.scala
@@ -16,7 +16,6 @@
*/
// scalastyle:off println
- // scalastyle:off jobcontext
package org.apache.spark.examples
import java.nio.ByteBuffer
@@ -80,7 +79,7 @@ object CassandraCQLTest {
val InputColumnFamily = "ordercf"
val OutputColumnFamily = "salecount"
- val job = new Job()
+ val job = Job.getInstance()
job.setInputFormatClass(classOf[CqlPagingInputFormat])
val configuration = job.getConfiguration
ConfigHelper.setInputInitialAddress(job.getConfiguration(), cHost)
@@ -137,4 +136,3 @@ object CassandraCQLTest {
}
}
// scalastyle:on println
-// scalastyle:on jobcontext
diff --git a/examples/src/main/scala/org/apache/spark/examples/CassandraTest.scala b/examples/src/main/scala/org/apache/spark/examples/CassandraTest.scala
index 1e679bfb55..ad39a012b4 100644
--- a/examples/src/main/scala/org/apache/spark/examples/CassandraTest.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/CassandraTest.scala
@@ -16,7 +16,6 @@
*/
// scalastyle:off println
-// scalastyle:off jobcontext
package org.apache.spark.examples
import java.nio.ByteBuffer
@@ -59,7 +58,7 @@ object CassandraTest {
val sc = new SparkContext(sparkConf)
// Build the job configuration with ConfigHelper provided by Cassandra
- val job = new Job()
+ val job = Job.getInstance()
job.setInputFormatClass(classOf[ColumnFamilyInputFormat])
val host: String = args(1)
@@ -131,7 +130,6 @@ object CassandraTest {
}
}
// scalastyle:on println
-// scalastyle:on jobcontext
/*
create keyspace casDemo;
diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala
index 59886ab762..612ddf86de 100644
--- a/project/MimaExcludes.scala
+++ b/project/MimaExcludes.scala
@@ -49,6 +49,11 @@ object MimaExcludes {
ProblemFilters.exclude[IncompatibleResultTypeProblem]("org.apache.spark.SparkContext.emptyRDD"),
ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.broadcast.HttpBroadcastFactory")
) ++
+ Seq(
+ // SPARK-12481
+ ProblemFilters.exclude[IncompatibleTemplateDefProblem](
+ "org.apache.spark.mapred.SparkHadoopMapRedUtil")
+ ) ++
// When 1.6 is officially released, update this exclusion list.
Seq(
MimaBuild.excludeSparkPackage("deploy"),
diff --git a/scalastyle-config.xml b/scalastyle-config.xml
index 16d18b3328..ee855ca0e0 100644
--- a/scalastyle-config.xml
+++ b/scalastyle-config.xml
@@ -187,14 +187,6 @@ This file is divided into 3 sections:
scala.collection.JavaConverters._ and use .asScala / .asJava methods</customMessage>
</check>
- <!-- As of SPARK-10330 JobContext methods should not be called directly -->
- <check customId="jobcontext" level="error" class="org.scalastyle.scalariform.TokenChecker" enabled="true">
- <parameters><parameter name="regex">^getConfiguration$|^getTaskAttemptID$</parameter></parameters>
- <customMessage>Instead of calling .getConfiguration() or .getTaskAttemptID() directly,
- use SparkHadoopUtil's getConfigurationFromJobContext() and getTaskAttemptIDFromTaskAttemptContext() methods.
- </customMessage>
- </check>
-
<!-- ================================================================================ -->
<!-- rules we'd like to enforce, but haven't cleaned up the codebase yet -->
<!-- ================================================================================ -->
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelation.scala
index 735d52f808..758bcd706a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelation.scala
@@ -93,7 +93,7 @@ private[sql] case class InsertIntoHadoopFsRelation(
val isAppend = pathExists && (mode == SaveMode.Append)
if (doInsertion) {
- val job = new Job(hadoopConf)
+ val job = Job.getInstance(hadoopConf)
job.setOutputKeyClass(classOf[Void])
job.setOutputValueClass(classOf[InternalRow])
FileOutputFormat.setOutputPath(job, qualifiedOutputPath)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SqlNewHadoopRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SqlNewHadoopRDD.scala
index eea780cbaa..12f8783f84 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SqlNewHadoopRDD.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SqlNewHadoopRDD.scala
@@ -26,10 +26,10 @@ import org.apache.hadoop.conf.{Configurable, Configuration}
import org.apache.hadoop.io.Writable
import org.apache.hadoop.mapreduce._
import org.apache.hadoop.mapreduce.lib.input.{CombineFileSplit, FileSplit}
+import org.apache.hadoop.mapreduce.task.{JobContextImpl, TaskAttemptContextImpl}
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.executor.DataReadMethod
-import org.apache.spark.mapreduce.SparkHadoopMapReduceUtil
import org.apache.spark.sql.{SQLConf, SQLContext}
import org.apache.spark.sql.execution.datasources.parquet.UnsafeRowParquetRecordReader
import org.apache.spark.storage.StorageLevel
@@ -68,16 +68,14 @@ private[spark] class SqlNewHadoopRDD[V: ClassTag](
initLocalJobFuncOpt: Option[Job => Unit],
inputFormatClass: Class[_ <: InputFormat[Void, V]],
valueClass: Class[V])
- extends RDD[V](sqlContext.sparkContext, Nil)
- with SparkHadoopMapReduceUtil
- with Logging {
+ extends RDD[V](sqlContext.sparkContext, Nil) with Logging {
protected def getJob(): Job = {
- val conf: Configuration = broadcastedConf.value.value
+ val conf = broadcastedConf.value.value
// "new Job" will make a copy of the conf. Then, it is
// safe to mutate conf properties with initLocalJobFuncOpt
// and initDriverSideJobFuncOpt.
- val newJob = new Job(conf)
+ val newJob = Job.getInstance(conf)
initLocalJobFuncOpt.map(f => f(newJob))
newJob
}
@@ -87,7 +85,7 @@ private[spark] class SqlNewHadoopRDD[V: ClassTag](
if (isDriverSide) {
initDriverSideJobFuncOpt.map(f => f(job))
}
- SparkHadoopUtil.get.getConfigurationFromJobContext(job)
+ job.getConfiguration
}
private val jobTrackerId: String = {
@@ -110,7 +108,7 @@ private[spark] class SqlNewHadoopRDD[V: ClassTag](
configurable.setConf(conf)
case _ =>
}
- val jobContext = newJobContext(conf, jobId)
+ val jobContext = new JobContextImpl(conf, jobId)
val rawSplits = inputFormat.getSplits(jobContext).toArray
val result = new Array[SparkPartition](rawSplits.size)
for (i <- 0 until rawSplits.size) {
@@ -154,8 +152,8 @@ private[spark] class SqlNewHadoopRDD[V: ClassTag](
configurable.setConf(conf)
case _ =>
}
- val attemptId = newTaskAttemptID(jobTrackerId, id, isMap = true, split.index, 0)
- val hadoopAttemptContext = newTaskAttemptContext(conf, attemptId)
+ val attemptId = new TaskAttemptID(jobTrackerId, id, TaskType.MAP, split.index, 0)
+ val hadoopAttemptContext = new TaskAttemptContextImpl(conf, attemptId)
private[this] var reader: RecordReader[Void, V] = null
/**
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala
index 983f4df1de..8b0b647744 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala
@@ -24,10 +24,10 @@ import scala.collection.JavaConverters._
import org.apache.hadoop.fs.Path
import org.apache.hadoop.mapreduce._
import org.apache.hadoop.mapreduce.lib.output.{FileOutputCommitter => MapReduceFileOutputCommitter}
+import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
+
import org.apache.spark._
-import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.mapred.SparkHadoopMapRedUtil
-import org.apache.spark.mapreduce.SparkHadoopMapReduceUtil
import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.InternalRow
@@ -41,14 +41,12 @@ private[sql] abstract class BaseWriterContainer(
@transient val relation: HadoopFsRelation,
@transient private val job: Job,
isAppend: Boolean)
- extends SparkHadoopMapReduceUtil
- with Logging
- with Serializable {
+ extends Logging with Serializable {
protected val dataSchema = relation.dataSchema
protected val serializableConf =
- new SerializableConfiguration(SparkHadoopUtil.get.getConfigurationFromJobContext(job))
+ new SerializableConfiguration(job.getConfiguration)
// This UUID is used to avoid output file name collision between different appending write jobs.
// These jobs may belong to different SparkContext instances. Concrete data source implementations
@@ -90,8 +88,7 @@ private[sql] abstract class BaseWriterContainer(
// This UUID is sent to executor side together with the serialized `Configuration` object within
// the `Job` instance. `OutputWriters` on the executor side should use this UUID to generate
// unique task output files.
- SparkHadoopUtil.get.getConfigurationFromJobContext(job).
- set("spark.sql.sources.writeJobUUID", uniqueWriteJobId.toString)
+ job.getConfiguration.set("spark.sql.sources.writeJobUUID", uniqueWriteJobId.toString)
// Order of the following two lines is important. For Hadoop 1, TaskAttemptContext constructor
// clones the Configuration object passed in. If we initialize the TaskAttemptContext first,
@@ -101,7 +98,7 @@ private[sql] abstract class BaseWriterContainer(
// committer, since their initialization involve the job configuration, which can be potentially
// decorated in `prepareJobForWrite`.
outputWriterFactory = relation.prepareJobForWrite(job)
- taskAttemptContext = newTaskAttemptContext(serializableConf.value, taskAttemptId)
+ taskAttemptContext = new TaskAttemptContextImpl(serializableConf.value, taskAttemptId)
outputFormatClass = job.getOutputFormatClass
outputCommitter = newOutputCommitter(taskAttemptContext)
@@ -111,7 +108,7 @@ private[sql] abstract class BaseWriterContainer(
def executorSideSetup(taskContext: TaskContext): Unit = {
setupIDs(taskContext.stageId(), taskContext.partitionId(), taskContext.attemptNumber())
setupConf()
- taskAttemptContext = newTaskAttemptContext(serializableConf.value, taskAttemptId)
+ taskAttemptContext = new TaskAttemptContextImpl(serializableConf.value, taskAttemptId)
outputCommitter = newOutputCommitter(taskAttemptContext)
outputCommitter.setupTask(taskAttemptContext)
}
@@ -166,7 +163,7 @@ private[sql] abstract class BaseWriterContainer(
"because spark.speculation is configured to be true.")
defaultOutputCommitter
} else {
- val configuration = SparkHadoopUtil.get.getConfigurationFromJobContext(context)
+ val configuration = context.getConfiguration
val committerClass = configuration.getClass(
SQLConf.OUTPUT_COMMITTER_CLASS.key, null, classOf[OutputCommitter])
@@ -201,10 +198,8 @@ private[sql] abstract class BaseWriterContainer(
private def setupIDs(jobId: Int, splitId: Int, attemptId: Int): Unit = {
this.jobId = SparkHadoopWriter.createJobID(new Date, jobId)
- this.taskId = new TaskID(this.jobId, true, splitId)
- // scalastyle:off jobcontext
+ this.taskId = new TaskID(this.jobId, TaskType.MAP, splitId)
this.taskAttemptId = new TaskAttemptID(taskId, attemptId)
- // scalastyle:on jobcontext
}
private def setupConf(): Unit = {
@@ -250,7 +245,7 @@ private[sql] class DefaultWriterContainer(
def writeRows(taskContext: TaskContext, iterator: Iterator[InternalRow]): Unit = {
executorSideSetup(taskContext)
- val configuration = SparkHadoopUtil.get.getConfigurationFromJobContext(taskAttemptContext)
+ val configuration = taskAttemptContext.getConfiguration
configuration.set("spark.sql.sources.output.path", outputPath)
val writer = newOutputWriter(getWorkPath)
writer.initConverter(dataSchema)
@@ -421,7 +416,7 @@ private[sql] class DynamicPartitionWriterContainer(
def newOutputWriter(key: InternalRow): OutputWriter = {
val partitionPath = getPartitionString(key).getString(0)
val path = new Path(getWorkPath, partitionPath)
- val configuration = SparkHadoopUtil.get.getConfigurationFromJobContext(taskAttemptContext)
+ val configuration = taskAttemptContext.getConfiguration
configuration.set(
"spark.sql.sources.output.path", new Path(outputPath, partitionPath).toString)
val newWriter = super.newOutputWriter(path.toString)
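
The comments retained in the WriterContainer hunks describe a driver/executor handshake: the driver stamps a per-write-job UUID into the job configuration, and executor-side writers read it back to build collision-free output file names. A minimal sketch of the driver-side half, assuming a Hadoop 2.x client; the object name is illustrative, while the property key is the one shown in the hunk above:

import java.util.UUID

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.mapreduce.Job

object DriverSideWriteSetupSketch {
  def main(args: Array[String]): Unit = {
    // Mirrors the driver-side setup in BaseWriterContainer: stamp a UUID into the job
    // configuration so executor-side writers can generate unique task output files even
    // when several append jobs (possibly from different SparkContexts) share a directory.
    val job = Job.getInstance(new Configuration())
    val uniqueWriteJobId = UUID.randomUUID().toString
    job.getConfiguration.set("spark.sql.sources.writeJobUUID", uniqueWriteJobId)

    // This Configuration is what later travels to the executors with the job.
    println(job.getConfiguration.get("spark.sql.sources.writeJobUUID"))
  }
}
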
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONRelation.scala
index 3e61ba35be..54a8552134 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONRelation.scala
@@ -30,8 +30,6 @@ import org.apache.hadoop.mapreduce.{Job, RecordWriter, TaskAttemptContext}
import org.apache.spark.Logging
import org.apache.spark.broadcast.Broadcast
-import org.apache.spark.deploy.SparkHadoopUtil
-import org.apache.spark.mapred.SparkHadoopMapRedUtil
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.UnsafeProjection
@@ -89,8 +87,8 @@ private[sql] class JSONRelation(
override val needConversion: Boolean = false
private def createBaseRdd(inputPaths: Array[FileStatus]): RDD[String] = {
- val job = new Job(sqlContext.sparkContext.hadoopConfiguration)
- val conf = SparkHadoopUtil.get.getConfigurationFromJobContext(job)
+ val job = Job.getInstance(sqlContext.sparkContext.hadoopConfiguration)
+ val conf = job.getConfiguration
val paths = inputPaths.map(_.getPath)
@@ -176,7 +174,7 @@ private[json] class JsonOutputWriter(
path: String,
dataSchema: StructType,
context: TaskAttemptContext)
- extends OutputWriter with SparkHadoopMapRedUtil with Logging {
+ extends OutputWriter with Logging {
private[this] val writer = new CharArrayWriter()
// create the Generator without separator inserted between 2 records
@@ -186,9 +184,9 @@ private[json] class JsonOutputWriter(
private val recordWriter: RecordWriter[NullWritable, Text] = {
new TextOutputFormat[NullWritable, Text]() {
override def getDefaultWorkFile(context: TaskAttemptContext, extension: String): Path = {
- val configuration = SparkHadoopUtil.get.getConfigurationFromJobContext(context)
+ val configuration = context.getConfiguration
val uniqueWriteJobId = configuration.get("spark.sql.sources.writeJobUUID")
- val taskAttemptId = SparkHadoopUtil.get.getTaskAttemptIDFromTaskAttemptContext(context)
+ val taskAttemptId = context.getTaskAttemptID
val split = taskAttemptId.getTaskID.getId
new Path(path, f"part-r-$split%05d-$uniqueWriteJobId$extension")
}
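
The JsonOutputWriter hunk shows the executor-side half of that handshake, a getDefaultWorkFile override shared by the JSON, Parquet, text, ORC, and SimpleText writers in this patch: read the write-job UUID straight from context.getConfiguration, take the task id from context.getTaskAttemptID, and embed both in the part-file name. A self-contained sketch of that override, assuming a Hadoop 2.x client; the class name and constructor parameter are illustrative:

import org.apache.hadoop.fs.Path
import org.apache.hadoop.io.{NullWritable, Text}
import org.apache.hadoop.mapreduce.TaskAttemptContext
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat

// Names part files after the task id and the write-job UUID stamped by the driver, so
// concurrent append jobs writing into the same directory cannot produce colliding names.
class AppendSafeTextOutputFormat(basePath: String) extends TextOutputFormat[NullWritable, Text] {
  override def getDefaultWorkFile(context: TaskAttemptContext, extension: String): Path = {
    val configuration = context.getConfiguration            // direct Hadoop 2 accessor, no shim
    val uniqueWriteJobId = configuration.get("spark.sql.sources.writeJobUUID")
    val split = context.getTaskAttemptID.getTaskID.getId    // direct accessor replaces the util call
    new Path(basePath, f"part-r-$split%05d-$uniqueWriteJobId$extension")
  }
}
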
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystReadSupport.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystReadSupport.scala
index a958373eb7..e5d8e6088b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystReadSupport.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystReadSupport.scala
@@ -58,9 +58,7 @@ private[parquet] class CatalystReadSupport extends ReadSupport[InternalRow] with
*/
override def init(context: InitContext): ReadContext = {
catalystRequestedSchema = {
- // scalastyle:off jobcontext
val conf = context.getConfiguration
- // scalastyle:on jobcontext
val schemaString = conf.get(CatalystReadSupport.SPARK_ROW_REQUESTED_SCHEMA)
assert(schemaString != null, "Parquet requested schema not set.")
StructType.fromString(schemaString)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/DirectParquetOutputCommitter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/DirectParquetOutputCommitter.scala
index 1a4e99ff10..e54f51e383 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/DirectParquetOutputCommitter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/DirectParquetOutputCommitter.scala
@@ -54,11 +54,7 @@ private[datasources] class DirectParquetOutputCommitter(
override def setupTask(taskContext: TaskAttemptContext): Unit = {}
override def commitJob(jobContext: JobContext) {
- val configuration = {
- // scalastyle:off jobcontext
- ContextUtil.getConfiguration(jobContext)
- // scalastyle:on jobcontext
- }
+ val configuration = ContextUtil.getConfiguration(jobContext)
val fileSystem = outputPath.getFileSystem(configuration)
if (configuration.getBoolean(ParquetOutputFormat.ENABLE_JOB_SUMMARY, true)) {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala
index 1af2a394f3..af964b4d35 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala
@@ -31,6 +31,7 @@ import org.apache.hadoop.fs.{FileStatus, Path}
import org.apache.hadoop.io.Writable
import org.apache.hadoop.mapreduce._
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat
+import org.apache.hadoop.mapreduce.task.JobContextImpl
import org.apache.parquet.filter2.predicate.FilterApi
import org.apache.parquet.hadoop._
import org.apache.parquet.hadoop.metadata.CompressionCodecName
@@ -40,7 +41,6 @@ import org.apache.parquet.{Log => ApacheParquetLog}
import org.slf4j.bridge.SLF4JBridgeHandler
import org.apache.spark.broadcast.Broadcast
-import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.rdd.{RDD, SqlNewHadoopPartition, SqlNewHadoopRDD}
import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.InternalRow
@@ -82,9 +82,9 @@ private[sql] class ParquetOutputWriter(path: String, context: TaskAttemptContext
// `FileOutputCommitter.getWorkPath()`, which points to the base directory of all
// partitions in the case of dynamic partitioning.
override def getDefaultWorkFile(context: TaskAttemptContext, extension: String): Path = {
- val configuration = SparkHadoopUtil.get.getConfigurationFromJobContext(context)
+ val configuration = context.getConfiguration
val uniqueWriteJobId = configuration.get("spark.sql.sources.writeJobUUID")
- val taskAttemptId = SparkHadoopUtil.get.getTaskAttemptIDFromTaskAttemptContext(context)
+ val taskAttemptId = context.getTaskAttemptID
val split = taskAttemptId.getTaskID.getId
new Path(path, f"part-r-$split%05d-$uniqueWriteJobId$extension")
}
@@ -217,11 +217,7 @@ private[sql] class ParquetRelation(
override def sizeInBytes: Long = metadataCache.dataStatuses.map(_.getLen).sum
override def prepareJobForWrite(job: Job): OutputWriterFactory = {
- val conf = {
- // scalastyle:off jobcontext
- ContextUtil.getConfiguration(job)
- // scalastyle:on jobcontext
- }
+ val conf = ContextUtil.getConfiguration(job)
// SPARK-9849 DirectParquetOutputCommitter qualified name should be backward compatible
val committerClassName = conf.get(SQLConf.PARQUET_OUTPUT_COMMITTER_CLASS.key)
@@ -340,7 +336,7 @@ private[sql] class ParquetRelation(
// URI of the path to create a new Path.
val pathWithEscapedAuthority = escapePathUserInfo(f.getPath)
new FileStatus(
- f.getLen, f.isDir, f.getReplication, f.getBlockSize, f.getModificationTime,
+ f.getLen, f.isDirectory, f.getReplication, f.getBlockSize, f.getModificationTime,
f.getAccessTime, f.getPermission, f.getOwner, f.getGroup, pathWithEscapedAuthority)
}.toSeq
@@ -359,7 +355,7 @@ private[sql] class ParquetRelation(
}
}
- val jobContext = newJobContext(getConf(isDriverSide = true), jobId)
+ val jobContext = new JobContextImpl(getConf(isDriverSide = true), jobId)
val rawSplits = inputFormat.getSplits(jobContext)
Array.tabulate[SparkPartition](rawSplits.size) { i =>
@@ -564,7 +560,7 @@ private[sql] object ParquetRelation extends Logging {
parquetFilterPushDown: Boolean,
assumeBinaryIsString: Boolean,
assumeInt96IsTimestamp: Boolean)(job: Job): Unit = {
- val conf = SparkHadoopUtil.get.getConfigurationFromJobContext(job)
+ val conf = job.getConfiguration
conf.set(ParquetInputFormat.READ_SUPPORT_CLASS, classOf[CatalystReadSupport].getName)
// Try to push down filters when filter push-down is enabled.
@@ -607,7 +603,7 @@ private[sql] object ParquetRelation extends Logging {
FileInputFormat.setInputPaths(job, inputFiles.map(_.getPath): _*)
}
- overrideMinSplitSize(parquetBlockSize, SparkHadoopUtil.get.getConfigurationFromJobContext(job))
+ overrideMinSplitSize(parquetBlockSize, job.getConfiguration)
}
private[parquet] def readSchema(
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/DefaultSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/DefaultSource.scala
index 41fcb11d84..248467abe9 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/DefaultSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/DefaultSource.scala
@@ -26,8 +26,6 @@ import org.apache.hadoop.mapreduce.{RecordWriter, TaskAttemptContext, Job}
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat
import org.apache.spark.broadcast.Broadcast
-import org.apache.spark.deploy.SparkHadoopUtil
-import org.apache.spark.mapred.SparkHadoopMapRedUtil
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.UnsafeRow
@@ -88,8 +86,8 @@ private[sql] class TextRelation(
filters: Array[Filter],
inputPaths: Array[FileStatus],
broadcastedConf: Broadcast[SerializableConfiguration]): RDD[InternalRow] = {
- val job = new Job(sqlContext.sparkContext.hadoopConfiguration)
- val conf = SparkHadoopUtil.get.getConfigurationFromJobContext(job)
+ val job = Job.getInstance(sqlContext.sparkContext.hadoopConfiguration)
+ val conf = job.getConfiguration
val paths = inputPaths.map(_.getPath).sortBy(_.toUri)
if (paths.nonEmpty) {
@@ -138,17 +136,16 @@ private[sql] class TextRelation(
}
class TextOutputWriter(path: String, dataSchema: StructType, context: TaskAttemptContext)
- extends OutputWriter
- with SparkHadoopMapRedUtil {
+ extends OutputWriter {
private[this] val buffer = new Text()
private val recordWriter: RecordWriter[NullWritable, Text] = {
new TextOutputFormat[NullWritable, Text]() {
override def getDefaultWorkFile(context: TaskAttemptContext, extension: String): Path = {
- val configuration = SparkHadoopUtil.get.getConfigurationFromJobContext(context)
+ val configuration = context.getConfiguration
val uniqueWriteJobId = configuration.get("spark.sql.sources.writeJobUUID")
- val taskAttemptId = SparkHadoopUtil.get.getTaskAttemptIDFromTaskAttemptContext(context)
+ val taskAttemptId = context.getTaskAttemptID
val split = taskAttemptId.getTaskID.getId
new Path(path, f"part-r-$split%05d-$uniqueWriteJobId$extension")
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
index fc8ce6901d..d6c5d14357 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
@@ -462,7 +462,7 @@ abstract class HadoopFsRelation private[sql](
name.toLowerCase == "_temporary" || name.startsWith(".")
}
- val (dirs, files) = statuses.partition(_.isDir)
+ val (dirs, files) = statuses.partition(_.isDirectory)
// It uses [[LinkedHashSet]] since the order of files can affect the results. (SPARK-11500)
if (dirs.isEmpty) {
@@ -858,10 +858,10 @@ private[sql] object HadoopFsRelation extends Logging {
val jobConf = new JobConf(fs.getConf, this.getClass())
val pathFilter = FileInputFormat.getInputPathFilter(jobConf)
if (pathFilter != null) {
- val (dirs, files) = fs.listStatus(status.getPath, pathFilter).partition(_.isDir)
+ val (dirs, files) = fs.listStatus(status.getPath, pathFilter).partition(_.isDirectory)
files ++ dirs.flatMap(dir => listLeafFiles(fs, dir))
} else {
- val (dirs, files) = fs.listStatus(status.getPath).partition(_.isDir)
+ val (dirs, files) = fs.listStatus(status.getPath).partition(_.isDirectory)
files ++ dirs.flatMap(dir => listLeafFiles(fs, dir))
}
}
@@ -896,7 +896,7 @@ private[sql] object HadoopFsRelation extends Logging {
FakeFileStatus(
status.getPath.toString,
status.getLen,
- status.isDir,
+ status.isDirectory,
status.getReplication,
status.getBlockSize,
status.getModificationTime,
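
FileStatus.isDir is deprecated in Hadoop 2 in favor of isDirectory, which is all these hunks change; the listLeafFiles helpers above keep the same recursion and only partition on the new method. A compact sketch of that traversal against the local filesystem, assuming a Hadoop 2.x client; the object name and starting path are illustrative:

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileStatus, FileSystem, Path}

object ListLeafFilesSketch {
  // Recursively collect non-directory entries, using the same
  // partition(_.isDirectory) split as the interfaces.scala hunks above.
  def listLeafFiles(fs: FileSystem, status: FileStatus): Seq[FileStatus] = {
    val (dirs, files) = fs.listStatus(status.getPath).partition(_.isDirectory)
    files ++ dirs.flatMap(dir => listLeafFiles(fs, dir))
  }

  def main(args: Array[String]): Unit = {
    val fs = FileSystem.getLocal(new Configuration())
    val start = fs.getFileStatus(new Path(System.getProperty("java.io.tmpdir")))
    listLeafFiles(fs, start).take(10).foreach(s => println(s.getPath))
  }
}
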
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
index 384ea211df..5d00e73670 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
@@ -380,7 +380,7 @@ class HiveContext private[hive](
def calculateTableSize(fs: FileSystem, path: Path): Long = {
val fileStatus = fs.getFileStatus(path)
- val size = if (fileStatus.isDir) {
+ val size = if (fileStatus.isDirectory) {
fs.listStatus(path)
.map { status =>
if (!status.getPath().getName().startsWith(stagingDir)) {
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/ClientWrapper.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/ClientWrapper.scala
index 598ccdeee4..d3da22aa0a 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/ClientWrapper.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/ClientWrapper.scala
@@ -31,9 +31,7 @@ import org.apache.hadoop.hive.ql.metadata.Hive
import org.apache.hadoop.hive.ql.processors._
import org.apache.hadoop.hive.ql.session.SessionState
import org.apache.hadoop.hive.ql.{Driver, metadata}
-import org.apache.hadoop.hive.shims.{HadoopShims, ShimLoader}
import org.apache.hadoop.security.UserGroupInformation
-import org.apache.hadoop.util.VersionInfo
import org.apache.spark.{SparkConf, SparkException, Logging}
import org.apache.spark.sql.catalyst.expressions.Expression
@@ -65,74 +63,6 @@ private[hive] class ClientWrapper(
extends ClientInterface
with Logging {
- overrideHadoopShims()
-
- // !! HACK ALERT !!
- //
- // Internally, Hive `ShimLoader` tries to load different versions of Hadoop shims by checking
- // major version number gathered from Hadoop jar files:
- //
- // - For major version number 1, load `Hadoop20SShims`, where "20S" stands for Hadoop 0.20 with
- // security.
- // - For major version number 2, load `Hadoop23Shims`, where "23" stands for Hadoop 0.23.
- //
- // However, APIs in Hadoop 2.0.x and 2.1.x versions were in flux due to historical reasons. It
- // turns out that Hadoop 2.0.x versions should also be used together with `Hadoop20SShims`, but
- // `Hadoop23Shims` is chosen because the major version number here is 2.
- //
- // To fix this issue, we try to inspect Hadoop version via `org.apache.hadoop.utils.VersionInfo`
- // and load `Hadoop20SShims` for Hadoop 1.x and 2.0.x versions. If Hadoop version information is
- // not available, we decide whether to override the shims or not by checking for existence of a
- // probe method which doesn't exist in Hadoop 1.x or 2.0.x versions.
- private def overrideHadoopShims(): Unit = {
- val hadoopVersion = VersionInfo.getVersion
- val VersionPattern = """(\d+)\.(\d+).*""".r
-
- hadoopVersion match {
- case null =>
- logError("Failed to inspect Hadoop version")
-
- // Using "Path.getPathWithoutSchemeAndAuthority" as the probe method.
- val probeMethod = "getPathWithoutSchemeAndAuthority"
- if (!classOf[Path].getDeclaredMethods.exists(_.getName == probeMethod)) {
- logInfo(
- s"Method ${classOf[Path].getCanonicalName}.$probeMethod not found, " +
- s"we are probably using Hadoop 1.x or 2.0.x")
- loadHadoop20SShims()
- }
-
- case VersionPattern(majorVersion, minorVersion) =>
- logInfo(s"Inspected Hadoop version: $hadoopVersion")
-
- // Loads Hadoop20SShims for 1.x and 2.0.x versions
- val (major, minor) = (majorVersion.toInt, minorVersion.toInt)
- if (major < 2 || (major == 2 && minor == 0)) {
- loadHadoop20SShims()
- }
- }
-
- // Logs the actual loaded Hadoop shims class
- val loadedShimsClassName = ShimLoader.getHadoopShims.getClass.getCanonicalName
- logInfo(s"Loaded $loadedShimsClassName for Hadoop version $hadoopVersion")
- }
-
- private def loadHadoop20SShims(): Unit = {
- val hadoop20SShimsClassName = "org.apache.hadoop.hive.shims.Hadoop20SShims"
- logInfo(s"Loading Hadoop shims $hadoop20SShimsClassName")
-
- try {
- val shimsField = classOf[ShimLoader].getDeclaredField("hadoopShims")
- // scalastyle:off classforname
- val shimsClass = Class.forName(hadoop20SShimsClassName)
- // scalastyle:on classforname
- val shims = classOf[HadoopShims].cast(shimsClass.newInstance())
- shimsField.setAccessible(true)
- shimsField.set(null, shims)
- } catch { case cause: Throwable =>
- throw new RuntimeException(s"Failed to load $hadoop20SShimsClassName", cause)
- }
- }
-
// Circular buffer to hold what hive prints to STDOUT and ERR. Only printed when failures occur.
private val outputBuffer = new CircularBuffer()
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveWriterContainers.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveWriterContainers.scala
index 93c016b6c6..777e7857d2 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveWriterContainers.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveWriterContainers.scala
@@ -27,9 +27,10 @@ import org.apache.hadoop.hive.conf.HiveConf.ConfVars
import org.apache.hadoop.hive.ql.exec.{FileSinkOperator, Utilities}
import org.apache.hadoop.hive.ql.io.{HiveFileFormatUtils, HiveOutputFormat}
import org.apache.hadoop.hive.ql.plan.TableDesc
+import org.apache.hadoop.hive.common.FileUtils
import org.apache.hadoop.io.Writable
import org.apache.hadoop.mapred._
-import org.apache.hadoop.hive.common.FileUtils
+import org.apache.hadoop.mapreduce.TaskType
import org.apache.spark.mapred.SparkHadoopMapRedUtil
import org.apache.spark.{Logging, SerializableWritable, SparkHadoopWriter}
@@ -46,9 +47,7 @@ import org.apache.spark.util.SerializableJobConf
private[hive] class SparkHiveWriterContainer(
jobConf: JobConf,
fileSinkConf: FileSinkDesc)
- extends Logging
- with SparkHadoopMapRedUtil
- with Serializable {
+ extends Logging with Serializable {
private val now = new Date()
private val tableDesc: TableDesc = fileSinkConf.getTableInfo
@@ -68,8 +67,8 @@ private[hive] class SparkHiveWriterContainer(
@transient private var writer: FileSinkOperator.RecordWriter = null
@transient protected lazy val committer = conf.value.getOutputCommitter
- @transient protected lazy val jobContext = newJobContext(conf.value, jID.value)
- @transient private lazy val taskContext = newTaskAttemptContext(conf.value, taID.value)
+ @transient protected lazy val jobContext = new JobContextImpl(conf.value, jID.value)
+ @transient private lazy val taskContext = new TaskAttemptContextImpl(conf.value, taID.value)
@transient private lazy val outputFormat =
conf.value.getOutputFormat.asInstanceOf[HiveOutputFormat[AnyRef, Writable]]
@@ -131,7 +130,7 @@ private[hive] class SparkHiveWriterContainer(
jID = new SerializableWritable[JobID](SparkHadoopWriter.createJobID(now, jobId))
taID = new SerializableWritable[TaskAttemptID](
- new TaskAttemptID(new TaskID(jID.value, true, splitID), attemptID))
+ new TaskAttemptID(new TaskID(jID.value, TaskType.MAP, splitID), attemptID))
}
private def setConfParams() {
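
The Hive writer container builds its ids with the old org.apache.hadoop.mapred classes, which in Hadoop 2 also accept the mapreduce TaskType enum instead of the deprecated isMap boolean. A small sketch of that construction, assuming a Hadoop 2.x client; the object name and the literal ids are illustrative:

import org.apache.hadoop.mapred.{JobID, TaskAttemptID, TaskID}
import org.apache.hadoop.mapreduce.TaskType

object MapredTaskIdSketch {
  def main(args: Array[String]): Unit = {
    // The old-API id classes accept the mapreduce TaskType enum in Hadoop 2,
    // replacing the deprecated boolean "isMap" constructors.
    val jobId = new JobID("20151231000000", 0)
    val taskId = new TaskID(jobId, TaskType.MAP, 7)
    val attemptId = new TaskAttemptID(taskId, 0)
    println(attemptId) // e.g. attempt_20151231000000_0000_m_000007_0
  }
}
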
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileOperator.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileOperator.scala
index 0f9a1a6ef3..b91a14bdbc 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileOperator.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileOperator.scala
@@ -95,7 +95,7 @@ private[orc] object OrcFileOperator extends Logging {
val fs = origPath.getFileSystem(conf)
val path = origPath.makeQualified(fs.getUri, fs.getWorkingDirectory)
val paths = SparkHadoopUtil.get.listLeafStatuses(fs, origPath)
- .filterNot(_.isDir)
+ .filterNot(_.isDirectory)
.map(_.getPath)
.filterNot(_.getName.startsWith("_"))
.filterNot(_.getName.startsWith("."))
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala
index 1136670b7a..84ef12a68e 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala
@@ -33,8 +33,6 @@ import org.apache.hadoop.mapreduce.{Job, TaskAttemptContext}
import org.apache.spark.Logging
import org.apache.spark.broadcast.Broadcast
-import org.apache.spark.deploy.SparkHadoopUtil
-import org.apache.spark.mapred.SparkHadoopMapRedUtil
import org.apache.spark.rdd.{HadoopRDD, RDD}
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions._
@@ -67,7 +65,7 @@ private[orc] class OrcOutputWriter(
path: String,
dataSchema: StructType,
context: TaskAttemptContext)
- extends OutputWriter with SparkHadoopMapRedUtil with HiveInspectors {
+ extends OutputWriter with HiveInspectors {
private val serializer = {
val table = new Properties()
@@ -77,7 +75,7 @@ private[orc] class OrcOutputWriter(
}.mkString(":"))
val serde = new OrcSerde
- val configuration = SparkHadoopUtil.get.getConfigurationFromJobContext(context)
+ val configuration = context.getConfiguration
serde.initialize(configuration, table)
serde
}
@@ -99,9 +97,9 @@ private[orc] class OrcOutputWriter(
private lazy val recordWriter: RecordWriter[NullWritable, Writable] = {
recordWriterInstantiated = true
- val conf = SparkHadoopUtil.get.getConfigurationFromJobContext(context)
+ val conf = context.getConfiguration
val uniqueWriteJobId = conf.get("spark.sql.sources.writeJobUUID")
- val taskAttemptId = SparkHadoopUtil.get.getTaskAttemptIDFromTaskAttemptContext(context)
+ val taskAttemptId = context.getTaskAttemptID
val partition = taskAttemptId.getTaskID.getId
val filename = f"part-r-$partition%05d-$uniqueWriteJobId.orc"
@@ -208,7 +206,7 @@ private[sql] class OrcRelation(
}
override def prepareJobForWrite(job: Job): OutputWriterFactory = {
- SparkHadoopUtil.get.getConfigurationFromJobContext(job) match {
+ job.getConfiguration match {
case conf: JobConf =>
conf.setOutputFormat(classOf[OrcOutputFormat])
case conf =>
@@ -289,8 +287,8 @@ private[orc] case class OrcTableScan(
}
def execute(): RDD[InternalRow] = {
- val job = new Job(sqlContext.sparkContext.hadoopConfiguration)
- val conf = SparkHadoopUtil.get.getConfigurationFromJobContext(job)
+ val job = Job.getInstance(sqlContext.sparkContext.hadoopConfiguration)
+ val conf = job.getConfiguration
// Tries to push down filters if ORC filter push-down is enabled
if (sqlContext.conf.orcFilterPushDown) {
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextRelation.scala b/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextRelation.scala
index 01960fd290..e10d21d5e3 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextRelation.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextRelation.scala
@@ -25,7 +25,6 @@ import org.apache.hadoop.io.{NullWritable, Text}
import org.apache.hadoop.mapreduce.lib.output.{FileOutputFormat, TextOutputFormat}
import org.apache.hadoop.mapreduce.{Job, RecordWriter, TaskAttemptContext}
-import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.{CatalystTypeConverters, expressions}
@@ -53,9 +52,9 @@ class AppendingTextOutputFormat(outputFile: Path) extends TextOutputFormat[NullW
numberFormat.setGroupingUsed(false)
override def getDefaultWorkFile(context: TaskAttemptContext, extension: String): Path = {
- val configuration = SparkHadoopUtil.get.getConfigurationFromJobContext(context)
+ val configuration = context.getConfiguration
val uniqueWriteJobId = configuration.get("spark.sql.sources.writeJobUUID")
- val taskAttemptId = SparkHadoopUtil.get.getTaskAttemptIDFromTaskAttemptContext(context)
+ val taskAttemptId = context.getTaskAttemptID
val split = taskAttemptId.getTaskID.getId
val name = FileOutputFormat.getOutputName(context)
new Path(outputFile, s"$name-${numberFormat.format(split)}-$uniqueWriteJobId")
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLog.scala b/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLog.scala
index 9418beec0d..15ad2e27d3 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLog.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLog.scala
@@ -224,7 +224,8 @@ private[streaming] class FileBasedWriteAheadLog(
val logDirectoryPath = new Path(logDirectory)
val fileSystem = HdfsUtils.getFileSystemForPath(logDirectoryPath, hadoopConf)
- if (fileSystem.exists(logDirectoryPath) && fileSystem.getFileStatus(logDirectoryPath).isDir) {
+ if (fileSystem.exists(logDirectoryPath) &&
+ fileSystem.getFileStatus(logDirectoryPath).isDirectory) {
val logFileInfo = logFilesTologInfo(fileSystem.listStatus(logDirectoryPath).map { _.getPath })
pastLogs.clear()
pastLogs ++= logFileInfo
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLogWriter.scala b/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLogWriter.scala
index 1185f30265..1f5c1d4369 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLogWriter.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLogWriter.scala
@@ -19,10 +19,7 @@ package org.apache.spark.streaming.util
import java.io._
import java.nio.ByteBuffer
-import scala.util.Try
-
import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.FSDataOutputStream
import org.apache.spark.util.Utils
@@ -34,11 +31,6 @@ private[streaming] class FileBasedWriteAheadLogWriter(path: String, hadoopConf:
private lazy val stream = HdfsUtils.getOutputStream(path, hadoopConf)
- private lazy val hadoopFlushMethod = {
- // Use reflection to get the right flush operation
- val cls = classOf[FSDataOutputStream]
- Try(cls.getMethod("hflush")).orElse(Try(cls.getMethod("sync"))).toOption
- }
private var nextOffset = stream.getPos()
private var closed = false
@@ -62,7 +54,7 @@ private[streaming] class FileBasedWriteAheadLogWriter(path: String, hadoopConf:
}
private def flush() {
- hadoopFlushMethod.foreach { _.invoke(stream) }
+ stream.hflush()
// Useful for local file system where hflush/sync does not work (HADOOP-7844)
stream.getWrappedStream.flush()
}
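
With Hadoop 1 support gone, the reflective lookup of hflush/sync is unnecessary: FSDataOutputStream exposes hflush() directly in Hadoop 2. A minimal sketch against the local filesystem, assuming a Hadoop 2.x client; the object name and file path are illustrative, and the extra flush() on the wrapped stream mirrors the HADOOP-7844 workaround kept in the hunk above:

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

object HflushSketch {
  def main(args: Array[String]): Unit = {
    val fs = FileSystem.getLocal(new Configuration())
    val logPath = new Path(System.getProperty("java.io.tmpdir"), "wal-sketch.log")
    val out = fs.create(logPath, true) // overwrite if present
    try {
      out.write("record-1\n".getBytes("UTF-8"))
      out.hflush()                   // direct call, no reflection needed on Hadoop 2
      out.getWrappedStream.flush()   // local FS: hflush alone does not force data out (HADOOP-7844)
    } finally {
      out.close()
    }
  }
}
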
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala
index beaae34535..a670c7d638 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala
@@ -705,7 +705,8 @@ object WriteAheadLogSuite {
val logDirectoryPath = new Path(directory)
val fileSystem = HdfsUtils.getFileSystemForPath(logDirectoryPath, hadoopConf)
- if (fileSystem.exists(logDirectoryPath) && fileSystem.getFileStatus(logDirectoryPath).isDir) {
+ if (fileSystem.exists(logDirectoryPath) &&
+ fileSystem.getFileStatus(logDirectoryPath).isDirectory) {
fileSystem.listStatus(logDirectoryPath).map { _.getPath() }.sortBy {
_.getName().split("-")(1).toLong
}.map {