Skip to content

Commit

Permalink
Fix nil pointer dereferences (#17)
Browse files Browse the repository at this point in the history
* fix nil pointer dereferences
  • Loading branch information
radito3 authored Jun 6, 2024
1 parent 76b74c2 commit 471bdde
Show file tree
Hide file tree
Showing 6 changed files with 65 additions and 41 deletions.
6 changes: 2 additions & 4 deletions cmd/remote-work-processor/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,10 +102,8 @@ Loop:
continue
}
if operation == nil {
// this flow is when the gRPC connection is closed (either by the server or the context has been cancelled)
connAttemptChan <- struct{}{}
// do not increment the retries, as this isn't a failure
continue
// this flow is when the gRPC connection is closed by the server
break Loop
}

log.Printf("Creating processor for operation: %T\n", operation.Body)
Expand Down
1 change: 1 addition & 0 deletions internal/executors/http/authorization_header.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ func CreateAuthorizationHeader(params *HttpRequestParameters) (string, error) {
authHeader := params.GetAuthorizationHeader()

if authHeader != "" {
log.Println("HTTP Client: using raw authorization header value")
return authHeader, nil
}

Expand Down
47 changes: 23 additions & 24 deletions internal/executors/http/http_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,37 +40,36 @@ func (e *HttpRequestExecutor) Execute(ctx executors.Context) *executors.Executor
}

resp, err := e.ExecuteWithParameters(params)

