深入理解与应用Zookeeper(六)zookeeper实战

深入理解与应用Zookeeper(六)zookeeper实战

1. 服务注册与发现

image.png

image.png

image.png

代码实现

服务注册方:product-service

服务注册方 (使用原生zk客户端)

实现开机将自己的信息(ip和端口)注册到zk中

//controller获取商品的接口
@RestController
@RequestMapping("/product")
public class ProductController {

    @RequestMapping("/getProduct/{id}")
    public Object getProduct(HttpServletRequest request, @PathVariable("id") String id) {
        return new Product(id,"name:"+request.getLocalPort());
    }
}

监听器

/**
 * @author amos
 * 与你同在
 * @create 2019/9/14 - 14:36
 *
 * 初始化监听器 当项目启动的时候就将自己的信息注册到zk中
 */

public class InitListener implements ServletContextListener {

    //获取服务的端口
    @Value("${server.port}")
    private int port;

    @Override
    public void contextInitialized(ServletContextEvent sce) {
        WebApplicationContextUtils.getRequiredWebApplicationContext(sce.getServletContext()).getAutowireCapableBeanFactory().autowireBean(this);
        try {
            //获取host
            String localHost = InetAddress.getLocalHost().getHostAddress();
            ServiceRegister.register(localHost, port);
        } catch (UnknownHostException e) {
            e.printStackTrace();
        }
    }

    @Override
    public void contextDestroyed(ServletContextEvent sce) {

    }
}

商品实体类

public class Product {
    private  String id;

    private String name;

    public String getId() {
        return id;
    }

    public void setId(String id) {
        this.id = id;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public Product(String id, String name) {
        this.id = id;
        this.name = name;
    }

    public Product() {
    }
}

实现向zk注册服务的类

/**
 * @author amos
 * 与你同在
 * @create 2019/9/14 - 14:41
 *
 * 服务注册:实现将服务的host和端口注册到zk的/services/products/child节点下
 */

public class ServiceRegister {

    private static final String BASE_SERVICE = "/services";

    private static final String SERVICE_NAME = "/products";

