author    Sean Owen <sowen@cloudera.com>  2014-04-11 22:46:47 -0700
committer Patrick Wendell <pwendell@gmail.com>  2014-04-11 22:46:47 -0700
commit    165e06a74c3d75e6b7341c120943add8b035b96a (patch)
tree      407aa7d706a64eb45f7e4d02f3f6db640b130b28 /streaming
parent    aa8bb117a3ff98420ab751ba4ddbaad88ab57f9d (diff)
SPARK-1057 (alternative) Remove fastutil
(This is for discussion at this point -- I'm not suggesting this should be committed.)

This is what removing fastutil looks like. Much of it is straightforward, like using `java.io` buffered stream classes, and Guava for murmurhash3. Uses of the `FastByteArrayOutputStream` were a little trickier. In only one case though do I think the change to use `java.io` actually entails an extra array copy.

The rest is using `OpenHashMap` and `OpenHashSet`. These are now written in terms of more scala-like operations. `OpenHashMap` is where I made three non-trivial changes to make it work, and they need review:

- It is no longer private
- The key must be a `ClassTag`
- Unless a lot of other code changes, the key type can't enforce being a supertype of `Null`

It all works and tests pass, and I think there is reason to believe it's OK from a speed perspective. But what about those last changes?

Author: Sean Owen <sowen@cloudera.com>

Closes #266 from srowen/SPARK-1057-alternate and squashes the following commits:

2601129 [Sean Owen] Fix Map return type error not previously caught
ec65502 [Sean Owen] Updates from matei's review
00bc81e [Sean Owen] Remove use of fastutil and replace with use of java.io, spark.util and Guava classes
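For a concrete picture of the "straightforward" replacements, the swaps look roughly like this. This is a sketch for illustration, not lines from the patch; the fastutil names appear only in comments for contrast:

    import java.io.{BufferedInputStream, FileInputStream}
    import com.google.common.hash.Hashing

    // it.unimi.dsi.fastutil.io.FastBufferedInputStream
    //   -> plain java.io buffered streams
    val in = new BufferedInputStream(new FileInputStream("data.bin"))

    // it.unimi.dsi.fastutil.HashCommon.murmurHash3(x)
    //   -> Guava's murmur3 implementation
    val h = Hashing.murmur3_32().hashInt(42).asInt()

The `ClassTag` bound on the key exists because `OpenHashMap` keeps its keys in a plain `Array[K]`, and instantiating an `Array[K]` for an arbitrary `K` requires a `ClassTag` at the use site.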
Diffstat (limited to 'streaming')
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/util/RawTextHelper.scala | 15
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/util/RawTextSender.scala | 11
2 files changed, 12 insertions(+), 14 deletions(-)
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/util/RawTextHelper.scala b/streaming/src/main/scala/org/apache/spark/streaming/util/RawTextHelper.scala
index bd1df55cf7..bbf57ef927 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/util/RawTextHelper.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/util/RawTextHelper.scala
@@ -19,18 +19,17 @@ package org.apache.spark.streaming.util
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
-import it.unimi.dsi.fastutil.objects.{Object2LongOpenHashMap => OLMap}
+import org.apache.spark.util.collection.OpenHashMap
import scala.collection.JavaConversions.mapAsScalaMap
private[streaming]
object RawTextHelper {
-  /**
-   * Splits lines and counts the words in them using specialized object-to-long hashmap
-   * (to avoid boxing-unboxing overhead of Long in java/scala HashMap)
+  /**
+   * Splits lines and counts the words.
    */
   def splitAndCountPartitions(iter: Iterator[String]): Iterator[(String, Long)] = {
-    val map = new OLMap[String]
+    val map = new OpenHashMap[String,Long]
     var i = 0
     var j = 0
     while (iter.hasNext) {
@@ -43,14 +42,16 @@ object RawTextHelper {
         }
         if (j > i) {
           val w = s.substring(i, j)
-          val c = map.getLong(w)
-          map.put(w, c + 1)
+          map.changeValue(w, 1L, _ + 1L)
         }
         i = j
         while (i < s.length && s.charAt(i) == ' ') {
           i += 1
         }
       }
     }
-    map.toIterator.map{case (k, v) => (k, v)}
+
+    map.toIterator.map {
+      case (k, v) => (k, v)
+    }
   }
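The `changeValue` call is what replaces the fastutil get-then-put pair with a single insert-or-update operation: its second argument is the value to store when the key is absent, and its third updates an existing value. A minimal sketch of the behavior, assuming `OpenHashMap` is visible on the classpath (this commit makes it non-private):

    import org.apache.spark.util.collection.OpenHashMap

    val counts = new OpenHashMap[String, Long]
    Seq("a", "b", "a").foreach { w =>
      counts.changeValue(w, 1L, _ + 1L)  // store 1L if w is new, else increment
    }
    // counts.toIterator now yields ("a", 2L) and ("b", 1L), in some order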
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/util/RawTextSender.scala b/streaming/src/main/scala/org/apache/spark/streaming/util/RawTextSender.scala
index 684b38e8b3..a7850812bd 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/util/RawTextSender.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/util/RawTextSender.scala
@@ -17,14 +17,12 @@
package org.apache.spark.streaming.util
-import java.io.IOException
+import java.io.{ByteArrayOutputStream, IOException}
import java.net.ServerSocket
import java.nio.ByteBuffer
import scala.io.Source
-import it.unimi.dsi.fastutil.io.FastByteArrayOutputStream
-
import org.apache.spark.{SparkConf, Logging}
import org.apache.spark.serializer.KryoSerializer
import org.apache.spark.util.IntParam
@@ -45,16 +43,15 @@ object RawTextSender extends Logging {
     // Repeat the input data multiple times to fill in a buffer
     val lines = Source.fromFile(file).getLines().toArray

-    val bufferStream = new FastByteArrayOutputStream(blockSize + 1000)
+    val bufferStream = new ByteArrayOutputStream(blockSize + 1000)
     val ser = new KryoSerializer(new SparkConf()).newInstance()
     val serStream = ser.serializeStream(bufferStream)
     var i = 0
-    while (bufferStream.position < blockSize) {
+    while (bufferStream.size < blockSize) {
       serStream.writeObject(lines(i))
       i = (i + 1) % lines.length
     }
-    bufferStream.trim()
-    val array = bufferStream.array
+    val array = bufferStream.toByteArray
     val countBuf = ByteBuffer.wrap(new Array[Byte](4))
     countBuf.putInt(array.length)
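The stream-side change here entails an extra copy of exactly the kind the commit description mentions: fastutil's `FastByteArrayOutputStream` exposes its internal buffer in place (`trim()` then `.array`), while `java.io.ByteArrayOutputStream.toByteArray` returns a fresh copy. A self-contained sketch of the replacement idiom:

    import java.io.ByteArrayOutputStream

    val out = new ByteArrayOutputStream(1024)   // initial capacity, like the fastutil ctor
    while (out.size < 16) {                     // size counts bytes written, like position
      out.write("hello".getBytes("UTF-8"))
    }
    val bytes = out.toByteArray                 // copies; fastutil's .array did not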