author    jiangxingbo <jiangxb1987@gmail.com>  2017-03-15 14:58:19 -0700
committer Reynold Xin <rxin@databricks.com>  2017-03-15 14:58:19 -0700
commit    97cc5e5a5555519d221d0ca78645dde9bb8ea40b (patch)
tree      b566d427d9430a874b3271d33c1b417c8e42fa91 /core/src/main/scala/org/apache
parent    02c274eaba0a8e7611226e0d4e93d3c36253f4ce (diff)
[SPARK-19960][CORE] Move `SparkHadoopWriter` to `internal/io/`
## What changes were proposed in this pull request?

This PR introduces the following changes:

1. Move `SparkHadoopWriter` to `core/internal/io/`, so that it lives in the same directory as `SparkHadoopMapReduceWriter`;
2. Move `SparkHadoopWriterUtils` to a separate file.

After this PR is merged, we may consolidate `SparkHadoopWriter` and `SparkHadoopMapReduceWriter`, and make the new commit protocol support the old `mapred` package's committer.

## How was this patch tested?

Tested by existing test cases.

Author: jiangxingbo <jiangxb1987@gmail.com>

Closes #17304 from jiangxb1987/writer.
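For illustration only (not part of this commit), a short REPL-style Scala sketch of how Spark-internal callers address the shared utilities under their new package after the move; note that `SparkHadoopWriterUtils` is `private[spark]`, so this compiles only from inside the `org.apache.spark` package:

```scala
import java.util.Date

import org.apache.spark.internal.io.SparkHadoopWriterUtils

// Format a job tracker ID from a timestamp ("yyyyMMddHHmmss"), e.g. "20170315145819".
val jobTrackerId: String = SparkHadoopWriterUtils.createJobTrackerID(new Date())

// Build a Hadoop mapred JobID for job bookkeeping during an RDD save.
val jobId = SparkHadoopWriterUtils.createJobID(new Date(), 0)
```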
Diffstat (limited to 'core/src/main/scala/org/apache')
-rw-r--r--  core/src/main/scala/org/apache/spark/internal/io/SparkHadoopMapReduceWriter.scala  59
-rw-r--r--  core/src/main/scala/org/apache/spark/internal/io/SparkHadoopWriter.scala (renamed from core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala)  7
-rw-r--r--  core/src/main/scala/org/apache/spark/internal/io/SparkHadoopWriterUtils.scala  93
-rw-r--r--  core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala  3
4 files changed, 98 insertions(+), 64 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/internal/io/SparkHadoopMapReduceWriter.scala b/core/src/main/scala/org/apache/spark/internal/io/SparkHadoopMapReduceWriter.scala
index 659ad5d0ba..376ff9bb19 100644
--- a/core/src/main/scala/org/apache/spark/internal/io/SparkHadoopMapReduceWriter.scala
+++ b/core/src/main/scala/org/apache/spark/internal/io/SparkHadoopMapReduceWriter.scala
@@ -179,62 +179,3 @@ object SparkHadoopMapReduceWriter extends Logging {
}
}
}
-
-private[spark]
-object SparkHadoopWriterUtils {
-
- private val RECORDS_BETWEEN_BYTES_WRITTEN_METRIC_UPDATES = 256
-
- def createJobID(time: Date, id: Int): JobID = {
- val jobtrackerID = createJobTrackerID(time)
- new JobID(jobtrackerID, id)
- }
-
- def createJobTrackerID(time: Date): String = {
- new SimpleDateFormat("yyyyMMddHHmmss", Locale.US).format(time)
- }
-
- def createPathFromString(path: String, conf: JobConf): Path = {
- if (path == null) {
- throw new IllegalArgumentException("Output path is null")
- }
- val outputPath = new Path(path)
- val fs = outputPath.getFileSystem(conf)
- if (fs == null) {
- throw new IllegalArgumentException("Incorrectly formatted output path")
- }
- outputPath.makeQualified(fs.getUri, fs.getWorkingDirectory)
- }
-
- // Note: this needs to be a function instead of a 'val' so that the disableOutputSpecValidation
- // setting can take effect:
- def isOutputSpecValidationEnabled(conf: SparkConf): Boolean = {
- val validationDisabled = disableOutputSpecValidation.value
- val enabledInConf = conf.getBoolean("spark.hadoop.validateOutputSpecs", true)
- enabledInConf && !validationDisabled
- }
-
- // TODO: these don't seem like the right abstractions.
- // We should abstract the duplicate code in a less awkward way.
-
- def initHadoopOutputMetrics(context: TaskContext): (OutputMetrics, () => Long) = {
- val bytesWrittenCallback = SparkHadoopUtil.get.getFSBytesWrittenOnThreadCallback()
- (context.taskMetrics().outputMetrics, bytesWrittenCallback)
- }
-
- def maybeUpdateOutputMetrics(
- outputMetrics: OutputMetrics,
- callback: () => Long,
- recordsWritten: Long): Unit = {
- if (recordsWritten % RECORDS_BETWEEN_BYTES_WRITTEN_METRIC_UPDATES == 0) {
- outputMetrics.setBytesWritten(callback())
- outputMetrics.setRecordsWritten(recordsWritten)
- }
- }
-
- /**
- * Allows for the `spark.hadoop.validateOutputSpecs` checks to be disabled on a case-by-case
- * basis; see SPARK-4835 for more details.
- */
- val disableOutputSpecValidation: DynamicVariable[Boolean] = new DynamicVariable[Boolean](false)
-}
diff --git a/core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala b/core/src/main/scala/org/apache/spark/internal/io/SparkHadoopWriter.scala
index 46e22b215b..acc9c38571 100644
--- a/core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala
+++ b/core/src/main/scala/org/apache/spark/internal/io/SparkHadoopWriter.scala
@@ -15,19 +15,18 @@
* limitations under the License.
*/
-package org.apache.spark
+package org.apache.spark.internal.io
import java.io.IOException
-import java.text.NumberFormat
-import java.text.SimpleDateFormat
+import java.text.{NumberFormat, SimpleDateFormat}
import java.util.{Date, Locale}
import org.apache.hadoop.fs.FileSystem
import org.apache.hadoop.mapred._
import org.apache.hadoop.mapreduce.TaskType
+import org.apache.spark.SerializableWritable
import org.apache.spark.internal.Logging
-import org.apache.spark.internal.io.SparkHadoopWriterUtils
import org.apache.spark.mapred.SparkHadoopMapRedUtil
import org.apache.spark.rdd.HadoopRDD
import org.apache.spark.util.SerializableJobConf
diff --git a/core/src/main/scala/org/apache/spark/internal/io/SparkHadoopWriterUtils.scala b/core/src/main/scala/org/apache/spark/internal/io/SparkHadoopWriterUtils.scala
new file mode 100644
index 0000000000..de828a6d61
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/internal/io/SparkHadoopWriterUtils.scala
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.internal.io
+
+import java.text.SimpleDateFormat
+import java.util.{Date, Locale}
+
+import scala.util.DynamicVariable
+
+import org.apache.hadoop.fs.Path
+import org.apache.hadoop.mapred.{JobConf, JobID}
+
+import org.apache.spark.{SparkConf, TaskContext}
+import org.apache.spark.deploy.SparkHadoopUtil
+import org.apache.spark.executor.OutputMetrics
+
+/**
+ * A helper object that provides common utilities used when saving an RDD with a Hadoop
+ * OutputFormat (both the old mapred API and the new mapreduce API).
+ */
+private[spark]
+object SparkHadoopWriterUtils {
+
+ private val RECORDS_BETWEEN_BYTES_WRITTEN_METRIC_UPDATES = 256
+
+ def createJobID(time: Date, id: Int): JobID = {
+ val jobtrackerID = createJobTrackerID(time)
+ new JobID(jobtrackerID, id)
+ }
+
+ def createJobTrackerID(time: Date): String = {
+ new SimpleDateFormat("yyyyMMddHHmmss", Locale.US).format(time)
+ }
+
+ def createPathFromString(path: String, conf: JobConf): Path = {
+ if (path == null) {
+ throw new IllegalArgumentException("Output path is null")
+ }
+ val outputPath = new Path(path)
+ val fs = outputPath.getFileSystem(conf)
+ if (fs == null) {
+ throw new IllegalArgumentException("Incorrectly formatted output path")
+ }
+ outputPath.makeQualified(fs.getUri, fs.getWorkingDirectory)
+ }
+
+ // Note: this needs to be a function instead of a 'val' so that the disableOutputSpecValidation
+ // setting can take effect:
+ def isOutputSpecValidationEnabled(conf: SparkConf): Boolean = {
+ val validationDisabled = disableOutputSpecValidation.value
+ val enabledInConf = conf.getBoolean("spark.hadoop.validateOutputSpecs", true)
+ enabledInConf && !validationDisabled
+ }
+
+ // TODO: these don't seem like the right abstractions.
+ // We should abstract the duplicate code in a less awkward way.
+
+ def initHadoopOutputMetrics(context: TaskContext): (OutputMetrics, () => Long) = {
+ val bytesWrittenCallback = SparkHadoopUtil.get.getFSBytesWrittenOnThreadCallback()
+ (context.taskMetrics().outputMetrics, bytesWrittenCallback)
+ }
+
+ def maybeUpdateOutputMetrics(
+ outputMetrics: OutputMetrics,
+ callback: () => Long,
+ recordsWritten: Long): Unit = {
+ if (recordsWritten % RECORDS_BETWEEN_BYTES_WRITTEN_METRIC_UPDATES == 0) {
+ outputMetrics.setBytesWritten(callback())
+ outputMetrics.setRecordsWritten(recordsWritten)
+ }
+ }
+
+ /**
+ * Allows for the `spark.hadoop.validateOutputSpecs` checks to be disabled on a case-by-case
+ * basis; see SPARK-4835 for more details.
+ */
+ val disableOutputSpecValidation: DynamicVariable[Boolean] = new DynamicVariable[Boolean](false)
+}
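To show how the helpers in the new file fit together, here is a hedged, REPL-style sketch; the `JobConf`, the `/tmp/output` path, and the `checkOutputSpecs` step are illustrative assumptions, not taken from this diff:

```scala
import org.apache.hadoop.mapred.JobConf

import org.apache.spark.SparkConf
import org.apache.spark.internal.io.SparkHadoopWriterUtils

val hadoopConf = new JobConf()

// Qualifies the output path against its FileSystem; throws IllegalArgumentException
// if the path is null or cannot be resolved.
val outputPath = SparkHadoopWriterUtils.createPathFromString("/tmp/output", hadoopConf)

// Honors both spark.hadoop.validateOutputSpecs and the DynamicVariable override.
val sparkConf = new SparkConf()
if (SparkHadoopWriterUtils.isOutputSpecValidationEnabled(sparkConf)) {
  // a real caller would invoke outputFormat.checkOutputSpecs(...) here
}
```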
diff --git a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
index 52ce03ff8c..58762cc083 100644
--- a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
@@ -37,7 +37,8 @@ import org.apache.spark._
import org.apache.spark.Partitioner.defaultPartitioner
import org.apache.spark.annotation.Experimental
import org.apache.spark.deploy.SparkHadoopUtil
-import org.apache.spark.internal.io.{SparkHadoopMapReduceWriter, SparkHadoopWriterUtils}
+import org.apache.spark.internal.io.{SparkHadoopMapReduceWriter, SparkHadoopWriter,
+ SparkHadoopWriterUtils}
import org.apache.spark.internal.Logging
import org.apache.spark.partial.{BoundedDouble, PartialResult}
import org.apache.spark.serializer.Serializer
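Finally, a hedged sketch of the SPARK-4835 escape hatch that moved along with the utilities: wrapping a save in `disableOutputSpecValidation.withValue(true)` turns `isOutputSpecValidationEnabled` off for just that call (the `saveAsHadoopFile` call is illustrative, not part of this diff):

```scala
import org.apache.spark.internal.io.SparkHadoopWriterUtils

// Inside this block, isOutputSpecValidationEnabled(...) returns false regardless of
// spark.hadoop.validateOutputSpecs, so output-spec checks are skipped for this save only.
SparkHadoopWriterUtils.disableOutputSpecValidation.withValue(true) {
  // e.g. pairRdd.saveAsHadoopFile[SomeOutputFormat]("hdfs://...") would go here
}
```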