OpenAI SDK Development (2): SSE Streaming Responses with okhttp3

This installment implements streaming responses, built mainly on okhttp3's EventSource.

Process verification

The main difference from the previous installment is that an EventSource is used to listen for the streamed response: set the stream field of ChatCompletionRequest to true, create an EventSource.Factory from the OkHttpClient, and send the data in a new request format.
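For reference, with stream set to true the server replies with Server-Sent Events: each event is a `data:` line carrying one JSON chunk, and the stream ends with a `[DONE]` sentinel. The payload below is a rough sketch of the shape (exact fields vary by model and API version):

```
data: {"id":"chatcmpl-...","object":"chat.completion.chunk","choices":[{"index":0,"delta":{"content":"Hello"}}]}

data: {"id":"chatcmpl-...","object":"chat.completion.chunk","choices":[{"index":0,"delta":{"content":" world"}}]}

data: [DONE]
```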

@Test
public void test_client_stream() throws JsonProcessingException, InterruptedException {
    HttpLoggingInterceptor httpLoggingInterceptor = new HttpLoggingInterceptor();
    httpLoggingInterceptor.setLevel(HttpLoggingInterceptor.Level.BODY);

    OkHttpClient okHttpClient = new OkHttpClient
            .Builder()
            .addInterceptor(httpLoggingInterceptor)
            .addInterceptor(chain -> {
                Request original = chain.request();

                // Take the token parameter and append it to the request URL
                HttpUrl url = original.url().newBuilder()
                        .addQueryParameter("token", "xxx")
                        .build();

                Request request = original.newBuilder()
                        .url(url)
                        .header(Header.AUTHORIZATION.getValue(), "Bearer " + "xxx")
                        .header(Header.CONTENT_TYPE.getValue(), ContentType.JSON.getValue())
                        .method(original.method(), original.body())
                        .build();
                return chain.proceed(request);
            })
            .build();

    Message message = Message.builder().role(Constants.Role.USER).content("写一个java冒泡排序").build();
    ChatCompletionRequest chatCompletion = ChatCompletionRequest
            .builder()
            .messages(Collections.singletonList(message))
            .model(ChatCompletionRequest.Model.GPT_3_5_TURBO.getCode())
            .stream(true)
            .build();

    EventSource.Factory factory = EventSources.createFactory(okHttpClient);
    // Serialize chatCompletion to a JSON string
    String requestBody = new ObjectMapper().writeValueAsString(chatCompletion);

    Request request = new Request.Builder()
            .url("xxx")
            // MediaType.parse declares the media type and encoding used to interpret requestBody
            .post(RequestBody.create(MediaType.parse(ContentType.JSON.getValue()), requestBody))
            .build();

    EventSource eventSource = factory.newEventSource(request, new EventSourceListener() {
        @Override
        public void onEvent(EventSource eventSource, String id, String type, String data) {
            log.info("Result: {}", data);
        }
    });

    // Block so the test does not exit before events arrive
    new CountDownLatch(1).await();
}
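Each data payload passed to onEvent is one JSON chunk whose choices[0].delta.content field carries the next text fragment. The sketch below pulls that field out with a naive string scan purely for illustration (DeltaParser and extractContent are hypothetical names; a real client should deserialize the chunk with Jackson instead, since this scan breaks on escaped quotes):

```java
public class DeltaParser {

    // Naive extraction of the "content" field from one streamed chunk.
    // Assumes the typical chat.completion.chunk shape; returns "" for
    // non-matching payloads such as the "[DONE]" sentinel.
    static String extractContent(String data) {
        String key = "\"content\":\"";
        int i = data.indexOf(key);
        if (i < 0) return "";
        int start = i + key.length();
        int end = data.indexOf('"', start);
        return end < 0 ? "" : data.substring(start, end);
    }

    public static void main(String[] args) {
        String chunk = "{\"choices\":[{\"delta\":{\"content\":\"Hello\"},\"index\":0}]}";
        System.out.println(extractContent(chunk)); // prints "Hello"
    }
}
```

Concatenating these fragments in arrival order reconstructs the full answer on the client side.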

Adding the method

Add the following method to the OpenAiSession interface; when the parameters are a ChatCompletionRequest and an EventSourceListener, the streaming response path is used.

/**
 * Chat model GPT-3.5/4.0 with streaming feedback
 * @param chatCompletionRequest request information
 * @param eventSourceListener listener implementation; data is received through its onEvent method
 * @return the EventSource for the stream
 */
EventSource chatCompletions(ChatCompletionRequest chatCompletionRequest, EventSourceListener eventSourceListener) throws JsonProcessingException;

Implementation

Implement the chatCompletions method in the DefaultOpenAiSession class to provide the streaming response.

public EventSource chatCompletions(ChatCompletionRequest chatCompletionRequest, EventSourceListener eventSourceListener) throws JsonProcessingException {
    // Validate the core parameter; do not mutate the caller's request, only report the error.
    if (!chatCompletionRequest.isStream()) {
        throw new RuntimeException("illegal parameter stream is false!");
    }

    // Build the request
    Request request = new Request.Builder()
            // url: https://api.openai.com/v1/chat/completions - the POST path configured in IOpenAiApi, so the endpoint is read from one central place
            .url(configuration.getApiHost().concat(IOpenAiApi.v1_chat_completions))
            // Serialize the request body; if you use Fastjson, it can replace ObjectMapper here
            .post(RequestBody.create(MediaType.parse(ContentType.JSON.getValue()), new ObjectMapper().writeValueAsString(chatCompletionRequest)))
            .build();

    // Return the EventSource; it can also be used to cancel the response
    return factory.newEventSource(request, eventSourceListener);
}
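In the earlier test, new CountDownLatch(1).await() simply parks the calling thread forever. A common refinement is to count the latch down when the stream delivers its terminating "[DONE]" event, so the caller wakes up exactly when the answer is complete. The sketch below simulates event delivery with a plain thread (StreamWait and collect are hypothetical names for illustration, not part of the SDK):

```java
import java.util.concurrent.CountDownLatch;

public class StreamWait {

    // Wait/terminate pattern: accumulate streamed fragments on a worker
    // thread and release the waiting caller when "[DONE]" arrives.
    // The event source is simulated; a real listener would do the same
    // work inside EventSourceListener.onEvent.
    static String collect(String[] events) throws InterruptedException {
        CountDownLatch latch = new CountDownLatch(1);
        StringBuilder answer = new StringBuilder();

        new Thread(() -> {
            for (String data : events) {
                if ("[DONE]".equals(data)) {
                    latch.countDown();   // stream finished, release the waiter
                } else {
                    answer.append(data); // accumulate each streamed fragment
                }
            }
        }).start();

        latch.await();                   // blocks until [DONE] arrives
        return answer.toString();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(collect(new String[]{"bubble", " sort", "[DONE]"})); // prints "bubble sort"
    }
}
```

The countDown/await pair also establishes a happens-before edge, so the accumulated StringBuilder contents are safely visible to the waiting thread.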