/*
 * Decompiled with CFR 0.152.
 */
package org.springframework.data.r2dbc.function;

import io.r2dbc.spi.Connection;
import io.r2dbc.spi.ConnectionFactory;
import java.util.function.Function;
import org.reactivestreams.Publisher;
import org.springframework.data.r2dbc.function.DatabaseClient;
import org.springframework.data.r2dbc.function.DefaultDatabaseClient;
import org.springframework.data.r2dbc.function.DefaultDatabaseClientBuilder;
import org.springframework.data.r2dbc.function.ReactiveDataAccessStrategy;
import org.springframework.data.r2dbc.function.TransactionalDatabaseClient;
import org.springframework.data.r2dbc.function.connectionfactory.ConnectionFactoryUtils;
import org.springframework.data.r2dbc.function.connectionfactory.ReactiveTransactionSynchronization;
import org.springframework.data.r2dbc.function.connectionfactory.TransactionResources;
import org.springframework.data.r2dbc.support.R2dbcExceptionTranslator;
import org.springframework.transaction.NoTransactionException;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.util.context.Context;
import reactor.util.function.Tuple2;

/**
 * {@link TransactionalDatabaseClient} implementation built on top of {@link DefaultDatabaseClient}.
 *
 * <p>Connections are obtained through {@link ConnectionFactoryUtils} and transaction state is
 * tracked through a {@link ReactiveTransactionSynchronization} looked up via
 * {@link ConnectionFactoryUtils} (see {@link #withTransactionSynchronization(Context)} for how
 * one is placed into the Reactor subscriber {@link Context}).
 */
class DefaultTransactionalDatabaseClient extends DefaultDatabaseClient implements TransactionalDatabaseClient {

    /**
     * Creates a new client; all arguments are passed unchanged to the {@link DefaultDatabaseClient}
     * constructor.
     *
     * @param connector the connection factory handed to the superclass
     * @param exceptionTranslator the exception translator handed to the superclass
     * @param dataAccessStrategy the data access strategy handed to the superclass
     * @param builder the builder handed to the superclass
     */
    DefaultTransactionalDatabaseClient(ConnectionFactory connector, R2dbcExceptionTranslator exceptionTranslator,
            ReactiveDataAccessStrategy dataAccessStrategy, DefaultDatabaseClientBuilder builder) {
        super(connector, exceptionTranslator, dataAccessStrategy, builder);
    }

    /**
     * Returns the builder produced by {@code super.mutate()}, narrowed to
     * {@link TransactionalDatabaseClient.Builder}.
     *
     * @return the superclass builder cast to the transactional builder type
     */
    @Override
    public TransactionalDatabaseClient.Builder mutate() {
        // NOTE(review): relies on super.mutate() actually returning a TransactionalDatabaseClient.Builder
        // for this client; otherwise this cast fails at runtime - confirm against the builder wiring.
        return (TransactionalDatabaseClient.Builder) super.mutate();
    }

    /**
     * Registers a fresh {@link TransactionResources} with the current
     * {@link ReactiveTransactionSynchronization}, obtains a connection through
     * {@link ConnectionFactoryUtils#doGetConnection} and invokes {@link Connection#beginTransaction()}
     * on it.
     *
     * @return a {@link Mono} signalling the outcome of {@link Connection#beginTransaction()}
     */
    @Override
    public Mono<Void> beginTransaction() {
        Mono<TransactionResources> transactional = ConnectionFactoryUtils.currentReactiveTransactionSynchronization()
                .map(synchronization -> {
                    TransactionResources transactionResources = TransactionResources.create();
                    synchronization.registerTransaction(transactionResources);
                    return transactionResources;
                });

        // The registered resources are not used directly here; the connection lookup happens only
        // after the registration step has emitted.
        return transactional
                .flatMap(it -> ConnectionFactoryUtils.doGetConnection(this.obtainConnectionFactory()))
                .flatMap(it -> Mono.from(it.getT1().beginTransaction()));
    }

    /**
     * Runs {@link Connection#commitTransaction()} through {@link #cleanup(Function)}.
     *
     * @return a {@link Mono} completing once the commit and the subsequent cleanup steps have completed
     */
    @Override
    public Mono<Void> commitTransaction() {
        return cleanup(Connection::commitTransaction);
    }

    /**
     * Runs {@link Connection#rollbackTransaction()} through {@link #cleanup(Function)}.
     *
     * @return a {@link Mono} completing once the rollback and the subsequent cleanup steps have completed
     */
    @Override
    public Mono<Void> rollbackTransaction() {
        return cleanup(Connection::rollbackTransaction);
    }

    /**
     * Executes {@code callback} between {@link #beginTransaction()} and a terminating
     * {@link #commitTransaction()} or {@link #rollbackTransaction()}.
     *
     * <p>This client is the resource handed to {@link Flux#usingWhen}: {@code commitTransaction} is
     * the completion handler, {@code rollbackTransaction} is both the error and the cancel handler.
     * The resulting sequence has {@link #withTransactionSynchronization(Context)} applied to its
     * subscriber context.
     *
     * @param callback function receiving this client and returning the publisher to run in the transaction
     * @param <T> element type emitted by the callback's publisher
     * @return the elements emitted by the callback's publisher
     */
    @Override
    public <T> Flux<T> inTransaction(Function<DatabaseClient, ? extends Publisher<? extends T>> callback) {
        return Flux.usingWhen(
                        this.beginTransaction().thenReturn(this),
                        callback,
                        DefaultTransactionalDatabaseClient::commitTransaction,
                        DefaultTransactionalDatabaseClient::rollbackTransaction,
                        DefaultTransactionalDatabaseClient::rollbackTransaction)
                .subscriberContext(DefaultTransactionalDatabaseClient::withTransactionSynchronization);
    }

    /**
     * Obtains a connection via {@link ConnectionFactoryUtils#getConnection} and unwraps the first
     * tuple element.
     *
     * @return a {@link Mono} emitting the {@link Connection}
     */
    @Override
    protected Mono<Connection> getConnection() {
        return ConnectionFactoryUtils.getConnection(this.obtainConnectionFactory()).map(Tuple2::getT1);
    }

    /**
     * Disposes of {@code connection} depending on the subscriber context.
     *
     * <p>If the context has a {@link ReactiveTransactionSynchronization} key, the connection is
     * handed to {@link ConnectionFactoryUtils#releaseConnection} together with the factory from
     * {@link ConnectionFactoryUtils#currentConnectionFactory()}. Otherwise {@link Connection#close()}
     * is called directly.
     *
     * @param connection the connection to release or close
     * @return a publisher signalling the outcome of the release or close
     */
    @Override
    protected Publisher<Void> closeConnection(Connection connection) {
        return Mono.subscriberContext().flatMap(context -> {
            if (context.hasKey(ReactiveTransactionSynchronization.class)) {
                return ConnectionFactoryUtils.currentConnectionFactory()
                        .flatMap(it -> ConnectionFactoryUtils.releaseConnection(connection, it));
            }
            return Mono.from(connection.close());
        });
    }

    /**
     * Applies {@code callback} to a connection created from the {@link ConnectionFactory} stored in
     * the current {@link TransactionResources}, then releases and closes that connection through
     * {@link ConnectionFactoryUtils}. The current {@link TransactionResources} is unregistered
     * from the synchronization when that sequence terminates for any reason.
     *
     * <p>If no {@link ConnectionFactory} resource is attached, a {@link NoTransactionException} is
     * thrown from within the {@code flatMap} mapper.
     *
     * @param callback the terminating operation (commit or rollback) to run on the connection
     * @return a {@link Mono} completing after callback, release and close have completed
     */
    private static Mono<Void> cleanup(Function<Connection, ? extends Publisher<Void>> callback) {
        return ConnectionFactoryUtils.currentActiveReactiveTransactionSynchronization().flatMap(synchronization -> {
            TransactionResources currentSynchronization = synchronization.getCurrentTransaction();
            ConnectionFactory connectionFactory = currentSynchronization.getResource(ConnectionFactory.class);
            if (connectionFactory == null) {
                throw new NoTransactionException("No ConnectionFactory attached");
            }

            // NOTE(review): release/close are chained with then(...), so they only run when the
            // callback publisher completes; if it errors they are skipped and only the doFinally
            // unregistration happens. Confirm whether the connection should be released on error.
            return Mono.from(connectionFactory.create())
                    .flatMap(connection -> Mono.from(callback.apply(connection))
                            .then(ConnectionFactoryUtils.releaseConnection(connection, connectionFactory))
                            .then(ConnectionFactoryUtils.closeConnection(connection, connectionFactory)))
                    .doFinally(s -> synchronization.unregisterTransaction(currentSynchronization));
        });
    }

    /**
     * Returns a {@link Context} holding a {@link ReactiveTransactionSynchronization} under the key
     * {@code ReactiveTransactionSynchronization.class}: the value already present in
     * {@code context} if there is one, otherwise a newly created instance.
     *
     * @param context the context to derive from
     * @return the context returned by {@link Context#put} for that key and value
     */
    static Context withTransactionSynchronization(Context context) {
        return context.put(ReactiveTransactionSynchronization.class,
                context.getOrDefault(ReactiveTransactionSynchronization.class, new ReactiveTransactionSynchronization()));
    }
}

