Skip to content

Commit

Permalink
Optimize thread usage in monitor
Browse files Browse the repository at this point in the history
In monitor, we create 2 threads per resource; one for SSH event loop processing
and one for actual pulse check. In previous version, each resource would keep
their threads even after the pulse check is completed. This means the number of
resources we can monitor at the same time is limited by the number of threads
we can create.
This commit changes the behavior so that after the pulse check is completed,
the threads are released. This way, we can monitor significantly more resources
at the same time.
One drawback of the new approach is that we need to re-create the threads for
each check. In my system creating 1000 threads takes about 0.025 seconds, so
overhead is seems negligible.
I also added a new helper method, needs_event_loop_for_pulse_check? to models.
We actually don't need event loop for pulse check for most of the resources,
only PostgresServer and MinioServer need it. Other resources rely on exec! to
perform their pulse check which doesn't need event loop. In fact, I observed
that extra event loop processing actually slows down the exec! calls. By taking
this into consideration, we reduce the number of threads we create and also
improve the speed of some pulse checks.
Another change we are making is that removing the monitoring_interval from the
model and hardcoding it in the monitor as 5 seconds. This removes capability of
setting different monitoring intervals for different resources. Supporting this
would require some work and since it is not used in the current implementation
I decided to remove all together. If we need to support this in the future, we
can add it back with some effort.
  • Loading branch information
byucesoy committed May 3, 2024
1 parent 01d709e commit 19ec0b3
Show file tree
Hide file tree
Showing 7 changed files with 294 additions and 50 deletions.
86 changes: 39 additions & 47 deletions bin/monitor
Original file line number Diff line number Diff line change
Expand Up @@ -2,62 +2,54 @@
# frozen_string_literal: true

require_relative "../loader"
clover_freeze

MONITORABLE_RESOURCE_TYPES = [VmHost, PostgresServer, Vm.where(~Sshable.where(id: Sequel[:vm][:id]).exists), MinioServer, GithubRunner]
resources = {}
mutex = Mutex.new

resource_scanner = Thread.new do
loop do
mutex.synchronize do
Enumerator::Chain.new(*MONITORABLE_RESOURCE_TYPES).each do |r|
resources[r.id] ||= MonitorableResource.new(r)
end
end

sessions = {}
ssh_threads = {}
pulse_threads = {}

clover_freeze
sleep 60
end
rescue => ex
Clog.emit("Resource scanning has failed.") { {resource_scanning_failure: {exception: Util.exception_to_hash(ex)}} }
ThreadPrinter.run
Kernel.exit!
end

loop do
resources = MONITORABLE_RESOURCE_TYPES.flat_map { _1.all }
pulse_checker = Thread.new do
loop do
mutex.synchronize do
resources.each do |r_id, r|
sleep 1 while Thread.list.count + 2 > Config.max_monitor_threads

resources.each do |r|
break if Thread.list.count + 2 > Config.max_monitor_threads
ssh_threads[r.id] ||= Thread.new do
loop do
sessions[r.id] = r.init_health_monitor_session
r.force_stop_if_stuck

loop do
sessions[r.id][:ssh_session].process
rescue => ex
Clog.emit("Processing SSH session is failed. Trying to reestablish the connection") { {health_monitor_ssh_failure: {ubid: r.ubid, exception: Util.exception_to_hash(ex)}} }
break
Thread.new do
r.lock_no_wait do
r.open_resource_session
r.process_event_loop
r.check_pulse
end
end
rescue => ex
Clog.emit("Establishing the SSH session is failed") { {health_monitor_reestablish_ssh_failure: {ubid: r.ubid, exception: Util.exception_to_hash(ex)}} }

begin
r.reload
rescue Sequel::NoExistingObject
Clog.emit("Resource is deleted") { {health_monitor_resource_deleted: {ubid: r.ubid}} }
sessions.delete(r.id)
ssh_threads.delete(r.id)
pulse_threads.delete(r.id)&.kill
break
end

