feat: support Kafka protocol for functions #843
Pull request overview
This PR adds Kafka (KoP) protocol support for Function resources (targeting the generic runtime), including CRD schema updates, controller-side config generation, webhook validation, and integration coverage to verify Kafka IO end-to-end in CI.
Changes:
- Add Kafka messaging configuration to the API/CRDs and generate Kafka runtime config (_kafka_config) for generic-runtime functions.
- Extend admission webhook validation to handle Kafka for Functions and explicitly reject Kafka messaging for Source/Sink resources.
- Add unit + e2e/integration tests and adjust CI/e2e cluster settings to enable KoP and improve test stability.
Reviewed changes
Copilot reviewed 37 out of 38 changed files in this pull request and generated 2 comments.
| File | Description |
|---|---|
| Dockerfile | Adjust Go build flags for the manager image build. |
| operator.Dockerfile | Adjust Go build flags for the operator image build. |
| redhat.Dockerfile | Adjust Go build flags for the Red Hat-certified build. |
| api/compute/v1alpha1/common.go | Introduce Kafka messaging types and add kafka under Messaging (inlined into specs). |
| api/compute/v1alpha1/zz_generated.deepcopy.go | Add generated deepcopy implementations for Kafka-related API types. |
| controllers/spec/common.go | Extend generic runtime command building to support messaging service type + client auth args. |
| controllers/spec/function.go | Wire Kafka-specific auth/env/args into generic runtime execution path; guard Pulsar-only wiring for Kafka functions. |
| controllers/spec/function_test.go | Add test asserting generic runtime command/mount behavior when Kafka messaging is configured. |
| controllers/spec/utils.go | Generate _kafka_config into user config and build Kafka input/output spec maps. |
| controllers/spec/utils_test.go | Add test validating Kafka config is present/serialized correctly in function details. |
| pkg/webhook/validate.go | Add Kafka-specific validation helpers and Function messaging validation branch for Kafka. |
| pkg/webhook/function_webhook.go | Use Kafka-specific input/output validation and validate Kafka+runtime compatibility on create. |
| pkg/webhook/source_webhook.go | Reject Kafka messaging for Source resources (create/update). |
| pkg/webhook/sink_webhook.go | Reject Kafka messaging for Sink resources (create/update). |
| pkg/webhook/validate_test.go | Add unit tests for Kafka-related validation helpers. |
| pkg/webhook/kafka_webhook_test.go | Add webhook update tests ensuring Source/Sink reject Kafka messaging. |
| config/samples/compute_v1alpha1_function_crypto.yaml | Add explicit schema type in sample source spec. |
| config/crd/bases/compute.functionmesh.io_sources.yaml | Regenerate CRD schema (controller-gen v0.16.5) including Kafka fields. |
| config/crd/bases/compute.functionmesh.io_sinks.yaml | Regenerate CRD schema (controller-gen v0.16.5) including Kafka fields. |
| config/crd/bases/compute.functionmesh.io_functions.yaml | Regenerate CRD schema (controller-gen v0.16.5) including Kafka fields. |
| config/crd/bases/compute.functionmesh.io_functionmeshes.yaml | Regenerate CRD schema (controller-gen v0.16.5) including Kafka fields. |
| config/crd/bases/compute.functionmesh.io_backendconfigs.yaml | Regenerate CRD schema (controller-gen v0.16.5). |
| charts/function-mesh-operator/charts/admission-webhook/templates/crd-compute.functionmesh.io-sources.yaml | Regenerate Helm CRD template for Sources (controller-gen v0.16.5 output). |
| charts/function-mesh-operator/charts/admission-webhook/templates/crd-compute.functionmesh.io-sinks.yaml | Regenerate Helm CRD template for Sinks (controller-gen v0.16.5 output). |
| charts/function-mesh-operator/charts/admission-webhook/templates/crd-compute.functionmesh.io-functions.yaml | Regenerate Helm CRD template for Functions (controller-gen v0.16.5 output). |
| charts/function-mesh-operator/charts/admission-webhook/templates/crd-compute.functionmesh.io-functionmeshes.yaml | Regenerate Helm CRD template for FunctionMeshes (controller-gen v0.16.5 output). |
| charts/function-mesh-operator/charts/admission-webhook/templates/crd-compute.functionmesh.io-backendconfigs.yaml | Regenerate Helm CRD template for BackendConfigs (controller-gen v0.16.5 output). |
| .github/workflows/test-integration-skywalking-e2e.yml | Adjust runner tool cache usage and report disk usage for troubleshooting. |
| .github/actions/ssh-access/action.yml | Update upterm install/version and simplify SSH/upterm session handling. |
| .ci/tests/integration/e2e.yaml | Add a new e2e case for a generic Kafka function. |
| .ci/tests/integration/cases/generic-kafka-function/verify.sh | New e2e verifier that drives Kafka IO via a Kafka client pod. |
| .ci/tests/integration/cases/generic-kafka-function/manifests.yaml | New sample Function manifest configured for Kafka + generic runtime. |
| .ci/tests/integration/cases/generic-kafka-function/kafka-client.yaml | New Kafka client pod/config used by the e2e verifier. |
| .ci/tests/integration/cases/logging-window-function/verify.sh | Improve stability via retries and pre-test cleanup. |
| .ci/tests/integration/cases/crypto-function/verify.sh | Improve stability by pre-creating topics/schemas and adding cleanup. |
| .ci/tests/integration/cases/crypto-function/manifests.yaml | Add explicit schema type for crypto test inputs. |
| .ci/helm.sh | Add Kafka helper functions and improve backlog verification robustness. |
| .ci/clusters/values_skywalking_e2e_cluster.yaml | Enable KoP in the e2e cluster values and adjust broker/bookkeeper settings for tests. |
Files not reviewed (1)
- api/compute/v1alpha1/zz_generated.deepcopy.go: Language not supported
    	}
    }

    func MakeFunctionCleanUpJob(function *v1alpha1.Function) *v1.Job {
Do we need to make sure the cleanup job only works for pulsar?
Fixed in 7dacb77. MakeFunctionCleanUpJob now returns nil for Kafka functions / missing Pulsar config, ApplyFunctionCleanUpJob handles that nil cleanup job without creating or triggering Pulsar cleanup, and the webhook rejects Kafka + cleanupSubscription early. Added regression coverage for the Kafka cleanup skip and validation rejection.
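The skip behaviour described in this reply can be sketched roughly as follows. This is a minimal illustration, not the code from the commit: the `Function`, `FunctionSpec`, and `Job` types are trimmed-down stand-ins, and `makeCleanUpJob`, `applyCleanUpJob`, and `validateCleanup` are hypothetical names chosen to mirror the functions mentioned above.

```go
package main

import (
	"errors"
	"fmt"
)

// Hypothetical, trimmed-down stand-ins for the real v1alpha1 API types.
type PulsarMessaging struct{ PulsarConfig string }
type KafkaMessaging struct{ BootstrapServers string }

type FunctionSpec struct {
	Pulsar              *PulsarMessaging
	Kafka               *KafkaMessaging
	CleanupSubscription bool
}

type Function struct {
	Name string
	Spec FunctionSpec
}

// Job is a placeholder for the Kubernetes batch Job object.
type Job struct{ Name string }

// makeCleanUpJob returns nil when there is nothing Pulsar-specific to clean
// up: either the function uses Kafka or it has no Pulsar config at all.
func makeCleanUpJob(fn *Function) *Job {
	if fn.Spec.Kafka != nil || fn.Spec.Pulsar == nil {
		return nil
	}
	return &Job{Name: fn.Name + "-cleanup"}
}

// applyCleanUpJob treats a nil job as "skip" instead of creating anything.
func applyCleanUpJob(fn *Function) string {
	job := makeCleanUpJob(fn)
	if job == nil {
		return "skipped"
	}
	return "created " + job.Name
}

// validateCleanup mimics the early webhook rejection of Kafka + cleanupSubscription.
func validateCleanup(fn *Function) error {
	if fn.Spec.Kafka != nil && fn.Spec.CleanupSubscription {
		return errors.New("cleanupSubscription is not supported with Kafka messaging")
	}
	return nil
}

func main() {
	kafkaFn := &Function{Name: "kafka-fn", Spec: FunctionSpec{
		Kafka:               &KafkaMessaging{BootstrapServers: "broker:9092"},
		CleanupSubscription: true,
	}}
	pulsarFn := &Function{Name: "pulsar-fn", Spec: FunctionSpec{
		Pulsar: &PulsarMessaging{PulsarConfig: "pulsar-cm"},
	}}

	fmt.Println(applyCleanUpJob(kafkaFn), validateCleanup(kafkaFn))
	fmt.Println(applyCleanUpJob(pulsarFn), validateCleanup(pulsarFn))
}
```

Returning nil from the builder keeps the "is there anything to clean up" decision in one place, so the caller only needs a nil check.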
    for topic, conf := range function.Spec.Input.SourceSpecs {
    	inputSpecs[kafkaInputSpecKey(function, topic)] = makeKafkaInputSpecFromConsumerConfig(function, topic, conf)
    }
    for topic, schemaConfig := range kafka.InputSchemaConfigs {
In makeKafkaInputSpecs, the InputSchemaConfigs override loop recomputes the schema using function.Spec.Input.SourceSpecs[topic].SchemaType.
For a topic declared via Input.Topics (not SourceSpecs) with Input.TypeClassName: string, the first loop correctly sets {type: "string"}. But here SourceSpecs[topic] is the zero-value ConsumerConfig → SchemaType == "" → makeKafkaSchema falls back to "bytes". So a user who only wants to attach a subject/version (leaving KafkaSchemaConfig.Type empty) silently loses their declared type and gets bytes → wrong deserialization at runtime.
Fixed in 22713e7. The InputSchemaConfigs override now preserves the existing kafka_schema type and only overlays the configured schema fields, so topic-level TypeClassName is not reset to bytes when only subject/version is provided. Added TestConvertFunctionDetailsKafkaInputSchemaConfigKeepsDeclaredTopicType.
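The "preserve the existing type, overlay only the configured fields" approach might look something like the sketch below. The `KafkaSchemaConfig` struct, the `overlaySchema` helper, and the map keys `type`, `subject`, and `version` are assumptions made for illustration; the actual field names in the PR may differ.

```go
package main

import "fmt"

// KafkaSchemaConfig is a hypothetical stand-in for the per-topic schema override.
type KafkaSchemaConfig struct {
	Type    string
	Subject string
	Version string
}

// overlaySchema copies the existing schema and applies only the non-empty
// override fields, so a type declared at the topic level is not reset.
func overlaySchema(existing map[string]string, override KafkaSchemaConfig) map[string]string {
	merged := make(map[string]string, len(existing)+3)
	for k, v := range existing {
		merged[k] = v
	}
	if override.Type != "" {
		merged["type"] = override.Type
	}
	if override.Subject != "" {
		merged["subject"] = override.Subject
	}
	if override.Version != "" {
		merged["version"] = override.Version
	}
	// Fall back to bytes only when no type was declared anywhere.
	if merged["type"] == "" {
		merged["type"] = "bytes"
	}
	return merged
}

func main() {
	// Schema produced by the first loop for a topic with TypeClassName: string.
	declared := map[string]string{"type": "string"}

	// The user attaches only a subject and version, leaving Type empty.
	merged := overlaySchema(declared, KafkaSchemaConfig{Subject: "orders-value", Version: "3"})
	fmt.Println(merged)
}
```

Copying the existing map before overlaying also avoids mutating the spec produced by the first loop.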
    @@ -233,6 +238,8 @@
    func makeFunctionCommand(function *v1alpha1.Function) []string {
validateKafkaMessagingRuntime (webhook-only) is the single place that rejects Kafka + non-generic runtime. The spec builder (makeFunctionCommand) has no such guard: a Kafka function with a Java/Python runtime falls into the Java/Python branch, ignores Spec.Kafka entirely, and emits a broken Pulsar command (pulsar_serviceurl=$brokerServiceURL with no Pulsar config). Similarly, GenericRuntime.FunctionFile == "" makes makeFunctionCommand return nil → a StatefulSet with an empty container command, and validateKafkaMessagingRuntime only checks GenericRuntime != nil. With the webhook disabled (make run, or clusters that turn off validation) these produce silently broken pods instead of rejections.
Fixed in 22713e7. validateKafkaMessagingRuntime now requires genericRuntime.functionFile for Kafka messaging, and makeFunctionCommand returns nil for Kafka configs that cannot use the generic runtime instead of falling through to Java/Python Pulsar commands. Added regression coverage for both cases.
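A defence-in-depth guard of this kind, where the command builder refuses to produce a Pulsar command for a Kafka function even when the webhook is disabled, could be sketched as below. The types are simplified stand-ins, `validateKafkaRuntime` and `makeCommand` are hypothetical names, and the returned command strings are placeholders rather than the real runtime invocation.

```go
package main

import (
	"errors"
	"fmt"
)

// Hypothetical, simplified stand-ins for the real spec types.
type KafkaMessaging struct{ BootstrapServers string }
type GenericRuntime struct{ FunctionFile string }

type FunctionSpec struct {
	Kafka          *KafkaMessaging
	GenericRuntime *GenericRuntime
}

// validateKafkaRuntime mirrors the webhook rule: Kafka messaging needs the
// generic runtime and a non-empty function file.
func validateKafkaRuntime(spec FunctionSpec) error {
	if spec.Kafka == nil {
		return nil
	}
	if spec.GenericRuntime == nil {
		return errors.New("kafka messaging requires the generic runtime")
	}
	if spec.GenericRuntime.FunctionFile == "" {
		return errors.New("kafka messaging requires genericRuntime.functionFile")
	}
	return nil
}

// makeCommand applies the same rule in the spec builder, so a disabled
// webhook cannot lead to a Pulsar command being emitted for a Kafka function.
func makeCommand(spec FunctionSpec) []string {
	if spec.Kafka != nil {
		if validateKafkaRuntime(spec) != nil {
			return nil
		}
		// Placeholder command; the real builder assembles many more arguments.
		return []string{"placeholder-generic-runner", spec.GenericRuntime.FunctionFile}
	}
	return []string{"placeholder-pulsar-runner"}
}

func main() {
	kafka := &KafkaMessaging{BootstrapServers: "broker:9092"}
	specs := []FunctionSpec{
		{Kafka: kafka, GenericRuntime: &GenericRuntime{FunctionFile: "fn.wasm"}},
		{Kafka: kafka, GenericRuntime: &GenericRuntime{}},
		{Kafka: kafka},
		{},
	}
	for _, spec := range specs {
		fmt.Println(makeCommand(spec), validateKafkaRuntime(spec))
	}
}
```

Callers would still need to treat a nil command as an error condition, since an empty container command is exactly the failure mode the review comment describes.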
    	return allErrs
    }

    func validateKafkaInputOutput(input *v1alpha1.InputConf, output *v1alpha1.OutputConf,
validateKafkaInputOutput duplicates almost the entire body of validateInputOutput (same empty-input check, receiver-queue-size loop, crypto check, output-collision loop) minus topic-name validation. They will drift. Parameterize the original with a skipTopicNameValidation bool instead of forking. Separately, the InputSchemaConfigs override loop re-applies schema that makeKafkaInputSpecFromConsumerConfig already set for SourceSpecs topics — wasted recompute and the source of the two bugs above; kafkaSecurityProtocol(kafka) is also computed twice in makeKafkaConfig (compute once, reuse).
Fixed in 22713e7. validateKafkaInputOutput now delegates to the shared input/output validator with only topic-name validation disabled, so shared checks such as output producer crypto validation stay in sync. Also removed the schema re-compute path that caused the input schema override issue and reused the computed Kafka security.protocol in makeKafkaConfig.
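The delegation pattern described here, one shared validator with a flag that disables only topic-name validation, can be illustrated with the sketch below. The `InputConf` and `OutputConf` types are reduced to the fields needed for the example, and `isValidPulsarTopicName` is a deliberately simplified stand-in rule, not the real Pulsar topic-name check.

```go
package main

import (
	"fmt"
	"strings"
)

// Hypothetical, reduced versions of the input/output config types.
type InputConf struct{ Topics []string }
type OutputConf struct{ Topic string }

// isValidPulsarTopicName is a simplified stand-in for Pulsar topic-name validation.
func isValidPulsarTopicName(topic string) bool {
	return strings.HasPrefix(topic, "persistent://") ||
		strings.HasPrefix(topic, "non-persistent://")
}

// validateInputOutput holds every shared check; the flag only disables the
// topic-name rule, so the remaining checks cannot drift between protocols.
func validateInputOutput(input *InputConf, output *OutputConf, skipTopicNameValidation bool) []string {
	var errs []string
	if input == nil || len(input.Topics) == 0 {
		return append(errs, "input: at least one topic is required")
	}
	for _, topic := range input.Topics {
		if !skipTopicNameValidation && !isValidPulsarTopicName(topic) {
			errs = append(errs, fmt.Sprintf("input topic %q: invalid name", topic))
		}
		if output != nil && output.Topic != "" && output.Topic == topic {
			errs = append(errs, fmt.Sprintf("output topic %q collides with an input topic", topic))
		}
	}
	if output != nil && output.Topic != "" &&
		!skipTopicNameValidation && !isValidPulsarTopicName(output.Topic) {
		errs = append(errs, fmt.Sprintf("output topic %q: invalid name", output.Topic))
	}
	return errs
}

// validateKafkaInputOutput delegates instead of forking the validator.
func validateKafkaInputOutput(input *InputConf, output *OutputConf) []string {
	return validateInputOutput(input, output, true)
}

func main() {
	in := &InputConf{Topics: []string{"orders"}}
	out := &OutputConf{Topic: "results"}

	fmt.Println("kafka:", validateKafkaInputOutput(in, out))
	fmt.Println("pulsar:", validateInputOutput(in, out, false))
	fmt.Println("collision:", validateKafkaInputOutput(in, &OutputConf{Topic: "orders"}))
}
```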
We have a generic runtime that supports the Kafka protocol, so this PR adds support for it here.