| author | Liang-Chi Hsieh <simonh@tw.ibm.com> | 2016-06-13 19:59:53 -0700 |
|---|---|---|
| committer | Xiangrui Meng <meng@databricks.com> | 2016-06-13 19:59:53 -0700 |
| commit | baa3e633e18c47b12e79fe3ddc01fc8ec010f096 (patch) | |
| tree | 83c91014b9d46fc9efc4bf1f5dcef5cee1fe184a /mllib/src/main | |
| parent | 5827b65e28da168286c771c53a38620d79f5e74f (diff) | |
[SPARK-15364][ML][PYSPARK] Implement PySpark picklers for ml.Vector and ml.Matrix under spark.ml.python
## What changes were proposed in this pull request?
We currently have separate PySpark picklers for the new (`spark.ml`) and old (`spark.mllib`) vector/matrix classes, but all of them are implemented in `PythonMLlibAPI`. To keep spark.ml decoupled from spark.mllib, the picklers for the new vector/matrix classes should be implemented under `spark.ml.python` instead.
## How was this patch tested?
Existing tests.
Author: Liang-Chi Hsieh <simonh@tw.ibm.com>
Closes #13219 from viirya/pyspark-pickler-ml.
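For orientation before reading the diff: the refactor extracts the shared pickler plumbing into an abstract `SerDeBase`, and each Python-facing package supplies its own module prefix and pickler registrations. The sketch below is a condensed, simplified view of the classes in the diff — the real `SerDeBase` also carries `dumps`/`loads` and the RDD conversion helpers — not a drop-in replacement.

```scala
// Sketch: the pickler plumbing shared by both packages moves into an
// abstract base, and each package supplies its Python module prefix.
private[spark] abstract class SerDeBase {
  // Python package the picklers map to ("pyspark.mllib" or "pyspark.ml")
  val PYSPARK_PACKAGE: String
  // Registers this side's picklers; must also run on executors before use.
  def initialize(): Unit
}

// Old-API picklers stay in spark.mllib under PythonMLLibAPI...
private[spark] object SerDe extends SerDeBase with Serializable {
  override val PYSPARK_PACKAGE = "pyspark.mllib"
  override def initialize(): Unit = { /* register mllib picklers */ }
}

// ...while the new ml.linalg picklers move under spark.ml.python.
private[spark] object MLSerDe extends SerDeBase with Serializable {
  override val PYSPARK_PACKAGE = "pyspark.ml"
  override def initialize(): Unit = { /* register ml picklers */ }
}
```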
Diffstat (limited to 'mllib/src/main')
| -rw-r--r-- | mllib/src/main/scala/org/apache/spark/ml/python/MLSerDe.scala | 224 |
| -rw-r--r-- | mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala | 309 |
2 files changed, 292 insertions, 241 deletions
```diff
diff --git a/mllib/src/main/scala/org/apache/spark/ml/python/MLSerDe.scala b/mllib/src/main/scala/org/apache/spark/ml/python/MLSerDe.scala
new file mode 100644
index 0000000000..1279c901c5
--- /dev/null
+++ b/mllib/src/main/scala/org/apache/spark/ml/python/MLSerDe.scala
@@ -0,0 +1,224 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.ml.python
+
+import java.io.OutputStream
+import java.nio.{ByteBuffer, ByteOrder}
+import java.util.{ArrayList => JArrayList}
+
+import scala.collection.JavaConverters._
+
+import net.razorvine.pickle._
+
+import org.apache.spark.api.java.JavaRDD
+import org.apache.spark.api.python.SerDeUtil
+import org.apache.spark.ml.linalg._
+import org.apache.spark.mllib.api.python.SerDeBase
+import org.apache.spark.rdd.RDD
+
+/**
+ * SerDe utility functions for pyspark.ml.
+ */
+private[spark] object MLSerDe extends SerDeBase with Serializable {
+
+  override val PYSPARK_PACKAGE = "pyspark.ml"
+
+  // Pickler for DenseVector
+  private[python] class DenseVectorPickler extends BasePickler[DenseVector] {
+
+    def saveState(obj: Object, out: OutputStream, pickler: Pickler): Unit = {
+      val vector: DenseVector = obj.asInstanceOf[DenseVector]
+      val bytes = new Array[Byte](8 * vector.size)
+      val bb = ByteBuffer.wrap(bytes)
+      bb.order(ByteOrder.nativeOrder())
+      val db = bb.asDoubleBuffer()
+      db.put(vector.values)
+
+      out.write(Opcodes.BINSTRING)
+      out.write(PickleUtils.integer_to_bytes(bytes.length))
+      out.write(bytes)
+      out.write(Opcodes.TUPLE1)
+    }
+
+    def construct(args: Array[Object]): Object = {
+      require(args.length == 1)
+      if (args.length != 1) {
+        throw new PickleException("should be 1")
+      }
+      val bytes = getBytes(args(0))
+      val bb = ByteBuffer.wrap(bytes, 0, bytes.length)
+      bb.order(ByteOrder.nativeOrder())
+      val db = bb.asDoubleBuffer()
+      val ans = new Array[Double](bytes.length / 8)
+      db.get(ans)
+      Vectors.dense(ans)
+    }
+  }
+
+  // Pickler for DenseMatrix
+  private[python] class DenseMatrixPickler extends BasePickler[DenseMatrix] {
+
+    def saveState(obj: Object, out: OutputStream, pickler: Pickler): Unit = {
+      val m: DenseMatrix = obj.asInstanceOf[DenseMatrix]
+      val bytes = new Array[Byte](8 * m.values.length)
+      val order = ByteOrder.nativeOrder()
+      val isTransposed = if (m.isTransposed) 1 else 0
+      ByteBuffer.wrap(bytes).order(order).asDoubleBuffer().put(m.values)
+
+      out.write(Opcodes.MARK)
+      out.write(Opcodes.BININT)
+      out.write(PickleUtils.integer_to_bytes(m.numRows))
+      out.write(Opcodes.BININT)
+      out.write(PickleUtils.integer_to_bytes(m.numCols))
+      out.write(Opcodes.BINSTRING)
+      out.write(PickleUtils.integer_to_bytes(bytes.length))
+      out.write(bytes)
+      out.write(Opcodes.BININT)
+      out.write(PickleUtils.integer_to_bytes(isTransposed))
+      out.write(Opcodes.TUPLE)
+    }
+
+    def construct(args: Array[Object]): Object = {
+      if (args.length != 4) {
+        throw new PickleException("should be 4")
+      }
+      val bytes = getBytes(args(2))
+      val n = bytes.length / 8
+      val values = new Array[Double](n)
+      val order = ByteOrder.nativeOrder()
+      ByteBuffer.wrap(bytes).order(order).asDoubleBuffer().get(values)
+      val isTransposed = args(3).asInstanceOf[Int] == 1
+      new DenseMatrix(args(0).asInstanceOf[Int], args(1).asInstanceOf[Int], values, isTransposed)
+    }
+  }
+
+  // Pickler for SparseMatrix
+  private[python] class SparseMatrixPickler extends BasePickler[SparseMatrix] {
+
+    def saveState(obj: Object, out: OutputStream, pickler: Pickler): Unit = {
+      val s = obj.asInstanceOf[SparseMatrix]
+      val order = ByteOrder.nativeOrder()
+
+      val colPtrsBytes = new Array[Byte](4 * s.colPtrs.length)
+      val indicesBytes = new Array[Byte](4 * s.rowIndices.length)
+      val valuesBytes = new Array[Byte](8 * s.values.length)
+      val isTransposed = if (s.isTransposed) 1 else 0
+      ByteBuffer.wrap(colPtrsBytes).order(order).asIntBuffer().put(s.colPtrs)
+      ByteBuffer.wrap(indicesBytes).order(order).asIntBuffer().put(s.rowIndices)
+      ByteBuffer.wrap(valuesBytes).order(order).asDoubleBuffer().put(s.values)
+
+      out.write(Opcodes.MARK)
+      out.write(Opcodes.BININT)
+      out.write(PickleUtils.integer_to_bytes(s.numRows))
+      out.write(Opcodes.BININT)
+      out.write(PickleUtils.integer_to_bytes(s.numCols))
+      out.write(Opcodes.BINSTRING)
+      out.write(PickleUtils.integer_to_bytes(colPtrsBytes.length))
+      out.write(colPtrsBytes)
+      out.write(Opcodes.BINSTRING)
+      out.write(PickleUtils.integer_to_bytes(indicesBytes.length))
+      out.write(indicesBytes)
+      out.write(Opcodes.BINSTRING)
+      out.write(PickleUtils.integer_to_bytes(valuesBytes.length))
+      out.write(valuesBytes)
+      out.write(Opcodes.BININT)
+      out.write(PickleUtils.integer_to_bytes(isTransposed))
+      out.write(Opcodes.TUPLE)
+    }
+
+    def construct(args: Array[Object]): Object = {
+      if (args.length != 6) {
+        throw new PickleException("should be 6")
+      }
+      val order = ByteOrder.nativeOrder()
+      val colPtrsBytes = getBytes(args(2))
+      val indicesBytes = getBytes(args(3))
+      val valuesBytes = getBytes(args(4))
+      val colPtrs = new Array[Int](colPtrsBytes.length / 4)
+      val rowIndices = new Array[Int](indicesBytes.length / 4)
+      val values = new Array[Double](valuesBytes.length / 8)
+      ByteBuffer.wrap(colPtrsBytes).order(order).asIntBuffer().get(colPtrs)
+      ByteBuffer.wrap(indicesBytes).order(order).asIntBuffer().get(rowIndices)
+      ByteBuffer.wrap(valuesBytes).order(order).asDoubleBuffer().get(values)
+      val isTransposed = args(5).asInstanceOf[Int] == 1
+      new SparseMatrix(
+        args(0).asInstanceOf[Int], args(1).asInstanceOf[Int], colPtrs, rowIndices, values,
+        isTransposed)
+    }
+  }
+
+  // Pickler for SparseVector
+  private[python] class SparseVectorPickler extends BasePickler[SparseVector] {
+
+    def saveState(obj: Object, out: OutputStream, pickler: Pickler): Unit = {
+      val v: SparseVector = obj.asInstanceOf[SparseVector]
+      val n = v.indices.length
+      val indiceBytes = new Array[Byte](4 * n)
+      val order = ByteOrder.nativeOrder()
+      ByteBuffer.wrap(indiceBytes).order(order).asIntBuffer().put(v.indices)
+      val valueBytes = new Array[Byte](8 * n)
+      ByteBuffer.wrap(valueBytes).order(order).asDoubleBuffer().put(v.values)
+
+      out.write(Opcodes.BININT)
+      out.write(PickleUtils.integer_to_bytes(v.size))
+      out.write(Opcodes.BINSTRING)
+      out.write(PickleUtils.integer_to_bytes(indiceBytes.length))
+      out.write(indiceBytes)
+      out.write(Opcodes.BINSTRING)
+      out.write(PickleUtils.integer_to_bytes(valueBytes.length))
+      out.write(valueBytes)
+      out.write(Opcodes.TUPLE3)
+    }
+
+    def construct(args: Array[Object]): Object = {
+      if (args.length != 3) {
+        throw new PickleException("should be 3")
+      }
+      val size = args(0).asInstanceOf[Int]
+      val indiceBytes = getBytes(args(1))
+      val valueBytes = getBytes(args(2))
+      val n = indiceBytes.length / 4
+      val indices = new Array[Int](n)
+      val values = new Array[Double](n)
+      if (n > 0) {
+        val order = ByteOrder.nativeOrder()
+        ByteBuffer.wrap(indiceBytes).order(order).asIntBuffer().get(indices)
+        ByteBuffer.wrap(valueBytes).order(order).asDoubleBuffer().get(values)
+      }
+      new SparseVector(size, indices, values)
+    }
+  }
+
+  var initialized = false
+  // This should be called before trying to serialize any above classes
+  // In cluster mode, this should be put in the closure
+  override def initialize(): Unit = {
+    SerDeUtil.initialize()
+    synchronized {
+      if (!initialized) {
+        new DenseVectorPickler().register()
+        new DenseMatrixPickler().register()
+        new SparseMatrixPickler().register()
+        new SparseVectorPickler().register()
+        initialized = true
+      }
+    }
+  }
+  // will not called in Executor automatically
+  initialize()
+}
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala b/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala
index e43469bf1c..7df61601fb 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala
@@ -30,7 +30,6 @@ import net.razorvine.pickle._
 
 import org.apache.spark.api.java.{JavaRDD, JavaSparkContext}
 import org.apache.spark.api.python.SerDeUtil
-import org.apache.spark.ml.linalg.{DenseMatrix => NewDenseMatrix, DenseVector => NewDenseVector, SparseMatrix => NewSparseMatrix, SparseVector => NewSparseVector, Vectors => NewVectors}
 import org.apache.spark.mllib.classification._
 import org.apache.spark.mllib.clustering._
 import org.apache.spark.mllib.evaluation.RankingMetrics
@@ -1205,23 +1204,21 @@ private[python] class PythonMLLibAPI extends Serializable {
 }
 
 /**
- * SerDe utility functions for PythonMLLibAPI.
+ * Basic SerDe utility class.
  */
-private[spark] object SerDe extends Serializable {
+private[spark] abstract class SerDeBase {
 
-  val PYSPARK_PACKAGE = "pyspark.mllib"
-  val PYSPARK_ML_PACKAGE = "pyspark.ml"
+  val PYSPARK_PACKAGE: String
+  def initialize(): Unit
 
   /**
    * Base class used for pickle
    */
-  private[python] abstract class BasePickler[T: ClassTag]
+  private[spark] abstract class BasePickler[T: ClassTag]
     extends IObjectPickler with IObjectConstructor {
 
-    protected def packageName: String = PYSPARK_PACKAGE
-
     private val cls = implicitly[ClassTag[T]].runtimeClass
-    private val module = packageName + "." + cls.getName.split('.')(4)
+    private val module = PYSPARK_PACKAGE + "." + cls.getName.split('.')(4)
     private val name = cls.getSimpleName
 
     // register this to Pickler and Unpickler
@@ -1268,45 +1265,73 @@ private[spark] object SerDe extends Serializable {
     private[python] def saveState(obj: Object, out: OutputStream, pickler: Pickler)
   }
 
-  // Pickler for (mllib) DenseVector
-  private[python] class DenseVectorPickler extends BasePickler[DenseVector] {
+  def dumps(obj: AnyRef): Array[Byte] = {
+    obj match {
+      // Pickler in Python side cannot deserialize Scala Array normally. See SPARK-12834.
+      case array: Array[_] => new Pickler().dumps(array.toSeq.asJava)
+      case _ => new Pickler().dumps(obj)
+    }
+  }
 
-    def saveState(obj: Object, out: OutputStream, pickler: Pickler): Unit = {
-      val vector: DenseVector = obj.asInstanceOf[DenseVector]
-      val bytes = new Array[Byte](8 * vector.size)
-      val bb = ByteBuffer.wrap(bytes)
-      bb.order(ByteOrder.nativeOrder())
-      val db = bb.asDoubleBuffer()
-      db.put(vector.values)
+  def loads(bytes: Array[Byte]): AnyRef = {
+    new Unpickler().loads(bytes)
+  }
 
-      out.write(Opcodes.BINSTRING)
-      out.write(PickleUtils.integer_to_bytes(bytes.length))
-      out.write(bytes)
-      out.write(Opcodes.TUPLE1)
+  /* convert object into Tuple */
+  def asTupleRDD(rdd: RDD[Array[Any]]): RDD[(Int, Int)] = {
+    rdd.map(x => (x(0).asInstanceOf[Int], x(1).asInstanceOf[Int]))
+  }
+
+  /* convert RDD[Tuple2[,]] to RDD[Array[Any]] */
+  def fromTuple2RDD(rdd: RDD[(Any, Any)]): RDD[Array[Any]] = {
+    rdd.map(x => Array(x._1, x._2))
+  }
+
+  /**
+   * Convert an RDD of Java objects to an RDD of serialized Python objects, that is usable by
+   * PySpark.
+   */
+  def javaToPython(jRDD: JavaRDD[Any]): JavaRDD[Array[Byte]] = {
+    jRDD.rdd.mapPartitions { iter =>
+      initialize() // let it called in executor
+      new SerDeUtil.AutoBatchedPickler(iter)
     }
+  }
 
-    def construct(args: Array[Object]): Object = {
-      require(args.length == 1)
-      if (args.length != 1) {
-        throw new PickleException("should be 1")
+  /**
+   * Convert an RDD of serialized Python objects to RDD of objects, that is usable by PySpark.
+   */
+  def pythonToJava(pyRDD: JavaRDD[Array[Byte]], batched: Boolean): JavaRDD[Any] = {
+    pyRDD.rdd.mapPartitions { iter =>
+      initialize() // let it called in executor
+      val unpickle = new Unpickler
+      iter.flatMap { row =>
+        val obj = unpickle.loads(row)
+        if (batched) {
+          obj match {
+            case list: JArrayList[_] => list.asScala
+            case arr: Array[_] => arr
+          }
+        } else {
+          Seq(obj)
+        }
       }
-      val bytes = getBytes(args(0))
-      val bb = ByteBuffer.wrap(bytes, 0, bytes.length)
-      bb.order(ByteOrder.nativeOrder())
-      val db = bb.asDoubleBuffer()
-      val ans = new Array[Double](bytes.length / 8)
-      db.get(ans)
-      Vectors.dense(ans)
-    }
+    }.toJavaRDD()
   }
+}
 
-  // Pickler for (new) DenseVector
-  private[python] class NewDenseVectorPickler extends BasePickler[NewDenseVector] {
+/**
+ * SerDe utility functions for PythonMLLibAPI.
+ */
+private[spark] object SerDe extends SerDeBase with Serializable {
+
+  override val PYSPARK_PACKAGE = "pyspark.mllib"
 
-    override protected def packageName = PYSPARK_ML_PACKAGE
+  // Pickler for DenseVector
+  private[python] class DenseVectorPickler extends BasePickler[DenseVector] {
 
     def saveState(obj: Object, out: OutputStream, pickler: Pickler): Unit = {
-      val vector: NewDenseVector = obj.asInstanceOf[NewDenseVector]
+      val vector: DenseVector = obj.asInstanceOf[DenseVector]
       val bytes = new Array[Byte](8 * vector.size)
       val bb = ByteBuffer.wrap(bytes)
       bb.order(ByteOrder.nativeOrder())
@@ -1330,11 +1355,11 @@ private[spark] object SerDe extends Serializable {
       val db = bb.asDoubleBuffer()
       val ans = new Array[Double](bytes.length / 8)
       db.get(ans)
-      NewVectors.dense(ans)
+      Vectors.dense(ans)
     }
   }
 
-  // Pickler for (mllib) DenseMatrix
+  // Pickler for DenseMatrix
   private[python] class DenseMatrixPickler extends BasePickler[DenseMatrix] {
 
     def saveState(obj: Object, out: OutputStream, pickler: Pickler): Unit = {
@@ -1371,46 +1396,7 @@ private[spark] object SerDe extends Serializable {
     }
   }
 
-  // Pickler for (new) DenseMatrix
-  private[python] class NewDenseMatrixPickler extends BasePickler[NewDenseMatrix] {
-
-    override protected def packageName = PYSPARK_ML_PACKAGE
-
-    def saveState(obj: Object, out: OutputStream, pickler: Pickler): Unit = {
-      val m: NewDenseMatrix = obj.asInstanceOf[NewDenseMatrix]
-      val bytes = new Array[Byte](8 * m.values.length)
-      val order = ByteOrder.nativeOrder()
-      val isTransposed = if (m.isTransposed) 1 else 0
-      ByteBuffer.wrap(bytes).order(order).asDoubleBuffer().put(m.values)
-
-      out.write(Opcodes.MARK)
-      out.write(Opcodes.BININT)
-      out.write(PickleUtils.integer_to_bytes(m.numRows))
-      out.write(Opcodes.BININT)
-      out.write(PickleUtils.integer_to_bytes(m.numCols))
-      out.write(Opcodes.BINSTRING)
-      out.write(PickleUtils.integer_to_bytes(bytes.length))
-      out.write(bytes)
-      out.write(Opcodes.BININT)
-      out.write(PickleUtils.integer_to_bytes(isTransposed))
-      out.write(Opcodes.TUPLE)
-    }
-
-    def construct(args: Array[Object]): Object = {
-      if (args.length != 4) {
-        throw new PickleException("should be 4")
-      }
-      val bytes = getBytes(args(2))
-      val n = bytes.length / 8
-      val values = new Array[Double](n)
-      val order = ByteOrder.nativeOrder()
-      ByteBuffer.wrap(bytes).order(order).asDoubleBuffer().get(values)
-      val isTransposed = args(3).asInstanceOf[Int] == 1
-      new NewDenseMatrix(args(0).asInstanceOf[Int], args(1).asInstanceOf[Int], values, isTransposed)
-    }
-  }
-
-  // Pickler for (mllib) SparseMatrix
+  // Pickler for SparseMatrix
   private[python] class SparseMatrixPickler extends BasePickler[SparseMatrix] {
 
     def saveState(obj: Object, out: OutputStream, pickler: Pickler): Unit = {
@@ -1465,64 +1451,7 @@ private[spark] object SerDe extends Serializable {
     }
   }
 
-  // Pickler for (new) SparseMatrix
-  private[python] class NewSparseMatrixPickler extends BasePickler[NewSparseMatrix] {
-
-    override protected def packageName = PYSPARK_ML_PACKAGE
-
-    def saveState(obj: Object, out: OutputStream, pickler: Pickler): Unit = {
-      val s = obj.asInstanceOf[NewSparseMatrix]
-      val order = ByteOrder.nativeOrder()
-
-      val colPtrsBytes = new Array[Byte](4 * s.colPtrs.length)
-      val indicesBytes = new Array[Byte](4 * s.rowIndices.length)
-      val valuesBytes = new Array[Byte](8 * s.values.length)
-      val isTransposed = if (s.isTransposed) 1 else 0
-      ByteBuffer.wrap(colPtrsBytes).order(order).asIntBuffer().put(s.colPtrs)
-      ByteBuffer.wrap(indicesBytes).order(order).asIntBuffer().put(s.rowIndices)
-      ByteBuffer.wrap(valuesBytes).order(order).asDoubleBuffer().put(s.values)
-
-      out.write(Opcodes.MARK)
-      out.write(Opcodes.BININT)
-      out.write(PickleUtils.integer_to_bytes(s.numRows))
-      out.write(Opcodes.BININT)
-      out.write(PickleUtils.integer_to_bytes(s.numCols))
-      out.write(Opcodes.BINSTRING)
-      out.write(PickleUtils.integer_to_bytes(colPtrsBytes.length))
-      out.write(colPtrsBytes)
-      out.write(Opcodes.BINSTRING)
-      out.write(PickleUtils.integer_to_bytes(indicesBytes.length))
-      out.write(indicesBytes)
-      out.write(Opcodes.BINSTRING)
-      out.write(PickleUtils.integer_to_bytes(valuesBytes.length))
-      out.write(valuesBytes)
-      out.write(Opcodes.BININT)
-      out.write(PickleUtils.integer_to_bytes(isTransposed))
-      out.write(Opcodes.TUPLE)
-    }
-
-    def construct(args: Array[Object]): Object = {
-      if (args.length != 6) {
-        throw new PickleException("should be 6")
-      }
-      val order = ByteOrder.nativeOrder()
-      val colPtrsBytes = getBytes(args(2))
-      val indicesBytes = getBytes(args(3))
-      val valuesBytes = getBytes(args(4))
-      val colPtrs = new Array[Int](colPtrsBytes.length / 4)
-      val rowIndices = new Array[Int](indicesBytes.length / 4)
-      val values = new Array[Double](valuesBytes.length / 8)
-      ByteBuffer.wrap(colPtrsBytes).order(order).asIntBuffer().get(colPtrs)
-      ByteBuffer.wrap(indicesBytes).order(order).asIntBuffer().get(rowIndices)
-      ByteBuffer.wrap(valuesBytes).order(order).asDoubleBuffer().get(values)
-      val isTransposed = args(5).asInstanceOf[Int] == 1
-      new NewSparseMatrix(
-        args(0).asInstanceOf[Int], args(1).asInstanceOf[Int], colPtrs, rowIndices, values,
-        isTransposed)
-    }
-  }
-
-  // Pickler for (mllib) SparseVector
+  // Pickler for SparseVector
   private[python] class SparseVectorPickler extends BasePickler[SparseVector] {
 
     def saveState(obj: Object, out: OutputStream, pickler: Pickler): Unit = {
@@ -1564,50 +1493,6 @@ private[spark] object SerDe extends Serializable {
     }
   }
 
-  // Pickler for (new) SparseVector
-  private[python] class NewSparseVectorPickler extends BasePickler[NewSparseVector] {
-
-    override protected def packageName = PYSPARK_ML_PACKAGE
-
-    def saveState(obj: Object, out: OutputStream, pickler: Pickler): Unit = {
-      val v: NewSparseVector = obj.asInstanceOf[NewSparseVector]
-      val n = v.indices.length
-      val indiceBytes = new Array[Byte](4 * n)
-      val order = ByteOrder.nativeOrder()
-      ByteBuffer.wrap(indiceBytes).order(order).asIntBuffer().put(v.indices)
-      val valueBytes = new Array[Byte](8 * n)
-      ByteBuffer.wrap(valueBytes).order(order).asDoubleBuffer().put(v.values)
-
-      out.write(Opcodes.BININT)
-      out.write(PickleUtils.integer_to_bytes(v.size))
-      out.write(Opcodes.BINSTRING)
-      out.write(PickleUtils.integer_to_bytes(indiceBytes.length))
-      out.write(indiceBytes)
-      out.write(Opcodes.BINSTRING)
-      out.write(PickleUtils.integer_to_bytes(valueBytes.length))
-      out.write(valueBytes)
-      out.write(Opcodes.TUPLE3)
-    }
-
-    def construct(args: Array[Object]): Object = {
-      if (args.length != 3) {
-        throw new PickleException("should be 3")
-      }
-      val size = args(0).asInstanceOf[Int]
-      val indiceBytes = getBytes(args(1))
-      val valueBytes = getBytes(args(2))
-      val n = indiceBytes.length / 4
-      val indices = new Array[Int](n)
-      val values = new Array[Double](n)
-      if (n > 0) {
-        val order = ByteOrder.nativeOrder()
-        ByteBuffer.wrap(indiceBytes).order(order).asIntBuffer().get(indices)
-        ByteBuffer.wrap(valueBytes).order(order).asDoubleBuffer().get(values)
-      }
-      new NewSparseVector(size, indices, values)
-    }
-  }
-
   // Pickler for MLlib LabeledPoint
   private[python] class LabeledPointPickler extends BasePickler[LabeledPoint] {
 
@@ -1654,7 +1539,7 @@ private[spark] object SerDe extends Serializable {
   var initialized = false
   // This should be called before trying to serialize any above classes
   // In cluster mode, this should be put in the closure
-  def initialize(): Unit = {
+  override def initialize(): Unit = {
     SerDeUtil.initialize()
     synchronized {
       if (!initialized) {
@@ -1662,10 +1547,6 @@ private[spark] object SerDe extends Serializable {
       new DenseMatrixPickler().register()
       new SparseMatrixPickler().register()
      new SparseVectorPickler().register()
-      new NewDenseVectorPickler().register()
-      new NewDenseMatrixPickler().register()
-      new NewSparseMatrixPickler().register()
-      new NewSparseVectorPickler().register()
       new LabeledPointPickler().register()
       new RatingPickler().register()
       initialized = true
@@ -1674,58 +1555,4 @@ private[spark] object SerDe extends Serializable {
   }
   // will not called in Executor automatically
   initialize()
-
-  def dumps(obj: AnyRef): Array[Byte] = {
-    obj match {
-      // Pickler in Python side cannot deserialize Scala Array normally. See SPARK-12834.
-      case array: Array[_] => new Pickler().dumps(array.toSeq.asJava)
-      case _ => new Pickler().dumps(obj)
-    }
-  }
-
-  def loads(bytes: Array[Byte]): AnyRef = {
-    new Unpickler().loads(bytes)
-  }
-
-  /* convert object into Tuple */
-  def asTupleRDD(rdd: RDD[Array[Any]]): RDD[(Int, Int)] = {
-    rdd.map(x => (x(0).asInstanceOf[Int], x(1).asInstanceOf[Int]))
-  }
-
-  /* convert RDD[Tuple2[,]] to RDD[Array[Any]] */
-  def fromTuple2RDD(rdd: RDD[(Any, Any)]): RDD[Array[Any]] = {
-    rdd.map(x => Array(x._1, x._2))
-  }
-
-  /**
-   * Convert an RDD of Java objects to an RDD of serialized Python objects, that is usable by
-   * PySpark.
-   */
-  def javaToPython(jRDD: JavaRDD[Any]): JavaRDD[Array[Byte]] = {
-    jRDD.rdd.mapPartitions { iter =>
-      initialize() // let it called in executor
-      new SerDeUtil.AutoBatchedPickler(iter)
-    }
-  }
-
-  /**
-   * Convert an RDD of serialized Python objects to RDD of objects, that is usable by PySpark.
-   */
-  def pythonToJava(pyRDD: JavaRDD[Array[Byte]], batched: Boolean): JavaRDD[Any] = {
-    pyRDD.rdd.mapPartitions { iter =>
-      initialize() // let it called in executor
-      val unpickle = new Unpickler
-      iter.flatMap { row =>
-        val obj = unpickle.loads(row)
-        if (batched) {
-          obj match {
-            case list: JArrayList[_] => list.asScala
-            case arr: Array[_] => arr
-          }
-        } else {
-          Seq(obj)
-        }
-      }
-    }.toJavaRDD()
-  }
 }
```
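As a usage note: both objects eagerly call `initialize()` in their constructors on the driver, so the `dumps`/`loads` helpers inherited from `SerDeBase` work immediately there; executors must re-run `initialize()` inside the closure, as the comments in the diff point out. A minimal driver-side round trip might look like the sketch below. Since `MLSerDe` is `private[spark]`, this assumes the calling code is compiled inside an `org.apache.spark` package (as Spark's own test suites are).

```scala
import org.apache.spark.ml.linalg.{SparseVector, Vectors}
import org.apache.spark.ml.python.MLSerDe

// Round-trip an ml.linalg vector through the Python pickle format.
// SparseVectorPickler writes (size, index bytes, value bytes) as a
// 3-tuple of pickle opcodes in native byte order; the registered
// constructor rebuilds the vector on loads().
val v = Vectors.sparse(4, Array(0, 3), Array(1.0, 2.0))
val bytes: Array[Byte] = MLSerDe.dumps(v)
val back = MLSerDe.loads(bytes).asInstanceOf[SparseVector]
assert(back == v) // ml.linalg vectors compare structurally
```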