Thread-Per-Message模式之网络聊天客户端


prtyaa
prtyaa 2023-12-30 21:30:39 50771 赞同 0 反对 0
分类: 资源
Thread-Per-Message设计模式就是 为每一个消息处理开辟一个线程,让消息能够被并发处理,提高系统的吞吐量。

最典型的就是Socket的聊天客户端,在学习java的时候很多人都做过这个案例,每一个连接到服务器的连接都会被放进一个独立的线程中,当连接数超过能够处理的最大连接数时,将被放入等待队列中。

服务端代码:

//聊天室服务端
public class ChatServer {
    //定义端口号
    private final int port;
    //自定义的线程池
    private ThreadPool threadPool;
    //服务端socket
    private ServerSocket serverSocket;
   //默认端口号
    private final static int DEFAULT_PORT = 13312;
    public ChatServer(){
        this(DEFAULT_PORT);
    }
    public ChatServer(int port){
        this.port = port;
    }
    public void startServer() throws IOException {
        //初始化线程池
        this.threadPool = new BasicThreadPool(2,4,8,1000);
        this.serverSocket = new ServerSocket(port);
        this.serverSocket.setReuseAddress(true);
        System.out.println("服务器已经开启,端口号:"+port);
        this.listen();
    }
    private void listen() throws IOException {
        for (;;){
            //监听客户端连接
            Socket client = serverSocket.accept();
            this.threadPool.execute(new ClientHandler(client));
        }
    }
}

线程池使用的是之前自己实现的线程池。

客户端代码

public class ClientHandler implements Runnable {
    //客户端socket
    private final Socket socket;

    //客户端的id
    private final String clientId;

    public ClientHandler(final Socket socket) {
        this.socket = socket;
        this.clientId = socket.getInetAddress().getHostAddress()+":"+socket.getPort();
    }

    @Override
    public void run() {
        try {
            this.chat();
        } catch (IOException e) {
            e.printStackTrace();
        }finally {
            this.release();
        }

    }

    private void chat() throws IOException {
        BufferedReader bufferedReader = wrap2Reader(this.socket.getInputStream());
        PrintStream printStream = wrap2Print(this.socket.getOutputStream());
        String received;
        while ((received=bufferedReader.readLine())!=null){
            //将客户端发送的消息输出
            System.out.printf("客户端:%s-消息:%s\n",clientId,received);
            if (received.equals("quit")){
                write2Client(printStream,"客户端关闭");
                socket.close();
                break;
            }
        }
        write2Client(printStream,"Server:"+received);
    }

    //将字节流封装成BufferReader缓冲字符流
    private BufferedReader wrap2Reader(InputStream inputStream){
        return new BufferedReader(new InputStreamReader(inputStream));
    }

    //将输出字节流封装成PrintStream

    private PrintStream wrap2Print(OutputStream outputStream){
        return new PrintStream(outputStream);
    }

    //发送消息
    private void write2Client(PrintStream print,String message){
        print.println(message);
        print.flush();
    }
    private void release(){
        try {
            if (socket!=null){
                socket.close();
            }
        } catch (IOException e) {
            if (socket!=null){
                SocketCleaningTracker.tracker(socket);
            }
        }
    }
}

发生异常时候关闭socket连接代码

public class SocketCleaningTracker {
    private static final ReferenceQueue<Object> queue = new ReferenceQueue<>();

    {
        new Cleaner().start();
    }

    public static void tracker(Socket socket) {
        new Tracker(socket, queue);
    }

    private static class Cleaner extends Thread {
        private Cleaner() {
            super("SocketCleaningTracker");
            setDaemon(true);
        }

        @Override
        public void run() {
            for (; ; ) {
                try {
                    Tracker tracker = (Tracker) queue.remove();
                    tracker.close();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }

    private static class Tracker extends PhantomReference<Object> {
        private final Socket socket;

        Tracker(Socket socket, ReferenceQueue<? super Object> queue) {
            super(socket, queue);
            this.socket = socket;
        }

        public void close() {
            try {
                socket.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

解释一下上面的代码,当客户端出现异常被意外关闭,socket.close()并不能保证一定能够关闭资源,所以就需要这一段代码,在垃圾回收器对socket对象回收的时候再尝试清理。

最后说几句:以前大二的时候期末实训就是做这个作业,网上的代码很多,有的还结合的GUI,但以前始终云里雾里,其实原理非常的简单。

运行这段程序很简单,只需要启动服务端

public class Test {
    public static void main(String[] args) throws IOException {
        new ChatServer().startServer();
    }
}

打开CMD或者PowerShell,使用talnet命令去连接到服务器端口,也可以打开多个命令窗口去连接。

如果您发现该资源为电子书等存在侵权的资源或对该资源描述不正确等,可点击“私信”按钮向作者进行反馈;如作者无回复可进行平台仲裁,我们会在第一时间进行处理!

评价 0 条
prtyaaL2
粉丝 1 资源 1949 + 关注 私信
最近热门资源
银河麒麟桌面操作系统备份用户数据  125
统信桌面专业版【全盘安装UOS系统】介绍  120
银河麒麟桌面操作系统安装佳能打印机驱动方法  111
银河麒麟桌面操作系统 V10-SP1用户密码修改  105
最近下载排行榜
银河麒麟桌面操作系统备份用户数据 0
统信桌面专业版【全盘安装UOS系统】介绍 0
银河麒麟桌面操作系统安装佳能打印机驱动方法 0
银河麒麟桌面操作系统 V10-SP1用户密码修改 0
作者收入月榜
1

prtyaa 收益393.62元

2

zlj141319 收益218元

3

1843880570 收益214.2元

4

IT-feng 收益209.03元

5

风晓 收益208.24元

6

777 收益172.71元

7

Fhawking 收益106.6元

8

信创来了 收益105.84元

9

克里斯蒂亚诺诺 收益91.08元

10

技术-小陈 收益79.5元

请使用微信扫码

加入交流群

请使用微信扫一扫!