假装高手系列-撸一个参数同步框架
结合学习Zookeeper以及实际应用,从零开始开发一个基于zk的内存参数同步框架,包括参数的集中管理、参数推送、配置中心以及持久化等功能。市面上开源框架很多,这个框架只是实现了小部分功能,感兴趣的可以作为学习入口。
  • 从零开发参数同步框架(一)—— 项目起源   

    前言从事Java开发已经有八个多年头了,从开始参加工作,就一直自诩热爱技术、喜欢钻研、喜欢写代码。三年前,我开始意识到,自己也并没有哪一个方面的技术达到足够的深度,甚至技术广度也远远不足。所以,从那时开始,有意识的折腾一些整体性的应用,俗称造轮子。三年前,我刚好从工作了四年多的爱裳邦购离职加入了现在这家公司——猎上网。新公司接触的项目确实比较多,加上一年后升职为后端技术经理,接触并负责公司所有的后端项目。当时,公司处于一个高速发展的阶段,所以很多基础建设不是很全面,整个技术部门都是以快速支撑为目的。当时公司很多项目已经开始服务化部署,因此项目划分很细,不同的项目大概有20来个,线上的容灾部署约有60台虚拟机。加上,公司业务发展迅速,各种活动、产品迭代频繁,而当时项目参数配置还是传统的resource.properties方式。所以矛盾很明显,这种参数配置方式需要重启项目才能使更改之后的值生效,而且几十台项目的参数无法集中管理。于是,能够集中化管理所有项目参数并实时推送的一个框架/平台对我们来说就显得非常重要和必须了。原谅我当时孤陋寡闻,并没有接触过阿波罗这个框架,加上一些的小自私(锻炼下技术),我决定自己利用闲暇时间,研发一个适用于公司的参数同步框架。--- 设计思路通过一个中心化数据管理平台,集中管理所有项目的配置参数,尤其是需要实时动态管理的那些。定义:参数管理平台称为服务端,所有接入平台的项目称为客户端。它有以下功能:1. 所有参数可以按照项目、描述等信息进行管理和查询。2. 所有参数支持动态修改,并由平台实时推送到那些使用它的服务器上。3. 所有参数都保留完整的修改历史,方便回退。4. 所有参数都以字符串形式存储,通过工具类为各个项目转换实际类型。5. 所有项目新增的参数,第一次都会同步到这个系统中。具体逻辑是,所有项目会从参数管理平台同步对应的参数,如果没有同步到自己需要的参数,则会将这些参数同步给平台进行持久化。需要解决的技术难点就是:服务端要能实时通知到客户端,但是不能耦合客户端的定位信息,包括ip、项目名、端口等。不过,转眼我就想到了Zookeeper,它不就是一个天然的服务端/客户端的连接框架么? ZooKeeper是一个分布式的,开放源码的分布式应用程序协调服务,是Google的Chubby一个开源的实现,是Hadoop和Hbase的重要组件。它是一个为分布式应用提供一致性服务的软件,提供的功能包括:配置维护、域名服务、分布式同步、组服务等。那么工作流程就是:1. 客户端在Zookeeper上注册自己的信息,比如:ip、端口、项目、关注的参数等;2. 当服务端参数发生变化,服务端可以从Zookeeper获取监听这些参数的客户端,然后通过注册过的信息将参数推送给客户端;3. 客户端接到通知后,将推送过来的参数同步到自己的内存。问题是,所有客户端也需要提供一个统一的接收参数的接口。这样一来,所有客户端需要实现自己的代码。这种方式也不是不行。不过,我采取了另一种方式:1. 客户端在Zookeeper上注册自己需要监听的参数;2. 服务端在参数发生变化的时候,通知那些对应的服务器,通过Zookeeper默认的心跳即可实现通知功能,并传输相关信息;3. 客户端在得到通知之后,从服务端请求对应参数,然后同步到自己的内存参数管理中;4. 客户端同步参数的方式可以封装为统一的一个工具类,客户端只需增加简单的代码;5. 服务端不会耦合客户端的信息,它只与Zookeeper相连;客户端除了与Zookeeper相连,还需要配置服务端参数获取接口,或者通过dubbo来实现服务端和客户端的彻底解耦,两者都通过Zookeeper来交互;服务端就是服务提供者,客户端就是消费者。这里推荐一个好用的Zookeeper客户端jar包:Curator。 Apache Curator是Netflix公司开源的一个Zookeeper客户端,与Zookeeper提供的原生客户端相比,Curator的抽象层次更高,简化了Zookeeper客户端的开发量。--- 架构图、模块图我随便画了一下这个框架的整体架构图、框架的模块图以及边缘的扩展模块。(非专业,不是非常专业) 架构图![图片](https://oomabc.com/staticsrc/img/201810/21/154011830059420f79f6beae941d6aa53faecd224a4ff.jpg) 模块图![图片](https://oomabc.com/staticsrc/img/201810/21/154011835895372b54f3fb74b48d6b47e3a8a912cc248.jpg) --- 模块相关解释完整的参数同步平台分为核心框架、Spring集成、参数管理三个大模块。 一、核心框架1. core:主要包含了核心监听节点类、zk节点类、节点变动触发接口类。其功能是,在zookeeper上注册相关节点并监听该节点,当节点发生变动,会以异步方式触发对应接口,执行自定义动作。zk节点类包含了基本的监听信息。2. 配置类:用于管理参数信息。3. 工具类:负责初始化zk客户端、维护配置中心信息、建立服务端数据广播节点、建立客户端订阅节点。负责收集所有项目的节点信息以及配置触发接口。负责所有节点的初始化以及其状态维护,包括重来、挂起等。4. 客户端:核心是提供一个注册方法,收集客户端注册的节点信息。在项目加载并连接zk成功之后调用工具类将注册的节点信息注册到zk节点,并监听其信息变动。5. 服务端:只提供一个广播方法,发布参数变动消息给zk节点,调用工具类实现该功能。 二、Spring集成1. 工具类:组装节点监听对象中封装的各个对象。包括zk节点类、变动触发接口类、连接池等信息的判断,以及调用核心框架的客户端方法进行节点注册和初始化。2. Spring加载:在Spring容器加载完成的时候,调用工具类进行参数包扫描,以及从配置文件获取节点监听对象信息,并初始化。3. 配置加载:从本地配置文件加载信息,主要是zk地址,用于初始化zk连接和配置中心。4. 节点监听:封装了注册节点、节点变动触发接口实现、连接池等信息。 三、参数管理1. 注解类:用在内存参数变量上,注明了参数的唯一标识字符串、参数描述以及参数数据类型。2. 包扫描类:扫描基础package,将加了注解类的所有变量信息收集并统一管理。提供了参数值映射赋值、新参数回调等功能。3. 参数实体接口:定义了参数名、参数描述和参数类型的get/set方法。4. 回调接口:发现新参数时,需要回调的接口,用于将新参数同步会管理平台。--- 预告下一篇[《从零开发参数同步框架(二)—— 前期准备之工具类》][nextLink]介绍核心框架的工具类,这个工具类提供了整个框架的核心方法,非常重要。[nextLink]: https://oomabc.com/articledetail?atclide0a2741b298441bd9cc0bfe90618e906

    参数同步   zookeeper   2018-10-22 浏览(2787) 有用(0) 阅读原文>> [原创]
  • 从零开发参数同步框架(二)—— 前期准备之工具类   

    零、基础定义 约定本框架对与监听参数的设计功能是通用的,它维护一个监听的数据节点信息,包含监听的db、table以及id信息。在zookeeper上,创建一个广播节点(broadcasetnode)作为根节点;在根节点下的目录结构是/broadcasetnode/db/table/id,每个客户端可以监听对应的节点以获取不同范围的触发事件。 GgTable当平台管理的参数发生变动时,服务端代码会将该对象传递给zk节点,然后通过监听事件传递给对应的客户端。它保存了监听的详情,包括监听的数据库、数据表、数据字段。框架默认实现的功能是触发table级别的事件,另外两个级别需要大家自行实现,比较简单。javaimport java.io.Serializable;public class GgTable implements Serializable { private static final long serialVersionUID 5755663143906038786L; //数据库 private String db; //表 private String table; //默认的id主键值,long型 private Long id; //自定义字段名 private String field; //field 相对于 数据记录的 value值 private Object value; public GgTable(String db, String table) { super(); this.db db; this.table table; } //get、set、equals、toString等方法省略} Curator实例缓存CuratorFrameworkUtil工具类对Curator实例对象CuratorFramework进行了缓存,缓存的Key就是zk地址,它负责创建最简洁的连接对象并复用。这个简洁的对象与框架路径无关。javapublic class CuratorFrameworkUtil extends GgFrameworkAbstract { private static final String LOGPREV "[CuratorFrameworkUtil] "; //注册zk地址,命名前缀解析 private static final String PREFIX "://"; //zk连接客户端,curator框架 private static Map CLIENTS new HashMap(2); public synchronized static CuratorFramework getInstance(String zkHost) { CuratorFramework client getFromClients(zkHost); if(client null) { client init(zkHost); setToClients(zkHost, client); } return client; } private static CuratorFramework getFromClients(String zkHost) { return CLIENTS.get(zkHost); } private static void setToClients(String zkHost, CuratorFramework client) { if(client ! null) { CLIENTS.put(zkHost, client); } } private static String cleanZkHost(String host) { int index -1; if(StringUtils.isNotBlank(host) && (index host.indexOf(PREFIX)) 0) { return host.substring(index + PREFIX.length()); } return host; } private static CuratorFramework init(String zookeeperHost) { log(LOGPREV, "init curator client, zkHost {}", new Object[]{ zookeeperHost }); String dubboregistry cleanZkHost(zookeeperHost); CuratorFramework client null; RetryPolicy retryPolicy new ExponentialBackoffRetry(10000, 8); client CuratorFrameworkFactory.builder() //zookeeper连接地址和重试逻辑 .connectString(dubboregistry).retryPolicy(retryPolicy) //节点命名空间,这里当时随便写了一个。。。 .namespace("watchtest").build(); //连接实例的启动交给第一个获得实例的用户// client.start(); return client; }} 广播节点本框架默认只实现了广播功能(目前没有找到订阅功能存在的必要)。广播节点就是整个框架在zk中的更目录,防止与其他共用zk的框架发生目录冲突。java //数据广播节点 public static final String broadcasetnode "/broadcasetnode";--- 一、功能概述 初始化zk客户端利用Apache Curator工具包,提供一个初始化zk客户端的方法。可以按照指定zk地址或者使用默认配置的地址进行连接。在节点连接成功之后,调用节点注册方法注册客户端收集的节点细腻,同时维护连接状态并提供断开/挂起重连功能。 节点注册提供一个线程安全的节点注册功能,这只是一个方法入口,实际的客户端节点注册代码在客户端本身。 节点数据判断用于对节点变动时服务端传递给客户的数据实体(GgTable)的判断。比如GgTable相关字段是否完整。 zk节点路径判断对与将要创建的zk路径,确认其是否存在,如果不存在将会先创建该路径。 广播方法为服务端提供一个广播方法,当平台管理的参数发生变动时,服务端将会调用该方法进行通知,通知所有监听的客户端此参数发生变动需要重新加载。------ 二、功能详述 初始化zk客户端默认提供两个public方法用于初始化zk连接:一个是无参数,一个是有参数。无参数使用的是配置文件默认配置,有参数可以指定zk地址。java / 初始化zookeeper连接,根据【resource.properties】配置的【ggframework.zookeeper】地址 / public static void initCuratorFramework() { if(CLIENT null) { CLIENT init(); } } //无参数初始化方法和有参数初始化方法最终调用同一个init方法 private static CuratorFramework init() { //默认初始化,读取配置 String dubboregistry PropertieUtil.getStrProp("resource", "ggframework.zookeeper"); return init(dubboregistry); } / 根据指定zookeeper服务地址,初始化zookeeper连接 @param zkHost 指定一个zk集群地址 / public static void initCuratorFramework(String zkHost) { if(CLIENT null) { CLIENT init(zkHost); } }实际的init方法负责参数判断、参数格式化、客户端创建以及相关状态维护。注意两个地方:1. 给连接实例添加状态监听事件:当连接成功、失败、挂起、重连时,执行响应操作。2. 连接实例避免重复启动:因为实例启动交给了客户端,所以有可能发生重复启动,重复启动会报错:Cannot be started more than once。java / 根据Zookeeper地址,初始化Curator实例对象 @param zookeeperHost zookeeper地址,多个地址:zk1:port1,zk2:port2,zk3:port3 @return / private static CuratorFramework init(String zookeeperHost) { if(StringUtils.isBlank(zookeeperHost)) { log(LOGPREV, "[ERROR] curator client init failed as zookeeperHost is null"); return null; } final String dubboregistry cleanZkHost(zookeeperHost); CuratorFramework client CuratorFrameworkUtil.getInstance(dubboregistry); log(LOGPREV, "add ConnectionStateListener to client, zkHost {}", new Object[]{ zookeeperHost }); //添加客户端监听 client.getConnectionStateListenable().addListener(new ConnectionStateListener() { //客户端连接zk节点之后,连接状态变动时的处理 @Override public void stateChanged(CuratorFramework client, ConnectionState state) { if(state null) { log(LOGPREV, "can not get connection state, zkHost {}", dubboregistry); return; } switch (state) { case CONNECTED://连接成功 log(LOGPREV, "connect to zookeeper successfully, zkHost {}", dubboregistry); addNodeListenerFromNodeCacheList(); break; case SUSPENDED://连接挂起 log(LOGPREV, "connection is suspended, zkHost {}", dubboregistry); break; case RECONNECTED://挂起,丢失,只读的连接,被重新唤起 log(LOGPREV, "connection is reconnected, zkHost {}", dubboregistry); addNodeListenerFromNodeCacheList(); break; case LOST://连接丢失 log(LOGPREV, "connection is lost, zkHost {}", dubboregistry); break; default://连接只读 log(LOGPREV, "connect to zookeeper failed, zkHost {}", dubboregistry); break; } } }); //启动连接 CuratorFrameworkState state client.getState(); //重复启动会报错:Cannot be started more than once if(state ! null && state ! CuratorFrameworkState.STARTED) { log(LOGPREV, "curator client start to connected zookeeper, zkHost {}", new Object[]{ zookeeperHost }); client.start(); } return client; } 节点注册java //从节点监听缓存列表加载监听事件,如果没有加载过的话 private static void addNodeListenerFromNodeCacheList() { if(isInit.compareAndSet(false, true)) { try { log(LOGPREV, "try to add nodeListener from node cache list"); GgFrameworkClient.addListenerFromCache(); } catch (Exception e) { log(LOGPREV, "[ERROR] add node listener from node cache list error, {}", e.getMessage()); } } }--- 完整的代码GgFrameworkAbstract.javajava/ 抽象类,保存公用的一些变量、方法 目前只有日期格式化设置和打印日志方法 @author wjyuian /public abstract class GgFrameworkAbstract { public static final SimpleDateFormat FORMATFULL new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); protected static void log(String prev, String line, Object...objects) { System.out.println("[" + FORMATFULL.format(new Date()) + "] " +prev + JunitConsoleOut.consoleLine(line, objects)); }}核心工具类:GgFrameworkUtil.javajava//参数框架工具类public class GgFrameworkUtil extends GgFrameworkAbstract { private static final String LOGPREV "[GgFrameworkUtil] "; //标记client是否已经初始化 private static AtomicBoolean isInit new AtomicBoolean(false); //数据广播节点 public static final String broadcasetnode "/broadcasetnode"; //订阅监听节点;目前版本未实现订阅功能 public static final String notifynode "/notifynode"; //注册zk地址,命名前缀解析 private static final String PREFIX "://"; //zk连接客户端,curator框架 private static CuratorFramework CLIENT null; public static final String FRAMEWORKPREFIX "GgFramework"; //配置中心 public static final String CONFIGERCENTER "/ggconfigercenter"; public static boolean isFramewordInit() { return isInit.get(); } / 初始化zookeeper连接,根据【resource.properties】配置的【ggframework.zookeeper】地址 @see [相关类/方法](可选) @since [产品/模块版本](可选) / public static void initCuratorFramework() { if(CLIENT null) { CLIENT init(); } } / 根据指定zookeeper服务地址,初始化zookeeper连接 @param zkHost @see [相关类/方法](可选) @since [产品/模块版本](可选) / public static void initCuratorFramework(String zkHost) { if(CLIENT null) { CLIENT init(zkHost); } } //格式化地址写法 private static String cleanZkHost(String host) { int index -1; if(StringUtils.isNotBlank(host) && (index host.indexOf(PREFIX)) 0) { return host.substring(index + PREFIX.length()); } return host; } / 根据Zookeeper地址,初始化Curator实例对象 @param zookeeperHost zookeeper地址,多个地址:zk1:port1,zk2:port2,zk3:port3 @return / private static CuratorFramework init(String zookeeperHost) { if(StringUtils.isBlank(zookeeperHost)) { log(LOGPREV, "[ERROR] curator client init failed as zookeeperHost is null"); return null; } final String dubboregistry cleanZkHost(zookeeperHost); CuratorFramework client CuratorFrameworkUtil.getInstance(dubboregistry); log(LOGPREV, "add ConnectionStateListener to client, zkHost {}", new Object[]{ zookeeperHost }); //添加客户端监听 client.getConnectionStateListenable().addListener(new ConnectionStateListener() { //客户端连接zk节点之后,连接状态变动时的处理 @Override public void stateChanged(CuratorFramework client, ConnectionState state) { if(state null) { log(LOGPREV, "can not get connection state, zkHost {}", dubboregistry); return; } switch (state) { case CONNECTED://连接成功 log(LOGPREV, "connect to zookeeper successfully, zkHost {}", dubboregistry); addNodeListenerFromNodeCacheList(); break; case SUSPENDED://连接挂起 log(LOGPREV, "connection is suspended, zkHost {}", dubboregistry); break; case RECONNECTED://挂起,丢失,只读的连接,被重新唤起 log(LOGPREV, "connection is reconnected, zkHost {}", dubboregistry); addNodeListenerFromNodeCacheList(); break; case LOST://连接丢失 log(LOGPREV, "connection is lost, zkHost {}", dubboregistry); break; default://连接只读 log(LOGPREV, "connect to zookeeper failed, zkHost {}", dubboregistry); break; } } }); //获得当前连接状态 CuratorFrameworkState state client.getState(); //重复启动会报错:Cannot be started more than once if(state ! null && state ! CuratorFrameworkState.STARTED) { log(LOGPREV, "curator client start to connected zookeeper, zkHost {}", new Object[]{ zookeeperHost }); client.start(); } return client; } //从节点监听缓存列表加载监听事件,如果没有加载过的话 private static void addNodeListenerFromNodeCacheList() { if(isInit.compareAndSet(false, true)) { try { log(LOGPREV, "try to add nodeListener from node cache list"); GgFrameworkClient.addListenerFromCache(); } catch (Exception e) { log(LOGPREV, "[ERROR] add node listener from node cache list error, {}", e.getMessage()); } } } private static CuratorFramework init() { //默认初始化,读取配置 String dubboregistry PropertieUtil.getStrProp("resource", "ggframework.zookeeper"); return init(dubboregistry); } public static CuratorFramework getClient() { return CLIENT; } //判断表是否设置了 db和table public static boolean isZkTableDbAndTableValid(GgTable table) { if(table null) { return false; } return StringTools.isNotBlank(table.getDb()) && StringTools.isNotBlank(table.getTable()); } //判断是否设置了 field和value public static boolean isZkTableFieldAndValueValid(GgTable table) { return StringTools.isNotBlank(table.getField()) && StringTools.isNotBlank(table.getValue()); } / 节点创建确认 如果节点不存在,则创建 / private static void createIfNotExists(String nodePath) { try { if (CLIENT ! null && CLIENT.checkExists().forPath(nodePath) null) { String s CLIENT.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL) .forPath(nodePath, "node".getBytes()); log(LOGPREV, "[GgFramework] create zk node, path {}, s {}", nodePath, s); } } catch (Exception e) { e.printStackTrace(); } } //节点数据判断方法 private static void checkZkTableForDbAndTable(String method, GgTable table) throws IllegalArgumentException { if(!isZkTableDbAndTableValid(table)) { throw new IllegalArgumentException("使用【" + method + "】时必须设置ZkTable的db和table"); } } //节点数据判断方法 private static void checkZkTableForFieldAndValue(String method, GgTable table) throws IllegalArgumentException { if(!isZkTableFieldAndValueValid(table)) { throw new IllegalArgumentException("使用【" + method + "】时必须设置ZkTable的field和value"); } } //数据修改广播,广播的节点是:db/table;服务端调用 public static void broadcastNodeDataChanged(GgTable table) throws Exception { checkZkTableForDbAndTable("org.common4s.zookeeper.util.CuratorUtil.broadcastNodeDataChanged(ZkTable)", table); String path broadcasetnode + "/" + table.getDb() + "/" + table.getTable(); createIfNotExists(path); getClient().setData().forPath(path, SSDBCoderUtil.encode(table)); } / 数据修改监听节点创建 在指定的广播节点进行数据监听 / public static NodeCache broadcastNotifyNodeChanged(GgTable table) { checkZkTableForDbAndTable("org.common4s.zookeeper.util.CuratorUtil.broadcastNotifyNodeChanged(ZkTable)", table); String path broadcasetnode + "/" + table.getDb() + "/" + table.getTable(); createIfNotExists(path); return new NodeCache(GgFrameworkUtil.getClient(), path, false); } //以下是配置中心的代码,比较简单 public static String startSyncConfigerAndGetPath() { String path CONFIGERCENTER + "/properties"; createIfNotExists(path); return path; } public static void pushDataToNode(String path, ConfigerData data) throws Exception { getClient().setData().forPath(path, SSDBCoderUtil.encode(data)); } public static ConfigerData getAllConfiger() throws Exception { String path CONFIGERCENTER + "/properties"; createIfNotExists(path); byte[] forPath getClient().getData().forPath(path); try { ConfigerData data SSDBCoderUtil.decode(forPath); return data; } catch (Exception e) { e.printStackTrace(); } return null; }}---下一章[《从零开发参数同步框架(三)—— 服务端编码 》] [nextLink]会介绍框架的服务端编码以及配套的管理模块核心代码。[nextLink]: https://oomabc.com/articledetail?atclidf83aba9c66a442e594d084bee123ce43

    参数同步   Curator   2018-10-24 浏览(3720) 有用(0) 阅读原文>> [原创]
  • 从零开发参数同步框架(三)—— 服务端编码   

    服务端功能仅就框架而言,服务端(GgFrameworkServer)代码的功能只有一个,向客户端发送(方法是:broadcast(GgTable table))某个数据发生变动的信息。注意:这里服务端提供的broadcast方法不会指定通知哪个客户端,只会通知参数对应的节点,这样就达到与客户端解耦的目的。客户端也无需知道谁是服务端,它只要告诉zookeeper自己想要监听的参数信息即可。不过,作为一个完整的参数同步框架,其服务端还需要提供完整的如下功能:1. 参数管理:这是一个可视化界面,方便用户进行参数查询、分组查询、参数权限控制等。2. 修改日志:记录每一个参数的详细修改记录,包括参数初始导入信息、修改信息。信息包含时间、当时值、修改之后的值。3. 变动通知:当参数发生变动时,需要服务端通过框架代码提供的功能(GgFrameworkServer.broadcast),向客户端发送变动通知同时传递约定好的数据(本框架中就是GgTable)。4. 参数查询:当客户端接到变动通知后,将会通过这个接口从服务端查询参数的最新值。这是一个dubbo接口,因此服务端和客户端同样也无需知晓对方的地址,可以解耦。接口接收的主要参数是客户端指定的参数key列表。--- 服务端初始化在web项目启动之后,调用GgFrameworkUtil.initCuratorFramework方法既可以初始化对zookeeper的连接:java Object zkHostString props.get("ggframework.zookeeper"); //直接指定zookeeper地址 if(zkHostString ! null && String.valueOf(zkHostString).length() 0) { GgFrameworkUtil.initCuratorFramework(String.valueOf(zkHostString)); } else { //从默认配置文件加载 GgFrameworkUtil.initCuratorFramework(); }项目启动之后的地方,可以通过继承PropertyPlaceholderConfigurer并重写protected void processProperties(ConfigurableListableBeanFactory beanFactoryToProcess, Properties props) throws BeansException来实现。或者通过继承ContextLoaderListener并且web.xml中配置启动项来实现:xml com.hunteron.webinit.InitialContextListener --- 变动通知参数管理、修改日志没什么好说的,只是普通的CRUD操作。变动通知,就是当发生参数的update操作时,调用GgFrameworkServer.broadcast方法即可。下面详细说说约定的GgTable的信息设置。参数的update方法接收的参数有参数key、参数值、参数描述以及其他修改人等信息,实际上还有一些信息是我们约定的,比如参数存放的数据库、数据表。所以我们需要将数据库、数据表、参数key、参数值等信息封装到GgTable:javaGgTable table new GgTable();table.setDb("searchmanager"); //约定的参数数据库table.setTable("tbmgrparameters"); //约定的参数数据表table.setId(mgr.getId().longValue()); //参数主键,未使用table.setValue(value); //参数实际值,目前未使用table.setField("id"); //本意是设定value值对应的含义if (pm ! null) { //这里表示field为paramKey,对应的值是value,也就是key:value table.setField(pm.getParamKey());} //下面就是发送通知try { log.debug("update parameter and notify zookeeper [{}]", table.toString()); GgFrameworkServer.broadcast(table);} catch (Exception e) { e.printStackTrace();}GgFrameworkServer.java:javapublic class GgFrameworkServer { public static void broadcast(GgTable table) throws Exception { //发布数据更新 GgFrameworkUtil.broadcastNodeDataChanged(table); } / 广播数据修改节点 往数据广播节点,发布数据修改消息 这是直接指定数据库和数据表的方法 @param dbName 对应数据库 @param tableName 对应表明 @throws Exception / public static void broadcast(String dbName, String tableName) throws Exception { GgTable table new GgTable(); table.setDb(dbName); table.setTable(tableName); broadcast(table); }} -------- 参数查询+ 服务端提供一个参数查询接口,用于客户端查询指定key的参数值。javapublic MgrParametersResult queryParameters(MgrParameterQuery query);下面是参数(配置)查询的参数(param)输入:指定一批参数的key,向服务端查询这些key对于的详细信息:javapublic class MgrParameterQuery implements Serializable { private static final long serialVersionUID -7799195093065688094L; private List parameterKeys new ArrayList(); public MgrParameterQuery() { } public List getParameterKeys() { return parameterKeys; } public void setParameterKeys(List parameterKeys) { this.parameterKeys parameterKeys; } public void addParameterKey(String key) { if(!parameterKeys.contains(key)) { parameterKeys.add(key); } }}下面是参数查询返回的结果:javapublic class MgrParametersResult implements Serializable { private static final long serialVersionUID -2054733663746480487L; //返回参数实体列表 //MgrParametersWord实现了接口MySqlParameter, //为了使用com.hunteron.common4s.jdk.parameter.MySqlParameterTools.reloadParameter(Class, List, MySqlParameterCallback)接口 private List list new ArrayList(2); //...... 省略代码} 关于com.hunteron.common4s.jdk.parameter.MySqlParameterTools类的细节,将在下一张说明。+ 另外针对新参数,服务端将再提供一个参数录入接口:javapublic int insertParameter(String key, Object value, String info);至此,参数同步框架的服务端代码以及配套的管理模块核心代码已经介绍完毕。下一章[《从零开发参数同步框架(四)—— 客户端编码》][nextLink]介绍参数同步框架的客户端代码实现以及使用实例。[nextLink]: https://oomabc.com/articledetail?atclid95d35fad522243248ca51eca353faace

    参数同步   2019-02-20 浏览(2362) 有用(0) 阅读原文>> [原创]
  • 从零开发参数同步框架(四)—— 客户端编码   

    客户端框架的客户端代码完成的主要功能分为以下几个方面:1. 初始化zk连接(initClient):根据默认或者指定的zk集群地址,创建一个Curator维护的zk连接对象。2. 添加节点监听(listener):根据指定的数据库+数据表,在zk上创建对应的节点,并添加监听事件。特别地,如果调用该方法的时候客户端已完成zk连接的初始化,则直接调用addListenerToNode方法,为实际zk节点添加监听事件,否则会将节点信息暂存在内存中,等待zk连接完成之后回调addListenerFromCache方法3. 给节点添加监听事件(addListenerToNode):为实际的zk节点(与listener方法添加的数据库、数据表关联)添加监听事件,当这个节点(本系列文章中的参数表,即服务端管理的配置持久化表)的数据发生变更,会在当前客户端上触发刚刚添加的监听事件。4. 根据缓存给节点添加监听事件(addListenerFromCache):根据内存中所有节点信息,通过调用addListenerToNode方法,为每个节点创建监听事件。然后从内存删除已创建事件的节点信息。5. 释放节点的监听事件(releaseListener):删除已创建事件的节点上的监听事件。--- 客户端功能详解 初始化zk连接这个比较简单,直接调用之前介绍的工具类即可:java //初始化连接zookeeper public static void initClient(String zkHost) { if(StringUtils.isBlank(zkHost)) { GgFrameworkUtil.initCuratorFramework(); } else { GgFrameworkUtil.initCuratorFramework(zkHost); } } 添加节点监听这个方法有三个参数: GgTable:维护这个节点的监听目标,指定数据库、数据表即可。 IChangedService:这个接口就是当GgTable对应节点信息发生变动,即服务端调用了broadcast(GgTable table)方法,客户端触发监听事件后将会调用的类,具体调用的是IChangedService.excute(GgTable)方法。这个方法的参数正式GgTable,也就是服务端传入的对象。客户端可以自己实现IChangedService接口,定制不同的逻辑。 ExecutorService:用于异步维护监听事件的连接池。具体代码如下:java / 监听数据更新 / public static void listener(GgTable ggTable, IChangedService service, ExecutorService pool) throws Exception { //判断,zk连接是否已经创建,已创建则直接可以进行实际注册,否则只能将信息暂存 if(GgFrameworkUtil.isFramewordInit()) { addListenerToNode(new ListenerNode(ggTable, service, pool)); } else { //未初始化完成,则先加入list,等待初始化完成后自动进行监听 synchronized (LISTENERNODES) { / 获得同步锁, 目的是防止 addListenerFromCache方法获得锁之后,已经将 LISTENERNODES中的事件全部监听,然后清空,这里下一行的add方法对应的事件将会丢失, 因此,在它获得同步锁之后,需要再次判断是否初始化完成;如果完成了,则直接添加事件监听;否则加入列表等待监听 / if(GgFrameworkUtil.isFramewordInit()) { addListenerToNode(new ListenerNode(ggTable, service, pool)); } else { log(LOGPREV, "add changedService [{}] into node cache list for ggTable [{}]", new Object[]{ ggTable, service.getClass() }); LISTENERNODES.add(new ListenerNode(ggTable, service, pool)); } } } } 给节点添加监听事件它是实际上给zk节点添加监听事件的方法,它会进行后续操作必要信息的判空、从通过工具类获得zk节点对象、添加监听事件、缓存zk节点信息(用于释放)。java //添加节点监听事件到zk节点 private static void addListenerToNode(ListenerNode listenerNode) throws Exception { addListener(listenerNode.getGgTable(), listenerNode.getService(), listenerNode.getPool()); } private static void addListener(GgTable ggTable, IChangedService service, ExecutorService pool) throws Exception { if(ggTable null service null pool null pool.isShutdown()) { log(LOGPREV, "[ERROR] addListener(GgTable ggTable, IChangedService service, ExecutorService pool)"); throw new RuntimeException("arguments for addListener(GgTable ggTable, IChangedService service, ExecutorService pool) is null"); } //创建节点 log(LOGPREV, "[{}] add nodeListener for ggtable {}", GgFrameworkUtil.FRAMEWORKPREFIX, ggTable); //通过工具类,获得ggTable对应的节点,如果不存在,则会创建一个并返回 final NodeCache nodeCache GgFrameworkUtil.broadcastNotifyNodeChanged(ggTable); nodeCache.start(true); //绑定事件 final IChangedService temp service; //给节点添加一个监听事件,节点的监听事件可以存在很多个 nodeCache.getListenable().addListener(new NodeCacheListener() { @Override public void nodeChanged() throws Exception {//节点信息变更触发事件 GgTable data null; try { data SSDBCoderUtil.decode(nodeCache.getCurrentData().getData()); } catch (Exception e) { log(LOGPREV, "[ERROR] nodeChanged for ggtable, changedService {}", temp.getClass()); } //节点事件触发后,调用IChangedService接口,进行回调 temp.excute(data); } }, pool); //缓存ggTable对应的节点信息,用于后面的释放操作 nodeCacheMap.put(ggTable, nodeCache); } 根据缓存给节点添加监听事件当zk连接初始化完成并且连接状态为CONNECTED或RECONNECTED的时候,将会触发这个方法。它会给在zk初始化完成之前,客户端注册进来的节点重新添加监听事件,防止遗漏。这个方法中,同步块很重要:java //从缓存加载节点事件监听 public static void addListenerFromCache() { if(LISTENERNODES.size() 0) { //先判断是否有缓存信息 synchronized (LISTENERNODES) { //如果有,则对缓存对象进行加锁 / 这里再次判断缓存数量,是防止第一步判断数量到加锁的时候,缓存被清空 / if(LISTENERNODES.size() 0) { log(LOGPREV, "[{}] add nodeListener from node cache list", GgFrameworkUtil.FRAMEWORKPREFIX); for(ListenerNode node : LISTENERNODES) { try { //调用实际方法,给节点添加监听事件 addListenerToNode(node); } catch (Exception e) { e.printStackTrace(); } } //情况缓存信息 //TODO 如果可以分辨事件是否重复添加,这里倒是可以不清空,有待后续研究 LISTENERNODES.clear(); } } } }--- 完整源代码节点事件释放比较简单,不再单独给出源码,这里直接给出客户端类完整代码。java//参数框架客户端public class GgFrameworkClient extends GgFrameworkAbstract { private static final String LOGPREV "[GgFrameworkClient] "; public static final Map nodeCacheMap new HashMap(); //临时存储的待监听节点,监听之后会清空 private static final List LISTENERNODES new ArrayList(10); //初始化连接zookeeper public static void initClient(String zkHost) { if(StringUtils.isBlank(zkHost)) { GgFrameworkUtil.initCuratorFramework(); } else { GgFrameworkUtil.initCuratorFramework(zkHost); } } / 监听数据更新 / public static void listener(GgTable ggTable, IChangedService service, ExecutorService pool) throws Exception { if(GgFrameworkUtil.isFramewordInit()) { addListenerToNode(new ListenerNode(ggTable, service, pool)); } else { //未初始化完成,则先加入list,等待初始化完成后自动进行监听 synchronized (LISTENERNODES) { / 获得同步锁, 目的是防止 addListenerFromCache方法获得锁之后,已经将 LISTENERNODES中的事件全部监听,然后清空,这里下一行的add方法对应的事件将会丢失, 因此,在它获得同步锁之后,需要再次判断是否初始化完成;如果完成了,则直接添加事件监听;否则加入列表等待监听 / if(GgFrameworkUtil.isFramewordInit()) { addListenerToNode(new ListenerNode(ggTable, service, pool)); } else { log(LOGPREV, "add changedService [{}] into node cache list for ggTable [{}]", new Object[]{ ggTable, service.getClass() }); LISTENERNODES.add(new ListenerNode(ggTable, service, pool)); } } } } //从缓存加载节点事件监听 public static void addListenerFromCache() { if(LISTENERNODES.size() 0) { synchronized (LISTENERNODES) { if(LISTENERNODES.size() 0) { log(LOGPREV, "[{}] add nodeListener from node cache list", GgFrameworkUtil.FRAMEWORKPREFIX); for(ListenerNode node : LISTENERNODES) { try { addListenerToNode(node); } catch (Exception e) { e.printStackTrace(); } } LISTENERNODES.clear(); } } } } //添加节点事件到zk节点 private static void addListenerToNode(ListenerNode listenerNode) throws Exception { addListener(listenerNode.getGgTable(), listenerNode.getService(), listenerNode.getPool()); } private static void addListener(GgTable ggTable, IChangedService service, ExecutorService pool) throws Exception { if(ggTable null service null pool null pool.isShutdown()) { log(LOGPREV, "[ERROR] addListener(GgTable ggTable, IChangedService service, ExecutorService pool)"); throw new RuntimeException("arguments for addListener(GgTable ggTable, IChangedService service, ExecutorService pool) is null"); } //创建节点 log(LOGPREV, "[{}] add nodeListener for ggtable {}", GgFrameworkUtil.FRAMEWORKPREFIX, ggTable); final NodeCache nodeCache GgFrameworkUtil.broadcastNotifyNodeChanged(ggTable); nodeCache.start(true); //绑定事件 final IChangedService temp service; nodeCache.getListenable().addListener(new NodeCacheListener() { @Override public void nodeChanged() throws Exception {//节点信息变更触发事件 GgTable data null; try { data SSDBCoderUtil.decode(nodeCache.getCurrentData().getData()); } catch (Exception e) { log(LOGPREV, "[ERROR] nodeChanged for ggtable, changedService {}", temp.getClass()); } temp.excute(data); } }, pool); nodeCacheMap.put(ggTable, nodeCache); } public static void listener(String dbName, String tableName, IChangedService service, ExecutorService pool) throws Exception { listener(new GgTable(dbName, tableName), service, pool); } / 取消监听 / public static void releaseListener(GgTable ggTable) throws IOException { NodeCache cache nodeCacheMap.get(ggTable); if(cache ! null) { cache.close(); } }}--- 客户端配套客户端在接入参数同步框架的时候,需要实现接口IChangedService,在方法void excute(GgTable table)中执行相关操作。当服务端,配置内容发生变更会触发此方法的执行,然后在此方法中进行最新参数查询以及新增参数导入。查询以及导入的方法是服务端管理模块提供的dubbo接口。java@Service("searchMangerChangedService")public class SearchMangerChangedService extends AbstractSelectTab implements IChangedService { public static Long parameterVersion; //参数管理平台提供的dubbo接口 @Resource(name "parameterService") private ParameterService parameterService; @Override public void excute(GgTable table) { MgrParameterQuery query new MgrParameterQuery(); query.setParameterKeys(MySqlParameterTools.getAllKeys(HDParameterFromMySql.class)); MgrParametersResult queryParameters parameterService.queryParameters(query); if(queryParameters null) { return; } parameterVersion queryParameters.getParameterVersion(); List list queryParameters.getList(); try { //通过反射的方式更新参数定义类中的参数值 //回调接口,用于新增参数导入 MySqlParameterTools.reloadParameter(HDParameterFromMySql.class, list, new MySqlParameterCallback() { @Override public void call(String key, Object value, String info) { parameterService.insertParameter(key, value, info); } }); } catch (IllegalArgumentException e) { e.printStackTrace(); } catch (IllegalAccessException e) { e.printStackTrace(); } }} 几点说明 1. HDParameterFromMySql.class是参数定义类,其中定义了一些静态变量,通过MySqlParameterField注解来配置参数相关信息,被注解的参数必须是public、static的,不能是final。javapublic class HDParameterFromMySql { / example @Description key 对应searchmanager 配置的参数 key / @MySqlParameterField(key "HDPARATERTEST", info "测试hd对接参数", type Type.INTEGER) public static int HDPARATERTEST 0; @MySqlParameterField(key "HDHOTSEARCHWORDS", info "热门搜索词", type Type.STRING) public static String HDHOTSEARCHWORDS "Java,PHP,WEB前端,算法,技术经理,技术总监,android,C/C++,产品经理";} 2. MySqlParameterTools是一个客户端扩展工具类。通过反射的方式从参数定义类中获取注解定义的参数名列表。额外,它还提供一个重要方法用于最新参数值赋值和新增参数回调导入。这个工具类的完整代码如下:java/ 参数工具类 负责项目扫描,发现参数注解; @author wjyuian /public class MySqlParameterTools { //保存客户端代码中,所有添加指定注解的Field对象,用于参数赋值 private static Map parameterFromAnotation new HashMap(); / 根据spring扫描的结果,获取带有参数注解的类 / public static void getParameterInfoFromAnotation(Class clazz) { if(clazz null) { return; } Field[] declaredFields clazz.getDeclaredFields(); if (declaredFields ! null && declaredFields.length 0) { for (Field field : declaredFields) { //获取参数注解 MySqlParameterField fieldAnotation field.getAnnotation(MySqlParameterField.class); if (fieldAnotation null) { continue; } //生成参数key String parameterKey fieldAnotation.key(); if(StringUtils.isBlank(parameterKey)) { parameterKey clazz.getName() + "." + field.getName(); } System.out.println("[COMMON4S-JDK] : scan parameter with MySqlParameterField " + (clazz.getName() + "." + field.getName())); List list parameterFromAnotation.get(parameterKey); if (CollectionUtil.isEmpty(list)) { list new ArrayList(2); list.add(field); parameterFromAnotation.put(fieldAnotation.key(), list); } else { list.add(field); } } } } public static void reloadParameter(Class clazz, List parameters) throws IllegalArgumentException, IllegalAccessException { reloadParameter(clazz, parameters, null); } public static void reloadParameter(Class clazz, List parameters, MySqlParameterCallback callback) throws IllegalArgumentException, IllegalAccessException { System.out.println("[COMMON4S-JDK] : load parameters. " + clazz); if (clazz null) { return; } //参数键值对 Map parameterMap new HashMap(); if (CollectionUtil.isNotEmpty(parameters)) { for (MySqlParameter p : parameters) { parameterMap.put(p.getKey(), p.getValue()); } } //给指定的参数定义类,按照参数键和当前键值对信息进行赋值 Field[] fields clazz.getDeclaredFields(); if (fields ! null) { for (Field f : fields) { processFieldValue(clazz, f, parameterMap, callback); } } //对全局参数信息进赋值 if (parameterFromAnotation ! null) { for (Map.Entry en : parameterFromAnotation.entrySet()) { List fieldList en.getValue(); if (CollectionUtil.isEmpty(fieldList)) { continue; } for(Field field : fieldList) { //此时传入的clazz是不对的,不过不影响, //因为我们规定注解的字段必须是public和static的,所以实际上不需要通过clazz来设定field的值 processFieldValue(clazz, field, parameterMap, callback); } } } } private static void processFieldValue(Class clazz, Field field, Map parameterMap, MySqlParameterCallback callback) throws IllegalArgumentException, IllegalAccessException { if(field null) { return; } MySqlParameterField p field.getAnnotation(MySqlParameterField.class); if (p null) { return; } String key p.key(); String info p.info(); Type type p.type(); Object old field.get(clazz); Object newValue parameterMap.get(key); if (newValue ! null) { field.set(clazz, value(old, newValue, type)); } else if (callback ! null) { callback.call(key, old, info); } } private static Object value(Object old, Object now, Type type) { switch (type) { case INTEGER: return getInt(old, now); case STRING: return getString(old, now); case FLOAT: return getFloat(old, now); case LONG: return getLong(old, now); default: return getInt(old, now); } } private static Object getFloat(Object old, Object now) { Float o StringTools.getFloat(old, -1); return StringTools.getFloat(now, o); } private static Object getString(Object old, Object now) { return now.toString(); } private static Object getInt(Object old, Object now) { int o StringTools.getInt(old, -1); return StringTools.getInt(now, o); } private static Object getLong(Object old, Object now) { long o StringTools.getLong(old, -1); return StringTools.getLong(now, o); } public static List getAllKeys(Class clazz) { Set keys new HashSet(); if (clazz null) return null; Field[] fields clazz.getDeclaredFields(); for (Field field : fields) { // 参数类中如果有无注解字段,则跳过 MySqlParameterField anno field.getAnnotation(MySqlParameterField.class); if (anno ! null) { keys.add(anno.key()); } } if (parameterFromAnotation ! null && parameterFromAnotation.size() 0) { keys.addAll(parameterFromAnotation.keySet()); } return new ArrayList(keys); } public static String getKey(Class clazz, String fieldName) { if (clazz null) return null; try { Field field clazz.getField(fieldName); if (field null) return null; return field.getAnnotation(MySqlParameterField.class).key(); } catch (NoSuchFieldException e) { e.printStackTrace(); } catch (SecurityException e) { e.printStackTrace(); } return null; }}--- 接入web项目以上代码都准备好之后,需要在web项目启动之时初始化相关配置,包括zk连接、事件监听和参数扫描、加载。这里是通过继承ContextLoaderListener类实现,上一章有介绍过两种方法。当web项目初始化完成之后,会调用void contextInitialized ( ServletContextEvent sce )方法。所以我们只要在这个方法中添加如下代码:java //初始化zk连接 GgFrameworkUtil.initCuratorFramework(); //创建连接池 ExecutorService pool Executors.newCachedThreadPool(); //创建需要监听的信息,这里是按约定的库和表 GgTable table new GgTable(); table.setDb("searchmanager"); table.setTable("tbmgrparameters"); try { //添加监听节点信息 GgFrameworkClient.listener(table, new IChangedService() { @Override public void excute(GgTable table) { cacheService.loadPortraitParameters(); } }, pool); } catch (Exception e) { e.printStackTrace(); }这样,我们就成功的将服务器项目作为参数同步框架的客户端角色,接入到参数管理平台了。---下一章[《从零开发参数同步框架(五)—— Spring集成》] [nextLink] 会介绍框架与Spring的集成方法以及相关代码,减少相关编码,通过配置方式集成Spring、连接池等。[nextLink]: https://oomabc.com/articledetail?atclid7e0a040816a742e68cc833d4771d218b

    参数同步   2018-10-27 浏览(1339) 有用(0) 阅读原文>> [原创]
  • 从零开发参数同步框架(五)—— Spring集成   

    前面几章详细介绍了这个参数同步框架代码中的工具类、服务端、客户端等代码以及相关解释。不过由于个人的语言组织能力比较弱,可能写的比较乱,部分名词容易混淆,还请各位看官连看带猜吧。截止到上一章为止,参数同步框架以及可以投入生产使用了。只是不管是客户端还是服务端的项目,都需要在web项目启动之后,手动调用一遍zk连接初始化的代码。我们回顾一下服务端初始化:java Object zkHostString props.get("ggframework.zookeeper"); //直接指定zookeeper地址 if(zkHostString ! null && String.valueOf(zkHostString).length() 0) { GgFrameworkUtil.initCuratorFramework(String.valueOf(zkHostString)); } else { //从默认配置文件加载 GgFrameworkUtil.initCuratorFramework(); }客户端初始化:java //初始化zk连接 GgFrameworkUtil.initCuratorFramework(); //创建连接池 ExecutorService pool Executors.newCachedThreadPool(); //创建需要监听的信息,这里是按约定的库和表 GgTable table new GgTable(); table.setDb("searchmanager"); table.setTable("tbmgrparameters"); try { //添加监听节点信息 GgFrameworkClient.listener(table, new IChangedService() { @Override public void excute(GgTable table) { cacheService.loadPortraitParameters(); } }, pool); } catch (Exception e) { e.printStackTrace(); }从上面的代码中不难看出,服务端和客户端都有相同的代码,那就是初始化zk连接:GgFrameworkUtil.initCuratorFramework();。客户端多出的一段逻辑是往zk节点添加监听事件,传入的参数是TgTable、IChangedService和ExecutorService。在web容器加载完成或者说Spring容器初始化完成之后执行客户端或者服务端zk连接初始化,除了第三章介绍的两种方法之外,我们还可以通过继承InstantiationAwareBeanPostProcessorAdapter类来实现。我们需要实现该类对应接口org.springframework.beans.factory.config.BeanPostProcessor的postProcessAfterInitialization方法。然后在该方法中进行zk连接初始化、添加节点监听事件,其实就是将上面客户端初始化代码的逻辑整合到该方法中。具体的,我准备了两个类,一个是专门对接Spring的ParameterSpringHandler,另一个是专门对接参数框架的类ParameterChangedListener。--- 相关代码下面给出源码,关于这两个类的详细逻辑直接在注释中体现:java/ 这类是用于对接参数框架的,它维护了连接池、GgTable相关参数以及IChangedService接口; 这些信息是用于zk连接初始化之后,给节点添加监听事件。 我们暂且叫这个类为参数变动监听器,随便取的,不用当真。 /public class ParameterChangedListener { //用于获得线程执行对象 private ThreadPoolTaskExecutor threadPoolTaskExecutor; //线程执行对象,它与threadPoolTaskExecutor两者,设置任意一个即可 private ExecutorService executorService; //GgTable的数据库 private String tableDb; //GgTable的数据表 private String tableName; //监听事件触发的方法 private IChangedService changedService; //默认立即执行;true表示初始化之后立即模拟触发一次事件 private Boolean exeImmediate true; public ParameterChangedListener() { } //此处省略 getter setter 方法这个类会在对接Spring的时候用到。java/ 这个了用于对接Spring,实现了postProcessAfterInitialization方法。 该方法会在Spring容器初始化完成之后进行调用 /public class ParameterSpringHandler extends InstantiationAwareBeanPostProcessorAdapter { //zk连接初始化的地址 private String zkHost; public ParameterSpringHandler() { } @Override public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException { //如果是参数变动监听器,则通过工具类加载参数的事件监听 if(bean instanceof ParameterChangedListener) { System.out.println("[GgFrameworkClient] parameter postProcessAfterInitialization " + beanName); //这是整合Spring框架提供的另一个工具类 //我也不知道为什么前前后后,我写了这么多工具类 ParameterSpringTools.loadParameterChangedListener((ParameterChangedListener) bean, zkHost); } //参数扫描 //这里会扫面配置中所有的类型 //它会发现其中所有public、static且打上注解MySqlParameterField的参数;然后从参数管理平台同步最新值或者插入初始值。 MySqlParameterTools.getParameterInfoFromAnotation(bean.getClass()); //执行父类的方法 return super.postProcessAfterInitialization(bean, beanName); } public String getZkHost() { return zkHost; } public void setZkHost(String zkHost) { this.zkHost zkHost; }}既然,本章又多出了一个工具类,不可避免要给出源码(我喜欢在源码中,通过注释写逻辑,这样就不用我组织语言了,只要按照代码一步一步注释,又方便,大家又看得懂):java//参数框架集成Spring的工具类public class ParameterSpringTools { //参数框架是否初始化了zk连接,原子标记对象 private static AtomicBoolean isGgFrameworkClientInit new AtomicBoolean(false); private static void initGgFrameworkClient(String zkHost) { GgFrameworkClient.initClient(zkHost); } //在上面介绍的Spring对接类中,针对参数变动监听器执行的方法 private static void addChangedService(ParameterChangedListener listener) { //先判断是否设置了线程池管理服务对象 ExecutorService pool listener.getExecutorService(); if(pool null) { ThreadPoolTaskExecutor threadPoolTaskExecutor listener.getThreadPoolTaskExecutor(); //如果Spring线程池管理对象也未设置,则抛错 if(threadPoolTaskExecutor null) { throw new RuntimeException("both pool and threadPoolTaskExecutor is null"); } //从Spring线程池对象中获取 pool threadPoolTaskExecutor.getThreadPoolExecutor(); } //以下是判断GgTable相关信息是否合法 String tableDb listener.getTableDb(); if(StringUtils.isBlank(tableDb)) { throw new RuntimeException("tableDb is null"); } String tableName listener.getTableName(); if(StringUtils.isBlank(tableName)) { throw new RuntimeException("tableName is null"); } GgTable table new GgTable(tableDb, tableName); //如果未实现IChangedService,则抛错 IChangedService changedService listener.getChangedService(); if(changedService null) { throw new RuntimeException("changedService is null"); } //下面的就是正常的初始化代码 try { GgFrameworkClient.listener(table, changedService, pool); //如果exeImmidatetrue,则立即模拟触发监听事件 Boolean exeImmidate listener.getExeImmediate(); if(exeImmidate ! null && exeImmidate) { changedService.excute(table); } } catch (Exception e) { e.printStackTrace(); } } //工具类暴露的方法,提供给对接Spring的类使用 public static void loadParameterChangedListener(ParameterChangedListener listener, String zkHost) { if(!isGgFrameworkClientInit.get()) { initGgFrameworkClient(zkHost); isGgFrameworkClientInit.set(true); } if(listener ! null) { addChangedService(listener); } }}--- 集成配置在xml配置文件中使用:java 其中searchMangerChangedService就是IChangedService的实现类,里面是客户端自己定义的逻辑。---本系列最后一章[《从零开发参数同步框架(六)—— 简版配置中心》][nextLink]将会介绍基于参数同步框架实现的配置中心,其实非常简单,就是基于zk连接可以做的其他事而已。[nextLink]: https://oomabc.com/articledetail?atclid40129845bc9e47f4a49da6b56c90098d

    参数同步   Spring   2018-10-30 浏览(2101) 有用(0) 阅读原文>> [原创]
  • 从零开发参数同步框架(六)—— 简版配置中心   

    零、前言前面五章已经详细介绍了如何从零开始开发一个参数同步框架。如果按照前面五章一步一步实践,应该是可以成功开发出一个可投入使用的框架的。如果不行,那应该是文章文字组织的问题了,没有将很多内容描述的通俗易懂。我们抽时间从头读一遍每一篇文章,争取逐步修改、统一名词、理清逻辑,各位海涵。 一、关于配置中心本文中定义:将那些公共配置信息统一推送到zk节点,需要使用的客户端按需加载即可。公共配置:所有客户端都会使用的、并且其值都是一致的。例如:1. mySQL数据库连接信息:url、driverClass、username、password。shelljdbc.oneblog.urljdbc:mysql://192.168.1.1:3306/oneblog?useUnicodetrue&characterEncodingUTF-8&autoReconnecttruejdbc.oneblog.usernametestjdbc.oneblog.passwordtest2. redis配置信息:host、port。shellredis.host192.168.1.2redis.port63793. zookeeper配置信息:host:port。shellzk1.address192.168.1.101:2181,192.168.1.102:2181,192.168.1.103:21814. hessian服务地址:host:port/service。Properties 账户服务auth.remote.urlhttp://192.168.1.4:8680/remote 文件服务file.remote.urlhttp://192.168.1.5:8028/remote 文章服务article.remote.urlhttp://192.168.1.6:8780/remote5. mongodb配置信息:hostport、username、password、db。shellmongo.hostportmongo.uat.com:27017mongo.usernameoneblogmongo.password123456mongo.dboneblog 以上需要存放在配置中心,以下是各个客户端根据实际负载选择的个性化参数,不必放在配置中心mongo.connectionsPerHost10mongo.threadsAllowedToBlockForConnectionMultiplier5mongo.connectTimeout10000mongo.maxWaitTime15000mongo.autoConnectRetrytruemongo.socketKeepAlivetruemongo.socketTimeout15000mongo.slaveOktrue--- 二、设计思路 功能在web项目加载本地配置文件的时候,先从配置中心加载所有配置参数,在内存中根据参数名(key)进行参数赋值。原则如下:1. key只存在于配置中心,则忽略;2. key只存在于本地,则使用本地值;3. key同时存在于配置中心和本地,则通过override参数来确定是以配置中心为主还是本地为住。4. 如有需要,可以在参数级别实现override功能。 分析先来回顾下,我们在web项目中配置参数文件的方式,通常我们会这样做:xml 我们查看PropertyPlaceholderConfigurer源码发现中一个方法protected void processProperties(ConfigurableListableBeanFactory beanFactoryToProcess, Properties props)。通过注释,我们了解到它是负责将收集好的配置文件信息填充到xml的配置文件中的,就是那些占位符设置的值(${property.key})。因此,我们需要在此方法之前,将配置中心的参数加载完成。那么,我们就写一个类继承此类,重写该方法即可,在重写的方法的最后调用父类的此方法。--- 开始编码这里定义的类名叫GgFrameworkPropertyPlaceholderConfiger,并且定义一个全局的覆盖参数Boolean overrideValue。在重写的方法里我们的流程就是:1. 初始化zk连接,还是调用前几章提供的方法GgFrameworkUtil.initCuratorFramework。2. 从配置中心获取配置内容。3. 根据之前讨论的规则进行赋值即可。4. 最后调用父类的同名方法,执行后续占位符赋值操作。到这里为止,我们缺少的就是从配置中心获取配置内容的方法以及配置中心内容维护的方法。 推送配置信息定义一个配置信息类,很简单,里面维护了一个Map,key、value都是字符串。java//参数信息类public class ConfigerData implements Serializable { private static final long serialVersionUID -1837290930935157693L; private Map valueMap new HashMap(); public ConfigerData() { } public void push(String key, String value) { if(StringUtils.isNotBlank(key) && StringUtils.isNotBlank(value)) { valueMap.put(key, value); } } public Map getValueMap() { return valueMap; } public void setValueMap(Map valueMap) { this.valueMap valueMap; }}我们会将该类的实例对象序列化并存储在zk节点上。java//参数框架客户端public class GgFrameworkClient extends GgFrameworkAbstract { //...... //统一配置推送 public static void pushConfiger(Map valueMap) { //获取配置中心的统一节点路径 String configerPath GgFrameworkUtil.startSyncConfigerAndGetPath(); try { ConfigerData configerData new ConfigerData(); configerData.setValueMap(valueMap); //将配置对象,推送到zk节点 GgFrameworkUtil.pushDataToNode(configerPath, configerData); } catch (Exception e) { e.printStackTrace(); } }}//参数框架工具类public class GgFrameworkUtil extends GgFrameworkAbstract { //...... public static String startSyncConfigerAndGetPath() { String path CONFIGERCENTER + "/properties"; createIfNotExists(path); return path; } public static void pushDataToNode(String path, ConfigerData data) throws Exception { getClient().setData().forPath(path, SSDBCoderUtil.encode(data)); }} 获取配置java//参数框架工具类public class GgFrameworkUtil extends GgFrameworkAbstract { //...... public static ConfigerData getAllConfiger() throws Exception { String path CONFIGERCENTER + "/properties"; createIfNotExists(path); byte[] forPath getClient().getData().forPath(path); try { ConfigerData data SSDBCoderUtil.decode(forPath); return data; } catch (Exception e) { e.printStackTrace(); } return null; }}--- 客户端使用xml classpath:resource.properties classpath:mongodb.properties classpath:mongodbreplset.properties 这样,项目启动之后,客户端会将配置中心所有配置信息按照规则加载到项目内存中,以供使用。目前还没有很简洁的方法来指定每个客户端所需要的key集合,每次都是加载全部。因此,客户端的对于参数的命名必须全局规范避免出现同名不同义的参数。至此,整个系列的文章已经写完,仅供个人记录和大家参考。

    参数同步   2019-02-21 浏览(2173) 有用(0) 阅读原文>> [原创]
  • blogTest
    分享文章
     
    使用APP的"扫一扫"功能,扫描左边的二维码,即可将网页分享给别人。
    你也可以扫描右边本博客的小程序二维码,实时关注最新文章。