author     Reynold Xin <rxin@databricks.com>    2015-11-11 10:17:54 -0800
committer  Reynold Xin <rxin@databricks.com>    2015-11-11 10:17:54 -0800
commit     95daff6459fc749949c2d71a0b7ab1c5be854f70 (patch)
tree       bb448664ae5fbe1806c4206f4325a4d26a5e8be6
parent     27524a3a9ccee6fbe56149180ebfb3f74e0957e7 (diff)
[SPARK-11646] WholeTextFileRDD should return Text rather than String
If it returns Text, we can reuse this in Spark SQL to provide a WholeTextFile data source and directly convert the Text into UTF8String without extra string decoding and encoding.

Author: Reynold Xin <rxin@databricks.com>

Closes #9622 from rxin/SPARK-11646.
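The point of returning Text is that a downstream consumer can hand the raw bytes straight to UTF8String. A minimal sketch of that conversion, assuming a hypothetical helper in a whole-text-file data source (textToUTF8String is illustrative and not part of this patch):

import org.apache.hadoop.io.Text
import org.apache.spark.unsafe.types.UTF8String

// Wrap only the valid bytes of the Text buffer in a UTF8String,
// skipping the intermediate java.lang.String decode/encode step.
def textToUTF8String(text: Text): UTF8String =
  UTF8String.fromBytes(text.getBytes, 0, text.getLength)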
-rw-r--r--  core/src/main/scala/org/apache/spark/SparkContext.scala                     |  6
-rw-r--r--  core/src/main/scala/org/apache/spark/input/WholeTextFileInputFormat.scala   |  6
-rw-r--r--  core/src/main/scala/org/apache/spark/input/WholeTextFileRecordReader.scala  | 12
-rw-r--r--  core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala                 | 33
-rw-r--r--  core/src/main/scala/org/apache/spark/rdd/WholeTextFileRDD.scala             | 56
5 files changed, 69 insertions(+), 44 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 67270c38fa..43a241686f 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -863,10 +863,10 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
new WholeTextFileRDD(
this,
classOf[WholeTextFileInputFormat],
- classOf[String],
- classOf[String],
+ classOf[Text],
+ classOf[Text],
updateConf,
- minPartitions).setName(path)
+ minPartitions).setName(path).map(record => (record._1.toString, record._2.toString))
}
/**
diff --git a/core/src/main/scala/org/apache/spark/input/WholeTextFileInputFormat.scala b/core/src/main/scala/org/apache/spark/input/WholeTextFileInputFormat.scala
index 1ba34a1141..413408723b 100644
--- a/core/src/main/scala/org/apache/spark/input/WholeTextFileInputFormat.scala
+++ b/core/src/main/scala/org/apache/spark/input/WholeTextFileInputFormat.scala
@@ -20,6 +20,7 @@ package org.apache.spark.input
import scala.collection.JavaConverters._
import org.apache.hadoop.fs.Path
+import org.apache.hadoop.io.Text
import org.apache.hadoop.mapreduce.InputSplit
import org.apache.hadoop.mapreduce.JobContext
import org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat
@@ -33,14 +34,13 @@ import org.apache.hadoop.mapreduce.TaskAttemptContext
*/
private[spark] class WholeTextFileInputFormat
- extends CombineFileInputFormat[String, String] with Configurable {
+ extends CombineFileInputFormat[Text, Text] with Configurable {
override protected def isSplitable(context: JobContext, file: Path): Boolean = false
override def createRecordReader(
split: InputSplit,
- context: TaskAttemptContext): RecordReader[String, String] = {
-
+ context: TaskAttemptContext): RecordReader[Text, Text] = {
val reader =
new ConfigurableCombineFileRecordReader(split, context, classOf[WholeTextFileRecordReader])
reader.setConf(getConf)
diff --git a/core/src/main/scala/org/apache/spark/input/WholeTextFileRecordReader.scala b/core/src/main/scala/org/apache/spark/input/WholeTextFileRecordReader.scala
index 31bde8a78f..b56b2aa88a 100644
--- a/core/src/main/scala/org/apache/spark/input/WholeTextFileRecordReader.scala
+++ b/core/src/main/scala/org/apache/spark/input/WholeTextFileRecordReader.scala
@@ -49,7 +49,7 @@ private[spark] class WholeTextFileRecordReader(
split: CombineFileSplit,
context: TaskAttemptContext,
index: Integer)
- extends RecordReader[String, String] with Configurable {
+ extends RecordReader[Text, Text] with Configurable {
private[this] val path = split.getPath(index)
private[this] val fs = path.getFileSystem(
@@ -58,8 +58,8 @@ private[spark] class WholeTextFileRecordReader(
// True means the current file has been processed, then skip it.
private[this] var processed = false
- private[this] val key = path.toString
- private[this] var value: String = null
+ private[this] val key: Text = new Text(path.toString)
+ private[this] var value: Text = null
override def initialize(split: InputSplit, context: TaskAttemptContext): Unit = {}
@@ -67,9 +67,9 @@ private[spark] class WholeTextFileRecordReader(
override def getProgress: Float = if (processed) 1.0f else 0.0f
- override def getCurrentKey: String = key
+ override def getCurrentKey: Text = key
- override def getCurrentValue: String = value
+ override def getCurrentValue: Text = value
override def nextKeyValue(): Boolean = {
if (!processed) {
@@ -83,7 +83,7 @@ private[spark] class WholeTextFileRecordReader(
ByteStreams.toByteArray(fileIn)
}
- value = new Text(innerBuffer).toString
+ value = new Text(innerBuffer)
Closeables.close(fileIn, false)
processed = true
true
diff --git a/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
index 9c4b70844b..d1960990da 100644
--- a/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
@@ -28,12 +28,11 @@ import org.apache.hadoop.mapreduce._
import org.apache.hadoop.mapreduce.lib.input.{CombineFileSplit, FileSplit}
import org.apache.spark.annotation.DeveloperApi
-import org.apache.spark.input.WholeTextFileInputFormat
import org.apache.spark._
import org.apache.spark.executor.DataReadMethod
import org.apache.spark.mapreduce.SparkHadoopMapReduceUtil
import org.apache.spark.rdd.NewHadoopRDD.NewHadoopMapPartitionsWithSplitRDD
-import org.apache.spark.util.{SerializableConfiguration, ShutdownHookManager, Utils}
+import org.apache.spark.util.{SerializableConfiguration, ShutdownHookManager}
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.storage.StorageLevel
@@ -59,7 +58,6 @@ private[spark] class NewHadoopPartition(
* @param inputFormatClass Storage format of the data to be read.
* @param keyClass Class of the key associated with the inputFormatClass.
* @param valueClass Class of the value associated with the inputFormatClass.
- * @param conf The Hadoop configuration.
*/
@DeveloperApi
class NewHadoopRDD[K, V](
@@ -282,32 +280,3 @@ private[spark] object NewHadoopRDD {
}
}
}
-
-private[spark] class WholeTextFileRDD(
- sc : SparkContext,
- inputFormatClass: Class[_ <: WholeTextFileInputFormat],
- keyClass: Class[String],
- valueClass: Class[String],
- conf: Configuration,
- minPartitions: Int)
- extends NewHadoopRDD[String, String](sc, inputFormatClass, keyClass, valueClass, conf) {
-
- override def getPartitions: Array[Partition] = {
- val inputFormat = inputFormatClass.newInstance
- val conf = getConf
- inputFormat match {
- case configurable: Configurable =>
- configurable.setConf(conf)
- case _ =>
- }
- val jobContext = newJobContext(conf, jobId)
- inputFormat.setMinPartitions(jobContext, minPartitions)
- val rawSplits = inputFormat.getSplits(jobContext).toArray
- val result = new Array[Partition](rawSplits.size)
- for (i <- 0 until rawSplits.size) {
- result(i) = new NewHadoopPartition(id, i, rawSplits(i).asInstanceOf[InputSplit with Writable])
- }
- result
- }
-}
-
diff --git a/core/src/main/scala/org/apache/spark/rdd/WholeTextFileRDD.scala b/core/src/main/scala/org/apache/spark/rdd/WholeTextFileRDD.scala
new file mode 100644
index 0000000000..e3f14fe7ef
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/rdd/WholeTextFileRDD.scala
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.rdd
+
+import org.apache.hadoop.conf.{Configurable, Configuration}
+import org.apache.hadoop.io.{Text, Writable}
+import org.apache.hadoop.mapreduce.InputSplit
+
+import org.apache.spark.{Partition, SparkContext}
+import org.apache.spark.input.WholeTextFileInputFormat
+
+/**
+ * An RDD that reads a bunch of text files in, and each text file becomes one record.
+ */
+private[spark] class WholeTextFileRDD(
+ sc : SparkContext,
+ inputFormatClass: Class[_ <: WholeTextFileInputFormat],
+ keyClass: Class[Text],
+ valueClass: Class[Text],
+ conf: Configuration,
+ minPartitions: Int)
+ extends NewHadoopRDD[Text, Text](sc, inputFormatClass, keyClass, valueClass, conf) {
+
+ override def getPartitions: Array[Partition] = {
+ val inputFormat = inputFormatClass.newInstance
+ val conf = getConf
+ inputFormat match {
+ case configurable: Configurable =>
+ configurable.setConf(conf)
+ case _ =>
+ }
+ val jobContext = newJobContext(conf, jobId)
+ inputFormat.setMinPartitions(jobContext, minPartitions)
+ val rawSplits = inputFormat.getSplits(jobContext).toArray
+ val result = new Array[Partition](rawSplits.size)
+ for (i <- 0 until rawSplits.size) {
+ result(i) = new NewHadoopPartition(id, i, rawSplits(i).asInstanceOf[InputSplit with Writable])
+ }
+ result
+ }
+}
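For reference, the user-facing behavior is unchanged: SparkContext.wholeTextFiles still returns (String, String) pairs because it maps over the Text records, as shown in the SparkContext.scala hunk above. A small usage sketch (the input path and app name are illustrative):

import org.apache.spark.{SparkConf, SparkContext}

val sc = new SparkContext(new SparkConf().setAppName("whole-text-files").setMaster("local[*]"))
// Each record is (file path, entire file contents), both already decoded to String.
val files = sc.wholeTextFiles("/tmp/input-dir", minPartitions = 2)
files.collect().foreach { case (path, contents) =>
  println(s"$path -> ${contents.length} chars")
}
sc.stop()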