Skip to content

Commit

Permalink
Client gr leak (#17)
Browse files Browse the repository at this point in the history
* fix: add multi params support

* fix : remove ci auto build

* fix: rm ql

* fix: init triple status api

* fix: fix linter

* fix: update gost version

* fix: fmt

* fix: remove unused file

* fix client gr leak
  • Loading branch information
LaurenceLiZhixin authored Aug 11, 2021
1 parent 41ab629 commit afec806
Showing 1 changed file with 37 additions and 23 deletions.
60 changes: 37 additions & 23 deletions pkg/http2/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,12 +147,14 @@ func (h *Client) Post(addr, path string, data []byte, opts *config.PostConfig) (

fromFrameHeaderDataSize := uint32(0)

splitedDataChain := make(chan message.Message)
splitedDataChan := make(chan message.Message)

go func() {
defer close(splitedDataChan)
for {
select {
case <-readDone:
// [timeout close], timeout and close by force
return
default:
}
Expand All @@ -161,13 +163,13 @@ func (h *Client) Post(addr, path string, data []byte, opts *config.PostConfig) (
if err != nil {
if err.Error() != "EOF" {
h.logger.Errorf("dubbo3 unary invoke read error = %v\n", err)
return
}
continue
// [normal close], read finished or no read body, return
return
}
splitedData := make([]byte, n)
copy(splitedData, readBuf[:n])
splitedDataChain <- message.Message{
splitedDataChan <- message.Message{
Buffer: bytes.NewBuffer(splitedData),
}
}
Expand All @@ -176,17 +178,21 @@ func (h *Client) Post(addr, path string, data []byte, opts *config.PostConfig) (
// get trailer chan from http2
trailerChan := rsp.Body.(*h2Triple.ResponseBody).GetTrailerChan()
var trailer http.Header
recvTrailer := false
Loop:
for {
select {
case dataMsg := <-splitedDataChain:
case dataMsg := <-splitedDataChan:
if dataMsg.Buffer == nil {
// read finished with empty body, maybe error status
// [normal close]
break Loop
}
splitedData := dataMsg.Buffer.Bytes()
if fromFrameHeaderDataSize == 0 {
// should parse data frame header first
var totalSize uint32
if splitedData, totalSize = h.frameHandler.Frame2PkgData(splitedData); totalSize == 0 {
close(readDone)
// [normal close]
break Loop
} else {
fromFrameHeaderDataSize = totalSize
Expand All @@ -200,34 +206,42 @@ Loop:
}

if splitBuffer.Len() == int(fromFrameHeaderDataSize) {
close(readDone)
// [normal close]
break Loop
}
case trailer = <-trailerChan:
recvTrailer = true
//http2StatusCode, _ := strconv.Atoi(tra.Get(constant.TrailerKeyHttp2Status))
//if http2StatusCode != 0 {
// // todo deal with http2 error
//}
break Loop
case <-timeoutTicker:
// close reading loop ablove
// timeout is a design of graceful shutdown
// 1. close readDone chan, to make sure read go routine would exist after next loop
close(readDone)
// set timeout flag

// 2. drain splitedDataChan, to make sure read go routine would not block by splitedDataChan,
// as there would not be read action from it.
select {
case <-splitedDataChan:
default:
}

// 3. set timeout flag
timeoutFlag = true
break Loop
}
}

if timeoutFlag {
h.logger.Error("unary call" + path + " with addr = " + addr + " timeout")
return nil, nil, perrors.Errorf("unary call %s timeout", path)
h.logger.Error("http2 unary call" + path + " with addr = " + addr + " timeout")
return nil, nil, perrors.Errorf("http2 unary call %s timeout", path)
}

// todo start ticker to avoid trailer timeout
if !recvTrailer {
// if not receive err trailer, wait until recv
trailer = rsp.Body.(*h2Triple.ResponseBody).GetTrailer()
select {
case trailer = <-trailerChan:
break
case <-timeoutTicker:
timeoutFlag = true
}

if timeoutFlag {
h.logger.Error("http2 unary call" + path + " with addr = " + addr + " timeout")
return nil, nil, perrors.Errorf("http2 unary call %s timeout", path)
}

return splitBuffer.Bytes(), trailer, nil
Expand Down

0 comments on commit afec806

Please sign in to comment.