/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.jobmanager.scheduler;

import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.LinkedBlockingQueue;
import org.apache.commons.lang3.tuple.ImmutablePair;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.concurrent.CompletableFuture;
import org.apache.flink.runtime.concurrent.Future;
import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;
import org.apache.flink.runtime.executiongraph.ExecutionVertex;
import org.apache.flink.runtime.instance.Instance;
import org.apache.flink.runtime.instance.InstanceDiedException;
import org.apache.flink.runtime.instance.InstanceListener;
import org.apache.flink.runtime.instance.SharedSlot;
import org.apache.flink.runtime.instance.SimpleSlot;
import org.apache.flink.runtime.instance.SlotProvider;
import org.apache.flink.runtime.instance.SlotSharingGroupAssignment;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobmanager.scheduler.CoLocationConstraint;
import org.apache.flink.runtime.jobmanager.scheduler.Locality;
import org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException;
import org.apache.flink.runtime.jobmanager.scheduler.ScheduledUnit;
import org.apache.flink.runtime.jobmanager.scheduler.SlotAvailabilityListener;
import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class Scheduler
implements InstanceListener,
SlotAvailabilityListener,
SlotProvider {
    /** Logger used for scheduling diagnostics. */
    private static final Logger LOG = LoggerFactory.getLogger(Scheduler.class);

    /** Monitor that the methods of this class synchronize on when touching the collections below. */
    private final Object globalLock = new Object();

    /** Every instance that was registered through {@code newInstanceAvailable} and not yet removed. */
    private final Set<Instance> allInstances = new HashSet<Instance>();

    /** The registered instances, grouped by the hostname of their task manager location. */
    private final HashMap<String, Set<Instance>> allInstancesByHost = new HashMap<String, Set<Instance>>();

    /** Instances believed to have free slots, keyed by task manager id; iteration follows insertion order. */
    private final Map<ResourceID, Instance> instancesWithAvailableResources = new LinkedHashMap<ResourceID, Instance>();

    /** Tasks that could not get a slot immediately and wait for one to become available. */
    private final Queue<QueuedTask> taskQueue = new ArrayDeque<QueuedTask>();

    /** Instances reported by {@code newSlotAvailable} that have not been processed yet. */
    private final BlockingQueue<Instance> newlyAvailableInstances = new LinkedBlockingQueue<Instance>();

    /** Number of slots handed out with locality {@code UNCONSTRAINED}. */
    private int unconstrainedAssignments;

    /** Number of slots handed out with locality {@code LOCAL}. */
    private int localizedAssignments;

    /** Number of slots handed out with locality {@code NON_LOCAL}. */
    private int nonLocalizedAssignments;

    /** Executor on which the handling of newly available slots is run. */
    private final Executor executor;

    /**
     * Creates a scheduler that runs its handling of newly available slots on the given executor.
     *
     * @param executor the executor to dispatch slot handling to; validated with
     *                 {@code Preconditions.checkNotNull}
     */
    public Scheduler(Executor executor) {
        Preconditions.checkNotNull(executor);
        this.executor = executor;
    }

    /**
     * Shuts the scheduler down: detaches the slot listener from every registered instance,
     * cancels and releases all of its slots, and then empties the scheduler's bookkeeping
     * (registered instances, per-host index, available instances and queued tasks).
     */
    public void shutdown() {
        synchronized (this.globalLock) {
            for (Instance registered : this.allInstances) {
                registered.removeSlotListener();
                registered.cancelAndReleaseAllSlots();
            }

            // Forget everything that was tracked so far.
            this.allInstances.clear();
            this.allInstancesByHost.clear();
            this.instancesWithAvailableResources.clear();
            this.taskQueue.clear();
        }
    }

    /**
     * Allocates a slot for the given task and reports the outcome as a future.
     *
     * <p>An immediately available slot is wrapped in an already completed future, a queued
     * request is returned as the pending future created by {@code scheduleTask}, and a
     * {@link NoResourceAvailableException} is turned into an exceptionally completed future
     * instead of being thrown.
     *
     * @param task the unit to schedule
     * @param allowQueued whether the request may be queued when no slot is free right now
     * @return a future that is completed with the allocated slot, or with the failure
     * @throws IllegalStateException if {@code scheduleTask} returns neither a slot nor a future
     */
    @Override
    public Future<SimpleSlot> allocateSlot(ScheduledUnit task, boolean allowQueued) {
        try {
            final Object ret = this.scheduleTask(task, allowQueued);
            if (ret instanceof SimpleSlot) {
                return FlinkCompletableFuture.completed((SimpleSlot) ret);
            }
            if (ret instanceof Future) {
                // The only future scheduleTask hands out is the FlinkCompletableFuture<SimpleSlot>
                // it creates for queued requests, so this cast is safe.
                @SuppressWarnings("unchecked")
                Future<SimpleSlot> typed = (Future<SimpleSlot>) ret;
                return typed;
            }
            // Should never happen; fail loudly with a message that shows the offending value.
            throw new IllegalStateException("scheduleTask returned an unexpected result: " + ret);
        }
        catch (NoResourceAvailableException e) {
            return FlinkCompletableFuture.completedExceptionally(e);
        }
    }

    /**
     * Tries to find a slot for the given task while holding the global lock.
     *
     * <p>Tasks that belong to a slot sharing group are served from the group's assignment
     * (optionally restricted by a co-location constraint), possibly after allocating a new
     * shared slot. All other tasks get a plain slot from an instance with free resources, or are
     * appended to the task queue when {@code queueIfNoResource} is set.
     *
     * @param task the unit to schedule; must not be null
     * @param queueIfNoResource whether the request may be queued when no slot is free; not
     *                          permitted for tasks that have a slot sharing group
     * @return either a {@code SimpleSlot} (immediate allocation) or the pending
     *         {@code FlinkCompletableFuture<SimpleSlot>} of a queued request
     * @throws NoResourceAvailableException if no slot could be obtained and the request was not queued
     * @throws NullPointerException if {@code task} is null
     * @throws IllegalArgumentException if a task with a slot sharing group is scheduled in queued mode
     */
    private Object scheduleTask(ScheduledUnit task, boolean queueIfNoResource) throws NoResourceAvailableException {
        if (task == null) {
            throw new NullPointerException();
        }
        if (LOG.isDebugEnabled()) {
            LOG.debug("Scheduling task " + task);
        }
        ExecutionVertex vertex = task.getTaskToExecute().getVertex();
        Iterable<TaskManagerLocation> preferredLocations = vertex.getPreferredLocationsBasedOnInputs();
        // Never reassigned in this method, so every branch guarded by it below is currently dead.
        boolean forceExternalLocation = false;
        Object object = this.globalLock;
        synchronized (object) {
            SlotSharingGroup sharingUnit = task.getSlotSharingGroup();
            if (sharingUnit != null) {
                // ---- scheduling inside a slot sharing group ----
                if (queueIfNoResource) {
                    throw new IllegalArgumentException("A task with a vertex sharing group was scheduled in a queued fashion.");
                }
                SlotSharingGroupAssignment assignment = sharingUnit.getTaskAssignment();
                CoLocationConstraint constraint = task.getLocationConstraint();
                if (constraint != null && forceExternalLocation) {
                    throw new IllegalArgumentException("The scheduling cannot be constrained simultaneously by a co-location constraint and an external location constraint.");
                }
                // First ask the group for a slot it already holds for this vertex / constraint.
                SimpleSlot slotFromGroup = constraint == null ? assignment.getSlotForTask(vertex) : assignment.getSlotForTask(vertex, constraint);
                SimpleSlot newSlot = null;
                SimpleSlot toUse = null;
                try {
                    boolean localOnly;
                    Iterable<TaskManagerLocation> locations;
                    // A group slot whose locality is anything but NON_LOCAL is used right away.
                    if (slotFromGroup != null && slotFromGroup.getLocality() != Locality.NON_LOCAL) {
                        if (constraint != null && !constraint.isAssigned()) {
                            constraint.lockLocation();
                        }
                        this.updateLocalityCounters(slotFromGroup, vertex);
                        return slotFromGroup;
                    }
                    // Otherwise try to allocate a fresh shared slot. An assigned co-location
                    // constraint restricts the search to exactly its location.
                    if (constraint != null && constraint.isAssigned()) {
                        locations = Collections.singleton(constraint.getLocation());
                        localOnly = true;
                    } else {
                        locations = vertex.getPreferredLocationsBasedOnInputs();
                        localOnly = forceExternalLocation;
                    }
                    newSlot = this.getNewSlotForSharingGroup(vertex, locations, assignment, constraint, localOnly);
                    if (newSlot == null) {
                        if (slotFromGroup == null) {
                            // Neither a group slot nor a new slot: report why nothing is available.
                            if (constraint != null && constraint.isAssigned()) {
                                throw new NoResourceAvailableException("Could not allocate a slot on instance " + constraint.getLocation() + ", as required by the co-location constraint.");
                            }
                            if (forceExternalLocation) {
                                String hosts = Scheduler.getHostnamesFromInstances(preferredLocations);
                                throw new NoResourceAvailableException("Could not schedule task " + vertex + " to any of the required hosts: " + hosts);
                            }
                            throw new NoResourceAvailableException(task, this.getNumberOfAvailableInstances(), this.getTotalNumberOfSlots(), this.getNumberOfAvailableSlots());
                        }
                        // No new slot, so fall back to the (non-local) slot from the group.
                        toUse = slotFromGroup;
                    } else if (slotFromGroup == null || !slotFromGroup.isAlive() || newSlot.getLocality() == Locality.LOCAL) {
                        // The new slot wins: there is no usable group slot, or the new one is local.
                        if (slotFromGroup != null) {
                            slotFromGroup.releaseSlot();
                        }
                        toUse = newSlot;
                    } else {
                        // Both slots exist and the new one is no better; keep the group's slot.
                        newSlot.releaseSlot();
                        toUse = slotFromGroup;
                    }
                    if (constraint != null && !constraint.isAssigned()) {
                        constraint.lockLocation();
                    }
                    this.updateLocalityCounters(toUse, vertex);
                }
                catch (NoResourceAvailableException e) {
                    // Rethrown unchanged so it bypasses the generic cleanup handler below.
                    throw e;
                }
                catch (Throwable t) {
                    // Any other failure: give back whatever slots were obtained, then propagate.
                    if (slotFromGroup != null) {
                        slotFromGroup.releaseSlot();
                    }
                    if (newSlot != null) {
                        newSlot.releaseSlot();
                    }
                    ExceptionUtils.rethrow((Throwable)t, (String)"An error occurred while allocating a slot in a sharing group");
                }
                return toUse;
            }
            // ---- scheduling without a slot sharing group ----
            SimpleSlot slot = this.getFreeSlotForTask(vertex, preferredLocations, forceExternalLocation);
            if (slot != null) {
                this.updateLocalityCounters(slot, vertex);
                return slot;
            }
            if (queueIfNoResource) {
                // Park the request; handleNewSlot completes the future once a slot frees up.
                FlinkCompletableFuture<SimpleSlot> future = new FlinkCompletableFuture<SimpleSlot>();
                this.taskQueue.add(new QueuedTask(task, future));
                return future;
            }
            if (forceExternalLocation) {
                String hosts = Scheduler.getHostnamesFromInstances(preferredLocations);
                throw new NoResourceAvailableException("Could not schedule task " + vertex + " to any of the required hosts: " + hosts);
            }
            throw new NoResourceAvailableException(this.getNumberOfAvailableInstances(), this.getTotalNumberOfSlots(), this.getNumberOfAvailableSlots());
        }
    }

    /**
     * Picks an instance via {@code findInstance} and allocates a simple slot on it, retrying
     * with the next candidate whenever the chosen instance yields no slot or turns out to be dead.
     *
     * <p>This method does no locking of its own; within this class it is called from
     * {@code scheduleTask} while the global lock is held.
     *
     * @param vertex the vertex the slot is allocated for; its job id is passed to the instance
     * @param requestedLocations the preferred locations, may be null or empty
     * @param localOnly whether only instances at one of the requested locations are acceptable
     * @return the allocated slot with its locality set, or null if no candidate instance is left
     */
    protected SimpleSlot getFreeSlotForTask(ExecutionVertex vertex, Iterable<TaskManagerLocation> requestedLocations, boolean localOnly) {
        Pair<Instance, Locality> instanceLocalityPair;
        // Loop because the set of available instances may contain false positives.
        while ((instanceLocalityPair = this.findInstance(requestedLocations, localOnly)) != null) {
            Instance instanceToUse = instanceLocalityPair.getLeft();
            Locality locality = instanceLocalityPair.getRight();
            try {
                SimpleSlot slot = instanceToUse.allocateSimpleSlot(vertex.getJobId());
                // findInstance removed the instance from the available set; put it back if it
                // still has free slots after this allocation.
                if (instanceToUse.hasResourcesAvailable()) {
                    this.instancesWithAvailableResources.put(instanceToUse.getTaskManagerID(), instanceToUse);
                }
                if (slot != null) {
                    slot.setLocality(locality);
                    return slot;
                }
            }
            catch (InstanceDiedException e) {
                // The instance died without this scheduler having been told yet; drop it.
                this.removeInstance(instanceToUse);
            }
            // No slot from this candidate: fall through and try the next one.
        }
        return null;
    }

    /**
     * Allocates a new shared slot for the given sharing group and a sub-slot inside it for the
     * vertex, retrying with the next candidate instance whenever an attempt does not produce a
     * sub-slot or the chosen instance turns out to be dead.
     *
     * <p>This method does no locking of its own; within this class it is called from
     * {@code scheduleTask} while the global lock is held.
     *
     * @param vertex the vertex the sub-slot is allocated for
     * @param requestedLocations the preferred locations, may be null or empty
     * @param groupAssignment the slot sharing group assignment that receives the new shared slot
     * @param constraint the co-location constraint for the sub-slot, or null to allocate by job vertex id
     * @param localOnly whether only instances at one of the requested locations are acceptable
     * @return the allocated sub-slot, or null if no candidate instance is left
     */
    protected SimpleSlot getNewSlotForSharingGroup(ExecutionVertex vertex, Iterable<TaskManagerLocation> requestedLocations, SlotSharingGroupAssignment groupAssignment, CoLocationConstraint constraint, boolean localOnly) {
        Pair<Instance, Locality> instanceLocalityPair;
        // Loop because the set of available instances may contain false positives.
        while ((instanceLocalityPair = this.findInstance(requestedLocations, localOnly)) != null) {
            Instance instanceToUse = instanceLocalityPair.getLeft();
            Locality locality = instanceLocalityPair.getRight();
            try {
                JobVertexID groupID = vertex.getJobvertexId();
                SharedSlot sharedSlot = instanceToUse.allocateSharedSlot(vertex.getJobId(), groupAssignment);
                // findInstance removed the instance from the available set; put it back if it
                // still has free slots after this allocation.
                if (instanceToUse.hasResourcesAvailable()) {
                    this.instancesWithAvailableResources.put(instanceToUse.getTaskManagerID(), instanceToUse);
                }
                if (sharedSlot != null) {
                    SimpleSlot slot = constraint == null
                            ? groupAssignment.addSharedSlotAndAllocateSubSlot(sharedSlot, locality, groupID)
                            : groupAssignment.addSharedSlotAndAllocateSubSlot(sharedSlot, locality, constraint);
                    if (slot != null) {
                        return slot;
                    }
                    // The sub-slot could not be allocated, so give the shared slot back.
                    sharedSlot.releaseSlot();
                }
            }
            catch (InstanceDiedException e) {
                // The instance died without this scheduler having been told yet; drop it.
                this.removeInstance(instanceToUse);
            }
            // No slot from this candidate: fall through and try the next one.
        }
        return null;
    }

    /**
     * Selects an instance from the set of instances with available resources and removes it
     * from that set.
     *
     * <p>Pending entries of {@code newlyAvailableInstances} are merged into the available set
     * first. If requested locations are given, the first one that maps to an available instance
     * is chosen with locality {@code LOCAL}. If none matches, the result is null when
     * {@code localOnly} is set and otherwise the first available instance with locality
     * {@code NON_LOCAL}. Without requested locations the first available instance is returned
     * with locality {@code UNCONSTRAINED}.
     *
     * @param requestedLocations the preferred locations, may be null or empty
     * @param localOnly whether only instances at one of the requested locations are acceptable
     * @return the chosen instance paired with its locality, or null if there is no suitable instance
     */
    private Pair<Instance, Locality> findInstance(Iterable<TaskManagerLocation> requestedLocations, boolean localOnly) {
        // Drain the queue of newly available instances into the available set.
        Instance queuedInstance;
        while ((queuedInstance = this.newlyAvailableInstances.poll()) != null) {
            this.instancesWithAvailableResources.put(queuedInstance.getTaskManagerID(), queuedInstance);
        }
        if (this.instancesWithAvailableResources.isEmpty()) {
            return null;
        }

        Iterator<TaskManagerLocation> locations = requestedLocations == null ? null : requestedLocations.iterator();
        // Locality reported when falling back to "any available instance".
        Locality fallbackLocality = Locality.UNCONSTRAINED;
        if (locations != null && locations.hasNext()) {
            while (locations.hasNext()) {
                TaskManagerLocation location = locations.next();
                if (location == null) {
                    continue;
                }
                Instance instance = this.instancesWithAvailableResources.remove(location.getResourceID());
                if (instance != null) {
                    return new ImmutablePair<Instance, Locality>(instance, Locality.LOCAL);
                }
            }
            if (localOnly) {
                return null;
            }
            fallbackLocality = Locality.NON_LOCAL;
        }

        // Take the first available instance; the set is known to be non-empty at this point.
        Iterator<Instance> instances = this.instancesWithAvailableResources.values().iterator();
        Instance instanceToUse = instances.next();
        instances.remove();
        return new ImmutablePair<Instance, Locality>(instanceToUse, fallbackLocality);
    }

    /**
     * Records that the given instance has a slot available and schedules the handling of that
     * slot on the scheduler's executor rather than doing it in the calling thread.
     *
     * @param instance the instance on which a slot became available
     */
    @Override
    public void newSlotAvailable(Instance instance) {
        this.newlyAvailableInstances.add(instance);

        Runnable slotHandler = new Runnable(){

            @Override
            public void run() {
                Scheduler.this.handleNewSlot();
            }
        };
        this.executor.execute(slotHandler);
    }

    /**
     * Processes one entry of {@code newlyAvailableInstances}.
     *
     * <p>If a task is waiting in the queue, a slot is allocated on the instance for the task at
     * the head of the queue and the task's future is completed with it. If no task is waiting,
     * the instance is recorded as having available resources.
     */
    private void handleNewSlot() {
        synchronized (this.globalLock) {
            Instance instance = this.newlyAvailableInstances.poll();
            if (instance == null || !instance.hasResourcesAvailable()) {
                return;
            }

            QueuedTask queued = this.taskQueue.peek();
            if (queued == null) {
                // Nobody is waiting: remember the instance for later scheduling requests.
                this.instancesWithAvailableResources.put(instance.getTaskManagerID(), instance);
                return;
            }

            ScheduledUnit task = queued.getTask();
            ExecutionVertex vertex = task.getTaskToExecute().getVertex();
            try {
                SimpleSlot newSlot = instance.allocateSimpleSlot(vertex.getJobId());
                if (newSlot == null) {
                    return;
                }

                // Success: take the task off the queue and notify its future, if it has one.
                this.taskQueue.poll();
                CompletableFuture<SimpleSlot> future = queued.getFuture();
                if (future == null) {
                    return;
                }
                try {
                    future.complete(newSlot);
                }
                catch (Throwable t) {
                    LOG.error("Error calling allocation future for task " + vertex.getTaskNameWithSubtaskIndex(), t);
                    task.getTaskToExecute().fail(t);
                }
            }
            catch (InstanceDiedException e) {
                if (LOG.isDebugEnabled()) {
                    LOG.debug("Instance " + instance + " was marked dead asynchronously.");
                }
                this.removeInstance(instance);
            }
        }
    }

    /**
     * Increments the assignment counter that matches the slot's locality and, when debug logging
     * is enabled, logs the assignment.
     *
     * @param slot the slot that was assigned; its locality selects the counter
     * @param vertex the vertex the slot was assigned to (used for the log message only)
     * @throws RuntimeException if the locality is none of UNCONSTRAINED, LOCAL or NON_LOCAL
     */
    private void updateLocalityCounters(SimpleSlot slot, ExecutionVertex vertex) {
        Locality locality = slot.getLocality();

        // Bump the counter and pick the matching log prefix in a single pass.
        final String logPrefix;
        switch (locality) {
            case UNCONSTRAINED:
                ++this.unconstrainedAssignments;
                logPrefix = "Unconstrained assignment: ";
                break;
            case LOCAL:
                ++this.localizedAssignments;
                logPrefix = "Local assignment: ";
                break;
            case NON_LOCAL:
                ++this.nonLocalizedAssignments;
                logPrefix = "Non-local assignment: ";
                break;
            default:
                throw new RuntimeException(locality.name());
        }

        if (LOG.isDebugEnabled()) {
            LOG.debug(logPrefix + vertex.getTaskNameWithSubtaskIndex() + " --> " + slot);
        }
    }

    /**
     * Registers a new instance with the scheduler: installs this scheduler as the instance's slot
     * availability listener, indexes the instance by hostname, marks it as having available
     * resources and announces each of its available slots via {@code newSlotAvailable}.
     *
     * <p>If any of these registration steps fails, the error is logged and the instance is
     * removed again.
     *
     * @param instance the instance to register
     * @throws IllegalArgumentException if the instance is null, has no available slots, is not
     *         alive, or is already registered
     */
    @Override
    public void newInstanceAvailable(Instance instance) {
        if (instance == null) {
            throw new IllegalArgumentException();
        }
        if (instance.getNumberOfAvailableSlots() <= 0) {
            throw new IllegalArgumentException("The given instance has no resources.");
        }
        if (!instance.isAlive()) {
            throw new IllegalArgumentException("The instance is not alive.");
        }

        synchronized (this.globalLock) {
            boolean newlyRegistered = this.allInstances.add(instance);
            if (!newlyRegistered) {
                throw new IllegalArgumentException("The instance is already contained.");
            }

            try {
                instance.setSlotAvailabilityListener(this);

                // Index the instance under its hostname, creating the per-host set on first use.
                String host = instance.getTaskManagerLocation().getHostname();
                Set<Instance> instancesOnHost = this.allInstancesByHost.get(host);
                if (instancesOnHost == null) {
                    instancesOnHost = new HashSet<Instance>();
                    this.allInstancesByHost.put(host, instancesOnHost);
                }
                instancesOnHost.add(instance);

                this.instancesWithAvailableResources.put(instance.getTaskManagerID(), instance);

                // Announce the slots; the bound is re-read from the instance on every iteration.
                int announced = 0;
                while (announced < instance.getNumberOfAvailableSlots()) {
                    this.newSlotAvailable(instance);
                    ++announced;
                }
            }
            catch (Throwable t) {
                LOG.error("Scheduler could not add new instance " + instance, t);
                this.removeInstance(instance);
            }
        }
    }

    /**
     * Marks the given instance as dead and removes it from the scheduler's bookkeeping.
     *
     * @param instance the instance that died
     * @throws IllegalArgumentException if the instance is null
     */
    @Override
    public void instanceDied(Instance instance) {
        if (instance == null) {
            throw new IllegalArgumentException();
        }

        // The instance is marked dead before the global lock is taken.
        instance.markDead();

        synchronized (this.globalLock) {
            this.removeInstance(instance);
        }
    }

    /**
     * Removes the instance from the set of all instances, from the set of instances with
     * available resources and from the per-host index. A host entry that becomes empty is
     * dropped from the index.
     *
     * @param instance the instance to remove
     * @throws NullPointerException if the instance is null
     */
    private void removeInstance(Instance instance) {
        if (instance == null) {
            throw new NullPointerException();
        }

        this.allInstances.remove(instance);
        this.instancesWithAvailableResources.remove(instance.getTaskManagerID());

        String host = instance.getTaskManagerLocation().getHostname();
        Set<Instance> instancesOnHost = this.allInstancesByHost.get(host);
        if (instancesOnHost == null) {
            return;
        }
        instancesOnHost.remove(instance);
        if (instancesOnHost.isEmpty()) {
            this.allInstancesByHost.remove(host);
        }
    }

    /**
     * Returns the number of available slots, summed over all instances currently tracked as
     * having available resources. Newly available instances are merged in before counting.
     *
     * @return the total number of available slots
     */
    public int getNumberOfAvailableSlots() {
        synchronized (this.globalLock) {
            this.processNewlyAvailableInstances();

            int availableSlots = 0;
            for (Instance available : this.instancesWithAvailableResources.values()) {
                availableSlots += available.getNumberOfAvailableSlots();
            }
            return availableSlots;
        }
    }

    /**
     * Returns the total number of slots of all registered instances that are alive.
     *
     * @return the summed slot count of the live instances
     */
    public int getTotalNumberOfSlots() {
        synchronized (this.globalLock) {
            int totalSlots = 0;
            for (Instance registered : this.allInstances) {
                if (registered.isAlive()) {
                    totalSlots += registered.getTotalNumberOfSlots();
                }
            }
            return totalSlots;
        }
    }

    /**
     * Returns the number of registered instances that are alive.
     *
     * @return the count of live instances
     */
    public int getNumberOfAvailableInstances() {
        synchronized (this.globalLock) {
            int liveInstances = 0;
            for (Instance registered : this.allInstances) {
                if (registered.isAlive()) {
                    ++liveInstances;
                }
            }
            return liveInstances;
        }
    }

    /**
     * Returns how many instances are currently tracked as having available resources, after
     * merging in the newly available instances.
     *
     * @return the size of the set of instances with available resources
     */
    public int getNumberOfInstancesWithAvailableSlots() {
        synchronized (this.globalLock) {
            this.processNewlyAvailableInstances();
            int trackedInstances = this.instancesWithAvailableResources.size();
            return trackedInstances;
        }
    }

    /**
     * Returns a snapshot of the registered instances grouped by hostname.
     *
     * <p>Both the returned map and the lists in it are fresh copies, so changing them does not
     * affect the scheduler's own bookkeeping.
     *
     * @return a new map from hostname to a new list of the instances registered for that host
     */
    public Map<String, List<Instance>> getInstancesByHost() {
        synchronized (this.globalLock) {
            Map<String, List<Instance>> copy = new HashMap<String, List<Instance>>();
            for (Map.Entry<String, Set<Instance>> entry : this.allInstancesByHost.entrySet()) {
                // Typed copy instead of a raw ArrayList, so no unchecked conversion is needed.
                copy.put(entry.getKey(), new ArrayList<Instance>(entry.getValue()));
            }
            return copy;
        }
    }

    /**
     * Returns how many slots were assigned with locality UNCONSTRAINED. The counter is read
     * without taking the global lock.
     *
     * @return the number of unconstrained assignments
     */
    public int getNumberOfUnconstrainedAssignments() {
        final int assignments = unconstrainedAssignments;
        return assignments;
    }

    /**
     * Returns how many slots were assigned with locality LOCAL. The counter is read without
     * taking the global lock.
     *
     * @return the number of localized assignments
     */
    public int getNumberOfLocalizedAssignments() {
        final int assignments = localizedAssignments;
        return assignments;
    }

    /**
     * Returns how many slots were assigned with locality NON_LOCAL. The counter is read without
     * taking the global lock.
     *
     * @return the number of non-localized assignments
     */
    public int getNumberOfNonLocalizedAssignments() {
        final int assignments = nonLocalizedAssignments;
        return assignments;
    }

    /**
     * Drains the queue of newly available instances, adding every instance that still has
     * resources available to the set of instances with available resources.
     */
    private void processNewlyAvailableInstances() {
        synchronized (this.globalLock) {
            for (Instance pending = this.newlyAvailableInstances.poll();
                    pending != null;
                    pending = this.newlyAvailableInstances.poll()) {
                if (pending.hasResourcesAvailable()) {
                    this.instancesWithAvailableResources.put(pending.getTaskManagerID(), pending);
                }
            }
        }
    }

    /**
     * Joins the hostnames of the given locations into one string, separated by {@code ", "}.
     *
     * @param locations the locations whose hostnames are listed
     * @return the comma-separated hostnames; empty if there are no locations
     */
    private static String getHostnamesFromInstances(Iterable<TaskManagerLocation> locations) {
        StringBuilder hostnames = new StringBuilder();
        // Empty before the first element, ", " before every later one.
        String separator = "";
        for (TaskManagerLocation location : locations) {
            hostnames.append(separator);
            hostnames.append(location.getHostname());
            separator = ", ";
        }
        return hostnames.toString();
    }

    /**
     * Immutable pairing of a task that is waiting for a slot with the future that is to be
     * completed once a slot has been allocated for it.
     */
    private static final class QueuedTask {
        /** The unit that is waiting for a slot. */
        private final ScheduledUnit task;

        /** The future to complete with the allocated slot. */
        private final CompletableFuture<SimpleSlot> future;

        /**
         * @param task the waiting unit
         * @param future the future to complete when a slot is found
         */
        public QueuedTask(ScheduledUnit task, CompletableFuture<SimpleSlot> future) {
            this.future = future;
            this.task = task;
        }

        /** @return the waiting unit */
        public ScheduledUnit getTask() {
            return task;
        }

        /** @return the future associated with the waiting unit */
        public CompletableFuture<SimpleSlot> getFuture() {
            return future;
        }
    }
}

