package com.netflix.conductor.contribs.queue.amqp;

import com.netflix.conductor.contribs.queue.amqp.config.AMQPRetryPattern;
import com.netflix.conductor.contribs.queue.amqp.util.AMQPConstants;
import com.netflix.conductor.contribs.queue.amqp.util.ConnectionType;
import com.rabbitmq.client.Address;
import com.rabbitmq.client.BlockedListener;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.ShutdownListener;
import com.rabbitmq.client.ShutdownSignalException;
import java.io.IOException;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/netflix/conductor/contribs/queue/amqp/AMQPConnection.class */
public class AMQPConnection {
    /** Connection used for publisher channels; created on demand and reset to {@code null} by {@link #close()}. */
    private volatile Connection publisherConnection;
    /** Connection used for subscriber channels; created on demand and reset to {@code null} by {@link #close()}. */
    private volatile Connection subscriberConnection;
    /** Factory used to open broker connections. */
    private ConnectionFactory factory;
    /** Broker addresses handed to the factory when a connection is opened. */
    private Address[] addresses;
    /** Suffix of the client-provided name of the publisher connection. */
    private static final String PUBLISHER = "Publisher";
    /** Suffix of the client-provided name of the subscriber connection. */
    private static final String SUBSCRIBER = "Subscriber";
    private static final Logger LOGGER = LoggerFactory.getLogger(AMQPConnection.class);
    /** Process-wide singleton, created by {@code getInstance} or injected through {@code setAMQPConnection}. */
    private static AMQPConnection amqpConnection = null;
    /** Channels handed back through {@code returnChannel}, grouped by connection type. */
    private static final Map<ConnectionType, Set<Channel>> availableChannelPool = new ConcurrentHashMap<>();
    /** Subscriber channels keyed by {@code connectionType + ";" + queueOrExchange}. */
    private static final Map<String, Channel> subscriberReservedChannelPool = new ConcurrentHashMap<>();
    /** Retry policy consulted when opening a connection or channel fails; {@code null} means fail on the first error. */
    private static AMQPRetryPattern retrySettings = null;

    /**
     * Creates an instance with no factory, no broker addresses and no open connections.
     */
    private AMQPConnection() {
        this(null, null);
    }

    /**
     * Creates an instance bound to the given factory and broker addresses.
     * No connection is opened here; both connection fields start out as {@code null}.
     *
     * @param connectionFactory factory later used to open connections
     * @param addressArr broker addresses passed to the factory when connecting
     */
    private AMQPConnection(ConnectionFactory connectionFactory, Address[] addressArr) {
        this.addresses = addressArr;
        this.factory = connectionFactory;
        this.subscriberConnection = null;
        this.publisherConnection = null;
    }

    /**
     * Returns the shared instance, creating it on the first call.
     *
     * <p>The factory and addresses are only used when the instance is created; on later calls
     * they are ignored. The retry settings, however, are replaced on every call.
     *
     * @param connectionFactory factory for the instance if one has to be created
     * @param addressArr broker addresses for the instance if one has to be created
     * @param aMQPRetryPattern retry policy to install (may be {@code null})
     * @return the shared instance
     */
    public static synchronized AMQPConnection getInstance(ConnectionFactory connectionFactory, Address[] addressArr, AMQPRetryPattern aMQPRetryPattern) {
        AMQPConnection shared = amqpConnection;
        if (shared == null) {
            shared = new AMQPConnection(connectionFactory, addressArr);
            amqpConnection = shared;
        }
        retrySettings = aMQPRetryPattern;
        return shared;
    }

    /**
     * Replaces the shared instance returned by {@code getInstance}.
     * Passing {@code null} makes the next {@code getInstance} call create a new instance.
     *
     * @param aMQPConnection the instance to install, or {@code null}
     */
    public static void setAMQPConnection(AMQPConnection aMQPConnection) {
        AMQPConnection.amqpConnection = aMQPConnection;
    }

    /**
     * Returns the broker addresses this instance connects to.
     *
     * @return the internal address array itself (not a copy)
     */
    public Address[] getAddresses() {
        return addresses;
    }

    /**
     * Opens a new broker connection named {@code $HOSTNAME-<str>} and registers logging
     * listeners for shutdown and blocked/unblocked events.
     *
     * <p>I/O errors and timeouts are handed to the configured retry settings; the attempt is
     * repeated for as long as the retry settings allow it.
     *
     * @param str suffix of the client-provided connection name
     * @return an open connection
     * @throws RuntimeException if the factory yields a null or closed connection, if no retry
     *     settings are configured and connecting fails, or if the retry settings give up
     */
    private Connection createConnection(String str) {
        int attempt = 1;
        while (true) {
            try {
                // NOTE(review): if HOSTNAME is not set, the name becomes "null-<str>".
                final Connection connection = this.factory.newConnection(this.addresses, System.getenv("HOSTNAME") + "-" + str);
                if (connection == null || !connection.isOpen()) {
                    // Not an IOException/TimeoutException, so this is never retried.
                    throw new RuntimeException("Failed to open connection");
                }
                connection.addShutdownListener(signal ->
                        LOGGER.error("Received a shutdown exception for the connection {}. reason {} cause{}", connection.getClientProvidedName(), signal.getMessage(), signal));
                connection.addBlockedListener(new BlockedListener() {
                    @Override
                    public void handleUnblocked() throws IOException {
                        LOGGER.info("Connection {} is unblocked", connection.getClientProvidedName());
                    }

                    @Override
                    public void handleBlocked(String reason) throws IOException {
                        LOGGER.error("Connection {} is blocked. reason: {}", connection.getClientProvidedName(), reason);
                    }
                });
                return connection;
            } catch (IOException e) {
                retryOrFailConnect(e, attempt, "IO error");
                attempt++;
            } catch (TimeoutException e) {
                retryOrFailConnect(e, attempt, "Timeout");
                attempt++;
            }
        }
    }

