diff options
author | unknown <ulanov@ULANOV3.americas.hpqcorp.net> | 2015-11-10 14:25:06 -0800 |
---|---|---|
committer | Xiangrui Meng <meng@databricks.com> | 2015-11-10 14:25:06 -0800 |
commit | dba1a62cf1baa9ae1ee665d592e01dfad78331a2 (patch) | |
tree | a8a89a2d9c39f901454460859b550aa88369318c /mllib/src/test/scala/org | |
parent | 18350a57004eb87cafa9504ff73affab4b818e06 (diff) | |
download | spark-dba1a62cf1baa9ae1ee665d592e01dfad78331a2.tar.gz spark-dba1a62cf1baa9ae1ee665d592e01dfad78331a2.tar.bz2 spark-dba1a62cf1baa9ae1ee665d592e01dfad78331a2.zip |
[SPARK-7316][MLLIB] RDD sliding window with step
Implementation of step capability for sliding window function in MLlib's RDD.
Though one can use current sliding window with step 1 and then filter every Nth window, it will take more time and space (N*data.count times more than needed). For example, below are the results for various windows and steps on 10M data points:
Window | Step | Time | Windows produced
------------ | ------------- | ---------- | ----------
128 | 1 | 6.38 | 9999873
128 | 10 | 0.9 | 999988
128 | 100 | 0.41 | 99999
1024 | 1 | 44.67 | 9998977
1024 | 10 | 4.74 | 999898
1024 | 100 | 0.78 | 99990
```
import org.apache.spark.mllib.rdd.RDDFunctions._
val rdd = sc.parallelize(1 to 10000000, 10)
rdd.count
val window = 1024
val step = 1
val t = System.nanoTime(); val windows = rdd.sliding(window, step); println(windows.count); println((System.nanoTime() - t) / 1e9)
```
Author: unknown <ulanov@ULANOV3.americas.hpqcorp.net>
Author: Alexander Ulanov <nashb@yandex.ru>
Author: Xiangrui Meng <meng@databricks.com>
Closes #5855 from avulanov/SPARK-7316-sliding.
Diffstat (limited to 'mllib/src/test/scala/org')
-rw-r--r-- | mllib/src/test/scala/org/apache/spark/mllib/rdd/RDDFunctionsSuite.scala | 11 |
1 files changed, 7 insertions, 4 deletions
diff --git a/mllib/src/test/scala/org/apache/spark/mllib/rdd/RDDFunctionsSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/rdd/RDDFunctionsSuite.scala index bc64172614..ac93733bab 100644 --- a/mllib/src/test/scala/org/apache/spark/mllib/rdd/RDDFunctionsSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/mllib/rdd/RDDFunctionsSuite.scala @@ -28,9 +28,12 @@ class RDDFunctionsSuite extends SparkFunSuite with MLlibTestSparkContext { for (numPartitions <- 1 to 8) { val rdd = sc.parallelize(data, numPartitions) for (windowSize <- 1 to 6) { - val sliding = rdd.sliding(windowSize).collect().map(_.toList).toList - val expected = data.sliding(windowSize).map(_.toList).toList - assert(sliding === expected) + for (step <- 1 to 3) { + val sliding = rdd.sliding(windowSize, step).collect().map(_.toList).toList + val expected = data.sliding(windowSize, step) + .map(_.toList).toList.filter(l => l.size == windowSize) + assert(sliding === expected) + } } assert(rdd.sliding(7).collect().isEmpty, "Should return an empty RDD if the window size is greater than the number of items.") @@ -40,7 +43,7 @@ class RDDFunctionsSuite extends SparkFunSuite with MLlibTestSparkContext { test("sliding with empty partitions") { val data = Seq(Seq(1, 2, 3), Seq.empty[Int], Seq(4), Seq.empty[Int], Seq(5, 6, 7)) val rdd = sc.parallelize(data, data.length).flatMap(s => s) - assert(rdd.partitions.size === data.length) + assert(rdd.partitions.length === data.length) val sliding = rdd.sliding(3).collect().toSeq.map(_.toSeq) val expected = data.flatMap(x => x).sliding(3).toSeq.map(_.toSeq) assert(sliding === expected) |