/*
 * Decompiled with CFR 0.152.
 */
package com.lambdaworks.redis.protocol;

import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.MapMaker;
import com.lambdaworks.redis.ClientOptions;
import com.lambdaworks.redis.ConnectionEvents;
import com.lambdaworks.redis.RedisChannelHandler;
import com.lambdaworks.redis.RedisChannelWriter;
import com.lambdaworks.redis.RedisException;
import com.lambdaworks.redis.protocol.ChannelLogDescriptor;
import com.lambdaworks.redis.protocol.ConnectionWatchdog;
import com.lambdaworks.redis.protocol.RedisCommand;
import com.lambdaworks.redis.protocol.RedisStateMachine;
import com.lambdaworks.redis.resource.ClientResources;
import io.netty.buffer.ByteBuf;
import io.netty.channel.Channel;
import io.netty.channel.ChannelDuplexHandler;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPromise;
import io.netty.channel.local.LocalAddress;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.GenericFutureListener;
import io.netty.util.internal.logging.InternalLogLevel;
import io.netty.util.internal.logging.InternalLogger;
import io.netty.util.internal.logging.InternalLoggerFactory;
import java.io.IOException;
import java.net.SocketAddress;
import java.nio.channels.ClosedChannelException;
import java.nio.charset.Charset;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Deque;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.locks.ReentrantLock;

@ChannelHandler.Sharable
public class CommandHandler<K, V>
extends ChannelDuplexHandler
implements RedisChannelWriter<K, V> {
    private static final InternalLogger logger = InternalLoggerFactory.getInstance(CommandHandler.class);
    private static final WriteLogListener WRITE_LOG_LISTENER = new WriteLogListener();
    private static final Set<String> SUPPRESS_IO_EXCEPTION_MESSAGES = ImmutableSet.of((Object)"Connection reset by peer", (Object)"Broken pipe", (Object)"Connection timed out");
    protected final ClientOptions clientOptions;
    protected final ClientResources clientResources;
    protected final Queue<RedisCommand<K, V, ?>> queue;
    protected final ReentrantLock writeLock = new ReentrantLock();
    protected volatile Deque<RedisCommand<K, V, ?>> commandBuffer = this.newCommandBuffer();
    protected final Deque<RedisCommand<K, V, ?>> transportBuffer = this.newCommandBuffer();
    protected ByteBuf buffer;
    protected RedisStateMachine<K, V> rsm;
    protected Channel channel;
    private volatile ConnectionWatchdog connectionWatchdog;
    private final boolean traceEnabled;
    private final boolean debugEnabled;
    private final Reliability reliability;
    private LifecycleState lifecycleState = LifecycleState.NOT_CONNECTED;
    private RedisChannelHandler<K, V> redisChannelHandler;
    private Throwable connectionError;
    private String logPrefix;
    private boolean autoFlushCommands = true;
    private final Object stateLock = new Object();
    private final Map<RedisCommand<K, V, ?>, SentReceived> sentTimes = new MapMaker().concurrencyLevel(4).weakKeys().makeMap();

    /**
     * Creates a handler that works on the given response queue.
     *
     * @param clientOptions client options; must not be {@code null}. Its auto-reconnect flag selects the
     *        reliability mode (at-least-once when enabled, at-most-once otherwise).
     * @param clientResources client resources; must not be {@code null}
     * @param queue queue holding commands that were written and still await a response; must not be {@code null}
     * @throws IllegalArgumentException if any argument is {@code null}
     */
    public CommandHandler(ClientOptions clientOptions, ClientResources clientResources, Queue<RedisCommand<K, V, ?>> queue) {
        Preconditions.checkArgument(clientOptions != null, "clientOptions must not be null");
        Preconditions.checkArgument(clientResources != null, "clientResources must not be null");
        Preconditions.checkArgument(queue != null, "queue must not be null");

        this.clientOptions = clientOptions;
        this.clientResources = clientResources;
        this.queue = queue;

        // Log levels are sampled once; later logger reconfiguration is not observed by this instance.
        this.traceEnabled = logger.isTraceEnabled();
        this.debugEnabled = logger.isDebugEnabled();

        if (clientOptions.isAutoReconnect()) {
            this.reliability = Reliability.AT_LEAST_ONCE;
        } else {
            this.reliability = Reliability.AT_MOST_ONCE;
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Moves the handler to {@link LifecycleState#REGISTERED}, allocates the receive buffer (direct, initial
     * capacity 65536 bytes) and a fresh state machine, and remembers the registered channel.
     *
     * @param ctx the context of the channel being registered
     */
    public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
        setState(LifecycleState.REGISTERED);

        buffer = ctx.alloc().directBuffer(65536);
        rsm = new RedisStateMachine<K, V>();

        synchronized (stateLock) {
            channel = ctx.channel();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Releases the receive buffer and forgets the channel. If the handler was already closed, all queued and
     * buffered commands are cancelled with the message {@code "Connection closed"}.
     *
     * @param ctx the context of the channel being unregistered (not used)
     */
    public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
        releaseBuffer();

        boolean closed = lifecycleState == LifecycleState.CLOSED;
        if (closed) {
            cancelCommands("Connection closed");
        }

        synchronized (stateLock) {
            channel = null;
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Appends the received bytes to the internal buffer and decodes as many command responses as possible.
     *
     * <p>This handler consumes the inbound message (it is not passed on to the next handler), so it is
     * responsible for releasing it. The message is released on every path on which it still holds a
     * reference, including the case where it is empty or where no receive buffer exists.
     *
     * @param ctx the channel handler context
     * @param msg the inbound message; expected to be a {@link ByteBuf}
     */
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        ByteBuf input = (ByteBuf) msg;
        if (input.refCnt() == 0) {
            // Already released elsewhere; releasing again would throw.
            return;
        }
        try {
            if (!input.isReadable() || buffer == null) {
                return;
            }
            buffer.writeBytes(input);
            if (traceEnabled) {
                logger.trace("{} Received: {}", logPrefix(), buffer.toString(Charset.defaultCharset()).trim());
            }
            decode(ctx, buffer);
        } finally {
            input.release();
        }
    }

    /**
     * Decodes responses from {@code buffer} into the commands at the head of the queue, one command at a time,
     * until the queue is empty or the state machine reports that the current response is incomplete.
     *
     * <p>For each fully decoded command the latency is recorded, the command is completed and the consumed
     * bytes are discarded from the buffer.
     *
     * @param ctx the channel handler context (not used here)
     * @param buffer the buffer holding the bytes received so far
     */
    protected void decode(ChannelHandlerContext ctx, ByteBuf buffer) throws InterruptedException {
        while (!queue.isEmpty()) {
            RedisCommand<K, V, ?> head = queue.peek();
            SentReceived timing = sentTimes.get(head);

            if (debugEnabled) {
                logger.debug("{} Queue contains: {} commands", logPrefix(), queue.size());
            }

            // Stamp the first time any response data is seen for this command.
            if (timing != null && timing.firstResponse == -1L) {
                timing.firstResponse = nanoTime();
            }

            boolean responseComplete = rsm.decode(buffer, head, head.getOutput());
            if (!responseComplete) {
                // More bytes are needed; keep the command at the head of the queue.
                return;
            }

            RedisCommand<K, V, ?> finished = queue.poll();
            sentTimes.remove(finished);
            recordLatency(finished, timing);
            finished.complete();

            if (buffer != null && buffer.refCnt() != 0) {
                buffer.discardReadBytes();
            }
        }
    }

    /**
     * Reports first-response and completion latency for {@code command} to the command latency collector.
     * Nothing is recorded when there is no timing record, no channel, no remote address, or the collector is
     * disabled.
     *
     * @param command the completed command
     * @param sentReceived the timing record for the command; may be {@code null}
     */
    private void recordLatency(RedisCommand<K, V, ?> command, SentReceived sentReceived) {
        if (sentReceived == null || channel == null || remote() == null) {
            return;
        }
        if (!clientResources.commandLatencyCollector().isEnabled()) {
            return;
        }

        long firstResponseLatency = nanoTime() - sentReceived.firstResponse;
        long completionLatency = nanoTime() - sentReceived.sent;
        clientResources.commandLatencyCollector().recordCommandLatency(local(), remote(), command.getType(),
                firstResponseLatency, completionLatency);
    }

    /**
     * @return the remote address of the current channel; callers must ensure the channel is not {@code null}
     */
    private SocketAddress remote() {
        return channel.remoteAddress();
    }

    /**
     * @return the local address of the current channel, or {@link LocalAddress#ANY} when the channel reports none
     */
    private SocketAddress local() {
        SocketAddress address = channel.localAddress();
        return address != null ? address : LocalAddress.ANY;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Accepts a command for execution. Depending on the connection state and the auto-flush setting the
     * command is either written to the channel immediately or stored in the command buffer.
     *
     * <p>The write lock is acquired before entering the {@code try} block so that the {@code finally} clause
     * only ever unlocks a lock that is actually held.
     *
     * @param command the command to write; must not be {@code null}
     * @return the same command instance
     * @throws IllegalArgumentException if {@code command} is {@code null}
     * @throws RedisException if the handler is closed, the request queue limit is reached, or the connection
     *         is down while auto-reconnect is disabled
     */
    @Override
    public <T> RedisCommand<K, V, T> write(RedisCommand<K, V, T> command) {
        Preconditions.checkArgument(command != null, "command must not be null");

        writeLock.lock();
        try {
            if (lifecycleState == LifecycleState.CLOSED) {
                throw new RedisException("Connection is closed");
            }
            if (commandBuffer.size() + queue.size() >= clientOptions.getRequestQueueSize()) {
                throw new RedisException("Request queue size exceeded: " + clientOptions.getRequestQueueSize() + ". Commands are not accepted until the queue size drops.");
            }
            if ((channel == null || !isConnected()) && !clientOptions.isAutoReconnect()) {
                throw new RedisException("Connection is in a disconnected state and reconnect is disabled. Commands are not accepted.");
            }

            if (!autoFlushCommands) {
                // Manual flushing: everything is buffered until flushCommands() is called.
                bufferCommand(command);
                return command;
            }

            Channel current = channel;
            if (current != null && isConnected() && current.isActive()) {
                if (debugEnabled) {
                    logger.debug("{} write() writeAndFlush Command {}", logPrefix(), command);
                }
                if (reliability == Reliability.AT_MOST_ONCE) {
                    // A failed write cancels the command instead of leaving it for a retry.
                    this.writeAndFlush(command).addListener(new AtMostOnceWriteListener<K, V, T>(command, this.queue, this.sentTimes));
                } else if (reliability == Reliability.AT_LEAST_ONCE) {
                    this.writeAndFlush(command).addListener((GenericFutureListener)WRITE_LOG_LISTENER);
                }
                return command;
            }

            // Not connected: avoid buffering the same command twice.
            if (commandBuffer.contains(command) || queue.contains(command)) {
                return command;
            }
            if (connectionError != null) {
                if (debugEnabled) {
                    logger.debug("{} write() completing Command {} due to connection error", logPrefix(), command);
                }
                command.setException(connectionError);
                command.complete();
                return command;
            }
            bufferCommand(command);
            return command;
        } finally {
            writeLock.unlock();
            if (debugEnabled) {
                logger.debug("{} write() done", logPrefix());
            }
        }
    }

    /**
     * Appends {@code command} to the end of the command buffer.
     *
     * @param command the command to buffer
     */
    private <T> void bufferCommand(RedisCommand<K, V, T> command) {
        if (debugEnabled) {
            logger.debug("{} write() buffering Command {}", logPrefix(), command);
        }
        commandBuffer.add(command);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Reports whether the handler is in one of the connected states
     * ({@link LifecycleState#CONNECTED}, {@link LifecycleState#ACTIVATING} or {@link LifecycleState#ACTIVE}).
     *
     * <p>The state is read under {@code stateLock}, the same monitor {@link #setState(LifecycleState)} writes
     * under. Synchronizing on the current enum value instead would lock a different object after every state
     * change and therefore would not exclude concurrent writers.
     *
     * @return {@code true} if the handler is currently connected
     */
    protected boolean isConnected() {
        synchronized (stateLock) {
            LifecycleState state = lifecycleState;
            return state == LifecycleState.CONNECTED
                    || state == LifecycleState.ACTIVATING
                    || state == LifecycleState.ACTIVE;
        }
    }

    /**
     * Writes all buffered commands to the channel, if a channel exists and the handler is connected.
     *
     * <p>The command buffer is swapped for a fresh one while holding the write lock; the actual channel write
     * happens after the lock is released. The lock is acquired before the {@code try} block so that
     * {@code finally} never unlocks a lock that is not held.
     */
    @Override
    public void flushCommands() {
        if (channel == null || !isConnected()) {
            return;
        }

        Deque<RedisCommand<K, V, ?>> queuedCommands;
        writeLock.lock();
        try {
            queuedCommands = commandBuffer;
            commandBuffer = newCommandBuffer();
        } finally {
            writeLock.unlock();
        }

        if (reliability == Reliability.AT_MOST_ONCE) {
            this.writeAndFlush(queuedCommands).addListener(new AtMostOnceWriteListener(queuedCommands, this.queue, this.sentTimes));
        } else if (reliability == Reliability.AT_LEAST_ONCE) {
            this.writeAndFlush(queuedCommands).addListener((GenericFutureListener)WRITE_LOG_LISTENER);
        }
    }

    /**
     * Records {@code commands} in the transport buffer and hands the collection to the channel.
     *
     * @param commands the commands to write
     * @return the future of the channel write
     */
    private ChannelFuture writeAndFlush(Collection<RedisCommand<K, V, ?>> commands) {
        if (debugEnabled) {
            logger.debug("{} write() writeAndFlush Commands {}", logPrefix(), commands);
        }
        transportBuffer.addAll(commands);
        ChannelFuture writeFuture = channel.writeAndFlush(commands);
        return writeFuture;
    }

    /**
     * Records {@code command} in the transport buffer and hands it to the channel.
     *
     * @param command the command to write
     * @return the future of the channel write
     */
    private ChannelFuture writeAndFlush(RedisCommand<K, V, ?> command) {
        if (debugEnabled) {
            logger.debug("{} write() writeAndFlush Command {}", logPrefix(), command);
        }
        transportBuffer.add(command);
        ChannelFuture writeFuture = channel.writeAndFlush(command);
        return writeFuture;
    }

    /**
     * Outbound write hook: dispatches a single {@link RedisCommand} or a {@link Collection} of commands to the
     * matching private writer. Messages of any other type are ignored by this method.
     *
     * @param ctx the channel handler context
     * @param msg the outbound message
     * @param promise the promise associated with the write
     */
    @SuppressWarnings("unchecked")
    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
        if (msg instanceof RedisCommand) {
            writeSingleCommand(ctx, (RedisCommand<K, V, ?>) msg, promise);
        } else if (msg instanceof Collection) {
            writeBatch(ctx, (Collection<RedisCommand<K, V, ?>>) msg, promise);
        }
    }

    /**
     * Queues one command for its response and passes it down the pipeline.
     *
     * <p>A command that was cancelled before it reached this point is removed from the transport buffer and
     * not written. Its promise is completed successfully so that listeners registered on the write future are
     * still notified instead of waiting forever.
     *
     * @param ctx the channel handler context
     * @param command the command to write
     * @param promise the promise associated with the write
     */
    private void writeSingleCommand(ChannelHandlerContext ctx, RedisCommand<K, V, ?> command, ChannelPromise promise) throws Exception {
        if (command.isCancelled()) {
            transportBuffer.remove(command);
            promise.trySuccess();
            return;
        }
        queueCommand(command, promise);
        ctx.write(command, promise);
    }

    /**
     * Queues every non-cancelled command of a batch for its response and passes the batch down the pipeline.
     *
     * <p>Cancelled commands are removed from the transport buffer and left out of the write. When no command
     * had to be dropped, the original collection is written; otherwise a filtered copy is written. If nothing
     * remains to be written, the promise is completed successfully so that listeners on the write future are
     * still notified.
     *
     * @param ctx the channel handler context
     * @param msg the commands to write
     * @param promise the promise associated with the write
     */
    private void writeBatch(ChannelHandlerContext ctx, Collection<RedisCommand<K, V, ?>> msg, ChannelPromise promise) throws Exception {
        List<RedisCommand<K, V, ?>> writable = new ArrayList<RedisCommand<K, V, ?>>(msg.size());
        boolean dropped = false;

        for (RedisCommand<K, V, ?> command : msg) {
            if (command.isCancelled()) {
                transportBuffer.remove(command);
                dropped = true;
                continue;
            }
            writable.add(command);
            queueCommand(command, promise);
        }

        if (writable.isEmpty()) {
            promise.trySuccess();
            return;
        }
        ctx.write(dropped ? writable : msg, promise);
    }

    /**
     * Registers a command that is about to be written.
     *
     * <p>A command without an output is completed right away because no response will be decoded for it. Any
     * other command gets a send timestamp and is appended to the response queue. In both cases the command is
     * then removed from the transport buffer.
     *
     * <p>On failure the command is cancelled with the exception, the promise is failed and the exception is
     * rethrown. {@code tryFailure} is used so that an already completed or cancelled promise cannot raise an
     * {@link IllegalStateException} that would hide the original error.
     *
     * @param command the command being written
     * @param promise the promise associated with the write
     * @throws Exception the exception raised while registering the command
     */
    private void queueCommand(RedisCommand<K, V, ?> command, ChannelPromise promise) throws Exception {
        try {
            if (command.getOutput() == null) {
                command.complete();
            } else {
                sentTimes.put(command, new SentReceived(nanoTime()));
                queue.add(command);
            }
            transportBuffer.remove(command);
        } catch (Exception e) {
            command.setException(e);
            command.cancel(true);
            promise.tryFailure(e);
            throw e;
        }
    }

    /**
     * @return the current value of {@link System#nanoTime()}, used for latency measurements
     */
    private long nanoTime() {
        long now = System.nanoTime();
        return now;
    }

    /**
     * Handles a newly active channel: marks the handler connected, looks up the {@link ConnectionWatchdog} in
     * the pipeline, moves in-flight commands back into the command buffer, activates the channel handler and
     * flushes buffered commands. Afterwards a {@link ConnectionEvents.Activated} event is fired on the
     * channel's event loop.
     *
     * <p>If activation fails and the client options request it, all commands are cancelled via
     * {@link #reset()} before the exception is rethrown.
     *
     * @param ctx the context of the channel that became active
     */
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        connectionWatchdog = null;
        // Force the log prefix to be rebuilt for the new channel.
        logPrefix = null;
        if (debugEnabled) {
            logger.debug("{} channelActive()", logPrefix());
        }
        setStateIfNotClosed(LifecycleState.CONNECTED);

        if (ctx != null && ctx.pipeline() != null) {
            Map<String, ChannelHandler> handlers = ctx.pipeline().toMap();
            for (ChannelHandler handler : handlers.values()) {
                if (handler instanceof ConnectionWatchdog) {
                    connectionWatchdog = (ConnectionWatchdog) handler;
                }
            }
        }

        // Acquire outside the try block so that finally only unlocks a lock that is held.
        writeLock.lock();
        try {
            moveQueuedCommandsToCommandBuffer();
            activateCommandHandlerAndExecuteBufferedCommands(ctx);
        } catch (Exception e) {
            if (debugEnabled) {
                logger.debug("{} channelActive() ran into an exception", logPrefix());
            }
            if (clientOptions.isCancelCommandsOnReconnectFailure()) {
                reset();
            }
            throw e;
        } finally {
            writeLock.unlock();
        }

        super.channelActive(ctx);

        // Capture the channel once: the field may be cleared (channelUnregistered) before the task runs.
        final Channel activeChannel = channel;
        if (activeChannel != null) {
            activeChannel.eventLoop().submit(new Runnable() {

                @Override
                public void run() {
                    activeChannel.pipeline().fireUserEventTriggered(new ConnectionEvents.Activated());
                }
            });
        }
        if (debugEnabled) {
            logger.debug("{} channelActive() done", logPrefix());
        }
    }

    /**
     * Moves all in-flight commands back to the front of the command buffer so they are sent again.
     *
     * <p>The resulting buffer order is: commands from the response queue (already written, in their original
     * order), then commands from the transport buffer (handed to the channel but not yet written, in their
     * original order), then whatever was already buffered. Queue entries come first because a command only
     * enters the queue after it has left the transport buffer, i.e. it was submitted earlier.
     */
    private void moveQueuedCommandsToCommandBuffer() {
        List<RedisCommand<K, V, ?>> replay = drainCommands(queue);
        replay.addAll(drainCommands(transportBuffer));

        if (debugEnabled) {
            logger.debug("{} moveQueuedCommandsToCommandBuffer {} command(s) added to buffer", logPrefix(), replay.size());
        }

        // addFirst() prepends, so walk the replay list backwards to keep its order at the head of the buffer.
        Collections.reverse(replay);
        for (RedisCommand<K, V, ?> command : replay) {
            commandBuffer.addFirst(command);
        }
    }

    /**
     * Copies the current contents of {@code source} into a new list and removes those elements from
     * {@code source}.
     *
     * @param source the collection to drain
     * @return a new mutable list with the drained commands in iteration order
     */
    private List<RedisCommand<K, V, ?>> drainCommands(Collection<RedisCommand<K, V, ?>> source) {
        List<RedisCommand<K, V, ?>> drained = new ArrayList<RedisCommand<K, V, ?>>(source.size());
        drained.addAll(source);
        // Remove only what was copied; elements added concurrently stay in the source.
        source.removeAll(drained);
        return drained;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Clears the stored connection error and send timestamps, binds the handler to the channel of
     * {@code ctx}, notifies the channel handler of the activation (if one is set) and flushes the buffered
     * commands.
     *
     * <p>Runs under the write lock, which is acquired before the {@code try} block so that {@code finally}
     * only unlocks a lock that is held.
     *
     * @param ctx the context of the active channel
     */
    protected void activateCommandHandlerAndExecuteBufferedCommands(ChannelHandlerContext ctx) {
        writeLock.lock();
        try {
            connectionError = null;
            sentTimes.clear();
            if (debugEnabled) {
                logger.debug("{} activateCommandHandlerAndExecuteBufferedCommands {} command(s) buffered", logPrefix(), commandBuffer.size());
            }

            synchronized (stateLock) {
                channel = ctx.channel();
            }

            if (redisChannelHandler != null) {
                if (debugEnabled) {
                    logger.debug("{} activating channel handler", logPrefix());
                }
                setStateIfNotClosed(LifecycleState.ACTIVATING);
                redisChannelHandler.activated();
            }

            setStateIfNotClosed(LifecycleState.ACTIVE);
            flushCommands();
        } finally {
            writeLock.unlock();
        }
    }

    /**
     * Handles a channel that became inactive: walks the handler through the disconnect states, notifies the
     * channel handler of the deactivation (if one is set), moves commands awaiting a response from the queue
     * to the end of the command buffer, and resets the decoder state.
     *
     * <p>The state changes and the queue transfer run under the write lock, which is acquired before the
     * {@code try} block so that {@code finally} only unlocks a lock that is held.
     *
     * @param ctx the context of the channel that became inactive
     */
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        if (debugEnabled) {
            logger.debug("{} channelInactive()", logPrefix());
        }

        writeLock.lock();
        try {
            setStateIfNotClosed(LifecycleState.DISCONNECTED);
            if (redisChannelHandler != null) {
                if (debugEnabled) {
                    logger.debug("{} deactivating channel handler", logPrefix());
                }
                setStateIfNotClosed(LifecycleState.DEACTIVATING);
                redisChannelHandler.deactivated();
            }
            setStateIfNotClosed(LifecycleState.DEACTIVATED);

            // Commands that were sent but not answered go back into the buffer.
            commandBuffer.addAll(queue);
            queue.removeAll(commandBuffer);
        } finally {
            writeLock.unlock();
        }

        if (buffer != null) {
            // Drop any partially decoded response.
            rsm.reset();
            buffer.clear();
        }
        if (debugEnabled) {
            logger.debug("{} channelInactive() done", logPrefix());
        }
        super.channelInactive(ctx);
    }

    /**
     * Applies {@code lifecycleState} unless the handler has already reached {@link LifecycleState#CLOSED}.
     *
     * @param lifecycleState the state to switch to
     */
    protected void setStateIfNotClosed(LifecycleState lifecycleState) {
        if (this.lifecycleState == LifecycleState.CLOSED) {
            return;
        }
        setState(lifecycleState);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Unconditionally sets the life-cycle state while holding {@code stateLock}.
     *
     * @param lifecycleState the new state
     */
    protected void setState(LifecycleState lifecycleState) {
        synchronized (stateLock) {
            this.lifecycleState = lifecycleState;
        }
    }

    /**
     * @return the current life-cycle state
     */
    protected LifecycleState getState() {
        return lifecycleState;
    }

    /**
     * Empties the response queue, the command buffer and the send-timestamp map, then cancels every command
     * that was removed. Commands that have an output get {@code message} set as their error first.
     *
     * @param message the error message stored on each cancelled command's output
     */
    private void cancelCommands(String message) {
        int expected = 0;
        if (queue != null) {
            expected += queue.size();
        }
        if (commandBuffer != null) {
            expected += commandBuffer.size();
        }

        List<RedisCommand<K, V, ?>> toCancel = new ArrayList<RedisCommand<K, V, ?>>(expected);
        if (queue != null) {
            toCancel.addAll(queue);
            queue.clear();
        }
        if (commandBuffer != null) {
            toCancel.addAll(commandBuffer);
            commandBuffer.clear();
        }
        sentTimes.clear();

        for (RedisCommand<K, V, ?> pending : toCancel) {
            if (pending.getOutput() != null) {
                pending.getOutput().setError(message);
            }
            pending.cancel(true);
        }
    }

    /**
     * Handles an exception raised in the pipeline.
     *
     * <p>If a command is waiting for a response, the command at the head of the queue is completed with the
     * exception. If there is no active, connected channel, the exception is stored as the connection error.
     * In both cases the exception is then logged at DEBUG; otherwise it is logged at WARN, lowered to INFO for
     * an {@link IOException} and to DEBUG for an {@link IOException} whose message is one of the suppressed
     * messages.
     *
     * <p>The log prefix is obtained through {@link #logPrefix()} so that it is built on demand; reading the
     * field directly would log {@code null} whenever the prefix had not been computed yet.
     *
     * @param ctx the channel handler context (not used)
     * @param cause the exception that was caught
     */
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        InternalLogLevel logLevel = InternalLogLevel.WARN;

        if (!queue.isEmpty()) {
            RedisCommand<K, V, ?> command = queue.poll();
            sentTimes.remove(command);
            if (debugEnabled) {
                logger.debug("{} Storing exception in {}", logPrefix(), command);
            }
            logLevel = InternalLogLevel.DEBUG;
            command.setException(cause);
            command.complete();
        }

        if (channel == null || !channel.isActive() || !isConnected()) {
            if (debugEnabled) {
                logger.debug("{} Storing exception in connectionError", logPrefix());
            }
            logLevel = InternalLogLevel.DEBUG;
            connectionError = cause;
        }

        // Only lower the level when nothing above has already reduced it below INFO.
        if (cause instanceof IOException && logLevel.ordinal() > InternalLogLevel.INFO.ordinal()) {
            logLevel = InternalLogLevel.INFO;
            if (SUPPRESS_IO_EXCEPTION_MESSAGES.contains(cause.getMessage())) {
                logLevel = InternalLogLevel.DEBUG;
            }
        }

        logger.log(logLevel, "{} Unexpected exception during request: {}", new Object[]{logPrefix(), cause.toString(), cause});
    }

    /**
     * Closes the handler. Does nothing if it is already closed.
     *
     * <p>With a channel present, {@code PrepareClose} and {@code Close} events are fired on its pipeline, the
     * pipeline is closed and, if the channel is still open, the call waits for the close to finish. Without a
     * channel, the connection watchdog (if any) is told to prepare for close.
     */
    @Override
    public void close() {
        if (debugEnabled) {
            logger.debug("{} close()", logPrefix());
        }
        if (lifecycleState == LifecycleState.CLOSED) {
            return;
        }
        setStateIfNotClosed(LifecycleState.CLOSED);

        Channel currentChannel = channel;
        if (currentChannel == null) {
            if (connectionWatchdog != null) {
                connectionWatchdog.prepareClose(new ConnectionEvents.PrepareClose());
            }
            return;
        }

        currentChannel.pipeline().fireUserEventTriggered(new ConnectionEvents.PrepareClose());
        currentChannel.pipeline().fireUserEventTriggered(new ConnectionEvents.Close());
        ChannelFuture closeFuture = currentChannel.pipeline().close();
        if (currentChannel.isOpen()) {
            closeFuture.syncUninterruptibly();
        }
    }

    /**
     * Releases the receive buffer, if one is allocated, and clears the field.
     */
    private void releaseBuffer() {
        if (buffer == null) {
            return;
        }
        buffer.release();
        buffer = null;
    }

    /**
     * @return {@code true} if the handler is in state {@link LifecycleState#CLOSED}
     */
    public boolean isClosed() {
        return LifecycleState.CLOSED == lifecycleState;
    }

    /**
     * Cancels all queued and buffered commands with the message {@code "Reset"} and, if a receive buffer
     * exists, resets the decoder state and clears the buffer.
     *
     * <p>The write lock is acquired before the {@code try} block so that {@code finally} only unlocks a lock
     * that is held.
     */
    @Override
    public void reset() {
        if (debugEnabled) {
            logger.debug("{} reset()", logPrefix());
        }

        writeLock.lock();
        try {
            cancelCommands("Reset");
        } finally {
            writeLock.unlock();
        }

        if (buffer != null) {
            rsm.reset();
            buffer.clear();
        }
    }

    /**
     * Puts the handler back into {@link LifecycleState#NOT_CONNECTED}: clears the response queue and the
     * command buffer and, if a channel is present, fires {@code PrepareClose} and {@code Close} events and
     * closes the pipeline without waiting for completion.
     */
    public void initialState() {
        setState(LifecycleState.NOT_CONNECTED);
        queue.clear();
        commandBuffer.clear();

        Channel currentChannel = channel;
        if (currentChannel == null) {
            return;
        }
        currentChannel.pipeline().fireUserEventTriggered(new ConnectionEvents.PrepareClose());
        currentChannel.pipeline().fireUserEventTriggered(new ConnectionEvents.Close());
        currentChannel.pipeline().close();
    }

    /**
     * Sets the channel handler that is notified on activation and deactivation of the channel.
     *
     * @param redisChannelHandler the handler to notify; may be {@code null} to disable notifications
     */
    @Override
    public void setRedisChannelHandler(RedisChannelHandler<K, V> redisChannelHandler) {
        this.redisChannelHandler = redisChannelHandler;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Enables or disables automatic flushing. When disabled, {@code write} only buffers commands until
     * {@link #flushCommands()} is called.
     *
     * @param autoFlush {@code true} to write commands immediately, {@code false} to buffer them
     */
    @Override
    public void setAutoFlushCommands(boolean autoFlush) {
        synchronized (stateLock) {
            autoFlushCommands = autoFlush;
        }
    }

    /**
     * Returns the prefix used in log messages: the channel's log descriptor in square brackets. The value is
     * built on first use and cached until the field is reset.
     *
     * @return the log prefix
     */
    protected String logPrefix() {
        if (logPrefix == null) {
            StringBuilder prefix = new StringBuilder(64);
            prefix.append('[');
            prefix.append(ChannelLogDescriptor.logDescriptor(channel));
            prefix.append(']');
            logPrefix = prefix.toString();
        }
        return logPrefix;
    }

    /**
     * @return a new, empty command buffer with an initial capacity of 512 elements
     */
    private ArrayDeque<RedisCommand<K, V, ?>> newCommandBuffer() {
        return new ArrayDeque<RedisCommand<K, V, ?>>(512);
    }

    /**
     * Timing record for one command: when it was sent and when the first response data arrived.
     */
    static class SentReceived {
        /** Timestamp taken when the command was queued for writing. */
        final long sent;
        /** Timestamp of the first response data, or {@code -1} while none has been seen. */
        long firstResponse;

        public SentReceived(long sent) {
            this.sent = sent;
            this.firstResponse = -1L;
        }
    }

    /**
     * Write listener that only logs failed writes. Failures caused by a {@link ClosedChannelException} are
     * ignored; an {@link IOException} with one of the suppressed messages is logged at DEBUG, everything else
     * at WARN.
     */
    static class WriteLogListener
    implements GenericFutureListener<Future<Void>> {
        WriteLogListener() {
        }

        public void operationComplete(Future<Void> future) throws Exception {
            Throwable cause = future.cause();
            if (future.isSuccess() || cause instanceof ClosedChannelException) {
                return;
            }

            boolean suppressed = cause instanceof IOException && SUPPRESS_IO_EXCEPTION_MESSAGES.contains(cause.getMessage());
            InternalLogLevel logLevel = suppressed ? InternalLogLevel.DEBUG : InternalLogLevel.WARN;
            logger.log(logLevel, "Unexpected exception during request: {}", (Object) cause.toString(), (Object) cause);
        }
    }

    /**
     * Write listener for at-most-once delivery: when the write fails, every command of that write is
     * cancelled with the failure cause and removed from the response queue and the send-timestamp map, so it
     * is not sent again.
     */
    private static class AtMostOnceWriteListener<K, V, T>
    implements ChannelFutureListener {
        private final Collection<RedisCommand<K, V, T>> sentCommands;
        private final Queue<?> queue;
        private final Map<RedisCommand<K, V, T>, SentReceived> sentTimes;

        public AtMostOnceWriteListener(RedisCommand<K, V, T> sentCommand, Queue<?> queue, Map<RedisCommand<K, V, T>, SentReceived> sentTimes) {
            this((Collection<RedisCommand<K, V, T>>)ImmutableList.of(sentCommand), queue, sentTimes);
        }

        public AtMostOnceWriteListener(Collection<RedisCommand<K, V, T>> sentCommand, Queue<?> queue, Map<RedisCommand<K, V, T>, SentReceived> sentTimes) {
            this.sentCommands = sentCommand;
            this.queue = queue;
            this.sentTimes = sentTimes;
        }

        /**
         * Cleans up after a failed write. The future is already complete when a listener is notified, so it
         * is inspected directly without waiting on it.
         *
         * @param future the completed write future
         */
        public void operationComplete(ChannelFuture future) throws Exception {
            Throwable cause = future.cause();
            if (cause == null) {
                return;
            }

            for (RedisCommand<K, V, T> sentCommand : this.sentCommands) {
                sentCommand.setException(cause);
                sentCommand.cancel(true);
            }
            this.queue.removeAll(this.sentCommands);
            for (RedisCommand<K, V, T> sentCommand : this.sentCommands) {
                this.sentTimes.remove(sentCommand);
            }
        }
    }

    /**
     * Delivery guarantee applied to written commands. The mode is chosen once in the constructor from the
     * auto-reconnect client option.
     */
    private static enum Reliability {
        // Used when auto-reconnect is disabled: a failed write cancels the affected commands.
        AT_MOST_ONCE,
        // Used when auto-reconnect is enabled: a failed write is only logged.
        AT_LEAST_ONCE;

    }

    /**
     * Life-cycle states of the handler. {@code CONNECTED}, {@code ACTIVATING} and {@code ACTIVE} are the
     * states in which {@code isConnected()} reports {@code true}.
     *
     * <p>NOTE(review): keep the declaration order stable; audit all users of this enum before reordering or
     * inserting constants.
     */
    public static enum LifecycleState {
        // Initial state, and the state set by initialState().
        NOT_CONNECTED,
        // Set in channelRegistered().
        REGISTERED,
        // Set at the start of channelActive().
        CONNECTED,
        // Set just before the channel handler's activated() callback runs.
        ACTIVATING,
        // Set once activation has finished, before buffered commands are flushed.
        ACTIVE,
        // Set at the start of channelInactive().
        DISCONNECTED,
        // Set just before the channel handler's deactivated() callback runs.
        DEACTIVATING,
        // Set once deactivation has finished.
        DEACTIVATED,
        // Set by close(); setStateIfNotClosed() never leaves this state.
        CLOSED;

    }
}

