Class: MarchHare::Queue

Inherits:
Object
  • Object
show all
Defined in:
lib/march_hare/queue.rb

Overview

Represents AMQP 0.9.1 queue.

See Also:

Instance Attribute Summary (collapse)

Instance Method Summary (collapse)

Constructor Details

- (Queue) initialize(channel, name, options = {})

A new instance of Queue

Parameters:

  • channel_or_connection (MarchHare::Channel)

    Channel this queue will use.

  • name (String)

    Queue name. Pass an empty string to make RabbitMQ generate a unique one.

  • opts (Hash)

    Queue properties

See Also:



31
32
33
34
35
36
37
38
39
40
41
42
43
# File 'lib/march_hare/queue.rb', line 31

def initialize(channel, name, options={})
  @channel = channel
  @name = name
  @options = {:durable => false, :exclusive => false, :auto_delete => false, :passive => false, :arguments => Hash.new}.merge(options)

  @durable      = @options[:durable]
  @exclusive    = @options[:exclusive]
  @server_named = @name.empty?
  @auto_delete  = @options[:auto_delete]
  @arguments    = @options[:arguments]

  @bindings     = Set.new
end

Instance Attribute Details

- (MarchHare::Channel) channel (readonly)

Channel this queue uses

Returns:



15
16
17
# File 'lib/march_hare/queue.rb', line 15

def channel
  @channel
end

- (String) name (readonly)

Queue name

Returns:

  • (String)

    Queue name



17
18
19
# File 'lib/march_hare/queue.rb', line 17

def name
  @name
end

Instance Method Details

- (Hash) arguments

Additional optional arguments (typically used by RabbitMQ extensions and plugins)

Returns:

  • (Hash)

    Additional optional arguments (typically used by RabbitMQ extensions and plugins)



71
72
73
# File 'lib/march_hare/queue.rb', line 71

def arguments
  @arguments
end

- (Boolean) auto_delete?

True if this queue was declared as automatically deleted (deleted as soon as last consumer unbinds).

Returns:

  • (Boolean)

    true if this queue was declared as automatically deleted (deleted as soon as last consumer unbinds).

See Also:



60
61
62
# File 'lib/march_hare/queue.rb', line 60

def auto_delete?
  @auto_delete
end

- (Object) bind(exchange, options = {})

Binds queue to an exchange

Parameters:

  • exchange (MarchHare::Exchange, String)

    Exchange to bind to

  • options (Hash) (defaults to: {})

    Binding properties

Options Hash (options):

  • :routing_key (String)

    Routing key

  • :arguments (Hash) — default: {}

    Additional optional binding arguments

See Also:



87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
# File 'lib/march_hare/queue.rb', line 87

def bind(exchange, options={})
  exchange_name = if exchange.respond_to?(:name) then
                    exchange.name
                  else
                    exchange.to_s
                  end
  @channel.queue_bind(@name, exchange_name, (options[:routing_key] || options[:key] || ""), options[:arguments])

  # store bindings for automatic recovery. We need to be very careful to
  # not cause an infinite rebinding loop here when we recover. MK.
  binding = { :exchange => exchange_name, :routing_key => (options[:routing_key] || options[:key]), :arguments => options[:arguments] }
  @bindings << binding unless @bindings.include?(binding)

  self
end

- (Object) build_consumer(opts = {})



155
156
157
158
159
160
161
# File 'lib/march_hare/queue.rb', line 155

def build_consumer(opts = {}, &block)
  if opts[:block] || opts[:blocking]
    BlockingCallbackConsumer.new(@channel, self, opts[:buffer_size], opts, block)
  else
    CallbackConsumer.new(@channel, self, opts, block)
  end
end

- (Integer) consumer_count

How many active consumers the queue has

Returns:

  • (Integer)

    How many active consumers the queue has



206
207
208
209
# File 'lib/march_hare/queue.rb', line 206

def consumer_count
  response = @channel.queue_declare_passive(@name)
  response.consumer_count
end

- (Object) delete(if_unused = false, if_empty = false)

Deletes the queue

Parameters:

  • [Boolean] (Hash)

    a customizable set of options

See Also:



133
134
135
# File 'lib/march_hare/queue.rb', line 133

def delete(if_unused = false, if_empty = false)
  @channel.queue_delete(@name, if_unused, if_empty)
end

- (Boolean) durable?

True if this queue was declared as durable (will survive broker restart).

Returns:

  • (Boolean)

    true if this queue was declared as durable (will survive broker restart).

See Also:



48
49
50
# File 'lib/march_hare/queue.rb', line 48

def durable?
  @durable
end

- (Boolean) exclusive?

True if this queue was declared as exclusive (limited to just one consumer)

Returns:

  • (Boolean)

    true if this queue was declared as exclusive (limited to just one consumer)

See Also:



54
55
56
# File 'lib/march_hare/queue.rb', line 54

def exclusive?
  @exclusive
end

- (Object) get(options = {:block => false}) Also known as: pop



144
145
146
147
148
149
150
151
152
# File 'lib/march_hare/queue.rb', line 144

def get(options = {:block => false})
  response = @channel.basic_get(@name, !options.fetch(:ack, false))

  if response
    [Headers.new(@channel, nil, response.envelope, response.props), String.from_java_bytes(response.body)]
  else
    nil
  end
end

- (Integer) message_count

How many messages the queue has ready (e.g. not delivered but not unacknowledged)

Returns:

  • (Integer)

    How many messages the queue has ready (e.g. not delivered but not unacknowledged)



200
201
202
203
# File 'lib/march_hare/queue.rb', line 200

def message_count
  response = @channel.queue_declare_passive(@name)
  response.message_count
end

- (Boolean) predefined?

True if this queue is a pre-defined one (amq.direct, amq.fanout, amq.match and so on)

Returns:

  • (Boolean)

    true if this queue is a pre-defined one (amq.direct, amq.fanout, amq.match and so on)



228
229
230
# File 'lib/march_hare/queue.rb', line 228

def predefined?
  @name.start_with?("amq.")
end

- (Object) publish(payload, opts = {})

Publishes a message to the queue via default exchange. Takes the same arguments as Exchange#publish



216
217
218
219
220
# File 'lib/march_hare/queue.rb', line 216

def publish(payload, opts = {})
  @channel.default_exchange.publish(payload, opts.merge(:routing_key => @name))

  self
end

- (Object) purge

Purges a queue (removes all messages from it)



140
141
142
# File 'lib/march_hare/queue.rb', line 140

def purge
  @channel.queue_purge(@name)
end

- (Boolean) server_named?

True if this queue was declared as server named.

Returns:

  • (Boolean)

    true if this queue was declared as server named.

See Also:



66
67
68
# File 'lib/march_hare/queue.rb', line 66

def server_named?
  @server_named
end

- (Array<Integer>) status

A pair with information about the number of queue messages and consumers

Returns:

  • (Array<Integer>)

    A pair with information about the number of queue messages and consumers

See Also:



194
195
196
197
# File 'lib/march_hare/queue.rb', line 194

def status
  response = @channel.queue_declare_passive(@name)
  [response.message_count, response.consumer_count]
end

- (Object) subscribe(opts = {})

Adds a consumer to the queue (subscribes for message deliveries).

Parameters:

  • opts (Hash) (defaults to: {})

    Options

Options Hash (opts):

  • :manual_ack (Boolean) — default: false

    Will this consumer use manual acknowledgements?

  • :exclusive (Boolean) — default: false

    Should this consumer be exclusive for this queue?

  • :block (Boolean) — default: false

    Should the call block calling thread?

  • :on_cancellation (#call)

    Block to execute when this consumer is cancelled remotely (e.g. via the RabbitMQ Management plugin)

  • :consumer_tag (String)

    Unique consumer identifier. It is usually recommended to let MarchHare generate it for you.

  • :arguments (Hash) — default: {}

    Additional (optional) arguments, typically used by RabbitMQ extensions

See Also:



176
177
178
# File 'lib/march_hare/queue.rb', line 176

def subscribe(opts = {}, &block)
  subscribe_with(build_consumer(opts, &block), opts)
end

- (Object) subscribe_with(consumer, opts = {})



180
181
182
183
184
185
186
187
188
189
# File 'lib/march_hare/queue.rb', line 180

def subscribe_with(consumer, opts = {})
  @consumer_tag     = @channel.basic_consume(@name, !(opts[:ack] || opts[:manual_ack]), consumer)
  consumer.consumer_tag = @consumer_tag

  @default_consumer = consumer
  @channel.register_consumer(@consumer_tag, consumer)

  consumer.start
  consumer
end

- (Object) unbind(exchange, options = {})

Unbinds queue from an exchange

Parameters:

  • exchange (MarchHare::Exchange, String)

    Exchange to unbind from

  • options (Hash) (defaults to: {})

    Binding properties

Options Hash (options):

  • :routing_key (String)

    Routing key

  • :arguments (Hash) — default: {}

    Additional optional binding arguments

See Also:



113
114
115
116
117
118
119
120
121
122
123
124
125
# File 'lib/march_hare/queue.rb', line 113

def unbind(exchange, options={})
  exchange_name = if exchange.respond_to?(:name) then
                    exchange.name
                  else
                    exchange.to_s
                  end
  @channel.queue_unbind(@name, exchange_name, options.fetch(:routing_key, ''))

  binding = { :exchange => exchange_name, :routing_key => (options[:routing_key] || options[:key] || ""), :arguments => options[:arguments] }
  @bindings.delete(binding) unless @bindings.include?(binding)

  self
end