 core/src/main/scala/org/apache/spark/scheduler/MapStatus.scala        |  5
 core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala  |  6
 core/src/test/scala/org/apache/spark/scheduler/MapStatusSuite.scala   | 31
 pom.xml                                                               |  2
 4 files changed, 38 insertions(+), 6 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/scheduler/MapStatus.scala b/core/src/main/scala/org/apache/spark/scheduler/MapStatus.scala
index 1efce124c0..b2e9a97129 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/MapStatus.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/MapStatus.scala
@@ -122,8 +122,7 @@ private[spark] class CompressedMapStatus(
/**
* A [[MapStatus]] implementation that only stores the average size of non-empty blocks,
- * plus a bitmap for tracking which blocks are empty. During serialization, this bitmap
- * is compressed.
+ * plus a bitmap for tracking which blocks are empty.
*
* @param loc location where the task is being executed
* @param numNonEmptyBlocks the number of non-empty blocks
@@ -194,6 +193,8 @@ private[spark] object HighlyCompressedMapStatus {
} else {
0
}
+ emptyBlocks.trim()
+ emptyBlocks.runOptimize()
new HighlyCompressedMapStatus(loc, numNonEmptyBlocks, emptyBlocks, avgSize)
}
}
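
The two added calls compact the empty-block bitmap before it is kept inside the HighlyCompressedMapStatus: runOptimize() converts containers to run-length-encoded form where that is smaller, and trim() releases spare capacity in the backing arrays. A minimal sketch of the effect follows; it is not part of the patch, it uses only RoaringBitmap calls that also appear in the new MapStatusSuite tests below, and the block ids are made up.

import org.roaringbitmap.RoaringBitmap

object EmptyBlocksSketch {
  def main(args: Array[String]): Unit = {
    val emptyBlocks = new RoaringBitmap
    // A long run of consecutive "empty block" ids compresses well as run containers.
    (0 until 100000).foreach(i => emptyBlocks.add(i))
    val before = emptyBlocks.getSizeInBytes
    val usedRunContainers = emptyBlocks.runOptimize() // true if any container was converted
    emptyBlocks.trim()                                // drop unused backing-array capacity
    val after = emptyBlocks.getSizeInBytes
    println(s"run-optimized: $usedRunContainers, $before -> $after bytes")
  }
}
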
diff --git a/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala b/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala
index 1bcb3175a3..d5ba690ed0 100644
--- a/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala
+++ b/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala
@@ -17,7 +17,7 @@
package org.apache.spark.serializer
-import java.io.{EOFException, IOException, InputStream, OutputStream, DataInput, DataOutput}
+import java.io.{DataInput, DataOutput, EOFException, IOException, InputStream, OutputStream}
import java.nio.ByteBuffer
import javax.annotation.Nullable
@@ -25,9 +25,9 @@ import scala.collection.JavaConverters._
import scala.collection.mutable.ArrayBuffer
import scala.reflect.ClassTag
-import com.esotericsoftware.kryo.{Kryo, KryoException, Serializer => KryoClassSerializer}
import com.esotericsoftware.kryo.io.{Input => KryoInput, Output => KryoOutput}
import com.esotericsoftware.kryo.serializers.{JavaSerializer => KryoJavaSerializer}
+import com.esotericsoftware.kryo.{Kryo, KryoException, Serializer => KryoClassSerializer}
import com.twitter.chill.{AllScalaRegistrar, EmptyScalaKryoInstantiator}
import org.apache.avro.generic.{GenericData, GenericRecord}
import org.roaringbitmap.RoaringBitmap
@@ -38,8 +38,8 @@ import org.apache.spark.broadcast.HttpBroadcast
import org.apache.spark.network.util.ByteUnit
import org.apache.spark.scheduler.{CompressedMapStatus, HighlyCompressedMapStatus}
import org.apache.spark.storage._
-import org.apache.spark.util.{Utils, BoundedPriorityQueue, SerializableConfiguration, SerializableJobConf}
import org.apache.spark.util.collection.CompactBuffer
+import org.apache.spark.util.{BoundedPriorityQueue, SerializableConfiguration, SerializableJobConf, Utils}
/**
* A Spark serializer that uses the [[https://code.google.com/p/kryo/ Kryo serialization library]].
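
The RoaringBitmap import above suggests KryoSerializer handles this type directly, so a hedged sketch of round-tripping a bitmap through Spark's Kryo serializer may help; it is not part of the patch, assumes default SparkConf settings, and assumes the serializer's registrations cover RoaringBitmap.

import org.apache.spark.SparkConf
import org.apache.spark.serializer.KryoSerializer
import org.roaringbitmap.RoaringBitmap

object KryoRoundTripSketch {
  def main(args: Array[String]): Unit = {
    val ser = new KryoSerializer(new SparkConf(false)).newInstance()
    val bitmap = new RoaringBitmap
    (1 to 10).foreach(i => bitmap.add(i * 200))
    // serialize returns a ByteBuffer; deserialize needs the target type's ClassTag
    val restored = ser.deserialize[RoaringBitmap](ser.serialize(bitmap))
    assert(restored.equals(bitmap))
  }
}
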
diff --git a/core/src/test/scala/org/apache/spark/scheduler/MapStatusSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/MapStatusSuite.scala
index b8e466fab4..15c8de61b8 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/MapStatusSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/MapStatusSuite.scala
@@ -21,6 +21,7 @@ import org.apache.spark.storage.BlockManagerId
import org.apache.spark.{SparkConf, SparkFunSuite}
import org.apache.spark.serializer.JavaSerializer
+import org.roaringbitmap.RoaringBitmap
import scala.util.Random
@@ -97,4 +98,34 @@ class MapStatusSuite extends SparkFunSuite {
val buf = ser.newInstance().serialize(status)
ser.newInstance().deserialize[MapStatus](buf)
}
+
+ test("RoaringBitmap: runOptimize succeeded") {
+ val r = new RoaringBitmap
+ (1 to 200000).foreach(i =>
+ if (i % 200 != 0) {
+ r.add(i)
+ }
+ )
+ val size1 = r.getSizeInBytes
+ val success = r.runOptimize()
+ r.trim()
+ val size2 = r.getSizeInBytes
+ assert(size1 > size2)
+ assert(success)
+ }
+
+ test("RoaringBitmap: runOptimize failed") {
+ val r = new RoaringBitmap
+ (1 to 200000).foreach(i =>
+ if (i % 200 == 0) {
+ r.add(i)
+ }
+ )
+ val size1 = r.getSizeInBytes
+ val success = r.runOptimize()
+ r.trim()
+ val size2 = r.getSizeInBytes
+ assert(size1 === size2)
+ assert(!success)
+ }
}
diff --git a/pom.xml b/pom.xml
index 2a8a445057..940e2d8740 100644
--- a/pom.xml
+++ b/pom.xml
@@ -637,7 +637,7 @@
<dependency>
<groupId>org.roaringbitmap</groupId>
<artifactId>RoaringBitmap</artifactId>
- <version>0.4.5</version>
+ <version>0.5.11</version>
</dependency>
<dependency>
<groupId>commons-net</groupId>