author     Patrick Wendell <pwendell@gmail.com>    2012-12-07 19:16:35 -0800
committer  Patrick Wendell <pwendell@gmail.com>    2012-12-07 19:34:05 -0800
commit     3e796bdd57297134ed40b20d7692cd9c8cd6efba (patch)
tree       f51b3f1c2ef11f3553f0c8ba131f9601eff9f2ff /streaming/src
parent     3ff9710265d4bb518b89461cfb0fcc771e61a726 (diff)
Changes in response to TD's review.
Diffstat (limited to 'streaming/src')
-rw-r--r--  streaming/src/main/scala/spark/streaming/FlumeInputDStream.scala        |  2
-rw-r--r--  streaming/src/main/scala/spark/streaming/NetworkInputDStream.scala      |  4
-rw-r--r--  streaming/src/main/scala/spark/streaming/NetworkInputTracker.scala      | 10
-rw-r--r--  streaming/src/main/scala/spark/streaming/RawInputDStream.scala          |  2
-rw-r--r--  streaming/src/main/scala/spark/streaming/SocketInputDStream.scala       |  2
-rw-r--r--  streaming/src/main/scala/spark/streaming/examples/FlumeEventCount.scala | 24
6 files changed, 29 insertions(+), 15 deletions(-)
diff --git a/streaming/src/main/scala/spark/streaming/FlumeInputDStream.scala b/streaming/src/main/scala/spark/streaming/FlumeInputDStream.scala
index 9c403278c3..2959ce4540 100644
--- a/streaming/src/main/scala/spark/streaming/FlumeInputDStream.scala
+++ b/streaming/src/main/scala/spark/streaming/FlumeInputDStream.scala
@@ -126,5 +126,5 @@ class FlumeReceiver(
logInfo("Flume receiver stopped")
}
- override def getLocationConstraint = Some(host)
+ override def getLocationPreference = Some(host)
}
\ No newline at end of file
diff --git a/streaming/src/main/scala/spark/streaming/NetworkInputDStream.scala b/streaming/src/main/scala/spark/streaming/NetworkInputDStream.scala
index 052fc8bb74..4e4e9fc942 100644
--- a/streaming/src/main/scala/spark/streaming/NetworkInputDStream.scala
+++ b/streaming/src/main/scala/spark/streaming/NetworkInputDStream.scala
@@ -62,8 +62,8 @@ abstract class NetworkReceiver[T: ClassManifest](val streamId: Int) extends Seri
/** This method will be called to stop receiving data. */
protected def onStop()
- /** This method conveys a placement constraint (hostname) for this receiver. */
- def getLocationConstraint() : Option[String] = None
+ /** This method conveys a placement preference (hostname) for this receiver. */
+ def getLocationPreference() : Option[String] = None
/**
* This method starts the receiver. First it accesses all the lazy members to
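A receiver opts into placement by overriding this hook; the default of None leaves scheduling entirely to Spark, as the Raw and Socket receivers further down do. A minimal sketch of a custom receiver against this API (the class name and host value are hypothetical, not part of this commit):

    class MyReceiver(streamId: Int, preferredHost: String)
        extends NetworkReceiver[String](streamId) {

      // Hint to the tracker: schedule this receiver on preferredHost.
      override def getLocationPreference(): Option[String] = Some(preferredHost)

      protected def onStart() { /* connect and push received blocks */ }
      protected def onStop() { /* tear the connection down */ }
    }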
diff --git a/streaming/src/main/scala/spark/streaming/NetworkInputTracker.scala b/streaming/src/main/scala/spark/streaming/NetworkInputTracker.scala
index 56661c2615..b421f795ee 100644
--- a/streaming/src/main/scala/spark/streaming/NetworkInputTracker.scala
+++ b/streaming/src/main/scala/spark/streaming/NetworkInputTracker.scala
@@ -99,13 +99,13 @@ class NetworkInputTracker(
def startReceivers() {
val receivers = networkInputStreams.map(_.createReceiver())
- // We only honor constraints if all receivers have them
- val hasLocationConstraints = receivers.map(_.getLocationConstraint().isDefined).reduce(_ && _)
+ // Right now, we only honor preferences if all receivers have them
+ val hasLocationPreferences = receivers.map(_.getLocationPreference().isDefined).reduce(_ && _)
val tempRDD =
- if (hasLocationConstraints) {
- val receiversWithConstraints = receivers.map(r => (r, Seq(r.getLocationConstraint().toString)))
- ssc.sc.makeLocalityConstrainedRDD[NetworkReceiver[_]](receiversWithConstraints)
+ if (hasLocationPreferences) {
+ val receiversWithPreferences = receivers.map(r => (r, Seq(r.getLocationPreference().toString)))
+ ssc.sc.makeRDD[NetworkReceiver[_]](receiversWithPreferences)
}
else {
ssc.sc.makeRDD(receivers, receivers.size)
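The gating logic above falls back to a plain makeRDD(receivers, receivers.size) if even one receiver declines to state a preference. When every receiver does state one, the (value, locations) overload of makeRDD builds one partition per receiver, each carrying its preferred hosts. A sketch of that overload's shape (receiver and host names are hypothetical):

    // Each pair becomes its own partition; the Seq[String] lists the
    // hosts the scheduler should prefer for that partition.
    val receiversWithPrefs = Seq(
      (receiverA, Seq("host-1.example.com")),
      (receiverB, Seq("host-2.example.com")))
    val tempRDD = ssc.sc.makeRDD(receiversWithPrefs)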
diff --git a/streaming/src/main/scala/spark/streaming/RawInputDStream.scala b/streaming/src/main/scala/spark/streaming/RawInputDStream.scala
index fd51ed47a5..6acaa9aab1 100644
--- a/streaming/src/main/scala/spark/streaming/RawInputDStream.scala
+++ b/streaming/src/main/scala/spark/streaming/RawInputDStream.scala
@@ -31,7 +31,7 @@ class RawNetworkReceiver(streamId: Int, host: String, port: Int, storageLevel: S
var blockPushingThread: Thread = null
- override def getLocationConstraint = None
+ override def getLocationPreference = None
def onStart() {
// Open a socket to the target address and keep reading from it
diff --git a/streaming/src/main/scala/spark/streaming/SocketInputDStream.scala b/streaming/src/main/scala/spark/streaming/SocketInputDStream.scala
index ebbb17a39a..a9e37c0ff0 100644
--- a/streaming/src/main/scala/spark/streaming/SocketInputDStream.scala
+++ b/streaming/src/main/scala/spark/streaming/SocketInputDStream.scala
@@ -34,7 +34,7 @@ class SocketReceiver[T: ClassManifest](
lazy protected val dataHandler = new DataHandler(this, storageLevel)
- override def getLocationConstraint = None
+ override def getLocationPreference = None
protected def onStart() {
logInfo("Connecting to " + host + ":" + port)
diff --git a/streaming/src/main/scala/spark/streaming/examples/FlumeEventCount.scala b/streaming/src/main/scala/spark/streaming/examples/FlumeEventCount.scala
index d76c92fdd5..e60ce483a3 100644
--- a/streaming/src/main/scala/spark/streaming/examples/FlumeEventCount.scala
+++ b/streaming/src/main/scala/spark/streaming/examples/FlumeEventCount.scala
@@ -4,19 +4,33 @@ import spark.util.IntParam
import spark.storage.StorageLevel
import spark.streaming._
+/**
+ * Produce a streaming count of events received from Flume.
+ *
+ * This should be used in conjunction with an AvroSink in Flume. It will start
+ * an Avro server at the requested host:port address and listen for requests.
+ * Your Flume AvroSink should be pointed to this address.
+ *
+ * Usage: FlumeEventCount <master> <host> <port>
+ *
+ * <master> is a Spark master URL
+ * <host> is the host the Flume receiver will be started on - a receiver
+ * creates a server and listens for flume events.
+ * <port> is the port the Flume receiver will listen on.
+ */
object FlumeEventCount {
def main(args: Array[String]) {
- if (args.length != 4) {
+ if (args.length != 3) {
System.err.println(
- "Usage: FlumeEventCount <master> <host> <port> <batchMillis>")
+ "Usage: FlumeEventCount <master> <host> <port>")
System.exit(1)
}
- val Array(master, host, IntParam(port), IntParam(batchMillis)) = args
+ val Array(master, host, IntParam(port)) = args
+ val batchInterval = Milliseconds(2000)
// Create the context and set the batch size
- val ssc = new StreamingContext(master, "FlumeEventCount",
- Milliseconds(batchMillis))
+ val ssc = new StreamingContext(master, "FlumeEventCount", batchInterval)
// Create a flume stream
val stream = ssc.flumeStream(host,port,StorageLevel.MEMORY_ONLY)
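The diff view is truncated at the stream creation. An event-counting example of this shape would typically finish by counting each batch, printing the result, and starting the context; the lines below are a sketch under that assumption, not the committed code:

    // Print a per-batch count of the Flume events received.
    stream.count().map(cnt => "Received " + cnt + " flume events.").print()

    ssc.start()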