diff options
author | Sarah Brofeldt <sarah@qtr.dk> | 2023-04-04 09:23:50 +0200 |
---|---|---|
committer | github-actions[bot] <github-actions[bot]@users.noreply.github.com> | 2023-11-23 16:35:54 +0000 |
commit | 2bce5b9c9aa8e7026820636e6b0bdd532c595a30 (patch) | |
tree | d67876d43d314e243fcac8d50f2b857c2682048f /nixos/modules | |
parent | ca3b90ebaf16cebdb25e1ce72212898d3aa5c2c9 (diff) |
nixos/apache-kafka: structured settings
- Use lazyAttrs (for config references) settings for main server.properties. - Drop dangerous default for "log.dirs" - Drop apache-kafka homedir; unused and confusing - Support formatting kraft logdirs (cherry picked from commit 45f84cdfd53c954a78b3c73717d213c291636c67)
Diffstat (limited to 'nixos/modules')
-rw-r--r-- | nixos/modules/services/misc/apache-kafka.nix | 184 |
1 files changed, 126 insertions, 58 deletions
diff --git a/nixos/modules/services/misc/apache-kafka.nix b/nixos/modules/services/misc/apache-kafka.nix index 598907aaf1c61..10ec3682f9421 100644 --- a/nixos/modules/services/misc/apache-kafka.nix +++ b/nixos/modules/services/misc/apache-kafka.nix @@ -5,75 +5,117 @@ with lib; let cfg = config.services.apache-kafka; - serverProperties = - if cfg.serverProperties != null then - cfg.serverProperties - else - '' - # Generated by nixos - broker.id=${toString cfg.brokerId} - port=${toString cfg.port} - host.name=${cfg.hostname} - log.dirs=${concatStringsSep "," cfg.logDirs} - zookeeper.connect=${cfg.zookeeper} - ${toString cfg.extraProperties} - ''; + # The `javaProperties` generator takes care of various escaping rules and + # generation of the properties file, but we'll handle stringly conversion + # ourselves in mkPropertySettings and stringlySettings, since we know more + # about the specifically allowed format eg. for lists of this type, and we + # don't want to coerce-downsample values to str too early by having the + # coercedTypes from javaProperties directly in our NixOS option types. + # + # Make sure every `freeformType` and any specific option type in `settings` is + # supported here. + + mkPropertyString = let + render = { + bool = boolToString; + int = toString; + list = concatMapStringsSep "," mkPropertyString; + string = id; + }; + in + v: render.${builtins.typeOf v} v; - serverConfig = pkgs.writeText "server.properties" serverProperties; - logConfig = pkgs.writeText "log4j.properties" cfg.log4jProperties; + stringlySettings = mapAttrs (_: mkPropertyString) + (filterAttrs (_: v: v != null) cfg.settings); + generator = (pkgs.formats.javaProperties {}).generate; in { options.services.apache-kafka = { - enable = mkOption { - description = lib.mdDoc "Whether to enable Apache Kafka."; - default = false; - type = types.bool; - }; - - brokerId = mkOption { - description = lib.mdDoc "Broker ID."; - default = -1; - type = types.int; - }; + enable = mkEnableOption (lib.mdDoc "Apache Kafka event streaming broker"); - port = mkOption { - description = lib.mdDoc "Port number the broker should listen on."; - default = 9092; - type = types.port; + settings = mkOption { + description = lib.mdDoc '' + [Kafka broker configuration](https://kafka.apache.org/documentation.html#brokerconfigs) + {file}`server.properties`. + + Note that .properties files contain mappings from string to string. + Keys with dots are NOT represented by nested attrs in these settings, + but instead as quoted strings (ie. `settings."broker.id"`, NOT + `settings.broker.id`). + ''; + type = types.submodule { + freeformType = with types; let + primitive = oneOf [bool int str]; + in lazyAttrsOf (nullOr (either primitive (listOf primitive))); + + options = { + "broker.id" = mkOption { + description = lib.mdDoc "Broker ID. -1 or null to auto-allocate in zookeeper mode."; + default = null; + type = with types; nullOr int; + }; + + "log.dirs" = mkOption { + description = lib.mdDoc "Log file directories."; + # Deliberaly leave out old default and use the rewrite opportunity + # to have users choose a safer value -- /tmp might be volatile and is a + # slightly scary default choice. + # default = [ "/tmp/apache-kafka" ]; + type = with types; listOf path; + }; + + "listeners" = mkOption { + description = lib.mdDoc '' + Kafka Listener List. + See [listeners](https://kafka.apache.org/documentation/#brokerconfigs_listeners). + ''; + type = types.listOf types.str; + default = [ "PLAINTEXT://localhost:9092" ]; + }; + }; + }; }; - hostname = mkOption { - description = lib.mdDoc "Hostname the broker should bind to."; - default = "localhost"; - type = types.str; + clusterId = mkOption { + description = lib.mdDoc '' + KRaft mode ClusterId used for formatting log directories. Can be generated with `kafka-storage.sh random-uuid` + ''; + type = with types; nullOr str; + default = null; }; - logDirs = mkOption { - description = lib.mdDoc "Log file directories"; - default = [ "/tmp/kafka-logs" ]; - type = types.listOf types.path; + configFiles.serverProperties = mkOption { + description = lib.mdDoc '' + Kafka server.properties configuration file path. + Defaults to the rendered `settings`. + ''; + type = types.path; }; - zookeeper = mkOption { - description = lib.mdDoc "Zookeeper connection string"; - default = "localhost:2181"; - type = types.str; + configFiles.log4jProperties = mkOption { + description = lib.mdDoc "Kafka log4j property configuration file path"; + type = types.path; + default = pkgs.writeText "log4j.properties" cfg.log4jProperties; + defaultText = ''pkgs.writeText "log4j.properties" cfg.log4jProperties''; }; - extraProperties = mkOption { - description = lib.mdDoc "Extra properties for server.properties."; - type = types.nullOr types.lines; - default = null; + formatLogDirs = mkOption { + description = lib.mdDoc '' + Whether to format log dirs in KRaft mode if all log dirs are + unformatted, ie. they contain no meta.properties. + ''; + type = types.bool; + default = false; }; - serverProperties = mkOption { + formatLogDirsIgnoreFormatted = mkOption { description = lib.mdDoc '' - Complete server.properties content. Other server.properties config - options will be ignored if this option is used. + Whether to ignore already formatted log dirs when formatting log dirs, + instead of failing. Useful when replacing or adding disks. ''; - type = types.nullOr types.lines; - default = null; + type = types.bool; + default = false; }; log4jProperties = mkOption { @@ -112,35 +154,61 @@ in { defaultText = literalExpression "pkgs.apacheKafka.passthru.jre"; type = types.package; }; - }; - config = mkIf cfg.enable { + imports = [ + (mkRenamedOptionModule + [ "services" "apache-kafka" "brokerId" ] + [ "services" "apache-kafka" "settings" ''broker.id'' ]) + (mkRenamedOptionModule + [ "services" "apache-kafka" "logDirs" ] + [ "services" "apache-kafka" "settings" ''log.dirs'' ]) + (mkRenamedOptionModule + [ "services" "apache-kafka" "zookeeper" ] + [ "services" "apache-kafka" "settings" ''zookeeper.connect'' ]) + + (mkRemovedOptionModule [ "services" "apache-kafka" "port" ] + "Please see services.apache-kafka.settings.listeners and its documentation instead") + (mkRemovedOptionModule [ "services" "apache-kafka" "hostname" ] + "Please see services.apache-kafka.settings.listeners and its documentation instead") + (mkRemovedOptionModule [ "services" "apache-kafka" "extraProperties" ] + "Please see services.apache-kafka.settings and its documentation instead") + (mkRemovedOptionModule [ "services" "apache-kafka" "serverProperties" ] + "Please see services.apache-kafka.settings and its documentation instead") + ]; - environment.systemPackages = [cfg.package]; + config = mkIf cfg.enable { + services.apache-kafka.configFiles.serverProperties = generator "server.properties" stringlySettings; users.users.apache-kafka = { isSystemUser = true; group = "apache-kafka"; description = "Apache Kafka daemon user"; - home = head cfg.logDirs; }; users.groups.apache-kafka = {}; - systemd.tmpfiles.rules = map (logDir: "d '${logDir}' 0700 apache-kafka - - -") cfg.logDirs; + systemd.tmpfiles.rules = map (logDir: "d '${logDir}' 0700 apache-kafka - - -") cfg.settings."log.dirs"; systemd.services.apache-kafka = { description = "Apache Kafka Daemon"; wantedBy = [ "multi-user.target" ]; after = [ "network.target" ]; + preStart = mkIf cfg.formatLogDirs + (if cfg.formatLogDirsIgnoreFormatted then '' + ${cfg.package}/bin/kafka-storage.sh format -t "${cfg.clusterId}" -c ${cfg.configFiles.serverProperties} --ignore-formatted + '' else '' + if ${concatMapStringsSep " && " (l: ''[ ! -f "${l}/meta.properties" ]'') cfg.settings."log.dirs"}; then + ${cfg.package}/bin/kafka-storage.sh format -t "${cfg.clusterId}" -c ${cfg.configFiles.serverProperties} + fi + ''); serviceConfig = { ExecStart = '' ${cfg.jre}/bin/java \ -cp "${cfg.package}/libs/*" \ - -Dlog4j.configuration=file:${logConfig} \ + -Dlog4j.configuration=file:${cfg.configFiles.log4jProperties} \ ${toString cfg.jvmOptions} \ kafka.Kafka \ - ${serverConfig} + ${cfg.configFiles.serverProperties} ''; User = "apache-kafka"; SuccessExitStatus = "0 143"; |