Code Ease Code Ease
  • 个人博客网站 (opens new window)
  • 好用的工具网站 (opens new window)
  • Java核心基础
  • 框架的艺术
  • 分布式与微服务
  • 开发经验大全
  • 设计模式
  • 版本新特性
数据库系列
大数据+AI
  • xxl-job
运维与Linux
  • 基于SpringBoot和BootStrap的论坛网址
  • 基于VuePress的个人博客网站
  • 基于SpringBoot开发的小功能
  • 做一个自己的IDEA插件
程序人生
关于我
  • 分类
  • 标签
  • 归档

神秘的鱼仔

你会累是因为你在走上坡路
  • 个人博客网站 (opens new window)
  • 好用的工具网站 (opens new window)
  • Java核心基础
  • 框架的艺术
  • 分布式与微服务
  • 开发经验大全
  • 设计模式
  • 版本新特性
数据库系列
大数据+AI
  • xxl-job
运维与Linux
  • 基于SpringBoot和BootStrap的论坛网址
  • 基于VuePress的个人博客网站
  • 基于SpringBoot开发的小功能
  • 做一个自己的IDEA插件
程序人生
关于我
  • 分类
  • 标签
  • 归档
服务器
  • ElasticSearch

  • Spark

    • 十亿条数据需要每天计算怎么办?Spark快速入门
    • 学会RDD就学会了Spark,Spark数据结构RDD快速入门
    • 像写SQL一样去处理内存中的数据,SparkSQL入门教程
    • Spark算子实战Java版,学到了
      • (一)概述
      • (二)转换算子
        • map
        • filter
        • flatMap
        • mapPartitions
        • union
        • intersection
        • groupByKey
        • reduceByKey
        • aggregateByKey
      • (三)行动算子
        • reduce
        • collect()
        • count()
        • first()
        • take
        • takeOrdered
        • foreach
      • (四)总结
  • kafka

  • AI聚集地

  • 大数据技术
  • Spark
CodeEase
2023-11-21
目录

Spark算子实战Java版,学到了

作者:鱼仔
博客首页: codeease.top (opens new window)
公众号:Java鱼仔

# (一)概述

算子从功能上可以分为Transformations转换算子和Action行动算子。转换算子用来做数据的转换操作,比如map、flatMap、reduceByKey等都是转换算子,这类算子通过懒加载执行。行动算子的作用是触发执行,比如foreach、collect、count等都是行动算子,只有程序运行到行动算子时,转换算子才会去执行。

本文将介绍开发过程中常用的转换算子和行动算子,Spark代码基于Java编写,前置代码如下:

public class SparkTransformationTest {
    public static void main(String[] args) {
        // 前置准备
        SparkConf conf = new SparkConf();
        conf.setMaster("local[1]");
        conf.setAppName("SPARK ES");
        JavaSparkContext sc = new JavaSparkContext(conf);
        JavaRDD<String> javaRdd = sc.parallelize(Arrays.asList("a", "b", "c", "d", "e"));
    }
}
1
2
3
4
5
6
7
8
9
10

# (二)转换算子

# map

map(func):通过函数func传递的每个元素,返回一个新的RDD。

JavaRDD<Object> map = javaRdd.map((Function<String, Object>) 
        item -> "new" + item);
map.foreach(x -> System.out.println(x));
1
2
3

返回一个新的RDD,数据是newa、newb、newc、newd、newe

# filter

filter(func):筛选通过func处理后返回 true 的元素,返回一个新的RDD。

JavaRDD<String> filter = javaRdd.filter(item -> item.equals("a") || item.equals("b"));
filter.foreach(x -> System.out.println(x));
1
2

返回的新RDD数据是a和b。

# flatMap

flatMap(func):类似于 map,但每个输入项可以映射到 0 个或更多输出项。

JavaRDD<String> javaRdd = sc.parallelize(Arrays.asList("a,b", "c,d,e", "f,g"));
JavaRDD<String> flatMap = javaRdd.flatMap((FlatMapFunction<String, String>) 
        s -> Arrays.asList(s.split(",")).iterator());
