Class: Kaboose::Task
Constants
| Name | Value |
|---|---|
| MAX_ERRORS | 3 |
Attributes
| Name | Read/write? |
|---|---|
| options | R |
Public Class Methods
from_message (message)
Creates a Task from a message
# File lib/kaboose/task.rb, line 39 39: def self.from_message(message) 40: action = message.delete :action 41: Task.new(action, message) 42: end
new (action, options = {})
# File lib/kaboose/task.rb, line 44 44: def initialize(action, options = {}) 45: @action = action 46: @options = options 47: end
publish (action, options={})
Puts actions in the Queue. Priority defaults to 5
# File lib/kaboose/task.rb, line 32 32: def self.publish(action, options={}) 33: priority = options[:priority] || 5 34: options.merge!({:action => action}) 35: Kaboose::Queue.enqueue(priority, options) 36: end
Public Instance Methods
enqueue_with_error (e)
Puts a failed Task back in the Queue with the lowest priority. If a task fails MAX_ERRORS times, it is put in Queue 0 which is never processed.
# File lib/kaboose/task.rb, line 13 13: def enqueue_with_error(e) 14: @options[:failures] = @options[:failures].to_i + 1 15: @options[:errors] ||= [] 16: 17: priority = (@options[:failures] < MAX_ERRORS) ? 10 : 0 18: Kaboose::Queue.enqueue(priority, @options.merge!({ 19: :action => @action, 20: :errors => @options[:errors] << {:class => e.class.to_s, :message => e.message} 21: })) 22: 23: Kaboose::Queue.log("Action '#{@action}' failed #{@options[:failures]} time(s).") 24: 25: # Send failure notification if a notification observer is defined 26: if priority == 0 && Kaboose::Queue.failure_notification_observer 27: Kaboose::Queue.failure_notification_observer.call(self, e) 28: end 29: end
process! ()
# File lib/kaboose/task.rb, line 7 7: def process! 8: processor.new.process!(self) 9: end
Protected Instance Methods
processor ()
Infers a Processor‘s name from the action
# File lib/kaboose/task.rb, line 51 51: def processor 52: Object.const_get(@action.to_s.camelize + 'Processor') 53: end