6-27 更新:已中选,能从海内外名校候选者中脱颖而出是一件很荣幸的事情,感谢导师的信任,Apache 邮箱我来了!

项目申请书

课题名称

eventmesh admin 模块优化完善

申请人:夏天

导师:薛炜明 | mikexue@apache.org

社区简介

Apache EventMesh 是一个用于解耦应用和后端中间件层的动态云原生事件驱动架构基础设施。它支持广泛的用例,包括复杂的混合云、使用了不同技术栈的分布式架构。

  • 轻松构建松耦合、分布式事件驱动架构
  • 开放兼容,原生支持 CloudEvents SDK 和 API,易扩展
  • 打破数据孤岛,支持企业全联接、数字化、多云协同

题目简介

eventmesh admin 模块为 eventmesh 项目的管理模块,目前 eventmesh admin 模块包含了不同协议的查询接口、客户端操作接口、webhook 配置操作接口,以及对事件存储的操作等,现有接口需补充相关注释,并测试是否功能正常,同时输出 admin 模块对应的文档与功能展示,针对事件存储的操作需进行扩展补充更多功能。

编码任务

  • 采用 checkStyle.xml 代码风格配置中的格式,为现有的 admin 模块接口编写 JavaDoc 注释
  • 在官方 Docusaurus 文档仓库中编写 admin 模块的接口文档
  • 测试现有接口功能并完善优化
  • 现有的 admin 模块对事件存储的管理接口耦合在 runtime 运行时中,且支持的管理范围也局限在单机。为了让 admin 模块具有集群管理能力,需要将 org.apache.eventmesh.runtime.admin 包中的所有管理接口提取到 eventmesh-admin 模块,以便于支持单独部署
  • 为 admin 管理中心增加一键启停等管理命令,支持向集群节点发送控制命令
  • 更新 eventmesh-runtime/scripts 中的管理脚本
  • 完成 admin 模块中 TBD(待决定)和 TODO(待开发)的功能

如果上述任务完成进度快于预期,可以在活动结束前和后续社区贡献中继续完成以下任务:

  • admin 模块与 eventmesh-registry-plugin 注册中心交互,runtime 把信息暴露给 registry,用 registry 的 API,将对注册中心的调用整合到 admin 管理中心
  • Data Mesh: 支持在不同的事件存储之间传输数据,使用 EventMesh SDK 打通各事件存储
  • 完善事件存储接口,增加支持的事件存储
  • 开发一个支持统一各事件存储的通用连接器,减少连接器插件的使用方法更新时业务侧的代码变动
  • 支持 MQTT 协议

核心模块分析

解耦架构

Apache EventMesh 可以将应用中的业务逻辑与基础设施的强绑定解耦,其解耦原理与另一开源项目 Apache Linkis 类似,通过 eventmesh-storage-plugin 模块调用 eventmesh-connectors 模块中的 sink connector 和 source connector,以插件形式提供对不同事件存储的支持能力。

image-20230613214715150

image-20230613214912291

插件化的方式最主要的优点在于方便,可以让所有的使用者根据具体的基础设施方案去使用不同的插件。eventmesh-runtime 是一种微内核的设计,只需要给 runtime 设计好它的运行态,其它的功能都从 API 去使用。定义好不同的 API 之后,就可以进一步定义不同的插件了,比如注册中心模块、存储模块等。

eventmesh-connectors 模块是三月份近期新提取的,在此之前,有一个名为 eventmesh-connector-plugin 的模块,这两者的功能是不完全相同的。eventmesh-connector-plugin 模块转型为了目前的 eventmesh-storage-plugin 模块,原先的 eventmesh-connector-plugin 的角色是 EventMesh 的事件存储,现在的 eventmesh-connectors 模块是更纯粹的连接器,承担数据同步。也就是说,source 和 sink 两端进行了解耦处理,比如 source 端是 RocketMQ,sink 端可以是其他的业务,但在这两端中间会经过 eventmesh-storage-plugin 模块处理。

SPI 反射机制

通过 org.apache.eventmesh.starter.StartUp 的 main () 方法启动 eventmesh-runtime 后,请求的处理是利用 SPI 反射机制,外加调用其它模块的 API 来实现的。如果需要支持某种事件存储,除了在代码层面强依赖的方式,更好的方式便是将插件的 jar 包加载后使用 SPI 的反射机制来将该插件实例化,这一点与 Apache Dubbo 的 SPI 机制是比较类似的。

以 eventmesh-storage-rocketmq 事件存储插件为例,通过在定义接口时添加 @EventMeshSPI 注解,并定义其单例模式与插件类型,可以在 classpath 加载这个类的时候,识别注解并寻找对应的实现类。在实现类关联的文件中定义了 RocketMQ 的别名,也就是实现类的全路径:

1
rocketmq=org.apache.eventmesh.storage.rocketmq.producer.RocketMQProducerImpl

于是,加载存储层的 API,例如 org.apache.eventmesh.api.producer.Producer 接口时,便可以映射该接口有一个 RocketMQ 的实现,且找到该实现对应的实现类,再通过对实现类进行反射加载,就可以将其实例化。

消息的处理流程

Apache EventMesh 目前支持接收 TCP、HTTP、gRPC 三种协议的请求。eventmesh-runtime 相当于 Apache EventMesh 的服务端,eventmesh-example 使用了 SDK,就相当于客户端。

以 TCP 为例,在收到业务侧发送的请求后,将首先由 Server 层的 org.apache.eventmesh.runtime.boot.EventMeshTCPServer 处理。这是一个基于 Netty 框架编写的 TCP 服务器,Netty 是一个异步事件驱动的网络应用框架,用于快速开发可维护的高性能协议服务器和客户端。它的 API 易用、性能高且社区活跃,在 Apache Dubbo、RocketMQ 等项目中都有使用。

EventMeshTCPServer 的入口在 startServer () 方法,在这个方法中通过管道的方式一步步添加对 TCP 数据包的处理。在经过 Handler 处理后,数据包进入 EventMeshTcpMessageDispatcher 分发器,使用 TCP 命令字区分数据包类型,并以异步回调的方式将数据包分发到对应任务的线程池中进行处理。从客户端的角度来讲,关键字对应了握手、心跳、订阅、取消订阅、开始监听消息、发送异步事件、发送广播事件等多种场景,完整信息可以参考 TCP 文档。

在异步回调中,EventMesh 作为发送方,在向基础设施的事件存储发送请求后,不会同步等待请求的执行结果,会继续发送其它请求,或者去执行其它任务。具体来说,虽然发送方可以 “发后不管”,但它需要一个网络 IO 线程对接收方发回的处理结果保持监听状态。执行业务逻辑的线程将请求的上下文托管给网络 IO 线程后,便可以继续处理业务逻辑,等到网络 IO 线程收到响应时,再交给业务逻辑线程。两者是错开运行的,虽然请求的延迟在上下文切换上会产生一定的性能损耗,但这种异步模式对吞吐量和高可用性的提升是更为可观的。

在异步的基础上,EventMesh 对事件存储有很多 Callback 回调的机制。以 RocketMQ 为例,EventMesh 用 RocketMQ 插件向基础设施发送请求时,EventMesh 的业务逻辑线程本身在网络 IO 线程发送完毕之后就已经离开了,等到收到基础设施中的 RocketMQ 服务端的响应之后,再去调 RocketMQ 客户端的回调,这个回调会再回到 EventMesh 里。回调机制整体是一个递归的流程,一层一层往深处走,然后一层一层往外返回,得出最终的结果。

