8 Commits

Author SHA1 Message Date
6aab4fb44b 120 timeout if only one upstream 2023-11-08 20:42:36 +08:00
b932331bdc 20s timeout for large request 2023-11-08 20:18:55 +08:00
a2c6fa32ed default 60s timeout 2023-11-08 18:42:35 +08:00
9d8e76bd2d do not parse unused body 2023-11-08 18:40:31 +08:00
ebc04228c5 update readme 2023-11-02 09:40:10 +08:00
3c4c2b5660 fix error response code 2023-11-01 17:53:57 +08:00
a3fff93f2e hasResponse on error 2023-11-01 17:34:11 +08:00
c90a18d380 record model name 2023-11-01 17:02:30 +08:00
5 changed files with 47 additions and 57 deletions

View File

@@ -1,13 +1,16 @@
# openai-api-route 文档 # openai-api-route 文档
这是一个 OpenAI API 负载均衡的简易工具,使用 golang 原生 reverse proxy 方法转发请求到 OpenAI 上游 这是一个 OpenAI API 负载均衡的简易工具,使用 golang 原生 reverse proxy 方法转发请求到 OpenAI 上游。遇到上游返回报错或请求超时会自动按顺序选择下一个上游进行重试,直到所有上游都请求失败。
功能包括: 功能包括:
- 更改 Authorization 验证头 - 自定义 Authorization 验证头
- 多种负载均衡策略 - 支持所有类型的接口 (`/v1/*`)
- 记录完整的请求内容、IP 地址、响应时间以及 GPT 回复文本 - 提供 Prometheus Metrics 统计接口 (`/v1/metrics`)
- 上游返回错误时发送 飞书 或 Matrix 消息通知 - 按照定义顺序请求 OpenAI 上游
- 识别 ChatCompletions Stream 请求,针对 Stream 请求使用 5 秒超时。对于其他请求使用 60 秒超时。
- 记录完整的请求内容、使用的上游、IP 地址、响应时间以及 GPT 回复文本
- 请求出错时发送 飞书 或 Matrix 消息通知
本文档详细介绍了如何使用负载均衡和能力 API 的方法和端点。 本文档详细介绍了如何使用负载均衡和能力 API 的方法和端点。
@@ -91,22 +94,4 @@ Usage of ./openai-api-route:
./openai-api-route -add -sk sk-xxxxx -endpoint https://api.openai.com/v1 ./openai-api-route -add -sk sk-xxxxx -endpoint https://api.openai.com/v1
``` ```
您也可以使用 `/admin/upstreams` 的 HTTP 接口进行控制 另外,您还可以直接编辑数据库中的 `openai_upstreams` 表进行 OpenAI 上游的增删改查管理。改动的上游需要重启负载均衡服务后才能生效
另外,您还可以直接编辑数据库中的 `openai_upstreams` 表。
## 身份验证
### 身份验证中间件流程
1. 从请求头中获取`Authorization`字段的值。
2. 检查`Authorization`字段的值是否以`"Bearer"`开头。
- 如果不是,则返回错误信息:"authorization header should start with 'Bearer'",HTTP 状态码 403
3. 去除`Authorization`字段值开头的`"Bearer"`和前后的空格。
4. 将剩余的值与预先设置的身份验证配置进行比较。
- 如果不匹配,则返回错误信息:"wrong authorization header",HTTP 状态码 403
5. 如果身份验证通过,则返回`nil`。
## 上游管理
没什么好说的,直接操作数据库 `openai_upstreams` 表,改动立即生效

View File

