author    Matei Zaharia <matei@eecs.berkeley.edu>    2013-07-16 11:01:53 -0700
committer Matei Zaharia <matei@eecs.berkeley.edu>    2013-07-16 11:01:53 -0700
commit    b1f9f64743400651cd74fe5f7f04f5891ba6d1c5 (patch)
tree      c2be984d426f0d820915bb2ab8cca7fe06fc73df
parent    5c388808a81ebfb1fc23511882c18f9ae76ec509 (diff)
parent    8a8a8f2de225310410ebe4e5d9c1e9b1ee8ffcf2 (diff)
Merge branch 'master' of github.com:mesos/spark
 core/src/hadoop2-yarn/scala/spark/deploy/yarn/ApplicationMaster.scala | 58
 core/src/main/scala/spark/KryoSerializer.scala                        |  4
 core/src/main/scala/spark/deploy/master/ui/IndexPage.scala            |  4
 core/src/main/scala/spark/scheduler/DAGScheduler.scala                |  9
 core/src/main/scala/spark/ui/UIUtils.scala                            |  2
 core/src/test/scala/spark/RDDSuite.scala                              |  6
 docs/configuration.md                                                 | 12
 mllib/src/main/scala/spark/mllib/recommendation/ALS.scala             | 27
 8 files changed, 93 insertions(+), 29 deletions(-)
diff --git a/core/src/hadoop2-yarn/scala/spark/deploy/yarn/ApplicationMaster.scala b/core/src/hadoop2-yarn/scala/spark/deploy/yarn/ApplicationMaster.scala
index f19648ec68..6a0617cc06 100644
--- a/core/src/hadoop2-yarn/scala/spark/deploy/yarn/ApplicationMaster.scala
+++ b/core/src/hadoop2-yarn/scala/spark/deploy/yarn/ApplicationMaster.scala
@@ -27,6 +27,7 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e
private val yarnConf: YarnConfiguration = new YarnConfiguration(conf)
private var yarnAllocator: YarnAllocationHandler = null
+ private var isFinished: Boolean = false
def run() {
@@ -68,10 +69,7 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e
// Wait for the user class to finish
userThread.join()
-
- // Finish the ApplicationMaster
- finishApplicationMaster()
- // TODO: Exit based on success/failure
+
System.exit(0)
}
@@ -124,17 +122,30 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e
}
}
}
-
+
private def startUserClass(): Thread = {
logInfo("Starting the user JAR in a separate Thread")
val mainMethod = Class.forName(args.userClass, false, Thread.currentThread.getContextClassLoader)
.getMethod("main", classOf[Array[String]])
val t = new Thread {
override def run() {
- // Copy
- var mainArgs: Array[String] = new Array[String](args.userArgs.size())
- args.userArgs.copyToArray(mainArgs, 0, args.userArgs.size())
- mainMethod.invoke(null, mainArgs)
+ var succeeded = false
+ try {
+ // Copy
+ var mainArgs: Array[String] = new Array[String](args.userArgs.size())
+ args.userArgs.copyToArray(mainArgs, 0, args.userArgs.size())
+ mainMethod.invoke(null, mainArgs)
+ // Some user programs (e.g. SparkPi, SparkLR) call System.exit(0) when done,
+ // in which case this thread stops inside invoke() and never reaches this
+ // line; a shutdown hook is then needed to report SUCCEEDED.
+ succeeded = true
+ } finally {
+ if (succeeded) {
+ ApplicationMaster.this.finishApplicationMaster(FinalApplicationStatus.SUCCEEDED)
+ } else {
+ ApplicationMaster.this.finishApplicationMaster(FinalApplicationStatus.FAILED)
+ }
+ }
}
}
t.start()
@@ -179,7 +190,7 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e
logInfo("All workers have launched.")
// Launch a progress reporter thread, else app will get killed after expiration (def: 10mins) timeout
- if (userThread.isAlive){
+ if (userThread.isAlive) {
// ensure that progress is sent before YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS elapse.
val timeoutInterval = yarnConf.getInt(YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS, 120000)
@@ -197,7 +208,7 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e
val t = new Thread {
override def run() {
- while (userThread.isAlive){
+ while (userThread.isAlive) {
val missingWorkerCount = args.numWorkers - yarnAllocator.getNumWorkersRunning
if (missingWorkerCount > 0) {
logInfo("Allocating " + missingWorkerCount + " containers to make up for (potentially ?) lost containers")
@@ -235,14 +246,23 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e
}
}
*/
-
- def finishApplicationMaster() {
+
+ def finishApplicationMaster(status: FinalApplicationStatus) {
+
+ synchronized {
+ if (isFinished) {
+ return
+ }
+ isFinished = true
+ }
+
+ logInfo("finishApplicationMaster with " + status)
val finishReq = Records.newRecord(classOf[FinishApplicationMasterRequest])
.asInstanceOf[FinishApplicationMasterRequest]
finishReq.setAppAttemptId(appAttemptId)
- // TODO: Check if the application has failed or succeeded
- finishReq.setFinishApplicationStatus(FinalApplicationStatus.SUCCEEDED)
+ finishReq.setFinishApplicationStatus(status)
resourceManager.finishApplicationMaster(finishReq)
+
}
}
@@ -256,7 +276,7 @@ object ApplicationMaster {
private val ALLOCATOR_LOOP_WAIT_COUNT = 30
def incrementAllocatorLoop(by: Int) {
val count = yarnAllocatorLoop.getAndAdd(by)
- if (count >= ALLOCATOR_LOOP_WAIT_COUNT){
+ if (count >= ALLOCATOR_LOOP_WAIT_COUNT) {
yarnAllocatorLoop.synchronized {
// to wake threads off wait ...
yarnAllocatorLoop.notifyAll()
@@ -291,14 +311,16 @@ object ApplicationMaster {
logInfo("Invoking sc stop from shutdown hook")
sc.stop()
// best case ...
- for (master <- applicationMasters) master.finishApplicationMaster
+ for (master <- applicationMasters) {
+ master.finishApplicationMaster(FinalApplicationStatus.SUCCEEDED)
+ }
}
} )
}
// Wait for initialization to complete and for at least 'some' nodes to be allocated
yarnAllocatorLoop.synchronized {
- while (yarnAllocatorLoop.get() <= ALLOCATOR_LOOP_WAIT_COUNT){
+ while (yarnAllocatorLoop.get() <= ALLOCATOR_LOOP_WAIT_COUNT) {
yarnAllocatorLoop.wait(1000L)
}
}
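
The substance of this file's change is making finishApplicationMaster idempotent and status-aware, since it can now be reached from both the user thread's finally block and the shutdown hook. A minimal standalone sketch of that guard pattern (the Finisher name and String status are illustrative stand-ins, not the YARN API):

    class Finisher {
      private var isFinished = false

      // Callable from several threads (user thread, shutdown hook): only the
      // first caller actually reports; every later call returns immediately.
      def finishOnce(status: String)(report: String => Unit) {
        synchronized {
          if (isFinished) return
          isFinished = true
        }
        report(status) // runs outside the lock, exactly once
      }
    }

Taking the status as a parameter, rather than hard-coding SUCCEEDED as before, is what lets the failure path in startUserClass report FAILED.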
diff --git a/core/src/main/scala/spark/KryoSerializer.scala b/core/src/main/scala/spark/KryoSerializer.scala
index d723ab7b1e..c7dbcc6fbc 100644
--- a/core/src/main/scala/spark/KryoSerializer.scala
+++ b/core/src/main/scala/spark/KryoSerializer.scala
@@ -210,6 +210,10 @@ class KryoSerializer extends spark.serializer.Serializer with Logging {
val reg = Class.forName(regCls, true, classLoader).newInstance().asInstanceOf[KryoRegistrator]
reg.registerClasses(kryo)
}
+
+ // Allow disabling Kryo reference tracking if the user knows their object graphs have no loops
+ kryo.setReferences(System.getProperty("spark.kryo.referenceTracking", "true").toBoolean)
+
kryo
}
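
For context on the new spark.kryo.referenceTracking property: reference tracking is what lets Kryo serialize object graphs containing shared or cyclic references, at the cost of per-object bookkeeping. A minimal sketch of the underlying Kryo call, outside Spark (using only the setReferences API the patch itself calls):

    import com.esotericsoftware.kryo.Kryo

    val kryo = new Kryo()
    // Equivalent of spark.kryo.referenceTracking=false: skip the reference
    // table for speed. Safe only if no serialized graph contains a cycle or
    // a shared sub-object that must be written once.
    kryo.setReferences(false)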
diff --git a/core/src/main/scala/spark/deploy/master/ui/IndexPage.scala b/core/src/main/scala/spark/deploy/master/ui/IndexPage.scala
index 7545ecf868..5e3c5e064f 100644
--- a/core/src/main/scala/spark/deploy/master/ui/IndexPage.scala
+++ b/core/src/main/scala/spark/deploy/master/ui/IndexPage.scala
@@ -103,7 +103,9 @@ private[spark] class IndexPage(parent: MasterWebUI) {
<td>
<a href={"app?appId=" + app.id}>{app.id}</a>
</td>
- <td>{app.desc.name}</td>
+ <td>
+ <a href={app.appUiUrl}>{app.desc.name}</a>
+ </td>
<td>
{app.coresGranted}
</td>
diff --git a/core/src/main/scala/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/spark/scheduler/DAGScheduler.scala
index 30a648f50b..64ed91f5a0 100644
--- a/core/src/main/scala/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/spark/scheduler/DAGScheduler.scala
@@ -257,6 +257,15 @@ class DAGScheduler(
if (partitions.size == 0) {
return
}
+
+ // Check to make sure we are not launching a task on a partition that does not exist.
+ val maxPartitions = finalRdd.partitions.length
+ partitions.find(p => p >= maxPartitions).foreach { p =>
+ throw new IllegalArgumentException(
+ "Attempting to access a non-existent partition: " + p + ". " +
+ "Total number of partitions: " + maxPartitions)
+ }
+
val (toSubmit, waiter) = prepareJob(
finalRdd, func, partitions, callSite, allowLocal, resultHandler, properties)
eventQueue.put(toSubmit)
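
In user-facing terms, the new guard fails fast instead of launching doomed tasks. Mirroring the regression test added to RDDSuite below, a sketch of what a caller now sees (assumes a live SparkContext sc):

    val rdd = sc.parallelize(1 to 10, 2) // two partitions: indices 0 and 1
    // Requesting partition index 2 now throws immediately:
    // IllegalArgumentException: Attempting to access a non-existent partition: 2.
    //   Total number of partitions: 2
    sc.runJob(rdd, (iter: Iterator[Int]) => iter.size, Seq(0, 1, 2), false)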
diff --git a/core/src/main/scala/spark/ui/UIUtils.scala b/core/src/main/scala/spark/ui/UIUtils.scala
index 36d9c47245..fa46e2487d 100644
--- a/core/src/main/scala/spark/ui/UIUtils.scala
+++ b/core/src/main/scala/spark/ui/UIUtils.scala
@@ -31,7 +31,7 @@ private[spark] object UIUtils {
<link rel="stylesheet" href="/static/webui.css" type="text/css" />
<link rel="stylesheet" href="/static/bootstrap-responsive.min.css" type="text/css" />
<script src="/static/sorttable.js"></script>
- <title>{title}</title>
+ <title>{sc.appName} - {title}</title>
<style type="text/css">
table.sortable thead {{ cursor: pointer; }}
</style>
diff --git a/core/src/test/scala/spark/RDDSuite.scala b/core/src/test/scala/spark/RDDSuite.scala
index aa3ee5f5ee..7f7d4c8211 100644
--- a/core/src/test/scala/spark/RDDSuite.scala
+++ b/core/src/test/scala/spark/RDDSuite.scala
@@ -302,4 +302,10 @@ class RDDSuite extends FunSuite with SharedSparkContext {
assert(sample.toSet.size < 100, "sampling with replacement returned all distinct elements")
}
}
+
+ test("runJob on an invalid partition") {
+ intercept[IllegalArgumentException] {
+ sc.runJob(sc.parallelize(1 to 10, 2), {iter: Iterator[Int] => iter.size}, Seq(0, 1, 2), false)
+ }
+ }
}
diff --git a/docs/configuration.md b/docs/configuration.md
index 5a80510959..5c06897cae 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -198,8 +198,18 @@ Apart from these, the following properties are also available, and may be useful
</td>
</tr>
<tr>
+ <td>spark.kryo.referenceTracking</td>
+ <td>true</td>
+ <td>
+ Whether to track references to the same object when serializing data with Kryo, which is
+ necessary if your object graphs have loops and useful for efficiency if they contain multiple
+ copies of the same object. Can be disabled to improve performance if you know this is not the
+ case.
+ </td>
+</tr>
+<tr>
<td>spark.kryoserializer.buffer.mb</td>
- <td>32</td>
+ <td>2</td>
<td>
Maximum object size to allow within Kryo (the library needs to create a buffer at least as
large as the largest single object you'll serialize). Increase this if you get a "buffer limit
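
Both settings are plain Java system properties, so they must be set before the SparkContext is constructed. A minimal sketch, mirroring what ALS.main does later in this same commit (the master URL and app name here are placeholders):

    import spark.SparkContext

    System.setProperty("spark.serializer", "spark.KryoSerializer")
    System.setProperty("spark.kryo.referenceTracking", "false") // only if graphs have no loops
    System.setProperty("spark.kryoserializer.buffer.mb", "32")  // raise the new 2 MB default if large objects fail
    val sc = new SparkContext("local", "example")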
diff --git a/mllib/src/main/scala/spark/mllib/recommendation/ALS.scala b/mllib/src/main/scala/spark/mllib/recommendation/ALS.scala
index 21eb21276e..4c18cbdc6b 100644
--- a/mllib/src/main/scala/spark/mllib/recommendation/ALS.scala
+++ b/mllib/src/main/scala/spark/mllib/recommendation/ALS.scala
@@ -6,8 +6,10 @@ import scala.util.Sorting
import spark.{HashPartitioner, Partitioner, SparkContext, RDD}
import spark.storage.StorageLevel
+import spark.KryoRegistrator
import spark.SparkContext._
+import com.esotericsoftware.kryo.Kryo
import org.jblas.{DoubleMatrix, SimpleBlas, Solve}
@@ -91,15 +93,15 @@ class ALS private (var numBlocks: Int, var rank: Int, var iterations: Int, var l
*/
def train(ratings: RDD[(Int, Int, Double)]): MatrixFactorizationModel = {
val numBlocks = if (this.numBlocks == -1) {
- math.max(ratings.context.defaultParallelism, ratings.partitions.size)
+ math.max(ratings.context.defaultParallelism, ratings.partitions.size / 2)
} else {
this.numBlocks
}
val partitioner = new HashPartitioner(numBlocks)
- val ratingsByUserBlock = ratings.map{ case (u, p, r) => (u % numBlocks, (u, p, r)) }
- val ratingsByProductBlock = ratings.map{ case (u, p, r) => (p % numBlocks, (p, u, r)) }
+ val ratingsByUserBlock = ratings.map{ case (u, p, r) => (u % numBlocks, Rating(u, p, r)) }
+ val ratingsByProductBlock = ratings.map{ case (u, p, r) => (p % numBlocks, Rating(p, u, r)) }
val (userInLinks, userOutLinks) = makeLinkRDDs(numBlocks, ratingsByUserBlock)
val (productInLinks, productOutLinks) = makeLinkRDDs(numBlocks, ratingsByProductBlock)
@@ -179,12 +181,12 @@ class ALS private (var numBlocks: Int, var rank: Int, var iterations: Int, var l
* the users (or (blockId, (p, u, r)) for the products). We create these simultaneously to avoid
* having to shuffle the (blockId, (u, p, r)) RDD twice, or to cache it.
*/
- private def makeLinkRDDs(numBlocks: Int, ratings: RDD[(Int, (Int, Int, Double))])
+ private def makeLinkRDDs(numBlocks: Int, ratings: RDD[(Int, Rating)])
: (RDD[(Int, InLinkBlock)], RDD[(Int, OutLinkBlock)]) =
{
val grouped = ratings.partitionBy(new HashPartitioner(numBlocks))
val links = grouped.mapPartitionsWithIndex((blockId, elements) => {
- val ratings = elements.map{case (k, t) => Rating(t._1, t._2, t._3)}.toArray
+ val ratings = elements.map{_._2}.toArray
val inLinkBlock = makeInLinkBlock(numBlocks, ratings)
val outLinkBlock = makeOutLinkBlock(numBlocks, ratings)
Iterator.single((blockId, (inLinkBlock, outLinkBlock)))
@@ -383,21 +385,30 @@ object ALS {
train(ratings, rank, iterations, 0.01, -1)
}
+ private class ALSRegistrator extends KryoRegistrator {
+ override def registerClasses(kryo: Kryo) {
+ kryo.register(classOf[Rating])
+ }
+ }
+
def main(args: Array[String]) {
- if (args.length != 5) {
- println("Usage: ALS <master> <ratings_file> <rank> <iterations> <output_dir>")
+ if (args.length != 5 && args.length != 6) {
+ println("Usage: ALS <master> <ratings_file> <rank> <iterations> <output_dir> [<blocks>]")
System.exit(1)
}
val (master, ratingsFile, rank, iters, outputDir) =
(args(0), args(1), args(2).toInt, args(3).toInt, args(4))
+ val blocks = if (args.length == 6) args(5).toInt else -1
System.setProperty("spark.serializer", "spark.KryoSerializer")
+ System.setProperty("spark.kryo.registrator", classOf[ALSRegistrator].getName)
+ System.setProperty("spark.kryo.referenceTracking", "false")
System.setProperty("spark.locality.wait", "10000")
val sc = new SparkContext(master, "ALS")
val ratings = sc.textFile(ratingsFile).map { line =>
val fields = line.split(',')
(fields(0).toInt, fields(1).toInt, fields(2).toDouble)
}
- val model = ALS.train(ratings, rank, iters)
+ val model = ALS.train(ratings, rank, iters, 0.01, blocks)
model.userFeatures.map{ case (id, vec) => id + "," + vec.mkString(" ") }
.saveAsTextFile(outputDir + "/userFeatures")
model.productFeatures.map{ case (id, vec) => id + "," + vec.mkString(" ") }
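
The ALSRegistrator added above is the standard pattern for shaving class-name overhead off Kryo output, and it applies equally to user code. A sketch with a hypothetical record class (MyRecord and MyRegistrator are illustrative, not part of this patch):

    import com.esotericsoftware.kryo.Kryo
    import spark.KryoRegistrator

    case class MyRecord(user: Int, product: Int, rating: Double)

    class MyRegistrator extends KryoRegistrator {
      override def registerClasses(kryo: Kryo) {
        // Registered classes are written as a small integer ID instead of a
        // fully-qualified class name on every record.
        kryo.register(classOf[MyRecord])
      }
    }

    // Wired up the same way ALS.main does, before creating the SparkContext:
    System.setProperty("spark.serializer", "spark.KryoSerializer")
    System.setProperty("spark.kryo.registrator", classOf[MyRegistrator].getName)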