2007年7月 6日

C++と Pthreads でミニマルなHTTPサーバを書く

UNIXネットワークプログラミング』を読んでいると、自分でも何かネットワーク系の小さなプログラムを書いてみたくなりました。そこで、ミニマルなHTTPサーバを C++と Pthreads で書いてみました。

 

同じ著者の「詳解UNIXプログラミング」もそうだったように、今回の本もほとんどすべてのページに、重要なことが書かれています(最後のほうのXTIの部分は例外かもしれませんが)。

たとえば、27章ではネットワークサーバの実装として、次の設計方針がそれぞれ検討され、実際のコード付きで解説されています。

  1. クライアントごとに fork
  2. 事前に fork - 各プロセスで accept
  3. 事前に fork - ファイルロックで accept を保護
  4. 事前に fork - Mutex ロックで accept を保護 (PTHREAD_PROCESS_SHARED)
  5. 事前に fork - ソケットディスクリプタパッシング
  6. クライアントごとにスレッド生成
  7. 事前にスレッド生成 - Mutex ロックで accept を保護
  8. 事前にスレッド生成 - メインスレッドで accept

今回、実装した HTTPサーバは最後の「事前にスレッド生成 - メインスレッドで accept」の方針をとりました。メインのスレッドがクライアントからの接続を受け付けてキューに入れて、スレッドプールのワーカースレッドに実際の処理をやらせる、という方式です。

以下がそのコードです。書籍を参考にしながら、ネットワーク部分のコードを書いてみるのが目的だったので、HTTPヘッダの解析などの処理は省きました。動作としては、クライアントから受け取った HTTPリクエストをそのままエコー(鸚鵡返し)するだけのサーバです。

単体では何の実用性もありませんが、欠けている部分の処理を加えていけば簡単な埋め込みHTTPサーバくらいにはなるかもしれません。

追記

y-hamigakiさんから、大きな問題を2点、ご指摘いただきました。SIGPIPE を無視するコードと、accept()がエラーを返してもリトライするコードを追加しておきました。ご指摘、ありがとうございました。

#include <arpa/inet.h>
#include <assert.h>
#include <errno.h>
#include <pthread.h>
#include <signal.h>
#include <stdarg.h>
#include <sys/socket.h>
#include <sys/select.h>
#include <unistd.h>
#include <deque>
#include <queue>
#include <sstream>
#include <string>
#include <vector>

using namespace std;

// The MiniHTTPServer is yet another minimal HTTP server.
// It just echo backs the HTTP reques header.  Uses threads
// for handling concurrent requests.
class MiniHTTPServer {
public:
  MiniHTTPServer(int port, int num_threads);
  // Starts the server in another thread.  The function
  // immediately returns upon success.  On error, the
  // program aborts.
  void Start();
  int port() const { return port_; }
  int num_threads() const { return num_threads_; }

  // Waits for the server to finish.  Will not return.
  void Wait();

private:
  int port_;
  int num_threads_;
  pthread_t listner_thread_;
};

// Logging macro, unlocked.  Thread ID, file name, line
// number, will be prefixed.  Example:
// 32771 httpserver.cc:173: Thread started
#define LOG_UNLOCKED(...) {\
    const int thread_id = static_cast<int>(pthread_self());             \
    fprintf(stderr, "%08x %s:%d: ", thread_id, __FILE__, __LINE__);     \
    fprintf(stderr, __VA_ARGS__);                                       \
    fprintf(stderr, "\n");                                              \
  }

// Logging macro, guarded by mutex.
static pthread_mutex_t log_mutex = PTHREAD_MUTEX_INITIALIZER;
#define LOG(...) {                                                      \
    PTH_ASSERT(pthread_mutex_lock(&log_mutex));                         \
    LOG_UNLOCKED(__VA_ARGS__);                                          \
    PTH_ASSERT(pthread_mutex_unlock(&log_mutex));                       \
  }

// Sudden death error handling for system calls.
#define SYS_ASSERT(expression)  {                                       \
    if ((expression) < 0) {                                             \
      LOG("%s: %s", #expression, strerror(errno));                      \
      abort();                                                          \
    }                                                                   \
  }

