网络基础以及Java网络编程
Java语言对网络编程提供了良好的支持,通过其提供的接口我们可以很方便地进行网络编程。下面先对网络编程的一些基础知识进行介绍,最后给出使用Java语言进行网络编程的实例。
  • 网络模型基础知识(一)——OSI七层参考模型   

    OSI在百度百科中的定义是这样写的: OSI是Open System Interconnection的缩写,意为开放式系统互联。国际标准化组织(ISO)制定了OSI模型,该模型定义了不同计算机互联的标准,是设计和描述计算机网络通信的基本框架。OSI模型把网络通信的工作分为7层,从下至上分别是物理层(Physical Layer),数据链路层(Data Link Layer),网络层(Network Layer),传输层(Transport Layer),会话层(Session Layer),表示层(Presentation Layer)和应用层(Application Layer);为方便记忆可以将七层从高到低视为:All People Seem To Need Data Processing。每一个大写字母与七层名称头一个字母相对应。--- 把网络模型进行分层好处1. 把开放系统的信息交换问题分解到一系列容易控制的软硬件模块层。2. 每一层都有明确的功能,有利于明确网络协议的国际标准,避免各层功能混乱。3. 每层利用紧邻的下层所提供的服务提供更高级的增值服务,更容易记住个层的功能。4. 层间的标准接口,方便工程模块化,有利于各不同制造厂家的设备互连。5. 降低了复杂度,使程序更容易修改,产品开发的速度更快。6. 创建了一个更好的互连环境,人们可以很容易的讨论和学习协议的规范细节。 分层原则+ 网络中各结点都有相同的层次+ 不同结点相同层次具有相同的功能+ 同一结点相邻层间通过接口通信+ 每一层可以使用下层提供的服务,并向上层提供服务+ 不同结点的同等层间通过协议来实现对等层间的通信 关于对等层OSI参考模型中处于同一层次的两端就是对等层,每层都利用下一层提供的服务与对等层进行通讯。例如,传输层与对端的传输层就是对等层。 为了使数据分组从源传送到目的地,源端OSI模型的每一层都必须与目的端的对等层进行通信,在这一过程中,每一层的协议在对等层之间交换信息,该信息称为协议数据单元(PDU)。位于源计算机的每个通信层,使用针对该层的PDU同目的计算机的对等层进行通信(虚拟通信,实际通信在最底层完成)。 发送方数据由最高层逐渐向下层传递,到接收方数据由最低层逐渐向高层传递。--- 关于PDUOSI参考模型中,对等层协议之间交换的信息单元统称为协议数据单元(PDU,Protocol Data Unit)。而传输层及以下各层的PDU另外还有各自特定的名称:+ 传输层——数据段(Segment)+ 网络层——分组(数据包)(Packet)+ 数据链路层——数据帧(Frame):数据链路层对数据帧的长度都有一个限制,也就是链路层所能承受的最大数据长度,这个值称为最大传输单元,即MTU;它包括三部分:帧头,数据部分,帧尾。其中,帧头和帧尾包含一些必要的控制信息,比如同步信息、地址信息、差错控制信息等;数据部分则包含网络层传下来的数据,比如IP数据包。+ 物理层——比特(Bit)PDU信息仅在接收设备的对等层被读取,然后被剥离,然后数据被交给下一层。在给定的某一 OSI 层,信息单元的数据部分包含来自于所有上层的头和尾以及数据,这称之为封装。传输数据(协议头+数据+协议尾)在发送端从上至下层层包装,在接收端由下至上层层解析并去头尾。比如,发送端的应用层将自己的协议头,数据和协议尾(如果有)传输给表示层;表示层将接收到的数据加上协议头和尾,一起传输给会话层;会话层、传输层、网络层、数据链路层、物理层依次进行各自协议的包装。接收端的物理层在接收到数据后,解析对应的协议头和尾,然后将去掉协议头尾的数据传输给数据链路层;数据链路层在接收到数据后,解析自己对应的协议,将去掉协议之后的数据传输给网络层;传输层、会话层、表示层、应用层依次进行各自协议的解析和去除。最终发送端和接收端的各层分别处理了相同的数据,实现虚拟通信。---- 七层模型由低到高 物理层(Physical Layer)OSI分层结构体系中最重要、最基础的一层,它建立在传输媒介基础上,起建立、维护和取消物理连接(激活物理连接,传送数据,终止物理连接)作用,实现设备之间的物理接口。物理层的PDU为比特(bit)流,其不考虑信息的意义和信息结构。与邻层:向数据链路层提供一个透明的位传输。典型设备:光纤、同轴电缆、双绞线、中继器和集线器。--- 数据链路层(Data Link Layer)将物理层提供的比特(Bit)信息封装成数据帧(Frame),起到在物理层之上建立、撤销、标识逻辑链接和链路复用以及差错校验等功能。通过使用接收系统的硬件地址或物理地址来寻址,建立相邻结点之间的数据链路;通过差错控制提供数据帧(Frame)在信道上无差错的传输,同时为其上面的网络层提供有效的服务,实现系统实体间二进制信息块的正确传输。 为网络层提供可靠无错误的数据信息(数据传送服务)。顺序控制,指对帧的收发顺序的控制.差错检测和恢复,还有链路标识,流量控制等等。差错检测多用方阵码校验和循环码校验来检测信道上数据的误码,而帧丢失等用序号检测。各种错误的恢复则常靠反馈重发技术来完成。代表协议:SDLC、HDLC、PPP、STP、帧中继等。典型设备:二层交换机、网桥、网卡。--- 网络层(Network Layer)网络层也称通信子网层,是高层协议之间的界面层,用于控制通信子网的操作,是通信子网与资源子网的接口。在计算机网络中进行通信的两个计算机之间可能会经过很多个数据链路,也可能还要经过很多通信子网。网络层的任务就是选择合适的网间路由和交换结点,确保数据及时传送。网络层将解封装数据链路层收到的帧,提取数据包,包中封装有网络层包头,其中含有逻辑地址信息源站点和目的站点地址的网络地址。主要功能是基于网络层地址(IP地址)进行不同网络系统间的路径选择。网络层传输的数据单元是分割和重新组合后的数据包(Packet)。通信子网:网络中实现网络通信功能的设备及其软件的集合,通信设备、网络通信协议、通信控制软件等属于通信子网,是网络的内层,负责信息的传输。把信息从一台主机传输到另一台主机,在OSI体系中的位置是下三层。资源子网:由计算机系统、终端、终端控制器、连网外设、各种软件资源与信息资源组成。设备工作在TCP/IP协议的应用层(对应OSI的会话层、表示层、应用层),是各种网络资源(硬件、软件、数据信息)的集合;第四层传输层是通信子网和资源子网的接口层。代表协议:IP、IPX、OSPF; 网络层为建立网络连接和上层提供服务: + 路由选择和中继; + 激活,终止网络连接; + 在一条数据链路上复用多条网络连接,多采取分时复用技术; + 差错检测与恢复; + 排序,流量控制; + 服务选择; + 网络管理;典型设备:网关、路由器--- 传输层(Transport Layer)传输层建立在网络层和会话层之间,实质上它是网络体系结构中高低层之间衔接的一个接口层。用一个寻址机制来标识一个特定的应用程序(端口号)。传输层不仅是一个单独的结构层,它还是整个分层体系协议的核心,没有传输层整个分层协议就没有意义。传输层的数据单元是由数据组织成的数据段(Segment)这个层负责获取全部信息,因此,它必须跟踪数据单元碎片、乱序到达的数据包和其它在传输过程中可能发生的危险。主要功能是从会话层接收数据,根据需要把数据切成较小的数据包,并把数据传送给网络层,确保数据包正确到达网络层,从而实现两层数据的透明传送。传输层是两台计算机经过网络进行数据通信时,第一个端到端的层次,具有缓冲作用。当网络层服务质量不能满足要求时,它将服务加以提高,以满足高层的要求;当网络层服务质量较好时,它只用很少的工作。传输层还可进行复用,即在一个网络连接上创建多个逻辑连接。传输层也称为运输层。传输层只存在于端开放系统中,是介于低三层通信子网系统和高三层之间的一层,但是很重要的一层。因为它是源端到目的端对数据传送进行控制从低到高的最后一层。它采用分流/合流、复用/介复用技术来调节上述通信子网的差异,使会话层感受不到。传输层还要具备差错恢复、流量控制等功能,以此对会话层屏蔽通信子网在这些方面的细节与差异。传输层面对的数据对象已不是网络地址和主机地址,而是和会话层的界面端口。上述功能的最终目的是为会话提供可靠的、无误的数据传输。传输层的服务一般要经历传输连接建立阶段、数据传送阶段、传输连接释放阶段3个阶段才算完成一个完整的服务过程。而在数据传送阶段又分为一般数据传送和加速数据传送两种。传输层服务分成5种类型。基本可以满足对传送质量、传送速度、传送费用的各种不同需要。 向上层提供的服务包括: + 无差错的有序的报文收发; + 提供可靠的传输连接; + 进行流量控制。代表协议:TCP(面向连接的传输协议,通过IP)、UDP(无连接网络协议,通过IP)、SPX等典型设备:终端设备(PC、手机、平板等) 端到端 端到端是网络连接。网络要通信,必须建立连接,不管有多远,中间有多少机器,都必须在两头(源和目的)间建立连接,一旦连接建立起来,就说已经是端到端连接了,即端到端是逻辑链路,这条路可能经过了很复杂的物理路线,但两端主机不管,只认为是有两端的连接,而且一旦通信完成,这个连接就释放了,物理线路可能又被别的应用用来建立连接了。 TCP就是用来建立这种端到端连接的一个具体协议,SPX也是。端到端是传输层的,你比如你要将数据从A传送到E,中间可能经过A→B→C→D→E,对于传输层来说他并不知道b,c,d的存在,他只认为我的报文数据是从a直接到e的,这就叫做端到端。 点到点点到点是物理拓扑,如光纤,就必须是点到点连接,DDN专线也是,即两头各一个机器中间不能有机器,是网络层的。--- 端到端传输的优点 + 链路建立后,发送端知道接收设备一定能收到 + 经过中间交换设备时不需要进行存储转发,因此传输延迟小 端到端传输的缺点 + 数据会一直发送,直到接收端收到数据为止,发送端的设备一直要参与传输 + 如果整个传输的延迟很长,那么对发送端的设备造成很大的浪费 + 如果接收设备关机或故障,那么端到端传输不可能实现。 点到点传输的优点 + 发送端设备送出数据后,它的任务已经完成,不需要参与整个传输过程,这样不+ 会浪费发送端设备的资源 + 即使接收端设备关机或故障,点到点传输也可以采用存储转发技术进行缓冲 点到点传输的缺点 + 发送端发出数据后,不知道接收端能否收到或何时能收到数据在一个网络系统的不同分层中,可能用到端到端传输,也可能用到点到点传输。如Internet网,IP及以下各层采用点到点传输,四层以上采用端到端传输。--- 会话层(Session Layer)这一层也可以称为会晤层或对话层,在会话层及以上的高层次中,数据传送的单位不再另外命名,统称为报文。会话层不参与具体的传输,它提供包括访问验证和会话管理在内的建立和维护应用之间通信的机制。如服务器验证用户登录便是由会话层完成的。会话层提供的服务可使应用建立和维持会话,并能使会话获得同步。会话层使用校验点可使通信会话在通信失效时从校验点继续恢复通信。这种能力对于传送大的文件极为重要。会话层、表示层、应用层构成开放系统的高3层,面对应用进程提供分布处理,对话管理,信息表示,恢复最后的差错等。会话层同样要担负应用进程服务要求,而运输层不能完成的那部分工作,给运输层功能差距以弥补。主要的功能是对话管理,数据流同步和重新同步。要完成这些功能,需要由大量的服务单元功能组合,已经制定的功能单元已有几十种。用户数据单元为SSDU,而协议数据单元为SPDU。会话用户之间的数据传送过程是将SSDU转变成SPDU进行的。 会话层的主要功能 + 会话层连接到传输层的映射; + 会话连接的流量控制; + 数据传输; + 会话连接恢复与释放; + 会话连接管理、差错控制。--- 表示层(Presentation Layer)表示层向上对应用层提供服务,向下接收来自会话层的服务。表示层是为在应用过程之间传送的信息提供表示方法的服务,它关心的只是发出信息的语法与语义。表示层要完成某些特定的功能,主要有不同数据编码格式的转换,提供数据压缩、解压缩服务,对数据进行加密、解密。例如图像格式的显示,就是由位于表示层的协议来支持。 为应用层提供服务 + 括语法选择:提供一种初始语法和以后修改这种选择的手段 + 语法转换等:涉及代码转换和字符集的转换、数据格式的修改以及对数据结构操作的适配--- 应用层(Application Layer)网络应用层是通信用户之间的窗口,为用户提供网络管理、文件传输、事务处理等服务。其中包含了若干个独立的、用户通用的服务协议模块。应用层是OSI的最高层,为网络用户之间的通信提供专用的程序。应用层的内容主要取决于用户的各自需要,这一层设计的主要问题是分布数据库、分布计算技术、网络操作系统和分布操作系统、远程文件传输、电子邮件、终端电话及远程作业登录与控制等。应用层为操作系统或网络应用程序提供访问网络服务的接口。代表协议:Telnet、FTP、HTTP、SNMP、DNS等 --- 比较TCP/IPTCP/IP模型实际上是OSI模型的一个浓缩版本,它只有四个层次:1. 应用层,对应着OSI的应用层、表示层、会话层2. 传输层,对应着OSI的传输层3. 网络层,对应着OSI的网络层4. 网络接口层,对应着OSI的数据链路层和物理层OSI模型的网络层同时支持面向连接(connection-oriented)和无连接(connectionless)的通信,但是传输层只支持面向连接的通信;TCP/IP模型的网络层只提供无连接的服务,但是传输层上同时提供两种通信模式。![图片](https://oomabc.com/staticsrc/img/201809/13/1536832688962343848573b18436882fed532872d4a38.jpg)

    TCP/IP   网络模型   OSI   2019-05-23 浏览(2899) 有用(0) 阅读原文>> [原创]
  • 网络模型基础知识(二)——TCP/IP四层模型   

    四层模型定义TCP/IP协议,也称TCP/IP协议簇或TCP/IP协议栈,是Internet的基础,也是当今最流行的组网形式。了TCP/IP协议簇包含了许多别的协议。其中比较重要的有SLIP、PPP、IP、ICMP、ARP、TCP、UDP、FTP、DNS、SMTP等。TCP/IP协议并不完全符合OSI的七层参考模型。OSI参考模型是在协议开发之前设计的,具有通用性,但只是理论上的模型,并没有成熟的产品支持;TCP/IP模型则是在协议集已有的情况下建立的,已经成为“实际上的国际标准”,它的每一层都呼叫其下一层所提供的网络来完成自己的需求。--- 网络接口层网络接口层也称为主机到网络层,与OSI参考模型中的物理层和数据链路层相对应,它负责监视数据在主机和网络之间的交换。实际上TCP/IP四层模型本身并未描述该层的协议实现,只是要求能够提供给其上层-网际互联层(也叫网络互连层)一个访问接口,以便在其上传递IP分组。所以这一层使用的接口由参与互联的各个网络使用其自己的物理层和数据链路层协议,然后与TCP/IP的网络接口层进行连接。该层有SLIP、PPP、ARP等协议。+ SLIP(Serial Line Internet Protocol),即串行线路网际协议,简称网际协议。它提供了一种在串行通信线路上封装IP数据报的简单方法,使用户通过电话线和Modem能方便地接入TCP/IP网络。 1. SLIP不支持在连接过程中的动态IP地址分配,通信双方必须事先告诉对方IP地址,这给没有固定IP地址的个人用户上Internet网带来了很大的不便。 2. SLIP帧中无协议类型字段,因此他只能支持Ip协议。 3. SLIP帧中无校验字段,因此链路层上无法检测出传输差错,必须由上层实体或具有纠错能力的MODEM来解决差错问题。+ PPP(Point to Point Protocol),即点对点协议。它是一种有效的点到点通信协议,解决了SLIP存在的上述问题,即可以支持多种网络层协议(如IP、IPX等),支持动态分配的IP地址;并且PPP帧中设置了校验字段,因而PPP在网络接口层上具有差错检验能力。 1. PPP具有动态分配IP地址的能力,允许在连接时刻协商IP地址。 2. PPP支持多种网络协议,比如TCP/IP、NetBEUI、NWLINK等。 3. PPP具有错误检测以及纠错能力,支持数据压缩。 4. PPP具有身份验证功能。 5. PPP可以用于多种类型的物理介质上,包括串口线、电话线、移动电话和光纤(例如SDH),PPP也用于Internet接入。+ ARP(Address Resolution Protocol),即地址解析协议。 1. 它根据IP地址获取物理地址的一个TCP/IP协议。主机发送信息时将包含目标IP地址的ARP请求广播到网络上的所有主机,并接收返回消息,以此确定目标的物理地址。 2. 收到返回消息后将该IP地址和物理地址存入本机ARP缓存中并保留一定时间,下次请求时直接查询ARP缓存以节约资源。--- 网际互联层网际互联层是整个TCP/IP协议栈的核心,也叫网络互连层,与OSI参考模型中的网络层相对应。它的功能是把分组发往目标网络或主机。同时,为了尽快地发送分组,可能需要沿不同的路径同时进行分组传递。因此,分组到达的顺序和发送的顺序可能不同,这就需要上层必须对分组进行排序。该层有3个主要协议:IP、IGMP、ICMP。+ IP(Internet Protocol),即互联网协议。 1. 它将多个网络连成一个互联网,可以把高层的数据以多个数据包的形式通过互联网分发出去。 2. 它的基本任务是通过互联网传送数据包,各个IP数据包之间是相互独立的。+ IGMP(Internet Group Management Protocol),即Internet组管理协议。 1. 是因特网协议家族中的一个组播协议。该协议运行在主机和组播路由器之间。+ ICMP(Internet Control Message Protocol),即Internet控制报文协议。 1. 它是TCP/IP协议族的一个子协议,用于在IP主机、路由器之间传递控制消息。 2. 控制消息是指网络是否通畅、主机是否可达、路由是否可用等网络本身的消息。这些控制消息虽然并不传输用户数据,但是对于用户数据的传递起着重要的作用。--- 传输层传输层与OSI参考模型中的传输层相对应,它的功能是使源端主机和目标端主机上的对等实体可以进行会话。在传输层定义了两种服务质量不同的协议,TCP和UDP。+ TCP(Transmission Control Protocol),即传输控制协议。 1. TCP协议是一个面向连接的、可靠的、基于字节流的通信协议,位于IP层之上,应用层之下的中间层。使用三次握手协议建立可靠连接。 2. 它将一台主机发出的字节流无差错地发往互联网上的其他主机。在发送端,它负责把上层传送下来的字节流分成报文段并传递给下层。在接收端,它负责把收到的报文进行重组后递交给上层。TCP协议还要处理端到端的流量控制,以避免缓慢接收的接收方没有足够的缓冲区接收发送方发送的大量数据。 + UDP(User Datagram Protocol),即用户数据报协议。 1. UDP协议是一个不可靠的、无连接协议,主要适用于不需要对报文进行排序和流量控制的场合。 2. 它不属于连接型协议,因而具有资源消耗小,处理速度快的优点,所以通常音频、视频和普通数据在传送时使用UDP较多,因为它们即使偶尔丢失一两个数据包,也不会对接收结果产生太大影响。比如我们聊天用的ICQ和QQ就是使用的UDP协议。 3. UDP报头使用两个字节存放端口号,所以端口号的有效范围是从0到65535。一般来说,大于49151的端口号都代表动态端口。--- 应用层应用层与OSI参考模型中的会话层、表示层和应用层相对应。它向用户提供一组常用的应用程序,比如电子邮件、文件传输访问、远程登录等。远程登录TELNET使用TELNET协议,提供在网络其它主机上注册的接口。TELNET会话提供了基于字符的虚拟终端。文件传输访问FTP使用FTP协议来提供网络内机器间的文件拷贝功能。这一层的协议主要包括如下几个:FTP、TELNET、DNS、SMTP、NFS、HTTP。+ FTP(File Transfer Protocol),即文件传输协议,一般上传下载用FTP服务,数据端口是20,控制端口是21。 1. FTP工具一般分为FLASHFXP、LEAPFTP、CuteFTP,合称FTP三剑客,以汉化版和破解版居多。国产简体中文版目前有8UFTP。+ Telnet(Teletype Network),是用户远程登录服务,工作在23端口,使用明码传送,保密性差、简单方便。 1. Telnet定义了一个远程系统登录网络虚拟终端的标准接口。客户机程序不必详细了解远程系统,他们只需构造使用标准接口的程序,然后通过账号面即可登录。 2. Telnet包括一个允许客户机和服务器协商选项的机制,而且它还提供一组标准选项。  3. Telnet对称处理连接的两端,即Telnet不强迫客户机从键盘输入,也不强迫客户机在屏幕上显示输出。+ DNS(Domain Name Service),即域名解析服务,提供域名到IP地址之间的转换,使用端口53。+ SMTP(Simple Mail Transfer Protocol),即简单邮件传输协议,用来控制信件的发送、中转,使用端口25。+ NFS(Network File System),即网络文件系统,用于网络中不同主机间的文件共享。+ HTTP(Hypertext Transfer Protocol),即超文本传输协议,用于实现互联网中的WWW服务,使用端口80。---- OSI七层参考模型和TCP/IP四层模型直观关系图。![图片](https://oomabc.com/staticsrc/img/201809/14/1536925378794337671d790d24b978a7340f14ecf5beb.jpg)

    TCP/IP   网络模型   四层模型   2018-09-15 浏览(2699) 有用(0) 阅读原文>> [原创]
  • Java网络编程之Netty框架学习(一)   

    零、前言Netty致力于成为一个异步的事件驱动的网络应用框架,同时,它也是NIO(非阻塞IO)的。通过它,程序员可以简单快速开发出一种可维护、易扩展且高性能的服务端-客户端通信协议。它通极大的简化了连接激活、消息发送、消息接收等环节,通过在这些环节添加事件监听从而将网络编程退化为流式处理,也就是说,对于普通开发者可忽略这些环节的细节和异步性(实际上每个环节的动作都是异步的),我们可以专注各个环节需要处理的业务逻辑。比如基于UPD、TCP的Socket通信服务开发。简单快速并不意味着放弃了维护性和高性能表现。得益于在像FTP、SMTP、HTTP以及其他二进制协议或者更老的一些基于文本的协议丰富的开发实践经验,Netty团队成功的设计了一种方式,可以同时兼顾易开发、高性能、稳定性和易扩展等方面。Netty的设计哲学让它在众多的网络通信框架中显得与众不同。无论是阅读API文档还是通过它来编程,你都能感受到Netty给你带来的舒适体验,它只可意会不可言传的设计哲学让你今后的编程生涯变得更加简单有趣。上面关于Netty的描述都是我根据官网([https://netty.io/wiki/user-guide-for-4.x.html] [nettyHome])瞎翻译的,看看就好了。![图片](https://oomabc.com/staticsrc/img/201811/18/1542524873785d96497fd31934e61923771fbc6aa5c5a.jpg)---- 一、开始前的准备有兴趣看到这里的,我相信你一定是“精通”java开发的码农,那么此时你拥有的JDK版本一定不低于1.6吧,那就ok了。同时,本例的项目是一个maven项目,因此我一起给出相关依赖:xml io.netty netty-buffer 4.1.16.Final io.netty netty-all 4.1.16.Final io.netty netty-transport-native-epoll 4.1.16.Final----- 二、Let's Do IT为什么Netty官网不提供一个Hello World的协议?因为他们觉得世界上最简单的协议是DISCARD,就是忽略所有请求的协议。那么如何实现这么高冷的协议呢?其实你只要忽略所有请求即可。看到这里,你是不是有点心痛?想到了自己那高冷的女神对自己爱理不理?别方,等你学会了Netty框架,用它的设计哲学去俘获女神的芳心吧。即使失败了也没关系,因为你已经掌握了Netty,一个让你余生都可以很happy的框架。好了,不废话了,我们先通过Netty来一探这高冷协议的真面目。---前面说过,Netty框架在通信各个环节都增加了事件监听接口,那么我们只要实现这些接口的方法即可。ChannelInboundHandler接口就提供了一系列事件处理方法,但是一开始我们不需要实现所有接口,只要继承ChannelInboundHandlerAdapter即可。它是一个事件处理适配器,我们继承它来重写相关方法。javaimport io.netty.buffer.ByteBuf;import io.netty.channel.ChannelHandlerContext;import io.netty.channel.ChannelInboundHandlerAdapter;public class DiscardServerHandler extends ChannelInboundHandlerAdapter { //服务端接收到客户端的消息将会触发该方法 //服务端就是你的女神 //而你,就是客户端 @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { //你发给女神的任何消息,其实女神都收到了,她只是忽略了,很气是不是? ((ByteBuf) msg).release(); } //这里是通信发生异常时触发的方法 //一旦女神对你感到厌烦了,她会将你拉黑 @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); ctx.close(); }}我们重写了channelRead方法,任何时候只要收到消息都会执行该方法,本例子中我们收到的消息类型是ByteBuf。它是一个引用计数对象,会占用内存,因此每个Handler都有责任在处理完传给自己的消息之后通过调用方法释放它们。一般来说,我们会通过如下方法释放:java@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) { try { // Do something with msg } finally { ReferenceCountUtil.release(msg); }}----简单的高冷协议已经实现了,接下来我们需要写一个main方法来启动服务端。javaimport io.netty.bootstrap.ServerBootstrap;import io.netty.channel.Channel;import io.netty.channel.ChannelFuture;import io.netty.channel.ChannelInitializer;import io.netty.channel.ChannelOption;import io.netty.channel.EventLoopGroup;import io.netty.channel.nio.NioEventLoopGroup;import io.netty.channel.socket.nio.NioServerSocketChannel;import tcpip.oomabc.com.tone.netty.discard.handler.DiscardServerHandler;public class DiscardServer { private int port; public DiscardServer(int port) { super(); this.port port; } public void run() { EventLoopGroup bossGroup new NioEventLoopGroup(); EventLoopGroup workGroup new NioEventLoopGroup(); try { ServerBootstrap serverBootstrap new ServerBootstrap(); serverBootstrap.group(bossGroup, workGroup) .channel(NioServerSocketChannel.class) .childHandler(new ChannelInitializer() { @Override protected void initChannel(Channel ch) throws Exception { //这里使用了高冷协议处理器 ch.pipeline().addLast(new DiscardServerHandler()); } }).option(ChannelOption.SOBACKLOG, 128) .childOption(ChannelOption.SOKEEPALIVE, true); //绑定一个端口并启动监听,表示可以接受客户端连接 ChannelFuture f serverBootstrap.bind(port).sync(); //一直等待,直到手动关闭 socket //这里不会关闭,因为一直线程一直等待在这里 f.channel().closeFuture().sync(); } catch (Exception e) { e.printStackTrace(); } finally { //手动方式优雅地关闭服务端,本例子中不会执行到这里,因为前面是一个等待状态 workGroup.shutdownGracefully(); bossGroup.shutdownGracefully(); } } public static void main(String[] args) { int port 8881; if(args.length 0) { port Integer.parseInt(args[0]); } new DiscardServer(port).run(); }}EventLoopGroup是一个多线程的事件处理器,通过轮询方式获得I/O操作事件。Netty为不同的传输场景提供了不同的事件处理器。例子中实例化了两个EventLoopGroup对象,一个是负责接收网络连接的boss,另一个是负责处理boss接受且传入的网络连接的worker。----恭喜你,你已经完成了一个高冷协议服务端,通过main方法启动服务端吧。然后我们通过telnet来测试一下。bash15:41 wjyuian@wjyuianMacBookPro /data/docker/dockerfile/testdocker% telnet localhost 8881Trying ::1...Connected to localhost.Escape character is '^]'.hello world服务端只有连接信息,没有任何消息响应:bash15:41:43.338 [nioEventLoopGroup-3-1] DEBUG io.netty.util.Recycler - -Dio.netty.recycler.maxCapacityPerThread: 3276815:41:43.338 [nioEventLoopGroup-3-1] DEBUG io.netty.util.Recycler - -Dio.netty.recycler.maxSharedCapacityFactor: 215:41:43.338 [nioEventLoopGroup-3-1] DEBUG io.netty.util.Recycler - -Dio.netty.recycler.linkCapacity: 1615:41:43.338 [nioEventLoopGroup-3-1] DEBUG io.netty.util.Recycler - -Dio.netty.recycler.ratio: 815:41:43.349 [nioEventLoopGroup-3-1] DEBUG io.netty.buffer.AbstractByteBuf - -Dio.netty.buffer.bytebuf.checkAccessible: true15:41:43.351 [nioEventLoopGroup-3-1] DEBUG i.n.util.ResourceLeakDetectorFactory - Loaded default ResourceLeakDetector: io.netty.util.ResourceLeakDetector@10ef63f9---- 二、其它有趣的互动高冷服务端并不能算是一个正常的服务端,因为它并没有互动,有的只是交流黑洞。就像你和你的女神之间,并不是一段正常的交往,有的只是一厢情愿。我想你肯定希望女神可以跟你偶尔互动一下,即使是一个问候也好。那么,接下来就让我们忘记高冷,重新实现一个可以阅读所有客户端消息并在你准备放弃的时候回复一句“再见”的服务端协议。没什么特别的,只是依旧是重写了channelRead:java @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { ByteBuf in (ByteBuf) msg; try { //读取客户端发送的信息并打印 byte[] array new byte[in.readableBytes()]; while (in.isReadable()) { in.readBytes(array); } String line new String(array); //去掉结尾的换行 line line.trim(); if(line ! null && line.startsWith("bye")) { String quit "bye!\n"; byte[] bytes quit.getBytes(); final ByteBuf time ctx.alloc().buffer(bytes.length); time.writeBytes(bytes); ChannelFuture channelFuture ctx.writeAndFlush(time); //女神很贴心,给你发出“bye”之后并没有马上关机,而是在确认你收到她的信息之后才下线 channelFuture.addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) throws Exception { ctx.close(); } }); } else { System.out.println("recevied : [" + line + "]"); } } finally { //释放这一部分内存; //可以使用 in.release(); ReferenceCountUtil.release(msg); } }你给女神发的消息:bash14:14 wjyuian@wjyuianMacBookPro /data/docker/dockerfile/testdocker% telnet localhost 8881Trying ::1...Connected to localhost.Escape character is '^]'.helloHave you had your dinner already?byebye! 这里很重要,这是女神发给你的Connection closed by foreign host.14:14 wjyuian@wjyuianMacBookPro /data/docker/dockerfile/testdocker% 女神的那边:bash14:14:25.656 [nioEventLoopGroup-3-1] DEBUG io.netty.util.Recycler - -Dio.netty.recycler.maxCapacityPerThread: 3276814:14:25.656 [nioEventLoopGroup-3-1] DEBUG io.netty.util.Recycler - -Dio.netty.recycler.maxSharedCapacityFactor: 214:14:25.657 [nioEventLoopGroup-3-1] DEBUG io.netty.util.Recycler - -Dio.netty.recycler.linkCapacity: 1614:14:25.657 [nioEventLoopGroup-3-1] DEBUG io.netty.util.Recycler - -Dio.netty.recycler.ratio: 814:14:25.667 [nioEventLoopGroup-3-1] DEBUG io.netty.buffer.AbstractByteBuf - -Dio.netty.buffer.bytebuf.checkAccessible: true14:14:25.668 [nioEventLoopGroup-3-1] DEBUG i.n.util.ResourceLeakDetectorFactory - Loaded default ResourceLeakDetector: io.netty.util.ResourceLeakDetector@574516d4recevied : [hello]recevied : [Have you had your dinner already?]---虽然只是跟你“bye”,但比起单相思,你与女神之间好歹有了互动,这是好消息,不是吗!不过,我们既不能骄傲,也不可气馁,迎难而上是我辈最后的倔强。接下来,我们就来实现另一个有更多互动的服务端,很有意思的互动,就看你的心态了。java @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { //这里把接收到的客户端信息原封不动的发回给客户端 ctx.write(msg); //只有调用flush,上面的信息才会返回给客户端 ctx.flush(); //或者调用该方法,直接将信息返回给客户端 //ctx.writeAndFlush(msg); }为什么说很有意思?因为女神在收到你的消息之后,基本没看就把消息复制一遍又发给了你。有些心态不好的小伙伴就会觉得,女神太没意思了,看都不看就会复述。但是,我知道你不会这么想,因为你很傻乐,都看看你给女神发了什么:bash14:18 wjyuian@wjyuianMacBookPro /data/docker/dockerfile/testdocker% telnet localhost 8881Trying ::1...Connected to localhost.Escape character is '^]'.hellohelloI miss youI miss youI love you.I love you.惊不惊喜,刺不刺激,你发啥女神发啥。女神居然对你说“她想你”、“她爱你”,我想此刻你已经开始考虑你和女神的小孩学区房的问题了吧?---坚持到底就是胜利,我们来看看如何跟女神有更进一步的沟通。java //channel一旦建立,便会触发该方法 //channelRead,需要客户端发送内容才会触发 //所以,该方法先于channelRead触发 @Override public void channelActive(final ChannelHandlerContext ctx) throws Exception { final String info "I'm waiting for you! But I'm going to bed. bye! \n"; byte[] bytes info.getBytes(); //根据字符串对应字节长度,分配空间 final ByteBuf time ctx.alloc().buffer(bytes.length); time.writeBytes(bytes); //这是一个异步动作,返回的对象可能在下一行代码时并未执行 final ChannelFuture f ctx.writeAndFlush(time); //添加一个监听,只有当上一步的writeAndFlush动作实际完成之后,才会触发该监听事件 f.addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) throws Exception { System.out.println("operationComplete. send : " + info); //这个future是上面 writeAndFlush 动作返回的 assert f future; //我们在该方法中,执行channel关闭操作,实际上,这个动作也返回ChannelFuture, //所以,在netty中,这个关闭动作也不是马上执行完成的 ctx.close(); } }); //上面我们添加了一个自定义的Listener,或者直接使用预定义的Listener,如下:// f.addListener(ChannelFutureListener.CLOSE); }再来看看你们的沟通:bash14:27 wjyuian@wjyuianMacBookPro /data/docker/dockerfile/testdocker% telnet localhost 8881Trying ::1...Connected to localhost.Escape character is '^]'.I'm waiting for you! But I'm going to bed. bye! Connection closed by foreign host.14:27 wjyuian@wjyuianMacBookPro /data/docker/dockerfile/testdocker% 果然,女神已经在线等你很久了,你上线的第一时间,她就给你发来了消息。虽然她发完消息就下线了,但是好歹是她主动给你发的,是不是很激动,看来你和女神的发展机会已经成熟了,要好好把握!---注:文章中出现的协议一词是由官方文档中「protocol」一词直接翻译而来,可能不是很准确,大家可以去官网阅读。[nettyHome]: https://netty.io/wiki/user-guide-for-4.x.html

    网络编程   Java   Netty   2019-05-06 浏览(1776) 有用(0) 阅读原文>> [原创]
  • Java网络编程之Netty学习(二)—— 简单RPC实现   

    前言本章实现一个最简单的RPC小工具,简单的不能称之为框架。它是在学习Netty框架时的一个简单应用,包含的常规知识点有动态代理、可重入锁、Java反射、序列化。+ 动态代理 根据接口定义,生成一个该接口的一个实现对象,这是一个代理对象。在消费者调用接口的某个方法的时候,我们可以在方法前后增加自定义逻辑。本例中,增加的逻辑就是封装接口信息、方法以及参数信息,然后通过RPC服务类将封装的好的信息传递给生产者,然后将返回结果返回给消费者,完成一个RPC调用过程。+ 可重入锁 通过ReentrantLock以及它的Condition属性对象来实现将Netty的NIO特性转为常规RPC的同步性。 ReentrantLock是一个可重入锁,它与在方法或者代码块上使用synchronized具有相同的并发特性和隐性监视器锁访问语义,不过,它具有更好的扩展性。这个锁被最后获得且未释放它的线程所拥有,一个线程如果尝试去获得一个未被任何线程拥有或者已被自己拥有的锁,都会立即成功。 关于synchronized的隐性监视器锁(参考其它文章): + 同步方法通过ACCSYNCHRONIZED关键字隐式的对方法进行加锁。当线程要执行的方法被标注上ACCSYNCHRONIZED时,需要先获得锁才能执行该方法。 + 同步代码块通过monitorenter和monitorexit执行来进行加锁。当线程执行到monitorenter的时候要先获得所锁,才能执行后面的方法。当线程执行到monitorexit的时候则要释放锁。 + 每个对象自身维护这一个被加锁次数的计数器,当计数器数字为0时表示可以被任意线程获得锁。当计数器不为0时,只有获得锁的线程才能再次获得锁,所以synchronized也是可重入锁。+ Java反射 Java反射机制可以让我们在编译期(Compile Time)之外的运行期(Runtime)获得任何一个类的字节码。包括接口、变量、方法等信息。还可以让我们在运行期实例化对象。 在本例的生产者中,需要根据消费者封装的调用方法详细信息和参数,通过反射机制动态创建执行方法对象,然后通过Method.invoke来调用生产者中实际的业务Service的方法,最终实现RPC的远程实际调用。+ 序列化 Java 提供了一种对象序列化的机制,该机制中,一个对象可以被表示为一个字节序列,该字节序列包括该对象的数据、有关对象的类型的信息和存储在对象中数据的类型。 将序列化对象写入文件之后,可以从文件中读取出来,并且对它进行反序列化,也就是说,对象的类型信息、对象的数据,还有对象中的数据类型可以用来在内存中新建对象。 整个过程都是 Java 虚拟机(JVM)独立的,也就是说,在一个平台上序列化的对象可以在另一个完全不同的平台上反序列化该对象。ObjectInputStream 和 ObjectOutputStream 是高层次的数据流,它们包含反序列化和序列化对象的方法。 在本例中,消费者的请求参数以及生产者的返回结果都需要在不同服务器间的网络上进行传输,所以需要将信息进行序列化和反序列化。 ![图片](https://oomabc.com/staticsrc/img/201811/25/15430775764853961c5b74bae4b7a999a84a4d9ff7cec.jpg)--- 组成部分+ 测试Service和实现这部分是常规的业务代码,本例中定义了接口IRpcService、简单实现RpcServiceImpl、接口参数RpcTestQuery以及接口返回对象RpcTestResult。 其中RpcTestQuery和RpcTestResult需要进行序列化,因为它们需要在不同的虚拟机中传输。+ 动态代理这部分定义了一个通用的动态代理方法类GeneralProxy和一个继承InvocationHandler的虚拟的代理处理类RpcConsumerInvocationHandler。为什么说它是虚拟的,因为通常的Handler子类里面会持有一个被代理的接口实现对象,然后通过Method.invoke来达到接口实现类上调用指定方法的目的。但是本例中,消费者端并没有接口的实现类,所以不存在一个实际上需要被代理的对象,所以我称它为虚拟的代理。+ Consumer消费者部分的代码,主要包括接口IRpcSender、工具类RpcConsumerService和Rpc通信类RpcConsumerChannelHandler。RpcConsumerChannelHandler类继承了Netty框架的ChannelInboundHandlerAdapter类,提供了基本的网络通信能力,同时实现了IRpcSender接口,使之具备了Rpc封装和调用能力。其中,就用到了ReentrantLock.condition相关的await和signal方法,来实现NIO的异步通信转同步功能(参考了dubbo的实现)。RpcConsumerService封装了Netty初始化连接,同时持有Rpc通信类RpcConsumerChannelHandler的一个实例对象;它提供一个公开的静态方法public static Object rpc(final RpcRequest param),作用就是给动态代理处理类RpcConsumerInvocationHandler提供一个远程Rpc调用方法,让Service代理可以封装Rpc相关参数并通过网络传递给生产者。+ Provider生产者模块比较简单,所以将Rpc实现对应的请求类RpcRequest和响应类RpcResponse放在这里进行描述。比较重要的就一个Netty通信处理类RpcProviderHandler。在RpcProviderHandler中,当收到消费者的通信信息之后,通过反射机制动态生成请求方法对象,然后在接口的实现类对象上调用该方法,最后将调用结果通过Netty传递给消费者。下面将详细介绍下各个部分的代码实现,还是一贯的做法,部分思路解释将会以注释的形式穿插在代码中。---- 常规Service参数类RpcTestQuery:java//接口测试的参数public class RpcTestQuery implements Serializable { private static final long serialVersionUID 4463774199555246841L; private String word; private int pageNo; private Integer pageSize; public RpcTestQuery() { } //getter setter}返回结果类RpcTestResult:java//接口测试返回结果public class RpcTestResult implements Serializable { private static final long serialVersionUID -3162083033618094261L; private String rpcResult null; private List keywords; public RpcTestResult() { } //getter setter}测试接口定义以及其实现:javapublic interface IRpcService { //一个测试接口 RpcTestResult testRpc(RpcTestQuery query);}//测试rpc的service接口实现类public class RpcServiceImpl implements IRpcService { @Override public RpcTestResult testRpc(RpcTestQuery query) { System.out.println("testRpc : " + JSON.toJSONString(query)); RpcTestResult rpcResult new RpcTestResult(); //随便一个处理逻辑,表示调用成功 rpcResult.setRpcResult("这是一个基于netty的rpc小例子返回的结果"); Integer pageSize query.getPageSize(); pageSize pageSize null ? 20 : pageSize; int count 0; List words new ArrayList(pageSize); while(count ++ T newProxyInstanceWithoutInstance(Class clazz, InvocationHandler invocationHandler) { return newProxyInstanceByJDK(GeneralProxy.class.getClassLoader(), new Class[] {clazz}, invocationHandler);} @SuppressWarnings("unchecked")private static T newProxyInstanceByJDK(ClassLoader loader, Class[] interfaces, InvocationHandler invocationHandler) { if(interfaces null interfaces.length 0) { throw new RuntimeException("no interfaces"); } //这里就是使用JDK的Proxy类根据接口来生成其代理对象 return (T) Proxy.newProxyInstance(loader, interfaces, invocationHandler);}2. CGLIB动态代理,可以实现普通类的动态代理,但是不能代理final方法,会直接调用被代理的final方法。java@SuppressWarnings("unchecked")private static T createByCglib(Class clazz, GeneralHandler handler) { Enhancer enhancer new Enhancer(); //指定被代理对象的class即可,理论上,这里只要知道类签名即可,通过反射获得class enhancer.setSuperclass(clazz); //这里就是代理对象的invoke实现类,其功能类似JDK动态代理的InvocationHandler的实现类 enhancer.setCallback(handler); //生成对应的代理对象 T cglibService (T) enhancer.create(); return cglibService;}---- 消费者接口IRpcSender,定义了一个Rpc封装和调用方法:java/ 封装rpc接口 使之拥有rpc的基本信息封装能力和向生产者的调用能力 @author wjyuian /public interface IRpcSender { //暂时就一个方法,调用参数,返回结果;内部通过代理和netty与provider通信 RpcResponse send(RpcRequest param);} 消费者端最重要的一个类RpcConsumerChannelHandler它实现了IRpcSender接口,并继承ChannelInboundHandlerAdapter。承载了将Rpc调用参数进行封装并通过网络传递给生产者服务器,同时将异步通信转为同步并将生产者的处理结果返回给调用者的重任。java/ consumer端的ChannelHandler 继承 ChannelInboundHandlerAdapter,提供netty的通信能力 实现 IRpcSender 接口,实现发送rpc请求功能 @author wjyuian /public class RpcConsumerChannelHandler extends ChannelInboundHandlerAdapter implements IRpcSender { //持有一个通信处理上下文 private ChannelHandlerContext ctx; / 通过lock,done以及response的操作,将netty的nio调用转为同步rpc调用 / // 可重入锁,同一个线程可以重复获得 private final Lock lock new ReentrantLock(); private final Condition done lock.newCondition(); private volatile RpcResponse response null; public RpcConsumerChannelHandler() { } // 结果已经返回 private boolean isDone() { return response ! null; } @Override public RpcResponse send(RpcRequest param) { //重置response为null,因为本例中RpcConsumerChannelHandler对象被复用 //所以本例只实现了简单的rpc功能,不具备并发能力 response null; RpcLogUtil.log("RpcHandler send param, " + JSON.toJSONString(param)); // 分配内存信息 ByteBuf byteBuf ByteBufUtilsForSsdb.allocByteBufForPojo(ctx, param); // 立即将rpc调用信息传递给provider ctx.writeAndFlush(byteBuf); long timeout 3000; // 以下代码是dubbo中实现的nio转同步的方式 if (!isDone()) { long start System.currentTimeMillis(); // 可重入锁 lock.lock(); try { while (!isDone()) { done.await(timeout, TimeUnit.MILLISECONDS); if (isDone() System.currentTimeMillis() - start timeout) { break; } } } catch (InterruptedException e) { throw new RuntimeException(e); } finally { lock.unlock(); } if (!isDone()) { // 没有得到返回结果,超时 return null; } } // 返回结果 return response; } // 与生产者成功建立socket连接 @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { RpcLogUtil.log("consumer与provider端建立连接成功,channelActive"); // 初始化通信上下文对象 this.ctx ctx; } // 生产者将rpc调用结果返回给消费者时将会触发该方法 @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { try { ByteBuf in (ByteBuf) msg; // 服务端返回结果,调用工具类读取信息 RpcResponse rpcResult ByteBufUtilsForSsdb.readPojoFromByteBuf(in, RpcResponse.class); // 得到结果之后,唤醒之前wait的对象 doReceived(rpcResult); } catch (Exception e) { e.printStackTrace(); } finally { // 读取结果之后,必须要释放内存引用;上一章有提到过这一点 ReferenceCountUtil.refCnt(msg); } } / 当netty的channelRead方法被触发得到服务端返回结果时调用; 作用是,设置返回结果,并唤醒之前发送参数是被阻塞的方法,以达到异步转同步的效果 / private void doReceived(RpcResponse res) { // 可重入锁,实现不同线程间的同步访问控制 lock.lock(); try { response res; if (done ! null) { // 唤醒 done.signal(); } } finally { lock.unlock(); } }} RpcConsumerService.java这是一个通信工具类,负责初始化Netty连接,并提供一个静态方法,实现Rpc封装、调用、返回结果。java/ consumer服务代理对象 整合了netty初始化,通过 ChannelHandler 来实现异步通信的交互 @author wjyuian /public class RpcConsumerService { private ChannelHandler channelHandler; private AtomicBoolean init new AtomicBoolean(false); private static final RpcConsumerService INSTANCE new RpcConsumerService(); private RpcConsumerService() { this.channelHandler new RpcConsumerChannelHandler(); } private ChannelFuture init(final String host, final int port) { RpcLogUtil.log("consumer初始化netty连接"); //异步方式建立连接,然后通过get实现同步,这里似乎是没有必要,当时想多了 //之所以没有修改,是为了记录当时的奇怪想法 //因为这里初始化netty连接,本身就是sync同步的。下面注释中给出了修改后的代码 Future future Executors.newFixedThreadPool(1).submit(new Callable() { @Override public ChannelFuture call() throws Exception { EventLoopGroup workerGroup new NioEventLoopGroup(); try { Bootstrap bootstrap new Bootstrap(); bootstrap.group(workerGroup).channel(NioSocketChannel.class).option(ChannelOption.SOKEEPALIVE, true) .handler(new ChannelInitializer() { @Override protected void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addLast(channelHandler); } }); //同步方式简历连接 ChannelFuture ch bootstrap.connect(host, port).sync(); RpcLogUtil.log("bootstrap.connect(\"{}\", {}).sync()", host, port); getInit().set(true); return ch; } catch (Exception e) { e.printStackTrace(); } return null; } }); try { return future.get(); } catch (InterruptedException e) { e.printStackTrace(); } catch (ExecutionException e) { e.printStackTrace(); } return null; } / private ChannelFuture init(final String host, final int port) { RpcLogUtil.log("consumer初始化netty连接"); EventLoopGroup workerGroup new NioEventLoopGroup(); try { Bootstrap bootstrap new Bootstrap(); bootstrap.group(workerGroup).channel(NioSocketChannel.class).option(ChannelOption.SOKEEPALIVE, true) .handler(new ChannelInitializer() { @Override protected void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addLast(channelHandler); } }); //同步方式建立连接 ChannelFuture ch bootstrap.connect(host, port).sync(); RpcLogUtil.log("bootstrap.connect(\"{}\", {}).sync()", host, port); getInit().set(true); return ch; } catch (Exception e) { e.printStackTrace(); } return null; } / //暴露的rpc方法 public static Object rpc(final RpcRequest param) { if (!INSTANCE.getInit().get()) { //如果netty还没有初始化,则会先进行初始化 synchronized (RpcConsumerService.class) { ChannelFuture ch INSTANCE.init("localhost", 8881); ch.addListener(new ChannelFutureListener() { //这里是初始化成功之后的回调 //初始化本身是同步的,但是初始化到回调,是异步的 @Override public void operationComplete(ChannelFuture future) throws Exception { RpcLogUtil.log("consumer netty connected,operationComplete"); } }); } } RpcLogUtil.log("RpcNettyServiceProxy do rpc, " + JSON.toJSONString(param)); //获得封装了netty,具有与provider通信能力的对象 ChannelHandler handler INSTANCE.getChannelHandler(); IRpcSender rpcSender (IRpcSender) handler; //rpcSender封装了netty,通过ChannelHandlerContext发送请求 //调用接口并获得Rpc处理结果 RpcResponse rpcResult rpcSender.send(param); return rpcResult.getRpcData(); } public ChannelHandler getChannelHandler() { return channelHandler; } public AtomicBoolean getInit() { return init; } public void setInit(AtomicBoolean init) { this.init init; }}最后一个是整合上述类的动态代理类:java/ JDK动态代理 @author wjyuian /public class RpcConsumerInvocationHandler implements InvocationHandler { // 生成方法签名,主要是用于provider端选择service,暂时未使用 private String generateSign(Method method, Object[] args) { Class interfaceClass method.getDeclaringClass(); String methodName method.getName(); StringBuilder builder new StringBuilder(); builder.append(interfaceClass.getName()).append(".").append(methodName).append("("); Class[] parameterTypes method.getParameterTypes(); if (parameterTypes ! null && parameterTypes.length 0) { int index 0; for (Class parameterClass : parameterTypes) { if (index++ 1) { builder.append(","); } builder.append(parameterClass.getName()); } } builder.append(")") String sign builder.toString(); return sign; } @Override public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { String sign generateSign(method, args); RpcLogUtil.log("生成方法签名:" + sign); //Rpc请求参数,它与response参数定义写在了Provider端 RpcRequest param new RpcRequest(); param.setRpcSign(sign); //接口参数 param.setArgs(args); //接口class param.setClassName(method.getDeclaringClass().getName()); //方法名 param.setMethodName(method.getName()); //参数类型 param.setMethodParameters(method.getParameterTypes()); // 将方法签名和序列化参数,通过netty传递给服务端,服务端执行方法之后,返回结果obj RpcLogUtil.log("在JDK代理方法中执行rpc调用, RpcConsumerServiceProxy.rpc"); Object obj RpcConsumerService.rpc(param); if (obj null) { return null; } Class returnType method.getReturnType(); //返回接触一致性判断 if (obj.getClass().equals(returnType)) { return obj; } return null; }}---- 生产者Rpc请求对象和响应对象:java/ rpc级别定义的参数类; 封装签名、类完整名、方法名、参数、返回结果类型 后续可能需要新增其它参数,例如请求ID标识、请求时间、接口校验等 @author wjyuian /public class RpcRequest implements Serializable { private static final long serialVersionUID -3734098094101234697L; private String rpcSign;//签名信息 //接口类名 private String className; //方法名 private String methodName; //方法签名的参数信息 private Class[] methodParameters; //方法传递的参数信息 private Object[] args; public RpcRequest() { super(); } //getter setter}//rpc级别的响应结果//后续可以增加其它返回,例如响应时间、响应服务器信息public class RpcResponse implements Serializable { private static final long serialVersionUID -2403718129501702483L; private Object rpcData; public RpcResponse() { //getter setter}同样的,生产者端使用Netty也需要一个ChannelInboundHandlerAdapter子类,来处理消费者发送的Rpc请求信息,并通过JDK动态代理调用本地的实际接口实现类,然后将接口返回结果封装成Rpc响应对象并发送给消费者。java//provider端,封装了netty的通信处理方式public class RpcProviderHandler extends ChannelInboundHandlerAdapter { // 接口的实际实现类对象,类似注入的service实例 private IRpcService rpcService new RpcServiceImpl(); //provider端接收consumer传递的请求参数,封装的RpcParameter对象 @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { try { ByteBuf in (ByteBuf) msg; //获得netty接收的参数,这里是RpcRequest对象信息, //包含了接口、方法、参数等 RpcRequest rpcParameter ByteBufUtilsForSsdb.readPojoFromByteBuf(in, RpcRequest.class); if(rpcParameter null) { throw new RuntimeException("rpc exception"); } //通过反射获得rpc对应的方法 Class m Class.forName(rpcParameter.getClassName()); Method declaredMethod m.getDeclaredMethod(rpcParameter.getMethodName(), rpcParameter.getMethodParameters()); //调用生产者注入的service Object object declaredMethod.invoke(rpcService, rpcParameter.getArgs()); //封装结果 RpcResponse rpcResult new RpcResponse(); rpcResult.setRpcData(object); ByteBuf byteBuf ByteBufUtilsForSsdb.allocByteBufForPojo(ctx, rpcResult); //将接口返回结果包装成rpc对象,传递给调用端 ctx.writeAndFlush(byteBuf); } catch (Exception e) { e.printStackTrace(); } finally { //释放引用 ReferenceCountUtil.release(msg); } }} 测试Rpc至此,这个基于Netty的简单Rpc工具已经编码完成,接下来就是进行简单的测试了。首先写一个RpcProviderTest类,来模拟服务端启动。java/ rpc小框架测试,服务端模拟 @author wjyuian /public class RpcProviderTest { private int port; private RpcProviderTest(int port) { super(); this.port port; } public void run() { EventLoopGroup bossGroup new NioEventLoopGroup(); EventLoopGroup workGroup new NioEventLoopGroup(); try { ServerBootstrap serverBootstrap new ServerBootstrap(); serverBootstrap.group(bossGroup, workGroup) .channel(NioServerSocketChannel.class) .childHandler(new ChannelInitializer() { @Override protected void initChannel(Channel ch) throws Exception { //这里使用了Provider端的通信处理类 ch.pipeline().addLast(new RpcProviderHandler()); } }) .option(ChannelOption.SOBACKLOG, 128) .childOption(ChannelOption.SOKEEPALIVE, true); //绑定一个端口并启动监听,表示可以接受客户端连接 ChannelFuture f serverBootstrap.bind(port).sync(); //一直等待,直到手动关闭 socket //这里不会关闭,因为一直线程一直等待在这里 f.channel().closeFuture().sync(); } catch (Exception e) { e.printStackTrace(); } finally { //手动关闭服务端 workGroup.shutdownGracefully(); bossGroup.shutdownGracefully(); } } public static void main(String[] args) { //服务端绑定的端口 int port 8881; //启动provider端,模拟rpc服务提供者 new RpcProviderTest(port).run(); }}然后就差最后一步啦,写一个消费者模拟代码就行了。java/ rpc小框架测试,客户端模拟 @author wjyuian /public class RpcConsumerTest { public static void main(String[] args) { // 通过JDK的动态代理方式,初始化一个目标对象实例,在 RpcConsumerInvocationHandler 整合了rpc逻辑 //不需要代理已存在的对象实例 IRpcService nettyService GeneralProxy.newProxyInstanceWithoutInstance(IRpcService.class, new RpcConsumerInvocationHandler()); RpcTestQuery query new RpcTestQuery(); query.setWord("测试基于netty RPC"); query.setPageNo(1); query.setPageSize(20); // 包含netty连接初始化,这里就不计算时间 RpcTestResult testRpc nettyService.testRpc(query); // 统计rpc耗时 long b System.currentTimeMillis(); query.setWord("计时rpc"); testRpc nettyService.testRpc(query); System.out.println("rpc cost : " + (System.currentTimeMillis() - b)); System.out.println("以下内容是rpc接口获得的:"); List keywords testRpc.getKeywords(); System.out.println(testRpc.getRpcResult()); System.out.println(keywords); }}先启动生产者模拟,然后执行消费者模拟的main方法,可以看到如下日志(删除了netty启动、连接相关无关日志)。生产者端:shell 两条rpc处理日志,因为消费者端调用了两次接口,传入了不同的word参数值,表示生产者成功接收了参数,并在实际接口实现类中进行了处理testRpc : {"pageNo":1,"pageSize":20,"word":"测试基于netty RPC"}testRpc : {"pageNo":1,"pageSize":20,"word":"计时rpc"}消费者端:shell[RPC] 生成方法签名:tcpip.oomabc.com.tone.netty.reflect.rpc.service.IRpcService.testRpc(tcpip.oomabc.com.tone.netty.reflect.rpc.service.RpcTestQuery) 首先在动态代理对象中执行了目标方法:调用通信工具类的rpc方法[RPC] 在JDK代理方法中执行rpc调用, RpcConsumerServiceProxy.rpc 由于采用了延迟加载,所以第一次调用的时候,netty连接并未建立,所以需要初始化netty连接[RPC] consumer初始化netty连接 省略了连接日志 netty连接成功,首先触发了channelActive方法[RPC] consumer与provider端建立连接成功,channelActive[RPC] bootstrap.connect("localhost", 8881).sync() 其次调用了创建连接的监听事件方法[RPC] consumer netty connected,operationComplete 成功建立netty连接之后,就可以实际调用通信工具类的rpc方法了,打印了参数内容[RPC] RpcNettyServiceProxy do rpc, {"args":[{"pageNo":1,"pageSize":20,"word":"测试基于netty RPC"}],"className":"tcpip.oomabc.com.tone.netty.reflect.rpc.service.IRpcService","methodName":"testRpc","methodParameters":["tcpip.oomabc.com.tone.netty.reflect.rpc.service.RpcTestQuery"],"rpcSign":"tcpip.oomabc.com.tone.netty.reflect.rpc.service.IRpcService.testRpc(tcpip.oomabc.com.tone.netty.reflect.rpc.service.RpcTestQuery)"} 通过netty,将rpc请求对象发送给生产者[RPC] RpcHandler send param, {"args":[{"pageNo":1,"pageSize":20,"word":"测试基于netty RPC"}],"className":"tcpip.oomabc.com.tone.netty.reflect.rpc.service.IRpcService","methodName":"testRpc","methodParameters":["tcpip.oomabc.com.tone.netty.reflect.rpc.service.RpcTestQuery"],"rpcSign":"tcpip.oomabc.com.tone.netty.reflect.rpc.service.IRpcService.testRpc(tcpip.oomabc.com.tone.netty.reflect.rpc.service.RpcTestQuery)"} 省略了netty的通信日志 由于我们没有在消费者端的netty通信处理类的channelRead方法中添加日志,所以这里未打印响应结果,大伙可以自己添加更详细的日志,来追踪整个处理过程 下面是第二次进行rpc接口调用,所以不会有netty初始化日志[RPC] 生成方法签名:tcpip.oomabc.com.tone.netty.reflect.rpc.service.IRpcService.testRpc(tcpip.oomabc.com.tone.netty.reflect.rpc.service.RpcTestQuery)[RPC] 在JDK代理方法中执行rpc调用, RpcConsumerServiceProxy.rpc[RPC] RpcNettyServiceProxy do rpc, {"args":[{"pageNo":1,"pageSize":20,"word":"计时rpc"}],"className":"tcpip.oomabc.com.tone.netty.reflect.rpc.service.IRpcService","methodName":"testRpc","methodParameters":["tcpip.oomabc.com.tone.netty.reflect.rpc.service.RpcTestQuery"],"rpcSign":"tcpip.oomabc.com.tone.netty.reflect.rpc.service.IRpcService.testRpc(tcpip.oomabc.com.tone.netty.reflect.rpc.service.RpcTestQuery)"}[RPC] RpcHandler send param, {"args":[{"pageNo":1,"pageSize":20,"word":"计时rpc"}],"className":"tcpip.oomabc.com.tone.netty.reflect.rpc.service.IRpcService","methodName":"testRpc","methodParameters":["tcpip.oomabc.com.tone.netty.reflect.rpc.service.RpcTestQuery"],"rpcSign":"tcpip.oomabc.com.tone.netty.reflect.rpc.service.IRpcService.testRpc(tcpip.oomabc.com.tone.netty.reflect.rpc.service.RpcTestQuery)"} 由于是本机模拟生产者和消费者,所以整个rpc调用过程耗时只有3ms。rpc cost : 3 下面是消费者端获得rpc结果之后打印的。以下内容是rpc接口获得的: 这一行内容表示,这次获得的结果确实是第二次传过去的参数计时rpc,而且经过了实际接口实现类的处理逻辑计时rpc / 这是一个基于netty的rpc小例子返回的结果[rpc1, rpc2, rpc3, rpc4, rpc5, rpc6, rpc7, rpc8, rpc9, rpc10, rpc11, rpc12, rpc13, rpc14, rpc15, rpc16, rpc17, rpc18, rpc19, rpc20] 小结就这样,基于Netty的简单Rpc功能已经实现并调试完成,但是这只是一个简单的例子,许多Rpc的基础功能都还没有,需要大家自行进一步扩展。还存在的问题:1. 并发性:本例中许多代码不具备并发性,甚至无法支持两个并发。2. 内存:本例中许多对象有复用或者单例的问题,特别是Netty通信的使用还不熟,所以只是实现了功能。如果需要实现简单的生产使用,需要考虑哪些对象是可以复用,哪些是完全不能复用,哪些是需要缓存,哪些是需要加锁等等一系列问题。3. 链路:本例并未提供一种机制来实现Rpc调用链路追踪,但这是非常重要的。4. 负债均衡:本例的代码也未提供负债均衡实现,甚至没有预留负债均衡扩展点。所以,实现Rpc功能和部署生产使用之间需要解决相当多的问题,大家共勉。

    Netty   Java   RPC   2019-05-06 浏览(1692) 有用(0) 阅读原文>> [原创]
  • Java网络编程之Netty学习(三)—— RPC的服务注册、发现、降级   

    前言上一篇[《Java网络编程之Netty学习(二)—— 简单RPC实现 》][prevLink] 简单介绍了如何使用netty实现远程过程调用。不过在很多方面都存在着问题,包括并发性、内存使用、负债均衡、服务发现、服务注册、服务降级。本章以及后续将会出现的章节,旨在逐步解决这些问题,当然解决这些问题的方式、方法都会是自己思考并结合dubbo相关源码。主要是通过这一系列的实践来学习Netty、dubbo等技术框架的应用。本章主要介绍如何通过zookeeper来实现Provider的注册、Counsumer订阅、本地降级。代码都是在本系列前置文章的源码的基础之上修改的,后续会逐步进行代码重构。经整理,本章内容的主要信息点如下:1. 获得provider服务器ip2. Zookeeper客户端curator的使用3. Zookeeper节点监听以及回调4. mock接口本地简单判断![图片](https://oomabc.com/staticsrc/img/201811/25/15430775764853961c5b74bae4b7a999a84a4d9ff7cec.jpg)--- 知识点介绍 如何获得本机的IP地址这里指的是获得服务器的IP地址,而不是获得请求客户端的IP地址。一般情况下,我们如果想获得客户端的IP地址,通常会从HttpServletRequest对象中获取,比如:java private static final String NGINXIPHEADER "X-Real-IP"; private static final String NGINXXForwardedFor "X-Forwarded-For"; / 功能描述: 获取ip(兼容nginx转发) @param request @return / public static String getIpAddr(HttpServletRequest request) { String ips request.getHeader(NGINXXForwardedFor); String[] ipArray org.apache.commons.lang3.StringUtils.split(ips, ","); if (ArrayUtils.isNotEmpty(ipArray)) { return org.apache.commons.lang3.StringUtils.trim(ipArray[0]); } else { String ip request.getHeader(NGINXIPHEADER); if (StringUtils.isEmpty(ip)) { ip request.getRemoteAddr(); } return ip; } }这个方法兼容了Nginx转发之后的IP传递,因为我们通常会在Nginx转发请求的时候,设置原始客户端的IP。恰巧是通过X-Forwarded-For和X-Real-IP两个变量传递的,因此上面的java代码中就是通过从Request对象中获取这两个变量来达到获取客户端原始IP的。bash server { listen 443 ssl; servername oomabc.com www.oomabc.com; ssl on; root /home/admin/run/deploy/ROOT; ......... location / { 我们先看看这里有个X-Forwarded-For变量, 这是一个squid开发的,用于识别通过HTTP代理或负载平衡器原始IP一个连接到Web服务器的客户机地址的非rfc标准, 如果有做X-Forwarded-For设置的话,每次经过proxy转发都会有记录,格式就是client1, proxy1, proxy2,以逗号隔开各个地址, 由于他是非rfc标准,所以默认是没有的,需要强制添加,在默认情况下经过proxy转发的请求,在后端看来远程地址都是proxy端的ip 。 也就是说在默认情况下我们使用request.getAttribute("X-Forwarded-For")获取不到用户的ip,如果我们想要通过这个变量获得用户的ip,我们需要自己在nginx添加如下配置: proxysetheader X-Forwarded-For $proxyaddxforwardedfor; $proxyaddxforwardedfor变量包含客户端请求头中的"X-Forwarded-For",与$remoteaddr两部分,他们之间用逗号分开。 还有一个$httpxforwardedfor变量,这个变量就是X-Forwarded-For, 由于之前我们说了,默认的这个X-Forwarded-For是为空的,所以当我们直接使用proxysetheader X-Forwarded-For $httpxforwardedfor时会发现, web服务器端使用request.getAttribute("X-Forwarded-For")获得的值是null。 proxysetheader X-Real-IP $remoteaddr; 将通过nginx获取到用户IP,被添加到header中,key为X-Real-IP,java中通过request.getHeader("X-Real-IP")获得该IP proxypass http://localhost:8081; proxysetheader Host $httphost; proxysetheader X-Forwarded-For $proxyaddxforwardedfor; proxysetheader X-Real-IP $remoteaddr; root html; index index.html index.htm; } }然鹅,这并不是本章的目标,这里需要的功能是获得服务器自身的局域网IP地址。以下获取IP方法,参考了dubbo中的实现。首先,我们可以通过InetAddress对象的静态方法InetAddress.getLocalHost来获得本机配置的IP信息。比如,Linux系统通常会在/etc/hosts中配置:shell[root@date-backup2 ~] more /etc/hosts127.0.0.1 localhost localhost.localdomain localhost4 localhost4.localdomain4::1 localhost localhost.localdomain localhost6 localhost6.localdomain6测试该方法获取的就是localhost对于的IP,如果此配置文件配置的IP地址是不正确的,那这里获取的也是不对的。那么,在InetAddress.getLocalHost无法获得IP的情况下,我们还可以使用Java自带的另一个网络编程类NetworkInterface来获得真实的IP地址。getNetworkInterfaces方法会返回机器上所有的网络接口(网卡)信息,包括物理接口和虚拟接口。通过遍历所有网卡以及网卡对应的地址信息,返回第一个符合IP规则(IPv4、IPv6)的地址。它的核心方法如下:javapublic static final String LOCALHOST "127.0.0.1";public static final String ANYHOST "0.0.0.0";public static final Pattern IPPATTERN Pattern.compile("\\d{1,3}(\\.\\d{1,3}){3,5}$");private static boolean isValidAddress(InetAddress address) { if (address null address.isLoopbackAddress()) return false; String name address.getHostAddress(); //排除服务器默认配置的0.0.0.0,或者127.0.0.1;而且符合IP地址规则的(包括IPv4,IPv6) return (name ! null && !ANYHOST.equals(name) && !LOCALHOST.equals(name) && IPPATTERN.matcher(name).matches());} private static InetAddress getLocalAddress0() { InetAddress localAddress null; try { localAddress InetAddress.getLocalHost(); //如果是合理的ip,则直接返回 if (isValidAddress(localAddress)) { return localAddress; } } catch (Throwable e) { logger.warn("Failed to retriving ip address, " + e.getMessage(), e); } try { //Java网络接口,获取所有网络接口信息,包括物理的、虚拟的 Enumeration interfaces NetworkInterface.getNetworkInterfaces(); if (interfaces ! null) { while (interfaces.hasMoreElements()) { try { NetworkInterface network interfaces.nextElement(); Enumeration addresses network.getInetAddresses(); if (addresses ! null) { while (addresses.hasMoreElements()) { try { InetAddress address addresses.nextElement(); if (isValidAddress(address)) { return address; } } catch (Throwable e) { logger.warn("Failed to retriving ip address, " + e.getMessage(), e); } } } } catch (Throwable e) { logger.warn("Failed to retriving ip address, " + e.getMessage(), e); } } } } catch (Throwable e) { logger.warn("Failed to retriving ip address, " + e.getMessage(), e); } logger.error("Could not get local host ip address, will use 127.0.0.1 instead."); return localAddress;}看到这里,有人会发现一个问题:既然NetworkInterface.getNetworkInterfaces可以获得真实IP地址,而且不会有偏差,为何dubbo会将通过InetAddress.getLocalHost获得IP这个有误差的方法作为第一选择呢?我想,其实有两点:1. /etc/hosts文件作为服务器重要的基础配置,如果是默认配置,那么dubbo已经将0.0.0.0和127.0.0.1过滤了;而如果是用户修改过的配置,那么配置的正确性将有用户自行保证。2. 如果需要进行跨机房实现dubbo-rpc功能,那么这里就可以配置服务器的出口IP地址,此时服务注册的IP地址就是我们希望的IP,而不是局域网IP。(这一点是我猜测的)--- Curator框架的使用关于Curator的基础使用方式,已经在之前的系列文章[《从零开发参数同步框架(二)—— 前期准备之工具类》] [prevLink1]中介绍过了,这里就不做过多说明,直接给出本例中涉及的部分重要代码。主要就是服务注册和订阅以及发现功能的工具类:java//参数框架工具类public class GgFrameworkUtil extends GgFrameworkAbstract { // 标记client是否已经初始化 private final static AtomicBoolean isInit new AtomicBoolean(false); //框架在Zookeeper上的根目录,防止和其它用户冲突 public static final String RPCROOT "/testrpc"; // provider public static final String PROVIDER "providers"; // provider注册信息的缓存 private static final List CACHEDSERVICELIST new ArrayList(10); // consumer public static final String CONSUMER "consumers"; // consumer订阅信息的缓存 private static final List CACHEDCONSUMERLIST new ArrayList(10); // 注册zk地址,命名前缀解析 private static final String PREFIX "://"; // zk连接客户端,curator框架 private static CuratorFramework CLIENT null; public static boolean isFramewordInit() { return isInit.get(); } / 根据指定zookeeper服务地址,初始化zookeeper连接 / public static void initCuratorFramework(String zkHost) { if (CLIENT null) { CLIENT init(zkHost); } } // 格式化地址写法 private static String cleanZkHost(String host) { int index -1; if (StringUtils.isNotBlank(host) && (index host.indexOf(PREFIX)) 0) { return host.substring(index + PREFIX.length()); } return host; } / 根据Zookeeper地址,初始化Curator实例对象 @param zookeeperHost zookeeper地址,多个地址:zk1:port1,zk2:port2,zk3:port3 @return / private static CuratorFramework init(String zookeeperHost) { if (StringUtils.isBlank(zookeeperHost)) { log(LOGPREV, "[ERROR] curator client init failed as zookeeperHost is null"); return null; } final String dubboregistry cleanZkHost(zookeeperHost); CuratorFramework client CuratorFrameworkUtil.getInstance(dubboregistry); log(LOGPREV, "add ConnectionStateListener to client, zkHost {}", new Object[] { zookeeperHost }); // 添加客户端监听 client.getConnectionStateListenable().addListener(new ConnectionStateListener() { // 客户端连接zk节点之后,连接状态变动时的处理 @Override public void stateChanged(CuratorFramework client, ConnectionState state) { if (state null) { log(LOGPREV, "can not get connection state, zkHost {}", dubboregistry); return; } switch (state) { case CONNECTED:// 连接成功 isInit.set(true); log(LOGPREV, "connect to zookeeper successfully, zkHost {}", dubboregistry); registerProviderAndConsumers(); break; case SUSPENDED:// 连接挂起 log(LOGPREV, "connection is suspended, zkHost {}", dubboregistry); break; case RECONNECTED:// 挂起,丢失,只读的连接,被重新唤起 log(LOGPREV, "connection is reconnected, zkHost {}", dubboregistry); registerProviderAndConsumers(); break; case LOST:// 连接丢失 log(LOGPREV, "connection is lost, zkHost {}", dubboregistry); break; default:// 连接只读 log(LOGPREV, "connect to zookeeper failed, zkHost {}", dubboregistry); break; } } }); // 启动连接 CuratorFrameworkState state client.getState(); // 重复启动会报错:Cannot be started more than once if (state ! null && state ! CuratorFrameworkState.STARTED) { log(LOGPREV, "curator client start to connected zookeeper, zkHost {}", new Object[] { zookeeperHost }); client.start(); } return client; } //zk客户端一旦初始化成功,系统会立即将之前准备注册的服务以及需要订阅的接口,全部在zk接口进行绑定 private static void registerProviderAndConsumers() { if (CACHEDSERVICELIST ! null) { synchronized (CACHEDSERVICELIST) { if (!CollectionUtils.isEmpty(CACHEDSERVICELIST)) { for (RpcServiceRegister service : CACHEDSERVICELIST) { registerServiceToZk(service); } CACHEDSERVICELIST.clear(); } } } if (CACHEDCONSUMERLIST ! null) { synchronized (CACHEDCONSUMERLIST) { if (!CollectionUtils.isEmpty(CACHEDCONSUMERLIST)) { for (RpcInterfaceRegisterBean service : CACHEDCONSUMERLIST) { registerInterfaceToZk(service); } CACHEDCONSUMERLIST.clear(); } } } } public static CuratorFramework getClient() { return CLIENT; } / 节点创建确认 如果节点不存在,则创建 / private static void createIfNotExists(String nodePath) { try { if (CLIENT ! null && CLIENT.checkExists().forPath(nodePath) null) { String s CLIENT.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).forPath(nodePath, "node".getBytes()); log(LOGPREV, "[GgFramework] create zk node, path {}, s {}", nodePath, s); } } catch (Exception e) { e.printStackTrace(); } } // 注册provider,rpc层面,如果zk客户端未初始化成功,则会暂存 public static synchronized void registerServiceToRpc(RpcServiceRegister providerService) throws Exception { if (isInit.get()) { registerServiceToZk(providerService); } else { CACHEDSERVICELIST.add(providerService); } } // 注册consumer,rpc层面 public static synchronized void registerInterfaceToRpc(RpcInterfaceRegisterBean consumeService) throws Exception { if (isInit.get()) { registerInterfaceToZk(consumeService); } else { CACHEDCONSUMERLIST.add(consumeService); } } // 实际将provider注册到zk节点 private static void registerServiceToZk(RpcServiceRegister providerService) { try { String path buildPathString(RPCROOT, providerService.getClassName(), PROVIDER, SSDBCoderUtil.encodeBase64(providerService.getRegisterInfo())); createIfNotExists(path); byte[] bs SSDBCoderUtil.encode(providerService); RpcLogUtil.log("provider注册到Zookeeper, {} : {}", new Object[] { path, bs }); getClient().setData().forPath(path, bs); } catch (Exception e) { e.printStackTrace(); } } // 注册consumer到zk节点,并监听节点上是否有provider注册上来 private static void registerInterfaceToZk(RpcInterfaceRegisterBean consumeService) { try { String path buildPathString(RPCROOT, consumeService.getClassName(), CONSUMER, SSDBCoderUtil.encodeBase64(consumeService.getRegisterInfo())); createIfNotExists(path); RpcLogUtil.log("consumer注册到Zookeeper并监听服务节点, " + path); getClient().setData().forPath(path, SSDBCoderUtil.encode(consumeService)); //这里创建的是子节点监听事件;当前节点或者其子节点发生变动,会触发事件 final PathChildrenCache nodeCache createConsumerNodeCache(consumeService); nodeCache.start(); // 绑定事件 nodeCache.getListenable().addListener(new PathChildrenCacheListener() { @Override public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception { ChildData data event.getData(); createIfNotExists(data.getPath()); byte[] bs client.getData().forPath(data.getPath()); //此处只考虑了单consumer和单provider;否则需要判断触发节点事件的来源到底是provider还是其他consumer RpcServiceRegister service SSDBCoderUtil.decode(bs); switch (event.getType()) { case CHILDADDED: // 子节点被添加,这里是服务注册成功 RpcLogUtil.log("provider 注册成功 {}", new Object[] {data.getPath()}); break; case CHILDREMOVED: // 子节点被删除 RpcLogUtil.log("provider 宕机 {}", new Object[] {data.getPath()}); break; case CHILDUPDATED: // 子节点数据变化 RpcLogUtil.log("provider 信息变动 {}", new Object[] {data.getPath()}); break; default: break; } if(service ! null) { RpcLogUtil.log("provider 信息 :{}, {} - {}:{}", new Object[] { service.getClassName(), service.getInstanceName(), service.getHost(), service.getPort() }); RpcConsumerService.setProviderHostAndPort(service.getHost(), service.getPort()); } } }, Executors.newFixedThreadPool(1)); } catch (Exception e) { e.printStackTrace(); } } / consumer节点,需要监听的是provider变动 / private static PathChildrenCache createConsumerNodeCache(RpcInterfaceRegisterBean consumeService) { String path buildPathString(RPCROOT, consumeService.getClassName(), PROVIDER); createIfNotExists(path); return new PathChildrenCache(GgFrameworkUtil.getClient(), path, false); }}--- 节点的监听及回调部分读者可能已经发现,节点的监听和回调方法已经出现在前一节中,就是方法registerInterfaceToZk。--- 本地降级我这做的比较简单,就是判断consumer的netty是否初始化成功,未初始化则进行降级处理。降级的实现类也与dubbo类似,需要在interface相同目录下增加一个interfaceMock(读:mark)实现:javapublic class IRpcServiceMock implements IRpcService { @Override public RpcTestResult testRpc(RpcTestQuery query) { RpcTestResult result new RpcTestResult(); result.setRpcResult("this is a mock result"); return result; }}然后在消费者端工具类增加简单判断:java// 是否降级// 暂时这里只考虑netty是否成功初始化来确定// 未实现:provider从正常到宕机的情况,目前的测试代码结构比较混乱,这个功能在后续代码结构升级子再实现。public static boolean isMock() { return !INSTANCE.init.get();}通过反射实现降级:java private static final Map MOCKSERVICE new HashMap(); //获得降级结果 private static Object getRpcMockResult(RpcRequest param) { try { long begin System.nanoTime(); String mockServiceClassName param.getClassName() + "Mock"; RpcMockServiceBean mockServiceBean MOCKSERVICE.get(mockServiceClassName); if (mockServiceBean null) { Class m Class.forName(mockServiceClassName); if (m null) { // 没有实现mock,需要抛错,这里先返回null return null; } mockServiceBean new RpcMockServiceBean(m, m.newInstance()); MOCKSERVICE.put(mockServiceClassName, mockServiceBean); } // 反射获得方法 Method declaredMethod mockServiceBean.getClazz().getDeclaredMethod(param.getMethodName(), param.getMethodParameters()); // 调用mock的service Object mockResult declaredMethod.invoke(mockServiceBean.getInstance(), param.getArgs()); RpcLogUtil.log("mock service result, cost : {} 纳秒", new Object[] { System.nanoTime() - begin }); return mockResult; } catch (Exception e) { e.printStackTrace(); } return null; }--- 消费者端通信工具类修改上一章介绍过的工具类,本章增加了降级处理、异步初始化化netty和服务发现回调功能。下面还是会老样子,直接提出后修改后的代码,部分重要逻辑在代码注释中体现。我实在是不善于进行书面描述,即使描述了也是流水账。javapublic class RpcConsumerService { private ChannelHandler channelHandler; private AtomicBoolean init new AtomicBoolean(false); private static final RpcConsumerService INSTANCE new RpcConsumerService(); private RpcConsumerService() { this.channelHandler new RpcConsumerChannelHandler(); } private ChannelFuture init(final String host, final int port) { //。。。该方法不变 } private static final Map MOCKSERVICE new HashMap(); //获得降级结果 private static Object getRpcMockResult(RpcRequest param) { //降级代码,上面已给出 } // 是否降级 // 暂时这里只考虑netty是否成功初始化来确定 // 未实现:provider从正常到宕机的情况,目前的测试代码结构比较混乱,这个功能在后续代码结构升级子再实现。 public static boolean isMock() { return !INSTANCE.init.get(); } // 初始化rpc,consumer在调用方法之前,需要先进行zk注册,即服务订阅,这里模拟consumer加载 public static void initRpc(RpcZkConfiger zkConfiger, List interfaces) { INSTANCE.lock.lock(); try { RpcLogUtil.log("consumer准备注册到Zookeeper节点"); //这里准备注册consumer需要的服务节点;成功之后会唤醒netty初始化代码 RpcRegisterUtils.registerInterface(zkConfiger, interfaces); INSTANCE.done.await(3000, TimeUnit.MILLISECONDS); } catch (Exception e) { e.printStackTrace(); } finally { INSTANCE.lock.unlock(); } // 初始化netty,这里可能已经初始化了 initNetty(); } / 流程: 1、consumer调用initRpc,将订阅信息注册到zk,本方法进行线程阻塞 2、consumer连接zk,并监听事件;事件触发表示连接成功,唤醒1中阻塞的线程进行3 3、调用initNetty方法进行netty通信初始化 4、consumer连接zk,超过3s未唤醒1中阻塞的线程,则阻塞超时,直接调用initNetty;此时并没有获得provider的ip和port,所以netty初始化失败 5、provider注册服务成功之后,会触发2中设置的监听事件,从而重新调用initNetty / private final Lock lock new ReentrantLock(); private final Condition done lock.newCondition(); private String host; private int port; // consumer发现provider注册了之后,会调用该信息,进行netty初始化确认 public static void setProviderHostAndPort(String host, int port) { INSTANCE.lock.lock(); try { RpcLogUtil.log("消费者成功连接Zookeeper,获得provider服务器信息,host{}, port{}", new Object[] { host, port }); INSTANCE.host host; INSTANCE.port port; INSTANCE.done.signal(); } catch (Exception e) { e.printStackTrace(); } finally { INSTANCE.lock.unlock(); initNetty(); } } // 是否获得了provider的服务器信息 private boolean hasHostAndPort() { return StringUtils.isNotBlank(host) && port 0; } // consumer初始化netty连接 private synchronized static void initNetty() { RpcLogUtil.log("initNetty hasHostAndPort : {}", INSTANCE.hasHostAndPort()); if (!INSTANCE.getInit().get() && INSTANCE.hasHostAndPort()) { // 如果netty还没有初始化,则会先初始化 synchronized (INSTANCE) { ChannelFuture ch INSTANCE.init(INSTANCE.host, INSTANCE.port); if (ch ! null) { ch.addListener(new ChannelFutureListener() { // 这里是初始化成功之后的回调 // 初始化本身是同步的,但是初始化到回调,是异步的 @Override public void operationComplete(ChannelFuture future) throws Exception { RpcLogUtil.log("operationComplete ", new Object[] {}); } }); } } } } // 暴露的rpc方法 public static Object rpc(final RpcRequest param) { long begin System.currentTimeMillis(); // 增加mock判断 if (isMock()) { return getRpcMockResult(param); } RpcLogUtil.log("RpcNettyServiceProxy do rpc, " + JSON.toJSONString(param)); // 获得封装了netty,具有与provider通信能力的对象 ChannelHandler handler INSTANCE.getChannelHandler(); IRpcSender rpcSender (IRpcSender) handler; // rpcSender封装了netty,通过ChannelHandlerContext发送请求 RpcResponse rpcResult rpcSender.send(param); RpcLogUtil.log("rpc remote cost {} ms, local rpc cost {} ms.", new Object[] { rpcResult.getCost(), System.currentTimeMillis() - begin }); return rpcResult.getRpcData(); } public ChannelHandler getChannelHandler() { return channelHandler; } public AtomicBoolean getInit() { return init; } public void setInit(AtomicBoolean init) { this.init init; }}--- 基础信息定义服务信息类:java/ 服务注册、订阅信息 @author wjyuian /public class RpcRegisterBean implements Serializable { private static final long serialVersionUID 821334386613317181L; public RpcRegisterBean() { } private String className; // 接口类 private String instanceName; //实例 private Integer timeout; // 超时 private String host; private Integer port; public String getRegisterInfo() { return JunitConsoleOut.consoleLine("testrpc://{}:{}", new Object[] { host, port }); }}RpcServiceRegister和RpcInterfaceRegisterBean都是它的子类。Zookeeper配置信息类:javapublic class RpcZkConfiger { private String hostPorts; public RpcZkConfiger(String hostPorts) { super(); this.hostPorts hostPorts; }}--- 功能测试provider端的测试模拟类,RpcProviderTest.java,较上一章的改动就是,启动时指定了服务端口以及增加Zookeeper注册服务:java/ rpc小框架测试,服务端模拟 @author wjyuian /public class RpcProviderTest { private int port; private RpcProviderTest(int port) { super(); this.port port; } public void run() { EventLoopGroup bossGroup new NioEventLoopGroup(); EventLoopGroup workGroup new NioEventLoopGroup(); try { ServerBootstrap serverBootstrap new ServerBootstrap(); serverBootstrap.group(bossGroup, workGroup) .channel(NioServerSocketChannel.class) .childHandler(new ChannelInitializer() { @Override protected void initChannel(Channel ch) throws Exception { ch.pipeline().addLast(new RpcProviderHandler()); } }) .option(ChannelOption.SOBACKLOG, 128) .childOption(ChannelOption.SOKEEPALIVE, true); //绑定一个端口并启动监听,表示可以接受客户端连接 ChannelFuture f serverBootstrap.bind(port).sync(); f.addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) throws Exception { RpcLogUtil.log("provider init ok, register service", new Object[] { }); //服务启动成功(netty连接成功),向zk注册服务 RpcZkConfiger zkConfiger new RpcZkConfiger("192.168.100.213:2181,192.168.100.215:2181"); RpcRegisterUtils.registerService(zkConfiger, generateServices()); } }); //一直等待,直到手动关闭 socket //这里不会关闭,因为一直线程一直等待在这里 f.channel().closeFuture().sync(); } catch (Exception e) { e.printStackTrace(); } finally { //手动关闭服务端 workGroup.shutdownGracefully(); bossGroup.shutdownGracefully(); } } //模拟注册服务所需的对象 private static RpcServiceRegister createService(String className, String instance) { RpcServiceRegister service new RpcServiceRegister(); service.setClassName(className); service.setInstanceName(instance); service.setHost(RpcIpUtils.getLocalAddress().getHostAddress()); service.setPort(8881); return service; } private static List generateServices() { List list new ArrayList(); list.add(createService("tcpip.oomabc.com.tone.netty.reflect.rpc.service.IRpcService", "rpcService")); return list; } public static void main(String[] args) { //服务端绑定的端口 int port 8881; //启动provider端,模拟rpc服务提供者 new RpcProviderTest(port).run(); }}consumer端测试模拟类RpcConsumerTest.java,主要是增加了手动初始化Rpc,也就是收集订阅的服务信息然后注册到zk上。java/ rpc小框架测试,客户端模拟 @author wjyuian /public class RpcConsumerTest { //模拟消费者端,需要订阅的服务 private static RpcInterfaceRegisterBean createService(String className, String instance) { RpcInterfaceRegisterBean service new RpcInterfaceRegisterBean(); service.setClassName(className); service.setInstanceName(instance); return service; } private static List generateServices() { List list new ArrayList(); //接口类 和 consumer端将会生成的动态代理bean的名字 list.add(createService("tcpip.oomabc.com.tone.netty.reflect.rpc.service.IRpcService", "rpcService")); return list; } public static void main(String[] args) throws InterruptedException { //为了测试mock,这里将consumer的rpc连接初始化单独调用 //如果不调用初始化,则会走mock实现,符合正常的使用场景 //如果调用初始化,但是provider未启动,也会走mock实现 RpcZkConfiger zkConfiger new RpcZkConfiger("192.168.100.213:2181,192.168.100.215:2181"); RpcConsumerService.initRpc(zkConfiger, generateServices()); // 通过JDK的动态代理方式,初始化一个目标对象实例,在 RpcConsumerInvocationHandler 整合了rpc逻辑 //不需要代理已存在的对象实例 IRpcService nettyService GeneralProxy.newProxyInstanceWithoutInstance(IRpcService.class, new RpcConsumerInvocationHandler()); RpcTestQuery query new RpcTestQuery(); query.setWord("测试基于netty RPC"); query.setPageNo(1); query.setPageSize(20); // 包含netty连接初始化,不计算时间 RpcTestResult testRpc nettyService.testRpc(query); // 统计rpc耗时 long b System.currentTimeMillis(); query.setWord("计时rpc"); testRpc nettyService.testRpc(query); RpcLogUtil.log("rpc cost : " + (System.currentTimeMillis() - b)); RpcLogUtil.log("------------------ break line ------------------"); RpcLogUtil.log("以下内容从rpc-mock接口获得的:"); List keywords testRpc.getKeywords(); RpcLogUtil.log(testRpc.getRpcResult()); RpcLogUtil.log("keywords {}", keywords); RpcLogUtil.log("------------------ break line ------------------"); // consumer模拟先启动,调用mock实现,然后等待10s,等待provider启动,consumer自动发现服务,然后继续调用 TimeUnit.SECONDS.sleep(10); //第一次调用,耗时会稍微长一点 testRpc nettyService.testRpc(query); RpcLogUtil.log("------------------ break line ------------------"); RpcLogUtil.log("provider注册服务之后,consumer将自动获取服务信息,并进行下一次请求"); b System.currentTimeMillis(); query.setWord("consumer启动之后,provider再注册"); testRpc nettyService.testRpc(query); RpcLogUtil.log("rpc cost : " + (System.currentTimeMillis() - b)); RpcLogUtil.log("以下内容从rpc-remote接口获得的:"); keywords testRpc.getKeywords(); RpcLogUtil.log(testRpc.getRpcResult()); RpcLogUtil.log("keywords {}", keywords); }}我们的测试流程分为以下几步:1. 启动consumer。 2. 虽然调用了初始化Rpc方法,但是由于provider未注册服务,所以consumer无法获得服务信息,第一次调用接口是本地的降级实现。 3. consumer此时会监听订阅服务节点的信息变动。 4. 本例中设置了sleep 10ms,为的是就在这段时间内,provider启动之后,consumer可以继续调用接口,完成整个测试流程。2. 启动provider。 3. provider将提供的服务注册到zk节点,会触发consumer添加的监听事件 4. consumer得到服务注册成功的信息之后,会调用其rpc初始化方法(初始化netty连接)。 5. consumer会在sleep之后,继续调用接口;首次调用会耗时比较长,因为涉及到资源初始化;后续调用恢复正常。3. 测试流程完成。--- 测试日志 consumer启动:bash2018-11-29 11:18:49 [RPC] consumer准备注册到Zookeeper节点2018-11-29 11:18:49 [RPC] init curator client, zkHost 192.168.100.213:2181,192.168.100.215:21812018-11-29 11:18:49 [RPC] add ConnectionStateListener to client, zkHost 192.168.100.213:2181,192.168.100.215:21812018-11-29 11:18:49 [RPC] curator client start to connected zookeeper, zkHost 192.168.100.213:2181,192.168.100.215:21812018-11-29 11:18:49 [RPC] connect to zookeeper successfully, zkHost 192.168.100.213:2181,192.168.100.215:21812018-11-29 11:18:49 [RPC] [GgFramework] create zk node, path /testrpc/tcpip.oomabc.com.tone.netty.reflect.rpc.service.IRpcService/consumers/testrpc%3A%2F%2Fnull%3Anull, s /testrpc/tcpip.oomabc.com.tone.netty.reflect.rpc.service.IRpcService/consumers/testrpc%3A%2F%2Fnull%3Anull consumer成功连接zk,并在此时对订阅服务对应的zk节点进行监听; 这里打印的是consumer注册信息的节点,属于/consumer路径下;而consumer真正监听的是/provider路径; 这一点可以从GgFrameworkUtil.createConsumerNodeCache方法中发现,consumer监听是它关心的provider节点;而provider不需要关系consumer节点2018-11-29 11:18:49 [RPC] consumer注册到Zookeeper并监听服务节点, /testrpc/tcpip.oomabc.com.tone.netty.reflect.rpc.service.IRpcService/consumers/testrpc%3A%2F%2Fnull%3Anull 由于provider未注册服务,所以之类初始化失败,即无法获得接口服务信息,后续调用的接口将会进行本地降级处理2018-11-29 11:18:52 [RPC] initNetty hashHostAndPort : false2018-11-29 11:18:52 [RPC] 生成方法签名:tcpip.oomabc.com.tone.netty.reflect.rpc.service.IRpcService.testRpc(tcpip.oomabc.com.tone.netty.reflect.rpc.service.RpcTestQuery)2018-11-29 11:18:52 [RPC] 在JDK代理方法中执行rpc调用, RpcConsumerServiceProxy.rpc2018-11-29 11:18:52 [RPC] mock service result, cost : 637000 纳秒2018-11-29 11:18:52 [RPC] 生成方法签名:tcpip.oomabc.com.tone.netty.reflect.rpc.service.IRpcService.testRpc(tcpip.oomabc.com.tone.netty.reflect.rpc.service.RpcTestQuery)2018-11-29 11:18:52 [RPC] 在JDK代理方法中执行rpc调用, RpcConsumerServiceProxy.rpc2018-11-29 11:18:52 [RPC] mock service result, cost : 14000 纳秒2018-11-29 11:18:52 [RPC] rpc cost : 02018-11-29 11:18:52 [RPC] ------------------ break line ------------------2018-11-29 11:18:52 [RPC] 以下内容从rpc-mock接口获得的: 从结果对象的内容可以发现,确实是我们降级实现service返回的结果;验证了我们的降级功能2018-11-29 11:18:52 [RPC] this is a mock result2018-11-29 11:18:52 [RPC] keywords null2018-11-29 11:18:52 [RPC] ------------------ break line ------------------ provider启动:bash2018-11-29 11:18:58 [RPC] provider init ok, register service2018-11-29 11:18:58 [RPC] init curator client, zkHost 192.168.100.213:2181,192.168.100.215:21812018-11-29 11:18:58 [RPC] add ConnectionStateListener to client, zkHost 192.168.100.213:2181,192.168.100.215:21812018-11-29 11:18:58 [RPC] curator client start to connected zookeeper, zkHost 192.168.100.213:2181,192.168.100.215:21812018-11-29 11:18:58 [RPC] connect to zookeeper successfully, zkHost 192.168.100.213:2181,192.168.100.215:21812018-11-29 11:18:59 [RPC] [GgFramework] create zk node, path /testrpc/tcpip.oomabc.com.tone.netty.reflect.rpc.service.IRpcService/providers/testrpc%3A%2F%2F10.10.1.99%3A8881, s /testrpc/tcpip.oomabc.com.tone.netty.reflect.rpc.service.IRpcService/providers/testrpc%3A%2F%2F10.10.1.99%3A8881 provider成功将自己提供的服务信息注册到zk节点,这个节点是consumer当初监听的节点的子节点2018-11-29 11:18:59 [RPC] provider注册到Zookeeper, /testrpc/tcpip.oomabc.com.tone.netty.reflect.rpc.service.IRpcService/providers/testrpc%3A%2F%2F10.10.1.99%3A8881 : [...] consumer发现provder注册服务成功:bash consumer第一次初始化时,在zk接口上注册的监听事件触发,获得provider的服务信息,所以会立即调用初始化2018-11-29 11:18:59 [RPC] provider 注册成功 /testrpc/tcpip.oomabc.com.tone.netty.reflect.rpc.service.IRpcService/providers/testrpc%3A%2F%2F10.10.1.99%3A88812018-11-29 11:18:59 [RPC] provider 信息 :tcpip.oomabc.com.tone.netty.reflect.rpc.service.IRpcService, rpcService - 10.10.1.99:88812018-11-29 11:18:59 [RPC] 消费者成功连接Zookeeper,获得provider服务器信息,host10.10.1.99, port88812018-11-29 11:18:59 [RPC] initNetty hashHostAndPort : true2018-11-29 11:18:59 [RPC] consumer初始化netty连接2018-11-29 11:18:59 [RPC] consumer与provider端建立连接成功,channelActive2018-11-29 11:18:59 [RPC] bootstrap.connect("10.10.1.99", 8881).sync()2018-11-29 11:18:59 [RPC] operationComplete consumer在sleep之后调用接口:bash consumer在provider注册之后,连续调用两次接口,第一次是为了资源初始化,第二次是本例关心的调用2018-11-29 11:19:02 [RPC] testRpc : {"pageNo":1,"pageSize":20,"word":"计时rpc"}2018-11-29 11:19:02 [RPC] testRpc : {"pageNo":1,"pageSize":20,"word":"consumer启动之后,provider再注册"} provider接到接口调用,并完成rpc调用之后,返回给consumer结果:bash2018-11-29 11:19:02 [RPC] 生成方法签名:tcpip.oomabc.com.tone.netty.reflect.rpc.service.IRpcService.testRpc(tcpip.oomabc.com.tone.netty.reflect.rpc.service.RpcTestQuery)2018-11-29 11:19:02 [RPC] 在JDK代理方法中执行rpc调用, RpcConsumerServiceProxy.rpc2018-11-29 11:19:02 [RPC] RpcNettyServiceProxy do rpc, {"args":[{"pageNo":1,"pageSize":20,"word":"计时rpc"}],"className":"tcpip.oomabc.com.tone.netty.reflect.rpc.service.IRpcService","methodName":"testRpc","methodParameters":["tcpip.oomabc.com.tone.netty.reflect.rpc.service.RpcTestQuery"],"rpcSign":"tcpip.oomabc.com.tone.netty.reflect.rpc.service.IRpcService.testRpc(tcpip.oomabc.com.tone.netty.reflect.rpc.service.RpcTestQuery)"}2018-11-29 11:19:02 [RPC] RpcHandler send param, {"args":[{"pageNo":1,"pageSize":20,"word":"计时rpc"}],"className":"tcpip.oomabc.com.tone.netty.reflect.rpc.service.IRpcService","methodName":"testRpc","methodParameters":["tcpip.oomabc.com.tone.netty.reflect.rpc.service.RpcTestQuery"],"rpcSign":"tcpip.oomabc.com.tone.netty.reflect.rpc.service.IRpcService.testRpc(tcpip.oomabc.com.tone.netty.reflect.rpc.service.RpcTestQuery)"} 第一次调用:服务端service耗时66ms,整个rpc过程耗时189ms2018-11-29 11:19:02 [RPC] rpc remote cost 66 ms, local rpc cost 189 ms.2018-11-29 11:19:02 [RPC] ------------------ break line ------------------2018-11-29 11:19:02 [RPC] provider注册服务之后,consumer将自动获取服务信息,并进行下一次请求2018-11-29 11:19:02 [RPC] 生成方法签名:tcpip.oomabc.com.tone.netty.reflect.rpc.service.IRpcService.testRpc(tcpip.oomabc.com.tone.netty.reflect.rpc.service.RpcTestQuery)2018-11-29 11:19:02 [RPC] 在JDK代理方法中执行rpc调用, RpcConsumerServiceProxy.rpc2018-11-29 11:19:02 [RPC] RpcNettyServiceProxy do rpc, {"args":[{"pageNo":1,"pageSize":20,"word":"consumer启动之后,provider再注册"}],"className":"tcpip.oomabc.com.tone.netty.reflect.rpc.service.IRpcService","methodName":"testRpc","methodParameters":["tcpip.oomabc.com.tone.netty.reflect.rpc.service.RpcTestQuery"],"rpcSign":"tcpip.oomabc.com.tone.netty.reflect.rpc.service.IRpcService.testRpc(tcpip.oomabc.com.tone.netty.reflect.rpc.service.RpcTestQuery)"}2018-11-29 11:19:02 [RPC] RpcHandler send param, {"args":[{"pageNo":1,"pageSize":20,"word":"consumer启动之后,provider再注册"}],"className":"tcpip.oomabc.com.tone.netty.reflect.rpc.service.IRpcService","methodName":"testRpc","methodParameters":["tcpip.oomabc.com.tone.netty.reflect.rpc.service.RpcTestQuery"],"rpcSign":"tcpip.oomabc.com.tone.netty.reflect.rpc.service.IRpcService.testRpc(tcpip.oomabc.com.tone.netty.reflect.rpc.service.RpcTestQuery)"} 第二次调用,服务端service耗时1ms,整个rpc过程耗时3ms,包括网络开销。不过这里是本地调用2018-11-29 11:19:02 [RPC] rpc remote cost 1 ms, local rpc cost 3 ms.2018-11-29 11:19:02 [RPC] rpc cost : 32018-11-29 11:19:02 [RPC] 以下内容从rpc-remote接口获得的:2018-11-29 11:19:02 [RPC] consumer启动之后,provider再注册 / 这是一个基于netty的rpc小例子返回的结果2018-11-29 11:19:02 [RPC] keywords [rpc1, rpc2, rpc3, rpc4, rpc5, rpc6, rpc7, rpc8, rpc9, rpc10, rpc11, rpc12, rpc13, rpc14, rpc15, rpc16, rpc17, rpc18, rpc19, rpc20]另外一个,Rpc注册工具类,用于服务注册、服务订阅:java//zk注册工具类public class RpcRegisterUtils { // 注册provider /// dubbo/com.hunteron.api.group.remote.v2.IGroupUserRemoteService/providers/dubbo://ip:port/sign public static void registerService(RpcZkConfiger conf, List services) { String key conf.getHostPorts(); System.out.println(key); if (CollectionUtils.isEmpty(services)) { return; } //确保zk连接已建立 GgFrameworkUtil.initCuratorFramework(conf.getHostPorts()); try { for (RpcServiceRegister service : services) { GgFrameworkUtil.registerServiceToRpc(service); } } catch (Exception e) { e.printStackTrace(); } } // 注册consumer /// dubbo/com.hunteron.api.group.remote.v2.IGroupUserRemoteService/consumers/consumers://ip:port/sign public static void registerInterface(RpcZkConfiger conf, List interfaces) { if (CollectionUtils.isEmpty(interfaces)) { return; } //初始化zk连接 GgFrameworkUtil.initCuratorFramework(conf.getHostPorts()); try { for (RpcInterfaceRegisterBean interface : interfaces) { GgFrameworkUtil.registerInterfaceToRpc(interface); } } catch (Exception e) { e.printStackTrace(); } }} 小结至此,本例已经实现了开头提出的几点功能,包括本地降级、服务注册、服务订阅以及发现。不过依旧只是在本地实现而已,还无法投入甚至是测试环境的使用,因为关于内存使用、netty使用和多线程并发的问题依然存在。在后续可能出现的章节中,将会一步一步对各个功能进行改造,让它逐渐成为至少在测试环境可以投入使用的一个简单rpc框架,希望如此。[prevLink]: https://oomabc.com/articledetail?atclidf097a312b2e543a4a070d961556ac90c[prevLink1]: https://oomabc.com/articledetail?atclide0a2741b298441bd9cc0bfe90618e906

    Netty   Java   Dubbo   RPC   2019-05-06 浏览(2164) 有用(0) 阅读原文>> [原创]
  • blogTest
    分享文章
     
    使用APP的"扫一扫"功能,扫描左边的二维码,即可将网页分享给别人。
    你也可以扫描右边本博客的小程序二维码,实时关注最新文章。