about summary refs log tree commit diff
diff options
context:
space:
mode:
authorgatorsmile <gatorsmile@gmail.com>2016-12-31 19:40:28 +0800
committerWenchen Fan <wenchen@databricks.com>2016-12-31 19:40:28 +0800
commit35e974076dcbc5afde8d4259ce88cb5f29d94920 (patch)
tree560187c6490cba8939040cf8b750aeac7ec8b0c2
parent871f6114ac0075a1b45eda8701113fa20d647de9 (diff)
downloadspark-35e974076dcbc5afde8d4259ce88cb5f29d94920.tar.gz
spark-35e974076dcbc5afde8d4259ce88cb5f29d94920.tar.bz2
spark-35e974076dcbc5afde8d4259ce88cb5f29d94920.zip
[SPARK-19028][SQL] Fixed non-thread-safe functions used in SessionCatalog
### What changes were proposed in this pull request?

Fixed non-thread-safe functions used in SessionCatalog:
- refreshTable
- lookupRelation

### How was this patch tested?

N/A

Author: gatorsmile <gatorsmile@gmail.com>

Closes #16437 from gatorsmile/addSyncToLookUpTable.
-rw-r--r-- sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala | 2
-rw-r--r-- sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala | 36
2 files changed, 20 insertions, 18 deletions
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
index e996a836fe..741ed05b63 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
@@ -634,7 +634,7 @@ class SessionCatalog(
/**
* Refresh the cache entry for a metastore table, if any.
*/
- def refreshTable(name: TableIdentifier): Unit = {
+ def refreshTable(name: TableIdentifier): Unit = synchronized {
// Go through temporary tables and invalidate them.
// If the database is defined, this is definitely not a temp table.
// If the database is not defined, there is a good chance this is a temp table.
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala
index 08bf1cd0ef..462b3c2686 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala
@@ -56,23 +56,25 @@ private[sql] class HiveSessionCatalog(
hadoopConf) {
override def lookupRelation(name: TableIdentifier, alias: Option[String]): LogicalPlan = {
- val table = formatTableName(name.table)
- val db = formatDatabaseName(name.database.getOrElse(currentDb))
- if (db == globalTempViewManager.database) {
- val relationAlias = alias.getOrElse(table)
- globalTempViewManager.get(table).map { viewDef =>
- SubqueryAlias(relationAlias, viewDef, Some(name))
- }.getOrElse(throw new NoSuchTableException(db, table))
- } else if (name.database.isDefined || !tempTables.contains(table)) {
- val database = name.database.map(formatDatabaseName)
- val newName = name.copy(database = database, table = table)
- metastoreCatalog.lookupRelation(newName, alias)
- } else {
- val relation = tempTables(table)
- val tableWithQualifiers = SubqueryAlias(table, relation, None)
- // If an alias was specified by the lookup, wrap the plan in a subquery so that
- // attributes are properly qualified with this alias.
- alias.map(a => SubqueryAlias(a, tableWithQualifiers, None)).getOrElse(tableWithQualifiers)
+ synchronized {
+ val table = formatTableName(name.table)
+ val db = formatDatabaseName(name.database.getOrElse(currentDb))
+ if (db == globalTempViewManager.database) {
+ val relationAlias = alias.getOrElse(table)
+ globalTempViewManager.get(table).map { viewDef =>
+ SubqueryAlias(relationAlias, viewDef, Some(name))
+ }.getOrElse(throw new NoSuchTableException(db, table))
+ } else if (name.database.isDefined || !tempTables.contains(table)) {
+ val database = name.database.map(formatDatabaseName)
+ val newName = name.copy(database = database, table = table)
+ metastoreCatalog.lookupRelation(newName, alias)
+ } else {
+ val relation = tempTables(table)
+ val tableWithQualifiers = SubqueryAlias(table, relation, None)
+ // If an alias was specified by the lookup, wrap the plan in a subquery so that
+ // attributes are properly qualified with this alias.
+ alias.map(a => SubqueryAlias(a, tableWithQualifiers, None)).getOrElse(tableWithQualifiers)
+ }
}
}