Class: MarchHare::Exchange

Inherits:

  • Object
Defined in:
lib/march_hare/exchange.rb

Overview

Represents AMQP 0.9.1 exchanges.

Constructor Details

- (Exchange) initialize(channel, name, options = {})

Instantiates a new exchange.

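Parameters:

  • channel (Channel)

    Channel to declare the exchange on

  • name (String)

    Exchange name

  • options (Hash) (defaults to: {})

    Exchange attributes. :type is required; :durable, :auto_delete, :internal and :passive default to false

Raises:

  • (ArgumentError)

    if channel, name, or options[:type] is nil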

# File 'lib/march_hare/exchange.rb', line 38

def initialize(channel, name, options = {})
  raise ArgumentError, "exchange channel cannot be nil" if channel.nil?
  raise ArgumentError, "exchange name cannot be nil" if name.nil?
  raise ArgumentError, "exchange :type must be specified as an option" if options[:type].nil?

  @channel = channel
  @name    = name
  @type    = options[:type]
  @options = {:type => :fanout, :durable => false, :auto_delete => false, :internal => false, :passive => false}.merge(options)
end
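
The validation and option merging above can be tried without a broker. The following is a minimal sketch, not MarchHare code: `ExchangeSketch` and `FakeChannel` are hypothetical names, and the class only mirrors the constructor listing.

```ruby
# Hypothetical stand-in that mirrors the constructor listing above.
# It is not the MarchHare class and never talks to a broker.
class ExchangeSketch
  DEFAULTS = {:type => :fanout, :durable => false, :auto_delete => false,
              :internal => false, :passive => false}.freeze

  attr_reader :channel, :name, :type, :options

  def initialize(channel, name, options = {})
    raise ArgumentError, "exchange channel cannot be nil" if channel.nil?
    raise ArgumentError, "exchange name cannot be nil" if name.nil?
    raise ArgumentError, "exchange :type must be specified as an option" if options[:type].nil?

    @channel = channel
    @name    = name
    @type    = options[:type]
    # Caller-supplied keys win over the defaults.
    @options = DEFAULTS.merge(options)
  end
end

FakeChannel = Struct.new(:id) # placeholder for a real channel object

x = ExchangeSketch.new(FakeChannel.new(1), "events", :type => :topic, :durable => true)
x.type                  # the caller's :type
x.options[:durable]     # overridden by the caller
x.options[:auto_delete] # taken from the defaults

# Omitting :type is rejected before any state is set.
begin
  ExchangeSketch.new(FakeChannel.new(1), "events")
rescue ArgumentError => e
  missing_type_error = e.message
end
```

Because :type is mandatory, the :fanout entry in the defaults hash is always overwritten by the caller's value.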

Instance Attribute Details

- (MarchHare::Channel) channel (readonly)

Channel this exchange object uses

Returns:

  • (MarchHare::Channel)

    Channel this exchange object uses



# File 'lib/march_hare/exchange.rb', line 15

def channel
  @channel
end

- (String) name (readonly)

Exchange name

Returns:

  • (String)

    Exchange name



# File 'lib/march_hare/exchange.rb', line 13

def name
  @name
end

- (Symbol) type (readonly)

Type of this exchange (one of: :direct, :fanout, :topic, :headers).

Returns:

  • (Symbol)


# File 'lib/march_hare/exchange.rb', line 19

def type
  @type
end

Instance Method Details

- (MarchHare::Exchange) bind(exchange, options = {})

Binds an exchange to another (source) exchange using the exchange.bind AMQP 0.9.1 extension that RabbitMQ provides.

Parameters:

  • exchange (MarchHare::Exchange, String)

    Source exchange, or its name

  • options (Hash) (defaults to: {})

    Options

Options Hash (options):

  • :routing_key (String) (defaults to: '')

    Routing key used for the binding

Returns:

  • (MarchHare::Exchange)



# File 'lib/march_hare/exchange.rb', line 110

def bind(exchange, options={})
  exchange_name = if exchange.respond_to?(:name) then exchange.name else exchange.to_s end
  @channel.exchange_bind(@name, exchange_name, options.fetch(:routing_key, ''))
end
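
The first line of the listing accepts either an exchange object or a plain name. A minimal sketch of that resolution step follows; `resolve_exchange_name` and `SourceExchange` are hypothetical names introduced only for illustration.

```ruby
# Hypothetical helper that isolates the name-resolution step from #bind.
def resolve_exchange_name(exchange)
  exchange.respond_to?(:name) ? exchange.name : exchange.to_s
end

SourceExchange = Struct.new(:name) # stand-in for an exchange object

from_object = resolve_exchange_name(SourceExchange.new("logs"))
from_string = resolve_exchange_name("logs")

# The routing key falls back to an empty string when the option is absent.
routing_key = {}.fetch(:routing_key, '')
```

Both calls resolve to the same name, so callers can pass whichever form they already hold.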

- (Object) delete(options = {})

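Deregisters the exchange from its channel, then deletes it on the broker unless it is predefined.

Parameters:

  • options (Hash) (defaults to: {})

    Options

Options Hash (options):

  • :if_unused (Boolean) (defaults to: false)

    Delete the exchange only if it is not in use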



# File 'lib/march_hare/exchange.rb', line 91

def delete(options={})
  @channel.deregister_exchange(self)
  @channel.exchange_delete(@name, options.fetch(:if_unused, false)) unless predefined?
end
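
To make the two-step behaviour visible, the sketch below replays the listing against a recording channel. `RecordingChannel` and `DeletableSketch` are hypothetical stand-ins; only the bodies of `delete` and `predefined?` are taken from the listings on this page.

```ruby
# Hypothetical recording channel: it only logs the calls that #delete makes.
class RecordingChannel
  attr_reader :calls

  def initialize
    @calls = []
  end

  def deregister_exchange(exchange)
    @calls << [:deregister_exchange, exchange.name]
  end

  def exchange_delete(name, if_unused)
    @calls << [:exchange_delete, name, if_unused]
  end
end

# Stand-in exchange that copies #delete and #predefined? from this page.
class DeletableSketch
  attr_reader :name

  def initialize(channel, name)
    @channel = channel
    @name    = name
  end

  def predefined?
    @name.empty? || @name.start_with?("amq.")
  end

  def delete(options = {})
    @channel.deregister_exchange(self)
    @channel.exchange_delete(@name, options.fetch(:if_unused, false)) unless predefined?
  end
end

custom = RecordingChannel.new
DeletableSketch.new(custom, "events").delete(:if_unused => true)

builtin = RecordingChannel.new
DeletableSketch.new(builtin, "amq.fanout").delete
```

In this sketch `custom.calls` holds a deregistration followed by a delete, while `builtin.calls` holds the deregistration only.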

- (Boolean) predefined?

True if this exchange is predefined: either the default exchange (empty name) or one whose name starts with "amq." (amq.direct, amq.fanout, amq.match and so on).

Returns:

  • (Boolean)

    true if this exchange is predefined



# File 'lib/march_hare/exchange.rb', line 135

def predefined?
  @name.empty? || @name.start_with?("amq.")
end
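
The predicate is a plain string check, so it can be evaluated in isolation. The sketch below applies the same expression to a few sample names; the names themselves are illustrative.

```ruby
# Same expression as the listing above, as a free-standing lambda.
predefined = ->(name) { name.empty? || name.start_with?("amq.") }

names   = ["", "amq.direct", "amq.match", "amqp-events", "events"]
results = names.map { |name| [name, predefined.call(name)] }.to_h
```

The empty name (the default exchange) and the "amq." names count as predefined; "amqp-events" does not, because the prefix check includes the dot.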

- (MarchHare::Exchange) publish(body, opts = {})

Publishes a message.

Parameters:

  • body (String)

    Message payload. It will never be modified by MarchHare or RabbitMQ in any way.

  • opts (Hash) (defaults to: {})

    Message properties (metadata) and delivery settings

Options Hash (opts):

  • :routing_key (String)

    Routing key

  • :persistent (Boolean)

    Should the message be persisted to disk?

  • :mandatory (Boolean)

    Should the message be returned if it cannot be routed to any queue?

  • :properties (Hash)

    Message and delivery properties

    • :timestamp (Integer) A timestamp associated with this message
    • :expiration (Integer) Expiration time after which the message will be deleted
    • :type (String) Message type, e.g. what type of event or command this message represents. Can be any string
    • :reply_to (String) Queue name other apps should send the response to
    • :content_type (String) Message content type (e.g. application/json)
    • :content_encoding (String) Message content encoding (e.g. gzip)
    • :correlation_id (String) Message correlated to this one, e.g. what request this message is a reply for
    • :priority (Integer) Message priority, 0 to 9. Not used by RabbitMQ, only applications
    • :message_id (String) Any message identifier
    • :user_id (String) Optional user ID. Verified by RabbitMQ against the actual connection username
    • :app_id (String) Optional application ID

Returns:

  • (MarchHare::Exchange)



# File 'lib/march_hare/exchange.rb', line 74

def publish(body, opts = {})
  options = {:routing_key => '', :mandatory => false}.merge(opts)
  @channel.basic_publish(@name,
                         options[:routing_key],
                         options[:mandatory],
                         options.fetch(:properties, Hash.new),
                         body.to_java_bytes)
end
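
The option handling in the listing can be observed with a recording channel in place of a live one. This is a sketch under stated assumptions: `PublishRecorder` and `publish_sketch` are hypothetical, and the body is passed through unchanged because `to_java_bytes` is only available on JRuby.

```ruby
# Hypothetical recording channel standing in for MarchHare::Channel.
class PublishRecorder
  attr_reader :published

  def initialize
    @published = []
  end

  def basic_publish(exchange, routing_key, mandatory, properties, body)
    @published << {:exchange => exchange, :routing_key => routing_key,
                   :mandatory => mandatory, :properties => properties, :body => body}
  end
end

# Mirrors the option handling in #publish. The real method passes
# body.to_java_bytes (JRuby only); this sketch passes the string as is.
def publish_sketch(channel, exchange_name, body, opts = {})
  options = {:routing_key => '', :mandatory => false}.merge(opts)
  channel.basic_publish(exchange_name,
                        options[:routing_key],
                        options[:mandatory],
                        options.fetch(:properties, Hash.new),
                        body)
end

recorder = PublishRecorder.new
publish_sketch(recorder, "events", "hello")
publish_sketch(recorder, "events", "{}",
               :routing_key => "orders.created",
               :properties  => {:content_type => "application/json"})

first, second = recorder.published
```

The first call shows the defaults (empty routing key, mandatory off, empty properties hash); the second shows caller-supplied values replacing them.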

- (MarchHare::Exchange) unbind(exchange, opts = {})

Unbinds an exchange from another (source) exchange using the exchange.unbind AMQP 0.9.1 extension that RabbitMQ provides.

Parameters:

  • exchange (MarchHare::Exchange, String)

    Source exchange, or its name

  • opts (Hash) (defaults to: {})

    Options

Options Hash (opts):

  • :routing_key (String) (defaults to: '')

    Routing key used for binding

  • :arguments (Hash) (defaults to: nil)

    Optional arguments

Returns:

  • (MarchHare::Exchange)

    Self



# File 'lib/march_hare/exchange.rb', line 129

def unbind(exchange, opts = {})
  exchange_name = if exchange.respond_to?(:name) then exchange.name else exchange.to_s end
  @channel.exchange_unbind(@name, exchange_name, opts.fetch(:routing_key, ''), opts[:arguments])
end

- (Object) wait_for_confirms

Waits until all outstanding publisher confirms on the channel arrive.

This is a convenience method that delegates to Channel#wait_for_confirms.



# File 'lib/march_hare/exchange.rb', line 145

def wait_for_confirms
  @channel.wait_for_confirms
end