SpringMVC流式返回数据
2025-04-06 00:28:51 5
/**
 * Streams the chat model's answer to the HTTP client as it is generated.
 *
 * <p>The prompt is built and the reactive stream is subscribed on {@code threadPool};
 * the assistant message of every streamed {@code ChatResponse} is pushed through the
 * returned emitter. The emitter is completed when the stream finishes, and completed
 * with an error when the stream fails or a chunk cannot be written.
 *
 * <p>NOTE(review): {@code ResponseBodyEmitter} writes each object with the configured
 * message converters and adds no SSE framing. If the consumer is a browser
 * {@code EventSource}, {@code SseEmitter} is probably the better fit; confirm against
 * the actual client.
 *
 * @param question the user's question, bound from the {@code question} request parameter
 * @return the emitter through which the streamed chunks are written
 */
@GetMapping("/get")
public ResponseBodyEmitter get(@RequestParam String question) {
    ResponseBodyEmitter emitter = new ResponseBodyEmitter();
    threadPool.submit(() -> {
        try {
            String prompt = "...";
            Flux<ChatResponse> responseFlux = chatclient.prompt(prompt)
                    .user(question)
                    .stream().chatResponse();
            responseFlux.subscribe(chatResponse -> {
                try {
                    AssistantMessage output = chatResponse.getResult().getOutput();
                    emitter.send(output);
                } catch (Exception e) {
                    log.error(e.getMessage(), e);
                    // Rethrow instead of swallowing: an exception thrown from the onNext
                    // consumer makes Reactor cancel the upstream subscription and invoke
                    // the error callback below, so the model stream stops and the emitter
                    // is completed exactly once. Swallowing it would keep the stream
                    // running and make every later send() fail on a completed emitter.
                    throw new IllegalStateException("Failed to send streamed chunk", e);
                }
            }, emitter::completeWithError, emitter::complete);
        } catch (Exception e) {
            // Failure while building the prompt or subscribing: report it and end the response.
            log.error(e.getMessage(), e);
            emitter.completeWithError(e);
        }
    });
    return emitter;
}
前端根据返回数据的格式处理即可。例如,当服务端以 SSE(text/event-stream)格式返回时,可以使用 EventSource:
// Subscribe to the streaming endpoint and append every received message to the page.
const eventSource = new EventSource("/get?question=Hello");

eventSource.onmessage = function (event) {
  const chatOutput = document.getElementById("chat-output");
  if (!chatOutput) {
    // The output container is not in the DOM; nothing to render into.
    return;
  }
  // Insert the payload as text, never as HTML: the data comes from the server/model and
  // must not be able to inject markup or scripts. Appending a node also avoids
  // re-parsing the whole container on every message, which `innerHTML +=` would do.
  const paragraph = document.createElement("p");
  paragraph.textContent = event.data;
  chatOutput.appendChild(paragraph);
};

eventSource.onerror = function (event) {
  console.error("EventSource failed:", event);
  // Close explicitly; otherwise EventSource reconnects automatically and would
  // issue the same request again.
  eventSource.close();
};
使用 Fetch API 读取流式响应(支持 POST 请求和自定义请求头):
/**
 * Sends a JSON POST request to the test streaming endpoint and logs the response body
 * chunk by chunk as it arrives.
 *
 * Errors are logged rather than rethrown; an aborted request is reported separately.
 *
 * @param options.signal Optional external signal. Aborting it cancels the request while
 *   it is still in flight. This is needed because the returned promise only resolves
 *   after the stream has ended or failed.
 * @returns A function that aborts the underlying request. It is only available once the
 *   promise has resolved, so use `options.signal` to cancel a running stream.
 */
export const postStream = async (
  options: { signal?: AbortSignal } = {},
): Promise<() => void> => {
  const abortController = new AbortController();
  const forwardAbort = (): void => abortController.abort();

  // Link the optional external signal to the internal controller.
  const externalSignal = options.signal;
  if (externalSignal?.aborted) {
    forwardAbort();
  } else {
    externalSignal?.addEventListener('abort', forwardAbort, { once: true });
  }

  try {
    const postData = {
      key1: 'value1',
      key2: 'value2',
    };
    const response = await fetch("/api/admin/common/testStream", {
      method: 'POST',
      headers: {
        'Content-Type': 'application/json',
        // Add an Authorization header if the endpoint requires authentication.
        'Authorization': 'Bearer your_token_here',
        // Other custom request headers go here.
      },
      body: JSON.stringify(postData),
      signal: abortController.signal, // lets the request be cancelled
    });
    console.log(response);
    if (!response.ok) {
      throw new Error("Network response was not ok");
    }
    if (response.body === null) {
      throw new Error("Response has no body");
    }

    const reader = response.body.getReader();
    const textDecoder = new TextDecoder();
    try {
      while (true) {
        const { done, value } = await reader.read();
        if (done) {
          // Flush bytes the decoder may still be holding back.
          const tail = textDecoder.decode();
          if (tail) {
            console.log("Received chunk:", tail);
          }
          console.log("Stream ended");
          break;
        }
        // `stream: true` keeps an incomplete multi-byte UTF-8 sequence buffered until the
        // next chunk instead of emitting replacement characters at chunk boundaries.
        const chunkText = textDecoder.decode(value, { stream: true });
        console.log("Received chunk:", chunkText);
      }
    } finally {
      reader.releaseLock();
    }
  } catch (e: unknown) {
    if (e instanceof Error && e.name === 'AbortError') {
      console.log("请求已被取消"); // the request was cancelled
    } else {
      console.log(e); // any other failure
    }
  } finally {
    externalSignal?.removeEventListener('abort', forwardAbort);
  }

  // Expose abort so callers can cancel the request.
  return () => abortController.abort();
};
如果请求经过了代理层(例如 Nginx),需要在代理配置中禁用响应缓冲,否则数据会先被代理缓冲,无法实时返回给客户端:
# Disable buffering of proxied responses so streamed chunks are passed to the client
# as soon as they are received instead of being held back by the proxy.
proxy_buffering off;