/*
 * mcastleech.c - Multicast client for huge streams to be piped to other programs (partitions cloning)
 *
 * Copyright 2016 by Ludovic Pouzenc <ludovic@pouzenc.fr>
 *
 * Greatly inspired from examples written by tmouse, July 2005
 * http://cboard.cprogramming.com/showthread.php?t=67469
 */
#define _GNU_SOURCE /* See feature_test_macros(7) */
#include "config.h"

#include <unistd.h> /* close() */
#include <stdio.h> /* fprintf(), stderr */
#include <stdlib.h> /* EXIT_SUCCESS */
#include <string.h> /* strncmp() */
#include <fcntl.h> /* fcntl() */
#include "sockets.h"
#include "dgrambuf.h"

#define MTU 1500
#define MULTICAST_RECV_BUF (MTU-20-8)
#define MULTICAST_SO_RCVBUF_WANTED 425984
#define MAX_IOVEC (MULTICAST_SO_RCVBUF_WANTED / MULTICAST_RECV_BUF)
#define DGRAM_HEADER_SIZE 8

#define DEFAULT_MCAST_IP_STR "ff02::114"
#define DEFAULT_PORT_STR "9000"

/* Cmdline Arguments */
char *prog_name = NULL;
char *mcast_ip = NULL;
char *port = NULL;

/* Sockets as global, used everywhere, even in die() */
int mcast_sock = -1; /* Multicast socket for receiving data */
int ucast_sock = -1; /* Unicast socket for give feedback to server */

/* Buffer used for earch recvfrom() */
char recvbuf[MULTICAST_RECV_BUF];
/* Huge ring buffer to absorb consumer speed variations without loosing datagrams */
dgrambuf_t dgrambuf;

/* Strings to print out representation of various states of the program */
const char * const state_str[] = {
	"start",
	"wait_hello_and_connect_back",
	"wait_start_and_start_job",
	"receive_data",
	"finalize_job",
	"is_there_more_job"
};

/* Some boring funcs you didn't want to read now */
void die(char* msg);
void usage(char *msg);
void arg_parse(int argc, char* argv[]);
void fsm_trace(int state);
int get_available_mem_kb();
void set_O_NONBLOCK(int fd, int set);
void dgrambuf_init();
int validate_data_dgram(unsigned int nread, void *recvbuf, unsigned int *seq);
int send_status(int state, int info_r, int info_w);

/* Parts of the "protocol", definitions are after main() */
int wait_hello_and_connect_back();
int wait_start_and_start_job();
int receive_data();
int finalize_job();
int is_there_more_job();

int main(int argc, char* argv[]) {
	int state = 1; /* state of the "protocol" state machine */
	int res;

	arg_parse(argc, argv);
	dgrambuf_init();

	/*XXX Maybe elsewhere, when popen'ing target program */
	set_O_NONBLOCK(1, 1);

/*	XXX Dummy */
	fcntl(1, F_SETPIPE_SZ, 4096);
	fprintf(stderr, "pipe_size==%i\n", fcntl(1, F_GETPIPE_SZ));
	/* Finite state machine */
	while ( state > 0 ) {
		fsm_trace(state);
		switch ( state ) {
			case 1: state = (wait_hello_and_connect_back() == 0)?2:1; break;
			case 2: state = (wait_start_and_start_job() == 0)?2:3; break;
			case 3:
				res = receive_data();
				if (res==0) state = 4;
				else if (res==1) state=3;
				else state = -1;
				break;
			case 4: state = (finalize_job() == 0)?5:-2; break;
			case 5: state = (is_there_more_job() == 0)?2:0; break; /* XXX Should retry recv ? */
		}
	}
	fsm_trace(state);

	if ( mcast_sock > 0 ) {
		close(mcast_sock);
		mcast_sock = -1;
	}

	dgrambuf_free(&dgrambuf);

	if ( state < 0 ) {
		return -state;
	}

	return EXIT_SUCCESS;
}


