aboutsummaryrefslogtreecommitdiff
path: root/resource-managers
diff options
context:
space:
mode:
Diffstat (limited to 'resource-managers')
-rw-r--r--resource-managers/mesos/src/main/scala/org/apache/spark/executor/MesosExecutorBackend.scala9
-rw-r--r--resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackend.scala2
-rw-r--r--resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosTaskLaunchData.scala51
-rw-r--r--resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackendSuite.scala23
-rw-r--r--resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosTaskLaunchDataSuite.scala36
5 files changed, 26 insertions, 95 deletions
diff --git a/resource-managers/mesos/src/main/scala/org/apache/spark/executor/MesosExecutorBackend.scala b/resource-managers/mesos/src/main/scala/org/apache/spark/executor/MesosExecutorBackend.scala
index ee9149ce02..b252539782 100644
--- a/resource-managers/mesos/src/main/scala/org/apache/spark/executor/MesosExecutorBackend.scala
+++ b/resource-managers/mesos/src/main/scala/org/apache/spark/executor/MesosExecutorBackend.scala
@@ -29,7 +29,8 @@ import org.apache.spark.{SparkConf, SparkEnv, TaskState}
import org.apache.spark.TaskState
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.internal.Logging
-import org.apache.spark.scheduler.cluster.mesos.{MesosSchedulerUtils, MesosTaskLaunchData}
+import org.apache.spark.scheduler.TaskDescription
+import org.apache.spark.scheduler.cluster.mesos.MesosSchedulerUtils
import org.apache.spark.util.Utils
private[spark] class MesosExecutorBackend
@@ -84,14 +85,12 @@ private[spark] class MesosExecutorBackend
}
override def launchTask(d: ExecutorDriver, taskInfo: TaskInfo) {
- val taskId = taskInfo.getTaskId.getValue.toLong
- val taskData = MesosTaskLaunchData.fromByteString(taskInfo.getData)
+ val taskDescription = TaskDescription.decode(taskInfo.getData.asReadOnlyByteBuffer())
if (executor == null) {
logError("Received launchTask but executor was null")
} else {
SparkHadoopUtil.get.runAsSparkUser { () =>
- executor.launchTask(this, taskId = taskId, attemptNumber = taskData.attemptNumber,
- taskInfo.getName, taskData.serializedTask)
+ executor.launchTask(this, taskDescription)
}
}
}
diff --git a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackend.scala b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackend.scala
index 779ffb5229..7e561916a7 100644
--- a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackend.scala
+++ b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackend.scala
@@ -351,7 +351,7 @@ private[spark] class MesosFineGrainedSchedulerBackend(
.setExecutor(executorInfo)
.setName(task.name)
.addAllResources(cpuResources.asJava)
- .setData(MesosTaskLaunchData(task.serializedTask, task.attemptNumber).toByteString)
+ .setData(ByteString.copyFrom(TaskDescription.encode(task)))
.build()
(taskInfo, finalResources.asJava)
}
diff --git a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosTaskLaunchData.scala b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosTaskLaunchData.scala
deleted file mode 100644
index 8370b61145..0000000000
--- a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosTaskLaunchData.scala
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.scheduler.cluster.mesos
-
-import java.nio.ByteBuffer
-
-import org.apache.mesos.protobuf.ByteString
-
-import org.apache.spark.internal.Logging
-
-/**
- * Wrapper for serializing the data sent when launching Mesos tasks.
- */
-private[spark] case class MesosTaskLaunchData(
- serializedTask: ByteBuffer,
- attemptNumber: Int) extends Logging {
-
- def toByteString: ByteString = {
- val dataBuffer = ByteBuffer.allocate(4 + serializedTask.limit)
- dataBuffer.putInt(attemptNumber)
- dataBuffer.put(serializedTask)
- dataBuffer.rewind
- logDebug(s"ByteBuffer size: [${dataBuffer.remaining}]")
- ByteString.copyFrom(dataBuffer)
- }
-}
-
-private[spark] object MesosTaskLaunchData extends Logging {
- def fromByteString(byteString: ByteString): MesosTaskLaunchData = {
- val byteBuffer = byteString.asReadOnlyByteBuffer()
- logDebug(s"ByteBuffer size: [${byteBuffer.remaining}]")
- val attemptNumber = byteBuffer.getInt // updates the position by 4 bytes
- val serializedTask = byteBuffer.slice() // subsequence starting at the current position
- MesosTaskLaunchData(serializedTask, attemptNumber)
- }
-}
diff --git a/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackendSuite.scala b/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackendSuite.scala
index 1d7a86f4b0..4ee85b9183 100644
--- a/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackendSuite.scala
+++ b/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackendSuite.scala
@@ -21,6 +21,7 @@ import java.nio.ByteBuffer
import java.util.Arrays
import java.util.Collection
import java.util.Collections
+import java.util.Properties
import scala.collection.JavaConverters._
import scala.collection.mutable
@@ -246,7 +247,16 @@ class MesosFineGrainedSchedulerBackendSuite
mesosOffers.get(2).getHostname,
(minCpu - backend.mesosExecutorCores).toInt
)
- val taskDesc = new TaskDescription(1L, 0, "s1", "n1", 0, ByteBuffer.wrap(new Array[Byte](0)))
+ val taskDesc = new TaskDescription(
+ taskId = 1L,
+ attemptNumber = 0,
+ executorId = "s1",
+ name = "n1",
+ index = 0,
+ addedFiles = mutable.Map.empty[String, Long],
+ addedJars = mutable.Map.empty[String, Long],
+ properties = new Properties(),
+ ByteBuffer.wrap(new Array[Byte](0)))
when(taskScheduler.resourceOffers(expectedWorkerOffers)).thenReturn(Seq(Seq(taskDesc)))
when(taskScheduler.CPUS_PER_TASK).thenReturn(2)
@@ -345,7 +355,16 @@ class MesosFineGrainedSchedulerBackendSuite
2 // Deducting 1 for executor
)
- val taskDesc = new TaskDescription(1L, 0, "s1", "n1", 0, ByteBuffer.wrap(new Array[Byte](0)))
+ val taskDesc = new TaskDescription(
+ taskId = 1L,
+ attemptNumber = 0,
+ executorId = "s1",
+ name = "n1",
+ index = 0,
+ addedFiles = mutable.Map.empty[String, Long],
+ addedJars = mutable.Map.empty[String, Long],
+ properties = new Properties(),
+ ByteBuffer.wrap(new Array[Byte](0)))
when(taskScheduler.resourceOffers(expectedWorkerOffers)).thenReturn(Seq(Seq(taskDesc)))
when(taskScheduler.CPUS_PER_TASK).thenReturn(1)
diff --git a/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosTaskLaunchDataSuite.scala b/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosTaskLaunchDataSuite.scala
deleted file mode 100644
index 5a81bb335f..0000000000
--- a/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosTaskLaunchDataSuite.scala
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.scheduler.cluster.mesos
-
-import java.nio.ByteBuffer
-
-import org.apache.spark.SparkFunSuite
-
-class MesosTaskLaunchDataSuite extends SparkFunSuite {
- test("serialize and deserialize data must be same") {
- val serializedTask = ByteBuffer.allocate(40)
- (Range(100, 110).map(serializedTask.putInt(_)))
- serializedTask.rewind
- val attemptNumber = 100
- val byteString = MesosTaskLaunchData(serializedTask, attemptNumber).toByteString
- serializedTask.rewind
- val mesosTaskLaunchData = MesosTaskLaunchData.fromByteString(byteString)
- assert(mesosTaskLaunchData.attemptNumber == attemptNumber)
- assert(mesosTaskLaunchData.serializedTask.equals(serializedTask))
- }
-}