Usage of Processors with Event Objects

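The following example demonstrates how processors operate on event objects: a `LogEvent` is passed through a generic adder, a pre-detector and a pseudonymizer, and the extra data that the latter two produce for the opensearch output is inspected.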

[ ]:
import json
import uuid
from logprep.factory import Factory
from logprep.util.time import TimeParser
from logprep.ng.event.log_event import LogEvent
from logprep.ng.event.event_state import EventStateType
import logging
import sys

# Send log records of level DEBUG and above to stdout so they show up in the
# cell output.
logging.basicConfig(stream=sys.stdout, level=logging.DEBUG)

# Sample log document; the id and timestamp are generated at execution time.
document = {
    "id": str(uuid.uuid4()),
    "@timestamp": str(TimeParser.now()),
    "user": dict(name="Hubert K. Kabal", email="kabal@example.com", id=12345),
}

# Wrap the document in an event object that starts in the RECEIVED state.
event = LogEvent(document, state=EventStateType.RECEIVED, original=b"")

print(f"Event before processing: {json.dumps(event.data, indent=2)}")

# Generic adder with a single catch-all rule that adds a custom field
# ("event.tags") to the event.
config = {
    "my generic adder": {
        "type": "ng_generic_adder",
        "rules": [
            {
                "filter": "*",
                "generic_adder": {"add": {"event.tags": "generic added tag"}},
            },
        ],
    },
}

# Build the processor from the config, set it up and run it on the event.
processor = Factory.create(config)
processor.setup()
processor.process(event)

print(f"Event after processing: {json.dumps(event.data, indent=2)}")
Event before processing: {
  "id": "912b6720-53e1-4b33-bdc5-9c5f404491ee",
  "@timestamp": "2025-07-28 13:14:36.035533+00:00",
  "user": {
    "name": "Hubert K. Kabal",
    "email": "kabal@example.com",
    "id": 12345
  }
}
DEBUG:Processor:GenericAdder (my generic adder) loaded 1 rules
DEBUG:Processor:GenericAdder (my generic adder) processing event LogEvent(data={'id': '912b6720-53e1-4b33-bdc5-9c5f404491ee', '@timestamp': '2025-07-28 13:14:36.035533+00:00', 'user': {'name': 'Hubert K. Kabal', 'email': 'kabal@example.com', 'id': 12345}}, state=received)
Event after processing: {
  "id": "912b6720-53e1-4b33-bdc5-9c5f404491ee",
  "@timestamp": "2025-07-28 13:14:36.035533+00:00",
  "user": {
    "name": "Hubert K. Kabal",
    "email": "kabal@example.com",
    "id": 12345
  },
  "event": {
    "tags": "generic added tag"
  }
}
[ ]:
# Show the event as it enters this cell; it is the same `event` object that
# earlier cells created and processed.
print(f"Event before processing: {json.dumps(event.data, indent=2)}")

# Predetection example: one rule whose filter matches the sample document's
# user id.
config = {
    "almighty pre_detector": {
        "type": "ng_pre_detector",
        # NOTE(review): the target name "pseudonyms" looks copied from the
        # pseudonymizer example — confirm it is the intended target here.
        "outputs": [{"opensearch": "pseudonyms"}],
        "rules": [
            {
                "filter": "user.id: 12345",
                "pre_detector": {
                    "case_condition": "directly",
                    "id": "RULE_ONE_ID",
                    "mitre": ["attack.something1", "attack.something2"],
                    "severity": "critical",
                    "title": "Rule one",
                    "description": "Some malicious event.",
                },
            },
        ],
    },
}
processor = Factory.create(config)
processor.setup()
processor.process(event)
print(f"Event after processing: {json.dumps(event.data, indent=2)}")
print(f"{len(event.extra_data)=}")
# Print the last extra-data entry instead of a fixed index, so the cell shows
# the newest entry even when it is re-executed or run after other cells that
# already attached extra data to the event.
print(f"Event extra data: {json.dumps(event.extra_data[-1].data, indent=2)}")
Event before processing: {
  "id": "912b6720-53e1-4b33-bdc5-9c5f404491ee",
  "@timestamp": "2025-07-28 13:14:36.035533+00:00",
  "user": {
    "name": "Hubert K. Kabal",
    "email": "kabal@example.com",
    "id": 12345
  },
  "event": {
    "tags": "generic added tag"
  }
}
DEBUG:Processor:PreDetector (almighty pre_detector) loaded 1 rules
DEBUG:Processor:PreDetector (almighty pre_detector) processing event LogEvent(data={'id': '912b6720-53e1-4b33-bdc5-9c5f404491ee', '@timestamp': '2025-07-28 13:14:36.035533+00:00', 'user': {'name': 'Hubert K. Kabal', 'email': 'kabal@example.com', 'id': 12345}, 'event': {'tags': 'generic added tag'}}, state=received)
Event after processing: {
  "id": "912b6720-53e1-4b33-bdc5-9c5f404491ee",
  "@timestamp": "2025-07-28 13:14:36.035533+00:00",
  "user": {
    "name": "Hubert K. Kabal",
    "email": "kabal@example.com",
    "id": 12345
  },
  "event": {
    "tags": "generic added tag"
  },
  "pre_detection_id": "1f686a6f-7f61-46bf-9b60-0481e97521e0"
}
len(event.extra_data)=1
Event extra data: {
  "description": "",
  "id": "RULE_ONE_ID",
  "title": "Rule one",
  "severity": "critical",
  "mitre": [
    "attack.something1",
    "attack.something2"
  ],
  "case_condition": "directly",
  "rule_filter": "user.id:\"12345\"",
  "pre_detection_id": "1f686a6f-7f61-46bf-9b60-0481e97521e0",
  "creation_timestamp": "2025-07-28T13:14:40.312029+00:00",
  "@timestamp": "2025-07-28T13:14:36.035533Z"
}
[54]:
# Show the event as it enters this cell; it is the same `event` object that
# earlier cells created and processed.
print(f"Event before processing: {json.dumps(event.data, indent=2)}")

# Pseudonymization: a catch-all rule that pseudonymizes the whole "user.name"
# field. Key and regex-mapping paths are relative to the notebook's location.
config = {
    "almighty pseudonymizer": {
        "type": "ng_pseudonymizer",
        "pubkey_analyst": "../../../../../examples/exampledata/rules/pseudonymizer/example_analyst_pub.pem",
        "pubkey_depseudo": "../../../../../examples/exampledata/rules/pseudonymizer/example_depseudo_pub.pem",
        "regex_mapping": "../../../../../examples/exampledata/rules/pseudonymizer/regex_mapping.yml",
        "hash_salt": "a_secret_tasty_ingredient",
        "outputs": [{"opensearch": "pseudonyms"}],
        "rules": [
            {
                "filter": "*",
                "pseudonymizer": {
                    "mapping": {
                        "user.name": "RE_WHOLE_FIELD",
                    },
                },
            },
        ],
        "max_cached_pseudonyms": 1000000,
    },
}
processor = Factory.create(config)
processor.setup()
processor.process(event)
print(f"Event after processing: {json.dumps(event.data, indent=2)}")
print(f"{len(event.extra_data)=}")
# Print the last extra-data entry instead of the fixed index 1: a fixed index
# only works when exactly one entry was attached before this cell ran, and it
# raises IndexError when the pre-detector cell was skipped.
print(f"Event extra data: {json.dumps(event.extra_data[-1].data, indent=2)}")
Event before processing: {
  "id": "912b6720-53e1-4b33-bdc5-9c5f404491ee",
  "@timestamp": "2025-07-28 13:14:36.035533+00:00",
  "user": {
    "name": "<pseudonym:811e0bc983ec82c3a44469a243b547db259ba89ce3448efd31dc6568042ed9ff>",
    "email": "kabal@example.com",
    "id": 12345
  },
  "event": {
    "tags": "generic added tag"
  },
  "pre_detection_id": "1f686a6f-7f61-46bf-9b60-0481e97521e0"
}
DEBUG:Processor:Pseudonymizer (almighty pseudonymizer) loaded 1 rules
DEBUG:Component:Checking health of almighty pseudonymizer
DEBUG:Processor:Pseudonymizer (almighty pseudonymizer) processing event LogEvent(data={'id': '912b6720-53e1-4b33-bdc5-9c5f404491ee', '@timestamp': '2025-07-28 13:14:36.035533+00:00', 'user': {'name': '<pseudonym:811e0bc983ec82c3a44469a243b547db259ba89ce3448efd31dc6568042ed9ff>', 'email': 'kabal@example.com', 'id': 12345}, 'event': {'tags': 'generic added tag'}, 'pre_detection_id': '1f686a6f-7f61-46bf-9b60-0481e97521e0'}, state=received)
Event after processing: {
  "id": "912b6720-53e1-4b33-bdc5-9c5f404491ee",
  "@timestamp": "2025-07-28 13:14:36.035533+00:00",
  "user": {
    "name": "<pseudonym:811e0bc983ec82c3a44469a243b547db259ba89ce3448efd31dc6568042ed9ff>",
    "email": "kabal@example.com",
    "id": 12345
  },
  "event": {
    "tags": "generic added tag"
  },
  "pre_detection_id": "1f686a6f-7f61-46bf-9b60-0481e97521e0"
}
len(event.extra_data)=2
Event extra data: {
  "pseudonym": "811e0bc983ec82c3a44469a243b547db259ba89ce3448efd31dc6568042ed9ff",
  "origin": "DSsdFzcgxCsnirGibno2ixeuNBn5O5uK9f7BQ169oLE7h8q/d9I4TvlzjVl1Ia5lVCtf5BsqzjRp8WQyWFMMcWn2pyqJXf79H1AlGmRKkg2ahuEvAGv1z26q1cConS4kK+1F4pw2e9WmM+fwqWJUzBPjDZCsmIn82hZfQwjwA18=:liEUJfmrju2FcrhUkj34aw==:r4bNr3wl9es5rL4OmNs3HcZWwevlL7cq3jcVafeQVbny13pxMs2GE23OtYDkD0i7SHZjdk6YtceW26v7BeoIJDhatbMvjHNpk3ZJCSAX4LphSKo/KYYiTD3aTifMjKmc7oi2+1FehJbG6nqSh/dnl4vhOa+QLbzd1bx2G4KDqWCQ552/S1ctg2CfjgsVp4iWe1BV/KByiFTGC6banTbsHLGCO3+7qBK14ToSE/ndGAKL36nfL33rIREV7CYOQyCaC4ZN2uzfsPsF72DI5WkcDKcQhrgLOlyoIu9BCqIyuyZhS6yWmnZHZ7ss7JiW6oyH/uA8hE0exn1iTTQlNt/wXQ==:VX/wHIucunc7QMbBiuaK0w==:lHArIpHhjZoNgMwS572f",
  "@timestamp": "2025-07-28 13:14:36.035533+00:00"
}