Skip to content

Commit

Permalink
Merge pull request #202 from abrandoned/master
Browse files Browse the repository at this point in the history
zmq connector fixes
  • Loading branch information
abrandoned committed Jul 8, 2014
2 parents f015c55 + 58d86e5 commit b51cd11
Show file tree
Hide file tree
Showing 2 changed files with 32 additions and 21 deletions.
51 changes: 31 additions & 20 deletions lib/protobuf/rpc/connectors/zmq.rb
Original file line number Diff line number Diff line change
Expand Up @@ -5,20 +5,18 @@ module Protobuf
module Rpc
module Connectors
class Zmq < Base

RequestTimeout = Class.new(RuntimeError)
ZmqRecoverableError = Class.new(RuntimeError)

##
# Included Modules
#

include Protobuf::Rpc::Connectors::Common
include Protobuf::Logger::LogMethods

##
# Class Constants
#

CLIENT_RETRIES = (ENV['PB_CLIENT_RETRIES'] || 3)

##
Expand Down Expand Up @@ -68,21 +66,27 @@ def create_socket

begin
server_uri = lookup_server_uri

socket = zmq_context.socket(::ZMQ::REQ)
socket.setsockopt(::ZMQ::LINGER, 0)

log_debug { sign_message("Establishing connection: #{server_uri}") }
zmq_error_check(socket.connect(server_uri), :socket_connect)
log_debug { sign_message("Connection established to #{server_uri}") }

if first_alive_load_balance?
check_available_response = ""
zmq_error_check(socket.send_string(::Protobuf::Rpc::Zmq::CHECK_AVAILABLE_MESSAGE), :socket_send_string)
zmq_error_check(socket.recv_string(check_available_response), :socket_recv_string)

if check_available_response == ::Protobuf::Rpc::Zmq::NO_WORKERS_AVAILABLE
zmq_error_check(socket.close, :socket_close)
if socket # Make sure the context builds the socket
socket.setsockopt(::ZMQ::LINGER, 0)

log_debug { sign_message("Establishing connection: #{server_uri}") }
zmq_error_check(socket.connect(server_uri), :socket_connect)
log_debug { sign_message("Connection established to #{server_uri}") }

if first_alive_load_balance?
begin
check_available_response = ""
zmq_recoverable_error_check(socket.send_string(::Protobuf::Rpc::Zmq::CHECK_AVAILABLE_MESSAGE), :socket_send_string)
zmq_recoverable_error_check(socket.recv_string(check_available_response), :socket_recv_string)

if check_available_response == ::Protobuf::Rpc::Zmq::NO_WORKERS_AVAILABLE
zmq_recoverable_error_check(socket.close, :socket_close)
end
rescue ZmqRecoverableError
socket = nil # couldn't make a connection and need to try again
end
end
end
end while socket.try(:socket).nil?
Expand All @@ -101,7 +105,7 @@ def error?
# to the host and port in the options
#
def lookup_server_uri
50.times do
5.times do
service_directory.all_listings_for(service).each do |listing|
host = listing.try(:address)
port = listing.try(:port)
Expand All @@ -111,16 +115,13 @@ def lookup_server_uri
host = options[:host]
port = options[:port]
return "tcp://#{host}:#{port}" if host_alive?(host)

sleep(1.0/10.0) # not sure why sleeping at all, but should be way less than 1 second
end

raise "Host not found for service #{service}"
end

def host_alive?(host)
return true unless ping_port_enabled?

socket = TCPSocket.new(host, ping_port.to_i)

true
Expand Down Expand Up @@ -199,6 +200,16 @@ def zmq_error_check(return_code, source)
ERROR
end
end

def zmq_recoverable_error_check(return_code, source)
unless ::ZMQ::Util.resultcode_ok?(return_code || -1)
raise ZmqRecoverableError, <<-ERROR
Last ZMQ API call to #{source} failed with "#{::ZMQ::Util.error_string}".
#{caller(1).join($/)}
ERROR
end
end
end
end
end
Expand Down
2 changes: 1 addition & 1 deletion lib/protobuf/rpc/servers/zmq/broker.rb
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ def process_frontend
address, _, message, *frames = read_from_frontend

if message == ::Protobuf::Rpc::Zmq::CHECK_AVAILABLE_MESSAGE
if @idle_workers.any? || local_queue.empty?
if @idle_workers.any? || local_queue.size < 5 # Should make queue a SizedQueue and allow users to configure queue size
write_to_frontend([address, "", ::Protobuf::Rpc::Zmq::WORKERS_AVAILABLE])
else
write_to_frontend([address, "", ::Protobuf::Rpc::Zmq::NO_WORKERS_AVAILABLE])
Expand Down

0 comments on commit b51cd11

Please sign in to comment.