/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.table.planner.plan.nodes.exec.processor.utils;

import java.util.Collections;
import java.util.List;
import org.apache.flink.annotation.Internal;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.transformations.ShuffleMode;
import org.apache.flink.table.planner.plan.nodes.exec.ExecEdge;
import org.apache.flink.table.planner.plan.nodes.exec.ExecNode;
import org.apache.flink.table.planner.plan.nodes.exec.InputProperty;
import org.apache.flink.table.planner.plan.nodes.exec.batch.BatchExecExchange;
import org.apache.flink.table.planner.plan.nodes.exec.processor.utils.InputPriorityGraphGenerator;
import org.apache.flink.table.planner.plan.nodes.exec.visitor.AbstractExecNodeExactlyOnceVisitor;
import org.apache.flink.table.types.logical.RowType;

/**
 * Resolves input priority conflicts reported by {@link InputPriorityGraphGenerator} by placing a
 * {@link BatchExecExchange} on the lower-priority input of the conflicting node.
 *
 * <p>If the lower-priority input already is a {@link BatchExecExchange}, a new exchange is built
 * from it (same required distribution, priority, output type and input edges) with the dam
 * behavior computed by this resolver; otherwise a brand-new exchange is inserted between the
 * input and the node.
 */
@Internal
public class InputPriorityConflictResolver extends InputPriorityGraphGenerator {
    /** Shuffle mode set as the required shuffle mode on every exchange this resolver creates. */
    private final ShuffleMode shuffleMode;
    /** Configuration consulted, together with {@link #shuffleMode}, to derive the dam behavior. */
    private final Configuration configuration;

    /**
     * Creates a resolver.
     *
     * @param roots passed to the parent generator as its root nodes
     * @param safeDamBehavior passed to the parent generator unchanged
     * @param shuffleMode required shuffle mode for the exchanges created by this resolver
     * @param configuration configuration used when resolving the effective shuffle mode
     */
    public InputPriorityConflictResolver(List<ExecNode<?>> roots, InputProperty.DamBehavior safeDamBehavior, ShuffleMode shuffleMode, Configuration configuration) {
        // The parent is given an empty set as its second argument.
        super(roots, Collections.emptySet(), safeDamBehavior);
        this.shuffleMode = shuffleMode;
        this.configuration = configuration;
    }

    /**
     * Runs the inherited {@code createTopologyGraph()}.
     *
     * <p>NOTE(review): conflict resolution presumably happens through callbacks into
     * {@link #resolveInputPriorityConflict} while the graph is built; confirm against the parent.
     */
    public void detectAndResolve() {
        this.createTopologyGraph();
    }

    /**
     * Replaces the {@code lowerInput} edge of {@code node} with an edge whose source is a newly
     * built {@link BatchExecExchange}.
     *
     * @param node the node whose inputs conflict
     * @param higherInput index of the input edge on the higher side of the conflict
     * @param lowerInput index of the input edge on the lower side; this edge gets replaced
     */
    @Override
    protected void resolveInputPriorityConflict(ExecNode<?> node, int higherInput, int lowerInput) {
        ExecNode<?> higherSource = node.getInputEdges().get(higherInput).getSource();
        ExecNode<?> lowerSource = node.getInputEdges().get(lowerInput).getSource();

        BatchExecExchange replacement = lowerSource instanceof BatchExecExchange
                ? this.rebuildExchange(higherSource, (BatchExecExchange) lowerSource)
                : this.createExchange(node, lowerInput);

        node.replaceInputEdge(lowerInput, ExecEdge.builder().source(replacement).target(node).build());
    }

