Skip to content

Commit

Permalink
Fix pysat_diagnosis_metamodel (#60)
Browse files Browse the repository at this point in the history
* Fix typo

* Ignore .idea

* Add test feature models

* Add ConsistencyChecker and new version of PySATModel supporting diagnosis tasks

* Functions return a list instead of a CNF

* Add FastDiag and QuickXPlain

* Add documentation

* Migrate to new package

* Support Configuration

* Support Configuration

* Clean up

* improve diagnosis messages

* use assumptions in solver.solve()

* done diagnosis with assumptions and better result messages

* Add HSDAG

* Remove D

* Add documentation

* Fix bug

* Fix typo

* fix: refactoring to optimize code in execution. Only save names when needed for diagnosis

* fix: finishing the separation, still need to workout the method call

* fix: decouping diagnosis task from basic pysat operations

* Fix cracks appearing after decoupling diagnosis task from basic pysat operations.

* Fix errors/problems detected by prospector

* Fix errors/problems detected by mypy

---------

Co-authored-by: José A. Galindo <jagalindo@us.es>
  • Loading branch information
manleviet and jagalindo authored Oct 28, 2023
1 parent 1408946 commit 3d5dd43
Show file tree
Hide file tree
Showing 21 changed files with 573 additions and 491 deletions.
2 changes: 1 addition & 1 deletion .prospector.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ test-warnings: true
doc-warnings: false

ignore-paths:
- env
- venv
- tests
- build

Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from typing import List, Dict, Tuple
from typing import List, Dict

from flamapy.metamodels.configuration_metamodel.models import Configuration

from flamapy.metamodels.pysat_metamodel.models import PySATModel


Expand Down Expand Up @@ -32,27 +33,30 @@ def get_extension() -> str:

def __init__(self) -> None:
super().__init__()
self.C = None # set of constraints which could be faulty
self.B = None # background knowledge (i.e., the knowledge that is known to be true)

self.KB = None # set of all CNF with added assumptions
self.constraint_map: list[(str, list[list[int]])] = [] # map clauses to relationships/constraint

self.constraint_assumption_map = None

def add_clause_toMap(self, description: str, clauses: list[list[int]]) -> None:
self.constraint_map.append((description, clauses))

def get_C(self) -> list:
return self.C

def get_B(self) -> list:
return self.B

def get_KB(self) -> list:
return self.KB

def get_pretty_diagnoses(self, assumptions: list[list[int]]) -> str:
# set of constraints which could be faulty
self.set_c: List[int] = []
# background knowledge (i.e., the knowledge that is known to be true)
self.set_b: List[int] = []
# set of all CNF with added assumptions
self.set_kb: List[List[int]] = []
# map clauses to relationships/constraint
self.constraint_map: Dict[str, List[List[int]]] = {}
# map id of assumptions to relationships/constraint
self.constraint_assumption_map: Dict[int, str] = {}

def add_clause_to_map(self, description: str, clauses: List[List[int]]) -> None:
self.constraint_map[description] = clauses

def get_c(self) -> List[int]:
return self.set_c

def get_b(self) -> List[int]:
return self.set_b

def get_kb(self) -> List[List[int]]:
return self.set_kb

def get_pretty_diagnoses(self, assumptions: List[List[int]]) -> str:
diagnoses = []
for ass in assumptions:
diag = []
Expand All @@ -63,7 +67,8 @@ def get_pretty_diagnoses(self, assumptions: list[list[int]]) -> str:

return ','.join(diagnoses)

def prepare_diagnosis_task(self, configuration: Configuration = None, test_case: Configuration = None) -> None:
def prepare_diagnosis_task(self, configuration: Configuration = None,
test_case: Configuration = None) -> None:
"""
Execute this method after the model is built.
If a configuration is given:
Expand All @@ -83,19 +88,18 @@ def prepare_diagnosis_task(self, configuration: Configuration = None, test_case:
if configuration is not None:
# C = configuration
# B = {f0 = true} + CF (i.e., = PySATModel)
self.C, self.B, self.KB, self.constraint_assumption_map = \
self.prepare_assumptions(configuration=configuration)
self._prepare_assumptions(configuration=configuration)
else:
if test_case is None:
# Diagnosis the feature model
# C = CF (i.e., = PySATModel - {f0 = true})
# B = {f0 = true}
self.C, self.B, self.KB, self.constraint_assumption_map = self.prepare_assumptions()
self._prepare_assumptions()
else:
# Diagnosis the error
# C = CF (i.e., = PySATModel - {f0 = true})
# B = {f0 = true} + test_case
self.C, self.B, self.KB, self.constraint_assumption_map = self.prepare_assumptions(test_case=test_case)
self._prepare_assumptions(test_case=test_case)

def prepare_redundancy_detection_task(self) -> None:
"""
Expand All @@ -105,76 +109,68 @@ def prepare_redundancy_detection_task(self) -> None:
B = {}
"""
# C = CF (i.e., = PySATModel - {f0 = true})
self.C = self.prepare_assumptions
self.B = [] # B = {}
# self.set_c = self._prepare_assumptions
# self.set_b = [] # B = {}
# ToDo: TBD

def prepare_assumptions(self, configuration: Configuration = None, test_case: Configuration = None) \
-> Tuple[List, List, List, Dict]:
assumption = []
KB = []
constraint_assumption_map = {}
def _prepare_assumptions(self, configuration: Configuration = None,
test_case: Configuration = None) -> None:
assumption: List[int] = []

id_assumption = len(self.variables) + 1
id_assumption = self.prepare_assumptions_for_KB(KB, assumption, constraint_assumption_map, id_assumption)
id_assumption = self._prepare_assumptions_for_kb(assumption, id_assumption)

start_id_configuration = len(assumption)
if configuration is not None:
constraint_assumption_map = {} # reset the map
id_assumption = self.prepare_assumptions_for_configuration(KB, assumption, configuration,
constraint_assumption_map,
id_assumption)
self.constraint_assumption_map = {} # reset the map
id_assumption = self._prepare_assumptions_for_configuration(assumption,
configuration,
id_assumption)

start_id_test_case = len(assumption)
if test_case is not None:
self.prepare_assumptions_for_configuration(KB, assumption, test_case,
constraint_assumption_map,
id_assumption)
self._prepare_assumptions_for_configuration(assumption, test_case,
id_assumption)

if configuration is not None:
B = assumption[:start_id_configuration]
C = assumption[start_id_configuration:]
self.set_b = assumption[:start_id_configuration]
self.set_c = assumption[start_id_configuration:]
else:
if test_case is not None:
B = [assumption[0]] + assumption[start_id_test_case:]
C = assumption[1:start_id_test_case]
self.set_b = [assumption[0]] + assumption[start_id_test_case:]
self.set_c = assumption[1:start_id_test_case]
else:
B = [assumption[0]]
C = assumption[1:]

return C, B, KB, constraint_assumption_map
self.set_b = [assumption[0]]
self.set_c = assumption[1:]

def prepare_assumptions_for_KB(self, KB, assumption, constraint_assumption_map, id_assumption):
c_map = self.constraint_map
def _prepare_assumptions_for_kb(self, assumption: List[int], id_assumption: int) -> int:
cstr_map = self.constraint_map
# loop through all tuples in the constraint map
for i in range(len(c_map)):
# get description
desc = c_map[i][0]
for _, key in enumerate(cstr_map):
# get clauses
clauses = c_map[i][1]
clauses = cstr_map[key]
# loop through all variables in the constraint
for j in range(len(clauses)):
# get each clause
clause = clauses[j]
for _, clause in enumerate(clauses):
# add the assumption variable to the clause
# assumption => clause
# i.e., -assumption v clause
clause.append(-1 * id_assumption)

assumption.append(id_assumption)
KB.extend(clauses)
constraint_assumption_map[id_assumption] = desc
self.set_kb.extend(clauses)
self.constraint_assumption_map[id_assumption] = key

id_assumption += 1

return id_assumption

def prepare_assumptions_for_configuration(self, KB, assumption, configuration, constraint_assumption_map,
id_assumption):
def _prepare_assumptions_for_configuration(self, assumption: List[int],
configuration: Configuration,
id_assumption: int) -> int:
config = [feat.name for feat in configuration.elements]
for feat in config:
if feat not in self.variables.keys():
raise Exception(f'Feature {feat} is not in the model.')
raise KeyError(f'Feature {feat} is not in the model.')

for feat in configuration.elements.items():
desc = ''
Expand All @@ -188,8 +184,8 @@ def prepare_assumptions_for_configuration(self, KB, assumption, configuration, c
clause = [-1 * self.variables[feat[0].name], -1 * id_assumption]

assumption.append(id_assumption)
KB.append(clause)
constraint_assumption_map[id_assumption] = desc
self.set_kb.append(clause)
self.constraint_assumption_map[id_assumption] = desc

id_assumption += 1

Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
from .glucose3_abstract_identifier import Glucose3AbstractIdentifier
from .glucose3_conflict import Glucose3Conflict
from .glucose3_diagnosis import Glucose3Diagnosis


__all__ = [
'Glucose3AbstractIdentifier',
'Glucose3Diagnosis',
'Glucose3Conflict'
]
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
from .checker import ConsistencyChecker
from .fastdiag import FastDiag
from .quickxplain import QuickXPlain

__all__ = [
"ConsistencyChecker",
"FastDiag",
"QuickXPlain"
]
Original file line number Diff line number Diff line change
Expand Up @@ -2,29 +2,29 @@
A Java version of this implementation is available at:
https://github.com/HiConfiT/hiconfit-core/blob/main/ca-cdr-package/src/main/java/at/tugraz/ist/ase/cacdr/checker/ChocoConsistencyChecker.java
"""
from typing import List

from pysat.solvers import Solver


class ConsistencyChecker:

def __init__(self, solverName: str, KB: list) -> None:
self.solver = None

def __init__(self, solver_name: str, set_kb: List[List[int]]) -> None:
self.result = False

self.solver = Solver(solverName, bootstrap_with=KB)
self.solver = Solver(solver_name, bootstrap_with=set_kb)

def is_consistent(self, C: list, Δ: list) -> bool:
def is_consistent(self, set_c: List[int], delta: List[int]) -> bool:
"""
Check if the given CNF formula is consistent using a solver.
:param C: a list of assumptions should be added to the CNF formula
:param Δ: a list of assumptions should not be added to the CNF formula
:param set_c: a list of assumptions should be added to the CNF formula
:param delta: a list of assumptions should not be added to the CNF formula
:return: a boolean value indicating whether the given CNF formula is consistent
"""
assumptions = C + [-1 * item for item in Δ]
assumptions = set_c + [-1 * item for item in delta]
self.result = self.solver.solve(assumptions=assumptions)
# print(f"assumptions: {assumptions} - result: {self.result}")
return self.result

def delete(self):
self.solver.delete()
def delete(self) -> None:
self.solver.delete()
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,10 @@
"""

import logging
from typing import List

from flamapy.metamodels.pysat_diagnosis_metamodel.operations.diagnosis.utils import split, diff
from flamapy.metamodels.pysat_diagnosis_metamodel.operations.diagnosis.checker import ConsistencyChecker
from .checker import ConsistencyChecker
from .utils import split, diff


class FastDiag:
Expand All @@ -20,35 +21,36 @@ class FastDiag:
def __init__(self, checker: ConsistencyChecker) -> None:
self.checker = checker

def findDiagnosis(self, C: list, B: list) -> list:
def find_diagnosis(self, set_c: List[int], set_b: List[int]) -> List[int]:
"""
Activate FastDiag algorithm if there exists at least one constraint,
which induces an inconsistency in B. Otherwise, it returns an empty set.
// Func FastDiag(C, B) : Δ
// if isEmpty(C) or consistent(B U C) return Φ
// else return C \\ FD(Φ, C, B)
:param C: a consideration set of constraints
:param B: a background knowledge
:param set_c: a consideration set of constraints
:param set_b: a background knowledge
:return: a diagnosis or an empty set
"""
logging.debug(f'fastDiag [C={C}, B={B}]')
logging.debug('fastDiag [C=%s, B=%s]', set_c, set_b)
# print(f'fastDiag [C={C}, B={B}]')

# if isEmpty(C) or consistent(B U C) return Φ
if len(C) == 0 or self.checker.is_consistent(B + C, []):
if len(set_c) == 0 or self.checker.is_consistent(set_b + set_c, []):
logging.debug('return Φ')
# print('return Φ')
return []
else: # return C \ FD(C, B, Φ)
mss = self.fd([], C, B)
diag = diff(C, mss)

logging.debug(f'return {diag}')
# print(f'return {diag}')
return diag
# return C \ FD(C, B, Φ)
mss = self._fd([], set_c, set_b)
diag = diff(set_c, mss)

def fd(self, Δ: list, C: list, B: list) -> list:
logging.debug('return %s', diag)
# print(f'return {diag}')
return diag

def _fd(self, delta: List[int], set_c: List[int], set_b: List[int]) -> List[int]:
"""
The implementation of MSS-based FastDiag algorithm.
The algorithm determines a maximal satisfiable subset MSS (Γ) of C U B.
Expand All @@ -61,33 +63,33 @@ def fd(self, Δ: list, C: list, B: list) -> list:
// Δ1 = FD(C2, C1, B);
// Δ2 = FD(C1 - Δ1, C2, B U Δ1);
// return Δ1 ∪ Δ2;
:param Δ: check to skip redundant consistency checks
:param C: a consideration set of constraints
:param B: a background knowledge
:param delta: check to skip redundant consistency checks
:param set_c: a consideration set of constraints
:param set_b: a background knowledge
:return: a maximal satisfiable subset MSS of C U B
"""
logging.debug(f'>>> FD [Δ={Δ}, C={C}, B={B}]')
logging.debug('>>> FD [Δ=%s, C=%s, B=%s]', delta, set_c, set_b)

# if Δ != Φ and consistent(B U C) return C;
if len(Δ) != 0 and self.checker.is_consistent(B + C, Δ):
logging.debug(f'<<< return {C}')
return C
if len(delta) != 0 and self.checker.is_consistent(set_b + set_c, delta):
logging.debug('<<< return %s', set_c)
return set_c

# if singleton(C) return Φ;
if len(C) == 1:
if len(set_c) == 1:
logging.debug('<<< return Φ')
return []

# C1 = {c1..ck}; C2 = {ck+1..cn};
C1, C2 = split(C)
set_c1, set_c2 = split(set_c)

# Δ1 = FD(C2, C1, B);
Δ1 = self.fd(C2, C1, B)
delta1 = self._fd(set_c2, set_c1, set_b)
# Δ2 = FD(C1 - Δ1, C2, B U Δ1);
C1withoutΔ1 = diff(C1, Δ1)
Δ2 = self.fd(C1withoutΔ1, C2, B + Δ1)
c1_without_delta1 = diff(set_c1, delta1)
delta2 = self._fd(c1_without_delta1, set_c2, set_b + delta1)

logging.debug('<<< return [Δ1={Δ1} ∪ Δ2={Δ2}]')

# return Δ1 + Δ2
return Δ1 + Δ2
return delta1 + delta2
Loading

0 comments on commit 3d5dd43

Please sign in to comment.