/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.clients.consumer.internals;

import java.io.Closeable;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.clients.ClientResponse;
import org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient;
import org.apache.kafka.clients.consumer.internals.DelayedTask;
import org.apache.kafka.clients.consumer.internals.Heartbeat;
import org.apache.kafka.clients.consumer.internals.RequestFuture;
import org.apache.kafka.clients.consumer.internals.RequestFutureAdapter;
import org.apache.kafka.clients.consumer.internals.RequestFutureListener;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.errors.DisconnectException;
import org.apache.kafka.common.errors.GroupAuthorizationException;
import org.apache.kafka.common.errors.IllegalGenerationException;
import org.apache.kafka.common.errors.RebalanceInProgressException;
import org.apache.kafka.common.errors.UnknownMemberIdException;
import org.apache.kafka.common.metrics.Measurable;
import org.apache.kafka.common.metrics.MetricConfig;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.metrics.stats.Avg;
import org.apache.kafka.common.metrics.stats.Count;
import org.apache.kafka.common.metrics.stats.Max;
import org.apache.kafka.common.metrics.stats.Rate;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.GroupCoordinatorRequest;
import org.apache.kafka.common.requests.GroupCoordinatorResponse;
import org.apache.kafka.common.requests.HeartbeatRequest;
import org.apache.kafka.common.requests.HeartbeatResponse;
import org.apache.kafka.common.requests.JoinGroupRequest;
import org.apache.kafka.common.requests.JoinGroupResponse;
import org.apache.kafka.common.requests.LeaveGroupRequest;
import org.apache.kafka.common.requests.LeaveGroupResponse;
import org.apache.kafka.common.requests.SyncGroupRequest;
import org.apache.kafka.common.requests.SyncGroupResponse;
import org.apache.kafka.common.utils.Time;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public abstract class AbstractCoordinator
implements Closeable {
    private static final Logger log = LoggerFactory.getLogger(AbstractCoordinator.class);
    private final Heartbeat heartbeat;
    private final HeartbeatTask heartbeatTask;
    private final int sessionTimeoutMs;
    private final GroupCoordinatorMetrics sensors;
    protected final String groupId;
    protected final ConsumerNetworkClient client;
    protected final Time time;
    protected final long retryBackoffMs;
    private boolean needsJoinPrepare = true;
    private boolean rejoinNeeded = true;
    protected Node coordinator;
    protected String memberId;
    protected String protocol;
    protected int generation;

    public AbstractCoordinator(ConsumerNetworkClient client, String groupId, int sessionTimeoutMs, int heartbeatIntervalMs, Metrics metrics, String metricGrpPrefix, Map<String, String> metricTags, Time time, long retryBackoffMs) {
        this.client = client;
        this.time = time;
        this.generation = -1;
        this.memberId = "";
        this.groupId = groupId;
        this.coordinator = null;
        this.sessionTimeoutMs = sessionTimeoutMs;
        this.heartbeat = new Heartbeat(this.sessionTimeoutMs, heartbeatIntervalMs, time.milliseconds());
        this.heartbeatTask = new HeartbeatTask();
        this.sensors = new GroupCoordinatorMetrics(metrics, metricGrpPrefix, metricTags);
        this.retryBackoffMs = retryBackoffMs;
    }

    protected abstract String protocolType();

    protected abstract LinkedHashMap<String, ByteBuffer> metadata();

    protected abstract void onJoinPrepare(int var1, String var2);

    protected abstract Map<String, ByteBuffer> performAssignment(String var1, String var2, Map<String, ByteBuffer> var3);

    protected abstract void onJoinComplete(int var1, String var2, String var3, ByteBuffer var4);

    public void ensureCoordinatorKnown() {
        while (this.coordinatorUnknown()) {
            RequestFuture<Void> future = this.sendGroupMetadataRequest();
            this.client.poll(future);
            if (!future.failed()) continue;
            if (future.isRetriable()) {
                this.client.awaitMetadataUpdate();
                continue;
            }
            throw future.exception();
        }
    }

    protected boolean needRejoin() {
        return this.rejoinNeeded;
    }

    public void ensureActiveGroup() {
        if (!this.needRejoin()) {
            return;
        }
        if (this.needsJoinPrepare) {
            this.onJoinPrepare(this.generation, this.memberId);
            this.needsJoinPrepare = false;
        }
        while (this.needRejoin()) {
            this.ensureCoordinatorKnown();
            if (this.client.pendingRequestCount(this.coordinator) > 0) {
                this.client.awaitPendingRequests(this.coordinator);
                continue;
            }
            RequestFuture<ByteBuffer> future = this.performGroupJoin();
            this.client.poll(future);
            if (future.succeeded()) {
                this.onJoinComplete(this.generation, this.memberId, this.protocol, future.value());
                this.needsJoinPrepare = true;
                this.heartbeatTask.reset();
                continue;
            }
            RuntimeException exception = future.exception();
            if (exception instanceof UnknownMemberIdException || exception instanceof RebalanceInProgressException || exception instanceof IllegalGenerationException) continue;
            if (!future.isRetriable()) {
                throw exception;
            }
            this.time.sleep(this.retryBackoffMs);
        }
    }

    private RequestFuture<ByteBuffer> performGroupJoin() {
        if (this.coordinatorUnknown()) {
            return RequestFuture.coordinatorNotAvailable();
        }
        log.debug("(Re-)joining group {}", (Object)this.groupId);
        ArrayList<JoinGroupRequest.GroupProtocol> protocols = new ArrayList<JoinGroupRequest.GroupProtocol>();
        for (Map.Entry<String, ByteBuffer> metadataEntry : this.metadata().entrySet()) {
            protocols.add(new JoinGroupRequest.GroupProtocol(metadataEntry.getKey(), metadataEntry.getValue()));
        }
        JoinGroupRequest request = new JoinGroupRequest(this.groupId, this.sessionTimeoutMs, this.memberId, this.protocolType(), protocols);
        log.debug("Issuing request ({}: {}) to coordinator {}", new Object[]{ApiKeys.JOIN_GROUP, request, this.coordinator.id()});
        return this.client.send(this.coordinator, ApiKeys.JOIN_GROUP, request).compose(new JoinGroupResponseHandler());
    }

    private RequestFuture<ByteBuffer> onJoinFollower() {
        SyncGroupRequest request = new SyncGroupRequest(this.groupId, this.generation, this.memberId, Collections.emptyMap());
        log.debug("Issuing follower SyncGroup ({}: {}) to coordinator {}", new Object[]{ApiKeys.SYNC_GROUP, request, this.coordinator.id()});
        return this.sendSyncGroupRequest(request);
    }

    private RequestFuture<ByteBuffer> onJoinLeader(JoinGroupResponse joinResponse) {
        try {
            Map<String, ByteBuffer> groupAssignment = this.performAssignment(joinResponse.leaderId(), joinResponse.groupProtocol(), joinResponse.members());
            SyncGroupRequest request = new SyncGroupRequest(this.groupId, this.generation, this.memberId, groupAssignment);
            log.debug("Issuing leader SyncGroup ({}: {}) to coordinator {}", new Object[]{ApiKeys.SYNC_GROUP, request, this.coordinator.id()});
            return this.sendSyncGroupRequest(request);
        }
        catch (RuntimeException e) {
            return RequestFuture.failure(e);
        }
    }

    private RequestFuture<ByteBuffer> sendSyncGroupRequest(SyncGroupRequest request) {
        if (this.coordinatorUnknown()) {
            return RequestFuture.coordinatorNotAvailable();
        }
        return this.client.send(this.coordinator, ApiKeys.SYNC_GROUP, request).compose(new SyncGroupRequestHandler());
    }

    private RequestFuture<Void> sendGroupMetadataRequest() {
        Node node = this.client.leastLoadedNode();
        if (node == null) {
            return RequestFuture.noBrokersAvailable();
        }
        log.debug("Issuing group metadata request to broker {}", (Object)node.id());
        GroupCoordinatorRequest metadataRequest = new GroupCoordinatorRequest(this.groupId);
        return this.client.send(node, ApiKeys.GROUP_COORDINATOR, metadataRequest).compose(new RequestFutureAdapter<ClientResponse, Void>(){

            @Override
            public void onSuccess(ClientResponse response, RequestFuture<Void> future) {
                AbstractCoordinator.this.handleGroupMetadataResponse(response, future);
            }
        });
    }

    private void handleGroupMetadataResponse(ClientResponse resp, RequestFuture<Void> future) {
        log.debug("Group metadata response {}", (Object)resp);
        if (!this.coordinatorUnknown()) {
            future.complete(null);
        } else {
            GroupCoordinatorResponse groupCoordinatorResponse = new GroupCoordinatorResponse(resp.responseBody());
            short errorCode = groupCoordinatorResponse.errorCode();
            if (errorCode == Errors.NONE.code()) {
                this.coordinator = new Node(Integer.MAX_VALUE - groupCoordinatorResponse.node().id(), groupCoordinatorResponse.node().host(), groupCoordinatorResponse.node().port());
                this.client.tryConnect(this.coordinator);
                if (this.generation > 0) {
                    this.heartbeatTask.reset();
                }
                future.complete(null);
            } else if (errorCode == Errors.GROUP_AUTHORIZATION_FAILED.code()) {
                future.raise(new GroupAuthorizationException(this.groupId));
            } else {
                future.raise(Errors.forCode(errorCode));
            }
        }
    }

    public boolean coordinatorUnknown() {
        if (this.coordinator == null) {
            return true;
        }
        if (this.client.connectionFailed(this.coordinator)) {
            this.coordinatorDead();
            return true;
        }
        return false;
    }

    protected void coordinatorDead() {
        if (this.coordinator != null) {
            log.info("Marking the coordinator {} dead.", (Object)this.coordinator.id());
            this.coordinator = null;
        }
    }

    @Override
    public void close() {
        this.client.disableWakeups();
        this.maybeLeaveGroup();
    }

    public void maybeLeaveGroup() {
        this.client.unschedule(this.heartbeatTask);
        if (!this.coordinatorUnknown() && this.generation > 0) {
            this.sendLeaveGroupRequest();
        }
        this.generation = -1;
        this.memberId = "";
        this.rejoinNeeded = true;
    }

    private void sendLeaveGroupRequest() {
        LeaveGroupRequest request = new LeaveGroupRequest(this.groupId, this.memberId);
        RequestFuture<Void> future = this.client.send(this.coordinator, ApiKeys.LEAVE_GROUP, request).compose(new LeaveGroupResponseHandler());
        future.addListener(new RequestFutureListener<Void>(){

            @Override
            public void onSuccess(Void value) {
            }

            @Override
            public void onFailure(RuntimeException e) {
                log.info("LeaveGroup request failed with error", (Throwable)e);
            }
        });
        this.client.poll(future, 0L);
    }

    public RequestFuture<Void> sendHeartbeatRequest() {
        HeartbeatRequest req = new HeartbeatRequest(this.groupId, this.generation, this.memberId);
        return this.client.send(this.coordinator, ApiKeys.HEARTBEAT, req).compose(new HeartbeatCompletionHandler());
    }

    private class GroupCoordinatorMetrics {
        public final Metrics metrics;
        public final String metricGrpName;
        public final Sensor heartbeatLatency;
        public final Sensor joinLatency;
        public final Sensor syncLatency;

        public GroupCoordinatorMetrics(Metrics metrics, String metricGrpPrefix, Map<String, String> tags) {
            this.metrics = metrics;
            this.metricGrpName = metricGrpPrefix + "-coordinator-metrics";
            this.heartbeatLatency = metrics.sensor("heartbeat-latency");
            this.heartbeatLatency.add(new MetricName("heartbeat-response-time-max", this.metricGrpName, "The max time taken to receive a response to a heartbeat request", tags), new Max());
            this.heartbeatLatency.add(new MetricName("heartbeat-rate", this.metricGrpName, "The average number of heartbeats per second", tags), new Rate(new Count()));
            this.joinLatency = metrics.sensor("join-latency");
            this.joinLatency.add(new MetricName("join-time-avg", this.metricGrpName, "The average time taken for a group rejoin", tags), new Avg());
            this.joinLatency.add(new MetricName("join-time-max", this.metricGrpName, "The max time taken for a group rejoin", tags), new Avg());
            this.joinLatency.add(new MetricName("join-rate", this.metricGrpName, "The number of group joins per second", tags), new Rate(new Count()));
            this.syncLatency = metrics.sensor("sync-latency");
            this.syncLatency.add(new MetricName("sync-time-avg", this.metricGrpName, "The average time taken for a group sync", tags), new Avg());
            this.syncLatency.add(new MetricName("sync-time-max", this.metricGrpName, "The max time taken for a group sync", tags), new Avg());
            this.syncLatency.add(new MetricName("sync-rate", this.metricGrpName, "The number of group syncs per second", tags), new Rate(new Count()));
            Measurable lastHeartbeat = new Measurable(){

                @Override
                public double measure(MetricConfig config, long now) {
                    return TimeUnit.SECONDS.convert(now - AbstractCoordinator.this.heartbeat.lastHeartbeatSend(), TimeUnit.MILLISECONDS);
                }
            };
            metrics.addMetric(new MetricName("last-heartbeat-seconds-ago", this.metricGrpName, "The number of seconds since the last controller heartbeat", tags), lastHeartbeat);
        }
    }

    protected abstract class CoordinatorResponseHandler<R, T>
    extends RequestFutureAdapter<ClientResponse, T> {
        protected ClientResponse response;

        protected CoordinatorResponseHandler() {
        }

        public abstract R parse(ClientResponse var1);

        public abstract void handle(R var1, RequestFuture<T> var2);

        @Override
        public void onFailure(RuntimeException e, RequestFuture<T> future) {
            if (e instanceof DisconnectException) {
                AbstractCoordinator.this.coordinatorDead();
            }
            future.raise(e);
        }

        @Override
        public void onSuccess(ClientResponse clientResponse, RequestFuture<T> future) {
            block2: {
                try {
                    this.response = clientResponse;
                    R responseObj = this.parse(clientResponse);
                    this.handle(responseObj, future);
                }
                catch (RuntimeException e) {
                    if (future.isDone()) break block2;
                    future.raise(e);
                }
            }
        }
    }

    private class HeartbeatCompletionHandler
    extends CoordinatorResponseHandler<HeartbeatResponse, Void> {
        private HeartbeatCompletionHandler() {
        }

        @Override
        public HeartbeatResponse parse(ClientResponse response) {
            return new HeartbeatResponse(response.responseBody());
        }

        @Override
        public void handle(HeartbeatResponse heartbeatResponse, RequestFuture<Void> future) {
            ((AbstractCoordinator)AbstractCoordinator.this).sensors.heartbeatLatency.record(this.response.requestLatencyMs());
            short errorCode = heartbeatResponse.errorCode();
            if (errorCode == Errors.NONE.code()) {
                log.debug("Received successful heartbeat response.");
                future.complete(null);
            } else if (errorCode == Errors.GROUP_COORDINATOR_NOT_AVAILABLE.code() || errorCode == Errors.NOT_COORDINATOR_FOR_GROUP.code()) {
                log.info("Attempt to heart beat failed since coordinator is either not started or not valid, marking it as dead.");
                AbstractCoordinator.this.coordinatorDead();
                future.raise(Errors.forCode(errorCode));
            } else if (errorCode == Errors.REBALANCE_IN_PROGRESS.code()) {
                log.info("Attempt to heart beat failed since the group is rebalancing, try to re-join group.");
                AbstractCoordinator.this.rejoinNeeded = true;
                future.raise(Errors.REBALANCE_IN_PROGRESS);
            } else if (errorCode == Errors.ILLEGAL_GENERATION.code()) {
                log.info("Attempt to heart beat failed since generation id is not legal, try to re-join group.");
                AbstractCoordinator.this.rejoinNeeded = true;
                future.raise(Errors.ILLEGAL_GENERATION);
            } else if (errorCode == Errors.UNKNOWN_MEMBER_ID.code()) {
                log.info("Attempt to heart beat failed since member id is not valid, reset it and try to re-join group.");
                AbstractCoordinator.this.memberId = "";
                AbstractCoordinator.this.rejoinNeeded = true;
                future.raise(Errors.UNKNOWN_MEMBER_ID);
            } else if (errorCode == Errors.GROUP_AUTHORIZATION_FAILED.code()) {
                future.raise(new GroupAuthorizationException(AbstractCoordinator.this.groupId));
            } else {
                future.raise(new KafkaException("Unexpected errorCode in heartbeat response: " + Errors.forCode(errorCode).exception().getMessage()));
            }
        }
    }

    private class LeaveGroupResponseHandler
    extends CoordinatorResponseHandler<LeaveGroupResponse, Void> {
        private LeaveGroupResponseHandler() {
        }

        @Override
        public LeaveGroupResponse parse(ClientResponse response) {
            return new LeaveGroupResponse(response.responseBody());
        }

        @Override
        public void handle(LeaveGroupResponse leaveResponse, RequestFuture<Void> future) {
            short errorCode = leaveResponse.errorCode();
            if (errorCode == Errors.NONE.code()) {
                future.complete(null);
            } else {
                future.raise(Errors.forCode(errorCode));
            }
        }
    }

    private class SyncGroupRequestHandler
    extends CoordinatorResponseHandler<SyncGroupResponse, ByteBuffer> {
        private SyncGroupRequestHandler() {
        }

        @Override
        public SyncGroupResponse parse(ClientResponse response) {
            return new SyncGroupResponse(response.responseBody());
        }

        @Override
        public void handle(SyncGroupResponse syncResponse, RequestFuture<ByteBuffer> future) {
            Errors error = Errors.forCode(syncResponse.errorCode());
            if (error == Errors.NONE) {
                log.debug("Received successful sync group response for group {}: {}", (Object)AbstractCoordinator.this.groupId, (Object)syncResponse.toStruct());
                ((AbstractCoordinator)AbstractCoordinator.this).sensors.syncLatency.record(this.response.requestLatencyMs());
                future.complete(syncResponse.memberAssignment());
            } else {
                AbstractCoordinator.this.rejoinNeeded = true;
                if (error == Errors.GROUP_AUTHORIZATION_FAILED) {
                    future.raise(new GroupAuthorizationException(AbstractCoordinator.this.groupId));
                } else if (error == Errors.REBALANCE_IN_PROGRESS) {
                    log.info("SyncGroup for group {} failed due to coordinator rebalance, rejoining the group", (Object)AbstractCoordinator.this.groupId);
                    future.raise(error);
                } else if (error == Errors.UNKNOWN_MEMBER_ID || error == Errors.ILLEGAL_GENERATION) {
                    log.info("SyncGroup for group {} failed due to {}, rejoining the group", (Object)AbstractCoordinator.this.groupId, (Object)error);
                    AbstractCoordinator.this.memberId = "";
                    future.raise(error);
                } else if (error == Errors.GROUP_COORDINATOR_NOT_AVAILABLE || error == Errors.NOT_COORDINATOR_FOR_GROUP) {
                    log.info("SyncGroup for group {} failed due to {}, will find new coordinator and rejoin", (Object)AbstractCoordinator.this.groupId, (Object)error);
                    AbstractCoordinator.this.coordinatorDead();
                    future.raise(error);
                } else {
                    future.raise(new KafkaException("Unexpected error from SyncGroup: " + error.exception().getMessage()));
                }
            }
        }
    }

    private class JoinGroupResponseHandler
    extends CoordinatorResponseHandler<JoinGroupResponse, ByteBuffer> {
        private JoinGroupResponseHandler() {
        }

        @Override
        public JoinGroupResponse parse(ClientResponse response) {
            return new JoinGroupResponse(response.responseBody());
        }

        @Override
        public void handle(JoinGroupResponse joinResponse, RequestFuture<ByteBuffer> future) {
            short errorCode = joinResponse.errorCode();
            if (errorCode == Errors.NONE.code()) {
                log.debug("Joined group: {}", (Object)joinResponse.toStruct());
                AbstractCoordinator.this.memberId = joinResponse.memberId();
                AbstractCoordinator.this.generation = joinResponse.generationId();
                AbstractCoordinator.this.rejoinNeeded = false;
                AbstractCoordinator.this.protocol = joinResponse.groupProtocol();
                ((AbstractCoordinator)AbstractCoordinator.this).sensors.joinLatency.record(this.response.requestLatencyMs());
                if (joinResponse.isLeader()) {
                    AbstractCoordinator.this.onJoinLeader(joinResponse).chain(future);
                } else {
                    AbstractCoordinator.this.onJoinFollower().chain(future);
                }
            } else if (errorCode == Errors.GROUP_LOAD_IN_PROGRESS.code()) {
                log.debug("Attempt to join group {} rejected since coordinator is loading the group.", (Object)AbstractCoordinator.this.groupId);
                future.raise(Errors.forCode(errorCode));
            } else if (errorCode == Errors.UNKNOWN_MEMBER_ID.code()) {
                AbstractCoordinator.this.memberId = "";
                log.info("Attempt to join group {} failed due to unknown member id, resetting and retrying.", (Object)AbstractCoordinator.this.groupId);
                future.raise(Errors.UNKNOWN_MEMBER_ID);
            } else if (errorCode == Errors.GROUP_COORDINATOR_NOT_AVAILABLE.code() || errorCode == Errors.NOT_COORDINATOR_FOR_GROUP.code()) {
                AbstractCoordinator.this.coordinatorDead();
                log.info("Attempt to join group {} failed due to obsolete coordinator information, retrying.", (Object)AbstractCoordinator.this.groupId);
                future.raise(Errors.forCode(errorCode));
            } else if (errorCode == Errors.INCONSISTENT_GROUP_PROTOCOL.code() || errorCode == Errors.INVALID_SESSION_TIMEOUT.code() || errorCode == Errors.INVALID_GROUP_ID.code()) {
                Errors error = Errors.forCode(errorCode);
                log.error("Attempt to join group {} failed due to: {}", (Object)AbstractCoordinator.this.groupId, (Object)error.exception().getMessage());
                future.raise(error);
            } else if (errorCode == Errors.GROUP_AUTHORIZATION_FAILED.code()) {
                future.raise(new GroupAuthorizationException(AbstractCoordinator.this.groupId));
            } else {
                future.raise(new KafkaException("Unexpected error in join group response: " + Errors.forCode(joinResponse.errorCode()).exception().getMessage()));
            }
        }
    }

    private class HeartbeatTask
    implements DelayedTask {
        private boolean requestInFlight = false;

        private HeartbeatTask() {
        }

        public void reset() {
            long now = AbstractCoordinator.this.time.milliseconds();
            AbstractCoordinator.this.heartbeat.resetSessionTimeout(now);
            AbstractCoordinator.this.client.unschedule(this);
            if (!this.requestInFlight) {
                AbstractCoordinator.this.client.schedule(this, now);
            }
        }

        @Override
        public void run(long now) {
            if (AbstractCoordinator.this.generation < 0 || AbstractCoordinator.this.needRejoin() || AbstractCoordinator.this.coordinatorUnknown()) {
                return;
            }
            if (AbstractCoordinator.this.heartbeat.sessionTimeoutExpired(now)) {
                AbstractCoordinator.this.coordinatorDead();
                return;
            }
            if (!AbstractCoordinator.this.heartbeat.shouldHeartbeat(now)) {
                AbstractCoordinator.this.client.schedule(this, now + AbstractCoordinator.this.heartbeat.timeToNextHeartbeat(now));
            } else {
                AbstractCoordinator.this.heartbeat.sentHeartbeat(now);
                this.requestInFlight = true;
                RequestFuture<Void> future = AbstractCoordinator.this.sendHeartbeatRequest();
                future.addListener(new RequestFutureListener<Void>(){

                    @Override
                    public void onSuccess(Void value) {
                        HeartbeatTask.this.requestInFlight = false;
                        long now = AbstractCoordinator.this.time.milliseconds();
                        AbstractCoordinator.this.heartbeat.receiveHeartbeat(now);
                        long nextHeartbeatTime = now + AbstractCoordinator.this.heartbeat.timeToNextHeartbeat(now);
                        AbstractCoordinator.this.client.schedule(HeartbeatTask.this, nextHeartbeatTime);
                    }

                    @Override
                    public void onFailure(RuntimeException e) {
                        HeartbeatTask.this.requestInFlight = false;
                        AbstractCoordinator.this.client.schedule(HeartbeatTask.this, AbstractCoordinator.this.time.milliseconds() + AbstractCoordinator.this.retryBackoffMs);
                    }
                });
            }
        }
    }
}

