技术头条 - 一个快速在微博传播文章的方式     搜索本站
您现在的位置首页 --> Java --> [Java基础教程]第十三章-Java多线程

[Java基础教程]第十三章-Java多线程

浏览:1762次  出处信息

在前面我们介绍的客户端与服务器端通信的小程序,当一个客户端退出通信时服务器端也结束了,不满足我们对于服务器端的定位。通常服务器应该可以同时接受多个客户端的链接,并且客户端结束后服务器端应该正常运行。所以服务器端接受链接的代码应该一直接受新的客户链接,每个客户有一个独立的程序处理,相互之间不影响,这种场景成为并发。那么什么是并发,我们一起了解一下。

在介绍并发之前我们先了解一下串行和并行:
热闹的景点,买票人很多,这时只有一个窗口售票,大家排队依次买票就可以理解为串行。
排队人太多了,旁边又加开了几个窗口,多人在不同的窗口同时买票可以理解为并行。
如果只能开一个窗口,这时好多着急的人围上来,有问价格的,有掏钱的,又有取票的,在这个过程中售票员在同时应对多个买票人,可以理解为并发。

我们经常在计算机上一边听歌一边写文档(或者处理其他的事情),这就是一种并发行为,表面看两个程序是同时进行,为什么不是并行呢?计算机只有一个CPU所以只能支持一个线程运行。有人拍砖说:我家里计算机是多核CPU可以同时支持多个线程运行,确实是这样,为此我也特地去百度了一下有如下几点认知:
1、虽然是多核CPU但是,系统总线,内存是共用的,在加载内存数据时仍然需要串行访问。
2、目前的程序设计语言仍然是过程型开发,没有和好的方法能自动的切割任务使并行计算。
3、操作系统在线程调度时随着内核的增加复杂性递增,目前最多支持8核
所以基于以上认知,我们在讨论并发和同步这个问题时仍然按照CPU单核来讨论。
那么计算机是如何做到一边播放歌曲一边支持文档编辑呢?操作系统会把CPU的执行时间划分微妙级别的时间片段,每一个时间片内去调度一个线程执行,多个线程不断的切换执行,因此在人类可感知的时间段(秒级)内线程是同时执行的,所以多个线程在某个时间段内的同时执行就是并发。串行、并行和并发如下图所示:

串行并行并发

Java对并发场景提供了线程类Runnable接口和Thread类支持,下面是重构后的服务器端代码:

publicstaticvoidmain(String[] args) throwsIOException {
    ServerSocket serverSocket = newServerSocket(8080);
    System.out.println("监听IP:"+ serverSocket.getInetAddress().getLocalHost().getHostAddress() + "; 监听端口:"
                + serverSocket.getLocalPort());
    inti = 0;
    // 主线程一直在等待客户端连接
    while(true) {
        Socket acceptSocket = serverSocket.accept();
        // 接受到一个客户请求就会创建一个新线程处理
        String sessionName = "session"+ i;
        Thread thread = newSessionProcessor(sessionName, acceptSocket);
        thread.start();
    }
}  
publicclassSessionProcessor extendsThread {
    /**
     * 会话名称
     */
    privateString name;
    /**
     * 与客户端连接的socket
     */
    privateSocket acceptSocket;
    publicSessionProcessor(String name, Socket acceptSocket) {
        this.name = name;
        this.acceptSocket = acceptSocket;
    }
    /* (non-Javadoc)
     * @see java.lang.Runnable#run()
     */
    @Override
    publicvoidrun() {
        InputStreamReader isr = null;
        BufferedReader br = null;
        try{
            InputStream inputStream = acceptSocket.getInputStream();
            isr = newInputStreamReader(inputStream, "utf-8");
            br = newBufferedReader(isr);
            String line = null;
            while((line = br.readLine()) != null) {
                System.out.println(name + "客户输入:"+ line);
            }
        } catch(UnsupportedEncodingException e) {
            e.printStackTrace();
        } catch(IOException e) {
            e.printStackTrace();
        } finally{
            try{
                if(br != null) {
                    br.close();
                }
                if(isr != null) {
                    isr.close();
                }
            } catch(IOException e) {
                e.printStackTrace();
            }
        }
    }
}  

main方法启动的线程,做为主线程一直在监听新的链接,每当有新的用户链接进来会创建一个新的线程”Thread thread = new SessionProcessor(sessionName, acceptSocket);”,并启动线程”thread.start()”.
新创建的线程执行逻辑是我们在继承了Thread的类SessionProcessor的run方法。从socket中获取输入流,读取客户端发送的字符串。
多线程实现的关键点:
1、继承Thread类或者实现Runnable接口,并重写run方法
2、创建线程并调用start方法。

