База знаний
public
Apr 27, 2025
Never
37
1 import sqlite3 2 from collections import defaultdict 3 4 class DatabaseHandler: 5 def __init__(self, db_name='knowledge_base.db'): 6 self.conn = sqlite3.connect(db_name) 7 self.create_tables() 8 9 def create_tables(self): 10 cursor = self.conn.cursor() 11 cursor.execute(''' 12 CREATE TABLE IF NOT EXISTS subsumption_axioms ( 13 id INTEGER PRIMARY KEY, 14 left_concept TEXT, 15 right_concept TEXT 16 ) 17 ''') 18 cursor.execute(''' 19 CREATE TABLE IF NOT EXISTS conjunction_axioms ( 20 id INTEGER PRIMARY KEY, 21 left_concept1 TEXT, 22 left_concept2 TEXT, 23 right_concept TEXT 24 ) 25 ''') 26 cursor.execute(''' 27 CREATE TABLE IF NOT EXISTS existential_axioms ( 28 id INTEGER PRIMARY KEY, 29 left_concept TEXT, 30 role TEXT, 31 filler_concept TEXT 32 ) 33 ''') 34 cursor.execute(''' 35 CREATE TABLE IF NOT EXISTS existential_left_axioms ( 36 id INTEGER PRIMARY KEY, 37 role TEXT, 38 filler_concept TEXT, 39 right_concept TEXT 40 ) 41 ''') 42 cursor.execute(''' 43 CREATE TABLE IF NOT EXISTS individuals ( 44 id INTEGER PRIMARY KEY, 45 name TEXT UNIQUE 46 ) 47 ''') 48 cursor.execute(''' 49 CREATE TABLE IF NOT EXISTS concept_assertions ( 50 individual_id INTEGER, 51 concept TEXT, 52 FOREIGN KEY(individual_id) REFERENCES individuals(id) 53 ) 54 ''') 55 cursor.execute(''' 56 CREATE TABLE IF NOT EXISTS role_assertions ( 57 individual_id INTEGER, 58 role TEXT, 59 target_id INTEGER, 60 FOREIGN KEY(individual_id) REFERENCES individuals(id), 61 FOREIGN KEY(target_id) REFERENCES individuals(id) 62 ) 63 ''') 64 self.conn.commit() 65 66 def save_tbox(self, tbox_manager): 67 cursor = self.conn.cursor() 68 cursor.execute('DELETE FROM subsumption_axioms') 69 cursor.execute('DELETE FROM conjunction_axioms') 70 cursor.execute('DELETE FROM existential_axioms') 71 cursor.execute('DELETE FROM existential_left_axioms') 72 for (left, right) in tbox_manager.subsumption_axioms: 73 cursor.execute('INSERT INTO subsumption_axioms (left_concept, right_concept) VALUES (?, ?)', (left, right)) 74 for (c1, c2, d) in tbox_manager.conjunction_axioms: 75 cursor.execute('INSERT INTO conjunction_axioms (left_concept1, left_concept2, right_concept) VALUES (?, ?, ?)', (c1, c2, d)) 76 for (c, r, f) in tbox_manager.existential_axioms: 77 cursor.execute('INSERT INTO existential_axioms (left_concept, role, filler_concept) VALUES (?, ?, ?)', (c, r, f)) 78 for (r, f, d) in tbox_manager.existential_left_axioms: 79 cursor.execute('INSERT INTO existential_left_axioms (role, filler_concept, right_concept) VALUES (?, ?, ?)', (r, f, d)) 80 self.conn.commit() 81 82 def load_tbox(self, tbox_manager): 83 cursor = self.conn.cursor() 84 cursor.execute('SELECT left_concept, right_concept FROM subsumption_axioms') 85 tbox_manager.subsumption_axioms = [tuple(row) for row in cursor.fetchall()] 86 cursor.execute('SELECT left_concept1, left_concept2, right_concept FROM conjunction_axioms') 87 tbox_manager.conjunction_axioms = [tuple(row) for row in cursor.fetchall()] 88 cursor.execute('SELECT left_concept, role, filler_concept FROM existential_axioms') 89 tbox_manager.existential_axioms = [tuple(row) for row in cursor.fetchall()] 90 cursor.execute('SELECT role, filler_concept, right_concept FROM existential_left_axioms') 91 tbox_manager.existential_left_axioms = [tuple(row) for row in cursor.fetchall()] 92 tbox_manager.compute_closure() 93 94 def save_abox(self, abox_manager): 95 cursor = self.conn.cursor() 96 cursor.execute('DELETE FROM individuals') 97 cursor.execute('DELETE FROM concept_assertions') 98 cursor.execute('DELETE FROM role_assertions') 99 for name, ind in abox_manager.individuals.items(): 100 cursor.execute('INSERT INTO individuals (id, name) VALUES (?, ?)', (ind.id, name)) 101 for name in abox_manager.concept_assertions: 102 ind = abox_manager.individuals[name] 103 for concept in abox_manager.concept_assertions[name]: 104 cursor.execute('INSERT INTO concept_assertions (individual_id, concept) VALUES (?, ?)', (ind.id, concept)) 105 for name in abox_manager.role_assertions: 106 ind = abox_manager.individuals[name] 107 for role, targets in abox_manager.role_assertions[name].items(): 108 for target in targets: 109 tgt_ind = abox_manager.individuals[target] 110 cursor.execute('INSERT INTO role_assertions (individual_id, role, target_id) VALUES (?, ?, ?)', (ind.id, role, tgt_ind.id)) 111 self.conn.commit() 112 113 def load_abox(self, abox_manager): 114 cursor = self.conn.cursor() 115 cursor.execute('SELECT id, name FROM individuals') 116 abox_manager.individuals = {row[1]: Individual(row[1], row[0]) for row in cursor.fetchall()} 117 abox_manager.next_id = max([ind.id for ind in abox_manager.individuals.values()] or [0]) + 1 118 cursor.execute('SELECT individual_id, concept FROM concept_assertions') 119 concept_assertions = defaultdict(set) 120 for row in cursor.fetchall(): 121 ind = next((name for name, i in abox_manager.individuals.items() if i.id == row[0]), None) 122 if ind: 123 concept_assertions[ind].add(row[1]) 124 abox_manager.concept_assertions = concept_assertions 125 cursor.execute('SELECT individual_id, role, target_id FROM role_assertions') 126 role_assertions = defaultdict(lambda: defaultdict(set)) 127 for row in cursor.fetchall(): 128 ind = next((name for name, i in abox_manager.individuals.items() if i.id == row[0]), None) 129 target = next((name for name, i in abox_manager.individuals.items() if i.id == row[2]), None) 130 if ind and target: 131 role_assertions[ind][row[1]].add(target) 132 abox_manager.role_assertions = role_assertions 133 134 class TBoxManager: 135 def __init__(self): 136 self.subsumption_axioms = [] 137 self.conjunction_axioms = [] 138 self.existential_axioms = [] 139 self.existential_left_axioms = [] 140 self.subsumptions = defaultdict(set) 141 142 def compute_closure(self): 143 concepts = set() 144 for C, D in self.subsumption_axioms: 145 concepts.update([C, D]) 146 for C1, C2, D in self.conjunction_axioms: 147 concepts.update([C1, C2, D]) 148 for C, R, D in self.existential_axioms: 149 concepts.update([C, D]) 150 for R, C, D in self.existential_left_axioms: 151 concepts.add(D) 152 for C in concepts: 153 self.subsumptions[C].update([C, 'Top']) 154 changed = True 155 while changed: 156 changed = False 157 for C, D in self.subsumption_axioms: 158 if D not in self.subsumptions[C]: 159 self.subsumptions[C].add(D) 160 changed = True 161 for C, R, D in self.existential_axioms: 162 expr = f"exists {R}.{D}" 163 if expr not in self.subsumptions[C]: 164 self.subsumptions[C].add(expr) 165 changed = True 166 for e in list(self.subsumptions[C]): 167 if e.startswith(f"exists {R}."): 168 filler = e.split('.')[1] 169 if filler not in self.subsumptions[D]: 170 self.subsumptions[D].add(filler) 171 changed = True 172 for C1, C2, D in self.conjunction_axioms: 173 for concept in list(self.subsumptions.keys()): 174 if C1 in self.subsumptions[concept] and C2 in self.subsumptions[concept]: 175 if D not in self.subsumptions[concept]: 176 self.subsumptions[concept].add(D) 177 changed = True 178 for R, C, D in self.existential_left_axioms: 179 expr = f"exists {R}.{C}" 180 for concept in list(self.subsumptions.keys()): 181 if expr in self.subsumptions[concept] and D not in self.subsumptions[concept]: 182 self.subsumptions[concept].add(D) 183 changed = True 184 185 def is_subsumed_by(self, C, D): 186 return D in self.subsumptions.get(C, set()) 187 188 class Individual: 189 def __init__(self, name, id=None): 190 self.id = id 191 self.name = name 192 193 class ABoxManager: 194 def __init__(self): 195 self.individuals = {} 196 self.concept_assertions = defaultdict(set) 197 self.role_assertions = defaultdict(lambda: defaultdict(set)) 198 self.next_id = 1 199 200 def add_individual(self, name): 201 if name in self.individuals: 202 return False 203 self.individuals[name] = Individual(name, self.next_id) 204 self.next_id += 1 205 return True 206 207 def add_concept_assertion(self, name, concept): 208 if name in self.individuals: 209 self.concept_assertions[name].add(concept) 210 return True 211 return False 212 213 def get_concept_closure(self, name, tbox_manager): 214 closure = set() 215 for concept in self.concept_assertions.get(name, set()): 216 closure.update(tbox_manager.subsumptions.get(concept, set())) 217 return closure 218 219 def get_all_in_concept(self, concept, tbox_manager): 220 return [name for name in self.individuals if concept in self.get_concept_closure(name, tbox_manager)] 221 222 def get_all_in_conjunction(self, concepts, tbox_manager): 223 return [name for name in self.individuals if all(c in self.get_concept_closure(name, tbox_manager) for c in concepts] 224 225 class CommandParser: 226 def __init__(self, tbox, abox, db): 227 self.tbox = tbox 228 self.abox = abox 229 self.db = db 230 231 def parse_command(self, cmd): 232 cmd = cmd.strip().rstrip(';') 233 if cmd.startswith('set '): 234 self.handle_set(cmd[4:]) 235 elif cmd.startswith('show '): 236 self.handle_show(cmd[5:]) 237 elif cmd.startswith('insert into '): 238 self.handle_insert(cmd[len('insert into '):]) 239 elif cmd.startswith('check '): 240 self.handle_check(cmd[len('check '):]) 241 elif cmd.startswith('select all from '): 242 self.handle_select(cmd[len('select all from '):]) 243 elif cmd.startswith('delete '): 244 self.handle_delete(cmd[len('delete '):]) 245 elif cmd.startswith('remove '): 246 self.handle_remove(cmd[len('remove '):]) 247 else: 248 print("Unknown command") 249 250 def handle_set(self, expr): 251 if '<=' not in expr: 252 print("Invalid set command") 253 return 254 lhs, rhs = expr.split('<=', 1) 255 lhs, rhs = lhs.strip(), rhs.strip() 256 if 'and' in lhs: 257 parts = lhs.split('and') 258 if len(parts) != 2: 259 print("Invalid conjunction") 260 return 261 c1, c2 = parts[0].strip(), parts[1].strip() 262 self.tbox.conjunction_axioms.append((c1, c2, rhs)) 263 elif lhs.startswith('exists '): 264 parts = lhs[len('exists '):].split('.', 1) 265 if len(parts) != 2: 266 print("Invalid existential") 267 return 268 role, filler = parts[0].strip(), parts[1].strip() 269 self.tbox.existential_left_axioms.append((role, filler, rhs)) 270 elif rhs.startswith('exists '): 271 parts = rhs[len('exists '):].split('.', 1) 272 if len(parts) != 2: 273 print("Invalid existential") 274 return 275 role, filler = parts[0].strip(), parts[1].strip() 276 self.tbox.existential_axioms.append((lhs, role, filler)) 277 else: 278 self.tbox.subsumption_axioms.append((lhs, rhs)) 279 self.tbox.compute_closure() 280 self.db.save_tbox(self.tbox) 281 print("Axiom added") 282 283 def handle_show(self, concept): 284 axioms = [] 285 for (C, D) in self.tbox.subsumption_axioms: 286 if C == concept or D == concept: 287 axioms.append(f"{C} <= {D}") 288 for (C1, C2, D) in self.tbox.conjunction_axioms: 289 if C1 == concept or C2 == concept or D == concept: 290 axioms.append(f"{C1} and {C2} <= {D}") 291 for (C, R, D) in self.tbox.existential_axioms: 292 if C == concept or D == concept: 293 axioms.append(f"{C} <= exists {R}.{D}") 294 for (R, C, D) in self.tbox.existential_left_axioms: 295 if C == concept or D == concept: 296 axioms.append(f"exists {R}.{C} <= {D}") 297 print("\n".join(axioms) if axioms else "No axioms found") 298 299 def handle_insert(self, expr): 300 parts = expr.split() 301 if len(parts) < 2: 302 print("Invalid insert") 303 return 304 concept, name = parts[0], parts[1] 305 self.abox.add_individual(name) 306 self.abox.add_concept_assertion(name, concept) 307 self.db.save_abox(self.abox) 308 print(f"Added {name} to {concept}") 309 310 def handle_check(self, expr): 311 if '<=' not in expr: 312 print("Invalid check") 313 return 314 C, D = expr.split('<=', 1) 315 C, D = C.strip(), D.strip() 316 print("Yes" if self.tbox.is_subsumed_by(C, D) else "No") 317 318 def handle_select(self, expr): 319 if 'and' in expr: 320 concepts = [c.strip() for c in expr.split('and')] 321 individuals = self.abox.get_all_in_conjunction(concepts, self.tbox) 322 else: 323 individuals = self.abox.get_all_in_concept(expr, self.tbox) 324 print(", ".join(individuals) if individuals else "None found") 325 326 def handle_delete(self, expr): 327 print("Delete not implemented") 328 329 def handle_remove(self, expr): 330 print("Remove not implemented") 331 332 def main(): 333 db = DatabaseHandler() 334 tbox = TBoxManager() 335 abox = ABoxManager() 336 db.load_tbox(tbox) 337 db.load_abox(abox) 338 parser = CommandParser(tbox, abox, db) 339 while True: 340 cmd = input("KB> ").strip() 341 if cmd.lower() in ('exit', 'quit'): 342 break 343 parser.parse_command(cmd) 344 345 if __name__ == '__main__': 346 main()