Thread线程

Posted by Chejdj Blog on July 12, 2018

在实际的代码当中,自己比较少用到多线程的知识,这里做一个知识的总结。

java的内存模型

Java内存模型就是:线程之间的共享变量存储在主存中,但是每一个线程都有一个私有的本地内存也叫工作内存,这个内存中存储了线程共享变量的副本,所以容易造成数据不一致现象,因为我们不知道什么时候会把私有内存中的共享变量给更新到主存中去。

创建线程的几种方法

  1. 继承Thread的类,重写它的run()方法
  2. 实现Runnable接口,然后在实例化线程的时候,传入该参数
  3. 实现Callable接口,重写call()方法

    Callable是Executor这个线程池框架中的功能类,相比较Runnable具有更过功能: 1.在任务执行后,可以提供一个返回值,Runnable不行
    2.Callable中的call()可以抛出异常,Runnable不行(这里面的抛,不是throw,是里面如果有异常,run里面必须处理,但是call()不用)
    3.运行Callable的时候可以拿到一个Future对象,这个对象表示类异步对象的计算结果,可以检查计算是否完成,如果在主线程调用Future.get()方法想要获取结果,会导致主线程阻塞到call()返回结果为止

状态转换图
线程之间的通信:

  1. 等待/通知机制(wait/notify)(如果单个产生消费,wait条件用if就行,多个就需要while并且notify可能导致生产者唤醒生产者,假死现象产生)
  2. 管道通信(pipeStream)一个线程发送数据到输出管道,另一个线程从输入管道中读取数据,在Java中提供了4个类来使线程间可以进行通信
    1)PipedInputStream 和PipedOutputStream 2)PipedReader和PipedWriter

线程之间管道通信的代码方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public class Main{
  try{
   WriteData writeData=new WriteData();
   ReadData readData=new ReadData();
   PipedOutputStream outputStream=new PipedOutputStream();
   PipedInputStream inputStream=new PipedInputStream();
   outputStream.connect(inputStream);//连接就可以使用了
   ThreadRead threadRead=new ThreadRead(readData,inputStream);
   threadRead.start();
   Thread.sleep(2000);
   ThreadWrite threadWrite=new ThreadWrite(writeData,ouputStream);
   threadWrite.start();
     }catch(IOException e){
      e.printStackTrace();
   }

}

常规的等待通知实例

实现两个线程之间的交互打印,使用wait和notify的场景

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
public class thread_test{
    private static volatile boolean flag=true; //true代表线程1在执行
    public static void main(String[] args){
        Object a=new Object();
        int[] data={1,2,3,4};
        thread_test test=new thread_test();
        thread_test.ThreadA threadA= test.new ThreadA(data,a);
        thread_test.ThreadB threadB= test.new ThreadB(data,a);
        threadA.start();
        threadB.start();
    }
      class ThreadA extends Thread{
        private int[] nums;
        private Object a;
        public ThreadA(int[]nums,Object a){
                this.nums=nums;
                this.a=a;
        }
        public void run(){
            for(int i=0;i<nums.length;i++){
                synchronized(a){
                    while(true){
                        if(flag){
                            System.out.println(nums[i]);
                            flag=false;
                            a.notify();
                            break;
                        }else{
                            try{
                            a.wait();
                            }catch(InterruptedException e){
                             e.printStackTrace();   
                            }
                        }
                    }
                }
            }
        }
    }
      class ThreadB extends Thread{
        private int[] nums1;
        private Object a;
        public ThreadB(int[]nums1,Object a){
            this.nums1=nums1;
            this.a=a;
        }
        public void run(){
            for(int i=0;i<nums1.length;i++){
               synchronized(a){
                while(true){
                    if(!flag){
                        System.out.println(nums1[i]);
                        flag=true;
                        a.notify();
                        break;
                    }else{
                        try{
                        a.wait();
                        }catch(InterruptedException e){
                            e.printStackTrace();
                        }
                    }
                }
               }
            }
        }
    }
}

线程阻塞的方法,sleep和wait和锁

  1. sleep()释放CPU执行权,但是不释放同步锁
  2. wait()释放CPU执行权,也释放同步锁,使得其他线程可以使用同步控制快或者方法
  3. sleep()可以在任意地方使用,wait()只能在同步代码块方法或者同步代码中使用

线程的中断

正常的线程在run()方法执行完毕,或者在方法中出现没有捕获的异常的时候,线程终止。
interrupt()用这个方法请求中断线程,该方法会把线程的中断标志位标为true,我们需要在线程中检查这个标志位

1
2
3
4
5
public void run(){
   while(!Thread,=.currentThread.isInterrupted()){
      //TODO ....
   }
}

