2
2
* Copyright Strimzi authors.
3
3
* License: Apache License 2.0 (see the file LICENSE or http://apache.org/licenses/LICENSE-2.0.html).
4
4
*/
5
-
6
5
package io .strimzi .kafka .bridge .http ;
7
6
8
7
import com .fasterxml .jackson .databind .JsonNode ;
18
17
import io .strimzi .kafka .bridge .config .BridgeConfig ;
19
18
import io .strimzi .kafka .bridge .http .converter .JsonUtils ;
20
19
import io .strimzi .kafka .bridge .http .model .HttpBridgeError ;
20
+ import io .strimzi .kafka .bridge .metrics .JmxMetricsCollector ;
21
21
import io .strimzi .kafka .bridge .metrics .MetricsCollector ;
22
+ import io .strimzi .kafka .bridge .metrics .MetricsType ;
23
+ import io .strimzi .kafka .bridge .metrics .StrimziMetricsCollector ;
22
24
import io .vertx .core .AbstractVerticle ;
23
25
import io .vertx .core .Promise ;
24
26
import io .vertx .core .file .FileSystem ;
43
45
import org .apache .logging .log4j .LogManager ;
44
46
import org .apache .logging .log4j .Logger ;
45
47
48
+ import javax .management .MalformedObjectNameException ;
49
+ import java .io .BufferedReader ;
50
+ import java .io .IOException ;
51
+ import java .io .InputStream ;
52
+ import java .io .InputStreamReader ;
53
+ import java .nio .charset .StandardCharsets ;
54
+ import java .nio .file .Files ;
46
55
import java .util .HashMap ;
47
56
import java .util .HashSet ;
48
57
import java .util .Iterator ;
49
58
import java .util .Map ;
50
59
import java .util .Set ;
60
+ import java .util .stream .Collectors ;
51
61
52
62
import static io .netty .handler .codec .http .HttpHeaderNames .ACCEPT ;
53
63
import static io .netty .handler .codec .http .HttpHeaderNames .ACCESS_CONTROL_ALLOW_METHODS ;
59
69
/**
60
70
* Main bridge class listening for connections and handling HTTP requests.
61
71
*/
62
- @ SuppressWarnings ({"checkstyle:MemberName" })
72
+ @ SuppressWarnings ({"checkstyle:MemberName" , "checkstyle:ClassDataAbstractionCoupling" , "checkstyle:ClassFanOutComplexity" })
63
73
public class HttpBridge extends AbstractVerticle {
64
74
private static final Logger LOGGER = LogManager .getLogger (HttpBridge .class );
65
75
@@ -76,17 +86,59 @@ public class HttpBridge extends AbstractVerticle {
76
86
77
87
private final Map <ConsumerInstanceId , Long > timestampMap = new HashMap <>();
78
88
79
- private final MetricsCollector metricsCollector ;
89
+ private MetricsCollector metricsCollector = null ;
80
90
81
91
/**
82
92
* Constructor
83
93
*
84
94
* @param bridgeConfig bridge configuration
85
- * @param metricsCollector metricsCollector instance for scraping metrics from different registries
86
95
*/
87
- public HttpBridge (BridgeConfig bridgeConfig , MetricsCollector metricsCollector ) {
96
+ public HttpBridge (BridgeConfig bridgeConfig ) {
88
97
this .bridgeConfig = bridgeConfig ;
89
- this .metricsCollector = metricsCollector ;
98
+ if (bridgeConfig .getMetricsType () != null ) {
99
+ if (bridgeConfig .getMetricsType () == MetricsType .JMX_EXPORTER ) {
100
+ this .metricsCollector = createJmxMetricsCollector (bridgeConfig );
101
+ } else if (bridgeConfig .getMetricsType () == MetricsType .STRIMZI_REPORTER ) {
102
+ this .metricsCollector = new StrimziMetricsCollector ();
103
+ }
104
+ }
105
+ if (metricsCollector != null ) {
106
+ LOGGER .info ("Metrics of type '{}' enabled and exposed on /metrics endpoint" , bridgeConfig .getMetricsType ());
107
+ }
108
+ }
109
+
110
+ /**
111
+ * Create a JmxMetricsCollector instance with the YAML configuration filters.
112
+ * This is loaded from a custom config file if present or from the default configuration file.
113
+ *
114
+ * @return JmxCollectorRegistry instance
115
+ */
116
+ private static JmxMetricsCollector createJmxMetricsCollector (BridgeConfig bridgeConfig ) {
117
+ try {
118
+ if (bridgeConfig .getJmxExporterConfigPath () == null ) {
119
+ // load default configuration
120
+ LOGGER .info ("Using default JMX metrics configuration" );
121
+ InputStream is = Application .class .getClassLoader ().getResourceAsStream ("jmx_metrics_config.yaml" );
122
+ if (is == null ) {
123
+ return null ;
124
+ }
125
+ try (BufferedReader reader = new BufferedReader (new InputStreamReader (is , StandardCharsets .UTF_8 ))) {
126
+ String yaml = reader
127
+ .lines ()
128
+ .collect (Collectors .joining ("\n " ));
129
+ return new JmxMetricsCollector (yaml );
130
+ }
131
+ } else if (Files .exists (bridgeConfig .getJmxExporterConfigPath ())) {
132
+ // load custom configuration file
133
+ LOGGER .info ("Loading custom JMX metrics configuration file from {}" , bridgeConfig .getJmxExporterConfigPath ());
134
+ String yaml = Files .readString (bridgeConfig .getJmxExporterConfigPath (), StandardCharsets .UTF_8 );
135
+ return new JmxMetricsCollector (yaml );
136
+ } else {
137
+ throw new RuntimeException ("Custom JMX metrics configuration file not found" );
138
+ }
139
+ } catch (IOException | MalformedObjectNameException e ) {
140
+ throw new RuntimeException ("Failed to initialize JMX metrics collector" , e );
141
+ }
90
142
}
91
143
92
144
private void bindHttpServer (Promise <Void > startPromise ) {
@@ -235,7 +287,6 @@ private CorsHandler getCorsHandler() {
235
287
236
288
@ Override
237
289
public void stop (Promise <Void > stopPromise ) {
238
-
239
290
LOGGER .info ("Stopping HTTP-Kafka bridge verticle ..." );
240
291
241
292
this .isReady = false ;
@@ -563,9 +614,7 @@ private void metrics(RoutingContext routingContext) {
563
614
.setStatusCode (HttpResponseStatus .OK .code ())
564
615
.end (metricsCollector .scrape ());
565
616
} else {
566
- HttpBridgeError error = new HttpBridgeError (HttpResponseStatus .NOT_FOUND .code (), "The metrics endpoint is not enabled." );
567
- HttpUtils .sendResponse (routingContext , HttpResponseStatus .NOT_FOUND .code (),
568
- BridgeContentType .KAFKA_JSON , JsonUtils .jsonToBytes (error .toJson ()));
617
+ HttpUtils .sendResponse (routingContext , HttpResponseStatus .NOT_FOUND .code (), null , null );
569
618
}
570
619
}
571
620
0 commit comments