-rw-r--r--  .travis.yml  2
-rw-r--r--  appveyor.yml  2
-rw-r--r--  assembly/README  2
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala  59
-rw-r--r--  core/src/main/scala/org/apache/spark/internal/io/SparkHadoopMapReduceWriter.scala  31
-rw-r--r--  core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala  66
-rw-r--r--  core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala  23
-rw-r--r--  core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala  15
-rw-r--r--  core/src/test/scala/org/apache/spark/metrics/InputOutputMetricsSuite.scala  74
-rw-r--r--  dev/appveyor-install-dependencies.ps1  2
-rwxr-xr-x  dev/create-release/release-build.sh  5
-rw-r--r--  dev/deps/spark-deps-hadoop-2.2  166
-rw-r--r--  dev/deps/spark-deps-hadoop-2.3  174
-rw-r--r--  dev/deps/spark-deps-hadoop-2.4  174
-rwxr-xr-x  dev/make-distribution.sh  14
-rwxr-xr-x  dev/run-tests.py  3
-rwxr-xr-x  dev/test-dependencies.sh  3
-rw-r--r--  docs/building-spark.md  61
-rw-r--r--  docs/running-on-yarn.md  2
-rw-r--r--  pom.xml  39
-rw-r--r--  project/SparkBuild.scala  38
-rw-r--r--  resource-managers/yarn/pom.xml  22
-rw-r--r--  resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala  17
-rw-r--r--  resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala  169
-rw-r--r--  resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala  10
-rw-r--r--  resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala  19
-rw-r--r--  resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClient.scala  31
-rw-r--r--  resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala  53
-rw-r--r--  resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala  65
-rw-r--r--  resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtilSuite.scala  24
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala  10
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/RecordReaderIterator.scala  4
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala  11
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/client/IsolatedClientLoader.scala  13
34 files changed, 202 insertions, 1201 deletions
diff --git a/.travis.yml b/.travis.yml
index 8739849a20..d94872db64 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -44,7 +44,7 @@ notifications:
# 5. Run maven install before running lint-java.
install:
- export MAVEN_SKIP_RC=1
- - build/mvn -T 4 -q -DskipTests -Pmesos -Pyarn -Phadoop-2.3 -Pkinesis-asl -Phive -Phive-thriftserver install
+ - build/mvn -T 4 -q -DskipTests -Pmesos -Pyarn -Pkinesis-asl -Phive -Phive-thriftserver install
# 6. Run lint-java.
script:
diff --git a/appveyor.yml b/appveyor.yml
index 5e756835bc..6bc66c0ea5 100644
--- a/appveyor.yml
+++ b/appveyor.yml
@@ -43,7 +43,7 @@ install:
- cmd: R -e "packageVersion('survival')"
build_script:
- - cmd: mvn -DskipTests -Phadoop-2.6 -Psparkr -Phive -Phive-thriftserver package
+ - cmd: mvn -DskipTests -Psparkr -Phive -Phive-thriftserver package
test_script:
- cmd: .\bin\spark-submit2.cmd --conf spark.hadoop.fs.default.name="file:///" R\pkg\tests\run-all.R
diff --git a/assembly/README b/assembly/README
index 14a5ff8dfc..d5dafab477 100644
--- a/assembly/README
+++ b/assembly/README
@@ -9,4 +9,4 @@ This module is off by default. To activate it specify the profile in the command
If you need to build an assembly for a different version of Hadoop the
hadoop-version system property needs to be set as in this example:
- -Dhadoop.version=2.0.6-alpha
+ -Dhadoop.version=2.7.3
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
index 23156072c3..941e2d13fb 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
@@ -18,7 +18,6 @@
package org.apache.spark.deploy
import java.io.IOException
-import java.lang.reflect.Method
import java.security.PrivilegedExceptionAction
import java.text.DateFormat
import java.util.{Arrays, Comparator, Date, Locale}
@@ -29,7 +28,6 @@ import scala.util.control.NonFatal
import com.google.common.primitives.Longs
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileStatus, FileSystem, Path, PathFilter}
-import org.apache.hadoop.fs.FileSystem.Statistics
import org.apache.hadoop.mapred.JobConf
import org.apache.hadoop.security.{Credentials, UserGroupInformation}
import org.apache.hadoop.security.token.{Token, TokenIdentifier}
@@ -140,54 +138,29 @@ class SparkHadoopUtil extends Logging {
/**
* Returns a function that can be called to find Hadoop FileSystem bytes read. If
* getFSBytesReadOnThreadCallback is called from thread r at time t, the returned callback will
- * return the bytes read on r since t. Reflection is required because thread-level FileSystem
- * statistics are only available as of Hadoop 2.5 (see HADOOP-10688).
- * Returns None if the required method can't be found.
+ * return the bytes read on r since t.
+ *
+ * @return None if the required method can't be found.
*/
- private[spark] def getFSBytesReadOnThreadCallback(): Option[() => Long] = {
- try {
- val threadStats = getFileSystemThreadStatistics()
- val getBytesReadMethod = getFileSystemThreadStatisticsMethod("getBytesRead")
- val f = () => threadStats.map(getBytesReadMethod.invoke(_).asInstanceOf[Long]).sum
- val baselineBytesRead = f()
- Some(() => f() - baselineBytesRead)
- } catch {
- case e @ (_: NoSuchMethodException | _: ClassNotFoundException) =>
- logDebug("Couldn't find method for retrieving thread-level FileSystem input data", e)
- None
- }
+ private[spark] def getFSBytesReadOnThreadCallback(): () => Long = {
+ val threadStats = FileSystem.getAllStatistics.asScala.map(_.getThreadStatistics)
+ val f = () => threadStats.map(_.getBytesRead).sum
+ val baselineBytesRead = f()
+ () => f() - baselineBytesRead
}
/**
* Returns a function that can be called to find Hadoop FileSystem bytes written. If
* getFSBytesWrittenOnThreadCallback is called from thread r at time t, the returned callback will
- * return the bytes written on r since t. Reflection is required because thread-level FileSystem
- * statistics are only available as of Hadoop 2.5 (see HADOOP-10688).
- * Returns None if the required method can't be found.
+ * return the bytes written on r since t.
+ *
+ * @return None if the required method can't be found.
*/
- private[spark] def getFSBytesWrittenOnThreadCallback(): Option[() => Long] = {
- try {
- val threadStats = getFileSystemThreadStatistics()
- val getBytesWrittenMethod = getFileSystemThreadStatisticsMethod("getBytesWritten")
- val f = () => threadStats.map(getBytesWrittenMethod.invoke(_).asInstanceOf[Long]).sum
- val baselineBytesWritten = f()
- Some(() => f() - baselineBytesWritten)
- } catch {
- case e @ (_: NoSuchMethodException | _: ClassNotFoundException) =>
- logDebug("Couldn't find method for retrieving thread-level FileSystem output data", e)
- None
- }
- }
-
- private def getFileSystemThreadStatistics(): Seq[AnyRef] = {
- FileSystem.getAllStatistics.asScala.map(
- Utils.invoke(classOf[Statistics], _, "getThreadStatistics"))
- }
-
- private def getFileSystemThreadStatisticsMethod(methodName: String): Method = {
- val statisticsDataClass =
- Utils.classForName("org.apache.hadoop.fs.FileSystem$Statistics$StatisticsData")
- statisticsDataClass.getDeclaredMethod(methodName)
+ private[spark] def getFSBytesWrittenOnThreadCallback(): () => Long = {
+ val threadStats = FileSystem.getAllStatistics.asScala.map(_.getThreadStatistics)
+ val f = () => threadStats.map(_.getBytesWritten).sum
+ val baselineBytesWritten = f()
+ () => f() - baselineBytesWritten
}
/**
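
Reviewer note: the pattern the rewritten callbacks rely on is small enough to show standalone. A minimal sketch, assuming a Hadoop 2.6+ client on the classpath (where FileSystem Statistics.getThreadStatistics exists, so no reflection is needed); it mirrors the new getFSBytesReadOnThreadCallback body above rather than adding anything new:

    import scala.collection.JavaConverters._
    import org.apache.hadoop.fs.FileSystem

    // Capture a baseline of the thread-local FileSystem read counters, then
    // report the delta on each subsequent call.
    def bytesReadCallback(): () => Long = {
      val threadStats = FileSystem.getAllStatistics.asScala.map(_.getThreadStatistics)
      val read = () => threadStats.map(_.getBytesRead).sum
      val baseline = read()
      () => read() - baseline
    }
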
diff --git a/core/src/main/scala/org/apache/spark/internal/io/SparkHadoopMapReduceWriter.scala b/core/src/main/scala/org/apache/spark/internal/io/SparkHadoopMapReduceWriter.scala
index 63918ef12a..1e0a1e605c 100644
--- a/core/src/main/scala/org/apache/spark/internal/io/SparkHadoopMapReduceWriter.scala
+++ b/core/src/main/scala/org/apache/spark/internal/io/SparkHadoopMapReduceWriter.scala
@@ -125,8 +125,7 @@ object SparkHadoopMapReduceWriter extends Logging {
val taskContext = new TaskAttemptContextImpl(hadoopConf, attemptId)
committer.setupTask(taskContext)
- val outputMetricsAndBytesWrittenCallback: Option[(OutputMetrics, () => Long)] =
- SparkHadoopWriterUtils.initHadoopOutputMetrics(context)
+ val (outputMetrics, callback) = SparkHadoopWriterUtils.initHadoopOutputMetrics(context)
// Initiate the writer.
val taskFormat = outputFormat.newInstance()
@@ -149,8 +148,7 @@ object SparkHadoopMapReduceWriter extends Logging {
writer.write(pair._1, pair._2)
// Update bytes written metric every few records
- SparkHadoopWriterUtils.maybeUpdateOutputMetrics(
- outputMetricsAndBytesWrittenCallback, recordsWritten)
+ SparkHadoopWriterUtils.maybeUpdateOutputMetrics(outputMetrics, callback, recordsWritten)
recordsWritten += 1
}
if (writer != null) {
@@ -171,11 +169,8 @@ object SparkHadoopMapReduceWriter extends Logging {
}
})
- outputMetricsAndBytesWrittenCallback.foreach {
- case (om, callback) =>
- om.setBytesWritten(callback())
- om.setRecordsWritten(recordsWritten)
- }
+ outputMetrics.setBytesWritten(callback())
+ outputMetrics.setRecordsWritten(recordsWritten)
ret
} catch {
@@ -222,24 +217,18 @@ object SparkHadoopWriterUtils {
// TODO: these don't seem like the right abstractions.
// We should abstract the duplicate code in a less awkward way.
- // return type: (output metrics, bytes written callback), defined only if the latter is defined
- def initHadoopOutputMetrics(
- context: TaskContext): Option[(OutputMetrics, () => Long)] = {
+ def initHadoopOutputMetrics(context: TaskContext): (OutputMetrics, () => Long) = {
val bytesWrittenCallback = SparkHadoopUtil.get.getFSBytesWrittenOnThreadCallback()
- bytesWrittenCallback.map { b =>
- (context.taskMetrics().outputMetrics, b)
- }
+ (context.taskMetrics().outputMetrics, bytesWrittenCallback)
}
def maybeUpdateOutputMetrics(
- outputMetricsAndBytesWrittenCallback: Option[(OutputMetrics, () => Long)],
+ outputMetrics: OutputMetrics,
+ callback: () => Long,
recordsWritten: Long): Unit = {
if (recordsWritten % RECORDS_BETWEEN_BYTES_WRITTEN_METRIC_UPDATES == 0) {
- outputMetricsAndBytesWrittenCallback.foreach {
- case (om, callback) =>
- om.setBytesWritten(callback())
- om.setRecordsWritten(recordsWritten)
- }
+ outputMetrics.setBytesWritten(callback())
+ outputMetrics.setRecordsWritten(recordsWritten)
}
}
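
Reviewer note: the callers below (this writer and PairRDDFunctions) now share the same simplified shape. A sketch of the call pattern, with `context`, `records`, and `write` standing in for the surrounding task code:

    // Acquire the output metrics and bytes-written callback once per task,
    // update them periodically while writing, then record the final totals.
    val (outputMetrics, callback) = SparkHadoopWriterUtils.initHadoopOutputMetrics(context)
    var recordsWritten = 0L
    records.foreach { record =>
      write(record)
      SparkHadoopWriterUtils.maybeUpdateOutputMetrics(outputMetrics, callback, recordsWritten)
      recordsWritten += 1
    }
    outputMetrics.setBytesWritten(callback())
    outputMetrics.setRecordsWritten(recordsWritten)
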
diff --git a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
index a83e139c13..5fa6a7ed31 100644
--- a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
@@ -25,15 +25,7 @@ import scala.collection.immutable.Map
import scala.reflect.ClassTag
import org.apache.hadoop.conf.{Configurable, Configuration}
-import org.apache.hadoop.mapred.FileSplit
-import org.apache.hadoop.mapred.InputFormat
-import org.apache.hadoop.mapred.InputSplit
-import org.apache.hadoop.mapred.JobConf
-import org.apache.hadoop.mapred.JobID
-import org.apache.hadoop.mapred.RecordReader
-import org.apache.hadoop.mapred.Reporter
-import org.apache.hadoop.mapred.TaskAttemptID
-import org.apache.hadoop.mapred.TaskID
+import org.apache.hadoop.mapred._
import org.apache.hadoop.mapred.lib.CombineFileSplit
import org.apache.hadoop.mapreduce.TaskType
import org.apache.hadoop.util.ReflectionUtils
@@ -47,7 +39,7 @@ import org.apache.spark.internal.config.IGNORE_CORRUPT_FILES
import org.apache.spark.rdd.HadoopRDD.HadoopMapPartitionsWithSplitRDD
import org.apache.spark.scheduler.{HDFSCacheTaskLocation, HostTaskLocation}
import org.apache.spark.storage.StorageLevel
-import org.apache.spark.util.{NextIterator, SerializableConfiguration, ShutdownHookManager, Utils}
+import org.apache.spark.util.{NextIterator, SerializableConfiguration, ShutdownHookManager}
/**
* A Spark split class that wraps around a Hadoop InputSplit.
@@ -229,11 +221,11 @@ class HadoopRDD[K, V](
// creating RecordReader, because RecordReader's constructor might read some bytes
private val getBytesReadCallback: Option[() => Long] = split.inputSplit.value match {
case _: FileSplit | _: CombineFileSplit =>
- SparkHadoopUtil.get.getFSBytesReadOnThreadCallback()
+ Some(SparkHadoopUtil.get.getFSBytesReadOnThreadCallback())
case _ => None
}
- // For Hadoop 2.5+, we get our input bytes from thread-local Hadoop FileSystem statistics.
+ // We get our input bytes from thread-local Hadoop FileSystem statistics.
// If we do a coalesce, however, we are likely to compute multiple partitions in the same
// task and in the same thread, in which case we need to avoid override values written by
// previous partitions (SPARK-13071).
@@ -280,13 +272,9 @@ class HadoopRDD[K, V](
(key, value)
}
- override def close() {
+ override def close(): Unit = {
if (reader != null) {
InputFileBlockHolder.unset()
- // Close the reader and release it. Note: it's very important that we don't close the
- // reader more than once, since that exposes us to MAPREDUCE-5918 when running against
- // Hadoop 1.x and older Hadoop 2.x releases. That bug can lead to non-deterministic
- // corruption issues when reading compressed input.
try {
reader.close()
} catch {
@@ -326,18 +314,10 @@ class HadoopRDD[K, V](
override def getPreferredLocations(split: Partition): Seq[String] = {
val hsplit = split.asInstanceOf[HadoopPartition].inputSplit.value
- val locs: Option[Seq[String]] = HadoopRDD.SPLIT_INFO_REFLECTIONS match {
- case Some(c) =>
- try {
- val lsplit = c.inputSplitWithLocationInfo.cast(hsplit)
- val infos = c.getLocationInfo.invoke(lsplit).asInstanceOf[Array[AnyRef]]
- HadoopRDD.convertSplitLocationInfo(infos)
- } catch {
- case e: Exception =>
- logDebug("Failed to use InputSplitWithLocations.", e)
- None
- }
- case None => None
+ val locs = hsplit match {
+ case lsplit: InputSplitWithLocationInfo =>
+ HadoopRDD.convertSplitLocationInfo(lsplit.getLocationInfo)
+ case _ => None
}
locs.getOrElse(hsplit.getLocations.filter(_ != "localhost"))
}
@@ -413,32 +393,12 @@ private[spark] object HadoopRDD extends Logging {
}
}
- private[spark] class SplitInfoReflections {
- val inputSplitWithLocationInfo =
- Utils.classForName("org.apache.hadoop.mapred.InputSplitWithLocationInfo")
- val getLocationInfo = inputSplitWithLocationInfo.getMethod("getLocationInfo")
- val newInputSplit = Utils.classForName("org.apache.hadoop.mapreduce.InputSplit")
- val newGetLocationInfo = newInputSplit.getMethod("getLocationInfo")
- val splitLocationInfo = Utils.classForName("org.apache.hadoop.mapred.SplitLocationInfo")
- val isInMemory = splitLocationInfo.getMethod("isInMemory")
- val getLocation = splitLocationInfo.getMethod("getLocation")
- }
-
- private[spark] val SPLIT_INFO_REFLECTIONS: Option[SplitInfoReflections] = try {
- Some(new SplitInfoReflections)
- } catch {
- case e: Exception =>
- logDebug("SplitLocationInfo and other new Hadoop classes are " +
- "unavailable. Using the older Hadoop location info code.", e)
- None
- }
-
- private[spark] def convertSplitLocationInfo(infos: Array[AnyRef]): Option[Seq[String]] = {
+ private[spark] def convertSplitLocationInfo(
+ infos: Array[SplitLocationInfo]): Option[Seq[String]] = {
Option(infos).map(_.flatMap { loc =>
- val reflections = HadoopRDD.SPLIT_INFO_REFLECTIONS.get
- val locationStr = reflections.getLocation.invoke(loc).asInstanceOf[String]
+ val locationStr = loc.getLocation
if (locationStr != "localhost") {
- if (reflections.isInMemory.invoke(loc).asInstanceOf[Boolean]) {
+ if (loc.isInMemory) {
logDebug(s"Partition $locationStr is cached by Hadoop.")
Some(HDFSCacheTaskLocation(locationStr).toString)
} else {
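
Reviewer note: with the reflection helpers gone, preferred-location lookup reduces to a type match on the Hadoop 2.x API. A minimal sketch of the new shape (convertSplitLocationInfo is package-private, so this only compiles inside org.apache.spark):

    import org.apache.hadoop.mapred.{InputSplit, InputSplitWithLocationInfo}

    // Splits that expose location info are matched on the concrete type; all
    // others fall back to plain getLocations, filtering the "localhost" default.
    def preferredLocations(hsplit: InputSplit): Seq[String] = {
      val locs = hsplit match {
        case lsplit: InputSplitWithLocationInfo =>
          HadoopRDD.convertSplitLocationInfo(lsplit.getLocationInfo)
        case _ => None
      }
      locs.getOrElse(hsplit.getLocations.filter(_ != "localhost"))
    }
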
diff --git a/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
index 733e85f305..ce3a9a2a1e 100644
--- a/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
@@ -152,11 +152,11 @@ class NewHadoopRDD[K, V](
private val getBytesReadCallback: Option[() => Long] =
split.serializableHadoopSplit.value match {
case _: FileSplit | _: CombineFileSplit =>
- SparkHadoopUtil.get.getFSBytesReadOnThreadCallback()
+ Some(SparkHadoopUtil.get.getFSBytesReadOnThreadCallback())
case _ => None
}
- // For Hadoop 2.5+, we get our input bytes from thread-local Hadoop FileSystem statistics.
+ // We get our input bytes from thread-local Hadoop FileSystem statistics.
// If we do a coalesce, however, we are likely to compute multiple partitions in the same
// task and in the same thread, in which case we need to avoid override values written by
// previous partitions (SPARK-13071).
@@ -231,13 +231,9 @@ class NewHadoopRDD[K, V](
(reader.getCurrentKey, reader.getCurrentValue)
}
- private def close() {
+ private def close(): Unit = {
if (reader != null) {
InputFileBlockHolder.unset()
- // Close the reader and release it. Note: it's very important that we don't close the
- // reader more than once, since that exposes us to MAPREDUCE-5918 when running against
- // Hadoop 1.x and older Hadoop 2.x releases. That bug can lead to non-deterministic
- // corruption issues when reading compressed input.
try {
reader.close()
} catch {
@@ -277,18 +273,7 @@ class NewHadoopRDD[K, V](
override def getPreferredLocations(hsplit: Partition): Seq[String] = {
val split = hsplit.asInstanceOf[NewHadoopPartition].serializableHadoopSplit.value
- val locs = HadoopRDD.SPLIT_INFO_REFLECTIONS match {
- case Some(c) =>
- try {
- val infos = c.newGetLocationInfo.invoke(split).asInstanceOf[Array[AnyRef]]
- HadoopRDD.convertSplitLocationInfo(infos)
- } catch {
- case e : Exception =>
- logDebug("Failed to use InputSplit#getLocationInfo.", e)
- None
- }
- case None => None
- }
+ val locs = HadoopRDD.convertSplitLocationInfo(split.getLocationInfo)
locs.getOrElse(split.getLocations.filter(_ != "localhost"))
}
diff --git a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
index 41093bdb85..567a3183e2 100644
--- a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
@@ -37,8 +37,7 @@ import org.apache.spark._
import org.apache.spark.Partitioner.defaultPartitioner
import org.apache.spark.annotation.Experimental
import org.apache.spark.deploy.SparkHadoopUtil
-import org.apache.spark.executor.OutputMetrics
-import org.apache.spark.internal.io.{FileCommitProtocol, HadoopMapReduceCommitProtocol, SparkHadoopMapReduceWriter, SparkHadoopWriterUtils}
+import org.apache.spark.internal.io.{SparkHadoopMapReduceWriter, SparkHadoopWriterUtils}
import org.apache.spark.internal.Logging
import org.apache.spark.partial.{BoundedDouble, PartialResult}
import org.apache.spark.serializer.Serializer
@@ -1126,8 +1125,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
// around by taking a mod. We expect that no task will be attempted 2 billion times.
val taskAttemptId = (context.taskAttemptId % Int.MaxValue).toInt
- val outputMetricsAndBytesWrittenCallback: Option[(OutputMetrics, () => Long)] =
- SparkHadoopWriterUtils.initHadoopOutputMetrics(context)
+ val (outputMetrics, callback) = SparkHadoopWriterUtils.initHadoopOutputMetrics(context)
writer.setup(context.stageId, context.partitionId, taskAttemptId)
writer.open()
@@ -1139,16 +1137,13 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
writer.write(record._1.asInstanceOf[AnyRef], record._2.asInstanceOf[AnyRef])
// Update bytes written metric every few records
- SparkHadoopWriterUtils.maybeUpdateOutputMetrics(
- outputMetricsAndBytesWrittenCallback, recordsWritten)
+ SparkHadoopWriterUtils.maybeUpdateOutputMetrics(outputMetrics, callback, recordsWritten)
recordsWritten += 1
}
}(finallyBlock = writer.close())
writer.commit()
- outputMetricsAndBytesWrittenCallback.foreach { case (om, callback) =>
- om.setBytesWritten(callback())
- om.setRecordsWritten(recordsWritten)
- }
+ outputMetrics.setBytesWritten(callback())
+ outputMetrics.setRecordsWritten(recordsWritten)
}
self.context.runJob(self, writeToFile)
diff --git a/core/src/test/scala/org/apache/spark/metrics/InputOutputMetricsSuite.scala b/core/src/test/scala/org/apache/spark/metrics/InputOutputMetricsSuite.scala
index a73b300ec2..becf3829e7 100644
--- a/core/src/test/scala/org/apache/spark/metrics/InputOutputMetricsSuite.scala
+++ b/core/src/test/scala/org/apache/spark/metrics/InputOutputMetricsSuite.scala
@@ -33,7 +33,6 @@ import org.apache.hadoop.mapreduce.lib.output.{TextOutputFormat => NewTextOutput
import org.scalatest.BeforeAndAfter
import org.apache.spark.{SharedSparkContext, SparkFunSuite}
-import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}
import org.apache.spark.util.Utils
@@ -186,10 +185,7 @@ class InputOutputMetricsSuite extends SparkFunSuite with SharedSparkContext
sc.listenerBus.waitUntilEmpty(500)
assert(inputRead == numRecords)
- // Only supported on newer Hadoop
- if (SparkHadoopUtil.get.getFSBytesWrittenOnThreadCallback().isDefined) {
- assert(outputWritten == numBuckets)
- }
+ assert(outputWritten == numBuckets)
assert(shuffleRead == shuffleWritten)
}
@@ -262,57 +258,49 @@ class InputOutputMetricsSuite extends SparkFunSuite with SharedSparkContext
}
test("output metrics on records written") {
- // Only supported on newer Hadoop
- if (SparkHadoopUtil.get.getFSBytesWrittenOnThreadCallback().isDefined) {
- val file = new File(tmpDir, getClass.getSimpleName)
- val filePath = "file://" + file.getAbsolutePath
+ val file = new File(tmpDir, getClass.getSimpleName)
+ val filePath = "file://" + file.getAbsolutePath
- val records = runAndReturnRecordsWritten {
- sc.parallelize(1 to numRecords).saveAsTextFile(filePath)
- }
- assert(records == numRecords)
+ val records = runAndReturnRecordsWritten {
+ sc.parallelize(1 to numRecords).saveAsTextFile(filePath)
}
+ assert(records == numRecords)
}
test("output metrics on records written - new Hadoop API") {
- // Only supported on newer Hadoop
- if (SparkHadoopUtil.get.getFSBytesWrittenOnThreadCallback().isDefined) {
- val file = new File(tmpDir, getClass.getSimpleName)
- val filePath = "file://" + file.getAbsolutePath
-
- val records = runAndReturnRecordsWritten {
- sc.parallelize(1 to numRecords).map(key => (key.toString, key.toString))
- .saveAsNewAPIHadoopFile[NewTextOutputFormat[String, String]](filePath)
- }
- assert(records == numRecords)
+ val file = new File(tmpDir, getClass.getSimpleName)
+ val filePath = "file://" + file.getAbsolutePath
+
+ val records = runAndReturnRecordsWritten {
+ sc.parallelize(1 to numRecords).map(key => (key.toString, key.toString))
+ .saveAsNewAPIHadoopFile[NewTextOutputFormat[String, String]](filePath)
}
+ assert(records == numRecords)
}
test("output metrics when writing text file") {
val fs = FileSystem.getLocal(new Configuration())
val outPath = new Path(fs.getWorkingDirectory, "outdir")
- if (SparkHadoopUtil.get.getFSBytesWrittenOnThreadCallback().isDefined) {
- val taskBytesWritten = new ArrayBuffer[Long]()
- sc.addSparkListener(new SparkListener() {
- override def onTaskEnd(taskEnd: SparkListenerTaskEnd) {
- taskBytesWritten += taskEnd.taskMetrics.outputMetrics.bytesWritten
- }
- })
-
- val rdd = sc.parallelize(Array("a", "b", "c", "d"), 2)
-
- try {
- rdd.saveAsTextFile(outPath.toString)
- sc.listenerBus.waitUntilEmpty(500)
- assert(taskBytesWritten.length == 2)
- val outFiles = fs.listStatus(outPath).filter(_.getPath.getName != "_SUCCESS")
- taskBytesWritten.zip(outFiles).foreach { case (bytes, fileStatus) =>
- assert(bytes >= fileStatus.getLen)
- }
- } finally {
- fs.delete(outPath, true)
+ val taskBytesWritten = new ArrayBuffer[Long]()
+ sc.addSparkListener(new SparkListener() {
+ override def onTaskEnd(taskEnd: SparkListenerTaskEnd) {
+ taskBytesWritten += taskEnd.taskMetrics.outputMetrics.bytesWritten
+ }
+ })
+
+ val rdd = sc.parallelize(Array("a", "b", "c", "d"), 2)
+
+ try {
+ rdd.saveAsTextFile(outPath.toString)
+ sc.listenerBus.waitUntilEmpty(500)
+ assert(taskBytesWritten.length == 2)
+ val outFiles = fs.listStatus(outPath).filter(_.getPath.getName != "_SUCCESS")
+ taskBytesWritten.zip(outFiles).foreach { case (bytes, fileStatus) =>
+ assert(bytes >= fileStatus.getLen)
}
+ } finally {
+ fs.delete(outPath, true)
}
}
diff --git a/dev/appveyor-install-dependencies.ps1 b/dev/appveyor-install-dependencies.ps1
index b72d6b53bc..5a729674c3 100644
--- a/dev/appveyor-install-dependencies.ps1
+++ b/dev/appveyor-install-dependencies.ps1
@@ -95,7 +95,7 @@ $env:MAVEN_OPTS = "-Xmx2g -XX:MaxPermSize=512M -XX:ReservedCodeCacheSize=512m"
Pop-Location
# ========================== Hadoop bin package
-$hadoopVer = "2.6.0"
+$hadoopVer = "2.6.5"
$hadoopPath = "$tools\hadoop"
if (!(Test-Path $hadoopPath)) {
New-Item -ItemType Directory -Force -Path $hadoopPath | Out-Null
diff --git a/dev/create-release/release-build.sh b/dev/create-release/release-build.sh
index b08577c47c..d616f80c54 100755
--- a/dev/create-release/release-build.sh
+++ b/dev/create-release/release-build.sh
@@ -80,7 +80,7 @@ NEXUS_PROFILE=d63f592e7eac0 # Profile for Spark staging uploads
BASE_DIR=$(pwd)
MVN="build/mvn --force"
-PUBLISH_PROFILES="-Pmesos -Pyarn -Phive -Phive-thriftserver -Phadoop-2.2"
+PUBLISH_PROFILES="-Pmesos -Pyarn -Phive -Phive-thriftserver"
PUBLISH_PROFILES="$PUBLISH_PROFILES -Pspark-ganglia-lgpl -Pkinesis-asl"
rm -rf spark
@@ -236,11 +236,8 @@ if [[ "$1" == "package" ]]; then
# We increment the Zinc port each time to avoid OOM's and other craziness if multiple builds
# share the same Zinc server.
FLAGS="-Psparkr -Phive -Phive-thriftserver -Pyarn -Pmesos"
- make_binary_release "hadoop2.3" "-Phadoop-2.3 $FLAGS" "3033" &
- make_binary_release "hadoop2.4" "-Phadoop-2.4 $FLAGS" "3034" &
make_binary_release "hadoop2.6" "-Phadoop-2.6 $FLAGS" "3035" "withr" &
make_binary_release "hadoop2.7" "-Phadoop-2.7 $FLAGS" "3036" "withpip" &
- make_binary_release "hadoop2.4-without-hive" "-Psparkr -Phadoop-2.4 -Pyarn -Pmesos" "3037" &
make_binary_release "without-hadoop" "-Psparkr -Phadoop-provided -Pyarn -Pmesos" "3038" &
wait
rm -rf spark-$SPARK_VERSION-bin-*/
diff --git a/dev/deps/spark-deps-hadoop-2.2 b/dev/deps/spark-deps-hadoop-2.2
deleted file mode 100644
index 1254188f71..0000000000
--- a/dev/deps/spark-deps-hadoop-2.2
+++ /dev/null
@@ -1,166 +0,0 @@
-JavaEWAH-0.3.2.jar
-RoaringBitmap-0.5.11.jar
-ST4-4.0.4.jar
-antlr-2.7.7.jar
-antlr-runtime-3.4.jar
-antlr4-runtime-4.5.3.jar
-aopalliance-1.0.jar
-aopalliance-repackaged-2.4.0-b34.jar
-apache-log4j-extras-1.2.17.jar
-arpack_combined_all-0.1.jar
-avro-1.7.7.jar
-avro-ipc-1.7.7.jar
-avro-mapred-1.7.7-hadoop2.jar
-bonecp-0.8.0.RELEASE.jar
-breeze-macros_2.11-0.12.jar
-breeze_2.11-0.12.jar
-calcite-avatica-1.2.0-incubating.jar
-calcite-core-1.2.0-incubating.jar
-calcite-linq4j-1.2.0-incubating.jar
-chill-java-0.8.0.jar
-chill_2.11-0.8.0.jar
-commons-beanutils-1.7.0.jar
-commons-beanutils-core-1.8.0.jar
-commons-cli-1.2.jar
-commons-codec-1.10.jar
-commons-collections-3.2.2.jar
-commons-compiler-3.0.0.jar
-commons-compress-1.4.1.jar
-commons-configuration-1.6.jar
-commons-crypto-1.0.0.jar
-commons-dbcp-1.4.jar
-commons-digester-1.8.jar
-commons-httpclient-3.1.jar
-commons-io-2.4.jar
-commons-lang-2.6.jar
-commons-lang3-3.5.jar
-commons-logging-1.1.3.jar
-commons-math-2.1.jar
-commons-math3-3.4.1.jar
-commons-net-2.2.jar
-commons-pool-1.5.4.jar
-compress-lzf-1.0.3.jar
-core-1.1.2.jar
-curator-client-2.4.0.jar
-curator-framework-2.4.0.jar
-curator-recipes-2.4.0.jar
-datanucleus-api-jdo-3.2.6.jar
-datanucleus-core-3.2.10.jar
-datanucleus-rdbms-3.2.9.jar
-derby-10.12.1.1.jar
-eigenbase-properties-1.1.5.jar
-guava-14.0.1.jar
-guice-3.0.jar
-guice-servlet-3.0.jar
-hadoop-annotations-2.2.0.jar
-hadoop-auth-2.2.0.jar
-hadoop-client-2.2.0.jar
-hadoop-common-2.2.0.jar
-hadoop-hdfs-2.2.0.jar
-hadoop-mapreduce-client-app-2.2.0.jar
-hadoop-mapreduce-client-common-2.2.0.jar
-hadoop-mapreduce-client-core-2.2.0.jar
-hadoop-mapreduce-client-jobclient-2.2.0.jar
-hadoop-mapreduce-client-shuffle-2.2.0.jar
-hadoop-yarn-api-2.2.0.jar
-hadoop-yarn-client-2.2.0.jar
-hadoop-yarn-common-2.2.0.jar
-hadoop-yarn-server-common-2.2.0.jar
-hadoop-yarn-server-web-proxy-2.2.0.jar
-hk2-api-2.4.0-b34.jar
-hk2-locator-2.4.0-b34.jar
-hk2-utils-2.4.0-b34.jar
-httpclient-4.5.2.jar
-httpcore-4.4.4.jar
-ivy-2.4.0.jar
-jackson-annotations-2.6.5.jar
-jackson-core-2.6.5.jar
-jackson-core-asl-1.9.13.jar
-jackson-databind-2.6.5.jar
-jackson-mapper-asl-1.9.13.jar
-jackson-module-paranamer-2.6.5.jar
-jackson-module-scala_2.11-2.6.5.jar
-janino-3.0.0.jar
-javassist-3.18.1-GA.jar
-javax.annotation-api-1.2.jar
-javax.inject-1.jar
-javax.inject-2.4.0-b34.jar
-javax.servlet-api-3.1.0.jar
-javax.ws.rs-api-2.0.1.jar
-javolution-5.5.1.jar
-jcl-over-slf4j-1.7.16.jar
-jdo-api-3.0.1.jar
-jersey-client-2.22.2.jar
-jersey-common-2.22.2.jar
-jersey-container-servlet-2.22.2.jar
-jersey-container-servlet-core-2.22.2.jar
-jersey-guava-2.22.2.jar
-jersey-media-jaxb-2.22.2.jar
-jersey-server-2.22.2.jar
-jets3t-0.7.1.jar
-jetty-util-6.1.26.jar
-jline-2.12.1.jar
-joda-time-2.9.3.jar
-jodd-core-3.5.2.jar
-jpam-1.1.jar
-json4s-ast_2.11-3.2.11.jar
-json4s-core_2.11-3.2.11.jar
-json4s-jackson_2.11-3.2.11.jar
-jsr305-1.3.9.jar
-jta-1.1.jar
-jtransforms-2.4.0.jar
-jul-to-slf4j-1.7.16.jar
-kryo-shaded-3.0.3.jar
-leveldbjni-all-1.8.jar
-libfb303-0.9.3.jar
-libthrift-0.9.3.jar
-log4j-1.2.17.jar
-lz4-1.3.0.jar
-mesos-1.0.0-shaded-protobuf.jar
-metrics-core-3.1.2.jar
-metrics-graphite-3.1.2.jar
-metrics-json-3.1.2.jar
-metrics-jvm-3.1.2.jar
-minlog-1.3.0.jar
-netty-3.9.9.Final.jar
-netty-all-4.0.43.Final.jar
-objenesis-2.1.jar
-opencsv-2.3.jar
-oro-2.0.8.jar
-osgi-resource-locator-1.0.1.jar
-paranamer-2.6.jar
-parquet-column-1.8.2.jar
-parquet-common-1.8.2.jar
-parquet-encoding-1.8.2.jar
-parquet-format-2.3.1.jar
-parquet-hadoop-1.8.2.jar
-parquet-hadoop-bundle-1.6.0.jar
-parquet-jackson-1.8.2.jar
-pmml-model-1.2.15.jar
-pmml-schema-1.2.15.jar
-protobuf-java-2.5.0.jar
-py4j-0.10.4.jar
-pyrolite-4.13.jar
-scala-compiler-2.11.8.jar
-scala-library-2.11.8.jar
-scala-parser-combinators_2.11-1.0.4.jar
-scala-reflect-2.11.8.jar
-scala-xml_2.11-1.0.2.jar
-scalap-2.11.8.jar
-shapeless_2.11-2.0.0.jar
-slf4j-api-1.7.16.jar
-slf4j-log4j12-1.7.16.jar
-snappy-0.2.jar
-snappy-java-1.1.2.6.jar
-spire-macros_2.11-0.7.4.jar
-spire_2.11-0.7.4.jar
-stax-api-1.0.1.jar
-stream-2.7.0.jar
-stringtemplate-3.2.1.jar
-super-csv-2.2.0.jar
-univocity-parsers-2.2.1.jar
-validation-api-1.1.0.Final.jar
-xbean-asm5-shaded-4.4.jar
-xmlenc-0.52.jar
-xz-1.0.jar
-zookeeper-3.4.5.jar
diff --git a/dev/deps/spark-deps-hadoop-2.3 b/dev/deps/spark-deps-hadoop-2.3
deleted file mode 100644
index 39ba2ae849..0000000000
--- a/dev/deps/spark-deps-hadoop-2.3
+++ /dev/null
@@ -1,174 +0,0 @@
-JavaEWAH-0.3.2.jar
-RoaringBitmap-0.5.11.jar
-ST4-4.0.4.jar
-activation-1.1.1.jar
-antlr-2.7.7.jar
-antlr-runtime-3.4.jar
-antlr4-runtime-4.5.3.jar
-aopalliance-1.0.jar
-aopalliance-repackaged-2.4.0-b34.jar
-apache-log4j-extras-1.2.17.jar
-arpack_combined_all-0.1.jar
-avro-1.7.7.jar
-avro-ipc-1.7.7.jar
-avro-mapred-1.7.7-hadoop2.jar
-base64-2.3.8.jar
-bcprov-jdk15on-1.51.jar
-bonecp-0.8.0.RELEASE.jar
-breeze-macros_2.11-0.12.jar
-breeze_2.11-0.12.jar
-calcite-avatica-1.2.0-incubating.jar
-calcite-core-1.2.0-incubating.jar
-calcite-linq4j-1.2.0-incubating.jar
-chill-java-0.8.0.jar
-chill_2.11-0.8.0.jar
-commons-beanutils-1.7.0.jar
-commons-beanutils-core-1.8.0.jar
-commons-cli-1.2.jar
-commons-codec-1.10.jar
-commons-collections-3.2.2.jar
-commons-compiler-3.0.0.jar
-commons-compress-1.4.1.jar
-commons-configuration-1.6.jar
-commons-crypto-1.0.0.jar
-commons-dbcp-1.4.jar
-commons-digester-1.8.jar
-commons-httpclient-3.1.jar
-commons-io-2.4.jar
-commons-lang-2.6.jar
-commons-lang3-3.5.jar
-commons-logging-1.1.3.jar
-commons-math3-3.4.1.jar
-commons-net-2.2.jar
-commons-pool-1.5.4.jar
-compress-lzf-1.0.3.jar
-core-1.1.2.jar
-curator-client-2.4.0.jar
-curator-framework-2.4.0.jar
-curator-recipes-2.4.0.jar
-datanucleus-api-jdo-3.2.6.jar
-datanucleus-core-3.2.10.jar
-datanucleus-rdbms-3.2.9.jar
-derby-10.12.1.1.jar
-eigenbase-properties-1.1.5.jar
-guava-14.0.1.jar
-guice-3.0.jar
-guice-servlet-3.0.jar
-hadoop-annotations-2.3.0.jar
-hadoop-auth-2.3.0.jar
-hadoop-client-2.3.0.jar
-hadoop-common-2.3.0.jar
-hadoop-hdfs-2.3.0.jar
-hadoop-mapreduce-client-app-2.3.0.jar
-hadoop-mapreduce-client-common-2.3.0.jar
-hadoop-mapreduce-client-core-2.3.0.jar
-hadoop-mapreduce-client-jobclient-2.3.0.jar
-hadoop-mapreduce-client-shuffle-2.3.0.jar
-hadoop-yarn-api-2.3.0.jar
-hadoop-yarn-client-2.3.0.jar
-hadoop-yarn-common-2.3.0.jar
-hadoop-yarn-server-common-2.3.0.jar
-hadoop-yarn-server-web-proxy-2.3.0.jar
-hk2-api-2.4.0-b34.jar
-hk2-locator-2.4.0-b34.jar
-hk2-utils-2.4.0-b34.jar
-httpclient-4.5.2.jar
-httpcore-4.4.4.jar
-ivy-2.4.0.jar
-jackson-annotations-2.6.5.jar
-jackson-core-2.6.5.jar
-jackson-core-asl-1.9.13.jar
-jackson-databind-2.6.5.jar
-jackson-mapper-asl-1.9.13.jar
-jackson-module-paranamer-2.6.5.jar
-jackson-module-scala_2.11-2.6.5.jar
-janino-3.0.0.jar
-java-xmlbuilder-1.0.jar
-javassist-3.18.1-GA.jar
-javax.annotation-api-1.2.jar
-javax.inject-1.jar
-javax.inject-2.4.0-b34.jar
-javax.servlet-api-3.1.0.jar
-javax.ws.rs-api-2.0.1.jar
-javolution-5.5.1.jar
-jaxb-api-2.2.2.jar
-jcl-over-slf4j-1.7.16.jar
-jdo-api-3.0.1.jar
-jersey-client-2.22.2.jar
-jersey-common-2.22.2.jar
-jersey-container-servlet-2.22.2.jar
-jersey-container-servlet-core-2.22.2.jar
-jersey-guava-2.22.2.jar
-jersey-media-jaxb-2.22.2.jar
-jersey-server-2.22.2.jar
-jets3t-0.9.3.jar
-jetty-6.1.26.jar
-jetty-util-6.1.26.jar
-jline-2.12.1.jar
-joda-time-2.9.3.jar
-jodd-core-3.5.2.jar
-jpam-1.1.jar
-json4s-ast_2.11-3.2.11.jar
-json4s-core_2.11-3.2.11.jar
-json4s-jackson_2.11-3.2.11.jar
-jsr305-1.3.9.jar
-jta-1.1.jar
-jtransforms-2.4.0.jar
-jul-to-slf4j-1.7.16.jar
-kryo-shaded-3.0.3.jar
-leveldbjni-all-1.8.jar
-libfb303-0.9.3.jar
-libthrift-0.9.3.jar
-log4j-1.2.17.jar
-lz4-1.3.0.jar
-mail-1.4.7.jar
-mesos-1.0.0-shaded-protobuf.jar
-metrics-core-3.1.2.jar
-metrics-graphite-3.1.2.jar
-metrics-json-3.1.2.jar
-metrics-jvm-3.1.2.jar
-minlog-1.3.0.jar
-mx4j-3.0.2.jar
-netty-3.9.9.Final.jar
-netty-all-4.0.43.Final.jar
-objenesis-2.1.jar
-opencsv-2.3.jar
-oro-2.0.8.jar
-osgi-resource-locator-1.0.1.jar
-paranamer-2.6.jar
-parquet-column-1.8.2.jar
-parquet-common-1.8.2.jar
-parquet-encoding-1.8.2.jar
-parquet-format-2.3.1.jar
-parquet-hadoop-1.8.2.jar
-parquet-hadoop-bundle-1.6.0.jar
-parquet-jackson-1.8.2.jar
-pmml-model-1.2.15.jar
-pmml-schema-1.2.15.jar
-protobuf-java-2.5.0.jar
-py4j-0.10.4.jar
-pyrolite-4.13.jar
-scala-compiler-2.11.8.jar
-scala-library-2.11.8.jar
-scala-parser-combinators_2.11-1.0.4.jar
-scala-reflect-2.11.8.jar
-scala-xml_2.11-1.0.2.jar
-scalap-2.11.8.jar
-shapeless_2.11-2.0.0.jar
-slf4j-api-1.7.16.jar
-slf4j-log4j12-1.7.16.jar
-snappy-0.2.jar
-snappy-java-1.1.2.6.jar
-spire-macros_2.11-0.7.4.jar
-spire_2.11-0.7.4.jar
-stax-api-1.0-2.jar
-stax-api-1.0.1.jar
-stream-2.7.0.jar
-stringtemplate-3.2.1.jar
-super-csv-2.2.0.jar
-univocity-parsers-2.2.1.jar
-validation-api-1.1.0.Final.jar
-xbean-asm5-shaded-4.4.jar
-xmlenc-0.52.jar
-xz-1.0.jar
-zookeeper-3.4.5.jar
diff --git a/dev/deps/spark-deps-hadoop-2.4 b/dev/deps/spark-deps-hadoop-2.4
deleted file mode 100644
index d151d12796..0000000000
--- a/dev/deps/spark-deps-hadoop-2.4
+++ /dev/null
@@ -1,174 +0,0 @@
-JavaEWAH-0.3.2.jar
-RoaringBitmap-0.5.11.jar
-ST4-4.0.4.jar
-activation-1.1.1.jar
-antlr-2.7.7.jar
-antlr-runtime-3.4.jar
-antlr4-runtime-4.5.3.jar
-aopalliance-1.0.jar
-aopalliance-repackaged-2.4.0-b34.jar
-apache-log4j-extras-1.2.17.jar
-arpack_combined_all-0.1.jar
-avro-1.7.7.jar
-avro-ipc-1.7.7.jar
-avro-mapred-1.7.7-hadoop2.jar
-base64-2.3.8.jar
-bcprov-jdk15on-1.51.jar
-bonecp-0.8.0.RELEASE.jar
-breeze-macros_2.11-0.12.jar
-breeze_2.11-0.12.jar
-calcite-avatica-1.2.0-incubating.jar
-calcite-core-1.2.0-incubating.jar
-calcite-linq4j-1.2.0-incubating.jar
-chill-java-0.8.0.jar
-chill_2.11-0.8.0.jar
-commons-beanutils-1.7.0.jar
-commons-beanutils-core-1.8.0.jar
-commons-cli-1.2.jar
-commons-codec-1.10.jar
-commons-collections-3.2.2.jar
-commons-compiler-3.0.0.jar
-commons-compress-1.4.1.jar
-commons-configuration-1.6.jar
-commons-crypto-1.0.0.jar
-commons-dbcp-1.4.jar
-commons-digester-1.8.jar
-commons-httpclient-3.1.jar
-commons-io-2.4.jar
-commons-lang-2.6.jar
-commons-lang3-3.5.jar
-commons-logging-1.1.3.jar
-commons-math3-3.4.1.jar
-commons-net-2.2.jar
-commons-pool-1.5.4.jar
-compress-lzf-1.0.3.jar
-core-1.1.2.jar
-curator-client-2.4.0.jar
-curator-framework-2.4.0.jar
-curator-recipes-2.4.0.jar
-datanucleus-api-jdo-3.2.6.jar
-datanucleus-core-3.2.10.jar
-datanucleus-rdbms-3.2.9.jar
-derby-10.12.1.1.jar
-eigenbase-properties-1.1.5.jar
-guava-14.0.1.jar
-guice-3.0.jar
-guice-servlet-3.0.jar
-hadoop-annotations-2.4.1.jar
-hadoop-auth-2.4.1.jar
-hadoop-client-2.4.1.jar
-hadoop-common-2.4.1.jar
-hadoop-hdfs-2.4.1.jar
-hadoop-mapreduce-client-app-2.4.1.jar
-hadoop-mapreduce-client-common-2.4.1.jar
-hadoop-mapreduce-client-core-2.4.1.jar
-hadoop-mapreduce-client-jobclient-2.4.1.jar
-hadoop-mapreduce-client-shuffle-2.4.1.jar
-hadoop-yarn-api-2.4.1.jar
-hadoop-yarn-client-2.4.1.jar
-hadoop-yarn-common-2.4.1.jar
-hadoop-yarn-server-common-2.4.1.jar
-hadoop-yarn-server-web-proxy-2.4.1.jar
-hk2-api-2.4.0-b34.jar
-hk2-locator-2.4.0-b34.jar
-hk2-utils-2.4.0-b34.jar
-httpclient-4.5.2.jar
-httpcore-4.4.4.jar
-ivy-2.4.0.jar
-jackson-annotations-2.6.5.jar
-jackson-core-2.6.5.jar
-jackson-core-asl-1.9.13.jar
-jackson-databind-2.6.5.jar
-jackson-mapper-asl-1.9.13.jar
-jackson-module-paranamer-2.6.5.jar
-jackson-module-scala_2.11-2.6.5.jar
-janino-3.0.0.jar
-java-xmlbuilder-1.0.jar
-javassist-3.18.1-GA.jar
-javax.annotation-api-1.2.jar
-javax.inject-1.jar
-javax.inject-2.4.0-b34.jar
-javax.servlet-api-3.1.0.jar
-javax.ws.rs-api-2.0.1.jar
-javolution-5.5.1.jar
-jaxb-api-2.2.2.jar
-jcl-over-slf4j-1.7.16.jar
-jdo-api-3.0.1.jar
-jersey-client-2.22.2.jar
-jersey-common-2.22.2.jar
-jersey-container-servlet-2.22.2.jar
-jersey-container-servlet-core-2.22.2.jar
-jersey-guava-2.22.2.jar
-jersey-media-jaxb-2.22.2.jar
-jersey-server-2.22.2.jar
-jets3t-0.9.3.jar
-jetty-6.1.26.jar
-jetty-util-6.1.26.jar
-jline-2.12.1.jar
-joda-time-2.9.3.jar
-jodd-core-3.5.2.jar
-jpam-1.1.jar
-json4s-ast_2.11-3.2.11.jar
-json4s-core_2.11-3.2.11.jar
-json4s-jackson_2.11-3.2.11.jar
-jsr305-1.3.9.jar
-jta-1.1.jar
-jtransforms-2.4.0.jar
-jul-to-slf4j-1.7.16.jar
-kryo-shaded-3.0.3.jar
-leveldbjni-all-1.8.jar
-libfb303-0.9.3.jar
-libthrift-0.9.3.jar
-log4j-1.2.17.jar
-lz4-1.3.0.jar
-mail-1.4.7.jar
-mesos-1.0.0-shaded-protobuf.jar
-metrics-core-3.1.2.jar
-metrics-graphite-3.1.2.jar
-metrics-json-3.1.2.jar
-metrics-jvm-3.1.2.jar
-minlog-1.3.0.jar
-mx4j-3.0.2.jar
-netty-3.9.9.Final.jar
-netty-all-4.0.43.Final.jar
-objenesis-2.1.jar
-opencsv-2.3.jar
-oro-2.0.8.jar
-osgi-resource-locator-1.0.1.jar
-paranamer-2.6.jar
-parquet-column-1.8.2.jar
-parquet-common-1.8.2.jar
-parquet-encoding-1.8.2.jar
-parquet-format-2.3.1.jar
-parquet-hadoop-1.8.2.jar
-parquet-hadoop-bundle-1.6.0.jar
-parquet-jackson-1.8.2.jar
-pmml-model-1.2.15.jar
-pmml-schema-1.2.15.jar
-protobuf-java-2.5.0.jar
-py4j-0.10.4.jar
-pyrolite-4.13.jar
-scala-compiler-2.11.8.jar
-scala-library-2.11.8.jar
-scala-parser-combinators_2.11-1.0.4.jar
-scala-reflect-2.11.8.jar
-scala-xml_2.11-1.0.2.jar
-scalap-2.11.8.jar
-shapeless_2.11-2.0.0.jar
-slf4j-api-1.7.16.jar
-slf4j-log4j12-1.7.16.jar
-snappy-0.2.jar
-snappy-java-1.1.2.6.jar
-spire-macros_2.11-0.7.4.jar
-spire_2.11-0.7.4.jar
-stax-api-1.0-2.jar
-stax-api-1.0.1.jar
-stream-2.7.0.jar
-stringtemplate-3.2.1.jar
-super-csv-2.2.0.jar
-univocity-parsers-2.2.1.jar
-validation-api-1.1.0.Final.jar
-xbean-asm5-shaded-4.4.jar
-xmlenc-0.52.jar
-xz-1.0.jar
-zookeeper-3.4.5.jar
diff --git a/dev/make-distribution.sh b/dev/make-distribution.sh
index 6fb25f3b98..dc8dfb9344 100755
--- a/dev/make-distribution.sh
+++ b/dev/make-distribution.sh
@@ -52,20 +52,6 @@ function exit_with_usage {
# Parse arguments
while (( "$#" )); do
case $1 in
- --hadoop)
- echo "Error: '--hadoop' is no longer supported:"
- echo "Error: use Maven profiles and options -Dhadoop.version and -Dyarn.version instead."
- echo "Error: Related profiles include hadoop-2.2, hadoop-2.3, hadoop-2.4, hadoop-2.6 and hadoop-2.7."
- exit_with_usage
- ;;
- --with-yarn)
- echo "Error: '--with-yarn' is no longer supported, use Maven option -Pyarn"
- exit_with_usage
- ;;
- --with-hive)
- echo "Error: '--with-hive' is no longer supported, use Maven options -Phive and -Phive-thriftserver"
- exit_with_usage
- ;;
--tgz)
MAKE_TGZ=true
;;
diff --git a/dev/run-tests.py b/dev/run-tests.py
index ab285ac96a..ef9ab1aa1a 100755
--- a/dev/run-tests.py
+++ b/dev/run-tests.py
@@ -305,9 +305,6 @@ def get_hadoop_profiles(hadoop_version):
"""
sbt_maven_hadoop_profiles = {
- "hadoop2.2": ["-Phadoop-2.2"],
- "hadoop2.3": ["-Phadoop-2.3"],
- "hadoop2.4": ["-Phadoop-2.4"],
"hadoop2.6": ["-Phadoop-2.6"],
"hadoop2.7": ["-Phadoop-2.7"],
}
diff --git a/dev/test-dependencies.sh b/dev/test-dependencies.sh
index 4014f42e19..eb43f229c2 100755
--- a/dev/test-dependencies.sh
+++ b/dev/test-dependencies.sh
@@ -32,9 +32,6 @@ export LC_ALL=C
HADOOP2_MODULE_PROFILES="-Phive-thriftserver -Pmesos -Pyarn -Phive"
MVN="build/mvn"
HADOOP_PROFILES=(
- hadoop-2.2
- hadoop-2.3
- hadoop-2.4
hadoop-2.6
hadoop-2.7
)
diff --git a/docs/building-spark.md b/docs/building-spark.md
index ffe356f918..690c656bad 100644
--- a/docs/building-spark.md
+++ b/docs/building-spark.md
@@ -48,7 +48,7 @@ You can fix these problems by setting the `MAVEN_OPTS` variable as discussed bef
Spark now comes packaged with a self-contained Maven installation to ease building and deployment of Spark from source located under the `build/` directory. This script will automatically download and setup all necessary build requirements ([Maven](https://maven.apache.org/), [Scala](http://www.scala-lang.org/), and [Zinc](https://github.com/typesafehub/zinc)) locally within the `build/` directory itself. It honors any `mvn` binary if present already, however, will pull down its own copy of Scala and Zinc regardless to ensure proper version requirements are met. `build/mvn` execution acts as a pass through to the `mvn` call allowing easy transition from previous build methods. As an example, one can build a version of Spark as follows:
- ./build/mvn -Pyarn -Phadoop-2.7 -Dhadoop.version=2.7.0 -DskipTests clean package
+ ./build/mvn -DskipTests clean package
Other build examples can be found below.
@@ -63,48 +63,21 @@ with Maven profile settings and so on like the direct Maven build. Example:
This will build Spark distribution along with Python pip and R packages. For more information on usage, run `./dev/make-distribution.sh --help`
-## Specifying the Hadoop Version
+## Specifying the Hadoop Version and Enabling YARN
-Because HDFS is not protocol-compatible across versions, if you want to read from HDFS, you'll need to build Spark against the specific HDFS version in your environment. You can do this through the `hadoop.version` property. If unset, Spark will build against Hadoop 2.2.0 by default. Note that certain build profiles are required for particular Hadoop versions:
+You can specify the exact version of Hadoop to compile against through the `hadoop.version` property.
+If unset, Spark will build against Hadoop 2.6.X by default.
-<table class="table">
- <thead>
- <tr><th>Hadoop version</th><th>Profile required</th></tr>
- </thead>
- <tbody>
- <tr><td>2.2.x</td><td>hadoop-2.2</td></tr>
- <tr><td>2.3.x</td><td>hadoop-2.3</td></tr>
- <tr><td>2.4.x</td><td>hadoop-2.4</td></tr>
- <tr><td>2.6.x</td><td>hadoop-2.6</td></tr>
- <tr><td>2.7.x and later 2.x</td><td>hadoop-2.7</td></tr>
- </tbody>
-</table>
-
-Note that support for versions of Hadoop before 2.6 are deprecated as of Spark 2.1.0 and may be
-removed in Spark 2.2.0.
-
-
-You can enable the `yarn` profile and optionally set the `yarn.version` property if it is different from `hadoop.version`. Spark only supports YARN versions 2.2.0 and later.
+You can enable the `yarn` profile and optionally set the `yarn.version` property if it is different
+from `hadoop.version`.
Examples:
- # Apache Hadoop 2.2.X
- ./build/mvn -Pyarn -Phadoop-2.2 -DskipTests clean package
-
- # Apache Hadoop 2.3.X
- ./build/mvn -Pyarn -Phadoop-2.3 -Dhadoop.version=2.3.0 -DskipTests clean package
-
- # Apache Hadoop 2.4.X or 2.5.X
- ./build/mvn -Pyarn -Phadoop-2.4 -Dhadoop.version=2.4.0 -DskipTests clean package
-
# Apache Hadoop 2.6.X
- ./build/mvn -Pyarn -Phadoop-2.6 -Dhadoop.version=2.6.0 -DskipTests clean package
+ ./build/mvn -Pyarn -DskipTests clean package
# Apache Hadoop 2.7.X and later
- ./build/mvn -Pyarn -Phadoop-2.7 -Dhadoop.version=2.7.0 -DskipTests clean package
-
- # Different versions of HDFS and YARN.
- ./build/mvn -Pyarn -Phadoop-2.3 -Dhadoop.version=2.3.0 -Dyarn.version=2.2.0 -DskipTests clean package
+ ./build/mvn -Pyarn -Phadoop-2.7 -Dhadoop.version=2.7.3 -DskipTests clean package
## Building With Hive and JDBC Support
@@ -112,8 +85,8 @@ To enable Hive integration for Spark SQL along with its JDBC server and CLI,
add the `-Phive` and `Phive-thriftserver` profiles to your existing build options.
By default Spark will build with Hive 1.2.1 bindings.
- # Apache Hadoop 2.4.X with Hive 1.2.1 support
- ./build/mvn -Pyarn -Phadoop-2.4 -Dhadoop.version=2.4.0 -Phive -Phive-thriftserver -DskipTests clean package
+ # With Hive 1.2.1 support
+ ./build/mvn -Pyarn -Phive -Phive-thriftserver -DskipTests clean package
## Packaging without Hadoop Dependencies for YARN
@@ -132,7 +105,7 @@ like ZooKeeper and Hadoop itself.
To produce a Spark package compiled with Scala 2.10, use the `-Dscala-2.10` property:
./dev/change-scala-version.sh 2.10
- ./build/mvn -Pyarn -Phadoop-2.4 -Dscala-2.10 -DskipTests clean package
+ ./build/mvn -Pyarn -Dscala-2.10 -DskipTests clean package
Note that support for Scala 2.10 is deprecated as of Spark 2.1.0 and may be removed in Spark 2.2.0.
@@ -192,7 +165,7 @@ compilation. More advanced developers may wish to use SBT.
The SBT build is derived from the Maven POM files, and so the same Maven profiles and variables
can be set to control the SBT build. For example:
- ./build/sbt -Pyarn -Phadoop-2.3 package
+ ./build/sbt package
To avoid the overhead of launching sbt each time you need to re-compile, you can launch sbt
in interactive mode by running `build/sbt`, and then run all build commands at the command
@@ -225,7 +198,7 @@ Note that tests should not be run as root or an admin user.
The following is an example of a command to run the tests:
- ./build/mvn -Pyarn -Phadoop-2.3 -Phive -Phive-thriftserver test
+ ./build/mvn test
The ScalaTest plugin also supports running only a specific Scala test suite as follows:
@@ -240,16 +213,16 @@ or a Java test:
The following is an example of a command to run the tests:
- ./build/sbt -Pyarn -Phadoop-2.3 -Phive -Phive-thriftserver test
+ ./build/sbt test
To run only a specific test suite as follows:
- ./build/sbt -Pyarn -Phadoop-2.3 -Phive -Phive-thriftserver "test-only org.apache.spark.repl.ReplSuite"
- ./build/sbt -Pyarn -Phadoop-2.3 -Phive -Phive-thriftserver "test-only org.apache.spark.repl.*"
+ ./build/sbt "test-only org.apache.spark.repl.ReplSuite"
+ ./build/sbt "test-only org.apache.spark.repl.*"
To run test suites of a specific sub project as follows:
- ./build/sbt -Pyarn -Phadoop-2.3 -Phive -Phive-thriftserver core/test
+ ./build/sbt core/test
## Running Java 8 Test Suites
diff --git a/docs/running-on-yarn.md b/docs/running-on-yarn.md
index 051f64e1be..c95f627c12 100644
--- a/docs/running-on-yarn.md
+++ b/docs/running-on-yarn.md
@@ -337,7 +337,7 @@ To use a custom metrics.properties for the application master and executors, upd
<td>
Defines the validity interval for AM failure tracking.
If the AM has been running for at least the defined interval, the AM failure count will be reset.
- This feature is not enabled if not configured, and only supported in Hadoop 2.6+.
+ This feature is not enabled if not configured.
</td>
</tr>
<tr>
diff --git a/pom.xml b/pom.xml
index 3679b4e263..ac61a57a61 100644
--- a/pom.xml
+++ b/pom.xml
@@ -122,12 +122,12 @@
<sbt.project.name>spark</sbt.project.name>
<slf4j.version>1.7.16</slf4j.version>
<log4j.version>1.2.17</log4j.version>
- <hadoop.version>2.2.0</hadoop.version>
+ <hadoop.version>2.6.5</hadoop.version>
<protobuf.version>2.5.0</protobuf.version>
<yarn.version>${hadoop.version}</yarn.version>
<flume.version>1.6.0</flume.version>
- <zookeeper.version>3.4.5</zookeeper.version>
- <curator.version>2.4.0</curator.version>
+ <zookeeper.version>3.4.6</zookeeper.version>
+ <curator.version>2.6.0</curator.version>
<hive.group>org.spark-project.hive</hive.group>
<!-- Version used in Maven Hive dependency -->
<hive.version>1.2.1.spark2</hive.version>
@@ -144,7 +144,7 @@
<codahale.metrics.version>3.1.2</codahale.metrics.version>
<avro.version>1.7.7</avro.version>
<avro.mapred.classifier>hadoop2</avro.mapred.classifier>
- <jets3t.version>0.7.1</jets3t.version>
+ <jets3t.version>0.9.3</jets3t.version>
<aws.kinesis.client.version>1.6.2</aws.kinesis.client.version>
<!-- the producer is used in tests -->
<aws.kinesis.producer.version>0.10.2</aws.kinesis.producer.version>
@@ -2548,43 +2548,14 @@
-->
<profile>
- <id>hadoop-2.2</id>
- <!-- SPARK-7249: Default hadoop profile. Uses global properties. -->
- </profile>
-
- <profile>
- <id>hadoop-2.3</id>
- <properties>
- <hadoop.version>2.3.0</hadoop.version>
- <jets3t.version>0.9.3</jets3t.version>
- </properties>
- </profile>
-
- <profile>
- <id>hadoop-2.4</id>
- <properties>
- <hadoop.version>2.4.1</hadoop.version>
- <jets3t.version>0.9.3</jets3t.version>
- </properties>
- </profile>
-
- <profile>
<id>hadoop-2.6</id>
- <properties>
- <hadoop.version>2.6.5</hadoop.version>
- <jets3t.version>0.9.3</jets3t.version>
- <zookeeper.version>3.4.6</zookeeper.version>
- <curator.version>2.6.0</curator.version>
- </properties>
+ <!-- Default hadoop profile. Uses global properties. -->
</profile>
<profile>
<id>hadoop-2.7</id>
<properties>
<hadoop.version>2.7.3</hadoop.version>
- <jets3t.version>0.9.3</jets3t.version>
- <zookeeper.version>3.4.6</zookeeper.version>
- <curator.version>2.6.0</curator.version>
</properties>
</profile>
diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala
index 74edd537f5..bcc00fa3e9 100644
--- a/project/SparkBuild.scala
+++ b/project/SparkBuild.scala
@@ -86,43 +86,11 @@ object SparkBuild extends PomBuild {
val projectsMap: Map[String, Seq[Setting[_]]] = Map.empty
- // Provides compatibility for older versions of the Spark build
- def backwardCompatibility = {
- import scala.collection.mutable
- var profiles: mutable.Seq[String] = mutable.Seq("sbt")
- // scalastyle:off println
- if (Properties.envOrNone("SPARK_GANGLIA_LGPL").isDefined) {
- println("NOTE: SPARK_GANGLIA_LGPL is deprecated, please use -Pspark-ganglia-lgpl flag.")
- profiles ++= Seq("spark-ganglia-lgpl")
- }
- if (Properties.envOrNone("SPARK_HIVE").isDefined) {
- println("NOTE: SPARK_HIVE is deprecated, please use -Phive and -Phive-thriftserver flags.")
- profiles ++= Seq("hive", "hive-thriftserver")
- }
- Properties.envOrNone("SPARK_HADOOP_VERSION") match {
- case Some(v) =>
- println("NOTE: SPARK_HADOOP_VERSION is deprecated, please use -Dhadoop.version=" + v)
- System.setProperty("hadoop.version", v)
- case None =>
- }
- if (Properties.envOrNone("SPARK_YARN").isDefined) {
- println("NOTE: SPARK_YARN is deprecated, please use -Pyarn flag.")
- profiles ++= Seq("yarn")
- }
- // scalastyle:on println
- profiles
- }
-
override val profiles = {
val profiles = Properties.envOrNone("SBT_MAVEN_PROFILES") match {
- case None => backwardCompatibility
- case Some(v) =>
- if (backwardCompatibility.nonEmpty)
- // scalastyle:off println
- println("Note: We ignore environment variables, when use of profile is detected in " +
- "conjunction with environment variable.")
- // scalastyle:on println
- v.split("(\\s+|,)").filterNot(_.isEmpty).map(_.trim.replaceAll("-P", "")).toSeq
+ case None => Seq("sbt")
+ case Some(v) =>
+ v.split("(\\s+|,)").filterNot(_.isEmpty).map(_.trim.replaceAll("-P", "")).toSeq
}
if (System.getProperty("scala-2.10") == "") {
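
Reviewer note: with the environment-variable fallbacks removed, SBT_MAVEN_PROFILES is the only profile source; its value is split on whitespace or commas and stripped of the -P prefix. A quick illustration of the parsing as written above:

    // "-Pyarn -Phive,-Phive-thriftserver" -> Seq("yarn", "hive", "hive-thriftserver")
    val v = "-Pyarn -Phive,-Phive-thriftserver"
    val profiles = v.split("(\\s+|,)").filterNot(_.isEmpty).map(_.trim.replaceAll("-P", "")).toSeq
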
diff --git a/resource-managers/yarn/pom.xml b/resource-managers/yarn/pom.xml
index f090d2427d..92a33de0ae 100644
--- a/resource-managers/yarn/pom.xml
+++ b/resource-managers/yarn/pom.xml
@@ -125,34 +125,12 @@
<scope>test</scope>
</dependency>
- <!--
- See SPARK-3710. hadoop-yarn-server-tests in Hadoop 2.2 fails to pull some needed
- dependencies, so they need to be added manually for the tests to work.
- -->
-
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-yarn-server-tests</artifactId>
<classifier>tests</classifier>
<scope>test</scope>
</dependency>
- <dependency>
- <groupId>org.mockito</groupId>
- <artifactId>mockito-core</artifactId>
- <scope>test</scope>
- </dependency>
- <dependency>
- <groupId>org.mortbay.jetty</groupId>
- <artifactId>jetty</artifactId>
- <version>6.1.26</version>
- <exclusions>
- <exclusion>
- <groupId>org.mortbay.jetty</groupId>
- <artifactId>servlet-api</artifactId>
- </exclusion>
- </exclusions>
- <scope>test</scope>
- </dependency>
<!--
Jersey 1 dependencies only required for YARN integration testing. Creating a YARN cluster
diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
index f79c66b9ff..9df43aea3f 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
@@ -31,6 +31,7 @@ import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.yarn.api._
import org.apache.hadoop.yarn.api.records._
import org.apache.hadoop.yarn.conf.YarnConfiguration
+import org.apache.hadoop.yarn.exceptions.ApplicationAttemptNotFoundException
import org.apache.hadoop.yarn.util.{ConverterUtils, Records}
import org.apache.spark._
@@ -460,17 +461,15 @@ private[spark] class ApplicationMaster(
}
failureCount = 0
} catch {
- case i: InterruptedException =>
+ case i: InterruptedException => // do nothing
+ case e: ApplicationAttemptNotFoundException =>
+ failureCount += 1
+ logError("Exception from Reporter thread.", e)
+ finish(FinalApplicationStatus.FAILED, ApplicationMaster.EXIT_REPORTER_FAILURE,
+ e.getMessage)
case e: Throwable =>
failureCount += 1
- // this exception was introduced in hadoop 2.4 and this code would not compile
- // with earlier versions if we refer it directly.
- if ("org.apache.hadoop.yarn.exceptions.ApplicationAttemptNotFoundException" ==
- e.getClass().getName()) {
- logError("Exception from Reporter thread.", e)
- finish(FinalApplicationStatus.FAILED, ApplicationMaster.EXIT_REPORTER_FAILURE,
- e.getMessage)
- } else if (!NonFatal(e) || failureCount >= reporterMaxFailures) {
+ if (!NonFatal(e) || failureCount >= reporterMaxFailures) {
finish(FinalApplicationStatus.FAILED,
ApplicationMaster.EXIT_REPORTER_FAILURE, "Exception was thrown " +
s"$failureCount time(s) from Reporter thread.")
diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
index b212b0eaaf..635c1ac5e3 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
@@ -26,7 +26,6 @@ import java.util.zip.{ZipEntry, ZipOutputStream}
import scala.collection.JavaConverters._
import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet, ListBuffer, Map}
-import scala.util.{Failure, Success, Try}
import scala.util.control.NonFatal
import com.google.common.base.Objects
@@ -47,7 +46,7 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration
import org.apache.hadoop.yarn.exceptions.ApplicationNotFoundException
import org.apache.hadoop.yarn.util.Records
-import org.apache.spark.{SecurityManager, SparkConf, SparkContext, SparkException}
+import org.apache.spark.{SecurityManager, SparkConf, SparkException}
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.deploy.yarn.config._
import org.apache.spark.deploy.yarn.security.ConfigurableCredentialManager
@@ -216,18 +215,7 @@ private[spark] class Client(
appContext.setApplicationType("SPARK")
sparkConf.get(APPLICATION_TAGS).foreach { tags =>
- try {
- // The setApplicationTags method was only introduced in Hadoop 2.4+, so we need to use
- // reflection to set it, printing a warning if a tag was specified but the YARN version
- // doesn't support it.
- val method = appContext.getClass().getMethod(
- "setApplicationTags", classOf[java.util.Set[String]])
- method.invoke(appContext, new java.util.HashSet[String](tags.asJava))
- } catch {
- case e: NoSuchMethodException =>
- logWarning(s"Ignoring ${APPLICATION_TAGS.key} because this version of " +
- "YARN does not support it")
- }
+ appContext.setApplicationTags(new java.util.HashSet[String](tags.asJava))
}
sparkConf.get(MAX_APP_ATTEMPTS) match {
case Some(v) => appContext.setMaxAppAttempts(v)
@@ -236,15 +224,7 @@ private[spark] class Client(
}
sparkConf.get(AM_ATTEMPT_FAILURE_VALIDITY_INTERVAL_MS).foreach { interval =>
- try {
- val method = appContext.getClass().getMethod(
- "setAttemptFailuresValidityInterval", classOf[Long])
- method.invoke(appContext, interval: java.lang.Long)
- } catch {
- case e: NoSuchMethodException =>
- logWarning(s"Ignoring ${AM_ATTEMPT_FAILURE_VALIDITY_INTERVAL_MS.key} because " +
- "the version of YARN does not support it")
- }
+ appContext.setAttemptFailuresValidityInterval(interval)
}
val capability = Records.newRecord(classOf[Resource])
@@ -253,53 +233,24 @@ private[spark] class Client(
sparkConf.get(AM_NODE_LABEL_EXPRESSION) match {
case Some(expr) =>
- try {
- val amRequest = Records.newRecord(classOf[ResourceRequest])
- amRequest.setResourceName(ResourceRequest.ANY)
- amRequest.setPriority(Priority.newInstance(0))
- amRequest.setCapability(capability)
- amRequest.setNumContainers(1)
- val method = amRequest.getClass.getMethod("setNodeLabelExpression", classOf[String])
- method.invoke(amRequest, expr)
-
- val setResourceRequestMethod =
- appContext.getClass.getMethod("setAMContainerResourceRequest", classOf[ResourceRequest])
- setResourceRequestMethod.invoke(appContext, amRequest)
- } catch {
- case e: NoSuchMethodException =>
- logWarning(s"Ignoring ${AM_NODE_LABEL_EXPRESSION.key} because the version " +
- "of YARN does not support it")
- appContext.setResource(capability)
- }
+ val amRequest = Records.newRecord(classOf[ResourceRequest])
+ amRequest.setResourceName(ResourceRequest.ANY)
+ amRequest.setPriority(Priority.newInstance(0))
+ amRequest.setCapability(capability)
+ amRequest.setNumContainers(1)
+ amRequest.setNodeLabelExpression(expr)
+ appContext.setAMContainerResourceRequest(amRequest)
case None =>
appContext.setResource(capability)
}
sparkConf.get(ROLLED_LOG_INCLUDE_PATTERN).foreach { includePattern =>
- try {
- val logAggregationContext = Records.newRecord(
- Utils.classForName("org.apache.hadoop.yarn.api.records.LogAggregationContext"))
- .asInstanceOf[Object]
-
- val setRolledLogsIncludePatternMethod =
- logAggregationContext.getClass.getMethod("setRolledLogsIncludePattern", classOf[String])
- setRolledLogsIncludePatternMethod.invoke(logAggregationContext, includePattern)
-
- sparkConf.get(ROLLED_LOG_EXCLUDE_PATTERN).foreach { excludePattern =>
- val setRolledLogsExcludePatternMethod =
- logAggregationContext.getClass.getMethod("setRolledLogsExcludePattern", classOf[String])
- setRolledLogsExcludePatternMethod.invoke(logAggregationContext, excludePattern)
- }
-
- val setLogAggregationContextMethod =
- appContext.getClass.getMethod("setLogAggregationContext",
- Utils.classForName("org.apache.hadoop.yarn.api.records.LogAggregationContext"))
- setLogAggregationContextMethod.invoke(appContext, logAggregationContext)
- } catch {
- case NonFatal(e) =>
- logWarning(s"Ignoring ${ROLLED_LOG_INCLUDE_PATTERN.key} because the version of YARN " +
- s"does not support it", e)
+ val logAggregationContext = Records.newRecord(classOf[LogAggregationContext])
+ logAggregationContext.setRolledLogsIncludePattern(includePattern)
+ sparkConf.get(ROLLED_LOG_EXCLUDE_PATTERN).foreach { excludePattern =>
+ logAggregationContext.setRolledLogsExcludePattern(excludePattern)
}
+ appContext.setLogAggregationContext(logAggregationContext)
}
appContext
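The submission-context hunk above replaces four reflective call sites with the corresponding Hadoop 2.6 API. A compile-level sketch of those calls, assuming hadoop-yarn-api at the 2.6.x level Spark now builds against; the tag, interval, label, and log patterns are illustrative values, not Spark defaults:

    import scala.collection.JavaConverters._
    import org.apache.hadoop.yarn.api.records._
    import org.apache.hadoop.yarn.util.Records

    object SubmissionContextSketch {
      val appContext = Records.newRecord(classOf[ApplicationSubmissionContext])
      appContext.setApplicationType("SPARK")
      appContext.setApplicationTags(Set("nightly-job").asJava)       // reflection before Hadoop 2.4
      appContext.setAttemptFailuresValidityInterval(3600000L)        // reflection before Hadoop 2.6

      // AM resource request carrying a node label expression.
      val capability = Resource.newInstance(1024, 1)
      val amRequest = Records.newRecord(classOf[ResourceRequest])
      amRequest.setResourceName(ResourceRequest.ANY)
      amRequest.setPriority(Priority.newInstance(0))
      amRequest.setCapability(capability)
      amRequest.setNumContainers(1)
      amRequest.setNodeLabelExpression("spark-am")
      appContext.setAMContainerResourceRequest(amRequest)

      // Rolled log aggregation patterns, also a plain API call on 2.6.x.
      val logCtx = Records.newRecord(classOf[LogAggregationContext])
      logCtx.setRolledLogsIncludePattern("stdout*")
      logCtx.setRolledLogsExcludePattern("*.tmp")
      appContext.setLogAggregationContext(logCtx)
    }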
@@ -786,14 +737,12 @@ private[spark] class Client(
val pythonPath = new ListBuffer[String]()
val (pyFiles, pyArchives) = sparkConf.get(PY_FILES).partition(_.endsWith(".py"))
if (pyFiles.nonEmpty) {
- pythonPath += buildPath(YarnSparkHadoopUtil.expandEnvironment(Environment.PWD),
- LOCALIZED_PYTHON_DIR)
+ pythonPath += buildPath(Environment.PWD.$$(), LOCALIZED_PYTHON_DIR)
}
(pySparkArchives ++ pyArchives).foreach { path =>
val uri = Utils.resolveURI(path)
if (uri.getScheme != LOCAL_SCHEME) {
- pythonPath += buildPath(YarnSparkHadoopUtil.expandEnvironment(Environment.PWD),
- new Path(uri).getName())
+ pythonPath += buildPath(Environment.PWD.$$(), new Path(uri).getName())
} else {
pythonPath += uri.getPath()
}
@@ -802,7 +751,7 @@ private[spark] class Client(
// Finally, update the Spark config to propagate PYTHONPATH to the AM and executors.
if (pythonPath.nonEmpty) {
val pythonPathStr = (sys.env.get("PYTHONPATH") ++ pythonPath)
- .mkString(YarnSparkHadoopUtil.getClassPathSeparator)
+ .mkString(ApplicationConstants.CLASS_PATH_SEPARATOR)
env("PYTHONPATH") = pythonPathStr
sparkConf.setExecutorEnv("PYTHONPATH", pythonPathStr)
}
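Environment.$$() and ApplicationConstants.CLASS_PATH_SEPARATOR are the cross-platform placeholders that the removed YarnSparkHadoopUtil wrappers used to reach through reflection: $$() yields a {{VAR}} token and CLASS_PATH_SEPARATOR is the literal <CPS>, both expanded by the NodeManager at container launch. A short sketch; the path entries are illustrative:

    import org.apache.hadoop.yarn.api.ApplicationConstants
    import org.apache.hadoop.yarn.api.ApplicationConstants.Environment

    object PlaceholderSketch {
      val workDir = Environment.PWD.$$()                       // "{{PWD}}"
      val cps     = ApplicationConstants.CLASS_PATH_SEPARATOR  // "<CPS>"

      // Build a PYTHONPATH the same way the hunk above does.
      val pythonPath    = Seq(s"$workDir/__pyfiles__", s"$workDir/pyspark.zip")
      val pythonPathStr = (sys.env.get("PYTHONPATH") ++ pythonPath).mkString(cps)
    }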
@@ -882,10 +831,7 @@ private[spark] class Client(
// Add Xmx for AM memory
javaOpts += "-Xmx" + amMemory + "m"
- val tmpDir = new Path(
- YarnSparkHadoopUtil.expandEnvironment(Environment.PWD),
- YarnConfiguration.DEFAULT_CONTAINER_TEMP_DIR
- )
+ val tmpDir = new Path(Environment.PWD.$$(), YarnConfiguration.DEFAULT_CONTAINER_TEMP_DIR)
javaOpts += "-Djava.io.tmpdir=" + tmpDir
// TODO: Remove once cpuset version is pushed out.
@@ -982,15 +928,12 @@ private[spark] class Client(
Seq("--arg", YarnSparkHadoopUtil.escapeForShell(arg))
}
val amArgs =
- Seq(amClass) ++ userClass ++ userJar ++ primaryPyFile ++ primaryRFile ++
- userArgs ++ Seq(
- "--properties-file", buildPath(YarnSparkHadoopUtil.expandEnvironment(Environment.PWD),
- LOCALIZED_CONF_DIR, SPARK_CONF_FILE))
+ Seq(amClass) ++ userClass ++ userJar ++ primaryPyFile ++ primaryRFile ++ userArgs ++
+ Seq("--properties-file", buildPath(Environment.PWD.$$(), LOCALIZED_CONF_DIR, SPARK_CONF_FILE))
// Command for the ApplicationMaster
- val commands = prefixEnv ++ Seq(
- YarnSparkHadoopUtil.expandEnvironment(Environment.JAVA_HOME) + "/bin/java", "-server"
- ) ++
+ val commands = prefixEnv ++
+ Seq(Environment.JAVA_HOME.$$() + "/bin/java", "-server") ++
javaOpts ++ amArgs ++
Seq(
"1>", ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stdout",
@@ -1265,59 +1208,28 @@ private object Client extends Logging {
private[yarn] def populateHadoopClasspath(conf: Configuration, env: HashMap[String, String])
: Unit = {
val classPathElementsToAdd = getYarnAppClasspath(conf) ++ getMRAppClasspath(conf)
- for (c <- classPathElementsToAdd.flatten) {
+ classPathElementsToAdd.foreach { c =>
YarnSparkHadoopUtil.addPathToEnvironment(env, Environment.CLASSPATH.name, c.trim)
}
}
- private def getYarnAppClasspath(conf: Configuration): Option[Seq[String]] =
+ private def getYarnAppClasspath(conf: Configuration): Seq[String] =
Option(conf.getStrings(YarnConfiguration.YARN_APPLICATION_CLASSPATH)) match {
- case Some(s) => Some(s.toSeq)
+ case Some(s) => s.toSeq
case None => getDefaultYarnApplicationClasspath
}
- private def getMRAppClasspath(conf: Configuration): Option[Seq[String]] =
+ private def getMRAppClasspath(conf: Configuration): Seq[String] =
Option(conf.getStrings("mapreduce.application.classpath")) match {
- case Some(s) => Some(s.toSeq)
+ case Some(s) => s.toSeq
case None => getDefaultMRApplicationClasspath
}
- private[yarn] def getDefaultYarnApplicationClasspath: Option[Seq[String]] = {
- val triedDefault = Try[Seq[String]] {
- val field = classOf[YarnConfiguration].getField("DEFAULT_YARN_APPLICATION_CLASSPATH")
- val value = field.get(null).asInstanceOf[Array[String]]
- value.toSeq
- } recoverWith {
- case e: NoSuchFieldException => Success(Seq.empty[String])
- }
-
- triedDefault match {
- case f: Failure[_] =>
- logError("Unable to obtain the default YARN Application classpath.", f.exception)
- case s: Success[Seq[String]] =>
- logDebug(s"Using the default YARN application classpath: ${s.get.mkString(",")}")
- }
-
- triedDefault.toOption
- }
-
- private[yarn] def getDefaultMRApplicationClasspath: Option[Seq[String]] = {
- val triedDefault = Try[Seq[String]] {
- val field = classOf[MRJobConfig].getField("DEFAULT_MAPREDUCE_APPLICATION_CLASSPATH")
- StringUtils.getStrings(field.get(null).asInstanceOf[String]).toSeq
- } recoverWith {
- case e: NoSuchFieldException => Success(Seq.empty[String])
- }
+ private[yarn] def getDefaultYarnApplicationClasspath: Seq[String] =
+ YarnConfiguration.DEFAULT_YARN_APPLICATION_CLASSPATH.toSeq
- triedDefault match {
- case f: Failure[_] =>
- logError("Unable to obtain the default MR Application classpath.", f.exception)
- case s: Success[Seq[String]] =>
- logDebug(s"Using the default MR application classpath: ${s.get.mkString(",")}")
- }
-
- triedDefault.toOption
- }
+ private[yarn] def getDefaultMRApplicationClasspath: Seq[String] =
+ StringUtils.getStrings(MRJobConfig.DEFAULT_MAPREDUCE_APPLICATION_CLASSPATH).toSeq
/**
* Populate the classpath entry in the given environment map.
@@ -1339,11 +1251,9 @@ private object Client extends Logging {
addClasspathEntry(getClusterPath(sparkConf, cp), env)
}
- addClasspathEntry(YarnSparkHadoopUtil.expandEnvironment(Environment.PWD), env)
+ addClasspathEntry(Environment.PWD.$$(), env)
- addClasspathEntry(
- YarnSparkHadoopUtil.expandEnvironment(Environment.PWD) + Path.SEPARATOR +
- LOCALIZED_CONF_DIR, env)
+ addClasspathEntry(Environment.PWD.$$() + Path.SEPARATOR + LOCALIZED_CONF_DIR, env)
if (sparkConf.get(USER_CLASS_PATH_FIRST)) {
// in order to properly add the app jar when user classpath is first
@@ -1369,9 +1279,8 @@ private object Client extends Logging {
}
// Add the Spark jars to the classpath, depending on how they were distributed.
- addClasspathEntry(buildPath(YarnSparkHadoopUtil.expandEnvironment(Environment.PWD),
- LOCALIZED_LIB_DIR, "*"), env)
- if (!sparkConf.get(SPARK_ARCHIVE).isDefined) {
+ addClasspathEntry(buildPath(Environment.PWD.$$(), LOCALIZED_LIB_DIR, "*"), env)
+ if (sparkConf.get(SPARK_ARCHIVE).isEmpty) {
sparkConf.get(SPARK_JARS).foreach { jars =>
jars.filter(isLocalUri).foreach { jar =>
addClasspathEntry(getClusterPath(sparkConf, jar), env)
@@ -1430,13 +1339,11 @@ private object Client extends Logging {
if (uri != null && uri.getScheme == LOCAL_SCHEME) {
addClasspathEntry(getClusterPath(conf, uri.getPath), env)
} else if (fileName != null) {
- addClasspathEntry(buildPath(
- YarnSparkHadoopUtil.expandEnvironment(Environment.PWD), fileName), env)
+ addClasspathEntry(buildPath(Environment.PWD.$$(), fileName), env)
} else if (uri != null) {
val localPath = getQualifiedLocalPath(uri, hadoopConf)
val linkName = Option(uri.getFragment()).getOrElse(localPath.getName())
- addClasspathEntry(buildPath(
- YarnSparkHadoopUtil.expandEnvironment(Environment.PWD), linkName), env)
+ addClasspathEntry(buildPath(Environment.PWD.$$(), linkName), env)
}
}
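The default YARN and MapReduce application classpaths are likewise read straight from public constants now that the fields are guaranteed to exist, so the Try/Failure plumbing and its logging disappear. A sketch of the simplified lookup, following the key names used in the hunk above:

    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.mapreduce.MRJobConfig
    import org.apache.hadoop.util.StringUtils
    import org.apache.hadoop.yarn.conf.YarnConfiguration

    object ClasspathSketch {
      def yarnAppClasspath(conf: Configuration): Seq[String] =
        Option(conf.getStrings(YarnConfiguration.YARN_APPLICATION_CLASSPATH))
          .map(_.toSeq)
          .getOrElse(YarnConfiguration.DEFAULT_YARN_APPLICATION_CLASSPATH.toSeq)

      def mrAppClasspath(conf: Configuration): Seq[String] =
        Option(conf.getStrings("mapreduce.application.classpath"))
          .map(_.toSeq)
          .getOrElse(StringUtils.getStrings(MRJobConfig.DEFAULT_MAPREDUCE_APPLICATION_CLASSPATH).toSeq)
    }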
diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala
index 868c2edc5a..b55b4b147b 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala
@@ -152,10 +152,7 @@ private[yarn] class ExecutorRunnable(
}
javaOpts += "-Djava.io.tmpdir=" +
- new Path(
- YarnSparkHadoopUtil.expandEnvironment(Environment.PWD),
- YarnConfiguration.DEFAULT_CONTAINER_TEMP_DIR
- )
+ new Path(Environment.PWD.$$(), YarnConfiguration.DEFAULT_CONTAINER_TEMP_DIR)
// Certain configs need to be passed here because they are needed before the Executor
// registers with the Scheduler and transfers the spark configs. Since the Executor backend
@@ -206,9 +203,8 @@ private[yarn] class ExecutorRunnable(
}.toSeq
YarnSparkHadoopUtil.addOutOfMemoryErrorArgument(javaOpts)
- val commands = prefixEnv ++ Seq(
- YarnSparkHadoopUtil.expandEnvironment(Environment.JAVA_HOME) + "/bin/java",
- "-server") ++
+ val commands = prefixEnv ++
+ Seq(Environment.JAVA_HOME.$$() + "/bin/java", "-server") ++
javaOpts ++
Seq("org.apache.spark.executor.CoarseGrainedExecutorBackend",
"--driver-url", masterAddress,
diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
index e498932e51..8a76dbd1bf 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
@@ -150,20 +150,6 @@ private[yarn] class YarnAllocator(
private val labelExpression = sparkConf.get(EXECUTOR_NODE_LABEL_EXPRESSION)
- // ContainerRequest constructor that can take a node label expression. We grab it through
- // reflection because it's only available in later versions of YARN.
- private val nodeLabelConstructor = labelExpression.flatMap { expr =>
- try {
- Some(classOf[ContainerRequest].getConstructor(classOf[Resource],
- classOf[Array[String]], classOf[Array[String]], classOf[Priority], classOf[Boolean],
- classOf[String]))
- } catch {
- case e: NoSuchMethodException =>
- logWarning(s"Node label expression $expr will be ignored because YARN version on" +
- " classpath does not support it.")
- None
- }
- }
// A map to store preferred hostname and possible task numbers running on it.
private var hostToLocalTaskCounts: Map[String, Int] = Map.empty
@@ -414,10 +400,7 @@ private[yarn] class YarnAllocator(
resource: Resource,
nodes: Array[String],
racks: Array[String]): ContainerRequest = {
- nodeLabelConstructor.map { constructor =>
- constructor.newInstance(resource, nodes, racks, RM_REQUEST_PRIORITY, true: java.lang.Boolean,
- labelExpression.orNull)
- }.getOrElse(new ContainerRequest(resource, nodes, racks, RM_REQUEST_PRIORITY))
+ new ContainerRequest(resource, nodes, racks, RM_REQUEST_PRIORITY, true, labelExpression.orNull)
}
/**
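The six-argument ContainerRequest constructor (capability, nodes, racks, priority, relaxLocality, nodeLabelsExpression) is public API on Hadoop 2.6, so the cached reflective constructor goes away. A sketch with illustrative sizes and a hypothetical label expression:

    import org.apache.hadoop.yarn.api.records.{Priority, Resource}
    import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest

    object ContainerRequestSketch {
      val resource        = Resource.newInstance(2048 /* MB */, 2 /* vcores */)
      val priority        = Priority.newInstance(1)
      val labelExpression = Option("gpu")   // e.g. a value of the executor node-label config

      val request = new ContainerRequest(
        resource,
        null,               // nodes: no locality preference in this sketch
        null,               // racks
        priority,
        true,               // relaxLocality
        labelExpression.orNull)
    }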
diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClient.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClient.scala
index 53df11eb66..163dfb5a60 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClient.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClient.scala
@@ -17,12 +17,8 @@
package org.apache.spark.deploy.yarn
-import java.util.{List => JList}
-
import scala.collection.JavaConverters._
-import scala.util.Try
-import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.yarn.api.records._
import org.apache.hadoop.yarn.client.api.AMRMClient
import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest
@@ -99,24 +95,11 @@ private[spark] class YarnRMClient extends Logging {
def getAmIpFilterParams(conf: YarnConfiguration, proxyBase: String): Map[String, String] = {
// Figure out which scheme Yarn is using. Note the method seems to have been added after 2.2,
// so not all stable releases have it.
- val prefix = Try(classOf[WebAppUtils].getMethod("getHttpSchemePrefix", classOf[Configuration])
- .invoke(null, conf).asInstanceOf[String]).getOrElse("http://")
-
- // If running a new enough Yarn, use the HA-aware API for retrieving the RM addresses.
- try {
- val method = classOf[WebAppUtils].getMethod("getProxyHostsAndPortsForAmFilter",
- classOf[Configuration])
- val proxies = method.invoke(null, conf).asInstanceOf[JList[String]]
- val hosts = proxies.asScala.map { proxy => proxy.split(":")(0) }
- val uriBases = proxies.asScala.map { proxy => prefix + proxy + proxyBase }
- Map("PROXY_HOSTS" -> hosts.mkString(","), "PROXY_URI_BASES" -> uriBases.mkString(","))
- } catch {
- case e: NoSuchMethodException =>
- val proxy = WebAppUtils.getProxyHostAndPort(conf)
- val parts = proxy.split(":")
- val uriBase = prefix + proxy + proxyBase
- Map("PROXY_HOST" -> parts(0), "PROXY_URI_BASE" -> uriBase)
- }
+ val prefix = WebAppUtils.getHttpSchemePrefix(conf)
+ val proxies = WebAppUtils.getProxyHostsAndPortsForAmFilter(conf)
+ val hosts = proxies.asScala.map(_.split(":").head)
+ val uriBases = proxies.asScala.map { proxy => prefix + proxy + proxyBase }
+ Map("PROXY_HOSTS" -> hosts.mkString(","), "PROXY_URI_BASES" -> uriBases.mkString(","))
}
/** Returns the maximum number of attempts to register the AM. */
@@ -124,12 +107,10 @@ private[spark] class YarnRMClient extends Logging {
val sparkMaxAttempts = sparkConf.get(MAX_APP_ATTEMPTS).map(_.toInt)
val yarnMaxAttempts = yarnConf.getInt(
YarnConfiguration.RM_AM_MAX_ATTEMPTS, YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS)
- val retval: Int = sparkMaxAttempts match {
+ sparkMaxAttempts match {
case Some(x) => if (x <= yarnMaxAttempts) x else yarnMaxAttempts
case None => yarnMaxAttempts
}
-
- retval
}
}
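getAmIpFilterParams now calls the HA-aware WebAppUtils methods directly; getHttpSchemePrefix and getProxyHostsAndPortsForAmFilter have both been available since Hadoop 2.4. A rough sketch of the computation, where proxyBase is whatever base path the AM registered:

    import scala.collection.JavaConverters._
    import org.apache.hadoop.yarn.conf.YarnConfiguration
    import org.apache.hadoop.yarn.webapp.util.WebAppUtils

    object AmIpFilterSketch {
      def amIpFilterParams(conf: YarnConfiguration, proxyBase: String): Map[String, String] = {
        val prefix  = WebAppUtils.getHttpSchemePrefix(conf)                   // "http://" or "https://"
        val proxies = WebAppUtils.getProxyHostsAndPortsForAmFilter(conf).asScala
        Map(
          "PROXY_HOSTS"     -> proxies.map(_.split(":").head).mkString(","),
          "PROXY_URI_BASES" -> proxies.map(p => prefix + p + proxyBase).mkString(","))
      }
    }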
diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala
index cc53b1b06e..9357885512 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala
@@ -17,13 +17,11 @@
package org.apache.spark.deploy.yarn
-import java.io.File
import java.nio.charset.StandardCharsets.UTF_8
import java.util.regex.Matcher
import java.util.regex.Pattern
import scala.collection.mutable.{HashMap, ListBuffer}
-import scala.util.Try
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.io.Text
@@ -31,7 +29,6 @@ import org.apache.hadoop.mapred.JobConf
import org.apache.hadoop.security.Credentials
import org.apache.hadoop.security.UserGroupInformation
import org.apache.hadoop.yarn.api.ApplicationConstants
-import org.apache.hadoop.yarn.api.ApplicationConstants.Environment
import org.apache.hadoop.yarn.api.records.{ApplicationAccessType, ContainerId, Priority}
import org.apache.hadoop.yarn.conf.YarnConfiguration
import org.apache.hadoop.yarn.util.ConverterUtils
@@ -137,7 +134,12 @@ object YarnSparkHadoopUtil {
* If the map already contains this key, append the value to the existing value instead.
*/
def addPathToEnvironment(env: HashMap[String, String], key: String, value: String): Unit = {
- val newValue = if (env.contains(key)) { env(key) + getClassPathSeparator + value } else value
+ val newValue =
+ if (env.contains(key)) {
+ env(key) + ApplicationConstants.CLASS_PATH_SEPARATOR + value
+ } else {
+ value
+ }
env.put(key, newValue)
}
@@ -156,8 +158,8 @@ object YarnSparkHadoopUtil {
while (m.find()) {
val variable = m.group(1)
var replace = ""
- if (env.get(variable) != None) {
- replace = env.get(variable).get
+ if (env.contains(variable)) {
+ replace = env(variable)
} else {
// if this key is not configured for the child .. get it from the env
replace = System.getenv(variable)
@@ -235,13 +237,11 @@ object YarnSparkHadoopUtil {
YarnCommandBuilderUtils.quoteForBatchScript(arg)
} else {
val escaped = new StringBuilder("'")
- for (i <- 0 to arg.length() - 1) {
- arg.charAt(i) match {
- case '$' => escaped.append("\\$")
- case '"' => escaped.append("\\\"")
- case '\'' => escaped.append("'\\''")
- case c => escaped.append(c)
- }
+ arg.foreach {
+ case '$' => escaped.append("\\$")
+ case '"' => escaped.append("\\\"")
+ case '\'' => escaped.append("'\\''")
+ case c => escaped.append(c)
}
escaped.append("'").toString()
}
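The escaping change above is purely a rewrite from an index loop to arg.foreach with a pattern match; the produced string is identical. A self-contained version of the POSIX branch (the Windows path goes through quoteForBatchScript and is omitted here):

    object ShellEscapeSketch {
      // Single-quote an argument for a POSIX shell, escaping $, " and ' inside it.
      def escapeForShell(arg: String): String = {
        val escaped = new StringBuilder("'")
        arg.foreach {
          case '$'  => escaped.append("\\$")
          case '"'  => escaped.append("\\\"")
          case '\'' => escaped.append("'\\''")
          case c    => escaped.append(c)
        }
        escaped.append("'").toString()
      }

      // escapeForShell("a b")  == "'a b'"
      // escapeForShell("it's") == "'it'\\''s'" when written as a Scala literal
    }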
@@ -263,33 +263,6 @@ object YarnSparkHadoopUtil {
}
/**
- * Expand environment variable using Yarn API.
- * If environment.$$() is implemented, return the result of it.
- * Otherwise, return the result of environment.$()
- * Note: $$() is added in Hadoop 2.4.
- */
- private lazy val expandMethod =
- Try(classOf[Environment].getMethod("$$"))
- .getOrElse(classOf[Environment].getMethod("$"))
-
- def expandEnvironment(environment: Environment): String =
- expandMethod.invoke(environment).asInstanceOf[String]
-
- /**
- * Get class path separator using Yarn API.
- * If ApplicationConstants.CLASS_PATH_SEPARATOR is implemented, return it.
- * Otherwise, return File.pathSeparator
- * Note: CLASS_PATH_SEPARATOR is added in Hadoop 2.4.
- */
- private lazy val classPathSeparatorField =
- Try(classOf[ApplicationConstants].getField("CLASS_PATH_SEPARATOR"))
- .getOrElse(classOf[File].getField("pathSeparator"))
-
- def getClassPathSeparator(): String = {
- classPathSeparatorField.get(null).asInstanceOf[String]
- }
-
- /**
* Getting the initial target number of executors depends on whether dynamic allocation is
* enabled.
* If not using dynamic allocation it gets the number of executors requested by the user.
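addPathToEnvironment now joins entries with ApplicationConstants.CLASS_PATH_SEPARATOR unconditionally; the per-version field lookup that used to fall back to File.pathSeparator is removed in the hunk above. A sketch of the helper plus a usage, with illustrative directory names:

    import scala.collection.mutable.HashMap
    import org.apache.hadoop.yarn.api.ApplicationConstants

    object EnvPathSketch {
      def addPathToEnvironment(env: HashMap[String, String], key: String, value: String): Unit = {
        val newValue =
          if (env.contains(key)) env(key) + ApplicationConstants.CLASS_PATH_SEPARATOR + value
          else value
        env.put(key, newValue)
      }

      val env = HashMap[String, String]()
      addPathToEnvironment(env, "CLASSPATH", "{{PWD}}")
      addPathToEnvironment(env, "CLASSPATH", "{{PWD}}/__spark_conf__")
      // env("CLASSPATH") == "{{PWD}}<CPS>{{PWD}}/__spark_conf__"
    }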
diff --git a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala
index 7deaf0af94..dd2180a0f5 100644
--- a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala
+++ b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala
@@ -23,8 +23,6 @@ import java.util.Properties
import scala.collection.JavaConverters._
import scala.collection.mutable.{HashMap => MutableHashMap}
-import scala.reflect.ClassTag
-import scala.util.Try
import org.apache.commons.lang3.SerializationUtils
import org.apache.hadoop.conf.Configuration
@@ -67,19 +65,18 @@ class ClientSuite extends SparkFunSuite with Matchers with BeforeAndAfterAll
}
test("default Yarn application classpath") {
- getDefaultYarnApplicationClasspath should be(Some(Fixtures.knownDefYarnAppCP))
+ getDefaultYarnApplicationClasspath should be(Fixtures.knownDefYarnAppCP)
}
test("default MR application classpath") {
- getDefaultMRApplicationClasspath should be(Some(Fixtures.knownDefMRAppCP))
+ getDefaultMRApplicationClasspath should be(Fixtures.knownDefMRAppCP)
}
test("resultant classpath for an application that defines a classpath for YARN") {
withAppConf(Fixtures.mapYARNAppConf) { conf =>
val env = newEnv
populateHadoopClasspath(conf, env)
- classpath(env) should be(
- flatten(Fixtures.knownYARNAppCP, getDefaultMRApplicationClasspath))
+ classpath(env) should be(Fixtures.knownYARNAppCP +: getDefaultMRApplicationClasspath)
}
}
@@ -87,8 +84,7 @@ class ClientSuite extends SparkFunSuite with Matchers with BeforeAndAfterAll
withAppConf(Fixtures.mapMRAppConf) { conf =>
val env = newEnv
populateHadoopClasspath(conf, env)
- classpath(env) should be(
- flatten(getDefaultYarnApplicationClasspath, Fixtures.knownMRAppCP))
+ classpath(env) should be(getDefaultYarnApplicationClasspath :+ Fixtures.knownMRAppCP)
}
}
@@ -96,7 +92,7 @@ class ClientSuite extends SparkFunSuite with Matchers with BeforeAndAfterAll
withAppConf(Fixtures.mapAppConf) { conf =>
val env = newEnv
populateHadoopClasspath(conf, env)
- classpath(env) should be(flatten(Fixtures.knownYARNAppCP, Fixtures.knownMRAppCP))
+ classpath(env) should be(Array(Fixtures.knownYARNAppCP, Fixtures.knownMRAppCP))
}
}
@@ -104,14 +100,7 @@ class ClientSuite extends SparkFunSuite with Matchers with BeforeAndAfterAll
private val USER = "local:/userJar"
private val ADDED = "local:/addJar1,local:/addJar2,/addJar3"
- private val PWD =
- if (classOf[Environment].getMethods().exists(_.getName == "$$")) {
- "{{PWD}}"
- } else if (Utils.isWindows) {
- "%PWD%"
- } else {
- Environment.PWD.$()
- }
+ private val PWD = "{{PWD}}"
test("Local jar URIs") {
val conf = new Configuration()
@@ -388,26 +377,18 @@ class ClientSuite extends SparkFunSuite with Matchers with BeforeAndAfterAll
object Fixtures {
val knownDefYarnAppCP: Seq[String] =
- getFieldValue[Array[String], Seq[String]](classOf[YarnConfiguration],
- "DEFAULT_YARN_APPLICATION_CLASSPATH",
- Seq[String]())(a => a.toSeq)
-
+ YarnConfiguration.DEFAULT_YARN_APPLICATION_CLASSPATH.toSeq
val knownDefMRAppCP: Seq[String] =
- getFieldValue2[String, Array[String], Seq[String]](
- classOf[MRJobConfig],
- "DEFAULT_MAPREDUCE_APPLICATION_CLASSPATH",
- Seq[String]())(a => a.split(","))(a => a.toSeq)
+ MRJobConfig.DEFAULT_MAPREDUCE_APPLICATION_CLASSPATH.split(",").toSeq
- val knownYARNAppCP = Some(Seq("/known/yarn/path"))
+ val knownYARNAppCP = "/known/yarn/path"
- val knownMRAppCP = Some(Seq("/known/mr/path"))
+ val knownMRAppCP = "/known/mr/path"
- val mapMRAppConf =
- Map("mapreduce.application.classpath" -> knownMRAppCP.map(_.mkString(":")).get)
+ val mapMRAppConf = Map("mapreduce.application.classpath" -> knownMRAppCP)
- val mapYARNAppConf =
- Map(YarnConfiguration.YARN_APPLICATION_CLASSPATH -> knownYARNAppCP.map(_.mkString(":")).get)
+ val mapYARNAppConf = Map(YarnConfiguration.YARN_APPLICATION_CLASSPATH -> knownYARNAppCP)
val mapAppConf = mapYARNAppConf ++ mapMRAppConf
}
@@ -423,28 +404,6 @@ class ClientSuite extends SparkFunSuite with Matchers with BeforeAndAfterAll
def classpath(env: MutableHashMap[String, String]): Array[String] =
env(Environment.CLASSPATH.name).split(":|;|<CPS>")
- def flatten(a: Option[Seq[String]], b: Option[Seq[String]]): Array[String] =
- (a ++ b).flatten.toArray
-
- def getFieldValue[A, B](clazz: Class[_], field: String, defaults: => B)(mapTo: A => B): B = {
- Try(clazz.getField(field))
- .map(_.get(null).asInstanceOf[A])
- .toOption
- .map(mapTo)
- .getOrElse(defaults)
- }
-
- def getFieldValue2[A: ClassTag, A1: ClassTag, B](
- clazz: Class[_],
- field: String,
- defaults: => B)(mapTo: A => B)(mapTo1: A1 => B): B = {
- Try(clazz.getField(field)).map(_.get(null)).map {
- case v: A => mapTo(v)
- case v1: A1 => mapTo1(v1)
- case _ => defaults
- }.toOption.getOrElse(defaults)
- }
-
private def createClient(
sparkConf: SparkConf,
conf: Configuration = new Configuration(),
diff --git a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtilSuite.scala b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtilSuite.scala
index 7fbbe12609..a057618b39 100644
--- a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtilSuite.scala
+++ b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtilSuite.scala
@@ -22,8 +22,6 @@ import java.nio.charset.StandardCharsets
import com.google.common.io.{ByteStreams, Files}
import org.apache.hadoop.io.Text
-import org.apache.hadoop.yarn.api.ApplicationConstants
-import org.apache.hadoop.yarn.api.ApplicationConstants.Environment
import org.apache.hadoop.yarn.api.records.ApplicationAccessType
import org.apache.hadoop.yarn.conf.YarnConfiguration
import org.scalatest.Matchers
@@ -147,28 +145,6 @@ class YarnSparkHadoopUtilSuite extends SparkFunSuite with Matchers with Logging
}
- test("test expandEnvironment result") {
- val target = Environment.PWD
- if (classOf[Environment].getMethods().exists(_.getName == "$$")) {
- YarnSparkHadoopUtil.expandEnvironment(target) should be ("{{" + target + "}}")
- } else if (Utils.isWindows) {
- YarnSparkHadoopUtil.expandEnvironment(target) should be ("%" + target + "%")
- } else {
- YarnSparkHadoopUtil.expandEnvironment(target) should be ("$" + target)
- }
-
- }
-
- test("test getClassPathSeparator result") {
- if (classOf[ApplicationConstants].getFields().exists(_.getName == "CLASS_PATH_SEPARATOR")) {
- YarnSparkHadoopUtil.getClassPathSeparator() should be ("<CPS>")
- } else if (Utils.isWindows) {
- YarnSparkHadoopUtil.getClassPathSeparator() should be (";")
- } else {
- YarnSparkHadoopUtil.getClassPathSeparator() should be (":")
- }
- }
-
test("check different hadoop utils based on env variable") {
try {
System.setProperty("SPARK_YARN_MODE", "true")
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala
index d197daf7e0..14f721d6a7 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala
@@ -74,23 +74,21 @@ class FileScanRDD(
// Find a function that will return the FileSystem bytes read by this thread. Do this before
// apply readFunction, because it might read some bytes.
- private val getBytesReadCallback: Option[() => Long] =
+ private val getBytesReadCallback =
SparkHadoopUtil.get.getFSBytesReadOnThreadCallback()
- // For Hadoop 2.5+, we get our input bytes from thread-local Hadoop FileSystem statistics.
+ // We get our input bytes from thread-local Hadoop FileSystem statistics.
// If we do a coalesce, however, we are likely to compute multiple partitions in the same
// task and in the same thread, in which case we need to avoid override values written by
// previous partitions (SPARK-13071).
private def updateBytesRead(): Unit = {
- getBytesReadCallback.foreach { getBytesRead =>
- inputMetrics.setBytesRead(existingBytesRead + getBytesRead())
- }
+ inputMetrics.setBytesRead(existingBytesRead + getBytesReadCallback())
}
// If we can't get the bytes read from the FS stats, fall back to the file size,
// which may be inaccurate.
private def updateBytesReadWithFileSize(): Unit = {
- if (getBytesReadCallback.isEmpty && currentFile != null) {
+ if (currentFile != null) {
inputMetrics.incBytesRead(currentFile.length)
}
}
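On Hadoop 2.5+ thread-local FileSystem statistics always exist, which is why the bytes-read callback stops being an Option and the file-size fallback only needs to guard on currentFile. A sketch of what such a callback can be built from, directly on the Hadoop API (an illustration, not Spark's implementation):

    import scala.collection.JavaConverters._
    import org.apache.hadoop.fs.FileSystem

    object BytesReadSketch {
      // Bytes read by the current thread, summed across all FileSystem schemes.
      def bytesReadOnThread(): Long =
        FileSystem.getAllStatistics.asScala
          .map(_.getThreadStatistics.getBytesRead)
          .sum

      // Run a block and report how many filesystem bytes this thread read while in it.
      def measure[T](body: => T): (T, Long) = {
        val before = bytesReadOnThread()
        val result = body
        (result, bytesReadOnThread() - before)
      }
    }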
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/RecordReaderIterator.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/RecordReaderIterator.scala
index 938af25a96..c3dd6939ec 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/RecordReaderIterator.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/RecordReaderIterator.scala
@@ -59,10 +59,6 @@ class RecordReaderIterator[T](
override def close(): Unit = {
if (rowReader != null) {
try {
- // Close the reader and release it. Note: it's very important that we don't close the
- // reader more than once, since that exposes us to MAPREDUCE-5918 when running against
- // older Hadoop 2.x releases. That bug can lead to non-deterministic corruption issues
- // when reading compressed input.
rowReader.close()
} finally {
rowReader = null
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala
index 1b41352893..bfdc2cb0ac 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala
@@ -151,7 +151,7 @@ class HDFSMetadataLog[T <: AnyRef : ClassTag](sparkSession: SparkSession, path:
IOUtils.closeQuietly(output)
}
} catch {
- case e: IOException if isFileAlreadyExistsException(e) =>
+ case e: FileAlreadyExistsException =>
// Failed to create "tempPath". There are two cases:
// 1. Someone is creating "tempPath" too.
// 2. This is a restart. "tempPath" has already been created but not moved to the final
@@ -190,7 +190,7 @@ class HDFSMetadataLog[T <: AnyRef : ClassTag](sparkSession: SparkSession, path:
val crcPath = new Path(tempPath.getParent(), s".${tempPath.getName()}.crc")
if (fileManager.exists(crcPath)) fileManager.delete(crcPath)
} catch {
- case e: IOException if isFileAlreadyExistsException(e) =>
+ case e: FileAlreadyExistsException =>
// If "rename" fails, it means some other "HDFSMetadataLog" has committed the batch.
// So throw an exception to tell the user this is not a valid behavior.
throw new ConcurrentModificationException(
@@ -206,13 +206,6 @@ class HDFSMetadataLog[T <: AnyRef : ClassTag](sparkSession: SparkSession, path:
}
}
- private def isFileAlreadyExistsException(e: IOException): Boolean = {
- e.isInstanceOf[FileAlreadyExistsException] ||
- // Old Hadoop versions don't throw FileAlreadyExistsException. Although it's fixed in
- // HADOOP-9361 in Hadoop 2.5, we still need to support old Hadoop versions.
- (e.getMessage != null && e.getMessage.startsWith("File already exists: "))
- }
-
/**
* @return the deserialized metadata in a batch file, or None if file not exist.
* @throws IllegalArgumentException when path does not point to a batch file.
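HADOOP-9361 (fixed in Hadoop 2.5) guarantees a typed org.apache.hadoop.fs.FileAlreadyExistsException, so the message-prefix check is gone and the catch is a plain typed case. A minimal sketch of the rename-without-overwrite pattern; the paths and exception message are placeholders:

    import java.util.ConcurrentModificationException
    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.fs.{FileAlreadyExistsException, FileContext, Options, Path}

    object CommitBatchSketch {
      def commit(fc: FileContext, tempPath: Path, finalPath: Path): Unit =
        try {
          // Options.Rename.NONE fails instead of overwriting when the target exists.
          fc.rename(tempPath, finalPath, Options.Rename.NONE)
        } catch {
          case _: FileAlreadyExistsException =>
            // Another writer committed this batch first.
            throw new ConcurrentModificationException(s"Batch file $finalPath already exists")
        }

      def main(args: Array[String]): Unit = {
        val fc = FileContext.getFileContext(new Configuration())
        commit(fc, new Path("/tmp/metadata/.0.tmp"), new Path("/tmp/metadata/0"))
      }
    }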
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/IsolatedClientLoader.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/IsolatedClientLoader.scala
index 63fdd6b090..d2487a2c03 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/IsolatedClientLoader.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/IsolatedClientLoader.scala
@@ -52,7 +52,7 @@ private[hive] object IsolatedClientLoader extends Logging {
barrierPrefixes: Seq[String] = Seq.empty): IsolatedClientLoader = synchronized {
val resolvedVersion = hiveVersion(hiveMetastoreVersion)
// We will first try to share Hadoop classes. If we cannot resolve the Hadoop artifact
- // with the given version, we will use Hadoop 2.4.0 and then will not share Hadoop classes.
+ // with the given version, we will use Hadoop 2.6 and then will not share Hadoop classes.
var sharesHadoopClasses = true
val files = if (resolvedVersions.contains((resolvedVersion, hadoopVersion))) {
resolvedVersions((resolvedVersion, hadoopVersion))
@@ -63,17 +63,14 @@ private[hive] object IsolatedClientLoader extends Logging {
} catch {
case e: RuntimeException if e.getMessage.contains("hadoop") =>
// If the error message contains hadoop, it is probably because the hadoop
- // version cannot be resolved (e.g. it is a vendor specific version like
- // 2.0.0-cdh4.1.1). If it is the case, we will try just
- // "org.apache.hadoop:hadoop-client:2.4.0". "org.apache.hadoop:hadoop-client:2.4.0"
- // is used just because we used to hard code it as the hadoop artifact to download.
- logWarning(s"Failed to resolve Hadoop artifacts for the version ${hadoopVersion}. " +
- s"We will change the hadoop version from ${hadoopVersion} to 2.4.0 and try again. " +
+ // version cannot be resolved.
+ logWarning(s"Failed to resolve Hadoop artifacts for the version $hadoopVersion. " +
+            s"We will change the hadoop version from $hadoopVersion to 2.6.5 and try again. " +
"Hadoop classes will not be shared between Spark and Hive metastore client. " +
"It is recommended to set jars used by Hive metastore client through " +
"spark.sql.hive.metastore.jars in the production environment.")
sharesHadoopClasses = false
- (downloadVersion(resolvedVersion, "2.4.0", ivyPath), "2.4.0")
+ (downloadVersion(resolvedVersion, "2.6.5", ivyPath), "2.6.5")
}
resolvedVersions.put((resolvedVersion, actualHadoopVersion), downloadedFiles)
resolvedVersions((resolvedVersion, actualHadoopVersion))