diff options
author | Prashant Sharma <prashant.s@imaginea.com> | 2013-11-21 12:34:46 +0530 |
---|---|---|
committer | Prashant Sharma <prashant.s@imaginea.com> | 2013-11-21 12:34:46 +0530 |
commit | 95d8dbce91f49467050250d5cf3671aaaa648d76 (patch) | |
tree | 06e2583c63cdf39d6d15d36a3189c2e6db0148ba /core/src/main/scala/org | |
parent | 199e9cf02dfaa372c1f067bca54556e1f6ce787d (diff) | |
parent | 2fead510f74b962b293de4d724136c24a9825271 (diff) | |
download | spark-95d8dbce91f49467050250d5cf3671aaaa648d76.tar.gz spark-95d8dbce91f49467050250d5cf3671aaaa648d76.tar.bz2 spark-95d8dbce91f49467050250d5cf3671aaaa648d76.zip |
Merge branch 'master' of github.com:apache/incubator-spark into scala-2.10-temp
Conflicts:
core/src/main/scala/org/apache/spark/util/collection/PrimitiveVector.scala
streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala
Diffstat (limited to 'core/src/main/scala/org')
15 files changed, 83 insertions, 75 deletions
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index a12f8860b9..b9fe7f604e 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -24,7 +24,6 @@ import java.util.concurrent.atomic.AtomicInteger import scala.collection.Map import scala.collection.generic.Growable -import scala.collection.JavaConverters._ import scala.collection.mutable.ArrayBuffer import scala.collection.mutable.HashMap import scala.reflect.{ ClassTag, classTag} diff --git a/core/src/main/scala/org/apache/spark/deploy/FaultToleranceTest.scala b/core/src/main/scala/org/apache/spark/deploy/FaultToleranceTest.scala index 668032a3a2..0aa8852649 100644 --- a/core/src/main/scala/org/apache/spark/deploy/FaultToleranceTest.scala +++ b/core/src/main/scala/org/apache/spark/deploy/FaultToleranceTest.scala @@ -1,19 +1,19 @@ /* * - * * Licensed to the Apache Software Foundation (ASF) under one or more - * * contributor license agreements. See the NOTICE file distributed with - * * this work for additional information regarding copyright ownership. - * * The ASF licenses this file to You under the Apache License, Version 2.0 - * * (the "License"); you may not use this file except in compliance with - * * the License. You may obtain a copy of the License at - * * - * * http://www.apache.org/licenses/LICENSE-2.0 - * * - * * Unless required by applicable law or agreed to in writing, software - * * distributed under the License is distributed on an "AS IS" BASIS, - * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * * See the License for the specific language governing permissions and - * * limitations under the License. + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. * */ diff --git a/core/src/main/scala/org/apache/spark/deploy/LocalSparkCluster.scala b/core/src/main/scala/org/apache/spark/deploy/LocalSparkCluster.scala index 94cf4ff88b..59d12a3e6f 100644 --- a/core/src/main/scala/org/apache/spark/deploy/LocalSparkCluster.scala +++ b/core/src/main/scala/org/apache/spark/deploy/LocalSparkCluster.scala @@ -17,12 +17,12 @@ package org.apache.spark.deploy -import akka.actor.{ActorRef, Props, Actor, ActorSystem, Terminated} +import akka.actor.ActorSystem import org.apache.spark.deploy.worker.Worker import org.apache.spark.deploy.master.Master -import org.apache.spark.util.{Utils, AkkaUtils} -import org.apache.spark.{Logging} +import org.apache.spark.util.Utils +import org.apache.spark.Logging import scala.collection.mutable.ArrayBuffer diff --git a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala index 16d8f81a71..a98ec06be9 100644 --- a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala +++ b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala @@ -22,7 +22,7 @@ import java.nio.ByteBuffer import akka.actor._ import akka.remote._ -import org.apache.spark.{Logging, SparkEnv} +import org.apache.spark.Logging import org.apache.spark.TaskState.TaskState import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages._ import org.apache.spark.util.{Utils, AkkaUtils} diff --git a/core/src/main/scala/org/apache/spark/executor/ExecutorSource.scala b/core/src/main/scala/org/apache/spark/executor/ExecutorSource.scala index 34ed9c8f73..97176e4f5b 100644 --- a/core/src/main/scala/org/apache/spark/executor/ExecutorSource.scala +++ b/core/src/main/scala/org/apache/spark/executor/ExecutorSource.scala @@ -20,8 +20,6 @@ package org.apache.spark.executor import com.codahale.metrics.{Gauge, MetricRegistry} import org.apache.hadoop.fs.FileSystem -import org.apache.hadoop.hdfs.DistributedFileSystem -import org.apache.hadoop.fs.LocalFileSystem import scala.collection.JavaConversions._ diff --git a/core/src/main/scala/org/apache/spark/network/netty/ShuffleCopier.scala b/core/src/main/scala/org/apache/spark/network/netty/ShuffleCopier.scala index 481ff8c3e0..b1e1576dad 100644 --- a/core/src/main/scala/org/apache/spark/network/netty/ShuffleCopier.scala +++ b/core/src/main/scala/org/apache/spark/network/netty/ShuffleCopier.scala @@ -76,7 +76,7 @@ private[spark] object ShuffleCopier extends Logging { extends FileClientHandler with Logging { override def handle(ctx: ChannelHandlerContext, in: ByteBuf, header: FileHeader) { - logDebug("Received Block: " + header.blockId + " (" + header.fileLen + "B)"); + logDebug("Received Block: " + header.blockId + " (" + header.fileLen + "B)") resultCollectCallBack(header.blockId, header.fileLen.toLong, in.readBytes(header.fileLen)) } diff --git a/core/src/main/scala/org/apache/spark/rdd/CartesianRDD.scala b/core/src/main/scala/org/apache/spark/rdd/CartesianRDD.scala index 4fb7f3aace..d2a3d60965 100644 --- a/core/src/main/scala/org/apache/spark/rdd/CartesianRDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/CartesianRDD.scala @@ -71,7 +71,7 @@ class CartesianRDD[T: ClassTag, U:ClassTag]( override def compute(split: Partition, context: TaskContext) = { val currSplit = split.asInstanceOf[CartesianPartition] for (x <- rdd1.iterator(currSplit.s1, context); - y <- rdd2.iterator(currSplit.s2, context)) yield (x, y) + y <- rdd2.iterator(currSplit.s2, context)) yield (x, y) } override def getDependencies: Seq[Dependency[_]] = List( diff --git a/core/src/main/scala/org/apache/spark/rdd/PartitionPruningRDD.scala b/core/src/main/scala/org/apache/spark/rdd/PartitionPruningRDD.scala index bb9b309a70..ea8885b36e 100644 --- a/core/src/main/scala/org/apache/spark/rdd/PartitionPruningRDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/PartitionPruningRDD.scala @@ -35,11 +35,13 @@ class PruneDependency[T](rdd: RDD[T], @transient partitionFilterFunc: Int => Boo extends NarrowDependency[T](rdd) { @transient - val partitions: Array[Partition] = rdd.partitions.zipWithIndex - .filter(s => partitionFilterFunc(s._2)) + val partitions: Array[Partition] = rdd.partitions + .filter(s => partitionFilterFunc(s.index)).zipWithIndex .map { case(split, idx) => new PartitionPruningRDDPartition(idx, split) : Partition } - override def getParents(partitionId: Int) = List(partitions(partitionId).index) + override def getParents(partitionId: Int) = { + List(partitions(partitionId).asInstanceOf[PartitionPruningRDDPartition].parentSplit.index) + } } diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/ClusterScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/ClusterScheduler.scala index 9975ec1ab6..7c9d6a93e4 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/ClusterScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/ClusterScheduler.scala @@ -250,7 +250,6 @@ private[spark] class ClusterScheduler(val sc: SparkContext) def statusUpdate(tid: Long, state: TaskState, serializedData: ByteBuffer) { var failedExecutor: Option[String] = None - var taskFailed = false synchronized { try { if (state == TaskState.LOST && taskIdToExecutorId.contains(tid)) { @@ -270,9 +269,6 @@ private[spark] class ClusterScheduler(val sc: SparkContext) } taskIdToExecutorId.remove(tid) } - if (state == TaskState.FAILED) { - taskFailed = true - } activeTaskSets.get(taskSetId).foreach { taskSet => if (state == TaskState.FINISHED) { taskSet.removeRunningTask(tid) @@ -294,10 +290,6 @@ private[spark] class ClusterScheduler(val sc: SparkContext) dagScheduler.executorLost(failedExecutor.get) backend.reviveOffers() } - if (taskFailed) { - // Also revive offers if a task had failed for some reason other than host lost - backend.reviveOffers() - } } def handleTaskGettingResult(taskSetManager: ClusterTaskSetManager, tid: Long) { @@ -317,8 +309,9 @@ private[spark] class ClusterScheduler(val sc: SparkContext) taskState: TaskState, reason: Option[TaskEndReason]) = synchronized { taskSetManager.handleFailedTask(tid, taskState, reason) - if (taskState == TaskState.FINISHED) { - // The task finished successfully but the result was lost, so we should revive offers. + if (taskState != TaskState.KILLED) { + // Need to revive offers again now that the task set manager state has been updated to + // reflect failed tasks that need to be re-run. backend.reviveOffers() } } diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/ClusterTaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/ClusterTaskSetManager.scala index ee47aaffca..4c5eca8537 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/ClusterTaskSetManager.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/ClusterTaskSetManager.scala @@ -17,6 +17,7 @@ package org.apache.spark.scheduler.cluster +import java.io.NotSerializableException import java.util.Arrays import scala.collection.mutable.ArrayBuffer @@ -484,6 +485,14 @@ private[spark] class ClusterTaskSetManager( case ef: ExceptionFailure => sched.dagScheduler.taskEnded(tasks(index), ef, null, null, info, ef.metrics.getOrElse(null)) + if (ef.className == classOf[NotSerializableException].getName()) { + // If the task result wasn't serializable, there's no point in trying to re-execute it. + logError("Task %s:%s had a not serializable result: %s; not retrying".format( + taskSet.id, index, ef.description)) + abort("Task %s:%s had a not serializable result: %s".format( + taskSet.id, index, ef.description)) + return + } val key = ef.description val now = clock.getTime() val (printFull, dupCount) = { diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala index 03cf1e2853..821c30a119 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala @@ -194,6 +194,7 @@ class CoarseGrainedSchedulerBackend(scheduler: ClusterScheduler, actorSystem: Ac } override def stop() { + stopExecutors() try { if (driverActor != null) { val future = driverActor.ask(StopDriver)(timeout) diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/SimrSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/SimrSchedulerBackend.scala index 6b91935400..e000531a26 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/SimrSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/SimrSchedulerBackend.scala @@ -31,10 +31,6 @@ private[spark] class SimrSchedulerBackend( val tmpPath = new Path(driverFilePath + "_tmp") val filePath = new Path(driverFilePath) - val uiFilePath = driverFilePath + "_ui" - val tmpUiPath = new Path(uiFilePath + "_tmp") - val uiPath = new Path(uiFilePath) - val maxCores = System.getProperty("spark.simr.executor.cores", "1").toInt override def start() { @@ -49,30 +45,23 @@ private[spark] class SimrSchedulerBackend( logInfo("Writing to HDFS file: " + driverFilePath) logInfo("Writing Akka address: " + driverUrl) - logInfo("Writing to HDFS file: " + uiFilePath) logInfo("Writing Spark UI Address: " + sc.ui.appUIAddress) // Create temporary file to prevent race condition where executors get empty driverUrl file val temp = fs.create(tmpPath, true) temp.writeUTF(driverUrl) temp.writeInt(maxCores) + temp.writeUTF(sc.ui.appUIAddress) temp.close() // "Atomic" rename fs.rename(tmpPath, filePath) - - // Write Spark UI Address to file - val uiTemp = fs.create(tmpUiPath, true) - uiTemp.writeUTF(sc.ui.appUIAddress) - uiTemp.close() - fs.rename(tmpUiPath, uiPath) } override def stop() { val conf = new Configuration() val fs = FileSystem.get(conf) fs.delete(new Path(driverFilePath), false) - super.stopExecutors() super.stop() } } diff --git a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsUI.scala b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsUI.scala index 42e9be6e19..e596690bc3 100644 --- a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsUI.scala +++ b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsUI.scala @@ -76,7 +76,7 @@ private[spark] class ExecutorsUI(val sc: SparkContext) { </tr> } - val execInfo = for (b <- 0 until storageStatusList.size) yield getExecInfo(b) + val execInfo = for (statusId <- 0 until storageStatusList.size) yield getExecInfo(statusId) val execTable = UIUtils.listingTable(execHead, execRow, execInfo) val content = @@ -99,16 +99,17 @@ private[spark] class ExecutorsUI(val sc: SparkContext) { UIUtils.headerSparkPage(content, sc, "Executors (" + execInfo.size + ")", Executors) } - def getExecInfo(a: Int): Seq[String] = { - val execId = sc.getExecutorStorageStatus(a).blockManagerId.executorId - val hostPort = sc.getExecutorStorageStatus(a).blockManagerId.hostPort - val rddBlocks = sc.getExecutorStorageStatus(a).blocks.size.toString - val memUsed = sc.getExecutorStorageStatus(a).memUsed().toString - val maxMem = sc.getExecutorStorageStatus(a).maxMem.toString - val diskUsed = sc.getExecutorStorageStatus(a).diskUsed().toString - val activeTasks = listener.executorToTasksActive.get(a.toString).map(l => l.size).getOrElse(0) - val failedTasks = listener.executorToTasksFailed.getOrElse(a.toString, 0) - val completedTasks = listener.executorToTasksComplete.getOrElse(a.toString, 0) + def getExecInfo(statusId: Int): Seq[String] = { + val status = sc.getExecutorStorageStatus(statusId) + val execId = status.blockManagerId.executorId + val hostPort = status.blockManagerId.hostPort + val rddBlocks = status.blocks.size.toString + val memUsed = status.memUsed().toString + val maxMem = status.maxMem.toString + val diskUsed = status.diskUsed().toString + val activeTasks = listener.executorToTasksActive.getOrElse(execId, HashSet.empty[Long]).size + val failedTasks = listener.executorToTasksFailed.getOrElse(execId, 0) + val completedTasks = listener.executorToTasksComplete.getOrElse(execId, 0) val totalTasks = activeTasks + failedTasks + completedTasks Seq( diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala index c1c7aa70e6..fbd822867f 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala @@ -133,7 +133,7 @@ private[spark] class StagePage(parent: JobProgressUI) { summary ++ <h4>Summary Metrics for {numCompleted} Completed Tasks</h4> ++ <div>{summaryTable.getOrElse("No tasks have reported metrics yet.")}</div> ++ - <h4>Tasks</h4> ++ taskTable; + <h4>Tasks</h4> ++ taskTable headerSparkPage(content, parent.sc, "Details for Stage %d".format(stageId), Stages) } diff --git a/core/src/main/scala/org/apache/spark/util/collection/PrimitiveVector.scala b/core/src/main/scala/org/apache/spark/util/collection/PrimitiveVector.scala index 465c221d5f..b84eb65c62 100644 --- a/core/src/main/scala/org/apache/spark/util/collection/PrimitiveVector.scala +++ b/core/src/main/scala/org/apache/spark/util/collection/PrimitiveVector.scala @@ -19,35 +19,51 @@ package org.apache.spark.util.collection import scala.reflect.ClassTag -/** Provides a simple, non-threadsafe, array-backed vector that can store primitives. */ +/** + * An append-only, non-threadsafe, array-backed vector that is optimized for primitive types. + */ private[spark] class PrimitiveVector[@specialized(Long, Int, Double) V: ClassTag](initialSize: Int = 64) { - private var numElements = 0 - private var array: Array[V] = _ + private var _numElements = 0 + private var _array: Array[V] = _ // NB: This must be separate from the declaration, otherwise the specialized parent class - // will get its own array with the same initial size. TODO: Figure out why... - array = new Array[V](initialSize) + // will get its own array with the same initial size. + _array = new Array[V](initialSize) def apply(index: Int): V = { - require(index < numElements) - array(index) + require(index < _numElements) + _array(index) } def +=(value: V) { - if (numElements == array.length) { resize(array.length * 2) } - array(numElements) = value - numElements += 1 + if (_numElements == _array.length) { + resize(_array.length * 2) + } + _array(_numElements) = value + _numElements += 1 } - def length = numElements + def capacity: Int = _array.length + + def length: Int = _numElements + + def size: Int = _numElements + + /** Gets the underlying array backing this vector. */ + def array: Array[V] = _array - def getUnderlyingArray = array + /** Trims this vector so that the capacity is equal to the size. */ + def trim(): PrimitiveVector[V] = resize(size) /** Resizes the array, dropping elements if the total length decreases. */ - def resize(newLength: Int) { + def resize(newLength: Int): PrimitiveVector[V] = { val newArray = new Array[V](newLength) - array.copyToArray(newArray) - array = newArray + _array.copyToArray(newArray) + _array = newArray + if (newLength < _numElements) { + _numElements = newLength + } + this } } |