 core/pom.xml                                                              |  4
 core/src/main/scala/org/apache/spark/scheduler/MapStatus.scala            | 13
 core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala      | 10
 core/src/main/scala/org/apache/spark/util/collection/BitSet.scala         | 28
 core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala |  6
 core/src/test/scala/org/apache/spark/util/collection/BitSetSuite.scala    | 49
 pom.xml                                                                   |  5
 7 files changed, 82 insertions(+), 33 deletions(-)
diff --git a/core/pom.xml b/core/pom.xml
index 319a50049a..1b6b13517b 100644
--- a/core/pom.xml
+++ b/core/pom.xml
@@ -174,10 +174,6 @@
<artifactId>lz4</artifactId>
</dependency>
<dependency>
- <groupId>org.roaringbitmap</groupId>
- <artifactId>RoaringBitmap</artifactId>
- </dependency>
- <dependency>
<groupId>commons-net</groupId>
<artifactId>commons-net</artifactId>
</dependency>
diff --git a/core/src/main/scala/org/apache/spark/scheduler/MapStatus.scala b/core/src/main/scala/org/apache/spark/scheduler/MapStatus.scala
index 1efce124c0..180c8d1827 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/MapStatus.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/MapStatus.scala
@@ -19,9 +19,8 @@ package org.apache.spark.scheduler
import java.io.{Externalizable, ObjectInput, ObjectOutput}
-import org.roaringbitmap.RoaringBitmap
-
import org.apache.spark.storage.BlockManagerId
+import org.apache.spark.util.collection.BitSet
import org.apache.spark.util.Utils
/**
@@ -133,7 +132,7 @@ private[spark] class CompressedMapStatus(
private[spark] class HighlyCompressedMapStatus private (
private[this] var loc: BlockManagerId,
private[this] var numNonEmptyBlocks: Int,
- private[this] var emptyBlocks: RoaringBitmap,
+ private[this] var emptyBlocks: BitSet,
private[this] var avgSize: Long)
extends MapStatus with Externalizable {
@@ -146,7 +145,7 @@ private[spark] class HighlyCompressedMapStatus private (
override def location: BlockManagerId = loc
override def getSizeForBlock(reduceId: Int): Long = {
- if (emptyBlocks.contains(reduceId)) {
+ if (emptyBlocks.get(reduceId)) {
0
} else {
avgSize
@@ -161,7 +160,7 @@ private[spark] class HighlyCompressedMapStatus private (
override def readExternal(in: ObjectInput): Unit = Utils.tryOrIOException {
loc = BlockManagerId(in)
- emptyBlocks = new RoaringBitmap()
+ emptyBlocks = new BitSet
emptyBlocks.readExternal(in)
avgSize = in.readLong()
}
@@ -177,15 +176,15 @@ private[spark] object HighlyCompressedMapStatus {
// From a compression standpoint, it shouldn't matter whether we track empty or non-empty
// blocks. From a performance standpoint, we benefit from tracking empty blocks because
// we expect that there will be far fewer of them, so we will perform fewer bitmap insertions.
- val emptyBlocks = new RoaringBitmap()
val totalNumBlocks = uncompressedSizes.length
+ val emptyBlocks = new BitSet(totalNumBlocks)
while (i < totalNumBlocks) {
var size = uncompressedSizes(i)
if (size > 0) {
numNonEmptyBlocks += 1
totalSize += size
} else {
- emptyBlocks.add(i)
+ emptyBlocks.set(i)
}
i += 1
}
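
A minimal sketch of the bookkeeping this hunk changes, assuming only the BitSet collection introduced above. The sizes and the local sizeForBlock helper are illustrative; the real class keeps this state in its private constructor:

import org.apache.spark.util.collection.BitSet

val uncompressedSizes = Array[Long](0L, 512L, 0L, 1024L, 0L)
val totalNumBlocks = uncompressedSizes.length
val emptyBlocks = new BitSet(totalNumBlocks)  // sized up front, unlike RoaringBitmap
var numNonEmptyBlocks = 0
var totalSize = 0L
var i = 0
while (i < totalNumBlocks) {
  val size = uncompressedSizes(i)
  if (size > 0) {
    numNonEmptyBlocks += 1
    totalSize += size
  } else {
    emptyBlocks.set(i)  // record only the (expected to be rare) empty blocks
  }
  i += 1
}
val avgSize = if (numNonEmptyBlocks > 0) totalSize / numNonEmptyBlocks else 0L

// getSizeForBlock(reduceId) then reduces to a single bit test:
def sizeForBlock(reduceId: Int): Long =
  if (emptyBlocks.get(reduceId)) 0L else avgSize

assert(sizeForBlock(0) == 0L)    // empty block
assert(sizeForBlock(1) == 768L)  // (512 + 1024) / 2
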
diff --git a/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala b/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala
index c5195c1143..bc51d4f282 100644
--- a/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala
+++ b/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala
@@ -30,7 +30,6 @@ import com.esotericsoftware.kryo.io.{Input => KryoInput, Output => KryoOutput}
import com.esotericsoftware.kryo.serializers.{JavaSerializer => KryoJavaSerializer}
import com.twitter.chill.{AllScalaRegistrar, EmptyScalaKryoInstantiator}
import org.apache.avro.generic.{GenericData, GenericRecord}
-import org.roaringbitmap.{ArrayContainer, BitmapContainer, RoaringArray, RoaringBitmap}
import org.apache.spark._
import org.apache.spark.api.python.PythonBroadcast
@@ -39,7 +38,7 @@ import org.apache.spark.network.util.ByteUnit
import org.apache.spark.scheduler.{CompressedMapStatus, HighlyCompressedMapStatus}
import org.apache.spark.storage._
import org.apache.spark.util.{Utils, BoundedPriorityQueue, SerializableConfiguration, SerializableJobConf}
-import org.apache.spark.util.collection.CompactBuffer
+import org.apache.spark.util.collection.{BitSet, CompactBuffer}
/**
* A Spark serializer that uses the [[https://code.google.com/p/kryo/ Kryo serialization library]].
@@ -363,12 +362,7 @@ private[serializer] object KryoSerializer {
classOf[StorageLevel],
classOf[CompressedMapStatus],
classOf[HighlyCompressedMapStatus],
- classOf[RoaringBitmap],
- classOf[RoaringArray],
- classOf[RoaringArray.Element],
- classOf[Array[RoaringArray.Element]],
- classOf[ArrayContainer],
- classOf[BitmapContainer],
+ classOf[BitSet],
classOf[CompactBuffer[_]],
classOf[BlockManagerId],
classOf[Array[Byte]],
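
With spark.kryo.registrationRequired=true, every class Kryo touches must be registered, which is why the six RoaringBitmap internals above collapse to a single BitSet entry. A hedged sketch of the round trip outside Spark's serializer setup, assuming Kryo's default FieldSerializer handles BitSet's two fields:

import java.io.ByteArrayOutputStream

import com.esotericsoftware.kryo.Kryo
import com.esotericsoftware.kryo.io.{Input, Output}

import org.apache.spark.util.collection.BitSet

val kryo = new Kryo()
kryo.setRegistrationRequired(true)
kryo.register(classOf[BitSet])
kryo.register(classOf[Array[Long]])  // the backing words array

val bits = new BitSet(128)
bits.set(3)
bits.set(100)

val bytes = new ByteArrayOutputStream()
val output = new Output(bytes)
kryo.writeObject(output, bits)
output.close()

val back = kryo.readObject(new Input(bytes.toByteArray), classOf[BitSet])
assert(back.get(3) && back.get(100) && !back.get(4))
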
diff --git a/core/src/main/scala/org/apache/spark/util/collection/BitSet.scala b/core/src/main/scala/org/apache/spark/util/collection/BitSet.scala
index 7ab67fc3a2..85c5bdbfce 100644
--- a/core/src/main/scala/org/apache/spark/util/collection/BitSet.scala
+++ b/core/src/main/scala/org/apache/spark/util/collection/BitSet.scala
@@ -17,14 +17,21 @@
package org.apache.spark.util.collection
+import java.io.{Externalizable, ObjectInput, ObjectOutput}
+
+import org.apache.spark.util.{Utils => UUtils}
+
+
/**
* A simple, fixed-size bit set implementation. This implementation is fast because it avoids
* safety/bound checking.
*/
-class BitSet(numBits: Int) extends Serializable {
+class BitSet(private[this] var numBits: Int) extends Externalizable {
- private val words = new Array[Long](bit2words(numBits))
- private val numWords = words.length
+ private var words = new Array[Long](bit2words(numBits))
+ private def numWords = words.length
+
+ def this() = this(0)
/**
* Compute the capacity (number of bits) that can be represented
@@ -230,4 +237,19 @@ class BitSet(numBits: Int) extends Serializable {
/** Return the number of longs it would take to hold numBits. */
private def bit2words(numBits: Int) = ((numBits - 1) >> 6) + 1
+
+ override def writeExternal(out: ObjectOutput): Unit = UUtils.tryOrIOException {
+ out.writeInt(numBits)
+ words.foreach(out.writeLong(_))
+ }
+
+ override def readExternal(in: ObjectInput): Unit = UUtils.tryOrIOException {
+ numBits = in.readInt()
+ words = new Array[Long](bit2words(numBits))
+ var index = 0
+ while (index < words.length) {
+ words(index) = in.readLong()
+ index += 1
+ }
+ }
}
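
One consequence worth noting: unlike RoaringBitmap, this BitSet is fixed-size, so writeExternal always emits one Int plus bit2words(numBits) Longs, regardless of how many bits are set. A worked sketch of the arithmetic (bit2words is private; its body is reproduced here for illustration):

// ((numBits - 1) >> 6) + 1 is ceil(numBits / 64) for positive numBits
def bit2words(numBits: Int): Int = ((numBits - 1) >> 6) + 1

assert(bit2words(1) == 1)    // 1..64 bits fit in one Long
assert(bit2words(64) == 1)
assert(bit2words(65) == 2)   // the 65th bit spills into a second word
assert(bit2words(100) == 2)  // a BitSet(100) serializes as an Int + 2 Longs
assert(bit2words(0) == 0)    // the new no-arg constructor writes no words

// So the on-wire payload of a BitSet(n) is 4 + 8 * ceil(n / 64) bytes,
// plus whatever framing the ObjectOutput adds.
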
diff --git a/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala b/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala
index e428414cf6..afe2e80358 100644
--- a/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala
@@ -322,12 +322,6 @@ class KryoSerializerSuite extends SparkFunSuite with SharedSparkContext {
val conf = new SparkConf(false)
conf.set("spark.kryo.registrationRequired", "true")
- // these cases require knowing the internals of RoaringBitmap a little. Blocks span 2^16
- // values, and they use a bitmap (dense) if they have more than 4096 values, and an
- // array (sparse) if they use less. So we just create two cases, one sparse and one dense.
- // and we use a roaring bitmap for the empty blocks, so we trigger the dense case w/ mostly
- // empty blocks
-
val ser = new KryoSerializer(conf).newInstance()
val denseBlockSizes = new Array[Long](5000)
val sparseBlockSizes = Array[Long](0L, 1L, 0L, 2L)
diff --git a/core/src/test/scala/org/apache/spark/util/collection/BitSetSuite.scala b/core/src/test/scala/org/apache/spark/util/collection/BitSetSuite.scala
index 69dbfa9cd7..b0db0988ee 100644
--- a/core/src/test/scala/org/apache/spark/util/collection/BitSetSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/collection/BitSetSuite.scala
@@ -17,7 +17,10 @@
package org.apache.spark.util.collection
+import java.io.{File, FileInputStream, FileOutputStream, ObjectInputStream, ObjectOutputStream}
+
import org.apache.spark.SparkFunSuite
+import org.apache.spark.util.{Utils => UUtils}
class BitSetSuite extends SparkFunSuite {
@@ -152,4 +155,50 @@ class BitSetSuite extends SparkFunSuite {
assert(bitsetDiff.nextSetBit(85) === 85)
assert(bitsetDiff.nextSetBit(86) === -1)
}
+
+ test("read and write externally") {
+ val tempDir = UUtils.createTempDir()
+ val outputFile = File.createTempFile("bits", null, tempDir)
+
+ val fos = new FileOutputStream(outputFile)
+ val oos = new ObjectOutputStream(fos)
+
+ // Create BitSet
+ val setBits = Seq(0, 9, 1, 10, 90, 96)
+ val bitset = new BitSet(100)
+
+ for (i <- 0 until 100) {
+ assert(!bitset.get(i))
+ }
+
+ setBits.foreach(i => bitset.set(i))
+
+ for (i <- 0 until 100) {
+ if (setBits.contains(i)) {
+ assert(bitset.get(i))
+ } else {
+ assert(!bitset.get(i))
+ }
+ }
+ assert(bitset.cardinality() === setBits.size)
+
+ bitset.writeExternal(oos)
+ oos.close()
+
+ val fis = new FileInputStream(outputFile)
+ val ois = new ObjectInputStream(fis)
+
+ // Read BitSet from the file
+ val bitset2 = new BitSet(0)
+ bitset2.readExternal(ois)
+
+ for (i <- 0 until 100) {
+ if (setBits.contains(i)) {
+ assert(bitset2.get(i))
+ } else {
+ assert(!bitset2.get(i))
+ }
+ }
+ assert(bitset2.cardinality() === setBits.size)
+ }
}
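
A hedged variant of the test above: the same Externalizable round trip can be exercised in memory, without a temp file. This is a sketch assuming only java.io and the BitSet class, not part of the suite:

import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}

import org.apache.spark.util.collection.BitSet

val bitset = new BitSet(100)
Seq(0, 1, 9, 10, 90, 96).foreach(bitset.set)

val baos = new ByteArrayOutputStream()
val oos = new ObjectOutputStream(baos)
// Calling writeExternal directly matches what the suite does; writing the
// whole object with oos.writeObject(bitset) would also exercise the same path.
bitset.writeExternal(oos)
oos.close()

val ois = new ObjectInputStream(new ByteArrayInputStream(baos.toByteArray))
val bitset2 = new BitSet  // uses the new no-arg constructor
bitset2.readExternal(ois)
assert(bitset2.cardinality() == 6)
assert(bitset2.get(90) && !bitset2.get(2))
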
diff --git a/pom.xml b/pom.xml
index 3dfc434fb5..50c8f29cdb 100644
--- a/pom.xml
+++ b/pom.xml
@@ -624,11 +624,6 @@
</exclusions>
</dependency>
<dependency>
- <groupId>org.roaringbitmap</groupId>
- <artifactId>RoaringBitmap</artifactId>
- <version>0.4.5</version>
- </dependency>
- <dependency>
<groupId>commons-net</groupId>
<artifactId>commons-net</artifactId>
<version>2.2</version>