diff options
4 files changed, 52 insertions, 70 deletions
diff --git a/core/src/main/scala/org/apache/spark/rpc/RpcEnv.scala b/core/src/main/scala/org/apache/spark/rpc/RpcEnv.scala index f2c1c86af7..cba038ca35 100644 --- a/core/src/main/scala/org/apache/spark/rpc/RpcEnv.scala +++ b/core/src/main/scala/org/apache/spark/rpc/RpcEnv.scala @@ -258,7 +258,7 @@ private[spark] trait RpcEndpoint { final def stop(): Unit = { val _self = self if (_self != null) { - rpcEnv.stop(self) + rpcEnv.stop(_self) } } } diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala index 8f2f1fef76..89af40330b 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala @@ -21,18 +21,16 @@ import java.nio.ByteBuffer import java.util.concurrent.atomic.AtomicLong import scala.collection.mutable.ArrayBuffer -import scala.concurrent.Await -import akka.actor.{ActorRef, Actor, Props} -import akka.pattern.ask import com.google.common.base.Throwables import org.apache.hadoop.conf.Configuration import org.apache.spark.{Logging, SparkEnv, SparkException} +import org.apache.spark.rpc.{RpcEnv, ThreadSafeRpcEndpoint} import org.apache.spark.storage.StreamBlockId import org.apache.spark.streaming.Time import org.apache.spark.streaming.scheduler._ -import org.apache.spark.util.{AkkaUtils, Utils} +import org.apache.spark.util.{RpcUtils, Utils} /** * Concrete implementation of [[org.apache.spark.streaming.receiver.ReceiverSupervisor]] @@ -63,37 +61,23 @@ private[streaming] class ReceiverSupervisorImpl( } - /** Remote Akka actor for the ReceiverTracker */ - private val trackerActor = { - val ip = env.conf.get("spark.driver.host", "localhost") - val port = env.conf.getInt("spark.driver.port", 7077) - val url = AkkaUtils.address( - AkkaUtils.protocol(env.actorSystem), - SparkEnv.driverActorSystemName, - ip, - port, - "ReceiverTracker") - env.actorSystem.actorSelection(url) - } - - /** Timeout for Akka actor messages */ - private val askTimeout = AkkaUtils.askTimeout(env.conf) + /** Remote RpcEndpointRef for the ReceiverTracker */ + private val trackerEndpoint = RpcUtils.makeDriverRef("ReceiverTracker", env.conf, env.rpcEnv) - /** Akka actor for receiving messages from the ReceiverTracker in the driver */ - private val actor = env.actorSystem.actorOf( - Props(new Actor { + /** RpcEndpointRef for receiving messages from the ReceiverTracker in the driver */ + private val endpoint = env.rpcEnv.setupEndpoint( + "Receiver-" + streamId + "-" + System.currentTimeMillis(), new ThreadSafeRpcEndpoint { + override val rpcEnv: RpcEnv = env.rpcEnv override def receive: PartialFunction[Any, Unit] = { case StopReceiver => logInfo("Received stop signal") - stop("Stopped by driver", None) + ReceiverSupervisorImpl.this.stop("Stopped by driver", None) case CleanupOldBlocks(threshTime) => logDebug("Received delete old batch signal") cleanupOldBlocks(threshTime) } - - def ref: ActorRef = self - }), "Receiver-" + streamId + "-" + System.currentTimeMillis()) + }) /** Unique block ids if one wants to add blocks directly */ private val newBlockId = new AtomicLong(System.currentTimeMillis()) @@ -162,15 +146,14 @@ private[streaming] class ReceiverSupervisorImpl( logDebug(s"Pushed block $blockId in ${(System.currentTimeMillis - time)} ms") val blockInfo = ReceivedBlockInfo(streamId, numRecords, blockStoreResult) - val future = trackerActor.ask(AddBlock(blockInfo))(askTimeout) - Await.result(future, askTimeout) + trackerEndpoint.askWithReply[Boolean](AddBlock(blockInfo)) logDebug(s"Reported block $blockId") } /** Report error to the receiver tracker */ def reportError(message: String, error: Throwable) { val errorString = Option(error).map(Throwables.getStackTraceAsString).getOrElse("") - trackerActor ! ReportError(streamId, message, errorString) + trackerEndpoint.send(ReportError(streamId, message, errorString)) logWarning("Reported error " + message + " - " + error) } @@ -180,22 +163,19 @@ private[streaming] class ReceiverSupervisorImpl( override protected def onStop(message: String, error: Option[Throwable]) { blockGenerator.stop() - env.actorSystem.stop(actor) + env.rpcEnv.stop(endpoint) } override protected def onReceiverStart() { val msg = RegisterReceiver( - streamId, receiver.getClass.getSimpleName, Utils.localHostName(), actor) - val future = trackerActor.ask(msg)(askTimeout) - Await.result(future, askTimeout) + streamId, receiver.getClass.getSimpleName, Utils.localHostName(), endpoint) + trackerEndpoint.askWithReply[Boolean](msg) } override protected def onReceiverStop(message: String, error: Option[Throwable]) { logInfo("Deregistering receiver " + streamId) val errorString = error.map(Throwables.getStackTraceAsString).getOrElse("") - val future = trackerActor.ask( - DeregisterReceiver(streamId, message, errorString))(askTimeout) - Await.result(future, askTimeout) + trackerEndpoint.askWithReply[Boolean](DeregisterReceiver(streamId, message, errorString)) logInfo("Stopped receiver " + streamId) } diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverInfo.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverInfo.scala index d7e39c528c..52f08b9c9d 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverInfo.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverInfo.scala @@ -17,8 +17,8 @@ package org.apache.spark.streaming.scheduler -import akka.actor.ActorRef import org.apache.spark.annotation.DeveloperApi +import org.apache.spark.rpc.RpcEndpointRef /** * :: DeveloperApi :: @@ -28,7 +28,7 @@ import org.apache.spark.annotation.DeveloperApi case class ReceiverInfo( streamId: Int, name: String, - private[streaming] val actor: ActorRef, + private[streaming] val endpoint: RpcEndpointRef, active: Boolean, location: String, lastErrorMessage: String = "", diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala index 9890047313..c4ead6f30a 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala @@ -17,13 +17,11 @@ package org.apache.spark.streaming.scheduler - import scala.collection.mutable.{HashMap, SynchronizedMap} import scala.language.existentials -import akka.actor._ - import org.apache.spark.{Logging, SerializableWritable, SparkEnv, SparkException} +import org.apache.spark.rpc._ import org.apache.spark.streaming.{StreamingContext, Time} import org.apache.spark.streaming.receiver.{CleanupOldBlocks, Receiver, ReceiverSupervisorImpl, StopReceiver} @@ -36,7 +34,7 @@ private[streaming] case class RegisterReceiver( streamId: Int, typ: String, host: String, - receiverActor: ActorRef + receiverEndpoint: RpcEndpointRef ) extends ReceiverTrackerMessage private[streaming] case class AddBlock(receivedBlockInfo: ReceivedBlockInfo) extends ReceiverTrackerMessage @@ -67,19 +65,19 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false ) private val listenerBus = ssc.scheduler.listenerBus - // actor is created when generator starts. + // endpoint is created when generator starts. // This not being null means the tracker has been started and not stopped - private var actor: ActorRef = null + private var endpoint: RpcEndpointRef = null - /** Start the actor and receiver execution thread. */ + /** Start the endpoint and receiver execution thread. */ def start(): Unit = synchronized { - if (actor != null) { + if (endpoint != null) { throw new SparkException("ReceiverTracker already started") } if (!receiverInputStreams.isEmpty) { - actor = ssc.env.actorSystem.actorOf(Props(new ReceiverTrackerActor), - "ReceiverTracker") + endpoint = ssc.env.rpcEnv.setupEndpoint( + "ReceiverTracker", new ReceiverTrackerEndpoint(ssc.env.rpcEnv)) if (!skipReceiverLaunch) receiverExecutor.start() logInfo("ReceiverTracker started") } @@ -87,13 +85,13 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false /** Stop the receiver execution thread. */ def stop(graceful: Boolean): Unit = synchronized { - if (!receiverInputStreams.isEmpty && actor != null) { + if (!receiverInputStreams.isEmpty && endpoint != null) { // First, stop the receivers if (!skipReceiverLaunch) receiverExecutor.stop(graceful) - // Finally, stop the actor - ssc.env.actorSystem.stop(actor) - actor = null + // Finally, stop the endpoint + ssc.env.rpcEnv.stop(endpoint) + endpoint = null receivedBlockTracker.stop() logInfo("ReceiverTracker stopped") } @@ -129,8 +127,8 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false // Signal the receivers to delete old block data if (ssc.conf.getBoolean("spark.streaming.receiver.writeAheadLog.enable", false)) { logInfo(s"Cleanup old received batch data: $cleanupThreshTime") - receiverInfo.values.flatMap { info => Option(info.actor) } - .foreach { _ ! CleanupOldBlocks(cleanupThreshTime) } + receiverInfo.values.flatMap { info => Option(info.endpoint) } + .foreach { _.send(CleanupOldBlocks(cleanupThreshTime)) } } } @@ -139,23 +137,23 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false streamId: Int, typ: String, host: String, - receiverActor: ActorRef, - sender: ActorRef + receiverEndpoint: RpcEndpointRef, + senderAddress: RpcAddress ) { if (!receiverInputStreamIds.contains(streamId)) { throw new SparkException("Register received for unexpected id " + streamId) } receiverInfo(streamId) = ReceiverInfo( - streamId, s"${typ}-${streamId}", receiverActor, true, host) + streamId, s"${typ}-${streamId}", receiverEndpoint, true, host) listenerBus.post(StreamingListenerReceiverStarted(receiverInfo(streamId))) - logInfo("Registered receiver for stream " + streamId + " from " + sender.path.address) + logInfo("Registered receiver for stream " + streamId + " from " + senderAddress) } /** Deregister a receiver */ private def deregisterReceiver(streamId: Int, message: String, error: String) { val newReceiverInfo = receiverInfo.get(streamId) match { case Some(oldInfo) => - oldInfo.copy(actor = null, active = false, lastErrorMessage = message, lastError = error) + oldInfo.copy(endpoint = null, active = false, lastErrorMessage = message, lastError = error) case None => logWarning("No prior receiver info") ReceiverInfo(streamId, "", null, false, "", lastErrorMessage = message, lastError = error) @@ -199,19 +197,23 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false receivedBlockTracker.hasUnallocatedReceivedBlocks } - /** Actor to receive messages from the receivers. */ - private class ReceiverTrackerActor extends Actor { + /** RpcEndpoint to receive messages from the receivers. */ + private class ReceiverTrackerEndpoint(override val rpcEnv: RpcEnv) extends ThreadSafeRpcEndpoint { + override def receive: PartialFunction[Any, Unit] = { - case RegisterReceiver(streamId, typ, host, receiverActor) => - registerReceiver(streamId, typ, host, receiverActor, sender) - sender ! true - case AddBlock(receivedBlockInfo) => - sender ! addBlock(receivedBlockInfo) case ReportError(streamId, message, error) => reportError(streamId, message, error) + } + + override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = { + case RegisterReceiver(streamId, typ, host, receiverEndpoint) => + registerReceiver(streamId, typ, host, receiverEndpoint, context.sender.address) + context.reply(true) + case AddBlock(receivedBlockInfo) => + context.reply(addBlock(receivedBlockInfo)) case DeregisterReceiver(streamId, message, error) => deregisterReceiver(streamId, message, error) - sender ! true + context.reply(true) } } @@ -314,8 +316,8 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false /** Stops the receivers. */ private def stopReceivers() { // Signal the receivers to stop - receiverInfo.values.flatMap { info => Option(info.actor)} - .foreach { _ ! StopReceiver } + receiverInfo.values.flatMap { info => Option(info.endpoint)} + .foreach { _.send(StopReceiver) } logInfo("Sent stop signal to all " + receiverInfo.size + " receivers") } } |