You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
252 lines
7.1 KiB
252 lines
7.1 KiB
1 month ago
|
'use strict';
|
||
|
|
||
|
// FIXME:
|
||
|
// replace this Transform mess with a method that pipes input argument to output argument
|
||
|
|
||
|
const MessageParser = require('./message-parser');
|
||
|
const RelaxedBody = require('./relaxed-body');
|
||
|
const sign = require('./sign');
|
||
|
const PassThrough = require('stream').PassThrough;
|
||
|
const fs = require('fs');
|
||
|
const path = require('path');
|
||
|
const crypto = require('crypto');
|
||
|
|
||
|
// Default hash algorithm name, used by DKIMSigner when options.hashAlgo is not set
const DKIM_ALGO = 'sha256';
|
||
|
// Default for options.cacheTreshold: once at least this many body bytes have been buffered in memory,
// the remaining body is written to a cache file (only when options.cacheDir is set)
const MAX_MESSAGE_SIZE = 128 * 1024; // buffer messages larger than this to disk
|
||
|
|
||
|
/*
|
||
|
// Usage:
|
||
|
|
||
|
let dkim = new DKIM({
|
||
|
domainName: 'example.com',
|
||
|
keySelector: 'key-selector',
|
||
|
privateKey,
|
||
|
cacheDir: '/tmp'
|
||
|
});
|
||
|
dkim.sign(input).pipe(process.stdout);
|
||
|
|
||
|
// Where input is an RFC 822 message (either a stream, string or Buffer)
// and the stream returned by dkim.sign() emits the DKIM-signed RFC 822 message
|
||
|
*/
|
||
|
|
||
|
/**
 * Signs a single message.
 *
 * The input stream is piped through a MessageParser into a RelaxedBody stream.
 * Everything read from RelaxedBody is buffered (in memory first, then in a cache
 * file once the threshold is reached and a cache directory is configured). When
 * the body has been fully consumed, one DKIM field per key is written to the
 * output, followed by the raw headers reported by the parser and the buffered body.
 */
class DKIMSigner {
    /**
     * @param {Object} [options] Signer options
     * @param {Number} [options.cacheTreshold] Bytes to keep in memory before switching to the cache file (option name is spelled this way in the public API)
     * @param {String} [options.hashAlgo] Hash algorithm name, defaults to DKIM_ALGO
     * @param {String} [options.cacheDir] Directory for the cache file; disk caching is disabled when unset
     * @param {Array} [options.headerFieldNames] Passed through to sign()
     * @param {Array} [options.skipFields] Passed through to sign()
     * @param {Array} keys List of objects with domainName, keySelector and privateKey
     * @param {Object} input Readable stream with the message source
     * @param {Object} output Writable stream that receives the signed message
     */
    constructor(options, keys, input, output) {
        this.options = options || {};
        this.keys = keys;

        this.cacheTreshold = Number(this.options.cacheTreshold) || MAX_MESSAGE_SIZE;
        this.hashAlgo = this.options.hashAlgo || DKIM_ALGO;

        this.cacheDir = this.options.cacheDir || false;

        // body chunks held in memory and the read cursor used when replaying them
        this.chunks = [];
        this.chunklen = 0;
        this.readPos = 0;

        // random file name so that concurrent signers do not share a cache file
        this.cachePath = this.cacheDir ? path.join(this.cacheDir, 'message.' + Date.now() + '-' + crypto.randomBytes(14).toString('hex')) : false;
        // holds the cache write stream first and the cache read stream later
        this.cache = false;

        this.headers = false;
        this.bodyHash = false;
        this.parser = false;
        this.relaxedBody = false;

        this.input = input;
        this.output = output;
        // exposed on the output stream so that consumers can see whether a cache file was used
        this.output.usingCache = false;

        this.errored = false;

        this.input.on('error', err => {
            this.errored = true;
            this.cleanup();
            output.emit('error', err);
        });
    }

