diff options
-rw-r--r-- | core/src/main/scala/org/apache/spark/util/collection/PrimitiveVector.scala | 46 | ||||
-rw-r--r-- | core/src/test/scala/org/apache/spark/util/collection/PrimitiveVectorSuite.scala | 117 |
2 files changed, 148 insertions, 15 deletions
diff --git a/core/src/main/scala/org/apache/spark/util/collection/PrimitiveVector.scala b/core/src/main/scala/org/apache/spark/util/collection/PrimitiveVector.scala index 369519c559..20554f0aab 100644 --- a/core/src/main/scala/org/apache/spark/util/collection/PrimitiveVector.scala +++ b/core/src/main/scala/org/apache/spark/util/collection/PrimitiveVector.scala @@ -17,35 +17,51 @@ package org.apache.spark.util.collection -/** Provides a simple, non-threadsafe, array-backed vector that can store primitives. */ +/** + * An append-only, non-threadsafe, array-backed vector that is optimized for primitive types. + */ private[spark] class PrimitiveVector[@specialized(Long, Int, Double) V: ClassManifest](initialSize: Int = 64) { - private var numElements = 0 - private var array: Array[V] = _ + private var _numElements = 0 + private var _array: Array[V] = _ // NB: This must be separate from the declaration, otherwise the specialized parent class - // will get its own array with the same initial size. TODO: Figure out why... - array = new Array[V](initialSize) + // will get its own array with the same initial size. + _array = new Array[V](initialSize) def apply(index: Int): V = { - require(index < numElements) - array(index) + require(index < _numElements) + _array(index) } def +=(value: V) { - if (numElements == array.length) { resize(array.length * 2) } - array(numElements) = value - numElements += 1 + if (_numElements == _array.length) { + resize(_array.length * 2) + } + _array(_numElements) = value + _numElements += 1 } - def length = numElements + def capacity: Int = _array.length + + def length: Int = _numElements + + def size: Int = _numElements + + /** Gets the underlying array backing this vector. */ + def array: Array[V] = _array - def getUnderlyingArray = array + /** Trims this vector so that the capacity is equal to the size. */ + def trim(): PrimitiveVector[V] = resize(size) /** Resizes the array, dropping elements if the total length decreases. */ - def resize(newLength: Int) { + def resize(newLength: Int): PrimitiveVector[V] = { val newArray = new Array[V](newLength) - array.copyToArray(newArray) - array = newArray + _array.copyToArray(newArray) + _array = newArray + if (newLength < _numElements) { + _numElements = newLength + } + this } } diff --git a/core/src/test/scala/org/apache/spark/util/collection/PrimitiveVectorSuite.scala b/core/src/test/scala/org/apache/spark/util/collection/PrimitiveVectorSuite.scala new file mode 100644 index 0000000000..970dade628 --- /dev/null +++ b/core/src/test/scala/org/apache/spark/util/collection/PrimitiveVectorSuite.scala @@ -0,0 +1,117 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.util.collection + +import org.scalatest.FunSuite + +import org.apache.spark.util.SizeEstimator + +class PrimitiveVectorSuite extends FunSuite { + + test("primitive value") { + val vector = new PrimitiveVector[Int] + + for (i <- 0 until 1000) { + vector += i + assert(vector(i) === i) + } + + assert(vector.size === 1000) + assert(vector.size == vector.length) + intercept[IllegalArgumentException] { + vector(1000) + } + + for (i <- 0 until 1000) { + assert(vector(i) == i) + } + } + + test("non-primitive value") { + val vector = new PrimitiveVector[String] + + for (i <- 0 until 1000) { + vector += i.toString + assert(vector(i) === i.toString) + } + + assert(vector.size === 1000) + assert(vector.size == vector.length) + intercept[IllegalArgumentException] { + vector(1000) + } + + for (i <- 0 until 1000) { + assert(vector(i) == i.toString) + } + } + + test("ideal growth") { + val vector = new PrimitiveVector[Long](initialSize = 1) + vector += 1 + for (i <- 1 until 1024) { + vector += i + assert(vector.size === i + 1) + assert(vector.capacity === Integer.highestOneBit(i) * 2) + } + assert(vector.capacity === 1024) + vector += 1024 + assert(vector.capacity === 2048) + } + + test("ideal size") { + val vector = new PrimitiveVector[Long](8192) + for (i <- 0 until 8192) { + vector += i + } + assert(vector.size === 8192) + assert(vector.capacity === 8192) + val actualSize = SizeEstimator.estimate(vector) + val expectedSize = 8192 * 8 + // Make sure we are not allocating a significant amount of memory beyond our expected. + // Due to specialization wonkiness, we need to ensure we don't have 2 copies of the array. + assert(actualSize < expectedSize * 1.1) + } + + test("resizing") { + val vector = new PrimitiveVector[Long] + for (i <- 0 until 4097) { + vector += i + } + assert(vector.size === 4097) + assert(vector.capacity === 8192) + vector.trim() + assert(vector.size === 4097) + assert(vector.capacity === 4097) + vector.resize(5000) + assert(vector.size === 4097) + assert(vector.capacity === 5000) + vector.resize(4000) + assert(vector.size === 4000) + assert(vector.capacity === 4000) + vector.resize(5000) + assert(vector.size === 4000) + assert(vector.capacity === 5000) + for (i <- 0 until 4000) { + assert(vector(i) == i) + } + intercept[IllegalArgumentException] { + vector(4000) + } + } +} |