KeyChecker

The goal of this presentation is to introduce the features of the KeyChecker and to show how to configure it.

The challenge

I want to ensure that the log format does not change unnoticed.

given log entry:

[1]:
document = {
    "_systemd_owner_uid": "1000",
    "_systemd_session": "198",
    "_hostname": "dev-machine",
    "_audit_session": "198",
    "_boot_id": "3eef443102284373bb33022da6c23d2b",
    "_systemd_unit": "session-198.scope",
    "_transport": "syslog",
    "_pid": "712694",
    "_cmdline": "/usr/bin/sudo journalctl --no-pager -o json -f",
    "_cap_effective": "1ffffffffff",
    "__monotonic_timestamp": "263250766668",
    "_selinux_context": "unconfined\n",
    "__realtime_timestamp": "1667914601897529",
    "_gid": "0",
    "_uid": "1000",
    "_systemd_cgroup": "/user.slice/user-1000.slice/session-198.scope",
    "_comm": "sudo",
    "_audit_loginuid": "1000",
    "_systemd_slice": "user-1000.slice",
    "_machine_id": "edafb3b3b3ed4d7a8878309023f456fe",
    "syslog_timestamp": "nov  8 13:36:41 ",
    "message": "pam_unix(sudo:session): session opened for user root(uid=0) by vagrant(uid=1000)",
    "_systemd_user_slice": "-.slice",
    "syslog_identifier": "sudo",
    "_systemd_invocation_id": "19bb831be8c04629b4df55edf5b3bdcb",
    "syslog_facility": "10",
    "__cursor": "s=99e63e2c458b47fcbad587fb0e74be0d;i=21fd41;b=3eef443102284373bb33022da6c23d2b;m=3d4af7eb4c;t=5ecf5a15c9e39;x=36322c30d547bfb8",
    "priority": "6",
    "_exe": "/usr/bin/sudo",
    "_source_realtime_timestamp": "1667914601879236",
}

as long as no field is missing, it should pass through the KeyChecker unchanged:

[2]:
expected = {
    "_systemd_owner_uid": "1000",
    "_systemd_session": "198",
    "_hostname": "dev-machine",
    "_audit_session": "198",
    "_boot_id": "3eef443102284373bb33022da6c23d2b",
    "_systemd_unit": "session-198.scope",
    "_transport": "syslog",
    "_pid": "712694",
    "_cmdline": "/usr/bin/sudo journalctl --no-pager -o json -f",
    "_cap_effective": "1ffffffffff",
    "__monotonic_timestamp": "263250766668",
    "_selinux_context": "unconfined\n",
    "__realtime_timestamp": "1667914601897529",
    "_gid": "0",
    "_uid": "1000",
    "_systemd_cgroup": "/user.slice/user-1000.slice/session-198.scope",
    "_comm": "sudo",
    "_audit_loginuid": "1000",
    "_systemd_slice": "user-1000.slice",
    "_machine_id": "edafb3b3b3ed4d7a8878309023f456fe",
    "syslog_timestamp": "nov  8 13:36:41 ",
    "message": "pam_unix(sudo:session): session opened for user root(uid=0) by vagrant(uid=1000)",
    "_systemd_user_slice": "-.slice",
    "syslog_identifier": "sudo",
    "_systemd_invocation_id": "19bb831be8c04629b4df55edf5b3bdcb",
    "syslog_facility": "10",
    "__cursor": "s=99e63e2c458b47fcbad587fb0e74be0d;i=21fd41;b=3eef443102284373bb33022da6c23d2b;m=3d4af7eb4c;t=5ecf5a15c9e39;x=36322c30d547bfb8",
    "priority": "6",
    "_exe": "/usr/bin/sudo",
    "_source_realtime_timestamp": "1667914601879236",
}

Create rule and processor

create the rule:

[3]:
from pathlib import Path
import sys
sys.path.append("../../../../../")
import tempfile


rule_yaml = """---
filter: "message"
key_checker:
  source_fields:
    - _systemd_owner_uid
    - _systemd_session
    - _hostname
    - _audit_session
    - _boot_id
    - _systemd_unit
    - _transport
    - _pid
    - _cmdline
    - _cap_effective
    - __monotonic_timestamp
    - _selinux_context
    - __realtime_timestamp
    - _gid
    - _uid
    - _systemd_cgroup
    - _comm
    - _audit_loginuid
    - _systemd_slice
    - _machine_id
    - syslog_timestamp
    - message
    - _systemd_user_slice
    - syslog_identifier
    - _systemd_invocation_id
    - syslog_facility
    - __cursor
    - priority
    - _exe
    - _source_realtime_timestamp
  target_field: missing_fields
"""

rule_path = Path(tempfile.gettempdir()) / "concatenator"
rule_path.mkdir(exist_ok=True)
rule_file = rule_path / "data-stream.yml"
rule_file.write_text(rule_yaml)

[3]:
675
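
Typing thirty field names by hand is error-prone. As a minimal sketch, the rule text could instead be generated from a known-good sample event. The helper name `build_key_checker_rule` and its parameters are hypothetical and not part of logprep; the sketch only reproduces the YAML layout used above and assumes field names that need no YAML quoting.

```python
def build_key_checker_rule(sample_event, target_field="missing_fields", filter_expression="message"):
    """Render a key_checker rule that expects every top-level key of sample_event.

    Hypothetical helper for illustration only; it mirrors the rule layout shown above.
    """
    lines = [
        "---",
        f'filter: "{filter_expression}"',
        "key_checker:",
        "  source_fields:",
    ]
    # One list entry per key, in the order the keys appear in the sample event
    lines += [f"    - {key}" for key in sample_event]
    lines.append(f"  target_field: {target_field}")
    return "\n".join(lines) + "\n"


# Small stand-in event; the real notebook would pass `document` instead
sample = {"_hostname": "dev-machine", "message": "hello"}
generated_rule = build_key_checker_rule(sample)
print(generated_rule)
```

Passing the full `document` from above instead of the small sample would list all of its keys under `source_fields`.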

create the processor config:

[4]:
processor_config = {
    "almighty_keychecker": {
        "type": "key_checker",
        "rules": [str(rule_path), "/dev"],
    }
}

create the processor with the factory:

[5]:
from logprep.factory import Factory

# Build the processor instance from its configuration
keychecker = Factory.create(processor_config)
keychecker

Process event

Case 1 - nothing changed

[ ]:
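from copy import deepcopy

# Work on a copy so the original document stays available for comparison
mydocument = deepcopy(document)
keychecker.process(mydocument)
# All expected keys are present, so the event must be unchanged
assert mydocument == expected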

Case 2 - missing field

[ ]:
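# Drop two of the expected fields to simulate a changed log format
mydocument.pop("_hostname")
mydocument.pop("syslog_timestamp")
keychecker.process(mydocument)
# The event now differs from the expected one ...
print(mydocument == expected)
# ... and the missing keys are reported in the target field
mydocument["missing_fields"]

False
['_hostname', 'syslog_timestamp']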
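
To make the result of Case 2 easier to follow, here is a minimal plain-Python sketch of the kind of check the rule describes. It is not logprep's implementation: the function name `find_missing_fields` is hypothetical, only top-level keys are considered, and sorting the result is an assumption made to keep the output deterministic.

```python
def find_missing_fields(event, source_fields):
    """Return the expected top-level keys that are absent from the event, sorted."""
    return sorted(field for field in source_fields if field not in event)


# Keys the rule expects (a shortened stand-in for the source_fields list)
expected_keys = ["_hostname", "syslog_timestamp", "message"]

# An event in which two of the expected keys are gone
event = {"message": "session opened", "_pid": "712694"}

missing = find_missing_fields(event, expected_keys)
if missing:
    # Record the missing keys in the target field, as configured in the rule
    event["missing_fields"] = missing

print(event["missing_fields"])
# → ['_hostname', 'syslog_timestamp']
```

When every expected key is present, the function returns an empty list and the event is left untouched, which corresponds to Case 1.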