深入理解与应用Zookeeper(七)实现简单的分布式锁

深入理解与应用Zookeeper(七)实现简单的分布式锁

模拟双十一高并发环境下产生全局唯一的订单号

[1]zk实现分布式锁1.0版本 基于同名节点的分布式锁

image.png

image.png

image.png

image.png

谁先获得锁 谁就能创建zk下的节点/lock 当其他进程试图获取锁时 如果/lock节点已经存在 则抛出异常 此时其他进程就需要等待 当某个进程完成任务时/lock节点自动删除 剩下的进程继续抢夺锁。

自定义接口 Lock

其中包含两个方法 获取所与释放锁

public interface Lock {

    //获取锁资源
    void getLock();

    //释放锁
    void unlock();

}
public abstract class AbstractLock implements Lock{

    public void getLock(){
        //尝试获取锁资源
        if (tryLock()) {
            System.out.println("###获取锁资源###");
        }else {
            //等待 如果节点/lock一直存在 该进程就一直阻塞在此处
            waitLock();
            //重新获取锁资源 递归调用直到获取锁才停止
            getLock();
        }

    }
    //获取锁资源
    public abstract boolean tryLock();
    //等待锁资源
    public abstract void   waitLock();
}

自定义zk分布式锁的抽象类

//将重复代码写入到子类中
public abstract class ZookeeperAbstractLock extends AbstractLock{

    //zk连接地址
    private static final String CONNECT_STRING = "127.0.0.1:2181";
    //创建zk连接
    protected ZkClient zkClient = new ZkClient(CONNECT_STRING);

    protected static final String PATH = "/lock";

    protected static final String PATH2 = "/lock2";


}

zk分布式锁的实现类

public class ZookeeperDistributeLock extends ZookeeperAbstractLock {

    private CountDownLatch countDownLatch = null;


    @Override
    //尝试获得锁  如果锁已经被占据(即该节点已经被前一个人创建则会报错 这时候就会返回false)
    public boolean tryLock() {
        try {
            //创建临时节点
            zkClient.createEphemeral(PATH);
            return true;
        }catch (Exception e){
            return false;
        }
    }

    @Override
    public void waitLock() {
        //创建监听zk数据改变的监听器
        IZkDataListener iZkDataListener = new IZkDataListener() {
            //当节点被删除时调用
            @Override
            public void handleDataChange(String dataPath, Object data) throws Exception {

            }
            @Override
            public void handleDataDeleted(String dataPath) throws Exception {
                //如果节点被删除 就唤醒等待的进程 发送给获取锁的进程信号:你可以获取锁了
                if (countDownLatch != null){
                    countDownLatch.countDown();
                }
            }
        };
        //注册事件 监听节点
        zkClient.subscribeDataChanges(PATH, iZkDataListener);
        //如果节点存在
        if (zkClient.exists(PATH)){
            //令进程阻塞
            countDownLatch = new CountDownLatch(1);
            try {
                //令该获取锁的进程阻塞 等待锁 一直等到事件通知
                countDownLatch.await();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
        //该进程任务执行完毕 删除监听
        zkClient.unsubscribeDataChanges(PATH, iZkDataListener);
    }

    @Override
    public void unlock() {
        //释放锁
        if (zkClient != null){
            zkClient.delete(PATH);
            zkClient.close();
            System.out.println("释放锁资源...");
        }
    }
}

产生订单号的类

public class OrderNumGenerator {

    //全局订单id
    public static int count = 0;
    public static Object lock = new Object();

    public String getNumber(){
        synchronized (lock){
            SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
            return simpleDateFormat.format(new Date()) + "-" + ++count;
        }
    }
}

订单服务的启动类

public class OrderService implements Runnable{

    private OrderNumGenerator orderNumGenerator = new OrderNumGenerator();

    private Lock lock = new ZookeeperDistributeLock();

    @Override
    public void run() {
        getNumber();
    }
    public void getNumber(){
        try {
            lock.getLock();
            String number = orderNumGenerator.getNumber();
            System.out.println(Thread.currentThread().getName() + ",生成订单ID:" + number);
        }catch (Exception e){
            e.printStackTrace();
        }finally {
             lock.unlock();
        }
    }

    public static void main(String[] args) {
        System.out.println("###生成唯一订单号###");
        for (int i = 0; i < 50; i++){
            new Thread(new OrderService()).start();
        }
    }
}

这样实现的锁使系统的工作效率比较低 所以需要改进

[2]zk实现分布式锁2.0版本 高性能分布式锁

image.png

image.png

这里每个进程都能创建一个顺序临时的节点,每个进程只需要关心它前一个进程是否完成工作(即观察它前一个节点是否存在)如果已经完成工作(完成工作后该进程对应的顺序临时节点会被删除)则判断自己是否为第一个节点 如果自己是第一个节点就去获取锁 如果不是第一个节点 则继续等待

public class ZookeeperDistributeLock2 extends ZookeeperAbstractLock {

    private CountDownLatch countDownLatch = null;
    private String beforePath;//当前请求节点的前一个节点
    private String currentPath;//当前请求的节点

    public ZookeeperDistributeLock2(){
        if (! zkClient.exists(PATH2)){
            zkClient.createPersistent(PATH2);
        }
    }
    @Override
    //尝试获得锁  如果锁已经被占据(即该节点已经被前一个人创建则会报错 这时候就会返回false)
    public boolean tryLock() {
        //如果currentPath为空则为第一次尝试加锁
        if (currentPath == null || currentPath.length() <= 0){
            //创建一个临时顺序节点 值为lock
            currentPath = zkClient.createEphemeralSequential(PATH2 + "/", "lock");
        }
        //获取所有临时顺序节点 并且将它们按递增排序 顺序临时节点长度为10 0000000400
        List<String> children = zkClient.getChildren(PATH2);
        Collections.sort(children);
        //如果当前节点是第一个 则可以获取锁 返回true
        if (currentPath.equals(PATH2 + "/" + children.get(0))){
            return true;
        }else {
            //如果当前节点不是第一个节点 则获取它前一个节点并且复制给beforePath
            //获取currentPath的下标
            int index = Collections.binarySearch(children, currentPath.substring(7));
            //赋值给beforePath
            beforePath = PATH2 + "/" + children.get(index - 1);
        }
        return false;
    }

    @Override
    public void waitLock() {
        //创建监听zk数据改变的监听器
        IZkDataListener listener = new IZkDataListener() {
            //当节点被删除时 调用
            @Override
            public void handleDataChange(String dataPath, Object data) throws Exception {

            }
            @Override
            public void handleDataDeleted(String dataPath) throws Exception {
                //如果节点被删除 就唤醒等待的进程 发送给获取锁的进程信号:你可以获取锁了
                if (countDownLatch != null){
                    countDownLatch.countDown();
                }
            }
        };
        //注册事件 这里只需要监听前一个节点就可以了 当前一个节点删除的时候 该进程就停止等待
        zkClient.subscribeDataChanges(beforePath, listener);
        if (zkClient.exists(beforePath)){
            //节点存在 令该进程阻塞
            countDownLatch = new CountDownLatch(1);
            try {
                //令该获取锁的进程阻塞 等待锁 一直等到事件通知
                countDownLatch.await();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
        //该进程任务执行完毕 删除监听
        zkClient.unsubscribeDataChanges(beforePath, listener);
    }

    @Override
    public void unlock() {
        if (zkClient != null){
            //当前节点一删除 就释放锁
            zkClient.delete(currentPath);
            zkClient.close();
            System.out.println("释放锁资源...");
        }
    }
}

评论

Your browser is out-of-date!

Update your browser to view this website correctly. Update my browser now

×