Python系列课(11)——PySpark
目录一、前言介绍1. Spark是什么2. PySpark是什么3. 为什么要学习PySpark二、基础准备1. 安装PySpark2. 构建SparkContext入口对象3. PySpark编程模型三、数据输入1. RDD对象2. Python数据容器转RDDparallelize3. 读取文件转RDDtextFile四、数据计算核心算子1. map算子逐个处理2. flatMap算子逐个处理 解除嵌套3. reduceByKey算子按Key聚合4. filter算子过滤5. distinct算子去重6. sortBy算子排序五、数据输出1. 输出为Python对象2. 输出到文件六、综合案例案例1WordCount单词计数案例2销售数据分析案例3搜索引擎日志分析Hadoop运行七、PySpark速查表安装与环境数据输入数据计算算子数据输出代码跨行编写使用\一、前言介绍1. Spark是什么SparkApache Spark是用于大规模数据处理的统一分析引擎用于调度成百上千的服务器集群计算TB、PB乃至EB级别的海量数据。2. PySpark是什么PySparkSpark官方开发的Python语言第三方库Python开发者可以使用pip安装并使用。3. 为什么要学习PySpark大数据开发是Python的高薪就业方向SparkPySpark是大数据开发的核心技术栈二、基础准备1. 安装PySparkpip install pyspark # 使用清华镜像源更快 pip install -i https://pypi.tuna.tsinghua.edu.cn/simple pyspark2. 构建SparkContext入口对象from pyspark import SparkConf, SparkContext # 创建Spark配置 conf SparkConf().setMaster(local[*]).setAppName(MyApp) # 创建SparkContext入口对象 sc SparkContext(confconf) print(PySpark环境已启动)3. PySpark编程模型数据输入 → 数据处理计算 → 数据输出 ↓ ↓ ↓ RDD对象 RDD成员方法 Python对象/文件三、数据输入1. RDD对象RDD弹性分布式数据集PySpark中数据计算的载体提供数据存储和计算方法。2. Python数据容器转RDDparallelizefrom pyspark import SparkConf, SparkContext conf SparkConf().setMaster(local[*]).setAppName(CreateRDD) sc SparkContext(confconf) # list转RDD rdd1 sc.parallelize([1, 2, 3, 4, 5]) print(rdd1.collect()) # [1, 2, 3, 4, 5] # tuple转RDD rdd2 sc.parallelize((1, 2, 3, 4, 5)) print(rdd2.collect()) # [1, 2, 3, 4, 5] # set转RDD rdd3 sc.parallelize({1, 2, 3, 4, 5}) print(rdd3.collect()) # [1, 2, 3, 4, 5] # dict转RDD只有key被保留 rdd4 sc.parallelize({a: 1, b: 2, c: 3}) print(rdd4.collect()) # [a, b, c] # str转RDD每个字符为一个元素 rdd5 sc.parallelize(hello) print(rdd5.collect()) # [h, e, l, l, o] sc.stop()3. 读取文件转RDDtextFilefrom pyspark import SparkConf, SparkContext conf SparkConf().setMaster(local[*]).setAppName(ReadFile) sc SparkContext(confconf) # 读取文本文件 rdd sc.textFile(data.txt) print(rdd.collect()) sc.stop()四、数据计算核心算子1. map算子逐个处理from pyspark import SparkConf, SparkContext conf SparkConf().setMaster(local[*]).setAppName(MapDemo) sc SparkContext(confconf) rdd sc.parallelize([1, 2, 3, 4, 5]) # 每个元素乘以2 rdd2 rdd.map(lambda x: x * 2) print(rdd2.collect()) # [2, 4, 6, 8, 10] # 每个元素转为字符串 rdd3 rdd.map(lambda x: f数字: {x}) print(rdd3.collect()) # [数字: 1, 数字: 2, 数字: 3, 数字: 4, 数字: 5] # 链式调用 result sc.parallelize([1, 2, 3, 4, 5]) \ .map(lambda x: x * 2) \ .map(lambda x: x 1) \ .collect() print(result) # [3, 5, 7, 9, 11] sc.stop()2. flatMap算子逐个处理 解除嵌套from pyspark import SparkConf, SparkContext conf SparkConf().setMaster(local[*]).setAppName(FlatMapDemo) sc SparkContext(confconf) # map嵌套结构保留 rdd1 sc.parallelize([hello world, python spark]) result1 rdd1.map(lambda x: x.split( )).collect() print(result1) # [[hello, world], [python, spark]] # flatMap解除嵌套 rdd2 sc.parallelize([hello world, python spark]) result2 rdd2.flatMap(lambda x: x.split( )).collect() print(result2) # [hello, world, python, spark] # flatMap常用于单词拆分 words sc.parallelize([apple banana, cat dog, hello world]) \ .flatMap(lambda x: x.split( )) \ .collect() print(words) # [apple, banana, cat, dog, hello, world] sc.stop()3. reduceByKey算子按Key聚合from pyspark import SparkConf, SparkContext conf SparkConf().setMaster(local[*]).setAppName(ReduceByKeyDemo) sc SparkContext(confconf) # 按key聚合相同的key进行两两计算 data [(苹果, 10), (香蕉, 5), (苹果, 8), (香蕉, 3), (苹果, 2)] rdd sc.parallelize(data) # 按key求和 result rdd.reduceByKey(lambda a, b: a b).collect() print(result) # [(苹果, 20), (香蕉, 8)] # 按key求最大值 max_result rdd.reduceByKey(lambda a, b: max(a, b)).collect() print(max_result) # [(苹果, 10), (香蕉, 5)] sc.stop()4. filter算子过滤from pyspark import SparkConf, SparkContext conf SparkConf().setMaster(local[*]).setAppName(FilterDemo) sc SparkContext(confconf) rdd sc.parallelize([1, 2, 3, 4, 5, 6, 7, 8, 9, 10]) # 过滤偶数 evens rdd.filter(lambda x: x % 2 0).collect() print(evens) # [2, 4, 6, 8, 10] # 过滤大于5的数 greater rdd.filter(lambda x: x 5).collect() print(greater) # [6, 7, 8, 9, 10] sc.stop()5. distinct算子去重from pyspark import SparkConf, SparkContext conf SparkConf().setMaster(local[*]).setAppName(DistinctDemo) sc SparkContext(confconf) rdd sc.parallelize([1, 2, 2, 3, 3, 3, 4, 4, 4, 4]) unique rdd.distinct().collect() print(sorted(unique)) # [1, 2, 3, 4] sc.stop()6. sortBy算子排序from pyspark import SparkConf, SparkContext conf SparkConf().setMaster(local[*]).setAppName(SortByDemo) sc SparkContext(confconf) rdd sc.parallelize([(苹果, 10), (香蕉, 5), (橙子, 8), (葡萄, 3)]) # 按值排序升序 sorted_asc rdd.sortBy(lambda x: x[1], ascendingTrue).collect() print(sorted_asc) # [(葡萄, 3), (香蕉, 5), (橙子, 8), (苹果, 10)] # 按值排序降序 sorted_desc rdd.sortBy(lambda x: x[1], ascendingFalse).collect() print(sorted_desc) # [(苹果, 10), (橙子, 8), (香蕉, 5), (葡萄, 3)] # 全局排序需要设置分区数为1 sorted_global rdd.sortBy(lambda x: x[1], ascendingFalse, numPartitions1).collect() print(sorted_global) sc.stop()五、数据输出1. 输出为Python对象from pyspark import SparkConf, SparkContext conf SparkConf().setMaster(local[*]).setAppName(OutputDemo) sc SparkContext(confconf) rdd sc.parallelize([1, 2, 3, 4, 5]) # collect()转换为list result_list rdd.collect() print(result_list) # [1, 2, 3, 4, 5] # count()统计元素个数 count rdd.count() print(count) # 5 # take(n)取出前n个元素 first_three rdd.take(3) print(first_three) # [1, 2, 3] # reduce()自定义聚合 total rdd.reduce(lambda a, b: a b) print(total) # 15 sc.stop()2. 输出到文件import os from pyspark import SparkConf, SparkContext # 配置Hadoop依赖Windows需要 os.environ[HADOOP_HOME] D:/hadoop-3.0.0 conf SparkConf().setMaster(local[*]).setAppName(SaveFile) # 设置分区数为1避免输出多个文件 conf.set(spark.default.parallelism, 1) sc SparkContext(confconf) rdd sc.parallelize([hello world, pyspark is great, data processing]) # 保存为文本文件 rdd.saveAsTextFile(output) sc.stop()六、综合案例案例1WordCount单词计数from pyspark import SparkConf, SparkContext conf SparkConf().setMaster(local[*]).setAppName(WordCount) sc SparkContext(confconf) # 读取文件 rdd sc.textFile(words.txt) # 处理流程 result rdd \ .flatMap(lambda line: line.split( )) \ # 拆分单词 .map(lambda word: (word, 1)) \ # 转换为(word, 1) .reduceByKey(lambda a, b: a b) \ # 按单词聚合 .sortBy(lambda x: x[1], ascendingFalse) \ # 按次数降序 .collect() # 输出结果 for word, count in result: print(f{word}: {count}) sc.stop()假设words.txt内容hello world hello python spark hello python pyspark spark hello输出hello: 4 python: 2 spark: 2 world: 1 pyspark: 1案例2销售数据分析import json from pyspark import SparkConf, SparkContext conf SparkConf().setMaster(local[*]).setAppName(SalesAnalysis) sc SparkContext(confconf) # 模拟订单数据JSON格式 orders_data [ {id:1,category:平板电脑,areaName:北京,money:1450}, {id:2,category:手机,areaName:北京,money:8412}, {id:3,category:电脑,areaName:上海,money:1513}, {id:4,category:家电,areaName:北京,money:1550}, {id:5,category:电脑,areaName:杭州,money:1550}, {id:6,category:家具,areaName:北京,money:6661}, {id:7,category:书籍,areaName:北京,money:5550}, {id:8,category:食品,areaName:北京,money:5520}, {id:9,category:服饰,areaName:杭州,money:9000} ] # 创建RDD rdd sc.parallelize(orders_data) # 解析JSON def parse_json(line): data json.loads(line) return (data[areaName], data[money], data[category]) parsed rdd.map(parse_json) # 1. 各个城市销售额排名从大到小 city_sales parsed \ .map(lambda x: (x[0], x[1])) \ .reduceByKey(lambda a, b: a b) \ .sortBy(lambda x: x[1], ascendingFalse) \ .collect() print( 各城市销售额排名 ) for city, money in city_sales: print(f{city}: {money}元) # 2. 全部城市有哪些商品类别在售卖 all_categories parsed \ .map(lambda x: x[2]) \ .distinct() \ .collect() print(\n 全部商品类别 ) print(all_categories) # 3. 北京市有哪些商品类别在售卖 beijing_categories parsed \ .filter(lambda x: x[0] 北京) \ .map(lambda x: x[2]) \ .distinct() \ .collect() print(\n 北京市商品类别 ) print(beijing_categories) sc.stop()输出 各城市销售额排名 北京: 23188元 杭州: 10550元 上海: 1513元 全部商品类别 [平板电脑, 手机, 电脑, 家电, 家具, 书籍, 食品, 服饰] 北京市商品类别 [平板电脑, 手机, 家电, 家具, 书籍, 食品]案例3搜索引擎日志分析from pyspark import SparkConf, SparkContext conf SparkConf().setMaster(local[*]).setAppName(LogAnalysis) sc SparkContext(confconf) # 模拟日志数据格式时间|关键词 log_data [ 2024-01-01 10:00:00|Python教程, 2024-01-01 10:05:00|Spark入门, 2024-01-01 10:10:00|黑马程序员, 2024-01-01 10:10:00|Python教程, 2024-01-01 11:00:00|大数据开发, 2024-01-01 11:05:00|黑马程序员, 2024-01-01 11:10:00|Spark入门, 2024-01-01 12:00:00|黑马程序员, 2024-01-01 12:00:00|Python教程 ] rdd sc.parallelize(log_data) # 解析日志 def parse_log(line): time, keyword line.split(|) hour time.split( )[1].split(:)[0] # 提取小时 return (hour, keyword) parsed rdd.map(parse_log) # 1. 热门搜索时间段按搜索次数排名 hour_count parsed \ .map(lambda x: (x[0], 1)) \ .reduceByKey(lambda a, b: a b) \ .sortBy(lambda x: x[1], ascendingFalse) \ .take(3) print( 热门搜索时间段 Top3 ) for hour, count in hour_count: print(f{hour}点: {count}次) # 2. 热门搜索词 Top3 keyword_count parsed \ .map(lambda x: (x[1], 1)) \ .reduceByKey(lambda a, b: a b) \ .sortBy(lambda x: x[1], ascendingFalse) \ .take(3) print(\n 热门搜索词 Top3 ) for keyword, count in keyword_count: print(f{keyword}: {count}次) # 3. 统计黑马程序员在哪个时段被搜索最多 heimakeyword parsed \ .filter(lambda x: x[1] 黑马程序员) \ .map(lambda x: (x[0], 1)) \ .reduceByKey(lambda a, b: a b) \ .sortBy(lambda x: x[1], ascendingFalse) \ .first() print(f\n 黑马程序员搜索最多时段 ) print(f{heimakeyword[0]}点: {heimakeyword[1]}次) sc.stop()输出 热门搜索时间段 Top3 10点: 4次 11点: 3次 12点: 2次 热门搜索词 Top3 Python教程: 3次 黑马程序员: 3次 Spark入门: 2次 黑马程序员搜索最多时段 10点: 1次 11点: 1次 12点: 1次Hadoop运行提交命令 bin/spark-submit --master yarn --num-executors 3 --queue root.teach --executor-cores 4 --executor-memory 4g /home/hadoop/demo.py七、PySpark速查表安装与环境操作命令安装PySparkpip install pyspark创建入口sc SparkContext(confconf)数据输入方法说明sc.parallelize(data)Python容器 → RDDsc.textFile(path)文本文件 → RDD数据计算算子算子说明示例map(func)逐个元素处理rdd.map(lambda x: x*2)flatMap(func)处理并解除嵌套rdd.flatMap(lambda x: x.split())filter(func)过滤True保留rdd.filter(lambda x: x5)reduceByKey(func)按Key聚合rdd.reduceByKey(lambda a,b: ab)distinct()去重rdd.distinct()sortBy(func, ascending)排序rdd.sortBy(lambda x: x[1])数据输出方法说明collect()RDD → listcount()统计元素个数take(n)取前n个元素first()取第一个元素reduce(func)聚合计算saveAsTextFile(path)保存到文件代码跨行编写使用\# 使用反斜杠实现跨行编写 result sc.textFile(data.txt) \ .flatMap(lambda x: x.split( )) \ .map(lambda x: (x, 1)) \ .reduceByKey(lambda a, b: a b) \ .collect()