Source code for machetli.sas.generators

import copy
import itertools
import random

from machetli.sas.constants import KEY_IN_STATE
from machetli.sas.sas_tasks import SASTask, SASMutexGroup, SASInit, SASGoal, \
    SASOperator, SASAxiom
from machetli.successors import Successor, SuccessorGenerator, RNG


class RemoveOperators(SuccessorGenerator):
    """
    Generate one successor per operator of the task, in which exactly that
    operator has been dropped. Successors are produced in shuffled order.
    """

    def get_description(self):
        """Return a short human-readable summary of this generator."""
        return "Tries to remove individual operators."

    def get_successors(self, state):
        """
        Yield a :class:`Successor` for every operator name found in the task
        stored under ``KEY_IN_STATE``.

        Each successor state is a deep copy of *state* whose task has been
        replaced by the result of :meth:`transform`.
        """
        names = [operator.name for operator in state[KEY_IN_STATE].operators]
        RNG.shuffle(names)
        remaining = len(names) - 1

        for removed_name in names:
            successor_state = copy.deepcopy(state)
            successor_state[KEY_IN_STATE] = self.transform(
                successor_state[KEY_IN_STATE], removed_name)
            message = (f"Removed operator '{removed_name}'. "
                       f"Remaining operators: {remaining}")
            yield Successor(successor_state, message)

    def transform(self, task, op_name):
        """
        Return a new :class:`SASTask` that reuses every component of *task*
        except the operator list, from which all operators named *op_name*
        are left out.
        """
        kept = []
        for operator in task.operators:
            if operator.name == op_name:
                continue
            kept.append(operator)
        return SASTask(task.variables, task.mutexes, task.init, task.goal,
                       kept, task.axioms, task.metric)
class RemoveVariables(SuccessorGenerator):
    """
    For each variable, generate a successor where this variable is compiled
    away by removing it from the initial state as well as every place where
    it is mentioned in a mutex group, prevail condition, effect condition,
    effect fact, axiom, or goal. The indices of all variables above the
    removed one are decremented by one. The order of the successors is
    randomized.
    """

    def get_description(self):
        """Return a short human-readable summary of this generator."""
        return "Tries to project away individual variables."

    def get_successors(self, state):
        """
        Yield one :class:`Successor` per variable index of the task stored
        under ``KEY_IN_STATE``, visiting the indices in shuffled order.

        Each successor state is a deep copy of *state* whose task has been
        replaced by the result of :meth:`transform`.
        """
        task = state[KEY_IN_STATE]
        variables = list(range(len(task.variables.axiom_layers)))
        RNG.shuffle(variables)
        for var in variables:
            child_state = copy.deepcopy(state)
            child_state[KEY_IN_STATE] = self.transform(
                child_state[KEY_IN_STATE], var)
            yield Successor(
                child_state,
                f"Removed a variable. Remaining variables: {len(variables) - 1}")

    @staticmethod
    def _shift_pair(pair, var):
        """
        Return *pair* with its variable index decremented if that index is
        greater than *var*; otherwise return *pair* itself unchanged.
        """
        if pair[0] > var:
            variable_index, value = pair
            return (variable_index - 1, value)
        return pair

    def _project_pairs(self, pairs, var):
        """
        Return a new list of the (variable, value) pairs in *pairs* that do
        not mention *var*, with indices above *var* shifted down by one.
        """
        return [self._shift_pair(pair, var) for pair in pairs if pair[0] != var]

    def transform(self, task, var):
        """
        Return a new :class:`SASTask` in which variable *var* is removed.

        Operators that lose all of their effects and axioms whose effect is
        on *var* are dropped. The input *task* is not modified: the variable
        description and the initial values are copied before *var* is
        deleted, and all pair lists are rebuilt.
        """
        # Work on a private copy so that callers holding on to ``task`` do
        # not end up with a partially projected, inconsistent task.
        new_variables = copy.deepcopy(task.variables)
        del new_variables.axiom_layers[var]
        del new_variables.ranges[var]
        del new_variables.value_names[var]

        new_mutexes = [SASMutexGroup(self._project_pairs(group.facts, var))
                       for group in task.mutexes]

        new_values = list(task.init.values)
        del new_values[var]
        new_init = SASInit(new_values)

        new_goal = SASGoal(self._project_pairs(task.goal.pairs, var))

        new_operators = []
        for op in task.operators:
            new_prevail = self._project_pairs(op.prevail, var)
            new_effects = []
            for v, pre, post, cond in op.pre_post:
                if v == var:
                    continue
                if v > var:
                    v = v - 1  # decrement variable indices above var
                new_effects.append(
                    (v, pre, post, self._project_pairs(cond, var)))
            if not new_effects:
                # An operator without any remaining effect is dropped.
                continue
            new_operators.append(
                SASOperator(op.name, new_prevail, new_effects, op.cost))

        new_axioms = []
        for ax in task.axioms:
            if ax.effect[0] == var:
                continue
            # The axiom condition may become empty; the axiom is kept anyway.
            new_axioms.append(
                SASAxiom(self._project_pairs(ax.condition, var),
                         self._shift_pair(ax.effect, var)))

        return SASTask(new_variables, new_mutexes, new_init, new_goal,
                       new_operators, new_axioms, task.metric)
