-rw-r--r--  core/src/main/scala/org/apache/spark/SparkContext.scala | 1
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/FaultToleranceTest.scala | 28
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/LocalSparkCluster.scala | 6
-rw-r--r--  core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala | 2
-rw-r--r--  core/src/main/scala/org/apache/spark/executor/ExecutorSource.scala | 2
-rw-r--r--  core/src/main/scala/org/apache/spark/network/netty/ShuffleCopier.scala | 2
-rw-r--r--  core/src/main/scala/org/apache/spark/rdd/CartesianRDD.scala | 2
-rw-r--r--  core/src/main/scala/org/apache/spark/rdd/PartitionPruningRDD.scala | 8
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/cluster/ClusterScheduler.scala | 13
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/cluster/ClusterTaskSetManager.scala | 9
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala | 1
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/cluster/SimrSchedulerBackend.scala | 13
-rw-r--r--  core/src/main/scala/org/apache/spark/ui/exec/ExecutorsUI.scala | 23
-rw-r--r--  core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala | 2
-rw-r--r--  core/src/main/scala/org/apache/spark/util/collection/PrimitiveVector.scala | 46
-rw-r--r--  core/src/test/scala/org/apache/spark/LocalSparkContext.scala | 2
-rw-r--r--  core/src/test/scala/org/apache/spark/PartitionPruningRDDSuite.scala | 45
-rw-r--r--  core/src/test/scala/org/apache/spark/PartitioningSuite.scala | 10
-rw-r--r--  core/src/test/scala/org/apache/spark/rdd/PartitionPruningRDDSuite.scala | 86
-rw-r--r--  core/src/test/scala/org/apache/spark/util/collection/PrimitiveVectorSuite.scala | 117
-rw-r--r--  docs/running-on-yarn.md | 2
-rw-r--r--  examples/src/main/scala/org/apache/spark/examples/BroadcastTest.scala | 10
-rw-r--r--  examples/src/main/scala/org/apache/spark/examples/LocalALS.scala | 2
-rw-r--r--  examples/src/main/scala/org/apache/spark/examples/MultiBroadcastTest.scala | 15
-rw-r--r--  examples/src/main/scala/org/apache/spark/examples/SparkTC.scala | 2
-rw-r--r--  examples/src/main/scala/org/apache/spark/streaming/examples/ActorWordCount.scala | 2
-rw-r--r--  examples/src/main/scala/org/apache/spark/streaming/examples/MQTTWordCount.scala | 4
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala | 6
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala | 7
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/dstream/FlumeInputDStream.scala | 4
-rw-r--r--  streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala | 4
-rw-r--r--  streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala | 2
-rw-r--r--  yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala | 53
-rw-r--r--  yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala | 63
-rw-r--r--  yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientDistributedCacheManager.scala | 6
-rw-r--r--  yarn/src/main/scala/org/apache/spark/deploy/yarn/WorkerRunnable.scala | 13
-rw-r--r--  yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala | 16
-rw-r--r--  yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala | 5
-rw-r--r--  yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientDistributedCacheManagerSuite.scala | 2
39 files changed, 415 insertions(+), 221 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index a12f8860b9..b9fe7f604e 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -24,7 +24,6 @@ import java.util.concurrent.atomic.AtomicInteger
import scala.collection.Map
import scala.collection.generic.Growable
-import scala.collection.JavaConverters._
import scala.collection.mutable.ArrayBuffer
import scala.collection.mutable.HashMap
import scala.reflect.{ ClassTag, classTag}
diff --git a/core/src/main/scala/org/apache/spark/deploy/FaultToleranceTest.scala b/core/src/main/scala/org/apache/spark/deploy/FaultToleranceTest.scala
index 668032a3a2..0aa8852649 100644
--- a/core/src/main/scala/org/apache/spark/deploy/FaultToleranceTest.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/FaultToleranceTest.scala
@@ -1,19 +1,19 @@
/*
*
- * * Licensed to the Apache Software Foundation (ASF) under one or more
- * * contributor license agreements. See the NOTICE file distributed with
- * * this work for additional information regarding copyright ownership.
- * * The ASF licenses this file to You under the Apache License, Version 2.0
- * * (the "License"); you may not use this file except in compliance with
- * * the License. You may obtain a copy of the License at
- * *
- * * http://www.apache.org/licenses/LICENSE-2.0
- * *
- * * Unless required by applicable law or agreed to in writing, software
- * * distributed under the License is distributed on an "AS IS" BASIS,
- * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * * See the License for the specific language governing permissions and
- * * limitations under the License.
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
*
*/
diff --git a/core/src/main/scala/org/apache/spark/deploy/LocalSparkCluster.scala b/core/src/main/scala/org/apache/spark/deploy/LocalSparkCluster.scala
index 94cf4ff88b..59d12a3e6f 100644
--- a/core/src/main/scala/org/apache/spark/deploy/LocalSparkCluster.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/LocalSparkCluster.scala
@@ -17,12 +17,12 @@
package org.apache.spark.deploy
-import akka.actor.{ActorRef, Props, Actor, ActorSystem, Terminated}
+import akka.actor.ActorSystem
import org.apache.spark.deploy.worker.Worker
import org.apache.spark.deploy.master.Master
-import org.apache.spark.util.{Utils, AkkaUtils}
-import org.apache.spark.{Logging}
+import org.apache.spark.util.Utils
+import org.apache.spark.Logging
import scala.collection.mutable.ArrayBuffer
diff --git a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
index 16d8f81a71..a98ec06be9 100644
--- a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
+++ b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
@@ -22,7 +22,7 @@ import java.nio.ByteBuffer
import akka.actor._
import akka.remote._
-import org.apache.spark.{Logging, SparkEnv}
+import org.apache.spark.Logging
import org.apache.spark.TaskState.TaskState
import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages._
import org.apache.spark.util.{Utils, AkkaUtils}
diff --git a/core/src/main/scala/org/apache/spark/executor/ExecutorSource.scala b/core/src/main/scala/org/apache/spark/executor/ExecutorSource.scala
index 34ed9c8f73..97176e4f5b 100644
--- a/core/src/main/scala/org/apache/spark/executor/ExecutorSource.scala
+++ b/core/src/main/scala/org/apache/spark/executor/ExecutorSource.scala
@@ -20,8 +20,6 @@ package org.apache.spark.executor
import com.codahale.metrics.{Gauge, MetricRegistry}
import org.apache.hadoop.fs.FileSystem
-import org.apache.hadoop.hdfs.DistributedFileSystem
-import org.apache.hadoop.fs.LocalFileSystem
import scala.collection.JavaConversions._
diff --git a/core/src/main/scala/org/apache/spark/network/netty/ShuffleCopier.scala b/core/src/main/scala/org/apache/spark/network/netty/ShuffleCopier.scala
index 481ff8c3e0..b1e1576dad 100644
--- a/core/src/main/scala/org/apache/spark/network/netty/ShuffleCopier.scala
+++ b/core/src/main/scala/org/apache/spark/network/netty/ShuffleCopier.scala
@@ -76,7 +76,7 @@ private[spark] object ShuffleCopier extends Logging {
extends FileClientHandler with Logging {
override def handle(ctx: ChannelHandlerContext, in: ByteBuf, header: FileHeader) {
- logDebug("Received Block: " + header.blockId + " (" + header.fileLen + "B)");
+ logDebug("Received Block: " + header.blockId + " (" + header.fileLen + "B)")
resultCollectCallBack(header.blockId, header.fileLen.toLong, in.readBytes(header.fileLen))
}
diff --git a/core/src/main/scala/org/apache/spark/rdd/CartesianRDD.scala b/core/src/main/scala/org/apache/spark/rdd/CartesianRDD.scala
index 4fb7f3aace..d2a3d60965 100644
--- a/core/src/main/scala/org/apache/spark/rdd/CartesianRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/CartesianRDD.scala
@@ -71,7 +71,7 @@ class CartesianRDD[T: ClassTag, U:ClassTag](
override def compute(split: Partition, context: TaskContext) = {
val currSplit = split.asInstanceOf[CartesianPartition]
for (x <- rdd1.iterator(currSplit.s1, context);
- y <- rdd2.iterator(currSplit.s2, context)) yield (x, y)
+ y <- rdd2.iterator(currSplit.s2, context)) yield (x, y)
}
override def getDependencies: Seq[Dependency[_]] = List(
diff --git a/core/src/main/scala/org/apache/spark/rdd/PartitionPruningRDD.scala b/core/src/main/scala/org/apache/spark/rdd/PartitionPruningRDD.scala
index bb9b309a70..ea8885b36e 100644
--- a/core/src/main/scala/org/apache/spark/rdd/PartitionPruningRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/PartitionPruningRDD.scala
@@ -35,11 +35,13 @@ class PruneDependency[T](rdd: RDD[T], @transient partitionFilterFunc: Int => Boo
extends NarrowDependency[T](rdd) {
@transient
- val partitions: Array[Partition] = rdd.partitions.zipWithIndex
- .filter(s => partitionFilterFunc(s._2))
+ val partitions: Array[Partition] = rdd.partitions
+ .filter(s => partitionFilterFunc(s.index)).zipWithIndex
.map { case(split, idx) => new PartitionPruningRDDPartition(idx, split) : Partition }
- override def getParents(partitionId: Int) = List(partitions(partitionId).index)
+ override def getParents(partitionId: Int) = {
+ List(partitions(partitionId).asInstanceOf[PartitionPruningRDDPartition].parentSplit.index)
+ }
}
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/ClusterScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/ClusterScheduler.scala
index 9975ec1ab6..7c9d6a93e4 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/ClusterScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/ClusterScheduler.scala
@@ -250,7 +250,6 @@ private[spark] class ClusterScheduler(val sc: SparkContext)
def statusUpdate(tid: Long, state: TaskState, serializedData: ByteBuffer) {
var failedExecutor: Option[String] = None
- var taskFailed = false
synchronized {
try {
if (state == TaskState.LOST && taskIdToExecutorId.contains(tid)) {
@@ -270,9 +269,6 @@ private[spark] class ClusterScheduler(val sc: SparkContext)
}
taskIdToExecutorId.remove(tid)
}
- if (state == TaskState.FAILED) {
- taskFailed = true
- }
activeTaskSets.get(taskSetId).foreach { taskSet =>
if (state == TaskState.FINISHED) {
taskSet.removeRunningTask(tid)
@@ -294,10 +290,6 @@ private[spark] class ClusterScheduler(val sc: SparkContext)
dagScheduler.executorLost(failedExecutor.get)
backend.reviveOffers()
}
- if (taskFailed) {
- // Also revive offers if a task had failed for some reason other than host lost
- backend.reviveOffers()
- }
}
def handleTaskGettingResult(taskSetManager: ClusterTaskSetManager, tid: Long) {
@@ -317,8 +309,9 @@ private[spark] class ClusterScheduler(val sc: SparkContext)
taskState: TaskState,
reason: Option[TaskEndReason]) = synchronized {
taskSetManager.handleFailedTask(tid, taskState, reason)
- if (taskState == TaskState.FINISHED) {
- // The task finished successfully but the result was lost, so we should revive offers.
+ if (taskState != TaskState.KILLED) {
+ // Need to revive offers again now that the task set manager state has been updated to
+ // reflect failed tasks that need to be re-run.
backend.reviveOffers()
}
}
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/ClusterTaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/ClusterTaskSetManager.scala
index ee47aaffca..4c5eca8537 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/ClusterTaskSetManager.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/ClusterTaskSetManager.scala
@@ -17,6 +17,7 @@
package org.apache.spark.scheduler.cluster
+import java.io.NotSerializableException
import java.util.Arrays
import scala.collection.mutable.ArrayBuffer
@@ -484,6 +485,14 @@ private[spark] class ClusterTaskSetManager(
case ef: ExceptionFailure =>
sched.dagScheduler.taskEnded(tasks(index), ef, null, null, info, ef.metrics.getOrElse(null))
+ if (ef.className == classOf[NotSerializableException].getName()) {
+ // If the task result wasn't serializable, there's no point in trying to re-execute it.
+ logError("Task %s:%s had a not serializable result: %s; not retrying".format(
+ taskSet.id, index, ef.description))
+ abort("Task %s:%s had a not serializable result: %s".format(
+ taskSet.id, index, ef.description))
+ return
+ }
val key = ef.description
val now = clock.getTime()
val (printFull, dupCount) = {
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
index 03cf1e2853..821c30a119 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
@@ -194,6 +194,7 @@ class CoarseGrainedSchedulerBackend(scheduler: ClusterScheduler, actorSystem: Ac
}
override def stop() {
+ stopExecutors()
try {
if (driverActor != null) {
val future = driverActor.ask(StopDriver)(timeout)
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/SimrSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/SimrSchedulerBackend.scala
index 6b91935400..e000531a26 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/SimrSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/SimrSchedulerBackend.scala
@@ -31,10 +31,6 @@ private[spark] class SimrSchedulerBackend(
val tmpPath = new Path(driverFilePath + "_tmp")
val filePath = new Path(driverFilePath)
- val uiFilePath = driverFilePath + "_ui"
- val tmpUiPath = new Path(uiFilePath + "_tmp")
- val uiPath = new Path(uiFilePath)
-
val maxCores = System.getProperty("spark.simr.executor.cores", "1").toInt
override def start() {
@@ -49,30 +45,23 @@ private[spark] class SimrSchedulerBackend(
logInfo("Writing to HDFS file: " + driverFilePath)
logInfo("Writing Akka address: " + driverUrl)
- logInfo("Writing to HDFS file: " + uiFilePath)
logInfo("Writing Spark UI Address: " + sc.ui.appUIAddress)
// Create temporary file to prevent race condition where executors get empty driverUrl file
val temp = fs.create(tmpPath, true)
temp.writeUTF(driverUrl)
temp.writeInt(maxCores)
+ temp.writeUTF(sc.ui.appUIAddress)
temp.close()
// "Atomic" rename
fs.rename(tmpPath, filePath)
-
- // Write Spark UI Address to file
- val uiTemp = fs.create(tmpUiPath, true)
- uiTemp.writeUTF(sc.ui.appUIAddress)
- uiTemp.close()
- fs.rename(tmpUiPath, uiPath)
}
override def stop() {
val conf = new Configuration()
val fs = FileSystem.get(conf)
fs.delete(new Path(driverFilePath), false)
- super.stopExecutors()
super.stop()
}
}
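As a side note on the format change above: the backend now writes the driver URL, the core count, and the Spark UI address into a single HDFS file instead of two separate files. A hypothetical reader-side sketch (SIMR's actual consumer is not part of this patch, so the object name and the path argument are assumptions), reading the fields in the same order the backend writes them:

    import java.net.URI
    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.fs.{FileSystem, Path}

    // Reads the combined driver file: writeUTF(driverUrl), writeInt(maxCores), writeUTF(appUIAddress).
    object SimrDriverFileReaderSketch {
      def main(args: Array[String]) {
        val driverFilePath = args(0)  // hypothetical, e.g. an hdfs:// path
        val fs = FileSystem.get(new URI(driverFilePath), new Configuration())
        val in = fs.open(new Path(driverFilePath))
        val driverUrl = in.readUTF()      // written first
        val maxCores = in.readInt()       // then the executor core count
        val appUIAddress = in.readUTF()   // then the UI address (new in this patch)
        in.close()
        println(Seq(driverUrl, maxCores, appUIAddress).mkString(" "))
      }
    }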
diff --git a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsUI.scala b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsUI.scala
index 42e9be6e19..e596690bc3 100644
--- a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsUI.scala
+++ b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsUI.scala
@@ -76,7 +76,7 @@ private[spark] class ExecutorsUI(val sc: SparkContext) {
</tr>
}
- val execInfo = for (b <- 0 until storageStatusList.size) yield getExecInfo(b)
+ val execInfo = for (statusId <- 0 until storageStatusList.size) yield getExecInfo(statusId)
val execTable = UIUtils.listingTable(execHead, execRow, execInfo)
val content =
@@ -99,16 +99,17 @@ private[spark] class ExecutorsUI(val sc: SparkContext) {
UIUtils.headerSparkPage(content, sc, "Executors (" + execInfo.size + ")", Executors)
}
- def getExecInfo(a: Int): Seq[String] = {
- val execId = sc.getExecutorStorageStatus(a).blockManagerId.executorId
- val hostPort = sc.getExecutorStorageStatus(a).blockManagerId.hostPort
- val rddBlocks = sc.getExecutorStorageStatus(a).blocks.size.toString
- val memUsed = sc.getExecutorStorageStatus(a).memUsed().toString
- val maxMem = sc.getExecutorStorageStatus(a).maxMem.toString
- val diskUsed = sc.getExecutorStorageStatus(a).diskUsed().toString
- val activeTasks = listener.executorToTasksActive.get(a.toString).map(l => l.size).getOrElse(0)
- val failedTasks = listener.executorToTasksFailed.getOrElse(a.toString, 0)
- val completedTasks = listener.executorToTasksComplete.getOrElse(a.toString, 0)
+ def getExecInfo(statusId: Int): Seq[String] = {
+ val status = sc.getExecutorStorageStatus(statusId)
+ val execId = status.blockManagerId.executorId
+ val hostPort = status.blockManagerId.hostPort
+ val rddBlocks = status.blocks.size.toString
+ val memUsed = status.memUsed().toString
+ val maxMem = status.maxMem.toString
+ val diskUsed = status.diskUsed().toString
+ val activeTasks = listener.executorToTasksActive.getOrElse(execId, HashSet.empty[Long]).size
+ val failedTasks = listener.executorToTasksFailed.getOrElse(execId, 0)
+ val completedTasks = listener.executorToTasksComplete.getOrElse(execId, 0)
val totalTasks = activeTasks + failedTasks + completedTasks
Seq(
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
index c1c7aa70e6..fbd822867f 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
@@ -133,7 +133,7 @@ private[spark] class StagePage(parent: JobProgressUI) {
summary ++
<h4>Summary Metrics for {numCompleted} Completed Tasks</h4> ++
<div>{summaryTable.getOrElse("No tasks have reported metrics yet.")}</div> ++
- <h4>Tasks</h4> ++ taskTable;
+ <h4>Tasks</h4> ++ taskTable
headerSparkPage(content, parent.sc, "Details for Stage %d".format(stageId), Stages)
}
diff --git a/core/src/main/scala/org/apache/spark/util/collection/PrimitiveVector.scala b/core/src/main/scala/org/apache/spark/util/collection/PrimitiveVector.scala
index 465c221d5f..b84eb65c62 100644
--- a/core/src/main/scala/org/apache/spark/util/collection/PrimitiveVector.scala
+++ b/core/src/main/scala/org/apache/spark/util/collection/PrimitiveVector.scala
@@ -19,35 +19,51 @@ package org.apache.spark.util.collection
import scala.reflect.ClassTag
-/** Provides a simple, non-threadsafe, array-backed vector that can store primitives. */
+/**
+ * An append-only, non-threadsafe, array-backed vector that is optimized for primitive types.
+ */
private[spark]
class PrimitiveVector[@specialized(Long, Int, Double) V: ClassTag](initialSize: Int = 64) {
- private var numElements = 0
- private var array: Array[V] = _
+ private var _numElements = 0
+ private var _array: Array[V] = _
// NB: This must be separate from the declaration, otherwise the specialized parent class
- // will get its own array with the same initial size. TODO: Figure out why...
- array = new Array[V](initialSize)
+ // will get its own array with the same initial size.
+ _array = new Array[V](initialSize)
def apply(index: Int): V = {
- require(index < numElements)
- array(index)
+ require(index < _numElements)
+ _array(index)
}
def +=(value: V) {
- if (numElements == array.length) { resize(array.length * 2) }
- array(numElements) = value
- numElements += 1
+ if (_numElements == _array.length) {
+ resize(_array.length * 2)
+ }
+ _array(_numElements) = value
+ _numElements += 1
}
- def length = numElements
+ def capacity: Int = _array.length
+
+ def length: Int = _numElements
+
+ def size: Int = _numElements
+
+ /** Gets the underlying array backing this vector. */
+ def array: Array[V] = _array
- def getUnderlyingArray = array
+ /** Trims this vector so that the capacity is equal to the size. */
+ def trim(): PrimitiveVector[V] = resize(size)
/** Resizes the array, dropping elements if the total length decreases. */
- def resize(newLength: Int) {
+ def resize(newLength: Int): PrimitiveVector[V] = {
val newArray = new Array[V](newLength)
- array.copyToArray(newArray)
- array = newArray
+ _array.copyToArray(newArray)
+ _array = newArray
+ if (newLength < _numElements) {
+ _numElements = newLength
+ }
+ this
}
}
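For reference, a minimal usage sketch of the PrimitiveVector API as it stands after this change (illustrative only; the class is private[spark], so real callers live under org.apache.spark):

    package org.apache.spark.util.collection

    object PrimitiveVectorSketch {
      def main(args: Array[String]) {
        val vector = new PrimitiveVector[Int](initialSize = 4)
        for (i <- 0 until 5) {
          vector += i                    // grows by doubling: capacity 4 -> 8 on the fifth append
        }
        assert(vector.length == 5 && vector.size == 5)
        assert(vector.capacity == 8)     // length of the backing array
        vector.trim()                    // shrink capacity down to size
        assert(vector.capacity == 5)
        vector.resize(3)                 // shrinking below size drops elements
        assert(vector.size == 3 && vector.capacity == 3)
        println(vector.array.take(vector.size).mkString(","))  // prints 0,1,2
      }
    }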
diff --git a/core/src/test/scala/org/apache/spark/LocalSparkContext.scala b/core/src/test/scala/org/apache/spark/LocalSparkContext.scala
index 459e257d79..8dd5786da6 100644
--- a/core/src/test/scala/org/apache/spark/LocalSparkContext.scala
+++ b/core/src/test/scala/org/apache/spark/LocalSparkContext.scala
@@ -30,7 +30,7 @@ trait LocalSparkContext extends BeforeAndAfterEach with BeforeAndAfterAll { self
@transient var sc: SparkContext = _
override def beforeAll() {
- InternalLoggerFactory.setDefaultFactory(new Slf4JLoggerFactory());
+ InternalLoggerFactory.setDefaultFactory(new Slf4JLoggerFactory())
super.beforeAll()
}
diff --git a/core/src/test/scala/org/apache/spark/PartitionPruningRDDSuite.scala b/core/src/test/scala/org/apache/spark/PartitionPruningRDDSuite.scala
deleted file mode 100644
index 21f16ef2c6..0000000000
--- a/core/src/test/scala/org/apache/spark/PartitionPruningRDDSuite.scala
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark
-
-import org.scalatest.FunSuite
-import org.apache.spark.SparkContext._
-import org.apache.spark.rdd.{RDD, PartitionPruningRDD}
-
-
-class PartitionPruningRDDSuite extends FunSuite with SharedSparkContext {
-
- test("Pruned Partitions inherit locality prefs correctly") {
- class TestPartition(i: Int) extends Partition {
- def index = i
- }
- val rdd = new RDD[Int](sc, Nil) {
- override protected def getPartitions = {
- Array[Partition](
- new TestPartition(1),
- new TestPartition(2),
- new TestPartition(3))
- }
- def compute(split: Partition, context: TaskContext) = {Iterator()}
- }
- val prunedRDD = PartitionPruningRDD.create(rdd, {x => if (x==2) true else false})
- val p = prunedRDD.partitions(0)
- assert(p.index == 2)
- assert(prunedRDD.partitions.length == 1)
- }
-}
diff --git a/core/src/test/scala/org/apache/spark/PartitioningSuite.scala b/core/src/test/scala/org/apache/spark/PartitioningSuite.scala
index 7d938917f2..1374d01774 100644
--- a/core/src/test/scala/org/apache/spark/PartitioningSuite.scala
+++ b/core/src/test/scala/org/apache/spark/PartitioningSuite.scala
@@ -142,11 +142,11 @@ class PartitioningSuite extends FunSuite with SharedSparkContext {
.filter(_ >= 0.0)
// Run the partitions, including the consecutive empty ones, through StatCounter
- val stats: StatCounter = rdd.stats();
- assert(abs(6.0 - stats.sum) < 0.01);
- assert(abs(6.0/2 - rdd.mean) < 0.01);
- assert(abs(1.0 - rdd.variance) < 0.01);
- assert(abs(1.0 - rdd.stdev) < 0.01);
+ val stats: StatCounter = rdd.stats()
+ assert(abs(6.0 - stats.sum) < 0.01)
+ assert(abs(6.0/2 - rdd.mean) < 0.01)
+ assert(abs(1.0 - rdd.variance) < 0.01)
+ assert(abs(1.0 - rdd.stdev) < 0.01)
// Add other tests here for classes that should be able to handle empty partitions correctly
}
diff --git a/core/src/test/scala/org/apache/spark/rdd/PartitionPruningRDDSuite.scala b/core/src/test/scala/org/apache/spark/rdd/PartitionPruningRDDSuite.scala
new file mode 100644
index 0000000000..53a7b7c44d
--- /dev/null
+++ b/core/src/test/scala/org/apache/spark/rdd/PartitionPruningRDDSuite.scala
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.rdd
+
+import org.scalatest.FunSuite
+import org.apache.spark.{TaskContext, Partition, SharedSparkContext}
+
+
+class PartitionPruningRDDSuite extends FunSuite with SharedSparkContext {
+
+
+ test("Pruned Partitions inherit locality prefs correctly") {
+
+ val rdd = new RDD[Int](sc, Nil) {
+ override protected def getPartitions = {
+ Array[Partition](
+ new TestPartition(0, 1),
+ new TestPartition(1, 1),
+ new TestPartition(2, 1))
+ }
+
+ def compute(split: Partition, context: TaskContext) = {
+ Iterator()
+ }
+ }
+ val prunedRDD = PartitionPruningRDD.create(rdd, {
+ x => if (x == 2) true else false
+ })
+ assert(prunedRDD.partitions.length == 1)
+ val p = prunedRDD.partitions(0)
+ assert(p.index == 0)
+ assert(p.asInstanceOf[PartitionPruningRDDPartition].parentSplit.index == 2)
+ }
+
+
+ test("Pruned Partitions can be unioned ") {
+
+ val rdd = new RDD[Int](sc, Nil) {
+ override protected def getPartitions = {
+ Array[Partition](
+ new TestPartition(0, 4),
+ new TestPartition(1, 5),
+ new TestPartition(2, 6))
+ }
+
+ def compute(split: Partition, context: TaskContext) = {
+ List(split.asInstanceOf[TestPartition].testValue).iterator
+ }
+ }
+ val prunedRDD1 = PartitionPruningRDD.create(rdd, {
+ x => if (x == 0) true else false
+ })
+
+ val prunedRDD2 = PartitionPruningRDD.create(rdd, {
+ x => if (x == 2) true else false
+ })
+
+ val merged = prunedRDD1 ++ prunedRDD2
+ assert(merged.count() == 2)
+ val take = merged.take(2)
+ assert(take.apply(0) == 4)
+ assert(take.apply(1) == 6)
+ }
+}
+
+class TestPartition(i: Int, value: Int) extends Partition with Serializable {
+ def index = i
+
+ def testValue = this.value
+
+}
diff --git a/core/src/test/scala/org/apache/spark/util/collection/PrimitiveVectorSuite.scala b/core/src/test/scala/org/apache/spark/util/collection/PrimitiveVectorSuite.scala
new file mode 100644
index 0000000000..970dade628
--- /dev/null
+++ b/core/src/test/scala/org/apache/spark/util/collection/PrimitiveVectorSuite.scala
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.util.collection
+
+import org.scalatest.FunSuite
+
+import org.apache.spark.util.SizeEstimator
+
+class PrimitiveVectorSuite extends FunSuite {
+
+ test("primitive value") {
+ val vector = new PrimitiveVector[Int]
+
+ for (i <- 0 until 1000) {
+ vector += i
+ assert(vector(i) === i)
+ }
+
+ assert(vector.size === 1000)
+ assert(vector.size == vector.length)
+ intercept[IllegalArgumentException] {
+ vector(1000)
+ }
+
+ for (i <- 0 until 1000) {
+ assert(vector(i) == i)
+ }
+ }
+
+ test("non-primitive value") {
+ val vector = new PrimitiveVector[String]
+
+ for (i <- 0 until 1000) {
+ vector += i.toString
+ assert(vector(i) === i.toString)
+ }
+
+ assert(vector.size === 1000)
+ assert(vector.size == vector.length)
+ intercept[IllegalArgumentException] {
+ vector(1000)
+ }
+
+ for (i <- 0 until 1000) {
+ assert(vector(i) == i.toString)
+ }
+ }
+
+ test("ideal growth") {
+ val vector = new PrimitiveVector[Long](initialSize = 1)
+ vector += 1
+ for (i <- 1 until 1024) {
+ vector += i
+ assert(vector.size === i + 1)
+ assert(vector.capacity === Integer.highestOneBit(i) * 2)
+ }
+ assert(vector.capacity === 1024)
+ vector += 1024
+ assert(vector.capacity === 2048)
+ }
+
+ test("ideal size") {
+ val vector = new PrimitiveVector[Long](8192)
+ for (i <- 0 until 8192) {
+ vector += i
+ }
+ assert(vector.size === 8192)
+ assert(vector.capacity === 8192)
+ val actualSize = SizeEstimator.estimate(vector)
+ val expectedSize = 8192 * 8
+ // Make sure we are not allocating a significant amount of memory beyond our expected.
+ // Due to specialization wonkiness, we need to ensure we don't have 2 copies of the array.
+ assert(actualSize < expectedSize * 1.1)
+ }
+
+ test("resizing") {
+ val vector = new PrimitiveVector[Long]
+ for (i <- 0 until 4097) {
+ vector += i
+ }
+ assert(vector.size === 4097)
+ assert(vector.capacity === 8192)
+ vector.trim()
+ assert(vector.size === 4097)
+ assert(vector.capacity === 4097)
+ vector.resize(5000)
+ assert(vector.size === 4097)
+ assert(vector.capacity === 5000)
+ vector.resize(4000)
+ assert(vector.size === 4000)
+ assert(vector.capacity === 4000)
+ vector.resize(5000)
+ assert(vector.size === 4000)
+ assert(vector.capacity === 5000)
+ for (i <- 0 until 4000) {
+ assert(vector(i) == i)
+ }
+ intercept[IllegalArgumentException] {
+ vector(4000)
+ }
+ }
+}
diff --git a/docs/running-on-yarn.md b/docs/running-on-yarn.md
index 6fd1d0d150..4056e9c15d 100644
--- a/docs/running-on-yarn.md
+++ b/docs/running-on-yarn.md
@@ -37,6 +37,8 @@ System Properties:
* 'spark.yarn.applicationMaster.waitTries', property to set the number of times the ApplicationMaster waits for the the spark master and then also the number of tries it waits for the Spark Context to be intialized. Default is 10.
* 'spark.yarn.submit.file.replication', the HDFS replication level for the files uploaded into HDFS for the application. These include things like the spark jar, the app jar, and any distributed cache files/archives.
* 'spark.yarn.preserve.staging.files', set to true to preserve the staged files(spark jar, app jar, distributed cache files) at the end of the job rather then delete them.
+* 'spark.yarn.scheduler.heartbeat.interval-ms', the interval in ms in which the Spark application master heartbeats into the YARN ResourceManager. Default is 5 seconds.
+* 'spark.yarn.max.worker.failures', the maximum number of worker failures before failing the application. Default is the number of workers requested times 2 with minimum of 3.
# Launching Spark on YARN
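To make the defaults for the two properties added above concrete, a small sketch of the computations the ApplicationMaster performs further down in this patch (the numWorkers value here is hypothetical):

    object YarnDefaultsSketch {
      def main(args: Array[String]) {
        val numWorkers = 10  // hypothetical --num-workers value
        // Default for spark.yarn.max.worker.failures: numWorkers * 2, with a floor of 3.
        val maxNumWorkerFailures = System.getProperty("spark.yarn.max.worker.failures",
          math.max(numWorkers * 2, 3).toString).toInt
        println(maxNumWorkerFailures)   // 20 when unset; with numWorkers = 1 it would be 3
        // The heartbeat interval is read the same way, and later capped at half of
        // YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS.
        val heartbeatMs = System.getProperty(
          "spark.yarn.scheduler.heartbeat.interval-ms", "5000").toLong
        println(heartbeatMs)            // 5000 ms by default
      }
    }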
diff --git a/examples/src/main/scala/org/apache/spark/examples/BroadcastTest.scala b/examples/src/main/scala/org/apache/spark/examples/BroadcastTest.scala
index 529709c2f9..a119980992 100644
--- a/examples/src/main/scala/org/apache/spark/examples/BroadcastTest.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/BroadcastTest.scala
@@ -32,13 +32,13 @@ object BroadcastTest {
System.setProperty("spark.broadcast.factory", "org.apache.spark.broadcast." + bcName + "BroadcastFactory")
System.setProperty("spark.broadcast.blockSize", blockSize)
- val sc = new SparkContext(args(0), "Broadcast Test 2",
+ val sc = new SparkContext(args(0), "Broadcast Test",
System.getenv("SPARK_HOME"), Seq(System.getenv("SPARK_EXAMPLES_JAR")))
val slices = if (args.length > 1) args(1).toInt else 2
val num = if (args.length > 2) args(2).toInt else 1000000
- var arr1 = new Array[Int](num)
+ val arr1 = new Array[Int](num)
for (i <- 0 until arr1.length) {
arr1(i) = i
}
@@ -48,9 +48,9 @@ object BroadcastTest {
println("===========")
val startTime = System.nanoTime
val barr1 = sc.broadcast(arr1)
- sc.parallelize(1 to 10, slices).foreach {
- i => println(barr1.value.size)
- }
+ val observedSizes = sc.parallelize(1 to 10, slices).map(_ => barr1.value.size)
+ // Collect the small RDD so we can print the observed sizes locally.
+ observedSizes.collect().foreach(i => println(i))
println("Iteration %d took %.0f milliseconds".format(i, (System.nanoTime - startTime) / 1E6))
}
diff --git a/examples/src/main/scala/org/apache/spark/examples/LocalALS.scala b/examples/src/main/scala/org/apache/spark/examples/LocalALS.scala
index 4af45b2b4a..83db8b9e26 100644
--- a/examples/src/main/scala/org/apache/spark/examples/LocalALS.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/LocalALS.scala
@@ -120,7 +120,7 @@ object LocalALS {
System.exit(1)
}
}
- printf("Running with M=%d, U=%d, F=%d, iters=%d\n", M, U, F, ITERATIONS);
+ printf("Running with M=%d, U=%d, F=%d, iters=%d\n", M, U, F, ITERATIONS)
val R = generateR()
diff --git a/examples/src/main/scala/org/apache/spark/examples/MultiBroadcastTest.scala b/examples/src/main/scala/org/apache/spark/examples/MultiBroadcastTest.scala
index f79f0142b8..e1afc29f9a 100644
--- a/examples/src/main/scala/org/apache/spark/examples/MultiBroadcastTest.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/MultiBroadcastTest.scala
@@ -18,35 +18,38 @@
package org.apache.spark.examples
import org.apache.spark.SparkContext
+import org.apache.spark.rdd.RDD
object MultiBroadcastTest {
def main(args: Array[String]) {
if (args.length == 0) {
- System.err.println("Usage: BroadcastTest <master> [<slices>] [numElem]")
+ System.err.println("Usage: MultiBroadcastTest <master> [<slices>] [numElem]")
System.exit(1)
}
- val sc = new SparkContext(args(0), "Broadcast Test",
+ val sc = new SparkContext(args(0), "Multi-Broadcast Test",
System.getenv("SPARK_HOME"), Seq(System.getenv("SPARK_EXAMPLES_JAR")))
val slices = if (args.length > 1) args(1).toInt else 2
val num = if (args.length > 2) args(2).toInt else 1000000
- var arr1 = new Array[Int](num)
+ val arr1 = new Array[Int](num)
for (i <- 0 until arr1.length) {
arr1(i) = i
}
- var arr2 = new Array[Int](num)
+ val arr2 = new Array[Int](num)
for (i <- 0 until arr2.length) {
arr2(i) = i
}
val barr1 = sc.broadcast(arr1)
val barr2 = sc.broadcast(arr2)
- sc.parallelize(1 to 10, slices).foreach {
- i => println(barr1.value.size + barr2.value.size)
+ val observedSizes: RDD[(Int, Int)] = sc.parallelize(1 to 10, slices).map { _ =>
+ (barr1.value.size, barr2.value.size)
}
+ // Collect the small RDD so we can print the observed sizes locally.
+ observedSizes.collect().foreach(i => println(i))
System.exit(0)
}
diff --git a/examples/src/main/scala/org/apache/spark/examples/SparkTC.scala b/examples/src/main/scala/org/apache/spark/examples/SparkTC.scala
index 5a7a9d1bd8..8543ce0e32 100644
--- a/examples/src/main/scala/org/apache/spark/examples/SparkTC.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/SparkTC.scala
@@ -65,7 +65,7 @@ object SparkTC {
oldCount = nextCount
// Perform the join, obtaining an RDD of (y, (z, x)) pairs,
// then project the result to obtain the new (x, z) paths.
- tc = tc.union(tc.join(edges).map(x => (x._2._2, x._2._1))).distinct().cache();
+ tc = tc.union(tc.join(edges).map(x => (x._2._2, x._2._1))).distinct().cache()
nextCount = tc.count()
} while (nextCount != oldCount)
diff --git a/examples/src/main/scala/org/apache/spark/streaming/examples/ActorWordCount.scala b/examples/src/main/scala/org/apache/spark/streaming/examples/ActorWordCount.scala
index 128711aacd..50e3f9639c 100644
--- a/examples/src/main/scala/org/apache/spark/streaming/examples/ActorWordCount.scala
+++ b/examples/src/main/scala/org/apache/spark/streaming/examples/ActorWordCount.scala
@@ -121,7 +121,7 @@ object FeederActor {
println("Feeder started as:" + feeder)
- actorSystem.awaitTermination();
+ actorSystem.awaitTermination()
}
}
diff --git a/examples/src/main/scala/org/apache/spark/streaming/examples/MQTTWordCount.scala b/examples/src/main/scala/org/apache/spark/streaming/examples/MQTTWordCount.scala
index af698a01d5..ff332a0282 100644
--- a/examples/src/main/scala/org/apache/spark/streaming/examples/MQTTWordCount.scala
+++ b/examples/src/main/scala/org/apache/spark/streaming/examples/MQTTWordCount.scala
@@ -54,12 +54,12 @@ object MQTTPublisher {
client.connect()
- val msgtopic: MqttTopic = client.getTopic(topic);
+ val msgtopic: MqttTopic = client.getTopic(topic)
val msg: String = "hello mqtt demo for spark streaming"
while (true) {
val message: MqttMessage = new MqttMessage(String.valueOf(msg).getBytes())
- msgtopic.publish(message);
+ msgtopic.publish(message)
println("Published data. topic: " + msgtopic.getName() + " Message: " + message)
}
client.disconnect()
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
index bb9febad38..9271914eb5 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
@@ -94,7 +94,7 @@ class CheckpointWriter(checkpointDir: String) extends Logging {
fs.delete(file, false)
fs.rename(writeFile, file)
- val finishTime = System.currentTimeMillis();
+ val finishTime = System.currentTimeMillis()
logInfo("Checkpoint for time " + checkpointTime + " saved to file '" + file +
"', took " + bytes.length + " bytes and " + (finishTime - startTime) + " milliseconds")
return
@@ -124,7 +124,9 @@ class CheckpointWriter(checkpointDir: String) extends Logging {
def stop() {
synchronized {
- if (stopped) return ;
+ if (stopped) {
+ return
+ }
stopped = true
}
executor.shutdown()
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala
index ca0c905932..80dcf87491 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala
@@ -17,7 +17,7 @@
package org.apache.spark.streaming.api.java
-import java.lang.{Long => JLong, Integer => JInt}
+import java.lang.{Integer => JInt}
import java.io.InputStream
import java.util.{Map => JMap, List => JList}
@@ -36,10 +36,9 @@ import twitter4j.auth.Authorization
import org.apache.spark.rdd.RDD
import org.apache.spark.storage.StorageLevel
import org.apache.spark.api.java.function.{Function => JFunction, Function2 => JFunction2}
-import org.apache.spark.api.java.{JavaPairRDD, JavaRDDLike, JavaSparkContext, JavaRDD}
+import org.apache.spark.api.java.{JavaPairRDD, JavaSparkContext, JavaRDD}
import org.apache.spark.streaming._
import org.apache.spark.streaming.dstream._
-import org.apache.spark.streaming.receivers.{ActorReceiver, ReceiverSupervisorStrategy}
/**
* A StreamingContext is the main entry point for Spark Streaming functionality. Besides the basic
@@ -315,7 +314,7 @@ class JavaStreamingContext(val ssc: StreamingContext) {
implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[V]]
implicit val cmf: ClassTag[F] =
implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[F]]
- ssc.fileStream[K, V, F](directory);
+ ssc.fileStream[K, V, F](directory)
}
/**
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FlumeInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FlumeInputDStream.scala
index b72ab90e60..60d79175f1 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FlumeInputDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FlumeInputDStream.scala
@@ -138,8 +138,8 @@ class FlumeReceiver(
protected override def onStart() {
val responder = new SpecificResponder(
- classOf[AvroSourceProtocol], new FlumeEventServer(this));
- val server = new NettyServer(responder, new InetSocketAddress(host, port));
+ classOf[AvroSourceProtocol], new FlumeEventServer(this))
+ val server = new NettyServer(responder, new InetSocketAddress(host, port))
blockGenerator.start()
server.start()
logInfo("Flume receiver started")
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala
index a559db468a..7dc82decef 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala
@@ -124,9 +124,9 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
val clock = ssc.scheduler.clock.asInstanceOf[ManualClock]
val input = Seq(1, 2, 3, 4, 5)
Thread.sleep(1000)
- val transceiver = new NettyTransceiver(new InetSocketAddress("localhost", testPort));
+ val transceiver = new NettyTransceiver(new InetSocketAddress("localhost", testPort))
val client = SpecificRequestor.getClient(
- classOf[AvroSourceProtocol], transceiver);
+ classOf[AvroSourceProtocol], transceiver)
for (i <- 0 until input.size) {
val event = new AvroFlumeEvent
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala b/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala
index 126915abc9..2f34e812a1 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala
@@ -252,7 +252,7 @@ trait TestSuiteBase extends FunSuite with BeforeAndAfter with Logging {
Thread.sleep(500) // Give some time for the forgetting old RDDs to complete
} catch {
- case e: Exception => e.printStackTrace(); throw e;
+ case e: Exception => {e.printStackTrace(); throw e}
} finally {
ssc.stop()
}
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
index 4302ef4cda..a7baf0c36c 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
@@ -17,11 +17,11 @@
package org.apache.spark.deploy.yarn
-import java.io.IOException;
+import java.io.IOException
import java.net.Socket
-import java.security.PrivilegedExceptionAction
import java.util.concurrent.CopyOnWriteArrayList
import java.util.concurrent.atomic.{AtomicInteger, AtomicReference}
+
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.net.NetUtils
@@ -34,7 +34,7 @@ import org.apache.hadoop.yarn.ipc.YarnRPC
import org.apache.hadoop.yarn.util.{ConverterUtils, Records}
import org.apache.spark.{SparkContext, Logging}
import org.apache.spark.util.Utils
-import org.apache.hadoop.security.UserGroupInformation
+
import scala.collection.JavaConversions._
class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) extends Logging {
@@ -54,7 +54,9 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e
private val maxAppAttempts: Int = conf.getInt(YarnConfiguration.RM_AM_MAX_RETRIES,
YarnConfiguration.DEFAULT_RM_AM_MAX_RETRIES)
private var isLastAMRetry: Boolean = true
-
+ // default to numWorkers * 2, with minimum of 3
+ private val maxNumWorkerFailures = System.getProperty("spark.yarn.max.worker.failures",
+ math.max(args.numWorkers * 2, 3).toString()).toInt
def run() {
// setup the directories so things go to yarn approved directories rather
@@ -65,7 +67,7 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e
ShutdownHookManager.get().addShutdownHook(new AppMasterShutdownHook(this), 30)
appAttemptId = getApplicationAttemptId()
- isLastAMRetry = appAttemptId.getAttemptId() >= maxAppAttempts;
+ isLastAMRetry = appAttemptId.getAttemptId() >= maxAppAttempts
resourceManager = registerWithResourceManager()
// Workaround until hadoop moves to something which has
@@ -186,8 +188,8 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e
var successed = false
try {
// Copy
- var mainArgs: Array[String] = new Array[String](args.userArgs.size())
- args.userArgs.copyToArray(mainArgs, 0, args.userArgs.size())
+ var mainArgs: Array[String] = new Array[String](args.userArgs.size)
+ args.userArgs.copyToArray(mainArgs, 0, args.userArgs.size)
mainMethod.invoke(null, mainArgs)
// some job script has "System.exit(0)" at the end, for example SparkPi, SparkLR
// userThread will stop here unless it has uncaught exception thrown out
@@ -195,7 +197,7 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e
successed = true
} finally {
logDebug("finishing main")
- isLastAMRetry = true;
+ isLastAMRetry = true
if (successed) {
ApplicationMaster.this.finishApplicationMaster(FinalApplicationStatus.SUCCEEDED)
} else {
@@ -227,12 +229,13 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e
if (null != sparkContext) {
uiAddress = sparkContext.ui.appUIAddress
- this.yarnAllocator = YarnAllocationHandler.newAllocator(yarnConf, resourceManager, appAttemptId, args,
- sparkContext.preferredNodeLocationData)
+ this.yarnAllocator = YarnAllocationHandler.newAllocator(yarnConf, resourceManager,
+ appAttemptId, args, sparkContext.preferredNodeLocationData)
} else {
logWarning("Unable to retrieve sparkContext inspite of waiting for " + count * waitTime +
- ", numTries = " + numTries)
- this.yarnAllocator = YarnAllocationHandler.newAllocator(yarnConf, resourceManager, appAttemptId, args)
+ ", numTries = " + numTries)
+ this.yarnAllocator = YarnAllocationHandler.newAllocator(yarnConf, resourceManager,
+ appAttemptId, args)
}
}
} finally {
@@ -251,8 +254,11 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e
while(yarnAllocator.getNumWorkersRunning < args.numWorkers &&
// If user thread exists, then quit !
userThread.isAlive) {
-
- this.yarnAllocator.allocateContainers(math.max(args.numWorkers - yarnAllocator.getNumWorkersRunning, 0))
+ if (yarnAllocator.getNumWorkersFailed >= maxNumWorkerFailures) {
+ finishApplicationMaster(FinalApplicationStatus.FAILED,
+ "max number of worker failures reached")
+ }
+ yarnAllocator.allocateContainers(math.max(args.numWorkers - yarnAllocator.getNumWorkersRunning, 0))
ApplicationMaster.incrementAllocatorLoop(1)
Thread.sleep(100)
}
@@ -268,21 +274,27 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e
// ensure that progress is sent before YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS elapse.
val timeoutInterval = yarnConf.getInt(YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS, 120000)
- // must be <= timeoutInterval/ 2.
- // On other hand, also ensure that we are reasonably responsive without causing too many requests to RM.
- // so atleast 1 minute or timeoutInterval / 10 - whichever is higher.
- val interval = math.min(timeoutInterval / 2, math.max(timeoutInterval/ 10, 60000L))
+
+ // we want to be reasonably responsive without causing too many requests to RM.
+ val schedulerInterval =
+ System.getProperty("spark.yarn.scheduler.heartbeat.interval-ms", "5000").toLong
+
+ // must be <= timeoutInterval / 2.
+ val interval = math.min(timeoutInterval / 2, schedulerInterval)
launchReporterThread(interval)
}
}
- // TODO: We might want to extend this to allocate more containers in case they die !
private def launchReporterThread(_sleepTime: Long): Thread = {
val sleepTime = if (_sleepTime <= 0 ) 0 else _sleepTime
val t = new Thread {
override def run() {
while (userThread.isAlive) {
+ if (yarnAllocator.getNumWorkersFailed >= maxNumWorkerFailures) {
+ finishApplicationMaster(FinalApplicationStatus.FAILED,
+ "max number of worker failures reached")
+ }
val missingWorkerCount = args.numWorkers - yarnAllocator.getNumWorkersRunning
if (missingWorkerCount > 0) {
logInfo("Allocating " + missingWorkerCount + " containers to make up for (potentially ?) lost containers")
@@ -321,7 +333,7 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e
}
*/
- def finishApplicationMaster(status: FinalApplicationStatus) {
+ def finishApplicationMaster(status: FinalApplicationStatus, diagnostics: String = "") {
synchronized {
if (isFinished) {
@@ -335,6 +347,7 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e
.asInstanceOf[FinishApplicationMasterRequest]
finishReq.setAppAttemptId(appAttemptId)
finishReq.setFinishApplicationStatus(status)
+ finishReq.setDiagnostics(diagnostics)
// set tracking url to empty since we don't have a history server
finishReq.setTrackingUrl("")
resourceManager.finishApplicationMaster(finishReq)
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
index 4e0e060ddc..94e353af2e 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
@@ -17,14 +17,13 @@
package org.apache.spark.deploy.yarn
-import java.net.{InetAddress, InetSocketAddress, UnknownHostException, URI}
+import java.net.{InetAddress, UnknownHostException, URI}
import java.nio.ByteBuffer
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileContext, FileStatus, FileSystem, Path, FileUtil}
-import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.fs.permission.FsPermission
import org.apache.hadoop.mapred.Master
-import org.apache.hadoop.net.NetUtils
import org.apache.hadoop.io.DataOutputBuffer
import org.apache.hadoop.security.UserGroupInformation
import org.apache.hadoop.yarn.api._
@@ -40,9 +39,7 @@ import scala.collection.mutable.HashMap
import scala.collection.mutable.Map
import scala.collection.JavaConversions._
-import org.apache.spark.Logging
-import org.apache.spark.util.Utils
-import org.apache.spark.deploy.SparkHadoopUtil
+import org.apache.spark.Logging
class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl with Logging {
@@ -60,6 +57,8 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl
val APP_FILE_PERMISSION: FsPermission = FsPermission.createImmutable(0644:Short)
def run() {
+ validateArgs()
+
init(yarnConf)
start()
logClusterResourceDetails()
@@ -84,6 +83,23 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl
System.exit(0)
}
+ def validateArgs() = {
+ Map((System.getenv("SPARK_JAR") == null) -> "Error: You must set SPARK_JAR environment variable!",
+ (args.userJar == null) -> "Error: You must specify a user jar!",
+ (args.userClass == null) -> "Error: You must specify a user class!",
+ (args.numWorkers <= 0) -> "Error: You must specify atleast 1 worker!",
+ (args.amMemory <= YarnAllocationHandler.MEMORY_OVERHEAD) ->
+ ("Error: AM memory size must be greater then: " + YarnAllocationHandler.MEMORY_OVERHEAD),
+ (args.workerMemory <= YarnAllocationHandler.MEMORY_OVERHEAD) ->
+ ("Error: Worker memory size must be greater then: " + YarnAllocationHandler.MEMORY_OVERHEAD.toString()))
+ .foreach { case(cond, errStr) =>
+ if (cond) {
+ logError(errStr)
+ args.printUsageAndExit(1)
+ }
+ }
+ }
+
def getAppStagingDir(appId: ApplicationId): String = {
SPARK_STAGING + Path.SEPARATOR + appId.toString() + Path.SEPARATOR
}
@@ -97,7 +113,6 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl
", queueMaxCapacity=" + queueInfo.getMaximumCapacity + ", queueApplicationCount=" + queueInfo.getApplications.size +
", queueChildQueueCount=" + queueInfo.getChildQueues.size)
}
-
def verifyClusterResources(app: GetNewApplicationResponse) = {
val maxMem = app.getMaximumResourceCapability().getMemory()
@@ -105,7 +120,7 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl
// if we have requested more then the clusters max for a single resource then exit.
if (args.workerMemory > maxMem) {
- logError("the worker size is to large to run on this cluster " + args.workerMemory);
+ logError("the worker size is to large to run on this cluster " + args.workerMemory)
System.exit(1)
}
val amMem = args.amMemory + YarnAllocationHandler.MEMORY_OVERHEAD
@@ -142,8 +157,8 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl
var dstHost = dstUri.getHost()
if ((srcHost != null) && (dstHost != null)) {
try {
- srcHost = InetAddress.getByName(srcHost).getCanonicalHostName();
- dstHost = InetAddress.getByName(dstHost).getCanonicalHostName();
+ srcHost = InetAddress.getByName(srcHost).getCanonicalHostName()
+ dstHost = InetAddress.getByName(dstHost).getCanonicalHostName()
} catch {
case e: UnknownHostException =>
return false
@@ -160,7 +175,7 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl
if (srcUri.getPort() != dstUri.getPort()) {
return false
}
- return true;
+ return true
}
/**
@@ -172,13 +187,13 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl
replication: Short,
setPerms: Boolean = false): Path = {
val fs = FileSystem.get(conf)
- val remoteFs = originalPath.getFileSystem(conf);
+ val remoteFs = originalPath.getFileSystem(conf)
var newPath = originalPath
if (! compareFs(remoteFs, fs)) {
newPath = new Path(dstDir, originalPath.getName())
logInfo("Uploading " + originalPath + " to " + newPath)
- FileUtil.copy(remoteFs, originalPath, fs, newPath, false, conf);
- fs.setReplication(newPath, replication);
+ FileUtil.copy(remoteFs, originalPath, fs, newPath, false, conf)
+ fs.setReplication(newPath, replication)
if (setPerms) fs.setPermission(newPath, new FsPermission(APP_FILE_PERMISSION))
}
// resolve any symlinks in the URI path so using a "current" symlink
@@ -196,7 +211,7 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl
// Add them as local resources to the AM
val fs = FileSystem.get(conf)
- val delegTokenRenewer = Master.getMasterPrincipal(conf);
+ val delegTokenRenewer = Master.getMasterPrincipal(conf)
if (UserGroupInformation.isSecurityEnabled()) {
if (delegTokenRenewer == null || delegTokenRenewer.length() == 0) {
logError("Can't get Master Kerberos principal for use as renewer")
@@ -208,18 +223,13 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl
if (UserGroupInformation.isSecurityEnabled()) {
val dstFs = dst.getFileSystem(conf)
- dstFs.addDelegationTokens(delegTokenRenewer, credentials);
+ dstFs.addDelegationTokens(delegTokenRenewer, credentials)
}
val localResources = HashMap[String, LocalResource]()
FileSystem.mkdirs(fs, dst, new FsPermission(STAGING_DIR_PERMISSION))
val statCache: Map[URI, FileStatus] = HashMap[URI, FileStatus]()
- if (System.getenv("SPARK_JAR") == null || args.userJar == null) {
- logError("Error: You must set SPARK_JAR environment variable and specify a user jar!")
- System.exit(1)
- }
-
Map(Client.SPARK_JAR -> System.getenv("SPARK_JAR"), Client.APP_JAR -> args.userJar,
Client.LOG4J_PROP -> System.getenv("SPARK_LOG4J_CONF"))
.foreach { case(destName, _localPath) =>
@@ -273,7 +283,7 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl
}
}
- UserGroupInformation.getCurrentUser().addCredentials(credentials);
+ UserGroupInformation.getCurrentUser().addCredentials(credentials)
return localResources
}
@@ -334,7 +344,6 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl
JAVA_OPTS += " -Djava.io.tmpdir=" +
new Path(Environment.PWD.$(), YarnConfiguration.DEFAULT_CONTAINER_TEMP_DIR) + " "
-
// Commented out for now so that people can refer to the properties if required. Remove it once the cpuset version is pushed out.
// The context: the default GC for server-class machines ends up using all cores, so if there are multiple containers on the same
// node, Spark's GC affects the performance of all other containers (which can also be other Spark containers).
@@ -354,17 +363,12 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl
}
// Command for the ApplicationMaster
- var javaCommand = "java";
+ var javaCommand = "java"
val javaHome = System.getenv("JAVA_HOME")
if ((javaHome != null && !javaHome.isEmpty()) || env.isDefinedAt("JAVA_HOME")) {
javaCommand = Environment.JAVA_HOME.$() + "/bin/java"
}
- if (args.userClass == null) {
- logError("Error: You must specify a user class!")
- System.exit(1)
- }
-
val commands = List[String](javaCommand +
" -server " +
JAVA_OPTS +
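
The JAVA_HOME handling just above is small enough to show in isolation; resolveJavaCommand is an illustrative name and the snippet is only a sketch of the logic in the hunk.

import org.apache.hadoop.yarn.api.ApplicationConstants.Environment

object JavaCommandSketch {
  // Prefer the container-side JAVA_HOME expansion when one is known, otherwise
  // fall back to whatever "java" resolves to on the container's PATH.
  def resolveJavaCommand(env: scala.collection.Map[String, String]): String = {
    val javaHome = System.getenv("JAVA_HOME")
    if ((javaHome != null && !javaHome.isEmpty()) || env.isDefinedAt("JAVA_HOME")) {
      Environment.JAVA_HOME.$() + "/bin/java"
    } else {
      "java"
    }
  }
}
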
@@ -442,6 +446,7 @@ object Client {
System.setProperty("SPARK_YARN_MODE", "true")
val args = new ClientArguments(argStrings)
+
new Client(args).run
}
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientDistributedCacheManager.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientDistributedCacheManager.scala
index 07686fefd7..5f159b073f 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientDistributedCacheManager.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientDistributedCacheManager.scala
@@ -17,7 +17,7 @@
package org.apache.spark.deploy.yarn
-import java.net.URI;
+import java.net.URI
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.FileStatus
@@ -197,11 +197,11 @@ class ClientDistributedCacheManager() extends Logging {
*/
def checkPermissionOfOther(fs: FileSystem, path: Path,
action: FsAction, statCache: Map[URI, FileStatus]): Boolean = {
- val status = getFileStatus(fs, path.toUri(), statCache);
+ val status = getFileStatus(fs, path.toUri(), statCache)
val perms = status.getPermission()
val otherAction = perms.getOtherAction()
if (otherAction.implies(action)) {
- return true;
+ return true
}
return false
}
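
A hypothetical call site for the method above, checking whether "other" users can read a candidate cache file. The path, variable names, and the assumption that the caller sits in (or imports) the same package are illustrative.

import java.net.URI
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileStatus, FileSystem, Path}
import org.apache.hadoop.fs.permission.FsAction
import scala.collection.mutable.{HashMap, Map}

object CacheVisibilitySketch {
  def main(args: Array[String]): Unit = {
    val conf = new Configuration()
    val fs = FileSystem.get(conf)
    // Shared across checks so each path is only stat'ed once.
    val statCache: Map[URI, FileStatus] = HashMap[URI, FileStatus]()
    val manager = new ClientDistributedCacheManager()
    val publiclyReadable =
      manager.checkPermissionOfOther(fs, new Path("/user/shared/app.jar"), FsAction.READ, statCache)
    println("publicly readable: " + publiclyReadable)
  }
}
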
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/WorkerRunnable.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/WorkerRunnable.scala
index 7a66532254..a4d6e1d87d 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/WorkerRunnable.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/WorkerRunnable.scala
@@ -22,7 +22,7 @@ import java.nio.ByteBuffer
import java.security.PrivilegedExceptionAction
import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.{FileStatus, FileSystem, Path}
+import org.apache.hadoop.fs.Path
import org.apache.hadoop.io.DataOutputBuffer
import org.apache.hadoop.net.NetUtils
import org.apache.hadoop.security.UserGroupInformation
@@ -38,7 +38,6 @@ import scala.collection.JavaConversions._
import scala.collection.mutable.HashMap
import org.apache.spark.Logging
-import org.apache.spark.util.Utils
class WorkerRunnable(container: Container, conf: Configuration, masterAddress: String,
slaveId: String, hostname: String, workerMemory: Int, workerCores: Int)
@@ -108,7 +107,7 @@ class WorkerRunnable(container: Container, conf: Configuration, masterAddress: S
credentials.writeTokenStorageToStream(dob)
ctx.setContainerTokens(ByteBuffer.wrap(dob.getData()))
- var javaCommand = "java";
+ var javaCommand = "java"
val javaHome = System.getenv("JAVA_HOME")
if ((javaHome != null && !javaHome.isEmpty()) || env.isDefinedAt("JAVA_HOME")) {
javaCommand = Environment.JAVA_HOME.$() + "/bin/java"
@@ -204,8 +203,8 @@ class WorkerRunnable(container: Container, conf: Configuration, masterAddress: S
// use doAs and remoteUser here so we can add the container token and not
// pollute the current users credentials with all of the individual container tokens
- val user = UserGroupInformation.createRemoteUser(container.getId().toString());
- val containerToken = container.getContainerToken();
+ val user = UserGroupInformation.createRemoteUser(container.getId().toString())
+ val containerToken = container.getContainerToken()
if (containerToken != null) {
user.addToken(ProtoUtils.convertFromProtoFormat(containerToken, cmAddress))
}
@@ -216,8 +215,8 @@ class WorkerRunnable(container: Container, conf: Configuration, masterAddress: S
return rpc.getProxy(classOf[ContainerManager],
cmAddress, conf).asInstanceOf[ContainerManager]
}
- });
- return proxy;
+ })
+ proxy
}
}
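
The credential-isolation pattern at the end of this file is worth calling out. A generic sketch follows; withContainerUser and connect are illustrative names standing in for the YARN proxy call shown above.

import java.security.PrivilegedExceptionAction
import org.apache.hadoop.security.UserGroupInformation
import org.apache.hadoop.security.token.{Token, TokenIdentifier}

object ContainerUserSketch {
  // Run a container-manager RPC as a throwaway remote user that carries only this
  // container's token, so the submitting user's credential set is never widened.
  def withContainerUser[T](containerId: String,
                           token: Option[Token[_ <: TokenIdentifier]])(connect: () => T): T = {
    val user = UserGroupInformation.createRemoteUser(containerId)
    token.foreach(user.addToken(_))
    user.doAs(new PrivilegedExceptionAction[T] {
      override def run(): T = connect()
    })
  }
}
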
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala
index 4beb5229fe..baa030b4a4 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala
@@ -72,9 +72,11 @@ private[yarn] class YarnAllocationHandler(val conf: Configuration, val resourceM
// Used to generate a unique id per worker
private val workerIdCounter = new AtomicInteger()
private val lastResponseId = new AtomicInteger()
+ private val numWorkersFailed = new AtomicInteger()
def getNumWorkersRunning: Int = numWorkersRunning.intValue
+ def getNumWorkersFailed: Int = numWorkersFailed.intValue
def isResourceConstraintSatisfied(container: Container): Boolean = {
container.getResource.getMemory >= (workerMemory + YarnAllocationHandler.MEMORY_OVERHEAD)
@@ -253,8 +255,16 @@ private[yarn] class YarnAllocationHandler(val conf: Configuration, val resourceM
else {
// Simply decrement the count - the next iteration of the ReporterThread will take care of allocating.
numWorkersRunning.decrementAndGet()
- logInfo("Container completed ? nodeId: " + containerId + ", state " + completedContainer.getState +
- " httpaddress: " + completedContainer.getDiagnostics)
+ logInfo("Container completed not by us ? nodeId: " + containerId + ", state " + completedContainer.getState +
+ " httpaddress: " + completedContainer.getDiagnostics + " exit status: " + completedContainer.getExitStatus())
+
+ // Hadoop 2.2.x added ContainerExitStatus; we should switch to it once we build against it.
+ // Some exit statuses should not necessarily be counted against us, but for now this is fine,
+ // since none of the containers are expected to exit on their own.
+ if (completedContainer.getExitStatus() != 0) {
+ logInfo("Container marked as failed: " + containerId)
+ numWorkersFailed.incrementAndGet()
+ }
}
allocatedHostToContainersMap.synchronized {
@@ -378,8 +388,6 @@ private[yarn] class YarnAllocationHandler(val conf: Configuration, val resourceM
val releasedContainerList = createReleasedContainerList()
req.addAllReleases(releasedContainerList)
-
-
if (numWorkers > 0) {
logInfo("Allocating " + numWorkers + " worker containers with " + (workerMemory + YarnAllocationHandler.MEMORY_OVERHEAD) + " of memory each.")
}
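
The new failure accounting can be summarised as a small sketch. WorkerFailureTracker and recordCompletion are illustrative names; in the real code the counter lives directly on YarnAllocationHandler.

import java.util.concurrent.atomic.AtomicInteger
import org.apache.hadoop.yarn.api.records.ContainerStatus

class WorkerFailureTracker {
  private val numWorkersFailed = new AtomicInteger()

  def getNumWorkersFailed: Int = numWorkersFailed.intValue

  // Any completed container reported with a non-zero exit status counts as a failure;
  // a finer-grained check against ContainerExitStatus could later exclude benign exits.
  def recordCompletion(status: ContainerStatus): Unit = {
    if (status.getExitStatus() != 0) {
      numWorkersFailed.incrementAndGet()
    }
  }
}
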
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala
index ca2f1e2565..2ba2366ead 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala
@@ -18,13 +18,10 @@
package org.apache.spark.deploy.yarn
import org.apache.spark.deploy.SparkHadoopUtil
-import collection.mutable.HashMap
import org.apache.hadoop.mapred.JobConf
import org.apache.hadoop.security.UserGroupInformation
import org.apache.hadoop.yarn.conf.YarnConfiguration
import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.yarn.api.ApplicationConstants.Environment
-import java.security.PrivilegedExceptionAction
/**
* Contains utility methods for interacting with Hadoop from Spark.
@@ -40,7 +37,7 @@ class YarnSparkHadoopUtil extends SparkHadoopUtil {
// add any user credentials to the job conf which are necessary for running on a secure Hadoop cluster
override def addCredentials(conf: JobConf) {
- val jobCreds = conf.getCredentials();
+ val jobCreds = conf.getCredentials()
jobCreds.mergeAll(UserGroupInformation.getCurrentUser().getCredentials())
}
}
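
A hypothetical call site for the hook above, showing how the current user's credentials end up on a JobConf before it is handed to a Hadoop InputFormat; the surrounding object is illustrative and assumes the class is importable from its package.

import org.apache.hadoop.mapred.JobConf

object CredentialsSketch {
  def main(args: Array[String]): Unit = {
    val jobConf = new JobConf()
    // On a secure cluster this merges the submitting user's tokens into the job conf.
    new YarnSparkHadoopUtil().addCredentials(jobConf)
  }
}
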
diff --git a/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientDistributedCacheManagerSuite.scala b/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientDistributedCacheManagerSuite.scala
index c0a2af0c6f..2941356bc5 100644
--- a/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientDistributedCacheManagerSuite.scala
+++ b/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientDistributedCacheManagerSuite.scala
@@ -17,7 +17,7 @@
package org.apache.spark.deploy.yarn
-import java.net.URI;
+import java.net.URI
import org.scalatest.FunSuite
import org.scalatest.mock.MockitoSugar