具体来说,在 EventMeshTcpMessageDispatcher.channelRead0 () 中使用 dispatch () 方法根据 TCP 命令字分发数据包时,会使用 MessageTransferTask () 方法处理消息的收发任务,先把 TCP 协议转换为 EventMesh 内部通用的 CloudEvent 协议,经过 Traceback 和 TCP 限流等处理后,调用 createSendCallback () 方法,开始递归。递归返回的响应经过 AbstractTask 类抽象的 upstreamMsg () 方法包装,此时 upstreamMsg () 就做好了发送给事件存储基础设施的准备。再以 SessionSender 的 send () 方法为起点,通过 API 层 org.apache.eventmesh.api.producer.Producer 类中的 publish () 方法将消息异步地广播出去,使订阅者收到消息。执行结果保存在 sendStatus 中。

其中,createSendCallback () 方法是最为核心的,它创建了一个回调任务,并使用 SendCallback () 方法,在 onSuccess 时,将消息的内容、开始时间、执行时间、上下文等数据通过 writeAndFlush () 方法写给发送方。

什么时候调用 onSuccess () 方法是最重要的,它是在存储层被调用的,如 eventmesh-storage-kafka 等。此时已经使用 Kafka 客户端 KafkaProducer.send () 的 doSend () 方法向基础设施发送了 Kafka 形式的回调命令,并通过 onCompletion () 返回结果。

publish () 方法的每一种实现都对应着一个支持特定事件存储的实现类,其具体采用哪种实现就是利用了 SPI 机制加载。每种实现都有一些差别,例如,在 RocketMQ 的实现中,就在 sendAsync () 方法中使用了 RocketMQMessageFactory.createWriter () 方法和 sendCallbackConvert () 方法将 CloudEvent 和 EventMesh 自定义的 SendCallback 回调类型转换为 RocketMQ 支持的 org.apache.rocketmq.common.message.Message 消息类型和 org.apache.rocketmq.client.producer.SendCallBack 回调类型,而在 Kafka 的实现中,回调函数类型的转换是通过匿名函数 (metadata, exception) -> { ... } 实现的。

总而言之,从整体的、消息的角度看,一个请求的流程,是先由 EventMesh 客户端发给 EventMesh,EventMesh 再用 RocketMQ 客户端发给 RocketMQ,并且传一个 SendCallback 回调类型给 RocketMQ,并要求 RocketMQ 在请求处理结束、调用 RocketMQ 的 onSuccess () 方法时,使用 EventMesh 的 onSuccess () 方法返回回调结果。

从 EventMesh 内部层次的角度看,消息一开始会进入 Server 层,然后利用 SPI 反射机制,在 API 层利用异步回调去调用存储层的事件存储插件,一方面向事件存储发送消息和回调,另一方面向 API 层回调,等到事件存储返回执行的结果之后,将结果等信息返回给 API,然后再由 API 把执行结果原路返回调用方。

admin 模块分析

模块现状

课题任务所指的 admin 模块包含了 eventmesh-admin 模块和 org.apache.eventmesh.runtime.admin 包两大部分,不同协议的查询接口、客户端操作接口、webhook 配置操作接口,以及对事件存储的操作等目前都包含在后者中,前者目前只提供了 RocketMQ 事件存储的管理功能,并不能管理 EventMesh 节点,且没有提供 SPI 反射机制的插件加载能力。

接口功能

此小节列举并分析了 org.apache.eventmesh.runtime.admin 包中所有接口的功能,详细接口文档示例可跳转示例:获取系统配置信息

/configuration

位于 ConfigurationHandler 类,对应 eventmesh-dashboard 路径 /

用于获取当前 EventMesh 节点的基本配置信息,包括服务名称、服务环境和各协议监听端口地址等。

接口中定义了三个方法:

  1. preflight(HttpExchange httpExchange) 方法处理 OPTIONS 请求,用于在实际请求之前发送 CORS(跨源资源共享)响应头,允许跨域请求访问。它将响应头添加到 httpExchange 对象中,并发送 200 状态码。
  2. get(HttpExchange httpExchange) 方法处理 GET 请求,用于获取配置信息。它将配置信息组装成 GetConfigurationResponse 对象,并将其序列化为 JSON 字符串,并将其作为响应的内容返回给客户端。
  3. handle(HttpExchange httpExchange) 方法是接口的主要处理方法,根据请求的方法类型(OPTIONS 或 GET)分别调用上述两个方法进行处理。

其中,handle() 方法是由 com.sun.net.httpserver.HttpHandler 实现的,如图所示:

image-20230617152925838

/client

此接口用于连接 EventMesh 节点,但代码中没有此路径的请求映射,需要修复。对应 eventmesh-dashboard 按钮 /#Save

/metrics

位于 MetricsHandler 类,对应 eventmesh-dashboard 路径 /metrics

用于获取指标数据的汇总信息,包含以下指标:

HTTP 指标:

  • maxHTTPTPS(): 最大 HTTP 事务数 / 秒
  • avgHTTPTPS(): 平均 HTTP 事务数 / 秒
  • maxHTTPCost(): 最大 HTTP 请求耗时
  • avgHTTPCost(): 平均 HTTP 请求耗时
  • avgHTTPBodyDecodeCost(): 平均 HTTP 请求体解析耗时
  • getHttpDiscard(): HTTP 请求丢弃数
  • maxSendBatchMsgTPS(): 最大批量发送消息数 / 秒
  • avgSendBatchMsgTPS(): 平均批量发送消息数 / 秒
  • getSendBatchMsgNumSum(): 发送批量消息总数
  • getSendBatchMsgFailNumSum(): 发送批量消息失败总数
  • getSendBatchMsgFailRate(): 发送批量消息失败率
  • getSendBatchMsgDiscardNumSum(): 发送批量消息丢弃总数
  • maxSendMsgTPS(): 最大单条消息发送数 / 秒
  • avgSendMsgTPS(): 平均单条消息发送数 / 秒
  • getSendMsgNumSum(): 发送单条消息总数
  • getSendMsgFailNumSum(): 发送单条消息失败总数
  • getSendMsgFailRate(): 发送单条消息失败率
  • getReplyMsgNumSum(): 回复消息总数
  • getReplyMsgFailNumSum(): 回复消息失败总数
  • maxPushMsgTPS(): 最大推送消息数 / 秒
  • avgPushMsgTPS(): 平均推送消息数 / 秒
  • getHttpPushMsgNumSum(): HTTP 推送消息总数
  • getHttpPushFailNumSum(): HTTP 推送消息失败总数
  • getHttpPushMsgFailRate(): HTTP 推送消息失败率
  • maxHTTPPushLatency(): 最大 HTTP 推送延迟
  • avgHTTPPushLatency(): 平均 HTTP 推送延迟
  • getBatchMsgQueueSize(): 批量消息队列大小
  • getSendMsgQueueSize(): 发送消息队列大小
  • getPushMsgQueueSize(): 推送消息队列大小
  • getHttpRetryQueueSize(): HTTP 重试队列大小
  • avgBatchSendMsgCost(): 平均批量发送消息耗时
  • avgSendMsgCost(): 平均单条消息发送耗时
  • avgReplyMsgCost(): 平均回复消息耗时

