Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 6 additions & 2 deletions .github/workflows/build-native-image.yml
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,9 @@ jobs:
- name: Build native test server (Docker non-musl)
if: matrix.os_family == 'linux' && matrix.musl == false
run: |
IMAGE_ID=$(docker build -q ./docker/native-image)
IMAGE_ID_FILE="$(mktemp)"
docker build --iidfile "$IMAGE_ID_FILE" ./docker/native-image
IMAGE_ID="$(cat "$IMAGE_ID_FILE")"
docker run \
--rm -w /github/workspace -v "$(pwd):/github/workspace" \
"$IMAGE_ID" \
Expand All @@ -96,7 +98,9 @@ jobs:
- name: Build native test server (Docker musl)
if: matrix.os_family == 'linux' && matrix.musl == true
run: |
IMAGE_ID=$(docker build -q ./docker/native-image-musl)
IMAGE_ID_FILE="$(mktemp)"
docker build --iidfile "$IMAGE_ID_FILE" ./docker/native-image-musl
IMAGE_ID="$(cat "$IMAGE_ID_FILE")"
docker run \
--rm -w /github/workspace -v "$(pwd):/github/workspace" \
"$IMAGE_ID" \
Expand Down
1 change: 1 addition & 0 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
name: Continuous Integration
permissions:
checks: write
contents: read
on:
pull_request:
Expand Down
12 changes: 6 additions & 6 deletions docker/native-image-musl/dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -3,16 +3,16 @@ FROM ubuntu:24.04
ENV JAVA_HOME=/usr/lib64/graalvm/graalvm-community-java23
COPY --from=ghcr.io/graalvm/native-image-community:23 $JAVA_HOME $JAVA_HOME
ENV PATH="${JAVA_HOME}/bin:${PATH}"
RUN apt-get -y update --allow-releaseinfo-change && apt-get install -y -V git build-essential curl binutils
RUN apt-get -y update --allow-releaseinfo-change && apt-get install -y -V git build-essential curl ca-certificates binutils
COPY install-musl.sh /opt/install-musl.sh
RUN chmod +x /opt/install-musl.sh
WORKDIR /opt
# We need to build musl and zlibc with musl to for a static build
# See https://www.graalvm.org/21.3/reference-manual/native-image/StaticImages/index.html
# We need to build musl and zlib with musl for a static build.
# See https://www.graalvm.org/21.3/reference-manual/native-image/StaticImages/index.html.
RUN ./install-musl.sh
ENV MUSL_HOME=/opt/musl-toolchain
ENV PATH="$MUSL_HOME/bin:$PATH"
# Verify installation
# Verify installation.
RUN x86_64-linux-musl-gcc --version
# Avoid errors like: "fatal: detected dubious ownership in repository"
RUN git config --global --add safe.directory '*'
# Avoid errors like: "fatal: detected dubious ownership in repository".
RUN git config --global --add safe.directory '*'
41 changes: 24 additions & 17 deletions docker/native-image-musl/install-musl.sh
Original file line number Diff line number Diff line change
@@ -1,28 +1,35 @@
# Specify an installation directory for musl:
export MUSL_HOME=$PWD/musl-toolchain
#!/usr/bin/env bash
set -euo pipefail

# Download musl and zlib sources:
curl -O https://musl.libc.org/releases/musl-1.2.5.tar.gz
curl -O https://zlib.net/fossils/zlib-1.2.13.tar.gz
readonly MUSL_VERSION=1.2.5
readonly ZLIB_VERSION=1.2.13

# Build musl from source
tar -xzvf musl-1.2.5.tar.gz
cd musl-1.2.5 || exit
export MUSL_HOME="$PWD/musl-toolchain"

curl --fail --location --retry 5 --retry-all-errors --output "musl-${MUSL_VERSION}.tar.gz" \
"https://musl.libc.org/releases/musl-${MUSL_VERSION}.tar.gz"
curl --fail --location --retry 5 --retry-all-errors --output "zlib-${ZLIB_VERSION}.tar.gz" \
"https://github.com/madler/zlib/releases/download/v${ZLIB_VERSION}/zlib-${ZLIB_VERSION}.tar.gz"

# Build musl from source.
tar -xzf "musl-${MUSL_VERSION}.tar.gz"
cd "musl-${MUSL_VERSION}"
./configure --prefix=$MUSL_HOME --static
# The next operation may require privileged access to system resources, so use sudo
make && make install
make -j"$(nproc)"
make install
cd ..

