Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ module SolaceSamples.com/PubSub+Go

go 1.17

require solace.dev/go/messaging v1.8.0
require solace.dev/go/messaging v1.9.0

require solace.dev/go/messaging-trace/opentelemetry v1.0.0

Expand Down
251 changes: 251 additions & 0 deletions howtos/how_to_use_pubsub_cache.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,251 @@
package main

import (
"fmt"
"os"
"os/signal"
"time"

"solace.dev/go/messaging"
"solace.dev/go/messaging/pkg/solace"
"solace.dev/go/messaging/pkg/solace/config"
"solace.dev/go/messaging/pkg/solace/message"
"solace.dev/go/messaging/pkg/solace/resource"
)

const (
ValidCachedMessageAge int32 = 0
ValidMaxCachedMessages int32 = 0
ValidCacheAccessTimeout int32 = 5000
)

func getEnv(key, def string) string {
if val, ok := os.LookupEnv(key); ok {
return val
}
return def
}

// HowToGetCacheRequestConfiguration - example of how to retrieve the cache request
// based on the different supported strategies
func HowToGetCacheRequestConfiguration(cacheRequestStrategy resource.CachedMessageSubscriptionStrategy) resource.CachedMessageSubscriptionRequest {
// specify the topic name
topic := "MaxMsgs3/default/notcached"
// specify the cache name
cacheName := "CacheMessages"
var cacheRequestConfig resource.CachedMessageSubscriptionRequest

switch cacheRequestStrategy {
// For cache request with strategy AsAvailable
case resource.CacheRequestStrategyAsAvailable:
cacheRequestConfig = resource.NewCachedMessageSubscriptionRequest(
resource.CacheRequestStrategyAsAvailable,
cacheName,
resource.TopicSubscriptionOf(topic),
ValidCacheAccessTimeout, // specify cache access timeout
ValidMaxCachedMessages, // specify max cached messages
ValidCachedMessageAge, // specify max cached message age
)
// For cache request with strategy CachedFirst
case resource.CacheRequestStrategyCachedFirst:
cacheRequestConfig = resource.NewCachedMessageSubscriptionRequest(
resource.CacheRequestStrategyCachedFirst,
cacheName,
resource.TopicSubscriptionOf(topic),
ValidCacheAccessTimeout, // specific cache access timeout
ValidMaxCachedMessages, // specific max cached messages
ValidCachedMessageAge, // specific max cached message age
)
// For cache request with strategy CachedOnly
case resource.CacheRequestStrategyCachedOnly:
cacheRequestConfig = resource.NewCachedMessageSubscriptionRequest(
resource.CacheRequestStrategyCachedOnly,
cacheName,
resource.TopicSubscriptionOf(topic),
ValidCacheAccessTimeout, // specific cache access timeout
ValidMaxCachedMessages, // specific max cached messages
ValidCachedMessageAge, // specific max cached message age
)
// For cache request with strategy LiveCancelsCached
case resource.CacheRequestStrategyLiveCancelsCached:
cacheRequestConfig = resource.NewCachedMessageSubscriptionRequest(
resource.CacheRequestStrategyLiveCancelsCached,
cacheName,
resource.TopicSubscriptionOf(topic),
ValidCacheAccessTimeout, // specific cache access timeout
ValidMaxCachedMessages, // specific max cached messages
ValidCachedMessageAge, // specific max cached message age
)
}

// Dump the cache request
fmt.Println(fmt.Sprintf("\nConstructed Cache Request Strategy: %s \n", cacheRequestConfig))
return cacheRequestConfig
}

// CacheResponseCallback - this is the callback to handle the cache response
// (used by the directReceiver.RequestCachedAsyncWithCallback())
func CacheResponseCallback(cacheResponse solace.CacheResponse) {
// do something with the cache response
// ...
// ...

// dump the cache response here
// How to correlate the cache request ID of a request with a received message and response
fmt.Printf("Received Cache Response; CacheRequestID %s\n", cacheResponse.GetCacheRequestID())

// How to check the CacheRequestOutcome of a cache response
fmt.Printf("Received Cache Response; CacheRequestOutcome %s\n", cacheResponse.GetCacheRequestOutcome())

// How to check the error of a cache response
fmt.Printf("Received Cache Response; Error %s\n", cacheResponse.GetError())
}

// HowToSendCacheRequestAndProcessCacheResponseWithCallback - example of send the cache request and retrieve the cache response using a callback
func HowToSendCacheRequestAndProcessCacheResponseWithCallback(directReceiver solace.DirectMessageReceiver, cacheRequestConfig resource.CachedMessageSubscriptionRequest) {
// submit a cache request with a specific cache request ID (in this case we use 1)
cacheRequestID := message.CacheRequestID(1)

// call the method on the direct receiver to send the cache request and retrieve the cache response using a callback
err := directReceiver.RequestCachedAsyncWithCallback(cacheRequestConfig, cacheRequestID, CacheResponseCallback)

// we do not expect an error
if err != nil {
panic(err)
}

// ...
}

// HowToSendCacheRequestAndProcessCacheResponseWithChannel - example of how to send the cache request and retrieve the cache response on a channel
func HowToSendCacheRequestAndProcessCacheResponseWithChannel(directReceiver solace.DirectMessageReceiver, cacheRequestConfig resource.CachedMessageSubscriptionRequest) {
// submit a cache request with a specific cache request ID (in this case we use 1)
cacheRequestID := message.CacheRequestID(1)

// call the method on the direct receiver to send the cache request and retrieve the cache response on a channel
cacheResponseChannel, err := directReceiver.RequestCachedAsync(cacheRequestConfig, cacheRequestID)

// we do not expect an error
if err != nil {
panic(err)
}

select {
// this will hold the cache response that was received on the channel
case cacheResponse := <-cacheResponseChannel:
// do something with the cache response
// ...
// ...

// dump the cache response here
// How to correlate the cache request ID of a request with a received message and response
fmt.Printf("Received Cache Response; CacheRequestID %s\n", cacheResponse.GetCacheRequestID())

// How to check the CacheRequestOutcome of a cache response
fmt.Printf("Received Cache Response; CacheRequestOutcome %s\n", cacheResponse.GetCacheRequestOutcome())

// How to check the error of a cache response
fmt.Printf("Received Cache Response; Error %s\n", cacheResponse.GetError())

case <-time.After(1 * time.Second):
fmt.Printf("timed out waiting for cache response to be received")
}

// ...
}

// Code examples of how to use the Go PubSub+ cache.
func main() {
// logging.SetLogLevel(logging.LogLevelInfo)

// Configuration parameters
brokerConfig := config.ServicePropertyMap{
config.TransportLayerPropertyHost: getEnv("SOLACE_HOST", "tcp://localhost:55555,tcp://localhost:55554"),
config.ServicePropertyVPNName: getEnv("SOLACE_VPN", "default"),
config.AuthenticationPropertySchemeBasicPassword: getEnv("SOLACE_PASSWORD", "default"),
config.AuthenticationPropertySchemeBasicUserName: getEnv("SOLACE_USERNAME", "default"),
config.TransportLayerPropertyReconnectionAttempts: 0,
}

messagingService, err := messaging.NewMessagingServiceBuilder().
FromConfigurationProvider(brokerConfig).
Build()

if err != nil {
panic(err)
}

// Connect to the messaging service
if err := messagingService.Connect(); err != nil {
panic(err)
}

fmt.Println("Connected to the broker? ", messagingService.IsConnected())

// Build a Direct message receivers with given topics
directReceiver, err := messagingService.CreateDirectMessageReceiverBuilder().
// we are using an arbitrary value for back pressure (you can configure this based on your use case)
OnBackPressureDropOldest(100100).
Build()

if err != nil {
panic(err)
}

// Start Direct Message Receiver
if err := directReceiver.Start(); err != nil {
panic(err)
}

fmt.Println("Direct Receiver running? ", directReceiver.IsRunning())

/////////////////////////////////////////
// Cache EXAMPLES
/////////////////////////////////////////

// For cache request with strategy AsAvailable
var cachedMessageSubscriptionRequest resource.CachedMessageSubscriptionRequest = HowToGetCacheRequestConfiguration(resource.CacheRequestStrategyAsAvailable)

// // For cache request with strategy CachedFirst
// var cachedMessageSubscriptionRequest resource.CachedMessageSubscriptionRequest = HowToGetCacheRequestConfiguration(resource.CacheRequestStrategyCachedFirst)

// // For cache request with strategy CachedOnly
// var cachedMessageSubscriptionRequest resource.CachedMessageSubscriptionRequest = HowToGetCacheRequestConfiguration(resource.CacheRequestStrategyCachedOnly)

// // For cache request with strategy LiveCancelsCached
// var cachedMessageSubscriptionRequest resource.CachedMessageSubscriptionRequest = HowToGetCacheRequestConfiguration(resource.CacheRequestStrategyLiveCancelsCached)

// Send the cache request and retrieve the cache response using a callback
HowToSendCacheRequestAndProcessCacheResponseWithCallback(directReceiver, cachedMessageSubscriptionRequest)

// Send the cache request and retrieve the cache response using a callback
HowToSendCacheRequestAndProcessCacheResponseWithChannel(directReceiver, cachedMessageSubscriptionRequest)

fmt.Println("\n===Interrupt (CTR+C) to handle graceful termination of the messaging service===")

// cleanup after the main calling function has finished execution
defer func() {
// A graceful shutdown of the directReceiver is attempted within the specified
// grace period (in this case, we are using 1 second).
var gracePeriod time.Duration = 1 * time.Second

// Terminate the Direct Receiver
// The receiver can be terminated before the cache response has been completed,
// but this is not possible to demonstrate in this HowTo due to infrastructure limitations.
// Setting gracePeriod to 0 implies a non-graceful shutdown that ignores unfinished tasks or in-flight messages.
// This function returns an error if one occurred, or nil if it successfully and gracefully terminated.
// If gracePeriod is set to less than 0, the function waits indefinitely.
directReceiver.Terminate(gracePeriod)
fmt.Println("\nDirect Receiver Terminated? ", directReceiver.IsTerminated())

// Disconnect the Message Service
messagingService.Disconnect()
fmt.Println("Messaging Service Disconnected? ", !messagingService.IsConnected())
}()

// Run forever until an interrupt signal is received
c := make(chan os.Signal, 1)
signal.Notify(c, os.Interrupt)
// Block until a interrupt signal is received.
<-c
}