authorDmitriy Lyubimov <dlyubimov@apache.org>2013-07-30 16:24:23 -0700
committerDmitriy Lyubimov <dlyubimov@apache.org>2013-07-30 16:24:23 -0700
commitef9529a9431167e47d2391d84018072f4c75f982 (patch)
tree1b71fd9cfefd7d871de5dac5a94f4024233e8da4 /core
parent43394b9a6d37bb3e4b5ebce9db2160e2a35bb279 (diff)
Refactor to use writeByteBuffer() from Utils.
Diffstat (limited to 'core')
-rw-r--r-- core/src/main/scala/spark/Utils.scala                     | 14
-rw-r--r-- core/src/main/scala/spark/rdd/ParallelCollectionRDD.scala |  8
-rw-r--r-- core/src/main/scala/spark/scheduler/TaskResult.scala      | 10
3 files changed, 17 insertions, 15 deletions
diff --git a/core/src/main/scala/spark/Utils.scala b/core/src/main/scala/spark/Utils.scala
index e6a96a5ec1..9875bc1a56 100644
--- a/core/src/main/scala/spark/Utils.scala
+++ b/core/src/main/scala/spark/Utils.scala
@@ -35,6 +35,7 @@ import org.apache.hadoop.fs.{Path, FileSystem, FileUtil}
import spark.serializer.SerializerInstance
import spark.deploy.SparkHadoopUtil
+import java.nio.ByteBuffer
/**
@@ -68,6 +69,19 @@ private object Utils extends Logging {
return ois.readObject.asInstanceOf[T]
}
+ /**
+ * Primitive often used when writing a {@link java.nio.ByteBuffer} to a {@link java.io.DataOutput}.
+ */
+ def writeByteBuffer(bb: ByteBuffer, out: ObjectOutput) = {
+ if (bb.hasArray) {
+ out.write(bb.array(), bb.arrayOffset() + bb.position(), bb.remaining())
+ } else {
+ val bbval = new Array[Byte](bb.remaining())
+ bb.get(bbval)
+ out.write(bbval)
+ }
+ }
+
def isAlpha(c: Char): Boolean = {
(c >= 'A' && c <= 'Z') || (c >= 'a' && c <= 'z')
}
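
As a quick illustration of the extracted helper, here is a minimal standalone sketch (not part of the commit; the stream names are illustrative, and since Utils is private to the spark package this would have to run from within it) that exercises both branches: an array-backed heap buffer, written straight from its backing array, and a direct buffer, copied into a temporary array first.

    import java.io.{ByteArrayOutputStream, ObjectOutputStream}
    import java.nio.ByteBuffer

    val baos = new ByteArrayOutputStream()
    val out  = new ObjectOutputStream(baos)  // ObjectOutputStream implements ObjectOutput

    // Heap buffer: hasArray is true, so the backing array is written in place.
    val heapBuf = ByteBuffer.wrap("hello".getBytes("UTF-8"))
    Utils.writeByteBuffer(heapBuf, out)

    // Direct buffer: hasArray is false, so the remaining bytes are copied out first.
    val directBuf = ByteBuffer.allocateDirect(5)
    directBuf.put("world".getBytes("UTF-8"))
    directBuf.flip()
    Utils.writeByteBuffer(directBuf, out)

    out.flush()

One subtlety: the copying branch consumes the buffer (bb.get advances its position to the limit), while the array branch leaves the position untouched, so callers should not rely on the buffer's position after the call.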
diff --git a/core/src/main/scala/spark/rdd/ParallelCollectionRDD.scala b/core/src/main/scala/spark/rdd/ParallelCollectionRDD.scala
index dfeb7be077..19f1e9b4f9 100644
--- a/core/src/main/scala/spark/rdd/ParallelCollectionRDD.scala
+++ b/core/src/main/scala/spark/rdd/ParallelCollectionRDD.scala
@@ -54,13 +54,7 @@ private[spark] class ParallelCollectionPartition[T: ClassManifest](
values.foreach(x => {
val bb = ser.serialize(x)
out.writeInt(bb.remaining())
- if (bb.hasArray) {
- out.write(bb.array(), bb.arrayOffset() + bb.position(), bb.remaining())
- } else {
- val b = new Array[Byte](bb.remaining())
- bb.get(b)
- out.write(b)
- }
+ Utils.writeByteBuffer(bb, out)
})
}
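
Both call sites frame each value as an Int length followed by the serialized bytes. For context, the matching read side would look roughly like the sketch below; readValue is an illustrative helper, not code from this commit, and it assumes the SerializerInstance.deserialize(ByteBuffer) method of this era's spark.serializer API.

    // Illustrative counterpart: read one length-prefixed value back.
    def readValue[T](in: java.io.ObjectInput, ser: spark.serializer.SerializerInstance): T = {
      val length = in.readInt()             // matches out.writeInt(bb.remaining()) above
      val bytes  = new Array[Byte](length)
      in.readFully(bytes)                   // ObjectInput extends DataInput, so readFully is available
      ser.deserialize(java.nio.ByteBuffer.wrap(bytes))
    }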
diff --git a/core/src/main/scala/spark/scheduler/TaskResult.scala b/core/src/main/scala/spark/scheduler/TaskResult.scala
index 7016ece40e..89793e0e82 100644
--- a/core/src/main/scala/spark/scheduler/TaskResult.scala
+++ b/core/src/main/scala/spark/scheduler/TaskResult.scala
@@ -21,7 +21,7 @@ import java.io._
import scala.collection.mutable.Map
import spark.executor.TaskMetrics
-import spark.SparkEnv
+import spark.{Utils, SparkEnv}
import java.nio.ByteBuffer
// Task result. Also contains updates to accumulator variables.
@@ -37,13 +37,7 @@ class TaskResult[T](var value: T, var accumUpdates: Map[Long, Any], var metrics:
val bb = objectSer.serialize(value)
out.writeInt(bb.remaining())
- if (bb.hasArray) {
- out.write(bb.array(), bb.arrayOffset() + bb.position(), bb.remaining())
- } else {
- val bbval = new Array[Byte](bb.remaining())
- bb.get(bbval)
- out.write(bbval)
- }
+ Utils.writeByteBuffer(bb, out)
out.writeInt(accumUpdates.size)
for ((key, value) <- accumUpdates) {