author     Davies Liu <davies@databricks.com>       2015-07-08 18:22:53 -0700
committer  Davies Liu <davies.liu@gmail.com>        2015-07-08 18:22:53 -0700
commit     74d8d3d928cc9a7386b68588ac89ae042847d146
tree       0248cc711322eb4a7a6966e9cfbf3a90ca886733 /mllib
parent     2a4f88b6c16f2991e63b17c0e103bcd79f04dbbc
[SPARK-8450] [SQL] [PYSPARK] cleanup type converter for Python DataFrame
This PR cleans up the type converter for Python DataFrames, in particular fixing the conversion of DecimalType values.
Closes #7106
Author: Davies Liu <davies@databricks.com>
Closes #7131 from davies/decimal_python and squashes the following commits:
4d3c234 [Davies Liu] Merge branch 'master' of github.com:apache/spark into decimal_python
20531d6 [Davies Liu] Merge branch 'master' of github.com:apache/spark into decimal_python
7d73168 [Davies Liu] fix conflict
6cdd86a [Davies Liu] Merge branch 'master' of github.com:apache/spark into decimal_python
7104e97 [Davies Liu] improve type infer
9cd5a21 [Davies Liu] run python tests with SPARK_PREPEND_CLASSES
829a05b [Davies Liu] fix UDT in python
c99e8c5 [Davies Liu] fix mima
c46814a [Davies Liu] convert decimal for Python DataFrames
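On the mllib side the change is mechanical: MatrixUDT and VectorUDT now serialize straight to Catalyst's InternalRow and match only on InternalRow when deserializing, which lets the defensive case m: Matrix / case v: Vector fallbacks (the SPARK-7186 double-serialization workaround) be deleted. A minimal sketch of the resulting pattern, using a hypothetical PointUDT that is not part of this commit and assuming the 1.5-era UserDefinedType API:

import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.GenericMutableRow
import org.apache.spark.sql.types._

// Hypothetical user type, for illustration only.
case class Point(x: Double, y: Double)

private[spark] class PointUDT extends UserDefinedType[Point] {

  override def sqlType: StructType = StructType(Seq(
    StructField("x", DoubleType, nullable = false),
    StructField("y", DoubleType, nullable = false)))

  // As in this commit: serialize builds the internal representation directly.
  override def serialize(obj: Any): InternalRow = obj match {
    case Point(x, y) =>
      val row = new GenericMutableRow(2)
      row.setDouble(0, x)
      row.setDouble(1, y)
      row
  }

  // Only InternalRow needs handling; no `case p: Point => p` fallback.
  override def deserialize(datum: Any): Point = datum match {
    case row: InternalRow =>
      require(row.length == 2,
        s"PointUDT.deserialize given row with length ${row.length} but requires length == 2")
      Point(row.getDouble(0), row.getDouble(1))
  }

  override def userClass: Class[Point] = classOf[Point]
}

A round trip is then udt.deserialize(udt.serialize(Point(1.0, 2.0))), with no risk of serialize being handed an already-serialized row.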
Diffstat (limited to 'mllib')
-rw-r--r--  mllib/src/main/scala/org/apache/spark/mllib/linalg/Matrices.scala | 10
-rw-r--r--  mllib/src/main/scala/org/apache/spark/mllib/linalg/Vectors.scala  | 16

2 files changed, 7 insertions, 19 deletions
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/linalg/Matrices.scala b/mllib/src/main/scala/org/apache/spark/mllib/linalg/Matrices.scala
index 75e7004464..0df0766340 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/linalg/Matrices.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/linalg/Matrices.scala
@@ -24,9 +24,9 @@ import scala.collection.mutable.{ArrayBuilder => MArrayBuilder, HashSet => MHash
 import breeze.linalg.{CSCMatrix => BSM, DenseMatrix => BDM, Matrix => BM}
 
 import org.apache.spark.annotation.DeveloperApi
-import org.apache.spark.sql.Row
-import org.apache.spark.sql.types._
 import org.apache.spark.sql.catalyst.expressions.GenericMutableRow
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.types._
 
 /**
  * Trait for a local matrix.
@@ -147,7 +147,7 @@ private[spark] class MatrixUDT extends UserDefinedType[Matrix] {
     ))
   }
 
-  override def serialize(obj: Any): Row = {
+  override def serialize(obj: Any): InternalRow = {
     val row = new GenericMutableRow(7)
     obj match {
       case sm: SparseMatrix =>
@@ -173,9 +173,7 @@ private[spark] class MatrixUDT extends UserDefinedType[Matrix] {
 
   override def deserialize(datum: Any): Matrix = {
     datum match {
-      // TODO: something wrong with UDT serialization, should never happen.
-      case m: Matrix => m
-      case row: Row =>
+      case row: InternalRow =>
         require(row.length == 7,
           s"MatrixUDT.deserialize given row with length ${row.length} but requires length == 7")
         val tpe = row.getByte(0)
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/linalg/Vectors.scala b/mllib/src/main/scala/org/apache/spark/mllib/linalg/Vectors.scala
index c9c27425d2..e048b01d92 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/linalg/Vectors.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/linalg/Vectors.scala
@@ -28,7 +28,7 @@ import breeze.linalg.{DenseVector => BDV, SparseVector => BSV, Vector => BV}
 import org.apache.spark.SparkException
 import org.apache.spark.annotation.DeveloperApi
 import org.apache.spark.mllib.util.NumericParser
-import org.apache.spark.sql.Row
+import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.GenericMutableRow
 import org.apache.spark.sql.types._
 
@@ -175,7 +175,7 @@ private[spark] class VectorUDT extends UserDefinedType[Vector] {
         StructField("values", ArrayType(DoubleType, containsNull = false), nullable = true)))
   }
 
-  override def serialize(obj: Any): Row = {
+  override def serialize(obj: Any): InternalRow = {
     obj match {
       case SparseVector(size, indices, values) =>
         val row = new GenericMutableRow(4)
@@ -191,17 +191,12 @@ private[spark] class VectorUDT extends UserDefinedType[Vector] {
         row.setNullAt(2)
         row.update(3, values.toSeq)
         row
-      // TODO: There are bugs in UDT serialization because we don't have a clear separation between
-      // TODO: internal SQL types and language specific types (including UDT). UDT serialize and
-      // TODO: deserialize may get called twice. See SPARK-7186.
-      case row: Row =>
-        row
     }
   }
 
   override def deserialize(datum: Any): Vector = {
     datum match {
-      case row: Row =>
+      case row: InternalRow =>
         require(row.length == 4,
           s"VectorUDT.deserialize given row with length ${row.length} but requires length == 4")
         val tpe = row.getByte(0)
@@ -215,11 +210,6 @@ private[spark] class VectorUDT extends UserDefinedType[Vector] {
             val values = row.getAs[Iterable[Double]](3).toArray
             new DenseVector(values)
         }
-      // TODO: There are bugs in UDT serialization because we don't have a clear separation between
-      // TODO: internal SQL types and language specific types (including UDT). UDT serialize and
-      // TODO: deserialize may get called twice. See SPARK-7186.
-      case v: Vector =>
-        v
     }
   }
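The diffstat above is limited to mllib, so the Python converter work named in the title is not visible in this hunk; it lives in the SQL/PySpark bridge. The gist is that values arriving from Python, in particular decimal.Decimal (which crosses to the JVM as java.math.BigDecimal), must be normalized to Catalyst's internal Decimal type. A rough, hypothetical sketch of that kind of external-to-internal conversion (helper name invented, not the code from this commit):

import org.apache.spark.sql.types.Decimal

// Hypothetical helper: normalize an incoming decimal value (e.g. a Python
// decimal.Decimal unpickled on the JVM as java.math.BigDecimal) into
// Catalyst's internal Decimal representation.
def toCatalystDecimal(value: Any): Decimal = value match {
  case d: java.math.BigDecimal  => Decimal(d)
  case d: scala.math.BigDecimal => Decimal(d)
  case d: Decimal               => d
}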