package com.netflix.conductor.contribs.queue.amqp;

import com.google.common.collect.Maps;
import com.netflix.conductor.contribs.queue.amqp.config.AMQPEventQueueProperties;
import com.netflix.conductor.contribs.queue.amqp.config.AMQPRetryPattern;
import com.netflix.conductor.contribs.queue.amqp.util.AMQPConstants;
import com.netflix.conductor.contribs.queue.amqp.util.AMQPSettings;
import com.netflix.conductor.contribs.queue.amqp.util.ConnectionType;
import com.netflix.conductor.core.events.queue.Message;
import com.netflix.conductor.core.events.queue.ObservableQueue;
import com.netflix.conductor.metrics.Monitors;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Address;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import com.rabbitmq.client.GetResponse;
import java.io.IOException;
import java.security.KeyManagementException;
import java.security.NoSuchAlgorithmException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.UUID;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import rx.Observable;
import rx.Subscriber;
import rx.functions.Action1;

/* loaded from: input_file:com/netflix/conductor/contribs/queue/amqp/AMQPObservableQueue.class */
public class AMQPObservableQueue implements ObservableQueue {
    private static final Logger LOGGER = LoggerFactory.getLogger(AMQPObservableQueue.class);
    private final AMQPSettings settings;
    private final AMQPRetryPattern retrySettings;
    private final int batchSize;
    private final boolean useExchange;
    private int pollTimeInMS;
    private AMQPConnection amqpConnection;
    private volatile boolean running;
    private final String QUEUE_TYPE = "x-queue-type";
    protected LinkedBlockingQueue<Message> messages = new LinkedBlockingQueue<>();

    /* loaded from: input_file:com/netflix/conductor/contribs/queue/amqp/AMQPObservableQueue$Builder.class */
    /**
     * Reads connection and polling configuration from an {@link AMQPEventQueueProperties}
     * instance and creates {@link AMQPObservableQueue} instances that reuse the resulting
     * {@link ConnectionFactory} and broker addresses.
     */
    public static class Builder {
        private final int batchSize;
        private final int pollTimeInMS;
        private final AMQPEventQueueProperties properties;
        private final Address[] addresses;
        private final ConnectionFactory factory;

        /**
         * Reads and validates the connection configuration.
         *
         * @param aMQPEventQueueProperties source of hosts, credentials, timeouts and polling settings
         * @throws IllegalArgumentException if hosts, username, password or virtual host are empty,
         *     if the port is not positive, or if the SSL protocol cannot be enabled
         */
        public Builder(AMQPEventQueueProperties aMQPEventQueueProperties) {
            this.properties = aMQPEventQueueProperties;
            // Both helpers read this.properties, so they must run only after it is assigned.
            // As field initializers they would execute before this constructor body and
            // dereference a null properties reference.
            this.addresses = buildAddressesFromHosts();
            this.factory = buildConnectionFactory();
            this.batchSize = aMQPEventQueueProperties.getBatchSize();
            this.pollTimeInMS = (int) aMQPEventQueueProperties.getPollTimeDuration().toMillis();
        }

        /**
         * Parses the configured host list into broker addresses.
         *
         * @return the addresses parsed by {@link Address#parseAddresses(String)}
         * @throws IllegalArgumentException if no hosts are configured
         */
        private Address[] buildAddressesFromHosts() {
            String hosts = this.properties.getHosts();
            if (StringUtils.isEmpty(hosts)) {
                throw new IllegalArgumentException("Hosts are undefined");
            }
            return Address.parseAddresses(hosts);
        }