int wait_hello_and_connect_back() {
	/* Buffers for host and service strings after resolve */
	char hbuf[NI_MAXHOST], sbuf[NI_MAXSERV];
	/* Server address, filled by system after first recvfrom */
	struct sockaddr_storage peer_addr;
	socklen_t peer_addr_len;
	/* Various needed variables */
	ssize_t nread;
	int res;

	/* Setup mcast_sock */
	if ( mcast_sock > 0 ) {
		close(mcast_sock);
		mcast_sock = -1;
	}
	mcast_sock = mcast_recv_socket(mcast_ip, port, MULTICAST_SO_RCVBUF_WANTED);
	if(mcast_sock < 0) {
		usage("Could not setup multicast socket. Wrong args given ?");
	}

	/* Wait for a single datagram from the server (for sync, no check on contain) */
	peer_addr_len = sizeof(struct sockaddr_storage);
	nread = recvfrom(mcast_sock, recvbuf, MULTICAST_RECV_BUF, 0, (struct sockaddr *) &peer_addr, &peer_addr_len);
 	if (nread < 0 ) {
		perror("recvfrom() failed");
		return -1;
	}
	/* Get peer informations as strings from peer_addr */
	res = getnameinfo((struct sockaddr *) &peer_addr, peer_addr_len,
			 hbuf, NI_MAXHOST, sbuf, NI_MAXSERV, NI_NUMERICSERV);
	if ( res != 0 ) {
		fprintf(stderr, "getnameinfo: %s\n", gai_strerror(res));
		return -2;
	}
	/* Connect back to the server, with reliable unicast */
	if ( ucast_sock > 0 ) {
		close(ucast_sock);
	}
	/* FIXME : ucast_client_socket() use DNS resolver and could block */
	ucast_sock = ucast_client_socket(hbuf,port);
	if(ucast_sock < 0) {
		fprintf(stderr, "Could not setup unicast socket or connect to %s:%s\n", hbuf, port);
		return -3;
	}

	return 0;
}

int wait_start_and_start_job() {
	ssize_t nread, nwrite;

	/* Wait for a "start" datagram from the server */
	nread = recvfrom(mcast_sock, recvbuf, MULTICAST_RECV_BUF, 0, NULL, 0);
 	if (nread < 0 ) {
		perror("recvfrom() failed");
		return -1;
	}
	if ( nread >= 5 && strncmp("start", recvbuf, 5) == 0 ) {
		/* Reply "ready" through unicast stream socket */
		nwrite = write(ucast_sock, "ready", 5);
		if ( nwrite < 0 ) {
			fprintf(stderr, "write() failed\n");
			return -2;
		}
		if (nwrite != 5) {
			fprintf(stderr, "write() short\n");
			return -3;
		}

		return 1;
	}

	return 0;
}

/*
#define DGRAMBUF_RECV_OVERWRITE 1 << 1
#define DGRAMBUF_RECV_EINTR 1 << 2
#define DGRAMBUF_RECV_IOVEC_FULL 1 << 3
#define DGRAMBUF_RECV_FINALIZE 1 << 4
#define DGRAMBUF_RECV_VALID_DGRAM 1 << 5

#define DGRAMBUF_WRITE_PARTIAL 1 << 1
#define DGRAMBUF_WRITE_EWOULDBLOCK_OR_EINTR 1 << 2
#define DGRAMBUF_WRITE_IOVEC_FULL 1 << 3
#define DGRAMBUF_WRITE_SUCCESS 1 << 4
*/
int receive_data() {
	int info_r, info_w, res;
	ssize_t nread, nwrite;
	static int noop_calls_count = 0;

	/* Read (blocking, timeout = 1 sec) */
	nread = dgrambuf_recvmmsg(dgrambuf, mcast_sock, 1, &info_r);
	if ( nread < 0 ) {
		return nread;
	}

	/* Write (non-blocking) */
	nwrite = dgrambuf_write(dgrambuf, 1, &info_w);
	if ( nwrite < 0 ) {
		return nwrite;
	}

	/*fprintf(stderr, "receive_data(): nread == %zi, nwrite == %zi\n", nread, nwrite);*/

	/* XXX Crapy dead state detection */
	if ( nread == 0 /* TEST && nwrite == 0 */ ) {
		if ( noop_calls_count > 10 ) {
			return 0;
		}
		noop_calls_count++;
	} else {
		noop_calls_count = 0;
	}

	/* Consider sending status back to seeder */
	res = send_status(1, info_r, info_w);
	if ( res < 0 ) {
		return res;
	}

	if ( dgrambuf_have_received_everything(dgrambuf) ) {
		return 0;
	}
	return 1;
}


int finalize_job() {
	ssize_t nwrite;
	int info_w, res;
	char *stats;

	/* Don't eat reources in a pooling fashion, blocking IO is fine when no more recv to do */
	set_O_NONBLOCK(1, 0);

	/* Flush the whole buffer */
	do {
		nwrite = dgrambuf_write(dgrambuf, 1, &info_w);
		if ( nwrite < 0 ) {
			return nwrite;
		}
		fprintf(stderr, "finalize_job(): nwrite == %zi\n", nwrite);
	} while ( nwrite > 0);

	/* Inform the seeder that have have finished */
	res = send_status(2, 0, info_w);
	if ( res < 0 ) {
		return res;
	}

	res = dgrambuf_stats(dgrambuf, &stats);
	if ( res != - 1 ) {
		fprintf(stderr, "finalize_job(): dgrambuf_stats : %s\n",stats);
		free(stats);
	}
	return 0;
}

