Skip to content

Commit

Permalink
update loki publish
Browse files Browse the repository at this point in the history
  • Loading branch information
Dot-Liu committed Dec 8, 2024
1 parent ccd2a20 commit 8ed2c84
Show file tree
Hide file tree
Showing 6 changed files with 121 additions and 64 deletions.
1 change: 0 additions & 1 deletion gateway/apinto/dynamic.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ func NewDynamicClient(client admin_client.Client, resource string) (*DynamicClie
cfg, has := gateway.GetDynamicResourceDriver(resource)
if !has {
return nil, errors.New("resource not found")

}

return &DynamicClient{client: client, profession: cfg.Profession, driver: cfg.Driver}, nil
Expand Down
40 changes: 4 additions & 36 deletions gateway/profession.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,42 +57,10 @@ var dynamicResourceMap = map[string]Worker{
Profession: ProfessionCertificate,
Driver: "server",
},
//"openai": {
// Profession: ProfessionAIProvider,
// Driver: "openai",
//},
//"google": {
// Profession: ProfessionAIProvider,
// Driver: "google",
//},
//"anthropic": {
// Profession: ProfessionAIProvider,
// Driver: "anthropic",
//},
//"moonshot": {
// Profession: ProfessionAIProvider,
// Driver: "moonshot",
//},
//"tongyi": {
// Profession: ProfessionAIProvider,
// Driver: "tongyi",
//},
//"zhipuai": {
// Profession: ProfessionAIProvider,
// Driver: "zhipuai",
//},
//"fireworks": {
// Profession: ProfessionAIProvider,
// Driver: "fireworks",
//},
//"novita": {
// Profession: ProfessionAIProvider,
// Driver: "novita",
//},
//"mistralai": {
// Profession: ProfessionAIProvider,
// Driver: "mistralai",
//},
"loki": {
Profession: ProfessionOutput,
Driver: "loki",
},
}

