/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.rest.handler.async;

import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import javax.annotation.concurrent.ThreadSafe;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.rest.NotFoundException;
import org.apache.flink.runtime.rest.handler.AbstractRestHandler;
import org.apache.flink.runtime.rest.handler.HandlerRequest;
import org.apache.flink.runtime.rest.handler.RestHandlerException;
import org.apache.flink.runtime.rest.handler.async.AsynchronousOperationResult;
import org.apache.flink.runtime.rest.handler.async.OperationKey;
import org.apache.flink.runtime.rest.handler.async.TriggerResponse;
import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
import org.apache.flink.runtime.rest.messages.MessageHeaders;
import org.apache.flink.runtime.rest.messages.MessageParameters;
import org.apache.flink.runtime.rest.messages.RequestBody;
import org.apache.flink.runtime.webmonitor.RestfulGateway;
import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
import org.apache.flink.shaded.guava18.com.google.common.cache.Cache;
import org.apache.flink.shaded.guava18.com.google.common.cache.CacheBuilder;
import org.apache.flink.types.Either;
import org.apache.flink.util.FlinkException;

public abstract class AbstractAsynchronousOperationHandlers<K extends OperationKey, R> {
    /**
     * Tracks operations that are still running and caches the outcome of finished ones.
     * Trigger handlers of this instance register operations here; status handlers query it.
     */
    private final CompletedOperationCache<K, R> completedOperationCache = new CompletedOperationCache<>();

    /**
     * Exception raised when an operation key is looked up that is neither registered as
     * ongoing nor present among the cached completed operations.
     */
    static class UnknownOperationKey extends FlinkException {
        private static final long serialVersionUID = 1L;

        /**
         * @param operationKey the key that could not be resolved; its string form is
         *                     embedded in the exception message
         */
        UnknownOperationKey(Object operationKey) {
            super(describe(operationKey));
        }

        /** Builds the exception message for the given key. */
        private static String describe(Object operationKey) {
            return "No ongoing operation for " + operationKey;
        }
    }

    @ThreadSafe
    protected static class CompletedOperationCache<K, R> {
        private static final long COMPLETED_OPERATION_RESULT_CACHE_DURATION_SECONDS = 300L;
        private final Set<K> registeredOperationTriggers = ConcurrentHashMap.newKeySet();
        private final Cache<K, Either<Throwable, R>> completedOperations = CacheBuilder.newBuilder().expireAfterWrite(300L, TimeUnit.SECONDS).build();

        protected CompletedOperationCache() {
        }

        public void registerOngoingOperation(K operationKey, CompletableFuture<R> operationResultFuture) {
            this.registeredOperationTriggers.add(operationKey);
            operationResultFuture.whenComplete((savepointLocation, error) -> {
                if (error == null) {
                    this.completedOperations.put(operationKey, (Object)Either.Right((Object)savepointLocation));
                } else {
                    this.completedOperations.put(operationKey, (Object)Either.Left((Object)error));
                }
                this.registeredOperationTriggers.remove(operationKey);
            });
        }

        @Nullable
        public Either<Throwable, R> get(K operationKey) throws UnknownOperationKey {
            Either operationResultOrError = null;
            if (!this.registeredOperationTriggers.contains(operationKey) && (operationResultOrError = (Either)this.completedOperations.getIfPresent(operationKey)) == null) {
                throw new UnknownOperationKey(operationKey);
            }
            return operationResultOrError;
        }
    }

