# Source code for machetli.pddl.files

import logging
from pathlib import Path
from pickle import PickleError
import sys
from typing import Union

from machetli.pddl.constants import KEY_IN_STATE
from machetli.pddl.downward import pddl_parser
from machetli.pddl.downward.pddl import Truth
from machetli.pddl.downward.pddl.conditions import ConstantCondition, Atom

from machetli import tools
from machetli.evaluator import EXIT_CODE_CRITICAL, EXIT_CODE_BEHAVIOR_PRESENT, \
    EXIT_CODE_BEHAVIOR_NOT_PRESENT

# Indentation units used when writing PDDL files.
SIN = " "  # single indentation
DIN = SIN * 2  # double indentation


def find_domain_path(task_path: Path):
    """
    Locate the domain file belonging to the given task file.

    A fixed list of conventional domain file names is tried in order, all
    of them inside the directory that contains *task_path*.

    :param task_path: path of the PDDL problem file.
    :return: the path of the first candidate that exists, or ``None`` if
        none of the candidates exists.
    """
    task_name = task_path.name
    candidate_names = (
        "domain.pddl",
        f"{task_path.stem}-domain{task_path.suffix}",
        f"{task_name[:3]}-domain.pddl",  # for airport
        f"domain_{task_name}",
        f"domain-{task_name}",
    )
    folder = task_path.parent
    candidates = (folder / name for name in candidate_names)
    return next((path for path in candidates if path.exists()), None)


def generate_initial_state(domain_path: Union[Path, str], task_path: Union[Path, str]) -> dict:
    """
    Parse the PDDL task defined in the given PDDL files.

    :param domain_path: path of the PDDL domain file.
    :param task_path: path of the PDDL problem file.
    :return: a dictionary that stores the parsed task under the key
        ``KEY_IN_STATE``.
    """
    parsed_task = pddl_parser.open(
        domain_filename=domain_path, task_filename=task_path)
    return {KEY_IN_STATE: parsed_task}
def _run_evaluator_on_pddl_files(evaluate, domain_filename, task_filename):
    """
    Call *evaluate* on the given PDDL files and terminate the program with
    an exit code that reflects its result.

    A true result exits with
    :attr:`EXIT_CODE_BEHAVIOR_PRESENT<machetli.evaluator.EXIT_CODE_BEHAVIOR_PRESENT>`,
    any other result exits with
    :attr:`EXIT_CODE_BEHAVIOR_NOT_PRESENT<machetli.evaluator.EXIT_CODE_BEHAVIOR_NOT_PRESENT>`.
    This function never returns normally.

    :param evaluate: is a function taking filenames of a PDDL domain and
        problem file as input and returning ``True`` if the specified
        behavior occurs for the given instance, and ``False`` if it doesn't.
        Other ways of exiting the function (exceptions, ``sys.exit`` with
        exit codes other than
        :attr:`EXIT_CODE_BEHAVIOR_PRESENT<machetli.evaluator.EXIT_CODE_BEHAVIOR_PRESENT>` or
        :attr:`EXIT_CODE_BEHAVIOR_NOT_PRESENT<machetli.evaluator.EXIT_CODE_BEHAVIOR_NOT_PRESENT>`)
        are treated as failed evaluations by the search.

    :param domain_filename: is the filename of a PDDL domain file.

    :param task_filename: is the filename of a PDDL problem file.
    """
    behavior_present = evaluate(domain_filename, task_filename)
    exit_code = (EXIT_CODE_BEHAVIOR_PRESENT if behavior_present
                 else EXIT_CODE_BEHAVIOR_NOT_PRESENT)
    sys.exit(exit_code)
