跳到主要内容

流式消息

本文介绍如何在 HarmonyOS IMLib SDK 中接收并拉取流式消息,涵盖消息监听、拉取流程、事件回调、常见结果码和摘要获取。

提示

流式消息从 26.5.0 版本开始支持。

流式消息简介

流式消息由业务服务器触发,融云 IM 服务器生成消息并下发到客户端 SDK。客户端收到流式消息后,可以根据业务需求发起内容拉取,并通过事件回调持续接收增量内容。

下图展示了完整的交互流程:

IMLib SDK 使用 StreamMessage 表示流式消息内容。StreamMessage 继承自 MessageContent,完整属性请参考 API 文档。

关键字段如下:

字段类型说明
contentstring流式消息内容。拉取过程中,该字段为当前已累加到此刻的完整文本内容。
typestring流式消息的文本格式或内容类型标记,由服务端定义并透传给业务层。
isCompleteboolean服务端是否已结束流式内容生成。
completeReasonnumber业务服务器下发的结束原因码,0 表示正常。
stopReasonnumber融云服务器下发的结束原因码,0 表示正常。
isSyncboolean客户端是否已完成流式内容拉取。
提示
  • 流式消息的消息类型标识为 RC:StreamMsg
  • SDK 不支持发送流式消息。流式消息只能由服务器下发,SDK 接收后通过 IMEnginesetMessageReceivedListener(listener) 回调传递给业务层。
  • 收到流式消息后,需要调用 IMEnginerequestStreamMessage(params) 接口拉取完整内容。

接收流式消息

SDK 不支持发送流式消息。开发者需要通过服务器接口触发流式消息,对应的接收者会收到该消息。

添加消息监听

应用可以通过 setMessageReceivedListener 设置消息接收监听,并在回调中判断是否收到流式消息。

TypeScript
IMEngine.getInstance().setMessageReceivedListener((message: Message, info: ReceivedInfo) => {
if (message.content instanceof StreamMessage) {
const streamMessage = message.content as StreamMessage;
// TODO: 根据 streamMessage.isSync 判断是否需要拉取完整内容
}
});

历史流式消息

SDK 接收到流式消息后,会将其存储到本地数据库。开发者调用获取历史消息相关接口时,可以查询到流式消息。

拉取流式消息

流式消息的 content 默认是首包携带的内容片段。开发者获取到流式消息后,可以调用 requestStreamMessage(params) 拉取完整内容。

添加流式消息拉取事件监听

在调用 requestStreamMessage(params) 前,建议先调用 addStreamMessageRequestEventListener(listener) 添加流式消息拉取事件监听,用于接收初始化、增量内容和完成事件。

TypeScript
const listener: StreamMessageRequestEventListener = {
onInit: (messageUid: string): void => {
// 流准备完成。若该消息之前异常中止,SDK 会在此时清空本地已累加的异常数据。
},
onDelta: (message: Message, chunkInfo: StreamMessageChunkInfo): void => {
// 收到一片增量数据。chunkInfo.content 为本片增量字符串。
// message.content 为 StreamMessage,content 字段已包含到此片为止的全量文本。
},
onComplete: (messageUid: string, code: EngineError): void => {
// 流结束(成功或失败),可根据 code 处理结果。
}
};

IMEngine.getInstance().addStreamMessageRequestEventListener(listener);

流式消息的拉取过程包含 onInitonDeltaonComplete 三类事件,分别对应监听器的以下方法:

TypeScript
interface StreamMessageRequestEventListener {
onInit?(messageUid: string): void;
onDelta?(message: Message, chunkInfo: StreamMessageChunkInfo): void;
onComplete?(messageUid: string, code: EngineError): void;
}

参数说明如下:

参数类型说明
messageUidstring流式消息的服务端 UID。
messageMessage最新累加后的流式消息。
chunkInfoStreamMessageChunkInfo当前流式消息片段信息,content 为本片增量内容。
codeEngineError流式消息拉取结束结果码。
提示
  • SDK 全局共用同一个流式消息拉取事件监听队列。所有通过 requestStreamMessage 触发的事件都会广播给已添加的监听器。
  • 重复添加同一个监听器对象时,SDK 只会添加一次。
  • addStreamMessageRequestEventListenerremoveStreamMessageRequestEventListener 需要配对使用,避免监听器长期持有导致内存泄漏。