    /**
     * Decides whether a failed connection attempt may be retried.
     * Returns normally when the retry settings accept another attempt; otherwise logs and throws.
     *
     * @param cause the exception raised by the failed attempt
     * @param attempt 1-based number of the attempt that failed
     * @param failureKind short description used at the start of the error message
     * @throws RuntimeException wrapping {@code cause} when no retry settings exist or they give up
     */
    private void retryOrFailConnect(Exception cause, int attempt, String failureKind) {
        final AMQPRetryPattern retryPattern = retrySettings;
        String prefix = failureKind;
        if (retryPattern != null) {
            try {
                retryPattern.continueOrPropogate(cause, attempt);
                return;
            } catch (Exception retriesExhausted) {
                // The retry settings refused another attempt; report the original failure.
                prefix = "Retries completed. " + failureKind;
            }
        }
        final String message = prefix + " while connecting to "
                + Arrays.stream(this.addresses).map(Address::toString).collect(Collectors.joining(","));
        LOGGER.error(message, cause);
        throw new RuntimeException(message, cause);
    }

    /**
     * Returns a channel for the given connection type, opening the underlying connection first
     * if it is missing or closed.
     *
     * <p>Subscriber channels are remembered per {@code connectionType + ";" + str} and reused
     * while they stay open. Publisher channels are taken from the shared pool on every call.
     *
     * @param connectionType which connection the channel belongs to
     * @param str queue or exchange name the channel is requested for
     * @return a channel, or {@code null} for a connection type that is neither subscriber nor publisher
     * @throws Exception if borrowing a channel fails
     */
    public Channel getOrCreateChannel(ConnectionType connectionType, String str) throws Exception {
        LOGGER.debug("Accessing the channel for queueOrExchange {} with type {} ", str, connectionType);
        switch (connectionType) {
            case SUBSCRIBER: {
                final String reservationKey = connectionType + ";" + str;
                final Channel reserved = subscriberReservedChannelPool.get(reservationKey);
                if (reserved != null && reserved.isOpen()) {
                    return reserved;
                }
                synchronized (this) {
                    if (this.subscriberConnection == null || !this.subscriberConnection.isOpen()) {
                        this.subscriberConnection = createConnection(SUBSCRIBER);
                    }
                }
                final Channel replacement = borrowChannel(connectionType, this.subscriberConnection);
                subscriberReservedChannelPool.put(reservationKey, replacement);
                return replacement;
            }
            case PUBLISHER: {
                synchronized (this) {
                    if (this.publisherConnection == null || !this.publisherConnection.isOpen()) {
                        this.publisherConnection = createConnection(PUBLISHER);
                    }
                }
                return borrowChannel(connectionType, this.publisherConnection);
            }
            default:
                return null;
        }
    }

    /**
     * Opens a new channel on the given connection and attaches a shutdown listener that logs
     * the shutdown reason.
     *
     * <p>Any exception raised while opening the channel, including the "Fail to open" error
     * thrown for a null or closed channel, is handed to the retry settings; the attempt is
     * repeated for as long as they allow it.
     *
     * @param connectionType type used in log and error messages
     * @param connection connection to create the channel on
     * @return an open channel
     * @throws RuntimeException if no retry settings are configured and opening fails, or if the
     *     retry settings give up
     */
    private Channel getOrCreateChannel(ConnectionType connectionType, Connection connection) {
        int attempt = 1;
        for (;;) {
            try {
                LOGGER.debug("Creating a channel for " + connectionType);
                final Channel opened = connection.createChannel();
                if (opened != null && opened.isOpen()) {
                    opened.addShutdownListener(signal ->
                            LOGGER.error(connectionType + " Channel has been shutdown: {}", signal.getMessage(), signal));
                    return opened;
                }
                throw new RuntimeException("Fail to open " + connectionType + " channel");
            } catch (Exception failure) {
                final AMQPRetryPattern retryPattern = retrySettings;
                String prefix = "Cannot open ";
                if (retryPattern != null) {
                    try {
                        retryPattern.continueOrPropogate(failure, attempt);
                        attempt++;
                        continue;
                    } catch (Exception retriesExhausted) {
                        // The retry settings refused another attempt; report the original failure.
                        prefix = "Retries completed. Cannot open ";
                    }
                }
                final String brokers = Arrays.stream(this.addresses).map(Address::toString).collect(Collectors.joining(","));
                throw new RuntimeException(prefix + connectionType + " channel on " + brokers, failure);
            }
        }
    }

