Bibliotek for enkelt å kunne lage mikrotjenester som bruker konseptet rapids and rivers til @fredgeorge.
- Alle publiserer på rapid. Kan lese fra flere topics, men publiserer kun på rapid-topic
- Rivers filtrerer meldinger etter hvilke kriterier de har
isalive
er true så snart rapids connection er startetisready
er true så snartonStartup
-lytterne er ferdige. KafkaRapid vil ikke begynne å polle meldinger før etter onStartup-lytterne er ferdige, og vil dermed ikke bli assignet partisjoner av brokerne.- Rivers vil kun få packets i
onPacket
nårMessageProblems
er fri for feilmeldinger (errors og severe) - Rivers kan bruke
require*()
-funksjoner for å akkumulere errors i etMessageProblems
-objekt som sendes tilonError
- Rivers kan bruke
demand*()
-funksjoner for å stoppe parsing ved feil. Exception sendes tilonSevere
Man kan bruke en kombinasjon av demand*()
og require*()
. For eksempel om alle meldingene har et @event_name
, så kan man bruke
demandValue("@event_name", "my_event")
for å avbryte parsing når event-navnet er ikke som forventet. Dersom man har alle andre former
for validering med require*()
, så kan man f.eks. logge innholdet i pakken i onError
i lag med en feilmelding som sier noe sånn som klarte ikke å parse my_event
.
Dersom man ikke benytter seg av demand*()
så er det umulig å vite i onError()
hvorvidt @event_name
var forventet verdi eller ikke, og logging vil dermed ende opp med å spamme
med alle meldinger på rapiden som riveren ikke forstår.
- Kjør migreringer i
onStartup
- Bruk rollout strategy
Recreate
. Ellers vil du ha én pod som leser meldinger og skriver til db, mens den andre holder på med migreringer
- Samme kjøreregler som over, bare at du vil få nedetid på api-et
- Rest-api-delen av appen bør skilles ut som egen app som har readonly-connection mot databasen. Dersom migreringene er bakover-kompatible så kan unngår man nedetid, og man kan migrere en "live" database
- Tut å kjør. Rollout strategy
RollingUpdate
vil fungere helt utmerket
fun main() {
val env = System.getenv()
val dataSourceBuilder = DataSourceBuilder(env)
val dataSource = dataSourceBuilder.getDataSource()
RapidApplication.create(env).apply {
MyCoolApp(this, MyDao(dataSource))
}.apply {
register(object : RapidsConnection.StatusListener {
override fun onStartup(rapidsConnection: RapidsConnection) {
// migrate database before consuming messages, but after rapids have started (and isalive returns OK)
dataSourceBuilder.migrate()
}
})
}.start()
}
internal class MyCoolApp(
rapidsConnection: RapidsConnection,
private val myDao: MyDao
) : River.PacketListener {
init {
River(rapidsConnection).apply {
validate { it.demandValue("@event_name", "my_event") }
validate { it.requireKey("a_required_key") }
// nested objects can be chained using "."
validate { it.requireValue("nested.key", "works_as_well") }
}.register(this)
}
override fun onError(problems: MessageProblems, context: RapidsConnection.MessageContext) {
/* fordi vi bruker demandValue() på event_name kan vi trygt anta at meldingen
er "my_event", og at det er minst én av de ulike require*() som har feilet */
}
override fun onPacket(packet: JsonMessage, context: RapidsConnection.MessageContext) {
println(packet["a_required_key"].asText())
// nested objects can be chained using "."
println(packet["nested.key"].asText())
}
}
- Servicebruker mountes inn på
/var/run/secrets/nais.io/service_user
- Bootstrap servers angis ved miljøvariabel
KAFKA_BOOTSTRAP_SERVERS
- Consumer group angis med miljøvariabel
KAFKA_CONSUMER_GROUP_ID
- Rapid topic angis med miljøvariabel
KAFKA_RAPID_TOPIC
- Rivers angis med miljøvariabel
KAFKA_EXTRA_TOPIC
(Komma separert liste hvis flere rivers.)
Rapids-biblioteket bundler egen logback.xml
så det trengs ikke spesifiseres i mikrotjenestene.
Den bundlede logback.xml
har konfigurasjon for secureLogs (men husk å enable secureLogs i nais.yaml!), tilgjengelig med:
LoggerFactory.getLogger("tjenestekall")
Spørsmål knyttet til koden eller prosjektet kan stilles som issues her på GitHub.
Interne henvendelser kan sendes via Slack i kanalen #hm-rapids-and-rivers.