timeout for out going request
This commit is contained in:
27
process.go
27
process.go
@@ -11,8 +11,10 @@ import (
|
|||||||
"net/http/httputil"
|
"net/http/httputil"
|
||||||
"net/url"
|
"net/url"
|
||||||
"strings"
|
"strings"
|
||||||
|
"time"
|
||||||
|
|
||||||
"github.com/gin-gonic/gin"
|
"github.com/gin-gonic/gin"
|
||||||
|
"golang.org/x/net/context"
|
||||||
)
|
)
|
||||||
|
|
||||||
func processRequest(c *gin.Context, upstream *OPENAI_UPSTREAM, record *Record, shouldResponse bool) error {
|
func processRequest(c *gin.Context, upstream *OPENAI_UPSTREAM, record *Record, shouldResponse bool) error {
|
||||||
@@ -29,11 +31,24 @@ func processRequest(c *gin.Context, upstream *OPENAI_UPSTREAM, record *Record, s
|
|||||||
c.AbortWithError(500, errors.New("can't parse reverse proxy remote URL"))
|
c.AbortWithError(500, errors.New("can't parse reverse proxy remote URL"))
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// set timeout, default is 5 second
|
||||||
|
timeout := 5 * time.Second
|
||||||
|
haveResponse := false
|
||||||
|
if upstream.Timeout > 0 {
|
||||||
|
// convert upstream.Timeout(second) to nanosecond
|
||||||
|
timeout = time.Duration(upstream.Timeout) * time.Second
|
||||||
|
}
|
||||||
|
|
||||||
proxy := httputil.NewSingleHostReverseProxy(remote)
|
proxy := httputil.NewSingleHostReverseProxy(remote)
|
||||||
proxy.Director = nil
|
proxy.Director = nil
|
||||||
var inBody []byte
|
var inBody []byte
|
||||||
proxy.Rewrite = func(proxyRequest *httputil.ProxyRequest) {
|
proxy.Rewrite = func(proxyRequest *httputil.ProxyRequest) {
|
||||||
in := proxyRequest.In
|
in := proxyRequest.In
|
||||||
|
|
||||||
|
ctx, cancel := context.WithCancel(context.Background())
|
||||||
|
proxyRequest.Out = proxyRequest.Out.WithContext(ctx)
|
||||||
|
|
||||||
out := proxyRequest.Out
|
out := proxyRequest.Out
|
||||||
|
|
||||||
// read request body
|
// read request body
|
||||||
@@ -46,6 +61,16 @@ func processRequest(c *gin.Context, upstream *OPENAI_UPSTREAM, record *Record, s
|
|||||||
// record chat message from user
|
// record chat message from user
|
||||||
record.Body = string(inBody)
|
record.Body = string(inBody)
|
||||||
|
|
||||||
|
// timeout out request
|
||||||
|
go func() {
|
||||||
|
time.Sleep(timeout)
|
||||||
|
if !haveResponse {
|
||||||
|
log.Println("Timeout", upstream.Endpoint)
|
||||||
|
errCtx = errors.New("timeout")
|
||||||
|
cancel()
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
out.Body = io.NopCloser(bytes.NewReader(inBody))
|
out.Body = io.NopCloser(bytes.NewReader(inBody))
|
||||||
|
|
||||||
out.Host = remote.Host
|
out.Host = remote.Host
|
||||||
@@ -64,6 +89,8 @@ func processRequest(c *gin.Context, upstream *OPENAI_UPSTREAM, record *Record, s
|
|||||||
var buf bytes.Buffer
|
var buf bytes.Buffer
|
||||||
var contentType string
|
var contentType string
|
||||||
proxy.ModifyResponse = func(r *http.Response) error {
|
proxy.ModifyResponse = func(r *http.Response) error {
|
||||||
|
haveResponse = true
|
||||||
|
log.Println("haveResponse set to true")
|
||||||
record.Status = r.StatusCode
|
record.Status = r.StatusCode
|
||||||
if !shouldResponse && r.StatusCode != 200 {
|
if !shouldResponse && r.StatusCode != 200 {
|
||||||
log.Println("upstream return not 200 and should not response", r.StatusCode)
|
log.Println("upstream return not 200 and should not response", r.StatusCode)
|
||||||
|
|||||||
@@ -9,4 +9,5 @@ type OPENAI_UPSTREAM struct {
|
|||||||
gorm.Model
|
gorm.Model
|
||||||
SK string `gorm:"index:idx_sk_endpoint,unique"` // key
|
SK string `gorm:"index:idx_sk_endpoint,unique"` // key
|
||||||
Endpoint string `gorm:"index:idx_sk_endpoint,unique"` // endpoint
|
Endpoint string `gorm:"index:idx_sk_endpoint,unique"` // endpoint
|
||||||
|
Timeout int64 // timeout in seconds
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user