From 2a37235825cecd3f75286d11456c6e3cb13d4327 Mon Sep 17 00:00:00 2001 From: Holden Karau Date: Fri, 18 Oct 2013 00:07:49 -0700 Subject: Initial commit of adding histogram functionality to the DoubleRDDFunctions. --- .../org/apache/spark/rdd/DoubleRDDSuite.scala | 233 +++++++++++++++++++++ 1 file changed, 233 insertions(+) create mode 100644 core/src/test/scala/org/apache/spark/rdd/DoubleRDDSuite.scala (limited to 'core/src/test/scala/org') diff --git a/core/src/test/scala/org/apache/spark/rdd/DoubleRDDSuite.scala b/core/src/test/scala/org/apache/spark/rdd/DoubleRDDSuite.scala new file mode 100644 index 0000000000..2ec7173511 --- /dev/null +++ b/core/src/test/scala/org/apache/spark/rdd/DoubleRDDSuite.scala @@ -0,0 +1,233 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.rdd + +import scala.math.abs +import scala.collection.mutable.ArrayBuffer + +import org.scalatest.FunSuite + +import org.apache.spark.SparkContext._ +import org.apache.spark.rdd._ +import org.apache.spark._ + +class DoubleRDDSuite extends FunSuite with SharedSparkContext { + // Verify tests on the histogram functionality. We test with both evenly + // and non-evenly spaced buckets as the bucket lookup function changes. + test("WorksOnEmpty") { + // Make sure that it works on an empty input + val rdd: RDD[Double] = sc.parallelize(Seq()) + val buckets: Array[Double] = Array(0.0, 10.0) + val histogramResults: Array[Long] = rdd.histogram(buckets) + val expectedHistogramResults: Array[Long] = Array(0) + assert(histogramResults === expectedHistogramResults) + } + test("WorksWithOutOfRangeWithOneBucket") { + // Verify that if all of the elements are out of range the counts are zero + val rdd: RDD[Double] = sc.parallelize(Seq(10.01,-0.01)) + val buckets: Array[Double] = Array(0.0, 10.0) + val histogramResults: Array[Long] = rdd.histogram(buckets) + val expectedHistogramResults: Array[Long] = Array(0) + assert(histogramResults === expectedHistogramResults) + } + test("WorksInRangeWithOneBucket") { + // Verify the basic case of one bucket and all elements in that bucket works + val rdd: RDD[Double] = sc.parallelize(Seq(1,2,3,4)) + val buckets: Array[Double] = Array(0.0, 10.0) + val histogramResults: Array[Long] = rdd.histogram(buckets) + val expectedHistogramResults: Array[Long] = Array(4) + assert(histogramResults === expectedHistogramResults) + } + test("WorksInRangeWithOneBucketExactMatch") { + // Verify the basic case of one bucket and all elements in that bucket works + val rdd: RDD[Double] = sc.parallelize(Seq(1,2,3,4)) + val buckets: Array[Double] = Array(1.0, 4.0) + val histogramResults: Array[Long] = rdd.histogram(buckets) + val expectedHistogramResults: Array[Long] = Array(4) + assert(histogramResults === expectedHistogramResults) + } + test("WorksWithOutOfRangeWithTwoBuckets") { + // Verify that out of range works with two buckets + val rdd: RDD[Double] = sc.parallelize(Seq(10.01,-0.01)) + val buckets: Array[Double] = Array(0.0, 5.0, 10.0) + val histogramResults: Array[Long] = rdd.histogram(buckets) + val expectedHistogramResults: Array[Long] = Array(0,0) + assert(histogramResults === expectedHistogramResults) + } + test("WorksWithOutOfRangeWithTwoUnEvenBuckets") { + // Verify that out of range works with two un even buckets + val rdd: RDD[Double] = sc.parallelize(Seq(10.01,-0.01)) + val buckets: Array[Double] = Array(0.0, 4.0, 10.0) + val histogramResults: Array[Long] = rdd.histogram(buckets) + val expectedHistogramResults: Array[Long] = Array(0,0) + assert(histogramResults === expectedHistogramResults) + } + test("WorksInRangeWithTwoBuckets") { + // Make sure that it works with two equally spaced buckets and elements in each + val rdd: RDD[Double] = sc.parallelize(Seq(1,2,3,5,6)) + val buckets: Array[Double] = Array(0.0, 5.0, 10.0) + val histogramResults: Array[Long] = rdd.histogram(buckets) + val expectedHistogramResults: Array[Long] = Array(3,2) + assert(histogramResults === expectedHistogramResults) + } + test("WorksInRangeWithTwoBucketsAndNaN") { + // Make sure that it works with two equally spaced buckets and elements in each + val rdd: RDD[Double] = sc.parallelize(Seq(1,2,3,5,6,Double.NaN)) + val buckets: Array[Double] = Array(0.0, 5.0, 10.0) + val histogramResults: Array[Long] = rdd.histogram(buckets) + val expectedHistogramResults: Array[Long] = Array(3,2) + assert(histogramResults === expectedHistogramResults) + } + test("WorksInRangeWithTwoUnevenBuckets") { + // Make sure that it works with two unequally spaced buckets and elements in each + val rdd: RDD[Double] = sc.parallelize(Seq(1,2,3,5,6)) + val buckets: Array[Double] = Array(0.0, 5.0, 11.0) + val histogramResults: Array[Long] = rdd.histogram(buckets) + val expectedHistogramResults: Array[Long] = Array(3,2) + assert(histogramResults === expectedHistogramResults) + } + test("WorksMixedRangeWithTwoUnevenBuckets") { + // Make sure that it works with two unequally spaced buckets and elements in each + val rdd: RDD[Double] = sc.parallelize(Seq(-0.01,0.0,1,2,3,5,6,11.0,11.01)) + val buckets: Array[Double] = Array(0.0, 5.0, 11.0) + val histogramResults: Array[Long] = rdd.histogram(buckets) + val expectedHistogramResults: Array[Long] = Array(4,3) + assert(histogramResults === expectedHistogramResults) + } + test("WorksMixedRangeWithFourUnevenBuckets") { + // Make sure that it works with two unequally spaced buckets and elements in each + val rdd: RDD[Double] = sc.parallelize(Seq(-0.01,0.0,1,2,3,5,6,11.01,12.0,199.0,200.0,200.1)) + val buckets: Array[Double] = Array(0.0, 5.0, 11.0,12.0,200.0) + val histogramResults: Array[Long] = rdd.histogram(buckets) + val expectedHistogramResults: Array[Long] = Array(4,2,1,3) + assert(histogramResults === expectedHistogramResults) + } + test("WorksMixedRangeWithUnevenBucketsAndNaN") { + // Make sure that it works with two unequally spaced buckets and elements in each + val rdd: RDD[Double] = sc.parallelize(Seq(-0.01,0.0,1,2,3,5,6,11.01,12.0,199.0,200.0,200.1,Double.NaN)) + val buckets: Array[Double] = Array(0.0, 5.0, 11.0,12.0,200.0) + val histogramResults: Array[Long] = rdd.histogram(buckets) + val expectedHistogramResults: Array[Long] = Array(4,2,1,3) + assert(histogramResults === expectedHistogramResults) + } + // Make sure this works with a NaN end bucket + test("WorksMixedRangeWithUnevenBucketsAndNaNAndNaNRange") { + // Make sure that it works with two unequally spaced buckets and elements in each + val rdd: RDD[Double] = sc.parallelize(Seq(-0.01,0.0,1,2,3,5,6,11.01,12.0,199.0,200.0,200.1,Double.NaN)) + val buckets: Array[Double] = Array(0.0, 5.0, 11.0,12.0,200.0,Double.NaN) + val histogramResults: Array[Long] = rdd.histogram(buckets) + val expectedHistogramResults: Array[Long] = Array(4,2,1,2,3) + assert(histogramResults === expectedHistogramResults) + } + // Make sure this works with a NaN end bucket and an inifity + test("WorksMixedRangeWithUnevenBucketsAndNaNAndNaNRangeAndInfity") { + // Make sure that it works with two unequally spaced buckets and elements in each + val rdd: RDD[Double] = sc.parallelize(Seq(-0.01,0.0,1,2,3,5,6,11.01,12.0,199.0,200.0,200.1,1.0/0.0,-1.0/0.0,Double.NaN)) + val buckets: Array[Double] = Array(0.0, 5.0, 11.0,12.0,200.0,Double.NaN) + val histogramResults: Array[Long] = rdd.histogram(buckets) + val expectedHistogramResults: Array[Long] = Array(4,2,1,2,4) + assert(histogramResults === expectedHistogramResults) + } + test("WorksWithOutOfRangeWithInfiniteBuckets") { + // Verify that out of range works with two buckets + val rdd: RDD[Double] = sc.parallelize(Seq(10.01,-0.01,Double.NaN)) + val buckets: Array[Double] = Array(-1.0/0.0 ,0.0, 1.0/0.0) + val histogramResults: Array[Long] = rdd.histogram(buckets) + val expectedHistogramResults: Array[Long] = Array(1,1) + assert(histogramResults === expectedHistogramResults) + } + // Test the failure mode with an invalid bucket array + test("ThrowsExceptionOnInvalidBucketArray") { + val rdd: RDD[Double] = sc.parallelize(Seq(1.0)) + // Empty array + intercept[IllegalArgumentException]{ + val buckets: Array[Double] = Array.empty[Double] + val result = rdd.histogram(buckets) + } + // Single element array + intercept[IllegalArgumentException] + { + val buckets: Array[Double] = Array(1.0) + val result = rdd.histogram(buckets) + } + } + + // Test automatic histogram function + test("WorksWithoutBucketsBasic") { + // Verify the basic case of one bucket and all elements in that bucket works + val rdd: RDD[Double] = sc.parallelize(Seq(1,2,3,4)) + val (histogramBuckets, histogramResults) = rdd.histogram(1) + val expectedHistogramResults: Array[Long] = Array(4) + val expectedHistogramBuckets: Array[Double] = Array(1.0,4.0) + assert(histogramResults === expectedHistogramResults) + assert(histogramBuckets === expectedHistogramBuckets) + } + test("WorksWithoutBucketsBasicTwo") { + // Verify the basic case of one bucket and all elements in that bucket works + val rdd: RDD[Double] = sc.parallelize(Seq(1,2,3,4)) + val (histogramBuckets, histogramResults) = rdd.histogram(2) + val expectedHistogramResults: Array[Long] = Array(2,2) + val expectedHistogramBuckets: Array[Double] = Array(1.0,2.5,4.0) + assert(histogramResults === expectedHistogramResults) + assert(histogramBuckets === expectedHistogramBuckets) + } + test("WorksWithoutBucketsWithMoreRequestedThanElements") { + // Verify the basic case of one bucket and all elements in that bucket works + val rdd: RDD[Double] = sc.parallelize(Seq(1,2)) + val (histogramBuckets, histogramResults) = rdd.histogram(10) + val expectedHistogramResults: Array[Long] = + Array(1, 0, 0, 0, 0, 0, 0, 0, 0, 1) + val expectedHistogramBuckets: Array[Double] = + Array(1.0, 1.1, 1.2, 1.3, 1.4, 1.5, 1.6, 1.7, 1.8, 1.9, 2.0) + assert(histogramResults === expectedHistogramResults) + assert(histogramBuckets === expectedHistogramBuckets) + } + // Test the failure mode with an invalid RDD + test("ThrowsExceptionOnInvalidRDDs") { + // infinity + intercept[UnsupportedOperationException]{ + val rdd: RDD[Double] = sc.parallelize(Seq(1,1.0/0.0)) + val result = rdd.histogram(1) + } + // NaN + intercept[UnsupportedOperationException] + { + val rdd: RDD[Double] = sc.parallelize(Seq(1,Double.NaN)) + val result = rdd.histogram(1) + } + // Empty + intercept[UnsupportedOperationException] + { + val rdd: RDD[Double] = sc.parallelize(Seq()) + val result = rdd.histogram(1) + } + // Single element + intercept[UnsupportedOperationException] + { + val rdd: RDD[Double] = sc.parallelize(Seq(1)) + val result = rdd.histogram(1) + } + // No Range + intercept[UnsupportedOperationException] + { + val rdd: RDD[Double] = sc.parallelize(Seq(1,1,1)) + val result = rdd.histogram(1) + } + } + +} -- cgit v1.2.3