author     Tathagata Das <tathagata.das1565@gmail.com>   2013-02-07 13:59:31 -0800
committer  Tathagata Das <tathagata.das1565@gmail.com>   2013-02-07 13:59:31 -0800
commit     4cc223b4785c9da39c4a35d2adb7339dfa8e47e6 (patch)
tree       0aa12b24f0074f3c6522a8ab3c16bf8ac9097061 /streaming
parent     d55e3aa467ab7d406739255bd8dc3dfc60f3cb16 (diff)
parent     9cfa06837998f30e50b160bc7aaaad3b33a23c5e (diff)
Merge branch 'mesos-master' into streaming
Diffstat (limited to 'streaming')
-rw-r--r--  streaming/lib/org/apache/kafka/kafka/0.7.2-spark/kafka-0.7.2-spark.jar (renamed from streaming/lib/kafka-0.7.2.jar) | bin 1358063 -> 1358063 bytes
-rw-r--r--  streaming/lib/org/apache/kafka/kafka/0.7.2-spark/kafka-0.7.2-spark.jar.md5   | 1
-rw-r--r--  streaming/lib/org/apache/kafka/kafka/0.7.2-spark/kafka-0.7.2-spark.jar.sha1  | 1
-rw-r--r--  streaming/lib/org/apache/kafka/kafka/0.7.2-spark/kafka-0.7.2-spark.pom       | 9
-rw-r--r--  streaming/lib/org/apache/kafka/kafka/0.7.2-spark/kafka-0.7.2-spark.pom.md5   | 1
-rw-r--r--  streaming/lib/org/apache/kafka/kafka/0.7.2-spark/kafka-0.7.2-spark.pom.sha1  | 1
-rw-r--r--  streaming/lib/org/apache/kafka/kafka/maven-metadata-local.xml                | 12
-rw-r--r--  streaming/lib/org/apache/kafka/kafka/maven-metadata-local.xml.md5            | 1
-rw-r--r--  streaming/lib/org/apache/kafka/kafka/maven-metadata-local.xml.sha1           | 1
-rw-r--r--  streaming/pom.xml                                                             | 144
-rw-r--r--  streaming/src/main/scala/spark/streaming/DStream.scala                        | 8
-rw-r--r--  streaming/src/main/scala/spark/streaming/StreamingContext.scala              | 2
-rw-r--r--  streaming/src/main/scala/spark/streaming/api/java/JavaStreamingContext.scala | 8
-rw-r--r--  streaming/src/main/scala/spark/streaming/dstream/NetworkInputDStream.scala   | 4
-rw-r--r--  streaming/src/main/scala/spark/streaming/dstream/RawInputDStream.scala       | 5
-rw-r--r--  streaming/src/test/java/spark/streaming/JavaAPISuite.java (renamed from streaming/src/test/java/JavaAPISuite.java)     | 2
-rw-r--r--  streaming/src/test/java/spark/streaming/JavaTestUtils.scala (renamed from streaming/src/test/java/JavaTestUtils.scala) | 0
-rw-r--r--  streaming/src/test/scala/spark/streaming/BasicOperationsSuite.scala          | 2
-rw-r--r--  streaming/src/test/scala/spark/streaming/CheckpointSuite.scala               | 2
-rw-r--r--  streaming/src/test/scala/spark/streaming/FailureSuite.scala                  | 2
-rw-r--r--  streaming/src/test/scala/spark/streaming/InputStreamsSuite.scala             | 2
-rw-r--r--  streaming/src/test/scala/spark/streaming/WindowOperationsSuite.scala         | 2
22 files changed, 195 insertions, 15 deletions
diff --git a/streaming/lib/kafka-0.7.2.jar b/streaming/lib/org/apache/kafka/kafka/0.7.2-spark/kafka-0.7.2-spark.jar
index 65f79925a4..65f79925a4 100644
--- a/streaming/lib/kafka-0.7.2.jar
+++ b/streaming/lib/org/apache/kafka/kafka/0.7.2-spark/kafka-0.7.2-spark.jar
Binary files differ
diff --git a/streaming/lib/org/apache/kafka/kafka/0.7.2-spark/kafka-0.7.2-spark.jar.md5 b/streaming/lib/org/apache/kafka/kafka/0.7.2-spark/kafka-0.7.2-spark.jar.md5
new file mode 100644
index 0000000000..29f45f4adb
--- /dev/null
+++ b/streaming/lib/org/apache/kafka/kafka/0.7.2-spark/kafka-0.7.2-spark.jar.md5
@@ -0,0 +1 @@
+18876b8bc2e4cef28b6d191aa49d963f
\ No newline at end of file
diff --git a/streaming/lib/org/apache/kafka/kafka/0.7.2-spark/kafka-0.7.2-spark.jar.sha1 b/streaming/lib/org/apache/kafka/kafka/0.7.2-spark/kafka-0.7.2-spark.jar.sha1
new file mode 100644
index 0000000000..e3bd62bac0
--- /dev/null
+++ b/streaming/lib/org/apache/kafka/kafka/0.7.2-spark/kafka-0.7.2-spark.jar.sha1
@@ -0,0 +1 @@
+06b27270ffa52250a2c08703b397c99127b72060
\ No newline at end of file
diff --git a/streaming/lib/org/apache/kafka/kafka/0.7.2-spark/kafka-0.7.2-spark.pom b/streaming/lib/org/apache/kafka/kafka/0.7.2-spark/kafka-0.7.2-spark.pom
new file mode 100644
index 0000000000..082d35726a
--- /dev/null
+++ b/streaming/lib/org/apache/kafka/kafka/0.7.2-spark/kafka-0.7.2-spark.pom
@@ -0,0 +1,9 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd" xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
+ <modelVersion>4.0.0</modelVersion>
+ <groupId>org.apache.kafka</groupId>
+ <artifactId>kafka</artifactId>
+ <version>0.7.2-spark</version>
+ <description>POM was created from install:install-file</description>
+</project>
diff --git a/streaming/lib/org/apache/kafka/kafka/0.7.2-spark/kafka-0.7.2-spark.pom.md5 b/streaming/lib/org/apache/kafka/kafka/0.7.2-spark/kafka-0.7.2-spark.pom.md5
new file mode 100644
index 0000000000..92c4132b5b
--- /dev/null
+++ b/streaming/lib/org/apache/kafka/kafka/0.7.2-spark/kafka-0.7.2-spark.pom.md5
@@ -0,0 +1 @@
+7bc4322266e6032bdf9ef6eebdd8097d
\ No newline at end of file
diff --git a/streaming/lib/org/apache/kafka/kafka/0.7.2-spark/kafka-0.7.2-spark.pom.sha1 b/streaming/lib/org/apache/kafka/kafka/0.7.2-spark/kafka-0.7.2-spark.pom.sha1
new file mode 100644
index 0000000000..8a1d8a097a
--- /dev/null
+++ b/streaming/lib/org/apache/kafka/kafka/0.7.2-spark/kafka-0.7.2-spark.pom.sha1
@@ -0,0 +1 @@
+d0f79e8eff0db43ca7bcf7dce2c8cd2972685c9d
\ No newline at end of file
diff --git a/streaming/lib/org/apache/kafka/kafka/maven-metadata-local.xml b/streaming/lib/org/apache/kafka/kafka/maven-metadata-local.xml
new file mode 100644
index 0000000000..720cd51c2f
--- /dev/null
+++ b/streaming/lib/org/apache/kafka/kafka/maven-metadata-local.xml
@@ -0,0 +1,12 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<metadata>
+ <groupId>org.apache.kafka</groupId>
+ <artifactId>kafka</artifactId>
+ <versioning>
+ <release>0.7.2-spark</release>
+ <versions>
+ <version>0.7.2-spark</version>
+ </versions>
+ <lastUpdated>20130121015225</lastUpdated>
+ </versioning>
+</metadata>
diff --git a/streaming/lib/org/apache/kafka/kafka/maven-metadata-local.xml.md5 b/streaming/lib/org/apache/kafka/kafka/maven-metadata-local.xml.md5
new file mode 100644
index 0000000000..a4ce5dc9e8
--- /dev/null
+++ b/streaming/lib/org/apache/kafka/kafka/maven-metadata-local.xml.md5
@@ -0,0 +1 @@
+e2b9c7c5f6370dd1d21a0aae5e8dcd77
\ No newline at end of file
diff --git a/streaming/lib/org/apache/kafka/kafka/maven-metadata-local.xml.sha1 b/streaming/lib/org/apache/kafka/kafka/maven-metadata-local.xml.sha1
new file mode 100644
index 0000000000..b869eaf2a6
--- /dev/null
+++ b/streaming/lib/org/apache/kafka/kafka/maven-metadata-local.xml.sha1
@@ -0,0 +1 @@
+2a4341da936b6c07a09383d17ffb185ac558ee91
\ No newline at end of file
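
The .md5 and .sha1 sidecar files added above are the standard Maven-repository checksums for the patched Kafka artifact and its metadata. A minimal sketch of verifying one by hand, using only the JDK's MessageDigest (the helper name and file paths are illustrative, not part of this commit):

    import java.io.{File, FileInputStream}
    import java.security.MessageDigest

    // Compute the hex digest of a file; compare the result against the
    // contents of the matching .md5 or .sha1 sidecar.
    // algorithm is "MD5" or "SHA-1".
    def hexDigest(file: File, algorithm: String): String = {
      val md = MessageDigest.getInstance(algorithm)
      val in = new FileInputStream(file)
      val buf = new Array[Byte](8192)
      var n = in.read(buf)
      while (n != -1) { md.update(buf, 0, n); n = in.read(buf) }
      in.close()
      md.digest().map("%02x".format(_)).mkString
    }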
diff --git a/streaming/pom.xml b/streaming/pom.xml
new file mode 100644
index 0000000000..6ee7e59df3
--- /dev/null
+++ b/streaming/pom.xml
@@ -0,0 +1,144 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <parent>
+ <groupId>org.spark-project</groupId>
+ <artifactId>parent</artifactId>
+ <version>0.7.0-SNAPSHOT</version>
+ <relativePath>../pom.xml</relativePath>
+ </parent>
+
+ <groupId>org.spark-project</groupId>
+ <artifactId>spark-streaming</artifactId>
+ <packaging>jar</packaging>
+ <name>Spark Project Streaming</name>
+ <url>http://spark-project.org/</url>
+
+ <repositories>
+ <!-- A repository in the local filesystem for the Kafka JAR, which we modified for Scala 2.9 -->
+ <repository>
+ <id>lib</id>
+ <url>file://${project.basedir}/lib</url>
+ </repository>
+ </repositories>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.eclipse.jetty</groupId>
+ <artifactId>jetty-server</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.codehaus.jackson</groupId>
+ <artifactId>jackson-mapper-asl</artifactId>
+ <version>1.9.11</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.kafka</groupId>
+ <artifactId>kafka</artifactId>
+ <version>0.7.2-spark</version> <!-- Comes from our in-project repository -->
+ </dependency>
+ <dependency>
+ <groupId>org.apache.flume</groupId>
+ <artifactId>flume-ng-sdk</artifactId>
+ <version>1.2.0</version>
+ </dependency>
+ <dependency>
+ <groupId>com.github.sgroschupf</groupId>
+ <artifactId>zkclient</artifactId>
+ <version>0.1</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.scalatest</groupId>
+ <artifactId>scalatest_${scala.version}</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.scalacheck</groupId>
+ <artifactId>scalacheck_${scala.version}</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>com.novocode</groupId>
+ <artifactId>junit-interface</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-log4j12</artifactId>
+ <scope>test</scope>
+ </dependency>
+ </dependencies>
+ <build>
+ <outputDirectory>target/scala-${scala.version}/classes</outputDirectory>
+ <testOutputDirectory>target/scala-${scala.version}/test-classes</testOutputDirectory>
+ <plugins>
+ <plugin>
+ <groupId>org.scalatest</groupId>
+ <artifactId>scalatest-maven-plugin</artifactId>
+ </plugin>
+ </plugins>
+ </build>
+
+ <profiles>
+ <profile>
+ <id>hadoop1</id>
+ <dependencies>
+ <dependency>
+ <groupId>org.spark-project</groupId>
+ <artifactId>spark-core</artifactId>
+ <version>${project.version}</version>
+ <classifier>hadoop1</classifier>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-core</artifactId>
+ <scope>provided</scope>
+ </dependency>
+ </dependencies>
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-jar-plugin</artifactId>
+ <configuration>
+ <classifier>hadoop1</classifier>
+ </configuration>
+ </plugin>
+ </plugins>
+ </build>
+ </profile>
+ <profile>
+ <id>hadoop2</id>
+ <dependencies>
+ <dependency>
+ <groupId>org.spark-project</groupId>
+ <artifactId>spark-core</artifactId>
+ <version>${project.version}</version>
+ <classifier>hadoop2</classifier>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-core</artifactId>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-client</artifactId>
+ <scope>provided</scope>
+ </dependency>
+ </dependencies>
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-jar-plugin</artifactId>
+ <configuration>
+ <classifier>hadoop2</classifier>
+ </configuration>
+ </plugin>
+ </plugins>
+ </build>
+ </profile>
+ </profiles>
+</project>
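
The new pom.xml registers the module's lib/ directory as a file:// repository, so the patched kafka-0.7.2-spark jar resolves like any other dependency, and the hadoop1/hadoop2 profiles select the matching spark-core classifier (picked at build time with -Phadoop1 or -Phadoop2). For comparison only, a hedged sketch of the same in-project-repository arrangement in an sbt build definition (sbt-style syntax assumed; none of this is in the commit):

    // build.sbt sketch: resolve the patched Kafka jar from the Maven
    // layout under the module's lib/ directory, then depend on it by
    // its coordinates, exactly as the pom above does.
    resolvers += "lib" at "file://" + baseDirectory.value + "/lib"

    libraryDependencies += "org.apache.kafka" % "kafka" % "0.7.2-spark"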
diff --git a/streaming/src/main/scala/spark/streaming/DStream.scala b/streaming/src/main/scala/spark/streaming/DStream.scala
index 07ecb018ee..0eb6aad187 100644
--- a/streaming/src/main/scala/spark/streaming/DStream.scala
+++ b/streaming/src/main/scala/spark/streaming/DStream.scala
@@ -198,10 +198,10 @@ abstract class DStream[T: ClassManifest] (
metadataCleanerDelay < 0 || rememberDuration.milliseconds < metadataCleanerDelay * 1000,
"It seems you are doing some DStream window operation or setting a checkpoint interval " +
"which requires " + this.getClass.getSimpleName + " to remember generated RDDs for more " +
- "than " + rememberDuration.milliseconds + " milliseconds. But the Spark's metadata cleanup" +
- "delay is set to " + (metadataCleanerDelay / 60.0) + " minutes, which is not sufficient. Please set " +
- "the Java property 'spark.cleaner.delay' to more than " +
- math.ceil(rememberDuration.milliseconds.toDouble / 60000.0).toInt + " minutes."
+ "than " + rememberDuration.milliseconds / 1000 + " seconds. But Spark's metadata cleanup" +
+ "delay is set to " + metadataCleanerDelay + " seconds, which is not sufficient. Please " +
+ "set the Java property 'spark.cleaner.delay' to more than " +
+ math.ceil(rememberDuration.milliseconds / 1000.0).toInt + " seconds."
)
dependencies.foreach(_.validate())
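
The reworked message reports both quantities in seconds, matching the units of the spark.cleaner.delay property it tells the user to set. A standalone sketch of the same check, with an illustrative value (the remember duration is in milliseconds, so the delay is scaled by 1000 before comparing):

    // e.g. a window operation remembering RDDs for 2 minutes
    val rememberMs = 120000L
    val cleanerDelaySec = System.getProperty("spark.cleaner.delay", "-1").toLong
    assert(
      cleanerDelaySec < 0 || rememberMs < cleanerDelaySec * 1000,
      "set spark.cleaner.delay to more than " +
        math.ceil(rememberMs / 1000.0).toInt + " seconds")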
diff --git a/streaming/src/main/scala/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/spark/streaming/StreamingContext.scala
index db0461b985..8cfbec51d2 100644
--- a/streaming/src/main/scala/spark/streaming/StreamingContext.scala
+++ b/streaming/src/main/scala/spark/streaming/StreamingContext.scala
@@ -406,7 +406,7 @@ object StreamingContext {
// Set the default cleaner delay to an hour if not already set.
// This should be sufficient for even 1 second interval.
if (MetadataCleaner.getDelaySeconds < 0) {
- MetadataCleaner.setDelaySeconds(60)
+ MetadataCleaner.setDelaySeconds(3600)
}
new SparkContext(master, frameworkName)
}
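
The default delay is now a full hour (3600 seconds) rather than one minute, which, per the comment above, should be sufficient even for 1-second batch intervals. An application that wants a different delay can set the property itself; a hedged one-liner (the value is illustrative):

    // Must run before the StreamingContext/SparkContext is constructed,
    // otherwise the 3600-second default above takes effect.
    System.setProperty("spark.cleaner.delay", "7200") // seconds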
diff --git a/streaming/src/main/scala/spark/streaming/api/java/JavaStreamingContext.scala b/streaming/src/main/scala/spark/streaming/api/java/JavaStreamingContext.scala
index 70d6bd2b1b..5bbf2b084f 100644
--- a/streaming/src/main/scala/spark/streaming/api/java/JavaStreamingContext.scala
+++ b/streaming/src/main/scala/spark/streaming/api/java/JavaStreamingContext.scala
@@ -34,6 +34,14 @@ class JavaStreamingContext(val ssc: StreamingContext) {
this(new StreamingContext(master, frameworkName, batchDuration))
/**
+ * Creates a StreamingContext.
+ * @param sparkContext The underlying JavaSparkContext to use
+ * @param batchDuration The time interval at which streaming data will be divided into batches
+ */
+ def this(sparkContext: JavaSparkContext, batchDuration: Duration) =
+ this(new StreamingContext(sparkContext.sc, batchDuration))
+
+ /**
* Re-creates a StreamingContext from a checkpoint file.
* @param path Path either to the directory that was specified as the checkpoint directory, or
* to the checkpoint file 'graph' or 'graph.bk'.
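
The added constructor lets Java-API users wrap an already-configured JavaSparkContext instead of having the streaming context create its own. A hedged usage sketch in Scala (the master string and batch interval are illustrative; package names follow the pre-Apache spark.* layout used in this tree):

    import spark.api.java.JavaSparkContext
    import spark.streaming.Seconds
    import spark.streaming.api.java.JavaStreamingContext

    // Reuse an existing JavaSparkContext for streaming, dividing the
    // input into 1-second batches.
    val sc = new JavaSparkContext("local", "example")
    val ssc = new JavaStreamingContext(sc, Seconds(1))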
diff --git a/streaming/src/main/scala/spark/streaming/dstream/NetworkInputDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/NetworkInputDStream.scala
index aa6be95f30..8c322dd698 100644
--- a/streaming/src/main/scala/spark/streaming/dstream/NetworkInputDStream.scala
+++ b/streaming/src/main/scala/spark/streaming/dstream/NetworkInputDStream.scala
@@ -153,8 +153,8 @@ abstract class NetworkReceiver[T: ClassManifest]() extends Serializable with Log
/** A helper actor that communicates with the NetworkInputTracker */
private class NetworkReceiverActor extends Actor {
logInfo("Attempting to register with tracker")
- val ip = System.getProperty("spark.master.host", "localhost")
- val port = System.getProperty("spark.master.port", "7077").toInt
+ val ip = System.getProperty("spark.driver.host", "localhost")
+ val port = System.getProperty("spark.driver.port", "7077").toInt
val url = "akka://spark@%s:%s/user/NetworkInputTracker".format(ip, port)
val tracker = env.actorSystem.actorFor(url)
val timeout = 5.seconds
diff --git a/streaming/src/main/scala/spark/streaming/dstream/RawInputDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/RawInputDStream.scala
index 290fab1ce0..04e6b69b7b 100644
--- a/streaming/src/main/scala/spark/streaming/dstream/RawInputDStream.scala
+++ b/streaming/src/main/scala/spark/streaming/dstream/RawInputDStream.scala
@@ -1,6 +1,6 @@
package spark.streaming.dstream
-import spark.{DaemonThread, Logging}
+import spark.Logging
import spark.storage.StorageLevel
import spark.streaming.StreamingContext
@@ -48,7 +48,8 @@ class RawNetworkReceiver(host: String, port: Int, storageLevel: StorageLevel)
val queue = new ArrayBlockingQueue[ByteBuffer](2)
- blockPushingThread = new DaemonThread {
+ blockPushingThread = new Thread {
+ setDaemon(true)
override def run() {
var nextBlockNumber = 0
while (true) {
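
With spark.DaemonThread removed by the merge, the receiver inlines the same idiom: mark the thread as a daemon before it starts so it cannot keep the JVM alive on its own. The pattern in isolation (the loop body is elided):

    // setDaemon(true) runs in the anonymous subclass's initializer,
    // i.e. before start() — the only point at which it may be called.
    val blockPusher = new Thread {
      setDaemon(true)
      override def run() {
        // loop, pushing buffered blocks ...
      }
    }
    blockPusher.start()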
diff --git a/streaming/src/test/java/JavaAPISuite.java b/streaming/src/test/java/spark/streaming/JavaAPISuite.java
index 7a189d85b4..fbe4af4597 100644
--- a/streaming/src/test/java/JavaAPISuite.java
+++ b/streaming/src/test/java/spark/streaming/JavaAPISuite.java
@@ -43,7 +43,7 @@ public class JavaAPISuite implements Serializable {
ssc = null;
// To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown
- System.clearProperty("spark.master.port");
+ System.clearProperty("spark.driver.port");
}
/*
@Test
diff --git a/streaming/src/test/java/JavaTestUtils.scala b/streaming/src/test/java/spark/streaming/JavaTestUtils.scala
index 56349837e5..56349837e5 100644
--- a/streaming/src/test/java/JavaTestUtils.scala
+++ b/streaming/src/test/java/spark/streaming/JavaTestUtils.scala
diff --git a/streaming/src/test/scala/spark/streaming/BasicOperationsSuite.scala b/streaming/src/test/scala/spark/streaming/BasicOperationsSuite.scala
index d98b840b8e..c031949dd1 100644
--- a/streaming/src/test/scala/spark/streaming/BasicOperationsSuite.scala
+++ b/streaming/src/test/scala/spark/streaming/BasicOperationsSuite.scala
@@ -10,7 +10,7 @@ class BasicOperationsSuite extends TestSuiteBase {
after {
// To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown
- System.clearProperty("spark.master.port")
+ System.clearProperty("spark.driver.port")
}
test("map") {
diff --git a/streaming/src/test/scala/spark/streaming/CheckpointSuite.scala b/streaming/src/test/scala/spark/streaming/CheckpointSuite.scala
index 04ccca4c01..7126af62d9 100644
--- a/streaming/src/test/scala/spark/streaming/CheckpointSuite.scala
+++ b/streaming/src/test/scala/spark/streaming/CheckpointSuite.scala
@@ -21,7 +21,7 @@ class CheckpointSuite extends TestSuiteBase with BeforeAndAfter {
FileUtils.deleteDirectory(new File(checkpointDir))
// To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown
- System.clearProperty("spark.master.port")
+ System.clearProperty("spark.driver.port")
}
var ssc: StreamingContext = null
diff --git a/streaming/src/test/scala/spark/streaming/FailureSuite.scala b/streaming/src/test/scala/spark/streaming/FailureSuite.scala
index 7493ac1207..c4cfffbfc1 100644
--- a/streaming/src/test/scala/spark/streaming/FailureSuite.scala
+++ b/streaming/src/test/scala/spark/streaming/FailureSuite.scala
@@ -24,7 +24,7 @@ class FailureSuite extends TestSuiteBase with BeforeAndAfter {
FileUtils.deleteDirectory(new File(checkpointDir))
// To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown
- System.clearProperty("spark.master.port")
+ System.clearProperty("spark.driver.port")
}
override def framework = "CheckpointSuite"
diff --git a/streaming/src/test/scala/spark/streaming/InputStreamsSuite.scala b/streaming/src/test/scala/spark/streaming/InputStreamsSuite.scala
index aa08ea1141..c442210004 100644
--- a/streaming/src/test/scala/spark/streaming/InputStreamsSuite.scala
+++ b/streaming/src/test/scala/spark/streaming/InputStreamsSuite.scala
@@ -29,7 +29,7 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
after {
// To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown
- System.clearProperty("spark.master.port")
+ System.clearProperty("spark.driver.port")
}
diff --git a/streaming/src/test/scala/spark/streaming/WindowOperationsSuite.scala b/streaming/src/test/scala/spark/streaming/WindowOperationsSuite.scala
index 0c6e928835..cd9608df53 100644
--- a/streaming/src/test/scala/spark/streaming/WindowOperationsSuite.scala
+++ b/streaming/src/test/scala/spark/streaming/WindowOperationsSuite.scala
@@ -13,7 +13,7 @@ class WindowOperationsSuite extends TestSuiteBase {
after {
// To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown
- System.clearProperty("spark.master.port")
+ System.clearProperty("spark.driver.port")
}
val largerSlideInput = Seq(
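
The same one-line rename (spark.master.port to spark.driver.port) recurs across every suite above because they share the same teardown. As a sketch, the common shape of that teardown inside a suite mixing in ScalaTest's BeforeAndAfter looks like this (ssc is each suite's own StreamingContext field):

    after {
      if (ssc != null) ssc.stop()
      // Akka does not unbind its port immediately on shutdown, so drop
      // the recorded driver port to keep the next test from trying to
      // rebind to it.
      System.clearProperty("spark.driver.port")
    }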