/*
 * Decompiled with CFR 0.152.
 */
package org.infinispan.distribution;

import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import org.infinispan.CacheException;
import org.infinispan.commands.CommandsFactory;
import org.infinispan.commands.control.RehashControlCommand;
import org.infinispan.commands.tx.PrepareCommand;
import org.infinispan.commands.write.WriteCommand;
import org.infinispan.config.Configuration;
import org.infinispan.container.DataContainer;
import org.infinispan.container.entries.InternalCacheValue;
import org.infinispan.distribution.DistributionManagerImpl;
import org.infinispan.distribution.PendingPreparesMap;
import org.infinispan.distribution.RehashTask;
import org.infinispan.distribution.TransactionLogMap;
import org.infinispan.distribution.TransactionLogger;
import org.infinispan.distribution.ch.ConsistentHash;
import org.infinispan.distribution.ch.ConsistentHashHelper;
import org.infinispan.remoting.responses.Response;
import org.infinispan.remoting.responses.SuccessfulResponse;
import org.infinispan.remoting.rpc.ResponseMode;
import org.infinispan.remoting.rpc.RpcManager;
import org.infinispan.remoting.transport.Address;
import org.infinispan.util.Util;
import org.infinispan.util.concurrent.NotifyingFutureImpl;
import org.infinispan.util.logging.Log;
import org.infinispan.util.logging.LogFactory;

