当前位置: 首页 > news >正文

Rust : FnOnce与线程池

一、问题:mpsc如何发送各类不同的函数?

use std::sync::{mpsc,Arc,Mutex};
use std::thread;
fn process<F>(old:f32,name:String,f:F) where F: FnOnce(f32,String) {f(old,name);
}
fn add_f32(a:f32,b:f32) ->f32{a+b
}
fn doit(amount: f32,code:String){println!("amount:{:?} code:{:?}",amount,code);
}
fn workit(amount:f32){println!("amount:{:?}",amount);
}
fn test(){process(2.0, "b".into(), doit);let fn1: Box<dyn FnOnce()> = Box::new(move || doit(2.0,"hello cat!".into()));fn1();let fn2: Box<dyn FnOnce(f32,String)>  = Box::new(move |amount:f32,code:String| doit(amount,code));fn2(2.0,"hello john!".into());let fn3:Box<dyn FnOnce(f32,String)> = Box::new(move |amount:f32,code:String| process(amount,code,doit));fn3(2.0,"hello rose!".into());let fn4: Box<dyn FnOnce()>  = Box::new(move ||workit(3.0));fn4();let fn5: Box<dyn FnOnce()>  = Box::new(move ||println!("hello world!"));fn5();// 带类型返还的结构let fn6:Box<dyn FnOnce()->f32> = Box::new(move ||add_f32(1.0,2.0));fn6();
}
// 定义闭包中没有参数输入的函数类型,做为发送对象
type box_fn = Box<dyn FnOnce() + Send>;
fn main(){//test();let (sender, receiver) = mpsc::channel::<box_fn>();let receiver = Arc::new(Mutex::new(receiver));// 注意:||中均没有参数,故是FnOnce(); 具体参考test()let vec_fn:Vec<Box<dyn FnOnce()+Send>> = vec![Box::new(move ||println!("hello world!")),Box::new(move ||workit(3.0)),Box::new(move || doit(2.0,"hello cat!".into()))];for f in vec_fn {sender.send(f).unwrap();}let rx = receiver.clone();let handle = thread::spawn(move ||{loop{let box_fn  = rx.lock().unwrap().recv().unwrap() ;println!("from son thread!");box_fn();}});println!("main thread sended all box_fn!");handle.join().unwrap();}

二、线程池的应用:发送函数有什么用处?

如果需要让每一个函数都分配一个线程来执行这些函数(任务),或者用一个线程池来执行函数,这个时侯就可以用上场了。

在线程池中,FnOnce是一个其中的灵魂。他可以把所有的函数进行抽象统一,便一管理和执行。

use std::sync::{mpsc, Arc, Mutex};
use std::thread;
use std::time::Duration;
// 存在不均衡的情况 => steal work todo!
pub struct ThreadPool {threads_num: usize,workers: Vec<Worker>,sender: mpsc::Sender<Message>,
}
// 定义不同类型的函数及参数
enum StrategyFn
{T(Box<dyn FnOnce() + Send +'static>),U(Box<dyn FnOnce(f32, f32) + Send +'static>,Arc<f32>,Arc<f32>),W(Box<dyn FnOnce(Vec<f32>, f32) + Send +'static>,Arc<Vec<f32>>,Arc<f32>),
}//fn get_fn(strategy_fn: StrategyFn) {}
enum Message{Task(Task),Shutdown,
}
struct Task{job :Job,task_id :usize,
}type Job = Box<dyn FnOnce() + Send +'static>;impl ThreadPool {pub fn new(threads_num: usize) -> ThreadPool {assert!(threads_num > 0);let (sender, receiver) = mpsc::channel::<Message>();let receiver = Arc::new(Mutex::new(receiver));let mut workers = Vec::with_capacity(threads_num);for id in 0..threads_num {workers.push(Worker::new(id, Arc::clone(&receiver)));}ThreadPool { threads_num,workers, sender }}pub fn execute2<F>(&self, task_id:usize,f: F, input1: Arc<f32>, input2: Arc<f32>)whereF: FnOnce(&f32, &f32) + Send +'static,{let job = Box::new(move || f(&input1,&input2));let task = Task{job:job,task_id:task_id};self.sender.send(Message::Task(task)).unwrap();}// 可以发送不同种类参数的函数,并安排执行pub fn execute(&self, task_id:usize,strategy_fn: StrategyFn){let mut job:Box<dyn FnOnce()+Send +'static> = Box::new(move||{});match strategy_fn{StrategyFn::U(f, input1, input2) => {job = Box::new(move || f(input1.as_ref().clone(),input2.as_ref().clone()));},StrategyFn::T(f) =>{job = Box::new(move || f());}StrategyFn::W(f,input1,input2)=> {job = Box::new(move || f(input1.as_ref().clone(),input2.as_ref().clone()));},}let task = Task{job:job,task_id:task_id};self.sender.send(Message::Task(task)).unwrap();}}pub struct Worker {id: usize,thread: Option<thread::JoinHandle<()>>,
}impl Worker {pub fn new(work_id: usize, receiver: Arc<Mutex<mpsc::Receiver<Message>>>) -> Worker {let thread = thread::spawn(move || loop {let msg = receiver.lock().unwrap().recv().unwrap();match msg{Message::Task(task) => {println!("Worker {} got a task; executing task {}.", work_id,task.task_id);(task.job)();},Message::Shutdown => {println!("Worker {} received shutdown message.", work_id);break;//很关键}}});Worker {id:work_id,thread: Some(thread),}}
}impl Drop for ThreadPool {fn drop(&mut self) {for _ in 0..self.threads_num{self.sender.send(Message::Shutdown).unwrap();}println!("drop worker :{:?}",self.workers.len());for (i,worker) in (&mut self.workers).into_iter().enumerate() {println!("------Shutting down worker {}  i:{} -----------", worker.id,i);if let Some(thread) = worker.thread.take() {thread.join().unwrap();}}}
}
macro_rules! create_strategy{($($s:ident),*) => ($(pub fn $s(_a:f32,_b:f32){println!("run strategy {:?}",stringify!($s));thread::sleep(Duration::from_millis(1));})*);
}
create_strategy!(A,B,C,D,E,F,G,H,I,J,K,L,M,N,O,P,Q);
pub fn main() {let pool = ThreadPool::new(2);let input1 = Arc::new(1.0);let input2 = Arc::new(2.0);let strategies = vec![A,B,C,D,E,F,G,H,I,J,K,L,M,N,O,P,Q];for (task_id,strategy) in strategies.into_iter().enumerate(){let strategy_fn = StrategyFn::U(Box::new(strategy),input1.clone(), input2.clone());pool.execute(task_id as usize,strategy_fn);}
}

http://www.mrgr.cn/news/51342.html

相关文章:

  • 恺撒密码/置换密码案例
  • UltraISO(软碟通)制作U盘制作Ubuntu20.04启动盘
  • 基于SSM的大学学术交流论坛【附源码】
  • 3.Three.js程序基本框架结构和API说明
  • Unity之XR Interaction Toolkit 射线拖拽3DUI
  • 自适应过滤法—初级
  • KNN算法及KDTree树
  • 数据分析分段折线图
  • 【C++常见错误】0xC0000005: 读取位置 0x00000000 时发生访问冲突
  • .Net的潘多拉魔盒开箱即用,你学废了吗?
  • 【面经】2024年软件测试面试题,精选100 道(附答案)
  • OpenGauss学习笔记
  • 【开源】Appium:自动化移动应用测试的强大工具
  • 10月报名 | 海克斯康Adams二次开发培训
  • 前端全栈混合之路Deno篇:Deno 2.0 的权限系统详解和多种权限配置权限声明方式 -一次性搞懂和学会用
  • vulhub复现记录
  • 面试记录一
  • 概率测试:用随机性来发现难以复现的问题
  • STM32 QSPI接口驱动GD/W25Qxx配置简要
  • 瞬时存取,无限可能:顺序表的独特魅力