在C++中 如何实现java中的Stream

文章目录

关于Steam

举个例子

List<Integer> list =Arrays.asList(1,2,3);// map将int的流,转换为string 的流, 然后对流进行过滤,最后收集到一个listList<String> list2=list.stream().map(x -> x +"123").filter(x -> x.startsWith("11")).collect(Collectors.toList());

Java 8中的Stream API是一种全新的处理集合数据的方式,它提供了一种非常便捷的方法来对集合中的元素进行筛选、排序、聚合等操作。下面是一些Stream API的特点和应用场景,以及其关键特性:

特点:

  • Stream是一种延迟计算的集合,它只有在真正需要使用时才会执行计算操作,可以极大地提高效率。
  • Stream可以处理大量数据,其内部使用了多线程的技术,可以自动并行化处理。
  • Stream可以实现非常复杂的操作,比如filter、map、reduce等等。

应用场景:

  • 数据处理:Stream API可以用于处理大量的数据,比如从数据库或者文件中读取数据,并对其进行处理。
  • 并发编程:Stream API内部使用了多线程技术,可以实现并发编程,提高程序的执行效率。
  • 功能扩展:Stream API提供了大量的中间操作和终止操作,可以方便地对集合中的元素进行筛选、排序、聚合等操作。

关键特性:

  • 流(Stream):Stream是一个数据流,可以看做是一种集合,但是它并不会存储数据,而是通过函数式编程的方式来对数据进行处理。
  • 中间操作:Stream提供了大量的中间操作,比如filter、map、distinct、sorted、limit等等,这些中间操作会返回一个新的Stream。
  • 终止操作:Stream提供了一些终止操作,比如forEach、count、reduce、collect等等,这些操作会触发Stream的执行,返回一个结果。
  • 并行流:Stream提供了并行流的功能,可以利用多线程的技术来提高程序的执行效率。可以通过parallelStream()方法获取一个并行流。
  • 支持函数式编程:Stream API使用函数式编程的方式来处理数据,可以大大简化程序的编写过程。

总之,Stream API是Java 8中非常重要的一个新特性,它可以让我们以一种更加简洁、高效的方式来处理集合中的数据,应用场景非常广泛。

对于java转C++的小伙伴来说,这个api实在太好用了,完全离不开它,能否在c++中也实现一套类似的api呢

实现思路

java的实现分析, 详细参考:Java中牛X的Stream流水线操作是怎样实现的 这里简单说明一下:

如下java代码:

list.stream().map(x->x+“123”).filter(x->x.startsWith(“test”)).collect(Collectors.toList());

抽象来看 创建了一些列的StatelessOp/StateFullOp ,通过类似链表的方式串联了起来,最终执行的时候,遍历链表将 StatelessOp/StateFullOp 中用户定义操作,通过Sink这个类,一层层wapper,最终在收集的时候执行了这个wrapper。

类似如下过程:

在这里插入图片描述

本质来说就是通过一层一层的回调函数的方式将多个操作串联了起来。 那么C++也可以通过回调函数的方式直接实现一个简易的类似java 中Stream的API。

也可以通过生产消费者模型来解释,本质都是一样的:
有AB两个模块,A负责生产数据,B负责消费数据,B不关心A怎么生产,A不关心B怎么消费,可能需要先过滤,转换,或者聚合什么的,这种情况下,传统的做法就是在A提供一个接口,注册一个回调函数,B负责调用,当A生产出数据的时候,调用B注册的回调函数进行消费。

思路打开,如果我们 有ABCD。。。 N 个模块呢,一级级注册回调函数,将这些回调函数级联起来,是不是就很像Stream的api呢,而且由于都是回调函数天生是懒加载的或者说天生支持反应式编程中的背压机制,就是说会根据消费者的消费速度去生产数据。

stream API 的本质就是注册回调函数,并且在合适的时候触发这个回调函数的调用

接口定义

template<typename T> class Flow { protected: /** * 关键函数定义: * 此函数的实现定义了何时调用入参的回调函数(数据的生产),当流生产出数据时就调用入参的回调函数, * 例如:对于一个vector的流就是循环调用回调函数 * 对于一个无限流,那么就是死循环调用回调函数 */ std::function<void(std::function<void(T)>)> consume; public: explicit Flow(const std::function<void(std::function<void(T)>)> c) : consume(c) {} } 

