Skip to content

Commit

Permalink
Add way to disable caching for http_client
Browse files Browse the repository at this point in the history
  • Loading branch information
buger committed Nov 27, 2024
1 parent 9b470b3 commit 03efe58
Showing 1 changed file with 45 additions and 6 deletions.
51 changes: 45 additions & 6 deletions gateway/mw_streaming.go
Original file line number Diff line number Diff line change
Expand Up @@ -210,6 +210,9 @@ func (s *StreamingMiddleware) createStreamManager(r *http.Request) *StreamManage
configJSON, _ := json.Marshal(streamsConfig)
cacheKey := fmt.Sprintf("%x", sha256.Sum256(configJSON))

// Determine if caching should be disabled
disableCache := s.shouldDisableCache(streamsConfig)

// Critical section starts here
// This section is called by ProcessRequest method of the middleware implementation
// Concurrent requests can call this method at the same time and those requests
Expand All @@ -221,9 +224,11 @@ func (s *StreamingMiddleware) createStreamManager(r *http.Request) *StreamManage

s.Logger().Debug("Attempting to load stream manager from cache")
s.Logger().Debugf("Cache key: %s", cacheKey)
if cachedManager, found := s.streamManagerCache.Load(cacheKey); found {
s.Logger().Debug("Found cached stream manager")
return cachedManager.(*StreamManager)
if !disableCache {
if cachedManager, found := s.streamManagerCache.Load(cacheKey); found {
s.Logger().Debug("Found cached stream manager")
return cachedManager.(*StreamManager)
}
}

newStreamManager := &StreamManager{
Expand All @@ -234,12 +239,35 @@ func (s *StreamingMiddleware) createStreamManager(r *http.Request) *StreamManage
}
newStreamManager.initStreams(r, streamsConfig)

if r != nil {
if !disableCache && r != nil {
s.streamManagerCache.Store(cacheKey, newStreamManager)
}
return newStreamManager
}

func (s *StreamingMiddleware) shouldDisableCache(streamsConfig *StreamsConfig) bool {
for _, stream := range streamsConfig.Streams {
if streamMap, ok := stream.(map[string]interface{}); ok {
inputType := s.getComponentType(streamMap, "input")
outputType := s.getComponentType(streamMap, "output")
if inputType == "http_client" && outputType == "http_server" {
return true
}
}
}
return false
}

// getComponentType returns the type of the input or output component from the stream configuration
func (s *StreamingMiddleware) getComponentType(streamConfig map[string]interface{}, component string) string {
if componentMap, ok := streamConfig[component].(map[string]interface{}); ok {
if typeStr, ok := componentMap["type"].(string); ok {
return typeStr
}
}
return ""
}

// Helper function to extract paths from an http_server configuration
func extractPaths(httpConfig map[string]interface{}) map[string]string {
paths := make(map[string]string)
Expand Down Expand Up @@ -547,7 +575,7 @@ func (h *handleFuncAdapter) HandleFunc(path string, f func(http.ResponseWriter,
defer h.sm.activityCounter.Add(-1)

hasInput := h.inputHandlers[path] != nil
hasOutput := h.inputHandlers[path] != nil
hasOutput := h.outputHandlers[path] != nil

if !hasInput || !hasOutput {
h.logger.Debugf("Only output handler found for path: %s, executing directly", path)
Expand Down Expand Up @@ -576,6 +604,17 @@ func (h *handleFuncAdapter) HandleFunc(path string, f func(http.ResponseWriter,
}

handler(w, r)
if handlerType == "output" {
streamsConfig := &StreamsConfig{
Streams: map[string]any{
"stream": h.config,
},
}
if h.mw.shouldDisableCache(streamsConfig) {
h.logger.Debugf("Cache disabled, removing stream %s after output handler", h.streamID)
h.sm.removeStream(h.streamID)
}
}
case pathKey == "ws_path" && websocket.IsWebSocketUpgrade(r):
h.handleWebSocket(f, w, r, path)
default:
Expand All @@ -589,7 +628,7 @@ func (h *handleFuncAdapter) HandleFunc(path string, f func(http.ResponseWriter,

func (h *handleFuncAdapter) handleWebSocket(f func(w http.ResponseWriter, r *http.Request), w http.ResponseWriter, r *http.Request, path string) {
if h.inputHandlers[path] == nil || h.outputHandlers[path] == nil {
h.logger.Debugf("Executing directly", path)
h.logger.Debugf("Executing directly %s", path)
f(w, r)
return
}
Expand Down

0 comments on commit 03efe58

Please sign in to comment.