mirror of
https://github.com/immich-app/immich.git
synced 2024-11-15 18:08:48 -07:00
fix(server): auto-reconnect to database (#12320)
This commit is contained in:
parent
1783dfd393
commit
12b65e3c24
@ -18,10 +18,11 @@ import { ILoggerRepository } from 'src/interfaces/logger.interface';
|
||||
import { AuthGuard } from 'src/middleware/auth.guard';
|
||||
import { ErrorInterceptor } from 'src/middleware/error.interceptor';
|
||||
import { FileUploadInterceptor } from 'src/middleware/file-upload.interceptor';
|
||||
import { HttpExceptionFilter } from 'src/middleware/http-exception.filter';
|
||||
import { GlobalExceptionFilter } from 'src/middleware/global-exception.filter';
|
||||
import { LoggingInterceptor } from 'src/middleware/logging.interceptor';
|
||||
import { repositories } from 'src/repositories';
|
||||
import { services } from 'src/services';
|
||||
import { DatabaseService } from 'src/services/database.service';
|
||||
import { setupEventHandlers } from 'src/utils/events';
|
||||
import { otelConfig } from 'src/utils/instrumentation';
|
||||
|
||||
@ -29,7 +30,7 @@ const common = [...services, ...repositories];
|
||||
|
||||
const middleware = [
|
||||
FileUploadInterceptor,
|
||||
{ provide: APP_FILTER, useClass: HttpExceptionFilter },
|
||||
{ provide: APP_FILTER, useClass: GlobalExceptionFilter },
|
||||
{ provide: APP_PIPE, useValue: new ValidationPipe({ transform: true, whitelist: true }) },
|
||||
{ provide: APP_INTERCEPTOR, useClass: LoggingInterceptor },
|
||||
{ provide: APP_INTERCEPTOR, useClass: ErrorInterceptor },
|
||||
@ -43,7 +44,17 @@ const imports = [
|
||||
ConfigModule.forRoot(immichAppConfig),
|
||||
EventEmitterModule.forRoot(),
|
||||
OpenTelemetryModule.forRoot(otelConfig),
|
||||
TypeOrmModule.forRoot(databaseConfig),
|
||||
TypeOrmModule.forRootAsync({
|
||||
inject: [ModuleRef],
|
||||
useFactory: (moduleRef: ModuleRef) => {
|
||||
return {
|
||||
...databaseConfig,
|
||||
poolErrorHandler: (error) => {
|
||||
moduleRef.get(DatabaseService, { strict: false }).handleConnectionError(error);
|
||||
},
|
||||
};
|
||||
},
|
||||
}),
|
||||
TypeOrmModule.forFeature(entities),
|
||||
];
|
||||
|
||||
|
@ -40,6 +40,7 @@ export interface VectorUpdateResult {
|
||||
export const IDatabaseRepository = 'IDatabaseRepository';
|
||||
|
||||
export interface IDatabaseRepository {
|
||||
reconnect(): Promise<boolean>;
|
||||
getExtensionVersion(extension: DatabaseExtension): Promise<ExtensionVersion>;
|
||||
getExtensionVersionRange(extension: VectorExtension): string;
|
||||
getPostgresVersion(): Promise<string>;
|
||||
|
@ -9,6 +9,7 @@ import {
|
||||
} from '@nestjs/common';
|
||||
import { Observable, catchError, throwError } from 'rxjs';
|
||||
import { ILoggerRepository } from 'src/interfaces/logger.interface';
|
||||
import { logGlobalError } from 'src/utils/logger';
|
||||
import { routeToErrorMessage } from 'src/utils/misc';
|
||||
|
||||
@Injectable()
|
||||
@ -25,9 +26,10 @@ export class ErrorInterceptor implements NestInterceptor {
|
||||
return error;
|
||||
}
|
||||
|
||||
const errorMessage = routeToErrorMessage(context.getHandler().name);
|
||||
this.logger.error(errorMessage, error, error?.errors, error?.stack);
|
||||
return new InternalServerErrorException(errorMessage);
|
||||
logGlobalError(this.logger, error);
|
||||
|
||||
const message = routeToErrorMessage(context.getHandler().name);
|
||||
return new InternalServerErrorException(message);
|
||||
}),
|
||||
),
|
||||
);
|
||||
|
47
server/src/middleware/global-exception.filter.ts
Normal file
47
server/src/middleware/global-exception.filter.ts
Normal file
@ -0,0 +1,47 @@
|
||||
import { ArgumentsHost, Catch, ExceptionFilter, HttpException, Inject } from '@nestjs/common';
|
||||
import { Response } from 'express';
|
||||
import { ClsService } from 'nestjs-cls';
|
||||
import { ILoggerRepository } from 'src/interfaces/logger.interface';
|
||||
import { logGlobalError } from 'src/utils/logger';
|
||||
|
||||
@Catch()
|
||||
export class GlobalExceptionFilter implements ExceptionFilter<Error> {
|
||||
constructor(
|
||||
@Inject(ILoggerRepository) private logger: ILoggerRepository,
|
||||
private cls: ClsService,
|
||||
) {
|
||||
this.logger.setContext(GlobalExceptionFilter.name);
|
||||
}
|
||||
|
||||
catch(error: Error, host: ArgumentsHost) {
|
||||
const ctx = host.switchToHttp();
|
||||
const response = ctx.getResponse<Response>();
|
||||
const { status, body } = this.fromError(error);
|
||||
if (!response.headersSent) {
|
||||
response.status(status).json({ ...body, statusCode: status, correlationId: this.cls.getId() });
|
||||
}
|
||||
}
|
||||
|
||||
private fromError(error: Error) {
|
||||
logGlobalError(this.logger, error);
|
||||
|
||||
if (error instanceof HttpException) {
|
||||
const status = error.getStatus();
|
||||
let body = error.getResponse();
|
||||
|
||||
// unclear what circumstances would return a string
|
||||
if (typeof body === 'string') {
|
||||
body = { message: body };
|
||||
}
|
||||
|
||||
return { status, body };
|
||||
}
|
||||
|
||||
return {
|
||||
status: 500,
|
||||
body: {
|
||||
message: 'Internal server error',
|
||||
},
|
||||
};
|
||||
}
|
||||
}
|
@ -1,39 +0,0 @@
|
||||
import { ArgumentsHost, Catch, ExceptionFilter, HttpException, Inject } from '@nestjs/common';
|
||||
import { Response } from 'express';
|
||||
import { ClsService } from 'nestjs-cls';
|
||||
import { ILoggerRepository } from 'src/interfaces/logger.interface';
|
||||
|
||||
@Catch(HttpException)
|
||||
export class HttpExceptionFilter implements ExceptionFilter {
|
||||
constructor(
|
||||
@Inject(ILoggerRepository) private logger: ILoggerRepository,
|
||||
private cls: ClsService,
|
||||
) {
|
||||
this.logger.setContext(HttpExceptionFilter.name);
|
||||
}
|
||||
|
||||
catch(exception: HttpException, host: ArgumentsHost) {
|
||||
const ctx = host.switchToHttp();
|
||||
const response = ctx.getResponse<Response>();
|
||||
const status = exception.getStatus();
|
||||
|
||||
this.logger.debug(`HttpException(${status}) ${JSON.stringify(exception.getResponse())}`);
|
||||
|
||||
let responseBody = exception.getResponse();
|
||||
// unclear what circumstances would return a string
|
||||
if (typeof responseBody === 'string') {
|
||||
responseBody = {
|
||||
error: 'Unknown',
|
||||
message: responseBody,
|
||||
statusCode: status,
|
||||
};
|
||||
}
|
||||
|
||||
if (!response.headersSent) {
|
||||
response.status(status).json({
|
||||
...responseBody,
|
||||
correlationId: this.cls.getId(),
|
||||
});
|
||||
}
|
||||
}
|
||||
}
|
@ -31,6 +31,19 @@ export class DatabaseRepository implements IDatabaseRepository {
|
||||
this.logger.setContext(DatabaseRepository.name);
|
||||
}
|
||||
|
||||
async reconnect() {
|
||||
try {
|
||||
if (this.dataSource.isInitialized) {
|
||||
await this.dataSource.destroy();
|
||||
}
|
||||
const { isInitialized } = await this.dataSource.initialize();
|
||||
return isInitialized;
|
||||
} catch (error) {
|
||||
this.logger.error(`Database connection failed: ${error}`);
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
async getExtensionVersion(extension: DatabaseExtension): Promise<ExtensionVersion> {
|
||||
const [res]: ExtensionVersion[] = await this.dataSource.query(
|
||||
`SELECT default_version as "availableVersion", installed_version as "installedVersion"
|
||||
|
@ -3,7 +3,7 @@ import { isLogLevelEnabled } from '@nestjs/common/services/utils/is-log-level-en
|
||||
import { ClsService } from 'nestjs-cls';
|
||||
import { LogLevel } from 'src/config';
|
||||
import { ILoggerRepository } from 'src/interfaces/logger.interface';
|
||||
import { LogColor } from 'src/utils/logger-colors';
|
||||
import { LogColor } from 'src/utils/logger';
|
||||
|
||||
const LOG_LEVELS = [LogLevel.VERBOSE, LogLevel.DEBUG, LogLevel.LOG, LogLevel.WARN, LogLevel.ERROR, LogLevel.FATAL];
|
||||
|
||||
|
@ -1,4 +1,5 @@
|
||||
import { Inject, Injectable } from '@nestjs/common';
|
||||
import { Duration } from 'luxon';
|
||||
import semver from 'semver';
|
||||
import { getVectorExtension } from 'src/database.config';
|
||||
import { OnEmit } from 'src/decorators';
|
||||
@ -59,8 +60,12 @@ const messages = {
|
||||
If ${name} ${installedVersion} is compatible with Immich, please ensure the Postgres instance has this available.`,
|
||||
};
|
||||
|
||||
const RETRY_DURATION = Duration.fromObject({ seconds: 5 });
|
||||
|
||||
@Injectable()
|
||||
export class DatabaseService {
|
||||
private reconnection?: NodeJS.Timeout;
|
||||
|
||||
constructor(
|
||||
@Inject(IDatabaseRepository) private databaseRepository: IDatabaseRepository,
|
||||
@Inject(ILoggerRepository) private logger: ILoggerRepository,
|
||||
@ -117,6 +122,26 @@ export class DatabaseService {
|
||||
});
|
||||
}
|
||||
|
||||
handleConnectionError(error: Error) {
|
||||
if (this.reconnection) {
|
||||
return;
|
||||
}
|
||||
|
||||
this.logger.error(`Database disconnected: ${error}`);
|
||||
this.reconnection = setInterval(() => void this.reconnect(), RETRY_DURATION.toMillis());
|
||||
}
|
||||
|
||||
private async reconnect() {
|
||||
const isConnected = await this.databaseRepository.reconnect();
|
||||
if (isConnected) {
|
||||
this.logger.log('Database reconnected');
|
||||
clearInterval(this.reconnection);
|
||||
delete this.reconnection;
|
||||
} else {
|
||||
this.logger.warn(`Database connection failed, retrying in ${RETRY_DURATION.toHuman()}`);
|
||||
}
|
||||
}
|
||||
|
||||
private async createExtension(extension: DatabaseExtension) {
|
||||
try {
|
||||
await this.databaseRepository.createExtension(extension);
|
||||
|
@ -1,3 +1,7 @@
|
||||
import { HttpException } from '@nestjs/common';
|
||||
import { ILoggerRepository } from 'src/interfaces/logger.interface';
|
||||
import { TypeORMError } from 'typeorm';
|
||||
|
||||
type ColorTextFn = (text: string) => string;
|
||||
|
||||
const isColorAllowed = () => !process.env.NO_COLOR;
|
||||
@ -15,3 +19,22 @@ export const LogColor = {
|
||||
export const LogStyle = {
|
||||
bold: colorIfAllowed((text: string) => `\u001B[1m${text}\u001B[0m`),
|
||||
};
|
||||
|
||||
export const logGlobalError = (logger: ILoggerRepository, error: Error) => {
|
||||
if (error instanceof HttpException) {
|
||||
const status = error.getStatus();
|
||||
const response = error.getResponse();
|
||||
logger.debug(`HttpException(${status}): ${JSON.stringify(response)}`);
|
||||
return;
|
||||
}
|
||||
|
||||
if (error instanceof TypeORMError) {
|
||||
logger.error(`Database error: ${error}`);
|
||||
return;
|
||||
}
|
||||
|
||||
if (error instanceof Error) {
|
||||
logger.error(`Unknown error: ${error}`);
|
||||
return;
|
||||
}
|
||||
};
|
@ -3,6 +3,7 @@ import { Mocked, vitest } from 'vitest';
|
||||
|
||||
export const newDatabaseRepositoryMock = (): Mocked<IDatabaseRepository> => {
|
||||
return {
|
||||
reconnect: vitest.fn(),
|
||||
getExtensionVersion: vitest.fn(),
|
||||
getExtensionVersionRange: vitest.fn(),
|
||||
getPostgresVersion: vitest.fn().mockResolvedValue('14.10 (Debian 14.10-1.pgdg120+1)'),
|
||||
|
Loading…
Reference in New Issue
Block a user