about summary refs log tree commit diff
path: root/streaming
diff options
context:
space:
mode:
authorzsxwing <zsxwing@gmail.com>2015-04-19 20:35:43 -0700
committerReynold Xin <rxin@databricks.com>2015-04-19 20:35:43 -0700
commitd8e1b7b06c499289ff3ce5ec91ff354493a17c48 (patch)
treeb96a02111b44e147767e28f233ee323fa786ae87 /streaming
parentfa73da024000386eecef79573e8ac96d6f05b2c7 (diff)
downloadspark-d8e1b7b06c499289ff3ce5ec91ff354493a17c48.tar.gz
spark-d8e1b7b06c499289ff3ce5ec91ff354493a17c48.tar.bz2
spark-d8e1b7b06c499289ff3ce5ec91ff354493a17c48.zip
[SPARK-6983][Streaming] Update ReceiverTrackerActor to use the new Rpc interface
A subtask of [SPARK-5293](https://issues.apache.org/jira/browse/SPARK-5293) Author: zsxwing <zsxwing@gmail.com> Closes #5557 from zsxwing/SPARK-6983 and squashes the following commits: e777e9f [zsxwing] Update ReceiverTrackerActor to use the new Rpc interface
Diffstat (limited to 'streaming')
-rw-r--r-- streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala | 52
-rw-r--r-- streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverInfo.scala | 4
-rw-r--r-- streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala | 64
3 files changed, 51 insertions(+), 69 deletions(-)
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala
index 8f2f1fef76..89af40330b 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala
@@ -21,18 +21,16 @@ import java.nio.ByteBuffer
import java.util.concurrent.atomic.AtomicLong
import scala.collection.mutable.ArrayBuffer
-import scala.concurrent.Await
-import akka.actor.{ActorRef, Actor, Props}
-import akka.pattern.ask
import com.google.common.base.Throwables
import org.apache.hadoop.conf.Configuration
import org.apache.spark.{Logging, SparkEnv, SparkException}
+import org.apache.spark.rpc.{RpcEnv, ThreadSafeRpcEndpoint}
import org.apache.spark.storage.StreamBlockId
import org.apache.spark.streaming.Time
import org.apache.spark.streaming.scheduler._
-import org.apache.spark.util.{AkkaUtils, Utils}
+import org.apache.spark.util.{RpcUtils, Utils}
/**
* Concrete implementation of [[org.apache.spark.streaming.receiver.ReceiverSupervisor]]
@@ -63,37 +61,23 @@ private[streaming] class ReceiverSupervisorImpl(
}
- /** Remote Akka actor for the ReceiverTracker */
- private val trackerActor = {
- val ip = env.conf.get("spark.driver.host", "localhost")
- val port = env.conf.getInt("spark.driver.port", 7077)
- val url = AkkaUtils.address(
- AkkaUtils.protocol(env.actorSystem),
- SparkEnv.driverActorSystemName,
- ip,
- port,
- "ReceiverTracker")
- env.actorSystem.actorSelection(url)
- }
-
- /** Timeout for Akka actor messages */
- private val askTimeout = AkkaUtils.askTimeout(env.conf)
+ /** Remote RpcEndpointRef for the ReceiverTracker */
+ private val trackerEndpoint = RpcUtils.makeDriverRef("ReceiverTracker", env.conf, env.rpcEnv)
- /** Akka actor for receiving messages from the ReceiverTracker in the driver */
- private val actor = env.actorSystem.actorOf(
- Props(new Actor {
+ /** RpcEndpointRef for receiving messages from the ReceiverTracker in the driver */
+ private val endpoint = env.rpcEnv.setupEndpoint(
+ "Receiver-" + streamId + "-" + System.currentTimeMillis(), new ThreadSafeRpcEndpoint {
+ override val rpcEnv: RpcEnv = env.rpcEnv
override def receive: PartialFunction[Any, Unit] = {
case StopReceiver =>
logInfo("Received stop signal")
- stop("Stopped by driver", None)
+ ReceiverSupervisorImpl.this.stop("Stopped by driver", None)
case CleanupOldBlocks(threshTime) =>
logDebug("Received delete old batch signal")
cleanupOldBlocks(threshTime)
}
-
- def ref: ActorRef = self
- }), "Receiver-" + streamId + "-" + System.currentTimeMillis())
+ })
/** Unique block ids if one wants to add blocks directly */
private val newBlockId = new AtomicLong(System.currentTimeMillis())
@@ -162,15 +146,14 @@ private[streaming] class ReceiverSupervisorImpl(
logDebug(s"Pushed block $blockId in ${(System.currentTimeMillis - time)} ms")
val blockInfo = ReceivedBlockInfo(streamId, numRecords, blockStoreResult)
- val future = trackerActor.ask(AddBlock(blockInfo))(askTimeout)
- Await.result(future, askTimeout)
+ trackerEndpoint.askWithReply[Boolean](AddBlock(blockInfo))
logDebug(s"Reported block $blockId")
}
/** Report error to the receiver tracker */
def reportError(message: String, error: Throwable) {
val errorString = Option(error).map(Throwables.getStackTraceAsString).getOrElse("")
- trackerActor ! ReportError(streamId, message, errorString)
+ trackerEndpoint.send(ReportError(streamId, message, errorString))
logWarning("Reported error " + message + " - " + error)
}
@@ -180,22 +163,19 @@ private[streaming] class ReceiverSupervisorImpl(
override protected def onStop(message: String, error: Option[Throwable]) {
blockGenerator.stop()
- env.actorSystem.stop(actor)
+ env.rpcEnv.stop(endpoint)
}
override protected def onReceiverStart() {
val msg = RegisterReceiver(
- streamId, receiver.getClass.getSimpleName, Utils.localHostName(), actor)
- val future = trackerActor.ask(msg)(askTimeout)
- Await.result(future, askTimeout)
+ streamId, receiver.getClass.getSimpleName, Utils.localHostName(), endpoint)
+ trackerEndpoint.askWithReply[Boolean](msg)
}
override protected def onReceiverStop(message: String, error: Option[Throwable]) {
logInfo("Deregistering receiver " + streamId)
val errorString = error.map(Throwables.getStackTraceAsString).getOrElse("")
- val future = trackerActor.ask(
- DeregisterReceiver(streamId, message, errorString))(askTimeout)
- Await.result(future, askTimeout)
+ trackerEndpoint.askWithReply[Boolean](DeregisterReceiver(streamId, message, errorString))
logInfo("Stopped receiver " + streamId)
}
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverInfo.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverInfo.scala
index d7e39c528c..52f08b9c9d 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverInfo.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverInfo.scala
@@ -17,8 +17,8 @@
package org.apache.spark.streaming.scheduler
-import akka.actor.ActorRef
import org.apache.spark.annotation.DeveloperApi
+import org.apache.spark.rpc.RpcEndpointRef
/**
* :: DeveloperApi ::
@@ -28,7 +28,7 @@ import org.apache.spark.annotation.DeveloperApi
case class ReceiverInfo(
streamId: Int,
name: String,
- private[streaming] val actor: ActorRef,
+ private[streaming] val endpoint: RpcEndpointRef,
active: Boolean,
location: String,
lastErrorMessage: String = "",
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala
index 9890047313..c4ead6f30a 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala
@@ -17,13 +17,11 @@
package org.apache.spark.streaming.scheduler
-
import scala.collection.mutable.{HashMap, SynchronizedMap}
import scala.language.existentials
-import akka.actor._
-
import org.apache.spark.{Logging, SerializableWritable, SparkEnv, SparkException}
+import org.apache.spark.rpc._
import org.apache.spark.streaming.{StreamingContext, Time}
import org.apache.spark.streaming.receiver.{CleanupOldBlocks, Receiver, ReceiverSupervisorImpl, StopReceiver}
@@ -36,7 +34,7 @@ private[streaming] case class RegisterReceiver(
streamId: Int,
typ: String,
host: String,
- receiverActor: ActorRef
+ receiverEndpoint: RpcEndpointRef
) extends ReceiverTrackerMessage
private[streaming] case class AddBlock(receivedBlockInfo: ReceivedBlockInfo)
extends ReceiverTrackerMessage
@@ -67,19 +65,19 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false
)
private val listenerBus = ssc.scheduler.listenerBus
- // actor is created when generator starts.
+ // endpoint is created when generator starts.
// This not being null means the tracker has been started and not stopped
- private var actor: ActorRef = null
+ private var endpoint: RpcEndpointRef = null
- /** Start the actor and receiver execution thread. */
+ /** Start the endpoint and receiver execution thread. */
def start(): Unit = synchronized {
- if (actor != null) {
+ if (endpoint != null) {
throw new SparkException("ReceiverTracker already started")
}
if (!receiverInputStreams.isEmpty) {
- actor = ssc.env.actorSystem.actorOf(Props(new ReceiverTrackerActor),
- "ReceiverTracker")
+ endpoint = ssc.env.rpcEnv.setupEndpoint(
+ "ReceiverTracker", new ReceiverTrackerEndpoint(ssc.env.rpcEnv))
if (!skipReceiverLaunch) receiverExecutor.start()
logInfo("ReceiverTracker started")
}
@@ -87,13 +85,13 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false
/** Stop the receiver execution thread. */
def stop(graceful: Boolean): Unit = synchronized {
- if (!receiverInputStreams.isEmpty && actor != null) {
+ if (!receiverInputStreams.isEmpty && endpoint != null) {
// First, stop the receivers
if (!skipReceiverLaunch) receiverExecutor.stop(graceful)
- // Finally, stop the actor
- ssc.env.actorSystem.stop(actor)
- actor = null
+ // Finally, stop the endpoint
+ ssc.env.rpcEnv.stop(endpoint)
+ endpoint = null
receivedBlockTracker.stop()
logInfo("ReceiverTracker stopped")
}
@@ -129,8 +127,8 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false
// Signal the receivers to delete old block data
if (ssc.conf.getBoolean("spark.streaming.receiver.writeAheadLog.enable", false)) {
logInfo(s"Cleanup old received batch data: $cleanupThreshTime")
- receiverInfo.values.flatMap { info => Option(info.actor) }
- .foreach { _ ! CleanupOldBlocks(cleanupThreshTime) }
+ receiverInfo.values.flatMap { info => Option(info.endpoint) }
+ .foreach { _.send(CleanupOldBlocks(cleanupThreshTime)) }
}
}
@@ -139,23 +137,23 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false
streamId: Int,
typ: String,
host: String,
- receiverActor: ActorRef,
- sender: ActorRef
+ receiverEndpoint: RpcEndpointRef,
+ senderAddress: RpcAddress
) {
if (!receiverInputStreamIds.contains(streamId)) {
throw new SparkException("Register received for unexpected id " + streamId)
}
receiverInfo(streamId) = ReceiverInfo(
- streamId, s"${typ}-${streamId}", receiverActor, true, host)
+ streamId, s"${typ}-${streamId}", receiverEndpoint, true, host)
listenerBus.post(StreamingListenerReceiverStarted(receiverInfo(streamId)))
- logInfo("Registered receiver for stream " + streamId + " from " + sender.path.address)
+ logInfo("Registered receiver for stream " + streamId + " from " + senderAddress)
}
/** Deregister a receiver */
private def deregisterReceiver(streamId: Int, message: String, error: String) {
val newReceiverInfo = receiverInfo.get(streamId) match {
case Some(oldInfo) =>
- oldInfo.copy(actor = null, active = false, lastErrorMessage = message, lastError = error)
+ oldInfo.copy(endpoint = null, active = false, lastErrorMessage = message, lastError = error)
case None =>
logWarning("No prior receiver info")
ReceiverInfo(streamId, "", null, false, "", lastErrorMessage = message, lastError = error)
@@ -199,19 +197,23 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false
receivedBlockTracker.hasUnallocatedReceivedBlocks
}
- /** Actor to receive messages from the receivers. */
- private class ReceiverTrackerActor extends Actor {
+ /** RpcEndpoint to receive messages from the receivers. */
+ private class ReceiverTrackerEndpoint(override val rpcEnv: RpcEnv) extends ThreadSafeRpcEndpoint {
+
override def receive: PartialFunction[Any, Unit] = {
- case RegisterReceiver(streamId, typ, host, receiverActor) =>
- registerReceiver(streamId, typ, host, receiverActor, sender)
- sender ! true
- case AddBlock(receivedBlockInfo) =>
- sender ! addBlock(receivedBlockInfo)
case ReportError(streamId, message, error) =>
reportError(streamId, message, error)
+ }
+
+ override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
+ case RegisterReceiver(streamId, typ, host, receiverEndpoint) =>
+ registerReceiver(streamId, typ, host, receiverEndpoint, context.sender.address)
+ context.reply(true)
+ case AddBlock(receivedBlockInfo) =>
+ context.reply(addBlock(receivedBlockInfo))
case DeregisterReceiver(streamId, message, error) =>
deregisterReceiver(streamId, message, error)
- sender ! true
+ context.reply(true)
}
}
@@ -314,8 +316,8 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false
/** Stops the receivers. */
private def stopReceivers() {
// Signal the receivers to stop
- receiverInfo.values.flatMap { info => Option(info.actor)}
- .foreach { _ ! StopReceiver }
+ receiverInfo.values.flatMap { info => Option(info.endpoint)}
+ .foreach { _.send(StopReceiver) }
logInfo("Sent stop signal to all " + receiverInfo.size + " receivers")
}
}