Class: MarchHare::Channel

Inherits:
Object
  • Object
show all
Defined in:
lib/march_hare/channel.rb

Overview

Channels in RabbitMQ

To quote AMQP 0.9.1 specification:

AMQP 0.9.1 is a multi-channelled protocol. Channels provide a way to multiplex a heavyweight TCP/IP connection into several light weight connections. This makes the protocol more “firewall friendly” since port usage is predictable. It also means that traffic shaping and other network QoS features can be easily employed. Channels are independent of each other and can perform different functions simultaneously with other channels, the available bandwidth being shared between the concurrent activities.

Opening Channels

Channels can be opened either via MarchHare::Session#create_channel (sufficient in the majority of cases) or by instantiating MarchHare::Channel directly:

This will automatically allocate a channel id.

Closing Channels

Channels are closed via #close. Channels that get a channel-level exception are closed, too. Closed channels can no longer be used. Attempts to use them will raise ChannelAlreadyClosed.

Higher-level API

MarchHare offers two sets of methods on Channel: known as higher-level and lower-level APIs, respectively. Higher-level API mimics amqp gem API where exchanges and queues are objects (instance of Exchange and Queue, respectively). Lower-level API is built around AMQP 0.9.1 methods (commands), where queues and exchanges are passed as strings (à la RabbitMQ Java client, Langohr and Pika).

Queue Operations In Higher-level API

  • #queue is used to declare queues. The rest of the API is in Queue.

Exchange Operations In Higher-level API

Channel Qos (Prefetch Level)

It is possible to control how many messages at most a consumer will be given (before it acknowledges or rejects previously consumed ones). This setting is per channel and controlled via #prefetch.

Channel IDs

Channels are identified by their ids which are integers. MarchHare takes care of allocating and releasing them as channels are opened and closed. It is almost never necessary to specify channel ids explicitly.

There is a limit on the maximum number of channels per connection, usually 65536. Note that allocating channels is very cheap on both client and server so having tens, hundreds or even thousands of channels is possible.

Channels and Error Handling

Channel-level exceptions are more common than connection-level ones and often indicate issues applications can recover from (such as consuming from or trying to delete a queue that does not exist).

With MarchHare, channel-level exceptions are raised as Ruby exceptions, for example, NotFound, that provide access to the underlying channel.close method information.

Examples:

conn = MarchHare.new
conn.start

ch   = conn.create_channel
ch  = conn.create_channel
ch.close

Handling 404 NOT_FOUND

begin
  ch.queue_delete("queue_that_should_not_exist#{rand}")
rescue MarchHare::NotFound => e
  puts "Channel-level exception! Code: #{e.channel_close.reply_code}, message: #{e.channel_close.reply_text}"
end

Handling 406 PRECONDITION_FAILED

begin
  ch2 = conn.create_channel
  q   = "rubymarchhare.examples.recovery.q#{rand}"

  ch2.queue_declare(q, :durable => false)
  ch2.queue_declare(q, :durable => true)
rescue MarchHare::PreconditionFailed => e
  puts "Channel-level exception! Code: #{e.channel_close.reply_code}, message: #{e.channel_close.reply_text}"
ensure
  conn.create_channel.queue_delete(q)
end

See Also:

Instance Attribute Summary (collapse)

Exchanges (collapse)

Queues (collapse)

basic.* (collapse)

Instance Method Summary (collapse)

Dynamic Method Handling

This class handles dynamic methods through the method_missing method

- (Object) method_missing(selector, *args)



829
830
831
# File 'lib/march_hare/channel.rb', line 829

def method_missing(selector, *args)
  @delegate.__send__(selector, *args)
end

Instance Attribute Details

- (Array<MarchHare::Consumer>) consumers (readonly)

Consumers on this channel

Returns:

  • (Array<MarchHare::Consumer>)

    Consumers on this channel



118
119
120
# File 'lib/march_hare/channel.rb', line 118

