aboutsummaryrefslogtreecommitdiff
path: root/common
diff options
context:
space:
mode:
authorNathan Howell <nhowell@godaddy.com>2016-12-01 21:40:49 -0800
committerReynold Xin <rxin@databricks.com>2016-12-01 21:40:49 -0800
commitc82f16c15e0d4bfc54fb890a667d9164a088b5c6 (patch)
tree64d4c58b9c780e27135e33bfc9667abbc5d85ea0 /common
parentd3c90b74edecc527ee468bead41d1cca0b667668 (diff)
downloadspark-c82f16c15e0d4bfc54fb890a667d9164a088b5c6.tar.gz
spark-c82f16c15e0d4bfc54fb890a667d9164a088b5c6.tar.bz2
spark-c82f16c15e0d4bfc54fb890a667d9164a088b5c6.zip
[SPARK-18658][SQL] Write text records directly to a FileOutputStream
## What changes were proposed in this pull request? This replaces uses of `TextOutputFormat` with an `OutputStream`, which will either write directly to the filesystem or indirectly via a compressor (if so configured). This avoids intermediate buffering. The inverse of this (reading directly from a stream) is necessary for streaming large JSON records (when `wholeFile` is enabled) so I wanted to keep the read and write paths symmetric. ## How was this patch tested? Existing unit tests. Author: Nathan Howell <nhowell@godaddy.com> Closes #16089 from NathanHowell/SPARK-18658.
Diffstat (limited to 'common')
-rw-r--r--common/unsafe/src/main/java/org/apache/spark/unsafe/types/UTF8String.java19
-rw-r--r--common/unsafe/src/test/java/org/apache/spark/unsafe/types/UTF8StringSuite.java109
2 files changed, 128 insertions, 0 deletions
diff --git a/common/unsafe/src/main/java/org/apache/spark/unsafe/types/UTF8String.java b/common/unsafe/src/main/java/org/apache/spark/unsafe/types/UTF8String.java
index e09a6b7d93..0255f53113 100644
--- a/common/unsafe/src/main/java/org/apache/spark/unsafe/types/UTF8String.java
+++ b/common/unsafe/src/main/java/org/apache/spark/unsafe/types/UTF8String.java
@@ -147,6 +147,25 @@ public final class UTF8String implements Comparable<UTF8String>, Externalizable,
buffer.position(pos + numBytes);
}
+ public void writeTo(OutputStream out) throws IOException {
+ if (base instanceof byte[] && offset >= BYTE_ARRAY_OFFSET) {
+ final byte[] bytes = (byte[]) base;
+
+ // the offset includes an object header... this is only needed for unsafe copies
+ final long arrayOffset = offset - BYTE_ARRAY_OFFSET;
+
+ // verify that the offset and length points somewhere inside the byte array
+ // and that the offset can safely be truncated to a 32-bit integer
+ if ((long) bytes.length < arrayOffset + numBytes) {
+ throw new ArrayIndexOutOfBoundsException();
+ }
+
+ out.write(bytes, (int) arrayOffset, numBytes);
+ } else {
+ out.write(getBytes());
+ }
+ }
+
/**
* Returns the number of bytes for a code point with the first byte as `b`
* @param b The first byte of a code point
diff --git a/common/unsafe/src/test/java/org/apache/spark/unsafe/types/UTF8StringSuite.java b/common/unsafe/src/test/java/org/apache/spark/unsafe/types/UTF8StringSuite.java
index 7f03686dce..04f684577d 100644
--- a/common/unsafe/src/test/java/org/apache/spark/unsafe/types/UTF8StringSuite.java
+++ b/common/unsafe/src/test/java/org/apache/spark/unsafe/types/UTF8StringSuite.java
@@ -17,15 +17,22 @@
package org.apache.spark.unsafe.types;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.HashMap;
+import java.util.HashSet;
import com.google.common.collect.ImmutableMap;
+import org.apache.spark.unsafe.Platform;
import org.junit.Test;
import static org.junit.Assert.*;
+import static org.apache.spark.unsafe.Platform.BYTE_ARRAY_OFFSET;
import static org.apache.spark.unsafe.types.UTF8String.*;
public class UTF8StringSuite {
@@ -499,4 +506,106 @@ public class UTF8StringSuite {
assertEquals(fromString("123").soundex(), fromString("123"));
assertEquals(fromString("世界千世").soundex(), fromString("世界千世"));
}
+
+ @Test
+ public void writeToOutputStreamUnderflow() throws IOException {
+ // offset underflow is apparently supported?
+ final ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
+ final byte[] test = "01234567".getBytes(StandardCharsets.UTF_8);
+
+ for (int i = 1; i <= Platform.BYTE_ARRAY_OFFSET; ++i) {
+ UTF8String.fromAddress(test, Platform.BYTE_ARRAY_OFFSET - i, test.length + i)
+ .writeTo(outputStream);
+ final ByteBuffer buffer = ByteBuffer.wrap(outputStream.toByteArray(), i, test.length);
+ assertEquals("01234567", StandardCharsets.UTF_8.decode(buffer).toString());
+ outputStream.reset();
+ }
+ }
+
+ @Test
+ public void writeToOutputStreamSlice() throws IOException {
+ final ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
+ final byte[] test = "01234567".getBytes(StandardCharsets.UTF_8);
+
+ for (int i = 0; i < test.length; ++i) {
+ for (int j = 0; j < test.length - i; ++j) {
+ UTF8String.fromAddress(test, Platform.BYTE_ARRAY_OFFSET + i, j)
+ .writeTo(outputStream);
+
+ assertArrayEquals(Arrays.copyOfRange(test, i, i + j), outputStream.toByteArray());
+ outputStream.reset();
+ }
+ }
+ }
+
+ @Test
+ public void writeToOutputStreamOverflow() throws IOException {
+ final ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
+ final byte[] test = "01234567".getBytes(StandardCharsets.UTF_8);
+
+ final HashSet<Long> offsets = new HashSet<>();
+ for (int i = 0; i < 16; ++i) {
+ // touch more points around MAX_VALUE
+ offsets.add((long) Integer.MAX_VALUE - i);
+ // subtract off BYTE_ARRAY_OFFSET to avoid wrapping around to a negative value,
+ // which will hit the slower copy path instead of the optimized one
+ offsets.add(Long.MAX_VALUE - BYTE_ARRAY_OFFSET - i);
+ }
+
+ for (long i = 1; i > 0L; i <<= 1) {
+ for (long j = 0; j < 32L; ++j) {
+ offsets.add(i + j);
+ }
+ }
+
+ for (final long offset : offsets) {
+ try {
+ fromAddress(test, BYTE_ARRAY_OFFSET + offset, test.length)
+ .writeTo(outputStream);
+
+ throw new IllegalStateException(Long.toString(offset));
+ } catch (ArrayIndexOutOfBoundsException e) {
+ // ignore
+ } finally {
+ outputStream.reset();
+ }
+ }
+ }
+
+ @Test
+ public void writeToOutputStream() throws IOException {
+ final ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
+ EMPTY_UTF8.writeTo(outputStream);
+ assertEquals("", outputStream.toString("UTF-8"));
+ outputStream.reset();
+
+ fromString("数据砖很重").writeTo(outputStream);
+ assertEquals(
+ "数据砖很重",
+ outputStream.toString("UTF-8"));
+ outputStream.reset();
+ }
+
+ @Test
+ public void writeToOutputStreamIntArray() throws IOException {
+ // verify that writes work on objects that are not byte arrays
+ final ByteBuffer buffer = StandardCharsets.UTF_8.encode("大千世界");
+ buffer.position(0);
+ buffer.order(ByteOrder.LITTLE_ENDIAN);
+
+ final int length = buffer.limit();
+ assertEquals(12, length);
+
+ final int ints = length / 4;
+ final int[] array = new int[ints];
+
+ for (int i = 0; i < ints; ++i) {
+ array[i] = buffer.getInt();
+ }
+
+ final ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
+ fromAddress(array, Platform.INT_ARRAY_OFFSET, length)
+ .writeTo(outputStream);
+ assertEquals("大千世界", outputStream.toString("UTF-8"));
+ }
}