Skip to content

Commit

Permalink
Simplify TaskPoller's WorkflowTaskHandler
Browse files Browse the repository at this point in the history
  • Loading branch information
stephanos committed May 10, 2024
1 parent 3de71f3 commit 4c7b2c4
Show file tree
Hide file tree
Showing 26 changed files with 278 additions and 467 deletions.
30 changes: 11 additions & 19 deletions tests/activity.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,8 +92,7 @@ func (s *FunctionalSuite) TestActivityHeartBeatWorkflow_Success() {
activityCount := int32(1)
activityCounter := int32(0)

wtHandler := func(execution *commonpb.WorkflowExecution, wt *commonpb.WorkflowType,
previousStartedEventID, startedEventID int64, history *historypb.History) ([]*commandpb.Command, error) {
wtHandler := func(task *workflowservice.PollWorkflowTaskQueueResponse) ([]*commandpb.Command, error) {
if activityCounter < activityCount {
activityCounter++

Expand Down Expand Up @@ -222,8 +221,7 @@ func (s *FunctionalSuite) TestActivityRetry() {
activitiesScheduled := false
var activityAScheduled, activityAFailed, activityBScheduled, activityBTimeout *historypb.HistoryEvent

wtHandler := func(execution *commonpb.WorkflowExecution, wt *commonpb.WorkflowType,
previousStartedEventID, startedEventID int64, history *historypb.History) ([]*commandpb.Command, error) {
wtHandler := func(task *workflowservice.PollWorkflowTaskQueueResponse) ([]*commandpb.Command, error) {
if !activitiesScheduled {
activitiesScheduled = true

Expand Down Expand Up @@ -260,8 +258,8 @@ func (s *FunctionalSuite) TestActivityRetry() {
HeartbeatTimeout: durationpb.New(0 * time.Second),
}}},
}, nil
} else if previousStartedEventID > 0 {
for _, event := range history.Events[previousStartedEventID:] {
} else if task.PreviousStartedEventId > 0 {
for _, event := range task.History.Events[task.PreviousStartedEventId:] {
switch event.GetEventType() {
case enumspb.EVENT_TYPE_ACTIVITY_TASK_SCHEDULED:
switch event.GetActivityTaskScheduledEventAttributes().GetActivityId() {
Expand Down Expand Up @@ -432,8 +430,7 @@ func (s *FunctionalSuite) TestActivityRetry_Infinite() {
workflowComplete := false
activitiesScheduled := false

wtHandler := func(execution *commonpb.WorkflowExecution, wt *commonpb.WorkflowType,
previousStartedEventID, startedEventID int64, history *historypb.History) ([]*commandpb.Command, error) {
wtHandler := func(task *workflowservice.PollWorkflowTaskQueueResponse) ([]*commandpb.Command, error) {
if !activitiesScheduled {
activitiesScheduled = true

Expand Down Expand Up @@ -538,8 +535,7 @@ func (s *FunctionalSuite) TestActivityHeartBeatWorkflow_Timeout() {
activityCount := int32(1)
activityCounter := int32(0)

wtHandler := func(execution *commonpb.WorkflowExecution, wt *commonpb.WorkflowType,
previousStartedEventID, startedEventID int64, history *historypb.History) ([]*commandpb.Command, error) {
wtHandler := func(task *workflowservice.PollWorkflowTaskQueueResponse) ([]*commandpb.Command, error) {

s.Logger.Info("Calling WorkflowTask Handler", tag.Counter(int(activityCounter)), tag.Number(int64(activityCount)))

Expand Down Expand Up @@ -643,14 +639,13 @@ func (s *FunctionalSuite) TestTryActivityCancellationFromWorkflow() {
requestCancellation := false
activityScheduledID := int64(0)

wtHandler := func(execution *commonpb.WorkflowExecution, wt *commonpb.WorkflowType,
previousStartedEventID, startedEventID int64, history *historypb.History) ([]*commandpb.Command, error) {
wtHandler := func(task *workflowservice.PollWorkflowTaskQueueResponse) ([]*commandpb.Command, error) {
if scheduleActivity {
activityCounter++
buf := new(bytes.Buffer)
s.Nil(binary.Write(buf, binary.LittleEndian, activityCounter))

activityScheduledID = startedEventID + 2
activityScheduledID = task.StartedEventId + 2
return []*commandpb.Command{{
CommandType: enumspb.COMMAND_TYPE_SCHEDULE_ACTIVITY_TASK,
Attributes: &commandpb.Command_ScheduleActivityTaskCommandAttributes{ScheduleActivityTaskCommandAttributes: &commandpb.ScheduleActivityTaskCommandAttributes{
Expand Down Expand Up @@ -788,14 +783,13 @@ func (s *FunctionalSuite) TestActivityCancellationNotStarted() {
requestCancellation := false
activityScheduledID := int64(0)

wtHandler := func(execution *commonpb.WorkflowExecution, wt *commonpb.WorkflowType,
previousStartedEventID, startedEventID int64, history *historypb.History) ([]*commandpb.Command, error) {
wtHandler := func(task *workflowservice.PollWorkflowTaskQueueResponse) ([]*commandpb.Command, error) {
if scheduleActivity {
activityCounter++
buf := new(bytes.Buffer)
s.Nil(binary.Write(buf, binary.LittleEndian, activityCounter))
s.Logger.Info("Scheduling activity")
activityScheduledID = startedEventID + 2
activityScheduledID = task.StartedEventId + 2
return []*commandpb.Command{{
CommandType: enumspb.COMMAND_TYPE_SCHEDULE_ACTIVITY_TASK,
Attributes: &commandpb.Command_ScheduleActivityTaskCommandAttributes{ScheduleActivityTaskCommandAttributes: &commandpb.ScheduleActivityTaskCommandAttributes{
Expand Down Expand Up @@ -1022,9 +1016,7 @@ func (s *FunctionalSuite) TestActivityHeartBeat_RecordIdentity() {

workflowComplete := false
workflowNextCmd := enumspb.COMMAND_TYPE_SCHEDULE_ACTIVITY_TASK
wtHandler := func(
execution *commonpb.WorkflowExecution, wt *commonpb.WorkflowType, previousStartedEventID, startedEventID int64, history *historypb.History,
) ([]*commandpb.Command, error) {
wtHandler := func(task *workflowservice.PollWorkflowTaskQueueResponse) ([]*commandpb.Command, error) {
switch workflowNextCmd {
case enumspb.COMMAND_TYPE_SCHEDULE_ACTIVITY_TASK:
workflowNextCmd = enumspb.COMMAND_TYPE_COMPLETE_WORKFLOW_EXECUTION
Expand Down
26 changes: 4 additions & 22 deletions tests/advanced_visibility.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@ import (
commonpb "go.temporal.io/api/common/v1"
enumspb "go.temporal.io/api/enums/v1"
filterpb "go.temporal.io/api/filter/v1"
historypb "go.temporal.io/api/history/v1"
"go.temporal.io/api/serviceerror"
taskqueuepb "go.temporal.io/api/taskqueue/v1"
workflowpb "go.temporal.io/api/workflow/v1"
Expand Down Expand Up @@ -280,9 +279,7 @@ func (s *AdvancedVisibilitySuite) TestListWorkflow_SearchAttribute() {

searchAttributes := s.createSearchAttributes()
// test upsert
wtHandler := func(execution *commonpb.WorkflowExecution, wt *commonpb.WorkflowType,
previousStartedEventID, startedEventID int64, history *historypb.History) ([]*commandpb.Command, error) {

wtHandler := func(task *workflowservice.PollWorkflowTaskQueueResponse) ([]*commandpb.Command, error) {
upsertCommand := &commandpb.Command{
CommandType: enumspb.COMMAND_TYPE_UPSERT_WORKFLOW_SEARCH_ATTRIBUTES,
Attributes: &commandpb.Command_UpsertWorkflowSearchAttributesCommandAttributes{UpsertWorkflowSearchAttributesCommandAttributes: &commandpb.UpsertWorkflowSearchAttributesCommandAttributes{
Expand Down Expand Up @@ -1205,14 +1202,7 @@ func (s *AdvancedVisibilitySuite) TestUpsertWorkflowExecutionSearchAttributes()
s.Logger.Info("StartWorkflowExecution", tag.WorkflowRunID(we.RunId))

commandCount := 0
wtHandler := func(
execution *commonpb.WorkflowExecution,
wt *commonpb.WorkflowType,
previousStartedEventID,
startedEventID int64,
history *historypb.History,
) ([]*commandpb.Command, error) {

wtHandler := func(task *workflowservice.PollWorkflowTaskQueueResponse) ([]*commandpb.Command, error) {
upsertCommand := &commandpb.Command{
CommandType: enumspb.COMMAND_TYPE_UPSERT_WORKFLOW_SEARCH_ATTRIBUTES,
Attributes: &commandpb.Command_UpsertWorkflowSearchAttributesCommandAttributes{
Expand Down Expand Up @@ -1503,14 +1493,7 @@ func (s *AdvancedVisibilitySuite) TestModifyWorkflowExecutionProperties() {
s.Logger.Info("StartWorkflowExecution", tag.WorkflowRunID(we.RunId))

commandCount := 0
wtHandler := func(
execution *commonpb.WorkflowExecution,
wt *commonpb.WorkflowType,
previousStartedEventID,
startedEventID int64,
history *historypb.History,
) ([]*commandpb.Command, error) {

wtHandler := func(task *workflowservice.PollWorkflowTaskQueueResponse) ([]*commandpb.Command, error) {
modifyCommand := &commandpb.Command{
CommandType: enumspb.COMMAND_TYPE_MODIFY_WORKFLOW_PROPERTIES,
Attributes: &commandpb.Command_ModifyWorkflowPropertiesCommandAttributes{
Expand Down Expand Up @@ -1779,8 +1762,7 @@ func (s *AdvancedVisibilitySuite) TestUpsertWorkflowExecution_InvalidKey() {

s.Logger.Info("StartWorkflowExecution", tag.WorkflowRunID(we.RunId))

wtHandler := func(execution *commonpb.WorkflowExecution, wt *commonpb.WorkflowType,
previousStartedEventID, startedEventID int64, history *historypb.History) ([]*commandpb.Command, error) {
wtHandler := func(task *workflowservice.PollWorkflowTaskQueueResponse) ([]*commandpb.Command, error) {

upsertCommand := &commandpb.Command{
CommandType: enumspb.COMMAND_TYPE_UPSERT_WORKFLOW_SEARCH_ATTRIBUTES,
Expand Down
13 changes: 3 additions & 10 deletions tests/archival.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@ import (
commandpb "go.temporal.io/api/command/v1"
commonpb "go.temporal.io/api/common/v1"
enumspb "go.temporal.io/api/enums/v1"
historypb "go.temporal.io/api/history/v1"
"go.temporal.io/api/serviceerror"
taskqueuepb "go.temporal.io/api/taskqueue/v1"
workflowpb "go.temporal.io/api/workflow/v1"
Expand Down Expand Up @@ -343,18 +342,12 @@ func (s *ArchivalSuite) startAndFinishWorkflow(
expectedActivityID := int32(1)
runCounter := 1

wtHandler := func(
execution *commonpb.WorkflowExecution,
wt *commonpb.WorkflowType,
previousStartedEventID int64,
startedEventID int64,
history *historypb.History,
) ([]*commandpb.Command, error) {
branchToken, err := s.getBranchToken(namespace, execution)
wtHandler := func(task *workflowservice.PollWorkflowTaskQueueResponse) ([]*commandpb.Command, error) {
branchToken, err := s.getBranchToken(namespace, task.WorkflowExecution)
s.NoError(err)

workflowInfos[runCounter-1] = archivalWorkflowInfo{
execution: execution,
execution: task.WorkflowExecution,
branchToken: branchToken,
}

Expand Down
41 changes: 17 additions & 24 deletions tests/cancel_workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,8 +80,7 @@ func (s *FunctionalSuite) TestExternalRequestCancelWorkflowExecution() {

s.Logger.Info("StartWorkflowExecution", tag.WorkflowRunID(we.RunId))

wtHandler := func(execution *commonpb.WorkflowExecution, wt *commonpb.WorkflowType,
previousStartedEventID, startedEventID int64, history *historypb.History) ([]*commandpb.Command, error) {
wtHandler := func(task *workflowservice.PollWorkflowTaskQueueResponse) ([]*commandpb.Command, error) {

return []*commandpb.Command{{
CommandType: enumspb.COMMAND_TYPE_CANCEL_WORKFLOW_EXECUTION,
Expand Down Expand Up @@ -177,8 +176,7 @@ func (s *FunctionalSuite) TestRequestCancelWorkflowCommandExecution_TargetRunnin
s.Logger.Info("StartWorkflowExecution on foreign namespace", tag.WorkflowNamespace(s.foreignNamespace), tag.WorkflowRunID(we2.RunId))

cancellationSent := false
wtHandler := func(execution *commonpb.WorkflowExecution, wt *commonpb.WorkflowType,
previousStartedEventID, startedEventID int64, history *historypb.History) ([]*commandpb.Command, error) {
wtHandler := func(task *workflowservice.PollWorkflowTaskQueueResponse) ([]*commandpb.Command, error) {

if !cancellationSent {
cancellationSent = true
Expand All @@ -194,7 +192,7 @@ func (s *FunctionalSuite) TestRequestCancelWorkflowCommandExecution_TargetRunnin

// Find cancel requested event and verify it.
var cancelRequestEvent *historypb.HistoryEvent
for _, x := range history.Events {
for _, x := range task.History.Events {
if x.EventType == enumspb.EVENT_TYPE_EXTERNAL_WORKFLOW_EXECUTION_CANCEL_REQUESTED {
cancelRequestEvent = x
}
Expand All @@ -219,12 +217,11 @@ func (s *FunctionalSuite) TestRequestCancelWorkflowCommandExecution_TargetRunnin
T: s.T(),
}

foreignwtHandler := func(execution *commonpb.WorkflowExecution, wt *commonpb.WorkflowType,
previousStartedEventID, startedEventID int64, history *historypb.History) ([]*commandpb.Command, error) {
foreignwtHandler := func(task *workflowservice.PollWorkflowTaskQueueResponse) ([]*commandpb.Command, error) {

// Find cancel requested event and verify it.
var cancelRequestEvent *historypb.HistoryEvent
for _, x := range history.Events {
for _, x := range task.History.Events {
if x.EventType == enumspb.EVENT_TYPE_WORKFLOW_EXECUTION_CANCEL_REQUESTED {
cancelRequestEvent = x
}
Expand Down Expand Up @@ -313,8 +310,7 @@ func (s *FunctionalSuite) TestRequestCancelWorkflowCommandExecution_TargetFinish
s.Logger.Info("StartWorkflowExecution on foreign namespace", tag.WorkflowNamespace(s.foreignNamespace), tag.WorkflowRunID(we2.RunId))

cancellationSent := false
wtHandler := func(execution *commonpb.WorkflowExecution, wt *commonpb.WorkflowType,
previousStartedEventID, startedEventID int64, history *historypb.History) ([]*commandpb.Command, error) {
wtHandler := func(task *workflowservice.PollWorkflowTaskQueueResponse) ([]*commandpb.Command, error) {

if !cancellationSent {
cancellationSent = true
Expand All @@ -330,7 +326,7 @@ func (s *FunctionalSuite) TestRequestCancelWorkflowCommandExecution_TargetFinish

// Find cancel requested event and verify it.
var cancelRequestEvent *historypb.HistoryEvent
for _, x := range history.Events {
for _, x := range task.History.Events {
if x.EventType == enumspb.EVENT_TYPE_EXTERNAL_WORKFLOW_EXECUTION_CANCEL_REQUESTED {
cancelRequestEvent = x
}
Expand All @@ -355,12 +351,11 @@ func (s *FunctionalSuite) TestRequestCancelWorkflowCommandExecution_TargetFinish
T: s.T(),
}

foreignwtHandler := func(execution *commonpb.WorkflowExecution, wt *commonpb.WorkflowType,
previousStartedEventID, startedEventID int64, history *historypb.History) ([]*commandpb.Command, error) {
foreignwtHandler := func(task *workflowservice.PollWorkflowTaskQueueResponse) ([]*commandpb.Command, error) {

// Find cancel requested event not present
var cancelRequestEvent *historypb.HistoryEvent
for _, x := range history.Events {
for _, x := range task.History.Events {
if x.EventType == enumspb.EVENT_TYPE_WORKFLOW_EXECUTION_CANCEL_REQUESTED {
cancelRequestEvent = x
}
Expand Down Expand Up @@ -430,8 +425,7 @@ func (s *FunctionalSuite) TestRequestCancelWorkflowCommandExecution_TargetNotFou
s.Logger.Info("StartWorkflowExecution", tag.WorkflowRunID(we.RunId))

cancellationSent := false
wtHandler := func(execution *commonpb.WorkflowExecution, wt *commonpb.WorkflowType,
previousStartedEventID, startedEventID int64, history *historypb.History) ([]*commandpb.Command, error) {
wtHandler := func(task *workflowservice.PollWorkflowTaskQueueResponse) ([]*commandpb.Command, error) {

if !cancellationSent {
cancellationSent = true
Expand All @@ -446,7 +440,7 @@ func (s *FunctionalSuite) TestRequestCancelWorkflowCommandExecution_TargetNotFou

// Find cancel requested event and verify it.
var cancelRequestEvent *historypb.HistoryEvent
for _, x := range history.Events {
for _, x := range task.History.Events {
if x.EventType == enumspb.EVENT_TYPE_REQUEST_CANCEL_EXTERNAL_WORKFLOW_EXECUTION_FAILED {
cancelRequestEvent = x
}
Expand Down Expand Up @@ -528,20 +522,19 @@ func (s *FunctionalSuite) TestImmediateChildCancellation_WorkflowTaskFailed() {
var requestCancelEvent *historypb.HistoryEvent
var workflowtaskFailedEvent *historypb.HistoryEvent
workflowComplete := false
wtHandler := func(execution *commonpb.WorkflowExecution, wt *commonpb.WorkflowType,
previousStartedEventID, startedEventID int64, history *historypb.History) ([]*commandpb.Command, error) {
wtHandler := func(task *workflowservice.PollWorkflowTaskQueueResponse) ([]*commandpb.Command, error) {
if !childCancelled {
startEvent := history.Events[0]
startEvent := task.History.Events[0]
if startEvent.EventType != enumspb.EVENT_TYPE_WORKFLOW_EXECUTION_STARTED {
return nil, errors.New("first event is not workflow execution started")
}

workflowTaskScheduledEvent := history.Events[1]
workflowTaskScheduledEvent := task.History.Events[1]
if workflowTaskScheduledEvent.EventType != enumspb.EVENT_TYPE_WORKFLOW_TASK_SCHEDULED {
return nil, errors.New("second event is not workflow task scheduled")
}

cancelRequestedEvent := history.Events[2]
cancelRequestedEvent := task.History.Events[2]
if cancelRequestedEvent.EventType != enumspb.EVENT_TYPE_WORKFLOW_EXECUTION_CANCEL_REQUESTED {
return nil, errors.New("third event is not cancel requested")
}
Expand All @@ -567,11 +560,11 @@ func (s *FunctionalSuite) TestImmediateChildCancellation_WorkflowTaskFailed() {
}}, nil
}

if previousStartedEventID != 0 {
if task.PreviousStartedEventId != 0 {
return nil, errors.New("previous started decision moved unexpectedly after first failed workflow task")
}
// Validate child workflow as cancelled
for _, event := range history.Events[previousStartedEventID:] {
for _, event := range task.History.Events[task.PreviousStartedEventId:] {
s.Logger.Info(fmt.Sprintf("Processing EventID: %v, Event: %v", event.GetEventId(), event))
switch event.GetEventType() {
case enumspb.EVENT_TYPE_START_CHILD_WORKFLOW_EXECUTION_INITIATED:
Expand Down

0 comments on commit 4c7b2c4

Please sign in to comment.