def consumers
  @consumers
end

- (Object) recoveries_counter (readonly)

Returns the value of attribute recoveries_counter



262
263
264
# File 'lib/march_hare/channel.rb', line 262

def recoveries_counter
  @recoveries_counter
end

Instance Method Details

- (Object) ack(delivery_tag, multiple = false) Also known as: acknowledge

Acknowledges a message. Acknowledged messages are completely removed from the queue.

Parameters:

  • delivery_tag (Integer)

    Delivery tag to acknowledge

  • multiple (Boolean) (defaults to: false)

    (false) Should all unacknowledged messages up to this be acknowledged as well?

See Also:



625
626
627
628
629
# File 'lib/march_hare/channel.rb', line 625

def ack(delivery_tag, multiple = false)
  guarding_against_stale_delivery_tags(delivery_tag) do
    basic_ack(delivery_tag.to_i, multiple)
  end
end

- (NilClass) basic_ack(delivery_tag, multiple)

Acknowledges one or more messages (deliveries).

Parameters:

  • delivery_tag (Integer)

    Delivery tag obtained from delivery info

  • multiple (Boolean)

    Should all deliveries up to this one be acknowledged?

Returns:

  • (NilClass)

    nil

See Also:



711
712
713
714
715
# File 'lib/march_hare/channel.rb', line 711

def basic_ack(delivery_tag, multiple)
  converting_rjc_exceptions_to_ruby do
    @delegate.basic_ack(delivery_tag.to_i, multiple)
  end
end

- (Object) basic_consume(queue, auto_ack, consumer)



583
584
585
586
587
588
589
590
591
# File 'lib/march_hare/channel.rb', line 583

def basic_consume(queue, auto_ack, consumer)
  consumer.auto_ack = auto_ack
  tag = converting_rjc_exceptions_to_ruby do
    @delegate.basic_consume(queue, auto_ack, consumer)
  end
  self.register_consumer(tag, consumer)

  tag
end

- (Object) basic_get(queue, auto_ack)



577
578
579
580
581
# File 'lib/march_hare/channel.rb', line 577

def basic_get(queue, auto_ack)
  converting_rjc_exceptions_to_ruby do
    @delegate.basic_get(queue, auto_ack)
  end
end

- (NilClass) basic_nack(delivery_tag, multiple = false, requeue = false)

Rejects or requeues messages just like #basic_reject but can do so with multiple messages at once.

Parameters:

  • delivery_tag (Integer)

    Delivery tag obtained from delivery info

  • requeue (Boolean) (defaults to: false)

    Should the message be requeued?

  • multiple (Boolean) (defaults to: false)

    Should all deliveries up to this one be rejected/requeued?

Returns:

  • (NilClass)

    nil

See Also:



727
728
729
730
731
# File 'lib/march_hare/channel.rb', line 727

def basic_nack(delivery_tag, multiple = false, requeue = false)
  converting_rjc_exceptions_to_ruby do
    @delegate.basic_nack(delivery_tag.to_i, multiple, requeue)
  end
end

- (MarchHare::Channel) basic_publish(exchange, routing_key, mandatory, properties, body)

Publishes a message using basic.publish AMQP 0.9.1 method.

Parameters:

  • exchange (String)

    Exchange to publish to

  • routing_key (String)

    Routing key

  • body (String)

    Message payload. It will never be modified by MarchHare or RabbitMQ in any way.

  • properties (Hash)

    Message properties

  • opts (Hash)

    a customizable set of options