为了以后方便查询,我们把聊天记录存储到文件中,新增ChatLogManager用于管理聊天记录的存储,每个SessionProcessor收到消息后调用ChatLogManager的save(String)方法,为了避免频繁打开聊天记录文件,我们先把聊天记录缓存到List中,每10条保存一次,代码重构如下:

publicclassChatLogManager {
    privateString fileName = "chatlog.txt";
    privateList<String> tempChatLogList = newArrayList<>();
    privateintmaxSize = 10;
    publicvoidsave(String chatLog) throwsIOException {
        // 因为文件访问速度慢,所以我们积累10条记录,写入一次文件
        tempChatLogList.add(chatLog);
        if(tempChatLogList.size() == maxSize) {
            File chatLogFile = newFile(fileName);
            if(!chatLogFile.exists()) {
                chatLogFile.createNewFile();
            }
            FileOutputStream fos = newFileOutputStream(chatLogFile, true);
            OutputStreamWriter osw = newOutputStreamWriter(fos);
            BufferedWriter bw = newBufferedWriter(osw);
            for(inti = 0; i < maxSize; i++) {
                bw.append(tempChatLogList.get(i) + "n");
            }
            // 记录到文件后清空列表
            tempChatLogList.clear();
            bw.flush();
            bw.close();
            osw.close();
            fos.close();
        }
    }
}  
 
//监听线程创建SessionProcessor新增ChatLogManager入参
publicstaticvoidmain(String[] args) throwsIOException {
    ServerSocket serverSocket = newServerSocket(8080);
    System.out.println("监听IP:"+ serverSocket.getInetAddress().getLocalHost().getHostAddress() + "; 监听端口:"
                + serverSocket.getLocalPort());
    inti = 0;
    ChatLogManager manager = newChatLogManager();
    // 主线程一直在等待客户端连接
    while(true) {
        Socket acceptSocket = serverSocket.accept();
        // 接受到一个客户请求就会创建一个新线程处理
        String sessionName = "session"+ i;
        Thread thread = newSessionProcessor(sessionName, acceptSocket, manager);
        thread.start();
    }
} 
 
//SessionProcessor新增ChatLogManager属性和构造方法入参
privateChatLogManager manager;
publicSessionProcessor(String name, Socket acceptSocket, ChatLogManager manager) {
    this.name = name;
    this.acceptSocket = acceptSocket;
    this.manager = manager;
}  
 
//SessionProcessor的run方法收到消息后调用ChatLogManager的save方法
br = newBufferedReader(isr);
String line = null;
while((line = br.readLine()) != null) {
    manager.save(name + "客户输入:"+ line);
}
System.out.println("接受完毕");  

使用telnet链接测试一下,聊天记录保存到工程根目录下的chatlog.txt,记得一定要输入10条的倍数,否则会有数据不能保存的。

为了更真实的模拟现实场景我们使用代码模拟10个用户同时访问的情景,创建线程类SenderProcessor,用于连接服务器端,并创建一个main方法创建10个线程,并启动,代码如下:

publicclassSenderProcessor extendsThread {
    @Override
    publicvoidrun() {
        try{
            Socket socket = newSocket("127.0.0.1", 8080);
            OutputStream outputStream = socket.getOutputStream();
            BufferedWriter writer = newBufferedWriter(newOutputStreamWriter(outputStream));
            for(inti = 0; i < 10; i++) {
                writer.write("发送的第"+ i + "句话");
                writer.newLine();
                writer.flush();
            }
            writer.close();
            outputStream.close();
            socket.close();
            System.out.println("输出完毕");
        } catch(UnknownHostException e) {
            e.printStackTrace();
        } catch(IOException e) {
            e.printStackTrace();
        }
    }
} 
publicstaticvoidmain(String[] args) {
    for(inti = 0; i < 10; i++) {
        Thread thread = newSenderProcessor();
        thread.start();
    }
} 

