Skip to content

A quick introduction to how Apache Kafka works and differs from other messaging systems using an example application.

Notifications You must be signed in to change notification settings

bgsrb/apache-kafka-in-6-minutes

Folders and files

NameName
Last commit message
Last commit date

Latest commit

 

History

6 Commits
 
 
 
 
 
 
 
 
 
 
 
 
 
 

Repository files navigation

Apache Kafka in 6 minutes

A quick introduction to how Apache Kafka works and differs from other messaging systems using an example application. In this video I explain partitioning, consumer offsets, replication and many other concepts found in Kafka.

Apache Kafka in 6 minutes

Example

package main

import (
	"encoding/json"
	"fmt"

	k "github.com/segmentio/kafka-go"

	"github.com/sohamkamani/golang-kafka-example/kafka"
)

func main() {

	k := kafka.New([]string{"localhost:9092"}, "test",
		kafka.WithPartitions(3),
		kafka.WithReplications(1),
		kafka.WithBalancer(&k.Hash{}),
	)

	//producers
	k.Producer(produce("producer_1"))

	fmt.Println("--------------------------------------------")

	//consumers
	go k.Consumer("mobile", consume("mobile_consumer_1"))
	go k.Consumer("mobile", consume("mobile_consumer_2"))
	go k.Consumer("mobile", consume("mobile_consumer_3"))

	go k.Consumer("computer", consume("computer_consumer_1"))
	go k.Consumer("computer", consume("computer_consumer_2"))
	go k.Consumer("computer", consume("computer_consumer_3"))

	exit()
}

type Event struct {
	Event string
	Key   string
	Team1 string
	Team2 string
}

func produce(id string) func(producer *kafka.Producer) {

	return func(producer *kafka.Producer) {
		events := []Event{{
			Event: "Querter Start",
			Key:   "America_Canada",
			Team1: "America",
			Team2: "Canada",
		}, {
			Event: "Foul",
			Key:   "Malta_Portugal",
			Team1: "Malta",
			Team2: "Portugal",
		}, {
			Event: "Score 39-46",
			Key:   "America_Canada",
			Team1: "America",
			Team2: "Canada",
		}, {
			Event: "Free Throw",
			Key:   "Brazil_Australia",
			Team1: "Brazil",
			Team2: "Australia",
		}, {
			Event: "Score 41-46",
			Key:   "America_Canada",
			Team1: "America",
			Team2: "Canada",
		}, {
			Event: "Querter End",
			Key:   "Brazil_Australia",
			Team1: "Brazil",
			Team2: "Australia",
		}}
		for _, event := range events {
			value, _ := json.Marshal(event)
			err := producer.Produce(event.Key, value)
			if err != nil {
				fmt.Println("could not write message " + err.Error())
			}
			fmt.Printf("%s -> %s %s : %s\n", id, event.Team1, event.Team2, event.Event)
		}
	}
}

func consume(id string) func(consumer *kafka.Consumer) {
	return func(consumer *kafka.Consumer) {
		for {
			msg, err := consumer.ReadMessage()
			if err != nil {
				fmt.Println("could not read message " + err.Error())
			}
			var event Event
			json.Unmarshal(msg.Value, &event)

			fmt.Printf("%s -> %s %s : %s\n", id, event.Team1, event.Team2, event.Event)
		}
	}
}

func exit() {
	exit := make(chan int)
	<-exit
}

Output

producer_1 -> America Canada : Querter Start
producer_1 -> Malta Portugal : Foul
producer_1 -> America Canada : Score 39-46
producer_1 -> Brazil Australia : Free Throw
producer_1 -> America Canada : Score 41-46
producer_1 -> Brazil Australia : Querter End
--------------------------------------------
mobile_consumer_2 -> America Canada : Querter Start
mobile_consumer_3 -> Malta Portugal : Foul
mobile_consumer_1 -> America Canada : Score 39-46
mobile_consumer_2 -> Brazil Australia : Free Throw
mobile_consumer_1 -> Brazil Australia : Querter End
mobile_consumer_3 -> America Canada : Score 41-46
computer_consumer_2 -> America Canada : Querter Start
computer_consumer_2 -> Brazil Australia : Free Throw
computer_consumer_3 -> America Canada : Score 39-46
computer_consumer_1 -> Malta Portugal : Foul
computer_consumer_3 -> Brazil Australia : Querter End
computer_consumer_1 -> America Canada : Score 41-46

About

A quick introduction to how Apache Kafka works and differs from other messaging systems using an example application.

Topics

Resources

Stars

Watchers

Forks

Releases

No releases published

Packages

No packages published

Languages