aboutsummaryrefslogtreecommitdiff
path: root/sql
diff options
context:
space:
mode:
Diffstat (limited to 'sql')
-rw-r--r--sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala7
-rw-r--r--sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/ScalaReflectionSuite.scala41
2 files changed, 46 insertions, 2 deletions
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala
index 02cb2d9a2b..c12b5c20ea 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala
@@ -32,8 +32,10 @@ object ScalaReflection extends ScalaReflection {
// Since we are creating a runtime mirror usign the class loader of current thread,
// we need to use def at here. So, every time we call mirror, it is using the
// class loader of the current thread.
- override def mirror: universe.Mirror =
+ // SPARK-13640: Synchronize this because universe.runtimeMirror is not thread-safe in Scala 2.10.
+ override def mirror: universe.Mirror = ScalaReflectionLock.synchronized {
universe.runtimeMirror(Thread.currentThread().getContextClassLoader)
+ }
import universe._
@@ -665,7 +667,8 @@ trait ScalaReflection {
*
* @see SPARK-5281
*/
- def localTypeOf[T: TypeTag]: `Type` = {
+ // SPARK-13640: Synchronize this because TypeTag.tpe is not thread-safe in Scala 2.10.
+ def localTypeOf[T: TypeTag]: `Type` = ScalaReflectionLock.synchronized {
val tag = implicitly[TypeTag[T]]
tag.in(mirror).tpe.normalize
}
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/ScalaReflectionSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/ScalaReflectionSuite.scala
index 5fe09b1a4d..dd31050bb5 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/ScalaReflectionSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/ScalaReflectionSuite.scala
@@ -17,10 +17,15 @@
package org.apache.spark.sql.catalyst
+import java.net.URLClassLoader
import java.sql.{Date, Timestamp}
+import scala.reflect.runtime.universe.typeOf
+
import org.apache.spark.SparkFunSuite
+import org.apache.spark.sql.catalyst.expressions.BoundReference
import org.apache.spark.sql.types._
+import org.apache.spark.util.Utils
case class PrimitiveData(
intField: Int,
@@ -236,4 +241,40 @@ class ScalaReflectionSuite extends SparkFunSuite {
assert(anyTypes.forall(!_.isPrimitive))
assert(anyTypes === Seq(classOf[java.lang.Object], classOf[java.lang.Object]))
}
+
+ private val dataTypeForComplexData = dataTypeFor[ComplexData]
+ private val typeOfComplexData = typeOf[ComplexData]
+
+ Seq(
+ ("mirror", () => mirror),
+ ("dataTypeFor", () => dataTypeFor[ComplexData]),
+ ("constructorFor", () => constructorFor[ComplexData]),
+ ("extractorsFor", {
+ val inputObject = BoundReference(0, dataTypeForComplexData, nullable = false)
+ () => extractorsFor[ComplexData](inputObject)
+ }),
+ ("getConstructorParameters(cls)", () => getConstructorParameters(classOf[ComplexData])),
+ ("getConstructorParameterNames", () => getConstructorParameterNames(classOf[ComplexData])),
+ ("getClassFromType", () => getClassFromType(typeOfComplexData)),
+ ("schemaFor", () => schemaFor[ComplexData]),
+ ("localTypeOf", () => localTypeOf[ComplexData]),
+ ("getClassNameFromType", () => getClassNameFromType(typeOfComplexData)),
+ ("getParameterTypes", () => getParameterTypes(() => ())),
+ ("getConstructorParameters(tpe)", () => getClassNameFromType(typeOfComplexData))).foreach {
+ case (name, exec) =>
+ test(s"SPARK-13640: thread safety of ${name}") {
+ (0 until 100).foreach { _ =>
+ val loader = new URLClassLoader(Array.empty, Utils.getContextOrSparkClassLoader)
+ (0 until 10).par.foreach { _ =>
+ val cl = Thread.currentThread.getContextClassLoader
+ try {
+ Thread.currentThread.setContextClassLoader(loader)
+ exec()
+ } finally {
+ Thread.currentThread.setContextClassLoader(cl)
+ }
+ }
+ }
+ }
+ }
}