/*
 * parttimejob/node_modules/ali-oss/lib/common/multipart-copy.js
 * 241 lines, 7.3 KiB (4 weeks ago)
 */
/* eslint-disable no-async-promise-executor */
const debug = require('debug')('ali-oss:multipart-copy');
const copy = require('copy-to');
const proto = exports;
/**
 * Upload a part copy in a multipart from the source bucket/object
 * used with initMultipartUpload and completeMultipartUpload.
 *
 * The request is built from a copy of `options`, so the caller's object
 * (and its `headers`) is left untouched and can safely be reused for several
 * parts without one call's headers leaking into the next.
 *
 * @param {String} name copy object name
 * @param {String} uploadId the upload id
 * @param {Number} partNo the part number
 * @param {String} range like 0-102400 part size need to copy; when falsy no
 *        `x-oss-copy-source-range` header is added by this call
 * @param {Object} sourceData
 *        {String} sourceData.sourceKey the source object name
 *        {String} sourceData.sourceBucketName the source bucket name
 * @param {Object} options
 *        {Object} [options.headers] extra headers sent with the request
 *        {String} [options.versionId] source version, appended to the copy source
 *                 (falls back to options.subres.versionId)
 *        {String} [options.mime] assigned to the request params as `mime`
 * @return {Object} `{ name, etag, res }` where `etag` is read from the response headers
 */
/* eslint max-len: [0] */
proto.uploadPartCopy = async function uploadPartCopy(name, uploadId, partNo, range, sourceData, options = {}) {
  const versionId = options.versionId || (options.subres && options.subres.versionId) || null;

  let copySource = `/${sourceData.sourceBucketName}/${encodeURIComponent(sourceData.sourceKey)}`;
  if (versionId) {
    copySource += `?versionId=${versionId}`;
  }

  // Work on a private header map: writing into options.headers would leave a
  // stale range header behind for the next call that reuses the same options.
  const headers = Object.assign({}, options.headers);
  headers['x-oss-copy-source'] = copySource;
  if (range) {
    headers['x-oss-copy-source-range'] = `bytes=${range}`;
  }

  // subres is replaced (not merged): the request only carries the part
  // number and upload id, exactly as before.
  const requestOptions = Object.assign({}, options, {
    headers,
    subres: {
      partNumber: partNo,
      uploadId
    }
  });

  const params = this._objectRequestParams('PUT', name, requestOptions);
  params.mime = options.mime;
  params.successStatuses = [200];

  const result = await this.request(params);
  return {
    name,
    etag: result.res.headers.etag,
    res: result.res
  };
};
/**
 * Copy a source object (or a byte range of it) to `name` as a multipart upload.
 *
 * Looks up the source object's `content-length`, fills in missing offsets on
 * `sourceData`, then either resumes from `options.checkpoint` or starts a new
 * multipart upload and hands over to `_resumeMultipartCopy`.
 *
 * @param {String} name copy object name
 * @param {Object} sourceData
 *        {String} sourceData.sourceKey the source object name
 *        {String} sourceData.sourceBucketName the source bucket name
 *        {Number} sourceData.startOffset data copy start byte offset, e.g: 0
 *                 (set to 0 on this object when falsy)
 *        {Number} sourceData.endOffset data copy end byte offset, e.g: 102400
 *                 (set to the source's content-length header value when falsy)
 * @param {Object} options
 *        {Number} options.partSize must not be smaller than 100 KiB when given
 *        {Object} options.checkpoint resume from this checkpoint when it has an uploadId
 *        {Function} options.progress called as (0, checkpoint, initResponse) before copying starts
 *        {String} options.versionId forwarded to the source metadata lookup
 * @return {Object} whatever `_resumeMultipartCopy` resolves to
 */
