aboutsummaryrefslogtreecommitdiff
path: root/streaming
diff options
context:
space:
mode:
authorTathagata Das <tathagata.das1565@gmail.com>2012-09-06 05:29:06 -0700
committerTathagata Das <tathagata.das1565@gmail.com>2012-09-06 05:29:06 -0700
commit203ac8fa8bbda9fe477a2ac17b4ec7ce94d48fc8 (patch)
tree8cfa71cb4c7ed817c1a55ef231eb2fa0fe0b3a52 /streaming
parentbabb7e3ce2a5eda793f87b42839cc20d14cb94cf (diff)
parent019de4562c3c68ac36e6ab6a5577f5369336046b (diff)
downloadspark-203ac8fa8bbda9fe477a2ac17b4ec7ce94d48fc8.tar.gz
spark-203ac8fa8bbda9fe477a2ac17b4ec7ce94d48fc8.tar.bz2
spark-203ac8fa8bbda9fe477a2ac17b4ec7ce94d48fc8.zip
Merge branch 'dev' of github.com:radlab/spark into dev
Diffstat (limited to 'streaming')
-rw-r--r--streaming/src/main/scala/spark/streaming/JobManager.scala2
-rw-r--r--streaming/src/main/scala/spark/streaming/examples/WordCount2.scala8
2 files changed, 5 insertions, 5 deletions
diff --git a/streaming/src/main/scala/spark/streaming/JobManager.scala b/streaming/src/main/scala/spark/streaming/JobManager.scala
index 9bf9251519..230d806a89 100644
--- a/streaming/src/main/scala/spark/streaming/JobManager.scala
+++ b/streaming/src/main/scala/spark/streaming/JobManager.scala
@@ -12,7 +12,7 @@ class JobManager(ssc: StreamingContext, numThreads: Int = 1) extends Logging {
SparkEnv.set(ssc.env)
try {
val timeTaken = job.run()
- logInfo("Total delay: %.5f s for job %s (execution: %.5f s)".format(
+ println("Total delay: %.5f s for job %s (execution: %.5f s)".format(
(System.currentTimeMillis() - job.time) / 1000.0, job.id, timeTaken / 1000.0))
} catch {
case e: Exception =>
diff --git a/streaming/src/main/scala/spark/streaming/examples/WordCount2.scala b/streaming/src/main/scala/spark/streaming/examples/WordCount2.scala
index aa542ba07d..c22949d7b9 100644
--- a/streaming/src/main/scala/spark/streaming/examples/WordCount2.scala
+++ b/streaming/src/main/scala/spark/streaming/examples/WordCount2.scala
@@ -62,10 +62,10 @@ object WordCount2_ExtraFunctions {
object WordCount2 {
def warmup(sc: SparkContext) {
- (0 until 10).foreach {i =>
- sc.parallelize(1 to 20000000, 1000)
+ (0 until 3).foreach {i =>
+ sc.parallelize(1 to 20000000, 500)
.map(x => (x % 337, x % 1331))
- .reduceByKey(_ + _)
+ .reduceByKey(_ + _, 100)
.count()
}
}
@@ -88,7 +88,7 @@ object WordCount2 {
val data = ssc.sc.textFile(file, mapTasks.toInt).persist(
new StorageLevel(false, true, false, 3)) // Memory only, serialized, 3 replicas
- println("Data count: " + data.count())
+ println("Data count: " + data.map(x => if (x == "") 1 else x.split(" ").size / x.split(" ").size).count())
println("Data count: " + data.count())
println("Data count: " + data.count())