diff options
Diffstat (limited to 'examples')
-rw-r--r-- | examples/src/main/scala/spark/streaming/examples/clickstream/PageViewStream.scala | 11 |
1 files changed, 5 insertions, 6 deletions
diff --git a/examples/src/main/scala/spark/streaming/examples/clickstream/PageViewStream.scala b/examples/src/main/scala/spark/streaming/examples/clickstream/PageViewStream.scala index a191321d91..60f228b8ad 100644 --- a/examples/src/main/scala/spark/streaming/examples/clickstream/PageViewStream.scala +++ b/examples/src/main/scala/spark/streaming/examples/clickstream/PageViewStream.scala @@ -28,16 +28,15 @@ object PageViewStream { // Create a NetworkInputDStream on target host:port and convert each line to a PageView val pageViews = ssc.networkTextStream(host, port) - .flatMap(_.split("\n")) - .map(PageView.fromString(_)) + .flatMap(_.split("\n")) + .map(PageView.fromString(_)) // Return a count of views per URL seen in each batch - val pageCounts = pageViews.map(view => ((view.url, 1))).countByKey() + val pageCounts = pageViews.map(view => view.url).countByValue() // Return a sliding window of page views per URL in the last ten seconds - val slidingPageCounts = pageViews.map(view => ((view.url, 1))) - .window(Seconds(10), Seconds(2)) - .countByKey() + val slidingPageCounts = pageViews.map(view => view.url) + .countByValueAndWindow(Seconds(10), Seconds(2)) // Return the rate of error pages (a non 200 status) in each zip code over the last 30 seconds |