author    Davies Liu <davies.liu@gmail.com>  2015-11-16 14:50:38 -0800
committer Davies Liu <davies.liu@gmail.com>  2015-11-16 14:50:38 -0800
commit    3c025087b58f475a9bcb5c8f4b2b2df804915b2b (patch)
tree      bf8fcb5faaaf62d080b0b35ba1f94be7ff296f3d
parent    985b38dd2fa5d8f1e23f1c420ce6262e7e3ed181 (diff)
Revert "[SPARK-11271][SPARK-11016][CORE] Use Spark BitSet instead of RoaringBitmap to reduce memory usage"
This reverts commit e209fa271ae57dc8849f8b1241bf1ea7d6d3d62c.
-rw-r--r--  core/pom.xml                                                                |  4
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/MapStatus.scala              | 13
-rw-r--r--  core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala        | 10
-rw-r--r--  core/src/main/scala/org/apache/spark/util/collection/BitSet.scala           | 28
-rw-r--r--  core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala   |  6
-rw-r--r--  core/src/test/scala/org/apache/spark/util/collection/BitSetSuite.scala      | 49
-rw-r--r--  pom.xml                                                                     |  5
7 files changed, 33 insertions(+), 82 deletions(-)
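
The revert restores RoaringBitmap as the structure HighlyCompressedMapStatus uses to record which shuffle blocks are empty. A minimal sketch of the two calls the restored code relies on (add and contains; unlike the fixed-size BitSet it replaces, the bitmap grows on demand rather than being sized up front):

import org.roaringbitmap.RoaringBitmap

val emptyBlocks = new RoaringBitmap()  // no capacity argument needed
emptyBlocks.add(3)                     // mark block 3 as empty
assert(emptyBlocks.contains(3))        // counterpart of BitSet.get(3)
assert(!emptyBlocks.contains(4))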
diff --git a/core/pom.xml b/core/pom.xml
index 7e1205a076..37e3f168ab 100644
--- a/core/pom.xml
+++ b/core/pom.xml
@@ -178,6 +178,10 @@
<artifactId>lz4</artifactId>
</dependency>
<dependency>
+ <groupId>org.roaringbitmap</groupId>
+ <artifactId>RoaringBitmap</artifactId>
+ </dependency>
+ <dependency>
<groupId>commons-net</groupId>
<artifactId>commons-net</artifactId>
</dependency>
diff --git a/core/src/main/scala/org/apache/spark/scheduler/MapStatus.scala b/core/src/main/scala/org/apache/spark/scheduler/MapStatus.scala
index 180c8d1827..1efce124c0 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/MapStatus.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/MapStatus.scala
@@ -19,8 +19,9 @@ package org.apache.spark.scheduler
import java.io.{Externalizable, ObjectInput, ObjectOutput}
+import org.roaringbitmap.RoaringBitmap
+
import org.apache.spark.storage.BlockManagerId
-import org.apache.spark.util.collection.BitSet
import org.apache.spark.util.Utils
/**
@@ -132,7 +133,7 @@ private[spark] class CompressedMapStatus(
private[spark] class HighlyCompressedMapStatus private (
private[this] var loc: BlockManagerId,
private[this] var numNonEmptyBlocks: Int,
- private[this] var emptyBlocks: BitSet,
+ private[this] var emptyBlocks: RoaringBitmap,
private[this] var avgSize: Long)
extends MapStatus with Externalizable {
@@ -145,7 +146,7 @@ private[spark] class HighlyCompressedMapStatus private (
override def location: BlockManagerId = loc
override def getSizeForBlock(reduceId: Int): Long = {
- if (emptyBlocks.get(reduceId)) {
+ if (emptyBlocks.contains(reduceId)) {
0
} else {
avgSize
@@ -160,7 +161,7 @@ private[spark] class HighlyCompressedMapStatus private (
override def readExternal(in: ObjectInput): Unit = Utils.tryOrIOException {
loc = BlockManagerId(in)
- emptyBlocks = new BitSet
+ emptyBlocks = new RoaringBitmap()
emptyBlocks.readExternal(in)
avgSize = in.readLong()
}
@@ -176,15 +177,15 @@ private[spark] object HighlyCompressedMapStatus {
// From a compression standpoint, it shouldn't matter whether we track empty or non-empty
// blocks. From a performance standpoint, we benefit from tracking empty blocks because
// we expect that there will be far fewer of them, so we will perform fewer bitmap insertions.
+ val emptyBlocks = new RoaringBitmap()
val totalNumBlocks = uncompressedSizes.length
- val emptyBlocks = new BitSet(totalNumBlocks)
while (i < totalNumBlocks) {
var size = uncompressedSizes(i)
if (size > 0) {
numNonEmptyBlocks += 1
totalSize += size
} else {
- emptyBlocks.set(i)
+ emptyBlocks.add(i)
}
i += 1
}
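
As the comment in the hunk above notes, HighlyCompressedMapStatus stores only the set of empty blocks plus a single average size for the non-empty ones, so getSizeForBlock is approximate. A self-contained sketch of the restored construction loop, using a hypothetical uncompressedSizes array:

import org.roaringbitmap.RoaringBitmap

// Hypothetical input: one uncompressed size per reduce block.
val uncompressedSizes = Array[Long](0L, 100L, 0L, 300L)

val emptyBlocks = new RoaringBitmap()
var i = 0
var numNonEmptyBlocks = 0
var totalSize = 0L
while (i < uncompressedSizes.length) {
  val size = uncompressedSizes(i)
  if (size > 0) {
    numNonEmptyBlocks += 1
    totalSize += size
  } else {
    emptyBlocks.add(i)  // record only the ids of the empty blocks
  }
  i += 1
}
val avgSize = if (numNonEmptyBlocks > 0) totalSize / numNonEmptyBlocks else 0L
// getSizeForBlock(reduceId) then returns 0 if emptyBlocks.contains(reduceId), else avgSize.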
diff --git a/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala b/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala
index bc51d4f282..c5195c1143 100644
--- a/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala
+++ b/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala
@@ -30,6 +30,7 @@ import com.esotericsoftware.kryo.io.{Input => KryoInput, Output => KryoOutput}
import com.esotericsoftware.kryo.serializers.{JavaSerializer => KryoJavaSerializer}
import com.twitter.chill.{AllScalaRegistrar, EmptyScalaKryoInstantiator}
import org.apache.avro.generic.{GenericData, GenericRecord}
+import org.roaringbitmap.{ArrayContainer, BitmapContainer, RoaringArray, RoaringBitmap}
import org.apache.spark._
import org.apache.spark.api.python.PythonBroadcast
@@ -38,7 +39,7 @@ import org.apache.spark.network.util.ByteUnit
import org.apache.spark.scheduler.{CompressedMapStatus, HighlyCompressedMapStatus}
import org.apache.spark.storage._
import org.apache.spark.util.{Utils, BoundedPriorityQueue, SerializableConfiguration, SerializableJobConf}
-import org.apache.spark.util.collection.{BitSet, CompactBuffer}
+import org.apache.spark.util.collection.CompactBuffer
/**
* A Spark serializer that uses the [[https://code.google.com/p/kryo/ Kryo serialization library]].
@@ -362,7 +363,12 @@ private[serializer] object KryoSerializer {
classOf[StorageLevel],
classOf[CompressedMapStatus],
classOf[HighlyCompressedMapStatus],
- classOf[BitSet],
+ classOf[RoaringBitmap],
+ classOf[RoaringArray],
+ classOf[RoaringArray.Element],
+ classOf[Array[RoaringArray.Element]],
+ classOf[ArrayContainer],
+ classOf[BitmapContainer],
classOf[CompactBuffer[_]],
classOf[BlockManagerId],
classOf[Array[Byte]],
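
With spark.kryo.registrationRequired=true, Kryo refuses to serialize any class that has not been registered, which is why the RoaringBitmap internals are listed above and not just RoaringBitmap itself. A hedged sketch of a round trip under that setting (the exact set of registrations needed, including primitive array classes backing the containers, may vary with the RoaringBitmap version):

import java.io.ByteArrayOutputStream
import com.esotericsoftware.kryo.Kryo
import com.esotericsoftware.kryo.io.{Input, Output}
import org.roaringbitmap._

val kryo = new Kryo()
kryo.setRegistrationRequired(true)  // mirrors spark.kryo.registrationRequired
Seq(classOf[RoaringBitmap], classOf[RoaringArray], classOf[RoaringArray.Element],
    classOf[Array[RoaringArray.Element]], classOf[ArrayContainer],
    classOf[BitmapContainer],
    classOf[Array[Short]], classOf[Array[Long]]  // arrays backing the containers
).foreach(c => kryo.register(c))

val bitmap = new RoaringBitmap()
bitmap.add(42)

val bytes = new ByteArrayOutputStream()
val out = new Output(bytes)
kryo.writeObject(out, bitmap)  // would throw without the registrations above
out.close()

val copy = kryo.readObject(new Input(bytes.toByteArray), classOf[RoaringBitmap])
assert(copy.contains(42))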
diff --git a/core/src/main/scala/org/apache/spark/util/collection/BitSet.scala b/core/src/main/scala/org/apache/spark/util/collection/BitSet.scala
index 85c5bdbfce..7ab67fc3a2 100644
--- a/core/src/main/scala/org/apache/spark/util/collection/BitSet.scala
+++ b/core/src/main/scala/org/apache/spark/util/collection/BitSet.scala
@@ -17,21 +17,14 @@
package org.apache.spark.util.collection
-import java.io.{Externalizable, ObjectInput, ObjectOutput}
-
-import org.apache.spark.util.{Utils => UUtils}
-
-
/**
* A simple, fixed-size bit set implementation. This implementation is fast because it avoids
* safety/bound checking.
*/
-class BitSet(private[this] var numBits: Int) extends Externalizable {
+class BitSet(numBits: Int) extends Serializable {
- private var words = new Array[Long](bit2words(numBits))
- private def numWords = words.length
-
- def this() = this(0)
+ private val words = new Array[Long](bit2words(numBits))
+ private val numWords = words.length
/**
* Compute the capacity (number of bits) that can be represented
@@ -237,19 +230,4 @@ class BitSet(private[this] var numBits: Int) extends Externalizable {
/** Return the number of longs it would take to hold numBits. */
private def bit2words(numBits: Int) = ((numBits - 1) >> 6) + 1
-
- override def writeExternal(out: ObjectOutput): Unit = UUtils.tryOrIOException {
- out.writeInt(numBits)
- words.foreach(out.writeLong(_))
- }
-
- override def readExternal(in: ObjectInput): Unit = UUtils.tryOrIOException {
- numBits = in.readInt()
- words = new Array[Long](bit2words(numBits))
- var index = 0
- while (index < words.length) {
- words(index) = in.readLong()
- index += 1
- }
- }
}
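
With the revert, BitSet drops its hand-rolled Externalizable hooks and goes back to a fixed-size Serializable class, so default Java serialization writes the underlying words array directly. A minimal sketch of the round trip that replaces the removed readExternal/writeExternal pair:

import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}
import org.apache.spark.util.collection.BitSet

val bits = new BitSet(100)
bits.set(9)

val buf = new ByteArrayOutputStream()
val oos = new ObjectOutputStream(buf)
oos.writeObject(bits)  // default serialization handles the words array
oos.close()

val copy = new ObjectInputStream(new ByteArrayInputStream(buf.toByteArray))
  .readObject().asInstanceOf[BitSet]
assert(copy.get(9) && !copy.get(10))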
diff --git a/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala b/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala
index afe2e80358..e428414cf6 100644
--- a/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala
@@ -322,6 +322,12 @@ class KryoSerializerSuite extends SparkFunSuite with SharedSparkContext {
val conf = new SparkConf(false)
conf.set("spark.kryo.registrationRequired", "true")
+ // These cases require knowing a little about RoaringBitmap internals. Each container
+ // covers 2^16 values and is stored as a bitmap (dense) once it holds more than 4096
+ // values, or as an array (sparse) otherwise. So we create two cases, one sparse and
+ // one dense. Since the roaring bitmap records the empty blocks, the dense case is
+ // triggered with mostly empty blocks.
+
val ser = new KryoSerializer(conf).newInstance()
val denseBlockSizes = new Array[Long](5000)
val sparseBlockSizes = Array[Long](0L, 1L, 0L, 2L)
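
The comment in this hunk relies on RoaringBitmap's container layout: the 32-bit key space is split into chunks of 2^16 values, and each chunk is stored as a sorted array while it holds at most 4096 values, switching to a bitmap beyond that. The test exercises both regimes: 5000 all-empty blocks put more than 4096 ids into one container (dense), while Array(0L, 1L, 0L, 2L) yields only two empty blocks (sparse). A small sketch of the same two cases:

import org.roaringbitmap.RoaringBitmap

// Sparse: well under 4096 values in one 2^16 chunk -> array container.
val sparse = new RoaringBitmap()
(0 until 10).foreach(i => sparse.add(i))

// Dense: more than 4096 values in one chunk -> bitmap container.
val dense = new RoaringBitmap()
(0 until 5000).foreach(i => dense.add(i))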
diff --git a/core/src/test/scala/org/apache/spark/util/collection/BitSetSuite.scala b/core/src/test/scala/org/apache/spark/util/collection/BitSetSuite.scala
index b0db0988ee..69dbfa9cd7 100644
--- a/core/src/test/scala/org/apache/spark/util/collection/BitSetSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/collection/BitSetSuite.scala
@@ -17,10 +17,7 @@
package org.apache.spark.util.collection
-import java.io.{File, FileInputStream, FileOutputStream, ObjectInputStream, ObjectOutputStream}
-
import org.apache.spark.SparkFunSuite
-import org.apache.spark.util.{Utils => UUtils}
class BitSetSuite extends SparkFunSuite {
@@ -155,50 +152,4 @@ class BitSetSuite extends SparkFunSuite {
assert(bitsetDiff.nextSetBit(85) === 85)
assert(bitsetDiff.nextSetBit(86) === -1)
}
-
- test("read and write externally") {
- val tempDir = UUtils.createTempDir()
- val outputFile = File.createTempFile("bits", null, tempDir)
-
- val fos = new FileOutputStream(outputFile)
- val oos = new ObjectOutputStream(fos)
-
- // Create BitSet
- val setBits = Seq(0, 9, 1, 10, 90, 96)
- val bitset = new BitSet(100)
-
- for (i <- 0 until 100) {
- assert(!bitset.get(i))
- }
-
- setBits.foreach(i => bitset.set(i))
-
- for (i <- 0 until 100) {
- if (setBits.contains(i)) {
- assert(bitset.get(i))
- } else {
- assert(!bitset.get(i))
- }
- }
- assert(bitset.cardinality() === setBits.size)
-
- bitset.writeExternal(oos)
- oos.close()
-
- val fis = new FileInputStream(outputFile)
- val ois = new ObjectInputStream(fis)
-
- // Read BitSet from the file
- val bitset2 = new BitSet(0)
- bitset2.readExternal(ois)
-
- for (i <- 0 until 100) {
- if (setBits.contains(i)) {
- assert(bitset2.get(i))
- } else {
- assert(!bitset2.get(i))
- }
- }
- assert(bitset2.cardinality() === setBits.size)
- }
}
diff --git a/pom.xml b/pom.xml
index 01afa80617..2a8a445057 100644
--- a/pom.xml
+++ b/pom.xml
@@ -635,6 +635,11 @@
</exclusions>
</dependency>
<dependency>
+ <groupId>org.roaringbitmap</groupId>
+ <artifactId>RoaringBitmap</artifactId>
+ <version>0.4.5</version>
+ </dependency>
+ <dependency>
<groupId>commons-net</groupId>
<artifactId>commons-net</artifactId>
<version>2.2</version>