@@ -133,6 +133,10 @@ func main() {
shouldResponse := index == len(upstreams)-1 shouldResponse := index == len(upstreams)-1
if len(upstreams) == 1 {
upstream.Timeout = 120
}
err = processRequest(c, &upstream, &record, shouldResponse) err = processRequest(c, &upstream, &record, shouldResponse)
if err != nil { if err != nil {
log.Println("Error from upstream", upstream.Endpoint, "should retry", err) log.Println("Error from upstream", upstream.Endpoint, "should retry", err)
@@ -147,8 +151,8 @@ func main() {
if db.Create(&record).Error != nil { if db.Create(&record).Error != nil {
log.Println("Error to save record:", record) log.Println("Error to save record:", record)
} }
if record.Status != 200 && record.Response != "context canceled" { if record.Status != 200 {
errMessage := fmt.Sprintf("IP: %s request all upstreams error %d with %s", record.IP, record.Status, record.Response) errMessage := fmt.Sprintf("IP: %s request %s error %d with %s", record.IP, record.Model, record.Status, record.Response)
go sendFeishuMessage(errMessage) go sendFeishuMessage(errMessage)
go sendMatrixMessage(errMessage) go sendMatrixMessage(errMessage)
} }

View File

@@ -55,16 +55,23 @@ func processRequest(c *gin.Context, upstream *OPENAI_UPSTREAM, record *Record, s
// record chat message from user // record chat message from user
record.Body = string(inBody) record.Body = string(inBody)
requestBody, requestBodyOK := ParseRequestBody(inBody) requestBody, requestBodyOK := ParseRequestBody(inBody)
// record if parse success
if requestBodyOK == nil {
record.Model = requestBody.Model
}
// set timeout, default is 5 second // set timeout, default is 60 second
timeout := 5 * time.Second timeout := 60 * time.Second
if requestBodyOK == nil && requestBody.Stream {
timeout = 5 * time.Second
}
if len(inBody) > 1024*128 {
timeout = 20 * time.Second
}
if upstream.Timeout > 0 { if upstream.Timeout > 0 {
// convert upstream.Timeout(second) to nanosecond // convert upstream.Timeout(second) to nanosecond
timeout = time.Duration(upstream.Timeout) * time.Second timeout = time.Duration(upstream.Timeout) * time.Second
} }
if requestBodyOK == nil && !requestBody.Stream {
timeout = 60 * time.Second
}
// timeout out request // timeout out request
go func() { go func() {
@@ -72,6 +79,9 @@ func processRequest(c *gin.Context, upstream *OPENAI_UPSTREAM, record *Record, s
if !haveResponse { if !haveResponse {
log.Println("Timeout upstream", upstream.Endpoint) log.Println("Timeout upstream", upstream.Endpoint)
errCtx = errors.New("timeout") errCtx = errors.New("timeout")
if shouldResponse {
c.AbortWithError(502, errCtx)
}
cancel() cancel()
} }
}() }()
@@ -95,7 +105,6 @@ func processRequest(c *gin.Context, upstream *OPENAI_UPSTREAM, record *Record, s
var contentType string var contentType string
proxy.ModifyResponse = func(r *http.Response) error { proxy.ModifyResponse = func(r *http.Response) error {
haveResponse = true haveResponse = true
log.Println("haveResponse set to true")
record.Status = r.StatusCode record.Status = r.StatusCode
if !shouldResponse && r.StatusCode != 200 { if !shouldResponse && r.StatusCode != 200 {
log.Println("upstream return not 200 and should not response", r.StatusCode) log.Println("upstream return not 200 and should not response", r.StatusCode)
@@ -124,10 +133,9 @@ func processRequest(c *gin.Context, upstream *OPENAI_UPSTREAM, record *Record, s
return nil return nil
} }
proxy.ErrorHandler = func(w http.ResponseWriter, r *http.Request, err error) { proxy.ErrorHandler = func(w http.ResponseWriter, r *http.Request, err error) {
haveResponse = true
log.Println("Error", err, upstream.SK, upstream.Endpoint) log.Println("Error", err, upstream.SK, upstream.Endpoint)
log.Println("debug", r)
errCtx = err errCtx = err
// abort to error handle // abort to error handle

View File

@@ -15,6 +15,7 @@ type Record struct {
CreatedAt time.Time CreatedAt time.Time
IP string IP string
Body string `gorm:"serializer:json"` Body string `gorm:"serializer:json"`
Model string
Response string Response string
ElapsedTime time.Duration ElapsedTime time.Duration
Status int Status int

View File

@@ -4,23 +4,15 @@ import (
"encoding/json" "encoding/json"
) )
type Message struct {
Content string `json:"content"`
Role string `json:"role"`
}
type RequestBody struct { type RequestBody struct {
Model string `json:"model"` Model string `json:"model"`
Messages []Message `json:"messages"` Stream bool `json:"stream"`
Stream bool `json:"stream"`
Temperature float64 `json:"temperature"`
TopP int64 `json:"top_p"`
PresencePenalty float64 `json:"presence_penalty"`
FrequencyPenalty float64 `json:"frequency_penalty"`
} }
func ParseRequestBody(data []byte) (RequestBody, error) { func ParseRequestBody(data []byte) (RequestBody, error) {
ret := RequestBody{} ret := RequestBody{
Stream: false,
}
var requestBody RequestBody var requestBody RequestBody
err := json.Unmarshal(data, &requestBody) err := json.Unmarshal(data, &requestBody)