Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 6 additions & 4 deletions Gemfile.lock
Original file line number Diff line number Diff line change
@@ -1,15 +1,16 @@
PATH
remote: .
specs:
hystrix-ruby (0.0.1)
hystrix-ruby (0.1.0)
celluloid (>= 0.13.0)

GEM
remote: https://rubygems.org/
specs:
celluloid (0.15.2)
timers (~> 1.1.0)
celluloid (0.16.0)
timers (~> 4.0.0)
diff-lcs (1.2.4)
hitimes (1.2.2)
multi_json (1.8.2)
rspec (2.14.1)
rspec-core (~> 2.14.0)
Expand All @@ -26,7 +27,8 @@ GEM
simplecov-rcov (0.2.3)
simplecov (>= 0.4.1)
timecop (0.6.3)
timers (1.1.0)
timers (4.0.1)
hitimes

PLATFORMS
ruby
Expand Down
2 changes: 1 addition & 1 deletion hystrix-ruby.gemspec
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
Gem::Specification.new do |s|
s.name = %q{hystrix-ruby}
s.version = "0.0.1"
s.version = "0.1.2"
s.authors = ["Keith Thornhill"]
s.date = %q{2013-04-08}
s.description = %q{Hystrix for Ruby}
Expand Down
5 changes: 3 additions & 2 deletions lib/hystrix/circuit.rb
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,14 @@ module Hystrix
class Circuit
include Celluloid

attr_accessor :lock, :health, :recent_latency_errors, :last_health_check_time
attr_accessor :lock, :health, :recent_latency_errors, :last_health_check_time, :command_pool

def initialize
def initialize(command_pool)
self.lock = Mutex.new
self.recent_latency_errors = []
self.health = 0
self.last_health_check_time = nil
self.command_pool = command_pool
end

def is_closed?
Expand Down
13 changes: 4 additions & 9 deletions lib/hystrix/command.rb
Original file line number Diff line number Diff line change
Expand Up @@ -34,20 +34,15 @@ def execute
result = executor.run(self)
duration = Time.now - start_time

Configuration.notify_success(executor_pool_name, duration)
Configuration.notify_success({command_name: self.class.name, executor_pool_name: executor_pool_name, duration: duration})
rescue Exception => main_error
duration = Time.now - start_time

begin
if main_error.respond_to?(:cause)
result = fallback(main_error.cause)
Configuration.notify_fallback(executor_pool_name, duration, main_error.cause)
else
result = fallback(main_error)
Configuration.notify_fallback(executor_pool_name, duration, main_error)
end
result = fallback(main_error)
Configuration.notify_fallback({command_name: self.class.name, executor_pool_name: executor_pool_name, duration: duration, error: main_error})
rescue NotImplementedError => fallback_error
Configuration.notify_failure(executor_pool_name, duration, main_error)
Configuration.notify_failure({command_name: self.class.name, executor_pool_name: executor_pool_name, duration: duration, error: main_error, fallback_error: fallback_error})
raise main_error
end
ensure
Expand Down
12 changes: 6 additions & 6 deletions lib/hystrix/configuration.rb
Original file line number Diff line number Diff line change
Expand Up @@ -3,27 +3,27 @@ class Configuration
def self.on_success(&block)
@on_success = block
end
def self.notify_success(command_name, duration)
def self.notify_success(params)
if @on_success
@on_success.call(command_name, duration)
@on_success.call(params)
end
end

def self.on_fallback(&block)
@on_fallback = block
end
def self.notify_fallback(command_name, duration, error)
def self.notify_fallback(params)
if @on_fallback
@on_fallback.call(command_name, duration, error)
@on_fallback.call(params)
end
end

def self.on_failure(&block)
@on_failure = block
end
def self.notify_failure(command_name, duration, error)
def self.notify_failure(params)
if @on_failure
@on_failure.call(command_name, duration, error)
@on_failure.call(params)
end
end

Expand Down
66 changes: 47 additions & 19 deletions lib/hystrix/executor_pool.rb
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
require 'singleton'
require 'securerandom'

module Hystrix
class CommandExecutorPools
Expand All @@ -14,6 +15,9 @@ def initialize
def get_pool(pool_name, size = nil)
lock.synchronize do
pools[pool_name] ||= CommandExecutorPool.new(pool_name, size || 10)
pools[pool_name].set_size(size || 10)

return pools[pool_name]
end
end

Expand All @@ -28,42 +32,61 @@ def shutdown

class CommandExecutorPool
attr_accessor :name, :size
attr_accessor :executors, :lock
attr_accessor :executors, :locked_executors, :lock
attr_accessor :circuit_supervisor
attr_reader :uuid

def initialize(name, size)
@uuid = SecureRandom.uuid

self.name = name
self.size = size
self.executors = []
self.executors = {}
self.locked_executors = {}
self.lock = Mutex.new
self.circuit_supervisor = Circuit.supervise
self.circuit_supervisor = Circuit.supervise(self.name)
size.times do
self.executors << CommandExecutor.new
e = CommandExecutor.new(self)
self.executors[e.uuid] = e
end
end

def set_size(size)
self.size = size
if size > self.size
(size - self.size).times do
e = CommandExecutor.new(self)
self.executors[e.uuid] = e
end
self.size = size
end
end

def take
raise ExecutorPoolFullError.new("Unable to get executor from #{self.name} pool. [#{self.locked_executors.size} locked] [#{@uuid}]") unless self.executors.count > 0

lock.synchronize do
for executor in self.executors
unless executor.locked?
executor.lock
return executor
end
end
raise ExecutorPoolFullError.new("Unable to get executor from #{self.name} pool. [#{self.locked_executors.size} locked] [#{@uuid}]") unless self.executors.count > 0
uuid, executor = self.executors.first
executor.lock

