From fb33e6b84719746d22938e2e79c57b5954f63fa4 Mon Sep 17 00:00:00 2001
From: Ludovic Pouzenc <ludovic@pouzenc.fr>
Date: Sat, 25 Jun 2016 23:40:57 +0200
Subject: receive_data : use some ring buffer to batch recv, reorder, validate

---
 mcastseed/src/mcastleech.c | 121 ++++++++++++++++++++++++++++++++++++++-------
 1 file changed, 104 insertions(+), 17 deletions(-)

(limited to 'mcastseed/src/mcastleech.c')

diff --git a/mcastseed/src/mcastleech.c b/mcastseed/src/mcastleech.c
index d19bff9..c832489 100644
--- a/mcastseed/src/mcastleech.c
+++ b/mcastseed/src/mcastleech.c
@@ -12,8 +12,10 @@
 #include <string.h>
 #include <time.h>
 #include "msock.h" 
+#include "dgrambuf.h" 
 
-#define MULTICAST_RECV_BUF 10240
+#define MTU 1500
+#define MULTICAST_RECV_BUF (MTU-20-8)
 #define MULTICAST_SO_RCVBUF 425984
 #define DEFAULT_MCAST_IP_STR "ff02::114"
 #define DEFAULT_PORT_STR "9000"
@@ -29,6 +31,8 @@ SOCKET ucast_sock = (SOCKET) -1; /* Unicast socket for give feedback to server *
 
 /* Buffer used for earch recvfrom() */
 char recvbuf[MULTICAST_RECV_BUF];
+/* Huge ring buffer to absorb consumer speed variations without loosing datagrams */
+dgrambuf_t dgrambuf;
 
 /* Strings to print out representation of various states of the program */
 const char * const state_str[] = {
@@ -44,6 +48,10 @@ const char * const state_str[] = {
 void die(char* msg);
 void usage(char *msg);
 void arg_parse(int argc, char* argv[]);
+size_t get_available_mem();
+void dgrambuf_init();
+uint32_t validate_data_dgram(unsigned int nread, void *recvbuf);
+void ack(uint32_t seq);
 
 /* Parts of the "protocol", definitions are after main() */
 int wait_hello_and_connect_back();
@@ -58,6 +66,7 @@ int main(int argc, char* argv[]) {
 	int res;
 
 	arg_parse(argc, argv);
+	dgrambuf_init();
 
 	/* Finite state machine */
 	while ( state > 0 ) {
@@ -77,10 +86,14 @@ int main(int argc, char* argv[]) {
 
 	if ( mcast_sock > 0 ) {
 		close(mcast_sock);
+		mcast_sock = (SOCKET) -1;
 	}
 
-	if ( state < 0 )
+	dgrambuf_free(&dgrambuf);
+
+	if ( state < 0 ) {
 		return -state;
+	}
 
 	return EXIT_SUCCESS;
 }
@@ -99,11 +112,12 @@ int wait_hello_and_connect_back() {
 	/* Setup mcast_sock */
 	if ( mcast_sock > 0 ) {
 		close(mcast_sock);
-		mcast_sock = 0;
+		mcast_sock = (SOCKET) -1;
 	}
 	mcast_sock = mcast_recv_socket(mcast_ip, port, MULTICAST_SO_RCVBUF);
-	if(mcast_sock < 0)
-			usage("Could not setup multicast socket. Wrong args given ?");
+	if(mcast_sock < 0) {
+		usage("Could not setup multicast socket. Wrong args given ?");
+	}	
 
 	/* Wait for a single datagram from the server (for sync, no check on contain) */
 	peer_addr_len = sizeof(struct sockaddr_storage);
@@ -142,7 +156,7 @@ int wait_start_and_start_job() {
 		return -1;	
 	}
 	if ( nread >= 5 && strncmp("start", recvbuf, 5) == 0 ) {
-
+		/* Reply "ready" through unicast stream socket */
 		nwrite = write(ucast_sock, "ready", 5);
 		if ( nwrite < 0 ) {
 			fprintf(stderr, "write() failed\n");
@@ -159,12 +173,14 @@ int wait_start_and_start_job() {
 	return 0;	
 }
 
+
 int receive_data() {
+	/*
 	ssize_t nread;
 	uint32_t seq;
 	uint16_t datalen;
 
-	/* Wait for a "dataN" datagram from the server */
+	// Wait for a "data" datagram from the server
 	nread = recvfrom(mcast_sock, recvbuf, MULTICAST_RECV_BUF, 0, NULL, 0);
  	if (nread < 0 ) {	
 		perror("recvfrom() failed");
@@ -173,22 +189,33 @@ int receive_data() {
 	if ( nread >= 10 && strncmp("data", recvbuf, 4) == 0 ) {
 		seq = ntohl( *( (uint32_t *) recvbuf+1 ) );
 		datalen = ntohs( *( (uint16_t *) recvbuf+4 ) );
-		//fprintf(stderr, "debug seq==%i, datalen==%hi\n", seq, datalen);
-		if ( nread != (10 + datalen) ) {
-			fprintf(stderr, "debug nread==%zi, (10 + datalen)==%i\n", nread, (10 + datalen));
-			//TODO nack ?
-			return -2;
+		if ( nread == (10 + datalen) ) {
+			ack(seq);
+			dgrambuf_memcpy_into(dgrambuf, recvbuf+10, datalen);
+		} else {
+			fprintf(stderr, "Short or inconsistent data #%u packet : nread==%zi, (10 + datalen)==%i\n", seq, nread, (10 + datalen));
 		}
-		fprintf(stdout, "data #%i, ", seq);
-		fwrite(recvbuf+10, datalen, 1, stdout);
-		fflush(stdout);
-		//TODO buffer zero copy, ack
-		return 1;
+	}
+
+	return 1;
+	*/
+
+	unsigned int count;
+
+	count = dgrambuf_recvmmsg(dgrambuf, mcast_sock);
+	if (count < 0) {
+		return -1;
 	}
 
 	return 0;
 }
 
+
+void ack(uint32_t seq) {
+	//TODO
+}
+
+
 int finalize_job() {
 	return 0;	
 }
@@ -225,3 +252,63 @@ void arg_parse(int argc, char* argv[]) {
 	mcast_ip = (argc >= 3)?argv[2]:DEFAULT_MCAST_IP_STR;
 }
 
+size_t get_available_mem() {
+	char key[64];
+	int value;
+	int found=0;
+	unsigned long int mem_avail;
+	FILE * fh = fopen("/proc/meminfo", "r");
+	if ( fh ) {
+		while (!found && !feof(fh)) {
+			fscanf(fh, "%63s %i kB\n", key, &value);
+			found = ( strncmp("MemAvailable:", key, 12) == 0 );
+		}
+	}
+
+	if ( found ) {
+		mem_avail = value * 1024;
+		if ( mem_avail > (size_t)-1 ) {
+			return -1;
+		} else {
+			return mem_avail;
+		}
+	}
+
+	return 0;
+}
+
+void dgrambuf_init() {
+	/* Guess dgrambuf size from global free memory */
+	size_t dgram_count;
+	size_t avail_mem = get_available_mem();
+	if ( avail_mem < MULTICAST_SO_RCVBUF ) {
+		dgram_count = MULTICAST_SO_RCVBUF / MULTICAST_RECV_BUF;
+	} else {
+		dgram_count = avail_mem / MULTICAST_RECV_BUF / 2;
+	}
+
+	/* Allocate dgrambuf */
+	dgrambuf = dgrambuf_new(dgram_count, MULTICAST_RECV_BUF);
+	if ( dgrambuf == NULL ) {
+		perror("dgrambuf_new/malloc");
+		exit(EXIT_FAILURE);
+	} 
+
+	printf("dgrambuf_free_count() => %zi\n", dgrambuf_free_count(dgrambuf));
+	dgrambuf_set_validate_func(dgrambuf, validate_data_dgram);
+}
+
+unsigned int validate_data_dgram(unsigned int nread, void *recvbuf ) {
+	uint32_t seq;
+	uint16_t datalen;
+
+	if ( nread >= 10 && strncmp("data", recvbuf, 4) == 0 ) {
+		seq = ntohl( *( (uint32_t *) recvbuf+1 ) );
+		datalen = ntohs( *( (uint16_t *) recvbuf+4 ) );
+		if ( nread == (10 + datalen) ) {
+			return seq;
+		}
+	}
+
+	return 0;
+}
-- 
cgit v1.2.3