发起拉取流式消息请求

获取到流式消息后,可以通过 isSync 判断客户端是否已完成流式内容拉取。如果 isSyncfalse,可以按需发起拉取请求。客户端 SDK 最多允许同时发起 3 个流式数据请求,超出后会返回错误码。

TypeScript
const params = new StreamMessageRequestParams();
params.messageUid = message.messageUid;

IMEngine.getInstance().requestStreamMessage(params).then(result => {
if (EngineError.Success !== result.code) {
// 启动失败:参数错误、并发请求数量超限、消息已同步等。
return;
}

// 启动成功,等待 StreamMessageRequestEventListener 推送 onInit / onDelta / onComplete。
});

StreamMessageRequestParams 字段说明如下:

字段类型说明
messageUidstring流式消息服务端 UID,必填。
提示
  • requestStreamMessage(params) 返回的 Promise 仅表示请求是否通过连接状态、参数和并发数量等启动校验。真正的流式内容与最终结果请通过 StreamMessageRequestEventListener 获取。
  • 调用 requestStreamMessage(params) 前,请先调用 addStreamMessageRequestEventListener(listener) 添加监听器,否则业务层可能无法接收流式事件。
  • 如果 messageUid 为空,SDK 会直接返回 EngineError.InvalidArgumentMessageUid,不会下沉到 Native 层发起请求。

移除流式消息拉取事件监听

页面销毁或不再需要接收流式事件时,请调用 removeStreamMessageRequestEventListener(listener) 移除监听器。

TypeScript
IMEngine.getInstance().removeStreamMessageRequestEventListener(listener);

常见结果码

结果码说明
EngineError.StreamMessageDisable流式消息功能未开启。
EngineError.StreamMessageRequestCountExceeded同时发起的流式消息请求超过 3 个。
EngineError.StreamMessageRequestReset拉取流式消息时被重置。
EngineError.StreamMessageRequestFail拉取流式消息请求失败。
EngineError.StreamMessageRequestInProgress流式消息正在请求中。
EngineError.StreamMessageSynced流式消息已同步完成。
EngineError.StreamMessageDeltaTimeOver超过 30 秒未收到数据,拉取已结束。
EngineError.StreamMessageDeltaTimeLimit流式数据持续时间超过 30 分钟,已被截断。
EngineError.StreamMessageReviewFailed审核未通过,流数据被删除。
EngineError.StreamMessageDataTruncated流式数据大于 128 KB,已被截断。
EngineError.StreamMessageServerInternalError流式消息服务内部异常。

流式消息摘要

当服务器完成流式内容生成后,会通过消息扩展的方式通知客户端消息摘要。流式消息摘要默认使用扩展 key RC_Ext_StreamMsgSummary

注册消息扩展更新监听

开发者可以通过 setMessageExpansionListener(listener) 注册消息扩展更新监听,并在收到扩展更新时读取流式消息摘要。

TypeScript
IMEngine.getInstance().setMessageExpansionListener({
onMessageExpansionUpdate: (expansion: Map<string, string>, message: Message): void => {
if (message.content instanceof StreamMessage) {
const summary = expansion.get('RC_Ext_StreamMsgSummary');
// TODO: 处理流式消息摘要
}
},
onMessageExpansionRemove: (keyArray: Array<string>, message: Message): void => {
}
});

获取历史流式消息摘要

客户端收到消息扩展更新后,会将摘要信息存储到本地数据库。开发者可以结合流式消息的 isComplete 字段,从消息的 expansion 中读取历史消息摘要。

TypeScript
if (message.content instanceof StreamMessage) {
const streamMessage = message.content as StreamMessage;
if (streamMessage.isComplete) {
const summary = message.expansion?.get('RC_Ext_StreamMsgSummary');
// TODO: 处理历史流式消息摘要
}
}