summaryrefslogtreecommitdiff
path: root/draft/mcastseed/src/mcastleech.c
diff options
context:
space:
mode:
Diffstat (limited to 'draft/mcastseed/src/mcastleech.c')
-rw-r--r--draft/mcastseed/src/mcastleech.c408
1 files changed, 408 insertions, 0 deletions
diff --git a/draft/mcastseed/src/mcastleech.c b/draft/mcastseed/src/mcastleech.c
new file mode 100644
index 0000000..3345665
--- /dev/null
+++ b/draft/mcastseed/src/mcastleech.c
@@ -0,0 +1,408 @@
+/*
+ * mcastleech.c - Multicast client for huge streams to be piped to other programs (partitions cloning)
+ *
+ * Copyright 2016 by Ludovic Pouzenc <ludovic@pouzenc.fr>
+ *
+ * Greatly inspired from examples written by tmouse, July 2005
+ * http://cboard.cprogramming.com/showthread.php?t=67469
+ */
+#define _GNU_SOURCE /* See feature_test_macros(7) */
+#include "config.h"
+
+#include <unistd.h> /* close() */
+#include <stdio.h> /* fprintf(), stderr */
+#include <stdlib.h> /* EXIT_SUCCESS */
+#include <string.h> /* strncmp() */
+#include <fcntl.h> /* fcntl() */
+#include "sockets.h"
+#include "dgrambuf.h"
+
+#define MTU 1500
+#define MULTICAST_RECV_BUF (MTU-20-8)
+#define MULTICAST_SO_RCVBUF_WANTED 425984
+#define MAX_IOVEC (MULTICAST_SO_RCVBUF_WANTED / MULTICAST_RECV_BUF)
+#define DGRAM_HEADER_SIZE 8
+
+#define DEFAULT_MCAST_IP_STR "ff02::114"
+#define DEFAULT_PORT_STR "9000"
+
+/* Cmdline Arguments */
+char *prog_name = NULL;
+char *mcast_ip = NULL;
+char *port = NULL;
+
+/* Sockets as global, used everywhere, even in die() */
+int mcast_sock = -1; /* Multicast socket for receiving data */
+int ucast_sock = -1; /* Unicast socket for give feedback to server */
+
+/* Buffer used for earch recvfrom() */
+char recvbuf[MULTICAST_RECV_BUF];
+/* Huge ring buffer to absorb consumer speed variations without loosing datagrams */
+dgrambuf_t dgrambuf;
+
+/* Strings to print out representation of various states of the program */
+const char * const state_str[] = {
+ "start",
+ "wait_hello_and_connect_back",
+ "wait_start_and_start_job",
+ "receive_data",
+ "finalize_job",
+ "is_there_more_job"
+};
+
+/* Some boring funcs you didn't want to read now */
+void die(char* msg);
+void usage(char *msg);
+void arg_parse(int argc, char* argv[]);
+void fsm_trace(int state);
+int get_available_mem_kb();
+void set_O_NONBLOCK(int fd, int set);
+void dgrambuf_init();
+int validate_data_dgram(unsigned int nread, void *recvbuf, unsigned int *seq);
+int send_status(int state, int info_r, int info_w);
+
+/* Parts of the "protocol", definitions are after main() */
+int wait_hello_and_connect_back();
+int wait_start_and_start_job();
+int receive_data();
+int finalize_job();
+int is_there_more_job();
+
+int main(int argc, char* argv[]) {
+ int state = 1; /* state of the "protocol" state machine */
+ int res;
+
+ arg_parse(argc, argv);
+ dgrambuf_init();
+
+ /*XXX Maybe elsewhere, when popen'ing target program */
+ set_O_NONBLOCK(1, 1);
+
+/* XXX Dummy */
+ fcntl(1, F_SETPIPE_SZ, 4096);
+ fprintf(stderr, "pipe_size==%i\n", fcntl(1, F_GETPIPE_SZ));
+ /* Finite state machine */
+ while ( state > 0 ) {
+ fsm_trace(state);
+ switch ( state ) {
+ case 1: state = (wait_hello_and_connect_back() == 0)?2:1; break;
+ case 2: state = (wait_start_and_start_job() == 0)?2:3; break;
+ case 3:
+ res = receive_data();
+ if (res==0) state = 4;
+ else if (res==1) state=3;
+ else state = -1;
+ break;
+ case 4: state = (finalize_job() == 0)?5:-2; break;
+ case 5: state = (is_there_more_job() == 0)?2:0; break; /* XXX Should retry recv ? */
+ }
+ }
+ fsm_trace(state);
+
+ if ( mcast_sock > 0 ) {
+ close(mcast_sock);
+ mcast_sock = -1;
+ }
+
+ dgrambuf_free(&dgrambuf);
+
+ if ( state < 0 ) {
+ return -state;
+ }
+
+ return EXIT_SUCCESS;
+}
+
+
+int wait_hello_and_connect_back() {
+ /* Buffers for host and service strings after resolve */
+ char hbuf[NI_MAXHOST], sbuf[NI_MAXSERV];
+ /* Server address, filled by system after first recvfrom */
+ struct sockaddr_storage peer_addr;
+ socklen_t peer_addr_len;
+ /* Various needed variables */
+ ssize_t nread;
+ int res;
+
+ /* Setup mcast_sock */
+ if ( mcast_sock > 0 ) {
+ close(mcast_sock);
+ mcast_sock = -1;
+ }
+ mcast_sock = mcast_recv_socket(mcast_ip, port, MULTICAST_SO_RCVBUF_WANTED);
+ if(mcast_sock < 0) {
+ usage("Could not setup multicast socket. Wrong args given ?");
+ }
+
+ /* Wait for a single datagram from the server (for sync, no check on contain) */
+ peer_addr_len = sizeof(struct sockaddr_storage);
+ nread = recvfrom(mcast_sock, recvbuf, MULTICAST_RECV_BUF, 0, (struct sockaddr *) &peer_addr, &peer_addr_len);
+ if (nread < 0 ) {
+ perror("recvfrom() failed");
+ return -1;
+ }
+ /* Get peer informations as strings from peer_addr */
+ res = getnameinfo((struct sockaddr *) &peer_addr, peer_addr_len,
+ hbuf, NI_MAXHOST, sbuf, NI_MAXSERV, NI_NUMERICSERV);
+ if ( res != 0 ) {
+ fprintf(stderr, "getnameinfo: %s\n", gai_strerror(res));
+ return -2;
+ }
+ /* Connect back to the server, with reliable unicast */
+ if ( ucast_sock > 0 ) {
+ close(ucast_sock);
+ }
+ /* FIXME : ucast_client_socket() use DNS resolver and could block */
+ ucast_sock = ucast_client_socket(hbuf,port);
+ if(ucast_sock < 0) {
+ fprintf(stderr, "Could not setup unicast socket or connect to %s:%s\n", hbuf, port);
+ return -3;
+ }
+
+ return 0;
+}
+
+int wait_start_and_start_job() {
+ ssize_t nread, nwrite;
+
+ /* Wait for a "start" datagram from the server */
+ nread = recvfrom(mcast_sock, recvbuf, MULTICAST_RECV_BUF, 0, NULL, 0);
+ if (nread < 0 ) {
+ perror("recvfrom() failed");
+ return -1;
+ }
+ if ( nread >= 5 && strncmp("start", recvbuf, 5) == 0 ) {
+ /* Reply "ready" through unicast stream socket */
+ nwrite = write(ucast_sock, "ready", 5);
+ if ( nwrite < 0 ) {
+ fprintf(stderr, "write() failed\n");
+ return -2;
+ }
+ if (nwrite != 5) {
+ fprintf(stderr, "write() short\n");
+ return -3;
+ }
+
+ return 1;
+ }
+
+ return 0;
+}
+
+/*
+#define DGRAMBUF_RECV_OVERWRITE 1 << 1
+#define DGRAMBUF_RECV_EINTR 1 << 2
+#define DGRAMBUF_RECV_IOVEC_FULL 1 << 3
+#define DGRAMBUF_RECV_FINALIZE 1 << 4
+#define DGRAMBUF_RECV_VALID_DGRAM 1 << 5
+
+#define DGRAMBUF_WRITE_PARTIAL 1 << 1
+#define DGRAMBUF_WRITE_EWOULDBLOCK_OR_EINTR 1 << 2
+#define DGRAMBUF_WRITE_IOVEC_FULL 1 << 3
+#define DGRAMBUF_WRITE_SUCCESS 1 << 4
+*/
+int receive_data() {
+ int info_r, info_w, res;
+ ssize_t nread, nwrite;
+ static int noop_calls_count = 0;
+
+ /* Read (blocking, timeout = 1 sec) */
+ nread = dgrambuf_recvmmsg(dgrambuf, mcast_sock, 1, &info_r);
+ if ( nread < 0 ) {
+ return nread;
+ }
+
+ /* Write (non-blocking) */
+ nwrite = dgrambuf_write(dgrambuf, 1, &info_w);
+ if ( nwrite < 0 ) {
+ return nwrite;
+ }
+
+ /*fprintf(stderr, "receive_data(): nread == %zi, nwrite == %zi\n", nread, nwrite);*/
+
+ /* XXX Crapy dead state detection */
+ if ( nread == 0 /* TEST && nwrite == 0 */ ) {
+ if ( noop_calls_count > 10 ) {
+ return 0;
+ }
+ noop_calls_count++;
+ } else {
+ noop_calls_count = 0;
+ }
+
+ /* Consider sending status back to seeder */
+ res = send_status(1, info_r, info_w);
+ if ( res < 0 ) {
+ return res;
+ }
+
+ if ( dgrambuf_have_received_everything(dgrambuf) ) {
+ return 0;
+ }
+ return 1;
+}
+
+
+int finalize_job() {
+ ssize_t nwrite;
+ int info_w, res;
+ char *stats;
+
+ /* Don't eat reources in a pooling fashion, blocking IO is fine when no more recv to do */
+ set_O_NONBLOCK(1, 0);
+
+ /* Flush the whole buffer */
+ do {
+ nwrite = dgrambuf_write(dgrambuf, 1, &info_w);
+ if ( nwrite < 0 ) {
+ return nwrite;
+ }
+ fprintf(stderr, "finalize_job(): nwrite == %zi\n", nwrite);
+ } while ( nwrite > 0);
+
+ /* Inform the seeder that have have finished */
+ res = send_status(2, 0, info_w);
+ if ( res < 0 ) {
+ return res;
+ }
+
+ res = dgrambuf_stats(dgrambuf, &stats);
+ if ( res != - 1 ) {
+ fprintf(stderr, "finalize_job(): dgrambuf_stats : %s\n",stats);
+ free(stats);
+ }
+ return 0;
+}
+
+int is_there_more_job() {
+ return 1;
+}
+
+
+
+
+void die(char* msg) {
+ fprintf(stderr, "%s\n", msg);
+ if (mcast_sock > 0)
+ close(mcast_sock);
+ if (ucast_sock > 0)
+ close(ucast_sock);
+ exit(EXIT_FAILURE);
+}
+
+void usage(char *msg) {
+ char ubuf[256];
+ if ( msg != NULL )
+ fprintf(stderr, "%s\n", msg);
+ ubuf[0] = '\0';
+ snprintf(ubuf, 255, "Usage: %s [port] [mcast_ip]\n", prog_name);
+ die(ubuf);
+}
+
+void arg_parse(int argc, char* argv[]) {
+ prog_name = argv[0];
+ if ( argc > 3 )
+ usage("Too many arguments");
+ port = (argc >= 2)?argv[1]:DEFAULT_PORT_STR;
+ mcast_ip = (argc >= 3)?argv[2]:DEFAULT_MCAST_IP_STR;
+}
+
+void fsm_trace(int state) {
+ static int prev_state = 0;
+
+ if ( state < 0 ) {
+ fprintf(stderr, "Abnormal exit condition %i (from %s)\n", state, state_str[prev_state]);
+ } else if ( prev_state != state) {
+ if ( state == 0 ) {
+ fprintf(stderr, "Normal exit (from %s)\n", state_str[prev_state]);
+ } else {
+ fprintf(stderr, "Now in %s (from %s)\n", state_str[state], state_str[prev_state]);
+ }
+ prev_state = state;
+ }
+}
+
+int get_available_mem_kb() {
+ char key[64];
+ int res, value, found=0;
+ FILE * fh = fopen("/proc/meminfo", "r");
+ if ( fh ) {
+ while (!found && !feof(fh)) {
+ res = fscanf(fh, "%63s %i kB\n", key, &value);
+ if ( res < 0 )
+ break;
+ found = ( strncmp("MemAvailable:", key, 12) == 0 );
+ }
+ fclose(fh);
+ }
+
+ if ( found && value > 0 ) {
+ return value;
+ }
+
+ return 0;
+}
+
+void set_O_NONBLOCK(int fd, int set) {
+ int res, flags;
+
+ flags = fcntl(fd, F_GETFL);
+ if ( flags == -1 ) {
+ perror("fcntl(1, F_GETFL)");
+ }
+ if ( set ) {
+ res = fcntl(fd, F_SETFL, flags | O_NONBLOCK);
+ } else {
+ res = fcntl(fd, F_SETFL, flags & !O_NONBLOCK);
+ }
+ if ( res == -1 ) {
+ perror("fcntl(1, F_SETFL)");
+ }
+}
+
+void dgrambuf_init() {
+ /* Guess dgrambuf size from global free memory */
+ size_t dgram_count;
+ int avail_mem = get_available_mem_kb();
+
+ if ( avail_mem < MULTICAST_SO_RCVBUF_WANTED ) {
+ dgram_count = MULTICAST_SO_RCVBUF_WANTED / MULTICAST_RECV_BUF;
+ } else {
+ dgram_count = avail_mem / MULTICAST_RECV_BUF / 2 * 1024;
+ }
+ /* XXX Dummy
+ dgram_count = 5;
+ */
+
+ /* Allocate dgrambuf */
+ dgrambuf = dgrambuf_new(dgram_count, MULTICAST_RECV_BUF, DGRAM_HEADER_SIZE, MAX_IOVEC);
+ if ( dgrambuf == NULL ) {
+ perror("dgrambuf_new/malloc");
+ exit(EXIT_FAILURE);
+ }
+
+ fprintf(stderr, "dgrambuf_get_free_count() => %zu\n", dgrambuf_get_free_count(dgrambuf));
+ dgrambuf_set_validate_func(dgrambuf, validate_data_dgram);
+}
+
+int validate_data_dgram(unsigned int nread, void *recvbuf, unsigned int *seq) {
+
+ if ( nread < DGRAM_HEADER_SIZE ) {
+ return 0;
+ }
+ if ( strncmp("data", recvbuf, 4) == 0 ) {
+ *seq = ntohl( *( (uint32_t *) recvbuf+1 ) );
+ return 1;
+ }
+ if ( strncmp("end:", recvbuf, 4) == 0 ) {
+ *seq = ntohl( *( (uint32_t *) recvbuf+1 ) );
+ return 2;
+ }
+ return 0;
+}
+
+int send_status(int state, int info_r, int info_w) {
+ if ( state && info_r && info_w ) {}
+ /* TODO Implement it */
+ return 0;
+}