diff options
author | Tathagata Das <tathagata.das1565@gmail.com> | 2013-02-14 12:21:47 -0800 |
---|---|---|
committer | Tathagata Das <tathagata.das1565@gmail.com> | 2013-02-14 12:21:47 -0800 |
commit | 2eacf22401f75b956036fb0c32eb38baa16b224e (patch) | |
tree | 263431c21bc6298c2aa902178915de7615b9ad84 /examples | |
parent | 03e8dc6861936a0862fba1ca9f830d5ff507718f (diff) | |
download | spark-2eacf22401f75b956036fb0c32eb38baa16b224e.tar.gz spark-2eacf22401f75b956036fb0c32eb38baa16b224e.tar.bz2 spark-2eacf22401f75b956036fb0c32eb38baa16b224e.zip |
Removed countByKeyAndWindow on paired DStreams, and added countByValueAndWindow for all DStreams. Updated both the Scala and Java APIs and their test suites.
Diffstat (limited to 'examples')
-rw-r--r-- | examples/src/main/scala/spark/streaming/examples/clickstream/PageViewStream.scala | 11 |
1 file changed, 5 insertions, 6 deletions
diff --git a/examples/src/main/scala/spark/streaming/examples/clickstream/PageViewStream.scala b/examples/src/main/scala/spark/streaming/examples/clickstream/PageViewStream.scala index a191321d91..60f228b8ad 100644 --- a/examples/src/main/scala/spark/streaming/examples/clickstream/PageViewStream.scala +++ b/examples/src/main/scala/spark/streaming/examples/clickstream/PageViewStream.scala @@ -28,16 +28,15 @@ object PageViewStream { // Create a NetworkInputDStream on target host:port and convert each line to a PageView val pageViews = ssc.networkTextStream(host, port) - .flatMap(_.split("\n")) - .map(PageView.fromString(_)) + .flatMap(_.split("\n")) + .map(PageView.fromString(_)) // Return a count of views per URL seen in each batch - val pageCounts = pageViews.map(view => ((view.url, 1))).countByKey() + val pageCounts = pageViews.map(view => view.url).countByValue() // Return a sliding window of page views per URL in the last ten seconds - val slidingPageCounts = pageViews.map(view => ((view.url, 1))) - .window(Seconds(10), Seconds(2)) - .countByKey() + val slidingPageCounts = pageViews.map(view => view.url) + .countByValueAndWindow(Seconds(10), Seconds(2)) // Return the rate of error pages (a non 200 status) in each zip code over the last 30 seconds |