Diffstat (limited to 'streaming/src')
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/util/RawTextHelper.scala  16
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/util/RawTextSender.scala  11
2 files changed, 12 insertions(+), 15 deletions(-)
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/util/RawTextHelper.scala b/streaming/src/main/scala/org/apache/spark/streaming/util/RawTextHelper.scala
index bd1df55cf7..bbf57ef927 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/util/RawTextHelper.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/util/RawTextHelper.scala
@@ -19,18 +19,17 @@ package org.apache.spark.streaming.util
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
-import it.unimi.dsi.fastutil.objects.{Object2LongOpenHashMap => OLMap}
+import org.apache.spark.util.collection.OpenHashMap
import scala.collection.JavaConversions.mapAsScalaMap
private[streaming]
object RawTextHelper {
- /**
- * Splits lines and counts the words in them using specialized object-to-long hashmap
- * (to avoid boxing-unboxing overhead of Long in java/scala HashMap)
+ /**
+ * Splits lines and counts the words.
*/
def splitAndCountPartitions(iter: Iterator[String]): Iterator[(String, Long)] = {
- val map = new OLMap[String]
+ val map = new OpenHashMap[String,Long]
var i = 0
var j = 0
while (iter.hasNext) {
@@ -43,14 +42,15 @@ object RawTextHelper {
}
if (j > i) {
val w = s.substring(i, j)
- val c = map.getLong(w)
- map.put(w, c + 1)
+ map.changeValue(w, 1L, _ + 1L)
}
i = j
while (i < s.length && s.charAt(i) == ' ') {
i += 1
}
}
    }
-    map.toIterator.map{case (k, v) => (k, v)}
+    map.toIterator.map {
+      case (k, v) => (k, v)
+    }
  }
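The change above swaps fastutil's Object2LongOpenHashMap for Spark's own
org.apache.spark.util.collection.OpenHashMap, whose changeValue method folds
the old getLong/put pair into a single update. A minimal sketch of the
counting idiom (not part of the patch; note OpenHashMap is private[spark],
so this only compiles from inside an org.apache.spark package):

    import org.apache.spark.util.collection.OpenHashMap

    def countWords(words: Iterator[String]): Iterator[(String, Long)] = {
      val counts = new OpenHashMap[String, Long]
      while (words.hasNext) {
        // Insert 1L on the first occurrence of a key, apply _ + 1L on
        // later ones -- one hash probe, and no boxing of the Long values.
        counts.changeValue(words.next(), 1L, _ + 1L)
      }
      counts.toIterator
    }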
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/util/RawTextSender.scala b/streaming/src/main/scala/org/apache/spark/streaming/util/RawTextSender.scala
index 684b38e8b3..a7850812bd 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/util/RawTextSender.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/util/RawTextSender.scala
@@ -17,14 +17,12 @@
package org.apache.spark.streaming.util
-import java.io.IOException
+import java.io.{ByteArrayOutputStream, IOException}
import java.net.ServerSocket
import java.nio.ByteBuffer
import scala.io.Source
-import it.unimi.dsi.fastutil.io.FastByteArrayOutputStream
-
import org.apache.spark.{SparkConf, Logging}
import org.apache.spark.serializer.KryoSerializer
import org.apache.spark.util.IntParam
@@ -45,16 +43,15 @@ object RawTextSender extends Logging {
// Repeat the input data multiple times to fill in a buffer
val lines = Source.fromFile(file).getLines().toArray
- val bufferStream = new FastByteArrayOutputStream(blockSize + 1000)
+ val bufferStream = new ByteArrayOutputStream(blockSize + 1000)
val ser = new KryoSerializer(new SparkConf()).newInstance()
val serStream = ser.serializeStream(bufferStream)
var i = 0
- while (bufferStream.position < blockSize) {
+ while (bufferStream.size < blockSize) {
serStream.writeObject(lines(i))
i = (i + 1) % lines.length
}
- bufferStream.trim()
- val array = bufferStream.array
+ val array = bufferStream.toByteArray
val countBuf = ByteBuffer.wrap(new Array[Byte](4))
countBuf.putInt(array.length)
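The RawTextSender change is the same de-fastutil swap on the output side:
java.io.ByteArrayOutputStream's size() replaces position, and toByteArray()
replaces the trim()/array pair. (toByteArray copies the internal buffer,
which the fastutil version avoided by exposing its array directly; that cost
is acceptable for this test utility.) A self-contained sketch of the
fill-then-snapshot idiom, with a plain write standing in for the Kryo
serialization stream used by the real code:

    import java.io.ByteArrayOutputStream

    def fillBlock(lines: Array[String], blockSize: Int): Array[Byte] = {
      // Over-allocate slightly, as the patch does with blockSize + 1000.
      val bufferStream = new ByteArrayOutputStream(blockSize + 1000)
      var i = 0
      while (bufferStream.size < blockSize) {
        // The real code writes through ser.serializeStream(bufferStream).
        bufferStream.write((lines(i) + "\n").getBytes("UTF-8"))
        i = (i + 1) % lines.length
      }
      bufferStream.toByteArray
    }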