Kafka基本概念 Kafka 是一个分布式的基于发布/订阅模式的消息队列(Message Queue),主要应用于大数据实时处理领域。 1. 消息队列(MQ) 1.1 优点 解耦 削封 缓冲 异步通信 1.2 两种模式 点对点(一对一,消费者主动拉取数据,消息收到后消息清除) 消息生产者生产消息发送到Queue中,然后消息消费者主动从Queue中取出并且消费消息。消息被消费以后,queue中不再有存储,所以消息消费者不可能消费到已经被消费的……
阅读全文
Flink 流处理API 1. Environment 1.1 getExecutionEnvironment 创建一个执行环境,表示当前执行程序的上下文。如果程序是独立调用的,则此方法返回本地执行环境;如果从命令行客户端调用程序以提交到集群,则此方法返回此集群的执行环境,也就是说,getExecutionEnvironment会根据查询运行的方式决定返回什么样的运行环境,是最常用的一种创建执行环境的方式。 1ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); 2StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); 1.2 createLocalEnvironment 1LocalStreamEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(1); 1.3 createRemoteEnvironment 1StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("jobmanage-hostname", 6123,"YOURPATH//WordCount.jar"); 2. Source 2.1 集合 1StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); 2env.setParallelism(1); 3 4// 从集合里读取数……
阅读全文
Spark性能调优:合理设置并行度 1. Spark的并行度指的是什么? 并行度其实就是指的是spark作业中,各个stage的同时运行的task的数量,也就代表了spark作业在各个阶段stage的并行度! $$ 并行度 = executor\_number * executor\_cores $$ 理解: sparkApplication的划分: $job –> stage –> task$ 一般每个task一次处理一个分区。 可以将task理解为比赛中的跑道:每轮比赛中,每个跑道上都会有一位运动员(分区,即处理的数据……
阅读全文
How to parse a column of json string in Pyspark 在用$spark.sql(\ )$从Table读入数据时,DataFrame的列有时是这样一种类型:json形式的string。此时,我们通常需要去解析这个json string,从而提取我们想要的数据。 数据准备 1# Sample Data Frame 2jstr1 = u'{"header":{"id":12345,"foo":"bar"},"body":{"id":111000,"name":"foobar","sub_json":{"id":54321,"sub_sub_json":{"col1":20,"col2":"somethong"}}}}' 3jstr2 = u'{"header":{"id":12346,"foo":"baz"},"body":{"id":111002,"name":"barfoo","sub_json":{"id":23456,"sub_sub_json":{"col1":30,"col2":"something else"}}}}' 4jstr3 = u'{"header":{"id":43256,"foo":"foobaz"},"body":{"id":20192,"name":"bazbar","sub_json":{"id":39283,"sub_sub_json":{"col1":50,"col2":"another thing"}}}}' 5df = spark.createDataFrame([Row(json=jstr1),Row(json=jstr2),Row(json=jstr3)]) 1+--------------------+ 2| json| 3+--------------------+ 4|{"header":{"id":1...| 5|{"header":{"id":1...| 6|{"header":{"id":4...| 7+--------------------+ 如上所示,我们模拟一个DataFrame,其中只有一列,列名为json,类型为string。可以看到,json中的值为j……
阅读全文
$PySpark$ 在遇到$map$类型的列的一些处理 在$spark$中,有时会遇到$column$的类型是$array$和$map$类型的,这时候需要将它们转换为多行数据 $Explode\ array\ and\ map\ columns\ to\ rows$ 1import pyspark 2from pyspark.sql import SparkSession 3 4spark = SparkSession.builder.appName('pyspark-by-examples').getOrCreate() 5 6arrayData = [ 7 ('James',['Java','Scala'],{'hair':'black','eye':'brown'}), 8 ('Michael',['Spark','Java',None],{'hair':'brown','eye':None}), 9 ('Robert',['CSharp',''],{'hair':'red','eye':''}), 10 ('Washington',None,None), 11 ('Jefferson',['1','2'],{}) ] 12 13df = spark.createDataFrame(data=arrayData, schema = ['name','knownLanguages','properties']) 14df.printSchema() 15df.show() 1root 2 |-- name: string (nullable = true) 3 |-- knownLanguages: array (nullable = true) 4 | |-- element: string (containsNull = true) 5 |-- properties: map (nullable = true) 6 | |-- key: string 7 | |-- value: string (valueContainsNull = true) 8 9+----------+--------------+--------------------+ 10| name|knownLanguages| properties| 11+----------+--------------+--------------------+ 12| James| [Java, Scala]|[eye -> brown, ha...| 13| Michael|[Spark, Java,]|[eye ->, hair -> ...| 14| Robert| [CSharp, ]|[eye -> , hair ->...| 15|Washington| null| null| 16| Jefferson| [1, 2]| []| 17+----------+--------------+--------------------+ $explode –……
阅读全文
Spark中cache和persist的作用 Spark开发高性能的大数据计算作业并不是那么简单。如果没有对Spark作业进行合理的调优,Spark作业的执行速度可能会很慢,这样就完全体现不出Spark作为一种快速大数据计算引擎的优势来。因此,想要用好Spark,就必须对其进行合理的性能优化。 有一些代码开发基本的原则,避免创建重复的RDD,尽可能复用同一个RDD,如下,我们可以直接用一个RDD进行多……
阅读全文
Container killed by YARN for exceeding memory limits? 运行spark脚本时,经常会碰到Container killed by YARN for exceeding memory limits的错误,导致程序运行失败。 这个的意思是指executor的外堆内存超出了。默认情况下,这个值被设置为executor_memory的10%或者384M,以较大者为准,即max(executor_memory*.1, 384M). 解决办法 提高内存开销 减少执行程序内核的数量 增加分区数量 提高驱动程序和执行程序内存 提……
阅读全文
简介 $repartition(numPartitions:Int)$ 和 $coalesce(numPartitions:Int,shuffle:Boolean=false)$ 作用:对RDD的分区进行重新划分,repartition内部调用了coalesce,参数$shuffle=true$ 分析 例:RDD有N个分区,需要重新划分成M个分区 N小于M 一般情况下N个分区有数据分布不均匀的状况,利用HashPartitioner函数将数据重新分区为M个,这时需要将shu……
阅读全文
RDD算子总结 从功能上分: 转换算子(transformer): lazy执行,生成新的rdd,只有在调用action算子时,才会真正的执行。 如:map 、flatmap、filter、 union、 join、 ruduceByKey、 cache 行动算子(action): 触发任务执行,产生job,返回值不再是rdd。 如:count 、collect、top、 take、 reduce 从作用上分: 通用的: map、 flatMap、 di……
阅读全文
RDD简介 RDD–弹性分布式数据集(Resilient Distributed Dataset)是spark的核心概念。RDD其实就是分布式的元素集合。在Spark中,对数据的所有操作不外乎创建RDD,转化已有的RDD以及调用RDD操作进行求值。而在这一切的背后,spark会自动讲RDD中的数据分发到集群上,并将操作并行化执行。 RDD基础 RDD是一个不可变的分布式对象集合.每个RDD被分为多个分区,这些分区运……
阅读全文