From ffe1b5b785882900d313c4cd6f62160e81c8f344 Mon Sep 17 00:00:00 2001 From: hzh0425 <642256541@qq.com> Date: Fri, 27 May 2022 16:11:15 +0800 Subject: [PATCH 1/9] add tool getSyncStateDataCommand --- .../rocketmq/client/impl/MQClientAPIImpl.java | 19 +++ .../rocketmq/common/protocol/RequestCode.java | 2 + .../common/protocol/body/InSyncStateData.java | 124 +++++++++++++++++ .../rocketmq/controller/Controller.java | 6 + .../controller/impl/DledgerController.java | 6 + .../impl/event/ControllerResult.java | 4 + .../impl/manager/ReplicasInfoManager.java | 25 ++++ .../processor/ControllerRequestProcessor.java | 13 ++ .../rocketmq/namesrv/NamesrvController.java | 1 + .../processor/ControllerRequestProcessor.java | 14 ++ .../tools/admin/DefaultMQAdminExt.java | 18 ++- .../tools/admin/DefaultMQAdminExtImpl.java | 6 + .../rocketmq/tools/admin/MQAdminExt.java | 14 +- .../tools/command/MQAdminStartup.java | 8 +- .../command/ha/SyncStateDataCommand.java | 130 ++++++++++++++++++ 15 files changed, 375 insertions(+), 15 deletions(-) create mode 100644 common/src/main/java/org/apache/rocketmq/common/protocol/body/InSyncStateData.java create mode 100644 tools/src/main/java/org/apache/rocketmq/tools/command/ha/SyncStateDataCommand.java diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java index 614ce9fb86b..5af467ee163 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java @@ -88,6 +88,7 @@ import org.apache.rocketmq.common.protocol.body.GetConsumerStatusBody; import org.apache.rocketmq.common.protocol.body.GroupList; import org.apache.rocketmq.common.protocol.body.HARuntimeInfo; +import org.apache.rocketmq.common.protocol.body.InSyncStateData; import org.apache.rocketmq.common.protocol.body.KVTable; import org.apache.rocketmq.common.protocol.body.LockBatchRequestBody; import org.apache.rocketmq.common.protocol.body.LockBatchResponseBody; @@ -2854,4 +2855,22 @@ public HARuntimeInfo getBrokerHAStatus(final String brokerAddr, final long timeo throw new MQBrokerException(response.getCode(), response.getRemark()); } + + public InSyncStateData getInSyncStateData(final String controllerAddress, + final List brokers) throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, InterruptedException, MQBrokerException { + RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.CONTROLLER_GET_SYNC_STATE_DATA, null); + final byte[] body = RemotingSerializable.encode(brokers); + request.setBody(body); + RemotingCommand response = this.remotingClient.invokeSync(controllerAddress, request, 3000); + assert response != null; + + switch (response.getCode()) { + case ResponseCode.SUCCESS: { + return RemotingSerializable.decode(response.getBody(), InSyncStateData.class); + } + default: + break; + } + throw new MQBrokerException(response.getCode(), response.getRemark()); + } } diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/RequestCode.java b/common/src/main/java/org/apache/rocketmq/common/protocol/RequestCode.java index 7ba9fd04636..8dbcd1a5011 100644 --- a/common/src/main/java/org/apache/rocketmq/common/protocol/RequestCode.java +++ b/common/src/main/java/org/apache/rocketmq/common/protocol/RequestCode.java @@ -250,4 +250,6 @@ public class RequestCode { public static final int CONTROLLER_GET_REPLICA_INFO = 1004; public static final int CONTROLLER_GET_METADATA_INFO = 1005; + + public static final int CONTROLLER_GET_SYNC_STATE_DATA = 1006; } diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/body/InSyncStateData.java b/common/src/main/java/org/apache/rocketmq/common/protocol/body/InSyncStateData.java new file mode 100644 index 00000000000..0d953c9d2b5 --- /dev/null +++ b/common/src/main/java/org/apache/rocketmq/common/protocol/body/InSyncStateData.java @@ -0,0 +1,124 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.common.protocol.body; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import org.apache.rocketmq.remoting.protocol.RemotingSerializable; + +public class InSyncStateData extends RemotingSerializable { + private Map inSyncTable; + + public InSyncStateData() { + this.inSyncTable = new HashMap<>(); + } + + public void addInSyncState(final String brokerName, final InSyncState inSyncState) { + this.inSyncTable.put(brokerName, inSyncState); + } + + public Map getInSyncTable() { + return inSyncTable; + } + + public void setInSyncTable( + Map inSyncTable) { + this.inSyncTable = inSyncTable; + } + + public static class InSyncState extends RemotingSerializable { + private String masterAddress; + private int masterEpoch; + private int syncStateSetEpoch; + private List inSyncMembers; + + public InSyncState(String masterAddress, int masterEpoch, int syncStateSetEpoch, + List inSyncMembers) { + this.masterAddress = masterAddress; + this.masterEpoch = masterEpoch; + this.syncStateSetEpoch = syncStateSetEpoch; + this.inSyncMembers = inSyncMembers; + } + + public String getMasterAddress() { + return masterAddress; + } + + public void setMasterAddress(String masterAddress) { + this.masterAddress = masterAddress; + } + + public int getMasterEpoch() { + return masterEpoch; + } + + public void setMasterEpoch(int masterEpoch) { + this.masterEpoch = masterEpoch; + } + + public int getSyncStateSetEpoch() { + return syncStateSetEpoch; + } + + public void setSyncStateSetEpoch(int syncStateSetEpoch) { + this.syncStateSetEpoch = syncStateSetEpoch; + } + + public List getInSyncMembers() { + return inSyncMembers; + } + + public void setInSyncMembers( + List inSyncMembers) { + this.inSyncMembers = inSyncMembers; + } + } + + public static class InSyncMember extends RemotingSerializable { + private String address; + private Long brokerId; + + public InSyncMember(String address, Long brokerId) { + this.address = address; + this.brokerId = brokerId; + } + + public String getAddress() { + return address; + } + + public void setAddress(String address) { + this.address = address; + } + + public Long getBrokerId() { + return brokerId; + } + + public void setBrokerId(Long brokerId) { + this.brokerId = brokerId; + } + + @Override public String toString() { + return "InSyncMember{" + + "address='" + address + '\'' + + ", brokerId=" + brokerId + + '}'; + } + } +} diff --git a/controller/src/main/java/org/apache/rocketmq/controller/Controller.java b/controller/src/main/java/org/apache/rocketmq/controller/Controller.java index fca8f55654e..b4ed69a409f 100644 --- a/controller/src/main/java/org/apache/rocketmq/controller/Controller.java +++ b/controller/src/main/java/org/apache/rocketmq/controller/Controller.java @@ -17,6 +17,7 @@ package org.apache.rocketmq.controller; +import java.util.List; import java.util.concurrent.CompletableFuture; import org.apache.rocketmq.common.protocol.body.SyncStateSet; import org.apache.rocketmq.common.protocol.header.namesrv.controller.AlterSyncStateSetRequestHeader; @@ -88,6 +89,11 @@ CompletableFuture alterSyncStateSet( */ CompletableFuture getReplicaInfo(final GetReplicaInfoRequestHeader request); + /** + * Get inSyncStateData for target brokers, this api is used for admin tools. + */ + CompletableFuture getSyncStateData(final List brokerNames); + /** * Get Metadata of controller * diff --git a/controller/src/main/java/org/apache/rocketmq/controller/impl/DledgerController.java b/controller/src/main/java/org/apache/rocketmq/controller/impl/DledgerController.java index 8a4428f3bd9..6240e54b6d6 100644 --- a/controller/src/main/java/org/apache/rocketmq/controller/impl/DledgerController.java +++ b/controller/src/main/java/org/apache/rocketmq/controller/impl/DledgerController.java @@ -154,6 +154,12 @@ public CompletableFuture getReplicaInfo(final GetReplicaInfoReq () -> this.replicasInfoManager.getReplicaInfo(request), false); } + @Override + public CompletableFuture getSyncStateData(List brokerNames) { + return this.scheduler.appendEvent("getSyncStateData", + () -> this.replicasInfoManager.getInSyncStateData(brokerNames), false); + } + @Override public RemotingCommand getControllerMetadata() { final MemberState state = getMemberState(); diff --git a/controller/src/main/java/org/apache/rocketmq/controller/impl/event/ControllerResult.java b/controller/src/main/java/org/apache/rocketmq/controller/impl/event/ControllerResult.java index 41af2af04e0..5c6318434b8 100644 --- a/controller/src/main/java/org/apache/rocketmq/controller/impl/event/ControllerResult.java +++ b/controller/src/main/java/org/apache/rocketmq/controller/impl/event/ControllerResult.java @@ -27,6 +27,10 @@ public class ControllerResult { private int responseCode = ResponseCode.SUCCESS; private String remark; + public ControllerResult() { + this(null); + } + public ControllerResult(T response) { this.events = new ArrayList<>(); this.response = response; diff --git a/controller/src/main/java/org/apache/rocketmq/controller/impl/manager/ReplicasInfoManager.java b/controller/src/main/java/org/apache/rocketmq/controller/impl/manager/ReplicasInfoManager.java index 3f991df4e95..c213405595d 100644 --- a/controller/src/main/java/org/apache/rocketmq/controller/impl/manager/ReplicasInfoManager.java +++ b/controller/src/main/java/org/apache/rocketmq/controller/impl/manager/ReplicasInfoManager.java @@ -16,8 +16,10 @@ */ package org.apache.rocketmq.controller.impl.manager; +import java.util.ArrayList; import java.util.HashMap; import java.util.HashSet; +import java.util.List; import java.util.Map; import java.util.Set; import java.util.function.BiPredicate; @@ -27,6 +29,7 @@ import org.apache.rocketmq.common.constant.LoggerName; import org.apache.rocketmq.common.namesrv.ControllerConfig; import org.apache.rocketmq.common.protocol.ResponseCode; +import org.apache.rocketmq.common.protocol.body.InSyncStateData; import org.apache.rocketmq.common.protocol.body.SyncStateSet; import org.apache.rocketmq.common.protocol.header.namesrv.controller.AlterSyncStateSetRequestHeader; import org.apache.rocketmq.common.protocol.header.namesrv.controller.AlterSyncStateSetResponseHeader; @@ -277,6 +280,28 @@ public ControllerResult getReplicaInfo(final GetRe return result; } + public ControllerResult getInSyncStateData(final List brokerNames) { + final ControllerResult result = new ControllerResult<>(); + final InSyncStateData inSyncStateData = new InSyncStateData(); + for (String brokerName : brokerNames) { + if (isContainsBroker(brokerName)) { + // If exist broker metadata, just return metadata + final InSyncReplicasInfo replicasInfo = this.inSyncReplicasInfoTable.get(brokerName); + final BrokerInfo brokerInfo = this.replicaInfoTable.get(brokerName); + final Set syncStateSet = replicasInfo.getSyncStateSet(); + final ArrayList inSyncMembers = new ArrayList<>(); + syncStateSet.forEach(replicas -> { + inSyncMembers.add(new InSyncStateData.InSyncMember(replicas, brokerInfo.getBrokerId(replicas))); + }); + + final InSyncStateData.InSyncState inSyncState = new InSyncStateData.InSyncState(replicasInfo.getMasterAddress(), replicasInfo.getMasterEpoch(), replicasInfo.getSyncStateSetEpoch(), inSyncMembers); + inSyncStateData.addInSyncState(brokerName, inSyncState); + } + } + result.setBody(inSyncStateData.encode()); + return result; + } + /** * Apply events to memory statemachine. * diff --git a/controller/src/main/java/org/apache/rocketmq/controller/processor/ControllerRequestProcessor.java b/controller/src/main/java/org/apache/rocketmq/controller/processor/ControllerRequestProcessor.java index bf185c82630..164e9e701c2 100644 --- a/controller/src/main/java/org/apache/rocketmq/controller/processor/ControllerRequestProcessor.java +++ b/controller/src/main/java/org/apache/rocketmq/controller/processor/ControllerRequestProcessor.java @@ -17,6 +17,7 @@ package org.apache.rocketmq.controller.processor; import io.netty.channel.ChannelHandlerContext; +import java.util.List; import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; import org.apache.rocketmq.common.constant.LoggerName; @@ -43,6 +44,7 @@ import static org.apache.rocketmq.common.protocol.RequestCode.CONTROLLER_ELECT_MASTER; import static org.apache.rocketmq.common.protocol.RequestCode.CONTROLLER_GET_METADATA_INFO; import static org.apache.rocketmq.common.protocol.RequestCode.CONTROLLER_GET_REPLICA_INFO; +import static org.apache.rocketmq.common.protocol.RequestCode.CONTROLLER_GET_SYNC_STATE_DATA; import static org.apache.rocketmq.common.protocol.RequestCode.CONTROLLER_REGISTER_BROKER; /** @@ -114,6 +116,17 @@ public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand final BrokerHeartbeatRequestHeader requestHeader = request.decodeCommandCustomHeader(BrokerHeartbeatRequestHeader.class); this.heartbeatManager.onBrokerHeartbeat(requestHeader.getClusterName(), requestHeader.getBrokerAddr()); } + case CONTROLLER_GET_SYNC_STATE_DATA: { + if (request.getBody() != null) { + final List brokerNames = RemotingSerializable.decode(request.getBody(), List.class); + if (brokerNames != null && brokerNames.size() > 0) { + final CompletableFuture future = this.controller.getSyncStateData(brokerNames); + if (future != null) { + return future.get(WAIT_TIMEOUT_OUT, TimeUnit.SECONDS); + } + } + } + } default: { final String error = " request type " + request.getCode() + " not supported"; return RemotingCommand.createResponseCommand(ResponseCode.REQUEST_CODE_NOT_SUPPORTED, error); diff --git a/namesrv/src/main/java/org/apache/rocketmq/namesrv/NamesrvController.java b/namesrv/src/main/java/org/apache/rocketmq/namesrv/NamesrvController.java index 3f2e0f7f464..55dd1d7b8b4 100644 --- a/namesrv/src/main/java/org/apache/rocketmq/namesrv/NamesrvController.java +++ b/namesrv/src/main/java/org/apache/rocketmq/namesrv/NamesrvController.java @@ -276,6 +276,7 @@ private void registerProcessor() { this.remotingServer.registerProcessor(RequestCode.CONTROLLER_REGISTER_BROKER, controllerRequestProcessor, this.controllerRequestExecutor); this.remotingServer.registerProcessor(RequestCode.CONTROLLER_GET_REPLICA_INFO, controllerRequestProcessor, this.controllerRequestExecutor); this.remotingServer.registerProcessor(RequestCode.CONTROLLER_GET_METADATA_INFO, controllerRequestProcessor, this.controllerRequestExecutor); + this.remotingServer.registerProcessor(RequestCode.CONTROLLER_GET_SYNC_STATE_DATA, controllerRequestProcessor, this.controllerRequestExecutor); } } } diff --git a/namesrv/src/main/java/org/apache/rocketmq/namesrv/processor/ControllerRequestProcessor.java b/namesrv/src/main/java/org/apache/rocketmq/namesrv/processor/ControllerRequestProcessor.java index 01e75803d3e..45bc0efe7e6 100644 --- a/namesrv/src/main/java/org/apache/rocketmq/namesrv/processor/ControllerRequestProcessor.java +++ b/namesrv/src/main/java/org/apache/rocketmq/namesrv/processor/ControllerRequestProcessor.java @@ -17,6 +17,7 @@ package org.apache.rocketmq.namesrv.processor; import io.netty.channel.ChannelHandlerContext; +import java.util.List; import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; import org.apache.rocketmq.common.constant.LoggerName; @@ -38,6 +39,7 @@ import static org.apache.rocketmq.common.protocol.RequestCode.CONTROLLER_ELECT_MASTER; import static org.apache.rocketmq.common.protocol.RequestCode.CONTROLLER_GET_METADATA_INFO; import static org.apache.rocketmq.common.protocol.RequestCode.CONTROLLER_GET_REPLICA_INFO; +import static org.apache.rocketmq.common.protocol.RequestCode.CONTROLLER_GET_SYNC_STATE_DATA; import static org.apache.rocketmq.common.protocol.RequestCode.CONTROLLER_REGISTER_BROKER; /** @@ -97,6 +99,18 @@ public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand case CONTROLLER_GET_METADATA_INFO: { return this.controller.getControllerMetadata(); } + case CONTROLLER_GET_SYNC_STATE_DATA: { + if (request.getBody() != null) { + final List brokerNames = RemotingSerializable.decode(request.getBody(), List.class); + if (brokerNames != null && brokerNames.size() > 0) { + log.info("Try get sync state data for brokers:{}", brokerNames); + final CompletableFuture future = this.controller.getSyncStateData(brokerNames); + if (future != null) { + return future.get(WAIT_TIMEOUT_OUT, TimeUnit.SECONDS); + } + } + } + } default: { final String error = " request type " + request.getCode() + " not supported"; return RemotingCommand.createResponseCommand(ResponseCode.REQUEST_CODE_NOT_SUPPORTED, error); diff --git a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java index b8bb399794b..fc1e76ee4c7 100644 --- a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java +++ b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java @@ -16,6 +16,11 @@ */ package org.apache.rocketmq.tools.admin; +import java.io.UnsupportedEncodingException; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.Set; import org.apache.rocketmq.client.ClientConfig; import org.apache.rocketmq.client.QueryResult; import org.apache.rocketmq.client.exception.MQBrokerException; @@ -38,6 +43,7 @@ import org.apache.rocketmq.common.protocol.body.ConsumerRunningInfo; import org.apache.rocketmq.common.protocol.body.GroupList; import org.apache.rocketmq.common.protocol.body.HARuntimeInfo; +import org.apache.rocketmq.common.protocol.body.InSyncStateData; import org.apache.rocketmq.common.protocol.body.KVTable; import org.apache.rocketmq.common.protocol.body.ProducerConnection; import org.apache.rocketmq.common.protocol.body.QueryConsumeQueueResponseBody; @@ -59,12 +65,6 @@ import org.apache.rocketmq.remoting.exception.RemotingTimeoutException; import org.apache.rocketmq.tools.admin.api.BrokerOperatorResult; import org.apache.rocketmq.tools.admin.api.MessageTrack; - -import java.io.UnsupportedEncodingException; -import java.util.List; -import java.util.Map; -import java.util.Properties; -import java.util.Set; import org.apache.rocketmq.tools.admin.common.AdminToolResult; public class DefaultMQAdminExt extends ClientConfig implements MQAdminExt { @@ -731,6 +731,12 @@ public HARuntimeInfo getBrokerHAStatus(String brokerAddr) throws RemotingConnect return this.defaultMQAdminExtImpl.getBrokerHAStatus(brokerAddr); } + @Override + public InSyncStateData getInSyncStateData(String controllerAddress, + List brokers) throws RemotingException, InterruptedException, MQBrokerException { + return this.defaultMQAdminExtImpl.getInSyncStateData(controllerAddress, brokers); + } + @Override public void resetMasterFlushOffset(String brokerAddr, long masterFlushOffset) throws InterruptedException, RemotingConnectException, RemotingTimeoutException, RemotingSendRequestException, MQBrokerException { diff --git a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java index 7b78ead3a55..333d5e2e4f4 100644 --- a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java +++ b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java @@ -75,6 +75,7 @@ import org.apache.rocketmq.common.protocol.body.ConsumerRunningInfo; import org.apache.rocketmq.common.protocol.body.GroupList; import org.apache.rocketmq.common.protocol.body.HARuntimeInfo; +import org.apache.rocketmq.common.protocol.body.InSyncStateData; import org.apache.rocketmq.common.protocol.body.KVTable; import org.apache.rocketmq.common.protocol.body.ProducerConnection; import org.apache.rocketmq.common.protocol.body.QueryConsumeQueueResponseBody; @@ -1605,6 +1606,11 @@ public void resetOffsetByQueueId(final String brokerAddr, final String consumeGr return this.mqClientInstance.getMQClientAPIImpl().getBrokerHAStatus(brokerAddr, timeoutMillis); } + @Override public InSyncStateData getInSyncStateData(String controllerAddress, + List brokers) throws RemotingException, InterruptedException, MQBrokerException { + return this.mqClientInstance.getMQClientAPIImpl().getInSyncStateData(controllerAddress, brokers); + } + @Override public void resetMasterFlushOffset(String brokerAddr, long masterFlushOffset) throws InterruptedException, MQBrokerException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException { this.mqClientInstance.getMQClientAPIImpl().resetMasterFlushOffset(brokerAddr, masterFlushOffset); diff --git a/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminExt.java b/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminExt.java index 5ed147c13d5..34fb2d78af5 100644 --- a/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminExt.java +++ b/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminExt.java @@ -16,6 +16,11 @@ */ package org.apache.rocketmq.tools.admin; +import java.io.UnsupportedEncodingException; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.Set; import org.apache.rocketmq.client.MQAdmin; import org.apache.rocketmq.client.exception.MQBrokerException; import org.apache.rocketmq.client.exception.MQClientException; @@ -37,6 +42,7 @@ import org.apache.rocketmq.common.protocol.body.ConsumerRunningInfo; import org.apache.rocketmq.common.protocol.body.GroupList; import org.apache.rocketmq.common.protocol.body.HARuntimeInfo; +import org.apache.rocketmq.common.protocol.body.InSyncStateData; import org.apache.rocketmq.common.protocol.body.KVTable; import org.apache.rocketmq.common.protocol.body.ProducerConnection; import org.apache.rocketmq.common.protocol.body.QueryConsumeQueueResponseBody; @@ -56,12 +62,6 @@ import org.apache.rocketmq.remoting.exception.RemotingTimeoutException; import org.apache.rocketmq.tools.admin.api.BrokerOperatorResult; import org.apache.rocketmq.tools.admin.api.MessageTrack; - -import java.io.UnsupportedEncodingException; -import java.util.List; -import java.util.Map; -import java.util.Properties; -import java.util.Set; import org.apache.rocketmq.tools.admin.common.AdminToolResult; public interface MQAdminExt extends MQAdmin { @@ -383,6 +383,8 @@ MessageExt queryMessage(String clusterName, HARuntimeInfo getBrokerHAStatus(String brokerAddr) throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, InterruptedException, MQBrokerException; + InSyncStateData getInSyncStateData(String controllerAddress, List brokers) throws RemotingException, InterruptedException, MQBrokerException ; + /** * Reset master flush offset in slave * diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/MQAdminStartup.java b/tools/src/main/java/org/apache/rocketmq/tools/command/MQAdminStartup.java index bc720383b5c..5939abe71e0 100644 --- a/tools/src/main/java/org/apache/rocketmq/tools/command/MQAdminStartup.java +++ b/tools/src/main/java/org/apache/rocketmq/tools/command/MQAdminStartup.java @@ -19,6 +19,8 @@ import ch.qos.logback.classic.LoggerContext; import ch.qos.logback.classic.joran.JoranConfigurator; import ch.qos.logback.core.joran.spi.JoranException; +import java.util.ArrayList; +import java.util.List; import org.apache.commons.cli.CommandLine; import org.apache.commons.cli.Options; import org.apache.commons.cli.PosixParser; @@ -58,6 +60,7 @@ import org.apache.rocketmq.tools.command.export.ExportMetadataCommand; import org.apache.rocketmq.tools.command.export.ExportMetricsCommand; import org.apache.rocketmq.tools.command.ha.HAStatusSubCommand; +import org.apache.rocketmq.tools.command.ha.SyncStateDataCommand; import org.apache.rocketmq.tools.command.message.CheckMsgSendRTCommand; import org.apache.rocketmq.tools.command.message.ConsumeMessageCommand; import org.apache.rocketmq.tools.command.message.PrintMessageByQueueCommand; @@ -92,9 +95,6 @@ import org.apache.rocketmq.tools.command.topic.UpdateTopicSubCommand; import org.slf4j.LoggerFactory; -import java.util.ArrayList; -import java.util.List; - public class MQAdminStartup { protected static List subCommandList = new ArrayList(); @@ -246,6 +246,8 @@ public static void initCommand() { initCommand(new ExportMetricsCommand()); initCommand(new HAStatusSubCommand()); + + initCommand(new SyncStateDataCommand()); } private static void initLogback() throws JoranException { diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/ha/SyncStateDataCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/ha/SyncStateDataCommand.java new file mode 100644 index 00000000000..0577fa68461 --- /dev/null +++ b/tools/src/main/java/org/apache/rocketmq/tools/command/ha/SyncStateDataCommand.java @@ -0,0 +1,130 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.tools.command.ha; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.Set; +import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.Option; +import org.apache.commons.cli.Options; +import org.apache.rocketmq.common.protocol.body.InSyncStateData; +import org.apache.rocketmq.remoting.RPCHook; +import org.apache.rocketmq.srvutil.ServerUtil; +import org.apache.rocketmq.tools.admin.DefaultMQAdminExt; +import org.apache.rocketmq.tools.command.CommandUtil; +import org.apache.rocketmq.tools.command.SubCommand; +import org.apache.rocketmq.tools.command.SubCommandException; + +public class SyncStateDataCommand implements SubCommand { + @Override + public String commandName() { + return "syncStateData"; + } + + @Override + public String commandDesc() { + return "Fetch syncStateData for target brokers"; + } + + @Override + public Options buildCommandlineOptions(Options options) { + Option opt = new Option("a", "controllerAddress", true, "the address of controller"); + opt.setRequired(false); + options.addOption(opt); + + opt = new Option("c", "clusterName", true, "which cluster"); + opt.setRequired(false); + options.addOption(opt); + + opt = new Option("b", "brokerName", true, "which broker to fetch"); + opt.setRequired(false); + options.addOption(opt); + + opt = new Option("f", "follow", true, "the interval(second) of get info"); + opt.setRequired(false); + options.addOption(opt); + + return options; + } + + @Override + public void execute(CommandLine commandLine, Options options, RPCHook rpcHook) throws SubCommandException { + DefaultMQAdminExt defaultMQAdminExt = new DefaultMQAdminExt(rpcHook); + defaultMQAdminExt.setInstanceName(Long.toString(System.currentTimeMillis())); + + try { + if (commandLine.hasOption('f')) { + String flushSecondStr = commandLine.getOptionValue('f'); + int flushSecond = 3; + if (flushSecondStr != null && !flushSecondStr.trim().equals("")) { + flushSecond = Integer.parseInt(flushSecondStr); + } + + defaultMQAdminExt.start(); + + while (true) { + this.innerExec(commandLine, options, defaultMQAdminExt); + Thread.sleep(flushSecond * 1000); + } + } else { + defaultMQAdminExt.start(); + + this.innerExec(commandLine, options, defaultMQAdminExt); + } + } catch (Exception e) { + throw new SubCommandException(this.getClass().getSimpleName() + " command failed", e); + } finally { + defaultMQAdminExt.shutdown(); + } + } + + private void innerExec(CommandLine commandLine, Options options, + DefaultMQAdminExt defaultMQAdminExt) throws Exception { + String controllerAddress = commandLine.getOptionValue('a').trim(); + if (commandLine.hasOption('b')) { + String brokerName = commandLine.getOptionValue('b').trim(); + final ArrayList brokers = new ArrayList<>(); + brokers.add(brokerName); + printData(controllerAddress, brokers, defaultMQAdminExt); + } else if (commandLine.hasOption('c')) { + String clusterName = commandLine.getOptionValue('c').trim(); + Set brokerNames = CommandUtil.fetchBrokerNameByClusterName(defaultMQAdminExt, clusterName); + printData(controllerAddress, new ArrayList<>(brokerNames), defaultMQAdminExt); + } else { + ServerUtil.printCommandLineHelp("mqadmin " + this.commandName(), options); + } + } + + private void printData(String controllerAddress, List brokerNames, + DefaultMQAdminExt defaultMQAdminExt) throws Exception { + if (brokerNames.size() > 0) { + final InSyncStateData syncStateData = defaultMQAdminExt.getInSyncStateData(controllerAddress, brokerNames); + final Map syncTable = syncStateData.getInSyncTable(); + for (Map.Entry next : syncTable.entrySet()) { + final List syncMembers = next.getValue().getInSyncMembers(); + System.out.printf("\n#brokerName\t%s\n#MasterAddr\t%s\n#MasterEpoch\t%d\n#SyncStateSetEpoch\t%d\n#SyncStateSetMemberNums\t%d\n", + next.getKey(), next.getValue().getMasterAddress(), next.getValue().getMasterEpoch(), next.getValue().getSyncStateSetEpoch(), + syncMembers.size()); + for (InSyncStateData.InSyncMember member : syncMembers) { + System.out.printf("\n member:\t%s", member.toString()); + } + } + } + } +} From 0106e20d461b21da96cf8b3e19c832c023dd71ed Mon Sep 17 00:00:00 2001 From: hzh0425 <642256541@qq.com> Date: Fri, 27 May 2022 16:38:17 +0800 Subject: [PATCH 2/9] get controller leaderAddr when execute command --- .../rocketmq/client/impl/MQClientAPIImpl.java | 22 ++++++++++++++++--- .../controller/impl/DledgerController.java | 1 + .../processor/ControllerRequestProcessor.java | 1 - .../command/ha/SyncStateDataCommand.java | 2 +- 4 files changed, 21 insertions(+), 5 deletions(-) diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java index 5af467ee163..60700b6a470 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java @@ -180,6 +180,7 @@ import org.apache.rocketmq.common.protocol.header.namesrv.PutKVConfigRequestHeader; import org.apache.rocketmq.common.protocol.header.namesrv.WipeWritePermOfBrokerRequestHeader; import org.apache.rocketmq.common.protocol.header.namesrv.WipeWritePermOfBrokerResponseHeader; +import org.apache.rocketmq.common.protocol.header.namesrv.controller.GetMetaDataResponseHeader; import org.apache.rocketmq.common.protocol.heartbeat.HeartbeatData; import org.apache.rocketmq.common.protocol.heartbeat.MessageModel; import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData; @@ -208,6 +209,8 @@ import org.apache.rocketmq.remoting.protocol.RemotingCommand; import org.apache.rocketmq.remoting.protocol.RemotingSerializable; +import static org.apache.rocketmq.remoting.protocol.RemotingSysResponseCode.SUCCESS; + public class MQClientAPIImpl implements NameServerUpdateCallback { private final static InternalLogger log = ClientLogger.getLog(); private static boolean sendSmartMsg = @@ -2856,14 +2859,27 @@ public HARuntimeInfo getBrokerHAStatus(final String brokerAddr, final long timeo throw new MQBrokerException(response.getCode(), response.getRemark()); } + public GetMetaDataResponseHeader getControllerMetaData(final String controllerAddress) throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, InterruptedException, RemotingCommandException, MQBrokerException { + final RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.CONTROLLER_GET_METADATA_INFO, null); + final RemotingCommand response = this.remotingClient.invokeSync(controllerAddress, request, 3000); + assert response != null; + if (response.getCode() == SUCCESS) { + return response.decodeCommandCustomHeader(GetMetaDataResponseHeader.class); + } + throw new MQBrokerException(response.getCode(), response.getRemark()); + } + public InSyncStateData getInSyncStateData(final String controllerAddress, - final List brokers) throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, InterruptedException, MQBrokerException { + final List brokers) throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, InterruptedException, MQBrokerException, RemotingCommandException { + final GetMetaDataResponseHeader controllerMetaData = getControllerMetaData(controllerAddress); + assert controllerMetaData != null; + assert controllerMetaData.getControllerLeaderAddress() != null; + final String leaderAddress = controllerMetaData.getControllerLeaderAddress(); RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.CONTROLLER_GET_SYNC_STATE_DATA, null); final byte[] body = RemotingSerializable.encode(brokers); request.setBody(body); - RemotingCommand response = this.remotingClient.invokeSync(controllerAddress, request, 3000); + RemotingCommand response = this.remotingClient.invokeSync(leaderAddress, request, 3000); assert response != null; - switch (response.getCode()) { case ResponseCode.SUCCESS: { return RemotingSerializable.decode(response.getBody(), InSyncStateData.class); diff --git a/controller/src/main/java/org/apache/rocketmq/controller/impl/DledgerController.java b/controller/src/main/java/org/apache/rocketmq/controller/impl/DledgerController.java index 6240e54b6d6..a0fdb99d40c 100644 --- a/controller/src/main/java/org/apache/rocketmq/controller/impl/DledgerController.java +++ b/controller/src/main/java/org/apache/rocketmq/controller/impl/DledgerController.java @@ -156,6 +156,7 @@ public CompletableFuture getReplicaInfo(final GetReplicaInfoReq @Override public CompletableFuture getSyncStateData(List brokerNames) { + return this.scheduler.appendEvent("getSyncStateData", () -> this.replicasInfoManager.getInSyncStateData(brokerNames), false); } diff --git a/namesrv/src/main/java/org/apache/rocketmq/namesrv/processor/ControllerRequestProcessor.java b/namesrv/src/main/java/org/apache/rocketmq/namesrv/processor/ControllerRequestProcessor.java index 45bc0efe7e6..ee69d909487 100644 --- a/namesrv/src/main/java/org/apache/rocketmq/namesrv/processor/ControllerRequestProcessor.java +++ b/namesrv/src/main/java/org/apache/rocketmq/namesrv/processor/ControllerRequestProcessor.java @@ -103,7 +103,6 @@ public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand if (request.getBody() != null) { final List brokerNames = RemotingSerializable.decode(request.getBody(), List.class); if (brokerNames != null && brokerNames.size() > 0) { - log.info("Try get sync state data for brokers:{}", brokerNames); final CompletableFuture future = this.controller.getSyncStateData(brokerNames); if (future != null) { return future.get(WAIT_TIMEOUT_OUT, TimeUnit.SECONDS); diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/ha/SyncStateDataCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/ha/SyncStateDataCommand.java index 0577fa68461..cf07933d08c 100644 --- a/tools/src/main/java/org/apache/rocketmq/tools/command/ha/SyncStateDataCommand.java +++ b/tools/src/main/java/org/apache/rocketmq/tools/command/ha/SyncStateDataCommand.java @@ -34,7 +34,7 @@ public class SyncStateDataCommand implements SubCommand { @Override public String commandName() { - return "syncStateData"; + return "getSyncStateSet"; } @Override From c1c8a8186ac92ef56eaaedb76c5332fff5d3b9c6 Mon Sep 17 00:00:00 2001 From: hzh0425 <642256541@qq.com> Date: Fri, 27 May 2022 19:40:07 +0800 Subject: [PATCH 3/9] modify getControllerMetadata api --- .../broker/hacontroller/ReplicasManager.java | 6 +++--- .../rocketmq/namesrv/NamesrvController.java | 6 +++++- .../processor/ControllerRequestProcessor.java | 20 ++++++++++++++++--- .../AutoSwitchRoleIntegrationTest.java | 1 + 4 files changed, 26 insertions(+), 7 deletions(-) diff --git a/broker/src/main/java/org/apache/rocketmq/broker/hacontroller/ReplicasManager.java b/broker/src/main/java/org/apache/rocketmq/broker/hacontroller/ReplicasManager.java index 41addf37ce0..08faec2e3d1 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/hacontroller/ReplicasManager.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/hacontroller/ReplicasManager.java @@ -333,9 +333,9 @@ private boolean updateControllerMetadata() { for (String address : this.controllerAddresses) { try { final GetMetaDataResponseHeader responseHeader = this.brokerOuterAPI.getControllerMetaData(address); - if (responseHeader != null && responseHeader.isLeader()) { - this.controllerLeaderAddress = address; - LOGGER.info("Change controller leader address to {}", this.controllerAddresses); + if (responseHeader != null && StringUtils.isNoneEmpty(responseHeader.getControllerLeaderAddress())) { + this.controllerLeaderAddress = responseHeader.getControllerLeaderAddress(); + LOGGER.info("Change controller leader address to {}", this.controllerLeaderAddress); return true; } } catch (final Exception ignore) { diff --git a/namesrv/src/main/java/org/apache/rocketmq/namesrv/NamesrvController.java b/namesrv/src/main/java/org/apache/rocketmq/namesrv/NamesrvController.java index 55dd1d7b8b4..b6a109a6117 100644 --- a/namesrv/src/main/java/org/apache/rocketmq/namesrv/NamesrvController.java +++ b/namesrv/src/main/java/org/apache/rocketmq/namesrv/NamesrvController.java @@ -270,7 +270,7 @@ private void registerProcessor() { this.remotingServer.registerDefaultProcessor(new DefaultRequestProcessor(this), this.defaultExecutor); if (controllerConfig.isEnableStartupController()) { - final ControllerRequestProcessor controllerRequestProcessor = new ControllerRequestProcessor(this.controller); + final ControllerRequestProcessor controllerRequestProcessor = new ControllerRequestProcessor(this); this.remotingServer.registerProcessor(RequestCode.CONTROLLER_ALTER_SYNC_STATE_SET, controllerRequestProcessor, this.controllerRequestExecutor); this.remotingServer.registerProcessor(RequestCode.CONTROLLER_ELECT_MASTER, controllerRequestProcessor, this.controllerRequestExecutor); this.remotingServer.registerProcessor(RequestCode.CONTROLLER_REGISTER_BROKER, controllerRequestProcessor, this.controllerRequestExecutor); @@ -345,6 +345,10 @@ public void setRemotingServer(RemotingServer remotingServer) { this.remotingServer = remotingServer; } + public Controller getController() { + return controller; + } + public Configuration getConfiguration() { return configuration; } diff --git a/namesrv/src/main/java/org/apache/rocketmq/namesrv/processor/ControllerRequestProcessor.java b/namesrv/src/main/java/org/apache/rocketmq/namesrv/processor/ControllerRequestProcessor.java index ee69d909487..2651636e3c7 100644 --- a/namesrv/src/main/java/org/apache/rocketmq/namesrv/processor/ControllerRequestProcessor.java +++ b/namesrv/src/main/java/org/apache/rocketmq/namesrv/processor/ControllerRequestProcessor.java @@ -20,16 +20,19 @@ import java.util.List; import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; +import org.apache.commons.lang3.StringUtils; import org.apache.rocketmq.common.constant.LoggerName; import org.apache.rocketmq.common.protocol.ResponseCode; import org.apache.rocketmq.common.protocol.body.SyncStateSet; import org.apache.rocketmq.common.protocol.header.namesrv.controller.AlterSyncStateSetRequestHeader; import org.apache.rocketmq.common.protocol.header.namesrv.controller.BrokerRegisterRequestHeader; import org.apache.rocketmq.common.protocol.header.namesrv.controller.ElectMasterRequestHeader; +import org.apache.rocketmq.common.protocol.header.namesrv.controller.GetMetaDataResponseHeader; import org.apache.rocketmq.common.protocol.header.namesrv.controller.GetReplicaInfoRequestHeader; import org.apache.rocketmq.controller.Controller; import org.apache.rocketmq.logging.InternalLogger; import org.apache.rocketmq.logging.InternalLoggerFactory; +import org.apache.rocketmq.namesrv.NamesrvController; import org.apache.rocketmq.remoting.common.RemotingHelper; import org.apache.rocketmq.remoting.netty.NettyRequestProcessor; import org.apache.rocketmq.remoting.protocol.RemotingCommand; @@ -48,10 +51,12 @@ public class ControllerRequestProcessor implements NettyRequestProcessor { private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.CONTROLLER_LOGGER_NAME); private static final int WAIT_TIMEOUT_OUT = 5; + private final NamesrvController namesrvController; private final Controller controller; - public ControllerRequestProcessor(final Controller controller) { - this.controller = controller; + public ControllerRequestProcessor(final NamesrvController namesrvController) { + this.namesrvController = namesrvController; + this.controller = namesrvController.getController(); } @Override @@ -97,7 +102,16 @@ public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand break; } case CONTROLLER_GET_METADATA_INFO: { - return this.controller.getControllerMetadata(); + final RemotingCommand response = this.controller.getControllerMetadata(); + final GetMetaDataResponseHeader responseHeader = (GetMetaDataResponseHeader) response.readCustomHeader(); + if (StringUtils.isNoneEmpty(responseHeader.getControllerLeaderAddress())) { + final String leaderAddress = responseHeader.getControllerLeaderAddress(); + // Because the controller is proxy by namesrv, so we should replace the controllerAddress to namesrvAddress. + final int splitIndex = StringUtils.lastIndexOf(leaderAddress, ":"); + final String namesrvAddress = leaderAddress.substring(0, splitIndex + 1) + this.namesrvController.getNettyServerConfig().getListenPort(); + responseHeader.setControllerLeaderAddress(namesrvAddress); + } + return response; } case CONTROLLER_GET_SYNC_STATE_DATA: { if (request.getBody() != null) { diff --git a/test/src/test/java/org/apache/rocketmq/test/autoswitchrole/AutoSwitchRoleIntegrationTest.java b/test/src/test/java/org/apache/rocketmq/test/autoswitchrole/AutoSwitchRoleIntegrationTest.java index d2219a870be..bdf23ecbd79 100644 --- a/test/src/test/java/org/apache/rocketmq/test/autoswitchrole/AutoSwitchRoleIntegrationTest.java +++ b/test/src/test/java/org/apache/rocketmq/test/autoswitchrole/AutoSwitchRoleIntegrationTest.java @@ -58,6 +58,7 @@ public void init(int mappedFileSize) throws Exception { final NettyServerConfig serverConfig = new NettyServerConfig(); int namesrvPort = nextPort(); serverConfig.setListenPort(namesrvPort); + System.out.println(namesrvPort); this.controllerConfig = buildControllerConfig("n0", peers); this.namesrvController = new NamesrvController(new NamesrvConfig(), serverConfig, new NettyClientConfig(), controllerConfig); From 00b4471faccb68ca748be81a2097bfe8000e6c25 Mon Sep 17 00:00:00 2001 From: hzh0425 <642256541@qq.com> Date: Fri, 27 May 2022 19:49:42 +0800 Subject: [PATCH 4/9] code review --- .../java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java | 2 ++ 1 file changed, 2 insertions(+) diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java index 60700b6a470..c01045344b1 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java @@ -2871,10 +2871,12 @@ public GetMetaDataResponseHeader getControllerMetaData(final String controllerAd public InSyncStateData getInSyncStateData(final String controllerAddress, final List brokers) throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, InterruptedException, MQBrokerException, RemotingCommandException { + // Get controller leader address. final GetMetaDataResponseHeader controllerMetaData = getControllerMetaData(controllerAddress); assert controllerMetaData != null; assert controllerMetaData.getControllerLeaderAddress() != null; final String leaderAddress = controllerMetaData.getControllerLeaderAddress(); + RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.CONTROLLER_GET_SYNC_STATE_DATA, null); final byte[] body = RemotingSerializable.encode(brokers); request.setBody(body); From cdeddd90caf7275e4a2b371622189e2c8fb7f5eb Mon Sep 17 00:00:00 2001 From: hzh0425 <642256541@qq.com> Date: Fri, 27 May 2022 20:01:43 +0800 Subject: [PATCH 5/9] code review --- .../rocketmq/controller/impl/DledgerController.java | 2 +- .../controller/impl/manager/ReplicasInfoManager.java | 8 +++++--- 2 files changed, 6 insertions(+), 4 deletions(-) diff --git a/controller/src/main/java/org/apache/rocketmq/controller/impl/DledgerController.java b/controller/src/main/java/org/apache/rocketmq/controller/impl/DledgerController.java index a0fdb99d40c..eb85719d953 100644 --- a/controller/src/main/java/org/apache/rocketmq/controller/impl/DledgerController.java +++ b/controller/src/main/java/org/apache/rocketmq/controller/impl/DledgerController.java @@ -158,7 +158,7 @@ public CompletableFuture getReplicaInfo(final GetReplicaInfoReq public CompletableFuture getSyncStateData(List brokerNames) { return this.scheduler.appendEvent("getSyncStateData", - () -> this.replicasInfoManager.getInSyncStateData(brokerNames), false); + () -> this.replicasInfoManager.getSyncStateData(brokerNames), false); } @Override diff --git a/controller/src/main/java/org/apache/rocketmq/controller/impl/manager/ReplicasInfoManager.java b/controller/src/main/java/org/apache/rocketmq/controller/impl/manager/ReplicasInfoManager.java index c213405595d..a34aedd13ea 100644 --- a/controller/src/main/java/org/apache/rocketmq/controller/impl/manager/ReplicasInfoManager.java +++ b/controller/src/main/java/org/apache/rocketmq/controller/impl/manager/ReplicasInfoManager.java @@ -280,7 +280,7 @@ public ControllerResult getReplicaInfo(final GetRe return result; } - public ControllerResult getInSyncStateData(final List brokerNames) { + public ControllerResult getSyncStateData(final List brokerNames) { final ControllerResult result = new ControllerResult<>(); final InSyncStateData inSyncStateData = new InSyncStateData(); for (String brokerName : brokerNames) { @@ -289,12 +289,14 @@ public ControllerResult getInSyncStateData(final List brokerNames) final InSyncReplicasInfo replicasInfo = this.inSyncReplicasInfoTable.get(brokerName); final BrokerInfo brokerInfo = this.replicaInfoTable.get(brokerName); final Set syncStateSet = replicasInfo.getSyncStateSet(); + final String master = replicasInfo.getMasterAddress(); final ArrayList inSyncMembers = new ArrayList<>(); syncStateSet.forEach(replicas -> { - inSyncMembers.add(new InSyncStateData.InSyncMember(replicas, brokerInfo.getBrokerId(replicas))); + long brokerId = StringUtils.equals(master, replicas) ? MixAll.MASTER_ID : brokerInfo.getBrokerId(replicas); + inSyncMembers.add(new InSyncStateData.InSyncMember(replicas, brokerId)); }); - final InSyncStateData.InSyncState inSyncState = new InSyncStateData.InSyncState(replicasInfo.getMasterAddress(), replicasInfo.getMasterEpoch(), replicasInfo.getSyncStateSetEpoch(), inSyncMembers); + final InSyncStateData.InSyncState inSyncState = new InSyncStateData.InSyncState(master, replicasInfo.getMasterEpoch(), replicasInfo.getSyncStateSetEpoch(), inSyncMembers); inSyncStateData.addInSyncState(brokerName, inSyncState); } } From 887e1790a008ad7942b6b22194e2bef130e993c7 Mon Sep 17 00:00:00 2001 From: hzh0425 <642256541@qq.com> Date: Fri, 27 May 2022 20:51:45 +0800 Subject: [PATCH 6/9] add tool get brokerEpochCache --- .../broker/hacontroller/ReplicasManager.java | 5 + .../processor/AdminBrokerProcessor.java | 47 +++++--- .../rocketmq/client/impl/MQClientAPIImpl.java | 15 +++ .../apache/rocketmq/common/EpochEntry.java | 4 +- .../rocketmq/common/protocol/RequestCode.java | 2 + .../common/protocol/body/EpochEntryCache.java | 75 ++++++++++++ .../ha/autoswitch/AutoSwitchHAService.java | 4 + .../tools/admin/DefaultMQAdminExt.java | 6 + .../tools/admin/DefaultMQAdminExtImpl.java | 6 + .../rocketmq/tools/admin/MQAdminExt.java | 5 +- .../rocketmq/tools/command/CommandUtil.java | 9 ++ .../tools/command/MQAdminStartup.java | 4 +- .../broker/GetBrokerEpochCacheCommand.java | 113 ++++++++++++++++++ ...aCommand.java => SyncStateSetCommand.java} | 6 +- 14 files changed, 280 insertions(+), 21 deletions(-) create mode 100644 common/src/main/java/org/apache/rocketmq/common/protocol/body/EpochEntryCache.java create mode 100644 tools/src/main/java/org/apache/rocketmq/tools/command/broker/GetBrokerEpochCacheCommand.java rename tools/src/main/java/org/apache/rocketmq/tools/command/ha/{SyncStateDataCommand.java => SyncStateSetCommand.java} (97%) diff --git a/broker/src/main/java/org/apache/rocketmq/broker/hacontroller/ReplicasManager.java b/broker/src/main/java/org/apache/rocketmq/broker/hacontroller/ReplicasManager.java index 08faec2e3d1..fc0f252b4c1 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/hacontroller/ReplicasManager.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/hacontroller/ReplicasManager.java @@ -31,6 +31,7 @@ import org.apache.rocketmq.broker.BrokerController; import org.apache.rocketmq.broker.out.BrokerOuterAPI; import org.apache.rocketmq.common.BrokerConfig; +import org.apache.rocketmq.common.EpochEntry; import org.apache.rocketmq.common.MixAll; import org.apache.rocketmq.common.Pair; import org.apache.rocketmq.common.ThreadFactoryImpl; @@ -409,4 +410,8 @@ public int getMasterEpoch() { public List getControllerAddresses() { return controllerAddresses; } + + public List getEpochEntries() { + return this.haService.getEpochEntries(); + } } diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java index 0917858f7dc..4b015ffe46f 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java @@ -20,7 +20,19 @@ import com.alibaba.fastjson.JSONObject; import io.netty.channel.Channel; import io.netty.channel.ChannelHandlerContext; +import java.io.UnsupportedEncodingException; +import java.net.UnknownHostException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.Set; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import org.apache.commons.lang3.StringUtils; @@ -31,10 +43,12 @@ import org.apache.rocketmq.broker.client.ConsumerGroupInfo; import org.apache.rocketmq.broker.filter.ConsumerFilterData; import org.apache.rocketmq.broker.filter.ExpressionMessageFilter; +import org.apache.rocketmq.broker.hacontroller.ReplicasManager; import org.apache.rocketmq.broker.plugin.BrokerAttachedPlugin; import org.apache.rocketmq.broker.subscription.SubscriptionGroupManager; import org.apache.rocketmq.broker.transaction.queue.TransactionalMessageUtil; import org.apache.rocketmq.common.AclConfig; +import org.apache.rocketmq.common.BrokerConfig; import org.apache.rocketmq.common.LockCallback; import org.apache.rocketmq.common.MQVersion; import org.apache.rocketmq.common.MixAll; @@ -54,6 +68,7 @@ import org.apache.rocketmq.common.message.MessageConst; import org.apache.rocketmq.common.message.MessageDecoder; import org.apache.rocketmq.common.message.MessageExt; +import org.apache.rocketmq.common.message.MessageExtBrokerInner; import org.apache.rocketmq.common.message.MessageId; import org.apache.rocketmq.common.message.MessageQueue; import org.apache.rocketmq.common.protocol.RequestCode; @@ -65,6 +80,7 @@ import org.apache.rocketmq.common.protocol.body.ConsumeQueueData; import org.apache.rocketmq.common.protocol.body.ConsumeStatsList; import org.apache.rocketmq.common.protocol.body.ConsumerConnection; +import org.apache.rocketmq.common.protocol.body.EpochEntryCache; import org.apache.rocketmq.common.protocol.body.GroupList; import org.apache.rocketmq.common.protocol.body.HARuntimeInfo; import org.apache.rocketmq.common.protocol.body.KVTable; @@ -152,7 +168,6 @@ import org.apache.rocketmq.remoting.protocol.RemotingSerializable; import org.apache.rocketmq.remoting.protocol.RemotingSysResponseCode; import org.apache.rocketmq.store.ConsumeQueueExt; -import org.apache.rocketmq.common.message.MessageExtBrokerInner; import org.apache.rocketmq.store.MessageFilter; import org.apache.rocketmq.store.MessageStore; import org.apache.rocketmq.store.PutMessageResult; @@ -162,19 +177,6 @@ import org.apache.rocketmq.store.queue.CqUnit; import org.apache.rocketmq.store.queue.ReferredIterator; -import java.io.UnsupportedEncodingException; -import java.net.UnknownHostException; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.HashSet; -import java.util.Iterator; -import java.util.List; -import java.util.Map; -import java.util.Properties; -import java.util.Set; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ConcurrentMap; - import static org.apache.rocketmq.remoting.protocol.RemotingCommand.buildErrorResponse; public class AdminBrokerProcessor implements NettyRequestProcessor { @@ -295,6 +297,8 @@ public RemotingCommand processRequest(ChannelHandlerContext ctx, return this.getBrokerHaStatus(ctx, request); case RequestCode.RESET_MASTER_FLUSH_OFFSET: return this.resetMasterFlushOffset(ctx, request); + case RequestCode.GET_BROKER_EPOCH_CACHE: + return this.getBrokerEpochCache(ctx, request); default: return getUnknownCmdResponse(ctx, request); } @@ -2354,6 +2358,21 @@ private RemotingCommand getBrokerHaStatus(ChannelHandlerContext ctx, RemotingCom return response; } + private RemotingCommand getBrokerEpochCache(ChannelHandlerContext ctx, RemotingCommand request) { + final ReplicasManager replicasManager = this.brokerController.getReplicasManager(); + assert replicasManager != null; + final BrokerConfig brokerConfig = this.brokerController.getBrokerConfig(); + final EpochEntryCache entryCache = new EpochEntryCache(brokerConfig.getBrokerClusterName(), + brokerConfig.getBrokerName(), brokerConfig.getBrokerId(), replicasManager.getEpochEntries()); + + final RemotingCommand response = RemotingCommand.createResponseCommand(null); + response.setBody(entryCache.encode()); + response.setCode(ResponseCode.SUCCESS); + response.setRemark(null); + return response; + } + + private RemotingCommand resetMasterFlushOffset(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException { final RemotingCommand response = RemotingCommand.createResponseCommand(null); diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java index c01045344b1..cf4fe19ba7d 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java @@ -85,6 +85,7 @@ import org.apache.rocketmq.common.protocol.body.ConsumeStatsList; import org.apache.rocketmq.common.protocol.body.ConsumerConnection; import org.apache.rocketmq.common.protocol.body.ConsumerRunningInfo; +import org.apache.rocketmq.common.protocol.body.EpochEntryCache; import org.apache.rocketmq.common.protocol.body.GetConsumerStatusBody; import org.apache.rocketmq.common.protocol.body.GroupList; import org.apache.rocketmq.common.protocol.body.HARuntimeInfo; @@ -2891,4 +2892,18 @@ public InSyncStateData getInSyncStateData(final String controllerAddress, } throw new MQBrokerException(response.getCode(), response.getRemark()); } + + public EpochEntryCache getBrokerEpochCache(String brokerAddr) throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, InterruptedException, MQBrokerException { + RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_BROKER_EPOCH_CACHE, null); + final RemotingCommand response = this.remotingClient.invokeSync(brokerAddr, request, 3000); + assert response != null; + switch (response.getCode()) { + case ResponseCode.SUCCESS: { + return RemotingSerializable.decode(response.getBody(), EpochEntryCache.class); + } + default: + break; + } + throw new MQBrokerException(response.getCode(), response.getRemark()); + } } diff --git a/common/src/main/java/org/apache/rocketmq/common/EpochEntry.java b/common/src/main/java/org/apache/rocketmq/common/EpochEntry.java index 6cd3ceb232f..8c71275b8ca 100644 --- a/common/src/main/java/org/apache/rocketmq/common/EpochEntry.java +++ b/common/src/main/java/org/apache/rocketmq/common/EpochEntry.java @@ -17,7 +17,9 @@ package org.apache.rocketmq.common; -public class EpochEntry { +import org.apache.rocketmq.remoting.protocol.RemotingSerializable; + +public class EpochEntry extends RemotingSerializable { private int epoch; private long startOffset; diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/RequestCode.java b/common/src/main/java/org/apache/rocketmq/common/protocol/RequestCode.java index 8dbcd1a5011..b6313c58305 100644 --- a/common/src/main/java/org/apache/rocketmq/common/protocol/RequestCode.java +++ b/common/src/main/java/org/apache/rocketmq/common/protocol/RequestCode.java @@ -252,4 +252,6 @@ public class RequestCode { public static final int CONTROLLER_GET_METADATA_INFO = 1005; public static final int CONTROLLER_GET_SYNC_STATE_DATA = 1006; + + public static final int GET_BROKER_EPOCH_CACHE = 1007; } diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/body/EpochEntryCache.java b/common/src/main/java/org/apache/rocketmq/common/protocol/body/EpochEntryCache.java new file mode 100644 index 00000000000..46c5c94bae7 --- /dev/null +++ b/common/src/main/java/org/apache/rocketmq/common/protocol/body/EpochEntryCache.java @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.common.protocol.body; + +import java.util.List; +import org.apache.rocketmq.common.EpochEntry; +import org.apache.rocketmq.remoting.protocol.RemotingSerializable; + +public class EpochEntryCache extends RemotingSerializable { + private String clusterName; + private String brokerName; + private long brokerId; + private List epochList; + + public EpochEntryCache(String clusterName, String brokerName, long brokerId, List epochList) { + this.clusterName = clusterName; + this.brokerName = brokerName; + this.brokerId = brokerId; + this.epochList = epochList; + } + + public String getClusterName() { + return clusterName; + } + + public void setClusterName(String clusterName) { + this.clusterName = clusterName; + } + + public String getBrokerName() { + return brokerName; + } + + public void setBrokerName(String brokerName) { + this.brokerName = brokerName; + } + + public long getBrokerId() { + return brokerId; + } + + public void setBrokerId(long brokerId) { + this.brokerId = brokerId; + } + + public List getEpochList() { + return epochList; + } + + public void setEpochList(List epochList) { + this.epochList = epochList; + } + + @Override public String toString() { + return "EpochEntryCache{" + + "clusterName='" + clusterName + '\'' + + ", brokerName='" + brokerName + '\'' + + ", epochList=" + epochList + + '}'; + } +} diff --git a/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAService.java b/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAService.java index 94176008274..773817072ca 100644 --- a/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAService.java +++ b/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAService.java @@ -286,6 +286,10 @@ public long truncateInvalidMsg() { return reputFromOffset; } + public List getEpochEntries() { + return this.epochCache.getAllEntries(); + } + class AutoSwitchAcceptSocketService extends AcceptSocketService { public AutoSwitchAcceptSocketService(int port) { diff --git a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java index fc1e76ee4c7..d7de1124d3d 100644 --- a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java +++ b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java @@ -41,6 +41,7 @@ import org.apache.rocketmq.common.protocol.body.ConsumeStatsList; import org.apache.rocketmq.common.protocol.body.ConsumerConnection; import org.apache.rocketmq.common.protocol.body.ConsumerRunningInfo; +import org.apache.rocketmq.common.protocol.body.EpochEntryCache; import org.apache.rocketmq.common.protocol.body.GroupList; import org.apache.rocketmq.common.protocol.body.HARuntimeInfo; import org.apache.rocketmq.common.protocol.body.InSyncStateData; @@ -737,6 +738,11 @@ public InSyncStateData getInSyncStateData(String controllerAddress, return this.defaultMQAdminExtImpl.getInSyncStateData(controllerAddress, brokers); } + @Override public EpochEntryCache getBrokerEpochCache( + String brokerAddr) throws RemotingException, InterruptedException, MQBrokerException { + return this.defaultMQAdminExtImpl.getBrokerEpochCache(brokerAddr); + } + @Override public void resetMasterFlushOffset(String brokerAddr, long masterFlushOffset) throws InterruptedException, RemotingConnectException, RemotingTimeoutException, RemotingSendRequestException, MQBrokerException { diff --git a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java index 333d5e2e4f4..30216730927 100644 --- a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java +++ b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java @@ -73,6 +73,7 @@ import org.apache.rocketmq.common.protocol.body.ConsumeStatsList; import org.apache.rocketmq.common.protocol.body.ConsumerConnection; import org.apache.rocketmq.common.protocol.body.ConsumerRunningInfo; +import org.apache.rocketmq.common.protocol.body.EpochEntryCache; import org.apache.rocketmq.common.protocol.body.GroupList; import org.apache.rocketmq.common.protocol.body.HARuntimeInfo; import org.apache.rocketmq.common.protocol.body.InSyncStateData; @@ -1611,6 +1612,11 @@ public void resetOffsetByQueueId(final String brokerAddr, final String consumeGr return this.mqClientInstance.getMQClientAPIImpl().getInSyncStateData(controllerAddress, brokers); } + @Override public EpochEntryCache getBrokerEpochCache( + String brokerAddr) throws RemotingException, InterruptedException, MQBrokerException { + return this.mqClientInstance.getMQClientAPIImpl().getBrokerEpochCache(brokerAddr); + } + @Override public void resetMasterFlushOffset(String brokerAddr, long masterFlushOffset) throws InterruptedException, MQBrokerException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException { this.mqClientInstance.getMQClientAPIImpl().resetMasterFlushOffset(brokerAddr, masterFlushOffset); diff --git a/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminExt.java b/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminExt.java index 34fb2d78af5..1b349227725 100644 --- a/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminExt.java +++ b/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminExt.java @@ -40,6 +40,7 @@ import org.apache.rocketmq.common.protocol.body.ConsumeStatsList; import org.apache.rocketmq.common.protocol.body.ConsumerConnection; import org.apache.rocketmq.common.protocol.body.ConsumerRunningInfo; +import org.apache.rocketmq.common.protocol.body.EpochEntryCache; import org.apache.rocketmq.common.protocol.body.GroupList; import org.apache.rocketmq.common.protocol.body.HARuntimeInfo; import org.apache.rocketmq.common.protocol.body.InSyncStateData; @@ -383,7 +384,9 @@ MessageExt queryMessage(String clusterName, HARuntimeInfo getBrokerHAStatus(String brokerAddr) throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, InterruptedException, MQBrokerException; - InSyncStateData getInSyncStateData(String controllerAddress, List brokers) throws RemotingException, InterruptedException, MQBrokerException ; + InSyncStateData getInSyncStateData(String controllerAddress, List brokers) throws RemotingException, InterruptedException, MQBrokerException; + + EpochEntryCache getBrokerEpochCache(String brokerAddr) throws RemotingException, InterruptedException, MQBrokerException; /** * Reset master flush offset in slave diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/CommandUtil.java b/tools/src/main/java/org/apache/rocketmq/tools/command/CommandUtil.java index c2e4cb2e405..b7faee32da7 100644 --- a/tools/src/main/java/org/apache/rocketmq/tools/command/CommandUtil.java +++ b/tools/src/main/java/org/apache/rocketmq/tools/command/CommandUtil.java @@ -135,6 +135,15 @@ public static Set fetchMasterAndSlaveAddrByClusterName(final MQAdminExt return brokerAddressSet; } + public static Set fetchMasterAndSlaveAddrByBrokerName(final MQAdminExt adminExt, final String brokerName) + throws InterruptedException, RemotingConnectException, RemotingTimeoutException, + RemotingSendRequestException, MQBrokerException { + ClusterInfo clusterInfoSerializeWrapper = adminExt.examineBrokerClusterInfo(); + final BrokerData brokerData = clusterInfoSerializeWrapper.getBrokerAddrTable().get(brokerName); + return new HashSet<>(brokerData.getBrokerAddrs().values()); + } + + public static Set fetchBrokerNameByClusterName(final MQAdminExt adminExt, final String clusterName) throws Exception { ClusterInfo clusterInfoSerializeWrapper = adminExt.examineBrokerClusterInfo(); diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/MQAdminStartup.java b/tools/src/main/java/org/apache/rocketmq/tools/command/MQAdminStartup.java index 5939abe71e0..2494f210f2b 100644 --- a/tools/src/main/java/org/apache/rocketmq/tools/command/MQAdminStartup.java +++ b/tools/src/main/java/org/apache/rocketmq/tools/command/MQAdminStartup.java @@ -60,7 +60,7 @@ import org.apache.rocketmq.tools.command.export.ExportMetadataCommand; import org.apache.rocketmq.tools.command.export.ExportMetricsCommand; import org.apache.rocketmq.tools.command.ha.HAStatusSubCommand; -import org.apache.rocketmq.tools.command.ha.SyncStateDataCommand; +import org.apache.rocketmq.tools.command.ha.SyncStateSetCommand; import org.apache.rocketmq.tools.command.message.CheckMsgSendRTCommand; import org.apache.rocketmq.tools.command.message.ConsumeMessageCommand; import org.apache.rocketmq.tools.command.message.PrintMessageByQueueCommand; @@ -247,7 +247,7 @@ public static void initCommand() { initCommand(new HAStatusSubCommand()); - initCommand(new SyncStateDataCommand()); + initCommand(new SyncStateSetCommand()); } private static void initLogback() throws JoranException { diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/broker/GetBrokerEpochCacheCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/broker/GetBrokerEpochCacheCommand.java new file mode 100644 index 00000000000..c10c210ad29 --- /dev/null +++ b/tools/src/main/java/org/apache/rocketmq/tools/command/broker/GetBrokerEpochCacheCommand.java @@ -0,0 +1,113 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.tools.command.broker; + +import java.util.Set; +import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.Option; +import org.apache.commons.cli.Options; +import org.apache.rocketmq.common.EpochEntry; +import org.apache.rocketmq.common.protocol.body.EpochEntryCache; +import org.apache.rocketmq.remoting.RPCHook; +import org.apache.rocketmq.srvutil.ServerUtil; +import org.apache.rocketmq.tools.admin.DefaultMQAdminExt; +import org.apache.rocketmq.tools.command.CommandUtil; +import org.apache.rocketmq.tools.command.SubCommand; +import org.apache.rocketmq.tools.command.SubCommandException; + +public class GetBrokerEpochCacheCommand implements SubCommand { + @Override public String commandName() { + return "getBrokerEpochCache"; + } + + @Override public String commandDesc() { + return "Fetch broker epoch entries cache"; + } + + @Override public Options buildCommandlineOptions(Options options) { + Option opt = new Option("c", "clusterName", true, "which cluster"); + opt.setRequired(false); + options.addOption(opt); + + opt = new Option("b", "brokerName", true, "which broker to fetch"); + opt.setRequired(false); + options.addOption(opt); + + opt = new Option("f", "follow", true, "the interval(second) of get info"); + opt.setRequired(false); + options.addOption(opt); + + return options; + } + + @Override public void execute(CommandLine commandLine, Options options, + RPCHook rpcHook) throws SubCommandException { + DefaultMQAdminExt defaultMQAdminExt = new DefaultMQAdminExt(rpcHook); + defaultMQAdminExt.setInstanceName(Long.toString(System.currentTimeMillis())); + + try { + if (commandLine.hasOption('f')) { + String flushSecondStr = commandLine.getOptionValue('f'); + int flushSecond = 3; + if (flushSecondStr != null && !flushSecondStr.trim().equals("")) { + flushSecond = Integer.parseInt(flushSecondStr); + } + + defaultMQAdminExt.start(); + + while (true) { + this.innerExec(commandLine, options, defaultMQAdminExt); + Thread.sleep(flushSecond * 1000); + } + } else { + defaultMQAdminExt.start(); + + this.innerExec(commandLine, options, defaultMQAdminExt); + } + } catch (Exception e) { + throw new SubCommandException(this.getClass().getSimpleName() + " command failed", e); + } finally { + defaultMQAdminExt.shutdown(); + } + } + + private void innerExec(CommandLine commandLine, Options options, + DefaultMQAdminExt defaultMQAdminExt) throws Exception { + if (commandLine.hasOption('b')) { + String brokerName = commandLine.getOptionValue('b').trim(); + final Set brokers = CommandUtil.fetchMasterAndSlaveAddrByBrokerName(defaultMQAdminExt, brokerName); + printData(brokers, defaultMQAdminExt); + } else if (commandLine.hasOption('c')) { + String clusterName = commandLine.getOptionValue('c').trim(); + Set brokers = CommandUtil.fetchMasterAndSlaveAddrByClusterName(defaultMQAdminExt, clusterName); + printData(brokers, defaultMQAdminExt); + } else { + ServerUtil.printCommandLineHelp("mqadmin " + this.commandName(), options); + } + } + + private void printData(Set brokers, DefaultMQAdminExt defaultMQAdminExt) throws Exception { + for (String brokerAddr : brokers) { + final EpochEntryCache epochCache = defaultMQAdminExt.getBrokerEpochCache(brokerAddr); + System.out.printf("\n#clusterName\t%s\n#brokerName\t%s\n#brokerAddr\t%s\n#brokerId\t%d", + epochCache.getClusterName(), epochCache.getBrokerName(), brokerAddr, epochCache.getBrokerId()); + for (EpochEntry epochEntry : epochCache.getEpochList()) { + System.out.printf("\n#Epoch: %s", epochEntry.toString()); + } + } + } +} diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/ha/SyncStateDataCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/ha/SyncStateSetCommand.java similarity index 97% rename from tools/src/main/java/org/apache/rocketmq/tools/command/ha/SyncStateDataCommand.java rename to tools/src/main/java/org/apache/rocketmq/tools/command/ha/SyncStateSetCommand.java index cf07933d08c..52b48f8713d 100644 --- a/tools/src/main/java/org/apache/rocketmq/tools/command/ha/SyncStateDataCommand.java +++ b/tools/src/main/java/org/apache/rocketmq/tools/command/ha/SyncStateSetCommand.java @@ -31,7 +31,7 @@ import org.apache.rocketmq.tools.command.SubCommand; import org.apache.rocketmq.tools.command.SubCommandException; -public class SyncStateDataCommand implements SubCommand { +public class SyncStateSetCommand implements SubCommand { @Override public String commandName() { return "getSyncStateSet"; @@ -39,13 +39,13 @@ public String commandName() { @Override public String commandDesc() { - return "Fetch syncStateData for target brokers"; + return "Fetch syncStateSet for target brokers"; } @Override public Options buildCommandlineOptions(Options options) { Option opt = new Option("a", "controllerAddress", true, "the address of controller"); - opt.setRequired(false); + opt.setRequired(true); options.addOption(opt); opt = new Option("c", "clusterName", true, "which cluster"); From 2251c6454f1967264efe8777d67b918b69dd3c8f Mon Sep 17 00:00:00 2001 From: hzh0425 <642256541@qq.com> Date: Fri, 27 May 2022 21:04:02 +0800 Subject: [PATCH 7/9] init command --- .../java/org/apache/rocketmq/tools/command/MQAdminStartup.java | 2 ++ 1 file changed, 2 insertions(+) diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/MQAdminStartup.java b/tools/src/main/java/org/apache/rocketmq/tools/command/MQAdminStartup.java index 2494f210f2b..6be8b0efcb1 100644 --- a/tools/src/main/java/org/apache/rocketmq/tools/command/MQAdminStartup.java +++ b/tools/src/main/java/org/apache/rocketmq/tools/command/MQAdminStartup.java @@ -40,6 +40,7 @@ import org.apache.rocketmq.tools.command.broker.CleanExpiredCQSubCommand; import org.apache.rocketmq.tools.command.broker.CleanUnusedTopicCommand; import org.apache.rocketmq.tools.command.broker.GetBrokerConfigCommand; +import org.apache.rocketmq.tools.command.broker.GetBrokerEpochCacheCommand; import org.apache.rocketmq.tools.command.broker.ResetMasterFlushOffsetSubCommand; import org.apache.rocketmq.tools.command.broker.SendMsgStatusCommand; import org.apache.rocketmq.tools.command.broker.UpdateBrokerConfigSubCommand; @@ -248,6 +249,7 @@ public static void initCommand() { initCommand(new HAStatusSubCommand()); initCommand(new SyncStateSetCommand()); + initCommand(new GetBrokerEpochCacheCommand()); } private static void initLogback() throws JoranException { From ba88ef48ef00872aaddb2ca98e330f35129b971a Mon Sep 17 00:00:00 2001 From: hzh0425 <642256541@qq.com> Date: Fri, 27 May 2022 21:26:20 +0800 Subject: [PATCH 8/9] set lastEpochEndOffset --- .../rocketmq/broker/processor/AdminBrokerProcessor.java | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java index 4b015ffe46f..a0090e474e4 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java @@ -49,6 +49,7 @@ import org.apache.rocketmq.broker.transaction.queue.TransactionalMessageUtil; import org.apache.rocketmq.common.AclConfig; import org.apache.rocketmq.common.BrokerConfig; +import org.apache.rocketmq.common.EpochEntry; import org.apache.rocketmq.common.LockCallback; import org.apache.rocketmq.common.MQVersion; import org.apache.rocketmq.common.MixAll; @@ -2364,7 +2365,12 @@ private RemotingCommand getBrokerEpochCache(ChannelHandlerContext ctx, RemotingC final BrokerConfig brokerConfig = this.brokerController.getBrokerConfig(); final EpochEntryCache entryCache = new EpochEntryCache(brokerConfig.getBrokerClusterName(), brokerConfig.getBrokerName(), brokerConfig.getBrokerId(), replicasManager.getEpochEntries()); - + final List epochList = entryCache.getEpochList(); + if (!epochList.isEmpty()) { + final EpochEntry lastEntry = epochList.get(epochList.size() - 1); + final long maxPhyOffset = this.brokerController.getMessageStore().getMaxPhyOffset(); + lastEntry.setEndOffset(maxPhyOffset); + } final RemotingCommand response = RemotingCommand.createResponseCommand(null); response.setBody(entryCache.encode()); response.setCode(ResponseCode.SUCCESS); From 71107fda53adcccbb146b062a12c6becb618c51f Mon Sep 17 00:00:00 2001 From: hzh0425 <642256541@qq.com> Date: Fri, 27 May 2022 21:43:02 +0800 Subject: [PATCH 9/9] take maxPhyOffset in EpochCache --- .../broker/processor/AdminBrokerProcessor.java | 10 ++-------- .../common/protocol/body/EpochEntryCache.java | 16 ++++++++++++++-- .../broker/GetBrokerEpochCacheCommand.java | 8 +++++++- 3 files changed, 23 insertions(+), 11 deletions(-) diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java index a0090e474e4..3aaf4b9350c 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java @@ -49,7 +49,6 @@ import org.apache.rocketmq.broker.transaction.queue.TransactionalMessageUtil; import org.apache.rocketmq.common.AclConfig; import org.apache.rocketmq.common.BrokerConfig; -import org.apache.rocketmq.common.EpochEntry; import org.apache.rocketmq.common.LockCallback; import org.apache.rocketmq.common.MQVersion; import org.apache.rocketmq.common.MixAll; @@ -2364,13 +2363,8 @@ private RemotingCommand getBrokerEpochCache(ChannelHandlerContext ctx, RemotingC assert replicasManager != null; final BrokerConfig brokerConfig = this.brokerController.getBrokerConfig(); final EpochEntryCache entryCache = new EpochEntryCache(brokerConfig.getBrokerClusterName(), - brokerConfig.getBrokerName(), brokerConfig.getBrokerId(), replicasManager.getEpochEntries()); - final List epochList = entryCache.getEpochList(); - if (!epochList.isEmpty()) { - final EpochEntry lastEntry = epochList.get(epochList.size() - 1); - final long maxPhyOffset = this.brokerController.getMessageStore().getMaxPhyOffset(); - lastEntry.setEndOffset(maxPhyOffset); - } + brokerConfig.getBrokerName(), brokerConfig.getBrokerId(), replicasManager.getEpochEntries(), this.brokerController.getMessageStore().getMaxPhyOffset()); + final RemotingCommand response = RemotingCommand.createResponseCommand(null); response.setBody(entryCache.encode()); response.setCode(ResponseCode.SUCCESS); diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/body/EpochEntryCache.java b/common/src/main/java/org/apache/rocketmq/common/protocol/body/EpochEntryCache.java index 46c5c94bae7..cdfaa908da8 100644 --- a/common/src/main/java/org/apache/rocketmq/common/protocol/body/EpochEntryCache.java +++ b/common/src/main/java/org/apache/rocketmq/common/protocol/body/EpochEntryCache.java @@ -25,12 +25,14 @@ public class EpochEntryCache extends RemotingSerializable { private String brokerName; private long brokerId; private List epochList; + private long maxOffset; - public EpochEntryCache(String clusterName, String brokerName, long brokerId, List epochList) { + public EpochEntryCache(String clusterName, String brokerName, long brokerId, List epochList, long maxOffset) { this.clusterName = clusterName; this.brokerName = brokerName; this.brokerId = brokerId; this.epochList = epochList; + this.maxOffset = maxOffset; } public String getClusterName() { @@ -58,18 +60,28 @@ public void setBrokerId(long brokerId) { } public List getEpochList() { - return epochList; + return this.epochList; } public void setEpochList(List epochList) { this.epochList = epochList; } + public long getMaxOffset() { + return maxOffset; + } + + public void setMaxOffset(long maxOffset) { + this.maxOffset = maxOffset; + } + @Override public String toString() { return "EpochEntryCache{" + "clusterName='" + clusterName + '\'' + ", brokerName='" + brokerName + '\'' + + ", brokerId=" + brokerId + ", epochList=" + epochList + + ", maxOffset=" + maxOffset + '}'; } } diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/broker/GetBrokerEpochCacheCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/broker/GetBrokerEpochCacheCommand.java index c10c210ad29..28f181328de 100644 --- a/tools/src/main/java/org/apache/rocketmq/tools/command/broker/GetBrokerEpochCacheCommand.java +++ b/tools/src/main/java/org/apache/rocketmq/tools/command/broker/GetBrokerEpochCacheCommand.java @@ -16,6 +16,7 @@ */ package org.apache.rocketmq.tools.command.broker; +import java.util.List; import java.util.Set; import org.apache.commons.cli.CommandLine; import org.apache.commons.cli.Option; @@ -105,7 +106,12 @@ private void printData(Set brokers, DefaultMQAdminExt defaultMQAdminExt) final EpochEntryCache epochCache = defaultMQAdminExt.getBrokerEpochCache(brokerAddr); System.out.printf("\n#clusterName\t%s\n#brokerName\t%s\n#brokerAddr\t%s\n#brokerId\t%d", epochCache.getClusterName(), epochCache.getBrokerName(), brokerAddr, epochCache.getBrokerId()); - for (EpochEntry epochEntry : epochCache.getEpochList()) { + final List epochList = epochCache.getEpochList(); + for (int i = 0; i < epochList.size(); i++) { + final EpochEntry epochEntry = epochList.get(i); + if (i == epochList.size() - 1) { + epochEntry.setEndOffset(epochCache.getMaxOffset()); + } System.out.printf("\n#Epoch: %s", epochEntry.toString()); } }