aboutsummaryrefslogtreecommitdiff
path: root/examples
diff options
context:
space:
mode:
authorTathagata Das <tathagata.das1565@gmail.com>2013-02-14 12:21:47 -0800
committerTathagata Das <tathagata.das1565@gmail.com>2013-02-14 12:21:47 -0800
commit2eacf22401f75b956036fb0c32eb38baa16b224e (patch)
tree263431c21bc6298c2aa902178915de7615b9ad84 /examples
parent03e8dc6861936a0862fba1ca9f830d5ff507718f (diff)
downloadspark-2eacf22401f75b956036fb0c32eb38baa16b224e.tar.gz
spark-2eacf22401f75b956036fb0c32eb38baa16b224e.tar.bz2
spark-2eacf22401f75b956036fb0c32eb38baa16b224e.zip
Removed countByKeyAndWindow on paired DStreams, and added countByValueAndWindow for all DStreams. Updated both scala and java API and testsuites.
Diffstat (limited to 'examples')
-rw-r--r--examples/src/main/scala/spark/streaming/examples/clickstream/PageViewStream.scala11
1 files changed, 5 insertions, 6 deletions
diff --git a/examples/src/main/scala/spark/streaming/examples/clickstream/PageViewStream.scala b/examples/src/main/scala/spark/streaming/examples/clickstream/PageViewStream.scala
index a191321d91..60f228b8ad 100644
--- a/examples/src/main/scala/spark/streaming/examples/clickstream/PageViewStream.scala
+++ b/examples/src/main/scala/spark/streaming/examples/clickstream/PageViewStream.scala
@@ -28,16 +28,15 @@ object PageViewStream {
// Create a NetworkInputDStream on target host:port and convert each line to a PageView
val pageViews = ssc.networkTextStream(host, port)
- .flatMap(_.split("\n"))
- .map(PageView.fromString(_))
+ .flatMap(_.split("\n"))
+ .map(PageView.fromString(_))
// Return a count of views per URL seen in each batch
- val pageCounts = pageViews.map(view => ((view.url, 1))).countByKey()
+ val pageCounts = pageViews.map(view => view.url).countByValue()
// Return a sliding window of page views per URL in the last ten seconds
- val slidingPageCounts = pageViews.map(view => ((view.url, 1)))
- .window(Seconds(10), Seconds(2))
- .countByKey()
+ val slidingPageCounts = pageViews.map(view => view.url)
+ .countByValueAndWindow(Seconds(10), Seconds(2))
// Return the rate of error pages (a non 200 status) in each zip code over the last 30 seconds