flatMap.foreach(x -> System.out.println(x));
1
2
3
4

入参只有3个,经过flatMap映射后返回了长度为7的RDD。

# mapPartitions

mapPartitions(func):类似于map,但该函数是在RDD每个partition上单独运行,因此入参会是Iterator<Object>。

JavaRDD<String> mapPartitions = javaRdd.mapPartitions((FlatMapFunction<Iterator<String>, String>) stringIterator -> {
    ArrayList<String> list = new ArrayList<>();
    while (stringIterator.hasNext()) {
        list.add(stringIterator.next());
    }
    return list.iterator();
});
mapPartitions.foreach(x -> System.out.println(x));
1
2
3
4
5
6
7
8

除了是对Iterator进行处理之外,其他的都和map一样。

# union

union(otherDataset):返回一个新数据集,包含两个数据集合的并集。

JavaRDD<String> newJavaRdd = sc.parallelize(Arrays.asList("1", "2", "3", "4", "5"));
JavaRDD<String> unionRdd = javaRdd.union(newJavaRdd);
unionRdd.foreach(x-> System.out.println(x));
1
2
3

# intersection

intersection(otherDataset):返回一个新数据集,包含两个数据集合的交集。

JavaRDD<String> newJavaRdd = sc.parallelize(Arrays.asList("a", "b", "3", "4", "5"));
JavaRDD<String> intersectionRdd = javaRdd.intersection(newJavaRdd);
intersectionRdd.foreach(x-> System.out.println(x));
1
2
3

# groupByKey

groupByKey ([ numPartitions ]):在 (K, V) 对的数据集上调用时,返回 (K, Iterable) 对的数据集,可以传递一个可选numPartitions参数来设置不同数量的任务。

这里需要了解Java中的另外一种RDD,JavaPairRDD。JavaPairRDD是一种key-value类型的RDD,groupByKey就是针对JavaPairRDD的API。

JavaRDD<String> rdd = sc.parallelize(Arrays.asList("a:1", "a:2", "b:1", "c:3"));
JavaPairRDD<String, Integer> javaPairRDD = rdd.mapToPair(s -> {
    String[] split = s.split(":");
    return new Tuple2<>(split[0], Integer.parseInt(split[1]));
});
JavaPairRDD<String, Iterable<Integer>> groupByKey = javaPairRDD.groupByKey();
groupByKey.foreach(x -> System.out.println(x._1()+x._2()));
1
2
3
4
5
6
7

最终输出结果:

a[1, 2]
b[1]
c[3]
1
2
3

# reduceByKey

reduceByKey(func, [numPartitions]):在 (K, V) 对的数据集上调用时,返回 (K, V) 对的数据集,其中每个键的值使用给定的 reduce 函数func聚合。和groupByKey不同的地方在于reduceByKey对value进行了聚合处理。

JavaRDD<String> rdd = sc.parallelize(Arrays.asList("a:1", "a:2", "b:1", "c:3"));
JavaPairRDD<String, Integer> javaPairRDD = rdd.mapToPair(s -> {
    String[] split = s.split(":");
    return new Tuple2<>(split[0], Integer.parseInt(split[1]));
});
JavaPairRDD<String, Integer> reduceRdd = javaPairRDD.reduceByKey((Function2<Integer, Integer, Integer>) (x, y) -> x + y);
reduceRdd.foreach(x -> System.out.println(x._1()+x._2()));
1
2
3
4
5
6
7

最终输出结果:

a3
b1
c3
1
2
3

# aggregateByKey

aggregateByKey(zeroValue)(seqOp, combOp, [numPartitions]):aggregateByKey这个算子相比上面这些会复杂很多,主要参数有zeroValue、seqOp、combOp,numPartitions可选。

zeroValue是该算子设置的初始值,seqOp函数是将rdd中的value值和zeroValue进行处理,combOp是将相同key的数据进行处理。

