#!/bin/env python3
'''
lljobstat command. Read job_stats files, parse and aggregate data of every
job, show top jobs
'''

import argparse
import errno
import subprocess
import sys
import time
import yaml
import signal

try:
    from yaml import CLoader as Loader
except ImportError:
    print("Install libyaml-dev for faster processing", file=sys.stderr)
    from yaml import Loader

# Turn SIGINT into KeyboardInterrupt so that run() can stop cleanly.
signal.signal(signal.SIGINT, signal.default_int_handler)


class ArgParser: # pylint: disable=too-few-public-methods
    '''
    Class to define lljobstat command arguments
    and parse the real command line arguments.
    '''
    def __init__(self):
        # argparse.Namespace filled in by run(); None until then.
        self.args = None

    def run(self):
        '''
        define and parse arguments

        The result is stored in self.args. -o/--ost and -m/--mdt are
        shortcuts that overwrite the value of --param.
        '''
        parser = argparse.ArgumentParser(prog='lljobstat',
                                         description='List top jobs.')
        parser.add_argument('-c', '--count', type=int, default=5,
                            help='the number of top jobs to be listed (default 5).')
        parser.add_argument('-i', '--interval', type=int, default=10,
                            help='the interval in seconds to check job stats again (default 10).')
        parser.add_argument('-n', '--repeats', type=int, default=-1,
                            help='the times to repeat the parsing (default unlimited).')
        parser.add_argument('--param', type=str, default='*.*.job_stats',
                            help='the param path to be checked (default *.*.job_stats).')
        parser.add_argument('-o', '--ost', dest='param', action='store_const',
                            const='obdfilter.*.job_stats',
                            help='check only OST job stats.')
        parser.add_argument('-m', '--mdt', dest='param', action='store_const',
                            const='mdt.*.job_stats',
                            help='check only MDT job stats.')
        parser.add_argument('--fullname', action='store_true', default=False,
                            help='show full operation name (default False).')
        parser.add_argument('--no-fullname', dest='fullname',
                            action='store_false',
                            help='show abbreviated operations name.')

        self.args = parser.parse_args()


