author    Davies Liu <davies@databricks.com>   2015-11-30 10:32:13 -0800
committer Davies Liu <davies.liu@gmail.com>    2015-11-30 10:32:13 -0800
commit    17275fa99c670537c52843df405279a52b5c9594 (patch)
tree      1da9652c8a2eaa71a502c01bd729b22743b5dfb3
parent    2db4662fe2d72749c06ad5f11f641a388343f77c (diff)
[SPARK-11700] [SQL] Remove thread local SQLContext in SparkPlan
In 1.6 we introduced a public API for getting the SQLContext bound to the current thread; SparkPlan should use that instead of keeping its own thread local.

Author: Davies Liu <davies@databricks.com>

Closes #9990 from davies/leak_context.
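The patch replaces SparkPlan's private `ThreadLocal[SQLContext]` with the public active-context API on `SQLContext` (`setActive` / `getActive` / `clearActive`, all visible in the diff below). A minimal, self-contained sketch of that per-thread registry pattern; `Context` and `Plan` here are stand-ins invented for the example, not Spark classes:

```scala
// Sketch of a per-thread "active context" registry, mirroring the
// setActive/getActive/clearActive API this patch switches SparkPlan to.
// `Context` stands in for SQLContext so the example is self-contained.
class Context(val name: String)

object Context {
  // Each thread only sees the context it registered itself.
  private val activeContext = new ThreadLocal[Context]()

  def setActive(ctx: Context): Unit = activeContext.set(ctx)
  def getActive(): Option[Context] = Option(activeContext.get())
  def clearActive(): Unit = activeContext.remove()
}

// A physical operator captures whatever context is active on the thread
// that constructs it, as SparkPlan now does via SQLContext.getActive().get.
abstract class Plan {
  @transient final val context: Context = Context.getActive().get
}

object Demo extends App {
  Context.setActive(new Context("thread-local demo"))
  val plan = new Plan {}       // picks up the active context at construction time
  println(plan.context.name)   // prints: thread-local demo
  Context.clearActive()
}
```

The practical difference is ownership: instead of a second thread local hidden inside `object SparkPlan` that could leak or go stale, planning now goes through the one registry that `SQLContext` already exposes.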
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala                          | 10
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala            |  3
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala                 | 14
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/MultiSQLContextsSuite.scala               |  2
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/execution/ExchangeCoordinatorSuite.scala  |  2
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/execution/RowFormatConvertersSuite.scala  |  4
6 files changed, 14 insertions, 21 deletions
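The test suites touched below snapshot the currently active context in `beforeAll` and clear it so each suite starts clean. A rough sketch of that save/clear/restore discipline, reusing the `Context` stand-in from the sketch above; the `afterAll` restore half is illustrative, since the diff only shows the `beforeAll` side:

```scala
// Save/clear/restore of the active context around a test suite.
// `Context` is the stand-in for SQLContext defined in the earlier sketch.
class ActiveContextFixture {
  private var saved: Option[Context] = None

  def beforeAll(): Unit = {
    saved = Context.getActive()   // remember whatever the caller had active
    Context.clearActive()         // run the suite with no active context
  }

  def afterAll(): Unit = {
    Context.clearActive()
    saved.foreach(Context.setActive)  // put the caller's context back
  }
}
```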
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
index 1c2ac5f6f1..8d27839525 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
@@ -26,7 +26,6 @@ import scala.collection.immutable
import scala.reflect.runtime.universe.TypeTag
import scala.util.control.NonFatal
-import org.apache.spark.{SparkException, SparkContext}
import org.apache.spark.annotation.{DeveloperApi, Experimental}
import org.apache.spark.api.java.{JavaRDD, JavaSparkContext}
import org.apache.spark.rdd.RDD
@@ -45,9 +44,10 @@ import org.apache.spark.sql.execution.datasources._
import org.apache.spark.sql.execution.ui.{SQLListener, SQLTab}
import org.apache.spark.sql.sources.BaseRelation
import org.apache.spark.sql.types._
-import org.apache.spark.sql.{execution => sparkexecution}
import org.apache.spark.sql.util.ExecutionListenerManager
+import org.apache.spark.sql.{execution => sparkexecution}
import org.apache.spark.util.Utils
+import org.apache.spark.{SparkContext, SparkException}
/**
* The entry point for working with structured data (rows and columns) in Spark. Allows the
@@ -401,7 +401,7 @@ class SQLContext private[sql](
*/
@Experimental
def createDataFrame[A <: Product : TypeTag](rdd: RDD[A]): DataFrame = {
- SparkPlan.currentContext.set(self)
+ SQLContext.setActive(self)
val schema = ScalaReflection.schemaFor[A].dataType.asInstanceOf[StructType]
val attributeSeq = schema.toAttributes
val rowRDD = RDDConversions.productToRowRdd(rdd, schema.map(_.dataType))
@@ -417,7 +417,7 @@ class SQLContext private[sql](
*/
@Experimental
def createDataFrame[A <: Product : TypeTag](data: Seq[A]): DataFrame = {
- SparkPlan.currentContext.set(self)
+ SQLContext.setActive(self)
val schema = ScalaReflection.schemaFor[A].dataType.asInstanceOf[StructType]
val attributeSeq = schema.toAttributes
DataFrame(self, LocalRelation.fromProduct(attributeSeq, data))
@@ -1334,7 +1334,7 @@ object SQLContext {
activeContext.remove()
}
- private[sql] def getActiveContextOption(): Option[SQLContext] = {
+ private[sql] def getActive(): Option[SQLContext] = {
Option(activeContext.get())
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala
index 5da5aea17e..107570f9db 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala
@@ -42,9 +42,8 @@ class QueryExecution(val sqlContext: SQLContext, val logical: LogicalPlan) {
lazy val optimizedPlan: LogicalPlan = sqlContext.optimizer.execute(withCachedData)
- // TODO: Don't just pick the first one...
lazy val sparkPlan: SparkPlan = {
- SparkPlan.currentContext.set(sqlContext)
+ SQLContext.setActive(sqlContext)
sqlContext.planner.plan(optimizedPlan).next()
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
index 534a3bcb83..507641ff82 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
@@ -23,21 +23,15 @@ import scala.collection.mutable.ArrayBuffer
import org.apache.spark.Logging
import org.apache.spark.rdd.{RDD, RDDOperationScope}
-import org.apache.spark.sql.SQLContext
-import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.CatalystTypeConverters
+import org.apache.spark.sql.{Row, SQLContext}
+import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow}
import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.Row
import org.apache.spark.sql.catalyst.expressions.codegen._
import org.apache.spark.sql.catalyst.plans.QueryPlan
import org.apache.spark.sql.catalyst.plans.physical._
import org.apache.spark.sql.execution.metric.{LongSQLMetric, SQLMetric}
import org.apache.spark.sql.types.DataType
-object SparkPlan {
- protected[sql] val currentContext = new ThreadLocal[SQLContext]()
-}
-
/**
* The base class for physical operators.
*/
@@ -49,7 +43,7 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ
* populated by the query planning infrastructure.
*/
@transient
- protected[spark] final val sqlContext = SparkPlan.currentContext.get()
+ protected[spark] final val sqlContext = SQLContext.getActive().get
protected def sparkContext = sqlContext.sparkContext
@@ -69,7 +63,7 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ
/** Overridden make copy also propogates sqlContext to copied plan. */
override def makeCopy(newArgs: Array[AnyRef]): SparkPlan = {
- SparkPlan.currentContext.set(sqlContext)
+ SQLContext.setActive(sqlContext)
super.makeCopy(newArgs)
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/MultiSQLContextsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/MultiSQLContextsSuite.scala
index 34c5c68fd1..162c0b56c6 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/MultiSQLContextsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/MultiSQLContextsSuite.scala
@@ -27,7 +27,7 @@ class MultiSQLContextsSuite extends SparkFunSuite with BeforeAndAfterAll {
private var sparkConf: SparkConf = _
override protected def beforeAll(): Unit = {
- originalActiveSQLContext = SQLContext.getActiveContextOption()
+ originalActiveSQLContext = SQLContext.getActive()
originalInstantiatedSQLContext = SQLContext.getInstantiatedContextOption()
SQLContext.clearActive()
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/ExchangeCoordinatorSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/ExchangeCoordinatorSuite.scala
index b96d50a70b..180050bdac 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/ExchangeCoordinatorSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/ExchangeCoordinatorSuite.scala
@@ -30,7 +30,7 @@ class ExchangeCoordinatorSuite extends SparkFunSuite with BeforeAndAfterAll {
private var originalInstantiatedSQLContext: Option[SQLContext] = _
override protected def beforeAll(): Unit = {
- originalActiveSQLContext = SQLContext.getActiveContextOption()
+ originalActiveSQLContext = SQLContext.getActive()
originalInstantiatedSQLContext = SQLContext.getInstantiatedContextOption()
SQLContext.clearActive()
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/RowFormatConvertersSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/RowFormatConvertersSuite.scala
index 6876ab0f02..13d68a103a 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/RowFormatConvertersSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/RowFormatConvertersSuite.scala
@@ -18,7 +18,7 @@
package org.apache.spark.sql.execution
import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.Row
+import org.apache.spark.sql.{SQLContext, Row}
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Attribute, Literal, IsNull}
import org.apache.spark.sql.catalyst.util.GenericArrayData
@@ -94,7 +94,7 @@ class RowFormatConvertersSuite extends SparkPlanTest with SharedSQLContext {
}
test("SPARK-9683: copy UTF8String when convert unsafe array/map to safe") {
- SparkPlan.currentContext.set(sqlContext)
+ SQLContext.setActive(sqlContext)
val schema = ArrayType(StringType)
val rows = (1 to 100).map { i =>
InternalRow(new GenericArrayData(Array[Any](UTF8String.fromString(i.toString))))