Options Hash (properties):

  • :persistent (Boolean)

    Should the message be persisted to disk?

  • :timestamp (Integer)

    A timestamp associated with this message

  • :expiration (Integer)

    Expiration time after which the message will be deleted

  • :type (String)

    Message type, e.g. what type of event or command this message represents. Can be any string

  • :reply_to (String)

    Queue name other apps should send the response to

  • :content_type (String)

    Message content type (e.g. application/json)

  • :content_encoding (String)

    Message content encoding (e.g. gzip)

  • :correlation_id (String)

    Message correlated to this one, e.g. what request this message is a reply for

  • :priority (Integer)

    Message priority, 0 to 9. Not used by RabbitMQ, only applications

  • :message_id (String)

    Any message identifier

  • :user_id (String)

    Optional user ID. Verified by RabbitMQ against the actual connection username

  • :app_id (String)

    Optional application ID

Returns:



571
572
573
574
575
# File 'lib/march_hare/channel.rb', line 571

def basic_publish(exchange, routing_key, mandatory, properties, body)
  converting_rjc_exceptions_to_ruby do
    @delegate.basic_publish(exchange, routing_key, mandatory, false, BasicPropertiesBuilder.build_properties_from(properties || Hash.new), body)
  end
end

- (Object) basic_qos(prefetch_count)



593
594
595
596
597
598
599
600
# File 'lib/march_hare/channel.rb', line 593

def basic_qos(prefetch_count)
  r = converting_rjc_exceptions_to_ruby do
    @delegate.basic_qos(prefetch_count)
  end
  @prefetch_count = prefetch_count

  r
end

- (Object) basic_recover(requeue = true)

Redeliver unacknowledged messages

Parameters:

  • requeue (Boolean) (defaults to: true)

    Should messages be requeued?

Returns:

  • RabbitMQ response



737
738
739
740
741
# File 'lib/march_hare/channel.rb', line 737

def basic_recover(requeue = true)
  converting_rjc_exceptions_to_ruby do
    @delegate.basic_recover(requeue)
  end
end

- (Object) basic_recover_async(requeue = true)

Redeliver unacknowledged messages

Parameters:

  • requeue (Boolean) (defaults to: true)

    Should messages be requeued?

Returns:

  • RabbitMQ response



747
748
749
750
751
# File 'lib/march_hare/channel.rb', line 747

def basic_recover_async(requeue = true)
  converting_rjc_exceptions_to_ruby do
    @delegate.basic_recover_async(requeue)
  end
end

- (NilClass) basic_reject(delivery_tag, requeue)

Rejects or requeues a message.

Examples:

Requeue a message

conn  = MarchHare.new
conn.start

ch    = conn.create_channel
q.subscribe do |delivery_info, properties, payload|
  # requeue the message
  ch.basic_reject(delivery_info.delivery_tag, true)
end

Reject a message

conn  = MarchHare.new
conn.start

ch    = conn.create_channel
q.subscribe do |delivery_info, properties, payload|
  # requeue the message
  ch.basic_reject(delivery_info.delivery_tag, false)
end

Requeue a message fetched via basic.get

conn  = MarchHare.new
conn.start

ch    = conn.create_channel
# we assume the queue exists and has messages
delivery_info, properties, payload = ch.basic_get("bunny.examples.queue3", :ack => true)
ch.basic_reject(delivery_info.delivery_tag, true)

Parameters:

  • delivery_tag (Integer)

    Delivery tag obtained from delivery info

  • requeue (Boolean)

    Should the message be requeued?

Returns:

  • (NilClass)

    nil

See Also:



698
699
700
701
702
# File 'lib/march_hare/channel.rb', line 698

def basic_reject(delivery_tag, requeue)
  converting_rjc_exceptions_to_ruby do
    @delegate.basic_reject(delivery_tag.to_i, requeue)
  end
end

- (Object) channel_flow(active)

Enables or disables channel flow. This feature id deprecated in RabbitMQ.



811
812
813
814
815
# File 'lib/march_hare/channel.rb', line 811

def channel_flow(active)
  converting_rjc_exceptions_to_ruby do
    @delegate.channel_flow(active)
  end
end

- (Integer) channel_number Also known as: id, number

Channel id

Returns:

  • (Integer)

    Channel id



