From 4cb0f379184f82a40917926a34ec35d8dd063d44 Mon Sep 17 00:00:00 2001 From: Jonathan Jogenfors Date: Thu, 7 Mar 2024 18:36:53 +0100 Subject: [PATCH] chore(server): Move library watcher to microservices (#7533) * move watcher init to micro * document watcher recovery * chore: fix lint * add try lock * use global library watch lock * fix: ensure lock stays on * fix: mocks * unit test for library watch lock * move statement to correct test * fix: correct return type of try lock * fix: tests * add library teardown * add chokidar error handler * make event strings an enum * wait for event refactor * refactor event type mocks * expect correct error * don't release lock in teardown * chore: lint * use enum * fix mock * fix lint * fix watcher await * remove await * simplify typing * remove async * Revert "remove async" This reverts commit 84ab5abac487532c79a7d770869b08fbba1294bf. * can now change watch settings at runtime * fix lint * only watch libraries if enabled --------- Co-authored-by: mertalev <101130780+mertalev@users.noreply.github.com> Co-authored-by: Alex Tran --- docs/docs/features/libraries.md | 10 ++++ .../jobs/specs/library-watcher.e2e-spec.ts | 25 ++++----- server/e2e/jobs/specs/library.e2e-spec.ts | 2 +- .../domain/library/library.service.spec.ts | 56 ++++++++++++------- server/src/domain/library/library.service.ts | 41 +++++++++++--- .../repositories/database.repository.ts | 2 + .../domain/repositories/storage.repository.ts | 8 +++ server/src/immich/app.service.ts | 7 --- .../infra/repositories/database.repository.ts | 10 ++++ .../infra/repositories/filesystem.provider.ts | 14 +++-- server/src/microservices/app.service.ts | 2 + server/src/test-utils/utils.ts | 42 ++++++++------ .../repositories/database.repository.mock.ts | 1 + .../repositories/storage.repository.mock.ts | 10 ++-- 14 files changed, 149 insertions(+), 81 deletions(-) diff --git a/docs/docs/features/libraries.md b/docs/docs/features/libraries.md index 58dd707ea0..0a68a79e0b 100644 --- a/docs/docs/features/libraries.md +++ b/docs/docs/features/libraries.md @@ -90,6 +90,16 @@ This feature - currently hidden in the config file - is considered experimental If your photos are on a network drive, automatic file watching likely won't work. In that case, you will have to rely on a periodic library refresh to pull in your changes. +#### Troubleshooting + +If you encounter an `ENOSPC` error, you need to increase your file watcher limit. In sysctl, this key is called `fs.inotify.max_user_watched` and has a default value of 8192. Increase this number to a suitable value greater than the number of files you will be watching. Note that Immich has to watch all files in your import paths including any ignored files. + +``` +ERROR [LibraryService] Library watcher for library c69faf55-f96d-4aa0-b83b-2d80cbc27d98 encountered error: Error: ENOSPC: System limit for number of file watchers reached, watch '/media/photo.jpg' +``` + +In rare cases, the library watcher can hang, preventing Immich from starting up. In this case, disable the library watcher in the configuration file. If the watcher is enabled from within Immich, the app must be started without the microservices. Disable the microservices in the docker compose file, start Immich, disable the library watcher in the admin settings, close Immich, re-enable the microservices, and then Immich can be started normally. + ### Nightly job There is an automatic job that's run once a day and refreshes all modified files in all libraries as well as cleans up any libraries stuck in deletion. diff --git a/server/e2e/jobs/specs/library-watcher.e2e-spec.ts b/server/e2e/jobs/specs/library-watcher.e2e-spec.ts index d22748e1c6..4757876898 100644 --- a/server/e2e/jobs/specs/library-watcher.e2e-spec.ts +++ b/server/e2e/jobs/specs/library-watcher.e2e-spec.ts @@ -1,4 +1,4 @@ -import { LibraryResponseDto, LibraryService, LoginResponseDto } from '@app/domain'; +import { LibraryResponseDto, LibraryService, LoginResponseDto, StorageEventType } from '@app/domain'; import { AssetType, LibraryType } from '@app/infra/entities'; import fs from 'node:fs/promises'; import path from 'node:path'; @@ -33,7 +33,7 @@ describe(`Library watcher (e2e)`, () => { }); afterEach(async () => { - await libraryService.unwatchAll(); + await libraryService.teardown(); }); afterAll(async () => { @@ -57,7 +57,7 @@ describe(`Library watcher (e2e)`, () => { `${IMMICH_TEST_ASSET_TEMP_PATH}/file.jpg`, ); - await waitForEvent(libraryService, 'add'); + await waitForEvent(libraryService, StorageEventType.ADD); const afterAssets = await api.assetApi.getAllAssets(server, admin.accessToken); expect(afterAssets.length).toEqual(1); @@ -84,10 +84,7 @@ describe(`Library watcher (e2e)`, () => { `${IMMICH_TEST_ASSET_TEMP_PATH}/file5.jPg`, ); - await waitForEvent(libraryService, 'add'); - await waitForEvent(libraryService, 'add'); - await waitForEvent(libraryService, 'add'); - await waitForEvent(libraryService, 'add'); + await waitForEvent(libraryService, StorageEventType.ADD, 4); const afterAssets = await api.assetApi.getAllAssets(server, admin.accessToken); expect(afterAssets.length).toEqual(4); @@ -99,7 +96,7 @@ describe(`Library watcher (e2e)`, () => { `${IMMICH_TEST_ASSET_TEMP_PATH}/file.jpg`, ); - await waitForEvent(libraryService, 'add'); + await waitForEvent(libraryService, StorageEventType.ADD); const originalAssets = await api.assetApi.getAllAssets(server, admin.accessToken); expect(originalAssets.length).toEqual(1); @@ -109,7 +106,7 @@ describe(`Library watcher (e2e)`, () => { `${IMMICH_TEST_ASSET_TEMP_PATH}/file.jpg`, ); - await waitForEvent(libraryService, 'change'); + await waitForEvent(libraryService, StorageEventType.CHANGE); const afterAssets = await api.assetApi.getAllAssets(server, admin.accessToken); expect(afterAssets).toEqual([ @@ -161,9 +158,7 @@ describe(`Library watcher (e2e)`, () => { `${IMMICH_TEST_ASSET_TEMP_PATH}/dir3/file4.jpg`, ); - await waitForEvent(libraryService, 'add'); - await waitForEvent(libraryService, 'add'); - await waitForEvent(libraryService, 'add'); + await waitForEvent(libraryService, StorageEventType.ADD, 3); const assets = await api.assetApi.getAllAssets(server, admin.accessToken); expect(assets.length).toEqual(3); @@ -175,14 +170,14 @@ describe(`Library watcher (e2e)`, () => { `${IMMICH_TEST_ASSET_TEMP_PATH}/dir1/file.jpg`, ); - await waitForEvent(libraryService, 'add'); + await waitForEvent(libraryService, StorageEventType.ADD); const addedAssets = await api.assetApi.getAllAssets(server, admin.accessToken); expect(addedAssets.length).toEqual(1); await fs.unlink(`${IMMICH_TEST_ASSET_TEMP_PATH}/dir1/file.jpg`); - await waitForEvent(libraryService, 'unlink'); + await waitForEvent(libraryService, StorageEventType.UNLINK); const afterAssets = await api.assetApi.getAllAssets(server, admin.accessToken); expect(afterAssets[0].isOffline).toEqual(true); @@ -220,7 +215,7 @@ describe(`Library watcher (e2e)`, () => { `${IMMICH_TEST_ASSET_TEMP_PATH}/dir4/file.jpg`, ); - await waitForEvent(libraryService, 'add'); + await waitForEvent(libraryService, StorageEventType.ADD); const afterAssets = await api.assetApi.getAllAssets(server, admin.accessToken); expect(afterAssets.length).toEqual(1); diff --git a/server/e2e/jobs/specs/library.e2e-spec.ts b/server/e2e/jobs/specs/library.e2e-spec.ts index 0657227f8d..6dca783c42 100644 --- a/server/e2e/jobs/specs/library.e2e-spec.ts +++ b/server/e2e/jobs/specs/library.e2e-spec.ts @@ -368,7 +368,7 @@ describe(`${LibraryController.name} (e2e)`, () => { expect(body).toEqual(errorStub.unauthorized); }); - it('should remvove offline files', async () => { + it('should remove offline files', async () => { await fs.promises.cp(`${IMMICH_TEST_ASSET_PATH}/albums/nature`, `${IMMICH_TEST_ASSET_TEMP_PATH}/albums/nature`, { recursive: true, }); diff --git a/server/src/domain/library/library.service.spec.ts b/server/src/domain/library/library.service.spec.ts index 720824b672..a44624c43a 100644 --- a/server/src/domain/library/library.service.spec.ts +++ b/server/src/domain/library/library.service.spec.ts @@ -9,11 +9,11 @@ import { newAccessRepositoryMock, newAssetRepositoryMock, newCryptoRepositoryMock, + newDatabaseRepositoryMock, newJobRepositoryMock, newLibraryRepositoryMock, newStorageRepositoryMock, newSystemConfigRepositoryMock, - newUserRepositoryMock, systemConfigStub, userStub, } from '@test'; @@ -23,11 +23,12 @@ import { ILibraryFileJob, ILibraryRefreshJob, JobName } from '../job'; import { IAssetRepository, ICryptoRepository, + IDatabaseRepository, IJobRepository, ILibraryRepository, IStorageRepository, ISystemConfigRepository, - IUserRepository, + StorageEventType, } from '../repositories'; import { SystemConfigCore } from '../system-config/system-config.core'; import { mapLibrary } from './library.dto'; @@ -40,20 +41,20 @@ describe(LibraryService.name, () => { let assetMock: jest.Mocked; let configMock: jest.Mocked; let cryptoMock: jest.Mocked; - let userMock: jest.Mocked; let jobMock: jest.Mocked; let libraryMock: jest.Mocked; let storageMock: jest.Mocked; + let databaseMock: jest.Mocked; beforeEach(() => { accessMock = newAccessRepositoryMock(); configMock = newSystemConfigRepositoryMock(); libraryMock = newLibraryRepositoryMock(); - userMock = newUserRepositoryMock(); assetMock = newAssetRepositoryMock(); jobMock = newJobRepositoryMock(); cryptoMock = newCryptoRepositoryMock(); storageMock = newStorageRepositoryMock(); + databaseMock = newDatabaseRepositoryMock(); // Always validate owner access for library. accessMock.library.checkOwnerAccess.mockImplementation((_, libraryIds) => Promise.resolve(libraryIds)); @@ -66,8 +67,10 @@ describe(LibraryService.name, () => { jobMock, libraryMock, storageMock, - userMock, + databaseMock, ); + + databaseMock.tryLock.mockResolvedValue(true); }); it('should work', () => { @@ -125,13 +128,22 @@ describe(LibraryService.name, () => { ); }); - it('should not initialize when watching is disabled', async () => { + it('should not initialize watcher when watching is disabled', async () => { configMock.load.mockResolvedValue(systemConfigStub.libraryWatchDisabled); await sut.init(); expect(storageMock.watch).not.toHaveBeenCalled(); }); + + it('should not initialize watcher when lock is taken', async () => { + configMock.load.mockResolvedValue(systemConfigStub.libraryWatchEnabled); + databaseMock.tryLock.mockResolvedValue(false); + + await sut.init(); + + expect(storageMock.watch).not.toHaveBeenCalled(); + }); }); describe('handleQueueAssetRefresh', () => { @@ -146,7 +158,6 @@ describe(LibraryService.name, () => { storageMock.crawl.mockResolvedValue(['/data/user1/photo.jpg']); assetMock.getPathsNotInLibrary.mockResolvedValue(['/data/user1/photo.jpg']); assetMock.getByLibraryId.mockResolvedValue([]); - userMock.get.mockResolvedValue(userStub.admin); await sut.handleQueueAssetRefresh(mockLibraryJob); @@ -173,7 +184,6 @@ describe(LibraryService.name, () => { libraryMock.get.mockResolvedValue(libraryStub.externalLibrary1); storageMock.crawl.mockResolvedValue(['/data/user1/photo.jpg']); assetMock.getByLibraryId.mockResolvedValue([]); - userMock.get.mockResolvedValue(userStub.admin); await sut.handleQueueAssetRefresh(mockLibraryJob); @@ -224,7 +234,6 @@ describe(LibraryService.name, () => { libraryMock.get.mockResolvedValue(libraryStub.externalLibraryWithImportPaths1); storageMock.crawl.mockResolvedValue([]); assetMock.getByLibraryId.mockResolvedValue([]); - userMock.get.mockResolvedValue(userStub.externalPathRoot); await sut.handleQueueAssetRefresh(mockLibraryJob); @@ -240,7 +249,6 @@ describe(LibraryService.name, () => { beforeEach(() => { mockUser = userStub.admin; - userMock.get.mockResolvedValue(mockUser); storageMock.stat.mockResolvedValue({ size: 100, @@ -1167,7 +1175,9 @@ describe(LibraryService.name, () => { it('should handle a new file event', async () => { libraryMock.get.mockResolvedValue(libraryStub.externalLibraryWithImportPaths1); libraryMock.getAll.mockResolvedValue([libraryStub.externalLibraryWithImportPaths1]); - storageMock.watch.mockImplementation(makeMockWatcher({ items: [{ event: 'add', value: '/foo/photo.jpg' }] })); + storageMock.watch.mockImplementation( + makeMockWatcher({ items: [{ event: StorageEventType.ADD, value: '/foo/photo.jpg' }] }), + ); await sut.watchAll(); @@ -1188,7 +1198,7 @@ describe(LibraryService.name, () => { libraryMock.get.mockResolvedValue(libraryStub.externalLibraryWithImportPaths1); libraryMock.getAll.mockResolvedValue([libraryStub.externalLibraryWithImportPaths1]); storageMock.watch.mockImplementation( - makeMockWatcher({ items: [{ event: 'change', value: '/foo/photo.jpg' }] }), + makeMockWatcher({ items: [{ event: StorageEventType.CHANGE, value: '/foo/photo.jpg' }] }), ); await sut.watchAll(); @@ -1211,7 +1221,7 @@ describe(LibraryService.name, () => { libraryMock.getAll.mockResolvedValue([libraryStub.externalLibraryWithImportPaths1]); assetMock.getByLibraryIdAndOriginalPath.mockResolvedValue(assetStub.external); storageMock.watch.mockImplementation( - makeMockWatcher({ items: [{ event: 'unlink', value: '/foo/photo.jpg' }] }), + makeMockWatcher({ items: [{ event: StorageEventType.UNLINK, value: '/foo/photo.jpg' }] }), ); await sut.watchAll(); @@ -1225,17 +1235,19 @@ describe(LibraryService.name, () => { libraryMock.getAll.mockResolvedValue([libraryStub.externalLibraryWithImportPaths1]); storageMock.watch.mockImplementation( makeMockWatcher({ - items: [{ event: 'error', value: 'Error!' }], + items: [{ event: StorageEventType.ERROR, value: 'Error!' }], }), ); - await sut.watchAll(); + await expect(sut.watchAll()).rejects.toThrow('Error!'); }); it('should ignore unknown extensions', async () => { libraryMock.get.mockResolvedValue(libraryStub.externalLibraryWithImportPaths1); libraryMock.getAll.mockResolvedValue([libraryStub.externalLibraryWithImportPaths1]); - storageMock.watch.mockImplementation(makeMockWatcher({ items: [{ event: 'add', value: '/foo/photo.jpg' }] })); + storageMock.watch.mockImplementation( + makeMockWatcher({ items: [{ event: StorageEventType.ADD, value: '/foo/photo.jpg' }] }), + ); await sut.watchAll(); @@ -1245,7 +1257,9 @@ describe(LibraryService.name, () => { it('should ignore excluded paths', async () => { libraryMock.get.mockResolvedValue(libraryStub.patternPath); libraryMock.getAll.mockResolvedValue([libraryStub.patternPath]); - storageMock.watch.mockImplementation(makeMockWatcher({ items: [{ event: 'add', value: '/dir1/photo.txt' }] })); + storageMock.watch.mockImplementation( + makeMockWatcher({ items: [{ event: StorageEventType.ADD, value: '/dir1/photo.txt' }] }), + ); await sut.watchAll(); @@ -1255,7 +1269,9 @@ describe(LibraryService.name, () => { it('should ignore excluded paths without case sensitivity', async () => { libraryMock.get.mockResolvedValue(libraryStub.patternPath); libraryMock.getAll.mockResolvedValue([libraryStub.patternPath]); - storageMock.watch.mockImplementation(makeMockWatcher({ items: [{ event: 'add', value: '/DIR1/photo.txt' }] })); + storageMock.watch.mockImplementation( + makeMockWatcher({ items: [{ event: StorageEventType.ADD, value: '/DIR1/photo.txt' }] }), + ); await sut.watchAll(); @@ -1264,7 +1280,7 @@ describe(LibraryService.name, () => { }); }); - describe('tearDown', () => { + describe('teardown', () => { it('should tear down all watchers', async () => { libraryMock.getAll.mockResolvedValue([ libraryStub.externalLibraryWithImportPaths1, @@ -1286,7 +1302,7 @@ describe(LibraryService.name, () => { storageMock.watch.mockImplementation(makeMockWatcher({ close: mockClose })); await sut.init(); - await sut.unwatchAll(); + await sut.teardown(); expect(mockClose).toHaveBeenCalledTimes(2); }); diff --git a/server/src/domain/library/library.service.ts b/server/src/domain/library/library.service.ts index 2c509cdaaa..c74e97ea36 100644 --- a/server/src/domain/library/library.service.ts +++ b/server/src/domain/library/library.service.ts @@ -13,14 +13,16 @@ import { handlePromiseError, usePagination, validateCronExpression } from '../do import { IBaseJob, IEntityJob, ILibraryFileJob, ILibraryRefreshJob, JOBS_ASSET_PAGINATION_SIZE, JobName } from '../job'; import { + DatabaseLock, IAccessRepository, IAssetRepository, ICryptoRepository, + IDatabaseRepository, IJobRepository, ILibraryRepository, IStorageRepository, ISystemConfigRepository, - IUserRepository, + StorageEventType, WithProperty, } from '../repositories'; import { SystemConfigCore } from '../system-config'; @@ -43,6 +45,7 @@ export class LibraryService extends EventEmitter { private access: AccessCore; private configCore: SystemConfigCore; private watchLibraries = false; + private watchLock = false; private watchers: Record Promise> = {}; constructor( @@ -53,7 +56,7 @@ export class LibraryService extends EventEmitter { @Inject(IJobRepository) private jobRepository: IJobRepository, @Inject(ILibraryRepository) private repository: ILibraryRepository, @Inject(IStorageRepository) private storageRepository: IStorageRepository, - @Inject(IUserRepository) private userRepository: IUserRepository, + @Inject(IDatabaseRepository) private databaseRepository: IDatabaseRepository, ) { super(); this.access = AccessCore.create(accessRepository); @@ -68,8 +71,15 @@ export class LibraryService extends EventEmitter { async init() { const config = await this.configCore.getConfig(); + const { watch, scan } = config.library; - this.watchLibraries = watch.enabled; + + // This ensures that library watching only occurs in one microservice + // TODO: we could make the lock be per-library instead of global + this.watchLock = await this.databaseRepository.tryLock(DatabaseLock.LibraryWatch); + + this.watchLibraries = this.watchLock && watch.enabled; + this.jobRepository.addCronJob( 'libraryScan', scan.cronExpression, @@ -89,6 +99,7 @@ export class LibraryService extends EventEmitter { this.jobRepository.updateCronJob('libraryScan', library.scan.cronExpression, library.scan.enabled); if (library.watch.enabled !== this.watchLibraries) { + // Watch configuration changed, update accordingly this.watchLibraries = library.watch.enabled; handlePromiseError(this.watchLibraries ? this.watchAll() : this.unwatchAll(), this.logger); } @@ -134,7 +145,7 @@ export class LibraryService extends EventEmitter { if (matcher(path)) { await this.scanAssets(library.id, [path], library.ownerId, false); } - this.emit('add', path); + this.emit(StorageEventType.ADD, path); }; return handlePromiseError(handler(), this.logger); }, @@ -145,7 +156,7 @@ export class LibraryService extends EventEmitter { // Note: if the changed file was not previously imported, it will be imported now. await this.scanAssets(library.id, [path], library.ownerId, false); } - this.emit('change', path); + this.emit(StorageEventType.CHANGE, path); }; return handlePromiseError(handler(), this.logger); }, @@ -156,13 +167,13 @@ export class LibraryService extends EventEmitter { if (asset && matcher(path)) { await this.assetRepository.save({ id: asset.id, isOffline: true }); } - this.emit('unlink', path); + this.emit(StorageEventType.UNLINK, path); }; return handlePromiseError(handler(), this.logger); }, onError: (error) => { - // TODO: should we log, or throw an exception? this.logger.error(`Library watcher for library ${library.id} encountered error: ${error}`); + this.emit(StorageEventType.ERROR, error); }, }, ); @@ -180,13 +191,25 @@ export class LibraryService extends EventEmitter { } } - async unwatchAll() { + async teardown() { + await this.unwatchAll(); + } + + private async unwatchAll() { + if (!this.watchLock) { + return false; + } + for (const id in this.watchers) { await this.unwatch(id); } } async watchAll() { + if (!this.watchLock) { + return false; + } + const libraries = await this.repository.getAll(false, LibraryType.EXTERNAL); for (const library of libraries) { @@ -267,7 +290,7 @@ export class LibraryService extends EventEmitter { this.logger.log(`Creating ${dto.type} library for user ${auth.user.name}`); - if (dto.type === LibraryType.EXTERNAL && this.watchLibraries) { + if (dto.type === LibraryType.EXTERNAL) { await this.watch(library.id); } diff --git a/server/src/domain/repositories/database.repository.ts b/server/src/domain/repositories/database.repository.ts index d32939fe61..55911e7ce5 100644 --- a/server/src/domain/repositories/database.repository.ts +++ b/server/src/domain/repositories/database.repository.ts @@ -19,6 +19,7 @@ export enum DatabaseLock { Migrations = 200, StorageTemplateMigration = 420, CLIPDimSize = 512, + LibraryWatch = 1337, } export const extName: Record = { @@ -46,6 +47,7 @@ export interface IDatabaseRepository { shouldReindex(name: VectorIndex): Promise; runMigrations(options?: { transaction?: 'all' | 'none' | 'each' }): Promise; withLock(lock: DatabaseLock, callback: () => Promise): Promise; + tryLock(lock: DatabaseLock): Promise; isBusy(lock: DatabaseLock): boolean; wait(lock: DatabaseLock): Promise; } diff --git a/server/src/domain/repositories/storage.repository.ts b/server/src/domain/repositories/storage.repository.ts index d263713afa..f4f8cab7b9 100644 --- a/server/src/domain/repositories/storage.repository.ts +++ b/server/src/domain/repositories/storage.repository.ts @@ -31,6 +31,14 @@ export interface WatchEvents { onError(error: Error): void; } +export enum StorageEventType { + READY = 'ready', + ADD = 'add', + CHANGE = 'change', + UNLINK = 'unlink', + ERROR = 'error', +} + export interface IStorageRepository { createZipStream(): ImmichZipStream; createReadStream(filepath: string, mimeType?: string | null): Promise; diff --git a/server/src/immich/app.service.ts b/server/src/immich/app.service.ts index be82ae4dc8..f3369b1210 100644 --- a/server/src/immich/app.service.ts +++ b/server/src/immich/app.service.ts @@ -2,7 +2,6 @@ import { AuthService, DatabaseService, JobService, - LibraryService, ONE_HOUR, OpenGraphTags, ServerInfoService, @@ -45,7 +44,6 @@ export class AppService { private authService: AuthService, private configService: SystemConfigService, private jobService: JobService, - private libraryService: LibraryService, private serverService: ServerInfoService, private sharedLinkService: SharedLinkService, private storageService: StorageService, @@ -66,15 +64,10 @@ export class AppService { await this.databaseService.init(); await this.configService.init(); this.storageService.init(); - await this.libraryService.init(); await this.serverService.init(); this.logger.log(`Feature Flags: ${JSON.stringify(await this.serverService.getFeatures(), null, 2)}`); } - async teardown() { - await this.libraryService.unwatchAll(); - } - ssr(excludePaths: string[]) { let index = ''; try { diff --git a/server/src/infra/repositories/database.repository.ts b/server/src/infra/repositories/database.repository.ts index b0e4623af5..b24602b899 100644 --- a/server/src/infra/repositories/database.repository.ts +++ b/server/src/infra/repositories/database.repository.ts @@ -210,6 +210,11 @@ export class DatabaseRepository implements IDatabaseRepository { return res as R; } + async tryLock(lock: DatabaseLock): Promise { + const queryRunner = this.dataSource.createQueryRunner(); + return await this.acquireTryLock(lock, queryRunner); + } + isBusy(lock: DatabaseLock): boolean { return this.asyncLock.isBusy(DatabaseLock[lock]); } @@ -222,6 +227,11 @@ export class DatabaseRepository implements IDatabaseRepository { return queryRunner.query('SELECT pg_advisory_lock($1)', [lock]); } + private async acquireTryLock(lock: DatabaseLock, queryRunner: QueryRunner): Promise { + const lockResult = await queryRunner.query('SELECT pg_try_advisory_lock($1)', [lock]); + return lockResult[0].pg_try_advisory_lock; + } + private async releaseLock(lock: DatabaseLock, queryRunner: QueryRunner): Promise { return queryRunner.query('SELECT pg_advisory_unlock($1)', [lock]); } diff --git a/server/src/infra/repositories/filesystem.provider.ts b/server/src/infra/repositories/filesystem.provider.ts index 3ffcd8111f..fef184992d 100644 --- a/server/src/infra/repositories/filesystem.provider.ts +++ b/server/src/infra/repositories/filesystem.provider.ts @@ -1,11 +1,12 @@ import { CrawlOptionsDto, DiskUsage, + IStorageRepository, ImmichReadStream, ImmichZipStream, - IStorageRepository, - mimeTypes, + StorageEventType, WatchEvents, + mimeTypes, } from '@app/domain'; import { ImmichLogger } from '@app/infra/logger'; import archiver from 'archiver'; @@ -141,10 +142,11 @@ export class FilesystemProvider implements IStorageRepository { watch(paths: string[], options: WatchOptions, events: Partial) { const watcher = chokidar.watch(paths, options); - watcher.on('ready', () => events.onReady?.()); - watcher.on('add', (path) => events.onAdd?.(path)); - watcher.on('change', (path) => events.onChange?.(path)); - watcher.on('unlink', (path) => events.onUnlink?.(path)); + watcher.on(StorageEventType.READY, () => events.onReady?.()); + watcher.on(StorageEventType.ADD, (path) => events.onAdd?.(path)); + watcher.on(StorageEventType.CHANGE, (path) => events.onChange?.(path)); + watcher.on(StorageEventType.UNLINK, (path) => events.onUnlink?.(path)); + watcher.on(StorageEventType.ERROR, (error) => events.onError?.(error)); return () => watcher.close(); } diff --git a/server/src/microservices/app.service.ts b/server/src/microservices/app.service.ts index df1d9938b5..623538e594 100644 --- a/server/src/microservices/app.service.ts +++ b/server/src/microservices/app.service.ts @@ -40,6 +40,7 @@ export class AppService { async init() { await this.databaseService.init(); await this.configService.init(); + await this.libraryService.init(); await this.jobService.init({ [JobName.ASSET_DELETION]: (data) => this.assetService.handleAssetDeletion(data), [JobName.ASSET_DELETION_CHECK]: () => this.assetService.handleAssetDeletionCheck(), @@ -86,6 +87,7 @@ export class AppService { } async teardown() { + await this.libraryService.teardown(); await this.metadataService.teardown(); } } diff --git a/server/src/test-utils/utils.ts b/server/src/test-utils/utils.ts index 7b4faf99b9..cf98222954 100644 --- a/server/src/test-utils/utils.ts +++ b/server/src/test-utils/utils.ts @@ -1,4 +1,4 @@ -import { IJobRepository, IMediaRepository, JobItem, JobItemHandler, QueueName } from '@app/domain'; +import { IJobRepository, IMediaRepository, JobItem, JobItemHandler, QueueName, StorageEventType } from '@app/domain'; import { AppModule } from '@app/immich'; import { InfraModule, InfraTestModule, dataSource } from '@app/infra'; import { MediaRepository } from '@app/infra/repositories'; @@ -48,6 +48,9 @@ export const db = { if (deleteUsers) { await em.query(`DELETE FROM "users" CASCADE;`); } + + // Release all locks + await em.query('SELECT pg_advisory_unlock_all()'); }); }, disconnect: async () => { @@ -124,34 +127,37 @@ export const testApp = { }, reset: async (options?: ResetOptions) => { await db.reset(options); - await app.get(AppService).init(); - - await app.get(MicroAppService).init(); }, get: (member: any) => app.get(member), teardown: async () => { if (app) { await app.get(MicroAppService).teardown(); - await app.get(AppService).teardown(); await app.close(); } await db.disconnect(); }, }; -export function waitForEvent(emitter: EventEmitter, event: string): Promise { - return new Promise((resolve, reject) => { - const success = (value: T) => { - emitter.off('error', fail); - resolve(value); - }; - const fail = (error: Error) => { - emitter.off(event, success); - reject(error); - }; - emitter.once(event, success); - emitter.once('error', fail); - }); +export function waitForEvent(emitter: EventEmitter, event: string, times = 1): Promise { + const promises: Promise[] = []; + + for (let i = 1; i <= times; i++) { + promises.push( + new Promise((resolve, reject) => { + const success = (value: any) => { + emitter.off(StorageEventType.ERROR, fail); + resolve(value); + }; + const fail = (error: Error) => { + emitter.off(event, success); + reject(error); + }; + emitter.once(event, success); + emitter.once(StorageEventType.ERROR, fail); + }), + ); + } + return Promise.all(promises); } const directoryExists = async (dirPath: string) => diff --git a/server/test/repositories/database.repository.mock.ts b/server/test/repositories/database.repository.mock.ts index f5a4d39a67..19e2df17a3 100644 --- a/server/test/repositories/database.repository.mock.ts +++ b/server/test/repositories/database.repository.mock.ts @@ -13,6 +13,7 @@ export const newDatabaseRepositoryMock = (): jest.Mocked => shouldReindex: jest.fn(), runMigrations: jest.fn(), withLock: jest.fn().mockImplementation((_, function_: () => Promise) => function_()), + tryLock: jest.fn(), isBusy: jest.fn(), wait: jest.fn(), }; diff --git a/server/test/repositories/storage.repository.mock.ts b/server/test/repositories/storage.repository.mock.ts index 1ef51fabce..e0b244fc2d 100644 --- a/server/test/repositories/storage.repository.mock.ts +++ b/server/test/repositories/storage.repository.mock.ts @@ -1,4 +1,4 @@ -import { IStorageRepository, StorageCore, WatchEvents } from '@app/domain'; +import { IStorageRepository, StorageCore, StorageEventType, WatchEvents } from '@app/domain'; import { WatchOptions } from 'chokidar'; interface MockWatcherOptions { @@ -12,19 +12,19 @@ export const makeMockWatcher = events.onReady?.(); for (const item of items || []) { switch (item.event) { - case 'add': { + case StorageEventType.ADD: { events.onAdd?.(item.value); break; } - case 'change': { + case StorageEventType.CHANGE: { events.onChange?.(item.value); break; } - case 'unlink': { + case StorageEventType.UNLINK: { events.onUnlink?.(item.value); break; } - case 'error': { + case StorageEventType.ERROR: { events.onError?.(new Error(item.value)); } }