/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.entrypoint;

import akka.actor.ActorSystem;
import java.io.File;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Properties;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.annotation.Nullable;
import javax.annotation.concurrent.GuardedBy;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ConfigurationUtils;
import org.apache.flink.configuration.GlobalConfiguration;
import org.apache.flink.configuration.HighAvailabilityOptions;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.configuration.WebOptions;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.blob.BlobServer;
import org.apache.flink.runtime.blob.TransientBlobCache;
import org.apache.flink.runtime.blob.TransientBlobService;
import org.apache.flink.runtime.clusterframework.ApplicationStatus;
import org.apache.flink.runtime.clusterframework.BootstrapTools;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.concurrent.ScheduledExecutor;
import org.apache.flink.runtime.dispatcher.ArchivedExecutionGraphStore;
import org.apache.flink.runtime.dispatcher.Dispatcher;
import org.apache.flink.runtime.dispatcher.DispatcherGateway;
import org.apache.flink.runtime.dispatcher.DispatcherId;
import org.apache.flink.runtime.dispatcher.HistoryServerArchivist;
import org.apache.flink.runtime.entrypoint.ClusterInformation;
import org.apache.flink.runtime.entrypoint.EntrypointClusterConfiguration;
import org.apache.flink.runtime.entrypoint.EntrypointClusterConfigurationParserFactory;
import org.apache.flink.runtime.entrypoint.FlinkParseException;
import org.apache.flink.runtime.entrypoint.parser.CommandLineParser;
import org.apache.flink.runtime.heartbeat.HeartbeatServices;
import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils;
import org.apache.flink.runtime.leaderelection.LeaderElectionService;
import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
import org.apache.flink.runtime.metrics.MetricRegistry;
import org.apache.flink.runtime.metrics.MetricRegistryConfiguration;
import org.apache.flink.runtime.metrics.MetricRegistryImpl;
import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup;
import org.apache.flink.runtime.metrics.util.MetricUtils;
import org.apache.flink.runtime.resourcemanager.ResourceManager;
import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
import org.apache.flink.runtime.resourcemanager.ResourceManagerId;
import org.apache.flink.runtime.rpc.FatalErrorHandler;
import org.apache.flink.runtime.rpc.RpcService;
import org.apache.flink.runtime.rpc.akka.AkkaRpcService;
import org.apache.flink.runtime.security.SecurityConfiguration;
import org.apache.flink.runtime.security.SecurityContext;
import org.apache.flink.runtime.security.SecurityUtils;
import org.apache.flink.runtime.util.ZooKeeperUtils;
import org.apache.flink.runtime.webmonitor.WebMonitorEndpoint;
import org.apache.flink.runtime.webmonitor.retriever.LeaderGatewayRetriever;
import org.apache.flink.runtime.webmonitor.retriever.MetricQueryServiceRetriever;
import org.apache.flink.runtime.webmonitor.retriever.impl.AkkaQueryServiceRetriever;
import org.apache.flink.runtime.webmonitor.retriever.impl.RpcGatewayRetriever;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FileUtils;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.ShutdownHookUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.concurrent.duration.FiniteDuration;

public abstract class ClusterEntrypoint
implements FatalErrorHandler {
    public static final ConfigOption<String> EXECUTION_MODE = ConfigOptions.key((String)"internal.cluster.execution-mode").defaultValue((Object)ExecutionMode.NORMAL.toString());
    protected static final Logger LOG = LoggerFactory.getLogger(ClusterEntrypoint.class);
    protected static final int SUCCESS_RETURN_CODE = 0;
    protected static final int STARTUP_FAILURE_RETURN_CODE = 1;
    protected static final int RUNTIME_FAILURE_RETURN_CODE = 2;
    private final Object lock = new Object();
    private final Configuration configuration;
    private final CompletableFuture<Void> terminationFuture;
    private final AtomicBoolean isTerminating = new AtomicBoolean(false);
    private final AtomicBoolean isShutDown = new AtomicBoolean(false);
    @GuardedBy(value="lock")
    private MetricRegistryImpl metricRegistry;
    @GuardedBy(value="lock")
    private HighAvailabilityServices haServices;
    @GuardedBy(value="lock")
    private BlobServer blobServer;
    @GuardedBy(value="lock")
    private HeartbeatServices heartbeatServices;
    @GuardedBy(value="lock")
    private RpcService commonRpcService;
    @GuardedBy(value="lock")
    private ResourceManager<?> resourceManager;
    @GuardedBy(value="lock")
    private Dispatcher dispatcher;
    @GuardedBy(value="lock")
    private LeaderRetrievalService dispatcherLeaderRetrievalService;
    @GuardedBy(value="lock")
    private LeaderRetrievalService resourceManagerRetrievalService;
    @GuardedBy(value="lock")
    private WebMonitorEndpoint<?> webMonitorEndpoint;
    @GuardedBy(value="lock")
    private ArchivedExecutionGraphStore archivedExecutionGraphStore;
    @GuardedBy(value="lock")
    private TransientBlobCache transientBlobCache;
    @GuardedBy(value="lock")
    private ClusterInformation clusterInformation;
    @GuardedBy(value="lock")
    private JobManagerMetricGroup jobManagerMetricGroup;
    private final Thread shutDownHook;

    protected ClusterEntrypoint(Configuration configuration) {
        this.configuration = this.generateClusterConfiguration(configuration);
        this.terminationFuture = new CompletableFuture();
        this.shutDownHook = ShutdownHookUtil.addShutdownHook(this::cleanupDirectories, (String)this.getClass().getSimpleName(), (Logger)LOG);
    }

    public CompletableFuture<Void> getTerminationFuture() {
        return this.terminationFuture;
    }

    protected void startCluster() {
        LOG.info("Starting {}.", (Object)this.getClass().getSimpleName());
        try {
            this.configureFileSystems(this.configuration);
            SecurityContext securityContext = this.installSecurityContext(this.configuration);
            securityContext.runSecured(() -> {
                this.runCluster(this.configuration);
                return null;
            });
        }
        catch (Throwable t) {
            LOG.error("Cluster initialization failed.", t);
            this.shutDownAndTerminate(1, ApplicationStatus.FAILED, t.getMessage(), false);
        }
    }

    protected void configureFileSystems(Configuration configuration) throws Exception {
        LOG.info("Install default filesystem.");
        try {
            FileSystem.initialize((Configuration)configuration);
        }
        catch (IOException e) {
            throw new IOException("Error while setting the default filesystem scheme from configuration.", e);
        }
    }

    protected SecurityContext installSecurityContext(Configuration configuration) throws Exception {
        LOG.info("Install security context.");
        SecurityUtils.install(new SecurityConfiguration(configuration));
        return SecurityUtils.getInstalledContext();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    protected void runCluster(Configuration configuration) throws Exception {
        Object object = this.lock;
        synchronized (object) {
            this.initializeServices(configuration);
            configuration.setString(JobManagerOptions.ADDRESS, this.commonRpcService.getAddress());
            configuration.setInteger(JobManagerOptions.PORT, this.commonRpcService.getPort());
            this.startClusterComponents(configuration, this.commonRpcService, this.haServices, this.blobServer, this.heartbeatServices, this.metricRegistry);
            this.dispatcher.getTerminationFuture().whenComplete((value, throwable) -> {
                if (throwable != null) {
                    LOG.info("Could not properly terminate the Dispatcher.", throwable);
                }
                this.shutDownAndTerminate(0, ApplicationStatus.SUCCEEDED, throwable != null ? throwable.getMessage() : null, true);
            });
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    protected void initializeServices(Configuration configuration) throws Exception {
        LOG.info("Initializing cluster services.");
        Object object = this.lock;
        synchronized (object) {
            String bindAddress = configuration.getString(JobManagerOptions.ADDRESS);
            String portRange = this.getRPCPortRange(configuration);
            this.commonRpcService = this.createRpcService(configuration, bindAddress, portRange);
            configuration.setString(JobManagerOptions.ADDRESS, this.commonRpcService.getAddress());
            configuration.setInteger(JobManagerOptions.PORT, this.commonRpcService.getPort());
            this.haServices = this.createHaServices(configuration, this.commonRpcService.getExecutor());
            this.blobServer = new BlobServer(configuration, this.haServices.createBlobStore());
            this.blobServer.start();
            this.heartbeatServices = this.createHeartbeatServices(configuration);
            this.metricRegistry = this.createMetricRegistry(configuration);
            ActorSystem actorSystem = ((AkkaRpcService)this.commonRpcService).getActorSystem();
            this.metricRegistry.startQueryService(actorSystem, null);
            this.archivedExecutionGraphStore = this.createSerializableExecutionGraphStore(configuration, this.commonRpcService.getScheduledExecutor());
            this.clusterInformation = new ClusterInformation(this.commonRpcService.getAddress(), this.blobServer.getPort());
            this.transientBlobCache = new TransientBlobCache(configuration, new InetSocketAddress(this.clusterInformation.getBlobServerHostname(), this.clusterInformation.getBlobServerPort()));
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    protected void startClusterComponents(Configuration configuration, RpcService rpcService, HighAvailabilityServices highAvailabilityServices, BlobServer blobServer, HeartbeatServices heartbeatServices, MetricRegistry metricRegistry) throws Exception {
        Object object = this.lock;
        synchronized (object) {
            this.dispatcherLeaderRetrievalService = highAvailabilityServices.getDispatcherLeaderRetriever();
            this.resourceManagerRetrievalService = highAvailabilityServices.getResourceManagerLeaderRetriever();
            RpcGatewayRetriever<DispatcherId, DispatcherGateway> dispatcherGatewayRetriever = new RpcGatewayRetriever<DispatcherId, DispatcherGateway>(rpcService, DispatcherGateway.class, DispatcherId::fromUuid, 10, Time.milliseconds((long)50L));
            RpcGatewayRetriever<ResourceManagerId, ResourceManagerGateway> resourceManagerGatewayRetriever = new RpcGatewayRetriever<ResourceManagerId, ResourceManagerGateway>(rpcService, ResourceManagerGateway.class, ResourceManagerId::fromUuid, 10, Time.milliseconds((long)50L));
            ActorSystem actorSystem = ((AkkaRpcService)rpcService).getActorSystem();
            Time timeout = Time.milliseconds((long)configuration.getLong(WebOptions.TIMEOUT));
            this.webMonitorEndpoint = this.createRestEndpoint(configuration, dispatcherGatewayRetriever, resourceManagerGatewayRetriever, this.transientBlobCache, rpcService.getExecutor(), new AkkaQueryServiceRetriever(actorSystem, timeout), highAvailabilityServices.getWebMonitorLeaderElectionService());
            LOG.debug("Starting Dispatcher REST endpoint.");
            this.webMonitorEndpoint.start();
            this.resourceManager = this.createResourceManager(configuration, ResourceID.generate(), rpcService, highAvailabilityServices, heartbeatServices, metricRegistry, this, this.clusterInformation, this.webMonitorEndpoint.getRestBaseUrl());
            this.jobManagerMetricGroup = MetricUtils.instantiateJobManagerMetricGroup(metricRegistry, rpcService.getAddress());
            HistoryServerArchivist historyServerArchivist = HistoryServerArchivist.createHistoryServerArchivist(configuration, this.webMonitorEndpoint);
            this.dispatcher = this.createDispatcher(configuration, rpcService, highAvailabilityServices, this.resourceManager.getSelfGateway(ResourceManagerGateway.class), blobServer, heartbeatServices, this.jobManagerMetricGroup, metricRegistry.getMetricQueryServicePath(), this.archivedExecutionGraphStore, this, this.webMonitorEndpoint.getRestBaseUrl(), historyServerArchivist);
            LOG.debug("Starting ResourceManager.");
            this.resourceManager.start();
            this.resourceManagerRetrievalService.start(resourceManagerGatewayRetriever);
            LOG.debug("Starting Dispatcher.");
            this.dispatcher.start();
            this.dispatcherLeaderRetrievalService.start(dispatcherGatewayRetriever);
        }
    }

    protected String getRPCPortRange(Configuration configuration) {
        if (ZooKeeperUtils.isZooKeeperRecoveryMode(configuration)) {
            return configuration.getString(HighAvailabilityOptions.HA_JOB_MANAGER_PORT_RANGE);
        }
        return String.valueOf(configuration.getInteger(JobManagerOptions.PORT));
    }

    protected RpcService createRpcService(Configuration configuration, String bindAddress, String portRange) throws Exception {
        ActorSystem actorSystem = BootstrapTools.startActorSystem(configuration, bindAddress, portRange, LOG);
        FiniteDuration duration = AkkaUtils.getTimeout(configuration);
        return new AkkaRpcService(actorSystem, Time.of((long)duration.length(), (TimeUnit)duration.unit()));
    }

    protected HighAvailabilityServices createHaServices(Configuration configuration, Executor executor) throws Exception {
        return HighAvailabilityServicesUtils.createHighAvailabilityServices(configuration, executor, HighAvailabilityServicesUtils.AddressResolution.NO_ADDRESS_RESOLUTION);
    }

    protected HeartbeatServices createHeartbeatServices(Configuration configuration) {
        return HeartbeatServices.fromConfiguration(configuration);
    }

    protected MetricRegistryImpl createMetricRegistry(Configuration configuration) {
        return new MetricRegistryImpl(MetricRegistryConfiguration.fromConfiguration(configuration));
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    protected CompletableFuture<Void> stopClusterServices(boolean cleanupHaData) {
        Object object = this.lock;
        synchronized (object) {
            Throwable exception = null;
            ArrayList terminationFutures = new ArrayList(3);
            if (this.blobServer != null) {
                try {
                    this.blobServer.close();
                }
                catch (Throwable t) {
                    exception = ExceptionUtils.firstOrSuppressed((Throwable)t, exception);
                }
            }
            if (this.haServices != null) {
                try {
                    if (cleanupHaData) {
                        this.haServices.closeAndCleanupAllData();
                    } else {
                        this.haServices.close();
                    }
                }
                catch (Throwable t) {
                    exception = ExceptionUtils.firstOrSuppressed((Throwable)t, (Throwable)exception);
                }
            }
            if (this.archivedExecutionGraphStore != null) {
                try {
                    this.archivedExecutionGraphStore.close();
                }
                catch (Throwable t) {
                    exception = ExceptionUtils.firstOrSuppressed((Throwable)t, (Throwable)exception);
                }
            }
            if (this.transientBlobCache != null) {
                try {
                    this.transientBlobCache.close();
                }
                catch (Throwable t) {
                    exception = ExceptionUtils.firstOrSuppressed((Throwable)t, (Throwable)exception);
                }
            }
            if (this.metricRegistry != null) {
                terminationFutures.add(this.metricRegistry.shutdown());
            }
            if (this.commonRpcService != null) {
                terminationFutures.add(this.commonRpcService.stopService());
            }
            if (exception != null) {
                terminationFutures.add(FutureUtils.completedExceptionally(exception));
            }
            return FutureUtils.completeAll(terminationFutures);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    protected CompletableFuture<Void> stopClusterComponents() {
        Object object = this.lock;
        synchronized (object) {
            Exception exception = null;
            ArrayList terminationFutures = new ArrayList(4);
            if (this.dispatcherLeaderRetrievalService != null) {
                try {
                    this.dispatcherLeaderRetrievalService.stop();
                }
                catch (Exception e) {
                    exception = (Exception)ExceptionUtils.firstOrSuppressed((Throwable)e, exception);
                }
            }
            if (this.resourceManagerRetrievalService != null) {
                try {
                    this.resourceManagerRetrievalService.stop();
                }
                catch (Exception e) {
                    exception = (Exception)ExceptionUtils.firstOrSuppressed((Throwable)e, (Throwable)exception);
                }
            }
            if (this.webMonitorEndpoint != null) {
                terminationFutures.add(this.webMonitorEndpoint.closeAsync());
            }
            if (this.dispatcher != null) {
                this.dispatcher.shutDown();
                terminationFutures.add(this.dispatcher.getTerminationFuture());
            }
            if (this.resourceManager != null) {
                this.resourceManager.shutDown();
                terminationFutures.add(this.resourceManager.getTerminationFuture());
            }
            if (exception != null) {
                terminationFutures.add(FutureUtils.completedExceptionally(exception));
            }
            FutureUtils.ConjunctFuture<Void> componentTerminationFuture = FutureUtils.completeAll(terminationFutures);
            if (this.jobManagerMetricGroup != null) {
                return FutureUtils.runAfterwards(componentTerminationFuture, () -> {
                    Object object = this.lock;
                    synchronized (object) {
                        this.jobManagerMetricGroup.close();
                    }
                });
            }
            return componentTerminationFuture;
        }
    }

    @Override
    public void onFatalError(Throwable exception) {
        LOG.error("Fatal error occurred in the cluster entrypoint.", exception);
        System.exit(2);
    }

    private Configuration generateClusterConfiguration(Configuration configuration) {
        Configuration resultConfiguration = new Configuration((Configuration)Preconditions.checkNotNull((Object)configuration));
        String webTmpDir = configuration.getString(WebOptions.TMP_DIR);
        File uniqueWebTmpDir = new File(webTmpDir, "flink-web-" + UUID.randomUUID());
        resultConfiguration.setString(WebOptions.TMP_DIR, uniqueWebTmpDir.getAbsolutePath());
        return resultConfiguration;
    }

    private CompletableFuture<Void> shutDownAsync(boolean cleanupHaData, ApplicationStatus applicationStatus, @Nullable String diagnostics) {
        if (this.isShutDown.compareAndSet(false, true)) {
            LOG.info("Stopping {}.", (Object)this.getClass().getSimpleName());
            CompletableFuture<Void> shutDownApplicationFuture = this.deregisterApplication(applicationStatus, diagnostics);
            CompletableFuture<Void> componentShutdownFuture = FutureUtils.composeAfterwards(shutDownApplicationFuture, this::stopClusterComponents);
            CompletableFuture<Void> serviceShutdownFuture = FutureUtils.composeAfterwards(componentShutdownFuture, () -> this.stopClusterServices(cleanupHaData));
            CompletableFuture<Void> cleanupDirectoriesFuture = FutureUtils.runAfterwards(serviceShutdownFuture, this::cleanupDirectories);
            cleanupDirectoriesFuture.whenComplete((ignored2, serviceThrowable) -> {
                if (serviceThrowable != null) {
                    this.terminationFuture.completeExceptionally((Throwable)serviceThrowable);
                } else {
                    this.terminationFuture.complete(null);
                }
            });
        }
        return this.terminationFuture;
    }

    protected void shutDownAndTerminate(int returnCode, ApplicationStatus applicationStatus, @Nullable String diagnostics, boolean cleanupHaData) {
        if (this.isTerminating.compareAndSet(false, true)) {
            LOG.info("Shut down and terminate {} with return code {} and application status {}.", new Object[]{this.getClass().getSimpleName(), returnCode, applicationStatus});
            this.shutDownAsync(cleanupHaData, applicationStatus, diagnostics).whenComplete((ignored, t) -> {
                if (t != null) {
                    LOG.info("Could not properly shut down cluster entrypoint.", t);
                }
                System.exit(returnCode);
            });
        } else {
            LOG.debug("Concurrent termination call detected. Ignoring termination call with return code {} and application status {}.", (Object)returnCode, (Object)applicationStatus);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private CompletableFuture<Void> deregisterApplication(ApplicationStatus applicationStatus, @Nullable String diagnostics) {
        Object object = this.lock;
        synchronized (object) {
            if (this.resourceManager != null) {
                ResourceManagerGateway selfGateway = this.resourceManager.getSelfGateway(ResourceManagerGateway.class);
                return selfGateway.deregisterApplication(applicationStatus, diagnostics).thenApply(ack -> null);
            }
            return CompletableFuture.completedFuture(null);
        }
    }

    private void cleanupDirectories() throws IOException {
        ShutdownHookUtil.removeShutdownHook((Thread)this.shutDownHook, (String)this.getClass().getSimpleName(), (Logger)LOG);
        String webTmpDir = this.configuration.getString(WebOptions.TMP_DIR);
        FileUtils.deleteDirectory((File)new File(webTmpDir));
    }

    protected abstract Dispatcher createDispatcher(Configuration var1, RpcService var2, HighAvailabilityServices var3, ResourceManagerGateway var4, BlobServer var5, HeartbeatServices var6, JobManagerMetricGroup var7, @Nullable String var8, ArchivedExecutionGraphStore var9, FatalErrorHandler var10, @Nullable String var11, HistoryServerArchivist var12) throws Exception;

    protected abstract ResourceManager<?> createResourceManager(Configuration var1, ResourceID var2, RpcService var3, HighAvailabilityServices var4, HeartbeatServices var5, MetricRegistry var6, FatalErrorHandler var7, ClusterInformation var8, @Nullable String var9) throws Exception;

    protected abstract WebMonitorEndpoint<?> createRestEndpoint(Configuration var1, LeaderGatewayRetriever<DispatcherGateway> var2, LeaderGatewayRetriever<ResourceManagerGateway> var3, TransientBlobService var4, Executor var5, MetricQueryServiceRetriever var6, LeaderElectionService var7) throws Exception;

    protected abstract ArchivedExecutionGraphStore createSerializableExecutionGraphStore(Configuration var1, ScheduledExecutor var2) throws IOException;

    protected static EntrypointClusterConfiguration parseArguments(String[] args) throws FlinkParseException {
        CommandLineParser<EntrypointClusterConfiguration> clusterConfigurationParser = new CommandLineParser<EntrypointClusterConfiguration>(new EntrypointClusterConfigurationParserFactory());
        return clusterConfigurationParser.parse(args);
    }

    protected static Configuration loadConfiguration(EntrypointClusterConfiguration entrypointClusterConfiguration) {
        Configuration dynamicProperties = ConfigurationUtils.createConfiguration((Properties)entrypointClusterConfiguration.getDynamicProperties());
        Configuration configuration = GlobalConfiguration.loadConfiguration((String)entrypointClusterConfiguration.getConfigDir(), (Configuration)dynamicProperties);
        int restPort = entrypointClusterConfiguration.getRestPort();
        if (restPort >= 0) {
            configuration.setInteger(RestOptions.PORT, restPort);
        }
        return configuration;
    }

    public static enum ExecutionMode {
        NORMAL,
        DETACHED;

    }
}