// Sudden death error handling for pthread routines.
#define PTH_ASSERT(expression)  {                                       \
    const int status = (expression);                                    \
    if (status != 0) {                                                  \
      LOG_UNLOCKED("%s: %s",#expression, strerror(status));             \
      abort();                                                          \
    }                                                                   \
  }                                                                     \


// Writes "str" to "fd".  Returns true on success.
static bool WriteString(const int fd, const string& str) {
  size_t num_bytes_written = 0;
  while (num_bytes_written < str.size()) {
    ssize_t len;
    do {
      // TODO: timeout is not handled here.
      len = write(fd, str.data() + num_bytes_written,
                  str.size() - num_bytes_written);
    } while (len < 0 && errno == EINTR);
    if (len < 0) {  // Ignore errors.
      return false;
    }
    num_bytes_written += len;
  }
  return true;
}

// Waits until "fd" is readable, with timeout.  Returns true
// if "fd" becomes readable within the timeout.  On error or
// timeout, returns false.
static bool WaitUntilFileDescriptorIsReadable(const int fd) {
  struct timeval timeout = { 5, 0 };  // Timeout in 5 secs.
  fd_set read_fds;
  FD_ZERO(&read_fds);
  FD_SET(fd, &read_fds);
  const int num_ready_fds = select(fd + 1, &read_fds, NULL, NULL, &timeout);
  if (num_ready_fds < 0) {
    LOG("%s", strerror(errno));
    return false;
  } else if (num_ready_fds == 0) {
    LOG("Connection timed out");
    return false;
  } else if (!FD_ISSET(fd, &read_fds)) {
    LOG("Should not reach");
    abort();
  }
  return true;
}

// Reads HTTP request header from "fd" and stores it in "out".
// Returns true on success.
// TODO: Should limit number of bytes to read, to be safe.
static bool ReadRequest(const int fd, string *out) {
  out->clear();
  bool status = false;
  char buf[1024];
  while (true) {
    ssize_t len;

    do {
      if (!WaitUntilFileDescriptorIsReadable(fd)) {
        return false;
      }
      len = read(fd, buf, sizeof(buf));
    } while (len < 0 && errno == EINTR);
    if (len < 0 ||  // Error other than EINTR.
        len == 0) {  // Reached EOF.
      status = false;
      break;
    }
    out->append(buf, len);
    // Check if the request header ends properly.
    // Assume the request is GET.  TODO: Others are not supported.
    if (out->size() >= 4 &&
        out->substr(out->size() - 4) == "\r\n\r\n") {
      status = true;
      break;
    }
  }
  if (status == false) {
    LOG("Malformed HTTP request");
  }
  return status;
}

// The max number of queued connections, used for the 2nd
// parameter to listen().  On linux, the number can be obtained by:
// % cat /proc/sys/net/ipv4/tcp_max_syn_backlog
const int LISTEN_QUEUE_SIZE = 1024;

// Implementation of the HTTP server.
class MiniHTTPServerImpl {
public:
  typedef queue<int, deque<int> > Queue;
  MiniHTTPServerImpl(const int port, const int num_threads) :
    port_(port),
    num_threads_(num_threads) {
    PTH_ASSERT(pthread_mutex_init(&queue_mutex_, NULL));
    PTH_ASSERT(pthread_cond_init(&queue_cond_, NULL));
  }

  void Start() {
    int listen_fd;
    SYS_ASSERT(listen_fd = socket(PF_INET, SOCK_STREAM, 0));

    // Prepare listen port and the file descriptor for it.
    struct sockaddr_in server_addr = {};  // Zero-clear.
    server_addr.sin_family = AF_INET;
    server_addr.sin_addr.s_addr = htonl(INADDR_ANY);
    server_addr.sin_port = htons(port_);
    int so_reuseaddr = 1;
    SYS_ASSERT(setsockopt(listen_fd, SOL_SOCKET, SO_REUSEADDR, &so_reuseaddr,
                          sizeof(so_reuseaddr)));
    SYS_ASSERT(bind(listen_fd, (sockaddr*) &server_addr, sizeof(server_addr)));
    SYS_ASSERT(listen(listen_fd, LISTEN_QUEUE_SIZE));

    // Create worker threads.
    threads_ = new pthread_t[num_threads_];  // Will not be deleted.
    for (int i = 0; i < num_threads_; ++i) {
      PTH_ASSERT(pthread_create(&threads_[i], NULL, &WorkerThread, this));
    }

    // The accept loop.
    while (true) {
      int conn_fd;
      while (true) {
        conn_fd = accept(listen_fd, NULL, NULL);
        if (conn_fd < 0) {
          LOG("accept() error: %s", strerror(errno));
          // Retrying...
        } else {
          break;
        }
      }
      PTH_ASSERT(pthread_mutex_lock(&queue_mutex_));
      // TODO: the queue size is unlimited (not safe).
      queue_.push(conn_fd);
      // Notify that the file descriptor is added to the queue.
      PTH_ASSERT(pthread_cond_signal(&queue_cond_));
      PTH_ASSERT(pthread_mutex_unlock(&queue_mutex_));
    }
    // Not reached.
  }

