You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
350 lines
9.4 KiB
350 lines
9.4 KiB
6 years ago
|
// mail-qman - deliver mail messages from the queue
|
||
|
|
||
|
// The spool directory has the following structure:
|
||
|
// * pid/<pid> - message files under construction
|
||
|
// * mess/<inumber> - message files
|
||
|
// * todo/<message inumber> - envelope files
|
||
|
// * notify - a UNIX socket that receives an <inumber> when a message
|
||
|
// is added to the spool
|
||
|
|
||
|
|
||
|
#include "max_align.h"
|
||
|
|
||
|
#include "libutil.h"
|
||
|
#include "shutil.h"
|
||
|
//#include "xsys.h"
|
||
|
|
||
|
#include <err.h>
|
||
|
#include <fcntl.h>
|
||
|
#include <spawn.h>
|
||
|
#include <stdio.h>
|
||
|
#include <stdlib.h>
|
||
|
#include <unistd.h>
|
||
|
#include <sys/socket.h>
|
||
|
#include <sys/stat.h>
|
||
|
#include <sys/un.h>
|
||
|
#include <sys/wait.h>
|
||
|
|
||
|
#include <stdexcept>
|
||
|
#include <string>
|
||
|
#include <thread>
|
||
|
|
||
|
#define SOCK_DGRAM_UNORDERED (-1)
|
||
|
#define O_ANYFD (0)
|
||
|
|
||
|
using std::string;
|
||
|
using std::thread;
|
||
|
|
||
|
extern char **environ;
|
||
|
|
||
|
static bool alt;
|
||
|
|
||
|
class spool_reader
|
||
|
{
|
||
|
string spooldir_;
|
||
|
int notifyfd_;
|
||
|
struct sockaddr_un notify_sun_;
|
||
|
|
||
|
public:
|
||
|
spool_reader(const string &spooldir) : spooldir_(spooldir)
|
||
|
{
|
||
|
// Create notification socket
|
||
|
notifyfd_ = socket(AF_UNIX, alt ? SOCK_DGRAM_UNORDERED : SOCK_DGRAM, 0);
|
||
|
if (notifyfd_ < 0)
|
||
|
edie("socket failed");
|
||
|
struct sockaddr_un sun{};
|
||
|
sun.sun_family = AF_UNIX;
|
||
|
snprintf(sun.sun_path, sizeof sun.sun_path, "%s/notify", spooldir.c_str());
|
||
|
|
||
|
// Normally we would just unlink(sun.sun_path), but since there's
|
||
|
// no way to kill mail-qman on xv6 right now, if it exists, it
|
||
|
// means this is a duplicate.
|
||
|
struct stat st;
|
||
|
if (stat(sun.sun_path, &st) == 0)
|
||
|
die("%s exists; mail-qman already running?", sun.sun_path);
|
||
|
|
||
|
unlink(sun.sun_path);
|
||
|
if (bind(notifyfd_, (struct sockaddr*)&sun, SUN_LEN(&sun)) < 0)
|
||
|
edie("bind failed");
|
||
|
|
||
|
notify_sun_ = sun;
|
||
|
}
|
||
|
|
||
|
string dequeue()
|
||
|
{
|
||
|
char buf[256];
|
||
|
ssize_t r = recv(notifyfd_, buf, sizeof buf, 0);
|
||
|
if (r < 0)
|
||
|
edie("recv failed");
|
||
|
return {buf, (size_t)r};
|
||
|
}
|
||
|
|
||
|
string get_recipient(const string &id)
|
||
|
{
|
||
|
char path[256];
|
||
|
snprintf(path, sizeof path, "%s/todo/%s", spooldir_.c_str(), id.c_str());
|
||
|
int fd = open(path, O_RDONLY|O_CLOEXEC|(alt ? O_ANYFD : 0));
|
||
|
if (fd < 0)
|
||
|
edie("open %s failed", path);
|
||
|
struct stat st;
|
||
|
// We don't use "alt" here, even though this is an alternate
|
||
|
// interface, because this commutes regardless. Passing
|
||
|
// STAT_OMIT_NLINK is just for performance.
|
||
|
// XXX Maybe we should use a resizing read rather than fstat.
|
||
|
if (fstatx(fd, &st, STAT_OMIT_NLINK) < 0)
|
||
|
edie("fstat %s failed", path);
|
||
|
string res(st.st_size, 0);
|
||
|
if (readall(fd, &res.front(), res.size()) != (ssize_t)res.size())
|
||
|
edie("readall %s failed", path);
|
||
|
close(fd);
|
||
|
return res;
|
||
|
}
|
||
|
|
||
|
int open_message(const string &id)
|
||
|
{
|
||
|
char path[256];
|
||
|
snprintf(path, sizeof path, "%s/mess/%s", spooldir_.c_str(), id.c_str());
|
||
|
int fd = open(path, O_RDONLY|O_CLOEXEC|(alt ? O_ANYFD : 0));
|
||
|
if (fd < 0)
|
||
|
edie("open %s failed", path);
|
||
|
return fd;
|
||
|
}
|
||
|
|
||
|
void remove(const string &id)
|
||
|
{
|
||
|
string x;
|
||
|
x.append(spooldir_).append("/todo/").append(id);
|
||
|
unlink(x.c_str());
|
||
|
x.clear();
|
||
|
x.append(spooldir_).append("/mess/").append(id);
|
||
|
unlink(x.c_str());
|
||
|
}
|
||
|
|
||
|
void exit_others(int mycpu, int nthread)
|
||
|
{
|
||
|
// xv6 doesn't have an easy way to kill processes, so cascade the
|
||
|
// exit message to all other threads. We have to affinitize
|
||
|
// ourselves around in case socket load balancing is off.
|
||
|
int notifyfd = socket(AF_UNIX, SOCK_DGRAM, 0);
|
||
|
if (notifyfd < 0)
|
||
|
edie("%s: socket failed", __func__);
|
||
|
|
||
|
const char *killmsg = "EXIT2";
|
||
|
for (int i = 0; i < nthread; ++i) {
|
||
|
if (i == mycpu)
|
||
|
continue;
|
||
|
//setaffinity(i);
|
||
|
if (sendto(notifyfd, killmsg, strlen(killmsg), 0,
|
||
|
(struct sockaddr*)¬ify_sun_, SUN_LEN(¬ify_sun_)) < 0)
|
||
|
edie("%s: sendto failed", __func__);
|
||
|
}
|
||
|
|
||
|
close(notifyfd);
|
||
|
|
||
|
//setaffinity(mycpu);
|
||
|
}
|
||
|
};
|
||
|
|
||
|
class deliverer
|
||
|
{
|
||
|
pid_t pid_;
|
||
|
string mailroot_;
|
||
|
bool pool_;
|
||
|
string pool_recipient_;
|
||
|
int msgpipe[2], respipe[2];
|
||
|
|
||
|
void start_child(const char *argv[], int stdin, int stdout)
|
||
|
{
|
||
|
if (alt) {
|
||
|
#if defined(XV6_USER)
|
||
|
// xv6 doesn't define errno.
|
||
|
int errno = 0;
|
||
|
#endif
|
||
|
posix_spawn_file_actions_t actions;
|
||
|
if ((errno = posix_spawn_file_actions_init(&actions)))
|
||
|
edie("posix_spawn_file_actions_init failed");
|
||
|
if (stdin >= 0)
|
||
|
if ((errno = posix_spawn_file_actions_adddup2(&actions, stdin, 0)))
|
||
|
edie("posix_spawn_file_actions_adddup2 failed");
|
||
|
if (stdout >= 0)
|
||
|
if ((errno = posix_spawn_file_actions_adddup2(&actions, stdout, 1)))
|
||
|
edie("posix_spawn_file_actions_adddup2 failed");
|
||
|
if ((errno = posix_spawn(&pid_, argv[0], &actions, nullptr,
|
||
|
const_cast<char *const*>(argv), environ)))
|
||
|
edie("posix_spawn failed");
|
||
|
if ((errno = posix_spawn_file_actions_destroy(&actions)))
|
||
|
edie("posix_spawn_file_actions_destroy failed");
|
||
|
} else {
|
||
|
pid_ = fork();
|
||
|
if (pid_ < 0)
|
||
|
edie("fork failed");
|
||
|
if (pid_ == 0) {
|
||
|
// Note that this doesn't handle the case where stdin/stdout are
|
||
|
// 0/1 and have O_CLOEXEC set, but that never happens here.
|
||
|
if (stdin >= 0 && dup2(stdin, 0) < 0)
|
||
|
edie("dup2 stdin failed");
|
||
|
if (stdout >= 0 && dup2(stdout, 1) < 0)
|
||
|
edie("dup2 stdout failed");
|
||
|
execv(argv[0], const_cast<char *const*>(argv));
|
||
|
edie("execv %s failed", argv[0]);
|
||
|
}
|
||
|
}
|
||
|
}
|
||
|
|
||
|
void wait_child()
|
||
|
{
|
||
|
if (pool_) {
|
||
|
close(msgpipe[1]);
|
||
|
close(respipe[0]);
|
||
|
}
|
||
|
|
||
|
int status;
|
||
|
if (waitpid(pid_, &status, 0) < 0)
|
||
|
edie("waitpid failed");
|
||
|
if (!WIFEXITED(status) || WEXITSTATUS(status))
|
||
|
die("deliver failed: status %d", status);
|
||
|
pid_ = 0;
|
||
|
}
|
||
|
|
||
|
public:
|
||
|
deliverer(const string &mailroot, bool pool)
|
||
|
: pid_(0), mailroot_(mailroot), pool_(pool) { }
|
||
|
|
||
|
~deliverer()
|
||
|
{
|
||
|
if (pid_)
|
||
|
wait_child();
|
||
|
}
|
||
|
|
||
|
void
|
||
|
deliver(const string &recipient, int msgfd)
|
||
|
{
|
||
|
if (pool_) {
|
||
|
if (pid_ && recipient != pool_recipient_)
|
||
|
// Restart child if recipient changed
|
||
|
wait_child();
|
||
|
|
||
|
if (!pid_) {
|
||
|
// Start batch-mode deliver process
|
||
|
const char *argv[] = {EP("mail-deliver"), "-b", mailroot_.c_str(),
|
||
|
recipient.c_str(), nullptr};
|
||
|
pool_recipient_ = recipient;
|
||
|
//if (pipe2(msgpipe, O_CLOEXEC|O_ANYFD) < 0)
|
||
|
if (pipe2(msgpipe, O_CLOEXEC) < 0)
|
||
|
edie("pipe msgpipe failed");
|
||
|
//if (pipe2(respipe, O_CLOEXEC|O_ANYFD) < 0)
|
||
|
if (pipe2(respipe, O_CLOEXEC) < 0)
|
||
|
edie("pipe respipe failed");
|
||
|
start_child(argv, msgpipe[0], respipe[1]);
|
||
|
close(msgpipe[0]);
|
||
|
close(respipe[1]);
|
||
|
}
|
||
|
|
||
|
// Get length of message
|
||
|
uint64_t msg_len;
|
||
|
if ((msg_len = fd_len(msgfd)) < 0)
|
||
|
edie("mail-qman: failed to get length of message");
|
||
|
|
||
|
// Deliver message to running deliver process
|
||
|
xwrite(msgpipe[1], &msg_len, sizeof msg_len);
|
||
|
ssize_t r = copy_fd_n(msgpipe[1], msgfd, msg_len);
|
||
|
if (r < 0)
|
||
|
edie("mail-qman: copy_fd_n to mail-deliver failed");
|
||
|
else if ((uint64_t)r < msg_len)
|
||
|
die("mail-qman: short write to mail-deliver (%zd < %zu)",
|
||
|
r, (size_t)msg_len);
|
||
|
|
||
|
// Get batch-mode response
|
||
|
uint64_t res;
|
||
|
if (xread(respipe[0], &res, sizeof res) != sizeof res)
|
||
|
die("mail-qman: short read of deliver result code");
|
||
|
if (res != 0)
|
||
|
die("mail-qman: mail-deliver returned status %d", (int)res);
|
||
|
} else {
|
||
|
const char *argv[] = {EP("mail-deliver"), mailroot_.c_str(),
|
||
|
recipient.c_str(), nullptr};
|
||
|
start_child(argv, msgfd, -1);
|
||
|
wait_child();
|
||
|
}
|
||
|
}
|
||
|
};
|
||
|
|
||
|
static void
|
||
|
do_process(spool_reader *spool, const string &mailroot, bool pool,
|
||
|
int nthread, int cpu)
|
||
|
{
|
||
|
deliverer d{mailroot, pool};
|
||
|
while (true) {
|
||
|
string id = spool->dequeue();
|
||
|
if (id == "EXIT") {
|
||
|
spool->exit_others(cpu, nthread);
|
||
|
return;
|
||
|
}
|
||
|
if (id == "EXIT2")
|
||
|
return;
|
||
|
string recip = spool->get_recipient(id);
|
||
|
int msgfd = spool->open_message(id);
|
||
|
d.deliver(recip, msgfd);
|
||
|
close(msgfd);
|
||
|
spool->remove(id);
|
||
|
}
|
||
|
}
|
||
|
|
||
|
static void
|
||
|
usage(const char *argv0)
|
||
|
{
|
||
|
fprintf(stderr, "Usage: %s [options] spooldir mailroot nthread\n", argv0);
|
||
|
fprintf(stderr, " -a none Use regular APIs (default)\n");
|
||
|
fprintf(stderr, " all Use alternate APIs\n");
|
||
|
fprintf(stderr, " -p Use pooled mail-deliver\n");
|
||
|
exit(2);
|
||
|
}
|
||
|
|
||
|
int
|
||
|
main(int argc, char **argv)
|
||
|
{
|
||
|
int opt;
|
||
|
bool pool = false;
|
||
|
while ((opt = getopt(argc, argv, "a:p")) != -1) {
|
||
|
switch (opt) {
|
||
|
case 'a':
|
||
|
if (strcmp(optarg, "all") == 0) {
|
||
|
errx(-1, "alt interface not supported");
|
||
|
alt = true;
|
||
|
} else if (strcmp(optarg, "none") == 0)
|
||
|
alt = false;
|
||
|
else
|
||
|
usage(argv[0]);
|
||
|
break;
|
||
|
case 'p':
|
||
|
pool = true;
|
||
|
break;
|
||
|
default:
|
||
|
usage(argv[0]);
|
||
|
}
|
||
|
}
|
||
|
|
||
|
if (argc - optind != 3)
|
||
|
usage(argv[0]);
|
||
|
|
||
|
const char *spooldir = argv[optind];
|
||
|
const char *mailroot = argv[optind+1];
|
||
|
int nthread = atoi(argv[optind+2]);
|
||
|
if (nthread <= 0)
|
||
|
usage(argv[0]);
|
||
|
|
||
|
spool_reader reader{spooldir};
|
||
|
|
||
|
thread *threads = new thread[nthread];
|
||
|
|
||
|
for (int i = 0; i < nthread; ++i) {
|
||
|
//setaffinity(i);
|
||
|
threads[i] = std::move(thread(do_process, &reader, mailroot, pool,
|
||
|
nthread, i));
|
||
|
}
|
||
|
|
||
|
for (int i = 0; i < nthread; ++i)
|
||
|
threads[i].join();
|
||
|
return 0;
|
||
|
}
|