/*
 * Decompiled with CFR 0.152.
 */
package org.springframework.data.redis.listener;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.BiConsumer;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.beans.factory.BeanNameAware;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.context.SmartLifecycle;
import org.springframework.core.task.SimpleAsyncTaskExecutor;
import org.springframework.core.task.TaskExecutor;
import org.springframework.dao.DataAccessException;
import org.springframework.data.redis.RedisConnectionFailureException;
import org.springframework.data.redis.connection.ConnectionUtils;
import org.springframework.data.redis.connection.Message;
import org.springframework.data.redis.connection.MessageListener;
import org.springframework.data.redis.connection.RedisConnection;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.connection.Subscription;
import org.springframework.data.redis.connection.SubscriptionListener;
import org.springframework.data.redis.connection.util.ByteArrayWrapper;
import org.springframework.data.redis.listener.ChannelTopic;
import org.springframework.data.redis.listener.PatternTopic;
import org.springframework.data.redis.listener.SynchronizingMessageListener;
import org.springframework.data.redis.listener.Topic;
import org.springframework.data.redis.listener.adapter.RedisListenerExecutionFailedException;
import org.springframework.data.redis.serializer.RedisSerializer;
import org.springframework.lang.Nullable;
import org.springframework.util.Assert;
import org.springframework.util.ClassUtils;
import org.springframework.util.CollectionUtils;
import org.springframework.util.ErrorHandler;
import org.springframework.util.ObjectUtils;
import org.springframework.util.backoff.BackOff;
import org.springframework.util.backoff.BackOffExecution;
import org.springframework.util.backoff.FixedBackOff;

public class RedisMessageListenerContainer
implements InitializingBean,
DisposableBean,
BeanNameAware,
SmartLifecycle {
    public static final long DEFAULT_RECOVERY_INTERVAL = 5000L;
    public static final long DEFAULT_SUBSCRIPTION_REGISTRATION_WAIT_TIME = 2000L;
    public static final String DEFAULT_THREAD_NAME_PREFIX = ClassUtils.getShortName(RedisMessageListenerContainer.class) + "-";
    protected final Log logger = LogFactory.getLog(this.getClass());
    private boolean afterPropertiesSet = false;
    private boolean manageExecutor = false;
    private long maxSubscriptionRegistrationWaitingTime = 2000L;
    private final AtomicBoolean started = new AtomicBoolean();
    private final AtomicReference<State> state = new AtomicReference<State>(State.notListening());
    private BackOff backOff = new FixedBackOff(5000L, Long.MAX_VALUE);
    private volatile CompletableFuture<Void> listenFuture = new CompletableFuture();
    private volatile CompletableFuture<Void> unsubscribeFuture = new CompletableFuture();
    @Nullable
    private ErrorHandler errorHandler;
    @Nullable
    private Executor subscriptionExecutor;
    @Nullable
    private Executor taskExecutor;
    private final Map<ByteArrayWrapper, Collection<MessageListener>> channelMapping = new ConcurrentHashMap<ByteArrayWrapper, Collection<MessageListener>>();
    private final Map<ByteArrayWrapper, Collection<MessageListener>> patternMapping = new ConcurrentHashMap<ByteArrayWrapper, Collection<MessageListener>>();
    private final Map<MessageListener, Set<Topic>> listenerTopics = new ConcurrentHashMap<MessageListener, Set<Topic>>();
    @Nullable
    private RedisConnectionFactory connectionFactory;
    private RedisSerializer<String> serializer = RedisSerializer.string();
    @Nullable
    private String beanName;
    @Nullable
    private Subscriber subscriber;

    public void setErrorHandler(ErrorHandler errorHandler) {
        this.errorHandler = errorHandler;
    }

    public void setSubscriptionExecutor(Executor subscriptionExecutor) {
        this.subscriptionExecutor = subscriptionExecutor;
    }

    public void setTaskExecutor(Executor taskExecutor) {
        this.taskExecutor = taskExecutor;
    }

    @Nullable
    public RedisConnectionFactory getConnectionFactory() {
        return this.connectionFactory;
    }

    public void setConnectionFactory(RedisConnectionFactory connectionFactory) {
        Assert.notNull((Object)connectionFactory, (String)"ConnectionFactory must not be null");
        this.connectionFactory = connectionFactory;
    }

    public void setTopicSerializer(RedisSerializer<String> serializer) {
        this.serializer = serializer;
    }

    public long getMaxSubscriptionRegistrationWaitingTime() {
        return this.maxSubscriptionRegistrationWaitingTime;
    }

    public void setMaxSubscriptionRegistrationWaitingTime(long maxSubscriptionRegistrationWaitingTime) {
        this.maxSubscriptionRegistrationWaitingTime = maxSubscriptionRegistrationWaitingTime;
    }

    public void setRecoveryInterval(long recoveryInterval) {
        this.setRecoveryBackoff((BackOff)new FixedBackOff(recoveryInterval, Long.MAX_VALUE));
    }

    public void setRecoveryBackoff(BackOff recoveryInterval) {
        Assert.notNull((Object)recoveryInterval, (String)"Recovery interval must not be null");
        this.backOff = recoveryInterval;
    }

    public void setMessageListeners(Map<? extends MessageListener, Collection<? extends Topic>> listeners) {
        this.initMapping(listeners);
    }

    public void setBeanName(String name) {
        this.beanName = name;
    }

    public void afterPropertiesSet() {
        Assert.state((!this.afterPropertiesSet ? 1 : 0) != 0, (String)"Container already initialized");
        Assert.notNull((Object)this.connectionFactory, (String)"RedisConnectionFactory is not set");
        if (this.taskExecutor == null) {
            this.manageExecutor = true;
            this.taskExecutor = this.createDefaultTaskExecutor();
        }
        if (this.subscriptionExecutor == null) {
            this.subscriptionExecutor = this.taskExecutor;
        }
        this.subscriber = this.createSubscriber(this.connectionFactory, this.subscriptionExecutor);
        this.afterPropertiesSet = true;
    }

    protected TaskExecutor createDefaultTaskExecutor() {
        String threadNamePrefix = this.beanName != null ? this.beanName + "-" : DEFAULT_THREAD_NAME_PREFIX;
        return new SimpleAsyncTaskExecutor(threadNamePrefix);
    }

    public void destroy() throws Exception {
        Executor executor;
        this.afterPropertiesSet = false;
        this.stop();
        if (this.manageExecutor && (executor = this.taskExecutor) instanceof DisposableBean) {
            DisposableBean bean = (DisposableBean)executor;
            bean.destroy();
            this.logDebug(() -> "Stopped internally-managed task executor");
        }
    }

    public void start() {
        if (this.started.compareAndSet(false, true)) {
            this.logDebug(() -> "Starting RedisMessageListenerContainer...");
            this.lazyListen();
        }
    }

    private void lazyListen() {
        CompletableFuture<Void> containerListenFuture = this.listenFuture;
        State state = this.state.get();
        CompletableFuture<Void> futureToAwait = state.isPrepareListening() ? containerListenFuture : this.lazyListen(this.backOff.start());
        try {
            futureToAwait.get(this.getMaxSubscriptionRegistrationWaitingTime(), TimeUnit.MILLISECONDS);
        }
        catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
        }
        catch (ExecutionException ex) {
            if (ex.getCause() instanceof DataAccessException) {
                throw new RedisListenerExecutionFailedException(ex.getMessage(), ex.getCause());
            }
            throw new CompletionException(ex.getCause());
        }
        catch (TimeoutException ex) {
            throw new IllegalStateException("Subscription registration timeout exceeded", ex);
        }
    }

    private CompletableFuture<Void> lazyListen(BackOffExecution backOffExecution) {
        if (!this.hasTopics()) {
            this.logDebug(() -> "Postpone listening for Redis messages until actual listeners are added");
            return CompletableFuture.completedFuture(null);
        }
        CompletableFuture<Void> containerListenFuture = this.listenFuture;
        while (!this.doSubscribe(backOffExecution)) {
            containerListenFuture = this.listenFuture;
        }
        return containerListenFuture;
    }

    private boolean doSubscribe(BackOffExecution backOffExecution) {
        CompletableFuture<Void> containerListenFuture = this.listenFuture;
        CompletableFuture<Void> containerUnsubscribeFuture = this.unsubscribeFuture;
        State state = this.state.get();
        if (!state.isPrepareListening() && state.isListening()) {
            containerUnsubscribeFuture.join();
        }
        if (!this.state.compareAndSet(state, State.prepareListening())) {
            return false;
        }
        CompletableFuture<Void> listenFuture = this.getRequiredSubscriber().initialize(backOffExecution, this.patternMapping.keySet().stream().map(ByteArrayWrapper::getArray).collect(Collectors.toList()), this.channelMapping.keySet().stream().map(ByteArrayWrapper::getArray).collect(Collectors.toList()));
        listenFuture.whenComplete((unused, throwable) -> {
            if (throwable == null) {
                this.logDebug(() -> "RedisMessageListenerContainer listeners registered successfully");
                this.state.set(State.listening());
            } else {
                this.logDebug(() -> "Failed to start RedisMessageListenerContainer listeners", (Throwable)throwable);
                this.state.set(State.notListening());
            }
            this.propagate((Object)unused, (Throwable)throwable, containerListenFuture);
            if (throwable != null) {
                this.listenFuture = new CompletableFuture();
            }
        });
        this.logDebug(() -> "Subscribing to topics for RedisMessageListenerContainer");
        return true;
    }

    public void stop() {
        this.stop(() -> {});
    }

    public void stop(Runnable callback) {
        if (this.started.compareAndSet(true, false)) {
            this.stopListening();
            this.logDebug(() -> "Stopped RedisMessageListenerContainer");
            callback.run();
        }
    }

    private void stopListening() {
        while (!this.doUnsubscribe()) {
        }
    }

    private boolean doUnsubscribe() {
        CompletableFuture<Void> listenFuture = this.listenFuture;
        State state = this.state.get();
        if (!state.isListenerActivated()) {
            return true;
        }
        this.awaitRegistrationTime(listenFuture);
        if (this.state.compareAndSet(state, State.prepareUnsubscribe())) {
            this.getRequiredSubscriber().unsubscribeAll();
            this.awaitRegistrationTime(this.unsubscribeFuture);
            this.state.set(State.notListening());
            this.listenFuture = new CompletableFuture();
            this.unsubscribeFuture = new CompletableFuture();
            this.logDebug(() -> "Stopped listening");
            return true;
        }
        return false;
    }

    private void awaitRegistrationTime(CompletableFuture<Void> future) {
        try {
            future.get(this.getMaxSubscriptionRegistrationWaitingTime(), TimeUnit.MILLISECONDS);
        }
        catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
        }
        catch (ExecutionException | TimeoutException exception) {
            // empty catch block
        }
    }

    public boolean isRunning() {
        return this.started.get();
    }

    public boolean isListening() {
        return this.state.get().isListening();
    }

    public final boolean isActive() {
        return this.afterPropertiesSet;
    }

    public void addMessageListener(MessageListener listener, Collection<? extends Topic> topics) {
        this.addListener(listener, topics);
    }

    public void addMessageListener(MessageListener listener, Topic topic) {
        this.addMessageListener(listener, Collections.singleton(topic));
    }

    public void removeMessageListener(@Nullable MessageListener listener, Collection<? extends Topic> topics) {
        this.removeListener(listener, topics);
    }

    public void removeMessageListener(@Nullable MessageListener listener, Topic topic) {
        this.removeMessageListener(listener, Collections.singleton(topic));
    }

    public void removeMessageListener(MessageListener listener) {
        Assert.notNull((Object)listener, (String)"MessageListener must not be null");
        this.removeMessageListener(listener, Collections.emptySet());
    }

    private void initMapping(Map<? extends MessageListener, Collection<? extends Topic>> listeners) {
        if (this.isRunning()) {
            this.stop();
        }
        this.patternMapping.clear();
        this.channelMapping.clear();
        this.listenerTopics.clear();
        if (!CollectionUtils.isEmpty(listeners)) {
            for (Map.Entry<? extends MessageListener, Collection<? extends Topic>> entry : listeners.entrySet()) {
                this.addListener(entry.getKey(), entry.getValue());
            }
        }
        if (this.afterPropertiesSet) {
            this.start();
        }
    }

    private void addListener(MessageListener listener, Collection<? extends Topic> topics) {
        Assert.notNull((Object)listener, (String)"A valid listener is required");
        Assert.notEmpty(topics, (String)"At least one topic is required");
        ArrayList<byte[]> channels = new ArrayList<byte[]>(topics.size());
        ArrayList<byte[]> patterns = new ArrayList<byte[]>(topics.size());
        Set set = this.listenerTopics.computeIfAbsent(listener, key -> new CopyOnWriteArraySet());
        set.addAll(topics);
        for (Topic topic : topics) {
            Collection<MessageListener> collection;
            ByteArrayWrapper serializedTopic = new ByteArrayWrapper(this.serialize(topic));
            if (topic instanceof ChannelTopic) {
                collection = this.resolveMessageListeners(this.channelMapping, serializedTopic);
                collection.add(listener);
                channels.add(serializedTopic.getArray());
                this.logTrace(() -> String.format("Adding listener '%s' on channel '%s'", listener, topic.getTopic()));
                continue;
            }
            if (topic instanceof PatternTopic) {
                collection = this.resolveMessageListeners(this.patternMapping, serializedTopic);
                collection.add(listener);
                patterns.add(serializedTopic.getArray());
                this.logTrace(() -> String.format("Adding listener '%s' for pattern '%s'", listener, topic.getTopic()));
                continue;
            }
            throw new IllegalArgumentException(String.format("Unknown topic type '%s'", topic.getClass()));
        }
        boolean wasListening = this.isListening();
        if (this.isRunning()) {
            this.lazyListen();
            if (wasListening) {
                CompletableFuture completableFuture = new CompletableFuture();
                this.getRequiredSubscriber().addSynchronization(new SynchronizingMessageListener.SubscriptionSynchronization(patterns, channels, () -> future.complete(null)));
                this.getRequiredSubscriber().subscribeChannel((byte[][])channels.toArray((T[])new byte[channels.size()][]));
                this.getRequiredSubscriber().subscribePattern((byte[][])patterns.toArray((T[])new byte[patterns.size()][]));
                try {
                    completableFuture.join();
                }
                catch (CompletionException ex) {
                    if (ex.getCause() instanceof DataAccessException) {
                        throw new RedisListenerExecutionFailedException(ex.getMessage(), ex.getCause());
                    }
                    throw ex;
                }
            }
        }
    }

    private Collection<MessageListener> resolveMessageListeners(Map<ByteArrayWrapper, Collection<MessageListener>> mapping, ByteArrayWrapper topic) {
        Collection<MessageListener> messageListeners = mapping.get(topic);
        if (messageListeners == null) {
            messageListeners = new CopyOnWriteArraySet<MessageListener>();
            mapping.put(topic, messageListeners);
        }
        return messageListeners;
    }

    private void removeListener(@Nullable MessageListener listener, Collection<? extends Topic> topics) {
        Assert.notNull(topics, (String)"Topics must not be null");
        if (listener != null && this.listenerTopics.get(listener) == null) {
            return;
        }
        if (topics.isEmpty()) {
            topics = this.listenerTopics.get(listener);
        }
        if (CollectionUtils.isEmpty(topics)) {
            this.stopListening();
            return;
        }
        ArrayList<byte[]> channelsToRemove = new ArrayList<byte[]>();
        ArrayList<byte[]> patternsToRemove = new ArrayList<byte[]>();
        if (CollectionUtils.isEmpty((Collection)topics)) {
            Set<Topic> set = this.listenerTopics.remove(listener);
            if (set == null) {
                return;
            }
            topics = set;
        }
        for (Topic topic : topics) {
            ByteArrayWrapper holder = new ByteArrayWrapper(this.serialize(topic));
            if (topic instanceof ChannelTopic) {
                this.remove(listener, topic, holder, this.channelMapping, channelsToRemove);
                this.logTrace(() -> String.format("Removing listener '%s' from channel '%s'", listener, topic.getTopic()));
                continue;
            }
            if (!(topic instanceof PatternTopic)) continue;
            this.remove(listener, topic, holder, this.patternMapping, patternsToRemove);
            this.logTrace(() -> String.format("Removing listener '%s' from pattern '%s'", listener, topic.getTopic()));
        }
        if (this.listenerTopics.isEmpty()) {
            this.stopListening();
        } else if (this.isListening()) {
            this.getRequiredSubscriber().unsubscribeChannel((byte[][])channelsToRemove.toArray((T[])new byte[channelsToRemove.size()][]));
            this.getRequiredSubscriber().unsubscribePattern((byte[][])patternsToRemove.toArray((T[])new byte[patternsToRemove.size()][]));
        }
    }

    private void remove(@Nullable MessageListener listener, Topic topic, ByteArrayWrapper holder, Map<ByteArrayWrapper, Collection<MessageListener>> mapping, List<byte[]> topicToRemove) {
        Collection<MessageListener> listeners = mapping.get(holder);
        List<MessageListener> listenersToRemove = null;
        if (listeners != null) {
            listeners.remove(listener);
            listenersToRemove = Collections.singletonList(listener);
            for (MessageListener messageListener : listenersToRemove) {
                Set<Topic> topics = this.listenerTopics.get(messageListener);
                if (topics != null) {
                    topics.remove(topic);
                }
                if (!CollectionUtils.isEmpty(topics)) continue;
                this.listenerTopics.remove(messageListener);
            }
            if (listeners.isEmpty()) {
                mapping.remove(holder);
                topicToRemove.add(holder.getArray());
            }
        }
    }

    private Subscriber createSubscriber(RedisConnectionFactory connectionFactory, Executor executor) {
        return ConnectionUtils.isAsync(connectionFactory) ? new Subscriber(connectionFactory) : new BlockingSubscriber(connectionFactory, executor);
    }

    protected void processMessage(MessageListener listener, Message message, byte[] source) {
        try {
            listener.onMessage(message, source);
        }
        catch (Throwable cause) {
            this.handleListenerException(cause);
        }
    }

    protected void handleListenerException(Throwable cause) {
        if (this.isActive()) {
            this.invokeErrorHandler(cause);
        } else {
            this.logDebug(() -> "Listener exception after container shutdown", cause);
        }
    }

    protected void invokeErrorHandler(Throwable cause) {
        if (this.errorHandler != null) {
            this.errorHandler.handleError(cause);
        } else if (this.logger.isWarnEnabled()) {
            this.logger.warn((Object)"Execution of message listener failed, and no ErrorHandler has been set", cause);
        }
    }

    protected void handleSubscriptionException(CompletableFuture<Void> future, BackOffExecution backOffExecution, Throwable cause) {
        this.getRequiredSubscriber().closeConnection();
        if (cause instanceof RedisConnectionFailureException && this.isRunning()) {
            BackOffExecution loggingBackOffExecution = () -> {
                long recoveryInterval = backOffExecution.nextBackOff();
                if (recoveryInterval != -1L) {
                    String message = String.format("Connection failure occurred: %s; Restarting subscription task after %s ms", cause, recoveryInterval);
                    this.logger.error((Object)message, cause);
                }
                return recoveryInterval;
            };
            Runnable recoveryFunction = () -> {
                CompletableFuture<Void> lazyListen = this.lazyListen(backOffExecution);
                lazyListen.whenComplete(this.propagate(future));
            };
            if (this.potentiallyRecover(loggingBackOffExecution, recoveryFunction)) {
                return;
            }
            this.logger.error((Object)"SubscriptionTask aborted with exception:", cause);
            future.completeExceptionally(new IllegalStateException("Subscription attempts exceeded", cause));
            return;
        }
        if (this.isRunning()) {
            this.logger.error((Object)"SubscriptionTask aborted with exception:", cause);
        }
        future.completeExceptionally(cause);
    }

    private boolean potentiallyRecover(BackOffExecution backOffExecution, Runnable retryRunnable) {
        long recoveryInterval = backOffExecution.nextBackOff();
        if (recoveryInterval == -1L) {
            return false;
        }
        try {
            Executor executor = this.subscriptionExecutor;
            if (executor instanceof ScheduledExecutorService) {
                ScheduledExecutorService scheduledExecutorService = (ScheduledExecutorService)executor;
                scheduledExecutorService.schedule(retryRunnable, recoveryInterval, TimeUnit.MILLISECONDS);
            } else {
                Thread.sleep(recoveryInterval);
                retryRunnable.run();
            }
            return true;
        }
        catch (InterruptedException ex) {
            this.logDebug(() -> "Thread interrupted while sleeping the recovery interval");
            Thread.currentThread().interrupt();
            return false;
        }
    }

    private <T> BiConsumer<? super T, ? super Throwable> propagate(CompletableFuture<T> target) {
        return (value, throwable) -> this.propagate((Object)value, (Throwable)throwable, target);
    }

    private <T> void propagate(@Nullable T value, @Nullable Throwable throwable, CompletableFuture<T> target) {
        if (throwable != null) {
            target.completeExceptionally(throwable);
        } else {
            target.complete(value);
        }
    }

    private void dispatchSubscriptionNotification(Collection<MessageListener> listeners, byte[] pattern, long count, SubscriptionConsumer listenerConsumer) {
        if (!CollectionUtils.isEmpty(listeners)) {
            byte[] source = (byte[])pattern.clone();
            Executor executor = this.getRequiredTaskExecutor();
            for (MessageListener messageListener : listeners) {
                if (!(messageListener instanceof SubscriptionListener)) continue;
                SubscriptionListener subscriptionListener = (SubscriptionListener)((Object)messageListener);
                executor.execute(() -> listenerConsumer.accept(subscriptionListener, source, count));
            }
        }
    }

    private void dispatchMessage(Collection<MessageListener> listeners, Message message, @Nullable byte[] pattern) {
        byte[] source = pattern != null ? (byte[])pattern.clone() : message.getChannel();
        Executor executor = this.getRequiredTaskExecutor();
        for (MessageListener messageListener : listeners) {
            executor.execute(() -> this.processMessage(messageListener, message, source));
        }
    }

    private boolean hasTopics() {
        return !this.channelMapping.isEmpty() || !this.patternMapping.isEmpty();
    }

    private Subscriber getRequiredSubscriber() {
        Assert.state((this.subscriber != null ? 1 : 0) != 0, (String)"Subscriber not created; Configure RedisConnectionFactory to create a Subscriber");
        return this.subscriber;
    }

    private Executor getRequiredTaskExecutor() {
        Assert.state((this.taskExecutor != null ? 1 : 0) != 0, (String)"No executor configured");
        return this.taskExecutor;
    }

    private byte[] serialize(Topic topic) {
        return this.serializer.serialize(topic.getTopic());
    }

    private void logDebug(Supplier<String> message) {
        if (this.logger.isDebugEnabled()) {
            this.logger.debug((Object)message.get());
        }
    }

    private void logDebug(Supplier<String> message, Throwable cause) {
        if (this.logger.isDebugEnabled()) {
            this.logger.debug((Object)message.get(), cause);
        }
    }

    private void logTrace(Supplier<String> message) {
        if (this.logger.isTraceEnabled()) {
            this.logger.trace((Object)message.get());
        }
    }

    static class State {
        private final boolean prepareListening;
        private final boolean listening;

        private State(boolean prepareListening, boolean listening) {
            this.prepareListening = prepareListening;
            this.listening = listening;
        }

        static State notListening() {
            return new State(false, false);
        }

        static State prepareListening() {
            return new State(true, false);
        }

        static State listening() {
            return new State(true, true);
        }

        static State prepareUnsubscribe() {
            return new State(false, true);
        }

        private boolean isListenerActivated() {
            return this.isListening() || this.isPrepareListening();
        }

        public boolean isListening() {
            return this.listening;
        }

        public boolean isPrepareListening() {
            return this.prepareListening;
        }
    }

    class Subscriber {
        private final DispatchMessageListener delegateListener;
        private final Lock lock;
        @Nullable
        private volatile RedisConnection connection;
        private final RedisConnectionFactory connectionFactory;
        private final SynchronizingMessageListener synchronizingMessageListener;

        Subscriber(RedisConnectionFactory connectionFactory) {
            this.delegateListener = new DispatchMessageListener();
            this.lock = new ReentrantLock();
            this.synchronizingMessageListener = new SynchronizingMessageListener(this.delegateListener, this.delegateListener);
            this.connectionFactory = connectionFactory;
        }

        public CompletableFuture<Void> initialize(BackOffExecution backOffExecution, Collection<byte[]> patterns, Collection<byte[]> channels) {
            return this.doInLock(() -> {
                CompletableFuture<Void> initFuture = new CompletableFuture<Void>();
                try {
                    RedisConnection connection;
                    this.connection = connection = this.connectionFactory.getConnection();
                    if (connection.isSubscribed()) {
                        initFuture.completeExceptionally(new IllegalStateException("Retrieved connection is already subscribed; aborting listening"));
                        return initFuture;
                    }
                    try {
                        this.eventuallyPerformSubscription(connection, backOffExecution, initFuture, patterns, channels);
                    }
                    catch (Throwable t) {
                        RedisMessageListenerContainer.this.handleSubscriptionException(initFuture, backOffExecution, t);
                    }
                }
                catch (RuntimeException ex) {
                    initFuture.completeExceptionally(ex);
                }
                return initFuture;
            });
        }

        void eventuallyPerformSubscription(RedisConnection connection, BackOffExecution backOffExecution, CompletableFuture<Void> subscriptionDone, Collection<byte[]> patterns, Collection<byte[]> channels) {
            this.addSynchronization(new SynchronizingMessageListener.SubscriptionSynchronization(patterns, channels, () -> subscriptionDone.complete(null)));
            this.doSubscribe(connection, patterns, channels);
        }

        void doSubscribe(RedisConnection connection, Collection<byte[]> patterns, Collection<byte[]> channels) {
            if (!patterns.isEmpty()) {
                connection.pSubscribe(this.synchronizingMessageListener, (byte[][])patterns.toArray((T[])new byte[0][]));
            }
            if (!channels.isEmpty()) {
                if (patterns.isEmpty()) {
                    connection.subscribe(this.synchronizingMessageListener, (byte[][])channels.toArray((T[])new byte[0][]));
                } else {
                    this.subscribeChannel((byte[][])channels.toArray((T[])new byte[0][]));
                }
            }
        }

        void addSynchronization(SynchronizingMessageListener.SubscriptionSynchronization synchronizer) {
            this.synchronizingMessageListener.addSynchronization(synchronizer);
        }

        public void unsubscribeAll() {
            this.doInLock(() -> {
                RedisConnection connection = this.connection;
                if (connection != null) {
                    this.doUnsubscribe(connection);
                }
            });
        }

        void doUnsubscribe(RedisConnection connection) {
            this.closeSubscription(connection);
            this.closeConnection();
            RedisMessageListenerContainer.this.unsubscribeFuture.complete(null);
        }

        public void cancel() {
            this.doInLock(() -> {
                RedisConnection connection = this.connection;
                if (connection != null) {
                    this.doCancel(connection);
                }
            });
        }

        void doCancel(RedisConnection connection) {
            this.closeSubscription(connection);
            this.closeConnection();
        }

        void closeSubscription(RedisConnection connection) {
            RedisMessageListenerContainer.this.logTrace(() -> "Cancelling Redis subscription...");
            Subscription subscription = connection.getSubscription();
            if (subscription != null) {
                RedisMessageListenerContainer.this.logTrace(() -> "Unsubscribing from all channels");
                try {
                    subscription.close();
                }
                catch (Exception ex) {
                    RedisMessageListenerContainer.this.logger.warn((Object)"Unable to unsubscribe from subscriptions", (Throwable)ex);
                }
            }
        }

        public void closeConnection() {
            this.doInLock(() -> {
                RedisConnection connection = this.connection;
                this.connection = null;
                if (connection != null) {
                    RedisMessageListenerContainer.this.logTrace(() -> "Closing connection");
                    try {
                        connection.close();
                    }
                    catch (Exception ex) {
                        RedisMessageListenerContainer.this.logger.warn((Object)"Error closing subscription connection", (Throwable)ex);
                    }
                }
            });
        }

        public void subscribeChannel(byte[] ... channels) {
            this.doWithSubscription(channels, Subscription::subscribe);
        }

        public void subscribePattern(byte[] ... patterns) {
            this.doWithSubscription(patterns, Subscription::pSubscribe);
        }

        public void unsubscribeChannel(byte[] ... channels) {
            this.doWithSubscription(channels, Subscription::unsubscribe);
        }

        public void unsubscribePattern(byte[] ... patterns) {
            this.doWithSubscription(patterns, Subscription::pUnsubscribe);
        }

        private void doWithSubscription(byte[][] data, BiConsumer<Subscription, byte[][]> function) {
            if (ObjectUtils.isEmpty((Object[])data)) {
                return;
            }
            this.doInLock(() -> {
                Subscription subscription;
                RedisConnection connection = this.connection;
                if (connection != null && (subscription = connection.getSubscription()) != null) {
                    function.accept(subscription, data);
                }
            });
        }

        private void doInLock(Runnable runner) {
            this.doInLock(() -> {
                runner.run();
                return null;
            });
        }

        private <T> T doInLock(Supplier<T> supplier) {
            this.lock.lock();
            try {
                T t = supplier.get();
                return t;
            }
            finally {
                this.lock.unlock();
            }
        }
    }

    class BlockingSubscriber
    extends Subscriber {
        private final Executor executor;

        BlockingSubscriber(RedisConnectionFactory connectionFactory, Executor executor) {
            super(connectionFactory);
            this.executor = executor;
        }

        @Override
        void doUnsubscribe(RedisConnection connection) {
            this.closeSubscription(connection);
        }

        @Override
        protected void eventuallyPerformSubscription(RedisConnection connection, BackOffExecution backOffExecution, CompletableFuture<Void> subscriptionDone, Collection<byte[]> patterns, Collection<byte[]> channels) {
            Collection<byte[]> initiallySubscribeToChannels;
            if (!patterns.isEmpty() && !channels.isEmpty()) {
                initiallySubscribeToChannels = Collections.emptySet();
                this.addSynchronization(new SynchronizingMessageListener.SubscriptionSynchronization(patterns, Collections.emptySet(), () -> {
                    try {
                        this.subscribeChannel((byte[][])channels.toArray((T[])new byte[0][]));
                    }
                    catch (Exception ex) {
                        RedisMessageListenerContainer.this.handleSubscriptionException(subscriptionDone, backOffExecution, ex);
                    }
                }));
            } else {
                initiallySubscribeToChannels = channels;
            }
            this.addSynchronization(new SynchronizingMessageListener.SubscriptionSynchronization(patterns, channels, () -> subscriptionDone.complete(null)));
            this.executor.execute(() -> {
                try {
                    this.doSubscribe(connection, patterns, initiallySubscribeToChannels);
                    this.closeConnection();
                    RedisMessageListenerContainer.this.unsubscribeFuture.complete(null);
                }
                catch (Throwable cause) {
                    RedisMessageListenerContainer.this.handleSubscriptionException(subscriptionDone, backOffExecution, cause);
                }
            });
        }
    }

    static interface SubscriptionConsumer {
        public void accept(SubscriptionListener var1, byte[] var2, long var3);
    }

    private class DispatchMessageListener
    implements MessageListener,
    SubscriptionListener {
        private DispatchMessageListener() {
        }

        @Override
        public void onMessage(Message message, @Nullable byte[] pattern) {
            Collection<MessageListener> listeners = null;
            if (pattern != null && pattern.length > 0) {
                listeners = RedisMessageListenerContainer.this.patternMapping.get(new ByteArrayWrapper(pattern));
            } else {
                pattern = null;
                listeners = RedisMessageListenerContainer.this.channelMapping.get(new ByteArrayWrapper(message.getChannel()));
            }
            if (!CollectionUtils.isEmpty(listeners)) {
                RedisMessageListenerContainer.this.dispatchMessage(listeners, message, pattern);
            }
        }

        @Override
        public void onChannelSubscribed(byte[] channel, long count) {
            RedisMessageListenerContainer.this.dispatchSubscriptionNotification(RedisMessageListenerContainer.this.channelMapping.getOrDefault(new ByteArrayWrapper(channel), Collections.emptyList()), channel, count, SubscriptionListener::onChannelSubscribed);
        }

        @Override
        public void onChannelUnsubscribed(byte[] channel, long count) {
            RedisMessageListenerContainer.this.dispatchSubscriptionNotification(RedisMessageListenerContainer.this.channelMapping.getOrDefault(new ByteArrayWrapper(channel), Collections.emptyList()), channel, count, SubscriptionListener::onChannelUnsubscribed);
        }

        @Override
        public void onPatternSubscribed(byte[] pattern, long count) {
            RedisMessageListenerContainer.this.dispatchSubscriptionNotification(RedisMessageListenerContainer.this.patternMapping.getOrDefault(new ByteArrayWrapper(pattern), Collections.emptyList()), pattern, count, SubscriptionListener::onPatternSubscribed);
        }

        @Override
        public void onPatternUnsubscribed(byte[] pattern, long count) {
            RedisMessageListenerContainer.this.dispatchSubscriptionNotification(RedisMessageListenerContainer.this.patternMapping.getOrDefault(new ByteArrayWrapper(pattern), Collections.emptyList()), pattern, count, SubscriptionListener::onPatternUnsubscribed);
        }
    }
}

