aboutsummaryrefslogtreecommitdiff
path: root/core
diff options
context:
space:
mode:
authorMatei Zaharia <matei.zaharia@gmail.com>2013-09-08 23:50:24 -0700
committerMatei Zaharia <matei.zaharia@gmail.com>2013-09-08 23:50:24 -0700
commitbf984e27457fb64baa1c1b32bbc2d7dd645c98c4 (patch)
tree4b47659200f2375e5323d5ad5ede40a884b7b8f8 /core
parente9d4f44a7adfaf55d0fd312b81350638310c341d (diff)
parent1e2474b814719673ce0faa2a8551d1acfb135ead (diff)
downloadspark-bf984e27457fb64baa1c1b32bbc2d7dd645c98c4.tar.gz
spark-bf984e27457fb64baa1c1b32bbc2d7dd645c98c4.tar.bz2
spark-bf984e27457fb64baa1c1b32bbc2d7dd645c98c4.zip
Merge pull request #890 from mridulm/master
Fix hash bug
Diffstat (limited to 'core')
-rw-r--r--core/src/main/scala/org/apache/spark/network/netty/ShuffleSender.scala3
-rw-r--r--core/src/main/scala/org/apache/spark/storage/DiskStore.scala2
-rw-r--r--core/src/main/scala/org/apache/spark/util/Utils.scala14
3 files changed, 17 insertions, 2 deletions
diff --git a/core/src/main/scala/org/apache/spark/network/netty/ShuffleSender.scala b/core/src/main/scala/org/apache/spark/network/netty/ShuffleSender.scala
index 537f225469..8afcbe190a 100644
--- a/core/src/main/scala/org/apache/spark/network/netty/ShuffleSender.scala
+++ b/core/src/main/scala/org/apache/spark/network/netty/ShuffleSender.scala
@@ -20,6 +20,7 @@ package org.apache.spark.network.netty
import java.io.File
import org.apache.spark.Logging
+import org.apache.spark.util.Utils
private[spark] class ShuffleSender(portIn: Int, val pResolver: PathResolver) extends Logging {
@@ -57,7 +58,7 @@ private[spark] object ShuffleSender {
throw new Exception("Block " + blockId + " is not a shuffle block")
}
// Figure out which local directory it hashes to, and which subdirectory in that
- val hash = math.abs(blockId.hashCode)
+ val hash = Utils.nonNegativeHash(blockId)
val dirId = hash % localDirs.length
val subDirId = (hash / localDirs.length) % subDirsPerLocalDir
val subDir = new File(localDirs(dirId), "%02x".format(subDirId))
diff --git a/core/src/main/scala/org/apache/spark/storage/DiskStore.scala b/core/src/main/scala/org/apache/spark/storage/DiskStore.scala
index fc25ef0fae..63447baf8c 100644
--- a/core/src/main/scala/org/apache/spark/storage/DiskStore.scala
+++ b/core/src/main/scala/org/apache/spark/storage/DiskStore.scala
@@ -238,7 +238,7 @@ private class DiskStore(blockManager: BlockManager, rootDirs: String)
logDebug("Getting file for block " + blockId)
// Figure out which local directory it hashes to, and which subdirectory in that
- val hash = math.abs(blockId.hashCode)
+ val hash = Utils.nonNegativeHash(blockId)
val dirId = hash % localDirs.length
val subDirId = (hash / localDirs.length) % subDirsPerLocalDir
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index 468800b2bd..b890be2f6f 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -778,4 +778,18 @@ private[spark] object Utils extends Logging {
val rawMod = x % mod
rawMod + (if (rawMod < 0) mod else 0)
}
+
+ // Handles idiosyncracies with hash (add more as required)
+ def nonNegativeHash(obj: AnyRef): Int = {
+
+ // Required ?
+ if (obj eq null) return 0
+
+ val hash = obj.hashCode
+ // math.abs fails for Int.MinValue
+ val hashAbs = if (Int.MinValue != hash) math.abs(hash) else 0
+
+ // Nothing else to guard against ?
+ hashAbs
+ }
}