        /**
         * Creates a {@link ConnectionFactory} populated from the configured properties.
         *
         * @return the configured factory, with automatic and topology recovery enabled
         * @throws IllegalArgumentException if a mandatory setting is missing or invalid
         */
        private ConnectionFactory buildConnectionFactory() {
            ConnectionFactory connectionFactory = new ConnectionFactory();

            // Mandatory credentials and endpoint settings.
            String username = this.properties.getUsername();
            if (StringUtils.isEmpty(username)) {
                throw new IllegalArgumentException("Username is null or empty");
            }
            connectionFactory.setUsername(username);
            String password = this.properties.getPassword();
            if (StringUtils.isEmpty(password)) {
                throw new IllegalArgumentException("Password is null or empty");
            }
            connectionFactory.setPassword(password);
            String virtualHost = this.properties.getVirtualHost();
            if (StringUtils.isEmpty(virtualHost)) {
                throw new IllegalArgumentException("Virtual host is null or empty");
            }
            connectionFactory.setVirtualHost(virtualHost);
            int port = this.properties.getPort();
            if (port <= 0) {
                throw new IllegalArgumentException("Port must be greater than 0");
            }
            connectionFactory.setPort(port);

            // Optional transport features.
            if (this.properties.isUseNio()) {
                connectionFactory.useNio();
            }
            if (this.properties.isUseSslProtocol()) {
                try {
                    connectionFactory.useSslProtocol();
                } catch (KeyManagementException | NoSuchAlgorithmException e) {
                    throw new IllegalArgumentException("Invalid sslProtocol ", e);
                }
            }

            // Timeouts, recovery and channel limits.
            connectionFactory.setConnectionTimeout(this.properties.getConnectionTimeoutInMilliSecs());
            connectionFactory.setRequestedHeartbeat(this.properties.getRequestHeartbeatTimeoutInSecs());
            connectionFactory.setNetworkRecoveryInterval(this.properties.getNetworkRecoveryIntervalInMilliSecs());
            connectionFactory.setHandshakeTimeout(this.properties.getHandshakeTimeoutInMilliSecs());
            connectionFactory.setAutomaticRecoveryEnabled(true);
            connectionFactory.setTopologyRecoveryEnabled(true);
            connectionFactory.setRequestedChannelMax(this.properties.getMaxChannelCount());
            return connectionFactory;
        }

        /**
         * Creates a queue instance for the given URI.
         *
         * @param z passed through as the queue's use-exchange flag
         * @param str URI handed to {@link AMQPSettings#fromURI(String)} to derive the queue settings
         * @return a new queue using this builder's factory, addresses, batch size and poll time
         */
        public AMQPObservableQueue build(boolean z, String str) {
            AMQPSettings queueSettings = new AMQPSettings(this.properties).fromURI(str);
            AMQPRetryPattern retryPattern =
                    new AMQPRetryPattern(
                            this.properties.getLimit(),
                            this.properties.getDuration(),
                            this.properties.getType());
            return new AMQPObservableQueue(
                    this.factory,
                    this.addresses,
                    z,
                    queueSettings,
                    retryPattern,
                    this.batchSize,
                    this.pollTimeInMS);
        }
    }

    public AMQPObservableQueue(ConnectionFactory connectionFactory, Address[] addressArr, boolean z, AMQPSettings aMQPSettings, AMQPRetryPattern aMQPRetryPattern, int i, int i2) {
        if (connectionFactory == null) {
            throw new IllegalArgumentException("Connection factory is undefined");
        }
        if (addressArr == null || addressArr.length == 0) {
            throw new IllegalArgumentException("Addresses are undefined");
        }
        if (aMQPSettings == null) {
            throw new IllegalArgumentException("Settings are undefined");
        }
        if (i <= 0) {
            throw new IllegalArgumentException("Batch size must be greater than 0");
        }
        if (i2 <= 0) {
            throw new IllegalArgumentException("Poll time must be greater than 0 ms");
        }
        this.useExchange = z;
        this.settings = aMQPSettings;
        this.batchSize = i;
        this.amqpConnection = AMQPConnection.getInstance(connectionFactory, addressArr, aMQPRetryPattern);
        this.retrySettings = aMQPRetryPattern;
        setPollTimeInMS(i2);
    }

    /**
     * Creates the observable through which received messages are delivered.
     *
     * <p>With sequential processing enabled, the broker consumer is registered immediately (when
     * this method is called) and fills the internal {@code messages} buffer. Every subscription
     * then starts a timer that fires each {@code pollTimeInMS} milliseconds, drains the buffer and
     * emits the drained messages. Ticks that occur while the queue is not running emit nothing
     * and leave the buffer untouched.
     *
     * <p>Otherwise the broker consumer is registered on subscription and pushes each delivery to
     * the subscriber as it arrives.
     *
     * @return an observable emitting the messages received from RabbitMQ
     */
    public Observable<Message> observe() {
        final Observable.OnSubscribe<Message> onSubscribe;
        if (this.settings.isSequentialProcessing()) {
            LOGGER.info("Subscribing for the message processing on schedule basis");
            receiveMessages();
            onSubscribe = subscriber -> {
                Observable<Message> batches = Observable.interval(this.pollTimeInMS, TimeUnit.MILLISECONDS).flatMap(tick -> {
                    if (!isRunning()) {
                        LOGGER.debug("Component stopped, skip listening for messages from RabbitMQ");
                        return Observable.from(Collections.<Message>emptyList());
                    }
                    List<Message> batch = new LinkedList<>();
                    this.messages.drainTo(batch);
                    if (!batch.isEmpty()) {
                        // Render the batch as "id=payload,id=payload" for the log line.
                        StringBuilder rendered = new StringBuilder();
                        String separator = "";
                        for (Message message : batch) {
                            rendered.append(separator).append(message.getId()).append("=").append(message.getPayload());
                            separator = ",";
                        }
                        LOGGER.info("Batch from {} to conductor is {}", this.settings.getQueueOrExchangeName(), rendered);
                    }
                    return Observable.from(batch);
                });
                // Forward items and errors to the downstream subscriber.
                batches.subscribe(subscriber::onNext, subscriber::onError);
            };
            LOGGER.info("Subscribed for the message processing on schedule basis");
        } else {
            onSubscribe = subscriber -> {
                LOGGER.info("Subscribing for the event based AMQP message processing");
                receiveMessages(subscriber);
                LOGGER.info("Subscribed for the event based AMQP message processing");
            };
        }
        return Observable.create(onSubscribe);
    }

