当前位置: 首页 > news >正文

Java基础知识八

并发数据结构

List
-Vector 同步安全,写多读少
-ArrayList 不安全
-Collections.synchronizedList(List list)基于synchronized,效率差

-CopyOnWriteArrayList 读多写少,基于复制机制,非阻塞

Set
-HashSet 不安全
-Collections.synchronizedSet(Set set)基于synchronized,效率差

-CopyOnWriteArraySet(基于CopyOnWriteArrayList实现)读多写少,非阻塞

Map
-Hashtable 同步安全,写多读少
-HashMap 不安全
-Collections.synchronizedMap(Map map)基于synchronized,效率差

-ConcurrentHashMap 读多写少,非阻塞
Queue & Deque

-ConcurrentLinkedQueue非阻塞

-ArrayBlockingQueue/LinkedBlockingQueue 阻塞

list

package org.example.Occur;import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

/**
 * Lets ten threads append to each of three lists (a plain {@code ArrayList}, a
 * {@code Collections.synchronizedList} wrapper and a {@code CopyOnWriteArrayList}) and then
 * prints the size and content of every list so the results can be compared.
 */
public class ListTest {
    /** Number of writer threads started for each list. */
    private static final int WRITERS_PER_LIST = 10;

    /**
     * Starts all writer threads, waits until every one of them has finished and prints the
     * three lists.
     *
     * @param args unused
     * @throws InterruptedException if the main thread is interrupted while joining a writer
     */
    public static void main(String[] args) throws InterruptedException {
        // thread unsafe
        List<String> unsafeList = new ArrayList<String>();
        // thread safe
        List<String> safeList1 = Collections.synchronizedList(new ArrayList<String>());
        // thread safe
        CopyOnWriteArrayList<String> safeList2 = new CopyOnWriteArrayList<String>();

        ListThread t1 = new ListThread(unsafeList);
        ListThread t2 = new ListThread(safeList1);
        ListThread t3 = new ListThread(safeList2);

        List<Thread> writers = new ArrayList<Thread>();
        writers.addAll(startWriters(t1));
        writers.addAll(startWriters(t2));
        writers.addAll(startWriters(t3));

        // Join every writer instead of sleeping for a fixed time: the lists are only read
        // after all writers are really done, however long they take.
        for (Thread writer : writers) {
            writer.join();
        }

        System.out.println("listThread1.list.size()= " + t1.list.size());
        System.out.println("listThread2.list.size()= " + t2.list.size());
        System.out.println("listThread3.list.size()= " + t3.list.size());

        printList("unsafelist:", t1.list);
        printList("safelist1:", t2.list);
        printList("safelist:", t3.list);
    }

    /** Starts {@code WRITERS_PER_LIST} threads named "0".."9" that all run the given task. */
    private static List<Thread> startWriters(ListThread task) {
        List<Thread> started = new ArrayList<Thread>();
        for (int i = 0; i < WRITERS_PER_LIST; i++) {
            Thread t = new Thread(task, String.valueOf(i));
            t.start();
            started.add(t);
        }
        return started;
    }

    /** Prints a title line followed by all elements on one line, separated by blanks. */
    private static void printList(String title, List<String> list) {
        System.out.println(title);
        for (String s : list) {
            // print (not printf): the element is data, not a format string.
            // A null element is rendered as "null" by string concatenation.
            System.out.print(s + " ");
        }
        System.out.println();
    }
}

/** Task that appends the name of the executing thread to a shared list ten times. */
class ListThread implements Runnable {
    public List<String> list;

    public ListThread(List<String> list) {
        this.list = list;
    }

    /**
     * Adds the current thread's name to {@link #list} ten times, pausing 10 ms before each
     * add. Stops early, with the interrupt flag restored, if the thread is interrupted.
     */
    @Override
    public void run() {
        for (int i = 0; i < 10; i++) {
            try {
                Thread.sleep(10);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
            // add the name of the current thread to the list
            list.add(Thread.currentThread().getName());
        }
    }
}

set

package org.example.Occur;import java.util.*;
import java.util.concurrent.CopyOnWriteArraySet;

/**
 * Lets ten threads insert into each of three sets (a plain {@code HashSet}, a
 * {@code Collections.synchronizedSet} wrapper and a {@code CopyOnWriteArraySet}) and then
 * prints the size and content of every set so the results can be compared.
 */
public class SetTest {
    /** Number of writer threads started for each set. */
    private static final int WRITERS_PER_SET = 10;

