Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,14 @@ async main() {

Assuming `test.png` is 1,800,000 bytes this will upload the first 1,000,000 bytes to `http://my.server.com/test.png.1` and the next 800,000 bytes to `http://my.server.com/test.png.2`.

## Debugging
To enable debug output when using the `node-httptransfer` library, set the `DEBUG` environment variable to `httptransfer:*`.
You can also specify a specific log level, as defined in [./lib/logger.js](./lib/logger.js), e.g.:

```bash
$ DEBUG='httptransfer:warn' npm run test
```

## Testbed

A CLI tool [testbed](./testbed/index.js) is provided to try out the `node-httptransfer` functionality. It supports uploading, downloading, and transferring file content. It also supports Azure Blob stores through Shared Access Signature (SAS) urls created on the fly.
Expand Down
3 changes: 2 additions & 1 deletion lib/aem/createassetservletrequestgenerator.js
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
const FormData = require("form-data");
const { HttpRequestGenerator } = require("../asset/httprequestgenerator");
const { HTTP } = require("../constants");
const logger = require("../logger");

const FILE_OFFSET = "file@Offset";
const CHUNK_LENGTH = "chunk@Length";
Expand All @@ -38,7 +39,7 @@ class CreateAssetServletRequestGenerator extends HttpRequestGenerator {
if (this.isPartChunked(transferPart.totalSize, contentRange)) {
// this is a chunk of the file - add more information to form
const { low, length } = contentRange || {};
console.log(`Create asset servlet upload process adding form elements file@Offset=${low}, chunk@Length=${length}, file@Length=${transferPart.totalSize}`);
logger.info(`Create asset servlet upload process adding form elements file@Offset=${low}, chunk@Length=${length}, file@Length=${transferPart.totalSize}`);
form.append(FILE_OFFSET, low);
form.append(CHUNK_LENGTH, length);
form.append(FILE_LENGTH, transferPart.totalSize);
Expand Down
25 changes: 13 additions & 12 deletions lib/block/blockdownload.js
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ const { RandomFileAccess } = require('../randomfileaccess');
const { FilterFailedAssets } = require('../functions/filterfailedassets');
const { GetAssetMetadata } = require('../functions/getassetmetadata');
const { BlockRequestGenerator } = require('../asset/blockrequestgenerator');
const logger = require("../logger");

/**
* Generate AEM download transfer assets
Expand All @@ -40,7 +41,7 @@ const { BlockRequestGenerator } = require('../asset/blockrequestgenerator');
* @yields {TransferAsset} Transfer asset
*/
async function* generateBlockDownloadTransfer(options) {
console.log(`Generating block download transfer parts`);
logger.info(`Generating block download transfer parts`);
const expectedLength = options.downloadFiles && (options.downloadFiles.length || options.downloadFiles.size);

let assetCounter = 0;
Expand All @@ -58,12 +59,12 @@ async function* generateBlockDownloadTransfer(options) {
});

assetCounter++;
console.log(`Generated download transfer asset ${assetCounter} of ${expectedLength}`);
logger.info(`Generated download transfer asset ${assetCounter} of ${expectedLength}`);

yield transferAsset;
}

console.log(`Generated ${assetCounter} download transfer assets (files to download: ${expectedLength})`);
logger.info(`Generated ${assetCounter} download transfer assets (files to download: ${expectedLength})`);
}

/**
Expand Down Expand Up @@ -110,7 +111,7 @@ class BlockDownload extends EventEmitter {
this.errorEvents = [];

controller.on(TransferEvents.CREATE_TRANSFER_PARTS, transferEvent => {
console.log("Block download: block download controller starting file download");
logger.info("Block download: block download controller starting file download");
this.emit("filestart", transferEvent.transferAsset.eventData);
});

Expand All @@ -122,12 +123,12 @@ class BlockDownload extends EventEmitter {
});

controller.on(TransferEvents.AFTER_JOIN_TRANSFER_PARTS, transferEvent => {
console.log("Block download: block download controller finishing file download");
logger.info("Block download: block download controller finishing file download");
this.emit("fileend", transferEvent.transferAsset.eventData);
});

controller.on(TransferEvents.ERROR, transferEvent => {
console.log(`Error during block download: ${transferEvent.error}`);
logger.info(`Error during block download: ${transferEvent.error}`);
this.errorEvents.push(transferEvent);

if (transferEvent.props.firstError) {
Expand Down Expand Up @@ -162,7 +163,7 @@ class BlockDownload extends EventEmitter {
controller.removeAllListeners(TransferEvents.AFTER_JOIN_TRANSFER_PARTS);
controller.removeAllListeners(TransferEvents.ERROR);
} catch (err) {
console.log(`Failed to remove event listeners from block download controller: ${err}`);
logger.info(`Failed to remove event listeners from block download controller: ${err}`);
}
}

Expand Down Expand Up @@ -190,13 +191,13 @@ class BlockDownload extends EventEmitter {
);
pipeline.setFilterFunction(new FilterFailedAssets());

console.log("Block download: executing block download pipeline");
logger.info("Block download: executing block download pipeline");
await executePipeline(pipeline, generateBlockDownloadTransfer(options), controller);
console.log("Block download: finished executing block download pipeline");
logger.info("Block download: finished executing block download pipeline");

if (this.errorEvents && this.errorEvents.length > 0) {
// throw the first emitted error during transfer
console.log(`Errors encountered during block download (${this.errorEvents.length} total error(s))`);
logger.info(`Errors encountered during block download (${this.errorEvents.length} total error(s))`);
throw this.errorEvents[0].error;
}
} finally {
Expand All @@ -206,11 +207,11 @@ class BlockDownload extends EventEmitter {

if (randomFileAccess) {
await randomFileAccess.close();
console.log("Block download: closed random file accessor");
logger.info("Block download: closed random file accessor");
}
if (controller) {
await controller.cleanupFailedTransfers();
console.log("Block download: cleaned up failed transfers");
logger.info("Block download: cleaned up failed transfers");
}
}
}
Expand Down
33 changes: 17 additions & 16 deletions lib/block/blockupload.js
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ const { IllegalArgumentError } = require("../error");
const { getFileStats } = require('../util');
const { AssetMultipart } = require("../asset/assetmultipart");
const { BlockRequestGenerator } = require("../asset/blockrequestgenerator");
const logger = require("../logger");

const DEFAULT_MAX_CONCURRENCY = 8;
// Default part size is 10mb
Expand All @@ -49,7 +50,7 @@ const DEFAULT_PART_SIZE = 10 * 1024 * 1024;
* @yields {TransferAsset} Transfer asset
*/
async function* generateBlockUploadTransfer(options) {
console.log(`Generating block upload transfer parts`);
logger.info(`Generating block upload transfer parts`);
const expectedLength = options.uploadFiles && (options.uploadFiles.length || options.uploadFiles.size);

let assetCounter = 0;
Expand All @@ -67,10 +68,10 @@ async function* generateBlockUploadTransfer(options) {
if (typeof uploadFile.fileUrl === "object"
&& Array.isArray(uploadFile.fileUrl)
&& uploadFile.fileUrl.length > 0) {
console.log("Multiple uploads to run");
logger.info("Multiple uploads to run");
assetTarget = uploadFile.fileUrl[0];
} else {
console.log("Single upload to run");
logger.info("Single upload to run");
assetTarget = uploadFile.fileUrl;
}
const targetUrl = new URL(assetTarget);
Expand All @@ -79,13 +80,13 @@ async function* generateBlockUploadTransfer(options) {
const target = new Asset(targetUrl, options.headers, uploadFile.multipartHeaders);

if (!uploadFile.fileSize) {
console.log("Getting transfer asset size from file to upload");
logger.info("Getting transfer asset size from file to upload");
const { size } = await getFileStats(uploadFile.filePath);
uploadFile.fileSize = size;
}

const contentType = options.headers && options.headers['content-type'];
console.log(`Transfer asset to upload is of content type ${contentType} and size ${uploadFile.fileSize} bytes`);
logger.info(`Transfer asset to upload is of content type ${contentType} and size ${uploadFile.fileSize} bytes`);
const transferAsset = new TransferAsset(source, target, {
acceptRanges: true,
metadata: new AssetMetadata(uploadFile.filePath, contentType, uploadFile.fileSize)
Expand All @@ -96,7 +97,7 @@ async function* generateBlockUploadTransfer(options) {
const minPartSize = uploadFile.minPartSize || Math.min(10, maxPartSize); // maxPartSize must be defined

if (typeof uploadURIs === "object" && Array.isArray(uploadURIs) && uploadURIs.length > 0) {
console.log(`Upload target is multipart ( ${uploadURIs.length} parts), min part size: ${minPartSize}, max part size: ${maxPartSize}`);
logger.info(`Upload target is multipart ( ${uploadURIs.length} parts), min part size: ${minPartSize}, max part size: ${maxPartSize}`);
transferAsset.multipartTarget = new AssetMultipart(
uploadURIs,
minPartSize,
Expand All @@ -106,12 +107,12 @@ async function* generateBlockUploadTransfer(options) {
}

assetCounter++;
console.log(`Generated upload transfer asset ${assetCounter} of ${expectedLength}`);
logger.info(`Generated upload transfer asset ${assetCounter} of ${expectedLength}`);

yield transferAsset;
}

console.log(`Generated ${assetCounter} upload transfer assets (files to upload: ${expectedLength})`);
logger.info(`Generated ${assetCounter} upload transfer assets (files to upload: ${expectedLength})`);
}

class BlockUpload extends EventEmitter {
Expand Down Expand Up @@ -155,7 +156,7 @@ class BlockUpload extends EventEmitter {
this.errorEvents = [];

controller.on(TransferEvents.TRANSFER, transferEvent => {
console.log("Block upload: block upload controller starting part upload");
logger.info("Block upload: block upload controller starting part upload");
this.emit("transferPart", transferEvent.transferAsset.eventData);
});

Expand All @@ -167,12 +168,12 @@ class BlockUpload extends EventEmitter {
});

controller.on(TransferEvents.AFTER_TRANSFER, transferEvent => {
console.log("Block upload: block upload controller finishing part upload");
logger.info("Block upload: block upload controller finishing part upload");
this.emit("aftertransfer", transferEvent.transferAsset.eventData);
});

controller.on(TransferEvents.ERROR, transferEvent => {
console.log(`Error during block upload: ${transferEvent.error}`);
logger.info(`Error during block upload: ${transferEvent.error}`);
if (transferEvent.props.firstError) {
this.errorEvents.push(transferEvent);
this.emit("fileerror", {
Expand Down Expand Up @@ -207,7 +208,7 @@ class BlockUpload extends EventEmitter {
controller.removeAllListeners(TransferEvents.AFTER_JOIN_TRANSFER_PARTS);
controller.removeAllListeners(TransferEvents.ERROR);
} catch (err) {
console.log(`Failed to remove event listeners from block upload controller: ${err}`);
logger.info(`Failed to remove event listeners from block upload controller: ${err}`);
}
}

Expand All @@ -234,14 +235,14 @@ class BlockUpload extends EventEmitter {
);
pipeline.setFilterFunction(new FilterFailedAssets);

console.log("Block upload: executing block upload pipeline");
logger.info("Block upload: executing block upload pipeline");
await executePipeline(pipeline, generateBlockUploadTransfer(options), controller);
console.log("Block upload: finished executing block upload pipeline");
logger.info("Block upload: finished executing block upload pipeline");

if (this.errorEvents && this.errorEvents.length > 0) {
// delete file (not needed - as AEM won't commit the blob in case of error)
// throw the first emitted error
console.log(`Errors encountered during block upload (${this.errorEvents.length} total error(s))`);
logger.info(`Errors encountered during block upload (${this.errorEvents.length} total error(s))`);
throw this.errorEvents[0].error;
}
} finally {
Expand All @@ -251,7 +252,7 @@ class BlockUpload extends EventEmitter {

if (randomFileAccess) {
await randomFileAccess.close();
console.log("Block upload: closed random file accessor");
logger.info("Block upload: closed random file accessor");
}
}
}
Expand Down
6 changes: 3 additions & 3 deletions lib/functions/getassetmetadata.js
Original file line number Diff line number Diff line change
Expand Up @@ -79,9 +79,9 @@ class GetAssetMetadata extends AsyncGeneratorFunction {
&& transferAsset.metadata.filename
&& transferAsset.metadata.contentType
&& transferAsset.metadata.contentLength) {
console.log(`Transfer asset has all needed metadata to proceed (content-type: ${transferAsset.metadata.contentType}, content length: ${transferAsset.metadata.contentLength})`);
logger.info(`Transfer asset has all needed metadata to proceed (content-type: ${transferAsset.metadata.contentType}, content length: ${transferAsset.metadata.contentLength})`);
} else {
console.log("Transfer asset needs to acquire additional metadata. Executing metadata request");
logger.info("Transfer asset needs to acquire additional metadata. Executing metadata request");
await retry(async (options) => {
// S3 doesn't support HEAD requests against presigned URLs
// TODO: 0-byte file support for S3 which results in a 416 error
Expand All @@ -92,7 +92,7 @@ class GetAssetMetadata extends AsyncGeneratorFunction {
requestOptions: options && options.requestOptions
});
transferAsset.acceptRanges = headers.get("accept-ranges") === "bytes";
console.log(`Server accepts ranges: ${transferAsset.acceptRanges} (accept-ranges header set to bytes)`);
logger.info(`Server accepts ranges: ${transferAsset.acceptRanges} (accept-ranges header set to bytes)`);
transferAsset.metadata = new AssetMetadata(
getFilename(headers),
getContentType(headers),
Expand Down
18 changes: 9 additions & 9 deletions lib/functions/transfer.js
Original file line number Diff line number Diff line change
Expand Up @@ -86,23 +86,23 @@ class Transfer extends AsyncGeneratorFunction {
throw Error(`'contentRange.length' too large, not supported yet: ${transferPart.source.url}`);
}

console.log(`Transferred content range: ${contentRange}`);
logger.info(`Transferred content range: ${contentRange}`);
if (isFileProtocol(transferPart.source.url) && targetUrl) {
console.log("Source has protocol 'file'");
logger.info("Source has protocol 'file'");

const buf = await this.randomFileAccess.read(transferPart.source.url, contentRange);

if (contentRange) {
if (buf && buf.length) {
console.log(`Read file ${transferPart.source.url} for content range low ${contentRange.low}, to high ${contentRange.high}, read ${buf.length} bytes`);
logger.info(`Read file ${transferPart.source.url} for content range low ${contentRange.low}, to high ${contentRange.high}, read ${buf.length} bytes`);
} else {
console.log(`Nothing read from file for content range low ${contentRange.low}, to high ${contentRange.high}`);
logger.info(`Nothing read from file for content range low ${contentRange.low}, to high ${contentRange.high}`);
}
} else {
if (buf && buf.length) {
console.log(`Read file ${transferPart.source.url} (no content range), read ${buf.length} bytes`);
logger.info(`Read file ${transferPart.source.url} (no content range), read ${buf.length} bytes`);
} else {
console.log(`Nothing read from file (no content range)`);
logger.info(`Nothing read from file (no content range)`);
}
}

Expand Down Expand Up @@ -132,7 +132,7 @@ class Transfer extends AsyncGeneratorFunction {
await issuePut(targetUrl, requestOptions);
}, this.options);
} else if (transferPart.source.blob && targetUrl) {
console.log(`Source is blob, transferring ranges low ${contentRange.low}, to high ${contentRange.high}`);
logger.info(`Source is blob, transferring ranges low ${contentRange.low}, to high ${contentRange.high}`);
const blob = transferPart.source.blob.slice(contentRange.low, contentRange.high + 1);
await retry(async () => {
const body = this.requestGenerator.createPartHttpBody({
Expand Down Expand Up @@ -160,7 +160,7 @@ class Transfer extends AsyncGeneratorFunction {
await issuePut(targetUrl, requestOptions);
}, this.options);
} else if (targetUrl && isFileProtocol(targetUrl) && transferPart.source.url) {
console.log("Target has protocol 'file'");
logger.info("Target has protocol 'file'");

await retry(async () => {
const totalSize = transferPart.metadata.contentLength;
Expand All @@ -185,7 +185,7 @@ class Transfer extends AsyncGeneratorFunction {
throw Error(`Server does not seem to have respected Range header. Expected range ${contentRange.low}-${contentRange.high}, content length is ${contentLength}B`);
}

console.log("Converting stream to buffer to get response data");
logger.info("Converting stream to buffer to get response data");
const buffer = await streamToBuffer(HTTP.METHOD.GET, transferPart.source.url, response.status, response.body, contentLength);
await this.randomFileAccess.write(targetUrl, contentRange, buffer, totalSize);
}, this.options);
Expand Down
Loading
Loading