From f254c7580d8639b840e955444b6df1f302f7bba4 Mon Sep 17 00:00:00 2001 From: j4nk Date: Tue, 8 Aug 2023 13:07:05 -0400 Subject: [PATCH] Redid networking to a queue-based system --- main.c | 8 +- oct_networking.c | 190 +++++++++++++++++++++++++++++++++-------------- oct_networking.h | 34 ++++++++- test_client.lua | 3 +- 4 files changed, 170 insertions(+), 65 deletions(-) diff --git a/main.c b/main.c index e1f5be1..7943a35 100644 --- a/main.c +++ b/main.c @@ -56,7 +56,7 @@ int main(int argc, char* argv[]) { if (ev.key == TB_KEY_ESC) { finish = 1; } - oct_network_recv_msg(); + oct_network_recv_msgs(); lua_getglobal(L, "oct_loop"); lua_pushinteger(L, ev.key); lua_pushinteger(L, ev.ch); @@ -65,12 +65,12 @@ int main(int argc, char* argv[]) { oct_render_termbox_sprite(oct_tb_sprite_list.sprite_list[i]); } tb_present(); - oct_network_send_msg(); + oct_network_send_msgs(); } } else { while (!finish) { - oct_network_recv_msg(); + oct_network_recv_msgs(); lua_getglobal(L, "oct_loop"); lua_pushinteger(L, 0); lua_pushinteger(L, 0); @@ -78,7 +78,7 @@ int main(int argc, char* argv[]) { OCT_LOG_ERROR("%s", luaL_checkstring(L, -1)); finish = 1; } - oct_network_send_msg(); + oct_network_send_msgs(); } } deinitialize_everything(); diff --git a/oct_networking.c b/oct_networking.c index 7628171..8ccd306 100644 --- a/oct_networking.c +++ b/oct_networking.c @@ -7,23 +7,84 @@ #include #include #include +#include #include "oct_log.h" #include "oct_networking.h" struct { - int needs_recv; - char recv_buffer[BUFFER_SIZE]; - char recv_addr[NI_MAXHOST]; - char recv_port[NI_MAXSERV]; - - int needs_send; - char send_buffer[BUFFER_SIZE]; - char send_addr[NI_MAXHOST]; - char send_port[NI_MAXSERV]; + struct oct_network_q recv_queue; + struct oct_network_q send_queue; + int sfd; } oct_network_node; +int oct_network_q_enqueue(struct oct_network_q* q, char* addr, char* port, char* msg) { + struct oct_network_q_entry* e = (struct oct_network_q_entry*)malloc(sizeof(struct oct_network_q_entry)); + if (!e) { + OCT_LOG_ERROR("Could not initialize new send queue entry"); + return 0; + } + strncpy(e->message.addr, addr, NI_MAXHOST); + strncpy(e->message.port, port, NI_MAXSERV); + strncpy(e->message.buffer, msg, BUFFER_SIZE); + e->next = NULL; + e->prev = NULL; + + // If the queue is empty + if (q->size == 0) { + q->first = e; + q->last = e; + } + else { + e->next = q->first; + q->first = e; + e->next->prev = e; + } + + q->size++; + return 1; +} + +struct oct_network_q_message oct_network_q_dequeue(struct oct_network_q* q) { + // Nothing left in queue signalled by a struct with the following addr, port, buffer + struct oct_network_q_message ret; + strncpy(ret.addr, "0", 2); + strncpy(ret.port, "0", 2); + strncpy(ret.buffer, "0", 2); + + if (q->size == 0) { + return ret; + } + // Otherwise, first copy fields over + strncpy(ret.addr, q->last->message.addr, NI_MAXHOST); + strncpy(ret.port, q->last->message.port, NI_MAXSERV); + strncpy(ret.buffer, q->last->message.buffer, BUFFER_SIZE); + + // Then, remove the last element + struct oct_network_q_entry* temp = q->last; + q->last = q->last->prev; + if (q->last) { + q->last->next = NULL; + } + free(temp); + + q->size--; + return ret; +} + +void oct_network_q_init(struct oct_network_q* q) { + q->first = NULL; + q->last = NULL; + q->size = 0; +} + +void oct_network_q_deinit(struct oct_network_q* q) { + while (q->first) { + oct_network_q_dequeue(q); + } +} + // Address book currently implemented as a doubly-linked list with O(1) inserts, O(n) accesses // Ideal implementation is hash table with O(1) inserts, accesses // However, we'll see if this works for now @@ -45,7 +106,8 @@ static int oct_network_init_socket(char* addr, char* port) { hints.ai_socktype = SOCK_DGRAM; /* Datagram socket */ hints.ai_flags = 0; hints.ai_protocol = 0; /* Any protocol */ - s = getaddrinfo(oct_network_node.send_addr, oct_network_node.send_port, &hints, &result); + //s = getaddrinfo(oct_network_node.send_addr, oct_network_node.send_port, &hints, &result); + s = getaddrinfo(addr, port, &hints, &result); if (s != 0) { OCT_LOG_WARNING("getaddrinfo failed on %s:%s\n", addr, port); return -1; @@ -77,9 +139,7 @@ static int oct_network_init_socket(char* addr, char* port) { } int oct_network_node_init(char* port, lua_State* L) { - oct_network_node.needs_recv = 1; - oct_network_node.needs_send = 0; - + // Initialize listening socket struct addrinfo hints; memset(&hints, 0, sizeof(hints)); hints.ai_family = AF_UNSPEC; // ipv4 or ipv6 @@ -120,9 +180,13 @@ int oct_network_node_init(char* port, lua_State* L) { if (!rp) { return 0; } - OCT_LOG_INFO("Listening on 127.0.0.1 port %s", port); + // Initialize send queue + oct_network_q_init(&oct_network_node.send_queue); + oct_network_q_init(&oct_network_node.recv_queue); + + // Push lua functions lua_pushcfunction(L, oct_network_recv_msg_lua); lua_setglobal(L, "oct_recv"); lua_pushcfunction(L, oct_network_send_msg_lua); @@ -131,65 +195,75 @@ int oct_network_node_init(char* port, lua_State* L) { return 1; } - -// This runs every loop, filling the receive buffer if needed -int oct_network_recv_msg() { - if (oct_network_node.needs_recv) { - struct sockaddr_storage peer_addr; - socklen_t peer_addrlen = sizeof(peer_addr); - - // recvfrom returns -1 if nothing was received - // Need MSG_DONTWAIT flag for nonblocking - if (recvfrom(oct_network_node.sfd, oct_network_node.recv_buffer, BUFFER_SIZE, MSG_DONTWAIT, (struct sockaddr *) &peer_addr, &peer_addrlen) > 0) { - OCT_LOG_INFO("Received message!"); - int s = getnameinfo((struct sockaddr *) &peer_addr, - peer_addrlen, oct_network_node.recv_addr, NI_MAXHOST, oct_network_node.recv_port, NI_MAXSERV, NI_NUMERICSERV); - if (s != 0) { - OCT_LOG_WARNING("Received message from unknown host: %s", oct_network_node.recv_buffer); - return -1; - } - oct_network_node.needs_recv = 0; - } - } - return 0; +void oct_network_node_deinit() { + oct_network_ab_deinit(); + oct_network_q_deinit(&oct_network_node.send_queue); + oct_network_q_deinit(&oct_network_node.recv_queue); } -int oct_network_send_msg() { - if (oct_network_node.needs_send) { +// This runs every loop, filling the receive buffer if needed +int oct_network_recv_msgs() { + struct sockaddr_storage peer_addr; + socklen_t peer_addrlen = sizeof(peer_addr); + char tmp_addr[NI_MAXHOST]; + char tmp_port[NI_MAXSERV]; + char tmp_buffer[BUFFER_SIZE]; + + // Receive up to OCT_NETWORK_MAX_RECVS messages + // recvfrom returns -1 if nothing was received + // Need MSG_DONTWAIT flag for nonblocking + int i = 0; + while (recvfrom(oct_network_node.sfd, tmp_buffer, BUFFER_SIZE, MSG_DONTWAIT, (struct sockaddr *) &peer_addr, &peer_addrlen) > 0 && i < OCT_NETWORK_MAX_RECVS) { + OCT_LOG_INFO("Received message!"); + int s = getnameinfo((struct sockaddr *) &peer_addr, + peer_addrlen, tmp_addr, NI_MAXHOST, tmp_port, NI_MAXSERV, NI_NUMERICSERV); + if (s != 0) { + OCT_LOG_WARNING("Received message from unknown host: %s", tmp_buffer); + return -1; + } + oct_network_q_enqueue(&oct_network_node.recv_queue, tmp_addr, tmp_port, tmp_buffer); + i++; + } + return i; +} + + +int oct_network_send_msgs() { + while (oct_network_node.send_queue.size != 0) { + struct oct_network_q_message m = oct_network_q_dequeue(&oct_network_node.send_queue); struct oct_network_ab_entry* e; - e = oct_network_ab_find(oct_network_node.send_addr, oct_network_node.send_port); + e = oct_network_ab_find(m.addr, m.port); if (!e) { - OCT_LOG_INFO("Not in address book: %s:%s", oct_network_node.send_addr, oct_network_node.send_port); - e = oct_network_ab_insert(oct_network_node.send_addr, oct_network_node.send_port); + OCT_LOG_DEBUG("Not in address book: %s:%s", m.addr, m.port); + e = oct_network_ab_insert(m.addr, m.port); if (!e) { // Give up... - oct_network_node.needs_send = 0; + OCT_LOG_ERROR("Error: Could not insert address book entry"); return 0; } } else { - OCT_LOG_INFO("Found in address book: %s:%s", oct_network_node.send_addr, oct_network_node.send_port); + OCT_LOG_DEBUG("Found in address book: %s:%s", m.addr, m.port); } int sfd = e->sfd; - ssize_t length = strlen(oct_network_node.send_buffer); - OCT_LOG_DEBUG("Sending message: %s", oct_network_node.send_buffer); - if (write(sfd, oct_network_node.send_buffer, length) != length) { + ssize_t length = strlen(m.buffer); + OCT_LOG_DEBUG("Sending message: %s", m.buffer); + if (write(sfd, m.buffer, length) != length) { OCT_LOG_WARNING("Partial write when sending message"); return -1; } - oct_network_node.needs_send = 0; } return 1; } int oct_network_recv_msg_lua(lua_State* L) { - if (!oct_network_node.needs_recv) { - lua_pushstring(L, oct_network_node.recv_buffer); - lua_pushstring(L, oct_network_node.recv_addr); - lua_pushstring(L, oct_network_node.recv_port); - oct_network_node.needs_recv = 1; + if (oct_network_node.recv_queue.size != 0) { + struct oct_network_q_message m = oct_network_q_dequeue(&oct_network_node.recv_queue); + lua_pushstring(L, m.buffer); + lua_pushstring(L, m.addr); + lua_pushstring(L, m.port); } else { lua_pushstring(L, ""); @@ -199,12 +273,14 @@ int oct_network_recv_msg_lua(lua_State* L) { return 3; } int oct_network_send_msg_lua(lua_State* L) { - if (!oct_network_node.needs_send) { - strncpy(oct_network_node.send_buffer, luaL_checkstring(L, -3), BUFFER_SIZE); - strncpy(oct_network_node.send_addr, luaL_checkstring(L, -2), NI_MAXHOST); - strncpy(oct_network_node.send_port, luaL_checkstring(L, -1), NI_MAXSERV); - oct_network_node.needs_send = 1; - } + char tmp_addr[NI_MAXHOST]; + char tmp_port[NI_MAXSERV]; + char tmp_buffer[BUFFER_SIZE]; + strncpy(tmp_buffer, luaL_checkstring(L, -3), BUFFER_SIZE); + strncpy(tmp_addr, luaL_checkstring(L, -2), NI_MAXHOST); + strncpy(tmp_port, luaL_checkstring(L, -1), NI_MAXSERV); + + oct_network_q_enqueue(&oct_network_node.send_queue, tmp_addr, tmp_port, tmp_buffer); return 0; } diff --git a/oct_networking.h b/oct_networking.h index 272f93d..b25af3a 100644 --- a/oct_networking.h +++ b/oct_networking.h @@ -9,7 +9,34 @@ #define NUM_BUFFERS 10; #define OCT_DEFAULT_PORT "20000" +// Allowed to receive up to 10 messages per iteration of the main loop +#define OCT_NETWORK_MAX_RECVS 10 +// Queue stuff +// Send and recv queues are identical except in the functions that act on them, so we can use the same structure +// for both +struct oct_network_q_message { + char addr[NI_MAXHOST]; + char port[NI_MAXSERV]; + char buffer[BUFFER_SIZE]; +}; + +struct oct_network_q_entry { + struct oct_network_q_message message; + struct oct_network_q_entry* next; + struct oct_network_q_entry* prev; +}; + +struct oct_network_q { + size_t size; + struct oct_network_q_entry* first; + struct oct_network_q_entry* last; +}; + +int oct_network_q_enqueue(struct oct_network_q* q, char* addr, char* port, char* msg); +struct oct_network_q_message oct_network_q_dequeue(struct oct_network_q* q); +void oct_network_q_init(struct oct_network_q* q); +void oct_network_q_deinit(struct oct_network_q* q); struct oct_network_ab_entry { int sfd; @@ -20,14 +47,15 @@ struct oct_network_ab_entry { }; int oct_network_node_init(char* port, lua_State* L); -int oct_network_node_deinit(); +void oct_network_node_deinit(); // These are heavily based off of man 3 getaddrinfo, e.g. at // https://www.man7.org/linux/man-pages/man3/getaddrinfo.3.html -int oct_network_recv_msg(); -int oct_network_send_msg(); +int oct_network_recv_msgs(); +int oct_network_send_msgs(); int oct_network_recv_msg_lua(lua_State* L); int oct_network_send_msg_lua(lua_State* L); +int oct_network_broadcast_msg_lua(lua_State* L); void oct_network_ab_init(); struct oct_network_ab_entry* oct_network_ab_insert(char* addr, char* port); diff --git a/test_client.lua b/test_client.lua index d146cb4..ad038a9 100644 --- a/test_client.lua +++ b/test_client.lua @@ -16,7 +16,8 @@ end counter = 0; function oct_loop(key) if counter == 1000000 then - oct_send("Hello!", "127.0.0.1", "1234"); + oct_send("First message!", "127.0.0.1", "1234"); + oct_send("Second message!", "127.0.0.1", "1234"); counter = 0; end counter = counter + 1;