diff options
Diffstat (limited to 'streaming')
22 files changed, 195 insertions, 15 deletions
diff --git a/streaming/lib/kafka-0.7.2.jar b/streaming/lib/org/apache/kafka/kafka/0.7.2-spark/kafka-0.7.2-spark.jar Binary files differindex 65f79925a4..65f79925a4 100644 --- a/streaming/lib/kafka-0.7.2.jar +++ b/streaming/lib/org/apache/kafka/kafka/0.7.2-spark/kafka-0.7.2-spark.jar diff --git a/streaming/lib/org/apache/kafka/kafka/0.7.2-spark/kafka-0.7.2-spark.jar.md5 b/streaming/lib/org/apache/kafka/kafka/0.7.2-spark/kafka-0.7.2-spark.jar.md5 new file mode 100644 index 0000000000..29f45f4adb --- /dev/null +++ b/streaming/lib/org/apache/kafka/kafka/0.7.2-spark/kafka-0.7.2-spark.jar.md5 @@ -0,0 +1 @@ +18876b8bc2e4cef28b6d191aa49d963f
\ No newline at end of file diff --git a/streaming/lib/org/apache/kafka/kafka/0.7.2-spark/kafka-0.7.2-spark.jar.sha1 b/streaming/lib/org/apache/kafka/kafka/0.7.2-spark/kafka-0.7.2-spark.jar.sha1 new file mode 100644 index 0000000000..e3bd62bac0 --- /dev/null +++ b/streaming/lib/org/apache/kafka/kafka/0.7.2-spark/kafka-0.7.2-spark.jar.sha1 @@ -0,0 +1 @@ +06b27270ffa52250a2c08703b397c99127b72060
\ No newline at end of file diff --git a/streaming/lib/org/apache/kafka/kafka/0.7.2-spark/kafka-0.7.2-spark.pom b/streaming/lib/org/apache/kafka/kafka/0.7.2-spark/kafka-0.7.2-spark.pom new file mode 100644 index 0000000000..082d35726a --- /dev/null +++ b/streaming/lib/org/apache/kafka/kafka/0.7.2-spark/kafka-0.7.2-spark.pom @@ -0,0 +1,9 @@ +<?xml version="1.0" encoding="UTF-8"?> +<project xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd" xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"> + <modelVersion>4.0.0</modelVersion> + <groupId>org.apache.kafka</groupId> + <artifactId>kafka</artifactId> + <version>0.7.2-spark</version> + <description>POM was created from install:install-file</description> +</project> diff --git a/streaming/lib/org/apache/kafka/kafka/0.7.2-spark/kafka-0.7.2-spark.pom.md5 b/streaming/lib/org/apache/kafka/kafka/0.7.2-spark/kafka-0.7.2-spark.pom.md5 new file mode 100644 index 0000000000..92c4132b5b --- /dev/null +++ b/streaming/lib/org/apache/kafka/kafka/0.7.2-spark/kafka-0.7.2-spark.pom.md5 @@ -0,0 +1 @@ +7bc4322266e6032bdf9ef6eebdd8097d
\ No newline at end of file diff --git a/streaming/lib/org/apache/kafka/kafka/0.7.2-spark/kafka-0.7.2-spark.pom.sha1 b/streaming/lib/org/apache/kafka/kafka/0.7.2-spark/kafka-0.7.2-spark.pom.sha1 new file mode 100644 index 0000000000..8a1d8a097a --- /dev/null +++ b/streaming/lib/org/apache/kafka/kafka/0.7.2-spark/kafka-0.7.2-spark.pom.sha1 @@ -0,0 +1 @@ +d0f79e8eff0db43ca7bcf7dce2c8cd2972685c9d
\ No newline at end of file diff --git a/streaming/lib/org/apache/kafka/kafka/maven-metadata-local.xml b/streaming/lib/org/apache/kafka/kafka/maven-metadata-local.xml new file mode 100644 index 0000000000..720cd51c2f --- /dev/null +++ b/streaming/lib/org/apache/kafka/kafka/maven-metadata-local.xml @@ -0,0 +1,12 @@ +<?xml version="1.0" encoding="UTF-8"?> +<metadata> + <groupId>org.apache.kafka</groupId> + <artifactId>kafka</artifactId> + <versioning> + <release>0.7.2-spark</release> + <versions> + <version>0.7.2-spark</version> + </versions> + <lastUpdated>20130121015225</lastUpdated> + </versioning> +</metadata> diff --git a/streaming/lib/org/apache/kafka/kafka/maven-metadata-local.xml.md5 b/streaming/lib/org/apache/kafka/kafka/maven-metadata-local.xml.md5 new file mode 100644 index 0000000000..a4ce5dc9e8 --- /dev/null +++ b/streaming/lib/org/apache/kafka/kafka/maven-metadata-local.xml.md5 @@ -0,0 +1 @@ +e2b9c7c5f6370dd1d21a0aae5e8dcd77
\ No newline at end of file diff --git a/streaming/lib/org/apache/kafka/kafka/maven-metadata-local.xml.sha1 b/streaming/lib/org/apache/kafka/kafka/maven-metadata-local.xml.sha1 new file mode 100644 index 0000000000..b869eaf2a6 --- /dev/null +++ b/streaming/lib/org/apache/kafka/kafka/maven-metadata-local.xml.sha1 @@ -0,0 +1 @@ +2a4341da936b6c07a09383d17ffb185ac558ee91
\ No newline at end of file diff --git a/streaming/pom.xml b/streaming/pom.xml new file mode 100644 index 0000000000..6ee7e59df3 --- /dev/null +++ b/streaming/pom.xml @@ -0,0 +1,144 @@ +<?xml version="1.0" encoding="UTF-8"?> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + <parent> + <groupId>org.spark-project</groupId> + <artifactId>parent</artifactId> + <version>0.7.0-SNAPSHOT</version> + <relativePath>../pom.xml</relativePath> + </parent> + + <groupId>org.spark-project</groupId> + <artifactId>spark-streaming</artifactId> + <packaging>jar</packaging> + <name>Spark Project Streaming</name> + <url>http://spark-project.org/</url> + + <repositories> + <!-- A repository in the local filesystem for the Kafka JAR, which we modified for Scala 2.9 --> + <repository> + <id>lib</id> + <url>file://${project.basedir}/lib</url> + </repository> + </repositories> + + <dependencies> + <dependency> + <groupId>org.eclipse.jetty</groupId> + <artifactId>jetty-server</artifactId> + </dependency> + <dependency> + <groupId>org.codehaus.jackson</groupId> + <artifactId>jackson-mapper-asl</artifactId> + <version>1.9.11</version> + </dependency> + <dependency> + <groupId>org.apache.kafka</groupId> + <artifactId>kafka</artifactId> + <version>0.7.2-spark</version> <!-- Comes from our in-project repository --> + </dependency> + <dependency> + <groupId>org.apache.flume</groupId> + <artifactId>flume-ng-sdk</artifactId> + <version>1.2.0</version> + </dependency> + <dependency> + <groupId>com.github.sgroschupf</groupId> + <artifactId>zkclient</artifactId> + <version>0.1</version> + </dependency> + + <dependency> + <groupId>org.scalatest</groupId> + <artifactId>scalatest_${scala.version}</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.scalacheck</groupId> + <artifactId>scalacheck_${scala.version}</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>com.novocode</groupId> + <artifactId>junit-interface</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-log4j12</artifactId> + <scope>test</scope> + </dependency> + </dependencies> + <build> + <outputDirectory>target/scala-${scala.version}/classes</outputDirectory> + <testOutputDirectory>target/scala-${scala.version}/test-classes</testOutputDirectory> + <plugins> + <plugin> + <groupId>org.scalatest</groupId> + <artifactId>scalatest-maven-plugin</artifactId> + </plugin> + </plugins> + </build> + + <profiles> + <profile> + <id>hadoop1</id> + <dependencies> + <dependency> + <groupId>org.spark-project</groupId> + <artifactId>spark-core</artifactId> + <version>${project.version}</version> + <classifier>hadoop1</classifier> + </dependency> + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-core</artifactId> + <scope>provided</scope> + </dependency> + </dependencies> + <build> + <plugins> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-jar-plugin</artifactId> + <configuration> + <classifier>hadoop1</classifier> + </configuration> + </plugin> + </plugins> + </build> + </profile> + <profile> + <id>hadoop2</id> + <dependencies> + <dependency> + <groupId>org.spark-project</groupId> + <artifactId>spark-core</artifactId> + <version>${project.version}</version> + <classifier>hadoop2</classifier> + </dependency> + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-core</artifactId> + <scope>provided</scope> + </dependency> + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-client</artifactId> + <scope>provided</scope> + </dependency> + </dependencies> + <build> + <plugins> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-jar-plugin</artifactId> + <configuration> + <classifier>hadoop2</classifier> + </configuration> + </plugin> + </plugins> + </build> + </profile> + </profiles> +</project> diff --git a/streaming/src/main/scala/spark/streaming/DStream.scala b/streaming/src/main/scala/spark/streaming/DStream.scala index 07ecb018ee..0eb6aad187 100644 --- a/streaming/src/main/scala/spark/streaming/DStream.scala +++ b/streaming/src/main/scala/spark/streaming/DStream.scala @@ -198,10 +198,10 @@ abstract class DStream[T: ClassManifest] ( metadataCleanerDelay < 0 || rememberDuration.milliseconds < metadataCleanerDelay * 1000, "It seems you are doing some DStream window operation or setting a checkpoint interval " + "which requires " + this.getClass.getSimpleName + " to remember generated RDDs for more " + - "than " + rememberDuration.milliseconds + " milliseconds. But the Spark's metadata cleanup" + - "delay is set to " + (metadataCleanerDelay / 60.0) + " minutes, which is not sufficient. Please set " + - "the Java property 'spark.cleaner.delay' to more than " + - math.ceil(rememberDuration.milliseconds.toDouble / 60000.0).toInt + " minutes." + "than " + rememberDuration.milliseconds / 1000 + " seconds. But Spark's metadata cleanup" + + "delay is set to " + metadataCleanerDelay + " seconds, which is not sufficient. Please " + + "set the Java property 'spark.cleaner.delay' to more than " + + math.ceil(rememberDuration.milliseconds / 1000.0).toInt + " seconds." ) dependencies.foreach(_.validate()) diff --git a/streaming/src/main/scala/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/spark/streaming/StreamingContext.scala index db0461b985..8cfbec51d2 100644 --- a/streaming/src/main/scala/spark/streaming/StreamingContext.scala +++ b/streaming/src/main/scala/spark/streaming/StreamingContext.scala @@ -406,7 +406,7 @@ object StreamingContext { // Set the default cleaner delay to an hour if not already set. // This should be sufficient for even 1 second interval. if (MetadataCleaner.getDelaySeconds < 0) { - MetadataCleaner.setDelaySeconds(60) + MetadataCleaner.setDelaySeconds(3600) } new SparkContext(master, frameworkName) } diff --git a/streaming/src/main/scala/spark/streaming/api/java/JavaStreamingContext.scala b/streaming/src/main/scala/spark/streaming/api/java/JavaStreamingContext.scala index 70d6bd2b1b..5bbf2b084f 100644 --- a/streaming/src/main/scala/spark/streaming/api/java/JavaStreamingContext.scala +++ b/streaming/src/main/scala/spark/streaming/api/java/JavaStreamingContext.scala @@ -34,6 +34,14 @@ class JavaStreamingContext(val ssc: StreamingContext) { this(new StreamingContext(master, frameworkName, batchDuration)) /** + * Creates a StreamingContext. + * @param sparkContext The underlying JavaSparkContext to use + * @param batchDuration The time interval at which streaming data will be divided into batches + */ + def this(sparkContext: JavaSparkContext, batchDuration: Duration) = + this(new StreamingContext(sparkContext.sc, batchDuration)) + + /** * Re-creates a StreamingContext from a checkpoint file. * @param path Path either to the directory that was specified as the checkpoint directory, or * to the checkpoint file 'graph' or 'graph.bk'. diff --git a/streaming/src/main/scala/spark/streaming/dstream/NetworkInputDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/NetworkInputDStream.scala index aa6be95f30..8c322dd698 100644 --- a/streaming/src/main/scala/spark/streaming/dstream/NetworkInputDStream.scala +++ b/streaming/src/main/scala/spark/streaming/dstream/NetworkInputDStream.scala @@ -153,8 +153,8 @@ abstract class NetworkReceiver[T: ClassManifest]() extends Serializable with Log /** A helper actor that communicates with the NetworkInputTracker */ private class NetworkReceiverActor extends Actor { logInfo("Attempting to register with tracker") - val ip = System.getProperty("spark.master.host", "localhost") - val port = System.getProperty("spark.master.port", "7077").toInt + val ip = System.getProperty("spark.driver.host", "localhost") + val port = System.getProperty("spark.driver.port", "7077").toInt val url = "akka://spark@%s:%s/user/NetworkInputTracker".format(ip, port) val tracker = env.actorSystem.actorFor(url) val timeout = 5.seconds diff --git a/streaming/src/main/scala/spark/streaming/dstream/RawInputDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/RawInputDStream.scala index 290fab1ce0..04e6b69b7b 100644 --- a/streaming/src/main/scala/spark/streaming/dstream/RawInputDStream.scala +++ b/streaming/src/main/scala/spark/streaming/dstream/RawInputDStream.scala @@ -1,6 +1,6 @@ package spark.streaming.dstream -import spark.{DaemonThread, Logging} +import spark.Logging import spark.storage.StorageLevel import spark.streaming.StreamingContext @@ -48,7 +48,8 @@ class RawNetworkReceiver(host: String, port: Int, storageLevel: StorageLevel) val queue = new ArrayBlockingQueue[ByteBuffer](2) - blockPushingThread = new DaemonThread { + blockPushingThread = new Thread { + setDaemon(true) override def run() { var nextBlockNumber = 0 while (true) { diff --git a/streaming/src/test/java/JavaAPISuite.java b/streaming/src/test/java/spark/streaming/JavaAPISuite.java index 7a189d85b4..fbe4af4597 100644 --- a/streaming/src/test/java/JavaAPISuite.java +++ b/streaming/src/test/java/spark/streaming/JavaAPISuite.java @@ -43,7 +43,7 @@ public class JavaAPISuite implements Serializable { ssc = null; // To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown - System.clearProperty("spark.master.port"); + System.clearProperty("spark.driver.port"); } /* @Test diff --git a/streaming/src/test/java/JavaTestUtils.scala b/streaming/src/test/java/spark/streaming/JavaTestUtils.scala index 56349837e5..56349837e5 100644 --- a/streaming/src/test/java/JavaTestUtils.scala +++ b/streaming/src/test/java/spark/streaming/JavaTestUtils.scala diff --git a/streaming/src/test/scala/spark/streaming/BasicOperationsSuite.scala b/streaming/src/test/scala/spark/streaming/BasicOperationsSuite.scala index d98b840b8e..c031949dd1 100644 --- a/streaming/src/test/scala/spark/streaming/BasicOperationsSuite.scala +++ b/streaming/src/test/scala/spark/streaming/BasicOperationsSuite.scala @@ -10,7 +10,7 @@ class BasicOperationsSuite extends TestSuiteBase { after { // To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown - System.clearProperty("spark.master.port") + System.clearProperty("spark.driver.port") } test("map") { diff --git a/streaming/src/test/scala/spark/streaming/CheckpointSuite.scala b/streaming/src/test/scala/spark/streaming/CheckpointSuite.scala index 04ccca4c01..7126af62d9 100644 --- a/streaming/src/test/scala/spark/streaming/CheckpointSuite.scala +++ b/streaming/src/test/scala/spark/streaming/CheckpointSuite.scala @@ -21,7 +21,7 @@ class CheckpointSuite extends TestSuiteBase with BeforeAndAfter { FileUtils.deleteDirectory(new File(checkpointDir)) // To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown - System.clearProperty("spark.master.port") + System.clearProperty("spark.driver.port") } var ssc: StreamingContext = null diff --git a/streaming/src/test/scala/spark/streaming/FailureSuite.scala b/streaming/src/test/scala/spark/streaming/FailureSuite.scala index 7493ac1207..c4cfffbfc1 100644 --- a/streaming/src/test/scala/spark/streaming/FailureSuite.scala +++ b/streaming/src/test/scala/spark/streaming/FailureSuite.scala @@ -24,7 +24,7 @@ class FailureSuite extends TestSuiteBase with BeforeAndAfter { FileUtils.deleteDirectory(new File(checkpointDir)) // To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown - System.clearProperty("spark.master.port") + System.clearProperty("spark.driver.port") } override def framework = "CheckpointSuite" diff --git a/streaming/src/test/scala/spark/streaming/InputStreamsSuite.scala b/streaming/src/test/scala/spark/streaming/InputStreamsSuite.scala index aa08ea1141..c442210004 100644 --- a/streaming/src/test/scala/spark/streaming/InputStreamsSuite.scala +++ b/streaming/src/test/scala/spark/streaming/InputStreamsSuite.scala @@ -29,7 +29,7 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter { after { // To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown - System.clearProperty("spark.master.port") + System.clearProperty("spark.driver.port") } diff --git a/streaming/src/test/scala/spark/streaming/WindowOperationsSuite.scala b/streaming/src/test/scala/spark/streaming/WindowOperationsSuite.scala index 0c6e928835..cd9608df53 100644 --- a/streaming/src/test/scala/spark/streaming/WindowOperationsSuite.scala +++ b/streaming/src/test/scala/spark/streaming/WindowOperationsSuite.scala @@ -13,7 +13,7 @@ class WindowOperationsSuite extends TestSuiteBase { after { // To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown - System.clearProperty("spark.master.port") + System.clearProperty("spark.driver.port") } val largerSlideInput = Seq( |