    /**
     * Starts all writer threads, waits until every one of them has finished and prints the
     * three sets.
     *
     * @param args unused
     * @throws InterruptedException if the main thread is interrupted while joining a writer
     */
    public static void main(String[] args) throws InterruptedException {
        // thread unsafe
        Set<String> unsafeSet = new HashSet<String>();
        // thread safe
        Set<String> safeSet1 = Collections.synchronizedSet(new HashSet<String>());
        // thread safe
        CopyOnWriteArraySet<String> safeSet2 = new CopyOnWriteArraySet<String>();

        SetThread t1 = new SetThread(unsafeSet);
        SetThread t2 = new SetThread(safeSet1);
        SetThread t3 = new SetThread(safeSet2);

        List<Thread> writers = new ArrayList<Thread>();
        writers.addAll(startWriters(t1));
        writers.addAll(startWriters(t2));
        writers.addAll(startWriters(t3));

        // Join every writer instead of sleeping for a fixed time: the sets are only read
        // after all writers are really done.
        for (Thread writer : writers) {
            writer.join();
        }

        System.out.println("listThread1.set.size()= " + t1.set.size());
        System.out.println("listThread2.set.size()= " + t2.set.size());
        System.out.println("listThread3.set.size()= " + t3.set.size());

        printSet("unsafelist:", t1.set);
        printSet("safelist1:", t2.set);
        printSet("safelist:", t3.set);
    }

    /** Starts {@code WRITERS_PER_SET} threads named "0".."9" that all run the given task. */
    private static List<Thread> startWriters(SetThread task) {
        List<Thread> started = new ArrayList<Thread>();
        for (int i = 0; i < WRITERS_PER_SET; i++) {
            Thread t = new Thread(task, String.valueOf(i));
            t.start();
            started.add(t);
        }
        return started;
    }

    /** Prints a title line followed by all elements on one line, separated by blanks. */
    private static void printSet(String title, Set<String> set) {
        System.out.println(title);
        for (String s : set) {
            // print (not printf): the element is data, not a format string.
            // A null element is rendered as "null" by string concatenation.
            System.out.print(s + " ");
        }
        System.out.println();
    }
}

/** Task that inserts "threadName + counter" into a shared set for counter values 1 to 10. */
class SetThread implements Runnable {
    public Set<String> set;

    public SetThread(Set<String> set) {
        this.set = set;
    }