    /**
     * Returns the AMQP exchange type constant when this queue uses an exchange, otherwise the
     * AMQP queue type constant.
     */
    public String getType() {
        if (useExchange) {
            return AMQPConstants.AMQP_EXCHANGE_TYPE;
        }
        return AMQPConstants.AMQP_QUEUE_TYPE;
    }

    /** Returns the event name held by this queue's settings. */
    public String getName() {
        final String eventName = settings.getEventName();
        return eventName;
    }

    /** Returns the queue or exchange name held by this queue's settings. */
    public String getURI() {
        final String queueOrExchangeName = settings.getQueueOrExchangeName();
        return queueOrExchangeName;
    }

    /** Returns the batch size supplied at construction time. */
    public int getBatchSize() {
        return batchSize;
    }

    /** Returns the settings object this queue was constructed with. */
    public AMQPSettings getSettings() {
        return settings;
    }

    /** Returns the broker addresses reported by the underlying {@link AMQPConnection}. */
    public Address[] getAddresses() {
        final Address[] brokerAddresses = amqpConnection.getAddresses();
        return brokerAddresses;
    }

    /**
     * Acknowledges each message and collects the ones that could not be acknowledged.
     *
     * @param list messages to acknowledge
     * @return receipts (delivery tags) of the messages whose acknowledgement failed; empty when
     *     every message was acknowledged
     */
    public List<String> ack(List<Message> list) {
        final List<String> failedReceipts = new ArrayList<>();
        for (final Message toAck : list) {
            try {
                ackMsg(toAck);
            } catch (Exception ackFailure) {
                // Best effort: record the failure and carry on with the remaining messages.
                LOGGER.error("Cannot ACK message with delivery tag {}", toAck.getReceipt(), ackFailure);
                failedReceipts.add(toAck.getReceipt());
            }
        }
        return failedReceipts;
    }

    /**
     * Acknowledges a single message on the subscriber channel, retrying according to the
     * configured retry pattern.
     *
     * @param message message whose receipt holds the delivery tag to acknowledge
     * @throws Exception the original failure when no retry pattern is configured, or whatever the
     *     retry pattern throws once it refuses to continue
     */
    public void ackMsg(Message message) throws Exception {
        for (int attempt = 1; ; attempt++) {
            try {
                LOGGER.info("ACK message with delivery tag {}", message.getReceipt());
                final Channel subscriberChannel =
                        amqpConnection.getOrCreateChannel(
                                ConnectionType.SUBSCRIBER, getSettings().getQueueOrExchangeName());
                subscriberChannel.basicAck(Long.parseLong(message.getReceipt()), false);
                LOGGER.info("Ack'ed the message with delivery tag {}", message.getReceipt());
                return;
            } catch (Exception ackFailure) {
                if (retrySettings == null) {
                    LOGGER.error("Cannot ACK message with delivery tag {}", message.getReceipt(), ackFailure);
                    throw ackFailure;
                }
                try {
                    // Either returns (try again) or throws (give up).
                    retrySettings.continueOrPropogate(ackFailure, attempt);
                } catch (Exception retriesExhausted) {
                    LOGGER.error("Retries completed. Cannot ACK message with delivery tag {}", message.getReceipt(), ackFailure);
                    throw retriesExhausted;
                }
            }
        }
    }

