From 08c1de4f25edadec6351fe77d2b65662b0aed7ab Mon Sep 17 00:00:00 2001 From: huanglin Date: Tue, 10 Jul 2018 11:22:06 +0800 Subject: [PATCH] MOD: rename JoinGroup() to Start(), ExitGroup() to Stop() --- consumer_group.go | 10 +++++----- consumer_group_test.go | 2 +- example/example.go | 6 +++--- tests/integration/offset_test.go | 2 +- tests/integration/rebalance_test.go | 2 +- tests/integration/util.go | 2 +- 6 files changed, 12 insertions(+), 12 deletions(-) diff --git a/consumer_group.go b/consumer_group.go index 41b8717..44e7c66 100644 --- a/consumer_group.go +++ b/consumer_group.go @@ -88,9 +88,9 @@ func (cg *ConsumerGroup) initSaramaConsumer() error { return err } -// JoinGroup would register ConsumerGroup, and rebalance would be triggered. +// Start would register ConsumerGroup, and rebalance would be triggered. // ConsumerGroup computes the partitions which should be consumed by consumer's num, and start fetching message. -func (cg *ConsumerGroup) JoinGroup() error { +func (cg *ConsumerGroup) Start() error { // exit when failed to register the consumer err := cg.storage.registerConsumer(cg.name, cg.id, nil) if err != nil && err != zk.ErrNodeExists { @@ -101,9 +101,9 @@ func (cg *ConsumerGroup) JoinGroup() error { return nil } -// ExitGroup would unregister ConsumerGroup, and rebalance would be triggered. +// Stop would unregister ConsumerGroup, and rebalance would be triggered. // The partitions which consumed by this ConsumerGroup would be assigned to others. -func (cg *ConsumerGroup) ExitGroup() { +func (cg *ConsumerGroup) Stop() { cg.stop() cg.wg.Wait() } @@ -196,7 +196,7 @@ CONSUME_TOPIC_LOOP: cg.stopper = make(chan struct{}) cg.rebalanceTrigger = make(chan struct{}) continue CONSUME_TOPIC_LOOP - case <-cg.stopper: // triggered when ExitGroup() is called + case <-cg.stopper: // triggered when Stop() is called cg.logger.WithField("group", cg.name).Info("ConsumerGroup is stopping") wg.Wait() cg.logger.WithField("group", cg.name).Info("ConsumerGroup was stopped") diff --git a/consumer_group_test.go b/consumer_group_test.go index 2a4b5fa..c8f5186 100644 --- a/consumer_group_test.go +++ b/consumer_group_test.go @@ -16,5 +16,5 @@ func TestConsumerGroup(t *testing.T) { if err != nil { t.Error(err) } - fmt.Println(cg.JoinGroup()) + fmt.Println(cg.Start()) } diff --git a/example/example.go b/example/example.go index 8ba0a38..30f9f6b 100644 --- a/example/example.go +++ b/example/example.go @@ -13,9 +13,9 @@ import ( func handleSignal(sig os.Signal, cg *consumergroup.ConsumerGroup) { switch sig { case syscall.SIGINT: - cg.ExitGroup() + cg.Stop() case syscall.SIGTERM: - cg.ExitGroup() + cg.Stop() default: } } @@ -49,7 +49,7 @@ func main() { registerSignal(cg) - err = cg.JoinGroup() + err = cg.Start() if err != nil { fmt.Println("Failed to join group, err ", err.Error()) os.Exit(1) diff --git a/tests/integration/offset_test.go b/tests/integration/offset_test.go index 0b29e6d..e24147e 100644 --- a/tests/integration/offset_test.go +++ b/tests/integration/offset_test.go @@ -17,7 +17,7 @@ func TestOffsetAutoCommit(t *testing.T) { if err != nil { t.Fatalf("Failed to create consumer instance, err %s", err) } - defer c.ExitGroup() + defer c.Stop() time.Sleep(3000 * time.Millisecond) // we have no way to know if the consumer is ready produceMessages(brokers, topic, 0, count) messages, _ := c.GetMessages(topic) diff --git a/tests/integration/rebalance_test.go b/tests/integration/rebalance_test.go index 60b9f6e..e1143a9 100644 --- a/tests/integration/rebalance_test.go +++ b/tests/integration/rebalance_test.go @@ -54,7 +54,7 @@ func TestRebalance(t *testing.T) { } for _, c := range consumers { - c.ExitGroup() + c.Stop() // no way to exit group when the consumer is rebalancing time.Sleep(3 * time.Second) } diff --git a/tests/integration/util.go b/tests/integration/util.go index 296fc09..f6b5dfe 100644 --- a/tests/integration/util.go +++ b/tests/integration/util.go @@ -52,7 +52,7 @@ func createConsumerInstance(addrs []string, groupID, topic string) (*consumergro if err != nil { return nil, err } - err = cg.JoinGroup() + err = cg.Start() if err != nil { return nil, err }