# Install a symlink for use by native-image
ln -s $MUSL_HOME/bin/musl-gcc $MUSL_HOME/bin/x86_64-linux-musl-gcc
# Install a symlink for use by native-image.
ln -sf "$MUSL_HOME/bin/musl-gcc" "$MUSL_HOME/bin/x86_64-linux-musl-gcc"

# Extend the system path and confirm that musl is available by printing its version
# Extend the system path and confirm that musl is available by printing its version.
export PATH="$MUSL_HOME/bin:$PATH"
x86_64-linux-musl-gcc --version

# Build zlib with musl from source and install into the MUSL_HOME directory
tar -xzvf zlib-1.2.13.tar.gz
cd zlib-1.2.13 || exit
# Build zlib with musl from source and install into the MUSL_HOME directory.
tar -xzf "zlib-${ZLIB_VERSION}.tar.gz"
cd "zlib-${ZLIB_VERSION}"
CC=musl-gcc ./configure --prefix=$MUSL_HOME --static
make && make install
make -j"$(nproc)"
make install
cd ..
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,17 @@ public enum SdkFlag {
* condition is resolved before the timeout.
*/
CANCEL_AWAIT_TIMER_ON_CONDITION(4),
/*
* Changes replay behavior of GetVersion to wait for the matching marker event before executing
* the callback.
*
* Introduced: 1.34.0
*
* Enabled: (pending)
*
* Bug: https://github.com/temporalio/sdk-java/issues/2796
*/
VERSION_WAIT_FOR_MARKER(5),
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We're getting more of these. We should think ahead at how to make sure this remains maintainable in the future.

Would you mind adding some comments on this flag and others above, indicating:

  • Which release introduced support for the flag (assume next patch release for new flags)
  • Which release turned it on (or "pending" if not yet turned on)
  • Ticket to the bug that the flag resolves.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have done this.

UNKNOWN(Integer.MAX_VALUE);

private final int value;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -133,17 +133,20 @@ class InvocationStateMachine

private final int minSupported;
private final int maxSupported;
private final boolean waitForMarkerRecordedReplaying;
private final Functions.Func1<Integer, SearchAttributes> upsertSearchAttributeCallback;
private final Functions.Proc2<Integer, RuntimeException> resultCallback;

InvocationStateMachine(
int minSupported,
int maxSupported,
boolean waitForMarkerRecordedReplaying,
Functions.Func1<Integer, SearchAttributes> upsertSearchAttributeCallback,
Functions.Proc2<Integer, RuntimeException> callback) {
super(STATE_MACHINE_DEFINITION, VersionStateMachine.this.commandSink, stateMachineSink);
this.minSupported = minSupported;
this.maxSupported = maxSupported;
this.waitForMarkerRecordedReplaying = waitForMarkerRecordedReplaying;
this.upsertSearchAttributeCallback = upsertSearchAttributeCallback;
this.resultCallback = Objects.requireNonNull(callback);
}
Expand Down Expand Up @@ -264,9 +267,14 @@ void notifySkippedExecuting() {
}

void notifyMarkerCreatedReplaying() {
if (waitForMarkerRecordedReplaying) {
// Replay already preloads the version value, so delay the callback until the real marker
// event is matched.
return;
}
try {
// it's a replay and the version to return from the getVersion call should be preloaded from
// the history
// It's a replay and the version to return from the getVersion call should be preloaded
// from the history.
final boolean usePreloadedVersion = true;
validateVersionAndThrow(usePreloadedVersion);
notifyFromVersion(usePreloadedVersion);
Expand Down Expand Up @@ -295,6 +303,14 @@ void flushPreloadedVersionAndUpdateFromEventReplaying() {
Preconditions.checkState(
preloadedVersion != null, "preloadedVersion is expected to be initialized");
flushPreloadedVersionAndUpdateFromEvent(currentEvent);
if (waitForMarkerRecordedReplaying) {
try {
validateVersionAndThrow(false);
notifyFromVersion(false);
} catch (RuntimeException ex) {
notifyFromException(ex);
}
}
}

void notifySkippedReplaying() {
Expand Down Expand Up @@ -393,11 +409,16 @@ private VersionStateMachine(
public Integer getVersion(
int minSupported,
int maxSupported,
boolean waitForMarkerRecordedReplaying,
Functions.Func1<Integer, SearchAttributes> upsertSearchAttributeCallback,
Functions.Proc2<Integer, RuntimeException> callback) {
InvocationStateMachine ism =
new InvocationStateMachine(
minSupported, maxSupported, upsertSearchAttributeCallback, callback);
minSupported,
maxSupported,
waitForMarkerRecordedReplaying,
upsertSearchAttributeCallback,
callback);
ism.explicitEvent(ExplicitEvent.CHECK_EXECUTION_STATE);
ism.explicitEvent(ExplicitEvent.SCHEDULE);
// If the state is SKIPPED_REPLAYING that means we:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1253,6 +1253,7 @@ public Integer getVersion(
return stateMachine.getVersion(
minSupported,
maxSupported,
checkSdkFlag(SdkFlag.VERSION_WAIT_FOR_MARKER),
(version) -> {
if (!workflowImplOptions.isEnableUpsertVersionSearchAttributes()) {
return null;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
package io.temporal.internal.replay;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;

import com.uber.m3.tally.NoopScope;
import io.temporal.api.query.v1.WorkflowQuery;
import io.temporal.api.workflowservice.v1.PollWorkflowTaskQueueResponse;
import io.temporal.client.WorkflowClient;
import io.temporal.common.WorkflowExecutionHistory;
import io.temporal.internal.worker.QueryReplayHelper;
import io.temporal.serviceclient.WorkflowServiceStubs;
import io.temporal.testing.TestWorkflowEnvironment;
import io.temporal.worker.Worker;
import io.temporal.workflow.versionTests.GetVersionInterleavedUpdateReplayTest;
import io.temporal.workflow.versionTests.GetVersionInterleavedUpdateReplayTest.GreetingWorkflowImpl;
import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.util.Arrays;
import org.junit.Test;

public class GetVersionInterleavedUpdateReplayTaskHandlerTest {
private static final String EXPECTED_FIRST_CHANGE_ID = "ChangeId1";
private static final String EXPECTED_SECOND_CHANGE_ID = "ChangeId2";

/** Regression test for the lower-level replay path behind the public replayer API. */
@Test
public void testReplayDirectQueryWorkflowTaskSucceeds() throws Throwable {
WorkflowExecutionHistory history =
GetVersionInterleavedUpdateReplayTest.captureReplayableHistory();
assertEquals(
Arrays.asList(EXPECTED_FIRST_CHANGE_ID, EXPECTED_SECOND_CHANGE_ID),
GetVersionInterleavedUpdateReplayTest.extractVersionChangeIds(history.getEvents()));

TestWorkflowEnvironment testEnvironment = TestWorkflowEnvironment.newInstance();
ReplayWorkflowRunTaskHandler runTaskHandler = null;
try {
Worker worker = testEnvironment.newWorker(GetVersionInterleavedUpdateReplayTest.TASK_QUEUE);
worker.registerWorkflowImplementationTypes(GreetingWorkflowImpl.class);

ReplayWorkflowTaskHandler replayTaskHandler = getNonStickyReplayTaskHandler(worker);
PollWorkflowTaskQueueResponse.Builder replayTask = newReplayTask(history);
runTaskHandler = createStatefulHandler(replayTaskHandler, replayTask);

WorkflowServiceStubs service =
getField(replayTaskHandler, "service", WorkflowServiceStubs.class);
String namespace = getField(replayTaskHandler, "namespace", String.class);
ServiceWorkflowHistoryIterator historyIterator =
new ServiceWorkflowHistoryIterator(service, namespace, replayTask, new NoopScope());

QueryResult result =
runTaskHandler.handleDirectQueryWorkflowTask(replayTask, historyIterator);
assertNotNull(result);
assertFalse(result.isWorkflowMethodCompleted());
assertFalse(result.getResponsePayloads().isPresent());
} finally {
if (runTaskHandler != null) {
runTaskHandler.close();
}
testEnvironment.close();
}
}

private static PollWorkflowTaskQueueResponse.Builder newReplayTask(
WorkflowExecutionHistory history) {
return PollWorkflowTaskQueueResponse.newBuilder()
.setWorkflowExecution(history.getWorkflowExecution())
.setWorkflowType(
history
.getHistory()
.getEvents(0)
.getWorkflowExecutionStartedEventAttributes()
.getWorkflowType())
.setStartedEventId(Long.MAX_VALUE)
.setPreviousStartedEventId(Long.MAX_VALUE)
.setHistory(history.getHistory())
.setQuery(WorkflowQuery.newBuilder().setQueryType(WorkflowClient.QUERY_TYPE_REPLAY_ONLY));
}

private static ReplayWorkflowTaskHandler getNonStickyReplayTaskHandler(Worker worker)
throws Exception {
Object workflowWorker = getField(worker, "workflowWorker", Object.class);
QueryReplayHelper queryReplayHelper =
getField(workflowWorker, "queryReplayHelper", QueryReplayHelper.class);
return getField(queryReplayHelper, "handler", ReplayWorkflowTaskHandler.class);
}

private static ReplayWorkflowRunTaskHandler createStatefulHandler(
ReplayWorkflowTaskHandler replayTaskHandler, PollWorkflowTaskQueueResponse.Builder replayTask)
throws Exception {
Method method =
ReplayWorkflowTaskHandler.class.getDeclaredMethod(
"createStatefulHandler",
PollWorkflowTaskQueueResponse.Builder.class,
com.uber.m3.tally.Scope.class);
method.setAccessible(true);
return (ReplayWorkflowRunTaskHandler)
method.invoke(replayTaskHandler, replayTask, new NoopScope());
}

private static <T> T getField(Object target, String fieldName, Class<T> expectedType)
throws Exception {
Field field = target.getClass().getDeclaredField(fieldName);
field.setAccessible(true);
return expectedType.cast(field.get(target));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,7 @@ public void testBasicWorkerVersioning() {
DescribeWorkerDeploymentResponse describeResp1 = waitUntilWorkerDeploymentVisible(v1);

setCurrentVersion(v1, describeResp1.getConflictToken());
waitForRoutingConfigPropagation(v1);

// Start workflow 1 which will use the 1.0 worker on auto-upgrade
TestWorkflows.QueryableWorkflow wf1 =
Expand All @@ -160,6 +161,7 @@ public void testBasicWorkerVersioning() {
new WorkerDeploymentVersion(testWorkflowRule.getDeploymentName(), "2.0");
DescribeWorkerDeploymentResponse describeResp2 = waitUntilWorkerDeploymentVisible(v2);
setCurrentVersion(v2, describeResp2.getConflictToken());
waitForRoutingConfigPropagation(v2);

TestWorkflows.QueryableWorkflow wf2 =
testWorkflowRule.newWorkflowStubTimeoutOptions(
Expand All @@ -173,6 +175,7 @@ public void testBasicWorkerVersioning() {

// Set current version to 3.0
setCurrentVersion(v3, describeResp3.getConflictToken());
waitForRoutingConfigPropagation(v3);

TestWorkflows.QueryableWorkflow wf3 =
testWorkflowRule.newWorkflowStubTimeoutOptions(
Expand Down Expand Up @@ -224,8 +227,10 @@ public void testRampWorkerVersioning() {
// Set cur ver to 1 & ramp 100% to 2
SetWorkerDeploymentCurrentVersionResponse setCurR =
setCurrentVersion(v1, describeResp1.getConflictToken());
waitForRoutingConfigPropagation(v1);
SetWorkerDeploymentRampingVersionResponse rampResp =
setRampingVersion(v2, 100, setCurR.getConflictToken());
waitForRoutingConfigPropagation(v1, v2);
// Run workflows and verify they've both started & run on v2
for (int i = 0; i < 3; i++) {
String res = runWorkflow("versioning-ramp-100");
Expand All @@ -234,12 +239,14 @@ public void testRampWorkerVersioning() {
// Set ramp to 0, and see them start on v1
SetWorkerDeploymentRampingVersionResponse rampResp2 =
setRampingVersion(v2, 0, rampResp.getConflictToken());
waitForRoutingConfigPropagation(v1, v2);
for (int i = 0; i < 3; i++) {
String res = runWorkflow("versioning-ramp-0");
Assert.assertEquals("version-v1", res);
}
// Set to 50% and see we eventually will have one run on v1 and one on v2
setRampingVersion(v2, 50, rampResp2.getConflictToken());
waitForRoutingConfigPropagation(v1, v2);
HashSet<String> seenRanOn = new HashSet<>();
Eventually.assertEventually(
Duration.ofSeconds(30),
Expand Down
Loading
Loading