author     navis.ryu <navis@apache.org>       2015-06-09 19:33:00 -0700
committer  Reynold Xin <rxin@databricks.com>  2015-06-09 19:33:00 -0700
commit     778f3ca81f8d90faec0775509632fe68f1399dc4 (patch)
tree       a502582ffa855967a578d05b0538bb1b1fa60c84
parent     6e4fb0c9e8f03cf068c422777cfce82a89e8e738 (diff)
[SPARK-7792] [SQL] HiveContext registerTempTable not thread safe
Just replaced mutable.HashMap with ConcurrentHashMap.

Author: navis.ryu <navis@apache.org>

Closes #6699 from navis/SPARK-7792 and squashes the following commits:

f03654a [navis.ryu] [SPARK-7792] [SQL] HiveContext registerTempTable not thread safe
-rw-r--r--  sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Catalog.scala | 28
1 file changed, 17 insertions(+), 11 deletions(-)
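For context (not part of the patch): a minimal, self-contained Scala sketch of why the change matters. An unsynchronized scala.collection.mutable.HashMap can lose entries or corrupt its internal state when several threads register temp tables at once, whereas java.util.concurrent.ConcurrentHashMap makes each put/remove/containsKey call atomic. The object name, the thread and iteration counts, and the String values standing in for LogicalPlan are illustrative only.

import java.util.concurrent.ConcurrentHashMap

// Illustrative sketch only: String stands in for LogicalPlan.
object ConcurrentRegistrationSketch {
  def main(args: Array[String]): Unit = {
    val tables = new ConcurrentHashMap[String, String]
    val threads = (1 to 8).map { i =>
      new Thread(new Runnable {
        override def run(): Unit = {
          // Simulates many registerTempTable calls racing from one thread.
          (1 to 1000).foreach(j => tables.put(s"table_${i}_$j", s"plan_${i}_$j"))
        }
      })
    }
    threads.foreach(_.start())
    threads.foreach(_.join())
    // All 8000 registrations survive; with an unsynchronized mutable.HashMap,
    // the same workload can drop entries or fail inside the map's internals.
    println(tables.size())
  }
}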
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Catalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Catalog.scala
index 3e240fd55e..1541491608 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Catalog.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Catalog.scala
@@ -17,7 +17,11 @@
 
 package org.apache.spark.sql.catalyst.analysis
 
+import java.util.concurrent.ConcurrentHashMap
+
+import scala.collection.JavaConversions._
 import scala.collection.mutable
+import scala.collection.mutable.ArrayBuffer
 
 import org.apache.spark.sql.catalyst.CatalystConf
 import org.apache.spark.sql.catalyst.EmptyConf
@@ -81,18 +85,18 @@ trait Catalog {
 }
 
 class SimpleCatalog(val conf: CatalystConf) extends Catalog {
-  val tables = new mutable.HashMap[String, LogicalPlan]()
+  val tables = new ConcurrentHashMap[String, LogicalPlan]
 
   override def registerTable(
       tableIdentifier: Seq[String],
       plan: LogicalPlan): Unit = {
     val tableIdent = processTableIdentifier(tableIdentifier)
-    tables += ((getDbTableName(tableIdent), plan))
+    tables.put(getDbTableName(tableIdent), plan)
   }
 
   override def unregisterTable(tableIdentifier: Seq[String]): Unit = {
     val tableIdent = processTableIdentifier(tableIdentifier)
-    tables -= getDbTableName(tableIdent)
+    tables.remove(getDbTableName(tableIdent))
   }
 
   override def unregisterAllTables(): Unit = {
@@ -101,10 +105,7 @@ class SimpleCatalog(val conf: CatalystConf) extends Catalog {
 
   override def tableExists(tableIdentifier: Seq[String]): Boolean = {
     val tableIdent = processTableIdentifier(tableIdentifier)
-    tables.get(getDbTableName(tableIdent)) match {
-      case Some(_) => true
-      case None => false
-    }
+    tables.containsKey(getDbTableName(tableIdent))
   }
 
   override def lookupRelation(
@@ -112,7 +113,10 @@ class SimpleCatalog(val conf: CatalystConf) extends Catalog {
       alias: Option[String] = None): LogicalPlan = {
     val tableIdent = processTableIdentifier(tableIdentifier)
     val tableFullName = getDbTableName(tableIdent)
-    val table = tables.getOrElse(tableFullName, sys.error(s"Table Not Found: $tableFullName"))
+    val table = tables.get(tableFullName)
+    if (table == null) {
+      sys.error(s"Table Not Found: $tableFullName")
+    }
     val tableWithQualifiers = Subquery(tableIdent.last, table)
 
     // If an alias was specified by the lookup, wrap the plan in a subquery so that attributes are
@@ -121,9 +125,11 @@ class SimpleCatalog(val conf: CatalystConf) extends Catalog {
   }
 
   override def getTables(databaseName: Option[String]): Seq[(String, Boolean)] = {
-    tables.map {
-      case (name, _) => (name, true)
-    }.toSeq
+    val result = ArrayBuffer.empty[(String, Boolean)]
+    for (name <- tables.keySet()) {
+      result += ((name, true))
+    }
+    result
   }
 
   override def refreshTable(databaseName: String, tableName: String): Unit = {
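A hedged usage sketch, not taken from the patch or its tests: with the catalog backed by ConcurrentHashMap, concurrent registerTempTable calls from user code no longer race on shared mutable state. The sqlContext and df names are placeholders for whatever SQLContext (or HiveContext) and DataFrame the application already has.

// Assumes a live SQLContext named sqlContext and an existing DataFrame df.
val registrations = (1 to 4).map { i =>
  new Thread(new Runnable {
    // Each thread registers the same DataFrame under its own temp table name.
    override def run(): Unit = df.registerTempTable(s"events_slice_$i")
  })
}
registrations.foreach(_.start())
registrations.foreach(_.join())
// Every name should now be visible via sqlContext.tableNames().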