什么是 JUC
JUC 是 java.util.concurrent 的简写。在 jdk 官方手册中可以看到 juc 相关的 jar 包有三个。
用中文概括一下,JUC 的意思就是 java 并发编程工具包
线程和进程
如果不能使用一句话说出来的技术,不扎实!
进程:一个程序,QQ.exe Music.exe 程序的集合
一个进程往往可以包含多个线程,至少包含一个!
Java 默认有几个线程? 2 个 main、GC
线程:线程是程序执行中一个单一的顺序控制流程
对于 Java 而言:Thread、Runnable、Callable
Java 真的可以开启线程吗? 开不了
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 JAVA public synchronized void start() { /** * This method is not invoked for the main method thread or "system" * group threads created/set up by the VM. Any new functionality added * to this method in the future may have to also be added to the VM. * * A zero status value corresponds to state "NEW". */ if (threadStatus != 0) throw new IllegalThreadStateException(); /* Notify the group that this thread is about to be started * so that it can be added to the group's list of threads * and the group's unstarted count can be decremented. */ group.add(this); boolean started = false; try { start0(); started = true; } finally { try { if (!started) { group.threadStartFailed(this); } } catch (Throwable ignore) { /* do nothing. If start0 threw a Throwable then it will be passed up the call stack */ } } } // 本地方法,底层的C++,Java无法直接操作硬件 private native void start0();
并发、并行 并发编程:并发、并行
并发(多线程操作同一个资源)
CPU 一核,模拟出来多条线程,天下武功,唯快不破,快速交替
并行(多个人一起行走)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 JAVA package com.ouwen.demo01; /** * @author IRVING * @create 2021-04-04 14:20 */ public class Test { public static void main(String[] args) { // 获取CPU的核心数 // CPU密集型,IO密集型 System.out.println(Runtime.getRuntime().availableProcessors()); } }
并发编程的本质:充分利用 CPU 的资源
所有的公司都很看重!
线程有几个状态 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 JAVA public enum State { //新生 NEW, //运行 RUNNABLE, //阻塞 BLOCKED, //等待 WAITING, //超时等待 TIMED_WAITING, //终止 TERMINATED; }
wait/sleep 的区别
来自不同的类
wait => Object
sleep => Thread
关于锁的释放
wait 会释放锁,sleep 睡觉了,抱着锁睡觉,不会释放!
使用的范围是不同的
wait:只能在同步代码块中使用
sleep:可以在任何地方睡
Lock 锁(重点) 传统 synchronized 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 JAVA package com.ouwen.demo01; /** * 真正的多线程开发,公司中的开发,降低耦合性 * * 线程就是一个单独的资源类,没有任何附属的操作! * * 1、 属性、方法 * * @author IRVING * @create 2021-04-04 14:42 */ public class SaleTicketDemo01 { public static void main(String[] args) { // 并发:多线程操作同一个资源类, 把资源类丢入线程 Ticket ticket = new Ticket(); // @FunctionalInterface 函数式接口,jdk1.8 lambda表达式 (参数)->{ 代码 } new Thread(() -> { for (int i = 1; i < 40; i++) { ticket.sale(); } }, "A").start(); new Thread(() -> { for (int i = 1; i < 40; i++) { ticket.sale(); } }, "B").start(); new Thread(() -> { for (int i = 1; i < 40; i++) { ticket.sale(); } }, "C").start(); } } // 资源类 OOP class Ticket { // 属性、方法 private int number = 30; // 卖票的方式 // synchronized 本质: 队列,锁 public synchronized void sale() { if (number > 0) { System.out.println(Thread.currentThread().getName() + "卖出了" + (number--) + "票,剩余:" + number); } } }
Lock 接口
公平锁:十分公平;可以先来后到
非公平锁:十分不公平;可以插队(默认)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 JAVA package com.ouwen.demo01; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; /** * @author IRVING * @create 2021-04-04 14:42 */ public class SaleTicketDemo02 { public static void main(String[] args) { // 并发:多线程操作同一个资源类, 把资源类丢入线程 Ticket2 ticket = new Ticket2(); new Thread(() -> { for (int i = 1; i < 40; i++) { ticket.sale(); } }, "A").start(); new Thread(() -> { for (int i = 1; i < 40; i++) { ticket.sale(); } }, "B").start(); new Thread(() -> { for (int i = 1; i < 40; i++) { ticket.sale(); } }, "C").start(); } } /** * Lock三部曲 * 1.new ReentrantLock(); * 2.Lock.lock() //加锁 * 3.finally => lock.unlock() //解锁 */ class Ticket2 { // 属性、方法 private int number = 30; Lock lock = new ReentrantLock(); // 卖票的方式 public void sale() { lock.lock();//加锁 try { //业务代码 if (number > 0) { System.out.println(Thread.currentThread().getName() + "卖出了第" + (number--) + "票,剩余:" + number); } } catch (Exception e) { e.printStackTrace(); } finally { //解锁 lock.unlock(); } } }
synchronized 和 Lock 区别
synchronize 内置的 Java 关键字,Lock 是一个 Java 类
synchronized 无法判断获取锁的状态,Lock 可以判断是否获取到了锁
synchronized 会自动释放锁,Lock 必须要手动释放锁!如果不释放锁,死锁
synchronized 线程 1(获得锁,阻塞)、线程 2(等待,傻傻的等);Lock 锁就不一定会等待下去;
synchronized 可重入锁,不可以中断的,非公平;Lock 可重入锁,可以判断锁,非公平(可以自己设置)
synchronized 适合锁少量的代码同步问题,Lock 适合锁大量的同步代码!
生产者和消费者问题 生产者和消费者问题 synchronized 版 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 JAVA package com.ouwen.pc; import com.sun.org.apache.bcel.internal.generic.NEW; /** * 线程之间的通信问题:生产者与消费者问题! 等待唤醒,通知唤醒 * 线程交替执行 A B 操作同一个变量 num = 0 * A num+1 * B num-1 * @author IRVING * @create 2021-04-04 15:10 */ public class A { public static void main(String[] args) { Data data = new Data(); new Thread(() -> { for (int i = 0; i < 10; i++) { try { data.increment(); } catch (InterruptedException e) { e.printStackTrace(); } } },"A").start(); new Thread(() -> { for (int i = 0; i < 10; i++) { try { data.decrement(); } catch (InterruptedException e) { e.printStackTrace(); } } },"B").start(); } } // 判断等待、业务、通知 //数字 资源类 class Data { private int number = 0; //+1 public synchronized void increment() throws InterruptedException { if (number != 0) { //等待 this.wait(); } number++; System.out.println(Thread.currentThread().getName() + "=>" + number); // 通知其他线程,我+1完毕了 this.notifyAll(); } //-1 public synchronized void decrement() throws InterruptedException { if (number == 0) { //等待 this.wait(); } number--; // 通知其他线程,我-1完毕了 System.out.println(Thread.currentThread().getName() + "=>" + number); this.notifyAll(); } }
问题存在,A B C D 4 个线程 虚假唤醒
if 改为 while 判断
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 JAVA package com.ouwen.pc; import com.sun.org.apache.bcel.internal.generic.NEW; /** * 线程之间的通信问题:生产者与消费者问题! 等待唤醒,通知唤醒 * * @author IRVING * @create 2021-04-04 15:10 */ public class A { public static void main(String[] args) { Data data = new Data(); new Thread(() -> { for (int i = 0; i < 10; i++) { try { data.increment(); } catch (InterruptedException e) { e.printStackTrace(); } } },"A").start(); new Thread(() -> { for (int i = 0; i < 10; i++) { try { data.decrement(); } catch (InterruptedException e) { e.printStackTrace(); } } },"B").start(); new Thread(() -> { for (int i = 0; i < 10; i++) { try { data.increment(); } catch (InterruptedException e) { e.printStackTrace(); } } },"C").start(); new Thread(() -> { for (int i = 0; i < 10; i++) { try { data.decrement(); } catch (InterruptedException e) { e.printStackTrace(); } } },"D").start(); } } // 判断等待、业务、通知 //数字 资源类 class Data { private int number = 0; //+1 public synchronized void increment() throws InterruptedException { while (number != 0) { //等待 this.wait(); } number++; System.out.println(Thread.currentThread().getName() + "=>" + number); // 通知其他线程,我+1完毕了 this.notifyAll(); } //-1 public synchronized void decrement() throws InterruptedException { while (number == 0) { //等待 this.wait(); } number--; // 通知其他线程,我-1完毕了 System.out.println(Thread.currentThread().getName() + "=>" + number); this.notifyAll(); } }
JUC 版的生产者与消费者问题
通过 Lock 找到 Condition
代码实现:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 JAVA package com.ouwen.pc; import java.lang.reflect.Constructor; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; /** * @author IRVING * @create 2021-04-04 16:01 */ public class B { public static void main(String[] args) { Data2 data = new Data2(); new Thread(() -> { for (int i = 0; i < 10; i++) { try { data.increment(); } catch (InterruptedException e) { e.printStackTrace(); } } }, "A").start(); new Thread(() -> { for (int i = 0; i < 10; i++) { try { data.decrement(); } catch (InterruptedException e) { e.printStackTrace(); } } }, "B").start(); new Thread(() -> { for (int i = 0; i < 10; i++) { try { data.increment(); } catch (InterruptedException e) { e.printStackTrace(); } } }, "C").start(); new Thread(() -> { for (int i = 0; i < 10; i++) { try { data.decrement(); } catch (InterruptedException e) { e.printStackTrace(); } } }, "D").start(); } } // 判断等待、业务、通知 //数字 资源类 class Data2 { private int number = 0; Lock lock = new ReentrantLock(); Condition condition = lock.newCondition(); //condition.await(); //等待 //condition.signalAll(); //唤醒全部 //+1 public void increment() throws InterruptedException { lock.lock(); try { //业务代码 while (number != 0) { //等待 condition.await(); } number++; System.out.println(Thread.currentThread().getName() + "=>" + number); // 通知其他线程,我+1完毕了 condition.signalAll(); } catch (Exception e) { e.printStackTrace(); } finally { lock.unlock(); } } //-1 public void decrement() throws InterruptedException { lock.lock(); try { while (number == 0) { //等待 condition.await(); } number--; // 通知其他线程,我-1完毕了 System.out.println(Thread.currentThread().getName() + "=>" + number); condition.signalAll(); } catch (InterruptedException e) { e.printStackTrace(); } finally { lock.unlock(); } } }
任何一个新的技术诞生,绝不是仅仅只是覆盖了原来的技术,一定存在优势和补充!
Condition 精准的通知和唤醒线程
代码测试:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 JAVA package com.ouwen.pc; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; /** * @author IRVING * @create 2021-04-04 16:14 * A执行完调用B,B执行完调用C,C执行完调用A */ public class C { public static void main(String[] args) { Data3 data3 = new Data3(); new Thread(() -> { for (int i = 0; i < 10; i++) { data3.printA(); } }, "A").start(); new Thread(() -> { for (int i = 0; i < 10; i++) { data3.printB(); } }, "B").start(); new Thread(() -> { for (int i = 0; i < 10; i++) { data3.printC(); } }, "C").start(); } } //资源类 Lock class Data3 { private Lock lock = new ReentrantLock(); Condition condition1 = lock.newCondition(); Condition condition2 = lock.newCondition(); Condition condition3 = lock.newCondition(); private int number = 1; // 1A 2B 3C public void printA() { lock.lock(); try { //业务,判断 -> 执行 -> 通知 while (number != 1) { //等待 condition1.await(); } System.out.println(Thread.currentThread().getName() + "=>AAAAAAA"); //唤醒,唤醒指定的人,B number = 2; condition2.signal(); } catch (Exception e) { e.printStackTrace(); } finally { lock.unlock(); } } public void printB() { lock.lock(); try { //业务,判断 -> 执行 -> 通知 while (number != 2) { //等待 condition2.await(); } System.out.println(Thread.currentThread().getName() + "=>BBBBBB"); //唤醒,唤醒指定的人,C number = 3; condition3.signal(); } catch (Exception e) { e.printStackTrace(); } finally { lock.unlock(); } } public void printC() { lock.lock(); try { //业务,判断 -> 执行 -> 通知 while (number != 3) { //等待 condition3.await(); } System.out.println(Thread.currentThread().getName() + "=>CCCCCCCC"); //唤醒,唤醒指定的人,A number = 1; condition1.signal(); } catch (Exception e) { e.printStackTrace(); } finally { lock.unlock(); } } }
8 锁现象 任何判断锁的是谁!永远的知道什么是锁,锁到底锁的是谁!
**深刻理解我们的锁 **
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 JAVA package com.ouwen.lock8; import java.util.concurrent.TimeUnit; /** * 8锁,就是关于锁的8个问题 * 1、标准情况下,两个线程先打印 发短信还是 打电话? 1发短信 2打电话 * 1、sendSms延迟4秒,两个线程先打印 发短信还是 打电话? 1发短信 2打电话 * @author IRVING * @create 2021-04-04 16:30 */ public class Test1 { public static void main(String[] args) { Phone phone = new Phone(); //锁的存在 new Thread(()->{ phone.sendSms(); }).start(); try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); } new Thread(()->{ phone.call(); }).start(); } } class Phone { // synchronized 锁的对象是方法的调用者! // 两个方法用的是同一所,谁先拿到谁执行! public synchronized void sendSms() { try { TimeUnit.SECONDS.sleep(4); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("发短信"); } public synchronized void call() { System.out.println("打电话"); } } JAVA package com.ouwen.lock8; import java.util.concurrent.TimeUnit; /** * 3. 增加了一个普通方法!先执行发短信还是hello? 普通方法hello * 4. 两个对象,两个同步方法,发短信还是打电话? //打电话 * @author IRVING * @create 2021-04-04 16:36 */ public class Test2 { public static void main(String[] args) { //两个不同的对象 两把锁 Phone2 phone1 = new Phone2(); Phone2 phone2 = new Phone2(); //锁的存在 new Thread(()->{ phone1.sendSms(); },"A").start(); try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); } new Thread(()->{ phone2.call(); },"B").start(); } } class Phone2 { // synchronized 锁的对象是方法的调用者! // 两个方法用的是同一锁,谁先拿到谁执行! public synchronized void sendSms() { try { TimeUnit.SECONDS.sleep(4); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("发短信"); } public synchronized void call() { System.out.println("打电话"); } // 这里没有锁!不是同步方法!不受锁的影响 public void hello(){ System.out.println("hello"); } } JAVA package com.ouwen.lock8; import java.util.concurrent.TimeUnit; /** * 5.增加两个静态同步方法,只有一个对象,先打印 发短信?打电话? 发短信 * 6.增加两个静态同步方法,两个对象!,先打印 发短信?打电话? 发短信 * @author IRVING * @create 2021-04-04 17:22 */ public class Test3 { public static void main(String[] args) { //两个不同的对象 类模板Class只有一个 static,锁的是Class Phone3 phone1 = new Phone3(); Phone3 phone2 = new Phone3(); //锁的存在 new Thread(() -> { phone1.sendSms(); }, "A").start(); try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); } new Thread(() -> { phone2.call(); }, "B").start(); } } class Phone3 { // synchronized 锁的对象是方法的调用者! // staic静态方法 // 类一加载就有了!锁的是Class Phone3.class public static synchronized void sendSms() { try { TimeUnit.SECONDS.sleep(4); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("发短信"); } public static synchronized void call() { System.out.println("打电话"); } } JAVA package com.ouwen.lock8; import java.util.concurrent.TimeUnit; /** * 7. 一个静态同步方法,一个普通同步方法 一个对象 先打印 发短信?打电话? 打电话 * 8. 一个静态同步方法,一个普通同步方法 两个对象 先打印 发短信?打电话? 打电话 * * @author IRVING * @create 2021-04-04 17:27 */ public class Test4 { public static void main(String[] args) { //两个不同的对象 类模板Class只有一个 static,锁的是Class Phone4 phone1 = new Phone4(); Phone4 phone2 = new Phone4(); //锁的存在 new Thread(() -> { phone1.sendSms(); }, "A").start(); try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); } new Thread(() -> { phone2.call(); }, "B").start(); } } class Phone4 { // staic静态同步方法 锁的是Class public static synchronized void sendSms() { try { TimeUnit.SECONDS.sleep(4); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("发短信"); } //普通同步方法 锁的是调用对象 public synchronized void call() { System.out.println("打电话"); } }
小结
new this 具体的一个对象
static Class 唯一的模板
集合类不安全 List 不安全 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 JAVA package com.ouwen.unsafe; import java.util.*; import java.util.concurrent.CopyOnWriteArrayList; /** * java.util.ConcurrentModificationException 并发修改异常; * * @author IRVING * @create 2021-04-04 17:34 */ public class ListTest { public static void main(String[] args) { // 并发下 ArrayList 不安全的 /** * 解决方案: * 1、List<String> list = new Vector<>(); * 2、List<String> list = Collections.synchronizedList(new ArrayList<>()); * 3、List<String> list = new CopyOnWriteArrayList<>(); */ // CopyOnWriter COW 计算机程序设计领域的一种优化策略: // 多个线程调用的时候,list,读取时是固定的,写入(覆盖) // 在写入的时候避免覆盖,造成数据问题 -- 读写分离 // CopyOnWriterList 比 Vector NB在哪里? => 前者效率高 List<String> list = new CopyOnWriteArrayList<>(); for (int i = 0; i < 10; i++) { new Thread(() -> { list.add(UUID.randomUUID().toString().substring(0, 5)); System.out.println(list); }, String.valueOf(i)).start(); } } }
学习方法推荐:1、先会用、2、货比 3 家,寻找其他解决方案,3、分析源码!
Set 不安全 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 JAVA package com.ouwen.unsafe; import java.util.Collections; import java.util.HashSet; import java.util.Set; import java.util.UUID; import java.util.concurrent.CopyOnWriteArraySet; /** * 同理可证:java.util.ConcurrentModificationException 并发修改异常 * @author IRVING * @create 2021-04-04 17:49 */ public class SetList { public static void main(String[] args) { //Set<String> set = new HashSet<>(); //Set<String> set = Collections.synchronizedSet(new HashSet<>()); Set<String> set = new CopyOnWriteArraySet<>(); for (int i = 0; i < 300; i++) { new Thread(() -> { set.add(UUID.randomUUID().toString().substring(0, 5)); System.out.println(set); }, String.valueOf(i)).start(); } } }
HashSet 底层是什么?
1 2 3 4 5 6 7 8 9 10 11 JAVA public HashSet() { map = new HashMap<>(); } //add set 本质就是map key是无法重复的!! public boolean add(E e) { return map.put(e, PRESENT)==null; } private static final Object PRESENT = new Object(); //常量 不变的值!
Map 不安全 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 JAVA package com.ouwen.unsafe; import java.util.Collections; import java.util.HashMap; import java.util.Map; import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; /** * @author IRVING * @create 2021-04-04 17:57 */ public class MapTest { public static void main(String[] args) { // map 是这样的用的吗? 不是,工作中不用HashMap() // 默认等价于什么 new HashMap<>(16,0.75f); //Map<String,String> map = new HashMap<>(); //Map<String,String> map = Collections.synchronizedMap(new HashMap<>()); // 研究ConcurrentHashMap原理 Map<String,String> map = new ConcurrentHashMap<>(); for (int i = 0; i < 30; i++) { new Thread(()->{ map.put(Thread.currentThread().getName(), UUID.randomUUID().toString().substring(0,5)); System.out.println(map); },String.valueOf(i)).start(); } } }
Callable(简单)
可以有返回值
可以抛出异常
方法不同 ,run()/call()
代码测试:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 JAVA package com.ouwen.callable; import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; import java.util.concurrent.FutureTask; /** * 启动Callable * * @author IRVING * @create 2021-04-04 18:13 */ public class CallableTest { public static void main(String[] args) throws ExecutionException, InterruptedException { //怎么启动callable // 1.new Thread(new Runnable()).start(); // 2.new Thread(new FutureTask<V>()).start(); // 3.new Thread(new FutureTask<V>(new Callable())).start(); MyThread myThread = new MyThread(); //适配类 FutureTask FutureTask<String> task = new FutureTask<>(myThread); new Thread(task, "A").start(); new Thread(task, "B").start(); //结果会被缓存,效率高 String s = task.get(); // 这个get方法可能会产生阻塞! 把它放到最后,或者使用异步通信来处理 System.out.println(s); } } class MyThread implements Callable<String> { @Override public String call() throws Exception { System.out.println(Thread.currentThread().getName()+"call()"); return "123"; } }
细节:
有缓存
结果可能需要等待,会阻塞!
常用的辅助类 CountDownLatch
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 JAVA package com.ouwen.add; import java.util.concurrent.CountDownLatch; /** * 计数器 * @author IRVING * @create 2021-04-04 21:14 */ public class CountDownLatchDemo { public static void main(String[] args) throws InterruptedException { //总数是6 CountDownLatch countDownLatch = new CountDownLatch(6); for (int i = 0; i < 6; i++) { new Thread(()->{ System.out.println(Thread.currentThread().getName()+"Go Out"); countDownLatch.countDown(); //数量-1 },String.valueOf(i)).start(); } countDownLatch.await(); //等待计数器归零,再向下执行 System.out.println("关门"); } }
原理:
==countDownLatch.countDown()== //数量-1
==countDownLatch.await()== //等待计数器归零,然后再向下执行
每次有线程调用 countDownLatch()数量-1,假设计数器变为 0,countDownLatch.await()就会被唤醒,继续往下执行
CyclicBarrier
加法计数器
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 JAVA package com.ouwen.add; import java.util.concurrent.BrokenBarrierException; import java.util.concurrent.CyclicBarrier; /** * @author IRVING * @create 2021-04-04 21:28 */ public class CycliBarrierDemo { public static void main(String[] args) { /** * 集齐七颗龙珠,召唤神龙 */ //召唤龙珠的线程 CyclicBarrier barrier = new CyclicBarrier(7,()->{ System.out.println("召唤神龙成功"); }); for (int i = 0; i < 7; i++) { int temp = i; // lambda能操作i吗 new Thread(()->{ System.out.println(Thread.currentThread().getName()+"收集"+temp+"个龙珠"); try { barrier.await(); //等待 } catch (InterruptedException e) { e.printStackTrace(); } catch (BrokenBarrierException e) { e.printStackTrace(); } }).start(); } } }
Semaphore
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 JAVA package com.ouwen.add; import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; /** * @author IRVING * @create 2021-04-04 21:36 */ public class SemaphoreDemo { public static void main(String[] args) { //线程数量:停车位 Semaphore semaphore = new Semaphore(3); for (int i = 0; i < 6; i++) { new Thread(() -> { //acquire()得到 try { semaphore.acquire(); System.out.println(Thread.currentThread().getName() + "抢到车位"); TimeUnit.SECONDS.sleep(2); System.out.println(Thread.currentThread().getName() + "离开车位"); } catch (InterruptedException e) { e.printStackTrace(); } finally { //release()释放 semaphore.release(); } },String.valueOf(i)).start(); } } }
原理:
==semaphore.acquire();== // 获得,假设已经满了,等待,等待被释放为止!
==semaphore.release();== // 释放,会将当前的信号量释放+1,然后唤醒等待的线程!
作用:多个共享资源互斥的使用!并发限流,控制并发的线程数
读写锁 ReadWriteLock
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 JAVA package com.ouwen.rw; import java.util.HashMap; import java.util.Map; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; /** * 独占锁(写锁) 一次只能被一个线程占有 * 共享锁(读锁) 多个线程可以同时占有 * ReadWriteLock * 读-读 可以共存! * 读-写 不能共存! * 写-写 不能共存! * @author IRVING * @create 2021-04-05 14:28 */ public class ReadWriteLockDemo { public static void main(String[] args) { MyCacheLock myCache = new MyCacheLock(); //写入 for (int i = 0; i < 5; i++) { final int temp = i; new Thread(() -> { myCache.put(temp + "", temp); }, String.valueOf(i)).start(); } //读取 for (int i = 0; i < 5; i++) { final int temp = i; new Thread(() -> { myCache.get(temp + ""); }, String.valueOf(i)).start(); } } } /** * 自定义缓存 */ class MyCache { private volatile Map<String, Object> map = new HashMap<>(); // 存,写 public void put(String key, Object value) { System.out.println(Thread.currentThread().getName() + "写入" + key); map.put(key, value); System.out.println(Thread.currentThread().getName() + "写入完毕"); } // 取,读 public void get(String key) { System.out.println(Thread.currentThread().getName() + "读取" + key); Object o = map.get(key); System.out.println(Thread.currentThread().getName() + "读取完毕"); } } class MyCacheLock { private volatile Map<String, Object> map = new HashMap<>(); // 读写锁,更加细粒度的控制 private ReadWriteLock readWriteLock = new ReentrantReadWriteLock(); // 存,写入的时候,只希望同时只有一个线程写 public void put(String key, Object value) { readWriteLock.writeLock().lock(); try { System.out.println(Thread.currentThread().getName() + "写入" + key); map.put(key, value); System.out.println(Thread.currentThread().getName() + "写入完毕"); } catch (Exception e) { e.printStackTrace(); } finally { readWriteLock.writeLock().unlock(); } } // 取,读 public void get(String key) { readWriteLock.readLock().lock(); try { System.out.println(Thread.currentThread().getName() + "读取" + key); Object o = map.get(key); System.out.println(Thread.currentThread().getName() + "读取完毕"); } catch (Exception e) { e.printStackTrace(); } finally { readWriteLock.readLock().unlock(); } } }
阻塞队列
阻塞队列
BlockingQueue 不是新的东西,属于 Collection 集合框架下
什么情况下我们会使用阻塞队列:多线程并发处理,线程池!
学会使用队列 添加、移除
四组 API
方式
有返回值,抛出异常
有返回值,不抛出异常
阻塞等待
超时等待
添加
add
offer()
put()
offer(,,)
移除
remove
foll()
take()
foll(,)
检测队首元素
element
peek
-
-
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 JAVA /** * 抛出异常 */ public static void test1() { //队列的大小 ArrayBlockingQueue blockingQueue = new ArrayBlockingQueue(3); System.out.println(blockingQueue.add("a")); System.out.println(blockingQueue.add("b")); System.out.println(blockingQueue.add("c")); //java.lang.IllegalStateException: Queue full 抛出异常!队列已满 //System.out.println(blockingQueue.add("d")); System.out.println("=========="); System.out.println(blockingQueue.remove()); System.out.println(blockingQueue.remove()); System.out.println(blockingQueue.remove()); //java.util.NoSuchElementException 抛出异常!没有元素 System.out.println(blockingQueue.remove()); } JAVA /** * 有返回值,不抛出异常 */ public static void test2(){ ArrayBlockingQueue blockingQueue = new ArrayBlockingQueue(3); System.out.println(blockingQueue.offer("a")); System.out.println(blockingQueue.offer("b")); System.out.println(blockingQueue.offer("c")); //System.out.println(blockingQueue.offer("d")); //返回false System.out.println("========"); System.out.println(blockingQueue.poll()); System.out.println(blockingQueue.poll()); System.out.println(blockingQueue.poll()); System.out.println(blockingQueue.poll()); //返回null } JAVA /** * 阻塞等待 */ public static void test3() throws InterruptedException { ArrayBlockingQueue<Object> blockingQueue = new ArrayBlockingQueue<>(3); blockingQueue.put("a"); blockingQueue.put("b"); blockingQueue.put("c"); //blockingQueue.put("d"); //程序一直阻塞 System.out.println("========"); System.out.println(blockingQueue.take()); System.out.println(blockingQueue.take()); System.out.println(blockingQueue.take()); System.out.println(blockingQueue.take()); //程序一直阻塞 } JAVA /** * 超时等待 */ public static void test4() throws InterruptedException { ArrayBlockingQueue<Object> blockingQueue = new ArrayBlockingQueue<>(3); System.out.println(blockingQueue.offer("a")); System.out.println(blockingQueue.offer("b")); System.out.println(blockingQueue.offer("c")); //System.out.println(blockingQueue.offer("d",2, TimeUnit.SECONDS)); //超时等待2秒 System.out.println("========="); System.out.println(blockingQueue.poll()); System.out.println(blockingQueue.poll()); System.out.println(blockingQueue.poll()); System.out.println(blockingQueue.poll(2,TimeUnit.SECONDS)); //超时等待2秒 }
SynchronousQueue 同步队列
进去一个元素,必须等待取出来之后,才能再往里面放一个元素!
put、take
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 JAVA package com.ouwen.bq; import java.util.concurrent.BlockingQueue; import java.util.concurrent.SynchronousQueue; import java.util.concurrent.TimeUnit; /** * 同步队列 * 和其他的BlockingQueue不一样,SynchronousQueue 不存储元素 * put了一个元素,必须从里面先take出来,否则无法继续往里面put值 * @author IRVING * @create 2021-04-05 17:03 */ public class SynchronousQueueTest { public static void main(String[] args) { BlockingQueue<String> blockingQueue = new SynchronousQueue<>();//同步队列 new Thread(()->{ try { System.out.println(Thread.currentThread().getName()+"put 1"); blockingQueue.put("1"); System.out.println(Thread.currentThread().getName()+"put 2"); blockingQueue.put("2"); System.out.println(Thread.currentThread().getName()+"put 3"); blockingQueue.put("3"); } catch (InterruptedException e) { e.printStackTrace(); } },"t1").start(); new Thread(()->{ try { TimeUnit.SECONDS.sleep(2); System.out.println(Thread.currentThread().getName()+blockingQueue.take()); TimeUnit.SECONDS.sleep(2); System.out.println(Thread.currentThread().getName()+blockingQueue.take()); TimeUnit.SECONDS.sleep(2); System.out.println(Thread.currentThread().getName()+blockingQueue.take()); } catch (InterruptedException e) { e.printStackTrace(); } },"t2").start(); } }
线程池(重点) 线程池:三大方法、七大参数、四种拒绝策略
池化技术 程序的运行,本质:占用系统的资源!优化资源的使用!=>池化技术
线程池、连接池、内存池、对象池….创建、销毁。十分浪费资源
池化技术:事先准备好一些资源,有人要用,就到我这里来拿,用完之后还给我。
线程池的好处
降低资源的消耗
提高响应的速度
方便管理
==线程复用,可以控制最大并发数、管理线程 ==
线程池:三大方法
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 JAVA package com.ouwen.pool; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; /** * Executors 工具类 3大方法 * @author IRVING * @create 2021-04-05 17:19 */ public class Demo01 { public static void main(String[] args) { //ExecutorService threadPool = Executors.newSingleThreadExecutor(); //单个线程 //ExecutorService threadPool = Executors.newFixedThreadPool(5); //创建一个固定大小的线程池 ExecutorService threadPool = Executors.newCachedThreadPool(); //可伸缩的,缓存线程池 try { for (int i = 0; i < 10; i++) { threadPool.execute(()->{ System.out.println(Thread.currentThread().getName()); }); } } catch (Exception e) { e.printStackTrace(); } finally { threadPool.shutdown(); //关闭线程池 } } }
七大参数 源码分析:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 JAVA public static ExecutorService newSingleThreadExecutor() { return new FinalizableDelegatedExecutorService (new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>())); } public static ExecutorService newFixedThreadPool(int nThreads) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); } public static ExecutorService newCachedThreadPool() { return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>()); } //本质都是ThreadPoolExecutor() public ThreadPoolExecutor(int corePoolSize, //核心线程池大小 int maximumPoolSize, //最大核心线程池大小 long keepAliveTime, //超时了没有人调用就会释放 TimeUnit unit, //超时单位 BlockingQueue<Runnable> workQueue, //阻塞队列 ThreadFactory threadFactory, //线程工厂,创建线程的,一般不用动 RejectedExecutionHandler handler //拒绝策略) { if (corePoolSize < 0 || maximumPoolSize <= 0 || maximumPoolSize < corePoolSize || keepAliveTime < 0) throw new IllegalArgumentException(); if (workQueue == null || threadFactory == null || handler == null) throw new NullPointerException(); this.corePoolSize = corePoolSize; this.maximumPoolSize = maximumPoolSize; this.workQueue = workQueue; this.keepAliveTime = unit.toNanos(keepAliveTime); this.threadFactory = threadFactory; this.handler = handler; }
手动创建线程池 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 JAVA package com.ouwen.pool; import java.util.concurrent.*; /** * Executors 工具类 3大方法 * @author IRVING * @create 2021-04-05 17:19 */ public class Demo01 { public static void main(String[] args) { //自定义线程池!工作中 ThreadPoolExecutor ExecutorService threadPool = new ThreadPoolExecutor(2, 5, 3, TimeUnit.SECONDS, new LinkedBlockingDeque<>(3), Executors.defaultThreadFactory(), /** * 四种拒绝策略 * 1.new ThreadPoolExecutor.AbortPolicy()); // 银行满了,还有人进来,不处理这个人的,抛出异常 * 2.new ThreadPoolExecutor.CallerRunsPolicy()); // 哪来的去哪里 * 3.new ThreadPoolExecutor.DiscardPolicy()); // 队列满了,不会抛出异常,丢弃 * 4.new ThreadPoolExecutor.DiscardOldestPolicy()); // 队列满了,丢弃最早的任务,添加新任务 */ new ThreadPoolExecutor.DiscardOldestPolicy()); // 队列满了,丢弃最早的任务,添加新任务 try { // 最大承载:Deque + max // 抛出 java.util.concurrent.RejectedExecutionException for (int i = 0; i < 9; i++) { threadPool.execute(()->{ System.out.println(Thread.currentThread().getName()); }); } } catch (Exception e) { e.printStackTrace(); } finally { threadPool.shutdown(); //关闭线程池 } } }
四种拒绝策略
1 2 3 4 5 6 7 8 JAVA /** * 四种拒绝策略 * 1.new ThreadPoolExecutor.AbortPolicy()); // 银行满了,还有人进来,不处理这个人的,抛出异常 * 2.new ThreadPoolExecutor.CallerRunsPolicy()); // 哪来的去哪里 * 3.new ThreadPoolExecutor.DiscardPolicy()); // 队列满了,不会抛出异常,丢弃 * 4.new ThreadPoolExecutor.DiscardOldestPolicy()); // 队列满了,丢弃最早的任务,添加新任务 */
小结和扩展 最大线程池大小如何去设置!
了解:IO 密集型,CPU 密集型:(调优)!
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 JAVA package com.ouwen.pool; import java.util.concurrent.*; /** * @author IRVING * @create 2021-04-05 17:19 */ public class Demo01 { public static void main(String[] args) { //自定义线程池!工作中 ThreadPoolExecutor /** * 最大线程到底该如何定义? * 1. CPU密集型,几核,就是几,可以保持CPU效率最高 * 2. IO密集型,判断你程序中十分耗IO的线程 * 程序 15个大型任务 io十分占用资源! -> 30个 */ //获取CPU的核心数 System.out.println(Runtime.getRuntime().availableProcessors()); ExecutorService threadPool = new ThreadPoolExecutor(2, Runtime.getRuntime().availableProcessors(), 3, TimeUnit.SECONDS, new LinkedBlockingDeque<>(3), Executors.defaultThreadFactory(), new ThreadPoolExecutor.DiscardOldestPolicy()); try { // 最大承载:Deque + max // 抛出 java.util.concurrent.RejectedExecutionException for (int i = 0; i < 9; i++) { threadPool.execute(()->{ System.out.println(Thread.currentThread().getName()); }); } } catch (Exception e) { e.printStackTrace(); } finally { threadPool.shutdown(); //关闭线程池 } } }
四大函数式接口(重点、必需掌握) 新时代的程序员:lambda 表达式、链式编程、函数式接口、Stream 流式计算
函数式接口:只有一个方法的接口 1 2 3 4 5 6 7 8 9 JAVA @FunctionalInterface public interface Runnable { public abstract void run(); } //超级多的FunctionInterface //简化编程模型,在新版本的框架底层大量应用! //forEach(消费者类型的函数式接口)
代码测试:
Function 接口 函数型接口
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 JAVA package com.ouwen.function; import java.util.function.Function; /** * Function 函数型接口,有一个输入参数,有一个输出参数 * 只要是函数型接口,都可以用lambda表达式简化 * * @author IRVING * @create 2021-04-05 19:45 */ public class Demo01 { public static void main(String[] args) { //Function<String, String> function = new Function<String, String>() { // @Override // public String apply(String s) { // return s; // } //}; //使用lambda表达式简化 Function<String, String> function = (str) -> {return str;}; System.out.println(function.apply("asd")); } }
Predicate 接口 断定型接口
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 JAVA package com.ouwen.function; import java.util.function.Predicate; /** * 断定型接口:有一个输入参数,返回值只能是布尔值! * @author IRVING * @create 2021-04-05 19:51 */ public class Demo02 { public static void main(String[] args) { //判断字符是否为空 //Predicate<String> predicate = new Predicate<String>() { // @Override // public boolean test(String s) { // return s.isEmpty(); // } //}; Predicate<String> predicate = str -> str.isEmpty(); System.out.println(predicate.test("")); } }
Consumer 接口 消费型接口
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 JAVA package com.ouwen.function; import com.sun.org.apache.bcel.internal.generic.NEW; import java.util.function.Consumer; /** * 消费性接口:一个输入参数,没有返回值 * @author IRVING * @create 2021-04-05 20:00 */ public class Demo03 { public static void main(String[] args) { //Consumer<String> consumer = new Consumer<String>() { // @Override // public void accept(String s) { // System.out.println(s); // } //}; Consumer<String> consumer = str -> System.out.println(str); consumer.accept("asd"); } }
Supplier 接口 供给型接口
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 JAVA package com.ouwen.function; import java.util.function.Supplier; /** * Supplier供给型接口 没有参数,只有返回值 * @author IRVING * @create 2021-04-05 20:03 */ public class Demo04 { public static void main(String[] args) { //Supplier<Integer> supplier = new Supplier<Integer>() { // @Override // public Integer get() { // return 1024; // } //}; Supplier<Integer> supplier = () -> 1024; System.out.println(supplier.get()); } }
Stream 流式计算 什么是流式计算? 大数据时代:存储+计算
集合是用来存储东西的
计算都应该交给流来操作!
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 JAVA package com.ouwen.stream; import java.util.Arrays; import java.util.Comparator; import java.util.List; import java.util.function.Function; /** * 现有5个用户,筛选: * 1. ID偶数的 * 2. 年纪大于23岁 * 3. 用户名转为大写字母 * 4. 用户名字母倒排序 * 5. 只输出一个用户 * <p> * * @author IRVING * * @create 2021-04-05 22:49 */ public class Test { public static void main(String[] args) { User u1 = new User(1, "a", 21); User u2 = new User(2, "b", 22); User u3 = new User(3, "c", 23); User u4 = new User(4, "d", 24); User u5 = new User(6, "e", 26); List<User> users = Arrays.asList(u1, u2, u3, u4, u5); users.stream() .filter(u -> u.getId() % 2 == 0) .filter(u -> u.getAge() > 23) .peek(user -> user.setName(user.getName().toUpperCase())) .sorted(Comparator.comparing(User::getName, Comparator.reverseOrder())) .limit(1) .forEach(System.out::println); } }
ForkJoin 分支合并
什么是 ForkJoin? ForkJoin 在 JDK1.7,并行执行任务!提高效率,大数据量!
ForkJoin 特点:工作窃取
这个里面维护的都是双端队列
如何使用 ForkJoin?
异步回调 Future 设计的初衷:对将来的某个事件的结果进行建模
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 JAVA package com.ouwen.future; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; /** * 异步调用:CompletableFuture * 异步执行 * 成功回调 * 失败回调 * * @author IRVING * @create 2021-04-06 22:18 */ public class Demo01 { public static void main(String[] args) throws ExecutionException, InterruptedException { //发起一个请求 //CompletableFuture<Void> completableFuture = CompletableFuture.runAsync(() -> { // try { // TimeUnit.SECONDS.sleep(5); // } catch (InterruptedException e) { // e.printStackTrace(); // } // System.out.println(Thread.currentThread().getName() + "runAsync=>Void"); //}); // //System.out.println("1111"); // //completableFuture.get(); //获取阻塞执行结果 //有返回值的 CompletableFuture<Integer> completableFuture = CompletableFuture.supplyAsync(() -> { System.out.println(Thread.currentThread().getName() + "runAsync=>Integer"); int i = 10/0; return 1024; }); completableFuture.whenComplete((t,u)->{ System.out.println(t); //正常的返回结果 System.out.println(u); //错误信息 java.util.concurrent.CompletionException: java.lang.ArithmeticException: / by zero }).exceptionally((e)->{ System.out.println(e.getMessage()); //可以获取到错误的返回结果 return 233; }); } }
JMM 请你谈谈你对 Volatile 的理解 Volatile 是 Java 虚拟机提供的轻量级的同步机制
保证可见性
不保证原子性
禁止指令重排
什么是 JMM JMM:java 内存模型,不存在的东西,概念!约定!
关于 JMM 一些同步的约定:
线程解锁前,必须把共享变量立刻刷回主内存
线程加锁前,必须读取主内存中的最新值到工作内存中!
加锁和解锁必须是同一把锁!
线程 工作内存、主内存
8 种操作
内存交互操作有 8 种,虚拟机实现必须保证每一个操作都是原子的,不可在分的(对于 double 和 long 类型的变量来说,load、store、read 和 write 操作在某些平台上允许例外)
lock (锁定):作用于主内存的变量,把一个变量标识为线程独占状态
unlock (解锁):作用于主内存的变量,它把一个处于锁定状态的变量释放出来,释放后的变量才可以被其他线程锁定
read (读取):作用于主内存变量,它把一个变量的值从主内存传输到线程的工作内存中,以便随后的 load 动作使用
load (载入):作用于工作内存的变量,它把 read 操作从主存中变量放入工作内存中
use (使用):作用于工作内存中的变量,它把工作内存中的变量传输给执行引擎,每当虚拟机遇到一个需要使用到变量的值,就会使用到这个指令
assign (赋值):作用于工作内存中的变量,它把一个从执行引擎中接受到的值放入工作内存的变量副本中
store (存储):作用于主内存中的变量,它把一个从工作内存中一个变量的值传送到主内存中,以便后续的 write 使用
write (写入):作用于主内存中的变量,它把 store 操作从工作内存中得到的变量的值放入主内存的变量中
JMM 对这八种指令的使用,制定了如下规则:
不允许 read 和 load、store 和 write 操作之一单独出现。即使用了 read 必须 load,使用了 store 必须 write
不允许线程丢弃他最近的 assign 操作,即工作变量的数据改变了之后,必须告知主存
不允许一个线程将没有 assign 的数据从工作内存同步回主内存
一个新的变量必须在主内存中诞生,不允许工作内存直接使用一个未被初始化的变量。就是怼变量实施 use、store 操作之前,必须经过 assign 和 load 操作
一个变量同一时间只有一个线程能对其进行 lock。多次 lock 后,必须执行相同次数的 unlock 才能解锁
如果对一个变量进行 lock 操作,会清空所有工作内存中此变量的值,在执行引擎使用这个变量前,必须重新 load 或 assign 操作初始化变量的值
如果一个变量没有被 lock,就不能对其进行 unlock 操作。也不能 unlock 一个被其他线程锁住的变量
对一个变量进行 unlock 操作之前,必须把此变量同步回主内存
问题:程序不知道主内存的值被修改过了!!
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 JAVA package com.ouwen.volatile1; import java.util.concurrent.TimeUnit; /** * @author IRVING * @create 2021-04-06 22:50 */ public class JMMDemo { private static int num = 0; public static void main(String[] args) throws InterruptedException { new Thread(()->{ while (num == 0){ //线程1对主内存的变化不知道 } }).start(); TimeUnit.SECONDS.sleep(1); num = 1; System.out.println(num); } }
Volatile 保证可见性 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 JAVA package com.ouwen.volatile1; import java.util.concurrent.TimeUnit; /** * @author IRVING * @create 2021-04-06 22:50 */ public class JMMDemo { /* 不加 volatile 程序就会死循环 加 volatile 可以保证可见性 */ private volatile static int num = 0; public static void main(String[] args) throws InterruptedException { new Thread(()->{ while (num == 0){ //线程1对主内存的变化不知道 } }).start(); TimeUnit.SECONDS.sleep(1); num = 1; System.out.println(num); } }
不保证原子性 原子性:不可分割
线程 A 在执行任务的时候,不能被打扰的,也不能被分割,要么同时成功,要么同时失败!
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 JAVA package com.ouwen.volatile1; /** * @author IRVING * @create 2021-04-06 22:55 */ public class Demo2 { //volatile不保证原子性 private volatile static int num = 0; public static void add() { num++; //不是一个原子性操作 } public static void main(String[] args) { //理论上num结果为2万 for (int i = 0; i < 20; i++) { new Thread(() -> { for (int i1 = 0; i1 < 1000; i1++) { add(); } }).start(); } while (Thread.activeCount() > 2) { //main gc Thread.yield(); } System.out.println(Thread.currentThread().getName() + "==>" + num); } }
如果不加 lock 和 synchronized,怎样保持原子性
使用原子类,解决原子性问题
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 JAVA package com.ouwen.volatile1; import java.util.concurrent.atomic.AtomicInteger; /** * @author IRVING * @create 2021-04-06 22:55 */ public class Demo2 { //volatile不保证原子性 //使用原子类的Integer private volatile static AtomicInteger num = new AtomicInteger(); public static void add() { //num++; //不是一个原子性操作 num.getAndIncrement(); // AtomicInteger +1 方法 CAS } public static void main(String[] args) { //理论上num结果为2万 for (int i = 0; i < 20; i++) { new Thread(() -> { for (int i1 = 0; i1 < 1000; i1++) { add(); } }).start(); } while (Thread.activeCount() > 2) { //main gc Thread.yield(); } System.out.println(Thread.currentThread().getName() + "==>" + num); } }
这些类的底层都直接和操作系统挂钩!!直接在内存中修改只!Usafe 类是一个很特殊的存在!
指令重排 什么是指令重排:你写的程序,计算机并不是按照你写的那样去执行的
源代码 –> 编译器优化的重排 –> 指令并行也可能会重排 –> 内存也可能会重排 –>执行
==处理器在进行指令重排的时候,考虑:数据之间的依赖性! ==
可能造成影响的结果:a b x y 四个数都是 0
线程 A
线程 B
x=a
y=b
b=1
a=2
正常的结果:x=0 y=0 但是可能由于指令重排
线程 A
线程 B
b=1
a=2
x=a
y=b
指令重排导致的诡异结果:x=2 y=1
volatile 可以避免指令重排:
内存屏障。CPU 指令。
作用:
1、保证特定的操作的执行顺序!
2、可以保证某些变量的内存可见性 (利用这些特性 volatile 实现了可见性)
Volatile 是可以保持可见性,不能保证原子性,因为内存屏障,可以保证避免指令重排的现象产生!
深入理解 CAS 什么是 CAS? 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 JAVA package com.ouwen.cas; import java.util.concurrent.atomic.AtomicInteger; /** * @author IRVING * @create 2021-04-06 23:52 */ public class CASDemo { //CAS compareAndSet:比较并交换 compareAndSwap public static void main(String[] args) { AtomicInteger atomicInteger = new AtomicInteger(2020); // 期望、更新 // public final boolean compareAndSet(int expect, int update) // 如果我期望的值达到了,那么就会更新,否则就不更新 CAS 是CPU的并发原语 System.out.println(atomicInteger.compareAndSet(2020, 2021)); System.out.println(atomicInteger.get()); System.out.println(atomicInteger.compareAndSet(2020, 2021)); System.out.println(atomicInteger.get()); } }
Unsafe 类
CAS : 比较当前工作内存中的值和主内存中的值,如果这个值是期望的,那么则执行操作!如果不是就 一直循环!
缺点:
1、 循环会耗时
2、一次性只能保证一个共享变量的原子性
3、ABA 问题
ABA 问题(狸猫换太子)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 JAVA package com.ouwen.cas; import java.util.concurrent.atomic.AtomicInteger; /** * @author IRVING * @create 2021-04-06 23:52 */ public class CASDemo { //CAS compareAndSet:比较并交换 compareAndSwap public static void main(String[] args) { AtomicInteger atomicInteger = new AtomicInteger(2020); // 对于我们写的SQL :乐观锁! // 期望、更新 // public final boolean compareAndSet(int expect, int update) // 如果我期望的值达到了,那么就会更新,否则就不更新 CAS 是CPU的并发原语 // ==============捣乱的线程============== System.out.println(atomicInteger.compareAndSet(2020, 2021)); System.out.println(atomicInteger.get()); System.out.println(atomicInteger.compareAndSet(2021, 2020)); System.out.println(atomicInteger.get()); // ==============期望的线程============== System.out.println(atomicInteger.compareAndSet(2020, 6666)); System.out.println(atomicInteger.get()); } }
原子引用 解决 ABA 问题,引入原子引用!对应的思想就是我们的乐观锁
带版本号的原子操作!
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 JAVA package com.ouwen.cas; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.AtomicStampedReference; import java.util.zip.DeflaterOutputStream; /** * @author IRVING * @create 2021-04-07 0:10 */ public class CASDemo02 { public static void main(String[] args) { // AtomicStampedReference 注意:如果泛型是一个包装类,注意对象的引用问题 // 正常在业务操作,这里面比较的都是一个个对象 AtomicStampedReference<Integer> integer = new AtomicStampedReference<>(1, 1); //乐观锁的原理 new Thread(() -> { int stamp = integer.getStamp(); //获得版本号 System.out.println("a1=>"+stamp); try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(integer.compareAndSet(1, 2, integer.getStamp(), integer.getStamp() + 1)); System.out.println("a2=>"+integer.getStamp()); System.out.println(integer.compareAndSet(2, 1, integer.getStamp(), integer.getStamp() + 1)); System.out.println("a3=>"+integer.getStamp()); }, "a").start(); new Thread(() -> { int stamp = integer.getStamp(); //获得版本号 System.out.println("b1=>"+stamp); try { TimeUnit.SECONDS.sleep(2); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(integer.compareAndSet(1, 6, stamp, stamp + 1)); System.out.println("b2=>"+integer.getStamp()); }, "b").start(); } }
注意:
Integer 使用了对象缓存机制,默认范围是 -128 ~ 127 ,推荐使用静态工厂方法 valueOf 获取对象实 例,而不是 new,因为 valueOf 使用缓存,而 new 一定会创建新的对象分配新的内存空间;
各种锁的理解 公平锁、非公平锁 公平锁:非常公平,不能够插队,必须先来后到!
非公平锁:非常不公平,可以插队(默认都是非公平)
1 2 3 4 5 6 7 8 JAVA public ReentrantLock() { sync = new NonfairSync(); } public ReentrantLock(boolean fair) { sync = fair ? new FairSync() : new NonfairSync(); }
可重入锁(递归锁)
synchronized 版
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 JAVA package com.ouwen.lock; /** * @author IRVING * @create 2021-04-07 0:39 */ public class Demo01 { public static void main(String[] args) { Phone phone = new Phone(); new Thread(()->{ phone.sms(); },"A").start(); new Thread(()->{ phone.sms(); },"B").start(); } } class Phone { public synchronized void sms() { System.out.println(Thread.currentThread().getName() + "sms"); call(); //这里也有锁 } public synchronized void call() { System.out.println(Thread.currentThread().getName() + "call"); } }
Lock 版
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 JAVA package com.ouwen.lock; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; /** * @author IRVING * @create 2021-04-07 0:39 */ public class Demo02 { public static void main(String[] args) { Phone2 phone = new Phone2(); new Thread(() -> { phone.sms(); }, "A").start(); new Thread(() -> { phone.sms(); }, "B").start(); } } class Phone2 { Lock lock = new ReentrantLock(); public void sms() { lock.lock(); //细节问题:lock.lock(),lock.unlock(); //lock锁必须配对,否则就会死在里面 lock.lock(); try { System.out.println(Thread.currentThread().getName() + "sms"); call(); //这里也有锁 } catch (Exception e) { e.printStackTrace(); } finally { lock.unlock(); lock.unlock(); } } public synchronized void call() { lock.lock(); try { System.out.println(Thread.currentThread().getName() + "call"); } catch (Exception e) { e.printStackTrace(); } finally { lock.unlock(); } } }
自选锁 spinlock
自定义锁:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 JAVA package com.ouwen.lock; import java.util.concurrent.atomic.AtomicReference; /** * 自旋锁 * * @author IRVING * @create 2021-04-07 0:52 */ public class SpinlockDemo { AtomicReference<Thread> atomicReference = new AtomicReference<>(); //加锁 public void mylock() { Thread thread = Thread.currentThread(); System.out.println(thread.getName() + "==>mylock"); //自旋锁 while (!atomicReference.compareAndSet(null, thread)) { } } //解锁 public void myunlock() { Thread thread = Thread.currentThread(); System.out.println(thread.getName() + "==>myunlock"); atomicReference.compareAndSet(thread, null); } }
测试代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 JAVA package com.ouwen.lock; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.ReentrantLock; /** * @author IRVING * @create 2021-04-07 0:59 */ public class TestSpinLock { public static void main(String[] args) throws InterruptedException { //ReentrantLock lock = new ReentrantLock(); //lock.lock(); //lock.unlock(); //地层使用CAS实现自旋锁 SpinlockDemo lock = new SpinlockDemo(); new Thread(()->{ lock.mylock(); try { TimeUnit.SECONDS.sleep(5); } catch (Exception e) { e.printStackTrace(); } finally { lock.myunlock(); } },"T1").start(); TimeUnit.SECONDS.sleep(1); new Thread(()->{ lock.mylock(); try { TimeUnit.SECONDS.sleep(5); } catch (Exception e) { e.printStackTrace(); } finally { lock.myunlock(); } },"T2").start(); } }
死锁 死锁是什么?
如何解决死锁问题? 1、使用jps -l
定位进程号
2、使用jstack 进程号
找到死锁问题
查看堆栈信息,找到死锁问题!