diff options
author | Dmitriy Lyubimov <dlyubimov@apache.org> | 2013-07-30 16:24:23 -0700 |
---|---|---|
committer | Dmitriy Lyubimov <dlyubimov@apache.org> | 2013-07-30 16:24:23 -0700 |
commit | ef9529a9431167e47d2391d84018072f4c75f982 (patch) | |
tree | 1b71fd9cfefd7d871de5dac5a94f4024233e8da4 /core | |
parent | 43394b9a6d37bb3e4b5ebce9db2160e2a35bb279 (diff) | |
download | spark-ef9529a9431167e47d2391d84018072f4c75f982.tar.gz spark-ef9529a9431167e47d2391d84018072f4c75f982.tar.bz2 spark-ef9529a9431167e47d2391d84018072f4c75f982.zip |
refactoring using writeByteBuffer() from Utils.
Diffstat (limited to 'core')
-rw-r--r-- | core/src/main/scala/spark/Utils.scala | 14 | ||||
-rw-r--r-- | core/src/main/scala/spark/rdd/ParallelCollectionRDD.scala | 8 | ||||
-rw-r--r-- | core/src/main/scala/spark/scheduler/TaskResult.scala | 10 |
3 files changed, 17 insertions, 15 deletions
diff --git a/core/src/main/scala/spark/Utils.scala b/core/src/main/scala/spark/Utils.scala index e6a96a5ec1..9875bc1a56 100644 --- a/core/src/main/scala/spark/Utils.scala +++ b/core/src/main/scala/spark/Utils.scala @@ -35,6 +35,7 @@ import org.apache.hadoop.fs.{Path, FileSystem, FileUtil} import spark.serializer.SerializerInstance import spark.deploy.SparkHadoopUtil +import java.nio.ByteBuffer /** @@ -68,6 +69,19 @@ private object Utils extends Logging { return ois.readObject.asInstanceOf[T] } + /** + * Primitive often used when writing {@link java.nio.ByteBuffer} to {@link java.io.DataOutput}. + */ + def writeByteBuffer(bb: ByteBuffer, out: ObjectOutput) = { + if (bb.hasArray) { + out.write(bb.array(), bb.arrayOffset() + bb.position(), bb.remaining()) + } else { + val bbval = new Array[Byte](bb.remaining()) + bb.get(bbval) + out.write(bbval) + } + } + def isAlpha(c: Char): Boolean = { (c >= 'A' && c <= 'Z') || (c >= 'a' && c <= 'z') } diff --git a/core/src/main/scala/spark/rdd/ParallelCollectionRDD.scala b/core/src/main/scala/spark/rdd/ParallelCollectionRDD.scala index dfeb7be077..19f1e9b4f9 100644 --- a/core/src/main/scala/spark/rdd/ParallelCollectionRDD.scala +++ b/core/src/main/scala/spark/rdd/ParallelCollectionRDD.scala @@ -54,13 +54,7 @@ private[spark] class ParallelCollectionPartition[T: ClassManifest]( values.foreach(x => { val bb = ser.serialize(x) out.writeInt(bb.remaining()) - if (bb.hasArray) { - out.write(bb.array(), bb.arrayOffset() + bb.position(), bb.remaining()) - } else { - val b = new Array[Byte](bb.remaining()) - bb.get(b) - out.write(b) - } + Utils.writeByteBuffer(bb, out) }) } diff --git a/core/src/main/scala/spark/scheduler/TaskResult.scala b/core/src/main/scala/spark/scheduler/TaskResult.scala index 7016ece40e..89793e0e82 100644 --- a/core/src/main/scala/spark/scheduler/TaskResult.scala +++ b/core/src/main/scala/spark/scheduler/TaskResult.scala @@ -21,7 +21,7 @@ import java.io._ import scala.collection.mutable.Map import spark.executor.TaskMetrics -import spark.SparkEnv +import spark.{Utils, SparkEnv} import java.nio.ByteBuffer // Task result. Also contains updates to accumulator variables. @@ -37,13 +37,7 @@ class TaskResult[T](var value: T, var accumUpdates: Map[Long, Any], var metrics: val bb = objectSer.serialize(value) out.writeInt(bb.remaining()) - if (bb.hasArray) { - out.write(bb.array(), bb.arrayOffset() + bb.position(), bb.remaining()) - } else { - val bbval = new Array[Byte](bb.remaining()) - bb.get(bbval) - out.write(bbval) - } + Utils.writeByteBuffer(bb, out) out.writeInt(accumUpdates.size) for ((key, value) <- accumUpdates) { |