type Worker struct {
Expand Down
2 changes: 1 addition & 1 deletion log-driver/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ var (
)

type IFactory interface {
Create(config string) (ILogDriver, error)
Create(config string) (ILogDriver, map[string]interface{}, error)
}

type factoryManager struct {
Expand Down
17 changes: 10 additions & 7 deletions log-driver/loki/loki.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ func init() {
type factory struct {
}

func (f *factory) Create(config string) (log_driver.ILogDriver, error) {
func (f *factory) Create(config string) (log_driver.ILogDriver, map[string]interface{}, error) {

return NewDriver(config)
}
Expand All @@ -35,24 +35,27 @@ type Driver struct {
headers map[string]string
}

func NewDriver(config string) (*Driver, error) {
func NewDriver(config string) (*Driver, map[string]interface{}, error) {
cfg := new(DriverConfig)
err := json.Unmarshal([]byte(config), cfg)
if err != nil {
return nil, err
return nil, nil, err
}
err = cfg.Check()
if err != nil {
return nil, err
return nil, nil, err
}
headers := map[string]string{}
for _, h := range cfg.Header {
headers[h.Key] = h.Value
}
return &Driver{
url: cfg.URL,
headers: headers,
}, nil
url: cfg.URL,
headers: headers,
}, map[string]interface{}{
"url": cfg.URL,
"headers": headers,
}, nil
}

func (d *Driver) LogInfo(clusterId string, id string) (*log_driver.LogInfo, error) {
Expand Down
109 changes: 104 additions & 5 deletions module/log/iml.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,13 @@ import (
"context"
"encoding/json"
"errors"
"time"

log_driver "github.com/APIParkLab/APIPark/log-driver"

"github.com/APIParkLab/APIPark/gateway"

"github.com/eolinker/go-common/store"

"gorm.io/gorm"

Expand All @@ -18,21 +25,113 @@ import (
var _ ILogModule = (*imlLogModule)(nil)

type imlLogModule struct {
service log.ILogService `autowired:""`
service log.ILogService `autowired:""`
clusterService cluster.IClusterService `autowired:""`
transaction store.ITransaction `autowired:""`
}

var labels = map[string]string{
"cluster": "$cluster",
"node": "$node",
}
var logFormatter = map[string]interface{}{
"fields": []string{
"$msec",
"$service",
"$provider",
"$scheme as request_scheme",
"$url as request_uri",
"$host as request_host",
"$header as request_header",
"$remote_addr",
"$request_body",
"$proxy_body",
"$proxy_method",
"$proxy_scheme",
"$proxy_uri",
"$api",
"$proxy_host",
"$proxy_header",
"$proxy_addr",
"$response_headers",
"$status",
"$content_type",
"$proxy_status",
"$request_time",
"$response_time",
"$node",
"$cluster",
"$application",
"$src_ip",
"$block_name as strategy",
"$request_id",
"$request_method",
"$authorization",
"$response_body",
"$proxy_response_body",
},
}

func (i *imlLogModule) Save(ctx context.Context, driver string, input *log_dto.Save) error {
factory, has := log_driver.GetFactory(driver)
if !has {
return errors.New("driver not found")
}
input.Cluster = cluster.DefaultClusterID
var cfg *string
if input.Config != nil {
data, _ := json.Marshal(input.Config)
tmp := string(data)
cfg = &tmp
}
return i.service.UpdateLogSource(ctx, driver, &log.Save{
ID: input.ID,
Cluster: &input.Cluster,
Config: cfg,
return i.transaction.Transaction(ctx, func(txCtx context.Context) error {
err := i.service.UpdateLogSource(ctx, driver, &log.Save{
ID: input.ID,
Cluster: &input.Cluster,
Config: cfg,
})
if err != nil {
return err
}
info, err := i.service.GetLogSource(ctx, driver)
if err != nil {
return err
}
d, c, err := factory.Create(info.Config)
if err != nil {
return err
}

client, err := i.clusterService.GatewayClient(ctx, input.Cluster)
if err != nil {
return err
}
dynamicClient, err := client.Dynamic(driver)
if err != nil {
return err
}
attr := make(map[string]interface{})
attr["driver"] = driver
attr["formatter"] = logFormatter
attr["labels"] = labels
attr["method"] = "POST"
for k, v := range c {
attr[k] = v
}
err = dynamicClient.Online(ctx, &gateway.DynamicRelease{
BasicItem: &gateway.BasicItem{
ID: driver,
Description: "collect access log",
Version: time.Now().Format("20060102150405"),
Resource: gateway.ProfessionOutput,
},
Attr: attr,
})
if err != nil {
return err
}
log_driver.SetDriver(driver, d)
return nil
})
}

Expand Down
16 changes: 2 additions & 14 deletions service/log/iml.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ func (i *imlLogService) OnComplete() {
if err != nil {
continue
}
driver, err := factory.Create(s.Config)
driver, _, err := factory.Create(s.Config)
if err != nil {
continue
}
Expand All @@ -45,10 +45,6 @@ func (i *imlLogService) OnComplete() {
}

func (i *imlLogService) UpdateLogSource(ctx context.Context, driver string, input *Save) error {
factory, has := log_driver.GetFactory(driver)
if !has {
return errors.New("driver not found")
}
s, err := i.store.First(ctx, map[string]interface{}{"driver": driver})
if err != nil {
if !errors.Is(err, gorm.ErrRecordNotFound) {
Expand Down Expand Up @@ -83,15 +79,7 @@ func (i *imlLogService) UpdateLogSource(ctx context.Context, driver string, inpu
s.Updater = utils.UserId(ctx)
s.UpdateAt = time.Now()
}
newDriver, err := factory.Create(s.Config)
if err != nil {
return err
}
err = i.store.Save(ctx, s)
if err != nil {
return err
}
log_driver.SetDriver(driver, newDriver)

return nil
}

Expand Down

0 comments on commit 8ed2c84

Please sign in to comment.