diff options
author | Tathagata Das <tathagata.das1565@gmail.com> | 2012-09-06 05:29:06 -0700 |
---|---|---|
committer | Tathagata Das <tathagata.das1565@gmail.com> | 2012-09-06 05:29:06 -0700 |
commit | 203ac8fa8bbda9fe477a2ac17b4ec7ce94d48fc8 (patch) | |
tree | 8cfa71cb4c7ed817c1a55ef231eb2fa0fe0b3a52 /streaming | |
parent | babb7e3ce2a5eda793f87b42839cc20d14cb94cf (diff) | |
parent | 019de4562c3c68ac36e6ab6a5577f5369336046b (diff) | |
download | spark-203ac8fa8bbda9fe477a2ac17b4ec7ce94d48fc8.tar.gz spark-203ac8fa8bbda9fe477a2ac17b4ec7ce94d48fc8.tar.bz2 spark-203ac8fa8bbda9fe477a2ac17b4ec7ce94d48fc8.zip |
Merge branch 'dev' of github.com:radlab/spark into dev
Diffstat (limited to 'streaming')
-rw-r--r-- | streaming/src/main/scala/spark/streaming/JobManager.scala | 2 | ||||
-rw-r--r-- | streaming/src/main/scala/spark/streaming/examples/WordCount2.scala | 8 |
2 files changed, 5 insertions, 5 deletions
diff --git a/streaming/src/main/scala/spark/streaming/JobManager.scala b/streaming/src/main/scala/spark/streaming/JobManager.scala index 9bf9251519..230d806a89 100644 --- a/streaming/src/main/scala/spark/streaming/JobManager.scala +++ b/streaming/src/main/scala/spark/streaming/JobManager.scala @@ -12,7 +12,7 @@ class JobManager(ssc: StreamingContext, numThreads: Int = 1) extends Logging { SparkEnv.set(ssc.env) try { val timeTaken = job.run() - logInfo("Total delay: %.5f s for job %s (execution: %.5f s)".format( + println("Total delay: %.5f s for job %s (execution: %.5f s)".format( (System.currentTimeMillis() - job.time) / 1000.0, job.id, timeTaken / 1000.0)) } catch { case e: Exception => diff --git a/streaming/src/main/scala/spark/streaming/examples/WordCount2.scala b/streaming/src/main/scala/spark/streaming/examples/WordCount2.scala index aa542ba07d..c22949d7b9 100644 --- a/streaming/src/main/scala/spark/streaming/examples/WordCount2.scala +++ b/streaming/src/main/scala/spark/streaming/examples/WordCount2.scala @@ -62,10 +62,10 @@ object WordCount2_ExtraFunctions { object WordCount2 { def warmup(sc: SparkContext) { - (0 until 10).foreach {i => - sc.parallelize(1 to 20000000, 1000) + (0 until 3).foreach {i => + sc.parallelize(1 to 20000000, 500) .map(x => (x % 337, x % 1331)) - .reduceByKey(_ + _) + .reduceByKey(_ + _, 100) .count() } } @@ -88,7 +88,7 @@ object WordCount2 { val data = ssc.sc.textFile(file, mapTasks.toInt).persist( new StorageLevel(false, true, false, 3)) // Memory only, serialized, 3 replicas - println("Data count: " + data.count()) + println("Data count: " + data.map(x => if (x == "") 1 else x.split(" ").size / x.split(" ").size).count()) println("Data count: " + data.count()) println("Data count: " + data.count()) |