author     unknown <ulanov@ULANOV3.americas.hpqcorp.net>  2015-11-10 14:25:06 -0800
committer  Xiangrui Meng <meng@databricks.com>            2015-11-10 14:25:06 -0800
commit     dba1a62cf1baa9ae1ee665d592e01dfad78331a2 (patch)
tree       a8a89a2d9c39f901454460859b550aa88369318c /mllib/src/test/scala/org
parent     18350a57004eb87cafa9504ff73affab4b818e06 (diff)
[SPARK-7316][MLLIB] RDD sliding window with step
Implementation of a step capability for the sliding window function in MLlib's RDDFunctions. One can emulate a step with the current sliding window by producing every window (step 1) and then filtering every Nth one, but that takes considerably more time and space (N*data.count times more than needed). For example, below are the results for various window sizes and steps on 10M data points:

Window | Step | Time, s | Windows produced
------ | ---- | ------- | ----------------
128    | 1    | 6.38    | 9999873
128    | 10   | 0.9     | 999988
128    | 100  | 0.41    | 99999
1024   | 1    | 44.67   | 9998977
1024   | 10   | 4.74    | 999898
1024   | 100  | 0.78    | 99990

```
import org.apache.spark.mllib.rdd.RDDFunctions._

val rdd = sc.parallelize(1 to 10000000, 10)
rdd.count
val window = 1024
val step = 1
val t = System.nanoTime()
val windows = rdd.sliding(window, step)
println(windows.count)
println((System.nanoTime() - t) / 1e9)
```

Author: unknown <ulanov@ULANOV3.americas.hpqcorp.net>
Author: Alexander Ulanov <nashb@yandex.ru>
Author: Xiangrui Meng <meng@databricks.com>

Closes #5855 from avulanov/SPARK-7316-sliding.
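To make the step semantics concrete, here is a small REPL-style sketch (assuming a running SparkContext `sc`, as in the benchmark above): window start positions advance by `step` elements, and trailing windows shorter than the window size are not emitted.

```
import org.apache.spark.mllib.rdd.RDDFunctions._

// With windowSize = 3 and step = 2, windows start at elements 1, 3, 5;
// a partial window starting at 7 would be shorter than 3 and is dropped.
val rdd = sc.parallelize(1 to 7, 2)
rdd.sliding(3, 2).collect().map(_.toList)
// Array(List(1, 2, 3), List(3, 4, 5), List(5, 6, 7))
```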
Diffstat (limited to 'mllib/src/test/scala/org')
-rw-r--r--  mllib/src/test/scala/org/apache/spark/mllib/rdd/RDDFunctionsSuite.scala | 11
1 file changed, 7 insertions(+), 4 deletions(-)
diff --git a/mllib/src/test/scala/org/apache/spark/mllib/rdd/RDDFunctionsSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/rdd/RDDFunctionsSuite.scala
index bc64172614..ac93733bab 100644
--- a/mllib/src/test/scala/org/apache/spark/mllib/rdd/RDDFunctionsSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/mllib/rdd/RDDFunctionsSuite.scala
@@ -28,9 +28,12 @@ class RDDFunctionsSuite extends SparkFunSuite with MLlibTestSparkContext {
for (numPartitions <- 1 to 8) {
val rdd = sc.parallelize(data, numPartitions)
for (windowSize <- 1 to 6) {
- val sliding = rdd.sliding(windowSize).collect().map(_.toList).toList
- val expected = data.sliding(windowSize).map(_.toList).toList
- assert(sliding === expected)
+ for (step <- 1 to 3) {
+ val sliding = rdd.sliding(windowSize, step).collect().map(_.toList).toList
+ val expected = data.sliding(windowSize, step)
+ .map(_.toList).toList.filter(l => l.size == windowSize)
+ assert(sliding === expected)
+ }
}
assert(rdd.sliding(7).collect().isEmpty,
"Should return an empty RDD if the window size is greater than the number of items.")
@@ -40,7 +43,7 @@ class RDDFunctionsSuite extends SparkFunSuite with MLlibTestSparkContext {
test("sliding with empty partitions") {
val data = Seq(Seq(1, 2, 3), Seq.empty[Int], Seq(4), Seq.empty[Int], Seq(5, 6, 7))
val rdd = sc.parallelize(data, data.length).flatMap(s => s)
- assert(rdd.partitions.size === data.length)
+ assert(rdd.partitions.length === data.length)
val sliding = rdd.sliding(3).collect().toSeq.map(_.toSeq)
val expected = data.flatMap(x => x).sliding(3).toSeq.map(_.toSeq)
assert(sliding === expected)
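A note on the `filter` in the expected value above: Scala's collection `sliding(size, step)` keeps a shorter trailing group when the step does not align with the end of the data, whereas `RDD.sliding` returns only full windows, so the test drops partial groups before comparing. A minimal illustration in plain Scala:

```
// Plain Scala collections emit a short trailing group when the step
// does not align with the end of the data:
(1 to 8).toList.sliding(3, 2).toList
// List(List(1, 2, 3), List(3, 4, 5), List(5, 6, 7), List(7, 8))

// Keeping only full windows matches what RDD.sliding produces:
(1 to 8).toList.sliding(3, 2).toList.filter(_.size == 3)
// List(List(1, 2, 3), List(3, 4, 5), List(5, 6, 7))
```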