-
Notifications
You must be signed in to change notification settings - Fork 3
/
Copy pathconsumer_metrics.go
87 lines (67 loc) · 3.03 KB
/
consumer_metrics.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
/* Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License. */
package gonzo
import (
"fmt"
"github.com/rcrowley/go-metrics"
)
// Metrics is a set of all metrics for one Consumer instance.
type Metrics struct {
// Consumer is all metrics for enclosing Consumer instance.
Consumer ConsumerMetrics
// PartitionConsumers is a map of topic/partitions to PartitionConsumer metrics.
PartitionConsumers map[string]map[int32]PartitionConsumerMetrics
}
// ConsumerMetrics is an interface for accessing and modifying Consumer metrics.
type ConsumerMetrics interface {
// NumOwnedTopicPartitions is a counter which value is the number of currently owned topic partitions by
// enclosing Consumer.
NumOwnedTopicPartitions(func(metrics.Counter))
// Registry provides access to metrics registry for enclosing Consumer.
Registry() metrics.Registry
// Stop unregisters all metrics from the registry.
Stop()
}
// KafkaConsumerMetrics implements ConsumerMetrics and is used when ConsumerConfig.EnableMetrics is set to true.
type KafkaConsumerMetrics struct {
registry metrics.Registry
numOwnedTopicPartitions metrics.Counter
}
// NewKafkaConsumerMetrics creates new KafkaConsumerMetrics for a given consumer group.
func NewKafkaConsumerMetrics(groupID string, consumerID string) *KafkaConsumerMetrics {
registry := metrics.NewPrefixedRegistry(fmt.Sprintf("%s.%s.", groupID, consumerID))
return &KafkaConsumerMetrics{
registry: registry,
numOwnedTopicPartitions: metrics.NewRegisteredCounter("numOwnedTopicPartitions", registry),
}
}
// NumOwnedTopicPartitions is a counter which value is the number of currently owned topic partitions by
// enclosing Consumer.
func (cm *KafkaConsumerMetrics) NumOwnedTopicPartitions(f func(metrics.Counter)) {
f(cm.numOwnedTopicPartitions)
}
// Registry provides access to metrics registry for enclosing Consumer.
func (cm *KafkaConsumerMetrics) Registry() metrics.Registry {
return cm.registry
}
// Stop unregisters all metrics from the registry.
func (cm *KafkaConsumerMetrics) Stop() {
cm.registry.UnregisterAll()
}
var noOpConsumerMetrics = new(noOpKafkaConsumerMetrics)
type noOpKafkaConsumerMetrics struct{}
func (*noOpKafkaConsumerMetrics) NumOwnedTopicPartitions(func(metrics.Counter)) {}
func (*noOpKafkaConsumerMetrics) Registry() metrics.Registry {
panic("Registry() call on no op metrics")
}
func (*noOpKafkaConsumerMetrics) Stop() {}