147
148
149
# File 'lib/march_hare/channel.rb', line 147

def channel_number
  @delegate.channel_number
end

- (Object) close(code = 200, reason = "Goodbye")

Closes the channel.

Closed channels can no longer be used. Closed channel id is returned back to the pool of available ids and may be used by a different channel opened later.



158
159
160
161
162
163
164
165
166
167
168
# File 'lib/march_hare/channel.rb', line 158

def close(code = 200, reason = "Goodbye")
  v = @delegate.close(code, reason)

  @consumers.each do |tag, consumer|
    consumer.gracefully_shut_down
  end

  @connection.unregister_channel(self)

  v
end

- (NilClass) confirm_select

Enables publisher confirms on the channel.

Returns:

  • (NilClass)

    nil

See Also:



760
761
762
763
764
# File 'lib/march_hare/channel.rb', line 760

def confirm_select
  converting_rjc_exceptions_to_ruby do
    @delegate.confirm_select
  end
end

- (Object) default_exchange

Provides access to the default exchange



374
375
376
# File 'lib/march_hare/channel.rb', line 374

def default_exchange
  @default_exchange ||= self.exchange("", :durable => true, :auto_delete => false, :type => "direct")
end

- (MarchHare::Exchange) direct(name, opts = {})

Declares a direct exchange or looks it up in the cache of previously declared exchanges.

Parameters:

  • name (String)

    Exchange name

  • opts (Hash) (defaults to: {})

    Exchange parameters

Options Hash (opts):

  • :durable (Boolean) — default: false

    Should the exchange be durable?

  • :auto_delete (Boolean) — default: false

    Should the exchange be automatically deleted when no longer in use?

  • :arguments (Hash) — default: {}

    Optional exchange arguments (used by RabbitMQ extensions)

Returns:

See Also:



322
323
324
325
326
327
328
# File 'lib/march_hare/channel.rb', line 322

def direct(name, opts = {})
  dx = Exchange.new(self, name, opts.merge(:type => "direct")).tap do |x|
    x.declare!
  end

  self.register_exchange(dx)
end

- (MarchHare::Exchange) exchange(name, options = {})

Declares a headers exchange or looks it up in the cache of previously declared exchanges.

Parameters:

  • name (String)

    Exchange name

  • opts (Hash)

    Exchange parameters

  • options (Hash) (defaults to: {})

    a customizable set of options

Options Hash (options):

  • :type (String, Symbol) — default: :direct

    Exchange type, e.g. :fanout or "x-consistent-hash"

  • :durable (Boolean) — default: false

    Should the exchange be durable?

  • :auto_delete (Boolean) — default: false

    Should the exchange be automatically deleted when no longer in use?

  • :arguments (Hash) — default: {}

    Optional exchange arguments

Returns:

See Also:



280
281
282
283
284
285
286
# File 'lib/march_hare/channel.rb', line 280

def exchange(name, options={})
  dx = Exchange.new(self, name, options).tap do |x|
    x.declare!
  end

  self.register_exchange(dx)
end

- (Object) exchange_bind(destination, source, routing_key, arguments = nil)

Binds an exchange to another exchange using exchange.bind method (RabbitMQ extension)

Parameters:

  • desitnation (String)

    Destination exchange name

  • source (String)

    Source exchange name

  • routing_key (String)

    Routing key used for binding

  • arguments (Hash) (defaults to: nil)

    (nil) Optional arguments

Returns:

  • RabbitMQ response

See Also:



404
405
406
# File 'lib/march_hare/channel.rb', line 404

def exchange_bind(destination, source, routing_key, arguments = nil)
  @delegate.exchange_bind(destination, source, routing_key, arguments)
end

- (Object) exchange_declare(name, type, durable = false, auto_delete = false, arguments = nil)

Declares a echange using echange.declare AMQP 0.9.1 method.