sleep 5
end
end

pulse_threads[r.id] ||= Thread.new do
pulse = {}
loop do
pulse = r.check_pulse(session: sessions[r.id], previous_pulse: pulse)
Clog.emit("Got new pulse") { {got_pulse: {ubid: r.ubid, pulse: pulse}} }
sleep r.monitoring_interval
rescue RuntimeError, IOError => ex
Clog.emit("Pulse checking is failed") { {pulse_check_failure: {ubid: r.ubid, exception: Util.exception_to_hash(ex)}} }
sleep r.monitoring_interval
rescue => ex
Clog.emit("Pulse checking is failed permanently!") { {pulse_check_failure: {ubid: r.ubid, exception: Util.exception_to_hash(ex)}} }
raise
end
resources.select { |r_id, r| r.deleted }.each { |r_id, r| resources.delete(r_id) }
end
sleep 5
end

sleep 5 * 60
rescue => ex
Clog.emit("Pulse checking has failed.") { {pulse_checking_failure: {exception: Util.exception_to_hash(ex)}} }
ThreadPrinter.run
Kernel.exit!
end

resource_scanner.join
pulse_checker.join
88 changes: 88 additions & 0 deletions lib/monitorable_resource.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
# frozen_string_literal: true

class MonitorableResource
attr_reader :deleted, :run_event_loop

PULSE_TIMEOUT = 120

def initialize(resource)
@resource = resource
@session = nil
@mutex = Mutex.new
@pulse = {}
@pulse_check_started_at = Time.now
@pulse_thread = nil
@run_event_loop = false
@deleted = false
end

def open_resource_session
return if @session && @pulse[:reading] == "up"

@session = @resource.reload.init_health_monitor_session
rescue => ex
if ex.is_a?(Sequel::NoExistingObject)
Clog.emit("Resource is deleted.") { {resource_deleted: {ubid: @resource.ubid}} }
@session = nil
@deleted = true
end
end

def process_event_loop
return if @session.nil? || !@resource.needs_event_loop_for_pulse_check?

@pulse_thread = Thread.new do
sleep 0.01 until @run_event_loop
@session[:ssh_session].loop(0.01) { @run_event_loop }
rescue => ex
Clog.emit("SSH event loop has failed.") { {event_loop_failure: {ubid: @resource.ubid, exception: Util.exception_to_hash(ex)}} }
close_resource_session
end
end

def check_pulse
@run_event_loop = true if @resource.needs_event_loop_for_pulse_check?

@pulse_check_started_at = Time.now
begin
@pulse = @resource.check_pulse(session: @session, previous_pulse: @pulse)
Clog.emit("Got new pulse.") { {got_pulse: {ubid: @resource.ubid, pulse: @pulse}} }
rescue => ex
Clog.emit("Pulse checking has failed.") { {pulse_check_failure: {ubid: @resource.ubid, exception: Util.exception_to_hash(ex)}} }
end

@run_event_loop = false if @resource.needs_event_loop_for_pulse_check?
@pulse_thread&.join
end

def close_resource_session
return if @session.nil?

@session[:ssh_session].shutdown!
begin
@session[:ssh_session].close
rescue
end
@session = nil
end

def force_stop_if_stuck
if @mutex.locked?
if @pulse_check_started_at + PULSE_TIMEOUT < Time.now
Clog.emit("Pulse check has stuck.") { {pulse_check_stuck: {ubid: @resource.ubid}} }
ThreadPrinter.run
Kernel.exit!
end
end
end

def lock_no_wait
return unless @mutex.try_lock

begin
yield
ensure
@mutex.unlock
end
end
end
4 changes: 2 additions & 2 deletions model.rb
Original file line number Diff line number Diff line change
Expand Up @@ -132,8 +132,8 @@ def aggregate_readings(previous_pulse:, reading:, data: {})
}.merge(data)
end

def monitoring_interval
5
def needs_event_loop_for_pulse_check?
false
end
end

