aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorTakeshi YAMAMURO <linguin.m.s@gmail.com>2015-10-01 21:33:27 -0400
committerReynold Xin <rxin@databricks.com>2015-10-01 21:33:27 -0400
commit2272962eb087ffedaee12c761506e33e45bd0239 (patch)
tree879167d8dc582986f685ec2338d7696e3317319d
parent01cd688f5245cbb752863100b399b525b31c3510 (diff)
downloadspark-2272962eb087ffedaee12c761506e33e45bd0239.tar.gz
spark-2272962eb087ffedaee12c761506e33e45bd0239.tar.bz2
spark-2272962eb087ffedaee12c761506e33e45bd0239.zip
[SPARK-9867] [SQL] Move utilities for binary data into ByteArray
Utilities for binary data, such as Substring#subStringBinarySQL and BinaryPrefixComparator#computePrefix, are consolidated in ByteArray for readability. Author: Takeshi YAMAMURO <linguin.m.s@gmail.com> Closes #8122 from maropu/CleanUpForBinaryType.
-rw-r--r--core/src/main/java/org/apache/spark/util/collection/unsafe/sort/PrefixComparators.java17
-rw-r--r--sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/stringExpressions.scala39
-rw-r--r--unsafe/src/main/java/org/apache/spark/unsafe/types/ByteArray.java47
3 files changed, 52 insertions, 51 deletions
diff --git a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/PrefixComparators.java b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/PrefixComparators.java
index 71b76d5ddf..d2bf297c6c 100644
--- a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/PrefixComparators.java
+++ b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/PrefixComparators.java
@@ -21,6 +21,7 @@ import com.google.common.primitives.UnsignedLongs;
import org.apache.spark.annotation.Private;
import org.apache.spark.unsafe.Platform;
+import org.apache.spark.unsafe.types.ByteArray;
import org.apache.spark.unsafe.types.UTF8String;
import org.apache.spark.util.Utils;
@@ -62,21 +63,7 @@ public class PrefixComparators {
}
public static long computePrefix(byte[] bytes) {
- if (bytes == null) {
- return 0L;
- } else {
- /**
- * TODO: If a wrapper for BinaryType is created (SPARK-8786),
- * these codes below will be in the wrapper class.
- */
- final int minLen = Math.min(bytes.length, 8);
- long p = 0;
- for (int i = 0; i < minLen; ++i) {
- p |= (128L + Platform.getByte(bytes, Platform.BYTE_ARRAY_OFFSET + i))
- << (56 - 8 * i);
- }
- return p;
- }
+ return ByteArray.getPrefix(bytes);
}
}
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/stringExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/stringExpressions.scala
index a09d5b6e3a..4ab27c044f 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/stringExpressions.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/stringExpressions.scala
@@ -18,14 +18,12 @@
package org.apache.spark.sql.catalyst.expressions
import java.text.DecimalFormat
-import java.util.Arrays
-import java.util.{Map => JMap, HashMap}
-import java.util.Locale
+import java.util.{HashMap, Locale, Map => JMap}
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.codegen._
import org.apache.spark.sql.types._
-import org.apache.spark.unsafe.types.UTF8String
+import org.apache.spark.unsafe.types.{ByteArray, UTF8String}
////////////////////////////////////////////////////////////////////////////////////////////////////
// This file defines expressions for string operations.
@@ -690,34 +688,6 @@ case class StringSpace(child: Expression)
override def prettyName: String = "space"
}
-object Substring {
- def subStringBinarySQL(bytes: Array[Byte], pos: Int, len: Int): Array[Byte] = {
- if (pos > bytes.length) {
- return Array[Byte]()
- }
-
- var start = if (pos > 0) {
- pos - 1
- } else if (pos < 0) {
- bytes.length + pos
- } else {
- 0
- }
-
- val end = if ((bytes.length - start) < len) {
- bytes.length
- } else {
- start + len
- }
-
- start = Math.max(start, 0) // underflow
- if (start < end) {
- Arrays.copyOfRange(bytes, start, end)
- } else {
- Array[Byte]()
- }
- }
-}
/**
* A function that takes a substring of its first argument starting at a given position.
* Defined for String and Binary types.
@@ -740,18 +710,17 @@ case class Substring(str: Expression, pos: Expression, len: Expression)
str.dataType match {
case StringType => string.asInstanceOf[UTF8String]
.substringSQL(pos.asInstanceOf[Int], len.asInstanceOf[Int])
- case BinaryType => Substring.subStringBinarySQL(string.asInstanceOf[Array[Byte]],
+ case BinaryType => ByteArray.subStringSQL(string.asInstanceOf[Array[Byte]],
pos.asInstanceOf[Int], len.asInstanceOf[Int])
}
}
override def genCode(ctx: CodeGenContext, ev: GeneratedExpressionCode): String = {
- val cls = classOf[Substring].getName
defineCodeGen(ctx, ev, (string, pos, len) => {
str.dataType match {
case StringType => s"$string.substringSQL($pos, $len)"
- case BinaryType => s"$cls.subStringBinarySQL($string, $pos, $len)"
+ case BinaryType => s"${classOf[ByteArray].getName}.subStringSQL($string, $pos, $len)"
}
})
}
diff --git a/unsafe/src/main/java/org/apache/spark/unsafe/types/ByteArray.java b/unsafe/src/main/java/org/apache/spark/unsafe/types/ByteArray.java
index c08c9c73d2..3ced2094f5 100644
--- a/unsafe/src/main/java/org/apache/spark/unsafe/types/ByteArray.java
+++ b/unsafe/src/main/java/org/apache/spark/unsafe/types/ByteArray.java
@@ -19,7 +19,11 @@ package org.apache.spark.unsafe.types;
import org.apache.spark.unsafe.Platform;
-public class ByteArray {
+import java.util.Arrays;
+
+public final class ByteArray {
+
+ public static final byte[] EMPTY_BYTE = new byte[0];
/**
* Writes the content of a byte array into a memory address, identified by an object and an
@@ -29,4 +33,45 @@ public class ByteArray {
public static void writeToMemory(byte[] src, Object target, long targetOffset) {
Platform.copyMemory(src, Platform.BYTE_ARRAY_OFFSET, target, targetOffset, src.length);
}
+
+ /**
+ * Returns a 64-bit integer that can be used as the prefix used in sorting.
+ */
+ public static long getPrefix(byte[] bytes) {
+ if (bytes == null) {
+ return 0L;
+ } else {
+ final int minLen = Math.min(bytes.length, 8);
+ long p = 0;
+ for (int i = 0; i < minLen; ++i) {
+ p |= (128L + Platform.getByte(bytes, Platform.BYTE_ARRAY_OFFSET + i))
+ << (56 - 8 * i);
+ }
+ return p;
+ }
+ }
+
+ public static byte[] subStringSQL(byte[] bytes, int pos, int len) {
+ // This pos calculation follows UTF8String#substringSQL
+ if (pos > bytes.length) {
+ return EMPTY_BYTE;
+ }
+ int start = 0;
+ int end;
+ if (pos > 0) {
+ start = pos - 1;
+ } else if (pos < 0) {
+ start = bytes.length + pos;
+ }
+ if ((bytes.length - start) < len) {
+ end = bytes.length;
+ } else {
+ end = start + len;
+ }
+ start = Math.max(start, 0); // underflow
+ if (start >= end) {
+ return EMPTY_BYTE;
+ }
+ return Arrays.copyOfRange(bytes, start, end);
+ }
}