summaryrefslogtreecommitdiffstats
path: root/azure/aria/aria-extension-cloudify/src/aria/aria/orchestrator/execution_plugin/local.py
blob: 04b9ecdcb8e6e89ba627e930493dbbc1255cd08f (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""
Local execution of operations.
"""

import os
import subprocess
import threading
import StringIO

from . import ctx_proxy
from . import exceptions
from . import common
from . import constants
from . import environment_globals
from . import python_script_scope


def run_script(ctx, script_path, process, **kwargs):
    """
    Entry point for running a script locally.

    The script is fetched through ``common.download_script`` and then dispatched to the runner
    picked by ``_get_run_script_func``.

    :param ctx: operation context; ``ctx.task.abort`` is called when no script path is given
    :param script_path: location of the script to run
    :param process: process configuration mapping; a falsy value is treated as an empty dict
    :param kwargs: remaining operation inputs, forwarded to the runner as ``operation_kwargs``
    :return: whatever the selected runner returns
    """
    if not script_path:
        ctx.task.abort('Missing script_path')
    process_config = process if process else {}
    local_script = common.download_script(ctx, script_path)
    runner = _get_run_script_func(local_script, process_config)
    return runner(ctx=ctx,
                  script_path=local_script,
                  process=process_config,
                  operation_kwargs=kwargs)


def _get_run_script_func(script_path, process):
    """
    Pick the function that will run the script.

    Python scripts are evaluated in-process by ``_eval_script_func``; everything else is run as
    a subprocess by ``_execute_func``. For PowerShell scripts, ``process['command_prefix']`` is
    filled in (in place) with the default PowerShell executable unless already set.

    :param script_path: path of the script, used for extension checks
    :param process: process configuration dict; may be mutated as described above
    :return: ``_eval_script_func`` or ``_execute_func``
    """
    run_as_python = _treat_script_as_python_script(script_path, process)
    if not run_as_python:
        if _treat_script_as_powershell_script(script_path):
            process.setdefault('command_prefix', constants.DEFAULT_POWERSHELL_EXECUTABLE)
        return _execute_func
    return _eval_script_func


def _treat_script_as_python_script(script_path, process):
    """
    Decide whether the script should be evaluated as Python.

    An explicit ``process['eval_python']`` of ``True`` or ``False`` always wins; any other value
    (including a missing key) falls back to comparing the lower-cased file extension with
    ``constants.PYTHON_SCRIPT_FILE_EXTENSION``.

    :param script_path: path of the script
    :param process: process configuration mapping
    :return: ``True`` if the script should be evaluated as Python, ``False`` otherwise
    """
    eval_python = process.get('eval_python')
    extension = os.path.splitext(script_path)[1].lower()
    if eval_python is True:
        return True
    if eval_python is False:
        return False
    return extension == constants.PYTHON_SCRIPT_FILE_EXTENSION


def _treat_script_as_powershell_script(script_path):
    """
    Tell whether the script's lower-cased file extension equals
    ``constants.POWERSHELL_SCRIPT_FILE_EXTENSION``.

    :param script_path: path of the script
    :return: ``True`` for a PowerShell script, ``False`` otherwise
    """
    _, extension = os.path.splitext(script_path)
    return extension.lower() == constants.POWERSHELL_SCRIPT_FILE_EXTENSION


def _eval_script_func(script_path, ctx, operation_kwargs, **_):
    """
    Evaluate the script in the current interpreter via ``execfile``.

    The script runs inside ``python_script_scope`` (set up with the operation context and
    inputs) and with the globals produced by ``environment_globals.create_initial_globals``.

    :param script_path: path of the Python script to evaluate
    :param ctx: operation context handed to the script scope
    :param operation_kwargs: operation inputs handed to the script scope
    :param _: any other runner arguments (e.g. ``process``) are accepted and ignored
    """
    scope = python_script_scope(operation_ctx=ctx, operation_inputs=operation_kwargs)
    with scope:
        # The globals are built inside the scope, exactly where the script itself runs.
        script_globals = environment_globals.create_initial_globals(script_path)
        execfile(script_path, script_globals)


def _execute_func(script_path, ctx, process, operation_kwargs):
    """
    Run the script as a child process and report a non-zero exit code as an error.

    The script is made executable, the final command and environment are taken from
    ``common.create_process_config``, and a ctx proxy server is kept alive while the child runs
    so that the script can reach the operation context through the socket URL placed in its
    environment.

    :param script_path: path of the script to execute
    :param ctx: operation context (used for logging and served by the ctx proxy)
    :param process: process configuration mapping
    :param operation_kwargs: operation inputs, passed on to ``common.create_process_config``
    :return: the result of ``common.check_error``
    """
    # ``0o755`` (rwxr-xr-x) is the octal spelling accepted by both Python 2.6+ and Python 3;
    # the legacy ``0755`` form is a syntax error on Python 3.
    os.chmod(script_path, 0o755)
    process = common.create_process_config(
        script_path=script_path,
        process=process,
        operation_kwargs=operation_kwargs)
    command = process['command']
    # The child inherits the current environment, overridden by the configured variables.
    env = os.environ.copy()
    env.update(process['env'])
    ctx.logger.info('Executing: {0}'.format(command))
    with ctx_proxy.server.CtxProxy(ctx, common.patch_ctx) as proxy:
        env[ctx_proxy.client.CTX_SOCKET_URL] = proxy.socket_url
        # NOTE(review): the command is handed to the shell as-is (shell=True); presumably it is
        # assembled only from the service template's own configuration - confirm.
        running_process = subprocess.Popen(
            command,
            shell=True,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            env=env,
            cwd=process.get('cwd'),
            bufsize=1,
            close_fds=not common.is_windows())
        # Both pipes are drained on background threads while we wait for the child, so a child
        # that writes a lot of output does not block on a full pipe.
        stdout_consumer = _OutputConsumer(running_process.stdout)
        stderr_consumer = _OutputConsumer(running_process.stderr)
        exit_code = running_process.wait()
    # Wait for the reader threads so that all captured output is available below.
    stdout_consumer.join()
    stderr_consumer.join()
    ctx.logger.info('Execution done (exit_code={0}): {1}'.format(exit_code, command))

    def error_check_func():
        # A non-zero exit code becomes a ProcessException carrying the captured output; how it
        # is surfaced to the caller is decided by ``common.check_error``.
        if exit_code:
            raise exceptions.ProcessException(
                command=command,
                exit_code=exit_code,
                stdout=stdout_consumer.read_output(),
                stderr=stderr_consumer.read_output())
    return common.check_error(ctx, error_check_func=error_check_func)


class _OutputConsumer(object):
    """
    Drains a byte stream on a background daemon thread and keeps everything it read in memory.

    The reader thread is started by the constructor. Call :meth:`join` to wait until the stream
    is exhausted, then :meth:`read_output` to get the collected data.
    """

    def __init__(self, out):
        """
        :param out: stream exposing ``readline()`` and ``close()``, yielding byte strings
        """
        self._out = out
        # Lines are collected in a list and joined on demand; this works for byte strings on
        # both Python 2 and Python 3 without depending on a particular string-buffer module.
        self._lines = []
        self._consumer = threading.Thread(target=self._consume_output)
        self._consumer.daemon = True
        self._consumer.start()

    def _consume_output(self):
        """
        Thread body: read lines until ``readline()`` returns an empty byte string, then close
        the stream.
        """
        try:
            for line in iter(self._out.readline, b''):
                self._lines.append(line)
        finally:
            # Close the stream even if reading failed, so the descriptor is not leaked.
            self._out.close()

    def read_output(self):
        """
        :return: everything read from the stream so far, concatenated
        """
        return b''.join(self._lines)

    def join(self):
        """
        Block until the reader thread has finished.
        """
        self._consumer.join()