switch typedErr := err.(type) {
case *executors.RetryableError:
log.Println("Returning Task state Failed Retryable Error...")
return executors.NewExecutorResult(
executors.Status(pb.TaskExecutionResponseMessage_TASK_STATE_FAILED_RETRYABLE),
executors.Error(typedErr),
)
case *executors.NonRetryableError:
log.Println("Returning Task state Failed Non-Retryable Error...")
return executors.NewExecutorResult(
executors.Status(pb.TaskExecutionResponseMessage_TASK_STATE_FAILED_NON_RETRYABLE),
executors.Error(typedErr),
)
default:
m := resp.ToMap()
if !resp.successful {
log.Println("Returning Task state Failed Retryable Error from HTTP response...")
if err != nil {
if errors.Is(err, &executors.RetryableError{}) {
log.Println("Returning Task state Failed Retryable Error...")
return executors.NewExecutorResult(
executors.Output(m),
executors.Status(pb.TaskExecutionResponseMessage_TASK_STATE_FAILED_RETRYABLE),
executors.ErrorString(buildHttpError(resp)),
executors.Error(err),
)
}
log.Println("Returning Task state Failed Non-Retryable Error...")
return executors.NewExecutorResult(
executors.Status(pb.TaskExecutionResponseMessage_TASK_STATE_FAILED_NON_RETRYABLE),
executors.Error(err),
)
}

log.Println("Returning Task state Completed...")
m := resp.ToMap()
if !resp.successful {
log.Println("Returning Task state Failed Retryable Error from HTTP response...")
return executors.NewExecutorResult(
executors.Output(m),
executors.Status(pb.TaskExecutionResponseMessage_TASK_STATE_COMPLETED),
executors.Status(pb.TaskExecutionResponseMessage_TASK_STATE_FAILED_RETRYABLE),
executors.ErrorString(buildHttpError(resp)),
)
}

log.Println("Returning Task state Completed...")
return executors.NewExecutorResult(
executors.Output(m),
executors.Status(pb.TaskExecutionResponseMessage_TASK_STATE_COMPLETED),
)
}

func (e *HttpRequestExecutor) ExecuteWithParameters(p *HttpRequestParameters) (*HttpResponse, error) {
Expand Down Expand Up @@ -112,13 +111,13 @@ func execute(c *http.Client, p *HttpRequestParameters, authHeader string) (*Http
log.Printf("HTTP Client: executing request %s %s...\n", p.method, p.url)
resp, err := c.Do(req)
if requestTimedOut(err) {
log.Println("HTTP Client: request timed out after", p.timeout, "seconds")
log.Println("HTTP Client: request timed out after", c.Timeout, "seconds")
if p.succeedOnTimeout {
log.Println("HTTP Client: SucceedOnTimeout has been configured. Returning successful response...")
return newTimedOutHttpResponse(req, resp)
}

return nil, executors.NewRetryableError("HTTP request timed out after %d seconds", p.timeout).WithCause(err)
return nil, executors.NewRetryableError("HTTP request timed out after %d seconds", c.Timeout).WithCause(err)
}

if err != nil {
Expand Down
11 changes: 10 additions & 1 deletion internal/executors/http/oauth_header_generator.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,8 @@ type cachedToken struct {
func NewOAuthorizationHeaderGenerator(tokenType TokenType, tokenUrl string, executor HttpExecutor, requestBody string,
opts ...OAuthorizationHeaderOption) CacheableAuthorizationHeaderGenerator {
h := &oAuthorizationHeaderGenerator{
tokenType: tokenType,
tokenType: tokenType,
requestStore: make(map[string]string),
}

for _, opt := range opts {
Expand Down Expand Up @@ -64,6 +65,12 @@ func WithCachingKey(cacheKey string) OAuthorizationHeaderOption {
}
}

func WithCacheStore(store map[string]string) OAuthorizationHeaderOption {
return func(h *oAuthorizationHeaderGenerator) {
h.requestStore = store
}
}

func (h *oAuthorizationHeaderGenerator) Generate() (string, error) {
oAuthToken, err := h.fetchToken()
if err != nil {
Expand All @@ -82,6 +89,8 @@ func (h *oAuthorizationHeaderGenerator) GenerateWithCacheAside() (string, error)
log.Println("OAuth token header: error decoding cached token:", err)
return "", fmt.Errorf("failed to deserialize cached OAuth token: %v", err)
}
} else {
cached.OAuthToken = &OAuthToken{}
}

if h.tokenAboutToExpire(cached) {
Expand Down
37 changes: 27 additions & 10 deletions internal/executors/http/oauth_header_generator_factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"fmt"
"net/url"

"github.com/SAP/remote-work-processor/internal/executors"
"github.com/SAP/remote-work-processor/internal/executors/http/tls"
)

Expand All @@ -18,6 +19,16 @@ const (
REFRESH_TOKEN_FORMAT_WITH_CERT string = "grant_type=refresh_token&client_id=%s&refresh_token=%s"
)

type errorTokenGenerator struct{}

func (errorTokenGenerator) Generate() (string, error) {
return "", executors.NewNonRetryableError("missing user, client ID or refresh token")
}

func (errorTokenGenerator) GenerateWithCacheAside() (string, error) {
return "", executors.NewNonRetryableError("missing user, client ID or refresh token")
}

func NewOAuthHeaderGenerator(p *HttpRequestParameters) CacheableAuthorizationHeaderGenerator {
user := p.GetUser()
clientId := p.GetClientId()
Expand All @@ -43,7 +54,7 @@ func NewOAuthHeaderGenerator(p *HttpRequestParameters) CacheableAuthorizationHea
return clientCredentialsGenerator(p, clientId, p.GetClientSecret())
}

return nil // what happens here?
return errorTokenGenerator{}
}

func passwordGrantGenerator(p *HttpRequestParameters) CacheableAuthorizationHeaderGenerator {
Expand All @@ -57,7 +68,8 @@ func passwordGrantGenerator(p *HttpRequestParameters) CacheableAuthorizationHead
NewDefaultHttpRequestExecutor(),
body,
WithAuthenticationHeader(generateBasicAuthorizationHeader(clientId, clientSecret)),
WithCachingKey(generateCachingKey(tokenUrl, clientId, clientSecret, body)))
WithCachingKey(generateCachingKey(tokenUrl, clientId, clientSecret, body)),
WithCacheStore(p.store))
}

func passwordGrantWithClientCertificateGenerator(p *HttpRequestParameters) CacheableAuthorizationHeaderGenerator {
Expand All @@ -71,7 +83,8 @@ func passwordGrantWithClientCertificateGenerator(p *HttpRequestParameters) Cache
NewDefaultHttpRequestExecutor(),
body,
UseCertificateAuthentication(p.GetCertificateAuthentication()),
WithCachingKey(generateCachingKey(tokenUrl, clientId, "", body)))
WithCachingKey(generateCachingKey(tokenUrl, clientId, "", body)),
WithCacheStore(p.store))
}

func clientCredentialsGenerator(p *HttpRequestParameters, clientId string, clientSecret string) CacheableAuthorizationHeaderGenerator {
Expand All @@ -91,7 +104,8 @@ func clientCredentialsGenerator(p *HttpRequestParameters, clientId string, clien
NewDefaultHttpRequestExecutor(),
body,
opt,
WithCachingKey(generateCachingKey(tokenUrl, clientId, clientSecret, body)))
WithCachingKey(generateCachingKey(tokenUrl, clientId, clientSecret, body)),
WithCacheStore(p.store))
}

