diff options
author | Reynold Xin <rxin@apache.org> | 2013-11-02 23:19:01 -0700 |
---|---|---|
committer | Reynold Xin <rxin@apache.org> | 2013-11-02 23:19:01 -0700 |
commit | 1e9543b567b81cf3207984402269d230c10e713e (patch) | |
tree | 326ea5a89f1b83949350baae7b209c7b7d4b779a /core | |
parent | da6bb0aedd5121d9e3b92031dcc0884a9682da05 (diff) | |
download | spark-1e9543b567b81cf3207984402269d230c10e713e.tar.gz spark-1e9543b567b81cf3207984402269d230c10e713e.tar.bz2 spark-1e9543b567b81cf3207984402269d230c10e713e.zip |
Fixed a bug that used twice the amount of memory for the primitive arrays, due to a Scala compiler specialization bug.
Also addressed Matei's code review comment.
Diffstat (limited to 'core')
-rw-r--r-- | core/src/main/scala/org/apache/spark/util/Utils.scala | 12 | ||||
-rw-r--r-- | core/src/main/scala/org/apache/spark/util/collection/BitSet.scala (renamed from core/src/main/scala/org/apache/spark/util/hash/BitSet.scala) | 6 | ||||
-rw-r--r-- | core/src/main/scala/org/apache/spark/util/collection/OpenHashMap.scala (renamed from core/src/main/scala/org/apache/spark/util/hash/OpenHashMap.scala) | 12 | ||||
-rw-r--r-- | core/src/main/scala/org/apache/spark/util/collection/OpenHashSet.scala (renamed from core/src/main/scala/org/apache/spark/util/hash/OpenHashSet.scala) | 16 | ||||
-rw-r--r-- | core/src/main/scala/org/apache/spark/util/collection/PrimitiveKeyOpenHashMap.scala (renamed from core/src/main/scala/org/apache/spark/util/hash/PrimitiveKeyOpenHashMap.scala) | 14 | ||||
-rw-r--r-- | core/src/test/scala/org/apache/spark/util/collection/BitSetSuite.scala (renamed from core/src/test/scala/org/apache/spark/util/hash/BitSetSuite.scala) | 2 | ||||
-rw-r--r-- | core/src/test/scala/org/apache/spark/util/collection/OpenHashMapSuite.scala (renamed from core/src/test/scala/org/apache/spark/util/hash/OpenHashMapSuite.scala) | 2 | ||||
-rw-r--r-- | core/src/test/scala/org/apache/spark/util/collection/OpenHashSetSuite.scala (renamed from core/src/test/scala/org/apache/spark/util/hash/OpenHashSetSuite.scala) | 2 | ||||
-rw-r--r-- | core/src/test/scala/org/apache/spark/util/collection/PrimitiveKeyOpenHashSetSuite.scala (renamed from core/src/test/scala/org/apache/spark/util/hash/PrimitiveKeyOpenHashSetSuite.scala) | 2 |
9 files changed, 38 insertions, 30 deletions
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala index 0c5c12b7a8..fe932d8ede 100644 --- a/core/src/main/scala/org/apache/spark/util/Utils.scala +++ b/core/src/main/scala/org/apache/spark/util/Utils.scala @@ -18,13 +18,12 @@ package org.apache.spark.util import java.io._ -import java.net.{InetAddress, URL, URI, NetworkInterface, Inet4Address, ServerSocket} +import java.net.{InetAddress, URL, URI, NetworkInterface, Inet4Address} import java.util.{Locale, Random, UUID} -import java.util.concurrent.{ConcurrentHashMap, Executors, ThreadFactory, ThreadPoolExecutor} -import java.util.regex.Pattern +import java.util.concurrent.{ConcurrentHashMap, Executors, ThreadPoolExecutor} import scala.collection.Map -import scala.collection.mutable.{ArrayBuffer, HashMap} +import scala.collection.mutable.ArrayBuffer import scala.collection.JavaConversions._ import scala.io.Source @@ -36,8 +35,7 @@ import org.apache.hadoop.fs.{Path, FileSystem, FileUtil} import org.apache.spark.serializer.{DeserializationStream, SerializationStream, SerializerInstance} import org.apache.spark.deploy.SparkHadoopUtil import java.nio.ByteBuffer -import org.apache.spark.{SparkEnv, SparkException, Logging} -import java.util.ConcurrentModificationException +import org.apache.spark.{SparkException, Logging} /** @@ -149,7 +147,7 @@ private[spark] object Utils extends Logging { return buf } - private val shutdownDeletePaths = new collection.mutable.HashSet[String]() + private val shutdownDeletePaths = new scala.collection.mutable.HashSet[String]() // Register the path to be deleted via shutdown hook def registerShutdownDeleteDir(file: File) { diff --git a/core/src/main/scala/org/apache/spark/util/hash/BitSet.scala b/core/src/main/scala/org/apache/spark/util/collection/BitSet.scala index 0ec002b5d0..6604ec738c 100644 --- a/core/src/main/scala/org/apache/spark/util/hash/BitSet.scala +++ 
b/core/src/main/scala/org/apache/spark/util/collection/BitSet.scala @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.spark.util.hash +package org.apache.spark.util.collection /** @@ -24,8 +24,8 @@ package org.apache.spark.util.hash */ class BitSet(numBits: Int) { - private val words = new Array[Long](bit2words(numBits)) - private val numWords = words.length + private[this] val words = new Array[Long](bit2words(numBits)) + private[this] val numWords = words.length /** * Sets the bit at the specified index to true. diff --git a/core/src/main/scala/org/apache/spark/util/hash/OpenHashMap.scala b/core/src/main/scala/org/apache/spark/util/collection/OpenHashMap.scala index a376d1015a..ed117b2abf 100644 --- a/core/src/main/scala/org/apache/spark/util/hash/OpenHashMap.scala +++ b/core/src/main/scala/org/apache/spark/util/collection/OpenHashMap.scala @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.spark.util.hash +package org.apache.spark.util.collection /** @@ -34,7 +34,11 @@ class OpenHashMap[K >: Null : ClassManifest, @specialized(Long, Int, Double) V: def this() = this(64) protected var _keySet = new OpenHashSet[K](initialCapacity) - private var _values = new Array[V](_keySet.capacity) + + // Init in constructor (instead of in declaration) to work around a Scala compiler specialization + // bug that would generate two arrays (one for Object and one for specialized T). 
+ private var _values: Array[V] = _ + _values = new Array[V](_keySet.capacity) @transient private var _oldValues: Array[V] = null @@ -64,7 +68,7 @@ class OpenHashMap[K >: Null : ClassManifest, @specialized(Long, Int, Double) V: haveNullValue = true nullValue = v } else { - val pos = _keySet.fastAdd(k) & OpenHashSet.POSITION_MASK + val pos = _keySet.addWithoutResize(k) & OpenHashSet.POSITION_MASK _values(pos) = v _keySet.rehashIfNeeded(k, grow, move) _oldValues = null @@ -87,7 +91,7 @@ class OpenHashMap[K >: Null : ClassManifest, @specialized(Long, Int, Double) V: } nullValue } else { - val pos = _keySet.fastAdd(k) + val pos = _keySet.addWithoutResize(k) if ((pos & OpenHashSet.EXISTENCE_MASK) != 0) { val newValue = defaultValue _values(pos & OpenHashSet.POSITION_MASK) = newValue diff --git a/core/src/main/scala/org/apache/spark/util/hash/OpenHashSet.scala b/core/src/main/scala/org/apache/spark/util/collection/OpenHashSet.scala index 7aa3f6220c..e98a93dc2a 100644 --- a/core/src/main/scala/org/apache/spark/util/hash/OpenHashSet.scala +++ b/core/src/main/scala/org/apache/spark/util/collection/OpenHashSet.scala @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.spark.util.hash +package org.apache.spark.util.collection /** @@ -78,9 +78,13 @@ class OpenHashSet[@specialized(Long, Int) T: ClassManifest]( protected var _mask = _capacity - 1 protected var _size = 0 - protected var _data = classManifest[T].newArray(_capacity) protected var _bitset = new BitSet(_capacity) + // Init of the array in constructor (instead of in declaration) to work around a Scala compiler + // specialization bug that would generate two arrays (one for Object and one for specialized T). + protected var _data: Array[T] = _ + _data = new Array[T](_capacity) + /** Number of elements in the set. */ def size: Int = _size @@ -95,7 +99,7 @@ class OpenHashSet[@specialized(Long, Int) T: ClassManifest]( * and rehash all elements. 
*/ def add(k: T) { - fastAdd(k) + addWithoutResize(k) rehashIfNeeded(k, grow, move) } @@ -109,7 +113,7 @@ class OpenHashSet[@specialized(Long, Int) T: ClassManifest]( * @return The position where the key is placed, plus the highest order bit is set if the key * exists previously. */ - def fastAdd(k: T): Int = putInto(_bitset, _data, k) + def addWithoutResize(k: T): Int = putInto(_bitset, _data, k) /** * Rehash the set if it is overloaded. @@ -154,7 +158,7 @@ class OpenHashSet[@specialized(Long, Int) T: ClassManifest]( /** * Put an entry into the set. Return the position where the key is placed. In addition, the - * highest bid in the returned position is set if the key exists prior to this put. + * highest bit in the returned position is set if the key exists prior to this put. * * This function assumes the data array has at least one empty slot. */ @@ -236,9 +240,7 @@ private[spark] object OpenHashSet { val INVALID_POS = -1 - val EXISTENCE_MASK = 0x80000000 - val POSITION_MASK = 0xEFFFFFF /** diff --git a/core/src/main/scala/org/apache/spark/util/hash/PrimitiveKeyOpenHashMap.scala b/core/src/main/scala/org/apache/spark/util/collection/PrimitiveKeyOpenHashMap.scala index 14c1367207..e8f28ecdd7 100644 --- a/core/src/main/scala/org/apache/spark/util/hash/PrimitiveKeyOpenHashMap.scala +++ b/core/src/main/scala/org/apache/spark/util/collection/PrimitiveKeyOpenHashMap.scala @@ -15,7 +15,7 @@ * limitations under the License. 
*/ -package org.apache.spark.util.hash +package org.apache.spark.util.collection /** @@ -36,8 +36,12 @@ class PrimitiveKeyOpenHashMap[@specialized(Long, Int) K: ClassManifest, require(classManifest[K] == classManifest[Long] || classManifest[K] == classManifest[Int]) - protected var _keySet = new OpenHashSet[K](initialCapacity) - private var _values = new Array[V](_keySet.capacity) + // Init in constructor (instead of in declaration) to work around a Scala compiler specialization + // bug that would generate two arrays (one for Object and one for specialized T). + protected var _keySet: OpenHashSet[K] = _ + private var _values: Array[V] = _ + _keySet = new OpenHashSet[K](initialCapacity) + _values = new Array[V](_keySet.capacity) private var _oldValues: Array[V] = null @@ -51,7 +55,7 @@ class PrimitiveKeyOpenHashMap[@specialized(Long, Int) K: ClassManifest, /** Set the value for a key */ def update(k: K, v: V) { - val pos = _keySet.fastAdd(k) & OpenHashSet.POSITION_MASK + val pos = _keySet.addWithoutResize(k) & OpenHashSet.POSITION_MASK _values(pos) = v _keySet.rehashIfNeeded(k, grow, move) _oldValues = null @@ -64,7 +68,7 @@ class PrimitiveKeyOpenHashMap[@specialized(Long, Int) K: ClassManifest, * @return the newly updated value. */ def changeValue(k: K, defaultValue: => V, mergeValue: (V) => V): V = { - val pos = _keySet.fastAdd(k) + val pos = _keySet.addWithoutResize(k) if ((pos & OpenHashSet.EXISTENCE_MASK) != 0) { val newValue = defaultValue _values(pos & OpenHashSet.POSITION_MASK) = newValue diff --git a/core/src/test/scala/org/apache/spark/util/hash/BitSetSuite.scala b/core/src/test/scala/org/apache/spark/util/collection/BitSetSuite.scala index 41ede860d2..0f1ab3d20e 100644 --- a/core/src/test/scala/org/apache/spark/util/hash/BitSetSuite.scala +++ b/core/src/test/scala/org/apache/spark/util/collection/BitSetSuite.scala @@ -15,7 +15,7 @@ * limitations under the License. 
*/ -package org.apache.spark.util.hash +package org.apache.spark.util.collection import org.scalatest.FunSuite diff --git a/core/src/test/scala/org/apache/spark/util/hash/OpenHashMapSuite.scala b/core/src/test/scala/org/apache/spark/util/collection/OpenHashMapSuite.scala index 355784da32..5e74ca1f7e 100644 --- a/core/src/test/scala/org/apache/spark/util/hash/OpenHashMapSuite.scala +++ b/core/src/test/scala/org/apache/spark/util/collection/OpenHashMapSuite.scala @@ -1,4 +1,4 @@ -package org.apache.spark.util.hash +package org.apache.spark.util.collection import scala.collection.mutable.HashSet import org.scalatest.FunSuite diff --git a/core/src/test/scala/org/apache/spark/util/hash/OpenHashSetSuite.scala b/core/src/test/scala/org/apache/spark/util/collection/OpenHashSetSuite.scala index b5b3a4abe1..40049e8475 100644 --- a/core/src/test/scala/org/apache/spark/util/hash/OpenHashSetSuite.scala +++ b/core/src/test/scala/org/apache/spark/util/collection/OpenHashSetSuite.scala @@ -1,4 +1,4 @@ -package org.apache.spark.util.hash +package org.apache.spark.util.collection import org.scalatest.FunSuite diff --git a/core/src/test/scala/org/apache/spark/util/hash/PrimitiveKeyOpenHashSetSuite.scala b/core/src/test/scala/org/apache/spark/util/collection/PrimitiveKeyOpenHashSetSuite.scala index b9a4b54544..dc7f6cb023 100644 --- a/core/src/test/scala/org/apache/spark/util/hash/PrimitiveKeyOpenHashSetSuite.scala +++ b/core/src/test/scala/org/apache/spark/util/collection/PrimitiveKeyOpenHashSetSuite.scala @@ -1,4 +1,4 @@ -package org.apache.spark.util.hash +package org.apache.spark.util.collection import scala.collection.mutable.HashSet import org.scalatest.FunSuite |