TCP 指标:

  • getRetrySize(): 重试队列大小
  • getClient2eventMeshTPS(): 客户端到 EventMesh 的消息数 / 秒
  • getEventMesh2mqTPS(): EventMesh 到 MQ 的消息数 / 秒
  • getMq2eventMeshTPS(): MQ 到 EventMesh 的消息数 / 秒
  • getEventMesh2clientTPS(): EventMesh 到客户端的消息数 / 秒
  • getAllTPS(): 所有消息数 / 秒
  • getAllConnections(): 所有连接数
  • getSubTopicNum(): 订阅主题数

接口中定义了三个方法:

  1. preflight() 方法处理 OPTIONS 请求,与 /configuration 接口类似,以下接口将不再赘述。
  2. get(HttpExchange httpExchange) 方法处理 GET 请求,用于获取指标数据的汇总信息。该方法将 HTTP 和 TCP 的汇总指标数据封装进 GetMetricsResponse 对象,并将其序列化为 JSON 字符串,然后作为响应体返回。
  3. handle() 方法是接口的主要处理方法,与 /configuration 接口类似,以下接口将不再赘述。

/registry

位于 MetricsHandler 类,对应 eventmesh-dashboard 路径 /registry

用于获取 EventMesh 集群列表。

主要功能为从 Registry 对象中获取 EventMesh 集群信息,然后封装成 GetRegistryResponse 对象列表,并按照 EventMeshClusterName 进行排序。

/topic

位于 TopicHandler 类,对应 eventmesh-dashboard 路径 /topicCreate TopicRemove 按钮。

用于处理对主题的管理操作,包括获取主题列表 (GET)、创建主题 (POST) 和删除主题 (DELETE)。

/event

位于 EventHandler 类,对应 eventmesh-dashboard 路径 /event 及下拉框和 Create Event 按钮。

用于处理对事件的管理操作,包括获取事件列表 (GET) 和创建事件 (POST)。

其中 GET 方法支持以 topicName 为查询参数,并使用 offsetlength 参数分页查询。

/workflow

此接口用于获取、新增或删除工作流,但代码中没有此路径的请求映射,需要修复。对应 eventmesh-dashboard 路径 /workflow

/undefined/catalog

此接口用于获取、新增或删除活动目录,但代码中没有此路径的请求映射,需要修复。对应 eventmesh-dashboard 路径 /eventCatalogs

/client/tcp

位于 TCPClientHandler 类,对应 eventmesh-dashboard 路径 /tcp

用于处理与 TCP 客户端相关的管理操作,包括获取已连接的 TCP 客户端列表和删除 TCP 客户端(断开指定客户端的连接)。

list() 方法:处理 GET 请求

  • 设置 Content-Type 和跨域访问响应头
  • 遍历 EventMeshTCPServer 对象的 ClientSessionGroupMapping 和 Session 对象映射关系,将每个 Session 对象转换为 GetClientResponse 对象,并添加到列表 getClientResponseList 中。
  • 对 getClientResponseList 按主机和端口进行排序。
  • 将 getClientResponseList 转换为 JSON (result) 并发送响应。

delete() 方法:处理 DELETE 请求

  • 将请求体字符串 (request) 解析为 DeleteTCPClientRequest 对象。
  • 从 deleteTCPClientRequest 中获取要删除的主机 (host) 和端口 (port)。
  • 遍历 EventMeshTCPServer 对象的 ClientSessionGroupMapping 和 Session 对象映射关系,找到与要删除的主机和端口匹配的 Session 对象,并调用 EventMeshTcp2Client 的 serverGoodby2Client 方法,断开与该客户端的连接。
  • 设置 Content-Type 和跨域访问响应头,然后发送 200 状态码的空响应。

/client/http

位于 HTTPClientHandler 类,对应 eventmesh-dashboard 路径 /http

用于处理与 HTTP 客户端相关的管理操作,包括获取已连接的 HTTP 客户端列表和删除 HTTP 客户端。

/client/tcp 接口类似,不再赘述。

/client/grpc

位于 GrpcClientHandler 类,对应 eventmesh-dashboard 路径 /grpc

用于处理与 gRPC 客户端相关的管理操作,包括获取已连接的 gRPC 客户端列表和删除 gRPC 客户端。

/client/tcp 接口类似,不再赘述。

/webhook/insertWebHookConfig

位于 InsertWebHookConfigHandler 类。以下接口均暂无 eventmesh-dashboard 的对应功能。

用于将 WebHook 配置插入到系统中。

在处理 HTTP 请求时,首先发送成功的响应头信息,然后解析请求体中的数据为 WebHookConfig 对象,接着调用 WebHookConfigOperation 对象的 insertWebHookConfig 方法将配置插入系统中,并将操作结果序列化为字符串并返回。

/webhook/updateWebHookConfig/webhook/deleteWebHookConfig 接口功能与此接口类似,分别对应了更新和删除操作,不再赘述。

/webhook/queryWebHookConfigById

位于 QueryWebHookConfigByIdHandler 类。

用于根据 WebHook 配置 ID 查询相应的配置信息。

在处理 HTTP 请求时,首先发送成功的响应头信息,接着解析请求体中的数据为 WebHookConfig 对象,然后调用 WebHookConfigOperation 对象的 queryWebHookConfigById 方法查询配置信息,并将查询结果转换为 JSON 字符串写入输出流中返回给客户端。

/webhook/queryWebHookConfigByManufacturer 接口与此接口类似,用于根据 WebHook 提供方(如 Github)分页查询相应的配置信息,不再赘述。

/eventMesh/recommend

位于 QueryRecommendEventMeshHandler 类。

用于查询推荐的 EventMesh 节点。根据传入的 group 和 purpose 参数,计算推荐的 EventMesh 节点,并返回结果。

在处理 HTTP 请求时,首先获取请求 URI 中的查询字符串,解析为键值对形式的 Map。然后检查注册中心是否启用,如果未启用则抛出异常。接着检查 group 和 purpose 参数是否为空,不为空就在计算推荐结果后,将结果序列化并写入输出流中返回。

/clientManage/redirectClientByIpPort

位于 RedirectClientByIpPortHandler 类。

用于根据传入的 IP 地址和端口将匹配的客户端重定向到目标的 EventMesh 节点。

在处理 HTTP 请求时,首先获取请求 URI 中的查询字符串,解析为键值对形式的 Map。然后检查参数的合法性,接着获取 EventMeshTCPServer 对象的 ClientSessionGroupMapping 对象和 Session 对象的映射关系。然后遍历 sessionMap 中的每个 Session 对象,如果 Session 对象的客户端的主机和端口与传入的 ip 和 port 匹配,就调用重定向方法,并将结果追加到重定向结果中。

/clientManage/redirectClientByPath/clientManage/redirectClientBySubSystem 接口功能与此接口类似,分别以路径和子系统进行匹配,不再赘述。

/clientManage/rejectClientByIpPort

位于 RejectClientByIpPortHandler 类。

用于根据传入的 IP 地址和端口将匹配的客户端连接拒绝。