proto.multipartUploadCopy = async function multipartUploadCopy(name, sourceData, options = {}) {
  this.resetCancelFlag();

  const { versionId = null } = options;
  const sourceMeta = await this._getObjectMeta(sourceData.sourceBucketName, sourceData.sourceKey, { versionId });
  const sourceLength = sourceMeta.res.headers['content-length'];

  // Default the copy window to the whole object.
  if (!sourceData.startOffset) {
    sourceData.startOffset = 0;
  }
  if (!sourceData.endOffset) {
    sourceData.endOffset = sourceLength;
  }

  // A checkpoint that already carries an upload id means: continue that upload.
  const previous = options.checkpoint;
  if (previous && previous.uploadId) {
    return await this._resumeMultipartCopy(previous, sourceData, options);
  }

  const minPartSize = 100 * 1024;
  const copySize = sourceData.endOffset - sourceData.startOffset;
  if (copySize < minPartSize) {
    throw new Error(`copySize must not be smaller than ${minPartSize}`);
  }
  const requestedPartSize = options.partSize;
  if (requestedPartSize && requestedPartSize < minPartSize) {
    throw new Error(`partSize must not be smaller than ${minPartSize}`);
  }

  const init = await this.initMultipartUpload(name, options);
  const { uploadId } = init;
  const partSize = this._getPartSize(copySize, options.partSize);

  const checkpoint = { name, copySize, partSize, uploadId, doneParts: [] };
  if (options.progress) {
    await options.progress(0, checkpoint, init.res);
  }

  return await this._resumeMultipartCopy(checkpoint, sourceData, options);
};
/**
 * Resume multipart copy from checkpoint. The checkpoint will be
 * updated after each successful part copy.
 *
 * Parts already listed in `checkpoint.doneParts` are skipped; the remaining
 * ones are copied either one at a time (IE10 or `options.parallel === 1`) or
 * through `this._parallelNode`. The upload is only completed when every
 * pending part was processed without the operation being cancelled.
 *
 * @param {Object} checkpoint the checkpoint
 *        ({ name, copySize, partSize, uploadId, doneParts })
 * @param {Object} sourceData source description; `startOffset` is the base
 *        offset for the part ranges and the object is forwarded to uploadPartCopy
 * @param {Object} options
 *        {Object} [options.copyheaders] headers copied onto every part request
 *        {String} [options.versionId] forwarded to every part request
 *        {Number} [options.parallel] number of concurrent part copies, default 5
 *        {Function} [options.progress] called as (fraction, checkpoint, res) after each part
 * @return {Object} the result of completeMultipartUpload
 * @throws the cancel event when the operation is cancelled, the abort event
 *         when a part copy answers 404, or the first part error otherwise
 */
