author    liguoqiang <liguoqiang@rd.tuan800.com>    2014-01-03 15:34:24 +0800
committer liguoqiang <liguoqiang@rd.tuan800.com>    2014-01-03 15:34:24 +0800
commit    b27b75f1c595139bdcebbadb43e89b0a7eadf2b5 (patch)
tree      b94e48423721ddf274ac035668d4b21ff32e2fde
parent    010e72c079274cab7c86cbde3bc7fa5c447e2072 (diff)
Modify spark on yarn to create SparkConf process
-rw-r--r--  new-yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala | 20
-rw-r--r--  new-yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala            | 10
-rw-r--r--  new-yarn/src/main/scala/org/apache/spark/deploy/yarn/WorkerLauncher.scala    | 20
-rw-r--r--  yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala     | 21
-rw-r--r--  yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala                | 18
-rw-r--r--  yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala       |  2
-rw-r--r--  yarn/src/main/scala/org/apache/spark/deploy/yarn/WorkerLauncher.scala        | 22

7 files changed, 65 insertions(+), 48 deletions(-)
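
[Annotation] The substantive change, repeated across both YARN backends, is that one SparkConf is now threaded through the deploy classes instead of each class constructing its own; most remaining hunks are whitespace and line-wrapping cleanups. A minimal sketch of the constructor pattern the commit applies, with ApplicationMasterArguments stubbed since its definition is not part of this diff:

    import org.apache.hadoop.conf.Configuration
    import org.apache.spark.SparkConf

    // Stub for illustration only; the real class parses command-line flags.
    class ApplicationMasterArguments(val args: Array[String])

    class ApplicationMaster(
        args: ApplicationMasterArguments,
        conf: Configuration,
        sparkConf: SparkConf) {

      // Callers that already hold a SparkConf pass it through unchanged.
      def this(args: ApplicationMasterArguments, sparkConf: SparkConf) =
        this(args, new Configuration(), sparkConf)

      // Legacy entry point: falls back to a freshly created SparkConf.
      def this(args: ApplicationMasterArguments) = this(args, new SparkConf())
    }

Client and WorkerLauncher follow the same shape, so existing call sites keep compiling while new call sites can share a single configuration object.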
diff --git a/new-yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/new-yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
index 609e4e47e3..69ae14ce83 100644
--- a/new-yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
+++ b/new-yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
@@ -42,9 +42,11 @@ import org.apache.spark.{SparkConf, SparkContext, Logging}
import org.apache.spark.util.Utils
-class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration, sparkConf: SparkConf) extends Logging {
+class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration,
+ sparkConf: SparkConf) extends Logging {
- def this(args: ApplicationMasterArguments,sparkConf: SparkConf) = this(args, new Configuration(), sparkConf)
+ def this(args: ApplicationMasterArguments, sparkConf: SparkConf) =
+ this(args, new Configuration(), sparkConf)
def this(args: ApplicationMasterArguments) = this(args, new SparkConf())
@@ -115,7 +117,7 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration, s
// LOCAL_DIRS => 2.X, YARN_LOCAL_DIRS => 0.23.X
val localDirs = Option(System.getenv("YARN_LOCAL_DIRS"))
.getOrElse(Option(System.getenv("LOCAL_DIRS"))
- .getOrElse(""))
+ .getOrElse(""))
if (localDirs.isEmpty()) {
throw new Exception("Yarn Local dirs can't be empty")
@@ -137,11 +139,11 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration, s
amClient.registerApplicationMaster(Utils.localHostName(), 0, uiAddress)
}
- private def startUserClass(): Thread = {
+ private def startUserClass(): Thread = {
logInfo("Starting the user JAR in a separate Thread")
val mainMethod = Class.forName(
args.userClass,
- false /* initialize */,
+ false /* initialize */ ,
Thread.currentThread.getContextClassLoader).getMethod("main", classOf[Array[String]])
val t = new Thread {
override def run() {
@@ -257,7 +259,7 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration, s
}
private def launchReporterThread(_sleepTime: Long): Thread = {
- val sleepTime = if (_sleepTime <= 0 ) 0 else _sleepTime
+ val sleepTime = if (_sleepTime <= 0) 0 else _sleepTime
val t = new Thread {
override def run() {
@@ -316,7 +318,7 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration, s
logInfo("finishApplicationMaster with " + status)
// Set tracking URL to empty since we don't have a history server.
- amClient.unregisterApplicationMaster(status, "" /* appMessage */, "" /* appTrackingUrl */)
+ amClient.unregisterApplicationMaster(status, "" /* appMessage */ , "" /* appTrackingUrl */)
}
/**
@@ -351,6 +353,7 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration, s
if (appMaster.isLastAMRetry) appMaster.cleanupStagingDir()
}
}
+
}
object ApplicationMaster {
@@ -401,6 +404,7 @@ object ApplicationMaster {
// This is not only logs, but also ensures that log system is initialized for this instance
// when we are actually 'run'-ing.
logInfo("Adding shutdown hook for context " + sc)
+
override def run() {
logInfo("Invoking sc stop from shutdown hook")
sc.stop()
@@ -409,7 +413,7 @@ object ApplicationMaster {
master.finishApplicationMaster(FinalApplicationStatus.SUCCEEDED)
}
}
- } )
+ })
}
// Wait for initialization to complete and atleast 'some' nodes can get allocated.
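
[Annotation] The one behavioral subtlety in this file sits in the local-dirs hunk: YARN 2.x publishes LOCAL_DIRS while 0.23.x used YARN_LOCAL_DIRS, so both are consulted before failing. A standalone rendering of that logic, taking the environment as a map so it can be exercised outside a container (the wrapper object is illustrative):

    object LocalDirsSketch {
      // LOCAL_DIRS => 2.X, YARN_LOCAL_DIRS => 0.23.X
      def localDirs(env: Map[String, String] = sys.env): String = {
        val dirs = env.get("YARN_LOCAL_DIRS")
          .orElse(env.get("LOCAL_DIRS"))
          .getOrElse("")
        if (dirs.isEmpty) {
          throw new Exception("Yarn Local dirs can't be empty")
        }
        dirs
      }
    }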
diff --git a/new-yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/new-yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
index 952171cd0a..440ad5cde5 100644
--- a/new-yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
+++ b/new-yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
@@ -50,9 +50,11 @@ import org.apache.spark.deploy.SparkHadoopUtil
* Client submits an application to the global ResourceManager to launch Spark's ApplicationMaster,
* which will launch a Spark master process and negotiate resources throughout its duration.
*/
-class Client(args: ClientArguments, conf: Configuration, sparkConf: SparkConf) extends YarnClientImpl with Logging {
+class Client(args: ClientArguments, conf: Configuration, sparkConf: SparkConf)
+ extends YarnClientImpl with Logging {
- def this(args: ClientArguments,sparkConf: SparkConf) = this(args, new Configuration(), sparkConf)
+ def this(args: ClientArguments, sparkConf: SparkConf) =
+ this(args, new Configuration(), sparkConf)
def this(args: ClientArguments) = this(args, new SparkConf())
@@ -143,7 +145,7 @@ class Client(args: ClientArguments, conf: Configuration, sparkConf: SparkConf) e
clusterMetrics.getNumNodeManagers)
val queueInfo: QueueInfo = super.getQueueInfo(args.amQueue)
- logInfo("""Queue info ... queueName: %s, queueCurrentCapacity: %s, queueMaxCapacity: %s,
+ logInfo( """Queue info ... queueName: %s, queueCurrentCapacity: %s, queueMaxCapacity: %s,
queueApplicationCount = %s, queueChildQueueCount = %s""".format(
queueInfo.getQueueName,
queueInfo.getCurrentCapacity,
@@ -347,7 +349,7 @@ class Client(args: ClientArguments, conf: Configuration, sparkConf: SparkConf) e
val prefix = " --args "
val args = clientArgs.userArgs
val retval = new StringBuilder()
- for (arg <- args){
+ for (arg <- args) {
retval.append(prefix).append(" '").append(arg).append("' ")
}
retval.toString
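
[Annotation] For context on the last hunk: the loop assembles the child process's command line by wrapping each user argument in single quotes behind an --args flag. A self-contained version of the same loop (object name is illustrative):

    object UserArgsSketch {
      def userArgsToString(userArgs: Seq[String]): String = {
        val prefix = " --args "
        val retval = new StringBuilder()
        for (arg <- userArgs) {
          retval.append(prefix).append(" '").append(arg).append("' ")
        }
        retval.toString
      }
    }

For example, userArgsToString(Seq("a b", "c")) yields " --args  'a b'  --args  'c' ".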
diff --git a/new-yarn/src/main/scala/org/apache/spark/deploy/yarn/WorkerLauncher.scala b/new-yarn/src/main/scala/org/apache/spark/deploy/yarn/WorkerLauncher.scala
index 09ac8d77ca..e4c6ab212c 100644
--- a/new-yarn/src/main/scala/org/apache/spark/deploy/yarn/WorkerLauncher.scala
+++ b/new-yarn/src/main/scala/org/apache/spark/deploy/yarn/WorkerLauncher.scala
@@ -35,9 +35,11 @@ import org.apache.spark.scheduler.SplitInfo
import org.apache.hadoop.yarn.client.api.AMRMClient
import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest
-class WorkerLauncher(args: ApplicationMasterArguments, conf: Configuration, sparkConf: SparkConf) extends Logging {
+class WorkerLauncher(args: ApplicationMasterArguments, conf: Configuration, sparkConf: SparkConf)
+ extends Logging {
- def this(args: ApplicationMasterArguments, sparkConf: SparkConf) = this(args, new Configuration(), sparkConf)
+ def this(args: ApplicationMasterArguments, sparkConf: SparkConf) =
+ this(args, new Configuration(), sparkConf)
def this(args: ApplicationMasterArguments) = this(args, new SparkConf())
@@ -50,7 +52,7 @@ class WorkerLauncher(args: ApplicationMasterArguments, conf: Configuration, spar
private var amClient: AMRMClient[ContainerRequest] = _
- val actorSystem : ActorSystem = AkkaUtils.createActorSystem("sparkYarnAM", Utils.localHostName, 0,
+ val actorSystem: ActorSystem = AkkaUtils.createActorSystem("sparkYarnAM", Utils.localHostName, 0,
conf = sparkConf)._1
var actor: ActorRef = _
@@ -93,7 +95,7 @@ class WorkerLauncher(args: ApplicationMasterArguments, conf: Configuration, spar
// must be <= timeoutInterval/ 2.
// On other hand, also ensure that we are reasonably responsive without causing too many requests to RM.
// so atleast 1 minute or timeoutInterval / 10 - whichever is higher.
- val interval = math.min(timeoutInterval / 2, math.max(timeoutInterval/ 10, 60000L))
+ val interval = math.min(timeoutInterval / 2, math.max(timeoutInterval / 10, 60000L))
reporterThread = launchReporterThread(interval)
// Wait for the reporter thread to Finish.
@@ -139,8 +141,8 @@ class WorkerLauncher(args: ApplicationMasterArguments, conf: Configuration, spar
Thread.sleep(100)
}
}
- sparkConf.set("spark.driver.host", driverHost)
- sparkConf.set("spark.driver.port", driverPort.toString)
+ sparkConf.set("spark.driver.host", driverHost)
+ sparkConf.set("spark.driver.port", driverPort.toString)
val driverUrl = "akka.tcp://spark@%s:%s/user/%s".format(
driverHost, driverPort.toString, CoarseGrainedSchedulerBackend.ACTOR_NAME)
@@ -169,7 +171,7 @@ class WorkerLauncher(args: ApplicationMasterArguments, conf: Configuration, spar
// TODO: Handle container failure
yarnAllocator.addResourceRequests(args.numWorkers)
- while(yarnAllocator.getNumWorkersRunning < args.numWorkers) {
+ while (yarnAllocator.getNumWorkersRunning < args.numWorkers) {
yarnAllocator.allocateResources()
Thread.sleep(100)
}
@@ -180,7 +182,7 @@ class WorkerLauncher(args: ApplicationMasterArguments, conf: Configuration, spar
// TODO: We might want to extend this to allocate more containers in case they die !
private def launchReporterThread(_sleepTime: Long): Thread = {
- val sleepTime = if (_sleepTime <= 0 ) 0 else _sleepTime
+ val sleepTime = if (_sleepTime <= 0) 0 else _sleepTime
val t = new Thread {
override def run() {
@@ -212,7 +214,7 @@ class WorkerLauncher(args: ApplicationMasterArguments, conf: Configuration, spar
def finishApplicationMaster(status: FinalApplicationStatus) {
logInfo("finish ApplicationMaster with " + status)
- amClient.unregisterApplicationMaster(status, "" /* appMessage */, "" /* appTrackingUrl */)
+ amClient.unregisterApplicationMaster(status, "" /* appMessage */ , "" /* appTrackingUrl */)
}
}
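
[Annotation] The interval arithmetic fixed by the whitespace hunk encodes the comment above it: heartbeat at most every timeoutInterval/2 so the ResourceManager never expires the AM, but no more often than the larger of timeoutInterval/10 and one minute. As a pure function (wrapper object is illustrative):

    object ReporterIntervalSketch {
      // Upper bound: half the RM expiry window. Lower bound: the larger of
      // a tenth of the window and one minute, to avoid hammering the RM.
      def reporterInterval(timeoutInterval: Long): Long =
        math.min(timeoutInterval / 2, math.max(timeoutInterval / 10, 60000L))
    }

Assuming YARN's default 10-minute AM expiry (600000 ms), this gives min(300000, max(60000, 60000)) = 60000 ms, i.e. a one-minute heartbeat.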
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
index 69170c7427..2bb11e54c5 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
@@ -39,9 +39,11 @@ import org.apache.hadoop.yarn.util.{ConverterUtils, Records}
import org.apache.spark.{SparkConf, SparkContext, Logging}
import org.apache.spark.util.Utils
-class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration, sparkConf: SparkConf) extends Logging {
+class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration,
+ sparkConf: SparkConf) extends Logging {
- def this(args: ApplicationMasterArguments,sparkConf: SparkConf) = this(args, new Configuration(), sparkConf)
+ def this(args: ApplicationMasterArguments, sparkConf: SparkConf) =
+ this(args, new Configuration(), sparkConf)
def this(args: ApplicationMasterArguments) = this(args, new SparkConf())
@@ -126,7 +128,7 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration, s
// LOCAL_DIRS => 2.X, YARN_LOCAL_DIRS => 0.23.X
val localDirs = Option(System.getenv("YARN_LOCAL_DIRS"))
.getOrElse(Option(System.getenv("LOCAL_DIRS"))
- .getOrElse(""))
+ .getOrElse(""))
if (localDirs.isEmpty()) {
throw new Exception("Yarn Local dirs can't be empty")
@@ -165,11 +167,11 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration, s
resourceManager.registerApplicationMaster(appMasterRequest)
}
- private def startUserClass(): Thread = {
+ private def startUserClass(): Thread = {
logInfo("Starting the user JAR in a separate Thread")
val mainMethod = Class.forName(
args.userClass,
- false /* initialize */,
+ false /* initialize */ ,
Thread.currentThread.getContextClassLoader).getMethod("main", classOf[Array[String]])
val t = new Thread {
override def run() {
@@ -231,7 +233,7 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration, s
yarnConf,
resourceManager,
appAttemptId,
- args,
+ args,
sparkContext.getConf)
}
}
@@ -286,7 +288,7 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration, s
}
private def launchReporterThread(_sleepTime: Long): Thread = {
- val sleepTime = if (_sleepTime <= 0 ) 0 else _sleepTime
+ val sleepTime = if (_sleepTime <= 0) 0 else _sleepTime
val t = new Thread {
override def run() {
@@ -385,6 +387,7 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration, s
if (appMaster.isLastAMRetry) appMaster.cleanupStagingDir()
}
}
+
}
object ApplicationMaster {
@@ -394,6 +397,7 @@ object ApplicationMaster {
// TODO: Currently, task to container is computed once (TaskSetManager) - which need not be
// optimal as more containers are available. Might need to handle this better.
private val ALLOCATOR_LOOP_WAIT_COUNT = 30
+
def incrementAllocatorLoop(by: Int) {
val count = yarnAllocatorLoop.getAndAdd(by)
if (count >= ALLOCATOR_LOOP_WAIT_COUNT) {
@@ -432,6 +436,7 @@ object ApplicationMaster {
// This is not only logs, but also ensures that log system is initialized for this instance
// when we are actually 'run'-ing.
logInfo("Adding shutdown hook for context " + sc)
+
override def run() {
logInfo("Invoking sc stop from shutdown hook")
sc.stop()
@@ -440,7 +445,7 @@ object ApplicationMaster {
master.finishApplicationMaster(FinalApplicationStatus.SUCCEEDED)
}
}
- } )
+ })
}
// Wait for initialization to complete and atleast 'some' nodes can get allocated.
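
[Annotation] The `})` hunks above close a shutdown hook registered on the JVM runtime; its job is to stop the SparkContext and report SUCCEEDED before exit. A minimal sketch of that registration, with the finishApplicationMaster call left as a comment since it needs the enclosing ApplicationMaster instance:

    import org.apache.spark.SparkContext

    object ShutdownHookSketch {
      def addShutdownHook(sc: SparkContext) {
        Runtime.getRuntime.addShutdownHook(new Thread {
          override def run() {
            // Invoking sc stop from shutdown hook.
            sc.stop()
            // The real hook then calls
            // master.finishApplicationMaster(FinalApplicationStatus.SUCCEEDED).
          }
        })
      }
    }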
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
index 525ea72762..6abb4d5017 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
@@ -45,9 +45,11 @@ import org.apache.spark.util.Utils
import org.apache.spark.deploy.SparkHadoopUtil
-class Client(args: ClientArguments, conf: Configuration, sparkConf: SparkConf) extends YarnClientImpl with Logging {
+class Client(args: ClientArguments, conf: Configuration, sparkConf: SparkConf)
+ extends YarnClientImpl with Logging {
- def this(args: ClientArguments,sparkConf: SparkConf) = this(args, new Configuration(), sparkConf)
+ def this(args: ClientArguments, sparkConf: SparkConf) =
+ this(args, new Configuration(), sparkConf)
def this(args: ClientArguments) = this(args, new SparkConf())
@@ -123,7 +125,7 @@ class Client(args: ClientArguments, conf: Configuration, sparkConf: SparkConf) e
clusterMetrics.getNumNodeManagers)
val queueInfo: QueueInfo = super.getQueueInfo(args.amQueue)
- logInfo("""Queue info ... queueName = %s, queueCurrentCapacity = %s, queueMaxCapacity = %s,
+ logInfo( """Queue info ... queueName = %s, queueCurrentCapacity = %s, queueMaxCapacity = %s,
queueApplicationCount = %s, queueChildQueueCount = %s""".format(
queueInfo.getQueueName,
queueInfo.getCurrentCapacity,
@@ -143,7 +145,7 @@ class Client(args: ClientArguments, conf: Configuration, sparkConf: SparkConf) e
}
val amMem = args.amMemory + YarnAllocationHandler.MEMORY_OVERHEAD
if (amMem > maxMem) {
- logError("AM size is to large to run on this cluster " + amMem)
+ logError("AM size is to large to run on this cluster " + amMem)
System.exit(1)
}
@@ -328,7 +330,7 @@ class Client(args: ClientArguments, conf: Configuration, sparkConf: SparkConf) e
val prefix = " --args "
val args = clientArgs.userArgs
val retval = new StringBuilder()
- for (arg <- args){
+ for (arg <- args) {
retval.append(prefix).append(" '").append(arg).append("' ")
}
retval.toString
@@ -467,10 +469,10 @@ object Client {
// Note that anything with SPARK prefix gets propagated to all (remote) processes
System.setProperty("SPARK_YARN_MODE", "true")
- val sparkConf = new SparkConf
- val args = new ClientArguments(argStrings,sparkConf)
+ val sparkConf = new SparkConf
+ val args = new ClientArguments(argStrings, sparkConf)
- new Client(args,sparkConf).run
+ new Client(args, sparkConf).run
}
// Based on code from org.apache.hadoop.mapreduce.v2.util.MRApps
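
[Annotation] The middle hunk only trims a trailing space in the AM memory check, but the check itself is worth spelling out: the requested AM memory plus YARN's per-container overhead must fit within the cluster's maximum container size. A standalone sketch, with the overhead value stated as an assumption (YarnAllocationHandler.MEMORY_OVERHEAD is 384 MB in this generation of the code) and the error path simplified:

    object AmMemoryCheckSketch {
      // Assumption: matches YarnAllocationHandler.MEMORY_OVERHEAD (384 MB here).
      val MemoryOverhead = 384

      def validateAmMemory(amMemory: Int, maxMem: Int) {
        val amMem = amMemory + MemoryOverhead
        if (amMem > maxMem) {
          // The real code logs an error and calls System.exit(1).
          sys.error("AM memory " + amMem + " MB exceeds cluster maximum " + maxMem + " MB")
        }
      }
    }

The final hunk in this file applies the same SparkConf-threading idea to the entry point: one SparkConf is created up front and handed to both ClientArguments and Client.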
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala
index 09303ae5c2..8254d628fb 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala
@@ -24,7 +24,7 @@ import collection.mutable.{ArrayBuffer, HashMap}
import org.apache.spark.scheduler.{InputFormatInfo, SplitInfo}
// TODO: Add code and support for ensuring that yarn resource 'asks' are location aware !
-class ClientArguments(val args: Array[String],val sparkConf: SparkConf) {
+class ClientArguments(val args: Array[String], val sparkConf: SparkConf) {
var addJars: String = null
var files: String = null
var archives: String = null
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/WorkerLauncher.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/WorkerLauncher.scala
index 1a792ddf66..300e78612e 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/WorkerLauncher.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/WorkerLauncher.scala
@@ -34,9 +34,11 @@ import org.apache.spark.util.{Utils, AkkaUtils}
import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
import org.apache.spark.scheduler.SplitInfo
-class WorkerLauncher(args: ApplicationMasterArguments, conf: Configuration, sparkConf: SparkConf) extends Logging {
+class WorkerLauncher(args: ApplicationMasterArguments, conf: Configuration, sparkConf: SparkConf)
+ extends Logging {
- def this(args: ApplicationMasterArguments, sparkConf: SparkConf) = this(args, new Configuration(), sparkConf)
+ def this(args: ApplicationMasterArguments, sparkConf: SparkConf) =
+ this(args, new Configuration(), sparkConf)
def this(args: ApplicationMasterArguments) = this(args, new SparkConf())
@@ -47,9 +49,9 @@ class WorkerLauncher(args: ApplicationMasterArguments, conf: Configuration, spar
private val yarnConf: YarnConfiguration = new YarnConfiguration(conf)
private var yarnAllocator: YarnAllocationHandler = null
- private var driverClosed:Boolean = false
+ private var driverClosed: Boolean = false
- val actorSystem : ActorSystem = AkkaUtils.createActorSystem("sparkYarnAM", Utils.localHostName, 0,
+ val actorSystem: ActorSystem = AkkaUtils.createActorSystem("sparkYarnAM", Utils.localHostName, 0,
conf = sparkConf)._1
var actor: ActorRef = null
@@ -83,7 +85,7 @@ class WorkerLauncher(args: ApplicationMasterArguments, conf: Configuration, spar
if (minimumMemory > 0) {
val mem = args.workerMemory + YarnAllocationHandler.MEMORY_OVERHEAD
- val numCore = (mem / minimumMemory) + (if (0 != (mem % minimumMemory)) 1 else 0)
+ val numCore = (mem / minimumMemory) + (if (0 != (mem % minimumMemory)) 1 else 0)
if (numCore > 0) {
// do not override - hits https://issues.apache.org/jira/browse/HADOOP-8406
@@ -104,7 +106,7 @@ class WorkerLauncher(args: ApplicationMasterArguments, conf: Configuration, spar
// must be <= timeoutInterval/ 2.
// On other hand, also ensure that we are reasonably responsive without causing too many requests to RM.
// so atleast 1 minute or timeoutInterval / 10 - whichever is higher.
- val interval = math.min(timeoutInterval / 2, math.max(timeoutInterval/ 10, 60000L))
+ val interval = math.min(timeoutInterval / 2, math.max(timeoutInterval / 10, 60000L))
reporterThread = launchReporterThread(interval)
// Wait for the reporter thread to Finish.
@@ -165,8 +167,8 @@ class WorkerLauncher(args: ApplicationMasterArguments, conf: Configuration, spar
Thread.sleep(100)
}
}
- sparkConf.set("spark.driver.host", driverHost)
- sparkConf.set("spark.driver.port", driverPort.toString)
+ sparkConf.set("spark.driver.host", driverHost)
+ sparkConf.set("spark.driver.port", driverPort.toString)
val driverUrl = "akka.tcp://spark@%s:%s/user/%s".format(
driverHost, driverPort.toString, CoarseGrainedSchedulerBackend.ACTOR_NAME)
@@ -188,7 +190,7 @@ class WorkerLauncher(args: ApplicationMasterArguments, conf: Configuration, spar
// Wait until all containers have finished
// TODO: This is a bit ugly. Can we make it nicer?
// TODO: Handle container failure
- while(yarnAllocator.getNumWorkersRunning < args.numWorkers) {
+ while (yarnAllocator.getNumWorkersRunning < args.numWorkers) {
yarnAllocator.allocateContainers(math.max(args.numWorkers - yarnAllocator.getNumWorkersRunning, 0))
Thread.sleep(100)
}
@@ -199,7 +201,7 @@ class WorkerLauncher(args: ApplicationMasterArguments, conf: Configuration, spar
// TODO: We might want to extend this to allocate more containers in case they die !
private def launchReporterThread(_sleepTime: Long): Thread = {
- val sleepTime = if (_sleepTime <= 0 ) 0 else _sleepTime
+ val sleepTime = if (_sleepTime <= 0) 0 else _sleepTime
val t = new Thread {
override def run() {
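
[Annotation] Once the driver's host and port become known, the launcher publishes them through the shared SparkConf and derives the Akka URL of the driver actor, as the spark.driver.host/port hunks above show. A sketch of that step; the literal actor name is an assumption standing in for CoarseGrainedSchedulerBackend.ACTOR_NAME:

    import org.apache.spark.SparkConf

    object DriverUrlSketch {
      def publishDriver(sparkConf: SparkConf, driverHost: String, driverPort: Int): String = {
        // Record the driver's location in the shared configuration.
        sparkConf.set("spark.driver.host", driverHost)
        sparkConf.set("spark.driver.port", driverPort.toString)
        // "CoarseGrainedScheduler" stands in for CoarseGrainedSchedulerBackend.ACTOR_NAME.
        "akka.tcp://spark@%s:%s/user/%s".format(
          driverHost, driverPort.toString, "CoarseGrainedScheduler")
      }
    }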