好了,先运行Server的main方法,再运行Client的main方法,从client的10个线程共发送了100句聊天(每个线程10个聊天记录),但是查看chatlog.txt,发现保存的数据少了,多运行几次(注意每次把文件清空),发现每次缺少的数据个数和数据内容都不相同,为什么呢?还记得前面介绍的并发知识吗,同一时刻只有一个线程在执行,多线程时每个线程只是在非常小的时间内执行,其他非执行线程会等待,所以线程的执行时断断续续的,假如其中一个线程A在ChatLogManager的”tempChatLogList.clear();”被切换到另外一个线程B执行,B执行的是“tempChatLogList.add(chatLog);”,刚执行完,又切换到了A,然后”tempChatLogList.clear();”被执行了,B添加的聊天记录被清空了。此时就需要进行同步处理,多个线程共享资源(这里指文件和缓存List),为了解决资源竞争,需要保持线程串行访问共享资源,Java中关键字”synchronized”和接口“java.util.concurrent.locks.Lock”相关的类可以解决这种问题。这里我们使用关键字”synchronized”,代码如下:

publicsynchronizedvoidsave(String chatLog) throwsIOException {
    // 因为文件访问速度慢,所以我们积累10条记录,写入一次文件
    tempChatLogList.add(chatLog);
    if(tempChatLogList.size() == maxSize) {
        File chatLogFile = newFile(fileName);
        if(!chatLogFile.exists()) {
            chatLogFile.createNewFile();
        }
        FileOutputStream fos = newFileOutputStream(chatLogFile, true);
        OutputStreamWriter osw = newOutputStreamWriter(fos);
        BufferedWriter bw = newBufferedWriter(osw);
        for(inti = 0; i < maxSize; i++) {
            bw.append(tempChatLogList.get(i) + "n");
        }
        // 记录到文件后清空列表
        tempChatLogList.clear();
        bw.flush();
        bw.close();
        osw.close();
        fos.close();
    }
}  

再次运行,就会发现慢慢的100个聊天记录都在文件中了。多线程能很大的提升程序员运行的效率,但是如果处理不好同步会得到不一样的预期结果。所以线程有风险,使用需谨慎。

小练习:
使用多线程分析该工程中所有的Java文件中a-z出现的次数,不区分大小写。

课程中的代码:

packagecom.sunhaojie.learntest.thirteenth;
 
importjava.io.BufferedWriter;
importjava.io.File;
importjava.io.FileOutputStream;
importjava.io.IOException;
importjava.io.OutputStreamWriter;
importjava.util.ArrayList;
importjava.util.List;
 
/**
 * @ClassName ChatLogManager
 * @Description 聊天记录管理类
 *
 * @author sunhaojie 3113751575@qq.com
 * @date 2016年2月13日 下午10:07:43
 */
publicclassChatLogManager {
    privateString fileName = "chatlog.txt";
 
    privateList<String> tempChatLogList = newArrayList<>();
    privateintmaxSize = 10;
 
    publicsynchronizedvoidsave(String chatLog) throwsIOException {
        // 因为文件访问速度慢,所以我们积累10条记录,写入一次文件
        tempChatLogList.add(chatLog);
        if(tempChatLogList.size() == maxSize) {
            File chatLogFile = newFile(fileName);
            if(!chatLogFile.exists()) {
                chatLogFile.createNewFile();
            }
 
            FileOutputStream fos = newFileOutputStream(chatLogFile, true);
            OutputStreamWriter osw = newOutputStreamWriter(fos);
            BufferedWriter bw = newBufferedWriter(osw);
            for(inti = 0; i < maxSize; i++) {
                bw.append(tempChatLogList.get(i) + "n");
            }
            // 记录到文件后清空列表
            tempChatLogList.clear();
            bw.flush();
            bw.close();
            osw.close();
            fos.close();
        }
    }
}
packagecom.sunhaojie.learntest.thirteenth;
 
importjava.io.BufferedReader;
importjava.io.IOException;
importjava.io.InputStream;
importjava.io.InputStreamReader;
importjava.io.UnsupportedEncodingException;
importjava.net.Socket;
 
/**
 * @ClassName SessionProcessor
 * @Description 用户会话
 *
 * @author sunhaojie 3113751575@qq.com
 * @date 2016年2月13日 下午2:07:31
 */
