Writing an echo server in libev and C++

I was looking at event I/O frameworks and found some really good comments about the “ev” library (libev). The challenge is that I would rather work in C++ than pure C – I like methods. I found that the documentation is lacking for the C++ side of the library, so I needed to build a test harness.

This is a very basic TCP echo server using libev in C++. The C++ side only uses std::list, and the ev side exercises the event loop, signals and basic socket I/O. It should be a useful breadcrumb for anybody trying to build their own service, which of course is my next big activity.

Update: Fixed the small file descriptor leak and now available as a Gist – https://gist.github.com/3364414

#include <errno.h>
#include <fcntl.h>
#include <netinet/in.h>
#include <resolv.h>
#include <signal.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/socket.h>
#include <unistd.h>

#include <exception>
#include <list>
#include <stdexcept>
#include <string>

#include <ev++.h>
 11
 12//
 13//   Buffer class - allow for output buffering such that it can be written out
 14//                                 into async pieces
 15//
 16struct Buffer {
 17        char       *data;
 18        ssize_t len;
 19        ssize_t pos;
 20
 21        Buffer(const char *bytes, ssize_t nbytes) {
 22                pos = 0;
 23                len = nbytes;
 24                data = new char[nbytes];
 25                memcpy(data, bytes, nbytes);
 26        }
 27
 28        virtual ~Buffer() {
 29                delete [] data;
 30        }
 31
 32        char *dpos() {
 33                return data + pos;
 34        }
 35
 36        ssize_t nbytes() {
 37                return len - pos;
 38        }
 39};
 40
 41//
 42//   A single instance of a non-blocking Echo handler
 43//
 44class EchoInstance {
 45private:
 46        ev::io           io;
 47        static int total_clients;
 48        int              sfd;
 49
 50        // Buffers that are pending write
 51        std::list<Buffer*>     write_queue;
 52
 53        // Generic callback
 54        void callback(ev::io &watcher, int revents) {
 55                if (EV_ERROR & revents) {
 56                        perror("got invalid event");
 57                        return;
 58                }
 59
 60                if (revents & EV_READ)
 61                        read_cb(watcher);
 62
 63                if (revents & EV_WRITE)
 64                        write_cb(watcher);
 65
 66                if (write_queue.empty()) {
 67                        io.set(ev::READ);
 68                } else {
 69                        io.set(ev::READ|ev::WRITE);
 70                }
 71        }
 72
 73        // Socket is writable
 74        void write_cb(ev::io &watcher) {
 75                if (write_queue.empty()) {
 76                        io.set(ev::READ);
 77                        return;
 78                }
 79
 80                Buffer* buffer = write_queue.front();
 81
 82                ssize_t written = write(watcher.fd, buffer->dpos(), buffer->nbytes());
 83                if (written < 0) {
 84                        perror("read error");
 85                        return;
 86                }
 87
 88                buffer->pos += written;
 89                if (buffer->nbytes() == 0) {
 90                        write_queue.pop_front();
 91                        delete buffer;
 92                }
 93        }
 94
 95        // Receive message from client socket
 96        void read_cb(ev::io &watcher) {
 97                char       buffer[1024];
 98
 99                ssize_t   nread = recv(watcher.fd, buffer, sizeof(buffer), 0);
100
101                if (nread < 0) {
102                        perror("read error");
103                        return;
104                }
105
106                if (nread == 0) {
107                        // Gack - we're deleting ourself inside of ourself!
108                        delete this;
109                } else {
110                        // Send message bach to the client
111                        write_queue.push_back(new Buffer(buffer, nread));
112                }
113        }
114
115        // effictivly a close and a destroy
116        virtual ~EchoInstance() {
117                // Stop and free watcher if client socket is closing
118                io.stop();
119
120                close(sfd);
121
122                printf("%d client(s) connected.\n", --total_clients);
123        }
124
125public:
126        EchoInstance(int s) : sfd(s) {
127                fcntl(s, F_SETFL, fcntl(s, F_GETFL, 0) | O_NONBLOCK);
128
129                printf("Got connection\n");
130                total_clients++;
131
132                io.set<EchoInstance, &EchoInstance::callback>(this);
133
134                io.start(s, ev::READ);
135        }
136};
137
138class EchoServer {
139private:
140        ev::io           io;
141        ev::sig         sio;
142        int                 s;
143
144public:
145
146        void io_accept(ev::io &watcher, int revents) {
147                if (EV_ERROR & revents) {
148                        perror("got invalid event");
149                        return;
150                }
151
152                struct sockaddr_in client_addr;
153                socklen_t client_len = sizeof(client_addr);
154
155                int client_sd = accept(watcher.fd, (struct sockaddr *)&client_addr, &client_len);
156
157                if (client_sd < 0) {
158                        perror("accept error");
159                        return;
160                }
161
162                EchoInstance *client = new EchoInstance(client_sd);
163        }
164
165        static void signal_cb(ev::sig &signal, int revents) {
166                signal.loop.break_loop();
167        }
168
169        EchoServer(int port) {
170                printf("Listening on port %d\n", port);
171
172                struct sockaddr_in addr;
173
174                s = socket(PF_INET, SOCK_STREAM, 0);
175
176                addr.sin_family = AF_INET;
177                addr.sin_port     = htons(port);
178                addr.sin_addr.s_addr = INADDR_ANY;
179
180                if (bind(s, (struct sockaddr *)&addr, sizeof(addr)) != 0) {
181                        perror("bind");
182                }
183
184                fcntl(s, F_SETFL, fcntl(s, F_GETFL, 0) | O_NONBLOCK);
185
186                listen(s, 5);
187
188                io.set<EchoServer, &EchoServer::io_accept>(this);
189                io.start(s, ev::READ);
190
191                sio.set<&EchoServer::signal_cb>();
192                sio.start(SIGINT);
193        }
194
195        virtual ~EchoServer() {
196                shutdown(s, SHUT_RDWR);
197                close(s);
198        }
199};
200
201int EchoInstance::total_clients = 0;
202
203int main(int argc, char **argv)
204{
205        int         port = 8192;
206
207        if (argc > 1)
208            port = atoi(argv[1]);
209
210        ev::default_loop       loop;
211        EchoServer             echo(port);
212
213        loop.run(0);
214
215        return 0;
216}