Source code for opentea.plugin

#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""plugin.py

TODO

Created Nov 2016 by COOP team
"""

from __future__ import absolute_import

import os
import sys
import shlex
import logging
import subprocess
import json
import warnings

from os.path import dirname, abspath, isfile, join
from .exceptions import OTTooManyNodesException, OTInterrupt
from . import PathTools
from . import __version__

__all__ = ["Plugin"]


[docs]class Plugin(PathTools): """Base class for all plugins""" def __init__(self, dataset, base_dir, typ, **kwargs): PathTools.__init__(self, base_dir) self.ds = dataset self.dataset = dataset self.type = typ self.dir2send = [] self._dir2send_curdir = "" self.name = self.__class__.__name__ self.nbcores = 0 self.enable_sav = False self.action = None self.exec_directory = None self.appli = None self.options = "" self.supported_tools = [] self.supported_codes = [] self.command_exe = None self.machine = "" self.login = "" self.ssh_setup = "" self.full_host = "" self.distant_directory = "" self.log = logging.getLogger(__name__) self.read_json = False self.__dict__.update(**kwargs) self._check_version() def _check_version(self): """Check plugin version for force update when OpenTEA does""" # Backwards compat: 2.1 didn't check version # If undeclared in plugin, 2.1 is assumed if not hasattr(self, "version"): self.version = "2.1" ot_version = ".".join(__version__.split('.')[:2]) assert all((ot_version >= self.version, ot_version.split()[0] == self.version.split()[0])), ( "The version detected for your plugin is {slf}, but " "the OpenTEA version is {ot}. Either you have not " "declared your version in the plugin or it is " "incompatible.".format( slf=self.version, ot=ot_version))
[docs] def check_support(self): """Check that appli is indeed supported by plugin""" warnings.warn("check_support is deprecated from version 2.2", DeprecationWarning) if self.appli not in self.supported_tools + self.supported_codes: raise OTInterrupt("Application {0} is neither in " "supported tools nor codes".format(self.appli)) self.log.debug("Plugin {0} : Running execute_action" " {1} in {2} ({3})".format(self.name, self.action, self.exec_directory, self.appli))
@property def env(self): """Environment variables. If self.read_json, read ~/.env.json file generated by the $OPENTEA_HOME/tools/ensure_env.py tool to ensure the environment of the shell is sent to the subprocess. """ env = dict(os.environ) if self.read_json: env_file = env['HOME'] + '/.env.json' assert isfile(env_file), ( "This plugin expects to find a {} file.\n".format(env_file) + "Please create it using the " "$OPENTEA_HOME/tools/ensure_env.py tool") env.update(dict((k, v.encode('utf8')) for k, v in json.load(open(env_file)).items())) return env
[docs] def execute_local(self, command=None, from_dir=None): """Execute the script file command (if applicable)," " otherwise command, locally""" if command is None: command = self.command_exe if from_dir is None: from_dir = self.exec_directory start_dir = os.getcwd() if from_dir is not None: os.chdir(self.abspath(from_dir)) script_dir = dirname(abspath(sys.modules['__main__'].__file__)) if isfile(join(script_dir, command)): command = join(script_dir, command) self.log.debug("Executing command :") for line in command.split('\n'): self.log.debug(line) command = shlex.split(command) # self.log.debug(repr(command) + ' in ' # + repr(os.getcwd()) + ':\n' + 50*'-' + '\n') read_from = None if "<" in command: read_from = command[-1] command = command[:-2] subp = subprocess.Popen(command, stdin=subprocess. PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE, env=self.env) if read_from: subp.stdin.write(open(read_from, "r").read()) stdout_data = [] self.log.info("=============StdOut=================") while True: line = subp.stdout.readline() if not line: break self.log.info(' ' + line.rstrip()) sys.stdout.flush() stdout_data.append(line) returncode = subp.wait() stderr_data = subp.stderr.read() if not returncode: self.log.debug("=============StdErr=================") for dat in stderr_data.split('\n'): self.log.debug(" " + dat) else: self.log.error("Problem while running command :") for line in " ".join(command).split('\n'): self.log.error(" > " + line) self.log.error( "=============StdErr=================" + stderr_data) raise RuntimeError os.chdir(start_dir) output = "".join(stdout_data) return output
[docs] def ssh_host(self): """reconstruct full host description""" self.full_host = self.machine if not self.machine: self.log.error("SSH host unspecified") if self.login: self.full_host = self.login + '@' + self.machine
[docs] def execute_ssh(self, command=None, options=""): """Wrapper for call to execute_local through ssh""" # TODO: check that we are on a platform were ssh actually exists self.ssh_host() if command is None: command = self.command_exe if self.ssh_setup: command = self.ssh_setup + " \n " + command self.log.debug("Executing via ssh: ") for line in command.split('\n'): self.log.debug(line) return self.execute_local("ssh {0} {1} '{2}'".format( options, self.full_host, command))
[docs] def ssh_prepare_directory(self): """ Prepare (creates) a directory on a distant server """ if not self.distant_directory: self.log.error( "Please specify a distant directory, e.g.: /home/toto/") if (self.distant_directory[0] != '/' and self.distant_directory[0] != '~'): self.log.error( "Relative path given. Please specify an absolute path") command = """mkdir -p """ + self.distant_directory self.execute_ssh(command)
[docs] def execute_action(self, action, exec_directory, appli, add_dirs=None): """This method goes into distant exec_directory and executes action (like -auto-exec-)""" raise NotImplementedError("Plugin : execute_action() " "is not implemented in current plugin ..." "Please implement this command " "in your current plugin.")
[docs] def ssh_send(self, local_directories_list): """ Uploads files to a server using ssh. This creates a tar archive, and pipes it through ssh to a 'tar xf' on the distant side. In short, the command that we run is: tar cvfh - -C directory1 directory2 | ssh distant_server "tar xfh - -C distant_directory" (with some additional safety) """ # TODO: check that we are on a platform where tar, scp, ssh actually # exist # TODO: Check that local_directories_list is actually a list. # People will try with string, and it does bad things with strings. self.ssh_host() if not local_directories_list: self.log.debug("ssh_send : No directories to be sent") return self.log.debug( "ssh_send : Sending and extracting archive if needed...") # Clean distant_path (no final slash) self.distant_directory = self.distant_directory.rstrip('/') # Decomposition # Bash : passage en bash # -c :(...) # tar cvf : compresse # local directoryes_list (liste de tous les rep a envoyer) # - : vers le pipe # | : pipe # ssh mylogin@host - # \:(...) # tarc xv : decompresse # - depuis le pipe # -C:(...) # distant directory : Dans la directory distante # \ :(...) # TODO: replace with an rsync to save a lot of time def ssh_send_one(path): """Send 1 directory (path): go to path/.. and tar $(basename path).""" command = ( 'bash -c \"tar cvfh - -C {0}/../ $(basename {0}) ' '| ssh {1.full_host} ' '\\\"tar xfh - -C {1.distant_directory}\\\" \"'.format( self.abspath(path), self)) self.execute_local(command) self.log.debug("ssh_send : command " + command) for directory in local_directories_list: ssh_send_one(directory)
[docs] def send_directory(self, directory): """ This method adds a directory to the run archive. """ # basename(abspath()) removes the trailing '/'s. # local_directory = os.path.abspath(directory) # directory_name = os.path.basename(local_directory) local_directory = os.path.relpath(directory) # We don't check that relpath doesn't contain some '../../../'. # Might be an issue. # tar removes them on its own, a plain scp might not. # Also, handling of symlinks is unknown. # Check that directory exists if not os.path.exists(local_directory): raise OTInterrupt(directory + " doesn't exist") # Check that we didn't change working directory along the way. if self.dir2send: if os.getcwd() != self._dir2send_curdir: raise OTInterrupt("Current directory changed between " "calls to sendDirectory(). " "This is not supported !") else: self._dir2send_curdir = os.getcwd() # Add to the list of files to be sent self.dir2send.append(local_directory)
[docs] def retrieve_directory(self, directory): """This method retrieves directory from the distant place """ raise NotImplementedError("Plugin : retrieveDirectory " "is not implemented in current plugin ...")
[docs] def remove_directory(self, directory): """This method removes directory from the distant place """ raise NotImplementedError("Plugin : removeDirectory " "is not implemented in current plugin ...")
[docs] def get_plugin_param(self, param_name): """getValue wrapper for specific plugin type""" try: return self.dataset.getValue( param_name, self.name, self.type + "_plugins") except OTTooManyNodesException: return self.dataset.getValue(param_name, self.name, self.type + "_plugins", self.dataset.getValue("name", "solver", "meta"))