一文详解响应式编程springwebflux
1.java Stream流式编程
函数式接口
/***函数式接口:* 接口中有且仅有一个未实现的方法,这个接口就叫函数式接口;* 只要是函数式接口就可以用Lambda表达式简化;*/
package com.reactors.lambda;
@FunctionalInterface //该注解会检查你写的接口是否符合函数式接口规范
interface MyInterface{int sum(int i,int j);
}interface Myhaha{int ha();default int heihei(){ //默认实现return 2;};
}interface Myhehe{int he(int i);
}//1.自己写实现类,实现MyInterfaces接口 (冗余写法)
//2.创建匿名实现类 (冗余写法)
//3.lambda表达式:语法糖 (参数+箭头+方法体)public class Lamda {public static void main(String[] args) {MyInterface myInterface=new MyInterface() {@Overridepublic int sum(int i, int j) {return i*i+j*j;}};System.out.println("myInterface.sum(2,3) = " + myInterface.sum(2, 3));//lambda 完整写法MyInterface myInterface2= (int i,int j)->{return i*i+j*j;};System.out.println("myInterface2.sum(2,3) = " + myInterface2.sum(2, 3));//lambda简化写法:1. 参数类型可以不写,只写(参数名),参数变量名随意定义;参数表最少可以只有一个(),或只有一个参数MyInterface myInterface3= (i,j)->{return i*i+j*j;};System.out.println("myInterface2.sum(2,3) = " + myInterface3.sum(2, 3));//参数位置最少情况Myhaha myhaha=()->{return 1;};System.out.println("myhaha.ha() = " + myhaha.ha());Myhehe myhehe= a->{return a*a;};System.out.println("myhehe.he(5) = " + myhehe.he(5));//lambda简化写法2.方法体如果只有一句话,{}可以省略Myhehe myhehe2= y-> y+1;System.out.println("myhehe2.he(7) = " + myhehe2.he(7));}}package com.reactors.lambda;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.function.Supplier;public class FunctionDemo {public static void main(String[] args) {//1.定义提供者Supplier<String> supplier=()->"43";//supplier.get()//2.断言:验证是否一个数字Predicate<String> isNumber= str->str.matches("-?\\d+(\\d+)?");//isNumber.test("777")//3.转换器:把字符串转换为数字Function<String,Integer> change=Integer::parseInt;//4.消费者Consumer<Integer> consumer= integer -> {if(integer%2==0){System.out.println("偶数:"+integer);}else{System.out.println("奇数:"+integer);}};//串4个函数:判断奇偶性if(isNumber.test(supplier.get())){//说明是数字String s=supplier.get();change.apply(s);consumer.accept(change.apply(supplier.get()));}else{//不是数字System.out.println("非法的数字");}}
}
lambda表达式
/*** lambda表达式的使用场景:* 1.调用一个方法传入参数,这个参数实例是一个接口对象 ,且只定义了一个方法,就采用lambda简化*/package com.reactors.lambda;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.UUID;
import java.util.function.*;public class LamdaAsArgument {public static void main(String[] args) {List<String> names=new ArrayList<>();names.add("alice");names.add("Bob");names.add("Charlie");names.add("David");//使用Lambda表达式作为参数传递给sort方法,默认比较从小到大
// Collections.sort(names,(a,b)->a.compareTo(b));
// names.forEach(System.out::println);System.out.println("========================================");//类::方法; 引用类中的实例方法 默认比较从小到大Collections.sort(names,String::compareTo);names.forEach(System.out::println);//创建线程new Thread(()->{System.out.println("hello world~!");}).start();//函数式接口,参数的几种形式//有入参,无出参BiConsumer<String,String> consumer=(a,b)->{System.out.println("hello:"+a+";hell:"+b);};consumer.accept("Jim","Elon mask");//有入参,有出参Function<String,Integer> function= (String x)->Integer.parseInt(x);System.out.println("function.apply(\"4\") = " + function.apply("4"));//无入参,无出参Runnable runnable=()-> System.out.println("hello world");new Thread(runnable).start();//无入参,有出参Supplier<String> supplier=()-> UUID.randomUUID().toString();System.out.println("supplier.get() = " + supplier.get());Predicate<Integer> even=(t)-> t%2 ==0;//正向判断System.out.println("正向判断奇偶性: " + even.test(7));//反向向判断System.out.println("反向判断奇偶性= " + even.negate().test(7));}}
Stream
package com.reactors.lambda;import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Stream;
/*** 流的特性:* 1).流是lazy的,不用,方法不会被调用* StreamAPI:* 1)把数据封装成流,要到数据流; 集合类.stream* 2)定义流式操作* 3)获取最终结果*/
public class StreamDemo {public static void main(String[] args) {//1.选出最大的偶数Stream.of(1, 2, 3, 4, 5, 6, 7, 8, 9, 10).filter(i -> i % 2 == 0).max(Integer::compareTo).ifPresent(System.out::println);List<Integer> list = List.of(1, -2, 3, 4, 5, 6, 7, 8, 9, 10);list.stream().filter(val->{System.out.println("正在过滤:"+val);return val%2==0;})//过滤出希望的值,如果断言返回true是期望的.max(Integer::compareTo)//max 终止操作.ifPresent(System.out::println);//流的三大部分: 1.数据流 2.N个中间操作 3.终止操作//1.数据流//1).创建流Stream<Integer> stream=Stream.of(1,2,3);Stream<Integer> concat=Stream.concat(Stream.of(4,5,6),stream);Stream<Object> build = Stream.builder().add("11").add("22").build();//2). 从集合容器中获取流,List/set/mapList<Integer> list1=List.of(1,2);Stream<Integer> stream1 = list1.stream();Set<Integer> set = Set.of(1, 2);Stream<Integer> stream2 = set.stream();Map<String, Integer> of = Map.of("aa",2,"bb",44);Stream<String> keyStream = of.keySet().stream();Stream<Integer> valueStream = of.values().stream();System.out.println("主线程:"+Thread.currentThread());//2.流的中间操作//流默认也是用for循环单线程的处理,非并发操作。除非使用parallel(),才能并发操作//有状态数据aa,将产生并发线程安全问题,千万别在流的一次操作中进行改变状态操作如:add或update,deleteList<Integer> aa=new ArrayList<>();//流的所有操作都是无状态;数据状态仅在函数内有效,不溢出到函数外;流的所有操作是独立long count=Stream.of(1,3,4,5,6).parallel()//流进行并行操作.filter(i->{//aa.add(7); //千万别在流的一次操作中进行System.out.println("filter线程:"+Thread.currentThread());System.out.println("正在filter:"+i);return i>2;}).count();System.out.println();System.out.println("count = " + count);}
}package com.reactors.lambda;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;@Data
@NoArgsConstructor
@AllArgsConstructor
class person{private String name;private String gender;private Integer age;
}public class StreamDemo2 {public static void main(String[] args) {List<person> list = List.of(new person("obma","男",26),new person("Elon mask","女",40),new person("Tim cook","男",66),new person("steven job","男",76),new person("donald trump","女",86));list.stream().filter(person -> person.getAge()>26).peek(person -> System.out.println("peek:"+person))//大于26s岁的人.map(person ->person.getName()) // 获取person中所有的姓名.flatMap(ele->{String[] s = ele.split(" ");return Arrays.stream(s);}).distinct().limit(3).sorted(String::compareTo).forEach(e->{System.out.println("元素:"+e);});List<person> list2 = List.of(new person("obma","男",26),new person("Elon mask","女",40),new person("Tim cook","男",66),new person("steven job","男",76),new person("donald trump","女",86));Map<String, List<person>> collect = list2.stream().filter(s -> s.getAge() > 26).collect(Collectors.groupingBy(t -> t.getGender()));System.out.println("collect:"+collect);}
}
flow
package com.reactors.flow;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;/*** Flow->发布订阅模型:观察者模式:* 1.publisher:发布者* 2.subscriber:订阅者* 3.subscription:订阅关系* 4.processor:处理器* */
public class FlowDemo {public static void main(String[] args) throws InterruptedException {//1.定义一个发布者:发布数据SubmissionPublisher<String> publisher =new SubmissionPublisher<>();//2.定义一个中间操作:给每个元素增加一个前缀:hahaMyProcessor myProcessor1 = new MyProcessor();MyProcessor myProcessor2 = new MyProcessor();MyProcessor myProcessor3 = new MyProcessor();//3.定义一个订阅者:订阅者感兴趣发布者的数据Flow.Subscriber<String> subscriber=new Flow.Subscriber<String>() {private Flow.Subscription subscription;/*** 在订阅时 onxx:在xx事件发生时,执行这个回调* @param subscription*/@Overridepublic void onSubscribe(Flow.Subscription subscription) {System.out.println(Thread.currentThread()+"订阅开始了:"+subscription);this.subscription=subscription;//从上游请求一个数据//this.subscription.request(Long.MAX_VALUE);//Long.MAX_VALUE 从上游请求所有数据subscription.request(1);}/*** 在下一个元素到达时;执行这个回调;接收到新的数据* @param item*/@Overridepublic void onNext(String item) {System.out.println(Thread.currentThread()+"订阅者,接受到数据:"+item);if(item.equals("p-7")){subscription.cancel();//取消订阅}else{subscription.request(1);}}/*** 在错误发生时* @param throwable*/@Overridepublic void onError(Throwable throwable) {System.out.println(Thread.currentThread()+"订阅者,接受到错误信号:"+throwable);}/*** 在完成时*/@Overridepublic void onComplete() {System.out.println(Thread.currentThread()+"订阅者,接受到完成信号:");}};//4.绑定发布者和订阅者关系publisher.subscribe(myProcessor1);//此时处理器相当于订阅者myProcessor1.subscribe(myProcessor2);myProcessor2.subscribe(myProcessor3);myProcessor3.subscribe(subscriber);//此时处理器相当于发布者for(int i=0;i<10;i++){System.out.println("Thread.currentThread() = " + Thread.currentThread());if(i==5){//publisher.closeExceptionally(new RuntimeException("555不干了"));//异常中断信号}else{//publisher发布的所有数据在它的buffer区publisher.submit("p-"+i);}}//jvm底层对整个发布订阅关系做好了异步+缓冲区处理=响应式系统//关闭发布者通道publisher.close();Thread.sleep(10000);}//定义流中间操作处理器;只用写订阅者的接口static class MyProcessor extends SubmissionPublisher<String> implements Flow.Processor<String,String>{private Flow.Subscription subscription;//保存绑定关系@Overridepublic void onSubscribe(Flow.Subscription subscription) {System.out.println("processor订阅绑定完成");this.subscription=subscription;subscription.request(1);//找上游要一个数据}//数据到达,触发回调@Overridepublic void onNext(String item) {System.out.println("processor拿到数据:"+item);//再加工item+=": haha";submit(item);//发送加工后的数据subscription.request(1);//再要新数据}@Overridepublic void onError(Throwable throwable) {}@Overridepublic void onComplete() {}}
}
2.Reactor
导入依赖
<dependencyManagement><dependencies><dependency><groupId>io.projectreactor</groupId><artifactId>reactor-bom</artifactId><version>2024.0.2</version><type>pom</type><scope>import</scope></dependency></dependencies></dependencyManagement><dependencies><dependency><groupId>io.projectreactor</groupId><artifactId>reactor-core</artifactId></dependency><dependency><groupId>io.projectreactor</groupId><artifactId>reactor-test</artifactId><scope>test</scope></dependency><!--导入测试库--><dependency><groupId>org.junit.jupiter</groupId><artifactId>junit-jupiter</artifactId><version>5.10.0</version><scope>test</scope></dependency></dependencies>
flux
package com.reactors;
import org.reactivestreams.Subscription;
import reactor.core.Disposable;
import reactor.core.publisher.*;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;
import java.io.IOException;
import java.sql.SQLOutput;
import java.time.Duration;
import java.util.List;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;public class FluxDemo {//测试fluxpublic static void Flux(String[] args) throws IOException {
// Mono:0|1个元素的流
// Flux:N个元素的流//1.多个元素Flux<Integer> just = Flux.just(1, 2, 3, 4, 5);//流不消费没用;消费:订阅just.subscribe(e -> System.out.println("e= " + e));//一个数据流可以有很多消费者just.subscribe(e2 -> System.out.println("e2= " + e2));//对每个消费者来说流都是一样的;广播模式System.out.println("====================");Flux<Long> flux = Flux.interval(Duration.ofSeconds(1));//每秒产生一个从0开始增加的数字flux.subscribe(System.out::println);System.in.read();}//Mono<Integer>:只有一个Integer//Flux<Integer>:有很多个integer;public static void Mono(String[] args) throws IOException, InterruptedException {
// Mono<Integer> just=Mono.just(1);
// just.subscribe(System.out::println);//空流//事件感知API:当流发生什么事的时候,触发一个回调(hook:钩子函数);doOnxxx
// Flux<Object> empty = Flux.empty()//有一个信号:此时代表完成信号
// .doOnComplete(()->{
// System.out.println("流正常结束了ha");
// }).doOnCancel(()->{
// System.out.println("流被取消!");
// });
// empty.subscribe(System.out::println);Flux<Integer> just1;just1 = Flux.range(1, 7).delayElements(Duration.ofSeconds(1)).doOnComplete(() -> {System.out.println("&&&&&&&&&&&流正常完成。。");}).doOnCancel(() -> {System.out.println("&&&&&&&&&&&流被取消");}).doOnError(throwable -> {System.out.println("流出错。。。" + throwable);}).doOnNext(integer -> {System.out.println("doOnnext..." + integer);});just1.subscribe(new BaseSubscriber<Integer>() {@Overrideprotected void hookOnSubscribe(Subscription subscription) {System.out.println("订阅者与发布者绑定好了:" + subscription);request(1);//背压}@Overrideprotected void hookOnNext(Integer value) {System.out.println("元素到达:" + value);if (value < 5) {request(1);//背压if (value == 3) {int i = 10 / 0;}} else {cancel();//取消订阅}}@Overrideprotected void hookOnComplete() {System.out.println("数据流结束");}@Overrideprotected void hookOnError(Throwable throwable) {super.hookOnError(throwable);System.out.println("数据异常。。。");}@Overrideprotected void hookOnCancel() {System.out.println("数据流被取消");}@Overrideprotected void hookFinally(SignalType type) {System.out.println("结束信号 :" + type);//正常、异常try {//正常业务} catch (Exception e) {} finally {//结束}}});System.in.read();}/*** 响应式编程核心:看懂文档弹珠图;* 信号:正常/异常(取消,错误)* SignalType:* SUBSCRIBE:被订阅* REQUEST:请求N个元素* CANCEL:流被取消* ON_SUBSCRIBE:在订阅时* ON_NEXT:在元素到达* ON_ERROR:在元素出错* ON_COMPLETE:在流正常完成时* AFTER_TERMINATE:中断后* CURRENT_CONTEXT:当前上下文* ON_CONTEXT:感知上下文* <p>* doOnXXX API触发的时机* 1.doOnNext:每个数据(流的数据)到达的时候触发* 2.doOnEach:每个元素(流的数据和信号) 到达的时候触发* 3.doOnRequest:消费者请求流元素的时候* 4.doOnError:流发生错误* 5.doOnSubscribe:流被订阅时* 6.doOnTerminate:发送取消/异常信号中断流* 7.doOnCancle:流被取消* 8.doOnDiscard:流中元素被忽略时*/public static void doOnXXX(String[] args) {// Flux.range(1,7)
// .map(integer -> 10/integer)
// .subscribe(System.out::println);//doOnnext:表示流中一个元素到达后触发一个回调Flux.just(1, 2, 3, 4, 5, 6, 7, 0).doOnNext(integer -> System.out.println("元素到达:" + integer)) //元素到达时触发.doOnEach(integerSignal -> { //each封装的更详细System.out.println("doonEach:" + integerSignal.get());}).map(integer -> 10 / integer).doOnError(throwable -> {System.out.println("数据库已经保存了异常:" + throwable.getMessage());}).doOnNext(integer -> {System.out.println("元素到达ha:" + integer);}).subscribe(System.out::println);}public static void subscribe(String[] args) {
// Flux.concat(Flux.just(1,2,4),Flux.just(7,87,0))
// .subscribe(System.out::println);// Flux.range(1,7)
// .log()//日志
// .filter(i-> i> 3)//挑选出大于3的元素
// .map(i-> "haha-"+i)
// .subscribe(System.out::println);Flux<Integer> flux = Flux.just(1, 2, 4);/*** 自定义消费者*/flux.subscribe(new BaseSubscriber<Integer>() {@Overrideprotected void hookOnSubscribe(Subscription subscription) {//流被订阅的时候触发System.out.println("绑定了。。。" + subscription);//找发布者要数据request(1);//要1个数据requestUnbounded();//要无限数据}@Overrideprotected void hookOnNext(Integer value) {System.out.println("数据到达,正在处理:" + value);if (value.equals("haha:5")) {cancel();//取消流}request(1);//要1个数据}@Overrideprotected void hookOnComplete() {System.out.println("流正常结束");}@Overrideprotected void hookOnError(Throwable throwable) {System.out.println("流异常。。" + throwable);}@Overrideprotected void hookOnCancel() {System.out.println("流被取消了...");}@Overrideprotected void hookFinally(SignalType type) {System.out.println("最终回调:。。。一定会被执行");}});}/*** 请求重塑:buffer*/public void buffer() {Flux<List<Integer>> flux = Flux.range(1, 10).buffer(2)//缓冲区:缓冲2个元素,消费者一次最多可以拿到2个元素.log();// flux.subscribe(v-> System.out.println("v = " + v));flux.subscribe(new BaseSubscriber<List<Integer>>() {@Overrideprotected void hookOnSubscribe(Subscription subscription) {System.out.println("绑定关系。。。");request(2);//找发布者,请求2次}@Overrideprotected void hookOnNext(List<Integer> value) {System.out.println("元素:" + value);}});}/*** 请求重塑:限流 limitRate*/public void limit() {Flux.range(1, 1000).log().limitRate(100)//一次预取30个 元素.subscribe(new BaseSubscriber<Integer>() {@Overrideprotected void hookOnSubscribe(Subscription subscription) {request(1);}@Overrideprotected void hookOnNext(Integer value) {System.out.println(value);request(1);}});//75%预取策略:limiteRate(100)//第一次抓取100个数据,如果 75%的元素已经处理了,继续抓取新的75%的元素}/*** 编程方式创建序列-同步环境->generate* Sink:接收器,水槽,通道* Source:数据源 sink:接收端*/public void generate() {Flux<Object> flux = Flux.generate(() -> 0,//初始值(state, sink) -> {/*for(int i=0;i<100;i++){sink.next("haha-"+i);//传递数据,可能会抛出(不受检查异常(运行时异常)、受检异常(编译时异常))}*/if (state <= 10) {sink.next(state);//把元素传出去} else {sink.complete();//完成信号}if (state == 7) {sink.error(new RuntimeException("我不喜欢7@@@"));}return state + 1;//返回新的迭代state值});flux.log().doOnError(throwable -> System.out.println("throwable=" + throwable)).subscribe();}public void dispose() {Flux<Integer> flux = Flux.range(1, 10000).delayElements(Duration.ofSeconds(1)).map(i -> i + 7).log();//1.消费者实现了Disposable,可取消Disposable disposable = flux.subscribe(System.out::println);new Thread(() -> {try {Thread.sleep(10000);disposable.dispose();//销毁} catch (InterruptedException e) {e.printStackTrace();}}).start();}/*** 编程方式创建序列-异步,多线程环境->create*/public void create() throws InterruptedException {
// Flux.create(fluxSink -> {
// fluxSink.next("haha");
// });//异步环境下Flux.create(fluxSink -> {MyListener myListener = new MyListener(fluxSink);for(int i=0;i<100;i++){myListener.online("用户-"+i);}}).log().subscribe();}class MyListener{FluxSink<Object> sink;public MyListener(FluxSink<Object> sink){this.sink=sink;}//用户登录,触发online监听public void online(Object userName){System.out.println("用户登录了:"+userName);sink.next(userName);//传入用户}}/*** 自定义流中元素处理规则*/public void handle(){Flux.range(1,10).handle((value,sink)->{System.out.println("拿到的值:"+value);sink.next("张三:"+value);//可以向下发送数据的通道}).log().subscribe();}public void thread(){//响应式编程:全异步、消息、事件回调//流的发布、中间操作,默认使用当前线程Flux.range(1,10)
// .publishOn(Schedulers.immediate())//在哪个线程池把当这个流的数据和操作执行了
// .publishOn(Schedulers.single()).publishOn(Schedulers.parallel())// .log().map(i->i+10).log().subscribe();//publishOn:改变发布者所在线程池//subscribeOn:改变订阅者所在线程池//调度器:线程池
// Schedulers.immediate();//无执行上下文,当前线程运行所有操作
// Schedulers.single();//使用固定的一个单线程
// Schedulers.boundedElastic();//有界、弹性调度;不是无限扩充的线程池;线程池有10*cpu核心数个线程;队列默认100k,keepalivetime:空闲时间,默认60s
// Schedulers.fromExecutor(new ThreadPoolExecutor(4,8,60, TimeUnit.SECONDS,new LinkedBlockingDeque<>(1000)));
// Schedulers.parallel();}public void thread1(){Scheduler scheduler = Schedulers.newParallel("parallel-scheduler",4);//流的每个操作产生新流,产生新发布者final Flux<String> flux=Flux.range(1,2).map(i->10+i).publishOn(scheduler).map(i->"value "+i).log();//只要不指定线程池,默认发布者使用的线程池就是订阅者的线程new Thread(()->{flux.subscribe(System.out::println);}).start();}public static void main(String[] args) throws IOException, InterruptedException {//new FluxDemo().buffer();
// new FluxDemo().limit();// new FluxDemo().generate();
// new FluxDemo().dispose();
//
// System.in.read();
// new FluxDemo().create();// new FluxDemo().handle();
// new FluxDemo().thread();new FluxDemo().thread1();}
}
Reactor
import org.junit.jupiter.api.Test;
import reactor.core.Disposable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Sinks;
import reactor.core.scheduler.Schedulers;
import reactor.util.context.Context;
import reactor.util.function.Tuple2;
import java.io.IOException;
import java.time.Duration;
import java.util.List;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.atomic.AtomicInteger;public class APITest {@Testvoid filter(){Flux.just(1,2,3,4)//流发布者.filter(s->s%2==0)//过滤偶数.log().subscribe();//最终消费者;request(unbounded)}/*** 扁平化*/@Testvoid filterMap(){Flux.just("zhang san","li si").flatMap(v->{String[]ss = v.split(" ");return Flux.fromArray(ss);//把数据包装成多个元素流})//两个人的名字,按照空格拆分,打印出所有的姓与名字.log().subscribe();}/*** concatMap:一个元素可以变很多单个;对于元素类型无限制* concat* concatWith:被连接流和老流中的元素类型必须一样**/@Testvoid concatMap(){//Mono,Flux:发布者
// Flux.just(1,2,3)
// .concatMap(s->Flux.just(s+"->a",1))
// .log()
// .subscribe();// concat.concat( Flux.just(1,2), Flux.just("h","j"),Flux.just("haha","hihi"))
// .log()
// .subscribe();Flux.just(1,2).concatWith(Flux.just(4,5,6)).log().subscribe();}/*** transform:把流变形成新数据;不会共享外部变量的值;无状态转换;原理:无论多少个订阅者,transform只执行一次* transformDeferred:把流变形成新数据;会共享外部变量的值;有状态转换;原理:无论多少个订阅者,每个订阅者transform都执行一次*/@Testvoid transform(){
// AtomicInteger atomic = new AtomicInteger(0);
// Flux<String> flux = Flux.just("a", "b", "c")
// .transform(values -> {
// //++atomic
// if (atomic.incrementAndGet() == 1) {
// //如果是:第一次调用,老流中的所有元素转换成大写
// return values.map(String::toUpperCase);
// } else {
// //如果不是一次调用,原封不动返回
// return values;
// }
//
// });
// flux.subscribe(v-> System.out.println("订阅者1:v = " + v));
// flux.subscribe(v-> System.out.println("订阅者2:v = " + v));AtomicInteger atomic = new AtomicInteger(0);Flux<String> flux = Flux.just("a", "b", "c").transformDeferred(values -> {//++atomicif (atomic.incrementAndGet() == 1) {//如果是:第一次调用,老流中的所有元素转换成大写return values.map(String::toUpperCase);} else {//如果不是一次调用,原封不动返回return values;}});flux.subscribe(v-> System.out.println("订阅者1:v = " + v));flux.subscribe(v-> System.out.println("订阅者2:v = " + v));}/*** defaultIfEmpty():静态兜底数据* switchIfEmpty():空转换;调用动态兜底方法,返回新流数据*/@Testvoid empty(){//Mono.just(null):流里面有一个null值元素
// Mono.empty():流里面没有元素,只有完成信号/结束信号
// haha().defaultIfEmpty("x")//如果发布者元素为null,指定默认值x,否则用发布者的值
// .subscribe(v-> System.out.println("v = " + v));haha().switchIfEmpty(hehe())//如果发布者元素为null,指定默认值x,否则用发布者的值.subscribe(v-> System.out.println("v = " + v));}Mono<String> hehe(){return Mono.just("兜底数据...");}Mono<String> haha(){return Mono.empty();}/*** concat:连接;A流的所有元素和B流的所有元素拼接*merge:合并;A流的所有元素和B流的所有元素按照时间序列合并* mergeWith* mergeSequential:按照哪个流先发元素排队** @throws IOException*/@Testvoid merge() throws IOException {
// Flux.merge(Flux.just(1,2,3).delayElements(Duration.ofSeconds(1)),Flux.just("a","b").delayElements(Duration.ofMillis(1500)),Flux.just("haha","heihi","xixi").delayElements(Duration.ofMillis(500)))
// .log()
// .subscribe();Flux.just(1,23,4).mergeWith(Flux.just(99,00)).log().subscribe();// Flux.mergeSequential();System.in.read();}/*** zip:无法结对的元素会被忽略* 最多支持8流压缩*/@Testvoid zip(){Flux.just(1, 2, 3).zipWith(Flux.just("a", "b", "c","d")).map(tuple->{Integer t1 = tuple.getT1();//元组中第一个元素String t2 = tuple.getT2();//元组中第2个元素return t1+"-->"+t2;}).log().subscribe(v-> System.out.println("v = " + v));}/*** 默认:* subscribe:消费者可以感知,正常元素try与流发生的错误catch* 错误是一种中断行为*** 命令式编程-常见错误处理方式:* 1.catch and return a static default value* 2.catch and excute an alternative path with a fallback method:吃掉异常,返回一个兜底方法* 3.catch and dynamically compute a fallback value:捕获动态计算一个返回值* 4.catch ,warp to a businessException,jand re-throw:捕获并包装成一个业务异常,并重新抛出* 5.catch,log an error-specific message,and re-throw:捕获异常,记录特殊日志,从新抛出* 6.use the finally block to clean up resources or java7 "try-with-resource" construct**/@Testvoid onError() throws IOException {
// Flux.just(1,2,0)
// .map(i->"100/"+i+"="+(100/i))
// .onErrorReturn("Divided by zero:(")
// .subscribe(v-> System.out.println("v = " + v));// Flux.just(1,2,0)
// .map(i->"100/"+i+"="+(100/i))
// .onErrorReturn("haha8888") //错误的时候返回一个值,吃掉异常,消费者无异常感知;流正常完成
// .subscribe(v-> System.out.println("v = " + v),err-> System.out.println("err = " + err),()-> System.out.println("流结束"));// Flux.just(1,2,0)
// .map(i->"100/"+i+"="+(100/i))
// .onErrorReturn(NullPointerException.class,"haha8888") //错误的时候返回一个值,吃掉异常,消费者无异常感知;流正常完成
// .subscribe(v-> System.out.println("v = " + v),err-> System.out.println("err = " + err),()-> System.out.println("流结束"));//
// Flux<String> map = Flux.just(1, 2, 0, 4)
// .map(i -> "100/" + i + "=" + (100 / i));
// map.onErrorResume(err->Mono.just("haha999"))//吃掉异常,消费者无异常感知;调用一个兜底方法;流正常结束
// .subscribe(v-> System.out.println("v = " + v),
// err-> System.out.println("err = " + err),
// ()-> System.out.println("流结束"));// Flux<String> map = Flux.just(1, 2, 0, 4)
// .map(i -> "100/" + i + "=" + (100 / i));
// map.onErrorResume(err->hahhah(err))//
// .subscribe(v-> System.out.println("v = " + v),
// err-> System.out.println("err = " + err),
// ()-> System.out.println("流结束"));/* Flux<String> map = Flux.just(1, 2, 0, 4).map(i -> "100/" + i + "=" + (100 / i));map.onErrorResume(err->Flux.error(new BusinessException(err.getMessage()+":嘿嘿")))//.subscribe(v-> System.out.println("v = " + v),err-> System.out.println("err = " + err),()-> System.out.println("流结束"));*///
// Flux<String> map = Flux.just(1, 2, 0, 4)
// .map(i -> "100/" + i + "=" + (100 / i));
// map.onErrorMap(err->new BusinessException(err.getMessage()+":嘿嘿哈哈"))//
// .subscribe(v-> System.out.println("v = " + v),
// err-> System.out.println("err = " + err),
// ()-> System.out.println("流结束"));//
// Flux.just(1, 2, 0, 4)
// .map(i -> "100/" + i + "=" + (100 / i))
// .doOnError(err->{ //异常被捕获,,不影响后序的操作,不吃掉异常,只在异常发生时候,消费者有感知
// System.out.println("err已经被记录="+err);
// })
// .doFinally(signalType -> {
// System.out.println("流信号:"+signalType);
// })
// .subscribe(v-> System.out.println("v = " + v),
// err-> System.out.println("err = " + err),
// ()-> System.out.println("流结束"));//
// Flux.just(1, 2, 0, 4)
// .map(i->10/i)
// .onErrorContinue((err,val)->{
// System.out.println("err = " + err);
// System.out.println("val = " + val);
// System.out.println("发现"+val+"有问题,继续执行其他的");
// })//发生错误,继续
// .subscribe(v-> System.out.println("v = " + v),
// err-> System.out.println("err = " + err));//
// Flux.just(1, 2, 0, 4)
// .map(i->10/i)
// .onErrorComplete()//把错误结束信号,替换为正常结束信号;正常结束
// .subscribe(v-> System.out.println("v = " + v),
// err-> System.out.println("err = " + err),()-> System.out.println("流正常结束。"));Flux.interval(Duration.ofSeconds(1)).map(i->10/(i-10)).onErrorStop()//错误后停止流,源头中断,所有监听者全部结束;错误结束.subscribe(v-> System.out.println("v = " + v),err-> System.out.println("err = " + err),()-> System.out.println("流正常结束。"));
System.in.read();}Mono<String> hahhah(Throwable throwable){if(throwable.getClass()==NullPointerException.class){}return Mono.just("haha-"+throwable.getMessage());}class BusinessException extends RuntimeException{public BusinessException(String msg){super(msg);}}@Testvoid retryAndTimeout() throws IOException {Flux.just(1,2,3).delayElements(Duration.ofSeconds(3)).log().timeout(Duration.ofSeconds(2)).retry(3)//把流从头到尾重新请求一次.onErrorReturn(99).map(i->i+"hahh").subscribe(v-> System.out.println("v = " + v));System.in.read();}@Testvoid sinks() throws InterruptedException, IOException {
// Sinks.many();//发送flux数据
// Sinks.one();//发送mono数据//Sinks:接收器,数据管道,所有数据顺着管道往下走
// Sinks.many().unicast();//单播:管道只能绑定单个订阅者(消费者)
// Sinks.many().multicast();//多播:管道能绑定d多个订阅者(消费者)
// Sinks.many().replay();//重放:管道能重放元素。是否给后来的订阅者把之前的元素依然发给他//从头消费还是从订阅的那一刻消费// Sinks.Many<Object> many = Sinks.many()
// .unicast()//单播
// .onBackpressureBuffer(new LinkedBlockingDeque<>(5));//背压队列
//
// new Thread(()->{
// for (int i = 0; i <10 ; i++) {
// try {
// many.tryEmitNext("a-"+i);
// Thread.sleep(1000);
// } catch (InterruptedException e) {
// e.printStackTrace();
// }
// }
// }).start();
//
// //订阅
// many.asFlux().subscribe(v-> System.out.println("v = " + v));
// many.asFlux().subscribe(v2-> System.out.println("v2 = " + v2));
//
// System.in.read();//
// Sinks.Many<Object> many = Sinks.many()
// .multicast()//多播
// .onBackpressureBuffer();
//
// new Thread(()->{
// for (int i = 0; i <10 ; i++) {
// try {
// many.tryEmitNext("a-"+i);
// Thread.sleep(1000);
// } catch (InterruptedException e) {
// e.printStackTrace();
// }
// }
// }).start();
//
// //订阅
// many.asFlux().subscribe(v-> System.out.println("v = " + v));
//
// //默认订阅者,从订阅的那一刻开始接元素
//
//
//
// new Thread(()->{
// try {
// Thread.sleep(5000);
// } catch (InterruptedException e) {
// e.printStackTrace();
// }
// }).start();
// many.asFlux().subscribe(v2-> System.out.println("v2 = " + v2));
//
// System.in.read();//发布者数据重放
// Sinks.Many<Object> many = Sinks.many().replay().limit(3);
// new Thread(()->{
// for (int i = 0; i <10 ; i++) {
// try {
// many.tryEmitNext("a-"+i);
// Thread.sleep(1000);
// } catch (InterruptedException e) {
// e.printStackTrace();
// }
// }
// }).start();
//
// //订阅
// many.asFlux().subscribe(v-> System.out.println("v = " + v));
//
// //默认订阅者,从订阅的那一刻开始接元素
//
// new Thread(()->{
// try {
// Thread.sleep(5000);
// } catch (InterruptedException e) {
// e.printStackTrace();
// }
// }).start();
// many.asFlux().subscribe(v2-> System.out.println("v2 = " + v2));
//
// System.in.read();Flux<Integer> cache = Flux.range(1, 10).delayElements(Duration.ofSeconds(1)).cache(1) ;//缓存2个数据;默认缓存所有元素cache.subscribe();//缓存元素new Thread(()->{try {Thread.sleep(5000);} catch (InterruptedException e) {e.printStackTrace();}cache.subscribe(v -> System.out.println("v = " + v));}).start();System.in.read();}/*** 阻塞式api*/@Testvoid block(){Integer integer = Flux.just(1, 2, 4).map(i -> i + 10).blockLast();System.out.println("integer = " + integer);List<Integer> block = Flux.just(1, 2, 4).map(i -> i + 10).collectList().block();//block也是一种订阅System.out.println("block = " + block);}/*** 并发订阅批处理* @throws IOException*/@Testvoid paraleFlux() throws IOException {Flux.range(1,100).buffer(10).parallel(8).runOn(Schedulers.newParallel("yy")).log().flatMap(list -> Flux.fromIterable(list)).collectSortedList(Integer::compareTo).subscribe(v-> System.out.println("v = " + v));System.in.read();}/*** Context-API** 响应式编程中ThreadLocal机制会失效** 响应式中,数据流期间共享数据,Context-API:Context(读写) Contextview(只读)*/@Testvoid threadLocal(){Flux.just(1,2,3).transformDeferredContextual((flux,context)->{System.out.println("flux = " + flux);System.out.println("context = " + context);return flux.map(i->i+"===>"+context.get("prefix"));})//threadlocal共享了数据,上游的所有人能看到;context由下游传播给上游//上游拿到下游的最近一次数据.contextWrite(Context.of("prefix","hahh")).subscribe(v-> System.out.println("v = " + v));/*** 命令式编程: controller->service->dao* 响应式编程:dao(9 数据源)->service(9)->controller(9):从下游反向传播*/}@Testvoid next(){Integer block=Flux.just(1,2,3).next()//next拿到第一个元素1.block();System.out.println("block = " + block);}
}
3.springwebflux
-
webflux:底层完全基于netty+reactor+springweb完成一个全异步非阻塞的web响应式框架
-
底层:异步+消息队列(内存)+事件回调机制=整套系统
-
优点:能使用少量资源处理大量请求
-
组件对比:
-
底层基于netty实现web容器与请求/响应处理机制
参考文档:https://docs.spring.io/spring-framework/reference/web/webflux.html
1.搭建webflux项目引入依赖
<parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>3.3.0</version></parent> <!--引入webflux--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-webflux</artifactId></dependency>
Context响应式上下文数据传递;由下游传播给上游
阻塞式编程:浏览器–>controller->Service->Dao
响应式流:Dao(数据库查询对象)–>Service–>Controller->浏览器
2.httphandler、HttpServer创建server
public static void main(String[] args) throws IOException {//编写一个处理请求的服务器//1.创建一个能处理http请求的处理器.参数:请求,响应;返回值:Mono<void>:代表处理完成的信号HttpHandler httpHandler=(ServerHttpRequest request, ServerHttpResponse response)->{//编写请求处理的业务逻辑System.out.println(Thread.currentThread()+"请求进来了:"+request.getURI());
// response.getHeaders();
// response.getCookies();
// response.getStatusCode();
// response.bufferFactory();
// response.writeWith();//把xx写出去
// response.setComplete();//响应结束//创建响应数据的databufferDataBufferFactory factory=response.bufferFactory();DataBuffer dataBuffer = factory.wrap("hello world".getBytes(StandardCharsets.UTF_8));return response.writeWith(Mono.just(dataBuffer));//需要一个Databuffer的发布者};//2.启动服务器,监听8080端口,接收数据,拿到数据交给httphandler进行请求处理ReactorHttpHandlerAdapter adapter=new ReactorHttpHandlerAdapter(httpHandler);//3.启动netty服务器HttpServer.create().host("localhost").port(8080).handle(adapter).bindNow();System.out.println("服务器启动...");System.in.read();System.out.println("服务器停止...");}
3. DispatcherHandler
Springmvc:DispatcherServlet
SpringWebFlux:DispatcherHandler
- 请求处理流程
- HandlerMapping:请求映射处理器,保存每个请求由哪个方法进行处理
- HandlerAdapter:处理适配器;反射执行目标方法
- HandlerResultHandler:处理器结果出来器
Springmvc:DispatcherServlet有一个doDispatch()方法,处理所有请求
SpringWebFlux:DispatcherHandle有一个handle方法处理所有请求
public Mono<Void> handle(ServerWebExchange exchange) {if (this.handlerMappings == null) {return this.createNotFoundError();} else {return CorsUtils.isPreFlightRequest(exchange.getRequest()) ? this.handlePreFlight(exchange) : Flux.fromIterable(this.handlerMappings).concatMap((mapping) -> {return mapping.getHandler(exchange);}).next().switchIfEmpty(this.createNotFoundError()).onErrorResume((ex) -> {return this.handleResultMono(exchange, Mono.error(ex));}).flatMap((handler) -> {return this.handleRequestWith(exchange, handler);});}}private Mono<Void> handleRequestWith(ServerWebExchange exchange, Object handler) {if (ObjectUtils.nullSafeEquals(exchange.getResponse().getStatusCode(), HttpStatus.FORBIDDEN)) {return Mono.empty();} else {if (this.handlerAdapters != null) {Iterator var3 = this.handlerAdapters.iterator();while(var3.hasNext()) {HandlerAdapter adapter = (HandlerAdapter)var3.next();if (adapter.supports(handler)) {Mono<HandlerResult> resultMono = adapter.handle(exchange, handler);return this.handleResultMono(exchange, resultMono);}}}return Mono.error(new IllegalStateException("No HandlerAdapter: " + handler));}}
- 1.请求和响应都封装在ServerWebExchange对象中,由handle方法进行处理
- 2.检查是否跨域,CorsUtils.isPreFlightRequest,检查跨域请求
- 3.Flux流式操作,先找到handlerMapping,再获取handleAdapter,再用adapter处理请求,期间错误由onErrorResume触发回调进程处理
开启webflux
package com;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.autoconfigure.web.reactive.WebFluxAutoConfiguration;
import org.springframework.web.reactive.config.EnableWebFlux;@EnableWebFlux //开启webflux自定义;禁用webflux的默认效果,完全自定义,不建议使用
//WebFluxAutoConfiguration的自动配置会生效
@SpringBootApplication
public class WebFluxMainApplication {public static void main(String[] args) {SpringApplication.run(WebFluxMainApplication.class,args);}
}
编写一个处理请求的服务器
package com;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.DataBufferFactory;
import org.springframework.http.server.reactive.HttpHandler;
import org.springframework.http.server.reactive.ReactorHttpHandlerAdapter;
import org.springframework.http.server.reactive.ServerHttpRequest;
import org.springframework.http.server.reactive.ServerHttpResponse;
import reactor.core.publisher.Mono;
import reactor.netty.http.server.HttpServer;
import java.io.IOException;
import java.nio.charset.StandardCharsets;public class FluxMainApplication {public static void main(String[] args) throws IOException {//1.创建一个能处理http请求的处理器.参数:请求,响应;返回值:Mono<void>:代表处理完成的信号HttpHandler httpHandler=(ServerHttpRequest request, ServerHttpResponse response)->{//编写请求处理的业务逻辑System.out.println(Thread.currentThread()+"请求进来了:"+request.getURI());
// response.getHeaders();
// response.getCookies();
// response.getStatusCode();
// response.bufferFactory();
// response.writeWith();//把xx写出去
// response.setComplete();//响应结束//创建响应数据的databufferDataBufferFactory factory=response.bufferFactory();DataBuffer dataBuffer = factory.wrap("hello world".getBytes(StandardCharsets.UTF_8));return response.writeWith(Mono.just(dataBuffer));//需要一个Databuffer的发布者};//2.启动服务器,监听8080端口,接收数据,拿到数据交给httphandler进行请求处理ReactorHttpHandlerAdapter adapter=new ReactorHttpHandlerAdapter(httpHandler);//3.启动netty服务器HttpServer.create().host("localhost").port(8080).handle(adapter).bindNow();System.out.println("服务器启动...");System.in.read();System.out.println("服务器停止...");}
}
4. 注解开发
1.目标方法传参
2.返回值写法
5.文件上传
class MyForm {private String name;private MultipartFile file;// ...}@Controller
public class FileUploadController {@PostMapping("/form")public String handleFormUpload(MyForm form, BindingResult errors) {// ...}}
或者使用如下上传
@PostMapping("/")
public String handle(@RequestPart("meta-data") MetaData metadata) {// ...}
sse(server side send)实现如下:
package com;
import org.springframework.http.HttpEntity;
import org.springframework.http.HttpMethod;
import org.springframework.http.MediaType;
import org.springframework.http.codec.ServerSentEvent;
import org.springframework.http.server.reactive.ServerHttpRequest;
import org.springframework.http.server.reactive.ServerHttpResponse;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.reactive.result.view.Rendering;
import org.springframework.web.server.ServerWebExchange;
import org.springframework.web.server.WebSession;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import java.time.Duration;//@RestController
@Controller //controller支持页面跳转
public class HelloController {//webflux:向下兼容原理springmvc的大多数注解和api/**** @param key* @param exchange ServerWebExchange 封装请求和响应对象;自定义获取数据/自定义响应* @param webSession Session* @HttpMethod:请求方式-post/get...* @HttpEntity:封装后的请求对象* @return*/@GetMapping("/hello")public String hello(@RequestParam(value = "key",required = false,defaultValue = "haha") String key,HttpMethod method,HttpEntity<String> entity,@RequestBody String s, //@RequestBody:获取请求体,post,文件上传ServerWebExchange exchange,WebSession webSession) {ServerHttpRequest request = exchange.getRequest();ServerHttpResponse response = exchange.getResponse();Object aaa = webSession.getAttribute("aaa");webSession.getAttributes().put("aa","ddd");String name = method.name();return "helloworld,key="+key;}/*** 推荐使用:* 1.返回单个数据mono:Mono<order>、user、String、map* 2.返回多个数据Flux:Flux<Order>* 3.配合flux,完成sse:server send event:服务器端事件推送**/@GetMapping("/haha")public Mono<String> haha(){return Mono.just(0).map(i->10/i).map(i->"哈哈-"+i);}@GetMapping("/hehe")public Flux<String> hehe(){return Flux.just("he1","he4","he9");}/***text/event-stream* sse测试;cahtgpt都在用* @return*/
// @GetMapping(value = "/sse",produces = MediaType.TEXT_EVENT_STREAM_VALUE)
// public Flux<String> sse(){
// return Flux.range(1,10)
// .map(i->"ha"+i)
// .delayElements(Duration.ofMillis(500));
// }@GetMapping(value = "/sse",produces = MediaType.TEXT_EVENT_STREAM_VALUE)public Flux<ServerSentEvent<String>> sse(){return Flux.range(1,10).map(i->{//构建一个sse对象ServerSentEvent<String> haha= ServerSentEvent.<String>builder("ha-"+i).id(i+"").comment("hei-"+i).event("haha").build();return haha;}).delayElements(Duration.ofMillis(500));}//Rendering:一种视图对象@GetMapping("/bai")public Rendering render(){//Rendering.redirectTo("/aaa");//重定向到当前项目根路径下aareturn Rendering.redirectTo("http://www.baidu.com").build();}
}
<!DOCTYPE html>
<html lang="en">
<head><meta charset="UTF-8"><title>index</title>
</head>
<body>
<h1>哈哈</h1><div id="app" style="width: 500px;height: 300px;background-color: gainsboro"></div>
<script src="https://cdn.jsdelivr.net/npm/axios/dist/axios.min.js"></script>
<script>const http=axios.create({baseURL:"http://localhost:8080/",timeout:100000,responseType:'stream',onDownloadProgress:function (progressEvent){// console.log("progressEvent->",progressEvent.event.currentTarget.responseText);document.getElementById("app").innerHTML=progressEvent.event.currentTarget.responseText}});http.get('/sse?').then(function (response){//处理成功情况console.log(response)}).catch(function (error){//处理错误情况console.log(error);}).finally(function (){//总会执行});
</script>
</body>
</html>
sse与websocket区别:
- sse:单工;请求过去以后,等待服务端源源不断地数据
- websocket:双工,连接建立后,可以任意交换
6.错误处理
@ExceptionHandler(ArithmeticException.class)public String error(ArithmeticException exception){System.out.println("发生了数学运算异常:"+exception);//可以采用如下返回结果//ProblemDetail.forStatusAndDetail(HttpStatus.OK,"sucess");// ErrorResponse.builder().build();return "坏了.heihei..";}/*** 全局异常处理*/
//@ResponseBody
//@ControllerAdvice@RestControllerAdvice
public class GlobalExceptionHandler {@ExceptionHandler(ArithmeticException.class)public String error(ArithmeticException exception){System.out.println("发生了数学运算异常:"+exception);//可以采用如下返回结果//ProblemDetail.forStatusAndDetail(HttpStatus.OK,"sucess");// ErrorResponse.builder().build();return "坏了.heihei..";}
}
7.RequestContext:全局上下文请求
- 详见下面MyWebFilter 实现
8.自定义Flux配置
WebFluxConfigurer:容器中注入这个类型的组件,重写底层逻辑
/*** 配置底层*/
@Configuration
public class MyWebConfig {@Beanpublic WebFluxConfigurer webFluxConfigurer(){return new WebFluxConfigurer() {@Overridepublic void addCorsMappings(CorsRegistry registry) {registry.addMapping("/**").allowedHeaders("*").allowedMethods("*").allowedOrigins("localhost");}};}}
9.filter
@Component
public class MyWebFilter implements WebFilter {@Overridepublic Mono<Void> filter(ServerWebExchange exchange, WebFilterChain chain) {ServerHttpRequest request = exchange.getRequest();ServerHttpResponse response = exchange.getResponse();System.out.println("请求处理放到目标方法之前...");Mono<Void> filter = chain.filter(exchange).doOnError(err->{System.out.println("目标方法异常。。。");}).doFinally(signalType -> {System.out.println("目标方法执行之后...");});return filter;}
4. Spring Data R2DBC 使用步骤:
1.导入依赖
<!--响应式web--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-webflux</artifactId></dependency><!--响应式spring data R2dbc--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-data-r2dbc</artifactId></dependency><!--mysql 驱动--><dependency><groupId>io.asyncer</groupId><artifactId>r2dbc-mysql</artifactId><version>1.4.0</version></dependency><dependency><groupId>io.r2dbc</groupId><artifactId>r2dbc-pool</artifactId><version>1.0.2.RELEASE</version></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency>
2.编写配置
# Writer Database URL
#spring.r2dbc.master.url=r2dbc:mysql://user:password@test-db.cluster-cey7aslike.us-east-1.rds.amazonaws.com/example
# Reader Database URL
#spring.r2dbc.slave.url=r2dbc:mysql://user:password@test-db.cluster-ro-cey7aslike.us-east-1.rds.amazonaws.com/example# Writer Database URL
#spring.r2dbc.master.url=r2dbc:mysql://root:123456@localhost:3306/test
# Reader Database URL
#spring.r2dbc.slave.url=r2dbc:mysql://root:123456@localhost:3306/testspring:application:name: r2dbcr2dbc:password: 123456username: rooturl: r2dbc:mysql://localhost:3306/test?serverZoneId=Asia/Shanghai&useUnicode=true&characterEncoding=utf8&useSSL=truename: test
# pool:
# enabled: true
logging: #开启查询日志追踪level:org.springframework.r2dbc: debug
3.sprngdata R2DBC最佳实践:
- 1.基础的CRUD用R2dbcRepository提供的api
- 2.自定义复杂的sql(单表):@Query
- 3.多表查询复杂结果集:使用DatabaseClient 自定义SQL&封装结果集
4.bufferUntilChange:如果下一个判定值比上一个发生了变化就开一个新buffer保存,如果没有变化就保存在原buffer中;可以进行分组
4.spiringr2dbc项目结构实现过程:
应用启动入口
package com;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.autoconfigure.data.r2dbc.R2dbcDataAutoConfiguration;
import org.springframework.boot.autoconfigure.data.r2dbc.R2dbcRepositoriesAutoConfiguration;
import org.springframework.boot.autoconfigure.r2dbc.R2dbcAutoConfiguration;
import org.springframework.boot.autoconfigure.r2dbc.R2dbcTransactionManagerAutoConfiguration;/*** springboot:对R2DBC的自动配置* 1.R2dbcAutoConfiguration:主要配置连接工厂/连接池* 2.R2dbcDataAutoConfiguration:* R2dbcEntityTemplate:操作数据库响应式客户端;提供crud api* 数据类型映射关系、转换器、自定义R2dbcCustomConversions转换器组件(java数据类型与数据库数据类型映射)* 3.R2dbcRepositoriesAutoConfiguration:开启spring data声明式接口的CRUD,不需要写任何实现,可直接实现* 4.R2dbcTransactionManagerAutoConfiguration:事务管理**/
@SpringBootApplication
public class R2DBCMainApplication {public static void main(String[] args) {SpringApplication.run(R2DBCMainApplication.class);}
}
配置config: 开启R2dbc仓库功能
package com.config;
import com.converter.BookConverter;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.r2dbc.convert.R2dbcCustomConversions;
import org.springframework.data.r2dbc.dialect.MySqlDialect;
import org.springframework.data.r2dbc.repository.config.EnableR2dbcRepositories;@EnableR2dbcRepositories //开启R2dbc仓库功能
@Configuration
public class R2dbcConfig {/*** 注册自定义转换器* @return*/@Bean@ConditionalOnMissingBeanpublic R2dbcCustomConversions conversions(){return R2dbcCustomConversions.of(MySqlDialect.INSTANCE,new BookConverter());}
}
springdata 转换器converter
package com.converter;
import com.entity.TAuthor;
import com.entity.TBook;
import io.r2dbc.spi.Row;
import org.springframework.core.convert.converter.Converter;
import org.springframework.data.convert.ReadingConverter;
import java.time.Instant;@ReadingConverter //读取数据库
public class BookConverter implements Converter<Row, TBook> {@Overridepublic TBook convert(Row source) {TBook tBook=new TBook();tBook.setId(source.get("id",Long.class));tBook.setTitle(source.get("title",String.class));Long authorId=source.get("author_id",Long.class);tBook.setAuthorId(authorId);tBook.setPublishTime(source.get("publish_time", Instant.class));TAuthor tAuthor=new TAuthor();tAuthor.setId(authorId);tAuthor.setName(source.get("name",String.class));// tBook.setAuthor(tAuthor);return tBook;}}
实体类
package com.entity;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.springframework.data.annotation.Id;
import org.springframework.data.relational.core.mapping.Table;
import java.util.List;@Table("t_author")
@NoArgsConstructor
@AllArgsConstructor
@Data
public class TAuthor {@Idprivate Long id;private String name;private List<TBook> books;
}package com.entity;import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.springframework.data.annotation.Id;
import org.springframework.data.relational.core.mapping.Table;
import java.time.Instant;
import java.util.Date;@Table("t_book")
@Data
@NoArgsConstructor
@AllArgsConstructor
public class TBook {@Idprivate Long id;private String title;private Long authorId;private Instant publishTime;//响应式中日期的映射使用Instant 或者LocalXX// private TAuthor author;
}
持久层Repostory
package com.repositories;import com.entity.TAuthor;
import org.springframework.data.r2dbc.repository.Query;
import org.springframework.data.r2dbc.repository.R2dbcRepository;
import org.springframework.stereotype.Repository;
import reactor.core.publisher.Flux;
import java.util.Collection;@Repository
public interface AuthorRepostory extends R2dbcRepository<TAuthor,Long> {/*** 仅限单表查询* @param id* @param name* @return*/Flux<TAuthor> findAllByIdInAndNameLike(Collection<Long> id, String name);/*** 多表复杂查询-自定义sql*** @return*/@Query("select*from t_author")Flux<TAuthor> findHaha();/**** 1-1:关联* 1-N: 关联*/
}package com.repositories;
import com.entity.TAuthor;
import com.entity.TBook;
import org.springframework.data.r2dbc.repository.Query;
import org.springframework.data.r2dbc.repository.R2dbcRepository;
import org.springframework.stereotype.Repository;
import reactor.core.publisher.Mono;@Repository
public interface BookRepostory extends R2dbcRepository<TBook,Long> {@Query("select b.*,t.name as name from t_book b LEFT JOIN t_author t on b.author_id=t.id WHERE b.id= ?")Mono<TBook> findBookAndAuthor(Long bookId);
}
api单元测试
package com;import com.entity.TAuthor;
import com.entity.TBook;
import com.repositories.AuthorRepostory;
import com.repositories.BookRepostory;
import io.asyncer.r2dbc.mysql.MySqlConnectionConfiguration;
import io.asyncer.r2dbc.mysql.MySqlConnectionFactory;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.data.r2dbc.convert.R2dbcCustomConversions;
import org.springframework.data.r2dbc.core.R2dbcEntityTemplate;
import org.springframework.data.relational.core.query.Criteria;
import org.springframework.data.relational.core.query.Query;
import org.springframework.r2dbc.core.DatabaseClient;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import java.io.IOException;
import java.time.Instant;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;@SpringBootTest
public class R2DBCTest {@AutowiredDatabaseClient databaseClient; //数据库客户端,贴进底层,join操作可以做@AutowiredR2dbcEntityTemplate r2dbcEntityTemplate;//join操作不好做,单表查询@AutowiredAuthorRepostory authorRepostory;@AutowiredBookRepostory bookRepostory;
// @Autowired
// R2dbcCustomConversions r2dbcCustomConversions;@Testvoid bookRepostory() throws IOException {
// bookRepostory.findAll()
// .subscribe(tBook -> System.out.println("tBook = " + tBook));bookRepostory.findBookAndAuthor(1L).subscribe(tBook -> System.out.println("tBook = " + tBook));System.in.read();}@Testvoid authorRepostory() throws IOException {
// authorRepositories.findAll()
// .subscribe(tAuthor -> System.out.println("tAuthor = " + tAuthor));//1.方法起名
// authorRepositories.findAllByIdInAndNameLike(
// Arrays.asList(1L,2L),
// "张%"
// ).subscribe(tAuthor -> System.out.println("tAuthor = " + tAuthor));//2.自定义@Query注解authorRepostory.findHaha().subscribe(tAuthor -> System.out.println("tAuthor = " + tAuthor));System.in.read();}/*** 复杂条件查询:* 1. QBE API* 2.自定义方法* 3.自定义SQL* @throws IOException*/@Testvoid setDatabaseClient() throws IOException {
// databaseClient
// .sql("select*from t_author")
.sql("select*from t_author where id = ?id")
.bind("id",1L)
// .fetch()//抓取数据
// .all()
// .map(map->{
// //System.out.println("map = " + map);
// String id = map.get("id").toString();
// String name = map.get("name").toString();
//
// return new TAuthor(Long.parseLong(id),name);
// })
// .subscribe(tAuthor -> System.out.println("tAuthor = " + tAuthor));System.in.read();}/*** bufferUntilChanged:如果下一个判定值比上一个发生了变化就开一个新buffer保存,如果没有变化就保存在原buffer中* @throws IOException*/@Test void bufferUntilChange() throws IOException {
// Flux.just(1,2,3,4,5,6)
// .bufferUntilChanged(i->i%4==0)
// .subscribe(list-> System.out.println("list = " + list));//1对多操作,使用底层apiFlux<TAuthor> flux = databaseClient.sql("select b.*,t.name as name,t.id as aid from t_author t LEFT JOIN t_book b on b.author_id=t.id ORDER BY t.id").fetch().all().bufferUntilChanged(rowMap -> Long.parseLong(rowMap.get("aid").toString()))//对象比较需要重新equals方法.map(list -> {TAuthor tAuthor = new TAuthor();Map<String, Object> map = list.get(0);tAuthor.setId(Long.parseLong(map.get("aid").toString()));tAuthor.setName(map.get("name").toString());//查询所有图书List<TBook> books = list.stream().map(ele -> {TBook tBook = new TBook();tBook.setId(Long.parseLong(ele.get("id").toString()));tBook.setTitle(ele.get("title").toString());tBook.setAuthorId(Long.parseLong(ele.get("author_id").toString()));
// tBook.setPublishTime(Instant.parse(ele.get("publish_time").toString()));return tBook;}).collect(Collectors.toList());tAuthor.setBooks(books);return tAuthor;});flux.subscribe(tAuthor -> System.out.println("tAuthor = " + tAuthor));System.in.read();}@Testvoid r2dbcEntityTemplate() throws IOException {//1.代表查询条件Criteria criteria=Criteria.empty().and("id").is(1L).and("name").is("张三");//2.封装query对象Query query = Query.query(criteria);r2dbcEntityTemplate.select(query,TAuthor.class).subscribe(tAuthor -> System.out.println("tAuthor = " + tAuthor));System.in.read();}@Testvoid connection() throws IOException {
// //0.mysql配置
// MySqlConnectionConfiguration configuration = MySqlConnectionConfiguration.builder()
// .host("localhost")
// .username("root")
// .password("123456")
// .port(3306)
// .database("test")
// .build();
// //1.创建连接工厂
// MySqlConnectionFactory connectionFactory=MySqlConnectionFactory.from(configuration);
// //2.获取连接,发送sql
// Mono.from(connectionFactory.create())
// .flatMapMany(connection ->
// connection
// .createStatement("select*from t_author where id = ?id")
// .bind("id",2L)
// .execute()
// ).flatMap(result -> {
// return result.map(readable -> {
// Long id = readable.get("id", Long.class);
// String name = readable.get("name", String.class);
// return new TAuthor(id,name);
// });
// }).subscribe(tAuthor-> System.out.println("tAuthor = " + tAuthor));
//
// System.in.read();}
}
5.spring security
0.导入依赖
<!-- https://mvnrepository.com/artifact/io.asyncer/r2dbc-mysql --><dependency><groupId>io.asyncer</groupId><artifactId>r2dbc-mysql</artifactId><version>1.0.5</version></dependency><!-- 响应式 Spring Data R2dbc--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-data-r2dbc</artifactId></dependency><!-- 响应式Web --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-webflux</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-security</artifactId></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId></dependency>
1.应用安全
- 防止攻击:DDos/CSRF/XSS/SQL注入
- 控制权限:
1.登录的用户能干什么
2.用户登录系统以后,要控制住用户的所有行为,防止越权 - 传输加密
1.https
2.X509 - 认证:OAuth2.0 JWT
2. RBAC权限模型
所有权限框架:
1.让用户登录进来:认证(authenticate)->账户密码,其他方式
2.查询用户拥有的所有角色和权限:授权(authorize)->每个方法执行时候,匹配角色或权限来判定用户是否可以执行这个方法
3.认证
- 登录行为
1.静态资源放行
2.其他请求需要登录
package com.config;import org.springframework.boot.autoconfigure.security.reactive.PathRequest;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.security.config.web.server.ServerHttpSecurity;
import org.springframework.security.web.server.SecurityWebFilterChain;import static org.springframework.security.config.Customizer.withDefaults;@Configuration
public class AppSecurityConfig {
@Bean
SecurityWebFilterChain springSecurityFilterChain(ServerHttpSecurity http) {/**1.定义哪些请求需要认证与否**/http.authorizeExchange(authorize->{//1.1允许所有人都访问静态资源// authorize.pathMatchers("/js/**","/css/**","/html/**").permitAll();authorize.matchers(PathRequest.toStaticResources().atCommonLocations()).permitAll();//1.2.剩下的所有请求都需要认证(登录)authorize.anyExchange().authenticated();});/**2.开启默认表单登录**/http.formLogin(formLoginSpec -> {//formLoginSpec.disable();//禁用表单登录});/**3.安全控制**/http.csrf(csrfSpec -> {csrfSpec.disable();//禁用csrf});//构建安全配置return http.build();
}
}
这个界面登录点击,最终springsecurity框架会用ReactiveUserDetailsService组件,按照表单提交的用户去数据库中查询用户详情(基本信息:账户/密码;角色;权限)
把db中返回的用户详情中密码和表单中提交的对比。决定是否登录成功
4.授权
- 配置SecurityWebFilterChain
spring 响应式security项目实现过程:
获取用户RBAC信息组件
package com.component;import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.r2dbc.core.DatabaseClient;
import org.springframework.security.core.GrantedAuthority;
import org.springframework.security.core.authority.SimpleGrantedAuthority;
import org.springframework.security.core.userdetails.ReactiveUserDetailsService;
import org.springframework.security.core.userdetails.User;
import org.springframework.security.core.userdetails.UserDetails;
import org.springframework.security.crypto.password.PasswordEncoder;
import org.springframework.stereotype.Component;
import reactor.core.publisher.Mono;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;@Component
public class AppReactiveUserDetailsService implements ReactiveUserDetailsService {@AutowiredDatabaseClient databaseClient;@AutowiredPasswordEncoder passwordEncoder;/*** 自定义查询用户信息* @param username* @return*/@Overridepublic Mono<UserDetails> findByUsername(String username) {//从数据库查询用户、角色、权限所有数据的逻辑Mono<UserDetails> detailsMono = databaseClient.sql("select u.*,r.id rid,r.`name`,r.`value`,pm.id pid,pm.`value` pvalue,pm.description\n" +"from t_user u\n" +"LEFT JOIN t_user_role ur on ur.user_id=u.id\n" +"LEFT JOIN t_roles r on r.id=ur.role_id\n" +"LEFT JOIN t_role_perm rp on rp.role_id=r.id\n" +"LEFT JOIN t_perm pm on rp.perm_id=pm.id\n" +"where u.username= ?username limit 1").bind("username", username).fetch().one().map(map -> {List<SimpleGrantedAuthority> authorities=new ArrayList<>();authorities.add(new SimpleGrantedAuthority("download"));authorities.add(new SimpleGrantedAuthority("view"));authorities.add(new SimpleGrantedAuthority("delete"));UserDetails details = User.builder().username(username).password(map.get("password").toString())//自动调用密码加密器把前端传来的明文encode//.passwordEncoder(str->passwordEncoder.encode(str))//.authorities("download", "view", "delete").authorities(authorities)//权限,暂时先写死.roles("admin", "sale","haha")//角色,暂时先写死.build();return details;});return detailsMono;}
}
开启配置
spring:application:name: r2dbcr2dbc:password: 123456username: rooturl: r2dbc:mysql://localhost:3306/test?serverZoneId=Asia/Shanghai&useUnicode=true&characterEncoding=utf8&useSSL=truename: testmain:allow-circular-references: true #spring里面默认循环引用是禁用的
# security: #配置security默认为用户名admin和密码123456
# user:
# name: admin
# password: 123456# pool:
# enabled: true
logging: #开启查询日志追踪level:org.springframework.r2dbc: debug
package com.config;import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.security.reactive.PathRequest;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
import org.springframework.security.authentication.UserDetailsRepositoryReactiveAuthenticationManager;
import org.springframework.security.config.annotation.method.configuration.EnableReactiveMethodSecurity;
import org.springframework.security.config.web.server.ServerHttpSecurity;
import org.springframework.security.core.userdetails.ReactiveUserDetailsService;
import org.springframework.security.crypto.bcrypt.BCryptPasswordEncoder;
import org.springframework.security.crypto.factory.PasswordEncoderFactories;
import org.springframework.security.crypto.password.PasswordEncoder;
import org.springframework.security.web.server.SecurityWebFilterChain;import static org.springframework.security.config.Customizer.withDefaults;
@EnableReactiveMethodSecurity//开启方法级别的权限控制
@Configuration
public class AppSecurityConfig {@AutowiredReactiveUserDetailsService appReactiveUserDetailsService;@Bean
SecurityWebFilterChain springSecurityFilterChain(ServerHttpSecurity http) {/**1.定义哪些请求需要认证与否**/http.authorizeExchange(authorize->{//1.1允许所有人都访问静态资源// authorize.pathMatchers("/js/**","/css/**","/html/**").permitAll();authorize.matchers(PathRequest.toStaticResources().atCommonLocations()).permitAll();//1.2.剩下的所有请求都需要认证(登录)authorize.anyExchange().authenticated();});/**2.开启默认表单登录**/http.formLogin(formLoginSpec -> {//formLoginSpec.disable();//禁用表单登录// formLoginSpec.loginPage("/login");});/**3.安全控制**/http.csrf(csrfSpec -> {csrfSpec.disable();//禁用csrf});/**4.配置认证规则:如何去数据库查询用户** spring security底层使用 ReactiveAuthenticationManager去查询用户信息*UserDetailsRepositoryReactiveAuthenticationManager:从数据库中查询用户信息* ReactiveUserDetailsService:响应式用户查询服务* **/http.authenticationManager(new UserDetailsRepositoryReactiveAuthenticationManager(appReactiveUserDetailsService));//构建安全配置return http.build();
}@Primary@BeanPasswordEncoder passwordEncoder(){PasswordEncoder passwordEncoder= PasswordEncoderFactories.createDelegatingPasswordEncoder();return passwordEncoder;
}
}package com.config;import org.springframework.context.annotation.Configuration;
import org.springframework.data.r2dbc.repository.config.EnableR2dbcRepositories;@EnableR2dbcRepositories
@Configuration
public class AppR2DBCConfig {
}
controller
package com.controller;import org.springframework.security.access.prepost.PreAuthorize;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import reactor.core.publisher.Mono;@RestController
public class HelloController {@PreAuthorize("hasRole('admin')")@GetMapping("/hello")public Mono<String> hello(){return Mono.just("helloword...");}@PreAuthorize("hasRole('haha')")@GetMapping("/world")public Mono<String> world(){return Mono.just("world CCC...");}
}
用户权限角色资源控制(RABC)实体类:
package com.entity;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.springframework.data.annotation.Id;
import org.springframework.data.relational.core.mapping.Table;
import java.time.Instant;@Table(name = "t_perm")
@NoArgsConstructor
@AllArgsConstructor
@Data
public class TPerm {@Idprivate Long id;private String value;private String uri;private String description;private Instant createTime;private Instant updateTime;}package com.entity;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.springframework.data.annotation.Id;
import org.springframework.data.relational.core.mapping.Table;
import java.time.Instant;@Table(name = "t_role_perm")
@NoArgsConstructor
@AllArgsConstructor
@Data
public class TRolePerm {@Idprivate Long id;private Long roleId;private Long permId;private Instant createTime;private Instant updateTime;}package com.entity;import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.springframework.data.annotation.Id;
import org.springframework.data.relational.core.mapping.Table;
import java.time.Instant;@Table(name = "t_roles")
@NoArgsConstructor
@AllArgsConstructor
@Data
public class TRoles {@Idprivate Long id;private String name;private String value;private Instant createTime;private Instant updateTime;}package com.entity;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.springframework.data.annotation.Id;
import org.springframework.data.relational.core.mapping.Table;
import java.time.Instant;@Table(name = "t_user")
@NoArgsConstructor
@AllArgsConstructor
@Data
public class TUser {@Idprivate Long id;private String username;private String password;private String email;private String phone;private Instant createTime;private Instant updateTime;}package com.entity;import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.springframework.data.annotation.Id;
import org.springframework.data.relational.core.mapping.Table;
import java.time.Instant;@Table(name = "t_user_role")
@NoArgsConstructor
@AllArgsConstructor
@Data
public class TUserRole {@Idprivate Long id;private Long userId;private Long roleId;private Instant createTime;private Instant updateTime;}
单元测试
package com;import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.security.crypto.factory.PasswordEncoderFactories;
import org.springframework.security.crypto.password.PasswordEncoder;@SpringBootTest
public class PasswordTest {@Testvoid test(){PasswordEncoder passwordEncoder= PasswordEncoderFactories.createDelegatingPasswordEncoder();System.out.println("passwordEncoder.encode(\"123456\") = " + passwordEncoder.encode("123456"));}
}
6.参考资料
- https://www.yuque.com/leifengyang/springboot3/xsu8qfbtgeyxhatb
- https://www.entropy-tree.top/2024/02/09/webflux-project/
- https://blog.zhaojq.top/post/springwebflux/2-webflux%E6%A0%B8%E5%BF%83%E5%8E%9F%E7%90%86/
- https://blog.csdn.net/qq_19749625/category_12692883.html
- https://projectreactor.io/docs/core/release/reference/gettingStarted.html
- Spring WebFlux 实现 SSE 流式回复:类GPT逐字显示回复效果完整指南:https://blog.csdn.net/waiter456/article/details/141128557
https://blog.csdn.net/mst_sun/article/details/136484357 - https://www.axios-http.cn/docs/intro
- Knife4(的OpenAPI接口文档私有化聚合平台):https://doc.xiaominfo.com/docs/quick-start
- https://r2dbc.io/drivers/
- https://github.com/r2dbc/r2dbc-pool
- Spring Web Flux | Master - Slave - Pool Configuration :https://devashishtaneja.medium.com/spring-web-flux-r2dbc-master-slave-pool-configuration-a4cf0161a332
*springwebsecurity+jwt实战:https://www.cnblogs.com/auguse/articles/17654716.html - https://potoyang.gitbook.io/spring-in-action-v5