    public static void register(String address, int port){
        try {
            //获取与zk的连接
            ZooKeeper zooKeeper = new ZooKeeper("localhost:2181", 5000, (watchEvent) -> {});
            //判断节点是否已经存在
            Stat exists = zooKeeper.exists(BASE_SERVICE + SERVICE_NAME, false);
            //如果不存在则创建一个空的节点
            if (exists == null){
                zooKeeper.create(BASE_SERVICE + SERVICE_NAME, "".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
            }
            //向ZK注册的服务的路径 比如:localhost:8082
            String server_path = address + ":" + port;
            //创建临时顺序节点 临时节点保证了如果服务挂掉 就可以直接将该节点删除 顺序是为了便于区分服务 如订单服务1 订单服务2 订单服务3
            //将该服务的信息(端口和ip)存放到/services/products/child的节点下
            zooKeeper.create(BASE_SERVICE + SERVICE_NAME + "/child", server_path.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
        } catch (Exception e) {
            e.printStackTrace();
        }


    }

}

服务调用方order-service

(使用原生zk客户端)实现开机从zk中拉取服务列表

@RequestMapping("/order")
@RestController
public class OrderController {

    @Resource
    private RestTemplate restTemplate;

    //随机负载均衡算法
    private LoadBalance loadBalance = new RandomLoadBalance();

    @RequestMapping("/getOrder/{id}")
    public Object getOrder(@PathVariable("id") String id ) {
        Product product = restTemplate.getForObject("http://"+loadBalance.choseServiceHost()+"/product/getProduct/1", Product.class);
        return new Order(id,"orderName",product);
    }
}

listener

/**
 * @author amos
 * 与你同在
 * @create 2019/9/14 - 15:18
 *
 * 服务发现功能:
 * 服务启动监听器 当服务启动的时候 连接zk从zk中拉取服务列表并且监听节点的变化 及时更新服务列表
 */

public class InitListener implements ServletContextListener {

    private static final String BASE_SERVICE = "/services";
    private static final String SERVICE_NAME = "/products";
    private ZooKeeper zooKeeper;

    @Override
    public void contextInitialized(ServletContextEvent sce) {
        try {
            //获取连接
            zooKeeper = new ZooKeeper("localhost:2181", 5000, (watchEvent) -> {
                //如果/services/products节点下的数据发生变化 就更新服务列表
                if (watchEvent.getType() == Watcher.Event.EventType.NodeDataChanged && watchEvent.getPath().equals(BASE_SERVICE + SERVICE_NAME)){
                    updateServiceList();
                }
            });
            //连接成功后刷新一个服务列表
            updateServiceList();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    private void updateServiceList(){
        try {
            //先获取所有的子节点(即服务列表)
            List<String> children = zooKeeper.getChildren(BASE_SERVICE + SERVICE_NAME, true);
            List<String> newServerList = new ArrayList<>();
            for (String subNode : children){
                //获取服务列表的内容
                byte[] data = zooKeeper.getData(BASE_SERVICE + SERVICE_NAME + "/" + subNode, false, null);
                //将data转化为String
                String host = new String(data, "UTF-8");
                System.out.println(host);
                //添加到list中
                newServerList.add(host);
            }
            //将新的服务列表更新给用于负载均衡的服务队列
            LoadBalance.SERVICE_LIST = newServerList;
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    @Override
    public void contextDestroyed(ServletContextEvent sce) {

    }
}

实体类:

//订单类
@Data
public class Order {
    private String id;
    private  String name;
    private Product product;
    public Order(String id, String name, Product product) {
        this.id = id;
        this.name = name;
        this.product = product;
    }

    public Order() {
    }
}


//商品类
@Data
public class Product {
    private  String id;

    private String name;

    public Product(String id, String name) {
        this.id = id;
        this.name = name;
    }

    public Product() {
    }
}

工具类:负载均衡

//抽象类LoadBalance
public abstract class LoadBalance {

    public volatile static List<String> SERVICE_LIST;

    public abstract String choseServiceHost();

}
public class RandomLoadBalance extends LoadBalance {
    @Override
    public String choseServiceHost() {
        String result = "";
        if(!CollectionUtils.isEmpty(SERVICE_LIST)) {
            int index = new Random().nextInt(SERVICE_LIST.size());
            result = SERVICE_LIST.get(index);
        }
        return result ;
    }
}

2. zk实现集群选举

image.png

image.png

image.png

controller

定义获取服务信息的接口

@RestController
public class IndexController {
    //获取服务信息
    @GetMapping("/getServerInfo")
    public String getServerInfo(){
        return ElectionMaster.isSurvival ? "当前节点为主节点" : "当前节点为从节点";
    }

}

监听类

public class ElectionMaster {
    //判断主节点是否存活
    public static boolean isSurvival;
}
public class InitListener implements ServletContextListener {


    //创建zk连接
    ZkClient zkClient = new ZkClient("127.0.0.1:2181");
    private String path = "/election";

    //获取服务的端口
    @Value("${server.port}")
    private int port;

    //最开始项目启动时初始化操作
    private void init(){
        System.out.println("项目启动完成...");
        //创建节点 并且将自己选举为leader
        createEphemeral();
        //创建事件监听
        zkClient.subscribeDataChanges(path, new IZkDataListener() {

            @Override
            public void handleDataChange(String dataPath, Object data) throws Exception {
            }

            //节点被删除
            @Override
            public void handleDataDeleted(String dataPath) throws Exception {
                //主节点已经挂了 重新选举
                System.out.println("主节点已经挂了,重新开始选举");
                //Timer定时器使用了线程实现的定时功能
                Timer timer = new Timer();
                timer.schedule(new TimerTask() {
                    @Override
                    public void run() {
                        createEphemeral();
                    }
                }, 5000);
            }
        });
    }

    private void createEphemeral(){
        try {
            //创建临时节点
            zkClient.createEphemeral(path, port);
            //isSurvival是否存活 选举自己为master
            ElectionMaster.isSurvival = true;
            System.out.println("serverPort:" + port + ",选举成功");
        }catch (Exception e){
            ElectionMaster.isSurvival = false;
        }

    }
    @Override
    public void contextInitialized(ServletContextEvent sce) {
        init();
    }

    @Override
    public void contextDestroyed(ServletContextEvent sce) {
        if (zkClient != null){
	  //关闭连接 避免延迟
            zkClient.close();
        }
    }
}

评论

Your browser is out-of-date!

Update your browser to view this website correctly. Update my browser now

×