-
-
Notifications
You must be signed in to change notification settings - Fork 596
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[feature] Add AMQP (RabbitMQ) support in async consumers API #761
Comments
Hello @deefdragon I knew that adding async consumers feature may quickly result into requests for other native integrations :) I don't want to add support integrations with all the possible queueing systems in Centrifugo core, so need to think a bit about a good strategy here. My first thought that it should be somehow based on a popularity of the system we integrate with - think RabbitMQ is popular enough to be one such system. And one more criteria – I think we need a real use case before writing a new async consumer. Do you have a production system where you want to use Centrifugo with async consuming from Rabbit? If yes - could you describe the use case? If not – I'd say we better leave this issue open for some time first, discuss questions you asked and then wait for the feedback from community. Now to questions - yep, I think you understood right how code is structured now, async consumer is just a Service which consumes some queue and calls Dispatch. In the message from queue system we expect API command to execute - method and payload. The important property of current consumers (PostgreSQL table and Kafka topics) is that they both can maintain message ordering in channels if properly partitioned. RabbitMQ can't do this in basic setup as far I know. It seems that it's possible to achieve with https://blog.rabbitmq.com/posts/2022/07/rabbitmq-3-11-feature-preview-single-active-consumer-for-streams/ and this article additionally describes how to achieve partitioning with it for increased throughput.
I'd say consuming from many queues is nice if possible, for Kafka it's possible to consume from many topics - but that was almost free to implement, not sure how this may be in RabbitMQ case. If it's simple to use many queues - why not and users can decide whether to configure one or many.
Yep, just the note about ordering of processing – I believe most Centrifugo users want to keep order of messages in channels. Of course there could be cases where order is not important. Would be nice to provide an option to keep ordering right from the start. But in general this is why real use case is needed here to have some ground for decisions.
Think applying changes upon restart is sufficient for now. Applying changes on the fly is nice to have - but I afraid can require non-trivial changes in configuration - no enough understanding at this point how difficult this could be.
Yep, currently we test consumers in CI, using docker-compose.yaml to start external services. Locally, just Overall, I'd approach this carefully and would be nice to understand it may be really useful for some Centrifugo user with production app. |
Production Use CaseTo answer the question around current production use with one of a few examples, I have a system that authenticates with twitch for user login (among others). When a user changes their password on twitch, twitch sends out a de-auth message to all 3rd party integrations. On getting this message, I de-auth any active keys to prevent them from refreshing, but before the JWT expires naturally (<5m), I want to be able to inform any active clients to log the user out. (Of the front end specifically so they can not send new http requests, not centrifugo. I am aware PRO does support de-authing tokens). My intent was that users would be able to listen on an appropriate channel on the front end, and when I send a message to the rabbitMQ queue that gets these messages, centrifugo then passes that message on to the appropriate user. (My very initial planning so far has options to substitute headers in the channel so you can target a given channel in a namespace or a given user) In general, that could be handled by an HTTP call, but that also applies for most cases with async consumers I think. As a general case, RabitMQ is automatically integrated to all of my services due to how I initialize my infra (I have a common framework during startup), and the ability to not have to deploy another service to get messaging direct to users would be ideal. IntegrationI'm wondering if it would be possible to use go's plugin system to separate out the different supported async consumers. There are some very specific considerations when it comes to version matching that may complicate that however. As is, I'm going to start with mimicking the existing code to get a hang on how well rabbitMQ can integrate to being with. OrderingTo me, the most important part of centrifugo is how it centralizes sending users events on demand, and the time between events on the same channel for all of my use-cases is normally on the order of several seconds at smallest, and several minutes under normal circumstances. As such, I missed the importance of tight ordering for other users and considered it more an optional feature. Useful, but not required. I will do what I can to take keeping ordering into consideration when working on this. |
I have a basic AMQP (I always forget that's the generic variant of rabbitMQ queues. Ive updated the title) consumer written up, tho I've not gone through and done super rigorous testing as of yet. I have a better understanding of the complexity in the way the consumers are currently written now, and what you mean about strategy. I misinterpreted The version I currently have written follows the pattern of the Kafka consumer and encodes both the payload and the method into a single json message. To me however, the ideal end form for an async amqp consumer would be to only have data/b64data be part of the body and have all the other data pulled from the headers/config. My reasoning for this is to allow setting up the AMQP consumer to listen to the same, already existing messages I'm sending. So in this example, I already have Service 1 and Service 2 parsing data from RabbitMQ queues. I'd like to be able to create a queue that attaches to the existing exchange, and have Centrifugo be able to get/build the channel and other options from the headers and config. In this way, the only change that would have to be made to pass any events to the front end would be to change the centrifugo config, and the rabbitMQ queues, requiring no code changes. The code to do this however is going to be quite a bit more complicated due to the list of options that need to be supported. As I have an active use-case for this, I'm going to split what I have currently off, and try to come up with a proof of concept to allow the more complex formatting use-cases. I will circle around with my findings if I think it can be done reasonably, and stably. (Apologies if I'm getting pushy here. The use case Ive described is somewhat high priority given its security related, but I also have some other stuff undergoing an active migration that this would simplify by leaps and bounds if I can get it functioning) |
Thanks for sharing more details!
Currently the answer to this - we are not ready to include Go plugin support to Centrifugo. If you look in the Go ecosystem most systems that added Go plugin support consider it experimental, and the fact plugin requires exact versions of libraries makes this quite unmanageable.
If ordering is not important for your use case, I suppose it's ok to not have it but think about possible solutions. We can emphasize this in doc if ordering is hard to achieve, just would be nice to understand whether it's hard or impossible.
Think this is the most difficult question here. Yes - consumers in Centrifugo expect messages to be Centrifugo API command. If you want to have some rules how to extract data from RabbitMQ messages - then I suppose it's out of scope of Centrifugo at the moment. Also, I think it would be quite strange/non-obvious if all your messages travelling to various destinations will contain Centrifugo-specific payload in headers. I think the correct solution here – write a service which will transform RabbitMQ messages to Centrifugo format - in this case you can simply use HTTP API I suppose. |
ImplementationsHere is my initial implementation that reads data in from AMQP and dispatches via the existing method used by kafka (I actually used literally the same struct to parse the data). I also have a more advanced variant that allows substituting information in as needed for the channel, method, and the other parameters (this is optional as it defaults to the simple variant). It is somewhat complex to get this method to work however, as it uses go-templates to accomplish the substitution, and some really messy json encoding/decoding to build the payload structure. I'm going to be testing the more advanced version in my production environment to see if I encounter any particular issues/complexities with it, but its been stable so far. If I do get heavy use out of the complex form I'm going to see if I can clean it up & optimize it to not require converting to/from json half a dozen times and actually reasonable to submit a PR for. I also would like to see if I can make it usable on more than just the AMQP consumer so its some kind of standardized. Extra headers
As is, I pass different headers for different destinations, so in several cases I'm already passing extraneous data from the perspective of one program vs another. I also do tracing that is informed by the headers I include in the messages, so for most cases I would arguably not be including any extra data. This is a fair comment however, as that will not necessarily be the case for most users. Overall I think I would prefer having the extra headers as opposed to having to send messages into two separate queues, but that's part of why I'm going to do the testing. To verify if this is the case&actually useful. OrderingAs a side note, I am still researching & testing if ordering is possible. To my understanding the method rabbitMQ uses to build the single consumer streams means that it SHOULD just be a drop in replacement, and you just point the consumer at the appropriate queue. I don't know if the user needs to be able to set any headers for the connection however. |
Hello, took a quick look, still thinking that templating to extract Centrifugo-specific data is a very controversial approach. One of the goals of Centrifugo - we generally try to translate good practices to users. Having large message for all systems seems like an antipattern for most use cases. I understand it reduces the work and number of components, but may become a limitation or security concern in the future. |
Looks like RabbitMQ Super streams provide exact semantics we want in Centrifugo regarding throughput scalability and ordering: https://blog.rabbitmq.com/posts/2022/07/rabbitmq-3-11-feature-preview-super-streams/ |
Regarding streams, that's actually setup that can actually be done completely on the user's side when they are configuring the queues, so if the user wants to implement the highest level strictness and semantics, they can choose to do so. (I'll write the documentation for this if you are willing to accept the rabbitmq consumer, and recommend&show how to enable streams) I've been using this (the advanced version specifically) for the last several months, and its been very stable, but running full templates for everything was indeed overkill. Everything I used it for just used Would it alleviate enough of the worries you have regarding it being an anit-pattern/security issue if the main data was fed directly in as the content? Having only the channel be adjustable, and only through the request headers? This would also allow for largely simplifying the method used to specify the channel to IE |
@deefdragon hello, just letting you know that I've noticed your comment, unfortunately I lost the context – so will try to refresh my understanding about the implementation you have and come back with thoughts. |
Well, I went through the issue and refreshed the context. Current thoughts: Probably we should introduce a new mode of async consumers. By default, async consumers should be written in a way that they expect a method and API payload command inside the incoming message payload from the external system. I.e. work in a way consumers implemented currently. In PG case method and payload are different columns, in Kafka we expect:
But in the different mode (let's call it Each consumer that implements Also some other thoughts:
What do you think @deefdragon ? Will the design I described above work for you? I still do not want to rush a lot with implementation of this trying to validate the approach first, but it seems to me that with the reduced scope it may be possible to add this to Centrifugo itself and benefit from this for existing Kafka consumer also.\ UPD. Probably, we should also try to support |
@deefdragon – kindly reminding, would be nice to hear your opinion ^^ |
Apologies on the slow response. family issues have taken my time. I'm on leave for a few weeks now, and tackling this is on my list to do. Will have a longer form response later today on your thoughts. |
I think I follow everything youve brought up, but I have some clarifying questions.
Is there any chance you could query some of your users to see how they are using the existing Kafka/Postgres async consumers? See if there is anything that could inform how we are implementing this? I also don't want to rush this. Any implementation needs to be good enough that others are willing to use it. |
Very good question, after thinking a bit – I do not see any major benefit of having it on one level or another, though for some reason biased towards upper level (near the type of consumer) – as an attempt to standardize the mode. Whether different consumers support it or not may be documented and validated on start. At the same time the options how to extract list of channels from external system message will stay inside individual consumer config - it seems impossible to somehow make it the same for everything. In PostgreSQL case channels may be sent comma-separated through
Yeah, I mean we can't expose templating based on
I mean whether approach how consumer acks processed messages in super streams scenario differs somehow from basic queue consuming. Of the API is fully the same – and no differences in consumer code really required.
I see the usage stats of async consuming feature usage, it was introduced relatively recently, so about only 12 setups using it, 10 use Kafka, 2 use PostgreSQL. Will try asking in the community channel for some details from users of feature – though I doubt I'll find any feedback this way. |
I will mess with the config layout to see what feels right, tho I suspect it being at the top level will be. And yeah, it makes sense that there will be some different configuration for each async consumer. I can make the channel just the raw header, but Id really like to have at-least some simple template support (full go templates are def too much after experimenting with them) so that I don't have to add a header for centrifugo specifically. It would be nice if I could get it set up so that there's some standard to work with for all of the implementations. The way RabbitMQ designed their APIs for super-streams and the like, its the exact same API. Changing the settings on the queue side to enable super-streams requires no additional work on the consumer side. (again, I will test this to confirm, but I'm pretty sure on this case) I'm going to try to take a crack at this this week, but I cant guarantee anything. |
Centrifugo already depends on Line 35 in 76cb23b
|
I wanted to prepare barebones for this while working on #832 – and here are some outcomes:
Did MVP for Kafka in v6_dev branch, it looks like this: // KafkaConsumerConfig is a configuration for Kafka async consumer.
type KafkaConsumerConfig struct {
...
// PublicationDataMode is a configuration for the mode where message payload already
// contains data ready to publish into channels, instead of API command.
PublicationDataMode KafkaPublicationDataModeConfig `mapstructure:"publication_data_mode" json:"publication_data_mode" envconfig:"publication_data_mode" yaml:"publication_data_mode" toml:"publication_data_mode"`
}
// KafkaPublicationDataModeConfig is a configuration for Kafka publication data mode.
// In this mode we expect Kafka message payload to contain data ready to publish into
// channels, instead of API command. All other fields used to build channel Publication
// can be passed in Kafka message headers – thus it's possible to integrate existing
// topics with Centrifugo.
type KafkaPublicationDataModeConfig struct {
// Enabled enables Kafka publication data mode for the Kafka consumer.
Enabled bool `mapstructure:"enabled" json:"enabled" envconfig:"enabled" yaml:"enabled" toml:"enabled"`
// ChannelsHeader is a header name to extract channels to publish data into
// (channels must be comma-separated). Ex. of value: "channel1,channel2".
ChannelsHeader string `mapstructure:"channels_header" json:"channels_header" envconfig:"channels_header" yaml:"channels_header" toml:"channels_header"`
// IdempotencyKeyHeader is a header name to extract Publication idempotency key from
// Kafka message. See https://centrifugal.dev/docs/server/server_api#publishrequest.
IdempotencyKeyHeader string `mapstructure:"idempotency_key_header" json:"idempotency_key_header" envconfig:"idempotency_key_header" yaml:"idempotency_key_header" toml:"idempotency_key_header"`
// DeltaHeader is a header name to extract Publication delta flag from Kafka message
// which tells Centrifugo whether to use delta compression for message or not.
// See https://centrifugal.dev/docs/server/delta_compression and
// https://centrifugal.dev/docs/server/server_api#publishrequest.
DeltaHeader string `mapstructure:"delta_header" json:"delta_header" envconfig:"delta_header" yaml:"delta_header" toml:"delta_header"`
} For the previous discussion it means the following:
I just prototyped this all yesterday, maybe some better ideas will come up as time goes... |
Is your feature request related to a problem? Please describe.
This is a re-opening of #373 (and a few others) to add RabbitMQ as an async consumer input now that several existing asynchronous consumers have been added in V5.2.0.
I created this as a new ticket specifically because I am looking into writing a PR for it, and want to have an open ticket to collect the questions I have in.
Describe the solution you'd like
Addition of RabbitMQ
Design Questions
I have been looking at the code for the existing consumers, and it appears that the consumer has to do 3 things.
Everything else appears to be consumer specific. Am I missing anything?
Consumer Multiplexing
Is a new consumer created for each specified input? IE if I have 2 tables I am reading data in from in postgres, (or 2 queues in rabbitMQ) Would I be calling the setup function twice, or is the consumer client set up once, and expected to handle the multiple inputs itself?
Multiple instances of centrifugo
When centrifugo is scaled out horizontally, it appears that postgres divides the work into partitions to make sure each piece of data is only handled by one instance each. Does the dispatch call properly send data between the instances so that the clients are all properly informed?
If so, RabbitMQ would be able to have each client in the centrifugo instance connect to the same queue, and the data would be processed in a partitioned manner automatically. I just wish to make sure that is acceptable.
Updates Mid-Execution
Is the expectation that the user restarts centrifugo to apply updates to asnyc consumers? or must they be able to adjust on the fly to changes in config?
Testing expectations
I will be able to write some unit tests for the RabbitMQ consumer of course, but I also saw some things that makes me think there are integration tests. Is that the case, and if so, what are the expectations around integration tests (and where might I look for more information/examples on those)?
The text was updated successfully, but these errors were encountered: