/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.query;

import akka.actor.ActorSystem;
import akka.dispatch.Futures;
import akka.dispatch.Mapper;
import akka.dispatch.Recover;
import java.net.ConnectException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import org.apache.flink.api.common.JobID;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.IllegalConfigurationException;
import org.apache.flink.configuration.QueryableStateOptions;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
import org.apache.flink.runtime.query.AkkaKvStateLocationLookupService;
import org.apache.flink.runtime.query.KvStateID;
import org.apache.flink.runtime.query.KvStateLocation;
import org.apache.flink.runtime.query.KvStateLocationLookupService;
import org.apache.flink.runtime.query.KvStateServerAddress;
import org.apache.flink.runtime.query.UnknownKvStateKeyGroupLocation;
import org.apache.flink.runtime.query.UnknownKvStateLocation;
import org.apache.flink.runtime.query.netty.DisabledKvStateRequestStats;
import org.apache.flink.runtime.query.netty.KvStateClient;
import org.apache.flink.runtime.query.netty.UnknownKvStateID;
import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Function1;
import scala.Option;
import scala.PartialFunction;
import scala.Some;
import scala.Tuple2;
import scala.concurrent.ExecutionContext;
import scala.concurrent.Future;
import scala.concurrent.duration.Duration;
import scala.concurrent.duration.FiniteDuration;
import scala.util.Try;

public class QueryableStateClient {
    private static final Logger LOG = LoggerFactory.getLogger(QueryableStateClient.class);
    private final KvStateLocationLookupService lookupService;
    private final KvStateClient kvStateClient;
    private final ExecutionContext executionContext;
    private final ConcurrentMap<Tuple2<JobID, String>, Future<KvStateLocation>> lookupCache = new ConcurrentHashMap<Tuple2<JobID, String>, Future<KvStateLocation>>();
    private final ActorSystem actorSystem;

    public QueryableStateClient(Configuration config, HighAvailabilityServices highAvailabilityServices) throws Exception {
        Preconditions.checkNotNull((Object)config, (String)"Configuration");
        LeaderRetrievalService leaderRetrievalService = highAvailabilityServices.getJobManagerLeaderRetriever(HighAvailabilityServices.DEFAULT_JOB_ID);
        String askTimeoutString = config.getString("akka.ask.timeout", ConfigConstants.DEFAULT_AKKA_ASK_TIMEOUT);
        Duration timeout = FiniteDuration.apply((String)askTimeoutString);
        if (!timeout.isFinite()) {
            throw new IllegalConfigurationException("akka.ask.timeout is not a finite timeout ('" + askTimeoutString + "')");
        }
        FiniteDuration askTimeout = (FiniteDuration)timeout;
        int lookupRetries = config.getInteger(QueryableStateOptions.CLIENT_LOOKUP_RETRIES);
        int lookupRetryDelayMillis = config.getInteger(QueryableStateOptions.CLIENT_LOOKUP_RETRY_DELAY);
        AkkaKvStateLocationLookupService.FixedDelayLookupRetryStrategyFactory retryStrategy = new AkkaKvStateLocationLookupService.FixedDelayLookupRetryStrategyFactory(lookupRetries, FiniteDuration.apply((long)lookupRetryDelayMillis, (String)"ms"));
        Some remoting = new Some((Object)new Tuple2((Object)"", (Object)0));
        this.actorSystem = AkkaUtils.createActorSystem(config, (Option<Tuple2<String, Object>>)remoting);
        AkkaKvStateLocationLookupService lookupService = new AkkaKvStateLocationLookupService(leaderRetrievalService, this.actorSystem, askTimeout, retryStrategy);
        int numEventLoopThreads = config.getInteger(QueryableStateOptions.CLIENT_NETWORK_THREADS);
        if (numEventLoopThreads == 0) {
            numEventLoopThreads = Runtime.getRuntime().availableProcessors();
        }
        KvStateClient networkClient = new KvStateClient(numEventLoopThreads, new DisabledKvStateRequestStats());
        this.lookupService = lookupService;
        this.kvStateClient = networkClient;
        this.executionContext = this.actorSystem.dispatcher();
        this.lookupService.start();
    }

    public QueryableStateClient(KvStateLocationLookupService lookupService, KvStateClient kvStateClient, ExecutionContext executionContext) {
        this.lookupService = (KvStateLocationLookupService)Preconditions.checkNotNull((Object)lookupService, (String)"KvStateLocationLookupService");
        this.kvStateClient = (KvStateClient)Preconditions.checkNotNull((Object)kvStateClient, (String)"KvStateClient");
        this.executionContext = (ExecutionContext)Preconditions.checkNotNull((Object)executionContext, (String)"ExecutionContext");
        this.actorSystem = null;
        this.lookupService.start();
    }

    public ExecutionContext getExecutionContext() {
        return this.executionContext;
    }

    public void shutDown() {
        try {
            this.lookupService.shutDown();
        }
        catch (Throwable t) {
            LOG.error("Failed to shut down KvStateLookupService", t);
        }
        try {
            this.kvStateClient.shutDown();
        }
        catch (Throwable t) {
            LOG.error("Failed to shut down KvStateClient", t);
        }
        if (this.actorSystem != null) {
            try {
                this.actorSystem.shutdown();
            }
            catch (Throwable t) {
                LOG.error("Failed to shut down ActorSystem", t);
            }
        }
    }

    public Future<byte[]> getKvState(final JobID jobId, final String queryableStateName, final int keyHashCode, final byte[] serializedKeyAndNamespace) {
        return this.getKvState(jobId, queryableStateName, keyHashCode, serializedKeyAndNamespace, false).recoverWith((PartialFunction)new Recover<Future<byte[]>>(){

            public Future<byte[]> recover(Throwable failure) throws Throwable {
                if (failure instanceof UnknownKvStateID || failure instanceof UnknownKvStateKeyGroupLocation || failure instanceof UnknownKvStateLocation || failure instanceof ConnectException) {
                    return QueryableStateClient.this.getKvState(jobId, queryableStateName, keyHashCode, serializedKeyAndNamespace, true);
                }
                return Futures.failed((Throwable)failure);
            }
        }, this.executionContext);
    }

    private Future<byte[]> getKvState(JobID jobId, String queryableStateName, final int keyHashCode, final byte[] serializedKeyAndNamespace, boolean forceLookup) {
        return this.getKvStateLookupInfo(jobId, queryableStateName, forceLookup).flatMap((Function1)new Mapper<KvStateLocation, Future<byte[]>>(){

            public Future<byte[]> apply(KvStateLocation lookup) {
                int keyGroupIndex = KeyGroupRangeAssignment.computeKeyGroupForKeyHash(keyHashCode, lookup.getNumKeyGroups());
                KvStateServerAddress serverAddress = lookup.getKvStateServerAddress(keyGroupIndex);
                if (serverAddress == null) {
                    return Futures.failed((Throwable)new UnknownKvStateKeyGroupLocation());
                }
                KvStateID kvStateId = lookup.getKvStateID(keyGroupIndex);
                return QueryableStateClient.this.kvStateClient.getKvState(serverAddress, kvStateId, serializedKeyAndNamespace);
            }
        }, this.executionContext);
    }

    private Future<KvStateLocation> getKvStateLookupInfo(JobID jobId, String queryableStateName, boolean forceUpdate) {
        if (forceUpdate) {
            Future<KvStateLocation> lookupFuture = this.lookupService.getKvStateLookupInfo(jobId, queryableStateName);
            this.lookupCache.put((Tuple2<JobID, String>)new Tuple2((Object)jobId, (Object)queryableStateName), lookupFuture);
            return lookupFuture;
        }
        Tuple2 cacheKey = new Tuple2((Object)jobId, (Object)queryableStateName);
        Future cachedFuture = (Future)this.lookupCache.get(cacheKey);
        if (cachedFuture == null) {
            Future<KvStateLocation> lookupFuture = this.lookupService.getKvStateLookupInfo(jobId, queryableStateName);
            Future<KvStateLocation> previous = this.lookupCache.putIfAbsent((Tuple2<JobID, String>)cacheKey, lookupFuture);
            if (previous == null) {
                return lookupFuture;
            }
            return previous;
        }
        if (cachedFuture.isCompleted() && ((Try)cachedFuture.value().get()).isFailure()) {
            Future<KvStateLocation> lookupFuture = this.lookupService.getKvStateLookupInfo(jobId, queryableStateName);
            if (this.lookupCache.replace((Tuple2<JobID, String>)cacheKey, (Future<KvStateLocation>)cachedFuture, lookupFuture)) {
                return lookupFuture;
            }
            return (Future)this.lookupCache.get(cacheKey);
        }
        return cachedFuture;
    }
}

