2007年7月 6日
C++と Pthreads でミニマルなHTTPサーバを書く
『UNIXネットワークプログラミング』を読んでいると、自分でも何かネットワーク系の小さなプログラムを書いてみたくなりました。そこで、ミニマルなHTTPサーバを C++と Pthreads で書いてみました。
同じ著者の「詳解UNIXプログラミング」もそうだったように、今回の本もほとんどすべてのページに、重要なことが書かれています(最後のほうのXTIの部分は例外かもしれませんが)。
たとえば、27章ではネットワークサーバの実装として、次の設計方針がそれぞれ検討され、実際のコード付きで解説されています。
- クライアントごとに fork
- 事前に fork - 各プロセスで accept
- 事前に fork - ファイルロックで accept を保護
- 事前に fork - Mutex ロックで accept を保護 (PTHREAD_PROCESS_SHARED)
- 事前に fork - ソケットディスクリプタパッシング
- クライアントごとにスレッド生成
- 事前にスレッド生成 - Mutex ロックで accept を保護
- 事前にスレッド生成 - メインスレッドで accept
今回、実装した HTTPサーバは最後の「事前にスレッド生成 - メインスレッドで accept」の方針をとりました。メインのスレッドがクライアントからの接続を受け付けてキューに入れて、スレッドプールのワーカースレッドに実際の処理をやらせる、という方式です。
以下がそのコードです。書籍を参考にしながら、ネットワーク部分のコードを書いてみるのが目的だったので、HTTPヘッダの解析などの処理は省きました。動作としては、クライアントから受け取った HTTPリクエストをそのままエコー(鸚鵡返し)するだけのサーバです。
単体では何の実用性もありませんが、欠けている部分の処理を加えていけば簡単な埋め込みHTTPサーバくらいにはなるかもしれません。
追記
y-hamigakiさんから、大きな問題を2点、ご指摘いただきました。SIGPIPE を無視するコードと、accept()がエラーを返してもリトライするコードを追加しておきました。ご指摘、ありがとうございました。
#include <arpa/inet.h>
#include <assert.h>
#include <errno.h>
#include <pthread.h>
#include <signal.h>
#include <stdarg.h>
#include <sys/socket.h>
#include <sys/select.h>
#include <unistd.h>
#include <deque>
#include <queue>
#include <sstream>
#include <string>
#include <vector>
using namespace std;
// The MiniHTTPServer is yet another minimal HTTP server.
// It just echo backs the HTTP reques header. Uses threads
// for handling concurrent requests.
class MiniHTTPServer {
public:
MiniHTTPServer(int port, int num_threads);
// Starts the server in another thread. The function
// immediately returns upon success. On error, the
// program aborts.
void Start();
int port() const { return port_; }
int num_threads() const { return num_threads_; }
// Waits for the server to finish. Will not return.
void Wait();
private:
int port_;
int num_threads_;
pthread_t listner_thread_;
};
// Logging macro, unlocked. Thread ID, file name, line
// number, will be prefixed. Example:
// 32771 httpserver.cc:173: Thread started
#define LOG_UNLOCKED(...) {\
const int thread_id = static_cast<int>(pthread_self()); \
fprintf(stderr, "%08x %s:%d: ", thread_id, __FILE__, __LINE__); \
fprintf(stderr, __VA_ARGS__); \
fprintf(stderr, "\n"); \
}
// Logging macro, guarded by mutex.
static pthread_mutex_t log_mutex = PTHREAD_MUTEX_INITIALIZER;
#define LOG(...) { \
PTH_ASSERT(pthread_mutex_lock(&log_mutex)); \
LOG_UNLOCKED(__VA_ARGS__); \
PTH_ASSERT(pthread_mutex_unlock(&log_mutex)); \
}
// Sudden death error handling for system calls.
#define SYS_ASSERT(expression) { \
if ((expression) < 0) { \
LOG("%s: %s", #expression, strerror(errno)); \
abort(); \
} \
}
// Sudden death error handling for pthread routines.
#define PTH_ASSERT(expression) { \
const int status = (expression); \
if (status != 0) { \
LOG_UNLOCKED("%s: %s",#expression, strerror(status)); \
abort(); \
} \
} \
// Writes "str" to "fd". Returns true on success.
static bool WriteString(const int fd, const string& str) {
size_t num_bytes_written = 0;
while (num_bytes_written < str.size()) {
ssize_t len;
do {
// TODO: timeout is not handled here.
len = write(fd, str.data() + num_bytes_written,
str.size() - num_bytes_written);
} while (len < 0 && errno == EINTR);
if (len < 0) { // Ignore errors.
return false;
}
num_bytes_written += len;
}
return true;
}
// Waits until "fd" is readable, with timeout. Returns true
// if "fd" becomes readable within the timeout. On error or
// timeout, returns false.
static bool WaitUntilFileDescriptorIsReadable(const int fd) {
struct timeval timeout = { 5, 0 }; // Timeout in 5 secs.
fd_set read_fds;
FD_ZERO(&read_fds);
FD_SET(fd, &read_fds);
const int num_ready_fds = select(fd + 1, &read_fds, NULL, NULL, &timeout);
if (num_ready_fds < 0) {
LOG("%s", strerror(errno));
return false;
} else if (num_ready_fds == 0) {
LOG("Connection timed out");
return false;
} else if (!FD_ISSET(fd, &read_fds)) {
LOG("Should not reach");
abort();
}
return true;
}
// Reads HTTP request header from "fd" and stores it in "out".
// Returns true on success.
// TODO: Should limit number of bytes to read, to be safe.
static bool ReadRequest(const int fd, string *out) {
out->clear();
bool status = false;
char buf[1024];
while (true) {
ssize_t len;
do {
if (!WaitUntilFileDescriptorIsReadable(fd)) {
return false;
}
len = read(fd, buf, sizeof(buf));
} while (len < 0 && errno == EINTR);
if (len < 0 || // Error other than EINTR.
len == 0) { // Reached EOF.
status = false;
break;
}
out->append(buf, len);
// Check if the request header ends properly.
// Assume the request is GET. TODO: Others are not supported.
if (out->size() >= 4 &&
out->substr(out->size() - 4) == "\r\n\r\n") {
status = true;
break;
}
}
if (status == false) {
LOG("Malformed HTTP request");
}
return status;
}
// The max number of queued connections, used for the 2nd
// parameter to listen(). On linux, the number can be obtained by:
// % cat /proc/sys/net/ipv4/tcp_max_syn_backlog
const int LISTEN_QUEUE_SIZE = 1024;
// Implementation of the HTTP server.
class MiniHTTPServerImpl {
public:
typedef queue<int, deque<int> > Queue;
MiniHTTPServerImpl(const int port, const int num_threads) :
port_(port),
num_threads_(num_threads) {
PTH_ASSERT(pthread_mutex_init(&queue_mutex_, NULL));
PTH_ASSERT(pthread_cond_init(&queue_cond_, NULL));
}
void Start() {
int listen_fd;
SYS_ASSERT(listen_fd = socket(PF_INET, SOCK_STREAM, 0));
// Prepare listen port and the file descriptor for it.
struct sockaddr_in server_addr = {}; // Zero-clear.
server_addr.sin_family = AF_INET;
server_addr.sin_addr.s_addr = htonl(INADDR_ANY);
server_addr.sin_port = htons(port_);
int so_reuseaddr = 1;
SYS_ASSERT(setsockopt(listen_fd, SOL_SOCKET, SO_REUSEADDR, &so_reuseaddr,
sizeof(so_reuseaddr)));
SYS_ASSERT(bind(listen_fd, (sockaddr*) &server_addr, sizeof(server_addr)));
SYS_ASSERT(listen(listen_fd, LISTEN_QUEUE_SIZE));
// Create worker threads.
threads_ = new pthread_t[num_threads_]; // Will not be deleted.
for (int i = 0; i < num_threads_; ++i) {
PTH_ASSERT(pthread_create(&threads_[i], NULL, &WorkerThread, this));
}
// The accept loop.
while (true) {
int conn_fd;
while (true) {
conn_fd = accept(listen_fd, NULL, NULL);
if (conn_fd < 0) {
LOG("accept() error: %s", strerror(errno));
// Retrying...
} else {
break;
}
}
PTH_ASSERT(pthread_mutex_lock(&queue_mutex_));
// TODO: the queue size is unlimited (not safe).
queue_.push(conn_fd);
// Notify that the file descriptor is added to the queue.
PTH_ASSERT(pthread_cond_signal(&queue_cond_));
PTH_ASSERT(pthread_mutex_unlock(&queue_mutex_));
}
// Not reached.
}
pthread_mutex_t *queue_mutex() { return &queue_mutex_; }
pthread_cond_t *queue_cond() { return &queue_cond_; }
Queue *queue() { return &queue_; }
private:
int port_;
int num_threads_;
pthread_t *threads_;
Queue queue_;
pthread_mutex_t queue_mutex_;
pthread_cond_t queue_cond_;
static void *WorkerThread(void *ptr) {
LOG("Thread started");
MiniHTTPServerImpl *server = reinterpret_cast<MiniHTTPServerImpl*>(ptr);
while (true) {
PTH_ASSERT(pthread_mutex_lock(server->queue_mutex()));
// Wait until a file descriptor gets added.
while (server->queue()->empty()) {
PTH_ASSERT(pthread_cond_wait(server->queue_cond(),
server->queue_mutex()));
}
LOG("Connection arrived");
// Get the incoming file descriptor.
const int conn_fd = server->queue()->front();
server->queue()->pop();
PTH_ASSERT(pthread_mutex_unlock(server->queue_mutex()));
// The following part will be executed concurrently.
string request;
if (ReadRequest(conn_fd, &request)) {
ostringstream header;
header << "HTTP/1.1 200 OK\r\n";
header << "Content-Type: text/plain\r\n";
header << "Content-Length: " << request.size() << "\r\n";
header << "Connection: close\r\n";
header << "\r\n";
// Just echo back the request header.
const string response = header.str() + request;
if (WriteString(conn_fd, response)) {
LOG("Response sent successfully");
} else {
LOG("Error while sending data");
}
}
SYS_ASSERT(close(conn_fd));
}
return NULL;
}
};
static void *AcceptorThread(void *ptr) {
MiniHTTPServer *server = reinterpret_cast<MiniHTTPServer*>(ptr);
MiniHTTPServerImpl impl(server->port(), server->num_threads());
impl.Start(); // Blocks forever.
// Not reached.
return NULL;
}
MiniHTTPServer::MiniHTTPServer(const int port, const int num_threads) :
port_(port),
num_threads_(num_threads) {
}
void MiniHTTPServer::Start() {
// Start the server in another thread.
PTH_ASSERT(pthread_create(&listner_thread_, NULL, &AcceptorThread, this));
}
void MiniHTTPServer::Wait() {
PTH_ASSERT(pthread_join(listner_thread_, NULL));
// Not reached.
}
int main () {
// Set signal handler to ignore SIGPIPE.
struct sigaction sa = {}; // Zero-clear.
sa.sa_handler = SIG_IGN;
sigemptyset(&sa.sa_mask);
SYS_ASSERT(sigaction(SIGPIPE, &sa, NULL));
// Use port 8080 and 10 threads.
MiniHTTPServer http_server(8080, 10);
http_server.Start();
http_server.Wait();
return 0;
}
短く書くつもりが割と長くなってしまいました。。
UNIXネットワークプログラミング〈Vol.1〉ネットワークAPI:ソケットとXTI
posted with amazlet on 07.07.06
W.リチャード スティーヴンス W.Richard Stevens 篠田 陽一
ピアソンエデュケーション (2000/04)
売り上げランキング: 133969
ピアソンエデュケーション (2000/04)
売り上げランキング: 133969
