Feat: implement the processor server #4263
lacroixthomas wants to merge 11 commits into agones-dev:main
Conversation
This PR exceeds the recommended size of 1000 lines. Please make sure you are NOT addressing multiple issues with one PR. Note this PR might be rejected due to its size.

Build Failed 😭 Build Id: 0dfb8b31-e31f-4756-9245-fc6a5ef6c6fe Status: FAILURE To get permission to view the Cloud Build view, join the agones-discuss Google Group.

/gcbrun
Build Succeeded 🥳 Build Id: be7478b2-5c1c-498c-8498-6d3e35aaedb5 The following development artifacts have been built, and will exist for the next 30 days:
A preview of the website (the last 30 builds are retained): To install this version:
Force-pushed from 6351400 to c239fdb
Force-pushed from 56cf40e to de22bdc
I'll come back to this task in the next few days or week; I haven't forgotten about it!
Force-pushed from de22bdc to 3078f81
Force-pushed from 3078f81 to ddd9a28
Force-pushed from 87c55c4 to 2518c80
Signed-off-by: Thomas Lacroix <[email protected]>
I'll create other PRs for TLS, new metrics, and docs
So, good for review now?
Yep, all good 👍🏼
markmandel
left a comment
Big PR!
I reviewed what I could, but I'm mostly trusting that you remember the design in question 😄 I went over it again, and I think I remember.
I found some old things along the way as well, which I flagged.
replicas: 2
maxBatchSize: 100
pullInterval: 200ms
allocationBatchWaitTime: 50ms
Just noting we're going smaller than the 500ms which was the original.
Not necessarily a bad thing, just noticing that we're doing that. Do we have a reason as to why? Guessing it's a combo deal with the pullInterval as well?
The flow might need more explanation, but basically these are the steps for allocations:
- A request goes to the client's hot batch
- The server sends a pull request to each client (pullInterval: 200ms)
- Each client sends its hot batch to the server
- The server calls processAllocation for each client's batch of requests (each request is pushed into the allocator's internal queue; if new ones arrive within the 50ms interval, they are batched and processed together)
- The server returns the results to each client
I'll definitely need to add documentation about it, or maybe simplify some parts if they don't make sense, or if there's too much batching.
for {
	select {
	case <-ctx.Done():
		p.logger.Info("Stream handling stopping due to context cancellation")
		return ctx.Err()
	default:
		// Receive message from processor
		msg, err := stream.Recv()
		if err != nil {
			p.logger.WithError(err).Error("Failed to receive message from processor")
			return errors.Wrap(err, "stream recv error")
		}

		// Handle message based on its payload type
		switch payload := msg.GetPayload().(type) {
		case *allocationpb.ProcessorMessage_Pull:
			// Pull request: queue for async handling
			select {
			case pullRequestChan <- struct{}{}:
				p.logger.Debug("Pull request queued successfully")
			default:
				p.logger.Warn("Pull request queue full - dropping request")
			}

		case *allocationpb.ProcessorMessage_BatchResponse:
			// Batch response: handle immediately
			p.handleBatchResponse(payload.BatchResponse)

		default:
			// Unknown message type
			p.logger.WithField("messageType", fmt.Sprintf("%T", payload)).Warn("Received unknown message type from processor")
		}
	}
}
}
This is old, but got picked up.
The select/default pattern around stream.Recv() doesn't do what it looks like it does:

for {
	select {
	case <-ctx.Done():
		return ctx.Err()
	default:
		msg, err := stream.Recv() // blocks; the select/default adds nothing here

stream.Recv() is a blocking call, so the default branch is always taken immediately, and ctx.Done() is only ever checked between received messages rather than interrupting a blocked Recv. The fix is to remove the select/default entirely and just call stream.Recv() directly in the loop, handling ctx.Done() via the stream's context:
for {
	msg, err := stream.Recv()
	if err != nil {
		if ctx.Err() != nil {
			return ctx.Err()
		}
		return errors.Wrap(err, "stream recv error")
	}
	// handle msg...
}
Ooh good point, will update that
	gsa.Status.State = allocationv1.GameServerAllocationUnAllocated
	return gsa, http.StatusCreated
case codes.Aborted:
	gsa.Status.State = allocationv1.GameServerAllocationContention
	return gsa, http.StatusCreated
This looks wrong - HTTP 201 Created for allocation unallocated and contention?
Is this what we do now?
I will double check, but from what I remember it's the same behaviour as what currently happens: we return a 201 with the state set to allocation contention. But I might be wrong.
// Re-add the request to the hot batch and pendingRequests for the next pull
// Re-add retryable requests to the hot batch
p.batchMutex.Lock()
Noticing this map has a lot of locks and unlocks - just triple-checking we're good with that, and being careful we don't read a state and then have something else come along and write a state while it's running, which makes things go all skewiff.
Oh indeed, I'll double check that. There are 8 locks within this file, which might be a lot; I'll ensure they are needed and also check that there are no race conditions 👌🏼
cmd/processor/handler.go
type processorHandler struct {
	allocationpb.UnimplementedProcessorServer
	ctx    context.Context
	cancel context.CancelFunc
What's the cancelFunc doing? We don't call it 😄 Or is this a placeholder "UnimplementedProcessorServer"?
That's a good question hehe. I added the context and its cancel in case I needed them, but actually I don't. I'm going to get rid of both of them from the struct; I only need them in newServiceHandler, which passes them to other parts.
pkg/processor/client.go
Also old - but magic number. Also, why 20? 😄
Gonna switch that to 1. I added it for some manual tests / debugging, to see how the client would handle the server sending multiple pull requests at the same time. Will update it 😅
cmd/processor/handler.go
	wg.Add(1)
	go func(index int, requestWrapper *allocationpb.RequestWrapper) {
		defer wg.Done()
		results[index] = h.processAllocation(ctx, requestWrapper.Request)
	}(i, reqWrapper)
}
Suggested change:

for i, reqWrapper := range requestWrappers {
	wg.Go(func() {
		results[i] = h.processAllocation(ctx, reqWrapper.Request)
	})
}
New!
Oooh nice! Gonna update it
pkg/processor/client.go
	delete(p.requestIDMapping, id)
}

Nit:

Suggested change:

clear(p.requestIDMapping)
Fewer operations.
We don't do TLS for within-cluster gRPC calls anywhere else (the local SDK, for example), so I don't think we need this.
What type of PR is this?
/kind feature
What this PR does / Why we need it:
Implementation of the processor server behind a dev feature flag
TODO (some might be in another PR):
Which issue(s) this PR fixes:
Part of #4190
Special notes for your reviewer:
Opening the draft for early feedback