    /**
     * Negatively acknowledges each message (single delivery tag, no requeue) on the subscriber
     * channel, retrying according to the configured retry pattern.
     *
     * <p>Failures are logged and never propagated. When no retry pattern is configured, or when
     * the retry pattern refuses to continue, the message is abandoned and processing moves on to
     * the next one.
     *
     * @param list messages to reject
     */
    public void nack(List<Message> list) {
        for (Message message : list) {
            int attempt = 1;
            while (true) {
                try {
                    LOGGER.info("NACK message with delivery tag {}", message.getReceipt());
                    this.amqpConnection.getOrCreateChannel(ConnectionType.SUBSCRIBER, getSettings().getQueueOrExchangeName()).basicNack(Long.parseLong(message.getReceipt()), false, false);
                    LOGGER.info("Nack'ed the message with delivery tag {}", message.getReceipt());
                    break;
                } catch (Exception e) {
                    AMQPRetryPattern retryPattern = this.retrySettings;
                    if (retryPattern == null) {
                        LOGGER.error("Cannot NACK message with delivery tag {}", message.getReceipt(), e);
                        // No retry policy: give up on this message instead of dereferencing null
                        // and looping forever.
                        break;
                    }
                    try {
                        retryPattern.continueOrPropogate(e, attempt);
                        attempt++;
                    } catch (Exception retriesExhausted) {
                        LOGGER.error("Retries completed. Cannot NACK message with delivery tag {}", message.getReceipt(), e);
                        // Retries are exhausted: stop retrying so the loop terminates.
                        break;
                    }
                }
            }
        }
    }

    /**
     * Builds the AMQP properties for an outgoing message.
     *
     * <p>The message id and the correlation id are taken from the message's id and receipt
     * respectively; a random UUID is substituted for whichever of them is empty. Content type,
     * content encoding and delivery mode come from the settings.
     */
    private static AMQP.BasicProperties buildBasicProperties(Message message, AMQPSettings aMQPSettings) {
        final String messageId;
        if (StringUtils.isEmpty(message.getId())) {
            messageId = UUID.randomUUID().toString();
        } else {
            messageId = message.getId();
        }

        final String correlationId;
        if (StringUtils.isEmpty(message.getReceipt())) {
            correlationId = UUID.randomUUID().toString();
        } else {
            correlationId = message.getReceipt();
        }

        final AMQP.BasicProperties.Builder builder = new AMQP.BasicProperties.Builder();
        builder.messageId(messageId);
        builder.correlationId(correlationId);
        builder.contentType(aMQPSettings.getContentType());
        builder.contentEncoding(aMQPSettings.getContentEncoding());
        builder.deliveryMode(Integer.valueOf(aMQPSettings.getDeliveryMode()));
        return builder.build();
    }

    /**
     * Publishes one message, retrying according to the configured retry pattern.
     *
     * <p>After every attempt, successful or not, the most recently obtained publisher channel is
     * handed back to the connection. Failures while returning the channel are only logged.
     *
     * @param message message to publish
     * @param str exchange to publish to
     * @param str2 routing key
     * @throws RuntimeException wrapping the publish failure when no retry pattern is configured
     *     or the retry pattern refuses to continue
     */
    private void publishMessage(Message message, String str, String str2) {
        // Declared outside the loop on purpose: if a later attempt fails before a new channel is
        // obtained, the channel from the previous attempt is the one handed back.
        Channel publisherChannel = null;
        for (int attempt = 1; ; attempt++) {
            try {
                final String payload = message.getPayload();
                publisherChannel =
                        amqpConnection.getOrCreateChannel(
                                ConnectionType.PUBLISHER, getSettings().getQueueOrExchangeName());
                publisherChannel.basicPublish(
                        str,
                        str2,
                        buildBasicProperties(message, settings),
                        payload.getBytes(settings.getContentEncoding()));
                LOGGER.info(String.format("Published message to %s: %s", str, payload));
                return;
            } catch (Exception publishFailure) {
                if (retrySettings == null) {
                    LOGGER.error("Failed to publish message {} to {}", message.getPayload(), str, publishFailure);
                    throw new RuntimeException(publishFailure);
                }
                try {
                    // Either returns (try again) or throws (give up).
                    retrySettings.continueOrPropogate(publishFailure, attempt);
                } catch (Exception retriesExhausted) {
                    LOGGER.error("Retries completed. Failed to publish message {} to {}", message.getPayload(), str, publishFailure);
                    throw new RuntimeException(publishFailure);
                }
            } finally {
                if (publisherChannel != null) {
                    try {
                        amqpConnection.returnChannel(ConnectionType.PUBLISHER, publisherChannel);
                    } catch (Exception returnFailure) {
                        LOGGER.error("Failed to return the channel of {}. {}", ConnectionType.PUBLISHER, returnFailure);
                    }
                }
            }
        }
    }

