diff options
author | Jongyoul Lee <jongyoul@gmail.com> | 2015-01-20 10:17:29 -0800 |
---|---|---|
committer | Josh Rosen <joshrosen@databricks.com> | 2015-01-20 10:18:10 -0800 |
commit | 9d9294aebf7208a76f43d8fc5a0489a83d7215f4 (patch) | |
tree | aef14f5d3841df7d7b6d3d2b3f6a796ca725fdb2 | |
parent | 4afad9c7702239f6d5b1b49dc48ee08580964e17 (diff) | |
download | spark-9d9294aebf7208a76f43d8fc5a0489a83d7215f4.tar.gz spark-9d9294aebf7208a76f43d8fc5a0489a83d7215f4.tar.bz2 spark-9d9294aebf7208a76f43d8fc5a0489a83d7215f4.zip |
[SPARK-5333][Mesos] MesosTaskLaunchData occurs BufferUnderflowException
- Rewind ByteBuffer before making ByteString
(This fixes a bug introduced in #3849 / SPARK-4014)
Author: Jongyoul Lee <jongyoul@gmail.com>
Closes #4119 from jongyoul/SPARK-5333 and squashes the following commits:
c6693a8 [Jongyoul Lee] [SPARK-5333][Mesos] MesosTaskLaunchData occurs BufferUnderflowException - changed logDebug location
4141f58 [Jongyoul Lee] [SPARK-5333][Mesos] MesosTaskLaunchData occurs BufferUnderflowException - Added license information
2190606 [Jongyoul Lee] [SPARK-5333][Mesos] MesosTaskLaunchData occurs BufferUnderflowException - Adjusted imported libraries
b7f5517 [Jongyoul Lee] [SPARK-5333][Mesos] MesosTaskLaunchData occurs BufferUnderflowException - Rewind ByteBuffer before making ByteString
-rw-r--r-- | core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosTaskLaunchData.scala | 9 | ||||
-rw-r--r-- | core/src/test/scala/org/apache/spark/scheduler/mesos/MesosTaskLaunchDataSuite.scala | 38 |
2 files changed, 45 insertions, 2 deletions
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosTaskLaunchData.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosTaskLaunchData.scala index 4416ce92ad..5e7e6567a3 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosTaskLaunchData.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosTaskLaunchData.scala @@ -21,24 +21,29 @@ import java.nio.ByteBuffer import org.apache.mesos.protobuf.ByteString +import org.apache.spark.Logging + /** * Wrapper for serializing the data sent when launching Mesos tasks. */ private[spark] case class MesosTaskLaunchData( serializedTask: ByteBuffer, - attemptNumber: Int) { + attemptNumber: Int) extends Logging { def toByteString: ByteString = { val dataBuffer = ByteBuffer.allocate(4 + serializedTask.limit) dataBuffer.putInt(attemptNumber) dataBuffer.put(serializedTask) + dataBuffer.rewind + logDebug(s"ByteBuffer size: [${dataBuffer.remaining}]") ByteString.copyFrom(dataBuffer) } } -private[spark] object MesosTaskLaunchData { +private[spark] object MesosTaskLaunchData extends Logging { def fromByteString(byteString: ByteString): MesosTaskLaunchData = { val byteBuffer = byteString.asReadOnlyByteBuffer() + logDebug(s"ByteBuffer size: [${byteBuffer.remaining}]") val attemptNumber = byteBuffer.getInt // updates the position by 4 bytes val serializedTask = byteBuffer.slice() // subsequence starting at the current position MesosTaskLaunchData(serializedTask, attemptNumber) diff --git a/core/src/test/scala/org/apache/spark/scheduler/mesos/MesosTaskLaunchDataSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/mesos/MesosTaskLaunchDataSuite.scala new file mode 100644 index 0000000000..86a42a7398 --- /dev/null +++ b/core/src/test/scala/org/apache/spark/scheduler/mesos/MesosTaskLaunchDataSuite.scala @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.scheduler.mesos + +import java.nio.ByteBuffer + +import org.scalatest.FunSuite + +import org.apache.spark.scheduler.cluster.mesos.MesosTaskLaunchData + +class MesosTaskLaunchDataSuite extends FunSuite { + test("serialize and deserialize data must be same") { + val serializedTask = ByteBuffer.allocate(40) + (Range(100, 110).map(serializedTask.putInt(_))) + serializedTask.rewind + val attemptNumber = 100 + val byteString = MesosTaskLaunchData(serializedTask, attemptNumber).toByteString + serializedTask.rewind + val mesosTaskLaunchData = MesosTaskLaunchData.fromByteString(byteString) + assert(mesosTaskLaunchData.attemptNumber == attemptNumber) + assert(mesosTaskLaunchData.serializedTask.equals(serializedTask)) + } +} |