在处理 HTTP 请求时,首先获取请求 URI 中的查询字符串,解析为键值对形式的 Map。然后检查参数的合法性,接着遍历映射关系,如果匹配就拒绝客户端连接,并将成功拒绝的远程地址加入成功远程地址列表。

/clientManage/rejectClientByPath/clientManage/rejectClientBySubSystem 接口功能与此接口类似,分别以路径和子系统进行匹配,不再赘述。

/clientManage/rejectAllClient 接口则免去了匹配规则,直接拒绝所有客户端连接,同样不再赘述。

/clientManage/showClient

用于查询所有客户端信息,并统计每个子系统中的客户端数量。

/clientManage/showListenClientByTopic

位于 ShowListenClientByTopicHandler 类。

用于查询订阅特定主题的客户端信息。

对于每个客户端组,获取订阅了所查询的主题的会话集合 (listenSessionSet),并将组名和每个会话的进程 ID、IP 地址、端口号、路径和版本作为结果返回。

/clientManage/showClientBySystem 接口功能与此接口类似,以子系统进行匹配,不再赘述。

方法分析

此小节以 org.apache.eventmesh.admin.rocketmq 包中 /topicManage 接口的所有方法为例,进行关键代码逐行分析:

AdminController

  1. @Slf4j: Lombok 注解,用于在编译时自动为类生成一个名为 log 的静态日志记录器。
  2. public void run(HttpServer server) throws IOException: 在该方法中,首先通过调用 server.createContext(TOPIC_MANAGE_PATH, new TopicsHandler()) 创建上下文,并将其绑定到指定的路径 TOPIC_MANAGE_PATH 上。

TopicsHandler

handle()

此方法用于处理 HttpExchange 对象。

  1. 如果请求的 URL 匹配到 TOPIC_MANAGE_PATH,即满足 RequestMapping.postMapping(TOPIC_MANAGE_PATH, httpExchange) 条件,会调用 createTopicHandler(httpExchange) 方法来创建一个新的主题,并直接返回,不再执行后续的代码。
  2. 如果不匹配,就会首先获取 httpExchange 的输出流 out,用于向客户端发送响应数据。
  3. 然后调用 httpExchangesendResponseHeaders 方法,将响应状态码设置为 500,表示服务器内部错误。
  4. 接着通过 String.format 方法构造一个错误提示信息。
  5. 使用日志记录器以 error 级别打印错误信息。
  6. 最后将错误提示信息转换为字节数组,通过输出流 out 将其发送给客户端。
createTopicHandler()

此方法用于处理创建主题的请求。

  1. 使用 try-with-resources 语句,创建一个输出流 out,并获取 httpExchange 对象的响应体输出流。
  2. 调用 NetUtils.parsePostBody(httpExchange) 方法解析 HTTP 请求体,并将结果存储在 params 变量中。
  3. 使用 JsonUtils.parseObject(params, TopicCreateRequest.class) 方法将 params 转换为 TopicCreateRequest 对象,随后获取要创建的主题 topic
  4. 目前 topicResponse 对象的赋值为 null,因为这部分逻辑将来会实现一个新的 RocketMQ 服务来创建主题。
  5. topicResponse 转换为 JSON 字符串,写入输出流 out 中,并将结果赋值给 result

TopicCreateRequest

此类是用于表示创建主题的请求对象。

  1. @JsonInclude(JsonInclude.Include.NON_EMPTY): 此注解用于指定在将该对象转换为 JSON 格式时,忽略值为空的属性。
  2. @JsonIgnoreProperties(ignoreUnknown = true): 此注解用于指定在将 JSON 转换为该对象时,忽略除 topic 以外的其他属性。
  3. TopicCreateRequest(@JsonProperty("topic") String topic): 构造方法,通过 @JsonProperty("topic") 注解表示将传入的参数与 topic 属性进行绑定。

TopicResponse

此类是用于表示主题响应的对象,提供了对主题名和创建时间的操作方法,并重写了 toString 方法。

  1. topiccreatedTime 分别用于存储主题名和创建时间。
  2. 构造函数使用 @JsonCreator 注解进行标记,意为该构造函数可以用于反序列化 JSON 对象并创建 TopicResponse 实例。
  3. 构造函数的参数使用 @JsonProperty 注解进行标记,指定了对应 JSON 属性的名称。

RequestMapping

用于判断请求是否匹配。

  1. @UtilityClass: Lombok 注解,用于生成一个带有私有构造方法和静态方法的工具类。
  2. postMapping(String value, HttpExchange httpExchange): 用于处理 POST 请求,value 参数表示要匹配的 URL 路径,HttpExchange 参数表示 HTTP 请求的交换对象。返回一个布尔值,表示请求的 URL 路径是否与给定的 value 匹配。
  3. getMapping(),putMapping(),deleteMapping()postMapping() 方法类似。
  4. isUrlMatch(String value, HttpExchange httpExchange, String methodType): 用于判断请求的 URL 路径和方法类型是否与给定的 valuemethodType 匹配。
  5. 如果类型匹配,就通过 httpExchange.getRequestURI().getPath() 获取请求的 URL 路径。
  6. 使用 UrlMappingPattern 对象 matcher 来进行请求路径与 value 的匹配。

UrlMappingPattern

用于处理 URL 匹配和提取路径参数值的辅助类。

  1. Map<String, String> extractPathParameterValues(String url): 用于从给定的 URL 中提取路径参数的值。它使用 matcher.matches() 方法来判断给定的 URL 是否与模式匹配。如果匹配成功,则调用 extractParameters() 方法提取路径参数的值并返回一个包含参数名和对应值的 Map
  2. Map<String, String> extractParameters(Matcher matcher): 用于从 Matcher 对象中提取路径参数的值。通过 for 循环遍历 matcheri+1 个分组的值,即路径参数的值,随后将参数名和对应值存储在 values 中,最后返回包含路径参数和对应值的 Map 对象。

TopicResponseTest

用于对 TopicResponse 类进行单元测试的测试类。

  1. @Test: 用于标识测试方法的 JUnit 框架注解,JUnit 会自动识别并执行它们。
  2. testTopicResponse(): 用于测试 TopicResponse 类的构造函数和 getter 方法。首先创建一个 TopicResponse 对象,传入 topiccreatedTime 参数。然后使用 assertEquals() 断言来验证对象的 getTopic()getCreatedTime() 方法返回的值是否与输入的参数相等。
  3. testTopicResponseSerialization(): 用于测试 TopicResponse 类的序列化和反序列化。使用 ObjectMappertopicResponse 对象序列化为 JSON 字符串,接着使用 assertTrue() 断言来验证 JSON 字符串中是否包含”topic” 和”created_time” 字段,然后使用 ObjectMapper 对象将 JSON 字符串反序列化为 TopicResponse 对象 deserializedResponse,最后使用 assertEquals() 断言来验证反序列化后的对象的 getTopic()getCreatedTime() 方法返回的值是否与初始对象的值相等。

RequestMappingTest

testPostMapping()
  1. testPostMapping(): 用于测试 RequestMapping 类的 postMapping 方法。在该方法中,首先创建了一个 HttpExchange 对象的 mock 实例,用来模拟 HTTP 请求和响应的交互,然后使用 when() 方法和 thenReturn() 方法设置模拟对象的行为。
  2. testGetMapping(),testPutMapping(),testDeleteMapping()testPostMapping() 方法类似。

