diff options
author | hongshen <shenh062326@126.com> | 2016-08-12 09:58:02 +0100 |
---|---|---|
committer | Sean Owen <sowen@cloudera.com> | 2016-08-12 09:58:02 +0100 |
commit | 993923c8f5ca719daf905285738b7fdcaf944d8c (patch) | |
tree | c215c4beeda67201b8ded63c4e669d1718a6fd10 | |
parent | 00e103a6edd1a1f001a94d41dd1f7acc40a1e30f (diff) | |
download | spark-993923c8f5ca719daf905285738b7fdcaf944d8c.tar.gz spark-993923c8f5ca719daf905285738b7fdcaf944d8c.tar.bz2 spark-993923c8f5ca719daf905285738b7fdcaf944d8c.zip |
[SPARK-16985] Change dataFormat from yyyyMMddHHmm to yyyyMMddHHmmss
## What changes were proposed in this pull request?
In our cluster, sometimes the sql output maybe overrided. When I submit some sql, all insert into the same table, and the sql will cost less one minute, here is the detail,
1 sql1, 11:03 insert into table.
2 sql2, 11:04:11 insert into table.
3 sql3, 11:04:48 insert into table.
4 sql4, 11:05 insert into table.
5 sql5, 11:06 insert into table.
The sql3's output file will override the sql2's output file. here is the log:
```
16/05/04 11:04:11 INFO hive.SparkHiveHadoopWriter: XXfinalPath=hdfs://tl-sng-gdt-nn-tdw.tencent-distribute.com:54310/tmp/assorz/tdw-tdwadmin/20160504/04559505496526517_-1_1204544348/10000/_tmp.p_20160428/attempt_201605041104_0001_m_000000_1
16/05/04 11:04:48 INFO hive.SparkHiveHadoopWriter: XXfinalPath=hdfs://tl-sng-gdt-nn-tdw.tencent-distribute.com:54310/tmp/assorz/tdw-tdwadmin/20160504/04559505496526517_-1_212180468/10000/_tmp.p_20160428/attempt_201605041104_0001_m_000000_1
```
The reason is the output file use SimpleDateFormat("yyyyMMddHHmm"), if two sql insert into the same table in the same minute, the output will be overrite. I think we should change dateFormat to "yyyyMMddHHmmss", in our cluster, we can't finished a sql in one second.
## How was this patch tested?
(Please explain how this patch was tested. E.g. unit tests, integration tests, manual tests)
(If this patch involves UI changes, please attach a screenshot; otherwise, remove this)
Author: hongshen <shenh062326@126.com>
Closes #14574 from shenh062326/SPARK-16985.
4 files changed, 5 insertions, 5 deletions
diff --git a/core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala b/core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala index 17daac173c..6550d703bc 100644 --- a/core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala +++ b/core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala @@ -67,7 +67,7 @@ class SparkHadoopWriter(jobConf: JobConf) extends Logging with Serializable { def setup(jobid: Int, splitid: Int, attemptid: Int) { setIDs(jobid, splitid, attemptid) - HadoopRDD.addLocalConfiguration(new SimpleDateFormat("yyyyMMddHHmm").format(now), + HadoopRDD.addLocalConfiguration(new SimpleDateFormat("yyyyMMddHHmmss").format(now), jobid, splitID, attemptID, conf.value) } @@ -162,7 +162,7 @@ class SparkHadoopWriter(jobConf: JobConf) extends Logging with Serializable { private[spark] object SparkHadoopWriter { def createJobID(time: Date, id: Int): JobID = { - val formatter = new SimpleDateFormat("yyyyMMddHHmm") + val formatter = new SimpleDateFormat("yyyyMMddHHmmss") val jobtrackerID = formatter.format(time) new JobID(jobtrackerID, id) } diff --git a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala index 99afe0250c..fd3a14bd48 100644 --- a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala @@ -241,7 +241,7 @@ class HadoopRDD[K, V]( var reader: RecordReader[K, V] = null val inputFormat = getInputFormat(jobConf) - HadoopRDD.addLocalConfiguration(new SimpleDateFormat("yyyyMMddHHmm").format(createTime), + HadoopRDD.addLocalConfiguration(new SimpleDateFormat("yyyyMMddHHmmss").format(createTime), context.stageId, theSplit.index, context.attemptNumber, jobConf) reader = inputFormat.getRecordReader(split.inputSplit.value, jobConf, Reporter.NULL) diff --git a/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala index b086baa084..be919e6587 100644 --- a/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala @@ -77,7 +77,7 @@ class NewHadoopRDD[K, V]( // private val serializableConf = new SerializableWritable(_conf) private val jobTrackerId: String = { - val formatter = new SimpleDateFormat("yyyyMMddHHmm") + val formatter = new SimpleDateFormat("yyyyMMddHHmmss") formatter.format(new Date()) } diff --git a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala index 104e0cb371..7d6a8805bc 100644 --- a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala +++ b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala @@ -1079,7 +1079,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)]) // Rename this as hadoopConf internally to avoid shadowing (see SPARK-2038). val hadoopConf = conf val job = NewAPIHadoopJob.getInstance(hadoopConf) - val formatter = new SimpleDateFormat("yyyyMMddHHmm") + val formatter = new SimpleDateFormat("yyyyMMddHHmmss") val jobtrackerID = formatter.format(new Date()) val stageId = self.id val jobConfiguration = job.getConfiguration |