|
8 | 8 | import org.hswebframework.ezorm.rdb.executor.reactive.r2dbc.R2dbcReactiveSqlExecutor; |
9 | 9 | import org.hswebframework.ezorm.rdb.executor.wrapper.ResultWrapper; |
10 | 10 | import org.hswebframework.web.api.crud.entity.TransactionManagers; |
11 | | -import org.hswebframework.web.datasource.DataSourceHolder; |
12 | | -import org.hswebframework.web.datasource.R2dbcDataSource; |
13 | 11 | import org.hswebframework.web.exception.I18nSupportException; |
14 | 12 | import org.reactivestreams.Publisher; |
15 | 13 | import org.springframework.beans.factory.annotation.Autowired; |
16 | 14 | import org.springframework.r2dbc.connection.ConnectionFactoryUtils; |
17 | | -import org.springframework.r2dbc.core.ConnectionAccessor; |
18 | 15 | import org.springframework.transaction.annotation.Propagation; |
19 | 16 | import org.springframework.transaction.annotation.Transactional; |
20 | 17 | import reactor.core.publisher.Flux; |
21 | 18 | import reactor.core.publisher.Mono; |
22 | 19 | import reactor.core.publisher.SignalType; |
23 | 20 |
|
| 21 | +import java.io.Serial; |
24 | 22 | import java.time.LocalDateTime; |
25 | 23 | import java.time.ZoneOffset; |
26 | 24 | import java.util.Date; |
27 | 25 | import java.util.Map; |
| 26 | +import java.util.concurrent.atomic.AtomicBoolean; |
28 | 27 | import java.util.function.Function; |
29 | 28 |
|
30 | 29 | public class DefaultR2dbcExecutor extends R2dbcReactiveSqlExecutor { |
@@ -97,18 +96,50 @@ protected Mono<Connection> getConnection() { |
97 | 96 |
|
98 | 97 | @Override |
99 | 98 | protected <T> Flux<T> doInConnection(Function<Connection, Publisher<T>> handler) { |
| 99 | + Mono<ConnectionCloseHolder> connectionMono = getConnection().map( |
| 100 | + connection -> new ConnectionCloseHolder(connection, this::closeConnection)); |
| 101 | + |
100 | 102 | return Flux.usingWhen( |
101 | | - ConnectionFactoryUtils.getConnection(defaultFactory), |
102 | | - handler, |
103 | | - source -> ConnectionFactoryUtils |
104 | | - .currentConnectionFactory(defaultFactory) |
105 | | - .then() |
106 | | - .onErrorResume(Exception.class, ex -> Mono.from(source.close())) |
| 103 | + connectionMono, |
| 104 | + holder -> handler.apply(holder.connection), |
| 105 | + ConnectionCloseHolder::close, |
| 106 | + (it, err) -> it.close(), |
| 107 | + ConnectionCloseHolder::close |
107 | 108 | ); |
108 | 109 |
|
109 | 110 | // return super.doWith(handler); |
110 | 111 | } |
111 | 112 |
|
| 113 | + static class ConnectionCloseHolder extends AtomicBoolean { |
| 114 | + |
| 115 | + @Serial |
| 116 | + private static final long serialVersionUID = -8994138383301201380L; |
| 117 | + |
| 118 | + final transient Connection connection; |
| 119 | + |
| 120 | + final transient Function<Connection, Publisher<Void>> closeFunction; |
| 121 | + |
| 122 | + ConnectionCloseHolder(Connection connection, Function<Connection, Publisher<Void>> closeFunction) { |
| 123 | + this.connection = connection; |
| 124 | + this.closeFunction = closeFunction; |
| 125 | + } |
| 126 | + |
| 127 | + Mono<Void> close() { |
| 128 | + return Mono.defer(() -> { |
| 129 | + if (compareAndSet(false, true)) { |
| 130 | + return Mono.from(this.closeFunction.apply(this.connection)); |
| 131 | + } |
| 132 | + return Mono.empty(); |
| 133 | + }); |
| 134 | + } |
| 135 | + } |
| 136 | + |
| 137 | + private Publisher<Void> closeConnection(Connection connection) { |
| 138 | + return ConnectionFactoryUtils |
| 139 | + .currentConnectionFactory(defaultFactory).then() |
| 140 | + .onErrorResume(Exception.class, ex -> Mono.from(connection.close())); |
| 141 | + } |
| 142 | + |
112 | 143 | @Override |
113 | 144 | protected void releaseConnection(SignalType type, Connection connection) { |
114 | 145 | //所有方法都被事务接管,不用手动释放 |
|
0 commit comments