Class: MarchHare::BaseConsumer

Inherits:
DefaultConsumer
  • Object
show all
Defined in:
lib/march_hare/consumers/base.rb

Direct Known Subclasses

CallbackConsumer

Instance Attribute Summary (collapse)

Instance Method Summary (collapse)

Constructor Details

- (BaseConsumer) initialize(channel, queue, opts)

A new instance of BaseConsumer



10
11
12
13
14
15
16
17
18
19
20
21
# File 'lib/march_hare/consumers/base.rb', line 10

def initialize(channel, queue, opts)
  super(channel)
  @channel    = channel
  @queue      = queue
  @opts       = opts
  @auto_ack   = true

  @cancelling = JavaConcurrent::AtomicBoolean.new
  @cancelled  = JavaConcurrent::AtomicBoolean.new

  @terminated = JavaConcurrent::AtomicBoolean.new
end

Instance Attribute Details

- (Object) auto_ack

Returns the value of attribute auto_ack



8
9
10
# File 'lib/march_hare/consumers/base.rb', line 8

def auto_ack
  @auto_ack
end

- (Object) consumer_tag

Returns the value of attribute consumer_tag



7
8
9
# File 'lib/march_hare/consumers/base.rb', line 7

def consumer_tag
  @consumer_tag
end

Instance Method Details

- (Boolean) active?

Returns:

  • (Boolean)


75
76
77
# File 'lib/march_hare/consumers/base.rb', line 75

def active?
  !terminated?
end

- (Boolean) cancelled?

Returns:

  • (Boolean)


71
72
73
# File 'lib/march_hare/consumers/base.rb', line 71

def cancelled?
  @cancelling.get || @cancelled.get
end

- (Object) deliver(headers, message)

Raises:

  • (NotImplementedError)


67
68
69
# File 'lib/march_hare/consumers/base.rb', line 67

def deliver(headers, message)
  raise NotImplementedError, 'To be implemented by a subclass'
end

- (Object) gracefully_shut_down



63
64
65
# File 'lib/march_hare/consumers/base.rb', line 63

def gracefully_shut_down
  # no-op
end

- (Object) handleCancel(consumer_tag)



30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
# File 'lib/march_hare/consumers/base.rb', line 30

def handleCancel(consumer_tag)
  @cancelled.set(true)
  @channel.unregister_consumer(consumer_tag)

  if f = @opts[:on_cancellation]
    case f.arity
    when 0 then
      f.call
    when 1 then
      f.call(self)
    when 2 then
      f.call(@channel, self)
    when 3 then
      f.call(@channel, self, consumer_tag)
    else
      f.call(@channel, self, consumer_tag)
    end
  end

  @terminated.set(true)
end

- (Object) handleCancelOk(consumer_tag)



52
53
54
55
56
57
# File 'lib/march_hare/consumers/base.rb', line 52

def handleCancelOk(consumer_tag)
  @cancelled.set(true)
  @channel.unregister_consumer(consumer_tag)

  @terminated.set(true)
end

- (Object) handleDelivery(consumer_tag, envelope, properties, bytes)



23
24
25
26
27
28
# File 'lib/march_hare/consumers/base.rb', line 23

def handleDelivery(consumer_tag, envelope, properties, bytes)
  body    = String.from_java_bytes(bytes)
  headers = Headers.new(channel, consumer_tag, envelope, properties)

  deliver(headers, body)
end

- (Object) start



59
60
61
# File 'lib/march_hare/consumers/base.rb', line 59

def start
  # no-op
end

- (Boolean) terminated?

Returns:

  • (Boolean)


79
80
81
# File 'lib/march_hare/consumers/base.rb', line 79

def terminated?
  @terminated.get
end