首页 >> 大全

架构设计:系统间通信(1)——概述从“聊天”开始上篇

2023-07-27 大全 24 作者:考证青年

从这篇博文开始,我们将进入一个新文章系列。这个文章系列专门整理总结了目前系统间通信的主要原理、手段和实现。我们将讲解典型的信息格式、讲解传统的RMI调用并延伸出来重点讲解RPC调用和使用案例;最后我们还会讲到SOA架构的实现,包括ESB实现和服务注册/治理的实现,同样包括原理、实现和使用案例。

系统间通信是架构师需要掌握的又一个关键技术领域,如果说理解和掌握负载均衡层技术需要您有一定的linux系统知识和操作系统知识的话,那么理解和掌握系统间通信层技术,需要您有一定的编程经验(最好是JAVA编程经验,因为我们会主要以JAVA技术作为实例演示)。

1、一个场景

首先我们来看一个显示场景:在现实生活中有两个人技术人员A和B,在进行一问一答形式的交流。如下图所示:

这里写图片描述

我们来看这幅图的中的几个要点:

2、信息格式

很明显通过中文的交谈,两个人相互明白了对方的意图。为了保证信息传递的高效性,我们一定会将信息做成某种参与者都理解的格式。例如:中文有其特定的语法结构,例如主谓宾,定状补。

在计算机领域为了保证信息能够被处理,信息也会被做成特定的格式,而且要确保目标能够明白这种格式。常用的信息格式包括:

这里写图片描述

这里有一篇介绍TLV的文章:《通信协议之序列化TLV》,TLV格式所携带的内容是最有效的,它就连JSON中用于分割层次的“{}”符号都没有。

在这个系列的博文中,我们不会把信息格式作为一个重点,但是会花一些篇幅去比较各种信息格式在网络上传输的速度、性能,并为大家介绍几种典型的信息格式选型场景。

3、网络协议

如文中第一张图描述的场景,有一个我们看不到但是却很重要的元素:空气。声音在空气中完成传播,真空无法传播声音。同样信息是在网络中完成传播的,没有网络就没法传播信息。网络协议就是计算机领域的“空气”,下图中我们以OSI模型作为参考:

这里写图片描述

这里写图片描述

在这个系列的博文中,我们不会把网络协议作为一个重点。这是因为网络网络协议的知识是一个相对独立的的知识领域,十几篇文章都不一定讲得清楚。如果您对网络协议有兴趣,这里推荐两本书:《TCP/IP详解.卷1-协议》和《TCP/IP详解.卷2-实现》。

4、通信方式/框架

聊聊架构_通信网的结构与架构有什么区别_

在文章最前面我们看到其中一个人规定了一种沟通方式:“你必须把我说的话听完,然后给我反馈后。我才会问第二个问题”。这种沟通方式虽然沟通效率不高,但是很有效:一个问题一个问题的处理。

但是如果参与沟通的人处理信息的能力比较强,那么他们还可以采用另一种沟通方式:“我给我提的问题编了一个号,在问完第X个问题后,我不会等待你返回,就会问第X+1个问题,同样你在听完我第X个问题后,一边处理我的问题,一边听我第X+1个问题。”

实际上以上两种现实中的沟通方式,在计算机领域是可以找到对应的通信方式的,这就是我们这个系列的博文会着重讲的BIO(阻塞模式)通信和NIO(非阻塞模式)。

4-1、BIO通信方式

以前大多数网络通信方式都是阻塞模式的,即:

如下图所示:

这里写图片描述

传统的BIO通信方式存在几个问题:

上面说的情况是服务器只有一个线程的情况,那么读者会直接提出我们可以使用多线程技术来解决这个问题:

如下图所示:

这里写图片描述

但是使用线程来解决这个问题实际上是有局限性的:

那么,如果你真想单纯使用线程解决阻塞的问题,那么您自己都可以算出来您一个服务器节点可以一次接受多大的并发了。看来,单纯使用线程解决这个问题不是最好的办法。

4-2、BIO通信方式深入分析

在这个系列的博文中,通信方式/框架将作为一个重点进行讲解。包括NIO的原理,并通过讲解Netty的使用、JAVA原生NIO框架的使用,去熟悉这些核心原理。

实际上从上文中我们可以看出,BIO的问题关键不在于是否使用了多线程(包括线程池)处理这次请求,而在于()、read()的操作点都是被阻塞。要测试这个问题,也很简单。我们模拟了20个客户端(用20根线程模拟),利用JAVA的同步计数器,保证这20个客户都初始化完成后然后同时向服务器发送请求,然后我们来观察一下这边接受信息的情况。