实施方案

注释

范围

对于课题预期任务而言,需要编写注释的范围在于 org.apache.eventmesh.admin.rocketmq 和 org.apache.eventmesh.runtime.admin 这两个包,对象为所有的方法。

格式

对于任何一个项目而言,尤其是开源项目,在撰写 JavaDoc 注释时,都需要注意以下方面,以确保注释全面且易于理解:

  1. 摘要(Summary):提供一个简洁但清晰的摘要,概括该方法或接口的主要功能和作用。
  2. 参数(Parameters):列出方法或接口接受的所有参数,并为每个参数提供描述。包括参数的名称、类型、是否可为空以及对参数的期望值或用法的说明。
  3. 返回值(Return Value):描述方法或接口的返回值。指明返回值的类型、可能的返回结果、异常情况或特殊条件等。
  4. 抛出(Throws):列出方法或接口可能会抛出的异常,并提供每个异常的类型、触发条件和处理建议。
  5. 示例(Examples):提供一个或多个示例,展示如何使用该方法或接口。可以包括参数设置、方法调用和预期结果的演示。
  6. 注意事项(Notes):说明任何与方法或接口相关的重要注意事项或限制。
  7. 作者(Author):标明编写该方法或接口的作者。
  8. 参考(See Also):指向与该方法或接口相关的其他文档、资源或类。
  9. 版本(Version):指明该方法或接口首次出现的版本号,并注明修改历史和版本更新。
  10. 修饰符(Modifiers):指明方法或接口的访问修饰符(例如 public、private、protected)和其他修饰符(例如 static、final)。
  11. 参数范围(Parameter Ranges):为每个参数提供有效范围或允许的取值范围。
  12. 线程安全性(Thread Safety):指明该方法或接口的线程安全性信息。例如是否可以在多线程环境中安全地调用。
  13. 依赖关系(Dependencies):列出方法或接口依赖的其他类、接口或资源。

具体来说,Apache EventMesh 的项目仓库中,在 style/checkStyle.xml 路径提供了代码样式检查文件,需要在 IntelliJ IDEA 中安装 CheckStyle-IDEA 插件配合使用,通过以下方式导入检查样式文件:

1
Editor -> Code Style -> Java -> Scheme -> Import Scheme -> CheckStyle Configuration

然后使用./gradlew check 来检查代码风格。

在这个代码样式文件中,规定了 Apache EventMesh 项目所偏好的 JavaDoc 注释风格,需要:

  1. 对齐形参说明

  2. 对齐抛出异常说明

  3. 在描述后空行

  4. 保留无效标签

  5. 保留空 @param 标签

  6. 保留空 @return 标签

  7. 保留空 @throws 标签

  8. 在右页边距处换行

  9. 启用前导星号

  10. 用 @throws 而不是 @exception

  11. 在空行中生成 <p>

  12. 保留空行

不需要:

  1. 在形参描述后空行
  2. 在 return 后空行
  3. 一行注释不分行
  4. 保留换行
  5. 在新行描述形参
  6. 缩进连续线

示例如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
package sample;

public class Sample {
/**
* 这是一个方法的描述,如果其长度长到超出右边界,
* <p>
* 就需要另起一行,在新的段落继续描述。
* <p/>
* 可以手动换行。
*
* @param i 简短命名的参数描述
* @param longParameterName 长命名的参数描述
* @param missingDescription 缺少描述
* @return 返回描述
* @throws XXXException 异常描述
* @throws YException 异常描述
* @throws ZException
* @invalidTag
*/
public abstract String sampleMethod(int i,
int longParameterName,
int missingDescription)
throws XXXException, YException, ZException;

/**
* 单行注释
*/
public abstract String sampleMethod2();

/**
* 简单方法描述
*
* @return
*/
public abstract String sampleMethod3();
}

提交形式

以一个功能所包含的文件与类为单位,在 apache/eventmesh 仓库新建一个 Issue,声明正在为哪个模块的哪个功能撰写注释,然后向 Pil0tXia/eventmesh 仓库的 pil0txia_doc_{ISSUE ID} 分支提交 Git Commit。当一个主要功能的全部接口和方法的 JavaDoc 注释均已撰写完成时,从该分支向 apache/eventmesh 仓库发起 Pull Request,并请求 Commiters 和 Maintainers 进行 Code Review,进行代码合并。

当拉取合并请求处于 Review 阶段时,我将从 apache/eventmesh 仓库 master 分支最新的提交拉取一个新的分支,并继续按照上述工作流新建 Issue、撰写注释,形成一个 Contributor 与 Reviewer 异步的贡献形式。

文档

范围

对于课题预期任务而言,需要编写文档的范围在于 org.apache.eventmesh.admin.rocketmq 和 org.apache.eventmesh.runtime.admin 这两个包,对象为所有的接口。

格式

Apache EventMesh 项目使用 Docusaurus 架构作为其官网文档的 Web 前端框架。此框架使用 Markdown 语言作为文档的撰写与解析语言。

在正式创建 md 文件之前,需要先思考接口文档在官网文档的侧边栏目录中的位置和组织形式,并且在 eventmesh-site/i18n/zh/docusaurus-plugin-content-docs/current.json 文件中添加属于接口文档的栏目。

eventmesh.apache.org 支持英文与中文两种语言,这两种语言的 Markdown 文件是分开存放的。在 eventmesh-site 仓库中:

  • docs/design-document 目录:该目录是英文版的文档目录,用于存放英文版的设计文档和其他相关文档。
  • i18n/zh/docusaurus-plugin-content-docs/current/design-document 目录:该目录是中文版的文档目录,用于存放中文版的设计文档和其他相关文档。i18n 表示国际化,zh 表示中文,docusaurus-plugin-content-docs 是 Docusaurus 生成的目录结构,current 表示当前版本的文档。

这两个目录中的层级是一样的,是为了支持多语言的文档展示。

在编写文档时,需要注意以下方面,以确保 Markdown 语法可以被正确地解析,并支持多种 Markdown 渲染器的排版:

  1. 目录结构:根据接口的层级结构或逻辑关系,创建一个清晰的目录结构。使用标题和子标题来组织接口文档,且标题层级不超过四级。
  2. 接口概述:对每个接口提供一个简要概述,描述其用途、输入和输出等关键信息。指明接口的名称、路径和 HTTP 方法。
  3. 参数说明:列出每个接口所需的参数,并提供参数的名称、类型、是否必需、取值范围以及示例值等信息。对于复杂的参数结构,可以使用表格或嵌套列表来清晰展示参数的层级关系和说明。
  4. 响应示例:提供一个或多个示例,展示接口的调用和返回结果。示例可以包括请求和响应的数据结构、状态码和消息等信息。对于可选的响应字段,也可以提供示例值。
  5. 异常处理:描述可能的错误情况和异常,以及相应的错误码和错误消息。提供每个异常的名称、描述和处理建议。
  6. 接口详情:为每个接口提供更详细的说明,包括接口的功能、用法、限制、注意事项和最佳实践等。可以使用段落、列表和代码块来组织和展示信息。
  7. 参考资料:提供与该模块或接口相关的其他文档、资源或链接。
  8. 更新记录:在文档中提供更新记录和重要变更,指明版本号、修改内容和日期。
  9. 示例代码:为关键接口或复杂场景提供示例代码。
  10. 格式和排版:使用代码块和强调样式等来保持一致的格式和排版。
  11. 图表和图像:可以使用图表、图像或流程图等可视化工具来说明接口的工作流程或数据流动。
  12. 文档导航:在 Docusaurus 官网上发布时,需要在整个网站上提供简单且直观的目录导航,使访问者能够轻松找到和浏览 admin 模块的接口文档。

