aboutsummaryrefslogtreecommitdiff
path: root/core
diff options
context:
space:
mode:
authorJongyoul Lee <jongyoul@gmail.com>2015-01-20 10:17:29 -0800
committerJosh Rosen <joshrosen@databricks.com>2015-01-20 10:18:10 -0800
commit9d9294aebf7208a76f43d8fc5a0489a83d7215f4 (patch)
treeaef14f5d3841df7d7b6d3d2b3f6a796ca725fdb2 /core
parent4afad9c7702239f6d5b1b49dc48ee08580964e17 (diff)
downloadspark-9d9294aebf7208a76f43d8fc5a0489a83d7215f4.tar.gz
spark-9d9294aebf7208a76f43d8fc5a0489a83d7215f4.tar.bz2
spark-9d9294aebf7208a76f43d8fc5a0489a83d7215f4.zip
[SPARK-5333][Mesos] MesosTaskLaunchData occurs BufferUnderflowException
- Rewind ByteBuffer before making ByteString (This fixes a bug introduced in #3849 / SPARK-4014) Author: Jongyoul Lee <jongyoul@gmail.com> Closes #4119 from jongyoul/SPARK-5333 and squashes the following commits: c6693a8 [Jongyoul Lee] [SPARK-5333][Mesos] MesosTaskLaunchData occurs BufferUnderflowException - changed logDebug location 4141f58 [Jongyoul Lee] [SPARK-5333][Mesos] MesosTaskLaunchData occurs BufferUnderflowException - Added license information 2190606 [Jongyoul Lee] [SPARK-5333][Mesos] MesosTaskLaunchData occurs BufferUnderflowException - Adjusted imported libraries b7f5517 [Jongyoul Lee] [SPARK-5333][Mesos] MesosTaskLaunchData occurs BufferUnderflowException - Rewind ByteBuffer before making ByteString
Diffstat (limited to 'core')
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosTaskLaunchData.scala9
-rw-r--r--core/src/test/scala/org/apache/spark/scheduler/mesos/MesosTaskLaunchDataSuite.scala38
2 files changed, 45 insertions, 2 deletions
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosTaskLaunchData.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosTaskLaunchData.scala
index 4416ce92ad..5e7e6567a3 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosTaskLaunchData.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosTaskLaunchData.scala
@@ -21,24 +21,29 @@ import java.nio.ByteBuffer
import org.apache.mesos.protobuf.ByteString
+import org.apache.spark.Logging
+
/**
* Wrapper for serializing the data sent when launching Mesos tasks.
*/
private[spark] case class MesosTaskLaunchData(
serializedTask: ByteBuffer,
- attemptNumber: Int) {
+ attemptNumber: Int) extends Logging {
def toByteString: ByteString = {
val dataBuffer = ByteBuffer.allocate(4 + serializedTask.limit)
dataBuffer.putInt(attemptNumber)
dataBuffer.put(serializedTask)
+ dataBuffer.rewind
+ logDebug(s"ByteBuffer size: [${dataBuffer.remaining}]")
ByteString.copyFrom(dataBuffer)
}
}
-private[spark] object MesosTaskLaunchData {
+private[spark] object MesosTaskLaunchData extends Logging {
def fromByteString(byteString: ByteString): MesosTaskLaunchData = {
val byteBuffer = byteString.asReadOnlyByteBuffer()
+ logDebug(s"ByteBuffer size: [${byteBuffer.remaining}]")
val attemptNumber = byteBuffer.getInt // updates the position by 4 bytes
val serializedTask = byteBuffer.slice() // subsequence starting at the current position
MesosTaskLaunchData(serializedTask, attemptNumber)
diff --git a/core/src/test/scala/org/apache/spark/scheduler/mesos/MesosTaskLaunchDataSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/mesos/MesosTaskLaunchDataSuite.scala
new file mode 100644
index 0000000000..86a42a7398
--- /dev/null
+++ b/core/src/test/scala/org/apache/spark/scheduler/mesos/MesosTaskLaunchDataSuite.scala
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler.mesos
+
+import java.nio.ByteBuffer
+
+import org.scalatest.FunSuite
+
+import org.apache.spark.scheduler.cluster.mesos.MesosTaskLaunchData
+
+class MesosTaskLaunchDataSuite extends FunSuite {
+ test("serialize and deserialize data must be same") {
+ val serializedTask = ByteBuffer.allocate(40)
+ (Range(100, 110).map(serializedTask.putInt(_)))
+ serializedTask.rewind
+ val attemptNumber = 100
+ val byteString = MesosTaskLaunchData(serializedTask, attemptNumber).toByteString
+ serializedTask.rewind
+ val mesosTaskLaunchData = MesosTaskLaunchData.fromByteString(byteString)
+ assert(mesosTaskLaunchData.attemptNumber == attemptNumber)
+ assert(mesosTaskLaunchData.serializedTask.equals(serializedTask))
+ }
+}