author     Sean Owen <sowen@cloudera.com>   2016-01-02 13:15:53 +0000
committer  Sean Owen <sowen@cloudera.com>   2016-01-02 13:15:53 +0000
commit     15bd73627e04591fd13667b4838c9098342db965 (patch)
tree       5b23cdd4e75138f38ad51ebb17dcf45305c9e3c0 /core
parent     94f7a12b3c8e4a6ecd969893e562feb7ffba4c24 (diff)
[SPARK-12481][CORE][STREAMING][SQL] Remove usage of Hadoop deprecated APIs and reflection that supported 1.x
Remove use of deprecated Hadoop APIs now that 2.2+ is required.

Author: Sean Owen <sowen@cloudera.com>

Closes #10446 from srowen/SPARK-12481.
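The same Hadoop 2.x migration pattern recurs across the files below. As a minimal, self-contained Scala sketch of that pattern (not taken from the patch; the path and object name are illustrative):

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.mapreduce.Job

object Hadoop2ApiSketch {
  def main(args: Array[String]): Unit = {
    val conf = new Configuration()

    // The deprecated 1.x idiom was `new Job(conf)`; with Hadoop 2.2+ required,
    // the factory method is used instead.
    val job = Job.getInstance(conf)

    // JobContext.getConfiguration is an ordinary interface method in 2.x, so
    // reflection helpers like getConfigurationFromJobContext are unnecessary.
    val jobConf = job.getConfiguration

    // FileStatus.isDir is deprecated in 2.x in favor of isDirectory.
    val fs = FileSystem.get(jobConf)
    val status = fs.getFileStatus(new Path("/tmp"))  // illustrative path
    println(s"${status.getPath} isDirectory=${status.isDirectory}")
  }
}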
Diffstat (limited to 'core')
-rw-r--r--  core/src/main/scala/org/apache/spark/SparkContext.scala | 16
-rw-r--r--  core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala | 20
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala | 41
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala | 16
-rw-r--r--  core/src/main/scala/org/apache/spark/input/FixedLengthBinaryInputFormat.scala | 3
-rw-r--r--  core/src/main/scala/org/apache/spark/input/FixedLengthBinaryRecordReader.scala | 7
-rw-r--r--  core/src/main/scala/org/apache/spark/input/PortableDataStream.scala | 7
-rw-r--r--  core/src/main/scala/org/apache/spark/input/WholeTextFileInputFormat.scala | 2
-rw-r--r--  core/src/main/scala/org/apache/spark/input/WholeTextFileRecordReader.scala | 5
-rw-r--r--  core/src/main/scala/org/apache/spark/mapred/SparkHadoopMapRedUtil.scala | 51
-rw-r--r--  core/src/main/scala/org/apache/spark/mapreduce/SparkHadoopMapReduceUtil.scala | 68
-rw-r--r--  core/src/main/scala/org/apache/spark/rdd/BinaryFileRDD.scala | 4
-rw-r--r--  core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala | 3
-rw-r--r--  core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala | 12
-rw-r--r--  core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala | 26
-rw-r--r--  core/src/main/scala/org/apache/spark/rdd/ReliableCheckpointRDD.scala | 3
-rw-r--r--  core/src/main/scala/org/apache/spark/rdd/WholeTextFileRDD.scala | 3
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala | 12
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/InputFormatInfo.scala | 2
-rw-r--r--  core/src/main/scala/org/apache/spark/util/ShutdownHookManager.scala | 19
-rw-r--r--  core/src/test/java/org/apache/spark/JavaAPISuite.java | 2
-rw-r--r--  core/src/test/scala/org/apache/spark/FileSuite.scala | 9
-rw-r--r--  core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala | 4
-rw-r--r--  core/src/test/scala/org/apache/spark/scheduler/ReplayListenerSuite.scala | 3
24 files changed, 78 insertions(+), 260 deletions(-)
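Most of the deletions come from the reflection-based helper traits (the instance methods of SparkHadoopMapRedUtil and all of SparkHadoopMapReduceUtil): once only Hadoop 2.x is supported, the job and task-attempt context classes can be constructed directly. A hedged sketch of that direct construction using the new-API (mapreduce) classes, with made-up identifiers:

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.mapreduce.{JobID, TaskAttemptID, TaskType}
import org.apache.hadoop.mapreduce.task.{JobContextImpl, TaskAttemptContextImpl}

object DirectContextSketch {
  def main(args: Array[String]): Unit = {
    val conf = new Configuration()

    // Previously reached via Utils.classForName + getDeclaredConstructor; under
    // Hadoop 2.x the public implementation classes are available directly.
    val jobId = new JobID("201601021315", 1)  // illustrative tracker id / job number
    val jobContext = new JobContextImpl(conf, jobId)

    // TaskAttemptID now takes a TaskType enum rather than a boolean isMap flag.
    val attemptId = new TaskAttemptID("201601021315", 1, TaskType.REDUCE, 0, 0)
    val attemptContext = new TaskAttemptContextImpl(conf, attemptId)

    println(s"job=${jobContext.getJobID} attempt=${attemptContext.getTaskAttemptID}")
  }
}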
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index bbdc9158d8..77e44ee026 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -874,11 +874,11 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
path: String,
minPartitions: Int = defaultMinPartitions): RDD[(String, String)] = withScope {
assertNotStopped()
- val job = new NewHadoopJob(hadoopConfiguration)
+ val job = NewHadoopJob.getInstance(hadoopConfiguration)
// Use setInputPaths so that wholeTextFiles aligns with hadoopFile/textFile in taking
// comma separated files as input. (see SPARK-7155)
NewFileInputFormat.setInputPaths(job, path)
- val updateConf = SparkHadoopUtil.get.getConfigurationFromJobContext(job)
+ val updateConf = job.getConfiguration
new WholeTextFileRDD(
this,
classOf[WholeTextFileInputFormat],
@@ -923,11 +923,11 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
path: String,
minPartitions: Int = defaultMinPartitions): RDD[(String, PortableDataStream)] = withScope {
assertNotStopped()
- val job = new NewHadoopJob(hadoopConfiguration)
+ val job = NewHadoopJob.getInstance(hadoopConfiguration)
// Use setInputPaths so that binaryFiles aligns with hadoopFile/textFile in taking
// comma separated files as input. (see SPARK-7155)
NewFileInputFormat.setInputPaths(job, path)
- val updateConf = SparkHadoopUtil.get.getConfigurationFromJobContext(job)
+ val updateConf = job.getConfiguration
new BinaryFileRDD(
this,
classOf[StreamInputFormat],
@@ -1100,13 +1100,13 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
vClass: Class[V],
conf: Configuration = hadoopConfiguration): RDD[(K, V)] = withScope {
assertNotStopped()
- // The call to new NewHadoopJob automatically adds security credentials to conf,
+ // The call to NewHadoopJob automatically adds security credentials to conf,
// so we don't need to explicitly add them ourselves
- val job = new NewHadoopJob(conf)
+ val job = NewHadoopJob.getInstance(conf)
// Use setInputPaths so that newAPIHadoopFile aligns with hadoopFile/textFile in taking
// comma separated files as input. (see SPARK-7155)
NewFileInputFormat.setInputPaths(job, path)
- val updatedConf = SparkHadoopUtil.get.getConfigurationFromJobContext(job)
+ val updatedConf = job.getConfiguration
new NewHadoopRDD(this, fClass, kClass, vClass, updatedConf).setName(path)
}
@@ -1369,7 +1369,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
if (!fs.exists(hadoopPath)) {
throw new FileNotFoundException(s"Added file $hadoopPath does not exist.")
}
- val isDir = fs.getFileStatus(hadoopPath).isDir
+ val isDir = fs.getFileStatus(hadoopPath).isDirectory
if (!isLocal && scheme == "file" && isDir) {
throw new SparkException(s"addFile does not support local directories when not running " +
"local mode.")
diff --git a/core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala b/core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala
index ac6eaab20d..dd400b8ae8 100644
--- a/core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala
+++ b/core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala
@@ -25,6 +25,7 @@ import java.util.Date
import org.apache.hadoop.mapred._
import org.apache.hadoop.fs.FileSystem
import org.apache.hadoop.fs.Path
+import org.apache.hadoop.mapreduce.TaskType
import org.apache.spark.mapred.SparkHadoopMapRedUtil
import org.apache.spark.rdd.HadoopRDD
@@ -37,10 +38,7 @@ import org.apache.spark.util.SerializableJobConf
* a filename to write to, etc, exactly like in a Hadoop MapReduce job.
*/
private[spark]
-class SparkHadoopWriter(jobConf: JobConf)
- extends Logging
- with SparkHadoopMapRedUtil
- with Serializable {
+class SparkHadoopWriter(jobConf: JobConf) extends Logging with Serializable {
private val now = new Date()
private val conf = new SerializableJobConf(jobConf)
@@ -131,7 +129,7 @@ class SparkHadoopWriter(jobConf: JobConf)
private def getJobContext(): JobContext = {
if (jobContext == null) {
- jobContext = newJobContext(conf.value, jID.value)
+ jobContext = new JobContextImpl(conf.value, jID.value)
}
jobContext
}
@@ -143,6 +141,12 @@ class SparkHadoopWriter(jobConf: JobConf)
taskContext
}
+ protected def newTaskAttemptContext(
+ conf: JobConf,
+ attemptId: TaskAttemptID): TaskAttemptContext = {
+ new TaskAttemptContextImpl(conf, attemptId)
+ }
+
private def setIDs(jobid: Int, splitid: Int, attemptid: Int) {
jobID = jobid
splitID = splitid
@@ -150,7 +154,7 @@ class SparkHadoopWriter(jobConf: JobConf)
jID = new SerializableWritable[JobID](SparkHadoopWriter.createJobID(now, jobid))
taID = new SerializableWritable[TaskAttemptID](
- new TaskAttemptID(new TaskID(jID.value, true, splitID), attemptID))
+ new TaskAttemptID(new TaskID(jID.value, TaskType.MAP, splitID), attemptID))
}
}
@@ -168,9 +172,9 @@ object SparkHadoopWriter {
}
val outputPath = new Path(path)
val fs = outputPath.getFileSystem(conf)
- if (outputPath == null || fs == null) {
+ if (fs == null) {
throw new IllegalArgumentException("Incorrectly formatted output path")
}
- outputPath.makeQualified(fs)
+ outputPath.makeQualified(fs.getUri, fs.getWorkingDirectory)
}
}
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
index 59e90564b3..4bd94f13e5 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
@@ -33,9 +33,6 @@ import org.apache.hadoop.fs.FileSystem.Statistics
import org.apache.hadoop.fs.{FileStatus, FileSystem, Path, PathFilter}
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier
import org.apache.hadoop.mapred.JobConf
-import org.apache.hadoop.mapreduce.JobContext
-import org.apache.hadoop.mapreduce.{TaskAttemptContext => MapReduceTaskAttemptContext}
-import org.apache.hadoop.mapreduce.{TaskAttemptID => MapReduceTaskAttemptID}
import org.apache.hadoop.security.{Credentials, UserGroupInformation}
import org.apache.spark.annotation.DeveloperApi
@@ -76,9 +73,6 @@ class SparkHadoopUtil extends Logging {
}
}
- @deprecated("use newConfiguration with SparkConf argument", "1.2.0")
- def newConfiguration(): Configuration = newConfiguration(null)
-
/**
* Return an appropriate (subclass) of Configuration. Creating config can initializes some Hadoop
* subsystems.
@@ -191,33 +185,6 @@ class SparkHadoopUtil extends Logging {
}
/**
- * Using reflection to get the Configuration from JobContext/TaskAttemptContext. If we directly
- * call `JobContext/TaskAttemptContext.getConfiguration`, it will generate different byte codes
- * for Hadoop 1.+ and Hadoop 2.+ because JobContext/TaskAttemptContext is class in Hadoop 1.+
- * while it's interface in Hadoop 2.+.
- */
- def getConfigurationFromJobContext(context: JobContext): Configuration = {
- // scalastyle:off jobconfig
- val method = context.getClass.getMethod("getConfiguration")
- // scalastyle:on jobconfig
- method.invoke(context).asInstanceOf[Configuration]
- }
-
- /**
- * Using reflection to call `getTaskAttemptID` from TaskAttemptContext. If we directly
- * call `TaskAttemptContext.getTaskAttemptID`, it will generate different byte codes
- * for Hadoop 1.+ and Hadoop 2.+ because TaskAttemptContext is class in Hadoop 1.+
- * while it's interface in Hadoop 2.+.
- */
- def getTaskAttemptIDFromTaskAttemptContext(
- context: MapReduceTaskAttemptContext): MapReduceTaskAttemptID = {
- // scalastyle:off jobconfig
- val method = context.getClass.getMethod("getTaskAttemptID")
- // scalastyle:on jobconfig
- method.invoke(context).asInstanceOf[MapReduceTaskAttemptID]
- }
-
- /**
* Get [[FileStatus]] objects for all leaf children (files) under the given base path. If the
* given path points to a file, return a single-element collection containing [[FileStatus]] of
* that file.
@@ -233,11 +200,11 @@ class SparkHadoopUtil extends Logging {
*/
def listLeafStatuses(fs: FileSystem, baseStatus: FileStatus): Seq[FileStatus] = {
def recurse(status: FileStatus): Seq[FileStatus] = {
- val (directories, leaves) = fs.listStatus(status.getPath).partition(_.isDir)
+ val (directories, leaves) = fs.listStatus(status.getPath).partition(_.isDirectory)
leaves ++ directories.flatMap(f => listLeafStatuses(fs, f))
}
- if (baseStatus.isDir) recurse(baseStatus) else Seq(baseStatus)
+ if (baseStatus.isDirectory) recurse(baseStatus) else Seq(baseStatus)
}
def listLeafDirStatuses(fs: FileSystem, basePath: Path): Seq[FileStatus] = {
@@ -246,12 +213,12 @@ class SparkHadoopUtil extends Logging {
def listLeafDirStatuses(fs: FileSystem, baseStatus: FileStatus): Seq[FileStatus] = {
def recurse(status: FileStatus): Seq[FileStatus] = {
- val (directories, files) = fs.listStatus(status.getPath).partition(_.isDir)
+ val (directories, files) = fs.listStatus(status.getPath).partition(_.isDirectory)
val leaves = if (directories.isEmpty) Seq(status) else Seq.empty[FileStatus]
leaves ++ directories.flatMap(dir => listLeafDirStatuses(fs, dir))
}
- assert(baseStatus.isDir)
+ assert(baseStatus.isDirectory)
recurse(baseStatus)
}
diff --git a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
index 6e91d73b6e..c93bc8c127 100644
--- a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
@@ -28,6 +28,7 @@ import com.google.common.io.ByteStreams
import com.google.common.util.concurrent.{MoreExecutors, ThreadFactoryBuilder}
import org.apache.hadoop.fs.{FileStatus, FileSystem, Path}
import org.apache.hadoop.hdfs.DistributedFileSystem
+import org.apache.hadoop.hdfs.protocol.HdfsConstants
import org.apache.hadoop.security.AccessControlException
import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkException}
@@ -167,7 +168,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
}
throw new IllegalArgumentException(msg)
}
- if (!fs.getFileStatus(path).isDir) {
+ if (!fs.getFileStatus(path).isDirectory) {
throw new IllegalArgumentException(
"Logging directory specified is not a directory: %s".format(logDir))
}
@@ -304,7 +305,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
logError("Exception encountered when attempting to update last scan time", e)
lastScanTime
} finally {
- if (!fs.delete(path)) {
+ if (!fs.delete(path, true)) {
logWarning(s"Error deleting ${path}")
}
}
@@ -603,7 +604,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
* As of Spark 1.3, these files are consolidated into a single one that replaces the directory.
* See SPARK-2261 for more detail.
*/
- private def isLegacyLogDirectory(entry: FileStatus): Boolean = entry.isDir()
+ private def isLegacyLogDirectory(entry: FileStatus): Boolean = entry.isDirectory
/**
* Returns the modification time of the given event log. If the status points at an empty
@@ -648,8 +649,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
}
/**
- * Checks whether HDFS is in safe mode. The API is slightly different between hadoop 1 and 2,
- * so we have to resort to ugly reflection (as usual...).
+ * Checks whether HDFS is in safe mode.
*
* Note that DistributedFileSystem is a `@LimitedPrivate` class, which for all practical reasons
* makes it more public than not.
@@ -663,11 +663,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
// For testing.
private[history] def isFsInSafeMode(dfs: DistributedFileSystem): Boolean = {
- val hadoop2Class = "org.apache.hadoop.hdfs.protocol.HdfsConstants$SafeModeAction"
- val actionClass: Class[_] = getClass().getClassLoader().loadClass(hadoop2Class)
- val action = actionClass.getField("SAFEMODE_GET").get(null)
- val method = dfs.getClass().getMethod("setSafeMode", action.getClass())
- method.invoke(dfs, action).asInstanceOf[Boolean]
+ dfs.setSafeMode(HdfsConstants.SafeModeAction.SAFEMODE_GET)
}
}
diff --git a/core/src/main/scala/org/apache/spark/input/FixedLengthBinaryInputFormat.scala b/core/src/main/scala/org/apache/spark/input/FixedLengthBinaryInputFormat.scala
index 532850dd57..30431a9b98 100644
--- a/core/src/main/scala/org/apache/spark/input/FixedLengthBinaryInputFormat.scala
+++ b/core/src/main/scala/org/apache/spark/input/FixedLengthBinaryInputFormat.scala
@@ -23,7 +23,6 @@ import org.apache.hadoop.mapreduce.lib.input.FileInputFormat
import org.apache.hadoop.mapreduce.{InputSplit, JobContext, RecordReader, TaskAttemptContext}
import org.apache.spark.Logging
-import org.apache.spark.deploy.SparkHadoopUtil
/**
* Custom Input Format for reading and splitting flat binary files that contain records,
@@ -36,7 +35,7 @@ private[spark] object FixedLengthBinaryInputFormat {
/** Retrieves the record length property from a Hadoop configuration */
def getRecordLength(context: JobContext): Int = {
- SparkHadoopUtil.get.getConfigurationFromJobContext(context).get(RECORD_LENGTH_PROPERTY).toInt
+ context.getConfiguration.get(RECORD_LENGTH_PROPERTY).toInt
}
}
diff --git a/core/src/main/scala/org/apache/spark/input/FixedLengthBinaryRecordReader.scala b/core/src/main/scala/org/apache/spark/input/FixedLengthBinaryRecordReader.scala
index 67a96925da..25596a15d9 100644
--- a/core/src/main/scala/org/apache/spark/input/FixedLengthBinaryRecordReader.scala
+++ b/core/src/main/scala/org/apache/spark/input/FixedLengthBinaryRecordReader.scala
@@ -24,7 +24,6 @@ import org.apache.hadoop.io.compress.CompressionCodecFactory
import org.apache.hadoop.io.{BytesWritable, LongWritable}
import org.apache.hadoop.mapreduce.{InputSplit, RecordReader, TaskAttemptContext}
import org.apache.hadoop.mapreduce.lib.input.FileSplit
-import org.apache.spark.deploy.SparkHadoopUtil
/**
* FixedLengthBinaryRecordReader is returned by FixedLengthBinaryInputFormat.
@@ -83,16 +82,16 @@ private[spark] class FixedLengthBinaryRecordReader
// the actual file we will be reading from
val file = fileSplit.getPath
// job configuration
- val job = SparkHadoopUtil.get.getConfigurationFromJobContext(context)
+ val conf = context.getConfiguration
// check compression
- val codec = new CompressionCodecFactory(job).getCodec(file)
+ val codec = new CompressionCodecFactory(conf).getCodec(file)
if (codec != null) {
throw new IOException("FixedLengthRecordReader does not support reading compressed files")
}
// get the record length
recordLength = FixedLengthBinaryInputFormat.getRecordLength(context)
// get the filesystem
- val fs = file.getFileSystem(job)
+ val fs = file.getFileSystem(conf)
// open the File
fileInputStream = fs.open(file)
// seek to the splitStart position
diff --git a/core/src/main/scala/org/apache/spark/input/PortableDataStream.scala b/core/src/main/scala/org/apache/spark/input/PortableDataStream.scala
index 280e7a5fe8..cb76e3c344 100644
--- a/core/src/main/scala/org/apache/spark/input/PortableDataStream.scala
+++ b/core/src/main/scala/org/apache/spark/input/PortableDataStream.scala
@@ -27,8 +27,6 @@ import org.apache.hadoop.fs.Path
import org.apache.hadoop.mapreduce.{InputSplit, JobContext, RecordReader, TaskAttemptContext}
import org.apache.hadoop.mapreduce.lib.input.{CombineFileInputFormat, CombineFileRecordReader, CombineFileSplit}
-import org.apache.spark.deploy.SparkHadoopUtil
-
/**
* A general format for reading whole files in as streams, byte arrays,
* or other functions to be added
@@ -44,7 +42,7 @@ private[spark] abstract class StreamFileInputFormat[T]
*/
def setMinPartitions(context: JobContext, minPartitions: Int) {
val files = listStatus(context).asScala
- val totalLen = files.map(file => if (file.isDir) 0L else file.getLen).sum
+ val totalLen = files.map(file => if (file.isDirectory) 0L else file.getLen).sum
val maxSplitSize = Math.ceil(totalLen * 1.0 / files.size).toLong
super.setMaxSplitSize(maxSplitSize)
}
@@ -135,8 +133,7 @@ class PortableDataStream(
private val confBytes = {
val baos = new ByteArrayOutputStream()
- SparkHadoopUtil.get.getConfigurationFromJobContext(context).
- write(new DataOutputStream(baos))
+ context.getConfiguration.write(new DataOutputStream(baos))
baos.toByteArray
}
diff --git a/core/src/main/scala/org/apache/spark/input/WholeTextFileInputFormat.scala b/core/src/main/scala/org/apache/spark/input/WholeTextFileInputFormat.scala
index 413408723b..fa34f1e886 100644
--- a/core/src/main/scala/org/apache/spark/input/WholeTextFileInputFormat.scala
+++ b/core/src/main/scala/org/apache/spark/input/WholeTextFileInputFormat.scala
@@ -53,7 +53,7 @@ private[spark] class WholeTextFileInputFormat
*/
def setMinPartitions(context: JobContext, minPartitions: Int) {
val files = listStatus(context).asScala
- val totalLen = files.map(file => if (file.isDir) 0L else file.getLen).sum
+ val totalLen = files.map(file => if (file.isDirectory) 0L else file.getLen).sum
val maxSplitSize = Math.ceil(totalLen * 1.0 /
(if (minPartitions == 0) 1 else minPartitions)).toLong
super.setMaxSplitSize(maxSplitSize)
diff --git a/core/src/main/scala/org/apache/spark/input/WholeTextFileRecordReader.scala b/core/src/main/scala/org/apache/spark/input/WholeTextFileRecordReader.scala
index b56b2aa88a..998c898a3f 100644
--- a/core/src/main/scala/org/apache/spark/input/WholeTextFileRecordReader.scala
+++ b/core/src/main/scala/org/apache/spark/input/WholeTextFileRecordReader.scala
@@ -26,8 +26,6 @@ import org.apache.hadoop.mapreduce.InputSplit
import org.apache.hadoop.mapreduce.lib.input.{CombineFileSplit, CombineFileRecordReader}
import org.apache.hadoop.mapreduce.RecordReader
import org.apache.hadoop.mapreduce.TaskAttemptContext
-import org.apache.spark.deploy.SparkHadoopUtil
-
/**
* A trait to implement [[org.apache.hadoop.conf.Configurable Configurable]] interface.
@@ -52,8 +50,7 @@ private[spark] class WholeTextFileRecordReader(
extends RecordReader[Text, Text] with Configurable {
private[this] val path = split.getPath(index)
- private[this] val fs = path.getFileSystem(
- SparkHadoopUtil.get.getConfigurationFromJobContext(context))
+ private[this] val fs = path.getFileSystem(context.getConfiguration)
// True means the current file has been processed, then skip it.
private[this] var processed = false
diff --git a/core/src/main/scala/org/apache/spark/mapred/SparkHadoopMapRedUtil.scala b/core/src/main/scala/org/apache/spark/mapred/SparkHadoopMapRedUtil.scala
index f7298e8d5c..249bdf5994 100644
--- a/core/src/main/scala/org/apache/spark/mapred/SparkHadoopMapRedUtil.scala
+++ b/core/src/main/scala/org/apache/spark/mapred/SparkHadoopMapRedUtil.scala
@@ -18,61 +18,12 @@
package org.apache.spark.mapred
import java.io.IOException
-import java.lang.reflect.Modifier
-import org.apache.hadoop.mapred._
import org.apache.hadoop.mapreduce.{TaskAttemptContext => MapReduceTaskAttemptContext}
import org.apache.hadoop.mapreduce.{OutputCommitter => MapReduceOutputCommitter}
-import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.executor.CommitDeniedException
import org.apache.spark.{Logging, SparkEnv, TaskContext}
-import org.apache.spark.util.{Utils => SparkUtils}
-
-private[spark]
-trait SparkHadoopMapRedUtil {
- def newJobContext(conf: JobConf, jobId: JobID): JobContext = {
- val klass = firstAvailableClass("org.apache.hadoop.mapred.JobContextImpl",
- "org.apache.hadoop.mapred.JobContext")
- val ctor = klass.getDeclaredConstructor(classOf[JobConf],
- classOf[org.apache.hadoop.mapreduce.JobID])
- // In Hadoop 1.0.x, JobContext is an interface, and JobContextImpl is package private.
- // Make it accessible if it's not in order to access it.
- if (!Modifier.isPublic(ctor.getModifiers)) {
- ctor.setAccessible(true)
- }
- ctor.newInstance(conf, jobId).asInstanceOf[JobContext]
- }
-
- def newTaskAttemptContext(conf: JobConf, attemptId: TaskAttemptID): TaskAttemptContext = {
- val klass = firstAvailableClass("org.apache.hadoop.mapred.TaskAttemptContextImpl",
- "org.apache.hadoop.mapred.TaskAttemptContext")
- val ctor = klass.getDeclaredConstructor(classOf[JobConf], classOf[TaskAttemptID])
- // See above
- if (!Modifier.isPublic(ctor.getModifiers)) {
- ctor.setAccessible(true)
- }
- ctor.newInstance(conf, attemptId).asInstanceOf[TaskAttemptContext]
- }
-
- def newTaskAttemptID(
- jtIdentifier: String,
- jobId: Int,
- isMap: Boolean,
- taskId: Int,
- attemptId: Int): TaskAttemptID = {
- new TaskAttemptID(jtIdentifier, jobId, isMap, taskId, attemptId)
- }
-
- private def firstAvailableClass(first: String, second: String): Class[_] = {
- try {
- SparkUtils.classForName(first)
- } catch {
- case e: ClassNotFoundException =>
- SparkUtils.classForName(second)
- }
- }
-}
object SparkHadoopMapRedUtil extends Logging {
/**
@@ -93,7 +44,7 @@ object SparkHadoopMapRedUtil extends Logging {
jobId: Int,
splitId: Int): Unit = {
- val mrTaskAttemptID = SparkHadoopUtil.get.getTaskAttemptIDFromTaskAttemptContext(mrTaskContext)
+ val mrTaskAttemptID = mrTaskContext.getTaskAttemptID
// Called after we have decided to commit
def performCommit(): Unit = {
diff --git a/core/src/main/scala/org/apache/spark/mapreduce/SparkHadoopMapReduceUtil.scala b/core/src/main/scala/org/apache/spark/mapreduce/SparkHadoopMapReduceUtil.scala
deleted file mode 100644
index 82d807fad8..0000000000
--- a/core/src/main/scala/org/apache/spark/mapreduce/SparkHadoopMapReduceUtil.scala
+++ /dev/null
@@ -1,68 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.mapreduce
-
-import java.lang.{Boolean => JBoolean, Integer => JInteger}
-
-import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.mapreduce.{JobContext, JobID, TaskAttemptContext, TaskAttemptID}
-import org.apache.spark.util.Utils
-
-private[spark]
-trait SparkHadoopMapReduceUtil {
- def newJobContext(conf: Configuration, jobId: JobID): JobContext = {
- val klass = Utils.classForName("org.apache.hadoop.mapreduce.task.JobContextImpl")
- val ctor = klass.getDeclaredConstructor(classOf[Configuration], classOf[JobID])
- ctor.newInstance(conf, jobId).asInstanceOf[JobContext]
- }
-
- def newTaskAttemptContext(conf: Configuration, attemptId: TaskAttemptID): TaskAttemptContext = {
- val klass = Utils.classForName("org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl")
- val ctor = klass.getDeclaredConstructor(classOf[Configuration], classOf[TaskAttemptID])
- ctor.newInstance(conf, attemptId).asInstanceOf[TaskAttemptContext]
- }
-
- def newTaskAttemptID(
- jtIdentifier: String,
- jobId: Int,
- isMap: Boolean,
- taskId: Int,
- attemptId: Int): TaskAttemptID = {
- val klass = Utils.classForName("org.apache.hadoop.mapreduce.TaskAttemptID")
- try {
- // First, attempt to use the old-style constructor that takes a boolean isMap
- // (not available in YARN)
- val ctor = klass.getDeclaredConstructor(classOf[String], classOf[Int], classOf[Boolean],
- classOf[Int], classOf[Int])
- ctor.newInstance(jtIdentifier, new JInteger(jobId), new JBoolean(isMap), new JInteger(taskId),
- new JInteger(attemptId)).asInstanceOf[TaskAttemptID]
- } catch {
- case exc: NoSuchMethodException => {
- // If that failed, look for the new constructor that takes a TaskType (not available in 1.x)
- val taskTypeClass = Utils.classForName("org.apache.hadoop.mapreduce.TaskType")
- .asInstanceOf[Class[Enum[_]]]
- val taskType = taskTypeClass.getMethod("valueOf", classOf[String]).invoke(
- taskTypeClass, if (isMap) "MAP" else "REDUCE")
- val ctor = klass.getDeclaredConstructor(classOf[String], classOf[Int], taskTypeClass,
- classOf[Int], classOf[Int])
- ctor.newInstance(jtIdentifier, new JInteger(jobId), taskType, new JInteger(taskId),
- new JInteger(attemptId)).asInstanceOf[TaskAttemptID]
- }
- }
- }
-}
diff --git a/core/src/main/scala/org/apache/spark/rdd/BinaryFileRDD.scala b/core/src/main/scala/org/apache/spark/rdd/BinaryFileRDD.scala
index aedced7408..2bf2337d49 100644
--- a/core/src/main/scala/org/apache/spark/rdd/BinaryFileRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/BinaryFileRDD.scala
@@ -20,6 +20,8 @@ package org.apache.spark.rdd
import org.apache.hadoop.conf.{ Configurable, Configuration }
import org.apache.hadoop.io.Writable
import org.apache.hadoop.mapreduce._
+import org.apache.hadoop.mapreduce.task.JobContextImpl
+
import org.apache.spark.input.StreamFileInputFormat
import org.apache.spark.{ Partition, SparkContext }
@@ -40,7 +42,7 @@ private[spark] class BinaryFileRDD[T](
configurable.setConf(conf)
case _ =>
}
- val jobContext = newJobContext(conf, jobId)
+ val jobContext = new JobContextImpl(conf, jobId)
inputFormat.setMinPartitions(jobContext, minPartitions)
val rawSplits = inputFormat.getSplits(jobContext).toArray
val result = new Array[Partition](rawSplits.size)
diff --git a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
index f37c95bedc..920d3bf219 100644
--- a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
@@ -36,6 +36,7 @@ import org.apache.hadoop.mapred.JobID
import org.apache.hadoop.mapred.TaskAttemptID
import org.apache.hadoop.mapred.TaskID
import org.apache.hadoop.mapred.lib.CombineFileSplit
+import org.apache.hadoop.mapreduce.TaskType
import org.apache.hadoop.util.ReflectionUtils
import org.apache.spark._
@@ -357,7 +358,7 @@ private[spark] object HadoopRDD extends Logging {
def addLocalConfiguration(jobTrackerId: String, jobId: Int, splitId: Int, attemptId: Int,
conf: JobConf) {
val jobID = new JobID(jobTrackerId, jobId)
- val taId = new TaskAttemptID(new TaskID(jobID, true, splitId), attemptId)
+ val taId = new TaskAttemptID(new TaskID(jobID, TaskType.MAP, splitId), attemptId)
conf.set("mapred.tip.id", taId.getTaskID.toString)
conf.set("mapred.task.id", taId.toString)
diff --git a/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
index 86f38ae836..8b330a34c3 100644
--- a/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
@@ -26,11 +26,11 @@ import org.apache.hadoop.conf.{Configurable, Configuration}
import org.apache.hadoop.io.Writable
import org.apache.hadoop.mapreduce._
import org.apache.hadoop.mapreduce.lib.input.{CombineFileSplit, FileSplit}
+import org.apache.hadoop.mapreduce.task.{JobContextImpl, TaskAttemptContextImpl}
import org.apache.spark.annotation.DeveloperApi
import org.apache.spark._
import org.apache.spark.executor.DataReadMethod
-import org.apache.spark.mapreduce.SparkHadoopMapReduceUtil
import org.apache.spark.rdd.NewHadoopRDD.NewHadoopMapPartitionsWithSplitRDD
import org.apache.spark.util.{SerializableConfiguration, ShutdownHookManager}
import org.apache.spark.deploy.SparkHadoopUtil
@@ -66,9 +66,7 @@ class NewHadoopRDD[K, V](
keyClass: Class[K],
valueClass: Class[V],
@transient private val _conf: Configuration)
- extends RDD[(K, V)](sc, Nil)
- with SparkHadoopMapReduceUtil
- with Logging {
+ extends RDD[(K, V)](sc, Nil) with Logging {
// A Hadoop Configuration can be about 10 KB, which is pretty big, so broadcast it
private val confBroadcast = sc.broadcast(new SerializableConfiguration(_conf))
@@ -109,7 +107,7 @@ class NewHadoopRDD[K, V](
configurable.setConf(_conf)
case _ =>
}
- val jobContext = newJobContext(_conf, jobId)
+ val jobContext = new JobContextImpl(_conf, jobId)
val rawSplits = inputFormat.getSplits(jobContext).toArray
val result = new Array[Partition](rawSplits.size)
for (i <- 0 until rawSplits.size) {
@@ -144,8 +142,8 @@ class NewHadoopRDD[K, V](
configurable.setConf(conf)
case _ =>
}
- val attemptId = newTaskAttemptID(jobTrackerId, id, isMap = true, split.index, 0)
- val hadoopAttemptContext = newTaskAttemptContext(conf, attemptId)
+ val attemptId = new TaskAttemptID(jobTrackerId, id, TaskType.MAP, split.index, 0)
+ val hadoopAttemptContext = new TaskAttemptContextImpl(conf, attemptId)
private var reader = format.createRecordReader(
split.serializableHadoopSplit.value, hadoopAttemptContext)
reader.initialize(split.serializableHadoopSplit.value, hadoopAttemptContext)
diff --git a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
index 44d195587a..b872301425 100644
--- a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
@@ -33,15 +33,14 @@ import org.apache.hadoop.fs.FileSystem
import org.apache.hadoop.io.SequenceFile.CompressionType
import org.apache.hadoop.io.compress.CompressionCodec
import org.apache.hadoop.mapred.{FileOutputCommitter, FileOutputFormat, JobConf, OutputFormat}
-import org.apache.hadoop.mapreduce.{Job => NewAPIHadoopJob, OutputFormat => NewOutputFormat,
- RecordWriter => NewRecordWriter}
+import org.apache.hadoop.mapreduce.{Job => NewAPIHadoopJob, OutputFormat => NewOutputFormat, RecordWriter => NewRecordWriter, TaskType, TaskAttemptID}
+import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
import org.apache.spark._
import org.apache.spark.Partitioner.defaultPartitioner
import org.apache.spark.annotation.Experimental
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.executor.{DataWriteMethod, OutputMetrics}
-import org.apache.spark.mapreduce.SparkHadoopMapReduceUtil
import org.apache.spark.partial.{BoundedDouble, PartialResult}
import org.apache.spark.serializer.Serializer
import org.apache.spark.util.{SerializableConfiguration, Utils}
@@ -53,10 +52,7 @@ import org.apache.spark.util.random.StratifiedSamplingUtils
*/
class PairRDDFunctions[K, V](self: RDD[(K, V)])
(implicit kt: ClassTag[K], vt: ClassTag[V], ord: Ordering[K] = null)
- extends Logging
- with SparkHadoopMapReduceUtil
- with Serializable
-{
+ extends Logging with Serializable {
/**
* :: Experimental ::
@@ -985,11 +981,11 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
conf: Configuration = self.context.hadoopConfiguration): Unit = self.withScope {
// Rename this as hadoopConf internally to avoid shadowing (see SPARK-2038).
val hadoopConf = conf
- val job = new NewAPIHadoopJob(hadoopConf)
+ val job = NewAPIHadoopJob.getInstance(hadoopConf)
job.setOutputKeyClass(keyClass)
job.setOutputValueClass(valueClass)
job.setOutputFormatClass(outputFormatClass)
- val jobConfiguration = SparkHadoopUtil.get.getConfigurationFromJobContext(job)
+ val jobConfiguration = job.getConfiguration
jobConfiguration.set("mapred.output.dir", path)
saveAsNewAPIHadoopDataset(jobConfiguration)
}
@@ -1074,11 +1070,11 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
def saveAsNewAPIHadoopDataset(conf: Configuration): Unit = self.withScope {
// Rename this as hadoopConf internally to avoid shadowing (see SPARK-2038).
val hadoopConf = conf
- val job = new NewAPIHadoopJob(hadoopConf)
+ val job = NewAPIHadoopJob.getInstance(hadoopConf)
val formatter = new SimpleDateFormat("yyyyMMddHHmm")
val jobtrackerID = formatter.format(new Date())
val stageId = self.id
- val jobConfiguration = SparkHadoopUtil.get.getConfigurationFromJobContext(job)
+ val jobConfiguration = job.getConfiguration
val wrappedConf = new SerializableConfiguration(jobConfiguration)
val outfmt = job.getOutputFormatClass
val jobFormat = outfmt.newInstance
@@ -1091,9 +1087,9 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
val writeShard = (context: TaskContext, iter: Iterator[(K, V)]) => {
val config = wrappedConf.value
/* "reduce task" <split #> <attempt # = spark task #> */
- val attemptId = newTaskAttemptID(jobtrackerID, stageId, isMap = false, context.partitionId,
+ val attemptId = new TaskAttemptID(jobtrackerID, stageId, TaskType.REDUCE, context.partitionId,
context.attemptNumber)
- val hadoopContext = newTaskAttemptContext(config, attemptId)
+ val hadoopContext = new TaskAttemptContextImpl(config, attemptId)
val format = outfmt.newInstance
format match {
case c: Configurable => c.setConf(config)
@@ -1125,8 +1121,8 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
1
} : Int
- val jobAttemptId = newTaskAttemptID(jobtrackerID, stageId, isMap = true, 0, 0)
- val jobTaskContext = newTaskAttemptContext(wrappedConf.value, jobAttemptId)
+ val jobAttemptId = new TaskAttemptID(jobtrackerID, stageId, TaskType.MAP, 0, 0)
+ val jobTaskContext = new TaskAttemptContextImpl(wrappedConf.value, jobAttemptId)
val jobCommitter = jobFormat.getOutputCommitter(jobTaskContext)
// When speculation is on and output committer class name contains "Direct", we should warn
diff --git a/core/src/main/scala/org/apache/spark/rdd/ReliableCheckpointRDD.scala b/core/src/main/scala/org/apache/spark/rdd/ReliableCheckpointRDD.scala
index fa71b8c262..a9b3d52bbe 100644
--- a/core/src/main/scala/org/apache/spark/rdd/ReliableCheckpointRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/ReliableCheckpointRDD.scala
@@ -174,7 +174,8 @@ private[spark] object ReliableCheckpointRDD extends Logging {
fs.create(tempOutputPath, false, bufferSize)
} else {
// This is mainly for testing purpose
- fs.create(tempOutputPath, false, bufferSize, fs.getDefaultReplication, blockSize)
+ fs.create(tempOutputPath, false, bufferSize,
+ fs.getDefaultReplication(fs.getWorkingDirectory), blockSize)
}
val serializer = env.serializer.newInstance()
val serializeStream = serializer.serializeStream(fileOutputStream)
diff --git a/core/src/main/scala/org/apache/spark/rdd/WholeTextFileRDD.scala b/core/src/main/scala/org/apache/spark/rdd/WholeTextFileRDD.scala
index e3f14fe7ef..8e1baae796 100644
--- a/core/src/main/scala/org/apache/spark/rdd/WholeTextFileRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/WholeTextFileRDD.scala
@@ -20,6 +20,7 @@ package org.apache.spark.rdd
import org.apache.hadoop.conf.{Configurable, Configuration}
import org.apache.hadoop.io.{Text, Writable}
import org.apache.hadoop.mapreduce.InputSplit
+import org.apache.hadoop.mapreduce.task.JobContextImpl
import org.apache.spark.{Partition, SparkContext}
import org.apache.spark.input.WholeTextFileInputFormat
@@ -44,7 +45,7 @@ private[spark] class WholeTextFileRDD(
configurable.setConf(conf)
case _ =>
}
- val jobContext = newJobContext(conf, jobId)
+ val jobContext = new JobContextImpl(conf, jobId)
inputFormat.setMinPartitions(jobContext, minPartitions)
val rawSplits = inputFormat.getSplits(jobContext).toArray
val result = new Array[Partition](rawSplits.size)
diff --git a/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala b/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala
index eaa07acc51..68792c58c9 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala
@@ -77,14 +77,6 @@ private[spark] class EventLoggingListener(
// Only defined if the file system scheme is not local
private var hadoopDataStream: Option[FSDataOutputStream] = None
- // The Hadoop APIs have changed over time, so we use reflection to figure out
- // the correct method to use to flush a hadoop data stream. See SPARK-1518
- // for details.
- private val hadoopFlushMethod = {
- val cls = classOf[FSDataOutputStream]
- scala.util.Try(cls.getMethod("hflush")).getOrElse(cls.getMethod("sync"))
- }
-
private var writer: Option[PrintWriter] = None
// For testing. Keep track of all JSON serialized events that have been logged.
@@ -97,7 +89,7 @@ private[spark] class EventLoggingListener(
* Creates the log file in the configured log directory.
*/
def start() {
- if (!fileSystem.getFileStatus(new Path(logBaseDir)).isDir) {
+ if (!fileSystem.getFileStatus(new Path(logBaseDir)).isDirectory) {
throw new IllegalArgumentException(s"Log directory $logBaseDir does not exist.")
}
@@ -147,7 +139,7 @@ private[spark] class EventLoggingListener(
// scalastyle:on println
if (flushLogger) {
writer.foreach(_.flush())
- hadoopDataStream.foreach(hadoopFlushMethod.invoke(_))
+ hadoopDataStream.foreach(_.hflush())
}
if (testing) {
loggedEvents += eventJson
diff --git a/core/src/main/scala/org/apache/spark/scheduler/InputFormatInfo.scala b/core/src/main/scala/org/apache/spark/scheduler/InputFormatInfo.scala
index 0e438ab436..8235b10245 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/InputFormatInfo.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/InputFormatInfo.scala
@@ -103,7 +103,7 @@ class InputFormatInfo(val configuration: Configuration, val inputFormatClazz: Cl
val instance: org.apache.hadoop.mapreduce.InputFormat[_, _] =
ReflectionUtils.newInstance(inputFormatClazz.asInstanceOf[Class[_]], conf).asInstanceOf[
org.apache.hadoop.mapreduce.InputFormat[_, _]]
- val job = new Job(conf)
+ val job = Job.getInstance(conf)
val retval = new ArrayBuffer[SplitInfo]()
val list = instance.getSplits(job)
diff --git a/core/src/main/scala/org/apache/spark/util/ShutdownHookManager.scala b/core/src/main/scala/org/apache/spark/util/ShutdownHookManager.scala
index 0065b1fc66..acc24ca0fb 100644
--- a/core/src/main/scala/org/apache/spark/util/ShutdownHookManager.scala
+++ b/core/src/main/scala/org/apache/spark/util/ShutdownHookManager.scala
@@ -20,7 +20,7 @@ package org.apache.spark.util
import java.io.File
import java.util.PriorityQueue
-import scala.util.{Failure, Success, Try}
+import scala.util.Try
import org.apache.hadoop.fs.FileSystem
import org.apache.spark.Logging
@@ -177,21 +177,8 @@ private [util] class SparkShutdownHookManager {
val hookTask = new Runnable() {
override def run(): Unit = runAll()
}
- Try(Utils.classForName("org.apache.hadoop.util.ShutdownHookManager")) match {
- case Success(shmClass) =>
- val fsPriority = classOf[FileSystem]
- .getField("SHUTDOWN_HOOK_PRIORITY")
- .get(null) // static field, the value is not used
- .asInstanceOf[Int]
- val shm = shmClass.getMethod("get").invoke(null)
- shm.getClass().getMethod("addShutdownHook", classOf[Runnable], classOf[Int])
- .invoke(shm, hookTask, Integer.valueOf(fsPriority + 30))
-
- case Failure(_) =>
- // scalastyle:off runtimeaddshutdownhook
- Runtime.getRuntime.addShutdownHook(new Thread(hookTask, "Spark Shutdown Hook"));
- // scalastyle:on runtimeaddshutdownhook
- }
+ org.apache.hadoop.util.ShutdownHookManager.get().addShutdownHook(
+ hookTask, FileSystem.SHUTDOWN_HOOK_PRIORITY + 30)
}
def runAll(): Unit = {
diff --git a/core/src/test/java/org/apache/spark/JavaAPISuite.java b/core/src/test/java/org/apache/spark/JavaAPISuite.java
index 11f1248c24..d91948e446 100644
--- a/core/src/test/java/org/apache/spark/JavaAPISuite.java
+++ b/core/src/test/java/org/apache/spark/JavaAPISuite.java
@@ -1246,7 +1246,7 @@ public class JavaAPISuite implements Serializable {
JavaPairRDD<IntWritable, Text> output = sc.newAPIHadoopFile(outputDir,
org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat.class,
- IntWritable.class, Text.class, new Job().getConfiguration());
+ IntWritable.class, Text.class, Job.getInstance().getConfiguration());
Assert.assertEquals(pairs.toString(), output.map(new Function<Tuple2<IntWritable, Text>, String>() {
@Override
public String call(Tuple2<IntWritable, Text> x) {
diff --git a/core/src/test/scala/org/apache/spark/FileSuite.scala b/core/src/test/scala/org/apache/spark/FileSuite.scala
index f6a7f4375f..2e47801aaf 100644
--- a/core/src/test/scala/org/apache/spark/FileSuite.scala
+++ b/core/src/test/scala/org/apache/spark/FileSuite.scala
@@ -19,12 +19,11 @@ package org.apache.spark
import java.io.{File, FileWriter}
-import org.apache.spark.deploy.SparkHadoopUtil
+import scala.io.Source
+
import org.apache.spark.input.PortableDataStream
import org.apache.spark.storage.StorageLevel
-import scala.io.Source
-
import org.apache.hadoop.io._
import org.apache.hadoop.io.compress.DefaultCodec
import org.apache.hadoop.mapred.{JobConf, FileAlreadyExistsException, FileSplit, TextInputFormat, TextOutputFormat}
@@ -506,11 +505,11 @@ class FileSuite extends SparkFunSuite with LocalSparkContext {
sc = new SparkContext("local", "test")
val randomRDD = sc.parallelize(
Array(("key1", "a"), ("key2", "a"), ("key3", "b"), ("key4", "c")), 1)
- val job = new Job(sc.hadoopConfiguration)
+ val job = Job.getInstance(sc.hadoopConfiguration)
job.setOutputKeyClass(classOf[String])
job.setOutputValueClass(classOf[String])
job.setOutputFormatClass(classOf[NewTextOutputFormat[String, String]])
- val jobConfig = SparkHadoopUtil.get.getConfigurationFromJobContext(job)
+ val jobConfig = job.getConfiguration
jobConfig.set("mapred.output.dir", tempDir.getPath + "/outputDataset_new")
randomRDD.saveAsNewAPIHadoopDataset(jobConfig)
assert(new File(tempDir.getPath + "/outputDataset_new/part-r-00000").exists() === true)
diff --git a/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala
index 5cb2d4225d..43da6fc5b5 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala
@@ -67,11 +67,11 @@ class EventLoggingListenerSuite extends SparkFunSuite with LocalSparkContext wit
val logPath = new Path(eventLogger.logPath + EventLoggingListener.IN_PROGRESS)
assert(fileSystem.exists(logPath))
val logStatus = fileSystem.getFileStatus(logPath)
- assert(!logStatus.isDir)
+ assert(!logStatus.isDirectory)
// Verify log is renamed after stop()
eventLogger.stop()
- assert(!fileSystem.getFileStatus(new Path(eventLogger.logPath)).isDir)
+ assert(!fileSystem.getFileStatus(new Path(eventLogger.logPath)).isDirectory)
}
test("Basic event logging") {
diff --git a/core/src/test/scala/org/apache/spark/scheduler/ReplayListenerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/ReplayListenerSuite.scala
index 103fc19369..761e82e6cf 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/ReplayListenerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/ReplayListenerSuite.scala
@@ -23,7 +23,6 @@ import java.net.URI
import org.json4s.jackson.JsonMethods._
import org.scalatest.BeforeAndAfter
-import org.apache.spark.{SparkConf, SparkContext, SPARK_VERSION}
import org.apache.spark.{SparkConf, SparkContext, SparkFunSuite}
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.io.CompressionCodec
@@ -115,7 +114,7 @@ class ReplayListenerSuite extends SparkFunSuite with BeforeAndAfter {
val applications = fileSystem.listStatus(logDirPath)
assert(applications != null && applications.size > 0)
val eventLog = applications.sortBy(_.getModificationTime).last
- assert(!eventLog.isDir)
+ assert(!eventLog.isDirectory)
// Replay events
val logData = EventLoggingListener.openEventLog(eventLog.getPath(), fileSystem)