/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.table.planner.plan.nodes.exec.common;

import java.util.ArrayList;
import java.util.Collections;
import javax.annotation.Nullable;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.dag.Transformation;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.streaming.api.transformations.OneInputTransformation;
import org.apache.flink.table.api.TableException;
import org.apache.flink.table.planner.codegen.CodeGenUtils;
import org.apache.flink.table.planner.codegen.CodeGeneratorContext;
import org.apache.flink.table.planner.codegen.SinkCodeGenerator;
import org.apache.flink.table.planner.delegation.PlannerBase;
import org.apache.flink.table.planner.plan.nodes.exec.ExecEdge;
import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase;
import org.apache.flink.table.planner.plan.nodes.exec.InputProperty;
import org.apache.flink.table.planner.plan.nodes.exec.MultipleTransformationTranslator;
import org.apache.flink.table.planner.sinks.DataStreamTableSink;
import org.apache.flink.table.planner.sinks.TableSinkUtils;
import org.apache.flink.table.runtime.operators.CodeGenOperatorFactory;
import org.apache.flink.table.runtime.typeutils.TypeCheckUtils;
import org.apache.flink.table.sinks.AppendStreamTableSink;
import org.apache.flink.table.sinks.RetractStreamTableSink;
import org.apache.flink.table.sinks.StreamTableSink;
import org.apache.flink.table.sinks.TableSink;
import org.apache.flink.table.sinks.UpsertStreamTableSink;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;

public abstract class CommonExecLegacySink<T>
extends ExecNodeBase<Object>
implements MultipleTransformationTranslator<Object> {
    protected final TableSink<T> tableSink;
    @Nullable
    protected final String[] upsertKeys;
    protected final boolean needRetraction;
    protected final boolean isStreaming;

    public CommonExecLegacySink(TableSink<T> tableSink, @Nullable String[] upsertKeys, boolean needRetraction, boolean isStreaming, InputProperty inputProperty, LogicalType outputType, String description) {
        super(Collections.singletonList(inputProperty), outputType, description);
        this.tableSink = tableSink;
        this.upsertKeys = upsertKeys;
        this.needRetraction = needRetraction;
        this.isStreaming = isStreaming;
    }

    @Override
    protected Transformation<Object> translateToPlanInternal(PlannerBase planner) {
        if (this.tableSink instanceof StreamTableSink) {
            Transformation<T> transform2;
            if (this.tableSink instanceof RetractStreamTableSink) {
                transform2 = this.translateToTransformation(planner, true);
            } else if (this.tableSink instanceof UpsertStreamTableSink) {
                UpsertStreamTableSink upsertSink = (UpsertStreamTableSink)this.tableSink;
                boolean isAppendOnlyTable = !this.needRetraction;
                upsertSink.setIsAppendOnly(Boolean.valueOf(isAppendOnlyTable));
                if (this.upsertKeys != null) {
                    upsertSink.setKeyFields(this.upsertKeys);
                } else if (isAppendOnlyTable) {
                    upsertSink.setKeyFields(null);
                } else {
                    throw new TableException("UpsertStreamTableSink requires that Table has a full primary keys if it is updated.");
                }
                transform2 = this.translateToTransformation(planner, true);
            } else if (this.tableSink instanceof AppendStreamTableSink) {
                if (this.needRetraction) {
                    throw new TableException("AppendStreamTableSink requires that Table has only insert changes.");
                }
                transform2 = this.translateToTransformation(planner, false);
            } else {
                if (this.isStreaming) {
                    throw new TableException("Stream Tables can only be emitted by AppendStreamTableSink, RetractStreamTableSink, or UpsertStreamTableSink.");
                }
                transform2 = this.translateToTransformation(planner, false);
            }
            DataStream dataStream = new DataStream(planner.getExecEnv(), transform2);
            DataStreamSink dsSink = ((StreamTableSink)this.tableSink).consumeDataStream(dataStream);
            if (dsSink == null) {
                throw new TableException(String.format("The StreamTableSink#consumeDataStream(DataStream) must be implemented and return the sink transformation DataStreamSink. However, %s doesn't implement this method.", this.tableSink.getClass().getCanonicalName()));
            }
            return dsSink.getTransformation();
        }
        if (this.tableSink instanceof DataStreamTableSink) {
            return this.translateToTransformation(planner, ((DataStreamTableSink)this.tableSink).withChangeFlag());
        }
        throw new TableException(String.format("Only Support StreamTableSink! However %s is not a StreamTableSink.", this.tableSink.getClass().getCanonicalName()));
    }

    protected abstract RowType checkAndConvertInputTypeIfNeeded(RowType var1);

    private Transformation<T> translateToTransformation(PlannerBase planner, boolean withChangeFlag) {
        if (!withChangeFlag && this.needRetraction) {
            throw new TableException("Table is not an append-only table. Use the toRetractStream() in order to handle add and retract messages.");
        }
        ExecEdge inputEdge = this.getInputEdges().get(0);
        Transformation<?> inputTransform = inputEdge.translateToPlan(planner);
        RowType inputRowType = (RowType)inputEdge.getOutputType();
        RowType convertedInputRowType = this.checkAndConvertInputTypeIfNeeded(inputRowType);
        DataType resultDataType = this.tableSink.getConsumedDataType();
        if (CodeGenUtils.isInternalClass(resultDataType)) {
            return inputTransform;
        }
        int rowtimeIndex = this.getRowtimeIndex(inputRowType);
        DataType physicalOutputType = TableSinkUtils.inferSinkPhysicalDataType(resultDataType, convertedInputRowType, withChangeFlag);
        TypeInformation outputTypeInfo = SinkCodeGenerator.deriveSinkOutputTypeInfo(this.tableSink, physicalOutputType, withChangeFlag);
        CodeGenOperatorFactory converterOperator = SinkCodeGenerator.generateRowConverterOperator(new CodeGeneratorContext(planner.getTableConfig()), convertedInputRowType, this.tableSink, physicalOutputType, withChangeFlag, "SinkConversion", rowtimeIndex);
        return new OneInputTransformation(inputTransform, "SinkConversionTo" + resultDataType.getConversionClass().getSimpleName(), converterOperator, outputTypeInfo, inputTransform.getParallelism());
    }

    private int getRowtimeIndex(RowType inputRowType) {
        int rowtimeIndex = -1;
        ArrayList<Integer> rowtimeFieldIndices = new ArrayList<Integer>();
        for (int i = 0; i < inputRowType.getFieldCount(); ++i) {
            if (!TypeCheckUtils.isRowTime((LogicalType)inputRowType.getTypeAt(i))) continue;
            rowtimeFieldIndices.add(i);
        }
        if (rowtimeFieldIndices.size() == 1) {
            rowtimeIndex = (Integer)rowtimeFieldIndices.get(0);
        }
        return rowtimeIndex;
    }
}

