author     haitao.yao <yao.erix@gmail.com>  2013-01-31 10:08:32 +0800
committer  haitao.yao <yao.erix@gmail.com>  2013-01-31 10:08:32 +0800
commit     babbb521d5f5a2399e9b05b333fb4e9e594a7fb6 (patch)
tree       7498f561941f98dbdc3c612cb74dffc80026983f
parent     764d78d72fb465903165c0b3d5150452faa7879e (diff)
parent     55327a283e962652a126d3f8ac7e9a19c76f1f19 (diff)
Merge branch 'mesos'
-rw-r--r--  core/src/main/scala/spark/deploy/worker/Worker.scala                    4
-rw-r--r--  core/src/main/scala/spark/executor/MesosExecutorBackend.scala           6
-rw-r--r--  core/src/main/scala/spark/network/Connection.scala                     15
-rw-r--r--  core/src/main/scala/spark/network/ConnectionManager.scala               3
-rw-r--r--  core/src/main/scala/spark/scheduler/mesos/MesosSchedulerBackend.scala  30
-rw-r--r--  docs/python-programming-guide.md                                       11
-rw-r--r--  python/pyspark/shell.py                                                 1
7 files changed, 47 insertions, 23 deletions
diff --git a/core/src/main/scala/spark/deploy/worker/Worker.scala b/core/src/main/scala/spark/deploy/worker/Worker.scala
index 5a83a42daf..8b41620d98 100644
--- a/core/src/main/scala/spark/deploy/worker/Worker.scala
+++ b/core/src/main/scala/spark/deploy/worker/Worker.scala
@@ -134,7 +134,9 @@ private[spark] class Worker(
val fullId = jobId + "/" + execId
if (ExecutorState.isFinished(state)) {
val executor = executors(fullId)
- logInfo("Executor " + fullId + " finished with state " + state)
+ logInfo("Executor " + fullId + " finished with state " + state +
+ message.map(" message " + _).getOrElse("") +
+ exitStatus.map(" exitStatus " + _).getOrElse(""))
finishedExecutors(fullId) = executor
executors -= fullId
coresUsed -= executor.cores
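
Note on the change above: the enriched log line folds two Option fields (message,
exitStatus) into the string without null checks; each Option contributes text only
when it is defined. A minimal, self-contained sketch of the idiom — the executor
id, state, and values below are hypothetical, not taken from the commit:

    object OptionLogDemo extends App {
      // Hypothetical values standing in for the Worker's fields.
      val message: Option[String] = Some("Command exited with code 1")
      val exitStatus: Option[Int] = Some(1)

      // Each Option appends its fragment only when defined; None yields "".
      val line = "Executor app-20130131/0 finished with state KILLED" +
        message.map(" message " + _).getOrElse("") +
        exitStatus.map(" exitStatus " + _).getOrElse("")

      println(line)
    }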
diff --git a/core/src/main/scala/spark/executor/MesosExecutorBackend.scala b/core/src/main/scala/spark/executor/MesosExecutorBackend.scala
index 1ef88075ad..818d6d1dda 100644
--- a/core/src/main/scala/spark/executor/MesosExecutorBackend.scala
+++ b/core/src/main/scala/spark/executor/MesosExecutorBackend.scala
@@ -32,7 +32,11 @@ private[spark] class MesosExecutorBackend(executor: Executor)
logInfo("Registered with Mesos as executor ID " + executorInfo.getExecutorId.getValue)
this.driver = driver
val properties = Utils.deserialize[Array[(String, String)]](executorInfo.getData.toByteArray)
- executor.initialize(executorInfo.getExecutorId.getValue, slaveInfo.getHostname, properties)
+ executor.initialize(
+ executorInfo.getExecutorId.getValue,
+ slaveInfo.getHostname,
+ properties
+ )
}
override def launchTask(d: ExecutorDriver, taskInfo: TaskInfo) {
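
The reformatted initialize call above receives the spark.* properties that the
scheduler serialized into ExecutorInfo.data (see createExecArg in
MesosSchedulerBackend.scala below). As far as this commit shows, Utils.serialize
and Utils.deserialize behave like plain Java serialization; a hedged sketch of
that round trip under that assumption:

    import java.io.{ByteArrayInputStream, ByteArrayOutputStream,
      ObjectInputStream, ObjectOutputStream}

    object PropsRoundTrip extends App {
      // Assumed behavior of Utils.serialize: plain Java object serialization.
      def serialize(o: AnyRef): Array[Byte] = {
        val bos = new ByteArrayOutputStream()
        val oos = new ObjectOutputStream(bos)
        oos.writeObject(o)
        oos.close()
        bos.toByteArray
      }

      def deserialize[T](bytes: Array[Byte]): T =
        new ObjectInputStream(new ByteArrayInputStream(bytes))
          .readObject().asInstanceOf[T]

      val props = Array("spark.master" -> "mesos://host:5050")
      val back = deserialize[Array[(String, String)]](serialize(props))
      assert(back.sameElements(props))
    }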
diff --git a/core/src/main/scala/spark/network/Connection.scala b/core/src/main/scala/spark/network/Connection.scala
index c193bf7c8d..cd5b7d57f3 100644
--- a/core/src/main/scala/spark/network/Connection.scala
+++ b/core/src/main/scala/spark/network/Connection.scala
@@ -12,7 +12,14 @@ import java.net._
private[spark]
-abstract class Connection(val channel: SocketChannel, val selector: Selector) extends Logging {
+abstract class Connection(val channel: SocketChannel, val selector: Selector,
+ val remoteConnectionManagerId: ConnectionManagerId) extends Logging {
+ def this(channel_ : SocketChannel, selector_ : Selector) = {
+ this(channel_, selector_,
+ ConnectionManagerId.fromSocketAddress(
+ channel_.socket.getRemoteSocketAddress().asInstanceOf[InetSocketAddress]
+ ))
+ }
channel.configureBlocking(false)
channel.socket.setTcpNoDelay(true)
@@ -25,7 +32,6 @@ abstract class Connection(val channel: SocketChannel, val selector: Selector) ex
var onKeyInterestChangeCallback: (Connection, Int) => Unit = null
val remoteAddress = getRemoteAddress()
- val remoteConnectionManagerId = ConnectionManagerId.fromSocketAddress(remoteAddress)
def key() = channel.keyFor(selector)
@@ -103,8 +109,9 @@ abstract class Connection(val channel: SocketChannel, val selector: Selector) ex
}
-private[spark] class SendingConnection(val address: InetSocketAddress, selector_ : Selector)
-extends Connection(SocketChannel.open, selector_) {
+private[spark] class SendingConnection(val address: InetSocketAddress, selector_ : Selector,
+ remoteId_ : ConnectionManagerId)
+extends Connection(SocketChannel.open, selector_, remoteId_) {
class Outbox(fair: Int = 0) {
val messages = new Queue[Message]()
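
Why remoteConnectionManagerId moves into the constructor: a SendingConnection
wraps SocketChannel.open, which has no remote address until connect() completes,
so the old field initializer (ConnectionManagerId.fromSocketAddress(remoteAddress))
could not be evaluated safely at construction time. Passing the id explicitly
sidesteps that, while the auxiliary constructor keeps the derive-from-socket path
for already-connected (accepted) channels. A small sketch of the pattern with
hypothetical stand-in types:

    import java.net.InetSocketAddress

    // Hypothetical stand-in for ConnectionManagerId.
    case class PeerId(host: String, port: Int)

    class Conn(val peer: PeerId) {
      // Auxiliary constructor: derive the peer id from a connected socket address.
      def this(addr: InetSocketAddress) =
        this(PeerId(addr.getHostName, addr.getPort))
    }

    object ConnDemo extends App {
      val outbound = new Conn(PeerId("worker-1", 9999))                // not yet connected
      val inbound  = new Conn(new InetSocketAddress("worker-2", 9999)) // accepted socket
      println(outbound.peer + " / " + inbound.peer)
    }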
diff --git a/core/src/main/scala/spark/network/ConnectionManager.scala b/core/src/main/scala/spark/network/ConnectionManager.scala
index 2ecd14f536..c7f226044d 100644
--- a/core/src/main/scala/spark/network/ConnectionManager.scala
+++ b/core/src/main/scala/spark/network/ConnectionManager.scala
@@ -299,7 +299,8 @@ private[spark] class ConnectionManager(port: Int) extends Logging {
private def sendMessage(connectionManagerId: ConnectionManagerId, message: Message) {
def startNewConnection(): SendingConnection = {
val inetSocketAddress = new InetSocketAddress(connectionManagerId.host, connectionManagerId.port)
- val newConnection = connectionRequests.getOrElseUpdate(connectionManagerId, new SendingConnection(inetSocketAddress, selector))
+ val newConnection = connectionRequests.getOrElseUpdate(connectionManagerId,
+ new SendingConnection(inetSocketAddress, selector, connectionManagerId))
newConnection
}
val lookupKey = ConnectionManagerId.fromSocketAddress(connectionManagerId.toSocketAddress)
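
The caller now threads the target ConnectionManagerId into the cached
SendingConnection. getOrElseUpdate evaluates its second argument only on a cache
miss, so at most one connection object is created per peer (per map, ignoring
concurrency). A script-style sketch of that idiom with hypothetical types:

    import scala.collection.mutable

    object CacheDemo extends App {
      case class PeerId(host: String, port: Int)
      class Conn(val peer: PeerId) { println("opening connection to " + peer) }

      val connections = new mutable.HashMap[PeerId, Conn]
      val peer = PeerId("worker-1", 7077)

      val c1 = connections.getOrElseUpdate(peer, new Conn(peer)) // miss: prints once
      val c2 = connections.getOrElseUpdate(peer, new Conn(peer)) // hit: no new Conn
      assert(c1 eq c2)
    }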
diff --git a/core/src/main/scala/spark/scheduler/mesos/MesosSchedulerBackend.scala b/core/src/main/scala/spark/scheduler/mesos/MesosSchedulerBackend.scala
index f3467db86b..eab1c60e0b 100644
--- a/core/src/main/scala/spark/scheduler/mesos/MesosSchedulerBackend.scala
+++ b/core/src/main/scala/spark/scheduler/mesos/MesosSchedulerBackend.scala
@@ -51,7 +51,7 @@ private[spark] class MesosSchedulerBackend(
val taskIdToSlaveId = new HashMap[Long, String]
// An ExecutorInfo for our tasks
- var executorInfo: ExecutorInfo = null
+ var execArgs: Array[Byte] = null
override def start() {
synchronized {
@@ -70,12 +70,11 @@ private[spark] class MesosSchedulerBackend(
}
}.start()
- executorInfo = createExecutorInfo()
waitForRegister()
}
}
- def createExecutorInfo(): ExecutorInfo = {
+ def createExecutorInfo(execId: String): ExecutorInfo = {
val sparkHome = sc.getSparkHome().getOrElse(throw new SparkException(
"Spark home is not set; set it through the spark.home system " +
"property, the SPARK_HOME environment variable or the SparkContext constructor"))
@@ -97,7 +96,7 @@ private[spark] class MesosSchedulerBackend(
.setEnvironment(environment)
.build()
ExecutorInfo.newBuilder()
- .setExecutorId(ExecutorID.newBuilder().setValue("default").build())
+ .setExecutorId(ExecutorID.newBuilder().setValue(execId).build())
.setCommand(command)
.setData(ByteString.copyFrom(createExecArg()))
.addResources(memory)
@@ -109,17 +108,20 @@ private[spark] class MesosSchedulerBackend(
* containing all the spark.* system properties in the form of (String, String) pairs.
*/
private def createExecArg(): Array[Byte] = {
- val props = new HashMap[String, String]
- val iterator = System.getProperties.entrySet.iterator
- while (iterator.hasNext) {
- val entry = iterator.next
- val (key, value) = (entry.getKey.toString, entry.getValue.toString)
- if (key.startsWith("spark.")) {
- props(key) = value
+ if (execArgs == null) {
+ val props = new HashMap[String, String]
+ val iterator = System.getProperties.entrySet.iterator
+ while (iterator.hasNext) {
+ val entry = iterator.next
+ val (key, value) = (entry.getKey.toString, entry.getValue.toString)
+ if (key.startsWith("spark.")) {
+ props(key) = value
+ }
}
+ // Serialize the map as an array of (String, String) pairs
+ execArgs = Utils.serialize(props.toArray)
}
- // Serialize the map as an array of (String, String) pairs
- return Utils.serialize(props.toArray)
+ return execArgs
}
override def offerRescinded(d: SchedulerDriver, o: OfferID) {}
@@ -216,7 +218,7 @@ private[spark] class MesosSchedulerBackend(
return MesosTaskInfo.newBuilder()
.setTaskId(taskId)
.setSlaveId(SlaveID.newBuilder().setValue(slaveId).build())
- .setExecutor(executorInfo)
+ .setExecutor(createExecutorInfo(slaveId))
.setName(task.name)
.addResources(cpuResource)
.setData(ByteString.copyFrom(task.serializedTask))
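
Two changes above: each Mesos slave now gets an ExecutorInfo keyed by its own
slave id instead of the shared "default" id, and the serialized spark.* properties
are computed once and memoized in execArgs rather than re-serialized for every
task. The null-check memoization could equally be written as a lazy val, which
gives the same compute-once behavior with thread-safe initialization; a sketch
under that assumption (not the commit's code):

    import java.io.{ByteArrayOutputStream, ObjectOutputStream}

    object ExecArgCache {
      // lazy val: the body runs once, on first access, under a synchronized init.
      lazy val execArgs: Array[Byte] = {
        val props = sys.props.toArray.filter { case (k, _) => k.startsWith("spark.") }
        val bos = new ByteArrayOutputStream()
        val oos = new ObjectOutputStream(bos)
        oos.writeObject(props)   // an Array[(String, String)], as in createExecArg
        oos.close()
        bos.toByteArray
      }
    }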
diff --git a/docs/python-programming-guide.md b/docs/python-programming-guide.md
index a840b9b34b..4e84d23edf 100644
--- a/docs/python-programming-guide.md
+++ b/docs/python-programming-guide.md
@@ -67,13 +67,20 @@ The script automatically adds the `pyspark` package to the `PYTHONPATH`.
# Interactive Use
-The `pyspark` script launches a Python interpreter that is configured to run PySpark jobs.
-When run without any input files, `pyspark` launches a shell that can be used explore data interactively, which is a simple way to learn the API:
+The `pyspark` script launches a Python interpreter that is configured to run PySpark jobs. To use `pyspark` interactively, first build Spark, then launch it directly from the command line without any options:
+
+{% highlight bash %}
+$ sbt/sbt package
+$ ./pyspark
+{% endhighlight %}
+
+The Python shell can be used to explore data interactively and is a simple way to learn the API:
{% highlight python %}
>>> words = sc.textFile("/usr/share/dict/words")
>>> words.filter(lambda w: w.startswith("spar")).take(5)
[u'spar', u'sparable', u'sparada', u'sparadrap', u'sparagrass']
+>>> help(pyspark) # Show all pyspark functions
{% endhighlight %}
By default, the `pyspark` shell creates a SparkContext that runs jobs locally.
diff --git a/python/pyspark/shell.py b/python/pyspark/shell.py
index f6328c561f..54ff1bf8e7 100644
--- a/python/pyspark/shell.py
+++ b/python/pyspark/shell.py
@@ -4,6 +4,7 @@ An interactive shell.
This file is designed to be launched as a PYTHONSTARTUP script.
"""
import os
+import pyspark
from pyspark.context import SparkContext