Diffstat (limited to 'sql')
 sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala          | 31 ++++++++++++++++++++++++++++---
 sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveWriterContainers.scala |  4 ++--
 2 files changed, 30 insertions(+), 5 deletions(-)
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala
index e35468a624..69f481c49a 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala
@@ -17,12 +17,14 @@
package org.apache.spark.sql.hive
+import java.util
+
import org.apache.hadoop.fs.{Path, PathFilter}
import org.apache.hadoop.hive.conf.HiveConf
import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants._
import org.apache.hadoop.hive.ql.exec.Utilities
-import org.apache.hadoop.hive.ql.metadata.{Partition => HivePartition, Table => HiveTable}
-import org.apache.hadoop.hive.ql.plan.{PlanUtils, TableDesc}
+import org.apache.hadoop.hive.ql.metadata.{Partition => HivePartition, Table => HiveTable, Hive, HiveUtils, HiveStorageHandler}
+import org.apache.hadoop.hive.ql.plan.TableDesc
import org.apache.hadoop.hive.serde2.Deserializer
import org.apache.hadoop.hive.serde2.objectinspector.primitive._
import org.apache.hadoop.hive.serde2.objectinspector.{ObjectInspectorConverters, StructObjectInspector}
@@ -287,6 +289,29 @@ class HadoopTableReader(
}
}
+private[hive] object HiveTableUtil {
+
+  // Copied from PlanUtils.configureJobPropertiesForStorageHandler(tableDesc), which calls
+  // Hive.get() and therefore tries to reach the metastore; that is not valid at runtime.
+  // This should be fixed in a future Hive release, but until then we use this copy instead.
+  def configureJobPropertiesForStorageHandler(
+      tableDesc: TableDesc, jobConf: JobConf, input: Boolean) {
+    val property = tableDesc.getProperties.getProperty(META_TABLE_STORAGE)
+    val storageHandler = HiveUtils.getStorageHandler(jobConf, property)
+    if (storageHandler != null) {
+      val jobProperties = new util.LinkedHashMap[String, String]
+      if (input) {
+        storageHandler.configureInputJobProperties(tableDesc, jobProperties)
+      } else {
+        storageHandler.configureOutputJobProperties(tableDesc, jobProperties)
+      }
+      if (!jobProperties.isEmpty) {
+        tableDesc.setJobProperties(jobProperties)
+      }
+    }
+  }
+}
+
private[hive] object HadoopTableReader extends HiveInspectors with Logging {
  /**
   * Curried. After given an argument for 'path', the resulting JobConf => Unit closure is used to
@@ -295,7 +320,7 @@ private[hive] object HadoopTableReader extends HiveInspectors with Logging {
  def initializeLocalJobConfFunc(path: String, tableDesc: TableDesc)(jobConf: JobConf) {
    FileInputFormat.setInputPaths(jobConf, Seq[Path](new Path(path)): _*)
    if (tableDesc != null) {
-      PlanUtils.configureInputJobPropertiesForStorageHandler(tableDesc)
+      HiveTableUtil.configureJobPropertiesForStorageHandler(tableDesc, jobConf, true)
      Utilities.copyTableJobPropertiesToConf(tableDesc, jobConf)
    }
    val bufferSize = System.getProperty("spark.buffer.size", "65536")
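
Note that initializeLocalJobConfFunc is curried: supplying path and tableDesc yields a JobConf => Unit closure that Spark can run later, when the Hadoop RDD builds its local JobConf. The following is a minimal sketch of how that closure could be applied, assuming code that lives in the org.apache.spark.sql.hive package (HadoopTableReader is private[hive]); the path and someTableDesc values are placeholders, not anything taken from this patch.

package org.apache.spark.sql.hive

import org.apache.hadoop.hive.conf.HiveConf
import org.apache.hadoop.hive.ql.plan.TableDesc
import org.apache.hadoop.mapred.JobConf

object LocalJobConfSketch {
  // Partially apply the curried helper to obtain a JobConf => Unit closure, then run it
  // against a fresh JobConf built from the Hive configuration.
  def configuredJobConf(someTableDesc: TableDesc): JobConf = {
    val initJobConf: JobConf => Unit =
      HadoopTableReader.initializeLocalJobConfFunc("/warehouse/db.db/tbl", someTableDesc)
    val jobConf = new JobConf(new HiveConf())
    initJobConf(jobConf) // sets input paths, storage-handler properties, and the io buffer size
    jobConf
  }
}
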
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveWriterContainers.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveWriterContainers.scala
index c8d6b71804..93c016b6c6 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveWriterContainers.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveWriterContainers.scala
@@ -26,7 +26,7 @@ import org.apache.hadoop.fs.Path
import org.apache.hadoop.hive.conf.HiveConf.ConfVars
import org.apache.hadoop.hive.ql.exec.{FileSinkOperator, Utilities}
import org.apache.hadoop.hive.ql.io.{HiveFileFormatUtils, HiveOutputFormat}
-import org.apache.hadoop.hive.ql.plan.{PlanUtils, TableDesc}
+import org.apache.hadoop.hive.ql.plan.TableDesc
import org.apache.hadoop.io.Writable
import org.apache.hadoop.mapred._
import org.apache.hadoop.hive.common.FileUtils
@@ -55,7 +55,7 @@ private[hive] class SparkHiveWriterContainer(
  // Add table properties from storage handler to jobConf, so any custom storage
  // handler settings can be set to jobConf
  if (tableDesc != null) {
-    PlanUtils.configureOutputJobPropertiesForStorageHandler(tableDesc)
+    HiveTableUtil.configureJobPropertiesForStorageHandler(tableDesc, jobConf, false)
    Utilities.copyTableJobPropertiesToConf(tableDesc, jobConf)
  }
  protected val conf = new SerializableJobConf(jobConf)
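
Net effect: both the read path (HadoopTableReader) and the write path (SparkHiveWriterContainer) now obtain storage-handler job properties through HiveTableUtil instead of PlanUtils, so no Hive.get() metastore call happens at runtime. Below is a rough sketch of the shared flow, again assuming code in the org.apache.spark.sql.hive package (HiveTableUtil is private[hive]); the wrapper name applyStorageHandlerProps is illustrative only.

package org.apache.spark.sql.hive

import org.apache.hadoop.hive.ql.exec.Utilities
import org.apache.hadoop.hive.ql.plan.TableDesc
import org.apache.hadoop.mapred.JobConf

object StorageHandlerPropsSketch {
  // Ask the table's storage handler (if any) for job properties without touching the
  // metastore, then copy whatever it set on the TableDesc into the task-side JobConf.
  def applyStorageHandlerProps(tableDesc: TableDesc, jobConf: JobConf, input: Boolean): Unit = {
    HiveTableUtil.configureJobPropertiesForStorageHandler(tableDesc, jobConf, input)
    Utilities.copyTableJobPropertiesToConf(tableDesc, jobConf)
  }
}

The table reader passes input = true and the writer container passes false, mirroring the two call sites patched above.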