    /**
     * Declares the target exchange or queue and publishes every message to it.
     *
     * <p>In exchange mode the messages go to the configured exchange with the configured routing
     * key. Otherwise they go to the default exchange ({@code ""}) with the declared queue's name
     * as routing key.
     *
     * @param list messages to publish
     * @throws RuntimeException if declaring the target or publishing a message fails; checked
     *     exceptions are logged and wrapped, runtime exceptions are rethrown unchanged
     */
    public void publish(List<Message> list) {
        try {
            final String exchange;
            final String routingKey;
            if (!useExchange) {
                exchange = "";
                routingKey = getOrCreateQueue(ConnectionType.PUBLISHER).getQueue();
            } else {
                getOrCreateExchange(ConnectionType.PUBLISHER);
                exchange = settings.getQueueOrExchangeName();
                routingKey = settings.getRoutingKey();
            }
            for (Message outgoing : list) {
                publishMessage(outgoing, exchange, routingKey);
            }
        } catch (RuntimeException runtimeFailure) {
            throw runtimeFailure;
        } catch (Exception checkedFailure) {
            LOGGER.error("Failed to publish messages: {}", checkedFailure.getMessage(), checkedFailure);
            throw new RuntimeException(checkedFailure);
        }
    }

    /**
     * Not supported by this queue implementation.
     *
     * @param message ignored
     * @param j ignored
     * @throws UnsupportedOperationException always
     */
    public void setUnackTimeout(Message message, long j) {
        throw new UnsupportedOperationException();
    }

    /**
     * Returns the message count the broker reports for the configured queue-or-exchange name.
     *
     * <p>The subscriber channel used for the query is handed back to the connection afterwards.
     * A failure while returning it is logged and does not affect the result.
     *
     * @return the value of {@code Channel.messageCount} for the configured name
     * @throws RuntimeException wrapping any failure to obtain the channel or query the count
     */
    public long size() {
        Channel channel = null;
        try {
            channel = this.amqpConnection.getOrCreateChannel(ConnectionType.SUBSCRIBER, getSettings().getQueueOrExchangeName());
            return channel.messageCount(this.settings.getQueueOrExchangeName());
        } catch (Exception e) {
            throw new RuntimeException(e);
        } finally {
            if (channel != null) {
                try {
                    this.amqpConnection.returnChannel(ConnectionType.SUBSCRIBER, channel);
                } catch (Exception returnFailure) {
                    // Route through the logger (not stderr), like the other channel-returning methods.
                    LOGGER.error("Failed to return the channel of {}", ConnectionType.SUBSCRIBER, returnFailure);
                }
            }
        }
    }

    /** Closes the underlying {@link AMQPConnection}. */
    public void close() {
        amqpConnection.close();
    }

    /** Logs the start and marks this queue as running. */
    public void start() {
        final String implementation = getClass().getSimpleName();
        final String target = settings.getQueueOrExchangeName();
        LOGGER.info("Started listening to {}:{}", implementation, target);
        running = true;
    }

    /** Logs the stop and marks this queue as no longer running. */
    public void stop() {
        final String implementation = getClass().getSimpleName();
        final String target = settings.getQueueOrExchangeName();
        LOGGER.info("Stopped listening to {}:{}", implementation, target);
        running = false;
    }

    /** Returns whether {@code start()} has been called more recently than {@code stop()}. */
    public boolean isRunning() {
        return running;
    }

    /**
     * Declares the exchange described by this queue's settings (name, type, durability,
     * auto-delete flag and arguments) on a channel of the given connection type.
     */
    private AMQP.Exchange.DeclareOk getOrCreateExchange(ConnectionType connectionType) throws Exception {
        final String exchangeName = settings.getQueueOrExchangeName();
        final String exchangeType = settings.getExchangeType();
        final boolean durable = settings.isDurable();
        final boolean autoDelete = settings.autoDelete();
        final Map<String, Object> arguments = settings.getArguments();
        return getOrCreateExchange(connectionType, exchangeName, exchangeType, durable, autoDelete, arguments);
    }

