From 8ed2c84b68fcbb9d36c01c6e16b3461b7e086e8c Mon Sep 17 00:00:00 2001 From: Liujian <824010343@qq.com> Date: Mon, 9 Dec 2024 00:17:50 +0800 Subject: [PATCH] update loki publish --- gateway/apinto/dynamic.go | 1 - gateway/profession.go | 40 ++------------ log-driver/factory.go | 2 +- log-driver/loki/loki.go | 17 +++--- module/log/iml.go | 109 ++++++++++++++++++++++++++++++++++++-- service/log/iml.go | 16 +----- 6 files changed, 121 insertions(+), 64 deletions(-) diff --git a/gateway/apinto/dynamic.go b/gateway/apinto/dynamic.go index d0e891dc..a734a49c 100644 --- a/gateway/apinto/dynamic.go +++ b/gateway/apinto/dynamic.go @@ -20,7 +20,6 @@ func NewDynamicClient(client admin_client.Client, resource string) (*DynamicClie cfg, has := gateway.GetDynamicResourceDriver(resource) if !has { return nil, errors.New("resource not found") - } return &DynamicClient{client: client, profession: cfg.Profession, driver: cfg.Driver}, nil diff --git a/gateway/profession.go b/gateway/profession.go index 3596f388..606a4904 100644 --- a/gateway/profession.go +++ b/gateway/profession.go @@ -57,42 +57,10 @@ var dynamicResourceMap = map[string]Worker{ Profession: ProfessionCertificate, Driver: "server", }, - //"openai": { - // Profession: ProfessionAIProvider, - // Driver: "openai", - //}, - //"google": { - // Profession: ProfessionAIProvider, - // Driver: "google", - //}, - //"anthropic": { - // Profession: ProfessionAIProvider, - // Driver: "anthropic", - //}, - //"moonshot": { - // Profession: ProfessionAIProvider, - // Driver: "moonshot", - //}, - //"tongyi": { - // Profession: ProfessionAIProvider, - // Driver: "tongyi", - //}, - //"zhipuai": { - // Profession: ProfessionAIProvider, - // Driver: "zhipuai", - //}, - //"fireworks": { - // Profession: ProfessionAIProvider, - // Driver: "fireworks", - //}, - //"novita": { - // Profession: ProfessionAIProvider, - // Driver: "novita", - //}, - //"mistralai": { - // Profession: ProfessionAIProvider, - // Driver: "mistralai", - //}, + "loki": { + Profession: ProfessionOutput, + Driver: "loki", + }, } type Worker struct { diff --git a/log-driver/factory.go b/log-driver/factory.go index 49bc3855..ed13eb4d 100644 --- a/log-driver/factory.go +++ b/log-driver/factory.go @@ -7,7 +7,7 @@ var ( ) type IFactory interface { - Create(config string) (ILogDriver, error) + Create(config string) (ILogDriver, map[string]interface{}, error) } type factoryManager struct { diff --git a/log-driver/loki/loki.go b/log-driver/loki/loki.go index 7fe2ddfa..c79cefd4 100644 --- a/log-driver/loki/loki.go +++ b/log-driver/loki/loki.go @@ -21,7 +21,7 @@ func init() { type factory struct { } -func (f *factory) Create(config string) (log_driver.ILogDriver, error) { +func (f *factory) Create(config string) (log_driver.ILogDriver, map[string]interface{}, error) { return NewDriver(config) } @@ -35,24 +35,27 @@ type Driver struct { headers map[string]string } -func NewDriver(config string) (*Driver, error) { +func NewDriver(config string) (*Driver, map[string]interface{}, error) { cfg := new(DriverConfig) err := json.Unmarshal([]byte(config), cfg) if err != nil { - return nil, err + return nil, nil, err } err = cfg.Check() if err != nil { - return nil, err + return nil, nil, err } headers := map[string]string{} for _, h := range cfg.Header { headers[h.Key] = h.Value } return &Driver{ - url: cfg.URL, - headers: headers, - }, nil + url: cfg.URL, + headers: headers, + }, map[string]interface{}{ + "url": cfg.URL, + "headers": headers, + }, nil } func (d *Driver) LogInfo(clusterId string, id string) (*log_driver.LogInfo, error) { diff --git a/module/log/iml.go b/module/log/iml.go index eb3d39d5..ded0a013 100644 --- a/module/log/iml.go +++ b/module/log/iml.go @@ -4,6 +4,13 @@ import ( "context" "encoding/json" "errors" + "time" + + log_driver "github.com/APIParkLab/APIPark/log-driver" + + "github.com/APIParkLab/APIPark/gateway" + + "github.com/eolinker/go-common/store" "gorm.io/gorm" @@ -18,10 +25,58 @@ import ( var _ ILogModule = (*imlLogModule)(nil) type imlLogModule struct { - service log.ILogService `autowired:""` + service log.ILogService `autowired:""` + clusterService cluster.IClusterService `autowired:""` + transaction store.ITransaction `autowired:""` +} + +var labels = map[string]string{ + "cluster": "$cluster", + "node": "$node", +} +var logFormatter = map[string]interface{}{ + "fields": []string{ + "$msec", + "$service", + "$provider", + "$scheme as request_scheme", + "$url as request_uri", + "$host as request_host", + "$header as request_header", + "$remote_addr", + "$request_body", + "$proxy_body", + "$proxy_method", + "$proxy_scheme", + "$proxy_uri", + "$api", + "$proxy_host", + "$proxy_header", + "$proxy_addr", + "$response_headers", + "$status", + "$content_type", + "$proxy_status", + "$request_time", + "$response_time", + "$node", + "$cluster", + "$application", + "$src_ip", + "$block_name as strategy", + "$request_id", + "$request_method", + "$authorization", + "$response_body", + "$proxy_response_body", + }, } func (i *imlLogModule) Save(ctx context.Context, driver string, input *log_dto.Save) error { + factory, has := log_driver.GetFactory(driver) + if !has { + return errors.New("driver not found") + } input.Cluster = cluster.DefaultClusterID var cfg *string if input.Config != nil { @@ -29,10 +84,54 @@ func (i *imlLogModule) Save(ctx context.Context, driver string, input *log_dto.S tmp := string(data) cfg = &tmp } - return i.service.UpdateLogSource(ctx, driver, &log.Save{ - ID: input.ID, - Cluster: &input.Cluster, - Config: cfg, + return i.transaction.Transaction(ctx, func(txCtx context.Context) error { + err := i.service.UpdateLogSource(ctx, driver, &log.Save{ + ID: input.ID, + Cluster: &input.Cluster, + Config: cfg, + }) + if err != nil { + return err + } + info, err := i.service.GetLogSource(ctx, driver) + if err != nil { + return err + } + d, c, err := factory.Create(info.Config) + if err != nil { + return err + } + + client, err := i.clusterService.GatewayClient(ctx, input.Cluster) + if err != nil { + return err + } + dynamicClient, err := client.Dynamic(driver) + if err != nil { + return err + } + attr := make(map[string]interface{}) + attr["driver"] = driver + attr["formatter"] = logFormatter + attr["labels"] = labels + attr["method"] = "POST" + for k, v := range c { + attr[k] = v + } + err = dynamicClient.Online(ctx, &gateway.DynamicRelease{ + BasicItem: &gateway.BasicItem{ + ID: driver, + Description: "collect access log", + Version: time.Now().Format("20060102150405"), + Resource: gateway.ProfessionOutput, + }, + Attr: attr, + }) + if err != nil { + return err + } + log_driver.SetDriver(driver, d) + return nil }) } diff --git a/service/log/iml.go b/service/log/iml.go index 2daeb9a8..e0ff4b56 100644 --- a/service/log/iml.go +++ b/service/log/iml.go @@ -35,7 +35,7 @@ func (i *imlLogService) OnComplete() { if err != nil { continue } - driver, err := factory.Create(s.Config) + driver, _, err := factory.Create(s.Config) if err != nil { continue } @@ -45,10 +45,6 @@ func (i *imlLogService) OnComplete() { } func (i *imlLogService) UpdateLogSource(ctx context.Context, driver string, input *Save) error { - factory, has := log_driver.GetFactory(driver) - if !has { - return errors.New("driver not found") - } s, err := i.store.First(ctx, map[string]interface{}{"driver": driver}) if err != nil { if !errors.Is(err, gorm.ErrRecordNotFound) { @@ -83,15 +79,7 @@ func (i *imlLogService) UpdateLogSource(ctx context.Context, driver string, inpu s.Updater = utils.UserId(ctx) s.UpdateAt = time.Now() } - newDriver, err := factory.Create(s.Config) - if err != nil { - return err - } - err = i.store.Save(ctx, s) - if err != nil { - return err - } - log_driver.SetDriver(driver, newDriver) + return nil }