| author | Prashant Sharma <prashant.s@imaginea.com> | 2013-09-06 17:53:01 +0530 |
|---|---|---|
| committer | Prashant Sharma <prashant.s@imaginea.com> | 2013-09-06 17:53:01 +0530 |
| commit | 4106ae9fbf8a582697deba2198b3b966dec00bfe (patch) | |
| tree | 7c3046faee5f62f9ec4c4176125988d7cb5d70e2 /repl | |
| parent | e0dd24dc858777904335218f3001a24bffe73b27 (diff) | |
| parent | 5c7494d7c1b7301138fb3dc155a1b0c961126ec6 (diff) | |
Merged with master
Diffstat (limited to 'repl')
18 files changed, 369 insertions, 372 deletions
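Editor's note: the bulk of this diff is mechanical — the REPL sources move from the `spark` package to `org.apache.spark`, and the Maven coordinates move from `org.spark-project` to `org.apache.spark`. As an illustrative sketch only (this driver program is hypothetical and not part of the commit), downstream code written against the old package would be updated like so:

```scala
// Hypothetical downstream driver, shown only to illustrate the package rename.
// Before this commit: import spark.SparkContext / import spark.SparkContext._
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._  // brings in the pair-RDD implicits used below

object RenameSketch {
  def main(args: Array[String]) {
    val sc = new SparkContext("local", "RenameSketch")
    val counts = sc.parallelize(Seq("a", "b", "a"))
      .map(word => (word, 1))
      .reduceByKey(_ + _)  // available via the SparkContext._ implicits in 0.8
    counts.collect().foreach(println)
    sc.stop()
  }
}
```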
diff --git a/repl/pom.xml b/repl/pom.xml
index d618f176f7..faabb0e1ec 100644
--- a/repl/pom.xml
+++ b/repl/pom.xml
@@ -1,18 +1,35 @@
 <?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one or more
+  ~ contributor license agreements.  See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to You under the Apache License, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License.  You may obtain a copy of the License at
+  ~
+  ~    http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+
 <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
   <modelVersion>4.0.0</modelVersion>
   <parent>
-    <groupId>org.spark-project</groupId>
+    <groupId>org.apache.spark</groupId>
     <artifactId>spark-parent</artifactId>
     <version>0.8.0-SNAPSHOT</version>
     <relativePath>../pom.xml</relativePath>
   </parent>

-  <groupId>org.spark-project</groupId>
+  <groupId>org.apache.spark</groupId>
   <artifactId>spark-repl</artifactId>
   <packaging>jar</packaging>
   <name>Spark Project REPL</name>
-  <url>http://spark-project.org/</url>
+  <url>http://spark.incubator.apache.org/</url>

   <properties>
     <deb.install.path>/usr/share/spark</deb.install.path>
@@ -21,6 +38,23 @@

   <dependencies>
     <dependency>
+      <groupId>org.apache.spark</groupId>
+      <artifactId>spark-core</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.spark</groupId>
+      <artifactId>spark-bagel</artifactId>
+      <version>${project.version}</version>
+      <scope>runtime</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.spark</groupId>
+      <artifactId>spark-mllib</artifactId>
+      <version>${project.version}</version>
+      <scope>runtime</scope>
+    </dependency>
+    <dependency>
       <groupId>org.eclipse.jetty</groupId>
       <artifactId>jetty-server</artifactId>
     </dependency>
@@ -42,7 +76,6 @@
       <groupId>org.slf4j</groupId>
       <artifactId>slf4j-log4j12</artifactId>
     </dependency>
-
     <dependency>
       <groupId>org.scalatest</groupId>
       <artifactId>scalatest_${scala.version}</artifactId>
@@ -59,192 +92,57 @@
     <testOutputDirectory>target/scala-${scala.version}/test-classes</testOutputDirectory>
     <plugins>
       <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-antrun-plugin</artifactId>
+        <executions>
+          <execution>
+            <phase>test</phase>
+            <goals>
+              <goal>run</goal>
+            </goals>
+            <configuration>
+              <exportAntProperties>true</exportAntProperties>
+              <tasks>
+                <property name="spark.classpath" refid="maven.test.classpath"/>
+                <property environment="env"/>
+                <fail message="Please set the SCALA_HOME (or SCALA_LIBRARY_PATH if scala is on the path) environment variables and retry.">
+                  <condition>
+                    <not>
+                      <or>
+                        <isset property="env.SCALA_HOME"/>
+                        <isset property="env.SCALA_LIBRARY_PATH"/>
+                      </or>
+                    </not>
+                  </condition>
+                </fail>
+              </tasks>
+            </configuration>
+          </execution>
+        </executions>
+      </plugin>
+      <plugin>
         <groupId>org.scalatest</groupId>
         <artifactId>scalatest-maven-plugin</artifactId>
         <configuration>
           <environmentVariables>
             <SPARK_HOME>${basedir}/..</SPARK_HOME>
             <SPARK_TESTING>1</SPARK_TESTING>
+            <SPARK_CLASSPATH>${spark.classpath}</SPARK_CLASSPATH>
           </environmentVariables>
         </configuration>
       </plugin>
     </plugins>
   </build>
-
   <profiles>
     <profile>
-      <id>hadoop1</id>
-      <properties>
-        <classifier>hadoop1</classifier>
-      </properties>
-      <dependencies>
-        <dependency>
-          <groupId>org.spark-project</groupId>
-          <artifactId>spark-core</artifactId>
-          <version>${project.version}</version>
-          <classifier>hadoop1</classifier>
-        </dependency>
-        <dependency>
-          <groupId>org.spark-project</groupId>
-          <artifactId>spark-bagel</artifactId>
-          <version>${project.version}</version>
-          <classifier>hadoop1</classifier>
-          <scope>runtime</scope>
-        </dependency>
-        <dependency>
-          <groupId>org.spark-project</groupId>
-          <artifactId>spark-examples</artifactId>
-          <version>${project.version}</version>
-          <classifier>hadoop1</classifier>
-          <scope>runtime</scope>
-        </dependency>
-        <dependency>
-          <groupId>org.apache.hadoop</groupId>
-          <artifactId>hadoop-core</artifactId>
-          <scope>provided</scope>
-        </dependency>
-      </dependencies>
-      <build>
-        <plugins>
-          <plugin>
-            <groupId>org.apache.maven.plugins</groupId>
-            <artifactId>maven-jar-plugin</artifactId>
-            <configuration>
-              <classifier>hadoop1</classifier>
-            </configuration>
-          </plugin>
-        </plugins>
-      </build>
-    </profile>
-    <profile>
-      <id>hadoop2</id>
-      <properties>
-        <classifier>hadoop2</classifier>
-      </properties>
-      <dependencies>
-        <dependency>
-          <groupId>org.spark-project</groupId>
-          <artifactId>spark-core</artifactId>
-          <version>${project.version}</version>
-          <classifier>hadoop2</classifier>
-        </dependency>
-        <dependency>
-          <groupId>org.spark-project</groupId>
-          <artifactId>spark-bagel</artifactId>
-          <version>${project.version}</version>
-          <classifier>hadoop2</classifier>
-          <scope>runtime</scope>
-        </dependency>
-        <dependency>
-          <groupId>org.spark-project</groupId>
-          <artifactId>spark-examples</artifactId>
-          <version>${project.version}</version>
-          <classifier>hadoop2</classifier>
-          <scope>runtime</scope>
-        </dependency>
-        <dependency>
-          <groupId>org.apache.hadoop</groupId>
-          <artifactId>hadoop-core</artifactId>
-          <scope>provided</scope>
-        </dependency>
-        <dependency>
-          <groupId>org.apache.hadoop</groupId>
-          <artifactId>hadoop-client</artifactId>
-          <scope>provided</scope>
-        </dependency>
-        <dependency>
-          <groupId>org.apache.avro</groupId>
-          <artifactId>avro</artifactId>
-          <scope>provided</scope>
-        </dependency>
-        <dependency>
-          <groupId>org.apache.avro</groupId>
-          <artifactId>avro-ipc</artifactId>
-          <scope>provided</scope>
-        </dependency>
-      </dependencies>
-      <build>
-        <plugins>
-          <plugin>
-            <groupId>org.apache.maven.plugins</groupId>
-            <artifactId>maven-jar-plugin</artifactId>
-            <configuration>
-              <classifier>hadoop2</classifier>
-            </configuration>
-          </plugin>
-        </plugins>
-      </build>
-    </profile>
-    <profile>
       <id>hadoop2-yarn</id>
-      <properties>
-        <classifier>hadoop2-yarn</classifier>
-      </properties>
       <dependencies>
         <dependency>
-          <groupId>org.spark-project</groupId>
-          <artifactId>spark-core</artifactId>
-          <version>${project.version}</version>
-          <classifier>hadoop2-yarn</classifier>
-        </dependency>
-        <dependency>
-          <groupId>org.spark-project</groupId>
-          <artifactId>spark-bagel</artifactId>
+          <groupId>org.apache.spark</groupId>
+          <artifactId>spark-yarn</artifactId>
           <version>${project.version}</version>
-          <classifier>hadoop2-yarn</classifier>
-          <scope>runtime</scope>
-        </dependency>
-        <dependency>
-          <groupId>org.spark-project</groupId>
-          <artifactId>spark-examples</artifactId>
-          <version>${project.version}</version>
-          <classifier>hadoop2-yarn</classifier>
-          <scope>runtime</scope>
-        </dependency>
-        <dependency>
-          <groupId>org.spark-project</groupId>
-          <artifactId>spark-streaming</artifactId>
-          <version>${project.version}</version>
-          <classifier>hadoop2-yarn</classifier>
-          <scope>runtime</scope>
-        </dependency>
-        <dependency>
-          <groupId>org.apache.hadoop</groupId>
-          <artifactId>hadoop-client</artifactId>
-          <scope>provided</scope>
-        </dependency>
-        <dependency>
-          <groupId>org.apache.hadoop</groupId>
-          <artifactId>hadoop-yarn-api</artifactId>
-          <scope>provided</scope>
-        </dependency>
-        <dependency>
-          <groupId>org.apache.hadoop</groupId>
-          <artifactId>hadoop-yarn-common</artifactId>
-          <scope>provided</scope>
-        </dependency>
-        <dependency>
-          <groupId>org.apache.avro</groupId>
-          <artifactId>avro</artifactId>
-          <scope>provided</scope>
-        </dependency>
-        <dependency>
-          <groupId>org.apache.avro</groupId>
-          <artifactId>avro-ipc</artifactId>
-          <scope>provided</scope>
         </dependency>
       </dependencies>
-      <build>
-        <plugins>
-          <plugin>
-            <groupId>org.apache.maven.plugins</groupId>
-            <artifactId>maven-jar-plugin</artifactId>
-            <configuration>
-              <classifier>hadoop2-yarn</classifier>
-            </configuration>
-          </plugin>
-        </plugins>
-      </build>
     </profile>
   </profiles>
 </project>
diff --git a/repl/src/main/scala/spark/repl/ExecutorClassLoader.scala b/repl/src/main/scala/org/apache/spark/repl/ExecutorClassLoader.scala
index 0e9aa863b5..3e171849e3 100644
--- a/repl/src/main/scala/spark/repl/ExecutorClassLoader.scala
+++ b/repl/src/main/scala/org/apache/spark/repl/ExecutorClassLoader.scala
@@ -1,4 +1,21 @@
-package spark.repl
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.repl

 import java.io.{ByteArrayOutputStream, InputStream}
 import java.net.{URI, URL, URLClassLoader, URLEncoder}
diff --git a/repl/src/main/scala/org/apache/spark/repl/Main.scala b/repl/src/main/scala/org/apache/spark/repl/Main.scala
new file mode 100644
index 0000000000..14b448d076
--- /dev/null
+++ b/repl/src/main/scala/org/apache/spark/repl/Main.scala
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.repl
+
+import scala.collection.mutable.Set
+
+object Main {
+  private var _interp: SparkILoop = _
+
+  def interp = _interp
+
+  def interp_=(i: SparkILoop) { _interp = i }
+
+  def main(args: Array[String]) {
+    _interp = new SparkILoop
+    _interp.process(args)
+  }
+}
diff --git a/repl/src/main/scala/spark/repl/SparkExprTyper.scala b/repl/src/main/scala/org/apache/spark/repl/SparkExprTyper.scala
index c55797eb54..85b0978c81 100644
--- a/repl/src/main/scala/spark/repl/SparkExprTyper.scala
+++ b/repl/src/main/scala/org/apache/spark/repl/SparkExprTyper.scala
@@ -3,7 +3,7 @@
  * @author Paul Phillips
  */

-package spark.repl
+package org.apache.spark.repl

 import scala.tools.nsc._
 import scala.tools.nsc.interpreter._
@@ -11,7 +11,8 @@ import scala.tools.nsc.interpreter._
 import scala.reflect.internal.util.BatchSourceFile
 import scala.tools.nsc.ast.parser.Tokens.EOF

-import spark.Logging
+import org.apache.spark.Logging
+import scala.Some

 trait SparkExprTyper extends Logging {
   val repl: SparkIMain
diff --git a/repl/src/main/scala/org/apache/spark/repl/SparkHelper.scala b/repl/src/main/scala/org/apache/spark/repl/SparkHelper.scala
new file mode 100644
index 0000000000..5340951d91
--- /dev/null
+++ b/repl/src/main/scala/org/apache/spark/repl/SparkHelper.scala
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package scala.tools.nsc
+
+object SparkHelper {
+  def explicitParentLoader(settings: Settings) = settings.explicitParentLoader
+}
diff --git a/repl/src/main/scala/spark/repl/SparkILoop.scala b/repl/src/main/scala/org/apache/spark/repl/SparkILoop.scala
index cc7a63e166..bd132c29bf 100644
--- a/repl/src/main/scala/spark/repl/SparkILoop.scala
+++ b/repl/src/main/scala/org/apache/spark/repl/SparkILoop.scala
@@ -3,7 +3,7 @@
  * @author Alexander Spoon
  */

-package spark.repl
+package org.apache.spark.repl

 import scala.tools.nsc._
@@ -34,8 +34,8 @@ import scala.tools.reflect.StdRuntimeTags._
 import java.lang.{Class => jClass}
 import scala.reflect.api.{Mirror, TypeCreator, Universe => ApiUniverse}

-import spark.Logging
-import spark.SparkContext
+import org.apache.spark.Logging
+import org.apache.spark.SparkContext

 /** The Scala interactive shell.  It provides a read-eval-print loop
  * around the Interpreter class.
@@ -872,7 +872,7 @@ class SparkILoop(in0: Option[BufferedReader], protected val out: JPrintWriter,
       case x => x
     }
   }
-  lazy val tagOfSparkIMain = tagOfStaticClass[spark.repl.SparkIMain]
+  lazy val tagOfSparkIMain = tagOfStaticClass[org.apache.spark.repl.SparkIMain]
   // Bind intp somewhere out of the regular namespace where
   // we can get at it in generated code.
   addThunk(intp.quietBind(NamedParam[SparkIMain]("$intp", intp)(tagOfSparkIMain, classTag[SparkIMain])))
diff --git a/repl/src/main/scala/spark/repl/SparkILoopInit.scala b/repl/src/main/scala/org/apache/spark/repl/SparkILoopInit.scala
index b89495d2f3..0405a9dd80 100644
--- a/repl/src/main/scala/spark/repl/SparkILoopInit.scala
+++ b/repl/src/main/scala/org/apache/spark/repl/SparkILoopInit.scala
@@ -3,7 +3,7 @@
  * @author Paul Phillips
 */

-package spark.repl
+package org.apache.spark.repl

 import scala.tools.nsc._
 import scala.tools.nsc.interpreter._
@@ -117,7 +117,7 @@ trait SparkILoopInit {
   def initializeSpark() {
     intp.beQuietDuring {
       command("""
-         @transient val sc = spark.repl.Main.interp.createSparkContext();
+         @transient val sc = org.apache.spark.repl.Main.interp.createSparkContext();
        """)
       command("import spark.SparkContext._")
     }
diff --git a/repl/src/main/scala/spark/repl/SparkIMain.scala b/repl/src/main/scala/org/apache/spark/repl/SparkIMain.scala
index b1977d6788..e1455ef8a1 100644
--- a/repl/src/main/scala/spark/repl/SparkIMain.scala
+++ b/repl/src/main/scala/org/apache/spark/repl/SparkIMain.scala
@@ -3,7 +3,7 @@
  * @author Martin Odersky
 */

-package spark.repl
+package org.apache.spark.repl

 import scala.tools.nsc._
 import scala.tools.nsc.interpreter._
@@ -34,10 +34,10 @@ import scala.tools.reflect.StdRuntimeTags._
 import scala.util.control.ControlThrowable
 import util.stackTraceString

-import spark.HttpServer
-import spark.Utils
-import spark.SparkEnv
-import spark.Logging
+import org.apache.spark.HttpServer
+import org.apache.spark.util.Utils
+import org.apache.spark.SparkEnv
+import org.apache.spark.Logging

 // /** directory to save .class files to */
 // private class ReplVirtualDirectory(out: JPrintWriter) extends VirtualDirectory("((memory))", None) {
@@ -86,7 +86,6 @@ import spark.Logging
 class SparkIMain(initialSettings: Settings, val out: JPrintWriter) extends SparkImports with Logging {
   imain =>
-
   val SPARK_DEBUG_REPL: Boolean = (System.getenv("SPARK_DEBUG_REPL") == "1")

   /** Local directory to save .class files too */
diff --git a/repl/src/main/scala/spark/repl/SparkImports.scala b/repl/src/main/scala/org/apache/spark/repl/SparkImports.scala
index 20345aa9ee..a33f07a83e 100644
--- a/repl/src/main/scala/spark/repl/SparkImports.scala
+++ b/repl/src/main/scala/org/apache/spark/repl/SparkImports.scala
@@ -3,8 +3,7 @@
  * @author Paul Phillips
 */

-package spark
-package repl
+package org.apache.spark.repl

 import scala.tools.nsc._
 import scala.tools.nsc.interpreter._
diff --git a/repl/src/main/scala/spark/repl/SparkJLineCompletion.scala b/repl/src/main/scala/org/apache/spark/repl/SparkJLineCompletion.scala
index 0e1170688d..8865f82bc0 100644
--- a/repl/src/main/scala/spark/repl/SparkJLineCompletion.scala
+++ b/repl/src/main/scala/org/apache/spark/repl/SparkJLineCompletion.scala
@@ -3,9 +3,7 @@
  * @author Paul Phillips
 */

-package spark
-package repl
-
+package org.apache.spark.repl

 import scala.tools.nsc._
 import scala.tools.nsc.interpreter._
@@ -14,7 +12,7 @@ import scala.tools.jline._
 import scala.tools.jline.console.completer._
 import Completion._
 import scala.collection.mutable.ListBuffer
-import spark.Logging
+import org.apache.spark.Logging

 // REPL completor - queries supplied interpreter for valid
 // completions based on current contents of buffer.
diff --git a/repl/src/main/scala/spark/repl/SparkJLineReader.scala b/repl/src/main/scala/org/apache/spark/repl/SparkJLineReader.scala
index 00aca60f34..60a4d7841e 100644
--- a/repl/src/main/scala/spark/repl/SparkJLineReader.scala
+++ b/repl/src/main/scala/org/apache/spark/repl/SparkJLineReader.scala
@@ -3,7 +3,7 @@
  * @author Stepan Koltsov
 */

-package spark.repl
+package org.apache.spark.repl

 import scala.tools.nsc._
 import scala.tools.nsc.interpreter._
diff --git a/repl/src/main/scala/spark/repl/SparkMemberHandlers.scala b/repl/src/main/scala/org/apache/spark/repl/SparkMemberHandlers.scala
index 2d894a0356..382f8360a7 100644
--- a/repl/src/main/scala/spark/repl/SparkMemberHandlers.scala
+++ b/repl/src/main/scala/org/apache/spark/repl/SparkMemberHandlers.scala
@@ -3,8 +3,7 @@
  * @author Martin Odersky
 */

-package spark
-package repl
+package org.apache.spark.repl

 import scala.tools.nsc._
 import scala.tools.nsc.interpreter._
diff --git a/repl/src/main/scala/spark/repl/Main.scala b/repl/src/main/scala/spark/repl/Main.scala
deleted file mode 100644
index b0a78e0bb8..0000000000
--- a/repl/src/main/scala/spark/repl/Main.scala
+++ /dev/null
@@ -1,16 +0,0 @@
-package spark.repl
-
-import scala.collection.mutable.Set
-
-object Main {
-  private var _interp: SparkILoop = _
-
-  def interp = _interp
-
-  def interp_=(i: SparkILoop) { _interp = i }
-
-  def main(args: Array[String]) {
-    _interp = new SparkILoop
-    _interp.process(args)
-  }
-}
diff --git a/repl/src/main/scala/spark/repl/SparkHelper.scala b/repl/src/main/scala/spark/repl/SparkHelper.scala
deleted file mode 100644
index d8fb7191b4..0000000000
--- a/repl/src/main/scala/spark/repl/SparkHelper.scala
+++ /dev/null
@@ -1,5 +0,0 @@
-package scala.tools.nsc
-
-object SparkHelper {
-  def explicitParentLoader(settings: Settings) = settings.explicitParentLoader
-}
diff --git a/repl/src/test/resources/log4j.properties b/repl/src/test/resources/log4j.properties
index cfb1a390e6..a6d33e69d2 100644
--- a/repl/src/test/resources/log4j.properties
+++ b/repl/src/test/resources/log4j.properties
@@ -1,4 +1,21 @@
-# Set everything to be logged to the repl/target/unit-tests.log
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# Set everything to be logged to the repl/target/unit-tests.log
 log4j.rootCategory=INFO, file
 log4j.appender.file=org.apache.log4j.FileAppender
 log4j.appender.file.append=false
diff --git a/repl/src/test/scala/org/apache/spark/repl/ReplSuite.scala b/repl/src/test/scala/org/apache/spark/repl/ReplSuite.scala
new file mode 100644
index 0000000000..b06999a42c
--- /dev/null
+++ b/repl/src/test/scala/org/apache/spark/repl/ReplSuite.scala
@@ -0,0 +1,189 @@
+package org.apache.spark.repl
+
+import java.io._
+import java.net.URLClassLoader
+
+import scala.collection.mutable.ArrayBuffer
+
+import org.scalatest.FunSuite
+import com.google.common.io.Files
+
+class ReplSuite extends FunSuite {
+  def runInterpreter(master: String, input: String): String = {
+    val in = new BufferedReader(new StringReader(input + "\n"))
+    val out = new StringWriter()
+    val cl = getClass.getClassLoader
+    var paths = new ArrayBuffer[String]
+    if (cl.isInstanceOf[URLClassLoader]) {
+      val urlLoader = cl.asInstanceOf[URLClassLoader]
+      for (url <- urlLoader.getURLs) {
+        if (url.getProtocol == "file") {
+          paths += url.getFile
+        }
+      }
+    }
+    val interp = new SparkILoop(in, new PrintWriter(out), master)
+    org.apache.spark.repl.Main.interp = interp
+    val separator = System.getProperty("path.separator")
+    interp.process(Array("-classpath", paths.mkString(separator)))
+    org.apache.spark.repl.Main.interp = null
+    if (interp.sparkContext != null) {
+      interp.sparkContext.stop()
+    }
+    // To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown
+    System.clearProperty("spark.driver.port")
+    System.clearProperty("spark.hostPort")
+    return out.toString
+  }
+
+  def assertContains(message: String, output: String) {
+    assert(output.contains(message),
+      "Interpreter output did not contain '" + message + "':\n" + output)
+  }
+
+  def assertDoesNotContain(message: String, output: String) {
+    assert(!output.contains(message),
+      "Interpreter output contained '" + message + "':\n" + output)
+  }
+
+  test("simple foreach with accumulator") {
+    val output = runInterpreter("local", """
+      |val accum = sc.accumulator(0)
+      |sc.parallelize(1 to 10).foreach(x => accum += x)
+      |accum.value
+      """)
+    assertDoesNotContain("error:", output)
+    assertDoesNotContain("Exception", output)
+    assertContains("res1: Int = 55", output)
+  }
+
+  test("external vars") {
+    val output = runInterpreter("local", """
+      |var v = 7
+      |sc.parallelize(1 to 10).map(x => v).collect.reduceLeft(_+_)
+      |v = 10
+      |sc.parallelize(1 to 10).map(x => v).collect.reduceLeft(_+_)
+      """)
+    assertDoesNotContain("error:", output)
+    assertDoesNotContain("Exception", output)
+    assertContains("res0: Int = 70", output)
+    assertContains("res1: Int = 100", output)
+  }
+
+  test("external classes") {
+    val output = runInterpreter("local", """
+      |class C {
+      |def foo = 5
+      |}
+      |sc.parallelize(1 to 10).map(x => (new C).foo).collect.reduceLeft(_+_)
+      """)
+    assertDoesNotContain("error:", output)
+    assertDoesNotContain("Exception", output)
+    assertContains("res0: Int = 50", output)
+  }
+
+  test("external functions") {
+    val output = runInterpreter("local", """
+      |def double(x: Int) = x + x
+      |sc.parallelize(1 to 10).map(x => double(x)).collect.reduceLeft(_+_)
+      """)
+    assertDoesNotContain("error:", output)
+    assertDoesNotContain("Exception", output)
+    assertContains("res0: Int = 110", output)
+  }
+
+  test("external functions that access vars") {
+    val output = runInterpreter("local", """
+      |var v = 7
+      |def getV() = v
+      |sc.parallelize(1 to 10).map(x => getV()).collect.reduceLeft(_+_)
+      |v = 10
+      |sc.parallelize(1 to 10).map(x => getV()).collect.reduceLeft(_+_)
+      """)
+    assertDoesNotContain("error:", output)
+    assertDoesNotContain("Exception", output)
+    assertContains("res0: Int = 70", output)
+    assertContains("res1: Int = 100", output)
+  }
+
+  test("broadcast vars") {
+    // Test that the value that a broadcast var had when it was created is used,
+    // even if that variable is then modified in the driver program
+    // TODO: This doesn't actually work for arrays when we run in local mode!
+    val output = runInterpreter("local", """
+      |var array = new Array[Int](5)
+      |val broadcastArray = sc.broadcast(array)
+      |sc.parallelize(0 to 4).map(x => broadcastArray.value(x)).collect
+      |array(0) = 5
+      |sc.parallelize(0 to 4).map(x => broadcastArray.value(x)).collect
+      """.stripMargin)
+    assertDoesNotContain("error:", output)
+    assertDoesNotContain("Exception", output)
+    assertContains("res0: Array[Int] = Array(0, 0, 0, 0, 0)", output)
+    assertContains("res2: Array[Int] = Array(5, 0, 0, 0, 0)", output)
+  }
+
+  test("interacting with files") {
+    val tempDir = Files.createTempDir()
+    val out = new FileWriter(tempDir + "/input")
+    out.write("Hello world!\n")
+    out.write("What's up?\n")
+    out.write("Goodbye\n")
+    out.close()
+    val output = runInterpreter("local", """
+      |var file = sc.textFile("%s/input").cache()
+      |file.count()
+      |file.count()
+      |file.count()
+      """.stripMargin.format(tempDir.getAbsolutePath))
+    assertDoesNotContain("error:", output)
+    assertDoesNotContain("Exception", output)
+    assertContains("res0: Long = 3", output)
+    assertContains("res1: Long = 3", output)
+    assertContains("res2: Long = 3", output)
+  }
+
+  test("local-cluster mode") {
+    val output = runInterpreter("local-cluster[1,1,512]", """
+      |var v = 7
+      |def getV() = v
+      |sc.parallelize(1 to 10).map(x => getV()).collect.reduceLeft(_+_)
+      |v = 10
+      |sc.parallelize(1 to 10).map(x => getV()).collect.reduceLeft(_+_)
+      |var array = new Array[Int](5)
+      |val broadcastArray = sc.broadcast(array)
+      |sc.parallelize(0 to 4).map(x => broadcastArray.value(x)).collect
+      |array(0) = 5
+      |sc.parallelize(0 to 4).map(x => broadcastArray.value(x)).collect
+      """.stripMargin)
+    assertDoesNotContain("error:", output)
+    assertDoesNotContain("Exception", output)
+    assertContains("res0: Int = 70", output)
+    assertContains("res1: Int = 100", output)
+    assertContains("res2: Array[Int] = Array(0, 0, 0, 0, 0)", output)
+    assertContains("res4: Array[Int] = Array(0, 0, 0, 0, 0)", output)
+  }
+
+  if (System.getenv("MESOS_NATIVE_LIBRARY") != null) {
+    test("running on Mesos") {
+      val output = runInterpreter("localquiet", """
+        |var v = 7
+        |def getV() = v
+        |sc.parallelize(1 to 10).map(x => getV()).collect.reduceLeft(_+_)
+        |v = 10
+        |sc.parallelize(1 to 10).map(x => getV()).collect.reduceLeft(_+_)
+        |var array = new Array[Int](5)
+        |val broadcastArray = sc.broadcast(array)
+        |sc.parallelize(0 to 4).map(x => broadcastArray.value(x)).collect
+        |array(0) = 5
+        |sc.parallelize(0 to 4).map(x => broadcastArray.value(x)).collect
+        """)
+      assertDoesNotContain("error:", output)
+      assertDoesNotContain("Exception", output)
+      assertContains("res0: Int = 70", output)
+      assertContains("res1: Int = 100", output)
+      assertContains("res2: Array[Int] = Array(0, 0, 0, 0, 0)", output)
+      assertContains("res4: Array[Int] = Array(0, 0, 0, 0, 0)", output)
+    }
+  }
+}
diff --git a/repl/src/test/scala/spark/repl/ReplSuiteMixin.scala b/repl/src/test/scala/org/apache/spark/repl/ReplSuiteMixin.scala
index d88e44ad19..ccfbf5193a 100644
--- a/repl/src/test/scala/spark/repl/ReplSuiteMixin.scala
+++ b/repl/src/test/scala/org/apache/spark/repl/ReplSuiteMixin.scala
@@ -1,4 +1,4 @@
-package spark.repl
+package org.apache.spark.repl

 import java.io.BufferedReader
 import java.io.PrintWriter
@@ -10,8 +10,6 @@ import scala.collection.mutable.ArrayBuffer
 import scala.concurrent.ExecutionContext.Implicits.global
 import scala.concurrent.future

-import spark.deploy.master.Master
-import spark.deploy.worker.Worker

 trait ReplSuiteMixin {
   def runInterpreter(master: String, input: String): String = {
diff --git a/repl/src/test/scala/spark/repl/ReplSuite.scala b/repl/src/test/scala/spark/repl/ReplSuite.scala
deleted file mode 100644
index 8df0c3a7f4..0000000000
--- a/repl/src/test/scala/spark/repl/ReplSuite.scala
+++ /dev/null
@@ -1,152 +0,0 @@
-package spark.repl
-
-import java.io.FileWriter
-
-import org.scalatest.FunSuite
-
-import com.google.common.io.Files
-
-class ReplSuite extends FunSuite with ReplSuiteMixin {
-
-  test("simple foreach with accumulator") {
-    val output = runInterpreter("local", """
-      val accum = sc.accumulator(0)
-      sc.parallelize(1 to 10).foreach(x => accum += x)
-      accum.value
-      """)
-    assertDoesNotContain("error:", output)
-    assertDoesNotContain("Exception", output)
-    assertContains("res1: Int = 55", output)
-  }
-
-  test ("external vars") {
-    val output = runInterpreter("local", """
-      var v = 7
-      sc.parallelize(1 to 10).map(x => v).collect.reduceLeft(_+_)
-      v = 10
-      sc.parallelize(1 to 10).map(x => v).collect.reduceLeft(_+_)
-      """)
-    assertDoesNotContain("error:", output)
-    assertDoesNotContain("Exception", output)
-    assertContains("res0: Int = 70", output)
-    assertContains("res1: Int = 100", output)
-  }
-
-  test("external classes") {
-    val output = runInterpreter("local", """
-      class C {
-        def foo = 5
-      }
-      sc.parallelize(1 to 10).map(x => (new C).foo).collect.reduceLeft(_+_)
-      """)
-    assertDoesNotContain("error:", output)
-    assertDoesNotContain("Exception", output)
-    assertContains("res0: Int = 50", output)
-  }
-
-  test("external functions") {
-    val output = runInterpreter("local", """
-      def double(x: Int) = x + x
-      sc.parallelize(1 to 10).map(x => double(x)).collect.reduceLeft(_+_)
-      """)
-    assertDoesNotContain("error:", output)
-    assertDoesNotContain("Exception", output)
-    assertContains("res0: Int = 110", output)
-  }
-
-  test("external functions that access vars") {
-    val output = runInterpreter("local", """
-      var v = 7
-      def getV() = v
-      sc.parallelize(1 to 10).map(x => getV()).collect.reduceLeft(_+_)
-      v = 10
-      sc.parallelize(1 to 10).map(x => getV()).collect.reduceLeft(_+_)
-      """)
-    assertDoesNotContain("error:", output)
-    assertDoesNotContain("Exception", output)
-    assertContains("res0: Int = 70", output)
-    assertContains("res1: Int = 100", output)
-  }
-
-  test("broadcast vars") {
-    // Test that the value that a broadcast var had when it was created is used,
-    // even if that variable is then modified in the driver program
-    // TODO: This doesn't actually work for arrays when we run in local mode!
-    val output = runInterpreter("local", """
-      var array = new Array[Int](5)
-      val broadcastArray = sc.broadcast(array)
-      sc.parallelize(0 to 4).map(x => broadcastArray.value(x)).collect
-      array(0) = 5
-      sc.parallelize(0 to 4).map(x => broadcastArray.value(x)).collect
-      """)
-    assertDoesNotContain("error:", output)
-    assertDoesNotContain("Exception", output)
-    assertContains("res0: Array[Int] = Array(0, 0, 0, 0, 0)", output)
-    assertContains("res2: Array[Int] = Array(5, 0, 0, 0, 0)", output)
-  }
-
-  test("interacting with files") {
-    val tempDir = Files.createTempDir()
-    val out = new FileWriter(tempDir + "/input")
-    out.write("Hello world!\n")
-    out.write("What's up?\n")
-    out.write("Goodbye\n")
-    out.close()
-    val output = runInterpreter("local", """
-      var file = sc.textFile("%s/input").cache()
-      file.count()
-      file.count()
-      file.count()
-      """.format(tempDir.getAbsolutePath))
-    assertDoesNotContain("error:", output)
-    assertDoesNotContain("Exception", output)
-    assertContains("res0: Long = 3", output)
-    assertContains("res1: Long = 3", output)
-    assertContains("res2: Long = 3", output)
-  }
-
-  test ("local-cluster mode") {
-    val output = runInterpreter("local-cluster[1,1,512]", """
-      var v = 7
-      def getV() = v
-      sc.parallelize(1 to 10).map(x => getV()).collect.reduceLeft(_+_)
-      v = 10
-      sc.parallelize(1 to 10).map(x => getV()).collect.reduceLeft(_+_)
-      var array = new Array[Int](5)
-      val broadcastArray = sc.broadcast(array)
-      sc.parallelize(0 to 4).map(x => broadcastArray.value(x)).collect
-      array(0) = 5
-      sc.parallelize(0 to 4).map(x => broadcastArray.value(x)).collect
-      """)
-    assertDoesNotContain("error:", output)
-    assertDoesNotContain("Exception", output)
-    assertContains("res0: Int = 70", output)
-    assertContains("res1: Int = 100", output)
-    assertContains("res2: Array[Int] = Array(0, 0, 0, 0, 0)", output)
-    assertContains("res4: Array[Int] = Array(0, 0, 0, 0, 0)", output)
-  }
-
-  if (System.getenv("MESOS_NATIVE_LIBRARY") != null) {
-    test("running on Mesos") {
-      val output = runInterpreter("localquiet", """
-        var v = 7
-        def getV() = v
-        sc.parallelize(1 to 10).map(x => getV()).collect.reduceLeft(_+_)
-        v = 10
-        sc.parallelize(1 to 10).map(x => getV()).collect.reduceLeft(_+_)
-        var array = new Array[Int](5)
-        val broadcastArray = sc.broadcast(array)
-        sc.parallelize(0 to 4).map(x => broadcastArray.value(x)).collect
-        array(0) = 5
-        sc.parallelize(0 to 4).map(x => broadcastArray.value(x)).collect
-        """)
-      assertDoesNotContain("error:", output)
-      assertDoesNotContain("Exception", output)
-      assertContains("res0: Int = 70", output)
-      assertContains("res1: Int = 100", output)
-      assertContains("res2: Array[Int] = Array(0, 0, 0, 0, 0)", output)
-      assertContains("res4: Array[Int] = Array(0, 0, 0, 0, 0)", output)
-    }
-  }
-
-}
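Editor's note on the `external vars` assertions above (`res0: Int = 70`, then `res1: Int = 100`): each job serializes the closure with whatever value the driver-side `var` holds at submission time, so reassigning the var between jobs changes the result. A minimal sketch of the same behavior outside the REPL (the object name and local setup are assumed, not part of the commit):

```scala
import org.apache.spark.SparkContext

// Minimal sketch of the semantics the "external vars" test pins down.
object ExternalVarsSketch {
  def main(args: Array[String]) {
    val sc = new SparkContext("local", "ExternalVarsSketch")
    var v = 7
    // The closure x => v is shipped with v's current value when this job runs.
    println(sc.parallelize(1 to 10).map(x => v).collect.reduceLeft(_ + _))  // 70
    v = 10
    // The second call builds a fresh closure, now capturing v = 10.
    println(sc.parallelize(1 to 10).map(x => v).collect.reduceLeft(_ + _))  // 100
    sc.stop()
  }
}
```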