Files
cworker/cworker.c
2024-01-02 03:28:26 +02:00

399 lines
10 KiB
C

/*
*
* Copyright 2023 Oleg Borodin <borodin@unix7.org>
*
*/
#ifdef __linux__
#define _GNU_SOURCE
#endif
#include <sys/types.h>
#include <sys/stat.h>
#include <sys/wait.h>
#include <sys/time.h>
//#include <sys/event.h>
#include <sys/socket.h>
#include <arpa/inet.h>
#include <netinet/in.h>
#include <limits.h>
#include <unistd.h>
#include <stdio.h>
#include <stdlib.h>
#include <fcntl.h>
#include <string.h>
#include <errno.h>
#include <pthread.h>
#include <poll.h>
#include <signal.h>
#include <cllexer.h>
#include <clparser.h>
#include <cflexer.h>
#include <cfparser.h>
#include <massert.h>
#include <rcache.h>
#include <jlexer.h>
#include <jparser.h>
#include <jblock.h>
#include <config.h>
#include <cworker.h>
#include <logger.h>
/* Port assigned to a worker in cworker_init() before the config file and options are read. */
static const int64_t default_port = 9002;
/* Worker registered by cworker_init(); sighandler() passes it to cworker_shutdown() on SIGTERM. */
static cworker_t* pworker = NULL;
/* Forward declarations of helpers defined further down in this file. */
static int mkdirall(const char* path, mode_t mode);
void sighandler(int signum);
/*
 * Read the configuration file at srv_configpath.
 *
 * The file is parsed with the cf lexer/parser pair and, on success, the
 * "port" and "nofork" keys are bound to the matching fields of *worker.
 *
 * NOTE: `worker` is declared const, yet the bind calls receive writable
 * pointers to its fields, so the caller must pass a modifiable object.
 *
 * Returns 0 on success, -1 when the file cannot be opened or parsed.
 * The parser, lexer, cache and the file descriptor are released on every
 * path once the file has been opened.
 */
static int cworker_readconf(const cworker_t* worker) {
    log_debug("Configuration reading");

    int conffd = open(srv_configpath, O_RDONLY);
    if (conffd < 0) {
        log_error("Configuration file opening error: %s %s", strerror(errno), srv_configpath);
        return -1;
    }

    int res = 0;
    rcache_t cache;
    cflexer_t lexer;
    cfparser_t parser;

    rcache_init(&cache, conffd);
    cflexer_init(&lexer, &cache);
    cfparser_init(&parser, &lexer);

    if (cfparser_parse(&parser) < 0) {
        log_error("Configuration parsing error");
        res = -1;
    } else {
        cfparser_bind(&parser, CFVALTYPE_INT, "port", (void *)&(worker->port));
        cfparser_bind(&parser, CFVALTYPE_BOOL, "nofork", (void *)&(worker->nofork));
    }

    cfparser_destroy(&parser);
    cflexer_destroy(&lexer);
    rcache_destroy(&cache);

    /*
     * The descriptor is owned here: cworker_run() likewise closes the socket
     * it handed to an rcache after the cache was destroyed, so rcache is
     * presumed not to close its descriptor. NOTE(review): confirm in rcache.
     */
    close(conffd);
    return res;
}
/*
 * Parse command-line options into *worker.
 *
 * The "port" and "nofork" options are bound to the worker fields first, then
 * the arguments after argv[0] are handed to the parser.
 *
 * NOTE: `worker` is declared const, yet the bind calls receive writable
 * pointers to its fields, so the caller must pass a modifiable object.
 *
 * Returns 0 on success, -1 when parsing fails. The parser is destroyed on
 * both paths.
 */
static int cworker_readopts(const cworker_t* worker, char** argv, int argc) {
    log_debug("Reading options");

    cllexer_t lexer;
    clparser_t parser;
    cllexer_init(&lexer);
    clparser_init(&parser, &lexer);

    clparser_bind(&parser, CLVALTYPE_INT, "port", (void *)&(worker->port));
    clparser_bind(&parser, CLVALTYPE_BOOL, "nofork", (void *)&(worker->nofork));

    int res = 0;
    if (clparser_parse(&parser, &argv[1], argc - 1) < 0) {
        log_error("Args parsing error");
        res = -1;
    }

    /* Release the parser whether or not parsing succeeded. */
    clparser_destroy(&parser);
    return res;
}
/*
 * Redirect stdout and stderr to the log file at srv_logpath.
 *
 * Does nothing when worker->nofork is set. Otherwise the directories leading
 * to the log file are created (a failure there is only logged, the open()
 * below reports the real problem), the file is opened in append mode and
 * duplicated onto STDOUT_FILENO and STDERR_FILENO.
 *
 * worker->logfd is written only after everything succeeded, so on failure it
 * keeps its previous value and cworker_shutdown() never sees a negative
 * descriptor.
 *
 * Returns 0 on success, -1 on open() or dup2() failure.
 */
static int cworker_openlog(cworker_t* worker) {
    if (worker->nofork) {
        return 0;
    }

    log_debug("Redirect output");
    if (mkdirall(srv_logpath, S_IRWXU|S_IRGRP|S_IXGRP) < 0) {
        log_error("Log dir creating error: %s %s", strerror(errno), srv_logpath);
    }

    int fd = open(srv_logpath, O_WRONLY|O_APPEND|O_CREAT, S_IRUSR|S_IWUSR|S_IRGRP);
    if (fd < 0) {
        log_error("Log file opening error: %s %s", strerror(errno), srv_logpath);
        return -1;
    }

    if (dup2(fd, STDOUT_FILENO) < 0 || dup2(fd, STDERR_FILENO) < 0) {
        log_error("Log redirecting error: %s %s", strerror(errno), srv_logpath);
        close(fd);
        return -1;
    }

    worker->logfd = fd;
    return 0;
}
/*
 * Store the current process ID in the file at srv_runpath.
 *
 * Skipped entirely when worker->nofork is set. The directories leading to the
 * file are created first; a failure there is logged and the function carries
 * on to the open() call. The file is truncated and the PID is written as a
 * decimal number without a trailing newline.
 *
 * Returns 0 on success (or when skipped), -1 when the file cannot be opened
 * or written.
 */
int cworker_writepid(const cworker_t* worker) {
    if (worker->nofork) {
        return 0;
    }

    log_debug("Write pid file");
    if (mkdirall(srv_runpath, S_IRWXU|S_IRGRP|S_IXGRP) < 0) {
        log_error("Run directory creating error: %s %s", strerror(errno), srv_runpath);
    }

    const int fd = open(srv_runpath, O_WRONLY|O_TRUNC|O_CREAT, S_IRUSR|S_IWUSR|S_IRGRP);
    if (fd < 0) {
        log_error("Process ID file opening error: %s %s", strerror(errno), srv_runpath);
        return -1;
    }

    int status = 0;
    if (dprintf(fd, "%d", getpid()) < 0) {
        log_error("Process ID file writing error: %s %s", strerror(errno), srv_runpath);
        status = -1;
    }

    /* The descriptor is closed on both the success and the failure path. */
    close(fd);
    return status;
}
/*
 * Initialise the logger and the worker.
 *
 * Order of operations:
 *   1. defaults: port = default_port, nofork = true, socket = logfd = 0
 *      (0 is the "not open" marker checked by cworker_shutdown());
 *   2. the worker is registered in `pworker` for the signal handler;
 *   3. the configuration file is read;
 *   4. the command-line options are read;
 *   5. when nofork ended up false, output is redirected to the log file.
 *
 * Returns 0 on success, -1 as soon as one of the steps fails.
 */
int cworker_init(cworker_t* worker, char** argv, int argc) {
    log_init();
    log_debug("Init service");

    worker->port = default_port;
    worker->nofork = true;
    worker->socket = 0;
    worker->logfd = 0;
    pworker = worker;

    if (cworker_readconf(worker) < 0) {
        log_error("Config reading error");
        return -1;
    }
    if (cworker_readopts(worker, argv, argc) < 0) {
        log_error("Options reading error");
        return -1;
    }
    if (!worker->nofork) {
        if (cworker_openlog(worker) < 0) {
            log_error("Log opening error");
            return -1;
        }
    }

    /*
     * The port is initialised from an int64_t constant; the cast makes the
     * argument match "%d" whatever the width of the field is.
     */
    log_debug("Listening port: %d", (int)worker->port);
    return 0;
}
/*
 * Fork the service into the background.
 *
 * Nothing happens when worker->nofork is set. Otherwise the process forks:
 * the parent terminates with exit(0) and the child returns 0 to the caller.
 *
 * Returns 0 in the child (or when no fork was requested), -1 when fork()
 * fails.
 */
int cworker_detach(const cworker_t* worker) {
    if (worker->nofork) {
        return 0;
    }

    log_debug("Service detaching");
    int pid = fork();
    if (pid < 0) {
        log_error("Fork error %d: %s ", errno, strerror(errno));
        return -1;
    }
    if (pid != 0) {
        /* Parent: its job is done once the child exists. */
        exit(0);
    }

    /* Child: continues as the service process. */
    log_debug("Service forked");
    return 0;
}
/*
 * Install the signal dispositions and create the listening TCP socket.
 *
 * SIGCHLD is ignored; SIGHUP and SIGTERM are routed to sighandler(). The
 * socket is created with SO_REUSEADDR, bound to INADDR_ANY on worker->port
 * and put into listening state with a backlog of 100.
 *
 * worker->socket is written only after listen() succeeded. On any failure
 * the socket created here is closed and worker->socket keeps its previous
 * value, so cworker_shutdown() never sees a half-built or negative descriptor.
 *
 * Returns 0 on success, -1 on an out-of-range port or a failing system call.
 */
int cworker_build(cworker_t* worker) {
    log_debug("Service building");

    signal(SIGCHLD, SIG_IGN);
    signal(SIGHUP, sighandler);
    signal(SIGTERM, sighandler);

    /* htons() would silently truncate anything outside 16 bits. */
    long long port = (long long)worker->port;
    if (port < 0 || port > 65535) {
        log_error("Invalid listening port %lld", port);
        return -1;
    }

    int optval = 1;
    int backlog = 100;
    struct sockaddr_in addr;

    int fd = socket(PF_INET, SOCK_STREAM, 0);
    if (fd < 0) {
        log_error("Socket creating error %d: %s", errno, strerror(errno));
        return -1;
    }

    if (setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &optval, sizeof(optval)) < 0) {
        log_error("Socket option setting error %d: %s", errno, strerror(errno));
        goto fail;
    }

    /* Zero the whole structure so that padding (sin_zero) is well defined. */
    memset(&addr, 0, sizeof(addr));
    addr.sin_family = AF_INET;
    addr.sin_addr.s_addr = htonl(INADDR_ANY);
    addr.sin_port = htons((uint16_t)port);

    if (bind(fd, (struct sockaddr*)&addr, sizeof(addr)) < 0) {
        log_error("Socket binding error %d: %s ", errno, strerror(errno));
        goto fail;
    }

    if (listen(fd, backlog) < 0) {
        log_error("Socket listening error %d: %s ", errno, strerror(errno));
        goto fail;
    }

    worker->socket = fd;
    return 0;

fail:
    close(fd);
    return -1;
}
int cworker_handler(const cworker_t* worker, int socket);
int cworker_run(const cworker_t* worker) {
log_debug("Service running");
while (1) {
int newsocket = 0;
if ((newsocket = accept(worker->socket, NULL, 0)) > 3) {
int childpid = -1;
if((childpid = fork()) < 0) {
log_error("Fork error %d: %s ", errno, strerror(errno));
continue;
}
if (childpid == 0) {
/* Child */
log_debug("Service %d forked", getpid());
signal(SIGHUP, SIG_IGN);
signal(SIGTERM, SIG_IGN);
struct timeval tv = { .tv_sec = 5, .tv_usec = 0 };
if (setsockopt(newsocket, SOL_SOCKET, SO_RCVTIMEO, &tv, sizeof(struct timeval)) < 0) {
log_debug("Socket option setting error %d: %s ", errno, strerror(errno));
return -1;
}
int res = cworker_handler(worker, newsocket);
log_debug("Handler %d done with res code %d", getpid(), res);
close(newsocket);
return 0;
} else {
/* Parent */
close(newsocket);
}
}
}
close(worker->socket);
return 0;
}
/*
 * Serve one connection: read a JSON request from `socket` and answer it.
 *
 * The request is parsed and the keys "id", "timeout" and "name" are bound
 * to local variables (a failed bind is logged and the default 0 / 0 /
 * "none" is kept). The handler then sleeps `timeout` seconds, one second at
 * a time, and writes a JSON object with the members
 * "message" ("hello, <name>!"), "error" (false), "id" and "timeout".
 *
 * `worker` is not used. The socket is not closed here.
 *
 * Returns 0 when the complete reply was written, -1 when the request could
 * not be parsed, the reply could not be built, or the write failed or was
 * short.
 *
 * NOTE(review): `timeout` comes straight from the peer and is not capped;
 * whether the string bound to `name` must be freed by the caller depends on
 * jparser_bind() and is not visible here.
 */
int cworker_handler(const cworker_t* worker, int socket) {
    (void)worker;

    /* Everything released at `done` is declared (and nulled) up front. */
    int err = -1;
    int64_t id = 0;
    int64_t timeout = 0;
    char* name = "none";
    char* msg = NULL;
    char* jsonstr = NULL;
    jblock_t jb;
    rcache_t cache;
    jlexer_t lexer;
    jparser_t parser;

    rcache_init(&cache, socket);
    jlexer_init(&lexer, &cache);
    jparser_init(&parser, &lexer);

    if (jparser_parse(&parser) < 0) {
        log_error("Cannot parse json");
        goto done;
    }

    if (jparser_bind(&parser, JVALTYPE_INTEG, "id", (void *)&id) < 0) {
        log_error("Cannot bind id");
    }
    if (jparser_bind(&parser, JVALTYPE_INTEG, "timeout", (void *)&timeout) < 0) {
        log_error("Cannot bind timeout");
    }
    if (jparser_bind(&parser, JVALTYPE_STR, "name", (void *)&name) < 0) {
        log_error("Cannot bind name");
    }

    if (asprintf(&msg, "hello, %s!", name != NULL ? name : "none") < 0) {
        /* On failure the content of msg is undefined: make it safe to free. */
        msg = NULL;
        log_error("Cannot allocate message");
        goto done;
    }

    jblock_init(&jb);
    jblock_addstr(&jb, "message", msg);
    jblock_addbool(&jb, "error", false);
    jblock_addint(&jb, "id", id);
    jblock_addint(&jb, "timeout", timeout);

    for (int64_t i = 0; i < timeout; i++) {
        log_debug("The handler slept for %lld second from %lld",
                  (long long)(i + 1), (long long)timeout);
        sleep(1);
    }

    jblock_outjson(&jb, &jsonstr);
    jblock_destroy(&jb);
    if (jsonstr == NULL) {
        log_error("Cannot build json");
        goto done;
    }

    size_t jsize = strlen(jsonstr);
    ssize_t wsize = write(socket, jsonstr, jsize);
    if (wsize < 0) {
        log_error("Write error: %s", strerror(errno));
    } else if ((size_t)wsize != jsize) {
        log_error("Wrote only %zu from %zu", (size_t)wsize, jsize);
    } else {
        err = 0;
    }

done:
    free(jsonstr);
    free(msg);
    jparser_destroy(&parser);
    jlexer_destroy(&lexer);
    rcache_destroy(&cache);
    return err;
}
/*
 * Release what the worker holds and shut the logger down.
 *
 * Closes the listening socket and the log descriptor when they are open,
 * resets both fields to 0 (the "not open" marker set by cworker_init()),
 * waits for a child with wait(NULL) and finally calls log_destroy().
 *
 * Each descriptor is reset through its own field, so a second call does not
 * close anything twice.
 */
void cworker_shutdown(cworker_t* worker) {
    log_warning("Shutdown service %d", (int)getpid());

    if (worker->socket > 0) {
        close(worker->socket);
    }
    worker->socket = 0;

    if (worker->logfd > 0) {
        close(worker->logfd);
    }
    worker->logfd = 0;

    wait(NULL);
    log_destroy();
}
/*
 * Signal handler installed for SIGHUP and SIGTERM by cworker_build().
 *
 * Every signal is logged. SIGHUP is only logged. On SIGTERM, when a worker
 * has been registered in `pworker`, the worker is shut down, the handler
 * sleeps one second and the process exits with status 0; without a
 * registered worker the handler just returns.
 *
 * NOTE(review): logging, cworker_shutdown() and exit() are called from
 * signal context; POSIX does not list exit() as async-signal-safe, and the
 * safety of the logger is not visible here -- consider a flag-based design.
 */
void sighandler(int signum) {
    log_warning("Handle signal %d", signum);

    switch (signum) {
    case SIGHUP:
        log_warning("Handle HUP signal");
        break;

    case SIGTERM:
        log_warning("Handle TERM signal");
        if (pworker == NULL) {
            break;
        }
        cworker_shutdown(pworker);
        sleep(1);
        exit(0);

    default:
        break;
    }
}
/*
 * Create every directory that leads up to the last component of `path`.
 *
 * `path` is treated as a file path: for "/var/log/app/out.log" the
 * directories "/var", "/var/log" and "/var/log/app" are created, while the
 * final component ("out.log") is left alone. A path without any '/' after
 * its first character creates nothing. Directories that already exist are
 * not an error.
 *
 * path  file path whose parent directories are wanted
 * mode  permission bits handed to mkdir() for each created directory
 *
 * Returns 0 on success (including the empty path and "/").
 * Returns -1 with errno set on failure:
 *   EINVAL        path is NULL
 *   ENAMETOOLONG  path is longer than PATH_MAX
 *   otherwise     whatever mkdir() reported
 */
static int mkdirall(const char* path, mode_t mode) {
    if (path == NULL) {
        errno = EINVAL;
        return -1;
    }

    size_t psize = strlen(path);
    if (psize > (size_t)PATH_MAX) {
        /* Callers print strerror(errno), so give them something meaningful. */
        errno = ENAMETOOLONG;
        return -1;
    }
    if (psize == 0 || (psize == 1 && path[0] == '/')) {
        return 0;
    }

    /*
     * Work on one private copy and cut it temporarily at each separator
     * instead of re-copying the prefix every time.
     */
    char buffer[PATH_MAX + 1];
    memcpy(buffer, path, psize + 1);

    /* Index 0 is skipped so that a leading '/' never yields an empty name. */
    for (size_t i = 1; i < psize; i++) {
        if (buffer[i] != '/') {
            continue;
        }
        buffer[i] = '\0';
        if (mkdir(buffer, mode) < 0 && errno != EEXIST) {
            return -1;
        }
        buffer[i] = '/';
    }
    return 0;
}