13 June 2011

Posted by: Duncan
Tags: Python

Actually, the title is a misnomer. It should really be something like "Using psutil, subprocess and threading to implement an asynchronous process control and monitoring solution with Python without throwing your chair out of the window", but that would be far too long.

I was recently tasked with developing a distributed process monitoring system - in Python - instilling that same frisson of excitement that I experienced on building my first asynchronous system trap on a VMS $QIO call (about 100 years ago). Trad SNMP solutions were out, so no Zenoss or Nagios; the system had to be wholly bespoke. It had to control application processes over a very large network and gather and report performance metrics.

There was another catch: the solution could not install any system-wide software, such as RabbitMQ or another AMQP broker; everything had to rely on software already shipped with RHEL or on packages installed via pip in a virtualenv.

My design was based on a Python server that was installed on each node on the network and which attached 'monitors' to each of the non-Python processes that implemented the actual application. A server could talk to other servers via HTTP. Each 'monitor' used the psutil package to collect statistics and to control process state.
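In the snippets that follow, process_list is the pre-defined list mentioned above: one dict per managed executable. Its exact shape here is a reconstruction for illustration, and the command path is a placeholder:

process_list = [
   {'args': ['/usr/local/bin/worker', '--verbose'],   # placeholder command
    'process': None,     # filled in by subprocess.Popen below
    'listener': None},   # filled in once a Listener is attached
]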

Working from that pre-defined list, the server would start each executable in turn:

import subprocess
import sys

for i, process in enumerate(process_list):
   try:
      process_list[i]['process'] = subprocess.Popen(
         process['args'],
         stdout=subprocess.PIPE,
         stderr=subprocess.STDOUT,
         stdin=subprocess.PIPE)
   except OSError as why:
      print str(why)
      sys.exit(1)

subprocess.Popen() executes whatever path is defined in args and returns an object whose properties identify the running process. To actually monitor a process, I wanted to attach something to its standard output (I could then parse that output, look for certain error states, and redirect to a log process). To do this I came up with a simple threaded listener class:

import threading
import time
from collections import deque

class Listener(threading.Thread):
   """
   Listen on a process's standard output and collect lines into a deque,
   cycling the deque at max_lines. Data is available in Listener.data.
   """
   def __init__(self, process):
      threading.Thread.__init__(self)
      self.process = process
      self.line_num = 0
      self.max_lines = 200
      self.done = False
      self.data = deque()
      self._stop_event = threading.Event()

   def __repr__(self):
      return 'Listener(pid=%d)' % self.process.pid

   def run(self):
      """ Run the thread, called by Listener.start() """
      self.line_num = 0
      self.done = False
      while not self.done and not self._stop_event.is_set():
         if self.process is not None:
            # readline() blocks until a line arrives or the pipe closes
            line = self.process.stdout.readline()
            if line:
               # Got some data. Append to the deque; once full, rotate
               # it so the oldest line is overwritten by the newest.
               if self.line_num < self.max_lines:
                  self.data.append(line)
                  self.line_num += 1
               else:
                  self.data.rotate(-1)
                  self.data[self.line_num - 1] = line
            else:
               # An empty read means EOF: the process closed its stdout
               self.done = True
         else:
            # No process attached; snooze a second
            time.sleep(1)

   def stop(self):
      # NB: run() may stay blocked in readline() until the next line or EOF
      self._stop_event.set()
      self.done = True
      
process_list[i]['listener'] = Listener(process_list[i]['process'])
process_list[i]['listener'].start()
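With a listener attached, scanning its captured output for error states might look something like this (the 'ERROR' marker is just a placeholder for whatever states the application actually reports):

listener = process_list[i]['listener']
for line in list(listener.data):   # snapshot the deque before iterating
   if 'ERROR' in line:
      print 'pid %d reported: %s' % (listener.process.pid, line.strip())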

The final puzzle was how to collect the process statistics needed to ensure that a process wasn't going into meltdown (a good test is to monitor the amount of memory it's consuming). The subprocess package couldn't do this, but psutil could.

For a given PID, I simply use the Process class:

import psutil

proc = psutil.Process(process_list[i]['process'].pid)
print proc.get_memory_percent()   # e.g. 0.211289
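A meltdown check can then be as simple as a threshold test; the 20% figure below is an arbitrary placeholder:

MEM_LIMIT_PCT = 20.0   # assumed threshold, tune per application
if proc.get_memory_percent() > MEM_LIMIT_PCT:
   # placeholder response: flag the process for a restart
   print 'pid %d is over the memory limit' % proc.pid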

psutil also provides methods to suspend, resume and kill processes. Killing can be problematic because the target process might catch and ignore some incoming signals. For example:

import signal

proc.send_signal(signal.SIGHUP)   # NB: SIGHUP is signal 1

which might not work if the process is catching and ignoring shutdown signals. The way around this is to escalate to a signal that the process cannot catch or ignore, SIGKILL:

import signal
import time

signum = signal.SIGHUP
secs_elapsed = 0
while proc.is_running():
   if secs_elapsed > 10:
      signum = signal.SIGKILL   # NB: SIGKILL is signal 9; it cannot be caught
   proc.send_signal(signum)
   try:
      proc.wait(timeout=2)
   except psutil.TimeoutExpired:
      time.sleep(1)
   secs_elapsed += 2

This guarantees that the process will die while giving it every opportunity to shut down gracefully if it is catching SIGHUP (or even SIGINT): the loop only escalates to SIGKILL after ten seconds or so.

Helpfully, psutil provides the suspend() and resume() methods, which could be used in conjunction with the status attribute to see whether a process is suspended or not.
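A minimal sketch, assuming a psutil build that exposes the status attribute and the STATUS_STOPPED constant:

proc.suspend()   # sends SIGSTOP on POSIX systems
if proc.status == psutil.STATUS_STOPPED:
   print 'pid %d is suspended' % proc.pid
proc.resume()    # sends SIGCONT to let it continue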

So there it is: a bare-bones process execution, monitoring, output-handling and control application. The last piece of the jigsaw was to include an HTTP listener so that the running monitor processes could receive remote instructions (such as to install new software or restart a process). Again, this was surprisingly easy. I just passed my final process_list into the request handler of a SimpleHTTPServer-based server; remote clients could then request process statistics, or even the captured output of a running process, over HTTP, in addition to issuing command instructions.
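As a rough sketch of that handler (the class name, port, and JSON response shape are all placeholders; serving custom responses means subclassing BaseHTTPRequestHandler rather than reusing SimpleHTTPServer's file-serving handler, and process_list is assumed to be in scope):

import json
import BaseHTTPServer
import psutil

class MonitorHandler(BaseHTTPServer.BaseHTTPRequestHandler):
   """ Placeholder handler: report per-process statistics as JSON """
   def do_GET(self):
      stats = []
      for entry in process_list:
         proc = psutil.Process(entry['process'].pid)
         stats.append({'pid': proc.pid,
                       'memory_percent': proc.get_memory_percent()})
      body = json.dumps(stats)
      self.send_response(200)
      self.send_header('Content-Type', 'application/json')
      self.send_header('Content-Length', str(len(body)))
      self.end_headers()
      self.wfile.write(body)

BaseHTTPServer.HTTPServer(('', 8080), MonitorHandler).serve_forever()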
