LibeventのRPCフレームワークによるC/Sプログラミング

前回の投稿ではlibeventのHTTP処理機能evhttpを使ったHTTPサーバを紹介したが今回はlibeventのRPCフレームワーク evrpcを使ったC/Sプログラミング方法を紹介してみる。このevrpcはかなりマイナーな部類のフレームワークといえる。これに関するドキュメントは少なく、検索してみるとまともなページがヒットしない。ヘッダやテストコードを読まない人にとってはちょっと厳しいかもしれない。 巷にはハイパフォーマンスで使いやすく少し検索すれば豊富なドキュメントやブログ記事がわさわさと出てくるようなフレームワークがごろごろしているのでこんなに地味で愛想ないフレームワークをふつーは選ばない。 そんな人気のないevrpcだけど実際に使ってみると思ったより悪くない。 動作は安定しているしAPIはシンプルで使いやすく拡張性も考慮されていてHook関数、Callback関数を駆使すれば好きに振る舞いを制御することができる。 少々粗挽きなところは全く気にしないでこれをベースに自分なりに使いやすいフレームワークを作ってやろうくらいに思ってる人には向いているかもしれない。

RPC marshalling用コードのジェネレート – event_rpcgen.py

marshallingとはRPC C/S処理で内部的に使用するデータ構造をネットワーク転送用のフォーマットに変換することで、その逆をdemarshallingと呼ぶ。厳密には意味が違うらしいがserializationとdeserializationみたいなもの。 libeventにはデータ構造をmarshalling、demarshallingするコードをジェネレートするためのスクリプトが用意されている。それがevent_rpcgen.py。RPC C/S処理で使用するデータ構造を定められたフォーマットで定義してやりそれをevent_rpcgen.pyに食わせてやることでコードが生成される。 データ構造の定義フォーマットは次のとおり。

[データ構造の定義フォーマット]

struct <data struct name> {
    [optional] <type> <varname> = <index>;
     ...
}

<type> データ型
  - string  文字列
  - bytes  uint_tベクター。 [lenght]で長さを指定。 例、bytes hoge[10]
  - int  uint32_t
  - struct[structname]  構造体
  - array <type>  typeで指定されたデータ型のベクター。 typeはstring, bytes, int, または struct[structname]

<varname> 変数名
<index> インデックス番号。フィールド番号。

[optional]  typeの前にoptionalを指定するとリクエスト、レスポンス処理時にそのフィールドへのデータセットを省略することができる。

※  定義ファイルの拡張子は.rpcである必要がある
※  <varname> と<index>の間は1半角スペースずつを開ける。 そうでない場合はevent_rpcgen.pyによるジェネレートが失敗するので要注意!
   ×    int errcode =1;
   ×    int errcode   = 1;
   ×    int errcode=1;
   ○    int errcode = 1

今回のサンプルで使用するデータ構造を定義する。 サンプルはユーザ情報を取得するRPCでありユーザ情報取得のためのリクエスト、レスポンス用データ構造が以下のGetUserRequest、GetUserResponse。

サンプルデータ構造の定義フォーマットファイル – simple.rpc

struct GetUserRequest {
    string id = 1;
}

struct GetUserResponse {
    int errcode = 1;
    optional string name = 2;
    optional string email = 3;
}

これを次のようにevent_rpcgen.pyを使ってデータ構造定義からコードをジェネレートする。 event_rpcgen.pyはlibeventをインストールするとデフォルトで/usr/local/bin/配下にコピーされる。

$ /usr/local/bin/event_rpcgen.py simple.rpc

Reading "./simple.rpc"
  Created struct: GetUserRequest
    Added entry: id
  Created struct: GetUserResponse
    Added entry: errcode
    Added entry: name
    Added entry: email
... creating "./simple.gen.h"
... creating "./simple.gen.c"

simple.gen.hsimple.gen.cの2つのファイルがジェネレートされる。 C/S両方でこのデータ構造を使用する。

RPCサーバの実装 – simple_rpc_server.cpp

http://github.com/yokawasa/any/blob/master/libevent_rpc/simple_rpc_server.cpp