    /**
     * Releases the disk cache: closes the cache stream (if any) and removes the
     * cache file. Does nothing when no cache stream has been opened. Safe to call
     * more than once; unlink errors are deliberately ignored (best effort).
     */
    cleanup() {
        if (!this.cache || !this.cachePath) {
            return;
        }
        // Without this the file descriptor of a still-open cache stream leaks when
        // the input fails mid-message. Destroying an already closed stream is a no-op.
        if (typeof this.cache.destroy === 'function') {
            this.cache.destroy();
        }
        fs.unlink(this.cachePath, () => false);
    }

    /**
     * Opens the cache file for reading and pipes it to the output. The pipe also
     * ends the output once the file has been fully read. The cache file is removed
     * when the read stream closes or fails.
     */
    createReadCache() {
        // pipe remainings from the cache file
        this.cache = fs.createReadStream(this.cachePath);
        this.cache.once('error', err => {
            this.cleanup();
            this.output.emit('error', err);
        });
        this.cache.once('close', () => {
            this.cleanup();
        });
        this.cache.pipe(this.output);
    }

    /**
     * Writes the next in-memory chunk to the output, honouring backpressure.
     * After the last in-memory chunk either ends the output or, if a cache file
     * was used, continues from the cache file.
     */
    sendNextChunk() {
        if (this.errored) {
            return;
        }

        if (this.readPos >= this.chunks.length) {
            if (!this.cache) {
                return this.output.end();
            }
            return this.createReadCache();
        }

        let chunk = this.chunks[this.readPos++];
        if (this.output.write(chunk) === false) {
            // output buffer is full, continue once it has drained
            return this.output.once('drain', () => {
                this.sendNextChunk();
            });
        }
        // yield to the event loop between chunks
        setImmediate(() => this.sendNextChunk());
    }

    /**
     * Starts writing the result: one DKIM field per key (only when both the
     * headers and the body hash are available), then the raw headers, then the body.
     * Does nothing if the signer has already failed, so that no data is written
     * to the output after an error has been emitted on it.
     */
    sendSignedOutput() {
        if (this.errored) {
            // The cache write stream emits 'close' after 'error' as well, which
            // would otherwise push headers to an output that has already failed.
            return;
        }

        let keyPos = 0;
        let signNextKey = () => {
            if (this.errored) {
                return;
            }

            if (keyPos >= this.keys.length) {
                // all DKIM fields are written, continue with the original headers
                this.output.write(this.parser.rawHeaders);
                return setImmediate(() => this.sendNextChunk());
            }

            let key = this.keys[keyPos++];
            let dkimField = sign(this.headers, this.hashAlgo, this.bodyHash, {
                domainName: key.domainName,
                keySelector: key.keySelector,
                privateKey: key.privateKey,
                headerFieldNames: this.options.headerFieldNames,
                skipFields: this.options.skipFields
            });
            // a falsy result means that no field was generated for this key, skip it
            if (dkimField) {
                this.output.write(Buffer.from(dkimField + '\r\n'));
            }
            return setImmediate(signNextKey);
        };

        if (this.bodyHash && this.headers) {
            return signNextKey();
        }

        // nothing to sign with, pass the message on without DKIM fields
        this.output.write(this.parser.rawHeaders);
        this.sendNextChunk();
    }

    /**
     * Switches from in-memory buffering to the cache file: everything that is
     * still to be read from relaxedBody is piped to the file. Output starts once
     * the file has been closed.
     */
    createWriteCache() {
        this.output.usingCache = true;
        // pipe remainings to cache file
        this.cache = fs.createWriteStream(this.cachePath);
        this.cache.once('error', err => {
            this.errored = true;
            this.cleanup();

            // drain input so that the upstream does not stall
            this.relaxedBody.unpipe(this.cache);
            this.relaxedBody.on('readable', () => {
                while (this.relaxedBody.read() !== null) {
                    // do nothing
                }
            });

            // emit error
            this.output.emit('error', err);
        });
        this.cache.once('close', () => {
            // no-op if the stream was closed because of a failure
            this.sendSignedOutput();
        });
        // the in-memory collector must not compete with the pipe for the data
        this.relaxedBody.removeAllListeners('readable');
        this.relaxedBody.pipe(this.cache);
    }

