diff --git a/config/initializers/sidekiq.rb b/config/initializers/sidekiq.rb index 69a60a65e77a2500f85c92cf98d03b45b9d506ee..4a8d72e37a51690920d4ecab7b9ba048b40ee149 100644 --- a/config/initializers/sidekiq.rb +++ b/config/initializers/sidekiq.rb @@ -22,7 +22,7 @@ config.server_middleware do |chain| chain.add Gitlab::SidekiqMiddleware::ArgumentsLogger if ENV['SIDEKIQ_LOG_ARGUMENTS'] && !enable_json_logs - chain.add Gitlab::SidekiqMiddleware::Shutdown + chain.add Gitlab::SidekiqMiddleware::MemoryKiller if ENV['SIDEKIQ_MEMORY_KILLER_MAX_RSS'] chain.add Gitlab::SidekiqMiddleware::RequestStoreMiddleware unless ENV['SIDEKIQ_REQUEST_STORE'] == '0' chain.add Gitlab::SidekiqMiddleware::BatchLoader chain.add Gitlab::SidekiqMiddleware::CorrelationLogger diff --git a/lib/gitlab/gitaly_client.rb b/lib/gitlab/gitaly_client.rb index 5aeedb0f50dda3dbe8257eefe0ba80acd90d8d75..869b835b61e7238b0748cfda750207ecdd563d02 100644 --- a/lib/gitlab/gitaly_client.rb +++ b/lib/gitlab/gitaly_client.rb @@ -164,8 +164,6 @@ def self.call(storage, service, rpc, request, remote_storage: nil, timeout: nil) kwargs = yield(kwargs) if block_given? stub(service, storage).__send__(rpc, request, kwargs) # rubocop:disable GitlabSecurity/PublicSend - rescue GRPC::Unavailable => ex - handle_grpc_unavailable!(ex) ensure duration = Gitlab::Metrics::System.monotonic_time - start @@ -178,27 +176,6 @@ def self.call(storage, service, rpc, request, remote_storage: nil, timeout: nil) add_call_details(feature: "#{service}##{rpc}", duration: duration, request: request_hash, rpc: rpc) end - def self.handle_grpc_unavailable!(ex) - status = ex.to_status - raise ex unless status.details == 'Endpoint read failed' - - # There is a bug in grpc 1.8.x that causes a client process to get stuck - # always raising '14:Endpoint read failed'. The only thing that we can - # do to recover is to restart the process. - # - # See https://gitlab.com/gitlab-org/gitaly/issues/1029 - - if Sidekiq.server? 
- raise Gitlab::SidekiqMiddleware::Shutdown::WantShutdown.new(ex.to_s) - else - # SIGQUIT requests a Unicorn worker to shut down gracefully after the current request. - Process.kill('QUIT', Process.pid) - end - - raise ex - end - private_class_method :handle_grpc_unavailable! - def self.current_transaction_labels Gitlab::Metrics::Transaction.current&.labels || {} end diff --git a/lib/gitlab/sidekiq_middleware/memory_killer.rb b/lib/gitlab/sidekiq_middleware/memory_killer.rb new file mode 100644 index 0000000000000000000000000000000000000000..ad4112ba555f0096ce92c8d82301708c1625a014 --- /dev/null +++ b/lib/gitlab/sidekiq_middleware/memory_killer.rb @@ -0,0 +1,69 @@ +# frozen_string_literal: true + +module Gitlab + module SidekiqMiddleware + class MemoryKiller + # Default the RSS limit to 0, meaning the MemoryKiller is disabled + MAX_RSS = (ENV['SIDEKIQ_MEMORY_KILLER_MAX_RSS'] || 0).to_s.to_i + # Give Sidekiq 15 minutes of grace time after exceeding the RSS limit + GRACE_TIME = (ENV['SIDEKIQ_MEMORY_KILLER_GRACE_TIME'] || 15 * 60).to_s.to_i + # Wait 30 seconds for running jobs to finish during graceful shutdown + SHUTDOWN_WAIT = (ENV['SIDEKIQ_MEMORY_KILLER_SHUTDOWN_WAIT'] || 30).to_s.to_i + + # Create a mutex used to ensure there will be only one thread waiting to + # shut Sidekiq down + MUTEX = Mutex.new + + def call(worker, job, queue) + yield + + current_rss = get_rss + + return unless MAX_RSS > 0 && current_rss > MAX_RSS + + Thread.new do + # Return if another thread is already waiting to shut Sidekiq down + next unless MUTEX.try_lock + + Sidekiq.logger.warn "Sidekiq worker PID-#{pid} current RSS #{current_rss}"\ + " exceeds maximum RSS #{MAX_RSS} after finishing job #{worker.class} JID-#{job['jid']}" + Sidekiq.logger.warn "Sidekiq worker PID-#{pid} will stop fetching new jobs in #{GRACE_TIME} seconds, and will be shut down #{SHUTDOWN_WAIT} seconds later" + + # Wait `GRACE_TIME` to give the memory intensive job time to finish. 
+ # Then, send SIGTSTP to tell Sidekiq to stop fetching new jobs. + wait_and_signal(GRACE_TIME, 'SIGTSTP', 'stop fetching new jobs') + + # Wait `SHUTDOWN_WAIT` to give already fetched jobs time to finish. + # Then, tell Sidekiq to gracefully shut down by giving jobs a few more + # moments to finish, killing and requeuing them if they didn't, and + # then terminating itself. + wait_and_signal(SHUTDOWN_WAIT, 'SIGTERM', 'gracefully shut down') + + # Wait for Sidekiq to shutdown gracefully, and kill it if it didn't. + wait_and_signal(Sidekiq.options[:timeout] + 2, 'SIGKILL', 'die') + end + end + + private + + def get_rss + output, status = Gitlab::Popen.popen(%W(ps -o rss= -p #{pid}), Rails.root.to_s) + return 0 unless status.zero? + + output.to_i + end + + def wait_and_signal(time, signal, explanation) + Sidekiq.logger.warn "waiting #{time} seconds before sending Sidekiq worker PID-#{pid} #{signal} (#{explanation})" + sleep(time) + + Sidekiq.logger.warn "sending Sidekiq worker PID-#{pid} #{signal} (#{explanation})" + Process.kill(signal, pid) + end + + def pid + Process.pid + end + end + end +end diff --git a/lib/gitlab/sidekiq_middleware/shutdown.rb b/lib/gitlab/sidekiq_middleware/shutdown.rb deleted file mode 100644 index 19f3be83bcebc5bf512f130fed6bab1ac8cedbb2..0000000000000000000000000000000000000000 --- a/lib/gitlab/sidekiq_middleware/shutdown.rb +++ /dev/null @@ -1,135 +0,0 @@ -# frozen_string_literal: true - -require 'mutex_m' - -module Gitlab - module SidekiqMiddleware - class Shutdown - extend Mutex_m - - # Default the RSS limit to 0, meaning the MemoryKiller is disabled - MAX_RSS = (ENV['SIDEKIQ_MEMORY_KILLER_MAX_RSS'] || 0).to_s.to_i - # Give Sidekiq 15 minutes of grace time after exceeding the RSS limit - GRACE_TIME = (ENV['SIDEKIQ_MEMORY_KILLER_GRACE_TIME'] || 15 * 60).to_s.to_i - # Wait 30 seconds for running jobs to finish during graceful shutdown - SHUTDOWN_WAIT = (ENV['SIDEKIQ_MEMORY_KILLER_SHUTDOWN_WAIT'] || 30).to_s.to_i - - # This exception can be used 
to request 
that the middleware start shutting down Sidekiq - WantShutdown = Class.new(StandardError) - - ShutdownWithoutRaise = Class.new(WantShutdown) - private_constant :ShutdownWithoutRaise - - # For testing only, to avoid race conditions (?) in Rspec mocks. - attr_reader :trace - - # We store the shutdown thread in a class variable to ensure that there - # can be only one shutdown thread in the process. - def self.create_shutdown_thread - mu_synchronize do - break unless @shutdown_thread.nil? - - @shutdown_thread = Thread.new { yield } - end - end - - # For testing only: so we can wait for the shutdown thread to finish. - def self.shutdown_thread - mu_synchronize { @shutdown_thread } - end - - # For testing only: so that we can reset the global state before each test. - def self.clear_shutdown_thread - mu_synchronize { @shutdown_thread = nil } - end - - def initialize - @trace = Queue.new if Rails.env.test? - end - - def call(worker, job, queue) - shutdown_exception = nil - - begin - yield - check_rss! - rescue WantShutdown => ex - shutdown_exception = ex - end - - return unless shutdown_exception - - self.class.create_shutdown_thread do - do_shutdown(worker, job, shutdown_exception) - end - - raise shutdown_exception unless shutdown_exception.is_a?(ShutdownWithoutRaise) - end - - private - - def do_shutdown(worker, job, shutdown_exception) - Sidekiq.logger.warn "Sidekiq worker PID-#{pid} shutting down because of #{shutdown_exception} after job "\ - "#{worker.class} JID-#{job['jid']}" - Sidekiq.logger.warn "Sidekiq worker PID-#{pid} will stop fetching new jobs in #{GRACE_TIME} seconds, and will be shut down #{SHUTDOWN_WAIT} seconds later" - - # Wait `GRACE_TIME` to give the memory intensive job time to finish. - # Then, tell Sidekiq to stop fetching new jobs. - wait_and_signal(GRACE_TIME, 'SIGTSTP', 'stop fetching new jobs') - - # Wait `SHUTDOWN_WAIT` to give already fetched jobs time to finish. 
- # Then, tell Sidekiq to gracefully shut down by giving jobs a few more - # moments to finish, killing and requeuing them if they didn't, and - # then terminating itself. - wait_and_signal(SHUTDOWN_WAIT, 'SIGTERM', 'gracefully shut down') - - # Wait for Sidekiq to shutdown gracefully, and kill it if it didn't. - wait_and_signal(Sidekiq.options[:timeout] + 2, 'SIGKILL', 'die') - end - - def check_rss! - return unless MAX_RSS > 0 - - current_rss = get_rss - return unless current_rss > MAX_RSS - - raise ShutdownWithoutRaise.new("current RSS #{current_rss} exceeds maximum RSS #{MAX_RSS}") - end - - def get_rss - output, status = Gitlab::Popen.popen(%W(ps -o rss= -p #{pid}), Rails.root.to_s) - return 0 unless status.zero? - - output.to_i - end - - def wait_and_signal(time, signal, explanation) - Sidekiq.logger.warn "waiting #{time} seconds before sending Sidekiq worker PID-#{pid} #{signal} (#{explanation})" - sleep(time) - - Sidekiq.logger.warn "sending Sidekiq worker PID-#{pid} #{signal} (#{explanation})" - kill(signal, pid) - end - - def pid - Process.pid - end - - def sleep(time) - if Rails.env.test? - @trace << [:sleep, time] - else - Kernel.sleep(time) - end - end - - def kill(signal, pid) - if Rails.env.test? 
- @trace << [:kill, signal, pid] - else - Process.kill(signal, pid) - end - end - end - end -end diff --git a/spec/lib/gitlab/sidekiq_middleware/memory_killer_spec.rb b/spec/lib/gitlab/sidekiq_middleware/memory_killer_spec.rb new file mode 100644 index 0000000000000000000000000000000000000000..8fdbbacd04d9744e7acaf69ab4aab3d53ce3051e --- /dev/null +++ b/spec/lib/gitlab/sidekiq_middleware/memory_killer_spec.rb @@ -0,0 +1,63 @@ +require 'spec_helper' + +describe Gitlab::SidekiqMiddleware::MemoryKiller do + subject { described_class.new } + let(:pid) { 999 } + + let(:worker) { double(:worker, class: 'TestWorker') } + let(:job) { { 'jid' => 123 } } + let(:queue) { 'test_queue' } + + def run + thread = subject.call(worker, job, queue) { nil } + thread&.join + end + + before do + allow(subject).to receive(:get_rss).and_return(10.kilobytes) + allow(subject).to receive(:pid).and_return(pid) + end + + context 'when MAX_RSS is set to 0' do + before do + stub_const("#{described_class}::MAX_RSS", 0) + end + + it 'does nothing' do + expect(subject).not_to receive(:sleep) + + run + end + end + + context 'when MAX_RSS is exceeded' do + before do + stub_const("#{described_class}::MAX_RSS", 5.kilobytes) + end + + it 'sends the TSTP, TERM and KILL signals at expected times' do + expect(subject).to receive(:sleep).with(15 * 60).ordered + expect(Process).to receive(:kill).with('SIGTSTP', pid).ordered + + expect(subject).to receive(:sleep).with(30).ordered + expect(Process).to receive(:kill).with('SIGTERM', pid).ordered + + expect(subject).to receive(:sleep).with(10).ordered + expect(Process).to receive(:kill).with('SIGKILL', pid).ordered + + run + end + end + + context 'when MAX_RSS is not exceeded' do + before do + stub_const("#{described_class}::MAX_RSS", 15.kilobytes) + end + + it 'does nothing' do + expect(subject).not_to receive(:sleep) + + run + end + end +end diff --git a/spec/lib/gitlab/sidekiq_middleware/shutdown_spec.rb b/spec/lib/gitlab/sidekiq_middleware/shutdown_spec.rb 
deleted file mode 100644 index 0001795c3f06a3cba5fc6a7010d6fe8b68dcd94f..0000000000000000000000000000000000000000 --- a/spec/lib/gitlab/sidekiq_middleware/shutdown_spec.rb +++ /dev/null @@ -1,88 +0,0 @@ -require 'spec_helper' - -describe Gitlab::SidekiqMiddleware::Shutdown do - subject { described_class.new } - - let(:pid) { Process.pid } - let(:worker) { double(:worker, class: 'TestWorker') } - let(:job) { { 'jid' => 123 } } - let(:queue) { 'test_queue' } - let(:block) { proc { nil } } - - def run - subject.call(worker, job, queue) { block.call } - described_class.shutdown_thread&.join - end - - def pop_trace - subject.trace.pop(true) - end - - before do - allow(subject).to receive(:get_rss).and_return(10.kilobytes) - described_class.clear_shutdown_thread - end - - context 'when MAX_RSS is set to 0' do - before do - stub_const("#{described_class}::MAX_RSS", 0) - end - - it 'does nothing' do - expect(subject).not_to receive(:sleep) - - run - end - end - - def expect_shutdown_sequence - expect(pop_trace).to eq([:sleep, 15 * 60]) - expect(pop_trace).to eq([:kill, 'SIGTSTP', pid]) - - expect(pop_trace).to eq([:sleep, 30]) - expect(pop_trace).to eq([:kill, 'SIGTERM', pid]) - - expect(pop_trace).to eq([:sleep, 10]) - expect(pop_trace).to eq([:kill, 'SIGKILL', pid]) - end - - context 'when MAX_RSS is exceeded' do - before do - stub_const("#{described_class}::MAX_RSS", 5.kilobytes) - end - - it 'sends the TSTP, TERM and KILL signals at expected times' do - run - - expect_shutdown_sequence - end - end - - context 'when MAX_RSS is not exceeded' do - before do - stub_const("#{described_class}::MAX_RSS", 15.kilobytes) - end - - it 'does nothing' do - expect(subject).not_to receive(:sleep) - - run - end - end - - context 'when WantShutdown is raised' do - let(:block) { proc { raise described_class::WantShutdown } } - - it 'starts the shutdown sequence and re-raises the exception' do - expect { run }.to raise_exception(described_class::WantShutdown) - - # We can't expect 'run' 
to have joined on the shutdown thread, because - # it hit an exception. - shutdown_thread = described_class.shutdown_thread - expect(shutdown_thread).not_to be_nil - shutdown_thread.join - - expect_shutdown_sequence - end - end -end