まずは利用するRPCメソッドの登録を行う。 EVRPC_HEADERでRPCコマンドのデータ構造とプロトタイプを作成してEVRPC_GENERATEでRPCメッセージを送受信するためのコードを作成する。

#ifdef __cplusplus
extern "C" {
#endif
#include "simple.gen.h"
#ifdef __cplusplus
}
#endif
#include "evrpc.h"
   
/* registe RPC : GetUser */
EVRPC_HEADER(GetUser, GetUserRequest, GetUserResponse);
/* generate RPC code : GetUser */
EVRPC_GENERATE(GetUser, GetUserRequest, GetUserResponse);

RPCサーバの実装について説明する。実装は既に存在するHTTPサーバの実装をベースとする。RPCサーバの基本コードは以下の通り。

int main(int argc, char **argv) {
    if (argc < 3) {
        fprintf(stdout, "%s <server> <port>\n", argv[0]);
        return 1;
    }
    char *svr_addr = argv[1];
    short svr_port = atoi(argv[2]);

    struct evhttp *ev_http = NULL;
    struct evrpc_base *rpc_base = NULL;

    /* initialize base event mechanism */
    event_init();
    ev_http = evhttp_start(svr_addr, svr_port);
    /* create a base for the RPC protocol */
    rpc_base = evrpc_init(ev_http);

    /* register all of the handlers for all RPCs */
    EVRPC_REGISTER(rpc_base, GetUser, GetUserRequest, GetUserResponse, GetUserCallback, NULL);

    /* loop and dispatch events */
    event_dispatch();

    EVRPC_UNREGISTER(rpc_base, GetUser);
    evrpc_free(rpc_base);
    evhttp_free(ev_http);
    return 0;
}
  • event_init()でlibeventライブラリ初期化。とりあえずlibeventを使う際は最初に一度実行しておく。
  • evhttp_start(svr_addr, svr_port)にサーバアドレス、ポートを指定してHTTPサーバ処理開始準備。
  • evrpc_init()でrpc処理ハンドル用ポインタを作成。
  • EVRPC_REGISTERで特定のRPCコマンドをサーバに登録する。ここではさきほどEVRPC_HEADERとEVRPC_GENERATEで登録されたRPCコマンドGetUser(リクエスト用データ構造GetUserRequest, レスポンス用データ構造GetUserResponse)をRPCサーバに登録する。また同時にGetUserCallbackというRPCリクエストを受け取った際に呼び出されるコールバック関数を登録する。GetUserCallbackは次で説明する。
  • event_dispatch()でloop開始しイベントdispatchを開始する。
  • 後処理。サーバ処理を終えるときEVRPC_UNREGISTER、evrpc_free, evhttp_freeの順で後処理をする。

GetUserCallbackはさきほどEVRPC_REGISTERでサーバに登録されたRPCコマンドのコールバック関数。 RPCクライアントから送信されるリクエストデータからidを取得。そのidに対するnameとemailデータをレスポンスデータにセットしている。

static void
GetUserCallback(EVRPC_STRUCT(GetUser)* rpc, void *arg)
{
    struct GetUserRequest* req = rpc->request;
    struct GetUserResponse* res = rpc->reply;

    /* get request info */
    char *id;
    if(!EVTAG_HAS(req, id)) {
        fprintf(stderr, "failed to get request info\n");
        EVTAG_ASSIGN(res, errcode, -1);
    }
    EVTAG_GET(req, id, &id);
    fprintf(stdout, "REQUEST id = %s\n", id);

    /* do somthing due to request info */
    /* set response info */
    EVTAG_ASSIGN(res, errcode, 0);
    EVTAG_ASSIGN(res, name, "Fooo Baaar");
    EVTAG_ASSIGN(res, email, "baz@foo.bar");

    /* send the reply to the RPC */
    EVRPC_REQUEST_DONE(rpc);
}
  • EVTAG_HASでリクエストデータ構造中のあるフィールドにデータがセットされているかをチェック(ここではid)してEVTAG_GETで実際にそのフィールドデータを取得。
  • EVTAG_ASSIGNでレスポンスデータ構造のフィールド(ここではerrcode, name, email)にデータをセット。
  • EVRPC_REQUEST_DONEでレスポンス送信。