在编写 Markdown 文档之前,我应该已经在接口的代码中撰写了注释,以便从代码中对照文档。

提交形式

以一个接口为单位,在 apache/eventmesh-site 仓库新建一个 Issue,声明正在为哪个接口撰写注释,然后向 Pil0tXia/eventmesh-site 仓库的 pil0txia_docs_{ISSUE ID} 分支提交 Git Commit。此处的分支名称与撰写注释任务的分支名称并不相同,发布于 Web 网站上的使用文档的英文通常使用 docs 表示,与承载撰写注释任务的 doc 分支作区分。

当一个主要功能的全部接口的 Markdown 文档均已撰写完成时,从该分支向 apache/eventmesh-site 仓库发起 Pull Request,并请求 Commiters 和 Maintainers 进行 Code Review,进行代码合并。

当拉取合并请求处于 Review 阶段时,我将从 apache/eventmesh-site 仓库 master 分支最新的提交拉取一个新的分支,并继续按照上述工作流新建 Issue、撰写注释,形成一个 Contributor 与 Reviewer 异步的贡献形式。

示例:获取系统配置信息

接口说明
  • 本接口用于获取当前 EventMesh 节点的基本配置信息,包括服务名称、服务环境和各协议监听端口地址等。
请求地址
1
GET http://localhost:10106/configuration
请求参数说明
参数 类型 必填 说明

请求示例:

1

返回参数说明
参数 类型 说明
sysID String 系统 ID,用于标识系统的唯一标识符。
namesrvAddr String Apache RocketMQ 的 NameServer 的 IP 地址和端口号。
eventMeshEnv String EventMesh 运行的环境,如生产环境或开发环境。
eventMeshIDC String EventMesh 所在的数据中心标识
eventMeshCluster String EventMesh 所属的集群名称
eventMeshServerIp String EventMesh 服务所在服务器的 IP 地址
eventMeshName String EventMesh 的名称
eventMeshWebhookOrigin String EventMesh Webhook 的默认来源
eventMeshServerSecurityEnable boolean 是否启用安全功能
eventMeshServerRegistryEnable boolean 是否启用注册功能
eventMeshTcpServerPort int32 监听 TCP 连接的端口号
eventMeshHttpServerPort int32 监听 HTTP 请求的端口号
eventMeshHttpServerUseTls boolean HTTP 请求是否启用 TLS 加密协议
eventMeshGrpcServerPort int32 监听 gRPC 请求的端口号
eventMeshGrpcServerUseTls boolean gRPC 请求是否启用 TLS 加密协议

返回结果示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
{
"sysID": "0000",
"namesrvAddr": "127.0.0.1:8848",
"eventMeshEnv": "PRD",
"eventMeshIDC": "DEFAULT",
"eventMeshCluster": "COMMON",
"eventMeshServerIp": "10.0.34.190",
"eventMeshName": "EVENTMESH-runtime",
"eventMeshWebhookOrigin": "eventmesh.DEFAULT",
"eventMeshServerSecurityEnable": false,
"eventMeshServerRegistryEnable": false,
"eventMeshTcpServerPort": 10000,
"eventMeshHttpServerPort": 10105,
"eventMeshHttpServerUseTls": false,
"eventMeshGrpcServerPort": 10205,
"eventMeshGrpcServerUseTls": false
}
返回码说明
返回码 说明
200 查询成功
500 服务端错误

测试

范围

org.apache.eventmesh.admin.rocketmq 和 org.apache.eventmesh.runtime.admin 两个包,对象为所有的方法。

要求

测试代码的编写可以参考现有的测试文件。在单元测试时,需要注意以下方面:

  1. 输入验证:确保测试覆盖了各种可能的输入情况,包括边界值、无效值、空值和有效值。
  2. 接口状态:测试应该涵盖接口的各种状态和路径。例如,在测试 testTopicCreateRequestSetName 时,应该包括设置名称为 null 的情况以验证接口的响应。
  3. 异常情况:测试接口在异常情况下的行为。例如接口是否正确地处理了错误的输入或不正确的参数。
  4. 序列化和反序列化:对于涉及对象的序列化和反序列化的接口,应该编写测试来验证对象的正确序列化和反序列化。确保序列化后的数据包含所需的字段,并且在反序列化后可以正确还原为对象。
  5. 副作用和一致性:如果接口执行了一些副作用或对系统状态进行了更改,测试应该验证这些副作用是否按预期发生,并确保接口的行为一致。
  6. 测试覆盖率:尽量覆盖接口的各个路径和代码分支,以确保测试足够全面。使用代码覆盖工具(如 JaCoCo 和 Codecov 等)来评估测试的覆盖率,并尽量达到较高的覆盖率。
  7. 并发和性能:如果接口设计要求支持高并发或高性能,需要验证接口在高并发和高负载情况下的表现和性能。
  8. 引入依赖:在测试中正确模拟或提供依赖项。
  9. 可读性和可维护性:编写清晰、简洁、可读性强的测试代码,使用有意义的命名和注释,以便他人能够理解和维护测试。
  10. 持续集成和自动化:将测试集成到持续集成(CI)流程中,在每次代码提交时自动运行测试。

提交形式

以一个功能所包含的文件与类为单位,在 apache/eventmesh 仓库新建一个 Issue,声明正在为哪个模块的哪个功能编写测试代码,然后向 Pil0tXia/eventmesh 仓库的 pil0txia_test_{ISSUE ID} 分支提交 Git Commit。当一个主要功能的全部接口和方法的 JavaDoc 注释均已撰写完成时,从该分支向 apache/eventmesh 仓库发起 Pull Request,并请求 Commiters 和 Maintainers 进行 Code Review,进行代码合并。

当拉取合并请求处于 Review 阶段时,我将从 apache/eventmesh 仓库 master 分支最新的提交拉取一个新的分支,并继续按照上述工作流新建 Issue、撰写注释,形成 Contributor 与 Reviewer 异步的贡献形式。

增强

整合 admin 模块

虽然现有的 admin 模块对事件存储的管理接口耦合在 runtime 运行时中,但是 eventmesh-admin 模块中的接口与 org.apache.eventmesh.runtime.admin 包中的接口都遵循 controller 路由、handler 实现、request 实体、response 实体和 utils 工具类的基本布局,因此在技术层面上的整合更多的需要考虑与主线开发者的冲突问题和接口可用性问题。

与主线开发者的冲突问题

将 org.apache.eventmesh.runtime.admin 包中的接口移走必然会导致与其它开发者在此包中的修改产生 Git 冲突。

为了避免冲突数量过多、过于复杂、难以解决,我将在开始此任务前使用 git rebase 同步主线进度,并尽快完成所有迁移整合。

