Input

Security Best Practice - Input Connectors

It is advised to only use the ConfluentKafkaInput, HttpConnector or FileInput as input connectors in production environments. The connectors DummyInput, JsonInput and JsonlInput are mainly designed for debugging purposes.

Furthermore, it is suggested to enable the HMAC preprocessor to ensure that processed events have not been tampered with.

hmac:
   target: <RAW_MSG>
   key: <SECRET>
   output_field: HMAC
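
To illustrate what such a preprocessor does conceptually, the following minimal sketch computes a keyed digest over a raw message with Python's standard hmac module. The function names compute_hmac and verify_hmac and the choice of SHA-256 are assumptions for illustration; Logprep's actual algorithm and output layout may differ.

```python
import hashlib
import hmac


def compute_hmac(raw_message: bytes, secret: bytes) -> str:
    """Return the hex digest of an HMAC over the raw message (SHA-256 assumed)."""
    return hmac.new(secret, raw_message, hashlib.sha256).hexdigest()


def verify_hmac(raw_message: bytes, secret: bytes, expected: str) -> bool:
    """Recompute the digest and compare in constant time to detect tampering."""
    return hmac.compare_digest(compute_hmac(raw_message, secret), expected)
```

A downstream consumer that knows the secret can recompute the digest over the stored raw message; any mismatch indicates that the message or the digest was changed after ingestion.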

ConfluentKafkaInput

Logprep uses the confluent-kafka Python client library to communicate with Kafka clusters. Important documentation sources are:

Example

input:
  mykafkainput:
    type: confluentkafka_input
    topic: consumer
    kafka_config:
      bootstrap.servers: "127.0.0.1:9092,127.0.0.1:9093"
      group.id: "cgroup"
      enable.auto.commit: "true"
      session.timeout.ms: "6000"
      auto.offset.reset: "earliest"

class logprep.connector.confluent_kafka.input.ConfluentKafkaInput.Config

Kafka input connector specific configurations

topic: str

The topic from which new log messages will be fetched.

kafka_config: MappingProxyType

Kafka configuration for the kafka client. At minimum the following keys must be set:

  • bootstrap.servers (STRING): a comma separated list of kafka brokers

  • group.id (STRING): a unique identifier for the consumer group

The following keys are injected by the connector and should not be set:

  • “enable.auto.offset.store” is set to “false”

  • “enable.auto.commit” is set to “true”

For additional configuration options see the official librdkafka configuration.

DEFAULTS:

  • enable.auto.offset.store: false

  • enable.auto.commit: true

  • client.id: <<hostname>>

  • auto.offset.reset: earliest

  • session.timeout.ms: 6000

  • statistics.interval.ms: 30000
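
How defaults, user settings and injected keys might combine can be sketched as a plain dictionary merge. The function effective_kafka_config, the constants and the merge order (defaults, then user settings, then injected keys) are assumptions made for illustration and are not taken from the connector's source.

```python
import socket
from types import MappingProxyType

# Values copied from the DEFAULTS list above.
DEFAULTS = {
    "enable.auto.offset.store": "false",
    "enable.auto.commit": "true",
    "client.id": socket.gethostname(),
    "auto.offset.reset": "earliest",
    "session.timeout.ms": "6000",
    "statistics.interval.ms": "30000",
}
# Keys the documentation lists as injected by the connector.
INJECTED = {"enable.auto.offset.store": "false", "enable.auto.commit": "true"}
REQUIRED = ("bootstrap.servers", "group.id")


def effective_kafka_config(user_config: dict) -> MappingProxyType:
    """Hypothetical merge: defaults < user settings < injected keys."""
    missing = [key for key in REQUIRED if key not in user_config]
    if missing:
        raise ValueError(f"missing required kafka_config keys: {missing}")
    # Later mappings win, so injected keys cannot be overridden by the user.
    return MappingProxyType({**DEFAULTS, **user_config, **INJECTED})
```

Returning a MappingProxyType mirrors the documented type of kafka_config and keeps the merged result read-only.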

Security Best Practice - Kafka Input Consumer Authentication and Encryption

Kafka authentication is a critical aspect of securing your data pipeline. Ensure that you have the following configurations in place:

  • Use SSL/mTLS encryption for data in transit.

  • Configure SASL or mTLS authentication for your Kafka clients.

  • Regularly rotate your Kafka credentials and secrets.
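
As a rough illustration of the points above, the following sketch shows two kafka_config fragments using librdkafka property names, one for mTLS and one for SASL over TLS, together with a small check. All paths and credentials are placeholders, the SASL mechanism is only an example, and the helper is_encrypted_and_authenticated is hypothetical and not part of Logprep.

```python
# mTLS: the client certificate both encrypts the connection and authenticates the client.
MTLS_SETTINGS = {
    "security.protocol": "SSL",
    "ssl.ca.location": "/path/to/ca.pem",
    "ssl.certificate.location": "/path/to/client.pem",
    "ssl.key.location": "/path/to/client.key",
}

# SASL over TLS: credentials authenticate, TLS encrypts.
SASL_SETTINGS = {
    "security.protocol": "SASL_SSL",
    "sasl.mechanism": "SCRAM-SHA-512",
    "sasl.username": "<USERNAME>",
    "sasl.password": "<SECRET>",
    "ssl.ca.location": "/path/to/ca.pem",
}


def is_encrypted_and_authenticated(kafka_config: dict) -> bool:
    """Hypothetical sanity check for a kafka_config mapping."""
    protocol = str(kafka_config.get("security.protocol", "PLAINTEXT")).upper()
    if protocol == "SASL_SSL":
        return "sasl.mechanism" in kafka_config
    if protocol == "SSL":
        # Without a client certificate and key, SSL only encrypts.
        return {"ssl.certificate.location", "ssl.key.location"} <= kafka_config.keys()
    return False
```

Such a check could run in a deployment pipeline to reject configurations that fall back to an unencrypted or unauthenticated connection.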

preprocessing: PreprocessingConfig

See PreprocessingConfig for more details.

type: str

Type of the component

health_timeout: float

Timeout in seconds for health check. Default is 1 second.

DummyInput

A dummy input that returns the documents it was initialized with.

If a “document” is derived from Exception, that exception will be thrown instead of returning a document. The exception will be removed and subsequent calls may return documents or throw other exceptions in the given order.

Example

input:
  mydummyinput:
    type: dummy_input
    documents: [{"document":"one"}, "Exception", {"document":"two"}]

class logprep.connector.dummy.input.DummyInput.Config

DummyInput specific configuration

documents: list[dict | type | Exception]

A list of documents that should be returned.

repeat_documents: bool

If set to true, then the given input documents will be repeated after the last one is reached. Default: False
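
The described behaviour (documents returned in order, exceptions raised instead of returned, optional repetition) can be sketched as follows. SketchDummyInput is a hypothetical stand-in, not Logprep's DummyInput, and returning None once the documents are exhausted is an assumption.

```python
class SketchDummyInput:
    """Hypothetical stand-in that replays a fixed list of documents."""

    def __init__(self, documents, repeat_documents=False):
        self._documents = list(documents)
        self._pending = list(documents)
        self._repeat = repeat_documents

    def get_next(self):
        if not self._pending and self._repeat:
            self._pending = list(self._documents)  # start over from the first document
        if not self._pending:
            return None  # nothing left to return (assumed behaviour)
        item = self._pending.pop(0)  # the item is consumed even if it is an exception
        is_exception_class = isinstance(item, type) and issubclass(item, BaseException)
        if isinstance(item, BaseException) or is_exception_class:
            raise item
        return item
```

Because the exception is removed before it is raised, the next call simply continues with the following entry in the list.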

preprocessing: PreprocessingConfig

See PreprocessingConfig for more details.

type: str

Type of the component

health_timeout: float

Timeout in seconds for health check. Default is 1 second.

HTTPInput

An HTTP input connector that spawns a uvicorn server, accepts HTTP requests, parses them, puts them into an internal queue and pops them via the get_next method.

HTTP Connector Config Example

An example config file would look like:

input:
  myhttpinput:
    type: http_input
    message_backlog_size: 15000
    collect_meta: False
    metafield_name: "@metadata"
    original_event_field:
      "target_field": "event.original"
      "format": "dict"
    uvicorn_config:
      host: 0.0.0.0
      port: 9000
    endpoints:
      /firstendpoint: json
      /second*: plaintext
      /(third|fourth)/endpoint: jsonl

The endpoint config supports regex and wildcard patterns:

  • /second*: matches everything after the asterisk

  • /(third|fourth)/endpoint: matches either third or fourth in the first part
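
A minimal sketch of how such patterns could be resolved with Python's re module is shown below. The helpers to_regex and resolve_endpoint are hypothetical, and treating a trailing asterisk as "anything from here on" is an assumption about the wildcard semantics rather than the connector's actual translation.

```python
import re
from typing import Optional

ENDPOINTS = {
    "/firstendpoint": "json",
    "/second*": "plaintext",
    "/(third|fourth)/endpoint": "jsonl",
}


def to_regex(pattern: str) -> "re.Pattern[str]":
    """Translate a configured route into a compiled regular expression."""
    # Assumption: a trailing "*" is a wildcard for the rest of the path.
    if pattern.endswith("*"):
        pattern = pattern[:-1] + ".*"
    return re.compile(pattern)


def resolve_endpoint(path: str) -> Optional[str]:
    """Return the endpoint type of the first route matching the whole path."""
    for pattern, endpoint_type in ENDPOINTS.items():
        if to_regex(pattern).fullmatch(path):
            return endpoint_type
    return None
```

With these assumptions, /second/anything resolves to plaintext and /fourth/endpoint to jsonl, while /fifth/endpoint matches no route.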

The connector configuration includes an optional parameter called original_event_field. When set, the full event is stored as a string or dictionary in a specified field. The target field is set via the parameter target_field, and the format (string or dictionary) is specified with the format parameter.
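
The effect of original_event_field can be pictured with the sketch below. The function wrap_original_event is hypothetical; it assumes that dots in target_field denote nested keys, that the format values are the literal strings str and dict, and that the resulting event contains only the target field.

```python
import json


def wrap_original_event(event: dict, target_field: str, fmt: str = "dict") -> dict:
    """Store the full event under target_field, as a dict or as a JSON string."""
    if fmt not in ("dict", "str"):
        raise ValueError("format must be 'dict' or 'str'")
    original = json.dumps(event) if fmt == "str" else event
    wrapped: dict = {}
    node = wrapped
    # Assumption: "event.original" means key "original" inside key "event".
    *parents, leaf = target_field.split(".")
    for key in parents:
        node = node.setdefault(key, {})
    node[leaf] = original
    return wrapped
```

Under these assumptions, the event {"message": "hi"} with target_field event.original becomes {"event": {"original": {"message": "hi"}}} in dict format, or carries the serialized JSON text in str format.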

Endpoint Credentials Config Example

By providing a credentials file in environment variable LOGPREP_CREDENTIALS_FILE you can add basic authentication for a specific endpoint. The format of this file would look like:

Example for credentials file

input:
  endpoints:
    /firstendpoint:
      username: user
      password_file: examples/exampledata/config/user_password.txt
    /second*:
      username: user
      password: secret_password

You can choose between a plain secret with the key password or a file-based secret with the key password_file.
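
Resolving the two secret variants could look like the following sketch. The function resolve_password is hypothetical, and both the precedence of password over password_file and the stripping of surrounding whitespace from the file content are assumptions.

```python
from pathlib import Path


def resolve_password(credentials: dict) -> str:
    """Return the secret for one endpoint entry of the credentials file."""
    if "password" in credentials:
        return credentials["password"]  # plain secret given inline
    if "password_file" in credentials:
        # File-based secret: read it and drop the trailing newline.
        return Path(credentials["password_file"]).read_text(encoding="utf-8").strip()
    raise ValueError("either 'password' or 'password_file' is required")
```

A file-based secret keeps the password out of the credentials file itself, which makes it easier to mount the secret separately and restrict its file permissions.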

Security Best Practice - Http Input Connector - Authentication

When using basic auth with the http input connector the following points should be taken into account:

  • basic auth must only be used with strong passwords

  • basic auth must only be used with TLS encryption

  • avoid revealing plaintext secrets in public repositories

Behaviour of HTTP Requests

  • GET:

    • Responds always with 200 (ignores configured Basic Auth)

    • When Messages Queue is full, it responds with 429

  • POST:

    • Responds with 200 on non-Basic Auth Endpoints

    • Responds with 401 on Basic Auth Endpoints (and 200 with appropriate credentials)

    • When Messages Queue is full, it responds with 429

  • ALL OTHER:

    • Responds with 405
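
The response rules listed above can be summarised as a small decision function. The function expected_status is a hypothetical model of the documented behaviour, not the connector's code, and checking credentials before the queue state is an assumption since the list does not state an order.

```python
def expected_status(
    method: str,
    queue_full: bool = False,
    basic_auth_configured: bool = False,
    credentials_valid: bool = False,
) -> int:
    """Return the HTTP status code the documented rules imply."""
    method = method.upper()
    if method not in ("GET", "POST"):
        return 405  # every other method is rejected
    # GET ignores configured Basic Auth; only POST is checked.
    if method == "POST" and basic_auth_configured and not credentials_valid:
        return 401
    if queue_full:
        return 429  # backlog limit reached
    return 200
```

Clients that receive 429 should back off and retry later, because the message was not queued.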

class logprep.connector.http.input.HttpInput.Config

Config for HTTPInput

uvicorn_config: dict[str, str | int]

Configure uvicorn server. For possible settings see uvicorn settings page.

Security Best Practice - Uvicorn Webserver Configuration

In addition to the settings below, it is recommended to configure SSL on the metrics server endpoint (see https://www.uvicorn.org/settings/#https).

uvicorn_config:
    access_log: true
    server_header: false
    date_header: false
    workers: 2

endpoints: dict[str, str]

Configure endpoint routes with a Mapping of a path to an endpoint. Possible endpoints are: json, jsonl, plaintext. It’s possible to use wildcards and regexps for pattern matching.

class PlaintextHttpEndpoint

plaintext endpoint to get the body from request and put it in message field

class JSONLHttpEndpoint

jsonl endpoint to get jsonl from request

class JSONHttpEndpoint

json endpoint to get json from request

message_backlog_size: int

Configures the maximum size of the input message queue for this connector. When the limit is reached, the server answers with 429 Too Many Requests. For reasonable throughput this should not be smaller than the default value of 15,000 messages.

copy_headers_to_logs: set[str]

Defines what metadata should be collected from HTTP headers. Special cases:

  • remote_addr: gets the inbound client IP instead of a header

  • url: gets the requested URL from the HTTP request (not technically a header)

Defaults:

  • remote_addr

  • url

  • User-Agent

The header names are stored in events as JSON strings and are transformed, for example from “User-Agent” to “user_agent”.
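
A sketch of the collection step, including the name transformation, is given below. The functions header_to_field_name and collect_metadata are hypothetical; lower-casing and replacing hyphens with underscores is inferred from the single “User-Agent” example and may not cover every case the connector handles.

```python
def header_to_field_name(header: str) -> str:
    """Turn a header name such as "User-Agent" into "user_agent"."""
    return header.lower().replace("-", "_")


def collect_metadata(headers, remote_addr, url,
                     copy_headers_to_logs=("remote_addr", "url", "User-Agent")):
    """Collect the configured headers plus the two special cases."""
    special = {"remote_addr": remote_addr, "url": url}
    # HTTP header names are case-insensitive, so compare in lower case.
    lowered = {name.lower(): value for name, value in headers.items()}
    metadata = {}
    for name in copy_headers_to_logs:
        if name in special:
            metadata[name] = special[name]  # taken from the request, not a header
        elif name.lower() in lowered:
            metadata[header_to_field_name(name)] = lowered[name.lower()]
    return metadata
```

Passing an empty collection as copy_headers_to_logs yields an empty result, which corresponds to turning metadata collection off.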

collect_meta: bool

Deprecated: use copy_headers_to_logs instead. To turn off collecting metadata, set copy_headers_to_logs to an empty list ([]). Defines if metadata should be collected:

  • True: collect metadata

  • False: do not collect metadata

Security Best Practice - Input Connector - HttpConnector

It is suggested to enable the collection of metadata (collect_meta: True) to ensure transparency of the incoming events.

metafield_name: str

Defines the name of the key for the collected metadata fields. Logs a Warning if metadata field overwrites preexisting field in Event

original_event_field: dict[str, str] | None

Optional config parameter that writes the full event to a single target field. The target field is specified with the parameter target_field and the format with the parameter format. Possible formats are str and dict; dict is the default.

preprocessing: PreprocessingConfig

See PreprocessingConfig for more details.

type: str

Type of the component

health_timeout: float

Timeout in seconds for health check. Default is 1 second.

JsonInput

A json input that returns the documents it was initialized with.

If a “document” is derived from Exception, that exception will be thrown instead of returning a document. The exception will be removed and subsequent calls may return documents or throw other exceptions in the given order.

Example

input:
  myjsoninput:
    type: json_input
    documents_path: path/to/a/document.json
    repeat_documents: true

class logprep.connector.json.input.JsonInput.Config

JsonInput connector specific configuration

documents_path: str

A path to a file in JSON format, which can also include multiple JSON dicts wrapped in a list.
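
Both accepted layouts, a single dict or a list of dicts, can be normalised to a list of documents as in this sketch. The function parse_json_documents is hypothetical and works on the file content as text to keep the example self-contained.

```python
import json


def parse_json_documents(text: str) -> list:
    """Return the documents of a JSON file as a list."""
    data = json.loads(text)
    # A single top-level dict is treated as a one-element list.
    return data if isinstance(data, list) else [data]
```

For example, the content [{"document": "one"}, {"document": "two"}] yields two documents, while {"document": "one"} yields one.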

repeat_documents: bool

If set to true, then the given input documents will be repeated after the last one is reached. Default: False

preprocessing: PreprocessingConfig

See PreprocessingConfig for more details.

type: str

Type of the component

health_timeout: float

Timeout in seconds for health check. Default is 1 second.

JsonlInput

A json line input that returns the documents it was initialized with.

If a “document” is derived from Exception, that exception will be thrown instead of returning a document. The exception will be removed and subsequent calls may return documents or throw other exceptions in the given order.

Example

input:
  myjsonlinput:
    type: jsonl_input
    documents_path: path/to/a/document.jsonl
    repeat_documents: true

class logprep.connector.jsonl.input.JsonlInput.Config

JsonlInput connector specific configuration

documents_path: str

A path to a file in JSON format, which can also include multiple JSON dicts wrapped in a list.
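
For a JSON Lines file such as the document.jsonl used in the example, where each line holds one JSON document, reading could be sketched as follows. The function parse_jsonl_documents is hypothetical, and skipping blank lines is an assumption.

```python
import json


def parse_jsonl_documents(text: str) -> list:
    """Parse one JSON document per non-empty line."""
    return [json.loads(line) for line in text.splitlines() if line.strip()]
```

Each line is parsed independently, so a malformed line raises json.JSONDecodeError for that line without the whole file having to be a single valid JSON value.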

repeat_documents: bool

If set to true, then the given input documents will be repeated after the last one is reached. Default: False

preprocessing: PreprocessingConfig

See PreprocessingConfig for more details.

type: str

Type of the component

health_timeout: float

Timeout in seconds for health check. Default is 1 second.

FileInput

A generic line input that returns the documents it was initialized with. If a “document” is derived from Exception, that exception will be thrown instead of returning a document. The exception will be removed and subsequent calls may return documents or throw other exceptions in the given order.

Example

input:
  myfileinput:
    type: file_input
    logfile_path: path/to/a/document
    start: begin
    interval: 1
    watch_file: True

class logprep.connector.file.input.FileInput.Config

FileInput connector specific configuration

logfile_path: str

A path to a file in generic raw format, which can be any string-based format. The content needs to be parsed with the dissector or another processor.

start: str

Defines the behaviour of the file monitor with the following options:

  • begin: starts reading from the beginning of the file

  • end: initially goes to the end of the file and waits for new content

watch_file: bool

Defines the behaviour of the file monitor with the following options:

  • True: reads the file as defined in the start parameter and continuously monitors it for newly appended log lines or file changes

  • False: reads the file as defined in the start parameter only once and exits afterwards

interval: int

Defines the refresh interval, how often the file is checked for changes
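
How start, watch_file and interval interact can be illustrated with a small tail-like reader. FileTail and follow are hypothetical helpers, not Logprep's implementation; the sketch assumes that interval is given in seconds and that only complete lines ending in a newline are returned.

```python
import os
import time


class FileTail:
    """Hypothetical helper that remembers how far a file has been read."""

    def __init__(self, path, start="begin"):
        if start not in ("begin", "end"):
            raise ValueError("start must be 'begin' or 'end'")
        self.path = path
        # "end" skips the existing content and only reports later additions.
        self.offset = 0 if start == "begin" else os.path.getsize(path)

    def poll(self):
        """Return the complete lines appended since the previous call."""
        lines = []
        with open(self.path, "rb") as handle:
            handle.seek(self.offset)
            for raw in handle:
                if not raw.endswith(b"\n"):
                    break  # unfinished line, pick it up on a later poll
                self.offset += len(raw)
                lines.append(raw.decode("utf-8").rstrip("\r\n"))
        return lines


def follow(path, start="begin", interval=1, watch_file=True):
    """Yield lines once (watch_file=False) or keep polling every interval seconds."""
    tail = FileTail(path, start)
    while True:
        yield from tail.poll()
        if not watch_file:
            return
        time.sleep(interval)
```

With watch_file set to False the generator ends after the first pass; with True it never ends and has to be stopped by the caller.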

preprocessing: PreprocessingConfig

See PreprocessingConfig for more details.

type: str

Type of the component

health_timeout: float

Timeout in seconds for health check. Default is 1 second.