You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
362 lines
10 KiB
362 lines
10 KiB
4 weeks ago
|
// var debug = require('debug')('ali-oss:multipart');
|
||
|
const util = require('util');
|
||
|
const path = require('path');
|
||
|
const mime = require('mime');
|
||
|
const copy = require('copy-to');
|
||
|
const { isBlob } = require('../common/utils/isBlob');
|
||
|
const { isFile } = require('../common/utils/isFile');
|
||
|
const { isBuffer } = require('../common/utils/isBuffer');
|
||
|
|
||
|
const proto = exports;
|
||
|
|
||
|
/**
|
||
|
* Multipart operations
|
||
|
*/
|
||
|
|
||
|
/**
 * Upload a file to OSS using multipart uploads.
 *
 * Three paths are taken, in this order:
 *  1. `options.checkpoint.uploadId` is set: the upload is resumed from that checkpoint.
 *  2. The file is smaller than 100 KiB: it is sent with a single `this.put` call.
 *  3. Otherwise a new multipart upload is initiated and then driven by `_resumeMultipart`.
 *
 * Note that `options` is mutated (`disabledMD5`, `mime`, `headers`, `contentLength`).
 *
 * @param {String} name object name
 * @param {String|File|Buffer} file content to upload
 * @param {Object} options
 * @param {Object} [options.checkpoint] checkpoint of a previous upload; used to resume when it carries an `uploadId`
 * @param {Number} [options.partSize] integer part size, must not be smaller than 100 KiB
 * @param {Function} [options.progress] awaited as `progress(percentage, checkpoint, res)`
 * @param {String} [options.mime] content type; detected from the file when omitted
 * @param {Object} [options.headers] extra request headers
 * @param {Object} [options.meta] user meta, converted into headers via `_convertMetaToHeaders`
 * @param {Boolean} [options.disabledMD5] defaults to `true` when left undefined
 * @param {Object} [options.callback] The callback parameter is composed of a JSON string encoded in Base64
 * @param {String} options.callback.url the OSS sends a callback request to this URL
 * @param {String} [options.callback.host] The host header value for initiating callback requests
 * @param {String} options.callback.body The value of the request body when a callback is initiated
 * @param {String} [options.callback.contentType] The Content-Type of the callback requests initiated
 * @param {Boolean} [options.callback.callbackSNI] Whether OSS sends SNI to the origin address specified by callbackUrl when a callback request is initiated from the client
 * @param {Object} [options.callback.customValue] Custom parameters are a map of key-values, e.g:
 *                  customValue = {
 *                    key1: 'value1',
 *                    key2: 'value2'
 *                  }
 * @returns {Promise<Object>} for small files `{ res, bucket, name, etag }` (plus `data` when a
 *   callback is configured); otherwise whatever `_resumeMultipart` resolves with
 * @throws {Error} when `options.partSize` is not an integer or is smaller than 100 KiB
 */