在开发阶段性完成时,我将再次使用变基合并。相比于全部整合完成后再使用 merge 合并,这种方式的好处在于单次合并冲突数量少、分支提交记录线性排列较为清晰、联系另一位开发者解决冲突的缓冲时间长、不容易影响工作进度。

接口可用性问题

接口在整合后可能会产生隐性的问题,尤其是与作用域相关的调用问题。为此,我将利用 IntelliJ IDEA 的 yFiles 图表功能,保留接口整合前各类注解、导入、抽象类和依赖包的引用关系截图,与整合后的引用关系相比较,确保作用域一致。

对于接口本身功能是否正常的自测,将使用 apache/eventmesh-dashboard 前端管理中心、Postman 和测试单元配合完成。

对于 eventmesh-dashboard 前端上具有的接口,将在获取数据格式后,使用 Postman 多次验证不同的传入值,并确保单元测试通过。

对于 eventmesh-dashboard 前端上不具有的接口,我将在上面方法的基础上,仔细分析代码,使用 Postman 得出正确的数据格式。

远程启停集群节点

示例代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
import io.fabric8.kubernetes.api.model.Pod;
import io.fabric8.kubernetes.api.model.PodList;
import io.fabric8.kubernetes.client.DefaultKubernetesClient;
import io.fabric8.kubernetes.client.KubernetesClient;
import io.fabric8.kubernetes.client.KubernetesClientException;
import io.fabric8.kubernetes.client.dsl.PodResource;

public class EventMeshAdmin {
private static final String EVENTMESH_NAMESPACE = "k8s-namespace"; // Kubernetes命名空间
private static final String EVENTMESH_LABEL = "app=eventmesh"; // EventMesh标签

public static void main(String[] args) {
EventMeshAdmin admin = new EventMeshAdmin();
admin.startEventMeshNode("eventmesh-1"); // 启动指定的EventMesh节点
admin.stopEventMeshNode("eventmesh-2"); // 停止指定的EventMesh节点
}

public void startEventMeshNode(String nodeName) {
try (KubernetesClient client = new DefaultKubernetesClient()) {
// 获取符合标签选择器的Pod列表
PodList podList = client.pods().inNamespace(EVENTMESH_NAMESPACE).withLabel(EVENTMESH_LABEL).list();

// 查找指定节点的Pod并启动
for (Pod pod : podList.getItems()) {
if (pod.getMetadata().getName().equals(nodeName)) {
PodResource<Pod> podResource = client.pods()
.inNamespace(EVENTMESH_NAMESPACE)
.withName(pod.getMetadata().getName());

// 更新Pod的副本数量为1以启动
podResource.scale(1, true);
System.out.println("EventMesh node '" + nodeName + "' started.");
return;
}
}

System.out.println("EventMesh node '" + nodeName + "' not found.");
} catch (KubernetesClientException e) {
System.err.println("Error occurred while starting EventMesh node: " + e.getMessage());
}
}

public void stopEventMeshNode(String nodeName) {
try (KubernetesClient client = new DefaultKubernetesClient()) {
// 获取符合标签选择器的Pod列表
PodList podList = client.pods().inNamespace(EVENTMESH_NAMESPACE).withLabel(EVENTMESH_LABEL).list();

// 查找指定节点的Pod并停止它
for (Pod pod : podList.getItems()) {
if (pod.getMetadata().getName().equals(nodeName)) {
PodResource<Pod> podResource = client.pods()
.inNamespace(EVENTMESH_NAMESPACE)
.withName(pod.getMetadata().getName());

// 更新Pod的副本数量为0以停止
podResource.scale(0, true);
System.out.println("EventMesh node '" + nodeName + "' stopped.");
return;
}
}

System.out.println("EventMesh node '" + nodeName + "' not found.");
} catch (KubernetesClientException e) {
System.err.println("Error occurred while stopping EventMesh node: " + e.getMessage());
}
}
}
方案介绍

当 EventMesh 节点以 docker 方式部署在 Kubernetes 中时,可以使用以下思路远程启停集群节点:

  1. 客户端库:使用 Java Kubernetes 客户端库 fabric8io/kubernetes-client 与 Kubernetes 集群进行交互,管理 Pod、服务和副本集。
  2. 标签选择器:使用 Kubernetes 的标签选择器来选择具有特定标签的 Pod,即可对指定的 EventMesh 节点执行操作。
  3. 节点启停:使用 Pod 的副本数量来控制节点的启停,当 Pod 的副本数量为 1 或更多时就会启动节点,当副本数量为 0 时就会停止节点,有效利用了 Kubernetes 的横向扩展和自动伸缩特性。

在以上示例代码中,使用了 DefaultKubernetesClient 创建一个与 Kubernetes 集群的连接,并通过标签选择器获取与 app=eventmesh 匹配的 Pod 列表。然后遍历 Pod 列表,找到与指定节点名称相匹配的 Pod,并使用 PodResource 对象对该 Pod 进行启动或停止操作。

异常处理方面,使用了 try-with-resources 语句来自动关闭 KubernetesClient 连接,并在 catch 块中捕获 KubernetesClientException。

远程启停单机节点

示例代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;

public class EventMeshAdmin {
private static final String REMOTE_SERVER_IP = "eventmesh-server-ip"; // EventMesh服务器的IP地址
private static final String REMOTE_SERVER_USERNAME = "eventmesh-server-username"; // EventMesh服务器的用户名

public static void main(String[] args) {
EventMeshAdmin admin = new EventMeshAdmin();
admin.startEventMeshNode("eventmesh-1");
admin.stopEventMeshNode("eventmesh-2");
}

public void startEventMeshNode(String nodeName) {
try {
String command = "ssh " + REMOTE_SERVER_USERNAME + "@" + REMOTE_SERVER_IP + " sh /path/to/eventmesh-runtime/bin/start.sh " + nodeName;
executeCommand(command);
System.out.println("EventMesh node '" + nodeName + "' started.");
} catch (IOException e) {
System.err.println("Error occurred while starting EventMesh node: " + e.getMessage());
}
}

public void stopEventMeshNode(String nodeName) {
try {
String command = "ssh " + REMOTE_SERVER_USERNAME + "@" + REMOTE_SERVER_IP + " sh /path/to/eventmesh-runtime/bin/stop.sh " + nodeName;
executeCommand(command);
System.out.println("EventMesh node '" + nodeName + "' stopped.");
} catch (IOException e) {
System.err.println("Error occurred while stopping EventMesh node: " + e.getMessage());
}
}

private void executeCommand(String command) throws IOException {
ProcessBuilder processBuilder = new ProcessBuilder(command.split(" "));
Process process = processBuilder.start();

// 读取命令执行结果
BufferedReader reader = new BufferedReader(new InputStreamReader(process.getInputStream()));
String line;
while ((line = reader.readLine()) != null) {
System.out.println(line);
}

// 检查命令执行是否成功
try {
int exitCode = process.waitFor();
if (exitCode != 0) {
throw new IOException("Command execution failed with exit code: " + exitCode);
}
} catch (InterruptedException e) {
throw new IOException("Command execution interrupted: " + e.getMessage());
}
}
}
方案介绍