  pthread_mutex_t *queue_mutex() { return &queue_mutex_; }
  pthread_cond_t *queue_cond() { return &queue_cond_; }
  Queue *queue() { return &queue_; }

private:
  int port_;
  int num_threads_;
  pthread_t *threads_;
  Queue queue_;
  pthread_mutex_t queue_mutex_;
  pthread_cond_t queue_cond_;

  static void *WorkerThread(void *ptr) {
    LOG("Thread started");
    MiniHTTPServerImpl *server = reinterpret_cast<MiniHTTPServerImpl*>(ptr);
    while (true) {
      PTH_ASSERT(pthread_mutex_lock(server->queue_mutex()));
      // Wait until a file descriptor gets added.
      while (server->queue()->empty()) {
        PTH_ASSERT(pthread_cond_wait(server->queue_cond(),
                                     server->queue_mutex()));
      }
      LOG("Connection arrived");
      // Get the incoming file descriptor.
      const int conn_fd = server->queue()->front();
      server->queue()->pop();
      PTH_ASSERT(pthread_mutex_unlock(server->queue_mutex()));

      // The following part will be executed concurrently.
      string request;
      if (ReadRequest(conn_fd, &request)) {
        ostringstream header;
        header << "HTTP/1.1 200 OK\r\n";
        header << "Content-Type: text/plain\r\n";
        header << "Content-Length: " << request.size() << "\r\n";
        header << "Connection: close\r\n";
        header << "\r\n";

        // Just echo back the request header.
        const string response = header.str() + request;
        if (WriteString(conn_fd, response)) {
          LOG("Response sent successfully");
        } else {
          LOG("Error while sending data");
        }
      }
      SYS_ASSERT(close(conn_fd));
    }
    return NULL;
  }
};

static void *AcceptorThread(void *ptr) {
  MiniHTTPServer *server = reinterpret_cast<MiniHTTPServer*>(ptr);
  MiniHTTPServerImpl impl(server->port(), server->num_threads());
  impl.Start();  // Blocks forever.
  // Not reached.
  return NULL;
}

MiniHTTPServer::MiniHTTPServer(const int port, const int num_threads) :
  port_(port),
  num_threads_(num_threads) {
}

void MiniHTTPServer::Start() {
  // Start the server in another thread.
  PTH_ASSERT(pthread_create(&listner_thread_, NULL, &AcceptorThread, this));
}

void MiniHTTPServer::Wait() {
  PTH_ASSERT(pthread_join(listner_thread_, NULL));
  // Not reached.
}

int main () {
  // Set signal handler to ignore SIGPIPE.
  struct sigaction sa = {};  // Zero-clear.
  sa.sa_handler = SIG_IGN;
  sigemptyset(&sa.sa_mask);
  SYS_ASSERT(sigaction(SIGPIPE, &sa, NULL));

  // Use port 8080 and 10 threads.
  MiniHTTPServer http_server(8080, 10);
  http_server.Start();
  http_server.Wait();
  return 0;
}

短く書くつもりが割と長くなってしまいました。。

UNIXネットワークプログラミング〈Vol.1〉ネットワークAPI:ソケットとXTI
W.リチャード スティーヴンス W.Richard Stevens 篠田 陽一
ピアソンエデュケーション (2000/04)
売り上げランキング: 133969