about summary refs log tree commit diff
path: root/nixos
diff options
context:
space:
mode:
authorSarah Brofeldt <sarah@qtr.dk>2023-04-04 09:23:50 +0200
committergithub-actions[bot] <github-actions[bot]@users.noreply.github.com>2023-11-23 16:35:54 +0000
commit2bce5b9c9aa8e7026820636e6b0bdd532c595a30 (patch)
treed67876d43d314e243fcac8d50f2b857c2682048f /nixos
parentca3b90ebaf16cebdb25e1ce72212898d3aa5c2c9 (diff)
nixos/apache-kafka: structured settings
- Use lazyAttrs (for config references) settings for main server.properties.
- Drop dangerous default for "log.dirs"
- Drop apache-kafka homedir; unused and confusing
- Support formatting kraft logdirs

(cherry picked from commit 45f84cdfd53c954a78b3c73717d213c291636c67)
Diffstat (limited to 'nixos')
-rw-r--r--nixos/modules/services/misc/apache-kafka.nix184
-rw-r--r--nixos/tests/kafka.nix12
2 files changed, 133 insertions, 63 deletions
diff --git a/nixos/modules/services/misc/apache-kafka.nix b/nixos/modules/services/misc/apache-kafka.nix
index 598907aaf1c61..10ec3682f9421 100644
--- a/nixos/modules/services/misc/apache-kafka.nix
+++ b/nixos/modules/services/misc/apache-kafka.nix
@@ -5,75 +5,117 @@ with lib;
 let
   cfg = config.services.apache-kafka;
 
-  serverProperties =
-    if cfg.serverProperties != null then
-      cfg.serverProperties
-    else
-      ''
-        # Generated by nixos
-        broker.id=${toString cfg.brokerId}
-        port=${toString cfg.port}
-        host.name=${cfg.hostname}
-        log.dirs=${concatStringsSep "," cfg.logDirs}
-        zookeeper.connect=${cfg.zookeeper}
-        ${toString cfg.extraProperties}
-      '';
+  # The `javaProperties` generator takes care of various escaping rules and
+  # generation of the properties file, but we'll handle stringly conversion
+  # ourselves in mkPropertySettings and stringlySettings, since we know more
+  # about the specifically allowed format eg. for lists of this type, and we
+  # don't want to coerce-downsample values to str too early by having the
+  # coercedTypes from javaProperties directly in our NixOS option types.
+  #
+  # Make sure every `freeformType` and any specific option type in `settings` is
+  # supported here.
+
+  mkPropertyString = let
+    render = {
+      bool = boolToString;
+      int = toString;
+      list = concatMapStringsSep "," mkPropertyString;
+      string = id;
+    };
+  in
+    v: render.${builtins.typeOf v} v;
 
-  serverConfig = pkgs.writeText "server.properties" serverProperties;
-  logConfig = pkgs.writeText "log4j.properties" cfg.log4jProperties;
+  stringlySettings = mapAttrs (_: mkPropertyString)
+    (filterAttrs (_: v:  v != null) cfg.settings);
 
