author     Michael Armbrust <michael@databricks.com>  2015-11-20 15:17:17 -0800
committer  Michael Armbrust <michael@databricks.com>  2015-11-20 15:17:17 -0800
commit     4b84c72dfbb9ddb415fee35f69305b5d7b280891 (patch)
tree       fc539c382da5480312a8be25afe8b9a922811ded /repl
parent     a6239d587c638691f52eca3eee905c53fbf35a12 (diff)
download   spark-4b84c72dfbb9ddb415fee35f69305b5d7b280891.tar.gz
           spark-4b84c72dfbb9ddb415fee35f69305b5d7b280891.tar.bz2
           spark-4b84c72dfbb9ddb415fee35f69305b5d7b280891.zip
[SPARK-11636][SQL] Support classes defined in the REPL with Encoders
#theScaryParts (i.e. changes to the repl, executor classloaders and codegen)...

Author: Michael Armbrust <michael@databricks.com>
Author: Yin Huai <yhuai@databricks.com>

Closes #9825 from marmbrus/dataset-replClasses2.
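In practice, this is what the change enables in spark-shell: a class defined interactively can now be used with Dataset encoders. A minimal session sketch, mirroring the new ReplSuite test below (sqlContext and its implicits are the Spark 1.x shell setup):

    import sqlContext.implicits._
    case class TestCaseClass(value: Int)
    // The encoder for a REPL-defined class can now resolve its outer scope,
    // so instances can be constructed on executors during deserialization.
    Seq(TestCaseClass(1)).toDS().collect()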
Diffstat (limited to 'repl')
-rw-r--r--  repl/scala-2.10/src/main/scala/org/apache/spark/repl/SparkIMain.scala  14
-rw-r--r--  repl/scala-2.10/src/test/scala/org/apache/spark/repl/ReplSuite.scala   24
-rw-r--r--  repl/src/main/scala/org/apache/spark/repl/ExecutorClassLoader.scala     8
3 files changed, 41 insertions(+), 5 deletions(-)
diff --git a/repl/scala-2.10/src/main/scala/org/apache/spark/repl/SparkIMain.scala b/repl/scala-2.10/src/main/scala/org/apache/spark/repl/SparkIMain.scala
index 4ee605fd7f..829b12269f 100644
--- a/repl/scala-2.10/src/main/scala/org/apache/spark/repl/SparkIMain.scala
+++ b/repl/scala-2.10/src/main/scala/org/apache/spark/repl/SparkIMain.scala
@@ -1221,10 +1221,16 @@ import org.apache.spark.annotation.DeveloperApi
)
}
- val preamble = """
- |class %s extends Serializable {
- | %s%s%s
- """.stripMargin.format(lineRep.readName, envLines.map(" " + _ + ";\n").mkString, importsPreamble, indentCode(toCompute))
+ val preamble = s"""
+ |class ${lineRep.readName} extends Serializable {
+ | ${envLines.map(" " + _ + ";\n").mkString}
+ | $importsPreamble
+ |
+ |      // If we need to construct any objects defined in the REPL on an executor, we
+ |      // will need to pass the outer scope to the appropriate encoder.
+ | org.apache.spark.sql.catalyst.encoders.OuterScopes.addOuterScope(this)
+ | ${indentCode(toCompute)}
+ """.stripMargin
val postamble = importsTrailer + "\n}" + "\n" +
"object " + lineRep.readName + " {\n" +
" val INSTANCE = new " + lineRep.readName + "();\n" +
diff --git a/repl/scala-2.10/src/test/scala/org/apache/spark/repl/ReplSuite.scala b/repl/scala-2.10/src/test/scala/org/apache/spark/repl/ReplSuite.scala
index 5674dcd669..081aa03002 100644
--- a/repl/scala-2.10/src/test/scala/org/apache/spark/repl/ReplSuite.scala
+++ b/repl/scala-2.10/src/test/scala/org/apache/spark/repl/ReplSuite.scala
@@ -262,6 +262,9 @@ class ReplSuite extends SparkFunSuite {
|import sqlContext.implicits._
|case class TestCaseClass(value: Int)
|sc.parallelize(1 to 10).map(x => TestCaseClass(x)).toDF().collect()
+ |
+ |// Test Dataset Serialization in the REPL
+ |Seq(TestCaseClass(1)).toDS().collect()
""".stripMargin)
assertDoesNotContain("error:", output)
assertDoesNotContain("Exception", output)
@@ -278,6 +281,27 @@ class ReplSuite extends SparkFunSuite {
assertDoesNotContain("java.lang.ClassNotFoundException", output)
}
+ test("Datasets and encoders") {
+ val output = runInterpreter("local",
+ """
+ |import org.apache.spark.sql.functions._
+ |import org.apache.spark.sql.Encoder
+ |import org.apache.spark.sql.expressions.Aggregator
+ |import org.apache.spark.sql.TypedColumn
+ |val simpleSum = new Aggregator[Int, Int, Int] with Serializable {
+ | def zero: Int = 0 // The initial value.
+ | def reduce(b: Int, a: Int) = b + a // Add an element to the running total.
+ | def merge(b1: Int, b2: Int) = b1 + b2 // Merge intermediate values.
+ | def finish(b: Int) = b // Return the final result.
+ |}.toColumn
+ |
+ |val ds = Seq(1, 2, 3, 4).toDS()
+ |ds.select(simpleSum).collect
+ """.stripMargin)
+ assertDoesNotContain("error:", output)
+ assertDoesNotContain("Exception", output)
+ }
+
test("SPARK-2632 importing a method from non serializable class and not using it.") {
val output = runInterpreter("local",
"""
diff --git a/repl/src/main/scala/org/apache/spark/repl/ExecutorClassLoader.scala b/repl/src/main/scala/org/apache/spark/repl/ExecutorClassLoader.scala
index 3d2d235a00..a976e96809 100644
--- a/repl/src/main/scala/org/apache/spark/repl/ExecutorClassLoader.scala
+++ b/repl/src/main/scala/org/apache/spark/repl/ExecutorClassLoader.scala
@@ -65,7 +65,13 @@ class ExecutorClassLoader(conf: SparkConf, classUri: String, parent: ClassLoader
case e: ClassNotFoundException => {
val classOption = findClassLocally(name)
classOption match {
- case None => throw new ClassNotFoundException(name, e)
+ case None =>
+ // If this exception carried a cause, it would break an internal assumption of
+ // Janino (the compiler used for Spark SQL code-gen): see
+ // org.codehaus.janino.ClassLoaderIClassLoader's findIClass, whose behavior
+ // changes when a ClassNotFoundException has a cause, making compilation of
+ // the generated class fail.
+ throw new ClassNotFoundException(name)
case Some(a) => a
}
}
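For context, here is a minimal sketch (not Janino's actual source) of the findIClass pattern the comment describes: a ClassNotFoundException without a cause means "class not found, keep searching", while one that carries a cause escapes and fails the compilation of the generated class:

    def findClassSketch(loader: ClassLoader, name: String): Option[Class[_]] = {
      try Some(loader.loadClass(name))
      catch {
        // No cause: the class genuinely does not exist, and codegen continues.
        case e: ClassNotFoundException if e.getCause == null => None
        // A ClassNotFoundException with a non-null cause is not matched here,
        // so it propagates and aborts compilation, hence the change above.
      }
    }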