How to add columns into org.apache.spark.sql.Row inside of mapPartitions

By : Manuel Quesada Moren
Date : November 22 2020, 02:42 PM
Usually there should be no need for this, and it is better to use UDFs, but here is one way to do it:
code :
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{IntegerType, StructField, StructType}

// Append two constant columns to every row
def transformRow(row: Row): Row = Row.fromSeq(row.toSeq ++ Array[Any](-1, 1))

def transformRows(iter: Iterator[Row]): Iterator[Row] = iter.map(transformRow)

// In spark-shell, where sc and sqlContext are predefined
val df = sc.parallelize(Seq(
    (1.0, 2.0), (0.0, -1.0),
    (3.0, 4.0), (6.0, -2.3))).toDF("x", "y")

// Extend the old schema with the two new columns
val newSchema = StructType(df.schema.fields ++ Array(
  StructField("z", IntegerType, false), StructField("v", IntegerType, false)))

sqlContext.createDataFrame(df.rdd.mapPartitions(transformRows), newSchema).show

// +---+----+---+---+
// |  x|   y|  z|  v|
// +---+----+---+---+
// |1.0| 2.0| -1|  1|
// |0.0|-1.0| -1|  1|
// |3.0| 4.0| -1|  1|
// |6.0|-2.3| -1|  1|
// +---+----+---+---+

Apache Spark: map vs mapPartitions?

By : Ahmet Bayraktar
Date : March 29 2020, 07:55 AM
This answer addresses the canonical question "What's the difference between an RDD's map and mapPartitions method?". In short: map calls the supplied function once per element, while mapPartitions calls it once per partition, handing it an iterator over all of that partition's elements. That lets expensive setup (for example, opening a database connection) run once per partition instead of once per record.
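The difference can be sketched with plain Scala collections (no Spark required); the chunks below stand in for partitions, and the counters stand in for expensive per-call setup:

```scala
// Hypothetical sketch: a list of chunks emulates an RDD's partitions
val data = (1 to 10).toList
val partitions = data.grouped(5).toList // pretend: 2 partitions

// map-style: the function (and any setup inside it) runs once per element
var perElementSetups = 0
val mapped = partitions.flatMap(_.map { x => perElementSetups += 1; x * 2 })

// mapPartitions-style: setup runs once per partition, then the iterator is mapped
var perPartitionSetups = 0
val viaPartitions = partitions.flatMap { part =>
  perPartitionSetups += 1 // e.g. open a connection once per partition
  part.map(_ * 2)
}

println(perElementSetups)        // 10
println(perPartitionSetups)      // 2
println(mapped == viaPartitions) // true
```

Both produce the same results; mapPartitions simply amortizes the setup cost over a whole partition.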
Sorting an RDD in Apache Spark using mapPartitions and reduce

By : rledda
Date : March 29 2020, 07:55 AM
Your class Inc should be marked as Serializable. It seems the serialization debugger is trying to help and failing, and in the process masking the serialization error.
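As a minimal sketch (the class name Inc comes from the question; its body here is assumed), extending Serializable is enough, and a plain Java serialization round-trip shows the object can be shipped the way Spark ships closures:

```scala
import java.io._

// Assumed stand-in for the question's Inc class, now marked Serializable
class Inc(val step: Int) extends Serializable {
  def apply(x: Int): Int = x + step
}

// Round-trip through Java serialization, as Spark does when sending tasks to executors
val bytes = new ByteArrayOutputStream()
new ObjectOutputStream(bytes).writeObject(new Inc(3))
val restored = new ObjectInputStream(
  new ByteArrayInputStream(bytes.toByteArray)).readObject().asInstanceOf[Inc]

println(restored(4)) // 7
```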
Apache Spark: Effectively using mapPartitions in Java

By : Ahmad Khatib
Date : March 29 2020, 07:55 AM
One way to prevent forcing the "materialization" of the entire partition is to convert the Iterator into a Stream, and then to use the Stream's functional API (e.g. its map function).
The question "How to convert an iterator to a stream?" suggests a few good ways to do the conversion, so taking one of the options suggested there we end up with:
code :
// requires: import java.util.stream.StreamSupport;
rdd.mapPartitions((Iterator<InObj> iter) -> {
    Iterable<InObj> iterable = () -> iter;
    return StreamSupport.stream(iterable.spliterator(), false)
            .map(s -> transformRow(s)) // or whatever transformation
            .iterator();
});
Apache Spark: comparison of map vs flatMap vs mapPartitions vs mapPartitionsWithIndex

By : Dev Bishnoi
Date : March 29 2020, 07:55 AM
map(func): passes each element of the RDD through the supplied function func, producing exactly one output element per input element.
flatMap(func): "Similar to map, but each input item can be mapped to 0 or more output items (so func should return a Seq rather than a single item)."
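The contrast is easy to see on plain Scala collections, which share these method semantics with RDDs:

```scala
val lines = List("a b", "c")

// map: exactly one output element per input element
val mapped = lines.map(_.split(" ").toList) // List(List("a","b"), List("c"))

// flatMap: each input can yield 0..n output elements, which are then flattened
val flat = lines.flatMap(_.split(" ")) // List("a", "b", "c")

println(mapped.length) // 2
println(flat.length)   // 3
```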
What is the Difference between mapPartitions and foreachPartition in Apache Spark

By : kris.m
Date : March 29 2020, 07:55 AM
The difference is the same as that between map and foreach. See "Is there a difference between foreach and map?" for a good explanation.
mapPartitions (a transformation) and foreachPartition (an action) apply to each partition of the DataFrame, as opposed to each element. See "Apache Spark: map vs mapPartitions?" for an explanation contrasting map and mapPartitions.
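A plain-Scala sketch of the distinction: the mapPartitions analogue returns a new collection, while the foreachPartition analogue returns Unit and exists only for its side effects (e.g. writing each partition to an external sink):

```scala
val partitions = List(List(1, 2), List(3, 4))

// mapPartitions analogue: a transformation that yields a new dataset
val doubled = partitions.flatMap(part => part.map(_ * 2))

// foreachPartition analogue: an action performed purely for side effects
val sink = scala.collection.mutable.ListBuffer.empty[Int]
partitions.foreach(part => part.foreach(sink += _))

println(doubled)     // List(2, 4, 6, 8)
println(sink.toList) // List(1, 2, 3, 4)
```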