public class InvertedLeaveTask
extends RehashTask {
    private static final Log log = LogFactory.getLog(InvertedLeaveTask.class);
    private static final boolean trace = log.isTraceEnabled();
    private final List<Address> leavers;
    private final Address self;
    private final List<Address> leaversHandled;
    private final List<Address> providers;
    private final List<Address> receivers;
    private final boolean isReceiver;
    private final boolean isSender;

    public InvertedLeaveTask(DistributionManagerImpl dmi, RpcManager rpcManager, Configuration conf, CommandsFactory commandsFactory, DataContainer dataContainer, List<Address> leavers, List<Address> stateProviders, List<Address> stateReceivers, boolean isReceiver) {
        super(dmi, rpcManager, conf, commandsFactory, dataContainer);
        this.leavers = leavers;
        this.leaversHandled = new LinkedList<Address>(leavers);
        this.providers = stateProviders;
        this.receivers = stateReceivers;
        this.isReceiver = isReceiver;
        this.self = rpcManager.getTransport().getAddress();
        this.isSender = stateProviders.contains(this.self);
    }

    private Map<Object, InternalCacheValue> getStateFromResponse(SuccessfulResponse r) {
        return (Map)r.getResponseValue();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    protected void performRehash() throws Exception {
        long start = trace ? System.currentTimeMillis() : 0L;
        int replCount = this.configuration.getNumOwners();
        ConsistentHash newCH = this.dmi.getConsistentHash();
        ConsistentHash oldCH = ConsistentHashHelper.createConsistentHash(this.configuration, newCH.getCaches(), this.leaversHandled, this.dmi.topologyInfo);
        try {
            log.debug((Object)"Starting leave rehash[enabled={0},isReceiver={1},isSender={2}] on node {3}", this.configuration.isRehashEnabled(), this.isReceiver, this.isSender, this.self);
            if (this.configuration.isRehashEnabled()) {
                if (this.isReceiver) {
                    try {
                        RehashControlCommand cmd = this.cf.buildRehashControlCommand(RehashControlCommand.Type.PULL_STATE_LEAVE, this.self, null, oldCH, newCH, this.leaversHandled);
                        log.debug((Object)"I {0} am pulling state from {1}", this.self, this.providers);
                        List<Response> resps = this.rpcManager.invokeRemotely(this.providers, cmd, ResponseMode.SYNCHRONOUS, this.configuration.getRehashRpcTimeout(), true);
                        log.debug((Object)"I {0} received response {1} ", this.self, resps);
                        for (Response r : resps) {
                            if (!(r instanceof SuccessfulResponse)) continue;
                            Map<Object, InternalCacheValue> state = this.getStateFromResponse((SuccessfulResponse)r);
                            log.debug((Object)"I {0} am applying state {1} ", this.self, state);
                            this.dmi.applyState(newCH, state);
                        }
                    }
                    finally {
                        RehashControlCommand c = this.cf.buildRehashControlCommand(RehashControlCommand.Type.LEAVE_REHASH_END, this.self);
                        this.rpcManager.invokeRemotely(this.providers, c, ResponseMode.ASYNCHRONOUS, this.configuration.getRehashRpcTimeout(), false);
                    }
                }
                if (this.isSender) {
                    HashSet<Address> recCopy = new HashSet<Address>(this.receivers);
                    if (this.isReceiver) {
                        recCopy.remove(this.self);
                    }
                    this.dmi.awaitLeaveRehashAcks(recCopy, this.configuration.getStateRetrievalTimeout());
                    this.processAndDrainTxLog(oldCH, newCH, replCount);
                    this.invalidateInvalidHolders(this.leaversHandled, oldCH, newCH);
                }
            }
            this.leavers.removeAll(this.leaversHandled);
        }
        catch (Exception e) {
            try {
                throw new CacheException("Unexpected exception", e);
            }
            catch (Throwable throwable) {
                this.leavers.removeAll(this.leaversHandled);
                if (trace) {
                    log.info((Object)"Completed leave rehash on node {0} in {1}", this.self, Util.prettyPrintTime(System.currentTimeMillis() - start));
                } else {
                    log.info((Object)"Completed leave rehash on node {0}", this.self);
                }
                for (Address addr : this.leaversHandled) {
                    this.dmi.topologyInfo.removeNodeInfo(addr);
                }
                this.dmi.rehashInProgress = false;
                throw throwable;
            }
        }
        if (trace) {
            log.info((Object)"Completed leave rehash on node {0} in {1}", this.self, Util.prettyPrintTime(System.currentTimeMillis() - start));
        } else {
            log.info((Object)"Completed leave rehash on node {0}", this.self);
        }
        for (Address addr : this.leaversHandled) {
            this.dmi.topologyInfo.removeNodeInfo(addr);
        }
        this.dmi.rehashInProgress = false;
    }

    private void processAndDrainTxLog(ConsistentHash oldCH, ConsistentHash newCH, int replCount) {
        List<WriteCommand> c;
        int i = 0;
        TransactionLogger transactionLogger = this.dmi.getTransactionLogger();
        if (trace) {
            log.trace("Processing transaction log iteratively: " + transactionLogger);
        }
        while (transactionLogger.shouldDrainWithoutLock()) {
            if (trace) {
                log.trace((Object)"Processing transaction log, iteration {0}", i++);
            }
            c = transactionLogger.drain();
            if (trace) {
                log.trace((Object)"Found {0} modifications", c.size());
            }
            this.apply(oldCH, newCH, replCount, c);
        }
        if (trace) {
            log.trace("Processing transaction log: final drain and lock");
        }
        c = transactionLogger.drainAndLock();
        if (trace) {
            log.trace((Object)"Found {0} modifications", c.size());
        }
        this.apply(oldCH, newCH, replCount, c);
        if (trace) {
            log.trace("Handling pending prepares");
        }
        PendingPreparesMap state = new PendingPreparesMap(this.leavers, oldCH, newCH, replCount);
        Collection<PrepareCommand> pendingPrepares = transactionLogger.getPendingPrepares();
        if (trace) {
            log.trace((Object)"Found {0} pending prepares", pendingPrepares.size());
        }
        for (PrepareCommand pc : pendingPrepares) {
            state.addState(pc);
        }
        if (trace) {
            log.trace((Object)"State map for pending prepares is {0}", state.getState());
        }
        HashSet<NotifyingFutureImpl> pushFutures = new HashSet<NotifyingFutureImpl>();
        for (Map.Entry entry : state.getState().entrySet()) {
            if (log.isDebugEnabled()) {
                log.debug((Object)"Pushing {0} uncommitted prepares to {1}", ((List)entry.getValue()).size(), entry.getKey());
            }
            RehashControlCommand push = this.cf.buildRehashControlCommandTxLogPendingPrepares(this.self, (List)entry.getValue());
            NotifyingFutureImpl f = new NotifyingFutureImpl(null);
            pushFutures.add(f);
            this.rpcManager.invokeRemotelyInFuture(Collections.singleton(entry.getKey()), push, true, f, this.configuration.getRehashRpcTimeout());
        }
        for (Future future : pushFutures) {
            try {
                future.get();
            }
            catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            catch (ExecutionException e) {
                log.error((Object)"Error pushing tx log", e);
            }
        }
        if (trace) {
            log.trace("Finished pushing pending prepares; unlocking and disabling transaction logging");
        }
        transactionLogger.unlockAndDisable();
    }

    private void apply(ConsistentHash oldCH, ConsistentHash newCH, int replCount, List<WriteCommand> wc) {
        TransactionLogMap state = new TransactionLogMap(this.leavers, oldCH, newCH, replCount);
        for (WriteCommand c : wc) {
            state.addState(c);
        }
        if (trace) {
            log.trace((Object)"State map for modifications is {0}", state.getState());
        }
        HashSet<NotifyingFutureImpl> pushFutures = new HashSet<NotifyingFutureImpl>();
        for (Map.Entry entry : state.getState().entrySet()) {
            if (log.isDebugEnabled()) {
                log.debug((Object)"Pushing {0} modifications to {1}", ((List)entry.getValue()).size(), entry.getKey());
            }
            RehashControlCommand push = this.cf.buildRehashControlCommandTxLog(this.self, (List)entry.getValue());
            NotifyingFutureImpl f = new NotifyingFutureImpl(null);
            pushFutures.add(f);
            this.rpcManager.invokeRemotelyInFuture(Collections.singleton(entry.getKey()), push, true, f, this.configuration.getRehashRpcTimeout());
        }
        for (Future future : pushFutures) {
            try {
                future.get();
            }
            catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            catch (ExecutionException e) {
                log.error((Object)"Error pushing tx log", e);
            }
        }
    }

    @Override
    protected Log getLog() {
        return log;
    }
}