    /**
     * Inserts the current thread's name followed by 1, 2, ... 10 into {@link #set}, pausing
     * 10 ms before each insert. Stops early, with the interrupt flag restored, if the thread
     * is interrupted.
     */
    @Override
    public void run() {
        for (int i = 1; i <= 10; i++) {
            try {
                Thread.sleep(10);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
            // add the name of the current thread (plus the counter) to the set
            set.add(Thread.currentThread().getName() + i);
        }
    }
}

map

/*
// This listing is kept commented out, exactly as in the original article; it has only been
// re-formatted so that it can be read (and compiled once un-commented).
//
// Demo: ten threads put entries into each of three maps (a plain HashMap, a
// Collections.synchronizedMap wrapper and a ConcurrentHashMap); afterwards the size and the
// entries of every map are printed.
package org.example.Occur;

import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArraySet;

public class MapTest {
    public static void main(String[] args) throws InterruptedException {
        // thread unsafe
        Map<Integer,String> unsafeMap = new HashMap<Integer,String>();
        // thread safe
        Map<Integer,String> safeMap1 = Collections.synchronizedMap(new HashMap<Integer,String>());
        // thread safe
        ConcurrentHashMap<Integer,String> safeMap2 = new ConcurrentHashMap<Integer,String>();
        MapThread t1 = new MapThread(unsafeMap);
        MapThread t2 = new MapThread(safeMap1);
        MapThread t3 = new MapThread(safeMap2);
        for (int i = 0; i < 10; i++) {
            Thread t = new Thread(t1, String.valueOf(i));
            t.start();
        }
        for (int i = 0; i < 10; i++) {
            Thread t = new Thread(t2, String.valueOf(i));
            t.start();
        }
        for (int i = 0; i < 10; i++) {
            Thread t = new Thread(t3, String.valueOf(i));
            t.start();
        }
        // wait child thread done
        Thread.sleep(2000);
        System.out.println("MapThread.map.size()= "+t1.map.size());
        System.out.println("MapThread1.map.size()= "+t2.map.size());
        System.out.println("MapThread2.map.size()= "+t3.map.size());

        System.out.println("unsafemap:");
        Iterator iter = t1.map.entrySet().iterator();
        while (iter.hasNext()){
            Map.Entry<Integer,String> entry = (Map.Entry<Integer, String>) iter.next();
            System.out.print(entry.getKey()+":");
            System.out.print(entry.getValue()+" ");
        }
        System.out.println();

        System.out.println("safemap1:");
        Iterator iter1 = t2.map.entrySet().iterator();
        while (iter1.hasNext()){
            Map.Entry<Integer,String> entry = (Map.Entry<Integer, String>) iter1.next();
            System.out.print(entry.getKey()+":");
            System.out.print(entry.getValue()+" ");
        }
        System.out.println();

        System.out.println("safemap2:");
        // NOTE(review): the original iterated t2.map here a second time, so "safemap2" printed
        // the synchronizedMap again; corrected to t3.map (the ConcurrentHashMap).
        Iterator iter2 = t3.map.entrySet().iterator();
        while (iter2.hasNext()){
            Map.Entry<Integer,String> entry = (Map.Entry<Integer, String>) iter2.next();
            System.out.print(entry.getKey()+":");
            System.out.print(entry.getValue()+" ");
        }
        System.out.println();
    }
}

class MapThread implements Runnable {
    // NOTE(review): counter is a plain static int shared by all threads of all three demos and
    // is incremented with ++counter without synchronization, so two threads can obtain the same
    // key. Even the thread-safe maps can then end up with fewer than the expected number of
    // entries; an AtomicInteger would avoid that - confirm intent before enabling this listing.
    private static int counter=0;
    public Map<Integer,String> map;

    public MapThread(Map<Integer,String> map){
        this.map=map;
    }

    @Override
    public void run(){
        int i=0;
        Random rand = new Random(); // unused in the original
        while (i<10){
            i++;
            try {
                Thread.sleep(10);
            } catch (InterruptedException e){
                e.printStackTrace();
            }
            // put the name of the current thread into the map under the next counter value
            map.put(++counter,Thread.currentThread().getName());
        }
    }
}
*/
Java 并发协作

线程协作
Thread/Executor/Fork-Join-线程启动,运行,结束-线程之间缺少协作
synchronized 同步-限定只有一个线程才能进入关键区-简单粗暴,性能损失有点大

Lock

Lock也可以实现同步的效果-实现更复杂的临界区结构-tryLock方法可以预判锁是否空闲-允许分离读写的操作,多个读,一个写
-性能更好
·ReentrantLock类,可重入的互斥锁 ReentrantReadWriteLock 可重入的读写锁  lock和unlock函数

package org.example.Occur;import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

/**
 * Milk-tea shop demo for {@code ReentrantLock.tryLock()} and {@code ReentrantReadWriteLock}.
 *
 * <p>Every lock acquired in this class is released in a {@code finally} block, so an
 * interrupted {@code Thread.sleep} can no longer leave a lock held forever.
 */
public class LockExample {
    /** Reentrant mutual-exclusion lock guarding the ordering counter of the shop. */
    private static final ReentrantLock queueLock = new ReentrantLock();
    /** Read-write lock guarding the single order book. */
    private static final ReentrantReadWriteLock orderLock = new ReentrantReadWriteLock();

    /**
     * There is a milk-tea shop where ordering sometimes requires queueing.
     * Assume that a customer who sees a queue decides not to buy right now.
     * Also assume the shop has one boss and several employees and a rather primitive way of
     * recording orders: there is only one order book.
     * The boss writes new orders; the employees keep looking at the order book to learn what
     * to make. While the boss is writing, employees cannot look at the book.
     * Several employees may look at the book at the same time; while any employee is looking,
     * the boss cannot write a new order.
     *
     * @param args unused
     * @throws InterruptedException declared for the demos; not thrown by the code shown here
     */
    public static void main(String[] args) throws InterruptedException {
        buyMilkTea();
        // handleOrder();   // alternative demo: boss/employees sharing the read-write lock
    }

    /**
     * Tries to get the counter with {@code tryLock()}. On success the customer "thinks" for up
     * to 500 ms, orders and returns; otherwise the customer prints a message, waits one second
     * and tries again.
     *
     * @throws InterruptedException if the calling thread is interrupted while sleeping
     */
    public void tryToBuyMilkTea() throws InterruptedException {
        while (true) {
            if (queueLock.tryLock()) {
                try {
                    long thinkingTime = (long) (Math.random() * 500);
                    Thread.sleep(thinkingTime);
                    System.out.println(Thread.currentThread().getName() + ":来一杯珍珠奶茶,不要珍珠");
                    return;
                } finally {
                    // always release, even when sleep() was interrupted
                    queueLock.unlock();
                }
            }
            System.out.println(Thread.currentThread().getName() + ":再等等");
            Thread.sleep(1000);
        }
    }

    /**
     * Starts ten student threads; each walks to the shop for up to one second and then calls
     * {@link #tryToBuyMilkTea()}.
     *
     * @throws InterruptedException declared for interface compatibility
     */
    public static void buyMilkTea() throws InterruptedException {
        final LockExample lockExample = new LockExample();
        int STUDENTS_CNT = 10;
        Thread[] students = new Thread[STUDENTS_CNT];
        for (int i = 0; i < STUDENTS_CNT; i++) {
            students[i] = new Thread(() -> {
                try {
                    long walkingTime = (long) (Math.random() * 1000);
                    Thread.sleep(walkingTime);
                    lockExample.tryToBuyMilkTea();
                } catch (InterruptedException e) {
                    System.out.println(e.getMessage());
                    Thread.currentThread().interrupt();
                }
            });
            students[i].start();
        }
    }

