You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
241 lines
7.3 KiB
241 lines
7.3 KiB
4 weeks ago
|
/* eslint-disable no-async-promise-executor */
|
||
|
|
||
|
const debug = require('debug')('ali-oss:multipart-copy');
|
||
|
const copy = require('copy-to');
|
||
|
|
||
|
const proto = exports;
|
||
|
|
||
|
/**
|
||
|
* Upload a part copy in a multipart from the source bucket/object
|
||
|
* used with initMultipartUpload and completeMultipartUpload.
|
||
|
* @param {String} name copy object name
|
||
|
* @param {String} uploadId the upload id
|
||
|
* @param {Number} partNo the part number
|
||
|
* @param {String} range like 0-102400 part size need to copy
|
||
|
* @param {Object} sourceData
|
||
|
* {String} sourceData.sourceKey the source object name
|
||
|
* {String} sourceData.sourceBucketName the source bucket name
|
||
|
* @param {Object} options
|
||
|
*/
|
||
|
/* eslint max-len: [0] */
|
||
|
proto.uploadPartCopy = async function uploadPartCopy(name, uploadId, partNo, range, sourceData, options = {}) {
|
||
|
options.headers = options.headers || {};
|
||
|
const versionId = options.versionId || (options.subres && options.subres.versionId) || null;
|
||
|
let copySource;
|
||
|
if (versionId) {
|
||
|
copySource = `/${sourceData.sourceBucketName}/${encodeURIComponent(sourceData.sourceKey)}?versionId=${versionId}`;
|
||
|
} else {
|
||
|
copySource = `/${sourceData.sourceBucketName}/${encodeURIComponent(sourceData.sourceKey)}`;
|
||
|
}
|
||
|
|
||
|
options.headers['x-oss-copy-source'] = copySource;
|
||
|
if (range) {
|
||
|
options.headers['x-oss-copy-source-range'] = `bytes=${range}`;
|
||
|
}
|
||
|
|
||
|
options.subres = {
|
||
|
partNumber: partNo,
|
||
|
uploadId
|
||
|
};
|
||
|
const params = this._objectRequestParams('PUT', name, options);
|
||
|
params.mime = options.mime;
|
||
|
params.successStatuses = [200];
|
||
|
|
||
|
const result = await this.request(params);
|
||
|
|
||
|
return {
|
||
|
name,
|
||
|
etag: result.res.headers.etag,
|
||
|
res: result.res
|
||
|
};
|
||
|
};
|
||
|
|
||
|
/**
|
||
|
* @param {String} name copy object name
|
||
|
* @param {Object} sourceData
|
||
|
* {String} sourceData.sourceKey the source object name
|
||
|
* {String} sourceData.sourceBucketName the source bucket name
|
||
|
* {Number} sourceData.startOffset data copy start byte offset, e.g: 0
|
||
|
* {Number} sourceData.endOffset data copy end byte offset, e.g: 102400
|
||
|
* @param {Object} options
|
||
|
* {Number} options.partSize
|
||
|
*/
|
||
|
proto.multipartUploadCopy = async function multipartUploadCopy(name, sourceData, options = {}) {
|
||
|
this.resetCancelFlag();
|
||
|
const { versionId = null } = options;
|
||
|
const metaOpt = {
|
||
|
versionId
|
||
|
};
|
||
|
const objectMeta = await this._getObjectMeta(sourceData.sourceBucketName, sourceData.sourceKey, metaOpt);
|
||
|
const fileSize = objectMeta.res.headers['content-length'];
|
||
|
sourceData.startOffset = sourceData.startOffset || 0;
|
||
|
sourceData.endOffset = sourceData.endOffset || fileSize;
|
||
|
|
||
|
if (options.checkpoint && options.checkpoint.uploadId) {
|
||
|
return await this._resumeMultipartCopy(options.checkpoint, sourceData, options);
|
||
|
}
|
||
|
|
||
|
const minPartSize = 100 * 1024;
|
||
|
|
||
|
const copySize = sourceData.endOffset - sourceData.startOffset;
|
||
|
if (copySize < minPartSize) {
|
||
|
throw new Error(`copySize must not be smaller than ${minPartSize}`);
|
||
|
}
|
||
|
|
||
|
if (options.partSize && options.partSize < minPartSize) {
|
||
|
throw new Error(`partSize must not be smaller than ${minPartSize}`);
|
||
|
}
|
||
|
|
||
|
const init = await this.initMultipartUpload(name, options);
|
||
|
const { uploadId } = init;
|
||
|
const partSize = this._getPartSize(copySize, options.partSize);
|
||
|
|
||
|
const checkpoint = {
|
||
|
name,
|
||
|
copySize,
|
||
|
partSize,
|
||
|
uploadId,
|
||
|
doneParts: []
|
||
|
};
|
||
|
|
||
|
if (options && options.progress) {
|
||
|
await options.progress(0, checkpoint, init.res);
|
||
|
}
|
||
|
|
||
|
return await this._resumeMultipartCopy(checkpoint, sourceData, options);
|
||
|
};
|
||
|
|
||
|
/*
|
||
|
* Resume multipart copy from checkpoint. The checkpoint will be
|
||
|
* updated after each successful part copy.
|
||
|
* @param {Object} checkpoint the checkpoint
|
||
|
* @param {Object} options
|
||
|
*/
|
||
|
proto._resumeMultipartCopy = async function _resumeMultipartCopy(checkpoint, sourceData, options) {
|
||
|
if (this.isCancel()) {
|
||
|
throw this._makeCancelEvent();
|
||
|
}
|
||
|
const { versionId = null } = options;
|
||
|
const metaOpt = {
|
||
|
versionId
|
||
|
};
|
||
|
const { copySize, partSize, uploadId, doneParts, name } = checkpoint;
|
||
|
|
||
|
const partOffs = this._divideMultipartCopyParts(copySize, partSize, sourceData.startOffset);
|
||
|
const numParts = partOffs.length;
|
||
|
|
||
|
const uploadPartCopyOptions = {
|
||
|
headers: {}
|
||
|
};
|
||
|
|
||
|
if (options.copyheaders) {
|
||
|
copy(options.copyheaders).to(uploadPartCopyOptions.headers);
|
||
|
}
|
||
|
if (versionId) {
|
||
|
copy(metaOpt).to(uploadPartCopyOptions);
|
||
|
}
|
||
|
|
||
|
const uploadPartJob = function uploadPartJob(self, partNo, source) {
|
||
|
return new Promise(async (resolve, reject) => {
|
||
|
try {
|
||
|
if (!self.isCancel()) {
|
||
|
const pi = partOffs[partNo - 1];
|
||
|
const range = `${pi.start}-${pi.end - 1}`;
|
||
|
|
||
|
let result;
|
||
|
try {
|
||
|
result = await self.uploadPartCopy(name, uploadId, partNo, range, source, uploadPartCopyOptions);
|
||
|
} catch (error) {
|
||
|
if (error.status === 404) {
|
||
|
throw self._makeAbortEvent();
|
||
|
}
|
||
|
throw error;
|
||
|
}
|
||
|
if (!self.isCancel()) {
|
||
|
debug(`content-range ${result.res.headers['content-range']}`);
|
||
|
doneParts.push({
|
||
|
number: partNo,
|
||
|
etag: result.res.headers.etag
|
||
|
});
|
||
|
checkpoint.doneParts = doneParts;
|
||
|
|
||
|
if (options && options.progress) {
|
||
|
await options.progress(doneParts.length / numParts, checkpoint, result.res);
|
||
|
}
|
||
|
}
|
||
|
}
|
||
|
resolve();
|
||
|
} catch (err) {
|
||
|
err.partNum = partNo;
|
||
|
reject(err);
|
||
|
}
|
||
|
});
|
||
|
};
|
||
|
|
||
|
const all = Array.from(new Array(numParts), (x, i) => i + 1);
|
||
|
const done = doneParts.map(p => p.number);
|
||
|
const todo = all.filter(p => done.indexOf(p) < 0);
|
||
|
const defaultParallel = 5;
|
||
|
const parallel = options.parallel || defaultParallel;
|
||
|
|
||
|
if (this.checkBrowserAndVersion('Internet Explorer', '10') || parallel === 1) {
|
||
|
for (let i = 0; i < todo.length; i++) {
|
||
|
if (this.isCancel()) {
|
||
|
throw this._makeCancelEvent();
|
||
|
}
|
||
|
/* eslint no-await-in-loop: [0] */
|
||
|
await uploadPartJob(this, todo[i], sourceData);
|
||
|
}
|
||
|
} else {
|
||
|
// upload in parallel
|
||
|
const errors = await this._parallelNode(todo, parallel, uploadPartJob, sourceData);
|
||
|
|
||
|
const abortEvent = errors.find(err => err.name === 'abort');
|
||
|
if (abortEvent) throw abortEvent;
|
||
|
|
||
|
if (this.isCancel()) {
|
||
|
throw this._makeCancelEvent();
|
||
|
}
|
||
|
|
||
|
// check errors after all jobs are completed
|
||
|
if (errors && errors.length > 0) {
|
||
|
const err = errors[0];
|
||
|
err.message = `Failed to copy some parts with error: ${err.toString()} part_num: ${err.partNum}`;
|
||
|
throw err;
|
||
|
}
|
||
|
}
|
||
|
|
||
|
return await this.completeMultipartUpload(name, uploadId, doneParts, options);
|
||
|
};
|
||
|
|
||
|
proto._divideMultipartCopyParts = function _divideMultipartCopyParts(fileSize, partSize, startOffset) {
|
||
|
const numParts = Math.ceil(fileSize / partSize);
|
||
|
|
||
|
const partOffs = [];
|
||
|
for (let i = 0; i < numParts; i++) {
|
||
|
const start = partSize * i + startOffset;
|
||
|
const end = Math.min(start + partSize, fileSize + startOffset);
|
||
|
|
||
|
partOffs.push({
|
||
|
start,
|
||
|
end
|
||
|
});
|
||
|
}
|
||
|
|
||
|
return partOffs;
|
||
|
};
|
||
|
|
||
|
/**
|
||
|
* Get Object Meta
|
||
|
* @param {String} bucket bucket name
|
||
|
* @param {String} name object name
|
||
|
* @param {Object} options
|
||
|
*/
|
||
|
proto._getObjectMeta = async function _getObjectMeta(bucket, name, options) {
|
||
|
const currentBucket = this.getBucket();
|
||
|
this.setBucket(bucket);
|
||
|
const data = await this.head(name, options);
|
||
|
this.setBucket(currentBucket);
|
||
|
return data;
|
||
|
};
|