self.executors.delete(executor.uuid)
self.locked_executors[executor.uuid] = executor

return executor
end
end

raise ExecutorPoolFullError.new("Unable to get executor from #{self.name} pool.")
def release(executor)
self.locked_executors.delete(executor.uuid)
self.executors[executor.uuid] = executor
end

def shutdown
lock.synchronize do
until executors.size == 0 do
for i in (0...executors.size)
unless executors[i].locked?
executors[i] = nil
end
end
executors.compact!
self.executors = {}
until (self.executors.size + self.locked_executors.size) == 0 do
self.executors = {}
sleep 0.1
end
end
Expand All @@ -72,8 +95,12 @@ def shutdown

class CommandExecutor
attr_accessor :owner
attr_reader :uuid, :pool

def initialize(pool)
@uuid = SecureRandom.uuid
@pool = pool

def initialize
self.owner = nil
end

Expand All @@ -83,6 +110,7 @@ def lock

def unlock
self.owner = nil
self.pool.release(self) if self.pool
end

def locked?
Expand Down
44 changes: 25 additions & 19 deletions spec/lib/hystrix/circuit_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -3,26 +3,26 @@
describe Hystrix::Circuit do
context 'health threshold, ' do
it 'is healthy if no commands have reported latency errors' do
circuit = Hystrix::Circuit.new
circuit = Hystrix::Circuit.new("test")
circuit.is_healthy?.should == true
end

it 'becomes unhealthy if X commands are slow within the last Y seconds.' do
circuit = Hystrix::Circuit.new
circuit = Hystrix::Circuit.new("test")
5.times do
circuit.add_latency_error(rand(10))
end
circuit.is_healthy?.should == false
end

it 'opens the circuit when unhealthy' do
circuit = Hystrix::Circuit.new
circuit = Hystrix::Circuit.new("test")
circuit.wrapped_object.stub(:is_healthy?).and_return(false)
circuit.is_closed?.should == false
end

it 'prunes old latency errors over time' do
circuit = Hystrix::Circuit.new
circuit = Hystrix::Circuit.new("test")
now = Time.now
13.times do |i|
Timecop.freeze(now - i) do
Expand All @@ -37,31 +37,37 @@
end

it 'doesnt recalculate health every closed check' do
circuit = Hystrix::Circuit.new
circuit.wrapped_object.should_receive(:calculate_health).once.and_call_original
now = Time.now

2.times do |i|
Timecop.freeze(now - i) do
circuit.is_closed?
end
circuit = Hystrix::Circuit.new("test")

Timecop.freeze(now) do
circuit.wrapped_object.should_receive(:calculate_health).once
circuit.is_closed?

circuit.wrapped_object.last_health_check_time = now.to_f
circuit.is_closed?
end
end

it 'only recalculates health every X seconds' do
circuit = Hystrix::Circuit.new
circuit.wrapped_object.should_receive(:calculate_health).twice.and_call_original
circuit = Hystrix::Circuit.new("test")
circuit.wrapped_object.should_receive(:calculate_health).twice
now = Time.now

12.times do |i|
Timecop.freeze(now + i) do
circuit.is_closed?
end

Timecop.freeze(now) do
circuit.is_closed?

circuit.wrapped_object.last_health_check_time = now.to_f
circuit.is_closed?
end

Timecop.freeze(now+15) do
circuit.is_closed?
end
end

it 'allows the health to return back to 0' do
circuit = Hystrix::Circuit.new
circuit = Hystrix::Circuit.new("test")
now = Time.now
Timecop.freeze(now - 11) do
circuit.add_latency_error(rand(10))
Expand Down
26 changes: 13 additions & 13 deletions spec/lib/hystrix/command_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ def run
sleep wait

if fail
abort 'error'
raise 'error'
else
return self.string
end
Expand Down Expand Up @@ -72,9 +72,9 @@ def fallback(error)
test_duration = nil

Hystrix.configure do
on_success do |command_name, duration|
test_name = command_name
test_duration = duration
on_success do |params|
test_name = params[:command_name]
test_duration = params[:duration]
end
end

Expand All @@ -90,10 +90,10 @@ def fallback(error)
test_error = nil

Hystrix.configure do
on_fallback do |command_name, duration, error|
test_name = command_name
test_duration = duration
test_error = error
on_fallback do |params|
test_name = params[:command_name]
test_duration = params[:duration]
test_error = params[:error]
end
end

Expand All @@ -116,10 +116,10 @@ def run
end

Hystrix.configure do
on_failure do |command_name, duration, error|
test_name = command_name
test_duration = duration
test_error = error
on_failure do |params|
test_name = params[:command_name]
test_duration = params[:duration]
test_error = params[:error]
end
end

Expand Down Expand Up @@ -154,7 +154,7 @@ class SizedPoolCommand < Hystrix::Command

it 'sends exception to fallback method on error' do
c = CommandHelloWorld.new('keith', 0, true)
c.wrapped_object.should_receive(:fallback).with do |error|
c.wrapped_object.should_receive(:fallback) do |error|
error.message.should == 'error'
end
c.execute
Expand Down
4 changes: 2 additions & 2 deletions spec/lib/hystrix/configuration_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,13 @@

it 'defines callbacks via dsl' do
Hystrix.configure do
on_success do |command_name, duration|
on_success do |params|
raise 'callback'
end
end

expect {
Hystrix::Configuration.notify_success('test', 30)
Hystrix::Configuration.notify_success({})
}.to raise_error('callback')
end
end
Loading