Diffstat (limited to 'core')
-rw-r--r--  core/src/main/scala/spark/RDD.scala                                   |  12
-rw-r--r--  core/src/main/scala/spark/Utils.scala                                 |  12
-rw-r--r--  core/src/main/scala/spark/api/java/JavaRDDLike.scala                  |  25
-rw-r--r--  core/src/main/scala/spark/deploy/worker/ExecutorRunner.scala          |   4
-rw-r--r--  core/src/main/scala/spark/scheduler/mesos/MesosSchedulerBackend.scala | 153
-rw-r--r--  core/src/test/scala/spark/RDDSuite.scala                              |  20
6 files changed, 163 insertions(+), 63 deletions(-)
diff --git a/core/src/main/scala/spark/RDD.scala b/core/src/main/scala/spark/RDD.scala
index 106fb2960f..8aa77266bc 100644
--- a/core/src/main/scala/spark/RDD.scala
+++ b/core/src/main/scala/spark/RDD.scala
@@ -778,10 +778,20 @@ abstract class RDD[T: ClassManifest](
}.reduce { (queue1, queue2) =>
queue1 ++= queue2
queue1
- }.toArray
+ }.toArray.sorted(ord.reverse)
}
/**
+ * Returns the first K elements from this RDD as defined by
+ * the specified implicit Ordering[T], maintaining that
+ * ordering in the result.
+ * @param num the number of top elements to return
+ * @param ord the implicit ordering for T
+ * @return an array of top elements
+ */
+ def takeOrdered(num: Int)(implicit ord: Ordering[T]): Array[T] = top(num)(ord.reverse)
+
+ /**
* Save this RDD as a text file, using string representations of elements.
*/
def saveAsTextFile(path: String) {
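With this change, top(num) returns its results in descending order, and the new takeOrdered(num) is its ascending-order dual, implemented as top over the reversed ordering. A minimal usage sketch (assuming a live SparkContext named sc):

    val rdd = sc.parallelize(Seq(10, 4, 2, 12, 3))
    rdd.top(3)          // Array(12, 10, 4) -- largest elements, descending
    rdd.takeOrdered(3)  // Array(2, 3, 4)   -- smallest elements, ascending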
diff --git a/core/src/main/scala/spark/Utils.scala b/core/src/main/scala/spark/Utils.scala
index 5a37532306..c6a3f97872 100644
--- a/core/src/main/scala/spark/Utils.scala
+++ b/core/src/main/scala/spark/Utils.scala
@@ -6,6 +6,7 @@ import java.util.{Locale, Random, UUID}
import java.util.concurrent.{ConcurrentHashMap, Executors, ThreadFactory, ThreadPoolExecutor}
import java.util.regex.Pattern
+import scala.collection.Map
import scala.collection.mutable.{ArrayBuffer, HashMap}
import scala.collection.JavaConversions._
import scala.io.Source
@@ -545,10 +546,15 @@ private object Utils extends Logging {
/**
* Execute a command and get its output, throwing an exception if it yields a code other than 0.
*/
- def executeAndGetOutput(command: Seq[String], workingDir: File = new File(".")): String = {
- val process = new ProcessBuilder(command: _*)
+ def executeAndGetOutput(command: Seq[String], workingDir: File = new File("."),
+ extraEnvironment: Map[String, String] = Map.empty): String = {
+ val builder = new ProcessBuilder(command: _*)
.directory(workingDir)
- .start()
+ val environment = builder.environment()
+ for ((key, value) <- extraEnvironment) {
+ environment.put(key, value)
+ }
+ val process = builder.start()
new Thread("read stderr for " + command(0)) {
override def run() {
for (line <- Source.fromInputStream(process.getErrorStream).getLines) {
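The new extraEnvironment parameter merges the given entries into the child process's environment before launch. A hedged sketch of the call shape (Utils is private to spark, and the sketch assumes a POSIX shell, so this is illustrative only):

    val output = Utils.executeAndGetOutput(
      Seq("sh", "-c", "echo $SPARK_TESTING"),
      extraEnvironment = Map("SPARK_TESTING" -> "1"))
    // output.trim should be "1" -- the variable reached the child process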
diff --git a/core/src/main/scala/spark/api/java/JavaRDDLike.scala b/core/src/main/scala/spark/api/java/JavaRDDLike.scala
index b555f2030a..27f40ecdfd 100644
--- a/core/src/main/scala/spark/api/java/JavaRDDLike.scala
+++ b/core/src/main/scala/spark/api/java/JavaRDDLike.scala
@@ -384,4 +384,29 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
val comp = com.google.common.collect.Ordering.natural().asInstanceOf[Comparator[T]]
top(num, comp)
}
+
+ /**
+ * Returns the first K elements from this RDD as defined by
+ * the specified Comparator[T], maintaining that order.
+ * @param num the number of top elements to return
+ * @param comp the comparator that defines the order
+ * @return an array of top elements
+ */
+ def takeOrdered(num: Int, comp: Comparator[T]): JList[T] = {
+ import scala.collection.JavaConversions._
+ val topElems = rdd.takeOrdered(num)(Ordering.comparatorToOrdering(comp))
+ val arr: java.util.Collection[T] = topElems.toSeq
+ new java.util.ArrayList(arr)
+ }
+
+ /**
+ * Returns the first K elements from this RDD using the
+ * natural ordering for T, maintaining that order.
+ * @param num the number of top elements to return
+ * @return an array of top elements
+ */
+ def takeOrdered(num: Int): JList[T] = {
+ val comp = com.google.common.collect.Ordering.natural().asInstanceOf[Comparator[T]]
+ takeOrdered(num, comp)
+ }
}
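Both Java-facing overloads delegate to the Scala takeOrdered: the comparator-based one bridges through Ordering.comparatorToOrdering, and the no-argument one reuses Guava's natural ordering exactly as top does above. A sketch of calling the comparator overload from Scala (assuming a JavaRDD[Int] named javaRdd):

    import java.util.Comparator
    val comp = com.google.common.collect.Ordering.natural().asInstanceOf[Comparator[Int]]
    javaRdd.takeOrdered(3, comp)  // JList[Int] of the three smallest elements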
diff --git a/core/src/main/scala/spark/deploy/worker/ExecutorRunner.scala b/core/src/main/scala/spark/deploy/worker/ExecutorRunner.scala
index d7f58b2cb1..5d3d54c65e 100644
--- a/core/src/main/scala/spark/deploy/worker/ExecutorRunner.scala
+++ b/core/src/main/scala/spark/deploy/worker/ExecutorRunner.scala
@@ -98,7 +98,9 @@ private[spark] class ExecutorRunner(
// Figure out our classpath with the external compute-classpath script
val ext = if (System.getProperty("os.name").startsWith("Windows")) ".cmd" else ".sh"
- val classPath = Utils.executeAndGetOutput(Seq(sparkHome + "/bin/compute-classpath" + ext))
+ val classPath = Utils.executeAndGetOutput(
+ Seq(sparkHome + "/bin/compute-classpath" + ext),
+ extraEnvironment = appDesc.command.environment)
Seq("-cp", classPath) ++ libraryOpts ++ userOpts ++ memoryOpts
}
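Threading appDesc.command.environment through to compute-classpath matters because the script's output can depend on variables set for the application (for example SPARK_HOME). The effect in isolation, as a standalone sketch of what Utils now does internally:

    import java.io.File
    val builder = new ProcessBuilder("sh", "-c", "echo $SPARK_HOME")
      .directory(new File("."))
    builder.environment().put("SPARK_HOME", "/opt/spark")  // hypothetical value
    val process = builder.start()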
diff --git a/core/src/main/scala/spark/scheduler/mesos/MesosSchedulerBackend.scala b/core/src/main/scala/spark/scheduler/mesos/MesosSchedulerBackend.scala
index ca7fab4cc5..e83368b98d 100644
--- a/core/src/main/scala/spark/scheduler/mesos/MesosSchedulerBackend.scala
+++ b/core/src/main/scala/spark/scheduler/mesos/MesosSchedulerBackend.scala
@@ -43,8 +43,12 @@ private[spark] class MesosSchedulerBackend(
// An ExecutorInfo for our tasks
var execArgs: Array[Byte] = null
+ var classLoader: ClassLoader = null
+
override def start() {
synchronized {
+ classLoader = Thread.currentThread.getContextClassLoader
+
new Thread("MesosSchedulerBackend driver") {
setDaemon(true)
override def run() {
@@ -114,13 +118,28 @@ private[spark] class MesosSchedulerBackend(
return execArgs
}
+ private def setClassLoader(): ClassLoader = {
+ val oldClassLoader = Thread.currentThread.getContextClassLoader
+ Thread.currentThread.setContextClassLoader(classLoader)
+ return oldClassLoader
+ }
+
+ private def restoreClassLoader(oldClassLoader: ClassLoader) {
+ Thread.currentThread.setContextClassLoader(oldClassLoader)
+ }
+
override def offerRescinded(d: SchedulerDriver, o: OfferID) {}
override def registered(d: SchedulerDriver, frameworkId: FrameworkID, masterInfo: MasterInfo) {
- logInfo("Registered as framework ID " + frameworkId.getValue)
- registeredLock.synchronized {
- isRegistered = true
- registeredLock.notifyAll()
+ val oldClassLoader = setClassLoader()
+ try {
+ logInfo("Registered as framework ID " + frameworkId.getValue)
+ registeredLock.synchronized {
+ isRegistered = true
+ registeredLock.notifyAll()
+ }
+ } finally {
+ restoreClassLoader(oldClassLoader)
}
}
@@ -142,49 +161,54 @@ private[spark] class MesosSchedulerBackend(
* tasks are balanced across the cluster.
*/
override def resourceOffers(d: SchedulerDriver, offers: JList[Offer]) {
- synchronized {
- // Build a big list of the offerable workers, and remember their indices so that we can
- // figure out which Offer to reply to for each worker
- val offerableIndices = new ArrayBuffer[Int]
- val offerableWorkers = new ArrayBuffer[WorkerOffer]
-
- def enoughMemory(o: Offer) = {
- val mem = getResource(o.getResourcesList, "mem")
- val slaveId = o.getSlaveId.getValue
- mem >= executorMemory || slaveIdsWithExecutors.contains(slaveId)
- }
+ val oldClassLoader = setClassLoader()
+ try {
+ synchronized {
+ // Build a big list of the offerable workers, and remember their indices so that we can
+ // figure out which Offer to reply to for each worker
+ val offerableIndices = new ArrayBuffer[Int]
+ val offerableWorkers = new ArrayBuffer[WorkerOffer]
+
+ def enoughMemory(o: Offer) = {
+ val mem = getResource(o.getResourcesList, "mem")
+ val slaveId = o.getSlaveId.getValue
+ mem >= executorMemory || slaveIdsWithExecutors.contains(slaveId)
+ }
- for ((offer, index) <- offers.zipWithIndex if enoughMemory(offer)) {
- offerableIndices += index
- offerableWorkers += new WorkerOffer(
- offer.getSlaveId.getValue,
- offer.getHostname,
- getResource(offer.getResourcesList, "cpus").toInt)
- }
+ for ((offer, index) <- offers.zipWithIndex if enoughMemory(offer)) {
+ offerableIndices += index
+ offerableWorkers += new WorkerOffer(
+ offer.getSlaveId.getValue,
+ offer.getHostname,
+ getResource(offer.getResourcesList, "cpus").toInt)
+ }
- // Call into the ClusterScheduler
- val taskLists = scheduler.resourceOffers(offerableWorkers)
-
- // Build a list of Mesos tasks for each slave
- val mesosTasks = offers.map(o => Collections.emptyList[MesosTaskInfo]())
- for ((taskList, index) <- taskLists.zipWithIndex) {
- if (!taskList.isEmpty) {
- val offerNum = offerableIndices(index)
- val slaveId = offers(offerNum).getSlaveId.getValue
- slaveIdsWithExecutors += slaveId
- mesosTasks(offerNum) = new JArrayList[MesosTaskInfo](taskList.size)
- for (taskDesc <- taskList) {
- taskIdToSlaveId(taskDesc.taskId) = slaveId
- mesosTasks(offerNum).add(createMesosTask(taskDesc, slaveId))
+ // Call into the ClusterScheduler
+ val taskLists = scheduler.resourceOffers(offerableWorkers)
+
+ // Build a list of Mesos tasks for each slave
+ val mesosTasks = offers.map(o => Collections.emptyList[MesosTaskInfo]())
+ for ((taskList, index) <- taskLists.zipWithIndex) {
+ if (!taskList.isEmpty) {
+ val offerNum = offerableIndices(index)
+ val slaveId = offers(offerNum).getSlaveId.getValue
+ slaveIdsWithExecutors += slaveId
+ mesosTasks(offerNum) = new JArrayList[MesosTaskInfo](taskList.size)
+ for (taskDesc <- taskList) {
+ taskIdToSlaveId(taskDesc.taskId) = slaveId
+ mesosTasks(offerNum).add(createMesosTask(taskDesc, slaveId))
+ }
}
}
- }
- // Reply to the offers
- val filters = Filters.newBuilder().setRefuseSeconds(1).build() // TODO: lower timeout?
- for (i <- 0 until offers.size) {
- d.launchTasks(offers(i).getId, mesosTasks(i), filters)
+ // Reply to the offers
+ val filters = Filters.newBuilder().setRefuseSeconds(1).build() // TODO: lower timeout?
+ for (i <- 0 until offers.size) {
+ d.launchTasks(offers(i).getId, mesosTasks(i), filters)
+ }
}
+ } finally {
+ restoreClassLoader(oldClassLoader)
}
}
@@ -224,23 +248,33 @@ private[spark] class MesosSchedulerBackend(
}
override def statusUpdate(d: SchedulerDriver, status: TaskStatus) {
- val tid = status.getTaskId.getValue.toLong
- val state = TaskState.fromMesos(status.getState)
- synchronized {
- if (status.getState == MesosTaskState.TASK_LOST && taskIdToSlaveId.contains(tid)) {
- // We lost the executor on this slave, so remember that it's gone
- slaveIdsWithExecutors -= taskIdToSlaveId(tid)
- }
- if (isFinished(status.getState)) {
- taskIdToSlaveId.remove(tid)
+ val oldClassLoader = setClassLoader()
+ try {
+ val tid = status.getTaskId.getValue.toLong
+ val state = TaskState.fromMesos(status.getState)
+ synchronized {
+ if (status.getState == MesosTaskState.TASK_LOST && taskIdToSlaveId.contains(tid)) {
+ // We lost the executor on this slave, so remember that it's gone
+ slaveIdsWithExecutors -= taskIdToSlaveId(tid)
+ }
+ if (isFinished(status.getState)) {
+ taskIdToSlaveId.remove(tid)
+ }
}
+ scheduler.statusUpdate(tid, state, status.getData.asReadOnlyByteBuffer)
+ } finally {
+ restoreClassLoader(oldClassLoader)
}
- scheduler.statusUpdate(tid, state, status.getData.asReadOnlyByteBuffer)
}
override def error(d: SchedulerDriver, message: String) {
- logError("Mesos error: " + message)
- scheduler.error(message)
+ val oldClassLoader = setClassLoader()
+ try {
+ logError("Mesos error: " + message)
+ scheduler.error(message)
+ } finally {
+ restoreClassLoader(oldClassLoader)
+ }
}
override def stop() {
@@ -256,11 +290,16 @@ private[spark] class MesosSchedulerBackend(
override def frameworkMessage(d: SchedulerDriver, e: ExecutorID, s: SlaveID, b: Array[Byte]) {}
private def recordSlaveLost(d: SchedulerDriver, slaveId: SlaveID, reason: ExecutorLossReason) {
- logInfo("Mesos slave lost: " + slaveId.getValue)
- synchronized {
- slaveIdsWithExecutors -= slaveId.getValue
+ val oldClassLoader = setClassLoader()
+ try {
+ logInfo("Mesos slave lost: " + slaveId.getValue)
+ synchronized {
+ slaveIdsWithExecutors -= slaveId.getValue
+ }
+ scheduler.executorLost(slaveId.getValue, reason)
+ } finally {
+ restoreClassLoader(oldClassLoader)
}
- scheduler.executorLost(slaveId.getValue, reason)
}
override def slaveLost(d: SchedulerDriver, slaveId: SlaveID) {
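Every Mesos callback above now runs inside the same save/swap/restore discipline: the Mesos driver invokes these callbacks from its own threads, whose context class loader need not be the one Spark was started with, so the backend captures Spark's loader in start() and installs it for the duration of each callback. The pattern in isolation, as a hypothetical loan-style helper (not part of the patch):

    def withClassLoader[A](cl: ClassLoader)(body: => A): A = {
      val old = Thread.currentThread.getContextClassLoader
      Thread.currentThread.setContextClassLoader(cl)
      try body
      finally Thread.currentThread.setContextClassLoader(old)
    }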
diff --git a/core/src/test/scala/spark/RDDSuite.scala b/core/src/test/scala/spark/RDDSuite.scala
index e41ae385c0..aa3ee5f5ee 100644
--- a/core/src/test/scala/spark/RDDSuite.scala
+++ b/core/src/test/scala/spark/RDDSuite.scala
@@ -240,7 +240,7 @@ class RDDSuite extends FunSuite with SharedSparkContext {
val ints = sc.makeRDD(scala.util.Random.shuffle(nums), 2)
val topK = ints.top(5)
assert(topK.size === 5)
- assert(topK.sorted === nums.sorted.takeRight(5))
+ assert(topK === nums.reverse.take(5))
}
test("top with custom ordering") {
@@ -252,6 +252,24 @@ class RDDSuite extends FunSuite with SharedSparkContext {
assert(topK.sorted === Array("b", "a"))
}
+ test("takeOrdered with predefined ordering") {
+ val nums = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
+ val rdd = sc.makeRDD(nums, 2)
+ val sortedLowerK = rdd.takeOrdered(5)
+ assert(sortedLowerK.size === 5)
+ assert(sortedLowerK === Array(1, 2, 3, 4, 5))
+ }
+
+ test("takeOrdered with custom ordering") {
+ val nums = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
+ implicit val ord = implicitly[Ordering[Int]].reverse
+ val rdd = sc.makeRDD(nums, 2)
+ val sortedTopK = rdd.takeOrdered(5)
+ assert(sortedTopK.size === 5)
+ assert(sortedTopK === Array(10, 9, 8, 7, 6))
+ assert(sortedTopK === nums.sorted(ord).take(5))
+ }
+
test("takeSample") {
val data = sc.parallelize(1 to 100, 2)
for (seed <- 1 to 5) {
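The new tests pin down the contract that takeOrdered agrees with sorting the whole collection locally and taking a prefix. The local-collection analogue, as a quick sanity sketch (plain Scala, no Spark required):

    val nums = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
    nums.sorted.take(5)                         // Array(1, 2, 3, 4, 5)
    nums.sorted(Ordering[Int].reverse).take(5)  // Array(10, 9, 8, 7, 6)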