class Irc::Bot::Journal::JournalBroker

Attributes

storage[R]

Public Class Methods

new(opts={}) click to toggle source
# File lib/rbot/journal.rb, line 294
# Set up the broker state and start the consumer thread.
#
# @param opts [Hash] broker options
# @option opts [#call] :consumer optional callable handed every consumed
#   message (see #consume)
# @option opts [Object] :storage optional storage backend; when it is
#   missing a warning is logged and messages are not persisted
def initialize(opts={})
  # optional callable that receives each consumed message
  @consumer = opts[:consumer]
  # backend used for persistence, may be nil
  @storage = opts[:storage]
  warning 'journal broker: no storage set up, won\'t persist messages' unless @storage

  @queue = Queue.new
  # The worker blocks on the queue and stops as soon as it pops a
  # nil/false entry (#shutdown pushes nil for exactly that purpose).
  @thread = Thread.new do
    while (entry = @queue.pop)
      begin
        consume entry
      rescue Exception => e
        # rescuing inside the loop keeps the worker running after a
        # failing consumer; the failure is only logged
        error 'journal broker: exception in consumer thread'
        error e
      end
    end
  end

  @subscriptions = []
  # subscriptions grouped by topic, used as a lookup table
  @topic_subs = {}
end

Public Instance Methods

consume(message) click to toggle source
# File lib/rbot/journal.rb, line 320
# Process a single message: hand it to the optional consumer, notify the
# subscribers of the message's topic, then insert it into the storage
# backend if one is configured.
#
# @param message [#topic] the message to process; nil/false is ignored
# @return [Object, nil] the result of the storage insert, or nil when
#   there is no storage or no message
def consume(message)
  return unless message
  @consumer.call(message) if @consumer

  # Notify subscribers from a snapshot of the list: a subscriber may
  # unsubscribe from inside its own block, and deleting from the array
  # that is being iterated would skip the subscriber that follows it.
  subscribers = @topic_subs[message.topic]
  subscribers.dup.each { |s| s.block.call(message) } if subscribers

  @storage.insert(message) if @storage
end
count(query=nil) click to toggle source
# File lib/rbot/journal.rb, line 407
# Ask the storage backend for a count using the given query.
#
# @param query [Query, Object, nil] a Query instance, or anything else,
#   which is first passed through Query.define
# @return [Object] whatever the storage backend's count returns
def count(query=nil)
  query = Query.define(query) unless query.is_a? Query
  @storage.count(query)
end
ensure_payload_index(key) click to toggle source
# File lib/rbot/journal.rb, line 421
# Forward an index request for a payload key to the storage backend.
#
# @param key [Object] handed unchanged to the backend
# @return [Object] whatever the backend's ensure_payload_index returns
def ensure_payload_index(key)
  backend = @storage
  backend.ensure_payload_index(key)
end
find(query, limit=100, offset=0, &block) click to toggle source

Find and return persisted messages by a query.

This method will either return all messages or call the provided block for each message. It will filter the messages by the provided Query instance. Limit and offset might be used to constrain the result. The query might also be a hash or proc that is passed to Query.define first.

@param query [Query] the query used to filter messages
@param limit [Integer] how many items to return
@param offset [Integer] relative offset in results

# File lib/rbot/journal.rb, line 396
# Look up persisted messages through the storage backend.
#
# @param query [Query, Object] a Query instance, or anything else, which
#   is first passed through Query.define
# @param limit [Integer] how many items to return
# @param offset [Integer] relative offset in results
# @return [Object] whatever the storage backend's find returns
def find(query, limit=100, offset=0, &block)
  query = Query.define(query) unless query.is_a? Query
  # Forwarding &block covers both cases: when the caller gave no block,
  # block is nil and the backend is called without one.
  @storage.find(query, limit, offset, &block)
end
persists?() click to toggle source
# File lib/rbot/journal.rb, line 334
# Tell whether a storage backend is configured.
#
# @return [Boolean] true when a storage backend is set, false otherwise
#   (a strict boolean rather than nil for the negative case)
def persists?
  @storage ? true : false
end
publish(topic, payload) click to toggle source
# File lib/rbot/journal.rb, line 347
# Queue a new message for the consumer thread.
#
# The message is built with JournalMessage.create and appended to the
# internal queue; it is not consumed by this call itself.
#
# @param topic [Object] topic handed to JournalMessage.create
# @param payload [Object] payload handed to JournalMessage.create
# @return [nil]
def publish(topic, payload)
  debug format('journal publish message in %s: %s', topic, payload.inspect)
  message = JournalMessage.create(topic, payload)
  @queue.push(message)
  nil
end
remove(query=nil) click to toggle source
# File lib/rbot/journal.rb, line 414
# Pass a removal request with the given query to the storage backend.
#
# @param query [Query, Object, nil] a Query instance, or anything else,
#   which is first passed through Query.define
# @return [Object] whatever the storage backend's remove returns
def remove(query=nil)
  query = Query.define(query) unless query.is_a? Query
  @storage.remove(query)
end
shutdown() click to toggle source
# File lib/rbot/journal.rb, line 338
# Stop the broker: drop all subscriptions, then ask the consumer thread
# to finish and wait for it.
#
# Calling this again after the thread has been stopped is harmless; the
# thread handling is skipped because there is nothing left to join.
#
# @return [nil]
def shutdown
  log 'journal shutdown'
  @subscriptions.clear
  @topic_subs.clear
  # already shut down: no thread to signal or join
  return unless @thread

  # a nil entry ends the consumer thread's pop loop
  @queue << nil
  @thread.join
  @thread = nil
end
subscribe(topic=nil, &block) click to toggle source

Subscribe to receive messages from a topic.

You can use this method to subscribe to messages that are published within a specified topic. You must provide a receiving block to receive messages one-by-one. The method returns an instance of Subscription that can be used to cancel the subscription by invoking cancel on it.

journal.subscribe('irclog') do |message|
  # received irclog messages...
end
# File lib/rbot/journal.rb, line 366
# Register a block to be called for messages of a topic.
#
# @param topic [Object, nil] the topic the subscription is filed under
# @yield [message] invoked by #consume for messages with this topic
# @return [Subscription] the new subscription; pass it to #unsubscribe
#   to remove it again
# @raise [ArgumentError] when no block is given
def subscribe(topic=nil, &block)
  raise ArgumentError.new unless block_given?
  subscription = Subscription.new(self, topic, block)
  @subscriptions.push(subscription)
  # create the per-topic list on first use
  (@topic_subs[topic] ||= []).push(subscription)
  subscription
end
unsubscribe(s) click to toggle source
# File lib/rbot/journal.rb, line 377
# Remove a subscription from the per-topic table and from the list of
# all subscriptions.
#
# @param s [#topic] the subscription to remove
# @return [Object, nil] the removed subscription, or nil when it was not
#   in the list of subscriptions
def unsubscribe(s)
  topic_list = @topic_subs[s.topic]
  topic_list.delete(s) if topic_list
  @subscriptions.delete(s)
end