Replies: 9 comments 6 replies
-
@danielsinai I liked the idea, it is actually exactly the vision of Memphis, to be modular and extensible as possible. |
Beta Was this translation helpful? Give feedback.
-
That is fair enough. Would you like to set up a kickoff meeting? |
Beta Was this translation helpful? Give feedback.
-
former #849 was updated - please review proposed solution |
Beta Was this translation helpful? Give feedback.
-
package adapter
// https://github.com/orgs/memphisdev/discussions/857
//
// Adapter simplifies creation of satellite process for memphis core server.
//
// It supports:
// - modular design (module is block in terms of adapter)
// - order of blocks initialization
// - convenient negotiation between blocks
// - server heartbeat
// - graceful shutdown
// Adapter process consists of set of asynchronously running blocks.
// Some blocks belong to adapter itself:
// - initiator
// - shutdownListener
// Another are developed for specific process or environment, e.g.:
// - serverConnector
// - syslogReceiver
// - syslogPublisher
// - restGateway
//
// Block has Name (analog of golang type) and Responsibility (instance of specific block)
// This separation allows to run simultaneously blocks with the same Name.
// Block has set of the callbacks:
// - mandatory: Init|Run|Finish
// - optional: OnServerConnect|OnServerDisconnect|OnEnqueue
//
//
// Simplified Block life cycle:
//
// Init
// Run
// OnServerConnect
// [*]OnEnqueue
// OnServerDisconnect
// Finish
//
// Please pay attention, that after Run order of callbacks will be unpredictable.
//
type Block struct {
Name string
Responsibility string
Init Init
Run Run
Finish Finish
OnConnect OnServerConnect
OnDisconnect OnServerDisconnect
OnEnqueue OnEnqueue
}
// Init callback is executed by adapter once during initialization.
// Blocks are initialized in sequenced order according to configuration.
// Some rules :
// - don't run hard processing within Init
// - don't work with server till call of OnServerConnect
type Init func(conf any) error
// After successful initialization of ALL blocks, adapter creates goroutine and calls Run
// Other callbacks will be executed on another goroutines
// After Run block is allowed to negotiate with another blocks of the process
// via BlockController
type Run func(self BlockController)
// Finish callback is executed by adapter once during shutdown of the process.
// Blocks are finished in reverse order.
//
// For tests adapter supports possibility to run Init/Finish blocks per test.
type Finish func() error
// Optional OnServerConnect callback is executed by adapter after successful
// connection to server.
type OnServerConnect func(sc any, lp any)
// Optional OnServerDisconnect callback is executed by adapter when previously
// connected server disconnects.
type OnServerDisconnect func(sc any)
// Because asynchronous nature of blocks, negotiation between blocks done using 'bags'
// Developers of blocks should agree on content of bags.
// Adapter doesn't force specific format of the bag
// with one exception: key of the map should not start from "__".
// This prefix is used by adapter for house-keeping values.
// You can enqueue nil bug, but on the side of the receiver it may be not nil,
// because data added by adapter
type Bag map[string]any
// Optional OnEnqueue callback is executed by adapter as result of receiving Bag.
// Block can send bag to itself.
type OnEnqueue func(bb Bag)
// BlockController provides possibility for negotiation between blocks
// Block gets own controller as parameter of Run
type BlockController interface {
Name() string
Responsibility() string
// Asynchronously send bag to controlled block
// true is returned if controlled block has OnEnqueue callback
Enqueue(bb Bag) bool
// Asynchronously notify controlled block about server status
//
// true is returned if controlled block has OnServerConnect callback
ServerConnected(sc any, lp any) bool
// true is returned if controlled block has OnServerDisconnect callback
ServerDisconnected(sc any) bool
// Asynchronously call Finish callback of controlled block
//
Finish()
// Get controller of block
//
Controller(resp string) (bc BlockController, exists bool)
}
// Adapter has little knowledge about memphis internals.
// This is the reason to define server connection, logger and
// configuration as any.
// Use type assertions for "casting" to concrete interface/implementation:
// .............
// logger, ok := lp.(*log.Logger)
//
// if ok {
// logger.Println(....)
// }
// ............. |
Beta Was this translation helpful? Give feedback.
-
Adapter functionality also implemented as set of blocks:
package adapter
import (
"os"
"os/signal"
"syscall"
)
const SigListenerBlock = "finisher"
func init() {
RegisterBlockFactory(InitiatorBlock,
func(resp string) Block {
finisher := new(finisherBlock)
block := Block{
Name: SigListenerBlock,
Responsibility: SigListenerBlock,
Init: finisher.init,
Run: finisher.run,
Finish: finisher.finish,
}
return block
})
}
type finisherBlock struct {
done chan struct{}
term chan os.Signal
bc BlockController
}
func (bl *finisherBlock) init(conf any) error {
return nil
}
func (bl *finisherBlock) run(self BlockController) {
bl.bc = self
bl.done = make(chan struct{})
bl.term = make(chan os.Signal, 2)
defer close(bl.term)
signal.Notify(bl.term, syscall.SIGINT, syscall.SIGTERM)
select {
case <-bl.done:
return
case <-bl.term:
if bl.bc != nil {
ibc, ok := bl.bc.Controller(InitiatorBlock)
if ok {
ibc.Finish()
}
}
}
}
func (bl *finisherBlock) finish() error {
close(bl.done)
return nil
} |
Beta Was this translation helpful? Give feedback.
-
Additional info for discussion: Regarding usage of the one process for more than one protocol - design allows multi- Code for creation of pluggable/modular/etc adapter will be separated library. This approach has several advantages:
|
Beta Was this translation helpful? Give feedback.
-
Development of #849 repos and stages: [Note] You can see above that adapter doesn't depend on memphis code and functionality
Next stage - embedding of adapter within memphis core server:
Please pay attention that HLD of adapter doesn't depend on specific go features. Your comments are welcome as usual |
Beta Was this translation helpful? Give feedback.
-
Hi @g41797, and just to make sure I got you right (anyway it would be great to have a diagram), the flow goes like this (Kafka API for example): Did I missed something? |
Beta Was this translation helpful? Give feedback.
-
Actually Server interface should be implemented: type Configurator interface {
// Returns configuration of running process
// If implementation supports multithreading,
// every call may return the same "object"
// Otherwise copy/clone of configuration should be returned
Configuration() (conf any, err error)
}
type Server interface {
Configurator
// Connects to the server and return connection to server and logger
// for send log information to the server
// If connection failed, returns error.
// ' Connect' for already connected
// and still not brocken connection should
// return the same values returned in previous
// successful call(s) and nil error
//
Connect() (conn, logger any, err error)
// Returns false if
// - was not connected at all
// - was connected, but connection is brocken
// True returned if
// - connected and connection is alive
IsConnected() bool
} |
Beta Was this translation helpful? Give feedback.
-
Building a pluggable protocol adapter to Memphis to enable the community to add more protocols and adapters like Kafka API / Pulsar / amqp / and more.
Beta Was this translation helpful? Give feedback.
All reactions