分类 spark articles

Spark调优实战

Spark性能调优:合理设置并行度 1. Spark的并行度指的是什么? 并行度其实就是指的是spark作业中,各个stage的同时运行的task的数量,也就代表了spark作业在各个阶段stage的并行度! $$ 并行度 = executor\_number * executor\_cores $$ 理解: sparkApplication的划分: $job –> stage –> task$ 一般每个task一次处理一个分区。 可以将task理解为比赛中的跑道:每轮比赛中,每个跑道上都会有一位运动员(分区,即处理的数据……

阅读全文

Spark解析DataFrame中的json字段

How to parse a column of json string in Pyspark 在用$spark.sql(\ )$从Table读入数据时,DataFrame的列有时是这样一种类型:json形式的string。此时,我们通常需要去解析这个json string,从而提取我们想要的数据。 数据准备 1# Sample Data Frame 2jstr1 = u'{"header":{"id":12345,"foo":"bar"},"body":{"id":111000,"name":"foobar","sub_json":{"id":54321,"sub_sub_json":{"col1":20,"col2":"somethong"}}}}' 3jstr2 = u'{"header":{"id":12346,"foo":"baz"},"body":{"id":111002,"name":"barfoo","sub_json":{"id":23456,"sub_sub_json":{"col1":30,"col2":"something else"}}}}' 4jstr3 = u'{"header":{"id":43256,"foo":"foobaz"},"body":{"id":20192,"name":"bazbar","sub_json":{"id":39283,"sub_sub_json":{"col1":50,"col2":"another thing"}}}}' 5df = spark.createDataFrame([Row(json=jstr1),Row(json=jstr2),Row(json=jstr3)]) 1+--------------------+ 2| json| 3+--------------------+ 4|{"header":{"id":1...| 5|{"header":{"id":1...| 6|{"header":{"id":4...| 7+--------------------+ 如上所示,我们模拟一个DataFrame,其中只有一列,列名为json,类型为string。可以看到,json中的值为j……

阅读全文

Spark map字段处理

$PySpark$ 在遇到$map$类型的列的一些处理 在$spark$中,有时会遇到$column$的类型是$array$和$map$类型的,这时候需要将它们转换为多行数据 $Explode\ array\ and\ map\ columns\ to\ rows$ 1import pyspark 2from pyspark.sql import SparkSession 3 4spark = SparkSession.builder.appName('pyspark-by-examples').getOrCreate() 5 6arrayData = [ 7 ('James',['Java','Scala'],{'hair':'black','eye':'brown'}), 8 ('Michael',['Spark','Java',None],{'hair':'brown','eye':None}), 9 ('Robert',['CSharp',''],{'hair':'red','eye':''}), 10 ('Washington',None,None), 11 ('Jefferson',['1','2'],{}) ] 12 13df = spark.createDataFrame(data=arrayData, schema = ['name','knownLanguages','properties']) 14df.printSchema() 15df.show() 1root 2 |-- name: string (nullable = true) 3 |-- knownLanguages: array (nullable = true) 4 | |-- element: string (containsNull = true) 5 |-- properties: map (nullable = true) 6 | |-- key: string 7 | |-- value: string (valueContainsNull = true) 8 9+----------+--------------+--------------------+ 10| name|knownLanguages| properties| 11+----------+--------------+--------------------+ 12| James| [Java, Scala]|[eye -> brown, ha...| 13| Michael|[Spark, Java,]|[eye ->, hair -> ...| 14| Robert| [CSharp, ]|[eye -> , hair ->...| 15|Washington| null| null| 16| Jefferson| [1, 2]| []| 17+----------+--------------+--------------------+ $explode –……

阅读全文

cache和persist比较

Spark中cache和persist的作用 Spark开发高性能的大数据计算作业并不是那么简单。如果没有对Spark作业进行合理的调优,Spark作业的执行速度可能会很慢,这样就完全体现不出Spark作为一种快速大数据计算引擎的优势来。因此,想要用好Spark,就必须对其进行合理的性能优化。 有一些代码开发基本的原则,避免创建重复的RDD,尽可能复用同一个RDD,如下,我们可以直接用一个RDD进行多……

阅读全文

Spark运行内存超出

Container killed by YARN for exceeding memory limits? 运行spark脚本时,经常会碰到Container killed by YARN for exceeding memory limits的错误,导致程序运行失败。 这个的意思是指executor的外堆内存超出了。默认情况下,这个值被设置为executor_memory的10%或者384M,以较大者为准,即max(executor_memory*.1, 384M). 解决办法 提高内存开销 减少执行程序内核的数量 增加分区数量 提高驱动程序和执行程序内存 提……

阅读全文

repartition和coalesce区别

简介 $repartition(numPartitions:Int)$ 和 $coalesce(numPartitions:Int,shuffle:Boolean=false)$ 作用:对RDD的分区进行重新划分,repartition内部调用了coalesce,参数$shuffle=true$ 分析 例:RDD有N个分区,需要重新划分成M个分区 N小于M 一般情况下N个分区有数据分布不均匀的状况,利用HashPartitioner函数将数据重新分区为M个,这时需要将shu……

阅读全文

RDD算子总结

RDD算子总结 从功能上分: 转换算子(transformer): lazy执行,生成新的rdd,只有在调用action算子时,才会真正的执行。 如:map 、flatmap、filter、 union、 join、 ruduceByKey、 cache 行动算子(action): 触发任务执行,产生job,返回值不再是rdd。 如:count 、collect、top、 take、 reduce 从作用上分: 通用的: map、 flatMap、 di……

阅读全文

Spark RDD入门

RDD简介 RDD–弹性分布式数据集(Resilient Distributed Dataset)是spark的核心概念。RDD其实就是分布式的元素集合。在Spark中,对数据的所有操作不外乎创建RDD,转化已有的RDD以及调用RDD操作进行求值。而在这一切的背后,spark会自动讲RDD中的数据分发到集群上,并将操作并行化执行。 RDD基础 RDD是一个不可变的分布式对象集合.每个RDD被分为多个分区,这些分区运……

阅读全文

Spark2.0新特性

Spark2.0 Spark直接从1.6跨入2.0版本,带来一些新的特性。最大的变化便是SparkSession整合了各种环境。 Spark2.0中引入了SparkSession的概念,它为用户提供了一个统一的切入点来使用Spark的各项功能,用户不但可以使用DataFrame和Dataset的各种API,学习Spark的难度也会大大降低。 SparkSession 在Spark的早期版本,SparkContext是进入Spark的切入点。……

阅读全文

Spark各种概念理解

Spark中的各种概念的理解 Application:通俗讲,用户每次提交的所有的代码为一个application。 Job:一个application可以分为多个job。如何划分job?通俗讲,触发一个final RDD的实际计算(action)为一个job Stage:一个job可以分为多个stage。根据一个job中的RDD的宽依赖和窄依赖关系进行划分 Task:task是最小的基本的计算单位。一般是……

阅读全文