+  generator = (pkgs.formats.javaProperties {}).generate;
 in {
 
   options.services.apache-kafka = {
-    enable = mkOption {
-      description = lib.mdDoc "Whether to enable Apache Kafka.";
-      default = false;
-      type = types.bool;
-    };
-
-    brokerId = mkOption {
-      description = lib.mdDoc "Broker ID.";
-      default = -1;
-      type = types.int;
-    };
+    enable = mkEnableOption (lib.mdDoc "Apache Kafka event streaming broker");
 
-    port = mkOption {
-      description = lib.mdDoc "Port number the broker should listen on.";
-      default = 9092;
-      type = types.port;
+    settings = mkOption {
+      description = lib.mdDoc ''
+        [Kafka broker configuration](https://kafka.apache.org/documentation.html#brokerconfigs)
+        {file}`server.properties`.
+
+        Note that .properties files contain mappings from string to string.
+        Keys with dots are NOT represented by nested attrs in these settings,
+        but instead as quoted strings (ie. `settings."broker.id"`, NOT
+        `settings.broker.id`).
+     '';
+      type = types.submodule {
+        freeformType = with types; let
+          primitive = oneOf [bool int str];
+        in lazyAttrsOf (nullOr (either primitive (listOf primitive)));
+
+        options = {
+          "broker.id" = mkOption {
+            description = lib.mdDoc "Broker ID. -1 or null to auto-allocate in zookeeper mode.";
+            default = null;
+            type = with types; nullOr int;
+          };
+
+          "log.dirs" = mkOption {
+            description = lib.mdDoc "Log file directories.";
+            # Deliberaly leave out old default and use the rewrite opportunity
+            # to have users choose a safer value -- /tmp might be volatile and is a
+            # slightly scary default choice.
+            # default = [ "/tmp/apache-kafka" ];
+            type = with types; listOf path;
+          };
+
+          "listeners" = mkOption {
+            description = lib.mdDoc ''
+              Kafka Listener List.
+              See [listeners](https://kafka.apache.org/documentation/#brokerconfigs_listeners).
+            '';
+            type = types.listOf types.str;
+            default = [ "PLAINTEXT://localhost:9092" ];
+          };
+        };
+      };
     };
 
-    hostname = mkOption {
-      description = lib.mdDoc "Hostname the broker should bind to.";
-      default = "localhost";
-      type = types.str;
+    clusterId = mkOption {
+      description = lib.mdDoc ''
+        KRaft mode ClusterId used for formatting log directories. Can be generated with `kafka-storage.sh random-uuid`
+      '';
+      type = with types; nullOr str;
+      default = null;
     };
 
-    logDirs = mkOption {
-      description = lib.mdDoc "Log file directories";
-      default = [ "/tmp/kafka-logs" ];
-      type = types.listOf types.path;
+    configFiles.serverProperties = mkOption {
+      description = lib.mdDoc ''
+        Kafka server.properties configuration file path.
+        Defaults to the rendered `settings`.
+      '';
+      type = types.path;
     };
 
-    zookeeper = mkOption {
-      description = lib.mdDoc "Zookeeper connection string";
-      default = "localhost:2181";
-      type = types.str;
+    configFiles.log4jProperties = mkOption {
+      description = lib.mdDoc "Kafka log4j property configuration file path";
+      type = types.path;
+      default = pkgs.writeText "log4j.properties" cfg.log4jProperties;
+      defaultText = ''pkgs.writeText "log4j.properties" cfg.log4jProperties'';
     };
 
-    extraProperties = mkOption {
-      description = lib.mdDoc "Extra properties for server.properties.";
-      type = types.nullOr types.lines;
-      default = null;
+    formatLogDirs = mkOption {
+      description = lib.mdDoc ''
+        Whether to format log dirs in KRaft mode if all log dirs are
+        unformatted, ie. they contain no meta.properties.
+      '';
+      type = types.bool;
+      default = false;
     };
 
-    serverProperties = mkOption {
+    formatLogDirsIgnoreFormatted = mkOption {
       description = lib.mdDoc ''
-        Complete server.properties content. Other server.properties config
-        options will be ignored if this option is used.
+        Whether to ignore already formatted log dirs when formatting log dirs,
+        instead of failing. Useful when replacing or adding disks.
       '';
-      type = types.nullOr types.lines;
-      default = null;
+      type = types.bool;
+      default = false;
     };
 
     log4jProperties = mkOption {
@@ -112,35 +154,61 @@ in {
       defaultText = literalExpression "pkgs.apacheKafka.passthru.jre";
       type = types.package;
     };
-
   };
 
-  config = mkIf cfg.enable {
+  imports = [
+    (mkRenamedOptionModule
+      [ "services" "apache-kafka" "brokerId" ]
+      [ "services" "apache-kafka" "settings" ''broker.id'' ])
+    (mkRenamedOptionModule
+      [ "services" "apache-kafka" "logDirs" ]
+      [ "services" "apache-kafka" "settings" ''log.dirs'' ])
+    (mkRenamedOptionModule
+      [ "services" "apache-kafka" "zookeeper" ]
+      [ "services" "apache-kafka" "settings" ''zookeeper.connect'' ])
+
+    (mkRemovedOptionModule [ "services" "apache-kafka" "port" ]
+      "Please see services.apache-kafka.settings.listeners and its documentation instead")
+    (mkRemovedOptionModule [ "services" "apache-kafka" "hostname" ]
+      "Please see services.apache-kafka.settings.listeners and its documentation instead")
+    (mkRemovedOptionModule [ "services" "apache-kafka" "extraProperties" ]
+      "Please see services.apache-kafka.settings and its documentation instead")
+    (mkRemovedOptionModule [ "services" "apache-kafka" "serverProperties" ]
+      "Please see services.apache-kafka.settings and its documentation instead")
+  ];
 
-    environment.systemPackages = [cfg.package];
+  config = mkIf cfg.enable {
+    services.apache-kafka.configFiles.serverProperties = generator "server.properties" stringlySettings;
 
     users.users.apache-kafka = {
       isSystemUser = true;
       group = "apache-kafka";
       description = "Apache Kafka daemon user";
-      home = head cfg.logDirs;
     };
     users.groups.apache-kafka = {};
 
-    systemd.tmpfiles.rules = map (logDir: "d '${logDir}' 0700 apache-kafka - - -") cfg.logDirs;
+    systemd.tmpfiles.rules = map (logDir: "d '${logDir}' 0700 apache-kafka - - -") cfg.settings."log.dirs";
 
     systemd.services.apache-kafka = {
       description = "Apache Kafka Daemon";
       wantedBy = [ "multi-user.target" ];
       after = [ "network.target" ];
+      preStart = mkIf cfg.formatLogDirs
+        (if cfg.formatLogDirsIgnoreFormatted then ''
+          ${cfg.package}/bin/kafka-storage.sh format -t "${cfg.clusterId}" -c ${cfg.configFiles.serverProperties} --ignore-formatted
+        '' else ''
+          if ${concatMapStringsSep " && " (l: ''[ ! -f "${l}/meta.properties" ]'') cfg.settings."log.dirs"}; then
+            ${cfg.package}/bin/kafka-storage.sh format -t "${cfg.clusterId}" -c ${cfg.configFiles.serverProperties}
+          fi
+        '');
       serviceConfig = {
         ExecStart = ''
           ${cfg.jre}/bin/java \
             -cp "${cfg.package}/libs/*" \
-            -Dlog4j.configuration=file:${logConfig} \
+            -Dlog4j.configuration=file:${cfg.configFiles.log4jProperties} \
             ${toString cfg.jvmOptions} \
             kafka.Kafka \
-            ${serverConfig}
+            ${cfg.configFiles.serverProperties}
         '';
         User = "apache-kafka";
         SuccessExitStatus = "0 143";
diff --git a/nixos/tests/kafka.nix b/nixos/tests/kafka.nix
index 864253fd8c73b..8635cbd3ff632 100644
--- a/nixos/tests/kafka.nix
+++ b/nixos/tests/kafka.nix
@@ -23,12 +23,14 @@ let
       kafka = { ... }: {
         services.apache-kafka = {
           enable = true;
-          extraProperties = ''
-            offsets.topic.replication.factor = 1
-            zookeeper.session.timeout.ms = 600000
-          '';
+          settings = {
+            "offsets.topic.replication.factor" = 1;
+            "zookeeper.session.timeout.ms" = 600000;
+            "zookeeper.connect" = [ "zookeeper1:2181" ];
+            "log.dirs" = [ "/tmp/apache-kafka" ];
+          };
+
           package = kafkaPackage;
-          zookeeper = "zookeeper1:2181";
         };
 
         networking.firewall.allowedTCPPorts = [ 9092 ];