Commit 7bc16889 authored by Gabriel Mazetto's avatar Gabriel Mazetto
Browse files

Refactor Storage Migration

Specs were reviewed and improved to better cover the current behavior.
There was some standardization done as well to facilitate the
implementation of the rollback functionality.

StorageMigratorWorker was extracted to HashedStorage namespace were
RollbackerWorker will live one as well.
parent b88f27c8
......@@ -34,22 +34,22 @@ def execute
private
def move_folder!(old_disk_path, new_disk_path)
unless File.directory?(old_disk_path)
logger.info("Skipped attachments migration from '#{old_disk_path}' to '#{new_disk_path}', source path doesn't exist or is not a directory (PROJECT_ID=#{project.id})")
return
def move_folder!(old_path, new_path)
unless File.directory?(old_path)
logger.info("Skipped attachments migration from '#{old_path}' to '#{new_path}', source path doesn't exist or is not a directory (PROJECT_ID=#{project.id})")
return true
end
if File.exist?(new_disk_path)
logger.error("Cannot migrate attachments from '#{old_disk_path}' to '#{new_disk_path}', target path already exist (PROJECT_ID=#{project.id})")
raise AttachmentMigrationError, "Target path '#{new_disk_path}' already exist"
if File.exist?(new_path)
logger.error("Cannot migrate attachments from '#{old_path}' to '#{new_path}', target path already exist (PROJECT_ID=#{project.id})")
raise AttachmentMigrationError, "Target path '#{new_path}' already exist"
end
# Create hashed storage base path folder
FileUtils.mkdir_p(File.dirname(new_disk_path))
FileUtils.mkdir_p(File.dirname(new_path))
FileUtils.mv(old_disk_path, new_disk_path)
logger.info("Migrated project attachments from '#{old_disk_path}' to '#{new_disk_path}' (PROJECT_ID=#{project.id})")
FileUtils.mv(old_path, new_path)
logger.info("Migrated project attachments from '#{old_path}' to '#{new_path}' (PROJECT_ID=#{project.id})")
true
end
......
......@@ -14,16 +14,26 @@ def initialize(project, old_disk_path, logger: nil)
def execute
# Migrate repository from Legacy to Hashed Storage
unless project.hashed_storage?(:repository)
return unless HashedStorage::MigrateRepositoryService.new(project, old_disk_path, logger: logger).execute
return false unless migrate_repository
end
# Migrate attachments from Legacy to Hashed Storage
unless project.hashed_storage?(:attachments)
HashedStorage::MigrateAttachmentsService.new(project, old_disk_path, logger: logger).execute
return false unless migrate_attachments
end
true
end
private
def migrate_repository
HashedStorage::MigrateRepositoryService.new(project, old_disk_path, logger: logger).execute
end
def migrate_attachments
HashedStorage::MigrateAttachmentsService.new(project, old_disk_path, logger: logger).execute
end
end
end
end
......@@ -45,6 +45,8 @@
- github_importer:github_import_stage_import_pull_requests
- github_importer:github_import_stage_import_repository
- hashed_storage:hashed_storage_migrator
- mail_scheduler:mail_scheduler_issue_due
- mail_scheduler:mail_scheduler_notification_service
......@@ -131,7 +133,6 @@
- repository_fork
- repository_import
- repository_remove_remote
- storage_migrator
- system_hook_push
- update_merge_requests
- upload_checksum
......
# frozen_string_literal: true
module HashedStorage
class MigratorWorker
include ApplicationWorker
queue_namespace :hashed_storage
# @param [Integer] start initial ID of the batch
# @param [Integer] finish last ID of the batch
def perform(start, finish)
migrator = Gitlab::HashedStorage::Migrator.new
migrator.bulk_migrate(start: start, finish: finish)
end
end
end
......@@ -4,21 +4,25 @@ class ProjectMigrateHashedStorageWorker
include ApplicationWorker
LEASE_TIMEOUT = 30.seconds.to_i
LEASE_KEY_SEGMENT = 'project_migrate_hashed_storage_worker'.freeze
# rubocop: disable CodeReuse/ActiveRecord
def perform(project_id, old_disk_path = nil)
project = Project.find_by(id: project_id)
return if project.nil? || project.pending_delete?
uuid = lease_for(project_id).try_obtain
if uuid
::Projects::HashedStorage::MigrationService.new(project, old_disk_path || project.full_path, logger: logger).execute
project = Project.find_by(id: project_id)
return if project.nil? || project.pending_delete?
old_disk_path ||= project.disk_path
::Projects::HashedStorage::MigrationService.new(project, old_disk_path, logger: logger).execute
else
false
return false
end
rescue => ex
ensure
cancel_lease_for(project_id, uuid) if uuid
raise ex
end
# rubocop: enable CodeReuse/ActiveRecord
......@@ -29,7 +33,8 @@ def lease_for(project_id)
private
def lease_key(project_id)
"project_migrate_hashed_storage_worker:#{project_id}"
# we share the same lease key for both migration and rollback so they don't run simultaneously
"#{LEASE_KEY_SEGMENT}:#{project_id}"
end
def cancel_lease_for(project_id, uuid)
......
# frozen_string_literal: true
class StorageMigratorWorker
include ApplicationWorker
# @param [Integer] start initial ID of the batch
# @param [Integer] finish last ID of the batch
# @param [String] operation the operation to be performed: ['migrate', 'rollback']
def perform(start, finish, operation = :migrate)
# when scheduling a job symbols are converted to string, we need to convert back
operation = operation.to_sym if operation
migrator = Gitlab::HashedStorage::Migrator.new
migrator.bulk_migrate(start: start, finish: finish, operation: operation)
end
end
......@@ -68,7 +68,7 @@
- [background_migration, 1]
- [gcp_cluster, 1]
- [project_migrate_hashed_storage, 1]
- [storage_migrator, 1]
- [hashed_storage, 1]
- [pages_domain_verification, 1]
- [object_storage_upload, 1]
- [object_storage, 1]
......
......@@ -13,35 +13,28 @@ class Migrator
#
# @param [Integer] start first project id for the range
# @param [Integer] finish last project id for the range
# @param [Symbol] operation [:migrate, :rollback]
def bulk_schedule(start:, finish:, operation: :migrate)
StorageMigratorWorker.perform_async(start, finish, operation)
def bulk_schedule(start:, finish:)
::HashedStorage::MigratorWorker.perform_async(start, finish)
end
# Start migration of projects from specified range
#
# Flagging a project to be migrated is a synchronous action,
# Flagging a project to be migrated is a synchronous action
# but the migration runs through async jobs
#
# @param [Integer] start first project id for the range
# @param [Integer] finish last project id for the range
# @param [Symbol] operation [:migrate, :rollback]
# rubocop: disable CodeReuse/ActiveRecord
def bulk_migrate(start:, finish:, operation: :migrate)
def bulk_migrate(start:, finish:)
projects = build_relation(start, finish)
projects.with_route.find_each(batch_size: BATCH_SIZE) do |project|
case operation
when :migrate
migrate(project)
when :rollback
rollback(project)
end
migrate(project)
end
end
# rubocop: enable CodeReuse/ActiveRecord
# Flag a project to be migrated
# Flag a project to be migrated to Hashed Storage
#
# @param [Project] project that will be migrated
def migrate(project)
......
......@@ -37,7 +37,7 @@ namespace :gitlab do
print "Enqueuing migration of #{legacy_projects_count} projects in batches of #{helper.batch_size}"
helper.project_id_batches do |start, finish|
storage_migrator.bulk_schedule(start: start, finish: finish, operation: :migrate)
storage_migrator.bulk_schedule(start: start, finish: finish)
print '.'
end
......
......@@ -4,7 +4,7 @@
describe '#bulk_schedule' do
it 'schedules job to StorageMigratorWorker' do
Sidekiq::Testing.fake! do
expect { subject.bulk_schedule(start: 1, finish: 5) }.to change(StorageMigratorWorker.jobs, :size).by(1)
expect { subject.bulk_schedule(start: 1, finish: 5) }.to change(HashedStorage::MigratorWorker.jobs, :size).by(1)
end
end
end
......@@ -46,7 +46,7 @@
describe '#migrate' do
let(:project) { create(:project, :legacy_storage, :empty_repo) }
it 'enqueues job to ProjectMigrateHashedStorageWorker' do
it 'enqueues project migration job' do
Sidekiq::Testing.fake! do
expect { subject.migrate(project) }.to change(ProjectMigrateHashedStorageWorker.jobs, :size).by(1)
end
......@@ -58,7 +58,7 @@
expect { subject.migrate(project) }.not_to raise_error
end
it 'migrate project' do
it 'migrates project storage' do
perform_enqueued_jobs do
subject.migrate(project)
end
......@@ -73,5 +73,19 @@
expect(project.reload.repository_read_only?).to be_falsey
end
context 'when project is already on hashed storage' do
let(:project) { create(:project, :empty_repo) }
it 'doesnt enqueue any migration job' do
Sidekiq::Testing.fake! do
expect { subject.migrate(project) }.not_to change(ProjectMigrateHashedStorageWorker.jobs, :size)
end
end
it 'returns false' do
expect(subject.migrate(project)).to be_falsey
end
end
end
end
......@@ -3224,7 +3224,7 @@ def enable_lfs
end
context 'legacy storage' do
let(:project) { create(:project, :repository, :legacy_storage) }
set(:project) { create(:project, :repository, :legacy_storage) }
let(:gitlab_shell) { Gitlab::Shell.new }
let(:project_storage) { project.send(:storage) }
......@@ -3279,13 +3279,14 @@ def enable_lfs
end
describe '#migrate_to_hashed_storage!' do
let(:project) { create(:project, :empty_repo, :legacy_storage) }
it 'returns true' do
expect(project.migrate_to_hashed_storage!).to be_truthy
end
it 'does not validate project visibility' do
expect(project).not_to receive(:visibility_level_allowed_as_fork)
expect(project).not_to receive(:visibility_level_allowed_by_group)
it 'does not run validation' do
expect(project).not_to receive(:valid?)
project.migrate_to_hashed_storage!
end
......@@ -3315,7 +3316,7 @@ def enable_lfs
end
context 'hashed storage' do
let(:project) { create(:project, :repository, skip_disk_validation: true) }
set(:project) { create(:project, :repository, skip_disk_validation: true) }
let(:gitlab_shell) { Gitlab::Shell.new }
let(:hash) { Digest::SHA2.hexdigest(project.id.to_s) }
let(:hashed_prefix) { File.join('@hashed', hash[0..1], hash[2..3]) }
......@@ -3372,6 +3373,8 @@ def enable_lfs
end
describe '#migrate_to_hashed_storage!' do
let(:project) { create(:project, :repository, skip_disk_validation: true) }
it 'returns nil' do
expect(project.migrate_to_hashed_storage!).to be_nil
end
......@@ -3381,10 +3384,12 @@ def enable_lfs
end
context 'when partially migrated' do
it 'returns true' do
it 'enqueues a job' do
project = create(:project, storage_version: 1, skip_disk_validation: true)
expect(project.migrate_to_hashed_storage!).to be_truthy
Sidekiq::Testing.fake! do
expect { project.migrate_to_hashed_storage! }.to change(ProjectMigrateHashedStorageWorker.jobs, :size).by(1)
end
end
end
end
......
......@@ -3,7 +3,7 @@
describe Projects::HashedStorage::MigrateAttachmentsService do
subject(:service) { described_class.new(project, project.full_path, logger: nil) }
let(:project) { create(:project, :legacy_storage) }
let(:project) { create(:project, :repository, storage_version: 1, skip_disk_validation: true) }
let(:legacy_storage) { Storage::LegacyProject.new(project) }
let(:hashed_storage) { Storage::HashedProject.new(project) }
......@@ -28,6 +28,10 @@
expect(File.file?(old_disk_path)).to be_falsey
expect(File.file?(new_disk_path)).to be_truthy
end
it 'returns true' do
expect(service.execute).to be_truthy
end
end
context 'when original folder does not exist anymore' do
......@@ -43,6 +47,12 @@
expect(File.exist?(base_path(hashed_storage))).to be_falsey
expect(File.file?(new_disk_path)).to be_falsey
end
it 'returns true' do
expect(FileUtils).not_to receive(:mv).with(base_path(legacy_storage), base_path(hashed_storage))
expect(service.execute).to be_truthy
end
end
context 'when target folder already exists' do
......@@ -58,6 +68,18 @@
end
end
context '#old_disk_path' do
it 'returns old disk_path for project' do
expect(service.old_disk_path).to eq(project.full_path)
end
end
context '#new_disk_path' do
it 'returns new disk_path for project' do
expect(service.new_disk_path).to eq(project.disk_path)
end
end
def base_path(storage)
File.join(FileUploader.root, storage.disk_path)
end
......
......@@ -8,9 +8,12 @@
let(:legacy_storage) { Storage::LegacyProject.new(project) }
let(:hashed_storage) { Storage::HashedProject.new(project) }
subject(:service) { described_class.new(project, project.full_path) }
subject(:service) { described_class.new(project, project.disk_path) }
describe '#execute' do
let(:old_disk_path) { legacy_storage.disk_path }
let(:new_disk_path) { hashed_storage.disk_path }
before do
allow(service).to receive(:gitlab_shell) { gitlab_shell }
end
......@@ -33,8 +36,8 @@
it 'renames project and wiki repositories' do
service.execute
expect(gitlab_shell.exists?(project.repository_storage, "#{hashed_storage.disk_path}.git")).to be_truthy
expect(gitlab_shell.exists?(project.repository_storage, "#{hashed_storage.disk_path}.wiki.git")).to be_truthy
expect(gitlab_shell.exists?(project.repository_storage, "#{new_disk_path}.git")).to be_truthy
expect(gitlab_shell.exists?(project.repository_storage, "#{new_disk_path}.wiki.git")).to be_truthy
end
it 'updates project to be hashed and not read-only' do
......@@ -45,8 +48,8 @@
end
it 'move operation is called for both repositories' do
expect_move_repository(project.disk_path, hashed_storage.disk_path)
expect_move_repository("#{project.disk_path}.wiki", "#{hashed_storage.disk_path}.wiki")
expect_move_repository(old_disk_path, new_disk_path)
expect_move_repository("#{old_disk_path}.wiki", "#{new_disk_path}.wiki")
service.execute
end
......@@ -62,32 +65,27 @@
context 'when one move fails' do
it 'rollsback repositories to original name' do
from_name = project.disk_path
to_name = hashed_storage.disk_path
allow(service).to receive(:move_repository).and_call_original
allow(service).to receive(:move_repository).with(from_name, to_name).once { false } # will disable first move only
allow(service).to receive(:move_repository).with(old_disk_path, new_disk_path).once { false } # will disable first move only
expect(service).to receive(:rollback_folder_move).and_call_original
service.execute
expect(gitlab_shell.exists?(project.repository_storage, "#{hashed_storage.disk_path}.git")).to be_falsey
expect(gitlab_shell.exists?(project.repository_storage, "#{hashed_storage.disk_path}.wiki.git")).to be_falsey
expect(gitlab_shell.exists?(project.repository_storage, "#{new_disk_path}.git")).to be_falsey
expect(gitlab_shell.exists?(project.repository_storage, "#{new_disk_path}.wiki.git")).to be_falsey
expect(project.repository_read_only?).to be_falsey
end
context 'when rollback fails' do
let(:from_name) { legacy_storage.disk_path }
let(:to_name) { hashed_storage.disk_path }
before do
hashed_storage.ensure_storage_path_exists
gitlab_shell.mv_repository(project.repository_storage, from_name, to_name)
gitlab_shell.mv_repository(project.repository_storage, old_disk_path, new_disk_path)
end
it 'does not try to move nil repository over hashed' do
expect(gitlab_shell).not_to receive(:mv_repository).with(project.repository_storage, from_name, to_name)
expect_move_repository("#{project.disk_path}.wiki", "#{hashed_storage.disk_path}.wiki")
it 'does not try to move nil repository over existing' do
expect(gitlab_shell).not_to receive(:mv_repository).with(project.repository_storage, old_disk_path, new_disk_path)
expect_move_repository("#{old_disk_path}.wiki", "#{new_disk_path}.wiki")
service.execute
end
......
......@@ -58,7 +58,7 @@
context '0 legacy projects' do
it 'does nothing' do
expect(StorageMigratorWorker).not_to receive(:perform_async)
expect(::HashedStorage::MigratorWorker).not_to receive(:perform_async)
run_rake_task(task)
end
......@@ -72,9 +72,9 @@
stub_env('BATCH' => 1)
end
it 'enqueues one StorageMigratorWorker per project' do
it 'enqueues one HashedStorage::MigratorWorker per project' do
projects.each do |project|
expect(StorageMigratorWorker).to receive(:perform_async).with(project.id, project.id, :migrate)
expect(::HashedStorage::MigratorWorker).to receive(:perform_async).with(project.id, project.id)
end
run_rake_task(task)
......@@ -86,10 +86,10 @@
stub_env('BATCH' => 2)
end
it 'enqueues one StorageMigratorWorker per 2 projects' do
it 'enqueues one HashedStorage::MigratorWorker per 2 projects' do
projects.map(&:id).sort.each_slice(2) do |first, last|
last ||= first
expect(StorageMigratorWorker).to receive(:perform_async).with(first, last, :migrate)
expect(::HashedStorage::MigratorWorker).to receive(:perform_async).with(first, last)
end
run_rake_task(task)
......
require 'spec_helper'
describe StorageMigratorWorker do
describe HashedStorage::MigratorWorker do
subject(:worker) { described_class.new }
let(:projects) { create_list(:project, 2, :legacy_storage, :empty_repo) }
let(:ids) { projects.map(&:id) }
describe '#perform' do
it 'delegates to MigratorService' do
expect_any_instance_of(Gitlab::HashedStorage::Migrator).to receive(:bulk_migrate).with(start: 5, finish: 10, operation: :migrate)
expect_any_instance_of(Gitlab::HashedStorage::Migrator).to receive(:bulk_migrate).with(start: 5, finish: 10)
worker.perform(5, 10)
end
......
......@@ -4,12 +4,13 @@
include ExclusiveLeaseHelpers
describe '#perform' do
let(:project) { create(:project, :empty_repo) }
let(:project) { create(:project, :empty_repo, :legacy_storage) }
let(:lease_key) { "project_migrate_hashed_storage_worker:#{project.id}" }
let(:lease_timeout) { ProjectMigrateHashedStorageWorker::LEASE_TIMEOUT }
let(:lease_timeout) { described_class::LEASE_TIMEOUT }
let(:migration_service) { ::Projects::HashedStorage::MigrationService }
it 'skips when project no longer exists' do
expect(::Projects::HashedStorage::MigrationService).not_to receive(:new)
expect(migration_service).not_to receive(:new)
subject.perform(-1)
end
......@@ -17,29 +18,29 @@
it 'skips when project is pending delete' do
pending_delete_project = create(:project, :empty_repo, pending_delete: true)
expect(::Projects::HashedStorage::MigrationService).not_to receive(:new)
expect(migration_service).not_to receive(:new)
subject.perform(pending_delete_project.id)
end
it 'delegates removal to service class when have exclusive lease' do
it 'delegates migration to service class when we have exclusive lease' do
stub_exclusive_lease(lease_key, 'uuid', timeout: lease_timeout)
migration_service = spy
service_spy = spy
allow(::Projects::HashedStorage::MigrationService)
allow(migration_service)
.to receive(:new).with(project, project.full_path, logger: subject.logger)
.and_return(migration_service)
.and_return(service_spy)
subject.perform(project.id)
expect(migration_service).to have_received(:execute)
expect(service_spy).to have_received(:execute)
end
it 'skips when dont have lease when dont have exclusive lease' do
it 'skips when it cant acquire the exclusive lease' do
stub_exclusive_lease_taken(lease_key, timeout: lease_timeout)
expect(::Projects::HashedStorage::MigrationService).not_to receive(:new)
expect(migration_service).not_to receive(:new)
subject.perform(project.id)
end
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment