aboutsummaryrefslogtreecommitdiff
path: root/mllib
diff options
context:
space:
mode:
authorDavies Liu <davies@databricks.com>2015-07-08 18:22:53 -0700
committerDavies Liu <davies.liu@gmail.com>2015-07-08 18:22:53 -0700
commit74d8d3d928cc9a7386b68588ac89ae042847d146 (patch)
tree0248cc711322eb4a7a6966e9cfbf3a90ca886733 /mllib
parent2a4f88b6c16f2991e63b17c0e103bcd79f04dbbc (diff)
downloadspark-74d8d3d928cc9a7386b68588ac89ae042847d146.tar.gz
spark-74d8d3d928cc9a7386b68588ac89ae042847d146.tar.bz2
spark-74d8d3d928cc9a7386b68588ac89ae042847d146.zip
[SPARK-8450] [SQL] [PYSPARK] cleanup type converter for Python DataFrame
This PR fixes the converter for Python DataFrame, especially for DecimalType Closes #7106 Author: Davies Liu <davies@databricks.com> Closes #7131 from davies/decimal_python and squashes the following commits: 4d3c234 [Davies Liu] Merge branch 'master' of github.com:apache/spark into decimal_python 20531d6 [Davies Liu] Merge branch 'master' of github.com:apache/spark into decimal_python 7d73168 [Davies Liu] fix conflict 6cdd86a [Davies Liu] Merge branch 'master' of github.com:apache/spark into decimal_python 7104e97 [Davies Liu] improve type infer 9cd5a21 [Davies Liu] run python tests with SPARK_PREPEND_CLASSES 829a05b [Davies Liu] fix UDT in python c99e8c5 [Davies Liu] fix mima c46814a [Davies Liu] convert decimal for Python DataFrames
Diffstat (limited to 'mllib')
-rw-r--r--mllib/src/main/scala/org/apache/spark/mllib/linalg/Matrices.scala10
-rw-r--r--mllib/src/main/scala/org/apache/spark/mllib/linalg/Vectors.scala16
2 files changed, 7 insertions, 19 deletions
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/linalg/Matrices.scala b/mllib/src/main/scala/org/apache/spark/mllib/linalg/Matrices.scala
index 75e7004464..0df0766340 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/linalg/Matrices.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/linalg/Matrices.scala
@@ -24,9 +24,9 @@ import scala.collection.mutable.{ArrayBuilder => MArrayBuilder, HashSet => MHash
import breeze.linalg.{CSCMatrix => BSM, DenseMatrix => BDM, Matrix => BM}
import org.apache.spark.annotation.DeveloperApi
-import org.apache.spark.sql.Row
-import org.apache.spark.sql.types._
import org.apache.spark.sql.catalyst.expressions.GenericMutableRow
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.types._
/**
* Trait for a local matrix.
@@ -147,7 +147,7 @@ private[spark] class MatrixUDT extends UserDefinedType[Matrix] {
))
}
- override def serialize(obj: Any): Row = {
+ override def serialize(obj: Any): InternalRow = {
val row = new GenericMutableRow(7)
obj match {
case sm: SparseMatrix =>
@@ -173,9 +173,7 @@ private[spark] class MatrixUDT extends UserDefinedType[Matrix] {
override def deserialize(datum: Any): Matrix = {
datum match {
- // TODO: something wrong with UDT serialization, should never happen.
- case m: Matrix => m
- case row: Row =>
+ case row: InternalRow =>
require(row.length == 7,
s"MatrixUDT.deserialize given row with length ${row.length} but requires length == 7")
val tpe = row.getByte(0)
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/linalg/Vectors.scala b/mllib/src/main/scala/org/apache/spark/mllib/linalg/Vectors.scala
index c9c27425d2..e048b01d92 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/linalg/Vectors.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/linalg/Vectors.scala
@@ -28,7 +28,7 @@ import breeze.linalg.{DenseVector => BDV, SparseVector => BSV, Vector => BV}
import org.apache.spark.SparkException
import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.mllib.util.NumericParser
-import org.apache.spark.sql.Row
+import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.GenericMutableRow
import org.apache.spark.sql.types._
@@ -175,7 +175,7 @@ private[spark] class VectorUDT extends UserDefinedType[Vector] {
StructField("values", ArrayType(DoubleType, containsNull = false), nullable = true)))
}
- override def serialize(obj: Any): Row = {
+ override def serialize(obj: Any): InternalRow = {
obj match {
case SparseVector(size, indices, values) =>
val row = new GenericMutableRow(4)
@@ -191,17 +191,12 @@ private[spark] class VectorUDT extends UserDefinedType[Vector] {
row.setNullAt(2)
row.update(3, values.toSeq)
row
- // TODO: There are bugs in UDT serialization because we don't have a clear separation between
- // TODO: internal SQL types and language specific types (including UDT). UDT serialize and
- // TODO: deserialize may get called twice. See SPARK-7186.
- case row: Row =>
- row
}
}
override def deserialize(datum: Any): Vector = {
datum match {
- case row: Row =>
+ case row: InternalRow =>
require(row.length == 4,
s"VectorUDT.deserialize given row with length ${row.length} but requires length == 4")
val tpe = row.getByte(0)
@@ -215,11 +210,6 @@ private[spark] class VectorUDT extends UserDefinedType[Vector] {
val values = row.getAs[Iterable[Double]](3).toArray
new DenseVector(values)
}
- // TODO: There are bugs in UDT serialization because we don't have a clear separation between
- // TODO: internal SQL types and language specific types (including UDT). UDT serialize and
- // TODO: deserialize may get called twice. See SPARK-7186.
- case v: Vector =>
- v
}
}