diff options
Diffstat (limited to 'streaming/src')
-rw-r--r-- | streaming/src/main/scala/org/apache/spark/streaming/util/RawTextHelper.scala | 15 | ||||
-rw-r--r-- | streaming/src/main/scala/org/apache/spark/streaming/util/RawTextSender.scala | 11 |
2 files changed, 12 insertions, 14 deletions
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/util/RawTextHelper.scala b/streaming/src/main/scala/org/apache/spark/streaming/util/RawTextHelper.scala index bd1df55cf7..bbf57ef927 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/util/RawTextHelper.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/util/RawTextHelper.scala @@ -19,18 +19,17 @@ package org.apache.spark.streaming.util import org.apache.spark.SparkContext import org.apache.spark.SparkContext._ -import it.unimi.dsi.fastutil.objects.{Object2LongOpenHashMap => OLMap} +import org.apache.spark.util.collection.OpenHashMap import scala.collection.JavaConversions.mapAsScalaMap private[streaming] object RawTextHelper { - /** - * Splits lines and counts the words in them using specialized object-to-long hashmap - * (to avoid boxing-unboxing overhead of Long in java/scala HashMap) + /** + * Splits lines and counts the words. */ def splitAndCountPartitions(iter: Iterator[String]): Iterator[(String, Long)] = { - val map = new OLMap[String] + val map = new OpenHashMap[String,Long] var i = 0 var j = 0 while (iter.hasNext) { @@ -43,14 +42,16 @@ object RawTextHelper { } if (j > i) { val w = s.substring(i, j) - val c = map.getLong(w) - map.put(w, c + 1) + map.changeValue(w, 1L, _ + 1L) } i = j while (i < s.length && s.charAt(i) == ' ') { i += 1 } } + map.toIterator.map { + case (k, v) => (k, v) + } } map.toIterator.map{case (k, v) => (k, v)} } diff --git a/streaming/src/main/scala/org/apache/spark/streaming/util/RawTextSender.scala b/streaming/src/main/scala/org/apache/spark/streaming/util/RawTextSender.scala index 684b38e8b3..a7850812bd 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/util/RawTextSender.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/util/RawTextSender.scala @@ -17,14 +17,12 @@ package org.apache.spark.streaming.util -import java.io.IOException +import java.io.{ByteArrayOutputStream, IOException} import java.net.ServerSocket import java.nio.ByteBuffer import scala.io.Source -import it.unimi.dsi.fastutil.io.FastByteArrayOutputStream - import org.apache.spark.{SparkConf, Logging} import org.apache.spark.serializer.KryoSerializer import org.apache.spark.util.IntParam @@ -45,16 +43,15 @@ object RawTextSender extends Logging { // Repeat the input data multiple times to fill in a buffer val lines = Source.fromFile(file).getLines().toArray - val bufferStream = new FastByteArrayOutputStream(blockSize + 1000) + val bufferStream = new ByteArrayOutputStream(blockSize + 1000) val ser = new KryoSerializer(new SparkConf()).newInstance() val serStream = ser.serializeStream(bufferStream) var i = 0 - while (bufferStream.position < blockSize) { + while (bufferStream.size < blockSize) { serStream.writeObject(lines(i)) i = (i + 1) % lines.length } - bufferStream.trim() - val array = bufferStream.array + val array = bufferStream.toByteArray val countBuf = ByteBuffer.wrap(new Array[Byte](4)) countBuf.putInt(array.length) |