From 0545a7e105633763507c24cc45ac03942fb271b3 Mon Sep 17 00:00:00 2001
From: Ludovic Pouzenc <ludovic@pouzenc.fr>
Date: Sat, 2 Jul 2016 20:31:40 +0200
Subject: dgrambuf: full scatter/gather, no ringbuffer. Dummy data to check
 some code paths.

---
 mcastseed/src/dgrambuf.c   | 217 +++++++++++++++++++++++++++++++++------------
 mcastseed/src/dgrambuf.h   |   3 +-
 mcastseed/src/mcastleech.c |  85 ++++++------------
 mcastseed/src/mcastseed.c  |  17 ++--
 4 files changed, 199 insertions(+), 123 deletions(-)

diff --git a/mcastseed/src/dgrambuf.c b/mcastseed/src/dgrambuf.c
index 47c6a68..b07ba1f 100644
--- a/mcastseed/src/dgrambuf.c
+++ b/mcastseed/src/dgrambuf.c
@@ -7,107 +7,192 @@
 
 #include "dgrambuf.h"
 
-#include <sys/socket.h> /* recvmmsg() */
-#include <stdlib.h> /* calloc(), free() */
+#include <sys/socket.h> /* recvmmsg() _GNU_SOURCE */
+#include <stdlib.h> /* calloc(), free(), qsort() */
 #include <stdio.h> /* perror() */
 #include <string.h> /* memset() */
 #include <sys/uio.h> /* writev() */
 
+struct uint_pair {
+	unsigned int index;
+	unsigned int value;
+};
+
 struct dgrambuf_t {
 	size_t dgram_count;
 	size_t dgram_max_size;
+	size_t dgram_header_size;
 
 	struct iovec *recv_iovecs;
 	struct iovec *write_iovecs;
 	struct mmsghdr *msgs;
 
-	int buf_full;
-	size_t buf_head;
-	size_t buf_tail;
+	unsigned int win_base;
+	unsigned int *dgram_seq_numbers; /* Stores the decoded datagram sequence number for each dgram slot of buf */
+	unsigned int *dgram_len;
+	struct uint_pair *dgram_ordered_seq_numbers;
+
 	void *buf;
 
 	unsigned int (*validate_func)(unsigned int, void *);
 	//TODO pthread_mutex_lock
 };
 