4-2-1、模拟20个客户端并发请求,服务器端使用单线程:

package testBSocket;import java.util.concurrent.CountDownLatch;public class SocketClientDaemon {public static void main(String[] args) throws Exception {Integer clientNumber = 20;CountDownLatch countDownLatch = new CountDownLatch(clientNumber);//分别开始启动这20个客户端for(int index = 0 ; index < clientNumber ; index++ , countDownLatch.countDown()) {SocketClientRequestThread client = new SocketClientRequestThread(countDownLatch, index);new Thread(client).start();}//这个wait不涉及到具体的实验逻辑,只是为了保证守护线程在启动所有线程后,进入等待状态synchronized (SocketClientDaemon.class) {SocketClientDaemon.class.wait();}}
}

package testBSocket;import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.Socket;
import java.util.concurrent.CountDownLatch;import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.log4j.BasicConfigurator;/*** 一个SocketClientRequestThread线程模拟一个客户端请求。* @author yinwenjie*/
public class SocketClientRequestThread implements Runnable {static {BasicConfigurator.configure();}/*** 日志*/private static final Log LOGGER = LogFactory.getLog(SocketClientRequestThread.class);private CountDownLatch countDownLatch;/*** 这个线层的编号* @param countDownLatch*/private Integer clientIndex;/*** countDownLatch是java提供的同步计数器。* 当计数器数值减为0时,所有受其影响而等待的线程将会被激活。这样保证模拟并发请求的真实性* @param countDownLatch*/public SocketClientRequestThread(CountDownLatch countDownLatch , Integer clientIndex) {this.countDownLatch = countDownLatch;this.clientIndex = clientIndex;}@Overridepublic void run() {Socket socket = null;OutputStream clientRequest = null;InputStream clientResponse = null;try {socket = new Socket("localhost",83);clientRequest = socket.getOutputStream();clientResponse = socket.getInputStream();//等待,直到SocketClientDaemon完成所有线程的启动,然后所有线程一起发送请求this.countDownLatch.await();//发送请求信息clientRequest.write(("这是第" + this.clientIndex + " 个客户端的请求。").getBytes());clientRequest.flush();//在这里等待,直到服务器返回信息SocketClientRequestThread.LOGGER.info("第" + this.clientIndex + "个客户端的请求发送完成,等待服务器返回信息");int maxLen = 1024;byte[] contextBytes = new byte[maxLen];int realLen;String message = "";//程序执行到这里,会一直等待服务器返回信息(注意,前提是in和out都不能close,如果close了就收不到服务器的反馈了)while((realLen = clientResponse.read(contextBytes, 0, maxLen)) != -1) {message += new String(contextBytes , 0 , realLen);}SocketClientRequestThread.LOGGER.info("接收到来自服务器的信息:" + message);} catch (Exception e) {SocketClientRequestThread.LOGGER.error(e.getMessage(), e);} finally {try {if(clientRequest != null) {clientRequest.close();}if(clientResponse != null) {clientResponse.close();}} catch (IOException e) {SocketClientRequestThread.LOGGER.error(e.getMessage(), e);}}}
}

package testBSocket;import java.io.InputStream;
import java.io.OutputStream;
import java.net.ServerSocket;
import java.net.Socket;import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.log4j.BasicConfigurator;public class SocketServer1 {static {BasicConfigurator.configure();}/*** 日志*/private static final Log LOGGER = LogFactory.getLog(SocketServer1.class);public static void main(String[] args) throws Exception{ServerSocket serverSocket = new ServerSocket(83);try {while(true) {Socket socket = serverSocket.accept();//下面我们收取信息InputStream in = socket.getInputStream();OutputStream out = socket.getOutputStream();Integer sourcePort = socket.getPort();int maxLen = 2048;byte[] contextBytes = new byte[maxLen];//这里也会被阻塞,直到有数据准备好int realLen = in.read(contextBytes, 0, maxLen);//读取信息String message = new String(contextBytes , 0 , realLen);//下面打印信息SocketServer1.LOGGER.info("服务器收到来自于端口:" + sourcePort + "的信息:" + message);//下面开始发送信息out.write("回发响应信息!".getBytes());//关闭out.close();in.close();socket.close();}} catch(Exception e) {SocketServer1.LOGGER.error(e.getMessage(), e);} finally {if(serverSocket != null) {serverSocket.close();}}}
}

4-2-2、就像上文所述我们可以使用多线程来优化服务器端的处理过程:

客户端代码和上文一样,最主要是更改服务器端的代码:

package testBSocket;import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.ServerSocket;
import java.net.Socket;import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.log4j.BasicConfigurator;public class SocketServer2 {static {BasicConfigurator.configure();}private static final Log LOGGER = LogFactory.getLog(SocketServer2.class);public static void main(String[] args) throws Exception{ServerSocket serverSocket = new ServerSocket(83);try {while(true) {Socket socket = serverSocket.accept();//当然业务处理过程可以交给一个线程(这里可以使用线程池),并且线程的创建是很耗资源的。//最终改变不了.accept()只能一个一个接受socket的情况,并且被阻塞的情况SocketServerThread socketServerThread = new SocketServerThread(socket);new Thread(socketServerThread).start();}} catch(Exception e) {SocketServer2.LOGGER.error(e.getMessage(), e);} finally {if(serverSocket != null) {serverSocket.close();}}}
}/*** 当然,接收到客户端的socket后,业务的处理过程可以交给一个线程来做。* 但还是改变不了socket被一个一个的做accept()的情况。* @author yinwenjie*/
class SocketServerThread implements Runnable {/*** 日志*/private static final Log LOGGER = LogFactory.getLog(SocketServerThread.class);private Socket socket;public SocketServerThread (Socket socket) {this.socket = socket;}@Overridepublic void run() {InputStream in = null;OutputStream out = null;try {//下面我们收取信息in = socket.getInputStream();out = socket.getOutputStream();Integer sourcePort = socket.getPort();int maxLen = 1024;byte[] contextBytes = new byte[maxLen];//使用线程,同样无法解决read方法的阻塞问题,//也就是说read方法处同样会被阻塞,直到操作系统有数据准备好int realLen = in.read(contextBytes, 0, maxLen);//读取信息String message = new String(contextBytes , 0 , realLen);//下面打印信息SocketServerThread.LOGGER.info("服务器收到来自于端口:" + sourcePort + "的信息:" + message);//下面开始发送信息out.write("回发响应信息!".getBytes());} catch(Exception e) {SocketServerThread.LOGGER.error(e.getMessage(), e);} finally {//试图关闭try {if(in != null) {in.close();}if(out != null) {out.close();}if(this.socket != null) {this.socket.close();}} catch (IOException e) {SocketServerThread.LOGGER.error(e.getMessage(), e);}}}
}

4-2-3、看看服务器端的执行效果:

我相信服务器使用单线程的效果就不用看了,我们主要看一看服务器使用多线程处理时的情况:

这里写图片描述

4-2-4、问题根源

那么重点的问题并不是“是否使用了多线程”,而是为什么()、read()方法会被阻塞。即:异步IO模式 就是为了解决这样的并发性存在的。但是为了说清楚异步IO模式,在介绍IO模式的时候,我们就要首先了解清楚,什么是 阻塞式同步、非阻塞式同步、多路复用同步模式。

这里我要特别说明一下,在一篇网文《Java NIO与IO的详细区别(通俗篇)》中,作者主要讲到了自己对非阻塞方式下硬盘操作的理解。按照我的看法,只要有IO的存在,就会有阻塞或非阻塞的问题,无论这个IO是网络的,还是硬盘的。这就是为什么基本的JAVA NIO框架中会有(而且在操作系统级别是不支持非阻塞模式的)、和的原因。NIO并不只是为了解决磁盘读写的性能而存在的,它的出现原因、要解决的问题更为广阔;但是另外一个方面,文章作者只是表达自己的思想,没有必要争论得“咬文嚼字”。

API文档中对于 .() 方法的使用描述:

for a to be made to this and it. The until a is made.

那么我们首先来看看为什么.()会被阻塞。这里涉及到阻塞式同步IO的工作原理:

这里写图片描述

这里写图片描述

===================================

(内容太多,分上下两篇文章发布)

下篇包括:调用方式概要(RMI、RPC、MQ),整合手段概要(服务治理、ESB实现、自行实现

关于我们

最火推荐

小编推荐

联系我们


版权声明:本站内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至 88@qq.com 举报,一经查实,本站将立刻删除。备案号:桂ICP备2021009421号
Powered By Z-BlogPHP.
复制成功
微信号:
我知道了