"""
Module for defining a local executor that directly invokes the input python function.
This is a plugin executor module; it is loaded if found and properly structured.
"""
import os
from concurrent.futures import ProcessPoolExecutor
from typing import Any, Callable, Dict, List
from covalent._shared_files import TaskCancelledError, TaskRuntimeError, logger
from covalent.executor import BaseExecutor
from covalent.executor.utils.wrappers import io_wrapper
EXECUTOR_PLUGIN_NAME = "LocalExecutor"
app_log = logger.app_log
log_stack_info = logger.log_stack_info
_EXECUTOR_PLUGIN_DEFAULTS = {
"log_stdout": "stdout.log",
"log_stderr": "stderr.log",
"cache_dir": os.path.join(
os.environ.get("XDG_CACHE_HOME") or os.path.join(os.environ["HOME"], ".cache"), "covalent"
),
}
proc_pool = ProcessPoolExecutor()
class LocalExecutor(BaseExecutor):
"""
Local executor class that directly invokes the input function.
"""
def run(self, function: Callable, args: List, kwargs: Dict, task_metadata: Dict) -> Any:
"""
Execute the function locally
Arg(s)
function: Function to be executed
args: Arguments passed to the function
kwargs: Keyword arguments passed to the function
task_metadata: Metadata of the task to be executed
Return(s)
Task output
"""
app_log.debug(f"Running function {function} locally")
self.set_job_handle(42)
if self.get_cancel_requested():
app_log.debug("Task has been cancelled don't proceed")
raise TaskCancelledError
fut = proc_pool.submit(io_wrapper, function, args, kwargs)
output, worker_stdout, worker_stderr, tb = fut.result()
print(worker_stdout, end="", file=self.task_stdout)
print(worker_stderr, end="", file=self.task_stderr)
if tb:
print(tb, end="", file=self.task_stderr)
raise TaskRuntimeError(tb)
return output