register_job_service.rb 5.6 KB
Newer Older
1 2
# frozen_string_literal: true

Douwe Maan's avatar
Douwe Maan committed
3 4 5
module Ci
  # This class responsible for assigning
  # proper pending build to runner on runner API request
6
  class RegisterJobService
7 8
    attr_reader :runner

9 10 11
    JOB_QUEUE_DURATION_SECONDS_BUCKETS = [1, 3, 10, 30].freeze
    JOBS_RUNNING_FOR_PROJECT_MAX_BUCKET = 5.freeze

12 13
    Result = Struct.new(:build, :valid?)

14 15 16 17
    def initialize(runner)
      @runner = runner
    end

18
    def execute(params = {})
19
      builds =
20
        if runner.instance_type?
21
          builds_for_shared_runner
22
        elsif runner.group_type?
23 24 25 26
          builds_for_group_runner
        else
          builds_for_project_runner
        end
Douwe Maan's avatar
Douwe Maan committed
27

28 29
      valid = true

30 31
      # pick builds that does not have other tags than runner's one
      builds = builds.matches_tag_ids(runner.tags.ids)
32

33 34 35
      # pick builds that have at least one tag
      unless runner.run_untagged?
        builds = builds.with_any_tags
36 37
      end

38
      builds.find do |build|
39 40 41 42 43
        next unless runner.can_pick?(build)

        begin
          # In case when 2 runners try to assign the same build, second runner will be declined
          # with StateMachines::InvalidTransition or StaleObjectError when doing run! or save method.
44 45
          begin
            build.runner_id = runner.id
46 47
            build.runner_session_attributes = params[:session] if params[:session].present?

48 49
            build.run!
            register_success(build)
50

51
            return Result.new(build, true) # rubocop:disable Cop/AvoidReturnFromBlocks
52 53 54
          rescue Ci::Build::MissingDependenciesError
            build.drop!(:missing_dependency_failure)
          end
55 56 57 58 59 60 61 62 63 64 65 66
        rescue StateMachines::InvalidTransition, ActiveRecord::StaleObjectError
          # We are looping to find another build that is not conflicting
          # It also indicates that this build can be picked and passed to runner.
          # If we don't do it, basically a bunch of runners would be competing for a build
          # and thus we will generate a lot of 409. This will increase
          # the number of generated requests, also will reduce significantly
          # how many builds can be picked by runner in a unit of time.
          # In case we hit the concurrency-access lock,
          # we still have to return 409 in the end,
          # to make sure that this is properly handled by runner.
          valid = false
        end
Douwe Maan's avatar
Douwe Maan committed
67 68
      end

69
      register_failure
70
      Result.new(nil, valid)
Douwe Maan's avatar
Douwe Maan committed
71
    end
Kamil Trzcinski's avatar
Kamil Trzcinski committed
72 73 74

    private

75 76 77 78 79 80 81 82 83 84 85 86
    def builds_for_shared_runner
      new_builds.
        # don't run projects which have not enabled shared runners and builds
        joins(:project).where(projects: { shared_runners_enabled: true, pending_delete: false })
        .joins('LEFT JOIN project_features ON ci_builds.project_id = project_features.project_id')
        .where('project_features.builds_access_level IS NULL or project_features.builds_access_level > 0').

      # Implement fair scheduling
      # this returns builds that are ordered by number of running builds
      # we prefer projects that don't use shared runners at all
      joins("LEFT JOIN (#{running_builds_for_shared_runners.to_sql}) AS project_builds ON ci_builds.project_id=project_builds.project_id")
        .order('COALESCE(project_builds.running_builds, 0) ASC', 'ci_builds.id ASC')
87 88
    end

89
    def builds_for_project_runner
90
      new_builds.where(project: runner.projects.without_deleted.with_builds_enabled).order('id ASC')
91 92
    end

93
    def builds_for_group_runner
94
      # Workaround for weird Rails bug, that makes `runner.groups.to_sql` to return `runner_id = NULL`
95
      groups = ::Group.joins(:runner_namespaces).merge(runner.runner_namespaces)
96

97
      hierarchy_groups = Gitlab::GroupHierarchy.new(groups).base_and_descendants
98
      projects = Project.where(namespace_id: hierarchy_groups)
99 100 101
        .with_group_runners_enabled
        .with_builds_enabled
        .without_deleted
102
      new_builds.where(project: projects).order('id ASC')
103 104 105
    end

    def running_builds_for_shared_runners
106
      Ci::Build.running.where(runner: Ci::Runner.instance_type)
107
        .group(:project_id).select(:project_id, 'count(*) AS running_builds')
Kamil Trzcinski's avatar
Kamil Trzcinski committed
108
    end
109

110 111
    def new_builds
      builds = Ci::Build.pending.unstarted
112
      builds = builds.ref_protected if runner.ref_protected?
113
      builds
114
    end
115 116

    def register_failure
117 118
      failed_attempt_counter.increment
      attempt_counter.increment
119 120 121
    end

    def register_success(job)
122
      labels = { shared_runner: runner.instance_type?,
123 124
                 jobs_running_for_project: jobs_running_for_project(job) }

125
      job_queue_duration_seconds.observe(labels, Time.now - job.queued_at) unless job.queued_at.nil?
126
      attempt_counter.increment
127 128
    end

129
    def jobs_running_for_project(job)
130
      return '+Inf' unless runner.instance_type?
131 132

      # excluding currently started job
133
      running_jobs_count = job.project.builds.running.where(runner: Ci::Runner.instance_type)
134 135 136 137
                              .limit(JOBS_RUNNING_FOR_PROJECT_MAX_BUCKET + 1).count - 1
      running_jobs_count < JOBS_RUNNING_FOR_PROJECT_MAX_BUCKET ? running_jobs_count : "#{JOBS_RUNNING_FOR_PROJECT_MAX_BUCKET}+"
    end

138 139 140 141 142 143 144 145 146
    def failed_attempt_counter
      @failed_attempt_counter ||= Gitlab::Metrics.counter(:job_register_attempts_failed_total, "Counts the times a runner tries to register a job")
    end

    def attempt_counter
      @attempt_counter ||= Gitlab::Metrics.counter(:job_register_attempts_total, "Counts the times a runner tries to register a job")
    end

    def job_queue_duration_seconds
147
      @job_queue_duration_seconds ||= Gitlab::Metrics.histogram(:job_queue_duration_seconds, 'Request handling execution time', {}, JOB_QUEUE_DURATION_SECONDS_BUCKETS)
148
    end
Douwe Maan's avatar
Douwe Maan committed
149 150
  end
end