From c82f16c15e0d4bfc54fb890a667d9164a088b5c6 Mon Sep 17 00:00:00 2001 From: Nathan Howell Date: Thu, 1 Dec 2016 21:40:49 -0800 Subject: [SPARK-18658][SQL] Write text records directly to a FileOutputStream ## What changes were proposed in this pull request? This replaces uses of `TextOutputFormat` with an `OutputStream`, which will either write directly to the filesystem or indirectly via a compressor (if so configured). This avoids intermediate buffering. The inverse of this (reading directly from a stream) is necessary for streaming large JSON records (when `wholeFile` is enabled) so I wanted to keep the read and write paths symmetric. ## How was this patch tested? Existing unit tests. Author: Nathan Howell Closes #16089 from NathanHowell/SPARK-18658. --- .../org/apache/spark/unsafe/types/UTF8String.java | 19 ++++ .../apache/spark/unsafe/types/UTF8StringSuite.java | 109 +++++++++++++++++++++ 2 files changed, 128 insertions(+) (limited to 'common') diff --git a/common/unsafe/src/main/java/org/apache/spark/unsafe/types/UTF8String.java b/common/unsafe/src/main/java/org/apache/spark/unsafe/types/UTF8String.java index e09a6b7d93..0255f53113 100644 --- a/common/unsafe/src/main/java/org/apache/spark/unsafe/types/UTF8String.java +++ b/common/unsafe/src/main/java/org/apache/spark/unsafe/types/UTF8String.java @@ -147,6 +147,25 @@ public final class UTF8String implements Comparable, Externalizable, buffer.position(pos + numBytes); } + public void writeTo(OutputStream out) throws IOException { + if (base instanceof byte[] && offset >= BYTE_ARRAY_OFFSET) { + final byte[] bytes = (byte[]) base; + + // the offset includes an object header... this is only needed for unsafe copies + final long arrayOffset = offset - BYTE_ARRAY_OFFSET; + + // verify that the offset and length points somewhere inside the byte array + // and that the offset can safely be truncated to a 32-bit integer + if ((long) bytes.length < arrayOffset + numBytes) { + throw new ArrayIndexOutOfBoundsException(); + } + + out.write(bytes, (int) arrayOffset, numBytes); + } else { + out.write(getBytes()); + } + } + /** * Returns the number of bytes for a code point with the first byte as `b` * @param b The first byte of a code point diff --git a/common/unsafe/src/test/java/org/apache/spark/unsafe/types/UTF8StringSuite.java b/common/unsafe/src/test/java/org/apache/spark/unsafe/types/UTF8StringSuite.java index 7f03686dce..04f684577d 100644 --- a/common/unsafe/src/test/java/org/apache/spark/unsafe/types/UTF8StringSuite.java +++ b/common/unsafe/src/test/java/org/apache/spark/unsafe/types/UTF8StringSuite.java @@ -17,15 +17,22 @@ package org.apache.spark.unsafe.types; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.ByteOrder; import java.nio.charset.StandardCharsets; import java.util.Arrays; import java.util.HashMap; +import java.util.HashSet; import com.google.common.collect.ImmutableMap; +import org.apache.spark.unsafe.Platform; import org.junit.Test; import static org.junit.Assert.*; +import static org.apache.spark.unsafe.Platform.BYTE_ARRAY_OFFSET; import static org.apache.spark.unsafe.types.UTF8String.*; public class UTF8StringSuite { @@ -499,4 +506,106 @@ public class UTF8StringSuite { assertEquals(fromString("123").soundex(), fromString("123")); assertEquals(fromString("世界千世").soundex(), fromString("世界千世")); } + + @Test + public void writeToOutputStreamUnderflow() throws IOException { + // offset underflow is apparently supported? + final ByteArrayOutputStream outputStream = new ByteArrayOutputStream(); + final byte[] test = "01234567".getBytes(StandardCharsets.UTF_8); + + for (int i = 1; i <= Platform.BYTE_ARRAY_OFFSET; ++i) { + UTF8String.fromAddress(test, Platform.BYTE_ARRAY_OFFSET - i, test.length + i) + .writeTo(outputStream); + final ByteBuffer buffer = ByteBuffer.wrap(outputStream.toByteArray(), i, test.length); + assertEquals("01234567", StandardCharsets.UTF_8.decode(buffer).toString()); + outputStream.reset(); + } + } + + @Test + public void writeToOutputStreamSlice() throws IOException { + final ByteArrayOutputStream outputStream = new ByteArrayOutputStream(); + final byte[] test = "01234567".getBytes(StandardCharsets.UTF_8); + + for (int i = 0; i < test.length; ++i) { + for (int j = 0; j < test.length - i; ++j) { + UTF8String.fromAddress(test, Platform.BYTE_ARRAY_OFFSET + i, j) + .writeTo(outputStream); + + assertArrayEquals(Arrays.copyOfRange(test, i, i + j), outputStream.toByteArray()); + outputStream.reset(); + } + } + } + + @Test + public void writeToOutputStreamOverflow() throws IOException { + final ByteArrayOutputStream outputStream = new ByteArrayOutputStream(); + final byte[] test = "01234567".getBytes(StandardCharsets.UTF_8); + + final HashSet offsets = new HashSet<>(); + for (int i = 0; i < 16; ++i) { + // touch more points around MAX_VALUE + offsets.add((long) Integer.MAX_VALUE - i); + // subtract off BYTE_ARRAY_OFFSET to avoid wrapping around to a negative value, + // which will hit the slower copy path instead of the optimized one + offsets.add(Long.MAX_VALUE - BYTE_ARRAY_OFFSET - i); + } + + for (long i = 1; i > 0L; i <<= 1) { + for (long j = 0; j < 32L; ++j) { + offsets.add(i + j); + } + } + + for (final long offset : offsets) { + try { + fromAddress(test, BYTE_ARRAY_OFFSET + offset, test.length) + .writeTo(outputStream); + + throw new IllegalStateException(Long.toString(offset)); + } catch (ArrayIndexOutOfBoundsException e) { + // ignore + } finally { + outputStream.reset(); + } + } + } + + @Test + public void writeToOutputStream() throws IOException { + final ByteArrayOutputStream outputStream = new ByteArrayOutputStream(); + EMPTY_UTF8.writeTo(outputStream); + assertEquals("", outputStream.toString("UTF-8")); + outputStream.reset(); + + fromString("数据砖很重").writeTo(outputStream); + assertEquals( + "数据砖很重", + outputStream.toString("UTF-8")); + outputStream.reset(); + } + + @Test + public void writeToOutputStreamIntArray() throws IOException { + // verify that writes work on objects that are not byte arrays + final ByteBuffer buffer = StandardCharsets.UTF_8.encode("大千世界"); + buffer.position(0); + buffer.order(ByteOrder.LITTLE_ENDIAN); + + final int length = buffer.limit(); + assertEquals(12, length); + + final int ints = length / 4; + final int[] array = new int[ints]; + + for (int i = 0; i < ints; ++i) { + array[i] = buffer.getInt(); + } + + final ByteArrayOutputStream outputStream = new ByteArrayOutputStream(); + fromAddress(array, Platform.INT_ARRAY_OFFSET, length) + .writeTo(outputStream); + assertEquals("大千世界", outputStream.toString("UTF-8")); + } } -- cgit v1.2.3