/*
 * Decompiled with CFR 0.152.
 */
package org.apache.spark.network.shuffle;

import java.io.IOException;
import java.util.Collections;
import java.util.LinkedHashSet;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.apache.spark.network.buffer.ManagedBuffer;
import org.apache.spark.network.shuffle.BlockFetchingListener;
import org.apache.spark.network.util.NettyUtils;
import org.apache.spark.network.util.TransportConf;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.spark_project.guava.collect.Sets;
import org.spark_project.guava.util.concurrent.Uninterruptibles;

public class RetryingBlockFetcher {
    private static final ExecutorService executorService = Executors.newCachedThreadPool(NettyUtils.createThreadFactory((String)"Block Fetch Retry"));
    private static final Logger logger = LoggerFactory.getLogger(RetryingBlockFetcher.class);
    private final BlockFetchStarter fetchStarter;
    private final BlockFetchingListener listener;
    private final int maxRetries;
    private final int retryWaitTime;
    private int retryCount = 0;
    private final LinkedHashSet<String> outstandingBlocksIds;
    private RetryingBlockFetchListener currentListener;

    public RetryingBlockFetcher(TransportConf conf, BlockFetchStarter fetchStarter, String[] blockIds, BlockFetchingListener listener) {
        this.fetchStarter = fetchStarter;
        this.listener = listener;
        this.maxRetries = conf.maxIORetries();
        this.retryWaitTime = conf.ioRetryWaitTimeMs();
        this.outstandingBlocksIds = Sets.newLinkedHashSet();
        Collections.addAll(this.outstandingBlocksIds, blockIds);
        this.currentListener = new RetryingBlockFetchListener();
    }

    public void start() {
        this.fetchAllOutstanding();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void fetchAllOutstanding() {
        RetryingBlockFetchListener myListener;
        int numRetries;
        String[] blockIdsToFetch;
        RetryingBlockFetcher retryingBlockFetcher = this;
        synchronized (retryingBlockFetcher) {
            blockIdsToFetch = this.outstandingBlocksIds.toArray(new String[this.outstandingBlocksIds.size()]);
            numRetries = this.retryCount;
            myListener = this.currentListener;
        }
        try {
            this.fetchStarter.createAndStart(blockIdsToFetch, myListener);
        }
        catch (Exception e) {
            logger.error(String.format("Exception while beginning fetch of %s outstanding blocks %s", blockIdsToFetch.length, numRetries > 0 ? "(after " + numRetries + " retries)" : ""), (Throwable)e);
            if (this.shouldRetry(e)) {
                this.initiateRetry();
            }
            for (String bid : blockIdsToFetch) {
                this.listener.onBlockFetchFailure(bid, e);
            }
        }
    }

    private synchronized void initiateRetry() {
        ++this.retryCount;
        this.currentListener = new RetryingBlockFetchListener();
        logger.info("Retrying fetch ({}/{}) for {} outstanding blocks after {} ms", new Object[]{this.retryCount, this.maxRetries, this.outstandingBlocksIds.size(), this.retryWaitTime});
        executorService.submit(new Runnable(){

            @Override
            public void run() {
                Uninterruptibles.sleepUninterruptibly((long)RetryingBlockFetcher.this.retryWaitTime, (TimeUnit)TimeUnit.MILLISECONDS);
                RetryingBlockFetcher.this.fetchAllOutstanding();
            }
        });
    }

    private synchronized boolean shouldRetry(Throwable e) {
        boolean isIOException = e instanceof IOException || e.getCause() != null && e.getCause() instanceof IOException;
        boolean hasRemainingRetries = this.retryCount < this.maxRetries;
        return isIOException && hasRemainingRetries;
    }

    private class RetryingBlockFetchListener
    implements BlockFetchingListener {
        private RetryingBlockFetchListener() {
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        @Override
        public void onBlockFetchSuccess(String blockId, ManagedBuffer data) {
            boolean shouldForwardSuccess = false;
            RetryingBlockFetcher retryingBlockFetcher = RetryingBlockFetcher.this;
            synchronized (retryingBlockFetcher) {
                if (this == RetryingBlockFetcher.this.currentListener && RetryingBlockFetcher.this.outstandingBlocksIds.contains(blockId)) {
                    RetryingBlockFetcher.this.outstandingBlocksIds.remove(blockId);
                    shouldForwardSuccess = true;
                }
            }
            if (shouldForwardSuccess) {
                RetryingBlockFetcher.this.listener.onBlockFetchSuccess(blockId, data);
            }
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        @Override
        public void onBlockFetchFailure(String blockId, Throwable exception) {
            boolean shouldForwardFailure = false;
            RetryingBlockFetcher retryingBlockFetcher = RetryingBlockFetcher.this;
            synchronized (retryingBlockFetcher) {
                if (this == RetryingBlockFetcher.this.currentListener && RetryingBlockFetcher.this.outstandingBlocksIds.contains(blockId)) {
                    if (RetryingBlockFetcher.this.shouldRetry(exception)) {
                        RetryingBlockFetcher.this.initiateRetry();
                    } else {
                        logger.error(String.format("Failed to fetch block %s, and will not retry (%s retries)", blockId, RetryingBlockFetcher.this.retryCount), exception);
                        RetryingBlockFetcher.this.outstandingBlocksIds.remove(blockId);
                        shouldForwardFailure = true;
                    }
                }
            }
            if (shouldForwardFailure) {
                RetryingBlockFetcher.this.listener.onBlockFetchFailure(blockId, exception);
            }
        }
    }

    public static interface BlockFetchStarter {
        public void createAndStart(String[] var1, BlockFetchingListener var2) throws IOException;
    }
}

