author     navis.ryu <navis@apache.org>        2015-10-07 14:56:02 -0700
committer  Reynold Xin <rxin@databricks.com>   2015-10-07 14:56:02 -0700
commit     713e4f44e92f25f1092c898923f77249934f38b2 (patch)
tree       c4620d07dc961371f7be1445c6cf4297792dd6ce /sql
parent     c14aee4da97d4a7fcc8a2565a159edc74a5c8e10 (diff)
download   spark-713e4f44e92f25f1092c898923f77249934f38b2.tar.gz
           spark-713e4f44e92f25f1092c898923f77249934f38b2.tar.bz2
           spark-713e4f44e92f25f1092c898923f77249934f38b2.zip
[SPARK-10679] [CORE] javax.jdo.JDOFatalUserException in executor
HadoopRDD throws an exception in the executor, something like the one below.
{noformat}
5/09/17 18:51:21 INFO metastore.HiveMetaStore: 0: Opening raw store with implemenation class:org.apache.hadoop.hive.metastore.ObjectStore
15/09/17 18:51:21 INFO metastore.ObjectStore: ObjectStore, initialize called
15/09/17 18:51:21 WARN metastore.HiveMetaStore: Retrying creating default database after error: Class org.datanucleus.api.jdo.JDOPersistenceManagerFactory was not found.
javax.jdo.JDOFatalUserException: Class org.datanucleus.api.jdo.JDOPersistenceManagerFactory was not found.
    at javax.jdo.JDOHelper.invokeGetPersistenceManagerFactoryOnImplementation(JDOHelper.java:1175)
    at javax.jdo.JDOHelper.getPersistenceManagerFactory(JDOHelper.java:808)
    at javax.jdo.JDOHelper.getPersistenceManagerFactory(JDOHelper.java:701)
    at org.apache.hadoop.hive.metastore.ObjectStore.getPMF(ObjectStore.java:365)
    at org.apache.hadoop.hive.metastore.ObjectStore.getPersistenceManager(ObjectStore.java:394)
    at org.apache.hadoop.hive.metastore.ObjectStore.initialize(ObjectStore.java:291)
    at org.apache.hadoop.hive.metastore.ObjectStore.setConf(ObjectStore.java:258)
    at org.apache.hadoop.util.ReflectionUtils.setConf(ReflectionUtils.java:73)
    at org.apache.hadoop.util.ReflectionUtils.newInstance(ReflectionUtils.java:133)
    at org.apache.hadoop.hive.metastore.RawStoreProxy.<init>(RawStoreProxy.java:57)
    at org.apache.hadoop.hive.metastore.RawStoreProxy.getProxy(RawStoreProxy.java:66)
    at org.apache.hadoop.hive.metastore.HiveMetaStore$HMSHandler.newRawStore(HiveMetaStore.java:593)
    at org.apache.hadoop.hive.metastore.HiveMetaStore$HMSHandler.getMS(HiveMetaStore.java:571)
    at org.apache.hadoop.hive.metastore.HiveMetaStore$HMSHandler.createDefaultDB(HiveMetaStore.java:620)
    at org.apache.hadoop.hive.metastore.HiveMetaStore$HMSHandler.init(HiveMetaStore.java:461)
    at org.apache.hadoop.hive.metastore.RetryingHMSHandler.<init>(RetryingHMSHandler.java:66)
    at org.apache.hadoop.hive.metastore.RetryingHMSHandler.getProxy(RetryingHMSHandler.java:72)
    at org.apache.hadoop.hive.metastore.HiveMetaStore.newRetryingHMSHandler(HiveMetaStore.java:5762)
    at org.apache.hadoop.hive.metastore.HiveMetaStoreClient.<init>(HiveMetaStoreClient.java:199)
    at org.apache.hadoop.hive.ql.metadata.SessionHiveMetaStoreClient.<init>(SessionHiveMetaStoreClient.java:74)
    at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
    at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57)
    at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
    at java.lang.reflect.Constructor.newInstance(Constructor.java:526)
    at org.apache.hadoop.hive.metastore.MetaStoreUtils.newInstance(MetaStoreUtils.java:1521)
    at org.apache.hadoop.hive.metastore.RetryingMetaStoreClient.<init>(RetryingMetaStoreClient.java:86)
    at org.apache.hadoop.hive.metastore.RetryingMetaStoreClient.getProxy(RetryingMetaStoreClient.java:132)
    at org.apache.hadoop.hive.metastore.RetryingMetaStoreClient.getProxy(RetryingMetaStoreClient.java:104)
    at org.apache.hadoop.hive.ql.metadata.Hive.createMetaStoreClient(Hive.java:3005)
    at org.apache.hadoop.hive.ql.metadata.Hive.getMSC(Hive.java:3024)
    at org.apache.hadoop.hive.ql.metadata.Hive.getAllDatabases(Hive.java:1234)
    at org.apache.hadoop.hive.ql.metadata.Hive.reloadFunctions(Hive.java:174)
    at org.apache.hadoop.hive.ql.metadata.Hive.<clinit>(Hive.java:166)
    at org.apache.hadoop.hive.ql.plan.PlanUtils.configureJobPropertiesForStorageHandler(PlanUtils.java:803)
    at org.apache.hadoop.hive.ql.plan.PlanUtils.configureInputJobPropertiesForStorageHandler(PlanUtils.java:782)
    at org.apache.spark.sql.hive.HadoopTableReader$.initializeLocalJobConfFunc(TableReader.scala:298)
    at org.apache.spark.sql.hive.HadoopTableReader$$anonfun$12.apply(TableReader.scala:274)
    at org.apache.spark.sql.hive.HadoopTableReader$$anonfun$12.apply(TableReader.scala:274)
    at org.apache.spark.rdd.HadoopRDD$$anonfun$getJobConf$6.apply(HadoopRDD.scala:176)
    at org.apache.spark.rdd.HadoopRDD$$anonfun$getJobConf$6.apply(HadoopRDD.scala:176)
    at scala.Option.map(Option.scala:145)
    at org.apache.spark.rdd.HadoopRDD.getJobConf(HadoopRDD.scala:176)
    at org.apache.spark.rdd.HadoopRDD$$anon$1.<init>(HadoopRDD.scala:220)
    at org.apache.spark.rdd.HadoopRDD.compute(HadoopRDD.scala:216)
    at org.apache.spark.rdd.HadoopRDD.compute(HadoopRDD.scala:101)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
    at org.apache.spark.rdd.UnionRDD.compute(UnionRDD.scala:87)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
    at org.apache.spark.scheduler.Task.run(Task.scala:88)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)
{noformat}

Author: navis.ryu <navis@apache.org>

Closes #8804 from navis/SPARK-10679.
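For context, a hedged reproduction sketch in Scala against the Spark 1.5-era API. The table name, the HBase storage handler, and the driver boilerplate are assumptions for illustration; any Hive table declared with a custom STORED BY handler exercises the same executor-side code path, because scanning it runs HadoopTableReader.initializeLocalJobConfFunc on executors, which previously went through PlanUtils.configureInputJobPropertiesForStorageHandler into Hive's metastore-backed initialization and failed as shown above.
{noformat}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.hive.HiveContext

// Hypothetical reproduction (assumed table/handler names), submitted with spark-submit.
object Spark10679Repro {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("SPARK-10679-repro"))
    val hiveContext = new HiveContext(sc)

    // Assumes a pre-existing Hive table backed by a storage handler, e.g.
    //   CREATE TABLE hbase_backed (key STRING, value STRING)
    //   STORED BY 'org.apache.hadoop.hive.hbase.HBaseStorageHandler' ...
    // Scanning it builds a HadoopRDD whose JobConf is initialized on executors.
    hiveContext.sql("SELECT COUNT(*) FROM hbase_backed").show()

    sc.stop()
  }
}
{noformat}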
Diffstat (limited to 'sql')
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala          | 31
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveWriterContainers.scala |  4
2 files changed, 30 insertions, 5 deletions
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala
index e35468a624..69f481c49a 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala
@@ -17,12 +17,14 @@
package org.apache.spark.sql.hive
+import java.util
+
import org.apache.hadoop.fs.{Path, PathFilter}
import org.apache.hadoop.hive.conf.HiveConf
import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants._
import org.apache.hadoop.hive.ql.exec.Utilities
-import org.apache.hadoop.hive.ql.metadata.{Partition => HivePartition, Table => HiveTable}
-import org.apache.hadoop.hive.ql.plan.{PlanUtils, TableDesc}
+import org.apache.hadoop.hive.ql.metadata.{Partition => HivePartition, Table => HiveTable, Hive, HiveUtils, HiveStorageHandler}
+import org.apache.hadoop.hive.ql.plan.TableDesc
import org.apache.hadoop.hive.serde2.Deserializer
import org.apache.hadoop.hive.serde2.objectinspector.primitive._
import org.apache.hadoop.hive.serde2.objectinspector.{ObjectInspectorConverters, StructObjectInspector}
@@ -287,6 +289,29 @@ class HadoopTableReader(
}
}
+private[hive] object HiveTableUtil {
+
+  // copied from PlanUtils.configureJobPropertiesForStorageHandler(tableDesc)
+  // that calls Hive.get() which tries to access metastore, but it's not valid in runtime
+  // it would be fixed in next version of hive but till then, we should use this instead
+  def configureJobPropertiesForStorageHandler(
+      tableDesc: TableDesc, jobConf: JobConf, input: Boolean) {
+    val property = tableDesc.getProperties.getProperty(META_TABLE_STORAGE)
+    val storageHandler = HiveUtils.getStorageHandler(jobConf, property)
+    if (storageHandler != null) {
+      val jobProperties = new util.LinkedHashMap[String, String]
+      if (input) {
+        storageHandler.configureInputJobProperties(tableDesc, jobProperties)
+      } else {
+        storageHandler.configureOutputJobProperties(tableDesc, jobProperties)
+      }
+      if (!jobProperties.isEmpty) {
+        tableDesc.setJobProperties(jobProperties)
+      }
+    }
+  }
+}
+
private[hive] object HadoopTableReader extends HiveInspectors with Logging {
/**
* Curried. After given an argument for 'path', the resulting JobConf => Unit closure is used to
@@ -295,7 +320,7 @@ private[hive] object HadoopTableReader extends HiveInspectors with Logging {
  def initializeLocalJobConfFunc(path: String, tableDesc: TableDesc)(jobConf: JobConf) {
    FileInputFormat.setInputPaths(jobConf, Seq[Path](new Path(path)): _*)
    if (tableDesc != null) {
-      PlanUtils.configureInputJobPropertiesForStorageHandler(tableDesc)
+      HiveTableUtil.configureJobPropertiesForStorageHandler(tableDesc, jobConf, true)
      Utilities.copyTableJobPropertiesToConf(tableDesc, jobConf)
    }
    val bufferSize = System.getProperty("spark.buffer.size", "65536")
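The hunk above keeps the shape described in the "Curried" comment: partially applying initializeLocalJobConfFunc yields the JobConf => Unit closure that runs wherever the JobConf is materialized, i.e. on executors. A minimal sketch of that partial application, assuming code placed inside the org.apache.spark.sql.hive package (HadoopTableReader is private[hive]); the wrapper object name is made up for illustration.
{noformat}
package org.apache.spark.sql.hive

import org.apache.hadoop.hive.ql.plan.TableDesc
import org.apache.hadoop.mapred.JobConf

// Illustrative wrapper only: shows how the curried method eta-expands into the
// JobConf => Unit closure that TableReader passes along for executor-side setup.
object LocalJobConfFuncSketch {
  def initializer(path: String, tableDesc: TableDesc): JobConf => Unit =
    HadoopTableReader.initializeLocalJobConfFunc(path, tableDesc) _
}
{noformat}
Because this closure now calls HiveTableUtil instead of PlanUtils, no metastore connection is attempted when it runs on an executor.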
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveWriterContainers.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveWriterContainers.scala
index c8d6b71804..93c016b6c6 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveWriterContainers.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveWriterContainers.scala
@@ -26,7 +26,7 @@ import org.apache.hadoop.fs.Path
import org.apache.hadoop.hive.conf.HiveConf.ConfVars
import org.apache.hadoop.hive.ql.exec.{FileSinkOperator, Utilities}
import org.apache.hadoop.hive.ql.io.{HiveFileFormatUtils, HiveOutputFormat}
-import org.apache.hadoop.hive.ql.plan.{PlanUtils, TableDesc}
+import org.apache.hadoop.hive.ql.plan.TableDesc
import org.apache.hadoop.io.Writable
import org.apache.hadoop.mapred._
import org.apache.hadoop.hive.common.FileUtils
@@ -55,7 +55,7 @@ private[hive] class SparkHiveWriterContainer(
  // Add table properties from storage handler to jobConf, so any custom storage
  // handler settings can be set to jobConf
  if (tableDesc != null) {
-    PlanUtils.configureOutputJobPropertiesForStorageHandler(tableDesc)
+    HiveTableUtil.configureJobPropertiesForStorageHandler(tableDesc, jobConf, false)
    Utilities.copyTableJobPropertiesToConf(tableDesc, jobConf)
  }
  protected val conf = new SerializableJobConf(jobConf)
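Taken together, the two call sites use the new helper symmetrically: the read path (HadoopTableReader) passes input = true, the write path (SparkHiveWriterContainer) passes input = false, and in both cases Utilities.copyTableJobPropertiesToConf then copies the recorded properties into the JobConf. A small sketch of that symmetry, again assuming code inside the org.apache.spark.sql.hive package; the object and method names are placeholders, not code from the patch.
{noformat}
package org.apache.spark.sql.hive

import org.apache.hadoop.hive.ql.exec.Utilities
import org.apache.hadoop.hive.ql.plan.TableDesc
import org.apache.hadoop.mapred.JobConf

// Placeholder object: records storage-handler job properties on the TableDesc
// via HiveTableUtil (no metastore access), then copies them into the JobConf.
object StorageHandlerPropsSketch {
  def forRead(tableDesc: TableDesc, jobConf: JobConf): Unit = {
    HiveTableUtil.configureJobPropertiesForStorageHandler(tableDesc, jobConf, input = true)
    Utilities.copyTableJobPropertiesToConf(tableDesc, jobConf)
  }

  def forWrite(tableDesc: TableDesc, jobConf: JobConf): Unit = {
    HiveTableUtil.configureJobPropertiesForStorageHandler(tableDesc, jobConf, input = false)
    Utilities.copyTableJobPropertiesToConf(tableDesc, jobConf)
  }
}
{noformat}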