diff options
Diffstat (limited to 'core')
-rw-r--r-- | core/src/main/scala/spark/RDD.scala | 12 | ||||
-rw-r--r-- | core/src/main/scala/spark/Utils.scala | 12 | ||||
-rw-r--r-- | core/src/main/scala/spark/api/java/JavaRDDLike.scala | 25 | ||||
-rw-r--r-- | core/src/main/scala/spark/deploy/worker/ExecutorRunner.scala | 4 | ||||
-rw-r--r-- | core/src/main/scala/spark/scheduler/mesos/MesosSchedulerBackend.scala | 153 | ||||
-rw-r--r-- | core/src/test/scala/spark/RDDSuite.scala | 20 |
6 files changed, 163 insertions, 63 deletions
diff --git a/core/src/main/scala/spark/RDD.scala b/core/src/main/scala/spark/RDD.scala index 106fb2960f..8aa77266bc 100644 --- a/core/src/main/scala/spark/RDD.scala +++ b/core/src/main/scala/spark/RDD.scala @@ -778,10 +778,20 @@ abstract class RDD[T: ClassManifest]( }.reduce { (queue1, queue2) => queue1 ++= queue2 queue1 - }.toArray + }.toArray.sorted(ord.reverse) } /** + * Returns the first K elements from this RDD as defined by + * the specified implicit Ordering[T] and maintains the + * ordering. + * @param num the number of top elements to return + * @param ord the implicit ordering for T + * @return an array of top elements + */ + def takeOrdered(num: Int)(implicit ord: Ordering[T]): Array[T] = top(num)(ord.reverse) + + /** * Save this RDD as a text file, using string representations of elements. */ def saveAsTextFile(path: String) { diff --git a/core/src/main/scala/spark/Utils.scala b/core/src/main/scala/spark/Utils.scala index 5a37532306..c6a3f97872 100644 --- a/core/src/main/scala/spark/Utils.scala +++ b/core/src/main/scala/spark/Utils.scala @@ -6,6 +6,7 @@ import java.util.{Locale, Random, UUID} import java.util.concurrent.{ConcurrentHashMap, Executors, ThreadFactory, ThreadPoolExecutor} import java.util.regex.Pattern +import scala.collection.Map import scala.collection.mutable.{ArrayBuffer, HashMap} import scala.collection.JavaConversions._ import scala.io.Source @@ -545,10 +546,15 @@ private object Utils extends Logging { /** * Execute a command and get its output, throwing an exception if it yields a code other than 0. */ - def executeAndGetOutput(command: Seq[String], workingDir: File = new File(".")): String = { - val process = new ProcessBuilder(command: _*) + def executeAndGetOutput(command: Seq[String], workingDir: File = new File("."), + extraEnvironment: Map[String, String] = Map.empty): String = { + val builder = new ProcessBuilder(command: _*) .directory(workingDir) - .start() + val environment = builder.environment() + for ((key, value) <- extraEnvironment) { + environment.put(key, value) + } + val process = builder.start() new Thread("read stderr for " + command(0)) { override def run() { for (line <- Source.fromInputStream(process.getErrorStream).getLines) { diff --git a/core/src/main/scala/spark/api/java/JavaRDDLike.scala b/core/src/main/scala/spark/api/java/JavaRDDLike.scala index b555f2030a..27f40ecdfd 100644 --- a/core/src/main/scala/spark/api/java/JavaRDDLike.scala +++ b/core/src/main/scala/spark/api/java/JavaRDDLike.scala @@ -384,4 +384,29 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable { val comp = com.google.common.collect.Ordering.natural().asInstanceOf[Comparator[T]] top(num, comp) } + + /** + * Returns the first K elements from this RDD as defined by + * the specified Comparator[T] and maintains the order. + * @param num the number of top elements to return + * @param comp the comparator that defines the order + * @return an array of top elements + */ + def takeOrdered(num: Int, comp: Comparator[T]): JList[T] = { + import scala.collection.JavaConversions._ + val topElems = rdd.takeOrdered(num)(Ordering.comparatorToOrdering(comp)) + val arr: java.util.Collection[T] = topElems.toSeq + new java.util.ArrayList(arr) + } + + /** + * Returns the first K elements from this RDD using the + * natural ordering for T while maintain the order. + * @param num the number of top elements to return + * @return an array of top elements + */ + def takeOrdered(num: Int): JList[T] = { + val comp = com.google.common.collect.Ordering.natural().asInstanceOf[Comparator[T]] + takeOrdered(num, comp) + } } diff --git a/core/src/main/scala/spark/deploy/worker/ExecutorRunner.scala b/core/src/main/scala/spark/deploy/worker/ExecutorRunner.scala index d7f58b2cb1..5d3d54c65e 100644 --- a/core/src/main/scala/spark/deploy/worker/ExecutorRunner.scala +++ b/core/src/main/scala/spark/deploy/worker/ExecutorRunner.scala @@ -98,7 +98,9 @@ private[spark] class ExecutorRunner( // Figure out our classpath with the external compute-classpath script val ext = if (System.getProperty("os.name").startsWith("Windows")) ".cmd" else ".sh" - val classPath = Utils.executeAndGetOutput(Seq(sparkHome + "/bin/compute-classpath" + ext)) + val classPath = Utils.executeAndGetOutput( + Seq(sparkHome + "/bin/compute-classpath" + ext), + extraEnvironment=appDesc.command.environment) Seq("-cp", classPath) ++ libraryOpts ++ userOpts ++ memoryOpts } diff --git a/core/src/main/scala/spark/scheduler/mesos/MesosSchedulerBackend.scala b/core/src/main/scala/spark/scheduler/mesos/MesosSchedulerBackend.scala index ca7fab4cc5..e83368b98d 100644 --- a/core/src/main/scala/spark/scheduler/mesos/MesosSchedulerBackend.scala +++ b/core/src/main/scala/spark/scheduler/mesos/MesosSchedulerBackend.scala @@ -43,8 +43,12 @@ private[spark] class MesosSchedulerBackend( // An ExecutorInfo for our tasks var execArgs: Array[Byte] = null + var classLoader: ClassLoader = null + override def start() { synchronized { + classLoader = Thread.currentThread.getContextClassLoader + new Thread("MesosSchedulerBackend driver") { setDaemon(true) override def run() { @@ -114,13 +118,28 @@ private[spark] class MesosSchedulerBackend( return execArgs } + private def setClassLoader(): ClassLoader = { + val oldClassLoader = Thread.currentThread.getContextClassLoader + Thread.currentThread.setContextClassLoader(classLoader) + return oldClassLoader + } + + private def restoreClassLoader(oldClassLoader: ClassLoader) { + Thread.currentThread.setContextClassLoader(oldClassLoader) + } + override def offerRescinded(d: SchedulerDriver, o: OfferID) {} override def registered(d: SchedulerDriver, frameworkId: FrameworkID, masterInfo: MasterInfo) { - logInfo("Registered as framework ID " + frameworkId.getValue) - registeredLock.synchronized { - isRegistered = true - registeredLock.notifyAll() + val oldClassLoader = setClassLoader() + try { + logInfo("Registered as framework ID " + frameworkId.getValue) + registeredLock.synchronized { + isRegistered = true + registeredLock.notifyAll() + } + } finally { + restoreClassLoader(oldClassLoader) } } @@ -142,49 +161,54 @@ private[spark] class MesosSchedulerBackend( * tasks are balanced across the cluster. */ override def resourceOffers(d: SchedulerDriver, offers: JList[Offer]) { - synchronized { - // Build a big list of the offerable workers, and remember their indices so that we can - // figure out which Offer to reply to for each worker - val offerableIndices = new ArrayBuffer[Int] - val offerableWorkers = new ArrayBuffer[WorkerOffer] - - def enoughMemory(o: Offer) = { - val mem = getResource(o.getResourcesList, "mem") - val slaveId = o.getSlaveId.getValue - mem >= executorMemory || slaveIdsWithExecutors.contains(slaveId) - } + val oldClassLoader = setClassLoader() + try { + synchronized { + // Build a big list of the offerable workers, and remember their indices so that we can + // figure out which Offer to reply to for each worker + val offerableIndices = new ArrayBuffer[Int] + val offerableWorkers = new ArrayBuffer[WorkerOffer] + + def enoughMemory(o: Offer) = { + val mem = getResource(o.getResourcesList, "mem") + val slaveId = o.getSlaveId.getValue + mem >= executorMemory || slaveIdsWithExecutors.contains(slaveId) + } - for ((offer, index) <- offers.zipWithIndex if enoughMemory(offer)) { - offerableIndices += index - offerableWorkers += new WorkerOffer( - offer.getSlaveId.getValue, - offer.getHostname, - getResource(offer.getResourcesList, "cpus").toInt) - } + for ((offer, index) <- offers.zipWithIndex if enoughMemory(offer)) { + offerableIndices += index + offerableWorkers += new WorkerOffer( + offer.getSlaveId.getValue, + offer.getHostname, + getResource(offer.getResourcesList, "cpus").toInt) + } - // Call into the ClusterScheduler - val taskLists = scheduler.resourceOffers(offerableWorkers) - - // Build a list of Mesos tasks for each slave - val mesosTasks = offers.map(o => Collections.emptyList[MesosTaskInfo]()) - for ((taskList, index) <- taskLists.zipWithIndex) { - if (!taskList.isEmpty) { - val offerNum = offerableIndices(index) - val slaveId = offers(offerNum).getSlaveId.getValue - slaveIdsWithExecutors += slaveId - mesosTasks(offerNum) = new JArrayList[MesosTaskInfo](taskList.size) - for (taskDesc <- taskList) { - taskIdToSlaveId(taskDesc.taskId) = slaveId - mesosTasks(offerNum).add(createMesosTask(taskDesc, slaveId)) + // Call into the ClusterScheduler + val taskLists = scheduler.resourceOffers(offerableWorkers) + + // Build a list of Mesos tasks for each slave + val mesosTasks = offers.map(o => Collections.emptyList[MesosTaskInfo]()) + for ((taskList, index) <- taskLists.zipWithIndex) { + if (!taskList.isEmpty) { + val offerNum = offerableIndices(index) + val slaveId = offers(offerNum).getSlaveId.getValue + slaveIdsWithExecutors += slaveId + mesosTasks(offerNum) = new JArrayList[MesosTaskInfo](taskList.size) + for (taskDesc <- taskList) { + taskIdToSlaveId(taskDesc.taskId) = slaveId + mesosTasks(offerNum).add(createMesosTask(taskDesc, slaveId)) + } } } - } - // Reply to the offers - val filters = Filters.newBuilder().setRefuseSeconds(1).build() // TODO: lower timeout? - for (i <- 0 until offers.size) { - d.launchTasks(offers(i).getId, mesosTasks(i), filters) + // Reply to the offers + val filters = Filters.newBuilder().setRefuseSeconds(1).build() // TODO: lower timeout? + for (i <- 0 until offers.size) { + d.launchTasks(offers(i).getId, mesosTasks(i), filters) + } } + } finally { + restoreClassLoader(oldClassLoader) } } @@ -224,23 +248,33 @@ private[spark] class MesosSchedulerBackend( } override def statusUpdate(d: SchedulerDriver, status: TaskStatus) { - val tid = status.getTaskId.getValue.toLong - val state = TaskState.fromMesos(status.getState) - synchronized { - if (status.getState == MesosTaskState.TASK_LOST && taskIdToSlaveId.contains(tid)) { - // We lost the executor on this slave, so remember that it's gone - slaveIdsWithExecutors -= taskIdToSlaveId(tid) - } - if (isFinished(status.getState)) { - taskIdToSlaveId.remove(tid) + val oldClassLoader = setClassLoader() + try { + val tid = status.getTaskId.getValue.toLong + val state = TaskState.fromMesos(status.getState) + synchronized { + if (status.getState == MesosTaskState.TASK_LOST && taskIdToSlaveId.contains(tid)) { + // We lost the executor on this slave, so remember that it's gone + slaveIdsWithExecutors -= taskIdToSlaveId(tid) + } + if (isFinished(status.getState)) { + taskIdToSlaveId.remove(tid) + } } + scheduler.statusUpdate(tid, state, status.getData.asReadOnlyByteBuffer) + } finally { + restoreClassLoader(oldClassLoader) } - scheduler.statusUpdate(tid, state, status.getData.asReadOnlyByteBuffer) } override def error(d: SchedulerDriver, message: String) { - logError("Mesos error: " + message) - scheduler.error(message) + val oldClassLoader = setClassLoader() + try { + logError("Mesos error: " + message) + scheduler.error(message) + } finally { + restoreClassLoader(oldClassLoader) + } } override def stop() { @@ -256,11 +290,16 @@ private[spark] class MesosSchedulerBackend( override def frameworkMessage(d: SchedulerDriver, e: ExecutorID, s: SlaveID, b: Array[Byte]) {} private def recordSlaveLost(d: SchedulerDriver, slaveId: SlaveID, reason: ExecutorLossReason) { - logInfo("Mesos slave lost: " + slaveId.getValue) - synchronized { - slaveIdsWithExecutors -= slaveId.getValue + val oldClassLoader = setClassLoader() + try { + logInfo("Mesos slave lost: " + slaveId.getValue) + synchronized { + slaveIdsWithExecutors -= slaveId.getValue + } + scheduler.executorLost(slaveId.getValue, reason) + } finally { + restoreClassLoader(oldClassLoader) } - scheduler.executorLost(slaveId.getValue, reason) } override def slaveLost(d: SchedulerDriver, slaveId: SlaveID) { diff --git a/core/src/test/scala/spark/RDDSuite.scala b/core/src/test/scala/spark/RDDSuite.scala index e41ae385c0..aa3ee5f5ee 100644 --- a/core/src/test/scala/spark/RDDSuite.scala +++ b/core/src/test/scala/spark/RDDSuite.scala @@ -240,7 +240,7 @@ class RDDSuite extends FunSuite with SharedSparkContext { val ints = sc.makeRDD(scala.util.Random.shuffle(nums), 2) val topK = ints.top(5) assert(topK.size === 5) - assert(topK.sorted === nums.sorted.takeRight(5)) + assert(topK === nums.reverse.take(5)) } test("top with custom ordering") { @@ -252,6 +252,24 @@ class RDDSuite extends FunSuite with SharedSparkContext { assert(topK.sorted === Array("b", "a")) } + test("takeOrdered with predefined ordering") { + val nums = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10) + val rdd = sc.makeRDD(nums, 2) + val sortedLowerK = rdd.takeOrdered(5) + assert(sortedLowerK.size === 5) + assert(sortedLowerK === Array(1, 2, 3, 4, 5)) + } + + test("takeOrdered with custom ordering") { + val nums = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10) + implicit val ord = implicitly[Ordering[Int]].reverse + val rdd = sc.makeRDD(nums, 2) + val sortedTopK = rdd.takeOrdered(5) + assert(sortedTopK.size === 5) + assert(sortedTopK === Array(10, 9, 8, 7, 6)) + assert(sortedTopK === nums.sorted(ord).take(5)) + } + test("takeSample") { val data = sc.parallelize(1 to 100, 2) for (seed <- 1 to 5) { |