Expand Down
4 changes: 4 additions & 0 deletions model/minio/minio_server.rb
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,10 @@ def check_pulse(session:, previous_pulse:)
pulse
end

def needs_event_loop_for_pulse_check?
true
end

def server_url
cluster.url || ip4_url
end
Expand Down
6 changes: 5 additions & 1 deletion model/postgres/postgres_server.rb
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ def init_health_monitor_session

def check_pulse(session:, previous_pulse:)
reading = begin
session[:db_connection] ||= Sequel.connect(adapter: "postgres", host: health_monitor_socket_path, user: "postgres")
session[:db_connection] ||= Sequel.connect(adapter: "postgres", host: health_monitor_socket_path, user: "postgres", connect_timeout: 4)
lsn_function = primary? ? "pg_current_wal_lsn()" : "pg_last_wal_receive_lsn()"
last_known_lsn = session[:db_connection]["SELECT #{lsn_function} AS lsn"].first[:lsn]
"up"
Expand All @@ -160,6 +160,10 @@ def check_pulse(session:, previous_pulse:)
pulse
end

def needs_event_loop_for_pulse_check?
true
end

def health_monitor_socket_path
@health_monitor_socket_path ||= File.join(Dir.pwd, "var", "health_monitor_sockets", "pg_#{vm.ephemeral_net6.nth(2)}")
end
Expand Down
152 changes: 152 additions & 0 deletions spec/lib/monitorable_resource_spec.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,152 @@
# frozen_string_literal: true

require_relative "../model/spec_helper"

RSpec.describe MonitorableResource do
let(:postgres_server) { PostgresServer.new { _1.id = "c068cac7-ed45-82db-bf38-a003582b36ee" } }
let(:r_w_event_loop) { described_class.new(postgres_server) }
let(:vm_host) { VmHost.new { _1.id = "46683a25-acb1-4371-afe9-d39f303e44b4" } }
let(:r_without_event_loop) { described_class.new(vm_host) }

describe "#open_resource_session" do
it "returns if session is not nil and pulse reading is up" do
r_w_event_loop.instance_variable_set(:@session, "not nil")
r_w_event_loop.instance_variable_set(:@pulse, {reading: "up"})

expect(postgres_server).not_to receive(:reload)
r_w_event_loop.open_resource_session
end

it "sets session to resource's init_health_monitor_session" do
expect(postgres_server).to receive(:reload).and_return(postgres_server)
expect(postgres_server).to receive(:init_health_monitor_session).and_return("session")
expect { r_w_event_loop.open_resource_session }.to change { r_w_event_loop.instance_variable_get(:@session) }.from(nil).to("session")
end

it "sets deleted to true if resource is deleted" do
expect(postgres_server).to receive(:reload).and_raise(Sequel::NoExistingObject)
expect { r_w_event_loop.open_resource_session }.to change(r_w_event_loop, :deleted).from(false).to(true)
end

it "ignores exception if it is not Sequel::NoExistingObject" do
expect(postgres_server).to receive(:reload).and_raise(StandardError)
expect { r_w_event_loop.open_resource_session }.not_to raise_error
end
end

describe "#process_event_loop" do
before do
# We are monkeypatching the sleep method here to avoid the actual sleep.
# We also use it to flip the @run_event_loop flag to true, so that the
# loop in the process_event_loop method can exit.
def r_w_event_loop.sleep(duration)
puts "sleeping for #{duration}"
@run_event_loop = true
end
end

it "returns if session is nil or resource does not need event loop" do
expect(Thread).not_to receive(:new)

# session is nil
r_w_event_loop.process_event_loop

# resource does not need event loop
r_without_event_loop.instance_variable_set(:@session, "not nil")
r_without_event_loop.process_event_loop
end

it "creates a new thread and runs the event loop" do
session = {ssh_session: instance_double(Net::SSH::Connection::Session)}
r_w_event_loop.instance_variable_set(:@session, session)
expect(Thread).to receive(:new).and_yield
expect(session[:ssh_session]).to receive(:loop)
r_w_event_loop.process_event_loop
end

it "swallows exception and logs it if event loop fails" do
session = {ssh_session: instance_double(Net::SSH::Connection::Session)}
r_w_event_loop.instance_variable_set(:@session, session)
expect(Thread).to receive(:new).and_yield
expect(session[:ssh_session]).to receive(:loop).and_raise(StandardError)
expect(Clog).to receive(:emit)
expect(r_w_event_loop).to receive(:close_resource_session)
r_w_event_loop.process_event_loop
end
end

describe "#check_pulse" do
it "calls check_pulse on resource and sets pulse" do
expect(postgres_server).to receive(:check_pulse).and_return({reading: "up"})
expect { r_w_event_loop.check_pulse }.to change { r_w_event_loop.instance_variable_get(:@pulse) }.from({}).to({reading: "up"})
end

it "swallows exception and logs it if check_pulse fails" do
expect(vm_host).to receive(:check_pulse).and_raise(StandardError)
expect(Clog).to receive(:emit)
expect { r_without_event_loop.check_pulse }.not_to raise_error
end

it "waits for the pulse thread to finish" do
pulse_thread = Thread.new {}
expect(pulse_thread).to receive(:join)
r_w_event_loop.instance_variable_set(:@pulse_thread, pulse_thread)
r_w_event_loop.check_pulse
end
end

describe "#close_resource_session" do
it "returns if session is nil" do
session = {ssh_session: instance_double(Net::SSH::Connection::Session)}
expect(session[:ssh_session]).not_to receive(:shutdown!)
expect(session).to receive(:nil?).and_return(true)
r_w_event_loop.instance_variable_set(:@session, session)
r_w_event_loop.close_resource_session
end

it "shuts down and closes the session" do
session = {ssh_session: instance_double(Net::SSH::Connection::Session)}
expect(session[:ssh_session]).to receive(:shutdown!)
expect(session[:ssh_session]).to receive(:close)
r_w_event_loop.instance_variable_set(:@session, session)
r_w_event_loop.close_resource_session
end
end

describe "#force_stop_if_stuck" do
it "does nothing if pulse check is not stuck" do
expect(Kernel).not_to receive(:exit!)

# not locked
r_w_event_loop.force_stop_if_stuck

# not timed out
r_w_event_loop.instance_variable_get(:@mutex).lock
r_w_event_loop.instance_variable_set(:@pulse_check_started_at, Time.now)
r_w_event_loop.force_stop_if_stuck
r_w_event_loop.instance_variable_get(:@mutex).unlock
end

it "triggers Kernel.exit if pulse check is stuck" do
expect(ThreadPrinter).to receive(:run)
expect(Kernel).to receive(:exit!)

r_w_event_loop.instance_variable_get(:@mutex).lock
r_w_event_loop.instance_variable_set(:@pulse_check_started_at, Time.now - 200)
r_w_event_loop.force_stop_if_stuck
r_w_event_loop.instance_variable_get(:@mutex).unlock
end
end

describe "#lock_no_wait" do
it "does not yield if mutex is locked" do
r_w_event_loop.instance_variable_get(:@mutex).lock
expect { |b| r_w_event_loop.lock_no_wait(&b) }.not_to yield_control
r_w_event_loop.instance_variable_get(:@mutex).unlock
end

it "yields if mutex is not locked" do
expect { |b| r_w_event_loop.lock_no_wait(&b) }.to yield_control
end
end
end
4 changes: 4 additions & 0 deletions spec/model/minio/minio_server_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,10 @@
expect(ms.endpoint).to eq("minio-cluster-name0.minio.ubicloud.com:9000")
end

it "needs event loop for pulse check" do
expect(ms).to be_needs_event_loop_for_pulse_check
end

describe "#url" do
before do
minio_project = Project.create_with_id(name: "default").tap { _1.associate_with_project(_1) }
Expand Down

0 comments on commit 19ec0b3

Please sign in to comment.