关键在于 这个成员变量函数,consume: 此函数的实现定义了何时调用入参的回调函数(数据的生产),当流生产出数据时就调用入参的回调函数

将一个Vector转换成流:

static Flow<T> of(std::vector<T> vector) { return Flow<std::string>([=](const std::function<void(std::string)> &c) { for (const auto &item: vector) { c(item); } }); } 

map

对于map其实就是消费一个流,并且 产生一个新的流。

template<typename R> Flow<R> map(std::function<R(T)> dataMapFun) { Flow<T> mapFLow = Flow([=](std::function<void(R)> c) { //定义了上一个流消费消费逻辑,消费的同时转换数据,产生下一个流 this->consume([=](T data) { c(dataMapFun(data)); //生产数据 }); }); return mapFLow; } 

记住我们对一个conume函数的定义,调用入参的回调函数即是生产数据

flatmap

template<typename R> Flow<R> flatMap(std::function<Flow<R>(T)> function) { return Flow([=](std::function<void(R)> c) { consume([=](T data) { Flow<R> flow = function(data); flow.consume(c); }); }); } 

是不是很简单,只要牢记我们对一个conume函数的定义,调用入参的回调函数即是生产数据

takeWhile

Flow<T> takeWhile(std::function<bool(T)> predicate) { return Flow( [=](std::function<void(T)> c) { consumeTillStop([=](T data) { if (predicate(data)) { stop(); } c(data); }); } ); } void stop() { throw StopException{}; } void consumeTillStop(std::function<void(T)> consumer) { try { consume(consumer); } catch (StopException exception) { } } 

因为都是回调函数,我们只能通过异常通知生产者停止生产

filter

Flow<T> filter(std::function<bool(T)> predicate) { return Flow( [=](std::function<void(T)> c) { consume([=](T data) { if (predicate(data)) { c(data); } }); } ); } 

sorted

Flow<T> sorted(std::function<bool(T, T)> comparator) { auto list = this->toVector(); std::sort(list.begin(), list.end(), comparator); return Flow::of(list); } 

toVector

类似java中toList,这里是最终的操作,不要产生一个新的流了。

std::vector<T> toVector() { std::vector<T> list; this->consume([&list](T data) { list.push_back(data); }); return list; } 

扩展

上面只给出了部分api的实现,按照这个思路stream中所有api都是可以实现的,上面还给出了java9中才支持的takeWhile。

上面也只给出了Vector转换为流,只要牢记我们那个关键函数的定义 consume:当流生产出数据时就调用入参的回调函数那么就可以实现所有集合的流式操作

那么它仅限于此么?当然不是

并行流

并发多线程调用回调函数,那么这个流就变成了并发的了

文件流

当读出文件时,就调用回调函数,那么它就变成了文件流,而且是基于回调函数的,不必读取所有的文件内容,读一部分,处理一部分,占用内存十分的小

二元流

看看下面这个函数的定义,它又变成了二元流了,当然N元的也不是不可以,是吧

template<typename T1,typename T2> class Flow { protected: /** * 关键函数定义: * 此函数的实现定义了何时调用入参的回调函数(数据的生产),当流生产出数据时就调用入参的回调函数, * 例如:对于一个vector的流就是循环调用回调函数 * 对于一个无限流,那么就是死循环调用回调函数 */ std::function<void(std::function<void(T1,T2)>)> consume; } 

业务流

最后回到生成者消费者模型,仔细想想也满足上述模型,而且是背压的,少了一个存放数据队列

总结

我们通过级联回调函数的方式实现了java中stream相关api,比起java jdk的实现要简单很多,但是该有的功能都有了。

这些代码的实现我觉得正是 业务逻辑与控制逻辑的分离的体现,api的调用者只关心逻辑,不关系怎么去控制。比如filter你只需要告诉我过滤的的条件是什么,怎么样过滤不需要关心。

也正是封装易变点思想的体现,通过上述api的封装,对于map ,filter等等操作,不用重复去写for循环,if语句,只需要告诉api的逻辑是什么,也就是传入的func

也是函数式编程思想的体现,无状态的,函数作为参数传递,惰性求值和并性处理等

代码demo

#include <vector> #include <list> #include <set> #include <map> #include <functional> /** * c++ 版本流的实现,通过回调函数的方式实现类似java中stream 的api,目前是demo版本,支持部分功能,看需要可能会支持并发流,二元流等功能 * * 由于是回调函数实现,天然是懒加载,被压的方式 * * @tparam T 流的类型 */ template<typename T> class Flow { private: void stop() { throw StopException{}; } void consumeTillStop(std::function<void(T)> consumer) { try { consume(consumer); } catch (StopException exception) { } } protected: /** * 关键函数定义: * 此函数的实现定义了何时调用入参的回调函数(数据的生产),当流生产出数据时就调用入参的回调函数, * 例如:对于一个vector的流就是循环调用回调函数 * 对于一个无限流,那么就是死循环调用回调函数 * * */ std::function<void(std::function<void(T)>)> consume; public: explicit Flow(const std::function<void(std::function<void(T)>)> c) : consume(c) {} using StopException = std::exception; void forEach(std::function<void(T)> consumer) { this->consume(consumer); } /** * 流的转换,消费一个流,产生一个新的流, * * 定义了上一个流的消费逻辑,以及本流的产生逻辑 * @tparam R 新的流的类型 * @param dataMapFun * @return */ template<typename R> Flow<R> map(std::function<R(T)> dataMapFun) { Flow<T> mapFLow = Flow([=](std::function<void(R)> c) { consume([=](T data) { c(dataMapFun(data)); }); }); return mapFLow; } template<typename R> Flow<R> flatMap(std::function<Flow<R>(T)> function) { return Flow([=](std::function<void(R)> c) { consume([=](T data) { Flow<R> flow = function(data); flow.consume(c); }); }); } Flow<T> takeWhile(std::function<bool(T)> predicate) { return Flow( [=](std::function<void(T)> c) { consumeTillStop([=](T data) { if (predicate(data)) { stop(); } c(data); }); } ); } Flow<T> filter(std::function<bool(T)> predicate) { return Flow( [=](std::function<void(T)> c) { consume([=](T data) { if (predicate(data)) { c(data); } }); } ); } Flow<T> dropWhile(std::function<bool(T)> predicate) { return Flow( [=](std::function<void(T)> c) { bool drop = false; consume([=, &drop](T data) { if (drop) { c(data); } if (!drop) { drop = predicate(data); } }); } ); } Flow<T> skip(int n) { return Flow( [=](std::function<void(T)> c) { int count = n; consume([=, &count](T data) { if (count <= 0) { c(data); } else { count--; } }); } ); } Flow<T> peek(std::function<void(T)> consumer) { return Flow( [=](std::function<void(T)> c) { consume([=](T data) { consumer(data); c(data); }); } ); } Flow<T> sorted(std::function<bool(T, T)> comparator) { auto list = this->toVector(); std::sort(list.begin(), list.end(), comparator); return Flow::of(list); } Flow<T> sorted() { auto list = this->toVector(); std::sort(list.begin(), list.end()); return Flow::of(list); } std::vector<T> toVector() { std::vector<T> list; this->consume([&list](T data) { list.push_back(data); }); return list; } template<typename K> std::unordered_map<K, std::vector<T>> groupBy(std::function<K(T)> keyFun) { std::unordered_map<K, std::vector<T>> map; consume([=, &map](T data) { K key = keyFun(data); auto iter = map.find(key); if (iter != map.end()) { iter->second.push_back(data); } else { std::vector<T> vector; vector.push_back(data); map.insert(std::make_pair(key, vector)); } }); return map; } std::set<T> toSet() { std::set<T> set; this->consume([&set](T data) { set.insert(data); }); return set; } static Flow<T> of(std::vector<T> vector) { return Flow<std::string>([=](const std::function<void(std::string)> &c) { for (const auto &item: vector) { c(item); } }); } }; 
std::vector<std::string> vec; vec.push_back("12_3"); vec.push_back("12"); vec.push_back("11"); vec.push_back("a"); vec.push_back("b"); Flow<std::string> strFlow = Flow<std::string>::of(vec); std::cout << "demo1<<<<<<<<<<<<<<<<<<:" << std::endl; std::vector<std::string> mapVec = strFlow .map<std::string>([=](const std::string &data) { return data + "123"; }) .peek([=](const std::string &data) { std::cout << "peek:" + data << std::endl; }) .skip(3) .toVector(); 

tips

给上面的那些函数起个好名字,可能会让java的同学感到更有亲切感

using Consumer = std::function<void(T)>; using Function = std::function<T(T)>; using Predicate = std::function<bool(T)>; using Comparator = std::function<bool(T,T)>; 

Read more

Java各大厂实习面试题面经新鲜出炉!---壹

Java各大厂实习面试题面经新鲜出炉!---壹

🌟 Hello,我是Java学习通! 🌈 在彩虹般绚烂的技术栈中,我是那个永不停歇的色彩收集者。 🦋 每一个优化都是我培育的花朵,每一个特性都是我放飞的蝴蝶。 🔬 每一次代码审查都是我的显微镜观察,每一次重构都是我的化学实验。 🎵 在编程的交响乐中,我既是指挥家也是演奏者。让我们一起,在技术的音乐厅里,奏响属于程序员的华美乐章。 目录 1.MySQL事务机制(阿里巴巴) 2.有做过SQL优化的实现么(阿里巴巴) 3.Nacos底层是如何实现注册中心功能的:(阿里巴巴) 4.RocketMQ如何持久化(阿里巴巴) 5.介绍一下websocket(阿里巴巴) 6.如何判断是http是长连接还是短连接,怎么设置长连接(阿里巴巴) 7.HashMap的实现原理(快手) 8.HashMap承载的元素越来越少,什么时候会退化成链表,为什么两者设置的这个值不对称(快手) 9.mysql和redis的一致性怎么保证的(快手) 10.数据库有哪些隔离级别 默认的隔离级别是什么(快手) 11.缓存击穿

By Ne0inhk
【Java 开发日记】我们来说一下 Mybatis 的缓存机制

【Java 开发日记】我们来说一下 Mybatis 的缓存机制

目录 核心概览 一级缓存 1. 作用域 2. 工作机制 3. 示例说明 4. 注意事项 二级缓存 1. 作用域 2. 开启与配置 3. 工作机制 4. 示例说明 5. 注意事项 缓存顺序与总结 使用建议 核心概览 * 一级缓存:默认开启,作用范围在 同一个 SqlSession 内。 * 二级缓存:需要手动配置开启,作用范围在 同一个 Mapper 命名空间(即同一个 Mapper 接口)内,可以被多个 SqlSession 共享。 一级缓存 1. 作用域 * SqlSession 级别:当同一个

By Ne0inhk
UnityMCP+Claude+VSCode,构建最强AI游戏开发环境

UnityMCP+Claude+VSCode,构建最强AI游戏开发环境

* 前言 * 一、UnityMCP+Claude+VSCode,构建最强AI 游戏开发环境 * 1.1 介绍 * 1.2 使用说明及下载 * 二、VSCode配置 * 2.1 连接UnityMCP * 2.2 在VSCode中添加插件 * 2.3 Claude安装 * 2.4 VSCode MCP配置 * 2.5 使用Claude开发功能 * 三、相关问题 * 总结 前言 * 本篇文章来介绍使用 UnityMCP+Claude+VSCode,打造一个更智能、高效的游戏开发工作流。 * 借助MCP工具,Claude可以直接与Unity编辑器进行双向指令交互,开发者则可以直接使用自然语言进行Unity游戏开发。 * 这一组合充分利用了AI的代码生成、问题诊断与创意辅助能力,极大提升了Unity项目的开发效率与质量。 一、UnityMCP+Claude+

By Ne0inhk

更新后的主流AI IDE/工具对比表

更新后的主流AI IDE/工具对比表 工具名称 类型 / 开发商 核心定位与特点 定价(个人) 适用场景与人群 Cursor AI原生IDE (Anysphere) 项目级上下文理解最强,支持多Agent并行协作,自研Composer模型,响应快。 $20/月 追求极致生产力、处理中大型复杂项目的全栈开发者。 Windsurf AI原生IDE (Codeium) 多文件智能体(Agent)能力突出,终端集成好,执行复杂任务流畅,被视为“Cursor平替”。 $15/月 需要深度处理大型代码库、注重重构和终端工作流的开发者。 GitHub Copilot IDE插件 (微软/GitHub) 生态最成熟,与VS Code、JetBrains等IDE无缝集成,补全准确稳定,用户基数最大。 $10/月起 大多数开发者的稳妥选择,尤其适合GitHub生态用户和团队协作。

By Ne0inhk