Skip to content

Commit

Permalink
client adaptation and functional version of subscribeEvents
Browse files Browse the repository at this point in the history
Note: at the moment it's not possible to omit optional parameters in json-rpc calls using array as structure type with Juno, and the current client implementation only supports sending parameters as arrays. Therefore, I changed the Subscribe function and now we are able to send optional parameters as object too. That way Juno doesn't return an error
  • Loading branch information
thiagodeev committed Jan 14, 2025
1 parent 76d01c8 commit f678b3f
Show file tree
Hide file tree
Showing 5 changed files with 24 additions and 36 deletions.
18 changes: 11 additions & 7 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -334,7 +334,7 @@ func (c *Client) CallContext(ctx context.Context, result interface{}, method str
if result != nil && reflect.TypeOf(result).Kind() != reflect.Ptr {
return fmt.Errorf("call result parameter must be pointer or nil interface: %v", result)
}
msg, err := c.newMessage(method, args...)
msg, err := c.newMessage(method, args)
if err != nil {
return err
}
Expand Down Expand Up @@ -402,7 +402,7 @@ func (c *Client) BatchCallContext(ctx context.Context, b []BatchElem) error {
resp: make(chan []*jsonrpcMessage, 1),
}
for i, elem := range b {
msg, err := c.newMessage(elem.Method, elem.Args...)
msg, err := c.newMessage(elem.Method, elem.Args)
if err != nil {
return err
}
Expand Down Expand Up @@ -465,7 +465,7 @@ func (c *Client) BatchCallContext(ctx context.Context, b []BatchElem) error {
// Notify sends a notification, i.e. a method call that doesn't expect a response.
func (c *Client) Notify(ctx context.Context, method string, args ...interface{}) error {
op := new(requestOp)
msg, err := c.newMessage(method, args...)
msg, err := c.newMessage(method, args)
if err != nil {
return err
}
Expand All @@ -480,7 +480,11 @@ func (c *Client) Notify(ctx context.Context, method string, args ...interface{})
// EthSubscribe registers a subscription under the "eth" namespace.
// Note: this was kept for compatibility with the ethereum client tests
func (c *Client) EthSubscribe(ctx context.Context, channel interface{}, args ...interface{}) (*ClientSubscription, error) {
return c.Subscribe(ctx, "eth", subscribeMethodSuffix, channel, args...)
return c.SubscribeWithSliceArgs(ctx, "eth", subscribeMethodSuffix, channel, args)
}

func (c *Client) SubscribeWithSliceArgs(ctx context.Context, namespace string, methodSuffix string, channel interface{}, args ...interface{}) (*ClientSubscription, error) {
return c.Subscribe(ctx, namespace, methodSuffix, channel, args)
}

// Subscribe calls the "<namespace>_subscribe" method with the given arguments,
Expand All @@ -495,7 +499,7 @@ func (c *Client) EthSubscribe(ctx context.Context, channel interface{}, args ...
// before considering the subscriber dead. The subscription Err channel will receive
// ErrSubscriptionQueueOverflow. Use a sufficiently large buffer on the channel or ensure
// that the channel usually has at least one reader to prevent this issue.
func (c *Client) Subscribe(ctx context.Context, namespace string, methodSuffix string, channel interface{}, args ...interface{}) (*ClientSubscription, error) {
func (c *Client) Subscribe(ctx context.Context, namespace string, methodSuffix string, channel interface{}, args interface{}) (*ClientSubscription, error) {
// Check type of channel first.
chanVal := reflect.ValueOf(channel)
if chanVal.Kind() != reflect.Chan || chanVal.Type().ChanDir()&reflect.SendDir == 0 {
Expand All @@ -508,7 +512,7 @@ func (c *Client) Subscribe(ctx context.Context, namespace string, methodSuffix s
return nil, ErrNotificationsUnsupported
}

msg, err := c.newMessage(namespace+methodSuffix, args...)
msg, err := c.newMessage(namespace+methodSuffix, args)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -536,7 +540,7 @@ func (c *Client) SupportsSubscriptions() bool {
return !c.isHTTP
}

func (c *Client) newMessage(method string, paramsIn ...interface{}) (*jsonrpcMessage, error) {
func (c *Client) newMessage(method string, paramsIn interface{}) (*jsonrpcMessage, error) {
msg := &jsonrpcMessage{Version: vsn, ID: c.nextID(), Method: method}
if paramsIn != nil { // prevent sending "params":null
var err error
Expand Down
14 changes: 7 additions & 7 deletions client/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -447,7 +447,7 @@ func TestClientSubscribe(t *testing.T) {

nc := make(chan int)
count := 10
sub, err := client.Subscribe(context.Background(), "nftest", subscribeMethodSuffix, nc, "someSubscription", count, 0)
sub, err := client.SubscribeWithSliceArgs(context.Background(), "nftest", subscribeMethodSuffix, nc, "someSubscription", count, 0)
if err != nil {
t.Fatal("can't subscribe:", err)
}
Expand Down Expand Up @@ -494,7 +494,7 @@ func TestClientSubscribeClose(t *testing.T) {
err error
)
go func() {
sub, err = client.Subscribe(context.Background(), "nftest2", subscribeMethodSuffix, nc, "hangSubscription", 999)
sub, err = client.SubscribeWithSliceArgs(context.Background(), "nftest2", subscribeMethodSuffix, nc, "hangSubscription", 999)
errc <- err
}()

Expand Down Expand Up @@ -526,7 +526,7 @@ func TestClientCloseUnsubscribeRace(t *testing.T) {
for i := 0; i < 20; i++ {
client := DialInProc(server)
nc := make(chan int)
sub, err := client.Subscribe(context.Background(), "nftest", subscribeMethodSuffix, nc, "someSubscription", 3, 1)
sub, err := client.SubscribeWithSliceArgs(context.Background(), "nftest", subscribeMethodSuffix, nc, "someSubscription", 3, 1)
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -584,7 +584,7 @@ func TestUnsubscribeTimeout(t *testing.T) {
defer client.Close()

// Start subscription.
sub, err := client.Subscribe(context.Background(), "nftest", subscribeMethodSuffix, make(chan int), "someSubscription", 1, 1)
sub, err := client.SubscribeWithSliceArgs(context.Background(), "nftest", subscribeMethodSuffix, make(chan int), "someSubscription", 1, 1)
if err != nil {
t.Fatalf("failed to subscribe: %v", err)
}
Expand Down Expand Up @@ -652,7 +652,7 @@ func TestClientSubscriptionUnsubscribeServer(t *testing.T) {

// Create the subscription.
ch := make(chan int)
sub, err := client.Subscribe(context.Background(), "nftest", subscribeMethodSuffix, ch, "someSubscription", 1, 1)
sub, err := client.SubscribeWithSliceArgs(context.Background(), "nftest", subscribeMethodSuffix, ch, "someSubscription", 1, 1)
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -686,7 +686,7 @@ func TestClientSubscriptionChannelClose(t *testing.T) {

for i := 0; i < 100; i++ {
ch := make(chan int, 100)
sub, err := client.Subscribe(context.Background(), "nftest", subscribeMethodSuffix, ch, "someSubscription", 100, 1)
sub, err := client.SubscribeWithSliceArgs(context.Background(), "nftest", subscribeMethodSuffix, ch, "someSubscription", 100, 1)
if err != nil {
t.Fatal(err)
}
Expand All @@ -712,7 +712,7 @@ func TestClientNotificationStorm(t *testing.T) {
// Subscribe on the server. It will start sending many notifications
// very quickly.
nc := make(chan int)
sub, err := client.Subscribe(ctx, "nftest", subscribeMethodSuffix, nc, "someSubscription", count, 0)
sub, err := client.SubscribeWithSliceArgs(ctx, "nftest", subscribeMethodSuffix, nc, "someSubscription", count, 0)
if err != nil {
t.Fatal("can't subscribe:", err)
}
Expand Down
3 changes: 2 additions & 1 deletion rpc/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,8 @@ type callCloser interface {

type wsConn interface {
callCloser
Subscribe(ctx context.Context, namespace string, methodSuffix string, channel interface{}, args ...interface{}) (*client.ClientSubscription, error)
Subscribe(ctx context.Context, namespace string, methodSuffix string, channel interface{}, args interface{}) (*client.ClientSubscription, error)
SubscribeWithSliceArgs(ctx context.Context, namespace string, methodSuffix string, channel interface{}, args ...interface{}) (*client.ClientSubscription, error)
}

// do is a function that performs a remote procedure call (RPC) using the provided callCloser.
Expand Down
3 changes: 2 additions & 1 deletion rpc/types_event.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,5 +49,6 @@ type EventsInput struct {
type EventSubscriptionInput struct {
FromAddress *felt.Felt `json:"from_address,omitempty"` // Optional. Filter events by from_address which emitted the event
Keys [][]*felt.Felt `json:"keys,omitempty"` // Optional. Per key (by position), designate the possible values to be matched for events to be returned. Empty array designates 'any' value
BlockID BlockID `json:"block_id,omitempty"` // Optional. The block to get notifications from, default is latest, limited to 1024 blocks back
BlockID BlockID `json:"block,omitempty"` // Optional. The block to get notifications from, default is latest, limited to 1024 blocks back
// TODO: change 'block' to 'block_id' as soon as Juno fixes the issue with the 'block' field
}
22 changes: 2 additions & 20 deletions rpc/websocket.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ func (provider *WsProvider) SubscribeNewHeads(ctx context.Context, headers chan<
params[i] = v
}

sub, err := provider.c.Subscribe(ctx, "starknet", "_subscribeNewHeads", headers, params...)
sub, err := provider.c.SubscribeWithSliceArgs(ctx, "starknet", "_subscribeNewHeads", headers, params...)
if err != nil {
return nil, tryUnwrapToRPCErr(err, ErrTooManyBlocksBack, ErrBlockNotFound, ErrCallOnPending)
}
Expand All @@ -41,25 +41,7 @@ func (provider *WsProvider) SubscribeNewHeads(ctx context.Context, headers chan<
// - clientSubscription: The client subscription object, used to unsubscribe from the stream and to get errors
// - error: An error, if any
func (provider *WsProvider) SubscribeEvents(ctx context.Context, events chan<- *EmittedEvent, input EventSubscriptionInput) (*client.ClientSubscription, error) {
// Convert struct fields to []any, only including non-empty fields
var params []any

switch {
case input.BlockID.Number != nil:
params = append(params, input.BlockID.Number)
case input.BlockID.Hash != nil:
params = append(params, input.BlockID.Hash)
case input.BlockID.Tag != "":
params = append(params, input.BlockID.Tag)
}
if input.FromAddress != nil {
params = append(params, input.FromAddress)
}
if len(input.Keys) > 0 {
params = append(params, input.Keys)
}

sub, err := provider.c.Subscribe(ctx, "starknet", "_subscribeEvents", events, params...)
sub, err := provider.c.Subscribe(ctx, "starknet", "_subscribeEvents", events, input)
if err != nil {
return nil, tryUnwrapToRPCErr(err, ErrTooManyKeysInFilter, ErrTooManyBlocksBack, ErrBlockNotFound, ErrCallOnPending)
}
Expand Down

0 comments on commit f678b3f

Please sign in to comment.