From 30c3783a79a3708e6cfee8ae4ea943e09ca93567 Mon Sep 17 00:00:00 2001 From: yangfeng <1719957182@qq.com> Date: Wed, 6 Sep 2023 13:39:16 +0800 Subject: [PATCH 1/3] fix(docker compose): add environment to elasticsearch --- docker-compose.yaml | 1 + 1 file changed, 1 insertion(+) diff --git a/docker-compose.yaml b/docker-compose.yaml index 048ec03..c6b9c27 100644 --- a/docker-compose.yaml +++ b/docker-compose.yaml @@ -104,6 +104,7 @@ services: - bootstrap.memory_lock=true - "ES_JAVA_OPTS=-Xms512m -Xmx512m" - discovery.type=single-node + - cluster.routing.allocation.disk.threshold_enabled=false ulimits: memlock: soft: -1 From ceffd5c03416bdb56ef055fa20eabe58e7aa78b0 Mon Sep 17 00:00:00 2001 From: yangfeng <1719957182@qq.com> Date: Sat, 9 Sep 2023 01:17:10 +0800 Subject: [PATCH 2/3] fix(msgconsummer): duplicated message saving --- src/services/msgconsumer/main.go | 27 +++++++++++++++++++++------ 1 file changed, 21 insertions(+), 6 deletions(-) diff --git a/src/services/msgconsumer/main.go b/src/services/msgconsumer/main.go index cfc8749..47b3362 100644 --- a/src/services/msgconsumer/main.go +++ b/src/services/msgconsumer/main.go @@ -214,6 +214,20 @@ func saveMessage(channel *amqp.Channel) { ctx, span := tracing.Tracer.Start(ctx, "MessageSendService") logger := logging.LogService("MessageSend").WithContext(ctx) + // Check if it is a re-publish message + retry, ok := body.Headers["x-retry"].(int32) + if ok || retry >= 1 { + err := body.Ack(false) + if err != nil { + logger.WithFields(logrus.Fields{ + "err": err, + }).Errorf("Error when dealing with the message...") + logging.SetSpanError(span, err) + } + span.End() + continue + } + if err := json.Unmarshal(body.Body, &message); err != nil { logger.WithFields(logrus.Fields{ "from_id": message.FromUserId, @@ -280,10 +294,10 @@ func saveMessage(channel *amqp.Channel) { if err != nil { logger.WithFields(logrus.Fields{ "err": err, - }).Errorf("Error when dealing with the message...3") + }).Errorf("Error when dealing with the message...") logging.SetSpanError(span, err) - } + span.End() } } @@ -369,15 +383,15 @@ func chatWithGPT(channel *amqp.Channel) { } logger.Infof("Successfully send the reply to user") - span.End() err = body.Ack(false) if err != nil { logger.WithFields(logrus.Fields{ "err": err, - }).Errorf("Error when dealing with the message...4") + }).Errorf("Error when dealing with the message...") logging.SetSpanError(span, err) } + span.End() } } @@ -470,6 +484,7 @@ func saveAuditAction(channel *amqp.Channel) { }).Errorf("Error when dealing with the action...") logging.SetSpanError(span, err) } + span.End() } } @@ -496,7 +511,7 @@ func errorHandler(channel *amqp.Channel, d amqp.Delivery, requeue bool, logger * if err != nil { logger.WithFields(logrus.Fields{ "err": err, - }).Errorf("Error when dealing with the message event...1") + }).Errorf("Error when dealing with the message event...") } } else { curRetry++ @@ -508,7 +523,7 @@ func errorHandler(channel *amqp.Channel, d amqp.Delivery, requeue bool, logger * if err != nil { logger.WithFields(logrus.Fields{ "err": err, - }).Errorf("Error when dealing with the message event...2") + }).Errorf("Error when dealing with the message event...") } logger.Debugf("Retrying %d times", curRetry) From 2ddbf6975feea16364c213659ed85231b0191cca Mon Sep 17 00:00:00 2001 From: yangfeng <1719957182@qq.com> Date: Sat, 9 Sep 2023 01:18:41 +0800 Subject: [PATCH 3/3] fix(publish): limit maximum video size to 200MB Add error code when the upload video exceed the maximum video size --- src/constant/config/service.go | 2 ++ src/constant/strings/err.go | 2 ++ src/services/publish/main.go | 2 +- src/web/publish/handler.go | 11 +++++++++++ 4 files changed, 16 insertions(+), 1 deletion(-) diff --git a/src/constant/config/service.go b/src/constant/config/service.go index 62859d2..2b30e02 100644 --- a/src/constant/config/service.go +++ b/src/constant/config/service.go @@ -38,3 +38,5 @@ const Event = "GuGoTik-Recommend" const MsgConsumer = "GuGoTik-MgsConsumer" const BloomRedisChannel = "GuGoTik-Bloom" + +const MaxVideoSize = 200 * 1024 * 1024 diff --git a/src/constant/strings/err.go b/src/constant/strings/err.go index c84ebf4..0e02092 100644 --- a/src/constant/strings/err.go +++ b/src/constant/strings/err.go @@ -90,4 +90,6 @@ const ( FollowLimited = "关注频繁,请稍后再试!" UserDoNotExistedCode = 10013 UserDoNotExisted = "查询用户不存在!" + OversizeVideoCode = 10014 + OversizeVideo = "上传视频超过了200MB" ) diff --git a/src/services/publish/main.go b/src/services/publish/main.go index 408c62b..bc2b15c 100644 --- a/src/services/publish/main.go +++ b/src/services/publish/main.go @@ -56,7 +56,7 @@ func main() { ) reg := prom.Client reg.MustRegister(srvMetrics) - maxSize := 500 * 1024 * 1024 + maxSize := config.MaxVideoSize s := grpc.NewServer( grpc.MaxRecvMsgSize(maxSize), diff --git a/src/web/publish/handler.go b/src/web/publish/handler.go index 46f1576..deafa72 100644 --- a/src/web/publish/handler.go +++ b/src/web/publish/handler.go @@ -110,6 +110,17 @@ func ActionPublishHandle(c *gin.Context) { } }(opened) + if file.Size > config.MaxVideoSize { + logger.WithFields(logrus.Fields{ + "FileSize": file.Size, + }).Errorf("Maximum file size is 200MB") + c.JSON(http.StatusOK, models.ActionPublishRes{ + StatusCode: strings.OversizeVideoCode, + StatusMsg: strings.OversizeVideo, + }) + return + } + var data = make([]byte, file.Size) readSize, err := opened.Read(data) if err != nil {