From 6b7f2f753d16ff038881772f1958e3f4fd5597a7 Mon Sep 17 00:00:00 2001 From: Cheng Lian Date: Mon, 17 Nov 2014 16:31:05 -0800 Subject: [SPARK-4309][SPARK-4407][SQL] Date type support for Thrift server, and fixes for complex types This PR is exactly the same as #3178 except it reverts the `FileStatus.isDir` to `FileStatus.isDirectory` change, since it doesn't compile with Hadoop 1. [Review on Reviewable](https://reviewable.io/reviews/apache/spark/3298) Author: Cheng Lian Closes #3298 from liancheng/date-for-thriftserver and squashes the following commits: 866037e [Cheng Lian] Revers isDirectory to isDir (it breaks Hadoop 1 profile) 6f71d0b [Cheng Lian] Makes toHiveString static 26fa955 [Cheng Lian] Fixes complex type support in Hive 0.13.1 shim a92882a [Cheng Lian] Updates HiveShim for 0.13.1 73f442b [Cheng Lian] Adds Date support for HiveThriftServer2 (Hive 0.12.0) --- .../org/apache/spark/sql/hive/HiveContext.scala | 125 ++++++++++----------- 1 file changed, 58 insertions(+), 67 deletions(-) (limited to 'sql/hive') diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala index e88afaaf00..304b9a73ee 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala @@ -19,36 +19,27 @@ package org.apache.spark.sql.hive import java.io.{BufferedReader, File, InputStreamReader, PrintStream} import java.sql.{Date, Timestamp} -import java.util.{ArrayList => JArrayList} - -import org.apache.hadoop.hive.common.`type`.HiveDecimal -import org.apache.spark.sql.catalyst.types.DecimalType -import org.apache.spark.sql.catalyst.types.decimal.Decimal import scala.collection.JavaConversions._ import scala.language.implicitConversions -import scala.reflect.runtime.universe.{TypeTag, typeTag} +import scala.reflect.runtime.universe.TypeTag -import org.apache.hadoop.fs.FileSystem -import org.apache.hadoop.fs.Path +import org.apache.hadoop.fs.{FileSystem, Path} import org.apache.hadoop.hive.conf.HiveConf import org.apache.hadoop.hive.ql.Driver import org.apache.hadoop.hive.ql.metadata.Table import org.apache.hadoop.hive.ql.processors._ import org.apache.hadoop.hive.ql.session.SessionState -import org.apache.hadoop.hive.serde2.io.TimestampWritable -import org.apache.hadoop.hive.serde2.io.DateWritable +import org.apache.hadoop.hive.serde2.io.{DateWritable, TimestampWritable} import org.apache.spark.SparkContext -import org.apache.spark.rdd.RDD import org.apache.spark.sql._ import org.apache.spark.sql.catalyst.ScalaReflection -import org.apache.spark.sql.catalyst.analysis.{Analyzer, EliminateAnalysisOperators} -import org.apache.spark.sql.catalyst.analysis.{OverrideCatalog, OverrideFunctionRegistry} +import org.apache.spark.sql.catalyst.analysis.{Analyzer, EliminateAnalysisOperators, OverrideCatalog, OverrideFunctionRegistry} import org.apache.spark.sql.catalyst.plans.logical._ -import org.apache.spark.sql.execution.ExtractPythonUdfs -import org.apache.spark.sql.execution.QueryExecutionException -import org.apache.spark.sql.execution.{Command => PhysicalCommand} +import org.apache.spark.sql.catalyst.types.DecimalType +import org.apache.spark.sql.catalyst.types.decimal.Decimal +import org.apache.spark.sql.execution.{ExtractPythonUdfs, QueryExecutionException, Command => PhysicalCommand} import org.apache.spark.sql.hive.execution.DescribeHiveTableCommand import org.apache.spark.sql.sources.DataSourceStrategy @@ -136,7 +127,7 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) { val relation = EliminateAnalysisOperators(catalog.lookupRelation(None, tableName)) relation match { - case relation: MetastoreRelation => { + case relation: MetastoreRelation => // This method is mainly based on // org.apache.hadoop.hive.ql.stats.StatsUtils.getFileSizeForTable(HiveConf, Table) // in Hive 0.13 (except that we do not use fs.getContentSummary). @@ -157,7 +148,7 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) { } def getFileSizeForTable(conf: HiveConf, table: Table): Long = { - val path = table.getPath() + val path = table.getPath var size: Long = 0L try { val fs = path.getFileSystem(conf) @@ -187,15 +178,14 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) { val hiveTTable = relation.hiveQlTable.getTTable hiveTTable.setParameters(tableParameters) val tableFullName = - relation.hiveQlTable.getDbName() + "." + relation.hiveQlTable.getTableName() + relation.hiveQlTable.getDbName + "." + relation.hiveQlTable.getTableName catalog.client.alterTable(tableFullName, new Table(hiveTTable)) } - } case otherRelation => throw new NotImplementedError( s"Analyze has only implemented for Hive tables, " + - s"but ${tableName} is a ${otherRelation.nodeName}") + s"but $tableName is a ${otherRelation.nodeName}") } } @@ -374,50 +364,6 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) { /** Extends QueryExecution with hive specific features. */ protected[sql] abstract class QueryExecution extends super.QueryExecution { - protected val primitiveTypes = - Seq(StringType, IntegerType, LongType, DoubleType, FloatType, BooleanType, ByteType, - ShortType, DateType, TimestampType, BinaryType) - - protected[sql] def toHiveString(a: (Any, DataType)): String = a match { - case (struct: Row, StructType(fields)) => - struct.zip(fields).map { - case (v, t) => s""""${t.name}":${toHiveStructString(v, t.dataType)}""" - }.mkString("{", ",", "}") - case (seq: Seq[_], ArrayType(typ, _)) => - seq.map(v => (v, typ)).map(toHiveStructString).mkString("[", ",", "]") - case (map: Map[_,_], MapType(kType, vType, _)) => - map.map { - case (key, value) => - toHiveStructString((key, kType)) + ":" + toHiveStructString((value, vType)) - }.toSeq.sorted.mkString("{", ",", "}") - case (null, _) => "NULL" - case (d: Date, DateType) => new DateWritable(d).toString - case (t: Timestamp, TimestampType) => new TimestampWritable(t).toString - case (bin: Array[Byte], BinaryType) => new String(bin, "UTF-8") - case (decimal: Decimal, DecimalType()) => // Hive strips trailing zeros so use its toString - HiveShim.createDecimal(decimal.toBigDecimal.underlying()).toString - case (other, tpe) if primitiveTypes contains tpe => other.toString - } - - /** Hive outputs fields of structs slightly differently than top level attributes. */ - protected def toHiveStructString(a: (Any, DataType)): String = a match { - case (struct: Row, StructType(fields)) => - struct.zip(fields).map { - case (v, t) => s""""${t.name}":${toHiveStructString(v, t.dataType)}""" - }.mkString("{", ",", "}") - case (seq: Seq[_], ArrayType(typ, _)) => - seq.map(v => (v, typ)).map(toHiveStructString).mkString("[", ",", "]") - case (map: Map[_, _], MapType(kType, vType, _)) => - map.map { - case (key, value) => - toHiveStructString((key, kType)) + ":" + toHiveStructString((value, vType)) - }.toSeq.sorted.mkString("{", ",", "}") - case (null, _) => "null" - case (s: String, StringType) => "\"" + s + "\"" - case (decimal, DecimalType()) => decimal.toString - case (other, tpe) if primitiveTypes contains tpe => other.toString - } - /** * Returns the result as a hive compatible sequence of strings. For native commands, the * execution is simply passed back to Hive. @@ -435,8 +381,7 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) { // We need the types so we can output struct field names val types = analyzed.output.map(_.dataType) // Reformat to match hive tab delimited output. - val asString = result.map(_.zip(types).map(toHiveString)).map(_.mkString("\t")).toSeq - asString + result.map(_.zip(types).map(HiveContext.toHiveString)).map(_.mkString("\t")).toSeq } override def simpleString: String = @@ -447,3 +392,49 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) { } } } + +object HiveContext { + protected val primitiveTypes = + Seq(StringType, IntegerType, LongType, DoubleType, FloatType, BooleanType, ByteType, + ShortType, DateType, TimestampType, BinaryType) + + protected[sql] def toHiveString(a: (Any, DataType)): String = a match { + case (struct: Row, StructType(fields)) => + struct.zip(fields).map { + case (v, t) => s""""${t.name}":${toHiveStructString(v, t.dataType)}""" + }.mkString("{", ",", "}") + case (seq: Seq[_], ArrayType(typ, _)) => + seq.map(v => (v, typ)).map(toHiveStructString).mkString("[", ",", "]") + case (map: Map[_,_], MapType(kType, vType, _)) => + map.map { + case (key, value) => + toHiveStructString((key, kType)) + ":" + toHiveStructString((value, vType)) + }.toSeq.sorted.mkString("{", ",", "}") + case (null, _) => "NULL" + case (d: Date, DateType) => new DateWritable(d).toString + case (t: Timestamp, TimestampType) => new TimestampWritable(t).toString + case (bin: Array[Byte], BinaryType) => new String(bin, "UTF-8") + case (decimal: Decimal, DecimalType()) => // Hive strips trailing zeros so use its toString + HiveShim.createDecimal(decimal.toBigDecimal.underlying()).toString + case (other, tpe) if primitiveTypes contains tpe => other.toString + } + + /** Hive outputs fields of structs slightly differently than top level attributes. */ + protected def toHiveStructString(a: (Any, DataType)): String = a match { + case (struct: Row, StructType(fields)) => + struct.zip(fields).map { + case (v, t) => s""""${t.name}":${toHiveStructString(v, t.dataType)}""" + }.mkString("{", ",", "}") + case (seq: Seq[_], ArrayType(typ, _)) => + seq.map(v => (v, typ)).map(toHiveStructString).mkString("[", ",", "]") + case (map: Map[_, _], MapType(kType, vType, _)) => + map.map { + case (key, value) => + toHiveStructString((key, kType)) + ":" + toHiveStructString((value, vType)) + }.toSeq.sorted.mkString("{", ",", "}") + case (null, _) => "null" + case (s: String, StringType) => "\"" + s + "\"" + case (decimal, DecimalType()) => decimal.toString + case (other, tpe) if primitiveTypes contains tpe => other.toString + } +} -- cgit v1.2.3