Diffstat (limited to 'sql/hive')
-rw-r--r-- | sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileFormat.scala | 67
1 file changed, 1 insertion, 66 deletions
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileFormat.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileFormat.scala
index a2c8092e01..5de3507a67 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileFormat.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileFormat.scala
@@ -27,12 +27,10 @@ import org.apache.hadoop.hive.ql.io.orc._
 import org.apache.hadoop.hive.serde2.objectinspector.{SettableStructObjectInspector, StructObjectInspector}
 import org.apache.hadoop.hive.serde2.typeinfo.{StructTypeInfo, TypeInfoUtils}
 import org.apache.hadoop.io.{NullWritable, Writable}
-import org.apache.hadoop.mapred.{InputFormat => MapRedInputFormat, JobConf, OutputFormat => MapRedOutputFormat, RecordWriter, Reporter}
+import org.apache.hadoop.mapred.{JobConf, OutputFormat => MapRedOutputFormat, RecordWriter, Reporter}
 import org.apache.hadoop.mapreduce._
 import org.apache.hadoop.mapreduce.lib.input.{FileInputFormat, FileSplit}
 
-import org.apache.spark.internal.Logging
-import org.apache.spark.rdd.{HadoopRDD, RDD}
 import org.apache.spark.sql.{Row, SparkSession}
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions._
@@ -260,69 +258,6 @@ private[orc] class OrcOutputWriter(
   }
 }
 
-private[orc] case class OrcTableScan(
-    @transient sparkSession: SparkSession,
-    attributes: Seq[Attribute],
-    filters: Array[Filter],
-    @transient inputPaths: Seq[FileStatus])
-  extends Logging
-  with HiveInspectors {
-
-  def execute(): RDD[InternalRow] = {
-    val job = Job.getInstance(sparkSession.sessionState.newHadoopConf())
-    val conf = job.getConfiguration
-
-    // Figure out the actual schema from the ORC source (without partition columns) so that we
-    // can pick the correct ordinals. Note that this assumes that all files have the same schema.
-    val orcFormat = new OrcFileFormat
-    val dataSchema =
-      orcFormat
-        .inferSchema(sparkSession, Map.empty, inputPaths)
-        .getOrElse(sys.error("Failed to read schema from target ORC files."))
-
-    // Tries to push down filters if ORC filter push-down is enabled
-    if (sparkSession.sessionState.conf.orcFilterPushDown) {
-      OrcFilters.createFilter(dataSchema, filters).foreach { f =>
-        conf.set(OrcTableScan.SARG_PUSHDOWN, f.toKryo)
-        conf.setBoolean(ConfVars.HIVEOPTINDEXFILTER.varname, true)
-      }
-    }
-
-    // Sets requested columns
-    OrcRelation.setRequiredColumns(conf, dataSchema, StructType.fromAttributes(attributes))
-
-    if (inputPaths.isEmpty) {
-      // the input path probably be pruned, return an empty RDD.
-      return sparkSession.sparkContext.emptyRDD[InternalRow]
-    }
-    FileInputFormat.setInputPaths(job, inputPaths.map(_.getPath): _*)
-
-    val inputFormatClass =
-      classOf[OrcInputFormat]
-        .asInstanceOf[Class[_ <: MapRedInputFormat[NullWritable, Writable]]]
-
-    val rdd = sparkSession.sparkContext.hadoopRDD(
-      conf.asInstanceOf[JobConf],
-      inputFormatClass,
-      classOf[NullWritable],
-      classOf[Writable]
-    ).asInstanceOf[HadoopRDD[NullWritable, Writable]]
-
-    val wrappedConf = new SerializableConfiguration(conf)
-
-    rdd.mapPartitionsWithInputSplit { case (split: OrcSplit, iterator) =>
-      val writableIterator = iterator.map(_._2)
-      val maybeStructOI = OrcFileOperator.getObjectInspector(split.getPath.toString, Some(conf))
-      OrcRelation.unwrapOrcStructs(
-        wrappedConf.value,
-        StructType.fromAttributes(attributes),
-        maybeStructOI,
-        writableIterator
-      )
-    }
-  }
-}
-
 private[orc] object OrcTableScan {
   // This constant duplicates `OrcInputFormat.SARG_PUSHDOWN`, which is unfortunately not public.
   private[orc] val SARG_PUSHDOWN = "sarg.pushdown"
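
The removed OrcTableScan.execute() handled ORC filter push-down (setting SARG_PUSHDOWN and HIVEOPTINDEXFILTER on the Hadoop conf) and column pruning via OrcRelation.setRequiredColumns before building a HadoopRDD. A minimal usage-level sketch of the behaviour that code implemented, assuming a running SparkSession named `spark` and a hypothetical ORC directory `/data/events`; the conf key below is the SQL conf backing the `orcFilterPushDown` flag checked in the removed code:

import org.apache.spark.sql.SparkSession

// Sketch only: enable ORC filter push-down, then read, filter, and prune an ORC dataset.
val spark = SparkSession.builder().appName("orc-pushdown-example").getOrCreate()
spark.conf.set("spark.sql.orc.filterPushdown", "true")

val df = spark.read.orc("/data/events")  // path is an assumption for illustration
df.filter(df("id") > 10)                 // predicate eligible for push-down into the ORC reader
  .select("id", "name")                  // column pruning, analogous to setRequiredColumns above
  .show()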