    /**
     * Declares an exchange on a channel of the given connection type and hands the channel back
     * to the connection afterwards.
     *
     * @param connectionType connection type whose channel is used
     * @param str exchange name; must not be empty
     * @param str2 exchange type; must not be empty
     * @param z durable flag forwarded to {@code exchangeDeclare}
     * @param z2 auto-delete flag forwarded to {@code exchangeDeclare}
     * @param map arguments forwarded to {@code exchangeDeclare}
     * @return the broker's declare confirmation
     * @throws RuntimeException if the name or type is empty
     * @throws Exception if obtaining the channel or declaring the exchange fails (logged, then rethrown)
     */
    private AMQP.Exchange.DeclareOk getOrCreateExchange(ConnectionType connectionType, String str, String str2, boolean z, boolean z2, Map<String, Object> map) throws Exception {
        if (StringUtils.isEmpty(str)) {
            throw new RuntimeException("Exchange name is undefined");
        }
        if (StringUtils.isEmpty(str2)) {
            throw new RuntimeException("Exchange type is undefined");
        }

        Channel declareChannel = null;
        try {
            LOGGER.debug("Creating exchange {} of type {}", str, str2);
            declareChannel = amqpConnection.getOrCreateChannel(connectionType, getSettings().getQueueOrExchangeName());
            return declareChannel.exchangeDeclare(str, str2, z, z2, map);
        } catch (Exception declareFailure) {
            LOGGER.warn("Failed to create exchange {} of type {}", str, str2, declareFailure);
            throw declareFailure;
        } finally {
            // Hand the channel back on both the success and the failure path.
            if (declareChannel != null) {
                try {
                    amqpConnection.returnChannel(connectionType, declareChannel);
                } catch (Exception returnFailure) {
                    LOGGER.error("Failed to return the channel of {}. {}", connectionType, returnFailure);
                }
            }
        }
    }

    /**
     * Declares the queue described by this queue's settings (name, durability, exclusivity,
     * auto-delete flag and arguments) on a channel of the given connection type.
     */
    private AMQP.Queue.DeclareOk getOrCreateQueue(ConnectionType connectionType) throws Exception {
        final String queueName = settings.getQueueOrExchangeName();
        final boolean durable = settings.isDurable();
        final boolean exclusive = settings.isExclusive();
        final boolean autoDelete = settings.autoDelete();
        final Map<String, Object> arguments = settings.getArguments();
        return getOrCreateQueue(connectionType, queueName, durable, exclusive, autoDelete, arguments);
    }

    /**
     * Declares a queue on a channel of the given connection type and hands the channel back to
     * the connection afterwards.
     *
     * <p>Note: the {@code x-queue-type} entry is written into the caller-supplied {@code map}, so
     * the caller's map is modified.
     *
     * @param connectionType connection type whose channel is used
     * @param str queue name; must not be empty
     * @param z durable flag forwarded to {@code queueDeclare}
     * @param z2 exclusive flag forwarded to {@code queueDeclare}
     * @param z3 auto-delete flag forwarded to {@code queueDeclare}
     * @param map declaration arguments; receives the configured queue type
     * @return the broker's declare confirmation
     * @throws RuntimeException if the queue name is empty
     * @throws Exception if obtaining the channel or declaring the queue fails (logged, then rethrown)
     */
    private AMQP.Queue.DeclareOk getOrCreateQueue(ConnectionType connectionType, String str, boolean z, boolean z2, boolean z3, Map<String, Object> map) throws Exception {
        if (StringUtils.isEmpty(str)) {
            throw new RuntimeException("Queue name is undefined");
        }
        map.put("x-queue-type", settings.getQueueType());

        Channel declareChannel = null;
        try {
            LOGGER.debug("Creating queue {}", str);
            declareChannel = amqpConnection.getOrCreateChannel(connectionType, getSettings().getQueueOrExchangeName());
            return declareChannel.queueDeclare(str, z, z2, z3, map);
        } catch (Exception declareFailure) {
            LOGGER.warn("Failed to create queue {}", str, declareFailure);
            throw declareFailure;
        } finally {
            // Hand the channel back on both the success and the failure path.
            if (declareChannel != null) {
                try {
                    amqpConnection.returnChannel(connectionType, declareChannel);
                } catch (Exception returnFailure) {
                    LOGGER.error("Failed to return the channel of {}. {}", connectionType, returnFailure);
                }
            }
        }
    }

    /**
     * Converts a broker response into a {@link Message}.
     *
     * <p>The message id comes from the AMQP properties, the payload is the body decoded with the
     * configured content encoding, and the receipt is the delivery tag rendered as a string.
     *
     * @return the converted message, or {@code null} when {@code getResponse} is {@code null}
     * @throws Exception if the body cannot be decoded with the configured encoding
     */
    private static Message asMessage(AMQPSettings aMQPSettings, GetResponse getResponse) throws Exception {
        if (getResponse == null) {
            return null;
        }
        final Message converted = new Message();
        final String id = getResponse.getProps().getMessageId();
        converted.setId(id);
        final String payload = new String(getResponse.getBody(), aMQPSettings.getContentEncoding());
        converted.setPayload(payload);
        final long deliveryTag = getResponse.getEnvelope().getDeliveryTag();
        converted.setReceipt(String.valueOf(deliveryTag));
        return converted;
    }

