From 17275fa99c670537c52843df405279a52b5c9594 Mon Sep 17 00:00:00 2001 From: Davies Liu Date: Mon, 30 Nov 2015 10:32:13 -0800 Subject: [SPARK-11700] [SQL] Remove thread local SQLContext in SparkPlan In 1.6, we introduce a public API to have a SQLContext for current thread, SparkPlan should use that. Author: Davies Liu Closes #9990 from davies/leak_context. --- .../src/main/scala/org/apache/spark/sql/SQLContext.scala | 10 +++++----- .../org/apache/spark/sql/execution/QueryExecution.scala | 3 +-- .../scala/org/apache/spark/sql/execution/SparkPlan.scala | 14 ++++---------- .../scala/org/apache/spark/sql/MultiSQLContextsSuite.scala | 2 +- .../spark/sql/execution/ExchangeCoordinatorSuite.scala | 2 +- .../spark/sql/execution/RowFormatConvertersSuite.scala | 4 ++-- 6 files changed, 14 insertions(+), 21 deletions(-) (limited to 'sql/core') diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala index 1c2ac5f6f1..8d27839525 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala @@ -26,7 +26,6 @@ import scala.collection.immutable import scala.reflect.runtime.universe.TypeTag import scala.util.control.NonFatal -import org.apache.spark.{SparkException, SparkContext} import org.apache.spark.annotation.{DeveloperApi, Experimental} import org.apache.spark.api.java.{JavaRDD, JavaSparkContext} import org.apache.spark.rdd.RDD @@ -45,9 +44,10 @@ import org.apache.spark.sql.execution.datasources._ import org.apache.spark.sql.execution.ui.{SQLListener, SQLTab} import org.apache.spark.sql.sources.BaseRelation import org.apache.spark.sql.types._ -import org.apache.spark.sql.{execution => sparkexecution} import org.apache.spark.sql.util.ExecutionListenerManager +import org.apache.spark.sql.{execution => sparkexecution} import org.apache.spark.util.Utils +import org.apache.spark.{SparkContext, SparkException} /** * The entry point for working with structured data (rows and columns) in Spark. Allows the @@ -401,7 +401,7 @@ class SQLContext private[sql]( */ @Experimental def createDataFrame[A <: Product : TypeTag](rdd: RDD[A]): DataFrame = { - SparkPlan.currentContext.set(self) + SQLContext.setActive(self) val schema = ScalaReflection.schemaFor[A].dataType.asInstanceOf[StructType] val attributeSeq = schema.toAttributes val rowRDD = RDDConversions.productToRowRdd(rdd, schema.map(_.dataType)) @@ -417,7 +417,7 @@ class SQLContext private[sql]( */ @Experimental def createDataFrame[A <: Product : TypeTag](data: Seq[A]): DataFrame = { - SparkPlan.currentContext.set(self) + SQLContext.setActive(self) val schema = ScalaReflection.schemaFor[A].dataType.asInstanceOf[StructType] val attributeSeq = schema.toAttributes DataFrame(self, LocalRelation.fromProduct(attributeSeq, data)) @@ -1334,7 +1334,7 @@ object SQLContext { activeContext.remove() } - private[sql] def getActiveContextOption(): Option[SQLContext] = { + private[sql] def getActive(): Option[SQLContext] = { Option(activeContext.get()) } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala index 5da5aea17e..107570f9db 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala @@ -42,9 +42,8 @@ class QueryExecution(val sqlContext: SQLContext, val logical: LogicalPlan) { lazy val optimizedPlan: LogicalPlan = sqlContext.optimizer.execute(withCachedData) - // TODO: Don't just pick the first one... lazy val sparkPlan: SparkPlan = { - SparkPlan.currentContext.set(sqlContext) + SQLContext.setActive(sqlContext) sqlContext.planner.plan(optimizedPlan).next() } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala index 534a3bcb83..507641ff82 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala @@ -23,21 +23,15 @@ import scala.collection.mutable.ArrayBuffer import org.apache.spark.Logging import org.apache.spark.rdd.{RDD, RDDOperationScope} -import org.apache.spark.sql.SQLContext -import org.apache.spark.sql.catalyst.InternalRow -import org.apache.spark.sql.catalyst.CatalystTypeConverters +import org.apache.spark.sql.{Row, SQLContext} +import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow} import org.apache.spark.sql.catalyst.expressions._ -import org.apache.spark.sql.Row import org.apache.spark.sql.catalyst.expressions.codegen._ import org.apache.spark.sql.catalyst.plans.QueryPlan import org.apache.spark.sql.catalyst.plans.physical._ import org.apache.spark.sql.execution.metric.{LongSQLMetric, SQLMetric} import org.apache.spark.sql.types.DataType -object SparkPlan { - protected[sql] val currentContext = new ThreadLocal[SQLContext]() -} - /** * The base class for physical operators. */ @@ -49,7 +43,7 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ * populated by the query planning infrastructure. */ @transient - protected[spark] final val sqlContext = SparkPlan.currentContext.get() + protected[spark] final val sqlContext = SQLContext.getActive().get protected def sparkContext = sqlContext.sparkContext @@ -69,7 +63,7 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ /** Overridden make copy also propogates sqlContext to copied plan. */ override def makeCopy(newArgs: Array[AnyRef]): SparkPlan = { - SparkPlan.currentContext.set(sqlContext) + SQLContext.setActive(sqlContext) super.makeCopy(newArgs) } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/MultiSQLContextsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/MultiSQLContextsSuite.scala index 34c5c68fd1..162c0b56c6 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/MultiSQLContextsSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/MultiSQLContextsSuite.scala @@ -27,7 +27,7 @@ class MultiSQLContextsSuite extends SparkFunSuite with BeforeAndAfterAll { private var sparkConf: SparkConf = _ override protected def beforeAll(): Unit = { - originalActiveSQLContext = SQLContext.getActiveContextOption() + originalActiveSQLContext = SQLContext.getActive() originalInstantiatedSQLContext = SQLContext.getInstantiatedContextOption() SQLContext.clearActive() diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/ExchangeCoordinatorSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/ExchangeCoordinatorSuite.scala index b96d50a70b..180050bdac 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/ExchangeCoordinatorSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/ExchangeCoordinatorSuite.scala @@ -30,7 +30,7 @@ class ExchangeCoordinatorSuite extends SparkFunSuite with BeforeAndAfterAll { private var originalInstantiatedSQLContext: Option[SQLContext] = _ override protected def beforeAll(): Unit = { - originalActiveSQLContext = SQLContext.getActiveContextOption() + originalActiveSQLContext = SQLContext.getActive() originalInstantiatedSQLContext = SQLContext.getInstantiatedContextOption() SQLContext.clearActive() diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/RowFormatConvertersSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/RowFormatConvertersSuite.scala index 6876ab0f02..13d68a103a 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/RowFormatConvertersSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/RowFormatConvertersSuite.scala @@ -18,7 +18,7 @@ package org.apache.spark.sql.execution import org.apache.spark.rdd.RDD -import org.apache.spark.sql.Row +import org.apache.spark.sql.{SQLContext, Row} import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Attribute, Literal, IsNull} import org.apache.spark.sql.catalyst.util.GenericArrayData @@ -94,7 +94,7 @@ class RowFormatConvertersSuite extends SparkPlanTest with SharedSQLContext { } test("SPARK-9683: copy UTF8String when convert unsafe array/map to safe") { - SparkPlan.currentContext.set(sqlContext) + SQLContext.setActive(sqlContext) val schema = ArrayType(StringType) val rows = (1 to 100).map { i => InternalRow(new GenericArrayData(Array[Any](UTF8String.fromString(i.toString)))) -- cgit v1.2.3