/* * mcastleech.c - Multicast client for huge streams to be piped to other programs (partitions cloning) * * Copyright 2016 by Ludovic Pouzenc * * Greatly inspired from examples written by tmouse, July 2005 * http://cboard.cprogramming.com/showthread.php?t=67469 */ #define _GNU_SOURCE /* See feature_test_macros(7) */ #include "config.h" #include /* close() */ #include /* fprintf(), stderr */ #include /* EXIT_SUCCESS */ #include /* strncmp() */ #include /* fcntl() */ #include "sockets.h" #include "dgrambuf.h" #define MTU 1500 #define MULTICAST_RECV_BUF (MTU-20-8) #define MULTICAST_SO_RCVBUF_WANTED 425984 #define MAX_IOVEC (MULTICAST_SO_RCVBUF_WANTED / MULTICAST_RECV_BUF) #define DGRAM_HEADER_SIZE 8 #define DEFAULT_MCAST_IP_STR "ff02::114" #define DEFAULT_PORT_STR "9000" /* Cmdline Arguments */ char *prog_name = NULL; char *mcast_ip = NULL; char *port = NULL; /* Sockets as global, used everywhere, even in die() */ int mcast_sock = -1; /* Multicast socket for receiving data */ int ucast_sock = -1; /* Unicast socket for give feedback to server */ /* Buffer used for earch recvfrom() */ char recvbuf[MULTICAST_RECV_BUF]; /* Huge ring buffer to absorb consumer speed variations without loosing datagrams */ dgrambuf_t dgrambuf; /* Strings to print out representation of various states of the program */ const char * const state_str[] = { "start", "wait_hello_and_connect_back", "wait_start_and_start_job", "receive_data", "finalize_job", "is_there_more_job" }; /* Some boring funcs you didn't want to read now */ void die(char* msg); void usage(char *msg); void arg_parse(int argc, char* argv[]); void fsm_trace(int state); int get_available_mem_kb(); void set_O_NONBLOCK(int fd, int set); void dgrambuf_init(); int validate_data_dgram(unsigned int nread, void *recvbuf, unsigned int *seq); int send_status(int state, int info_r, int info_w); /* Parts of the "protocol", definitions are after main() */ int wait_hello_and_connect_back(); int wait_start_and_start_job(); int receive_data(); int finalize_job(); int is_there_more_job(); int main(int argc, char* argv[]) { int state = 1; /* state of the "protocol" state machine */ int res; arg_parse(argc, argv); dgrambuf_init(); /*XXX Maybe elsewhere, when popen'ing target program */ set_O_NONBLOCK(1, 1); /* XXX Dummy */ fcntl(1, F_SETPIPE_SZ, 4096); fprintf(stderr, "pipe_size==%i\n", fcntl(1, F_GETPIPE_SZ)); /* Finite state machine */ while ( state > 0 ) { fsm_trace(state); switch ( state ) { case 1: state = (wait_hello_and_connect_back() == 0)?2:1; break; case 2: state = (wait_start_and_start_job() == 0)?2:3; break; case 3: res = receive_data(); if (res==0) state = 4; else if (res==1) state=3; else state = -1; break; case 4: state = (finalize_job() == 0)?5:-2; break; case 5: state = (is_there_more_job() == 0)?2:0; break; /* XXX Should retry recv ? */ } } fsm_trace(state); if ( mcast_sock > 0 ) { close(mcast_sock); mcast_sock = -1; } dgrambuf_free(&dgrambuf); if ( state < 0 ) { return -state; } return EXIT_SUCCESS; } int wait_hello_and_connect_back() { /* Buffers for host and service strings after resolve */ char hbuf[NI_MAXHOST], sbuf[NI_MAXSERV]; /* Server address, filled by system after first recvfrom */ struct sockaddr_storage peer_addr; socklen_t peer_addr_len; /* Various needed variables */ ssize_t nread; int res; /* Setup mcast_sock */ if ( mcast_sock > 0 ) { close(mcast_sock); mcast_sock = -1; } mcast_sock = mcast_recv_socket(mcast_ip, port, MULTICAST_SO_RCVBUF_WANTED); if(mcast_sock < 0) { usage("Could not setup multicast socket. Wrong args given ?"); } /* Wait for a single datagram from the server (for sync, no check on contain) */ peer_addr_len = sizeof(struct sockaddr_storage); nread = recvfrom(mcast_sock, recvbuf, MULTICAST_RECV_BUF, 0, (struct sockaddr *) &peer_addr, &peer_addr_len); if (nread < 0 ) { perror("recvfrom() failed"); return -1; } /* Get peer informations as strings from peer_addr */ res = getnameinfo((struct sockaddr *) &peer_addr, peer_addr_len, hbuf, NI_MAXHOST, sbuf, NI_MAXSERV, NI_NUMERICSERV); if ( res != 0 ) { fprintf(stderr, "getnameinfo: %s\n", gai_strerror(res)); return -2; } /* Connect back to the server, with reliable unicast */ if ( ucast_sock > 0 ) { close(ucast_sock); } /* FIXME : ucast_client_socket() use DNS resolver and could block */ ucast_sock = ucast_client_socket(hbuf,port); if(ucast_sock < 0) { fprintf(stderr, "Could not setup unicast socket or connect to %s:%s\n", hbuf, port); return -3; } return 0; } int wait_start_and_start_job() { ssize_t nread, nwrite; /* Wait for a "start" datagram from the server */ nread = recvfrom(mcast_sock, recvbuf, MULTICAST_RECV_BUF, 0, NULL, 0); if (nread < 0 ) { perror("recvfrom() failed"); return -1; } if ( nread >= 5 && strncmp("start", recvbuf, 5) == 0 ) { /* Reply "ready" through unicast stream socket */ nwrite = write(ucast_sock, "ready", 5); if ( nwrite < 0 ) { fprintf(stderr, "write() failed\n"); return -2; } if (nwrite != 5) { fprintf(stderr, "write() short\n"); return -3; } return 1; } return 0; } /* #define DGRAMBUF_RECV_OVERWRITE 1 << 1 #define DGRAMBUF_RECV_EINTR 1 << 2 #define DGRAMBUF_RECV_IOVEC_FULL 1 << 3 #define DGRAMBUF_RECV_FINALIZE 1 << 4 #define DGRAMBUF_RECV_VALID_DGRAM 1 << 5 #define DGRAMBUF_WRITE_PARTIAL 1 << 1 #define DGRAMBUF_WRITE_EWOULDBLOCK_OR_EINTR 1 << 2 #define DGRAMBUF_WRITE_IOVEC_FULL 1 << 3 #define DGRAMBUF_WRITE_SUCCESS 1 << 4 */ int receive_data() { int info_r, info_w, res; ssize_t nread, nwrite; static int noop_calls_count = 0; /* Read (blocking, timeout = 1 sec) */ nread = dgrambuf_recvmmsg(dgrambuf, mcast_sock, 1, &info_r); if ( nread < 0 ) { return nread; } /* Write (non-blocking) */ nwrite = dgrambuf_write(dgrambuf, 1, &info_w); if ( nwrite < 0 ) { return nwrite; } /*fprintf(stderr, "receive_data(): nread == %zi, nwrite == %zi\n", nread, nwrite);*/ /* XXX Crapy dead state detection */ if ( nread == 0 /* TEST && nwrite == 0 */ ) { if ( noop_calls_count > 10 ) { return 0; } noop_calls_count++; } else { noop_calls_count = 0; } /* Consider sending status back to seeder */ res = send_status(1, info_r, info_w); if ( res < 0 ) { return res; } if ( dgrambuf_have_received_everything(dgrambuf) ) { return 0; } return 1; } int finalize_job() { ssize_t nwrite; int info_w, res; char *stats; /* Don't eat reources in a pooling fashion, blocking IO is fine when no more recv to do */ set_O_NONBLOCK(1, 0); /* Flush the whole buffer */ do { nwrite = dgrambuf_write(dgrambuf, 1, &info_w); if ( nwrite < 0 ) { return nwrite; } fprintf(stderr, "finalize_job(): nwrite == %zi\n", nwrite); } while ( nwrite > 0); /* Inform the seeder that have have finished */ res = send_status(2, 0, info_w); if ( res < 0 ) { return res; } res = dgrambuf_stats(dgrambuf, &stats); if ( res != - 1 ) { fprintf(stderr, "finalize_job(): dgrambuf_stats : %s\n",stats); free(stats); } return 0; } int is_there_more_job() { return 1; } void die(char* msg) { fprintf(stderr, "%s\n", msg); if (mcast_sock > 0) close(mcast_sock); if (ucast_sock > 0) close(ucast_sock); exit(EXIT_FAILURE); } void usage(char *msg) { char ubuf[256]; if ( msg != NULL ) fprintf(stderr, "%s\n", msg); ubuf[0] = '\0'; snprintf(ubuf, 255, "Usage: %s [port] [mcast_ip]\n", prog_name); die(ubuf); } void arg_parse(int argc, char* argv[]) { prog_name = argv[0]; if ( argc > 3 ) usage("Too many arguments"); port = (argc >= 2)?argv[1]:DEFAULT_PORT_STR; mcast_ip = (argc >= 3)?argv[2]:DEFAULT_MCAST_IP_STR; } void fsm_trace(int state) { static int prev_state = 0; if ( state < 0 ) { fprintf(stderr, "Abnormal exit condition %i (from %s)\n", state, state_str[prev_state]); } else if ( prev_state != state) { if ( state == 0 ) { fprintf(stderr, "Normal exit (from %s)\n", state_str[prev_state]); } else { fprintf(stderr, "Now in %s (from %s)\n", state_str[state], state_str[prev_state]); } prev_state = state; } } int get_available_mem_kb() { char key[64]; int res, value, found=0; FILE * fh = fopen("/proc/meminfo", "r"); if ( fh ) { while (!found && !feof(fh)) { res = fscanf(fh, "%63s %i kB\n", key, &value); if ( res < 0 ) break; found = ( strncmp("MemAvailable:", key, 12) == 0 ); } fclose(fh); } if ( found && value > 0 ) { return value; } return 0; } void set_O_NONBLOCK(int fd, int set) { int res, flags; flags = fcntl(fd, F_GETFL); if ( flags == -1 ) { perror("fcntl(1, F_GETFL)"); } if ( set ) { res = fcntl(fd, F_SETFL, flags | O_NONBLOCK); } else { res = fcntl(fd, F_SETFL, flags & !O_NONBLOCK); } if ( res == -1 ) { perror("fcntl(1, F_SETFL)"); } } void dgrambuf_init() { /* Guess dgrambuf size from global free memory */ size_t dgram_count; int avail_mem = get_available_mem_kb(); if ( avail_mem < MULTICAST_SO_RCVBUF_WANTED ) { dgram_count = MULTICAST_SO_RCVBUF_WANTED / MULTICAST_RECV_BUF; } else { dgram_count = avail_mem / MULTICAST_RECV_BUF / 2 * 1024; } /* XXX Dummy dgram_count = 5; */ /* Allocate dgrambuf */ dgrambuf = dgrambuf_new(dgram_count, MULTICAST_RECV_BUF, DGRAM_HEADER_SIZE, MAX_IOVEC); if ( dgrambuf == NULL ) { perror("dgrambuf_new/malloc"); exit(EXIT_FAILURE); } fprintf(stderr, "dgrambuf_get_free_count() => %zu\n", dgrambuf_get_free_count(dgrambuf)); dgrambuf_set_validate_func(dgrambuf, validate_data_dgram); } int validate_data_dgram(unsigned int nread, void *recvbuf, unsigned int *seq) { if ( nread < DGRAM_HEADER_SIZE ) { return 0; } if ( strncmp("data", recvbuf, 4) == 0 ) { *seq = ntohl( *( (uint32_t *) recvbuf+1 ) ); return 1; } if ( strncmp("end:", recvbuf, 4) == 0 ) { *seq = ntohl( *( (uint32_t *) recvbuf+1 ) ); return 2; } return 0; } int send_status(int state, int info_r, int info_w) { if ( state && info_r && info_w ) {} /* TODO Implement it */ return 0; }