G

База знаний

public
Guest Apr 27, 2025 Never 37
Clone
Plaintext paste1.txt 346 lines (319 loc) | 14.54 KB
1
import sqlite3
2
from collections import defaultdict
3
4
class DatabaseHandler:
5
def __init__(self, db_name='knowledge_base.db'):
6
self.conn = sqlite3.connect(db_name)
7
self.create_tables()
8
9
def create_tables(self):
10
cursor = self.conn.cursor()
11
cursor.execute('''
12
CREATE TABLE IF NOT EXISTS subsumption_axioms (
13
id INTEGER PRIMARY KEY,
14
left_concept TEXT,
15
right_concept TEXT
16
)
17
''')
18
cursor.execute('''
19
CREATE TABLE IF NOT EXISTS conjunction_axioms (
20
id INTEGER PRIMARY KEY,
21
left_concept1 TEXT,
22
left_concept2 TEXT,
23
right_concept TEXT
24
)
25
''')
26
cursor.execute('''
27
CREATE TABLE IF NOT EXISTS existential_axioms (
28
id INTEGER PRIMARY KEY,
29
left_concept TEXT,
30
role TEXT,
31
filler_concept TEXT
32
)
33
''')
34
cursor.execute('''
35
CREATE TABLE IF NOT EXISTS existential_left_axioms (
36
id INTEGER PRIMARY KEY,
37
role TEXT,
38
filler_concept TEXT,
39
right_concept TEXT
40
)
41
''')
42
cursor.execute('''
43
CREATE TABLE IF NOT EXISTS individuals (
44
id INTEGER PRIMARY KEY,
45
name TEXT UNIQUE
46
)
47
''')
48
cursor.execute('''
49
CREATE TABLE IF NOT EXISTS concept_assertions (
50
individual_id INTEGER,
51
concept TEXT,
52
FOREIGN KEY(individual_id) REFERENCES individuals(id)
53
)
54
''')
55
cursor.execute('''
56
CREATE TABLE IF NOT EXISTS role_assertions (
57
individual_id INTEGER,
58
role TEXT,
59
target_id INTEGER,
60
FOREIGN KEY(individual_id) REFERENCES individuals(id),
61
FOREIGN KEY(target_id) REFERENCES individuals(id)
62
)
63
''')
64
self.conn.commit()
65
66
def save_tbox(self, tbox_manager):
67
cursor = self.conn.cursor()
68
cursor.execute('DELETE FROM subsumption_axioms')
69
cursor.execute('DELETE FROM conjunction_axioms')
70
cursor.execute('DELETE FROM existential_axioms')
71
cursor.execute('DELETE FROM existential_left_axioms')
72
for (left, right) in tbox_manager.subsumption_axioms:
73
cursor.execute('INSERT INTO subsumption_axioms (left_concept, right_concept) VALUES (?, ?)', (left, right))
74
for (c1, c2, d) in tbox_manager.conjunction_axioms:
75
cursor.execute('INSERT INTO conjunction_axioms (left_concept1, left_concept2, right_concept) VALUES (?, ?, ?)', (c1, c2, d))
76
for (c, r, f) in tbox_manager.existential_axioms:
77
cursor.execute('INSERT INTO existential_axioms (left_concept, role, filler_concept) VALUES (?, ?, ?)', (c, r, f))
78
for (r, f, d) in tbox_manager.existential_left_axioms:
79
cursor.execute('INSERT INTO existential_left_axioms (role, filler_concept, right_concept) VALUES (?, ?, ?)', (r, f, d))
80
self.conn.commit()
81
82
def load_tbox(self, tbox_manager):
83
cursor = self.conn.cursor()
84
cursor.execute('SELECT left_concept, right_concept FROM subsumption_axioms')
85
tbox_manager.subsumption_axioms = [tuple(row) for row in cursor.fetchall()]
86
cursor.execute('SELECT left_concept1, left_concept2, right_concept FROM conjunction_axioms')
87
tbox_manager.conjunction_axioms = [tuple(row) for row in cursor.fetchall()]
88
cursor.execute('SELECT left_concept, role, filler_concept FROM existential_axioms')
89
tbox_manager.existential_axioms = [tuple(row) for row in cursor.fetchall()]
90
cursor.execute('SELECT role, filler_concept, right_concept FROM existential_left_axioms')
91
tbox_manager.existential_left_axioms = [tuple(row) for row in cursor.fetchall()]
92
tbox_manager.compute_closure()
93
94
def save_abox(self, abox_manager):
95
cursor = self.conn.cursor()
96
cursor.execute('DELETE FROM individuals')
97
cursor.execute('DELETE FROM concept_assertions')
98
cursor.execute('DELETE FROM role_assertions')
99
for name, ind in abox_manager.individuals.items():
100
cursor.execute('INSERT INTO individuals (id, name) VALUES (?, ?)', (ind.id, name))
101
for name in abox_manager.concept_assertions:
102
ind = abox_manager.individuals[name]
103
for concept in abox_manager.concept_assertions[name]:
104
cursor.execute('INSERT INTO concept_assertions (individual_id, concept) VALUES (?, ?)', (ind.id, concept))
105
for name in abox_manager.role_assertions:
106
ind = abox_manager.individuals[name]
107
for role, targets in abox_manager.role_assertions[name].items():
108
for target in targets:
109
tgt_ind = abox_manager.individuals[target]
110
cursor.execute('INSERT INTO role_assertions (individual_id, role, target_id) VALUES (?, ?, ?)', (ind.id, role, tgt_ind.id))
111
self.conn.commit()
112
113
def load_abox(self, abox_manager):
114
cursor = self.conn.cursor()
115
cursor.execute('SELECT id, name FROM individuals')
116
abox_manager.individuals = {row[1]: Individual(row[1], row[0]) for row in cursor.fetchall()}
117
abox_manager.next_id = max([ind.id for ind in abox_manager.individuals.values()] or [0]) + 1
118
cursor.execute('SELECT individual_id, concept FROM concept_assertions')
119
concept_assertions = defaultdict(set)
120
for row in cursor.fetchall():
121
ind = next((name for name, i in abox_manager.individuals.items() if i.id == row[0]), None)
122
if ind:
123
concept_assertions[ind].add(row[1])
124
abox_manager.concept_assertions = concept_assertions
125
cursor.execute('SELECT individual_id, role, target_id FROM role_assertions')
126
role_assertions = defaultdict(lambda: defaultdict(set))
127
for row in cursor.fetchall():
128
ind = next((name for name, i in abox_manager.individuals.items() if i.id == row[0]), None)
129
target = next((name for name, i in abox_manager.individuals.items() if i.id == row[2]), None)
130
if ind and target:
131
role_assertions[ind][row[1]].add(target)
132
abox_manager.role_assertions = role_assertions
133
134
class TBoxManager:
135
def __init__(self):
136
self.subsumption_axioms = []
137
self.conjunction_axioms = []
138
self.existential_axioms = []
139
self.existential_left_axioms = []
140
self.subsumptions = defaultdict(set)
141
142
def compute_closure(self):
143
concepts = set()
144
for C, D in self.subsumption_axioms:
145
concepts.update([C, D])
146
for C1, C2, D in self.conjunction_axioms:
147
concepts.update([C1, C2, D])
148
for C, R, D in self.existential_axioms:
149
concepts.update([C, D])
150
for R, C, D in self.existential_left_axioms:
151
concepts.add(D)
152
for C in concepts:
153
self.subsumptions[C].update([C, 'Top'])
154
changed = True
155
while changed:
156
changed = False
157
for C, D in self.subsumption_axioms:
158
if D not in self.subsumptions[C]:
159
self.subsumptions[C].add(D)
160
changed = True
161
for C, R, D in self.existential_axioms:
162
expr = f"exists {R}.{D}"
163
if expr not in self.subsumptions[C]:
164
self.subsumptions[C].add(expr)
165
changed = True
166
for e in list(self.subsumptions[C]):
167
if e.startswith(f"exists {R}."):
168
filler = e.split('.')[1]
169
if filler not in self.subsumptions[D]:
170
self.subsumptions[D].add(filler)
171
changed = True
172
for C1, C2, D in self.conjunction_axioms:
173
for concept in list(self.subsumptions.keys()):
174
if C1 in self.subsumptions[concept] and C2 in self.subsumptions[concept]:
175
if D not in self.subsumptions[concept]:
176
self.subsumptions[concept].add(D)
177
changed = True
178
for R, C, D in self.existential_left_axioms:
179
expr = f"exists {R}.{C}"
180
for concept in list(self.subsumptions.keys()):
181
if expr in self.subsumptions[concept] and D not in self.subsumptions[concept]:
182
self.subsumptions[concept].add(D)
183
changed = True
184
185
def is_subsumed_by(self, C, D):
186
return D in self.subsumptions.get(C, set())
187
188
class Individual:
189
def __init__(self, name, id=None):
190
self.id = id
191
self.name = name
192
193
class ABoxManager:
194
def __init__(self):
195
self.individuals = {}
196
self.concept_assertions = defaultdict(set)
197
self.role_assertions = defaultdict(lambda: defaultdict(set))
198
self.next_id = 1
199
200
def add_individual(self, name):
201
if name in self.individuals:
202
return False
203
self.individuals[name] = Individual(name, self.next_id)
204
self.next_id += 1
205
return True
206
207
def add_concept_assertion(self, name, concept):
208
if name in self.individuals:
209
self.concept_assertions[name].add(concept)
210
return True
211
return False
212
213
def get_concept_closure(self, name, tbox_manager):
214
closure = set()
215
for concept in self.concept_assertions.get(name, set()):
216
closure.update(tbox_manager.subsumptions.get(concept, set()))
217
return closure
218
219
def get_all_in_concept(self, concept, tbox_manager):
220
return [name for name in self.individuals if concept in self.get_concept_closure(name, tbox_manager)]
221
222
def get_all_in_conjunction(self, concepts, tbox_manager):
223
return [name for name in self.individuals if all(c in self.get_concept_closure(name, tbox_manager) for c in concepts]
224
225
class CommandParser:
226
def __init__(self, tbox, abox, db):
227
self.tbox = tbox
228
self.abox = abox
229
self.db = db
230
231
def parse_command(self, cmd):
232
cmd = cmd.strip().rstrip(';')
233
if cmd.startswith('set '):
234
self.handle_set(cmd[4:])
235
elif cmd.startswith('show '):
236
self.handle_show(cmd[5:])
237
elif cmd.startswith('insert into '):
238
self.handle_insert(cmd[len('insert into '):])
239
elif cmd.startswith('check '):
240
self.handle_check(cmd[len('check '):])
241
elif cmd.startswith('select all from '):
242
self.handle_select(cmd[len('select all from '):])
243
elif cmd.startswith('delete '):
244
self.handle_delete(cmd[len('delete '):])
245
elif cmd.startswith('remove '):
246
self.handle_remove(cmd[len('remove '):])
247
else:
248
print("Unknown command")
249
250
def handle_set(self, expr):
251
if '<=' not in expr:
252
print("Invalid set command")
253
return
254
lhs, rhs = expr.split('<=', 1)
255
lhs, rhs = lhs.strip(), rhs.strip()
256
if 'and' in lhs:
257
parts = lhs.split('and')
258
if len(parts) != 2:
259
print("Invalid conjunction")
260
return
261
c1, c2 = parts[0].strip(), parts[1].strip()
262
self.tbox.conjunction_axioms.append((c1, c2, rhs))
263
elif lhs.startswith('exists '):
264
parts = lhs[len('exists '):].split('.', 1)
265
if len(parts) != 2:
266
print("Invalid existential")
267
return
268
role, filler = parts[0].strip(), parts[1].strip()
269
self.tbox.existential_left_axioms.append((role, filler, rhs))
270
elif rhs.startswith('exists '):
271
parts = rhs[len('exists '):].split('.', 1)
272
if len(parts) != 2:
273
print("Invalid existential")
274
return
275
role, filler = parts[0].strip(), parts[1].strip()
276
self.tbox.existential_axioms.append((lhs, role, filler))
277
else:
278
self.tbox.subsumption_axioms.append((lhs, rhs))
279
self.tbox.compute_closure()
280
self.db.save_tbox(self.tbox)
281
print("Axiom added")
282
283
def handle_show(self, concept):
284
axioms = []
285
for (C, D) in self.tbox.subsumption_axioms:
286
if C == concept or D == concept:
287
axioms.append(f"{C} <= {D}")
288
for (C1, C2, D) in self.tbox.conjunction_axioms:
289
if C1 == concept or C2 == concept or D == concept:
290
axioms.append(f"{C1} and {C2} <= {D}")
291
for (C, R, D) in self.tbox.existential_axioms:
292
if C == concept or D == concept:
293
axioms.append(f"{C} <= exists {R}.{D}")
294
for (R, C, D) in self.tbox.existential_left_axioms:
295
if C == concept or D == concept:
296
axioms.append(f"exists {R}.{C} <= {D}")
297
print("\n".join(axioms) if axioms else "No axioms found")
298
299
def handle_insert(self, expr):
300
parts = expr.split()
301
if len(parts) < 2:
302
print("Invalid insert")
303
return
304
concept, name = parts[0], parts[1]
305
self.abox.add_individual(name)
306
self.abox.add_concept_assertion(name, concept)
307
self.db.save_abox(self.abox)
308
print(f"Added {name} to {concept}")
309
310
def handle_check(self, expr):
311
if '<=' not in expr:
312
print("Invalid check")
313
return
314
C, D = expr.split('<=', 1)
315
C, D = C.strip(), D.strip()
316
print("Yes" if self.tbox.is_subsumed_by(C, D) else "No")
317
318
def handle_select(self, expr):
319
if 'and' in expr:
320
concepts = [c.strip() for c in expr.split('and')]
321
individuals = self.abox.get_all_in_conjunction(concepts, self.tbox)
322
else:
323
individuals = self.abox.get_all_in_concept(expr, self.tbox)
324
print(", ".join(individuals) if individuals else "None found")
325
326
def handle_delete(self, expr):
327
print("Delete not implemented")
328
329
def handle_remove(self, expr):
330
print("Remove not implemented")
331
332
def main():
333
db = DatabaseHandler()
334
tbox = TBoxManager()
335
abox = ABoxManager()
336
db.load_tbox(tbox)
337
db.load_abox(abox)
338
parser = CommandParser(tbox, abox, db)
339
while True:
340
cmd = input("KB> ").strip()
341
if cmd.lower() in ('exit', 'quit'):
342
break
343
parser.parse_command(cmd)
344
345
if __name__ == '__main__':
346
main()