Skip to content

feat: support Kafka protocol for functions#843

Open
jiangpengcheng wants to merge 26 commits into
masterfrom
support_kafka_runtime
Open

feat: support Kafka protocol for functions#843
jiangpengcheng wants to merge 26 commits into
masterfrom
support_kafka_runtime

Conversation

@jiangpengcheng
Copy link
Copy Markdown
Member

We have a generic runtime which supported Kafka protocol, so support it here

Motivation

Explain here the context, and why you're making that change. What is the problem you're trying to solve.

Modifications

Describe the modifications you've done.

Verifying this change

  • Make sure that the change passes the CI checks.

(Please pick either of the following options)

This change is a trivial rework / code cleanup without any test coverage.

(or)

This change is already covered by existing tests, such as (please describe tests).

(or)

This change added tests and can be verified as follows:

(example:)

  • Added integration tests for end-to-end deployment with large payloads (10MB)
  • Extended integration test for recovery after broker failure

Documentation

Check the box below.

Need to update docs?

  • doc-required

    (If you need help on updating docs, create a doc issue)

  • no-need-doc

    (Please explain why)

  • doc

    (If this PR contains doc changes)

@jiangpengcheng jiangpengcheng requested review from a team and freeznet as code owners May 29, 2026 03:22
@github-actions github-actions Bot added the no-need-doc This pr does not need any document label May 29, 2026
Copy link
Copy Markdown

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

This PR adds Kafka (KoP) protocol support for Function resources (targeting the generic runtime), including CRD schema updates, controller-side config generation, webhook validation, and integration coverage to verify Kafka IO end-to-end in CI.

Changes:

  • Add Kafka messaging configuration to the API/CRDs and generate Kafka runtime config (_kafka_config) for generic-runtime functions.
  • Extend admission webhook validation to handle Kafka for Functions and explicitly reject Kafka messaging for Source/Sink resources.
  • Add unit + e2e/integration tests and adjust CI/e2e cluster settings to enable KoP and improve test stability.

Reviewed changes

Copilot reviewed 37 out of 38 changed files in this pull request and generated 2 comments.

Show a summary per file
File Description
Dockerfile Adjust Go build flags for the manager image build.
operator.Dockerfile Adjust Go build flags for the operator image build.
redhat.Dockerfile Adjust Go build flags for the Red Hat-certified build.
api/compute/v1alpha1/common.go Introduce Kafka messaging types and add kafka under Messaging (inlined into specs).
api/compute/v1alpha1/zz_generated.deepcopy.go Add generated deepcopy implementations for Kafka-related API types.
controllers/spec/common.go Extend generic runtime command building to support messaging service type + client auth args.
controllers/spec/function.go Wire Kafka-specific auth/env/args into generic runtime execution path; guard Pulsar-only wiring for Kafka functions.
controllers/spec/function_test.go Add test asserting generic runtime command/mount behavior when Kafka messaging is configured.
controllers/spec/utils.go Generate _kafka_config into user config and build Kafka input/output spec maps.
controllers/spec/utils_test.go Add test validating Kafka config is present/serialized correctly in function details.
pkg/webhook/validate.go Add Kafka-specific validation helpers and Function messaging validation branch for Kafka.
pkg/webhook/function_webhook.go Use Kafka-specific input/output validation and validate Kafka+runtime compatibility on create.
pkg/webhook/source_webhook.go Reject Kafka messaging for Source resources (create/update).
pkg/webhook/sink_webhook.go Reject Kafka messaging for Sink resources (create/update).
pkg/webhook/validate_test.go Add unit tests for Kafka-related validation helpers.
pkg/webhook/kafka_webhook_test.go Add webhook update tests ensuring Source/Sink reject Kafka messaging.
config/samples/compute_v1alpha1_function_crypto.yaml Add explicit schema type in sample source spec.
config/crd/bases/compute.functionmesh.io_sources.yaml Regenerate CRD schema (controller-gen v0.16.5) including Kafka fields.
config/crd/bases/compute.functionmesh.io_sinks.yaml Regenerate CRD schema (controller-gen v0.16.5) including Kafka fields.
config/crd/bases/compute.functionmesh.io_functions.yaml Regenerate CRD schema (controller-gen v0.16.5) including Kafka fields.
config/crd/bases/compute.functionmesh.io_functionmeshes.yaml Regenerate CRD schema (controller-gen v0.16.5) including Kafka fields.
config/crd/bases/compute.functionmesh.io_backendconfigs.yaml Regenerate CRD schema (controller-gen v0.16.5).
charts/function-mesh-operator/charts/admission-webhook/templates/crd-compute.functionmesh.io-sources.yaml Regenerate Helm CRD template for Sources (controller-gen v0.16.5 output).
charts/function-mesh-operator/charts/admission-webhook/templates/crd-compute.functionmesh.io-sinks.yaml Regenerate Helm CRD template for Sinks (controller-gen v0.16.5 output).
charts/function-mesh-operator/charts/admission-webhook/templates/crd-compute.functionmesh.io-functions.yaml Regenerate Helm CRD template for Functions (controller-gen v0.16.5 output).
charts/function-mesh-operator/charts/admission-webhook/templates/crd-compute.functionmesh.io-functionmeshes.yaml Regenerate Helm CRD template for FunctionMeshes (controller-gen v0.16.5 output).
charts/function-mesh-operator/charts/admission-webhook/templates/crd-compute.functionmesh.io-backendconfigs.yaml Regenerate Helm CRD template for BackendConfigs (controller-gen v0.16.5 output).
.github/workflows/test-integration-skywalking-e2e.yml Adjust runner tool cache usage and report disk usage for troubleshooting.
.github/actions/ssh-access/action.yml Update upterm install/version and simplify SSH/upterm session handling.
.ci/tests/integration/e2e.yaml Add a new e2e case for a generic Kafka function.
.ci/tests/integration/cases/generic-kafka-function/verify.sh New e2e verifier that drives Kafka IO via a Kafka client pod.
.ci/tests/integration/cases/generic-kafka-function/manifests.yaml New sample Function manifest configured for Kafka + generic runtime.
.ci/tests/integration/cases/generic-kafka-function/kafka-client.yaml New Kafka client pod/config used by the e2e verifier.
.ci/tests/integration/cases/logging-window-function/verify.sh Improve stability via retries and pre-test cleanup.
.ci/tests/integration/cases/crypto-function/verify.sh Improve stability by pre-creating topics/schemas and adding cleanup.
.ci/tests/integration/cases/crypto-function/manifests.yaml Add explicit schema type for crypto test inputs.
.ci/helm.sh Add Kafka helper functions and improve backlog verification robustness.
.ci/clusters/values_skywalking_e2e_cluster.yaml Enable KoP in the e2e cluster values and adjust broker/bookkeeper settings for tests.
Files not reviewed (1)
  • api/compute/v1alpha1/zz_generated.deepcopy.go: Language not supported

💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

Comment thread controllers/spec/utils.go
Comment thread pkg/webhook/validate.go
freeznet
freeznet previously approved these changes Jun 2, 2026
}
}

func MakeFunctionCleanUpJob(function *v1alpha1.Function) *v1.Job {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need to make sure the cleanup job only works for pulsar?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed in 7dacb77. MakeFunctionCleanUpJob now returns nil for Kafka functions / missing Pulsar config, ApplyFunctionCleanUpJob handles that nil cleanup job without creating or triggering Pulsar cleanup, and the webhook rejects Kafka + cleanupSubscription early. Added regression coverage for the Kafka cleanup skip and validation rejection.

Comment thread controllers/spec/utils.go
for topic, conf := range function.Spec.Input.SourceSpecs {
inputSpecs[kafkaInputSpecKey(function, topic)] = makeKafkaInputSpecFromConsumerConfig(function, topic, conf)
}
for topic, schemaConfig := range kafka.InputSchemaConfigs {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In makeKafkaInputSpecs, the InputSchemaConfigs override loop recomputes the schema using function.Spec.Input.SourceSpecs[topic].SchemaType:

For a topic declared via Input.Topics (not SourceSpecs) with Input.TypeClassName: string, the first loop correctly sets {type: "string"}. But here SourceSpecs[topic] is the zero-value ConsumerConfig → SchemaType == "" → makeKafkaSchema falls back to "bytes". So a user who only wants to attach a subject/version (leaving KafkaSchemaConfig.Type empty) silently loses their declared type and gets bytes → wrong deserialization at runtime.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed in 22713e7. The InputSchemaConfigs override now preserves the existing kafka_schema type and only overlays the configured schema fields, so topic-level TypeClassName is not reset to bytes when only subject/version is provided. Added TestConvertFunctionDetailsKafkaInputSchemaConfigKeepsDeclaredTopicType.

@@ -233,6 +238,8 @@
func makeFunctionCommand(function *v1alpha1.Function) []string {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

validateKafkaMessagingRuntime (webhook-only) is the single place that rejects Kafka + non-generic runtime. The spec builder (makeFunctionCommand) has no such guard: a Kafka function with a Java/Python runtime falls into the Java/Python branch, ignores Spec.Kafka entirely, and emits a broken Pulsar command (pulsar_serviceurl=$brokerServiceURL with no Pulsar config). Similarly, GenericRuntime.FunctionFile == "" makes makeFunctionCommand return nil → a StatefulSet with an empty container command, and validateKafkaMessagingRuntime only checks GenericRuntime != nil. With the webhook disabled (make run, or clusters that turn off validation) these produce silently broken pods instead of rejections.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed in 22713e7. validateKafkaMessagingRuntime now requires genericRuntime.functionFile for Kafka messaging, and makeFunctionCommand returns nil for Kafka configs that cannot use the generic runtime instead of falling through to Java/Python Pulsar commands. Added regression coverage for both cases.

Comment thread pkg/webhook/validate.go Outdated
return allErrs
}

func validateKafkaInputOutput(input *v1alpha1.InputConf, output *v1alpha1.OutputConf,
Copy link
Copy Markdown
Member

@freeznet freeznet Jun 2, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

validateKafkaInputOutput duplicates almost the entire body of validateInputOutput (same empty-input check, receiver-queue-size loop, crypto check, output-collision loop) minus topic-name validation. They will drift. Parameterize the original with a skipTopicNameValidation bool instead of forking. Separately, the InputSchemaConfigs override loop re-applies schema that makeKafkaInputSpecFromConsumerConfig already set for SourceSpecs topics — wasted recompute and the source of the two bugs above; kafkaSecurityProtocol(kafka) is also computed twice in makeKafkaConfig (compute once, reuse).

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed in 22713e7. validateKafkaInputOutput now delegates to the shared input/output validator with only topic-name validation disabled, so shared checks such as output producer crypto validation stay in sync. Also removed the schema re-compute path that caused the input schema override issue and reused the computed Kafka security.protocol in makeKafkaConfig.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

no-need-doc This pr does not need any document

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants