原创

java-并发


重量级锁进阶

无锁001
-偏向锁101
-轻量级锁01(mark word)(批量重偏向20,批量撤销40)
-重量级锁(monitor)10两阶段终止模式

设计模式-两阶段终止模式

public class demo2 {


public static void main(String[] args) throws InterruptedException {
    eye eye = new eye();
    eye.start();
    System.out.println("开始执行任务");
    Thread.sleep(35000);
    eye.stop();

}
}
class eye{

private Thread eye;

//执行线程
public void start(){
    eye=new Thread(()->{
        while (true){
            boolean interrupted = Thread.currentThread().isInterrupted();
            if (interrupted){
                System.out.println("料理后事");
                break;
            }
            try {
                Thread.sleep(1000);//情况1
                System.out.println(Thread.currentThread().getName()+" :执行监控中。。。");//情况2
            } catch (InterruptedException e) {
                e.printStackTrace();
                //重新打断当前线程
                Thread.currentThread().interrupt();
            }
        }
    });
    eye.start();
}

//打断线程
public void stop(){
    eye.interrupt();
}

}

两阶段终止模式-volatile

一个修改主存 多线程访问

public class demo15 {

	public static void main(String[] args) {

		Moni moni = new Moni();

		new Thread(()->{
			moni.start();
		}).start();

		new Thread(()->{
			try {
				Thread.sleep(200);
			} catch (InterruptedException e) {
				e.printStackTrace();
			}
			moni.stop();
		}).start();
	}

}
class  Moni{

	private volatile boolean flag=false;

	public void start(){

		Thread thread = Thread.currentThread();

		while (true){

			if (flag){
				System.out.println(thread.getName()+"结束");
				break;
			}
			System.out.println(thread.getName()+"执行监控");
		}
	}
	public void stop(){

		flag=true;
	}
}

正确使用 wait-notifyall

	synchronized(lock){
		while(条件不成立)}
		{
		lock.wait();
		}
		//干活
	}
	//另外一个线程
	synchronized(lock){
		lock.notifyall();//全部唤醒 while判断
	}

设计模式-保护性暂停

public class demo8 {

public static void main(String[] args) {

 	Lock lock = new Lock();
    //t1线程获取数据
    new Thread(()->{
        try {
            System.out.println(Thread.currentThread().getName()+"获取数据");
            List<String> o = (List<String>) lock.get();
            System.out.println(Thread.currentThread().getName()+"数据大小:"+o.size());
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

    },"t1").start();
    //t2线程下载数据
    new Thread(()->{
        System.out.println(Thread.currentThread().getName()+"下载数据");
        lock.success(new DownUtil().getInfo());
    },"t2").start();

}
}
class Lock{

private Object response;
public Object get() throws InterruptedException {

    synchronized (this){
        while (this.response==null){
            //进入 waitset 等待区
            this.wait();
        }
        return response;
    }
}

public void success(Object response){

    synchronized (this){
        this.response=response;
        //monitor 全部唤醒
        this.notifyAll();
    }
}

}
class DownUtil{

public List<String> getInfo(){

    List<String> strings = new ArrayList<>();
    try {
        // 创建URL对象
        URL url = new URL("https://world.taobao.com/");
        // 打开连接
        URLConnection connection = url.openConnection();
        // 使用字符流读取数据
        BufferedReader reader = new BufferedReader(new InputStreamReader(connection.getInputStream(), StandardCharsets.UTF_8));
        String line;
        while ((line = reader.readLine()) != null) {
            strings.add(line);
//                System.out.println(line);
        }
        // 关闭流
        reader.close();
    } catch (IOException e) {
        e.printStackTrace();
    }
    return strings;
}
}

设计模式-保护性暂停-扩展-解耦等待和生产

public class demo9 {

	public static void main(String[] args) throws InterruptedException {

		for (int i = 0; i < 3; i++) {
			new People().start();
		}
		Thread.sleep(1000);
		for (Integer integer : MailBox.getMiddleWareID()) {
			new PostMan(integer, "内容" + integer).start();
		}


	}

}

class MailBox {

	private static int id;

	/**
	 * 存放 对象集合
	 */
	private static Map<Integer, MiddleWare> list = new Hashtable<>();

	public static synchronized int generateId() {
		return id++;
	}

	/**
	 * 创建中间对象
	 *
	 * @return
	 */
	public static MiddleWare createMiddleWare() {
		MiddleWare middleWare = new MiddleWare(generateId());
		list.put(middleWare.getId(), middleWare);
		return middleWare;
	}

	public static Set<Integer> getMiddleWareID() {
		return list.keySet();
	}

	/**
	 * @param id
	 * @return 获取并删除
	 */
	public static MiddleWare getById(Integer id) {
		return list.remove(id);
	}


}

class MiddleWare {

	private int id;

	public MiddleWare(int id) {
		this.id = id;
	}

	public int getId() {
		return id;
	}

	private Object response;

	/**
	 * @return 获取对象
	 * @throws InterruptedException
	 */
	public Object getInfo() throws InterruptedException {
		synchronized (this) {
			while (this.response == null) {
				this.wait();
			}
			return this.response;
		}
	}

	/**
	 * 创建对象
	 *
	 * @param response
	 */
	public void createInfo(Object response) {
		synchronized (this) {
			this.response = response;
			this.notifyAll();
		}
	}

}

class People extends Thread {

	@Override
	public void run() {

		MiddleWare middleWare = MailBox.createMiddleWare();
		try {
			System.out.println(Thread.currentThread().getName() + "开始收信id: " + middleWare.getId());
			Object info = middleWare.getInfo();
			System.out.println(Thread.currentThread().getName() + "开始收信id: " + middleWare.getId() + "收信内容: " + info.toString());
		} catch (InterruptedException e) {
			e.printStackTrace();
		}
	}

}

class PostMan extends Thread {

	private int id;
	private String msg;

	public PostMan(int id, String msg) {
		this.id = id;
		this.msg = msg;
	}

	@Override
	public void run() {
		MiddleWare byId = MailBox.getById(id);
		System.out.println("送信id: " + id + "送信内容: " + msg);
		byId.createInfo(msg);
	}

}
	结果:

	Thread-2开始收信id: 2
	Thread-1开始收信id: 1
	Thread-0开始收信id: 0
	送信id: 0送信内容: 内容0
	送信id: 1送信内容: 内容1
	送信id: 2送信内容: 内容2
	Thread-2开始收信id: 2收信内容: 内容2
	Thread-0开始收信id: 0收信内容: 内容0
	Thread-1开始收信id: 1收信内容: 内容1
	一一对应

设计模式-生产者消费者

消息类
消息队列类(容器,限制,消费方法,存入方法)
多线程
public class demo10 {


public static void main(String[] args) {

//消息队列 5条
Queue queue = new Queue(5);

//7个消息生产线程
for (int i = 0; i < 7; i++) {
int finalI = i;
new Thread(() -> {
try {
queue.put(new Message(finalI, "内容" + finalI));
} catch (InterruptedException e) {
e.printStackTrace();
}
}, "生产者" + finalI).start();
}
//2个消息消费线程
for (int i = 0; i < 2; i++) {
new Thread(() -> {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
try {
queue.take();
} catch (InterruptedException e) {
e.printStackTrace();
}

}, "消费者").start();
}

}

}

/**
* 消息队列 容器(链表 消息类) 队列最大值 消费/存入消息方法
*/
class Queue {

private LinkedList<Message> messagesQueue = new LinkedList<>();

private final int max;

public Queue(int max) {
this.max = max;
}

/**
* @return 消费消息
* @throws InterruptedException
*/
public Message take() throws InterruptedException {
//先拿到锁 为空就等待阻塞
synchronized (messagesQueue) {
while (messagesQueue.isEmpty()) {
System.out.println("队列空,消费者等待着");
messagesQueue.wait();
}
//没有为空就取第一个 唤醒
Message message = messagesQueue.removeFirst();
messagesQueue.notifyAll();
System.out.println("消费者已经消费消息,id: " + message.getId() + "内容: " + message.getMsg().toString());
return message;
}
}

/**
* @param 存入消息
* @throws InterruptedException
*/
public void put(Message message) throws InterruptedException {
//如果队列满了 就阻塞等待
synchronized (messagesQueue) {
while (max == messagesQueue.size()) {
System.out.println("队列满,生产者等待着");
messagesQueue.wait();
}

//否则消息加入队列 唤醒
messagesQueue.addLast(message);
System.out.println("生产者已经生产消息 id: " + message.getId() + "内容: " + message.getMsg());
messagesQueue.notifyAll();
}

}

}

/**
* 消息类
*/
class Message {

private int id;

private Object msg;

public Message(int id, Object msg) {
this.id = id;
this.msg = msg;
}

public int getId() {
return id;
}

public Object getMsg() {
return msg;
}

@Override
public String toString() {
return "Message{" +
"id=" + id +
", msg=" + msg +
'}';
}
}
  	结果:
	生产者已经生产消息 id: 5内容: 内容5
	生产者已经生产消息 id: 0内容: 内容0
	生产者已经生产消息 id: 3内容: 内容3
	生产者已经生产消息 id: 6内容: 内容6
	生产者已经生产消息 id: 1内容: 内容1
	队列满,生产者等待着
	队列满,生产者等待着
	消费者已经消费消息,id: 5内容: 内容5
	生产者已经生产消息 id: 2内容: 内容2
	消费者已经消费消息,id: 0内容: 内容0
	生产者已经生产消息 id: 4内容: 内容4

设计模式-固定运行顺序-synchronized wait-notifiy

public class demo12 {

	//对象锁
	private static Object lock = new Object();
	// flag
	private static boolean isRun =false;

	public static void main(String[] args) throws InterruptedException {

		new Thread(()->{
			synchronized (lock){
				while (!isRun){
					try {
						lock.wait();
					} catch (InterruptedException e) {
						e.printStackTrace();
					}
				}
				System.out.println(Thread.currentThread().getName()+": 1");
			}

		},"t1").start();

		Thread.sleep(1000);

		new Thread(()->{
			synchronized (lock){
				System.out.println(Thread.currentThread().getName()+": 2");
				isRun=true;
				lock.notifyAll();
			}
		},"t2").start();
	}
}

设计模式-固定运行顺序-ReentrantLock await-single

public class demo12 {

	//ReentrantLock 锁
	private static ReentrantLock lock =new ReentrantLock();
	//等待区
	private static  Condition condition = lock.newCondition();
	// flag
	private static boolean isRun =false;

	public static void main(String[] args) throws InterruptedException {

		new Thread(()->{
			lock.lock();
			try {
				while (!isRun){
					try {
						//等待区睡眠
						condition.await();
					} catch (InterruptedException e) {
						e.printStackTrace();
					}
				}
				System.out.println(Thread.currentThread().getName()+": 1");
			}finally {
				lock.unlock();
			}
		},"t1").start();

		Thread.sleep(1000);

		new Thread(()->{
			lock.lock();
			try {
				System.out.println(Thread.currentThread().getName()+": 2");
				isRun=true;
				//唤醒
				condition.signalAll();
			}finally {
				lock.unlock();
			}
		},"t2").start();
	}
}

设计模式-固定运行顺序-park unpark

public class demo13 {

	public static void main(String[] args) {


		Thread t1 = new Thread(() -> {
			LockSupport.park();
			System.out.println(Thread.currentThread().getName() + ": 1");
		}, "t1");
		t1.start();


		new Thread(()->{
			System.out.println(Thread.currentThread().getName()+": 2");
			LockSupport.unpark(t1);
		},"t2").start();
	}

}

设计模式-交替输出-synchronized wait-notify

public class demo14 {

	public static void main(String[] args) {

		WaitClass waitClass = new WaitClass(1, 3);

		new Thread(()->{
			try {
				waitClass.print("a",1,2);
			} catch (InterruptedException e) {
				e.printStackTrace();
			}
		},"a").start();

		new Thread(()->{
			try {
				waitClass.print("b",2,3);
			} catch (InterruptedException e) {
				e.printStackTrace();
			}
		},"b").start();

		new Thread(()->{
			try {
				waitClass.print("c",3,1);
			} catch (InterruptedException e) {
				e.printStackTrace();
			}
		},"c").start();

	}
}

/**
 * 设计思路 flag标记 sum循环次数 加锁 唤醒 方式
 */
class WaitClass{

	private int flag;

	private int sum;

	public WaitClass(int flag, int sum) {
		this.flag = flag;
		this.sum = sum;
	}

	public void print(String str,int waitFlag,int nextFlag) throws InterruptedException {
		for (int i = 0; i < sum; i++) {
			//加锁
			synchronized (this){
				//如果不相等就阻塞
				while (flag!=waitFlag){
					this.wait();
				}
				//否则就打印修改flag唤醒
				System.out.print(str);
				flag=nextFlag;
				this.notifyAll();
			}
		}
	}
}

juc-并发工具

自定义线程池

package jucTool;

import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashSet;
import java.util.Queue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

/**
 * Created with IntelliJ IDEA.
 *
 * @Author: 曾豪杰
 * @Date: 2023/09/06/11:05
 * @Description:
 */
public class juc2 {

	public static void main(String[] args) {


		ThreadPoll threadPoll = new ThreadPoll(2, 1000, TimeUnit.SECONDS, 10);
		for (int i = 0; i < 5; i++) {
			int j=i;
			threadPoll.execute(()->{
				System.out.println(j);
			});
		}

	}

}
/**
* @author: zhj
* @date: 2023/9/6
* @email: 1965411732@qq.com
* @description: 自定义线程池
*/
class ThreadPoll{

	/**
	 * 任务队列
	 */
	private WaitingQueue<Runnable> runnableWaitingQueue;

	/**
	 * 线程集合
	 */
	private HashSet<Work> works=new HashSet<>();

	/**
	 * 线程数
	 */
	private int coreSize;

	/**
	 * 超时时间
	 */
	private long timeOut;

	/**
	 * 时间单位
	 */
	private TimeUnit timeUnit;


	public void execute(Runnable task){
		//当任务数没有超过coreSize,直接交给work对象执行
		//当任务数满了,加入队列暂存
	  synchronized (works){
		  if (works.size()<coreSize){
			  Work work = new Work(task);
			  System.out.println("新增work:"+work+"task: "+task);
			  works.add(work);
			  work.start();
		  }else {
			  System.out.println("加入任务队列:"+task);
			  runnableWaitingQueue.put(task);
		  }
	  }


	}

	public ThreadPoll(int coreSize, long timeOut, TimeUnit timeUnit,int size) {
		this.coreSize = size;
		this.timeOut = timeOut;
		this.timeUnit = timeUnit;
		this.runnableWaitingQueue=new WaitingQueue<>(size);
	}

	/**
	* @author: zhj
	* @date: 2023/9/6
	* @email: 1965411732@qq.com
	* @description: 工作线程执行方法
	*/
	class Work extends Thread{
		private Runnable task;

		public Work(Runnable task){
			this.task=task;
		}

		@Override
		public void run() {
		   //执行任务
			//当task不为空 执行任务
			//当task执行完毕 再接着从任务队列获取任务并且执行
			while (task!=null|| (task=runnableWaitingQueue.take())!=null) {
				try {
					System.out.println("正在执行..."+task);
					task.run();
				}catch (Exception e){
					e.printStackTrace();
				}finally {
					task=null;
				}
			}
			synchronized (works){
				System.out.println("work被移除"+this);
				works.remove(this);
			}
		}
	}
}

/**
* @author: zhj
* @date: 2023/9/6
* @email: 1965411732@qq.com
* @description: 线程队列
*/
class WaitingQueue<T>{

	private Deque<T> arrayDeque = new ArrayDeque<>();

	private  ReentrantLock lock = new ReentrantLock();

	/**
	 * 生产者等待
	 */
	private Condition producerWaiting = lock.newCondition();
	/**
	 * 消费者等待
	 */
	private Condition consumerWaiting = lock.newCondition();

	private int size;

	public WaitingQueue(int size) {
		this.size = size;
	}

	public int getSize(){
		lock.lock();
		try {
			return this.size;
		}finally {
			lock.unlock();
		}

	}

	/**
	* @author: zhj
	* @date: 2023/9/6
	* @email: 1965411732@qq.com
	* @description: 带超时的take
	*/
	public T poll(long time, TimeUnit timeUnit){
		lock.lock();
		try {
			long nanos = timeUnit.toNanos(time);
			//如果为空
			while (arrayDeque.isEmpty()){
				try {
					//如果剩余等待时间小于0,无需等待
					if(nanos<=0){
					   return null;
					}
					//获取剩余等待时间
					nanos = consumerWaiting.awaitNanos(nanos);
				} catch (InterruptedException e) {
					e.printStackTrace();
				}
			}
			producerWaiting.signal();
			return arrayDeque.removeFirst();
		} finally {
			lock.unlock();
		}
	}

	/**
	* @author: zhj
	* @date: 2023/9/6
	* @email: 1965411732@qq.com
	* @description: 从队列拿线程
	*/
	public T take(){
		lock.lock();
		try {
			//如果为空
			while (arrayDeque.isEmpty()){
				try {
					consumerWaiting.await();
				} catch (InterruptedException e) {
					e.printStackTrace();
				}
			}
			producerWaiting.signal();
			return arrayDeque.removeFirst();
		} finally {
			lock.unlock();
		}
	}

	public void put(T t){
		lock.lock();
		try {
			while (arrayDeque.size()==size){
				try {
					producerWaiting.await();
				} catch (InterruptedException e) {
					e.printStackTrace();
				}
			}
			arrayDeque.addLast(t);
			consumerWaiting.signal();
		}finally {
			lock.unlock();
		}
	}
}
总结
  • 作者:阿杰(联系作者)
  • 发表时间:2023-08-29T16:32:50
  • 版权声明:杰出版
  • 公众号:--无
  • 评论