class RemovePrePosts(SuccessorGenerator):
    """
    Generate one successor for every precondition/effect pair of every
    operator, in which that single pair has been deleted, so the operator
    no longer mentions the pair's variable there.

    Operators are visited in random order and, within an operator, its pairs
    are visited in random order; successors originating from one operator
    are yielded consecutively.
    """

    def get_description(self):
        """Return a short human-readable summary of this generator."""
        return ("Tries to remove individual precondition/effect pairs in "
                "operators. This ignores a variable in an operators.")

    def get_successors(self, state):
        """
        Yield a :class:`Successor` per (operator, pre_post entry) of the task
        stored under ``KEY_IN_STATE``.

        Each successor state is a deep copy of *state* from which the chosen
        ``pre_post`` entry has been deleted.
        """
        operators = state[KEY_IN_STATE].operators
        operator_count = len(operators)

        for op_index in RNG.sample(range(operator_count), operator_count):
            operator = operators[op_index]
            pair_count = len(operator.pre_post)
            for pair_index in RNG.sample(range(pair_count), pair_count):
                successor_state = copy.deepcopy(state)
                copied_operator = successor_state[KEY_IN_STATE].operators[op_index]
                del copied_operator.pre_post[pair_index]
                yield Successor(
                    successor_state,
                    f"Removed an effect of operator '{operator.name}'.")
class SetUnspecifiedPreconditions(SuccessorGenerator):
    """
    For each operator and each variable on which this operator has an effect
    but no precondition, and for each possible value of this variable,
    generate a successor with an additional precondition on the variable.
    This limits the situations where the operator can be applied, potentially
    limiting branching in the search.

    The order in which operators are considered is randomized, as is the
    order of effects of the same operator, but all successors stemming from
    the same operator follow consecutively.
    """

    def get_description(self):
        """Return a short human-readable summary of this generator."""
        return ("Tries to add preconditions to an operator, which have an "
                "effect but no precondition on a variable.")

    def get_successors(self, state):
        """
        Yield a :class:`Successor` for every (operator, effect, value)
        combination where the effect's precondition entry is ``-1``.

        Each successor state is a deep copy of *state* in which that entry
        is replaced by one value from the variable's range.
        """
        task = state[KEY_IN_STATE]
        num_ops = len(task.operators)
        for op in RNG.sample(range(num_ops), num_ops):
            operator = task.operators[op]
            num_eff = len(operator.pre_post)
            for effect in RNG.sample(range(num_eff), num_eff):
                var, pre, post, cond = operator.pre_post[effect]
                if pre != -1:
                    # The effect already has a precondition on its variable.
                    continue
                num_val = task.variables.ranges[var]
                for val in RNG.sample(range(num_val), num_val):
                    child_state = copy.deepcopy(state)
                    child_state[KEY_IN_STATE].operators[op].pre_post[
                        effect] = (var, val, post, cond)
                    # The message used to claim that a prevail condition was
                    # removed, although a precondition is added here.
                    yield Successor(
                        child_state,
                        f"Added a precondition to operator '{operator.name}'.")
