author      Cheng Lian <lian@databricks.com>    2015-10-31 21:16:09 -0700
committer   Davies Liu <davies.liu@gmail.com>   2015-10-31 21:16:09 -0700
commit      aa494a9c2ebd59baec47beb434cd09bf3f188218 (patch)
tree        54bd81e8b7540d08c27be3671bc250ec54020bbf /sql
parent      40d3c6797a3dfd037eb69b2bcd336d8544deddf5 (diff)
download    spark-aa494a9c2ebd59baec47beb434cd09bf3f188218.tar.gz
            spark-aa494a9c2ebd59baec47beb434cd09bf3f188218.tar.bz2
            spark-aa494a9c2ebd59baec47beb434cd09bf3f188218.zip
[SPARK-11117] [SPARK-11345] [SQL] Makes all HadoopFsRelation data sources produce UnsafeRow
This PR fixes two issues:

1. `PhysicalRDD.outputsUnsafeRows` is always `false`

   Thus a `ConvertToUnsafe` operator is often required even if the underlying data source relation does output `UnsafeRow`.

2. Internal/external row conversion for `HadoopFsRelation` is kinda messy

   Currently we're using `HadoopFsRelation.needConversion` and [dirty type erasure hacks][1] to indicate whether the relation outputs external rows or internal rows, and apply external-to-internal conversion when necessary. Basically, all builtin `HadoopFsRelation` data sources, i.e. Parquet, JSON, ORC, and Text, output `InternalRow`, while typical external `HadoopFsRelation` data sources, e.g. spark-avro and spark-csv, output `Row`.

This PR adds a `private[sql]` interface method `HadoopFsRelation.buildInternalScan`, which by default invokes `HadoopFsRelation.buildScan` and converts `Row`s to `UnsafeRow`s (which are also `InternalRow`s). All builtin `HadoopFsRelation` data sources override this method and directly output `UnsafeRow`s. In this way, `HadoopFsRelation` now always produces `UnsafeRow`s, so `PhysicalRDD.outputsUnsafeRows` can be properly set by checking whether the underlying data source is a `HadoopFsRelation`.

A remaining question is whether we can assume that all non-builtin `HadoopFsRelation` data sources output external rows. At least all well-known ones do so. However, it's possible that some users implemented their own `HadoopFsRelation` data sources that leverage `InternalRow`, and thus all those unstable internal data representations. If this assumption is safe, we can deprecate `HadoopFsRelation.needConversion` and clean up some more conversion code (like [here][2] and [here][3]).

This PR supersedes #9125.

Follow-ups:

1. Makes JSON and ORC data sources output `UnsafeRow` directly
2. Makes `HiveTableScan` output `UnsafeRow` directly

   This is related to follow-up 1, since the ORC data source shares the same `Writable` unwrapping code with `HiveTableScan`.

[1]: https://github.com/apache/spark/blob/v1.5.1/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala#L353
[2]: https://github.com/apache/spark/blob/v1.5.1/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala#L331-L335
[3]: https://github.com/apache/spark/blob/v1.5.1/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala#L630-L669

Author: Cheng Lian <lian@databricks.com>

Closes #9305 from liancheng/spark-11345.unsafe-hadoop-fs-relation.
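For context, the core mechanism the default `buildInternalScan` and the builtin overrides rely on is `UnsafeProjection`, which turns any `InternalRow` into an `UnsafeRow`. The snippet below is a minimal, self-contained sketch (not part of this commit); the object name and example schema are made up for illustration, and it assumes Spark 1.5/1.6-era Catalyst classes on the classpath with their internal APIs accessible.

```scala
// Illustrative sketch only: object name and schema are hypothetical.
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{UnsafeProjection, UnsafeRow}
import org.apache.spark.sql.types.{LongType, StringType, StructType}
import org.apache.spark.unsafe.types.UTF8String

object UnsafeConversionSketch {
  def main(args: Array[String]): Unit = {
    val schema = new StructType().add("id", LongType).add("name", StringType)

    // UnsafeProjection converts an InternalRow with this schema into an UnsafeRow.
    val toUnsafe = UnsafeProjection.create(schema)

    val row: InternalRow = InternalRow(1L, UTF8String.fromString("spark"))
    val unsafe: UnsafeRow = toUnsafe(row)

    println(unsafe.getLong(0))        // 1
    println(unsafe.getUTF8String(1))  // spark
  }
}
```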
Diffstat (limited to 'sql')
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/columnar/GenerateColumnAccessor.scala  2
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala  8
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala  30
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONRelation.scala  18
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystRecordMaterializer.scala  2
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystRowConverter.scala  8
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala  6
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/DefaultSource.scala  34
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala  44
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala  2
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala  30
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala  31
12 files changed, 156 insertions, 59 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/columnar/GenerateColumnAccessor.scala b/sql/core/src/main/scala/org/apache/spark/sql/columnar/GenerateColumnAccessor.scala
index 7980a6f36d..ff9393b465 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/columnar/GenerateColumnAccessor.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/columnar/GenerateColumnAccessor.scala
@@ -34,7 +34,7 @@ abstract class ColumnarIterator extends Iterator[InternalRow] {
/**
* An helper class to update the fields of UnsafeRow, used by ColumnAccessor
*
- * WARNNING: These setter MUST be called in increasing order of ordinals.
+ * WARNING: These setter MUST be called in increasing order of ordinals.
*/
class MutableUnsafeRow(val writer: UnsafeRowWriter) extends GenericMutableRow(null) {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
index 87bd92e00a..7a466cf6a0 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
@@ -22,7 +22,7 @@ import org.apache.spark.sql.catalyst.{InternalRow, CatalystTypeConverters}
import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation
import org.apache.spark.sql.catalyst.expressions.{Attribute, GenericMutableRow}
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Statistics}
-import org.apache.spark.sql.sources.BaseRelation
+import org.apache.spark.sql.sources.{HadoopFsRelation, BaseRelation}
import org.apache.spark.sql.types.DataType
import org.apache.spark.sql.{Row, SQLContext}
@@ -93,7 +93,9 @@ private[sql] case class LogicalRDD(
private[sql] case class PhysicalRDD(
output: Seq[Attribute],
rdd: RDD[InternalRow],
- extraInformation: String) extends LeafNode {
+ extraInformation: String,
+ override val outputsUnsafeRows: Boolean = false)
+ extends LeafNode {
protected override def doExecute(): RDD[InternalRow] = rdd
@@ -105,7 +107,7 @@ private[sql] object PhysicalRDD {
output: Seq[Attribute],
rdd: RDD[InternalRow],
relation: BaseRelation): PhysicalRDD = {
- PhysicalRDD(output, rdd, relation.toString)
+ PhysicalRDD(output, rdd, relation.toString, relation.isInstanceOf[HadoopFsRelation])
}
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
index af6626c897..65859865c8 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
@@ -17,21 +17,21 @@
package org.apache.spark.sql.execution.datasources
-import org.apache.spark.{Logging, TaskContext}
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.rdd.{MapPartitionsRDD, RDD, UnionRDD}
import org.apache.spark.sql.catalyst.CatalystTypeConverters.convertToScala
-import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow, expressions}
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.planning.PhysicalOperation
import org.apache.spark.sql.catalyst.plans.logical
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow, expressions}
import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.sources._
import org.apache.spark.sql.types.{StringType, StructType}
import org.apache.spark.sql.{SaveMode, Strategy, execution, sources, _}
import org.apache.spark.unsafe.types.UTF8String
import org.apache.spark.util.{SerializableConfiguration, Utils}
+import org.apache.spark.{Logging, TaskContext}
/**
* A Strategy for planning scans over data sources defined using the sources API.
@@ -106,8 +106,7 @@ private[sql] object DataSourceStrategy extends Strategy with Logging {
l,
projects,
filters,
- (a, f) =>
- toCatalystRDD(l, a, t.buildScan(a.map(_.name).toArray, f, t.paths, confBroadcast))) :: Nil
+ (a, f) => t.buildInternalScan(a.map(_.name).toArray, f, t.paths, confBroadcast)) :: Nil
case l @ LogicalRelation(baseRelation: TableScan, _) =>
execution.PhysicalRDD.createFromDataSource(
@@ -152,7 +151,7 @@ private[sql] object DataSourceStrategy extends Strategy with Logging {
// Don't scan any partition columns to save I/O. Here we are being optimistic and
// assuming partition columns data stored in data files are always consistent with those
// partition values encoded in partition directory paths.
- val dataRows = relation.buildScan(
+ val dataRows = relation.buildInternalScan(
requiredDataColumns.map(_.name).toArray, filters, Array(dir), confBroadcast)
// Merges data values with partition values.
@@ -161,7 +160,7 @@ private[sql] object DataSourceStrategy extends Strategy with Logging {
requiredDataColumns,
partitionColumns,
partitionValues,
- toCatalystRDD(logicalRelation, requiredDataColumns, dataRows))
+ dataRows)
}
val unionedRows =
@@ -199,15 +198,24 @@ private[sql] object DataSourceStrategy extends Strategy with Logging {
// Builds `AttributeReference`s for all partition columns so that we can use them to project
// required partition columns. Note that if a partition column appears in `requiredColumns`,
// we should use the `AttributeReference` in `requiredColumns`.
- val requiredColumnMap = requiredColumns.map(a => a.name -> a).toMap
- val partitionColumns = partitionColumnSchema.toAttributes.map { a =>
- requiredColumnMap.getOrElse(a.name, a)
+ val partitionColumns = {
+ val requiredColumnMap = requiredColumns.map(a => a.name -> a).toMap
+ partitionColumnSchema.toAttributes.map { a =>
+ requiredColumnMap.getOrElse(a.name, a)
+ }
}
val mapPartitionsFunc = (_: TaskContext, _: Int, iterator: Iterator[InternalRow]) => {
- val projection = UnsafeProjection.create(requiredColumns, dataColumns ++ partitionColumns)
+ // Note that we can't use an `UnsafeRowJoiner` to replace the following `JoinedRow` and
+ // `UnsafeProjection`. Because the projection may also adjust column order.
val mutableJoinedRow = new JoinedRow()
- iterator.map(dataRow => projection(mutableJoinedRow(dataRow, partitionValues)))
+ val unsafePartitionValues = UnsafeProjection.create(partitionColumnSchema)(partitionValues)
+ val unsafeProjection =
+ UnsafeProjection.create(requiredColumns, dataColumns ++ partitionColumns)
+
+ iterator.map { unsafeDataRow =>
+ unsafeProjection(mutableJoinedRow(unsafeDataRow, unsafePartitionValues))
+ }
}
// This is an internal RDD whose call site the user should not be concerned with
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONRelation.scala
index 5f104fca7d..85b52f04c8 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONRelation.scala
@@ -34,6 +34,7 @@ import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.mapred.SparkHadoopMapRedUtil
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.UnsafeProjection
import org.apache.spark.sql.execution.datasources.PartitionSpec
import org.apache.spark.sql.sources._
import org.apache.spark.sql.types.StructType
@@ -122,14 +123,21 @@ private[sql] class JSONRelation(
jsonSchema
}
- override def buildScan(
+ override private[sql] def buildInternalScan(
requiredColumns: Array[String],
filters: Array[Filter],
- inputPaths: Array[FileStatus]): RDD[Row] = {
- JacksonParser(
+ inputPaths: Array[FileStatus],
+ broadcastedConf: Broadcast[SerializableConfiguration]): RDD[InternalRow] = {
+ val requiredDataSchema = StructType(requiredColumns.map(dataSchema(_)))
+ val rows = JacksonParser(
inputRDD.getOrElse(createBaseRdd(inputPaths)),
- StructType(requiredColumns.map(dataSchema(_))),
- sqlContext.conf.columnNameOfCorruptRecord).asInstanceOf[RDD[Row]]
+ requiredDataSchema,
+ sqlContext.conf.columnNameOfCorruptRecord)
+
+ rows.mapPartitions { iterator =>
+ val unsafeProjection = UnsafeProjection.create(requiredDataSchema)
+ iterator.map(unsafeProjection)
+ }
}
override def equals(other: Any): Boolean = other match {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystRecordMaterializer.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystRecordMaterializer.scala
index ed9e0aa659..eeead9f5d8 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystRecordMaterializer.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystRecordMaterializer.scala
@@ -35,7 +35,7 @@ private[parquet] class CatalystRecordMaterializer(
private val rootConverter = new CatalystRowConverter(parquetSchema, catalystSchema, NoopUpdater)
- override def getCurrentRecord: InternalRow = rootConverter.currentRow
+ override def getCurrentRecord: InternalRow = rootConverter.currentRecord
override def getRootConverter: GroupConverter = rootConverter
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystRowConverter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystRowConverter.scala
index b16c46579f..1f653cd3d3 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystRowConverter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystRowConverter.scala
@@ -163,10 +163,14 @@ private[parquet] class CatalystRowConverter(
override def setFloat(value: Float): Unit = row.setFloat(ordinal, value)
}
+ private val currentRow = new SpecificMutableRow(catalystType.map(_.dataType))
+
+ private val unsafeProjection = UnsafeProjection.create(catalystType)
+
/**
- * Represents the converted row object once an entire Parquet record is converted.
+ * The [[UnsafeRow]] converted from an entire Parquet record.
*/
- val currentRow = new SpecificMutableRow(catalystType.map(_.dataType))
+ def currentRecord: UnsafeRow = unsafeProjection(currentRow)
// Converters for each field.
private val fieldConverters: Array[Converter with HasParentContainerUpdater] = {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala
index 44649a68b3..5a7c6b95b5 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala
@@ -282,11 +282,11 @@ private[sql] class ParquetRelation(
}
}
- override def buildScan(
+ override def buildInternalScan(
requiredColumns: Array[String],
filters: Array[Filter],
inputFiles: Array[FileStatus],
- broadcastedConf: Broadcast[SerializableConfiguration]): RDD[Row] = {
+ broadcastedConf: Broadcast[SerializableConfiguration]): RDD[InternalRow] = {
val useMetadataCache = sqlContext.getConf(SQLConf.PARQUET_CACHE_METADATA)
val parquetFilterPushDown = sqlContext.conf.parquetFilterPushDown
val assumeBinaryIsString = sqlContext.conf.isParquetBinaryAsString
@@ -361,7 +361,7 @@ private[sql] class ParquetRelation(
id, i, rawSplits.get(i).asInstanceOf[InputSplit with Writable])
}
}
- }.asInstanceOf[RDD[Row]] // type erasure hack to pass RDD[InternalRow] as RDD[Row]
+ }
}
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/DefaultSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/DefaultSource.scala
index ab26c57ad1..52c4421d7e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/DefaultSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/DefaultSource.scala
@@ -25,16 +25,20 @@ import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat
import org.apache.hadoop.mapreduce.{RecordWriter, TaskAttemptContext, Job}
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat
+import org.apache.spark.broadcast.Broadcast
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.mapred.SparkHadoopMapRedUtil
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions.GenericMutableRow
+import org.apache.spark.sql.catalyst.expressions.{UnsafeRow, GenericMutableRow}
+import org.apache.spark.sql.catalyst.expressions.codegen.{UnsafeRowWriter, BufferHolder}
+import org.apache.spark.sql.columnar.MutableUnsafeRow
import org.apache.spark.sql.{AnalysisException, Row, SQLContext}
import org.apache.spark.sql.execution.datasources.PartitionSpec
import org.apache.spark.sql.sources._
import org.apache.spark.sql.types.{StringType, StructType}
import org.apache.spark.unsafe.types.UTF8String
+import org.apache.spark.util.SerializableConfiguration
/**
* A data source for reading text files.
@@ -79,8 +83,12 @@ private[sql] class TextRelation(
/** This is an internal data source that outputs internal row format. */
override val needConversion: Boolean = false
- /** Read path. */
- override def buildScan(inputPaths: Array[FileStatus]): RDD[Row] = {
+
+ override private[sql] def buildInternalScan(
+ requiredColumns: Array[String],
+ filters: Array[Filter],
+ inputPaths: Array[FileStatus],
+ broadcastedConf: Broadcast[SerializableConfiguration]): RDD[InternalRow] = {
val job = new Job(sqlContext.sparkContext.hadoopConfiguration)
val conf = SparkHadoopUtil.get.getConfigurationFromJobContext(job)
val paths = inputPaths.map(_.getPath).sortBy(_.toUri)
@@ -92,17 +100,19 @@ private[sql] class TextRelation(
sqlContext.sparkContext.hadoopRDD(
conf.asInstanceOf[JobConf], classOf[TextInputFormat], classOf[LongWritable], classOf[Text])
.mapPartitions { iter =>
- var buffer = new Array[Byte](1024)
- val row = new GenericMutableRow(1)
+ val bufferHolder = new BufferHolder
+ val unsafeRowWriter = new UnsafeRowWriter
+ val unsafeRow = new UnsafeRow
+
iter.map { case (_, line) =>
- if (line.getLength > buffer.length) {
- buffer = new Array[Byte](line.getLength)
- }
- System.arraycopy(line.getBytes, 0, buffer, 0, line.getLength)
- row.update(0, UTF8String.fromBytes(buffer, 0, line.getLength))
- row
+ // Writes to an UnsafeRow directly
+ bufferHolder.reset()
+ unsafeRowWriter.initialize(bufferHolder, 1)
+ unsafeRowWriter.write(0, line.getBytes, 0, line.getLength)
+ unsafeRow.pointTo(bufferHolder.buffer, 1, bufferHolder.totalSize())
+ unsafeRow
}
- }.asInstanceOf[RDD[Row]]
+ }
}
/** Write path. */
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
index a9a013e936..7a55351148 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
@@ -585,11 +585,11 @@ abstract class HadoopFsRelation private[sql](maybePartitionSpec: Option[Partitio
})
}
- final private[sql] def buildScan(
+ final private[sql] def buildInternalScan(
requiredColumns: Array[String],
filters: Array[Filter],
inputPaths: Array[String],
- broadcastedConf: Broadcast[SerializableConfiguration]): RDD[Row] = {
+ broadcastedConf: Broadcast[SerializableConfiguration]): RDD[InternalRow] = {
val inputStatuses = inputPaths.flatMap { input =>
val path = new Path(input)
@@ -604,7 +604,7 @@ abstract class HadoopFsRelation private[sql](maybePartitionSpec: Option[Partitio
}
}
- buildScan(requiredColumns, filters, inputStatuses, broadcastedConf)
+ buildInternalScan(requiredColumns, filters, inputStatuses, broadcastedConf)
}
/**
@@ -741,6 +741,44 @@ abstract class HadoopFsRelation private[sql](maybePartitionSpec: Option[Partitio
}
/**
+ * For a non-partitioned relation, this method builds an `RDD[InternalRow]` containing all rows
+ * within this relation. For partitioned relations, this method is called for each selected
+ * partition, and builds an `RDD[InternalRow]` containing all rows within that single partition.
+ *
+ * Note:
+ *
+ * 1. Rows contained in the returned `RDD[InternalRow]` are assumed to be `UnsafeRow`s.
+ * 2. This interface is subject to change in future.
+ *
+ * @param requiredColumns Required columns.
+ * @param filters Candidate filters to be pushed down. The actual filter should be the conjunction
+ * of all `filters`. The pushed down filters are currently purely an optimization as they
+ * will all be evaluated again. This means it is safe to use them with methods that produce
+ * false positives such as filtering partitions based on a bloom filter.
+ * @param inputFiles For a non-partitioned relation, it contains paths of all data files in the
+ * relation. For a partitioned relation, it contains paths of all data files in a single
+ * selected partition.
+ * @param broadcastedConf A shared broadcast Hadoop Configuration, which can be used to reduce the
+ * overhead of broadcasting the Configuration for every Hadoop RDD.
+ */
+ private[sql] def buildInternalScan(
+ requiredColumns: Array[String],
+ filters: Array[Filter],
+ inputFiles: Array[FileStatus],
+ broadcastedConf: Broadcast[SerializableConfiguration]): RDD[InternalRow] = {
+ val requiredSchema = StructType(requiredColumns.map(dataSchema.apply))
+ val internalRows = {
+ val externalRows = buildScan(requiredColumns, filters, inputFiles, broadcastedConf)
+ execution.RDDConversions.rowToRowRdd(externalRows, requiredSchema.map(_.dataType))
+ }
+
+ internalRows.mapPartitions { iterator =>
+ val unsafeProjection = UnsafeProjection.create(requiredSchema)
+ iterator.map(unsafeProjection)
+ }
+ }
+
+ /**
* Prepares a write job and returns an [[OutputWriterFactory]]. Client side job preparation can
* be put here. For example, user defined output committer can be configured here
* by setting the output committer class in the conf of spark.sql.sources.outputCommitterClass.
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
index baff7f5752..70fae32b7e 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
@@ -22,8 +22,8 @@ import java.io.File
import org.apache.hadoop.fs.Path
import org.apache.spark.sql._
-import org.apache.spark.sql.catalyst.{TableIdentifier, InternalRow}
import org.apache.spark.sql.catalyst.expressions.SpecificMutableRow
+import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier}
import org.apache.spark.sql.execution.datasources.parquet.TestingUDT.{NestedStruct, NestedStructUDT}
import org.apache.spark.sql.test.SharedSQLContext
import org.apache.spark.sql.types._
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala
index d1f30e188e..45de567039 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala
@@ -19,21 +19,20 @@ package org.apache.spark.sql.hive.orc
import java.util.Properties
-import scala.collection.JavaConverters._
-
import com.google.common.base.Objects
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileStatus, Path}
import org.apache.hadoop.hive.conf.HiveConf.ConfVars
import org.apache.hadoop.hive.ql.io.orc.{OrcInputFormat, OrcOutputFormat, OrcSerde, OrcSplit, OrcStruct}
import org.apache.hadoop.hive.serde2.objectinspector.SettableStructObjectInspector
-import org.apache.hadoop.hive.serde2.typeinfo.{TypeInfoUtils, StructTypeInfo}
+import org.apache.hadoop.hive.serde2.typeinfo.{StructTypeInfo, TypeInfoUtils}
import org.apache.hadoop.io.{NullWritable, Writable}
import org.apache.hadoop.mapred.{InputFormat => MapRedInputFormat, JobConf, OutputFormat => MapRedOutputFormat, RecordWriter, Reporter}
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat
import org.apache.hadoop.mapreduce.{Job, TaskAttemptContext}
import org.apache.spark.Logging
+import org.apache.spark.broadcast.Broadcast
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.mapred.SparkHadoopMapRedUtil
import org.apache.spark.rdd.{HadoopRDD, RDD}
@@ -199,12 +198,13 @@ private[sql] class OrcRelation(
partitionColumns)
}
- override def buildScan(
+ override private[sql] def buildInternalScan(
requiredColumns: Array[String],
filters: Array[Filter],
- inputPaths: Array[FileStatus]): RDD[Row] = {
+ inputPaths: Array[FileStatus],
+ broadcastedConf: Broadcast[SerializableConfiguration]): RDD[InternalRow] = {
val output = StructType(requiredColumns.map(dataSchema(_))).toAttributes
- OrcTableScan(output, this, filters, inputPaths).execute().asInstanceOf[RDD[Row]]
+ OrcTableScan(output, this, filters, inputPaths).execute()
}
override def prepareJobForWrite(job: Job): OutputWriterFactory = {
@@ -253,16 +253,17 @@ private[orc] case class OrcTableScan(
path: String,
conf: Configuration,
iterator: Iterator[Writable],
- nonPartitionKeyAttrs: Seq[(Attribute, Int)],
- mutableRow: MutableRow): Iterator[InternalRow] = {
+ nonPartitionKeyAttrs: Seq[Attribute]): Iterator[InternalRow] = {
val deserializer = new OrcSerde
val maybeStructOI = OrcFileOperator.getObjectInspector(path, Some(conf))
+ val mutableRow = new SpecificMutableRow(attributes.map(_.dataType))
+ val unsafeProjection = UnsafeProjection.create(StructType.fromAttributes(nonPartitionKeyAttrs))
// SPARK-8501: ORC writes an empty schema ("struct<>") to an ORC file if the file contains zero
// rows, and thus couldn't give a proper ObjectInspector. In this case we just return an empty
// partition since we know that this file is empty.
maybeStructOI.map { soi =>
- val (fieldRefs, fieldOrdinals) = nonPartitionKeyAttrs.map {
+ val (fieldRefs, fieldOrdinals) = nonPartitionKeyAttrs.zipWithIndex.map {
case (attr, ordinal) =>
soi.getStructFieldRef(attr.name) -> ordinal
}.unzip
@@ -280,7 +281,7 @@ private[orc] case class OrcTableScan(
}
i += 1
}
- mutableRow: InternalRow
+ unsafeProjection(mutableRow)
}
}.getOrElse {
Iterator.empty
@@ -322,13 +323,8 @@ private[orc] case class OrcTableScan(
val wrappedConf = new SerializableConfiguration(conf)
rdd.mapPartitionsWithInputSplit { case (split: OrcSplit, iterator) =>
- val mutableRow = new SpecificMutableRow(attributes.map(_.dataType))
- fillObject(
- split.getPath.toString,
- wrappedConf.value,
- iterator.map(_._2),
- attributes.zipWithIndex,
- mutableRow)
+ val writableIterator = iterator.map(_._2)
+ fillObject(split.getPath.toString, wrappedConf.value, writableIterator, attributes)
}
}
}
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala b/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala
index e3605bb3f6..100b97137c 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala
@@ -27,6 +27,7 @@ import org.apache.parquet.hadoop.ParquetOutputCommitter
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.sql._
+import org.apache.spark.sql.execution.ConvertToUnsafe
import org.apache.spark.sql.execution.datasources.LogicalRelation
import org.apache.spark.sql.hive.test.TestHiveSingleton
import org.apache.spark.sql.test.SQLTestUtils
@@ -687,6 +688,36 @@ abstract class HadoopFsRelationTest extends QueryTest with SQLTestUtils with Tes
sqlContext.sparkContext.conf.set("spark.speculation", speculationEnabled.toString)
}
}
+
+ test("HadoopFsRelation produces UnsafeRow") {
+ withTempTable("test_unsafe") {
+ withTempPath { dir =>
+ val path = dir.getCanonicalPath
+ sqlContext.range(3).write.format(dataSourceName).save(path)
+ sqlContext.read
+ .format(dataSourceName)
+ .option("dataSchema", new StructType().add("id", LongType, nullable = false).json)
+ .load(path)
+ .registerTempTable("test_unsafe")
+
+ val df = sqlContext.sql(
+ """SELECT COUNT(*)
+ |FROM test_unsafe a JOIN test_unsafe b
+ |WHERE a.id = b.id
+ """.stripMargin)
+
+ val plan = df.queryExecution.executedPlan
+
+ assert(
+ plan.collect { case plan: ConvertToUnsafe => plan }.isEmpty,
+ s"""Query plan shouldn't have ${classOf[ConvertToUnsafe].getSimpleName} node(s):
+ |$plan
+ """.stripMargin)
+
+ checkAnswer(df, Row(3))
+ }
+ }
+ }
}
// This class is used to test SPARK-8578. We should not use any custom output committer when