From efa81b30039abc3a7fe7881d85da7c524c5de7a1 Mon Sep 17 00:00:00 2001 From: Hendrik Schlehlein Date: Sun, 4 Dec 2022 03:02:27 +0100 Subject: [PATCH] feat: add traffic limiter --- cmd/root.go | 14 +- configs/config.yml | 69 +++-- configs/proxies/defaults.yml | 5 +- configs/proxies/java-example.yml | 51 +++- go.mod | 26 +- go.sum | 81 +----- internal/app/infrared/conn.go | 20 +- internal/app/infrared/cpn.go | 20 +- internal/app/infrared/cpn_test.go | 11 +- internal/app/infrared/event.go | 45 +++- internal/app/infrared/gateway.go | 10 +- internal/app/infrared/pool.go | 14 +- internal/app/infrared/proxy.go | 1 + internal/app/infrared/server.go | 11 +- internal/pkg/bedrock/conn.go | 28 +- internal/pkg/config/config.go | 26 +- internal/pkg/config/provider/file.go | 4 + internal/pkg/java/config.go | 70 ++--- internal/pkg/java/conn.go | 5 +- internal/pkg/java/gateway.go | 2 +- internal/pkg/java/server.go | 9 +- internal/pkg/java/status_response.go | 4 +- internal/pkg/storage/postgres/repository.go | 19 -- internal/plugin/api/api.go | 3 - internal/plugin/prometheus/prometheus.go | 12 +- internal/plugin/traffic_limiter/config.go | 67 +++++ internal/plugin/traffic_limiter/storage.go | 147 +++++++++++ .../plugin/traffic_limiter/traffic_limiter.go | 188 +++++++++++++ internal/plugin/webhook/webhook.go | 7 +- pkg/event/bus.go | 248 ++++++++++++------ pkg/event/event.go | 32 ++- test/attacks/dos/main.go | 4 +- 32 files changed, 903 insertions(+), 350 deletions(-) delete mode 100644 internal/pkg/storage/postgres/repository.go create mode 100644 internal/plugin/traffic_limiter/config.go create mode 100644 internal/plugin/traffic_limiter/storage.go create mode 100644 internal/plugin/traffic_limiter/traffic_limiter.go diff --git a/cmd/root.go b/cmd/root.go index 798b23c6..d53cfad0 100644 --- a/cmd/root.go +++ b/cmd/root.go @@ -2,6 +2,7 @@ package cmd import ( "embed" + "errors" "fmt" "os" "os/signal" @@ -9,13 +10,13 @@ import ( "sync" "syscall" - "github.com/haveachin/infrared/internal/plugin/api" - "github.com/haveachin/infrared/internal/plugin/prometheus" - "github.com/haveachin/infrared/internal/app/infrared" "github.com/haveachin/infrared/internal/pkg/bedrock" "github.com/haveachin/infrared/internal/pkg/config" "github.com/haveachin/infrared/internal/pkg/java" + "github.com/haveachin/infrared/internal/plugin/api" + "github.com/haveachin/infrared/internal/plugin/prometheus" + "github.com/haveachin/infrared/internal/plugin/traffic_limiter" "github.com/haveachin/infrared/internal/plugin/webhook" "github.com/spf13/cobra" "go.uber.org/zap" @@ -52,8 +53,10 @@ var ( zap.String("config", configPath), ) - if err := safeWriteFromEmbeddedFS("configs", "."); err != nil { - return err + if _, err := os.Stat(configPath); err != nil && errors.Is(err, os.ErrNotExist) { + if err := safeWriteFromEmbeddedFS("configs", "."); err != nil { + return err + } } cfg, err := config.New(configPath, onConfigChange, logger) @@ -94,6 +97,7 @@ var ( &webhook.Plugin{}, &prometheus.Plugin{}, &api.Plugin{}, + &traffic_limiter.Plugin{}, }, Logger: logger, } diff --git a/configs/config.yml b/configs/config.yml index a7124f7b..ecb415cd 100644 --- a/configs/config.yml +++ b/configs/config.yml @@ -26,14 +26,6 @@ providers: # #watch: true - # Storage like a database that Infrared should read from. - # - storage: - # A list of storage IDs that Infrared should read from. - # - ids: - - default - # Config files that Infrared should read from. # file: @@ -45,18 +37,43 @@ providers: # watch: true -# Storage are mainly used to store and read Infrared configs, -# but are also used to enable other features that need persistend storage. -# -storage: - # This is the ID of this database. - # - default: - driver: postgres - address: :5432 - database: infrared - username: username - password: password +#trafficLimiters: + #default: + # Server IDs to watch the traffic + # + #serverIds: + # - default + + # File is used to persistently store data + # + #file: bandwidth.yml + + # This is the amount of traffic until the server gets limited. + # Valid sizes are B, KB, MB, GB, TB, PB and EB. + # + #trafficLimit: 1TB + + # Sets the schedule for the job that resets the consumed bytes from the traffic limiter. + # For more info on the Cron sysntax see here: https://en.wikipedia.org/wiki/Cron + # + #resetCron: "@monthly" + + # The message that is displayed to a client when they try to connect + # but the server is out of bandwidth. + # + #outOfBandwidthMessage: Sorry {{username}}, but the server is out of bandwidth. + + # This is the ping response that clients see of your server when it is out of bandwidth. + # + #outOfBandwidthStatus: + #versionName: Infrared + #protocolNumber: 0 + #maxPlayerCount: 0 + #playerCount: 0 + #iconPath: icons/default.png + #motd: | + # Powered by Infrared + # §6Server at {{serverDomain}} is out of bandwidth. # Enables the API to access real time data. # @@ -94,12 +111,12 @@ storage: # Event Topics to listen for. # Available events are: - # - NewConnEvent - When a new connection to Infrared is made. - # - PreConnProcessingEvent - Before the new connection is processed. - # - PostConnProcessingEvent - After a connection is processed. - # - PreConnConnectingEvent - Before a client is connecting to the target server. - # - PlayerJoinEvent - When a player joins a server. - # - PlayerLeaveEvent - When a player leaves the server. + # - AcceptedConn - When a new connection to Infrared is made. + # - PreProcessing - Before the new connection is processed. + # - PostProcessing - After a connection is processed. + # - PrePlayerJoin - Before a client is connecting to the target server. + # - PlayerJoin - When a player joins a server. + # - PlayerLeave - When a player leaves the server. # #events: # - PlayerJoin diff --git a/configs/proxies/defaults.yml b/configs/proxies/defaults.yml index f2dc50c8..04bb3f70 100644 --- a/configs/proxies/defaults.yml +++ b/configs/proxies/defaults.yml @@ -18,7 +18,7 @@ defaults: # compression: flate - # This is the ping response that clients see of your sever. This cannot be handled at + # This is the ping response that clients see of your server. This cannot be handled at # server level due to a limitation of the bedrock protocol. See https://wiki.vg/Raknet_Protocol#Unconnected_Pong # pingStatus: @@ -63,7 +63,10 @@ defaults: java: gateway: listener: + # The message that is displayed to a client when they try to connect via an invalid domain. + # serverNotFoundMessage: Sorry {{username}}, but {{serverDomain}} was not found. + # The Java Edition has a different approach to handling status pings, than the Bedrock Edition. # This makes it possible to display a custom message to clients if the domain that they want # to connect though has a server configuration behind it. diff --git a/configs/proxies/java-example.yml b/configs/proxies/java-example.yml index 44e68e44..391fdcaa 100644 --- a/configs/proxies/java-example.yml +++ b/configs/proxies/java-example.yml @@ -1,24 +1,65 @@ +# This is an example config for a minecraft java proxy. +# Feel free to use this as a template for other proxy configs +# or adapt it to your fit your setup. +# java: gateways: + # This is just the name of this gateway. + # Since is your only gateways right now I would call it "default". + # These names ARE global. Make sure to not have duplicates. + # default: listeners: + # Same as the gateway. This is just the name of the listener. + # Since is binds to port 25565 it makes sense that it is called + # default for now. + # These names ARE global. Make sure to not have duplicates. + # default: bind: :25565 + + # Optional fields: + # Also look at the defaults.yml file for more optional fields. + # + #receiveProxyProtocol: true + #receiveRealIP: true servers: + # This is again just the name of the server. + # These names ARE global. Make sure to not have duplicates. + # default: + # We link this server to our default gateway. + # That means that players that connect through any listener + # in our "default" gateway can connect to this server. + # gateways: - default + + # This is the domain that players enter in their game client. + # You can have multiple domains here or just one. + # Currently this holds just a wildcard character as a domain + # meaning that is accepts every domain that a player uses. + # More about wildcard characters: https://en.wikipedia.org/wiki/Wildcard_character + # domains: - "*" + + # Address of the server that Infrared sends the players to. + # address: example.com:25565 + + # Optional fields: + # Also look at the defaults.yml file for more optional fields. + # #proxyBind: 0.0.0.0 #sendProxyProtocol: true #sendRealIP: true #overrideStatus: #versionName: Infrared - #maxPlayerCount: 20 + #protocolNumber: 0 + #maxPlayerCount: 0 #playerCount: 0 - # - bandwidth: - trafficLimit: 1TB - resetCron: "0 0 0 1 * *" + #iconPath: icons/default.png + #motd: | + # Powered by Infrared + # §aServer at {{serverDomain}} is online. \ No newline at end of file diff --git a/go.mod b/go.mod index cda4451d..1df96111 100644 --- a/go.mod +++ b/go.mod @@ -3,6 +3,7 @@ module github.com/haveachin/infrared go 1.19 require ( + github.com/c2h5oh/datasize v0.0.0-20220606134207-859f65c6625b github.com/cespare/xxhash/v2 v2.1.2 github.com/df-mc/atomic v1.10.0 github.com/docker/docker v20.10.20+incompatible @@ -20,11 +21,9 @@ require ( github.com/mitchellh/mapstructure v1.5.0 github.com/pires/go-proxyproto v0.6.2 github.com/prometheus/client_golang v1.13.0 + github.com/robfig/cron/v3 v3.0.1 github.com/sandertv/go-raknet v1.12.0 github.com/spf13/cobra v1.6.0 - github.com/uptrace/bun v1.1.8 - github.com/uptrace/bun/dialect/sqlitedialect v1.1.8 - github.com/uptrace/bun/driver/sqliteshim v1.1.8 go.uber.org/atomic v1.10.0 go.uber.org/multierr v1.8.0 go.uber.org/zap v1.23.0 @@ -40,13 +39,8 @@ require ( github.com/docker/go-units v0.5.0 // indirect github.com/gogo/protobuf v1.3.2 // indirect github.com/golang/protobuf v1.5.2 // indirect - github.com/google/uuid v1.3.0 // indirect github.com/inconshreveable/mousetrap v1.0.1 // indirect - github.com/jinzhu/inflection v1.0.0 // indirect - github.com/kballard/go-shellquote v0.0.0-20180428030007-95032a82bc51 // indirect github.com/kr/pretty v0.3.1 // indirect - github.com/mattn/go-isatty v0.0.16 // indirect - github.com/mattn/go-sqlite3 v1.14.15 // indirect github.com/matttproud/golang_protobuf_extensions v1.0.2 // indirect github.com/moby/term v0.0.0-20220808134915-39b0c02b01ae // indirect github.com/morikuni/aec v1.0.0 // indirect @@ -56,27 +50,13 @@ require ( github.com/prometheus/client_model v0.3.0 // indirect github.com/prometheus/common v0.37.0 // indirect github.com/prometheus/procfs v0.8.0 // indirect - github.com/remyoudompheng/bigfft v0.0.0-20200410134404-eec4a21b6bb0 // indirect github.com/sirupsen/logrus v1.9.0 // indirect github.com/spf13/pflag v1.0.5 // indirect - github.com/tmthrgd/go-hex v0.0.0-20190904060850-447a3041c3bc // indirect - github.com/uptrace/bun/dialect/pgdialect v1.1.8 - github.com/vmihailenco/msgpack/v5 v5.3.5 // indirect - github.com/vmihailenco/tagparser/v2 v2.0.0 // indirect + github.com/stretchr/testify v1.8.1 // indirect golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4 // indirect golang.org/x/net v0.0.0-20221017152216-f25eb7ecb193 // indirect golang.org/x/sys v0.1.0 // indirect golang.org/x/tools v0.1.12 // indirect google.golang.org/protobuf v1.28.1 // indirect gotest.tools/v3 v3.4.0 // indirect - lukechampine.com/uint128 v1.2.0 // indirect - modernc.org/cc/v3 v3.36.3 // indirect - modernc.org/ccgo/v3 v3.16.9 // indirect - modernc.org/libc v1.17.1 // indirect - modernc.org/mathutil v1.5.0 // indirect - modernc.org/memory v1.2.1 // indirect - modernc.org/opt v0.1.3 // indirect - modernc.org/sqlite v1.18.1 // indirect - modernc.org/strutil v1.1.3 // indirect - modernc.org/token v1.0.1 // indirect ) diff --git a/go.sum b/go.sum index b5680257..90f9172f 100644 --- a/go.sum +++ b/go.sum @@ -49,6 +49,8 @@ github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24 github.com/beorn7/perks v1.0.0/go.mod h1:KWe93zE9D1o94FZ5RNwFwVgaQK1VOXiVxmqh+CedLV8= github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM= github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw= +github.com/c2h5oh/datasize v0.0.0-20220606134207-859f65c6625b h1:6+ZFm0flnudZzdSE0JxlhR2hKnGPcNB35BjQf4RYQDY= +github.com/c2h5oh/datasize v0.0.0-20220606134207-859f65c6625b/go.mod h1:S/7n9copUssQ56c7aAgHqftWO4LTf4xY6CGWt8Bc+3M= github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU= github.com/cespare/xxhash/v2 v2.1.1/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= github.com/cespare/xxhash/v2 v2.1.2 h1:YRXhKfTDauu4ajMg1TPgFO5jnlC2HCbmLXMcTG5cbYE= @@ -74,8 +76,6 @@ github.com/docker/go-connections v0.4.0 h1:El9xVISelRB7BuFusrZozjnkIM5YnzCViNKoh github.com/docker/go-connections v0.4.0/go.mod h1:Gbd7IOopHjR8Iph03tsViu4nIes5XhDvyHbTtUxmeec= github.com/docker/go-units v0.5.0 h1:69rxXcBk27SvSaaxTtLh/8llcHD8vYHT7WSdRZ/jvr4= github.com/docker/go-units v0.5.0/go.mod h1:fgPhTUdO+D/Jk86RDLlptpiXQzgHJF7gydDDbaIK4Dk= -github.com/dustin/go-humanize v1.0.0 h1:VSnTsYCnlFHaM2/igO1h6X3HA71jcobQuxemgkq4zYo= -github.com/dustin/go-humanize v1.0.0/go.mod h1:HtrtbFcZ19U5GC7JDqmcUSB87Iq5E25KnS6fMYU6eOk= github.com/envoyproxy/go-control-plane v0.9.0/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4= github.com/envoyproxy/go-control-plane v0.9.1-0.20191026205805-5f8ba28d4473/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4= github.com/envoyproxy/go-control-plane v0.9.4/go.mod h1:6rpuAdCZL397s3pYoYcLgu1mIlRU8Am5FuJP05cCM98= @@ -148,7 +148,6 @@ github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/ github.com/google/go-cmp v0.4.1/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.5.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.5.1/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= -github.com/google/go-cmp v0.5.3/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.5.4/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.5.8 h1:e6P7q2lk1O+qJJb4BtCQXlK8vWEO8V1ZeuEdJNOqZyg= @@ -163,8 +162,6 @@ github.com/google/pprof v0.0.0-20200229191704-1ebb73c60ed3/go.mod h1:ZgVRPoUq/hf github.com/google/pprof v0.0.0-20200430221834-fc25d7d30c6d/go.mod h1:ZgVRPoUq/hfqzAqh7sHMqb3I9Rq5C59dIz2SbBwJ4eM= github.com/google/pprof v0.0.0-20200708004538-1a94d8640e99/go.mod h1:ZgVRPoUq/hfqzAqh7sHMqb3I9Rq5C59dIz2SbBwJ4eM= github.com/google/renameio v0.1.0/go.mod h1:KWCgfxg9yswjAJkECMjeO8J8rahYeXnNhOm40UhjYkI= -github.com/google/uuid v1.3.0 h1:t6JiXgmwXMjEs8VusXIJk2BXHsn+wx8BZdTaoZ5fu7I= -github.com/google/uuid v1.3.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/googleapis/gax-go/v2 v2.0.4/go.mod h1:0Wqv26UfaUD9n4G6kQubkQ+KchISgw+vpHVxEJEs9eg= github.com/googleapis/gax-go/v2 v2.0.5/go.mod h1:DWXyrwAJ9X0FpwwEdw+IPEYBICEFu5mhpdKc/us6bOk= github.com/gorilla/schema v1.2.0 h1:YufUaxZYCKGFuAq3c96BOhjgd5nmXiOY9NGzF247Tsc= @@ -176,8 +173,6 @@ github.com/imdario/mergo v0.3.13 h1:lFzP57bqS/wsqKssCGmtLAb8A0wKjLGrve2q3PPVcBk= github.com/imdario/mergo v0.3.13/go.mod h1:4lJ1jqUDcsbIECGy0RUJAXNIhg+6ocWgb1ALK2O4oXg= github.com/inconshreveable/mousetrap v1.0.1 h1:U3uMjPSQEBMNp1lFxmllqCPM6P5u/Xq7Pgzkat/bFNc= github.com/inconshreveable/mousetrap v1.0.1/go.mod h1:vpF70FUmC8bwa3OWnCshd2FqLfsEA9PFc4w1p2J65bw= -github.com/jinzhu/inflection v1.0.0 h1:K317FqzuhWc8YvSVlFMCCUb36O/S9MCKRDI7QkRKD/E= -github.com/jinzhu/inflection v1.0.0/go.mod h1:h+uFLlag+Qp1Va5pdKtLDYj+kHp5pxUVkryuEj+Srlc= github.com/jpillora/backoff v1.0.0/go.mod h1:J/6gKK9jxlEcS3zixgDgUAsiuZ7yrSoa/FX5e0EB2j4= github.com/json-iterator/go v1.1.6/go.mod h1:+SdeFBvtyEkXs7REEP0seUULqWtbJapLOCVDaaPEHmU= github.com/json-iterator/go v1.1.10/go.mod h1:KdQUCv79m/52Kvf8AW2vK1V8akMuk1QjK/uOdHXbAo4= @@ -187,8 +182,6 @@ github.com/jstemmer/go-junit-report v0.0.0-20190106144839-af01ea7f8024/go.mod h1 github.com/jstemmer/go-junit-report v0.9.1/go.mod h1:Brl9GWCQeLvo8nXZwPNNblvFj/XSXhF0NWZEnDohbsk= github.com/julienschmidt/httprouter v1.2.0/go.mod h1:SYymIcj16QtmaHHD7aYtjjsJG7VTCxuUUipMqKk8s4w= github.com/julienschmidt/httprouter v1.3.0/go.mod h1:JR6WtHb+2LUe8TCKY3cZOxFyyO8IZAc4RVcycCCAKdM= -github.com/kballard/go-shellquote v0.0.0-20180428030007-95032a82bc51 h1:Z9n2FFNUXsshfwJMBgNA0RU6/i7WVaAegv3PtuIHPMs= -github.com/kballard/go-shellquote v0.0.0-20180428030007-95032a82bc51/go.mod h1:CzGEWj7cYgsdH8dAjBGEr58BoE7ScuLd+fwFZ44+/x8= github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8= github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= github.com/klauspost/compress v1.15.11 h1:Lcadnb3RKGin4FYM/orgq0qde+nc15E5Cbqg4B9Sx9c= @@ -203,11 +196,6 @@ github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= -github.com/mattn/go-isatty v0.0.12/go.mod h1:cbi8OIDigv2wuxKPP5vlRcQ1OAZbq2CE4Kysco4FUpU= -github.com/mattn/go-isatty v0.0.16 h1:bq3VjFmv/sOjHtdEhmkEV4x1AJtvUvOJ2PFAZ5+peKQ= -github.com/mattn/go-isatty v0.0.16/go.mod h1:kYGgaQfpe5nmfYZH+SKPsOc2e4SrIfOl2e/yFXSvRLM= -github.com/mattn/go-sqlite3 v1.14.15 h1:vfoHhTN1af61xCRSWzFIWzx2YskyMTwHLrExkBOjvxI= -github.com/mattn/go-sqlite3 v1.14.15/go.mod h1:2eHXhiwb8IkHr+BDWZGa96P6+rkvnG63S2DGjv9HUNg= github.com/matttproud/golang_protobuf_extensions v1.0.1/go.mod h1:D8He9yQNgCq6Z5Ld7szi9bcBfOoFv/3dc6xSMkL2PC0= github.com/matttproud/golang_protobuf_extensions v1.0.2 h1:hAHbPm5IJGijwng3PWk09JkG9WeqChjprR5s9bBZ+OM= github.com/matttproud/golang_protobuf_extensions v1.0.2/go.mod h1:BSXmuO+STAnVfrANrmjBb36TMTDstsz7MSK+HVaYKv4= @@ -263,8 +251,8 @@ github.com/prometheus/procfs v0.6.0/go.mod h1:cz+aTbrPOrUb4q7XlbU9ygM+/jj0fzG6c1 github.com/prometheus/procfs v0.7.3/go.mod h1:cz+aTbrPOrUb4q7XlbU9ygM+/jj0fzG6c1xBZuNvfVA= github.com/prometheus/procfs v0.8.0 h1:ODq8ZFEaYeCaZOJlZZdJA2AbQR98dSHSM1KW/You5mo= github.com/prometheus/procfs v0.8.0/go.mod h1:z7EfXMXOkbkqb9IINtpCn86r/to3BnA0uaxHdg830/4= -github.com/remyoudompheng/bigfft v0.0.0-20200410134404-eec4a21b6bb0 h1:OdAsTTz6OkFY5QxjkYwrChwuRruF69c169dPK26NUlk= -github.com/remyoudompheng/bigfft v0.0.0-20200410134404-eec4a21b6bb0/go.mod h1:qqbHyh8v60DhA7CoWK5oRCqLrMHRGoxYCSS9EjAz6Eo= +github.com/robfig/cron/v3 v3.0.1 h1:WdRxkvbJztn8LMz/QEvLN5sBU+xKpSqwwUO1Pjr4qDs= +github.com/robfig/cron/v3 v3.0.1/go.mod h1:eQICP3HwyT7UooqI/z+Ov+PtYAWygg1TEWWzGIFLtro= github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4= github.com/rogpeppe/go-internal v1.9.0 h1:73kH8U+JUqXU8lRuOHeVHaa/SZPifC7BkcraZVejAe8= github.com/rogpeppe/go-internal v1.9.0/go.mod h1:WtVeX8xhTBvf0smdhujwtBcq4Qrzq/fJaraNFVN+nFs= @@ -283,26 +271,16 @@ github.com/spf13/pflag v1.0.5 h1:iy+VFUOCP1a+8yFto/drg2CJ5u0yRoB7fZw3DKv/JXA= github.com/spf13/pflag v1.0.5/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An2Bg= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw= +github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo= github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs= github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4= -github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= -github.com/stretchr/testify v1.8.0 h1:pSgiaMZlXftHpm5L7V1+rVB+AZJydKsMxsQBIJw4PKk= -github.com/tmthrgd/go-hex v0.0.0-20190904060850-447a3041c3bc h1:9lRDQMhESg+zvGYmW5DyG0UqvY96Bu5QYsTLvCHdrgo= -github.com/tmthrgd/go-hex v0.0.0-20190904060850-447a3041c3bc/go.mod h1:bciPuU6GHm1iF1pBvUfxfsH0Wmnc2VbpgvbI9ZWuIRs= -github.com/uptrace/bun v1.1.8 h1:slxuaP4LYWFbPRUmTtQhfJN+6eX/6ar2HDKYTcI50SA= -github.com/uptrace/bun v1.1.8/go.mod h1:iT89ESdV3uMupD9ixt6Khidht+BK0STabK/LeZE+B84= -github.com/uptrace/bun/dialect/pgdialect v1.1.8 h1:wayJhjYDPGv8tgOBLolbBtSFQ0TihFoo8E1T129UdA8= -github.com/uptrace/bun/dialect/pgdialect v1.1.8/go.mod h1:nNbU8PHTjTUM+CRtGmqyBb9zcuRAB8I680/qoFSmBUk= -github.com/uptrace/bun/dialect/sqlitedialect v1.1.8 h1:IJ6qBLjeON21tpgmZF/V/k/oHdzAql5UrnaqMCksTlY= -github.com/uptrace/bun/dialect/sqlitedialect v1.1.8/go.mod h1:IZF76cHEf8eeGA29OpkYyPYDs4l/iSMTYRyuFRqeXdY= -github.com/uptrace/bun/driver/sqliteshim v1.1.8 h1:XWeEluAdhN7xKS2WEAbsB9pv1N24PSRMx1rbV6Oe95U= -github.com/uptrace/bun/driver/sqliteshim v1.1.8/go.mod h1:ltqNzXU7ClF4zFnC+zCnlby0xXvPBFoa45uYFq982QI= -github.com/vmihailenco/msgpack/v5 v5.3.5 h1:5gO0H1iULLWGhs2H5tbAHIZTV8/cYafcFOr9znI5mJU= -github.com/vmihailenco/msgpack/v5 v5.3.5/go.mod h1:7xyJ9e+0+9SaZT0Wt1RGleJXzli6Q/V5KbhBonMG9jc= -github.com/vmihailenco/tagparser/v2 v2.0.0 h1:y09buUbR+b5aycVFQs/g70pqKVZNBmxwAhO7/IwNM9g= -github.com/vmihailenco/tagparser/v2 v2.0.0/go.mod h1:Wri+At7QHww0WTrCBeu4J6bNtoV6mEfg5OIWRZA9qds= +github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= +github.com/stretchr/testify v1.8.1 h1:w7B6lhMri9wdJUVmEZPGGhZzrYTPvgJArz7wNPgYKsk= +github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4= github.com/yuin/goldmark v1.1.25/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.1.32/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= @@ -430,7 +408,6 @@ golang.org/x/sys v0.0.0-20191204072324-ce4227a45e2e/go.mod h1:h1NjWce9XRLGQEsW7w golang.org/x/sys v0.0.0-20191228213918-04cbcbbfeed8/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200106162015-b016eb3dc98e/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200113162924-86b910548bc1/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= -golang.org/x/sys v0.0.0-20200116001909-b77594299b42/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200122134326-e047566fdf82/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200202164722-d101bd2416d5/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200212091648-12a6c2dcc1e4/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= @@ -455,11 +432,9 @@ golang.org/x/sys v0.0.0-20210510120138-977fb7262007/go.mod h1:oPkhp1MJrh7nUepCBc golang.org/x/sys v0.0.0-20210603081109-ebe580a85c40/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20210616094352-59db8d763f22/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.0.0-20211007075335-d3039528d8ac/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20211216021012-1d35b9e2eb4e/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220114195835-da31bd327af9/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220908164124-27713097b956/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.1.0 h1:kunALQeHf1/185U1i0GOB/fy1IPRDDpuoOOqRReG57U= golang.org/x/sys v0.1.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= @@ -518,7 +493,6 @@ golang.org/x/tools v0.0.0-20200619180055-7c47624df98f/go.mod h1:EkVYQZoAsY45+roY golang.org/x/tools v0.0.0-20200729194436-6467de6f59a7/go.mod h1:njjCfa9FT2d7l9Bc6FUM5FLjQPp3cFF28FI3qnDFljA= golang.org/x/tools v0.0.0-20200804011535-6c149bb5ef0d/go.mod h1:njjCfa9FT2d7l9Bc6FUM5FLjQPp3cFF28FI3qnDFljA= golang.org/x/tools v0.0.0-20200825202427-b303f430e36d/go.mod h1:njjCfa9FT2d7l9Bc6FUM5FLjQPp3cFF28FI3qnDFljA= -golang.org/x/tools v0.0.0-20201124115921-2c860bdd6e78/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA= golang.org/x/tools v0.0.0-20210106214847-113979e3529a/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA= golang.org/x/tools v0.1.0/go.mod h1:xkSsbof2nBLbhDlRMhhhyNLN/zl3eTqcnHD5viDpcZ0= golang.org/x/tools v0.1.1/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk= @@ -632,41 +606,6 @@ honnef.co/go/tools v0.0.0-20190523083050-ea95bdfd59fc/go.mod h1:rf3lG4BRIbNafJWh honnef.co/go/tools v0.0.1-2019.2.3/go.mod h1:a3bituU0lyd329TUQxRnasdCoJDkEUEAqEt0JzvZhAg= honnef.co/go/tools v0.0.1-2020.1.3/go.mod h1:X/FiERA/W4tHapMX5mGpAtMSVEeEUOyHaw9vFzvIQ3k= honnef.co/go/tools v0.0.1-2020.1.4/go.mod h1:X/FiERA/W4tHapMX5mGpAtMSVEeEUOyHaw9vFzvIQ3k= -lukechampine.com/uint128 v1.1.1/go.mod h1:c4eWIwlEGaxC/+H1VguhU4PHXNWDCDMUlWdIWl2j1gk= -lukechampine.com/uint128 v1.2.0 h1:mBi/5l91vocEN8otkC5bDLhi2KdCticRiwbdB0O+rjI= -lukechampine.com/uint128 v1.2.0/go.mod h1:c4eWIwlEGaxC/+H1VguhU4PHXNWDCDMUlWdIWl2j1gk= -modernc.org/cc/v3 v3.36.2/go.mod h1:NFUHyPn4ekoC/JHeZFfZurN6ixxawE1BnVonP/oahEI= -modernc.org/cc/v3 v3.36.3 h1:uISP3F66UlixxWEcKuIWERa4TwrZENHSL8tWxZz8bHg= -modernc.org/cc/v3 v3.36.3/go.mod h1:NFUHyPn4ekoC/JHeZFfZurN6ixxawE1BnVonP/oahEI= -modernc.org/ccgo/v3 v3.16.9 h1:AXquSwg7GuMk11pIdw7fmO1Y/ybgazVkMhsZWCV0mHM= -modernc.org/ccgo/v3 v3.16.9/go.mod h1:zNMzC9A9xeNUepy6KuZBbugn3c0Mc9TeiJO4lgvkJDo= -modernc.org/ccorpus v1.11.6 h1:J16RXiiqiCgua6+ZvQot4yUuUy8zxgqbqEEUuGPlISk= -modernc.org/ccorpus v1.11.6/go.mod h1:2gEUTrWqdpH2pXsmTM1ZkjeSrUWDpjMu2T6m29L/ErQ= -modernc.org/httpfs v1.0.6 h1:AAgIpFZRXuYnkjftxTAZwMIiwEqAfk8aVB2/oA6nAeM= -modernc.org/httpfs v1.0.6/go.mod h1:7dosgurJGp0sPaRanU53W4xZYKh14wfzX420oZADeHM= -modernc.org/libc v1.17.0/go.mod h1:XsgLldpP4aWlPlsjqKRdHPqCxCjISdHfM/yeWC5GyW0= -modernc.org/libc v1.17.1 h1:Q8/Cpi36V/QBfuQaFVeisEBs3WqoGAJprZzmf7TfEYI= -modernc.org/libc v1.17.1/go.mod h1:FZ23b+8LjxZs7XtFMbSzL/EhPxNbfZbErxEHc7cbD9s= -modernc.org/mathutil v1.2.2/go.mod h1:mZW8CKdRPY1v87qxC/wUdX5O1qDzXMP5TH3wjfpga6E= -modernc.org/mathutil v1.4.1/go.mod h1:mZW8CKdRPY1v87qxC/wUdX5O1qDzXMP5TH3wjfpga6E= -modernc.org/mathutil v1.5.0 h1:rV0Ko/6SfM+8G+yKiyI830l3Wuz1zRutdslNoQ0kfiQ= -modernc.org/mathutil v1.5.0/go.mod h1:mZW8CKdRPY1v87qxC/wUdX5O1qDzXMP5TH3wjfpga6E= -modernc.org/memory v1.2.0/go.mod h1:/0wo5ibyrQiaoUoH7f9D8dnglAmILJ5/cxZlRECf+Nw= -modernc.org/memory v1.2.1 h1:dkRh86wgmq/bJu2cAS2oqBCz/KsMZU7TUM4CibQ7eBs= -modernc.org/memory v1.2.1/go.mod h1:PkUhL0Mugw21sHPeskwZW4D6VscE/GQJOnIpCnW6pSU= -modernc.org/opt v0.1.1/go.mod h1:WdSiB5evDcignE70guQKxYUl14mgWtbClRi5wmkkTX0= -modernc.org/opt v0.1.3 h1:3XOZf2yznlhC+ibLltsDGzABUGVx8J6pnFMS3E4dcq4= -modernc.org/opt v0.1.3/go.mod h1:WdSiB5evDcignE70guQKxYUl14mgWtbClRi5wmkkTX0= -modernc.org/sqlite v1.18.1 h1:ko32eKt3jf7eqIkCgPAeHMBXw3riNSLhl2f3loEF7o8= -modernc.org/sqlite v1.18.1/go.mod h1:6ho+Gow7oX5V+OiOQ6Tr4xeqbx13UZ6t+Fw9IRUG4d4= -modernc.org/strutil v1.1.1/go.mod h1:DE+MQQ/hjKBZS2zNInV5hhcipt5rLPWkmpbGeW5mmdw= -modernc.org/strutil v1.1.3 h1:fNMm+oJklMGYfU9Ylcywl0CO5O6nTfaowNsh2wpPjzY= -modernc.org/strutil v1.1.3/go.mod h1:MEHNA7PdEnEwLvspRMtWTNnp2nnyvMfkimT1NKNAGbw= -modernc.org/tcl v1.13.1 h1:npxzTwFTZYM8ghWicVIX1cRWzj7Nd8i6AqqX2p+IYao= -modernc.org/token v1.0.0/go.mod h1:UGzOrNV1mAFSEB63lOFHIpNRUVMvYTc6yu1SMY/XTDM= -modernc.org/token v1.0.1 h1:A3qvTqOwexpfZZeyI0FeGPDlSWX5pjZu9hF4lU+EKWg= -modernc.org/token v1.0.1/go.mod h1:UGzOrNV1mAFSEB63lOFHIpNRUVMvYTc6yu1SMY/XTDM= -modernc.org/z v1.5.1 h1:RTNHdsrOpeoSeOF4FbzTo8gBYByaJ5xT7NgZ9ZqRiJM= rsc.io/binaryregexp v0.2.0/go.mod h1:qTv7/COck+e2FymRvadv62gMdZztPaShugOCi3I+8D8= rsc.io/quote/v3 v3.1.0/go.mod h1:yEA65RcK8LyAZtP9Kv3t0HmxON59tX3rD+tICJqUlj0= rsc.io/sampler v1.3.0/go.mod h1:T1hPZKmBbMNahiBKFy5HrXp6adAjACjK9JXDnKaTXpA= diff --git a/internal/app/infrared/conn.go b/internal/app/infrared/conn.go index fbfa3e60..eb83a6c6 100644 --- a/internal/app/infrared/conn.go +++ b/internal/app/infrared/conn.go @@ -5,6 +5,8 @@ import ( "net" "strings" "time" + + "github.com/haveachin/infrared/pkg/event" ) type Edition string @@ -36,7 +38,7 @@ type Conn interface { GatewayID() string // Edition returns the Minecraft edition of this connection. Edition() Edition - Pipe(c net.Conn) error + Pipe(c net.Conn) (n int64, err error) } // ProcessedConn is a already processed connection that waits to be handles by a server @@ -83,19 +85,25 @@ type ConnTunnel struct { // MatchedDomain is the domain that the client matched when resolving // the server that it requested. MatchedDomain string + EventBus event.Bus } // Start starts to proxy the Conn to the Server. This call is blocking. -func (ct ConnTunnel) Start() error { +func (ct ConnTunnel) Start() (int64, error) { rc, err := ct.Server.HandleConn(ct.Conn) if err != nil { - return err + return 0, err } defer rc.Close() - go ct.Conn.Pipe(rc) - rc.Pipe(ct.Conn) - return nil + var consumedBytes int64 + go func() { + n, _ := ct.Conn.Pipe(rc) + consumedBytes += n + }() + n, _ := rc.Pipe(ct.Conn) + consumedBytes += n + return consumedBytes, nil } // Close closes both connection (client to server and server to client). diff --git a/internal/app/infrared/cpn.go b/internal/app/infrared/cpn.go index a61c7a72..b16a9399 100644 --- a/internal/app/infrared/cpn.go +++ b/internal/app/infrared/cpn.go @@ -61,9 +61,15 @@ func (cpn CPN) ListenAndServe(quit <-chan bool) { connLogger := cpn.Logger.With(logConn(c)...) connLogger.Debug("starting to process connection") - cpn.EventBus.Push(PreConnProcessingEvent{ + + replyChan := cpn.EventBus.Request(PreConnProcessingEvent{ Conn: c, - }, PreConnProcessingEventTopic) + }, PreProcessingEventTopic) + + if isEventCanceled(replyChan, connLogger) { + c.Close() + continue + } c.SetDeadline(time.Now().Add(cpn.ClientTimeout())) chain(cpn.Middlewares, HandlerFunc(func(c Conn) { @@ -81,9 +87,15 @@ func (cpn CPN) ListenAndServe(quit <-chan bool) { c.SetDeadline(time.Time{}) connLogger.Debug("sending client to server gateway") - cpn.EventBus.Push(PostConnProcessingEvent{ + + replyChan := cpn.EventBus.Request(PostConnProcessingEvent{ ProcessedConn: procConn, - }, PostConnProcessingEventTopic) + }, PostProcessingEventTopic) + + if isEventCanceled(replyChan, connLogger) { + procConn.Close() + return + } cpn.Out <- procConn })).ProcessConn(c) diff --git a/internal/app/infrared/cpn_test.go b/internal/app/infrared/cpn_test.go index c4216343..61e8e5a6 100644 --- a/internal/app/infrared/cpn_test.go +++ b/internal/app/infrared/cpn_test.go @@ -8,6 +8,7 @@ import ( "github.com/golang/mock/gomock" "github.com/haveachin/infrared/internal/app/infrared" + "github.com/haveachin/infrared/pkg/event" "go.uber.org/zap" ) @@ -40,9 +41,11 @@ func TestCPN_ListenAndServe(t *testing.T) { cp := NewMockConnProcessor(ctrl) cp.EXPECT().ClientTimeout().Times(1).Return(time.Duration(0)) cp.EXPECT().ProcessConn(tc.in).Times(1).Return(tc.out, tc.procErr) + replyChan := make(chan event.Reply) + close(replyChan) bus := NewMockBus(ctrl) - bus.EXPECT().Push(gomock.Any(), infrared.PreConnProcessingEventTopic). - Times(1).Return() + bus.EXPECT().Request(gomock.Any(), infrared.PreProcessingEventTopic). + Times(1).Return(replyChan) if tc.err == nil { tc.in.EXPECT().SetDeadline(gomock.Any()).Times(1).Return(nil) @@ -52,8 +55,8 @@ func TestCPN_ListenAndServe(t *testing.T) { tc.in.EXPECT().Close().Times(1).Return(nil) } else { tc.in.EXPECT().SetDeadline(time.Time{}).Times(1).Return(nil) - bus.EXPECT().Push(gomock.Any(), infrared.PostConnProcessingEventTopic). - Times(1).Return() + bus.EXPECT().Request(gomock.Any(), infrared.PostProcessingEventTopic). + Times(1).Return(replyChan) } in := make(chan infrared.Conn) diff --git a/internal/app/infrared/event.go b/internal/app/infrared/event.go index 3703e26e..005aee0e 100644 --- a/internal/app/infrared/event.go +++ b/internal/app/infrared/event.go @@ -1,19 +1,39 @@ package infrared +import ( + "github.com/haveachin/infrared/pkg/event" + "go.uber.org/zap" +) + const ( - NewConnEventTopic = "NewConn" - PreConnProcessingEventTopic = "PreConnProcessing" - PostConnProcessingEventTopic = "PostConnProcessing" - PreConnConnectingEventTopic = "PreConnConnecting" - PostServerConnConnectingEventTopic = "PostServerConnConnecting" + AcceptedConnEventTopic = "AcceptedConn" + PreProcessingEventTopic = "PreProcessing" + PostProcessingEventTopic = "PostProcessing" + PrePlayerJoinEventTopic = "PrePlayerJoin" + PlayerJoinEventTopic = "PlayerJoin" + PlayerLeaveEventTopicAsync = "PlayerLeave" +) - PlayerJoinEventTopic = "PlayerJoin" - PlayerLeaveEventTopic = "PlayerLeave" +// isEventCanceled evaluates all incoming replys and returns if the event +// is canceled and the Reply that canceled it. +func isEventCanceled(replyChan <-chan event.Reply, logger *zap.Logger) bool { + for reply := range replyChan { + if reply.Err == nil { + continue + } - ServerRegisterEventTopic = "ServerRegister" -) + logger.Info("event canceled", + zap.String("eventId", reply.EventID), + zap.String("handlerId", reply.HandlerID), + zap.String("reason", reply.Err.Error()), + ) -type NewConnEvent struct { + return true + } + return false +} + +type AcceptedConnEvent struct { Conn Conn } @@ -40,8 +60,5 @@ type PlayerLeaveEvent struct { ProcessedConn ProcessedConn Server Server MatchedDomain string -} - -type ServerRegisterEvent struct { - Server Server + ConsumedBytes int64 } diff --git a/internal/app/infrared/gateway.go b/internal/app/infrared/gateway.go index dd4c797d..b722652b 100644 --- a/internal/app/infrared/gateway.go +++ b/internal/app/infrared/gateway.go @@ -49,9 +49,15 @@ func ListenAndServe(gw Gateway, cpnChan chan<- Conn) { conn := gw.WrapConn(c, l).(Conn) logger.Info("accepting new connection", logConn(c)...) - event.Push(NewConnEvent{ + + replyChan := event.Request(AcceptedConnEvent{ Conn: conn, - }, NewConnEventTopic) + }, AcceptedConnEventTopic) + + if isEventCanceled(replyChan, logger) { + conn.Close() + continue + } cpnChan <- conn } diff --git a/internal/app/infrared/pool.go b/internal/app/infrared/pool.go index 745aa051..31c8b5e8 100644 --- a/internal/app/infrared/pool.go +++ b/internal/app/infrared/pool.go @@ -54,7 +54,7 @@ func (cp *ConnPool) handlePlayerStatus(ct ConnTunnel) { logger := cp.Logger.With(logProcessedConn(ct.Conn)...) logger.Info("connecting client to server") - if err := ct.Start(); err != nil { + if _, err := ct.Start(); err != nil { logger.Info("closing connection", zap.Error(err)) return } @@ -67,14 +67,19 @@ func (cp *ConnPool) handlePlayerLogin(ct ConnTunnel) { defer cp.removeFromPool(ct) logger := cp.Logger.With(logProcessedConn(ct.Conn)...) - event.Push(PlayerJoinEvent{ + replyChan := event.Request(PlayerJoinEvent{ ProcessedConn: ct.Conn, Server: ct.Server, MatchedDomain: ct.MatchedDomain, }, PlayerJoinEventTopic) + if isEventCanceled(replyChan, logger) { + return + } + logger.Info("connecting client to server") - if err := ct.Start(); err != nil { + consumedBytes, err := ct.Start() + if err != nil { logger.Info("closing connection", zap.Error(err)) return } @@ -84,7 +89,8 @@ func (cp *ConnPool) handlePlayerLogin(ct ConnTunnel) { ProcessedConn: ct.Conn, Server: ct.Server, MatchedDomain: ct.MatchedDomain, - }, PlayerLeaveEventTopic) + ConsumedBytes: consumedBytes, + }, PlayerLeaveEventTopicAsync) } func (cp *ConnPool) Reload(cfg ConnPoolConfig) { diff --git a/internal/app/infrared/proxy.go b/internal/app/infrared/proxy.go index ce11e890..ec55e3ef 100644 --- a/internal/app/infrared/proxy.go +++ b/internal/app/infrared/proxy.go @@ -136,6 +136,7 @@ func (p *Proxy) ListenAndServe(logger *zap.Logger) { go p.connPool.Start() p.serverGateway.Logger = logger + p.serverGateway.EventBus = event.DefaultBus p.serverGateway.Start() } diff --git a/internal/app/infrared/server.go b/internal/app/infrared/server.go index 2791d29e..f70e783c 100644 --- a/internal/app/infrared/server.go +++ b/internal/app/infrared/server.go @@ -38,6 +38,7 @@ type ServerGatewayConfig struct { In <-chan ProcessedConn Out chan<- ConnTunnel Logger *zap.Logger + EventBus event.Bus } type ServerGateway struct { @@ -131,10 +132,16 @@ func (sg *ServerGateway) Start() { pcLogger = pcLogger.With(logServer(srv)...) pcLogger.Debug("found server") - event.Push(PreConnConnectingEvent{ + + replyChan := event.Request(PreConnConnectingEvent{ ProcessedConn: pc, Server: srv, - }, PreConnConnectingEventTopic) + }, PrePlayerJoinEventTopic) + + if isEventCanceled(replyChan, pcLogger) { + pc.Close() + continue + } sg.Out <- ConnTunnel{ Conn: pc, diff --git a/internal/pkg/bedrock/conn.go b/internal/pkg/bedrock/conn.go index 85bca2e1..1e624867 100644 --- a/internal/pkg/bedrock/conn.go +++ b/internal/pkg/bedrock/conn.go @@ -47,24 +47,6 @@ func (c *Conn) ReadPackets() ([]packet.Data, error) { return pksData, nil } -func (c *Conn) ReadPacket() ([]packet.Data, error) { - pks, err := c.decoder.Decode() - if err != nil { - return nil, err - } - - var pksData []packet.Data - for _, pk := range pks { - pkData, err := packet.ParseData(pk) - if err != nil { - return nil, err - } - - pksData = append(pksData, pkData) - } - return pksData, nil -} - func (c *Conn) WritePacket(pk packet.Packet) error { buf := internal.BufferPool.Get().(*bytes.Buffer) defer func() { @@ -80,17 +62,19 @@ func (c *Conn) WritePacket(pk packet.Packet) error { return c.encoder.Encode(buf.Bytes()) } -func (c *Conn) Pipe(rc net.Conn) error { +func (c *Conn) Pipe(rc net.Conn) (int64, error) { + var nn int64 for { pk, err := c.Conn.ReadPacket() if err != nil { - return err + return nn, err } - _, err = rc.Write(pk) + n, err := rc.Write(pk) if err != nil { - return err + return nn, err } + nn += int64(n) } } diff --git a/internal/pkg/config/config.go b/internal/pkg/config/config.go index 2d052c15..18efa820 100644 --- a/internal/pkg/config/config.go +++ b/internal/pkg/config/config.go @@ -1,8 +1,10 @@ package config import ( + "reflect" "sync" + "github.com/c2h5oh/datasize" "github.com/haveachin/infrared/internal/pkg/config/provider" "github.com/mitchellh/mapstructure" "go.uber.org/zap" @@ -121,8 +123,11 @@ func (c *config) Read() (map[string]any, error) { func Unmarshal(cfg any, v any) error { decoder, err := mapstructure.NewDecoder(&mapstructure.DecoderConfig{ - Result: v, - DecodeHook: mapstructure.StringToTimeDurationHookFunc(), + Result: v, + DecodeHook: mapstructure.ComposeDecodeHookFunc( + mapstructure.StringToTimeDurationHookFunc(), + stringToDataSizeHookFunc(), + ), }) if err != nil { return err @@ -130,3 +135,20 @@ func Unmarshal(cfg any, v any) error { return decoder.Decode(cfg) } + +func stringToDataSizeHookFunc() mapstructure.DecodeHookFunc { + return func( + f reflect.Type, + t reflect.Type, + data interface{}) (interface{}, error) { + if f.Kind() != reflect.String { + return data, nil + } + if t != reflect.TypeOf(datasize.ByteSize(5)) { + return data, nil + } + + // Convert it by parsing + return datasize.ParseString(data.(string)) + } +} diff --git a/internal/pkg/config/provider/file.go b/internal/pkg/config/provider/file.go index b5508d9d..415a0578 100644 --- a/internal/pkg/config/provider/file.go +++ b/internal/pkg/config/provider/file.go @@ -121,6 +121,10 @@ func (p file) Close() error { func (p file) readConfigData() (Data, error) { cfg := map[string]any{} readConfig := func(path string, info fs.FileInfo, err error) error { + if err != nil { + return err + } + if info.IsDir() { return nil } diff --git a/internal/pkg/java/config.go b/internal/pkg/java/config.go index 3772b640..7d967cdc 100644 --- a/internal/pkg/java/config.go +++ b/internal/pkg/java/config.go @@ -17,17 +17,17 @@ import ( ) type ServerConfig struct { - Domains []string `mapstructure:"domains"` - Address string `mapstructure:"address"` - ProxyBind string `mapstructure:"proxyBind"` - SendProxyProtocol bool `mapstructure:"sendProxyProtocol"` - SendRealIP bool `mapstructure:"sendRealIP"` - OverrideAddress bool `mapstructure:"overrideAddress"` - DialTimeout time.Duration `mapstructure:"dialTimeout"` - DialTimeoutMessage string `mapstructure:"dialTimeoutMessage"` - OverrideStatus OverrideServerStatusConfig `mapstructure:"overrideStatus"` - DialTimeoutStatus DialTimeoutServerStatusConfig `mapstructure:"dialTimeoutStatus"` - Gateways []string `mapstructure:"gateways"` + Domains []string `mapstructure:"domains"` + Address string `mapstructure:"address"` + ProxyBind string `mapstructure:"proxyBind"` + SendProxyProtocol bool `mapstructure:"sendProxyProtocol"` + SendRealIP bool `mapstructure:"sendRealIP"` + OverrideAddress bool `mapstructure:"overrideAddress"` + DialTimeout time.Duration `mapstructure:"dialTimeout"` + DialTimeoutMessage string `mapstructure:"dialTimeoutMessage"` + OverrideStatus OverrideServerStatusConfig `mapstructure:"overrideStatus"` + DialTimeoutStatus ServerStatusConfig `mapstructure:"dialTimeoutStatus"` + Gateways []string `mapstructure:"gateways"` } type OverrideServerStatusConfig struct { @@ -40,7 +40,7 @@ type OverrideServerStatusConfig struct { MOTD *string `mapstructure:"motd"` } -type DialTimeoutServerStatusConfig struct { +type ServerStatusConfig struct { VersionName string `mapstructure:"versionName"` ProtocolNumber int `mapstructure:"protocolNumber"` MaxPlayerCount int `mapstructure:"maxPlayerCount"` @@ -51,21 +51,21 @@ type DialTimeoutServerStatusConfig struct { } type ServerStatusPlayerSampleConfig struct { - Name string `mapstructure:"name,omitempty"` - UUID string `mapstructure:"uuid,omitempty"` + Name string `mapstructure:"name"` + UUID string `mapstructure:"uuid"` } type ListenerConfig struct { - Bind string `mapstructure:"bind"` - ReceiveProxyProtocol bool `mapstructure:"receiveProxyProtocol"` - ReceiveRealIP bool `mapstructure:"receiveRealIP,omitempty"` - ServerNotFoundMessage string `mapstructure:"serverNotFoundMessage,omitempty"` - ServerNotFoundStatus DialTimeoutServerStatusConfig `mapstructure:"serverNotFoundStatus,omitempty"` + Bind string `mapstructure:"bind"` + ReceiveProxyProtocol bool `mapstructure:"receiveProxyProtocol"` + ReceiveRealIP bool `mapstructure:"receiveRealIP"` + ServerNotFoundMessage string `mapstructure:"serverNotFoundMessage"` + ServerNotFoundStatus ServerStatusConfig `mapstructure:"serverNotFoundStatus"` } type GatewayConfig struct { Listeners map[string]ListenerConfig `mapstructure:"listeners"` - ServerNotFoundMessage string `mapstructure:"serverNotFoundMessage,omitempty"` + ServerNotFoundMessage string `mapstructure:"serverNotFoundMessage"` } type ConnProcessorConfig struct { @@ -98,17 +98,17 @@ type ProxyConfig struct { type ProxyConfigDefaults struct { Gateway struct { - Listener ListenerConfig `mapstructure:"listener,omitempty"` - ServerNotFoundMessage string `mapstructure:"serverNotFoundMessage,omitempty"` - } `mapstructure:"gateway,omitempty"` - Server ServerConfig `mapstructure:"server,omitempty"` + Listener ListenerConfig `mapstructure:"listener"` + ServerNotFoundMessage string `mapstructure:"serverNotFoundMessage"` + } `mapstructure:"gateway"` + Server ServerConfig `mapstructure:"server"` } type Config struct { Java ProxyConfig `mapstructure:"java"` Defaults struct { Java ProxyConfigDefaults `mapstructure:"java"` - } `mapstructure:"defaults,omitempty"` + } `mapstructure:"defaults"` } func NewProxyConfigFromMap(cfg map[string]any) (infrared.ProxyConfig, error) { @@ -225,7 +225,7 @@ func (cfg Config) loadListeners(gatewayID string) (map[string]ListenerConfig, er } func newListener(id string, cfg ListenerConfig) (Listener, error) { - status, err := newDialTimeoutServerStatus(cfg.ServerNotFoundStatus) + status, err := NewServerServerStatus(cfg.ServerNotFoundStatus) if err != nil { return Listener{}, err } @@ -264,7 +264,7 @@ func newServer(id string, cfg ServerConfig) (infrared.Server, error) { return nil, err } - dialTimeoutStatus, err := newDialTimeoutServerStatus(cfg.DialTimeoutStatus) + dialTimeoutStatus, err := NewServerServerStatus(cfg.DialTimeoutStatus) if err != nil { return nil, err } @@ -296,6 +296,8 @@ func newServer(id string, cfg ServerConfig) (infrared.Server, error) { }, }, Addr: cfg.Address, + AddrHost: host, + AddrPort: port, SendProxyProtocol: cfg.SendProxyProtocol, SendRealIP: cfg.SendRealIP, OverrideAddress: cfg.OverrideAddress, @@ -303,8 +305,6 @@ func newServer(id string, cfg ServerConfig) (infrared.Server, error) { OverrideStatus: overrideStatus, DialTimeoutStatusJSON: string(bb), GatewayIDs: cfg.Gateways, - Host: host, - Port: port, }, }, nil } @@ -312,7 +312,7 @@ func newServer(id string, cfg ServerConfig) (infrared.Server, error) { func newOverrideServerStatus(cfg OverrideServerStatusConfig) (OverrideStatusResponse, error) { var iconPtr *string if cfg.IconPath != nil { - icon, err := loadImageAndEncodeToBase64String(*cfg.IconPath) + icon, err := LoadImageAndEncodeToBase64String(*cfg.IconPath) if err != nil { return OverrideStatusResponse{}, err } @@ -330,12 +330,12 @@ func newOverrideServerStatus(cfg OverrideServerStatusConfig) (OverrideStatusResp }, nil } -func newDialTimeoutServerStatus(cfg DialTimeoutServerStatusConfig) (DialTimeoutStatusResponse, error) { - icon, err := loadImageAndEncodeToBase64String(cfg.IconPath) +func NewServerServerStatus(cfg ServerStatusConfig) (ServerStatusResponse, error) { + icon, err := LoadImageAndEncodeToBase64String(cfg.IconPath) if err != nil { - return DialTimeoutStatusResponse{}, err + return ServerStatusResponse{}, err } - return DialTimeoutStatusResponse{ + return ServerStatusResponse{ VersionName: cfg.VersionName, ProtocolNumber: cfg.ProtocolNumber, MaxPlayerCount: cfg.MaxPlayerCount, @@ -365,7 +365,7 @@ func newChanCaps(cfg ChanCapsConfig, cpnCount int) infrared.ProxySettings { } } -func loadImageAndEncodeToBase64String(path string) (string, error) { +func LoadImageAndEncodeToBase64String(path string) (string, error) { if path == "" { return "", nil } diff --git a/internal/pkg/java/conn.go b/internal/pkg/java/conn.go index d41f765a..32a085a9 100644 --- a/internal/pkg/java/conn.go +++ b/internal/pkg/java/conn.go @@ -39,9 +39,8 @@ type Conn struct { w io.Writer } -func (c *Conn) Pipe(rc net.Conn) error { - _, err := io.Copy(rc, c) - return err +func (c *Conn) Pipe(rc net.Conn) (int64, error) { + return io.Copy(rc, c) } func (c Conn) GatewayID() string { diff --git a/internal/pkg/java/gateway.go b/internal/pkg/java/gateway.go index 969cbb76..8ef9767e 100644 --- a/internal/pkg/java/gateway.go +++ b/internal/pkg/java/gateway.go @@ -18,7 +18,7 @@ type Listener struct { ReceiveProxyProtocol bool ReceiveRealIP bool ServerNotFoundMessage string - ServerNotFoundStatus DialTimeoutStatusResponse + ServerNotFoundStatus ServerStatusResponse serverNotFoundStatusJSON string net.Listener diff --git a/internal/pkg/java/server.go b/internal/pkg/java/server.go index 526223b3..eb8892ab 100644 --- a/internal/pkg/java/server.go +++ b/internal/pkg/java/server.go @@ -19,6 +19,8 @@ type Server struct { ID string Domains []string Addr string + AddrHost string + AddrPort int SendProxyProtocol bool SendRealIP bool OverrideAddress bool @@ -29,9 +31,6 @@ type Server struct { DialTimeoutStatusJSON string GatewayIDs []string - Host string - Port int - overrideStatusCache *string } @@ -91,8 +90,8 @@ func (s InfraredServer) HandleConn(c net.Conn) (infrared.Conn, error) { } if s.Server.OverrideAddress { - pc.handshake.SetServerAddress(s.Server.Host) - pc.handshake.ServerPort = protocol.UnsignedShort(s.Server.Port) + pc.handshake.SetServerAddress(s.Server.AddrHost) + pc.handshake.ServerPort = protocol.UnsignedShort(s.Server.AddrPort) pc.readPks[0] = pc.handshake.Marshal() } diff --git a/internal/pkg/java/status_response.go b/internal/pkg/java/status_response.go index 00460cfd..31f4c068 100644 --- a/internal/pkg/java/status_response.go +++ b/internal/pkg/java/status_response.go @@ -64,7 +64,7 @@ func (r OverrideStatusResponse) ResponseJSON(resp status.ResponseJSON) status.Re return resp } -type DialTimeoutStatusResponse struct { +type ServerStatusResponse struct { VersionName string ProtocolNumber int MaxPlayerCount int @@ -74,7 +74,7 @@ type DialTimeoutStatusResponse struct { MOTD string } -func (r DialTimeoutStatusResponse) ResponseJSON() status.ResponseJSON { +func (r ServerStatusResponse) ResponseJSON() status.ResponseJSON { return status.ResponseJSON{ Version: status.VersionJSON{ Name: r.VersionName, diff --git a/internal/pkg/storage/postgres/repository.go b/internal/pkg/storage/postgres/repository.go deleted file mode 100644 index 41e177a0..00000000 --- a/internal/pkg/storage/postgres/repository.go +++ /dev/null @@ -1,19 +0,0 @@ -package postgresql - -import ( - "database/sql" - "database/sql/driver" - - "github.com/uptrace/bun" - "github.com/uptrace/bun/dialect/pgdialect" -) - -type Storage struct { - db *bun.DB -} - -func NewStorage(connector driver.Connector) *Storage { - return &Storage{ - db: bun.NewDB(sql.OpenDB(connector), pgdialect.New()), - } -} diff --git a/internal/plugin/api/api.go b/internal/plugin/api/api.go index 95782e57..4e9356a2 100644 --- a/internal/plugin/api/api.go +++ b/internal/plugin/api/api.go @@ -8,7 +8,6 @@ import ( "github.com/go-chi/chi/v5" "github.com/go-chi/chi/v5/middleware" "github.com/go-chi/cors" - "github.com/gofrs/uuid" "github.com/haveachin/infrared/internal/app/infrared" "github.com/haveachin/infrared/internal/pkg/config" "github.com/haveachin/infrared/pkg/event" @@ -26,7 +25,6 @@ type Plugin struct { Config PluginConfig logger *zap.Logger eventBus event.Bus - eventID uuid.UUID mux http.Handler quit chan bool @@ -99,7 +97,6 @@ func (p *Plugin) Enable(api infrared.PluginAPI) error { } func (p Plugin) Disable() error { - p.eventBus.DetachRecipient(p.eventID) select { case p.quit <- true: default: diff --git a/internal/plugin/prometheus/prometheus.go b/internal/plugin/prometheus/prometheus.go index 37fe56da..6a9631a9 100644 --- a/internal/plugin/prometheus/prometheus.go +++ b/internal/plugin/prometheus/prometheus.go @@ -3,10 +3,8 @@ package prometheus import ( "context" "net/http" - "strings" "time" - "github.com/gofrs/uuid" "github.com/haveachin/infrared/internal/app/infrared" "github.com/haveachin/infrared/internal/pkg/config" "github.com/haveachin/infrared/pkg/event" @@ -26,7 +24,7 @@ type Plugin struct { Config PluginConfig logger *zap.Logger eventBus event.Bus - eventID uuid.UUID + eventID string mux http.Handler quit chan bool @@ -108,8 +106,8 @@ func (p Plugin) Disable() error { return nil } -func (p Plugin) registerEventHandler() { - id, _ := p.eventBus.AttachHandler(uuid.Nil, p.handleEvent) +func (p *Plugin) registerEventHandler() { + id, _ := p.eventBus.AttachHandlerAsyncFunc("", p.handleEvent) p.eventID = id } @@ -162,12 +160,12 @@ func (p Plugin) handleEvent(e event.Event) { server := e.Server.ID() domain := e.MatchedDomain p.playersConnected.With(prometheus.Labels{"host": domain, "server": server, "edition": edition}).Dec() - case infrared.ServerRegisterEvent: + /*case infrared.ServerRegisterEvent: edition := e.Server.Edition().String() server := e.Server.ID() for _, domain := range e.Server.Domains() { domain = strings.ToLower(domain) p.playersConnected.With(prometheus.Labels{"host": domain, "server": server, "edition": edition}) - } + }*/ } } diff --git a/internal/plugin/traffic_limiter/config.go b/internal/plugin/traffic_limiter/config.go new file mode 100644 index 00000000..75c2c018 --- /dev/null +++ b/internal/plugin/traffic_limiter/config.go @@ -0,0 +1,67 @@ +package traffic_limiter + +import ( + "encoding/json" + "fmt" + + "github.com/c2h5oh/datasize" + "github.com/haveachin/infrared/internal/pkg/java" + "github.com/imdario/mergo" +) + +type trafficLimiterConfig struct { + ServerIDs []string `mapstructure:"serverIds"` + File string `mapstructure:"file"` + TrafficLimit datasize.ByteSize `mapstructure:"trafficLimit"` + ResetCron string `mapstructure:"resetCron"` + OutOfBandwidthStatus java.ServerStatusConfig `mapstructure:"outOfBandwidthStatus"` + OutOfBandwidthMessage string `mapstructure:"outOfBandwidthMessage"` +} + +func (cfg PluginConfig) loadTrafficLimiterConfigs() (map[string]trafficLimiter, error) { + trafficLimiters := map[string]trafficLimiter{} + storages := map[string]*storage{} + for _, bwCfg := range cfg.TrafficLimiters { + if err := mergo.Merge(&bwCfg, cfg.Defaults.TrafficLimiter); err != nil { + return nil, err + } + + storage, ok := storages[bwCfg.File] + if !ok { + var err error + storage, err = newStorage(bwCfg.File) + if err != nil { + return nil, err + } + storages[bwCfg.File] = storage + } + + statusJSON, err := java.NewServerServerStatus(bwCfg.OutOfBandwidthStatus) + if err != nil { + return nil, err + } + + bb, err := json.Marshal(statusJSON.ResponseJSON()) + if err != nil { + return nil, err + } + + for _, sID := range bwCfg.ServerIDs { + _, ok := trafficLimiters[sID] + if ok { + return nil, fmt.Errorf("server with ID %q already has a traffic limiter", sID) + } + + trafficLimiters[sID] = trafficLimiter{ + file: bwCfg.File, + trafficLimit: bwCfg.TrafficLimit, + resetCron: bwCfg.ResetCron, + storage: storage, + OutOfBandwidthMessage: bwCfg.OutOfBandwidthMessage, + OutOfBandwidthStatusJSON: string(bb), + } + } + } + + return trafficLimiters, nil +} diff --git a/internal/plugin/traffic_limiter/storage.go b/internal/plugin/traffic_limiter/storage.go new file mode 100644 index 00000000..8eeb6731 --- /dev/null +++ b/internal/plugin/traffic_limiter/storage.go @@ -0,0 +1,147 @@ +package traffic_limiter + +import ( + "errors" + "io" + "os" + "sync" + "time" + + "gopkg.in/yaml.v3" +) + +type storage struct { + path string + mu sync.Mutex + cache *Bandwidth +} + +func newStorage(path string) (*storage, error) { + if err := createFileIfNotExist(path); err != nil { + return nil, err + } + + return &storage{ + path: path, + }, nil +} + +func createFileIfNotExist(path string) error { + exists, err := doesFileExist(path) + if err != nil { + return err + } + + if exists { + return nil + } + + return os.WriteFile(path, nil, 0644) +} + +func doesFileExist(path string) (bool, error) { + _, err := os.Stat(path) + if err != nil { + if !errors.Is(err, os.ErrNotExist) { + return false, err + } + return false, nil + } + + return true, nil +} + +type Bandwidth struct { + Servers map[string]BandwidthServer `yaml:"servers"` +} + +type BandwidthServer struct { + ConsumedBytes int64 `yaml:"consumedBytes"` + LastResetAt time.Time `yaml:"lastResetAt"` +} + +func (s *storage) readBandwidth() (*Bandwidth, error) { + s.mu.Lock() + defer s.mu.Unlock() + + if s.cache != nil { + return s.cache, nil + } + + f, err := os.Open(s.path) + if err != nil { + return nil, err + } + defer f.Close() + + bw := &Bandwidth{} + if err := yaml.NewDecoder(f).Decode(bw); err != nil { + if errors.Is(err, io.EOF) { + return &Bandwidth{ + Servers: map[string]BandwidthServer{}, + }, nil + } + return nil, err + } + + return bw, nil +} + +func (s *storage) writeBandwidth(bw *Bandwidth) error { + s.mu.Lock() + defer s.mu.Unlock() + + s.cache = bw + + data, err := yaml.Marshal(bw) + if err != nil { + return err + } + + return os.WriteFile(s.path, data, 0644) +} + +func (s *storage) ConsumedBytes(serverID string) (int64, error) { + bw, err := s.readBandwidth() + if err != nil { + return 0, err + } + + return bw.Servers[serverID].ConsumedBytes, nil +} + +func (s *storage) AddConsumedBytes(serverID string, consumedBytes int64) (int64, error) { + bw, err := s.readBandwidth() + if err != nil { + return 0, err + } + + srv, ok := bw.Servers[serverID] + if !ok { + srv = BandwidthServer{ + ConsumedBytes: consumedBytes, + } + } else { + srv.ConsumedBytes += consumedBytes + } + bw.Servers[serverID] = srv + + if err := s.writeBandwidth(bw); err != nil { + return 0, err + } + return srv.ConsumedBytes, nil +} + +func (s *storage) ResetConsumedBytes(serverID string) error { + bw, err := s.readBandwidth() + if err != nil { + return err + } + + bw.Servers[serverID] = BandwidthServer{ + ConsumedBytes: 0, + LastResetAt: time.Now(), + } + + return s.writeBandwidth(bw) +} diff --git a/internal/plugin/traffic_limiter/traffic_limiter.go b/internal/plugin/traffic_limiter/traffic_limiter.go new file mode 100644 index 00000000..7457af32 --- /dev/null +++ b/internal/plugin/traffic_limiter/traffic_limiter.go @@ -0,0 +1,188 @@ +package traffic_limiter + +import ( + "errors" + "fmt" + + "github.com/c2h5oh/datasize" + "github.com/haveachin/infrared/internal/app/infrared" + "github.com/haveachin/infrared/internal/pkg/config" + "github.com/haveachin/infrared/internal/pkg/java" + "github.com/haveachin/infrared/internal/pkg/java/protocol" + "github.com/haveachin/infrared/internal/pkg/java/protocol/login" + "github.com/haveachin/infrared/internal/pkg/java/protocol/status" + "github.com/haveachin/infrared/pkg/event" + "github.com/robfig/cron/v3" + "go.uber.org/zap" +) + +type trafficLimiter struct { + file string + trafficLimit datasize.ByteSize + resetCron string + storage *storage + OutOfBandwidthStatusJSON string + OutOfBandwidthMessage string +} + +type PluginConfig struct { + TrafficLimiters map[string]trafficLimiterConfig `mapstructure:"trafficLimiters"` + Defaults struct { + TrafficLimiter trafficLimiterConfig `mapstructure:"trafficLimiter"` + } `mapstructure:"defaults"` +} + +type Plugin struct { + Config PluginConfig + logger *zap.Logger + eventBus event.Bus + eventIDs []string + // ServerID mapped to trafficLimiter + trafficLimiters map[string]trafficLimiter +} + +func (p Plugin) Name() string { + return "Traffic Limiter" +} + +func (p Plugin) Version() string { + return "internal" +} + +func (p *Plugin) Load(cfg map[string]any) error { + if err := config.Unmarshal(cfg, &p.Config); err != nil { + return err + } + + trafficLimiters, err := p.Config.loadTrafficLimiterConfigs() + if err != nil { + return err + } + p.trafficLimiters = trafficLimiters + + return nil +} + +func (p *Plugin) Reload(cfg map[string]any) error { + var pluginCfg PluginConfig + if err := config.Unmarshal(cfg, &pluginCfg); err != nil { + return err + } + + p.Config = pluginCfg + return nil +} + +func (p *Plugin) Enable(api infrared.PluginAPI) error { + p.logger = api.Logger() + p.eventBus = api.EventBus() + + p.registerEventHandler() + p.startCronJobs() + + return nil +} + +func (p Plugin) Disable() error { + for _, id := range p.eventIDs { + p.eventBus.DetachRecipient(id) + } + return nil +} + +func (p *Plugin) startCronJobs() error { + resetCron := cron.New() + for srvID, tl := range p.trafficLimiters { + if _, err := resetCron.AddJob(tl.resetCron, cron.FuncJob(func() { + tl.storage.ResetConsumedBytes(srvID) + })); err != nil { + return err + } + } + go resetCron.Start() + return nil +} + +func (p *Plugin) registerEventHandler() { + preConnConnectingID, _ := p.eventBus.AttachHandlerFunc("", p.onPreConnConnecting, + infrared.PrePlayerJoinEventTopic, + ) + p.eventIDs = append(p.eventIDs, preConnConnectingID) + + playerLeaveID, _ := p.eventBus.AttachHandlerAsyncFunc("", p.onPlayerLeave, + infrared.PlayerLeaveEventTopicAsync, + ) + p.eventIDs = append(p.eventIDs, playerLeaveID) +} + +func (p Plugin) onPlayerLeave(e event.Event) { + switch e := e.Data.(type) { + case infrared.PlayerLeaveEvent: + tl, ok := p.trafficLimiters[e.Server.ID()] + if !ok { + return + } + + _, err := tl.storage.AddConsumedBytes(e.Server.ID(), e.ConsumedBytes) + if err != nil { + p.logger.Error("failed to add consumed bytes", zap.Error(err)) + return + } + } +} + +func (p Plugin) onPreConnConnecting(e event.Event) (any, error) { + switch e := e.Data.(type) { + case infrared.PreConnConnectingEvent: + t, ok := p.trafficLimiters[e.Server.ID()] + if !ok { + return nil, nil + } + + totalBytes, err := t.storage.ConsumedBytes(e.Server.ID()) + if err != nil { + p.logger.Error("failed to read consumed bytes", zap.Error(err)) + return nil, nil + } + + if t.trafficLimit <= datasize.ByteSize(totalBytes) { + p.logger.Info("traffic limit reached", zap.String("serverID", e.Server.ID())) + t.disconnectPlayer(e.ProcessedConn) + return nil, errors.New("traffic limit reached") + } + } + return nil, nil +} + +func (t *trafficLimiter) disconnectPlayer(pc infrared.ProcessedConn) error { + defer pc.Close() + + switch pc := pc.(type) { + case *java.ProcessedConn: + if pc.IsLoginRequest() { + msg := infrared.ExecuteMessageTemplate(t.OutOfBandwidthMessage, pc) + pk := login.ClientBoundDisconnect{ + Reason: protocol.Chat(fmt.Sprintf("{\"text\":\"%s\"}", msg)), + }.Marshal() + return pc.WritePacket(pk) + } + + msg := infrared.ExecuteMessageTemplate(t.OutOfBandwidthStatusJSON, pc) + pk := status.ClientBoundResponse{ + JSONResponse: protocol.String(msg), + }.Marshal() + + if err := pc.WritePacket(pk); err != nil { + return err + } + + ping, err := pc.ReadPacket(status.MaxSizeServerBoundPingRequest) + if err != nil { + return err + } + + return pc.WritePacket(ping) + default: + return errors.New("could not disconnect player") + } +} diff --git a/internal/plugin/webhook/webhook.go b/internal/plugin/webhook/webhook.go index 8c6ec9a0..9bc2fc5c 100644 --- a/internal/plugin/webhook/webhook.go +++ b/internal/plugin/webhook/webhook.go @@ -5,7 +5,6 @@ import ( "net/http" "time" - "github.com/gofrs/uuid" "github.com/haveachin/infrared/internal/app/infrared" "github.com/haveachin/infrared/internal/pkg/config" "github.com/haveachin/infrared/pkg/event" @@ -61,7 +60,7 @@ type Plugin struct { Config PluginConfig logger *zap.Logger eventBus event.Bus - eventID uuid.UUID + eventID string // GatewayID mapped to webhooks whks map[string][]webhook.Webhook } @@ -96,7 +95,7 @@ func (p *Plugin) Enable(api infrared.PluginAPI) error { p.logger = api.Logger() p.eventBus = api.EventBus() - id, _ := p.eventBus.AttachHandler(uuid.Nil, p.handleEvent) + id, _ := p.eventBus.AttachHandlerAsyncFunc("", p.handleEvent) p.eventID = id return nil @@ -148,7 +147,7 @@ func unmarshalServer(data *eventData, s infrared.Server) { func (p Plugin) handleEvent(e event.Event) { var data eventData switch e := e.Data.(type) { - case infrared.NewConnEvent: + case infrared.AcceptedConnEvent: unmarshalConn(&data, e.Conn) case infrared.PreConnProcessingEvent: unmarshalConn(&data, e.Conn) diff --git a/pkg/event/bus.go b/pkg/event/bus.go index 58b3607d..310eeef3 100644 --- a/pkg/event/bus.go +++ b/pkg/event/bus.go @@ -18,21 +18,65 @@ var ErrRecipientNotFound = errors.New("target recipient not found") type Bus interface { // Push pushes an event with arbitrary data to the event bus. Push(data any, topic ...string) - PushTo(receiverId uuid.UUID, data any, topic ...string) error - AttachHandler(id uuid.UUID, fn Handler, topics ...string) (handlerID uuid.UUID, replaced bool) - AttachChannel(id uuid.UUID, ch Channel, topics ...string) (channelID uuid.UUID, replaced bool) - DetachRecipient(id uuid.UUID) (success bool) + PushTo(to string, data any, topic ...string) error + Request(data any, topics ...string) <-chan Reply + RequestFrom(to string, data any, topics ...string) (<-chan Reply, error) + AttachHandler(id string, h HandlerSync, topics ...string) (handlerID string, replaced bool) + AttachHandlerFunc(id string, fn HandlerSyncFunc, topics ...string) (handlerID string, replaced bool) + AttachHandlerAsync(id string, h Handler, topics ...string) (handlerID string, replaced bool) + AttachHandlerAsyncFunc(id string, fh HandlerFunc, topics ...string) (handlerID string, replaced bool) + DetachRecipient(id string) (success bool) DetachAllRecipients() (n int) } +func Push(data any, topics ...string) { + DefaultBus.Push(data, topics...) +} + +func PushTo(to string, data any, topics ...string) error { + return DefaultBus.PushTo(to, data, topics...) +} + +func Request(data any, topics ...string) <-chan Reply { + return DefaultBus.Request(data, topics...) +} + +func RequestFrom(to string, data any, topics ...string) (<-chan Reply, error) { + return DefaultBus.RequestFrom(to, data, topics...) +} + +func AttachHandler(id string, h HandlerSync, topics ...string) (string, bool) { + return DefaultBus.AttachHandler(id, h, topics...) +} + +func AttachHandlerFunc(id string, h HandlerSyncFunc, topics ...string) (string, bool) { + return DefaultBus.AttachHandlerFunc(id, h, topics...) +} + +func AttachHandlerAsync(id string, h Handler, topics ...string) (string, bool) { + return DefaultBus.AttachHandlerAsync(id, h, topics...) +} + +func AttachHandlerAsyncFunc(id string, fn HandlerFunc, topics ...string) (string, bool) { + return DefaultBus.AttachHandlerAsync(id, fn, topics...) +} + +func DetachRecipient(id string) bool { + return DefaultBus.DetachRecipient(id) +} + +func DetachAllRecipients() int { + return DefaultBus.DetachAllRecipients() +} + type internalBus struct { sync.RWMutex - ws map[uuid.UUID]worker + ws map[string]worker } func NewInternalBus() Bus { return &internalBus{ - ws: map[uuid.UUID]worker{}, + ws: map[string]worker{}, } } @@ -40,57 +84,54 @@ func (b *internalBus) Push(data any, topics ...string) { b.sendEvent(New(data, topics...)) } -func Push(data any, topics ...string) { - DefaultBus.Push(data, topics...) -} - -func (b *internalBus) PushTo(to uuid.UUID, data any, topics ...string) error { +func (b *internalBus) PushTo(to string, data any, topics ...string) error { return b.sendEventTo(to, New(data, topics...)) } -func PushTo(to uuid.UUID, data any, topics ...string) error { - return DefaultBus.PushTo(to, data, topics...) +func (b *internalBus) Request(data any, topics ...string) <-chan Reply { + replyChan, _ := b.doRequest("", data, topics...) + return replyChan } -func (b *internalBus) AttachHandler(id uuid.UUID, fn Handler, topics ...string) (uuid.UUID, bool) { - if fn == nil { - println() - panic(fmt.Sprintf("AttachHandler called with id %q and nil handler", id)) - } - - if len(topics) > 0 { - fn = eventFilterFunc(topics, fn) - } - - if id == uuid.Nil { - id = uuid.Must(uuid.NewV4()) - } - - b.Lock() - defer b.Unlock() - w, replaced := b.ws[id] - if replaced { - w.close() - } +func (b *internalBus) RequestFrom(to string, data any, topics ...string) (<-chan Reply, error) { + return b.doRequest(to, data, topics...) +} - b.ws[id] = newWorker(id, fn) +func (b *internalBus) AttachHandler(id string, h HandlerSync, topics ...string) (string, bool) { + return b.attachHandler(id, HandlerFunc(func(e Event) { + if e.replyChan == nil { + panic("attached to async topic") + } - return id, replaced + data, err := h.HandleSync(e) + e.replyChan <- Reply{ + EventID: e.ID, + HandlerID: id, + Data: data, + Err: err, + } + }), topics) } -func AttachHandler(id uuid.UUID, fn Handler, topics ...string) (uuid.UUID, bool) { - return DefaultBus.AttachHandler(id, fn, topics...) +func (b *internalBus) AttachHandlerFunc(id string, fn HandlerSyncFunc, topics ...string) (string, bool) { + return b.AttachHandler(id, fn, topics...) } -func (b *internalBus) AttachChannel(id uuid.UUID, ch Channel, topics ...string) (uuid.UUID, bool) { - return b.AttachHandler(id, eventChanFunc(ch), topics...) +func (b *internalBus) AttachHandlerAsync(id string, h Handler, topics ...string) (string, bool) { + return b.attachHandler(id, HandlerFunc(func(e Event) { + if e.replyChan != nil { + e.replyChan <- Reply{} + } + + h.Handle(e) + }), topics) } -func AttachChannel(id uuid.UUID, ch Channel, topics ...string) (uuid.UUID, bool) { - return DefaultBus.AttachChannel(id, ch, topics...) +func (b *internalBus) AttachHandlerAsyncFunc(id string, fn HandlerFunc, topics ...string) (string, bool) { + return b.AttachHandlerAsync(id, fn, topics...) } -func (b *internalBus) DetachRecipient(id uuid.UUID) bool { +func (b *internalBus) DetachRecipient(id string) bool { b.Lock() defer b.Unlock() @@ -103,10 +144,6 @@ func (b *internalBus) DetachRecipient(id uuid.UUID) bool { return false } -func DetachRecipient(id uuid.UUID) bool { - return DefaultBus.DetachRecipient(id) -} - func (b *internalBus) DetachAllRecipients() int { b.Lock() defer b.Unlock() @@ -115,24 +152,21 @@ func (b *internalBus) DetachAllRecipients() int { for _, w := range b.ws { w.close() } - b.ws = map[uuid.UUID]worker{} + b.ws = map[string]worker{} return n } -func DetachAllRecipients() int { - return DefaultBus.DetachAllRecipients() -} - -func (b *internalBus) sendEvent(e Event) { +func (b *internalBus) sendEvent(e Event) int { b.RLock() defer b.RUnlock() for _, w := range b.ws { w.push(e) } + return len(b.ws) } -func (b *internalBus) sendEventTo(to uuid.UUID, e Event) error { +func (b *internalBus) sendEventTo(to string, e Event) error { b.RLock() defer b.RUnlock() if w, ok := b.ws[to]; ok { @@ -142,36 +176,98 @@ func (b *internalBus) sendEventTo(to uuid.UUID, e Event) error { return ErrRecipientNotFound } -func eventFilterFunc(topics []string, fn Handler) Handler { - return func(event Event) { +func (b *internalBus) attachHandler(id string, h Handler, topics []string) (string, bool) { + if h == nil { + panic(fmt.Sprintf("AttachHandler called with id %q and nil handler", id)) + } + + if topics != nil { + h = topicFilterFunc(topics, h) + } + + if id == "" { + id = uuid.Must(uuid.NewV4()).String() + } + + b.Lock() + defer b.Unlock() + w, replaced := b.ws[id] + if replaced { + w.close() + } + + b.ws[id] = newWorker(id, h) + + return id, replaced +} + +func (b *internalBus) doRequest(to string, data any, topics ...string) (<-chan Reply, error) { + bufChan := make(chan Reply) + e := New(data, topics...) + e.replyChan = bufChan + + n := 1 + if to == "" { + n = b.sendEvent(e) + } else if err := b.sendEventTo(to, e); err != nil { + close(bufChan) + return nil, err + } + + replyChan := make(chan Reply) + go func() { + for i := 0; i < n; i++ { + replyChan <- <-bufChan + } + close(bufChan) + close(replyChan) + }() + + return replyChan, nil +} + +func topicFilterFunc(topics []string, h Handler) Handler { + if len(topics) == 1 { + topic := topics[0] + return HandlerFunc(func(e Event) { + if e.hasTopic(topic) { + h.Handle(e) + return + } + + if e.replyChan != nil { + e.replyChan <- Reply{} + } + }) + } + + return HandlerFunc(func(e Event) { for _, topic := range topics { - if event.hasTopic(topic) { - fn(event) + if e.hasTopic(topic) { + h.Handle(e) return } } - } -} -func eventChanFunc(ch Channel) Handler { - return func(event Event) { - ch <- event - } + if e.replyChan != nil { + e.replyChan <- Reply{} + } + }) } type worker struct { - id uuid.UUID + id string in chan Event out chan Event - fn Handler + h Handler } -func newWorker(id uuid.UUID, fn Handler) worker { +func newWorker(id string, h Handler) worker { w := worker{ id: id, in: make(chan Event, 100), out: make(chan Event), - fn: fn, + h: h, } go w.publish() go w.process() @@ -185,24 +281,28 @@ func (w *worker) close() { } func (w *worker) publish() { - for n := range w.in { + for e := range w.in { + deadline := time.NewTimer(time.Second * 5) select { - case w.out <- n: - case <-time.After(time.Second * 5): + case w.out <- e: + if !deadline.Stop() { + <-deadline.C + } + case <-deadline.C: } } } func (w *worker) process() { - for n := range w.out { - w.fn(n) + for e := range w.out { + w.h.Handle(e) } close(w.out) } -func (w *worker) push(n Event) { +func (w *worker) push(e Event) { select { - case w.in <- n: + case w.in <- e: default: } } diff --git a/pkg/event/event.go b/pkg/event/event.go index 01fc5e5f..cee8aae4 100644 --- a/pkg/event/event.go +++ b/pkg/event/event.go @@ -6,11 +6,19 @@ import ( "github.com/gofrs/uuid" ) +type Reply struct { + EventID string + HandlerID string + Data any + Err error +} + type Event struct { - ID uuid.UUID + ID string OccurredAt time.Time Topics []string Data any + replyChan chan<- Reply } func (e Event) hasTopic(topic string) bool { @@ -22,13 +30,29 @@ func (e Event) hasTopic(topic string) bool { return false } -type Handler func(Event) +type HandlerSync interface { + HandleSync(Event) (any, error) +} + +type HandlerSyncFunc func(Event) (any, error) + +func (fn HandlerSyncFunc) HandleSync(e Event) (any, error) { + return fn(e) +} + +type Handler interface { + Handle(Event) +} -type Channel chan Event +type HandlerFunc func(Event) + +func (fn HandlerFunc) Handle(e Event) { + fn(e) +} func New(data any, topic ...string) Event { return Event{ - ID: uuid.Must(uuid.NewV4()), + ID: uuid.Must(uuid.NewV4()).String(), OccurredAt: time.Now(), Topics: topic, Data: data, diff --git a/test/attacks/dos/main.go b/test/attacks/dos/main.go index fa17281d..9b01eb89 100644 --- a/test/attacks/dos/main.go +++ b/test/attacks/dos/main.go @@ -17,7 +17,7 @@ var loginStartPayload []byte func init() { handshake := handshaking.ServerBoundHandshake{ ProtocolVersion: 758, - ServerAddress: "laptop.fritz.box", + ServerAddress: "localhost", ServerPort: 25565, NextState: 2, } @@ -47,7 +47,7 @@ func main() { log.Printf("%d requests sent\n", i) } - c, err := net.Dial("tcp", "192.168.178.20:25565") + c, err := net.Dial("tcp", "localhost:25565") if err != nil { log.Fatal(err) }