proto.multipartUpload = async function multipartUpload(name, file, options = {}) {
  const smallestPartSize = 100 * 1024;

  this.resetCancelFlag();
  options.disabledMD5 = options.disabledMD5 === undefined || Boolean(options.disabledMD5);

  // Resume path: the caller handed back a checkpoint of an already initiated upload.
  const previous = options.checkpoint;
  if (previous && previous.uploadId) {
    if (file && isFile(file)) {
      previous.file = file;
    }

    const resumed = await this._resumeMultipart(previous, options);
    return resumed;
  }

  // Work out a content type when the caller did not provide one.
  if (!options.mime) {
    let detectedMime;
    if (isFile(file)) {
      detectedMime = mime.getType(path.extname(file.name));
    } else if (isBlob(file)) {
      detectedMime = file.type;
    } else if (isBuffer(file)) {
      detectedMime = '';
    } else {
      detectedMime = mime.getType(path.extname(file));
    }
    options.mime = detectedMime;
  }

  if (!options.headers) {
    options.headers = {};
  }
  this._convertMetaToHeaders(options.meta, options.headers);

  const fileSize = await this._getFileSize(file);

  // Small files are sent in one request instead of a multipart upload.
  if (fileSize < smallestPartSize) {
    options.contentLength = fileSize;
    const putResult = await this.put(name, file, options);

    if (options && options.progress) {
      await options.progress(1);
    }

    const summary = {
      res: putResult.res,
      bucket: this.options.bucket,
      name,
      etag: putResult.res.headers.etag
    };

    const hasCallback = (options.headers && options.headers['x-oss-callback']) || options.callback;
    if (hasCallback) {
      summary.data = putResult.data;
    }

    return summary;
  }

  // Validate a caller supplied part size before anything is created on the server.
  if (options.partSize) {
    if (parseInt(options.partSize, 10) !== options.partSize) {
      throw new Error('partSize must be int number');
    }
    if (options.partSize < smallestPartSize) {
      throw new Error(`partSize must not be smaller than ${smallestPartSize}`);
    }
  }

  const initResult = await this.initMultipartUpload(name, options);
  const freshCheckpoint = {
    file,
    name,
    fileSize,
    partSize: this._getPartSize(fileSize, options.partSize),
    uploadId: initResult.uploadId,
    doneParts: []
  };

  // Report 0% together with the new checkpoint so the caller can persist it.
  if (options && options.progress) {
    await options.progress(0, freshCheckpoint, initResult.res);
  }

  const uploaded = await this._resumeMultipart(freshCheckpoint, options);
  return uploaded;
};
|
||
|
|
||
|
/**
 * Resume multipart upload from checkpoint. The checkpoint will be
 * updated after each successful part upload.
 *
 * Parts already listed in `checkpoint.doneParts` are skipped; the remaining parts are
 * uploaded through `this._parallel` and the upload is then completed with
 * `completeMultipartUpload`.
 *
 * @param {Object} checkpoint the checkpoint: `{ file, name, fileSize, partSize, uploadId, doneParts }`
 * @param {Object} options
 * @param {Number} [options.parallel] number of concurrent part uploads, defaults to 5
 * @param {Function} [options.progress] awaited as `progress(percentage, checkpoint, res)` after each part
 * @returns {Promise<Object>} the result of `completeMultipartUpload`
 * @throws the cancel event when the client is cancelled, the abort event when a part upload
 *   answers 404, or the first part error (its message is prefixed and carries `partNum`)
 */
proto._resumeMultipart = async function _resumeMultipart(checkpoint, options) {
  if (this.isCancel()) {
    throw this._makeCancelEvent();
  }

  const { file, fileSize, partSize, uploadId, doneParts, name } = checkpoint;

  // Private list of finished parts; it is what finally gets sent to completeMultipartUpload.
  const internalDoneParts = [];
  if (doneParts.length > 0) {
    copy(doneParts).to(internalDoneParts);
  }

  const partOffs = this._divideParts(fileSize, partSize);
  const numParts = partOffs.length;

  // Set once the parallel run has returned, so that a part finishing late no longer
  // touches the checkpoint or reports progress.
  let multipartFinish = false;

  // Uploads one part. Resolves with `{ number, etag }`, or with undefined when the upload
  // was cancelled / already finished. Written as an async function (instead of an async
  // promise executor) so that every failure, including one raised while wrapping the
  // error, rejects the returned promise rather than leaving it pending forever.
  const uploadPartJob = async partNo => {
    try {
      if (this.isCancel()) {
        return undefined;
      }

      const pi = partOffs[partNo - 1];
      const content = await this._createBuffer(file, pi.start, pi.end);
      const data = {
        content,
        size: pi.end - pi.start
      };

      let result;
      try {
        result = await this._uploadPart(name, uploadId, partNo, data, options);
      } catch (error) {
        // A 404 on a part upload is surfaced as an abort event
        // (presumably the upload id no longer exists on the server - confirm).
        if (error.status === 404) {
          throw this._makeAbortEvent();
        }
        throw error;
      }

      if (this.isCancel() || multipartFinish) {
        return undefined;
      }

      checkpoint.doneParts.push({
        number: partNo,
        etag: result.res.headers.etag
      });

      if (options.progress) {
        // `numParts + 1` keeps the reported value below 1 until the upload is completed.
        await options.progress(doneParts.length / (numParts + 1), checkpoint, result.res);
      }

      return {
        number: partNo,
        etag: result.res.headers.etag
      };
    } catch (err) {
      // Re-wrap so the failing part number travels with the error.
      const tempErr = new Error();
      tempErr.name = err.name;
      tempErr.message = err.message;
      tempErr.stack = err.stack;
      tempErr.partNum = partNo;
      // Bring over the remaining properties of the original error.
      copy(err).to(tempErr);
      throw tempErr;
    }
  };

  // Part numbers are 1-based; skip those the checkpoint already lists as done.
  const finishedNumbers = new Set(internalDoneParts.map(part => part.number));
  const todo = Array.from({ length: numParts }, (unused, index) => index + 1).filter(
    partNo => !finishedNumbers.has(partNo)
  );
  const defaultParallel = 5;
  const parallel = options.parallel || defaultParallel;

  // upload in parallel
  const jobErr = await this._parallel(todo, parallel, async partNo => {
    const finishedPart = await uploadPartJob(partNo);
    if (finishedPart) {
      internalDoneParts.push(finishedPart);
    }
  });
  multipartFinish = true;

  // An abort takes precedence over a cancel, which takes precedence over other errors.
  const abortEvent = jobErr.find(err => err.name === 'abort');
  if (abortEvent) throw abortEvent;

  if (this.isCancel()) {
    throw this._makeCancelEvent();
  }

  if (jobErr.length > 0) {
    const firstErr = jobErr[0];
    firstErr.message = `Failed to upload some parts with error: ${firstErr.toString()} part_num: ${firstErr.partNum}`;
    throw firstErr;
  }

  return await this.completeMultipartUpload(name, uploadId, internalDoneParts, options);
};
|
||
|
|
||
|
/**
 * Get file size.
 *
 * @param {Buffer|Blob|File} file
 * @returns {Promise<Number>} `file.length` for a Buffer, `file.size` for a Blob or File
 * @throws {Error} when `file` is none of Buffer/File/Blob
 */
