#!/usr/bin/env python3
"""Parallel ``du``-like disk usage counter.

Several worker processes share one work queue of directories. Each worker
sums ``st_blocks`` of every entry it scans and pushes subdirectories back onto
the queue. The per-path totals are printed in KiB (``st_blocks`` is summed and
divided by 2, i.e. 512-byte units are assumed).
"""
import argparse
import os
import stat
import sys
from multiprocessing import JoinableQueue, Process, Queue, cpu_count


def _report_error(exc):
    """Print a one-line ``Type: message`` diagnostic to stderr (no traceback)."""
    print("%s: %s" % (type(exc).__name__, exc), file=sys.stderr)


def _scan_directory(path, counter_indice, local_counters, q_in):
    """Add the blocks of every direct child of *path* and enqueue its subdirectories.

    An error on a single entry is reported and the remaining siblings are still
    counted. An error opening or iterating the directory itself is reported and
    the rest of that directory is skipped.
    """
    try:
        with os.scandir(path) as entries:
            for entry in entries:
                try:
                    # st_blocks is the allocated size (du's default mode), not
                    # st_size (which is what --apparent-size would report).
                    local_counters[counter_indice] += entry.stat(follow_symlinks=False).st_blocks
                    # Hand subdirectories back to the shared queue so any idle
                    # worker process can pick them up.
                    if entry.is_dir(follow_symlinks=False):
                        q_in.put((entry.path, counter_indice, None))
                except OSError as exc:
                    _report_error(exc)
    except OSError as exc:
        _report_error(exc)


def proc_du(q_in, counters_count, q_out):
    """Worker loop: consume directories from *q_in* until a 'done' message.

    q_in: joinable queue of ``(path, counter_index, special)`` tuples; a tuple
        whose ``special`` is ``'done'`` stops the worker.
    counters_count: number of top-level paths (size of the counter list).
    q_out: receives this worker's list of per-path block counts on exit.
    """
    # Sizes are accumulated in process-local memory and sent back once at the end.
    local_counters = [0] * counters_count
    try:
        while True:
            path, counter_indice, special = q_in.get()
            if special == 'done':
                break
            try:
                _scan_directory(path, counter_indice, local_counters, q_in)
            finally:
                # Always acknowledge the item, otherwise q_in.join() in the
                # parent would wait forever.
                q_in.task_done()
    except KeyboardInterrupt:
        # Ctrl-C reaches every worker; the parent handles it and terminates us.
        return
    q_out.put(local_counters)


def main(args):
    """Compute and print the disk usage of each path in ``args.path``.

    args.path: sequence of paths to measure.
    args.jobs: number of worker processes (an int; a one-element list is also
        accepted for callers that built ``args`` with the old ``nargs=1`` parser).
    Prints one ``"<KiB>\\t<path>"`` line per path, in argument order.
    """
    paths = list(args.path)
    jobs = args.jobs
    process_count = jobs[0] if isinstance(jobs, (list, tuple)) else jobs
    counters_count = len(paths)
    main_counters = [0] * counters_count

    q_in = JoinableQueue()
    q_out = Queue()
    for index, raw_path in enumerate(paths):
        root = os.path.realpath(raw_path)
        try:
            root_stat = os.lstat(root)
        except OSError as exc:
            _report_error(exc)
            continue
        # Count the argument itself (as du does); only directories are walked,
        # so a regular-file argument reports its own size instead of an error.
        main_counters[index] += root_stat.st_blocks
        if stat.S_ISDIR(root_stat.st_mode):
            q_in.put((root, index, None))

    process_list = [
        Process(target=proc_du, args=(q_in, counters_count, q_out))
        for _ in range(process_count)
    ]
    try:
        for process in process_list:
            process.start()
        # Wait until every queued directory (including ones added by workers)
        # has been acknowledged, i.e. the whole tree has been walked.
        q_in.join()
        # One 'done' message per worker; each worker consumes exactly one.
        for _ in process_list:
            q_in.put((None, None, 'done'))
        # Collect and sum each worker's partial counters.
        for _ in process_list:
            local_counters = q_out.get()
            for index, value in enumerate(local_counters):
                main_counters[index] += value
        for process in process_list:
            process.join()
        # st_blocks are summed in 512-byte units; halve to display KiB.
        for index, path in enumerate(paths):
            print("%i\t%s" % (main_counters[index] // 2, path))
    except KeyboardInterrupt:
        pass
    finally:
        # Forcibly kill any worker still running (only after errors/interrupts).
        for process in process_list:
            if process.is_alive():
                process.terminate()


def check_positive(value):
    """argparse ``type=`` callback: parse *value* as a strictly positive int."""
    try:
        ivalue = int(value)
        if ivalue <= 0:
            raise ValueError()
    except ValueError:
        raise argparse.ArgumentTypeError("%s is an invalid positive int value" % value) from None
    return ivalue


def build_parser():
    """Build the command-line parser (``-j/--jobs N`` and zero or more paths)."""
    parser = argparse.ArgumentParser()
    # No nargs=1 here: it would make args.jobs a list when given but a bare
    # int when defaulted.
    parser.add_argument('-j', '--jobs', type=check_positive, default=cpu_count(),
                        metavar='N',
                        help='Use N parallel process to stat() the filesystem')
    # The default must be a list, otherwise main() iterates the characters
    # of the current directory's name.
    parser.add_argument('path', nargs='*', default=[os.getcwd()])
    return parser


if __name__ == "__main__":
    main(build_parser().parse_args())