author     Wenchen Fan <cloud0fan@outlook.com>   2015-08-06 13:11:59 -0700
committer  Reynold Xin <rxin@databricks.com>     2015-08-06 13:11:59 -0700
commit     1f62f104c7a2aeac625b17d9e5ac62f1f10a2b21 (patch)
tree       f04d74dffd581fa1eeb8e7a1f929f2aa843cf0a0 /sql/hive/src/main
parent     a1bbf1bc5c51cd796015ac159799cf024de6fa07 (diff)
[SPARK-9632][SQL] update InternalRow.toSeq to make it accept data type info
This re-applies #7955, which was reverted because a race with another merge broke the build.

Author: Wenchen Fan <cloud0fan@outlook.com>
Author: Reynold Xin <rxin@databricks.com>

Closes #8002 from rxin/InternalRow-toSeq and squashes the following commits:

332416a [Reynold Xin] Merge pull request #7955 from cloud-fan/toSeq
21665e2 [Wenchen Fan] fix hive again...
4addf29 [Wenchen Fan] fix hive
bc16c59 [Wenchen Fan] minor fix
33d802c [Wenchen Fan] pass data type info to InternalRow.toSeq
3dd033e [Wenchen Fan] move the default special getters implementation from InternalRow to BaseGenericInternalRow
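The heart of the change is that callers now hand InternalRow the data type of each field they read. A minimal sketch of that calling convention, assuming spark-catalyst on the classpath: the two-column schema and values are made up, row.get(i, dataType) comes straight from this patch, and the exact shape of the type-aware toSeq overload is an assumption based on the commit title.

import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.GenericInternalRow
import org.apache.spark.sql.types._
import org.apache.spark.unsafe.types.UTF8String

object ToSeqSketch {
  def main(args: Array[String]): Unit = {
    // Hypothetical two-column schema, purely for illustration.
    val schema = StructType(Seq(
      StructField("id", LongType),
      StructField("name", StringType)))
    val row: InternalRow =
      new GenericInternalRow(Array[Any](1L, UTF8String.fromString("spark")))

    // Before this patch callers used the type-agnostic row.toSeq; now they pass
    // the data type when reading a single slot ...
    val first = row.get(0, schema(0).dataType)
    // ... or the field types for the whole row (overload shape assumed here).
    val all = row.toSeq(schema.map(_.dataType))
    println(s"$first / $all")
  }
}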
Diffstat (limited to 'sql/hive/src/main')
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveInspectors.scala                  |  6
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformation.scala  | 21
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveWriterContainers.scala            | 24
3 files changed, 34 insertions(+), 17 deletions(-)
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveInspectors.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveInspectors.scala
index 39d798d072..9824dad239 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveInspectors.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveInspectors.scala
@@ -390,8 +390,10 @@ private[hive] trait HiveInspectors {
       (o: Any) => {
         if (o != null) {
           val struct = soi.create()
-          (soi.getAllStructFieldRefs, wrappers, o.asInstanceOf[InternalRow].toSeq).zipped.foreach {
-            (field, wrapper, data) => soi.setStructFieldData(struct, field, wrapper(data))
+          val row = o.asInstanceOf[InternalRow]
+          soi.getAllStructFieldRefs.zip(wrappers).zipWithIndex.foreach {
+            case ((field, wrapper), i) =>
+              soi.setStructFieldData(struct, field, wrapper(row.get(i, schema(i).dataType)))
           }
           struct
         } else {
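Stripped of the Hive inspector plumbing, the new access pattern in this hunk boils down to the hedged sketch below: walk the struct fields by index and read each value with its declared DataType instead of materialising the whole row through the untyped toSeq. setField and wrappers are hypothetical stand-ins for soi.setStructFieldData and the per-field wrapper functions.

import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.types.StructType

object InspectorSketch {
  def wrapStruct(
      row: InternalRow,
      schema: StructType,
      wrappers: Seq[Any => AnyRef],     // stand-ins for the Hive object wrappers
      setField: (Int, AnyRef) => Unit   // stand-in for soi.setStructFieldData
    ): Unit = {
    var i = 0
    while (i < schema.length) {
      // row.get(i, dataType) is the type-aware accessor this patch switches to.
      setField(i, wrappers(i)(row.get(i, schema(i).dataType)))
      i += 1
    }
  }
}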
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformation.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformation.scala
index a6a343d395..ade27454b9 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformation.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformation.scala
@@ -88,6 +88,7 @@ case class ScriptTransformation(
       // external process. That process's output will be read by this current thread.
       val writerThread = new ScriptTransformationWriterThread(
         inputIterator,
+        input.map(_.dataType),
         outputProjection,
         inputSerde,
         inputSoi,
@@ -201,6 +202,7 @@ case class ScriptTransformation(
 private class ScriptTransformationWriterThread(
     iter: Iterator[InternalRow],
+    inputSchema: Seq[DataType],
     outputProjection: Projection,
     @Nullable inputSerde: AbstractSerDe,
     @Nullable inputSoi: ObjectInspector,
@@ -226,12 +228,25 @@ private class ScriptTransformationWriterThread(
     // We can't use Utils.tryWithSafeFinally here because we also need a `catch` block, so
     // let's use a variable to record whether the `finally` block was hit due to an exception
     var threwException: Boolean = true
+    val len = inputSchema.length
     try {
       iter.map(outputProjection).foreach { row =>
         if (inputSerde == null) {
-          val data = row.mkString("", ioschema.inputRowFormatMap("TOK_TABLEROWFORMATFIELD"),
-            ioschema.inputRowFormatMap("TOK_TABLEROWFORMATLINES")).getBytes("utf-8")
-          outputStream.write(data)
+          val data = if (len == 0) {
+            ioschema.inputRowFormatMap("TOK_TABLEROWFORMATLINES")
+          } else {
+            val sb = new StringBuilder
+            sb.append(row.get(0, inputSchema(0)))
+            var i = 1
+            while (i < len) {
+              sb.append(ioschema.inputRowFormatMap("TOK_TABLEROWFORMATFIELD"))
+              sb.append(row.get(i, inputSchema(i)))
+              i += 1
+            }
+            sb.append(ioschema.inputRowFormatMap("TOK_TABLEROWFORMATLINES"))
+            sb.toString()
+          }
+          outputStream.write(data.getBytes("utf-8"))
         } else {
           val writable = inputSerde.serialize(
             row.asInstanceOf[GenericInternalRow].values, inputSoi)
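Reduced to a standalone, hedged sketch, the no-SerDe branch above amounts to the formatter below; fieldDelim and lineDelim are hypothetical parameters standing in for ioschema.inputRowFormatMap("TOK_TABLEROWFORMATFIELD") and ("TOK_TABLEROWFORMATLINES").

import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.types.DataType

object RowFormatSketch {
  // Read every field with its DataType, join fields with the field delimiter and
  // terminate with the line delimiter; an empty schema emits only the line delimiter.
  def format(row: InternalRow, types: Seq[DataType],
      fieldDelim: String, lineDelim: String): String = {
    if (types.isEmpty) {
      lineDelim
    } else {
      val sb = new StringBuilder
      sb.append(row.get(0, types(0)))
      var i = 1
      while (i < types.length) {
        sb.append(fieldDelim).append(row.get(i, types(i)))
        i += 1
      }
      sb.append(lineDelim).toString()
    }
  }
}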
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveWriterContainers.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveWriterContainers.scala
index 684ea1d137..8dc796b056 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveWriterContainers.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveWriterContainers.scala
@@ -211,18 +211,18 @@ private[spark] class SparkHiveDynamicPartitionWriterContainer(
       }
     }

-    val dynamicPartPath = dynamicPartColNames
-      .zip(row.toSeq.takeRight(dynamicPartColNames.length))
-      .map { case (col, rawVal) =>
-        val string = if (rawVal == null) null else convertToHiveRawString(col, rawVal)
-        val colString =
-          if (string == null || string.isEmpty) {
-            defaultPartName
-          } else {
-            FileUtils.escapePathName(string, defaultPartName)
-          }
-        s"/$col=$colString"
-      }.mkString
+    val nonDynamicPartLen = row.numFields - dynamicPartColNames.length
+    val dynamicPartPath = dynamicPartColNames.zipWithIndex.map { case (colName, i) =>
+      val rawVal = row.get(nonDynamicPartLen + i, schema(colName).dataType)
+      val string = if (rawVal == null) null else convertToHiveRawString(colName, rawVal)
+      val colString =
+        if (string == null || string.isEmpty) {
+          defaultPartName
+        } else {
+          FileUtils.escapePathName(string, defaultPartName)
+        }
+      s"/$colName=$colString"
+    }.mkString

     def newWriter(): FileSinkOperator.RecordWriter = {
       val newFileSinkDesc = new FileSinkDesc(
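The rewritten dynamic-partition path logic, as a hedged standalone sketch: the dynamic partition columns are the trailing fields of the row, so each is read by absolute index with its DataType. escape and defaultPartName are hypothetical parameters standing in for FileUtils.escapePathName and the configured default partition name, and rawVal.toString simplifies convertToHiveRawString.

import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.types.StructType

object PartitionPathSketch {
  def partitionPath(
      row: InternalRow,
      schema: StructType,
      dynamicPartColNames: Seq[String],
      defaultPartName: String,
      escape: String => String): String = {
    // The partition columns sit at the end of the row, after the data columns.
    val offset = row.numFields - dynamicPartColNames.length
    dynamicPartColNames.zipWithIndex.map { case (colName, i) =>
      val rawVal = row.get(offset + i, schema(colName).dataType)
      val str = if (rawVal == null) null else rawVal.toString
      val value = if (str == null || str.isEmpty) defaultPartName else escape(str)
      s"/$colName=$value"
    }.mkString
  }
}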