publicclassSessionProcessor extendsThread {
 
    /**
     * 会话名称
     */
    privateString name;
    /**
     * 与客户端连接的socket
     */
    privateSocket acceptSocket;
 
    /**
     * 聊天记录管理类
     */
    privateChatLogManager manager;
 
    publicSessionProcessor(String name, Socket acceptSocket, ChatLogManager manager) {
        this.name = name;
        this.acceptSocket = acceptSocket;
        this.manager = manager;
    }
 
    /* (non-Javadoc)
     * @see java.lang.Runnable#run()
     */
    @Override
    publicvoidrun() {
        InputStreamReader isr = null;
        BufferedReader br = null;
        try{
            InputStream inputStream = acceptSocket.getInputStream();
            isr = newInputStreamReader(inputStream, "utf-8");
            br = newBufferedReader(isr);
            String line = null;
            while((line = br.readLine()) != null) {
                manager.save(name + "客户输入:"+ line);
            }
            System.out.println("接受完毕");
        } catch(UnsupportedEncodingException e) {
            e.printStackTrace();
        } catch(IOException e) {
            e.printStackTrace();
        } finally{
            try{
                if(br != null) {
                    br.close();
                }
                if(isr != null) {
                    isr.close();
                }
            } catch(IOException e) {
                e.printStackTrace();
            }
        }
 
    }
 
}
packagecom.sunhaojie.learntest.thirteenth;
importjava.io.IOException;
importjava.net.ServerSocket;
importjava.net.Socket;
/**
 * 
 * @ClassName SocketServerTest
 * @Description 服务器端测试类
 *
 * @author sunhaojie 3113751575@qq.com
 * @date 2016年2月12日 下午10:53:45
 */
publicclassSocketServerTest {
    @SuppressWarnings({ "static-access", "resource"})
    publicstaticvoidmain(String[] args) throwsIOException {
        ServerSocket serverSocket = newServerSocket(8080);
        System.out.println("监听IP:"+ serverSocket.getInetAddress().getLocalHost().getHostAddress() + "; 监听端口:"
                + serverSocket.getLocalPort());
        inti = 0;
        ChatLogManager manager = newChatLogManager();
        // 主线程一直在等待客户端连接
        while(true) {
            Socket acceptSocket = serverSocket.accept();
            // 接受到一个客户请求就会创建一个新线程处理
            String sessionName = "session"+ i++;
            Thread thread = newSessionProcessor(sessionName, acceptSocket, manager);
            thread.start();
        }
    }
}
packagecom.sunhaojie.learntest.thirteenth;
importjava.io.BufferedWriter;
importjava.io.IOException;
importjava.io.OutputStream;
importjava.io.OutputStreamWriter;
importjava.net.Socket;
importjava.net.UnknownHostException;
/**
 * @ClassName SenderProcessor
 * @Description TODO
 *
 * @author sunhaojie 3113751575@qq.com
 * @date 2016年2月13日 下午10:22:36
 */
publicclassSenderProcessor extendsThread {
    @Override
    publicvoidrun() {
        try{
            Socket socket = newSocket("127.0.0.1", 8080);
            OutputStream outputStream = socket.getOutputStream();
            BufferedWriter writer = newBufferedWriter(newOutputStreamWriter(outputStream));
            for(inti = 0; i < 10; i++) {
                writer.write("发送的第"+ i + "句话");
                writer.newLine();
                writer.flush();
            }
            writer.close();
            outputStream.close();
            socket.close();
            System.out.println("输出完毕");
        } catch(UnknownHostException e) {
            e.printStackTrace();
        } catch(IOException e) {
            e.printStackTrace();
        }
    }
}
packagecom.sunhaojie.learntest.thirteenth;
/**
 * 
 * @ClassName SocketClientTest
 * @Description 测试端输出类
 *
 * @author sunhaojie 3113751575@qq.com
 * @date 2016年2月12日 下午10:54:03
 */
publicclassSocketClientTest {
    publicstaticvoidmain(String[] args) {
        for(inti = 0; i < 10; i++) {
            Thread thread = newSenderProcessor();
            thread.start();
        }
    }
}

建议继续学习:

  1. 浅析C++多线程内存模型    (阅读:6944)
  2. C++ 多线程编程总结    (阅读:6653)
  3. 多线程队列的算法优化    (阅读:6376)
  4. 程序中的“多线程”    (阅读:5366)
  5. php多线程扩展    (阅读:3997)
  6. 为什么在多线程程序中要慎用volatile关键字?    (阅读:3846)
  7. Ameba , 一个简单的 lua 多线程实现    (阅读:3432)
  8. 多线程程序中操作的原子性    (阅读:2853)
  9. 极不和谐的 fork 多线程程序    (阅读:2568)
  10. linux下多线程的创建与等待详解    (阅读:2539)
QQ技术交流群:445447336,欢迎加入!
扫一扫订阅我的微信号:IT技术博客大学习
© 2009 - 2024 by blogread.cn 微博:@IT技术博客大学习

京ICP备15002552号-1