author    Nathan Howell <nhowell@godaddy.com>    2017-02-16 20:51:19 -0800
committer Wenchen Fan <wenchen@databricks.com>   2017-02-16 20:51:19 -0800
commit    21fde57f15db974b710e7b00e72c744da7c1ac3c (patch)
tree      e51d0ab5ad405ff66c6459738186406a597a8f1c /common/unsafe
parent    dcc2d540a53f0bd04baead43fdee1c170ef2b9f3 (diff)
[SPARK-18352][SQL] Support parsing multiline json files
## What changes were proposed in this pull request?

If a new option `wholeFile` is set to `true`, the JSON reader will parse each file (instead of each line) as a single value. This is done with Jackson streaming and should be capable of parsing very large documents, as long as each row fits in memory.

Because the file is not buffered in memory, corrupt record handling is slightly different when `wholeFile` is enabled: on a parsing failure the corrupt column will contain the filename instead of the literal JSON. It would be easy to extend this to add the parser location (line, column and byte offsets) to the output if desired.

These changes allow types other than `String` to be parsed. Support for `UTF8String` and `Text` has been added (alongside `String` and `InputFormat`), and these no longer require a conversion to `String` just for parsing. I've also included a few other changes that generate slightly better bytecode and (imo) make it more obvious when and where boxing is occurring in the parser. These are included as separate commits; let me know if they should be flattened into this PR or moved to a new one.

## How was this patch tested?

New and existing unit tests. No performance or load tests have been run.

Author: Nathan Howell <nhowell@godaddy.com>

Closes #16386 from NathanHowell/SPARK-18352.
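A minimal usage sketch (not part of this patch) of reading a multiline JSON file with the new option through the Java DataFrame API. The local session setup and the `events.json` path are illustrative assumptions.

```java
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class WholeFileJsonExample {
  public static void main(String[] args) {
    SparkSession spark = SparkSession.builder()
        .appName("WholeFileJsonExample")
        .master("local[*]")
        .getOrCreate();

    // With wholeFile=true, each input file is parsed as one JSON value,
    // so documents spanning multiple lines are supported. On a parse
    // failure the corrupt column holds the filename, not the raw JSON.
    Dataset<Row> df = spark.read()
        .option("wholeFile", "true")
        .json("events.json");

    df.show();
    spark.stop();
  }
}
```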
Diffstat (limited to 'common/unsafe')
-rw-r--r--  common/unsafe/src/main/java/org/apache/spark/unsafe/types/UTF8String.java | 20
1 file changed, 17 insertions(+), 3 deletions(-)
diff --git a/common/unsafe/src/main/java/org/apache/spark/unsafe/types/UTF8String.java b/common/unsafe/src/main/java/org/apache/spark/unsafe/types/UTF8String.java
index 3800d53c02..87b9e8eb44 100644
--- a/common/unsafe/src/main/java/org/apache/spark/unsafe/types/UTF8String.java
+++ b/common/unsafe/src/main/java/org/apache/spark/unsafe/types/UTF8String.java
@@ -147,7 +147,13 @@ public final class UTF8String implements Comparable<UTF8String>, Externalizable,
buffer.position(pos + numBytes);
}
- public void writeTo(OutputStream out) throws IOException {
+ /**
+ * Returns a {@link ByteBuffer} wrapping the base object if it is a byte array
+ * or a copy of the data if the base object is not a byte array.
+ *
+ * Unlike getBytes this will not copy the array if this is a slice.
+ */
+ public @Nonnull ByteBuffer getByteBuffer() {
if (base instanceof byte[] && offset >= BYTE_ARRAY_OFFSET) {
final byte[] bytes = (byte[]) base;
@@ -160,12 +166,20 @@ public final class UTF8String implements Comparable<UTF8String>, Externalizable,
throw new ArrayIndexOutOfBoundsException();
}
- out.write(bytes, (int) arrayOffset, numBytes);
+ return ByteBuffer.wrap(bytes, (int) arrayOffset, numBytes);
} else {
- out.write(getBytes());
+ return ByteBuffer.wrap(getBytes());
}
}
+ public void writeTo(OutputStream out) throws IOException {
+ final ByteBuffer bb = this.getByteBuffer();
+ assert(bb.hasArray());
+
+ // similar to Utils.writeByteBuffer but without the spark-core dependency
+ out.write(bb.array(), bb.arrayOffset() + bb.position(), bb.remaining());
+ }
+
/**
* Returns the number of bytes for a code point with the first byte as `b`
* @param b The first byte of a code point
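A minimal sketch (not part of the patch) of what the new `getByteBuffer` buys: for a heap-backed slice it wraps the existing array, where `getBytes` would allocate a copy, and `writeTo` now streams straight from that wrapped buffer. The demo class name is an illustrative assumption.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

import org.apache.spark.unsafe.types.UTF8String;

public class GetByteBufferDemo {
  public static void main(String[] args) throws IOException {
    byte[] backing = "hello, world".getBytes(StandardCharsets.UTF_8);

    // A slice over the first five bytes of `backing`; fromBytes does not copy.
    UTF8String slice = UTF8String.fromBytes(backing, 0, 5);

    // getByteBuffer wraps `backing` directly (offset 0, length 5);
    // slice.getBytes() would have had to allocate a fresh array.
    ByteBuffer bb = slice.getByteBuffer();
    System.out.println(bb.hasArray() && bb.array() == backing); // true

    // writeTo streams the slice without an intermediate copy.
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    slice.writeTo(out);
    System.out.println(out.toString("UTF-8")); // hello
  }
}
```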