    /**
     * REST handler that reports the status of a previously triggered asynchronous operation
     * by consulting the enclosing instance's completed-operation cache.
     *
     * @param <T> type of the gateway the handler is invoked with
     * @param <V> type of the value carried by a completed {@link AsynchronousOperationResult}
     * @param <M> type of the request's message parameters
     */
    protected abstract class StatusHandler<T extends RestfulGateway, V, M extends MessageParameters>
    extends AbstractRestHandler<T, EmptyRequestBody, AsynchronousOperationResult<V>, M> {
        protected StatusHandler(CompletableFuture<String> localRestAddress, GatewayRetriever<? extends T> leaderRetriever, Time timeout, Map<String, String> responseHeaders, MessageHeaders<EmptyRequestBody, AsynchronousOperationResult<V>, M> messageHeaders) {
            super(localRestAddress, leaderRetriever, timeout, responseHeaders, messageHeaders);
        }

        /**
         * Resolves the operation key from the request and answers with the operation's state.
         *
         * @param request incoming status request
         * @param gateway gateway supplied by the REST framework; not used by this method
         * @return a future completed with an in-progress marker while the operation is
         *         ongoing, or with a completed result built from the operation's result or
         *         failure; a future completed exceptionally with a {@link NotFoundException}
         *         if the key is unknown to the cache
         */
        @Override
        public CompletableFuture<AsynchronousOperationResult<V>> handleRequest(@Nonnull HandlerRequest<EmptyRequestBody, M> request, @Nonnull T gateway) throws RestHandlerException {
            final K key = this.getOperationKey(request);

            final Either<Throwable, R> operationResultOrError;
            try {
                operationResultOrError = AbstractAsynchronousOperationHandlers.this.completedOperationCache.get(key);
            } catch (UnknownOperationKey e) {
                return FutureUtils.completedExceptionally(new NotFoundException("Operation not found under key: " + key, e));
            }

            if (operationResultOrError == null) {
                // A null lookup result means the operation is registered but not finished.
                return CompletableFuture.completedFuture(AsynchronousOperationResult.inProgress());
            }
            if (operationResultOrError.isLeft()) {
                return CompletableFuture.completedFuture(AsynchronousOperationResult.completed(this.exceptionalOperationResultResponse(operationResultOrError.left())));
            }
            return CompletableFuture.completedFuture(AsynchronousOperationResult.completed(this.operationResultResponse(operationResultOrError.right())));
        }

        /**
         * Extracts the key of the operation whose status is being requested.
         *
         * @param request the status request
         * @return key used to look the operation up in the cache
         */
        protected abstract K getOperationKey(HandlerRequest<EmptyRequestBody, M> request);

        /**
         * Builds the response value for an operation that failed.
         *
         * @param throwable the failure the operation completed with
         * @return value to wrap in the completed operation result
         */
        protected abstract V exceptionalOperationResultResponse(Throwable throwable);

        /**
         * Builds the response value for an operation that succeeded.
         *
         * @param operationResult the result the operation completed with
         * @return value to wrap in the completed operation result
         */
        protected abstract V operationResultResponse(R operationResult);
    }

    /**
     * REST handler that starts an asynchronous operation, registers it with the enclosing
     * instance's completed-operation cache, and immediately answers with the trigger id.
     *
     * @param <T> type of the gateway the handler is invoked with
     * @param <B> type of the request body
     * @param <M> type of the request's message parameters
     */
    protected abstract class TriggerHandler<T extends RestfulGateway, B extends RequestBody, M extends MessageParameters>
    extends AbstractRestHandler<T, B, TriggerResponse, M> {
        protected TriggerHandler(CompletableFuture<String> localRestAddress, GatewayRetriever<? extends T> leaderRetriever, Time timeout, Map<String, String> responseHeaders, MessageHeaders<B, TriggerResponse, M> messageHeaders) {
            super(localRestAddress, leaderRetriever, timeout, responseHeaders, messageHeaders);
        }

        /**
         * Triggers the operation and registers its result future under a newly created key.
         *
         * @param request incoming trigger request
         * @param gateway gateway passed on to {@link #triggerOperation}
         * @return an already-completed future holding the trigger id of the operation key;
         *         it does not wait for the operation itself to finish
         * @throws RestHandlerException if triggering the operation fails
         */
        @Override
        public CompletableFuture<TriggerResponse> handleRequest(@Nonnull HandlerRequest<B, M> request, @Nonnull T gateway) throws RestHandlerException {
            final CompletableFuture<R> resultFuture = this.triggerOperation(request, gateway);
            final K operationKey = this.createOperationKey(request);
            AbstractAsynchronousOperationHandlers.this.completedOperationCache.registerOngoingOperation(operationKey, resultFuture);
            return CompletableFuture.completedFuture(new TriggerResponse(operationKey.getTriggerId()));
        }

        /**
         * Starts the asynchronous operation.
         *
         * @param request the trigger request
         * @param gateway gateway to run the operation against
         * @return future that completes with the operation's result
         * @throws RestHandlerException if the operation cannot be triggered
         */
        protected abstract CompletableFuture<R> triggerOperation(HandlerRequest<B, M> request, T gateway) throws RestHandlerException;

        /**
         * Creates the key under which the triggered operation is registered.
         *
         * @param request the trigger request
         * @return key identifying the operation
         */
        protected abstract K createOperationKey(HandlerRequest<B, M> request);
    }
}

