Source code for lump.keypress

from multiprocessing import Process
from textwrap import dedent
from collections import namedtuple
import sys
import termios
import tty
import logging

# Module-level logger named after this module.
logger = logging.getLogger(__name__)
# Record describing one registered option: the input string that triggers it,
# its human-readable description, and the callback to invoke.
Option = namedtuple('Option', ('input', 'description', 'callback'))

# Reference to sys.stdout captured at import time; InteractiveTerminalHandler
# writes its echoed characters through it.
stdout = sys.stdout


class Options:
    """
    Registry mapping input strings to callbacks.

    Each entry is stored as an ``Option`` record keyed by its input string.
    Unless disabled, a ``help`` entry that prints all registered options is
    added on construction.
    """

    class NO_ACTION:
        """Sentinel returned by ``run`` when the input is not registered."""

    def __init__(self, auto_include_help=None):
        """
        :param auto_include_help: register the built-in help option when
            ``None`` (the default) or truthy; skip it when falsy.
        """
        self._options = {}
        self.help_title = 'Available Options'
        self._init_(auto_include_help)

    def _init_(self, auto_include_help=None):
        """Hook called at the end of ``__init__``; registers ``help``."""
        if auto_include_help is None or auto_include_help:
            self.register('help', self.print_help, self._help_description())

    def _help_description(self):
        """Return the description shown for the built-in help option."""
        # __doc__ is None when docstrings are stripped (python -OO) or when a
        # subclass overrides print_help without one; dedent(None) would raise.
        doc = (self.print_help.__doc__
               or 'provides a help message of possible options')
        return dedent(doc).strip()

    def print_help(self):
        """
        provides a help message of possible options
        """
        # NOTE: the docstring above doubles as the description displayed for
        # the built-in help option, so rewording it changes visible output.
        print(self.help_title)
        print('=' * len(self.help_title))
        print()
        print('\n'.join(
            '\t'.join((option.input, option.description))
            for option in self._options.values()
        ))
        print()
        # Warn when nothing besides the help entry itself is registered. A
        # plain size check is wrong when the help entry was not auto-included.
        only_help = all(
            option.callback == self.print_help
            for option in self._options.values()
        )
        if only_help:
            logger.warning('No options currently registered')

    def register(self, value, callback, description):
        """
        Associate ``callback`` with the input ``value``.

        :param value: non-empty string that triggers the callback
        :param callback: callable invoked by ``run``
        :param description: text shown next to ``value`` in the help output
        :raises ValueError: if ``value`` is not a non-empty string or is
            already registered
        """
        if not isinstance(value, str) or not value:
            raise ValueError("The value needs to be a string with length > 0")
        if value in self._options:
            raise ValueError(
                "An action for %r has already been defined" % (value,))
        self._options[value] = Option(value, description, callback)

    def run(self, value, *args):
        """
        Invoke the callback registered for ``value`` with ``*args``.

        :return: the callback's return value, or ``NO_ACTION`` (after logging
            a warning) when ``value`` is not registered
        """
        if value not in self._options:
            logger.warning('Unknown command %r, possible commands: %s',
                           value, ', '.join(self._options))
            return self.NO_ACTION
        return self._options[value].callback(*args)
class CharOptions(Options):
    """
    Options whose inputs are exactly one character long.

    The built-in help entry is registered under ``?`` instead of ``help``.
    """

    def _init_(self, auto_include_help=None):
        """Register the help option under ``?`` unless it was disabled."""
        if auto_include_help is None or auto_include_help:
            # __doc__ is None when docstrings are stripped (python -OO);
            # dedent(None) would raise, so fall back to a fixed description.
            doc = (self.print_help.__doc__
                   or 'provides a help message of possible options')
            self.register('?', self.print_help, dedent(doc).strip())

    def _process(self, char):
        """Run the option registered for ``char``."""
        self.run(char)

    def register(self, value, callback, description):
        """
        Associate ``callback`` with the single character ``value``.

        :raises ValueError: if ``value`` is not a string of length 1, or the
            parent class rejects it
        """
        # Check the type before the length: len() raises TypeError for
        # unsized values (e.g. an int), which would hide the ValueError.
        if not isinstance(value, str) or len(value) != 1:
            raise ValueError("Expected character to be a string of length 1")
        super().register(value, callback, description)
class ComplexOptions(Options):
    """
    Options variant intended to support commands that take arguments.

    Running a command is not implemented yet.
    """

    def run(self, value, *args):
        """Not implemented; always raises ``NotImplementedError``."""
        raise NotImplementedError
class InteractiveTerminalHandler:
    """
    A handler that reads commands from stdin in a separate process, and
    handles any actions that may be associated with it.

    Characters are echoed and buffered until a newline is read; the buffered
    text is then passed to ``self.options.run``.
    """

    def __init__(self):
        self.process = None
        self._init_()

    def _init_(self):
        """Hook called by ``__init__`` to create the option registry."""
        self.options = Options()
        self._buffer = ''

    def _process(self, char):
        """Echo ``char`` and update the command buffer, running on newline."""
        if char == '\x7f':
            # rudimentary backspace support: drop the last character and
            # re-print what is left of the buffer on a new line
            self._buffer = self._buffer[:-1]
            stdout.write('\n')
            stdout.write(self._buffer)
            stdout.flush()
            return

        stdout.write(char)
        stdout.flush()
        if char == '\n':
            self.options.run(self._buffer)
            self._buffer = ''
        else:
            self._buffer += char

    def register(self, *args, **kwargs):
        """Register an option; arguments are forwarded to the registry."""
        return self.options.register(*args, **kwargs)

    def start(self):
        """
        Start the process for handling keypresses, this should be the last
        thing called as it will block further script execution...
        """
        self.process = Process(target=self.listen)
        self.process.start()
        self.process.join()

    def listen(self):
        """
        Read characters from stdin and process them until input ends.

        Returns on ``KeyboardInterrupt``, ``EOFError``, ``SystemExit``, end of
        input, or when stdin turns out not to be a terminal. Any other
        exception is logged and the loop continues.
        """
        # Re-open file descriptor 0 -- presumably because the child process
        # does not inherit a usable sys.stdin; confirm against multiprocessing.
        sys.stdin = open(0)
        while True:
            try:
                char = self._get_char()
                if not char:
                    # read(1) returns '' at end of input instead of raising
                    # EOFError; without this check the loop spins forever.
                    return
                self._process(char)
            except (KeyboardInterrupt, EOFError, SystemExit):
                return
            except Exception as e:
                logger.exception(e)
                if isinstance(e, termios.error) and not sys.stdin.isatty():
                    # Terminal attributes can never be read from a non-tty,
                    # so retrying would only log the same error endlessly.
                    return

    @staticmethod
    def _get_char():
        """
        Read a character from stdin

        :return: Character
        :rtype: str
        """
        fd = sys.stdin.fileno()
        old_settings = termios.tcgetattr(fd)
        try:
            # cbreak mode delivers each key immediately, without line buffering
            tty.setcbreak(fd)
            ch = sys.stdin.read(1)
        finally:
            # always restore the terminal settings saved above
            termios.tcsetattr(fd, termios.TCSADRAIN, old_settings)
        return ch
class KeyPressHandler(InteractiveTerminalHandler):
    """
    Terminal handler that acts on individual keypresses.

    Keys are read from stdin in a separate process and each one is looked up
    directly in a ``CharOptions`` registry; nothing is echoed or buffered.
    """

    def _init_(self):
        """Create the single-character option registry."""
        self.options = CharOptions()

    def register(self, *args, **kwargs):
        """Register a keypress action; arguments go to the registry."""
        registry = self.options
        return registry.register(*args, **kwargs)

    def _process(self, char):
        """Run the action registered for the pressed key."""
        registry = self.options
        registry.run(char)