本文共 4371 字,大约阅读时间需要 14 分钟。
Master选举是一个在分布式系统中非常常见的应用场景。在分布式系统中Master往往用来协调集群中其他系统单元,具有对分布式系统状态变更的决定权。例如,在一些读写分离的应用场景中,客户端的写请求往往是由Master来处理的;而在另一些场景中,Master则常常负责处理一些复杂的逻辑,并将处理结果同步给集群中的其他单元。
在zookeeper中实现Master选举可以利用强一致性,它能够很好的保证在分布式高并发下节点创建的全局唯一性。
现假定有3个client同时向zookeeper server发出创建临时节点/lock的请求,其中能成功注册的就是master节点。由于是临时节点,当master节点崩溃或主动放弃master时/lock节点便会销毁。其它的client又能重新选举master。
Curator把原生API的节点创建、事件监听和自动选举进行整合封装,提供了一套简单易用的解决方案,同时也对master提供了直接支持。
Curator提供了两种选举方案:Leader Latch和Leader Election。下面分别介绍这两种选举方案。
maven依赖
org.apache.curator curator-recipes 4.0.0
Leader Latch
随机从候选着中选出一台作为leader,选中之后除非调用close()释放leadship,否则其他的后选择无法成为leader。其中spark使用的就是这种方法。 实现代码:public class Leader1 { private static final String PATH = "/leader"; private static final int COUNT = 5; public static void main(String[] args) throws Exception { Listclients = new ArrayList<>(); List leaders = new ArrayList<>(); for (int i = 1; i <= COUNT; i++) { CuratorFramework client = CuratorFrameworkFactory.builder().connectString("192.168.1.161:2181"). sessionTimeoutMs(10000).retryPolicy(new RetryNTimes(3,5)). namespace("test").build(); client.start(); clients.add(client); final LeaderLatch example = new LeaderLatch(client, PATH, "Client #" + i); example.addListener(new LeaderLatchListener() { @Override public void isLeader() { System.out.println(example.getId() + ":I am leader. I will do something!"); } @Override public void notLeader() { System.out.println(example.getId() + ":I am not leader. I will do nothing!"); } }); example.start(); leaders.add(example); } Thread.sleep(Integer.MAX_VALUE); for(LeaderLatch leader:leaders){ CloseableUtils.closeQuietly(leader); } for(CuratorFramework client:clients){ CloseableUtils.closeQuietly(client); } }}
运行结果:
Client #2:I am leader. I will do something!
本例中启动了5个client,其中会有一个会被随机选为leader。通过注册监听的方式来判断自己是否成为leader。调用close()方法释放当前领导权。
Leader Election
通过LeaderSelectorListener可以对领导权进行控制, 在适当的时候释放领导权,这样每个节点都有可能获得领导权。 而LeaderLatch则一直持有leadership, 除非调用close方法,否则它不会释放领导权。 实现代码:public class Leader2 { private static final String PATH = "/leader"; private static final int COUNT = 5; public static void main(String[] args) throws Exception { Listclients = new ArrayList<>(); List selectors = new ArrayList<>(); for (int i = 1; i <= COUNT; i++) { CuratorFramework client = CuratorFrameworkFactory.builder().connectString("192.168.1.161:2181"). sessionTimeoutMs(10000).retryPolicy(new RetryNTimes(3,5)). namespace("test").build(); client.start(); clients.add(client); final String name = "client#" + i; LeaderSelector leaderSelector = new LeaderSelector(client, PATH, new LeaderSelectorListener() { @Override public void takeLeadership(CuratorFramework client) throws Exception { System.out.println(name + ":I am leader."); Thread.sleep(2000); } @Override public void stateChanged(CuratorFramework client, ConnectionState newState) { } }); leaderSelector.autoRequeue(); leaderSelector.start(); selectors.add(leaderSelector); } Thread.sleep(Integer.MAX_VALUE); for(CuratorFramework client : clients){ CloseableUtils.closeQuietly(client); } for(LeaderSelector selector : selectors){ CloseableUtils.closeQuietly(selector); } }}
运行结果:
client#4:I am leader. client#5:I am leader. client#1:I am leader. client#3:I am leader. client#2:I am leader. client#4:I am leader. client#5:I am leader. client#1:I am leader. client#3:I am leader.
本示例创建了5个LeaderSelector并对起添加监听,当被选为leader之后,调用takeLeadership方法进行业务逻辑处理,处理完成即释放领导权。其中autoRequeue()方法的调用确保此实例在释放领导权后还可能获得领导权。
https://cloud.tencent.com/developer/article/1015380