func refreshTokenGenerator(p *HttpRequestParameters) CacheableAuthorizationHeaderGenerator {
Expand All @@ -101,31 +115,34 @@ func refreshTokenGenerator(p *HttpRequestParameters) CacheableAuthorizationHeade
refreshToken := p.GetRefreshToken()

if p.GetCertificateAuthentication().GetClientCertificate() == "" {
return refreshTokenGrant(tokenUrl, clientId, clientSecret, refreshToken)
return refreshTokenGrant(tokenUrl, clientId, clientSecret, refreshToken, p.store)
} else {
return refreshTokenGrantWithClientCert(tokenUrl, clientId, refreshToken, p.GetCertificateAuthentication())
return refreshTokenGrantWithClientCert(tokenUrl, clientId, refreshToken, p.GetCertificateAuthentication(), p.store)
}
}

func refreshTokenGrantWithClientCert(tokenUrl, clientId, refreshToken string, certAuthentication *tls.CertificateAuthentication) CacheableAuthorizationHeaderGenerator {
func refreshTokenGrantWithClientCert(tokenUrl, clientId, refreshToken string, certAuthentication *tls.CertificateAuthentication,
store map[string]string) CacheableAuthorizationHeaderGenerator {
body := fmt.Sprintf(REFRESH_TOKEN_FORMAT_WITH_CERT, urlEncoded(clientId), urlEncoded(refreshToken))

return NewOAuthorizationHeaderGenerator(TokenType_ACCESS,
tokenUrl,
NewDefaultHttpRequestExecutor(),
body,
UseCertificateAuthentication(certAuthentication),
WithCachingKey(generateCachingKey(tokenUrl, clientId, "", body)))
WithCachingKey(generateCachingKey(tokenUrl, clientId, "", body)),
WithCacheStore(store))
}

func refreshTokenGrant(tokenUrl, clientId, clientSecret, refreshToken string) CacheableAuthorizationHeaderGenerator {
func refreshTokenGrant(tokenUrl, clientId, clientSecret, refreshToken string, store map[string]string) CacheableAuthorizationHeaderGenerator {
body := fmt.Sprintf(REFRESH_TOKEN_FORMAT, urlEncoded(refreshToken))

var opts []OAuthorizationHeaderOption
if clientId != "" {
opts = append(opts, WithAuthenticationHeader(generateBasicAuthorizationHeader(clientId, clientSecret)))
}
opts = append(opts, WithCachingKey(generateCachingKey(tokenUrl, clientId, clientSecret, body)))
opts = append(opts, WithCachingKey(generateCachingKey(tokenUrl, clientId, clientSecret, body)),
WithCacheStore(store))

return NewOAuthorizationHeaderGenerator(TokenType_ACCESS,
tokenUrl,
Expand Down
4 changes: 2 additions & 2 deletions internal/grpc/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,15 +86,15 @@ func (gc *RemoteWorkProcessorGrpcClient) ReceiveMsg() (*pb.ServerMessage, error)
log.Println("Waiting for server message...")
msg, err := gc.stream.Recv()
if err == io.EOF {
log.Println("Server closed the connection.")
log.Println("Server closed the connection. Stopping Remote Work Processor...")
gc.closeConn()
return nil, nil
}

if err != nil {
rpcErr, isRpcErr := status.FromError(err)
if isRpcErr && rpcErr.Code() == codes.Canceled {
// context was cancelled
log.Println("Context cancelled. Stopping Remote Work Processor...")
return nil, nil
}
return nil, fmt.Errorf("error occurred while receiving message from server: %v", err)
Expand Down

0 comments on commit 471bdde

Please sign in to comment.