Spark: How to transform LabeledPoint features values from int to 0/1?

By : m3ta
Date : November 22 2020, 09:00 AM
I want to run Naive Bayes in Spark, but to do this I have to transform the feature values of my LabeledPoint data to 0/1.
I guess you're looking for something like this:
code :
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.rdd.RDD

// Sample data (assumes a spark-shell session where `sc` is available)
val transformedData = sc.parallelize(Seq(
  LabeledPoint(1.0, Vectors.sparse(5, Array(1, 3), Array(9.0, 3.2))),
  LabeledPoint(5.0, Vectors.sparse(5, Array(0, 2, 4), Array(1.0, 2.0, 3.0)))
))

// Keep the label and replace every nonzero feature value with 1.0
def binarizeFeatures(rdd: RDD[LabeledPoint]) = rdd.map {
  case LabeledPoint(label, features) =>
    val v = features.toSparse
    LabeledPoint(label,
      Vectors.sparse(v.size, v.indices, Array.fill(v.numNonzeros)(1.0)))
}

binarizeFeatures(transformedData).collect

// Array[org.apache.spark.mllib.regression.LabeledPoint] = Array(
//   (1.0,(5,[1,3],[1.0,1.0])),
//   (5.0,(5,[0,2,4],[1.0,1.0,1.0])))
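
For intuition, the same rule can be written without Spark for a dense array of feature values: every nonzero entry becomes 1.0 and zeros stay 0.0. This is only a minimal sketch; `binarize` is a hypothetical helper, not part of MLlib.

```scala
// Hypothetical helper (not an MLlib API): map every nonzero value to 1.0.
def binarize(values: Array[Double]): Array[Double] =
  values.map(x => if (x != 0.0) 1.0 else 0.0)

// Dense form of the first sample vector above: (5,[1,3],[9.0,3.2])
val dense = Array(0.0, 9.0, 0.0, 3.2, 0.0)
val binary = binarize(dense)
println(binary.mkString("[", ",", "]")) // [0.0,1.0,0.0,1.0,0.0]
```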

Spark: How to run logistic regression using only some features from LabeledPoint?

By : Keith Bauer
Date : March 29 2020, 07:55 AM
Feature selection allows selecting the most relevant features for use in model construction. Feature selection reduces the size of the vector space and, in turn, the complexity of any subsequent operation with vectors. The number of features to select can be tuned using a held-out validation set.
One way to do what you are seeking is using the ElementwiseProduct.
code :
import org.apache.spark.mllib.feature.ElementwiseProduct
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.regression.LabeledPoint

// Creating dummy LabeledPoint RDD
val data = sc.parallelize(Array(
  LabeledPoint(1.0, Vectors.dense(1.0, 0.0, 3.0, 5.0, 1.0)),
  LabeledPoint(1.0, Vectors.dense(4.0, 5.0, 6.0, 1.0, 2.0)),
  LabeledPoint(0.0, Vectors.dense(4.0, 2.0, 3.0, 0.0, 2.0))))

// toDF needs the SQL implicits, which spark-shell imports automatically
data.toDF.show

// +-----+--------------------+
// |label|            features|
// +-----+--------------------+
// |  1.0|[1.0,0.0,3.0,5.0,...|
// |  1.0|[4.0,5.0,6.0,1.0,...|
// |  0.0|[4.0,2.0,3.0,0.0,...|
// +-----+--------------------+

// You'll need to know how many features you have, I have used 5 for the example
val numFeatures = 5

// The indices represent the features we want to keep.
// Note: indices are zero-based, so 3 and 4 keep the fourth and fifth features.
val indices = Array(3, 4)

// Now we can create our weights vector: 1.0 for every feature we keep
val weights = Array.fill(indices.length)(1.0)

// Create the sparse vector of the features we need to keep.
val transformingVector = Vectors.sparse(numFeatures, indices, weights)

// Init our vector transformer
val transformer = new ElementwiseProduct(transformingVector)

// Apply it to the data.
val transformedData = data.map(x => LabeledPoint(x.label, transformer.transform(x.features).toSparse))

transformedData.toDF.show

// +-----+-------------------+
// |label|           features|
// +-----+-------------------+
// |  1.0|(5,[3,4],[5.0,1.0])|
// |  1.0|(5,[3,4],[1.0,2.0])|
// |  0.0|      (5,[4],[2.0])|
// +-----+-------------------+
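
Note that the element-wise product only zeroes out the unwanted features; the vectors keep their original size of 5. The plain-Scala sketch below (no Spark; `mask` and `applyMask` are hypothetical names, not MLlib functions) shows the arithmetic the transformer performs on the first sample row.

```scala
// Build a 0/1 mask that is 1.0 at the indices to keep (hypothetical helper).
def mask(numFeatures: Int, keep: Seq[Int]): Array[Double] = {
  val m = Array.fill(numFeatures)(0.0)
  keep.foreach(i => m(i) = 1.0)
  m
}

// Element-wise (Hadamard) product of a feature array and the mask.
def applyMask(features: Array[Double], m: Array[Double]): Array[Double] =
  features.zip(m).map { case (x, w) => x * w }

val row = Array(1.0, 0.0, 3.0, 5.0, 1.0)
val masked = applyMask(row, mask(5, Seq(3, 4)))
println(masked.mkString("[", ",", "]")) // [0.0,0.0,0.0,5.0,1.0]
```

If the goal is to actually shrink the vectors, the kept indices have to be copied into a new, smaller vector instead.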
Spark: How to transform values of some selected features in LabeledPoint?

By : Gokul
Date : March 29 2020, 07:55 AM
It is possible but not exactly straightforward. If you can transform the values before you assemble vectors and labeled points, then the answer provided by @eliasah should do the trick. Otherwise you have to do things the hard way. Let's assume your data looks like this:
code :
import org.apache.spark.mllib.linalg.{Vector, Vectors, SparseVector, DenseVector}
import org.apache.spark.mllib.regression.LabeledPoint

val points = sc.parallelize(Seq(
  LabeledPoint(1.0, Vectors.sparse(6, Array(1, 4, 5), Array(2.0, 6.0, 3.0))),
  LabeledPoint(2.0, Vectors.sparse(6, Array(2, 3), Array(0.1, 1.0)))
))

import breeze.linalg.{DenseVector => BDV, SparseVector => BSV, Vector => BV}

// Helper to convert a Spark vector to a Breeze vector
def toBreeze(v: Vector): BV[Double] = v match {
  case DenseVector(values) => new BDV[Double](values)
  case SparseVector(size, indices, values) =>
    new BSV[Double](indices, values, size)
}

// Disassemble the labeled points into (label, Breeze vector) pairs
val pairs = points.map(lp => (lp.label, toBreeze(lp.features)))

// Apply log(x + 1) to the selected indices and return the vector
def transform(indices: Seq[Int])(v: BV[Double]) = {
  for (i <- indices) v(i) = breeze.numerics.log(v(i) + 1.0)
  v
}

val indices = Array(2, 4)
val transformed = pairs.mapValues(transform(indices))

// Element-wise maximum over all vectors
val maxV = transformed.values.reduce(breeze.linalg.max(_, _))

// Divide the selected features by their maximum, skipping zero maxima
def divideByMax(m: BV[Double], indices: Seq[Int])(v: BV[Double]) = {
  for (i <- indices) if (m(i) != 0) v(i) /= m(i)
  v
}

val divided = transformed.mapValues(divideByMax(maxV, indices))

// Convert back to Spark vectors and reassemble the labeled points
def toSpark(v: BV[Double]) = v match {
  case v: BDV[Double] => new DenseVector(v.toArray)
  case v: BSV[Double] => new SparseVector(v.length, v.index, v.data)
}

divided.map { case (l, v) => LabeledPoint(l, toSpark(v)) }
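
To check the arithmetic by hand, the same steps (log(x + 1) on the selected features, then division by the per-feature maximum) can be traced on plain arrays. This is a rough sketch without Spark or Breeze; the rows are the dense form of the two sample vectors above, and all names are illustrative only.

```scala
// Two sample rows in dense form, matching the sparse vectors above.
val rows = Seq(
  Array(0.0, 2.0, 0.0, 0.0, 6.0, 3.0),
  Array(0.0, 0.0, 0.1, 1.0, 0.0, 0.0)
)
val selected = Seq(2, 4)

// Step 1: apply log(x + 1) to the selected features only.
val logged = rows.map { r =>
  val out = r.clone()
  selected.foreach(i => out(i) = math.log(out(i) + 1.0))
  out
}

// Step 2: maximum of each selected feature across all rows.
val maxima = selected.map(i => i -> logged.map(_(i)).max).toMap

// Step 3: divide each selected feature by its maximum (skip zero maxima).
val scaled = logged.map { r =>
  val out = r.clone()
  selected.foreach(i => if (maxima(i) != 0.0) out(i) /= maxima(i))
  out
}

scaled.foreach(r => println(r.mkString("[", ",", "]")))
```

After scaling, the largest value of each selected feature is 1.0, while features outside `selected` are left untouched.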
Spark MLib - Create LabeledPoint from RDD[Vector] features and RDD[Vector] label

By : david ramos
Date : March 29 2020, 07:55 AM
I am building a training set using two text files representing documents and labels.
One way to handle this is to join based on indices:
code :
import org.apache.spark.RangePartitioner

// Add indices
val idfIndexed = idf.zipWithIndex.map(_.swap)
val labelsIndexed = labels.zipWithIndex.map(_.swap)

// Create range partitioner on larger RDD
val partitioner = new RangePartitioner(idfIndexed.partitions.size, idfIndexed)

// Join with custom partitioner
labelsIndexed.join(idfIndexed, partitioner).values
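
The idea behind the index join can be illustrated with ordinary Scala collections: tag each element with its position, key on that position, and pair up elements that share a key. This sketch uses made-up sample data, and arrays of doubles stand in for Spark vectors.

```scala
// Made-up sample data: one feature array and one label per document.
val features = Seq(Array(0.1, 0.2), Array(0.3, 0.4), Array(0.5, 0.6))
val labels = Seq(1.0, 0.0, 1.0)

// Key every element by its position, like zipWithIndex.map(_.swap) on an RDD.
val featuresByIndex = features.zipWithIndex.map(_.swap).toMap
val labelsByIndex = labels.zipWithIndex.map(_.swap).toMap

// Inner join on the index: keep positions present on both sides.
val joined = labelsByIndex.keys.toSeq.sorted.flatMap { i =>
  featuresByIndex.get(i).map(f => (labelsByIndex(i), f))
}

joined.foreach { case (label, f) =>
  println(label.toString + " -> " + f.mkString("[", ",", "]"))
}
```

This only pairs the right elements when both inputs list the documents in the same order.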
spark cannot create LabeledPoint

By : deadjay
Date : March 29 2020, 07:55 AM
I think you want to create LabeledPoint rows in a DataFrame. So you can:
code :
from pyspark.mllib.regression import LabeledPoint

def parse_points(df):
    # map applies the lambda to each row; seq[0] is assumed to hold the label followed by the features
    df2 = df.rdd.map(lambda seq: LabeledPoint(float(seq[0][0]), seq[0][1:]))
    return df2.toDF()  # converts the PipelinedRDD to a DataFrame
Spark 2.2: Load org.apache.spark.ml.feature.LabeledPoint from file

By : user7843963
Date : March 29 2020, 07:55 AM
With the ml package you won't need to put the data into a LabeledPoint since you can specify which columns to use for labels/features in all transformations/algorithms. For example:
code :
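import java.io.File
import org.apache.spark.ml.classification.GBTClassifier
val gbt = new GBTClassifier().setLabelCol("label").setFeaturesCol("features") // the default column names
val df = spark.read.format("libsvm").load(s"$path${File.separator}${fileName}_data_sparse")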