def run_evaluator(evaluate):
    """
    Load the state passed to the script via its command line arguments, then
    run the given function *evaluate* on the domain and problem encoded in
    the state, and exit the program with the appropriate exit code. If the
    function returns ``True``, use
    :attr:`EXIT_CODE_BEHAVIOR_PRESENT<machetli.evaluator.EXIT_CODE_BEHAVIOR_PRESENT>`,
    otherwise use
    :attr:`EXIT_CODE_BEHAVIOR_NOT_PRESENT<machetli.evaluator.EXIT_CODE_BEHAVIOR_NOT_PRESENT>`.
    In addition to running the evaluator, this function creates the PDDL
    files as 'domain.pddl' and 'problem.pddl' in the current directory.

    This function is meant to be used as the main function of an evaluator
    script. Instead of a path to the state, the command line arguments can
    also be paths to a PDDL domain and problem (where the domain can be
    omitted if it can be found with automated naming rules). This is meant
    for testing and debugging the evaluator directly on PDDL input.

    If the arguments cannot be interpreted (wrong number of arguments, or no
    domain file can be found for a single task argument), a critical message
    is logged and the program exits with ``EXIT_CODE_CRITICAL``.

    :param evaluate: is a function taking filenames of a PDDL domain and
        problem file as input and returning ``True`` if the specified
        behavior occurs for the given instance, and ``False`` if it doesn't.
        Other ways of exiting the function (exceptions, ``sys.exit`` with
        exit codes other than
        :attr:`EXIT_CODE_BEHAVIOR_PRESENT<machetli.evaluator.EXIT_CODE_BEHAVIOR_PRESENT>` or
        :attr:`EXIT_CODE_BEHAVIOR_NOT_PRESENT<machetli.evaluator.EXIT_CODE_BEHAVIOR_NOT_PRESENT>`)
        are treated as failed evaluations by the search.
    """
    filenames = sys.argv[1:]
    if len(filenames) == 1:
        # Only loading the state is guarded: errors raised later by
        # write_files or by the evaluator itself must not be mistaken for
        # "the argument is not a pickled state".
        try:
            state = tools.read_state(filenames[0])
        except (FileNotFoundError, PickleError):
            # Not a readable pickled state, so interpret the argument as a
            # PDDL problem file and look for its domain file.
            task_path = Path(filenames[0])
            domain_path = find_domain_path(task_path)
            if domain_path is None:
                logging.critical(
                    "Error: Could not find domain file using automatic naming rules.")
                # Without a domain file there is nothing to evaluate.
                sys.exit(EXIT_CODE_CRITICAL)
            _run_evaluator_on_pddl_files(evaluate, domain_path, task_path)
        else:
            write_files(state, "domain.pddl", "problem.pddl")
            _run_evaluator_on_pddl_files(evaluate, "domain.pddl", "problem.pddl")
    elif len(filenames) == 2:
        domain_filename, task_filename = filenames
        _run_evaluator_on_pddl_files(evaluate, domain_filename, task_filename)
    else:
        logging.critical(
            "Error: evaluator has to be called with either a path to a pickled "
            "state, a task filename, or a domain filename followed by a task "
            "filename.")
        sys.exit(EXIT_CODE_CRITICAL)
def _group_consecutive_by_type(typed_items):
    """
    Group the names of *typed_items* into runs of neighbours sharing a type.

    Only adjacent items are merged, so the overall order of the names is
    preserved. This matters for predicate arguments, whose positions are
    part of the predicate's signature.

    :param typed_items: iterable of objects with ``name`` and ``type_name``
        attributes.
    :return: list of ``(type_name, [names])`` pairs in input order.
    """
    runs = []
    for item in typed_items:
        if runs and runs[-1][0] == item.type_name:
            runs[-1][1].append(item.name)
        else:
            runs.append((item.type_name, [item.name]))
    return runs


def _write_domain_header(task, file):
    """Write the ``define (domain ...)`` header using ``task.domain_name``."""
    file.write("define (domain {})\n".format(task.domain_name))


def _write_domain_requirements(task, file):
    """Write the ``(:requirements ...)`` section unless there are none."""
    requirements = task.requirements.requirements
    if requirements:
        file.write(SIN + "(:requirements")
        for req in requirements:
            file.write(" " + req)
        file.write(")\n")


def _write_domain_types(task, file):
    """
    Write the ``(:types ...)`` section, one line per base type.

    Types whose ``basetype_name`` is ``None`` are not listed themselves.
    """
    if task.types:
        file.write(SIN + "(:types\n")
        # Map each base type to the names of the types derived from it.
        names_by_basetype = {}
        for tp in task.types:
            if tp.basetype_name is not None:
                names_by_basetype.setdefault(tp.basetype_name, []).append(tp.name)
        for basetype, names in names_by_basetype.items():
            file.write(SIN + DIN)
            for name in names:
                file.write(name + " ")
            file.write("- " + basetype + "\n")
        file.write(SIN + ")\n")


def _write_domain_objects(task, file):
    """
    Write all objects of the task as ``(:constants ...)`` of the domain,
    one line per object type.
    """
    if task.objects:
        # All objects from the planning task are written into constants.
        file.write(SIN + "(:constants\n")
        # Map each type name to the names of the objects of that type.
        names_by_type = {}
        for obj in task.objects:
            names_by_type.setdefault(obj.type_name, []).append(obj.name)
        for type_name, names in names_by_type.items():
            file.write(SIN + DIN)
            for name in names:
                file.write(name + " ")
            file.write("- " + type_name + "\n")
        file.write(SIN + ")\n")


def _write_domain_predicates(task, file):
    """
    Write the ``(:predicates ...)`` section.

    The predicate named ``=`` is skipped. Arguments keep their original
    order; only neighbouring arguments of the same type share one type
    annotation, so the argument positions of a predicate never change.
    """
    if task.predicates:
        file.write(SIN + "(:predicates\n")
        for pred in task.predicates:
            if pred.name == "=":
                continue
            file.write(SIN + SIN + "(" + pred.name)
            for type_name, names in _group_consecutive_by_type(pred.arguments):
                for name in names:
                    file.write(" " + name)
                file.write(" - " + type_name)
            file.write(")\n")
        file.write(SIN + ")\n")