proto._resumeMultipartCopy = async function _resumeMultipartCopy(checkpoint, sourceData, options = {}) {
  if (this.isCancel()) {
    throw this._makeCancelEvent();
  }

  const { versionId = null } = options;
  const { copySize, partSize, uploadId, doneParts, name } = checkpoint;
  const partOffs = this._divideMultipartCopyParts(copySize, partSize, sourceData.startOffset);
  const numParts = partOffs.length;

  const uploadPartCopyOptions = {
    headers: {}
  };
  if (options.copyheaders) {
    copy(options.copyheaders).to(uploadPartCopyOptions.headers);
  }
  if (versionId) {
    uploadPartCopyOptions.versionId = versionId;
  }

  // Copies one part and records it in the checkpoint. Resolves without
  // recording anything when the operation has been cancelled meanwhile.
  const uploadPartJob = async function uploadPartJob(self, partNo, source) {
    try {
      if (self.isCancel()) {
        return;
      }

      const pi = partOffs[partNo - 1];
      // `end` is exclusive in partOffs, the range sent to the server is inclusive.
      const range = `${pi.start}-${pi.end - 1}`;

      let result;
      try {
        result = await self.uploadPartCopy(name, uploadId, partNo, range, source, uploadPartCopyOptions);
      } catch (error) {
        // A 404 on a part copy is reported as an abort event instead of the raw error.
        if (error.status === 404) {
          throw self._makeAbortEvent();
        }
        throw error;
      }

      if (self.isCancel()) {
        return;
      }

      debug(`content-range ${result.res.headers['content-range']}`);
      doneParts.push({
        number: partNo,
        etag: result.res.headers.etag
      });
      checkpoint.doneParts = doneParts;

      if (options.progress) {
        await options.progress(doneParts.length / numParts, checkpoint, result.res);
      }
    } catch (err) {
      // Tag the failure with its part number for the error message below.
      err.partNum = partNo;
      throw err;
    }
  };

  // Part numbers (1-based) that are not in the checkpoint yet.
  const done = new Set(doneParts.map(p => p.number));
  const todo = [];
  for (let partNo = 1; partNo <= numParts; partNo++) {
    if (!done.has(partNo)) {
      todo.push(partNo);
    }
  }

  const defaultParallel = 5;
  const parallel = options.parallel || defaultParallel;

  if (this.checkBrowserAndVersion('Internet Explorer', '10') || parallel === 1) {
    for (let i = 0; i < todo.length; i++) {
      if (this.isCancel()) {
        throw this._makeCancelEvent();
      }
      /* eslint no-await-in-loop: [0] */
      await uploadPartJob(this, todo[i], sourceData);
    }
    // A cancel raised while the last part was in flight makes the job skip
    // recording it; completing now would finish the upload with parts missing.
    if (this.isCancel()) {
      throw this._makeCancelEvent();
    }
  } else {
    // upload in parallel; the result is used as the list of errors raised by the jobs
    const errors = (await this._parallelNode(todo, parallel, uploadPartJob, sourceData)) || [];

    const abortEvent = errors.find(err => err.name === 'abort');
    if (abortEvent) throw abortEvent;

    if (this.isCancel()) {
      throw this._makeCancelEvent();
    }

    // check errors after all jobs are completed
    if (errors.length > 0) {
      const err = errors[0];
      err.message = `Failed to copy some parts with error: ${err.toString()} part_num: ${err.partNum}`;
      throw err;
    }
  }

  return await this.completeMultipartUpload(name, uploadId, doneParts, options);
};
/**
 * Split a copy window into consecutive part ranges.
 *
 * @param {Number} fileSize number of bytes to copy
 * @param {Number} partSize size of every part except possibly the last
 * @param {Number} startOffset offset of the first byte of the window
 * @return {Array} `{ start, end }` pairs; `end` is exclusive and the last
 *         part is clipped to `fileSize + startOffset`
 */
proto._divideMultipartCopyParts = function _divideMultipartCopyParts(fileSize, partSize, startOffset) {
  const partCount = Math.ceil(fileSize / partSize);
  const windowEnd = fileSize + startOffset;
  const ranges = [];

  // ranges.length doubles as the zero-based index of the part being added.
  while (ranges.length < partCount) {
    const start = partSize * ranges.length + startOffset;
    ranges.push({ start, end: Math.min(start + partSize, windowEnd) });
  }

  return ranges;
};
/**
 * Get Object Meta
 *
 * Switches the client to `bucket`, issues `head(name, options)` and switches
 * back to the previously selected bucket afterwards - also when the HEAD
 * request fails, so an error does not leave the client on the other bucket.
 *
 * NOTE(review): the switch is made on the client itself, so other requests
 * started on the same client while the HEAD is pending presumably see
 * `bucket` too - confirm against setBucket/getBucket.
 *
 * @param {String} bucket bucket name
 * @param {String} name object name
 * @param {Object} options passed through to `head`
 * @return {Object} the result of `head`
 */
proto._getObjectMeta = async function _getObjectMeta(bucket, name, options) {
  const currentBucket = this.getBucket();
  this.setBucket(bucket);
  try {
    // `await` inside the try so the bucket is restored only after HEAD settled.
    return await this.head(name, options);
  } finally {
    this.setBucket(currentBucket);
  }
};