Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -162,8 +162,7 @@ The service can be configured using environment variables or a configuration fil
|--------------|------|-------------|---------|
| EXPORT_JOB_TYPE | string | Export job type | `"Export"` |
| EXPORT_CLEANUP_EXPIRATION_DAYS | number | Days until exported files are cleaned up | `7` |
| EXPORT_GPKGS_PATH | string | Path to store geopackage files | `"/tmp/gpkgs"` |
| EXPORT_DOWNLOAD_PATH | string | Download path for exported files | `"/downloads"` |
| EXPORT_GPKGS_ROOT_DIR | string | Root directory for storing exported geopackage (GPKG) files | `"gpkgs"` |
| TILES_EXPORTING_TASK_TYPE | string | Tiles exporting task type | `"tiles-exporting"` |

# Core Functionality
Expand Down
3 changes: 1 addition & 2 deletions config/custom-environment-variables.json
Original file line number Diff line number Diff line change
Expand Up @@ -186,8 +186,7 @@
"__name": "EXPORT_CLEANUP_EXPIRATION_DAYS",
"__format": "number"
},
"gpkgsPath": "EXPORT_GPKGS_PATH",
"downloadPath": "EXPORT_DOWNLOAD_PATH"
"gpkgsRootDir": "EXPORT_GPKGS_ROOT_DIR"
}
},
"tasks": {
Expand Down
3 changes: 1 addition & 2 deletions config/default.json
Original file line number Diff line number Diff line change
Expand Up @@ -122,8 +122,7 @@
"export": {
"type": "Export",
"cleanupExpirationDays": 14,
"gpkgsPath": "/gpkgs",
"downloadPath": "downloads"
"gpkgsRootDir": "gpkgs"
}
},
"tasks": {
Expand Down
5 changes: 1 addition & 4 deletions helm/templates/configmap.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,6 @@
{{- $serviceUrls := (include "common.serviceUrls.merged" .) | fromYaml }}
{{- $storage := (include "common.storage.merged" .) | fromYaml }}
{{- $jobDefinitions := (include "common.jobDefinitions.merged" .) | fromYaml }}
{{- $gpkgPath := (printf "%s/%s" $storage.fs.internalPvc.mountPath $storage.fs.internalPvc.gpkgSubPath) }}


{{- if .Values.enabled -}}
apiVersion: v1
Expand Down Expand Up @@ -56,8 +54,7 @@ data:
INGESTION_SEED_JOB_TYPE : {{ $jobDefinitions.jobs.seed.type | quote }}
EXPORT_JOB_TYPE: {{ $jobDefinitions.jobs.export.type | quote }}
EXPORT_CLEANUP_EXPIRATION_DAYS: {{ $jobDefinitions.jobs.export.cleanupExpirationDays | quote }}
EXPORT_GPKGS_PATH: {{ $gpkgPath | quote }}
EXPORT_DOWNLOAD_PATH: {{ $jobDefinitions.jobs.export.downloadPath | quote }}
EXPORT_GPKGS_ROOT_DIR: {{ $jobDefinitions.jobs.export.gpkgsRootDir | quote }}
TILES_MERGING_TASK_TYPE: {{ $jobDefinitions.tasks.merge.type | quote }}
TILES_MERGING_TILE_BATCH_SIZE: {{ $jobDefinitions.tasks.merge.tileBatchSize | quote }}
TILES_MERGING_TASK_BATCH_SIZE: {{ $jobDefinitions.tasks.merge.taskBatchSize | quote }}
Expand Down
2 changes: 1 addition & 1 deletion helm/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ jobDefinitions:
export:
type: ""
cleanupExpirationDays: 14
downloadPath: ""
gpkgsRootDir: "gpkgs"
Comment thread
vitaligi marked this conversation as resolved.
tasks:
createTasks:
type: ""
Expand Down
47 changes: 23 additions & 24 deletions src/job/models/export/exportJobHandler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ import {
EXPORT_FAILURE_MESSAGE,
EXPORT_SUCCESS_MESSAGE,
GPKG_CONTENT_TYPE,
GPKGS_PREFIX,
JSON_CONTENT_TYPE,
SERVICES,
StorageProvider,
Expand All @@ -29,6 +28,7 @@ import { TaskMetrics } from '../../../utils/metrics/taskMetrics';
import {
AggregationLayerMetadata,
ExportFinalizeExecutionContext,
ExportFinalizeGpkgPaths,
ExportTask,
ExportTaskParameters,
GpkgArtifactProperties,
Expand All @@ -47,14 +47,15 @@ import { CallbackClient } from '../../../httpClients/callbackClient';
import { JobTrackerClient } from '../../../httpClients/jobTrackerClient';
import { PolygonPartsMangerClient } from '../../../httpClients/polygonPartsMangerClient';
import { convertObjectKeysToSnakeCase } from '../../../utils/db/dbUtils';
import { buildUrl } from '../../../utils/url';

@injectable()
export class ExportJobHandler extends JobHandler implements IJobHandler<ExportJob, ExportInitTask, ExportJob, ExportFinalizeTask> {
private readonly exportTaskType: string;
private readonly gpkgsPath: string;
private readonly gpkgsRootDir: string;
private readonly isS3GpkgProvider: boolean;
private readonly cleanupExpirationDays: number;
private readonly downloadUrl: string;
private readonly downloadServerUrl: string;
public constructor(
@inject(SERVICES.LOGGER) logger: Logger,
@inject(SERVICES.CONFIG) config: IConfig,
Expand All @@ -71,14 +72,12 @@ export class ExportJobHandler extends JobHandler implements IJobHandler<ExportJo
) {
super(logger, config, queueClient, jobTrackerClient);
this.exportTaskType = config.get<string>('jobManagement.export.tasks.tilesExporting.type');
this.gpkgsPath = config.get<string>('jobManagement.export.pollingJobs.export.gpkgsPath');
this.gpkgsRootDir = config.get<string>('jobManagement.export.pollingJobs.export.gpkgsRootDir');
// eslint-disable-next-line @typescript-eslint/naming-convention
const gpkgProvider = config.get<StorageProvider>('gpkgStorageProvider');
this.isS3GpkgProvider = gpkgProvider === StorageProvider.S3;
this.cleanupExpirationDays = config.get<number>('jobManagement.export.pollingJobs.export.cleanupExpirationDays');
const downloadServerUrl = config.get<string>('servicesUrl.downloadServerPublicDNS');
const downloadPath = config.get<string>('jobManagement.export.pollingJobs.export.downloadPath');
this.downloadUrl = `${downloadServerUrl}/${downloadPath}`;
this.downloadServerUrl = config.get<string>('servicesUrl.downloadServerPublicDNS');
}
public async handleJobInit(job: ExportJob, task: ExportInitTask): Promise<void> {
await context.with(trace.setSpan(context.active(), this.tracer.startSpan(`${ExportJobHandler.name}.${this.handleJobInit.name}`)), async () => {
Expand Down Expand Up @@ -150,7 +149,7 @@ export class ExportJobHandler extends JobHandler implements IJobHandler<ExportJo
logger.info({ msg: 'should send callbacks', shouldSendCallbacks, isTerminalStatus, isErrorCallback });
if (shouldSendCallbacks) {
tracingSpan?.addEvent('callbacks.sending.started');
await this.sendCallbacks(validJob, task.id, finalizeParams, paths.gpkgDirPath);
await this.sendCallbacks(validJob, task.id, finalizeParams, paths);
tracingSpan?.addEvent('callbacks.sending.completed');
if (isErrorCallback) {
logger.info({ msg: 'Finalize task type is Error_Callback, Completing after callbacks sends' });
Expand All @@ -172,7 +171,7 @@ export class ExportJobHandler extends JobHandler implements IJobHandler<ExportJo
activeSpan?.setAttributes(monitorAttributes);

const gpkgRelativePath = job.parameters.additionalParams.packageRelativePath;
const gpkgFilePath = path.join(this.gpkgsPath, gpkgRelativePath);
const gpkgFilePath = path.join('/', this.gpkgsRootDir, gpkgRelativePath);
const gpkgDirPath = path.dirname(gpkgFilePath);

return {
Expand All @@ -197,7 +196,7 @@ export class ExportJobHandler extends JobHandler implements IJobHandler<ExportJo
): Promise<ExportFinalizeTaskParams> {
const { job, task, paths, telemetry, logger } = context;
const { taskTracker, tracingSpan: activeSpan } = telemetry;
const { gpkgFilePath, gpkgRelativePath } = paths;
const { gpkgFilePath } = paths;

let fullProcessingParams = taskParams;
let gpkgProcessingComplete = false;
Expand All @@ -220,7 +219,7 @@ export class ExportJobHandler extends JobHandler implements IJobHandler<ExportJo

if (shouldUploadToS3) {
activeSpan?.addEvent('s3.upload.started');
fullProcessingParams = await this.uploadGpkgToS3(gpkgFilePath, gpkgRelativePath, job.id, task.id, fullProcessingParams);
fullProcessingParams = await this.uploadGpkgToS3(paths, job.id, task.id, fullProcessingParams);
activeSpan?.addEvent('s3.upload.completed', { success: fullProcessingParams.gpkgUploadedToS3 });
}

Expand Down Expand Up @@ -359,21 +358,21 @@ export class ExportJobHandler extends JobHandler implements IJobHandler<ExportJo
}

private async uploadGpkgToS3(
gpkgPath: string,
gpkgRelativePath: string,
paths: ExportFinalizeGpkgPaths,
jobId: string,
taskId: string,
taskParams: ExportFinalizeFullProcessingParams
): Promise<ExportFinalizeFullProcessingParams> {
const gpkgS3Key = `${GPKGS_PREFIX}/${gpkgRelativePath}`;
const { gpkgFilePath } = paths;
Comment thread
vitaligi marked this conversation as resolved.
const gpkgS3Key = path.posix.join(this.gpkgsRootDir, paths.gpkgRelativePath);
Comment thread
vitaligi marked this conversation as resolved.
const jsonFilePath = gpkgFilePath.replace(/\.gpkg$/, '.json'); //TODO: In future, we will remove the json metadata file and support only gpkg
const jsonS3Key = gpkgS3Key.replace(/\.gpkg$/, '.json'); //TODO: In future, we will remove the json metadata file and support only gpkg
const jsonPath = gpkgPath.replace(/\.gpkg$/, '.json'); //TODO: In future, we will remove the json metadata file and support only gpkg

await this.s3Service.uploadFiles([
{ filePath: gpkgPath, s3Key: gpkgS3Key, contentType: GPKG_CONTENT_TYPE },
{ filePath: jsonPath, s3Key: jsonS3Key, contentType: JSON_CONTENT_TYPE },
{ filePath: gpkgFilePath, s3Key: gpkgS3Key, contentType: GPKG_CONTENT_TYPE },
{ filePath: jsonFilePath, s3Key: jsonS3Key, contentType: JSON_CONTENT_TYPE },
]);
await this.fsService.deleteFileAndParentDir(gpkgPath);
await this.fsService.deleteFileAndParentDir(gpkgFilePath);
const updatedParams = await this.markFinalizeStepAsCompleted(jobId, taskId, taskParams, 'gpkgUploadedToS3');
return updatedParams;
Comment thread
vitaligi marked this conversation as resolved.
}
Expand All @@ -390,11 +389,11 @@ export class ExportJobHandler extends JobHandler implements IJobHandler<ExportJo
await this.queueClient.jobManagerClient.updateJob(job.id, { parameters: { ...job.parameters } });
}

private async sendCallbacks(job: ExportJob, taskId: string, taskParams: ExportFinalizeTaskParams, dirPath: string): Promise<void> {
private async sendCallbacks(job: ExportJob, taskId: string, taskParams: ExportFinalizeTaskParams, paths: ExportFinalizeGpkgPaths): Promise<void> {
try {
const cleanupExpirationTimeUTC = createExpirationDate(this.cleanupExpirationDays);
const cleanupDataParams: CleanupData = { cleanupExpirationTimeUTC, directoryPath: dirPath };
const callbackParams = this.createCallbacksParams(job, cleanupExpirationTimeUTC);
const cleanupDataParams: CleanupData = { cleanupExpirationTimeUTC, directoryPath: paths.gpkgDirPath };
const callbackParams = this.createCallbacksParams(job, cleanupExpirationTimeUTC, paths);

await this.queueClient.jobManagerClient.updateJob(job.id, {
parameters: { ...job.parameters, cleanupDataParams, callbackParams },
Expand Down Expand Up @@ -429,9 +428,9 @@ export class ExportJobHandler extends JobHandler implements IJobHandler<ExportJo
}
}

private createCallbacksParams(job: ExportJob, expirationDate: Date): CallbackExportResponse {
private createCallbacksParams(job: ExportJob, expirationDate: Date, paths: ExportFinalizeGpkgPaths): CallbackExportResponse {
const { additionalParams, callbackParams } = job.parameters;
const { packageRelativePath, fileNamesTemplates } = additionalParams;
const { fileNamesTemplates } = additionalParams;
const validCallbackParams = callbackExportResponseSchema.parse(callbackParams);
const { jsonFileData, ...clientsCallbackParams } = validCallbackParams;

Expand All @@ -445,7 +444,7 @@ export class ExportJobHandler extends JobHandler implements IJobHandler<ExportJo
};

if (job.status === OperationStatus.COMPLETED) {
const gpkgDownloadUrl = `${this.downloadUrl}/${GPKGS_PREFIX}/${packageRelativePath}`;
const gpkgDownloadUrl = buildUrl(this.downloadServerUrl, paths.gpkgFilePath);
const jsonDownloadUrl = gpkgDownloadUrl.replace(/\.gpkg$/, '.json'); //TODO: In future, we will remove the json metadata file and support only gpkg

callbackResponse.artifacts = [
Expand Down
7 changes: 7 additions & 0 deletions src/utils/url.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
import path from 'path';

export const buildUrl = (baseUrl: string, ...pathSegments: string[]): string => {
Comment thread
vitaligi marked this conversation as resolved.
const url = new URL(baseUrl);
Comment thread
vitaligi marked this conversation as resolved.
url.pathname = path.posix.join(url.pathname, ...pathSegments);
Comment thread
vitaligi marked this conversation as resolved.
return url.href;
};
4 changes: 2 additions & 2 deletions tests/configurations/unit/jest.config.js
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@ module.exports = {
'!**/routes/**',
'!<rootDir>/src/*',
],
modulePathIgnorePatterns: ['<rootDir>/src/utils/metrics/taskMetrics.ts'],
coveragePathIgnorePatterns: ['<rootDir>/src/utils/metrics/taskMetrics.ts'],
modulePathIgnorePatterns: ['<rootDir>/src/utils/metrics/taskMetrics.ts', '<rootDir>/src/utils/url.ts'],
coveragePathIgnorePatterns: ['<rootDir>/src/utils/metrics/taskMetrics.ts', '<rootDir>/src/utils/url.ts'],
coverageDirectory: '<rootDir>/coverage',
reporters: [
'default',
Expand Down
9 changes: 4 additions & 5 deletions tests/unit/job/exportJobHandler/exportJobHandler.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -112,9 +112,9 @@ describe('ExportJobHandler', () => {
});

describe('handleJobFinalize', () => {
const gpkgsPath = '/gpkgs';
const gpkgsRootDir = 'gpkgs';
const gpkgRelativePath = 'package.gpkg';
const gpkgFilePath = `${gpkgsPath}/${gpkgRelativePath}`;
const gpkgFilePath = path.join('/', gpkgsRootDir, gpkgRelativePath);
const jsonFilePath = gpkgFilePath.replace('.gpkg', '.json');
const gpkgDirPath = '/path/to/gpkgs';
let joinSpy: jest.SpyInstance;
Expand Down Expand Up @@ -165,7 +165,7 @@ describe('ExportJobHandler', () => {
await exportJobHandler.handleJobFinalize(job, task);

// Verify path methods were called correctly
expect(joinSpy).toHaveBeenCalledWith(gpkgsPath, gpkgRelativePath);
expect(joinSpy).toHaveBeenCalledWith('/', gpkgsRootDir, gpkgRelativePath);
expect(dirnameSpy).toHaveBeenCalledWith(gpkgFilePath);

// Verify metadata processing
Expand Down Expand Up @@ -229,7 +229,6 @@ describe('ExportJobHandler', () => {
describe('when handling S3 upload', () => {
it('should upload GPKG to S3 and delete local file when storage provider is S3', async () => {
setValue('gpkgStorageProvider', 'S3');
setValue('jobManagement.jobs.export.gpkgsPath', '/gpkgs');
const { exportJobHandler, s3ServiceMock, fsServiceMock, jobManagerClientMock } = setupExportJobHandlerTest();

const job = {
Expand Down Expand Up @@ -258,7 +257,7 @@ describe('ExportJobHandler', () => {

await exportJobHandler.handleJobFinalize(job, task);

expect(joinSpy).toHaveBeenCalledWith(gpkgsPath, gpkgRelativePath);
expect(joinSpy).toHaveBeenCalledWith('/', gpkgsRootDir, gpkgRelativePath);

expect(s3ServiceMock.uploadFiles).toHaveBeenCalledWith([
{
Expand Down
3 changes: 2 additions & 1 deletion tests/unit/mocks/configMock.ts
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@ const registerDefaultConfig = (): void => {
mapproxyDns: 'http://mapproxy',
polygonPartsManager: 'http://polygon-parts-manager',
geoserverDns: 'http://geoserver',
downloadServerPublicDNS: 'http://download-server',
jobTracker: 'http://job-tracker',
},
geoserver: {
Expand Down Expand Up @@ -169,7 +170,7 @@ const registerDefaultConfig = (): void => {
pollingJobs: {
export: {
type: 'Export',
gpkgsPath: '/gpkgs',
gpkgsRootDir: 'gpkgs',
},
},
tasks: {
Expand Down
Loading