class MergeOperators(SuccessorGenerator):
    """
    Generate a successor for each ordered pair of operators in which the two
    operators are replaced by a single new operator that is equivalent to
    applying the first and then the second. Pairs for which this is not
    possible (e.g., conflicting prevail conditions) are skipped.
    """

    def get_description(self):
        """Return a short human-readable summary of this generator."""
        return ("Tries to replace a pair of operators with a single operator "
                "equivalent to applying both in sequence.")

    def get_successors(self, state):
        """
        Yield a :class:`Successor` for every ordered pair of distinct
        operators of the task stored under ``KEY_IN_STATE`` for which
        :meth:`transform` returns a task.
        """
        operators = state[KEY_IN_STATE].operators
        for first, second in itertools.permutations(operators, 2):
            successor_state = copy.deepcopy(state)
            merged_task = self.transform(
                successor_state[KEY_IN_STATE], first, second)
            if not merged_task:
                continue
            successor_state[KEY_IN_STATE] = merged_task
            yield Successor(
                successor_state,
                f"Merged operators '{first.name}' and '{second.name}'. "
                f"Remaining operators: {len(operators) - 1}")

    def transform(self, task, op1, op2):
        """
        Return a new :class:`SASTask` in which all operators named like
        *op1* or *op2* are replaced by one operator that applies *op1* and
        then *op2*, or ``None`` if *op2*'s preconditions contradict the
        values *op1* leaves behind.

        Raises :class:`NotImplementedError` if either operator has an effect
        with a non-empty effect condition.
        """
        def requirements_and_results(operator):
            # Map variable -> required value and variable -> resulting value.
            # Prevail conditions appear in both maps with the same value.
            required = {var: value for var, value in operator.prevail}
            resulting = dict(required)
            for var, pre, post, cond in operator.pre_post:
                if cond:
                    raise NotImplementedError("Conditional effects not yet supported.")
                if pre != -1:
                    required[var] = pre
                resulting[var] = post
            return required, resulting

        required, resulting = requirements_and_results(op1)
        second_required, second_resulting = requirements_and_results(op2)

        # op2 must be applicable after op1: whatever op1 fixes has to match
        # op2's requirement; everything else becomes a requirement of the
        # merged operator.
        for var, needed in second_required.items():
            if var in resulting:
                if resulting[var] != needed:
                    return None  # Operators not compatible
            else:
                required[var] = needed

        # Later effects override earlier ones.
        resulting.update(second_resulting)

        prevail = []
        pre_post = []
        for var, post in resulting.items():
            pre = required.get(var, -1)
            if pre != post:
                pre_post.append((var, pre, post, []))
            else:
                prevail.append((var, pre))

        merged_operator = SASOperator(
            op1.name + " and then " + op2.name,
            prevail,
            pre_post,
            op1.cost + op2.cost)

        replaced_names = (op1.name, op2.name)
        operators = [operator for operator in task.operators
                     if operator.name not in replaced_names]
        operators.append(merged_operator)
        return SASTask(task.variables, task.mutexes, task.init, task.goal,
                       operators, task.axioms, task.metric)
class RemoveGoals(SuccessorGenerator):
    """
    Generate one successor per goal condition, in which exactly that goal
    condition has been deleted. Successors are produced in random order.
    """

    def get_description(self):
        """Return a short human-readable summary of this generator."""
        return "Tries to remove goal conditions."

    def get_successors(self, state):
        """
        Yield a :class:`Successor` for every goal pair of the task stored
        under ``KEY_IN_STATE``.

        Each successor state is a deep copy of *state* from which the chosen
        goal pair has been deleted.
        """
        goal_count = len(state[KEY_IN_STATE].goal.pairs)
        message = f"Removed a goal. Remaining goals: {goal_count - 1}"

        for index in RNG.sample(range(goal_count), goal_count):
            successor_state = copy.deepcopy(state)
            goal_pairs = successor_state[KEY_IN_STATE].goal.pairs
            del goal_pairs[index]
            yield Successor(successor_state, message)