Skip to content

Commit

Permalink
Merge pull request #21 from maobaolong/supportOzone
Browse files Browse the repository at this point in the history
Support ozone om and scm
  • Loading branch information
maobaolong authored Sep 27, 2021
2 parents a86906b + 5ad99ab commit e3ac385
Show file tree
Hide file tree
Showing 6 changed files with 54 additions and 42 deletions.
4 changes: 3 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
target
conf/*.properties
conf/*.sh
conf/*.sh
ratis-shell
*.tar.gz
10 changes: 3 additions & 7 deletions conf/ratis-shell-site.properties.template
Original file line number Diff line number Diff line change
@@ -1,8 +1,4 @@
ratis.shell.alluxio-master.groupid=02511d47-d67c-49a3-9011-abb3109a44c1
ratis.shell.alluxio-master.peers=localhost:19200,localhost:19201,localhost:19202
ratis.shell.alluxio-jobmaster.groupid=02511d47-d67c-49a3-9011-abb3109a44c1
ratis.shell.alluxio-jobmaster.peers=localhost:19200,localhost:19201,localhost:19202
ratis.shell.ozone-om.groupid=02511d47-d67c-49a3-9011-abb3109a44c1
ratis.shell.ozone-om.peers=localhost:19200,localhost:19201,localhost:19202
ratis.shell.ozone-scm.groupid=02511d47-d67c-49a3-9011-abb3109a44c1
ratis.shell.ozone-scm.peers=localhost:19200,localhost:19201,localhost:19202
ratis.shell.alluxio-jobmaster.peers=localhost:20003,localhost:20013,localhost:20023
ratis.shell.ozone-om.peers=localhost:9872,localhost:19872,localhost:29872
ratis.shell.ozone-scm.peers=localhost:9894,localhost:19894,localhost:29894
2 changes: 1 addition & 1 deletion src/main/java/opendataio/ratisshell/cli/RaftUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ public static RaftClient createClient(
RaftClientConfigKeys.Rpc.setRequestTimeout(properties,
TimeDuration.valueOf(15, TimeUnit.SECONDS));
ExponentialBackoffRetry retryPolicy = ExponentialBackoffRetry.newBuilder()
.setBaseSleepTime(TimeDuration.valueOf(100, TimeUnit.MILLISECONDS))
.setBaseSleepTime(TimeDuration.valueOf(1000, TimeUnit.MILLISECONDS))
.setMaxAttempts(10)
.setMaxSleepTime(
TimeDuration.valueOf(100_000, TimeUnit.MILLISECONDS))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.UUID;
import java.util.stream.Collectors;

Expand All @@ -33,7 +34,6 @@ public abstract class AbstractRatisCommand implements Command {
UUID.fromString("1-1-1-1-1"));
protected final PrintStream mPrintStream;
protected RaftGroup mRaftGroup;
protected List<RaftPeer> peers;

protected AbstractRatisCommand(Context context) {
mPrintStream = context.getPrintStream();
Expand Down Expand Up @@ -61,44 +61,56 @@ public int run(CommandLine cl) throws IOException {
addresses.add(addr);
}

RaftGroupId raftGroupId = DEFAULT_RAFT_GROUP_ID;
RaftGroupId raftGroupIdFromConfig = DEFAULT_RAFT_GROUP_ID;
if (cl.hasOption(GROUPID_OPTION_NAME)) {
raftGroupId = RaftGroupId.valueOf(
raftGroupIdFromConfig = RaftGroupId.valueOf(
UUID.fromString(cl.getOptionValue(GROUPID_OPTION_NAME)));
} else {
if (cl.hasOption(SERVICE_ID_OPTION_NAME)) {
PropertyKey groupIdKey =
PropertyKey.Template.RATIS_SHELL_GROUP_ID.format(
cl.getOptionValue(SERVICE_ID_OPTION_NAME));
try {
raftGroupId =
raftGroupIdFromConfig =
RaftGroupId.valueOf(UUID.fromString(conf.get(groupIdKey)));
} catch (IllegalArgumentException e) {
// do nothing
}
}
}

peers = addresses.stream()
List<RaftPeer> peers = addresses.stream()
.map(addr -> RaftPeer.newBuilder()
.setId(RaftUtils.getPeerId(addr))
.setAddress(addr)
.build()
).collect(Collectors.toList());
mRaftGroup = RaftGroup.valueOf(raftGroupId, peers);
if (raftGroupId == DEFAULT_RAFT_GROUP_ID) {
try (RaftClient client = RaftUtils.createClient(mRaftGroup)) {
List<RaftGroupId> groupIds =
client.getGroupManagementApi((peers.get(0).getId())).list()
.getGroupIds();
if (groupIds.size() == 1) {
mRaftGroup = RaftGroup.valueOf(groupIds.get(0), peers);
} else {
mRaftGroup = RaftGroup.valueOf(raftGroupIdFromConfig, peers);
try (RaftClient client = RaftUtils.createClient(mRaftGroup)) {
RaftGroupId remoteGroupId;
// TODO(maobaolong): failover to other peer if communicate failure
List<RaftGroupId> groupIds =
client.getGroupManagementApi((peers.get(0).getId())).list()
.getGroupIds();
if (groupIds.size() == 1) {
remoteGroupId = groupIds.get(0);
} else {
final UUID raftGroupUuid = raftGroupIdFromConfig.getUuid();
Optional<RaftGroupId> raftGroupId =
groupIds.stream().filter(r -> raftGroupUuid.equals(r.getUuid()))
.findFirst();
if (!raftGroupId.isPresent()) {
mPrintStream.println(
"there are more than one group, you should specific one." + groupIds);
"there are more than one group, you should specific one."
+ groupIds);
return -1;
} else {
remoteGroupId = raftGroupId.get();
}
}
// TODO(maobaolong): failover to other peer if communicate failure
mRaftGroup = client.getGroupManagementApi((peers.get(0).getId()))
.info(remoteGroupId).getGroup();
}
return 0;
}
Expand All @@ -119,7 +131,7 @@ public void validateArgs(CommandLine cl) throws IllegalArgumentException {
* @param roleInfo the role info
* @return the leader id
*/
protected String getLeaderId(RaftProtos.RoleInfoProto roleInfo) {
protected String getLeaderAddress(RaftProtos.RoleInfoProto roleInfo) {
if (roleInfo == null) {
return null;
}
Expand All @@ -130,7 +142,7 @@ protected String getLeaderId(RaftProtos.RoleInfoProto roleInfo) {
if (followerInfo == null) {
return null;
}
return followerInfo.getLeaderInfo().getId().getId().toStringUtf8();
return followerInfo.getLeaderInfo().getId().getAddress();
}

protected void processReply(RaftClientReply reply, String msg)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@
import org.apache.ratis.protocol.RaftPeerId;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -38,23 +37,22 @@ public int run(CommandLine cl) throws IOException {
super.run(cl);

String strAddr = cl.getOptionValue(ADDRESS_OPTION_NAME);
InetSocketAddress serverAddress = RaftUtils.stringToAddress(strAddr);
// if you cannot find the address in the quorum, throw exception.
if (peers.stream().map(RaftPeer::getAddress).noneMatch(addr -> addr.equals(strAddr))) {
throw new IOException(String.format("<%s> is not part of the quorum <%s>.",
strAddr, peers.stream().map(RaftPeer::getAddress).collect(Collectors.toList())));
}

RaftPeerId newLeaderPeerId =
RaftUtils.getPeerId(serverAddress.getHostString(), serverAddress.getPort());
RaftPeerId newLeaderId = null;
// update priorities to enable transfer
List<RaftPeer> peersWithNewPriorities = new ArrayList<>();
for (RaftPeer peer : peers) {
for (RaftPeer peer : mRaftGroup.getPeers()) {
peersWithNewPriorities.add(
RaftPeer.newBuilder(peer)
.setPriority(peer.getId().equals(newLeaderPeerId) ? 2 : 1)
.setPriority(peer.getAddress().equals(strAddr) ? 2 : 1)
.build()
);
if (peer.getAddress().equals(strAddr)) {
newLeaderId = peer.getId();
}
}
if (newLeaderId == null) {
return -2;
}
try (RaftClient client = RaftUtils.createClient(mRaftGroup)) {
String stringPeers = "[" + peersWithNewPriorities.stream().map(RaftPeer::toString)
Expand All @@ -67,12 +65,11 @@ public int run(CommandLine cl) throws IOException {
"failed to set priorities before initiating election");
// transfer leadership
mPrintStream.printf(
"Transferring leadership to server with address <%s> and with RaftPeerId <%s>%n",
serverAddress, newLeaderPeerId);
"Transferring leadership to server with address <%s>", strAddr);
try {
Thread.sleep(3_000);
RaftClientReply transferLeadershipReply =
client.admin().transferLeadership(newLeaderPeerId, 60_000);
client.admin().transferLeadership(newLeaderId, 60_000);
processReply(transferLeadershipReply, "election failed");
} catch (Throwable t) {
mPrintStream.printf("caught an error when executing transfer: %s%n", t.getMessage());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,15 @@ public int run(CommandLine cl) throws IOException {
mPrintStream.println("group id: " + mRaftGroup.getGroupId().getUuid());
try (RaftClient client = RaftUtils.createClient(mRaftGroup)) {
GroupInfoReply reply =
client.getGroupManagementApi(peers.get(0).getId()).info(mRaftGroup.getGroupId());
client.getGroupManagementApi(
mRaftGroup.getPeers().stream()
.findFirst()
.get()
.getId())
.info(mRaftGroup.getGroupId());
processReply(reply,
"failed to get info");
mPrintStream.println("leader id: " + getLeaderId(reply.getRoleInfoProto()));
mPrintStream.println("leader address: " + getLeaderAddress(reply.getRoleInfoProto()));
mPrintStream.println(reply.getCommitInfos());
}
return 0;
Expand Down

0 comments on commit e3ac385

Please sign in to comment.