Skip to content

Commit

Permalink
MOD: rename JoinGroup() to Start(), ExitGroup() to Stop()
Browse files Browse the repository at this point in the history
  • Loading branch information
huanglin committed Jul 10, 2018
1 parent 8bf24eb commit 08c1de4
Show file tree
Hide file tree
Showing 6 changed files with 12 additions and 12 deletions.
10 changes: 5 additions & 5 deletions consumer_group.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,9 +88,9 @@ func (cg *ConsumerGroup) initSaramaConsumer() error {
return err
}

// JoinGroup would register ConsumerGroup, and rebalance would be triggered.
// Start would register ConsumerGroup, and rebalance would be triggered.
// ConsumerGroup computes the partitions which should be consumed by consumer's num, and start fetching message.
func (cg *ConsumerGroup) JoinGroup() error {
func (cg *ConsumerGroup) Start() error {
// exit when failed to register the consumer
err := cg.storage.registerConsumer(cg.name, cg.id, nil)
if err != nil && err != zk.ErrNodeExists {
Expand All @@ -101,9 +101,9 @@ func (cg *ConsumerGroup) JoinGroup() error {
return nil
}

// ExitGroup would unregister ConsumerGroup, and rebalance would be triggered.
// Stop would unregister ConsumerGroup, and rebalance would be triggered.
// The partitions which consumed by this ConsumerGroup would be assigned to others.
func (cg *ConsumerGroup) ExitGroup() {
func (cg *ConsumerGroup) Stop() {
cg.stop()
cg.wg.Wait()
}
Expand Down Expand Up @@ -196,7 +196,7 @@ CONSUME_TOPIC_LOOP:
cg.stopper = make(chan struct{})
cg.rebalanceTrigger = make(chan struct{})
continue CONSUME_TOPIC_LOOP
case <-cg.stopper: // triggered when ExitGroup() is called
case <-cg.stopper: // triggered when Stop() is called
cg.logger.WithField("group", cg.name).Info("ConsumerGroup is stopping")
wg.Wait()
cg.logger.WithField("group", cg.name).Info("ConsumerGroup was stopped")
Expand Down
2 changes: 1 addition & 1 deletion consumer_group_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,5 +16,5 @@ func TestConsumerGroup(t *testing.T) {
if err != nil {
t.Error(err)
}
fmt.Println(cg.JoinGroup())
fmt.Println(cg.Start())
}
6 changes: 3 additions & 3 deletions example/example.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,9 @@ import (
func handleSignal(sig os.Signal, cg *consumergroup.ConsumerGroup) {
switch sig {
case syscall.SIGINT:
cg.ExitGroup()
cg.Stop()
case syscall.SIGTERM:
cg.ExitGroup()
cg.Stop()
default:
}
}
Expand Down Expand Up @@ -49,7 +49,7 @@ func main() {

registerSignal(cg)

err = cg.JoinGroup()
err = cg.Start()
if err != nil {
fmt.Println("Failed to join group, err ", err.Error())
os.Exit(1)
Expand Down
2 changes: 1 addition & 1 deletion tests/integration/offset_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ func TestOffsetAutoCommit(t *testing.T) {
if err != nil {
t.Fatalf("Failed to create consumer instance, err %s", err)
}
defer c.ExitGroup()
defer c.Stop()
time.Sleep(3000 * time.Millisecond) // we have no way to know if the consumer is ready
produceMessages(brokers, topic, 0, count)
messages, _ := c.GetMessages(topic)
Expand Down
2 changes: 1 addition & 1 deletion tests/integration/rebalance_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ func TestRebalance(t *testing.T) {
}

for _, c := range consumers {
c.ExitGroup()
c.Stop()
// no way to exit group when the consumer is rebalancing
time.Sleep(3 * time.Second)
}
Expand Down
2 changes: 1 addition & 1 deletion tests/integration/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ func createConsumerInstance(addrs []string, groupID, topic string) (*consumergro
if err != nil {
return nil, err
}
err = cg.JoinGroup()
err = cg.Start()
if err != nil {
return nil, err
}
Expand Down

0 comments on commit 08c1de4

Please sign in to comment.