Parameters:

  • name (String)

    Exchange name

  • durable (Boolean) (defaults to: false)

    (false) Should information about this echange be persisted to disk so that it can survive broker restarts? Typically set to true for long-lived exchanges.

  • auto_delete (Boolean) (defaults to: false)

    (false) Should this echange be deleted when it is no longer used?

  • passive (Boolean)

    (false) If true, exchange will be checked for existence. If it does not exist, NotFound will be raised.

Returns:

  • RabbitMQ response

See Also:



389
390
391
# File 'lib/march_hare/channel.rb', line 389

def exchange_declare(name, type, durable = false, auto_delete = false, arguments = nil)
  @delegate.exchange_declare(name, type, durable, auto_delete, arguments)
end

- (Object) exchange_unbind(destination, source, routing_key, arguments = nil)

Unbinds an exchange from another exchange using exchange.unbind method (RabbitMQ extension)

Parameters:

  • destination (String)

    Destination exchange name

  • source (String)

    Source exchange name

  • routing_key (String)

    Routing key used for binding

  • arguments (Hash) (defaults to: nil)

    ({}) Optional arguments

Returns:

  • RabbitMQ response

See Also:



419
420
421
# File 'lib/march_hare/channel.rb', line 419

def exchange_unbind(destination, source, routing_key, arguments = nil)
  @delegate.exchange_unbind(destination, source, routing_key, arguments)
end

- (MarchHare::Exchange) fanout(name, opts = {})

Declares a fanout exchange or looks it up in the cache of previously declared exchanges.

Parameters:

  • name (String)

    Exchange name

  • opts (Hash) (defaults to: {})

    Exchange parameters

Options Hash (opts):

  • :durable (Boolean) — default: false

    Should the exchange be durable?

  • :auto_delete (Boolean) — default: false

    Should the exchange be automatically deleted when no longer in use?

  • :arguments (Hash) — default: {}

    Optional exchange arguments (used by RabbitMQ extensions)

Returns:

See Also:



301
302
303
304
305
306
307
# File 'lib/march_hare/channel.rb', line 301

def fanout(name, opts = {})
  dx = Exchange.new(self, name, opts.merge(:type => "fanout")).tap do |x|
    x.declare!
  end

  self.register_exchange(dx)
end

- (MarchHare::Exchange) headers(name, opts = {})

Declares a headers exchange or looks it up in the cache of previously declared exchanges.

Parameters:

  • name (String)

    Exchange name

  • opts (Hash) (defaults to: {})

    Exchange parameters

Options Hash (opts):

  • :durable (Boolean) — default: false

    Should the exchange be durable?

  • :auto_delete (Boolean) — default: false

    Should the exchange be automatically deleted when no longer in use?

  • :arguments (Hash) — default: {}

    Optional exchange arguments

Returns:

See Also:



364
365
366
367
368
369
370
# File 'lib/march_hare/channel.rb', line 364

def headers(name, opts = {})
  dx = Exchange.new(self, name, opts.merge(:type => "headers")).tap do |x|
    x.declare!
  end

  self.register_exchange(dx)
end

- (Object) nack(delivery_tag, multiple = false, requeue = false)

Rejects a message. A rejected message can be requeued or dropped by RabbitMQ. This method is similar to #reject but supports rejecting multiple messages at once, and is usually preferred.

Parameters:

  • delivery_tag (Integer)

    Delivery tag to reject

  • multiple (Boolean) (defaults to: false)

    (false) Should all unacknowledged messages up to this be rejected as well?

  • requeue (Boolean) (defaults to: false)

    (false) Should this message be requeued instead of dropping it?

See Also:



655
656
657
658
659
# File 'lib/march_hare/channel.rb', line 655

def nack(delivery_tag, multiple = false, requeue = false)
  guarding_against_stale_delivery_tags(delivery_tag) do
    basic_nack(delivery_tag.to_i, multiple, requeue)
  end
end

- (Object) next_publisher_seq_no



