Class: MarchHare::Session

Inherits:
Object
Defined in:
lib/march_hare/session.rb

Overview

Connection to a RabbitMQ node.

Used to open and close connections and open (create) new channels.

Constant Summary

DEFAULT_NETWORK_RECOVERY_INTERVAL =

Default reconnection interval for TCP connection failures, in seconds

5.0

Dynamic Method Handling

This class handles dynamic methods through the method_missing method, which forwards any call it does not define itself to the underlying connection object.

- (Object) method_missing(selector, *args)



# File 'lib/march_hare/session.rb', line 254

def method_missing(selector, *args)
  @connection.__send__(selector, *args)
end
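
The forwarding above can be tried without a broker. The following is a minimal sketch, assuming a stand-in connection object: `FakeConnection`, `SessionSketch`, and `server_properties` are hypothetical names used only for illustration, and `respond_to_missing?` is an addition that the original source does not show.

```ruby
# Hypothetical stand-in for the wrapped connection (illustration only).
class FakeConnection
  def server_properties
    { "product" => "RabbitMQ" }
  end
end

# Hypothetical wrapper that mirrors the method_missing forwarding shown above.
class SessionSketch
  def initialize(connection)
    @connection = connection
  end

  # Forward anything this class does not define to the wrapped object.
  def method_missing(selector, *args)
    @connection.__send__(selector, *args)
  end

  # Not part of the original source: keeps respond_to? consistent
  # with the forwarding performed by method_missing.
  def respond_to_missing?(selector, include_private = false)
    @connection.respond_to?(selector, include_private) || super
  end
end

session = SessionSketch.new(FakeConnection.new)
puts session.server_properties["product"]
```

A call the wrapped object does not understand still raises NoMethodError, because the wrapped object raises it when the call is forwarded.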

Instance Attribute Details

- (Array<MarchHare::Channel>) channels (readonly)

Channels opened on this connection

Returns:

  • (Array<MarchHare::Channel>)



# File 'lib/march_hare/session.rb', line 73

def channels
  @channels
end

Class Method Details

+ (Object) connect(options = {})

Connects to a RabbitMQ node.

Parameters:

  • options (Hash) (defaults to: {})

    Connection options

Options Hash (options):

  • :host (String) — default: "127.0.0.1"

    Hostname or IP address to connect to

  • :port (Integer) — default: 5672

    Port RabbitMQ listens on

  • :username (String) — default: "guest"

    Username

  • :password (String) — default: "guest"

    Password

  • :vhost (String) — default: "/"

    Virtual host to use

  • :heartbeat (Integer) — default: 600

    Heartbeat interval. 0 means no heartbeat.

  • :tls (Boolean) — default: false

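    Set to true to use a TLS/SSL connection. This will switch port to 5671 by default. As the source shows, :ssl is accepted as an alias, and a String value is passed to use_ssl_protocol as the protocol name.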



# File 'lib/march_hare/session.rb', line 42

def self.connect(options={})
  cf = ConnectionFactory.new

  cf.uri                = options[:uri]          if options[:uri]
  cf.host               = hostname_from(options) if include_host?(options)
  cf.port               = options[:port].to_i    if options[:port]
  cf.virtual_host       = vhost_from(options)    if include_vhost?(options)
  cf.connection_timeout = timeout_from(options)  if include_timeout?(options)
  cf.username           = username_from(options) if include_username?(options)
  cf.password           = password_from(options) if include_password?(options)

  cf.requested_heartbeat = heartbeat_from(options)          if include_heartbeat?(options)
  cf.connection_timeout  = connection_timeout_from(options) if include_connection_timeout?(options)

  tls = (options[:ssl] || options[:tls])
  case tls
  when true then
    cf.use_ssl_protocol
  when String then
    if options[:trust_manager]
      cf.use_ssl_protocol(tls, options[:trust_manager])
    else
      cf.use_ssl_protocol(tls)
    end
  end


  new(cf, options)
end
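
The option handling above follows one pattern: a factory setting is only touched when the caller supplied the matching key, and the TLS value is dispatched on its type. A minimal sketch of that pattern, assuming a stand-in factory: `FactorySketch` and `configure` are hypothetical names, and the `"TLS"` default protocol is a placeholder rather than the Java client's actual default.

```ruby
# Hypothetical stand-in for the Java ConnectionFactory; it only records values.
FactorySketch = Struct.new(:host, :port, :ssl_protocol) do
  # "TLS" is a placeholder default chosen for this sketch.
  def use_ssl_protocol(protocol = "TLS")
    self.ssl_protocol = protocol
  end
end

# Hypothetical helper mirroring the conditional assignments in connect.
def configure(options = {})
  cf = FactorySketch.new

  # Only override a setting when the caller passed the option.
  cf.host = options[:host]      if options[:host]
  cf.port = options[:port].to_i if options[:port]

  # :ssl and :tls are interchangeable; true and String are handled differently.
  tls = options[:ssl] || options[:tls]
  case tls
  when true   then cf.use_ssl_protocol
  when String then cf.use_ssl_protocol(tls)
  end

  cf
end

cf = configure(:host => "127.0.0.1", :port => "5671", :tls => "TLSv1.2")
puts cf.inspect
```

Keys that are absent leave the factory's own defaults in place, which is why every assignment in connect carries an `if` guard.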

Instance Method Details

- (Object) automatically_recover

Begins automatic connection recovery (typically only used internally to recover from network failures)



# File 'lib/march_hare/session.rb', line 190

def automatically_recover
  ms = @network_recovery_interval * 1000
  # recovering immediately makes little sense. Wait a bit first. MK.
  java.lang.Thread.sleep(ms)

  @connection = converting_rjc_exceptions_to_ruby do
    reconnecting_on_network_failures(ms) do
      self.new_connection
    end
  end
  @thread_pool = ThreadPools.dynamically_growing
  self.recover_shutdown_hooks

  # sorting channels by id means that the cases like the following:
  #
  # ch1 = conn.create_channel
  # ch2 = conn.create_channel
  #
  # x   = ch1.topic("logs", :durable => false)
  # q   = ch2.queue("", :exclusive => true)
  #
  # q.bind(x)
  #
  # will recover correctly because exchanges and queues will be recovered
  # in the order the user expects and before bindings.
  @channels.sort_by {|id, _| id}.each do |id, ch|
    begin
      ch.automatically_recover(self, @connection)
    rescue Exception, java.io.IOException => e
      # TODO: logging
      $stderr.puts e
    end
  end
end
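
The recovery loop above does two things worth isolating: it visits channels in ascending id order, and it rescues per channel so one failure does not stop the rest. A minimal sketch of that loop, assuming channels are kept in a Hash keyed by id: `ChannelSketch` and `recover_all` are hypothetical names, and the sketch rescues StandardError instead of the broader classes used in the source.

```ruby
# Hypothetical stand-in for a channel; `broken` simulates a failed recovery.
ChannelSketch = Struct.new(:id, :broken) do
  def automatically_recover
    raise IOError, "channel #{id} could not be recovered" if broken
    id
  end
end

# Hypothetical helper: recover channels in id order, isolating failures.
def recover_all(channels)
  recovered = []
  channels.sort_by { |id, _| id }.each do |_, ch|
    begin
      recovered << ch.automatically_recover
    rescue StandardError => e
      # Report and continue with the next channel.
      warn e.message
    end
  end
  recovered
end

channels = {
  3 => ChannelSketch.new(3, false),
  1 => ChannelSketch.new(1, false),
  2 => ChannelSketch.new(2, true)
}
puts recover_all(channels).inspect
```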

- (Object) clear_blocked_connection_callbacks

Clears all callbacks defined with #on_blocked and #on_unblocked.



# File 'lib/march_hare/session.rb', line 167

def clear_blocked_connection_callbacks
  @connection.clear_blocked_listeners
end

- (Object) close

Closes connection gracefully.

This includes shutting down the consumer work pool gracefully, waiting up to 5 seconds for all consumer deliveries to be processed.



# File 'lib/march_hare/session.rb', line 125

def close
  @channels.select { |_, ch| ch.open? }.each do |_, ch|
    ch.close
  end

  @connection.close
end
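
The ordering in close matters: channels that are still open are closed first, and the connection goes last. A minimal sketch of that ordering, assuming stand-in objects: `ClosableSketch` and `close_all` are hypothetical names, and the shared log exists only to make the order visible.

```ruby
# Hypothetical stand-in for a channel or connection; logs its name when closed.
class ClosableSketch
  def initialize(name, log, open = true)
    @name = name
    @log  = log
    @open = open
  end

  def open?
    @open
  end

  def close
    @open = false
    @log << @name
  end
end

# Hypothetical helper mirroring the body of close.
def close_all(channels, connection)
  # Skip channels that are already closed, then close the connection itself.
  channels.select { |_, ch| ch.open? }.each do |_, ch|
    ch.close
  end
  connection.close
end

log = []
channels = {
  1 => ClosableSketch.new("ch1", log),
  2 => ClosableSketch.new("ch2", log, false)
}
close_all(channels, ClosableSketch.new("conn", log))
puts log.inspect
```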

- (Boolean) closed?

True if this connection is closed

Returns:

  • (Boolean)

    true if this connection is closed



# File 'lib/march_hare/session.rb', line 140

def closed?
  !@connection.open?
end

- (MarchHare::Channel) create_channel(n = nil)

Opens a new channel.

Parameters:

  • n (Integer) (defaults to: nil)

    Channel number. Pass nil to let MarchHare allocate an available number in a safe way.

Returns:

  • (MarchHare::Channel)



# File 'lib/march_hare/session.rb', line 107

def create_channel(n = nil)
  jc = if n
         @connection.create_channel(n)
       else
         @connection.create_channel
       end

  ch = Channel.new(self, jc)
  register_channel(ch)

  ch
end
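
The two call styles, an explicit number or nil, can be illustrated with a toy registry. This is only a sketch: in the real class the number is allocated by the underlying Java connection, so `RegistrySketch`, its lowest-free-number policy, and the placeholder channel strings are all assumptions made for illustration.

```ruby
# Hypothetical registry; not how MarchHare or the Java client allocates numbers.
class RegistrySketch
  attr_reader :channels

  def initialize
    @channels = {}
  end

  # n: explicit channel number, or nil to pick the lowest unused one.
  def create_channel(n = nil)
    id = n || next_free_id
    raise ArgumentError, "channel #{id} is already in use" if @channels.key?(id)

    @channels[id] = "channel-#{id}" # placeholder for a real channel object
    id
  end

  private

  # Assumed policy for this sketch: lowest positive number not yet taken.
  def next_free_id
    id = 1
    id += 1 while @channels.key?(id)
    id
  end
end

registry = RegistrySketch.new
registry.create_channel      # allocated automatically
registry.create_channel(3)   # explicit number
puts registry.channels.keys.inspect
```

Passing nil avoids collisions between callers that would otherwise have to coordinate channel numbers themselves.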

- (Object) flush

Flushes the socket used by this connection.



# File 'lib/march_hare/session.rb', line 233

def flush
  @connection.flush
end

- (Object) on_blocked

Defines a connection.blocked handler



# File 'lib/march_hare/session.rb', line 157

def on_blocked(&block)
  self.add_blocked_listener(BlockBlockedUnblockedListener.for_blocked(block))
end

- (Object) on_shutdown

Defines a shutdown event callback. Shutdown events are broadcast when a connection is closed, either explicitly or forcefully, or due to a network/peer failure.



# File 'lib/march_hare/session.rb', line 147

def on_shutdown(&block)
  sh = ShutdownListener.new(self, &block)
  @shutdown_hooks << sh

  @connection.add_shutdown_listener(sh)

  sh
end
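
The registration above keeps each hook in a list, so hooks can be re-attached after recovery, and returns the hook to the caller. A minimal sketch of that bookkeeping, assuming the block receives the connection and a reason: `ShutdownSketch` and `simulate_shutdown` are hypothetical, and in the real class the Java client invokes the listeners rather than any Ruby-side trigger.

```ruby
# Hypothetical class mirroring the hook bookkeeping in on_shutdown.
class ShutdownSketch
  def initialize
    @shutdown_hooks = []
  end

  # Stores the block and returns it, like the `sh` return value above.
  def on_shutdown(&block)
    @shutdown_hooks << block
    block
  end

  # Hypothetical trigger used only in this sketch to fire the hooks.
  # The (connection, reason) argument pair is an assumption.
  def simulate_shutdown(reason)
    @shutdown_hooks.each { |hook| hook.call(self, reason) }
  end
end

conn = ShutdownSketch.new
conn.on_shutdown { |_connection, reason| warn "connection closed: #{reason}" }
conn.simulate_shutdown("peer failure")
```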

- (Object) on_unblocked

Defines a connection.unblocked handler



# File 'lib/march_hare/session.rb', line 162

def on_unblocked(&block)
  self.add_blocked_listener(BlockBlockedUnblockedListener.for_unblocked(block))
end
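
Both on_blocked and on_unblocked wrap a single block in a listener that answers two events, with the unused event doing nothing. A minimal sketch of such a listener, assuming the two-method shape of the Java client's BlockedListener interface (handleBlocked with a reason, handleUnblocked without arguments): `BlockedListenerSketch` is a hypothetical name, not the BlockBlockedUnblockedListener used in the source.

```ruby
# Hypothetical listener: one block handles one event, the other is a no-op.
class BlockedListenerSketch
  NOOP = proc { |*| }

  def self.for_blocked(block)
    new(block, NOOP)
  end

  def self.for_unblocked(block)
    new(NOOP, block)
  end

  def initialize(on_blocked, on_unblocked)
    @on_blocked   = on_blocked
    @on_unblocked = on_unblocked
  end

  # Called when the broker blocks the connection; reason is a String.
  def handle_blocked(reason)
    @on_blocked.call(reason)
  end

  # Called when the broker unblocks the connection.
  def handle_unblocked
    @on_unblocked.call
  end
end

listener = BlockedListenerSketch.for_blocked(proc { |reason| warn "blocked: #{reason}" })
listener.handle_blocked("low on memory")
listener.handle_unblocked # ignored by this listener
```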

- (Boolean) open? Also known as: connected?

True if connection is open, false otherwise

Returns:

  • (Boolean)

    true if connection is open, false otherwise



# File 'lib/march_hare/session.rb', line 134

def open?
  @connection.open?
end

- (Object) start

No-op, exists for better API compatibility with Bunny.



# File 'lib/march_hare/session.rb', line 243

def start
  # no-op
  #
  # This method mimics Bunny::Session#start in Bunny 0.9.
  # Without it, #method_missing will proxy the call to com.rabbitmq.client.AMQConnection,
  # which happens to have a #start method which is not idempotent.
  #
  # So we stub out #start in case someone migrating from Bunny forgets to remove
  # the call to #start. MK.
end
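
The reason for the stub is method lookup order: an explicitly defined method is found before method_missing is consulted, so the call never reaches the wrapped object. A minimal sketch of that effect, assuming a stand-in connection whose start may only run once: `NonIdempotentConnection` and `StubbedSession` are hypothetical names.

```ruby
# Hypothetical stand-in for a connection whose #start is not idempotent.
class NonIdempotentConnection
  attr_reader :starts

  def initialize
    @starts = 0
  end

  def start
    @starts += 1
    raise "already started" if @starts > 1
  end
end

# Hypothetical wrapper: #start is stubbed, everything else is forwarded.
class StubbedSession
  def initialize(connection)
    @connection = connection
  end

  # No-op. Defined explicitly so the call is never forwarded.
  def start
  end

  def method_missing(selector, *args)
    @connection.__send__(selector, *args)
  end
end

connection = NonIdempotentConnection.new
session = StubbedSession.new(connection)
session.start
session.start            # safe: the wrapped #start is never reached
puts connection.starts   # forwarded through method_missing
```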

- (String) to_s

Returns:

  • (String)


# File 'lib/march_hare/session.rb', line 259

def to_s
  "#<#{self.class.name}:#{object_id} #{@cf.username}@#{@cf.host}:#{@cf.port}, vhost=#{@cf.virtual_host}>"
end