From daaca14c16dc2c1abc98f15ab8c6f7c14761b627 Mon Sep 17 00:00:00 2001
From: Prashant Sharma
Date: Tue, 11 Nov 2014 21:36:48 -0800
Subject: Support cross-building for Scala 2.11

Let's give this another go using a version of Hive that shades its JLine dependency.

Author: Prashant Sharma
Author: Patrick Wendell

Closes #3159 from pwendell/scala-2.11-prashant and squashes the following commits:

e93aa3e [Patrick Wendell] Restoring -Phive-thriftserver profile and cleaning up build script.
f65d17d [Patrick Wendell] Fixing build issue due to merge conflict
a8c41eb [Patrick Wendell] Reverting dev/run-tests back to master state.
7a6eb18 [Patrick Wendell] Merge remote-tracking branch 'apache/master' into scala-2.11-prashant
583aa07 [Prashant Sharma] REVERT ME: removed hive thriftserver
3680e58 [Prashant Sharma] Revert "REVERT ME: Temporarily removing some Cli tests."
935fb47 [Prashant Sharma] Revert "Fixed by disabling a few tests temporarily."
925e90f [Prashant Sharma] Fixed by disabling a few tests temporarily.
2fffed3 [Prashant Sharma] Exclude groovy from sbt build, and also provide a way for such instances in future.
8bd4e40 [Prashant Sharma] Switched to gmaven plus; it fixes random failures observed with its predecessor gmaven.
5272ce5 [Prashant Sharma] SPARK_SCALA_VERSION related bugs.
2121071 [Patrick Wendell] Migrating version detection to PySpark
b1ed44d [Patrick Wendell] REVERT ME: Temporarily removing some Cli tests.
1743a73 [Patrick Wendell] Removing decimal test that doesn't work with Scala 2.11
f5cad4e [Patrick Wendell] Add Scala 2.11 docs
210d7e1 [Patrick Wendell] Revert "Testing new Hive version with shaded jline"
48518ce [Patrick Wendell] Remove association of Hive and Thriftserver profiles.
e9d0a06 [Patrick Wendell] Revert "Enable thriftserver for Scala 2.10 only"
67ec364 [Patrick Wendell] Guard building of thriftserver around Scala 2.10 check
8502c23 [Patrick Wendell] Enable thriftserver for Scala 2.10 only
e22b104 [Patrick Wendell] Small fix in pom file
ec402ab [Patrick Wendell] Various fixes
0be5a9d [Patrick Wendell] Testing new Hive version with shaded jline
4eaec65 [Prashant Sharma] Changed scripts to ignore target.
5167bea [Prashant Sharma] small correction
a4fcac6 [Prashant Sharma] Run against Scala 2.11 on Jenkins.
80285f4 [Prashant Sharma] Maven equivalent of setting spark.executor.extraClassPath during tests.
034b369 [Prashant Sharma] Setting test jars on executor classpath during tests from sbt.
d4874cb [Prashant Sharma] Fixed Python Runner suite. Null check should be the first case in Scala 2.11.
6f50f13 [Prashant Sharma] Fixed build after rebasing with master. We should use ${scala.binary.version} instead of just 2.10
e56ca9d [Prashant Sharma] Print an error if builds for both 2.10 and 2.11 are detected.
937c0b8 [Prashant Sharma] SCALA_VERSION -> SPARK_SCALA_VERSION
cb059b0 [Prashant Sharma] Code review
0476e5e [Prashant Sharma] Scala 2.11 support with repl and all build changes.
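[Reviewer note: as a quick reference, this is the cross-build workflow the patch enables, sketched from the pieces added below (the dev/change-version-to-2.*.sh scripts, the -Pscala-2.11 profile in docs/building-spark.md, and the SPARK_SCALA_VERSION detection in bin/load-spark-env.sh):

    # Rewrite every pom.xml so artifact ids use the _2.11 suffix (a plain sed rewrite)
    ./dev/change-version-to-2.11.sh
    # Build against Scala 2.11; the external Kafka library and JDBC component are not yet supported
    mvn -Pyarn -Phadoop-2.4 -Pscala-2.11 -DskipTests clean package
    # Switch the poms back for the default Scala 2.10 build
    ./dev/change-version-to-2.10.sh

At runtime the launcher scripts infer the Scala version from which assembly/target/scala-2.1x directory exists; if both exist they abort, and SPARK_SCALA_VERSION must be exported (e.g. in spark-env.sh) to disambiguate.]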
--- .rat-excludes | 1 + assembly/pom.xml | 13 +- bin/compute-classpath.sh | 46 +- bin/load-spark-env.sh | 20 + bin/pyspark | 6 +- bin/run-example | 8 +- bin/spark-class | 8 +- core/pom.xml | 57 +- .../org/apache/spark/deploy/PythonRunner.scala | 2 +- .../org/apache/spark/deploy/SparkSubmit.scala | 2 +- dev/change-version-to-2.10.sh | 20 + dev/change-version-to-2.11.sh | 21 + dev/create-release/create-release.sh | 12 +- dev/run-tests | 13 +- dev/scalastyle | 2 +- docs/building-spark.md | 31 +- docs/sql-programming-guide.md | 2 +- examples/pom.xml | 199 ++- .../examples/streaming/JavaKafkaWordCount.java | 113 ++ .../spark/examples/streaming/KafkaWordCount.scala | 102 ++ .../examples/streaming/TwitterAlgebirdCMS.scala | 114 ++ .../examples/streaming/TwitterAlgebirdHLL.scala | 92 ++ .../examples/streaming/JavaKafkaWordCount.java | 113 -- .../spark/examples/streaming/KafkaWordCount.scala | 102 -- .../examples/streaming/TwitterAlgebirdCMS.scala | 114 -- .../examples/streaming/TwitterAlgebirdHLL.scala | 92 -- external/mqtt/pom.xml | 5 - make-distribution.sh | 2 +- network/shuffle/pom.xml | 4 +- network/yarn/pom.xml | 2 +- pom.xml | 178 ++- project/SparkBuild.scala | 36 +- project/project/SparkPluginBuild.scala | 2 +- repl/pom.xml | 90 +- .../main/scala/org/apache/spark/repl/Main.scala | 33 + .../org/apache/spark/repl/SparkCommandLine.scala | 37 + .../org/apache/spark/repl/SparkExprTyper.scala | 114 ++ .../scala/org/apache/spark/repl/SparkHelper.scala | 22 + .../scala/org/apache/spark/repl/SparkILoop.scala | 1091 +++++++++++++++ .../org/apache/spark/repl/SparkILoopInit.scala | 147 ++ .../scala/org/apache/spark/repl/SparkIMain.scala | 1445 ++++++++++++++++++++ .../scala/org/apache/spark/repl/SparkImports.scala | 238 ++++ .../apache/spark/repl/SparkJLineCompletion.scala | 377 +++++ .../org/apache/spark/repl/SparkJLineReader.scala | 90 ++ .../apache/spark/repl/SparkMemberHandlers.scala | 232 ++++ .../apache/spark/repl/SparkRunnerSettings.scala | 32 + .../scala/org/apache/spark/repl/ReplSuite.scala | 318 +++++ .../main/scala/org/apache/spark/repl/Main.scala | 85 ++ .../org/apache/spark/repl/SparkExprTyper.scala | 86 ++ .../scala/org/apache/spark/repl/SparkILoop.scala | 966 +++++++++++++ .../scala/org/apache/spark/repl/SparkIMain.scala | 1319 ++++++++++++++++++ .../scala/org/apache/spark/repl/SparkImports.scala | 201 +++ .../apache/spark/repl/SparkJLineCompletion.scala | 350 +++++ .../apache/spark/repl/SparkMemberHandlers.scala | 221 +++ .../org/apache/spark/repl/SparkReplReporter.scala | 53 + .../scala/org/apache/spark/repl/ReplSuite.scala | 326 +++++ .../main/scala/org/apache/spark/repl/Main.scala | 33 - .../org/apache/spark/repl/SparkCommandLine.scala | 37 - .../org/apache/spark/repl/SparkExprTyper.scala | 114 -- .../scala/org/apache/spark/repl/SparkHelper.scala | 22 - .../scala/org/apache/spark/repl/SparkILoop.scala | 1091 --------------- .../org/apache/spark/repl/SparkILoopInit.scala | 147 -- .../scala/org/apache/spark/repl/SparkIMain.scala | 1445 -------------------- .../scala/org/apache/spark/repl/SparkImports.scala | 238 ---- .../apache/spark/repl/SparkJLineCompletion.scala | 377 ----- .../org/apache/spark/repl/SparkJLineReader.scala | 90 -- .../apache/spark/repl/SparkMemberHandlers.scala | 232 ---- .../apache/spark/repl/SparkRunnerSettings.scala | 32 - .../scala/org/apache/spark/repl/ReplSuite.scala | 318 ----- sql/catalyst/pom.xml | 29 +- .../sql/catalyst/types/decimal/DecimalSuite.scala | 1 - 71 files changed, 8801 insertions(+), 4812 deletions(-) create mode 100755 
dev/change-version-to-2.10.sh create mode 100755 dev/change-version-to-2.11.sh create mode 100644 examples/scala-2.10/src/main/java/org/apache/spark/examples/streaming/JavaKafkaWordCount.java create mode 100644 examples/scala-2.10/src/main/scala/org/apache/spark/examples/streaming/KafkaWordCount.scala create mode 100644 examples/scala-2.10/src/main/scala/org/apache/spark/examples/streaming/TwitterAlgebirdCMS.scala create mode 100644 examples/scala-2.10/src/main/scala/org/apache/spark/examples/streaming/TwitterAlgebirdHLL.scala delete mode 100644 examples/src/main/java/org/apache/spark/examples/streaming/JavaKafkaWordCount.java delete mode 100644 examples/src/main/scala/org/apache/spark/examples/streaming/KafkaWordCount.scala delete mode 100644 examples/src/main/scala/org/apache/spark/examples/streaming/TwitterAlgebirdCMS.scala delete mode 100644 examples/src/main/scala/org/apache/spark/examples/streaming/TwitterAlgebirdHLL.scala create mode 100644 repl/scala-2.10/src/main/scala/org/apache/spark/repl/Main.scala create mode 100644 repl/scala-2.10/src/main/scala/org/apache/spark/repl/SparkCommandLine.scala create mode 100644 repl/scala-2.10/src/main/scala/org/apache/spark/repl/SparkExprTyper.scala create mode 100644 repl/scala-2.10/src/main/scala/org/apache/spark/repl/SparkHelper.scala create mode 100644 repl/scala-2.10/src/main/scala/org/apache/spark/repl/SparkILoop.scala create mode 100644 repl/scala-2.10/src/main/scala/org/apache/spark/repl/SparkILoopInit.scala create mode 100644 repl/scala-2.10/src/main/scala/org/apache/spark/repl/SparkIMain.scala create mode 100644 repl/scala-2.10/src/main/scala/org/apache/spark/repl/SparkImports.scala create mode 100644 repl/scala-2.10/src/main/scala/org/apache/spark/repl/SparkJLineCompletion.scala create mode 100644 repl/scala-2.10/src/main/scala/org/apache/spark/repl/SparkJLineReader.scala create mode 100644 repl/scala-2.10/src/main/scala/org/apache/spark/repl/SparkMemberHandlers.scala create mode 100644 repl/scala-2.10/src/main/scala/org/apache/spark/repl/SparkRunnerSettings.scala create mode 100644 repl/scala-2.10/src/test/scala/org/apache/spark/repl/ReplSuite.scala create mode 100644 repl/scala-2.11/src/main/scala/org/apache/spark/repl/Main.scala create mode 100644 repl/scala-2.11/src/main/scala/org/apache/spark/repl/SparkExprTyper.scala create mode 100644 repl/scala-2.11/src/main/scala/org/apache/spark/repl/SparkILoop.scala create mode 100644 repl/scala-2.11/src/main/scala/org/apache/spark/repl/SparkIMain.scala create mode 100644 repl/scala-2.11/src/main/scala/org/apache/spark/repl/SparkImports.scala create mode 100644 repl/scala-2.11/src/main/scala/org/apache/spark/repl/SparkJLineCompletion.scala create mode 100644 repl/scala-2.11/src/main/scala/org/apache/spark/repl/SparkMemberHandlers.scala create mode 100644 repl/scala-2.11/src/main/scala/org/apache/spark/repl/SparkReplReporter.scala create mode 100644 repl/scala-2.11/src/test/scala/org/apache/spark/repl/ReplSuite.scala delete mode 100644 repl/src/main/scala/org/apache/spark/repl/Main.scala delete mode 100644 repl/src/main/scala/org/apache/spark/repl/SparkCommandLine.scala delete mode 100644 repl/src/main/scala/org/apache/spark/repl/SparkExprTyper.scala delete mode 100644 repl/src/main/scala/org/apache/spark/repl/SparkHelper.scala delete mode 100644 repl/src/main/scala/org/apache/spark/repl/SparkILoop.scala delete mode 100644 repl/src/main/scala/org/apache/spark/repl/SparkILoopInit.scala delete mode 100644 repl/src/main/scala/org/apache/spark/repl/SparkIMain.scala delete mode 100644 
repl/src/main/scala/org/apache/spark/repl/SparkImports.scala delete mode 100644 repl/src/main/scala/org/apache/spark/repl/SparkJLineCompletion.scala delete mode 100644 repl/src/main/scala/org/apache/spark/repl/SparkJLineReader.scala delete mode 100644 repl/src/main/scala/org/apache/spark/repl/SparkMemberHandlers.scala delete mode 100644 repl/src/main/scala/org/apache/spark/repl/SparkRunnerSettings.scala delete mode 100644 repl/src/test/scala/org/apache/spark/repl/ReplSuite.scala diff --git a/.rat-excludes b/.rat-excludes index 20e3372464..d8bee1f8e4 100644 --- a/.rat-excludes +++ b/.rat-excludes @@ -44,6 +44,7 @@ SparkImports.scala SparkJLineCompletion.scala SparkJLineReader.scala SparkMemberHandlers.scala +SparkReplReporter.scala sbt sbt-launch-lib.bash plugins.sbt diff --git a/assembly/pom.xml b/assembly/pom.xml index 31a01e4d8e..c65192bde6 100644 --- a/assembly/pom.xml +++ b/assembly/pom.xml @@ -66,22 +66,22 @@ org.apache.spark - spark-repl_${scala.binary.version} + spark-streaming_${scala.binary.version} ${project.version} org.apache.spark - spark-streaming_${scala.binary.version} + spark-graphx_${scala.binary.version} ${project.version} org.apache.spark - spark-graphx_${scala.binary.version} + spark-sql_${scala.binary.version} ${project.version} org.apache.spark - spark-sql_${scala.binary.version} + spark-repl_${scala.binary.version} ${project.version} @@ -197,6 +197,11 @@ spark-hive_${scala.binary.version} ${project.version} + + + + hive-thriftserver + org.apache.spark spark-hive-thriftserver_${scala.binary.version} diff --git a/bin/compute-classpath.sh b/bin/compute-classpath.sh index 905bbaf99b..298641f268 100755 --- a/bin/compute-classpath.sh +++ b/bin/compute-classpath.sh @@ -20,8 +20,6 @@ # This script computes Spark's classpath and prints it to stdout; it's used by both the "run" # script and the ExecutorRunner in standalone cluster mode. -SCALA_VERSION=2.10 - # Figure out where Spark is installed FWDIR="$(cd "`dirname "$0"`"/..; pwd)" @@ -36,7 +34,7 @@ else CLASSPATH="$CLASSPATH:$FWDIR/conf" fi -ASSEMBLY_DIR="$FWDIR/assembly/target/scala-$SCALA_VERSION" +ASSEMBLY_DIR="$FWDIR/assembly/target/scala-$SPARK_SCALA_VERSION" if [ -n "$JAVA_HOME" ]; then JAR_CMD="$JAVA_HOME/bin/jar" @@ -48,19 +46,19 @@ fi if [ -n "$SPARK_PREPEND_CLASSES" ]; then echo "NOTE: SPARK_PREPEND_CLASSES is set, placing locally compiled Spark"\ "classes ahead of assembly." 
>&2 - CLASSPATH="$CLASSPATH:$FWDIR/core/target/scala-$SCALA_VERSION/classes" + CLASSPATH="$CLASSPATH:$FWDIR/core/target/scala-$SPARK_SCALA_VERSION/classes" CLASSPATH="$CLASSPATH:$FWDIR/core/target/jars/*" - CLASSPATH="$CLASSPATH:$FWDIR/repl/target/scala-$SCALA_VERSION/classes" - CLASSPATH="$CLASSPATH:$FWDIR/mllib/target/scala-$SCALA_VERSION/classes" - CLASSPATH="$CLASSPATH:$FWDIR/bagel/target/scala-$SCALA_VERSION/classes" - CLASSPATH="$CLASSPATH:$FWDIR/graphx/target/scala-$SCALA_VERSION/classes" - CLASSPATH="$CLASSPATH:$FWDIR/streaming/target/scala-$SCALA_VERSION/classes" - CLASSPATH="$CLASSPATH:$FWDIR/tools/target/scala-$SCALA_VERSION/classes" - CLASSPATH="$CLASSPATH:$FWDIR/sql/catalyst/target/scala-$SCALA_VERSION/classes" - CLASSPATH="$CLASSPATH:$FWDIR/sql/core/target/scala-$SCALA_VERSION/classes" - CLASSPATH="$CLASSPATH:$FWDIR/sql/hive/target/scala-$SCALA_VERSION/classes" - CLASSPATH="$CLASSPATH:$FWDIR/sql/hive-thriftserver/target/scala-$SCALA_VERSION/classes" - CLASSPATH="$CLASSPATH:$FWDIR/yarn/stable/target/scala-$SCALA_VERSION/classes" + CLASSPATH="$CLASSPATH:$FWDIR/repl/target/scala-$SPARK_SCALA_VERSION/classes" + CLASSPATH="$CLASSPATH:$FWDIR/mllib/target/scala-$SPARK_SCALA_VERSION/classes" + CLASSPATH="$CLASSPATH:$FWDIR/bagel/target/scala-$SPARK_SCALA_VERSION/classes" + CLASSPATH="$CLASSPATH:$FWDIR/graphx/target/scala-$SPARK_SCALA_VERSION/classes" + CLASSPATH="$CLASSPATH:$FWDIR/streaming/target/scala-$SPARK_SCALA_VERSION/classes" + CLASSPATH="$CLASSPATH:$FWDIR/tools/target/scala-$SPARK_SCALA_VERSION/classes" + CLASSPATH="$CLASSPATH:$FWDIR/sql/catalyst/target/scala-$SPARK_SCALA_VERSION/classes" + CLASSPATH="$CLASSPATH:$FWDIR/sql/core/target/scala-$SPARK_SCALA_VERSION/classes" + CLASSPATH="$CLASSPATH:$FWDIR/sql/hive/target/scala-$SPARK_SCALA_VERSION/classes" + CLASSPATH="$CLASSPATH:$FWDIR/sql/hive-thriftserver/target/scala-$SPARK_SCALA_VERSION/classes" + CLASSPATH="$CLASSPATH:$FWDIR/yarn/stable/target/scala-$SPARK_SCALA_VERSION/classes" fi # Use spark-assembly jar from either RELEASE or assembly directory @@ -123,15 +121,15 @@ fi # Add test classes if we're running from SBT or Maven with SPARK_TESTING set to 1 if [[ $SPARK_TESTING == 1 ]]; then - CLASSPATH="$CLASSPATH:$FWDIR/core/target/scala-$SCALA_VERSION/test-classes" - CLASSPATH="$CLASSPATH:$FWDIR/repl/target/scala-$SCALA_VERSION/test-classes" - CLASSPATH="$CLASSPATH:$FWDIR/mllib/target/scala-$SCALA_VERSION/test-classes" - CLASSPATH="$CLASSPATH:$FWDIR/bagel/target/scala-$SCALA_VERSION/test-classes" - CLASSPATH="$CLASSPATH:$FWDIR/graphx/target/scala-$SCALA_VERSION/test-classes" - CLASSPATH="$CLASSPATH:$FWDIR/streaming/target/scala-$SCALA_VERSION/test-classes" - CLASSPATH="$CLASSPATH:$FWDIR/sql/catalyst/target/scala-$SCALA_VERSION/test-classes" - CLASSPATH="$CLASSPATH:$FWDIR/sql/core/target/scala-$SCALA_VERSION/test-classes" - CLASSPATH="$CLASSPATH:$FWDIR/sql/hive/target/scala-$SCALA_VERSION/test-classes" + CLASSPATH="$CLASSPATH:$FWDIR/core/target/scala-$SPARK_SCALA_VERSION/test-classes" + CLASSPATH="$CLASSPATH:$FWDIR/repl/target/scala-$SPARK_SCALA_VERSION/test-classes" + CLASSPATH="$CLASSPATH:$FWDIR/mllib/target/scala-$SPARK_SCALA_VERSION/test-classes" + CLASSPATH="$CLASSPATH:$FWDIR/bagel/target/scala-$SPARK_SCALA_VERSION/test-classes" + CLASSPATH="$CLASSPATH:$FWDIR/graphx/target/scala-$SPARK_SCALA_VERSION/test-classes" + CLASSPATH="$CLASSPATH:$FWDIR/streaming/target/scala-$SPARK_SCALA_VERSION/test-classes" + CLASSPATH="$CLASSPATH:$FWDIR/sql/catalyst/target/scala-$SPARK_SCALA_VERSION/test-classes" + 
CLASSPATH="$CLASSPATH:$FWDIR/sql/core/target/scala-$SPARK_SCALA_VERSION/test-classes" + CLASSPATH="$CLASSPATH:$FWDIR/sql/hive/target/scala-$SPARK_SCALA_VERSION/test-classes" fi # Add hadoop conf dir if given -- otherwise FileSystem.*, etc fail ! diff --git a/bin/load-spark-env.sh b/bin/load-spark-env.sh index 6d4231b204..356b3d49b2 100644 --- a/bin/load-spark-env.sh +++ b/bin/load-spark-env.sh @@ -36,3 +36,23 @@ if [ -z "$SPARK_ENV_LOADED" ]; then set +a fi fi + +# Setting SPARK_SCALA_VERSION if not already set. + +if [ -z "$SPARK_SCALA_VERSION" ]; then + + ASSEMBLY_DIR2="$FWDIR/assembly/target/scala-2.11" + ASSEMBLY_DIR1="$FWDIR/assembly/target/scala-2.10" + + if [[ -d "$ASSEMBLY_DIR2" && -d "$ASSEMBLY_DIR1" ]]; then + echo -e "Presence of build for both scala versions(SCALA 2.10 and SCALA 2.11) detected." 1>&2 + echo -e 'Either clean one of them or, export SPARK_SCALA_VERSION=2.11 in spark-env.sh.' 1>&2 + exit 1 + fi + + if [ -d "$ASSEMBLY_DIR2" ]; then + export SPARK_SCALA_VERSION="2.11" + else + export SPARK_SCALA_VERSION="2.10" + fi +fi diff --git a/bin/pyspark b/bin/pyspark index 96f30a260a..1d8c94d43d 100755 --- a/bin/pyspark +++ b/bin/pyspark @@ -25,7 +25,7 @@ export SPARK_HOME="$FWDIR" source "$FWDIR/bin/utils.sh" -SCALA_VERSION=2.10 +source "$FWDIR"/bin/load-spark-env.sh function usage() { echo "Usage: ./bin/pyspark [options]" 1>&2 @@ -40,7 +40,7 @@ fi # Exit if the user hasn't compiled Spark if [ ! -f "$FWDIR/RELEASE" ]; then # Exit if the user hasn't compiled Spark - ls "$FWDIR"/assembly/target/scala-$SCALA_VERSION/spark-assembly*hadoop*.jar >& /dev/null + ls "$FWDIR"/assembly/target/scala-$SPARK_SCALA_VERSION/spark-assembly*hadoop*.jar >& /dev/null if [[ $? != 0 ]]; then echo "Failed to find Spark assembly in $FWDIR/assembly/target" 1>&2 echo "You need to build Spark before running this program" 1>&2 @@ -48,8 +48,6 @@ if [ ! -f "$FWDIR/RELEASE" ]; then fi fi -. "$FWDIR"/bin/load-spark-env.sh - # In Spark <= 1.1, setting IPYTHON=1 would cause the driver to be launched using the `ipython` # executable, while the worker would still be launched using PYSPARK_PYTHON. # diff --git a/bin/run-example b/bin/run-example index 34dd71c718..3d93250942 100755 --- a/bin/run-example +++ b/bin/run-example @@ -17,12 +17,12 @@ # limitations under the License. # -SCALA_VERSION=2.10 - FWDIR="$(cd "`dirname "$0"`"/..; pwd)" export SPARK_HOME="$FWDIR" EXAMPLES_DIR="$FWDIR"/examples +. 
"$FWDIR"/bin/load-spark-env.sh + if [ -n "$1" ]; then EXAMPLE_CLASS="$1" shift @@ -36,8 +36,8 @@ fi if [ -f "$FWDIR/RELEASE" ]; then export SPARK_EXAMPLES_JAR="`ls "$FWDIR"/lib/spark-examples-*hadoop*.jar`" -elif [ -e "$EXAMPLES_DIR"/target/scala-$SCALA_VERSION/spark-examples-*hadoop*.jar ]; then - export SPARK_EXAMPLES_JAR="`ls "$EXAMPLES_DIR"/target/scala-$SCALA_VERSION/spark-examples-*hadoop*.jar`" +elif [ -e "$EXAMPLES_DIR"/target/scala-$SPARK_SCALA_VERSION/spark-examples-*hadoop*.jar ]; then + export SPARK_EXAMPLES_JAR="`ls "$EXAMPLES_DIR"/target/scala-$SPARK_SCALA_VERSION/spark-examples-*hadoop*.jar`" fi if [[ -z "$SPARK_EXAMPLES_JAR" ]]; then diff --git a/bin/spark-class b/bin/spark-class index 925367b0dd..0d58d95c1a 100755 --- a/bin/spark-class +++ b/bin/spark-class @@ -24,8 +24,6 @@ case "`uname`" in CYGWIN*) cygwin=true;; esac -SCALA_VERSION=2.10 - # Figure out where Spark is installed FWDIR="$(cd "`dirname "$0"`"/..; pwd)" @@ -128,9 +126,9 @@ fi TOOLS_DIR="$FWDIR"/tools SPARK_TOOLS_JAR="" -if [ -e "$TOOLS_DIR"/target/scala-$SCALA_VERSION/spark-tools*[0-9Tg].jar ]; then +if [ -e "$TOOLS_DIR"/target/scala-$SPARK_SCALA_VERSION/spark-tools*[0-9Tg].jar ]; then # Use the JAR from the SBT build - export SPARK_TOOLS_JAR="`ls "$TOOLS_DIR"/target/scala-$SCALA_VERSION/spark-tools*[0-9Tg].jar`" + export SPARK_TOOLS_JAR="`ls "$TOOLS_DIR"/target/scala-$SPARK_SCALA_VERSION/spark-tools*[0-9Tg].jar`" fi if [ -e "$TOOLS_DIR"/target/spark-tools*[0-9Tg].jar ]; then # Use the JAR from the Maven build @@ -149,7 +147,7 @@ fi if [[ "$1" =~ org.apache.spark.tools.* ]]; then if test -z "$SPARK_TOOLS_JAR"; then - echo "Failed to find Spark Tools Jar in $FWDIR/tools/target/scala-$SCALA_VERSION/" 1>&2 + echo "Failed to find Spark Tools Jar in $FWDIR/tools/target/scala-$SPARK_SCALA_VERSION/" 1>&2 echo "You need to build Spark before running $1." 1>&2 exit 1 fi diff --git a/core/pom.xml b/core/pom.xml index 41296e0eca..492eddda74 100644 --- a/core/pom.xml +++ b/core/pom.xml @@ -34,6 +34,34 @@ Spark Project Core http://spark.apache.org/ + + com.twitter + chill_${scala.binary.version} + + + org.ow2.asm + asm + + + org.ow2.asm + asm-commons + + + + + com.twitter + chill-java + + + org.ow2.asm + asm + + + org.ow2.asm + asm-commons + + + org.apache.hadoop hadoop-client @@ -46,12 +74,12 @@ org.apache.spark - spark-network-common_2.10 + spark-network-common_${scala.binary.version} ${project.version} org.apache.spark - spark-network-shuffle_2.10 + spark-network-shuffle_${scala.binary.version} ${project.version} @@ -132,14 +160,6 @@ net.jpountz.lz4 lz4 - - com.twitter - chill_${scala.binary.version} - - - com.twitter - chill-java - org.roaringbitmap RoaringBitmap @@ -309,14 +329,16 @@ org.scalatest scalatest-maven-plugin - - - ${basedir}/.. 
- 1 - ${spark.classpath} - - + + + test + + test + + + + org.apache.maven.plugins @@ -424,4 +446,5 @@ + diff --git a/core/src/main/scala/org/apache/spark/deploy/PythonRunner.scala b/core/src/main/scala/org/apache/spark/deploy/PythonRunner.scala index af94b05ce3..039c8719e2 100644 --- a/core/src/main/scala/org/apache/spark/deploy/PythonRunner.scala +++ b/core/src/main/scala/org/apache/spark/deploy/PythonRunner.scala @@ -87,8 +87,8 @@ object PythonRunner { // Strip the URI scheme from the path formattedPath = new URI(formattedPath).getScheme match { - case Utils.windowsDrive(d) if windows => formattedPath case null => formattedPath + case Utils.windowsDrive(d) if windows => formattedPath case _ => new URI(formattedPath).getPath } diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala index b43e68e40f..8a62519bd2 100644 --- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala +++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala @@ -340,7 +340,7 @@ object SparkSubmit { e.printStackTrace(printStream) if (childMainClass.contains("thriftserver")) { println(s"Failed to load main class $childMainClass.") - println("You need to build Spark with -Phive.") + println("You need to build Spark with -Phive and -Phive-thriftserver.") } System.exit(CLASS_NOT_FOUND_EXIT_STATUS) } diff --git a/dev/change-version-to-2.10.sh b/dev/change-version-to-2.10.sh new file mode 100755 index 0000000000..7473c20d28 --- /dev/null +++ b/dev/change-version-to-2.10.sh @@ -0,0 +1,20 @@ +#!/usr/bin/env bash + +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +find . -name 'pom.xml' | grep -v target \ + | xargs -I {} sed -i -e 's|\(artifactId.*\)_2.11|\1_2.10|g' {} diff --git a/dev/change-version-to-2.11.sh b/dev/change-version-to-2.11.sh new file mode 100755 index 0000000000..3957a9f3ba --- /dev/null +++ b/dev/change-version-to-2.11.sh @@ -0,0 +1,21 @@ +#!/usr/bin/env bash + +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +find . 
-name 'pom.xml' | grep -v target \ + | xargs -I {} sed -i -e 's|\(artifactId.*\)_2.10|\1_2.11|g' {} diff --git a/dev/create-release/create-release.sh b/dev/create-release/create-release.sh index 50a9a2fa1c..db441b3e49 100755 --- a/dev/create-release/create-release.sh +++ b/dev/create-release/create-release.sh @@ -118,13 +118,13 @@ make_binary_release() { spark-$RELEASE_VERSION-bin-$NAME.tgz.sha } -make_binary_release "hadoop1" "-Phive -Dhadoop.version=1.0.4" & -make_binary_release "cdh4" "-Phive -Dhadoop.version=2.0.0-mr1-cdh4.2.0" & -make_binary_release "hadoop2.3" "-Phadoop-2.3 -Phive -Pyarn" & -make_binary_release "hadoop2.4" "-Phadoop-2.4 -Phive -Pyarn" & +make_binary_release "hadoop1" "-Phive -Phive-thriftserver -Dhadoop.version=1.0.4" & +make_binary_release "cdh4" "-Phive -Phive-thriftserver -Dhadoop.version=2.0.0-mr1-cdh4.2.0" & +make_binary_release "hadoop2.3" "-Phadoop-2.3 -Phive -Phive-thriftserver -Pyarn" & +make_binary_release "hadoop2.4" "-Phadoop-2.4 -Phive -Phive-thriftserver -Pyarn" & make_binary_release "hadoop2.4-without-hive" "-Phadoop-2.4 -Pyarn" & -make_binary_release "mapr3" "-Pmapr3 -Phive" & -make_binary_release "mapr4" "-Pmapr4 -Pyarn -Phive" & +make_binary_release "mapr3" "-Pmapr3 -Phive -Phive-thriftserver" & +make_binary_release "mapr4" "-Pmapr4 -Pyarn -Phive -Phive-thriftserver" & wait # Copy data diff --git a/dev/run-tests b/dev/run-tests index de607e4344..328a73bd8b 100755 --- a/dev/run-tests +++ b/dev/run-tests @@ -139,9 +139,6 @@ echo "=========================================================================" CURRENT_BLOCK=$BLOCK_BUILD { - # We always build with Hive because the PySpark Spark SQL tests need it. - BUILD_MVN_PROFILE_ARGS="$SBT_MAVEN_PROFILES_ARGS -Phive -Phive-0.12.0" - # NOTE: echo "q" is needed because sbt on encountering a build file with failure #+ (either resolution or compilation) prompts the user for input either q, r, etc @@ -151,15 +148,17 @@ CURRENT_BLOCK=$BLOCK_BUILD # QUESTION: Why doesn't 'yes "q"' work? # QUESTION: Why doesn't 'grep -v -e "^\[info\] Resolving"' work? # First build with 0.12 to ensure patches do not break the hive 12 build + HIVE_12_BUILD_ARGS="$SBT_MAVEN_PROFILES_ARGS -Phive -Phive-thriftserver -Phive-0.12.0" echo "[info] Compile with hive 0.12" echo -e "q\n" \ - | sbt/sbt $BUILD_MVN_PROFILE_ARGS clean hive/compile hive-thriftserver/compile \ + | sbt/sbt $HIVE_12_BUILD_ARGS clean hive/compile hive-thriftserver/compile \ | grep -v -e "info.*Resolving" -e "warn.*Merging" -e "info.*Including" # Then build with default version(0.13.1) because tests are based on this version - echo "[info] Building Spark with these arguments: $SBT_MAVEN_PROFILES_ARGS -Phive" + echo "[info] Building Spark with these arguments: $SBT_MAVEN_PROFILES_ARGS"\ + " -Phive -Phive-thriftserver" echo -e "q\n" \ - | sbt/sbt $SBT_MAVEN_PROFILES_ARGS -Phive package assembly/assembly \ + | sbt/sbt $SBT_MAVEN_PROFILES_ARGS -Phive -Phive-thriftserver package assembly/assembly \ | grep -v -e "info.*Resolving" -e "warn.*Merging" -e "info.*Including" } @@ -174,7 +173,7 @@ CURRENT_BLOCK=$BLOCK_SPARK_UNIT_TESTS # If the Spark SQL tests are enabled, run the tests with the Hive profiles enabled. # This must be a single argument, as it is. 
if [ -n "$_RUN_SQL_TESTS" ]; then - SBT_MAVEN_PROFILES_ARGS="$SBT_MAVEN_PROFILES_ARGS -Phive" + SBT_MAVEN_PROFILES_ARGS="$SBT_MAVEN_PROFILES_ARGS -Phive -Phive-thriftserver" fi if [ -n "$_SQL_TESTS_ONLY" ]; then diff --git a/dev/scalastyle b/dev/scalastyle index ed1b6b730a..c3c6012e74 100755 --- a/dev/scalastyle +++ b/dev/scalastyle @@ -17,7 +17,7 @@ # limitations under the License. # -echo -e "q\n" | sbt/sbt -Phive scalastyle > scalastyle.txt +echo -e "q\n" | sbt/sbt -Phive -Phive-thriftserver scalastyle > scalastyle.txt # Check style with YARN alpha built too echo -e "q\n" | sbt/sbt -Pyarn-alpha -Phadoop-0.23 -Dhadoop.version=0.23.9 yarn-alpha/scalastyle \ >> scalastyle.txt diff --git a/docs/building-spark.md b/docs/building-spark.md index 238ddae155..20ba7da5d7 100644 --- a/docs/building-spark.md +++ b/docs/building-spark.md @@ -101,25 +101,34 @@ mvn -Pyarn-alpha -Phadoop-2.3 -Dhadoop.version=2.3.0 -Dyarn.version=0.23.7 -Dski # Building With Hive and JDBC Support To enable Hive integration for Spark SQL along with its JDBC server and CLI, -add the `-Phive` profile to your existing build options. By default Spark -will build with Hive 0.13.1 bindings. You can also build for Hive 0.12.0 using -the `-Phive-0.12.0` profile. +add the `-Phive` and `Phive-thriftserver` profiles to your existing build options. +By default Spark will build with Hive 0.13.1 bindings. You can also build for +Hive 0.12.0 using the `-Phive-0.12.0` profile. {% highlight bash %} # Apache Hadoop 2.4.X with Hive 13 support -mvn -Pyarn -Phadoop-2.4 -Dhadoop.version=2.4.0 -Phive -DskipTests clean package +mvn -Pyarn -Phadoop-2.4 -Dhadoop.version=2.4.0 -Phive -Phive-thriftserver -DskipTests clean package # Apache Hadoop 2.4.X with Hive 12 support -mvn -Pyarn -Phive-0.12.0 -Phadoop-2.4 -Dhadoop.version=2.4.0 -Phive -DskipTests clean package +mvn -Pyarn -Phive -Phive-thriftserver-0.12.0 -Phadoop-2.4 -Dhadoop.version=2.4.0 -Phive -Phive-thriftserver -DskipTests clean package {% endhighlight %} +# Building for Scala 2.11 +To produce a Spark package compiled with Scala 2.11, use the `-Pscala-2.11` profile: + + mvn -Pyarn -Phadoop-2.4 -Pscala-2.11 -DskipTests clean package + +Scala 2.11 support in Spark is experimental and does not support a few features. +Specifically, Spark's external Kafka library and JDBC component are not yet +supported in Scala 2.11 builds. + # Spark Tests in Maven Tests are run by default via the [ScalaTest Maven plugin](http://www.scalatest.org/user_guide/using_the_scalatest_maven_plugin). Some of the tests require Spark to be packaged first, so always run `mvn package` with `-DskipTests` the first time. The following is an example of a correct (build, test) sequence: - mvn -Pyarn -Phadoop-2.3 -DskipTests -Phive clean package - mvn -Pyarn -Phadoop-2.3 -Phive test + mvn -Pyarn -Phadoop-2.3 -DskipTests -Phive -Phive-thriftserver clean package + mvn -Pyarn -Phadoop-2.3 -Phive -Phive-thriftserver test The ScalaTest plugin also supports running only a specific test suite as follows: @@ -182,16 +191,16 @@ can be set to control the SBT build. For example: Some of the tests require Spark to be packaged first, so always run `sbt/sbt assembly` the first time. 
The following is an example of a correct (build, test) sequence:
-    sbt/sbt -Pyarn -Phadoop-2.3 -Phive assembly
-    sbt/sbt -Pyarn -Phadoop-2.3 -Phive test
+    sbt/sbt -Pyarn -Phadoop-2.3 -Phive -Phive-thriftserver assembly
+    sbt/sbt -Pyarn -Phadoop-2.3 -Phive -Phive-thriftserver test
To run only a specific test suite:
-    sbt/sbt -Pyarn -Phadoop-2.3 -Phive "test-only org.apache.spark.repl.ReplSuite"
+    sbt/sbt -Pyarn -Phadoop-2.3 -Phive -Phive-thriftserver "test-only org.apache.spark.repl.ReplSuite"
To run the test suites of a specific sub-project:
-    sbt/sbt -Pyarn -Phadoop-2.3 -Phive core/test
+    sbt/sbt -Pyarn -Phadoop-2.3 -Phive -Phive-thriftserver core/test
# Speeding up Compilation with Zinc
diff --git a/docs/sql-programming-guide.md b/docs/sql-programming-guide.md index ffcce2c588..48e8267ac0 100644 --- a/docs/sql-programming-guide.md +++ b/docs/sql-programming-guide.md @@ -728,7 +728,7 @@ anotherPeople = sqlContext.jsonRDD(anotherPeopleRDD)
Spark SQL also supports reading and writing data stored in [Apache Hive](http://hive.apache.org/). However, since Hive has a large number of dependencies, it is not included in the default Spark assembly.
-In order to use Hive you must first run "`sbt/sbt -Phive assembly/assembly`" (or use `-Phive` for maven).
+Hive support is enabled by adding the `-Phive` and `-Phive-thriftserver` flags to Spark's build. Building with these flags produces a new assembly jar that includes Hive.
Note that this Hive assembly jar must also be present on all of the worker nodes, as they will need access to the Hive serialization and deserialization libraries (SerDes) in order to access data stored in Hive.
diff --git a/examples/pom.xml b/examples/pom.xml index 910eb55308..2ec5728154 100644 --- a/examples/pom.xml +++ b/examples/pom.xml @@ -34,48 +34,6 @@ Spark Project Examples http://spark.apache.org/
- - - kinesis-asl - - - org.apache.spark - spark-streaming-kinesis-asl_${scala.binary.version} - ${project.version} - - - org.apache.httpcomponents - httpclient - ${commons.httpclient.version} - - - - - hbase-hadoop2 - - - hbase.profile - hadoop2 - - - - 0.98.7-hadoop2 - - - - hbase-hadoop1 - - - !hbase.profile - - - - 0.98.7-hadoop1 - - - - -
@@ -124,11 +82,6 @@ spark-streaming-twitter_${scala.binary.version} ${project.version}
- - org.apache.spark - spark-streaming-kafka_${scala.binary.version} - ${project.version} -
org.apache.spark spark-streaming-flume_${scala.binary.version}
@@ -136,12 +89,12 @@ org.apache.spark
- spark-streaming-zeromq_${scala.binary.version}
+ spark-streaming-mqtt_${scala.binary.version}
${project.version} org.apache.spark
- spark-streaming-mqtt_${scala.binary.version}
+ spark-streaming-zeromq_${scala.binary.version}
${project.version}
@@ -260,11 +213,6 @@ test-jar test
- - com.twitter - algebird-core_${scala.binary.version} - 0.1.11 -
org.apache.commons commons-math3
@@ -401,4 +349,147 @@
+ + + kinesis-asl + + + org.apache.spark + spark-streaming-kinesis-asl_${scala.binary.version} + ${project.version} + + + org.apache.httpcomponents + httpclient + ${commons.httpclient.version} + + + + + hbase-hadoop2 + + + hbase.profile + hadoop2 + + + + 0.98.7-hadoop2 + + + + hbase-hadoop1 + + + !hbase.profile + + + + 0.98.7-hadoop1 + + + + + scala-2.10 + + true + + + + org.apache.spark + spark-streaming-kafka_${scala.binary.version} + ${project.version} + + + com.twitter + algebird-core_${scala.binary.version} + 0.1.11 + + + + + + org.codehaus.mojo + build-helper-maven-plugin + + + add-scala-sources + generate-sources + + add-source + + + + src/main/scala +
scala-2.10/src/main/scala + scala-2.10/src/main/java + + + + + add-scala-test-sources + generate-test-sources + + add-test-source + + + + src/test/scala + scala-2.10/src/test/scala + scala-2.10/src/test/java + + + + + + + + + + scala-2.11 + + false + + + + + + + + org.codehaus.mojo + build-helper-maven-plugin + + + add-scala-sources + generate-sources + + add-source + + + + src/main/scala + scala-2.11/src/main/scala + + + + + add-scala-test-sources + generate-test-sources + + add-test-source + + + + src/test/scala + scala-2.11/src/test/scala + + + + + + + + + diff --git a/examples/scala-2.10/src/main/java/org/apache/spark/examples/streaming/JavaKafkaWordCount.java b/examples/scala-2.10/src/main/java/org/apache/spark/examples/streaming/JavaKafkaWordCount.java new file mode 100644 index 0000000000..16ae9a3319 --- /dev/null +++ b/examples/scala-2.10/src/main/java/org/apache/spark/examples/streaming/JavaKafkaWordCount.java @@ -0,0 +1,113 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.examples.streaming; + +import java.util.Map; +import java.util.HashMap; +import java.util.regex.Pattern; + + +import scala.Tuple2; + +import com.google.common.collect.Lists; +import org.apache.spark.SparkConf; +import org.apache.spark.api.java.function.FlatMapFunction; +import org.apache.spark.api.java.function.Function; +import org.apache.spark.api.java.function.Function2; +import org.apache.spark.api.java.function.PairFunction; +import org.apache.spark.examples.streaming.StreamingExamples; +import org.apache.spark.streaming.Duration; +import org.apache.spark.streaming.api.java.JavaDStream; +import org.apache.spark.streaming.api.java.JavaPairDStream; +import org.apache.spark.streaming.api.java.JavaPairReceiverInputDStream; +import org.apache.spark.streaming.api.java.JavaStreamingContext; +import org.apache.spark.streaming.kafka.KafkaUtils; + +/** + * Consumes messages from one or more topics in Kafka and does wordcount. 
+ *
+ * Usage: JavaKafkaWordCount <zkQuorum> <group> <topics> <numThreads>
+ *   <zkQuorum> is a list of one or more zookeeper servers that make quorum
+ *   <group> is the name of kafka consumer group
+ *   <topics> is a list of one or more kafka topics to consume from
+ *   <numThreads> is the number of threads the kafka consumer should use
+ *
+ * To run this example:
+ *   `$ bin/run-example org.apache.spark.examples.streaming.JavaKafkaWordCount zoo01,zoo02, \
+ *    zoo03 my-consumer-group topic1,topic2 1`
+ */
+
+public final class JavaKafkaWordCount {
+  private static final Pattern SPACE = Pattern.compile(" ");
+
+  private JavaKafkaWordCount() {
+  }
+
+  public static void main(String[] args) {
+    if (args.length < 4) {
+      System.err.println("Usage: JavaKafkaWordCount <zkQuorum> <group> <topics> <numThreads>");
+      System.exit(1);
+    }
+
+    StreamingExamples.setStreamingLogLevels();
+    SparkConf sparkConf = new SparkConf().setAppName("JavaKafkaWordCount");
+    // Create the context with a 2 second batch size
+    JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, new Duration(2000));
+
+    int numThreads = Integer.parseInt(args[3]);
+    Map<String, Integer> topicMap = new HashMap<String, Integer>();
+    String[] topics = args[2].split(",");
+    for (String topic: topics) {
+      topicMap.put(topic, numThreads);
+    }
+
+    JavaPairReceiverInputDStream<String, String> messages =
+            KafkaUtils.createStream(jssc, args[0], args[1], topicMap);
+
+    JavaDStream<String> lines = messages.map(new Function<Tuple2<String, String>, String>() {
+      @Override
+      public String call(Tuple2<String, String> tuple2) {
+        return tuple2._2();
+      }
+    });
+
+    JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
+      @Override
+      public Iterable<String> call(String x) {
+        return Lists.newArrayList(SPACE.split(x));
+      }
+    });
+
+    JavaPairDStream<String, Integer> wordCounts = words.mapToPair(
+      new PairFunction<String, String, Integer>() {
+        @Override
+        public Tuple2<String, Integer> call(String s) {
+          return new Tuple2<String, Integer>(s, 1);
+        }
+      }).reduceByKey(new Function2<Integer, Integer, Integer>() {
+        @Override
+        public Integer call(Integer i1, Integer i2) {
+          return i1 + i2;
+        }
+      });
+
+    wordCounts.print();
+    jssc.start();
+    jssc.awaitTermination();
+  }
+}
diff --git a/examples/scala-2.10/src/main/scala/org/apache/spark/examples/streaming/KafkaWordCount.scala b/examples/scala-2.10/src/main/scala/org/apache/spark/examples/streaming/KafkaWordCount.scala new file mode 100644 index 0000000000..c9e1511278 --- /dev/null +++ b/examples/scala-2.10/src/main/scala/org/apache/spark/examples/streaming/KafkaWordCount.scala @@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.examples.streaming
+
+import java.util.Properties
+
+import kafka.producer._
+
+import org.apache.spark.streaming._
+import org.apache.spark.streaming.StreamingContext._
+import org.apache.spark.streaming.kafka._
+import org.apache.spark.SparkConf
+
+/**
+ * Consumes messages from one or more topics in Kafka and does wordcount.
+ * Usage: KafkaWordCount <zkQuorum> <group> <topics> <numThreads>
+ *   <zkQuorum> is a list of one or more zookeeper servers that make quorum
+ *   <group> is the name of kafka consumer group
+ *   <topics> is a list of one or more kafka topics to consume from
+ *   <numThreads> is the number of threads the kafka consumer should use
+ *
+ * Example:
+ *    `$ bin/run-example \
+ *      org.apache.spark.examples.streaming.KafkaWordCount zoo01,zoo02,zoo03 \
+ *      my-consumer-group topic1,topic2 1`
+ */
+object KafkaWordCount {
+  def main(args: Array[String]) {
+    if (args.length < 4) {
+      System.err.println("Usage: KafkaWordCount <zkQuorum> <group> <topics> <numThreads>")
+      System.exit(1)
+    }
+
+    StreamingExamples.setStreamingLogLevels()
+
+    val Array(zkQuorum, group, topics, numThreads) = args
+    val sparkConf = new SparkConf().setAppName("KafkaWordCount")
+    val ssc = new StreamingContext(sparkConf, Seconds(2))
+    ssc.checkpoint("checkpoint")
+
+    val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap
+    val lines = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap).map(_._2)
+    val words = lines.flatMap(_.split(" "))
+    val wordCounts = words.map(x => (x, 1L))
+      .reduceByKeyAndWindow(_ + _, _ - _, Minutes(10), Seconds(2), 2)
+    wordCounts.print()
+
+    ssc.start()
+    ssc.awaitTermination()
+  }
+}
+
+// Produces some random words and sends them to a Kafka topic.
+object KafkaWordCountProducer {
+
+  def main(args: Array[String]) {
+    if (args.length < 4) {
+      System.err.println("Usage: KafkaWordCountProducer <metadataBrokerList> <topic> " +
+        "<messagesPerSec> <wordsPerMessage>")
+      System.exit(1)
+    }
+
+    val Array(brokers, topic, messagesPerSec, wordsPerMessage) = args
+
+    // Kafka producer connection properties
+    val props = new Properties()
+    props.put("metadata.broker.list", brokers)
+    props.put("serializer.class", "kafka.serializer.StringEncoder")
+
+    val config = new ProducerConfig(props)
+    val producer = new Producer[String, String](config)
+
+    // Send some messages
+    while(true) {
+      val messages = (1 to messagesPerSec.toInt).map { messageNum =>
+        val str = (1 to wordsPerMessage.toInt).map(x => scala.util.Random.nextInt(10).toString)
+          .mkString(" ")
+
+        new KeyedMessage[String, String](topic, str)
+      }.toArray
+
+      producer.send(messages: _*)
+      Thread.sleep(100)
+    }
+  }
+
+}
diff --git a/examples/scala-2.10/src/main/scala/org/apache/spark/examples/streaming/TwitterAlgebirdCMS.scala b/examples/scala-2.10/src/main/scala/org/apache/spark/examples/streaming/TwitterAlgebirdCMS.scala new file mode 100644 index 0000000000..683752ac96 --- /dev/null +++ b/examples/scala-2.10/src/main/scala/org/apache/spark/examples/streaming/TwitterAlgebirdCMS.scala @@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */ + +package org.apache.spark.examples.streaming + +import com.twitter.algebird._ + +import org.apache.spark.SparkConf +import org.apache.spark.SparkContext._ +import org.apache.spark.storage.StorageLevel +import org.apache.spark.streaming.{Seconds, StreamingContext} +import org.apache.spark.streaming.StreamingContext._ +import org.apache.spark.streaming.twitter._ + +// scalastyle:off +/** + * Illustrates the use of the Count-Min Sketch, from Twitter's Algebird library, to compute + * windowed and global Top-K estimates of user IDs occurring in a Twitter stream. + *
+ * Note that since Algebird's implementation currently only supports Long inputs,
+ * the example operates on Long IDs. Once the implementation supports other inputs (such as String),
+ * the same approach could be used for computing popular topics for example.
+ *
+ * This blog post has a good overview of the Count-Min Sketch (CMS). The CMS is a data
+ * structure for approximate frequency estimation in data streams (e.g. Top-K elements, frequency
+ * of any given element, etc), that uses space sub-linear in the number of elements in the
+ * stream. Once elements are added to the CMS, the estimated count of an element can be computed,
+ * as well as "heavy-hitters" that occur more than a threshold percentage of the overall total
+ * count.
+ *
+ * Algebird's implementation is a monoid, so we can succinctly merge two CMS instances in the + * reduce operation. + */ +// scalastyle:on +object TwitterAlgebirdCMS { + def main(args: Array[String]) { + StreamingExamples.setStreamingLogLevels() + + // CMS parameters + val DELTA = 1E-3 + val EPS = 0.01 + val SEED = 1 + val PERC = 0.001 + // K highest frequency elements to take + val TOPK = 10 + + val filters = args + val sparkConf = new SparkConf().setAppName("TwitterAlgebirdCMS") + val ssc = new StreamingContext(sparkConf, Seconds(10)) + val stream = TwitterUtils.createStream(ssc, None, filters, StorageLevel.MEMORY_ONLY_SER_2) + + val users = stream.map(status => status.getUser.getId) + + val cms = new CountMinSketchMonoid(EPS, DELTA, SEED, PERC) + var globalCMS = cms.zero + val mm = new MapMonoid[Long, Int]() + var globalExact = Map[Long, Int]() + + val approxTopUsers = users.mapPartitions(ids => { + ids.map(id => cms.create(id)) + }).reduce(_ ++ _) + + val exactTopUsers = users.map(id => (id, 1)) + .reduceByKey((a, b) => a + b) + + approxTopUsers.foreachRDD(rdd => { + if (rdd.count() != 0) { + val partial = rdd.first() + val partialTopK = partial.heavyHitters.map(id => + (id, partial.frequency(id).estimate)).toSeq.sortBy(_._2).reverse.slice(0, TOPK) + globalCMS ++= partial + val globalTopK = globalCMS.heavyHitters.map(id => + (id, globalCMS.frequency(id).estimate)).toSeq.sortBy(_._2).reverse.slice(0, TOPK) + println("Approx heavy hitters at %2.2f%% threshold this batch: %s".format(PERC, + partialTopK.mkString("[", ",", "]"))) + println("Approx heavy hitters at %2.2f%% threshold overall: %s".format(PERC, + globalTopK.mkString("[", ",", "]"))) + } + }) + + exactTopUsers.foreachRDD(rdd => { + if (rdd.count() != 0) { + val partialMap = rdd.collect().toMap + val partialTopK = rdd.map( + {case (id, count) => (count, id)}) + .sortByKey(ascending = false).take(TOPK) + globalExact = mm.plus(globalExact.toMap, partialMap) + val globalTopK = globalExact.toSeq.sortBy(_._2).reverse.slice(0, TOPK) + println("Exact heavy hitters this batch: %s".format(partialTopK.mkString("[", ",", "]"))) + println("Exact heavy hitters overall: %s".format(globalTopK.mkString("[", ",", "]"))) + } + }) + + ssc.start() + ssc.awaitTermination() + } +} diff --git a/examples/scala-2.10/src/main/scala/org/apache/spark/examples/streaming/TwitterAlgebirdHLL.scala b/examples/scala-2.10/src/main/scala/org/apache/spark/examples/streaming/TwitterAlgebirdHLL.scala new file mode 100644 index 0000000000..62db5e663b --- /dev/null +++ b/examples/scala-2.10/src/main/scala/org/apache/spark/examples/streaming/TwitterAlgebirdHLL.scala @@ -0,0 +1,92 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.examples.streaming + +import com.twitter.algebird.HyperLogLogMonoid +import com.twitter.algebird.HyperLogLog._ + +import org.apache.spark.storage.StorageLevel +import org.apache.spark.streaming.{Seconds, StreamingContext} +import org.apache.spark.streaming.twitter._ +import org.apache.spark.SparkConf + +// scalastyle:off +/** + * Illustrates the use of the HyperLogLog algorithm, from Twitter's Algebird library, to compute + * a windowed and global estimate of the unique user IDs occurring in a Twitter stream. + *
+ * This blog post and this blog post have good overviews of HyperLogLog (HLL). HLL is a
+ * memory-efficient data structure for estimating the cardinality of a data stream, i.e. the
+ * number of unique elements.
+ *
+ * Algebird's implementation is a monoid, so we can succinctly merge two HLL instances in the + * reduce operation. + */ +// scalastyle:on +object TwitterAlgebirdHLL { + def main(args: Array[String]) { + + StreamingExamples.setStreamingLogLevels() + + /** Bit size parameter for HyperLogLog, trades off accuracy vs size */ + val BIT_SIZE = 12 + val filters = args + val sparkConf = new SparkConf().setAppName("TwitterAlgebirdHLL") + val ssc = new StreamingContext(sparkConf, Seconds(5)) + val stream = TwitterUtils.createStream(ssc, None, filters, StorageLevel.MEMORY_ONLY_SER) + + val users = stream.map(status => status.getUser.getId) + + val hll = new HyperLogLogMonoid(BIT_SIZE) + var globalHll = hll.zero + var userSet: Set[Long] = Set() + + val approxUsers = users.mapPartitions(ids => { + ids.map(id => hll(id)) + }).reduce(_ + _) + + val exactUsers = users.map(id => Set(id)).reduce(_ ++ _) + + approxUsers.foreachRDD(rdd => { + if (rdd.count() != 0) { + val partial = rdd.first() + globalHll += partial + println("Approx distinct users this batch: %d".format(partial.estimatedSize.toInt)) + println("Approx distinct users overall: %d".format(globalHll.estimatedSize.toInt)) + } + }) + + exactUsers.foreachRDD(rdd => { + if (rdd.count() != 0) { + val partial = rdd.first() + userSet ++= partial + println("Exact distinct users this batch: %d".format(partial.size)) + println("Exact distinct users overall: %d".format(userSet.size)) + println("Error rate: %2.5f%%".format(((globalHll.estimatedSize / userSet.size.toDouble) - 1 + ) * 100)) + } + }) + + ssc.start() + ssc.awaitTermination() + } +} diff --git a/examples/src/main/java/org/apache/spark/examples/streaming/JavaKafkaWordCount.java b/examples/src/main/java/org/apache/spark/examples/streaming/JavaKafkaWordCount.java deleted file mode 100644 index 16ae9a3319..0000000000 --- a/examples/src/main/java/org/apache/spark/examples/streaming/JavaKafkaWordCount.java +++ /dev/null @@ -1,113 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */
-
-package org.apache.spark.examples.streaming;
-
-import java.util.Map;
-import java.util.HashMap;
-import java.util.regex.Pattern;
-
-
-import scala.Tuple2;
-
-import com.google.common.collect.Lists;
-import org.apache.spark.SparkConf;
-import org.apache.spark.api.java.function.FlatMapFunction;
-import org.apache.spark.api.java.function.Function;
-import org.apache.spark.api.java.function.Function2;
-import org.apache.spark.api.java.function.PairFunction;
-import org.apache.spark.examples.streaming.StreamingExamples;
-import org.apache.spark.streaming.Duration;
-import org.apache.spark.streaming.api.java.JavaDStream;
-import org.apache.spark.streaming.api.java.JavaPairDStream;
-import org.apache.spark.streaming.api.java.JavaPairReceiverInputDStream;
-import org.apache.spark.streaming.api.java.JavaStreamingContext;
-import org.apache.spark.streaming.kafka.KafkaUtils;
-
-/**
- * Consumes messages from one or more topics in Kafka and does wordcount.
- *
- * Usage: JavaKafkaWordCount <zkQuorum> <group> <topics> <numThreads>
- *   <zkQuorum> is a list of one or more zookeeper servers that make quorum
- *   <group> is the name of kafka consumer group
- *   <topics> is a list of one or more kafka topics to consume from
- *   <numThreads> is the number of threads the kafka consumer should use
- *
- * To run this example:
- *   `$ bin/run-example org.apache.spark.examples.streaming.JavaKafkaWordCount zoo01,zoo02, \
- *    zoo03 my-consumer-group topic1,topic2 1`
- */
-
-public final class JavaKafkaWordCount {
-  private static final Pattern SPACE = Pattern.compile(" ");
-
-  private JavaKafkaWordCount() {
-  }
-
-  public static void main(String[] args) {
-    if (args.length < 4) {
-      System.err.println("Usage: JavaKafkaWordCount <zkQuorum> <group> <topics> <numThreads>");
-      System.exit(1);
-    }
-
-    StreamingExamples.setStreamingLogLevels();
-    SparkConf sparkConf = new SparkConf().setAppName("JavaKafkaWordCount");
-    // Create the context with a 2 second batch size
-    JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, new Duration(2000));
-
-    int numThreads = Integer.parseInt(args[3]);
-    Map<String, Integer> topicMap = new HashMap<String, Integer>();
-    String[] topics = args[2].split(",");
-    for (String topic: topics) {
-      topicMap.put(topic, numThreads);
-    }
-
-    JavaPairReceiverInputDStream<String, String> messages =
-            KafkaUtils.createStream(jssc, args[0], args[1], topicMap);
-
-    JavaDStream<String> lines = messages.map(new Function<Tuple2<String, String>, String>() {
-      @Override
-      public String call(Tuple2<String, String> tuple2) {
-        return tuple2._2();
-      }
-    });
-
-    JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
-      @Override
-      public Iterable<String> call(String x) {
-        return Lists.newArrayList(SPACE.split(x));
-      }
-    });
-
-    JavaPairDStream<String, Integer> wordCounts = words.mapToPair(
-      new PairFunction<String, String, Integer>() {
-        @Override
-        public Tuple2<String, Integer> call(String s) {
-          return new Tuple2<String, Integer>(s, 1);
-        }
-      }).reduceByKey(new Function2<Integer, Integer, Integer>() {
-        @Override
-        public Integer call(Integer i1, Integer i2) {
-          return i1 + i2;
-        }
-      });
-
-    wordCounts.print();
-    jssc.start();
-    jssc.awaitTermination();
-  }
-}
diff --git a/examples/src/main/scala/org/apache/spark/examples/streaming/KafkaWordCount.scala b/examples/src/main/scala/org/apache/spark/examples/streaming/KafkaWordCount.scala deleted file mode 100644 index c9e1511278..0000000000 --- a/examples/src/main/scala/org/apache/spark/examples/streaming/KafkaWordCount.scala +++ /dev/null @@ -1,102 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.examples.streaming
-
-import java.util.Properties
-
-import kafka.producer._
-
-import org.apache.spark.streaming._
-import org.apache.spark.streaming.StreamingContext._
-import org.apache.spark.streaming.kafka._
-import org.apache.spark.SparkConf
-
-/**
- * Consumes messages from one or more topics in Kafka and does wordcount.
- * Usage: KafkaWordCount <zkQuorum> <group> <topics> <numThreads>
- *   <zkQuorum> is a list of one or more zookeeper servers that make quorum
- *   <group> is the name of kafka consumer group
- *   <topics> is a list of one or more kafka topics to consume from
- *   <numThreads> is the number of threads the kafka consumer should use
- *
- * Example:
- *    `$ bin/run-example \
- *      org.apache.spark.examples.streaming.KafkaWordCount zoo01,zoo02,zoo03 \
- *      my-consumer-group topic1,topic2 1`
- */
-object KafkaWordCount {
-  def main(args: Array[String]) {
-    if (args.length < 4) {
-      System.err.println("Usage: KafkaWordCount <zkQuorum> <group> <topics> <numThreads>")
-      System.exit(1)
-    }
-
-    StreamingExamples.setStreamingLogLevels()
-
-    val Array(zkQuorum, group, topics, numThreads) = args
-    val sparkConf = new SparkConf().setAppName("KafkaWordCount")
-    val ssc = new StreamingContext(sparkConf, Seconds(2))
-    ssc.checkpoint("checkpoint")
-
-    val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap
-    val lines = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap).map(_._2)
-    val words = lines.flatMap(_.split(" "))
-    val wordCounts = words.map(x => (x, 1L))
-      .reduceByKeyAndWindow(_ + _, _ - _, Minutes(10), Seconds(2), 2)
-    wordCounts.print()
-
-    ssc.start()
-    ssc.awaitTermination()
-  }
-}
-
-// Produces some random words and sends them to a Kafka topic.
-object KafkaWordCountProducer {
-
-  def main(args: Array[String]) {
-    if (args.length < 4) {
-      System.err.println("Usage: KafkaWordCountProducer <metadataBrokerList> <topic> " +
-        "<messagesPerSec> <wordsPerMessage>")
-      System.exit(1)
-    }
-
-    val Array(brokers, topic, messagesPerSec, wordsPerMessage) = args
-
-    // Kafka producer connection properties
-    val props = new Properties()
-    props.put("metadata.broker.list", brokers)
-    props.put("serializer.class", "kafka.serializer.StringEncoder")
-
-    val config = new ProducerConfig(props)
-    val producer = new Producer[String, String](config)
-
-    // Send some messages
-    while(true) {
-      val messages = (1 to messagesPerSec.toInt).map { messageNum =>
-        val str = (1 to wordsPerMessage.toInt).map(x => scala.util.Random.nextInt(10).toString)
-          .mkString(" ")
-
-        new KeyedMessage[String, String](topic, str)
-      }.toArray
-
-      producer.send(messages: _*)
-      Thread.sleep(100)
-    }
-  }
-
-}
diff --git a/examples/src/main/scala/org/apache/spark/examples/streaming/TwitterAlgebirdCMS.scala b/examples/src/main/scala/org/apache/spark/examples/streaming/TwitterAlgebirdCMS.scala deleted file mode 100644 index 683752ac96..0000000000 --- a/examples/src/main/scala/org/apache/spark/examples/streaming/TwitterAlgebirdCMS.scala +++ /dev/null @@ -1,114 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.examples.streaming
-
-import com.twitter.algebird._
-
-import org.apache.spark.SparkConf
-import org.apache.spark.SparkContext._
-import org.apache.spark.storage.StorageLevel
-import org.apache.spark.streaming.{Seconds, StreamingContext}
-import org.apache.spark.streaming.StreamingContext._
-import org.apache.spark.streaming.twitter._
-
-// scalastyle:off
-/**
- * Illustrates the use of the Count-Min Sketch, from Twitter's Algebird library, to compute
- * windowed and global Top-K estimates of user IDs occurring in a Twitter stream.
- *
- * Note that since Algebird's implementation currently only supports Long inputs,
- * the example operates on Long IDs. Once the implementation supports other inputs (such as String),
- * the same approach could be used for computing popular topics for example.
- *
- * This blog post has a good overview of the Count-Min Sketch (CMS). The CMS is a data
- * structure for approximate frequency estimation in data streams (e.g. Top-K elements, frequency
- * of any given element, etc), that uses space sub-linear in the number of elements in the
- * stream. Once elements are added to the CMS, the estimated count of an element can be computed,
- * as well as "heavy-hitters" that occur more than a threshold percentage of the overall total
- * count.
- *
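- * (A rough sanity check of the parameters used below, not part of the original
- * source: with eps = 0.01 and delta = 1E-3, a Count-Min Sketch allocates about
- * w = ceil(e / eps) = 272 columns and d = ceil(ln(1 / delta)) = 7 rows of
- * counters, and each estimate overshoots the true count by at most eps * N
- * with probability at least 1 - delta, where N is the total stream count.)
- *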
- * Algebird's implementation is a monoid, so we can succinctly merge two CMS instances in the - * reduce operation. - */ -// scalastyle:on -object TwitterAlgebirdCMS { - def main(args: Array[String]) { - StreamingExamples.setStreamingLogLevels() - - // CMS parameters - val DELTA = 1E-3 - val EPS = 0.01 - val SEED = 1 - val PERC = 0.001 - // K highest frequency elements to take - val TOPK = 10 - - val filters = args - val sparkConf = new SparkConf().setAppName("TwitterAlgebirdCMS") - val ssc = new StreamingContext(sparkConf, Seconds(10)) - val stream = TwitterUtils.createStream(ssc, None, filters, StorageLevel.MEMORY_ONLY_SER_2) - - val users = stream.map(status => status.getUser.getId) - - val cms = new CountMinSketchMonoid(EPS, DELTA, SEED, PERC) - var globalCMS = cms.zero - val mm = new MapMonoid[Long, Int]() - var globalExact = Map[Long, Int]() - - val approxTopUsers = users.mapPartitions(ids => { - ids.map(id => cms.create(id)) - }).reduce(_ ++ _) - - val exactTopUsers = users.map(id => (id, 1)) - .reduceByKey((a, b) => a + b) - - approxTopUsers.foreachRDD(rdd => { - if (rdd.count() != 0) { - val partial = rdd.first() - val partialTopK = partial.heavyHitters.map(id => - (id, partial.frequency(id).estimate)).toSeq.sortBy(_._2).reverse.slice(0, TOPK) - globalCMS ++= partial - val globalTopK = globalCMS.heavyHitters.map(id => - (id, globalCMS.frequency(id).estimate)).toSeq.sortBy(_._2).reverse.slice(0, TOPK) - println("Approx heavy hitters at %2.2f%% threshold this batch: %s".format(PERC, - partialTopK.mkString("[", ",", "]"))) - println("Approx heavy hitters at %2.2f%% threshold overall: %s".format(PERC, - globalTopK.mkString("[", ",", "]"))) - } - }) - - exactTopUsers.foreachRDD(rdd => { - if (rdd.count() != 0) { - val partialMap = rdd.collect().toMap - val partialTopK = rdd.map( - {case (id, count) => (count, id)}) - .sortByKey(ascending = false).take(TOPK) - globalExact = mm.plus(globalExact.toMap, partialMap) - val globalTopK = globalExact.toSeq.sortBy(_._2).reverse.slice(0, TOPK) - println("Exact heavy hitters this batch: %s".format(partialTopK.mkString("[", ",", "]"))) - println("Exact heavy hitters overall: %s".format(globalTopK.mkString("[", ",", "]"))) - } - }) - - ssc.start() - ssc.awaitTermination() - } -} diff --git a/examples/src/main/scala/org/apache/spark/examples/streaming/TwitterAlgebirdHLL.scala b/examples/src/main/scala/org/apache/spark/examples/streaming/TwitterAlgebirdHLL.scala deleted file mode 100644 index 62db5e663b..0000000000 --- a/examples/src/main/scala/org/apache/spark/examples/streaming/TwitterAlgebirdHLL.scala +++ /dev/null @@ -1,92 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */
-
-package org.apache.spark.examples.streaming
-
-import com.twitter.algebird.HyperLogLogMonoid
-import com.twitter.algebird.HyperLogLog._
-
-import org.apache.spark.storage.StorageLevel
-import org.apache.spark.streaming.{Seconds, StreamingContext}
-import org.apache.spark.streaming.twitter._
-import org.apache.spark.SparkConf
-
-// scalastyle:off
-/**
- * Illustrates the use of the HyperLogLog algorithm, from Twitter's Algebird library, to compute
- * a windowed and global estimate of the unique user IDs occurring in a Twitter stream.
- *
- * This blog post and this blog post have good overviews of HyperLogLog (HLL). HLL is a
- * memory-efficient data structure for estimating the cardinality of a data stream, i.e. the
- * number of unique elements.
- *
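- * (A rough sanity check, not part of the original source: with the BIT_SIZE of
- * 12 used below, the sketch keeps 2^12 = 4096 registers, giving a standard
- * error of about 1.04 / sqrt(4096), i.e. roughly 1.6% of the true cardinality.)
- *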
- * Algebird's implementation is a monoid, so we can succinctly merge two HLL instances in the - * reduce operation. - */ -// scalastyle:on -object TwitterAlgebirdHLL { - def main(args: Array[String]) { - - StreamingExamples.setStreamingLogLevels() - - /** Bit size parameter for HyperLogLog, trades off accuracy vs size */ - val BIT_SIZE = 12 - val filters = args - val sparkConf = new SparkConf().setAppName("TwitterAlgebirdHLL") - val ssc = new StreamingContext(sparkConf, Seconds(5)) - val stream = TwitterUtils.createStream(ssc, None, filters, StorageLevel.MEMORY_ONLY_SER) - - val users = stream.map(status => status.getUser.getId) - - val hll = new HyperLogLogMonoid(BIT_SIZE) - var globalHll = hll.zero - var userSet: Set[Long] = Set() - - val approxUsers = users.mapPartitions(ids => { - ids.map(id => hll(id)) - }).reduce(_ + _) - - val exactUsers = users.map(id => Set(id)).reduce(_ ++ _) - - approxUsers.foreachRDD(rdd => { - if (rdd.count() != 0) { - val partial = rdd.first() - globalHll += partial - println("Approx distinct users this batch: %d".format(partial.estimatedSize.toInt)) - println("Approx distinct users overall: %d".format(globalHll.estimatedSize.toInt)) - } - }) - - exactUsers.foreachRDD(rdd => { - if (rdd.count() != 0) { - val partial = rdd.first() - userSet ++= partial - println("Exact distinct users this batch: %d".format(partial.size)) - println("Exact distinct users overall: %d".format(userSet.size)) - println("Error rate: %2.5f%%".format(((globalHll.estimatedSize / userSet.size.toDouble) - 1 - ) * 100)) - } - }) - - ssc.start() - ssc.awaitTermination() - } -} diff --git a/external/mqtt/pom.xml b/external/mqtt/pom.xml index 371f1f1e9d..362a76e515 100644 --- a/external/mqtt/pom.xml +++ b/external/mqtt/pom.xml @@ -52,11 +52,6 @@ mqtt-client 0.4.0 - - ${akka.group} - akka-zeromq_${scala.binary.version} - ${akka.version} - org.scalatest scalatest_${scala.binary.version} diff --git a/make-distribution.sh b/make-distribution.sh index 0bc839e1db..d46edbc50d 100755 --- a/make-distribution.sh +++ b/make-distribution.sh @@ -59,7 +59,7 @@ while (( "$#" )); do exit_with_usage ;; --with-hive) - echo "Error: '--with-hive' is no longer supported, use Maven option -Phive" + echo "Error: '--with-hive' is no longer supported, use Maven options -Phive and -Phive-thriftserver" exit_with_usage ;; --skip-java-test) diff --git a/network/shuffle/pom.xml b/network/shuffle/pom.xml index 27c8467687..a180a5e5f9 100644 --- a/network/shuffle/pom.xml +++ b/network/shuffle/pom.xml @@ -39,7 +39,7 @@ org.apache.spark - spark-network-common_2.10 + spark-network-common_${scala.binary.version} ${project.version} @@ -58,7 +58,7 @@ org.apache.spark - spark-network-common_2.10 + spark-network-common_${scala.binary.version} ${project.version} test-jar test diff --git a/network/yarn/pom.xml b/network/yarn/pom.xml index 6e6f6f3e79..85960eb85b 100644 --- a/network/yarn/pom.xml +++ b/network/yarn/pom.xml @@ -39,7 +39,7 @@ org.apache.spark - spark-network-shuffle_2.10 + spark-network-shuffle_${scala.binary.version} ${project.version} diff --git a/pom.xml b/pom.xml index 4e0cd6c151..7bbde31e57 100644 --- a/pom.xml +++ b/pom.xml @@ -97,30 +97,26 @@ sql/catalyst sql/core sql/hive - repl assembly external/twitter - external/kafka external/flume external/flume-sink - external/zeromq external/mqtt + external/zeromq examples + repl UTF-8 UTF-8 - + org.spark-project.akka + 2.3.4-spark 1.6 spark - 2.10.4 - 2.10 2.0.1 0.18.1 shaded-protobuf - org.spark-project.akka - 2.3.4-spark 1.7.5 1.2.17 1.0.4 @@ -137,7 +133,7 @@ 1.6.0rc3 
1.2.3 8.1.14.v20131031 - 0.3.6 + 0.5.0 3.0.0 1.7.6 @@ -146,9 +142,13 @@ 1.1.0 4.2.6 3.1.1 - + ${project.build.directory}/spark-test-classpath.txt 64m 512m + 2.10.4 + 2.10 + ${scala.version} + org.scala-lang @@ -267,19 +267,66 @@ + + - org.spark-project.spark unused 1.0.0 + + + org.codehaus.groovy + groovy-all + 2.3.7 + provided + + + ${jline.groupid} + jline + ${jline.version} + + + com.twitter + chill_${scala.binary.version} + ${chill.version} + + + org.ow2.asm + asm + + + org.ow2.asm + asm-commons + + + + + com.twitter + chill-java + ${chill.version} + + + org.ow2.asm + asm + + + org.ow2.asm + asm-commons + + + org.eclipse.jetty jetty-util @@ -395,36 +442,6 @@ protobuf-java ${protobuf.version} - - com.twitter - chill_${scala.binary.version} - ${chill.version} - - - org.ow2.asm - asm - - - org.ow2.asm - asm-commons - - - - - com.twitter - chill-java - ${chill.version} - - - org.ow2.asm - asm - - - org.ow2.asm - asm-commons - - - ${akka.group} akka-actor_${scala.binary.version} @@ -512,11 +529,6 @@ scala-reflect ${scala.version} - - org.scala-lang - jline - ${scala.version} - org.scala-lang scala-library @@ -965,6 +977,7 @@ ${session.executionRootDirectory} 1 false + ${test_classpath} @@ -1026,6 +1039,47 @@ + + + org.apache.maven.plugins + maven-dependency-plugin + 2.9 + + + test-compile + + build-classpath + + + test + ${test_classpath_file} + + + + + + + + org.codehaus.gmavenplus + gmavenplus-plugin + 1.2 + + + process-test-classes + + execute + + + + + + + + + org.apache.maven.plugins @@ -1335,7 +1389,7 @@ - hive + hive-thriftserver false @@ -1365,5 +1419,35 @@ 10.10.1.1 + + + scala-2.10 + + true + + + 2.10.4 + 2.10 + ${scala.version} + org.scala-lang + + + external/kafka + + + + + scala-2.11 + + false + + + 2.11.2 + 2.11 + 2.12 + jline + + + diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala index 657e4b4432..5eb3ed439c 100644 --- a/project/SparkBuild.scala +++ b/project/SparkBuild.scala @@ -31,8 +31,8 @@ object BuildCommons { private val buildLocation = file(".").getAbsoluteFile.getParentFile val allProjects@Seq(bagel, catalyst, core, graphx, hive, hiveThriftServer, mllib, repl, - sql, networkCommon, networkShuffle, streaming, streamingFlumeSink, streamingFlume, streamingKafka, - streamingMqtt, streamingTwitter, streamingZeromq) = + sql, networkCommon, networkShuffle, streaming, streamingFlumeSink, streamingFlume, streamingKafka, + streamingMqtt, streamingTwitter, streamingZeromq) = Seq("bagel", "catalyst", "core", "graphx", "hive", "hive-thriftserver", "mllib", "repl", "sql", "network-common", "network-shuffle", "streaming", "streaming-flume-sink", "streaming-flume", "streaming-kafka", "streaming-mqtt", "streaming-twitter", @@ -68,8 +68,8 @@ object SparkBuild extends PomBuild { profiles ++= Seq("spark-ganglia-lgpl") } if (Properties.envOrNone("SPARK_HIVE").isDefined) { - println("NOTE: SPARK_HIVE is deprecated, please use -Phive flag.") - profiles ++= Seq("hive") + println("NOTE: SPARK_HIVE is deprecated, please use -Phive and -Phive-thriftserver flags.") + profiles ++= Seq("hive", "hive-thriftserver") } Properties.envOrNone("SPARK_HADOOP_VERSION") match { case Some(v) => @@ -91,13 +91,21 @@ object SparkBuild extends PomBuild { profiles } - override val profiles = Properties.envOrNone("SBT_MAVEN_PROFILES") match { + override val profiles = { + val profiles = Properties.envOrNone("SBT_MAVEN_PROFILES") match { case None => backwardCompatibility case Some(v) => if (backwardCompatibility.nonEmpty) println("Note: We ignore environment variables, when use of profile is 
detected in " +
        "conjunction with environment variables.")
      v.split("(\\s+|,)").filterNot(_.isEmpty).map(_.trim.replaceAll("-P", "")).toSeq
+    }
+    if (profiles.exists(_.contains("scala-"))) {
+      profiles
+    } else {
+      println("Enabled default scala profile")
+      profiles ++ Seq("scala-2.10")
+    }
  }
 
  Properties.envOrNone("SBT_MAVEN_PROPERTIES") match {
@@ -136,7 +144,8 @@ object SparkBuild extends PomBuild {
   // Note ordering of these settings matters.
   /* Enable shared settings on all projects */
-  (allProjects ++ optionallyEnabledProjects ++ assemblyProjects).foreach(enable(sharedSettings))
+  (allProjects ++ optionallyEnabledProjects ++ assemblyProjects ++ Seq(spark, tools))
+    .foreach(enable(sharedSettings ++ ExcludedDependencies.settings))
 
   /* Enable tests settings for all projects except examples, assembly and tools */
   (allProjects ++ optionallyEnabledProjects).foreach(enable(TestSettings.settings))
@@ -178,6 +187,16 @@ object Flume {
   lazy val settings = sbtavro.SbtAvro.avroSettings
 }
 
+/**
+  This excludes, from the sbt build, library dependencies that are specified
+  in Maven but are not needed by sbt.
+  */
+object ExcludedDependencies {
+  lazy val settings = Seq(
+    libraryDependencies ~= { libs => libs.filterNot(_.name == "groovy-all") }
+  )
+}
+
 /**
  * The following project exists only to pull previous artifacts of Spark for generating
  * Mima ignores. For more information see: SPARK-2071
 */
@@ -353,8 +372,11 @@ object TestSettings {
     .map { case (k,v) => s"-D$k=$v" }.toSeq,
   javaOptions in Test ++= "-Xmx3g -XX:PermSize=128M -XX:MaxNewSize=256m -XX:MaxPermSize=1g"
     .split(" ").toSeq,
+  // This places test scope jars on the classpath of executors during tests.
+  javaOptions in Test +=
+    "-Dspark.executor.extraClassPath=" + (fullClasspath in Test).value.files.
+    map(_.getAbsolutePath).mkString(":").stripSuffix(":"),
   javaOptions += "-Xmx3g",
-  // Show full stack trace and duration in test cases.
   testOptions in Test += Tests.Argument("-oDF"),
   testOptions += Tests.Argument(TestFrameworks.JUnit, "-v", "-a"),
diff --git a/project/project/SparkPluginBuild.scala b/project/project/SparkPluginBuild.scala
index 3ef2d5451d..8863f272da 100644
--- a/project/project/SparkPluginBuild.scala
+++ b/project/project/SparkPluginBuild.scala
@@ -26,7 +26,7 @@ import sbt.Keys._
 object SparkPluginDef extends Build {
   lazy val root = Project("plugins", file(".")) dependsOn(sparkStyle, sbtPomReader)
   lazy val sparkStyle = Project("spark-style", file("spark-style"), settings = styleSettings)
-  lazy val sbtPomReader = uri("https://github.com/ScrapCodes/sbt-pom-reader.git")
+  lazy val sbtPomReader = uri("https://github.com/ScrapCodes/sbt-pom-reader.git#ignore_artifact_id")
 
   // There is actually no need to publish this artifact.
def styleSettings = Defaults.defaultSettings ++ Seq ( diff --git a/repl/pom.xml b/repl/pom.xml index af528c8914..bd688c8c1e 100644 --- a/repl/pom.xml +++ b/repl/pom.xml @@ -38,6 +38,11 @@ + + ${jline.groupid} + jline + ${jline.version} + org.apache.spark spark-core_${scala.binary.version} @@ -75,11 +80,6 @@ scala-reflect ${scala.version} - - org.scala-lang - jline - ${scala.version} - org.slf4j jul-to-slf4j @@ -124,4 +124,84 @@ + + + scala-2.10 + + + + org.codehaus.mojo + build-helper-maven-plugin + + + add-scala-sources + generate-sources + + add-source + + + + src/main/scala + scala-2.10/src/main/scala + + + + + add-scala-test-sources + generate-test-sources + + add-test-source + + + + src/test/scala + scala-2.10/src/test/scala + + + + + + + + + + scala-2.11 + + + + org.codehaus.mojo + build-helper-maven-plugin + + + add-scala-sources + generate-sources + + add-source + + + + src/main/scala + scala-2.11/src/main/scala + + + + + add-scala-test-sources + generate-test-sources + + add-test-source + + + + src/test/scala + scala-2.11/src/test/scala + + + + + + + + + diff --git a/repl/scala-2.10/src/main/scala/org/apache/spark/repl/Main.scala b/repl/scala-2.10/src/main/scala/org/apache/spark/repl/Main.scala new file mode 100644 index 0000000000..14b448d076 --- /dev/null +++ b/repl/scala-2.10/src/main/scala/org/apache/spark/repl/Main.scala @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.repl + +import scala.collection.mutable.Set + +object Main { + private var _interp: SparkILoop = _ + + def interp = _interp + + def interp_=(i: SparkILoop) { _interp = i } + + def main(args: Array[String]) { + _interp = new SparkILoop + _interp.process(args) + } +} diff --git a/repl/scala-2.10/src/main/scala/org/apache/spark/repl/SparkCommandLine.scala b/repl/scala-2.10/src/main/scala/org/apache/spark/repl/SparkCommandLine.scala new file mode 100644 index 0000000000..05816941b5 --- /dev/null +++ b/repl/scala-2.10/src/main/scala/org/apache/spark/repl/SparkCommandLine.scala @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.repl + +import scala.tools.nsc.{Settings, CompilerCommand} +import scala.Predef._ + +/** + * Command class enabling Spark-specific command line options (provided by + * org.apache.spark.repl.SparkRunnerSettings). + */ +class SparkCommandLine(args: List[String], override val settings: Settings) + extends CompilerCommand(args, settings) { + + def this(args: List[String], error: String => Unit) { + this(args, new SparkRunnerSettings(error)) + } + + def this(args: List[String]) { + this(args, str => Console.println("Error: " + str)) + } +} diff --git a/repl/scala-2.10/src/main/scala/org/apache/spark/repl/SparkExprTyper.scala b/repl/scala-2.10/src/main/scala/org/apache/spark/repl/SparkExprTyper.scala new file mode 100644 index 0000000000..f8432c8af6 --- /dev/null +++ b/repl/scala-2.10/src/main/scala/org/apache/spark/repl/SparkExprTyper.scala @@ -0,0 +1,114 @@ +// scalastyle:off + +/* NSC -- new Scala compiler + * Copyright 2005-2013 LAMP/EPFL + * @author Paul Phillips + */ + +package org.apache.spark.repl + +import scala.tools.nsc._ +import scala.tools.nsc.interpreter._ + +import scala.reflect.internal.util.BatchSourceFile +import scala.tools.nsc.ast.parser.Tokens.EOF + +import org.apache.spark.Logging + +trait SparkExprTyper extends Logging { + val repl: SparkIMain + + import repl._ + import global.{ reporter => _, Import => _, _ } + import definitions._ + import syntaxAnalyzer.{ UnitParser, UnitScanner, token2name } + import naming.freshInternalVarName + + object codeParser extends { val global: repl.global.type = repl.global } with CodeHandlers[Tree] { + def applyRule[T](code: String, rule: UnitParser => T): T = { + reporter.reset() + val scanner = newUnitParser(code) + val result = rule(scanner) + + if (!reporter.hasErrors) + scanner.accept(EOF) + + result + } + + def defns(code: String) = stmts(code) collect { case x: DefTree => x } + def expr(code: String) = applyRule(code, _.expr()) + def stmts(code: String) = applyRule(code, _.templateStats()) + def stmt(code: String) = stmts(code).last // guaranteed nonempty + } + + /** Parse a line into a sequence of trees. Returns None if the input is incomplete. */ + def parse(line: String): Option[List[Tree]] = debugging(s"""parse("$line")""") { + var isIncomplete = false + reporter.withIncompleteHandler((_, _) => isIncomplete = true) { + val trees = codeParser.stmts(line) + if (reporter.hasErrors) { + Some(Nil) + } else if (isIncomplete) { + None + } else { + Some(trees) + } + } + } + // def parsesAsExpr(line: String) = { + // import codeParser._ + // (opt expr line).isDefined + // } + + def symbolOfLine(code: String): Symbol = { + def asExpr(): Symbol = { + val name = freshInternalVarName() + // Typing it with a lazy val would give us the right type, but runs + // into compiler bugs with things like existentials, so we compile it + // behind a def and strip the NullaryMethodType which wraps the expr. 
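+  // (Illustrative sketch, not in the original source: for input "1 + 1" and a
+  // fresh name such as $ires0, the synthetic line built below becomes
+  //   def $ires0 = {
+  //   1 + 1
+  //   }
+  // so the resulting symbol's info is a NullaryMethodType wrapping Int, and
+  // taking finalResultType further down strips that wrapper back to Int.)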
+ val line = "def " + name + " = {\n" + code + "\n}" + + interpretSynthetic(line) match { + case IR.Success => + val sym0 = symbolOfTerm(name) + // drop NullaryMethodType + val sym = sym0.cloneSymbol setInfo afterTyper(sym0.info.finalResultType) + if (sym.info.typeSymbol eq UnitClass) NoSymbol else sym + case _ => NoSymbol + } + } + def asDefn(): Symbol = { + val old = repl.definedSymbolList.toSet + + interpretSynthetic(code) match { + case IR.Success => + repl.definedSymbolList filterNot old match { + case Nil => NoSymbol + case sym :: Nil => sym + case syms => NoSymbol.newOverloaded(NoPrefix, syms) + } + case _ => NoSymbol + } + } + beQuietDuring(asExpr()) orElse beQuietDuring(asDefn()) + } + + private var typeOfExpressionDepth = 0 + def typeOfExpression(expr: String, silent: Boolean = true): Type = { + if (typeOfExpressionDepth > 2) { + logDebug("Terminating typeOfExpression recursion for expression: " + expr) + return NoType + } + typeOfExpressionDepth += 1 + // Don't presently have a good way to suppress undesirable success output + // while letting errors through, so it is first trying it silently: if there + // is an error, and errors are desired, then it re-evaluates non-silently + // to induce the error message. + try beSilentDuring(symbolOfLine(expr).tpe) match { + case NoType if !silent => symbolOfLine(expr).tpe // generate error + case tpe => tpe + } + finally typeOfExpressionDepth -= 1 + } +} diff --git a/repl/scala-2.10/src/main/scala/org/apache/spark/repl/SparkHelper.scala b/repl/scala-2.10/src/main/scala/org/apache/spark/repl/SparkHelper.scala new file mode 100644 index 0000000000..5340951d91 --- /dev/null +++ b/repl/scala-2.10/src/main/scala/org/apache/spark/repl/SparkHelper.scala @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package scala.tools.nsc + +object SparkHelper { + def explicitParentLoader(settings: Settings) = settings.explicitParentLoader +} diff --git a/repl/scala-2.10/src/main/scala/org/apache/spark/repl/SparkILoop.scala b/repl/scala-2.10/src/main/scala/org/apache/spark/repl/SparkILoop.scala new file mode 100644 index 0000000000..e56b74edba --- /dev/null +++ b/repl/scala-2.10/src/main/scala/org/apache/spark/repl/SparkILoop.scala @@ -0,0 +1,1091 @@ +// scalastyle:off + +/* NSC -- new Scala compiler + * Copyright 2005-2013 LAMP/EPFL + * @author Alexander Spoon + */ + +package org.apache.spark.repl + + +import java.net.URL + +import scala.reflect.io.AbstractFile +import scala.tools.nsc._ +import scala.tools.nsc.backend.JavaPlatform +import scala.tools.nsc.interpreter._ + +import scala.tools.nsc.interpreter.{Results => IR} +import Predef.{println => _, _} +import java.io.{BufferedReader, FileReader} +import java.net.URI +import java.util.concurrent.locks.ReentrantLock +import scala.sys.process.Process +import scala.tools.nsc.interpreter.session._ +import scala.util.Properties.{jdkHome, javaVersion} +import scala.tools.util.{Javap} +import scala.annotation.tailrec +import scala.collection.mutable.ListBuffer +import scala.concurrent.ops +import scala.tools.nsc.util._ +import scala.tools.nsc.interpreter._ +import scala.tools.nsc.io.{File, Directory} +import scala.reflect.NameTransformer._ +import scala.tools.nsc.util.ScalaClassLoader._ +import scala.tools.util._ +import scala.language.{implicitConversions, existentials, postfixOps} +import scala.reflect.{ClassTag, classTag} +import scala.tools.reflect.StdRuntimeTags._ + +import java.lang.{Class => jClass} +import scala.reflect.api.{Mirror, TypeCreator, Universe => ApiUniverse} + +import org.apache.spark.Logging +import org.apache.spark.SparkConf +import org.apache.spark.SparkContext +import org.apache.spark.util.Utils + +/** The Scala interactive shell. It provides a read-eval-print loop + * around the Interpreter class. + * After instantiation, clients should call the main() method. + * + * If no in0 is specified, then input will come from the console, and + * the class will attempt to provide input editing feature such as + * input history. + * + * @author Moez A. Abdel-Gawad + * @author Lex Spoon + * @version 1.2 + */ +class SparkILoop(in0: Option[BufferedReader], protected val out: JPrintWriter, + val master: Option[String]) + extends AnyRef + with LoopCommands + with SparkILoopInit + with Logging +{ + def this(in0: BufferedReader, out: JPrintWriter, master: String) = this(Some(in0), out, Some(master)) + def this(in0: BufferedReader, out: JPrintWriter) = this(Some(in0), out, None) + def this() = this(None, new JPrintWriter(Console.out, true), None) + + var in: InteractiveReader = _ // the input stream from which commands come + var settings: Settings = _ + var intp: SparkIMain = _ + + @deprecated("Use `intp` instead.", "2.9.0") def interpreter = intp + @deprecated("Use `intp` instead.", "2.9.0") def interpreter_= (i: SparkIMain): Unit = intp = i + + /** Having inherited the difficult "var-ness" of the repl instance, + * I'm trying to work around it by moving operations into a class from + * which it will appear a stable prefix. + */ + private def onIntp[T](f: SparkIMain => T): T = f(intp) + + class IMainOps[T <: SparkIMain](val intp: T) { + import intp._ + import global._ + + def printAfterTyper(msg: => String) = + intp.reporter printMessage afterTyper(msg) + + /** Strip NullaryMethodType artifacts. 
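+   * (Illustrative, not in the original source: for `val s = "hi"` the accessor
+   * symbol's info is NullaryMethodType(String); replInfo strips it so :type
+   * reports String rather than => String.)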
*/ + private def replInfo(sym: Symbol) = { + sym.info match { + case NullaryMethodType(restpe) if sym.isAccessor => restpe + case info => info + } + } + def echoTypeStructure(sym: Symbol) = + printAfterTyper("" + deconstruct.show(replInfo(sym))) + + def echoTypeSignature(sym: Symbol, verbose: Boolean) = { + if (verbose) SparkILoop.this.echo("// Type signature") + printAfterTyper("" + replInfo(sym)) + + if (verbose) { + SparkILoop.this.echo("\n// Internal Type structure") + echoTypeStructure(sym) + } + } + } + implicit def stabilizeIMain(intp: SparkIMain) = new IMainOps[intp.type](intp) + + /** TODO - + * -n normalize + * -l label with case class parameter names + * -c complete - leave nothing out + */ + private def typeCommandInternal(expr: String, verbose: Boolean): Result = { + onIntp { intp => + val sym = intp.symbolOfLine(expr) + if (sym.exists) intp.echoTypeSignature(sym, verbose) + else "" + } + } + + var sparkContext: SparkContext = _ + + override def echoCommandMessage(msg: String) { + intp.reporter printMessage msg + } + + // def isAsync = !settings.Yreplsync.value + def isAsync = false + // lazy val power = new Power(intp, new StdReplVals(this))(tagOfStdReplVals, classTag[StdReplVals]) + def history = in.history + + /** The context class loader at the time this object was created */ + protected val originalClassLoader = Utils.getContextOrSparkClassLoader + + // classpath entries added via :cp + var addedClasspath: String = "" + + /** A reverse list of commands to replay if the user requests a :replay */ + var replayCommandStack: List[String] = Nil + + /** A list of commands to replay if the user requests a :replay */ + def replayCommands = replayCommandStack.reverse + + /** Record a command for replay should the user request a :replay */ + def addReplay(cmd: String) = replayCommandStack ::= cmd + + def savingReplayStack[T](body: => T): T = { + val saved = replayCommandStack + try body + finally replayCommandStack = saved + } + def savingReader[T](body: => T): T = { + val saved = in + try body + finally in = saved + } + + + def sparkCleanUp(){ + echo("Stopping spark context.") + intp.beQuietDuring { + command("sc.stop()") + } + } + /** Close the interpreter and set the var to null. */ + def closeInterpreter() { + if (intp ne null) { + sparkCleanUp() + intp.close() + intp = null + } + } + + class SparkILoopInterpreter extends SparkIMain(settings, out) { + outer => + + override lazy val formatting = new Formatting { + def prompt = SparkILoop.this.prompt + } + override protected def parentClassLoader = SparkHelper.explicitParentLoader(settings).getOrElse(classOf[SparkILoop].getClassLoader) + } + + /** Create a new interpreter. */ + def createInterpreter() { + require(settings != null) + + if (addedClasspath != "") settings.classpath.append(addedClasspath) + val addedJars = + if (Utils.isWindows) { + // Strip any URI scheme prefix so we can add the correct path to the classpath + // e.g. 
file:/C:/my/path.jar -> C:/my/path.jar + SparkILoop.getAddedJars.map { jar => new URI(jar).getPath.stripPrefix("/") } + } else { + SparkILoop.getAddedJars + } + // work around for Scala bug + val totalClassPath = addedJars.foldLeft( + settings.classpath.value)((l, r) => ClassPath.join(l, r)) + this.settings.classpath.value = totalClassPath + + intp = new SparkILoopInterpreter + } + + /** print a friendly help message */ + def helpCommand(line: String): Result = { + if (line == "") helpSummary() + else uniqueCommand(line) match { + case Some(lc) => echo("\n" + lc.longHelp) + case _ => ambiguousError(line) + } + } + private def helpSummary() = { + val usageWidth = commands map (_.usageMsg.length) max + val formatStr = "%-" + usageWidth + "s %s %s" + + echo("All commands can be abbreviated, e.g. :he instead of :help.") + echo("Those marked with a * have more detailed help, e.g. :help imports.\n") + + commands foreach { cmd => + val star = if (cmd.hasLongHelp) "*" else " " + echo(formatStr.format(cmd.usageMsg, star, cmd.help)) + } + } + private def ambiguousError(cmd: String): Result = { + matchingCommands(cmd) match { + case Nil => echo(cmd + ": no such command. Type :help for help.") + case xs => echo(cmd + " is ambiguous: did you mean " + xs.map(":" + _.name).mkString(" or ") + "?") + } + Result(true, None) + } + private def matchingCommands(cmd: String) = commands filter (_.name startsWith cmd) + private def uniqueCommand(cmd: String): Option[LoopCommand] = { + // this lets us add commands willy-nilly and only requires enough command to disambiguate + matchingCommands(cmd) match { + case List(x) => Some(x) + // exact match OK even if otherwise appears ambiguous + case xs => xs find (_.name == cmd) + } + } + private var fallbackMode = false + + private def toggleFallbackMode() { + val old = fallbackMode + fallbackMode = !old + System.setProperty("spark.repl.fallback", fallbackMode.toString) + echo(s""" + |Switched ${if (old) "off" else "on"} fallback mode without restarting. + | If you have defined classes in the repl, it would + |be good to redefine them incase you plan to use them. If you still run + |into issues it would be good to restart the repl and turn on `:fallback` + |mode as first command. + """.stripMargin) + } + + /** Show the history */ + lazy val historyCommand = new LoopCommand("history", "show the history (optional num is commands to show)") { + override def usage = "[num]" + def defaultLines = 20 + + def apply(line: String): Result = { + if (history eq NoHistory) + return "No history available." + + val xs = words(line) + val current = history.index + val count = try xs.head.toInt catch { case _: Exception => defaultLines } + val lines = history.asStrings takeRight count + val offset = current - lines.size + 1 + + for ((line, index) <- lines.zipWithIndex) + echo("%3d %s".format(index + offset, line)) + } + } + + // When you know you are most likely breaking into the middle + // of a line being typed. This softens the blow. 
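+  // (Illustrative note, not in the original source: if an async init message
+  // arrives while the user is halfway through typing `val x = 1`, the message
+  // is printed on its own line and in.redrawLine() repaints the pending
+  // input beneath it.)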
+ protected def echoAndRefresh(msg: String) = { + echo("\n" + msg) + in.redrawLine() + } + protected def echo(msg: String) = { + out println msg + out.flush() + } + protected def echoNoNL(msg: String) = { + out print msg + out.flush() + } + + /** Search the history */ + def searchHistory(_cmdline: String) { + val cmdline = _cmdline.toLowerCase + val offset = history.index - history.size + 1 + + for ((line, index) <- history.asStrings.zipWithIndex ; if line.toLowerCase contains cmdline) + echo("%d %s".format(index + offset, line)) + } + + private var currentPrompt = Properties.shellPromptString + def setPrompt(prompt: String) = currentPrompt = prompt + /** Prompt to print when awaiting input */ + def prompt = currentPrompt + + import LoopCommand.{ cmd, nullary } + + /** Standard commands */ + lazy val standardCommands = List( + cmd("cp", "", "add a jar or directory to the classpath", addClasspath), + cmd("help", "[command]", "print this summary or command-specific help", helpCommand), + historyCommand, + cmd("h?", "", "search the history", searchHistory), + cmd("imports", "[name name ...]", "show import history, identifying sources of names", importsCommand), + cmd("implicits", "[-v]", "show the implicits in scope", implicitsCommand), + cmd("javap", "", "disassemble a file or class name", javapCommand), + cmd("load", "", "load and interpret a Scala file", loadCommand), + nullary("paste", "enter paste mode: all input up to ctrl-D compiled together", pasteCommand), +// nullary("power", "enable power user mode", powerCmd), + nullary("quit", "exit the repl", () => Result(false, None)), + nullary("replay", "reset execution and replay all previous commands", replay), + nullary("reset", "reset the repl to its initial state, forgetting all session entries", resetCommand), + shCommand, + nullary("silent", "disable/enable automatic printing of results", verbosity), + nullary("fallback", """ + |disable/enable advanced repl changes, these fix some issues but may introduce others. + |This mode will be removed once these fixes stablize""".stripMargin, toggleFallbackMode), + cmd("type", "[-v] ", "display the type of an expression without evaluating it", typeCommand), + nullary("warnings", "show the suppressed warnings from the most recent line which had any", warningsCommand) + ) + + /** Power user commands */ + lazy val powerCommands: List[LoopCommand] = List( + // cmd("phase", "", "set the implicit phase for power commands", phaseCommand) + ) + + // private def dumpCommand(): Result = { + // echo("" + power) + // history.asStrings takeRight 30 foreach echo + // in.redrawLine() + // } + // private def valsCommand(): Result = power.valsDescription + + private val typeTransforms = List( + "scala.collection.immutable." -> "immutable.", + "scala.collection.mutable." -> "mutable.", + "scala.collection.generic." -> "generic.", + "java.lang." -> "jl.", + "scala.runtime." -> "runtime." 
+ ) + + private def importsCommand(line: String): Result = { + val tokens = words(line) + val handlers = intp.languageWildcardHandlers ++ intp.importHandlers + val isVerbose = tokens contains "-v" + + handlers.filterNot(_.importedSymbols.isEmpty).zipWithIndex foreach { + case (handler, idx) => + val (types, terms) = handler.importedSymbols partition (_.name.isTypeName) + val imps = handler.implicitSymbols + val found = tokens filter (handler importsSymbolNamed _) + val typeMsg = if (types.isEmpty) "" else types.size + " types" + val termMsg = if (terms.isEmpty) "" else terms.size + " terms" + val implicitMsg = if (imps.isEmpty) "" else imps.size + " are implicit" + val foundMsg = if (found.isEmpty) "" else found.mkString(" // imports: ", ", ", "") + val statsMsg = List(typeMsg, termMsg, implicitMsg) filterNot (_ == "") mkString ("(", ", ", ")") + + intp.reporter.printMessage("%2d) %-30s %s%s".format( + idx + 1, + handler.importString, + statsMsg, + foundMsg + )) + } + } + + private def implicitsCommand(line: String): Result = onIntp { intp => + import intp._ + import global._ + + def p(x: Any) = intp.reporter.printMessage("" + x) + + // If an argument is given, only show a source with that + // in its name somewhere. + val args = line split "\\s+" + val filtered = intp.implicitSymbolsBySource filter { + case (source, syms) => + (args contains "-v") || { + if (line == "") (source.fullName.toString != "scala.Predef") + else (args exists (source.name.toString contains _)) + } + } + + if (filtered.isEmpty) + return "No implicits have been imported other than those in Predef." + + filtered foreach { + case (source, syms) => + p("/* " + syms.size + " implicit members imported from " + source.fullName + " */") + + // This groups the members by where the symbol is defined + val byOwner = syms groupBy (_.owner) + val sortedOwners = byOwner.toList sortBy { case (owner, _) => afterTyper(source.info.baseClasses indexOf owner) } + + sortedOwners foreach { + case (owner, members) => + // Within each owner, we cluster results based on the final result type + // if there are more than a couple, and sort each cluster based on name. + // This is really just trying to make the 100 or so implicits imported + // by default into something readable. 
+ val memberGroups: List[List[Symbol]] = { + val groups = members groupBy (_.tpe.finalResultType) toList + val (big, small) = groups partition (_._2.size > 3) + val xss = ( + (big sortBy (_._1.toString) map (_._2)) :+ + (small flatMap (_._2)) + ) + + xss map (xs => xs sortBy (_.name.toString)) + } + + val ownerMessage = if (owner == source) " defined in " else " inherited from " + p(" /* " + members.size + ownerMessage + owner.fullName + " */") + + memberGroups foreach { group => + group foreach (s => p(" " + intp.symbolDefString(s))) + p("") + } + } + p("") + } + } + + private def findToolsJar() = { + val jdkPath = Directory(jdkHome) + val jar = jdkPath / "lib" / "tools.jar" toFile; + + if (jar isFile) + Some(jar) + else if (jdkPath.isDirectory) + jdkPath.deepFiles find (_.name == "tools.jar") + else None + } + private def addToolsJarToLoader() = { + val cl = findToolsJar match { + case Some(tools) => ScalaClassLoader.fromURLs(Seq(tools.toURL), intp.classLoader) + case _ => intp.classLoader + } + if (Javap.isAvailable(cl)) { + logDebug(":javap available.") + cl + } + else { + logDebug(":javap unavailable: no tools.jar at " + jdkHome) + intp.classLoader + } + } + + protected def newJavap() = new JavapClass(addToolsJarToLoader(), new SparkIMain.ReplStrippingWriter(intp)) { + override def tryClass(path: String): Array[Byte] = { + val hd :: rest = path split '.' toList; + // If there are dots in the name, the first segment is the + // key to finding it. + if (rest.nonEmpty) { + intp optFlatName hd match { + case Some(flat) => + val clazz = flat :: rest mkString NAME_JOIN_STRING + val bytes = super.tryClass(clazz) + if (bytes.nonEmpty) bytes + else super.tryClass(clazz + MODULE_SUFFIX_STRING) + case _ => super.tryClass(path) + } + } + else { + // Look for Foo first, then Foo$, but if Foo$ is given explicitly, + // we have to drop the $ to find object Foo, then tack it back onto + // the end of the flattened name. + def className = intp flatName path + def moduleName = (intp flatName path.stripSuffix(MODULE_SUFFIX_STRING)) + MODULE_SUFFIX_STRING + + val bytes = super.tryClass(className) + if (bytes.nonEmpty) bytes + else super.tryClass(moduleName) + } + } + } + // private lazy val javap = substituteAndLog[Javap]("javap", NoJavap)(newJavap()) + private lazy val javap = + try newJavap() + catch { case _: Exception => null } + + // Still todo: modules. + private def typeCommand(line0: String): Result = { + line0.trim match { + case "" => ":type [-v] " + case s if s startsWith "-v " => typeCommandInternal(s stripPrefix "-v " trim, true) + case s => typeCommandInternal(s, false) + } + } + + private def warningsCommand(): Result = { + if (intp.lastWarnings.isEmpty) + "Can't find any cached warnings." + else + intp.lastWarnings foreach { case (pos, msg) => intp.reporter.warning(pos, msg) } + } + + private def javapCommand(line: String): Result = { + if (javap == null) + ":javap unavailable, no tools.jar at %s. 
Set JDK_HOME.".format(jdkHome) + else if (javaVersion startsWith "1.7") + ":javap not yet working with java 1.7" + else if (line == "") + ":javap [-lcsvp] [path1 path2 ...]" + else + javap(words(line)) foreach { res => + if (res.isError) return "Failed: " + res.value + else res.show() + } + } + + private def wrapCommand(line: String): Result = { + def failMsg = "Argument to :wrap must be the name of a method with signature [T](=> T): T" + onIntp { intp => + import intp._ + import global._ + + words(line) match { + case Nil => + intp.executionWrapper match { + case "" => "No execution wrapper is set." + case s => "Current execution wrapper: " + s + } + case "clear" :: Nil => + intp.executionWrapper match { + case "" => "No execution wrapper is set." + case s => intp.clearExecutionWrapper() ; "Cleared execution wrapper." + } + case wrapper :: Nil => + intp.typeOfExpression(wrapper) match { + case PolyType(List(targ), MethodType(List(arg), restpe)) => + intp setExecutionWrapper intp.pathToTerm(wrapper) + "Set wrapper to '" + wrapper + "'" + case tp => + failMsg + "\nFound: " + } + case _ => failMsg + } + } + } + + private def pathToPhaseWrapper = intp.pathToTerm("$r") + ".phased.atCurrent" + // private def phaseCommand(name: String): Result = { + // val phased: Phased = power.phased + // import phased.NoPhaseName + + // if (name == "clear") { + // phased.set(NoPhaseName) + // intp.clearExecutionWrapper() + // "Cleared active phase." + // } + // else if (name == "") phased.get match { + // case NoPhaseName => "Usage: :phase (e.g. typer, erasure.next, erasure+3)" + // case ph => "Active phase is '%s'. (To clear, :phase clear)".format(phased.get) + // } + // else { + // val what = phased.parse(name) + // if (what.isEmpty || !phased.set(what)) + // "'" + name + "' does not appear to represent a valid phase." + // else { + // intp.setExecutionWrapper(pathToPhaseWrapper) + // val activeMessage = + // if (what.toString.length == name.length) "" + what + // else "%s (%s)".format(what, name) + + // "Active phase is now: " + activeMessage + // } + // } + // } + + /** Available commands */ + def commands: List[LoopCommand] = standardCommands /*++ ( + if (isReplPower) powerCommands else Nil + )*/ + + private val replayQuestionMessage = + """|That entry seems to have slain the compiler. Shall I replay + |your session? I can re-run each line except the last one. + |[y/n] + """.trim.stripMargin + + private def crashRecovery(ex: Throwable): Boolean = { + echo(ex.toString) + ex match { + case _: NoSuchMethodError | _: NoClassDefFoundError => + echo("\nUnrecoverable error.") + throw ex + case _ => + def fn(): Boolean = + try in.readYesOrNo(replayQuestionMessage, { echo("\nYou must enter y or n.") ; fn() }) + catch { case _: RuntimeException => false } + + if (fn()) replay() + else echo("\nAbandoning crashed session.") + } + true + } + + /** The main read-eval-print loop for the repl. It calls + * command() for each line of input, and stops when + * command() returns false. 
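+   * (Loop sketch, not in the original source: readOneLine() flushes output and
+   * blocks on `in readLine prompt`; a null line (EOF) ends the loop, otherwise
+   * command(line) decides whether to continue and what to record for :replay.)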
+ */ + def loop() { + def readOneLine() = { + out.flush() + in readLine prompt + } + // return false if repl should exit + def processLine(line: String): Boolean = { + if (isAsync) { + if (!awaitInitialized()) return false + runThunks() + } + if (line eq null) false // assume null means EOF + else command(line) match { + case Result(false, _) => false + case Result(_, Some(finalLine)) => addReplay(finalLine) ; true + case _ => true + } + } + def innerLoop() { + val shouldContinue = try { + processLine(readOneLine()) + } catch {case t: Throwable => crashRecovery(t)} + if (shouldContinue) + innerLoop() + } + innerLoop() + } + + /** interpret all lines from a specified file */ + def interpretAllFrom(file: File) { + savingReader { + savingReplayStack { + file applyReader { reader => + in = SimpleReader(reader, out, false) + echo("Loading " + file + "...") + loop() + } + } + } + } + + /** create a new interpreter and replay the given commands */ + def replay() { + reset() + if (replayCommandStack.isEmpty) + echo("Nothing to replay.") + else for (cmd <- replayCommands) { + echo("Replaying: " + cmd) // flush because maybe cmd will have its own output + command(cmd) + echo("") + } + } + def resetCommand() { + echo("Resetting repl state.") + if (replayCommandStack.nonEmpty) { + echo("Forgetting this session history:\n") + replayCommands foreach echo + echo("") + replayCommandStack = Nil + } + if (intp.namedDefinedTerms.nonEmpty) + echo("Forgetting all expression results and named terms: " + intp.namedDefinedTerms.mkString(", ")) + if (intp.definedTypes.nonEmpty) + echo("Forgetting defined types: " + intp.definedTypes.mkString(", ")) + + reset() + } + + def reset() { + intp.reset() + // unleashAndSetPhase() + } + + /** fork a shell and run a command */ + lazy val shCommand = new LoopCommand("sh", "run a shell command (result is implicitly => List[String])") { + override def usage = "" + def apply(line: String): Result = line match { + case "" => showUsage() + case _ => + val toRun = classOf[ProcessResult].getName + "(" + string2codeQuoted(line) + ")" + intp interpret toRun + () + } + } + + def withFile(filename: String)(action: File => Unit) { + val f = File(filename) + + if (f.exists) action(f) + else echo("That file does not exist") + } + + def loadCommand(arg: String) = { + var shouldReplay: Option[String] = None + withFile(arg)(f => { + interpretAllFrom(f) + shouldReplay = Some(":load " + arg) + }) + Result(true, shouldReplay) + } + + def addAllClasspath(args: Seq[String]): Unit = { + var added = false + var totalClasspath = "" + for (arg <- args) { + val f = File(arg).normalize + if (f.exists) { + added = true + addedClasspath = ClassPath.join(addedClasspath, f.path) + totalClasspath = ClassPath.join(settings.classpath.value, addedClasspath) + intp.addUrlsToClassPath(f.toURI.toURL) + sparkContext.addJar(f.toURI.toURL.getPath) + } + } + } + + def addClasspath(arg: String): Unit = { + val f = File(arg).normalize + if (f.exists) { + addedClasspath = ClassPath.join(addedClasspath, f.path) + intp.addUrlsToClassPath(f.toURI.toURL) + sparkContext.addJar(f.toURI.toURL.getPath) + echo("Added '%s'. Your new classpath is:\n\"%s\"".format(f.path, intp.global.classPath.asClasspathString)) + } + else echo("The path '" + f + "' doesn't seem to exist.") + } + + + def powerCmd(): Result = { + if (isReplPower) "Already in power mode." 
+ else enablePowerMode(false) + } + + def enablePowerMode(isDuringInit: Boolean) = { + // replProps.power setValue true + // unleashAndSetPhase() + // asyncEcho(isDuringInit, power.banner) + } + // private def unleashAndSetPhase() { +// if (isReplPower) { +// // power.unleash() +// // Set the phase to "typer" +// intp beSilentDuring phaseCommand("typer") +// } +// } + + def asyncEcho(async: Boolean, msg: => String) { + if (async) asyncMessage(msg) + else echo(msg) + } + + def verbosity() = { + // val old = intp.printResults + // intp.printResults = !old + // echo("Switched " + (if (old) "off" else "on") + " result printing.") + } + + /** Run one command submitted by the user. Two values are returned: + * (1) whether to keep running, (2) the line to record for replay, + * if any. */ + def command(line: String): Result = { + if (line startsWith ":") { + val cmd = line.tail takeWhile (x => !x.isWhitespace) + uniqueCommand(cmd) match { + case Some(lc) => lc(line.tail stripPrefix cmd dropWhile (_.isWhitespace)) + case _ => ambiguousError(cmd) + } + } + else if (intp.global == null) Result(false, None) // Notice failure to create compiler + else Result(true, interpretStartingWith(line)) + } + + private def readWhile(cond: String => Boolean) = { + Iterator continually in.readLine("") takeWhile (x => x != null && cond(x)) + } + + def pasteCommand(): Result = { + echo("// Entering paste mode (ctrl-D to finish)\n") + val code = readWhile(_ => true) mkString "\n" + echo("\n// Exiting paste mode, now interpreting.\n") + intp interpret code + () + } + + private object paste extends Pasted { + val ContinueString = " | " + val PromptString = "scala> " + + def interpret(line: String): Unit = { + echo(line.trim) + intp interpret line + echo("") + } + + def transcript(start: String) = { + echo("\n// Detected repl transcript paste: ctrl-D to finish.\n") + apply(Iterator(start) ++ readWhile(_.trim != PromptString.trim)) + } + } + import paste.{ ContinueString, PromptString } + + /** Interpret expressions starting with the first line. + * Read lines until a complete compilation unit is available + * or until a syntax error has been seen. If a full unit is + * read, go ahead and interpret it. Return the full string + * to be recorded for replay, if any. + */ + def interpretStartingWith(code: String): Option[String] = { + // signal completion non-completion input has been received + in.completion.resetVerbosity() + + def reallyInterpret = { + val reallyResult = intp.interpret(code) + (reallyResult, reallyResult match { + case IR.Error => None + case IR.Success => Some(code) + case IR.Incomplete => + if (in.interactive && code.endsWith("\n\n")) { + echo("You typed two blank lines. Starting a new command.") + None + } + else in.readLine(ContinueString) match { + case null => + // we know compilation is going to fail since we're at EOF and the + // parser thinks the input is still incomplete, but since this is + // a file being read non-interactively we want to fail. So we send + // it straight to the compiler for the nice error message. + intp.compileString(code) + None + + case line => interpretStartingWith(code + "\n" + line) + } + }) + } + + /** Here we place ourselves between the user and the interpreter and examine + * the input they are ostensibly submitting. We intervene in several cases: + * + * 1) If the line starts with "scala> " it is assumed to be an interpreter paste. + * 2) If the line starts with "." (but not ".." or "./") it is treated as an invocation + * on the previous result. 
+ * 3) If the Completion object's execute returns Some(_), we inject that value + * and avoid the interpreter, as it's likely not valid scala code. + */ + if (code == "") None + else if (!paste.running && code.trim.startsWith(PromptString)) { + paste.transcript(code) + None + } + else if (Completion.looksLikeInvocation(code) && intp.mostRecentVar != "") { + interpretStartingWith(intp.mostRecentVar + code) + } + else if (code.trim startsWith "//") { + // line comment, do nothing + None + } + else + reallyInterpret._2 + } + + // runs :load `file` on any files passed via -i + def loadFiles(settings: Settings) = settings match { + case settings: SparkRunnerSettings => + for (filename <- settings.loadfiles.value) { + val cmd = ":load " + filename + command(cmd) + addReplay(cmd) + echo("") + } + case _ => + } + + /** Tries to create a JLineReader, falling back to SimpleReader: + * unless settings or properties are such that it should start + * with SimpleReader. + */ + def chooseReader(settings: Settings): InteractiveReader = { + if (settings.Xnojline.value || Properties.isEmacsShell) + SimpleReader() + else try new SparkJLineReader( + if (settings.noCompletion.value) NoCompletion + else new SparkJLineCompletion(intp) + ) + catch { + case ex @ (_: Exception | _: NoClassDefFoundError) => + echo("Failed to created SparkJLineReader: " + ex + "\nFalling back to SimpleReader.") + SimpleReader() + } + } + + val u: scala.reflect.runtime.universe.type = scala.reflect.runtime.universe + val m = u.runtimeMirror(Utils.getSparkClassLoader) + private def tagOfStaticClass[T: ClassTag]: u.TypeTag[T] = + u.TypeTag[T]( + m, + new TypeCreator { + def apply[U <: ApiUniverse with Singleton](m: Mirror[U]): U # Type = + m.staticClass(classTag[T].runtimeClass.getName).toTypeConstructor.asInstanceOf[U # Type] + }) + + def process(settings: Settings): Boolean = savingContextLoader { + if (getMaster() == "yarn-client") System.setProperty("SPARK_YARN_MODE", "true") + + this.settings = settings + createInterpreter() + + // sets in to some kind of reader depending on environmental cues + in = in0 match { + case Some(reader) => SimpleReader(reader, out, true) + case None => + // some post-initialization + chooseReader(settings) match { + case x: SparkJLineReader => addThunk(x.consoleReader.postInit) ; x + case x => x + } + } + lazy val tagOfSparkIMain = tagOfStaticClass[org.apache.spark.repl.SparkIMain] + // Bind intp somewhere out of the regular namespace where + // we can get at it in generated code. + addThunk(intp.quietBind(NamedParam[SparkIMain]("$intp", intp)(tagOfSparkIMain, classTag[SparkIMain]))) + addThunk({ + import scala.tools.nsc.io._ + import Properties.userHome + import scala.compat.Platform.EOL + val autorun = replProps.replAutorunCode.option flatMap (f => io.File(f).safeSlurp()) + if (autorun.isDefined) intp.quietRun(autorun.get) + }) + + addThunk(printWelcome()) + addThunk(initializeSpark()) + + // it is broken on startup; go ahead and exit + if (intp.reporter.hasErrors) + return false + + // This is about the illusion of snappiness. We call initialize() + // which spins off a separate thread, then print the prompt and try + // our best to look ready. The interlocking lazy vals tend to + // inter-deadlock, so we break the cycle with a single asynchronous + // message to an actor. 
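+    // (Flow sketch, not in the original source: `intp initialize initializedCallback()`
+    // compiles the interpreter on a background thread and then signals a condition
+    // variable; the listener spawned by createAsyncListener() wakes on that signal
+    // and runs postInitialization() exactly once.)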
+ if (isAsync) { + intp initialize initializedCallback() + createAsyncListener() // listens for signal to run postInitialization + } + else { + intp.initializeSynchronous() + postInitialization() + } + // printWelcome() + + loadFiles(settings) + + try loop() + catch AbstractOrMissingHandler() + finally closeInterpreter() + + true + } + + def createSparkContext(): SparkContext = { + val execUri = System.getenv("SPARK_EXECUTOR_URI") + val jars = SparkILoop.getAddedJars + val conf = new SparkConf() + .setMaster(getMaster()) + .setAppName("Spark shell") + .setJars(jars) + .set("spark.repl.class.uri", intp.classServer.uri) + if (execUri != null) { + conf.set("spark.executor.uri", execUri) + } + sparkContext = new SparkContext(conf) + logInfo("Created spark context..") + sparkContext + } + + private def getMaster(): String = { + val master = this.master match { + case Some(m) => m + case None => + val envMaster = sys.env.get("MASTER") + val propMaster = sys.props.get("spark.master") + propMaster.orElse(envMaster).getOrElse("local[*]") + } + master + } + + /** process command-line arguments and do as they request */ + def process(args: Array[String]): Boolean = { + val command = new SparkCommandLine(args.toList, msg => echo(msg)) + def neededHelp(): String = + (if (command.settings.help.value) command.usageMsg + "\n" else "") + + (if (command.settings.Xhelp.value) command.xusageMsg + "\n" else "") + + // if they asked for no help and command is valid, we call the real main + neededHelp() match { + case "" => command.ok && process(command.settings) + case help => echoNoNL(help) ; true + } + } + + @deprecated("Use `process` instead", "2.9.0") + def main(settings: Settings): Unit = process(settings) +} + +object SparkILoop { + implicit def loopToInterpreter(repl: SparkILoop): SparkIMain = repl.intp + private def echo(msg: String) = Console println msg + + def getAddedJars: Array[String] = { + val envJars = sys.env.get("ADD_JARS") + val propJars = sys.props.get("spark.jars").flatMap { p => + if (p == "") None else Some(p) + } + val jars = propJars.orElse(envJars).getOrElse("") + Utils.resolveURIs(jars).split(",").filter(_.nonEmpty) + } + + // Designed primarily for use by test code: take a String with a + // bunch of code, and prints out a transcript of what it would look + // like if you'd just typed it into the repl. + def runForTranscript(code: String, settings: Settings): String = { + import java.io.{ BufferedReader, StringReader, OutputStreamWriter } + + stringFromStream { ostream => + Console.withOut(ostream) { + val output = new JPrintWriter(new OutputStreamWriter(ostream), true) { + override def write(str: String) = { + // completely skip continuation lines + if (str forall (ch => ch.isWhitespace || ch == '|')) () + // print a newline on empty scala prompts + else if ((str contains '\n') && (str.trim == "scala> ")) super.write("\n") + else super.write(str) + } + } + val input = new BufferedReader(new StringReader(code)) { + override def readLine(): String = { + val s = super.readLine() + // helping out by printing the line being interpreted. + if (s != null) + output.println(s) + s + } + } + val repl = new SparkILoop(input, output) + + if (settings.classpath.isDefault) + settings.classpath.value = sys.props("java.class.path") + + getAddedJars.foreach(settings.classpath.append(_)) + + repl process settings + } + } + } + + /** Creates an interpreter loop with default settings and feeds + * the given code to it as input. 
+  /** Creates an interpreter loop with default settings and feeds
+   *  the given code to it as input.
+   */
+  def run(code: String, sets: Settings = new Settings): String = {
+    import java.io.{ BufferedReader, StringReader, OutputStreamWriter }
+
+    stringFromStream { ostream =>
+      Console.withOut(ostream) {
+        val input = new BufferedReader(new StringReader(code))
+        val output = new JPrintWriter(new OutputStreamWriter(ostream), true)
+        val repl = new SparkILoop(input, output)
+
+        if (sets.classpath.isDefault)
+          sets.classpath.value = sys.props("java.class.path")
+
+        repl process sets
+      }
+    }
+  }
+  def run(lines: List[String]): String = run(lines map (_ + "\n") mkString)
+}
diff --git a/repl/scala-2.10/src/main/scala/org/apache/spark/repl/SparkILoopInit.scala b/repl/scala-2.10/src/main/scala/org/apache/spark/repl/SparkILoopInit.scala
new file mode 100644
index 0000000000..7667a9c119
--- /dev/null
+++ b/repl/scala-2.10/src/main/scala/org/apache/spark/repl/SparkILoopInit.scala
@@ -0,0 +1,147 @@
+// scalastyle:off
+
+/* NSC -- new Scala compiler
+ * Copyright 2005-2013 LAMP/EPFL
+ * @author Paul Phillips
+ */
+
+package org.apache.spark.repl
+
+import scala.tools.nsc._
+import scala.tools.nsc.interpreter._
+
+import scala.reflect.internal.util.Position
+import scala.util.control.Exception.ignoring
+import scala.tools.nsc.util.stackTraceString
+
+import org.apache.spark.SPARK_VERSION
+
+/**
+ *  Machinery for the asynchronous initialization of the repl.
+ */
+trait SparkILoopInit {
+  self: SparkILoop =>
+
+  /** Print a welcome message */
+  def printWelcome() {
+    echo("""Welcome to
+      ____              __
+     / __/__  ___ _____/ /__
+    _\ \/ _ \/ _ `/ __/  '_/
+   /___/ .__/\_,_/_/ /_/\_\   version %s
+      /_/
+""".format(SPARK_VERSION))
+    import Properties._
+    val welcomeMsg = "Using Scala %s (%s, Java %s)".format(
+      versionString, javaVmName, javaVersion)
+    echo(welcomeMsg)
+    echo("Type in expressions to have them evaluated.")
+    echo("Type :help for more information.")
+  }
+
+  protected def asyncMessage(msg: String) {
+    if (isReplInfo || isReplPower)
+      echoAndRefresh(msg)
+  }
+
+  private val initLock = new java.util.concurrent.locks.ReentrantLock()
+  private val initCompilerCondition = initLock.newCondition() // signal the compiler is initialized
+  private val initLoopCondition = initLock.newCondition()     // signal the whole repl is initialized
+  private val initStart = System.nanoTime
+
+  private def withLock[T](body: => T): T = {
+    initLock.lock()
+    try body
+    finally initLock.unlock()
+  }
+  // a condition used to ensure serial access to the compiler.
+  @volatile private var initIsComplete = false
+  @volatile private var initError: String = null
+  private def elapsed() = "%.3f".format((System.nanoTime - initStart).toDouble / 1000000000L)
+
+  // the method to be called when the interpreter is initialized.
+  // Very important this method does nothing synchronous (i.e. do
+  // not try to use the interpreter) because until it returns, the
+  // repl's lazy val `global` is still locked.
+  protected def initializedCallback() = withLock(initCompilerCondition.signal())
+
+  // Spins off a thread which awaits a single message once the interpreter
+  // has been initialized.
+  protected def createAsyncListener() = {
+    io.spawn {
+      withLock(initCompilerCondition.await())
+      asyncMessage("[info] compiler init time: " + elapsed() + " s.")
+      postInitialization()
+    }
+  }
+
+  // called from main repl loop
+  protected def awaitInitialized(): Boolean = {
+    if (!initIsComplete)
+      withLock { while (!initIsComplete) initLoopCondition.await() }
+    if (initError != null) {
+      println("""
+        |Failed to initialize the REPL due to an unexpected error.
+        |This is a bug, please report it along with the error diagnostics printed below.
+        |%s.""".stripMargin.format(initError)
+      )
+      false
+    } else true
+  }
+  // private def warningsThunks = List(
+  //   () => intp.bind("lastWarnings", "" + typeTag[List[(Position, String)]], intp.lastWarnings _),
+  // )
+
+  protected def postInitThunks = List[Option[() => Unit]](
+    Some(intp.setContextClassLoader _),
+    if (isReplPower) Some(() => enablePowerMode(true)) else None
+  ).flatten
+  // ++ (
+  //   warningsThunks
+  // )
+  // called once after init condition is signalled
+  protected def postInitialization() {
+    try {
+      postInitThunks foreach (f => addThunk(f()))
+      runThunks()
+    } catch {
+      case ex: Throwable =>
+        initError = stackTraceString(ex)
+        throw ex
+    } finally {
+      initIsComplete = true
+
+      if (isAsync) {
+        asyncMessage("[info] total init time: " + elapsed() + " s.")
+        withLock(initLoopCondition.signal())
+      }
+    }
+  }
+
+  def initializeSpark() {
+    intp.beQuietDuring {
+      command("""
+         @transient val sc = org.apache.spark.repl.Main.interp.createSparkContext();
+        """)
+      command("import org.apache.spark.SparkContext._")
+    }
+    echo("Spark context available as sc.")
+  }
+
+  // code to be executed only after the interpreter is initialized
+  // and the lazy val `global` can be accessed without risk of deadlock.
+  private var pendingThunks: List[() => Unit] = Nil
+  protected def addThunk(body: => Unit) = synchronized {
+    pendingThunks :+= (() => body)
+  }
+  protected def runThunks(): Unit = synchronized {
+    if (pendingThunks.nonEmpty)
+      logDebug("Clearing " + pendingThunks.size + " thunks.")
+
+    while (pendingThunks.nonEmpty) {
+      val thunk = pendingThunks.head
+      pendingThunks = pendingThunks.tail
+      thunk()
+    }
+  }
+}
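+
+// A minimal sketch of the deferred-thunk pattern used above (illustrative only;
+// the synchronization is dropped for brevity):
+//
+//   var pending: List[() => Unit] = Nil
+//   def defer(body: => Unit): Unit = pending :+= (() => body)  // queue work by-name
+//   def drain(): Unit = while (pending.nonEmpty) {
+//     val t = pending.head; pending = pending.tail; t()        // run in FIFO order
+//   }
+//
+// Work queued before the compiler is ready (the welcome banner, binding $intp,
+// creating `sc`) is drained exactly once from postInitialization(), after the
+// lazy val `global` can be touched without deadlock.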
diff --git a/repl/scala-2.10/src/main/scala/org/apache/spark/repl/SparkIMain.scala b/repl/scala-2.10/src/main/scala/org/apache/spark/repl/SparkIMain.scala
new file mode 100644
index 0000000000..646c68e60c
--- /dev/null
+++ b/repl/scala-2.10/src/main/scala/org/apache/spark/repl/SparkIMain.scala
@@ -0,0 +1,1445 @@
+// scalastyle:off
+
+/* NSC -- new Scala compiler
+ * Copyright 2005-2013 LAMP/EPFL
+ * @author Martin Odersky
+ */
+
+package org.apache.spark.repl
+
+import java.io.File
+
+import scala.tools.nsc._
+import scala.tools.nsc.backend.JavaPlatform
+import scala.tools.nsc.interpreter._
+
+import Predef.{ println => _, _ }
+import scala.tools.nsc.util.{MergedClassPath, stringFromWriter, ScalaClassLoader, stackTraceString}
+import scala.reflect.internal.util._
+import java.net.URL
+import scala.sys.BooleanProp
+import io.{AbstractFile, PlainFile, VirtualDirectory}
+
+import reporters._
+import symtab.Flags
+import scala.reflect.internal.Names
+import scala.tools.util.PathResolver
+import ScalaClassLoader.URLClassLoader
+import scala.tools.nsc.util.Exceptional.unwrap
+import scala.collection.{ mutable, immutable }
+import scala.util.control.Exception.{ ultimately }
+import SparkIMain._
+import java.util.concurrent.Future
+import typechecker.Analyzer
+import scala.language.implicitConversions
+import scala.reflect.runtime.{ universe => ru }
+import scala.reflect.{ ClassTag, classTag }
+import scala.tools.reflect.StdRuntimeTags._
+import scala.util.control.ControlThrowable
+
+import org.apache.spark.{Logging, HttpServer, SecurityManager, SparkConf}
+import org.apache.spark.util.Utils
+
+// /** directory to save .class files to */
+// private class ReplVirtualDirectory(out: JPrintWriter) extends VirtualDirectory("((memory))", None) {
+//   private def pp(root: AbstractFile, indentLevel: Int) {
+//     val spaces = " " * indentLevel
+//     out.println(spaces + root.name)
+//     if (root.isDirectory)
+//       root.toList sortBy (_.name) foreach (x => pp(x, indentLevel + 1))
+//   }
+//   // print the contents hierarchically
+//   def show() = pp(this, 0)
+// }
+
+  /** An interpreter for Scala code.
+   *
+   *  The main public entry points are compile(), interpret(), and bind().
+   *  The compile() method loads a complete Scala file.  The interpret() method
+   *  executes one line of Scala code at the request of the user.  The bind()
+   *  method binds an object to a variable that can then be used by later
+   *  interpreted code.
+   *
+   *  The overall approach is based on compiling the requested code and then
+   *  using a Java classloader and Java reflection to run the code
+   *  and access its results.
+   *
+   *  In more detail, a single compiler instance is used
+   *  to accumulate all successfully compiled or interpreted Scala code.  To
+   *  "interpret" a line of code, the compiler generates a fresh object that
+   *  includes the line of code and which has public member(s) to export
+   *  all variables defined by that code.  To extract the result of an
+   *  interpreted line to show the user, a second "result object" is created
+   *  which imports the variables exported by the above object and then
+   *  exports members called "$eval" and "$print".  To accommodate user expressions
+   *  that read from variables or methods defined in previous statements, "import"
+   *  statements are used.
+   *
+   *  This interpreter shares the strengths and weaknesses of using the
+   *  full compiler-to-Java.  The main strength is that interpreted code
+   *  behaves exactly as does compiled code, including running at full speed.
+   *  The main weakness is that redefining classes and methods is not handled
+   *  properly, because rebinding at the Java level is technically difficult.
+   *
+   *  @author Moez A. Abdel-Gawad
+   *  @author Lex Spoon
+   */
+  class SparkIMain(
+      initialSettings: Settings,
+      val out: JPrintWriter,
+      propagateExceptions: Boolean = false)
+    extends SparkImports with Logging { imain =>
+
+    val conf = new SparkConf()
+
+    val SPARK_DEBUG_REPL: Boolean = (System.getenv("SPARK_DEBUG_REPL") == "1")
+    /** Local directory to save .class files to */
+    lazy val outputDir = {
+      val tmp = System.getProperty("java.io.tmpdir")
+      val rootDir = conf.get("spark.repl.classdir", tmp)
+      Utils.createTempDir(rootDir)
+    }
+    if (SPARK_DEBUG_REPL) {
+      echo("Output directory: " + outputDir)
+    }
+
+    val virtualDirectory = new PlainFile(outputDir) // "directory" for classfiles
+    /** Jetty server that will serve our classes to worker nodes */
+    val classServerPort = conf.getInt("spark.replClassServer.port", 0)
+    val classServer = new HttpServer(outputDir, new SecurityManager(conf), classServerPort, "HTTP class server")
+    private var currentSettings: Settings = initialSettings
+    var printResults = true // whether to print result lines
+    var totalSilence = false // whether to print anything
+    private var _initializeComplete = false // compiler is initialized
+    private var _isInitialized: Future[Boolean] = null // set up initialization future
+    private var bindExceptions = true // whether to bind the lastException variable
+    private var _executionWrapper = "" // code to be wrapped around all lines
+
+    // Start the classServer and store its URI in a spark system property
+    // (which will be passed to executors so that they can connect to it)
+    classServer.start()
+    if (SPARK_DEBUG_REPL) {
+      echo("Class server started, URI = " + classServer.uri)
+    }
+
+    /** We're going to go to some trouble to initialize the compiler asynchronously.
+     *  It's critical that nothing call into it until it's been initialized or we will
+     *  run into unrecoverable issues, but the perceived repl startup time goes
+     *  through the roof if we wait for it.  So we initialize it with a future and
+     *  use a lazy val to ensure that any attempt to use the compiler object waits
+     *  on the future.
+     */
+    private var _classLoader: AbstractFileClassLoader = null // active classloader
+    private val _compiler: Global = newCompiler(settings, reporter) // our private compiler
+
+    private trait ExposeAddUrl extends URLClassLoader { def addNewUrl(url: URL) = this.addURL(url) }
+    private var _runtimeClassLoader: URLClassLoader with ExposeAddUrl = null // wrapper exposing addURL
+
+    private val nextReqId = {
+      var counter = 0
+      () => { counter += 1 ; counter }
+    }
+
+    def compilerClasspath: Seq[URL] = (
+      if (isInitializeComplete) global.classPath.asURLs
+      else new PathResolver(settings).result.asURLs // the compiler's classpath
+    )
+    def settings = currentSettings
+    def mostRecentLine = prevRequestList match {
+      case Nil => ""
+      case req :: _ => req.originalLine
+    }
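+    // To make the scheme in the class comment concrete: interpreting a line such
+    // as `val x = 39 + 3` conceptually compiles wrappers along these lines (a
+    // simplified sketch; the real generated names and nesting differ):
+    //
+    //   object $line1 { object $read { object $iw { val x = 39 + 3 } } }
+    //   object $eval { def $print = $line1.$read.$iw.x.toString }  // result object
+    //
+    // Later lines import the members of earlier wrappers, which is how `x`
+    // stays visible across interpreted statements.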
+    // Run the code body with the given boolean settings flipped to true.
+    def withoutWarnings[T](body: => T): T = beQuietDuring {
+      val saved = settings.nowarn.value
+      if (!saved)
+        settings.nowarn.value = true
+
+      try body
+      finally if (!saved) settings.nowarn.value = false
+    }
+
+    /** construct an interpreter that reports to Console */
+    def this(settings: Settings) = this(settings, new NewLinePrintWriter(new ConsoleWriter, true))
+    def this() = this(new Settings())
+
+    lazy val repllog: Logger = new Logger {
+      val out: JPrintWriter = imain.out
+      val isInfo: Boolean = BooleanProp keyExists "scala.repl.info"
+      val isDebug: Boolean = BooleanProp keyExists "scala.repl.debug"
+      val isTrace: Boolean = BooleanProp keyExists "scala.repl.trace"
+    }
+    lazy val formatting: Formatting = new Formatting {
+      val prompt = Properties.shellPromptString
+    }
+    lazy val reporter: ConsoleReporter = new SparkIMain.ReplReporter(this)
+
+    import formatting._
+    import reporter.{ printMessage, withoutTruncating }
+
+    // This exists mostly because using the reporter too early leads to deadlock.
+    private def echo(msg: String) { Console println msg }
+    private def _initSources = List(new BatchSourceFile("<init>", "class $repl_$init { }"))
+    private def _initialize() = {
+      try {
+        // todo. if this crashes, REPL will hang
+        new _compiler.Run() compileSources _initSources
+        _initializeComplete = true
+        true
+      }
+      catch AbstractOrMissingHandler()
+    }
+    private def tquoted(s: String) = "\"\"\"" + s + "\"\"\""
+
+    // argument is a thunk to execute after init is done
+    def initialize(postInitSignal: => Unit) {
+      synchronized {
+        if (_isInitialized == null) {
+          _isInitialized = io.spawn {
+            try _initialize()
+            finally postInitSignal
+          }
+        }
+      }
+    }
+    def initializeSynchronous(): Unit = {
+      if (!isInitializeComplete) {
+        _initialize()
+        assert(global != null, global)
+      }
+    }
+    def isInitializeComplete = _initializeComplete
+
+    /** the public compiler accessor: go through the initialization future */
+    lazy val global: Global = {
+      if (isInitializeComplete) _compiler
+      else {
+        // If init hasn't been called yet you're on your own.
+        if (_isInitialized == null) {
+          logWarning("Warning: compiler accessed before init set up.  Assuming no postInit code.")
+          initialize(())
+        }
+        // blocks until it is initialized; false means catastrophic failure
+        if (_isInitialized.get()) _compiler
+        else null
+      }
+    }
+    @deprecated("Use `global` for access to the compiler instance.", "2.9.0")
+    lazy val compiler: global.type = global
+
+    import global._
+    import definitions.{ScalaPackage, JavaLangPackage, termMember, typeMember}
+    import rootMirror.{RootClass, getClassIfDefined, getModuleIfDefined, getRequiredModule, getRequiredClass}
+
+    implicit class ReplTypeOps(tp: Type) {
+      def orElse(other: => Type): Type = if (tp ne NoType) tp else other
+      def andAlso(fn: Type => Type): Type = if (tp eq NoType) tp else fn(tp)
+    }
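+    // For instance (illustrative only; treat the lookups as assumptions), these
+    // combinators let type queries chain without explicit NoType checks:
+    //
+    //   // fall back to a second lookup, then widen only if something was found:
+    //   (typeOfTerm("x") orElse typeOfTerm("X")) andAlso (_.widen)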
+    // TODO: If we try to make naming a lazy val, we run into big time
+    // scalac unhappiness with what look like cycles.  It has not been easy to
+    // reduce, but name resolution clearly takes different paths.
+    object naming extends {
+      val global: imain.global.type = imain.global
+    } with Naming {
+      // make sure we don't overwrite their unwisely named res3 etc.
+      def freshUserTermName(): TermName = {
+        val name = newTermName(freshUserVarName())
+        if (definedNameMap contains name) freshUserTermName()
+        else name
+      }
+      def isUserTermName(name: Name) = isUserVarName("" + name)
+      def isInternalTermName(name: Name) = isInternalVarName("" + name)
+    }
+    import naming._
+
+    object deconstruct extends {
+      val global: imain.global.type = imain.global
+    } with StructuredTypeStrings
+
+    lazy val memberHandlers = new {
+      val intp: imain.type = imain
+    } with SparkMemberHandlers
+    import memberHandlers._
+
+    /** Temporarily be quiet */
+    def beQuietDuring[T](body: => T): T = {
+      val saved = printResults
+      printResults = false
+      try body
+      finally printResults = saved
+    }
+    def beSilentDuring[T](operation: => T): T = {
+      val saved = totalSilence
+      totalSilence = true
+      try operation
+      finally totalSilence = saved
+    }
+
+    def quietRun[T](code: String) = beQuietDuring(interpret(code))
+
+    private def logAndDiscard[T](label: String, alt: => T): PartialFunction[Throwable, T] = {
+      case t: ControlThrowable => throw t
+      case t: Throwable =>
+        logDebug(label + ": " + unwrap(t))
+        logDebug(stackTraceString(unwrap(t)))
+        alt
+    }
+
+    /** takes AnyRef because it may be binding a Throwable or an Exceptional */
+    private def withLastExceptionLock[T](body: => T, alt: => T): T = {
+      assert(bindExceptions, "withLastExceptionLock called incorrectly.")
+      bindExceptions = false
+
+      try beQuietDuring(body)
+      catch logAndDiscard("withLastExceptionLock", alt)
+      finally bindExceptions = true
+    }
+
+    def executionWrapper = _executionWrapper
+    def setExecutionWrapper(code: String) = _executionWrapper = code
+    def clearExecutionWrapper() = _executionWrapper = ""
+
+    /** interpreter settings */
+    lazy val isettings = new SparkISettings(this)
+
+    /** Instantiate a compiler.  Overridable. */
+    protected def newCompiler(settings: Settings, reporter: Reporter): ReplGlobal = {
+      settings.outputDirs setSingleOutput virtualDirectory
+      settings.exposeEmptyPackage.value = true
+      new Global(settings, reporter) with ReplGlobal {
+        override def toString: String = "<global>"
+      }
+    }
+
+    /**
+     * Adds any specified jars to the compile and runtime classpaths.
+     *
+     * @note Currently only supports jars, not directories
+     * @param urls The list of items to add to the compile and runtime classpaths
+     */
+    def addUrlsToClassPath(urls: URL*): Unit = {
+      new Run // Needed to force initialization of "something" to correctly load Scala classes from jars
+      urls.foreach(_runtimeClassLoader.addNewUrl) // Add jars/classes to runtime for execution
+      updateCompilerClassPath(urls: _*) // Add jars/classes to compile time for compiling
+    }
+
+    protected def updateCompilerClassPath(urls: URL*): Unit = {
+      require(!global.forMSIL) // Only support JavaPlatform
+
+      val platform = global.platform.asInstanceOf[JavaPlatform]
+
+      val newClassPath = mergeUrlsIntoClassPath(platform, urls: _*)
+
+      // NOTE: Must use reflection until this is exposed/fixed upstream in Scala
+      val fieldSetter = platform.getClass.getMethods
+        .find(_.getName.endsWith("currentClassPath_$eq")).get
+      fieldSetter.invoke(platform, Some(newClassPath))
+
+      // Reload all jars specified into our compiler
+      global.invalidateClassPathEntries(urls.map(_.getPath): _*)
+    }
+
+    protected def mergeUrlsIntoClassPath(platform: JavaPlatform, urls: URL*): MergedClassPath[AbstractFile] = {
+      // Collect our new jars/directories and add them to the existing set of classpaths
+      val allClassPaths = (
+        platform.classPath.asInstanceOf[MergedClassPath[AbstractFile]].entries ++
+        urls.map(url => {
+          platform.classPath.context.newClassPath(
+            if (url.getProtocol == "file") {
+              val f = new File(url.getPath)
+              if (f.isDirectory)
+                io.AbstractFile.getDirectory(f)
+              else
+                io.AbstractFile.getFile(f)
+            } else {
+              io.AbstractFile.getURL(url)
+            }
+          )
+        })
+      ).distinct
+
+      // Combine all of our classpaths (old and new) into one merged classpath
+      new MergedClassPath(allClassPaths, platform.classPath.context)
+    }
+
+    /** Parent classloader.  Overridable. */
+    protected def parentClassLoader: ClassLoader =
+      SparkHelper.explicitParentLoader(settings).getOrElse( this.getClass.getClassLoader() )
+
+    /* A single class loader is used for all commands interpreted by this Interpreter.
+       It would also be possible to create a new class loader for each command
+       to interpret.  The advantages of the current approach are:
+
+       - Expressions are only evaluated one time.  This is especially
+         significant for I/O, e.g. "val x = Console.readLine"
+
+       The main disadvantage is:
+
+       - Objects, classes, and methods cannot be rebound.  Instead, definitions
+         shadow the old ones, and old code objects refer to the old
+         definitions.
+    */
+    def resetClassLoader() = {
+      logDebug("Setting new classloader: was " + _classLoader)
+      _classLoader = null
+      ensureClassLoader()
+    }
+    final def ensureClassLoader() {
+      if (_classLoader == null)
+        _classLoader = makeClassLoader()
+    }
+    def classLoader: AbstractFileClassLoader = {
+      ensureClassLoader()
+      _classLoader
+    }
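+    // A hypothetical usage sketch for addUrlsToClassPath above (the jar path is
+    // made up for illustration): adding a jar at runtime makes its classes
+    // visible both to newly compiled lines and to the executing classloader.
+    //
+    //   val jar = new java.io.File("/tmp/extra-udfs.jar").toURI.toURL
+    //   intp.addUrlsToClassPath(jar)
+    //   // subsequent interpreted lines can now import classes from that jar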
+    private class TranslatingClassLoader(parent: ClassLoader) extends AbstractFileClassLoader(virtualDirectory, parent) {
+      /** Overridden here to try translating a simple name to the generated
+       *  class name if the original attempt fails.  This method is used by
+       *  getResourceAsStream as well as findClass.
+       */
+      override protected def findAbstractFile(name: String): AbstractFile = {
+        super.findAbstractFile(name) match {
+          // deadlocks on startup if we try to translate names too early
+          case null if isInitializeComplete =>
+            generatedName(name) map (x => super.findAbstractFile(x)) orNull
+          case file =>
+            file
+        }
+      }
+    }
+    private def makeClassLoader(): AbstractFileClassLoader =
+      new TranslatingClassLoader(parentClassLoader match {
+        case null => ScalaClassLoader fromURLs compilerClasspath
+        case p =>
+          _runtimeClassLoader = new URLClassLoader(compilerClasspath, p) with ExposeAddUrl
+          _runtimeClassLoader
+      })
+
+    def getInterpreterClassLoader() = classLoader
+
+    // Set the current Java "context" class loader to this interpreter's class loader
+    def setContextClassLoader() = classLoader.setAsContext()
+
+    /** Given a simple repl-defined name, returns the real name of
+     *  the class representing it, e.g. for "Bippy" it may return
+     *  {{{
+     *    $line19.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$Bippy
+     *  }}}
+     */
+    def generatedName(simpleName: String): Option[String] = {
+      if (simpleName endsWith nme.MODULE_SUFFIX_STRING) optFlatName(simpleName.init) map (_ + nme.MODULE_SUFFIX_STRING)
+      else optFlatName(simpleName)
+    }
+    def flatName(id: String) = optFlatName(id) getOrElse id
+    def optFlatName(id: String) = requestForIdent(id) map (_ fullFlatName id)
+
+    def allDefinedNames = definedNameMap.keys.toList.sorted
+    def pathToType(id: String): String = pathToName(newTypeName(id))
+    def pathToTerm(id: String): String = pathToName(newTermName(id))
+    def pathToName(name: Name): String = {
+      if (definedNameMap contains name)
+        definedNameMap(name) fullPath name
+      else name.toString
+    }
+
+    /** Most recent tree handled which wasn't wholly synthetic. */
+    private def mostRecentlyHandledTree: Option[Tree] = {
+      prevRequests.reverse foreach { req =>
+        req.handlers.reverse foreach {
+          case x: MemberDefHandler if x.definesValue && !isInternalTermName(x.name) => return Some(x.member)
+          case _ => ()
+        }
+      }
+      None
+    }
+
+    /** Stubs for work in progress. */
+    def handleTypeRedefinition(name: TypeName, old: Request, req: Request) = {
+      for (t1 <- old.simpleNameOfType(name) ; t2 <- req.simpleNameOfType(name)) {
+        logDebug("Redefining type '%s'\n  %s -> %s".format(name, t1, t2))
+      }
+    }
+
+    def handleTermRedefinition(name: TermName, old: Request, req: Request) = {
+      for (t1 <- old.compilerTypeOf get name ; t2 <- req.compilerTypeOf get name) {
+        // Printing the types here has a tendency to cause assertion errors, like
+        //   assertion failed: fatal: <refinement> has owner value x, but a class owner is required
+        // so DBG is by-name now to keep it in the family.  (It also traps the assertion error,
+        // but we don't want to unnecessarily risk hosing the compiler's internal state.)
+        logDebug("Redefining term '%s'\n  %s -> %s".format(name, t1, t2))
+      }
+    }
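+    // An illustrative transcript (not output produced by this patch) of what the
+    // hooks above record when a name is rebound within a session:
+    //
+    //   scala> val x = 1
+    //   scala> val x = "two"
+    //   // handleTermRedefinition logs: Redefining term 'x'  Int -> String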
+    def recordRequest(req: Request) {
+      if (req == null || referencedNameMap == null)
+        return
+
+      prevRequests += req
+      req.referencedNames foreach (x => referencedNameMap(x) = req)
+
+      // warning about serially defining companions.  It'd be easy
+      // enough to just redefine them together but that may not always
+      // be what people want so I'm waiting until I can do it better.
+      for {
+        name   <- req.definedNames filterNot (x => req.definedNames contains x.companionName)
+        oldReq <- definedNameMap get name.companionName
+        newSym <- req.definedSymbols get name
+        oldSym <- oldReq.definedSymbols get name.companionName
+        if Seq(oldSym, newSym).permutations exists { case Seq(s1, s2) => s1.isClass && s2.isModule }
+      } {
+        afterTyper(replwarn(s"warning: previously defined $oldSym is not a companion to $newSym."))
+        replwarn("Companions must be defined together; you may wish to use :paste mode for this.")
+      }
+
+      // Updating the defined name map
+      req.definedNames foreach { name =>
+        if (definedNameMap contains name) {
+          if (name.isTypeName) handleTypeRedefinition(name.toTypeName, definedNameMap(name), req)
+          else handleTermRedefinition(name.toTermName, definedNameMap(name), req)
+        }
+        definedNameMap(name) = req
+      }
+    }
+
+    def replwarn(msg: => String) {
+      if (!settings.nowarnings.value)
+        printMessage(msg)
+    }
+
+    def isParseable(line: String): Boolean = {
+      beSilentDuring {
+        try parse(line) match {
+          case Some(xs) => xs.nonEmpty // parses as-is
+          case None     => true        // incomplete
+        }
+        catch { case x: Exception =>   // crashed the compiler
+          replwarn("Exception in isParseable(\"" + line + "\"): " + x)
+          false
+        }
+      }
+    }
+
+    def compileSourcesKeepingRun(sources: SourceFile*) = {
+      val run = new Run()
+      reporter.reset()
+      run compileSources sources.toList
+      (!reporter.hasErrors, run)
+    }
+
+    /** Compile an nsc SourceFile.  Returns true if there are
+     *  no compilation errors, or false otherwise.
+     */
+    def compileSources(sources: SourceFile*): Boolean =
+      compileSourcesKeepingRun(sources: _*)._1
+
+    /** Compile a string.  Returns true if there are no
+     *  compilation errors, or false otherwise.
+     */
+    def compileString(code: String): Boolean =
+      compileSources(new BatchSourceFile("