784
785
786
# File 'lib/march_hare/channel.rb', line 784

def next_publisher_seq_no
  @delegate.next_publisher_seq_no
end

- (Object) on_confirm

Defines a publisher confirm handler



825
826
827
# File 'lib/march_hare/channel.rb', line 825

def on_confirm(&block)
  self.add_confirm_listener(BlockConfirmListener.from(block))
end

- (Object) on_return

Defines a returned message handler.



819
820
821
# File 'lib/march_hare/channel.rb', line 819

def on_return(&block)
  self.add_return_listener(BlockReturnListener.from(block))
end

- (Object) on_shutdown

Defines a shutdown event callback. Shutdown events are broadcasted when a channel is closed, either explicitly or forcefully, or due to a network/peer failure.



173
174
175
176
177
178
179
180
# File 'lib/march_hare/channel.rb', line 173

def on_shutdown(&block)
  sh = ShutdownListener.new(self, &block)

  @shutdown_hooks << sh
  @delegate.add_shutdown_listener(sh)

  sh
end

- (Object) prefetch=(n)

Sets how many messages will be given to consumers on this channel before they have to acknowledge or reject one of the previously consumed messages

Parameters:

  • prefetch_count (Integer)

    Prefetch (QoS setting) for this channel

See Also:



615
616
617
# File 'lib/march_hare/channel.rb', line 615

def prefetch=(n)
  basic_qos(n)
end

- (Object) qos(options = {})



602
603
604
605
606
607
# File 'lib/march_hare/channel.rb', line 602

def qos(options={})
  if options.size == 1 && options[:prefetch_count]
  then basic_qos(options[:prefetch_count])
  else basic_qos(options.fetch(:prefetch_size, 0), options.fetch(:prefetch_count, 0), options.fetch(:global, false))
  end
end

- (MarchHare::Queue) queue(name, options = {})

Declares a queue or looks it up in the per-channel cache.

Parameters:

  • name (String)

    Queue name. Pass an empty string to declare a server-named queue (make RabbitMQ generate a unique name).

  • options (Hash) (defaults to: {})

    Queue properties and other options

Options Hash (options):

  • :durable (Boolean) — default: false

    Should this queue be durable?

  • :auto-delete (Boolean) — default: false

    Should this queue be automatically deleted when the last consumer disconnects?

  • :exclusive (Boolean) — default: false

    Should this queue be exclusive (only can be used by this connection, removed when the connection is closed)?

  • :arguments (Boolean) — default: {}

    Additional optional arguments (typically used by RabbitMQ extensions and plugins)

Returns:

See Also:



441
442
443
444
445
446
447
# File 'lib/march_hare/channel.rb', line 441

def queue(name, options={})
  dq = Queue.new(self, name, options).tap do |q|
    q.declare!
  end

  self.register_queue(dq)
end

- (Object) queue_bind(queue, exchange, routing_key, arguments = nil)

Binds a queue to an exchange using queue.bind AMQP 0.9.1 method

Parameters:

  • name (String)

    Queue name

  • exchange (String)

    Exchange name

  • routing_key (String)

    Routing key used for binding

  • arguments (Hash) (defaults to: nil)

    (nil) Optional arguments

Returns:

  • RabbitMQ response

See Also:



508
509
510
511
512
# File 'lib/march_hare/channel.rb', line 508

def queue_bind(queue, exchange, routing_key, arguments = nil)
  converting_rjc_exceptions_to_ruby do
    @delegate.queue_bind(queue, exchange, routing_key, arguments)
  end
end

- (Object) queue_declare(name, durable, exclusive, auto_delete, arguments = {})

Declares a queue using queue.declare AMQP 0.9.1 method.

