/*
 * Decompiled with CFR 0.152.
 */
package com.alibaba.ververica.connectors.common.source.resolver;

import com.alibaba.ververica.connectors.common.MetricUtils;
import com.alibaba.ververica.connectors.common.errorcode.ConnectorErrors;
import com.alibaba.ververica.connectors.common.metrics.SimpleGauge;
import com.alibaba.ververica.connectors.common.source.message.BytesMessage;
import com.alibaba.ververica.connectors.common.source.message.ListByteMessage;
import com.alibaba.ververica.connectors.common.source.message.ListMessage;
import com.alibaba.ververica.connectors.common.source.message.ListStringMessage;
import com.alibaba.ververica.connectors.common.source.message.RawMessage;
import com.alibaba.ververica.connectors.common.source.resolver.DirtyDataStrategy;
import com.alibaba.ververica.connectors.common.source.resolver.RecordResolver;
import com.alibaba.ververica.connectors.common.source.resolver.parse.AbstractHighSpeedParser;
import com.alibaba.ververica.connectors.common.source.resolver.parse.BufferedTextRowHighSpeedParser;
import com.alibaba.ververica.connectors.common.source.resolver.parse.DefaultHighSpeedParser;
import com.alibaba.ververica.connectors.common.util.ByteSerializer;
import com.alibaba.ververica.connectors.common.util.ByteString;
import com.alibaba.ververica.connectors.common.util.StringSerializer;
import java.io.Serializable;
import java.io.UnsupportedEncodingException;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.commons.lang3.StringEscapeUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.Gauge;
import org.apache.flink.metrics.Meter;
import org.apache.flink.shaded.guava18.com.google.common.collect.Lists;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.descriptors.DescriptorProperties;
import org.apache.flink.table.descriptors.SchemaValidator;
import org.apache.flink.table.functions.FunctionContext;
import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.util.Collector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class DefaultSourceCollector
implements RecordResolver<List<RawMessage>, RowData> {
    private static final long serialVersionUID = -4053363045803644643L;
    private static final Logger logger = LoggerFactory.getLogger(DefaultSourceCollector.class);
    // 60 seconds in milliseconds; the dirty-data handlers throttle their warning logs by this amount.
    private static final int DEFAULT_LOG_INTERVAL_MS = 60000;
    // Message property key; its presence is passed to StringSerializer as the isRGData flag.
    private static final String rgTag = "__blink_rg__tag__";
    // Set to true by initHighSpeed() once a binary parser has been selected.
    boolean highSpeed;
    // Transient: re-derived from `properties` in open().
    private transient TableSchema schema;
    // Number of schema columns, header columns included.
    private int totalColumnSize;
    // Number of schema columns that are not header fields.
    private int dataColumnSize;
    // Maps the ordinal of a data (non-header) column to its index in the schema.
    private Map<Integer, Integer> dataIndexMapping;
    // Names of columns whose values are read from message properties rather than the payload.
    private Set<String> headerFields;
    // Parsed from the "nullvalues" option and handed to the deserializers; stays null when the option is absent.
    private Set<String> nullValues;
    // Strategies applied to: undeserializable fields, rows with too few fields,
    // rows with too many fields, and string fields exceeding a configured length.
    private DirtyDataStrategy formatErrorStrategy;
    private DirtyDataStrategy fieldMissingStrategy;
    private DirtyDataStrategy fieldIncrementStrategy;
    private DirtyDataStrategy colLenStrategy;
    // Delimiters are stored after Java-style unescaping (see the constructor).
    private String fieldDelimiter;
    private String encoding;
    private String lineDelimiter;
    // Passed to MetricUtils.registerNumBytesInRate in open().
    private String sourceType = "";
    // Serializer value type of every schema column, indexed by schema position.
    private ByteSerializer.ValueType[] fieldTypes;
    // When true, dirty-data warnings are logged for every occurrence instead of being throttled.
    private boolean columnErrorDebug;
    // Metrics are registered in open(); most call sites null-check them.
    private Meter parserTpsMetrics;
    private Meter bpsMetrics;
    private Counter parserSkipMetrics;
    private SimpleGauge batchReadCount;
    // Timestamps (ms) of the last throttled warning for format errors / field-count and length errors.
    private long lastLogExceptionTime;
    private long lastLogHandleFieldTime;
    // Selects StringUtils.splitByWholeSeparatorPreserveAllTokens over splitPreserveAllTokens for field splitting.
    private boolean splitByWholeSeparator = false;
    // Schema column index -> maximum allowed string length, parsed from the "columnlengthfilter" option.
    private Map<Integer, Integer> colIndexLenFilter;
    // Column name -> schema column index.
    private Map<String, Integer> colNameIndexMap;
    private Map<String, String> properties;
    // NOTE(review): not assigned anywhere in the visible part of this class, yet read in parseRawMessage.
    private transient Object[] reuseObj;
    private AbstractHighSpeedParser highSpeedParser;
    // Transient: populated from the schema in open().
    private transient DataType[] fieldDataTypes;
    private final boolean hasMetadata;
    private final MetadataCollector metadataCollector;

    /**
     * Creates a collector for the given schema and parsing options.
     *
     * <p>Both delimiters are Java-unescaped before being stored, the header field list is copied
     * into a set, and {@link #init()} is run to derive the column bookkeeping from the schema
     * and properties.
     *
     * @param schema table schema describing all produced columns
     * @param headerFields names of columns read from message properties; may be {@code null}
     * @param properties connector properties; may be {@code null}
     * @param formatErrorStrategy strategy for fields that fail to deserialize
     * @param fieldMissingStrategy strategy for rows with fewer fields than data columns
     * @param fieldIncrementStrategy strategy for rows with more fields than data columns
     * @param colLenStrategy strategy for string fields exceeding a configured length
     * @param fieldDelimiter field delimiter, possibly containing Java escape sequences
     * @param encoding charset name used to decode byte payloads
     * @param lineDelimiter line delimiter, possibly containing Java escape sequences
     * @param hasMetadata whether metadata columns are appended to every row
     * @param metadataConverters converters producing the metadata columns
     */
    DefaultSourceCollector(TableSchema schema, List<String> headerFields, Map<String, String> properties, DirtyDataStrategy formatErrorStrategy, DirtyDataStrategy fieldMissingStrategy, DirtyDataStrategy fieldIncrementStrategy, DirtyDataStrategy colLenStrategy, String fieldDelimiter, String encoding, String lineDelimiter, boolean hasMetadata, MetadataConverter[] metadataConverters) {
        // Schema-related state.
        this.schema = schema;
        this.properties = properties;
        this.headerFields = headerFields == null ? null : new HashSet<>(headerFields);

        // Text format.
        this.encoding = encoding;
        this.fieldDelimiter = StringEscapeUtils.unescapeJava(fieldDelimiter);
        this.lineDelimiter = StringEscapeUtils.unescapeJava(lineDelimiter);

        // Dirty-data handling.
        this.formatErrorStrategy = formatErrorStrategy;
        this.fieldMissingStrategy = fieldMissingStrategy;
        this.fieldIncrementStrategy = fieldIncrementStrategy;
        this.colLenStrategy = colLenStrategy;

        // Metadata columns.
        this.hasMetadata = hasMetadata;
        this.metadataCollector = new MetadataCollector(hasMetadata, metadataConverters);

        init();
    }

    /**
     * Prepares the runtime state: re-derives the transient schema from the stored properties,
     * registers the metrics, resets the log-throttling timestamps and selects a high-speed
     * parser if one is applicable.
     *
     * @param context function context used to register metrics
     */
    @Override
    public void open(FunctionContext context) {
        DescriptorProperties descriptor = new DescriptorProperties();
        descriptor.putProperties(properties);
        schema = SchemaValidator.deriveTableSinkSchema(descriptor);
        fieldDataTypes = schema.getFieldDataTypes();

        parserTpsMetrics = MetricUtils.registerNumRecordsInRate(context);
        bpsMetrics = MetricUtils.registerNumBytesInRate(context, sourceType);
        parserSkipMetrics = MetricUtils.registerNumRecordsInErrors(context);
        batchReadCount = (SimpleGauge) context.getMetricGroup().gauge("currentNumRecordsPerBatch", (Gauge) new SimpleGauge());

        lastLogExceptionTime = System.currentTimeMillis();
        lastLogHandleFieldTime = System.currentTimeMillis();

        initHighSpeed();
    }

    /**
     * Selects a binary parser when the configuration allows it. The buffered text-row parser is
     * preferred; the default high-speed parser is the fallback. When neither applies, the
     * parser and the {@code highSpeed} flag are left untouched.
     */
    @VisibleForTesting
    void initHighSpeed() {
        if (canUsePointerHighSpeedParser()) {
            highSpeedParser = new BufferedTextRowHighSpeedParser(fieldTypes, fieldDataTypes, fieldDelimiter, lineDelimiter, nullValues, columnErrorDebug, parserTpsMetrics);
        } else if (canUseDefaultHighSpeedParser()) {
            highSpeedParser = new DefaultHighSpeedParser(fieldTypes, fieldDataTypes, fieldDelimiter, lineDelimiter, splitByWholeSeparator, nullValues, formatErrorStrategy, fieldMissingStrategy, fieldIncrementStrategy, columnErrorDebug, parserTpsMetrics, parserSkipMetrics);
        } else {
            return;
        }
        highSpeed = true;
    }

    /**
     * Tells whether the buffered text-row parser can be used.
     *
     * <p>Requires UTF-8 encoding, single-byte line and field delimiters, no header columns,
     * no column length filter, more than one column, the SKIP/SKIP/CUT strategy combination
     * and no metadata columns.
     *
     * @return {@code true} when every precondition holds
     */
    private boolean canUsePointerHighSpeedParser() {
        // Measure delimiter width in UTF-8 explicitly: the no-arg getBytes() depends on the
        // platform default charset and could disagree with the encoding checked here.
        java.nio.charset.Charset utf8 = java.nio.charset.StandardCharsets.UTF_8;
        // The filter map is absent when the collector was built without properties.
        boolean noLengthFilter = colIndexLenFilter == null || colIndexLenFilter.isEmpty();
        return "UTF-8".equalsIgnoreCase(encoding)
                && lineDelimiter.getBytes(utf8).length == 1
                && fieldDelimiter.getBytes(utf8).length == 1
                && totalColumnSize == dataColumnSize
                && noLengthFilter
                && totalColumnSize > 1
                && formatErrorStrategy == DirtyDataStrategy.SKIP
                && fieldMissingStrategy == DirtyDataStrategy.SKIP
                && fieldIncrementStrategy == DirtyDataStrategy.CUT
                && !hasMetadata;
    }

    /**
     * Tells whether the default high-speed parser can be used.
     *
     * <p>Requires UTF-8 encoding, a line delimiter made only of single-byte characters, a field
     * delimiter that is either split as a whole separator or made only of single-byte
     * characters, no header columns and no column length filter.
     *
     * @return {@code true} when every precondition holds
     */
    private boolean canUseDefaultHighSpeedParser() {
        // Measure delimiter width in UTF-8 explicitly: the no-arg getBytes() depends on the
        // platform default charset and could disagree with the encoding checked here.
        java.nio.charset.Charset utf8 = java.nio.charset.StandardCharsets.UTF_8;
        // The filter map is absent when the collector was built without properties.
        boolean noLengthFilter = colIndexLenFilter == null || colIndexLenFilter.isEmpty();
        boolean singleByteLineDelimiter = lineDelimiter.getBytes(utf8).length == lineDelimiter.length();
        boolean fieldDelimiterOk = splitByWholeSeparator
                || fieldDelimiter.getBytes(utf8).length == fieldDelimiter.length();
        return "UTF-8".equalsIgnoreCase(encoding)
                && singleByteLineDelimiter
                && fieldDelimiterOk
                && totalColumnSize == dataColumnSize
                && noLengthFilter;
    }

    /**
     * Enables or disables per-occurrence logging of dirty-data warnings.
     *
     * @param columnErrorDebug {@code true} to log every dirty-data warning without throttling
     * @return this collector, for call chaining
     */
    public DefaultSourceCollector setColumnErrorDebug(boolean columnErrorDebug) {
        this.columnErrorDebug = columnErrorDebug;
        return this;
    }

    /**
     * @return whether dirty-data warnings are logged for every occurrence
     */
    public boolean getColumnErrorDebug() {
        return columnErrorDebug;
    }

    /**
     * @return the strategy applied when a field cannot be deserialized
     */
    public DirtyDataStrategy getFormatErrorStrategy() {
        return formatErrorStrategy;
    }

    /**
     * @return the configured null-value markers (the internal set, not a copy), or
     *     {@code null} when none were configured
     */
    public Set<String> getNullValues() {
        return nullValues;
    }

    /**
     * Derives the column bookkeeping from the schema and reads the parser options from the
     * properties.
     *
     * <p>Always populated: {@code totalColumnSize}, {@code fieldTypes}, {@code dataColumnSize},
     * {@code colNameIndexMap}, {@code dataIndexMapping} and an (initially empty)
     * {@code colIndexLenFilter}. When properties are present, the column length filter, the
     * whole-separator flag and the null-value markers are additionally parsed from them.
     */
    private void init() {
        logger.info("Init Method!");
        // Fetch the schema arrays once instead of on every loop iteration.
        String[] fieldNames = schema.getFieldNames();
        TypeInformation<?>[] typeInfos = schema.getFieldTypes();

        totalColumnSize = fieldNames.length;
        fieldTypes = new ByteSerializer.ValueType[totalColumnSize];
        dataColumnSize = 0;
        colNameIndexMap = new HashMap<>();
        dataIndexMapping = new HashMap<>();
        // Created before the null-properties early return so that later readers
        // (parser selection, string parsing) never see a null map.
        colIndexLenFilter = new HashMap<>();

        for (int i = 0; i < totalColumnSize; ++i) {
            colNameIndexMap.put(fieldNames[i], i);
            fieldTypes[i] = ByteSerializer.getTypeIndex(typeInfos[i].getTypeClass());
            if (isHeaderField(i)) {
                continue;
            }
            // Data columns are numbered consecutively, skipping header columns.
            dataIndexMapping.put(dataColumnSize, i);
            ++dataColumnSize;
        }

        if (properties == null) {
            logger.info("Properties is null!");
            return;
        }
        Configuration config = new Configuration();
        for (Map.Entry<String, String> entry : properties.entrySet()) {
            config.setString(entry.getKey(), entry.getValue());
        }

        // Column length filter, formatted as "name:length;name:length". Entries that are
        // malformed, name an unknown column or name a non-string column are ignored.
        String colFilterList = config.getString(CollectorOption.COLUMN_LENGTH_FILTER);
        String[] colFilterArray = StringUtils.split(colFilterList, ";");
        if (colFilterArray != null) {
            for (String colFilter : colFilterArray) {
                String[] colLen = StringUtils.split(colFilter, ":");
                if (colLen == null || colLen.length != 2 || StringUtils.isEmpty(colLen[0]) || StringUtils.isEmpty(colLen[1]) || !colNameIndexMap.containsKey(colLen[0]) || !isStringType(colLen[0])) {
                    continue;
                }
                try {
                    int length = Integer.parseInt(colLen[1]);
                    colIndexLenFilter.put(colNameIndexMap.get(colLen[0]), length);
                } catch (NumberFormatException e) {
                    logger.warn("column filter length illegal:" + colLen[1]);
                }
            }
        }
        if (!colIndexLenFilter.isEmpty()) {
            logger.info("Parser using column filter:" + colFilterList);
        }

        splitByWholeSeparator = config.getBoolean(CollectorOption.PARSER_SPLIT_BY_WHOLE_SEPARATOR);

        String nullValuesStr = config.getString(CollectorOption.PARSER_NULL_VALUES);
        String nullValuesDelimiter = config.getString(CollectorOption.PARSER_NULL_VALUES_DELIMITER);
        if (nullValuesStr != null) {
            nullValues = new HashSet<>();
            if (nullValuesStr.contains(nullValuesDelimiter)) {
                for (String token : StringUtils.splitPreserveAllTokens(nullValuesStr, nullValuesDelimiter)) {
                    nullValues.add(StringEscapeUtils.unescapeJava(token));
                }
            } else {
                nullValues.add(StringEscapeUtils.unescapeJava(nullValuesStr));
            }
            logger.info("nullValues: " + nullValues);
            for (String nullValue : nullValues) {
                logger.info("nullValues: " + nullValue);
            }
        }
    }

    /**
     * Checks whether the named column maps to the serializer's string value type.
     *
     * @param fieldName column name; must be present in {@code colNameIndexMap}
     * @return {@code true} if the column's type information is available and resolves to
     *     {@code V_String}
     */
    private boolean isStringType(String fieldName) {
        TypeInformation<?>[] columnTypes = schema.getFieldTypes();
        TypeInformation<?> columnType = columnTypes[colNameIndexMap.get(fieldName)];
        if (columnType == null) {
            return false;
        }
        return ByteSerializer.getTypeIndex(columnType.getTypeClass()) == ByteSerializer.ValueType.V_String;
    }

    /**
     * Checks whether the named column maps to the serializer's byte-array value type.
     *
     * @param fieldName column name; must be present in {@code colNameIndexMap}
     * @return {@code true} if the column's type information is available and resolves to
     *     {@code V_ByteArray}
     */
    private boolean isByteArrayType(String fieldName) {
        TypeInformation<?>[] columnTypes = schema.getFieldTypes();
        TypeInformation<?> columnType = columnTypes[colNameIndexMap.get(fieldName)];
        if (columnType == null) {
            return false;
        }
        return ByteSerializer.getTypeIndex(columnType.getTypeClass()) == ByteSerializer.ValueType.V_ByteArray;
    }

    /**
     * @param index schema column index
     * @return {@code true} if header fields are configured and contain the column's name
     */
    private boolean isHeaderField(int index) {
        if (headerFields == null) {
            return false;
        }
        String columnName = schema.getFieldNames()[index];
        return headerFields.contains(columnName);
    }

    /**
     * Reads the message property named after the column at {@code index}.
     *
     * @param msg message whose properties are consulted
     * @param index schema column index
     * @return the property value cast to {@code String}, or the empty string when absent
     */
    private String getHeaderValue(RawMessage msg, int index) {
        String propertyName = schema.getFieldNames()[index];
        Object property = msg.getProperty(propertyName);
        if (property == null) {
            return "";
        }
        return (String) property;
    }

    /**
     * Resolves the raw string value of the column at {@code idx}.
     *
     * <p>Header columns come from the message properties. With exactly one data column the
     * whole line is the value. Otherwise the value is {@code data[idx]}, or {@code null} when
     * {@code idx} is beyond the array.
     *
     * @param msg message supplying header properties
     * @param data split fields of the current line
     * @param line the current line
     * @param idx schema column index
     * @return the raw value, possibly {@code null}
     */
    private String getValue(RawMessage msg, String[] data, String line, int idx) {
        if (isHeaderField(idx)) {
            return getHeaderValue(msg, idx);
        }
        if (dataColumnSize == 1) {
            return line;
        }
        return idx < data.length ? data[idx] : null;
    }

    /**
     * @return {@code true} when there is exactly one data column and it is of byte-array type
     */
    private boolean isOnlyHaveVarbinaryDataField() {
        if (dataColumnSize != 1 || dataIndexMapping.size() != 1) {
            return false;
        }
        int schemaIndex = dataIndexMapping.get(0);
        String columnName = schema.getFieldNames()[schemaIndex];
        return isByteArrayType(columnName);
    }

    /**
     * @return {@code true} when header fields are configured and their count equals the
     *     number of schema columns
     */
    private boolean isAllHeaderField() {
        if (headerFields == null) {
            return false;
        }
        int columnCount = schema.getFieldNames().length;
        return headerFields.size() == columnCount;
    }

    /**
     * Parses a batch of raw messages and emits the resulting rows through {@code collector},
     * routed via the metadata collector.
     *
     * @param msgs batch of raw messages; {@code null} or empty batches are ignored
     * @param collector downstream collector receiving the produced rows
     */
    @Override
    public void parse(List<RawMessage> msgs, Collector<RowData> collector) {
        MetadataCollector wrapper = metadataCollector;
        // Point the wrapper at the downstream collector for this call.
        wrapper.collector = collector;
        parseRawMessage(msgs, wrapper);
    }

    /**
     * Converts every message of the batch into rows and emits them.
     *
     * <p>{@link ListMessage}s carry pre-split records and are handled by
     * {@link #parseListMessage}. Any other message is treated as a {@link BytesMessage}:
     * <ul>
     *   <li>a single VARBINARY data column receives the raw payload, other columns come from
     *       the message properties;</li>
     *   <li>an all-header schema is filled from the message properties only;</li>
     *   <li>otherwise the payload is parsed as delimited text, by the high-speed parser when
     *       one was selected.</li>
     * </ul>
     *
     * <p>A message with an empty body is skipped on its own; the remaining messages of the
     * batch are still processed.
     *
     * @param msgs batch of raw messages; {@code null} or empty batches are ignored
     * @param collector collector that also records the message currently being processed
     */
    private void parseRawMessage(List<RawMessage> msgs, MetadataCollector collector) {
        if (msgs == null || msgs.isEmpty()) {
            return;
        }
        if (batchReadCount != null) {
            batchReadCount.report(msgs.size());
        }
        for (RawMessage msg : msgs) {
            // Metadata converters read from the message that is currently being processed.
            collector.message = msg;
            if (msg instanceof ListMessage) {
                parseListMessage(msg, collector);
                continue;
            }

            byte[] payload = ((BytesMessage) msg).getData();
            if (bpsMetrics != null) {
                bpsMetrics.markEvent(payload == null ? 0L : (long) payload.length);
            }

            if (isOnlyHaveVarbinaryDataField()) {
                GenericRowData row = new GenericRowData(totalColumnSize);
                int dataIdx = dataIndexMapping.get(0);
                row.setField(dataIdx, payload);
                fillHeaderFields(row, msg, dataIdx);
                collector.collect(row);
                markRecordParsed();
                continue;
            }
            if (isAllHeaderField()) {
                GenericRowData row = new GenericRowData(totalColumnSize);
                fillHeaderFields(row, msg, -1);
                collector.collect(row);
                markRecordParsed();
                continue;
            }
            if (payload == null) {
                // Skip only this message; returning here would silently drop the rest of the batch.
                logger.info("Empty ByteMessage Body, Ignore It.");
                continue;
            }
            if (highSpeed) {
                highSpeedParser.parseBytesMessageByBinary(collector, msg);
            } else {
                parseBytesMessageByString(collector, msg);
            }
        }
    }

    /**
     * Emits one row per record of a list message. Fields that fail to deserialize are handled
     * by {@link #handleException}; skipped records are counted in the skip metric.
     *
     * <p>NOTE(review): a single row instance is reused for all records of the message, as in
     * the previous implementation.
     */
    private void parseListMessage(RawMessage msg, MetadataCollector collector) {
        GenericRowData row = new GenericRowData(totalColumnSize);
        boolean isRGData = msg.getProperties() != null && msg.getProperties().containsKey(rgTag);
        long length = 0L;
        for (Object record : (Iterable<?>) ((ListMessage) msg).getData()) {
            Object[] fields = (Object[]) record;
            boolean skip = false;
            if (msg instanceof ListStringMessage) {
                String[] strFields = (String[]) fields;
                for (int i = 0; i < strFields.length; ++i) {
                    try {
                        row.setField(i, StringSerializer.deserialize(strFields[i], fieldTypes[i], fieldDataTypes[i], isRGData));
                        if (strFields[i] != null) {
                            length += (long) strFields[i].length();
                        }
                    } catch (Exception e) {
                        skip = handleException(row, i, fields, e);
                    }
                }
            } else if (msg instanceof ListByteMessage) {
                ByteString[] byteFields = (ByteString[]) fields;
                if (reuseObj == null) {
                    // The reuse slots are transient and not allocated anywhere else in this
                    // class; without this every field access failed with a NullPointerException
                    // that was then reported as dirty data.
                    // NOTE(review): assumes StringSerializer accepts a null reuse object - confirm.
                    reuseObj = new Object[totalColumnSize];
                }
                for (int i = 0; i < byteFields.length; ++i) {
                    try {
                        row.setField(i, StringSerializer.deserialize(byteFields[i], fieldTypes[i], fieldDataTypes[i], isRGData, reuseObj[i]));
                        if (byteFields[i] != null) {
                            length += (long) byteFields[i].size();
                        }
                    } catch (Exception e) {
                        skip = handleException(row, i, Arrays.stream(byteFields).map(bytes -> bytes == null ? null : bytes.toStringUtf8()).toArray(), e);
                    }
                }
            } else {
                throw new RuntimeException("Unsupported ListMessage type: " + msg.getClass().getName());
            }
            if (skip) {
                if (parserSkipMetrics != null) {
                    parserSkipMetrics.inc();
                }
                continue;
            }
            collector.collect(row);
            markRecordParsed();
        }
        if (bpsMetrics != null) {
            bpsMetrics.markEvent(length);
        }
    }

    /**
     * Fills every column except {@code skipIndex} from the message properties.
     * Pass a negative {@code skipIndex} to fill all columns.
     */
    private void fillHeaderFields(GenericRowData row, RawMessage msg, int skipIndex) {
        for (int i = 0; i < totalColumnSize; ++i) {
            if (i == skipIndex) {
                continue;
            }
            String headerValue = getHeaderValue(msg, i);
            row.setField(i, StringSerializer.deserialize(headerValue, fieldTypes[i], fieldDataTypes[i], nullValues));
        }
    }

    /** Marks one parsed record on the TPS meter, which is absent until open() has run. */
    private void markRecordParsed() {
        if (parserTpsMetrics != null) {
            parserTpsMetrics.markEvent();
        }
    }

    /**
     * Decodes the payload with the configured encoding, splits it into lines and fields, and
     * emits one row per well-formed line.
     *
     * <p>Lines with too few or too many fields go through {@link #handleFieldMissing} /
     * {@link #handleFieldIncrement}. Fields that fail to deserialize go through
     * {@link #handleException}. String columns with a configured maximum length go through
     * {@link #handleFiledLengthLarge}, whose result (possibly truncated) is what ends up in
     * the row. Skipped lines are counted in the skip metric.
     *
     * @param collector receiver of the produced rows
     * @param msg bytes message to parse; also supplies header column values
     * @throws RuntimeException if the configured encoding is not supported, or if a dirty-data
     *     strategy is EXCEPTION and the corresponding error occurs
     */
    private void parseBytesMessageByString(Collector<RowData> collector, RawMessage<?> msg) {
        String lines;
        try {
            lines = new String(((BytesMessage) msg).getData(), encoding);
        } catch (UnsupportedEncodingException e) {
            throw new RuntimeException(ConnectorErrors.INST.parserUnsupportedEncodingError(encoding), e);
        }
        for (String line : StringUtils.split(lines, lineDelimiter)) {
            String[] data;
            if (dataColumnSize == 1) {
                // A single data column takes the whole line; no splitting needed.
                data = new String[] {line};
            } else if (splitByWholeSeparator) {
                data = StringUtils.splitByWholeSeparatorPreserveAllTokens(line, fieldDelimiter);
            } else {
                data = StringUtils.splitPreserveAllTokens(line, fieldDelimiter);
            }

            if (data.length < dataColumnSize) {
                data = handleFieldMissing(data);
            } else if (data.length > dataColumnSize) {
                data = handleFieldIncrement(data);
            } else if (dataColumnSize != 1 && dataColumnSize != totalColumnSize) {
                // Header columns are present: move every field to its schema position, exactly
                // as the missing/increment handlers do. Without this, data columns located
                // after a header column would read the wrong field or null.
                data = alignDataToSchema(data);
            }
            if (data == null) {
                if (parserSkipMetrics != null) {
                    parserSkipMetrics.inc();
                }
                continue;
            }

            GenericRowData row = new GenericRowData(totalColumnSize);
            boolean skip = false;
            for (int idx = 0; idx < totalColumnSize; ++idx) {
                try {
                    String fieldValue = getValue(msg, data, line, idx);
                    row.setField(idx, StringSerializer.deserialize(fieldValue, fieldTypes[idx], fieldDataTypes[idx], nullValues));
                } catch (Exception e) {
                    skip = handleException(row, idx, data, e);
                }

                // The filter map is absent when the collector was built without properties.
                Integer maxLength = colIndexLenFilter == null ? null : colIndexLenFilter.get(idx);
                if (maxLength == null) {
                    continue;
                }
                String rawValue = getValue(msg, data, line, idx);
                if (rawValue == null) {
                    continue;
                }
                Object filtered = handleFiledLengthLarge(rawValue, maxLength);
                if (filtered == null) {
                    skip = true;
                    break;
                }
                // Store the filter result, not the raw value; otherwise the CUT strategy
                // would log a truncation but keep the over-long value.
                row.setField(idx, StringSerializer.deserialize((String) filtered, fieldTypes[idx], fieldDataTypes[idx], nullValues));
            }
            if (skip) {
                if (parserSkipMetrics != null) {
                    parserSkipMetrics.inc();
                }
                continue;
            }
            collector.collect(row);
            if (parserTpsMetrics != null) {
                parserTpsMetrics.markEvent();
            }
        }
    }

    /**
     * Copies the split fields into an array indexed by schema position, leaving header column
     * slots {@code null}.
     */
    private String[] alignDataToSchema(String[] data) {
        String[] aligned = new String[totalColumnSize];
        for (int i = 0; i < data.length; ++i) {
            Integer schemaIdx = dataIndexMapping.get(i);
            if (schemaIdx != null) {
                aligned[schemaIdx] = data[i];
            }
        }
        return aligned;
    }

    /**
     * Applies the format-error strategy after a field failed to deserialize.
     *
     * <ul>
     *   <li>SKIP: log a (throttled) warning and skip the record;</li>
     *   <li>SKIP_SILENT: skip the record without logging;</li>
     *   <li>EXCEPTION: rethrow as a {@link RuntimeException} carrying the original cause;</li>
     *   <li>any other strategy: set the field to {@code null} and keep the record.</li>
     * </ul>
     *
     * @param row row being built
     * @param idx index of the failing field
     * @param data raw fields of the record, used for diagnostics only
     * @param e the deserialization failure
     * @return {@code true} if the record must be skipped
     * @throws RuntimeException when the strategy is EXCEPTION
     */
    private boolean handleException(GenericRowData row, int idx, Object[] data, Exception e) {
        // The raw array is not always aligned with the schema (it can be shorter than idx),
        // so look the diagnostics up defensively instead of masking the real error with an
        // ArrayIndexOutOfBoundsException.
        Object fieldData = data != null && idx >= 0 && idx < data.length ? data[idx] : null;
        Object fieldType = idx >= 0 && idx < fieldTypes.length ? fieldTypes[idx] : null;
        switch (formatErrorStrategy) {
            case SKIP: {
                long now = System.currentTimeMillis();
                if (columnErrorDebug || now - lastLogExceptionTime > DEFAULT_LOG_INTERVAL_MS) {
                    logger.warn("Data format error, field type: " + fieldType + ", field data: " + fieldData + ", index: " + idx + ", data: [" + StringUtils.join(data, ",") + "]", (Throwable) e);
                    lastLogExceptionTime = now;
                }
                return true;
            }
            case SKIP_SILENT: {
                return true;
            }
            case EXCEPTION: {
                throw new RuntimeException(ConnectorErrors.INST.parserDataFormatError(String.valueOf(fieldType), String.valueOf(fieldData), String.valueOf(idx), StringUtils.join(data, ",")), e);
            }
            default: {
                row.setField(idx, null);
                return false;
            }
        }
    }

    /**
     * Applies the field-missing strategy to a line with fewer fields than data columns.
     *
     * @param data the split fields of the line
     * @return the fields placed at their schema positions (CUT/NULL/PAD), or {@code null}
     *     when the line must be skipped
     * @throws RuntimeException when the strategy is EXCEPTION
     */
    private String[] handleFieldMissing(String[] data) {
        switch (fieldMissingStrategy) {
            case EXCEPTION: {
                throw new RuntimeException(ConnectorErrors.INST.parserFieldMissingError(String.valueOf(totalColumnSize), String.valueOf(dataColumnSize), String.valueOf(data.length), StringUtils.join((Object[]) data, ",")));
            }
            case SKIP_SILENT: {
                return null;
            }
            case CUT:
            case NULL:
            case PAD: {
                // Absent trailing fields simply stay null in the schema-sized array.
                String[] padded = new String[totalColumnSize];
                int fieldCount = data.length;
                for (int pos = 0; pos < fieldCount; ++pos) {
                    Integer schemaIdx = dataIndexMapping.get(pos);
                    if (schemaIdx != null) {
                        padded[schemaIdx.intValue()] = data[pos];
                    }
                }
                return padded;
            }
            default: {
                // SKIP (and anything not listed above): skip the line with a throttled warning.
                long timestamp = System.currentTimeMillis();
                boolean shouldLog = columnErrorDebug || timestamp - lastLogHandleFieldTime > 60000L;
                if (shouldLog) {
                    logger.warn("Field missing error, table column number: " + totalColumnSize + ", data column number: " + dataColumnSize + ", data field number: " + data.length + ", data: [" + StringUtils.join((Object[]) data, ",") + "]");
                    lastLogHandleFieldTime = timestamp;
                }
                return null;
            }
        }
    }

    /**
     * Applies the field-increment strategy to a line with more fields than data columns.
     *
     * @param data the split fields of the line
     * @return the first {@code dataColumnSize} fields placed at their schema positions, or
     *     {@code null} when the line must be skipped
     * @throws RuntimeException when the strategy is EXCEPTION
     */
    private String[] handleFieldIncrement(String[] data) {
        switch (fieldIncrementStrategy) {
            case EXCEPTION: {
                throw new RuntimeException(ConnectorErrors.INST.parserFieldIncrementError(String.valueOf(totalColumnSize), String.valueOf(dataColumnSize), String.valueOf(data.length), StringUtils.join((Object[]) data, ",")));
            }
            case SKIP_SILENT: {
                return null;
            }
            case SKIP: {
                long timestamp = System.currentTimeMillis();
                boolean shouldLog = columnErrorDebug || timestamp - lastLogHandleFieldTime > 60000L;
                if (shouldLog) {
                    logger.warn("Field increment error, table column number: " + totalColumnSize + ", data column number: " + dataColumnSize + ", data field number: " + data.length + ", data: [" + StringUtils.join((Object[]) data, ",") + "]");
                    lastLogHandleFieldTime = timestamp;
                }
                return null;
            }
            default: {
                // Every other strategy keeps the leading fields and drops the surplus ones.
                String[] trimmed = new String[totalColumnSize];
                for (int pos = 0; pos < dataColumnSize; ++pos) {
                    Integer schemaIdx = dataIndexMapping.get(pos);
                    if (schemaIdx != null) {
                        trimmed[schemaIdx.intValue()] = data[pos];
                    }
                }
                return trimmed;
            }
        }
    }

    /**
     * Applies the column-length strategy to a string value.
     *
     * @param data the value to check; may be {@code null}
     * @param length maximum allowed number of characters
     * @return {@code data} unchanged when it is {@code null} or within the limit; {@code null}
     *     when the record must be skipped (SKIP, SKIP_SILENT); otherwise the value cut to
     *     {@code length} characters
     */
    private Object handleFiledLengthLarge(String data, int length) {
        if (data == null || data.length() <= length) {
            return data;
        }
        switch (colLenStrategy) {
            case SKIP_SILENT: {
                return null;
            }
            case SKIP: {
                long timestamp = System.currentTimeMillis();
                if (columnErrorDebug || timestamp - lastLogHandleFieldTime > 60000L) {
                    logger.warn("Field too long error, data length: " + data.length() + ", exceed specified length: " + length + ", data: [" + data + "]");
                    lastLogHandleFieldTime = timestamp;
                }
                return null;
            }
            default: {
                break;
            }
        }
        // Every remaining strategy cuts the value down to the allowed length.
        long timestamp = System.currentTimeMillis();
        if (columnErrorDebug || timestamp - lastLogHandleFieldTime > 60000L) {
            logger.warn("Field too long error, data length: " + data.length() + ", exceed specified length: " + length + ", data: [" + data + "], use cut strategy to cut field value");
            lastLogHandleFieldTime = timestamp;
        }
        return data.substring(0, length);
    }

    /**
     * @return the type information of the produced rows, derived from the schema's row data type
     */
    public TypeInformation<RowData> getProducedType() {
        RowType rowType = (RowType) schema.toRowDataType().getLogicalType();
        return InternalTypeInfo.of(rowType);
    }

    public static final class MetadataCollector
    implements Collector<RowData>,
    Serializable {
        private static final long serialVersionUID = 1L;
        private final boolean hasMetadata;
        private final MetadataConverter[] metadataConverters;
        public transient RawMessage<?> message;
        public transient Collector<RowData> collector;

        public MetadataCollector(boolean hasMetadata, MetadataConverter[] metadataConverters) {
            this.hasMetadata = hasMetadata;
            this.metadataConverters = metadataConverters;
        }

        public void collect(RowData physicalRow) {
            if (this.hasMetadata) {
                int index;
                int physicalArity = physicalRow.getArity();
                int metadataArity = this.metadataConverters.length;
                GenericRowData producedRow = new GenericRowData(physicalRow.getRowKind(), physicalArity + metadataArity);
                GenericRowData genericPhysicalRow = (GenericRowData)physicalRow;
                for (index = 0; index < physicalArity; ++index) {
                    producedRow.setField(index, genericPhysicalRow.getField(index));
                }
                for (index = 0; index < metadataArity; ++index) {
                    producedRow.setField(index + physicalArity, this.metadataConverters[index].read(this.message));
                }
                this.collector.collect((Object)producedRow);
            } else {
                this.collector.collect((Object)physicalRow);
            }
        }

        public void close() {
        }
    }

    public static interface MetadataConverter
    extends Serializable {
        public Object read(RawMessage<?> var1);
    }

    /** Configuration options understood by the collector; every key is lower-cased. */
    public static class CollectorOption {
        /** Whether dirty-data warnings are logged for every occurrence. */
        public static final ConfigOption<Boolean> COLUMN_ERROR_DEBUG =
                ConfigOptions.key("columnErrorDebug".toLowerCase()).defaultValue(true);
        /** Name of the length-check mode used to derive the dirty-data strategies. */
        public static final ConfigOption<String> LENGTH_CHECK =
                ConfigOptions.key("lengthCheck".toLowerCase()).defaultValue("NONE");
        /** Line delimiter of text payloads. */
        public static final ConfigOption<String> LINE_DELIMITER =
                ConfigOptions.key("lineDelimiter".toLowerCase()).defaultValue("\n");
        /** Field delimiter of text payloads. */
        public static final ConfigOption<String> FIELD_DELIMITER =
                ConfigOptions.key("fieldDelimiter".toLowerCase()).defaultValue("\u0001");
        /** Charset name of text payloads. */
        public static final ConfigOption<String> ENCODING =
                ConfigOptions.key("encoding".toLowerCase()).defaultValue("UTF-8");
        /** Column length limits, formatted as {@code name:length;name:length}. */
        public static final ConfigOption<String> COLUMN_LENGTH_FILTER =
                ConfigOptions.key("columnLengthFilter".toLowerCase()).defaultValue("");
        /** Values treated as null markers, separated by {@link #PARSER_NULL_VALUES_DELIMITER}. */
        public static final ConfigOption<String> PARSER_NULL_VALUES =
                ConfigOptions.key("nullValues".toLowerCase()).noDefaultValue();
        /** Whether the field delimiter is matched as one whole separator string. */
        public static final ConfigOption<Boolean> PARSER_SPLIT_BY_WHOLE_SEPARATOR =
                ConfigOptions.key("splitByWholeSeparator".toLowerCase()).defaultValue(false);
        /** Delimiter between the entries of {@link #PARSER_NULL_VALUES}. */
        public static final ConfigOption<String> PARSER_NULL_VALUES_DELIMITER =
                ConfigOptions.key("nullValuesDelimiter".toLowerCase()).defaultValue("|");
        /** Header columns option; not read in the visible part of this file. */
        public static final ConfigOption<String> PARSER_HEADER_FIELDS =
                ConfigOptions.key("headerColumns".toLowerCase()).noDefaultValue();
    }

    public static class Builder {
        // Connector properties handed to the collector; may stay null.
        Map<String, String> properties;
        // Text format defaults.
        String encoding = "UTF-8";
        String lineDelimiter = "\n";
        String fieldDelimiter = "\u0001";
        // Dirty-data strategy defaults; setProperties may override them based on the "lengthcheck" option.
        DirtyDataStrategy formatErrorStrategy = DirtyDataStrategy.SKIP;
        DirtyDataStrategy fieldMissingStrategy = DirtyDataStrategy.SKIP;
        DirtyDataStrategy fieldIncrementStrategy = DirtyDataStrategy.CUT;
        DirtyDataStrategy colLenStrategy = DirtyDataStrategy.SKIP;
        private boolean columnErrorDebug = false;
        private TableSchema schema;
        // Null means that no column is read from message properties.
        private List<String> headerFields = null;
        private boolean hasMetadata;
        private MetadataConverter[] metadataConverters;

        /**
         * @param tableSchema schema describing the produced columns
         * @return this builder
         */
        public Builder setTableSchema(TableSchema tableSchema) {
            schema = tableSchema;
            return this;
        }

        /**
         * @param headerFields names of the columns read from message properties; may be {@code null}
         * @return this builder
         */
        public Builder setHeaderFields(List<String> headerFields) {
            this.headerFields = headerFields;
            return this;
        }

        /**
         * @param formatErrorStrategy strategy for fields that fail to deserialize
         * @return this builder
         */
        public Builder setFormatErrorStrategy(DirtyDataStrategy formatErrorStrategy) {
            this.formatErrorStrategy = formatErrorStrategy;
            return this;
        }

        public Builder setFieldMissingStrategy(DirtyDataStrategy fieldMissingStrategy) {
            this.fieldMissingStrategy = fieldMissingStrategy;
            return this;
        }

        public Builder setFieldIncrementStrategy(DirtyDataStrategy fieldIncrementStrategy) {
            this.fieldIncrementStrategy = fieldIncrementStrategy;
            return this;
        }

        public Builder setColLenStrategy(DirtyDataStrategy colLenStrategy) {
            this.colLenStrategy = colLenStrategy;
            return this;
        }

        public Builder setColumnErrorDebug(boolean columnErrorDebug) {
            this.columnErrorDebug = columnErrorDebug;
            return this;
        }

        public Builder setProperties(Map<String, String> properties) {
            this.properties = properties;
            if (null == properties) {
                return this;
            }
            Configuration config = new Configuration();
            for (String key : properties.keySet()) {
                config.setString(key, properties.get(key));
            }
            String lengthCheck = (String)config.get(CollectorOption.LENGTH_CHECK);
            switch (lengthCheck.toUpperCase()) {
                case "SKIP": {
                    this.setFormatErrorStrategy(DirtyDataStrategy.SKIP);
                    this.setFieldMissingStrategy(DirtyDataStrategy.SKIP);
                    this.setFieldIncrementStrategy(DirtyDataStrategy.SKIP);
                    this.setColLenStrategy(DirtyDataStrategy.SKIP);
                    break;
                }
                case "PAD": {
                    this.setFormatErrorStrategy(DirtyDataStrategy.SKIP);
                    this.setFieldMissingStrategy(DirtyDataStrategy.PAD);
                    this.setFieldIncrementStrategy(DirtyDataStrategy.CUT);
                    this.setColLenStrategy(DirtyDataStrategy.SKIP);
                    break;
                }
                case "EXCEPTION": {
                    this.setFormatErrorStrategy(DirtyDataStrategy.EXCEPTION);
                    this.setFieldMissingStrategy(DirtyDataStrategy.EXCEPTION);
                    this.setFieldIncrementStrategy(DirtyDataStrategy.EXCEPTION);
                    this.setColLenStrategy(DirtyDataStrategy.SKIP);
                    break;
                }
                case "SKIP_SILENT": {
                    this.setFormatErrorStrategy(DirtyDataStrategy.SKIP_SILENT);
                    this.setFieldMissingStrategy(DirtyDataStrategy.SKIP_SILENT);
                    this.setFieldIncrementStrategy(DirtyDataStrategy.SKIP_SILENT);
                    this.setColLenStrategy(DirtyDataStrategy.SKIP_SILENT);
                    break;
                }
            }
            this.setColumnErrorDebug(config.getBoolean(CollectorOption.COLUMN_ERROR_DEBUG));
            this.setFieldDelimiter(config.getString(CollectorOption.FIELD_DELIMITER));
            this.setLineDelimiter(config.getString(CollectorOption.LINE_DELIMITER));
            this.setEncoding(config.getString(CollectorOption.ENCODING));
            if (!com.alibaba.ververica.connectors.common.util.StringUtils.isBlank(config.getString(CollectorOption.PARSER_HEADER_FIELDS), ",")) {
                this.headerFields = Lists.newArrayList((Object[])config.getString(CollectorOption.PARSER_HEADER_FIELDS).split(","));
            }
            return this;
        }

        public Builder setEncoding(String encoding) {
            this.encoding = encoding;
            return this;
        }

        public Builder setLineDelimiter(String lineDelimiter) {
            this.lineDelimiter = lineDelimiter;
            return this;
        }

        public Builder setFieldDelimiter(String fieldDelimiter) {
            this.fieldDelimiter = fieldDelimiter;
            return this;
        }

        public Builder setMetadataConverters(MetadataConverter[] metadataConverters) {
            this.metadataConverters = metadataConverters;
            return this;
        }

        public Builder setHasMetadata(boolean hasMetadata) {
            this.hasMetadata = hasMetadata;
            return this;
        }

        public DefaultSourceCollector build() {
            DefaultSourceCollector defaultSourceCollector = new DefaultSourceCollector(this.schema, this.headerFields, this.properties, this.formatErrorStrategy, this.fieldMissingStrategy, this.fieldIncrementStrategy, this.colLenStrategy, this.fieldDelimiter, this.encoding, this.lineDelimiter, this.hasMetadata, this.metadataConverters);
            defaultSourceCollector.setColumnErrorDebug(this.columnErrorDebug);
            return defaultSourceCollector;
        }
    }
}

