Skip to content

Commit

Permalink
Merge pull request #96 from bowen-xu/SimpleEventBuffer
Browse files Browse the repository at this point in the history
Add simple eventbuffer and generate temporal results for #95
  • Loading branch information
bowen-xu authored Mar 11, 2024
2 parents 4e379ad + 02cf656 commit 52ed0fa
Show file tree
Hide file tree
Showing 3 changed files with 223 additions and 4 deletions.
133 changes: 133 additions & 0 deletions Tests/Test_Buffer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,133 @@
import unittest

from pynars import Narsese
from pynars.Config import Config
from pynars.NARS.DataStructures import EventBuffer
from pynars.Narsese import Judgement, Term, Task, Stamp, Base, Statement


class TEST_Buffer(unittest.TestCase):

def test(self):
pass # todo add regular Buffer tests if needed

class TEST_EventBuffer(unittest.TestCase):

def test_3_firstorder_event_temporal_chaining(self):
"""
Add 3 first order events to the buffer (A,B,C), each with different timestamps (A=1, B=2, C=3)
Ensure that the compound events are all created:
(A &/ B), (B &/ C), (A &/ C)
Ensure that the implication statement is created:
((A &/ B) =/> C)
"""
event_buffer: EventBuffer = EventBuffer(capacity=3)

event_A_task: Task = Narsese.parser.parse("<A1-->A2>.")
event_A_time = 0
event_A_task.stamp.t_occurrence = event_A_time

event_B_task: Task = Narsese.parser.parse("<B1-->B2>.")
event_B_time = event_A_time + (Config.temporal_duration + 1)
event_B_task.stamp.t_occurrence = event_B_time

event_C_task: Task = Narsese.parser.parse("<C1-->C2>.")
event_C_time = event_B_time + (Config.temporal_duration + 1)
event_C_task.stamp.t_occurrence = event_C_time

event_buffer.put(event_A_task)
event_buffer.put(event_B_task)
event_buffer.put(event_C_task)
results = event_buffer.generate_temporal_sentences()


A_and_B: Task = Narsese.parser.parse("(&/, <A1-->A2>,+" + str(event_B_time - event_A_time) + ",<B1-->B2>).")

B_and_C: Task = Narsese.parser.parse("(&/, <B1-->B2>,+" + str(event_C_time - event_B_time) + ",<C1-->C2>).")

A_and_C: Task = Narsese.parser.parse("(&/, <A1-->A2>,+" + str(event_C_time - event_A_time) + ",<C1-->C2>).")

A_and_B_imply_C = Narsese.parser.parse("<(&/, <A1-->A2>,+" + str(event_B_time - event_A_time) + ",<B1-->B2>,+" + str(
event_C_time - event_B_time) + ") =/> <C1-->C2>>.")

expected_results = [A_and_B.term,
B_and_C.term,
A_and_C.term,
A_and_B_imply_C.term]

for result in results:
self.assertTrue(result.term in expected_results,msg=str(result.term) + " was not found in results.")
expected_results.remove(result.term)


def test_buffer_overflow_maintains_capacity(self):
"""
Test to ensure the number of items in the buffer doesnt
exceed its upper bound
"""
capacity = 5
event_buffer: EventBuffer = EventBuffer(capacity=capacity)

# ensure the buffer adds events regularly
for i in range(capacity):
self.assertEqual(i, len(event_buffer))
event_task = Narsese.parser.parse("<A1-->A2>.")
event_task.stamp.t_occurrence = 1
event_buffer.put(event_task)

# ensure the buffer is at max capcity
self.assertEqual(capacity, len(event_buffer))

# ensure the buffer does not exceed its capacity when overflowing
event_task = Narsese.parser.parse("<A1-->A2>.")
event_task.stamp.t_occurrence = 1
event_buffer.put(event_task)
self.assertEqual(capacity, len(event_buffer))

def test_buffer_overflow_discards_older_events(self):
"""
Test to ensure that new events are added to the buffer,
whereas old events are purged from the buffer.
"""
capacity = 5
event_buffer: EventBuffer = EventBuffer(capacity=capacity)

for i in range(capacity):
event_task = Narsese.parser.parse("<A" + str(i) + " -->B" + str(i) + ">.")
event_task.stamp.t_occurrence = i + 1
event_buffer.put(event_task)

# ensure getting older/newer events functions properly
self.assertTrue(event_buffer.get_newest_event().stamp.t_occurrence > event_buffer.get_oldest_event().stamp.t_occurrence)

# add an event older than all the others, ensure it doesnt get into the buffer
old_event_task = Narsese.parser.parse("<oldA-->oldB>.")
old_event_task.stamp.t_occurrence = 0
event_buffer.put(old_event_task)
self.assertTrue(event_buffer.get_oldest_event().term != old_event_task.term)

# add an event newer than all the others, ensure it goes to the front of the buffer
new_event_task = Narsese.parser.parse("<newA-->newB>.")
new_event_task.stamp.t_occurrence = capacity
event_buffer.put(new_event_task)
self.assertTrue(event_buffer.get_newest_event().term == new_event_task.term)

if __name__ == '__main__':

test_classes_to_run = [
TEST_EventBuffer
]

loader = unittest.TestLoader()

suites = []
for test_class in test_classes_to_run:
suite = loader.loadTestsFromTestCase(test_class)
suites.append(suite)

suites = unittest.TestSuite(suites)

