The Event Backlog
The event backlog is a container for managing events. It supports insertion, retrieval, and deletion.
The following code fragment demonstrates:
Creating a backlog
Adding events to the backlog
[1]:
from logprep.ng.event.set_event_backlog import SetEventBacklog
from logprep.ng.event.event_state import EventStateType
from logprep.ng.event.log_event import LogEvent

backlog = SetEventBacklog()
events = [
    LogEvent(
        data={"id": 0, "message": "Test Event 0"},
        original=b"",
        state=EventStateType.RECEIVING,
    ),
    LogEvent(
        data={"id": 1, "message": "Test Event 1"},
        original=b"",
        state=EventStateType.PROCESSING,
    ),
    LogEvent(
        data={"id": 2, "message": "Test Event 2"},
        original=b"",
        state=EventStateType.ACKED,
    ),
]

print(f"\nLen: {len(backlog.backlog)}")
backlog.register(events)

print("\n📦 Events in Backlog:")
for event in backlog.backlog:
    print(f"> {event.data=}, state: {event.state}")
print(f"\nLen: {len(backlog.backlog)}")
Len: 0
📦 Events in Backlog:
> event.data={'id': 0, 'message': 'Test Event 0'}, state: receiving
> event.data={'id': 1, 'message': 'Test Event 1'}, state: processing
> event.data={'id': 2, 'message': 'Test Event 2'}, state: acked
Len: 3
The following code fragment demonstrates:
Getting events from the backlog by state (the events stay in the backlog)
[2]:
processing_events = backlog.get(EventStateType.PROCESSING)
print(f"Len: {len(backlog.backlog)}")

print("\n📥 Events in PROCESSING state:")
for event in processing_events:
    print(f"> {event.data=}, state: {event.state}")
print(f"\nLen: {len(backlog.backlog)}")
Len: 3
📥 Events in PROCESSING state:
> event.data={'id': 1, 'message': 'Test Event 1'}, state: processing
Len: 3
The following code fragment demonstrates:
Removing events from the backlog
[3]:
print(f"Len: {len(backlog.backlog)}")
acked_events = backlog.unregister(EventStateType.ACKED)

print("\n🧹 Unregistered events with state ACKED:")
for event in acked_events:
    print(f"> {event.data=}, state: {event.state}")

print("\n📦 Remaining events in backlog:")
for state in EventStateType:
    still_in_backlog = backlog.get(state)
    for event in still_in_backlog:
        print(f"> {event.data=}, state: {event.state}")
print(f"\nLen: {len(backlog.backlog)}")
Len: 3
🧹 Unregistered events with state ACKED:
> event.data={'id': 2, 'message': 'Test Event 2'}, state: acked
📦 Remaining events in backlog:
> event.data={'id': 0, 'message': 'Test Event 0'}, state: receiving
> event.data={'id': 1, 'message': 'Test Event 1'}, state: processing
Len: 2
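The per-state loop above lists the remaining events one by one. When only the counts matter, a small helper can summarize the backlog instead. The sketch below relies solely on the `backlog.backlog` collection and the `event.state` attribute used in the cells above. The helper name `count_by_state` and the `SimpleNamespace` stand-ins are hypothetical and exist only so the snippet runs without logprep installed.

```python
from collections import Counter
from types import SimpleNamespace


def count_by_state(backlog):
    """Return a Counter mapping each state's string form to its event count.

    Hypothetical helper: it only assumes that ``backlog.backlog`` is iterable
    and that every event exposes a ``state`` attribute, as in the cells above.
    """
    return Counter(str(event.state) for event in backlog.backlog)


# Stand-in objects for illustration only. With logprep available, the
# SetEventBacklog instance from the cells above would be passed instead.
fake_backlog = SimpleNamespace(
    backlog=[
        SimpleNamespace(state="receiving"),
        SimpleNamespace(state="processing"),
        SimpleNamespace(state="processing"),
    ]
)

summary = count_by_state(fake_backlog)
print(dict(summary))
```

Because `Counter` returns 0 for missing keys, states without any event can be queried without a prior membership check.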
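The following code fragment demonstrates:
Failing removal of events from the backlog: unregister raises a ValueError for each state listed in fail_unregister_states, while FAILED and ACKED are accepted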
[4]:
fail_unregister_states = [
    EventStateType.RECEIVING,
    EventStateType.RECEIVED,
    EventStateType.PROCESSING,
    EventStateType.PROCESSED,
    EventStateType.STORED_IN_OUTPUT,
    EventStateType.STORED_IN_ERROR,
    EventStateType.DELIVERED,
]

cnt_failing = 0
for state in fail_unregister_states:
    try:
        backlog.unregister(state)
    except ValueError:
        cnt_failing += 1
        print(f"> Failed: unregister({state})")
print(f"Expected Failing: {len(fail_unregister_states)}\nFailed: {cnt_failing}")

success_unregister_states = [
    EventStateType.FAILED,
    EventStateType.ACKED,
]

print("\nShould work without exceptions")
for state in success_unregister_states:
    backlog.unregister(state)
    print(f"> Ok: unregister({state})")
> Failed: unregister(receiving)
> Failed: unregister(received)
> Failed: unregister(processing)
> Failed: unregister(processed)
> Failed: unregister(stored_in_output)
> Failed: unregister(stored_in_error)
> Failed: unregister(delivered)
Expected Failing: 7
Failed: 7
Should work without exceptions
> Ok: unregister(failed)
> Ok: unregister(acked)
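To make the behavior shown above easier to reason about, here is a minimal, self-contained sketch of a set-based backlog that enforces the same rule: events can be fetched in any state, but removed only once they are FAILED or ACKED. This is not logprep's implementation. The names `State`, `Event`, and `MiniBacklog` are hypothetical stand-ins, and the rule itself is an assumption inferred from the output above.

```python
from dataclasses import dataclass
from enum import Enum


class State(Enum):
    """Hypothetical stand-in for a subset of EventStateType."""

    RECEIVING = "receiving"
    PROCESSING = "processing"
    FAILED = "failed"
    ACKED = "acked"


@dataclass(eq=False)  # eq=False keeps identity-based hashing, so events fit in a set
class Event:
    data: dict
    state: State


class MiniBacklog:
    """Illustrative set-based backlog; not logprep's actual implementation."""

    # Assumption inferred from the notebook output: only terminal states
    # (FAILED, ACKED) may be removed from the backlog.
    UNREGISTER_ALLOWED = frozenset({State.FAILED, State.ACKED})

    def __init__(self):
        self.backlog = set()

    def register(self, events):
        """Add all given events to the backlog."""
        self.backlog.update(events)

    def get(self, state):
        """Return the events in the given state without removing them."""
        return {event for event in self.backlog if event.state is state}

    def unregister(self, state):
        """Remove and return the events in the given state, if removal is allowed."""
        if state not in self.UNREGISTER_ALLOWED:
            raise ValueError(f"cannot unregister events in state {state.value!r}")
        matching = self.get(state)
        self.backlog -= matching
        return matching


mini = MiniBacklog()
mini.register(
    [
        Event({"id": 0}, State.RECEIVING),
        Event({"id": 1}, State.ACKED),
    ]
)
removed = mini.unregister(State.ACKED)
print(len(removed), len(mini.backlog))  # prints "1 1"
```

Restricting removal to terminal states means an event that is still being received or processed cannot silently disappear from the backlog. Calling `mini.unregister(State.RECEIVING)` raises `ValueError`, mirroring the failing cases above.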