#!/usr/bin/env python
# -*- coding: utf-8 -*-
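"""plugin.py

Base class for OpenTEA plugins, with helpers to run commands locally or
through ssh and to send directories to a distant machine.

Created Nov 2016 by COOP team
"""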
from __future__ import absolute_import

import json
import logging
import os
import shlex
import subprocess
import sys
import tempfile
import warnings
from os.path import dirname, abspath, isfile, join

from .exceptions import OTTooManyNodesException, OTInterrupt
from . import PathTools
from . import __version__
__all__ = ["Plugin"]
[docs]class Plugin(PathTools):
"""Base class for all plugins"""
def __init__(self, dataset, base_dir, typ, **kwargs):
    """Set up the state shared by every plugin.

    ``base_dir`` is forwarded to ``PathTools.__init__``. ``dataset`` is
    stored under both ``self.ds`` and ``self.dataset`` and ``typ`` under
    ``self.type``. All other attributes start from a neutral default.
    Keyword arguments are then written into the instance dictionary, so
    they override (or add to) those defaults, and finally the plugin
    version is checked.
    """
    PathTools.__init__(self, base_dir)
    # The table is rebuilt on every call so each instance gets its own
    # fresh list objects.
    initial_state = (
        ("ds", dataset),
        ("dataset", dataset),
        ("type", typ),
        ("dir2send", []),
        ("_dir2send_curdir", ""),
        ("name", self.__class__.__name__),
        ("nbcores", 0),
        ("enable_sav", False),
        ("action", None),
        ("exec_directory", None),
        ("appli", None),
        ("options", ""),
        ("supported_tools", []),
        ("supported_codes", []),
        ("command_exe", None),
        ("machine", ""),
        ("login", ""),
        ("ssh_setup", ""),
        ("full_host", ""),
        ("distant_directory", ""),
        ("log", logging.getLogger(__name__)),
        ("read_json", False),
    )
    for attribute, value in initial_state:
        setattr(self, attribute, value)
    self.__dict__.update(**kwargs)
    self._check_version()
def _check_version(self):
    """Check that the plugin version is compatible with OpenTEA.

    A plugin that declares no ``version`` attribute is assumed to target
    2.1, because 2.1 did not check versions. The plugin is accepted when
    its major version equals the OpenTEA major version and its
    ``major.minor`` is not newer than the OpenTEA ``major.minor``.

    The fields are compared as integers, so 2.10 is newer than 2.9. If a
    version is not purely numeric, only an exact match of the two
    ``major.minor`` strings is accepted.

    Raises:
        AssertionError: if the two versions are incompatible. It is
            raised explicitly so the check survives ``python -O``.
    """
    # Backwards compat: 2.1 didn't check version
    # If undeclared in plugin, 2.1 is assumed
    if not hasattr(self, "version"):
        self.version = "2.1"
    ot_version = ".".join(__version__.split('.')[:2])
    plugin_version = str(self.version)

    def major_minor(text):
        """Return the first two dotted fields of *text* as integers."""
        return tuple(int(field) for field in text.split('.')[:2])

    try:
        ot_numbers = major_minor(ot_version)
        plugin_numbers = major_minor(plugin_version)
    except ValueError:
        # Non-numeric version (e.g. "2.2rc1"): no ordering is attempted.
        compatible = ot_version == plugin_version
    else:
        compatible = (ot_numbers[0] == plugin_numbers[0]
                      and ot_numbers >= plugin_numbers)

    if not compatible:
        raise AssertionError(
            "The version detected for your plugin is {slf}, but "
            "the OpenTEA version is {ot}. Either you have not "
            "declared your version in the plugin or it is "
            "incompatible.".format(
                slf=self.version, ot=ot_version))
def check_support(self):
    """Deprecated since 2.2: verify that ``self.appli`` is supported.

    A ``DeprecationWarning`` is always emitted. ``OTInterrupt`` is raised
    unless ``self.appli`` appears in ``self.supported_tools`` or
    ``self.supported_codes``; otherwise the pending action is traced at
    debug level.
    """
    warnings.warn("check_support is deprecated from version 2.2",
                  DeprecationWarning)
    known_applis = self.supported_tools + self.supported_codes
    if self.appli in known_applis:
        trace = ("Plugin {0} : Running execute_action"
                 " {1} in {2} ({3})")
        self.log.debug(trace.format(self.name,
                                    self.action,
                                    self.exec_directory,
                                    self.appli))
        return
    raise OTInterrupt("Application {0} is neither in "
                      "supported tools nor codes".format(self.appli))
@property
def env(self):
    """Environment variables handed to subprocesses.

    Returns a fresh copy of ``os.environ``. If ``self.read_json`` is set,
    the ``~/.env.json`` file generated by the
    $OPENTEA_HOME/tools/ensure_env.py tool is read and its entries are
    merged on top, so the environment of the shell is sent to the
    subprocess. Values are stored as native ``str`` on both Python 2
    (where json yields ``unicode``) and Python 3.

    Raises:
        AssertionError: if ``self.read_json`` is set and the file does
            not exist. It is raised explicitly so the check survives
            ``python -O``.
    """
    env = dict(os.environ)
    if self.read_json:
        env_file = env['HOME'] + '/.env.json'
        if not isfile(env_file):
            raise AssertionError(
                "This plugin expects to find a {} file.\n".format(env_file) +
                "Please create it using the "
                "$OPENTEA_HOME/tools/ensure_env.py tool")
        # Context manager so the file handle is always released.
        with open(env_file) as handle:
            stored = json.load(handle)
        for key, value in stored.items():
            # Only encode what is not already a native string.
            env[key] = (value if isinstance(value, str)
                        else value.encode('utf8'))
    return env
def execute_local(self, command=None, from_dir=None):
    """Run a command locally and return its standard output.

    Args:
        command: command line to run; defaults to ``self.command_exe``.
            If a file with that name exists next to the ``__main__``
            script, that script file is run instead. A trailing
            ``< file`` (the last two tokens) feeds ``file`` to the
            command's standard input.
        from_dir: directory to run from (resolved with
            ``self.abspath``); defaults to ``self.exec_directory``.
            ``None`` keeps the current directory.

    Returns:
        The standard output of the command, as one string. Each line is
        also logged at info level while the command runs.

    Raises:
        RuntimeError: if the command exits with a non-zero status.

    The working directory is restored on every exit path, including
    failures. Standard error is spooled to a temporary file, so a command
    that writes a lot to it cannot block while stdout is being read.
    Pipes are opened in text mode (newlines normalised to ``\\n``) so the
    output is a native string on Python 2 and 3.
    """
    if command is None:
        command = self.command_exe
    if from_dir is None:
        from_dir = self.exec_directory

    # Resolve the launching script's directory BEFORE changing directory:
    # __main__.__file__ may be relative to the directory we start from.
    # It may also be missing altogether (interactive session).
    main_file = getattr(sys.modules.get('__main__'), '__file__', None)
    script_dir = dirname(abspath(main_file)) if main_file else None

    start_dir = os.getcwd()
    try:
        if from_dir is not None:
            os.chdir(self.abspath(from_dir))
        if script_dir is not None and isfile(join(script_dir, command)):
            command = join(script_dir, command)
        self.log.debug("Executing command :")
        for line in command.split('\n'):
            self.log.debug(line)
        argv = shlex.split(command)

        # Minimal "cmd args < file" support: the redirection is expected
        # to be the last two tokens of the command line.
        read_from = None
        if "<" in argv:
            read_from = argv[-1]
            argv = argv[:-2]

        stdin_file = open(read_from, "r") if read_from else None
        stderr_file = tempfile.TemporaryFile(mode="w+")
        try:
            subp = subprocess.Popen(
                argv,
                stdin=(stdin_file if stdin_file is not None
                       else subprocess.PIPE),
                stdout=subprocess.PIPE,
                stderr=stderr_file,
                env=self.env,
                universal_newlines=True)
            if stdin_file is None:
                # Nothing to send: give the child an immediate EOF so it
                # cannot wait forever on its standard input.
                subp.stdin.close()
            stdout_data = []
            self.log.info("=============StdOut=================")
            for line in iter(subp.stdout.readline, ''):
                self.log.info(' ' + line.rstrip())
                sys.stdout.flush()
                stdout_data.append(line)
            subp.stdout.close()
            returncode = subp.wait()
            stderr_file.seek(0)
            stderr_data = stderr_file.read()
        finally:
            stderr_file.close()
            if stdin_file is not None:
                stdin_file.close()

        if returncode:
            self.log.error("Problem while running command :")
            for line in " ".join(argv).split('\n'):
                self.log.error(" > " + line)
            self.log.error(
                "=============StdErr=================" +
                stderr_data)
            raise RuntimeError(
                "Command exited with status {0}".format(returncode))

        self.log.debug("=============StdErr=================")
        for dat in stderr_data.split('\n'):
            self.log.debug(" " + dat)
        return "".join(stdout_data)
    finally:
        os.chdir(start_dir)
def ssh_host(self):
    """Rebuild ``self.full_host`` from ``self.machine`` and ``self.login``.

    The result is ``login@machine`` when a login is set, and the bare
    machine name otherwise. An empty machine is reported through the
    error log, but the host string is still rebuilt.
    """
    machine = self.machine
    self.full_host = machine
    if not machine:
        self.log.error("SSH host unspecified")
    login = self.login
    if login:
        self.full_host = "".join((login, '@', self.machine))
def execute_ssh(self, command=None, options=""):
    """Run a command on the distant host through ssh.

    Args:
        command: command to run remotely; defaults to
            ``self.command_exe``. When ``self.ssh_setup`` is set it is
            prepended on its own line.
        options: extra options inserted right after ``ssh``.

    Returns:
        Whatever ``self.execute_local`` returns for the ssh command line.

    The remote command is wrapped in single quotes so that it reaches
    ssh as one argument. Single quotes inside the command are escaped;
    otherwise they would close that wrapping early and the quoting
    meant for the remote shell would be lost.
    """
    # TODO: check that we are on a platform were ssh actually exists
    self.ssh_host()
    if command is None:
        command = self.command_exe
    if self.ssh_setup:
        command = self.ssh_setup + " \n " + command
    self.log.debug("Executing via ssh: ")
    for line in command.split('\n'):
        self.log.debug(line)
    # ' -> '\'' : close the quote, add an escaped quote, reopen the quote.
    quoted_command = command.replace("'", "'\\''")
    return self.execute_local("ssh {0} {1} '{2}'".format(
        options, self.full_host, quoted_command))
def ssh_prepare_directory(self):
    """Prepare (create) ``self.distant_directory`` on the distant server.

    The directory is created with ``mkdir -p`` through
    ``self.execute_ssh``. A path that starts with neither ``/`` nor ``~``
    is reported as an error but still created. An empty path is reported
    as an error and nothing is run, since there is no directory to
    create.
    """
    if not self.distant_directory:
        self.log.error(
            "Please specify a distant directory, e.g.: /home/toto/")
        # Stop here: indexing the empty string below would raise
        # IndexError.
        return
    if not self.distant_directory.startswith(('/', '~')):
        self.log.error(
            "Relative path given. Please specify an absolute path")
    command = "mkdir -p " + self.distant_directory
    self.execute_ssh(command)
def execute_action(self, action, exec_directory, appli, add_dirs=None):
    """Placeholder that concrete plugins must override.

    An implementation is expected to go into the distant
    ``exec_directory`` and execute ``action`` (like -auto-exec-). This
    base version always raises ``NotImplementedError``.
    """
    message = "".join((
        "Plugin : execute_action() ",
        "is not implemented in current plugin ...",
        "Please implement this command ",
        "in your current plugin.",
    ))
    raise NotImplementedError(message)
def ssh_send(self, local_directories_list):
    """Upload directories to the distant server using ssh.

    For each directory a tar archive is created and piped through ssh to
    a 'tar xf' on the distant side. In short, the command that is run
    is::

        tar cvfh - -C directory/.. $(basename directory) |
            ssh distant_server "tar xfh - -C distant_directory"

    Args:
        local_directories_list: directories to send. A single path given
            as a plain string is treated as a one-element list instead
            of being iterated character by character.

    ``self.distant_directory`` is stripped of its trailing slashes as a
    side effect. Nothing is sent when the list is empty.
    """
    # TODO: check that we are on a platform where tar, scp, ssh actually
    # exist
    self.ssh_host()
    if not local_directories_list:
        self.log.debug("ssh_send : No directories to be sent")
        return

    try:
        string_types = basestring  # noqa: F821 (Python 2 only)
    except NameError:
        string_types = str
    if isinstance(local_directories_list, string_types):
        local_directories_list = [local_directories_list]

    self.log.debug(
        "ssh_send : Sending and extracting archive if needed...")
    # Clean distant_path (no final slash)
    self.distant_directory = self.distant_directory.rstrip('/')

    # Anatomy of the command:
    #   bash -c "..."        : run the whole pipeline in bash
    #   tar cvfh - -C p/.. n : archive directory n, from its parent, to
    #                          stdout
    #   |                    : pipe the archive
    #   ssh login@host "..." : run the extraction on the distant host
    #   tar xfh - -C dir     : extract from stdin into the distant
    #                          directory
    # TODO: replace with an rsync to save a lot of time
    template = ('bash -c "tar cvfh - -C {0}/../ $(basename {0}) '
                '| ssh {1.full_host} '
                '\\"tar xfh - -C {1.distant_directory}\\" "')
    for directory in local_directories_list:
        # Send one directory: go to path/.. and tar $(basename path).
        command = template.format(self.abspath(directory), self)
        self.execute_local(command)
        self.log.debug("ssh_send : command " + command)
def send_directory(self, directory):
    """Register a directory to be added to the run archive.

    The path is stored relative to the current working directory in
    ``self.dir2send``. The working directory seen on the first
    registration is remembered, and later registrations must be made
    from the same place.

    Raises:
        OTInterrupt: if the directory does not exist, or if the working
            directory changed since the first registered directory.
    """
    # Paths containing '../../../' are not rejected, which might be an
    # issue: tar removes them on its own, a plain scp might not.
    # Handling of symlinks is unknown.
    relative_path = os.path.relpath(directory)
    if not os.path.exists(relative_path):
        raise OTInterrupt(directory + " doesn't exist")

    here = os.getcwd()
    if not self.dir2send:
        # First registration: remember where the relative paths start.
        self._dir2send_curdir = here
    elif here != self._dir2send_curdir:
        raise OTInterrupt("Current directory changed between "
                          "calls to sendDirectory(). "
                          "This is not supported !")

    self.dir2send.append(relative_path)
def retrieve_directory(self, directory):
    """Placeholder for retrieving ``directory`` from the distant place.

    This base version always raises ``NotImplementedError``.
    """
    message = ("Plugin : retrieveDirectory "
               "is not implemented in current plugin ...")
    raise NotImplementedError(message)
def remove_directory(self, directory):
    """Placeholder for removing ``directory`` from the distant place.

    This base version always raises ``NotImplementedError``.
    """
    message = ("Plugin : removeDirectory "
               "is not implemented in current plugin ...")
    raise NotImplementedError(message)
def get_plugin_param(self, param_name):
    """Read ``param_name`` for this plugin through ``dataset.getValue``.

    The value is looked up with ``self.name`` and ``"<type>_plugins"``.
    If that lookup raises ``OTTooManyNodesException``, it is retried with
    one more argument: the value stored under ``name`` / ``solver`` /
    ``meta`` in the dataset.
    """
    plugin_family = self.type + "_plugins"
    try:
        return self.dataset.getValue(param_name, self.name, plugin_family)
    except OTTooManyNodesException:
        solver_name = self.dataset.getValue("name", "solver", "meta")
        return self.dataset.getValue(param_name,
                                     self.name,
                                     plugin_family,
                                     solver_name)