
Pyspark: How To Flatten Nested Arrays By Merging Values In Spark

I have 10,000 JSON documents with different ids, and each has 10,000 names. How can I flatten nested arrays by merging values by int or str in PySpark? EDIT: I have added column name_10000_xvz to ex…
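For reference, the answers below assume input shaped roughly like this, where each name_* column is an array of (date, val) structs. A minimal PySpark sketch of one such record (illustrative only; the real data has far more ids and name columns):

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# One sample record in the shape the answers work with: each name_* column
# is an array of structs holding a date and a val.
sample_json = (
    '{"id": 1,'
    ' "name_1_a": [{"date": 2001, "val": 1}, {"date": 2002, "val": 2}, {"date": 2003, "val": 3}],'
    ' "name_1_b": [{"date": 2001, "val": 4}, {"date": 2002, "val": 5}, {"date": 2003, "val": 6}],'
    ' "name_2_a": [{"date": 2001, "val": 21}, {"date": 2002, "val": 22}, {"date": 2003, "val": 23}]}'
)

df = spark.read.json(spark.sparkContext.parallelize([sample_json]))
df.printSchema()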

Solution 1:

UPDATE

As @werner has mentioned, it's necessary to transform each struct to append the source column name to it.

import pyspark.sql.functions as f

names = [column for column in df.columns if column.startswith('name_')]

expressions = []
for name in names:
  expressions.append(f.expr('TRANSFORM({name}, el -> STRUCT("{name}" AS name, el.date, el.val))'.format(name=name)))

flatten_df = (df
              .withColumn('flatten', f.flatten(f.array(*expressions)))
              .selectExpr('id', 'inline(flatten)'))

output_df = (flatten_df
             .groupBy('id', 'date')
             .pivot('name', names)
             .agg(f.first('val')))

output_df.sort('id', 'date').show(truncate=False)
+---+----+--------------+--------+--------+--------+
|id |date|name_10000_xvz|name_1_a|name_1_b|name_2_a|
+---+----+--------------+--------+--------+--------+
|1  |2000|30            |null    |null    |null    |
|1  |2001|31            |1       |4       |21      |
|1  |2002|32            |2       |5       |22      |
|1  |2003|33            |3       |6       |23      |
|2  |1990|39            |null    |null    |null    |
|2  |2000|30            |null    |null    |null    |
|2  |2001|31            |1       |4       |21      |
|2  |2002|32            |2       |5       |22      |
|2  |2003|33            |3       |6       |23      |
|2  |2004|34            |null    |null    |null    |
+---+----+--------------+--------+--------+--------+
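To see what the pivot is working with, it can help to look at flatten_df first: inline(flatten) turns the merged array of structs into one row per (name, date, val) triple, and the groupBy/pivot then spreads the name values back out into one column per name. An illustrative check:

# Illustrative: the long-format intermediate that feeds the pivot.
# Each row holds the source column name plus one (date, val) pair from it.
flatten_df.show(10, truncate=False)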

OLD

Assuming:

  • the date value is always the same across all columns
  • name_1_a, name_1_b and name_2_a all have the same size

import pyspark.sql.functions as f

output_df = (df
             .withColumn('flatten', f.expr('TRANSFORM(SEQUENCE(0, size(name_1_a) - 1), i -> ' \
                                           'STRUCT(name_1_a[i].date AS date, ' \
                                           '       name_1_a[i].val AS name_1_a, ' \
                                           '       name_1_b[i].val AS name_1_b, ' \
                                           '       name_2_a[i].val AS name_2_a))'))
             .selectExpr('id', 'inline(flatten)'))

output_df.sort('id', 'date').show(truncate=False)
+---+----+--------+--------+--------+
|id |date|name_1_a|name_1_b|name_2_a|
+---+----+--------+--------+--------+
|1  |2001|1       |4       |21      |
|1  |2002|2       |5       |22      |
|1  |2003|3       |6       |23      |
|2  |2001|1       |4       |21      |
|2  |2002|2       |5       |22      |
|2  |2003|3       |6       |23      |
+---+----+--------+--------+--------+

Solution 2:

How are the naming conventions used?

Can you try something below using spark-sql?

df.createOrReplaceTempView("df")
spark.sql("""
select id, 
name_1_a.date[0] as date, name_1_a.val[0] as name_1_a, name_1_b.val[0] as name_1_b, name_2_a.val[0] as name_2_a
from df
""").show(false)

+---+----+--------+--------+--------+
|id |date|name_1_a|name_1_b|name_2_a|
+---+----+--------+--------+--------+
|1  |2001|1       |4       |21      |
|2  |2001|1       |4       |21      |
+---+----+--------+--------+--------+
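If you prefer to stay in the DataFrame API, the same single-index query can be written with selectExpr (a sketch under the same assumptions, without registering a temp view):

df.selectExpr(
    "id",
    "name_1_a.date[0] as date",
    "name_1_a.val[0] as name_1_a",
    "name_1_b.val[0] as name_1_b",
    "name_2_a.val[0] as name_2_a",
).show(truncate=False)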

Here are my assumptions.

  1. The first field is id and the rest are all names, 1 to n, like name_1_a, name_1_b, name_2_a, etc.
  2. The date is the same across all n names, so I can use the first field to derive it.

Building up the dataframe.

JSON strings

val jsonstr1 = """{"id":1,"name_1_a":[{"date":2001,"val":1},{"date":2002,"val":2},{"date":2003,"val":3}],"name_1_b":[{"date":2001,"val":4},{"date":2002,"val":5},{"date":2003,"val":6}],"name_2_a":[{"date":2001,"val":21},{"date":2002,"val":22},{"date":2003,"val":23}]}"""

val jsonstr2 = """{"id":2,"name_1_a":[{"date":2001,"val":1},{"date":2002,"val":2},{"date":2003,"val":3}],"name_1_b":[{"date":2001,"val":4},{"date":2002,"val":5},{"date":2003,"val":6}],"name_2_a":[{"date":2001,"val":21},{"date":2002,"val":22},{"date":2003,"val":23}]}"""

Dataframes

val df1 = spark.read.json(Seq(jsonstr1).toDS)
val df2 = spark.read.json(Seq(jsonstr2).toDS)
val df = df1.union(df2)

Now create a view on top of df. I'm just naming it "df":

df.createOrReplaceTempView("df")

Show the data:

df.show(false)
df.printSchema

Use the metadata and construct the sql string.

df.columns

Array[String] = Array(id, name_1_a, name_1_b, name_2_a)

val names = df.columns.drop(1)  // drop id

val sql1 = for { i <- 0 to 2
                 t1 = names.map(x => x + s".val[${i}] as ${x}").mkString(",")
                 t2 = names(0) + ".date[0] as date ," + t1
                 _ = println(t2)
               } yield s""" select id, ${t2} from df """

val sql2 = sql1.mkString(" union All ")

Now sql2 contains the string below, which is valid SQL:

" select id, name_1_a.date[0] as date ,name_1_a.val[0] as name_1_a,name_1_b.val[0] as name_1_b,name_2_a.val[0] as name_2_a from df  union All  select id, name_1_a.date[0] as date ,name_1_a.val[1] as name_1_a,name_1_b.val[1] as name_1_b,name_2_a.val[1] as name_2_a from df  union All  select id, name_1_a.date[0] as date ,name_1_a.val[2] as name_1_a,name_1_b.val[2] as name_1_b,name_2_a.val[2] as name_2_a from df "

Pass it to spark.sql(sql2) and get the required result

spark.sql(sql2).orderBy("id").show(false)

+---+----+--------+--------+--------+
|id |date|name_1_a|name_1_b|name_2_a|
+---+----+--------+--------+--------+
|1  |2001|2       |5       |22      |
|1  |2001|1       |4       |21      |
|1  |2001|3       |6       |23      |
|2  |2001|1       |4       |21      |
|2  |2001|3       |6       |23      |
|2  |2001|2       |5       |22      |
+---+----+--------+--------+--------+
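For readers working in PySpark rather than Scala, a rough equivalent of the same dynamic SQL construction looks like this (a sketch under the same assumptions, with df registered as the temp view "df"; note it indexes date with i rather than always 0, so each row keeps the date that matches its values):

names = df.columns[1:]  # drop id

# Build one SELECT per array index, then UNION ALL them together,
# mirroring the Scala loop above.
queries = []
for i in range(3):
    vals = ", ".join("{c}.val[{i}] as {c}".format(c=c, i=i) for c in names)
    queries.append("select id, {d}.date[{i}] as date, {vals} from df"
                   .format(d=names[0], i=i, vals=vals))

sql2 = " union all ".join(queries)
spark.sql(sql2).orderBy("id").show(truncate=False)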