proto._getFileSize = async function _getFileSize(file) {
  if (isBuffer(file)) {
    return file.length;
  }

  const isWebFile = isBlob(file) || isFile(file);
  if (!isWebFile) {
    throw new Error('_getFileSize requires Buffer/File/Blob.');
  }

  return file.size;
};
|
||
|
|
||
|
/*
|
||
|
* Readable stream for Web File
|
||
|
*/
|
||
|
const { Readable } = require('stream');
|
||
|
|
||
|
/**
 * Readable stream over a Web `File`/`Blob`.
 *
 * The whole file is loaded once with a `FileReader` on the first `_read` call and is
 * then pushed downstream in slices. Can be called with or without `new`.
 *
 * @param {File|Blob} file source; only `file.size` is read here, the object itself is
 *   handed to `FileReader#readAsArrayBuffer`
 * @param {Object} [options] forwarded to the `Readable` constructor
 */
function WebFileReadStream(file, options) {
  if (!(this instanceof WebFileReadStream)) {
    return new WebFileReadStream(file, options);
  }

  Readable.call(this, options);

  this.file = file;
  this.reader = new FileReader();
  // Offset of the next byte to push.
  this.start = 0;
  this.finish = false;
  // Filled in by the FileReader `onload` handler; `file` is dropped at that point.
  this.fileBuffer = null;
}
util.inherits(WebFileReadStream, Readable);

/**
 * Push slices of at most `size` bytes from the loaded buffer until the buffer is
 * exhausted or `push` signals back-pressure by returning false.
 *
 * @param {Number} size maximum number of bytes per pushed chunk
 */
WebFileReadStream.prototype.readFileAndPush = function readFileAndPush(size) {
  if (!this.fileBuffer) {
    return;
  }

  let keepPushing = true;
  while (keepPushing && this.fileBuffer && this.start < this.fileBuffer.length) {
    const begin = this.start;
    const end = Math.min(begin + size, this.fileBuffer.length);
    // Advance the offset before pushing so the position is already up to date
    // whenever `push` hands control to stream consumers.
    this.start = end;
    keepPushing = this.push(this.fileBuffer.slice(begin, end));
  }
};

/**
 * `Readable` hook: ends the stream when everything was delivered, starts the one-off
 * file load on the first call, and otherwise continues pushing from the loaded buffer.
 * A failing `FileReader` destroys the stream with the reader's error instead of
 * leaving the consumer waiting forever.
 *
 * @param {Number} size requested chunk size; falls back to 16 KiB when falsy
 */
