How to add columns into org.apache.spark.sql.Row inside of mapPartitions

By : Manuel Quesada Moren
Date : November 22 2020, 02:42 PM
To fix this issue: usually there should be no need for this, and it is better to use UDFs, but here is how to do it:
code :
import org.apache.spark.sql.Row

// Placeholder signature only: `???` throws scala.NotImplementedError if invoked.
// The same method is defined again with a real body further down in this snippet.
def transformRows(iter: Iterator[Row]): Iterator[Row] = ???
/** Builds a new row holding every value of `row` followed by the constants -1 and 1. */
def transformRow(row: Row): Row = {
  val extraValues: Seq[Any] = Seq(-1, 1)
  Row.fromSeq(row.toSeq ++ extraValues)
}
import org.apache.spark.sql.types.{IntegerType, StructField, StructType}

// Sample input: four (x, y) pairs of doubles turned into a two-column DataFrame.
// NOTE(review): relies on `sc` and the SQL implicits (for `toDF`) being in scope,
// as in spark-shell — confirm when running elsewhere.
val df = {
  val samplePoints = Seq(
    (1.0, 2.0),
    (0.0, -1.0),
    (3.0, 4.0),
    (6.0, -2.3))
  sc.parallelize(samplePoints).toDF("x", "y")
}

/** Applies `transformRow` to every row produced by `iter`, yielding the transformed rows. */
def transformRows(iter: Iterator[Row]): Iterator[Row] =
  for (row <- iter) yield transformRow(row)

// Schema of the widened rows: the fields of `df` followed by two non-nullable
// integer columns, "z" and "v", matching the two values appended by transformRow.
val newSchema = {
  val appendedFields = Seq("z", "v").map { columnName =>
    StructField(columnName, IntegerType, nullable = false)
  }
  StructType(df.schema.fields ++ appendedFields)
}

// Transform each partition's rows, rebuild a DataFrame with the widened schema, and print it.
val widenedRows = df.rdd.mapPartitions(transformRows)
sqlContext.createDataFrame(widenedRows, newSchema).show()

// +---+----+---+---+
// |  x|   y|  z|  v|
// +---+----+---+---+
// |1.0| 2.0| -1|  1|
// |0.0|-1.0| -1|  1|
// |3.0| 4.0| -1|  1|
// |6.0|-2.3| -1|  1|
// +---+----+---+---+

