Processors
Amides
The Amides processor implements the proof-of-concept Adaptive Misuse Detection System
(AMIDES). AMIDES extends conventional rule matching of SIEM systems with machine learning
components that aim to detect attacks which evade existing SIEM rules, as well as otherwise
undetected attack variants. It learns from SIEM rules and historical benign events and can thus
estimate which SIEM rule an attacker attempted to evade. An overview of AMIDES is depicted in the figure below.
Overview of the AMIDES architecture.
The machine learning components of AMIDES are trained using the current SIEM rule set and historical benign events. Incoming events are transformed into feature vectors by the feature extraction component. During operation, features learned during the training phase will be re-used by the feature extraction component. Feature vectors are then passed to the Misuse Classification component which classifies events as malicious or benign. In case of a malicious result, the feature vector is passed to the Rule Attribution component which generates a ranked list of SIEM rules potentially evaded by the event. Finally, results generated by the Rule Attribution component and conventional rule matching results can be correlated for alert generation.
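The flow from event to feature vector to classification result can be sketched in a few lines of Python. Everything here is illustrative: token counting stands in for the trained vectorizer, and a keyword lookup stands in for the trained classifier. This is not the actual AMIDES implementation.

```python
import re

def extract_features(cmdline: str) -> dict[str, int]:
    """Toy feature extraction: lower-case the command line, split it into
    tokens and count them. AMIDES uses a vectorizer fitted during training;
    this only illustrates the 'event -> feature vector' step."""
    tokens = re.split(r"[\s\\/=:,;]+", cmdline.lower())
    features: dict[str, int] = {}
    for token in tokens:
        if token:
            features[token] = features.get(token, 0) + 1
    return features

def classify(features: dict[str, int], suspicious_tokens: set[str]) -> bool:
    """Stand-in for the misuse classifier: flag the event if any known
    suspicious token appears among the features."""
    return any(token in suspicious_tokens for token in features)

vector = extract_features(r"powershell.exe -enc SQBFAFgA")
print(classify(vector, {"-enc"}))  # True
```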
Since there is a plethora of different SIEM event types, the current implementation focuses on events that provide process command lines. Command lines are among the fields most commonly targeted by SIEM rules, and they are also highly susceptible to evasion. The rules and models for AMIDES provided in the deployment examples are for Sysmon Process Creation events. In general, the Amides rule format allows creating rules for other event types that provide process command lines, e.g. Process Creation events generated by Windows Security Auditing.
Misuse classification is performed by the MisuseDetector class. Instances of the
MisuseDetector contain the model for misuse classification, which includes the trained
classifier instance, the corresponding feature extractor, and an additional scaler to transform
classifier results into the pre-defined output range between 0 and 1. The processor configuration
parameter decision_threshold is used to fine-tune the classification results produced by the
misuse detector.
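The interplay of the scaler and the decision_threshold parameter can be illustrated as follows. The min-max scaling and the bounds lo/hi are assumptions for illustration, not the actual scaler fitted during training.

```python
def scale_score(raw: float, lo: float, hi: float) -> float:
    """Min-max scale a raw classifier score into [0, 1], clamping outliers.
    Illustrative only: the real scaler is fitted during training."""
    scaled = (raw - lo) / (hi - lo)
    return min(1.0, max(0.0, scaled))

def is_malicious(raw: float, lo: float, hi: float, decision_threshold: float) -> bool:
    """An event is classified as malicious if its scaled score exceeds
    the configured decision threshold."""
    return scale_score(raw, lo, hi) > decision_threshold

print(is_malicious(0.8, -2.0, 2.0, 0.5))  # scaled score 0.7 > 0.5 -> True
```

Raising decision_threshold makes the detector more conservative (fewer alerts), lowering it makes it more sensitive.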
Rule attribution is performed by the RuleAttributor class. The num_rule_attributions
configuration parameter determines the number of rule attributions returned by the attributor.
Models and vectorizer for rule attribution and feature extraction are held by RuleAttributor
instances.
To speed up detection and attribution, the Amides processor uses an LRU cache that keeps
track of incoming command line samples. For previously seen command lines, classification and
attribution results are retrieved from the cache instead of being recomputed. The
max_cache_entries configuration parameter determines the maximum number of elements in the
internal cache.
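The caching idea can be sketched with a minimal LRU cache keyed by command line. This is an illustrative re-implementation, not the Amides processor's actual cache class.

```python
from collections import OrderedDict

class LRUCache:
    """Minimal LRU cache showing how repeated command lines can skip
    re-classification (not the actual Amides cache implementation)."""

    def __init__(self, max_entries: int):
        self.max_entries = max_entries
        self._entries: OrderedDict[str, object] = OrderedDict()

    def get(self, cmdline: str):
        if cmdline not in self._entries:
            return None
        self._entries.move_to_end(cmdline)  # mark as most recently used
        return self._entries[cmdline]

    def put(self, cmdline: str, result: object) -> None:
        self._entries[cmdline] = result
        self._entries.move_to_end(cmdline)
        if len(self._entries) > self.max_entries:
            self._entries.popitem(last=False)  # evict least recently used

cache = LRUCache(max_entries=2)
cache.put("whoami", [])
cache.put("net user", ["rule_a"])
cache.get("whoami")         # refresh "whoami"
cache.put("ipconfig", [])   # evicts "net user", the least recently used entry
print(cache.get("net user"))  # None
```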
Models used by the MisuseDetector and RuleAttributor are currently generated by scikit-learn.
Each trained model needs to be packed into a dictionary together with its corresponding feature
extractor and scaler. Dictionaries are then pickled and compressed (.zip). The URI or path of the
compressed models file is given by the models_path configuration parameter. An example of a
configuration of the Amides processor is given below:
Processor Configuration
- amides:
    type: amides
    rules:
      - tests/testdata/rules/rules
    max_cache_entries: 10000
    decision_threshold: 0.0
    num_rule_attributions: 10
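The model packaging scheme described above (a dictionary pickled and zip-compressed) could be reproduced roughly as follows. The archive member name "model" and the placeholder strings are assumptions for illustration; real bundles contain trained scikit-learn objects. Note that unpickling executes code on load, which is why the security best practice below insists on trusted model sources.

```python
import io
import pickle
import zipfile

# Hypothetical stand-ins for the trained classifier, feature extractor and scaler.
model_bundle = {
    "classifier": "trained-classifier-placeholder",
    "feature_extractor": "fitted-vectorizer-placeholder",
    "scaler": "fitted-scaler-placeholder",
}

# Pack: pickle the dictionary and compress it into a zip archive.
buffer = io.BytesIO()
with zipfile.ZipFile(buffer, "w", zipfile.ZIP_DEFLATED) as archive:
    archive.writestr("model", pickle.dumps(model_bundle))

# Unpack: read the archive member back and unpickle it.
with zipfile.ZipFile(buffer) as archive:
    restored = pickle.loads(archive.read("model"))

print(restored["scaler"])  # fitted-scaler-placeholder
```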
To keep track of its components' performance, the Amides processor records several processor
metrics. These include the mean misuse detection time, the mean rule attribution time, and several
cache-related metrics such as the number of hits and misses and the current cache load.
- class logprep.processor.amides.processor.Amides.Config
Amides processor configuration class.
- max_cache_entries: int
Maximum number of cached command lines and their rule attribution results.
- decision_threshold: float
Specifies the decision threshold of the misuse detector to adjust its overall classification performance.
- num_rule_attributions: int
Number of rule attributions returned in case of a positive misuse detection result.
- models_path: str
Path or URI of the archive (.zip) containing the models used by the misuse detector and the rule attributor.
Security Best Practice - Processor - Amides Model
Ensure that you only use models from trusted sources, as unpickling a model can execute arbitrary Python code in the runtime.
- rules: list[str]
List of rule locations to load rules from. In addition to paths to file directories, it is possible to retrieve rules from a URI; for valid URI formats see Getters. As a last option, entire rules with all their configuration parameters can be defined as list elements.
- tree_config: str | None
Path to a JSON file with a valid Rule Tree Configuration. For the string format see Getters.
- apply_multiple_times: bool
Set if the processor should be applied multiple times. This enables further processing of an output with the same processor.
- type: str
Type of the component.
- health_timeout: float
Timeout in seconds for the health check. Defaults to 1 second.
Rule Configuration
This rule checks whether incoming documents are of a type
suitable for classification by the Amides processor. The specified
source_field should contain command line strings. In case of a
positive detection result, rule attributions are written into
the target_field.
The following example shows a complete rule:
filter: 'some_field: "sample_cmdline"'
amides:
  source_fields: ["process.command_line"]
  target_field: "rule_attributions"
description: Sample rule for AMIDES processor.
- class logprep.processor.amides.rule.AmidesRule.Config
Config of AmidesRule to specify source fields of command lines and target field of rule attribution results.
- delete_source_fields: bool
Whether to delete all the source fields or not. Defaults to False.
- description: str
A description for the rule. This has only documentation character.
- id: str | int | None
A uuid for the rule. Is generated by logprep.
- ignore_missing_fields: bool
If set to True, missing fields will be ignored, no warning is logged and the event is not tagged with the failure tag. Defaults to False.
- mapping: dict
A key-value mapping from source fields to target fields. Can be used to copy/move multiple fields at once. If you want to move fields, set delete_source_fields to true. Works independently of source_fields and target_field.
- merge_with_target: bool
If the target field exists and is a list, the list will be extended with the values of the source fields. If the source field is a list, the lists will be merged by appending the source field's list to the target list. If the source field is a dict, it will be merged with the target dict; if source keys already exist in the target dict, their values will be overwritten, so this is not a deep merge. If the target field does not exist, a new field will be added with the source field value as list or dict. Defaults to False.
- overwrite_target: bool
Overwrite the target field value if it exists. Defaults to False.
- regex_fields: list
It is possible to use regex expressions to match values. For this, the field name with the regex pattern in the rule filter must be added to the optional field regex_fields in the rule definition.
- source_fields: list
The fields from which to get the values to be processed; requires target_field.
- tag_on_failure: list
A list of tags which will be appended to the event on non-critical errors. Defaults to ["_<rule_type>_failure"]. Is currently only used by the Dissector and FieldManager.
- target_field: str
The field to write the processed values to. Can be used to move/copy single values, merge multiple values into one list, or extend a list. Requires source_field.
- tests: List[Dict[str, str]]
Custom tests for this rule.
Calculator
The Calculator evaluates arithmetic expressions, optionally using values from event fields.
Processor Configuration
- calculatorname:
    type: calculator
    rules:
      - tests/testdata/rules/rules
- class logprep.processor.calculator.processor.Calculator.Config
Common Configurations
- rules: list[str]
List of rule locations to load rules from. In addition to paths to file directories, it is possible to retrieve rules from a URI; for valid URI formats see Getters. As a last option, entire rules with all their configuration parameters can be defined as list elements.
- tree_config: str | None
Path to a JSON file with a valid Rule Tree Configuration. For the string format see Getters.
- apply_multiple_times: bool
Set if the processor should be applied multiple times. This enables further processing of an output with the same processor.
- type: str
Type of the component.
- health_timeout: float
Timeout in seconds for the health check. Defaults to 1 second.
Rule Configuration
A speaking example:
filter: 'duration'
calculator:
  target_field: duration
  calc: ${duration} * 10e5
  overwrite_target: True
description: '...'
Input: {"duration": "0.01"}
Output: {"duration": 10000.0}
- class logprep.processor.calculator.rule.CalculatorRule.Config
Config for Calculator
- calc: str
The calculation expression. Fields from the event can be used by surrounding them with ${ and }.
- timeout: int
The maximum time in seconds for the calculation. Defaults to 1.
- delete_source_fields: bool
Whether to delete all the source fields or not. Defaults to False.
- description: str
A description for the rule. This has only documentation character.
- id: str | int | None
A uuid for the rule. Is generated by logprep.
- ignore_missing_fields: bool
If set to True, missing fields will be ignored, no warning is logged and the event is not tagged with a failure tag. As soon as one field is missing, no calculation is performed at all. Defaults to False.
- mapping: dict
A key-value mapping from source fields to target fields. Can be used to copy/move multiple fields at once. If you want to move fields, set delete_source_fields to true. Works independently of source_fields and target_field.
- merge_with_target: bool
If the target field exists and is a list, the list will be extended with the values of the source fields.
- overwrite_target: bool
Overwrite the target field value if it exists. Defaults to False.
- regex_fields: list
It is possible to use regex expressions to match values. For this, the field name with the regex pattern in the rule filter must be added to the optional field regex_fields in the rule definition.
- source_fields: list
The fields from which to get the values to be processed; requires target_field.
- tag_on_failure: list
A list of tags which will be appended to the event on non-critical errors. Defaults to ["_<rule_type>_failure"]. Is currently only used by the Dissector and FieldManager.
- target_field: str
The field to write the processed values to. Can be used to move/copy single values, merge multiple values into one list, or extend a list. Requires source_field.
- tests: List[Dict[str, str]]
Custom tests for this rule.
The following is a list of example calculation expressions, where all factors and operators can be
retrieved from a field with the schema ${your.dotted.field}:

9 => 9
-9 => -9
--9 => 9
-E => -math.e
9 + 3 + 6 => 9 + 3 + 6
9 + 3 / 11 => 9 + 3.0 / 11
(9 + 3) => (9 + 3)
(9+3) / 11 => (9 + 3.0) / 11
9 - 12 - 6 => 9 - 12 - 6
9 - (12 - 6) => 9 - (12 - 6)
2*3.14159 => 2 * 3.14159
3.1415926535*3.1415926535 / 10 => 3.1415926535 * 3.1415926535 / 10
PI * PI / 10 => math.pi * math.pi / 10
PI*PI/10 => math.pi * math.pi / 10
PI^2 => math.pi ** 2
round(PI^2) => round(math.pi ** 2)
6.02E23 * 8.048 => 6.02e23 * 8.048
e / 3 => math.e / 3
sin(PI/2) => math.sin(math.pi / 2)
10+sin(PI/4)^2 => 10 + math.sin(math.pi / 4) ** 2
trunc(E) => int(math.e)
trunc(-E) => int(-math.e)
round(E) => round(math.e)
round(-E) => round(-math.e)
E^PI => math.e ** math.pi
exp(0) => 1
exp(1) => math.e
2^3^2 => 2 ** 3 ** 2
(2^3)^2 => (2 ** 3) ** 2
2^3+2 => 2 ** 3 + 2
2^3+5 => 2 ** 3 + 5
2^9 => 2 ** 9
sgn(-2) => -1
sgn(0) => 0
sgn(0.1) => 1
round(E, 3) => round(math.e, 3)
round(PI^2, 3) => round(math.pi ** 2, 3)
sgn(cos(PI/4)) => 1
sgn(cos(PI/2)) => 0
sgn(cos(PI*3/4)) => -1
+(sgn(cos(PI/4))) => 1
-(sgn(cos(PI/4))) => -1
hypot(3, 4) => 5
multiply(3, 7) => 21
all(1,1,1) => True
all(1,1,1,1,1,0) => False
The calc expression is not whitespace sensitive.
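A toy re-implementation of the substitution-and-evaluation step might look like this. It uses Python's eval over the math module purely for illustration; the actual processor uses a restricted expression parser, and only the `^` to `**` translation from the table above is sketched here.

```python
import math
import re

def calculate(event: dict, expr: str):
    """Resolve ${dotted.field} references from the event, then evaluate
    the arithmetic. Illustrative only: eval() is not what the real
    calculator uses."""
    def resolve(match: re.Match) -> str:
        value = event
        for part in match.group(1).split("."):
            value = value[part]  # walk the dotted field path
        return str(value)

    substituted = re.sub(r"\$\{([^}]+)\}", resolve, expr)
    substituted = substituted.replace("^", "**")
    # Evaluate with the math module as the only available namespace.
    return eval(substituted, {"__builtins__": {}}, vars(math))

print(calculate({"duration": 2}, "${duration} * 10e5"))   # 2000000.0
print(calculate({"base": {"exp": 3}}, "2^${base.exp}"))   # 8
```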
Clusterer
The log clustering is mainly developed for syslog messages and other unstructured and semi-structured logs. The clusterer calculates a log signature based on the message field. The log signature is calculated with heuristic and deterministic rules. The idea of a log signature is to extract a subset of the constant parts of a log and to delete the dynamic parts. If the fields syslog.facility and event.severity are in the log, then they are prefixed to the log signature.
Logs are only clustered if at least one of the following criteria is fulfilled:
Criteria 1: { "message": "A sample message", "tags": ["clusterable", ...], ... }
Criteria 2: { "message": "A sample message", "clusterable": true, ... }
Criteria 3: { "message": "A sample message", "syslog": { "facility": <number> }, "event": { "severity": <string> }, ... }
Processor Configuration
- clusterername:
    type: clusterer
    rules:
      - tests/testdata/rules/rules
    output_field_name: target_field
- class logprep.processor.clusterer.processor.Clusterer.Config
Clusterer Configuration
- output_field_name: str
Defines the field in which the results of the clustering are stored.
- rules: list[str]
List of rule locations to load rules from. In addition to paths to file directories, it is possible to retrieve rules from a URI; for valid URI formats see Getters. As a last option, entire rules with all their configuration parameters can be defined as list elements.
- tree_config: str | None
Path to a JSON file with a valid Rule Tree Configuration. For the string format see Getters.
- apply_multiple_times: bool
Set if the processor should be applied multiple times. This enables further processing of an output with the same processor.
- type: str
Type of the component.
- health_timeout: float
Timeout in seconds for the health check. Defaults to 1 second.
Rule Configuration
Rules of the clusterer are evaluated in alphanumerical order. Some rules only make sense if they are performed in sequence with other rules. The clusterer matches multiple rules at once and applies them all before creating a clustering signature. Therefore, it is recommended to prefix rules with numbers, e.g. 00_01_*. Here the first two digits represent a group of rules that belong together and the last two digits represent the order of rules within that group.
A subset of terms from the source field can be extracted into the clustering-signature field defined in the clusterer configuration.
Since clusterer rules must be used in sequence, it makes no sense to perform regular
auto-tests on them.
Thus, every rule can have a field tests containing signature calculation tests.
It can contain one test or a list of tests.
Each test consists of the fields tests.raw and tests.result.
tests.raw is the input, which would usually be the message.
tests.result is the expected result.
filter: ...
clusterer: ...
tests:
  raw: 'Some message'
  result: 'Some changed message'

filter: ...
clusterer: ...
tests:
  - raw: 'Some message'
    result: 'Some changed message'
  - raw: 'Another message'
    result: 'Another changed message'
In the following rule example the word byte is stemmed.
filter: message
clusterer:
  target: message
  pattern: '(bytes|Bytes|Byte)'
  repl: 'byte'
description: '...'
tests:
  raw: 'Byte is a Bytes is a bytes is a byte'
  result: 'byte is a byte is a byte is a byte'
In the following rule example the word baz is removed.
filter: message
clusterer:
  target: message
  pattern: 'foo (bar) baz'
  repl: ''
description: '...'
tests:
  raw: 'foo bar baz'
  result: 'foo baz'
In the following rule example the word baz is surrounded by extraction tags.
filter: message
clusterer:
  target: message
  pattern: 'foo (bar) baz'
  repl: '<+></+>'
description: '...'
tests:
  raw: 'foo bar baz'
  result: 'foo <+>bar</+> baz'
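The pattern/repl mechanics and the final signature extraction can be sketched as follows. apply_rule and extract_signature are illustrative helpers, not the clusterer's actual API; the `<+></+>` handling mirrors the rule examples above.

```python
import re

def apply_rule(message: str, pattern: str, repl: str) -> str:
    """Substitute the first capture group of each match according to repl;
    the special repl '<+></+>' wraps the captured text in extraction tags."""
    regex = re.compile(pattern)

    def substitute(match: re.Match) -> str:
        new_group = "<+>" + match.group(1) + "</+>" if repl == "<+></+>" else repl
        start, end = match.span(1)
        offset = match.start()
        whole = match.group(0)
        # Rebuild the match with only the capture-group span replaced.
        return whole[: start - offset] + new_group + whole[end - offset :]

    return regex.sub(substitute, message)

def extract_signature(message: str) -> str:
    """Collect all terms wrapped in <+></+> tags into the signature."""
    return " ".join(re.findall(r"<\+>(.*?)</\+>", message))

# Two extraction rules applied in sequence, then signature calculation.
msg = "user admin logged in from 10.0.0.5"
msg = apply_rule(msg, r"(user) \S+", "<+></+>")
msg = apply_rule(msg, r"(logged in)", "<+></+>")
print(extract_signature(msg))  # user logged in
```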
- class logprep.processor.clusterer.rule.ClustererRule.Config
RuleConfig for Clusterer
- pattern: Pattern
Defines the regex pattern that will be matched on the clusterer.source_fields.
- repl: str
Anything within a capture group in clusterer.pattern will be substituted with values defined in clusterer.repl. The clusterer will only extract terms into a signature that are surrounded by the tags <+></+>. One could first use rules to remove common terms, then rules to perform stemming, and finally rules to wrap terms in <+></+> to create a signature. For example:
- Setting clusterer.repl: '' would remove anything within a capture group.
- Setting clusterer.repl: 'FOO' would replace anything within a capture group with FOO.
- Setting clusterer.repl: '<+></+>' would surround anything within a capture group with <+></+>.
- delete_source_fields: bool
Whether to delete all the source fields or not. Defaults to False.
- description: str
A description for the rule. This has only documentation character.
- id: str | int | None
A uuid for the rule. Is generated by logprep.
- ignore_missing_fields: bool
If set to True, missing fields will be ignored, no warning is logged and the event is not tagged with the failure tag. Defaults to False.
- mapping: dict
A key-value mapping from source fields to target fields. Can be used to copy/move multiple fields at once. If you want to move fields, set delete_source_fields to true. Works independently of source_fields and target_field.
- merge_with_target: bool
If the target field exists and is a list, the list will be extended with the values of the source fields. If the source field is a list, the lists will be merged by appending the source field's list to the target list. If the source field is a dict, it will be merged with the target dict; if source keys already exist in the target dict, their values will be overwritten, so this is not a deep merge. If the target field does not exist, a new field will be added with the source field value as list or dict. Defaults to False.
- overwrite_target: bool
Overwrite the target field value if it exists. Defaults to True.
- regex_fields: list
It is possible to use regex expressions to match values. For this, the field name with the regex pattern in the rule filter must be added to the optional field regex_fields in the rule definition.
- source_fields: list
The field from which to get the value which should be clustered.
- tag_on_failure: list
A list of tags which will be appended to the event on non-critical errors. Defaults to ["_<rule_type>_failure"]. Is currently only used by the Dissector and FieldManager.
- target_field: str
The field to write the processed values to. Can be used to move/copy single values, merge multiple values into one list, or extend a list. Requires source_field.
- tests: List[Dict[str, str]]
Custom tests for this rule.
Concatenator
The concatenator processor allows concatenating a list of source fields into one new target field. The concat separator and the target field can be specified. Furthermore, it is possible to directly delete all given source fields, or to overwrite the specified target field.
Processor Configuration
- Concatenatorname:
    type: concatenator
    rules:
      - tests/testdata/rules/rules
- class logprep.processor.concatenator.processor.Concatenator.Config
Common Configurations
- rules: list[str]
List of rule locations to load rules from. In addition to paths to file directories, it is possible to retrieve rules from a URI; for valid URI formats see Getters. As a last option, entire rules with all their configuration parameters can be defined as list elements.
- tree_config: str | None
Path to a JSON file with a valid Rule Tree Configuration. For the string format see Getters.
- apply_multiple_times: bool
Set if the processor should be applied multiple times. This enables further processing of an output with the same processor.
- type: str
Type of the component.
- health_timeout: float
Timeout in seconds for the health check. Defaults to 1 second.
Rule Configuration
A speaking example:
filter: 'date AND time'
concatenator:
  source_fields: ["date", "time"]
  target_field: timestamp
  separator: " "
  overwrite_target: True
  delete_source_fields: True
description: '...'
Input: {"date": "01.01.1007", "time": "13:07"}
Output: {"timestamp": "01.01.1007 13:07"}
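Conceptually, the rule above performs the following operation. This is a sketch for top-level fields only; the real processor also resolves dotted field paths.

```python
def concatenate(event: dict, source_fields: list, target_field: str,
                separator: str, delete_source_fields: bool = False) -> None:
    """Join the values of the source fields with the separator and write
    the result to the target field (toy version of the concatenator)."""
    values = [str(event[field]) for field in source_fields if field in event]
    event[target_field] = separator.join(values)
    if delete_source_fields:
        for field in source_fields:
            event.pop(field, None)

event = {"date": "01.01.1007", "time": "13:07"}
concatenate(event, ["date", "time"], "timestamp", " ", delete_source_fields=True)
print(event)  # {'timestamp': '01.01.1007 13:07'}
```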
- class logprep.processor.concatenator.rule.ConcatenatorRule.Config
RuleConfig for Concatenator
- separator: str
The character(s) that should be used between the combined source field values.
- delete_source_fields: bool
Whether to delete all the source fields or not. Defaults to False.
- description: str
A description for the rule. This has only documentation character.
- id: str | int | None
A uuid for the rule. Is generated by logprep.
- ignore_missing_fields: bool
If set to True, missing fields will be ignored, no warning is logged and the event is not tagged with the failure tag. Defaults to False.
- mapping: dict
A key-value mapping from source fields to target fields. Can be used to copy/move multiple fields at once. If you want to move fields, set delete_source_fields to true. Works independently of source_fields and target_field.
- merge_with_target: bool
If the target field exists and is a list, the list will be extended with the values of the source fields. If the source field is a list, the lists will be merged by appending the source field's list to the target list. If the source field is a dict, it will be merged with the target dict; if source keys already exist in the target dict, their values will be overwritten, so this is not a deep merge. If the target field does not exist, a new field will be added with the source field value as list or dict. Defaults to False.
- overwrite_target: bool
Overwrite the target field value if it exists. Defaults to False.
- regex_fields: list
It is possible to use regex expressions to match values. For this, the field name with the regex pattern in the rule filter must be added to the optional field regex_fields in the rule definition.
- source_fields: list
The source fields that should be concatenated; can contain dotted field paths.
- tag_on_failure: list
A list of tags which will be appended to the event on non-critical errors. Defaults to ["_<rule_type>_failure"]. Is currently only used by the Dissector and FieldManager.
- target_field: str
The field to write the processed values to. Can be used to move/copy single values, merge multiple values into one list, or extend a list. Requires source_field.
- tests: List[Dict[str, str]]
Custom tests for this rule.
DatetimeExtractor
The datetime_extractor is a processor that extracts timestamps from a field and splits them into their parts.
Processor Configuration
- datetimeextractorname:
    type: datetime_extractor
    rules:
      - tests/testdata/rules/rules
- class logprep.processor.datetime_extractor.processor.DatetimeExtractor.Config
Common Configurations
- rules: list[str]
List of rule locations to load rules from. In addition to paths to file directories, it is possible to retrieve rules from a URI; for valid URI formats see Getters. As a last option, entire rules with all their configuration parameters can be defined as list elements.
- tree_config: str | None
Path to a JSON file with a valid Rule Tree Configuration. For the string format see Getters.
- apply_multiple_times: bool
Set if the processor should be applied multiple times. This enables further processing of an output with the same processor.
- type: str
Type of the component.
- health_timeout: float
Timeout in seconds for the health check. Defaults to 1 second.
Rule Configuration
The datetime extractor requires the additional field datetime_extractor.
The additional fields datetime_extractor.source_fields and
datetime_extractor.target_field must be defined.
The former contains the name of the field from which the timestamp should be taken,
and the latter contains the name of the field under which the split timestamp should be written.
In the following example the timestamp will be extracted from
@timestamp and written to split_@timestamp.
filter: '@timestamp'
datetime_extractor:
  source_fields: ['@timestamp']
  target_field: 'split_@timestamp'
description: '...'
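The splitting itself can be sketched with the standard library. The output keys shown here are illustrative, not necessarily the processor's exact field names.

```python
from datetime import datetime

def split_timestamp(value: str) -> dict:
    """Split an ISO-8601 timestamp into its parts, similar in spirit to
    what the datetime_extractor writes into its target field."""
    parsed = datetime.fromisoformat(value)
    return {
        "year": parsed.year,
        "month": parsed.month,
        "day": parsed.day,
        "hour": parsed.hour,
        "minute": parsed.minute,
        "second": parsed.second,
        "weekday": parsed.strftime("%A"),
    }

print(split_timestamp("2019-08-02T09:46:18"))
```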
- class logprep.processor.datetime_extractor.rule.DatetimeExtractorRule.Config
Config for DatetimeExtractorRule
- delete_source_fields: bool
Whether to delete all the source fields or not. Defaults to False.
- description: str
A description for the rule. This has only documentation character.
- id: str | int | None
A uuid for the rule. Is generated by logprep.
- ignore_missing_fields: bool
If set to True, missing fields will be ignored, no warning is logged and the event is not tagged with the failure tag. Defaults to False.
- mapping: dict
A key-value mapping from source fields to target fields. Can be used to copy/move multiple fields at once. If you want to move fields, set delete_source_fields to true. Works independently of source_fields and target_field.
- merge_with_target: bool
If the target field exists and is a list, the list will be extended with the values of the source fields. If the source field is a list, the lists will be merged by appending the source field's list to the target list. If the source field is a dict, it will be merged with the target dict; if source keys already exist in the target dict, their values will be overwritten, so this is not a deep merge. If the target field does not exist, a new field will be added with the source field value as list or dict. Defaults to False.
- overwrite_target: bool
Overwrite the target field value if it exists. Defaults to False.
- regex_fields: list
It is possible to use regex expressions to match values. For this, the field name with the regex pattern in the rule filter must be added to the optional field regex_fields in the rule definition.
- source_fields: list
The fields from which to get the values to be processed.
- tag_on_failure: list
A list of tags which will be appended to the event on non-critical errors. Defaults to ["_<rule_type>_failure"]. Is currently only used by the Dissector and FieldManager.
- target_field: str
The field to write the processed values to.
- tests: List[Dict[str, str]]
Custom tests for this rule.
Deleter
The deleter is a processor that removes an entire event from further pipeline processing.
Processor Configuration
- deletename:
    type: deleter
    rules:
      - tests/testdata/rules/rules
- class logprep.processor.deleter.processor.Deleter.Config
Common Configurations
- rules: list[str]
List of rule locations to load rules from. In addition to paths to file directories, it is possible to retrieve rules from a URI; for valid URI formats see Getters. As a last option, entire rules with all their configuration parameters can be defined as list elements.
- tree_config: str | None
Path to a JSON file with a valid Rule Tree Configuration. For the string format see Getters.
- apply_multiple_times: bool
Set if the processor should be applied multiple times. This enables further processing of an output with the same processor.
- type: str
Type of the component.
- health_timeout: float
Timeout in seconds for the health check. Defaults to 1 second.
Rule Configuration
The example below deletes the log message if the message field equals “foo”.
filter: 'message: "foo"'
deleter:
  delete: true
description: '...'
- class logprep.processor.deleter.rule.DeleterRule.Config
Config for DeleterRule
- delete: bool
Whether to delete the event or not.
- target_field
- description: str
A description for the rule. This has only documentation character.
- id: str | int | None
A uuid for the rule. Is generated by logprep.
- regex_fields: list
It is possible to use regex expressions to match values. For this, the field name with the regex pattern in the rule filter must be added to the optional field regex_fields in the rule definition.
- tag_on_failure: list
A list of tags which will be appended to the event on non-critical errors. Defaults to ["_<rule_type>_failure"]. Is currently only used by the Dissector and FieldManager.
- tests: List[Dict[str, str]]
Custom tests for this rule.
Decoder
The decoder processor decodes or parses field values from the configured
source_format. The following options for source_format are implemented:
json
base64
nginx parser for kubernetes ingress
syslog_rfc3164
syslog_rfc3164_local
syslog_rfc5424
logfmt
cri
docker
decolorize (removing color codes in logs)
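Two of the simpler formats can be sketched like this. The DECODERS mapping and the decode_field helper are illustrative, not the processor's internal structure.

```python
import base64
import json

# Toy decoders for two of the listed source formats.
DECODERS = {
    "json": json.loads,
    "base64": lambda value: base64.b64decode(value).decode("utf-8"),
}

def decode_field(event: dict, source_field: str, target_field: str,
                 source_format: str) -> None:
    """Decode the source field with the chosen format and write the
    result to the target field."""
    event[target_field] = DECODERS[source_format](event[source_field])

event = {"message": '{"log": "user login failed"}'}
decode_field(event, "message", "parsed", "json")
print(event["parsed"])  # {'log': 'user login failed'}
```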
Processor Configuration
- samplename:
    type: decoder
    rules:
      - tests/testdata/rules/
- class logprep.processor.decoder.processor.Decoder.Config
Common Configurations
- rules: list[str]
List of rule locations to load rules from. In addition to paths to file directories, it is possible to retrieve rules from a URI; for valid URI formats see Getters. As a last option, entire rules with all their configuration parameters can be defined as list elements.
- tree_config: str | None
Path to a JSON file with a valid Rule Tree Configuration. For the string format see Getters.
- apply_multiple_times: bool
Set if the processor should be applied multiple times. This enables further processing of an output with the same processor.
- type: str
Type of the component.
- health_timeout: float
Timeout in seconds for the health check. Defaults to 1 second.
Decoder
With the decoder processor you can parse fields from
different formats.
A speaking example:
filter: message
decoder:
  source_format: json
  mapping:
    message: parsed
description: 'parse message field to the field called parsed'
Input:
{
  "message": "{\"timestamp\": \"2019-08-02T09:46:18.625Z\", \"log\": \"user login failed\"}"
}

Output:
{
  "message": "{\"timestamp\": \"2019-08-02T09:46:18.625Z\", \"log\": \"user login failed\"}",
  "parsed": {
    "timestamp": "2019-08-02T09:46:18.625Z",
    "log": "user login failed"
  }
}
- class logprep.processor.decoder.rule.DecoderRule.Config
Config for DecoderRule
- source_format: str
The source format in the source field. Defaults to
jsonPossible values arejson,base64,clf,nginx,syslog_rfc5424,syslog_rfc3164,syslog_rfc3164_local,logfmt,cri,docker,decolorize
- delete_source_fields: bool
Whether to delete all the source fields or not. Defaults to
False
- description: str
A description for the Rule. This has only documentation character.
- id: str | int | None
A uuid for the rule. Is generated by logprep.
- ignore_missing_fields: bool
If set to
Truemissing fields will be ignored, no warning is logged and the event is not tagged with the failure tag. Defaults toFalse
- mapping: dict
A key-value mapping from source fields to target fields. Can be used to copy/move multiple fields at once. If you want to move fields set
delete_source_fieldsto true. Works independent ofsource_fieldsandtarget_field.
- merge_with_target: bool
If the target field exists and is a list, the list will be extended with the values of the source fields. If the source field is a list, the lists will be merged by appending the source fields list to the target list. If the source field is a dict, the dict will be merged with the target dict. If the source keys exist in the target dict, the values will be overwritten. So this is not e deep merge. If the target field does not exist, a new field will be added with the source field value as list or dict. Defaults to
False.
- overwrite_target: bool
Overwrite the target field value if exists. Defaults to
False
- regex_fields: list
It is possible to use regex expressions to match values. For this, the field name with the regex pattern in the rule filter must be added to the optional field
regex_fieldsin the rule definition.
- source_fields: list
The field to decode as list with maximum one element. For multi field operations use
mappinginstead.
- tag_on_failure: list
A list of tags which will be appended to the event on non critical errors, defaults to
["_<rule_type>_failure"]. Is currently only used by the Dissector and FieldManager.
- target_field: str
The field where the processed values are written to. Can be used to move/copy single values, merge multiple values into one list, or extend a list. Requires source_fields.
- tests: List[Dict[str, str]]
Custom tests for this rule.
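Put together in a rule file, a decoder rule using these options might look like the following YAML sketch (the field names and rule file content are illustrative, modeled on the dict examples below):

```yaml
filter: message
decoder:
  source_fields: [message]
  target_field: parsed
  source_format: syslog_rfc3164
  delete_source_fields: false
description: decode a syslog line into the parsed field
```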
Examples for decoder:
decodes_simple_json_to_target_field:
rule:
{'filter': 'message', 'decoder': {'source_fields': ['message'], 'target_field': 'new_field'}}message:
{'message': '{"to_decode": "decode value"}'}processed:
{'message': '{"to_decode": "decode value"}', 'new_field': {'to_decode': 'decode value'}}
decodes_simple_json_to_target_field_dotted:
rule:
{'filter': 'message', 'decoder': {'source_fields': ['message'], 'target_field': 'new_field'}}message:
{'message': '{"to.decode": "decode value"}'}processed:
{'message': '{"to.decode": "decode value"}', 'new_field': {'to.decode': 'decode value'}}
decodes_json_with_mapping_to_corresponding_target_fields:
rule:
{'filter': 'json_message OR escaped_message', 'decoder': {'mapping': {'json_message': 'json_field', 'escaped_message': 'escaped_field'}}}message:
{'escaped_message': '{"to_decode": "decode value"}', 'json_message': '{"json_decode": "json_value"}'}processed:
{'escaped_message': '{"to_decode": "decode value"}', 'json_message': '{"json_decode": "json_value"}', 'json_field': {'json_decode': 'json_value'}, 'escaped_field': {'to_decode': 'decode value'}}
decodes_json_with_mapping_to_corresponding_target_fields_dotted:
rule:
{'filter': 'json\.message OR escaped\.message', 'decoder': {'mapping': {'json\.message': 'json\.field', 'escaped\.message': 'escaped\.field'}}}message:
{'escaped.message': '{"to.decode": "decode value"}', 'json.message': '{"json.decode": "json.value"}'}processed:
{'escaped.message': '{"to.decode": "decode value"}', 'json.message': '{"json.decode": "json.value"}', 'json.field': {'json.decode': 'json.value'}, 'escaped.field': {'to.decode': 'decode value'}}
decodes_json_with_mapping_to_corresponding_target_fields_dotted_and_backslashes:
rule:
{'filter': 'json\.message\\ OR escaped\.message\\', 'decoder': {'mapping': {'json\.message\\': 'json\.field\\', 'escaped\.message\\': 'escaped\.field\\'}}}message:
{'escaped.message\': '{"to.decode": "decode value"}', 'json.message\': '{"json.decode": "json.value"}'}processed:
{'escaped.message\': '{"to.decode": "decode value"}', 'json.message\': '{"json.decode": "json.value"}', 'json.field\': {'json.decode': 'json.value'}, 'escaped.field\': {'to.decode': 'decode value'}}
decodes_simple_base64:
rule:
{'filter': 'message', 'decoder': {'source_fields': ['message'], 'target_field': 'new_field', 'source_format': 'base64'}}message:
{'message': 'dGhpcyxpcyx0aGUsbWVzc2FnZQ=='}processed:
{'message': 'dGhpcyxpcyx0aGUsbWVzc2FnZQ==', 'new_field': 'this,is,the,message'}
decodes_simple_base64_and_removes_source_field:
rule:
{'filter': 'message', 'decoder': {'source_fields': ['message'], 'target_field': 'new_field', 'source_format': 'base64', 'delete_source_fields': True}}message:
{'message': 'dGhpcyxpcyx0aGUsbWVzc2FnZQ=='}processed:
{'new_field': 'this,is,the,message'}
decodes_simple_base64_and_removes_source_fields_with_mapping:
rule:
{'filter': 'message1', 'decoder': {'mapping': {'message1': 'new_field1', 'message2': 'new_field2'}, 'source_format': 'base64', 'delete_source_fields': True}}message:
{'message1': 'dGhpcyxpcyx0aGUsbWVzc2FnZQ==', 'message2': 'dGhpcyxpcyx0aGUsbWVzc2FnZQ=='}processed:
{'new_field1': 'this,is,the,message', 'new_field2': 'this,is,the,message'}
decodes_simple_base64_and_overwrites_source_fields:
rule:
{'filter': 'message1', 'decoder': {'mapping': {'message1': 'message1', 'message2': 'message2'}, 'source_format': 'base64', 'overwrite_target': True}}message:
{'message1': 'dGhpcyxpcyx0aGUsbWVzc2FnZQ==', 'message2': 'dGhpcyxpcyx0aGUsbWVzc2FnZQ=='}processed:
{'message1': 'this,is,the,message', 'message2': 'this,is,the,message'}
parse clf:
rule:
{'filter': 'message', 'decoder': {'mapping': {'message': 'parsed'}, 'source_format': 'clf', 'overwrite_target': True}}message:
{'message': '127.0.0.1 ident alice [01/May/2025:07:20:10 +0000] "GET /index.html HTTP/1.1" 200 9481'}processed:
{'message': '127.0.0.1 ident alice [01/May/2025:07:20:10 +0000] "GET /index.html HTTP/1.1" 200 9481', 'parsed': {'host': '127.0.0.1', 'ident': 'ident', 'authuser': 'alice', 'timestamp': '01/May/2025:07:20:10 +0000', 'request_line': 'GET /index.html HTTP/1.1', 'status': '200', 'bytes': '9481'}}
parse nginx:
rule:
{'filter': 'message', 'decoder': {'mapping': {'message': 'parsed'}, 'source_format': 'nginx', 'overwrite_target': True}}message:
{'message': '192.168.32.9 - - [19/Dec/2023:14:04:42 +0000] 200 "POST /otlp/v1/metrics HTTP/1.1" 0 "-" "OpenTelemetry Collector Contrib/0.132.0 (linux/amd64)" "-"'}processed:
{'message': '192.168.32.9 - - [19/Dec/2023:14:04:42 +0000] 200 "POST /otlp/v1/metrics HTTP/1.1" 0 "-" "OpenTelemetry Collector Contrib/0.132.0 (linux/amd64)" "-"', 'parsed': {'agent': 'OpenTelemetry Collector Contrib/0.132.0 (linux/amd64)', 'code': '200', 'gzip_ratio': '-', 'host': '192.168.32.9', 'method': 'POST', 'path': '/otlp/v1/metrics', 'referer': '-', 'size': '0', 'time': '19/Dec/2023:14:04:42 +0000', 'user': '-'}}
parse nginx health check:
rule:
{'filter': 'message', 'decoder': {'mapping': {'message': 'parsed'}, 'source_format': 'nginx', 'overwrite_target': True}}message:
{'message': '192.168.16.37 - - [19/Dec/2023:14:04:39 +0000] 200 "GET / HTTP/1.1" 2 "-" "kube-probe/1.32+" "-"'}processed:
{'message': '192.168.16.37 - - [19/Dec/2023:14:04:39 +0000] 200 "GET / HTTP/1.1" 2 "-" "kube-probe/1.32+" "-"', 'parsed': {'agent': 'kube-probe/1.32+', 'code': '200', 'gzip_ratio': '-', 'host': '192.168.16.37', 'method': 'GET', 'path': '/', 'referer': '-', 'size': '2', 'time': '19/Dec/2023:14:04:39 +0000', 'user': '-'}}
parse nginx opentelemetry:
rule:
{'filter': 'message', 'decoder': {'mapping': {'message': 'parsed'}, 'source_format': 'nginx', 'overwrite_target': True}}message:
{'message': '192.168.42.31 - boat-cmb-write [19/Dec/2024:14:04:33 +0000] "POST /v1/metrics HTTP/1.1" 200 2 "-" "OpenTelemetry Collector for Kubernetes/0.134.0 (linux/amd64)"'}processed:
{'message': '192.168.42.31 - boat-cmb-write [19/Dec/2024:14:04:33 +0000] "POST /v1/metrics HTTP/1.1" 200 2 "-" "OpenTelemetry Collector for Kubernetes/0.134.0 (linux/amd64)"', 'parsed': {'agent': 'OpenTelemetry Collector for Kubernetes/0.134.0 (linux/amd64)', 'code': '200', 'host': '192.168.42.31', 'method': 'POST', 'path': '/v1/metrics', 'referer': '-', 'size': '2', 'time': '19/Dec/2024:14:04:33 +0000', 'user': 'boat-cmb-write'}}
parse nginx opentelemetry 2:
rule:
{'filter': 'message', 'decoder': {'mapping': {'message': 'parsed'}, 'source_format': 'nginx', 'overwrite_target': True}}message:
{'message': '192.168.32.9 - - [19/Dec/2023:14:04:32 +0000] 400 "POST /otlp/v1/metrics HTTP/1.1" 462 "-" "OpenTelemetry Collector Contrib/0.132.0 (linux/amd64)" "-"'}processed:
{'message': '192.168.32.9 - - [19/Dec/2023:14:04:32 +0000] 400 "POST /otlp/v1/metrics HTTP/1.1" 462 "-" "OpenTelemetry Collector Contrib/0.132.0 (linux/amd64)" "-"', 'parsed': {'agent': 'OpenTelemetry Collector Contrib/0.132.0 (linux/amd64)', 'code': '400', 'host': '192.168.32.9', 'method': 'POST', 'path': '/otlp/v1/metrics', 'gzip_ratio': '-', 'referer': '-', 'size': '462', 'time': '19/Dec/2023:14:04:32 +0000', 'user': '-'}}
parse syslog rfc 3164:
rule:
{'filter': 'message', 'decoder': {'mapping': {'message': 'parsed'}, 'source_format': 'syslog_rfc3164', 'overwrite_target': True}}message:
{'message': "<34>Oct 3 10:15:32 mymachine su[12345]: 'su root' failed for user on /dev/pts/0"}processed:
{'message': "<34>Oct 3 10:15:32 mymachine su[12345]: 'su root' failed for user on /dev/pts/0", 'parsed': {'host': 'mymachine', 'ident': 'su', 'message': "'su root' failed for user on /dev/pts/0", 'pid': '12345', 'pri': '34', 'time': 'Oct 3 10:15:32'}}
parse syslog rfc 3164 local:
rule:
{'filter': 'message', 'decoder': {'mapping': {'message': 'parsed'}, 'source_format': 'syslog_rfc3164_local', 'overwrite_target': True}}message:
{'message': "<34>Oct 3 10:15:32 su[12345]: 'su root' failed for user on /dev/pts/0"}processed:
{'message': "<34>Oct 3 10:15:32 su[12345]: 'su root' failed for user on /dev/pts/0", 'parsed': {'ident': 'su', 'message': "'su root' failed for user on /dev/pts/0", 'pid': '12345', 'pri': '34', 'time': 'Oct 3 10:15:32'}}
parse syslog rfc 5424:
rule:
{'filter': 'message', 'decoder': {'mapping': {'message': 'parsed'}, 'source_format': 'syslog_rfc5424', 'overwrite_target': True}}message:
{'message': "<34>1 2025-01-03T14:07:15.003Z mymachine.example.com su 12345 ID47 - 'su root' failed for user on /dev/pts/0"}processed:
{'message': "<34>1 2025-01-03T14:07:15.003Z mymachine.example.com su 12345 ID47 - 'su root' failed for user on /dev/pts/0", 'parsed': {'host': 'mymachine.example.com', 'ident': 'su', 'pid': '12345', 'message': "'su root' failed for user on /dev/pts/0", 'pri': '34', 'time': '2025-01-03T14:07:15.003Z', 'msgid': 'ID47', 'extradata': '-'}}
parse logfmt:
rule:
{'filter': 'message', 'decoder': {'mapping': {'message': 'parsed'}, 'source_format': 'logfmt', 'overwrite_target': True}}message:
{'message': 'level=INFO host=Ubuntu msg="Connected to PostgreSQL database"'}processed:
{'message': 'level=INFO host=Ubuntu msg="Connected to PostgreSQL database"', 'parsed': {'host': 'Ubuntu', 'level': 'INFO', 'msg': 'Connected to PostgreSQL database'}}
parse more complex logfmt:
rule:
{'filter': 'message', 'decoder': {'mapping': {'message': 'parsed'}, 'source_format': 'logfmt', 'overwrite_target': True}}message:
{'message': 'time=2012-11-01T22:08:41+00:00 app=loki level=WARN duration=125 message="this is a log line" extra="user=foo"'}processed:
{'message': 'time=2012-11-01T22:08:41+00:00 app=loki level=WARN duration=125 message="this is a log line" extra="user=foo"', 'parsed': {'app': 'loki', 'duration': '125', 'extra': 'user=foo', 'level': 'WARN', 'message': 'this is a log line', 'time': '2012-11-01T22:08:41+00:00'}}
parse cri:
rule:
{'filter': 'message', 'decoder': {'mapping': {'message': 'parsed'}, 'source_format': 'cri', 'overwrite_target': True}}message:
{'message': '2019-04-30T02:12:41.8443515Z stdout F message'}processed:
{'message': '2019-04-30T02:12:41.8443515Z stdout F message', 'parsed': {'stream': 'stdout', 'flags': 'F', 'message': 'message', 'timestamp': '2019-04-30T02:12:41.8443515Z'}}
parse docker:
rule:
{'filter': 'message', 'decoder': {'mapping': {'message': 'parsed'}, 'source_format': 'docker', 'overwrite_target': True}}message:
{'message': '{"log":"log message","stream":"stderr","time":"2019-04-30T02:12:41.8443515Z"}'}processed:
{'message': '{"log":"log message","stream":"stderr","time":"2019-04-30T02:12:41.8443515Z"}', 'parsed': {'stream': 'stderr', 'output': 'log message', 'timestamp': '2019-04-30T02:12:41.8443515Z'}}
parse docker with additional fields:
rule:
{'filter': 'message', 'decoder': {'mapping': {'message': 'parsed'}, 'source_format': 'docker', 'overwrite_target': True}}message:
{'message': '{"log":"log message","stream":"stderr","time":"2019-04-30T02:12:41.8443515Z", "extra": "not expected field"}'}processed:
{'message': '{"log":"log message","stream":"stderr","time":"2019-04-30T02:12:41.8443515Z", "extra": "not expected field"}', 'parsed': {'stream': 'stderr', 'output': 'log message', 'timestamp': '2019-04-30T02:12:41.8443515Z'}}
decolorize simple:
rule:
{'filter': 'message', 'decoder': {'mapping': {'message': 'message'}, 'source_format': 'decolorize', 'overwrite_target': True}}message:
{'message': 'lsrnx1b[00mx1b[01;31mexamplefile.zipx1b[00mrnx1b[01;31m'}processed:
{'message': 'lsrnexamplefile.ziprn'}
decolorize log:
rule:
{'filter': 'message', 'decoder': {'mapping': {'message': 'message'}, 'source_format': 'decolorize', 'overwrite_target': True}}message:
{'message': '2021-07-14T03:23:44.315Z / x1b[32minfox1b[39m: Server started on port: 3000 - Environment rn'}processed:
{'message': '2021-07-14T03:23:44.315Z / info: Server started on port: 3000 - Environment rn'}
base64 double quote escape:
rule:
{'filter': 'message', 'decoder': {'mapping': {'message': 'message'}, 'source_format': 'base64', 'overwrite_target': True}}message:
{'message': 'dGhpcyBpcyBlc2NhcGVkIG9uIHdyb25nIHBsYWNlIiBhZnRlciBlc2NhcGUK'}processed:
{'message': 'this is escaped on wrong place" after escapen'}
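Conceptually, the base64 and json source formats behave like the following standard-library sketch (an illustration of the decoding step, not the processor's code):

```python
import base64
import json

# source_format: base64 -- decode the source field into the target field
event = {"message": "dGhpcyxpcyx0aGUsbWVzc2FnZQ=="}
event["new_field"] = base64.b64decode(event["message"]).decode("utf-8")
# event["new_field"] is now "this,is,the,message"

# source_format: json -- parse the source field as JSON
json_event = {"message": '{"to_decode": "decode value"}'}
json_event["new_field"] = json.loads(json_event["message"])
# json_event["new_field"] is now {"to_decode": "decode value"}
```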
Dissector
The dissector is a processor that tokenizes incoming strings using defined patterns. Its behavior is based on the Logstash dissect filter plugin and shares the same advantage: no regular expressions are used for event processing. Additionally, it can be used to convert the datatypes of given fields.
Processor Configuration
- dissectorname:
    type: dissector
    rules:
      - tests/testdata/rules/rules
- class logprep.processor.dissector.processor.Dissector.Config
Common Configurations
- rules: list[str]
List of rule locations to load rules from. In addition to file and directory paths, rules can be retrieved from a URI. For valid URI formats see Getters. As a last option, entire rules with all their configuration parameters can be defined directly as list elements.
- tree_config: str | None
Path to a JSON file with a valid Rule Tree Configuration. For string format see Getters.
- apply_multiple_times: bool
Set if the processor should be applied multiple times. This enables further processing of an output with the same processor.
- type: str
Type of the component
- health_timeout: float
Timeout in seconds for the health check. Defaults to 1 second.
Rule Configuration
A speaking example:
filter: message
dissector:
  mapping:
    message: "%{}of %{extracted.message_float} and an int of %{extracted.message_int}"
  convert_datatype:
    extracted.message_int: "int"
    extracted.message_float: "float"
description: '...'
Applied to the event

{"message": "This message has a float of 1.23 and an int of 1337"}

this rule yields the processed event

{
  "message": "This message has a float of 1.23 and an int of 1337",
  "extracted": {"message_float": 1.23, "message_int": 1337}
}
Dissect Pattern Language
The dissect pattern describes the textual format of the source field.
Given a dissect pattern of %{field1} %{field2} the source field value will be dissected into
everything before the first whitespace which would be written into the field field1 and everything
after the first whitespace which would be written into the field field2.
The string between %{ and } is the desired target field. This can be declared
in dotted field notation (e.g. %{target.subfield1.subfield2}). Every subfield between the
first and the last subfield will be created if necessary.
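Creating the missing subfields of a dotted target field can be sketched as follows (an illustration of the idea, not Logprep's implementation):

```python
def write_dotted(event, dotted_field, value):
    """Write value into event under a dotted target field like 'target.sub1.sub2'."""
    *parents, leaf = dotted_field.split(".")
    current = event
    for key in parents:
        current = current.setdefault(key, {})  # create missing subfields on the way
    current[leaf] = value
```

For example, write_dotted({}, "target.subfield1.subfield2", "value") on an empty event produces {"target": {"subfield1": {"subfield2": "value"}}}.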
By default, the target field will always be overwritten with the captured value. If you want to append to a preexisting target field value, as a string or list, you have to use the + operator. If you want to insert a prefix before the appended string, use the notation +( ); in this example a whitespace would be added before the extracted string. If you want to use the symbol ( or ) as your separator, you have to escape it with \ (e.g. +(\()).
If you want to remove unwanted padding characters around a dissected pattern, you have to use the -(<char>) notation, where <char> can be any character, similar to the +( ) notation. If, for example, you have a field like "[2022-11-04 10:00:00 AM ] - 127.0.0.1" and you want to extract the timestamp and the IP, you can use the dissect pattern [%{time-( )}] - %{ip} to remove the unwanted spaces after the 'AM'. This works independently of the number of spaces.
It is also possible to capture the target field name from the source field value with the notation
%{?<your name for the reference>} (e.g. %{?key1}). In the same dissection pattern
this can be referred to with the notation %{&<the reference>} (e.g. %{&key1}).
References can be combined with the append operator. For examples see below.
Additionally, an optional datatype conversion can be specified after the field name, using | as separator, to convert the value from string to int, float or bool. The conversion to bool is interpreted by meaning (e.g. yes is translated to True). When removing padding characters at the same time, the conversion has to come after the padding character (e.g. %{field2-(#)|bool}).
If you want to reorder parts of a dissection you can give the order by adding /<position> to
the dissect pattern. A valid example would be: %{time/1} %{+time/3} %{+time/2}. When
removing padding characters at the same time then the position has to come after the padding
character (e.g. %{time-(*)/2}).
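The core idea of the pattern language, splitting on the literal separators between %{...} sections instead of applying regular expressions to the event, can be sketched as follows. This is a deliberately reduced illustration that supports only plain %{field} and the skip notation %{}; the append, padding, reorder and conversion operators are omitted:

```python
import re

def dissect(pattern, text):
    # Split the pattern into alternating literals and field names:
    # "%{a} %{b}" -> literals ['', ' ', ''], fields ['a', 'b']
    parts = re.split(r"%\{([^}]*)\}", pattern)
    literals, fields = parts[0::2], parts[1::2]
    if not text.startswith(literals[0]):
        raise ValueError("pattern does not match")
    result = {}
    pos = len(literals[0])
    for field, following in zip(fields, literals[1:]):
        # A field's value is everything up to the next literal separator
        end = text.index(following, pos) if following else len(text)
        if field:  # %{} captures but discards the value
            result[field] = text[pos:end]
        pos = end + len(following)
    return result
```

For example, dissect("[%{time}] - %{ip}", "[10:00] - 127.0.0.1") returns {'time': '10:00', 'ip': '127.0.0.1'}.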
- class logprep.processor.dissector.rule.DissectorRule.Config
Config for Dissector
- convert_datatype: dict
A mapping from source field to desired datatype [optional]. Possible datatypes are float, int, bool, string.
- delete_source_fields: bool
Whether to delete all the source fields or not. Defaults to False.
- description: str
A description for the rule. It serves documentation purposes only.
- id: str | int | None
A UUID for the rule. If not set, it is generated by Logprep.
- ignore_missing_fields: bool
If set to True, missing fields will be ignored: no warning is logged and the event is not tagged with the failure tag. Defaults to False.
- mapping: dict
A mapping from source fields to a dissect pattern [optional]. Dotted field notation is possible in key and in the dissect pattern.
- merge_with_target: bool
If the target field exists and is a list, the list will be extended with the values of the source fields. If the source field is a list, the lists are merged by appending the source field's list to the target list. If the source field is a dict, it is merged with the target dict; keys that already exist in the target dict have their values overwritten, so this is not a deep merge. If the target field does not exist, a new field is added with the source field value as a list or dict. Defaults to False.
- overwrite_target: bool
Overwrite the target field value if it exists. Defaults to False.
- regex_fields: list
It is possible to use regex expressions to match values. For this, the field name with the regex pattern in the rule filter must be added to the optional field regex_fields in the rule definition.
- source_fields: list
The fields from which to get the values to be processed. Requires target_field.
- tag_on_failure: list
A list of tags which will be appended to the event on non-critical errors. Defaults to ["_<rule_type>_failure"]. Currently only used by the Dissector and FieldManager.
- target_field: str
The field where the processed values are written to. Can be used to move/copy single values, merge multiple values into one list, or extend a list. Requires source_fields.
- tests: List[Dict[str, str]]
Custom tests for this rule.
Examples for dissection and datatype conversion:
writes new fields with same separator:
rule:
{'filter': 'message', 'dissector': {'mapping': {'message': '%{field1} %{field2} %{field3} %{field4}'}}}message:
{'message': 'This is a message'}processed:
{'message': 'This is a message', 'field1': 'This', 'field2': 'is', 'field3': 'a', 'field4': 'message'}
writes new fields with different separator:
rule:
{'filter': 'message', 'dissector': {'mapping': {'message': '%{field1} %{field2}:%{field3} %{field4}'}}}message:
{'message': 'This is:a message'}processed:
{'message': 'This is:a message', 'field1': 'This', 'field2': 'is', 'field3': 'a', 'field4': 'message'}
writes new fields with long separator:
rule:
{'filter': 'message', 'dissector': {'mapping': {'message': '%{field1} is %{field3} %{field4}'}}}message:
{'message': 'This is a message'}processed:
{'message': 'This is a message', 'field1': 'This', 'field3': 'a', 'field4': 'message'}
writes new fields and appends to existing list:
rule:
{'filter': 'message', 'dissector': {'mapping': {'message': '%{field1} is %{field3} %{+field4}'}}}message:
{'message': 'This is a message', 'field4': ['preexisting']}processed:
{'message': 'This is a message', 'field1': 'This', 'field3': 'a', 'field4': ['preexisting', 'message']}
writes new fields and appends to existing empty list:
rule:
{'filter': 'message', 'dissector': {'mapping': {'message': '%{field1} is %{field3} %{+field4}'}}}message:
{'message': 'This is a message', 'field4': []}processed:
{'message': 'This is a message', 'field1': 'This', 'field3': 'a', 'field4': ['message']}
writes new fields and appends to existing string:
rule:
{'filter': 'message', 'dissector': {'mapping': {'message': '%{field1} is %{field3} %{+( )field4}'}}}message:
{'message': 'This is a message', 'field4': 'preexisting'}processed:
{'message': 'This is a message', 'field1': 'This', 'field3': 'a', 'field4': 'preexisting message'}
writes new dotted fields:
rule:
{'filter': 'message', 'dissector': {'mapping': {'message': '%{field1} %{my.new.field2} %{field3} %{+field4}'}}}message:
{'message': 'This is a message', 'field4': 'preexisting'}processed:
{'message': 'This is a message', 'field1': 'This', 'my': {'new': {'field2': 'is'}}, 'field3': 'a', 'field4': 'preexistingmessage'}
overwrites dotted fields:
rule:
{'filter': 'message', 'dissector': {'mapping': {'message': '%{field1} %{my.new.field2} %{field3} %{+( )field4}'}}}message:
{'message': 'This is a message', 'field4': 'preexisting', 'my': {'new': {'field2': 'preexisting'}}}processed:
{'message': 'This is a message', 'field1': 'This', 'my': {'new': {'field2': 'is'}}, 'field3': 'a', 'field4': 'preexisting message'}
appends to dotted fields preexisting string:
rule:
{'filter': 'message', 'dissector': {'mapping': {'message': '%{field1} %{+my.new.field2} %{field3} %{+( )field4}'}}}message:
{'message': 'This is a message', 'field4': 'preexisting', 'my': {'new': {'field2': 'preexisting'}}}processed:
{'message': 'This is a message', 'field1': 'This', 'my': {'new': {'field2': 'preexistingis'}}, 'field3': 'a', 'field4': 'preexisting message'}
appends to dotted fields preexisting list:
rule:
{'filter': 'message', 'dissector': {'mapping': {'message': '%{field1} %{+my.new.field2} %{field3} %{+( )field4}'}}}message:
{'message': 'This is a message', 'field4': 'preexisting', 'my': {'new': {'field2': ['preexisting']}}}processed:
{'message': 'This is a message', 'field1': 'This', 'my': {'new': {'field2': ['preexisting', 'is']}}, 'field3': 'a', 'field4': 'preexisting message'}
processes dotted source field:
rule:
{'filter': 'message', 'dissector': {'mapping': {'message.key1.key2': '%{field1} %{field2} %{field3} %{field4}'}}}message:
{'message': {'key1': {'key2': 'This is the message'}}}processed:
{'message': {'key1': {'key2': 'This is the message'}}, 'field1': 'This', 'field2': 'is', 'field3': 'the', 'field4': 'message'}
processes multiple mappings to different target fields:
rule:
{'filter': 'message', 'dissector': {'mapping': {'source1': '%{extracted.source1.key1} %{extracted.source1.key2} %{extracted.source1.key3}', 'source2': '%{extracted.source2.key1} %{extracted.source2.key2} %{extracted.source2.key3}'}}}message:
{'message': 'This message does not matter', 'source1': 'This is source1', 'source2': 'This is source2'}processed:
{'message': 'This message does not matter', 'source1': 'This is source1', 'source2': 'This is source2', 'extracted': {'source1': {'key1': 'This', 'key2': 'is', 'key3': 'source1'}, 'source2': {'key1': 'This', 'key2': 'is', 'key3': 'source2'}}}
processes multiple mappings to same target fields (overwrite):
rule:
{'filter': 'message', 'dissector': {'mapping': {'source1': '%{extracted.key1} %{extracted.key2} %{extracted.key3}', 'source2': '%{extracted.key1} %{extracted.key2} %{extracted.key3}'}}}message:
{'message': 'This message does not matter', 'source1': 'This is source1', 'source2': 'This is source2'}processed:
{'message': 'This message does not matter', 'source1': 'This is source1', 'source2': 'This is source2', 'extracted': {'key1': 'This', 'key2': 'is', 'key3': 'source2'}}
processes multiple mappings to same target fields (appending):
rule:
{'filter': 'message', 'dissector': {'mapping': {'source1': '%{+extracted.key1} %{+extracted.key2} %{+extracted.key3}', 'source2': '%{+( )extracted.key1} %{+( )extracted.key2} %{+( )extracted.key3}'}}}message:
{'message': 'This message does not matter', 'source1': 'This is source1', 'source2': 'This is source2'}processed:
{'message': 'This message does not matter', 'source1': 'This is source1', 'source2': 'This is source2', 'extracted': {'key1': 'This This', 'key2': 'is is', 'key3': 'source1 source2'}}
append to new field in different order as string:
rule:
{'filter': 'message', 'dissector': {'mapping': {'message': '%{+( )extracted/4} %{+( )extracted/3} %{+( )extracted/2} %{+extracted/1}'}}}message:
{'message': 'This is the message'}processed:
{'message': 'This is the message', 'extracted': 'message the is This'}
append to existing field in different order as string:
rule:
{'filter': 'message', 'dissector': {'mapping': {'message': '%{+( )extracted/4} %{+( )extracted/3} %{+( )extracted/2} %{+( )extracted/1}'}}}message:
{'message': 'This is the message', 'extracted': 'preexisting'}processed:
{'message': 'This is the message', 'extracted': 'preexisting message the is This'}
append to existing empty list field in different order as list:
rule:
{'filter': 'message', 'dissector': {'mapping': {'message': '%{+extracted/4} %{+extracted/3} %{+extracted/2} %{+extracted/1}'}}}message:
{'message': 'This is the message', 'extracted': []}processed:
{'message': 'This is the message', 'extracted': ['message', 'the', 'is', 'This']}
append to existing prefilled field in different order as list:
rule:
{'filter': 'message', 'dissector': {'mapping': {'message': '%{+extracted/4} %{+extracted/3} %{+extracted/2} %{+extracted/1}'}}}message:
{'message': 'This is the message', 'extracted': ['preexisting']}processed:
{'message': 'This is the message', 'extracted': ['preexisting', 'message', 'the', 'is', 'This']}
append to new field in specified order as string with multiple fields:
rule:
{'filter': 'message', 'dissector': {'mapping': {'message': '%{}: %{+( )extracted/2}', 'message2': '%{}: %{+extracted/1}'}}}message:
{'message': 'The first message: first', 'message2': 'The second message: second'}processed:
{'message': 'The first message: first', 'message2': 'The second message: second', 'extracted': 'second first'}
converts datatype without mapping:
rule:
{'filter': 'message', 'dissector': {'convert_datatype': {'message': 'int'}}}message:
{'message': '42'}processed:
{'message': 42}
converts datatype with mapping in dotted field notation:
rule:
{'filter': 'message', 'dissector': {'mapping': {'message': '%{}of %{extracted.message_float} and a int of %{extracted.message_int}'}, 'convert_datatype': {'extracted.message_int': 'int', 'extracted.message_float': 'float'}}}message:
{'message': 'This message has a float of 1.23 and a int of 1337'}processed:
{'message': 'This message has a float of 1.23 and a int of 1337', 'extracted': {'message_float': 1.23, 'message_int': 1337}}
indirect field notation: uses captured field as key:
rule:
{'filter': 'message', 'dissector': {'mapping': {'message': '%{?key} %{&key}'}}}message:
{'message': 'This is the message'}processed:
{'message': 'This is the message', 'This': 'is the message'}
indirect field notation: uses captured field as key and appends to it:
rule:
{'filter': 'message', 'dissector': {'mapping': {'message': '%{?key} %{&key} %{} %{+( )&key}'}}}message:
{'message': 'This is the message'}processed:
{'message': 'This is the message', 'This': 'is message'}
handles special chars as captured content:
rule:
{'filter': 'message', 'dissector': {'mapping': {'message': '%{field1} %{field2} %{field3} %{+field4}'}}}message:
{'message': 'This is \a + mess}age'}processed:
{'message': 'This is \a + mess}age', 'field1': 'This', 'field2': 'is', 'field3': '\a', 'field4': '+ mess}age'}
handles special chars in captured content and target field names:
rule:
{'filter': 'message', 'dissector': {'mapping': {'message': '%{~field1} %{fie ld2} %{+field4}'}}}message:
{'message': '&This isx02 mess}age /1'}processed:
{'message': '&This isx02 mess}age /1', '~field1': '&This', 'fie ld2': 'isx02', 'field4': 'mess}age /1'}
deletes source fields:
rule:
{'filter': 'message', 'dissector': {'mapping': {'message': '%{field1} %{field2} %{field3} %{field4}', 'message2': '%{field21} %{field22} %{field23} %{field24}'}, 'delete_source_fields': True}}message:
{'message': 'This is a message', 'message2': 'This is a message'}processed:
{'field1': 'This', 'field2': 'is', 'field3': 'a', 'field4': 'message', 'field21': 'This', 'field22': 'is', 'field23': 'a', 'field24': 'message'}
parses path elements:
rule:
{'filter': 'path', 'dissector': {'mapping': {'path': '/%{field1}/%{field2}/%{field3}/%{field4}'}}}message:
{'path': '/this/is/the/path'}processed:
{'path': '/this/is/the/path', 'field1': 'this', 'field2': 'is', 'field3': 'the', 'field4': 'path'}
Appending without separator:
rule:
{'filter': 'message', 'dissector': {'mapping': {'message': 'INFO#%{date}#%{+date}#MOREINFO%{}'}}}message:
{'message': 'INFO#2022 12 06 15:12:30:534#+0100#MOREINFO'}processed:
{'message': 'INFO#2022 12 06 15:12:30:534#+0100#MOREINFO', 'date': '2022 12 06 15:12:30:534+0100'}
Appending with special field separator:
rule:
{'filter': 'message', 'dissector': {'mapping': {'message': 'INFO#%{+(\()date}#%{+(\))date}#MOREINFO%{}'}}}message:
{'message': 'INFO#2022 12 06 15:12:30:534#+0100#MOREINFO'}processed:
{'message': 'INFO#2022 12 06 15:12:30:534#+0100#MOREINFO', 'date': '(2022 12 06 15:12:30:534)+0100'}
Dissection with delimiter ending:
rule:
{'filter': 'message', 'dissector': {'mapping': {'message': 'this is %{target}.'}}}message:
{'message': 'this is the message.'}processed:
{'message': 'this is the message.', 'target': 'the message'}
Convert datatype via dissect pattern:
rule:
{'filter': 'message', 'dissector': {'mapping': {'message': 'this is %{field1|int} message and this is %{field2|bool}'}}}message:
{'message': 'this is 42 message and this is 0'}processed:
{'message': 'this is 42 message and this is 0', 'field1': 42, 'field2': False}
Strip char after dissecting:
rule:
{'filter': 'message', 'dissector': {'mapping': {'message': '[%{time-( )}] - %{ip}'}}}message:
{'message': '[2022-11-04 10:00:00 AM ] - 127.0.0.1'}processed:
{'message': '[2022-11-04 10:00:00 AM ] - 127.0.0.1', 'time': '2022-11-04 10:00:00 AM', 'ip': '127.0.0.1'}
Strip special char after dissecting:
rule:
{'filter': 'message', 'dissector': {'mapping': {'message': '[%{time-(#)}] - %{ip}'}}}message:
{'message': '[2022-11-04 10:00:00 AM####] - 127.0.0.1'}processed:
{'message': '[2022-11-04 10:00:00 AM####] - 127.0.0.1', 'time': '2022-11-04 10:00:00 AM', 'ip': '127.0.0.1'}
Strip another special char after dissecting:
rule:
{'filter': 'message', 'dissector': {'mapping': {'message': '[%{time-(?)}] - %{ip}'}}}message:
{'message': '[2022-11-04 10:00:00 AM?????] - 127.0.0.1'}processed:
{'message': '[2022-11-04 10:00:00 AM?????] - 127.0.0.1', 'time': '2022-11-04 10:00:00 AM', 'ip': '127.0.0.1'}
Strip char on both sides:
rule:
{'filter': 'message', 'dissector': {'mapping': {'message': '[%{time-(*)}] - %{ip}'}}}
message:
{'message': '[***2022-11-04 10:00:00 AM***] - 127.0.0.1'}
processed:
{'message': '[***2022-11-04 10:00:00 AM***] - 127.0.0.1', 'time': '2022-11-04 10:00:00 AM', 'ip': '127.0.0.1'}
Strip char while appending:
rule:
{'filter': 'message', 'dissector': {'mapping': {'message': '[%{time} %{+( )time} %{+( )time-(*)}] - %{ip}'}}}
message:
{'message': '[2022-11-04 10:00:00 AM***] - 127.0.0.1'}
processed:
{'message': '[2022-11-04 10:00:00 AM***] - 127.0.0.1', 'time': '2022-11-04 10:00:00 AM', 'ip': '127.0.0.1'}
Strip char while changing position:
rule:
{'filter': 'message', 'dissector': {'mapping': {'message': '[%{time/1} %{+( )time/3} %{+( )time-(*)/2}] - %{ip}'}}}
message:
{'message': '[2022-11-04 10:00:00 AM***] - 127.0.0.1'}
processed:
{'message': '[2022-11-04 10:00:00 AM***] - 127.0.0.1', 'time': '2022-11-04 AM 10:00:00', 'ip': '127.0.0.1'}
Strip char in indirect field notation:
rule:
{'filter': 'message', 'dissector': {'mapping': {'message': '%{?key} %{&key-(#)} %{} %{+( )&key-(#)}'}}}
message:
{'message': 'This is## the message####'}
processed:
{'message': 'This is## the message####', 'This': 'is message'}
Strip char while inferring datatype:
rule:
{'filter': 'message', 'dissector': {'mapping': {'message': 'this is %{field1-(#)|int} message and this is %{field2-(#)|bool}'}}}
message:
{'message': 'this is 42#### message and this is 0##'}
processed:
{'message': 'this is 42#### message and this is 0##', 'field1': 42, 'field2': False}
extract end of string:
rule:
{'filter': 'message', 'dissector': {'mapping': {'message': 'system_%{type}'}}}
message:
{'message': 'system_monitor'}
processed:
{'message': 'system_monitor', 'type': 'monitor'}
copy field - dissect without separator:
rule:
{'filter': 'message', 'dissector': {'mapping': {'message': '%{sys_type}'}}}
message:
{'message': 'system_monitor'}
processed:
{'message': 'system_monitor', 'sys_type': 'system_monitor'}
ignore missing fields:
rule:
{'filter': 'message', 'dissector': {'mapping': {'message': '%{sys_type}', 'does_not_exist': '%{sys_type}'}, 'ignore_missing_fields': True}}
message:
{'message': 'system_monitor'}
processed:
{'message': 'system_monitor', 'sys_type': 'system_monitor'}
handle curly braces in message simple case:
rule:
{'filter': 'message', 'dissector': {'mapping': {'message': 'proxy{addr=%{destination.address}}'}}}
message:
{'message': 'proxy{addr=10.99.172.10:4191}'}
processed:
{'destination': {'address': '10.99.172.10:4191'}, 'message': 'proxy{addr=10.99.172.10:4191}'}
handle curly braces in message full case:
rule:
{'filter': 'message', 'dissector': {'mapping': {'message': 'proxy{addr=%{destination.address}}:service{ns=linkerd-multicluster name=%{destination.domain} port=4191}:endpoint{addr=%{source.address}}: %{log.logger}: %{message}'}}}
message:
{'message': 'proxy{addr=10.99.172.10:4191}:service{ns=linkerd-multicluster name=probe-gateway-bbb port=4191}:endpoint{addr=192.8.177.98:4191}: linkerd_reconnect: Failed to connect error=connect timed out after 1s'}
processed:
{'destination': {'address': '10.99.172.10:4191', 'domain': 'probe-gateway-bbb'}, 'log': {'logger': 'linkerd_reconnect'}, 'message': 'Failed to connect error=connect timed out after 1s', 'source': {'address': '192.8.177.98:4191'}}
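The strip-character examples above can be approximated with a short sketch. The helper below is hypothetical and only supports plain field names plus the `-(char)` suffix; it uses regex capture groups rather than Logprep's actual dissection logic, and ignores the append, position, and datatype operators:

```python
import re


def dissect(message: str, mapping: str) -> dict:
    """Simplified sketch of the dissector's %{field-(char)} strip syntax.

    Each %{name} becomes a named capture group; an optional -(c) suffix
    strips the character c from both ends of the captured value.
    """
    regex = ""
    strips = {}
    last = 0
    for m in re.finditer(r"%\{(\w+)(?:-\((.)\))?\}", mapping):
        regex += re.escape(mapping[last:m.start()])  # literal text between fields
        name, strip_char = m.group(1), m.group(2)
        regex += f"(?P<{name}>.*?)"
        if strip_char:
            strips[name] = strip_char
        last = m.end()
    regex += re.escape(mapping[last:]) + "$"
    result = re.match(regex, message).groupdict()
    for name, char in strips.items():
        result[name] = result[name].strip(char)  # strip from both sides
    return result
```

For example, `dissect('[2022-11-04 10:00:00 AM####] - 127.0.0.1', '[%{time-(#)}] - %{ip}')` removes the trailing `#` characters from the captured `time` value, matching the first strip example above.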
DomainLabelExtractor
The domain_label_extractor is a processor that splits a domain into its corresponding labels:
registered_domain, top_level_domain and subdomain. If the target field
contains an IP address instead of a domain, an informational tag is added to the configured tags field. If
neither a domain nor an IP address can be recognized, an invalid error tag is added to the
tags field in the event. Each added tag contains the name of the target field that was checked by the
configured rule, so that different domain fields within one
event can be distinguished. For example, for the target field url.domain the following tags could be added:
invalid_domain_in_url_domain and ip_in_url_domain
Processor Configuration
- domainlabelextractorname:
    type: domain_label_extractor
    rules:
      - tests/testdata/rules/rules
    tagging_field_name: resolved
- class logprep.processor.domain_label_extractor.processor.DomainLabelExtractor.Config
DomainLabelExtractor config
- tagging_field_name: str
Optional configuration field that defines into which field in the event the informational tags should be written. If this field is not present, it defaults to tags.
- rules: list[str]
List of rule locations to load rules from. In addition to paths to file directories it is possible to retrieve rules from a URI. For valid URI formats see Getters. As last option it is possible to define entire rules with all their configuration parameters as list elements.
- tree_config: str | None
Path to a JSON file with a valid Rule Tree Configuration. For string format see Getters.
- apply_multiple_times: bool
Set if the processor should be applied multiple times. This enables further processing of an output with the same processor.
- type: str
Type of the component
- health_timeout: float
Timeout in seconds for the health check. Defaults to 1 second.
Rule Configuration
The domain label extractor requires the additional field domain_label_extractor.
The mandatory keys under domain_label_extractor are source_fields
and target_field. The former identifies the field
(declared as a list with one element) which contains the domain,
and the latter defines the parent field to which the results should be written.
Both fields can be dotted subfields. The subfields of the parent output field of the
result are: registered_domain, top_level_domain and subdomain.
In the following example the domain www.sub.domain.de
is split into its subdomain www.sub, its
registered domain domain.de and lastly its TLD de:
filter: 'url'
domain_label_extractor:
  source_fields: ['url.domain']
  target_field: 'url'
description: '...'
The example rule applied to the input event
{
"url": {
"domain": "www.sub.domain.de"
}
}
will result in the following output
{
"url": {
"domain": "www.sub.domain.de",
"registered_domain": "domain.de",
"top_level_domain": "de",
"subdomain": "www.sub"
}
}
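The splitting above can be sketched in plain Python. Real public-suffix handling requires the Public Suffix List (e.g. via a library such as tldextract); the tiny known_tlds set below is only a stand-in for illustration:

```python
def extract_domain_labels(domain: str, known_tlds=("de", "com", "co.uk")) -> dict:
    """Simplified sketch of the domain_label_extractor output.

    known_tlds is a hypothetical stand-in for the Public Suffix List.
    """
    # try the longest suffixes first so "co.uk" wins over "uk"
    for tld in sorted(known_tlds, key=len, reverse=True):
        if domain.endswith("." + tld):
            rest = domain[: -(len(tld) + 1)].split(".")
            return {
                "registered_domain": rest[-1] + "." + tld,
                "top_level_domain": tld,
                "subdomain": ".".join(rest[:-1]),
            }
    return {}  # neither a known domain nor resolvable labels
```

Applied to the example domain, `extract_domain_labels("www.sub.domain.de")` yields the same three labels as the rule output shown above.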
- class logprep.processor.domain_label_extractor.rule.DomainLabelExtractorRule.Config
Config for DomainLabelExtractorRule
- delete_source_fields: bool
Whether to delete all the source fields or not. Defaults to False.
- description: str
A description for the rule. Serves documentation purposes only.
- id: str | int | None
A UUID for the rule; generated by Logprep.
- ignore_missing_fields: bool
If set to True, missing fields are ignored, no warning is logged, and the event is not tagged with the failure tag. Defaults to False.
- mapping: dict
A key-value mapping from source fields to target fields. Can be used to copy/move multiple fields at once. If you want to move fields, set delete_source_fields to true. Works independently of source_fields and target_field.
- merge_with_target: bool
If the target field exists and is a list, the list is extended with the values of the source fields. If the source field is a list, the lists are merged by appending the source field's list to the target list. If the source field is a dict, it is merged into the target dict; keys already present in the target are overwritten, so this is not a deep merge. If the target field does not exist, a new field is added with the source field value as list or dict. Defaults to False.
- overwrite_target: bool
Overwrite the target field value if it exists. Defaults to False.
- regex_fields: list
It is possible to use regex expressions to match values. For this, the field name with the regex pattern in the rule filter must be added to the optional field regex_fields in the rule definition.
- source_fields: list
The fields from where to get the values which should be processed.
- tag_on_failure: list
A list of tags which will be appended to the event on non critical errors, defaults to
["_<rule_type>_failure"]. Is currently only used by the Dissector and FieldManager.
- target_field: str
The field where to write the processed values to. Can be used to move/copy single values, merge multiple values to one list or extend a list. Requires
source_field.
- tests: List[Dict[str, str]]
Custom tests for this rule.
DomainResolver
The domain_resolver is a processor that can resolve domains inside a defined field.
Processor Configuration
- domainresolvername:
    type: domain_resolver
    rules:
      - tests/testdata/rules/rules
    timeout: 0.5
    max_cached_domains: 20000
    max_caching_days: 1
    hash_salt: secure_salt
    cache_enabled: true
    debug_cache: false
- class logprep.processor.domain_resolver.processor.DomainResolver.Config
DomainResolver config
- timeout: float
Timeout in seconds for resolving domains.
Security Best Practice - Processor - Domain Resolver Timeout
Ensure to set this to a reasonable value to avoid DOS attacks by malicious domains in your logs. The default is set to 0.5 seconds.
- max_cached_domains: int
The maximum number of cached domains. One cache entry requires ~250 Byte, thus 10 million elements would require about 2.3 GB RAM. The cache is not persisted. Restarting Logprep does therefore clear the cache.
Security Best Practice - Processor - Domain Resolver Max Cached Domains
Ensure to set this to a reasonable value to avoid excessive memory usage and OOM situations by the domain resolver cache.
- max_caching_days: int
Number of days a domain is cached after the last time it appeared. This caching reduces the CPU load of Logprep (no demanding encryption must be performed repeatedly) and the load on subsequent components (i.e. Logstash or Opensearch). Setting the caching days to Null deactivates the caching. In case the cache size has been exceeded (see domain_resolver.max_cached_domains), the oldest cached resolved domains are discarded first. Thus, it is possible that a domain is re-added to the cache before max_caching_days has elapsed if it was discarded due to the size limit.
- hash_salt: str
A salt that is used for hashing.
- cache_enabled: bool
If enabled activates a cache such that already seen domains do not need to be resolved again.
- debug_cache: bool
If enabled adds debug information to the current event, for example if the event was retrieved from the cache or newly resolved, as well as the cache size.
- rules: list[str]
List of rule locations to load rules from. In addition to paths to file directories it is possible to retrieve rules from a URI. For valid URI formats see Getters. As last option it is possible to define entire rules with all their configuration parameters as list elements.
- tree_config: str | None
Path to a JSON file with a valid Rule Tree Configuration. For string format see Getters.
- apply_multiple_times: bool
Set if the processor should be applied multiple times. This enables further processing of an output with the same processor.
- type: str
Type of the component
- health_timeout: float
Timeout in seconds for the health check. Defaults to 1 second.
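The interplay of max_cached_domains and max_caching_days described above (size-bounded cache, day-based expiry, oldest entries discarded first) can be sketched with a small class. This is a hypothetical helper for illustration, not Logprep's actual cache implementation:

```python
import time
from collections import OrderedDict


class DomainCache:
    """Sketch of a size-bounded cache with day-based expiry."""

    def __init__(self, max_items: int, max_caching_days: int):
        self._items = OrderedDict()  # domain -> (inserted_at, ip), oldest first
        self._max_items = max_items
        self._max_age = max_caching_days * 86400  # days -> seconds

    def get(self, domain: str):
        entry = self._items.get(domain)
        if entry is None:
            return None
        inserted_at, ip = entry
        if time.monotonic() - inserted_at > self._max_age:
            del self._items[domain]  # expired after max_caching_days
            return None
        return ip

    def put(self, domain: str, ip: str):
        self._items[domain] = (time.monotonic(), ip)
        self._items.move_to_end(domain)  # freshly seen domains become newest
        while len(self._items) > self._max_items:
            self._items.popitem(last=False)  # discard the oldest entry first
```

With max_items exceeded, the oldest domain is evicted even if its caching days have not elapsed, which is exactly why a domain may be re-resolved earlier than max_caching_days suggests.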
Rule Configuration
The domain resolver requires the additional field domain_resolver.
The additional field domain_resolver.source_fields must be defined as a list with one element.
It contains the field from which a URL should be parsed; the resolved address is then written to resolved_ip.
The URL can be located in continuous text, as long as the URL is valid.
Optionally, the output field can be configured (overriding the default resolved_ip) using the parameter target_field.
This can be a dotted subfield.
In the following example the URL from the field url is extracted and its resolved address written to resolved_ip.
filter: url
domain_resolver:
  source_fields: [url]
description: '...'
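The core idea (find a hostname in continuous text, resolve it, and write the result to resolved_ip) can be sketched as follows. The regex is a deliberate simplification of real URL parsing, dotted source fields are not handled, and the helper names are hypothetical:

```python
import re
import socket


def find_domain(text: str):
    """Extract the first hostname-looking token from continuous text."""
    match = re.search(
        r"(?:https?://)?([a-zA-Z0-9-]+(?:\.[a-zA-Z0-9-]+)*\.[a-zA-Z]{2,})", text
    )
    return match.group(1) if match else None


def resolve_domain(event: dict, source_field: str, target_field: str = "resolved_ip"):
    """Sketch of the domain_resolver behaviour for a flat source field."""
    domain = find_domain(event.get(source_field, ""))
    if domain is None:
        return
    try:
        event[target_field] = socket.gethostbyname(domain)
    except OSError:
        pass  # leave the event untouched if resolution fails
```

For instance, `find_domain("GET https://www.example.com/path HTTP/1.1")` extracts `www.example.com` from the surrounding text before resolution.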
- class logprep.processor.domain_resolver.rule.DomainResolverRule.Config
RuleConfig for DomainResolver
- delete_source_fields: bool
Whether to delete all the source fields or not. Defaults to False.
- description: str
A description for the rule. Serves documentation purposes only.
- id: str | int | None
A UUID for the rule; generated by Logprep.
- ignore_missing_fields: bool
If set to True, missing fields are ignored, no warning is logged, and the event is not tagged with the failure tag. Defaults to False.
- mapping: dict
A key-value mapping from source fields to target fields. Can be used to copy/move multiple fields at once. If you want to move fields, set delete_source_fields to true. Works independently of source_fields and target_field.
- merge_with_target: bool
If the target field exists and is a list, the list is extended with the values of the source fields. If the source field is a list, the lists are merged by appending the source field's list to the target list. If the source field is a dict, it is merged into the target dict; keys already present in the target are overwritten, so this is not a deep merge. If the target field does not exist, a new field is added with the source field value as list or dict. Defaults to False.
- overwrite_target: bool
Overwrite the target field value if it exists. Defaults to False.
- regex_fields: list
It is possible to use regex expressions to match values. For this, the field name with the regex pattern in the rule filter must be added to the optional field regex_fields in the rule definition.
- source_fields: list
The fields from where to get the values which should be processed, requires
target_field.
- tag_on_failure: list
A list of tags which will be appended to the event on non critical errors, defaults to
["_<rule_type>_failure"]. Is currently only used by the Dissector and FieldManager.
- target_field: str
The field where to write the processor output to. Defaults to resolved_ip.
- tests: List[Dict[str, str]]
Custom tests for this rule.
Dropper
The dropper is a processor that removes fields from log messages. Which fields are deleted is determined within each rule.
Processor Configuration
- droppername:
    type: dropper
    rules:
      - tests/testdata/rules/rules
- class logprep.processor.dropper.processor.Dropper.Config
Common Configurations
- rules: list[str]
List of rule locations to load rules from. In addition to paths to file directories it is possible to retrieve rules from a URI. For valid URI formats see Getters. As last option it is possible to define entire rules with all their configuration parameters as list elements.
- tree_config: str | None
Path to a JSON file with a valid Rule Tree Configuration. For string format see Getters.
- apply_multiple_times: bool
Set if the processor should be applied multiple times. This enables further processing of an output with the same processor.
- type: str
Type of the component
- health_timeout: float
Timeout in seconds for the health check. Defaults to 1 second.
Rule Configuration
Which fields are removed is defined in the additional field drop.
It contains a list of fields in dot notation.
For nested fields, parent fields along the path are also removed if they become empty.
If only the specified subfield should be removed, this can be achieved by setting
the option drop_full: false.
In the following example the field keep_me.drop_me is deleted while
the fields keep_me and keep_me.keep_me_too are kept.
filter: keep_me.drop_me
dropper:
  drop:
    - keep_me.drop_me
[{
  "keep_me": {
    "drop_me": "something",
    "keep_me_too": "something"
  }
}]
[{
  "keep_me": {
    "keep_me_too": "something"
  }
}]
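The dropping behaviour, including the pruning of parents that become empty, can be sketched with a small helper (hypothetical, for illustration only):

```python
def drop_field(event: dict, dotted_field: str, drop_full: bool = True) -> None:
    """Sketch of the dropper semantics: delete a dotted field and, with
    drop_full enabled, also remove parent fields that became empty."""
    path = dotted_field.split(".")
    parents = []
    current = event
    for key in path[:-1]:
        parents.append((current, key))
        current = current[key]
    del current[path[-1]]
    if drop_full:
        for parent, key in reversed(parents):
            if not parent[key]:  # prune parents that are now empty
                del parent[key]
```

Applied to the example above, keep_me.drop_me is deleted while keep_me survives because it still contains keep_me_too.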
- class logprep.processor.dropper.rule.DropperRule.Config
RuleConfig for DropperRule
- drop: list
List of fields to drop
- drop_full: bool
Whether to also drop parent fields that become empty. Defaults to True.
- description: str
A description for the Rule. This has only documentation character.
- id: str | int | None
A uuid for the rule. Is generated by logprep.
- regex_fields: list
It is possible to use regex expressions to match values. For this, the field name with the regex pattern in the rule filter must be added to the optional field regex_fields in the rule definition.
- tag_on_failure: list
A list of tags which will be appended to the event on non critical errors, defaults to
["_<rule_type>_failure"]. Is currently only used by the Dissector and FieldManager.
- tests: List[Dict[str, str]]
Custom tests for this rule.
FieldManager
The field_manager processor copies or moves values from multiple source fields to one target field. Additionally, it can be used to merge multiple source field values into one target field value. In this process, source field lists will be merged.
Processor Configuration
- fieldmanagername:
    type: field_manager
    rules:
      - tests/testdata/rules/rules
- class logprep.processor.field_manager.processor.FieldManager.Config
Common Configurations
- rules: list[str]
List of rule locations to load rules from. In addition to paths to file directories it is possible to retrieve rules from a URI. For valid URI formats see Getters. As last option it is possible to define entire rules with all their configuration parameters as list elements.
- tree_config: str | None
Path to a JSON file with a valid Rule Tree Configuration. For string format see Getters.
- apply_multiple_times: bool
Set if the processor should be applied multiple times. This enables further processing of an output with the same processor.
- type: str
Type of the component
- health_timeout: float
Timeout in seconds for the health check. Defaults to 1 second.
Rule Configuration
An illustrative example:
filter: client.ip
field_manager:
  source_fields:
    - client.ip
    - destination.ip
    - host.ip
    - observer.ip
    - server.ip
    - source.ip
    - server.nat.ip
    - client.nat.ip
  target_field: related.ip
  merge_with_target: True
description: '...'
{
  "client": {"ip": ["127.0.0.1", "fe89::", "192.168.5.1"], "nat": {"ip": "223.2.3.2"}},
  "destination": {"ip": "8.8.8.8"},
  "host": {"ip": ["192.168.5.1", "180.22.66.3"]},
  "observer": {"ip": "10.10.2.33"},
  "server": {"ip": "10.10.2.33", "nat": {"ip": "180.22.66.1"}},
  "source": {"ip": "10.10.2.33"}
}
{
  "client": {"ip": ["127.0.0.1", "fe89::", "192.168.5.1"], "nat": {"ip": "223.2.3.2"}},
  "destination": {"ip": "8.8.8.8"},
  "host": {"ip": ["192.168.5.1", "180.22.66.3"]},
  "observer": {"ip": "10.10.2.33"},
  "server": {"ip": "10.10.2.33", "nat": {"ip": "180.22.66.1"}},
  "source": {"ip": "10.10.2.33"},
  "related": {
    "ip": [
      "10.10.2.33",
      "127.0.0.1",
      "180.22.66.1",
      "180.22.66.3",
      "192.168.5.1",
      "223.2.3.2",
      "8.8.8.8",
      "fe89::"
    ]
  }
}
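The collection step of the example above can be sketched in a few lines. The helpers below are hypothetical simplifications of the FieldManager: they gather values from dotted source fields into one flat, deduplicated list (sorted here, as in the rendered output above) and write it to a dotted target field:

```python
def get_dotted(event: dict, dotted: str):
    """Resolve a dotted field path; return None if any segment is missing."""
    for key in dotted.split("."):
        if not isinstance(event, dict) or key not in event:
            return None
        event = event[key]
    return event


def merge_to_target(event: dict, source_fields, target_field) -> None:
    """Sketch of merging list values from several source fields into one
    deduplicated target list (not the actual FieldManager implementation)."""
    values = []
    for field in source_fields:
        value = get_dotted(event, field)
        if value is None:
            continue
        values.extend(value if isinstance(value, list) else [value])
    *path, last = target_field.split(".")
    parent = event
    for key in path:
        parent = parent.setdefault(key, {})  # create intermediate objects
    parent[last] = sorted(set(values))
```

Running this with the source fields and input event from the example reproduces the related.ip list shown in the output.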
- class logprep.processor.field_manager.rule.FieldManagerRule.Config
Config for FieldManagerRule
- source_fields: list
The fields from where to get the values which should be processed, requires
target_field.
- target_field: str
The field where to write the processed values to. Can be used to move/copy single values, merge multiple values to one list or extend a list. Requires
source_field.
- mapping: dict
A key-value mapping from source fields to target fields. Can be used to copy/move multiple fields at once. If you want to move fields, set delete_source_fields to true. Works independently of source_fields and target_field.
- delete_source_fields: bool
Whether to delete all the source fields or not. Defaults to False.
- overwrite_target: bool
Overwrite the target field value if it exists. Defaults to False.
- merge_with_target: bool
If the target field exists and is a list, the list is extended with the values of the source fields. If the source field is a list, the lists are merged by appending the source field's list to the target list. If the source field is a dict, it is merged into the target dict; keys already present in the target are overwritten, so this is not a deep merge. If the target field does not exist, a new field is added with the source field value as list or dict. Defaults to False.
- ignore_missing_fields: bool
If set to True, missing fields are ignored, no warning is logged, and the event is not tagged with the failure tag. Defaults to False.
- description: str
A description for the rule. Serves documentation purposes only.
- id: str | int | None
A UUID for the rule; generated by Logprep.
- regex_fields: list
It is possible to use regex expressions to match values. For this, the field name with the regex pattern in the rule filter must be added to the optional field regex_fields in the rule definition.
- tag_on_failure: list
A list of tags which will be appended to the event on non critical errors, defaults to
["_<rule_type>_failure"]. Is currently only used by the Dissector and FieldManager.
- tests: List[Dict[str, str]]
Custom tests for this rule.
Examples for field_manager:
copies single field to non existing target field:
rule:
{'filter': 'message', 'field_manager': {'source_fields': ['message'], 'target_field': 'new_field'}}
message:
{'message': 'This is a message'}
processed:
{'message': 'This is a message', 'new_field': 'This is a message'}
copies single field to existing target field:
rule:
{'filter': 'message', 'field_manager': {'source_fields': ['message'], 'target_field': 'new_field', 'overwrite_target': True}}
message:
{'message': 'This is a message', 'new_field': 'existing value'}
processed:
{'message': 'This is a message', 'new_field': 'This is a message'}
moves single field to non existing target field:
rule:
{'filter': 'message', 'field_manager': {'source_fields': ['message'], 'target_field': 'new_field', 'delete_source_fields': True}}
message:
{'message': 'This is a message'}
processed:
{'new_field': 'This is a message'}
moves single field to existing target field:
rule:
{'filter': 'message', 'field_manager': {'source_fields': ['message'], 'target_field': 'existing', 'delete_source_fields': True, 'overwrite_target': True}}
message:
{'message': 'This is a message', 'existing': 'existing'}
processed:
{'existing': 'This is a message'}
moves single field to existing target field:
rule:
{'filter': 'message', 'field_manager': {'source_fields': ['message'], 'target_field': 'new_field', 'delete_source_fields': True, 'overwrite_target': True}}
message:
{'message': 'This is a message', 'new_field': 'existing content'}
processed:
{'new_field': 'This is a message'}
moves field and writes as list to target field:
rule:
{'filter': 'message', 'field_manager': {'source_fields': ['message'], 'target_field': 'new_field', 'merge_with_target': True, 'delete_source_fields': True}}
message:
{'message': 'This is a message'}
processed:
{'new_field': ['This is a message']}
moves multiple fields and writes them as list to non existing target field:
rule:
{'filter': 'field1 OR field2 OR field3', 'field_manager': {'source_fields': ['field1', 'field2', 'field3'], 'target_field': 'new_field', 'merge_with_target': True, 'delete_source_fields': True}}
message:
{'field1': 'value1', 'field2': 'value2', 'field3': 'value3'}
processed:
{'new_field': ['value1', 'value2', 'value3']}
moves multiple fields and writes them as list to existing target field:
rule:
{'filter': 'field1 OR field2 OR field3', 'field_manager': {'source_fields': ['field1', 'field2', 'field3'], 'target_field': 'new_field', 'merge_with_target': True, 'delete_source_fields': True, 'overwrite_target': True}}
message:
{'field1': 'value1', 'field2': 'value2', 'field3': 'value3', 'new_field': 'i exist'}
processed:
{'new_field': ['value1', 'value2', 'value3']}
moves multiple fields and replaces existing target field with list including the existing value:
rule:
{'filter': 'field1 OR field2 OR field3', 'field_manager': {'source_fields': ['field1', 'field2', 'field3'], 'target_field': 'new_field', 'merge_with_target': True, 'delete_source_fields': True}}
message:
{'field1': 'value1', 'field2': 'value2', 'field3': 'value3', 'new_field': 'i exist'}
processed:
{'new_field': ['i exist', 'value1', 'value2', 'value3']}
moves multiple fields and writes them to an existing list:
rule:
{'filter': 'field1 OR field2 OR field3', 'field_manager': {'source_fields': ['field1', 'field2', 'field3'], 'target_field': 'new_field', 'merge_with_target': True, 'delete_source_fields': True}}
message:
{'field1': 'value1', 'field2': 'value2', 'field3': 'value3', 'new_field': ['i exist']}
processed:
{'new_field': ['i exist', 'value1', 'value2', 'value3']}
moves multiple fields and writes them to an existing target field as list:
rule:
{'filter': 'field1 OR field2 OR field3', 'field_manager': {'source_fields': ['field1', 'field2', 'field3'], 'target_field': 'new_field', 'merge_with_target': True, 'delete_source_fields': True}}
message:
{'field1': 'value1', 'field2': 'value2', 'field3': 'value3', 'new_field': 'i exist'}
processed:
{'new_field': ['i exist', 'value1', 'value2', 'value3']}
moves multiple fields and merges to target list:
rule:
{'filter': 'field1 OR field2 OR field3', 'field_manager': {'source_fields': ['field1', 'field2', 'field3'], 'target_field': 'new_field', 'merge_with_target': True, 'delete_source_fields': True}}
message:
{'field1': ['value1', 'value2', 'value3'], 'field2': ['value4'], 'field3': ['value5', 'value6'], 'new_field': ['i exist']}
processed:
{'new_field': ['i exist', 'value1', 'value2', 'value3', 'value4', 'value5', 'value6']}
moves multiple fields and merges to target list with different source types:
rule:
{'filter': 'field1 OR field2 OR field3', 'field_manager': {'source_fields': ['field1', 'field2', 'field3'], 'target_field': 'new_field', 'merge_with_target': True, 'delete_source_fields': True}}
message:
{'field1': ['value1', 'value2', 'value3'], 'field2': 'value4', 'field3': ['value5', 'value6'], 'new_field': ['i exist']}
processed:
{'new_field': ['i exist', 'value1', 'value2', 'value3', 'value4', 'value5', 'value6']}
moves multiple fields and merges to target list with different source types and filters duplicates:
rule:
{'filter': 'field1 OR field2 OR field3', 'field_manager': {'source_fields': ['field1', 'field2', 'field3'], 'target_field': 'new_field', 'merge_with_target': True, 'delete_source_fields': True}}
message:
{'field1': ['value1', 'value2', 'value3', 'value5'], 'field2': 'value4', 'field3': ['value5', 'value6', 'value4'], 'new_field': ['i exist']}
processed:
{'new_field': ['i exist', 'value1', 'value2', 'value3', 'value5', 'value4', 'value6']}
moves multiple fields and merges to target list with different source types and filters duplicates and overwrites target:
rule:
{'filter': 'field1 OR field2 OR field3', 'field_manager': {'source_fields': ['field1', 'field2', 'field3'], 'target_field': 'new_field', 'merge_with_target': True, 'delete_source_fields': True, 'overwrite_target': True}}
message:
{'field1': ['value1', 'value2', 'value3', 'value5'], 'field2': 'value4', 'field3': ['value5', 'value6', 'value4'], 'new_field': ['i exist']}
processed:
{'new_field': ['value1', 'value2', 'value3', 'value5', 'value4', 'value6']}
real world example from documentation:
rule:
{'filter': 'client.ip', 'field_manager': {'source_fields': ['client.ip', 'destination.ip', 'host.ip', 'observer.ip', 'server.ip', 'source.ip', 'server.nat.ip', 'client.nat.ip'], 'target_field': 'related.ip', 'merge_with_target': True}}
message:
{'client': {'ip': ['127.0.0.1', 'fe89::', '192.168.5.1'], 'nat': {'ip': '223.2.3.2'}}, 'destination': {'ip': '8.8.8.8'}, 'host': {'ip': ['192.168.5.1', '180.22.66.3']}, 'observer': {'ip': '10.10.2.33'}, 'server': {'ip': '10.10.2.33', 'nat': {'ip': '180.22.66.1'}}, 'source': {'ip': '10.10.2.33'}}
processed:
{'client': {'ip': ['127.0.0.1', 'fe89::', '192.168.5.1'], 'nat': {'ip': '223.2.3.2'}}, 'destination': {'ip': '8.8.8.8'}, 'host': {'ip': ['192.168.5.1', '180.22.66.3']}, 'observer': {'ip': '10.10.2.33'}, 'server': {'ip': '10.10.2.33', 'nat': {'ip': '180.22.66.1'}}, 'source': {'ip': '10.10.2.33'}, 'related': {'ip': ['127.0.0.1', 'fe89::', '192.168.5.1', '8.8.8.8', '180.22.66.3', '10.10.2.33', '180.22.66.1', '223.2.3.2']}}
copies multiple fields to multiple target fields:
rule:
{'filter': 'field', 'field_manager': {'mapping': {'field.one': 'one', 'field.two': 'two', 'field.three': 'three'}}}
message:
{'field': {'one': 1, 'two': 2, 'three': 3}}
processed:
{'field': {'one': 1, 'two': 2, 'three': 3}, 'one': 1, 'two': 2, 'three': 3}
copies multiple fields to multiple target fields, while overwriting existing fields:
rule:
{'filter': 'field', 'field_manager': {'mapping': {'field.one': 'one', 'field.two': 'two', 'field.three': 'three'}, 'overwrite_target': True}}
message:
{'field': {'one': 1, 'two': 2, 'three': 3}, 'three': 'exists already'}
processed:
{'field': {'one': 1, 'two': 2, 'three': 3}, 'one': 1, 'two': 2, 'three': 3}
copies multiple fields to multiple target fields, while one list will be extended:
rule:
{'filter': 'field', 'field_manager': {'mapping': {'field.one': 'one', 'field.two': 'two', 'field.three': 'three'}, 'merge_with_target': True}}
message:
{'field': {'one': 1, 'two': 2, 'three': 3}, 'three': ['exists already']}
processed:
{'field': {'one': 1, 'two': 2, 'three': 3}, 'one': 1, 'two': 2, 'three': ['exists already', 3]}
copies multiple fields to multiple target fields, while one list will be extended with existing list:
rule:
{'filter': 'field', 'field_manager': {'mapping': {'field.one': 'one', 'field.two': 'two', 'field.three': 'three'}, 'merge_with_target': True}}
message:
{'field': {'one': 1, 'two': 2, 'three': [3, 3]}, 'three': ['exists already']}
processed:
{'field': {'one': 1, 'two': 2, 'three': [3, 3]}, 'one': 1, 'two': 2, 'three': ['exists already', 3, 3]}
copies multiple fields to multiple target fields, while one target list will be overwritten with existing list:
rule:
{'filter': 'field', 'field_manager': {'mapping': {'field.one': 'one', 'field.two': 'two', 'field.three': 'three'}, 'overwrite_target': True}}
message:
{'field': {'one': 1, 'two': 2, 'three': [3, 3]}, 'three': ['exists already']}
processed:
{'field': {'one': 1, 'two': 2, 'three': [3, 3]}, 'one': 1, 'two': 2, 'three': [3, 3]}
copies multiple fields to multiple target fields, while one source field is missing:
rule:
{'filter': 'field', 'field_manager': {'mapping': {'field.one': 'one', 'field.two': 'two', 'field.three': 'three'}}}
message:
{'field': {'one': 1, 'three': 3}}
processed:
{'field': {'one': 1, 'three': 3}, 'one': 1, 'three': 3, 'tags': ['_field_manager_missing_field_warning']}
moves multiple fields to multiple target fields:
rule:
{'filter': 'field', 'field_manager': {'mapping': {'field.one': 'one', 'field.two': 'two', 'field.three': 'three'}, 'delete_source_fields': True}}
message:
{'field': {'one': 1, 'two': 2, 'three': 3}}
processed:
{'one': 1, 'two': 2, 'three': 3}
Combine fields to list and copy fields at the same time:
rule:
{'filter': 'field', 'field_manager': {'source_fields': ['source.one', 'source.two'], 'target_field': 'merged', 'mapping': {'field.one': 'one', 'field.two': 'two', 'field.three': 'three'}, 'merge_with_target': True}}
message:
{'field': {'one': 1, 'two': 2, 'three': 3}, 'source': {'one': ['a'], 'two': ['b']}}
processed:
{'field': {'one': 1, 'two': 2, 'three': 3}, 'source': {'one': ['a'], 'two': ['b']}, 'one': 1, 'two': 2, 'three': 3, 'merged': ['a', 'b']}
Ignore missing fields: No warning and no failure tag if source field is missing:
rule:
{'filter': 'field.a', 'field_manager': {'mapping': {'field.a': 'target_field', 'does.not.exists': 'target_field'}, 'ignore_missing_fields': True}}
message:
{'field': {'a': 'first', 'b': 'second'}}
processed:
{'field': {'a': 'first', 'b': 'second'}, 'target_field': 'first'}
merge_with_target preserves list ordering:
rule:
{'filter': '(foo) OR (test)', 'field_manager': {'id': '5cfa7a26-94af-49de-bc82-460c42e9dc56', 'source_fields': ['foo', 'test'], 'target_field': 'existing_list', 'delete_source_fields': False, 'overwrite_target': False, 'merge_with_target': True}}
message:
{'existing_list': ['hello', 'world'], 'foo': 'bar', 'test': 'value'}
processed:
{'existing_list': ['hello', 'world', 'bar', 'value'], 'foo': 'bar', 'test': 'value'}
Convert existing target to list:
rule:
{'filter': 'message', 'field_manager': {'source_fields': ['message'], 'target_field': 'new_field', 'merge_with_target': True}}message:
{'message': 'Value B', 'new_field': 'Value A'}processed:
{'message': 'Value B', 'new_field': ['Value A', 'Value B']}
Convert existing target to list with multiple source fields:
rule:
{'filter': 'field1 OR field2 OR field3', 'field_manager': {'source_fields': ['field1', 'field2', 'field3'], 'target_field': 'new_field', 'merge_with_target': True}}message:
{'field1': 'Value B', 'field2': 'Value C', 'field3': 'Value D', 'new_field': 'Value A'}processed:
{'field1': 'Value B', 'field2': 'Value C', 'field3': 'Value D', 'new_field': ['Value A', 'Value B', 'Value C', 'Value D']}
Merge source dict into existing target dict:
rule:
{'filter': 'source', 'field_manager': {'source_fields': ['source'], 'target_field': 'target', 'merge_with_target': True}}message:
{'source': {'source1': 'value'}, 'target': {'target1': 'value'}}processed:
{'source': {'source1': 'value'}, 'target': {'source1': 'value', 'target1': 'value'}}
Merge multiple source dicts into existing target dict:
rule:
{'filter': 'source1', 'field_manager': {'source_fields': ['source1', 'source2', 'source3'], 'target_field': 'target', 'delete_source_fields': True, 'merge_with_target': True}}message:
{'source1': {'source1': 'value'}, 'source2': {'source2': 'value'}, 'source3': {'source-nested': {'foo': 'bar'}}, 'target': {'target1': 'value'}}processed:
{'target': {'source1': 'value', 'source2': 'value', 'source-nested': {'foo': 'bar'}, 'target1': 'value'}}
overlapping source with target single processing:
rule:
{'filter': 'host', 'field_manager': {'source_fields': ['host'], 'target_field': 'host.name', 'overwrite_target': True}}
message:
{'host': 'example.com'}
processed:
{'host': {'name': 'example.com'}}
overlapping source with target mapping processing:
rule:
{'filter': 'host', 'field_manager': {'mapping': {'host': 'host.name'}, 'overwrite_target': True}}
message:
{'host': 'example.com'}
processed:
{'host': {'name': 'example.com'}}
GenericAdder
The generic_adder is a processor that adds new fields and values to documents based on a list. The list resides inside a rule and/or inside a file.
Processor Configuration
- genericaddername:
    type: generic_adder
    rules:
      - tests/testdata/rules/rules
- class logprep.processor.generic_adder.processor.GenericAdder.Config
Common Configurations
- rules: list[str]
List of rule locations to load rules from. In addition to paths to file directories, it is possible to retrieve rules from a URI. For valid URI formats see Getters. As a last option, it is possible to define entire rules, with all their configuration parameters, as list elements.
- tree_config: str | None
Path to a JSON file with a valid Rule Tree Configuration. For string format see Getters.
- apply_multiple_times: bool
Set if the processor should be applied multiple times. This enables further processing of an output with the same processor.
- type: str
Type of the component
- health_timeout: float
Timeout in seconds for health check. Defaults to 1 second.
Rule Configuration
The generic adder requires the additional field generic_adder.
The field generic_adder.add can be defined.
It contains a dictionary of field names and values that should be added.
If dot notation is used, all fields on the path are created automatically.
In the following example, the field some.added.field with the
value some added value is being added.
filter: add_generic_test
generic_adder:
  add:
    some.added.field: some added value
description: '...'
Alternatively, the additional field generic_adder.add_from_file can be added.
It contains the path or URL to a YML file that contains a dictionary of field names and
values that should be added to the document.
Instead of a path, a list of paths can be used to add multiple files.
All of those files must exist.
If a list is used, it is possible to tell the generic adder to only use the first existing
file by setting generic_adder.only_first_existing_file: true.
In that case, only one of the files needs to exist.
Additions from generic_adder.add and generic_adder.add_from_file are
combined.
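Since additions from both sources are combined, a single rule may use add and add_from_file together. A minimal sketch (the field name and value are illustrative):

```yaml
filter: add_generic_test
generic_adder:
  add:
    some.added.field: some added value
  add_from_file: PATH_TO_FILE_WITH_LIST
description: '...'
```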
In the following example a dictionary with field names and values is loaded from the file
at PATH_TO_FILE_WITH_LIST.
This dictionary is used like the one that can be defined via generic_adder.add.
filter: 'add_generic_test'
generic_adder:
  add_from_file: PATH_TO_FILE_WITH_LIST
description: '...'
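The file referenced by add_from_file is a plain YML dictionary of field names and values. A hypothetical sketch of what such a file might contain (both entries are illustrative):

```yaml
some.added.field: some added value
another.field: another value
```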
In the following example two files are being used.
filter: 'add_generic_test'
generic_adder:
  add_from_file:
    - PATH_TO_FILE_WITH_LIST
    - ANOTHER_PATH_TO_FILE_WITH_LIST
description: '...'
In the following example two files are being used, but only the first existing file is being loaded.
filter: 'add_generic_test'
generic_adder:
  only_first_existing_file: true
  add_from_file:
    - PATH_TO_FILE_THAT_DOES_NOT_EXIST
    - PATH_TO_FILE_WITH_LIST
description: '...'
- class logprep.processor.generic_adder.rule.GenericAdderRule.Config
Config for GenericAdderRule
- add: dict
Contains a dictionary of field names and values that should be added. If dot notation is used, all fields on the path are created automatically.
- add_from_file: list
Contains the path or URL to a YML file that contains a dictionary of field names and values that should be added to the document. Instead of a single path, a list of paths can be used to add multiple files. All of those files must exist. For string format see Getters.
Security Best Practice - Processor - Generic Adder Add From File Memory Consumption
Be aware that all values of the remote file are loaded into memory. Avoid dynamically growing lists without memory consumption limits, and avoid loading large files all at once to avoid exceeding HTTP body limits.
Security Best Practice - Processor - Generic Adder Authenticity and Integrity
Consider using TLS with authentication via mTLS or OAuth to ensure the authenticity and integrity of the loaded values.
- only_first_existing_file: bool
If a list is used, it is possible to tell the generic adder to only use the first existing file by setting generic_adder.only_first_existing_file: true. In that case, only one of the files needs to exist.
- delete_source_fields: bool
Whether to delete all the source fields or not. Defaults to False.
- description: str
A description for the rule. It serves documentation purposes only.
- id: str | int | None
A uuid for the rule. Is generated by logprep.
- ignore_missing_fields: bool
If set to True, missing fields will be ignored, no warning is logged and the event is not tagged with the failure tag. Defaults to False.
- mapping: dict
A key-value mapping from source fields to target fields. Can be used to copy/move multiple fields at once. If you want to move fields, set delete_source_fields to true. Works independently of source_fields and target_field.
- merge_with_target: bool
If the target field exists and is a list, the list will be extended with the values of the source fields.
- overwrite_target: bool
Overwrite the target field value if it exists. Defaults to False.
- regex_fields: list
It is possible to use regex expressions to match values. For this, the field name with the regex pattern in the rule filter must be added to the optional field regex_fields in the rule definition.
- source_fields: list
The fields from which to get the values that should be processed. Requires target_field.
- tag_on_failure: list
A list of tags which will be appended to the event on non-critical errors. Defaults to ["_<rule_type>_failure"]. Currently only used by the Dissector and FieldManager.
- target_field: str
The field where the processed values are written to. Can be used to move/copy single values, merge multiple values into one list, or extend a list. Requires source_fields.
- tests: List[Dict[str, str]]
Custom tests for this rule.
GenericResolver
The generic_resolver resolves log event values using regex lists.
Processor Configuration
- genericresolvername:
    type: generic_resolver
    rules:
      - tests/testdata/rules/rules
- class logprep.processor.generic_resolver.processor.GenericResolver.Config
GenericResolver config
- max_cache_entries: int
(Optional) Size of cache for results when resolving from a list. The cache can be disabled by setting this option to 0.
Security Best Practice - Processor - Generic Resolver Max Cached Entries
Set this to a reasonable value to avoid excessive memory usage and OOM situations caused by the generic resolver cache.
- cache_metrics_interval: int
(Optional) Cache metrics are not updated immediately; instead, updating is skipped for a number of events until the next update. cache_metrics_interval sets the number of events between updates (default: 1).
- rules: list[str]
List of rule locations to load rules from. In addition to paths to file directories, it is possible to retrieve rules from a URI. For valid URI formats see Getters. As a last option, it is possible to define entire rules, with all their configuration parameters, as list elements.
- tree_config: str | None
Path to a JSON file with a valid Rule Tree Configuration. For string format see Getters.
- apply_multiple_times: bool
Set if the processor should be applied multiple times. This enables further processing of an output with the same processor.
- type: str
Type of the component
- health_timeout: float
Timeout in seconds for health check. Defaults to 1 second.
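The optional cache settings can be combined with the common configuration options. A sketch with illustrative values (10000 cached entries, metrics updated every 100 events):

```yaml
- genericresolvername:
    type: generic_resolver
    rules:
      - tests/testdata/rules/rules
    max_cache_entries: 10000
    cache_metrics_interval: 100
```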
Rule Configuration
The generic resolver requires the additional field generic_resolver.
Configurable fields are being checked by regex patterns and a configurable value will be added
if a pattern matches.
The parameters within generic_resolver must be of the form
field_mapping: {SOURCE_FIELD: DESTINATION_FIELD},
resolve_list: {REGEX_PATTERN_0: ADDED_VALUE_0, ..., REGEX_PATTERN_N: ADDED_VALUE_N}.
SOURCE_FIELD will be checked by the regex patterns REGEX_PATTERN_[0-N] and
a new field DESTINATION_FIELD with the value ADDED_VALUE_[0-N] will be added if there is a match.
Adding the option "merge_with_target": True makes the generic resolver write resolved values
into a list so that multiple different values can be written into the same field.
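A sketch of a rule using this option (patterns and values are illustrative); with merge_with_target enabled, values resolved for the same destination field are collected in a list instead of replacing each other:

```yaml
filter: to_resolve
generic_resolver:
  field_mapping:
    to_resolve: resolved
  resolve_list:
    .*Hello.*: Greeting
    .*World.*: Object
  merge_with_target: true
```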
In the following example to_resolve will be checked by the regex pattern .*Hello.*.
"resolved": "Greeting" will be added to the event if the pattern matches
the value in to_resolve.
filter: to_resolve
generic_resolver:
  field_mapping:
    to_resolve: resolved
  resolve_list:
    .*Hello.*: Greeting
To maintain ordering when the configuration file is used with different programs, the resolve list can also be declared as a list of single-entry mappings. Both styles will be supported in the future; however, this one is recommended for clarity and YAML compliance.
filter: to_resolve
generic_resolver:
  field_mapping:
    to_resolve: resolved
  resolve_list:
    - .*Hello.*: Greeting
    - .*error.*: Error
    - never_match: Panic
Alternatively, a YML file with a resolve list and a regex pattern can be used to resolve values.
For this, a field resolve_from_file with the subfields path and pattern
must be added.
The resolve list in the file at path is then used in conjunction with the regex pattern
in pattern.
pattern must be a regex pattern with a capture group that is named mapping.
The resolver will check for the pattern and get the value captured by the mapping group.
This captured value is then used in the list from the file.
ignore_case can be set to ignore the case when matching values that will be resolved.
It is disabled by default. In the following example to_resolve: heLLo would be resolved,
since ignore_case is set to true.
filter: to_resolve
generic_resolver:
  field_mapping:
    to_resolve: resolved
  resolve_list:
    .*Hello.*: Greeting
  ignore_case: true
It is furthermore possible to resolve into dictionaries. In the following example
{"to_resolve": "Hello!"} would be resolved to {"resolved": {"Greeting": "Hello"}}.
filter: to_resolve
generic_resolver:
  field_mapping:
    to_resolve: resolved
  resolve_list:
    .*Hello.*: {"Greeting": "Hello"}
Resolved dictionaries can be merged into existing dictionaries. In the following example
{"to": {"resolve": "Hello!"}} would be resolved to
{"to": {"Greeting": "Hello", "resolve": "Hello!"}}.
filter: to_resolve
generic_resolver:
  field_mapping:
    to.resolve: to
  resolve_list:
    .*Hello.*: {"Greeting": "Hello"}
In the following example to_resolve will be checked by the
regex pattern \d*(?P<mapping>[a-z]+)\d* and the list in
path/to/resolve_mapping.yml will be used to add new fields.
"resolved": "resolved foo" will be added to the event if the value
in to_resolve begins with a number, ends with a number and contains foo.
Furthermore, "resolved": "resolved bar" will be added to the event
if the value in to_resolve begins with a number, ends with a number and contains bar.
filter: to_resolve
generic_resolver:
  field_mapping:
    to_resolve: resolved
  resolve_from_file:
    path: path/to/resolve_mapping.yml
    pattern: \d*(?P<mapping>[a-z]+)\d*
foo: resolved foo
bar: resolved bar
- class logprep.processor.generic_resolver.rule.GenericResolverRule.Config
RuleConfig for GenericResolver
- field_mapping: dict[str, str]
Mapping in the form of
{SOURCE_FIELD: DESTINATION_FIELD}
- resolve_list: dict[str, dict[str, FieldValue] | list[FieldValue] | str | int | float | bool | None]
Lookup mapping in the form of
{REGEX_PATTERN_0: ADDED_VALUE_0, ..., REGEX_PATTERN_N: ADDED_VALUE_N}
- resolve_from_file: dict[Literal['path', 'pattern'], str]
Mapping with a path key to a YML file (for string format see Getters) with a resolve list and a pattern key with a regex pattern which can be used to resolve values. The resolve list in the file at path is then used in conjunction with the regex pattern in pattern.
Security Best Practice - Processor - Generic Resolver Resolve From File Memory Consumption
Be aware that all values of the remote file are loaded into memory. Avoid dynamically growing lists without memory consumption limits, and avoid loading large files all at once to avoid exceeding HTTP body limits.
Security Best Practice - Processor - Generic Resolver Authenticity and Integrity
Consider using TLS with authentication via mTLS or OAuth to ensure the authenticity and integrity of the loaded values.
- ignore_case: bool
(Optional) Ignore case when matching resolve values. Defaults to False.
- additions: dict[str, dict[str, FieldValue] | list[FieldValue] | str | int | float | bool | None]
Contains a dictionary of field names and values that should be added.
- delete_source_fields: bool
Whether to delete all the source fields or not. Defaults to False.
- description: str
A description for the rule. It serves documentation purposes only.
- id: str | int | None
A uuid for the rule. Is generated by logprep.
- ignore_missing_fields: bool
If set to True, missing fields will be ignored, no warning is logged and the event is not tagged with the failure tag. Defaults to False.
- mapping: dict
A key-value mapping from source fields to target fields. Can be used to copy/move multiple fields at once. If you want to move fields, set delete_source_fields to true. Works independently of source_fields and target_field.
- merge_with_target: bool
If the target field exists and is a list, the list will be extended with the values of the source fields. If the source field is a list, the lists will be merged by appending the source field's list to the target list. If the source field is a dict, the dict will be merged with the target dict. If the source keys exist in the target dict, the values will be overwritten, so this is not a deep merge. If the target field does not exist, a new field will be added with the source field value as list or dict. Defaults to False.
- overwrite_target: bool
Overwrite the target field value if it exists. Defaults to False.
- regex_fields: list
It is possible to use regex expressions to match values. For this, the field name with the regex pattern in the rule filter must be added to the optional field regex_fields in the rule definition.
- source_fields: list
The fields from which to get the values that should be processed. Requires target_field.
- tag_on_failure: list
A list of tags which will be appended to the event on non-critical errors. Defaults to ["_<rule_type>_failure"]. Currently only used by the Dissector and FieldManager.
- target_field: str
The field where the processed values are written to. Can be used to move/copy single values, merge multiple values into one list, or extend a list. Requires source_fields.
- tests: List[Dict[str, str]]
Custom tests for this rule.
GeoipEnricher
Processor to enrich log messages with geolocation information
Processor Configuration
- geoipenrichername:
    type: geoip_enricher
    rules:
      - tests/testdata/geoip_enricher/rules
    db_path: /path/to/GeoLite2-City.mmdb
- class logprep.processor.geoip_enricher.processor.GeoipEnricher.Config
geoip_enricher config
- db_path: str
Path to a GeoLite2 City database by MaxMind in binary format. This must be provided separately. The file will be downloaded or copied and cached. For valid URI formats see Getters. This product includes GeoLite2 data created by MaxMind, available from https://www.maxmind.com.
Security Best Practice - Processor - GeoIP Enricher Database Memory Consumption
Be aware that all values of the remote file are loaded into memory. Avoid loading a large database via HTTP to avoid exceeding HTTP body limits.
Security Best Practice - Processor - GeoIP Enricher Authenticity and Integrity
Consider using TLS with authentication via mTLS or OAuth to ensure the authenticity and integrity of the loaded database.
- rules: list[str]
List of rule locations to load rules from. In addition to paths to file directories, it is possible to retrieve rules from a URI. For valid URI formats see Getters. As a last option, it is possible to define entire rules, with all their configuration parameters, as list elements.
- tree_config: str | None
Path to a JSON file with a valid Rule Tree Configuration. For string format see Getters.
- apply_multiple_times: bool
Set if the processor should be applied multiple times. This enables further processing of an output with the same processor.
- type: str
Type of the component
- health_timeout: float
Timeout in seconds for health check. Defaults to 1 second.
Rule Configuration
The geoip enricher requires the additional field geoip.
The default output_field can be overridden using the optional parameter
target_field. This can be a dotted subfield.
The additional field geoip.source_fields must be given as a list with one element.
It contains the IP for which the geoip data should be added.
In the following example the IP in client.ip will be enriched with geoip data.
filter: client.ip
geoip:
  source_fields: [client.ip]
description: '...'
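The default target field geoip can be overridden as described above; a hedged variant of the example (the custom target field name is illustrative):

```yaml
filter: client.ip
geoip:
  source_fields: [client.ip]
  target_field: client.geo
description: '...'
```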
- class logprep.processor.geoip_enricher.rule.GeoipEnricherRule.Config
RuleConfig for GeoipEnricher
- customize_target_subfields: dict
(Optional) Rewrites the default output subfield locations to custom output subfield locations. Must be in the form of key-value mapping pairs (e.g. default_output: custom_output). The following default outputs can be customized: type, geometry.type, geometry.coordinates, properties.accuracy_radius, properties.continent, properties.continent_code, properties.country, properties.country_iso_code, properties.time_zone, properties.city, properties.postal_code, properties.subdivision
A concrete example would look like this:
filter: client.ip
geoip:
  source_fields: [client.ip]
  customize_target_subfields:
    geometry.type: client.geo.type
    geometry.coordinates: client.geo.coordinates
description: '...'
- delete_source_fields: bool
Whether to delete all the source fields or not. Defaults to False.
- description: str
A description for the rule. It serves documentation purposes only.
- id: str | int | None
A uuid for the rule. Is generated by logprep.
- ignore_missing_fields: bool
If set to True, missing fields will be ignored, no warning is logged and the event is not tagged with the failure tag. Defaults to False.
- mapping: dict
A key-value mapping from source fields to target fields. Can be used to copy/move multiple fields at once. If you want to move fields, set delete_source_fields to true. Works independently of source_fields and target_field.
- merge_with_target: bool
If the target field exists and is a list, the list will be extended with the values of the source fields. If the source field is a list, the lists will be merged by appending the source field's list to the target list. If the source field is a dict, the dict will be merged with the target dict. If the source keys exist in the target dict, the values will be overwritten, so this is not a deep merge. If the target field does not exist, a new field will be added with the source field value as list or dict. Defaults to False.
- overwrite_target: bool
Overwrite the target field value if it exists. Defaults to False.
- regex_fields: list
It is possible to use regex expressions to match values. For this, the field name with the regex pattern in the rule filter must be added to the optional field regex_fields in the rule definition.
- source_fields: list
Field to get geoip information for.
- tag_on_failure: list
A list of tags which will be appended to the event on non-critical errors. Defaults to ["_<rule_type>_failure"]. Currently only used by the Dissector and FieldManager.
- target_field: str
Field for the output information. Defaults to geoip.
- tests: List[Dict[str, str]]
Custom tests for this rule.
Grokker
The grokker processor dissects a message on the basis of grok patterns. This processor is based on the ideas of the Logstash grok filter plugin (see: https://www.elastic.co/guide/en/logstash/current/plugins-filters-grok.html).
The default builtin grok patterns shipped with Logprep are the same as in Logstash.
Processor Configuration
- my_grokker:
    type: grokker
    rules:
      - tests/testdata/rules/rules
    custom_patterns_dir: "http://the.patterns.us/patterns.zip"
- class logprep.processor.grokker.processor.Grokker.Config
Config of Grokker
- custom_patterns_dir: str
(Optional) A directory or URI to load patterns from. All files in all subdirectories will be loaded recursively. If a URI is given, the target file has to be a zip file containing a directory structure.
Security Best Practice - Processor - Grokker Custom Patterns Directory Memory Consumption
Be aware that all values of the remote zip file are loaded into memory. Reserve memory for this and avoid loading large files all at once to avoid exceeding HTTP body limits.
Security Best Practice - Processor - Grokker Authenticity and Integrity
Consider using TLS with authentication via mTLS or OAuth to ensure the authenticity and integrity of the loaded values.
- rules: list[str]
List of rule locations to load rules from. In addition to paths to file directories, it is possible to retrieve rules from a URI. For valid URI formats see Getters. As a last option, it is possible to define entire rules, with all their configuration parameters, as list elements.
- tree_config: str | None
Path to a JSON file with a valid Rule Tree Configuration. For string format see Getters.
- apply_multiple_times: bool
Set if the processor should be applied multiple times. This enables further processing of an output with the same processor.
- type: str
Type of the component
- health_timeout: float
Timeout in seconds for health check. Defaults to 1 second.
Rule Configuration
An illustrative example:
filter: message
grokker:
  mapping:
    message: "%{TIMESTAMP_ISO8601:@timestamp} %{LOGLEVEL:logLevel} %{GREEDYDATA:logMessage}"
description: 'an example log message'
{"message": "2020-07-16T19:20:30.45+01:00 DEBUG This is a sample log"}
{
  "message": "2020-07-16T19:20:30.45+01:00 DEBUG This is a sample log",
  "@timestamp": "2020-07-16T19:20:30.45+01:00",
  "logLevel": "DEBUG",
  "logMessage": "This is a sample log"
}
- class logprep.processor.grokker.rule.GrokkerRule.Config
Config for GrokkerRule
- patterns: dict
(Optional) Additional grok patterns as a mapping, e.g. CUSTOM_PATTERN: [^\s]*. If you want to use special target fields, you can use them as usual in the mapping section. Here you only have to declare the matching regex without named groups.
- convert_datatype: dict
(Optional) A mapping from source field to desired datatype. The datatypes can be float, int, bool, or string.
- delete_source_fields: bool
Whether to delete all the source fields or not. Defaults to False.
- description: str
A description for the rule. It serves documentation purposes only.
- id: str | int | None
A uuid for the rule. Is generated by logprep.
- ignore_missing_fields: bool
If set to True, missing fields will be ignored, no warning is logged and the event is not tagged with the failure tag. Defaults to False.
- mapping: dict
A mapping from source fields to a grok pattern. Dotted field notation is possible in the key and in the grok pattern. Additionally, Logstash field notation is possible in the grok pattern. The value can be a single search pattern or a list of search patterns; a list is checked in order until the first matching pattern. It is possible to use oniguruma regex patterns with or without grok patterns in the patterns part. When defining an oniguruma pattern, there is a limitation of three nested parentheses inside the pattern; more deeply nested parentheses are not possible. Logstash's ECS-conform grok patterns are used to resolve the grok patterns used here. When writing patterns, be careful, as the underlying regex can become complex fast. If the execution and resolving of the pattern takes more than one second, a matching timeout will be raised.
Security Best Practice - Processor - Grokker DOS (Denial of Service) via Backreferences
Avoid using backreferences in grok patterns, as they can lead to excessive memory consumption and potential denial of service attacks.
- merge_with_target: bool
If the target field exists and is a list, the list will be extended with the values of the source fields. If the source field is a list, the lists will be merged by appending the source field's list to the target list. If the source field is a dict, the dict will be merged with the target dict. If the source keys exist in the target dict, the values will be overwritten, so this is not a deep merge. If the target field does not exist, a new field will be added with the source field value as list or dict. Defaults to False.
- overwrite_target: bool
Overwrite the target field value if it exists. Defaults to False.
- regex_fields: list
It is possible to use regex expressions to match values. For this, the field name with the regex pattern in the rule filter must be added to the optional field regex_fields in the rule definition.
- source_fields: list
The fields from which to get the values that should be processed. Requires target_field.
- tag_on_failure: list
A list of tags which will be appended to the event on non-critical errors. Defaults to ["_<rule_type>_failure"]. Currently only used by the Dissector and FieldManager.
- target_field: str
The field where the processed values are written to. Can be used to move/copy single values, merge multiple values into one list, or extend a list. Requires source_fields.
- tests: List[Dict[str, str]]
Custom tests for this rule.
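The convert_datatype option listed above might be used in a rule like this (the field names and the pattern are illustrative):

```yaml
filter: message
grokker:
  mapping:
    message: '%{IP:client_ip} %{NUMBER:port}'
  convert_datatype:
    port: int
```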
Examples for grokker:
matches simple grok pattern:
rule:
{'filter': 'message', 'grokker': {'mapping': {'message': 'this is the %{USER:userfield}'}}}
message:
{'message': 'this is the MyUser586'}
processed:
{'message': 'this is the MyUser586', 'userfield': 'MyUser586'}
matches simple grok pattern with dotted field target:
rule:
{'filter': 'message', 'grokker': {'mapping': {'message': 'this is the %{USER:user.subfield}'}}}
message:
{'message': 'this is the MyUser586'}
processed:
{'message': 'this is the MyUser586', 'user': {'subfield': 'MyUser586'}}
matches simple grok pattern with logstash field target:
rule:
{'filter': 'message', 'grokker': {'mapping': {'message': 'this is the %{USER:[user][subfield]}'}}}
message:
{'message': 'this is the MyUser586'}
processed:
{'message': 'this is the MyUser586', 'user': {'subfield': 'MyUser586'}}
matches custom patterns:
rule:
{'filter': 'message', 'grokker': {'mapping': {'message': 'this is the %{CUSTOM_PATTERN:user.subfield}'}, 'patterns': {'CUSTOM_PATTERN': '[^\s]*'}}}
message:
{'message': 'this is the MyUser586'}
processed:
{'message': 'this is the MyUser586', 'user': {'subfield': 'MyUser586'}}
normalize from grok:
rule:
{'filter': 'winlog.event_id: 123456789', 'grokker': {'mapping': {'winlog.event_data.normalize me!': '%{IP:some_ip} %{NUMBER:port:int}'}}}
message:
{'winlog': {'api': 'wineventlog', 'event_id': 123456789, 'event_data': {'normalize me!': '123.123.123.123 1234'}}}
processed:
{'winlog': {'api': 'wineventlog', 'event_id': 123456789, 'event_data': {'normalize me!': '123.123.123.123 1234'}}, 'some_ip': '123.123.123.123', 'port': 1234}
grok list match first matching after skipping non matching:
rule:
{'filter': 'winlog.event_id: 123456789', 'grokker': {'mapping': {'winlog.event_data.normalize me!': ['%{IP:some_ip_1} %{NUMBER:port_1:int} foo', '%{IP:some_ip_2} %{NUMBER:port_2:int} bar']}}}
message:
{'winlog': {'api': 'wineventlog', 'event_id': 123456789, 'event_data': {'normalize me!': '123.123.123.123 1234 bar'}}}
processed:
{'winlog': {'api': 'wineventlog', 'event_id': 123456789, 'event_data': {'normalize me!': '123.123.123.123 1234 bar'}}, 'some_ip_2': '123.123.123.123', 'port_2': 1234}
grok list match first matching after skipping non matching and does not match twice:
rule:
{'filter': 'winlog.event_id: 123456789', 'grokker': {'mapping': {'winlog.event_data.normalize me!': ['%{IP:some_ip_1} %{NUMBER:port_1:int} foo', '%{IP:some_ip_2} %{NUMBER:port_2:int} bar', '%{IP:some_ip_3} %{NUMBER:port_3:int} bar']}}}
message:
{'winlog': {'api': 'wineventlog', 'event_id': 123456789, 'event_data': {'normalize me!': '123.123.123.123 1234 bar'}}}
processed:
{'winlog': {'api': 'wineventlog', 'event_id': 123456789, 'event_data': {'normalize me!': '123.123.123.123 1234 bar'}}, 'some_ip_2': '123.123.123.123', 'port_2': 1234}
grok list match first matching after skipping non matching with same target fields:
rule:
{'filter': 'winlog.event_id: 123456789', 'grokker': {'mapping': {'winlog.event_data.normalize me!': ['%{IP:some_ip} %{NUMBER:port:int} foo', '%{IP:some_ip} %{NUMBER:port:int} bar']}}}
message:
{'winlog': {'api': 'wineventlog', 'event_id': 123456789, 'event_data': {'normalize me!': '123.123.123.123 1234 bar'}}}
processed:
{'winlog': {'api': 'wineventlog', 'event_id': 123456789, 'event_data': {'normalize me!': '123.123.123.123 1234 bar'}}, 'some_ip': '123.123.123.123', 'port': 1234}
normalization from nested grok:
rule:
{'filter': 'winlog.event_id: 123456789', 'grokker': {'mapping': {'winlog.event_data.normalize me!': '%{IP:[parent][some_ip]} \w+ %{NUMBER:[parent][port]:int} %[ts]+ %{NUMBER:test:int}'}}}
message:
{'winlog': {'api': 'wineventlog', 'event_id': 123456789, 'event_data': {'normalize me!': '123.123.123.123 555 1234 %ttss 11'}}}
processed:
{'winlog': {'api': 'wineventlog', 'event_id': 123456789, 'event_data': {'normalize me!': '123.123.123.123 555 1234 %ttss 11'}}, 'test': 11, 'parent': {'some_ip': '123.123.123.123', 'port': 1234}}
normalization from escaped & nested grok:
rule:
{'filter': 'win\.log.event\._id: 123456789', 'grokker': {'mapping': {'win\.log.event_data.normalize me!': '%{IP:[par\\ent][...]} \w+ %{NUMBER:[par\\ent][\\port\\]:int} %[ts]+ %{NUMBER:te\\.st\\:int}'}}}
message:
{'win.log': {'api': 'wineventlog', 'event._id': 123456789, 'event_data': {'normalize me!': '123.123.123.123 555 1234 %ttss 11'}}}
processed:
{'win.log': {'api': 'wineventlog', 'event._id': 123456789, 'event_data': {'normalize me!': '123.123.123.123 555 1234 %ttss 11'}}, 'te.st\': 11, 'par\ent': {'...': '123.123.123.123', '\port\': 1234}}
example log message:
rule:
{'filter': 'message', 'grokker': {'mapping': {'message': '%{TIMESTAMP_ISO8601:@timestamp} %{LOGLEVEL:logLevel} %{GREEDYDATA:logMessage}'}}}
message:
{'message': '2020-07-16T19:20:30.45+01:00 DEBUG This is a sample log'}
processed:
{'message': '2020-07-16T19:20:30.45+01:00 DEBUG This is a sample log', '@timestamp': '2020-07-16T19:20:30.45+01:00', 'logLevel': 'DEBUG', 'logMessage': 'This is a sample log'}
example for ecs conform output:
rule:
{'filter': 'message', 'grokker': {'mapping': {'message': '%{COMBINEDAPACHELOG}'}}}
message:
{'message': '127.0.0.1 - - [11/Dec/2013:00:01:45 -0800] "GET /xampp/status.php HTTP/1.1" 200 3891 "http://cadenza/xampp/navi.php" "Mozilla/5.0 (Macintosh; Intel Mac OS X 10.9; rv:25.0) Gecko/20100101 Firefox/25.0"'}
processed:
{'message': '127.0.0.1 - - [11/Dec/2013:00:01:45 -0800] "GET /xampp/status.php HTTP/1.1" 200 3891 "http://cadenza/xampp/navi.php" "Mozilla/5.0 (Macintosh; Intel Mac OS X 10.9; rv:25.0) Gecko/20100101 Firefox/25.0"', 'source': {'address': '127.0.0.1'}, 'timestamp': '11/Dec/2013:00:01:45 -0800', 'http': {'request': {'method': 'GET', 'referrer': 'http://cadenza/xampp/navi.php'}, 'version': '1.1', 'response': {'status_code': 200, 'body': {'bytes': 3891}}}, 'url': {'original': '/xampp/status.php'}, 'user_agent': {'original': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10.9; rv:25.0) Gecko/20100101 Firefox/25.0'}}
matches simple oniguruma pattern:
rule:
{'filter': 'message', 'grokker': {'mapping': {'message': 'this is the (?<userfield>[A-Za-z0-9]+)'}}}
message:
{'message': 'this is the MyUser586'}
processed:
{'message': 'this is the MyUser586', 'userfield': 'MyUser586'}
oniguruma with nested parentheses (3 levels supported):
rule:
{'filter': 'message', 'grokker': {'mapping': {'message': '^(?<timestamp>%{DAY}%{SPACE}%{MONTH}%{SPACE}%{MONTHDAY}%{SPACE}%{TIME}%{SPACE}%{YEAR})%{SPACE}%{GREEDYDATA:[remains]}$', 'remains': '(?<action>(SEND%{SPACE}INFO))%{SPACE}(?<info>BAL)%{GREEDYDATA:rest}'}}}
message:
{'message': 'Wed Dec 7 13:14:13 2005 SEND INFO BAL/4'}
processed:
{'message': 'Wed Dec 7 13:14:13 2005 SEND INFO BAL/4', 'timestamp': 'Wed Dec 7 13:14:13 2005', 'action': 'SEND INFO', 'info': 'BAL', 'rest': '/4', 'remains': 'SEND INFO BAL/4'}
two oniguruma patterns with the same target name, only the last target is applied:
rule:
{'filter': 'message', 'grokker': {'mapping': {'message': '^(?<action>%{NUMBER})%{SPACE}(?<action>%{NUMBER})%{SPACE}(?<action>%{NUMBER})%{SPACE}(?<action>%{NUMBER})$'}}}
message:
{'message': '13 37 21 42'}
processed:
{'message': '13 37 21 42', 'action': '42'}
ignore_missing_fields:
rule:
{'filter': 'winlog.event_id: 123456789', 'grokker': {'mapping': {'winlog.event_data.normalize me!': '%{IP:some_ip} %{NUMBER:port:int}', 'this_field_does_not_exist': '%{IP:some_ip} %{NUMBER:port:int}'}, 'ignore_missing_fields': True}}
message:
{'winlog': {'api': 'wineventlog', 'event_id': 123456789, 'event_data': {'normalize me!': '123.123.123.123 1234'}}}
processed:
{'winlog': {'api': 'wineventlog', 'event_id': 123456789, 'event_data': {'normalize me!': '123.123.123.123 1234'}}, 'some_ip': '123.123.123.123', 'port': 1234}
Subfield with common prefix:
rule:
{'filter': 'message', 'grokker': {'mapping': {'message': 'Facility %{USER:facility.location} %{USER:facility.location_level}'}}}
message:
{'message': 'Facility spain primary'}
processed:
{'message': 'Facility spain primary', 'facility': {'location': 'spain', 'location_level': 'primary'}}
IpInformer
The ip_informer processor enriches an event with IP information.
Processor Configuration
- myipinformer:
    type: ip_informer
    rules:
      - tests/testdata/rules/rules
- class logprep.processor.ip_informer.processor.IpInformer.Config
Common Configurations
- rules: list[str]
List of rule locations to load rules from. In addition to paths to rule files and directories, it is possible to retrieve rules from a URI; for valid URI formats see Getters. As a last option, it is possible to define entire rules with all their configuration parameters as list elements.
- tree_config: str | None
Path to a JSON file with a valid Rule Tree Configuration. For string format see Getters.
- apply_multiple_times: bool
Set if the processor should be applied multiple times. This enables further processing of an output with the same processor.
- type: str
Type of the component.
- health_timeout: float
Timeout in seconds for the health check. Defaults to 1 second.
Rule Configuration
An illustrative example:
filter: message
ip_informer:
  source_fields: ["ip"]
  target_field: result
description: '...'
{"ip": "192.168.5.1"}
{
  "ip": "192.168.5.1",
  "result": {
    "192.168.5.1": {
      "compressed": "192.168.5.1",
      "exploded": "192.168.5.1",
      "is_global": false,
      "is_link_local": false,
      "is_loopback": false,
      "is_multicast": false,
      "is_private": true,
      "is_reserved": false,
      "is_unspecified": false,
      "max_prefixlen": 32,
      "reverse_pointer": "1.5.168.192.in-addr.arpa",
      "version": 4
    }
  }
}
- class logprep.processor.ip_informer.rule.IpInformerRule.Config
Config for IPInformer
- properties: list
(Optional) Configures the properties to extract. Default is to extract all available properties. Possible properties are: ['compressed', 'exploded', 'ipv4_mapped', 'is_global', 'is_link_local', 'is_loopback', 'is_multicast', 'is_private', 'is_reserved', 'is_site_local', 'is_unspecified', 'max_prefixlen', 'reverse_pointer', 'scope_id', 'sixtofour', 'teredo', 'version']. If you explicitly request a property that does not exist for an IP address (e.g. teredo, which only exists for IPv6 addresses), the property will be extracted with the value False.
- delete_source_fields: bool
Whether to delete all the source fields or not. Defaults to False.
- description: str
A description for the rule. It is for documentation purposes only.
- id: str | int | None
A UUID for the rule, generated by logprep.
- ignore_missing_fields: bool
If set to True, missing fields will be ignored, no warning is logged, and the event is not tagged with the failure tag. Defaults to False.
- mapping: dict
A key-value mapping from source fields to target fields. Can be used to copy/move multiple fields at once. If you want to move fields, set delete_source_fields to true. Works independently of source_fields and target_field.
- merge_with_target: bool
If the target field exists and is a list, the list will be extended with the values of the source fields. If the source field is a list, the lists will be merged by appending the source field's list to the target list. If the source field is a dict, the dict will be merged with the target dict. If the source keys exist in the target dict, the values will be overwritten, so this is not a deep merge. If the target field does not exist, a new field will be added with the source field value as list or dict. Defaults to False.
- overwrite_target: bool
Overwrite the target field value if it exists. Defaults to False.
- regex_fields: list
It is possible to use regex expressions to match values. For this, the field name with the regex pattern in the rule filter must be added to the optional field regex_fields in the rule definition.
- source_fields: list
The fields from where to get the values which should be processed; requires target_field.
- tag_on_failure: list
A list of tags which will be appended to the event on non-critical errors, defaults to ["_<rule_type>_failure"]. Is currently only used by the Dissector and FieldManager.
- target_field: str
The field where to write the processed values to. Can be used to move/copy single values, merge multiple values into one list, or extend a list. Requires source_field.
- tests: List[Dict[str, str]]
Custom tests for this rule.
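The property extraction described above can be sketched with Python's ipaddress module, which provides exactly these address properties. This is a simplified illustration under that assumption, not the actual IpInformer implementation; the helper name is made up:

```python
import ipaddress

def extract_ip_properties(ip_string, properties=None):
    """Return the requested properties of an IP address.

    Properties that do not exist for the address type (e.g. ``teredo``
    on an IPv4 address) are reported as ``False``, as described above.
    """
    ip = ipaddress.ip_address(ip_string)
    if properties is None:
        # default: every public attribute the address object exposes
        properties = [name for name in dir(ip) if not name.startswith("_")]
    result = {}
    for name in properties:
        value = getattr(ip, name, False)  # missing property -> False
        result[name] = value if not callable(value) else False
    return result
```

For `"192.168.5.1"` this yields `is_private: True` and `version: 4`, matching the example output above.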
Examples for ip_informer:
single field with ipv4 address:
rule:
{'filter': 'ip', 'ip_informer': {'source_fields': ['ip'], 'target_field': 'result'}}
message:
{'ip': '192.168.5.1'}
processed:
{'ip': '192.168.5.1', 'result': {'192.168.5.1': {'compressed': '192.168.5.1', 'exploded': '192.168.5.1', 'is_global': False, 'is_link_local': False, 'is_loopback': False, 'is_multicast': False, 'is_private': True, 'is_reserved': False, 'is_unspecified': False, 'max_prefixlen': 32, 'reverse_pointer': '1.5.168.192.in-addr.arpa', 'version': 4}}}
single field with ipv6 address:
rule:
{'filter': 'ip', 'ip_informer': {'source_fields': ['ip'], 'target_field': 'result'}}
message:
{'ip': 'fe80::2c71:58ff:fe6a:5a08'}
processed:
{'ip': 'fe80::2c71:58ff:fe6a:5a08', 'result': {'fe80::2c71:58ff:fe6a:5a08': {'compressed': 'fe80::2c71:58ff:fe6a:5a08', 'exploded': 'fe80:0000:0000:0000:2c71:58ff:fe6a:5a08', 'ipv4_mapped': None, 'is_global': False, 'is_link_local': True, 'is_loopback': False, 'is_multicast': False, 'is_private': True, 'is_reserved': False, 'is_site_local': False, 'is_unspecified': False, 'max_prefixlen': 128, 'reverse_pointer': '8.0.a.5.a.6.e.f.f.f.8.5.1.7.c.2.0.0.0.0.0.0.0.0.0.0.0.0.0.8.e.f.ip6.arpa', 'scope_id': None, 'sixtofour': None, 'teredo': None, 'version': 6}}}
list field with ipv4 and ipv6 addresses:
rule:
{'filter': 'ip', 'ip_informer': {'source_fields': ['ip'], 'target_field': 'result'}}
message:
{'ip': ['192.168.5.1', 'fe80::2c71:58ff:fe6a:5a08']}
processed:
{'ip': ['192.168.5.1', 'fe80::2c71:58ff:fe6a:5a08'], 'result': {'192.168.5.1': {'compressed': '192.168.5.1', 'exploded': '192.168.5.1', 'is_global': False, 'is_link_local': False, 'is_loopback': False, 'is_multicast': False, 'is_private': True, 'is_reserved': False, 'is_unspecified': False, 'max_prefixlen': 32, 'reverse_pointer': '1.5.168.192.in-addr.arpa', 'version': 4}, 'fe80::2c71:58ff:fe6a:5a08': {'compressed': 'fe80::2c71:58ff:fe6a:5a08', 'exploded': 'fe80:0000:0000:0000:2c71:58ff:fe6a:5a08', 'ipv4_mapped': None, 'is_global': False, 'is_link_local': True, 'is_loopback': False, 'is_multicast': False, 'is_private': True, 'is_reserved': False, 'is_site_local': False, 'is_unspecified': False, 'max_prefixlen': 128, 'reverse_pointer': '8.0.a.5.a.6.e.f.f.f.8.5.1.7.c.2.0.0.0.0.0.0.0.0.0.0.0.0.0.8.e.f.ip6.arpa', 'scope_id': None, 'sixtofour': None, 'teredo': None, 'version': 6}}}
list and single field with ipv4 and ipv6 addresses:
rule:
{'filter': 'ip', 'ip_informer': {'source_fields': ['ip', 'single'], 'target_field': 'result'}}
message:
{'ip': ['192.168.5.1', 'fe80::2c71:58ff:fe6a:5a08'], 'single': '127.0.0.1'}
processed:
{'ip': ['192.168.5.1', 'fe80::2c71:58ff:fe6a:5a08'], 'single': '127.0.0.1', 'result': {'192.168.5.1': {'compressed': '192.168.5.1', 'exploded': '192.168.5.1', 'is_global': False, 'is_link_local': False, 'is_loopback': False, 'is_multicast': False, 'is_private': True, 'is_reserved': False, 'is_unspecified': False, 'max_prefixlen': 32, 'reverse_pointer': '1.5.168.192.in-addr.arpa', 'version': 4}, 'fe80::2c71:58ff:fe6a:5a08': {'compressed': 'fe80::2c71:58ff:fe6a:5a08', 'exploded': 'fe80:0000:0000:0000:2c71:58ff:fe6a:5a08', 'ipv4_mapped': None, 'is_global': False, 'is_link_local': True, 'is_loopback': False, 'is_multicast': False, 'is_private': True, 'is_reserved': False, 'is_site_local': False, 'is_unspecified': False, 'max_prefixlen': 128, 'reverse_pointer': '8.0.a.5.a.6.e.f.f.f.8.5.1.7.c.2.0.0.0.0.0.0.0.0.0.0.0.0.0.8.e.f.ip6.arpa', 'scope_id': None, 'sixtofour': None, 'teredo': None, 'version': 6}, '127.0.0.1': {'compressed': '127.0.0.1', 'exploded': '127.0.0.1', 'is_global': False, 'is_link_local': False, 'is_loopback': True, 'is_multicast': False, 'is_private': True, 'is_reserved': False, 'is_unspecified': False, 'max_prefixlen': 32, 'reverse_pointer': '1.0.0.127.in-addr.arpa', 'version': 4}}}
single field with ipv4 address and filtered properties:
rule:
{'filter': 'ip', 'ip_informer': {'source_fields': ['ip'], 'target_field': 'result', 'properties': ['is_loopback']}}
message:
{'ip': '192.168.5.1'}
processed:
{'ip': '192.168.5.1', 'result': {'192.168.5.1': {'is_loopback': False}}}
get field value for non existent property:
rule:
{'filter': 'ip', 'ip_informer': {'source_fields': ['ip'], 'target_field': 'result', 'properties': ['teredo']}}
message:
{'ip': '192.168.5.1'}
processed:
{'ip': '192.168.5.1', 'result': {'192.168.5.1': {'teredo': False}}}
ignore missing fields:
rule:
{'filter': 'ip', 'ip_informer': {'source_fields': ['ip', 'does_not_exist'], 'target_field': 'result', 'properties': ['teredo'], 'ignore_missing_fields': True}}
message:
{'ip': '192.168.5.1'}
processed:
{'ip': '192.168.5.1', 'result': {'192.168.5.1': {'teredo': False}}}
KeyChecker
The key_checker processor checks whether all field names in a provided list are present in the processed event.
Processor Configuration
- keycheckername:
    type: key_checker
    rules:
      - tests/testdata/rules/rules
- class logprep.processor.key_checker.processor.KeyChecker.Config
Common Configurations
- rules: list[str]
List of rule locations to load rules from. In addition to paths to rule files and directories, it is possible to retrieve rules from a URI; for valid URI formats see Getters. As a last option, it is possible to define entire rules with all their configuration parameters as list elements.
- tree_config: str | None
Path to a JSON file with a valid Rule Tree Configuration. For string format see Getters.
- apply_multiple_times: bool
Set if the processor should be applied multiple times. This enables further processing of an output with the same processor.
- type: str
Type of the component.
- health_timeout: float
Timeout in seconds for the health check. Defaults to 1 second.
Rule Configuration
The key_checker processor needs a list with at least one element in it. The rule contains this list as well as a target field in which the processor stores all missing keys.
filter: testkey
key_checker:
  source_fields:
    - key1
    - key2
  target_field: "missing_fields"
description: '...'
{
  "testkey": "key1_value",
  "_index": "value"
}
{
  "testkey": "key1_value",
  "_index": "value",
  "missing_fields": ["key1", "key2"]
}
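The check described above can be sketched in a few lines of Python. This is a simplified illustration of the documented behaviour, not the actual KeyChecker code; the function name is hypothetical:

```python
def check_keys(event, source_fields, target_field):
    """Collect all dotted field names from ``source_fields`` that are
    missing in ``event`` and write them to ``target_field``."""
    missing = []
    for dotted in source_fields:
        current = event
        for part in dotted.split("."):
            if isinstance(current, dict) and part in current:
                current = current[part]
            else:
                missing.append(dotted)
                break
    if missing:
        event[target_field] = missing
    return event
```

Applied to the example event, both `key1` and `key2` end up in `missing_fields`.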
- class logprep.processor.key_checker.rule.KeyCheckerRule.Config
key_checker rule config
- delete_source_fields: bool
Whether to delete all the source fields or not. Defaults to False.
- description: str
A description for the rule. It is for documentation purposes only.
- id: str | int | None
A UUID for the rule, generated by logprep.
- ignore_missing_fields: bool
If set to True, missing fields will be ignored, no warning is logged, and the event is not tagged with the failure tag. Defaults to False.
- mapping: dict
A key-value mapping from source fields to target fields. Can be used to copy/move multiple fields at once. If you want to move fields, set delete_source_fields to true. Works independently of source_fields and target_field.
- merge_with_target: bool
If the target field exists and is a list, the list will be extended with the values of the source fields. If the source field is a list, the lists will be merged by appending the source field's list to the target list. If the source field is a dict, the dict will be merged with the target dict. If the source keys exist in the target dict, the values will be overwritten, so this is not a deep merge. If the target field does not exist, a new field will be added with the source field value as list or dict. Defaults to False.
- overwrite_target: bool
Overwrite the target field value if it exists. Defaults to False.
- regex_fields: list
It is possible to use regex expressions to match values. For this, the field name with the regex pattern in the rule filter must be added to the optional field regex_fields in the rule definition.
- source_fields: set
List of fields to check for.
- tag_on_failure: list
A list of tags which will be appended to the event on non-critical errors, defaults to ["_<rule_type>_failure"]. Is currently only used by the Dissector and FieldManager.
- target_field: str
The field where to write the processed values to.
- tests: List[Dict[str, str]]
Custom tests for this rule.
Labeler
Processor Configuration
- labelername:
    type: labeler
    schema: tests/testdata/labeler_rules/labeling/schema.json
    include_parent_labels: true
    rules:
      - tests/testdata/labeler_rules/rules
- class logprep.processor.labeler.processor.Labeler.Config
Labeler Configurations
- schema: str
Path to a labeling schema file. For string format see Getters.
Security Best Practice - Processor - Labeler Schema File Memory Consumption
Be aware that all values of the remote file are loaded into memory. Avoid dynamically growing lists without limits on memory consumption, and avoid loading large files all at once to prevent exceeding HTTP body limits.
Security Best Practice - Processor - Labeler Schema File Authenticity and Integrity
Consider using TLS with authentication via mTLS or OAuth to ensure the authenticity and integrity of the loaded values.
- include_parent_labels: bool | None
If this option is deactivated, only labels defined in a rule will be added. Otherwise, allowed labels along the path to the root of the corresponding label category are added as well. This allows searching for higher-level labels if this option was activated in the rule.
- rules: list[str]
List of rule locations to load rules from. In addition to paths to rule files and directories, it is possible to retrieve rules from a URI; for valid URI formats see Getters. As a last option, it is possible to define entire rules with all their configuration parameters as list elements.
- tree_config: str | None
Path to a JSON file with a valid Rule Tree Configuration. For string format see Getters.
- apply_multiple_times: bool
Set if the processor should be applied multiple times. This enables further processing of an output with the same processor.
- type: str
Type of the component.
- health_timeout: float
Timeout in seconds for the health check. Defaults to 1 second.
Rule Configuration
The labeler requires the additional field label.
The keys under label define the categories under which a label should be added.
The values are a list of labels that should be added under a category.
In the following example, the label execute will be added
to the labels of the category action:
filter: 'command: "executing something"'
labeler:
  label:
    action:
      - execute
description: '...'
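The effect of include_parent_labels can be illustrated with a small sketch: labels in a schema form a tree per category, and the option additionally activates every label on the path from the matched label to the category root. The schema layout and function below are hypothetical simplifications, not logprep's actual schema handling:

```python
def expand_labels(schema, category, label):
    """Return ``label`` plus all parent labels on the path to the
    root of its category, mirroring include_parent_labels."""
    def find_path(tree, target, path):
        for key, subtree in tree.items():
            if key == target:
                return path + [key]
            if isinstance(subtree, dict):
                found = find_path(subtree, target, path + [key])
                if found:
                    return found
        return None
    return find_path(schema.get(category, {}), label, []) or [label]

# hypothetical schema: "execute" is a child of "modify" in category "action"
schema = {"action": {"modify": {"execute": {}}}}
```

With include_parent_labels active, adding `execute` under `action` would also activate its parent label `modify`.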
- class logprep.processor.labeler.rule.LabelerRule.Config
RuleConfig for Labeler
- label: dict
Mapping of a category and a list of labels to add
- delete_source_fields: bool
Whether to delete all the source fields or not. Defaults to False.
- description: str
A description for the rule. It is for documentation purposes only.
- id: str | int | None
A UUID for the rule, generated by logprep.
- ignore_missing_fields: bool
If set to True, missing fields will be ignored, no warning is logged, and the event is not tagged with the failure tag. Defaults to False.
- mapping: dict
A key-value mapping from source fields to target fields. Can be used to copy/move multiple fields at once. If you want to move fields, set delete_source_fields to true. Works independently of source_fields and target_field.
- merge_with_target: bool
If the target field exists and is a list, the list will be extended with the values of the source fields. If the source field is a list, the lists will be merged by appending the source field's list to the target list. If the source field is a dict, the dict will be merged with the target dict. If the source keys exist in the target dict, the values will be overwritten, so this is not a deep merge. If the target field does not exist, a new field will be added with the source field value as list or dict. Defaults to False.
- overwrite_target: bool
Overwrite the target field value if it exists. Defaults to False.
- regex_fields: list
It is possible to use regex expressions to match values. For this, the field name with the regex pattern in the rule filter must be added to the optional field regex_fields in the rule definition.
- source_fields: list
The fields from where to get the values which should be processed; requires target_field.
- tag_on_failure: list
A list of tags which will be appended to the event on non-critical errors, defaults to ["_<rule_type>_failure"]. Is currently only used by the Dissector and FieldManager.
- target_field: str
The field where to write the processed values to. Can be used to move/copy single values, merge multiple values into one list, or extend a list. Requires source_field.
- tests: List[Dict[str, str]]
Custom tests for this rule.
ListComparison
The list_comparison processor compares values of source fields against lists provided as files.
Processor Configuration
- listcomparisonname:
    type: list_comparison
    rules:
      - tests/testdata/rules/rules
    list_search_base_path: /path/to/list/dir
- class logprep.processor.list_comparison.processor.ListComparison.Config
ListComparison config
- list_search_base_path: str
Relative list paths in rules will be relative to this path if it is set. This parameter is optional. For string format see Getters. You can also pass a template with keys from the environment, e.g. ${<your environment variable>}. The special key ${LOGPREP_LIST} will be filled by this processor.
- rules: list[str]
List of rule locations to load rules from. In addition to paths to rule files and directories, it is possible to retrieve rules from a URI; for valid URI formats see Getters. As a last option, it is possible to define entire rules with all their configuration parameters as list elements.
- tree_config: str | None
Path to a JSON file with a valid Rule Tree Configuration. For string format see Getters.
- apply_multiple_times: bool
Set if the processor should be applied multiple times. This enables further processing of an output with the same processor.
- type: str
Type of the component.
- health_timeout: float
Timeout in seconds for the health check. Defaults to 1 second.
Rule Configuration
The list comparison enricher requires the additional field list_comparison.
The mandatory keys under list_comparison are source_fields
(as a list with one element) and target_field. The former
is used to identify the field which is to be checked against the provided lists,
while the latter defines the parent field where the results should
be written to. Both fields can be dotted subfields.
Additionally, a list or array of lists can be provided underneath the
required field list_file_paths.
In the following example, the field user_agent will be checked against the provided list
(privileged_users.txt).
Assuming that the value non_privileged_user will match the provided list,
the result of the list comparison (in_list) will be added to the
target field List_comparison.example.
filter: 'user_agent'
list_comparison:
  source_fields: ['user_agent']
  target_field: 'List_comparison.example'
  list_file_paths:
    - lists/privileged_users.txt
description: '...'
Note
Currently, it is not possible to check more than one source_field per rule.
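The comparison described above can be sketched as follows. This is a rough illustration of the documented behaviour; the exact output structure and keys of the real processor may differ:

```python
def compare_against_lists(value, list_files):
    """Check ``value`` against each named list and report which
    lists it appears in (``in_list``) or does not (``not_in_list``)."""
    matching = [name for name, entries in list_files.items() if value in entries]
    if matching:
        return {"in_list": matching}
    return {"not_in_list": list(list_files)}
```

For the example rule, the result would be written under the target field `List_comparison.example`.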
- class logprep.processor.list_comparison.rule.ListComparisonRule.Config
RuleConfig for ListComparisonRule
- list_file_paths: List[str]
List of files. For string format see Getters.
Security Best Practice - Processor - List Comparison list file paths Memory Consumption
Be aware that all values of the remote files are loaded into memory. Avoid dynamically growing lists without limits on memory consumption, and avoid loading large files all at once to prevent exceeding HTTP body limits.
Security Best Practice - Processor - List Comparison list file paths Authenticity and Integrity
Consider using TLS with authentication via mTLS or OAuth to ensure the authenticity and integrity of the loaded values.
- list_search_base_path: str
Base path from where to find relative files from list_file_paths. You can also pass a template with keys from the environment, e.g. ${<your environment variable>}. The special key ${LOGPREP_LIST} will be filled by this processor.
- delete_source_fields: bool
Whether to delete all the source fields or not. Defaults to False.
- description: str
A description for the rule. It is for documentation purposes only.
- id: str | int | None
A UUID for the rule, generated by logprep.
- ignore_missing_fields: bool
If set to True, missing fields will be ignored, no warning is logged, and the event is not tagged with the failure tag. Defaults to False.
- mapping: dict
A key-value mapping from source fields to target fields. Can be used to copy/move multiple fields at once. If you want to move fields, set delete_source_fields to true. Works independently of source_fields and target_field.
- merge_with_target: bool
If the target field exists and is a list, the list will be extended with the values of the source fields. If the source field is a list, the lists will be merged by appending the source field's list to the target list. If the source field is a dict, the dict will be merged with the target dict. If the source keys exist in the target dict, the values will be overwritten, so this is not a deep merge. If the target field does not exist, a new field will be added with the source field value as list or dict. Defaults to False.
- overwrite_target: bool
Overwrite the target field value if it exists. Defaults to False.
- regex_fields: list
It is possible to use regex expressions to match values. For this, the field name with the regex pattern in the rule filter must be added to the optional field regex_fields in the rule definition.
- source_fields: list
The fields from where to get the values which should be processed; requires target_field.
- tag_on_failure: list
A list of tags which will be appended to the event on non-critical errors, defaults to ["_<rule_type>_failure"]. Is currently only used by the Dissector and FieldManager.
- target_field: str
The field where to write the processed values to. Can be used to move/copy single values, merge multiple values into one list, or extend a list. Requires source_field.
- tests: List[Dict[str, str]]
Custom tests for this rule.
PreDetector
The pre_detector is a processor that creates alerts for matching events. It adds MITRE ATT&CK data to the alerts.
Processor Configuration
- predetectorname:
    type: pre_detector
    rules:
      - tests/testdata/rules/rules
    outputs:
      - kafka: sre_topic
    alert_ip_list_path: /tmp/ip_list.yml
- class logprep.processor.pre_detector.processor.PreDetector.Config
PreDetector config
- outputs: tuple[dict[str, str]]
List of output mappings in the form output_name:topic. Only one mapping is allowed per list element.
- alert_ip_list_path: str | None
Path to a YML file or a list of paths to YML files with dictionaries of IPs. For string format see Getters. It is used by the PreDetector to throw alerts if one of the IPs is found in fields that were defined in a rule.
It uses IPs or networks in CIDR format as keys and can contain expiration dates in ISO format as values. If a value is empty, there is no expiration date for the IP check. If a checked IP is covered by both an IP and a network in the dictionary (i.e. IP 127.0.0.1 and network 127.0.0.0/24 when checking 127.0.0.1), the expiration date of the IP entry is used.
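The matching logic described above can be sketched with Python's ipaddress module. This is a simplified illustration of the documented behaviour (most-specific entry wins, empty value means no expiration), not the actual PreDetector code:

```python
import ipaddress
from datetime import datetime, timezone

def ip_triggers_alert(ip_string, alert_list, now=None):
    """Return True if ``ip_string`` is covered by a non-expired entry.

    ``alert_list`` maps IPs or CIDR networks to optional ISO expiration
    dates. A more specific entry (e.g. a single IP) takes precedence
    over a covering network.
    """
    now = now or datetime.now(timezone.utc)
    ip = ipaddress.ip_address(ip_string)
    # sort entries by prefix length, most specific first
    entries = sorted(
        alert_list.items(),
        key=lambda kv: ipaddress.ip_network(kv[0], strict=False).prefixlen,
        reverse=True,
    )
    for key, expires in entries:
        if ip in ipaddress.ip_network(key, strict=False):
            if not expires:
                return True  # no expiration date configured
            return datetime.fromisoformat(expires) > now
    return False
```

Here, checking 127.0.0.1 against both a 127.0.0.1 entry and a 127.0.0.0/24 entry uses the expiration date of the single-IP entry, as described.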
Security Best Practice - Processor - PreDetector alert_ip_list_path Memory Consumption
Be aware that all values of the remote file are loaded into memory. Avoid dynamically growing lists without limits on memory consumption, and avoid loading large files all at once to prevent exceeding HTTP body limits.
Security Best Practice - Processor - PreDetector alert_ip_list_path Authenticity and Integrity
Consider using TLS with authentication via mTLS or OAuth to ensure the authenticity and integrity of the loaded values.
- rules: list[str]
List of rule locations to load rules from. In addition to paths to rule files and directories, it is possible to retrieve rules from a URI; for valid URI formats see Getters. As a last option, it is possible to define entire rules with all their configuration parameters as list elements.
- tree_config: str | None
Path to a JSON file with a valid Rule Tree Configuration. For string format see Getters.
- apply_multiple_times: bool
Set if the processor should be applied multiple times. This enables further processing of an output with the same processor.
- type: str
Type of the component.
- health_timeout: float
Timeout in seconds for the health check. Defaults to 1 second.
Rule Configuration
The predetector requires the additional field pre_detector.
The rule fields and a pre_detector_id are written into a custom output of the current output connector. The pre_detector_id is furthermore added to the triggering event so that an event can be linked with its detection.
The following example shows a complete rule:
filter: 'some_field: "very malicious!"'
pre_detector:
  case_condition: directly
  id: RULE_ONE_ID
  mitre:
    - attack.something1
    - attack.something2
  severity: critical
  title: Rule one
description: Some malicious event.
Applying this rule to the event
{
  "some_field": "very malicious!"
}
would result in the following output and event enrichment
{
  "some_field": "very malicious!",
  "pre_detection_id": "80bfea3f-c24e-41d0-b82d-b2f02fc03ba9"
}
{
  "@timestamp": "2023-06-16T08:23:41.000Z",
  "id": "RULE_ONE_ID",
  "title": "Rule one",
  "mitre": ["attack.something1", "attack.something2"],
  "case_condition": "directly",
  "rule_filter": "(some_field: 'very malicious!')",
  "severity": "critical",
  "pre_detection_id": "80bfea3f-c24e-41d0-b82d-b2f02fc03ba9",
  "description": "Some malicious event."
}
This generated extra output contains a corresponding rule_filter in Lucene notation, which
can be used to further investigate this rule in an existing OpenSearch instance.
Additionally, the optional field ip_fields can be specified.
It allows specifying a list of fields that can be compared to a list of IPs,
which can be configured in the pipeline for the pre_detector.
If this field is specified, the rule will only trigger if one of
the IPs from the list is also present in the specified fields.
filter: 'some_field: something AND some_ip_field'
pre_detector:
  id: RULE_ONE_ID
  title: Rule one
  severity: critical
  mitre:
    - some_tag
  case_condition: directly
description: Some malicious event.
ip_fields:
  - some_ip_field
The pre_detector also has the option to normalize the timestamp. To configure this, the following parameters can be set in the rule configuration.
filter: 'some_field: "very malicious!"'
pre_detector:
  case_condition: directly
  id: RULE_ONE_ID
  mitre:
    - attack.something1
    - attack.something2
  severity: critical
  title: Rule one
  timestamp_field: <field which includes the timestamp to be normalized>
  source_format: <the format of the timestamp in strftime format or ISO8601 or UNIX>
  source_timezone: <the timezone of the timestamp>
  target_timezone: <the timezone after normalization>
description: Some malicious event.
All of these new parameters are configurable and default to standard values if not explicitly set.
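The normalization described above can be sketched for the documented defaults (ISO8601 source format, UTC source and target timezones). This is a simplified illustration, not the actual PreDetector code; the function name is made up:

```python
from datetime import datetime
from zoneinfo import ZoneInfo

def normalize_timestamp(value, source_timezone="UTC", target_timezone="UTC"):
    """Parse an ISO8601 timestamp and convert it to the target timezone."""
    parsed = datetime.fromisoformat(value)
    if parsed.tzinfo is None:
        # naive timestamps are interpreted in the source timezone
        parsed = parsed.replace(tzinfo=ZoneInfo(source_timezone))
    return parsed.astimezone(ZoneInfo(target_timezone)).isoformat()
```

A timestamp like `2023-06-16T10:23:41+02:00` would be normalized to `2023-06-16T08:23:41+00:00` with the default UTC target timezone.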
- class logprep.processor.pre_detector.rule.PreDetectorRule.Config
RuleConfig for Predetector
- title: str
A description for the triggered rule.
- severity: str
Rating of how dangerous an event is, e.g. critical.
- mitre: list
A list of MITRE ATT&CK tags.
- case_condition: str
The type of the triggered rule, mostly directly.
- ip_fields: list
Specify a list of fields that can be compared to a list of IPs, which can be configured in the pipeline for the pre_detector. If this field is specified, the rule will only trigger if one of the IPs from the list is also present in the specified fields.
- sigma_fields: list | bool
tbd
- link: str | None
A link to the rule if applicable.
- source_format: str
The source format used for normalizing the timestamp. Defaults to ISO8601.
- timestamp_field: str
The field which contains the timestamp to be normalized. Defaults to @timestamp.
- source_timezone: ZoneInfo
Timezone of the source timestamp. Defaults to UTC.
- target_timezone: ZoneInfo
Timezone for the normalized timestamp. Defaults to UTC.
- failure_tags: list
Tags to be added if processing of the rule fails.
- copy_fields_to_detection_event: set[str]
Field names from the triggering event to be added to the detection events. Defaults to ["host.name"] for downwards compatibility reasons. Collisions with field names which are already written by this processor are not allowed and will be rejected with a configuration validation error.
- description: str
A description for the rule. It is for documentation purposes only.
- id: str | int
An ID for the triggered rule.
- regex_fields: list
It is possible to use regex expressions to match values. For this, the field name with the regex pattern in the rule filter must be added to the optional field regex_fields in the rule definition.
- tag_on_failure: list
A list of tags which will be appended to the event on non-critical errors, defaults to ["_<rule_type>_failure"]. Is currently only used by the Dissector and FieldManager.
- tests: List[Dict[str, str]]
Custom tests for this rule.
Pseudonymizer
The pseudonymizer is a processor that pseudonymizes certain fields of log messages to ensure
privacy regulations can be adhered to.
Processor Configuration
- pseudonymizername:
    type: pseudonymizer
    rules:
      - tests/testdata/rules/rules
    outputs:
      - kafka: pseudonyms_topic
    pubkey_analyst: /path/to/analyst_pubkey.pem
    pubkey_depseudo: /path/to/depseudo_pubkey.pem
    hash_salt: secret_salt
    regex_mapping: /path/to/regex_mapping.json
    max_cached_pseudonyms: 1000000
    mode: GCM
- class logprep.processor.pseudonymizer.processor.Pseudonymizer.Config
Pseudonymizer config
- outputs: tuple[dict[str, str]]
List of output mappings in the form output_name: topic. Only one mapping is allowed per list element.
- pubkey_analyst: str
Path to the public key of an analyst. For string format see Getters.
Security Best Practice - Processor - Pseudonymizer pubkey analyst Authenticity and Integrity
Consider using the TLS protocol with authentication via mTLS or OAuth to ensure authenticity and integrity of the loaded values.
- pubkey_depseudo: str
Path to the public key for depseudonymization. For string format see Getters.
Security Best Practice - Processor - Pseudonymizer pubkey depseudo Authenticity and Integrity
Consider using the TLS protocol with authentication via mTLS or OAuth to ensure authenticity and integrity of the loaded values.
- hash_salt: str
A salt that is used for hashing.
- regex_mapping: str
Path to a file (for string format see Getters) with a regex mapping for pseudonymization, i.e. a JSON file that maps regex keywords such as RE_WHOLE_FIELD to regex patterns.
Security Best Practice - Processor - Pseudonymizer regex mapping Memory Consumption
Be aware that all values of the remote file are loaded into memory. Avoid dynamically growing lists without limits on memory consumption. Additionally, avoid loading large files all at once to avoid exceeding HTTP body limits.
Security Best Practice - Processor - Pseudonymizer regex mapping Authenticity and Integrity
Consider using the TLS protocol with authentication via mTLS or OAuth to ensure authenticity and integrity of the loaded values.
- max_cached_pseudonyms: int
The maximum number of cached pseudonyms. One cache entry requires ~250 Byte, thus 10 million elements would require about 2.3 GB RAM. The cache is not persisted. Restarting Logprep does therefore clear the cache. This caching reduces the CPU load of Logprep (no demanding encryption must be performed repeatedly) and the load on subsequent components (i.e. Logstash or Opensearch). In case the cache size has been exceeded, the least recently used entry is deleted. Has to be greater than 0.
Security Best Practice - Processor - Pseudonymizer max_cached_pseudonyms
Ensure to set this to a reasonable value to avoid excessive memory usage and OOM situations caused by the pseudonym cache.
- max_cached_pseudonymized_urls: int
The maximum number of cached pseudonymized urls. Default is 10000. Behaves similarly to the max_cached_pseudonyms. Has to be greater than 0.
Security Best Practice - Processor - Pseudonymizer max_cached_pseudonymized_urls
Ensure to set this to a reasonable value to avoid excessive memory usage and OOM situations caused by the pseudonymized URL cache.
- mode: str
Optional mode of operation for the encryption. Can be either ‘GCM’ or ‘CTR’. Default is ‘GCM’.
Security Best Practice - Processor - Pseudonymizer
The pseudonymizer works with two public keys for different roles. It is suggested to ensure that two different keys are used so that the separation of the roles can be maintained. It is suggested to use the GCM mode for encryption as it decouples the key lengths of the depseudo and analyst keys. This leads to an additional 152 bytes of overhead for the encryption compared to the CTR mode encrypter.
- rules: list[str]
List of rule locations to load rules from. In addition to paths to file directories, it is possible to retrieve rules from a URI. For valid URI formats see Getters. As a last option, it is possible to define entire rules with all their configuration parameters as list elements.
- tree_config: str | None
Path to a JSON file with a valid Rule Tree Configuration. For string format see Getters.
- apply_multiple_times: bool
Set if the processor should be applied multiple times. This enables further processing of an output with the same processor.
- type: str
Type of the component
- health_timeout: float
Timeout in seconds for the health check. Defaults to 1 second.
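The least-recently-used eviction described for max_cached_pseudonyms and max_cached_pseudonymized_urls above can be sketched with a small LRU structure. This is illustrative only; PseudonymCache is not a Logprep class:

```python
from collections import OrderedDict


class PseudonymCache:
    """Minimal LRU cache sketch: evicts the least recently used entry once full."""

    def __init__(self, max_entries: int):
        if max_entries <= 0:
            raise ValueError("max_cached_pseudonyms has to be greater than 0")
        self._max_entries = max_entries
        self._entries: "OrderedDict[str, str]" = OrderedDict()

    def get(self, key: str):
        if key not in self._entries:
            return None
        self._entries.move_to_end(key)  # mark as most recently used
        return self._entries[key]

    def put(self, key: str, value: str) -> None:
        self._entries[key] = value
        self._entries.move_to_end(key)
        if len(self._entries) > self._max_entries:
            self._entries.popitem(last=False)  # drop the least recently used entry
```

A hit on a cached pseudonym avoids repeating the expensive encryption, which is the CPU saving the configuration note refers to.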
Rule Configuration
The pseudonymizer requires the additional field pseudonymizer.mapping.
It contains key value pairs that define what will be pseudonymized.
The key represents the field that will be pseudonymized and the value contains a regex keyword. The regex keyword defines which parts of the value are pseudonymized. Only regex matches that are inside a capture group are pseudonymized. An arbitrary number of capture groups can be used. The definitions of regex keywords are located in a separate file.
In the following the field event_data.param1 is being completely pseudonymized.
This is achieved by using the predefined keyword RE_WHOLE_FIELD,
which will be resolved to a regex expression.
RE_WHOLE_FIELD resolves to (.*) which puts the whole match
in a capture group and therefore pseudonymizes it completely.
filter: 'event_id: 1 AND source_name: "Test"'
pseudonymizer:
  mapping:
    event_data.param1: RE_WHOLE_FIELD
description: '...'
{
  "RE_WHOLE_FIELD": "(.*)",
  "RE_DOMAIN_BACKSLASH_USERNAME": "\w+\\(.*)",
  "RE_IP4_COLON_PORT": "([\d.]+):\d+"
}
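To illustrate how only capture-group matches are pseudonymized, here is a minimal sketch. The real pseudonymizer encrypts the matched parts with the configured public keys; the salted hash below is a stand-in used only to show the capture-group scoping, and the function name is hypothetical:

```python
import hashlib
import re


def pseudonymize_value(value: str, pattern: str, salt: str = "secret_salt") -> str:
    """Replace only the parts of `value` captured by groups in `pattern`."""
    match = re.match(pattern, value)
    if match is None or match.lastindex is None:
        return value  # no match or no capture group: nothing to pseudonymize
    result, cursor = [], 0
    for group_index in range(1, match.lastindex + 1):
        start, end = match.span(group_index)
        result.append(value[cursor:start])  # keep text outside capture groups
        digest = hashlib.sha256((salt + value[start:end]).encode()).hexdigest()
        result.append(digest[:16])  # pseudonymize only the captured part
        cursor = end
    result.append(value[cursor:])
    return "".join(result)
```

With RE_DOMAIN_BACKSLASH_USERNAME only the username after the backslash is captured, so the domain prefix stays readable while the username is pseudonymized.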
- class logprep.processor.pseudonymizer.rule.PseudonymizerRule.Config
RuleConfig for Pseudonymizer
- url_fields: list
url fields to pseudonymize
- delete_source_fields: bool
Whether to delete all the source fields or not. Defaults to False.
- description: str
A description for the rule. This is for documentation purposes only.
- id: str | int | None
A uuid for the rule. Is generated by logprep.
- ignore_missing_fields: bool
If set to True, missing fields will be ignored, no warning is logged and the event is not tagged with the failure tag. Defaults to False.
- mapping: dict
mapping of field to regex string
- merge_with_target: bool
If the target field exists and is a list, the list will be extended with the values of the source fields. If the source field is a list, the lists will be merged by appending the source field's list to the target list. If the source field is a dict, the dict will be merged with the target dict. If the source keys exist in the target dict, the values will be overwritten, so this is not a deep merge. If the target field does not exist, a new field will be added with the source field value as list or dict. Defaults to False.
- overwrite_target: bool
Overwrite the target field value if it exists. Defaults to False.
- regex_fields: list
It is possible to use regex expressions to match values. For this, the field name with the regex pattern in the rule filter must be added to the optional field regex_fields in the rule definition.
- source_fields: list
The fields from which to get the values that should be processed. Requires target_field.
- tag_on_failure: list
A list of tags which will be appended to the event on non-critical errors. Defaults to ["_<rule_type>_failure"]. Is currently only used by the Dissector and FieldManager.
- target_field: str
The field to write the processed values to. Can be used to move/copy single values, merge multiple values into one list or extend a list. Requires source_field.
- tests: List[Dict[str, str]]
Custom tests for this rule.
Replacer
The replacer is a processor that replaces parts of a string with strings defined in rules.
Processor Configuration
- samplename:
    type: replacer
    rules:
      - tests/testdata/rules/
- class logprep.processor.replacer.processor.Replacer.Config
Common Configurations
- rules: list[str]
List of rule locations to load rules from. In addition to paths to file directories, it is possible to retrieve rules from a URI. For valid URI formats see Getters. As a last option, it is possible to define entire rules with all their configuration parameters as list elements.
- tree_config: str | None
Path to a JSON file with a valid Rule Tree Configuration. For string format see Getters.
- apply_multiple_times: bool
Set if the processor should be applied multiple times. This enables further processing of an output with the same processor.
- type: str
Type of the component
- health_timeout: float
Timeout in seconds for the health check. Defaults to 1 second.
Rule Configuration
A speaking example:
filter: message
replacer:
  mapping:
    message: "Message %{*} was created by user %{USER_ID}."
description: '...'
{"message": "Message 123 was created by user 456."}
{"message": "Message 123 was created by user USER_ID."}
- class logprep.processor.replacer.rule.ReplacerRule.Config
Config for ReplacerRule
- extend_target_list: bool
- delete_source_fields: bool
Whether to delete all the source fields or not. Defaults to False.
- description: str
A description for the rule. This is for documentation purposes only.
- id: str | int | None
A uuid for the rule. Is generated by logprep.
- ignore_missing_fields: bool
If set to True, missing fields will be ignored, no warning is logged and the event is not tagged with the failure tag. Defaults to False.
- mapping: dict
A mapping of fieldnames to patterns to replace
- merge_with_target: bool
If the target field exists and is a list, the list will be extended with the values of the source fields. If the source field is a list, the lists will be merged by appending the source field's list to the target list. If the source field is a dict, the dict will be merged with the target dict. If the source keys exist in the target dict, the values will be overwritten, so this is not a deep merge. If the target field does not exist, a new field will be added with the source field value as list or dict. Defaults to False.
- overwrite_target: bool
Overwrite the target field value if it exists. Defaults to True.
- regex_fields: list
It is possible to use regex expressions to match values. For this, the field name with the regex pattern in the rule filter must be added to the optional field regex_fields in the rule definition.
- source_fields: list
The fields from which to get the values that should be processed. Requires target_field.
- tag_on_failure: list
A list of tags which will be appended to the event on non-critical errors. Defaults to ["_<rule_type>_failure"]. Is currently only used by the Dissector and FieldManager.
- target_field: str
The field where to write the processed values to. Defaults to the source fields in mapping.
- tests: List[Dict[str, str]]
Custom tests for this rule.
Replacement Pattern Language
The replacement pattern describes the textual format of the source field.
Given a replacement pattern of %{replaced_1}and%{replaced_2}, everything before the literal and in the source field value will be replaced with replaced_1 and everything after it will be replaced with replaced_2.
The string between %{ and } is the desired replacement value.
Additionally, there exists a colon notation %{MATCH_VALUE:REPLACE_VALUE} to achieve more
specific results.
MATCH_VALUE is a specific value that will be replaced and REPLACE_VALUE is the value
it will be replaced with.
The character : needs to be escaped with \ if it is to be used as a literal character.
The escaping works like sigma wildcard escaping.
foo %{VAL1:VAL2} bar results in foo VAL2 bar only for input foo VAL1 bar.
foo %{VAL1\:VAL2} bar results in foo VAL1:VAL2 bar for input foo SOME bar.
%{REPLACE_VALUE} is a shorthand for %{*:REPLACE_VALUE}.
%{*} is a shorthand for %{*:*}.
The special replacement value %{*} acts as a wildcard that matches a string without replacing it.
If you want to use the symbol * as a replacement value, you have to escape it with
\ (e.g. %{\*}).
* does not have to be escaped if it occurs in combination with other values
(e.g. %{**} or %{foo*}).
The exception is if a single wildcard gets escaped multiple times
(e.g. %{\\*} becomes \* and %{\\\*} becomes \\*).
The modifier |g at the end of a replacement (e.g. %{IP|g}) means that values will be matched greedily.
Given an input 1.2.3.4. and a pattern %{IP}. it would replace to IP.2.3.4..
Given an input 1.2.3.4. and a pattern %{IP|g}. it would replace to IP..
An empty replacement value %{} will remove the matching parts from the new value.
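A simplified sketch of these matching semantics, assuming the whole field value must match the pattern and ignoring colon notation, escaping and the greedy modifier (the function name is hypothetical):

```python
import re


def apply_replacement_pattern(pattern: str, value: str) -> str:
    """Apply the basic replacer semantics to one field value.

    Literal text in the pattern must match the value exactly; every %{...}
    token consumes the text in between and is replaced by its replacement
    value, except for the wildcard %{*}, which keeps the matched text.
    """
    # Split into alternating literal parts and %{...} token contents.
    parts = re.split(r"%\{(.*?)\}", pattern)
    regex = "".join(
        re.escape(part) if index % 2 == 0 else "(.*?)"
        for index, part in enumerate(parts)
    )
    match = re.fullmatch(regex, value)
    if match is None:
        return value  # pattern does not match: field stays unchanged
    groups = iter(match.groups())
    result = []
    for index, part in enumerate(parts):
        if index % 2 == 0:
            result.append(part)  # literal text is kept as-is
        else:
            matched = next(groups)
            result.append(matched if part == "*" else part)
    return "".join(result)
```

Non-matching values are returned unchanged, mirroring the "replacement without matching ... fails" examples below; an empty token %{} removes the matched text.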
- class logprep.processor.dissector.rule.DissectorRule.Config
Config for Dissector
- convert_datatype: dict
A mapping from source field to desired datatype [optional]. The datatypes can be float, int, bool, string.
- delete_source_fields: bool
Whether to delete all the source fields or not. Defaults to False.
- description: str
A description for the rule. This is for documentation purposes only.
- id: str | int | None
A uuid for the rule. Is generated by logprep.
- ignore_missing_fields: bool
If set to True, missing fields will be ignored, no warning is logged and the event is not tagged with the failure tag. Defaults to False.
- mapping: dict
A mapping from source fields to a dissect pattern [optional]. Dotted field notation is possible in key and in the dissect pattern.
- merge_with_target: bool
If the target field exists and is a list, the list will be extended with the values of the source fields. If the source field is a list, the lists will be merged by appending the source field's list to the target list. If the source field is a dict, the dict will be merged with the target dict. If the source keys exist in the target dict, the values will be overwritten, so this is not a deep merge. If the target field does not exist, a new field will be added with the source field value as list or dict. Defaults to False.
- overwrite_target: bool
Overwrite the target field value if it exists. Defaults to False.
- regex_fields: list
It is possible to use regex expressions to match values. For this, the field name with the regex pattern in the rule filter must be added to the optional field regex_fields in the rule definition.
- source_fields: list
The fields from which to get the values that should be processed. Requires target_field.
- tag_on_failure: list
A list of tags which will be appended to the event on non-critical errors. Defaults to ["_<rule_type>_failure"]. Is currently only used by the Dissector and FieldManager.
- target_field: str
The field to write the processed values to. Can be used to move/copy single values, merge multiple values into one list or extend a list. Requires source_field.
- tests: List[Dict[str, str]]
Custom tests for this rule.
Examples for replacer:
replace the beginning:
rule: {'filter': 'field', 'replacer': {'mapping': {'field': '%{X} login attempts.'}}}
message: {'field': '123 login attempts.'}
processed: {'field': 'X login attempts.'}
replace with a different target field:
rule: {'filter': 'field', 'replacer': {'mapping': {'field': '%{X} login attempts.'}, 'target_field': 'new_target'}}
message: {'field': '123 login attempts.'}
processed: {'field': '123 login attempts.', 'new_target': 'X login attempts.'}
replace with dotted field:
rule: {'filter': 'some.field', 'replacer': {'mapping': {'some.field': '%{X} login attempts.'}}}
message: {'some': {'field': '123 login attempts.'}}
processed: {'some': {'field': 'X login attempts.'}}
replace with colon notation:
rule: {'filter': 'field', 'replacer': {'mapping': {'field': '%{*:X} login attempts.'}}}
message: {'field': '123 login attempts.'}
processed: {'field': 'X login attempts.'}
replace wildcard with colon notation:
rule: {'filter': 'field', 'replacer': {'mapping': {'field': '%{*:*} login attempts.'}}}
message: {'field': '123 login attempts.'}
processed: {'field': '123 login attempts.'}
replace specific with colon notation matches:
rule: {'filter': 'field', 'replacer': {'mapping': {'field': '%{123:X} login attempts.'}}}
message: {'field': '123 login attempts.'}
processed: {'field': 'X login attempts.'}
replace specific with colon notation at beginning does not match:
rule: {'filter': 'field', 'replacer': {'mapping': {'field': '%{123:X} login attempts by %{USER_ID}.'}}}
message: {'field': '456 login attempts by 789.'}
processed: {'field': '456 login attempts by 789.'}
replace specific with colon notation at beginning matches:
rule: {'filter': 'field', 'replacer': {'mapping': {'field': '%{123:X} login attempts by %{USER_ID}.'}}}
message: {'field': '123 login attempts by 789.'}
processed: {'field': 'X login attempts by USER_ID.'}
replace specific with colon notation at middle does not match:
rule: {'filter': 'field', 'replacer': {'mapping': {'field': 'User %{USER_ID} performed %{789:X} login attempts.'}}}
message: {'field': 'User 123 performed 456 login attempts.'}
processed: {'field': 'User 123 performed 456 login attempts.'}
replace specific with colon notation at middle matches:
rule: {'filter': 'field', 'replacer': {'mapping': {'field': 'User %{USER_ID} performed %{456:X} login attempts.'}}}
message: {'field': 'User 123 performed 456 login attempts.'}
processed: {'field': 'User USER_ID performed X login attempts.'}
replace specific with colon notation at end does not match:
rule: {'filter': 'field', 'replacer': {'mapping': {'field': 'User %{USER_ID} login count: %{789:X}'}}}
message: {'field': 'User 123 login count: 456'}
processed: {'field': 'User 123 login count: 456'}
replace specific with colon notation at end matches:
rule: {'filter': 'field', 'replacer': {'mapping': {'field': 'User %{USER_ID} login count: %{456:X}'}}}
message: {'field': 'User 123 login count: 456'}
processed: {'field': 'User USER_ID login count: X'}
replace specific with colon notation matches combined without colon notation:
rule: {'filter': 'field', 'replacer': {'mapping': {'field': '%{123:X} login attempts within %{Y} minutes.'}}}
message: {'field': '123 login attempts within 456 minutes.'}
processed: {'field': 'X login attempts within Y minutes.'}
replace specific with colon notation combined with wildcards:
rule: {'filter': 'field', 'replacer': {'mapping': {'field': '/%{*}/foo/%{_:}%{ID}/%{*}'}}}
message: {'field': '/some/path/foo/_123/bar'}
processed: {'field': '/some/path/foo/ID/bar'}
replace specific with colon notation starting with wildcard:
rule: {'filter': 'field', 'replacer': {'mapping': {'field': '%{*}/%{_:}%{ID}/%{*}'}}}
message: {'field': '/some/path/foo/_123/bar'}
processed: {'field': '/some/path/foo/ID/bar'}
replace specific with colon notation without wildcard:
rule: {'filter': 'field', 'replacer': {'mapping': {'field': '/some/path/%{_:}%{ID}'}}}
message: {'field': '/some/path/_123'}
processed: {'field': '/some/path/ID'}
replace the middle:
rule: {'filter': 'field', 'replacer': {'mapping': {'field': 'Attempted to login %{X} times.'}}}
message: {'field': 'Attempted to login 123 times.'}
processed: {'field': 'Attempted to login X times.'}
replace the end:
rule: {'filter': 'field', 'replacer': {'mapping': {'field': 'Delete user %{USER_ID}'}}}
message: {'field': 'Delete user 123'}
processed: {'field': 'Delete user USER_ID'}
replace beginning and the middle:
rule: {'filter': 'field', 'replacer': {'mapping': {'field': '%{A user} tried to call /users/%{USER_ID}/delete'}}}
message: {'field': 'User 123 tried to call /users/456/delete'}
processed: {'field': 'A user tried to call /users/USER_ID/delete'}
replace twice in middle:
rule: {'filter': 'field', 'replacer': {'mapping': {'field': 'User %{USER_ID} tried %{ATTEMPTS} times to log in.'}}}
message: {'field': 'User 123 tried 456 times to log in.'}
processed: {'field': 'User USER_ID tried ATTEMPTS times to log in.'}
replace the middle and the end:
rule: {'filter': 'field', 'replacer': {'mapping': {'field': 'Attempted to login %{ATTEMPTS} times to %{IP}'}}}
message: {'field': 'Attempted to login 123 times to 1.2.3.4'}
processed: {'field': 'Attempted to login ATTEMPTS times to IP'}
replace three times:
rule: {'filter': 'field', 'replacer': {'mapping': {'field': 'User %{USER_ID} tried to login %{ATTEMPTS} to %{IP}'}}}
message: {'field': 'User 123 tried to login 456 to 1.2.3.4'}
processed: {'field': 'User USER_ID tried to login ATTEMPTS to IP'}
replace with empty string:
rule: {'filter': 'field', 'replacer': {'mapping': {'field': '%{}login attempts%{}.'}}}
message: {'field': '123 login attempts by user 456.'}
processed: {'field': 'login attempts.'}
don't replace greedily if part of variable string is contained in unchanging part:
rule: {'filter': 'field', 'replacer': {'mapping': {'field': 'Connected to %{IP|g}.'}}}
message: {'field': 'Connected to 1.2.3.4.'}
processed: {'field': 'Connected to IP.'}
twice don't replace greedily if part of variable string is contained in unchanging part:
rule: {'filter': 'field', 'replacer': {'mapping': {'field': 'Disconnected from %{IP|g}. Connected to %{IP|g}.'}}}
message: {'field': 'Disconnected from 1.2.3.4. Connected to 1.2.3.4.'}
processed: {'field': 'Disconnected from IP. Connected to IP.'}
replace wildcard greedily:
rule: {'filter': 'field', 'replacer': {'mapping': {'field': 'Disconnected from %{IP|g}. Connected to %{*|g}.'}}}
message: {'field': 'Disconnected from 1.2.3.4. Connected to 1.2.3.4.'}
processed: {'field': 'Disconnected from IP. Connected to 1.2.3.4.'}
replace multiple fields:
rule: {'filter': 'field_a AND field_b', 'replacer': {'mapping': {'field_a': 'do %{replace this}!', 'field_b': 'do also %{replace this}!'}}}
message: {'field_a': 'do something!', 'field_b': 'do also something!'}
processed: {'field_a': 'do replace this!', 'field_b': 'do also replace this!'}
replace by matching with wildcard:
rule: {'filter': 'field', 'replacer': {'mapping': {'field': 'User with ID %{USER_ID} has%{*}.'}}}
message: {'field': 'User with ID 123 has logged in.'}
processed: {'field': 'User with ID USER_ID has logged in.'}
replace by matching only with wildcard does not change anything:
rule: {'filter': 'field', 'replacer': {'mapping': {'field': 'User has logged %{*}.'}}}
message: {'field': 'User has logged in.'}
processed: {'field': 'User has logged in.'}
replace by matching with wildcard at the end:
rule: {'filter': 'field', 'replacer': {'mapping': {'field': 'User with ID %{USER_ID} has logged %{*}.'}}}
message: {'field': 'User with ID 123 has logged in.'}
processed: {'field': 'User with ID USER_ID has logged in.'}
replace by matching with wildcard at the beginning:
rule: {'filter': 'field', 'replacer': {'mapping': {'field': '%{*} with ID %{USER_ID} has logged in.'}}}
message: {'field': 'User with ID 123 has logged in.'}
processed: {'field': 'User with ID USER_ID has logged in.'}
replace by matching with wildcard in the middle before other replacement:
rule: {'filter': 'field', 'replacer': {'mapping': {'field': 'User %{*} with ID %{USER_ID} has logged in.'}}}
message: {'field': 'User with ID 123 has logged in.'}
processed: {'field': 'User with ID USER_ID has logged in.'}
replace by matching with multiple wildcards:
rule: {'filter': 'field', 'replacer': {'mapping': {'field': '%{*} with ID %{USER_ID} has %{*}in%{*}'}}}
message: {'field': 'User with ID 123 has logged in.'}
processed: {'field': 'User with ID USER_ID has logged in.'}
replace with star by escaping single wildcard:
rule: {'filter': 'field', 'replacer': {'mapping': {'field': 'User with ID %{USER_ID} has logged %{\*}.'}}}
message: {'field': 'User with ID 123 has logged in.'}
processed: {'field': 'User with ID USER_ID has logged *.'}
replace with backslash and star by escaping single wildcard:
rule: {'filter': 'field', 'replacer': {'mapping': {'field': 'User with ID %{USER_ID} has%{\\*}'}}}
message: {'field': 'User with ID 123 has logged in.'}
processed: {'field': 'User with ID USER_ID has\*'}
replace with multiple backslashes and star by escaping single wildcard:
rule: {'filter': 'field', 'replacer': {'mapping': {'field': 'User with ID %{USER_ID} has logged %{\\\*}.'}}}
message: {'field': 'User with ID 123 has logged in.'}
processed: {'field': 'User with ID USER_ID has logged \\*.'}
replacement of multiple stars does not require escaping wildcard:
rule: {'filter': 'field', 'replacer': {'mapping': {'field': 'User with ID %{USER_ID} has logged %{**}.'}}}
message: {'field': 'User with ID 123 has logged in.'}
processed: {'field': 'User with ID USER_ID has logged **.'}
replacement without matching end fails:
rule: {'filter': 'field', 'replacer': {'mapping': {'field': 'Call /some_path/user/%{USER_ID}/'}}}
message: {'field': 'Call /some_path/user/123/delete'}
processed: {'field': 'Call /some_path/user/123/delete'}
replacement without matching beginning fails:
rule: {'filter': 'field', 'replacer': {'mapping': {'field': 'failed logins: %{COUNT}'}}}
message: {'field': 'logins: 123'}
processed: {'field': 'logins: 123'}
replacement without matching beginning and end fails:
rule: {'filter': 'field', 'replacer': {'mapping': {'field': 'failed to login %{COUNT} times during the last hour'}}}
message: {'field': 'succeeded to login 123 times during the last minute'}
processed: {'field': 'succeeded to login 123 times during the last minute'}
replacement without matching middle fails:
rule: {'filter': 'field', 'replacer': {'mapping': {'field': '%{USER} created by %{USER}'}}}
message: {'field': '123 deleted by 456'}
processed: {'field': '123 deleted by 456'}
nested replacement ignores second start token and terminates with first end token:
rule: {'filter': 'field', 'replacer': {'mapping': {'field': '%{%{replace this} not}!'}}}
message: {'field': 'something not}!'}
processed: {'field': '%{replace this not}!'}
nested replacement ignores second start token and terminates with first end token:
rule: {'filter': 'field', 'replacer': {'mapping': {'field': '%{do %{replace this}}!'}}}
message: {'field': 'do %{something not}!'}
processed: {'field': 'do %{replace this}!'}
Requester
A processor to invoke HTTP requests. Can be used to enrich events from an external API or to trigger external systems using event field values.
Security Best Practice - Processor - Requester
As the requester can execute arbitrary HTTP requests, it is advised to execute requests only
against known and trusted endpoints and to protect the communication with a valid
SSL certificate. Do so by setting a certificate path with the option cert.
To ensure that the communication is trusted, it is also recommended to set either an
Authorization header or a corresponding authentication with a username and password via
auth.
Processor Configuration
- requestername:
    type: requester
    rules:
      - tests/testdata/rules/rules
- class logprep.processor.requester.processor.Requester.Config
Common Configurations
- rules: list[str]
List of rule locations to load rules from. In addition to paths to file directories, it is possible to retrieve rules from a URI. For valid URI formats see Getters. As a last option, it is possible to define entire rules with all their configuration parameters as list elements.
- tree_config: str | None
Path to a JSON file with a valid Rule Tree Configuration. For string format see Getters.
- apply_multiple_times: bool
Set if the processor should be applied multiple times. This enables further processing of an output with the same processor.
- type: str
Type of the component
- health_timeout: float
Timeout in seconds for the health check. Defaults to 1 second.
Rule Configuration
The Requester is configured by the keyword requester.
It can be used to trigger external systems via web requests or to enrich event data via external
APIs.
A speaking example for event enrichment via external api:
filter: 'domain'
requester:
  url: https://internal.cmdb.local/api/v1/locations
  method: POST
  target_field: cmdb.location
  headers:
    Authorization: Bearer askdfjpiowejf283u9r
  json:
    hostname: ${message.hostname}
description: '...'
{"message": {"hostname": "BB37293hhj"}}
{
  "city": "Montreal",
  "Building": "L76",
  "Floor": 3,
  "Room": 34
}
{"message": {"hostname": "BB37293hhj"},
 "cmdb": {
   "location": {
     "city": "Montreal",
     "Building": "L76",
     "Floor": 3,
     "Room": 34
   }
 }
}
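The placeholder resolution and target-field writing shown in this example can be sketched as follows. The actual HTTP request is omitted, and the function names are hypothetical:

```python
import re


def resolve_dotted_field(event: dict, dotted_field: str):
    """Walk a dotted field name (e.g. 'message.hostname') through a nested dict."""
    value = event
    for key in dotted_field.split("."):
        value = value[key]
    return value


def render_template(template: str, event: dict) -> str:
    """Replace every ${dotted.field} placeholder with the corresponding event value."""
    return re.sub(
        r"\$\{([^}]+)\}",
        lambda match: str(resolve_dotted_field(event, match.group(1))),
        template,
    )


def write_target_field(event: dict, dotted_field: str, value) -> None:
    """Write a value under a dotted field name, creating parent dicts as needed."""
    *parents, last = dotted_field.split(".")
    node = event
    for key in parents:
        node = node.setdefault(key, {})
    node[last] = value
```

In the example above, the json payload is rendered first, the request is sent, and the response JSON is then written under the configured target_field cmdb.location.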
- class logprep.processor.requester.rule.RequesterRule.Config
Config for RequesterRule
- target_field_mapping: dict
(Optional) A mapping from dotted fields to dotted fields to extract data from the response JSON into target fields. If target_field is given too, this is applied additionally.
- method: str
The method for the request. Must be one of GET, OPTIONS, HEAD, POST, PUT, PATCH, DELETE.
- url: str
The url for the request. You can use dissect pattern language to add field values
- json: dict
(Optional) The JSON payload. Can be enriched with event data by using the pattern ${the.dotted.field} to retrieve nested field values.
- data: str
(Optional) The data payload. Can be enriched with event data by using the pattern ${the.dotted.field} to retrieve nested field values.
- params: dict
(Optional) The query parameters as a dictionary. Can be enriched with event data by using the pattern ${the.dotted.field} to retrieve nested field values.
- headers: dict
(Optional) The http headers as dictionary.
- auth: tuple
(Optional) The authentication tuple. Defined as list. Will be converted to tuple
- timeout: float
(Optional) The timeout in seconds as float for the request. Defaults to 2 seconds
- verify: bool
(Optional) Whether or not to verify the SSL context. Defaults to True.
- proxies: dict
(Optional) Dictionary mapping protocol or protocol and host to the URL of the proxy (e.g.
{"http": "foo.bar:3128", "http://host.name": "foo.bar:4012"}) to be used on the request
- cert: str
(Optional) SSL client certificate as path to ssl client cert file (.pem).
- delete_source_fields: bool
Whether to delete all the source fields or not. Defaults to False.
- description: str
A description for the rule. This is for documentation purposes only.
- id: str | int | None
A uuid for the rule. Is generated by logprep.
- ignore_missing_fields: bool
If set to True, missing fields will be ignored, no warning is logged and the event is not tagged with the failure tag. Defaults to False.
- mapping: dict
A key-value mapping from source fields to target fields. Can be used to copy/move multiple fields at once. If you want to move fields, set delete_source_fields to true. Works independently of source_fields and target_field.
- merge_with_target: bool
If the target field exists and is a list, the list will be extended with the values of the source fields. If the source field is a list, the lists will be merged by appending the source field's list to the target list. If the source field is a dict, the dict will be merged with the target dict. If the source keys exist in the target dict, the values will be overwritten, so this is not a deep merge. If the target field does not exist, a new field will be added with the source field value as list or dict. Defaults to False.
- overwrite_target: bool
Overwrite the target field value if it exists. Defaults to False.
- regex_fields: list
It is possible to use regex expressions to match values. For this, the field name with the regex pattern in the rule filter must be added to the optional field regex_fields in the rule definition.
- source_fields: list
The fields from which to get the values that should be processed. Requires target_field.
- tag_on_failure: list
A list of tags which will be appended to the event on non-critical errors. Defaults to ["_<rule_type>_failure"]. Is currently only used by the Dissector and FieldManager.
- target_field: str
(Optional) The target field to write the complete response json or body to
- tests: List[Dict[str, str]]
Custom tests for this rule.
SelectiveExtractor
The selective_extractor is a processor that allows writing field values of a given log message to a different Kafka topic. The output topic is configured via the pipeline YAML, while the fields to be extracted are specified by means of a list, which is also specified in the pipeline configuration as a file path. This processor is applied to all messages and therefore does not need further rules to specify its behavior.
Processor Configuration
- selectiveextractorname:
    type: selective_extractor
    rules:
      - tests/testdata/rules/rules
- class logprep.processor.selective_extractor.processor.SelectiveExtractor.Config
Common Configurations
- rules: list[str]
List of rule locations to load rules from. In addition to paths to file directories, it is possible to retrieve rules from a URI. For valid URI formats see Getters. As a last option, it is possible to define entire rules with all their configuration parameters as list elements.
- tree_config: str | None
Path to a JSON file with a valid Rule Tree Configuration. For string format see Getters.
- apply_multiple_times: bool
Set if the processor should be applied multiple times. This enables further processing of an output with the same processor.
- type: str
Type of the component
- health_timeout: float
Timeout in seconds for the health check. Defaults to 1 second.
Rule Configuration
The selective extractor requires the additional field selective_extractor.
It contains a list of field names that should be extracted (source_fields)
and a list of output mappings to which they should be sent (outputs).
If dotted notation is used, all fields on the path are created automatically.
In the following example, the field field.extract with
the value extracted value is extracted
and sent to the output named kafka and the topic named topic_to_send_to.
filter: extract_test
selective_extractor:
  source_fields: ["field.extract", "field2", "field3"]
  outputs:
    - kafka: topic_to_send_to
description: '...'
{
"extract_test": {
"field": {
"extract": "extracted value"
}
}
}
{
"extract": "extracted value"
}
Alternatively, the additional field selective_extractor.extract.extract_from_file
can be added.
It contains the path to a text file with a list of fields per line to be extracted.
filter: extract_test
selective_extractor:
  extract_from_file: /path/to/file
  outputs:
    - opensearch: topic_to_send_to
description: '...'
field1
field2
field3
The file has to exist.
It is possible to mix both extraction sources. They will be merged into one list without duplicates.
filter: extract_test
selective_extractor:
  extract_from_file: /path/to/file
  source_fields: ["field1", "field2", "field4"]
  outputs:
    - kafka: topic_to_send_to
description: '...'
field1
field2
field3
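The duplicate-free merge of both extraction sources described above can be sketched like this; the field names are taken from the example, and the helper is a hypothetical illustration rather than the actual implementation:

```python
def merge_extraction_sources(source_fields, file_lines):
    """Merge rule-defined fields and file-defined fields (one per line)
    into a single duplicate-free list, preserving first-seen order."""
    merged = []
    for field in list(source_fields) + [line.strip() for line in file_lines]:
        if field and field not in merged:
            merged.append(field)
    return merged

# Fields from the rule and the lines of /path/to/file as in the example above
print(merge_extraction_sources(
    ["field1", "field2", "field4"], ["field1\n", "field2\n", "field3\n"]
))  # → ['field1', 'field2', 'field4', 'field3']
```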
- class logprep.processor.selective_extractor.rule.SelectiveExtractorRule.Config
RuleConfig for SelectiveExtractor
- outputs: tuple[dict[str, str]]
List of output mappings in the form output_name: topic. Only one mapping is allowed per list element.
- extract_from_file: str
The path or URL to a file with a flat list of fields to extract. For string format see Getters.
- delete_source_fields: bool
Whether to delete all the source fields or not. Defaults to False.
- description: str
A description for the rule. This is for documentation purposes only.
- id: str | int | None
A UUID for the rule. It is generated by Logprep.
- ignore_missing_fields: bool
If set to True, missing fields will be ignored, no warning is logged and the event is not tagged with the failure tag. Defaults to True.
- mapping: dict
A key-value mapping from source fields to target fields. Can be used to copy/move multiple fields at once. If you want to move fields, set delete_source_fields to true. Works independently of source_fields and target_field.
- merge_with_target: bool
If the target field exists and is a list, the list will be extended with the values of the source fields. If the source field is a list, the lists will be merged by appending the source field's list to the target list. If the source field is a dict, the dict will be merged with the target dict; if the source keys exist in the target dict, the values will be overwritten, so this is not a deep merge. If the target field does not exist, a new field will be added with the source field value as list or dict. Defaults to False.
- overwrite_target: bool
Overwrite the target field value if it exists. Defaults to False.
- regex_fields: list
It is possible to use regex expressions to match values. For this, the field name with the regex pattern in the rule filter must be added to the optional field regex_fields in the rule definition.
- source_fields: list
List of fields in dotted field notation.
- tag_on_failure: list
A list of tags which will be appended to the event on non-critical errors. Defaults to ["_<rule_type>_failure"]. Is currently only used by the Dissector and FieldManager.
- target_field: str
The field to write the processed values to. Can be used to move/copy single values, merge multiple values into one list, or extend a list. Requires source_fields.
- tests: List[Dict[str, str]]
Custom tests for this rule.
StringSplitter
The string_splitter processor splits a string by whitespace (default) or a given delimiter and writes the resulting list to a target field.
Processor Configuration
- samplename:
  type: string_splitter
  rules:
    - tests/testdata/rules/rules
- class logprep.processor.string_splitter.processor.StringSplitter.Config
Common Configurations
- rules: list[str]
List of rule locations to load rules from. In addition to paths to rule files and directories, it is possible to retrieve rules from a URI. For valid URI formats see Getters. As a last option, it is possible to define entire rules, with all their configuration parameters, as list elements.
- tree_config: str | None
Path to a JSON file with a valid Rule Tree Configuration. For string format see Getters.
- apply_multiple_times: bool
Set if the processor should be applied multiple times. This enables further processing of an output with the same processor.
- type: str
Type of the component
- health_timeout: float
Timeout in seconds for the health check. Defaults to 1 second.
Rule Configuration
A speaking example:
filter: message
string_splitter:
  source_fields: ["message"]
  target_field: result
description: '...'
{"message": "this is the message"}
{"message": "this is the message", "result": ["this", "is", "the", "message"]}
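The transformation in this example corresponds to a plain string split. A minimal Python sketch of the splitting and the drop_empty behavior (using str.split; an illustration of the documented semantics, not the actual implementation):

```python
def split_field(value, delimiter=None, drop_empty=False):
    """Split a string on a delimiter (None = any whitespace run);
    with drop_empty, parts that are empty or whitespace-only are removed."""
    parts = value.split(delimiter)
    if drop_empty:
        parts = [part for part in parts if part.strip()]
    return parts

print(split_field("this is the message"))             # → ['this', 'is', 'the', 'message']
print(split_field(",,this,,", ","))                   # → ['', '', 'this', '', '']
print(split_field(",,this,,", ",", drop_empty=True))  # → ['this']
```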
- class logprep.processor.string_splitter.rule.StringSplitterRule.Config
Config for StringSplitterRule
- delimiter: str
The delimiter for splitting. Defaults to whitespace
- drop_empty: bool
If empty list values (as a result of the splitting operation) should be dropped or kept. By this definition, the empty string (no characters) and strings containing only whitespace count as ‘empty’. The default setting is to keep empty list values.
- delete_source_fields: bool
Whether to delete all the source fields or not. Defaults to False.
- description: str
A description for the rule. This is for documentation purposes only.
- id: str | int | None
A UUID for the rule. It is generated by Logprep.
- ignore_missing_fields: bool
If set to True, missing fields will be ignored, no warning is logged and the event is not tagged with the failure tag. Defaults to False.
- mapping: dict
A key-value mapping from source fields to target fields. Can be used to copy/move multiple fields at once. If you want to move fields, set delete_source_fields to true. Works independently of source_fields and target_field.
- merge_with_target: bool
If the target field exists and is a list, the list will be extended with the values of the source fields. If the source field is a list, the lists will be merged by appending the source field's list to the target list. If the source field is a dict, the dict will be merged with the target dict; if the source keys exist in the target dict, the values will be overwritten, so this is not a deep merge. If the target field does not exist, a new field will be added with the source field value as list or dict. Defaults to False.
- overwrite_target: bool
Overwrite the target field value if it exists. Defaults to False.
- regex_fields: list
It is possible to use regex expressions to match values. For this, the field name with the regex pattern in the rule filter must be added to the optional field regex_fields in the rule definition.
- source_fields: list
The fields from which to get the values that should be processed. Requires target_field.
- tag_on_failure: list
A list of tags which will be appended to the event on non-critical errors. Defaults to ["_<rule_type>_failure"]. Is currently only used by the Dissector and FieldManager.
- target_field: str
The field to write the processed values to. Can be used to move/copy single values, merge multiple values into one list, or extend a list. Requires source_fields.
- tests: List[Dict[str, str]]
Custom tests for this rule.
Examples for string_splitter:
splits_without_explicit_set_delimiter_on_whitespace:
rule:
{'filter': 'message', 'string_splitter': {'source_fields': ['message'], 'target_field': 'result', 'drop_empty': True}}
message:
{'message': 'this is the message'}
processed:
['this', 'is', 'the', 'message']
splits_with_delimiter:
rule:
{'filter': 'message', 'string_splitter': {'source_fields': ['message'], 'target_field': 'result', 'delimiter': ', ', 'drop_empty': True}}
message:
{'message': 'this, is, the, message'}
processed:
['this', 'is', 'the', 'message']
splits_one_item_with_delimiter:
rule:
{'filter': 'message', 'string_splitter': {'source_fields': ['message'], 'target_field': 'result', 'delimiter': ',', 'drop_empty': True}}
message:
{'message': 'this,'}
processed:
['this']
splits_one_item_with_multiple_delimiter_and_drop_empty:
rule:
{'filter': 'message', 'string_splitter': {'source_fields': ['message'], 'target_field': 'result', 'delimiter': ',', 'drop_empty': True}}
message:
{'message': ',,this,,'}
processed:
['this']
splits_one_item_with_multiple_delimiter_and_no_drop_empty:
rule:
{'filter': 'message', 'string_splitter': {'source_fields': ['message'], 'target_field': 'result', 'delimiter': ',', 'drop_empty': False}}
message:
{'message': ',,this,,'}
processed:
['', '', 'this', '', '']
splits_one_item_with_multiple_delimiter_and_empty_fields:
rule:
{'filter': 'message', 'string_splitter': {'source_fields': ['message'], 'target_field': 'result', 'delimiter': ',', 'drop_empty': True}}
message:
{'message': ' , ,this, ,'}
processed:
['this']
splits_one_item_with_multiple_delimiter_and_whitespace:
rule:
{'filter': 'message', 'string_splitter': {'source_fields': ['message'], 'target_field': 'result', 'delimiter': ',', 'drop_empty': True}}
message:
{'message': ',, this , , '}
processed:
[' this ']
splits_one_item_with_multiple_delimiter_and_newline:
rule:
{'filter': 'message', 'string_splitter': {'source_fields': ['message'], 'target_field': 'result', 'delimiter': ',', 'drop_empty': True}}
message:
{'message': '\n,,this,\t, '}
processed:
['this']
splits_one_item_with_multiple_delimiter_and_whitespace_only_in_front:
rule:
{'filter': 'message', 'string_splitter': {'source_fields': ['message'], 'target_field': 'result', 'delimiter': ',', 'drop_empty': True}}
message:
{'message': ',, this, , '}
processed:
[' this']
splits_one_item_with_multiple_delimiter_and_whitespace_only_in_front:
rule:
{'filter': 'message', 'string_splitter': {'source_fields': ['message'], 'target_field': 'result', 'delimiter': ',', 'drop_empty': True}}
message:
{'message': 'hello , world,this, is a very complex,\n , and even multiline, text,,, '}
processed:
['hello ', ' world', 'this', ' is a very complex', ' and even multiline', ' text']
TemplateReplacer
The template_replacer is a processor that can replace parts of a text field to anonymize those parts. The replacement is based on a template file.
Processor Configuration
- templatereplacername:
  type: template_replacer
  rules:
    - tests/testdata/rules/rules
  template: /tmp/template.yml
  pattern:
    delimiter: ","
    fields:
      - field.name.a
      - field.name.b
    allowed_delimiter_field: field.name.b
    target_field: target.field
- class logprep.processor.template_replacer.processor.TemplateReplacer.Config
TemplateReplacer config
- template: str
Path to a YML file (for path format see Getters) with a list of replacements in the format %{provider_name}-%{event_id}: %{new_message}.
Security Best Practice - Processor - TemplateReplacer template Memory Consumption
Be aware that all values of the remote file are loaded into memory. Consider avoiding dynamically growing lists without limits on memory consumption. Additionally, avoid loading large files all at once, to avoid exceeding HTTP body limits.
Security Best Practice - Processor - TemplateReplacer template Authenticity and Integrity
Consider using TLS with authentication via mTLS or OAuth to ensure the authenticity and integrity of the loaded values.
- pattern: dict
Configures how to use the template file by specifying the following subfields:
delimiter - Delimiter to use to split the template
fields - A list of dotted fields that are being checked by the template.
allowed_delimiter_field - One of the fields in the fields list can contain the delimiter. This must be specified here.
target_field - The field that gets replaced by the template.
- rules: list[str]
List of rule locations to load rules from. In addition to paths to rule files and directories, it is possible to retrieve rules from a URI. For valid URI formats see Getters. As a last option, it is possible to define entire rules, with all their configuration parameters, as list elements.
- tree_config: str | None
Path to a JSON file with a valid Rule Tree Configuration. For string format see Getters.
- apply_multiple_times: bool
Set if the processor should be applied multiple times. This enables further processing of an output with the same processor.
- type: str
Type of the component
- health_timeout: float
Timeout in seconds for the health check. Defaults to 1 second.
Rule Configuration
The template replacer requires the additional field template_replacer.
No additional configuration parameters are required for the rules.
The module is configured entirely via the pipeline configuration.
In the following example, the target field specified in the processor configuration
is replaced for all log messages that have winlog.provider_name and
winlog.event_id, if a matching replacement is defined in the template file.
filter: winlog.provider_name AND winlog.event_id
template_replacer: {}
description: ''
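Based on the template format %{provider_name}-%{event_id}: %{new_message} and the pattern configuration described above, the lookup can be sketched roughly like this; the helper, the template entries, and the event values are hypothetical illustrations, not the actual implementation:

```python
def apply_template(event, templates, fields, delimiter, target_field):
    """Build the lookup key from the configured fields and, on a match,
    replace the (dotted) target field with the templated message."""
    def get(path):
        value = event
        for part in path.split("."):
            value = value[part]
        return str(value)

    key = delimiter.join(get(field) for field in fields)
    if key in templates:
        # Write the replacement into the dotted target field
        *path, leaf = target_field.split(".")
        parent = event
        for part in path:
            parent = parent.setdefault(part, {})
        parent[leaf] = templates[key]
    return event

# Hypothetical template entry and event
templates = {"Microsoft-Windows-Security-Auditing-4624": "An account was logged on."}
event = {"winlog": {"provider_name": "Microsoft-Windows-Security-Auditing",
                    "event_id": 4624, "message": "original sensitive text"}}
apply_template(event, templates, ["winlog.provider_name", "winlog.event_id"],
               "-", "winlog.message")
print(event["winlog"]["message"])  # → An account was logged on.
```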
- class logprep.processor.template_replacer.rule.TemplateReplacerRule.Config
Config for TemplateReplacerRule
- source_fields: list
The fields from which to get the values that should be processed. Requires target_field.
- target_field: str
The field to write the processed values to. Can be used to move/copy single values, merge multiple values into one list, or extend a list. Requires source_fields.
- mapping: dict
A key-value mapping from source fields to target fields. Can be used to copy/move multiple fields at once. If you want to move fields, set delete_source_fields to true. Works independently of source_fields and target_field.
- delete_source_fields: bool
Whether to delete all the source fields or not. Defaults to False.
- overwrite_target: bool
Overwrite the target field value if it exists. Defaults to False.
- merge_with_target: bool
If the target field exists and is a list, the list will be extended with the values of the source fields. If the source field is a list, the lists will be merged by appending the source field's list to the target list. If the source field is a dict, the dict will be merged with the target dict; if the source keys exist in the target dict, the values will be overwritten, so this is not a deep merge. If the target field does not exist, a new field will be added with the source field value as list or dict. Defaults to False.
- ignore_missing_fields: bool
If set to True, missing fields will be ignored, no warning is logged and the event is not tagged with the failure tag. Defaults to False.
- description: str
A description for the rule. This is for documentation purposes only.
- id: str | int | None
A UUID for the rule. It is generated by Logprep.
- regex_fields: list
It is possible to use regex expressions to match values. For this, the field name with the regex pattern in the rule filter must be added to the optional field regex_fields in the rule definition.
- tag_on_failure: list
A list of tags which will be appended to the event on non-critical errors. Defaults to ["_<rule_type>_failure"]. Is currently only used by the Dissector and FieldManager.
- tests: List[Dict[str, str]]
Custom tests for this rule.
Timestamper
The timestamper processor normalizes timestamps to an ISO8601-compliant output format.
Processor Configuration
- mytimestamper:
  type: timestamper
  rules:
    - tests/testdata/rules/rules
- class logprep.processor.timestamper.processor.Timestamper.Config
Common Configurations
- rules: list[str]
List of rule locations to load rules from. In addition to paths to rule files and directories, it is possible to retrieve rules from a URI. For valid URI formats see Getters. As a last option, it is possible to define entire rules, with all their configuration parameters, as list elements.
- tree_config: str | None
Path to a JSON file with a valid Rule Tree Configuration. For string format see Getters.
- apply_multiple_times: bool
Set if the processor should be applied multiple times. This enables further processing of an output with the same processor.
- type: str
Type of the component
- health_timeout: float
Timeout in seconds for the health check. Defaults to 1 second.
Rule Configuration
A speaking example:
filter: "winlog.event_id: 123456789"
timestamper:
  source_fields: ["winlog.event_data.some_timestamp_utc"]
  target_field: "@timestamp"
  source_format: UNIX
  source_timezone: UTC
  target_timezone: Europe/Berlin
description: example timestamper rule
{
  "winlog": {
    "api": "wineventlog",
    "event_id": 123456789,
    "event_data": {"some_timestamp_utc": "1642160449"}
  }
}
{
  "@timestamp": "2022-01-14T12:40:49+01:00",
  "winlog": {
    "api": "wineventlog",
    "event_id": 123456789,
    "event_data": {"some_timestamp_utc": "1642160449"}
  }
}
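The UNIX conversion in this example can be sketched with the standard library; the seconds-vs-milliseconds heuristic below is an assumption for illustration, not the processor's actual detection logic:

```python
from datetime import datetime, timezone
from zoneinfo import ZoneInfo

def unix_to_iso(value: str, target_tz: str = "UTC") -> str:
    """Convert a UNIX epoch string (seconds or milliseconds) to an
    ISO8601 string in the target timezone."""
    number = int(value)
    # Assumed heuristic: values this large are only plausible as milliseconds
    seconds = number / 1000 if number >= 1e11 else number
    dt = datetime.fromtimestamp(seconds, tz=timezone.utc)
    return dt.astimezone(ZoneInfo(target_tz)).isoformat()

print(unix_to_iso("1642160449", "Europe/Berlin"))
# → 2022-01-14T12:40:49+01:00
```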
- class logprep.processor.timestamper.rule.TimestamperRule.Config
Config for TimestamperRule
- source_format: list
A list of possible source formats. If the source field is not an ISO8601-compliant time format string, the format must be given in the syntax of the Python builtin datetime.strptime (see: https://docs.python.org/3/library/datetime.html#strftime-and-strptime-format-codes). Additionally, the values ISO8601 (default) and UNIX can be used in the list of the source_format field. The former can be used if the timestamp already exists in ISO8601 format, such that only a timezone conversion should be applied, and the latter can be used if the timestamp is given in UNIX Epoch Time. This supports UNIX timestamps in seconds and milliseconds. Be aware that the UNIX and ISO8601 formats do not validate the completeness of the input string. If you want to ensure the completeness of the input string, you have to use the datetime.strptime syntax. For example, the following time formats are valid ISO8601 formats: hh:mm, hh:mm:ss, hh:mm:ss.sss, hhmmss.ssssss, hhmm, hhmmss.
The output string will always be in this format: 2000-12-31T22:59:59Z. As you can see, the output string has a time with seconds. If the input string does not have a time, or the time does not have seconds, the missing components will be set to zero in the output. If you don't want this behavior, you have to use the datetime.strptime syntax. With that syntax, the timestamper errors out with a TimeParserException and the tag _timestamper_failure will be added to the event.
- source_timezone: ZoneInfo
Timezone of the source_fields. Defaults to UTC.
- target_timezone: ZoneInfo
Timezone for the target_field. Defaults to UTC.
- delete_source_fields: bool
Whether to delete all the source fields or not. Defaults to False.
- description: str
A description for the rule. This is for documentation purposes only.
- id: str | int | None
A UUID for the rule. It is generated by Logprep.
- ignore_missing_fields: bool
If set to True, missing fields will be ignored, no warning is logged and the event is not tagged with the failure tag. Defaults to False.
- mapping: dict
A key-value mapping from source fields to target fields. Can be used to copy/move multiple fields at once. If you want to move fields, set delete_source_fields to true. Works independently of source_fields and target_field.
- merge_with_target: bool
If the target field exists and is a list, the list will be extended with the values of the source fields. If the source field is a list, the lists will be merged by appending the source field's list to the target list. If the source field is a dict, the dict will be merged with the target dict; if the source keys exist in the target dict, the values will be overwritten, so this is not a deep merge. If the target field does not exist, a new field will be added with the source field value as list or dict. Defaults to False.
- overwrite_target: bool
Overwrite the target field value if it exists. Defaults to False.
- regex_fields: list
It is possible to use regex expressions to match values. For this, the field name with the regex pattern in the rule filter must be added to the optional field regex_fields in the rule definition.
- source_fields: list
The field from which to get the time, as a list with one element.
- tag_on_failure: list
A list of tags which will be appended to the event on non-critical errors. Defaults to ["_<rule_type>_failure"]. Is currently only used by the Dissector and FieldManager.
- target_field: str
The field to write the processed values to. Defaults to @timestamp.
- tests: List[Dict[str, str]]
Custom tests for this rule.
Examples for timestamper:
parses iso8601 without pattern:
rule:
{'filter': 'message', 'timestamper': {'source_fields': ['message'], 'target_field': '@timestamp'}}
message:
{'message': '2009-06-15 13:45:30Z'}
processed:
{'message': '2009-06-15 13:45:30Z', '@timestamp': '2009-06-15T13:45:30Z'}
parses iso8601 to default target field:
rule:
{'filter': 'message', 'timestamper': {'source_fields': ['message']}}
message:
{'message': '2009-06-15 13:45:30Z'}
processed:
{'message': '2009-06-15 13:45:30Z', '@timestamp': '2009-06-15T13:45:30Z'}
parses by datetime source format:
rule:
{'filter': 'message', 'timestamper': {'source_fields': ['message'], 'source_format': '%Y %m %d - %H:%M:%S'}}
message:
{'message': '2000 12 31 - 22:59:59'}
processed:
{'message': '2000 12 31 - 22:59:59', '@timestamp': '2000-12-31T22:59:59Z'}
converts timezone information:
rule:
{'filter': 'message', 'timestamper': {'source_fields': ['message'], 'source_format': '%Y %m %d - %H:%M:%S', 'source_timezone': 'UTC', 'target_timezone': 'Europe/Berlin'}}
message:
{'message': '2000 12 31 - 22:59:59'}
processed:
{'message': '2000 12 31 - 22:59:59', '@timestamp': '2000-12-31T23:59:59+01:00'}
parses unix timestamp:
rule:
{'filter': 'message', 'timestamper': {'source_fields': ['message'], 'source_format': 'UNIX', 'source_timezone': 'UTC', 'target_timezone': 'Europe/Berlin'}}
message:
{'message': '1642160449843'}
processed:
{'message': '1642160449843', '@timestamp': '2022-01-14T12:40:49.843000+01:00'}
normalization from timestamp berlin to utc:
rule:
{'filter': 'winlog.event_id: 123456789', 'timestamper': {'source_fields': ['winlog.event_data.some_timestamp_berlin'], 'target_field': '@timestamp', 'source_format': '%Y %m %d - %H:%M:%S', 'source_timezone': 'Europe/Berlin', 'target_timezone': 'UTC'}}
message:
{'winlog': {'api': 'wineventlog', 'event_id': 123456789, 'event_data': {'some_timestamp_berlin': '1999 12 12 - 12:12:22'}}}
processed:
{'@timestamp': '1999-12-12T11:12:22Z', 'winlog': {'api': 'wineventlog', 'event_id': 123456789, 'event_data': {'some_timestamp_berlin': '1999 12 12 - 12:12:22'}}}
normalization from timestamp same timezone:
rule:
{'filter': 'winlog.event_id: 123456789', 'timestamper': {'source_fields': ['winlog.event_data.some_timestamp_utc'], 'target_field': '@timestamp', 'source_format': '%Y %m %d - %H:%M:%S', 'source_timezone': 'UTC', 'target_timezone': 'UTC'}}
message:
{'winlog': {'api': 'wineventlog', 'event_id': 123456789, 'event_data': {'some_timestamp_utc': '1999 12 12 - 12:12:22'}}}
processed:
{'@timestamp': '1999-12-12T12:12:22Z', 'winlog': {'api': 'wineventlog', 'event_id': 123456789, 'event_data': {'some_timestamp_utc': '1999 12 12 - 12:12:22'}}}
normalization from unix with millis timestamp:
rule:
{'filter': 'winlog.event_id: 123456789', 'timestamper': {'source_fields': ['winlog.event_data.some_timestamp_utc'], 'target_field': '@timestamp', 'source_format': 'UNIX', 'source_timezone': 'UTC', 'target_timezone': 'Europe/Berlin'}}
message:
{'winlog': {'api': 'wineventlog', 'event_id': 123456789, 'event_data': {'some_timestamp_utc': '1642160449843'}}}
processed:
{'@timestamp': '2022-01-14T12:40:49.843000+01:00', 'winlog': {'api': 'wineventlog', 'event_id': 123456789, 'event_data': {'some_timestamp_utc': '1642160449843'}}}
normalization from unix with seconds timestamp:
rule:
{'filter': 'winlog.event_id: 123456789', 'timestamper': {'source_fields': ['winlog.event_data.some_timestamp_utc'], 'target_field': '@timestamp', 'source_format': 'UNIX', 'source_timezone': 'UTC', 'target_timezone': 'Europe/Berlin'}}
message:
{'winlog': {'api': 'wineventlog', 'event_id': 123456789, 'event_data': {'some_timestamp_utc': '1642160449'}}}
processed:
{'@timestamp': '2022-01-14T12:40:49+01:00', 'winlog': {'api': 'wineventlog', 'event_id': 123456789, 'event_data': {'some_timestamp_utc': '1642160449'}}}
attempt parsing with multiple patterns, second one successful:
rule:
{'filter': 'message', 'timestamper': {'source_fields': ['message'], 'source_format': ['%Y %m %d', '%Y %m %d - %H:%M:%S']}}
message:
{'message': '2000 12 31 - 22:59:59'}
processed:
{'message': '2000 12 31 - 22:59:59', '@timestamp': '2000-12-31T22:59:59Z'}
attempt parsing with multiple patterns, both successful but stopping after first:
rule:
{'filter': 'message', 'timestamper': {'source_fields': ['message'], 'source_format': ['%Y %m %d - %H:%M:%S', '%Y %m %d - %H:%M:%S']}}
message:
{'message': '2000 12 31 - 22:59:59'}
processed:
{'message': '2000 12 31 - 22:59:59', '@timestamp': '2000-12-31T22:59:59Z'}
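The multi-pattern attempts in the last two examples, where source formats are tried in order and the first successful parse wins, can be sketched like this (a simplified illustration using datetime.strptime, not the actual implementation):

```python
from datetime import datetime

def parse_with_formats(value: str, formats: list) -> datetime:
    """Try each configured format in order; the first one that parses wins."""
    for fmt in formats:
        try:
            return datetime.strptime(value, fmt)
        except ValueError:
            continue  # this format did not match, try the next one
    raise ValueError(f"no format matched: {value!r}")

ts = parse_with_formats("2000 12 31 - 22:59:59",
                        ["%Y %m %d", "%Y %m %d - %H:%M:%S"])
print(ts.isoformat())  # → 2000-12-31T22:59:59
```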
TimestampDiffer
The timestamp_differ can calculate the time difference between two timestamps.
Processor Configuration
- timestampdiffer_name:
  type: timestamp_differ
  rules:
    - tests/testdata/rules/rules
- class logprep.processor.timestamp_differ.processor.TimestampDiffer.Config
Common Configurations
- rules: list[str]
List of rule locations to load rules from. In addition to paths to rule files and directories, it is possible to retrieve rules from a URI. For valid URI formats see Getters. As a last option, it is possible to define entire rules, with all their configuration parameters, as list elements.
- tree_config: str | None
Path to a JSON file with a valid Rule Tree Configuration. For string format see Getters.
- apply_multiple_times: bool
Set if the processor should be applied multiple times. This enables further processing of an output with the same processor.
- type: str
Type of the component
- health_timeout: float
Timeout in seconds for the health check. Defaults to 1 second.
Rule Configuration
The timestamp format can be specified per timestamp.
A speaking example:
filter: 'ingest AND processed'
timestamp_differ:
  diff: ${processed:%Y-%m-%d %H:%M:%S} - ${ingest:%Y-%m-%d %H:%M:%S}
  target_field: processing_time
  output_format: seconds
description: '...'
{"ingest": "2022-12-06 10:00:00", "processed": "2022-12-06 10:00:05"}
{"ingest": "2022-12-06 10:00:00", "processed": "2022-12-06 10:00:05", "processing_time": "5.0"}
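The subtraction and unit conversion in this example can be sketched as follows; a minimal illustration of the documented behavior, not the actual implementation:

```python
from datetime import datetime

def timestamp_diff(later: str, earlier: str, fmt: str = "%Y-%m-%d %H:%M:%S",
                   output_format: str = "seconds") -> str:
    """Subtract two timestamps parsed with the same format and express
    the difference in the configured unit, as a string like the processor
    writes to the target field."""
    delta = datetime.strptime(later, fmt) - datetime.strptime(earlier, fmt)
    factor = {"seconds": 1, "milliseconds": 1_000, "nanoseconds": 1_000_000_000}
    return str(delta.total_seconds() * factor[output_format])

print(timestamp_diff("2022-12-06 10:00:05", "2022-12-06 10:00:00"))  # → 5.0
```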
- class logprep.processor.timestamp_differ.rule.TimestampDifferRule.Config
Config for TimestampDiffer
- diff: str
Specifies the timestamp subtraction and the respective timestamp formats. The fields and the timestamp format can be specified in the form ${dotted.field.path:timestamp-format}. If no timestamp format is given, e.g. ${dotted.field.path}, the string is assumed to be an ISO8601-compliant string and parsed accordingly. For more information on the format syntax see datetime strftime/strptime.
- source_field_formats: list
- output_format: str
(Optional) Specifies the desired output format of the timestamp difference. Allowed values are seconds, milliseconds and nanoseconds. Defaults to seconds.
- show_unit: bool
(Optional) Specifies whether the unit (s, ms, ns) should be part of the output. Defaults to False.
- delete_source_fields: bool
Whether to delete all the source fields or not. Defaults to False.
- description: str
A description for the rule. This is for documentation purposes only.
- id: str | int | None
A UUID for the rule. It is generated by Logprep.
- ignore_missing_fields: bool
If set to True, missing fields will be ignored, no warning is logged and the event is not tagged with the failure tag. Defaults to False.
- mapping: dict
A key-value mapping from source fields to target fields. Can be used to copy/move multiple fields at once. If you want to move fields, set delete_source_fields to true. Works independently of source_fields and target_field.
- merge_with_target: bool
If the target field exists and is a list, the list will be extended with the values of the source fields. If the source field is a list, the lists will be merged by appending the source field's list to the target list. If the source field is a dict, the dict will be merged with the target dict; if the source keys exist in the target dict, the values will be overwritten, so this is not a deep merge. If the target field does not exist, a new field will be added with the source field value as list or dict. Defaults to False.
- overwrite_target: bool
Overwrite the target field value if it exists. Defaults to False.
- regex_fields: list
It is possible to use regex expressions to match values. For this, the field name with the regex pattern in the rule filter must be added to the optional field regex_fields in the rule definition.
- source_fields: list
The fields from which to get the values that should be processed. Requires target_field.
- tag_on_failure: list
A list of tags which will be appended to the event on non-critical errors. Defaults to ["_<rule_type>_failure"]. Is currently only used by the Dissector and FieldManager.
- target_field: str
The field to write the processed values to. Can be used to move/copy single values, merge multiple values into one list, or extend a list. Requires source_fields.
- tests: List[Dict[str, str]]
Custom tests for this rule.
Examples for timestamp_differ:
Time difference between two timestamps:
rule:
{'filter': 'field1 AND field2', 'timestamp_differ': {'diff': '${field2} - ${field1}', 'target_field': 'time_diff'}}message:
{'field1': '2022-12-05 11:38:42', 'field2': '2022-12-05 12:00:00'}processed:
{'field1': '2022-12-05 11:38:42', 'field2': '2022-12-05 12:00:00', 'time_diff': '1278.0'}
Time difference between two timestamps with day change:
rule:
{'filter': 'field1 AND field2', 'timestamp_differ': {'diff': '${field2} - ${field1}', 'target_field': 'time_diff'}}message:
{'field1': '2022-12-04 12:00:00', 'field2': '2022-12-05 12:00:00'}processed:
{'field1': '2022-12-04 12:00:00', 'field2': '2022-12-05 12:00:00', 'time_diff': '86400.0'}
Time difference between two timestamps with timezone information:
rule:
{'filter': 'field1 AND field2', 'timestamp_differ': {'diff': '${field2:%Y-%m-%d %H:%M:%S %z} - ${field1:%Y-%m-%d}', 'target_field': 'time_diff'}}message:
{'field2': '2022-05-09 03:56:47 -03:00', 'field1': '2022-05-08'}processed:
{'field2': '2022-05-09 03:56:47 -03:00', 'field1': '2022-05-08', 'time_diff': '111407.0'}
Time difference between two timestamps with full weekday and month:
rule:
{'filter': 'field1 AND field2', 'timestamp_differ': {'diff': '${field2:%A, %d. %B %Y %I:%M%p} - ${field1:%Y-%m-%d}', 'target_field': 'time_diff'}}message:
{'field2': 'Monday, 05. December 2022 11:19AM', 'field1': '2022-12-05'}processed:
{'field2': 'Monday, 05. December 2022 11:19AM', 'field1': '2022-12-05', 'time_diff': '40740.0'}
Time difference between two timestamps with AM/PM :
rule:
{'filter': 'field1 AND field2', 'timestamp_differ': {'diff': '${field2:%a %b %d %I:%M:%S %p %Y} - ${field1:%Y-%m-%d}', 'target_field': 'time_diff'}}message:
{'field2': 'Wed Dec 4 1:14:31 PM 2022', 'field1': '2022-12-03'}processed:
{'field2': 'Wed Dec 4 1:14:31 PM 2022', 'field1': '2022-12-03', 'time_diff': '134071.0'}
Time difference between two timestamps with milliseconds output:
rule:
{'filter': 'field1 AND field2', 'timestamp_differ': {'diff': '${field2:%Y-%m-%d %H:%M:%S} - ${field1:%Y-%m-%d %H:%M:%S}', 'target_field': 'time_diff', 'output_format': 'milliseconds'}}message:
{'field1': '2022-12-05 11:38:42', 'field2': '2022-12-05 12:00:00'}processed:
{'field1': '2022-12-05 11:38:42', 'field2': '2022-12-05 12:00:00', 'time_diff': '1278000.0'}
Time difference between two timestamps with nanoseconds output:
rule:
{'filter': 'field1 AND field2', 'timestamp_differ': {'diff': '${field2:%Y-%m-%d %H:%M:%S} - ${field1:%Y-%m-%d %H:%M:%S}', 'target_field': 'time_diff', 'output_format': 'nanoseconds'}}
message:
{'field1': '2022-12-05 11:38:42', 'field2': '2022-12-05 12:00:00'}
processed:
{'field1': '2022-12-05 11:38:42', 'field2': '2022-12-05 12:00:00', 'time_diff': '1278000000000.0'}
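The `output_format` scaling shown in the last few examples can be sketched as follows. The factor names mirror the documented option values; the helper itself is illustrative, not Logprep internals:

```python
# Scale factors for the documented output_format values.
FACTORS = {'seconds': 1, 'milliseconds': 1_000, 'nanoseconds': 1_000_000_000}

def scale_diff(diff_seconds: float, output_format: str = 'seconds') -> str:
    # The processed event stores the result as a string, e.g. '1278000.0'.
    return str(diff_seconds * FACTORS[output_format])

scale_diff(1278.0, 'milliseconds')  # '1278000.0'
```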
Time difference between two timestamps in subfield:
rule:
{'filter': 'field1 AND subfield.field2', 'timestamp_differ': {'diff': '${subfield.field2:%Y-%m-%d %H:%M:%S} - ${field1:%Y-%m-%d %H:%M:%S}', 'target_field': 'time_diff'}}
message:
{'field1': '2022-12-05 11:38:42', 'subfield': {'field2': '2022-12-05 12:00:00'}}
processed:
{'field1': '2022-12-05 11:38:42', 'subfield': {'field2': '2022-12-05 12:00:00'}, 'time_diff': '1278.0'}
Time difference between two timestamps without specific timestamp format:
rule:
{'filter': 'field1 AND subfield.field2', 'timestamp_differ': {'diff': '${subfield.field2} - ${field1:%Y-%m-%d %H:%M:%S}', 'target_field': 'time_diff'}}
message:
{'field1': '2022-12-05 12:00:00', 'subfield': {'field2': '2022-12-05T11:38:42-02:00'}}
processed:
{'field1': '2022-12-05 12:00:00', 'subfield': {'field2': '2022-12-05T11:38:42-02:00'}, 'time_diff': '5922.0'}
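When no format pattern is given, the timestamp is parsed automatically. For an ISO-8601 value like the one above, the effect can be approximated with `datetime.fromisoformat`; this is only a stand-in, since the processor's own parser is more general, and it assumes naive timestamps count as UTC:

```python
from datetime import datetime, timezone

# Aware timestamp, offset UTC-2 -> 13:38:42 UTC.
dt2 = datetime.fromisoformat('2022-12-05T11:38:42-02:00')
# Naive timestamp, assumed UTC for the subtraction.
dt1 = datetime.strptime('2022-12-05 12:00:00', '%Y-%m-%d %H:%M:%S')
dt1 = dt1.replace(tzinfo=timezone.utc)

print((dt2 - dt1).total_seconds())  # 5922.0, matching the example above
```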
Time difference between two timestamps with removal of source fields:
rule:
{'filter': 'field1 AND subfield.field2', 'timestamp_differ': {'diff': '${subfield.field2} - ${field1:%Y-%m-%d %H:%M:%S}', 'target_field': 'time_diff', 'delete_source_fields': True}}
message:
{'field1': '2022-12-05 12:00:00', 'subfield': {'field2': '2022-12-05T11:38:42-02:00'}}
processed:
{'time_diff': '5922.0'}
Time difference between two timestamps with overwriting of target:
rule:
{'filter': 'field1 AND subfield.field2', 'timestamp_differ': {'diff': '${subfield.field2} - ${field1:%Y-%m-%d %H:%M:%S}', 'target_field': 'time_diff', 'overwrite_target': True}}
message:
{'field1': '2022-12-05 12:00:00', 'subfield': {'field2': '2022-12-05T11:38:42-02:00'}, 'time_diff': 'some content'}
processed:
{'field1': '2022-12-05 12:00:00', 'subfield': {'field2': '2022-12-05T11:38:42-02:00'}, 'time_diff': '5922.0'}
Time difference between two timestamps with extension of existing list in target field:
rule:
{'filter': 'field1 AND subfield.field2', 'timestamp_differ': {'diff': '${subfield.field2} - ${field1:%Y-%m-%d %H:%M:%S}', 'target_field': 'time_diff', 'merge_with_target': True}}
message:
{'field1': '2022-12-05 12:00:00', 'subfield': {'field2': '2022-12-05T11:38:42-02:00'}, 'time_diff': ['some content']}
processed:
{'field1': '2022-12-05 12:00:00', 'subfield': {'field2': '2022-12-05T11:38:42-02:00'}, 'time_diff': ['some content', '5922.0']}
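The three target-writing behaviours seen in the last examples (default write, `overwrite_target`, `merge_with_target`) can be sketched as one decision. The function name and error handling are illustrative, not the processor's API:

```python
def write_target(event: dict, target_field: str, value,
                 overwrite_target: bool = False,
                 merge_with_target: bool = False) -> dict:
    """Illustrative sketch of the documented target-field semantics."""
    existing = event.get(target_field)
    if existing is None or overwrite_target:
        event[target_field] = value          # default write / overwrite_target
    elif merge_with_target and isinstance(existing, list):
        existing.append(value)               # merge_with_target extends the list
    else:
        raise ValueError(f"target field '{target_field}' already exists")
    return event

write_target({'time_diff': ['some content']}, 'time_diff', '5922.0',
             merge_with_target=True)  # {'time_diff': ['some content', '5922.0']}
```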
Timestamp diff with integer field (unix epoch):
rule:
{'filter': 'field1 AND subfield.field2', 'timestamp_differ': {'diff': '${subfield.field2} - ${field1}', 'target_field': 'time_diff'}}
message:
{'field1': 1670234400, 'subfield': {'field2': '2022-12-05 12:00:00'}}
processed:
{'field1': 1670234400, 'subfield': {'field2': '2022-12-05 12:00:00'}, 'time_diff': '7200.0'}
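Integer source fields are read as Unix epoch seconds. The documented result can be reproduced with a small sketch; it assumes both the epoch value and the naive timestamp are interpreted as UTC:

```python
from datetime import datetime, timezone

# Epoch seconds -> aware datetime: 1670234400 is 2022-12-05 10:00:00 UTC.
dt1 = datetime.fromtimestamp(1670234400, tz=timezone.utc)
# Naive timestamp, assumed UTC for the subtraction.
dt2 = datetime.strptime('2022-12-05 12:00:00', '%Y-%m-%d %H:%M:%S')
dt2 = dt2.replace(tzinfo=timezone.utc)

print((dt2 - dt1).total_seconds())  # 7200.0, matching the example above
```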
Timestamp diff with difference in milliseconds, output in seconds:
rule:
{'filter': 'field1 AND subfield.field2', 'timestamp_differ': {'diff': '${subfield.field2} - ${field1}', 'target_field': 'time_diff'}}
message:
{'field1': '2022-12-05 12:00:00.200', 'subfield': {'field2': '2022-12-05 12:00:00.500'}}
processed:
{'field1': '2022-12-05 12:00:00.200', 'subfield': {'field2': '2022-12-05 12:00:00.500'}, 'time_diff': '0.3'}
Timestamp diff with difference in milliseconds, output in milliseconds:
rule:
{'filter': 'field1 AND subfield.field2', 'timestamp_differ': {'diff': '${subfield.field2} - ${field1}', 'target_field': 'time_diff', 'output_format': 'milliseconds'}}
message:
{'field1': '2022-12-05 12:00:00.200', 'subfield': {'field2': '2022-12-05 12:00:00.500'}}
processed:
{'field1': '2022-12-05 12:00:00.200', 'subfield': {'field2': '2022-12-05 12:00:00.500'}, 'time_diff': '300.0'}
Timestamp diff with difference in milliseconds, output in nanoseconds:
rule:
{'filter': 'field1 AND subfield.field2', 'timestamp_differ': {'diff': '${subfield.field2} - ${field1}', 'target_field': 'time_diff', 'output_format': 'nanoseconds'}}
message:
{'field1': '2022-12-05 12:00:00.200', 'subfield': {'field2': '2022-12-05 12:00:00.500'}}
processed:
{'field1': '2022-12-05 12:00:00.200', 'subfield': {'field2': '2022-12-05 12:00:00.500'}, 'time_diff': '300000000.0'}
Time difference between two timestamps with negative result:
rule:
{'filter': 'field1 AND field2', 'timestamp_differ': {'diff': '${field2} - ${field1}', 'target_field': 'time_diff'}}
message:
{'field2': '2022-12-09', 'field1': '2022-12-10'}
processed:
{'field2': '2022-12-09', 'field1': '2022-12-10', 'time_diff': '-86400.0'}
Time difference between two timestamps with visible second unit:
rule:
{'filter': 'field1 AND field2', 'timestamp_differ': {'diff': '${field2:%Y-%m-%d %H:%M:%S} - ${field1:%Y-%m-%d %H:%M:%S}', 'target_field': 'time_diff', 'output_format': 'seconds', 'show_unit': True}}
message:
{'field1': '2022-12-05 11:38:42', 'field2': '2022-12-05 12:00:00'}
processed:
{'field1': '2022-12-05 11:38:42', 'field2': '2022-12-05 12:00:00', 'time_diff': '1278.0 s'}
Time difference between two timestamps with visible millisecond unit:
rule:
{'filter': 'field1 AND field2', 'timestamp_differ': {'diff': '${field2:%Y-%m-%d %H:%M:%S} - ${field1:%Y-%m-%d %H:%M:%S}', 'target_field': 'time_diff', 'output_format': 'milliseconds', 'show_unit': True}}
message:
{'field1': '2022-12-05 11:38:42', 'field2': '2022-12-05 12:00:00'}
processed:
{'field1': '2022-12-05 11:38:42', 'field2': '2022-12-05 12:00:00', 'time_diff': '1278000.0 ms'}
Time difference between two timestamps with visible nanosecond unit:
rule:
{'filter': 'field1 AND field2', 'timestamp_differ': {'diff': '${field2:%Y-%m-%d %H:%M:%S} - ${field1:%Y-%m-%d %H:%M:%S}', 'target_field': 'time_diff', 'output_format': 'nanoseconds', 'show_unit': True}}
message:
{'field1': '2022-12-05 11:38:42', 'field2': '2022-12-05 12:00:00'}
processed:
{'field1': '2022-12-05 11:38:42', 'field2': '2022-12-05 12:00:00', 'time_diff': '1278000000000.0 ns'}
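The `show_unit` examples above combine the `output_format` scaling with a unit suffix. A sketch of that rendering, with illustrative names that mirror the documented option values rather than Logprep internals:

```python
# (scale factor, unit abbreviation) per documented output_format value.
FACTORS = {'seconds': (1, 's'),
           'milliseconds': (1_000, 'ms'),
           'nanoseconds': (1_000_000_000, 'ns')}

def render_diff(diff_seconds: float, output_format: str = 'seconds',
                show_unit: bool = False) -> str:
    factor, unit = FACTORS[output_format]
    value = str(diff_seconds * factor)
    # Append the unit abbreviation only when show_unit is set.
    return f'{value} {unit}' if show_unit else value

render_diff(1278.0, 'milliseconds', show_unit=True)  # '1278000.0 ms'
```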