WebFileReadStream.prototype._read = function _read(size) {
  const exhausted =
    (this.file && this.start >= this.file.size) ||
    (this.fileBuffer && this.start >= this.fileBuffer.length) ||
    this.finish ||
    (this.start === 0 && !this.file);

  if (exhausted) {
    if (!this.finish) {
      this.fileBuffer = null;
      this.finish = true;
    }
    this.push(null);
    return;
  }

  const defaultReadSize = 16 * 1024;
  const chunkSize = size || defaultReadSize;

  if (this.start !== 0) {
    this.readFileAndPush(chunkSize);
    return;
  }

  // First read: load the complete file, then start pushing from the buffer.
  this.reader.onload = event => {
    this.fileBuffer = Buffer.from(new Uint8Array(event.target.result));
    this.file = null;
    this.readFileAndPush(chunkSize);
  };
  this.reader.onerror = () => {
    this.destroy(this.reader.error || new Error('Failed to read file.'));
  };
  this.reader.readAsArrayBuffer(this.file);
};
|
||
|
|
||
|
/**
 * Read the complete content of a Blob/File.
 *
 * Uses `file.arrayBuffer()` when available and falls back to a `FileReader` otherwise.
 *
 * @param {Blob|File} file
 * @returns {Promise<ArrayBuffer>} whatever `file.arrayBuffer()` returns, or a promise for
 *   the `FileReader` result
 * @throws rejects with the reader's `error` when the `FileReader` fallback fails, so that
 *   callers receive an object carrying `name` and `message` rather than a bare event
 */
function getBuffer(file) {
  // Some browsers do not support Blob.prototype.arrayBuffer, such as IE
  if (file.arrayBuffer) return file.arrayBuffer();

  return new Promise((resolve, reject) => {
    const reader = new FileReader();
    reader.onload = function onload(event) {
      resolve(event.target.result);
    };
    reader.onerror = function onerror() {
      reject(reader.error || new Error('Failed to read file content.'));
    };
    reader.readAsArrayBuffer(file);
  });
}
|
||
|
|
||
|
/**
 * Extract the byte range `[start, end)` of the source as a Buffer.
 *
 * @param {Blob|File|Buffer} file
 * @param {Number} start offset of the first byte
 * @param {Number} end offset just past the last byte
 * @returns {Promise<Buffer>} for Blob/File a new Buffer built from the sliced content;
 *   for a Buffer the result of `file.subarray(start, end)`
 * @throws {Error} when `file` is none of File/Blob/Buffer
 */
proto._createBuffer = async function _createBuffer(file, start, end) {
  const isWebFile = isBlob(file) || isFile(file);

  if (isWebFile) {
    const portion = file.slice(start, end);
    const rawContent = await getBuffer(portion);
    return Buffer.from(rawContent);
  }

  if (!isBuffer(file)) {
    throw new Error('_createBuffer requires File/Blob/Buffer.');
  }

  return file.subarray(start, end);
};
|
||
|
|
||
|
/**
 * Decide the part size for a multipart upload.
 *
 * A falsy `partSize` falls back to 1 MiB. When the size would split the file into more
 * than 10,000 parts, it is raised to `ceil(fileSize / 10000)` and a warning is logged.
 *
 * @param {Number} fileSize total size in bytes
 * @param {Number} [partSize] requested part size in bytes
 * @returns {Number} the part size to use
 */
proto._getPartSize = function _getPartSize(fileSize, partSize) {
  const partCountLimit = 10 * 1000;
  const fallbackPartSize = 1 * 1024 * 1024;

  const requested = partSize || fallbackPartSize;
  const smallestAllowed = Math.ceil(fileSize / partCountLimit);

  // Negated `<` on purpose: any comparison involving NaN keeps the requested size.
  if (!(requested < smallestAllowed)) {
    return requested;
  }

  console.warn(
    `partSize has been set to ${smallestAllowed}, because the partSize you provided causes partNumber to be greater than 10,000`
  );
  return smallestAllowed;
};
|
||
|
|
||
|
/**
 * Split a file into consecutive byte ranges of `partSize` bytes; the last range is
 * clipped to `fileSize`.
 *
 * @param {Number} fileSize total size in bytes
 * @param {Number} partSize size of each part in bytes
 * @returns {Array<{start: Number, end: Number}>} half-open ranges `[start, end)` in order
 */
proto._divideParts = function _divideParts(fileSize, partSize) {
  const partCount = Math.ceil(fileSize / partSize);
  const ranges = [];

  let index = 0;
  while (index < partCount) {
    const begin = partSize * index;
    ranges.push({
      start: begin,
      end: Math.min(begin + partSize, fileSize)
    });
    index += 1;
  }

  return ranges;
};
|