From 703050c9a5db4ce5dc48b83411cb3cc1bb109db4 Mon Sep 17 00:00:00 2001 From: Marc <14913675+MarcStdt@users.noreply.github.com> Date: Sun, 21 Jul 2024 18:22:52 +0200 Subject: [PATCH] chore: init command (#17) * chore: init command * chore: ci * chore: ci * chore different releases * feat: queue manager * chore: add lockfile status tests * chore: tests and fixes * chore: opeanpi * chore: legacy commands * chore: new scroll related commands * fix: htop example --- .docker/druid-install-command.sh | 18 +- .docker/entrypoint.sh | 5 + .github/workflows/pr.yml | 18 ++ .vscode/launch.json | 12 +- Makefile | 4 + cmd/root.go | 1 + cmd/run.go | 9 +- cmd/scroll.go | 19 ++ cmd/scroll_validate.go | 34 ++ cmd/serve.go | 34 +- docs/docs.go | 46 +-- docs/swagger.json | 46 +-- docs/swagger.yaml | 30 +- examples/htop/.scroll/scroll-lock.json | 2 +- examples/htop/.scroll/scroll.yaml | 18 +- examples/minecraft/.scroll/scroll-lock.json | 2 +- examples/minecraft/.scroll/scroll.yaml | 78 ++--- examples/nginx/.scroll/scroll-lock.json | 6 +- examples/nginx/.scroll/scroll.yaml | 32 +- internal/core/domain/scroll.go | 42 ++- internal/core/domain/scroll_lock.go | 8 +- internal/core/ports/services_ports.go | 17 +- internal/core/services/cron_manager.go | 19 +- internal/core/services/process_launcher.go | 240 ++------------ internal/core/services/process_manager.go | 53 ++-- internal/core/services/queue_manager.go | 309 ++++++++++++++++++ internal/core/services/scroll_service.go | 15 +- internal/handler/scroll_handler.go | 29 +- internal/signals/process_shutdown.go | 12 +- internal/utils/misc.go | 14 - test/container/minecraft_test.go | 6 +- test/container/nginx_test.go | 3 +- test/integration/example_test.go | 59 +++- test/mock/services.go | 138 ++++++-- test/procedure_launcher_test.go | 58 ---- test/process_manager_test.go | 16 +- test/queue_manager_test.go | 328 ++++++++++++++++++++ 37 files changed, 1194 insertions(+), 586 deletions(-) create mode 100644 cmd/scroll.go create mode 100644 cmd/scroll_validate.go create mode 100644 internal/core/services/queue_manager.go delete mode 100644 test/procedure_launcher_test.go create mode 100644 test/queue_manager_test.go diff --git a/.docker/druid-install-command.sh b/.docker/druid-install-command.sh index 98d4797..746f84f 100644 --- a/.docker/druid-install-command.sh +++ b/.docker/druid-install-command.sh @@ -1,21 +1,19 @@ #!/bin/bash #check if first argument is there, set CHANNEL to it, otherwise default to latest -if [ -z "$1" ]; then - CHANNEL="$CHANNEL/download" +if [ -z "$CHANNEL" ]; then + URL_PATH="latest/download" else - CHANNEL=download/$1 + URL_PATH=download/$CHANNEL fi -wget -O /app/resources/druid https://github.com/highcard-dev/druid-cli/releases/$CHANNEL/druid -wget -O /app/resources/druid_rcon https://github.com/highcard-dev/druid-cli/releases/$CHANNEL/druid_rcon -wget -O /app/resources/druid_rcon_web https://github.com/highcard-dev/druid-cli/releases/$CHANNEL/druid_rcon_web -wget -O /app/resources/entrypoint.sh https://github.com/highcard-dev/druid-cli/releases/$CHANNEL/entrypoint.sh +wget -O /app/resources/druid https://github.com/highcard-dev/druid-cli/releases/$URL_PATH/druid +wget -O /app/resources/druid_rcon https://github.com/highcard-dev/druid-cli/releases/$URL_PATH/druid_rcon +wget -O /app/resources/druid_rcon_web https://github.com/highcard-dev/druid-cli/releases/$URL_PATH/druid_rcon_web +wget -O /app/resources/entrypoint.sh https://github.com/highcard-dev/druid-cli/releases/$URL_PATH/entrypoint.sh chmod +x /app/resources/druid /app/resources/druid_rcon /app/resources/druid_rcon_web # Modify the PATH variable to prioritize /app/resources export PATH=/app/resources:$PATH -bash /app/resources/entrypoint.sh - -#https://github.com/highcard-dev/druid-cli/releases/download/0.1.8-prerelease3/druid \ No newline at end of file +bash /app/resources/entrypoint.sh $@ diff --git a/.docker/entrypoint.sh b/.docker/entrypoint.sh index 6a3f41c..a668e11 100755 --- a/.docker/entrypoint.sh +++ b/.docker/entrypoint.sh @@ -70,6 +70,11 @@ if [ -z "$input" ] || [[ $input =~ ([^/]+)/([^:]+):([^/]+) ]]; then then args+=("--port" "${DRUID_PORT}") fi + + if [ ! -z "${DRUID_INIT}" ]; + then + args+=("--init") + fi echo "Running druid with args from env: ${args[@]}" exec druid "${args[@]}" diff --git a/.github/workflows/pr.yml b/.github/workflows/pr.yml index e86b48b..054548e 100644 --- a/.github/workflows/pr.yml +++ b/.github/workflows/pr.yml @@ -12,6 +12,13 @@ jobs: - uses: actions/setup-go@v5 with: go-version: "^1.21" + - name: Extract branch name + id: extract_branch + run: echo "BRANCH_NAME=$(echo ${GITHUB_REF#refs/heads/} | sed 's/\//-/g')" >> $GITHUB_ENV + - uses: paulhatch/semantic-version@v5.3.0 + id: version + with: + version_format: "${major}.${minor}.${patch}-${{ env.BRANCH_NAME }}${increment}" - run: make test-integration-docker name: Run integration tests inside Docker - run: make test @@ -20,3 +27,14 @@ jobs: name: Build - run: make build-plugins name: Build Plugins + - name: Prerelease + uses: softprops/action-gh-release@v2 + with: + tag_name: ${{ steps.version.outputs.version }} + prerelease: true + files: | + bin/druid + bin/druid_rcon + bin/druid_rcon_web_rust + .docker/entrypoint.sh + .docker/druid-install-command.sh diff --git a/.vscode/launch.json b/.vscode/launch.json index df72c9f..b57cc5d 100644 --- a/.vscode/launch.json +++ b/.vscode/launch.json @@ -151,6 +151,16 @@ "trace": "verbose", "debugAdapter": "legacy" - } + }, + { + "name": "Test Current File", + "type": "go", + "request": "launch", + "mode": "test", + "program": "${file}", + "env": {}, + "args": [], + "showLog": true + } ] } \ No newline at end of file diff --git a/Makefile b/Makefile index ca34abd..4d0350e 100644 --- a/Makefile +++ b/Makefile @@ -30,6 +30,10 @@ mock: test: go test -v ./test +test_clean: + go clean -testcache + go test -v ./test + test-integration: go test -v ./test/integration diff --git a/cmd/root.go b/cmd/root.go index 62a9358..2312b30 100644 --- a/cmd/root.go +++ b/cmd/root.go @@ -31,6 +31,7 @@ func init() { RootCmd.AddCommand(UpdateCommand) RootCmd.AddCommand(SemverCmd) RootCmd.AddCommand(VersionCmd) + RootCmd.AddCommand(ScrollCmd) c, _ := os.Getwd() diff --git a/cmd/run.go b/cmd/run.go index ab142ec..81d138a 100644 --- a/cmd/run.go +++ b/cmd/run.go @@ -2,7 +2,6 @@ package cmd import ( "fmt" - "strings" "github.com/highcard-dev/daemon/internal/core/services" "github.com/highcard-dev/daemon/internal/core/services/registry" @@ -33,6 +32,8 @@ var RunCmd = &cobra.Command{ } processLauncher := services.NewProcedureLauncher(client, processManager, services.NewPluginManager(), consoleService, logManager, scrollService) + queueManager := services.NewQueueManager(scrollService, processLauncher) + if !scrollService.LockExists() { scrollService.WriteNewScrollLock() logger.Log().Info("Lock file created") @@ -55,11 +56,9 @@ var RunCmd = &cobra.Command{ return err } - parts := strings.Split(args[0], ".") - - command := strings.TrimPrefix(args[0], parts[0]+".") + command := args[0] - err = processLauncher.RunNew(command, parts[0], false) + err = queueManager.AddItem(command, false) return err }, } diff --git a/cmd/scroll.go b/cmd/scroll.go new file mode 100644 index 0000000..6b7df2c --- /dev/null +++ b/cmd/scroll.go @@ -0,0 +1,19 @@ +package cmd + +import ( + "github.com/spf13/cobra" +) + +// scrollCmd represents the command for scrolling +var ScrollCmd = &cobra.Command{ + Use: "scroll", + Short: "Commands related to the scroll file", + Long: `Commands related to the scroll file`, + Run: func(cmd *cobra.Command, args []string) { + cmd.Usage() + }, +} + +func init() { + ScrollCmd.AddCommand(ScrollValidateCmd) +} diff --git a/cmd/scroll_validate.go b/cmd/scroll_validate.go new file mode 100644 index 0000000..674ae44 --- /dev/null +++ b/cmd/scroll_validate.go @@ -0,0 +1,34 @@ +package cmd + +import ( + "fmt" + + "github.com/highcard-dev/daemon/internal/core/domain" + "github.com/spf13/cobra" +) + +var ScrollValidateCmd = &cobra.Command{ + Use: "validate", + Short: "Validates the scroll file", + Long: `This command validates the scroll file to ensure it meets the required criteria.`, + Args: cobra.MaximumNArgs(1), + RunE: func(cmd *cobra.Command, args []string) error { + scrollDir := cwd + if len(args) > 0 { + scrollDir = args[0] + } + + scroll, err := domain.NewScroll(scrollDir) + + if err != nil { + return fmt.Errorf("failed to load scroll: %w", err) + } + + if err := scroll.Validate(); err != nil { + return fmt.Errorf("failed to validate scroll: %w", err) + } + + fmt.Println("Scroll validated successfully.") + return nil + }, +} diff --git a/cmd/serve.go b/cmd/serve.go index 73ff95f..e7c3590 100644 --- a/cmd/serve.go +++ b/cmd/serve.go @@ -22,6 +22,7 @@ var ignoreVersionCheck bool var port int var shutdownWait int var additionalEndpoints []string +var initScroll bool var ServeCommand = &cobra.Command{ Use: "serve", @@ -87,7 +88,9 @@ to interact and monitor the Scroll Application`, processLauncher := services.NewProcedureLauncher(client, processManager, pluginManager, consoleService, logManager, scrollService) - scrollHandler := handler.NewScrollHandler(scrollService, pluginManager, processLauncher) + queueManager := services.NewQueueManager(scrollService, processLauncher) + + scrollHandler := handler.NewScrollHandler(scrollService, pluginManager, processLauncher, queueManager) processHandler := handler.NewProcessHandler(processManager) scrollLogHandler := handler.NewScrollLogHandler(scrollService, logManager, processManager) scrollMetricHandler := handler.NewScrollMetricHandler(scrollService, processMonitor) @@ -104,7 +107,7 @@ to interact and monitor the Scroll Application`, a := s.Initialize() - signals.SetupSignals(processLauncher, processManager, a, shutdownWait) + signals.SetupSignals(queueManager, processManager, a, shutdownWait) currentScroll, lock, err := scrollService.Bootstrap(ignoreVersionCheck) if err != nil { @@ -124,14 +127,8 @@ to interact and monitor the Scroll Application`, if err != nil { return err } - //run if something is there - err = processLauncher.StartLockfile(lock) - - if err != nil { - return err - } - } else { - logger.Log().Info("No lock file found, bootstrapping") + } else if initScroll { + logger.Log().Info("No lock file found, but init command available. Bootstrapping...") //There is an error here. We need to bootstrap the files before we render out the templates in the bootstrap func above err := scrollService.CreateLockAndBootstrapFiles() if err != nil { @@ -151,17 +148,28 @@ to interact and monitor the Scroll Application`, } //start scroll.init process //initialize if nothing is there - err = processLauncher.Initalize(lock) + err = queueManager.AddItem(currentScroll.Init, true) if err != nil { return err } + + scrollService.WriteNewScrollLock() + logger.Log().Info("Bootstrapping done") } + err = queueManager.QueueLockFile() + if err != nil { + return err + } + + //run if something is there + go queueManager.Work() + //schedule crons logger.Log().Info("Schedule crons") - cronManager := services.NewCronManager(currentScroll.Cronjobs, processLauncher) + cronManager := services.NewCronManager(currentScroll.Cronjobs, queueManager) err = cronManager.Init() if err != nil { @@ -188,6 +196,8 @@ func init() { ServeCommand.Flags().StringVarP(&userId, "user-id", "u", "", "Allowed user id") + ServeCommand.Flags().BoolVarP(&initScroll, "init", "", false, "Initialize the scroll if no lock file is present") + ServeCommand.Flags().BoolVarP(&ignoreVersionCheck, "ignore-version-check", "", false, "Ignore version check") ServeCommand.Flags().StringArrayVarP(&additionalEndpoints, "additional-endpoints", "", []string{}, "Additional endpoints to serve. Valid values: annotations") diff --git a/docs/docs.go b/docs/docs.go index 9c8afab..101db83 100644 --- a/docs/docs.go +++ b/docs/docs.go @@ -374,17 +374,6 @@ const docTemplate = `{ "wait": {} } }, - "ProcessCommand": { - "type": "object", - "properties": { - "commands": { - "type": "object", - "additionalProperties": { - "$ref": "#/definitions/CommandInstructionSet" - } - } - } - }, "ProcessMonitorMetrics": { "type": "object", "properties": { @@ -494,6 +483,18 @@ const docTemplate = `{ "description": "don't make this a semver, it's not allways", "type": "string" }, + "commands": { + "type": "object", + "additionalProperties": { + "$ref": "#/definitions/CommandInstructionSet" + } + }, + "cronjobs": { + "type": "array", + "items": { + "$ref": "#/definitions/domain.Cronjob" + } + }, "desc": { "type": "string" }, @@ -512,12 +513,6 @@ const docTemplate = `{ } } }, - "processes": { - "type": "object", - "additionalProperties": { - "$ref": "#/definitions/ProcessCommand" - } - }, "version": { "type": "string" } @@ -565,6 +560,20 @@ const docTemplate = `{ "ConsoleTypePlugin" ] }, + "domain.Cronjob": { + "type": "object", + "properties": { + "command": { + "type": "string" + }, + "name": { + "type": "string" + }, + "schedule": { + "type": "string" + } + } + }, "domain.Process": { "type": "object", "properties": { @@ -626,9 +635,6 @@ const docTemplate = `{ "command": { "type": "string" }, - "process": { - "type": "string" - }, "sync": { "type": "boolean" } diff --git a/docs/swagger.json b/docs/swagger.json index 8331871..302e6ab 100644 --- a/docs/swagger.json +++ b/docs/swagger.json @@ -363,17 +363,6 @@ "wait": {} } }, - "ProcessCommand": { - "type": "object", - "properties": { - "commands": { - "type": "object", - "additionalProperties": { - "$ref": "#/definitions/CommandInstructionSet" - } - } - } - }, "ProcessMonitorMetrics": { "type": "object", "properties": { @@ -483,6 +472,18 @@ "description": "don't make this a semver, it's not allways", "type": "string" }, + "commands": { + "type": "object", + "additionalProperties": { + "$ref": "#/definitions/CommandInstructionSet" + } + }, + "cronjobs": { + "type": "array", + "items": { + "$ref": "#/definitions/domain.Cronjob" + } + }, "desc": { "type": "string" }, @@ -501,12 +502,6 @@ } } }, - "processes": { - "type": "object", - "additionalProperties": { - "$ref": "#/definitions/ProcessCommand" - } - }, "version": { "type": "string" } @@ -554,6 +549,20 @@ "ConsoleTypePlugin" ] }, + "domain.Cronjob": { + "type": "object", + "properties": { + "command": { + "type": "string" + }, + "name": { + "type": "string" + }, + "schedule": { + "type": "string" + } + } + }, "domain.Process": { "type": "object", "properties": { @@ -615,9 +624,6 @@ "command": { "type": "string" }, - "process": { - "type": "string" - }, "sync": { "type": "boolean" } diff --git a/docs/swagger.yaml b/docs/swagger.yaml index 4a467c5..07e795e 100644 --- a/docs/swagger.yaml +++ b/docs/swagger.yaml @@ -40,13 +40,6 @@ definitions: type: string wait: {} type: object - ProcessCommand: - properties: - commands: - additionalProperties: - $ref: '#/definitions/CommandInstructionSet' - type: object - type: object ProcessMonitorMetrics: properties: connections: @@ -119,6 +112,14 @@ definitions: app_version: description: don't make this a semver, it's not allways type: string + commands: + additionalProperties: + $ref: '#/definitions/CommandInstructionSet' + type: object + cronjobs: + items: + $ref: '#/definitions/domain.Cronjob' + type: array desc: type: string init: @@ -131,10 +132,6 @@ definitions: type: string type: object type: object - processes: - additionalProperties: - $ref: '#/definitions/ProcessCommand' - type: object version: type: string type: object @@ -167,6 +164,15 @@ definitions: - ConsoleTypeTTY - ConsoleTypeProcess - ConsoleTypePlugin + domain.Cronjob: + properties: + command: + type: string + name: + type: string + schedule: + type: string + type: object domain.Process: properties: name: @@ -208,8 +214,6 @@ definitions: properties: command: type: string - process: - type: string sync: type: boolean type: object diff --git a/examples/htop/.scroll/scroll-lock.json b/examples/htop/.scroll/scroll-lock.json index 1591fc2..586c8c3 100755 --- a/examples/htop/.scroll/scroll-lock.json +++ b/examples/htop/.scroll/scroll-lock.json @@ -1 +1 @@ -{"statuses":{"main.start":"running"},"scroll_version":"0.0.1","scroll_name":"registry-1.docker.io/highcard/scroll-htop"} \ No newline at end of file +{"statuses":{},"scroll_version":"0.0.1","scroll_name":"registry-1.docker.io/highcard/scroll-htop"} \ No newline at end of file diff --git a/examples/htop/.scroll/scroll.yaml b/examples/htop/.scroll/scroll.yaml index 4f05487..c21c9d4 100644 --- a/examples/htop/.scroll/scroll.yaml +++ b/examples/htop/.scroll/scroll.yaml @@ -2,13 +2,11 @@ name: registry-1.docker.io/highcard/scroll-htop desc: Nginx server version: 0.0.1 app_version: latest -init: "main.start" -processes: - main: - commands: - start: - run: restart - procedures: - - mode: exec-tty - data: - - htop +init: start" +commands: + start: + run: restart + procedures: + - mode: exec-tty + data: + - htop diff --git a/examples/minecraft/.scroll/scroll-lock.json b/examples/minecraft/.scroll/scroll-lock.json index ae97a52..05a36fd 100755 --- a/examples/minecraft/.scroll/scroll-lock.json +++ b/examples/minecraft/.scroll/scroll-lock.json @@ -1 +1 @@ -{"statuses":{"main.install":"done","main.start":"running"},"scroll_version":"0.0.1","scroll_name":"registry-1.docker.io/highcard/scroll-minecraft-spigot"} \ No newline at end of file +{"statuses":{"install":"done","start":"running"},"scroll_version":"0.0.1","scroll_name":"registry-1.docker.io/highcard/scroll-minecraft-spigot"} \ No newline at end of file diff --git a/examples/minecraft/.scroll/scroll.yaml b/examples/minecraft/.scroll/scroll.yaml index 7c0fcdb..c99996c 100644 --- a/examples/minecraft/.scroll/scroll.yaml +++ b/examples/minecraft/.scroll/scroll.yaml @@ -2,39 +2,45 @@ name: registry-1.docker.io/highcard/scroll-minecraft-spigot desc: Minecraft Spigot version: 0.0.1 app_version: 1.20.4 -init: "main.start" -processes: - main: - commands: - start: - needs: [main.install] - run: restart - procedures: - - mode: exec - data: - - java - - -Xmx1024M - - -Xms1024M - - -jar - - spigot.jar - - nogui - stop: - procedures: - - mode: stdin - data: - - main.start - - stop - install: - run: once - procedures: - - mode: exec - data: - - wget - - -O - - spigot.jar - - https://s3.eu-central-1.wasabisys.com/druid-scroll-artifacts/minecraft/spigot/spigot-1.20.4.jar - - mode: exec - data: - - bash - - -c - - echo eula=true > eula.txt +init: "start" +commands: + start: + needs: [install] + run: restart + procedures: + - mode: exec + data: + - java + - -Xmx1024M + - -Xms1024M + - -jar + - spigot.jar + - nogui + stop: + procedures: + - mode: stdin + data: + - start + - stop + install: + run: once + procedures: + - mode: exec + data: + - wget + - -O + - spigot.jar + - https://s3.eu-central-1.wasabisys.com/druid-scroll-artifacts/minecraft/spigot/spigot-1.20.4.jar + - mode: exec + data: + - bash + - -c + - echo eula=true > eula.txt + restart: + procedures: + - mode: command + data: + - stop + - mode: command + data: + - start diff --git a/examples/nginx/.scroll/scroll-lock.json b/examples/nginx/.scroll/scroll-lock.json index eee857b..59adcbf 100755 --- a/examples/nginx/.scroll/scroll-lock.json +++ b/examples/nginx/.scroll/scroll-lock.json @@ -1 +1,5 @@ -{"statuses":{"main.start":"running"},"scroll_version":"0.0.1","scroll_name":"registry-1.docker.io/highcard/scroll-nginx"} \ No newline at end of file +{ + "statuses": { "start": "running" }, + "scroll_version": "0.0.1", + "scroll_name": "registry-1.docker.io/highcard/scroll-nginx" +} diff --git a/examples/nginx/.scroll/scroll.yaml b/examples/nginx/.scroll/scroll.yaml index aa38074..0d9ad7c 100644 --- a/examples/nginx/.scroll/scroll.yaml +++ b/examples/nginx/.scroll/scroll.yaml @@ -2,20 +2,18 @@ name: registry-1.docker.io/highcard/scroll-nginx desc: Nginx server version: 0.0.1 app_version: latest -init: "main.start" -processes: - main: - commands: - start: - run: restart - procedures: - - mode: exec - data: - - nginx - stop: - procedures: - - mode: exec - data: - - nginx - - -s - - stop +init: "start" +commands: + start: + run: restart + procedures: + - mode: exec + data: + - nginx + stop: + procedures: + - mode: exec + data: + - nginx + - -s + - stop diff --git a/internal/core/domain/scroll.go b/internal/core/domain/scroll.go index 41ed931..e02b62f 100644 --- a/internal/core/domain/scroll.go +++ b/internal/core/domain/scroll.go @@ -26,14 +26,14 @@ type Cronjob struct { } type File struct { - Name string `yaml:"name" json:"name"` - Desc string `yaml:"desc" json:"desc"` - Version *semver.Version `yaml:"version" json:"version"` - AppVersion string `yaml:"app_version" json:"app_version"` //don't make this a semver, it's not allways - Init string `yaml:"init" json:"init"` - Processes map[string]*ProcessCommand `yaml:"processes" json:"processes"` - Plugins map[string]map[string]string `yaml:"plugins" json:"plugins"` - Cronjobs []*Cronjob `yaml:"cronjobs" json:"cronjobs"` + Name string `yaml:"name" json:"name"` + Desc string `yaml:"desc" json:"desc"` + Version *semver.Version `yaml:"version" json:"version"` + AppVersion string `yaml:"app_version" json:"app_version"` //don't make this a semver, it's not allways + Init string `yaml:"init" json:"init"` + Commands map[string]*CommandInstructionSet `yaml:"commands" json:"commands"` + Plugins map[string]map[string]string `yaml:"plugins" json:"plugins"` + Cronjobs []*Cronjob `yaml:"cronjobs" json:"cronjobs"` } // @name ScrollFile type Scroll struct { @@ -60,10 +60,6 @@ type CommandInstructionSet struct { Run RunMode `yaml:"run,omitempty" json:"run,omitempty"` } // @name CommandInstructionSet -type ProcessCommand struct { - Commands map[string]CommandInstructionSet `yaml:"commands" json:"commands"` -} // @name ProcessCommand - var ErrScrollDoesNotExist = fmt.Errorf("scroll does not exist") func NewScroll(scrollDir string) (*Scroll, error) { @@ -103,3 +99,25 @@ func (sc *Scroll) ParseFile(file []byte) (*Scroll, error) { sc.File = f return sc, nil } + +func (sc *Scroll) Validate() error { + if sc.Name == "" { + return fmt.Errorf("scroll name is required") + } + if sc.Desc == "" { + return fmt.Errorf("scroll description is required") + } + if sc.Version == nil { + return fmt.Errorf("scroll version is required") + } + if sc.AppVersion == "" { + return fmt.Errorf("scroll app_version is required") + } + if sc.Init == "" { + return fmt.Errorf("scroll init is required") + } + if len(sc.Commands) == 0 { + return fmt.Errorf("scroll commands are required") + } + return nil +} diff --git a/internal/core/domain/scroll_lock.go b/internal/core/domain/scroll_lock.go index d67faad..2da0c8a 100644 --- a/internal/core/domain/scroll_lock.go +++ b/internal/core/domain/scroll_lock.go @@ -53,11 +53,11 @@ func WriteNewScrollLock(path string) *ScrollLock { return lock } -func (scrollLock *ScrollLock) GetStatus(process string, command string) ScrollLockStatus { - return scrollLock.Statuses[process+"."+command] +func (scrollLock *ScrollLock) GetStatus(command string) ScrollLockStatus { + return scrollLock.Statuses[command] } -func (scrollLock *ScrollLock) SetStatus(process string, command string, status ScrollLockStatus) { - scrollLock.Statuses[process+"."+command] = status +func (scrollLock *ScrollLock) SetStatus(command string, status ScrollLockStatus) { + scrollLock.Statuses[command] = status scrollLock.Write() } diff --git a/internal/core/ports/services_ports.go b/internal/core/ports/services_ports.go index 4df9e3e..915f700 100644 --- a/internal/core/ports/services_ports.go +++ b/internal/core/ports/services_ports.go @@ -25,12 +25,13 @@ type ScrollServiceInterface interface { GetCwd() string WriteNewScrollLock() *domain.ScrollLock GetLock() (*domain.ScrollLock, error) - GetCommand(cmd string, processId string) (*domain.CommandInstructionSet, error) + GetCommand(cmd string) (*domain.CommandInstructionSet, error) } type ProcedureLauchnerInterface interface { - RunNew(commandId string, processId string, changeStatus bool) error - RunProcedure(*domain.Procedure, string, string) (string, *int, error) + LaunchPlugins() error + RunProcedure(*domain.Procedure, string) (string, *int, error) + Run(cmd string, runCommandCb func(cmd string) error) error } type PluginManagerInterface interface { @@ -48,9 +49,9 @@ type LogManagerInterface interface { type ProcessManagerInterface interface { GetRunningProcesses() map[string]*domain.Process - GetRunningProcess(process string, commandName string) *domain.Process - Run(process string, commandName string, command []string, dir string) (*int, error) - RunTty(process string, comandName string, command []string, dir string) (*int, error) + GetRunningProcess(commandName string) *domain.Process + Run(commandName string, command []string, dir string) (*int, error) + RunTty(comandName string, command []string, dir string) (*int, error) WriteStdin(process *domain.Process, data string) error } @@ -94,3 +95,7 @@ type OciRegistryInterface interface { type CronManagerInterface interface { Init() } + +type QueueManagerInterface interface { + AddItem(cmd string, changeStatus bool) error +} diff --git a/internal/core/services/cron_manager.go b/internal/core/services/cron_manager.go index 6f000bc..50fb1f4 100644 --- a/internal/core/services/cron_manager.go +++ b/internal/core/services/cron_manager.go @@ -1,7 +1,6 @@ package services import ( - "strings" "time" "github.com/go-co-op/gocron" @@ -12,14 +11,14 @@ import ( ) type CronManager struct { - crons []*domain.Cronjob - processLauncher ports.ProcedureLauchnerInterface + crons []*domain.Cronjob + queueManager ports.QueueManagerInterface } -func NewCronManager(cronjobs []*domain.Cronjob, processLauncher ports.ProcedureLauchnerInterface) *CronManager { +func NewCronManager(cronjobs []*domain.Cronjob, queueManager ports.QueueManagerInterface) *CronManager { return &CronManager{ - crons: cronjobs, - processLauncher: processLauncher, + crons: cronjobs, + queueManager: queueManager, } } @@ -30,13 +29,9 @@ func (c *CronManager) Init() error { _, err := scheduler.Cron(cron.Schedule).Do(func() { logger.Log().Info("Cronjob started", zap.String("name", cron.Name)) - //parse cron.Command e.g. main.start - //split by dot - parts := strings.Split(cron.Command, ".") - process := parts[0] - command := parts[1] + //run cron.Command e.g. main.start - err := c.processLauncher.RunNew(command, process, false) + err := c.queueManager.AddItem(cron.Command, false) if err != nil { logger.Log().Error("error running cronjob", zap.String("name", cron.Name), zap.Error(err)) diff --git a/internal/core/services/process_launcher.go b/internal/core/services/process_launcher.go index 8e58dde..cb185fd 100644 --- a/internal/core/services/process_launcher.go +++ b/internal/core/services/process_launcher.go @@ -3,8 +3,6 @@ package services import ( "errors" "fmt" - "strings" - "sync" "time" "github.com/highcard-dev/daemon/internal/core/domain" @@ -21,8 +19,6 @@ type ProcedureLauncher struct { consoleManager ports.ConsoleManagerInterface logManager ports.LogManagerInterface scrollService ports.ScrollServiceInterface - commandQueue map[string]domain.ScrollLockStatus - mu sync.Mutex } func NewProcedureLauncher( @@ -40,141 +36,11 @@ func NewProcedureLauncher( consoleManager: consoleManager, logManager: logManager, scrollService: scrollService, - commandQueue: make(map[string]domain.ScrollLockStatus), } return s } -func (sc *ProcedureLauncher) SetCommandQueue(commandName string, status domain.ScrollLockStatus) { - sc.mu.Lock() - defer sc.mu.Unlock() - sc.commandQueue[commandName] = status -} - -func (sc *ProcedureLauncher) RunNew(cmd string, processId string, changeStatus bool) error { - - logger.Log().Debug("Running command", - zap.String("processId", processId), - zap.String("cmd", cmd), - zap.Bool("changeStatus", changeStatus), - ) - - command, err := sc.scrollService.GetCommand(cmd, processId) - - if err != nil { - return err - } - - name := processId + "." + cmd - - if value, ok := sc.commandQueue[name]; ok { - if value != domain.ScrollLockStatusDone { - return errors.New("command already in queue") - } - } - - sc.SetCommandQueue(name, domain.ScrollLockStatusWaiting) - - needs := command.Needs - - lock, err := sc.scrollService.GetLock() - if err != nil { - return err - } - - if changeStatus { - lock.SetStatus(processId, cmd, domain.ScrollLockStatusWaiting) - } - - status := lock.GetStatus(processId, cmd) - - //Functions that run once, should be remembered, but should only have waiting status, when the are called explicitly - if command.Run == domain.RunModeOnce { - changeStatus = true - } - - //if done and should be done once, skip - if status == domain.ScrollLockStatusDone && command.Run == domain.RunModeOnce { - sc.SetCommandQueue(name, domain.ScrollLockStatusDone) - return nil - } - - var wg sync.WaitGroup - var runError error - //check if needs are met, if not, start them, when not running, when running, wait - for _, need := range needs { - //check if is done in lockfile - process, command := utils.ParseProcessAndCommand(need) - if process == "" { - sc.SetCommandQueue(name, domain.ScrollLockStatusError) - return errors.New("invalid need " + need) - } - - // else start - wg.Add(1) - go func(process string, command string) { - defer wg.Done() - - need, err := sc.scrollService.GetCommand(command, process) - - if err != nil { - runError = err - logger.Log().Error("Error getting need", - zap.String("process", process), - zap.String("command", command), - zap.Error(err), - ) - - sc.SetCommandQueue(name, domain.ScrollLockStatusError) - return - } - - if need.Run == domain.RunModeRestart { - runError = errors.New("cannot have a need that is restart") - logger.Log().Error("Error getting need", - zap.String("process", process), - zap.String("command", command), - zap.Error(runError), - ) - - sc.SetCommandQueue(name, domain.ScrollLockStatusError) - return - } - - //Don't change status, for subtasks. Either it's the main task or a remebered subtask - err = sc.RunNew(command, process, false) - if err != nil { - runError = err - sc.SetCommandQueue(name, domain.ScrollLockStatusError) - } - }(process, command) - } - wg.Wait() - - sc.SetCommandQueue(name, domain.ScrollLockStatusRunning) - - if runError != nil { - sc.SetCommandQueue(name, domain.ScrollLockStatusError) - return runError - } - - err = sc.Run(cmd, processId, changeStatus) - if err != nil { - sc.SetCommandQueue(name, domain.ScrollLockStatusError) - return err - } - - sc.SetCommandQueue(name, domain.ScrollLockStatusDone) - - return nil - -} - -// return at first -// TODO implement multiple scroll support -// To do this, best is to loop over activescrolldir and read every scroll -// TODO: remove initCommandsIdentifiers func (sc *ProcedureLauncher) LaunchPlugins() error { go func() { for { @@ -199,26 +65,27 @@ func (sc *ProcedureLauncher) LaunchPlugins() error { return sc.pluginManager.ParseFromScroll(scroll.Plugins, string(sc.scrollService.GetScrollConfigRawYaml()), sc.scrollService.GetCwd()) } -func (sc *ProcedureLauncher) Run(cmd string, processId string, changeStatus bool) error { - - command, err := sc.scrollService.GetCommand(cmd, processId) - if err != nil { - return err - } +// I am unsure if we should support he command mode in the future as it is an antipattern for the scroll architecture, we try to solve stuff with dependencies +func (sc *ProcedureLauncher) Run(cmd string, runCommandCb func(cmd string) error) error { - lock, err := sc.scrollService.GetLock() + command, err := sc.scrollService.GetCommand(cmd) if err != nil { return err } - if changeStatus { - lock.SetStatus(processId, cmd, domain.ScrollLockStatusRunning) - } for _, proc := range command.Procedures { + + if proc.Mode == "command" { + if proc.Wait != nil { + return errors.New("command mode does not support wait") + } + err = runCommandCb(proc.Data.(string)) + return err + } + var err error var exitCode *int logger.Log().Debug("Running procedure", - zap.String("processId", processId), zap.String("cmd", cmd), zap.String("mode", proc.Mode), zap.Any("data", proc.Data), @@ -227,38 +94,30 @@ func (sc *ProcedureLauncher) Run(cmd string, processId string, changeStatus bool case int: //run in go routine and wait for x seconds go func(procedure domain.Procedure) { time.Sleep(time.Duration(wait) * time.Second) - sc.RunProcedure(&procedure, processId, cmd) + sc.RunProcedure(&procedure, cmd) }(*proc) case bool: //run in go routine maybe wait if wait { - _, exitCode, err = sc.RunProcedure(proc, processId, cmd) + _, exitCode, err = sc.RunProcedure(proc, cmd) } else { - go sc.RunProcedure(proc, processId, cmd) + go sc.RunProcedure(proc, cmd) } default: //run and wait - _, exitCode, err = sc.RunProcedure(proc, processId, cmd) + _, exitCode, err = sc.RunProcedure(proc, cmd) } if err != nil { logger.Log().Error("Error running procedure", - zap.String("processId", processId), zap.String("cmd", cmd), zap.Error(err)) - if changeStatus { - lock.SetStatus(processId, cmd, domain.ScrollLockStatusError) - } return err } if exitCode != nil && *exitCode != 0 { logger.Log().Error("Procedure ended with exit code "+fmt.Sprintf("%d", *exitCode), - zap.String("processId", processId), zap.String("cmd", cmd), zap.Int("exitCode", *exitCode), ) - if changeStatus { - lock.SetStatus(processId, cmd, domain.ScrollLockStatus(fmt.Sprintf("exit_code_%d", *exitCode))) - } return fmt.Errorf("procedure %s failed with exit code %d", proc.Mode, *exitCode) } @@ -269,18 +128,12 @@ func (sc *ProcedureLauncher) Run(cmd string, processId string, changeStatus bool } } - //restart means we are never done! - if changeStatus && command.Run != domain.RunModeRestart { - lock.SetStatus(processId, cmd, domain.ScrollLockStatusDone) - } - return nil } -func (sc *ProcedureLauncher) RunProcedure(proc *domain.Procedure, processId string, cmd string) (string, *int, error) { +func (sc *ProcedureLauncher) RunProcedure(proc *domain.Procedure, cmd string) (string, *int, error) { logger.Log().Debug("Running procedure", - zap.String("processId", processId), zap.String("cmd", cmd), zap.String("mode", proc.Mode), zap.Any("data", proc.Data), @@ -314,7 +167,6 @@ func (sc *ProcedureLauncher) RunProcedure(proc *domain.Procedure, processId stri } logger.Log().Debug("Running exec process", - zap.String("processId", processId), zap.String("cwd", processCwd), zap.Strings("instructions", instructions), ) @@ -322,9 +174,9 @@ func (sc *ProcedureLauncher) RunProcedure(proc *domain.Procedure, processId stri var exitCode *int if proc.Mode == "exec-tty" { - exitCode, err = sc.processManager.RunTty(processId, cmd, instructions, processCwd) + exitCode, err = sc.processManager.RunTty(cmd, instructions, processCwd) } else { - exitCode, err = sc.processManager.Run(processId, cmd, instructions, processCwd) + exitCode, err = sc.processManager.Run(cmd, instructions, processCwd) } return "", exitCode, err case "stdin": @@ -337,37 +189,23 @@ func (sc *ProcedureLauncher) RunProcedure(proc *domain.Procedure, processId stri if len(instructions) != 2 { return "", nil, errors.New("invalid stdin instructions") } - processAndCommandToWriteTo := instructions[0] + commandToWriteTo := instructions[0] stdtIn := instructions[1] - process1, command1 := utils.ParseProcessAndCommand(processAndCommandToWriteTo) logger.Log().Debug("Launching stdin process", - zap.String("processId", processId), zap.String("cwd", processCwd), zap.Strings("instructions", instructions), ) - process := sc.processManager.GetRunningProcess(process1, command1) + process := sc.processManager.GetRunningProcess(commandToWriteTo) if process == nil { return "", nil, errors.New("process not found") } sc.processManager.WriteStdin(process, stdtIn) - case "command": - - logger.Log().Debug("Launching stdin process", - zap.String("processId", processId), - zap.String("cwd", processCwd), - zap.String("instructions", proc.Data.(string)), - ) - - err := sc.RunNew(proc.Data.(string), processId, false) - return "", nil, err - case "scroll-switch": logger.Log().Debug("Launching scroll-switch process", - zap.String("processId", processId), zap.String("cwd", processCwd), zap.String("instructions", proc.Data.(string)), ) @@ -379,39 +217,3 @@ func (sc *ProcedureLauncher) RunProcedure(proc *domain.Procedure, processId stri } return "", nil, nil } - -func (sc *ProcedureLauncher) StartLockfile(lock *domain.ScrollLock) error { - - for processAndCommand, status := range lock.Statuses { - if status != domain.ScrollLockStatusRunning && status != domain.ScrollLockStatusWaiting && !strings.HasPrefix(string(status), "exit_code") { - logger.Log().Debug("Skipping process", zap.String("processAndCommand", processAndCommand), zap.String("status", string(status))) - continue - } - logger.Log().Info("Starting process from lockfile", zap.String("processAndCommand", processAndCommand), zap.String("status", string(status))) - process, command := utils.ParseProcessAndCommand(processAndCommand) - go sc.RunNew(command, process, true) - } - return nil -} - -func (sc *ProcedureLauncher) Initalize(lock *domain.ScrollLock) error { - scroll := sc.scrollService.GetCurrent() - - parts := strings.Split(scroll.Init, ".") - - if _, ok := lock.Statuses[scroll.Init]; ok { - //allready initialized - return nil - } - - if len(parts) != 2 { - return errors.New("invalid init command") - } - initCommands := scroll.Processes[parts[0]].Commands[parts[1]].Procedures - - if len(initCommands) > 0 { - go sc.RunNew(parts[1], parts[0], true) - lock.Write() - } - return nil -} diff --git a/internal/core/services/process_manager.go b/internal/core/services/process_manager.go index 7580bfc..25b3eac 100644 --- a/internal/core/services/process_manager.go +++ b/internal/core/services/process_manager.go @@ -30,10 +30,10 @@ func NewProcessManager(logManager ports.LogManagerInterface, consoleManager port } } -func (po *ProcessManager) RunTty(processName string, commandName string, command []string, cwd string) (*int, error) { +func (po *ProcessManager) RunTty(commandName string, command []string, cwd string) (*int, error) { process := domain.Process{ - Name: processName, + Name: commandName, Type: "process_tty", } @@ -52,7 +52,7 @@ func (po *ProcessManager) RunTty(processName string, commandName string, command process.Cmd = exec.Command(name, args...) process.Cmd.Dir = cwd - logger.Log().Info("Starting tty process", zap.String("processName", processName), zap.String("name", name), zap.Strings("args", args), zap.String("dir", cwd)) + logger.Log().Info("Starting tty process", zap.String("commandName", commandName), zap.String("name", name), zap.Strings("args", args), zap.String("dir", cwd)) out, err := pty.Start(process.Cmd) if err != nil { @@ -62,33 +62,34 @@ func (po *ProcessManager) RunTty(processName string, commandName string, command process.StdIn = out //self register process - po.AddRunningProcess(processName, commandName, &process) + po.AddRunningProcess(commandName, &process) - processCommandName := fmt.Sprintf("%s.%s", processName, commandName) //add process for monitoring - po.processMonitor.AddProcess(int32(process.Cmd.Process.Pid), processCommandName) + po.processMonitor.AddProcess(int32(process.Cmd.Process.Pid), commandName) //slight difference to normal process, as we only attach after the process has started //add console output - processName = fmt.Sprintf("process_tty.%s.%s", processName, commandName) + prefixedProcessCommandName := fmt.Sprintf("process_tty.%s", commandName) - po.consoleManager.AddConsoleWithIoReader(processName, domain.ConsoleTypeTTY, "stdin", out) + po.consoleManager.AddConsoleWithIoReader(prefixedProcessCommandName, domain.ConsoleTypeTTY, "stdin", out) //reset periodically process.Cmd.Wait() - po.processMonitor.RemoveProcess(processCommandName) - po.RemoveProcess(processName, commandName) + po.processMonitor.RemoveProcess(commandName) + po.RemoveProcess(commandName) + // Wait for goroutine to print everything (watchdog closes stdin) exitCode := process.Cmd.ProcessState.ExitCode() + po.consoleManager.MarkExited(prefixedProcessCommandName, exitCode) return &exitCode, nil } -func (po *ProcessManager) Run(processName string, commandName string, command []string, dir string) (*int, error) { +func (po *ProcessManager) Run(commandName string, command []string, dir string) (*int, error) { process := domain.Process{ - Name: processName, + Name: commandName, Type: "process", } //Todo, add processmonitoring explicitly here @@ -102,7 +103,7 @@ func (po *ProcessManager) Run(processName string, commandName string, command [] name, args := command[0], command[1:] logger.Log().Debug("Launch", - zap.String("processName", processName), + zap.String("commandName", commandName), zap.String("name", name), zap.Strings("args", args), zap.String("dir", dir), @@ -132,7 +133,7 @@ func (po *ProcessManager) Run(processName string, commandName string, command [] process.StdIn = stdin - prefixedProcessCommandName := fmt.Sprintf("process.%s.%s", processName, commandName) + prefixedProcessCommandName := fmt.Sprintf("process.%s", commandName) po.consoleManager.AddConsoleWithIoReader(prefixedProcessCommandName, domain.ConsoleTypeProcess, "stdin", stdoutReader) po.consoleManager.AddConsoleWithIoReader(prefixedProcessCommandName, domain.ConsoleTypeProcess, "stdin", stderrReader) @@ -147,11 +148,10 @@ func (po *ProcessManager) Run(processName string, commandName string, command [] } //self register process - po.AddRunningProcess(processName, commandName, &process) + po.AddRunningProcess(commandName, &process) - processCommandName := fmt.Sprintf("%s.%s", processName, commandName) //add process for monitoring - po.processMonitor.AddProcess(int32(process.Cmd.Process.Pid), processCommandName) + po.processMonitor.AddProcess(int32(process.Cmd.Process.Pid), commandName) //add console output @@ -169,8 +169,8 @@ func (po *ProcessManager) Run(processName string, commandName string, command [] <-cmdCtx.Done() - po.processMonitor.RemoveProcess(processCommandName) - po.RemoveProcess(processName, commandName) + po.processMonitor.RemoveProcess(commandName) + po.RemoveProcess(commandName) // Wait for goroutine to print everything (watchdog closes stdin) exitCode := process.Cmd.ProcessState.ExitCode() po.consoleManager.MarkExited(prefixedProcessCommandName, exitCode) @@ -201,20 +201,17 @@ func (pm *ProcessManager) GetRunningProcesses() map[string]*domain.Process { return pm.runningProcesses } -func (pm *ProcessManager) AddRunningProcess(processName string, commandName string, process *domain.Process) { - name := fmt.Sprintf("%s.%s", processName, commandName) - pm.runningProcesses[name] = process +func (pm *ProcessManager) AddRunningProcess(commandName string, process *domain.Process) { + pm.runningProcesses[commandName] = process } -func (pm *ProcessManager) GetRunningProcess(processName string, commandName string) *domain.Process { - name := fmt.Sprintf("%s.%s", processName, commandName) - if process, ok := pm.GetRunningProcesses()[name]; ok { +func (pm *ProcessManager) GetRunningProcess(commandName string) *domain.Process { + if process, ok := pm.GetRunningProcesses()[commandName]; ok { return process } return nil } -func (pm *ProcessManager) RemoveProcess(processName string, commandName string) { - name := fmt.Sprintf("%s.%s", processName, commandName) - delete(pm.runningProcesses, name) +func (pm *ProcessManager) RemoveProcess(commandName string) { + delete(pm.runningProcesses, commandName) } diff --git a/internal/core/services/queue_manager.go b/internal/core/services/queue_manager.go new file mode 100644 index 0000000..9642796 --- /dev/null +++ b/internal/core/services/queue_manager.go @@ -0,0 +1,309 @@ +package services + +import ( + "fmt" + "strings" + "sync" + + "github.com/highcard-dev/daemon/internal/core/domain" + "github.com/highcard-dev/daemon/internal/core/ports" + "github.com/highcard-dev/daemon/internal/utils/logger" + "go.uber.org/zap" +) + +var ErrAlreadyInQueue = fmt.Errorf("command is already in queue") +var ErrCommandNotFound = fmt.Errorf("command not found") +var ErrCommandDoneOnce = fmt.Errorf("command is already done and has run mode once") + +type QueueItem struct { + status domain.ScrollLockStatus + changeStatus bool +} + +type QueueManager struct { + mu sync.Mutex + runQueueMu sync.Mutex + scrollService ports.ScrollServiceInterface + processLauncher ports.ProcedureLauchnerInterface + commandQueue map[string]*QueueItem + taskChan chan string + taskDoneChan chan struct{} + shutdownChan chan struct{} + notifierChan []chan []string +} + +func NewQueueManager( + scrollService ports.ScrollServiceInterface, + processLauncher ports.ProcedureLauchnerInterface, +) *QueueManager { + return &QueueManager{ + scrollService: scrollService, + processLauncher: processLauncher, + commandQueue: make(map[string]*QueueItem), + taskChan: make(chan string), + taskDoneChan: make(chan struct{}), + shutdownChan: make(chan struct{}), + notifierChan: make([]chan []string, 0), + } +} + +func (sc *QueueManager) workItem(cmd string) error { + + queueItem := sc.GetQueueItem(cmd) + if queueItem == nil { + return fmt.Errorf("command %s not found", cmd) + } + changeStatus := queueItem.changeStatus + + logger.Log().Debug("Running command", + zap.String("cmd", cmd), + zap.Bool("changeStatus", changeStatus), + ) + + return sc.processLauncher.Run(cmd, func(cmd string) error { + return sc.AddItem(cmd, changeStatus) + }) +} + +func (sc *QueueManager) notify() { + queuedCommands := make([]string, 0) + + for cmd, _ := range sc.commandQueue { + if sc.getStatus(cmd) != domain.ScrollLockStatusDone && sc.getStatus(cmd) != domain.ScrollLockStatusError { + queuedCommands = append(queuedCommands, cmd) + } + } + + for _, notifier := range sc.notifierChan { + select { + case notifier <- queuedCommands: + // Successfully sent queuedCommands to the notifier channel + default: + // The notifier channel is not ready to receive, handle accordingly + // For example, log a warning or skip this notifier + } + } +} + +func (sc *QueueManager) AddItem(cmd string, changeStatus bool) error { + sc.mu.Lock() + defer sc.mu.Unlock() + + logger.Log().Debug("Running command", + zap.String("cmd", cmd), + ) + + command, err := sc.scrollService.GetCommand(cmd) + + if err != nil { + return err + } + + //Functions that run once, should be remembered, but should only have waiting status, when the are called explicitly + if command.Run == domain.RunModeOnce { + changeStatus = true + } + + if value, ok := sc.commandQueue[cmd]; ok { + + if value.status != domain.ScrollLockStatusDone && value.status != domain.ScrollLockStatusError { + return ErrAlreadyInQueue + } + + if value.status == domain.ScrollLockStatusDone && command.Run == domain.RunModeOnce { + return ErrCommandDoneOnce + } + } + + sc.commandQueue[cmd] = &QueueItem{ + status: domain.ScrollLockStatusWaiting, + changeStatus: changeStatus, + } + sc.taskChan <- cmd + + return nil +} + +func (sc *QueueManager) QueueLockFile() error { + lock, err := sc.scrollService.GetLock() + + if err != nil { + return err + } + for cmd, status := range lock.Statuses { + + //convert legacy command names + _, err := sc.scrollService.GetCommand(cmd) + if err != nil { + + parts := strings.Split(cmd, ".") + if len(parts) > 1 { + cmd = parts[1] + } else { + return err + } + + _, err = sc.scrollService.GetCommand(cmd) + if err != nil { + return err + } + } + + sc.commandQueue[cmd] = &QueueItem{ + status: status, + changeStatus: true, + } + } + + return nil +} + +func (sc *QueueManager) Work() { + + for { + select { + case <-sc.taskChan: + go (func() { + sc.RunQueue() + sc.notify() + })() + case <-sc.taskDoneChan: + go (func() { + sc.RunQueue() + sc.notify() + })() + case <-sc.shutdownChan: + //todo cleanup + return + } + } +} + +func (sc *QueueManager) RunQueue() { + sc.runQueueMu.Lock() + defer sc.runQueueMu.Unlock() + + for cmd, item := range sc.commandQueue { + + //if already running, skip + if sc.getStatus(cmd) == domain.ScrollLockStatusRunning { + continue + } + + command, err := sc.scrollService.GetCommand(cmd) + if err != nil { + logger.Log().Error("Error getting command", + zap.String("command", cmd), + zap.Error(err), + ) + continue + } + + //if run Mode is restart, we need to run it again + if (sc.getStatus(cmd) == domain.ScrollLockStatusError || sc.getStatus(cmd) == domain.ScrollLockStatusDone) && command.Run != domain.RunModeRestart { + continue + } + + dependencies := command.Needs + dependenciesReady := true + for _, dep := range dependencies { + _, ok := sc.commandQueue[dep] + //if item not in queue, add it and + if !ok { + dependenciesReady = false + sc.AddItem(dep, item.changeStatus) + continue + } + + if sc.getStatus(dep) != domain.ScrollLockStatusDone { + dependenciesReady = false + continue + } + } + + if dependenciesReady { + //we only run one process at a time, this is not optimal, but it is simple + sc.setStatus(cmd, domain.ScrollLockStatusRunning, item.changeStatus) + go func(c string, i *QueueItem) { + + err := sc.workItem(c) + if err != nil { + sc.setStatus(c, domain.ScrollLockStatusError, i.changeStatus) + logger.Log().Error("Error running command", zap.String("command", c), zap.Error(err)) + sc.taskDoneChan <- struct{}{} + return + } + + //restart means we are never done! + if i.changeStatus && command.Run != domain.RunModeRestart { + sc.setStatus(c, domain.ScrollLockStatusDone, true) + } else { + if command.Run == domain.RunModeRestart { + sc.setStatus(c, domain.ScrollLockStatusWaiting, false) + } else { + sc.setStatus(c, domain.ScrollLockStatusDone, false) + } + } + sc.taskDoneChan <- struct{}{} + }(cmd, item) + } + } +} + +func (sc *QueueManager) Shutdown() { + sc.shutdownChan <- struct{}{} +} + +func (sc *QueueManager) WaitUntilEmpty() { + notifier := make(chan []string) + sc.notifierChan = append(sc.notifierChan, notifier) + + for { + cmds := <-notifier + if len(cmds) == 0 { + // remove notifier + for i, n := range sc.notifierChan { + if n == notifier { + sc.notifierChan = append(sc.notifierChan[:i], sc.notifierChan[i+1:]...) + break + } + } + return + } + } +} + +func (sc *QueueManager) GetQueueItem(cmd string) *QueueItem { + sc.mu.Lock() + defer sc.mu.Unlock() + + if value, ok := sc.commandQueue[cmd]; ok { + return value + } + + return nil +} + +func (sc *QueueManager) getStatus(cmd string) domain.ScrollLockStatus { + sc.mu.Lock() + defer sc.mu.Unlock() + if value, ok := sc.commandQueue[cmd]; ok { + return value.status + } + return domain.ScrollLockStatusDone +} + +func (sc *QueueManager) setStatus(cmd string, status domain.ScrollLockStatus, writeLock bool) { + sc.mu.Lock() + defer sc.mu.Unlock() + if value, ok := sc.commandQueue[cmd]; ok { + value.status = status + } + if writeLock { + lock, err := sc.scrollService.GetLock() + if err != nil { + return + } + lock.SetStatus(cmd, status) + } +} diff --git a/internal/core/services/scroll_service.go b/internal/core/services/scroll_service.go index ad68490..71b796a 100644 --- a/internal/core/services/scroll_service.go +++ b/internal/core/services/scroll_service.go @@ -259,8 +259,7 @@ func (sc *ScrollService) clearInvalidLockfileStatuses() error { } for statusCommand := range sc.lock.Statuses { - process, command := utils.ParseProcessAndCommand(statusCommand) - _, err := sc.GetCommand(command, process) + _, err := sc.GetCommand(statusCommand) if err != nil { delete(sc.lock.Statuses, statusCommand) logger.Log().Info("Removed invalid status from lockfile", zap.String("status", statusCommand)) @@ -269,16 +268,12 @@ func (sc *ScrollService) clearInvalidLockfileStatuses() error { return sc.lock.Write() } -func (sc *ScrollService) GetCommand(cmd string, processId string) (*domain.CommandInstructionSet, error) { +func (sc *ScrollService) GetCommand(cmd string) (*domain.CommandInstructionSet, error) { scroll := sc.GetFile() //check if we can accually do it before we start - if ps, ok := scroll.Processes[processId]; ok { - cmds, ok := ps.Commands[cmd] - if !ok { - return nil, errors.New("command " + cmd + " not found") - } - return &cmds, nil + if cmds, ok := scroll.Commands[cmd]; ok { + return cmds, nil } else { - return nil, errors.New("process " + processId + " not found") + return nil, errors.New("command " + cmd + " not found") } } diff --git a/internal/handler/scroll_handler.go b/internal/handler/scroll_handler.go index e22747f..cf5e704 100644 --- a/internal/handler/scroll_handler.go +++ b/internal/handler/scroll_handler.go @@ -1,8 +1,6 @@ package handler import ( - "strings" - "github.com/gofiber/fiber/v2" "github.com/highcard-dev/daemon/internal/core/domain" "github.com/highcard-dev/daemon/internal/core/ports" @@ -14,10 +12,10 @@ type ScrollHandler struct { ScrollService ports.ScrollServiceInterface PluginManager ports.PluginManagerInterface ProcessLauncher ports.ProcedureLauchnerInterface + QueueManager ports.QueueManagerInterface } type StartScrollRequestBody struct { - ProcessId string `json:"process"` CommandId string `json:"command"` Sync bool `json:"sync"` } @@ -29,8 +27,8 @@ type StartProcedureRequestBody struct { Sync bool `json:"sync"` } -func NewScrollHandler(scrollService ports.ScrollServiceInterface, pluginManager ports.PluginManagerInterface, processLauncher ports.ProcedureLauchnerInterface) *ScrollHandler { - return &ScrollHandler{ScrollService: scrollService, PluginManager: pluginManager, ProcessLauncher: processLauncher} +func NewScrollHandler(scrollService ports.ScrollServiceInterface, pluginManager ports.PluginManagerInterface, processLauncher ports.ProcedureLauchnerInterface, queueManager ports.QueueManagerInterface) *ScrollHandler { + return &ScrollHandler{ScrollService: scrollService, PluginManager: pluginManager, ProcessLauncher: processLauncher, QueueManager: queueManager} } // @Summary Get current scroll @@ -65,7 +63,7 @@ func (sl ScrollHandler) RunCommand(c *fiber.Ctx) error { } if requestBody.Sync { - err = sl.ProcessLauncher.RunNew(requestBody.CommandId, requestBody.ProcessId, true) + err = sl.QueueManager.AddItem(requestBody.CommandId, true) if err != nil { logger.Log().Error("Error running command (sync)", zap.Error(err)) return c.SendStatus(500) @@ -73,7 +71,7 @@ func (sl ScrollHandler) RunCommand(c *fiber.Ctx) error { return c.SendStatus(200) } else { go func() { - err = sl.ProcessLauncher.RunNew(requestBody.CommandId, requestBody.ProcessId, true) + err = sl.QueueManager.AddItem(requestBody.CommandId, true) if err != nil { logger.Log().Error("Error running command (async)", zap.Error(err)) } @@ -126,23 +124,20 @@ func (sl ScrollHandler) RunProcedure(c *fiber.Ctx) error { } } - parts := strings.Split(requestBody.Process, ".") + command := requestBody.Process + _, err = sl.ScrollService.GetCommand(command) - if len(parts) != 2 { - if procedure.IsInternalMode() { - c.SendString("Invalid process") - return c.SendStatus(400) - } else { - parts = []string{requestBody.Process, ""} - } + if err != nil { + c.SendString("Command not found") + return c.SendStatus(400) } if !requestBody.Sync { - go sl.ProcessLauncher.RunProcedure(&procedure, parts[0], parts[1]) + go sl.ProcessLauncher.RunProcedure(&procedure, command) return c.SendStatus(201) } else { - res, _, err := sl.ProcessLauncher.RunProcedure(&procedure, parts[0], parts[1]) + res, _, err := sl.ProcessLauncher.RunProcedure(&procedure, command) if err != nil { c.SendString(err.Error()) return c.SendStatus(400) diff --git a/internal/signals/process_shutdown.go b/internal/signals/process_shutdown.go index f973c55..b28c079 100644 --- a/internal/signals/process_shutdown.go +++ b/internal/signals/process_shutdown.go @@ -4,7 +4,6 @@ import ( "fmt" "os" "os/signal" - "strings" "syscall" "time" @@ -15,7 +14,7 @@ import ( "go.uber.org/zap" ) -func SetupSignals(processLauncher ports.ProcedureLauchnerInterface, processManager ports.ProcessManagerInterface, app *fiber.App, waitSeconds int) { +func SetupSignals(queueManager ports.QueueManagerInterface, processManager ports.ProcessManagerInterface, app *fiber.App, waitSeconds int) { sigc := make(chan os.Signal, 1) signal.Notify(sigc, syscall.SIGHUP, @@ -37,11 +36,11 @@ func SetupSignals(processLauncher ports.ProcedureLauchnerInterface, processManag logger.Log().Info("Received shudown signal", zap.String("signal", s.String())) - GracefulShutdown(processLauncher, processManager, app, waitSeconds) + GracefulShutdown(queueManager, processManager, app, waitSeconds) }() } -func GracefulShutdown(processLauncher ports.ProcedureLauchnerInterface, processManager ports.ProcessManagerInterface, app *fiber.App, waitSeconds int) { +func GracefulShutdown(queueManager ports.QueueManagerInterface, processManager ports.ProcessManagerInterface, app *fiber.App, waitSeconds int) { go func() { for { @@ -58,10 +57,7 @@ func GracefulShutdown(processLauncher ports.ProcedureLauchnerInterface, processM }() logger.Log().Info("Stopping all processes by defined routines") - for processName := range processManager.GetRunningProcesses() { - parts := strings.Split(processName, ".") - go processLauncher.RunNew("stop", parts[0], false) //TODO use stop types instead of name - } + go queueManager.AddItem("stop", false) //TODO use stop types instead of name logger.Log().Info(fmt.Sprintf("Waiting for %d seconds...", waitSeconds)) <-time.After(time.Minute) diff --git a/internal/utils/misc.go b/internal/utils/misc.go index fb25dc4..5e95deb 100644 --- a/internal/utils/misc.go +++ b/internal/utils/misc.go @@ -22,20 +22,6 @@ func SplitArtifact(url string) (string, string) { return repo, tag } -func ParseProcessAndCommand(processAndCommand string) (string, string) { - parts := strings.Split(processAndCommand, ".") - - if len(parts) == 1 { - return "main", processAndCommand - } - - if len(parts) != 2 { - return "", "" - } - process, command := parts[0], parts[1] - return process, command -} - func InterfaceToStringSlice(data interface{}) ([]string, error) { instructionsRaw, ok := data.([]interface{}) diff --git a/test/container/minecraft_test.go b/test/container/minecraft_test.go index 4ae49ab..4fdc31f 100644 --- a/test/container/minecraft_test.go +++ b/test/container/minecraft_test.go @@ -60,6 +60,8 @@ func TestContainerMinecraft(t *testing.T) { processManager := services.NewProcessManager(logManager, consoleManager, processMonitor) procedureLauncher := services.NewProcedureLauncher(ociRegistryMock, processManager, pluginManager, consoleManager, logManager, scrollService) + queueManager := services.NewQueueManager(scrollService, procedureLauncher) + t.Run("Launch real app from examples", func(t *testing.T) { scrollService.WriteNewScrollLock() @@ -84,7 +86,7 @@ func TestContainerMinecraft(t *testing.T) { if err == nil { println("Connected to server after", time.Since(now).String()) conn.Close() - err = procedureLauncher.RunNew("stop", "main", false) + err = queueManager.AddItem("stop", false) if err != nil { t.Error(err) doneConnecting <- err @@ -111,7 +113,7 @@ func TestContainerMinecraft(t *testing.T) { doneStarting <- errors.New("Timeout Starting") return case <-tick: - err := procedureLauncher.RunNew("start", "main", false) + err := queueManager.AddItem("start", false) if err != nil { t.Error(err) doneStarting <- err diff --git a/test/container/nginx_test.go b/test/container/nginx_test.go index 9956020..570dd2e 100644 --- a/test/container/nginx_test.go +++ b/test/container/nginx_test.go @@ -56,13 +56,14 @@ func TestContainerNginx(t *testing.T) { processMonitor := test_utils.GetMockedProcessMonitor(ctrl) processManager := services.NewProcessManager(logManager, consoleManager, processMonitor) procedureLauncher := services.NewProcedureLauncher(ociRegistryMock, processManager, pluginManager, consoleManager, logManager, scrollService) + queueManager := services.NewQueueManager(scrollService, procedureLauncher) t.Run("Launch real app from examples", func(t *testing.T) { scrollService.WriteNewScrollLock() scrollService.Bootstrap(false) - err := procedureLauncher.RunNew("start", "main", false) + err := queueManager.AddItem("start", false) if err != nil { t.Error(err) } diff --git a/test/integration/example_test.go b/test/integration/example_test.go index 25139a9..3d91541 100644 --- a/test/integration/example_test.go +++ b/test/integration/example_test.go @@ -17,10 +17,11 @@ import ( ) type ServiceConfig struct { - ServiceName string - ExamplePath string - TestAddress string - TestName string + ServiceName string + ExamplePath string + TestAddress string + TestName string + LockFileStatus []string } func TestExamples(t *testing.T) { @@ -30,16 +31,18 @@ func TestExamples(t *testing.T) { configs := []ServiceConfig{ { - ServiceName: "minecraft", - ExamplePath: "../../examples/minecraft/.scroll/scroll.yaml", - TestAddress: "localhost:25565", - TestName: "Minecraft", + ServiceName: "minecraft", + ExamplePath: "../../examples/minecraft/.scroll/scroll.yaml", + TestAddress: "localhost:25565", + TestName: "Minecraft", + LockFileStatus: []string{"start", "install"}, }, { - ServiceName: "nginx", - ExamplePath: "../../examples/nginx/.scroll/scroll.yaml", - TestAddress: "localhost:80", - TestName: "Nginx", + ServiceName: "nginx", + ExamplePath: "../../examples/nginx/.scroll/scroll.yaml", + TestAddress: "localhost:80", + TestName: "Nginx", + LockFileStatus: []string{"start"}, }, // Add more services here } @@ -85,6 +88,9 @@ func TestExamples(t *testing.T) { processMonitor := test_utils.GetMockedProcessMonitor(ctrl) processManager := services.NewProcessManager(logManager, consoleManager, processMonitor) procedureLauncher := services.NewProcedureLauncher(ociRegistryMock, processManager, pluginManager, consoleManager, logManager, scrollService) + queueManager := services.NewQueueManager(scrollService, procedureLauncher) + + go queueManager.Work() scrollService.WriteNewScrollLock() scrollService.Bootstrap(false) @@ -110,7 +116,7 @@ func TestExamples(t *testing.T) { if err == nil { println("Connected to server after", time.Since(now).String()) conn.Close() - err = procedureLauncher.RunNew("stop", "main", false) + err = queueManager.AddItem("stop", false) if err != nil { t.Error(err) doneConnecting <- err @@ -130,6 +136,12 @@ func TestExamples(t *testing.T) { timeout := time.After(4 * time.Minute) tick := time.Tick(1 * time.Second) + err := queueManager.AddItem("start", true) + if err != nil { + doneStarting <- err + return + } + for { select { case <-timeout: @@ -137,7 +149,6 @@ func TestExamples(t *testing.T) { doneStarting <- errors.New("Timeout Starting") return case <-tick: - err := procedureLauncher.RunNew("start", "main", false) // If we are not testing a server, we can end the test here if config.TestAddress == "" { @@ -157,6 +168,26 @@ func TestExamples(t *testing.T) { if err != nil { t.Error("Failed to test to server: ", err) } + + lock, err := scrollService.GetLock() + + if err != nil { + t.Error(err) + return + + } + + if len(lock.Statuses) != len(config.LockFileStatus) { + t.Errorf("Lock file statuses count mismatch, expected: %d, got: %d", len(config.LockFileStatus), len(lock.Statuses)) + } + + for _, status := range config.LockFileStatus { + if _, ok := lock.Statuses[status]; !ok { + t.Errorf( + "Lock file status %s not found, expected: %v, got: %v", status, config.LockFileStatus, lock.Statuses, + ) + } + } }) } } diff --git a/test/mock/services.go b/test/mock/services.go index 571505d..3930feb 100644 --- a/test/mock/services.go +++ b/test/mock/services.go @@ -113,18 +113,18 @@ func (m *MockScrollServiceInterface) EXPECT() *MockScrollServiceInterfaceMockRec } // GetCommand mocks base method. -func (m *MockScrollServiceInterface) GetCommand(cmd, processId string) (*domain.CommandInstructionSet, error) { +func (m *MockScrollServiceInterface) GetCommand(cmd string) (*domain.CommandInstructionSet, error) { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "GetCommand", cmd, processId) + ret := m.ctrl.Call(m, "GetCommand", cmd) ret0, _ := ret[0].(*domain.CommandInstructionSet) ret1, _ := ret[1].(error) return ret0, ret1 } // GetCommand indicates an expected call of GetCommand. -func (mr *MockScrollServiceInterfaceMockRecorder) GetCommand(cmd, processId any) *gomock.Call { +func (mr *MockScrollServiceInterfaceMockRecorder) GetCommand(cmd any) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetCommand", reflect.TypeOf((*MockScrollServiceInterface)(nil).GetCommand), cmd, processId) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetCommand", reflect.TypeOf((*MockScrollServiceInterface)(nil).GetCommand), cmd) } // GetCurrent mocks base method. @@ -249,24 +249,38 @@ func (m *MockProcedureLauchnerInterface) EXPECT() *MockProcedureLauchnerInterfac return m.recorder } -// RunNew mocks base method. -func (m *MockProcedureLauchnerInterface) RunNew(commandId, processId string, changeStatus bool) error { +// LaunchPlugins mocks base method. +func (m *MockProcedureLauchnerInterface) LaunchPlugins() error { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "RunNew", commandId, processId, changeStatus) + ret := m.ctrl.Call(m, "LaunchPlugins") ret0, _ := ret[0].(error) return ret0 } -// RunNew indicates an expected call of RunNew. -func (mr *MockProcedureLauchnerInterfaceMockRecorder) RunNew(commandId, processId, changeStatus any) *gomock.Call { +// LaunchPlugins indicates an expected call of LaunchPlugins. +func (mr *MockProcedureLauchnerInterfaceMockRecorder) LaunchPlugins() *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "RunNew", reflect.TypeOf((*MockProcedureLauchnerInterface)(nil).RunNew), commandId, processId, changeStatus) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "LaunchPlugins", reflect.TypeOf((*MockProcedureLauchnerInterface)(nil).LaunchPlugins)) +} + +// Run mocks base method. +func (m *MockProcedureLauchnerInterface) Run(cmd string, runCommandCb func(string) error) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Run", cmd, runCommandCb) + ret0, _ := ret[0].(error) + return ret0 +} + +// Run indicates an expected call of Run. +func (mr *MockProcedureLauchnerInterfaceMockRecorder) Run(cmd, runCommandCb any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Run", reflect.TypeOf((*MockProcedureLauchnerInterface)(nil).Run), cmd, runCommandCb) } // RunProcedure mocks base method. -func (m *MockProcedureLauchnerInterface) RunProcedure(arg0 *domain.Procedure, arg1, arg2 string) (string, *int, error) { +func (m *MockProcedureLauchnerInterface) RunProcedure(arg0 *domain.Procedure, arg1 string) (string, *int, error) { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "RunProcedure", arg0, arg1, arg2) + ret := m.ctrl.Call(m, "RunProcedure", arg0, arg1) ret0, _ := ret[0].(string) ret1, _ := ret[1].(*int) ret2, _ := ret[2].(error) @@ -274,9 +288,9 @@ func (m *MockProcedureLauchnerInterface) RunProcedure(arg0 *domain.Procedure, ar } // RunProcedure indicates an expected call of RunProcedure. -func (mr *MockProcedureLauchnerInterfaceMockRecorder) RunProcedure(arg0, arg1, arg2 any) *gomock.Call { +func (mr *MockProcedureLauchnerInterfaceMockRecorder) RunProcedure(arg0, arg1 any) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "RunProcedure", reflect.TypeOf((*MockProcedureLauchnerInterface)(nil).RunProcedure), arg0, arg1, arg2) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "RunProcedure", reflect.TypeOf((*MockProcedureLauchnerInterface)(nil).RunProcedure), arg0, arg1) } // MockPluginManagerInterface is a mock of PluginManagerInterface interface. @@ -446,17 +460,17 @@ func (m *MockProcessManagerInterface) EXPECT() *MockProcessManagerInterfaceMockR } // GetRunningProcess mocks base method. -func (m *MockProcessManagerInterface) GetRunningProcess(process, commandName string) *domain.Process { +func (m *MockProcessManagerInterface) GetRunningProcess(commandName string) *domain.Process { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "GetRunningProcess", process, commandName) + ret := m.ctrl.Call(m, "GetRunningProcess", commandName) ret0, _ := ret[0].(*domain.Process) return ret0 } // GetRunningProcess indicates an expected call of GetRunningProcess. -func (mr *MockProcessManagerInterfaceMockRecorder) GetRunningProcess(process, commandName any) *gomock.Call { +func (mr *MockProcessManagerInterfaceMockRecorder) GetRunningProcess(commandName any) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetRunningProcess", reflect.TypeOf((*MockProcessManagerInterface)(nil).GetRunningProcess), process, commandName) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetRunningProcess", reflect.TypeOf((*MockProcessManagerInterface)(nil).GetRunningProcess), commandName) } // GetRunningProcesses mocks base method. @@ -474,33 +488,33 @@ func (mr *MockProcessManagerInterfaceMockRecorder) GetRunningProcesses() *gomock } // Run mocks base method. -func (m *MockProcessManagerInterface) Run(process, commandName string, command []string, dir string) (*int, error) { +func (m *MockProcessManagerInterface) Run(commandName string, command []string, dir string) (*int, error) { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "Run", process, commandName, command, dir) + ret := m.ctrl.Call(m, "Run", commandName, command, dir) ret0, _ := ret[0].(*int) ret1, _ := ret[1].(error) return ret0, ret1 } // Run indicates an expected call of Run. -func (mr *MockProcessManagerInterfaceMockRecorder) Run(process, commandName, command, dir any) *gomock.Call { +func (mr *MockProcessManagerInterfaceMockRecorder) Run(commandName, command, dir any) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Run", reflect.TypeOf((*MockProcessManagerInterface)(nil).Run), process, commandName, command, dir) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Run", reflect.TypeOf((*MockProcessManagerInterface)(nil).Run), commandName, command, dir) } // RunTty mocks base method. -func (m *MockProcessManagerInterface) RunTty(process, comandName string, command []string, dir string) (*int, error) { +func (m *MockProcessManagerInterface) RunTty(comandName string, command []string, dir string) (*int, error) { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "RunTty", process, comandName, command, dir) + ret := m.ctrl.Call(m, "RunTty", comandName, command, dir) ret0, _ := ret[0].(*int) ret1, _ := ret[1].(error) return ret0, ret1 } // RunTty indicates an expected call of RunTty. -func (mr *MockProcessManagerInterfaceMockRecorder) RunTty(process, comandName, command, dir any) *gomock.Call { +func (mr *MockProcessManagerInterfaceMockRecorder) RunTty(comandName, command, dir any) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "RunTty", reflect.TypeOf((*MockProcessManagerInterface)(nil).RunTty), process, comandName, command, dir) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "RunTty", reflect.TypeOf((*MockProcessManagerInterface)(nil).RunTty), comandName, command, dir) } // WriteStdin mocks base method. @@ -938,3 +952,75 @@ func (mr *MockOciRegistryInterfaceMockRecorder) PushMeta(folder, repo any) *gomo mr.mock.ctrl.T.Helper() return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "PushMeta", reflect.TypeOf((*MockOciRegistryInterface)(nil).PushMeta), folder, repo) } + +// MockCronManagerInterface is a mock of CronManagerInterface interface. +type MockCronManagerInterface struct { + ctrl *gomock.Controller + recorder *MockCronManagerInterfaceMockRecorder +} + +// MockCronManagerInterfaceMockRecorder is the mock recorder for MockCronManagerInterface. +type MockCronManagerInterfaceMockRecorder struct { + mock *MockCronManagerInterface +} + +// NewMockCronManagerInterface creates a new mock instance. +func NewMockCronManagerInterface(ctrl *gomock.Controller) *MockCronManagerInterface { + mock := &MockCronManagerInterface{ctrl: ctrl} + mock.recorder = &MockCronManagerInterfaceMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use. +func (m *MockCronManagerInterface) EXPECT() *MockCronManagerInterfaceMockRecorder { + return m.recorder +} + +// Init mocks base method. +func (m *MockCronManagerInterface) Init() { + m.ctrl.T.Helper() + m.ctrl.Call(m, "Init") +} + +// Init indicates an expected call of Init. +func (mr *MockCronManagerInterfaceMockRecorder) Init() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Init", reflect.TypeOf((*MockCronManagerInterface)(nil).Init)) +} + +// MockQueueManagerInterface is a mock of QueueManagerInterface interface. +type MockQueueManagerInterface struct { + ctrl *gomock.Controller + recorder *MockQueueManagerInterfaceMockRecorder +} + +// MockQueueManagerInterfaceMockRecorder is the mock recorder for MockQueueManagerInterface. +type MockQueueManagerInterfaceMockRecorder struct { + mock *MockQueueManagerInterface +} + +// NewMockQueueManagerInterface creates a new mock instance. +func NewMockQueueManagerInterface(ctrl *gomock.Controller) *MockQueueManagerInterface { + mock := &MockQueueManagerInterface{ctrl: ctrl} + mock.recorder = &MockQueueManagerInterfaceMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use. +func (m *MockQueueManagerInterface) EXPECT() *MockQueueManagerInterfaceMockRecorder { + return m.recorder +} + +// AddItem mocks base method. +func (m *MockQueueManagerInterface) AddItem(cmd string, changeStatus bool) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "AddItem", cmd, changeStatus) + ret0, _ := ret[0].(error) + return ret0 +} + +// AddItem indicates an expected call of AddItem. +func (mr *MockQueueManagerInterfaceMockRecorder) AddItem(cmd, changeStatus any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "AddItem", reflect.TypeOf((*MockQueueManagerInterface)(nil).AddItem), cmd, changeStatus) +} diff --git a/test/procedure_launcher_test.go b/test/procedure_launcher_test.go deleted file mode 100644 index 16d909f..0000000 --- a/test/procedure_launcher_test.go +++ /dev/null @@ -1,58 +0,0 @@ -package services_test - -import ( - "testing" - - "github.com/Masterminds/semver/v3" - "github.com/highcard-dev/daemon/internal/core/domain" - "github.com/highcard-dev/daemon/internal/core/services" - mock_ports "github.com/highcard-dev/daemon/test/mock" - "go.uber.org/mock/gomock" -) - -func TestProcedureLauncher(t *testing.T) { - ctrl := gomock.NewController(t) - defer ctrl.Finish() - - logManager := mock_ports.NewMockLogManagerInterface(ctrl) - processMonitor := mock_ports.NewMockProcessMonitorInterface(ctrl) - ociRegistryMock := mock_ports.NewMockOciRegistryInterface(ctrl) - pluginManager := mock_ports.NewMockPluginManagerInterface(ctrl) - scrollService := mock_ports.NewMockScrollServiceInterface(ctrl) - - consoleManager := services.NewConsoleManager(logManager) - processManager := services.NewProcessManager(logManager, consoleManager, processMonitor) - procedureLauncher := services.NewProcedureLauncher(ociRegistryMock, processManager, pluginManager, consoleManager, logManager, scrollService) - - t.Run("RunNew", func(t *testing.T) { - processMonitor.EXPECT().AddProcess(gomock.Any(), "main.test").Times(1) - processMonitor.EXPECT().RemoveProcess("main.test").Times(1) - - scrollService.EXPECT().GetCommand("test", "main").Return(&domain.CommandInstructionSet{ - Procedures: []*domain.Procedure{ - { - Mode: "exec", - Wait: nil, - Data: []interface{}{"echo", "hello"}, - }, - }, - }, nil).AnyTimes() - - pluginManager.EXPECT().HasMode("exec").Return(false) - - logManager.EXPECT().AddLine("process.main.test", []byte("hello\n")).Times(1) - - scrollService.EXPECT().GetLock().Return(&domain.ScrollLock{ - Statuses: map[string]domain.ScrollLockStatus{}, - ScrollVersion: semver.MustParse("1.0.0"), - ScrollName: "test", - }, nil).AnyTimes() - - scrollService.EXPECT().GetCwd().Return("/tmp").AnyTimes() - - err := procedureLauncher.RunNew("test", "main", false) - if err != nil { - t.Error(err) - } - }) -} diff --git a/test/process_manager_test.go b/test/process_manager_test.go index ddb058b..6f17874 100644 --- a/test/process_manager_test.go +++ b/test/process_manager_test.go @@ -17,10 +17,10 @@ func TestProcessManager(t *testing.T) { processManager := services.NewProcessManager(logManager, consoleManager, processMonitor) t.Run("Run", func(t *testing.T) { - processMonitor.EXPECT().AddProcess(gomock.Any(), "test.echo").Times(1) - processMonitor.EXPECT().RemoveProcess("test.echo").Times(1) - logManager.EXPECT().AddLine("process.test.echo", []byte("hello\n")).Times(1) - exitCode, err := processManager.Run("test", "echo", []string{"echo", "hello"}, "/tmp") + processMonitor.EXPECT().AddProcess(gomock.Any(), "echo").Times(1) + processMonitor.EXPECT().RemoveProcess("echo").Times(1) + logManager.EXPECT().AddLine("process.echo", []byte("hello\n")).Times(1) + exitCode, err := processManager.Run("echo", []string{"echo", "hello"}, "/tmp") if err != nil { t.Error(err) @@ -31,11 +31,11 @@ func TestProcessManager(t *testing.T) { } }) t.Run("RunTty", func(t *testing.T) { - processMonitor.EXPECT().AddProcess(gomock.Any(), "test.echo").Times(1) - processMonitor.EXPECT().RemoveProcess("test.echo").Times(1) + processMonitor.EXPECT().AddProcess(gomock.Any(), "echo").Times(1) + processMonitor.EXPECT().RemoveProcess("echo").Times(1) - logManager.EXPECT().AddLine("process_tty.test.echo", gomock.Any()).MinTimes(1) - exitCode, err := processManager.RunTty("test", "echo", []string{"echo", "hello"}, "/tmp") + logManager.EXPECT().AddLine("process_tty.echo", gomock.Any()).MinTimes(1) + exitCode, err := processManager.RunTty("echo", []string{"echo", "hello"}, "/tmp") if err != nil { t.Error(err) diff --git a/test/queue_manager_test.go b/test/queue_manager_test.go new file mode 100644 index 0000000..52277e7 --- /dev/null +++ b/test/queue_manager_test.go @@ -0,0 +1,328 @@ +package services_test + +import ( + "fmt" + "testing" + + "github.com/Masterminds/semver/v3" + "github.com/highcard-dev/daemon/internal/core/domain" + "github.com/highcard-dev/daemon/internal/core/services" + mock_ports "github.com/highcard-dev/daemon/test/mock" + "go.uber.org/mock/gomock" +) + +type CommandTest struct { + Repeat int + AccualExecution int + RunMode domain.RunMode +} + +func TestQueueManager(t *testing.T) { + + testCases := []CommandTest{ + { + Repeat: 1, + AccualExecution: 1, + RunMode: domain.RunModeAlways, + }, + { + Repeat: 5, + AccualExecution: 5, + RunMode: domain.RunModeAlways, + }, + { + Repeat: 1, + AccualExecution: 1, + RunMode: domain.RunModeOnce, + }, + { + Repeat: 2, + AccualExecution: 1, + RunMode: domain.RunModeOnce, + }, + { + Repeat: 5, + AccualExecution: 1, + RunMode: domain.RunModeOnce, + }, + } + + for _, testCase := range testCases { + + t.Run(fmt.Sprintf("AddItem (RunMode: %s, Repeat: %d)", testCase.RunMode, testCase.Repeat), func(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + logManager := mock_ports.NewMockLogManagerInterface(ctrl) + processMonitor := mock_ports.NewMockProcessMonitorInterface(ctrl) + ociRegistryMock := mock_ports.NewMockOciRegistryInterface(ctrl) + pluginManager := mock_ports.NewMockPluginManagerInterface(ctrl) + scrollService := mock_ports.NewMockScrollServiceInterface(ctrl) + + consoleManager := services.NewConsoleManager(logManager) + processManager := services.NewProcessManager(logManager, consoleManager, processMonitor) + procedureLauncher := services.NewProcedureLauncher(ociRegistryMock, processManager, pluginManager, consoleManager, logManager, scrollService) + queueManager := services.NewQueueManager(scrollService, procedureLauncher) + + processMonitor.EXPECT().AddProcess(gomock.Any(), "test").AnyTimes() + processMonitor.EXPECT().RemoveProcess("test").AnyTimes() + + scrollService.EXPECT().GetCommand("test").Return(&domain.CommandInstructionSet{ + Run: testCase.RunMode, + Procedures: []*domain.Procedure{ + { + Mode: "exec", + Wait: nil, + Data: []interface{}{"echo", "hello"}, + }, + }, + }, nil).AnyTimes() + + pluginManager.EXPECT().HasMode(gomock.Any()).Return(false).AnyTimes() + + logManager.EXPECT().AddLine("process.test", []byte("hello\n")).Times(testCase.AccualExecution) + + scrollService.EXPECT().GetLock().Return(&domain.ScrollLock{ + Statuses: map[string]domain.ScrollLockStatus{}, + ScrollVersion: semver.MustParse("1.0.0"), + ScrollName: "test", + }, nil).AnyTimes() + + scrollService.EXPECT().GetCwd().Return("/tmp").AnyTimes() + + go queueManager.Work() + + for i := 0; i < testCase.Repeat; i++ { + err := queueManager.AddItem("test", false) + if err != nil { + if testCase.RunMode == domain.RunModeOnce && err == services.ErrCommandDoneOnce { + continue + } + t.Error(err) + } + queueManager.WaitUntilEmpty() + } + }) + + t.Run(fmt.Sprintf("AddItem error first, but after that succeeds (RunMode: %s, Repeat: %d)", testCase.RunMode, testCase.Repeat), func(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + processMonitor := mock_ports.NewMockProcessMonitorInterface(ctrl) + scrollService := mock_ports.NewMockScrollServiceInterface(ctrl) + + procedureLauncher := mock_ports.NewMockProcedureLauchnerInterface(ctrl) + queueManager := services.NewQueueManager(scrollService, procedureLauncher) + + processMonitor.EXPECT().AddProcess(gomock.Any(), "test").AnyTimes() + processMonitor.EXPECT().RemoveProcess("test").AnyTimes() + + scrollService.EXPECT().GetCommand("test").Return(&domain.CommandInstructionSet{ + Run: testCase.RunMode, + Procedures: []*domain.Procedure{ + { + Mode: "exec", + Wait: nil, + Data: []interface{}{"echo", "hello"}, + }, + }, + }, nil).AnyTimes() + + scrollService.EXPECT().GetLock().Return(&domain.ScrollLock{ + Statuses: map[string]domain.ScrollLockStatus{}, + ScrollVersion: semver.MustParse("1.0.0"), + ScrollName: "test", + }, nil).AnyTimes() + + scrollService.EXPECT().GetCwd().Return("/tmp").AnyTimes() + + times := testCase.AccualExecution + if testCase.RunMode == domain.RunModeOnce && testCase.Repeat > 1 { + times = 2 + } + + first := true + procedureLauncher.EXPECT().Run(gomock.Any(), gomock.Any()).DoAndReturn(func(cmd string, runCommandCb func(cmd string) error) error { + if first { + first = false + return fmt.Errorf("error") + } else { + return nil + } + }).Times(times) + + go queueManager.Work() + + for i := 0; i < testCase.Repeat; i++ { + err := queueManager.AddItem("test", false) + + if err != nil { + if testCase.RunMode == domain.RunModeOnce && err == services.ErrCommandDoneOnce { + continue + } + t.Error(err) + } + queueManager.WaitUntilEmpty() + } + }) + + t.Run(fmt.Sprintf("AddItem Command (RunMode: %s, Repeat: %d)", testCase.RunMode, testCase.Repeat), func(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + logManager := mock_ports.NewMockLogManagerInterface(ctrl) + processMonitor := mock_ports.NewMockProcessMonitorInterface(ctrl) + ociRegistryMock := mock_ports.NewMockOciRegistryInterface(ctrl) + pluginManager := mock_ports.NewMockPluginManagerInterface(ctrl) + scrollService := mock_ports.NewMockScrollServiceInterface(ctrl) + + consoleManager := services.NewConsoleManager(logManager) + processManager := services.NewProcessManager(logManager, consoleManager, processMonitor) + procedureLauncher := services.NewProcedureLauncher(ociRegistryMock, processManager, pluginManager, consoleManager, logManager, scrollService) + queueManager := services.NewQueueManager(scrollService, procedureLauncher) + + processMonitor.EXPECT().AddProcess(gomock.Any(), "test").AnyTimes() + processMonitor.EXPECT().RemoveProcess("test").AnyTimes() + + scrollService.EXPECT().GetCommand("test").Return(&domain.CommandInstructionSet{ + Run: testCase.RunMode, + Procedures: []*domain.Procedure{ + { + Mode: "exec", + Wait: nil, + Data: []interface{}{"echo", "hello"}, + }, + }, + }, nil).AnyTimes() + + scrollService.EXPECT().GetCommand("test_command").Return(&domain.CommandInstructionSet{ + Procedures: []*domain.Procedure{ + { + Mode: "command", + Wait: nil, + Data: "test", + }, + }, + }, nil).AnyTimes() + + pluginManager.EXPECT().HasMode(gomock.Any()).Return(false).AnyTimes() + + logManager.EXPECT().AddLine("process.test", []byte("hello\n")).Times(testCase.AccualExecution) + + scrollService.EXPECT().GetLock().Return(&domain.ScrollLock{ + Statuses: map[string]domain.ScrollLockStatus{}, + ScrollVersion: semver.MustParse("1.0.0"), + ScrollName: "test", + }, nil).AnyTimes() + + scrollService.EXPECT().GetCwd().Return("/tmp").AnyTimes() + + go queueManager.Work() + + for i := 0; i < testCase.Repeat; i++ { + err := queueManager.AddItem("test_command", false) + if err != nil { + t.Error(err) + } + + queueManager.WaitUntilEmpty() + } + }) + } + + t.Run("AddItem Deep Need Structure", func(t *testing.T) { + + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + logManager := mock_ports.NewMockLogManagerInterface(ctrl) + processMonitor := mock_ports.NewMockProcessMonitorInterface(ctrl) + ociRegistryMock := mock_ports.NewMockOciRegistryInterface(ctrl) + pluginManager := mock_ports.NewMockPluginManagerInterface(ctrl) + scrollService := mock_ports.NewMockScrollServiceInterface(ctrl) + + consoleManager := services.NewConsoleManager(logManager) + processManager := services.NewProcessManager(logManager, consoleManager, processMonitor) + procedureLauncher := services.NewProcedureLauncher(ociRegistryMock, processManager, pluginManager, consoleManager, logManager, scrollService) + queueManager := services.NewQueueManager(scrollService, procedureLauncher) + + lock := &domain.ScrollLock{ + Statuses: map[string]domain.ScrollLockStatus{}, + } + scrollService.EXPECT().GetLock().Return(lock, nil).AnyTimes() + processMonitor.EXPECT().AddProcess(gomock.Any(), gomock.Any()).Times(4) + //processMonitor.EXPECT().AddProcess(gomock.Any(), "dep1").Times(1) + //processMonitor.EXPECT().AddProcess(gomock.Any(), "test").Times(1) + + processMonitor.EXPECT().RemoveProcess(gomock.Any()).Times(4) + //processMonitor.EXPECT().RemoveProcess("dep1").Times(1) + //processMonitor.EXPECT().RemoveProcess("test").Times(1) + + scrollService.EXPECT().GetCommand("test").Return(&domain.CommandInstructionSet{ + Needs: []string{"dep1"}, + Procedures: []*domain.Procedure{ + { + Mode: "exec", + Wait: nil, + Data: []interface{}{"echo", "hello"}, + }, + }, + }, nil).AnyTimes() + + scrollService.EXPECT().GetCommand("dep1").Return(&domain.CommandInstructionSet{ + Needs: []string{"dep2.1", "dep2.2"}, + Procedures: []*domain.Procedure{ + { + Mode: "exec", + Wait: nil, + Data: []interface{}{"echo", "hello1"}, + }, + }, + }, nil).AnyTimes() + scrollService.EXPECT().GetCommand("dep2.1").Return(&domain.CommandInstructionSet{ + Run: domain.RunModeOnce, + Procedures: []*domain.Procedure{ + { + Mode: "exec", + Wait: nil, + Data: []interface{}{"echo", "hello2.1"}, + }, + }, + }, nil).AnyTimes() + scrollService.EXPECT().GetCommand("dep2.2").Return(&domain.CommandInstructionSet{ + Procedures: []*domain.Procedure{ + { + Mode: "exec", + Wait: nil, + Data: []interface{}{"echo", "hello2.2"}, + }, + }, + }, nil).AnyTimes() + + pluginManager.EXPECT().HasMode(gomock.Any()).Return(false).AnyTimes() + + logManager.EXPECT().AddLine(gomock.Any(), gomock.Any()).Times(4) + //logManager.EXPECT().AddLine("process.dep1", gomock.Eq([]byte("hello1\n"))).Times(1) + //logManager.EXPECT().AddLine("process.test", gomock.Eq([]byte("hello\n"))).Times(1) + + scrollService.EXPECT().GetLock().Return(&domain.ScrollLock{ + Statuses: map[string]domain.ScrollLockStatus{}, + ScrollVersion: semver.MustParse("1.0.0"), + ScrollName: "test", + }, nil).AnyTimes() + + scrollService.EXPECT().GetCwd().Return("/tmp").AnyTimes() + + go queueManager.Work() + err := queueManager.AddItem("test", false) + if err != nil { + t.Error(err) + } + + queueManager.WaitUntilEmpty() + + if len(lock.Statuses) != 1 { + t.Error("Lock status must be 1 (dep2.1)") + } + }) +}