    /**
     * Closes all pooled channels and both connections, then resets the pool of available
     * channels and the connection fields, even when closing throws.
     *
     * <p>NOTE(review): the reserved subscriber channel map is not touched here.
     */
    public void close() {
        LOGGER.info("Closing all connections and channels");
        try {
            closeChannelsInMap(ConnectionType.PUBLISHER);
            closeChannelsInMap(ConnectionType.SUBSCRIBER);
            closeConnection(this.publisherConnection);
            closeConnection(this.subscriberConnection);
        } finally {
            availableChannelPool.clear();
            this.publisherConnection = null;
            this.subscriberConnection = null;
        }
    }

    /**
     * Closes every channel currently pooled for the given connection type and empties that pool.
     * Does nothing when no channels are pooled for the type.
     *
     * @param connectionType pool to drain
     */
    private void closeChannelsInMap(ConnectionType connectionType) {
        final Set<Channel> pooled = availableChannelPool.get(connectionType);
        if (pooled != null && !pooled.isEmpty()) {
            for (Channel pooledChannel : pooled) {
                closeChannel(pooledChannel);
            }
            pooled.clear();
        }
    }

    /**
     * Closes the connection if it is open. Failures while closing are logged, not rethrown.
     *
     * @param connection connection to close; may be {@code null}
     */
    private void closeConnection(Connection connection) {
        if (connection != null && connection.isOpen()) {
            try {
                connection.close();
            } catch (Exception closeFailure) {
                LOGGER.warn("Fail to close connection: {}", closeFailure.getMessage(), closeFailure);
            }
            return;
        }
        LOGGER.warn("Connection is null or closed already. Not closing it again");
    }

    /**
     * Closes the channel if it is open. Failures while closing are logged, not rethrown.
     *
     * @param channel channel to close; may be {@code null}
     */
    private void closeChannel(Channel channel) {
        if (channel != null && channel.isOpen()) {
            try {
                channel.close();
            } catch (Exception closeFailure) {
                LOGGER.warn("Fail to close channel: {}", closeFailure.getMessage(), closeFailure);
            }
            return;
        }
        LOGGER.warn("Channel is null or closed already. Not closing it again");
    }

    /**
     * Takes an open channel out of the pool for the given connection type, or opens a new one
     * when the pool has none.
     *
     * <p>Null and closed entries found while scanning the pool are removed from it.
     *
     * @param connectionType pool to borrow from
     * @param connection connection used when a new channel has to be opened
     * @return an open channel that is no longer in the pool
     * @throws Exception declared for callers; channel creation failures surface as RuntimeException
     */
    private synchronized Channel borrowChannel(ConnectionType connectionType, Connection connection) throws Exception {
        // One lookup instead of containsKey + get: close() clears the pool without holding this
        // lock, so the entry may vanish between the two calls and leave a null set behind.
        final Set<Channel> pooled = availableChannelPool.get(connectionType);
        if (pooled == null || pooled.isEmpty()) {
            final Channel created = getOrCreateChannel(connectionType, connection);
            LOGGER.info(String.format(AMQPConstants.INFO_CHANNEL_CREATION_SUCCESS, connectionType));
            return created;
        }
        final Iterator<Channel> candidates = pooled.iterator();
        while (candidates.hasNext()) {
            final Channel candidate = candidates.next();
            // Every inspected entry leaves the pool: usable ones are handed out, dead ones dropped.
            candidates.remove();
            if (candidate != null && candidate.isOpen()) {
                LOGGER.info(String.format(AMQPConstants.INFO_CHANNEL_BORROW_SUCCESS, connectionType));
                return candidate;
            }
        }
        // The pool held only null or closed channels.
        final Channel replacement = getOrCreateChannel(connectionType, connection);
        LOGGER.info(String.format(AMQPConstants.INFO_CHANNEL_RESET_SUCCESS, connectionType));
        return replacement;
    }

    /**
     * Hands a channel back to the pool of available channels for the given connection type.
     *
     * <p>A {@code null} or closed channel is discarded with a warning instead of being pooled;
     * previously a {@code null} placeholder was stored and reported as a successful return.
     *
     * @param connectionType pool the channel belongs to
     * @param channel channel to return; may be {@code null}
     * @throws Exception declared for backward compatibility with existing callers
     */
    public synchronized void returnChannel(ConnectionType connectionType, Channel channel) throws Exception {
        if (channel == null || !channel.isOpen()) {
            LOGGER.warn("Not returning a null or closed {} channel to the pool", connectionType);
            return;
        }
        availableChannelPool.computeIfAbsent(connectionType, type -> new HashSet<>()).add(channel);
        LOGGER.info(String.format(AMQPConstants.INFO_CHANNEL_RETURN_SUCCESS, connectionType));
    }
}