JavaRDD<String> rdd = sc.parallelize(Arrays.asList("a:1", "a:2", "b:1", "c:3"));
JavaPairRDD<String, Integer> javaPairRDD = rdd.mapToPair(s -> {
    String[] split = s.split(":");
    return new Tuple2<>(split[0], Integer.parseInt(split[1]));
});
JavaPairRDD<String, Integer> aggregateRdd = javaPairRDD.aggregateByKey(1,
        // seqOp函数中的第一个入参是 zeroValue,第二个入参是rdd的value,这里对所有的value+1
        (Function2<Integer, Integer, Integer>) (v1, v2) -> v1 + v2,
        // combOp函数是对同一个key的value进行处理,这里对相同key的value进行相加
        (Function2<Integer, Integer, Integer>) (v1, v2) -> v1 + v2);
aggregateRdd.foreach(x -> System.out.println(x._1()+":"+x._2()));
1
2
3
4
5
6
7
8
9
10
11

最终输出结果如下:

a:4
b:2
c:4
1
2
3

# (三)行动算子

# reduce

reduce(func):使用函数func聚合数据集的元素(它接受两个参数并返回一个)。 下面这段代码对所有rdd进行相加:

String reduce = javaRdd.reduce((Function2<String, String, String>) (v1, v2) -> {
    System.out.println(v1 + ":" + v2);
    return v1+v2;
});
System.out.println("result:"+reduce);
1
2
3
4
5

最终结果如下,从结果可以看出,每次对v1都是上一次reduce运行之后的结果:

a:b
ab:c
abc:d
abcd:e
result:abcde
1
2
3
4
5

# collect()

collect():将driver中的所有元素数据通过集合的方式返回,适合小数据量的场景,大数据量会导致内存溢出。

JavaRDD<String> javaRdd = sc.parallelize(Arrays.asList("a", "b", "c", "d", "e"));
List<String> collect = javaRdd.collect();
1
2

# count()

count():返回一个RDD中元素的数量。

JavaRDD<String> javaRdd = sc.parallelize(Arrays.asList("a", "b", "c", "d", "e"));
long count = javaRdd.count();
1
2

# first()

first():返回第一个元素。

JavaRDD<String> javaRdd = sc.parallelize(Arrays.asList("a", "b", "c", "d", "e"));
String first = javaRdd.first();
1
2

# take

take(n):返回前N个元素,take(1)=first()。

JavaRDD<String> javaRdd = sc.parallelize(Arrays.asList("a", "b", "c", "d", "e"));
List<String> take = javaRdd.take(3);
1
2

# takeOrdered

takeOrdered(n, [ordering]):返回自然排序的前N个元素,或者指定排序方法后的前N个元素。首先写一个排序类。

public class MyComparator implements Comparator<String>, Serializable {
    @Override
    public int compare(String o1, String o2) {
        return o2.compareTo(o1);
    }
}
1
2
3
4
5
6

接着是调用方式:

JavaRDD<String> javaRdd = sc.parallelize(Arrays.asList("a", "b", "c", "d", "e"));
List<String> take = javaRdd.takeOrdered(3, new MyComparator());
1
2

# foreach

foreach(func):该函数对数据集的每个RDD运行func函数,foreach算子在上面的代码中已经使用到,这里不再做代码案例展示。

# (四)总结

Spark的开发可以用Java或者Scala,Spark本身使用Scala编写,具体使用哪种语言进行开发需要根据项目情况考虑时间和学习成本。具体的API都可以在Spark官网查询:https://spark.apache.org/docs/2.3.0/rdd-programming-guide.html

上次更新: 2025/02/18, 11:30:08
像写SQL一样去处理内存中的数据,SparkSQL入门教程
大数据场景下的消息队列:Kafka3.0快速入门

← 像写SQL一样去处理内存中的数据,SparkSQL入门教程 大数据场景下的消息队列:Kafka3.0快速入门→

最近更新
01
AI大模型部署指南
02-18
02
半个月了,DeepSeek为什么还是服务不可用
02-13
03
Python3.9及3.10安装文档
01-23
更多文章>
Theme by Vdoing | Copyright © 2023-2025 备案图标 浙公网安备33021202002405 | 浙ICP备2023040452号
  • 跟随系统
  • 浅色模式
  • 深色模式
  • 阅读模式