path: root/sql/hive
author    Davies Liu <davies.liu@gmail.com>    2015-08-06 11:15:37 -0700
committer Davies Liu <davies.liu@gmail.com>    2015-08-06 11:15:37 -0700
commit    2eca46a17a3d46a605804ff89c010017da91e1bc (patch)
tree      3ee8fa52d14bce8b62e152da4aa560eae780338b /sql/hive
parent    6e009cb9c4d7a395991e10dab427f37019283758 (diff)
Revert "[SPARK-9632][SQL] update InternalRow.toSeq to make it accept data type info"
This reverts commit 6e009cb9c4d7a395991e10dab427f37019283758.
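For context: the reverted commit had made row field access type-directed (per-field data types passed to toSeq / get), while this revert restores the plain untyped Seq view. A minimal, self-contained sketch of the two access styles, using a hypothetical SimpleRow stand-in rather than the real InternalRow API:

// Hypothetical stand-in types, for illustration only -- not the Spark classes.
sealed trait DataType
case object IntType extends DataType
case object StringType extends DataType

class SimpleRow(values: Array[Any]) {
  // Untyped access, as restored by this revert.
  def toSeq: Seq[Any] = values.toSeq
  // Type-directed access, as introduced by the reverted commit.
  def get(i: Int, dt: DataType): Any = values(i)
}

object RowAccessDemo extends App {
  val row = new SimpleRow(Array(1, "a"))
  println(row.toSeq)               // List(1, a)
  println(row.get(0, IntType))     // 1
  println(row.get(1, StringType))  // a
}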
Diffstat (limited to 'sql/hive')
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveInspectors.scala                     |  6
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformation.scala     | 21
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveWriterContainers.scala               | 24
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveInspectorSuite.scala                 | 10
4 files changed, 21 insertions(+), 40 deletions(-)
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveInspectors.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveInspectors.scala
index 9824dad239..39d798d072 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveInspectors.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveInspectors.scala
@@ -390,10 +390,8 @@ private[hive] trait HiveInspectors {
(o: Any) => {
if (o != null) {
val struct = soi.create()
- val row = o.asInstanceOf[InternalRow]
- soi.getAllStructFieldRefs.zip(wrappers).zipWithIndex.foreach {
- case ((field, wrapper), i) =>
- soi.setStructFieldData(struct, field, wrapper(row.get(i, schema(i).dataType)))
+ (soi.getAllStructFieldRefs, wrappers, o.asInstanceOf[InternalRow].toSeq).zipped.foreach {
+ (field, wrapper, data) => soi.setStructFieldData(struct, field, wrapper(data))
}
struct
} else {
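Note: the restored HiveInspectors code walks three collections in lock-step with Tuple3's .zipped. A standalone sketch of that pattern (Scala 2.x; plain strings and functions stand in for the struct field refs and wrappers):

object ZippedDemo extends App {
  val fields   = Seq("f0", "f1")
  val wrappers = Seq((v: Any) => s"wrapped($v)", (v: Any) => s"wrapped($v)")
  val data     = Seq[Any](1, "a")

  // Walk the three sequences together, like setStructFieldData in the diff above.
  (fields, wrappers, data).zipped.foreach { (field, wrap, value) =>
    println(s"$field -> ${wrap(value)}")
  }
}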
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformation.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformation.scala
index ade27454b9..a6a343d395 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformation.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformation.scala
@@ -88,7 +88,6 @@ case class ScriptTransformation(
// external process. That process's output will be read by this current thread.
val writerThread = new ScriptTransformationWriterThread(
inputIterator,
- input.map(_.dataType),
outputProjection,
inputSerde,
inputSoi,
@@ -202,7 +201,6 @@ case class ScriptTransformation(
private class ScriptTransformationWriterThread(
iter: Iterator[InternalRow],
- inputSchema: Seq[DataType],
outputProjection: Projection,
@Nullable inputSerde: AbstractSerDe,
@Nullable inputSoi: ObjectInspector,
@@ -228,25 +226,12 @@ private class ScriptTransformationWriterThread(
// We can't use Utils.tryWithSafeFinally here because we also need a `catch` block, so
// let's use a variable to record whether the `finally` block was hit due to an exception
var threwException: Boolean = true
- val len = inputSchema.length
try {
iter.map(outputProjection).foreach { row =>
if (inputSerde == null) {
- val data = if (len == 0) {
- ioschema.inputRowFormatMap("TOK_TABLEROWFORMATLINES")
- } else {
- val sb = new StringBuilder
- sb.append(row.get(0, inputSchema(0)))
- var i = 1
- while (i < len) {
- sb.append(ioschema.inputRowFormatMap("TOK_TABLEROWFORMATFIELD"))
- sb.append(row.get(i, inputSchema(i)))
- i += 1
- }
- sb.append(ioschema.inputRowFormatMap("TOK_TABLEROWFORMATLINES"))
- sb.toString()
- }
- outputStream.write(data.getBytes("utf-8"))
+ val data = row.mkString("", ioschema.inputRowFormatMap("TOK_TABLEROWFORMATFIELD"),
+ ioschema.inputRowFormatMap("TOK_TABLEROWFORMATLINES")).getBytes("utf-8")
+ outputStream.write(data)
} else {
val writable = inputSerde.serialize(
row.asInstanceOf[GenericInternalRow].values, inputSoi)
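Note: the restored ScriptTransformation code folds the projected row into one delimited line with mkString(start, sep, end). A standalone sketch, with literal tab/newline delimiters standing in for the ioschema.inputRowFormatMap lookups:

object RowFormatDemo extends App {
  val fieldSep = "\t"   // placeholder for TOK_TABLEROWFORMATFIELD
  val lineSep  = "\n"   // placeholder for TOK_TABLEROWFORMATLINES
  val row      = Seq[Any](1, "spark", 3.14)

  // mkString("", sep, end): no prefix, fields joined by sep, line terminator appended.
  val data = row.mkString("", fieldSep, lineSep).getBytes("utf-8")
  print(new String(data, "utf-8"))  // 1<TAB>spark<TAB>3.14<NEWLINE>
}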
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveWriterContainers.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveWriterContainers.scala
index 8dc796b056..684ea1d137 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveWriterContainers.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveWriterContainers.scala
@@ -211,18 +211,18 @@ private[spark] class SparkHiveDynamicPartitionWriterContainer(
}
}
- val nonDynamicPartLen = row.numFields - dynamicPartColNames.length
- val dynamicPartPath = dynamicPartColNames.zipWithIndex.map { case (colName, i) =>
- val rawVal = row.get(nonDynamicPartLen + i, schema(colName).dataType)
- val string = if (rawVal == null) null else convertToHiveRawString(colName, rawVal)
- val colString =
- if (string == null || string.isEmpty) {
- defaultPartName
- } else {
- FileUtils.escapePathName(string, defaultPartName)
- }
- s"/$colName=$colString"
- }.mkString
+ val dynamicPartPath = dynamicPartColNames
+ .zip(row.toSeq.takeRight(dynamicPartColNames.length))
+ .map { case (col, rawVal) =>
+ val string = if (rawVal == null) null else convertToHiveRawString(col, rawVal)
+ val colString =
+ if (string == null || string.isEmpty) {
+ defaultPartName
+ } else {
+ FileUtils.escapePathName(string, defaultPartName)
+ }
+ s"/$col=$colString"
+ }.mkString
def newWriter(): FileSinkOperator.RecordWriter = {
val newFileSinkDesc = new FileSinkDesc(
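Note: the restored writer-container code pairs the dynamic partition columns with the trailing row values (takeRight + zip) and builds a Hive-style /col=value path. A simplified sketch; toString and a fixed default name replace convertToHiveRawString and FileUtils.escapePathName:

object DynamicPartPathDemo extends App {
  val dynamicPartColNames = Seq("year", "country")
  // The last N row values correspond to the N dynamic partition columns.
  val rowValues       = Seq[Any](42, "k1", 2015, null)
  val defaultPartName = "__HIVE_DEFAULT_PARTITION__"

  val dynamicPartPath = dynamicPartColNames
    .zip(rowValues.takeRight(dynamicPartColNames.length))
    .map { case (col, rawVal) =>
      val string = if (rawVal == null) null else rawVal.toString
      val colString =
        if (string == null || string.isEmpty) defaultPartName
        else string  // the real code escapes via FileUtils.escapePathName
      s"/$col=$colString"
    }.mkString

  println(dynamicPartPath)  // /year=2015/country=__HIVE_DEFAULT_PARTITION__
}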
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveInspectorSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveInspectorSuite.scala
index 81a70b8d42..99e95fb921 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveInspectorSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveInspectorSuite.scala
@@ -133,8 +133,8 @@ class HiveInspectorSuite extends SparkFunSuite with HiveInspectors {
}
}
- def checkValues(row1: Seq[Any], row2: InternalRow, row2Schema: StructType): Unit = {
- row1.zip(row2.toSeq(row2Schema)).foreach { case (r1, r2) =>
+ def checkValues(row1: Seq[Any], row2: InternalRow): Unit = {
+ row1.zip(row2.toSeq).foreach { case (r1, r2) =>
checkValue(r1, r2)
}
}
@@ -211,10 +211,8 @@ class HiveInspectorSuite extends SparkFunSuite with HiveInspectors {
case (t, idx) => StructField(s"c_$idx", t)
})
val inspector = toInspector(dt)
- checkValues(
- row,
- unwrap(wrap(InternalRow.fromSeq(row), inspector, dt), inspector).asInstanceOf[InternalRow],
- dt)
+ checkValues(row,
+ unwrap(wrap(InternalRow.fromSeq(row), inspector, dt), inspector).asInstanceOf[InternalRow])
checkValue(null, unwrap(wrap(null, toInspector(dt), dt), toInspector(dt)))
}
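Note: the updated test helper compares two rows element-wise by zipping their Seq views. A standalone sketch of that pattern, with a plain assert in place of the suite's checkValue:

object CheckValuesDemo extends App {
  def checkValues(row1: Seq[Any], row2: Seq[Any]): Unit = {
    row1.zip(row2).foreach { case (r1, r2) =>
      assert(r1 == r2, s"mismatch: $r1 != $r2")  // stand-in for checkValue
    }
  }

  checkValues(Seq(1, "a", 3.0), Seq(1, "a", 3.0))
  println("rows match")
}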