aboutsummaryrefslogtreecommitdiff
path: root/core
diff options
context:
space:
mode:
authorReynold Xin <rxin@apache.org>2013-11-02 23:19:01 -0700
committerReynold Xin <rxin@apache.org>2013-11-02 23:19:01 -0700
commit1e9543b567b81cf3207984402269d230c10e713e (patch)
tree326ea5a89f1b83949350baae7b209c7b7d4b779a /core
parentda6bb0aedd5121d9e3b92031dcc0884a9682da05 (diff)
downloadspark-1e9543b567b81cf3207984402269d230c10e713e.tar.gz
spark-1e9543b567b81cf3207984402269d230c10e713e.tar.bz2
spark-1e9543b567b81cf3207984402269d230c10e713e.zip
Fixed a bug that used twice the amount of memory for the primitive arrays due to a Scala compiler bug.
Also addressed Matei's code review comment.
Diffstat (limited to 'core')
-rw-r--r--core/src/main/scala/org/apache/spark/util/Utils.scala12
-rw-r--r--core/src/main/scala/org/apache/spark/util/collection/BitSet.scala (renamed from core/src/main/scala/org/apache/spark/util/hash/BitSet.scala)6
-rw-r--r--core/src/main/scala/org/apache/spark/util/collection/OpenHashMap.scala (renamed from core/src/main/scala/org/apache/spark/util/hash/OpenHashMap.scala)12
-rw-r--r--core/src/main/scala/org/apache/spark/util/collection/OpenHashSet.scala (renamed from core/src/main/scala/org/apache/spark/util/hash/OpenHashSet.scala)16
-rw-r--r--core/src/main/scala/org/apache/spark/util/collection/PrimitiveKeyOpenHashMap.scala (renamed from core/src/main/scala/org/apache/spark/util/hash/PrimitiveKeyOpenHashMap.scala)14
-rw-r--r--core/src/test/scala/org/apache/spark/util/collection/BitSetSuite.scala (renamed from core/src/test/scala/org/apache/spark/util/hash/BitSetSuite.scala)2
-rw-r--r--core/src/test/scala/org/apache/spark/util/collection/OpenHashMapSuite.scala (renamed from core/src/test/scala/org/apache/spark/util/hash/OpenHashMapSuite.scala)2
-rw-r--r--core/src/test/scala/org/apache/spark/util/collection/OpenHashSetSuite.scala (renamed from core/src/test/scala/org/apache/spark/util/hash/OpenHashSetSuite.scala)2
-rw-r--r--core/src/test/scala/org/apache/spark/util/collection/PrimitiveKeyOpenHashSetSuite.scala (renamed from core/src/test/scala/org/apache/spark/util/hash/PrimitiveKeyOpenHashSetSuite.scala)2
9 files changed, 38 insertions, 30 deletions
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index 0c5c12b7a8..fe932d8ede 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -18,13 +18,12 @@
package org.apache.spark.util
import java.io._
-import java.net.{InetAddress, URL, URI, NetworkInterface, Inet4Address, ServerSocket}
+import java.net.{InetAddress, URL, URI, NetworkInterface, Inet4Address}
import java.util.{Locale, Random, UUID}
-import java.util.concurrent.{ConcurrentHashMap, Executors, ThreadFactory, ThreadPoolExecutor}
-import java.util.regex.Pattern
+import java.util.concurrent.{ConcurrentHashMap, Executors, ThreadPoolExecutor}
import scala.collection.Map
-import scala.collection.mutable.{ArrayBuffer, HashMap}
+import scala.collection.mutable.ArrayBuffer
import scala.collection.JavaConversions._
import scala.io.Source
@@ -36,8 +35,7 @@ import org.apache.hadoop.fs.{Path, FileSystem, FileUtil}
import org.apache.spark.serializer.{DeserializationStream, SerializationStream, SerializerInstance}
import org.apache.spark.deploy.SparkHadoopUtil
import java.nio.ByteBuffer
-import org.apache.spark.{SparkEnv, SparkException, Logging}
-import java.util.ConcurrentModificationException
+import org.apache.spark.{SparkException, Logging}
/**
@@ -149,7 +147,7 @@ private[spark] object Utils extends Logging {
return buf
}
- private val shutdownDeletePaths = new collection.mutable.HashSet[String]()
+ private val shutdownDeletePaths = new scala.collection.mutable.HashSet[String]()
// Register the path to be deleted via shutdown hook
def registerShutdownDeleteDir(file: File) {
diff --git a/core/src/main/scala/org/apache/spark/util/hash/BitSet.scala b/core/src/main/scala/org/apache/spark/util/collection/BitSet.scala
index 0ec002b5d0..6604ec738c 100644
--- a/core/src/main/scala/org/apache/spark/util/hash/BitSet.scala
+++ b/core/src/main/scala/org/apache/spark/util/collection/BitSet.scala
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.spark.util.hash
+package org.apache.spark.util.collection
/**
@@ -24,8 +24,8 @@ package org.apache.spark.util.hash
*/
class BitSet(numBits: Int) {
- private val words = new Array[Long](bit2words(numBits))
- private val numWords = words.length
+ private[this] val words = new Array[Long](bit2words(numBits))
+ private[this] val numWords = words.length
/**
* Sets the bit at the specified index to true.
diff --git a/core/src/main/scala/org/apache/spark/util/hash/OpenHashMap.scala b/core/src/main/scala/org/apache/spark/util/collection/OpenHashMap.scala
index a376d1015a..ed117b2abf 100644
--- a/core/src/main/scala/org/apache/spark/util/hash/OpenHashMap.scala
+++ b/core/src/main/scala/org/apache/spark/util/collection/OpenHashMap.scala
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.spark.util.hash
+package org.apache.spark.util.collection
/**
@@ -34,7 +34,11 @@ class OpenHashMap[K >: Null : ClassManifest, @specialized(Long, Int, Double) V:
def this() = this(64)
protected var _keySet = new OpenHashSet[K](initialCapacity)
- private var _values = new Array[V](_keySet.capacity)
+
+ // Init in constructor (instead of in declaration) to work around a Scala compiler specialization
+ // bug that would generate two arrays (one for Object and one for specialized T).
+ private var _values: Array[V] = _
+ _values = new Array[V](_keySet.capacity)
@transient private var _oldValues: Array[V] = null
@@ -64,7 +68,7 @@ class OpenHashMap[K >: Null : ClassManifest, @specialized(Long, Int, Double) V:
haveNullValue = true
nullValue = v
} else {
- val pos = _keySet.fastAdd(k) & OpenHashSet.POSITION_MASK
+ val pos = _keySet.addWithoutResize(k) & OpenHashSet.POSITION_MASK
_values(pos) = v
_keySet.rehashIfNeeded(k, grow, move)
_oldValues = null
@@ -87,7 +91,7 @@ class OpenHashMap[K >: Null : ClassManifest, @specialized(Long, Int, Double) V:
}
nullValue
} else {
- val pos = _keySet.fastAdd(k)
+ val pos = _keySet.addWithoutResize(k)
if ((pos & OpenHashSet.EXISTENCE_MASK) != 0) {
val newValue = defaultValue
_values(pos & OpenHashSet.POSITION_MASK) = newValue
diff --git a/core/src/main/scala/org/apache/spark/util/hash/OpenHashSet.scala b/core/src/main/scala/org/apache/spark/util/collection/OpenHashSet.scala
index 7aa3f6220c..e98a93dc2a 100644
--- a/core/src/main/scala/org/apache/spark/util/hash/OpenHashSet.scala
+++ b/core/src/main/scala/org/apache/spark/util/collection/OpenHashSet.scala
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.spark.util.hash
+package org.apache.spark.util.collection
/**
@@ -78,9 +78,13 @@ class OpenHashSet[@specialized(Long, Int) T: ClassManifest](
protected var _mask = _capacity - 1
protected var _size = 0
- protected var _data = classManifest[T].newArray(_capacity)
protected var _bitset = new BitSet(_capacity)
+ // Init of the array in constructor (instead of in declaration) to work around a Scala compiler
+ // specialization bug that would generate two arrays (one for Object and one for specialized T).
+ protected var _data: Array[T] = _
+ _data = new Array[T](_capacity)
+
/** Number of elements in the set. */
def size: Int = _size
@@ -95,7 +99,7 @@ class OpenHashSet[@specialized(Long, Int) T: ClassManifest](
* and rehash all elements.
*/
def add(k: T) {
- fastAdd(k)
+ addWithoutResize(k)
rehashIfNeeded(k, grow, move)
}
@@ -109,7 +113,7 @@ class OpenHashSet[@specialized(Long, Int) T: ClassManifest](
* @return The position where the key is placed, plus the highest order bit is set if the key
* exists previously.
*/
- def fastAdd(k: T): Int = putInto(_bitset, _data, k)
+ def addWithoutResize(k: T): Int = putInto(_bitset, _data, k)
/**
* Rehash the set if it is overloaded.
@@ -154,7 +158,7 @@ class OpenHashSet[@specialized(Long, Int) T: ClassManifest](
/**
* Put an entry into the set. Return the position where the key is placed. In addition, the
- * highest bid in the returned position is set if the key exists prior to this put.
+ * highest bit in the returned position is set if the key exists prior to this put.
*
* This function assumes the data array has at least one empty slot.
*/
@@ -236,9 +240,7 @@ private[spark]
object OpenHashSet {
val INVALID_POS = -1
-
val EXISTENCE_MASK = 0x80000000
-
val POSITION_MASK = 0xEFFFFFF
/**
diff --git a/core/src/main/scala/org/apache/spark/util/hash/PrimitiveKeyOpenHashMap.scala b/core/src/main/scala/org/apache/spark/util/collection/PrimitiveKeyOpenHashMap.scala
index 14c1367207..e8f28ecdd7 100644
--- a/core/src/main/scala/org/apache/spark/util/hash/PrimitiveKeyOpenHashMap.scala
+++ b/core/src/main/scala/org/apache/spark/util/collection/PrimitiveKeyOpenHashMap.scala
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.spark.util.hash
+package org.apache.spark.util.collection
/**
@@ -36,8 +36,12 @@ class PrimitiveKeyOpenHashMap[@specialized(Long, Int) K: ClassManifest,
require(classManifest[K] == classManifest[Long] || classManifest[K] == classManifest[Int])
- protected var _keySet = new OpenHashSet[K](initialCapacity)
- private var _values = new Array[V](_keySet.capacity)
+ // Init in constructor (instead of in declaration) to work around a Scala compiler specialization
+ // bug that would generate two arrays (one for Object and one for specialized T).
+ protected var _keySet: OpenHashSet[K] = _
+ private var _values: Array[V] = _
+ _keySet = new OpenHashSet[K](initialCapacity)
+ _values = new Array[V](_keySet.capacity)
private var _oldValues: Array[V] = null
@@ -51,7 +55,7 @@ class PrimitiveKeyOpenHashMap[@specialized(Long, Int) K: ClassManifest,
/** Set the value for a key */
def update(k: K, v: V) {
- val pos = _keySet.fastAdd(k) & OpenHashSet.POSITION_MASK
+ val pos = _keySet.addWithoutResize(k) & OpenHashSet.POSITION_MASK
_values(pos) = v
_keySet.rehashIfNeeded(k, grow, move)
_oldValues = null
@@ -64,7 +68,7 @@ class PrimitiveKeyOpenHashMap[@specialized(Long, Int) K: ClassManifest,
* @return the newly updated value.
*/
def changeValue(k: K, defaultValue: => V, mergeValue: (V) => V): V = {
- val pos = _keySet.fastAdd(k)
+ val pos = _keySet.addWithoutResize(k)
if ((pos & OpenHashSet.EXISTENCE_MASK) != 0) {
val newValue = defaultValue
_values(pos & OpenHashSet.POSITION_MASK) = newValue
diff --git a/core/src/test/scala/org/apache/spark/util/hash/BitSetSuite.scala b/core/src/test/scala/org/apache/spark/util/collection/BitSetSuite.scala
index 41ede860d2..0f1ab3d20e 100644
--- a/core/src/test/scala/org/apache/spark/util/hash/BitSetSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/collection/BitSetSuite.scala
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.spark.util.hash
+package org.apache.spark.util.collection
import org.scalatest.FunSuite
diff --git a/core/src/test/scala/org/apache/spark/util/hash/OpenHashMapSuite.scala b/core/src/test/scala/org/apache/spark/util/collection/OpenHashMapSuite.scala
index 355784da32..5e74ca1f7e 100644
--- a/core/src/test/scala/org/apache/spark/util/hash/OpenHashMapSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/collection/OpenHashMapSuite.scala
@@ -1,4 +1,4 @@
-package org.apache.spark.util.hash
+package org.apache.spark.util.collection
import scala.collection.mutable.HashSet
import org.scalatest.FunSuite
diff --git a/core/src/test/scala/org/apache/spark/util/hash/OpenHashSetSuite.scala b/core/src/test/scala/org/apache/spark/util/collection/OpenHashSetSuite.scala
index b5b3a4abe1..40049e8475 100644
--- a/core/src/test/scala/org/apache/spark/util/hash/OpenHashSetSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/collection/OpenHashSetSuite.scala
@@ -1,4 +1,4 @@
-package org.apache.spark.util.hash
+package org.apache.spark.util.collection
import org.scalatest.FunSuite
diff --git a/core/src/test/scala/org/apache/spark/util/hash/PrimitiveKeyOpenHashSetSuite.scala b/core/src/test/scala/org/apache/spark/util/collection/PrimitiveKeyOpenHashSetSuite.scala
index b9a4b54544..dc7f6cb023 100644
--- a/core/src/test/scala/org/apache/spark/util/hash/PrimitiveKeyOpenHashSetSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/collection/PrimitiveKeyOpenHashSetSuite.scala
@@ -1,4 +1,4 @@
-package org.apache.spark.util.hash
+package org.apache.spark.util.collection
import scala.collection.mutable.HashSet
import org.scalatest.FunSuite