Input
Security Best Practice - Input Connectors
It is advised to only use the ConfluentKafkaInput, HttpConnector or
FileInput as input connectors in production environments.
The connectors DummyInput, JsonInput and JsonlInput are mainly designed
for debugging purposes.
Furthermore, it is suggested to enable the HMAC preprocessor so that tampering with
processed events can be detected.
hmac:
    target: <RAW_MSG>
    key: <SECRET>
    output_field: HMAC
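To illustrate what such a signature provides, the following sketch computes and verifies an HMAC over a raw message with Python's standard library. The SHA-256 digest, the hex encoding and the helper names `sign` and `verify` are assumptions made for illustration; they are not taken from Logprep's implementation.

```python
import hashlib
import hmac


def sign(raw_msg: bytes, key: bytes) -> str:
    """Return a hex-encoded HMAC of the raw message (SHA-256 is an assumption)."""
    return hmac.new(key, raw_msg, hashlib.sha256).hexdigest()


def verify(raw_msg: bytes, key: bytes, expected: str) -> bool:
    """Compare the recomputed HMAC with the stored one in constant time."""
    return hmac.compare_digest(sign(raw_msg, key), expected)


if __name__ == "__main__":
    key = b"<SECRET>"  # placeholder, as in the configuration above
    tag = sign(b'{"message": "original event"}', key)
    print(verify(b'{"message": "original event"}', key, tag))
    print(verify(b'{"message": "tampered event"}', key, tag))
```

A consumer that knows the key can recompute the value and detect any change to the raw message.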
ConfluentKafkaInput
Logprep uses the confluent-kafka Python client library to communicate with Kafka clusters. Important documentation sources are:
Example
input:
    mykafkainput:
        type: confluentkafka_input
        topic: consumer
        kafka_config:
            bootstrap.servers: "127.0.0.1:9092,127.0.0.1:9093"
            group.id: "cgroup"
            enable.auto.commit: "true"
            session.timeout.ms: "6000"
            auto.offset.reset: "earliest"
- class logprep.connector.confluent_kafka.input.ConfluentKafkaInput.Config
Kafka input connector specific configurations
- topic: str
The topic from which new log messages will be fetched.
- kafka_config: MappingProxyType
Kafka configuration for the kafka client. At minimum the following keys must be set:
bootstrap.servers (STRING): a comma separated list of kafka brokers
group.id (STRING): a unique identifier for the consumer group
The following keys are injected by the connector and should not be set:
enable.auto.offset.store is set to false
enable.auto.commit is set to true
For additional configuration options see the official librdkafka configuration.
DEFAULTS:
enable.auto.offset.store: false
enable.auto.commit: true
client.id: <<hostname>>
auto.offset.reset: earliest
session.timeout.ms: 6000
statistics.interval.ms: 30000
Security Best Practice - Kafka Input Consumer Authentication and Encryption
Kafka authentication is a critical aspect of securing your data pipeline. Ensure that you have the following configurations in place:
Use SSL/mTLS encryption for data in transit.
Configure SASL or mTLS authentication for your Kafka clients.
Regularly rotate your Kafka credentials and secrets.
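The rules for `kafka_config` described above (required keys, injected keys, defaults) can be pictured with a small sketch. The function name `build_kafka_config` and the precedence order (defaults, then user settings, then injected values) are assumptions for illustration, not Logprep's actual code.

```python
import socket

# Defaults as listed above; client.id falls back to the hostname.
DEFAULTS = {
    "enable.auto.offset.store": "false",
    "enable.auto.commit": "true",
    "client.id": socket.gethostname(),
    "auto.offset.reset": "earliest",
    "session.timeout.ms": "6000",
    "statistics.interval.ms": "30000",
}
REQUIRED = ("bootstrap.servers", "group.id")
# Keys the connector sets itself, regardless of user input.
INJECTED = {"enable.auto.offset.store": "false", "enable.auto.commit": "true"}


def build_kafka_config(user_config: dict) -> dict:
    """Validate required keys and merge defaults, user values and injected values."""
    missing = [key for key in REQUIRED if key not in user_config]
    if missing:
        raise ValueError(f"missing required keys: {', '.join(missing)}")
    # Later mappings win: defaults < user settings < injected values (assumed order).
    return {**DEFAULTS, **user_config, **INJECTED}


if __name__ == "__main__":
    config = build_kafka_config(
        {"bootstrap.servers": "127.0.0.1:9092,127.0.0.1:9093", "group.id": "cgroup"}
    )
    print(config["auto.offset.reset"])
```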
- preprocessing: PreprocessingConfig
See PreprocessingConfig for more details.
- type: str
Type of the component
- health_timeout: float
Timeout in seconds for the health check. Default is 1 second.
DummyInput
A dummy input that returns the documents it was initialized with.
If a “document” is derived from Exception, that exception will be thrown instead of returning a document. The exception will be removed and subsequent calls may return documents or throw other exceptions in the given order.
Example
input:
    mydummyinput:
        type: dummy_input
        documents: [{"document":"one"}, "Exception", {"document":"two"}]
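The behaviour described above, where exceptions in the document list are raised instead of returned, can be sketched as follows. The class `DummyInputSketch` is hypothetical, and returning `None` once the list is exhausted is an assumption made to keep the example short.

```python
class DummyInputSketch:
    """Hypothetical stand-in that hands out documents in the given order."""

    def __init__(self, documents):
        # Work on a copy so the caller's list is not modified.
        self._queue = list(documents)

    def get_next(self):
        if not self._queue:
            return None  # assumption: nothing left to return
        item = self._queue.pop(0)
        # An exception class is instantiated and raised.
        if isinstance(item, type) and issubclass(item, Exception):
            raise item()
        # An exception instance is raised as is.
        if isinstance(item, Exception):
            raise item
        return item


if __name__ == "__main__":
    source = DummyInputSketch([{"document": "one"}, ValueError, {"document": "two"}])
    print(source.get_next())
    try:
        source.get_next()
    except ValueError:
        print("second call raised ValueError")
    print(source.get_next())
```

Because the exception entry is removed when it is raised, the third call returns the next document.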
- class logprep.connector.dummy.input.DummyInput.Config
DummyInput specific configuration
- documents: list[dict | type | Exception]
A list of documents that should be returned.
- repeat_documents: bool
If set to true, then the given input documents will be repeated after the last one is reached. Default: False
- preprocessing: PreprocessingConfig
See PreprocessingConfig for more details.
- type: str
Type of the component
- health_timeout: float
Timeout in seconds for the health check. Default is 1 second.
HTTPInput
An HTTP input connector that spawns a uvicorn server, accepts HTTP requests, parses them,
puts them into an internal queue and pops them via the get_next method.
HTTP Connector Config Example
An example config file would look like:
input:
    myhttpinput:
        type: http_input
        message_backlog_size: 15000
        collect_meta: False
        metafield_name: "@metadata"
        original_event_field:
            "target_field": "event.original"
            "format": "dict"
        uvicorn_config:
            host: 0.0.0.0
            port: 9000
        endpoints:
            /firstendpoint: json
            /second*: plaintext
            /(third|fourth)/endpoint: jsonl
The endpoint config supports regex and wildcard patterns:
  - /second*: matches everything after the asterisk
  - /(third|fourth)/endpoint: matches either third or fourth in the first part
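How such patterns could be resolved to an endpoint type is sketched below. The helpers `compile_route` and `match_endpoint` are hypothetical, and treating a bare `*` as the regular expression `.*` is an assumption about the wildcard semantics, not a description of the connector's internals.

```python
import re


def compile_route(pattern: str) -> re.Pattern:
    """Turn an endpoint pattern into an anchored regular expression."""
    # Assumption: a bare "*" is a wildcard; an explicit ".*" is left untouched.
    regex = re.sub(r"(?<!\.)\*", ".*", pattern)
    return re.compile(f"^{regex}$")


def match_endpoint(path: str, endpoints: dict):
    """Return the endpoint type of the first pattern matching the path."""
    for pattern, endpoint_type in endpoints.items():
        if compile_route(pattern).match(path):
            return endpoint_type
    return None


if __name__ == "__main__":
    endpoints = {
        "/firstendpoint": "json",
        "/second*": "plaintext",
        "/(third|fourth)/endpoint": "jsonl",
    }
    for path in ("/firstendpoint", "/second/anything", "/fourth/endpoint", "/fifth/endpoint"):
        print(path, match_endpoint(path, endpoints))
```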
The connector configuration includes an optional parameter called original_event_field. When set, the full event is stored as a string or dictionary in a specified field. The target field for this operation is set via the parameter target_field, and the format (string or dictionary) is specified with the format parameter.
Endpoint Credentials Config Example
By providing a credentials file in the environment variable LOGPREP_CREDENTIALS_FILE you can
add basic authentication for a specific endpoint. The format of this file looks like:
input:
    endpoints:
        /firstendpoint:
            username: user
            password_file: examples/exampledata/config/user_password.txt
        /second*:
            username: user
            password: secret_password
You can choose between a plain secret with the key password or a file-based secret
with the key password_file.
Security Best Practice - Http Input Connector - Authentication
When using basic auth with the http input connector the following points should be taken into account:
basic auth must only be used with strong passwords
basic auth must only be used with TLS encryption
avoid revealing your plaintext secrets in public repositories
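From the client side, the two credential variants above lead to the same basic auth header. The following sketch uses only the standard library; the helper names `resolve_password` and `basic_auth_header` are hypothetical, and stripping surrounding whitespace from the password file is an assumption.

```python
import base64
from pathlib import Path


def resolve_password(credentials: dict) -> str:
    """Return the secret from either the `password` or the `password_file` key."""
    if "password" in credentials:
        return credentials["password"]
    # Assumption: the file contains only the password, possibly with a trailing newline.
    return Path(credentials["password_file"]).read_text(encoding="utf-8").strip()


def basic_auth_header(username: str, password: str) -> dict:
    """Build the Authorization header for HTTP basic authentication."""
    token = base64.b64encode(f"{username}:{password}".encode("utf-8")).decode("ascii")
    return {"Authorization": f"Basic {token}"}


if __name__ == "__main__":
    entry = {"username": "user", "password": "secret_password"}
    print(basic_auth_header(entry["username"], resolve_password(entry)))
```

Since the header is only base64-encoded and not encrypted, this also shows why basic auth should be combined with TLS.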
Behaviour of HTTP Requests
GET:
Always responds with 200 (ignores configured basic auth)
Responds with 429 when the message queue is full
POST:
Responds with 200 on endpoints without basic auth
Responds with 401 on endpoints with basic auth (and with 200 when appropriate credentials are provided)
Responds with 429 when the message queue is full
ALL OTHER:
Responds with 405
- class logprep.connector.http.input.HttpInput.Config
Config for HTTPInput
- uvicorn_config: dict[str, str | int]
Configure uvicorn server. For possible settings see uvicorn settings page.
Security Best Practice - Uvicorn Webserver Configuration
In addition to the settings below, it is recommended to configure SSL on the metrics server endpoint (see https://www.uvicorn.org/settings/#https).
uvicorn_config:
    access_log: true
    server_header: false
    date_header: false
    workers: 2
- endpoints: dict[str, str]
Configure endpoint routes with a mapping of a path to an endpoint. Possible endpoints are: json, jsonl, plaintext. It is possible to use wildcards and regexps for pattern matching.
- class PlaintextHttpEndpoint
plaintext endpoint to get the body from the request and put it in the message field
- class JSONLHttpEndpoint
jsonl endpoint to get jsonl from the request
- class JSONHttpEndpoint
json endpoint to get json from the request
- message_backlog_size: int
Configures the maximum size of the input message queue for this connector. When the limit is reached, the server answers with 429 Too Many Requests. For reasonable throughput this should not be smaller than the default value of 15,000 messages.
- copy_headers_to_logs: set[str]
Defines what metadata should be collected from HTTP headers. Special cases:
  - remote_addr: gets the inbound client IP instead of a header
  - url: gets the requested URL from the HTTP request; technically not a header
Defaults: remote_addr, url, User-Agent
The output header names in events are stored as JSON strings and are transformed, for example from "User-Agent" to "user_agent".
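The name transformation mentioned above can be pictured with a one-line helper. The function `header_to_field_name` is hypothetical, and generalising the single documented example to "lower case, hyphens replaced by underscores" is an assumption.

```python
def header_to_field_name(header: str) -> str:
    """Map an HTTP header name to an event field name, e.g. User-Agent to user_agent."""
    # Assumption: lower-casing and replacing hyphens covers all header names.
    return header.lower().replace("-", "_")


if __name__ == "__main__":
    for name in ("User-Agent", "remote_addr", "url"):
        print(name, header_to_field_name(name))
```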
- collect_meta: bool
Deprecated: use copy_headers_to_logs instead. To turn off collecting metadata, set copy_headers_to_logs to an empty list ([]). Defines whether metadata should be collected:
  - True: collect metadata
  - False: do not collect metadata
Security Best Practice - Input Connector - HttpConnector
It is suggested to enable the collection of metadata (collect_meta: True) to ensure transparency of the incoming events.
- metafield_name: str
Defines the name of the key for the collected metadata fields. Logs a warning if the metadata field overwrites a preexisting field in the event.
- original_event_field: dict[str, str] | None
Optional config parameter that writes the full event to one single target field. The format can be specified with the parameter format. Possible values are str and dict, where dict is the default format. The target field can be specified with the parameter target_field.
- preprocessing: PreprocessingConfig
See PreprocessingConfig for more details.
- type: str
Type of the component
- health_timeout: float
Timeout in seconds for the health check. Default is 1 second.
JsonInput
A json input that returns the documents it was initialized with.
If a “document” is derived from Exception, that exception will be thrown instead of returning a document. The exception will be removed and subsequent calls may return documents or throw other exceptions in the given order.
Example
input:
    myjsoninput:
        type: json_input
        documents_path: path/to/a/document.json
        repeat_documents: true
- class logprep.connector.json.input.JsonInput.Config
JsonInput connector specific configuration
- documents_path: str
A path to a file in json format, which can also include multiple json dicts wrapped in a list.
- repeat_documents: bool
If set to true, then the given input documents will be repeated after the last one is reached. Default: False
- preprocessing: PreprocessingConfig
See PreprocessingConfig for more details.
- type: str
Type of the component
- health_timeout: float
Timeout in seconds for the health check. Default is 1 second.
JsonlInput
A json line input that returns the documents it was initialized with.
If a “document” is derived from Exception, that exception will be thrown instead of returning a document. The exception will be removed and subsequent calls may return documents or throw other exceptions in the given order.
Example
input:
    myjsonlinput:
        type: jsonl_input
        documents_path: path/to/a/document.jsonl
        repeat_documents: true
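What reading a json lines file with `repeat_documents` amounts to can be sketched as follows. The function `iter_jsonl` is hypothetical; skipping blank lines and cycling endlessly when repetition is enabled are assumptions for illustration.

```python
import itertools
import json
from typing import Iterable, Iterator


def iter_jsonl(lines: Iterable[str], repeat_documents: bool = False) -> Iterator[dict]:
    """Parse one JSON document per line and optionally repeat them forever."""
    # Blank lines are skipped (assumption).
    documents = [json.loads(line) for line in lines if line.strip()]
    if repeat_documents and documents:
        # Start over with the first document after the last one is reached.
        return itertools.cycle(documents)
    return iter(documents)


if __name__ == "__main__":
    lines = ['{"document": "one"}\n', '{"document": "two"}\n']
    repeated = iter_jsonl(lines, repeat_documents=True)
    for document in itertools.islice(repeated, 3):
        print(document)
```

An open file handle can be passed as `lines`, since iterating over it yields one line at a time.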
- class logprep.connector.jsonl.input.JsonlInput.Config
JsonlInput connector specific configuration
- documents_path: str
A path to a file in json format, which can also include multiple json dicts wrapped in a list.
- repeat_documents: bool
If set to true, then the given input documents will be repeated after the last one is reached. Default: False
- preprocessing: PreprocessingConfig
See PreprocessingConfig for more details.
- type: str
Type of the component
- health_timeout: float
Timeout in seconds for the health check. Default is 1 second.
FileInput
A generic line input that returns the documents it was initialized with. If a “document” is derived from Exception, that exception will be thrown instead of returning a document. The exception will be removed and subsequent calls may return documents or throw other exceptions in the given order.
Example
input:
    myfileinput:
        type: file_input
        logfile_path: path/to/a/document
        start: begin
        interval: 1
        watch_file: True
- class logprep.connector.file.input.FileInput.Config
FileInput connector specific configuration
- logfile_path: str
A path to a file in generic raw format, which can be in any string-based format. It needs to be parsed with the dissector or another processor.
- start: str
Defines the behaviour of the file monitor with the following options:
  - begin: starts to read from the beginning of the file
  - end: initially goes to the end of the file and waits for new content
- watch_file: bool
Defines the behaviour of the file monitor with the following options:
  - True: read the file as defined in the start parameter and monitor continuously for newly appended log lines or file changes
  - False: read the file as defined in the start parameter only once and exit afterwards
- interval: int
Defines the refresh interval, i.e. how often the file is checked for changes.
- preprocessing: PreprocessingConfig
See PreprocessingConfig for more details.
- type: str
Type of the component
- health_timeout: float
Timeout in seconds for the health check. Default is 1 second.
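The interplay of the start, watch_file and interval options of the FileInput can be pictured with a small tail-like generator. The function `follow_file` is hypothetical, the interval is assumed to be in seconds, and the `max_idle_polls` parameter exists only so that the example terminates; none of this is taken from the connector's implementation.

```python
import os
import time
from typing import Iterator, Optional


def follow_file(
    path: str,
    start: str = "begin",
    watch_file: bool = True,
    interval: float = 1.0,
    max_idle_polls: Optional[int] = None,
) -> Iterator[str]:
    """Yield lines from a file, optionally waiting for newly appended content."""
    with open(path, "r", encoding="utf-8") as handle:
        if start == "end":
            # Skip existing content and wait for new lines only.
            handle.seek(0, os.SEEK_END)
        idle_polls = 0
        while True:
            line = handle.readline()
            if line:
                idle_polls = 0
                yield line.rstrip("\n")
                continue
            if not watch_file:
                return  # read once and stop
            idle_polls += 1
            if max_idle_polls is not None and idle_polls >= max_idle_polls:
                return  # illustration only: stop after some empty polls
            time.sleep(interval)
```

With `watch_file=False` the generator ends at the current end of the file; with `watch_file=True` it keeps polling at the given interval.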