aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorshane-huang <shengsheng.huang@intel.com>2013-01-10 20:09:34 +0800
committershane-huang <shengsheng.huang@intel.com>2013-01-10 20:09:55 +0800
commit9930a95d217045c4c22c2575080a03e4b0fd2426 (patch)
treea8bf6cb2152a5db061ea58821c05a041159b84c5
parente4cb72da8a5428c6b9097e92ddbdf4ceee087b85 (diff)
downloadspark-9930a95d217045c4c22c2575080a03e4b0fd2426.tar.gz
spark-9930a95d217045c4c22c2575080a03e4b0fd2426.tar.bz2
spark-9930a95d217045c4c22c2575080a03e4b0fd2426.zip
Modified Patch according to comments
-rw-r--r--core/src/main/scala/spark/network/Connection.scala8
-rw-r--r--core/src/main/scala/spark/network/ConnectionManager.scala9
-rw-r--r--core/src/main/scala/spark/network/ConnectionManagerTest.scala20
3 files changed, 22 insertions, 15 deletions
diff --git a/core/src/main/scala/spark/network/Connection.scala b/core/src/main/scala/spark/network/Connection.scala
index 95096fd0ba..c193bf7c8d 100644
--- a/core/src/main/scala/spark/network/Connection.scala
+++ b/core/src/main/scala/spark/network/Connection.scala
@@ -136,10 +136,10 @@ extends Connection(SocketChannel.open, selector_) {
if (chunk.isDefined) {
messages += message // this is probably incorrect, it wont work as fifo
if (!message.started) {
- logDebug("Starting to send [" + message + "]")
- message.started = true
- message.startTime = System.currentTimeMillis
- }
+ logDebug("Starting to send [" + message + "]")
+ message.started = true
+ message.startTime = System.currentTimeMillis
+ }
return chunk
} else {
/*logInfo("Finished sending [" + message + "] to [" + remoteConnectionManagerId + "]")*/
diff --git a/core/src/main/scala/spark/network/ConnectionManager.scala b/core/src/main/scala/spark/network/ConnectionManager.scala
index e7bd2d3bbd..36c01ad629 100644
--- a/core/src/main/scala/spark/network/ConnectionManager.scala
+++ b/core/src/main/scala/spark/network/ConnectionManager.scala
@@ -43,12 +43,12 @@ private[spark] class ConnectionManager(port: Int) extends Logging {
}
val selector = SelectorProvider.provider.openSelector()
- val handleMessageExecutor = Executors.newFixedThreadPool(20)
+ val handleMessageExecutor = Executors.newFixedThreadPool(System.getProperty("spark.core.connection.handler.threads","20").toInt)
val serverChannel = ServerSocketChannel.open()
val connectionsByKey = new HashMap[SelectionKey, Connection] with SynchronizedMap[SelectionKey, Connection]
val connectionsById = new HashMap[ConnectionManagerId, SendingConnection] with SynchronizedMap[ConnectionManagerId, SendingConnection]
val messageStatuses = new HashMap[Int, MessageStatus]
- val connectionRequests = new HashMap[ConnectionManagerId, SendingConnection] with SynchronizedMap[ConnectionManagerId, SendingConnection]
+ val connectionRequests = new HashMap[ConnectionManagerId, SendingConnection] with SynchronizedMap[ConnectionManagerId, SendingConnection]
val keyInterestChangeRequests = new SynchronizedQueue[(SelectionKey, Int)]
val sendMessageRequests = new Queue[(Message, SendingConnection)]
@@ -78,9 +78,8 @@ private[spark] class ConnectionManager(port: Int) extends Logging {
def run() {
try {
- while(!selectorThread.isInterrupted) {
+ while(!selectorThread.isInterrupted) {
for( (connectionManagerId, sendingConnection) <- connectionRequests) {
- //val sendingConnection = connectionRequests.dequeue
sendingConnection.connect()
addConnection(sendingConnection)
connectionRequests -= connectionManagerId
@@ -465,7 +464,7 @@ private[spark] object ConnectionManager {
val bufferMessage = Message.createBufferMessage(buffer.duplicate)
manager.sendMessageReliably(manager.id, bufferMessage)
}).foreach(f => {
- val g = Await.result(f, 10 second)
+ val g = Await.result(f, 1 second)
if (!g.isDefined) println("Failed")
})
val finishTime = System.currentTimeMillis
diff --git a/core/src/main/scala/spark/network/ConnectionManagerTest.scala b/core/src/main/scala/spark/network/ConnectionManagerTest.scala
index 0e79c518e0..533e4610f3 100644
--- a/core/src/main/scala/spark/network/ConnectionManagerTest.scala
+++ b/core/src/main/scala/spark/network/ConnectionManagerTest.scala
@@ -13,8 +13,14 @@ import akka.util.duration._
private[spark] object ConnectionManagerTest extends Logging{
def main(args: Array[String]) {
- if (args.length < 5) {
- println("Usage: ConnectionManagerTest <mesos cluster> <slaves file> <num of tasks> <size of msg> <count>")
+ //<mesos cluster> - the master URL
+ //<slaves file> - a list slaves to run connectionTest on
+ //[num of tasks] - the number of parallel tasks to be initiated default is number of slave hosts
+ //[size of msg in MB (integer)] - the size of messages to be sent in each task, default is 10
+ //[count] - how many times to run, default is 3
+ //[await time in seconds] : await time (in seconds), default is 600
+ if (args.length < 2) {
+ println("Usage: ConnectionManagerTest <mesos cluster> <slaves file> [num of tasks] [size of msg in MB (integer)] [count] [await time in seconds)] ")
System.exit(1)
}
@@ -29,14 +35,17 @@ private[spark] object ConnectionManagerTest extends Logging{
/*println("Slaves")*/
/*slaves.foreach(println)*/
- val tasknum = args(2).toInt
+ val tasknum = if (args.length > 2) args(2).toInt else slaves.length
+ val size = ( if (args.length > 3) (args(3).toInt) else 10 ) * 1024 * 1024
+ val count = if (args.length > 4) args(4).toInt else 3
+ val awaitTime = (if (args.length > 5) args(5).toInt else 600 ).second
+ println("Running "+count+" rounds of test: " + "parallel tasks = " + tasknum + ", msg size = " + size/1024/1024 + " MB, awaitTime = " + awaitTime)
val slaveConnManagerIds = sc.parallelize(0 until tasknum, tasknum).map(
i => SparkEnv.get.connectionManager.id).collect()
println("\nSlave ConnectionManagerIds")
slaveConnManagerIds.foreach(println)
println
- val count = args(4).toInt
(0 until count).foreach(i => {
val resultStrs = sc.parallelize(0 until tasknum, tasknum).map(i => {
val connManager = SparkEnv.get.connectionManager
@@ -46,7 +55,6 @@ private[spark] object ConnectionManagerTest extends Logging{
None
})
- val size = (args(3).toInt) * 1024 * 1024
val buffer = ByteBuffer.allocate(size).put(Array.tabulate[Byte](size)(x => x.toByte))
buffer.flip
@@ -56,7 +64,7 @@ private[spark] object ConnectionManagerTest extends Logging{
logInfo("Sending [" + bufferMessage + "] to [" + slaveConnManagerId + "]")
connManager.sendMessageReliably(slaveConnManagerId, bufferMessage)
})
- val results = futures.map(f => Await.result(f, 999.second))
+ val results = futures.map(f => Await.result(f, awaitTime))
val finishTime = System.currentTimeMillis
Thread.sleep(5000)