+int _compare_uint_pair(const void *pa, const void *pb);
+
 void dgrambuf_set_validate_func(dgrambuf_t dbuf, unsigned int (*func)(unsigned int, void *) ) {
 	dbuf->validate_func = func;
 }
 
 int dgrambuf_recvmmsg(dgrambuf_t dbuf, int sockfd) {
-	size_t vlen, s, i;
-	int recv_msg_count;
-	unsigned int seq;
+  void *dgram_base;
+	size_t vlen, i, dgram_index;
+	int recv_msg_count, res;
+	unsigned int seq, dgram_len;
 
-	if (dbuf->buf_full) {
-		return -1; //TODO block until write
+	if ( !dbuf->validate_func ) {
+		return -1;
 	}
-	/* Determine how many message we can read at once */
-	if ( dbuf->buf_head < dbuf->buf_tail ) {
-		vlen = dbuf->buf_tail - dbuf->buf_head - 1;
-	} else {
-		vlen = dbuf->dgram_count - dbuf->buf_head;
+
+	/* Initialize recvmmsg() syscall arguments */
+	for (i=0, vlen=0; i < dbuf->dgram_count; i++) {
+		if ( dbuf->dgram_seq_numbers[i] == 0 ) {
+			dbuf->recv_iovecs[vlen].iov_base = dbuf->buf + i*dbuf->dgram_max_size;
+			dbuf->recv_iovecs[vlen].iov_len = dbuf->dgram_max_size;
+			memset(dbuf->msgs + vlen, 0, sizeof(struct mmsghdr));
+			dbuf->msgs[vlen].msg_hdr.msg_iov = dbuf->recv_iovecs + vlen;
+			dbuf->msgs[vlen].msg_hdr.msg_iovlen = 1;
+			vlen++;
+		}
 	}
 
-	/* Initialize recvmmsg arguments */
-	s = dbuf->buf_head;
-	memset(dbuf->msgs + s, 0, vlen * sizeof(struct mmsghdr));
-	for (i=0; i<vlen; i++) {
-		dbuf->recv_iovecs[s+i].iov_base = dbuf->buf + (s+i)*dbuf->dgram_count;
-		dbuf->recv_iovecs[s+i].iov_len = dbuf->dgram_max_size;
-		dbuf->msgs[s+i].msg_hdr.msg_iov = &dbuf->recv_iovecs[s+i];
-		dbuf->msgs[s+i].msg_hdr.msg_iovlen = 1;
+	/* Buffer is full, can't receive */
+	if ( vlen==0 ) {
+		return -2;
 	}
 
 	/* Do the syscall */
-	recv_msg_count = recvmmsg(sockfd, dbuf->msgs + s, vlen, MSG_WAITFORONE, NULL);
-	if (recv_msg_count == -1) {
+	recv_msg_count = recvmmsg(sockfd, dbuf->msgs, vlen, MSG_WAITFORONE, NULL);
+	if (recv_msg_count < 0) {
 		perror("recvmmsg()");
-	} else {
-		/* Update structure values accordingly */
-		dbuf->buf_head = ( dbuf->buf_head + recv_msg_count ) % dbuf->dgram_count;
-		dbuf->buf_full = ( dbuf->buf_head == dbuf->buf_tail );
+		return recv_msg_count;
 	}
 
 	/* Check all received messages */
-	if ( dbuf->validate_func ) {
-		for (i=0; i<recv_msg_count; i++) {
-			seq = dbuf->validate_func(dbuf->msgs[s+i].msg_len, dbuf->recv_iovecs[s+i].iov_base);
-			if ( seq > 0 ) {
-				// TODO Valid
-				printf("#%i valid\n", s+i);
-			} else {
-				// TODO Invalid
-				printf("#%i invalid\n", s+i);
-			}
+	res = 1;
+	for (i=0; i<recv_msg_count; i++) {
+		dgram_base = dbuf->recv_iovecs[i].iov_base;
+		dgram_index = (dgram_base - dbuf->buf) / dbuf->dgram_max_size;
+		dgram_len = dbuf->msgs[i].msg_len;
+		seq = dbuf->validate_func(dgram_len, dgram_base);
+		// TODO better feedback
+		if ( seq == 0 ) {
+			printf("#%zi invalid (%u)\n", i, seq);
+			dbuf->dgram_seq_numbers[dgram_index] = 0;
+		} else if ( seq == -1 ) {
+			printf("#%zi end\n", i);
+			dbuf->dgram_seq_numbers[dgram_index] = 0;
+			res = 0;
+		} else if ( seq < dbuf->win_base ) {
+			printf("#%zi past (%u)\n", i, seq);
+			dbuf->dgram_seq_numbers[dgram_index] = 0;
+		} else if ( seq >= dbuf->win_base + dbuf->dgram_count ) {
+			printf("#%zi future (%u)\n", i, seq);
+			dbuf->dgram_seq_numbers[dgram_index] = 0;
+		} else {
+			printf("#%zi valid (%u)\n", i, seq);
+			dbuf->dgram_seq_numbers[dgram_index] = seq;
+			dbuf->dgram_len[dgram_index] = dgram_len;
 		}
 	}
 
-	return recv_msg_count;
+	return res;
 }
 
 ssize_t dgrambuf_write(dgrambuf_t dbuf, int fd) {
-	int i, s, vlen;
+	size_t dgram_index, i, vlen;
+	unsigned int curr_seq, prev_seq, dgram_len;
+	ssize_t nwrite, total;
+
+	/* Initialize dgram_ordered_seq_numbers from dgram_seq_numbers */
+	for (i=0; i < dbuf->dgram_count; i++) {
+		dbuf->dgram_ordered_seq_numbers[i].index = i;
+		dbuf->dgram_ordered_seq_numbers[i].value = dbuf->dgram_seq_numbers[i];
+	}
+	/* Inplace sorting of dgram_ordered_seq_numbers */
+	qsort(dbuf->dgram_ordered_seq_numbers, dbuf->dgram_count, sizeof(struct uint_pair), _compare_uint_pair);
+	
+	/* Initialize iovecs for writev, take dgram payloads following the sequence numbers */
+	for (prev_seq=0, vlen=0, total=0, i=0; i< dbuf->dgram_count; i++) {
+		curr_seq = dbuf->dgram_ordered_seq_numbers[i].value;
+
+		/* Skip empty dgram slot */
+		if ( curr_seq == 0 )
+			continue;
+
+		/* Skip dgram comming from the past */
+		if ( curr_seq < dbuf->win_base ) {
+			fprintf(stderr, "Oops : found dgram from past in buffer (%u)\n", curr_seq);
+			continue;
+		}
+
+		/* Break if first dgram to write is not in buffer at all */
+		if ( ( vlen==0 ) && (curr_seq != dbuf->win_base) ) {
+			fprintf(stderr, "Oops : nothing to write, missing %u seq\n", dbuf->win_base);
+			break;
+		}
+
+		/* Skip if next dgram is a dup */
+		if ( ( vlen > 0 ) && (curr_seq == prev_seq) ) {
+			continue;
+		}
 
-	//TODO
-	s = 0;
-	vlen = 0;
+		/* Break if next seq dgram is missing */
+		if ( ( vlen > 0 ) && (curr_seq > prev_seq+1 ) ) {
+			break;
+		}
+
+		/* Normal case : curr_seq is the next dgram to write */
+		dgram_index = dbuf->dgram_ordered_seq_numbers[i].index;
+		dgram_len = dbuf->dgram_len[dgram_index] - dbuf->dgram_header_size;
+
+		dbuf->write_iovecs[vlen].iov_len = dgram_len; /* Setup iovecs */
+		dbuf->write_iovecs[vlen].iov_base = dbuf->buf + dgram_index*dbuf->dgram_max_size + dbuf->dgram_header_size;
+		dbuf->dgram_seq_numbers[dgram_index] = 0; /* Mark dgram slots about to be written out as reusable */
+
+		total += dgram_len; /* Update counters */
+		vlen++;
+		dbuf->win_base = curr_seq;
+		prev_seq = curr_seq;
+	}
+
+	/* If nothing valid to write out */
+	if ( vlen == 0 ) {
+		return -1;
+	}
+
+	nwrite = writev(fd, dbuf->write_iovecs, vlen);
+	if ( nwrite < 0 ) {
+		perror("writev()");
+		return nwrite;
+	}
 
-	for (i=0; i<vlen; i++) {
-		dbuf->write_iovecs[i].iov_base = dbuf->recv_iovecs[s+i].iov_base + 10;
-		dbuf->write_iovecs[i].iov_len = dbuf->msgs[s+i].msg_len - 10;
+	if ( nwrite != total ) {
+		fprintf(stderr, "writev() short\n");
+		return nwrite;
 	}
 
-	return writev(fd, dbuf->write_iovecs, vlen);
+	return nwrite;
 }
 
-dgrambuf_t dgrambuf_new(size_t dgram_count, size_t dgram_max_size) {
+dgrambuf_t dgrambuf_new(size_t dgram_count, size_t dgram_max_size, size_t dgram_header_size) {
 
 	dgrambuf_t dbuf = calloc(1, sizeof(struct dgrambuf_t));
 	if (!dbuf) goto fail0;
 
 	dbuf->dgram_count = dgram_count;
 	dbuf->dgram_max_size = dgram_max_size;
+	dbuf->dgram_header_size = dgram_header_size;
 
 	dbuf->recv_iovecs = calloc(dgram_count, sizeof(struct iovec));
 	if (!dbuf->recv_iovecs) goto fail1;
@@ -118,11 +203,24 @@ dgrambuf_t dgrambuf_new(size_t dgram_count, size_t dgram_max_size) {
 	dbuf->msgs = calloc(dgram_count, sizeof(struct mmsghdr));
 	if (!dbuf->msgs) goto fail3;
 
+	dbuf->win_base = 1;
+	dbuf->dgram_seq_numbers = calloc(dgram_count, sizeof(unsigned int));
+	if (!dbuf->dgram_seq_numbers) goto fail4;
+
+	dbuf->dgram_len = calloc(dgram_count, sizeof(ssize_t));
+	if (!dbuf->dgram_len) goto fail5;
+
+	dbuf->dgram_ordered_seq_numbers = calloc(dgram_count, sizeof(struct uint_pair));
+	if (!dbuf->dgram_ordered_seq_numbers) goto fail6;
+
 	dbuf->buf = calloc(dgram_count, dgram_max_size);
-	if (!dbuf->buf) goto fail4;
+	if (!dbuf->buf) goto fail7;
 
 	return dbuf;
 
+fail7:  free(dbuf->dgram_ordered_seq_numbers);
+fail6:	free(dbuf->dgram_len);
+fail5:	free(dbuf->dgram_seq_numbers);
 fail4:	free(dbuf->msgs);
 fail3:	free(dbuf->write_iovecs);
 fail2:	free(dbuf->recv_iovecs);
@@ -132,6 +230,10 @@ fail0:	return 0;
 
 void dgrambuf_free(dgrambuf_t *dbuf) {
 	if (dbuf && *dbuf) {
+		free((*dbuf)->buf);
+		free((*dbuf)->dgram_ordered_seq_numbers);
+		free((*dbuf)->dgram_len);
+		free((*dbuf)->dgram_seq_numbers);
 		free((*dbuf)->msgs);
 		free((*dbuf)->write_iovecs);
 		free((*dbuf)->recv_iovecs);
@@ -140,13 +242,14 @@ void dgrambuf_free(dgrambuf_t *dbuf) {
 	*dbuf = NULL;
 }
 
-size_t dgrambuf_free_count(const dgrambuf_t dbuf) {
-	if (dbuf->buf_full) {
+int _compare_uint_pair(const void *pa, const void *pb) {
+	const struct uint_pair *a = pa;
+	const struct uint_pair *b = pb;
+	if (a->value < b->value)
+		return -1;
+	else if ( a->value > b->value )
+		return 1;
+	else
 		return 0;
-	} else if ( dbuf->buf_head < dbuf->buf_tail ) {
-		return dbuf->buf_tail - dbuf->buf_head - 1;
-	}// else {
-	return dbuf->dgram_count - (dbuf->buf_head - dbuf->buf_tail );
-	//}
 }
 
diff --git a/mcastseed/src/dgrambuf.h b/mcastseed/src/dgrambuf.h
index c515b8d..b74625d 100644
--- a/mcastseed/src/dgrambuf.h
+++ b/mcastseed/src/dgrambuf.h
@@ -9,10 +9,9 @@
 
 typedef struct dgrambuf_t *dgrambuf_t;
 
-dgrambuf_t dgrambuf_new(size_t dgram_count, size_t dgram_max_size);
+dgrambuf_t dgrambuf_new(size_t dgram_count, size_t dgram_max_size, size_t dgram_header_size);
 void dgrambuf_free(dgrambuf_t *dbuf);
 
-size_t dgrambuf_free_count(const dgrambuf_t);
 void dgrambuf_set_validate_func(dgrambuf_t dbuf, unsigned int (*func)(unsigned int, void *) );
 
 
diff --git a/mcastseed/src/mcastleech.c b/mcastseed/src/mcastleech.c
index c832489..9315992 100644
--- a/mcastseed/src/mcastleech.c
+++ b/mcastseed/src/mcastleech.c
@@ -17,6 +17,7 @@
 #define MTU 1500
 #define MULTICAST_RECV_BUF (MTU-20-8)
 #define MULTICAST_SO_RCVBUF 425984
+#define DGRAM_HEADER_SIZE 8
 #define DEFAULT_MCAST_IP_STR "ff02::114"
 #define DEFAULT_PORT_STR "9000"
 
@@ -48,7 +49,7 @@ const char * const state_str[] = {
 void die(char* msg);
 void usage(char *msg);
 void arg_parse(int argc, char* argv[]);
-size_t get_available_mem();
+int get_available_mem_kb();
 void dgrambuf_init();
 uint32_t validate_data_dgram(unsigned int nread, void *recvbuf);
 void ack(uint32_t seq);
@@ -175,39 +176,7 @@ int wait_start_and_start_job() {
 
 
 int receive_data() {
-	/*
-	ssize_t nread;
-	uint32_t seq;
-	uint16_t datalen;
-
-	// Wait for a "data" datagram from the server
-	nread = recvfrom(mcast_sock, recvbuf, MULTICAST_RECV_BUF, 0, NULL, 0);
- 	if (nread < 0 ) {	
-		perror("recvfrom() failed");
-		return -1;	
-	}
-	if ( nread >= 10 && strncmp("data", recvbuf, 4) == 0 ) {
-		seq = ntohl( *( (uint32_t *) recvbuf+1 ) );
-		datalen = ntohs( *( (uint16_t *) recvbuf+4 ) );
-		if ( nread == (10 + datalen) ) {
-			ack(seq);
-			dgrambuf_memcpy_into(dgrambuf, recvbuf+10, datalen);
-		} else {
-			fprintf(stderr, "Short or inconsistent data #%u packet : nread==%zi, (10 + datalen)==%i\n", seq, nread, (10 + datalen));
-		}
-	}
-
-	return 1;
-	*/
-
-	unsigned int count;
-
-	count = dgrambuf_recvmmsg(dgrambuf, mcast_sock);
-	if (count < 0) {
-		return -1;
-	}
-
-	return 0;
+	return dgrambuf_recvmmsg(dgrambuf, mcast_sock);
 }
 
 
@@ -217,6 +186,8 @@ void ack(uint32_t seq) {
 
 
 int finalize_job() {
+	//XXX Dummy test
+	dgrambuf_write(dgrambuf, 2);
 	return 0;	
 }
 int is_there_more_job() {
@@ -252,26 +223,22 @@ void arg_parse(int argc, char* argv[]) {
 	mcast_ip = (argc >= 3)?argv[2]:DEFAULT_MCAST_IP_STR;
 }
 
-size_t get_available_mem() {
+int get_available_mem_kb() {
 	char key[64];
-	int value;
-	int found=0;
-	unsigned long int mem_avail;
+	int res, value, found=0;
 	FILE * fh = fopen("/proc/meminfo", "r");
 	if ( fh ) {
 		while (!found && !feof(fh)) {
-			fscanf(fh, "%63s %i kB\n", key, &value);
+			res = fscanf(fh, "%63s %i kB\n", key, &value);
+			if ( res < 0 )
+				break;
 			found = ( strncmp("MemAvailable:", key, 12) == 0 );
 		}
+		fclose(fh);
 	}
 
-	if ( found ) {
-		mem_avail = value * 1024;
-		if ( mem_avail > (size_t)-1 ) {
-			return -1;
-		} else {
-			return mem_avail;
-		}
+	if ( found && value > 0 ) {
+		return value;
 	}
 
 	return 0;
@@ -280,34 +247,34 @@ size_t get_available_mem() {
 void dgrambuf_init() {
 	/* Guess dgrambuf size from global free memory */
 	size_t dgram_count;
-	size_t avail_mem = get_available_mem();
+	int avail_mem = get_available_mem_kb();
+
 	if ( avail_mem < MULTICAST_SO_RCVBUF ) {
 		dgram_count = MULTICAST_SO_RCVBUF / MULTICAST_RECV_BUF;
 	} else {
-		dgram_count = avail_mem / MULTICAST_RECV_BUF / 2;
+		dgram_count = avail_mem / MULTICAST_RECV_BUF / 2 * 1024;
 	}
+	//XXX Dummy
+	dgram_count = 3;
+	fprintf(stderr, "avail_mem == %i kb, dgram_count == %zi\n", avail_mem, dgram_count);
 
 	/* Allocate dgrambuf */
-	dgrambuf = dgrambuf_new(dgram_count, MULTICAST_RECV_BUF);
+	dgrambuf = dgrambuf_new(dgram_count, MULTICAST_RECV_BUF, DGRAM_HEADER_SIZE);
 	if ( dgrambuf == NULL ) {
 		perror("dgrambuf_new/malloc");
 		exit(EXIT_FAILURE);
 	} 
 
-	printf("dgrambuf_free_count() => %zi\n", dgrambuf_free_count(dgrambuf));
+	//printf("dgrambuf_free_count() => %zi\n", dgrambuf_free_count(dgrambuf));
 	dgrambuf_set_validate_func(dgrambuf, validate_data_dgram);
 }
 
 unsigned int validate_data_dgram(unsigned int nread, void *recvbuf ) {
-	uint32_t seq;
-	uint16_t datalen;
-
-	if ( nread >= 10 && strncmp("data", recvbuf, 4) == 0 ) {
-		seq = ntohl( *( (uint32_t *) recvbuf+1 ) );
-		datalen = ntohs( *( (uint16_t *) recvbuf+4 ) );
-		if ( nread == (10 + datalen) ) {
-			return seq;
-		}
+	if ( nread >= DGRAM_HEADER_SIZE && strncmp("data", recvbuf, 4) == 0 ) {
+		return ntohl( *( (uint32_t *) recvbuf+1 ) );
+	}
+	if ( nread >= 5 && strncmp("final", recvbuf, 5) == 0 ) {
+		return -1;
 	}
 
 	return 0;
diff --git a/mcastseed/src/mcastseed.c b/mcastseed/src/mcastseed.c
index da73353..f86af84 100644
--- a/mcastseed/src/mcastseed.c
+++ b/mcastseed/src/mcastseed.c
@@ -264,15 +264,22 @@ int start_job() {
 
 int send_data() {
 	ssize_t nwrite;
-	char buf[] = "dataXXXXXXJe suis à la plage.";
-	int paylen = strlen(buf)-10;
+	char buf[] = "dataXXXXJe suis à la plage.";
+	int paylen = strlen(buf)-8;
 	int seq = 1;
 
-	//FIXME use http://troydhanson.github.io/tpl/index.html
+	//XXX Dummy
+	*( (uint32_t *) buf+1 ) = htonl(3);
+	sendto(mcast_sock, buf, 8+paylen, 0, mcast_addr->ai_addr, mcast_addr->ai_addrlen);
+	*( (uint32_t *) buf+1 ) = htonl(4);
+	sendto(mcast_sock, buf, 8+paylen, 0, mcast_addr->ai_addr, mcast_addr->ai_addrlen);
+	*( (uint32_t *) buf+1 ) = htonl(2);
+	sendto(mcast_sock, buf, 8+paylen, 0, mcast_addr->ai_addr, mcast_addr->ai_addrlen);
+
+
 	*( (uint32_t *) buf+1 ) = htonl(seq);
-	*( (uint16_t *) buf+4 ) = htons(paylen);
 
-	nwrite = sendto(mcast_sock, buf, 10+paylen, 0, mcast_addr->ai_addr, mcast_addr->ai_addrlen);
+	nwrite = sendto(mcast_sock, buf, 8+paylen, 0, mcast_addr->ai_addr, mcast_addr->ai_addrlen);
 	if ( nwrite < 0 ) {
 		perror("sendto() failed");
 		return -1;
-- 
cgit v1.2.3