Inexor
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
RpcServer.hpp
Go to the documentation of this file.
1 
4 #pragma once
5 
6 #include <memory>
7 #include <string>
8 #include <exception>
9 #include <queue>
10 #include <functional>
11 #include <chrono>
12 #include <thread>
13 
14 #include <grpc/grpc.h>
15 #include <grpc++/grpc++.h>
16 
17 #include "inexor/io/Logging.hpp"
18 
19 // size is important for us, proto explicitly specifies int64
20 typedef int64_t int64;
21 
22 namespace inexor {
23 namespace rpc {
24 
27 {
28  t_cstring = 0,
31 };
32 
33 // These functions need to be implemented by the Context Provider (acquiring this submodule):
34 extern void set_on_change_functions();
35 extern void send_all_vars();
36 
37 template<typename MSG_TYPE>
38 bool handle_index(int index, const MSG_TYPE &tree_event);
39 
40 #define MAX_RPC_EVENT_CHECKS_PER_TICK 100
41 #define MAX_RPC_CLIENTS 128 // possible highest value is 255, see encode_signal()
42 
45 {
51 };
53 {
54  const int type;
55  const int client_id;
56  callback_event(const int type_, int client_id_) : type(type_), client_id(client_id_) {}
57 };
58 const inline void *encode_signal(const int event_type, int clientid)
59 {
60  return reinterpret_cast<void*>(new callback_event(event_type, clientid));
61 }
62 
63 template<typename MSG_TYPE, typename ASYNC_SERVICE_TYPE>
64 class RpcServer
65 {
66  typedef grpc::ServerAsyncReaderWriter<MSG_TYPE, MSG_TYPE> stream_type;
67 
69  std::unique_ptr<grpc::Server> grpc_server;
70 
72  grpc::ServerContext server_context;
73 
74  ASYNC_SERVICE_TYPE service;
75 
77  std::unique_ptr<grpc::ServerCompletionQueue> cq;
78 
79 public:
80 
82  bool initialized = false;
83 
85  {
87  std::unique_ptr<stream_type> stream;
88 
90  MSG_TYPE read_buffer;
91 
93  bool writer_busy = false;
94 
96  std::queue<MSG_TYPE> outstanding_writes;
97 
99  grpc::Status disconnect_status;
100  public:
101 
103  int id;
104 
105  clienthandler(int id_, std::unique_ptr<stream_type> &&stream_) : id(id_), stream(std::move(stream_)) {}
106 
108  void request_read();
110  const MSG_TYPE &get_read_result() { return read_buffer; }
111 
113  void write(const MSG_TYPE &msg) { outstanding_writes.push(msg); }
114 
116  bool has_writes() { return !outstanding_writes.size(); }
117  bool currently_writing() { return writer_busy; }
118 
120  void request_send_one();
122  void finished_send_one();
123 
125  void request_disconnect();
127  void finished_disconnect();
128  };
129  static std::vector<clienthandler> clients;
130 
131 private:
133  std::unique_ptr<stream_type> connect_slot;
134 public:
136 
137  RpcServer(const char *address);
138  ~RpcServer();
139 
143  void process_queue();
144 
148 
151  static void send_msg(const MSG_TYPE &msg, int excluded_id = -1)
152  {
153  for(clienthandler &ci: clients)
154  {
155  if(ci.id != excluded_id) ci.write(msg);
156  }
157  }
158 
159 
160 private:
161 
162  void open_connect_slot();
163  void handle_new_connection();
164 
165  void request_disconnect_client(int id);
166  void finish_disconnect_client(int id);
167 
169  {
170  for(clienthandler &ci : clients)
171  if(ci.id == id) return &ci;
172  return nullptr;
173  }
174 
175  bool any_writes_outstanding();
176 
177  void kickoff_writes();
178 
179  void handle_queue_event(callback_event *encoded_callback, bool broadcast, std::function<void(const MSG_TYPE &)> receive_handler);
180 
181  int pick_unused_id();
182  bool change_variable(const MSG_TYPE &receivedval);
183 
184 
185 };
186 
187 
188 
191 
192 
193 
194 template<typename MSG_TYPE, typename U> inline
196 {
197  const void *cq_id = encode_signal(EVENT_TYPE::E_READ, id);
198  stream->Read(&read_buffer, (void *)cq_id);
199 }
200 
201 template<typename MSG_TYPE, typename U> inline
203 {
204  if(writer_busy || has_writes()) return;
205  writer_busy = true;
206  const void* cq_id = encode_signal(EVENT_TYPE::E_WRITE, id);
207  stream->Write(outstanding_writes.front(), (void *)cq_id);
208  outstanding_writes.pop();
209 }
210 
211 template<typename MSG_TYPE, typename U> inline
213 {
214  writer_busy = false;
215  request_send_one();
216 }
217 
218 template<typename MSG_TYPE, typename U> inline
220 {
221  const void* cq_id = encode_signal(E_DISCONNECT, id);
222  stream->Finish(disconnect_status, (void *)cq_id);
223 }
224 
226 template<typename MSG_TYPE, typename U> inline
228 {
229  if(!disconnect_status.ok())
230  Log.sync->error("Disconnecting client errored (msg: {})", disconnect_status.error_message());
231 }
232 
233 template<typename MSG_TYPE, typename U> inline
234 RpcServer<MSG_TYPE, U>::RpcServer(const char *address) : server_address(address)
235 {
236  grpc::ServerBuilder builder;
237  builder.AddListeningPort(server_address, grpc::InsecureServerCredentials());
238  builder.RegisterService(&service);
239  cq = builder.AddCompletionQueue();
240  grpc_server = builder.BuildAndStart();
241 
242  open_connect_slot(); // wait for the first client to connect.
243 }
244 
245 template<typename MSG_TYPE, typename U> inline
247 {
248  // TODO we should also receive whether disconnect was successfully
249  for(auto &client : clients) client.request_disconnect();
250  grpc_server->Shutdown();
251  // Always shutdown the completion queue after the server.
252  cq->Shutdown();
253 }
254 
255 template<typename MSG_TYPE, typename U> inline
257 {
258  connect_slot = std::make_unique<stream_type>(&server_context);
259  service.RequestSynchronize(&server_context, connect_slot.get(), cq.get(), cq.get(), (void *)encode_signal(E_CONNECT, -1));
260 }
261 
262 template<typename MSG_TYPE, typename U> inline
264 {
265  int n_id = pick_unused_id();
266  Log.sync->info("RPC Server: New client connected id {}", n_id);
267 
268  clients.push_back(clienthandler(n_id, std::move(connect_slot)));
269  clients.back().request_read();
270 
271  connect_slot = nullptr;
272  if(clients.size() < MAX_RPC_CLIENTS) open_connect_slot();
273 
274  // TODO: we should send those only to the newly connected client.
275  //send_all_vars();
276 }
277 
278 template<typename MSG_TYPE, typename U> inline
280 {
281  clienthandler *client = get_client(id);
282  if(client) client->request_disconnect();
283 }
284 
285 template<typename MSG_TYPE, typename U> inline
287 {
288  for(auto ci = clients.begin(); ci != clients.end(); ++ci)
289  if(ci->id == id)
290  {
291  ci->finished_disconnect();
292  clients.erase(ci);
293  }
294  if(!connect_slot) open_connect_slot(); // a slot just became free.
295 }
296 
297 template<typename MSG_TYPE, typename U> inline
299 {
300  for(clienthandler &client : clients)
301  if(client.has_writes() || client.currently_writing())
302  return true;
303  return false;
304 }
305 
306 template<typename MSG_TYPE, typename U> inline
308 {
309  for(clienthandler &client : clients)
310  client.request_send_one();
311 }
312 
313 template<typename MSG_TYPE, typename U> inline
315 {
316  kickoff_writes();
317 
318  using grpc::CompletionQueue;
319 
320  for(int i = 0; i < MAX_RPC_EVENT_CHECKS_PER_TICK; i++)
321  {
322  callback_event *callback_value;
323  bool no_internal_grpc_error = false;
324 
325  // read the next event from the completion queue (in nonblocking fashion)
326  // TODO: adapt blocking time to the time budget and whether or not writes are outstanding
327  CompletionQueue::NextStatus stat = cq->AsyncNext((void **)(&callback_value), &no_internal_grpc_error, gpr_inf_past(GPR_CLOCK_REALTIME));
328 
329  //if(!) break;// throw std::runtime_error("GRPC had an internal error: Shutting down.");
330 
331  if(no_internal_grpc_error && stat == CompletionQueue::NextStatus::GOT_EVENT)
332  handle_queue_event(callback_value, true, [=](const MSG_TYPE &msg) {
333  this->change_variable(msg);
334  });
335  else if(stat == CompletionQueue::NextStatus::TIMEOUT)
336  {
337  if(!any_writes_outstanding()) break;
338  }
339  else if(stat == CompletionQueue::NextStatus::SHUTDOWN)
340  {
341  std::string error_message("[GRPC Server] Completion Queue Shutdown status received..");
342  throw std::runtime_error(error_message);
343  }
344  }
345 }
346 
347 template<typename MSG_TYPE, typename ASYNC_SERVICE_TYPE>
349 {
350  if(initialized) return; // We only init from the first client.
351 
352  using grpc::CompletionQueue;
353  using std::chrono::steady_clock;
354  using std::chrono::seconds;
355  using std::chrono::duration_cast;
356 
357  auto time_start = steady_clock::now();
358  while(initialized != true)
359  {
360  callback_event *callback_value;
361  bool no_internal_grpc_error = false;
362  bool regularEvent = cq->Next((void **)(&callback_value), &no_internal_grpc_error);
363  if(no_internal_grpc_error && regularEvent)
364  {
365  handle_queue_event(callback_value, false, [&](const MSG_TYPE &msg) {
366  int64 index = msg.key_case();
367  // FINISHED_TREE_INTRO_SEND
368  if(msg.general_event() == 1)
369  {
370  initialized = true;
371  return;
372  }
373  this->change_variable(msg);
374  });
375  }
376  else if(!regularEvent)
377  {
378  std::string error_message("[GRPC Server] Completion Queue Shutdown status received (in init)..");
379  throw std::runtime_error(error_message);
380  }
381  }
382 }
383 
384 template<typename MSG_TYPE, typename U> inline
385 void RpcServer<MSG_TYPE, U>::handle_queue_event(callback_event *encoded_callback, bool broadcast, std::function<void(const MSG_TYPE &)> receive_handler)
386 {
387  switch(encoded_callback->type)
388  {
389  case E_READ:
390  {
391  clienthandler *ci = get_client(encoded_callback->client_id);
392  if(!ci) break; // TODO we should better process its last messages, but we dont have the clients read_buffer anymore.
393 
394  const MSG_TYPE msg = ci->get_read_result();
395  ci->request_read();
396  receive_handler(msg);
397  if(broadcast) send_msg(msg, ci->id); //broadcast changes from one client to other clients.
398  break;
399  }
400  case E_WRITE:
401  {
402  clienthandler *ci = get_client(encoded_callback->client_id);
403  if(!ci) break;
404  ci->finished_send_one();
405  break;
406  }
407  case E_CONNECT:
408  handle_new_connection();
409  break;
410  case E_DISCONNECT:
411  finish_disconnect_client(encoded_callback->client_id);
412  break;
413  }
414  delete encoded_callback;
415 }
416 
417 template<typename T, typename U>
418 int RpcServer<T, U>::pick_unused_id() // TODO get some uuid thing here.
419 {
420  for(int id_try = 0; id_try < INT_MAX; id_try++)
421  {
422  bool hasid = false;
423  for(auto &ci: clients)
424  if(ci.id == id_try)
425  {
426  hasid = true; break;
427  }
428  if(!hasid) return id_try;
429  }
430  std::string error_message("grpc system: no client id could be generated.");
431  throw std::runtime_error(error_message);
432  return -1;
433 }
434 
435 template<typename MSG_TYPE, typename U> inline
436 bool RpcServer<MSG_TYPE, U>::change_variable(const MSG_TYPE &receivedval)
437 {
438  int64 index = receivedval.key_case();
439 
440  if(index <= 0)
441  {
442  Log.sync->error("[Server] Received illegal message index (none was set)");
443  return false;
444  }
445 
446  if(!handle_index<MSG_TYPE>(index, receivedval))
447  {
448  Log.sync->error("network: received non-supported index: {0}", index); // -> to debug
449  return false;
450  }
451  return true;
452 }
453 
454 
455 } // namespace inexor
456 } // namespace rpc
bool currently_writing()
Definition: RpcServer.hpp:117
Definition: RpcServer.hpp:30
bool handle_index(int index, const MSG_TYPE &tree_event)
static std::vector< clienthandler > clients
Definition: RpcServer.hpp:129
void finish_disconnect_client(int id)
Definition: RpcServer.hpp:286
void kickoff_writes()
Definition: RpcServer.hpp:307
callback_event(const int type_, int client_id_)
Definition: RpcServer.hpp:56
Definition: RpcServer.hpp:46
Definition: RpcServer.hpp:52
void finished_send_one()
Handle a completed write, send the next item if our queue isnt empty.
Definition: RpcServer.hpp:212
const MSG_TYPE & get_read_result()
Get the message the last read spit out.
Definition: RpcServer.hpp:110
void write(const MSG_TYPE &msg)
Add message to the queue of to-be-sent messages.
Definition: RpcServer.hpp:113
Definition: RpcServer.hpp:49
cpp_type_t
Known C++ SharedVar types.
Definition: RpcServer.hpp:26
EVENT_TYPE
The events we request GRPC to do.
Definition: RpcServer.hpp:44
bool writer_busy
There's always only one write allowed at a time.
Definition: RpcServer.hpp:93
static Logger sync
Logger for synchronization messages.
Definition: Logging.hpp:96
Definition: RpcServer.hpp:48
Definition: RpcServer.hpp:47
#define MAX_RPC_EVENT_CHECKS_PER_TICK
Definition: RpcServer.hpp:40
Logging stuff including the ingame console logging functionality.
std::string server_address
Definition: RpcServer.hpp:135
ASYNC_SERVICE_TYPE service
Definition: RpcServer.hpp:74
const void * encode_signal(const int event_type, int clientid)
Definition: RpcServer.hpp:58
void request_read()
Start an asynchronous read.
Definition: RpcServer.hpp:195
GLuint index
Definition: glexts.hpp:412
std::queue< MSG_TYPE > outstanding_writes
Keep a list of outstanding writes, since GRPC limits us to only one outstanding write per stream/clie...
Definition: RpcServer.hpp:96
clienthandler * get_client(int id)
Definition: RpcServer.hpp:168
void handle_queue_event(callback_event *encoded_callback, bool broadcast, std::function< void(const MSG_TYPE &)> receive_handler)
Definition: RpcServer.hpp:385
void set_on_change_functions()
grpc::ServerAsyncReaderWriter< MSG_TYPE, MSG_TYPE > stream_type
Definition: RpcServer.hpp:66
std::unique_ptr< grpc::Server > grpc_server
GRPC server instance, running while this class is alive.
Definition: RpcServer.hpp:69
bool any_writes_outstanding()
Definition: RpcServer.hpp:298
void request_send_one()
Send the next item from the queue, needs to be called only if the queue was empty in the last finishe...
Definition: RpcServer.hpp:202
void request_disconnect_client(int id)
Definition: RpcServer.hpp:279
inexor::util::log_manager Log
Definition: Logging.cpp:241
void block_until_initialized()
This is used during the startup, we process_queue() until we receive a special event.
Definition: RpcServer.hpp:348
grpc::Status disconnect_status
Did disconnecting work? If it didn't, why?
Definition: RpcServer.hpp:99
std::unique_ptr< stream_type > stream
The stream we write into / receive data from (asynchronously).
Definition: RpcServer.hpp:87
Definition: RpcServer.hpp:64
int pick_unused_id()
Definition: RpcServer.hpp:418
int id
The clients identifier number.
Definition: RpcServer.hpp:103
grpc::ServerContext server_context
settings like the ip/port of the server are saved in here.
Definition: RpcServer.hpp:72
Legacy file system streams.
Definition: stream.hpp:22
vector< fpsent * > clients
other clients connected to this server
Definition: fps.cpp:581
bool move(physent *d, vec &dir)
Definition: physics.cpp:1342
void process_queue()
This is essentially doing the sending/receiving.
Definition: RpcServer.hpp:314
std::unique_ptr< stream_type > connect_slot
Client which isn't connected yet, a buffer caused by the async API.
Definition: RpcServer.hpp:133
char string[MAXSTRLEN]
cube-strings (char arrays of fixed size).
Definition: cube_types.hpp:18
Definition: RpcServer.hpp:84
void request_disconnect()
Tell the client he gets disconnected.
Definition: RpcServer.hpp:219
Definition: RpcServer.hpp:50
Definition: RpcServer.hpp:28
bool change_variable(const MSG_TYPE &receivedval)
Definition: RpcServer.hpp:436
const int type
Definition: RpcServer.hpp:54
void open_connect_slot()
Definition: RpcServer.hpp:256
Definition: RpcServer.hpp:29
~RpcServer()
Definition: RpcServer.hpp:246
void send_all_vars()
MSG_TYPE read_buffer
We read asynchronously into this and we always just read one at a time.
Definition: RpcServer.hpp:90
int64_t int64
This file contains the Remote Procedure Call Server which synchronizes our shared variables with the ...
Definition: RpcServer.hpp:20
void finished_disconnect()
Prints out any error info.
Definition: RpcServer.hpp:227
RpcServer(const char *address)
Definition: RpcServer.hpp:234
bool initialized
As soon as the tree arrived from the first client, this will be set to true.
Definition: RpcServer.hpp:82
std::unique_ptr< grpc::ServerCompletionQueue > cq
The completion queue (where notifications of the succcess of a network commands get retrieved)...
Definition: RpcServer.hpp:77
const int client_id
Definition: RpcServer.hpp:55
bool has_writes()
Whether or not writes are outstanding.
Definition: RpcServer.hpp:116
#define MAX_RPC_CLIENTS
Definition: RpcServer.hpp:41
void handle_new_connection()
Definition: RpcServer.hpp:263
static void send_msg(const MSG_TYPE &msg, int excluded_id=-1)
Send any variable changes in the core to all clients.
Definition: RpcServer.hpp:151
clienthandler(int id_, std::unique_ptr< stream_type > &&stream_)
Definition: RpcServer.hpp:105