2007年7月 6日
C++と Pthreads でミニマルなHTTPサーバを書く
『UNIXネットワークプログラミング』を読んでいると、自分でも何かネットワーク系の小さなプログラムを書いてみたくなりました。そこで、ミニマルなHTTPサーバを C++と Pthreads で書いてみました。
同じ著者の「詳解UNIXプログラミング」もそうだったように、今回の本もほとんどすべてのページに、重要なことが書かれています(最後のほうのXTIの部分は例外かもしれませんが)。
たとえば、27章ではネットワークサーバの実装として、次の設計方針がそれぞれ検討され、実際のコード付きで解説されています。
- クライアントごとに fork
- 事前に fork - 各プロセスで accept
- 事前に fork - ファイルロックで accept を保護
- 事前に fork - Mutex ロックで accept を保護 (PTHREAD_PROCESS_SHARED)
- 事前に fork - ソケットディスクリプタパッシング
- クライアントごとにスレッド生成
- 事前にスレッド生成 - Mutex ロックで accept を保護
- 事前にスレッド生成 - メインスレッドで accept
今回、実装した HTTPサーバは最後の「事前にスレッド生成 - メインスレッドで accept」の方針をとりました。メインのスレッドがクライアントからの接続を受け付けてキューに入れて、スレッドプールのワーカースレッドに実際の処理をやらせる、という方式です。
以下がそのコードです。書籍を参考にしながら、ネットワーク部分のコードを書いてみるのが目的だったので、HTTPヘッダの解析などの処理は省きました。動作としては、クライアントから受け取った HTTPリクエストをそのままエコー(鸚鵡返し)するだけのサーバです。
単体では何の実用性もありませんが、欠けている部分の処理を加えていけば簡単な埋め込みHTTPサーバくらいにはなるかもしれません。
追記
y-hamigakiさんから、大きな問題を2点、ご指摘いただきました。SIGPIPE を無視するコードと、accept()がエラーを返してもリトライするコードを追加しておきました。ご指摘、ありがとうございました。
#include <arpa/inet.h> #include <assert.h> #include <errno.h> #include <pthread.h> #include <signal.h> #include <stdarg.h> #include <sys/socket.h> #include <sys/select.h> #include <unistd.h> #include <deque> #include <queue> #include <sstream> #include <string> #include <vector> using namespace std; // The MiniHTTPServer is yet another minimal HTTP server. // It just echo backs the HTTP reques header. Uses threads // for handling concurrent requests. class MiniHTTPServer { public: MiniHTTPServer(int port, int num_threads); // Starts the server in another thread. The function // immediately returns upon success. On error, the // program aborts. void Start(); int port() const { return port_; } int num_threads() const { return num_threads_; } // Waits for the server to finish. Will not return. void Wait(); private: int port_; int num_threads_; pthread_t listner_thread_; }; // Logging macro, unlocked. Thread ID, file name, line // number, will be prefixed. Example: // 32771 httpserver.cc:173: Thread started #define LOG_UNLOCKED(...) {\ const int thread_id = static_cast<int>(pthread_self()); \ fprintf(stderr, "%08x %s:%d: ", thread_id, __FILE__, __LINE__); \ fprintf(stderr, __VA_ARGS__); \ fprintf(stderr, "\n"); \ } // Logging macro, guarded by mutex. static pthread_mutex_t log_mutex = PTHREAD_MUTEX_INITIALIZER; #define LOG(...) { \ PTH_ASSERT(pthread_mutex_lock(&log_mutex)); \ LOG_UNLOCKED(__VA_ARGS__); \ PTH_ASSERT(pthread_mutex_unlock(&log_mutex)); \ } // Sudden death error handling for system calls. #define SYS_ASSERT(expression) { \ if ((expression) < 0) { \ LOG("%s: %s", #expression, strerror(errno)); \ abort(); \ } \ } // Sudden death error handling for pthread routines. #define PTH_ASSERT(expression) { \ const int status = (expression); \ if (status != 0) { \ LOG_UNLOCKED("%s: %s",#expression, strerror(status)); \ abort(); \ } \ } \ // Writes "str" to "fd". Returns true on success. static bool WriteString(const int fd, const string& str) { size_t num_bytes_written = 0; while (num_bytes_written < str.size()) { ssize_t len; do { // TODO: timeout is not handled here. len = write(fd, str.data() + num_bytes_written, str.size() - num_bytes_written); } while (len < 0 && errno == EINTR); if (len < 0) { // Ignore errors. return false; } num_bytes_written += len; } return true; } // Waits until "fd" is readable, with timeout. Returns true // if "fd" becomes readable within the timeout. On error or // timeout, returns false. static bool WaitUntilFileDescriptorIsReadable(const int fd) { struct timeval timeout = { 5, 0 }; // Timeout in 5 secs. fd_set read_fds; FD_ZERO(&read_fds); FD_SET(fd, &read_fds); const int num_ready_fds = select(fd + 1, &read_fds, NULL, NULL, &timeout); if (num_ready_fds < 0) { LOG("%s", strerror(errno)); return false; } else if (num_ready_fds == 0) { LOG("Connection timed out"); return false; } else if (!FD_ISSET(fd, &read_fds)) { LOG("Should not reach"); abort(); } return true; } // Reads HTTP request header from "fd" and stores it in "out". // Returns true on success. // TODO: Should limit number of bytes to read, to be safe. static bool ReadRequest(const int fd, string *out) { out->clear(); bool status = false; char buf[1024]; while (true) { ssize_t len; do { if (!WaitUntilFileDescriptorIsReadable(fd)) { return false; } len = read(fd, buf, sizeof(buf)); } while (len < 0 && errno == EINTR); if (len < 0 || // Error other than EINTR. len == 0) { // Reached EOF. status = false; break; } out->append(buf, len); // Check if the request header ends properly. // Assume the request is GET. TODO: Others are not supported. if (out->size() >= 4 && out->substr(out->size() - 4) == "\r\n\r\n") { status = true; break; } } if (status == false) { LOG("Malformed HTTP request"); } return status; } // The max number of queued connections, used for the 2nd // parameter to listen(). On linux, the number can be obtained by: // % cat /proc/sys/net/ipv4/tcp_max_syn_backlog const int LISTEN_QUEUE_SIZE = 1024; // Implementation of the HTTP server. class MiniHTTPServerImpl { public: typedef queue<int, deque<int> > Queue; MiniHTTPServerImpl(const int port, const int num_threads) : port_(port), num_threads_(num_threads) { PTH_ASSERT(pthread_mutex_init(&queue_mutex_, NULL)); PTH_ASSERT(pthread_cond_init(&queue_cond_, NULL)); } void Start() { int listen_fd; SYS_ASSERT(listen_fd = socket(PF_INET, SOCK_STREAM, 0)); // Prepare listen port and the file descriptor for it. struct sockaddr_in server_addr = {}; // Zero-clear. server_addr.sin_family = AF_INET; server_addr.sin_addr.s_addr = htonl(INADDR_ANY); server_addr.sin_port = htons(port_); int so_reuseaddr = 1; SYS_ASSERT(setsockopt(listen_fd, SOL_SOCKET, SO_REUSEADDR, &so_reuseaddr, sizeof(so_reuseaddr))); SYS_ASSERT(bind(listen_fd, (sockaddr*) &server_addr, sizeof(server_addr))); SYS_ASSERT(listen(listen_fd, LISTEN_QUEUE_SIZE)); // Create worker threads. threads_ = new pthread_t[num_threads_]; // Will not be deleted. for (int i = 0; i < num_threads_; ++i) { PTH_ASSERT(pthread_create(&threads_[i], NULL, &WorkerThread, this)); } // The accept loop. while (true) { int conn_fd; while (true) { conn_fd = accept(listen_fd, NULL, NULL); if (conn_fd < 0) { LOG("accept() error: %s", strerror(errno)); // Retrying... } else { break; } } PTH_ASSERT(pthread_mutex_lock(&queue_mutex_)); // TODO: the queue size is unlimited (not safe). queue_.push(conn_fd); // Notify that the file descriptor is added to the queue. PTH_ASSERT(pthread_cond_signal(&queue_cond_)); PTH_ASSERT(pthread_mutex_unlock(&queue_mutex_)); } // Not reached. } pthread_mutex_t *queue_mutex() { return &queue_mutex_; } pthread_cond_t *queue_cond() { return &queue_cond_; } Queue *queue() { return &queue_; } private: int port_; int num_threads_; pthread_t *threads_; Queue queue_; pthread_mutex_t queue_mutex_; pthread_cond_t queue_cond_; static void *WorkerThread(void *ptr) { LOG("Thread started"); MiniHTTPServerImpl *server = reinterpret_cast<MiniHTTPServerImpl*>(ptr); while (true) { PTH_ASSERT(pthread_mutex_lock(server->queue_mutex())); // Wait until a file descriptor gets added. while (server->queue()->empty()) { PTH_ASSERT(pthread_cond_wait(server->queue_cond(), server->queue_mutex())); } LOG("Connection arrived"); // Get the incoming file descriptor. const int conn_fd = server->queue()->front(); server->queue()->pop(); PTH_ASSERT(pthread_mutex_unlock(server->queue_mutex())); // The following part will be executed concurrently. string request; if (ReadRequest(conn_fd, &request)) { ostringstream header; header << "HTTP/1.1 200 OK\r\n"; header << "Content-Type: text/plain\r\n"; header << "Content-Length: " << request.size() << "\r\n"; header << "Connection: close\r\n"; header << "\r\n"; // Just echo back the request header. const string response = header.str() + request; if (WriteString(conn_fd, response)) { LOG("Response sent successfully"); } else { LOG("Error while sending data"); } } SYS_ASSERT(close(conn_fd)); } return NULL; } }; static void *AcceptorThread(void *ptr) { MiniHTTPServer *server = reinterpret_cast<MiniHTTPServer*>(ptr); MiniHTTPServerImpl impl(server->port(), server->num_threads()); impl.Start(); // Blocks forever. // Not reached. return NULL; } MiniHTTPServer::MiniHTTPServer(const int port, const int num_threads) : port_(port), num_threads_(num_threads) { } void MiniHTTPServer::Start() { // Start the server in another thread. PTH_ASSERT(pthread_create(&listner_thread_, NULL, &AcceptorThread, this)); } void MiniHTTPServer::Wait() { PTH_ASSERT(pthread_join(listner_thread_, NULL)); // Not reached. } int main () { // Set signal handler to ignore SIGPIPE. struct sigaction sa = {}; // Zero-clear. sa.sa_handler = SIG_IGN; sigemptyset(&sa.sa_mask); SYS_ASSERT(sigaction(SIGPIPE, &sa, NULL)); // Use port 8080 and 10 threads. MiniHTTPServer http_server(8080, 10); http_server.Start(); http_server.Wait(); return 0; }
短く書くつもりが割と長くなってしまいました。。
UNIXネットワークプログラミング〈Vol.1〉ネットワークAPI:ソケットとXTI
posted with amazlet on 07.07.06
W.リチャード スティーヴンス W.Richard Stevens 篠田 陽一
ピアソンエデュケーション (2000/04)
売り上げランキング: 133969
ピアソンエデュケーション (2000/04)
売り上げランキング: 133969