    /**
     * Registers a manual-ack consumer on the subscriber channel that converts every delivery
     * into a {@link Message} and appends it to the internal {@code messages} buffer.
     *
     * <p>Conversion failures are logged and the delivery is dropped. After registering the
     * consumer, the current buffer size is reported to {@link Monitors}.
     *
     * @param str name of the queue to consume from
     * @throws Exception if the channel cannot be obtained or the consumer cannot be registered
     */
    private void receiveMessagesFromQueue(String str) throws Exception {
        LOGGER.debug("Accessing channel for queue {}", str);
        final Channel consumeChannel =
                amqpConnection.getOrCreateChannel(ConnectionType.SUBSCRIBER, getSettings().getQueueOrExchangeName());
        final Channel consumerChannel =
                amqpConnection.getOrCreateChannel(ConnectionType.SUBSCRIBER, getSettings().getQueueOrExchangeName());
        final DefaultConsumer bufferingConsumer = new DefaultConsumer(consumerChannel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                try {
                    final Message received = asMessage(settings, new GetResponse(envelope, properties, body, Integer.MAX_VALUE));
                    if (received == null) {
                        return;
                    }
                    if (LOGGER.isDebugEnabled()) {
                        LOGGER.debug("Got message with ID {} and receipt {}", received.getId(), received.getReceipt());
                    }
                    messages.add(received);
                    LOGGER.info("receiveMessagesFromQueue- End method {}", messages);
                } catch (InterruptedException interrupted) {
                    LOGGER.error("Issue in handling the mesages for the subscriber with consumer tag {}. {}", consumerTag, interrupted);
                    // Restore the interrupt flag for whoever owns this thread.
                    Thread.currentThread().interrupt();
                } catch (Exception failure) {
                    LOGGER.error("Issue in handling the mesages for the subscriber with consumer tag {}. {}", consumerTag, failure);
                }
            }

            @Override
            public void handleCancel(String consumerTag) throws IOException {
                LOGGER.error("Recieved a consumer cancel notification for subscriber {}", consumerTag);
            }
        };
        // autoAck=false: deliveries stay unacknowledged until ack/nack is called for them.
        consumeChannel.basicConsume(str, false, bufferingConsumer);
        Monitors.recordEventQueueMessagesProcessed(getType(), str, messages.size());
    }

    /**
     * Registers a manual-ack consumer on the subscriber channel that converts every delivery
     * into a {@link Message} and pushes it to {@code subscriber}.
     *
     * <p>Each delivery is handed to the subscriber on a newly started thread. Conversion failures
     * are logged and the delivery is dropped.
     *
     * @param str name of the queue to consume from
     * @param subscriber receiver of the converted messages
     * @throws Exception if the channel cannot be obtained or the consumer cannot be registered
     */
    private void receiveMessagesFromQueue(String str, final Subscriber<? super Message> subscriber) throws Exception {
        LOGGER.debug("Accessing channel for queue {}", str);
        final Channel consumeChannel =
                amqpConnection.getOrCreateChannel(ConnectionType.SUBSCRIBER, getSettings().getQueueOrExchangeName());
        final Channel consumerChannel =
                amqpConnection.getOrCreateChannel(ConnectionType.SUBSCRIBER, getSettings().getQueueOrExchangeName());
        final DefaultConsumer forwardingConsumer = new DefaultConsumer(consumerChannel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                try {
                    final Message received = asMessage(settings, new GetResponse(envelope, properties, body, Integer.MAX_VALUE));
                    if (received != null) {
                        LOGGER.info("Got message with ID {} and receipt {}", received.getId(), received.getReceipt());
                        LOGGER.debug("Message content {}", received);
                        // One thread per delivery, so the subscriber's work does not run on the
                        // thread that invoked this callback.
                        final Thread dispatcher = new Thread(() -> {
                            LOGGER.info("Spawning a new thread for message with ID {}", received.getId());
                            subscriber.onNext(received);
                        });
                        dispatcher.start();
                    }
                } catch (InterruptedException interrupted) {
                    LOGGER.error("Issue in handling the mesages for the subscriber with consumer tag {}. {}", consumerTag, interrupted);
                    // Restore the interrupt flag for whoever owns this thread.
                    Thread.currentThread().interrupt();
                } catch (Exception failure) {
                    LOGGER.error("Issue in handling the mesages for the subscriber with consumer tag {}. {}", consumerTag, failure);
                }
            }

            @Override
            public void handleCancel(String consumerTag) throws IOException {
                LOGGER.error("Recieved a consumer cancel notification for subscriber {}", consumerTag);
            }
        };
        // autoAck=false: deliveries stay unacknowledged until ack/nack is called for them.
        consumeChannel.basicConsume(str, false, forwardingConsumer);
    }

    /**
     * Prepares the subscriber side and starts consuming into the internal buffer.
     *
     * <p>Sets the channel prefetch to the batch size, declares the exchange plus its bound queue
     * (exchange mode) or just the queue, and then registers the buffering consumer. Any failure
     * is logged and counted via {@link Monitors}; nothing is thrown.
     */
    protected void receiveMessages() {
        try {
            final Channel qosChannel =
                    amqpConnection.getOrCreateChannel(ConnectionType.SUBSCRIBER, getSettings().getQueueOrExchangeName());
            qosChannel.basicQos(batchSize);

            final String queueName;
            if (!useExchange) {
                queueName = getOrCreateQueue(ConnectionType.SUBSCRIBER).getQueue();
            } else {
                getOrCreateExchange(ConnectionType.SUBSCRIBER);
                // The exchange-bound queue is declared with a fresh argument map rather than
                // the arguments from the settings.
                queueName = getOrCreateQueue(
                                ConnectionType.SUBSCRIBER,
                                settings.getExchangeBoundQueueName(),
                                settings.isDurable(),
                                settings.isExclusive(),
                                settings.autoDelete(),
                                Maps.newHashMap())
                        .getQueue();
                final Channel bindChannel =
                        amqpConnection.getOrCreateChannel(ConnectionType.SUBSCRIBER, getSettings().getQueueOrExchangeName());
                bindChannel.queueBind(queueName, settings.getQueueOrExchangeName(), settings.getRoutingKey());
            }

            LOGGER.info("Consuming from queue {}", queueName);
            receiveMessagesFromQueue(queueName);
        } catch (Exception failure) {
            LOGGER.error("Exception while getting messages from RabbitMQ", failure);
            Monitors.recordObservableQMessageReceivedErrors(getType());
        }
    }

    /**
     * Prepares the subscriber side and starts pushing deliveries to {@code subscriber}.
     *
     * <p>Sets the channel prefetch to the batch size, declares the exchange plus its bound queue
     * (exchange mode) or just the queue, and then registers the forwarding consumer. Any failure
     * is logged and counted via {@link Monitors}; nothing is thrown.
     *
     * @param subscriber receiver of the consumed messages
     */
    protected void receiveMessages(Subscriber<? super Message> subscriber) {
        try {
            final Channel qosChannel =
                    amqpConnection.getOrCreateChannel(ConnectionType.SUBSCRIBER, getSettings().getQueueOrExchangeName());
            qosChannel.basicQos(batchSize);

            final String queueName;
            if (!useExchange) {
                queueName = getOrCreateQueue(ConnectionType.SUBSCRIBER).getQueue();
            } else {
                getOrCreateExchange(ConnectionType.SUBSCRIBER);
                // The exchange-bound queue is declared with a fresh argument map rather than
                // the arguments from the settings.
                queueName = getOrCreateQueue(
                                ConnectionType.SUBSCRIBER,
                                settings.getExchangeBoundQueueName(),
                                settings.isDurable(),
                                settings.isExclusive(),
                                settings.autoDelete(),
                                Maps.newHashMap())
                        .getQueue();
                final Channel bindChannel =
                        amqpConnection.getOrCreateChannel(ConnectionType.SUBSCRIBER, settings.getQueueOrExchangeName());
                bindChannel.queueBind(queueName, settings.getQueueOrExchangeName(), settings.getRoutingKey());
            }

            LOGGER.info("Consuming from queue {}", queueName);
            receiveMessagesFromQueue(queueName, subscriber);
        } catch (Exception failure) {
            LOGGER.error("Exception while getting messages from RabbitMQ", failure);
            Monitors.recordObservableQMessageReceivedErrors(getType());
        }
    }

    /** Returns the polling interval, in milliseconds, used by the scheduled observe mode. */
    public int getPollTimeInMS() {
        return pollTimeInMS;
    }

    /**
     * Sets the polling interval used by the scheduled observe mode.
     *
     * @param i polling interval in milliseconds; not validated here
     */
    public void setPollTimeInMS(int i) {
        pollTimeInMS = i;
    }
}