    /**
     * Starts one boss thread that keeps adding orders and three worker threads that keep
     * viewing the order book. Each thread runs until it is interrupted.
     *
     * @throws InterruptedException declared for interface compatibility
     */
    public static void handleOrder() throws InterruptedException {
        Thread boss = new Thread(() -> {
            while (!Thread.currentThread().isInterrupted()) {
                try {
                    addOrder();
                    long waitingTime = (long) (Math.random() * 1000);
                    Thread.sleep(waitingTime);
                } catch (InterruptedException e) {
                    System.out.println(e.getMessage());
                    // restore the flag so the loop condition ends the thread
                    Thread.currentThread().interrupt();
                }
            }
        });
        boss.start();

        int workerCnt = 3;
        Thread[] workers = new Thread[workerCnt];
        for (int i = 0; i < workerCnt; i++) {
            workers[i] = new Thread(() -> {
                while (!Thread.currentThread().isInterrupted()) {
                    try {
                        viewOrder();
                        long workingTime = (long) (Math.random() * 5000);
                        Thread.sleep(workingTime);
                    } catch (InterruptedException e) {
                        System.out.println(e.getMessage());
                        Thread.currentThread().interrupt();
                    }
                }
            });
            workers[i].start();
        }
    }

    /**
     * Writes a new order while holding the write lock (takes up to one second).
     *
     * @throws InterruptedException if interrupted while writing; the write lock is released
     */
    public static void addOrder() throws InterruptedException {
        orderLock.writeLock().lock();
        try {
            long writingTime = (long) (Math.random() * 1000);
            Thread.sleep(writingTime);
            System.out.println("老板新加一笔订单");
        } finally {
            orderLock.writeLock().unlock();
        }
    }

    /**
     * Reads the order book while holding the read lock (takes up to 500 ms).
     *
     * @throws InterruptedException if interrupted while reading; the read lock is released
     */
    public static void viewOrder() throws InterruptedException {
        orderLock.readLock().lock();
        try {
            long readingTime = (long) (Math.random() * 500);
            Thread.sleep(readingTime);
            System.out.println(Thread.currentThread().getName() + ":查看订单本");
        } finally {
            orderLock.readLock().unlock();
        }
    }
}
 Semaphore


信号量,由Dijkstra于1965年提出。本质上是一个计数器:计数器大于0,可以使用;等于0,不能使用。可以设置多个并发量,例如限制10个访问
Semaphore  -acquire获取
-release释放
比Lock更进一步,可以控制多个同时访问关键区

package org.example.Occur;

import java.util.concurrent.Semaphore;

/**
 * Parking-lot demo for {@code Semaphore}: ten cars compete for five parking places.
 */
public class SemaphoreExample {
    private final Semaphore placesemaphore = new Semaphore(5);

    /**
     * Tries to take a parking place without waiting and reports the outcome on stdout.
     *
     * @return {@code true} if a permit was obtained, {@code false} if none was available
     * @throws InterruptedException declared by the original signature
     */
    public boolean parking() throws InterruptedException {
        boolean parked = placesemaphore.tryAcquire();
        String outcome = parked ? ":停车成功" : ":没有空位";
        System.out.println(Thread.currentThread().getName() + outcome);
        return parked;
    }

    /**
     * Gives the parking place back and reports it on stdout.
     *
     * @throws InterruptedException declared by the original signature
     */
    public void leaving() throws InterruptedException {
        placesemaphore.release();
        System.out.println(Thread.currentThread().getName() + ":开走");
    }

    /**
     * 10 cars, 5 parking places: starts one thread per car and waits for all of them.
     *
     * @param args unused
     * @throws InterruptedException if the main thread is interrupted while joining
     */
    public static void main(String[] args) throws InterruptedException {
        int tryToParkCnt = 10;
        SemaphoreExample lot = new SemaphoreExample();
        Thread[] parkers = new Thread[tryToParkCnt];
        for (int i = 0; i < tryToParkCnt; i++) {
            parkers[i] = new Thread(lot::visit);
            parkers[i].start();
        }
        for (Thread parker : parkers) {
            parker.join();
        }
    }

