diff --git a/Gemfile.lock b/Gemfile.lock index 4ccf9a4..b80c140 100644 --- a/Gemfile.lock +++ b/Gemfile.lock @@ -1,15 +1,16 @@ PATH remote: . specs: - hystrix-ruby (0.0.1) + hystrix-ruby (0.1.0) celluloid (>= 0.13.0) GEM remote: https://rubygems.org/ specs: - celluloid (0.15.2) - timers (~> 1.1.0) + celluloid (0.16.0) + timers (~> 4.0.0) diff-lcs (1.2.4) + hitimes (1.2.2) multi_json (1.8.2) rspec (2.14.1) rspec-core (~> 2.14.0) @@ -26,7 +27,8 @@ GEM simplecov-rcov (0.2.3) simplecov (>= 0.4.1) timecop (0.6.3) - timers (1.1.0) + timers (4.0.1) + hitimes PLATFORMS ruby diff --git a/hystrix-ruby.gemspec b/hystrix-ruby.gemspec index 75a39d9..9bddd8d 100644 --- a/hystrix-ruby.gemspec +++ b/hystrix-ruby.gemspec @@ -1,6 +1,6 @@ Gem::Specification.new do |s| s.name = %q{hystrix-ruby} - s.version = "0.0.1" + s.version = "0.1.2" s.authors = ["Keith Thornhill"] s.date = %q{2013-04-08} s.description = %q{Hystrix for Ruby} diff --git a/lib/hystrix/circuit.rb b/lib/hystrix/circuit.rb index a9b7b66..e0c39c0 100644 --- a/lib/hystrix/circuit.rb +++ b/lib/hystrix/circuit.rb @@ -3,13 +3,14 @@ module Hystrix class Circuit include Celluloid - attr_accessor :lock, :health, :recent_latency_errors, :last_health_check_time + attr_accessor :lock, :health, :recent_latency_errors, :last_health_check_time, :command_pool - def initialize + def initialize(command_pool) self.lock = Mutex.new self.recent_latency_errors = [] self.health = 0 self.last_health_check_time = nil + self.command_pool = command_pool end def is_closed? diff --git a/lib/hystrix/command.rb b/lib/hystrix/command.rb index db24f1c..28140af 100644 --- a/lib/hystrix/command.rb +++ b/lib/hystrix/command.rb @@ -34,20 +34,15 @@ def execute result = executor.run(self) duration = Time.now - start_time - Configuration.notify_success(executor_pool_name, duration) + Configuration.notify_success({command_name: self.class.name, executor_pool_name: executor_pool_name, duration: duration}) rescue Exception => main_error duration = Time.now - start_time begin - if main_error.respond_to?(:cause) - result = fallback(main_error.cause) - Configuration.notify_fallback(executor_pool_name, duration, main_error.cause) - else - result = fallback(main_error) - Configuration.notify_fallback(executor_pool_name, duration, main_error) - end + result = fallback(main_error) + Configuration.notify_fallback({command_name: self.class.name, executor_pool_name: executor_pool_name, duration: duration, error: main_error}) rescue NotImplementedError => fallback_error - Configuration.notify_failure(executor_pool_name, duration, main_error) + Configuration.notify_failure({command_name: self.class.name, executor_pool_name: executor_pool_name, duration: duration, error: main_error, fallback_error: fallback_error}) raise main_error end ensure diff --git a/lib/hystrix/configuration.rb b/lib/hystrix/configuration.rb index 5c13e30..e70f081 100644 --- a/lib/hystrix/configuration.rb +++ b/lib/hystrix/configuration.rb @@ -3,27 +3,27 @@ class Configuration def self.on_success(&block) @on_success = block end - def self.notify_success(command_name, duration) + def self.notify_success(params) if @on_success - @on_success.call(command_name, duration) + @on_success.call(params) end end def self.on_fallback(&block) @on_fallback = block end - def self.notify_fallback(command_name, duration, error) + def self.notify_fallback(params) if @on_fallback - @on_fallback.call(command_name, duration, error) + @on_fallback.call(params) end end def self.on_failure(&block) @on_failure = block end - def self.notify_failure(command_name, duration, error) + def self.notify_failure(params) if @on_failure - @on_failure.call(command_name, duration, error) + @on_failure.call(params) end end diff --git a/lib/hystrix/executor_pool.rb b/lib/hystrix/executor_pool.rb index e650ded..9705272 100644 --- a/lib/hystrix/executor_pool.rb +++ b/lib/hystrix/executor_pool.rb @@ -1,4 +1,5 @@ require 'singleton' +require 'securerandom' module Hystrix class CommandExecutorPools @@ -14,6 +15,9 @@ def initialize def get_pool(pool_name, size = nil) lock.synchronize do pools[pool_name] ||= CommandExecutorPool.new(pool_name, size || 10) + pools[pool_name].set_size(size || 10) + + return pools[pool_name] end end @@ -28,42 +32,61 @@ def shutdown class CommandExecutorPool attr_accessor :name, :size - attr_accessor :executors, :lock + attr_accessor :executors, :locked_executors, :lock attr_accessor :circuit_supervisor + attr_reader :uuid def initialize(name, size) + @uuid = SecureRandom.uuid + self.name = name self.size = size - self.executors = [] + self.executors = {} + self.locked_executors = {} self.lock = Mutex.new - self.circuit_supervisor = Circuit.supervise + self.circuit_supervisor = Circuit.supervise(self.name) size.times do - self.executors << CommandExecutor.new + e = CommandExecutor.new(self) + self.executors[e.uuid] = e + end + end + + def set_size(size) + self.size = size + if size > self.size + (size - self.size).times do + e = CommandExecutor.new(self) + self.executors[e.uuid] = e + end + self.size = size end end def take + raise ExecutorPoolFullError.new("Unable to get executor from #{self.name} pool. [#{self.locked_executors.size} locked] [#{@uuid}]") unless self.executors.count > 0 + lock.synchronize do - for executor in self.executors - unless executor.locked? - executor.lock - return executor - end - end + raise ExecutorPoolFullError.new("Unable to get executor from #{self.name} pool. [#{self.locked_executors.size} locked] [#{@uuid}]") unless self.executors.count > 0 + uuid, executor = self.executors.first + executor.lock + + self.executors.delete(executor.uuid) + self.locked_executors[executor.uuid] = executor + + return executor end + end - raise ExecutorPoolFullError.new("Unable to get executor from #{self.name} pool.") + def release(executor) + self.locked_executors.delete(executor.uuid) + self.executors[executor.uuid] = executor end def shutdown lock.synchronize do - until executors.size == 0 do - for i in (0...executors.size) - unless executors[i].locked? - executors[i] = nil - end - end - executors.compact! + self.executors = {} + until (self.executors.size + self.locked_executors.size) == 0 do + self.executors = {} sleep 0.1 end end @@ -72,8 +95,12 @@ def shutdown class CommandExecutor attr_accessor :owner + attr_reader :uuid, :pool + + def initialize(pool) + @uuid = SecureRandom.uuid + @pool = pool - def initialize self.owner = nil end @@ -83,6 +110,7 @@ def lock def unlock self.owner = nil + self.pool.release(self) if self.pool end def locked? diff --git a/spec/lib/hystrix/circuit_spec.rb b/spec/lib/hystrix/circuit_spec.rb index 8b52241..383587b 100644 --- a/spec/lib/hystrix/circuit_spec.rb +++ b/spec/lib/hystrix/circuit_spec.rb @@ -3,12 +3,12 @@ describe Hystrix::Circuit do context 'health threshold, ' do it 'is healthy if no commands have reported latency errors' do - circuit = Hystrix::Circuit.new + circuit = Hystrix::Circuit.new("test") circuit.is_healthy?.should == true end it 'becomes unhealthy if X commands are slow within the last Y seconds.' do - circuit = Hystrix::Circuit.new + circuit = Hystrix::Circuit.new("test") 5.times do circuit.add_latency_error(rand(10)) end @@ -16,13 +16,13 @@ end it 'opens the circuit when unhealthy' do - circuit = Hystrix::Circuit.new + circuit = Hystrix::Circuit.new("test") circuit.wrapped_object.stub(:is_healthy?).and_return(false) circuit.is_closed?.should == false end it 'prunes old latency errors over time' do - circuit = Hystrix::Circuit.new + circuit = Hystrix::Circuit.new("test") now = Time.now 13.times do |i| Timecop.freeze(now - i) do @@ -37,31 +37,37 @@ end it 'doesnt recalculate health every closed check' do - circuit = Hystrix::Circuit.new - circuit.wrapped_object.should_receive(:calculate_health).once.and_call_original now = Time.now - - 2.times do |i| - Timecop.freeze(now - i) do - circuit.is_closed? - end + circuit = Hystrix::Circuit.new("test") + + Timecop.freeze(now) do + circuit.wrapped_object.should_receive(:calculate_health).once + circuit.is_closed? + + circuit.wrapped_object.last_health_check_time = now.to_f + circuit.is_closed? end end it 'only recalculates health every X seconds' do - circuit = Hystrix::Circuit.new - circuit.wrapped_object.should_receive(:calculate_health).twice.and_call_original + circuit = Hystrix::Circuit.new("test") + circuit.wrapped_object.should_receive(:calculate_health).twice now = Time.now - - 12.times do |i| - Timecop.freeze(now + i) do - circuit.is_closed? - end + + Timecop.freeze(now) do + circuit.is_closed? + + circuit.wrapped_object.last_health_check_time = now.to_f + circuit.is_closed? + end + + Timecop.freeze(now+15) do + circuit.is_closed? end end it 'allows the health to return back to 0' do - circuit = Hystrix::Circuit.new + circuit = Hystrix::Circuit.new("test") now = Time.now Timecop.freeze(now - 11) do circuit.add_latency_error(rand(10)) diff --git a/spec/lib/hystrix/command_spec.rb b/spec/lib/hystrix/command_spec.rb index 87506f5..9b4c78f 100644 --- a/spec/lib/hystrix/command_spec.rb +++ b/spec/lib/hystrix/command_spec.rb @@ -15,7 +15,7 @@ def run sleep wait if fail - abort 'error' + raise 'error' else return self.string end @@ -72,9 +72,9 @@ def fallback(error) test_duration = nil Hystrix.configure do - on_success do |command_name, duration| - test_name = command_name - test_duration = duration + on_success do |params| + test_name = params[:command_name] + test_duration = params[:duration] end end @@ -90,10 +90,10 @@ def fallback(error) test_error = nil Hystrix.configure do - on_fallback do |command_name, duration, error| - test_name = command_name - test_duration = duration - test_error = error + on_fallback do |params| + test_name = params[:command_name] + test_duration = params[:duration] + test_error = params[:error] end end @@ -116,10 +116,10 @@ def run end Hystrix.configure do - on_failure do |command_name, duration, error| - test_name = command_name - test_duration = duration - test_error = error + on_failure do |params| + test_name = params[:command_name] + test_duration = params[:duration] + test_error = params[:error] end end @@ -154,7 +154,7 @@ class SizedPoolCommand < Hystrix::Command it 'sends exception to fallback method on error' do c = CommandHelloWorld.new('keith', 0, true) - c.wrapped_object.should_receive(:fallback).with do |error| + c.wrapped_object.should_receive(:fallback) do |error| error.message.should == 'error' end c.execute diff --git a/spec/lib/hystrix/configuration_spec.rb b/spec/lib/hystrix/configuration_spec.rb index eabcfb9..406e245 100644 --- a/spec/lib/hystrix/configuration_spec.rb +++ b/spec/lib/hystrix/configuration_spec.rb @@ -7,13 +7,13 @@ it 'defines callbacks via dsl' do Hystrix.configure do - on_success do |command_name, duration| + on_success do |params| raise 'callback' end end expect { - Hystrix::Configuration.notify_success('test', 30) + Hystrix::Configuration.notify_success({}) }.to raise_error('callback') end end \ No newline at end of file diff --git a/spec/lib/hystrix/executor_pool_spec.rb b/spec/lib/hystrix/executor_pool_spec.rb index 4bb479a..c92e58a 100644 --- a/spec/lib/hystrix/executor_pool_spec.rb +++ b/spec/lib/hystrix/executor_pool_spec.rb @@ -1,7 +1,11 @@ require 'spec_helper' describe Hystrix::CommandExecutor do - let(:executor) { Hystrix::CommandExecutor.new } + let(:executor) { Hystrix::CommandExecutor.new(Hystrix::CommandExecutorPool.new('test',0)) } + + it 'has a uuid' do + executor.uuid.length.should == 36 + end it 'can be locked' do executor.locked?.should == false @@ -25,6 +29,13 @@ def run Hystrix::CommandExecutorPool.new('test', 10).executors.size.should == 10 end + context '.uuid' do + it 'generates a UUID on creation' do + pool = Hystrix::CommandExecutorPool.new('test', 1) + pool.uuid.length.should == 36 + end + end + context '.take' do it 'fails if all executors are locked' do pool = Hystrix::CommandExecutorPool.new('test', 1) diff --git a/spec/lib/hystrix/inline_spec.rb b/spec/lib/hystrix/inline_spec.rb index a443396..20d4e6b 100644 --- a/spec/lib/hystrix/inline_spec.rb +++ b/spec/lib/hystrix/inline_spec.rb @@ -35,8 +35,8 @@ mock.should_receive(:check).with('sup') Hystrix.configure do - on_success do |command_name, duration| - mock.check(command_name) + on_success do |params| + mock.check(params[:executor_pool_name]) end end