Parameters:

  • name (String)

    Queue name

  • durable (Boolean)

    (false) Should information about this queue be persisted to disk so that it can survive broker restarts? Typically set to true for long-lived queues.

  • auto_delete (Boolean)

    (false) Should this queue be deleted when the last consumer is cancelled?

  • exclusive (Boolean)

    (false) Should only this connection be able to use this queue? If true, the queue will be automatically deleted when this connection is closed

  • passive (Boolean)

    (false) If true, queue will be checked for existence. If it does not exist, NotFound will be raised.

Returns:

  • RabbitMQ response

See Also:



464
465
466
467
468
# File 'lib/march_hare/channel.rb', line 464

def queue_declare(name, durable, exclusive, auto_delete, arguments = {})
  converting_rjc_exceptions_to_ruby do
    @delegate.queue_declare(name, durable, exclusive, auto_delete, arguments)
  end
end

- (Object) queue_declare_passive(name)

Checks if a queue exists using queue.declare AMQP 0.9.1 method. If it does not, a channel exception will be raised.

Parameters:

  • name (String)

    Queue name

See Also:



476
477
478
479
480
# File 'lib/march_hare/channel.rb', line 476

def queue_declare_passive(name)
  converting_rjc_exceptions_to_ruby do
    @delegate.queue_declare_passive(name)
  end
end

- (Object) queue_delete(name, if_empty = false, if_unused = false)

Deletes a queue using queue.delete AMQP 0.9.1 method

Parameters:

  • name (String)

    Queue name

  • if_empty (Boolean) (defaults to: false)

    (false) Should this queue be deleted only if it has no messages?

  • if_unused (Boolean) (defaults to: false)

    (false) Should this queue be deleted only if it has no consumers?

Returns:

  • RabbitMQ response

See Also:



491
492
493
494
495
# File 'lib/march_hare/channel.rb', line 491

def queue_delete(name, if_empty = false, if_unused = false)
  converting_rjc_exceptions_to_ruby do
    @delegate.queue_delete(name, if_empty, if_unused)
  end
end

- (Object) queue_purge(name)

Purges a queue (removes all messages from it) using queue.purge AMQP 0.9.1 method.

Parameters:

  • name (String)

    Queue name

Returns:

  • RabbitMQ response

See Also:



537
538
539
540
541
# File 'lib/march_hare/channel.rb', line 537

def queue_purge(name)
  converting_rjc_exceptions_to_ruby do
    @delegate.queue_purge(name)
  end
end

- (Object) queue_unbind(queue, exchange, routing_key, arguments = nil)

Unbinds a queue from an exchange using queue.unbind AMQP 0.9.1 method

Parameters:

  • name (String)

    Queue name

  • exchange (String)

    Exchange name

  • routing_key (String)

    Routing key used for binding

  • arguments (Hash) (defaults to: nil)

    ({}) Optional arguments

Returns:

  • RabbitMQ response

See Also:



525
526
527
528
529
# File 'lib/march_hare/channel.rb', line 525

def queue_unbind(queue, exchange, routing_key, arguments = nil)
  converting_rjc_exceptions_to_ruby do
    @delegate.queue_unbind(queue, exchange, routing_key, arguments)
  end
end

- (Object) recover_consumers

Recovers consumers. Used by the Automatic Network Failure Recovery feature.



245
246
247
248
249
250
251
252
253
254
255
# File 'lib/march_hare/channel.rb', line 245

def recover_consumers
  @consumers.values.each do |c|
    begin
      self.unregister_consumer(c)
      c.recover_from_network_failure
    rescue Exception => e
      # TODO: logger
      $stderr.puts "Caught exception when recovering consumer #{c.consumer_tag}"
    end
  end
end

- (Object) recover_exchanges

Recovers exchanges. Used by the Automatic Network Failure Recovery feature.



219
220
221
222
223
224
225
226
227
228
# File 'lib/march_hare/channel.rb', line 219

def recover_exchanges
  @exchanges.values.each do |x|
    begin
      x.recover_from_network_failure
    rescue Exception => e
      # TODO: logger
      $stderr.puts "Caught exception when recovering exchange #{x.name}"
    end
  end
