From a96b4ef761aa80310b194176f41a088c5bf6274a Mon Sep 17 00:00:00 2001
From: seanm
Date: Mon, 15 Jul 2013 19:13:17 -0600
Subject: Adding tgz option to make-distribution.sh

---
 make-distribution.sh | 24 +++++++++++++++++++++---
 1 file changed, 21 insertions(+), 3 deletions(-)

(limited to 'make-distribution.sh')

diff --git a/make-distribution.sh b/make-distribution.sh
index feb13d52f9..ef3d2529d0 100755
--- a/make-distribution.sh
+++ b/make-distribution.sh
@@ -1,4 +1,4 @@
-#!/bin/bash
+#!/usr/bin/env bash
 #
 # Script to create a binary distribution for easy deploys of Spark.
 # The distribution directory defaults to dist/ but can be overridden below.
@@ -6,6 +6,10 @@
 # so it is completely self contained.
 # It does not contain source or *.class files.
 #
+# Arguments
+# (none): Creates dist/ directory
+# tgz: Additionally creates spark-$VERSION-bin.tar.gz
+#
 # Recommended deploy/testing procedure (standalone mode):
 # 1) Rsync / deploy the dist/ dir to one host
 # 2) cd to deploy dir; ./bin/start-master.sh
@@ -19,8 +23,14 @@ DISTDIR="$FWDIR/dist"

 # Get version from SBT
 export TERM=dumb # Prevents color codes in SBT output
-VERSION=$($FWDIR/sbt/sbt "show version" | tail -1 | cut -f 2)
-echo "Making distribution for Spark $VERSION in $DISTDIR..."
+VERSION=$($FWDIR/sbt/sbt "show version" | tail -1 | cut -f 2 | sed 's/^\([a-zA-Z0-9.-]*\).*/\1/')
+
+if [ "$1" == "tgz" ]; then
+  echo "Making spark-$VERSION-bin.tar.gz"
+else
+  echo "Making distribution for Spark $VERSION in $DISTDIR..."
+fi
+

 # Build fat JAR
 $FWDIR/sbt/sbt "repl/assembly"
@@ -37,3 +47,11 @@ cp $FWDIR/repl/target/*.jar "$DISTDIR/jars/"
 cp -r "$FWDIR/bin" "$DISTDIR"
 cp -r "$FWDIR/conf" "$DISTDIR"
 cp "$FWDIR/run" "$FWDIR/spark-shell" "$DISTDIR"
+
+
+if [ "$1" == "tgz" ]; then
+  TARDIR="$FWDIR/spark-$VERSION"
+  cp -r $DISTDIR $TARDIR
+  tar -zcf spark-$VERSION-bin.tar.gz -C $FWDIR spark-$VERSION
+  rm -rf $TARDIR
+fi
--
cgit v1.2.3


From f6f46455eb4f1199eb9a464b1a0b45d9926f7ff8 Mon Sep 17 00:00:00 2001
From: Benjamin Hindman
Date: Tue, 23 Jul 2013 09:33:13 -0400
Subject: Added property 'spark.executor.uri' for launching on Mesos without
 requiring Spark to be installed.

Using 'make-distribution.sh' a user can put a Spark distribution at a URI
supported by Mesos (e.g., 'hdfs://...') and then set that when launching
their job. Also added SPARK_EXECUTOR_URI for the REPL.
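As a rough sketch of the workflow this enables (the HDFS path, version string, master URL, and the use of SPARK_JAVA_OPTS below are illustrative assumptions, not taken from the patch):

    # Build a binary distribution and publish it somewhere Mesos slaves can fetch it
    ./make-distribution.sh tgz
    hadoop fs -put spark-0.8.0-SNAPSHOT-bin.tar.gz /spark/spark-0.8.0-SNAPSHOT-bin.tar.gz

    # Point executors at the uploaded tarball when launching a job, e.g. by passing
    # the new property as a JVM system property (here via SPARK_JAVA_OPTS)
    export SPARK_JAVA_OPTS="-Dspark.executor.uri=hdfs:///spark/spark-0.8.0-SNAPSHOT-bin.tar.gz"

    # For the REPL, the new SPARK_EXECUTOR_URI environment variable does the same thing
    SPARK_EXECUTOR_URI=hdfs:///spark/spark-0.8.0-SNAPSHOT-bin.tar.gz \
      MASTER=mesos://mesos-master:5050 ./spark-shell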
--- .../mesos/CoarseMesosSchedulerBackend.scala | 27 ++++++++++++++++------ .../scheduler/mesos/MesosSchedulerBackend.scala | 17 ++++++++++---- make-distribution.sh | 1 + project/SparkBuild.scala | 2 +- repl/src/main/scala/spark/repl/SparkILoop.scala | 2 ++ 5 files changed, 36 insertions(+), 13 deletions(-) (limited to 'make-distribution.sh') diff --git a/core/src/main/scala/spark/scheduler/mesos/CoarseMesosSchedulerBackend.scala b/core/src/main/scala/spark/scheduler/mesos/CoarseMesosSchedulerBackend.scala index 7bc6040544..f75244a9ac 100644 --- a/core/src/main/scala/spark/scheduler/mesos/CoarseMesosSchedulerBackend.scala +++ b/core/src/main/scala/spark/scheduler/mesos/CoarseMesosSchedulerBackend.scala @@ -110,12 +110,6 @@ private[spark] class CoarseMesosSchedulerBackend( } def createCommand(offer: Offer, numCores: Int): CommandInfo = { - val runScript = new File(sparkHome, "run").getCanonicalPath - val driverUrl = "akka://spark@%s:%s/user/%s".format( - System.getProperty("spark.driver.host"), System.getProperty("spark.driver.port"), - StandaloneSchedulerBackend.ACTOR_NAME) - val command = "\"%s\" spark.executor.StandaloneExecutorBackend %s %s %s %d".format( - runScript, driverUrl, offer.getSlaveId.getValue, offer.getHostname, numCores) val environment = Environment.newBuilder() sc.executorEnvs.foreach { case (key, value) => environment.addVariables(Environment.Variable.newBuilder() @@ -123,7 +117,26 @@ private[spark] class CoarseMesosSchedulerBackend( .setValue(value) .build()) } - return CommandInfo.newBuilder().setValue(command).setEnvironment(environment).build() + val command = CommandInfo.newBuilder() + .setEnvironment(environment) + val driverUrl = "akka://spark@%s:%s/user/%s".format( + System.getProperty("spark.driver.host"), + System.getProperty("spark.driver.port"), + StandaloneSchedulerBackend.ACTOR_NAME) + val uri = System.getProperty("spark.executor.uri") + if (uri == null) { + val runScript = new File(sparkHome, "run").getCanonicalPath + command.setValue("\"%s\" spark.executor.StandaloneExecutorBackend %s %s %s %d".format( + runScript, driverUrl, offer.getSlaveId.getValue, offer.getHostname, numCores)) + } else { + // Grab everything to the first '.'. We'll use that and '*' to + // glob the directory "correctly". 
+ val basename = new File(uri).getName().split('.')(0) + command.setValue("cd %s*; ./run spark.executor.StandaloneExecutorBackend %s %s %s %d".format( + basename, driverUrl, offer.getSlaveId.getValue, offer.getHostname, numCores)) + command.addUris(CommandInfo.URI.newBuilder().setValue(uri)) + } + return command.build() } override def offerRescinded(d: SchedulerDriver, o: OfferID) {} diff --git a/core/src/main/scala/spark/scheduler/mesos/MesosSchedulerBackend.scala b/core/src/main/scala/spark/scheduler/mesos/MesosSchedulerBackend.scala index 75b8268b55..51b780ac72 100644 --- a/core/src/main/scala/spark/scheduler/mesos/MesosSchedulerBackend.scala +++ b/core/src/main/scala/spark/scheduler/mesos/MesosSchedulerBackend.scala @@ -89,7 +89,6 @@ private[spark] class MesosSchedulerBackend( val sparkHome = sc.getSparkHome().getOrElse(throw new SparkException( "Spark home is not set; set it through the spark.home system " + "property, the SPARK_HOME environment variable or the SparkContext constructor")) - val execScript = new File(sparkHome, "spark-executor").getCanonicalPath val environment = Environment.newBuilder() sc.executorEnvs.foreach { case (key, value) => environment.addVariables(Environment.Variable.newBuilder() @@ -97,15 +96,23 @@ private[spark] class MesosSchedulerBackend( .setValue(value) .build()) } + val command = CommandInfo.newBuilder() + .setEnvironment(environment) + val uri = System.getProperty("spark.executor.uri") + if (uri == null) { + command.setValue(new File(sparkHome, "spark-executor").getCanonicalPath) + } else { + // Grab everything to the first '.'. We'll use that and '*' to + // glob the directory "correctly". + val basename = new File(uri).getName().split('.')(0) + command.setValue("cd %s*; ./spark-executor".format(basename)) + command.addUris(CommandInfo.URI.newBuilder().setValue(uri)) + } val memory = Resource.newBuilder() .setName("mem") .setType(Value.Type.SCALAR) .setScalar(Value.Scalar.newBuilder().setValue(executorMemory).build()) .build() - val command = CommandInfo.newBuilder() - .setValue(execScript) - .setEnvironment(environment) - .build() ExecutorInfo.newBuilder() .setExecutorId(ExecutorID.newBuilder().setValue(execId).build()) .setCommand(command) diff --git a/make-distribution.sh b/make-distribution.sh index 4374e0e8c4..0a8941c1f8 100755 --- a/make-distribution.sh +++ b/make-distribution.sh @@ -66,6 +66,7 @@ cp $FWDIR/repl/target/*.jar "$DISTDIR/jars/" cp -r "$FWDIR/bin" "$DISTDIR" cp -r "$FWDIR/conf" "$DISTDIR" cp "$FWDIR/run" "$FWDIR/spark-shell" "$DISTDIR" +cp "$FWDIR/spark-executor" "$DISTDIR" if [ "$1" == "tgz" ]; then diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala index 9920e00a67..a2ea9c9694 100644 --- a/project/SparkBuild.scala +++ b/project/SparkBuild.scala @@ -176,7 +176,7 @@ object SparkBuild extends Build { "it.unimi.dsi" % "fastutil" % "6.4.4", "colt" % "colt" % "1.2.0", "net.liftweb" % "lift-json_2.9.2" % "2.5", - "org.apache.mesos" % "mesos" % "0.9.0-incubating", + "org.apache.mesos" % "mesos" % "0.12.0-incubating", "io.netty" % "netty-all" % "4.0.0.Beta2", "org.apache.derby" % "derby" % "10.4.2.0" % "test", "com.codahale.metrics" % "metrics-core" % "3.0.0", diff --git a/repl/src/main/scala/spark/repl/SparkILoop.scala b/repl/src/main/scala/spark/repl/SparkILoop.scala index 59f9d05683..0bfe7bb743 100644 --- a/repl/src/main/scala/spark/repl/SparkILoop.scala +++ b/repl/src/main/scala/spark/repl/SparkILoop.scala @@ -831,6 +831,8 @@ class SparkILoop(in0: Option[BufferedReader], val out: PrintWriter, val master: var 
sparkContext: SparkContext = null def createSparkContext(): SparkContext = { + val uri = System.getenv("SPARK_EXECUTOR_URI") + if (uri != null) System.setProperty("spark.executor.uri", uri) val master = this.master match { case Some(m) => m case None => { -- cgit v1.2.3 From 3f98eff63a3df35f6dc56f0786c828cdbe4ffcf1 Mon Sep 17 00:00:00 2001 From: Jey Kottalam Date: Wed, 14 Aug 2013 17:34:34 -0700 Subject: Allow make-distribution.sh to specify Hadoop version used --- make-distribution.sh | 53 ++++++++++++++++++++++++++++++++++++++---------- project/SparkBuild.scala | 28 +++++++++++++++++++------ 2 files changed, 64 insertions(+), 17 deletions(-) (limited to 'make-distribution.sh') diff --git a/make-distribution.sh b/make-distribution.sh index 0a8941c1f8..a101024de5 100755 --- a/make-distribution.sh +++ b/make-distribution.sh @@ -24,9 +24,10 @@ # so it is completely self contained. # It does not contain source or *.class files. # -# Arguments -# (none): Creates dist/ directory -# tgz: Additionally creates spark-$VERSION-bin.tar.gz +# Optional Arguments +# --tgz: Additionally creates spark-$VERSION-bin.tar.gz +# --hadoop VERSION: Builds against specified version of Hadoop. +# --with-yarn: Enables support for Hadoop YARN. # # Recommended deploy/testing procedure (standalone mode): # 1) Rsync / deploy the dist/ dir to one host @@ -44,20 +45,50 @@ DISTDIR="$FWDIR/dist" export TERM=dumb # Prevents color codes in SBT output VERSION=$($FWDIR/sbt/sbt "show version" | tail -1 | cut -f 2 | sed 's/^\([a-zA-Z0-9.-]*\).*/\1/') -if [ "$1" == "tgz" ]; then - echo "Making spark-$VERSION-bin.tar.gz" +# Initialize defaults +SPARK_HADOOP_VERSION=1.2.1 +SPARK_YARN_MODE=false +MAKE_TGZ=false + +# Parse arguments +while (( "$#" )); do + case $1 in + --hadoop) + SPARK_HADOOP_VERSION="$2" + shift + ;; + --with-yarn) + SPARK_YARN_MODE=true + ;; + --tgz) + MAKE_TGZ=true + ;; + esac + shift +done + +if [ "$MAKE_TGZ" == "true" ]; then + echo "Making spark-$VERSION-hadoop_$SPARK_HADOOP_VERSION-bin.tar.gz" else echo "Making distribution for Spark $VERSION in $DISTDIR..." fi +echo "Hadoop version set to $SPARK_HADOOP_VERSION" +if [ "$SPARK_YARN_MODE" == "true" ]; then + echo "YARN enabled" +else + echo "YARN disabled" +fi # Build fat JAR -$FWDIR/sbt/sbt "repl/assembly" +export SPARK_HADOOP_VERSION +export SPARK_YARN_MODE +"$FWDIR/sbt/sbt" "repl/assembly" # Make directories rm -rf "$DISTDIR" mkdir -p "$DISTDIR/jars" -echo "$VERSION" >$DISTDIR/RELEASE +echo "$VERSION" > "$DISTDIR/RELEASE" # Copy jars cp $FWDIR/repl/target/*.jar "$DISTDIR/jars/" @@ -69,9 +100,9 @@ cp "$FWDIR/run" "$FWDIR/spark-shell" "$DISTDIR" cp "$FWDIR/spark-executor" "$DISTDIR" -if [ "$1" == "tgz" ]; then +if [ "$MAKE_TGZ" == "true" ]; then TARDIR="$FWDIR/spark-$VERSION" - cp -r $DISTDIR $TARDIR - tar -zcf spark-$VERSION-bin.tar.gz -C $FWDIR spark-$VERSION - rm -rf $TARDIR + cp -r "$DISTDIR" "$TARDIR" + tar -zcf "spark-$VERSION-hadoop_$SPARK_HADOOP_VERSION-bin.tar.gz" -C "$FWDIR" "spark-$VERSION" + rm -rf "$TARDIR" fi diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala index 350a36a964..23c7179919 100644 --- a/project/SparkBuild.scala +++ b/project/SparkBuild.scala @@ -24,10 +24,15 @@ import AssemblyKeys._ //import com.jsuereth.pgp.sbtplugin.PgpKeys._ object SparkBuild extends Build { + // Hadoop version to build against. For example, "0.20.2", "0.20.205.0", or + // "1.0.4" for Apache releases, or "0.20.2-cdh3u5" for Cloudera Hadoop. + val HADOOP_VERSION = "1.2.1" + val HADOOP_YARN = false + // HBase version; set as appropriate. 
val HBASE_VERSION = "0.94.6" - lazy val root = Project("root", file("."), settings = rootSettings) aggregate(core, repl, examples, bagel, streaming, mllib, tools, yarn) + lazy val root = Project("root", file("."), settings = rootSettings) aggregate(allProjects:_*) lazy val core = Project("core", file("core"), settings = coreSettings) @@ -49,6 +54,17 @@ object SparkBuild extends Build { lazy val MavenCompile = config("m2r") extend(Compile) lazy val publishLocalBoth = TaskKey[Unit]("publish-local", "publish local for m2 and ivy") + // Allows build configuration to be set through environment variables + lazy val hadoopVersion = scala.util.Properties.envOrElse("SPARK_HADOOP_VERSION", HADOOP_VERSION) + lazy val isYarnMode = scala.util.Properties.envOrNone("SPARK_YARN_MODE") match { + case None => HADOOP_YARN + case Some(v) => v.toBoolean + } + + // Conditionally include the yarn sub-project + lazy val maybeYarn = if(isYarnMode) Seq[ProjectReference](yarn) else Seq[ProjectReference]() + lazy val allProjects = Seq[ProjectReference](core, repl, examples, bagel, streaming, mllib, tools) ++ maybeYarn + def sharedSettings = Defaults.defaultSettings ++ Seq( organization := "org.spark-project", version := "0.8.0-SNAPSHOT", @@ -170,7 +186,7 @@ object SparkBuild extends Build { "org.apache.mesos" % "mesos" % "0.12.1", "io.netty" % "netty-all" % "4.0.0.Beta2", "org.apache.derby" % "derby" % "10.4.2.0" % "test", - "org.apache.hadoop" % "hadoop-client" % "1.2.1" excludeAll(excludeJackson, excludeNetty, excludeAsm), + "org.apache.hadoop" % "hadoop-client" % hadoopVersion excludeAll(excludeJackson, excludeNetty, excludeAsm), "com.codahale.metrics" % "metrics-core" % "3.0.0", "com.codahale.metrics" % "metrics-jvm" % "3.0.0", "com.codahale.metrics" % "metrics-json" % "3.0.0", @@ -239,10 +255,10 @@ object SparkBuild extends Build { name := "spark-yarn", libraryDependencies ++= Seq( // Exclude rule required for all ? 
- "org.apache.hadoop" % "hadoop-client" % "2.0.2-alpha" excludeAll(excludeJackson, excludeNetty, excludeAsm), - "org.apache.hadoop" % "hadoop-yarn-api" % "2.0.2-alpha" excludeAll(excludeJackson, excludeNetty, excludeAsm), - "org.apache.hadoop" % "hadoop-yarn-common" % "2.0.2-alpha" excludeAll(excludeJackson, excludeNetty, excludeAsm), - "org.apache.hadoop" % "hadoop-yarn-client" % "2.0.2-alpha" excludeAll(excludeJackson, excludeNetty, excludeAsm) + "org.apache.hadoop" % "hadoop-client" % hadoopVersion excludeAll(excludeJackson, excludeNetty, excludeAsm), + "org.apache.hadoop" % "hadoop-yarn-api" % hadoopVersion excludeAll(excludeJackson, excludeNetty, excludeAsm), + "org.apache.hadoop" % "hadoop-yarn-common" % hadoopVersion excludeAll(excludeJackson, excludeNetty, excludeAsm), + "org.apache.hadoop" % "hadoop-yarn-client" % hadoopVersion excludeAll(excludeJackson, excludeNetty, excludeAsm) ) ) ++ assemblySettings ++ extraAssemblySettings -- cgit v1.2.3 From 67b593607c7df934d5a73012fe9cce220b25f321 Mon Sep 17 00:00:00 2001 From: Jey Kottalam Date: Fri, 16 Aug 2013 13:53:16 -0700 Subject: Rename YARN build flag to SPARK_WITH_YARN --- make-distribution.sh | 8 ++++---- project/SparkBuild.scala | 12 +++++++----- 2 files changed, 11 insertions(+), 9 deletions(-) (limited to 'make-distribution.sh') diff --git a/make-distribution.sh b/make-distribution.sh index a101024de5..55dc22b992 100755 --- a/make-distribution.sh +++ b/make-distribution.sh @@ -47,7 +47,7 @@ VERSION=$($FWDIR/sbt/sbt "show version" | tail -1 | cut -f 2 | sed 's/^\([a-zA-Z # Initialize defaults SPARK_HADOOP_VERSION=1.2.1 -SPARK_YARN_MODE=false +SPARK_WITH_YARN=false MAKE_TGZ=false # Parse arguments @@ -58,7 +58,7 @@ while (( "$#" )); do shift ;; --with-yarn) - SPARK_YARN_MODE=true + SPARK_WITH_YARN=true ;; --tgz) MAKE_TGZ=true @@ -74,7 +74,7 @@ else fi echo "Hadoop version set to $SPARK_HADOOP_VERSION" -if [ "$SPARK_YARN_MODE" == "true" ]; then +if [ "$SPARK_WITH_YARN" == "true" ]; then echo "YARN enabled" else echo "YARN disabled" @@ -82,7 +82,7 @@ fi # Build fat JAR export SPARK_HADOOP_VERSION -export SPARK_YARN_MODE +export SPARK_WITH_YARN "$FWDIR/sbt/sbt" "repl/assembly" # Make directories diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala index 4023626c16..cea982b886 100644 --- a/project/SparkBuild.scala +++ b/project/SparkBuild.scala @@ -26,8 +26,10 @@ import AssemblyKeys._ object SparkBuild extends Build { // Hadoop version to build against. For example, "0.20.2", "0.20.205.0", or // "1.0.4" for Apache releases, or "0.20.2-cdh3u5" for Cloudera Hadoop. - val HADOOP_VERSION = "1.2.1" - val HADOOP_YARN = false + // Note that these variables can be set through the environment variables + // SPARK_HADOOP_VERSION and SPARK_WITH_YARN. + val DEFAULT_HADOOP_VERSION = "1.2.1" + val DEFAULT_WITH_YARN = false // HBase version; set as appropriate. 
val HBASE_VERSION = "0.94.6" @@ -55,9 +57,9 @@ object SparkBuild extends Build { lazy val publishLocalBoth = TaskKey[Unit]("publish-local", "publish local for m2 and ivy") // Allows build configuration to be set through environment variables - lazy val hadoopVersion = scala.util.Properties.envOrElse("SPARK_HADOOP_VERSION", HADOOP_VERSION) - lazy val isYarnMode = scala.util.Properties.envOrNone("SPARK_YARN_MODE") match { - case None => HADOOP_YARN + lazy val hadoopVersion = scala.util.Properties.envOrElse("SPARK_HADOOP_VERSION", DEFAULT_HADOOP_VERSION) + lazy val isYarnMode = scala.util.Properties.envOrNone("SPARK_WITH_YARN") match { + case None => DEFAULT_WITH_YARN case Some(v) => v.toBoolean } -- cgit v1.2.3 From 111b2741fd4bacd5f0b31add22acd28d7d884299 Mon Sep 17 00:00:00 2001 From: Matei Zaharia Date: Wed, 21 Aug 2013 11:54:10 -0700 Subject: Change default SPARK_HADOOP_VERSION in make-distribution.sh too --- make-distribution.sh | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'make-distribution.sh') diff --git a/make-distribution.sh b/make-distribution.sh index 55dc22b992..70aff418c7 100755 --- a/make-distribution.sh +++ b/make-distribution.sh @@ -46,7 +46,7 @@ export TERM=dumb # Prevents color codes in SBT output VERSION=$($FWDIR/sbt/sbt "show version" | tail -1 | cut -f 2 | sed 's/^\([a-zA-Z0-9.-]*\).*/\1/') # Initialize defaults -SPARK_HADOOP_VERSION=1.2.1 +SPARK_HADOOP_VERSION=1.0.4 SPARK_WITH_YARN=false MAKE_TGZ=false -- cgit v1.2.3 From 53cd50c0699efc8733518658100c62426b425de2 Mon Sep 17 00:00:00 2001 From: Matei Zaharia Date: Fri, 23 Aug 2013 23:30:17 -0700 Subject: Change build and run instructions to use assemblies This commit makes Spark invocation saner by using an assembly JAR to find all of Spark's dependencies instead of adding all the JARs in lib_managed. It also packages the examples into an assembly and uses that as SPARK_EXAMPLES_JAR. Finally, it replaces the old "run" script with two better-named scripts: "run-examples" for examples, and "spark-class" for Spark internal classes (e.g. REPL, master, etc). This is also designed to minimize the confusion people have in trying to use "run" to run their own classes; it's not meant to do that, but now at least if they look at it, they can modify run-examples to do a decent job for them. As part of this, Bagel's examples are also now properly moved to the examples package instead of bagel. 
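In day-to-day use the split looks like this (the invocations below mirror the documentation changes in this patch):

    # Examples now go through run-example instead of the old ./run script
    ./run-example spark.examples.SparkLR local[2]

    # Spark's internal classes (standalone master/worker, YARN client, REPL, etc.)
    # are launched through spark-class
    ./spark-class spark.deploy.master.Master
    ./spark-class spark.deploy.worker.Worker spark://IP:PORT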
--- README.md | 6 +- .../scala/spark/bagel/examples/PageRankUtils.scala | 123 ------------ .../spark/bagel/examples/WikipediaPageRank.scala | 101 ---------- .../examples/WikipediaPageRankStandalone.scala | 223 --------------------- bin/compute-classpath.sh | 82 ++------ bin/spark-daemon.sh | 8 +- .../mesos/CoarseMesosSchedulerBackend.scala | 2 +- core/src/test/scala/spark/DriverSuite.scala | 2 +- docs/bagel-programming-guide.md | 2 +- docs/index.md | 4 +- docs/java-programming-guide.md | 2 +- docs/running-on-yarn.md | 8 +- docs/scala-programming-guide.md | 2 +- docs/spark-standalone.md | 4 +- docs/streaming-programming-guide.md | 4 +- examples/pom.xml | 5 + .../scala/spark/examples/bagel/PageRankUtils.scala | 123 ++++++++++++ .../spark/examples/bagel/WikipediaPageRank.scala | 101 ++++++++++ .../bagel/WikipediaPageRankStandalone.scala | 223 +++++++++++++++++++++ make-distribution.sh | 17 +- project/SparkBuild.scala | 52 +++-- project/build.properties | 2 +- project/plugins.sbt | 2 +- python/pyspark/java_gateway.py | 2 +- run | 175 ---------------- run-example | 76 +++++++ spark-class | 124 ++++++++++++ spark-executor | 2 +- spark-shell | 3 +- 29 files changed, 734 insertions(+), 746 deletions(-) delete mode 100644 bagel/src/main/scala/spark/bagel/examples/PageRankUtils.scala delete mode 100644 bagel/src/main/scala/spark/bagel/examples/WikipediaPageRank.scala delete mode 100644 bagel/src/main/scala/spark/bagel/examples/WikipediaPageRankStandalone.scala create mode 100644 examples/src/main/scala/spark/examples/bagel/PageRankUtils.scala create mode 100644 examples/src/main/scala/spark/examples/bagel/WikipediaPageRank.scala create mode 100644 examples/src/main/scala/spark/examples/bagel/WikipediaPageRankStandalone.scala delete mode 100755 run create mode 100755 run-example create mode 100755 spark-class (limited to 'make-distribution.sh') diff --git a/README.md b/README.md index e5f527b84a..e7af34f513 100644 --- a/README.md +++ b/README.md @@ -20,16 +20,16 @@ Spark and its example programs, run: Spark also supports building using Maven. If you would like to build using Maven, see the [instructions for building Spark with Maven](http://spark-project.org/docs/latest/building-with-maven.html) -in the spark documentation.. +in the Spark documentation.. To run Spark, you will need to have Scala's bin directory in your `PATH`, or you will need to set the `SCALA_HOME` environment variable to point to where you've installed Scala. Scala must be accessible through one of these methods on your cluster's worker nodes as well as its master. -To run one of the examples, use `./run `. For example: +To run one of the examples, use `./run-example `. For example: - ./run spark.examples.SparkLR local[2] + ./run-example spark.examples.SparkLR local[2] will run the Logistic Regression example locally on 2 CPUs. diff --git a/bagel/src/main/scala/spark/bagel/examples/PageRankUtils.scala b/bagel/src/main/scala/spark/bagel/examples/PageRankUtils.scala deleted file mode 100644 index de65e27fe0..0000000000 --- a/bagel/src/main/scala/spark/bagel/examples/PageRankUtils.scala +++ /dev/null @@ -1,123 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package spark.bagel.examples - -import spark._ -import spark.SparkContext._ - -import spark.bagel._ -import spark.bagel.Bagel._ - -import scala.collection.mutable.ArrayBuffer - -import java.io.{InputStream, OutputStream, DataInputStream, DataOutputStream} - -import com.esotericsoftware.kryo._ - -class PageRankUtils extends Serializable { - def computeWithCombiner(numVertices: Long, epsilon: Double)( - self: PRVertex, messageSum: Option[Double], superstep: Int - ): (PRVertex, Array[PRMessage]) = { - val newValue = messageSum match { - case Some(msgSum) if msgSum != 0 => - 0.15 / numVertices + 0.85 * msgSum - case _ => self.value - } - - val terminate = superstep >= 10 - - val outbox: Array[PRMessage] = - if (!terminate) - self.outEdges.map(targetId => - new PRMessage(targetId, newValue / self.outEdges.size)) - else - Array[PRMessage]() - - (new PRVertex(newValue, self.outEdges, !terminate), outbox) - } - - def computeNoCombiner(numVertices: Long, epsilon: Double)(self: PRVertex, messages: Option[Array[PRMessage]], superstep: Int): (PRVertex, Array[PRMessage]) = - computeWithCombiner(numVertices, epsilon)(self, messages match { - case Some(msgs) => Some(msgs.map(_.value).sum) - case None => None - }, superstep) -} - -class PRCombiner extends Combiner[PRMessage, Double] with Serializable { - def createCombiner(msg: PRMessage): Double = - msg.value - def mergeMsg(combiner: Double, msg: PRMessage): Double = - combiner + msg.value - def mergeCombiners(a: Double, b: Double): Double = - a + b -} - -class PRVertex() extends Vertex with Serializable { - var value: Double = _ - var outEdges: Array[String] = _ - var active: Boolean = _ - - def this(value: Double, outEdges: Array[String], active: Boolean = true) { - this() - this.value = value - this.outEdges = outEdges - this.active = active - } - - override def toString(): String = { - "PRVertex(value=%f, outEdges.length=%d, active=%s)".format(value, outEdges.length, active.toString) - } -} - -class PRMessage() extends Message[String] with Serializable { - var targetId: String = _ - var value: Double = _ - - def this(targetId: String, value: Double) { - this() - this.targetId = targetId - this.value = value - } -} - -class PRKryoRegistrator extends KryoRegistrator { - def registerClasses(kryo: Kryo) { - kryo.register(classOf[PRVertex]) - kryo.register(classOf[PRMessage]) - } -} - -class CustomPartitioner(partitions: Int) extends Partitioner { - def numPartitions = partitions - - def getPartition(key: Any): Int = { - val hash = key match { - case k: Long => (k & 0x00000000FFFFFFFFL).toInt - case _ => key.hashCode - } - - val mod = key.hashCode % partitions - if (mod < 0) mod + partitions else mod - } - - override def equals(other: Any): Boolean = other match { - case c: CustomPartitioner => - c.numPartitions == numPartitions - case _ => false - } -} diff --git a/bagel/src/main/scala/spark/bagel/examples/WikipediaPageRank.scala b/bagel/src/main/scala/spark/bagel/examples/WikipediaPageRank.scala deleted file mode 100644 index a0c5ac9c18..0000000000 --- a/bagel/src/main/scala/spark/bagel/examples/WikipediaPageRank.scala +++ /dev/null 
@@ -1,101 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package spark.bagel.examples - -import spark._ -import spark.SparkContext._ - -import spark.bagel._ -import spark.bagel.Bagel._ - -import scala.xml.{XML,NodeSeq} - -/** - * Run PageRank on XML Wikipedia dumps from http://wiki.freebase.com/wiki/WEX. Uses the "articles" - * files from there, which contains one line per wiki article in a tab-separated format - * (http://wiki.freebase.com/wiki/WEX/Documentation#articles). - */ -object WikipediaPageRank { - def main(args: Array[String]) { - if (args.length < 5) { - System.err.println("Usage: WikipediaPageRank ") - System.exit(-1) - } - - System.setProperty("spark.serializer", "spark.KryoSerializer") - System.setProperty("spark.kryo.registrator", classOf[PRKryoRegistrator].getName) - - val inputFile = args(0) - val threshold = args(1).toDouble - val numPartitions = args(2).toInt - val host = args(3) - val usePartitioner = args(4).toBoolean - val sc = new SparkContext(host, "WikipediaPageRank") - - // Parse the Wikipedia page data into a graph - val input = sc.textFile(inputFile) - - println("Counting vertices...") - val numVertices = input.count() - println("Done counting vertices.") - - println("Parsing input file...") - var vertices = input.map(line => { - val fields = line.split("\t") - val (title, body) = (fields(1), fields(3).replace("\\n", "\n")) - val links = - if (body == "\\N") - NodeSeq.Empty - else - try { - XML.loadString(body) \\ "link" \ "target" - } catch { - case e: org.xml.sax.SAXParseException => - System.err.println("Article \""+title+"\" has malformed XML in body:\n"+body) - NodeSeq.Empty - } - val outEdges = links.map(link => new String(link.text)).toArray - val id = new String(title) - (id, new PRVertex(1.0 / numVertices, outEdges)) - }) - if (usePartitioner) - vertices = vertices.partitionBy(new HashPartitioner(sc.defaultParallelism)).cache - else - vertices = vertices.cache - println("Done parsing input file.") - - // Do the computation - val epsilon = 0.01 / numVertices - val messages = sc.parallelize(Array[(String, PRMessage)]()) - val utils = new PageRankUtils - val result = - Bagel.run( - sc, vertices, messages, combiner = new PRCombiner(), - numPartitions = numPartitions)( - utils.computeWithCombiner(numVertices, epsilon)) - - // Print the result - System.err.println("Articles with PageRank >= "+threshold+":") - val top = - (result - .filter { case (id, vertex) => vertex.value >= threshold } - .map { case (id, vertex) => "%s\t%s\n".format(id, vertex.value) } - .collect.mkString) - println(top) - } -} diff --git a/bagel/src/main/scala/spark/bagel/examples/WikipediaPageRankStandalone.scala b/bagel/src/main/scala/spark/bagel/examples/WikipediaPageRankStandalone.scala deleted file mode 100644 index 
3c54a85f42..0000000000 --- a/bagel/src/main/scala/spark/bagel/examples/WikipediaPageRankStandalone.scala +++ /dev/null @@ -1,223 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package spark.bagel.examples - -import spark._ -import serializer.{DeserializationStream, SerializationStream, SerializerInstance} -import spark.SparkContext._ - -import spark.bagel._ -import spark.bagel.Bagel._ - -import scala.xml.{XML,NodeSeq} - -import scala.collection.mutable.ArrayBuffer - -import java.io.{InputStream, OutputStream, DataInputStream, DataOutputStream} -import java.nio.ByteBuffer - -object WikipediaPageRankStandalone { - def main(args: Array[String]) { - if (args.length < 5) { - System.err.println("Usage: WikipediaPageRankStandalone ") - System.exit(-1) - } - - System.setProperty("spark.serializer", "spark.bagel.examples.WPRSerializer") - - val inputFile = args(0) - val threshold = args(1).toDouble - val numIterations = args(2).toInt - val host = args(3) - val usePartitioner = args(4).toBoolean - val sc = new SparkContext(host, "WikipediaPageRankStandalone") - - val input = sc.textFile(inputFile) - val partitioner = new HashPartitioner(sc.defaultParallelism) - val links = - if (usePartitioner) - input.map(parseArticle _).partitionBy(partitioner).cache() - else - input.map(parseArticle _).cache() - val n = links.count() - val defaultRank = 1.0 / n - val a = 0.15 - - // Do the computation - val startTime = System.currentTimeMillis - val ranks = - pageRank(links, numIterations, defaultRank, a, n, partitioner, usePartitioner, sc.defaultParallelism) - - // Print the result - System.err.println("Articles with PageRank >= "+threshold+":") - val top = - (ranks - .filter { case (id, rank) => rank >= threshold } - .map { case (id, rank) => "%s\t%s\n".format(id, rank) } - .collect().mkString) - println(top) - - val time = (System.currentTimeMillis - startTime) / 1000.0 - println("Completed %d iterations in %f seconds: %f seconds per iteration" - .format(numIterations, time, time / numIterations)) - System.exit(0) - } - - def parseArticle(line: String): (String, Array[String]) = { - val fields = line.split("\t") - val (title, body) = (fields(1), fields(3).replace("\\n", "\n")) - val id = new String(title) - val links = - if (body == "\\N") - NodeSeq.Empty - else - try { - XML.loadString(body) \\ "link" \ "target" - } catch { - case e: org.xml.sax.SAXParseException => - System.err.println("Article \""+title+"\" has malformed XML in body:\n"+body) - NodeSeq.Empty - } - val outEdges = links.map(link => new String(link.text)).toArray - (id, outEdges) - } - - def pageRank( - links: RDD[(String, Array[String])], - numIterations: Int, - defaultRank: Double, - a: Double, - n: Long, - partitioner: Partitioner, - usePartitioner: Boolean, - numPartitions: Int 
- ): RDD[(String, Double)] = { - var ranks = links.mapValues { edges => defaultRank } - for (i <- 1 to numIterations) { - val contribs = links.groupWith(ranks).flatMap { - case (id, (linksWrapper, rankWrapper)) => - if (linksWrapper.length > 0) { - if (rankWrapper.length > 0) { - linksWrapper(0).map(dest => (dest, rankWrapper(0) / linksWrapper(0).size)) - } else { - linksWrapper(0).map(dest => (dest, defaultRank / linksWrapper(0).size)) - } - } else { - Array[(String, Double)]() - } - } - ranks = (contribs.combineByKey((x: Double) => x, - (x: Double, y: Double) => x + y, - (x: Double, y: Double) => x + y, - partitioner) - .mapValues(sum => a/n + (1-a)*sum)) - } - ranks - } -} - -class WPRSerializer extends spark.serializer.Serializer { - def newInstance(): SerializerInstance = new WPRSerializerInstance() -} - -class WPRSerializerInstance extends SerializerInstance { - def serialize[T](t: T): ByteBuffer = { - throw new UnsupportedOperationException() - } - - def deserialize[T](bytes: ByteBuffer): T = { - throw new UnsupportedOperationException() - } - - def deserialize[T](bytes: ByteBuffer, loader: ClassLoader): T = { - throw new UnsupportedOperationException() - } - - def serializeStream(s: OutputStream): SerializationStream = { - new WPRSerializationStream(s) - } - - def deserializeStream(s: InputStream): DeserializationStream = { - new WPRDeserializationStream(s) - } -} - -class WPRSerializationStream(os: OutputStream) extends SerializationStream { - val dos = new DataOutputStream(os) - - def writeObject[T](t: T): SerializationStream = t match { - case (id: String, wrapper: ArrayBuffer[_]) => wrapper(0) match { - case links: Array[String] => { - dos.writeInt(0) // links - dos.writeUTF(id) - dos.writeInt(links.length) - for (link <- links) { - dos.writeUTF(link) - } - this - } - case rank: Double => { - dos.writeInt(1) // rank - dos.writeUTF(id) - dos.writeDouble(rank) - this - } - } - case (id: String, rank: Double) => { - dos.writeInt(2) // rank without wrapper - dos.writeUTF(id) - dos.writeDouble(rank) - this - } - } - - def flush() { dos.flush() } - def close() { dos.close() } -} - -class WPRDeserializationStream(is: InputStream) extends DeserializationStream { - val dis = new DataInputStream(is) - - def readObject[T](): T = { - val typeId = dis.readInt() - typeId match { - case 0 => { - val id = dis.readUTF() - val numLinks = dis.readInt() - val links = new Array[String](numLinks) - for (i <- 0 until numLinks) { - val link = dis.readUTF() - links(i) = link - } - (id, ArrayBuffer(links)).asInstanceOf[T] - } - case 1 => { - val id = dis.readUTF() - val rank = dis.readDouble() - (id, ArrayBuffer(rank)).asInstanceOf[T] - } - case 2 => { - val id = dis.readUTF() - val rank = dis.readDouble() - (id, rank).asInstanceOf[T] - } - } - } - - def close() { dis.close() } -} diff --git a/bin/compute-classpath.sh b/bin/compute-classpath.sh index 7a21b3c4a1..5dc86c51a4 100755 --- a/bin/compute-classpath.sh +++ b/bin/compute-classpath.sh @@ -30,79 +30,25 @@ if [ -e $FWDIR/conf/spark-env.sh ] ; then . 
$FWDIR/conf/spark-env.sh fi -CORE_DIR="$FWDIR/core" -REPL_DIR="$FWDIR/repl" -REPL_BIN_DIR="$FWDIR/repl-bin" -EXAMPLES_DIR="$FWDIR/examples" -BAGEL_DIR="$FWDIR/bagel" -MLLIB_DIR="$FWDIR/mllib" -TOOLS_DIR="$FWDIR/tools" -YARN_DIR="$FWDIR/yarn" -STREAMING_DIR="$FWDIR/streaming" -PYSPARK_DIR="$FWDIR/python" - # Build up classpath -CLASSPATH="$SPARK_CLASSPATH" - -function dev_classpath { - CLASSPATH="$CLASSPATH:$FWDIR/conf" - CLASSPATH="$CLASSPATH:$CORE_DIR/target/scala-$SCALA_VERSION/classes" - if [ -n "$SPARK_TESTING" ] ; then - CLASSPATH="$CLASSPATH:$CORE_DIR/target/scala-$SCALA_VERSION/test-classes" - CLASSPATH="$CLASSPATH:$STREAMING_DIR/target/scala-$SCALA_VERSION/test-classes" - fi - CLASSPATH="$CLASSPATH:$CORE_DIR/src/main/resources" - CLASSPATH="$CLASSPATH:$REPL_DIR/target/scala-$SCALA_VERSION/classes" - CLASSPATH="$CLASSPATH:$EXAMPLES_DIR/target/scala-$SCALA_VERSION/classes" - CLASSPATH="$CLASSPATH:$STREAMING_DIR/target/scala-$SCALA_VERSION/classes" - CLASSPATH="$CLASSPATH:$STREAMING_DIR/lib/org/apache/kafka/kafka/0.7.2-spark/*" # <-- our in-project Kafka Jar - if [ -e "$FWDIR/lib_managed" ]; then - CLASSPATH="$CLASSPATH:$FWDIR/lib_managed/jars/*" - CLASSPATH="$CLASSPATH:$FWDIR/lib_managed/bundles/*" - fi - CLASSPATH="$CLASSPATH:$REPL_DIR/lib/*" - # Add the shaded JAR for Maven builds - if [ -e $REPL_BIN_DIR/target ]; then - for jar in `find "$REPL_BIN_DIR/target" -name 'spark-repl-*-shaded.jar'`; do - CLASSPATH="$CLASSPATH:$jar" - done - # The shaded JAR doesn't contain examples, so include those separately - for jar in `find "$EXAMPLES_DIR/target" -name 'spark-examples*[0-9T].jar'`; do - CLASSPATH="$CLASSPATH:$jar" - done - fi - CLASSPATH="$CLASSPATH:$BAGEL_DIR/target/scala-$SCALA_VERSION/classes" - CLASSPATH="$CLASSPATH:$MLLIB_DIR/target/scala-$SCALA_VERSION/classes" - CLASSPATH="$CLASSPATH:$TOOLS_DIR/target/scala-$SCALA_VERSION/classes" - CLASSPATH="$CLASSPATH:$YARN_DIR/target/scala-$SCALA_VERSION/classes" - for jar in `find $PYSPARK_DIR/lib -name '*jar'`; do - CLASSPATH="$CLASSPATH:$jar" - done - - # Add Scala standard library - if [ -z "$SCALA_LIBRARY_PATH" ]; then - if [ -z "$SCALA_HOME" ]; then - echo "SCALA_HOME is not set" >&2 - exit 1 - fi - SCALA_LIBRARY_PATH="$SCALA_HOME/lib" - fi - CLASSPATH="$CLASSPATH:$SCALA_LIBRARY_PATH/scala-library.jar" - CLASSPATH="$CLASSPATH:$SCALA_LIBRARY_PATH/scala-compiler.jar" - CLASSPATH="$CLASSPATH:$SCALA_LIBRARY_PATH/jline.jar" -} - -function release_classpath { - CLASSPATH="$CLASSPATH:$FWDIR/jars/*" -} - +CLASSPATH="$SPARK_CLASSPATH:$FWDIR/conf" if [ -f "$FWDIR/RELEASE" ]; then - release_classpath + ASSEMBLY_JAR=`ls "$FWDIR"/jars/spark-assembly*.jar` else - dev_classpath + ASSEMBLY_JAR=`ls "$FWDIR"/assembly/target/scala-$SCALA_VERSION/spark-assembly*.jar` +fi +CLASSPATH="$CLASSPATH:$ASSEMBLY_JAR" + +# Add test classes if we're running from SBT or Maven with SPARK_TESTING set to 1 +if [[ $SPARK_TESTING == 1 ]]; then + CLASSPATH="$CLASSPATH:$FWDIR/core/target/scala-$SCALA_VERSION/test-classes" + CLASSPATH="$CLASSPATH:$FWDIR/repl/target/scala-$SCALA_VERSION/test-classes" + CLASSPATH="$CLASSPATH:$FWDIR/mllib/target/scala-$SCALA_VERSION/test-classes" + CLASSPATH="$CLASSPATH:$FWDIR/bagel/target/scala-$SCALA_VERSION/test-classes" + CLASSPATH="$CLASSPATH:$FWDIR/streaming/target/scala-$SCALA_VERSION/test-classes" fi -# Add hadoop conf dir - else FileSystem.*, etc fail ! +# Add hadoop conf dir if given -- otherwise FileSystem.*, etc fail ! 
# Note, this assumes that there is either a HADOOP_CONF_DIR or YARN_CONF_DIR which hosts # the configurtion files. if [ "x" != "x$HADOOP_CONF_DIR" ]; then diff --git a/bin/spark-daemon.sh b/bin/spark-daemon.sh index 96c71e66ca..eac0774669 100755 --- a/bin/spark-daemon.sh +++ b/bin/spark-daemon.sh @@ -87,7 +87,7 @@ TEST_LOG_DIR=$? if [ "${TEST_LOG_DIR}" = "0" ]; then rm -f $SPARK_LOG_DIR/.spark_test else - chown $SPARK_IDENT_STRING $SPARK_LOG_DIR + chown $SPARK_IDENT_STRING $SPARK_LOG_DIR fi if [ "$SPARK_PID_DIR" = "" ]; then @@ -109,7 +109,7 @@ fi case $startStop in (start) - + mkdir -p "$SPARK_PID_DIR" if [ -f $pid ]; then @@ -128,11 +128,11 @@ case $startStop in echo starting $command, logging to $log echo "Spark Daemon: $command" > $log cd "$SPARK_PREFIX" - nohup nice -n $SPARK_NICENESS "$SPARK_PREFIX"/run $command "$@" >> "$log" 2>&1 < /dev/null & + nohup nice -n $SPARK_NICENESS "$SPARK_PREFIX"/spark-class $command "$@" >> "$log" 2>&1 < /dev/null & echo $! > $pid sleep 1; head "$log" ;; - + (stop) if [ -f $pid ]; then diff --git a/core/src/main/scala/spark/scheduler/mesos/CoarseMesosSchedulerBackend.scala b/core/src/main/scala/spark/scheduler/mesos/CoarseMesosSchedulerBackend.scala index 6ebbb5ec9b..ebfc21392d 100644 --- a/core/src/main/scala/spark/scheduler/mesos/CoarseMesosSchedulerBackend.scala +++ b/core/src/main/scala/spark/scheduler/mesos/CoarseMesosSchedulerBackend.scala @@ -125,7 +125,7 @@ private[spark] class CoarseMesosSchedulerBackend( StandaloneSchedulerBackend.ACTOR_NAME) val uri = System.getProperty("spark.executor.uri") if (uri == null) { - val runScript = new File(sparkHome, "run").getCanonicalPath + val runScript = new File(sparkHome, "spark-class").getCanonicalPath command.setValue("\"%s\" spark.executor.StandaloneExecutorBackend %s %s %s %d".format( runScript, driverUrl, offer.getSlaveId.getValue, offer.getHostname, numCores)) } else { diff --git a/core/src/test/scala/spark/DriverSuite.scala b/core/src/test/scala/spark/DriverSuite.scala index ed16b9d8ef..553c0309f6 100644 --- a/core/src/test/scala/spark/DriverSuite.scala +++ b/core/src/test/scala/spark/DriverSuite.scala @@ -34,7 +34,7 @@ class DriverSuite extends FunSuite with Timeouts { val masters = Table(("master"), ("local"), ("local-cluster[2,1,512]")) forAll(masters) { (master: String) => failAfter(30 seconds) { - Utils.execute(Seq("./run", "spark.DriverWithoutCleanup", master), + Utils.execute(Seq("./spark-class", "spark.DriverWithoutCleanup", master), new File(System.getenv("SPARK_HOME"))) } } diff --git a/docs/bagel-programming-guide.md b/docs/bagel-programming-guide.md index 8a0fa42d94..c526da3ca0 100644 --- a/docs/bagel-programming-guide.md +++ b/docs/bagel-programming-guide.md @@ -158,4 +158,4 @@ trait Message[K] { ## Where to Go from Here -Two example jobs, PageRank and shortest path, are included in `bagel/src/main/scala/spark/bagel/examples`. You can run them by passing the class name to the `run` script included in Spark -- for example, `./run spark.bagel.examples.WikipediaPageRank`. Each example program prints usage help when run without any arguments. +Two example jobs, PageRank and shortest path, are included in `examples/src/main/scala/spark/examples/bagel`. You can run them by passing the class name to the `run-example` script included in Spark -- for example, `./run-example spark.examples.bagel.WikipediaPageRank`. Each example program prints usage help when run without any arguments. 
diff --git a/docs/index.md b/docs/index.md index 0c4add45dc..e51a6998f6 100644 --- a/docs/index.md +++ b/docs/index.md @@ -27,9 +27,9 @@ Spark also supports building using Maven. If you would like to build using Maven # Testing the Build Spark comes with a number of sample programs in the `examples` directory. -To run one of the samples, use `./run ` in the top-level Spark directory +To run one of the samples, use `./run-example ` in the top-level Spark directory (the `run` script sets up the appropriate paths and launches that program). -For example, `./run spark.examples.SparkPi` will run a sample program that estimates Pi. Each of the +For example, `./run-example spark.examples.SparkPi` will run a sample program that estimates Pi. Each of the examples prints usage help if no params are given. Note that all of the sample programs take a `` parameter specifying the cluster URL diff --git a/docs/java-programming-guide.md b/docs/java-programming-guide.md index ae8257b539..dd19a5f0c9 100644 --- a/docs/java-programming-guide.md +++ b/docs/java-programming-guide.md @@ -190,6 +190,6 @@ We hope to generate documentation with Java-style syntax in the future. Spark includes several sample programs using the Java API in [`examples/src/main/java`](https://github.com/mesos/spark/tree/master/examples/src/main/java/spark/examples). You can run them by passing the class name to the -`run` script included in Spark -- for example, `./run +`run-example` script included in Spark -- for example, `./run-example spark.examples.JavaWordCount`. Each example program prints usage help when run without any arguments. diff --git a/docs/running-on-yarn.md b/docs/running-on-yarn.md index 1a0afd19d4..678cd57aba 100644 --- a/docs/running-on-yarn.md +++ b/docs/running-on-yarn.md @@ -15,9 +15,9 @@ We need a consolidated spark core jar (which bundles all the required dependenci This can be built either through sbt or via maven. - Building spark assembled jar via sbt. -Enable YARN support by setting `SPARK_WITH_YARN=true` when invoking sbt: +Enable YARN support by setting `SPARK_YARN=true` when invoking sbt: - SPARK_HADOOP_VERSION=2.0.5-alpha SPARK_WITH_YARN=true ./sbt/sbt clean assembly + SPARK_HADOOP_VERSION=2.0.5-alpha SPARK_YARN=true ./sbt/sbt clean assembly The assembled jar would typically be something like : `./yarn/target/spark-yarn-assembly-0.8.0-SNAPSHOT.jar` @@ -55,7 +55,7 @@ This would be used to connect to the cluster, write to the dfs and submit jobs t The command to launch the YARN Client is as follows: - SPARK_JAR= ./run spark.deploy.yarn.Client \ + SPARK_JAR= ./spark-class spark.deploy.yarn.Client \ --jar \ --class \ --args \ @@ -67,7 +67,7 @@ The command to launch the YARN Client is as follows: For example: - SPARK_JAR=./yarn/target/spark-yarn-assembly-{{site.SPARK_VERSION}}.jar ./run spark.deploy.yarn.Client \ + SPARK_JAR=./yarn/target/spark-yarn-assembly-{{site.SPARK_VERSION}}.jar ./spark-class spark.deploy.yarn.Client \ --jar examples/target/scala-{{site.SCALA_VERSION}}/spark-examples_{{site.SCALA_VERSION}}-{{site.SPARK_VERSION}}.jar \ --class spark.examples.SparkPi \ --args yarn-standalone \ diff --git a/docs/scala-programming-guide.md b/docs/scala-programming-guide.md index e9cf9ef36f..db584d2096 100644 --- a/docs/scala-programming-guide.md +++ b/docs/scala-programming-guide.md @@ -356,7 +356,7 @@ res2: Int = 10 # Where to Go from Here You can see some [example Spark programs](http://www.spark-project.org/examples.html) on the Spark website. 
-In addition, Spark includes several sample programs in `examples/src/main/scala`. Some of them have both Spark versions and local (non-parallel) versions, allowing you to see what had to be changed to make the program run on a cluster. You can run them using by passing the class name to the `run` script included in Spark -- for example, `./run spark.examples.SparkPi`. Each example program prints usage help when run without any arguments. +In addition, Spark includes several sample programs in `examples/src/main/scala`. Some of them have both Spark versions and local (non-parallel) versions, allowing you to see what had to be changed to make the program run on a cluster. You can run them using by passing the class name to the `run-example` script included in Spark -- for example, `./run-example spark.examples.SparkPi`. Each example program prints usage help when run without any arguments. For help on optimizing your program, the [configuration](configuration.html) and [tuning](tuning.html) guides provide information on best practices. They are especially important for diff --git a/docs/spark-standalone.md b/docs/spark-standalone.md index 7463844a4e..bb8be276c5 100644 --- a/docs/spark-standalone.md +++ b/docs/spark-standalone.md @@ -20,7 +20,7 @@ Compile Spark with `sbt package` as described in the [Getting Started Guide](ind You can start a standalone master server by executing: - ./run spark.deploy.master.Master + ./spark-class spark.deploy.master.Master Once started, the master will print out a `spark://IP:PORT` URL for itself, which you can use to connect workers to it, or pass as the "master" argument to `SparkContext` to connect a job to the cluster. You can also find this URL on @@ -28,7 +28,7 @@ the master's web UI, which is [http://localhost:8080](http://localhost:8080) by Similarly, you can start one or more workers and connect them to the master via: - ./run spark.deploy.worker.Worker spark://IP:PORT + ./spark-class spark.deploy.worker.Worker spark://IP:PORT Once you have started a worker, look at the master's web UI ([http://localhost:8080](http://localhost:8080) by default). You should see the new node listed there, along with its number of CPUs and memory (minus one gigabyte left for the OS). diff --git a/docs/streaming-programming-guide.md b/docs/streaming-programming-guide.md index a74c17bdb7..3330e63598 100644 --- a/docs/streaming-programming-guide.md +++ b/docs/streaming-programming-guide.md @@ -234,7 +234,7 @@ $ nc -lk 9999 Then, in a different terminal, you can start NetworkWordCount by using {% highlight bash %} -$ ./run spark.streaming.examples.NetworkWordCount local[2] localhost 9999 +$ ./run-example spark.streaming.examples.NetworkWordCount local[2] localhost 9999 {% endhighlight %} This will make NetworkWordCount connect to the netcat server. Any lines typed in the terminal running the netcat server will be counted and printed on screen. @@ -272,7 +272,7 @@ Time: 1357008430000 ms -You can find more examples in `/streaming/src/main/scala/spark/streaming/examples/`. They can be run in the similar manner using `./run spark.streaming.examples....` . Executing without any parameter would give the required parameter list. Further explanation to run them can be found in comments in the files. +You can find more examples in `/streaming/src/main/scala/spark/streaming/examples/`. They can be run in the similar manner using `./run-example spark.streaming.examples....` . Executing without any parameter would give the required parameter list. 
Further explanation to run them can be found in comments in the files. # DStream Persistence Similar to RDDs, DStreams also allow developers to persist the stream's data in memory. That is, using `persist()` method on a DStream would automatically persist every RDD of that DStream in memory. This is useful if the data in the DStream will be computed multiple times (e.g., multiple operations on the same data). For window-based operations like `reduceByWindow` and `reduceByKeyAndWindow` and state-based operations like `updateStateByKey`, this is implicitly true. Hence, DStreams generated by window-based operations are automatically persisted in memory, without the developer calling `persist()`. diff --git a/examples/pom.xml b/examples/pom.xml index 0db52b8691..d24bd404fa 100644 --- a/examples/pom.xml +++ b/examples/pom.xml @@ -47,6 +47,11 @@ spark-mllib ${project.version} + + org.spark-project + spark-bagel + ${project.version} + org.apache.hbase hbase diff --git a/examples/src/main/scala/spark/examples/bagel/PageRankUtils.scala b/examples/src/main/scala/spark/examples/bagel/PageRankUtils.scala new file mode 100644 index 0000000000..c23ee9895f --- /dev/null +++ b/examples/src/main/scala/spark/examples/bagel/PageRankUtils.scala @@ -0,0 +1,123 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package spark.examples.bagel + +import spark._ +import spark.SparkContext._ + +import spark.bagel._ +import spark.bagel.Bagel._ + +import scala.collection.mutable.ArrayBuffer + +import java.io.{InputStream, OutputStream, DataInputStream, DataOutputStream} + +import com.esotericsoftware.kryo._ + +class PageRankUtils extends Serializable { + def computeWithCombiner(numVertices: Long, epsilon: Double)( + self: PRVertex, messageSum: Option[Double], superstep: Int + ): (PRVertex, Array[PRMessage]) = { + val newValue = messageSum match { + case Some(msgSum) if msgSum != 0 => + 0.15 / numVertices + 0.85 * msgSum + case _ => self.value + } + + val terminate = superstep >= 10 + + val outbox: Array[PRMessage] = + if (!terminate) + self.outEdges.map(targetId => + new PRMessage(targetId, newValue / self.outEdges.size)) + else + Array[PRMessage]() + + (new PRVertex(newValue, self.outEdges, !terminate), outbox) + } + + def computeNoCombiner(numVertices: Long, epsilon: Double)(self: PRVertex, messages: Option[Array[PRMessage]], superstep: Int): (PRVertex, Array[PRMessage]) = + computeWithCombiner(numVertices, epsilon)(self, messages match { + case Some(msgs) => Some(msgs.map(_.value).sum) + case None => None + }, superstep) +} + +class PRCombiner extends Combiner[PRMessage, Double] with Serializable { + def createCombiner(msg: PRMessage): Double = + msg.value + def mergeMsg(combiner: Double, msg: PRMessage): Double = + combiner + msg.value + def mergeCombiners(a: Double, b: Double): Double = + a + b +} + +class PRVertex() extends Vertex with Serializable { + var value: Double = _ + var outEdges: Array[String] = _ + var active: Boolean = _ + + def this(value: Double, outEdges: Array[String], active: Boolean = true) { + this() + this.value = value + this.outEdges = outEdges + this.active = active + } + + override def toString(): String = { + "PRVertex(value=%f, outEdges.length=%d, active=%s)".format(value, outEdges.length, active.toString) + } +} + +class PRMessage() extends Message[String] with Serializable { + var targetId: String = _ + var value: Double = _ + + def this(targetId: String, value: Double) { + this() + this.targetId = targetId + this.value = value + } +} + +class PRKryoRegistrator extends KryoRegistrator { + def registerClasses(kryo: Kryo) { + kryo.register(classOf[PRVertex]) + kryo.register(classOf[PRMessage]) + } +} + +class CustomPartitioner(partitions: Int) extends Partitioner { + def numPartitions = partitions + + def getPartition(key: Any): Int = { + val hash = key match { + case k: Long => (k & 0x00000000FFFFFFFFL).toInt + case _ => key.hashCode + } + + val mod = key.hashCode % partitions + if (mod < 0) mod + partitions else mod + } + + override def equals(other: Any): Boolean = other match { + case c: CustomPartitioner => + c.numPartitions == numPartitions + case _ => false + } +} diff --git a/examples/src/main/scala/spark/examples/bagel/WikipediaPageRank.scala b/examples/src/main/scala/spark/examples/bagel/WikipediaPageRank.scala new file mode 100644 index 0000000000..00635a7ffa --- /dev/null +++ b/examples/src/main/scala/spark/examples/bagel/WikipediaPageRank.scala @@ -0,0 +1,101 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package spark.examples.bagel + +import spark._ +import spark.SparkContext._ + +import spark.bagel._ +import spark.bagel.Bagel._ + +import scala.xml.{XML,NodeSeq} + +/** + * Run PageRank on XML Wikipedia dumps from http://wiki.freebase.com/wiki/WEX. Uses the "articles" + * files from there, which contains one line per wiki article in a tab-separated format + * (http://wiki.freebase.com/wiki/WEX/Documentation#articles). + */ +object WikipediaPageRank { + def main(args: Array[String]) { + if (args.length < 5) { + System.err.println("Usage: WikipediaPageRank ") + System.exit(-1) + } + + System.setProperty("spark.serializer", "spark.KryoSerializer") + System.setProperty("spark.kryo.registrator", classOf[PRKryoRegistrator].getName) + + val inputFile = args(0) + val threshold = args(1).toDouble + val numPartitions = args(2).toInt + val host = args(3) + val usePartitioner = args(4).toBoolean + val sc = new SparkContext(host, "WikipediaPageRank") + + // Parse the Wikipedia page data into a graph + val input = sc.textFile(inputFile) + + println("Counting vertices...") + val numVertices = input.count() + println("Done counting vertices.") + + println("Parsing input file...") + var vertices = input.map(line => { + val fields = line.split("\t") + val (title, body) = (fields(1), fields(3).replace("\\n", "\n")) + val links = + if (body == "\\N") + NodeSeq.Empty + else + try { + XML.loadString(body) \\ "link" \ "target" + } catch { + case e: org.xml.sax.SAXParseException => + System.err.println("Article \""+title+"\" has malformed XML in body:\n"+body) + NodeSeq.Empty + } + val outEdges = links.map(link => new String(link.text)).toArray + val id = new String(title) + (id, new PRVertex(1.0 / numVertices, outEdges)) + }) + if (usePartitioner) + vertices = vertices.partitionBy(new HashPartitioner(sc.defaultParallelism)).cache + else + vertices = vertices.cache + println("Done parsing input file.") + + // Do the computation + val epsilon = 0.01 / numVertices + val messages = sc.parallelize(Array[(String, PRMessage)]()) + val utils = new PageRankUtils + val result = + Bagel.run( + sc, vertices, messages, combiner = new PRCombiner(), + numPartitions = numPartitions)( + utils.computeWithCombiner(numVertices, epsilon)) + + // Print the result + System.err.println("Articles with PageRank >= "+threshold+":") + val top = + (result + .filter { case (id, vertex) => vertex.value >= threshold } + .map { case (id, vertex) => "%s\t%s\n".format(id, vertex.value) } + .collect.mkString) + println(top) + } +} diff --git a/examples/src/main/scala/spark/examples/bagel/WikipediaPageRankStandalone.scala b/examples/src/main/scala/spark/examples/bagel/WikipediaPageRankStandalone.scala new file mode 100644 index 0000000000..c416ddbc58 --- /dev/null +++ b/examples/src/main/scala/spark/examples/bagel/WikipediaPageRankStandalone.scala @@ -0,0 +1,223 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package spark.examples.bagel + +import spark._ +import serializer.{DeserializationStream, SerializationStream, SerializerInstance} +import spark.SparkContext._ + +import spark.bagel._ +import spark.bagel.Bagel._ + +import scala.xml.{XML,NodeSeq} + +import scala.collection.mutable.ArrayBuffer + +import java.io.{InputStream, OutputStream, DataInputStream, DataOutputStream} +import java.nio.ByteBuffer + +object WikipediaPageRankStandalone { + def main(args: Array[String]) { + if (args.length < 5) { + System.err.println("Usage: WikipediaPageRankStandalone ") + System.exit(-1) + } + + System.setProperty("spark.serializer", "spark.bagel.examples.WPRSerializer") + + val inputFile = args(0) + val threshold = args(1).toDouble + val numIterations = args(2).toInt + val host = args(3) + val usePartitioner = args(4).toBoolean + val sc = new SparkContext(host, "WikipediaPageRankStandalone") + + val input = sc.textFile(inputFile) + val partitioner = new HashPartitioner(sc.defaultParallelism) + val links = + if (usePartitioner) + input.map(parseArticle _).partitionBy(partitioner).cache() + else + input.map(parseArticle _).cache() + val n = links.count() + val defaultRank = 1.0 / n + val a = 0.15 + + // Do the computation + val startTime = System.currentTimeMillis + val ranks = + pageRank(links, numIterations, defaultRank, a, n, partitioner, usePartitioner, sc.defaultParallelism) + + // Print the result + System.err.println("Articles with PageRank >= "+threshold+":") + val top = + (ranks + .filter { case (id, rank) => rank >= threshold } + .map { case (id, rank) => "%s\t%s\n".format(id, rank) } + .collect().mkString) + println(top) + + val time = (System.currentTimeMillis - startTime) / 1000.0 + println("Completed %d iterations in %f seconds: %f seconds per iteration" + .format(numIterations, time, time / numIterations)) + System.exit(0) + } + + def parseArticle(line: String): (String, Array[String]) = { + val fields = line.split("\t") + val (title, body) = (fields(1), fields(3).replace("\\n", "\n")) + val id = new String(title) + val links = + if (body == "\\N") + NodeSeq.Empty + else + try { + XML.loadString(body) \\ "link" \ "target" + } catch { + case e: org.xml.sax.SAXParseException => + System.err.println("Article \""+title+"\" has malformed XML in body:\n"+body) + NodeSeq.Empty + } + val outEdges = links.map(link => new String(link.text)).toArray + (id, outEdges) + } + + def pageRank( + links: RDD[(String, Array[String])], + numIterations: Int, + defaultRank: Double, + a: Double, + n: Long, + partitioner: Partitioner, + usePartitioner: Boolean, + numPartitions: Int + ): RDD[(String, Double)] = { + var ranks = links.mapValues { edges => defaultRank } + for (i <- 1 to numIterations) { + val contribs = links.groupWith(ranks).flatMap { + case (id, (linksWrapper, rankWrapper)) => + if (linksWrapper.length > 0) { + if (rankWrapper.length > 0) { + linksWrapper(0).map(dest => (dest, rankWrapper(0) / linksWrapper(0).size)) + 
} else { + linksWrapper(0).map(dest => (dest, defaultRank / linksWrapper(0).size)) + } + } else { + Array[(String, Double)]() + } + } + ranks = (contribs.combineByKey((x: Double) => x, + (x: Double, y: Double) => x + y, + (x: Double, y: Double) => x + y, + partitioner) + .mapValues(sum => a/n + (1-a)*sum)) + } + ranks + } +} + +class WPRSerializer extends spark.serializer.Serializer { + def newInstance(): SerializerInstance = new WPRSerializerInstance() +} + +class WPRSerializerInstance extends SerializerInstance { + def serialize[T](t: T): ByteBuffer = { + throw new UnsupportedOperationException() + } + + def deserialize[T](bytes: ByteBuffer): T = { + throw new UnsupportedOperationException() + } + + def deserialize[T](bytes: ByteBuffer, loader: ClassLoader): T = { + throw new UnsupportedOperationException() + } + + def serializeStream(s: OutputStream): SerializationStream = { + new WPRSerializationStream(s) + } + + def deserializeStream(s: InputStream): DeserializationStream = { + new WPRDeserializationStream(s) + } +} + +class WPRSerializationStream(os: OutputStream) extends SerializationStream { + val dos = new DataOutputStream(os) + + def writeObject[T](t: T): SerializationStream = t match { + case (id: String, wrapper: ArrayBuffer[_]) => wrapper(0) match { + case links: Array[String] => { + dos.writeInt(0) // links + dos.writeUTF(id) + dos.writeInt(links.length) + for (link <- links) { + dos.writeUTF(link) + } + this + } + case rank: Double => { + dos.writeInt(1) // rank + dos.writeUTF(id) + dos.writeDouble(rank) + this + } + } + case (id: String, rank: Double) => { + dos.writeInt(2) // rank without wrapper + dos.writeUTF(id) + dos.writeDouble(rank) + this + } + } + + def flush() { dos.flush() } + def close() { dos.close() } +} + +class WPRDeserializationStream(is: InputStream) extends DeserializationStream { + val dis = new DataInputStream(is) + + def readObject[T](): T = { + val typeId = dis.readInt() + typeId match { + case 0 => { + val id = dis.readUTF() + val numLinks = dis.readInt() + val links = new Array[String](numLinks) + for (i <- 0 until numLinks) { + val link = dis.readUTF() + links(i) = link + } + (id, ArrayBuffer(links)).asInstanceOf[T] + } + case 1 => { + val id = dis.readUTF() + val rank = dis.readDouble() + (id, ArrayBuffer(rank)).asInstanceOf[T] + } + case 2 => { + val id = dis.readUTF() + val rank = dis.readDouble() + (id, rank).asInstanceOf[T] + } + } + } + + def close() { dis.close() } +} diff --git a/make-distribution.sh b/make-distribution.sh index 70aff418c7..df7bbf1e74 100755 --- a/make-distribution.sh +++ b/make-distribution.sh @@ -47,7 +47,7 @@ VERSION=$($FWDIR/sbt/sbt "show version" | tail -1 | cut -f 2 | sed 's/^\([a-zA-Z # Initialize defaults SPARK_HADOOP_VERSION=1.0.4 -SPARK_WITH_YARN=false +SPARK_YARN=false MAKE_TGZ=false # Parse arguments @@ -58,7 +58,7 @@ while (( "$#" )); do shift ;; --with-yarn) - SPARK_WITH_YARN=true + SPARK_YARN=true ;; --tgz) MAKE_TGZ=true @@ -74,7 +74,7 @@ else fi echo "Hadoop version set to $SPARK_HADOOP_VERSION" -if [ "$SPARK_WITH_YARN" == "true" ]; then +if [ "$SPARK_YARN" == "true" ]; then echo "YARN enabled" else echo "YARN disabled" @@ -82,21 +82,22 @@ fi # Build fat JAR export SPARK_HADOOP_VERSION -export SPARK_WITH_YARN -"$FWDIR/sbt/sbt" "repl/assembly" +export SPARK_YARN +"$FWDIR/sbt/sbt" "assembly/assembly" # Make directories rm -rf "$DISTDIR" mkdir -p "$DISTDIR/jars" -echo "$VERSION" > "$DISTDIR/RELEASE" +echo "Spark $VERSION built for Hadoop $SPARK_HADOOP_VERSION" > "$DISTDIR/RELEASE" # Copy jars -cp 
$FWDIR/repl/target/*.jar "$DISTDIR/jars/" +cp $FWDIR/assembly/target/*/*assembly*.jar "$DISTDIR/jars/" # Copy other things cp -r "$FWDIR/bin" "$DISTDIR" cp -r "$FWDIR/conf" "$DISTDIR" -cp "$FWDIR/run" "$FWDIR/spark-shell" "$DISTDIR" +cp "$FWDIR/spark-class" "$DISTDIR" +cp "$FWDIR/spark-shell" "$DISTDIR" cp "$FWDIR/spark-executor" "$DISTDIR" diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala index 5fdcf19b62..b3bf3ef89b 100644 --- a/project/SparkBuild.scala +++ b/project/SparkBuild.scala @@ -26,30 +26,35 @@ import AssemblyKeys._ object SparkBuild extends Build { // Hadoop version to build against. For example, "1.0.4" for Apache releases, or // "2.0.0-mr1-cdh4.2.0" for Cloudera Hadoop. Note that these variables can be set - // through the environment variables SPARK_HADOOP_VERSION and SPARK_WITH_YARN. + // through the environment variables SPARK_HADOOP_VERSION and SPARK_YARN. val DEFAULT_HADOOP_VERSION = "1.0.4" - val DEFAULT_WITH_YARN = false + val DEFAULT_YARN = false // HBase version; set as appropriate. val HBASE_VERSION = "0.94.6" - lazy val root = Project("root", file("."), settings = rootSettings) aggregate(allProjects:_*) + lazy val root = Project("root", file("."), settings = rootSettings) aggregate(allProjects: _*) lazy val core = Project("core", file("core"), settings = coreSettings) - lazy val repl = Project("repl", file("repl"), settings = replSettings) dependsOn(core) dependsOn(bagel) dependsOn(mllib) dependsOn(maybeYarn:_*) + lazy val repl = Project("repl", file("repl"), settings = replSettings) + .dependsOn(core, bagel, mllib) dependsOn(maybeYarn: _*) - lazy val examples = Project("examples", file("examples"), settings = examplesSettings) dependsOn (core) dependsOn (streaming) dependsOn(mllib) + lazy val examples = Project("examples", file("examples"), settings = examplesSettings) + .dependsOn(core, mllib, bagel, streaming) - lazy val tools = Project("tools", file("tools"), settings = examplesSettings) dependsOn (core) dependsOn (streaming) + lazy val tools = Project("tools", file("tools"), settings = toolsSettings) dependsOn(core) dependsOn(streaming) - lazy val bagel = Project("bagel", file("bagel"), settings = bagelSettings) dependsOn (core) + lazy val bagel = Project("bagel", file("bagel"), settings = bagelSettings) dependsOn(core) - lazy val streaming = Project("streaming", file("streaming"), settings = streamingSettings) dependsOn (core) + lazy val streaming = Project("streaming", file("streaming"), settings = streamingSettings) dependsOn(core) - lazy val mllib = Project("mllib", file("mllib"), settings = mllibSettings) dependsOn (core) + lazy val mllib = Project("mllib", file("mllib"), settings = mllibSettings) dependsOn(core) - lazy val yarn = Project("yarn", file("yarn"), settings = yarnSettings) dependsOn (core) + lazy val yarn = Project("yarn", file("yarn"), settings = yarnSettings) dependsOn(core) + + lazy val assemblyProj = Project("assembly", file("assembly"), settings = assemblyProjSettings) + .dependsOn(core, bagel, mllib, repl, streaming) dependsOn(maybeYarn: _*) // A configuration to set an alternative publishLocalConfiguration lazy val MavenCompile = config("m2r") extend(Compile) @@ -57,15 +62,16 @@ object SparkBuild extends Build { // Allows build configuration to be set through environment variables lazy val hadoopVersion = scala.util.Properties.envOrElse("SPARK_HADOOP_VERSION", DEFAULT_HADOOP_VERSION) - lazy val isYarnEnabled = scala.util.Properties.envOrNone("SPARK_WITH_YARN") match { - case None => DEFAULT_WITH_YARN + lazy val 
isYarnEnabled = scala.util.Properties.envOrNone("SPARK_YARN") match { + case None => DEFAULT_YARN case Some(v) => v.toBoolean } // Conditionally include the yarn sub-project lazy val maybeYarn = if(isYarnEnabled) Seq[ClasspathDependency](yarn) else Seq[ClasspathDependency]() lazy val maybeYarnRef = if(isYarnEnabled) Seq[ProjectReference](yarn) else Seq[ProjectReference]() - lazy val allProjects = Seq[ProjectReference](core, repl, examples, bagel, streaming, mllib, tools) ++ maybeYarnRef + lazy val allProjects = Seq[ProjectReference]( + core, repl, examples, bagel, streaming, mllib, tools, assemblyProj) ++ maybeYarnRef def sharedSettings = Defaults.defaultSettings ++ Seq( organization := "org.spark-project", @@ -100,8 +106,8 @@ object SparkBuild extends Build { http://spark-project.org/ - BSD License - https://github.com/mesos/spark/blob/master/LICENSE + Apache 2.0 License + http://www.apache.org/licenses/LICENSE-2.0.html repo @@ -195,7 +201,7 @@ object SparkBuild extends Build { "com.twitter" % "chill_2.9.3" % "0.3.1", "com.twitter" % "chill-java" % "0.3.1" ) - ) ++ assemblySettings ++ extraAssemblySettings + ) def rootSettings = sharedSettings ++ Seq( publish := {} @@ -204,7 +210,7 @@ object SparkBuild extends Build { def replSettings = sharedSettings ++ Seq( name := "spark-repl", libraryDependencies <+= scalaVersion("org.scala-lang" % "scala-compiler" % _) - ) ++ assemblySettings ++ extraAssemblySettings + ) def examplesSettings = sharedSettings ++ Seq( name := "spark-examples", @@ -223,7 +229,7 @@ object SparkBuild extends Build { exclude("org.apache.cassandra.deps", "avro") excludeAll(excludeSnappy) ) - ) + ) ++ assemblySettings ++ extraAssemblySettings def toolsSettings = sharedSettings ++ Seq( name := "spark-tools" @@ -251,7 +257,7 @@ object SparkBuild extends Build { "org.twitter4j" % "twitter4j-stream" % "3.0.3" excludeAll(excludeNetty), "com.typesafe.akka" % "akka-zeromq" % "2.0.5" excludeAll(excludeNetty) ) - ) ++ assemblySettings ++ extraAssemblySettings + ) def yarnSettings = sharedSettings ++ Seq( name := "spark-yarn" @@ -271,7 +277,13 @@ object SparkBuild extends Build { ) ) - def extraAssemblySettings() = Seq(test in assembly := {}) ++ Seq( + def assemblyProjSettings = sharedSettings ++ Seq( + name := "spark-assembly", + jarName in assembly <<= version map { v => "spark-assembly-" + v + "-hadoop" + hadoopVersion + ".jar" } + ) ++ assemblySettings ++ extraAssemblySettings + + def extraAssemblySettings() = Seq( + test in assembly := {}, mergeStrategy in assembly := { case m if m.toLowerCase.endsWith("manifest.mf") => MergeStrategy.discard case m if m.toLowerCase.matches("meta-inf.*\\.sf$") => MergeStrategy.discard diff --git a/project/build.properties b/project/build.properties index 08e17131f6..9647277162 100644 --- a/project/build.properties +++ b/project/build.properties @@ -15,4 +15,4 @@ # limitations under the License. 
# -sbt.version=0.12.3 +sbt.version=0.12.4 diff --git a/project/plugins.sbt b/project/plugins.sbt index 783b40d4f5..cfcd85082a 100644 --- a/project/plugins.sbt +++ b/project/plugins.sbt @@ -4,7 +4,7 @@ resolvers += "Typesafe Repository" at "http://repo.typesafe.com/typesafe/release resolvers += "Spray Repository" at "http://repo.spray.cc/" -addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.8.5") +addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.9.1") addSbtPlugin("com.typesafe.sbteclipse" % "sbteclipse-plugin" % "2.2.0") diff --git a/python/pyspark/java_gateway.py b/python/pyspark/java_gateway.py index e503fb7621..18011c0dc9 100644 --- a/python/pyspark/java_gateway.py +++ b/python/pyspark/java_gateway.py @@ -28,7 +28,7 @@ SPARK_HOME = os.environ["SPARK_HOME"] def launch_gateway(): # Launch the Py4j gateway using Spark's run command so that we pick up the # proper classpath and SPARK_MEM settings from spark-env.sh - command = [os.path.join(SPARK_HOME, "run"), "py4j.GatewayServer", + command = [os.path.join(SPARK_HOME, "spark-class"), "py4j.GatewayServer", "--die-on-broken-pipe", "0"] proc = Popen(command, stdout=PIPE, stdin=PIPE) # Determine which ephemeral port the server started on: diff --git a/run b/run deleted file mode 100755 index 715bbf93d5..0000000000 --- a/run +++ /dev/null @@ -1,175 +0,0 @@ -#!/usr/bin/env bash - -# -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# - -SCALA_VERSION=2.9.3 - -# Figure out where the Scala framework is installed -FWDIR="$(cd `dirname $0`; pwd)" - -# Export this as SPARK_HOME -export SPARK_HOME="$FWDIR" - -# Load environment variables from conf/spark-env.sh, if it exists -if [ -e $FWDIR/conf/spark-env.sh ] ; then - . $FWDIR/conf/spark-env.sh -fi - -if [ -z "$1" ]; then - echo "Usage: run []" >&2 - exit 1 -fi - -# If this is a standalone cluster daemon, reset SPARK_JAVA_OPTS and SPARK_MEM to reasonable -# values for that; it doesn't need a lot -if [ "$1" = "spark.deploy.master.Master" -o "$1" = "spark.deploy.worker.Worker" ]; then - SPARK_MEM=${SPARK_DAEMON_MEMORY:-512m} - SPARK_DAEMON_JAVA_OPTS="$SPARK_DAEMON_JAVA_OPTS -Dspark.akka.logLifecycleEvents=true" - # Do not overwrite SPARK_JAVA_OPTS environment variable in this script - OUR_JAVA_OPTS="$SPARK_DAEMON_JAVA_OPTS" # Empty by default -else - OUR_JAVA_OPTS="$SPARK_JAVA_OPTS" -fi - - -# Add java opts for master, worker, executor. 
The opts maybe null -case "$1" in - 'spark.deploy.master.Master') - OUR_JAVA_OPTS="$OUR_JAVA_OPTS $SPARK_MASTER_OPTS" - ;; - 'spark.deploy.worker.Worker') - OUR_JAVA_OPTS="$OUR_JAVA_OPTS $SPARK_WORKER_OPTS" - ;; - 'spark.executor.StandaloneExecutorBackend') - OUR_JAVA_OPTS="$OUR_JAVA_OPTS $SPARK_EXECUTOR_OPTS" - ;; - 'spark.executor.MesosExecutorBackend') - OUR_JAVA_OPTS="$OUR_JAVA_OPTS $SPARK_EXECUTOR_OPTS" - ;; - 'spark.repl.Main') - OUR_JAVA_OPTS="$OUR_JAVA_OPTS $SPARK_REPL_OPTS" - ;; -esac - -# Figure out whether to run our class with java or with the scala launcher. -# In most cases, we'd prefer to execute our process with java because scala -# creates a shell script as the parent of its Java process, which makes it -# hard to kill the child with stuff like Process.destroy(). However, for -# the Spark shell, the wrapper is necessary to properly reset the terminal -# when we exit, so we allow it to set a variable to launch with scala. -# We still fall back on java for the shell if this is a "release" created -# from make-distribution.sh since it's possible scala is not installed -# but we have everything we need to run the shell. -if [[ "$SPARK_LAUNCH_WITH_SCALA" == "1" && ! -f "$FWDIR/RELEASE" ]]; then - if [ "$SCALA_HOME" ]; then - RUNNER="${SCALA_HOME}/bin/scala" - else - if [ `command -v scala` ]; then - RUNNER="scala" - else - echo "SCALA_HOME is not set and scala is not in PATH" >&2 - exit 1 - fi - fi -else - if [ -n "${JAVA_HOME}" ]; then - RUNNER="${JAVA_HOME}/bin/java" - else - if [ `command -v java` ]; then - RUNNER="java" - else - echo "JAVA_HOME is not set" >&2 - exit 1 - fi - fi - if [[ ! -f "$FWDIR/RELEASE" && -z "$SCALA_LIBRARY_PATH" ]]; then - if [ -z "$SCALA_HOME" ]; then - echo "SCALA_HOME is not set" >&2 - exit 1 - fi - SCALA_LIBRARY_PATH="$SCALA_HOME/lib" - fi -fi - -# Figure out how much memory to use per executor and set it as an environment -# variable so that our process sees it and can report it to Mesos -if [ -z "$SPARK_MEM" ] ; then - SPARK_MEM="512m" -fi -export SPARK_MEM - -# Set JAVA_OPTS to be able to load native libraries and to set heap size -JAVA_OPTS="$OUR_JAVA_OPTS" -JAVA_OPTS="$JAVA_OPTS -Djava.library.path=$SPARK_LIBRARY_PATH" -JAVA_OPTS="$JAVA_OPTS -Xms$SPARK_MEM -Xmx$SPARK_MEM" -# Load extra JAVA_OPTS from conf/java-opts, if it exists -if [ -e $FWDIR/conf/java-opts ] ; then - JAVA_OPTS="$JAVA_OPTS `cat $FWDIR/conf/java-opts`" -fi -export JAVA_OPTS -# Attention: when changing the way the JAVA_OPTS are assembled, the change must be reflected in ExecutorRunner.scala! - -if [ ! -f "$FWDIR/RELEASE" ]; then - CORE_DIR="$FWDIR/core" - EXAMPLES_DIR="$FWDIR/examples" - REPL_DIR="$FWDIR/repl" - - # Exit if the user hasn't compiled Spark - if [ ! -e "$CORE_DIR/target" ]; then - echo "Failed to find Spark classes in $CORE_DIR/target" >&2 - echo "You need to compile Spark before running this program" >&2 - exit 1 - fi - - if [[ "$@" = *repl* && ! -e "$REPL_DIR/target" ]]; then - echo "Failed to find Spark classes in $REPL_DIR/target" >&2 - echo "You need to compile Spark repl module before running this program" >&2 - exit 1 - fi - - # Figure out the JAR file that our examples were packaged into. This includes a bit of a hack - # to avoid the -sources and -doc packages that are built by publish-local. 
- if [ -e "$EXAMPLES_DIR/target/scala-$SCALA_VERSION/spark-examples"*[0-9T].jar ]; then - # Use the JAR from the SBT build - export SPARK_EXAMPLES_JAR=`ls "$EXAMPLES_DIR/target/scala-$SCALA_VERSION/spark-examples"*[0-9T].jar` - fi - if [ -e "$EXAMPLES_DIR/target/spark-examples"*[0-9T].jar ]; then - # Use the JAR from the Maven build - export SPARK_EXAMPLES_JAR=`ls "$EXAMPLES_DIR/target/spark-examples"*[0-9T].jar` - fi -fi - -# Compute classpath using external script -CLASSPATH=`$FWDIR/bin/compute-classpath.sh` -export CLASSPATH - -if [ "$SPARK_LAUNCH_WITH_SCALA" == "1" ]; then - EXTRA_ARGS="" # Java options will be passed to scala as JAVA_OPTS -else - # The JVM doesn't read JAVA_OPTS by default so we need to pass it in - EXTRA_ARGS="$JAVA_OPTS" -fi - -command="$RUNNER -cp \"$CLASSPATH\" $EXTRA_ARGS $@" -if [ "$SPARK_PRINT_LAUNCH_COMMAND" == "1" ]; then - echo "Spark Command: $command" - echo "========================================" - echo -fi - -exec "$RUNNER" -cp "$CLASSPATH" $EXTRA_ARGS "$@" diff --git a/run-example b/run-example new file mode 100755 index 0000000000..e1b26257e1 --- /dev/null +++ b/run-example @@ -0,0 +1,76 @@ +#!/usr/bin/env bash + +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +SCALA_VERSION=2.9.3 + +# Figure out where the Scala framework is installed +FWDIR="$(cd `dirname $0`; pwd)" + +# Export this as SPARK_HOME +export SPARK_HOME="$FWDIR" + +# Load environment variables from conf/spark-env.sh, if it exists +if [ -e $FWDIR/conf/spark-env.sh ] ; then + . $FWDIR/conf/spark-env.sh +fi + +if [ -z "$1" ]; then + echo "Usage: run-example []" >&2 + exit 1 +fi + +# Figure out the JAR file that our examples were packaged into. This includes a bit of a hack +# to avoid the -sources and -doc packages that are built by publish-local. +EXAMPLES_DIR="$FWDIR"/examples +SPARK_EXAMPLES_JAR="" +if [ -e "$EXAMPLES_DIR"/target/scala-$SCALA_VERSION/*assembly*[0-9T].jar ]; then + # Use the JAR from the SBT build + export SPARK_EXAMPLES_JAR=`ls "$EXAMPLES_DIR"/target/scala-$SCALA_VERSION/*assembly*[0-9T].jar` +fi +if [ -e "$EXAMPLES_DIR"/target/spark-examples*[0-9T].jar ]; then + # Use the JAR from the Maven build + # TODO: this also needs to become an assembly! 
+ export SPARK_EXAMPLES_JAR=`ls "$EXAMPLES_DIR"/target/spark-examples*[0-9T].jar` +fi +if [[ -z $SPARK_EXAMPLES_JAR ]]; then + echo "Failed to find Spark examples assembly in $FWDIR/examples/target" >&2 + echo "You need to compile Spark before running this program" >&2 + exit 1 +fi + +# Find java binary +if [ -n "${JAVA_HOME}" ]; then + RUNNER="${JAVA_HOME}/bin/java" +else + if [ `command -v java` ]; then + RUNNER="java" + else + echo "JAVA_HOME is not set" >&2 + exit 1 + fi +fi + +if [ "$SPARK_PRINT_LAUNCH_COMMAND" == "1" ]; then + echo -n "Spark Command: " + echo "$RUNNER" -cp "$SPARK_EXAMPLES_JAR" "$@" + echo "========================================" + echo +fi + +exec "$RUNNER" -cp "$SPARK_EXAMPLES_JAR" "$@" diff --git a/spark-class b/spark-class new file mode 100755 index 0000000000..5ef3de9773 --- /dev/null +++ b/spark-class @@ -0,0 +1,124 @@ +#!/usr/bin/env bash + +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +SCALA_VERSION=2.9.3 + +# Figure out where the Scala framework is installed +FWDIR="$(cd `dirname $0`; pwd)" + +# Export this as SPARK_HOME +export SPARK_HOME="$FWDIR" + +# Load environment variables from conf/spark-env.sh, if it exists +if [ -e $FWDIR/conf/spark-env.sh ] ; then + . $FWDIR/conf/spark-env.sh +fi + +if [ -z "$1" ]; then + echo "Usage: run []" >&2 + exit 1 +fi + +# If this is a standalone cluster daemon, reset SPARK_JAVA_OPTS and SPARK_MEM to reasonable +# values for that; it doesn't need a lot +if [ "$1" = "spark.deploy.master.Master" -o "$1" = "spark.deploy.worker.Worker" ]; then + SPARK_MEM=${SPARK_DAEMON_MEMORY:-512m} + SPARK_DAEMON_JAVA_OPTS="$SPARK_DAEMON_JAVA_OPTS -Dspark.akka.logLifecycleEvents=true" + # Do not overwrite SPARK_JAVA_OPTS environment variable in this script + OUR_JAVA_OPTS="$SPARK_DAEMON_JAVA_OPTS" # Empty by default +else + OUR_JAVA_OPTS="$SPARK_JAVA_OPTS" +fi + + +# Add java opts for master, worker, executor. The opts maybe null +case "$1" in + 'spark.deploy.master.Master') + OUR_JAVA_OPTS="$OUR_JAVA_OPTS $SPARK_MASTER_OPTS" + ;; + 'spark.deploy.worker.Worker') + OUR_JAVA_OPTS="$OUR_JAVA_OPTS $SPARK_WORKER_OPTS" + ;; + 'spark.executor.StandaloneExecutorBackend') + OUR_JAVA_OPTS="$OUR_JAVA_OPTS $SPARK_EXECUTOR_OPTS" + ;; + 'spark.executor.MesosExecutorBackend') + OUR_JAVA_OPTS="$OUR_JAVA_OPTS $SPARK_EXECUTOR_OPTS" + ;; + 'spark.repl.Main') + OUR_JAVA_OPTS="$OUR_JAVA_OPTS $SPARK_REPL_OPTS" + ;; +esac + +# Find the java binary +if [ -n "${JAVA_HOME}" ]; then + RUNNER="${JAVA_HOME}/bin/java" +else + if [ `command -v java` ]; then + RUNNER="java" + else + echo "JAVA_HOME is not set" >&2 + exit 1 + fi +fi +if [[ ! 
-f "$FWDIR/RELEASE" && -z "$SCALA_LIBRARY_PATH" ]]; then + if [ -z "$SCALA_HOME" ]; then + echo "SCALA_HOME is not set" >&2 + exit 1 + fi + SCALA_LIBRARY_PATH="$SCALA_HOME/lib" +fi + +# Set SPARK_MEM if it isn't already set since we also use it for this process +SPARK_MEM=${SPARK_MEM:-512m} +export SPARK_MEM + +# Set JAVA_OPTS to be able to load native libraries and to set heap size +JAVA_OPTS="$OUR_JAVA_OPTS" +JAVA_OPTS="$JAVA_OPTS -Djava.library.path=$SPARK_LIBRARY_PATH" +JAVA_OPTS="$JAVA_OPTS -Xms$SPARK_MEM -Xmx$SPARK_MEM" +# Load extra JAVA_OPTS from conf/java-opts, if it exists +if [ -e $FWDIR/conf/java-opts ] ; then + JAVA_OPTS="$JAVA_OPTS `cat $FWDIR/conf/java-opts`" +fi +export JAVA_OPTS +# Attention: when changing the way the JAVA_OPTS are assembled, the change must be reflected in ExecutorRunner.scala! + +if [ ! -f "$FWDIR/RELEASE" ]; then + # Exit if the user hasn't compiled Spark + ls "$FWDIR"/assembly/target/scala-$SCALA_VERSION/spark-assembly*.jar >& /dev/null + if [[ $? != 0 ]]; then + echo "Failed to find Spark assembly in $FWDIR/assembly/target" >&2 + echo "You need to compile Spark before running this program" >&2 + exit 1 + fi +fi + +# Compute classpath using external script +CLASSPATH=`$FWDIR/bin/compute-classpath.sh` +export CLASSPATH + +if [ "$SPARK_PRINT_LAUNCH_COMMAND" == "1" ]; then + echo -n "Spark Command: " + echo "$RUNNER" -cp "$CLASSPATH" $JAVA_OPTS "$@" + echo "========================================" + echo +fi + +exec "$RUNNER" -cp "$CLASSPATH" $JAVA_OPTS "$@" diff --git a/spark-executor b/spark-executor index feccbf5cc2..63692bd46c 100755 --- a/spark-executor +++ b/spark-executor @@ -19,4 +19,4 @@ FWDIR="`dirname $0`" echo "Running spark-executor with framework dir = $FWDIR" -exec $FWDIR/run spark.executor.MesosExecutorBackend +exec $FWDIR/spark-class spark.executor.MesosExecutorBackend diff --git a/spark-shell b/spark-shell index 62fc18550d..4d379c5cfb 100755 --- a/spark-shell +++ b/spark-shell @@ -79,8 +79,7 @@ if [[ ! $? ]]; then saved_stty="" fi -export SPARK_LAUNCH_WITH_SCALA=${SPARK_LAUNCH_WITH_SCALA:-1} -$FWDIR/run $OPTIONS spark.repl.Main "$@" +$FWDIR/spark-class $OPTIONS spark.repl.Main "$@" # record the exit status lest it be overwritten: # then reenable echo and propagate the code. -- cgit v1.2.3 From ab0e625d9e0abd62a20754125952e3a00f2c275a Mon Sep 17 00:00:00 2001 From: Matei Zaharia Date: Thu, 22 Aug 2013 23:02:09 -0700 Subject: Fix PySpark for assembly run and include it in dist --- .gitignore | 1 + core/lib/PY4J_LICENSE.txt | 27 +++++++++++++++++++++++++++ core/lib/PY4J_VERSION.txt | 1 + core/lib/py4j0.7.jar | Bin 0 -> 103286 bytes make-distribution.sh | 5 ++++- pyspark | 12 ++++++++---- python/lib/py4j0.7.jar | Bin 103286 -> 0 bytes 7 files changed, 41 insertions(+), 5 deletions(-) create mode 100644 core/lib/PY4J_LICENSE.txt create mode 100644 core/lib/PY4J_VERSION.txt create mode 100644 core/lib/py4j0.7.jar delete mode 100644 python/lib/py4j0.7.jar (limited to 'make-distribution.sh') diff --git a/.gitignore b/.gitignore index 00fbff6a2c..e1f64a1133 100644 --- a/.gitignore +++ b/.gitignore @@ -40,3 +40,4 @@ checkpoint derby.log dist/ spark-*-bin.tar.gz +unit-tests.log diff --git a/core/lib/PY4J_LICENSE.txt b/core/lib/PY4J_LICENSE.txt new file mode 100644 index 0000000000..a70279ca14 --- /dev/null +++ b/core/lib/PY4J_LICENSE.txt @@ -0,0 +1,27 @@ + +Copyright (c) 2009-2011, Barthelemy Dagenais All rights reserved. 
+ +Redistribution and use in source and binary forms, with or without +modification, are permitted provided that the following conditions are met: + +- Redistributions of source code must retain the above copyright notice, this +list of conditions and the following disclaimer. + +- Redistributions in binary form must reproduce the above copyright notice, +this list of conditions and the following disclaimer in the documentation +and/or other materials provided with the distribution. + +- The name of the author may not be used to endorse or promote products +derived from this software without specific prior written permission. + +THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" +AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE +IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE +ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE +LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR +CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF +SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS +INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN +CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) +ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE +POSSIBILITY OF SUCH DAMAGE. diff --git a/core/lib/PY4J_VERSION.txt b/core/lib/PY4J_VERSION.txt new file mode 100644 index 0000000000..04a0cd52a8 --- /dev/null +++ b/core/lib/PY4J_VERSION.txt @@ -0,0 +1 @@ +b7924aabe9c5e63f0a4d8bbd17019534c7ec014e diff --git a/core/lib/py4j0.7.jar b/core/lib/py4j0.7.jar new file mode 100644 index 0000000000..73b7ddb7d1 Binary files /dev/null and b/core/lib/py4j0.7.jar differ diff --git a/make-distribution.sh b/make-distribution.sh index df7bbf1e74..92b2706126 100755 --- a/make-distribution.sh +++ b/make-distribution.sh @@ -94,11 +94,14 @@ echo "Spark $VERSION built for Hadoop $SPARK_HADOOP_VERSION" > "$DISTDIR/RELEASE cp $FWDIR/assembly/target/*/*assembly*.jar "$DISTDIR/jars/" # Copy other things +mkdir "$DISTDIR"/conf +cp -r "$FWDIR/conf/*.template" "$DISTDIR" cp -r "$FWDIR/bin" "$DISTDIR" -cp -r "$FWDIR/conf" "$DISTDIR" +cp -r "$FWDIR/python" "$DISTDIR" cp "$FWDIR/spark-class" "$DISTDIR" cp "$FWDIR/spark-shell" "$DISTDIR" cp "$FWDIR/spark-executor" "$DISTDIR" +cp "$FWDIR/pyspark" "$DISTDIR" if [ "$MAKE_TGZ" == "true" ]; then diff --git a/pyspark b/pyspark index 801239c108..155ccd4fdf 100755 --- a/pyspark +++ b/pyspark @@ -24,10 +24,14 @@ FWDIR="$(cd `dirname $0`; pwd)" export SPARK_HOME="$FWDIR" # Exit if the user hasn't compiled Spark -if [ ! -e "$SPARK_HOME/repl/target" ]; then - echo "Failed to find Spark classes in $SPARK_HOME/repl/target" >&2 - echo "You need to compile Spark before running this program" >&2 - exit 1 +if [ ! -f "$FWDIR/RELEASE" ]; then + # Exit if the user hasn't compiled Spark + ls "$FWDIR"/assembly/target/scala-$SCALA_VERSION/spark-assembly*.jar >& /dev/null + if [[ $? 
!= 0 ]]; then + echo "Failed to find Spark assembly in $FWDIR/assembly/target" >&2 + echo "You need to compile Spark before running this program" >&2 + exit 1 + fi fi # Load environment variables from conf/spark-env.sh, if it exists diff --git a/python/lib/py4j0.7.jar b/python/lib/py4j0.7.jar deleted file mode 100644 index 73b7ddb7d1..0000000000 Binary files a/python/lib/py4j0.7.jar and /dev/null differ -- cgit v1.2.3 From d8a4008685996756c3fc871a5a196a60d8e8989c Mon Sep 17 00:00:00 2001 From: Matei Zaharia Date: Tue, 27 Aug 2013 19:44:59 -0700 Subject: Fix path to assembly in make-distribution.sh --- make-distribution.sh | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'make-distribution.sh') diff --git a/make-distribution.sh b/make-distribution.sh index 92b2706126..91f6278491 100755 --- a/make-distribution.sh +++ b/make-distribution.sh @@ -91,7 +91,7 @@ mkdir -p "$DISTDIR/jars" echo "Spark $VERSION built for Hadoop $SPARK_HADOOP_VERSION" > "$DISTDIR/RELEASE" # Copy jars -cp $FWDIR/assembly/target/*/*assembly*.jar "$DISTDIR/jars/" +cp $FWDIR/assembly/target/scala*/*assembly*hadoop*.jar "$DISTDIR/jars/" # Copy other things mkdir "$DISTDIR"/conf -- cgit v1.2.3
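
A short end-to-end sketch of how the pieces introduced above fit together may help: make-distribution.sh now packages the single assembly JAR (plus PySpark) and can emit a spark-*-bin.tar.gz tarball, spark-class replaces the old run launcher, and run-example launches the bundled examples such as the Bagel WikipediaPageRank job. The commands below are illustrative only — the examples-assembly sbt invocation, the input path, the master URL, and the numeric arguments are assumptions or placeholders, not values taken from the patches themselves.

    # Package a binary distribution; make-distribution.sh builds the main
    # assembly itself (sbt assembly/assembly) and, with --tgz, also produces
    # a spark-*-bin.tar.gz tarball next to the dist/ directory.
    ./make-distribution.sh --tgz

    # Build the examples assembly so run-example can find it under
    # examples/target/scala-2.9.3/ (exact sbt target assumed, following the
    # project/task style the scripts above already use).
    ./sbt/sbt examples/assembly

    # Run the Bagel PageRank example from the source tree. Arguments, per the
    # argument parsing in WikipediaPageRank.main:
    #   <inputFile> <threshold> <numPartitions> <host> <usePartitioner>
    # The WEX articles path and master URL below are placeholders.
    ./run-example spark.examples.bagel.WikipediaPageRank \
        hdfs:///data/wex-articles.tsv 0.01 16 spark://master.example.com:7077 true

Note that spark-class and pyspark only look for the assembly JAR when no RELEASE file is present, so the same launchers work both from a source checkout (after building the assembly) and from an unpacked binary distribution produced by make-distribution.sh.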