summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorLudovic Pouzenc <ludovic@pouzenc.fr>2016-07-03 10:46:30 +0200
committerLudovic Pouzenc <ludovic@pouzenc.fr>2016-07-03 10:46:30 +0200
commit4e05e2ffe67e922980dd9efda6790ccdfcda6ac4 (patch)
tree3939788d2c674981d894b24e7335c445d7f97d24
parent0545a7e105633763507c24cc45ac03942fb271b3 (diff)
downloadeficast-4e05e2ffe67e922980dd9efda6790ccdfcda6ac4.tar.gz
eficast-4e05e2ffe67e922980dd9efda6790ccdfcda6ac4.tar.bz2
eficast-4e05e2ffe67e922980dd9efda6790ccdfcda6ac4.zip
Refactor, keep tracing on stderr, corrections for iovec size and dup dgram handling.
-rw-r--r--.gitignore2
-rw-r--r--mcastseed/src/dgrambuf.c169
-rw-r--r--mcastseed/src/dgrambuf.h3
-rw-r--r--mcastseed/src/mcastleech.c63
-rw-r--r--mcastseed/src/mcastseed.c94
-rw-r--r--mcastseed/src/msock.c2
6 files changed, 207 insertions, 126 deletions
diff --git a/.gitignore b/.gitignore
index 2919f2e..9eaae60 100644
--- a/.gitignore
+++ b/.gitignore
@@ -24,3 +24,5 @@ mcastseed/src/mcastleech
mcastseed/src/Makefile
mcastseed/src/Makefile.in
mcastseed/src/*.o
+mcastseed/src/gmon.out
+mcastseed/src/gmon.txt
diff --git a/mcastseed/src/dgrambuf.c b/mcastseed/src/dgrambuf.c
index b07ba1f..b19b698 100644
--- a/mcastseed/src/dgrambuf.c
+++ b/mcastseed/src/dgrambuf.c
@@ -12,6 +12,7 @@
#include <stdio.h> /* perror() */
#include <string.h> /* memset() */
#include <sys/uio.h> /* writev() */
+#include <sys/param.h> /* MIN() */
struct uint_pair {
unsigned int index;
@@ -19,17 +20,19 @@ struct uint_pair {
};
struct dgrambuf_t {
- size_t dgram_count;
+ size_t dgram_slots;
+ size_t dgram_free_count;
size_t dgram_max_size;
size_t dgram_header_size;
- struct iovec *recv_iovecs;
- struct iovec *write_iovecs;
+ size_t iovec_slots;
+ struct iovec *iov_recv;
+ struct iovec *iov_write;
struct mmsghdr *msgs;
- unsigned int win_base;
- unsigned int *dgram_seq_numbers; /* Stores the decoded datagram sequence number for each dgram slot of buf */
+ unsigned int dgram_seq_base;
unsigned int *dgram_len;
+ unsigned int *dgram_seq_numbers; /* Stores the decoded datagram sequence number for each dgram slot of buf */
struct uint_pair *dgram_ordered_seq_numbers;
void *buf;
@@ -44,33 +47,40 @@ void dgrambuf_set_validate_func(dgrambuf_t dbuf, unsigned int (*func)(unsigned i
dbuf->validate_func = func;
}
+size_t dgrambuf_free_count(const dgrambuf_t dbuf) {
+ return dbuf->dgram_free_count;
+}
+
int dgrambuf_recvmmsg(dgrambuf_t dbuf, int sockfd) {
void *dgram_base;
size_t vlen, i, dgram_index;
int recv_msg_count, res;
unsigned int seq, dgram_len;
- if ( !dbuf->validate_func ) {
+ /* Buffer is full, can't receive */
+ if ( dbuf->dgram_free_count == 0 ) {
return -1;
}
+ /* Validate function is mandatory */
+ if ( !dbuf->validate_func ) {
+ return -2;
+ }
+
/* Initialize recvmmsg() syscall arguments */
- for (i=0, vlen=0; i < dbuf->dgram_count; i++) {
+ for (i=0, vlen=0; i < dbuf->dgram_slots; i++) {
if ( dbuf->dgram_seq_numbers[i] == 0 ) {
- dbuf->recv_iovecs[vlen].iov_base = dbuf->buf + i*dbuf->dgram_max_size;
- dbuf->recv_iovecs[vlen].iov_len = dbuf->dgram_max_size;
+ dbuf->iov_recv[vlen].iov_base = dbuf->buf + i*dbuf->dgram_max_size;
+ dbuf->iov_recv[vlen].iov_len = dbuf->dgram_max_size;
memset(dbuf->msgs + vlen, 0, sizeof(struct mmsghdr));
- dbuf->msgs[vlen].msg_hdr.msg_iov = dbuf->recv_iovecs + vlen;
+ dbuf->msgs[vlen].msg_hdr.msg_iov = dbuf->iov_recv + vlen;
dbuf->msgs[vlen].msg_hdr.msg_iovlen = 1;
vlen++;
+ if ( vlen == dbuf->iovec_slots )
+ break;
}
}
- /* Buffer is full, can't receive */
- if ( vlen==0 ) {
- return -2;
- }
-
/* Do the syscall */
recv_msg_count = recvmmsg(sockfd, dbuf->msgs, vlen, MSG_WAITFORONE, NULL);
if (recv_msg_count < 0) {
@@ -81,28 +91,29 @@ int dgrambuf_recvmmsg(dgrambuf_t dbuf, int sockfd) {
/* Check all received messages */
res = 1;
for (i=0; i<recv_msg_count; i++) {
- dgram_base = dbuf->recv_iovecs[i].iov_base;
+ dgram_base = dbuf->iov_recv[i].iov_base;
dgram_index = (dgram_base - dbuf->buf) / dbuf->dgram_max_size;
dgram_len = dbuf->msgs[i].msg_len;
seq = dbuf->validate_func(dgram_len, dgram_base);
// TODO better feedback
if ( seq == 0 ) {
- printf("#%zi invalid (%u)\n", i, seq);
+ fprintf(stderr, "dgrambuf_recvmmsg(): #%zi invalid (%u)\n", i, seq);
dbuf->dgram_seq_numbers[dgram_index] = 0;
} else if ( seq == -1 ) {
- printf("#%zi end\n", i);
+ fprintf(stderr, "dgrambuf_recvmmsg(): #%zi end\n", i);
dbuf->dgram_seq_numbers[dgram_index] = 0;
res = 0;
- } else if ( seq < dbuf->win_base ) {
- printf("#%zi past (%u)\n", i, seq);
+ } else if ( seq < dbuf->dgram_seq_base ) {
+ fprintf(stderr, "dgrambuf_recvmmsg(): #%zi past (%u)\n", i, seq);
dbuf->dgram_seq_numbers[dgram_index] = 0;
- } else if ( seq >= dbuf->win_base + dbuf->dgram_count ) {
- printf("#%zi future (%u)\n", i, seq);
+ } else if ( seq >= dbuf->dgram_seq_base + dbuf->dgram_slots ) {
+ fprintf(stderr, "dgrambuf_recvmmsg(): #%zi future (%u)\n", i, seq);
dbuf->dgram_seq_numbers[dgram_index] = 0;
} else {
- printf("#%zi valid (%u)\n", i, seq);
+ //fprintf(stderr, "dgrambuf_recvmmsg(): #%zi valid (%u)\n", i, seq);
dbuf->dgram_seq_numbers[dgram_index] = seq;
dbuf->dgram_len[dgram_index] = dgram_len;
+ dbuf->dgram_free_count--;
}
}
@@ -114,40 +125,52 @@ ssize_t dgrambuf_write(dgrambuf_t dbuf, int fd) {
unsigned int curr_seq, prev_seq, dgram_len;
ssize_t nwrite, total;
+ /* Buffer is empty, nothing to write */
+ if ( dbuf->dgram_free_count == dbuf->dgram_slots ) {
+ return -1;
+ }
+
/* Initialize dgram_ordered_seq_numbers from dgram_seq_numbers */
- for (i=0; i < dbuf->dgram_count; i++) {
+ for (i=0; i < dbuf->dgram_slots; i++) {
dbuf->dgram_ordered_seq_numbers[i].index = i;
dbuf->dgram_ordered_seq_numbers[i].value = dbuf->dgram_seq_numbers[i];
}
/* Inplace sorting of dgram_ordered_seq_numbers */
- qsort(dbuf->dgram_ordered_seq_numbers, dbuf->dgram_count, sizeof(struct uint_pair), _compare_uint_pair);
+ qsort(dbuf->dgram_ordered_seq_numbers, dbuf->dgram_slots, sizeof(struct uint_pair), _compare_uint_pair);
/* Initialize iovecs for writev, take dgram payloads following the sequence numbers */
- for (prev_seq=0, vlen=0, total=0, i=0; i< dbuf->dgram_count; i++) {
+ prev_seq=0, vlen=0, total=0;
+ for (i=dbuf->dgram_free_count; i < dbuf->dgram_slots; i++) {
curr_seq = dbuf->dgram_ordered_seq_numbers[i].value;
/* Skip empty dgram slot */
- if ( curr_seq == 0 )
+ if ( curr_seq == 0 ) {
+ fprintf(stderr, "Oops : found empty slot (i==%zi)\n", i);
+ continue;
+ }
+
+ /* Skip if current dgram is a dup of the previous */
+ if ( curr_seq == prev_seq ) {
+ dgram_index = dbuf->dgram_ordered_seq_numbers[i].index;
+ /* Mark slot as empty */
+ dbuf->dgram_seq_numbers[dgram_index] = 0;
+ dbuf->dgram_free_count++;
continue;
+ }
/* Skip dgram comming from the past */
- if ( curr_seq < dbuf->win_base ) {
+ if ( curr_seq < dbuf->dgram_seq_base ) {
fprintf(stderr, "Oops : found dgram from past in buffer (%u)\n", curr_seq);
continue;
}
- /* Break if first dgram to write is not in buffer at all */
- if ( ( vlen==0 ) && (curr_seq != dbuf->win_base) ) {
- fprintf(stderr, "Oops : nothing to write, missing %u seq\n", dbuf->win_base);
+ /* Stop if first dgram to write is not in buffer at all */
+ if ( ( vlen==0 ) && (curr_seq != dbuf->dgram_seq_base) ) {
+ fprintf(stderr, "Oops : nothing to write, missing %u seq\n", dbuf->dgram_seq_base);
break;
}
- /* Skip if next dgram is a dup */
- if ( ( vlen > 0 ) && (curr_seq == prev_seq) ) {
- continue;
- }
-
- /* Break if next seq dgram is missing */
+ /* Stop if current seq dgram is missing */
if ( ( vlen > 0 ) && (curr_seq > prev_seq+1 ) ) {
break;
}
@@ -156,87 +179,91 @@ ssize_t dgrambuf_write(dgrambuf_t dbuf, int fd) {
dgram_index = dbuf->dgram_ordered_seq_numbers[i].index;
dgram_len = dbuf->dgram_len[dgram_index] - dbuf->dgram_header_size;
- dbuf->write_iovecs[vlen].iov_len = dgram_len; /* Setup iovecs */
- dbuf->write_iovecs[vlen].iov_base = dbuf->buf + dgram_index*dbuf->dgram_max_size + dbuf->dgram_header_size;
- dbuf->dgram_seq_numbers[dgram_index] = 0; /* Mark dgram slots about to be written out as reusable */
+ dbuf->iov_write[vlen].iov_len = dgram_len; /* Setup iovecs */
+ dbuf->iov_write[vlen].iov_base = dbuf->buf + dgram_index*dbuf->dgram_max_size + dbuf->dgram_header_size;
+ dbuf->dgram_seq_numbers[dgram_index] = 0; /* Mark dgram slots about to be written out as empty for next read */
total += dgram_len; /* Update counters */
- vlen++;
- dbuf->win_base = curr_seq;
+ dbuf->dgram_free_count++;
+ dbuf->dgram_seq_base = curr_seq + 1;
prev_seq = curr_seq;
+ vlen++;
+ /* Don't plan to write more than iovec_slots slots */
+ if ( vlen == dbuf->iovec_slots )
+ break;
}
- /* If nothing valid to write out */
+ /* Nothing valid to write out (but buffer not empty, missing the next dgram) */
if ( vlen == 0 ) {
return -1;
}
- nwrite = writev(fd, dbuf->write_iovecs, vlen);
+ nwrite = writev(fd, dbuf->iov_write, vlen);
if ( nwrite < 0 ) {
perror("writev()");
- return nwrite;
- }
-
- if ( nwrite != total ) {
+ } else if ( nwrite != total ) {
+ //FIXME : everything break if there because all non writed data will be overwritted at next read
+ // Make a loop here could make dgrambuf_writev() unbounded in run time
fprintf(stderr, "writev() short\n");
- return nwrite;
}
return nwrite;
}
-dgrambuf_t dgrambuf_new(size_t dgram_count, size_t dgram_max_size, size_t dgram_header_size) {
+dgrambuf_t dgrambuf_new(size_t dgram_slots, size_t dgram_max_size, size_t dgram_header_size, size_t iovec_slots) {
dgrambuf_t dbuf = calloc(1, sizeof(struct dgrambuf_t));
if (!dbuf) goto fail0;
- dbuf->dgram_count = dgram_count;
+ dbuf->dgram_slots = dgram_slots;
+ dbuf->dgram_free_count = dgram_slots;
dbuf->dgram_max_size = dgram_max_size;
dbuf->dgram_header_size = dgram_header_size;
+ dbuf->iovec_slots = MIN(iovec_slots,dgram_slots);
- dbuf->recv_iovecs = calloc(dgram_count, sizeof(struct iovec));
- if (!dbuf->recv_iovecs) goto fail1;
+ dbuf->iov_recv = calloc(iovec_slots, sizeof(struct iovec));
+ if (!dbuf->iov_recv) goto fail1;
- dbuf->write_iovecs = calloc(dgram_count, sizeof(struct iovec));
- if (!dbuf->write_iovecs) goto fail2;
+ dbuf->iov_write = calloc(iovec_slots, sizeof(struct iovec));
+ if (!dbuf->iov_write) goto fail2;
- dbuf->msgs = calloc(dgram_count, sizeof(struct mmsghdr));
+ dbuf->msgs = calloc(iovec_slots, sizeof(struct mmsghdr));
if (!dbuf->msgs) goto fail3;
- dbuf->win_base = 1;
- dbuf->dgram_seq_numbers = calloc(dgram_count, sizeof(unsigned int));
- if (!dbuf->dgram_seq_numbers) goto fail4;
+ dbuf->dgram_seq_base = 1;
+ dbuf->dgram_len = calloc(dgram_slots, sizeof(unsigned int));
+ if (!dbuf->dgram_len) goto fail4;
- dbuf->dgram_len = calloc(dgram_count, sizeof(ssize_t));
- if (!dbuf->dgram_len) goto fail5;
+ dbuf->dgram_seq_numbers = calloc(dgram_slots, sizeof(unsigned int));
+ if (!dbuf->dgram_seq_numbers) goto fail5;
- dbuf->dgram_ordered_seq_numbers = calloc(dgram_count, sizeof(struct uint_pair));
+ dbuf->dgram_ordered_seq_numbers = calloc(dgram_slots, sizeof(struct uint_pair));
if (!dbuf->dgram_ordered_seq_numbers) goto fail6;
- dbuf->buf = calloc(dgram_count, dgram_max_size);
+ dbuf->buf = calloc(dgram_slots, dgram_max_size);
if (!dbuf->buf) goto fail7;
return dbuf;
fail7: free(dbuf->dgram_ordered_seq_numbers);
-fail6: free(dbuf->dgram_len);
-fail5: free(dbuf->dgram_seq_numbers);
+fail6: free(dbuf->dgram_seq_numbers);
+fail5: free(dbuf->dgram_len);
fail4: free(dbuf->msgs);
-fail3: free(dbuf->write_iovecs);
-fail2: free(dbuf->recv_iovecs);
+fail3: free(dbuf->iov_write);
+fail2: free(dbuf->iov_recv);
fail1: free(dbuf);
-fail0: return 0;
+fail0: return NULL;
}
void dgrambuf_free(dgrambuf_t *dbuf) {
if (dbuf && *dbuf) {
free((*dbuf)->buf);
free((*dbuf)->dgram_ordered_seq_numbers);
- free((*dbuf)->dgram_len);
free((*dbuf)->dgram_seq_numbers);
+ free((*dbuf)->dgram_len);
free((*dbuf)->msgs);
- free((*dbuf)->write_iovecs);
- free((*dbuf)->recv_iovecs);
+ free((*dbuf)->iov_write);
+ free((*dbuf)->iov_recv);
free(*dbuf);
}
*dbuf = NULL;
diff --git a/mcastseed/src/dgrambuf.h b/mcastseed/src/dgrambuf.h
index b74625d..3a94eee 100644
--- a/mcastseed/src/dgrambuf.h
+++ b/mcastseed/src/dgrambuf.h
@@ -9,9 +9,10 @@
typedef struct dgrambuf_t *dgrambuf_t;
-dgrambuf_t dgrambuf_new(size_t dgram_count, size_t dgram_max_size, size_t dgram_header_size);
+dgrambuf_t dgrambuf_new(size_t dgram_slots, size_t dgram_max_size, size_t dgram_header_size, size_t iovec_slots);
void dgrambuf_free(dgrambuf_t *dbuf);
+size_t dgrambuf_free_count(const dgrambuf_t);
void dgrambuf_set_validate_func(dgrambuf_t dbuf, unsigned int (*func)(unsigned int, void *) );
diff --git a/mcastseed/src/mcastleech.c b/mcastseed/src/mcastleech.c
index 9315992..6760451 100644
--- a/mcastseed/src/mcastleech.c
+++ b/mcastseed/src/mcastleech.c
@@ -1,23 +1,26 @@
-/* client.c
+/*
+ * mcastleech.c - Multicast client for huge streams to be piped to other programs (partitions cloning)
+ *
+ * Copyright 2016 by Ludovic Pouzenc <ludovic@pouzenc.fr>
+ *
* Greatly inspired from examples written by tmouse, July 2005
* http://cboard.cprogramming.com/showthread.php?t=67469
- * Modified to run multi-platform by Christian Beier <dontmind@freeshell.org>.
*/
+#define _GNU_SOURCE /* See feature_test_macros(7) */
-#ifndef __MINGW32__
-#include <unistd.h>
-#endif
-#include <stdio.h>
-#include <stdlib.h>
-#include <string.h>
-#include <time.h>
+#include <unistd.h> /* close() */
+#include <stdio.h> /* fprintf(), stderr */
+#include <stdlib.h> /* EXIT_SUCCESS */
#include "msock.h"
#include "dgrambuf.h"
#define MTU 1500
#define MULTICAST_RECV_BUF (MTU-20-8)
-#define MULTICAST_SO_RCVBUF 425984
+#define MULTICAST_SO_RCVBUF_WANTED 425984
+//XXX Make it dynamic, with the effective value of so_rcvbuf
+#define MAX_IOVEC (MULTICAST_SO_RCVBUF_WANTED / MULTICAST_RECV_BUF)
#define DGRAM_HEADER_SIZE 8
+
#define DEFAULT_MCAST_IP_STR "ff02::114"
#define DEFAULT_PORT_STR "9000"
@@ -37,7 +40,7 @@ dgrambuf_t dgrambuf;
/* Strings to print out representation of various states of the program */
const char * const state_str[] = {
- "exiting",
+ "start",
"wait_hello_and_connect_back",
"wait_start_and_start_job",
"receive_data",
@@ -49,6 +52,7 @@ const char * const state_str[] = {
void die(char* msg);
void usage(char *msg);
void arg_parse(int argc, char* argv[]);
+void fsm_trace(int state);
int get_available_mem_kb();
void dgrambuf_init();
uint32_t validate_data_dgram(unsigned int nread, void *recvbuf);
@@ -61,7 +65,6 @@ int receive_data();
int finalize_job();
int is_there_more_job();
-
int main(int argc, char* argv[]) {
int state = 1; /* state of the "protocol" state machine */
int res;
@@ -71,7 +74,7 @@ int main(int argc, char* argv[]) {
/* Finite state machine */
while ( state > 0 ) {
- fprintf(stderr, "Now in %s state\n", state_str[state]);
+ fsm_trace(state);
switch ( state ) {
case 1: state = (wait_hello_and_connect_back() == 0)?2:1; break;
case 2: state = (wait_start_and_start_job() == 0)?2:3; break;
@@ -84,6 +87,7 @@ int main(int argc, char* argv[]) {
case 5: state = (is_there_more_job() == 0)?2:0; break;
}
}
+ fsm_trace(state);
if ( mcast_sock > 0 ) {
close(mcast_sock);
@@ -115,7 +119,7 @@ int wait_hello_and_connect_back() {
close(mcast_sock);
mcast_sock = (SOCKET) -1;
}
- mcast_sock = mcast_recv_socket(mcast_ip, port, MULTICAST_SO_RCVBUF);
+ mcast_sock = mcast_recv_socket(mcast_ip, port, MULTICAST_SO_RCVBUF_WANTED);
if(mcast_sock < 0) {
usage("Could not setup multicast socket. Wrong args given ?");
}
@@ -187,7 +191,10 @@ void ack(uint32_t seq) {
int finalize_job() {
//XXX Dummy test
- dgrambuf_write(dgrambuf, 2);
+ ssize_t res;
+ while ( (res=dgrambuf_write(dgrambuf, 1)) > 0 ) {
+ fprintf(stderr, "dgrambuf_write => %zi\n", res);
+ }
return 0;
}
int is_there_more_job() {
@@ -223,6 +230,21 @@ void arg_parse(int argc, char* argv[]) {
mcast_ip = (argc >= 3)?argv[2]:DEFAULT_MCAST_IP_STR;
}
+void fsm_trace(int state) {
+ static int prev_state = 0;
+
+ if ( state < 0 ) {
+ fprintf(stderr, "Abnormal exit condition %i (from %s)", state, state_str[prev_state]);
+ } else if ( prev_state != state) {
+ if ( state == 0 ) {
+ fprintf(stderr, "Normal exit (from %s)\n", state_str[prev_state]);
+ } else {
+ fprintf(stderr, "Now in %s (from %s)\n", state_str[state], state_str[prev_state]);
+ }
+ prev_state = state;
+ }
+}
+
int get_available_mem_kb() {
char key[64];
int res, value, found=0;
@@ -249,23 +271,22 @@ void dgrambuf_init() {
size_t dgram_count;
int avail_mem = get_available_mem_kb();
- if ( avail_mem < MULTICAST_SO_RCVBUF ) {
- dgram_count = MULTICAST_SO_RCVBUF / MULTICAST_RECV_BUF;
+ if ( avail_mem < MULTICAST_SO_RCVBUF_WANTED ) {
+ dgram_count = MULTICAST_SO_RCVBUF_WANTED / MULTICAST_RECV_BUF;
} else {
dgram_count = avail_mem / MULTICAST_RECV_BUF / 2 * 1024;
}
//XXX Dummy
- dgram_count = 3;
- fprintf(stderr, "avail_mem == %i kb, dgram_count == %zi\n", avail_mem, dgram_count);
+ //dgram_count = 3;
/* Allocate dgrambuf */
- dgrambuf = dgrambuf_new(dgram_count, MULTICAST_RECV_BUF, DGRAM_HEADER_SIZE);
+ dgrambuf = dgrambuf_new(dgram_count, MULTICAST_RECV_BUF, DGRAM_HEADER_SIZE, MAX_IOVEC);
if ( dgrambuf == NULL ) {
perror("dgrambuf_new/malloc");
exit(EXIT_FAILURE);
}
- //printf("dgrambuf_free_count() => %zi\n", dgrambuf_free_count(dgrambuf));
+ fprintf(stderr, "dgrambuf_free_count() => %zi\n", dgrambuf_free_count(dgrambuf));
dgrambuf_set_validate_func(dgrambuf, validate_data_dgram);
}
diff --git a/mcastseed/src/mcastseed.c b/mcastseed/src/mcastseed.c
index f86af84..276ed92 100644
--- a/mcastseed/src/mcastseed.c
+++ b/mcastseed/src/mcastseed.c
@@ -1,19 +1,22 @@
-/* server.c
+/*
+ * mcastseed.c - Multicast sender for huge streams to be piped to other programs (partitions cloning)
+ *
+ * Copyright 2016 by Ludovic Pouzenc <ludovic@pouzenc.fr>
+ *
* Greatly inspired from examples written by tmouse, July 2005
* http://cboard.cprogramming.com/showthread.php?t=67469
- * Modified to run multi-platform by Christian Beier <dontmind@freeshell.org>.
*/
+#define _GNU_SOURCE /* See feature_test_macros(7) */
-#ifndef __MINGW32__
-#include <unistd.h> /* for usleep() */
-#endif
-#include <stdio.h>
-#include <stdlib.h>
+#include <unistd.h> /* close() */
+#include <stdio.h> /* fprintf(), stderr */
+#include <stdlib.h> /* atoi(), EXIT_SUCCESS */
#include "msock.h"
#define READ_BUF_LEN 256
#define MAX_PENDING_CONNECTIONS 256
#define MAX_CLIENTS 256
+
#define DEFAULT_MCAST_IP_STR "ff02::114"
#define DEFAULT_PORT_STR "9000"
#define DEFAULT_MCAST_TTL 1
@@ -42,7 +45,7 @@ char readbuf[READ_BUF_LEN];
/* Strings to print out representation of various states of the program */
const char * const state_str[] = {
- "exiting",
+ "start",
"send_hello",
"accept_pending_clients_or_wait_a_bit",
"start_job",
@@ -55,8 +58,9 @@ const char * const state_str[] = {
void die(char* msg);
void usage(char *msg);
void arg_parse(int argc, char* argv[]);
-void unsetup_sockets();
+void fsm_trace(int state);
void setup_sockets();
+void unsetup_sockets();
/* Parts of the "protocol", definitions are after main() */
int send_hello();
@@ -74,7 +78,7 @@ int main(int argc, char *argv[]) {
/* Finite state machine */
while ( state > 0 ) {
- fprintf(stderr, "Now in %s state\n", state_str[state]);
+ fsm_trace(state);
switch ( state ) {
case 1: res = send_hello(); state = (res==0)?2:-1; break;
case 2: res = accept_pending_clients_or_wait_a_bit();
@@ -105,6 +109,7 @@ int main(int argc, char *argv[]) {
break;
}
}
+ fsm_trace(state);
unsetup_sockets();
@@ -261,24 +266,34 @@ int start_job() {
return 0;
}
+void send_fake(char buf[], int paylen, int i) {
+ *( (uint32_t *) buf+1 ) = htonl(i);
+ snprintf(buf+29, 5, "%04i", i);
+ *( (char *) buf+33 ) = ')';
+ sendto(mcast_sock, buf, 8+paylen, 0, mcast_addr->ai_addr, mcast_addr->ai_addrlen);
+}
int send_data() {
ssize_t nwrite;
- char buf[] = "dataXXXXJe suis à la plage.";
+ char buf[] = "dataXXXXJe suis à la plage (XXXX).\n";
int paylen = strlen(buf)-8;
- int seq = 1;
+ int i;
//XXX Dummy
- *( (uint32_t *) buf+1 ) = htonl(3);
- sendto(mcast_sock, buf, 8+paylen, 0, mcast_addr->ai_addr, mcast_addr->ai_addrlen);
- *( (uint32_t *) buf+1 ) = htonl(4);
- sendto(mcast_sock, buf, 8+paylen, 0, mcast_addr->ai_addr, mcast_addr->ai_addrlen);
- *( (uint32_t *) buf+1 ) = htonl(2);
- sendto(mcast_sock, buf, 8+paylen, 0, mcast_addr->ai_addr, mcast_addr->ai_addrlen);
-
-
- *( (uint32_t *) buf+1 ) = htonl(seq);
+ send_fake(buf, paylen, 5);
+ send_fake(buf, paylen, 4);
+ for (i=6; i<=300; i+=2) {
+ send_fake(buf, paylen, i);
+ }
+ for (i=7; i<=300; i+=2) {
+ send_fake(buf, paylen, i);
+ }
+ send_fake(buf, paylen, 1);
+ send_fake(buf, paylen, 1);
+ send_fake(buf, paylen, 2);
+ *( (uint32_t *) buf+1 ) = htonl(3);
+ buf[22]='m', buf[23]='e', buf[24]='r'; buf[25]='.'; buf[26]='\n'; paylen = 19;
nwrite = sendto(mcast_sock, buf, 8+paylen, 0, mcast_addr->ai_addr, mcast_addr->ai_addrlen);
if ( nwrite < 0 ) {
perror("sendto() failed");
@@ -398,19 +413,18 @@ void arg_parse(int argc, char* argv[]) {
mcast_ttl = 1;
}
-void unsetup_sockets() {
- if ( ucast_sock > 0 ) {
- close(ucast_sock);
- ucast_sock = 0;
- }
+void fsm_trace(int state) {
+ static int prev_state = 0;
- if ( mcast_sock > 0 ) {
- close(mcast_sock);
- mcast_sock = 0;
- if ( mcast_addr ) {
- freeaddrinfo(mcast_addr);
- mcast_addr = 0;
+ if ( state < 0 ) {
+ fprintf(stderr, "Abnormal exit condition %i (from %s)", state, state_str[prev_state]);
+ } else if ( prev_state != state) {
+ if ( state == 0 ) {
+ fprintf(stderr, "Normal exit (from %s)\n", state_str[prev_state]);
+ } else {
+ fprintf(stderr, "Now in %s (from %s)\n", state_str[state], state_str[prev_state]);
}
+ prev_state = state;
}
}
@@ -426,3 +440,19 @@ void setup_sockets() {
usage("Could not setup multicast socket. Wrong args given ?");
}
+void unsetup_sockets() {
+ if ( ucast_sock > 0 ) {
+ close(ucast_sock);
+ ucast_sock = 0;
+ }
+
+ if ( mcast_sock > 0 ) {
+ close(mcast_sock);
+ mcast_sock = 0;
+ if ( mcast_addr ) {
+ freeaddrinfo(mcast_addr);
+ mcast_addr = 0;
+ }
+ }
+}
+
diff --git a/mcastseed/src/msock.c b/mcastseed/src/msock.c
index 8274710..e5df8d6 100644
--- a/mcastseed/src/msock.c
+++ b/mcastseed/src/msock.c
@@ -178,7 +178,7 @@ SOCKET mcast_recv_socket(char* multicastIP, char* multicastPort, int multicastRe
perror("getsockopt");
goto error;
}
- printf("tried to set socket receive buffer from %d to %d, got %d\n",
+ fprintf(stderr, "tried to set socket receive buffer from %d to %d, got %d\n",
dfltrcvbuf, multicastRecvBufSize, optval);