KeyChecker
This presentations goal it to introduce the features of the KeyChecker and how to configure it.
The challenge
I want to ensure, that the log format does not change unnoticed.
given log entry:
[1]:
document = {
"_systemd_owner_uid": "1000",
"_systemd_session": "198",
"_hostname": "dev-machine",
"_audit_session": "198",
"_boot_id": "3eef443102284373bb33022da6c23d2b",
"_systemd_unit": "session-198.scope",
"_transport": "syslog",
"_pid": "712694",
"_cmdline": "/usr/bin/sudo journalctl --no-pager -o json -f",
"_cap_effective": "1ffffffffff",
"__monotonic_timestamp": "263250766668",
"_selinux_context": "unconfined\n",
"__realtime_timestamp": "1667914601897529",
"_gid": "0",
"_uid": "1000",
"_systemd_cgroup": "/user.slice/user-1000.slice/session-198.scope",
"_comm": "sudo",
"_audit_loginuid": "1000",
"_systemd_slice": "user-1000.slice",
"_machine_id": "edafb3b3b3ed4d7a8878309023f456fe",
"syslog_timestamp": "nov 8 13:36:41 ",
"message": "pam_unix(sudo:session): session opened for user root(uid=0) by vagrant(uid=1000)",
"_systemd_user_slice": "-.slice",
"syslog_identifier": "sudo",
"_systemd_invocation_id": "19bb831be8c04629b4df55edf5b3bdcb",
"syslog_facility": "10",
"__cursor": "s=99e63e2c458b47fcbad587fb0e74be0d;i=21fd41;b=3eef443102284373bb33022da6c23d2b;m=3d4af7eb4c;t=5ecf5a15c9e39;x=36322c30d547bfb8",
"priority": "6",
"_exe": "/usr/bin/sudo",
"_source_realtime_timestamp": "1667914601879236",
}
to this:
[2]:
expected = {
"_systemd_owner_uid": "1000",
"_systemd_session": "198",
"_hostname": "dev-machine",
"_audit_session": "198",
"_boot_id": "3eef443102284373bb33022da6c23d2b",
"_systemd_unit": "session-198.scope",
"_transport": "syslog",
"_pid": "712694",
"_cmdline": "/usr/bin/sudo journalctl --no-pager -o json -f",
"_cap_effective": "1ffffffffff",
"__monotonic_timestamp": "263250766668",
"_selinux_context": "unconfined\n",
"__realtime_timestamp": "1667914601897529",
"_gid": "0",
"_uid": "1000",
"_systemd_cgroup": "/user.slice/user-1000.slice/session-198.scope",
"_comm": "sudo",
"_audit_loginuid": "1000",
"_systemd_slice": "user-1000.slice",
"_machine_id": "edafb3b3b3ed4d7a8878309023f456fe",
"syslog_timestamp": "nov 8 13:36:41 ",
"message": "pam_unix(sudo:session): session opened for user root(uid=0) by vagrant(uid=1000)",
"_systemd_user_slice": "-.slice",
"syslog_identifier": "sudo",
"_systemd_invocation_id": "19bb831be8c04629b4df55edf5b3bdcb",
"syslog_facility": "10",
"__cursor": "s=99e63e2c458b47fcbad587fb0e74be0d;i=21fd41;b=3eef443102284373bb33022da6c23d2b;m=3d4af7eb4c;t=5ecf5a15c9e39;x=36322c30d547bfb8",
"priority": "6",
"_exe": "/usr/bin/sudo",
"_source_realtime_timestamp": "1667914601879236",
}
Create rule and processor
create the rule:
[3]:
from pathlib import Path
import sys
sys.path.append("../../../../../")
import tempfile
rule_yaml = """---
filter: "message"
key_checker:
source_fields:
- _systemd_owner_uid
- _systemd_session
- _hostname
- _audit_session
- _boot_id
- _systemd_unit
- _transport
- _pid
- _cmdline
- _cap_effective
- __monotonic_timestamp
- _selinux_context
- __realtime_timestamp
- _gid
- _uid
- _systemd_cgroup
- _comm
- _audit_loginuid
- _systemd_slice
- _machine_id
- syslog_timestamp
- message
- _systemd_user_slice
- syslog_identifier
- _systemd_invocation_id
- syslog_facility
- __cursor
- priority
- _exe
- _source_realtime_timestamp
target_field: missing_fields
"""
rule_path = Path(tempfile.gettempdir()) / "concatenator"
rule_path.mkdir(exist_ok=True)
rule_file = rule_path / "data-stream.yml"
rule_file.write_text(rule_yaml)
[3]:
675
create the processor config:
[4]:
processor_config = {
"almighty_keychecker": {
"type": "key_checker",
"rules": [str(rule_path), "/dev"],
}
}
create the processor with the factory:
[5]:
from unittest import mock
import sys
from logprep.factory import Factory
mock_logger = mock.MagicMock()
keychecker = Factory.create(processor_config)
keychecker
---------------------------------------------------------------------------
ModuleNotFoundError Traceback (most recent call last)
Cell In[5], line 4
2 import sys
3 sys.path.append("..")
----> 4 from logprep.factory import Factory
6 mock_logger = mock.MagicMock()
7 keychecker = Factory.create(processor_config, mock_logger)
ModuleNotFoundError: No module named 'logprep'
Process event
Case 1 - nothing changed
[ ]:
from copy import deepcopy
mydocument = deepcopy(document)
keychecker.process(mydocument)
assert mydocument == expected
Case 2 - missing field
[ ]:
mydocument.pop("_hostname")
mydocument.pop("syslog_timestamp")
keychecker.process(mydocument)
print(mydocument == expected)
mydocument["missing_fields"]
False
['_hostname', 'syslog_timestamp']