Handling map-type columns in PySpark

In Spark, you will sometimes encounter columns of array or map type, and these need to be converted into multiple rows.

Explode array and map columns to rows

import pyspark
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName('pyspark-by-examples').getOrCreate()

arrayData = [
        ('James',['Java','Scala'],{'hair':'black','eye':'brown'}),
        ('Michael',['Spark','Java',None],{'hair':'brown','eye':None}),
        ('Robert',['CSharp',''],{'hair':'red','eye':''}),
        ('Washington',None,None),
        ('Jefferson',['1','2'],{}) ]

df = spark.createDataFrame(data=arrayData, schema=['name','knownLanguages','properties'])
df.printSchema()
df.show()
root
 |-- name: string (nullable = true)
 |-- knownLanguages: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- properties: map (nullable = true)
 |    |-- key: string
 |    |-- value: string (valueContainsNull = true)

+----------+--------------+--------------------+
|      name|knownLanguages|          properties|
+----------+--------------+--------------------+
|     James| [Java, Scala]|[eye -> brown, ha...|
|   Michael|[Spark, Java,]|[eye ->, hair -> ...|
|    Robert|    [CSharp, ]|[eye -> , hair ->...|
|Washington|          null|                null|
| Jefferson|        [1, 2]|                  []|
+----------+--------------+--------------------+

explode – array column example

The PySpark function explode(e: Column) is used to explode array or map columns into rows. When an array column is passed, it creates a new column, named "col" by default, holding the array elements. When a map column is passed, it creates two new columns, one for the key and one for the value, and each map entry becomes a row.

from pyspark.sql.functions import explode
df3 = df.select(df.name, explode(df.knownLanguages))
df3.printSchema()
df3.show()

Output:

root
 |-- name: string (nullable = true)
 |-- col: string (nullable = true)

+---------+------+
|     name|   col|
+---------+------+
|    James|  Java|
|    James| Scala|
|  Michael| Spark|
|  Michael|  Java|
|  Michael|  null|
|   Robert|CSharp|
|   Robert|      |
|Jefferson|     1|
|Jefferson|     2|
+---------+------+

Note:

Washington's knownLanguages field is null, and explode ignores such values, so the result set contains no row for Washington. To keep those rows, use explode_outer, as sketched below.
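
A minimal sketch of the explode_outer variant, reusing the df defined above:

from pyspark.sql.functions import explode_outer

# explode_outer keeps rows whose array is null or empty,
# emitting a single row with col = null for them
df.select(df.name, explode_outer(df.knownLanguages)).show()

With this, Washington appears in the result with a null value in the col column.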

explode – map column example

from pyspark.sql.functions import explode
df3 = df.select(df.name, explode(df.properties))
df3.printSchema()
df3.show()

Output:

root
 |-- name: string (nullable = true)
 |-- key: string (nullable = false)
 |-- value: string (nullable = true)

+-------+----+-----+
|   name| key|value|
+-------+----+-----+
|  James| eye|brown|
|  James|hair|black|
|Michael| eye| null|
|Michael|hair|brown|
| Robert| eye|     |
| Robert|hair|  red|
+-------+----+-----+
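
The same caveat applies here: Washington (null map) and Jefferson (empty map) produce no rows. A minimal sketch of the explode_outer counterpart, which keeps them:

from pyspark.sql.functions import explode_outer

# explode_outer emits one row with key = null, value = null
# for rows whose map is null or empty
df.select(df.name, explode_outer(df.properties)).show()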

How to convert Map into multiple columns

Sometimes a map-type column needs to be flattened so that each key becomes a column name and each value becomes the column value. For example:

from pyspark.sql import functions as F

df.select(F.col("name"),
   F.col("properties").getItem("hair").alias("hair_color"),
   F.col("properties").getItem("eye").alias("eye_color")).show()

Output:

+----------+----------+---------+
|      name|hair_color|eye_color|
+----------+----------+---------+
|     James|     black|    brown|
|   Michael|     brown|     null|
|    Robert|       red|         |
|Washington|      null|     null|
| Jefferson|      null|     null|
+----------+----------+---------+
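
The getItem calls above hardcode the key names. When the keys are not known in advance, one common pattern is to discover them first and then build one column per key. A sketch (note that it collects the distinct keys to the driver, so it assumes the key set is small):

from pyspark.sql import functions as F

# Discover the distinct keys present in the map column
keys = [row["key"] for row in
        df.select(F.explode(F.col("properties"))).select("key").distinct().collect()]

# Build one getItem column per discovered key
key_cols = [F.col("properties").getItem(k).alias(k) for k in sorted(keys)]
df.select(F.col("name"), *key_cols).show()

Rows whose map lacks a given key, or whose map is null, simply get null in that column, as in the Washington and Jefferson rows above.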