author     Sean Owen <sowen@cloudera.com>          2014-04-11 22:46:47 -0700
committer  Patrick Wendell <pwendell@gmail.com>    2014-04-11 22:46:47 -0700
commit     165e06a74c3d75e6b7341c120943add8b035b96a
tree       407aa7d706a64eb45f7e4d02f3f6db640b130b28
parent     aa8bb117a3ff98420ab751ba4ddbaad88ab57f9d
SPARK-1057 (alternative) Remove fastutil
(This is for discussion at this point -- I'm not suggesting this should be committed.)

This is what removing fastutil looks like. Much of it is straightforward, like using `java.io` buffered stream classes, and Guava for murmurhash3.

Uses of the `FastByteArrayOutputStream` were a little trickier. In only one case though do I think the change to use `java.io` actually entails an extra array copy.

The rest is using `OpenHashMap` and `OpenHashSet`. These are now written in terms of more scala-like operations.

`OpenHashMap` is where I made three non-trivial changes to make it work, and they need review:

- It is no longer private
- The key must be a `ClassTag`
- Unless a lot of other code changes, the key type can't enforce being a supertype of `Null`

It all works and tests pass, and I think there is reason to believe it's OK from a speed perspective. But what about those last changes?

Author: Sean Owen <sowen@cloudera.com>

Closes #266 from srowen/SPARK-1057-alternate and squashes the following commits:

2601129 [Sean Owen] Fix Map return type error not previously caught
ec65502 [Sean Owen] Updates from matei's review
00bc81e [Sean Owen] Remove use of fastutil and replace with use of java.io, spark.util and Guava classes
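As a quick illustration of the idiom adopted throughout the patch, here is a minimal word-count sketch using `OpenHashMap.changeValue` in place of fastutil's get-then-put. It assumes code living inside the Spark source tree (after this change `OpenHashMap` is `private[spark]`), and the sample input is purely illustrative.

```scala
import org.apache.spark.util.collection.OpenHashMap

// Hypothetical word-count snippet, inside org.apache.spark.* code.
val words = Iterator("to", "be", "or", "not", "to", "be")

// fastutil style (removed by this patch):
//   val counts = new Object2LongOpenHashMap[String]
//   words.foreach(w => counts.put(w, counts.getLong(w) + 1L))

// OpenHashMap style: changeValue takes a default for absent keys and an
// update function for present ones, so each word costs one hash probe.
val counts = new OpenHashMap[String, Long]()
words.foreach(w => counts.changeValue(w, 1L, _ + 1L))

counts.foreach { case (word, n) => println(s"$word -> $n") }
```

The `@specialized` value parameter of `OpenHashMap` keeps the `Long` counts unboxed, which is the property the fastutil map was originally providing.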
-rw-r--r--  core/pom.xml                                                                      |  4
-rw-r--r--  core/src/main/scala/org/apache/spark/broadcast/HttpBroadcast.scala                |  9
-rw-r--r--  core/src/main/scala/org/apache/spark/partial/GroupedCountEvaluator.scala          | 32
-rw-r--r--  core/src/main/scala/org/apache/spark/rdd/RDD.scala                                | 34
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/ReplayListenerBus.scala            |  5
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/Task.scala                         |  9
-rw-r--r--  core/src/main/scala/org/apache/spark/serializer/Serializer.scala                  |  9
-rw-r--r--  core/src/main/scala/org/apache/spark/storage/BlockManager.scala                   | 10
-rw-r--r--  core/src/main/scala/org/apache/spark/storage/BlockObjectWriter.scala              |  6
-rw-r--r--  core/src/main/scala/org/apache/spark/util/FileLogger.scala                        |  5
-rw-r--r--  core/src/main/scala/org/apache/spark/util/SizeEstimator.scala                     |  5
-rw-r--r--  core/src/main/scala/org/apache/spark/util/collection/AppendOnlyMap.scala          |  7
-rw-r--r--  core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala  |  5
-rw-r--r--  core/src/main/scala/org/apache/spark/util/collection/OpenHashMap.scala            |  3
-rw-r--r--  core/src/main/scala/org/apache/spark/util/collection/OpenHashSet.scala            |  4
-rw-r--r--  pom.xml                                                                           |  5
-rw-r--r--  project/SparkBuild.scala                                                          |  1
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/util/RawTextHelper.scala      | 15
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/util/RawTextSender.scala      | 11
19 files changed, 72 insertions, 107 deletions
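The commit message notes that the `java.io` replacement for `FastByteArrayOutputStream` can entail an extra array copy: `java.io.ByteArrayOutputStream.toByteArray()` always returns a fresh copy of the buffer, whereas the fastutil idiom trimmed the backing array and handed it out directly. A minimal sketch of the two idioms, with the removed fastutil calls left as comments:

```scala
import java.io.ByteArrayOutputStream
import java.nio.ByteBuffer

val out = new ByteArrayOutputStream(4096)
out.write(Array[Byte](1, 2, 3))

// fastutil idiom (before): trim the backing array to the written length and
// wrap it directly -- no further copy after the trim itself.
//   out.trim()
//   ByteBuffer.wrap(out.array)

// java.io idiom (after): toByteArray() allocates and copies a new array every time.
val buffer: ByteBuffer = ByteBuffer.wrap(out.toByteArray)
```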
diff --git a/core/pom.xml b/core/pom.xml
index 1f80838081..a1bdd8ec68 100644
--- a/core/pom.xml
+++ b/core/pom.xml
@@ -158,10 +158,6 @@
</exclusions>
</dependency>
<dependency>
- <groupId>it.unimi.dsi</groupId>
- <artifactId>fastutil</artifactId>
- </dependency>
- <dependency>
<groupId>colt</groupId>
<artifactId>colt</artifactId>
</dependency>
diff --git a/core/src/main/scala/org/apache/spark/broadcast/HttpBroadcast.scala b/core/src/main/scala/org/apache/spark/broadcast/HttpBroadcast.scala
index f6a8a8af91..29372f16f2 100644
--- a/core/src/main/scala/org/apache/spark/broadcast/HttpBroadcast.scala
+++ b/core/src/main/scala/org/apache/spark/broadcast/HttpBroadcast.scala
@@ -18,11 +18,10 @@
package org.apache.spark.broadcast
import java.io.{File, FileOutputStream, ObjectInputStream, ObjectOutputStream, OutputStream}
-import java.net.{URI, URL, URLConnection}
+import java.io.{BufferedInputStream, BufferedOutputStream}
+import java.net.{URL, URLConnection, URI}
import java.util.concurrent.TimeUnit
-import it.unimi.dsi.fastutil.io.{FastBufferedInputStream, FastBufferedOutputStream}
-
import org.apache.spark.{HttpServer, Logging, SecurityManager, SparkConf, SparkEnv}
import org.apache.spark.io.CompressionCodec
import org.apache.spark.storage.{BroadcastBlockId, StorageLevel}
@@ -164,7 +163,7 @@ private[spark] object HttpBroadcast extends Logging {
if (compress) {
compressionCodec.compressedOutputStream(new FileOutputStream(file))
} else {
- new FastBufferedOutputStream(new FileOutputStream(file), bufferSize)
+ new BufferedOutputStream(new FileOutputStream(file), bufferSize)
}
}
val ser = SparkEnv.get.serializer.newInstance()
@@ -195,7 +194,7 @@ private[spark] object HttpBroadcast extends Logging {
if (compress) {
compressionCodec.compressedInputStream(inputStream)
} else {
- new FastBufferedInputStream(inputStream, bufferSize)
+ new BufferedInputStream(inputStream, bufferSize)
}
}
val ser = SparkEnv.get.serializer.newInstance()
diff --git a/core/src/main/scala/org/apache/spark/partial/GroupedCountEvaluator.scala b/core/src/main/scala/org/apache/spark/partial/GroupedCountEvaluator.scala
index 40b70baabc..8bb78123e3 100644
--- a/core/src/main/scala/org/apache/spark/partial/GroupedCountEvaluator.scala
+++ b/core/src/main/scala/org/apache/spark/partial/GroupedCountEvaluator.scala
@@ -22,36 +22,33 @@ import java.util.{HashMap => JHashMap}
import scala.collection.JavaConversions.mapAsScalaMap
import scala.collection.Map
import scala.collection.mutable.HashMap
+import scala.reflect.ClassTag
import cern.jet.stat.Probability
-import it.unimi.dsi.fastutil.objects.{Object2LongOpenHashMap => OLMap}
+
+import org.apache.spark.util.collection.OpenHashMap
/**
* An ApproximateEvaluator for counts by key. Returns a map of key to confidence interval.
*/
-private[spark] class GroupedCountEvaluator[T](totalOutputs: Int, confidence: Double)
- extends ApproximateEvaluator[OLMap[T], Map[T, BoundedDouble]] {
+private[spark] class GroupedCountEvaluator[T : ClassTag](totalOutputs: Int, confidence: Double)
+ extends ApproximateEvaluator[OpenHashMap[T,Long], Map[T, BoundedDouble]] {
var outputsMerged = 0
- var sums = new OLMap[T] // Sum of counts for each key
+ var sums = new OpenHashMap[T,Long]() // Sum of counts for each key
- override def merge(outputId: Int, taskResult: OLMap[T]) {
+ override def merge(outputId: Int, taskResult: OpenHashMap[T,Long]) {
outputsMerged += 1
- val iter = taskResult.object2LongEntrySet.fastIterator()
- while (iter.hasNext) {
- val entry = iter.next()
- sums.put(entry.getKey, sums.getLong(entry.getKey) + entry.getLongValue)
+ taskResult.foreach { case (key, value) =>
+ sums.changeValue(key, value, _ + value)
}
}
override def currentResult(): Map[T, BoundedDouble] = {
if (outputsMerged == totalOutputs) {
val result = new JHashMap[T, BoundedDouble](sums.size)
- val iter = sums.object2LongEntrySet.fastIterator()
- while (iter.hasNext) {
- val entry = iter.next()
- val sum = entry.getLongValue()
- result(entry.getKey) = new BoundedDouble(sum, 1.0, sum, sum)
+ sums.foreach { case (key, sum) =>
+ result(key) = new BoundedDouble(sum, 1.0, sum, sum)
}
result
} else if (outputsMerged == 0) {
@@ -60,16 +57,13 @@ private[spark] class GroupedCountEvaluator[T](totalOutputs: Int, confidence: Dou
val p = outputsMerged.toDouble / totalOutputs
val confFactor = Probability.normalInverse(1 - (1 - confidence) / 2)
val result = new JHashMap[T, BoundedDouble](sums.size)
- val iter = sums.object2LongEntrySet.fastIterator()
- while (iter.hasNext) {
- val entry = iter.next()
- val sum = entry.getLongValue
+ sums.foreach { case (key, sum) =>
val mean = (sum + 1 - p) / p
val variance = (sum + 1) * (1 - p) / (p * p)
val stdev = math.sqrt(variance)
val low = mean - confFactor * stdev
val high = mean + confFactor * stdev
- result(entry.getKey) = new BoundedDouble(mean, confidence, low, high)
+ result(key) = new BoundedDouble(mean, confidence, low, high)
}
result
}
diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
index 3437b2cac1..891efccf23 100644
--- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@ -20,12 +20,10 @@ package org.apache.spark.rdd
import java.util.Random
import scala.collection.Map
-import scala.collection.JavaConversions.mapAsScalaMap
import scala.collection.mutable.ArrayBuffer
import scala.reflect.{classTag, ClassTag}
import com.clearspring.analytics.stream.cardinality.HyperLogLog
-import it.unimi.dsi.fastutil.objects.{Object2LongOpenHashMap => OLMap}
import org.apache.hadoop.io.BytesWritable
import org.apache.hadoop.io.compress.CompressionCodec
import org.apache.hadoop.io.NullWritable
@@ -43,6 +41,7 @@ import org.apache.spark.partial.GroupedCountEvaluator
import org.apache.spark.partial.PartialResult
import org.apache.spark.storage.StorageLevel
import org.apache.spark.util.{BoundedPriorityQueue, SerializableHyperLogLog, Utils}
+import org.apache.spark.util.collection.OpenHashMap
import org.apache.spark.util.random.{BernoulliSampler, PoissonSampler}
/**
@@ -834,24 +833,24 @@ abstract class RDD[T: ClassTag](
throw new SparkException("countByValue() does not support arrays")
}
// TODO: This should perhaps be distributed by default.
- def countPartition(iter: Iterator[T]): Iterator[OLMap[T]] = {
- val map = new OLMap[T]
- while (iter.hasNext) {
- val v = iter.next()
- map.put(v, map.getLong(v) + 1L)
+ def countPartition(iter: Iterator[T]): Iterator[OpenHashMap[T,Long]] = {
+ val map = new OpenHashMap[T,Long]
+ iter.foreach {
+ t => map.changeValue(t, 1L, _ + 1L)
}
Iterator(map)
}
- def mergeMaps(m1: OLMap[T], m2: OLMap[T]): OLMap[T] = {
- val iter = m2.object2LongEntrySet.fastIterator()
- while (iter.hasNext) {
- val entry = iter.next()
- m1.put(entry.getKey, m1.getLong(entry.getKey) + entry.getLongValue)
+ def mergeMaps(m1: OpenHashMap[T,Long], m2: OpenHashMap[T,Long]): OpenHashMap[T,Long] = {
+ m2.foreach { case (key, value) =>
+ m1.changeValue(key, value, _ + value)
}
m1
}
val myResult = mapPartitions(countPartition).reduce(mergeMaps)
- myResult.asInstanceOf[java.util.Map[T, Long]] // Will be wrapped as a Scala mutable Map
+ // Convert to a Scala mutable map
+ val mutableResult = scala.collection.mutable.Map[T,Long]()
+ myResult.foreach { case (k, v) => mutableResult.put(k, v) }
+ mutableResult
}
/**
@@ -866,11 +865,10 @@ abstract class RDD[T: ClassTag](
if (elementClassTag.runtimeClass.isArray) {
throw new SparkException("countByValueApprox() does not support arrays")
}
- val countPartition: (TaskContext, Iterator[T]) => OLMap[T] = { (ctx, iter) =>
- val map = new OLMap[T]
- while (iter.hasNext) {
- val v = iter.next()
- map.put(v, map.getLong(v) + 1L)
+ val countPartition: (TaskContext, Iterator[T]) => OpenHashMap[T,Long] = { (ctx, iter) =>
+ val map = new OpenHashMap[T,Long]
+ iter.foreach {
+ t => map.changeValue(t, 1L, _ + 1L)
}
map
}
diff --git a/core/src/main/scala/org/apache/spark/scheduler/ReplayListenerBus.scala b/core/src/main/scala/org/apache/spark/scheduler/ReplayListenerBus.scala
index b03665fd56..f868e772cf 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/ReplayListenerBus.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/ReplayListenerBus.scala
@@ -17,11 +17,10 @@
package org.apache.spark.scheduler
-import java.io.InputStream
+import java.io.{BufferedInputStream, InputStream}
import scala.io.Source
-import it.unimi.dsi.fastutil.io.FastBufferedInputStream
import org.apache.hadoop.fs.{Path, FileSystem}
import org.json4s.jackson.JsonMethods._
@@ -62,7 +61,7 @@ private[spark] class ReplayListenerBus(
var currentLine = "<not started>"
try {
fileStream = Some(fileSystem.open(path))
- bufferedStream = Some(new FastBufferedInputStream(fileStream.get))
+ bufferedStream = Some(new BufferedInputStream(fileStream.get))
compressStream = Some(wrapForCompression(bufferedStream.get))
// Parse each line as an event and post the event to all attached listeners
diff --git a/core/src/main/scala/org/apache/spark/scheduler/Task.scala b/core/src/main/scala/org/apache/spark/scheduler/Task.scala
index b85b4a50cd..a8bcb7dfe2 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/Task.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/Task.scala
@@ -17,13 +17,11 @@
package org.apache.spark.scheduler
-import java.io.{DataInputStream, DataOutputStream}
+import java.io.{ByteArrayOutputStream, DataInputStream, DataOutputStream}
import java.nio.ByteBuffer
import scala.collection.mutable.HashMap
-import it.unimi.dsi.fastutil.io.FastByteArrayOutputStream
-
import org.apache.spark.TaskContext
import org.apache.spark.executor.TaskMetrics
import org.apache.spark.serializer.SerializerInstance
@@ -104,7 +102,7 @@ private[spark] object Task {
serializer: SerializerInstance)
: ByteBuffer = {
- val out = new FastByteArrayOutputStream(4096)
+ val out = new ByteArrayOutputStream(4096)
val dataOut = new DataOutputStream(out)
// Write currentFiles
@@ -125,8 +123,7 @@ private[spark] object Task {
dataOut.flush()
val taskBytes = serializer.serialize(task).array()
out.write(taskBytes)
- out.trim()
- ByteBuffer.wrap(out.array)
+ ByteBuffer.wrap(out.toByteArray)
}
/**
diff --git a/core/src/main/scala/org/apache/spark/serializer/Serializer.scala b/core/src/main/scala/org/apache/spark/serializer/Serializer.scala
index 9f04dc6e42..f2c8f9b621 100644
--- a/core/src/main/scala/org/apache/spark/serializer/Serializer.scala
+++ b/core/src/main/scala/org/apache/spark/serializer/Serializer.scala
@@ -17,11 +17,9 @@
package org.apache.spark.serializer
-import java.io.{EOFException, InputStream, OutputStream}
+import java.io.{ByteArrayOutputStream, EOFException, InputStream, OutputStream}
import java.nio.ByteBuffer
-import it.unimi.dsi.fastutil.io.FastByteArrayOutputStream
-
import org.apache.spark.SparkEnv
import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.util.{ByteBufferInputStream, NextIterator}
@@ -73,10 +71,9 @@ trait SerializerInstance {
def serializeMany[T](iterator: Iterator[T]): ByteBuffer = {
// Default implementation uses serializeStream
- val stream = new FastByteArrayOutputStream()
+ val stream = new ByteArrayOutputStream()
serializeStream(stream).writeAll(iterator)
- val buffer = ByteBuffer.allocate(stream.position.toInt)
- buffer.put(stream.array, 0, stream.position.toInt)
+ val buffer = ByteBuffer.wrap(stream.toByteArray)
buffer.flip()
buffer
}
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index df9bb4044e..f14017051f 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -17,7 +17,7 @@
package org.apache.spark.storage
-import java.io.{File, InputStream, OutputStream}
+import java.io.{File, InputStream, OutputStream, BufferedOutputStream, ByteArrayOutputStream}
import java.nio.{ByteBuffer, MappedByteBuffer}
import scala.collection.mutable.{ArrayBuffer, HashMap}
@@ -26,7 +26,6 @@ import scala.concurrent.duration._
import scala.util.Random
import akka.actor.{ActorSystem, Cancellable, Props}
-import it.unimi.dsi.fastutil.io.{FastBufferedOutputStream, FastByteArrayOutputStream}
import sun.nio.ch.DirectBuffer
import org.apache.spark.{Logging, MapOutputTracker, SecurityManager, SparkConf, SparkEnv, SparkException}
@@ -992,7 +991,7 @@ private[spark] class BlockManager(
outputStream: OutputStream,
values: Iterator[Any],
serializer: Serializer = defaultSerializer) {
- val byteStream = new FastBufferedOutputStream(outputStream)
+ val byteStream = new BufferedOutputStream(outputStream)
val ser = serializer.newInstance()
ser.serializeStream(wrapForCompression(blockId, byteStream)).writeAll(values).close()
}
@@ -1002,10 +1001,9 @@ private[spark] class BlockManager(
blockId: BlockId,
values: Iterator[Any],
serializer: Serializer = defaultSerializer): ByteBuffer = {
- val byteStream = new FastByteArrayOutputStream(4096)
+ val byteStream = new ByteArrayOutputStream(4096)
dataSerializeStream(blockId, byteStream, values, serializer)
- byteStream.trim()
- ByteBuffer.wrap(byteStream.array)
+ ByteBuffer.wrap(byteStream.toByteArray)
}
/**
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockObjectWriter.scala b/core/src/main/scala/org/apache/spark/storage/BlockObjectWriter.scala
index 696b930a26..a2687e6be4 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockObjectWriter.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockObjectWriter.scala
@@ -17,11 +17,9 @@
package org.apache.spark.storage
-import java.io.{FileOutputStream, File, OutputStream}
+import java.io.{BufferedOutputStream, FileOutputStream, File, OutputStream}
import java.nio.channels.FileChannel
-import it.unimi.dsi.fastutil.io.FastBufferedOutputStream
-
import org.apache.spark.Logging
import org.apache.spark.serializer.{SerializationStream, Serializer}
@@ -119,7 +117,7 @@ private[spark] class DiskBlockObjectWriter(
ts = new TimeTrackingOutputStream(fos)
channel = fos.getChannel()
lastValidPosition = initialPosition
- bs = compressStream(new FastBufferedOutputStream(ts, bufferSize))
+ bs = compressStream(new BufferedOutputStream(ts, bufferSize))
objOut = serializer.newInstance().serializeStream(bs)
initialized = true
this
diff --git a/core/src/main/scala/org/apache/spark/util/FileLogger.scala b/core/src/main/scala/org/apache/spark/util/FileLogger.scala
index 0080a8b342..68a12e8ed6 100644
--- a/core/src/main/scala/org/apache/spark/util/FileLogger.scala
+++ b/core/src/main/scala/org/apache/spark/util/FileLogger.scala
@@ -17,12 +17,11 @@
package org.apache.spark.util
-import java.io._
+import java.io.{FileOutputStream, BufferedOutputStream, PrintWriter, IOException}
import java.net.URI
import java.text.SimpleDateFormat
import java.util.Date
-import it.unimi.dsi.fastutil.io.FastBufferedOutputStream
import org.apache.hadoop.fs.{FSDataOutputStream, Path}
import org.apache.spark.{Logging, SparkConf}
@@ -100,7 +99,7 @@ private[spark] class FileLogger(
hadoopDataStream.get
}
- val bstream = new FastBufferedOutputStream(dstream, outputBufferSize)
+ val bstream = new BufferedOutputStream(dstream, outputBufferSize)
val cstream = if (compress) compressionCodec.compressedOutputStream(bstream) else bstream
new PrintWriter(cstream)
}
diff --git a/core/src/main/scala/org/apache/spark/util/SizeEstimator.scala b/core/src/main/scala/org/apache/spark/util/SizeEstimator.scala
index b955612ca7..0846557530 100644
--- a/core/src/main/scala/org/apache/spark/util/SizeEstimator.scala
+++ b/core/src/main/scala/org/apache/spark/util/SizeEstimator.scala
@@ -27,9 +27,8 @@ import java.util.concurrent.ConcurrentHashMap
import scala.collection.mutable.ArrayBuffer
-import it.unimi.dsi.fastutil.ints.IntOpenHashSet
-
import org.apache.spark.Logging
+import org.apache.spark.util.collection.OpenHashSet
/**
* Estimates the sizes of Java objects (number of bytes of memory they occupy), for use in
@@ -207,7 +206,7 @@ private[spark] object SizeEstimator extends Logging {
// Estimate the size of a large array by sampling elements without replacement.
var size = 0.0
val rand = new Random(42)
- val drawn = new IntOpenHashSet(ARRAY_SAMPLE_SIZE)
+ val drawn = new OpenHashSet[Int](ARRAY_SAMPLE_SIZE)
for (i <- 0 until ARRAY_SAMPLE_SIZE) {
var index = 0
do {
diff --git a/core/src/main/scala/org/apache/spark/util/collection/AppendOnlyMap.scala b/core/src/main/scala/org/apache/spark/util/collection/AppendOnlyMap.scala
index 025492b177..ad38250ad3 100644
--- a/core/src/main/scala/org/apache/spark/util/collection/AppendOnlyMap.scala
+++ b/core/src/main/scala/org/apache/spark/util/collection/AppendOnlyMap.scala
@@ -19,6 +19,8 @@ package org.apache.spark.util.collection
import java.util.{Arrays, Comparator}
+import com.google.common.hash.Hashing
+
import org.apache.spark.annotation.DeveloperApi
/**
@@ -199,11 +201,8 @@ class AppendOnlyMap[K, V](initialCapacity: Int = 64)
/**
* Re-hash a value to deal better with hash functions that don't differ in the lower bits.
- * We use the Murmur Hash 3 finalization step that's also used in fastutil.
*/
- private def rehash(h: Int): Int = {
- it.unimi.dsi.fastutil.HashCommon.murmurHash3(h)
- }
+ private def rehash(h: Int): Int = Hashing.murmur3_32().hashInt(h).asInt()
/** Double the table's size and re-hash everything */
protected def growTable() {
diff --git a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
index dd01ae821f..d615767284 100644
--- a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
+++ b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
@@ -17,14 +17,13 @@
package org.apache.spark.util.collection
-import java.io._
+import java.io.{InputStream, BufferedInputStream, FileInputStream, File, Serializable, EOFException}
import java.util.Comparator
import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer
import com.google.common.io.ByteStreams
-import it.unimi.dsi.fastutil.io.FastBufferedInputStream
import org.apache.spark.{Logging, SparkEnv}
import org.apache.spark.annotation.DeveloperApi
@@ -350,7 +349,7 @@ class ExternalAppendOnlyMap[K, V, C](
private class DiskMapIterator(file: File, blockId: BlockId, batchSizes: ArrayBuffer[Long])
extends Iterator[(K, C)] {
private val fileStream = new FileInputStream(file)
- private val bufferedStream = new FastBufferedInputStream(fileStream, fileBufferSize)
+ private val bufferedStream = new BufferedInputStream(fileStream, fileBufferSize)
// An intermediate stream that reads from exactly one batch
// This guards against pre-fetching and other arbitrary behavior of higher level streams
diff --git a/core/src/main/scala/org/apache/spark/util/collection/OpenHashMap.scala b/core/src/main/scala/org/apache/spark/util/collection/OpenHashMap.scala
index 62f99f3981..b8de4ff9aa 100644
--- a/core/src/main/scala/org/apache/spark/util/collection/OpenHashMap.scala
+++ b/core/src/main/scala/org/apache/spark/util/collection/OpenHashMap.scala
@@ -30,7 +30,8 @@ import org.apache.spark.annotation.DeveloperApi
* Under the hood, it uses our OpenHashSet implementation.
*/
@DeveloperApi
-class OpenHashMap[K >: Null : ClassTag, @specialized(Long, Int, Double) V: ClassTag](
+private[spark]
+class OpenHashMap[K : ClassTag, @specialized(Long, Int, Double) V: ClassTag](
initialCapacity: Int)
extends Iterable[(K, V)]
with Serializable {
diff --git a/core/src/main/scala/org/apache/spark/util/collection/OpenHashSet.scala b/core/src/main/scala/org/apache/spark/util/collection/OpenHashSet.scala
index 148c12e64d..19af4f8cbe 100644
--- a/core/src/main/scala/org/apache/spark/util/collection/OpenHashSet.scala
+++ b/core/src/main/scala/org/apache/spark/util/collection/OpenHashSet.scala
@@ -18,6 +18,7 @@
package org.apache.spark.util.collection
import scala.reflect._
+import com.google.common.hash.Hashing
/**
* A simple, fast hash set optimized for non-null insertion-only use case, where keys are never
@@ -256,9 +257,8 @@ class OpenHashSet[@specialized(Long, Int) T: ClassTag](
/**
* Re-hash a value to deal better with hash functions that don't differ in the lower bits.
- * We use the Murmur Hash 3 finalization step that's also used in fastutil.
*/
- private def hashcode(h: Int): Int = it.unimi.dsi.fastutil.HashCommon.murmurHash3(h)
+ private def hashcode(h: Int): Int = Hashing.murmur3_32().hashInt(h).asInt()
private def nextPowerOf2(n: Int): Int = {
val highBit = Integer.highestOneBit(n)
diff --git a/pom.xml b/pom.xml
index c03bb35c99..5f66cbe768 100644
--- a/pom.xml
+++ b/pom.xml
@@ -349,11 +349,6 @@
</exclusions>
</dependency>
<dependency>
- <groupId>it.unimi.dsi</groupId>
- <artifactId>fastutil</artifactId>
- <version>6.4.4</version>
- </dependency>
- <dependency>
<groupId>colt</groupId>
<artifactId>colt</artifactId>
<version>1.2.0</version>
diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala
index 21163760e6..a6058bba3d 100644
--- a/project/SparkBuild.scala
+++ b/project/SparkBuild.scala
@@ -331,7 +331,6 @@ object SparkBuild extends Build {
"org.spark-project.akka" %% "akka-slf4j" % akkaVersion excludeAll(excludeNetty),
"org.spark-project.akka" %% "akka-testkit" % akkaVersion % "test",
"org.json4s" %% "json4s-jackson" % "3.2.6" excludeAll(excludeScalap),
- "it.unimi.dsi" % "fastutil" % "6.4.4",
"colt" % "colt" % "1.2.0",
"org.apache.mesos" % "mesos" % "0.13.0",
"commons-net" % "commons-net" % "2.2",
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/util/RawTextHelper.scala b/streaming/src/main/scala/org/apache/spark/streaming/util/RawTextHelper.scala
index bd1df55cf7..bbf57ef927 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/util/RawTextHelper.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/util/RawTextHelper.scala
@@ -19,18 +19,17 @@ package org.apache.spark.streaming.util
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
-import it.unimi.dsi.fastutil.objects.{Object2LongOpenHashMap => OLMap}
+import org.apache.spark.util.collection.OpenHashMap
import scala.collection.JavaConversions.mapAsScalaMap
private[streaming]
object RawTextHelper {
- /**
- * Splits lines and counts the words in them using specialized object-to-long hashmap
- * (to avoid boxing-unboxing overhead of Long in java/scala HashMap)
+ /**
+ * Splits lines and counts the words.
*/
def splitAndCountPartitions(iter: Iterator[String]): Iterator[(String, Long)] = {
- val map = new OLMap[String]
+ val map = new OpenHashMap[String,Long]
var i = 0
var j = 0
while (iter.hasNext) {
@@ -43,14 +42,16 @@ object RawTextHelper {
}
if (j > i) {
val w = s.substring(i, j)
- val c = map.getLong(w)
- map.put(w, c + 1)
+ map.changeValue(w, 1L, _ + 1L)
}
i = j
while (i < s.length && s.charAt(i) == ' ') {
i += 1
}
}
+ map.toIterator.map {
+ case (k, v) => (k, v)
+ }
}
map.toIterator.map{case (k, v) => (k, v)}
}
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/util/RawTextSender.scala b/streaming/src/main/scala/org/apache/spark/streaming/util/RawTextSender.scala
index 684b38e8b3..a7850812bd 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/util/RawTextSender.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/util/RawTextSender.scala
@@ -17,14 +17,12 @@
package org.apache.spark.streaming.util
-import java.io.IOException
+import java.io.{ByteArrayOutputStream, IOException}
import java.net.ServerSocket
import java.nio.ByteBuffer
import scala.io.Source
-import it.unimi.dsi.fastutil.io.FastByteArrayOutputStream
-
import org.apache.spark.{SparkConf, Logging}
import org.apache.spark.serializer.KryoSerializer
import org.apache.spark.util.IntParam
@@ -45,16 +43,15 @@ object RawTextSender extends Logging {
// Repeat the input data multiple times to fill in a buffer
val lines = Source.fromFile(file).getLines().toArray
- val bufferStream = new FastByteArrayOutputStream(blockSize + 1000)
+ val bufferStream = new ByteArrayOutputStream(blockSize + 1000)
val ser = new KryoSerializer(new SparkConf()).newInstance()
val serStream = ser.serializeStream(bufferStream)
var i = 0
- while (bufferStream.position < blockSize) {
+ while (bufferStream.size < blockSize) {
serStream.writeObject(lines(i))
i = (i + 1) % lines.length
}
- bufferStream.trim()
- val array = bufferStream.array
+ val array = bufferStream.toByteArray
val countBuf = ByteBuffer.wrap(new Array[Byte](4))
countBuf.putInt(array.length)