author    Cheng Lian <lian@databricks.com>    2014-11-17 16:31:05 -0800
committer Michael Armbrust <michael@databricks.com>    2014-11-17 16:31:05 -0800
commit    6b7f2f753d16ff038881772f1958e3f4fd5597a7 (patch)
tree      6c1a741df551c231a15cb3bf70738882c945cb83 /sql/hive
parent    69e858cc7748b6babadd0cbe20e65f3982161cbf (diff)
[SPARK-4309][SPARK-4407][SQL] Date type support for Thrift server, and fixes for complex types
This PR is exactly the same as #3178, except that it reverts the `FileStatus.isDir` to `FileStatus.isDirectory` change, since the latter does not compile against Hadoop 1.

Author: Cheng Lian <lian@databricks.com>

Closes #3298 from liancheng/date-for-thriftserver and squashes the following commits:

866037e [Cheng Lian] Revert isDirectory to isDir (it breaks the Hadoop 1 profile)
6f71d0b [Cheng Lian] Makes toHiveString static
26fa955 [Cheng Lian] Fixes complex type support in Hive 0.13.1 shim
a92882a [Cheng Lian] Updates HiveShim for 0.13.1
73f442b [Cheng Lian] Adds Date support for HiveThriftServer2 (Hive 0.12.0)
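To give a sense of the formatting this change enables, here is a minimal, illustrative sketch (not part of this commit) of calling the now-static `HiveContext.toHiveString` on a Date and on an array value. The values are made up, the import paths assume the Spark 1.2-era `catalyst.types` layout, and since the method is `protected[sql]` the call site must live inside the `org.apache.spark.sql` package tree:

```scala
import java.sql.Date

import org.apache.spark.sql.catalyst.types.{ArrayType, DateType, StringType}
import org.apache.spark.sql.hive.HiveContext

// Date values are rendered through Hive's DateWritable, matching Hive CLI output.
val dateStr = HiveContext.toHiveString(Date.valueOf("2014-11-17") -> DateType)
// expected: 2014-11-17

// Complex types are rendered recursively via toHiveStructString.
val arrayStr = HiveContext.toHiveString(Seq("a", "b") -> ArrayType(StringType))
// expected: ["a","b"]
```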
Diffstat (limited to 'sql/hive')
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala  125
1 file changed, 58 insertions(+), 67 deletions(-)
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
index e88afaaf00..304b9a73ee 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
@@ -19,36 +19,27 @@ package org.apache.spark.sql.hive
import java.io.{BufferedReader, File, InputStreamReader, PrintStream}
import java.sql.{Date, Timestamp}
-import java.util.{ArrayList => JArrayList}
-
-import org.apache.hadoop.hive.common.`type`.HiveDecimal
-import org.apache.spark.sql.catalyst.types.DecimalType
-import org.apache.spark.sql.catalyst.types.decimal.Decimal
import scala.collection.JavaConversions._
import scala.language.implicitConversions
-import scala.reflect.runtime.universe.{TypeTag, typeTag}
+import scala.reflect.runtime.universe.TypeTag
-import org.apache.hadoop.fs.FileSystem
-import org.apache.hadoop.fs.Path
+import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.hive.conf.HiveConf
import org.apache.hadoop.hive.ql.Driver
import org.apache.hadoop.hive.ql.metadata.Table
import org.apache.hadoop.hive.ql.processors._
import org.apache.hadoop.hive.ql.session.SessionState
-import org.apache.hadoop.hive.serde2.io.TimestampWritable
-import org.apache.hadoop.hive.serde2.io.DateWritable
+import org.apache.hadoop.hive.serde2.io.{DateWritable, TimestampWritable}
import org.apache.spark.SparkContext
-import org.apache.spark.rdd.RDD
import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.ScalaReflection
-import org.apache.spark.sql.catalyst.analysis.{Analyzer, EliminateAnalysisOperators}
-import org.apache.spark.sql.catalyst.analysis.{OverrideCatalog, OverrideFunctionRegistry}
+import org.apache.spark.sql.catalyst.analysis.{Analyzer, EliminateAnalysisOperators, OverrideCatalog, OverrideFunctionRegistry}
import org.apache.spark.sql.catalyst.plans.logical._
-import org.apache.spark.sql.execution.ExtractPythonUdfs
-import org.apache.spark.sql.execution.QueryExecutionException
-import org.apache.spark.sql.execution.{Command => PhysicalCommand}
+import org.apache.spark.sql.catalyst.types.DecimalType
+import org.apache.spark.sql.catalyst.types.decimal.Decimal
+import org.apache.spark.sql.execution.{ExtractPythonUdfs, QueryExecutionException, Command => PhysicalCommand}
import org.apache.spark.sql.hive.execution.DescribeHiveTableCommand
import org.apache.spark.sql.sources.DataSourceStrategy
@@ -136,7 +127,7 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) {
val relation = EliminateAnalysisOperators(catalog.lookupRelation(None, tableName))
relation match {
- case relation: MetastoreRelation => {
+ case relation: MetastoreRelation =>
// This method is mainly based on
// org.apache.hadoop.hive.ql.stats.StatsUtils.getFileSizeForTable(HiveConf, Table)
// in Hive 0.13 (except that we do not use fs.getContentSummary).
@@ -157,7 +148,7 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) {
}
def getFileSizeForTable(conf: HiveConf, table: Table): Long = {
- val path = table.getPath()
+ val path = table.getPath
var size: Long = 0L
try {
val fs = path.getFileSystem(conf)
@@ -187,15 +178,14 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) {
val hiveTTable = relation.hiveQlTable.getTTable
hiveTTable.setParameters(tableParameters)
val tableFullName =
- relation.hiveQlTable.getDbName() + "." + relation.hiveQlTable.getTableName()
+ relation.hiveQlTable.getDbName + "." + relation.hiveQlTable.getTableName
catalog.client.alterTable(tableFullName, new Table(hiveTTable))
}
- }
case otherRelation =>
throw new NotImplementedError(
s"Analyze has only implemented for Hive tables, " +
- s"but ${tableName} is a ${otherRelation.nodeName}")
+ s"but $tableName is a ${otherRelation.nodeName}")
}
}
@@ -374,50 +364,6 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) {
/** Extends QueryExecution with hive specific features. */
protected[sql] abstract class QueryExecution extends super.QueryExecution {
- protected val primitiveTypes =
- Seq(StringType, IntegerType, LongType, DoubleType, FloatType, BooleanType, ByteType,
- ShortType, DateType, TimestampType, BinaryType)
-
- protected[sql] def toHiveString(a: (Any, DataType)): String = a match {
- case (struct: Row, StructType(fields)) =>
- struct.zip(fields).map {
- case (v, t) => s""""${t.name}":${toHiveStructString(v, t.dataType)}"""
- }.mkString("{", ",", "}")
- case (seq: Seq[_], ArrayType(typ, _)) =>
- seq.map(v => (v, typ)).map(toHiveStructString).mkString("[", ",", "]")
- case (map: Map[_,_], MapType(kType, vType, _)) =>
- map.map {
- case (key, value) =>
- toHiveStructString((key, kType)) + ":" + toHiveStructString((value, vType))
- }.toSeq.sorted.mkString("{", ",", "}")
- case (null, _) => "NULL"
- case (d: Date, DateType) => new DateWritable(d).toString
- case (t: Timestamp, TimestampType) => new TimestampWritable(t).toString
- case (bin: Array[Byte], BinaryType) => new String(bin, "UTF-8")
- case (decimal: Decimal, DecimalType()) => // Hive strips trailing zeros so use its toString
- HiveShim.createDecimal(decimal.toBigDecimal.underlying()).toString
- case (other, tpe) if primitiveTypes contains tpe => other.toString
- }
-
- /** Hive outputs fields of structs slightly differently than top level attributes. */
- protected def toHiveStructString(a: (Any, DataType)): String = a match {
- case (struct: Row, StructType(fields)) =>
- struct.zip(fields).map {
- case (v, t) => s""""${t.name}":${toHiveStructString(v, t.dataType)}"""
- }.mkString("{", ",", "}")
- case (seq: Seq[_], ArrayType(typ, _)) =>
- seq.map(v => (v, typ)).map(toHiveStructString).mkString("[", ",", "]")
- case (map: Map[_, _], MapType(kType, vType, _)) =>
- map.map {
- case (key, value) =>
- toHiveStructString((key, kType)) + ":" + toHiveStructString((value, vType))
- }.toSeq.sorted.mkString("{", ",", "}")
- case (null, _) => "null"
- case (s: String, StringType) => "\"" + s + "\""
- case (decimal, DecimalType()) => decimal.toString
- case (other, tpe) if primitiveTypes contains tpe => other.toString
- }
-
/**
* Returns the result as a hive compatible sequence of strings. For native commands, the
* execution is simply passed back to Hive.
@@ -435,8 +381,7 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) {
// We need the types so we can output struct field names
val types = analyzed.output.map(_.dataType)
// Reformat to match hive tab delimited output.
- val asString = result.map(_.zip(types).map(toHiveString)).map(_.mkString("\t")).toSeq
- asString
+ result.map(_.zip(types).map(HiveContext.toHiveString)).map(_.mkString("\t")).toSeq
}
override def simpleString: String =
@@ -447,3 +392,49 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) {
}
}
}
+
+object HiveContext {
+ protected val primitiveTypes =
+ Seq(StringType, IntegerType, LongType, DoubleType, FloatType, BooleanType, ByteType,
+ ShortType, DateType, TimestampType, BinaryType)
+
+ protected[sql] def toHiveString(a: (Any, DataType)): String = a match {
+ case (struct: Row, StructType(fields)) =>
+ struct.zip(fields).map {
+ case (v, t) => s""""${t.name}":${toHiveStructString(v, t.dataType)}"""
+ }.mkString("{", ",", "}")
+ case (seq: Seq[_], ArrayType(typ, _)) =>
+ seq.map(v => (v, typ)).map(toHiveStructString).mkString("[", ",", "]")
+ case (map: Map[_,_], MapType(kType, vType, _)) =>
+ map.map {
+ case (key, value) =>
+ toHiveStructString((key, kType)) + ":" + toHiveStructString((value, vType))
+ }.toSeq.sorted.mkString("{", ",", "}")
+ case (null, _) => "NULL"
+ case (d: Date, DateType) => new DateWritable(d).toString
+ case (t: Timestamp, TimestampType) => new TimestampWritable(t).toString
+ case (bin: Array[Byte], BinaryType) => new String(bin, "UTF-8")
+ case (decimal: Decimal, DecimalType()) => // Hive strips trailing zeros so use its toString
+ HiveShim.createDecimal(decimal.toBigDecimal.underlying()).toString
+ case (other, tpe) if primitiveTypes contains tpe => other.toString
+ }
+
+ /** Hive outputs fields of structs slightly differently than top level attributes. */
+ protected def toHiveStructString(a: (Any, DataType)): String = a match {
+ case (struct: Row, StructType(fields)) =>
+ struct.zip(fields).map {
+ case (v, t) => s""""${t.name}":${toHiveStructString(v, t.dataType)}"""
+ }.mkString("{", ",", "}")
+ case (seq: Seq[_], ArrayType(typ, _)) =>
+ seq.map(v => (v, typ)).map(toHiveStructString).mkString("[", ",", "]")
+ case (map: Map[_, _], MapType(kType, vType, _)) =>
+ map.map {
+ case (key, value) =>
+ toHiveStructString((key, kType)) + ":" + toHiveStructString((value, vType))
+ }.toSeq.sorted.mkString("{", ",", "}")
+ case (null, _) => "null"
+ case (s: String, StringType) => "\"" + s + "\""
+ case (decimal, DecimalType()) => decimal.toString
+ case (other, tpe) if primitiveTypes contains tpe => other.toString
+ }
+}
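As a closing note: the helpers move out of `QueryExecution` into the `HiveContext` companion object so that callers which do not hold a `QueryExecution` (notably the Thrift server's result serialization) can reuse the same formatting. The sketch below is a hypothetical caller with made-up names, not the actual Thrift server code; it only needs the row values and their `DataType`s:

```scala
import org.apache.spark.sql.catalyst.expressions.Row
import org.apache.spark.sql.catalyst.types.DataType
import org.apache.spark.sql.hive.HiveContext

// Hypothetical helper: format one result row the same way the Hive CLI would.
// It must live in the org.apache.spark.sql package tree, since toHiveString is
// protected[sql]. Rows are Seq[Any] in this Spark version, so zip works directly.
def formatRow(row: Row, types: Seq[DataType]): String =
  row.zip(types).map(HiveContext.toHiveString).mkString("\t")
```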