/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.minicluster;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.common.JobID;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.client.JobExecutionException;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.heartbeat.HeartbeatServices;
import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobmanager.OnCompletionActions;
import org.apache.flink.runtime.jobmaster.JobManagerRunner;
import org.apache.flink.runtime.jobmaster.JobManagerServices;
import org.apache.flink.runtime.metrics.MetricRegistry;
import org.apache.flink.runtime.rpc.FatalErrorHandler;
import org.apache.flink.runtime.rpc.RpcService;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class MiniClusterJobDispatcher {
    private static final Logger LOG = LoggerFactory.getLogger(MiniClusterJobDispatcher.class);
    private final Object lock = new Object();
    private final Configuration configuration;
    private final RpcService[] rpcServices;
    private final HighAvailabilityServices haServices;
    private final HeartbeatServices heartbeatServices;
    private final JobManagerServices jobManagerServices;
    private final MetricRegistry metricRegistry;
    private final int numJobManagers;
    private volatile JobManagerRunner[] runners;
    private volatile boolean shutdown;

    public MiniClusterJobDispatcher(Configuration config, RpcService rpcService, HighAvailabilityServices haServices, HeartbeatServices heartbeatServices, MetricRegistry metricRegistry) throws Exception {
        this(config, haServices, heartbeatServices, metricRegistry, 1, new RpcService[]{rpcService});
    }

    public MiniClusterJobDispatcher(Configuration config, HighAvailabilityServices haServices, HeartbeatServices heartbeatServices, MetricRegistry metricRegistry, int numJobManagers, RpcService[] rpcServices) throws Exception {
        Preconditions.checkArgument((numJobManagers >= 1 ? 1 : 0) != 0);
        Preconditions.checkArgument((rpcServices.length == numJobManagers ? 1 : 0) != 0);
        this.configuration = (Configuration)Preconditions.checkNotNull((Object)config);
        this.rpcServices = rpcServices;
        this.haServices = (HighAvailabilityServices)Preconditions.checkNotNull((Object)haServices);
        this.heartbeatServices = (HeartbeatServices)Preconditions.checkNotNull((Object)heartbeatServices);
        this.metricRegistry = (MetricRegistry)Preconditions.checkNotNull((Object)metricRegistry);
        this.numJobManagers = numJobManagers;
        LOG.info("Creating JobMaster services");
        this.jobManagerServices = JobManagerServices.fromConfiguration(config, haServices);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void shutdown() {
        Object object = this.lock;
        synchronized (object) {
            if (!this.shutdown) {
                this.shutdown = true;
                LOG.info("Shutting down the job dispatcher");
                JobManagerRunner[] runners = this.runners;
                if (runners != null) {
                    this.runners = null;
                    for (JobManagerRunner runner : runners) {
                        runner.shutdown();
                    }
                }
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void runDetached(JobGraph job) throws JobExecutionException {
        Preconditions.checkNotNull((Object)job);
        LOG.info("Received job for detached execution: {} ({})", (Object)job.getName(), (Object)job.getJobID());
        Object object = this.lock;
        synchronized (object) {
            Preconditions.checkState((!this.shutdown ? 1 : 0) != 0, (Object)"mini cluster is shut down");
            Preconditions.checkState((this.runners == null ? 1 : 0) != 0, (Object)"mini cluster can only execute one job at a time");
            DetachedFinalizer finalizer = new DetachedFinalizer(job.getJobID(), this.numJobManagers);
            this.runners = this.startJobRunners(job, finalizer, finalizer);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public JobExecutionResult runJobBlocking(JobGraph job) throws JobExecutionException, InterruptedException {
        Preconditions.checkNotNull((Object)job);
        LOG.info("Received job for blocking execution: {} ({})", (Object)job.getName(), (Object)job.getJobID());
        BlockingJobSync sync = new BlockingJobSync(job.getJobID(), this.numJobManagers);
        Object object = this.lock;
        synchronized (object) {
            Preconditions.checkState((!this.shutdown ? 1 : 0) != 0, (Object)"mini cluster is shut down");
            Preconditions.checkState((this.runners == null ? 1 : 0) != 0, (Object)"mini cluster can only execute one job at a time");
            this.runners = this.startJobRunners(job, sync, sync);
        }
        try {
            object = sync.getResult();
            return object;
        }
        finally {
            this.runners = null;
            this.clearJobRunningState(job.getJobID());
        }
    }

    private JobManagerRunner[] startJobRunners(JobGraph job, OnCompletionActions onCompletion, FatalErrorHandler errorHandler) throws JobExecutionException {
        LOG.info("Starting {} JobMaster(s) for job {} ({})", new Object[]{this.numJobManagers, job.getName(), job.getJobID()});
        JobManagerRunner[] runners = new JobManagerRunner[this.numJobManagers];
        for (int i = 0; i < this.numJobManagers; ++i) {
            try {
                runners[i] = new JobManagerRunner(ResourceID.generate(), job, this.configuration, this.rpcServices[i], this.haServices, this.heartbeatServices, this.jobManagerServices, this.metricRegistry, onCompletion, errorHandler);
                runners[i].start();
                continue;
            }
            catch (Throwable t) {
                for (int k = 0; k <= i; ++k) {
                    try {
                        if (runners[i] == null) continue;
                        runners[i].shutdown();
                        continue;
                    }
                    catch (Throwable throwable) {
                        // empty catch block
                    }
                }
                try {
                    this.haServices.getRunningJobsRegistry().setJobFinished(job.getJobID());
                }
                catch (Throwable tt) {
                    LOG.warn("Could not properly unregister job from high-availability services", tt);
                }
                throw new JobExecutionException(job.getJobID(), "Could not start the JobManager(s) for the job", t);
            }
        }
        return runners;
    }

    private void clearJobRunningState(JobID jobID) {
        try {
            this.haServices.getRunningJobsRegistry().clearJob(jobID);
        }
        catch (Throwable t) {
            LOG.warn("Could not clear job {} at the status registry of the high-availability services", (Object)jobID, (Object)t);
        }
    }

    static /* synthetic */ JobManagerRunner[] access$102(MiniClusterJobDispatcher x0, JobManagerRunner[] x1) {
        x0.runners = x1;
        return x1;
    }

    private static class BlockingJobSync
    implements OnCompletionActions,
    FatalErrorHandler {
        private final JobID jobId;
        private final CountDownLatch jobMastersToWaitFor;
        private volatile Throwable jobException;
        private volatile Throwable runnerException;
        private volatile JobExecutionResult result;

        BlockingJobSync(JobID jobId, int numJobMastersToWaitFor) {
            this.jobId = jobId;
            this.jobMastersToWaitFor = new CountDownLatch(numJobMastersToWaitFor);
        }

        @Override
        public void jobFinished(JobExecutionResult jobResult) {
            this.result = jobResult;
            this.jobMastersToWaitFor.countDown();
        }

        @Override
        public void jobFailed(Throwable cause) {
            this.jobException = cause;
            this.jobMastersToWaitFor.countDown();
        }

        @Override
        public void jobFinishedByOther() {
            this.jobMastersToWaitFor.countDown();
        }

        @Override
        public void onFatalError(Throwable exception) {
            if (this.runnerException == null) {
                this.runnerException = exception;
            }
        }

        public JobExecutionResult getResult() throws JobExecutionException, InterruptedException {
            this.jobMastersToWaitFor.await();
            Throwable jobFailureCause = this.jobException;
            Throwable runnerException = this.runnerException;
            JobExecutionResult result = this.result;
            if (jobFailureCause != null) {
                if (jobFailureCause instanceof JobExecutionException) {
                    throw (JobExecutionException)jobFailureCause;
                }
                throw new JobExecutionException(this.jobId, "The job execution failed", jobFailureCause);
            }
            if (result != null) {
                return result;
            }
            if (runnerException != null) {
                throw new JobExecutionException(this.jobId, "The job execution failed because all JobManagers encountered fatal errors", runnerException);
            }
            throw new IllegalStateException("Bug: Job finished with neither error nor result.");
        }
    }

    private class DetachedFinalizer
    implements OnCompletionActions,
    FatalErrorHandler {
        private final JobID jobID;
        private final AtomicInteger numJobManagersToWaitFor;

        private DetachedFinalizer(JobID jobID, int numJobManagersToWaitFor) {
            this.jobID = jobID;
            this.numJobManagersToWaitFor = new AtomicInteger(numJobManagersToWaitFor);
        }

        @Override
        public void jobFinished(JobExecutionResult result) {
            this.decrementCheckAndCleanup();
        }

        @Override
        public void jobFailed(Throwable cause) {
            this.decrementCheckAndCleanup();
        }

        @Override
        public void jobFinishedByOther() {
            this.decrementCheckAndCleanup();
        }

        @Override
        public void onFatalError(Throwable exception) {
            this.decrementCheckAndCleanup();
        }

        private void decrementCheckAndCleanup() {
            if (this.numJobManagersToWaitFor.decrementAndGet() == 0) {
                MiniClusterJobDispatcher.access$102(MiniClusterJobDispatcher.this, null);
                MiniClusterJobDispatcher.this.clearJobRunningState(this.jobID);
            }
        }
    }
}