    /**
     * Wires up the processing chain (input -> parser -> relaxedBody) and starts
     * consuming the input on the next tick of the event loop.
     */
    signStream() {
        this.parser = new MessageParser();
        this.relaxedBody = new RelaxedBody({
            hashAlgo: this.hashAlgo
        });

        this.parser.on('headers', value => {
            this.headers = value;
        });

        this.relaxedBody.on('hash', value => {
            this.bodyHash = value;
        });

        this.relaxedBody.on('readable', () => {
            let chunk;
            if (this.cache) {
                return;
            }
            while ((chunk = this.relaxedBody.read()) !== null) {
                this.chunks.push(chunk);
                this.chunklen += chunk.length;
                if (this.chunklen >= this.cacheTreshold && this.cachePath) {
                    // threshold reached, the rest of the body goes to disk
                    return this.createWriteCache();
                }
            }
        });

        this.relaxedBody.on('end', () => {
            if (this.cache) {
                // output is started by the 'close' event of the cache file instead
                return;
            }
            this.sendSignedOutput();
        });

        this.parser.pipe(this.relaxedBody);
        setImmediate(() => this.input.pipe(this.parser));
    }
}
|
||
|
|
||
|
/**
 * Entry point for DKIM signing. Holds the signing keys and default options and
 * creates a new DKIMSigner for every message passed to sign().
 */
class DKIM {
    /**
     * @param {Object} [options] DKIM options, also used as the default signer options
     * @param {Object|Array} [options.keys] One key object or a list of key objects ({domainName, keySelector, privateKey})
     * @param {String} [options.domainName] Domain name, used when options.keys is not set
     * @param {String} [options.keySelector] Key selector, used when options.keys is not set
     * @param {String|Buffer} [options.privateKey] Private key, used when options.keys is not set
     */
    constructor(options) {
        this.options = options || {};
        // Read the single-key fallback from this.options and not from the raw
        // argument, otherwise `new DKIM()` throws a TypeError
        this.keys = [].concat(
            this.options.keys || {
                domainName: this.options.domainName,
                keySelector: this.options.keySelector,
                privateKey: this.options.privateKey
            }
        );
    }

    /**
     * Signs a message.
     *
     * @param {Object|String|Buffer} input Message source as a readable stream, string or Buffer
     * @param {Object} [extraOptions] Additional signer options; these only fill in keys that are missing from the options given to the constructor, they never override them
     * @returns {Object} PassThrough stream that is handed to the signer as its output. The property `usingCache` is set on it by the signer
     */
    sign(input, extraOptions) {
        let output = new PassThrough();
        let inputStream = input;
        let writeValue = false;

        // strings and Buffers are fed to the signer through a temporary stream
        if (Buffer.isBuffer(input)) {
            writeValue = input;
            inputStream = new PassThrough();
        } else if (typeof input === 'string') {
            writeValue = Buffer.from(input);
            inputStream = new PassThrough();
        }

        let options = this.options;
        if (extraOptions && Object.keys(extraOptions).length) {
            // merge into a fresh object so that this.options is not modified;
            // instance options are applied last and therefore take precedence
            options = Object.assign({}, extraOptions, this.options);
        }

        let signer = new DKIMSigner(options, this.keys, inputStream, output);
        // start on a later tick so that the caller can attach listeners to the output first
        setImmediate(() => {
            signer.signStream();
            if (writeValue) {
                // signStream starts piping the input one tick later, write after that
                setImmediate(() => {
                    inputStream.end(writeValue);
                });
            }
        });

        return output;
    }
}
|
||
|
|
||
|
module.exports = DKIM;
|