def _write_domain_functions(task, file):
    """Write the ``(:functions ...)`` section unless there are none."""
    if task.functions:
        file.write(SIN + "(:functions\n")
        for function in task.functions:
            function.dump_pddl(file, DIN)
        file.write(SIN + ")\n")


def _write_domain_actions(task, file):
    """
    Write one ``(:action ...)`` definition per action of the task.

    The ``:precondition`` section is optional in PDDL and is omitted for
    actions whose precondition is ``Truth``; writing the keyword without a
    condition would make the following ``:effect`` keyword be read as the
    condition.
    """
    for action in task.actions:
        file.write(SIN + "(:action {}\n".format(action.name))
        file.write(DIN + ":parameters (")
        for par in action.parameters or ():
            file.write("%s - %s " % (par.name, par.type_name))
        file.write(")\n")
        if not isinstance(action.precondition, Truth):
            file.write(SIN + SIN + ":precondition\n")
            action.precondition.dump_pddl(file, DIN)
        file.write(DIN + ":effect\n")
        file.write(DIN + "(and\n")
        for eff in action.effects:
            eff.dump_pddl(file, DIN)
        if action.cost:
            # The cost effect is written inside the same conjunction.
            action.cost.dump_pddl(file, DIN + DIN)
        file.write(DIN + ")\n")
        file.write(SIN + ")\n")


def _write_domain_axioms(task, file):
    """Write one ``(:derived ...)`` definition per axiom of the task."""
    for axiom in task.axioms:
        file.write(SIN + "(:derived ({} ".format(axiom.name))
        for par in axiom.parameters:
            file.write("%s - %s " % (par.name, par.type_name))
        file.write(")\n")
        axiom.condition.dump_pddl(file, DIN)
        file.write(SIN + ")\n")


def _write_domain(task, path: Path):
    """
    Write the domain part of *task* as a PDDL domain file to *path*.

    An existing file at *path* is overwritten.
    """
    with path.open("w") as file:
        file.write("\n(")
        _write_domain_header(task, file)
        _write_domain_requirements(task, file)
        _write_domain_types(task, file)
        _write_domain_objects(task, file)
        _write_domain_predicates(task, file)
        _write_domain_functions(task, file)
        _write_domain_axioms(task, file)
        _write_domain_actions(task, file)
        file.write(")\n")


def _write_problem_header(task, file):
    """Write the ``define (problem ...)`` header using ``task.task_name``."""
    file.write("define (problem {})\n".format(task.task_name))


def _write_problem_domain(task, file):
    """Write the ``(:domain ...)`` reference using ``task.domain_name``."""
    file.write(SIN + "(:domain {})\n".format(task.domain_name))


def _write_problem_init(task, file):
    """
    Write the ``(:init ...)`` section.

    Atoms of the predicate ``=`` are skipped.
    """
    file.write(SIN + "(:init\n")
    for elem in task.init:
        if isinstance(elem, Atom) and elem.predicate == "=":
            continue
        elem.dump_pddl(file, SIN + DIN)
    file.write(SIN + ")\n")


def _write_problem_goal(task, file):
    """
    Write the ``(:goal ...)`` section.

    NOTE(review): for a ``ConstantCondition`` goal nothing is written
    between the parentheses -- confirm that readers of the file accept an
    empty goal section.
    """
    file.write(SIN + "(:goal\n")
    if not isinstance(task.goal, ConstantCondition):
        task.goal.dump_pddl(file, SIN + DIN)
    file.write("%s)\n" % SIN)


def _write_problem_metric(task, file):
    """Write the total-cost metric if ``task.use_min_cost_metric`` is set."""
    if task.use_min_cost_metric:
        file.write("%s(:metric minimize (total-cost))\n" % SIN)


def _write_problem(task, path: Path):
    """
    Write the problem part of *task* as a PDDL problem file to *path*.

    No objects section is written here; the objects are emitted as
    constants of the domain file instead. An existing file at *path* is
    overwritten.
    """
    with path.open("w") as file:
        file.write("\n(")
        _write_problem_header(task, file)
        _write_problem_domain(task, file)
        _write_problem_init(task, file)
        _write_problem_goal(task, file)
        _write_problem_metric(task, file)
        file.write(")\n")
def write_files(state: dict, domain_path: Union[Path, str], problem_path: Union[Path, str]):
    """
    Write the PDDL task stored in `state` to disk as a domain file and a
    problem file.

    :param state: dictionary holding the task under the key
        ``KEY_IN_STATE``.
    :param domain_path: destination of the PDDL domain file.
    :param problem_path: destination of the PDDL problem file.
    """
    task = state[KEY_IN_STATE]
    _write_domain(task, Path(domain_path))
    _write_problem(task, Path(problem_path))