    /**
     * Builds a new exchange that mirrors {@code original} but carries the dam behavior from
     * {@link #getDamBehavior()} and this resolver's shuffle mode.
     *
     * <p>When {@code original} is reachable from {@code higherNode}, the new exchange is described
     * as {@code "Exchange"}; otherwise it keeps the description of {@code original}.
     */
    private BatchExecExchange rebuildExchange(ExecNode<?> higherNode, BatchExecExchange original) {
        InputProperty originalProperty = original.getInputProperties().get(0);
        InputProperty rebuiltProperty = InputProperty.builder()
                .requiredDistribution(originalProperty.getRequiredDistribution())
                .priority(originalProperty.getPriority())
                .damBehavior(this.getDamBehavior())
                .build();

        boolean reachableFromHigherSide = this.isConflictCausedByExchange(higherNode, original);
        RowType outputType = (RowType) original.getOutputType();
        String description = reachableFromHigherSide ? "Exchange" : original.getDescription();

        BatchExecExchange rebuilt = new BatchExecExchange(rebuiltProperty, outputType, description);
        rebuilt.setRequiredShuffleMode(this.shuffleMode);
        // The new exchange takes over the very same input edge list as the original.
        rebuilt.setInputEdges(original.getInputEdges());
        return rebuilt;
    }

    /**
     * Tells whether {@code lowerNode} is {@code higherNode} itself or is found while walking the
     * input edges of {@code higherNode}.
     */
    private boolean isConflictCausedByExchange(ExecNode<?> higherNode, BatchExecExchange lowerNode) {
        ConflictCausedByExchangeChecker reachability = new ConflictCausedByExchangeChecker(lowerNode);
        reachability.visit(higherNode);
        return reachability.found;
    }

    /**
     * Creates an exchange between {@code node} and its {@code idx}-th input.
     *
     * @param node the consumer node
     * @param idx index of the input to put the exchange on
     * @return the new exchange, already wired to the original input node
     * @throws IllegalStateException if the input requires a broadcast distribution
     */
    private BatchExecExchange createExchange(ExecNode<?> node, int idx) {
        ExecNode<?> upstream = node.getInputEdges().get(idx).getSource();
        InputProperty consumerProperty = node.getInputProperties().get(idx);
        InputProperty.RequiredDistribution distribution = consumerProperty.getRequiredDistribution();
        if (distribution.getType() == InputProperty.DistributionType.BROADCAST) {
            throw new IllegalStateException("Trying to resolve input priority conflict on broadcast side. This is not expected.");
        }

        InputProperty exchangeProperty = InputProperty.builder()
                .requiredDistribution(distribution)
                .priority(consumerProperty.getPriority())
                .damBehavior(this.getDamBehavior())
                .build();
        BatchExecExchange inserted = new BatchExecExchange(exchangeProperty, (RowType) upstream.getOutputType(), "Exchange");
        inserted.setRequiredShuffleMode(this.shuffleMode);
        inserted.setInputEdges(Collections.singletonList(ExecEdge.builder().source(upstream).target(inserted).build()));
        return inserted;
    }

    /**
     * Returns {@code BLOCKING} when {@code BatchExecExchange.getShuffleMode} yields
     * {@link ShuffleMode#BATCH} for this resolver's configuration and shuffle mode, and
     * {@code PIPELINED} otherwise.
     */
    private InputProperty.DamBehavior getDamBehavior() {
        return BatchExecExchange.getShuffleMode(this.configuration, this.shuffleMode) == ShuffleMode.BATCH
                ? InputProperty.DamBehavior.BLOCKING
                : InputProperty.DamBehavior.PIPELINED;
    }

    /**
     * Visitor that records whether a given exchange instance (compared by identity) is encountered
     * while walking the inputs of the visited node.
     */
    private static class ConflictCausedByExchangeChecker extends AbstractExecNodeExactlyOnceVisitor {
        /** The exchange instance being searched for. */
        private final BatchExecExchange exchange;
        /** Set to {@code true} once {@link #exchange} has been visited. */
        private boolean found;

        private ConflictCausedByExchangeChecker(BatchExecExchange exchange) {
            this.exchange = exchange;
        }

        @Override
        protected void visitNode(ExecNode<?> node) {
            if (node == this.exchange) {
                this.found = true;
            }
            // Descend input by input; the found flag is only checked after each descent.
            for (ExecEdge edge : node.getInputEdges()) {
                this.visit(edge.getSource());
                if (this.found) {
                    return;
                }
            }
        }
    }
}