RPCクライアントの実装 – simple_rpc_client.cpp

http://github.com/yokawasa/any/blob/master/libevent_rpc/simple_rpc_client.cpp

RPCクライアントの実装について説明する。RPCクライアントでもサーバと同じようにEVRPC_HEADERとEVRPC_GENERATEとで利用するRPCメソッドの登録を行う。以下RPCクライアントの基本コード。

int main(int argc, char **argv) {
    if (argc < 3) {
        fprintf(stdout, "%s <server> <port>\n", argv[0]);
        return 1;
    }
    char *svr_addr = argv[1];
    short svr_port = atoi(argv[2]);
    struct event_base *ev_base = NULL;
    struct evhttp *ev_http = NULL;
    struct evrpc_base *rpc_base = NULL;
    struct evrpc_pool *rpc_pool = NULL;
    struct GetUserRequest *req;
    struct GetUserResponse *res;

    /* initialize base event mechanism */
    ev_base = event_init();
    /* initialize http internal buffer */
    ev_http = evhttp_new(ev_base);
    /* create a base for the RPC protocol */
    rpc_base = evrpc_init(ev_http);

    rpc_pool = get_rpc_pool(ev_base, svr_addr, svr_port);
    if(!rpc_pool) {
        return -1;
    }
    /* set up request data */
    req = GetUserRequest_new();
    EVTAG_ASSIGN(req, id, "foobar");

    /* create a response structure */
    res = GetUserResponse_new();
    /* set request, response, callback func */
    EVRPC_MAKE_REQUEST(GetUser, rpc_pool, req, res, GetUserCallback, NULL);
    /* process request and response */
    event_dispatch();

    GetUserRequest_free(req);
    GetUserResponse_free(res);

    evrpc_pool_free(rpc_pool);
    evrpc_free(rpc_base);
    evhttp_free(ev_http);
    return 0;
}
  • event_init()でlibeventライブラリ初期化。
  • evhttp_new()でhttp処理用内部バッファーを初期化。
  • evrpc_init()でrpc処理ハンドル用ポインタを作成。
  • get_rpc_pool()よりevrpc_poolポインタを取得。get_rpc_pool()は次で説明。 evrpc_poolは内部にリクエストとして送信するための接続ポインタ(evhttp_connection)を持っている。
  • GetUserRequest_new()とGetUserResponse_new()によりRPCリクエストとレスポンスのためのリソース確保。
  • EVTAG_ASSIGNによりリクエスト用データ構造のidフィールドに値(”foobar”)を設定。
  • EVRPC_MAKE_REQUESTでサーバへのリクエスト送信に使うRPCコマンドとリクエスト、レスポンスオブジェクトポインタ、evrpc_poolポインタ、コールバック関数をセットする。コールバック関数GetUserCallbackは後で説明する。
  • event_dispatch()を呼んで実際にリクエスト送信、レスポンス受信を行う。
  • 後処理。GetUserRequest_free()、GetUserResponse_free()でそれぞれリクエストとレスポンスのために確保されたリソースを開放。 次にevrpc_pool_free、evrpc_free, evhttp_freeの順で後処理実行。

RPCリクエスト処理で使用するevrpc_poolポインタを取得するための関数。get_rpc_poolがコールされる度にサーバ接続ポインタ (evhttp_connection)を生成しevrpc_poolの接続プールに追加している。 ちなみにevrpc_poolとはいわゆるコネクションプールのこと。仕組みとしては内部に接続とリクエストを管理するQUEUEを持っていてリクエストごとに接続QUEUEから空いている接続をリクエストに割り当てる。もし割り当てる接続がなければそのリクエストをリクエストQUEUEに追加する。

