aboutsummaryrefslogtreecommitdiff
path: root/sql/catalyst
diff options
context:
space:
mode:
authorZongheng Yang <zongheng.y@gmail.com>2014-07-15 17:58:28 -0700
committerMichael Armbrust <michael@databricks.com>2014-07-15 17:58:39 -0700
commit9c5f096ab26edc4bbb977f1c58ddc3a1eec47930 (patch)
tree20cd2d1f8389486df98db0108ed7c25a88c334cf /sql/catalyst
parent8da0fd8691358920138bf90fc4340d55c97a930e (diff)
downloadspark-9c5f096ab26edc4bbb977f1c58ddc3a1eec47930.tar.gz
spark-9c5f096ab26edc4bbb977f1c58ddc3a1eec47930.tar.bz2
spark-9c5f096ab26edc4bbb977f1c58ddc3a1eec47930.zip
[SPARK-2498] [SQL] Synchronize on a lock when using scala reflection inside data type objects.
JIRA ticket: https://issues.apache.org/jira/browse/SPARK-2498 Author: Zongheng Yang <zongheng.y@gmail.com> Closes #1423 from concretevitamin/scala-ref-catalyst and squashes the following commits: 325a149 [Zongheng Yang] Synchronize on a lock when initializing data type objects in Catalyst. (cherry picked from commit c2048a5165b270f5baf2003fdfef7bc6c5875715) Signed-off-by: Michael Armbrust <michael@databricks.com>
Diffstat (limited to 'sql/catalyst')
-rw-r--r--sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/types/dataTypes.scala34
1 files changed, 19 insertions, 15 deletions
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/types/dataTypes.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/types/dataTypes.scala
index bb77bccf86..cd4b5e9c1b 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/types/dataTypes.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/types/dataTypes.scala
@@ -19,17 +19,20 @@ package org.apache.spark.sql.catalyst.types
import java.sql.Timestamp
-import scala.util.parsing.combinator.RegexParsers
-
import scala.reflect.ClassTag
import scala.reflect.runtime.universe.{typeTag, TypeTag, runtimeMirror}
+import scala.util.parsing.combinator.RegexParsers
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, Expression}
import org.apache.spark.util.Utils
/**
- *
+ * A JVM-global lock that should be used to prevent thread safety issues when using things in
+ * scala.reflect.*. Note that Scala Reflection API is made thread-safe in 2.11, but not yet for
+ * 2.10.* builds. See SI-6240 for more details.
*/
+protected[catalyst] object ScalaReflectionLock
+
object DataType extends RegexParsers {
protected lazy val primitiveType: Parser[DataType] =
"StringType" ^^^ StringType |
@@ -62,7 +65,6 @@ object DataType extends RegexParsers {
"true" ^^^ true |
"false" ^^^ false
-
protected lazy val structType: Parser[DataType] =
"StructType\\([A-zA-z]*\\(".r ~> repsep(structField, ",") <~ "))" ^^ {
case fields => new StructType(fields)
@@ -106,7 +108,7 @@ abstract class NativeType extends DataType {
@transient val tag: TypeTag[JvmType]
val ordering: Ordering[JvmType]
- @transient val classTag = {
+ @transient val classTag = ScalaReflectionLock.synchronized {
val mirror = runtimeMirror(Utils.getSparkClassLoader)
ClassTag[JvmType](mirror.runtimeClass(tag.tpe))
}
@@ -114,22 +116,24 @@ abstract class NativeType extends DataType {
case object StringType extends NativeType with PrimitiveType {
type JvmType = String
- @transient lazy val tag = typeTag[JvmType]
+ @transient lazy val tag = ScalaReflectionLock.synchronized { typeTag[JvmType] }
val ordering = implicitly[Ordering[JvmType]]
}
+
case object BinaryType extends DataType with PrimitiveType {
type JvmType = Array[Byte]
}
+
case object BooleanType extends NativeType with PrimitiveType {
type JvmType = Boolean
- @transient lazy val tag = typeTag[JvmType]
+ @transient lazy val tag = ScalaReflectionLock.synchronized { typeTag[JvmType] }
val ordering = implicitly[Ordering[JvmType]]
}
case object TimestampType extends NativeType {
type JvmType = Timestamp
- @transient lazy val tag = typeTag[JvmType]
+ @transient lazy val tag = ScalaReflectionLock.synchronized { typeTag[JvmType] }
val ordering = new Ordering[JvmType] {
def compare(x: Timestamp, y: Timestamp) = x.compareTo(y)
@@ -159,7 +163,7 @@ abstract class IntegralType extends NumericType {
case object LongType extends IntegralType {
type JvmType = Long
- @transient lazy val tag = typeTag[JvmType]
+ @transient lazy val tag = ScalaReflectionLock.synchronized { typeTag[JvmType] }
val numeric = implicitly[Numeric[Long]]
val integral = implicitly[Integral[Long]]
val ordering = implicitly[Ordering[JvmType]]
@@ -167,7 +171,7 @@ case object LongType extends IntegralType {
case object IntegerType extends IntegralType {
type JvmType = Int
- @transient lazy val tag = typeTag[JvmType]
+ @transient lazy val tag = ScalaReflectionLock.synchronized { typeTag[JvmType] }
val numeric = implicitly[Numeric[Int]]
val integral = implicitly[Integral[Int]]
val ordering = implicitly[Ordering[JvmType]]
@@ -175,7 +179,7 @@ case object IntegerType extends IntegralType {
case object ShortType extends IntegralType {
type JvmType = Short
- @transient lazy val tag = typeTag[JvmType]
+ @transient lazy val tag = ScalaReflectionLock.synchronized { typeTag[JvmType] }
val numeric = implicitly[Numeric[Short]]
val integral = implicitly[Integral[Short]]
val ordering = implicitly[Ordering[JvmType]]
@@ -183,7 +187,7 @@ case object ShortType extends IntegralType {
case object ByteType extends IntegralType {
type JvmType = Byte
- @transient lazy val tag = typeTag[JvmType]
+ @transient lazy val tag = ScalaReflectionLock.synchronized { typeTag[JvmType] }
val numeric = implicitly[Numeric[Byte]]
val integral = implicitly[Integral[Byte]]
val ordering = implicitly[Ordering[JvmType]]
@@ -202,7 +206,7 @@ abstract class FractionalType extends NumericType {
case object DecimalType extends FractionalType {
type JvmType = BigDecimal
- @transient lazy val tag = typeTag[JvmType]
+ @transient lazy val tag = ScalaReflectionLock.synchronized { typeTag[JvmType] }
val numeric = implicitly[Numeric[BigDecimal]]
val fractional = implicitly[Fractional[BigDecimal]]
val ordering = implicitly[Ordering[JvmType]]
@@ -210,7 +214,7 @@ case object DecimalType extends FractionalType {
case object DoubleType extends FractionalType {
type JvmType = Double
- @transient lazy val tag = typeTag[JvmType]
+ @transient lazy val tag = ScalaReflectionLock.synchronized { typeTag[JvmType] }
val numeric = implicitly[Numeric[Double]]
val fractional = implicitly[Fractional[Double]]
val ordering = implicitly[Ordering[JvmType]]
@@ -218,7 +222,7 @@ case object DoubleType extends FractionalType {
case object FloatType extends FractionalType {
type JvmType = Float
- @transient lazy val tag = typeTag[JvmType]
+ @transient lazy val tag = ScalaReflectionLock.synchronized { typeTag[JvmType] }
val numeric = implicitly[Numeric[Float]]
val fractional = implicitly[Fractional[Float]]
val ordering = implicitly[Ordering[JvmType]]