int is_there_more_job() {
	return 1;
}




void die(char* msg) {
	fprintf(stderr, "%s\n", msg);
	if (mcast_sock > 0)
		close(mcast_sock);
	if (ucast_sock > 0)
		close(ucast_sock);
	exit(EXIT_FAILURE);
}

void usage(char *msg) {
	char ubuf[256];
	if ( msg != NULL )
		fprintf(stderr, "%s\n", msg);
	ubuf[0] = '\0';
	snprintf(ubuf, 255, "Usage: %s [port] [mcast_ip]\n", prog_name);
	die(ubuf);
}

void arg_parse(int argc, char* argv[]) {
	prog_name = argv[0];
	if ( argc > 3 )
		usage("Too many arguments");
	port = (argc >= 2)?argv[1]:DEFAULT_PORT_STR;
	mcast_ip = (argc >= 3)?argv[2]:DEFAULT_MCAST_IP_STR;
}

void fsm_trace(int state) {
	static int prev_state = 0;

	if ( state < 0 ) {
		fprintf(stderr, "Abnormal exit condition %i (from %s)\n", state, state_str[prev_state]);
	} else	if ( prev_state != state) {
		if ( state == 0 ) {
			fprintf(stderr, "Normal exit (from %s)\n",  state_str[prev_state]);
		} else {
			fprintf(stderr, "Now in %s (from %s)\n", state_str[state], state_str[prev_state]);
		}
		prev_state = state;
	}
}

int get_available_mem_kb() {
	char key[64];
	int res, value, found=0;
	FILE * fh = fopen("/proc/meminfo", "r");
	if ( fh ) {
		while (!found && !feof(fh)) {
			res = fscanf(fh, "%63s %i kB\n", key, &value);
			if ( res < 0 )
				break;
			found = ( strncmp("MemAvailable:", key, 12) == 0 );
		}
		fclose(fh);
	}

	if ( found && value > 0 ) {
		return value;
	}

	return 0;
}

void set_O_NONBLOCK(int fd, int set) {
	int res, flags;

	flags = fcntl(fd, F_GETFL);
	if ( flags == -1 ) {
		perror("fcntl(1, F_GETFL)");
	}
	if ( set ) {
		res = fcntl(fd, F_SETFL, flags | O_NONBLOCK);
	} else {
		res = fcntl(fd, F_SETFL, flags & !O_NONBLOCK);
	}
	if ( res == -1 ) {
		perror("fcntl(1, F_SETFL)");
	}
}

void dgrambuf_init() {
	/* Guess dgrambuf size from global free memory */
	size_t dgram_count;
	int avail_mem = get_available_mem_kb();

	if ( avail_mem < MULTICAST_SO_RCVBUF_WANTED ) {
		dgram_count = MULTICAST_SO_RCVBUF_WANTED / MULTICAST_RECV_BUF;
	} else {
		dgram_count = avail_mem / MULTICAST_RECV_BUF / 2 * 1024;
	}
	/* XXX Dummy
	dgram_count = 5;
	*/

	/* Allocate dgrambuf */
	dgrambuf = dgrambuf_new(dgram_count, MULTICAST_RECV_BUF, DGRAM_HEADER_SIZE, MAX_IOVEC);
	if ( dgrambuf == NULL ) {
		perror("dgrambuf_new/malloc");
		exit(EXIT_FAILURE);
	}

	fprintf(stderr, "dgrambuf_get_free_count() => %zu\n", dgrambuf_get_free_count(dgrambuf));
	dgrambuf_set_validate_func(dgrambuf, validate_data_dgram);
}

int validate_data_dgram(unsigned int nread, void *recvbuf, unsigned int *seq) {

	if ( nread < DGRAM_HEADER_SIZE ) {
		return 0;
	}
	if ( strncmp("data", recvbuf, 4) == 0 ) {
		*seq = ntohl( *( (uint32_t *) recvbuf+1 ) );
		return 1;
	}
	if ( strncmp("end:", recvbuf, 4) == 0 ) {
		*seq = ntohl( *( (uint32_t *) recvbuf+1 ) );
		return 2;
	}
	return 0;
}

int send_status(int state, int info_r, int info_w) {
	if ( state && info_r && info_w ) {}
	/* TODO Implement it */
	return 0;
}