end

- (Object) recover_prefetch_setting

Recovers basic.qos setting. Used by the Automatic Network Failure Recovery feature.



212
213
214
# File 'lib/march_hare/channel.rb', line 212

def recover_prefetch_setting
  basic_qos(@prefetch_count) if @prefetch_count
end

- (Object) recover_queues

Recovers queues and bindings. Used by the Automatic Network Failure Recovery feature.



232
233
234
235
236
237
238
239
240
241
# File 'lib/march_hare/channel.rb', line 232

def recover_queues
  @queues.values.each do |q|
    begin
      q.recover_from_network_failure
    rescue Exception => e
      # TODO: logger
      $stderr.puts "Caught exception when recovering queue #{q.name}"
    end
  end
end

- (Object) reject(delivery_tag, requeue = false)

Rejects a message. A rejected message can be requeued or dropped by RabbitMQ.

Parameters:

  • delivery_tag (Integer)

    Delivery tag to reject

  • requeue (Boolean) (defaults to: false)

    Should this message be requeued instead of dropping it?

See Also:



640
641
642
643
644
# File 'lib/march_hare/channel.rb', line 640

def reject(delivery_tag, requeue = false)
  guarding_against_stale_delivery_tags(delivery_tag) do
    basic_reject(delivery_tag.to_i, requeue)
  end
end

- (MarchHare::Session) session Also known as: client, connection

Connection this channel is on

Returns:



140
141
142
# File 'lib/march_hare/channel.rb', line 140

def session
  @connection
end

- (MarchHare::Exchange) topic(name, opts = {})

Declares a topic exchange or looks it up in the cache of previously declared exchanges.

Parameters:

  • name (String)

    Exchange name

  • opts (Hash) (defaults to: {})

    Exchange parameters

Options Hash (opts):

  • :durable (Boolean) — default: false

    Should the exchange be durable?

  • :auto_delete (Boolean) — default: false

    Should the exchange be automatically deleted when no longer in use?

  • :arguments (Hash) — default: {}

    Optional exchange arguments (used by RabbitMQ extensions)

Returns:

See Also:



343
344
345
346
347
348
349
# File 'lib/march_hare/channel.rb', line 343

def topic(name, opts = {})
  dx = Exchange.new(self, name, opts.merge(:type => "topic")).tap do |x|
    x.declare!
  end

  self.register_exchange(dx)
end

- (Object) tx_commit

Commits a transaction



796
797
798
799
800
# File 'lib/march_hare/channel.rb', line 796

def tx_commit
  converting_rjc_exceptions_to_ruby do
    @delegate.tx_commit
  end
end

- (Object) tx_rollback

Rolls back a transaction



803
804
805
806
807
# File 'lib/march_hare/channel.rb', line 803

def tx_rollback
  converting_rjc_exceptions_to_ruby do
    @delegate.tx_rollback
  end
end

- (Object) tx_select

Enables transactions on the channel



789
790
791
792
793
# File 'lib/march_hare/channel.rb', line 789

def tx_select
  converting_rjc_exceptions_to_ruby do
    @delegate.tx_select
  end
end

- (Boolean) wait_for_confirms(timeout = nil)

Waits until all outstanding publisher confirms arrive.

Takes an optional timeout in milliseconds. Will raise an exception in timeout has occured.

Parameters:

  • timeout (Integer) (defaults to: nil)

    Timeout in milliseconds

Returns:

  • (Boolean)

    true if all confirms were positive, false if some were negative



774
775
776
777
778
779
780
781
782
# File 'lib/march_hare/channel.rb', line 774

def wait_for_confirms(timeout = nil)
  if timeout
    converting_rjc_exceptions_to_ruby do
      @delegate.wait_for_confirms(timeout)
    end
  else
    @delegate.wait_for_confirms
  end
end