static struct evrpc_pool *
 get_rpc_pool(struct event_base *base, const char *svr_addr, short svr_port)
{
    struct evhttp_connection *ev_conn;
    struct evrpc_pool *rpc_pool;
    rpc_pool = evrpc_pool_new(base);
    if (!rpc_pool) {
        fprintf(stderr, "failed to get new rpc pool\n");
        return NULL;
    }
    // create a connection to RPC server
    ev_conn = evhttp_connection_new(svr_addr, svr_port);
    if (!ev_conn) {
        fprintf(stderr, "failed to get new evhttp connection: %s:%d\n",
            svr_addr, svr_port);
        return NULL;
    }
    // add request connection in the pool
    evrpc_pool_add_connection(rpc_pool, ev_conn);
    return (rpc_pool);
}
  • evrpc_pool_new()でrpcコネクションプールを作成。
  • evhttp_connection_newでリクエスト用接続ポインタを生成。
  • evrpc_pool_add_connectionでevrpc_poolの接続プールに生成されたリクエスト用接続ポインタを追加。

以下EVRPC_MAKE_REQUESTで指定するRPCリクエストを送信してサーバよりレスポンスが返ってきたときに呼ばれるコールバック関数。RPC クライアントからのリクエストに対するサーバからのレスポンス結果を表示している。

static void
 GetUserCallback(struct evrpc_status *status,
    struct GetUserRequest *req, struct GetUserResponse *res, void *arg)
{
    uint32_t errcode;
    char *name,*email;
    /* check return status */
    if (status->error != EVRPC_STATUS_ERR_NONE) {
        goto exitloop;
    }
    /* get response info */
    EVTAG_GET(res, errcode, &errcode);
    fprintf(stdout, "RESPONSE errcode=%d\n", errcode);
    if(EVTAG_HAS(res, name) ) {
        EVTAG_GET(res, name, &name);
        fprintf(stdout, "RESPONSE name=%s\n", name);
    }
    if(EVTAG_HAS(res, email) ) {
        EVTAG_GET(res, email, &email);
        fprintf(stdout, "RESPONSE email=%s\n", email);
    }
exitloop:
    event_loopexit(NULL);
}

ダウンロード、コンパイル、動作確認

git clone でソースコードを取得してmakeを行う。 リポジトリanyのサブディレクトリ any/libevent_rpcが今回のサンプルにあたる。

$ git clone git@github.com:yokawasa/any.git            

Initialized empty Git repository in /home/m/dev/github/any/.git/
remote: Counting objects: 33, done.
remote: Compressing objects: 100% (32/32), done.
remote: Total 33 (delta 7), reused 0 (delta 0)
Receiving objects: 100% (33/33), 11.28 KiB, done.
Resolving deltas: 100% (7/7), done.

$ cd any/libevent_rpc
$ make

g++ -I/usr/include -I/usr/local/include -Wall -g -o simple_rpc_client.o -c simple_rpc_client.cpp
gcc -I/usr/include -I/usr/local/include -Wall -g -o simple.gen.o -c simple.gen.c
g++ -L/usr/local/lib -levent -o simple_rpc_client simple_rpc_client.o simple.gen.o
g++ -I/usr/include -I/usr/local/include -Wall -g -o simple_rpc_server.o -c simple_rpc_server.cpp
g++ -L/usr/local/lib -levent -o simple_rpc_server simple_rpc_server.o simple.gen.o

これで同ディレクトリにRPCサーバsimple_rpc_server とRPCクライアントsimple_rpc_client実行ファイルのできあがり。simple_rpc_server 、simple_rpc_clientともに実行時に第一引数にサーバアドレス、第二引数にIOポートを指定する。 まずサーバから立ち上げる。

$ ./simple_rpc_server localhost 8888

サーバが立ち上がったら次のようにクライアントからリクエスト送信する。

$ ./simple_rpc_client localhost 8888
RESPONSE errcode=0
RESPONSE name=Fooo Baaar
RESPONSE email=baz@foo.bar

上のような結果が出力されればOK。

おわり。

Related posts:

  1. LibeventとAPRでイベント駆動型HTTPサーバを作成してみた
  2. cURL MultiインターフェースでHTTP Pipeliningリクエストの送信

Posted in: Programming / プログラミング

Tags: , , , , ,



DeliciousFacebookRedditTwitterGoogle

addLeave a comment