From 165e06a74c3d75e6b7341c120943add8b035b96a Mon Sep 17 00:00:00 2001 From: Sean Owen Date: Fri, 11 Apr 2014 22:46:47 -0700 Subject: SPARK-1057 (alternative) Remove fastutil (This is for discussion at this point -- I'm not suggesting this should be committed.) This is what removing fastutil looks like. Much of it is straightforward, like using `java.io` buffered stream classes, and Guava for murmurhash3. Uses of the `FastByteArrayOutputStream` were a little trickier. In only one case though do I think the change to use `java.io` actually entails an extra array copy. The rest is using `OpenHashMap` and `OpenHashSet`. These are now written in terms of more scala-like operations. `OpenHashMap` is where I made three non-trivial changes to make it work, and they need review: - It is no longer private - The key must be a `ClassTag` - Unless a lot of other code changes, the key type can't enforce being a supertype of `Null` It all works and tests pass, and I think there is reason to believe it's OK from a speed perspective. But what about those last changes? Author: Sean Owen Closes #266 from srowen/SPARK-1057-alternate and squashes the following commits: 2601129 [Sean Owen] Fix Map return type error not previously caught ec65502 [Sean Owen] Updates from matei's review 00bc81e [Sean Owen] Remove use of fastutil and replace with use of java.io, spark.util and Guava classes --- core/pom.xml | 4 --- .../org/apache/spark/broadcast/HttpBroadcast.scala | 9 +++--- .../spark/partial/GroupedCountEvaluator.scala | 32 +++++++++----------- core/src/main/scala/org/apache/spark/rdd/RDD.scala | 34 ++++++++++------------ .../apache/spark/scheduler/ReplayListenerBus.scala | 5 ++-- .../scala/org/apache/spark/scheduler/Task.scala | 9 ++---- .../org/apache/spark/serializer/Serializer.scala | 9 ++---- .../org/apache/spark/storage/BlockManager.scala | 10 +++---- .../apache/spark/storage/BlockObjectWriter.scala | 6 ++-- .../scala/org/apache/spark/util/FileLogger.scala | 5 ++-- .../org/apache/spark/util/SizeEstimator.scala | 5 ++-- .../spark/util/collection/AppendOnlyMap.scala | 7 ++--- .../util/collection/ExternalAppendOnlyMap.scala | 5 ++-- .../apache/spark/util/collection/OpenHashMap.scala | 3 +- .../apache/spark/util/collection/OpenHashSet.scala | 4 +-- pom.xml | 5 ---- project/SparkBuild.scala | 1 - .../spark/streaming/util/RawTextHelper.scala | 15 +++++----- .../spark/streaming/util/RawTextSender.scala | 11 +++---- 19 files changed, 72 insertions(+), 107 deletions(-) diff --git a/core/pom.xml b/core/pom.xml index 1f80838081..a1bdd8ec68 100644 --- a/core/pom.xml +++ b/core/pom.xml @@ -157,10 +157,6 @@ - - it.unimi.dsi - fastutil - colt colt diff --git a/core/src/main/scala/org/apache/spark/broadcast/HttpBroadcast.scala b/core/src/main/scala/org/apache/spark/broadcast/HttpBroadcast.scala index f6a8a8af91..29372f16f2 100644 --- a/core/src/main/scala/org/apache/spark/broadcast/HttpBroadcast.scala +++ b/core/src/main/scala/org/apache/spark/broadcast/HttpBroadcast.scala @@ -18,11 +18,10 @@ package org.apache.spark.broadcast import java.io.{File, FileOutputStream, ObjectInputStream, ObjectOutputStream, OutputStream} -import java.net.{URI, URL, URLConnection} +import java.io.{BufferedInputStream, BufferedOutputStream} +import java.net.{URL, URLConnection, URI} import java.util.concurrent.TimeUnit -import it.unimi.dsi.fastutil.io.{FastBufferedInputStream, FastBufferedOutputStream} - import org.apache.spark.{HttpServer, Logging, SecurityManager, SparkConf, SparkEnv} import org.apache.spark.io.CompressionCodec import 
org.apache.spark.storage.{BroadcastBlockId, StorageLevel} @@ -164,7 +163,7 @@ private[spark] object HttpBroadcast extends Logging { if (compress) { compressionCodec.compressedOutputStream(new FileOutputStream(file)) } else { - new FastBufferedOutputStream(new FileOutputStream(file), bufferSize) + new BufferedOutputStream(new FileOutputStream(file), bufferSize) } } val ser = SparkEnv.get.serializer.newInstance() @@ -195,7 +194,7 @@ private[spark] object HttpBroadcast extends Logging { if (compress) { compressionCodec.compressedInputStream(inputStream) } else { - new FastBufferedInputStream(inputStream, bufferSize) + new BufferedInputStream(inputStream, bufferSize) } } val ser = SparkEnv.get.serializer.newInstance() diff --git a/core/src/main/scala/org/apache/spark/partial/GroupedCountEvaluator.scala b/core/src/main/scala/org/apache/spark/partial/GroupedCountEvaluator.scala index 40b70baabc..8bb78123e3 100644 --- a/core/src/main/scala/org/apache/spark/partial/GroupedCountEvaluator.scala +++ b/core/src/main/scala/org/apache/spark/partial/GroupedCountEvaluator.scala @@ -22,36 +22,33 @@ import java.util.{HashMap => JHashMap} import scala.collection.JavaConversions.mapAsScalaMap import scala.collection.Map import scala.collection.mutable.HashMap +import scala.reflect.ClassTag import cern.jet.stat.Probability -import it.unimi.dsi.fastutil.objects.{Object2LongOpenHashMap => OLMap} + +import org.apache.spark.util.collection.OpenHashMap /** * An ApproximateEvaluator for counts by key. Returns a map of key to confidence interval. */ -private[spark] class GroupedCountEvaluator[T](totalOutputs: Int, confidence: Double) - extends ApproximateEvaluator[OLMap[T], Map[T, BoundedDouble]] { +private[spark] class GroupedCountEvaluator[T : ClassTag](totalOutputs: Int, confidence: Double) + extends ApproximateEvaluator[OpenHashMap[T,Long], Map[T, BoundedDouble]] { var outputsMerged = 0 - var sums = new OLMap[T] // Sum of counts for each key + var sums = new OpenHashMap[T,Long]() // Sum of counts for each key - override def merge(outputId: Int, taskResult: OLMap[T]) { + override def merge(outputId: Int, taskResult: OpenHashMap[T,Long]) { outputsMerged += 1 - val iter = taskResult.object2LongEntrySet.fastIterator() - while (iter.hasNext) { - val entry = iter.next() - sums.put(entry.getKey, sums.getLong(entry.getKey) + entry.getLongValue) + taskResult.foreach { case (key, value) => + sums.changeValue(key, value, _ + value) } } override def currentResult(): Map[T, BoundedDouble] = { if (outputsMerged == totalOutputs) { val result = new JHashMap[T, BoundedDouble](sums.size) - val iter = sums.object2LongEntrySet.fastIterator() - while (iter.hasNext) { - val entry = iter.next() - val sum = entry.getLongValue() - result(entry.getKey) = new BoundedDouble(sum, 1.0, sum, sum) + sums.foreach { case (key, sum) => + result(key) = new BoundedDouble(sum, 1.0, sum, sum) } result } else if (outputsMerged == 0) { @@ -60,16 +57,13 @@ private[spark] class GroupedCountEvaluator[T](totalOutputs: Int, confidence: Dou val p = outputsMerged.toDouble / totalOutputs val confFactor = Probability.normalInverse(1 - (1 - confidence) / 2) val result = new JHashMap[T, BoundedDouble](sums.size) - val iter = sums.object2LongEntrySet.fastIterator() - while (iter.hasNext) { - val entry = iter.next() - val sum = entry.getLongValue + sums.foreach { case (key, sum) => val mean = (sum + 1 - p) / p val variance = (sum + 1) * (1 - p) / (p * p) val stdev = math.sqrt(variance) val low = mean - confFactor * stdev val high = mean + confFactor * stdev - 
result(entry.getKey) = new BoundedDouble(mean, confidence, low, high) + result(key) = new BoundedDouble(mean, confidence, low, high) } result } diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala index 3437b2cac1..891efccf23 100644 --- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala @@ -20,12 +20,10 @@ package org.apache.spark.rdd import java.util.Random import scala.collection.Map -import scala.collection.JavaConversions.mapAsScalaMap import scala.collection.mutable.ArrayBuffer import scala.reflect.{classTag, ClassTag} import com.clearspring.analytics.stream.cardinality.HyperLogLog -import it.unimi.dsi.fastutil.objects.{Object2LongOpenHashMap => OLMap} import org.apache.hadoop.io.BytesWritable import org.apache.hadoop.io.compress.CompressionCodec import org.apache.hadoop.io.NullWritable @@ -43,6 +41,7 @@ import org.apache.spark.partial.GroupedCountEvaluator import org.apache.spark.partial.PartialResult import org.apache.spark.storage.StorageLevel import org.apache.spark.util.{BoundedPriorityQueue, SerializableHyperLogLog, Utils} +import org.apache.spark.util.collection.OpenHashMap import org.apache.spark.util.random.{BernoulliSampler, PoissonSampler} /** @@ -834,24 +833,24 @@ abstract class RDD[T: ClassTag]( throw new SparkException("countByValue() does not support arrays") } // TODO: This should perhaps be distributed by default. - def countPartition(iter: Iterator[T]): Iterator[OLMap[T]] = { - val map = new OLMap[T] - while (iter.hasNext) { - val v = iter.next() - map.put(v, map.getLong(v) + 1L) + def countPartition(iter: Iterator[T]): Iterator[OpenHashMap[T,Long]] = { + val map = new OpenHashMap[T,Long] + iter.foreach { + t => map.changeValue(t, 1L, _ + 1L) } Iterator(map) } - def mergeMaps(m1: OLMap[T], m2: OLMap[T]): OLMap[T] = { - val iter = m2.object2LongEntrySet.fastIterator() - while (iter.hasNext) { - val entry = iter.next() - m1.put(entry.getKey, m1.getLong(entry.getKey) + entry.getLongValue) + def mergeMaps(m1: OpenHashMap[T,Long], m2: OpenHashMap[T,Long]): OpenHashMap[T,Long] = { + m2.foreach { case (key, value) => + m1.changeValue(key, value, _ + value) } m1 } val myResult = mapPartitions(countPartition).reduce(mergeMaps) - myResult.asInstanceOf[java.util.Map[T, Long]] // Will be wrapped as a Scala mutable Map + // Convert to a Scala mutable map + val mutableResult = scala.collection.mutable.Map[T,Long]() + myResult.foreach { case (k, v) => mutableResult.put(k, v) } + mutableResult } /** @@ -866,11 +865,10 @@ abstract class RDD[T: ClassTag]( if (elementClassTag.runtimeClass.isArray) { throw new SparkException("countByValueApprox() does not support arrays") } - val countPartition: (TaskContext, Iterator[T]) => OLMap[T] = { (ctx, iter) => - val map = new OLMap[T] - while (iter.hasNext) { - val v = iter.next() - map.put(v, map.getLong(v) + 1L) + val countPartition: (TaskContext, Iterator[T]) => OpenHashMap[T,Long] = { (ctx, iter) => + val map = new OpenHashMap[T,Long] + iter.foreach { + t => map.changeValue(t, 1L, _ + 1L) } map } diff --git a/core/src/main/scala/org/apache/spark/scheduler/ReplayListenerBus.scala b/core/src/main/scala/org/apache/spark/scheduler/ReplayListenerBus.scala index b03665fd56..f868e772cf 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/ReplayListenerBus.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/ReplayListenerBus.scala @@ -17,11 +17,10 @@ package org.apache.spark.scheduler -import java.io.InputStream 
+import java.io.{BufferedInputStream, InputStream} import scala.io.Source -import it.unimi.dsi.fastutil.io.FastBufferedInputStream import org.apache.hadoop.fs.{Path, FileSystem} import org.json4s.jackson.JsonMethods._ @@ -62,7 +61,7 @@ private[spark] class ReplayListenerBus( var currentLine = "" try { fileStream = Some(fileSystem.open(path)) - bufferedStream = Some(new FastBufferedInputStream(fileStream.get)) + bufferedStream = Some(new BufferedInputStream(fileStream.get)) compressStream = Some(wrapForCompression(bufferedStream.get)) // Parse each line as an event and post the event to all attached listeners diff --git a/core/src/main/scala/org/apache/spark/scheduler/Task.scala b/core/src/main/scala/org/apache/spark/scheduler/Task.scala index b85b4a50cd..a8bcb7dfe2 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/Task.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/Task.scala @@ -17,13 +17,11 @@ package org.apache.spark.scheduler -import java.io.{DataInputStream, DataOutputStream} +import java.io.{ByteArrayOutputStream, DataInputStream, DataOutputStream} import java.nio.ByteBuffer import scala.collection.mutable.HashMap -import it.unimi.dsi.fastutil.io.FastByteArrayOutputStream - import org.apache.spark.TaskContext import org.apache.spark.executor.TaskMetrics import org.apache.spark.serializer.SerializerInstance @@ -104,7 +102,7 @@ private[spark] object Task { serializer: SerializerInstance) : ByteBuffer = { - val out = new FastByteArrayOutputStream(4096) + val out = new ByteArrayOutputStream(4096) val dataOut = new DataOutputStream(out) // Write currentFiles @@ -125,8 +123,7 @@ private[spark] object Task { dataOut.flush() val taskBytes = serializer.serialize(task).array() out.write(taskBytes) - out.trim() - ByteBuffer.wrap(out.array) + ByteBuffer.wrap(out.toByteArray) } /** diff --git a/core/src/main/scala/org/apache/spark/serializer/Serializer.scala b/core/src/main/scala/org/apache/spark/serializer/Serializer.scala index 9f04dc6e42..f2c8f9b621 100644 --- a/core/src/main/scala/org/apache/spark/serializer/Serializer.scala +++ b/core/src/main/scala/org/apache/spark/serializer/Serializer.scala @@ -17,11 +17,9 @@ package org.apache.spark.serializer -import java.io.{EOFException, InputStream, OutputStream} +import java.io.{ByteArrayOutputStream, EOFException, InputStream, OutputStream} import java.nio.ByteBuffer -import it.unimi.dsi.fastutil.io.FastByteArrayOutputStream - import org.apache.spark.SparkEnv import org.apache.spark.annotation.DeveloperApi import org.apache.spark.util.{ByteBufferInputStream, NextIterator} @@ -73,10 +71,9 @@ trait SerializerInstance { def serializeMany[T](iterator: Iterator[T]): ByteBuffer = { // Default implementation uses serializeStream - val stream = new FastByteArrayOutputStream() + val stream = new ByteArrayOutputStream() serializeStream(stream).writeAll(iterator) - val buffer = ByteBuffer.allocate(stream.position.toInt) - buffer.put(stream.array, 0, stream.position.toInt) + val buffer = ByteBuffer.wrap(stream.toByteArray) buffer.flip() buffer } diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala index df9bb4044e..f14017051f 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala @@ -17,7 +17,7 @@ package org.apache.spark.storage -import java.io.{File, InputStream, OutputStream} +import java.io.{File, InputStream, OutputStream, BufferedOutputStream, 
ByteArrayOutputStream} import java.nio.{ByteBuffer, MappedByteBuffer} import scala.collection.mutable.{ArrayBuffer, HashMap} @@ -26,7 +26,6 @@ import scala.concurrent.duration._ import scala.util.Random import akka.actor.{ActorSystem, Cancellable, Props} -import it.unimi.dsi.fastutil.io.{FastBufferedOutputStream, FastByteArrayOutputStream} import sun.nio.ch.DirectBuffer import org.apache.spark.{Logging, MapOutputTracker, SecurityManager, SparkConf, SparkEnv, SparkException} @@ -992,7 +991,7 @@ private[spark] class BlockManager( outputStream: OutputStream, values: Iterator[Any], serializer: Serializer = defaultSerializer) { - val byteStream = new FastBufferedOutputStream(outputStream) + val byteStream = new BufferedOutputStream(outputStream) val ser = serializer.newInstance() ser.serializeStream(wrapForCompression(blockId, byteStream)).writeAll(values).close() } @@ -1002,10 +1001,9 @@ private[spark] class BlockManager( blockId: BlockId, values: Iterator[Any], serializer: Serializer = defaultSerializer): ByteBuffer = { - val byteStream = new FastByteArrayOutputStream(4096) + val byteStream = new ByteArrayOutputStream(4096) dataSerializeStream(blockId, byteStream, values, serializer) - byteStream.trim() - ByteBuffer.wrap(byteStream.array) + ByteBuffer.wrap(byteStream.toByteArray) } /** diff --git a/core/src/main/scala/org/apache/spark/storage/BlockObjectWriter.scala b/core/src/main/scala/org/apache/spark/storage/BlockObjectWriter.scala index 696b930a26..a2687e6be4 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockObjectWriter.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockObjectWriter.scala @@ -17,11 +17,9 @@ package org.apache.spark.storage -import java.io.{FileOutputStream, File, OutputStream} +import java.io.{BufferedOutputStream, FileOutputStream, File, OutputStream} import java.nio.channels.FileChannel -import it.unimi.dsi.fastutil.io.FastBufferedOutputStream - import org.apache.spark.Logging import org.apache.spark.serializer.{SerializationStream, Serializer} @@ -119,7 +117,7 @@ private[spark] class DiskBlockObjectWriter( ts = new TimeTrackingOutputStream(fos) channel = fos.getChannel() lastValidPosition = initialPosition - bs = compressStream(new FastBufferedOutputStream(ts, bufferSize)) + bs = compressStream(new BufferedOutputStream(ts, bufferSize)) objOut = serializer.newInstance().serializeStream(bs) initialized = true this diff --git a/core/src/main/scala/org/apache/spark/util/FileLogger.scala b/core/src/main/scala/org/apache/spark/util/FileLogger.scala index 0080a8b342..68a12e8ed6 100644 --- a/core/src/main/scala/org/apache/spark/util/FileLogger.scala +++ b/core/src/main/scala/org/apache/spark/util/FileLogger.scala @@ -17,12 +17,11 @@ package org.apache.spark.util -import java.io._ +import java.io.{FileOutputStream, BufferedOutputStream, PrintWriter, IOException} import java.net.URI import java.text.SimpleDateFormat import java.util.Date -import it.unimi.dsi.fastutil.io.FastBufferedOutputStream import org.apache.hadoop.fs.{FSDataOutputStream, Path} import org.apache.spark.{Logging, SparkConf} @@ -100,7 +99,7 @@ private[spark] class FileLogger( hadoopDataStream.get } - val bstream = new FastBufferedOutputStream(dstream, outputBufferSize) + val bstream = new BufferedOutputStream(dstream, outputBufferSize) val cstream = if (compress) compressionCodec.compressedOutputStream(bstream) else bstream new PrintWriter(cstream) } diff --git a/core/src/main/scala/org/apache/spark/util/SizeEstimator.scala 
b/core/src/main/scala/org/apache/spark/util/SizeEstimator.scala index b955612ca7..0846557530 100644 --- a/core/src/main/scala/org/apache/spark/util/SizeEstimator.scala +++ b/core/src/main/scala/org/apache/spark/util/SizeEstimator.scala @@ -27,9 +27,8 @@ import java.util.concurrent.ConcurrentHashMap import scala.collection.mutable.ArrayBuffer -import it.unimi.dsi.fastutil.ints.IntOpenHashSet - import org.apache.spark.Logging +import org.apache.spark.util.collection.OpenHashSet /** * Estimates the sizes of Java objects (number of bytes of memory they occupy), for use in @@ -207,7 +206,7 @@ private[spark] object SizeEstimator extends Logging { // Estimate the size of a large array by sampling elements without replacement. var size = 0.0 val rand = new Random(42) - val drawn = new IntOpenHashSet(ARRAY_SAMPLE_SIZE) + val drawn = new OpenHashSet[Int](ARRAY_SAMPLE_SIZE) for (i <- 0 until ARRAY_SAMPLE_SIZE) { var index = 0 do { diff --git a/core/src/main/scala/org/apache/spark/util/collection/AppendOnlyMap.scala b/core/src/main/scala/org/apache/spark/util/collection/AppendOnlyMap.scala index 025492b177..ad38250ad3 100644 --- a/core/src/main/scala/org/apache/spark/util/collection/AppendOnlyMap.scala +++ b/core/src/main/scala/org/apache/spark/util/collection/AppendOnlyMap.scala @@ -19,6 +19,8 @@ package org.apache.spark.util.collection import java.util.{Arrays, Comparator} +import com.google.common.hash.Hashing + import org.apache.spark.annotation.DeveloperApi /** @@ -199,11 +201,8 @@ class AppendOnlyMap[K, V](initialCapacity: Int = 64) /** * Re-hash a value to deal better with hash functions that don't differ in the lower bits. - * We use the Murmur Hash 3 finalization step that's also used in fastutil. */ - private def rehash(h: Int): Int = { - it.unimi.dsi.fastutil.HashCommon.murmurHash3(h) - } + private def rehash(h: Int): Int = Hashing.murmur3_32().hashInt(h).asInt() /** Double the table's size and re-hash everything */ protected def growTable() { diff --git a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala index dd01ae821f..d615767284 100644 --- a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala +++ b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala @@ -17,14 +17,13 @@ package org.apache.spark.util.collection -import java.io._ +import java.io.{InputStream, BufferedInputStream, FileInputStream, File, Serializable, EOFException} import java.util.Comparator import scala.collection.mutable import scala.collection.mutable.ArrayBuffer import com.google.common.io.ByteStreams -import it.unimi.dsi.fastutil.io.FastBufferedInputStream import org.apache.spark.{Logging, SparkEnv} import org.apache.spark.annotation.DeveloperApi @@ -350,7 +349,7 @@ class ExternalAppendOnlyMap[K, V, C]( private class DiskMapIterator(file: File, blockId: BlockId, batchSizes: ArrayBuffer[Long]) extends Iterator[(K, C)] { private val fileStream = new FileInputStream(file) - private val bufferedStream = new FastBufferedInputStream(fileStream, fileBufferSize) + private val bufferedStream = new BufferedInputStream(fileStream, fileBufferSize) // An intermediate stream that reads from exactly one batch // This guards against pre-fetching and other arbitrary behavior of higher level streams diff --git a/core/src/main/scala/org/apache/spark/util/collection/OpenHashMap.scala b/core/src/main/scala/org/apache/spark/util/collection/OpenHashMap.scala index 
62f99f3981..b8de4ff9aa 100644 --- a/core/src/main/scala/org/apache/spark/util/collection/OpenHashMap.scala +++ b/core/src/main/scala/org/apache/spark/util/collection/OpenHashMap.scala @@ -30,7 +30,8 @@ import org.apache.spark.annotation.DeveloperApi * Under the hood, it uses our OpenHashSet implementation. */ @DeveloperApi -class OpenHashMap[K >: Null : ClassTag, @specialized(Long, Int, Double) V: ClassTag]( +private[spark] +class OpenHashMap[K : ClassTag, @specialized(Long, Int, Double) V: ClassTag]( initialCapacity: Int) extends Iterable[(K, V)] with Serializable { diff --git a/core/src/main/scala/org/apache/spark/util/collection/OpenHashSet.scala b/core/src/main/scala/org/apache/spark/util/collection/OpenHashSet.scala index 148c12e64d..19af4f8cbe 100644 --- a/core/src/main/scala/org/apache/spark/util/collection/OpenHashSet.scala +++ b/core/src/main/scala/org/apache/spark/util/collection/OpenHashSet.scala @@ -18,6 +18,7 @@ package org.apache.spark.util.collection import scala.reflect._ +import com.google.common.hash.Hashing /** * A simple, fast hash set optimized for non-null insertion-only use case, where keys are never @@ -256,9 +257,8 @@ class OpenHashSet[@specialized(Long, Int) T: ClassTag]( /** * Re-hash a value to deal better with hash functions that don't differ in the lower bits. - * We use the Murmur Hash 3 finalization step that's also used in fastutil. */ - private def hashcode(h: Int): Int = it.unimi.dsi.fastutil.HashCommon.murmurHash3(h) + private def hashcode(h: Int): Int = Hashing.murmur3_32().hashInt(h).asInt() private def nextPowerOf2(n: Int): Int = { val highBit = Integer.highestOneBit(n) diff --git a/pom.xml b/pom.xml index c03bb35c99..5f66cbe768 100644 --- a/pom.xml +++ b/pom.xml @@ -348,11 +348,6 @@ - - it.unimi.dsi - fastutil - 6.4.4 - colt colt diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala index 21163760e6..a6058bba3d 100644 --- a/project/SparkBuild.scala +++ b/project/SparkBuild.scala @@ -331,7 +331,6 @@ object SparkBuild extends Build { "org.spark-project.akka" %% "akka-slf4j" % akkaVersion excludeAll(excludeNetty), "org.spark-project.akka" %% "akka-testkit" % akkaVersion % "test", "org.json4s" %% "json4s-jackson" % "3.2.6" excludeAll(excludeScalap), - "it.unimi.dsi" % "fastutil" % "6.4.4", "colt" % "colt" % "1.2.0", "org.apache.mesos" % "mesos" % "0.13.0", "commons-net" % "commons-net" % "2.2", diff --git a/streaming/src/main/scala/org/apache/spark/streaming/util/RawTextHelper.scala b/streaming/src/main/scala/org/apache/spark/streaming/util/RawTextHelper.scala index bd1df55cf7..bbf57ef927 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/util/RawTextHelper.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/util/RawTextHelper.scala @@ -19,18 +19,17 @@ package org.apache.spark.streaming.util import org.apache.spark.SparkContext import org.apache.spark.SparkContext._ -import it.unimi.dsi.fastutil.objects.{Object2LongOpenHashMap => OLMap} +import org.apache.spark.util.collection.OpenHashMap import scala.collection.JavaConversions.mapAsScalaMap private[streaming] object RawTextHelper { - /** - * Splits lines and counts the words in them using specialized object-to-long hashmap - * (to avoid boxing-unboxing overhead of Long in java/scala HashMap) + /** + * Splits lines and counts the words. 
*/ def splitAndCountPartitions(iter: Iterator[String]): Iterator[(String, Long)] = { - val map = new OLMap[String] + val map = new OpenHashMap[String,Long] var i = 0 var j = 0 while (iter.hasNext) { @@ -43,14 +42,16 @@ object RawTextHelper { } if (j > i) { val w = s.substring(i, j) - val c = map.getLong(w) - map.put(w, c + 1) + map.changeValue(w, 1L, _ + 1L) } i = j while (i < s.length && s.charAt(i) == ' ') { i += 1 } } + map.toIterator.map { + case (k, v) => (k, v) + } } map.toIterator.map{case (k, v) => (k, v)} } diff --git a/streaming/src/main/scala/org/apache/spark/streaming/util/RawTextSender.scala b/streaming/src/main/scala/org/apache/spark/streaming/util/RawTextSender.scala index 684b38e8b3..a7850812bd 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/util/RawTextSender.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/util/RawTextSender.scala @@ -17,14 +17,12 @@ package org.apache.spark.streaming.util -import java.io.IOException +import java.io.{ByteArrayOutputStream, IOException} import java.net.ServerSocket import java.nio.ByteBuffer import scala.io.Source -import it.unimi.dsi.fastutil.io.FastByteArrayOutputStream - import org.apache.spark.{SparkConf, Logging} import org.apache.spark.serializer.KryoSerializer import org.apache.spark.util.IntParam @@ -45,16 +43,15 @@ object RawTextSender extends Logging { // Repeat the input data multiple times to fill in a buffer val lines = Source.fromFile(file).getLines().toArray - val bufferStream = new FastByteArrayOutputStream(blockSize + 1000) + val bufferStream = new ByteArrayOutputStream(blockSize + 1000) val ser = new KryoSerializer(new SparkConf()).newInstance() val serStream = ser.serializeStream(bufferStream) var i = 0 - while (bufferStream.position < blockSize) { + while (bufferStream.size < blockSize) { serStream.writeObject(lines(i)) i = (i + 1) % lines.length } - bufferStream.trim() - val array = bufferStream.array + val array = bufferStream.toByteArray val countBuf = ByteBuffer.wrap(new Array[Byte](4)) countBuf.putInt(array.length) -- cgit v1.2.3
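Note (not part of the patch): two idioms recur across the files above. The fastutil `Object2LongOpenHashMap` get/put counting loop becomes a single `OpenHashMap.changeValue` call, and the fastutil `HashCommon.murmurHash3` finalizer becomes Guava's murmur3. Below is a minimal standalone sketch of both, for reference only; the object name and package placement are illustrative assumptions (the package line is there because `OpenHashMap` becomes `private[spark]` in this change), not code from the patch.

```scala
package org.apache.spark.util

import com.google.common.hash.Hashing

import org.apache.spark.util.collection.OpenHashMap

// Illustrative sketch only: OpenHashMap is private[spark] after this patch,
// so this example assumes it is compiled somewhere under org.apache.spark.
object FastutilMigrationSketch {

  // fastutil idiom: map.put(w, map.getLong(w) + 1L)
  // replacement:    one changeValue call, inserting 1L if absent, else adding 1L
  def countWords(words: Iterator[String]): OpenHashMap[String, Long] = {
    val counts = new OpenHashMap[String, Long]()
    words.foreach(w => counts.changeValue(w, 1L, _ + 1L))
    counts
  }

  // fastutil idiom: it.unimi.dsi.fastutil.HashCommon.murmurHash3(h)
  // replacement:    Guava's murmur3_32 applied to the same int
  def rehash(h: Int): Int = Hashing.murmur3_32().hashInt(h).asInt()
}
```

Since `changeValue` probes the table once (inserting the default or merging in place), it replaces the two-lookup `getLong`/`put` pair without an extra hash probe, which supports the commit message's expectation that the change is acceptable from a speed perspective.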