summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorLudovic Pouzenc <ludovic@pouzenc.fr>2016-07-17 14:21:26 +0200
committerLudovic Pouzenc <ludovic@pouzenc.fr>2016-07-17 14:21:26 +0200
commit3f0a442799955f56b2c77aabd6bc7aa4458718b4 (patch)
tree34504e5a01f755be3e3130578d6c0be86cf6b67a
parent96e0fd1e6571c41e102d4780f6e67c0736beef35 (diff)
downloadeficast-3f0a442799955f56b2c77aabd6bc7aa4458718b4.tar.gz
eficast-3f0a442799955f56b2c77aabd6bc7aa4458718b4.tar.bz2
eficast-3f0a442799955f56b2c77aabd6bc7aa4458718b4.zip
API changes, pedandic fixes, dgrambuf stats & info field, recvmmsg() with alarm(), partial writev() support.
-rw-r--r--mcastseed/src/dgrambuf.c425
-rw-r--r--mcastseed/src/dgrambuf.h24
-rw-r--r--mcastseed/src/dgrambuf_test.c16
-rw-r--r--mcastseed/src/mcastleech.c133
-rw-r--r--mcastseed/src/mcastseed.c86
5 files changed, 486 insertions, 198 deletions
diff --git a/mcastseed/src/dgrambuf.c b/mcastseed/src/dgrambuf.c
index 41ebc8a..75b82a6 100644
--- a/mcastseed/src/dgrambuf.c
+++ b/mcastseed/src/dgrambuf.c
@@ -10,67 +10,107 @@
#include <sys/socket.h> /* recvmmsg() _GNU_SOURCE */
#include <stdlib.h> /* calloc(), free(), qsort() */
#include <stdio.h> /* perror() */
+#include <errno.h> /* errno */
#include <string.h> /* memset() */
#include <sys/uio.h> /* writev() */
-#include <sys/param.h> /* MIN() */
+#include <stdint.h> /* uint8_t, uint64_t */
+#include <signal.h> /* sigaction() */
+#include <unistd.h> /* alarm() */
+#include <limits.h> /* SSIZE_MAX */
struct uint_pair {
unsigned int index;
unsigned int value;
};
+struct dgrambuf_stats_t {
+ uint64_t dgrambuf_read_on_full;
+ uint64_t recvmmsg_calls, recv_dgrams, recv_byte;
+ uint64_t dgram_invalid, dgram_past, dgram_future, dgram_dup, dgram_end_marker;
+ uint64_t qsort_calls;
+ uint64_t writev_calls, write_partial, write_byte;
+};
+
struct dgrambuf_t {
+ /* dgram validation after receive, takes dgram len and a pointer to the start of dgram data
+ Must returns dgram seq number or 0 if invalid dgram */
+ int (*validate_func)(unsigned int, void *, unsigned int*);
+
+ struct dgrambuf_stats_t stats;
+ struct sigaction sa_sigalrm;
+
size_t dgram_slots;
size_t dgram_free_count;
size_t dgram_max_size;
size_t dgram_header_size;
size_t iovec_slots;
- struct iovec *iov_recv;
- struct iovec *iov_write;
struct mmsghdr *msgs;
+ struct iovec *iov_recv;
+ struct iovec *iov_write; /* malloc'ed array */
+
+ struct iovec *partial_write_iov; /* Pointer to an item of iov_write[] */
+ size_t partial_write_remaining_iovcnt;
+ size_t partial_write_remaining_bytes;
+ unsigned int dgram_seq_last;
unsigned int dgram_seq_base;
unsigned int *dgram_len;
- unsigned int *dgram_seq_numbers; /* Stores the decoded datagram sequence number for each dgram slot of buf */
+ unsigned int *dgram_seq_numbers; /* Decoded datagram sequence number for each dgram_slot of buf */
int dgram_ordered_seq_numbers_is_dirty;
- struct uint_pair *dgram_ordered_seq_numbers;
-
- void *buf;
+ struct uint_pair *dgram_ordered_seq_numbers; /* Pairs to track original items ranks after qsort() */
- unsigned int (*validate_func)(unsigned int, void *);
- //TODO pthread_mutex_lock
+ uint8_t *buf; /* malloc-ed 2d byte array : buf[dgram_slots][dgram_max_size] */
};
-int _compare_uint_pair(const void *pa, const void *pb);
+void _sigalrm_handler(int signum);
+int _compare_uint_pair(const void *pa, const void *pb);
void _update_ordered_seq_numbers(dgrambuf_t dbuf);
-void dgrambuf_set_validate_func(dgrambuf_t dbuf, unsigned int (*func)(unsigned int, void *) ) {
- dbuf->validate_func = func;
+#ifndef HAVE_MIN_SIZE_T
+size_t min_size_t(size_t a, size_t b) { return (a<b)?a:b; }
+#endif /*HAVE_MIN_SIZE_T*/
+
+void dgrambuf_set_validate_func(dgrambuf_t dbuf, int (*validate_func)(unsigned int, void *, unsigned int *)) {
+ dbuf->validate_func = validate_func;
}
-size_t dgrambuf_free_count(const dgrambuf_t dbuf) {
+size_t dgrambuf_get_free_count(const dgrambuf_t dbuf) {
return dbuf->dgram_free_count;
}
-int dgrambuf_recvmmsg(dgrambuf_t dbuf, int sockfd) {
- void *dgram_base;
- size_t vlen, i, dgram_index;
- int recv_msg_count, res;
+int dgrambuf_everything_was_received(dgrambuf_t dbuf) {
+ /*TODO really implement this */
+ return dbuf->dgram_seq_last && ( dbuf->dgram_seq_base - 1 == dbuf->dgram_seq_last );
+}
+
+ssize_t dgrambuf_recvmmsg(dgrambuf_t dbuf, int sockfd, int timeout, int *info) {
+ uint8_t *dgram_base;
+ ssize_t recv_byte;
+ size_t i, vlen, dgram_index, recv_msg_count;
+ int res;
unsigned int seq, dgram_len;
+ struct sigaction sa_old;
- /* Buffer is full, can't receive */
- if ( dbuf->dgram_free_count == 0 ) {
- return -1;
- }
+ /* Info ptr is mandatory */
+ *info = 0;
/* Validate function is mandatory */
if ( !dbuf->validate_func ) {
- return -2;
+ return -3;
+ }
+
+ /* Buffer is full, can't receive */
+ if ( dbuf->dgram_free_count == 0 ) {
+ dbuf->stats.dgrambuf_read_on_full++;
+ *info |= DGRAMBUF_RECV_OVERWRITE;
+ /*FIXME : this locks everything if buf full + next seq missing*/
+ return 0;
}
/* Initialize recvmmsg() syscall arguments */
for (i=0, vlen=0; i < dbuf->dgram_slots; i++) {
+ /*XXX linear search is not optimal, notably if is_dirty == 0*/
if ( dbuf->dgram_seq_numbers[i] == 0 ) {
dbuf->iov_recv[vlen].iov_base = dbuf->buf + i*dbuf->dgram_max_size;
dbuf->iov_recv[vlen].iov_len = dbuf->dgram_max_size;
@@ -83,48 +123,92 @@ int dgrambuf_recvmmsg(dgrambuf_t dbuf, int sockfd) {
}
}
- /* Do the syscall */
- recv_msg_count = recvmmsg(sockfd, dbuf->msgs, vlen, MSG_WAITFORONE, NULL);
- if (recv_msg_count < 0) {
- perror("recvmmsg()");
- return recv_msg_count;
+ /* Do the syscall with alarm() to circumvent bad behavior in recvmmsg(2) timeout */
+ if (timeout) {
+ sigaction(SIGALRM, &(dbuf->sa_sigalrm), &sa_old);
+ alarm(timeout);
+ }
+ res = recvmmsg(sockfd, dbuf->msgs, vlen, MSG_WAITFORONE, NULL);
+ if (timeout) {
+ alarm(0);
+ sigaction(SIGALRM, &sa_old, NULL);
+ }
+ dbuf->stats.recvmmsg_calls++;
+
+ if (res < 0) {
+ if ( errno == EINTR ) {
+ recv_msg_count = 0;
+ *info |= DGRAMBUF_RECV_EINTR;
+ } else {
+ perror("recvmmsg()");
+ return -1;
+ }
+ } else {
+ recv_msg_count = res;
}
+
if (recv_msg_count > 0) {
dbuf->dgram_ordered_seq_numbers_is_dirty = 1;
+ dbuf->stats.recv_dgrams += recv_msg_count;
+ if ( recv_msg_count == vlen ) { /* XXX -Wsigncompare hints problems here and above */
+ *info |= DGRAMBUF_RECV_IOVEC_FULL;
+ }
}
/* Check all received messages */
- res = 1;
- for (i=0; i<recv_msg_count; i++) {
+ for (i=0, recv_byte=0; i<recv_msg_count; i++) {
dgram_base = dbuf->iov_recv[i].iov_base;
dgram_index = (dgram_base - dbuf->buf) / dbuf->dgram_max_size;
dgram_len = dbuf->msgs[i].msg_len;
- seq = dbuf->validate_func(dgram_len, dgram_base);
-
- // TODO better feedback
- if ( seq == -1 ) {
- fprintf(stderr, "dgrambuf_recvmmsg(): #%zi end\n", i);
- res = 0;
- } else if ( seq == 0 ) {
- fprintf(stderr, "dgrambuf_recvmmsg(): #%zi invalid (%u)\n", i, seq);
- } else if ( seq < dbuf->dgram_seq_base ) {
- fprintf(stderr, "dgrambuf_recvmmsg(): #%zi past (%u)\n", i, seq);
- } else if ( seq >= dbuf->dgram_seq_base + dbuf->dgram_slots ) {
- fprintf(stderr, "dgrambuf_recvmmsg(): #%zi future (%u)\n", i, seq);
- } else {
- //fprintf(stderr, "dgrambuf_recvmmsg(): #%zi valid (%u)\n", i, seq);
- dbuf->dgram_seq_numbers[dgram_index] = seq;
- dbuf->dgram_ordered_seq_numbers_is_dirty = 1;
- dbuf->dgram_len[dgram_index] = dgram_len;
- dbuf->dgram_free_count--;
+
+ /* dgrambuf_new() adjust iovec_len to prevent overflows on ssize_t*/
+ recv_byte += dgram_len;
+
+ res = dbuf->validate_func(dgram_len, dgram_base, &seq);
+ switch (res) {
+ case 1:
+ if ( seq < dbuf->dgram_seq_base ) {
+ fprintf(stderr, "dgrambuf_recvmmsg(): #%zu past (%u)\n", i, seq);
+ dbuf->stats.dgram_past++;
+ } else if ( seq >= dbuf->dgram_seq_base + dbuf->dgram_slots ) {
+ fprintf(stderr, "dgrambuf_recvmmsg(): #%zu future (%u)\n", i, seq);
+ dbuf->stats.dgram_future++;
+ *info |= DGRAMBUF_RECV_FUTURE_DGRAM;
+ } else {
+ /*fprintf(stderr, "dgrambuf_recvmmsg(): #%zu valid (%u)\n", i, seq);*/
+ *info |= DGRAMBUF_RECV_VALID_DGRAM;
+ dbuf->dgram_seq_numbers[dgram_index] = seq;
+ dbuf->dgram_ordered_seq_numbers_is_dirty = 1;
+ dbuf->dgram_len[dgram_index] = dgram_len;
+ dbuf->dgram_free_count--;
+ }
+ break;
+ case 2:
+ fprintf(stderr, "dgrambuf_recvmmsg(): #%zu finalize (%u)\n", i, seq);
+ dbuf->stats.dgram_end_marker++;
+ dbuf->dgram_seq_last = seq;
+ *info |= DGRAMBUF_RECV_FINALIZE;
+ break;
+ default:
+ fprintf(stderr, "dgrambuf_recvmmsg(): #%zu invalid\n", i);
+ dbuf->stats.dgram_invalid++;
+ break;
}
}
- return res;
+ dbuf->stats.recv_byte += recv_byte;
+
+ return recv_byte;
}
int dgrambuf_have_data_ready_to_write(dgrambuf_t dbuf) {
unsigned int next_dgram_seq;
+
+ /* Last write was partial, so there is more to write */
+ if ( dbuf->partial_write_remaining_bytes > 0 ) {
+ return 1;
+ }
+
/* Buffer is empty, nothing to write */
if ( dbuf->dgram_free_count == dbuf->dgram_slots ) {
return 0;
@@ -144,80 +228,158 @@ int dgrambuf_have_data_ready_to_write(dgrambuf_t dbuf) {
return 1;
}
-ssize_t dgrambuf_write(dgrambuf_t dbuf, int fd) {
+int dgrambuf_have_received_everything(dgrambuf_t dbuf) {
+ if (dbuf) {};
+ return 0; /*FIXME to be implemented*/
+}
+
+ssize_t dgrambuf_write(dgrambuf_t dbuf, int fd, int *info) {
size_t dgram_index, i, vlen;
unsigned int curr_seq, prev_seq, dgram_len;
- ssize_t nwrite, total;
-
- /* Write needs up to date ordered_seq_numbers */
- if ( dbuf->dgram_ordered_seq_numbers_is_dirty ) {
- _update_ordered_seq_numbers(dbuf);
- }
- /* Initialize iovecs for writev, take dgram payloads following the sequence numbers */
- prev_seq=0, total=0;
- for (i=dbuf->dgram_free_count, vlen=0; i < dbuf->dgram_slots && vlen < dbuf->iovec_slots; i++) {
- curr_seq = dbuf->dgram_ordered_seq_numbers[i].value;
-
- /* Skip empty dgram slot */
- if ( curr_seq == 0 ) {
- fprintf(stderr, "Oops : found empty slot (i==%zi)\n", i);
- continue;
- }
- /* Skip if current dgram is a dup of the previous */
- if ( curr_seq == prev_seq ) {
- goto mark_empty;
- }
- /* Skip dgram comming from the past */
- if ( curr_seq < dbuf->dgram_seq_base ) {
- fprintf(stderr, "Oops : found dgram from past in buffer (%u)\n", curr_seq);
- goto mark_empty;
- }
- /* Stop if first dgram to write is not in buffer at all */
- if ( ( vlen==0 ) && (curr_seq != dbuf->dgram_seq_base) ) {
- fprintf(stderr, "Oops : nothing to write, missing %u seq\n", dbuf->dgram_seq_base);
- break;
- }
- /* Stop if current seq dgram is missing */
- if ( ( vlen > 0 ) && (curr_seq > prev_seq+1 ) ) {
- break;
+ ssize_t nwrite, total, remain, len;
+ struct iovec *iov;
+
+ /* FIXME Info ptr is mandatory */
+ *info = 0;
+
+ if ( dbuf->partial_write_remaining_bytes > 0 ) {
+ /* Previous writev() was partial, continue it */
+ iov = dbuf->partial_write_iov;
+ vlen = dbuf->partial_write_remaining_iovcnt;
+ total = dbuf->partial_write_remaining_bytes;
+ } else if ( ! dgrambuf_have_data_ready_to_write(dbuf) ) {
+ return 0; /* XXX Inline code ? */
+ } else {
+ /* Prepare a write batch, buffer state is in dgram_seq_numbers */
+ iov = dbuf->iov_write;
+ vlen = 0;
+ total = 0;
+ /* Write needs up to date ordered_seq_numbers (dgrams could be unsorted or some are lost)*/
+ if ( dbuf->dgram_ordered_seq_numbers_is_dirty ) {
+ _update_ordered_seq_numbers(dbuf);
}
+ /* Initialize iovecs for writev, take dgram payloads following the sequence numbers */
+ prev_seq = 0;
+ for (i = dbuf->dgram_free_count; i < dbuf->dgram_slots && vlen < dbuf->iovec_slots; i++) {
+ curr_seq = dbuf->dgram_ordered_seq_numbers[i].value;
+
+ /* Skip empty dgram slot */
+ if ( curr_seq == 0 ) {
+ fprintf(stderr, "Oops : found empty slot (i==%zu)\n", i);
+ continue;
+ }
+ /* Skip if current dgram is a dup of the previous */
+ if ( curr_seq == prev_seq ) {
+ dbuf->stats.dgram_dup++;
+ goto mark_empty;
+ }
+ /* Skip dgram comming from the past */
+ if ( curr_seq < dbuf->dgram_seq_base ) {
+ fprintf(stderr, "Oops : found dgram from past in buffer (%u)\n", curr_seq);
+ goto mark_empty;
+ }
+ /* Stop if first dgram to write is not in buffer at all */
+ if ( ( vlen==0 ) && (curr_seq != dbuf->dgram_seq_base) ) {
+ fprintf(stderr, "Oops : nothing to write, missing %u seq\n", dbuf->dgram_seq_base);
+ break;
+ }
+ /* Stop if current seq dgram is missing */
+ if ( ( vlen > 0 ) && (curr_seq > prev_seq+1 ) ) {
+ break;
+ }
- /* Normal case : curr_seq is the next dgram to write */
- dgram_index = dbuf->dgram_ordered_seq_numbers[i].index;
- dgram_len = dbuf->dgram_len[dgram_index] - dbuf->dgram_header_size;
+ /* Normal case : curr_seq is the next dgram to write */
+ dgram_index = dbuf->dgram_ordered_seq_numbers[i].index;
+ dgram_len = dbuf->dgram_len[dgram_index] - dbuf->dgram_header_size;
- /* Setup iovecs */
- dbuf->iov_write[vlen].iov_len = dgram_len;
- dbuf->iov_write[vlen].iov_base = dbuf->buf + dgram_index*dbuf->dgram_max_size + dbuf->dgram_header_size;
+ /* Setup iovecs */
+ dbuf->iov_write[vlen].iov_len = dgram_len;
+ dbuf->iov_write[vlen].iov_base = dbuf->buf
+ + dgram_index*dbuf->dgram_max_size + dbuf->dgram_header_size;
- /* Update counters */
- total += dgram_len;
- dbuf->dgram_seq_base = curr_seq + 1;
- prev_seq = curr_seq;
- vlen++;
+ /* Update counters */
+ total += dgram_len;
+ prev_seq = curr_seq;
+ vlen++;
- /* Mark dgram slot about to be written out as empty for next read */
- //XXX These cause harm if writev() is incomplete
+ /* Mark dgram slot about to be written out as empty for next read */
+ /*FIXME These cause harm if writev() is incomplete*/
+ dbuf->dgram_seq_base = curr_seq + 1;
mark_empty:
- /* Mark slot as empty */
- dgram_index = dbuf->dgram_ordered_seq_numbers[i].index;
- dbuf->dgram_seq_numbers[dgram_index] = 0;
- dbuf->dgram_free_count++;
- }
+ /* Mark slot as empty */
+ dgram_index = dbuf->dgram_ordered_seq_numbers[i].index;
+ dbuf->dgram_seq_numbers[dgram_index] = 0;
+ dbuf->dgram_free_count++;
+ }
- /* Nothing valid to write out (but buffer not empty, missing the next dgram) */
- if ( vlen == 0 ) {
- fprintf(stderr, "Oops : nothing to write at all\n");
- return -1;
+ /* Nothing valid to write out (but buffer not empty, missing the next dgram) */
+ if ( vlen == 0 ) {
+ fprintf(stderr, "Oops : nothing to write at all\n");
+ return -2;
+ }
+
+ if ( vlen == dbuf->iovec_slots ) {
+ *info |= DGRAMBUF_WRITE_IOVEC_FULL;
+ }
}
- nwrite = writev(fd, dbuf->iov_write, vlen);
+ nwrite = writev(fd, iov, vlen);
+ dbuf->stats.writev_calls++;
if ( nwrite < 0 ) {
+ /* Treat non fatal errors */
+ if ( errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR) {
+ /* Keeps some state informations for retry */
+ dbuf->partial_write_remaining_bytes = total;
+ dbuf->partial_write_remaining_iovcnt = vlen;
+ dbuf->partial_write_iov = iov;
+ *info |= DGRAMBUF_WRITE_EWOULDBLOCK_OR_EINTR;
+ return 0;
+ }
+ /* Print fatal errors and bail out */
perror("writev()");
- } else if ( nwrite != total ) {
- //FIXME : everything break if there because all non writed data will be overwritted at next read
- // Make a loop here could make dgrambuf_writev() unbounded in run time
- fprintf(stderr, "writev() short\n");
+ return -1;
+ }
+
+ /* XXX Remove me when code is correct */
+ if ( nwrite > total ) {
+ fprintf(stderr, "Fatal bug : nwrite > total\n");
+ return -3;
+ }
+ if ( nwrite > 0 ) {
+ dbuf->stats.write_byte += nwrite;
+ *info |= DGRAMBUF_WRITE_SUCCESS;
+ }
+
+ /* Check if the write was partially done */
+ dbuf->partial_write_remaining_bytes = total - nwrite;
+ if ( dbuf->partial_write_remaining_bytes > 0 ) {
+ *info |= DGRAMBUF_WRITE_PARTIAL;
+ dbuf->stats.write_partial++;
+ /* Find the partially written iov and update it */
+ remain = nwrite;
+ for (i=0; i<vlen; i++) {
+ len = dbuf->iov_write[i].iov_len;
+ if ( remain < len ) {
+ dbuf->partial_write_remaining_iovcnt = vlen - i;
+ dbuf->partial_write_iov = dbuf->iov_write + i;
+
+ dbuf->iov_write[i].iov_base =
+ (uint8_t *) dbuf->iov_write[i].iov_base + remain;
+ dbuf->iov_write[i].iov_len -= remain;
+ break;
+ }
+ remain -= len;
+ }
+ if ( i == vlen ) {
+ /* FIXME : this happens */
+ fprintf(stderr, "Fatal bug, failed to find partial iov after partial write\n");
+ return -3;
+ }
+
+ } else {
+ /* Wipe outdated values for clarity in debug mode (only _bytes is use on branching) */
+ dbuf->partial_write_iov = NULL;
+ dbuf->partial_write_remaining_iovcnt = 0;
}
return nwrite;
@@ -228,21 +390,38 @@ dgrambuf_t dgrambuf_new(size_t dgram_slots, size_t dgram_max_size, size_t dgram_
dgrambuf_t dbuf = calloc(1, sizeof(struct dgrambuf_t));
if (!dbuf) goto fail0;
+ dbuf->validate_func = NULL;
+ /* Implicit with dbuf = calloc(...)
+ memset(&(dbuf->stats), 0, sizeof(struct dgrambuf_stats_t));
+ memset(&(dbuf->sa_sigalrm), 0, sizeof(struct sigaction));
+ */
+ dbuf->sa_sigalrm.sa_handler = _sigalrm_handler;
+
dbuf->dgram_slots = dgram_slots;
dbuf->dgram_free_count = dgram_slots;
dbuf->dgram_max_size = dgram_max_size;
dbuf->dgram_header_size = dgram_header_size;
- dbuf->iovec_slots = MIN(iovec_slots,dgram_slots);
+
+ /* writev() and dgrambuf_recvmmsg accumulates read/write bytes in ssize_t */
+ iovec_slots = min_size_t(iovec_slots, SSIZE_MAX/dgram_max_size);
+ dbuf->iovec_slots = iovec_slots;
+
+ dbuf->msgs = calloc(iovec_slots, sizeof(struct mmsghdr));
+ if (!dbuf->msgs) goto fail1;
dbuf->iov_recv = calloc(iovec_slots, sizeof(struct iovec));
- if (!dbuf->iov_recv) goto fail1;
+ if (!dbuf->iov_recv) goto fail2;
dbuf->iov_write = calloc(iovec_slots, sizeof(struct iovec));
- if (!dbuf->iov_write) goto fail2;
+ if (!dbuf->iov_write) goto fail3;
- dbuf->msgs = calloc(iovec_slots, sizeof(struct mmsghdr));
- if (!dbuf->msgs) goto fail3;
+ /* Implicit with dbuf = calloc(...)
+ dbuf->partial_write_iov = NULL;
+ dbuf->partial_write_remaining_iovcnt = 0;
+ dbuf->partial_write_remaining_bytes = 0;
+ dbuf->dgram_seq_last = 0;
+ */
dbuf->dgram_seq_base = 1;
dbuf->dgram_len = calloc(dgram_slots, sizeof(unsigned int));
if (!dbuf->dgram_len) goto fail4;
@@ -262,9 +441,9 @@ dgrambuf_t dgrambuf_new(size_t dgram_slots, size_t dgram_max_size, size_t dgram_
fail7: free(dbuf->dgram_ordered_seq_numbers);
fail6: free(dbuf->dgram_seq_numbers);
fail5: free(dbuf->dgram_len);
-fail4: free(dbuf->msgs);
-fail3: free(dbuf->iov_write);
-fail2: free(dbuf->iov_recv);
+fail4: free(dbuf->iov_write);
+fail3: free(dbuf->iov_recv);
+fail2: free(dbuf->msgs);
fail1: free(dbuf);
fail0: return NULL;
}
@@ -275,12 +454,17 @@ void dgrambuf_free(dgrambuf_t *dbuf) {
free((*dbuf)->dgram_ordered_seq_numbers);
free((*dbuf)->dgram_seq_numbers);
free((*dbuf)->dgram_len);
- free((*dbuf)->msgs);
free((*dbuf)->iov_write);
free((*dbuf)->iov_recv);
+ free((*dbuf)->msgs);
free(*dbuf);
+ *dbuf = NULL;
}
- *dbuf = NULL;
+}
+
+void _sigalrm_handler(int signum) {
+ /* Nothing to do except interrupting the pending syscall */
+ if (signum) {} /* Avoid compiler warning */
}
int _compare_uint_pair(const void *pa, const void *pb) {
@@ -295,7 +479,7 @@ int _compare_uint_pair(const void *pa, const void *pb) {
}
void _update_ordered_seq_numbers(dgrambuf_t dbuf) {
- ssize_t i;
+ size_t i;
/* Initialize dgram_ordered_seq_numbers from dgram_seq_numbers */
for (i=0; i < dbuf->dgram_slots; i++) {
dbuf->dgram_ordered_seq_numbers[i].index = i;
@@ -303,6 +487,7 @@ void _update_ordered_seq_numbers(dgrambuf_t dbuf) {
}
/* Inplace sorting of dgram_ordered_seq_numbers */
qsort(dbuf->dgram_ordered_seq_numbers, dbuf->dgram_slots, sizeof(struct uint_pair), _compare_uint_pair);
+ dbuf->stats.qsort_calls++;
dbuf->dgram_ordered_seq_numbers_is_dirty = 0;
}
diff --git a/mcastseed/src/dgrambuf.h b/mcastseed/src/dgrambuf.h
index fab7649..f405757 100644
--- a/mcastseed/src/dgrambuf.h
+++ b/mcastseed/src/dgrambuf.h
@@ -7,17 +7,31 @@
*/
#include <stdlib.h> /* size_t */
+#define DGRAMBUF_RECV_OVERWRITE 1 << 1
+#define DGRAMBUF_RECV_EINTR 1 << 2
+#define DGRAMBUF_RECV_IOVEC_FULL 1 << 3
+#define DGRAMBUF_RECV_FINALIZE 1 << 4
+#define DGRAMBUF_RECV_FUTURE_DGRAM 1 << 5
+#define DGRAMBUF_RECV_VALID_DGRAM 1 << 6
+
+#define DGRAMBUF_WRITE_PARTIAL 1 << 1
+#define DGRAMBUF_WRITE_EWOULDBLOCK_OR_EINTR 1 << 2
+#define DGRAMBUF_WRITE_IOVEC_FULL 1 << 3
+#define DGRAMBUF_WRITE_SUCCESS 1 << 4
+
typedef struct dgrambuf_t *dgrambuf_t;
dgrambuf_t dgrambuf_new(size_t dgram_slots, size_t dgram_max_size, size_t dgram_header_size, size_t iovec_slots);
void dgrambuf_free(dgrambuf_t *dbuf);
-size_t dgrambuf_free_count(const dgrambuf_t);
-void dgrambuf_set_validate_func(dgrambuf_t dbuf, unsigned int (*func)(unsigned int, void *) );
+size_t dgrambuf_get_free_count(const dgrambuf_t);
+int dgrambuf_everything_was_received(dgrambuf_t dbuf);
+void dgrambuf_set_validate_func(dgrambuf_t dbuf, int (*validate_func)(unsigned int, void *, unsigned int *));
int dgrambuf_have_data_ready_to_write(dgrambuf_t dbuf);
+int dgrambuf_have_received_everything(dgrambuf_t dbuf);
-
-int dgrambuf_recvmmsg(dgrambuf_t dbuf, int sockfd);
-ssize_t dgrambuf_write(dgrambuf_t dbuf, int fd);
+/* Warning : dgrambuf_recvmmsg sets and restore SIGALRM handler if timeout != 0 */
+ssize_t dgrambuf_recvmmsg(dgrambuf_t dbuf, int sockfd, int timeout, int *info);
+ssize_t dgrambuf_write(dgrambuf_t dbuf, int fd, int *info);
#endif /* DGRAMBUF_H */
diff --git a/mcastseed/src/dgrambuf_test.c b/mcastseed/src/dgrambuf_test.c
index 1b96e3d..6f9ef22 100644
--- a/mcastseed/src/dgrambuf_test.c
+++ b/mcastseed/src/dgrambuf_test.c
@@ -15,13 +15,17 @@ int open_test_socket();
*/
int main() {
- int res=1, sockfd=open_test_socket();
- dgrambuf_t dgb=dgrambuf_new(3, 50);
- while (res > 0) {
- res = dgrambuf_recvmmsg(dgb, sockfd);
+ int res, sockfd, info;
+ dgrambuf_t dgb;
+
+ sockfd = open_test_socket();
+ dgb = dgrambuf_new(3, 50, 8, 8);
+
+ do {
+ res = dgrambuf_recvmmsg(dgb, sockfd, 1, &info);
printf("dgrambuf_recvmmsg() => %i\n", res);
- printf("dgrambuf_free_count => %zi\n", dgrambuf_free_count(dgb));
- }
+ printf("dgrambuf_free_count => %zu\n", dgrambuf_get_free_count(dgb));
+ } while ( res > 0 );
return 0;
}
diff --git a/mcastseed/src/mcastleech.c b/mcastseed/src/mcastleech.c
index cdd0d9c..df069ac 100644
--- a/mcastseed/src/mcastleech.c
+++ b/mcastseed/src/mcastleech.c
@@ -11,13 +11,14 @@
#include <unistd.h> /* close() */
#include <stdio.h> /* fprintf(), stderr */
#include <stdlib.h> /* EXIT_SUCCESS */
+#include <fcntl.h> /* fcntl() */
#include "msock.h"
#include "dgrambuf.h"
#define MTU 1500
#define MULTICAST_RECV_BUF (MTU-20-8)
#define MULTICAST_SO_RCVBUF_WANTED 425984
-//XXX Make it dynamic, with the effective value of so_rcvbuf
+/*XXX Make it dynamic, with the effective value of so_rcvbuf */
#define MAX_IOVEC (MULTICAST_SO_RCVBUF_WANTED / MULTICAST_RECV_BUF)
#define DGRAM_HEADER_SIZE 8
@@ -54,9 +55,10 @@ void usage(char *msg);
void arg_parse(int argc, char* argv[]);
void fsm_trace(int state);
int get_available_mem_kb();
+void set_O_NONBLOCK(int fd, int set);
void dgrambuf_init();
-uint32_t validate_data_dgram(unsigned int nread, void *recvbuf);
-void ack(uint32_t seq);
+int validate_data_dgram(unsigned int nread, void *recvbuf, unsigned int *seq);
+int send_status(int state, int info_r, int info_w);
/* Parts of the "protocol", definitions are after main() */
int wait_hello_and_connect_back();
@@ -72,6 +74,12 @@ int main(int argc, char* argv[]) {
arg_parse(argc, argv);
dgrambuf_init();
+ /*XXX Maybe elsewhere, when popen'ing target program */
+ set_O_NONBLOCK(1, 1);
+
+/* XXX Dummy */
+ fcntl(1, F_SETPIPE_SZ, 4096);
+ fprintf(stderr, "pipe_size==%i\n", fcntl(1, F_GETPIPE_SZ));
/* Finite state machine */
while ( state > 0 ) {
fsm_trace(state);
@@ -85,7 +93,7 @@ int main(int argc, char* argv[]) {
else state = -1;
break;
case 4: state = (finalize_job() == 0)?5:-2; break;
- case 5: state = (is_there_more_job() == 0)?2:0; break;
+ case 5: state = (is_there_more_job() == 0)?2:0; break; /* XXX Should retry recv ? */
}
}
fsm_trace(state);
@@ -143,6 +151,7 @@ int wait_hello_and_connect_back() {
if ( ucast_sock > 0 ) {
close(ucast_sock);
}
+ /* FIXME : ucast_client_socket() use DNS resolver and could block */
ucast_sock = ucast_client_socket(hbuf,port);
if(ucast_sock < 0) {
fprintf(stderr, "Could not setup unicast socket or connect to %s:%s\n", hbuf, port);
@@ -179,30 +188,75 @@ int wait_start_and_start_job() {
return 0;
}
-
+/*
+#define DGRAMBUF_RECV_OVERWRITE 1 << 1
+#define DGRAMBUF_RECV_EINTR 1 << 2
+#define DGRAMBUF_RECV_IOVEC_FULL 1 << 3
+#define DGRAMBUF_RECV_FINALIZE 1 << 4
+#define DGRAMBUF_RECV_VALID_DGRAM 1 << 5
+
+#define DGRAMBUF_WRITE_PARTIAL 1 << 1
+#define DGRAMBUF_WRITE_EWOULDBLOCK_OR_EINTR 1 << 2
+#define DGRAMBUF_WRITE_IOVEC_FULL 1 << 3
+#define DGRAMBUF_WRITE_SUCCESS 1 << 4
+*/
int receive_data() {
- ssize_t nwrite;
- if ( dgrambuf_have_data_ready_to_write(dgrambuf) ) {
- nwrite=dgrambuf_write(dgrambuf, 1);
- fprintf(stderr, "dgrambuf_write => %zi\n", nwrite);
+ int info_r, info_w, res;
+ ssize_t nread, nwrite;
+
+ /* Read (blocking, timeout = 1 sec) */
+ nread = dgrambuf_recvmmsg(dgrambuf, mcast_sock, 1, &info_r);
+ if ( nread < 0 ) {
+ return nread;
+ }
+
+ /* Write (non-blocking) */
+ nwrite = dgrambuf_write(dgrambuf, 1, &info_w);
+ if ( nwrite < 0 ) {
+ return nwrite;
}
- return dgrambuf_recvmmsg(dgrambuf, mcast_sock);
-}
+ fprintf(stderr, "receive_data(): nread == %zi, nwrite == %zi\n", nread, nwrite);
+
+ /* Consider sending status back to seeder */
+ res = send_status(1, info_r, info_w);
+ if ( res < 0 ) {
+ return res;
+ }
+
+ if ( dgrambuf_everything_was_received(dgrambuf) ) {
+ return 0;
+ }
-void ack(uint32_t seq) {
- //TODO
+ return 1;
}
int finalize_job() {
- //XXX Dummy test
- ssize_t res;
- while ( (res=dgrambuf_write(dgrambuf, 1)) > 0 ) {
- fprintf(stderr, "dgrambuf_write => %zi\n", res);
+ ssize_t nwrite;
+ int info_w, res;
+
+ /* Don't eat reources in a pooling fashion, blocking IO is fine when no more recv to do */
+ set_O_NONBLOCK(1, 0);
+
+ /* Flush the whole buffer */
+ do {
+ nwrite = dgrambuf_write(dgrambuf, 1, &info_w);
+ if ( nwrite < 0 ) {
+ return nwrite;
+ }
+ fprintf(stderr, "finalize_job(): nwrite == %zi\n", nwrite);
+ } while ( nwrite > 0);
+
+ /* Inform the seeder that have have finished */
+ res = send_status(2, 0, info_w);
+ if ( res < 0 ) {
+ return res;
}
- return 0;
+
+ return 0;
}
+
int is_there_more_job() {
return 1;
}
@@ -272,6 +326,23 @@ int get_available_mem_kb() {
return 0;
}
+void set_O_NONBLOCK(int fd, int set) {
+ int res, flags;
+
+ flags = fcntl(fd, F_GETFL);
+ if ( flags == -1 ) {
+ perror("fcntl(1, F_GETFL)");
+ }
+ if ( set ) {
+ res = fcntl(fd, F_SETFL, flags | O_NONBLOCK);
+ } else {
+ res = fcntl(fd, F_SETFL, flags & !O_NONBLOCK);
+ }
+ if ( res == -1 ) {
+ perror("fcntl(1, F_SETFL)");
+ }
+}
+
void dgrambuf_init() {
/* Guess dgrambuf size from global free memory */
size_t dgram_count;
@@ -282,8 +353,9 @@ void dgrambuf_init() {
} else {
dgram_count = avail_mem / MULTICAST_RECV_BUF / 2 * 1024;
}
- //XXX Dummy
+ /* XXX Dummy
dgram_count = 5;
+ */
/* Allocate dgrambuf */
dgrambuf = dgrambuf_new(dgram_count, MULTICAST_RECV_BUF, DGRAM_HEADER_SIZE, MAX_IOVEC);
@@ -292,17 +364,28 @@ void dgrambuf_init() {
exit(EXIT_FAILURE);
}
- fprintf(stderr, "dgrambuf_free_count() => %zi\n", dgrambuf_free_count(dgrambuf));
+ fprintf(stderr, "dgrambuf_get_free_count() => %zu\n", dgrambuf_get_free_count(dgrambuf));
dgrambuf_set_validate_func(dgrambuf, validate_data_dgram);
}
-unsigned int validate_data_dgram(unsigned int nread, void *recvbuf ) {
- if ( nread >= DGRAM_HEADER_SIZE && strncmp("data", recvbuf, 4) == 0 ) {
- return ntohl( *( (uint32_t *) recvbuf+1 ) );
+int validate_data_dgram(unsigned int nread, void *recvbuf, unsigned int *seq) {
+
+ if ( nread < DGRAM_HEADER_SIZE ) {
+ return 0;
}
- if ( nread >= 5 && strncmp("final", recvbuf, 5) == 0 ) {
- return -1;
+ if ( strncmp("data", recvbuf, 4) == 0 ) {
+ *seq = ntohl( *( (uint32_t *) recvbuf+1 ) );
+ return 1;
+ }
+ if ( strncmp("end:", recvbuf, 4) == 0 ) {
+ *seq = ntohl( *( (uint32_t *) recvbuf+1 ) );
+ return 2;
}
+ return 0;
+}
+int send_status(int state, int info_r, int info_w) {
+ if ( state && info_r && info_w ) {}
+ /* TODO Implement it */
return 0;
}
diff --git a/mcastseed/src/mcastseed.c b/mcastseed/src/mcastseed.c
index 09cadac..6440fc6 100644
--- a/mcastseed/src/mcastseed.c
+++ b/mcastseed/src/mcastseed.c
@@ -82,31 +82,31 @@ int main(int argc, char *argv[]) {
switch ( state ) {
case 1: res = send_hello(); state = (res==0)?2:-1; break;
case 2: res = accept_pending_clients_or_wait_a_bit();
- if (res==0) state = 2; // Some clients has just come in, try to get more
- else if (res==1) state = 1; // Nothing new. Keep accepting clients after another hello
- else if (res==2) state = 3; // Wanted clients are accepted
- else state = -2;
- break;
+ if (res==0) state = 2; /* Some clients has just come in, try to get more */
+ else if (res==1) state = 1; /* Nothing new. Keep accepting clients after another hello */
+ else if (res==2) state = 3; /* Wanted clients are accepted */
+ else state = -2;
+ break;
case 3: res = start_job();
- if (res==0) state = 3; // Keep trying to convince every client to start
- else if (res==1) state = 4; // All clients have started the job pipe
- else if (res==2) state = 4; // There is dead clients but all alive are ready to go
- else state = -3;
- break;
+ if (res==0) state = 3; /* Keep trying to convince every client to start */
+ else if (res==1) state = 4; /* All clients have started the job pipe */
+ else if (res==2) state = 4; /* There is dead clients but all alive are ready to go */
+ else state = -3;
+ break;
case 4: res = send_data();
- if (res==0) state = 4;
- else if (res==1) state = 5; // All data sent
- else state = -4;
- break;
+ if (res==0) state = 4;
+ else if (res==1) state = 5; /* All data sent */
+ else state = -4;
+ break;
case 5: res = wait_all_finalize_job();
- if (res==0) state = 5;
- else if (res==1) state = 6;
- else state = -5;
+ if (res==0) state = 5;
+ else if (res==1) state = 6;
+ else state = -5;
case 6: res = is_there_more_job();
- if (res==0) state = 0;
- else if (res==1) state = 3;
- else state = -6;
- break;
+ if (res==0) state = 0;
+ else if (res==1) state = 3;
+ else state = -6;
+ break;
}
}
fsm_trace(state);
@@ -144,7 +144,7 @@ int accept_pending_clients_or_wait_a_bit() {
FD_ZERO(&readfds);
FD_ZERO(&exceptfds);
- FD_SET(0,&readfds); // Read from stdin. Will never work as is on Windows, requires threads and so.
+ FD_SET(0,&readfds);
FD_SET(ucast_sock,&readfds);
FD_SET(ucast_sock,&exceptfds);
timeout.tv_sec = 2;
@@ -158,7 +158,7 @@ int accept_pending_clients_or_wait_a_bit() {
if ( res > 0 ) {
if (FD_ISSET(ucast_sock, &readfds)) {
- //TODO : this assumes that the event is an accept() while ones could be send data there
+ /*TODO : this assumes that the event is an accept() while ones could be send data there */
if ( clients_next >= MAX_CLIENTS ) {
fprintf(stderr, "%s\n", "Bouncing client, MAX_CLIENTS reached");
close(accept(ucast_sock, NULL, 0));
@@ -170,13 +170,13 @@ int accept_pending_clients_or_wait_a_bit() {
clients_next++;
}
}
- //TODO : drop this keybord read with accept(), this is not portable
+ /*TODO : drop this keybord read with accept(), this is not portable */
if ( FD_ISSET(0, &readfds)) {
nread = read(0, readbuf, READ_BUF_LEN);
if ( nread <= 0 ) {
fprintf(stderr, "%s\n", "lost stdin");
}
- // User wants to go now
+ /* User wants to go now */
return 2;
}
if (FD_ISSET(ucast_sock, &exceptfds)) {
@@ -185,7 +185,7 @@ int accept_pending_clients_or_wait_a_bit() {
}
}
if (res == 0 ) {
- // nothing happened before timeout
+ /* Nothing happened before timeout */
return 1;
}
return 0;
@@ -243,7 +243,7 @@ int start_job() {
fprintf(stderr, "unexpected data from %i\n", i);
clients[i].state = 2;
} else {
- // Received "ready" ack from client
+ /* Received "ready" ack from client */
clients[i].state = 1;
}
}
@@ -256,7 +256,7 @@ int start_job() {
all_non_dead_ready &= (clients[i].state == 1);
}
}
- // (res == 0 ) nothing happened before timeout
+ /* (res == 0 ) nothing happened before timeout */
if ( all_ready )
return 1;
@@ -270,33 +270,34 @@ void send_fake(char buf[], int paylen, int i) {
*( (uint32_t *) buf+1 ) = htonl(i);
snprintf(buf+29, 5, "%04i", i);
*( (char *) buf+33 ) = ')';
- sendto(mcast_sock, buf, 8+paylen, 0, mcast_addr->ai_addr, mcast_addr->ai_addrlen);
+ sendto(mcast_sock, buf, paylen, 0, mcast_addr->ai_addr, mcast_addr->ai_addrlen);
}
int send_data() {
ssize_t nwrite;
char buf[] = "dataXXXXJe suis à la plage (XXXX).\n";
- int paylen = strlen(buf)-8;
+ int paylen = strlen(buf);
int i;
- //XXX Dummy
+ /* XXX Dummy */
send_fake(buf, paylen, 5);
send_fake(buf, paylen, 4);
-/*
- for (i=6; i<=300; i+=2) {
+ send_fake(buf, paylen, 3);
+
+ for (i=6; i<=300; i+=2) {
send_fake(buf, paylen, i);
}
- for (i=7; i<=300; i+=2) {
+ for (i=7; i<=300; i+=2) {
send_fake(buf, paylen, i);
}
-*/
+
send_fake(buf, paylen, 1);
send_fake(buf, paylen, 1);
send_fake(buf, paylen, 2);
*( (uint32_t *) buf+1 ) = htonl(3);
- buf[22]='m', buf[23]='e', buf[24]='r'; buf[25]='.'; buf[26]='\n'; paylen = 19;
- nwrite = sendto(mcast_sock, buf, 8+paylen, 0, mcast_addr->ai_addr, mcast_addr->ai_addrlen);
+ buf[22]='m', buf[23]='e', buf[24]='r'; buf[25]='.'; buf[26]='\n'; paylen = 27;
+ nwrite = sendto(mcast_sock, buf, paylen, 0, mcast_addr->ai_addr, mcast_addr->ai_addrlen);
if ( nwrite < 0 ) {
perror("sendto() failed");
return -1;
@@ -316,10 +317,11 @@ int wait_all_finalize_job() {
int all_non_dead_done;
int i, res;
SOCKET client_sock;
- const char *payload = "final";
- int paylen = strlen(payload);
+ char buf[] = "end:XXXX";
+ int paylen = strlen(buf);
- nwrite = sendto(mcast_sock, payload, paylen, 0, mcast_addr->ai_addr, mcast_addr->ai_addrlen);
+ *( (uint32_t *) buf+1 ) = htonl(300);
+ nwrite = sendto(mcast_sock, buf, paylen, 0, mcast_addr->ai_addr, mcast_addr->ai_addrlen);
if ( nwrite < 0 ) {
perror("sendto() failed");
return -1;
@@ -360,7 +362,7 @@ int wait_all_finalize_job() {
fprintf(stderr, "unexpected data from %i\n", i);
clients[i].state = 2;
} else {
- // Received "done." ack from client
+ /* Received "done." ack from client */
clients[i].state = 3;
}
}
@@ -372,7 +374,7 @@ int wait_all_finalize_job() {
all_non_dead_done &= (clients[i].state == 3);
}
}
- // (res == 0 ) nothing happened before timeout
+ /* (res == 0 ) nothing happened before timeout */
if ( all_non_dead_done )
return 1;