From b88c7acdf87743ac7ec8c5e86156dbde030cb860 Mon Sep 17 00:00:00 2001 From: Ludovic Pouzenc Date: Sun, 2 Dec 2018 20:36:47 +0100 Subject: Inital import. No options supported except -j. Parallel "du -sk --apparent-size" --- pidu | 80 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 80 insertions(+) create mode 100755 pidu (limited to 'pidu') diff --git a/pidu b/pidu new file mode 100755 index 0000000..eef6ca3 --- /dev/null +++ b/pidu @@ -0,0 +1,80 @@ +#!/usr/bin/env python3 +import sys, os, argparse +from multiprocessing import Process, JoinableQueue, Queue, cpu_count + +# Forked process main proc +def proc_du(q_in,counters_count, q_out): + # count size in local process memory + local_counters = [0] * counters_count + # consume messages from q_in until special 'done' message + while True: + (path, counter_indice, special) = q_in.get() + if special == 'done': + break + try: + for it in os.scandir(path): + # Sum up file or folder physicial size (in blocks) to local_counters + # Note : this is --apparent-size mode of du, not default mode + local_counters[counter_indice] += it.stat(follow_symlinks=False).st_blocks + # Put more work on queue (could be taken by an other process) + if it.is_dir(follow_symlinks=False): + q_in.put( (it.path, counter_indice, None) ) + except: + # Don't display big traceback on file permission problems and so + (exception_type, exception, traceback) = sys.exc_info() + print("%s: %s" % (exception_type.__name__, exception), file=sys.stderr) + finally: + # Go next file or folder even if current can't be treated + q_in.task_done() + # After receiving special 'done' message, put results back into q_out + q_out.put(local_counters) + +def main(args): + q_in = JoinableQueue() + q_out = Queue() + + counters_count = len(args.path) + main_counters = [0] * counters_count + + for i in range(counters_count): + q_in.put( (os.path.realpath(args.path[i]), i, None) ) + + process_count = args.jobs[0] + process_list = [ Process(target=proc_du, args=(q_in,counters_count, q_out)) for j in range(0, process_count) ] + + try: + # Start all process + list(map(lambda p: p.start(), process_list)) + # Wait until q_in is fully consumed (all tree walked) + q_in.join() + # Inject one special message for each process (each of them will eat only one) + list(map(lambda p: q_in.put( (None, None, 'done') ), process_list)) + # Consume results and sum them + for j in range(0, process_count): + local_counters = q_out.get() + for i in range(0, counters_count): + main_counters[i] += local_counters[i] + # Display counters in KiB + for i in range(0, counters_count): + print("%i\t%s"%(main_counters[i]/2, args.path[i])) + except KeyboardInterrupt: + pass + finally: + # Forcibly kill all processes (should not be usefull in normal conditions) + list(map(lambda p: p.terminate(), process_list)) + +def check_positive(value): + try: + ivalue = int(value) + if ivalue <= 0: + raise ValueError() + except ValueError: + raise argparse.ArgumentTypeError("%s is an invalid positive int value" % value) + return ivalue + +if __name__ == "__main__": + parser = argparse.ArgumentParser() + parser.add_argument('-j', '--jobs', nargs=1, type=check_positive, default=cpu_count(), help='Use N parallel process to stat() the filesystem' ) + parser.add_argument('path', nargs='*', default=os.getcwd()) + args = parser.parse_args() + main(args) -- cgit v1.2.3