在项目代码库中,有 start.sh 和 stop.sh 两个用于单机部署的 Shell 脚本,可以在本地环境启停 EventMesh 服务。相对于前面的 Kubernetes 客户端库方案,此方案适用于单机部署的场景。

以上示例代码主要通过远程执行脚本的方式实现启动和停止操作,远程服务器需要支持 SSH 服务,并具有脚本文件的可执行权限。其优点在于执行结果的输出将保持原样返回给调用方,可以得到更具体的报错信息,缺点在于无法灵活地运用中间件在集群部署中的优势,也无法利用 Kubernetes 客户端库实现节点的扩容和缩容、监控节点状态,获取节点日志等功能。

更新管理脚本

修改建议
  1. 提取通用的环境变量和 JVM 配置,放在脚本的顶部,便于统一管理和修改。比如 export LANG=en_US.UTF-8export LC_CTYPE=en_US.UTF-8export LC_ALL=en_US.UTF-8 这些语句,和 JAVA_OPT 中的一些通用配置项,可以放在脚本的开头。

  2. 在脚本中加入一些错误处理机制,比如在 JAVA 路径未找到时输出错误信息并退出。

  3. 使用 pgrep 命令优化获取进程 PID 的复杂度。

  4. 可以使用更简洁的方式创建日志文件夹。例如可以使用 mkdir -p "${EVENTMESH_LOG_HOME}" 一行代码替换当前的路径。

  5. 可以在启动命令前后添加一些额外的操作。例如,可以在启动命令执行之前打印一些额外的信息,或在启动命令执行之后将进程 ID 保存到文件中。

示例函数优化
1
2
3
4
5
6
7
8
9
10
11
12
13
function get_pid {
local ppid=""
if [ -f "${EVENTMESH_HOME}/bin/pid.file" ]; then
ppid=$(cat "${EVENTMESH_HOME}/bin/pid.file")
else
if [[ $OS =~ (Msys|Darwin) ]]; then
ppid=$(jps -v | awk -v pattern="org.apache.eventmesh.runtime.boot.EventMeshStartup" '$0 ~ pattern && $0 !~ /grep/ {print $1; exit}')
else
ppid=$(ps -C java -o user,pid,command --cols 99999 --no-header | awk -v home="$EVENTMESH_HOME" -v pattern="org.apache.eventmesh.runtime.boot.EventMeshStartup" '$0 ~ pattern && $0 !~ /grep/ && $0 ~ home {print $2; exit}')
fi
fi
echo "$ppid"
}

变更说明:

  1. 在文件路径中添加双引号,避免可能的空格或特殊字符导致的问题。
  2. 使用更简洁的语法将 $OS 的值匹配到 MsysDarwin
  3. 优化 jps 命令的使用,使用 awk 进行过滤和提取,避免多次使用 grep 命令。
  4. ps 命令中添加了 --no-header 去除标题行选项。
  5. 使用 awk 提取 PID 时,添加 $0 !~ /grep/ 条件来排除 grep 进程。
  6. 对于 ps 命令,添加 $0 ~ home 条件来确保进程命令行中包含 $EVENTMESH_HOME 路径。

开发 TBD 和 TODO

对于新增的功能,我将在熟悉需求后,自行建立业务场景,针对场景中的细节开发每一项对应功能,并编写单元测试,确保接口功能正常、可靠。

例如,在 org.apache.eventmesh.admin.rocketmq.util.UrlMappingPatternTest 中,有一个要求编写测试方法的 TODO:

1
//TODO : Fix me to test the method compile(). It is better using Mockito not PowerMockito.

使用 Mockito,并通过 Java 反射获取私有字段,撰写正则表达式,示例代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
@Test
public void testCompile() throws NoSuchFieldException, IllegalAccessException {
// Mock the compiledUrlMappingPattern field with reflection
Pattern mockedPattern = mock(Pattern.class);
Field compiledUrlMappingPatternField = urlMappingPattern.getClass().getDeclaredField("compiledUrlMappingPattern");
compiledUrlMappingPatternField.setAccessible(true);
compiledUrlMappingPatternField.set(urlMappingPattern, mockedPattern);

urlMappingPattern.compile();

// Verify that the compiledUrlMappingPattern field is updated
assertEquals(mockedPattern, compiledUrlMappingPatternField.get(urlMappingPattern));

// Verify that the mocked pattern is compiled with the expected regex
Mockito.verify(mockedPattern).compile("/test/([%\\w-.\\~!$&'\\(\\)\\*\\+,;=:\\[\\]@]+?)/path/([%\\w-.\\~!$&'\\(\\)\\*\\+,;=:\\[\\]@]+?)(?:\\?.*?)?$");
}

调试后,预期与实际相符,测试用例通过。

时间规划

每周时间安排

每周约 32 小时:

  • 周一至周五,每日 3 小时
  • 周末,每日 8 小时
  • 向导师汇报开发进度与安排,1 小时

项目进度

任务 时间
熟悉项目 7.1 - 7.7
编写 JavaDoc 注释 7.8 - 7.14
编写接口文档 7.15 - 7.21
测试现有接口功能 7.21 - 8.4
撰写中期报告 8.5 - 8.11
整合 admin 模块 8.12 - 8.25
增加管理命令 8.25 - 9.7
更新管理脚本 9.8 - 9.14
开发 TBD 和 TODO 9.15 - 9.21
撰写结题报告 9.22 - 10.5
弹性时间安排 10.5 - 10.11

个人简介

我是来自南京信息工程大学的夏天,大三,目前正在联想实习,承担 Spring Cloud + Kafka + Eureka 方面的后端开发工作。这是我的博客文档Github,日均 PV 400 左右,有些文章的谷歌 / 必应排名也比较高。

每每使用开源工具和框架,都很感谢开发者的付出。在我注册 Github 账号的第五年,我意识到自己应该真正地去研究透彻一个框架、参与一个社区、进行贡献,我也非常希望自己能在时间还算充裕的学生时代,多尝试一些新技术,抓住这个契机。

这么几天的探索下来,兴趣越发高涨,衷心希望能参加薛炜明导师您指导的 GLCC 课题。

联系方式:admin@pil0txia.com

未来展望

在后续的社区贡献中,我会深入理解产品定位,设想产品场景,主动发现增长点与增强点,持之以恒地作出贡献。

既然目前 EventMesh 已经支持了比较多的事件存储了,那么不同的事件存储之间的数据也可以支持互相传输,这样使用 EventMesh SDK 即可打通各事件存储。

不过,虽然 EventMesh 可以支持多种事件存储,但每一种事件存储的支持都需要使用连接器插件与事件存储耦合,且不同的事件存储需要使用不同的连接器。为了减少连接器插件的使用方法更新时业务侧的代码变动,可以开发一个支持统一各事件存储的通用连接器。形象化来说,其工作原理类似于 “插座”。业务侧只需要在各个事件存储的 Client SDK 中添加通用的连接器即可,而无需关注各事件存储或连接器插件在更新后的代码改动。EventMesh 方面需要适配不同的插件。也就是说,业务侧只需要对这个通用连接器负责,而原有的连接器插件只需要对 “插座” 负责,插座只需要对所有的事件存储负责,大幅降低了业务侧的代码改动频率。

中期考核答辩

8-15 更新:🆒

image-20230815163241345

结项考核答辩

10-08 更新:💯

image-20231008181351805