From 4ea032a142ab7fb44f92b145cc8d850164419ab5 Mon Sep 17 00:00:00 2001 From: root Date: Wed, 5 Sep 2012 05:53:07 +0000 Subject: Some changes to make important log output visible even if we set the logging to WARNING --- streaming/src/main/scala/spark/streaming/JobManager.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'streaming') diff --git a/streaming/src/main/scala/spark/streaming/JobManager.scala b/streaming/src/main/scala/spark/streaming/JobManager.scala index 9bf9251519..230d806a89 100644 --- a/streaming/src/main/scala/spark/streaming/JobManager.scala +++ b/streaming/src/main/scala/spark/streaming/JobManager.scala @@ -12,7 +12,7 @@ class JobManager(ssc: StreamingContext, numThreads: Int = 1) extends Logging { SparkEnv.set(ssc.env) try { val timeTaken = job.run() - logInfo("Total delay: %.5f s for job %s (execution: %.5f s)".format( + println("Total delay: %.5f s for job %s (execution: %.5f s)".format( (System.currentTimeMillis() - job.time) / 1000.0, job.id, timeTaken / 1000.0)) } catch { case e: Exception => -- cgit v1.2.3 From b7ad291ac52896af6cb1d882392f3d6fa0cf3b49 Mon Sep 17 00:00:00 2001 From: root Date: Wed, 5 Sep 2012 07:08:07 +0000 Subject: Tuning Akka for more connections --- core/src/main/scala/spark/util/AkkaUtils.scala | 1 + .../src/main/scala/spark/streaming/examples/WordCount2.scala | 10 +++++----- 2 files changed, 6 insertions(+), 5 deletions(-) (limited to 'streaming') diff --git a/core/src/main/scala/spark/util/AkkaUtils.scala b/core/src/main/scala/spark/util/AkkaUtils.scala index 57d212e4ca..fd64e224d7 100644 --- a/core/src/main/scala/spark/util/AkkaUtils.scala +++ b/core/src/main/scala/spark/util/AkkaUtils.scala @@ -31,6 +31,7 @@ object AkkaUtils { akka.remote.netty.hostname = "%s" akka.remote.netty.port = %d akka.remote.netty.connection-timeout = 1s + akka.remote.netty.execution-pool-size = 10 """.format(host, port)) val actorSystem = ActorSystem("spark", akkaConf, getClass.getClassLoader) diff --git a/streaming/src/main/scala/spark/streaming/examples/WordCount2.scala b/streaming/src/main/scala/spark/streaming/examples/WordCount2.scala index aa542ba07d..8561e7f079 100644 --- a/streaming/src/main/scala/spark/streaming/examples/WordCount2.scala +++ b/streaming/src/main/scala/spark/streaming/examples/WordCount2.scala @@ -62,10 +62,10 @@ object WordCount2_ExtraFunctions { object WordCount2 { def warmup(sc: SparkContext) { - (0 until 10).foreach {i => - sc.parallelize(1 to 20000000, 1000) + (0 until 3).foreach {i => + sc.parallelize(1 to 20000000, 500) .map(x => (x % 337, x % 1331)) - .reduceByKey(_ + _) + .reduceByKey(_ + _, 100) .count() } } @@ -84,11 +84,11 @@ object WordCount2 { val ssc = new StreamingContext(master, "WordCount2") ssc.setBatchDuration(batchDuration) - //warmup(ssc.sc) + warmup(ssc.sc) val data = ssc.sc.textFile(file, mapTasks.toInt).persist( new StorageLevel(false, true, false, 3)) // Memory only, serialized, 3 replicas - println("Data count: " + data.count()) + println("Data count: " + data.map(x => if (x == "") 1 else x.split(" ").size / x.split(" ").size).count()) println("Data count: " + data.count()) println("Data count: " + data.count()) -- cgit v1.2.3 From 019de4562c3c68ac36e6ab6a5577f5369336046b Mon Sep 17 00:00:00 2001 From: root Date: Thu, 6 Sep 2012 02:50:41 +0000 Subject: Less warmup in word count --- streaming/src/main/scala/spark/streaming/examples/WordCount2.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'streaming') diff --git a/streaming/src/main/scala/spark/streaming/examples/WordCount2.scala b/streaming/src/main/scala/spark/streaming/examples/WordCount2.scala index 8561e7f079..c22949d7b9 100644 --- a/streaming/src/main/scala/spark/streaming/examples/WordCount2.scala +++ b/streaming/src/main/scala/spark/streaming/examples/WordCount2.scala @@ -84,7 +84,7 @@ object WordCount2 { val ssc = new StreamingContext(master, "WordCount2") ssc.setBatchDuration(batchDuration) - warmup(ssc.sc) + //warmup(ssc.sc) val data = ssc.sc.textFile(file, mapTasks.toInt).persist( new StorageLevel(false, true, false, 3)) // Memory only, serialized, 3 replicas -- cgit v1.2.3