aboutsummaryrefslogtreecommitdiff
path: root/core/src/test/scala
diff options
context:
space:
mode:
authorPrashant Sharma <prashant.s@imaginea.com>2013-11-21 12:34:46 +0530
committerPrashant Sharma <prashant.s@imaginea.com>2013-11-21 12:34:46 +0530
commit95d8dbce91f49467050250d5cf3671aaaa648d76 (patch)
tree06e2583c63cdf39d6d15d36a3189c2e6db0148ba /core/src/test/scala
parent199e9cf02dfaa372c1f067bca54556e1f6ce787d (diff)
parent2fead510f74b962b293de4d724136c24a9825271 (diff)
downloadspark-95d8dbce91f49467050250d5cf3671aaaa648d76.tar.gz
spark-95d8dbce91f49467050250d5cf3671aaaa648d76.tar.bz2
spark-95d8dbce91f49467050250d5cf3671aaaa648d76.zip
Merge branch 'master' of github.com:apache/incubator-spark into scala-2.10-temp
Conflicts: core/src/main/scala/org/apache/spark/util/collection/PrimitiveVector.scala streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala
Diffstat (limited to 'core/src/test/scala')
-rw-r--r--core/src/test/scala/org/apache/spark/LocalSparkContext.scala2
-rw-r--r--core/src/test/scala/org/apache/spark/PartitionPruningRDDSuite.scala45
-rw-r--r--core/src/test/scala/org/apache/spark/PartitioningSuite.scala10
-rw-r--r--core/src/test/scala/org/apache/spark/rdd/PartitionPruningRDDSuite.scala86
-rw-r--r--core/src/test/scala/org/apache/spark/util/collection/PrimitiveVectorSuite.scala117
5 files changed, 209 insertions, 51 deletions
diff --git a/core/src/test/scala/org/apache/spark/LocalSparkContext.scala b/core/src/test/scala/org/apache/spark/LocalSparkContext.scala
index 459e257d79..8dd5786da6 100644
--- a/core/src/test/scala/org/apache/spark/LocalSparkContext.scala
+++ b/core/src/test/scala/org/apache/spark/LocalSparkContext.scala
@@ -30,7 +30,7 @@ trait LocalSparkContext extends BeforeAndAfterEach with BeforeAndAfterAll { self
@transient var sc: SparkContext = _
override def beforeAll() {
- InternalLoggerFactory.setDefaultFactory(new Slf4JLoggerFactory());
+ InternalLoggerFactory.setDefaultFactory(new Slf4JLoggerFactory())
super.beforeAll()
}
diff --git a/core/src/test/scala/org/apache/spark/PartitionPruningRDDSuite.scala b/core/src/test/scala/org/apache/spark/PartitionPruningRDDSuite.scala
deleted file mode 100644
index 21f16ef2c6..0000000000
--- a/core/src/test/scala/org/apache/spark/PartitionPruningRDDSuite.scala
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark
-
-import org.scalatest.FunSuite
-import org.apache.spark.SparkContext._
-import org.apache.spark.rdd.{RDD, PartitionPruningRDD}
-
-
-class PartitionPruningRDDSuite extends FunSuite with SharedSparkContext {
-
- test("Pruned Partitions inherit locality prefs correctly") {
- class TestPartition(i: Int) extends Partition {
- def index = i
- }
- val rdd = new RDD[Int](sc, Nil) {
- override protected def getPartitions = {
- Array[Partition](
- new TestPartition(1),
- new TestPartition(2),
- new TestPartition(3))
- }
- def compute(split: Partition, context: TaskContext) = {Iterator()}
- }
- val prunedRDD = PartitionPruningRDD.create(rdd, {x => if (x==2) true else false})
- val p = prunedRDD.partitions(0)
- assert(p.index == 2)
- assert(prunedRDD.partitions.length == 1)
- }
-}
diff --git a/core/src/test/scala/org/apache/spark/PartitioningSuite.scala b/core/src/test/scala/org/apache/spark/PartitioningSuite.scala
index 7d938917f2..1374d01774 100644
--- a/core/src/test/scala/org/apache/spark/PartitioningSuite.scala
+++ b/core/src/test/scala/org/apache/spark/PartitioningSuite.scala
@@ -142,11 +142,11 @@ class PartitioningSuite extends FunSuite with SharedSparkContext {
.filter(_ >= 0.0)
// Run the partitions, including the consecutive empty ones, through StatCounter
- val stats: StatCounter = rdd.stats();
- assert(abs(6.0 - stats.sum) < 0.01);
- assert(abs(6.0/2 - rdd.mean) < 0.01);
- assert(abs(1.0 - rdd.variance) < 0.01);
- assert(abs(1.0 - rdd.stdev) < 0.01);
+ val stats: StatCounter = rdd.stats()
+ assert(abs(6.0 - stats.sum) < 0.01)
+ assert(abs(6.0/2 - rdd.mean) < 0.01)
+ assert(abs(1.0 - rdd.variance) < 0.01)
+ assert(abs(1.0 - rdd.stdev) < 0.01)
// Add other tests here for classes that should be able to handle empty partitions correctly
}
diff --git a/core/src/test/scala/org/apache/spark/rdd/PartitionPruningRDDSuite.scala b/core/src/test/scala/org/apache/spark/rdd/PartitionPruningRDDSuite.scala
new file mode 100644
index 0000000000..53a7b7c44d
--- /dev/null
+++ b/core/src/test/scala/org/apache/spark/rdd/PartitionPruningRDDSuite.scala
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.rdd
+
+import org.scalatest.FunSuite
+import org.apache.spark.{TaskContext, Partition, SharedSparkContext}
+
+
+class PartitionPruningRDDSuite extends FunSuite with SharedSparkContext {
+
+
+ test("Pruned Partitions inherit locality prefs correctly") {
+
+ val rdd = new RDD[Int](sc, Nil) {
+ override protected def getPartitions = {
+ Array[Partition](
+ new TestPartition(0, 1),
+ new TestPartition(1, 1),
+ new TestPartition(2, 1))
+ }
+
+ def compute(split: Partition, context: TaskContext) = {
+ Iterator()
+ }
+ }
+ val prunedRDD = PartitionPruningRDD.create(rdd, {
+ x => if (x == 2) true else false
+ })
+ assert(prunedRDD.partitions.length == 1)
+ val p = prunedRDD.partitions(0)
+ assert(p.index == 0)
+ assert(p.asInstanceOf[PartitionPruningRDDPartition].parentSplit.index == 2)
+ }
+
+
+ test("Pruned Partitions can be unioned ") {
+
+ val rdd = new RDD[Int](sc, Nil) {
+ override protected def getPartitions = {
+ Array[Partition](
+ new TestPartition(0, 4),
+ new TestPartition(1, 5),
+ new TestPartition(2, 6))
+ }
+
+ def compute(split: Partition, context: TaskContext) = {
+ List(split.asInstanceOf[TestPartition].testValue).iterator
+ }
+ }
+ val prunedRDD1 = PartitionPruningRDD.create(rdd, {
+ x => if (x == 0) true else false
+ })
+
+ val prunedRDD2 = PartitionPruningRDD.create(rdd, {
+ x => if (x == 2) true else false
+ })
+
+ val merged = prunedRDD1 ++ prunedRDD2
+ assert(merged.count() == 2)
+ val take = merged.take(2)
+ assert(take.apply(0) == 4)
+ assert(take.apply(1) == 6)
+ }
+}
+
+class TestPartition(i: Int, value: Int) extends Partition with Serializable {
+ def index = i
+
+ def testValue = this.value
+
+}
diff --git a/core/src/test/scala/org/apache/spark/util/collection/PrimitiveVectorSuite.scala b/core/src/test/scala/org/apache/spark/util/collection/PrimitiveVectorSuite.scala
new file mode 100644
index 0000000000..970dade628
--- /dev/null
+++ b/core/src/test/scala/org/apache/spark/util/collection/PrimitiveVectorSuite.scala
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.util.collection
+
+import org.scalatest.FunSuite
+
+import org.apache.spark.util.SizeEstimator
+
+class PrimitiveVectorSuite extends FunSuite {
+
+ test("primitive value") {
+ val vector = new PrimitiveVector[Int]
+
+ for (i <- 0 until 1000) {
+ vector += i
+ assert(vector(i) === i)
+ }
+
+ assert(vector.size === 1000)
+ assert(vector.size == vector.length)
+ intercept[IllegalArgumentException] {
+ vector(1000)
+ }
+
+ for (i <- 0 until 1000) {
+ assert(vector(i) == i)
+ }
+ }
+
+ test("non-primitive value") {
+ val vector = new PrimitiveVector[String]
+
+ for (i <- 0 until 1000) {
+ vector += i.toString
+ assert(vector(i) === i.toString)
+ }
+
+ assert(vector.size === 1000)
+ assert(vector.size == vector.length)
+ intercept[IllegalArgumentException] {
+ vector(1000)
+ }
+
+ for (i <- 0 until 1000) {
+ assert(vector(i) == i.toString)
+ }
+ }
+
+ test("ideal growth") {
+ val vector = new PrimitiveVector[Long](initialSize = 1)
+ vector += 1
+ for (i <- 1 until 1024) {
+ vector += i
+ assert(vector.size === i + 1)
+ assert(vector.capacity === Integer.highestOneBit(i) * 2)
+ }
+ assert(vector.capacity === 1024)
+ vector += 1024
+ assert(vector.capacity === 2048)
+ }
+
+ test("ideal size") {
+ val vector = new PrimitiveVector[Long](8192)
+ for (i <- 0 until 8192) {
+ vector += i
+ }
+ assert(vector.size === 8192)
+ assert(vector.capacity === 8192)
+ val actualSize = SizeEstimator.estimate(vector)
+ val expectedSize = 8192 * 8
+ // Make sure we are not allocating a significant amount of memory beyond our expected.
+ // Due to specialization wonkiness, we need to ensure we don't have 2 copies of the array.
+ assert(actualSize < expectedSize * 1.1)
+ }
+
+ test("resizing") {
+ val vector = new PrimitiveVector[Long]
+ for (i <- 0 until 4097) {
+ vector += i
+ }
+ assert(vector.size === 4097)
+ assert(vector.capacity === 8192)
+ vector.trim()
+ assert(vector.size === 4097)
+ assert(vector.capacity === 4097)
+ vector.resize(5000)
+ assert(vector.size === 4097)
+ assert(vector.capacity === 5000)
+ vector.resize(4000)
+ assert(vector.size === 4000)
+ assert(vector.capacity === 4000)
+ vector.resize(5000)
+ assert(vector.size === 4000)
+ assert(vector.capacity === 5000)
+ for (i <- 0 until 4000) {
+ assert(vector(i) == i)
+ }
+ intercept[IllegalArgumentException] {
+ vector(4000)
+ }
+ }
+}