class JobStatsParser:
    '''
    Class to get/parse/aggregate/sort/print top jobs in job_stats
    '''
    # Abbreviated name -> full operation name. The full names are the keys
    # looked up in each job_stats entry; the abbreviations are only used
    # for printing. 'ops' is the synthetic per-job total.
    op_keys = {
        'ops': 'ops',
        'cr' : 'create',
        'op' : 'open',
        'cl' : 'close',
        'mn' : 'mknod',
        'ln' : 'link',
        'ul' : 'unlink',
        'mk' : 'mkdir',
        'rm' : 'rmdir',
        'mv' : 'rename',
        'ga' : 'getattr',
        'sa' : 'setattr',
        'gx' : 'getxattr',
        'sx' : 'setxattr',
        'st' : 'statfs',
        'sy' : 'sync',
        'rd' : 'read',
        'wr' : 'write',
        'pu' : 'punch',
        'mi' : 'migrate',
        'fa' : 'fallocate',
        'dt' : 'destroy',
        'gi' : 'get_info',
        'si' : 'set_info',
        'qc' : 'quotactl',
        'pa' : 'prealloc'
    }

    def __init__(self):
        # argparse.Namespace set by run(); None until then.
        self.args = None

    def list_param(self, param_pattern): # pylint: disable=no-self-use
        '''
        list param paths with given param pattern

        Returns the lines printed by 'lctl list_param', or an empty list
        when lctl exits with status ENOENT. Any other lctl failure is
        re-raised as subprocess.CalledProcessError.
        '''
        cmd = ['lctl', 'list_param', param_pattern]
        try:
            output = subprocess.check_output(cmd).decode()
        except subprocess.CalledProcessError as err:
            if err.returncode == errno.ENOENT:
                return []
            # Do not fall through and return None: the caller iterates over
            # the result and would fail with an unrelated TypeError.
            raise
        return output.splitlines()

    def parse_single_job_stats(self, param): # pylint: disable=no-self-use
        '''
        read single job_stats file, parse it and return an object

        Raises yaml.scanner.ScannerError if the content is not valid YAML
        and subprocess.CalledProcessError if lctl fails.
        '''
        cmd = ['lctl', 'get_param', '-n', param]
        out = subprocess.check_output(cmd).decode()
        # A plain YAML scalar cannot start with '@', so a job_id beginning
        # with it is rewritten to begin with '.' before parsing.
        output = out.replace('job_id: @', 'job_id: .')
        try:
            # NOTE: yaml.load with a full Loader is only acceptable because
            # the input comes from the local lctl, not from untrusted data.
            yaml_obj = yaml.load(output, Loader=Loader)  # need several seconds...
        except yaml.scanner.ScannerError:
            # only print the file name here; use stderr so that the YAML
            # stream written to stdout is not corrupted
            print("failed to parse the content of %s" % param, file=sys.stderr)
            raise

        return yaml_obj

    def merge_job(self, jobs, job):
        '''
        merge stats data of job to jobs

        jobs maps job_id to a dict of accumulated sample counts keyed by
        full operation name, plus 'ops' (sum over all operations) and
        'job_id'. Operations with zero samples are not recorded, so a job
        without any samples ends up with neither counters nor 'ops'.
        '''
        job_id = job['job_id']
        merged = jobs.get(job_id, {})

        for key, stat in job.items():
            if key not in self.op_keys.values():
                continue
            samples = stat['samples']
            if samples == 0:
                continue

            merged[key] = merged.get(key, 0) + samples
            merged['ops'] = merged.get('ops', 0) + samples

        merged['job_id'] = job_id
        jobs[job_id] = merged

    def insert_job_sorted(self, top_jobs, count, job): # pylint: disable=no-self-use
        '''
        insert job to top_jobs in descending order by the key job['ops'].
        top_jobs is an array with at most count elements

        A job without an 'ops' key is ranked as having 0 ops. Jobs with
        equal ops keep their insertion order.
        '''
        top_jobs.append(job)
        ops = job.get('ops', 0)

        # Bubble the new job up from the tail past every smaller entry.
        for i in range(len(top_jobs) - 2, -1, -1):
            if ops > top_jobs[i].get('ops', 0):
                top_jobs[i + 1] = top_jobs[i]
                top_jobs[i] = job
            else:
                break

        if len(top_jobs) > count:
            top_jobs.pop()

    def pick_top_jobs(self, jobs, count):
        '''
        choose at most count elements from jobs, put them in an array in
        descending order by the key job['ops'].
        '''
        top_jobs = []
        for job in jobs.values():
            self.insert_job_sorted(top_jobs, count, job)

        return top_jobs

    def print_job(self, job):
        '''
        print single job

        The job is printed as one YAML list item holding a flow mapping of
        the counters present in job, in op_keys order. Operation names are
        abbreviated unless self.args.fullname is set.
        '''
        fields = []
        for key, val in self.op_keys.items():
            if val not in job:
                continue
            opname = val if self.args.fullname else key
            fields.append('%s: %d' % (opname, job[val]))

        # job_id is an int when YAML parsed a purely numeric id, hence str().
        print('- %-16s {%s}' % (str(job['job_id']) + ':', ', '.join(fields)))

    def print_top_jobs(self, top_jobs):
        '''
        print top_jobs in YAML
        '''
        print('---') # mark the beginning of YAML doc in stream
        print("timestamp: %d" % int(time.time()))
        print("top_jobs:")
        for job in top_jobs:
            self.print_job(job)
        print('...') # mark the end of YAML doc in stream

    def run_once(self):
        '''
        scan/parse/aggregate/print top jobs in given job_stats pattern/path(s)
        '''
        jobs = {}
        for param in self.list_param(self.args.param):
            obj = self.parse_single_job_stats(param)
            # Skip files that parse to nothing (None), to a non-mapping, or
            # whose 'job_stats' entry is missing or empty.
            stats = obj.get('job_stats') if isinstance(obj, dict) else None
            if not stats:
                continue

            for job in stats:
                self.merge_job(jobs, job)

        top_jobs = self.pick_top_jobs(jobs, self.args.count)
        self.print_top_jobs(top_jobs)

    def run_once_retry(self):
        '''
        Call run_once. If run_once succeeds, return.
        If run_once throws an exception, retry for few times.

        Only Exception subclasses are retried; KeyboardInterrupt and
        SystemExit propagate at once so that Ctrl-C is not swallowed.
        '''
        for i in range(2, -1, -1):  # 2, 1, 0
            try:
                return self.run_once()
            except Exception: # pylint: disable=broad-except
                if i == 0:
                    raise

    def run(self):
        '''
        run task periodically or for some times with given interval
        '''
        argparser = ArgParser()
        argparser.run()
        self.args = argparser.args

        i = 0
        try:
            while True:
                self.run_once_retry()
                i += 1
                if self.args.repeats != -1 and i >= self.args.repeats:
                    break
                time.sleep(self.args.interval)
        except KeyboardInterrupt:
            print("\nReceived KeyboardInterrupt - stopping")
            sys.exit()


if __name__ == "__main__":
    JobStatsParser().run()