diff options
author | Sean Owen <sowen@cloudera.com> | 2014-04-11 22:46:47 -0700 |
---|---|---|
committer | Patrick Wendell <pwendell@gmail.com> | 2014-04-11 22:46:47 -0700 |
commit | 165e06a74c3d75e6b7341c120943add8b035b96a (patch) | |
tree | 407aa7d706a64eb45f7e4d02f3f6db640b130b28 /streaming/src | |
parent | aa8bb117a3ff98420ab751ba4ddbaad88ab57f9d (diff) | |
download | spark-165e06a74c3d75e6b7341c120943add8b035b96a.tar.gz spark-165e06a74c3d75e6b7341c120943add8b035b96a.tar.bz2 spark-165e06a74c3d75e6b7341c120943add8b035b96a.zip |
SPARK-1057 (alternative) Remove fastutil
(This is for discussion at this point -- I'm not suggesting this should be committed.)
This is what removing fastutil looks like. Much of it is straightforward, like using `java.io` buffered stream classes, and Guava for murmurhash3.
Uses of the `FastByteArrayOutputStream` were a little trickier. In only one case though do I think the change to use `java.io` actually entails an extra array copy.
The rest is using `OpenHashMap` and `OpenHashSet`. These are now written in terms of more scala-like operations.
`OpenHashMap` is where I made three non-trivial changes to make it work, and they need review:
- It is no longer private
- The key must be a `ClassTag`
- Unless a lot of other code changes, the key type can't enforce being a supertype of `Null`
It all works and tests pass, and I think there is reason to believe it's OK from a speed perspective.
But what about those last changes?
Author: Sean Owen <sowen@cloudera.com>
Closes #266 from srowen/SPARK-1057-alternate and squashes the following commits:
2601129 [Sean Owen] Fix Map return type error not previously caught
ec65502 [Sean Owen] Updates from matei's review
00bc81e [Sean Owen] Remove use of fastutil and replace with use of java.io, spark.util and Guava classes
Diffstat (limited to 'streaming/src')
-rw-r--r-- | streaming/src/main/scala/org/apache/spark/streaming/util/RawTextHelper.scala | 15 | ||||
-rw-r--r-- | streaming/src/main/scala/org/apache/spark/streaming/util/RawTextSender.scala | 11 |
2 files changed, 12 insertions, 14 deletions
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/util/RawTextHelper.scala b/streaming/src/main/scala/org/apache/spark/streaming/util/RawTextHelper.scala index bd1df55cf7..bbf57ef927 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/util/RawTextHelper.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/util/RawTextHelper.scala @@ -19,18 +19,17 @@ package org.apache.spark.streaming.util import org.apache.spark.SparkContext import org.apache.spark.SparkContext._ -import it.unimi.dsi.fastutil.objects.{Object2LongOpenHashMap => OLMap} +import org.apache.spark.util.collection.OpenHashMap import scala.collection.JavaConversions.mapAsScalaMap private[streaming] object RawTextHelper { - /** - * Splits lines and counts the words in them using specialized object-to-long hashmap - * (to avoid boxing-unboxing overhead of Long in java/scala HashMap) + /** + * Splits lines and counts the words. */ def splitAndCountPartitions(iter: Iterator[String]): Iterator[(String, Long)] = { - val map = new OLMap[String] + val map = new OpenHashMap[String,Long] var i = 0 var j = 0 while (iter.hasNext) { @@ -43,14 +42,16 @@ object RawTextHelper { } if (j > i) { val w = s.substring(i, j) - val c = map.getLong(w) - map.put(w, c + 1) + map.changeValue(w, 1L, _ + 1L) } i = j while (i < s.length && s.charAt(i) == ' ') { i += 1 } } + map.toIterator.map { + case (k, v) => (k, v) + } } map.toIterator.map{case (k, v) => (k, v)} } diff --git a/streaming/src/main/scala/org/apache/spark/streaming/util/RawTextSender.scala b/streaming/src/main/scala/org/apache/spark/streaming/util/RawTextSender.scala index 684b38e8b3..a7850812bd 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/util/RawTextSender.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/util/RawTextSender.scala @@ -17,14 +17,12 @@ package org.apache.spark.streaming.util -import java.io.IOException +import java.io.{ByteArrayOutputStream, IOException} import java.net.ServerSocket import java.nio.ByteBuffer import scala.io.Source -import it.unimi.dsi.fastutil.io.FastByteArrayOutputStream - import org.apache.spark.{SparkConf, Logging} import org.apache.spark.serializer.KryoSerializer import org.apache.spark.util.IntParam @@ -45,16 +43,15 @@ object RawTextSender extends Logging { // Repeat the input data multiple times to fill in a buffer val lines = Source.fromFile(file).getLines().toArray - val bufferStream = new FastByteArrayOutputStream(blockSize + 1000) + val bufferStream = new ByteArrayOutputStream(blockSize + 1000) val ser = new KryoSerializer(new SparkConf()).newInstance() val serStream = ser.serializeStream(bufferStream) var i = 0 - while (bufferStream.position < blockSize) { + while (bufferStream.size < blockSize) { serStream.writeObject(lines(i)) i = (i + 1) % lines.length } - bufferStream.trim() - val array = bufferStream.array + val array = bufferStream.toByteArray val countBuf = ByteBuffer.wrap(new Array[Byte](4)) countBuf.putInt(array.length) |