runner = unittest.TextTestRunner()
results = runner.run(suites)
11 changes: 9 additions & 2 deletions pynars/NARS/Control/Reasoner.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
from pynars.Narsese._py.Budget import Budget
from pynars.Narsese._py.Statement import Statement
from pynars.Narsese._py.Task import Belief
from ..DataStructures import Bag, Memory, NarseseChannel, Buffer, Task, Concept
from ..DataStructures import Bag, Memory, NarseseChannel, Buffer, Task, Concept, EventBuffer
from ..InferenceEngine import GeneralEngine, TemporalEngine, VariableEngine
from pynars import Config
from pynars.Config import Enable
Expand Down Expand Up @@ -38,6 +38,7 @@ def __init__(self, n_memory, capacity, config='./config.json', nal_rules={1, 2,
self.memory = Memory(n_memory, global_eval=self.global_eval)
self.overall_experience = Buffer(capacity)
self.internal_experience = Buffer(capacity)
self.event_buffer = EventBuffer(3)
self.narsese_channel = NarseseChannel(capacity)
self.perception_channel = Channel(capacity)
self.channels: List[Channel] = [
Expand Down Expand Up @@ -131,11 +132,17 @@ def observe(self, tasks_derived: List[Task]):
Process Channels/Buffers
"""
judgement_revised, goal_revised, answers_question, answers_quest = None, None, None, None
# step 1. Take out an Item from `Channels`, and then put it into the `Overall Experience`
# step 1. Take out an Item from `Channels`, and then put it into the `Overall Experience` and Event Buffers
for channel in self.channels:
task_in: Task = channel.take()
if task_in is not None:
self.overall_experience.put(task_in)
if self.event_buffer.can_task_enter(task_in):
self.event_buffer.put(task_in)
# when there's a new event, run the temporal chaining
temporal_results = self.event_buffer.generate_temporal_sentences()
for result in temporal_results:
self.overall_experience.put(result)

# step 2. Take out an Item from the `Internal Experience`, with putting it back afterwards, and then put it
# into the `Overall Experience`
Expand Down
83 changes: 81 additions & 2 deletions pynars/NARS/DataStructures/_py/Buffer.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
from pynars.NAL.Functions import Truth_intersection, Stamp_merge
from pynars.NAL.Inference.TemporalRules import induction_composition, induction_implication
from .Bag import Bag
from pynars.Config import Config
from pynars.Narsese import Item, Task
from pynars.Narsese import Item, Task, TermType, Compound, Interval, Statement
from pynars.NAL.Functions.BudgetFunctions import *
from typing import Callable, Any
from typing import Callable, Any, List


class Buffer(Bag):
'''
Expand Down Expand Up @@ -40,3 +43,79 @@ def __init__(self, capacity: int, n_buckets: int=None, take_in_order: bool=False

def is_expired(self, put_time, current_time):
return (current_time - put_time) > self.max_duration


class EventBuffer:
'''
This buffer holds first-order events, sorted by time.
The purpose of this buffer is to generate temporal implication statements, e.g., (A &/ B =/> C)
and compound events, e.g., (A &/ B).
The operation for generating temporal statements is exhaustive. That means, for generating 3-component
implication statements like (A &/ B =/> C), the algorithm scales O(n^3) for n elements
The oldest events are at the lowest index, the newest events are at the highest index.
The larger the event's timestamp, the newer it is.
'''
def __init__(self, capacity: int):
self.buffer: List[Task] = []
self.capacity: int = capacity

def __len__(self):
return len(self.buffer)

def get_oldest_event(self):
return self.buffer[0]

def get_newest_event(self):
return self.buffer[-1]

def generate_temporal_sentences(self):
results: List[Task] = []
# first event A occurred, then event B occurred, then event C
for i in range(len(self.buffer)):
event_A_task = self.buffer[i]
for j in range(i+1,len(self.buffer)):
# create (A &/ B)
event_B_task = self.buffer[j]
compound_event_task = induction_composition(event_A_task, event_B_task)
results.append(compound_event_task) # append
for k in range(j + 1, len(self.buffer)):
# create (A &/ B) =/> C
event_C = self.buffer[k]
temporal_implication_task = induction_implication(compound_event_task, event_C)
results.append(temporal_implication_task) # append

return results

def put(self, event_task_to_insert: Task):
if not self.can_task_enter(event_task_to_insert):
raise Exception("ERROR! Only events with first-order statements can enter the EventBuffer.")

if len(self.buffer) == 0: # if nothing in the buffer, just insert it
self.buffer.append(event_task_to_insert)
return

newest_event = self.get_newest_event()

if event_task_to_insert.stamp.t_occurrence >= newest_event.stamp.t_occurrence:
# if its newer than even the newest event, just insert it at the end
self.buffer.append(event_task_to_insert)
else:
# otherwise, we have to go through the list to insert it properly
for i in range(len(self.buffer)):
buffer_event = self.buffer[i]
if event_task_to_insert.stamp.t_occurrence <= buffer_event.stamp.t_occurrence:
# the inserted event occurs first, so insert it here
self.buffer.insert(i, event_task_to_insert)
break


if len(self.buffer) > self.capacity:
# if too many events, take out the oldest event
self.buffer.pop(0)

def can_task_enter(self, task: Task):
return task.is_event \
and task.term.type == TermType.STATEMENT \
and not task.term.is_higher_order

0 comments on commit 52ed0fa

Please sign in to comment.