还可以调用Thread.interrupted()方法对中断标志位进行复位,在中断检查中我们需要注意,如果一个线程被阻塞,线程在检查中断标志位为true的时候,会在阻塞方法处抛出InterruptedException异常,并且在抛出异常前会将中断标志位复位,重新设置为false。对于这种情况,有两种处理方法,一种在阻塞处捕获InterruptedException然后设置设置interrupt(),让外层循环判断该线程的应该怎么样,或者直接抛出这个异常
安全的终止线程一般都是判断interrupt标志位,或者自己设置标志位

重入锁与条件对象

synchronizedReentrantLock两个都是可重入锁,我们以ReentrantLock为例,写一个如果对已经获得锁,但是因为不满足条件而需要让出锁阻塞的代码例子

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
public class Main{
   private int[] accounts;
   private ReentrantLock lock;
   private Condition condition;
   public Main(int n){
      accounts=new int[n];
      for(int i=0;i<n;i++){
         accounts[i]=i;
      }
      lock=new ReentrantLock();
      condition=lock.newCondition();
   }
    //转钱
    public void transform(int from,int to,int money){
        try{
            lock.lock();
            while(accounts[from]<money){
                condition.await();
            }
            accounts[from]=accounts[from]-money;
            accounts[to]=accounts[to]+money;
            condition.signalAll();
        }finally{
            lock.unlock();
        }
    }
}

java中原子性,可见性,有序性

原子性: 只有赋值操变量的读取和赋值操作是原子型的,不可以打断
可见性: 一个线程的修改状态对于另一个线程是可见的,volatile可以保证
有序性: Java允许编译器和处理器对指令进行重排序
volatile禁止指令重排序,保证执行volatile变量的时候,在其前面的操作已经全部执行完全,并且结果对后面操作可见
volatile的性能好于synchronized,但是它无法保证原子性,所以需要保证下面两种情况:

  1. 对变量的操作不依赖于当前值
  2. 该变量没有包含在具有其他变量的不变式中(不变式就是大于,小于,的公式里面没有其他变量,包括使用volatile)
    volatile经常使用的场景就是:
  3. 单纯做一个标志位
  4. 双重检查,实现一个单例模式
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
publci class Singleton{
    private volatile Singleton singleton;
    private Singleton(){}
    public static Singleton(){
        if(singleton == null){
            synchornized(Singleton.class){
                if(singleton == null){
                    singleton=new Singleton();
                }
            }
        }
        return singleton;s
    }

}

java中的线程池

java中使用Executor用于任务的提交和执行,Executor中最核心的成员是ThreadPoolExecutor

1
2
3
4
5
6
public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,
        TimeUnit unit,
        BlockingQueue<Runnable> workQueue,
        ThreadFactory threadFactory,
        RejectedExecutionHandler handler
        )
  • corePoolsize : 核心线程数量,默认线程池为空,只有当前的线程小于corePoolsize,就创建新线程来处理任务,如果等于或者大于,不创建
  • maximumPoolsize: 线程池允许创建的最大线程数,如果任务满了,并且线程数小于这个数,就会创建新的线程执行任务
  • keepAliveTime: 非核心线程的闲置的超时时间
  • TimeUnit: keepAliveTime参数的时间单位
  • workQueue: 任务队列,这个任务队列是BlockingQueue类型的,阻塞队列,当前线程大于corePoolsize就会放入这个队列中(不设置大小,默认无穷大,int类)
  • ThreadFactory: 线程工厂,为线程起名字,创建线程的
  • RejectedExecutionHandler: 饱和策略,当任务队列和线程池满了,采用什么策略,默认是AbordPolicy,无法处理新任务还抛出RejectExecutionException异常,还有3种策略
    1. CallerRunsPolicy:使用调用者所在线程处理任务
    2. DiscardPolicy:将这个任务删除
    3. DiscardOldestPolicy: 丢弃队列最近任务,执行当前任务。(这个非常好)
      我们创建线程池一般会使用Executors这个类来创建
1
2
3
4
Executors.newSingleThreadExecutor(); //核心线程数一个,最大线程数也是一个,LinkedBlockingQueue默认容量为Interger.MAX,keepAliveTime 为0
Executors.newFixedThreadPool(n);//核心线程数为n,最大线程数也是n,其他一样,keepAliveTime 为0
Executors.newCachedThreadPool();//核心线程数0,最大线程数Integer.MAX,keepAliveTime 为0
Executors.newScheduledThreadPool(corePoolSize);//它的任务阻塞队列是 DelayedWorkQueue这个队列支持延时获取元素,可以指定item的到期时间,才可以从队列取出

实际开发当中

我们在实际的开发当中,应该优先使用java.util.concurrent包提供的类