    /**
     * One car's visit: arrive after up to 1000 ms, try to park and, if that worked, stay for
     * up to 1200 ms before leaving.
     */
    private void visit() {
        try {
            long arrivalDelay = (long) (Math.random() * 1000);
            Thread.sleep(arrivalDelay);
            if (!parking()) {
                return;
            }
            long stayTime = (long) (Math.random() * 1200);
            Thread.sleep(stayTime);
            leaving();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}
Latch


等待锁,是一个同步辅助类,用来同步执行任务的一个或者多个线程,不是用来保护临界区或者共享资源
CountDownLatch
-countDown()计数减1
-await() 等待latch变成0

package org.example.Occur;

import java.util.concurrent.CountDownLatch;

/**
 * 100-metre race demo for {@code CountDownLatch}: one latch releases all runners at once,
 * a second latch lets the main thread wait until every runner has finished.
 */
public class CountDownLatchExample {
    /**
     * Starts ten runner threads, releases them together and waits for all of them.
     *
     * @param args unused
     * @throws InterruptedException if the main thread is interrupted while waiting
     */
    public static void main(String[] args) throws InterruptedException {
        int runnercnt = 10;
        CountDownLatch starsignal = new CountDownLatch(1);
        CountDownLatch doneSignal = new CountDownLatch(runnercnt);
        // create and start threads
        for (int i = 0; i < runnercnt; ++i) {
            new Thread(new Worker(starsignal, doneSignal)).start();
        }
        System.out.println("准备工作...");
        System.out.println("准备工作就绪");
        // Announce the start before releasing the runners; otherwise a runner can report
        // having finished before the start message appears.
        System.out.println("比赛开始");
        starsignal.countDown(); // let all threads proceed
        doneSignal.await(); // wait for all to finish
        System.out.println("比赛结束");
    }

    /**
     * A runner. Once a latch reaches zero, every thread waiting in {@code await()} on that
     * latch is released.
     */
    static class Worker implements Runnable {
        private final CountDownLatch startSignal;
        private final CountDownLatch doneSignal;

        Worker(CountDownLatch startSignal, CountDownLatch doneSignal) {
            this.startSignal = startSignal;
            this.doneSignal = doneSignal;
        }

        /**
         * Waits for the start signal, runs, and always counts the done latch down - also when
         * interrupted, so that the thread awaiting {@code doneSignal} cannot hang forever.
         */
        @Override
        public void run() {
            try {
                startSignal.await();
                doWork();
            } catch (InterruptedException ex) {
                // do not swallow the interrupt: restore the flag for whoever owns this thread
                Thread.currentThread().interrupt();
            } finally {
                doneSignal.countDown();
            }
        }

        void doWork() {
            System.out.println(Thread.currentThread().getName() + ":跑完全程");
        }
    }
}

Barrier


·集合点,也是一个同步辅助类     允许多个线程在某一个点上进行同步
CyclicBarrier-构造函数是需要同步的线程数量   -await等待其他线程,到达数量后,就放行

package org.example.Occur;import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;

/**
 * {@code CyclicBarrier} demo: three threads each sum one row of numbers; when all three have
 * reached the barrier, the barrier action adds the row sums up to the final result.
 */
public class CyclicBarrierExample {
    /**
     * Three rows of numbers: compute the sum of every row, then the grand total.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        final int[][] numbers = {
            {1, 2, 3, 4, 5},
            {6, 7, 8, 9, 10},
            {11, 12, 13, 14, 15},
        };
        final int[] results = new int[3];
        CalculateFinalResult finalResultCalculator = new CalculateFinalResult(results);
        // once 3 threads are waiting at the barrier, finalResultCalculator is executed
        CyclicBarrier barrier = new CyclicBarrier(3, finalResultCalculator);
        for (int i = 0; i < 3; i++) {
            CalculateEachRow rowCalculator = new CalculateEachRow(barrier, numbers, i, results);
            new Thread(rowCalculator).start();
        }
    }
}

/** Sums one row of a matrix, stores the sum in a shared result array and waits at a barrier. */
class CalculateEachRow implements Runnable {
    final int[][] numbers;
    final int rowNumber;
    final int[] res;
    final CyclicBarrier barrier;

    /**
     * @param barrier   barrier to wait at after the row has been summed
     * @param numbers   matrix holding the row
     * @param rowNumber index of the row to sum; also the index written in {@code res}
     * @param res       shared array receiving the row sum
     */
    CalculateEachRow(CyclicBarrier barrier, int[][] numbers, int rowNumber, int[] res) {
        this.barrier = barrier;
        this.numbers = numbers;
        this.rowNumber = rowNumber;
        this.res = res;
    }

    @Override
    public void run() {
        int sum = 0;
        for (int data : numbers[rowNumber]) {
            sum += data;
        }
        // publish the row sum once, after the loop, instead of on every iteration
        res[rowNumber] = sum;
        try {
            System.out.println(Thread.currentThread().getName() + ":计算第" + (rowNumber + 1) + "行结束,结束:" + res[rowNumber]);
            barrier.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
            Thread.currentThread().interrupt();
        } catch (BrokenBarrierException e) {
            e.printStackTrace();
        }
    }
}

/** Barrier action: adds up all row sums and prints the final result exactly once. */
class CalculateFinalResult implements Runnable {
    final int[] eachRowRes;
    int finalRes;

    public int getFinalResult() {
        return finalRes;
    }

    CalculateFinalResult(int[] eachRowRes) {
        this.eachRowRes = eachRowRes;
    }

    /**
     * Sums {@code eachRowRes} into {@code finalRes}. The result is printed after the loop; the
     * original printed inside the loop and therefore reported every partial sum as "final".
     */
    @Override
    public void run() {
        int sum = 0;
        for (int data : eachRowRes) {
            sum += data;
        }
        finalRes = sum;
        System.out.println("最终结果为:" + finalRes);
    }
}
Phaser



允许执行并发多阶段任务,是一个同步辅助类。在每一个阶段结束的位置对线程进行同步:当所有的线程都到达这步,再进行下一步。
Phaser
-arrive()
-arriveAndAwaitAdvance()

package org.example.Occur;

import java.util.concurrent.Phaser;

/**
 * {@code Phaser} demo: an exam with three questions. One question is handed out at a time and
 * the next one starts only after all students have finished the current one.
 */
public class PhaserExample {
    /**
     * Registers five parties on a phaser and starts one student thread per party.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        int studentscnt = 5;
        Phaser phaser = new Phaser(studentscnt);
        for (int i = 0; i < studentscnt; i++) {
            new Thread(new Student(phaser)).start();
        }
    }
}

/** A student who answers three questions and synchronizes with the others after each one. */
class Student implements Runnable {
    /** Number of questions in the exam. */
    private static final int QUESTIONS = 3;

    private final Phaser phaser;

    public Student(Phaser phaser) {
        this.phaser = phaser;
    }

    /**
     * Answers the questions one by one, waiting at the phaser after each. If the thread is
     * interrupted while answering, the student deregisters from the phaser so that the
     * remaining students are not left waiting forever for a party that will never arrive.
     */
    @Override
    public void run() {
        try {
            for (int question = 1; question <= QUESTIONS; question++) {
                doTesting(question);
                phaser.arriveAndAwaitAdvance();
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
            phaser.arriveAndDeregister();
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Simulates answering question {@code i} by sleeping for up to one second.
     *
     * @throws InterruptedException if interrupted while "thinking"
     */
    private void doTesting(int i) throws InterruptedException {
        String name = Thread.currentThread().getName();
        System.out.println(name + "开始答第" + i + "题");
        long thinkingTime = (long) (Math.random() * 1000);
        Thread.sleep(thinkingTime);
        System.out.println(name + "第"+ i +"道题答题结束");
    }
}
Exchanger



允许在并发线程中互相交换消息。允许在2个线程中定义同步点,当两个线程都到达同步点时,它们交换数据结构
Exchanger
-exchange(),线程双方互相交换数据
-交换数据是双向的

package org.example.Occur;import java.util.Scanner;
import java.util.concurrent.Exchanger;

/**
 * {@code Exchanger} demo: a student-score lookup in which the main thread and a background
 * worker pass the query and the answer to each other.
 */
public class ExchangerExample {
    /**
     * Reads names from stdin, hands each one to the worker and prints the answer, until the
     * worker answers "exit". End of input is treated like typing "exit".
     *
     * @param args unused
     * @throws InterruptedException if the main thread is interrupted during an exchange
     */
    public static void main(String[] args) throws InterruptedException {
        Exchanger<String> exchanger = new Exchanger<String>();
        BackgroundWorker worker = new BackgroundWorker(exchanger);
        new Thread(worker).start();
        try (Scanner scanner = new Scanner(System.in)) {
            while (true) {
                System.out.println("输入要查询的属性学生姓名:");
                // on EOF nextLine() would throw; ask the worker to stop instead
                String input = scanner.hasNextLine() ? scanner.nextLine().trim() : "exit";
                exchanger.exchange(input); // hand the user input to the worker
                String value = exchanger.exchange(null); // fetch the worker's answer
                if ("exit".equals(value)) {
                    break;
                }
                System.out.println("查询结果:" + value);
            }
        }
    }
}

// When two threads both reach exchange() on the same Exchanger, they swap their data with
// each other; the exchange is bidirectional.
/** Worker that answers score queries received through an {@code Exchanger}. */
class BackgroundWorker implements Runnable {
    final Exchanger<String> exchanger;

    BackgroundWorker(Exchanger<String> exchanger) {
        this.exchanger = exchanger;
    }

    /**
     * Receives a query, sends back the answer, and repeats. The loop ends after an "exit"
     * query has been acknowledged or when the thread is interrupted; the original looped
     * forever, which kept the JVM alive after the main thread had finished.
     */
    @Override
    public void run() {
        while (true) {
            try {
                String item = exchanger.exchange(null);
                switch (item) {
                    case "zhangsan":
                        exchanger.exchange("90");
                        break;
                    case "lisi":
                        exchanger.exchange("80");
                        break;
                    case "wangwu":
                        exchanger.exchange("70");
                        break;
                    case "exit":
                        exchanger.exchange("exit");
                        return;
                    default:
                        exchanger.exchange("查无此人");
                        break;
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
                Thread.currentThread().interrupt();
                return;
            }
        }
    }
}
定时任务

简单定时器机制


-设置计划任务,也就是在指定的时间开始执行某一个任务。
-TimerTask 封装任务
-Timer类定时器

package org.example.Occur;import java.util.Calendar;
import java.util.Date;
import java.util.Timer;
import java.util.TimerTask;

/**
 * {@code java.util.Timer} demo: first a task scheduled with {@code schedule}, then a task
 * scheduled with {@code scheduleAtFixedRate}.
 */
public class TimerTest {
    /**
     * Runs both timer demos one after the other on the same timer and cancels the timer at
     * the end.
     *
     * @param args unused
     * @throws InterruptedException if the main thread is interrupted while sleeping
     */
    public static void main(String[] args) throws InterruptedException {
        MyTask firstTask = new MyTask();
        Timer timer = new Timer();
        System.out.println("当前时间:" + new Date().toLocaleString());
        runPeriodicDemo(timer, firstTask);
        System.out.println("======================== ==");
        runFixedRateDemo(timer);
        timer.cancel(); // cancel the timer
    }

    /** Starts the task 1 second from now, repeats it every 2 seconds, cancels it after 10 s. */
    private static void runPeriodicDemo(Timer timer, MyTask task) throws InterruptedException {
        timer.schedule(task, 1000, 2000);
        Thread.sleep(10000);
        task.cancel(); // cancel the current task
    }

    /** Schedules a MyTask2 at fixed rate (period 3 s), first run 3 s from now; waits 20 s. */
    private static void runFixedRateDemo(Timer timer) throws InterruptedException {
        Calendar startAt = Calendar.getInstance();
        int currentSecond = startAt.get(Calendar.SECOND);
        startAt.set(Calendar.SECOND, currentSecond + 3);
        Date firstRun = startAt.getTime();
        MyTask2 fixedRateTask = new MyTask2();
        timer.scheduleAtFixedRate(fixedRateTask, firstRun, 3000); // fixed rate
        Thread.sleep(20000);
    }
}

/** Timer task that prints the current time. */
class MyTask extends TimerTask {
    @Override
    public void run() {
        System.out.println("运行了! 时间为" + new Date());
    }
}

/** Timer task that prints the current time and then stays busy (asleep) for 4 seconds. */
class MyTask2 extends TimerTask {
    @Override
    public void run() {
        System.out.println("运行了! 时间为" + new Date());
        try {
            Thread.sleep(4000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}
Executor+定时器机制

ScheduledExecutorService    -定时任务   -周期任务

package org.example.Occur;import java.util.Date;
import java.util.TimerTask;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

/**
 * {@code ScheduledExecutorService} demo with a one-shot task, a fixed-rate task and a
 * fixed-delay task.
 */
public class ScheduledExecutorTest {
    /**
     * Runs the fixed-rate demo; the other two demos can be enabled by un-commenting them.
     *
     * @param args unused
     * @throws Exception if the main thread is interrupted while a demo is running
     */
    public static void main(String[] args) throws Exception {
        // executeAtFixTime();    // alternative demo: one-shot task
        executeFixedRate();
        // executeFixedDelay();   // alternative demo: fixed delay between runs
    }

    /**
     * Schedules one task to run once after 1 second, then waits 20 s and shuts down.
     *
     * @throws Exception if interrupted while waiting
     */
    public static void executeAtFixTime() throws Exception {
        ScheduledExecutorService executor = Executors.newScheduledThreadPool(1);
        try {
            executor.schedule(new MyTask3(), 1, TimeUnit.SECONDS);
            Thread.sleep(20000);
        } finally {
            executor.shutdown();
        }
    }

    /**
     * Schedules a task at a fixed rate of one start every 3000 ms, then waits 20 s and shuts
     * down.
     *
     * <p>The unit is {@code MILLISECONDS}; the original used {@code MICROSECONDS}, which made
     * the period 3 ms. NOTE(review): the initial delay value 1 is kept as written and is
     * therefore 1 ms - confirm whether 1 s was intended.
     *
     * @throws Exception if interrupted while waiting
     */
    public static void executeFixedRate() throws Exception {
        ScheduledExecutorService executor = Executors.newScheduledThreadPool(1);
        try {
            executor.scheduleAtFixedRate(new MyTask3(), 1, 3000, TimeUnit.MILLISECONDS);
            Thread.sleep(20000);
        } finally {
            executor.shutdown();
        }
    }

    /**
     * Schedules a task with a fixed delay of 3000 ms between the end of one run and the start
     * of the next, then waits 20 s and shuts down.
     *
     * <p>The unit is {@code MILLISECONDS}; the original used {@code MICROSECONDS}.
     *
     * @throws Exception if interrupted while waiting
     */
    public static void executeFixedDelay() throws Exception {
        ScheduledExecutorService executor = Executors.newScheduledThreadPool(1);
        try {
            executor.scheduleWithFixedDelay(new MyTask3(), 1, 3000, TimeUnit.MILLISECONDS);
            Thread.sleep(20000);
        } finally {
            executor.shutdown();
        }
    }
}

/** Task that prints the current time and then stays busy (asleep) for 4 seconds. */
class MyTask3 extends TimerTask {
    @Override
    public void run() {
        System.out.println(" 时间为" + new Date());
        try {
            Thread.sleep(4000);
        } catch (InterruptedException e) {
            e.printStackTrace();
            Thread.currentThread().interrupt();
        }
    }
}
Quartz


-Quartz是一个较为完善的任务调度框架-解决程序中Timer零散管理的问题

-功能更加强大
·Timer执行周期任务,如果中间某一次有异常,整个任务终止执行;Quartz执行周期任务,如果中间某一次有异常,不影响下次任务执行

package org.example.Occur;import org.quartz.JobDetail;
import org.quartz.Scheduler;
import org.quartz.SchedulerException;
import org.quartz.Trigger;
import org.quartz.impl.StdSchedulerFactory;import static org.quartz.JobBuilder.newJob;
import static org.quartz.SimpleScheduleBuilder.simpleSchedule;
import static org.quartz.TriggerBuilder.newTrigger;public class QuartzTest {public static void main(String[] args) {try {//创建schedulerScheduler scheduler = StdSchedulerFactory.getDefaultScheduler();//定义一个TriggerTrigger trigger = newTrigger().withIdentity("trigger1", "group1")  //定义name/group.startNow()//-日加入scheduler,立即生效.withSchedule(simpleSchedule()   //使用simpleTrigger.withIntervalInSeconds(2)//每隔2秒执行一次.repeatForever())//一直执行.build();//定义一个JobDetai1JobDetail job = newJob(HelloJob.class) //定义Job类为HelloQuartz类.withIdentity("job1","group1") //定义name/group.usingJobData("name","quartz").build();scheduler.scheduleJob(job,trigger);scheduler.start();Thread.sleep(10000);scheduler.shutdown(true);} catch (InterruptedException e) {e.printStackTrace();} catch (SchedulerException e) {throw new RuntimeException(e);}}
}

 

package org.example.Occur;import org.quartz.Job;
import org.quartz.JobDetail;
import org.quartz.JobExecutionContext;
import org.quartz.JobExecutionException;import java.util.Date;public class HelloJob implements Job {public void execute(JobExecutionContext context) throws JobExecutionException {JobDetail detail = context.getJobDetail();String name = detail.getJobDataMap().getString("name");System.out.println("hello from " + name + " at " + new Date());}
}


http://www.mrgr.cn/news/5845.html

相关文章:

  • 图像识别,图片线条检测
  • WRF-LES与PALM微尺度气象大涡模拟
  • 如何处理前端项目中的SEO优化:从SPA到SSR与SSG
  • 【前端基础篇】CSS基础速通万字介绍(下篇)
  • 贪吃蛇(Qt版)
  • 【ESP-IDF FreeRTOS】任务管理
  • Java蛋糕店烘焙店系统小程序系统源码
  • Flink 流转表,表转流,watermark设置
  • fiddler在软件测试中的使用(详细版)
  • 6款智能改写软件,一键改写文章瞬间提升原创质量
  • 007 SpringCloudAlibaba基础使用(nacos,gateway)
  • 上传PDF、DOC文件到SAP HCM系统中案例
  • 信息学奥赛初赛天天练-73-NOIP2016普及组-基础题4-枚举法、放苹果、统筹方法
  • 学习笔记第三十天
  • 点灯案例练习(基于寄存器)
  • Bigtop 从0开始(下)
  • 如何通过数据互通提升销售效率与客户满意度
  • GraphRAG + Ollama 本地部署全攻略:避坑实战指南
  • 二分类交叉熵与多分类交叉熵详解及实例计算
  • 手动修改zk类型的kafka offset