Skip to content

Commit

Permalink
Merge pull request #56 from plainbanana/wait-any-event
Browse files Browse the repository at this point in the history
Add run_event_loop method
  • Loading branch information
plainbanana authored Jan 28, 2025
2 parents befa605 + 68e513c commit dabf5a0
Show file tree
Hide file tree
Showing 8 changed files with 277 additions and 4 deletions.
47 changes: 45 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -151,14 +151,14 @@ To run a Redis command in pipeline with arguments and a callback.
The command can also be expressed by concatenating the subcommands with underscores.

Commands issued to the same node are sent and received in pipeline mode.
In pipeline mode, commands are not sent to Redis until `wait_one_response` or `wait_all_responses` is issued.
In pipeline mode, commands are not sent to Redis until `run_event_loop`, `wait_one_response` or `wait_all_responses` is issued.

The callback is executed with two arguments.
The first is the result of the command, and the second is the error message.
`$result` will be a scalar value or an array reference, and `$error` will be an undefined value if no errors occur.
Also, `$error` may contain an error returned from Redis or an error that occurred on the client (e.g. Timeout).

You cannot call any client methods inside the callback.
You cannot call any client methods or exceptions inside the callback.

After issuing a command in pipeline mode,
do not execute fork() without issuing `disconnect` if all callbacks are not executed completely.
Expand All @@ -168,6 +168,49 @@ do not execute fork() without issuing `disconnect` if all callbacks are not exec
# some operations...
});

## run\_event\_loop()

This method allows you to issue commands without waiting for their responses.
You can then perform a blocking wait for those responses later, if needed.

Executes one iteration of the event loop to process any pending commands that have not yet been sent
and any incoming responses from Redis.

If there are events that can be triggered immediately, they will all be processed.
In other words, if there are unsent commands, they will be pipelined and sent,
and if there are already-received responses, their corresponding callbacks will be executed.

If there are no events that can be triggered immediately: there are neither unsent commands nor any Redis responses available to read,
but unprocessed callbacks remain, then this method will block for up to `command_timeout` while waiting for a response from Redis.
When a timeout occurs, an error will be propagated to the corresponding callback(s).

The return value can be either 1 for success (e.g., commands sent or responses read),
0 for no callbacks remained, or undef for other errors.

### Notes

- Be aware that the timeout check will only be triggered when there are neither unsent commands nor Redis responses available to read.
If a timeout occurs, all remaining commands on that node will time out as well.
- Internally, this method calls `event_base_loop(..., EVLOOP_ONCE)`, which
performs a single iteration of the event loop. A command will not be fully processed in a single call.
- If you need to process multiple commands or wait for all responses, call
this method repeatedly or use `wait_all_responses`.
- For a simpler, synchronous-like usage where you need at least one response,
refer to `wait_one_response`. If you only need to block until all
pending commands are processed, see `wait_all_responses`.

### Example

# Queue multiple commands in pipeline mode
$redis->set('key1', 'value1', sub {});
$redis->get('key2', sub {});

# Send commands to Redis without waiting for responses
$redis->run_event_loop();

# Possibly wait for responses
$redis->run_event_loop();

## wait\_one\_response()

If there are any unexcuted callbacks, it will block until at least one is executed.
Expand Down
69 changes: 67 additions & 2 deletions lib/Redis/Cluster/Fast.pm
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,13 @@ sub new {
return $self;
}

sub run_event_loop {
my $self = shift;
my $result = $self->__run_event_loop();
return undef if $result == -1;
return $result;
}

sub wait_one_response {
my $self = shift;
my $result = $self->__wait_one_response();
Expand Down Expand Up @@ -278,14 +285,14 @@ To run a Redis command in pipeline with arguments and a callback.
The command can also be expressed by concatenating the subcommands with underscores.
Commands issued to the same node are sent and received in pipeline mode.
In pipeline mode, commands are not sent to Redis until C<wait_one_response> or C<wait_all_responses> is issued.
In pipeline mode, commands are not sent to Redis until C<run_event_loop>, C<wait_one_response> or C<wait_all_responses> is issued.
The callback is executed with two arguments.
The first is the result of the command, and the second is the error message.
C<$result> will be a scalar value or an array reference, and C<$error> will be an undefined value if no errors occur.
Also, C<$error> may contain an error returned from Redis or an error that occurred on the client (e.g. Timeout).
You cannot call any client methods inside the callback.
You cannot call any client methods or exceptions inside the callback.
After issuing a command in pipeline mode,
do not execute fork() without issuing C<disconnect> if all callbacks are not executed completely.
Expand All @@ -295,6 +302,64 @@ do not execute fork() without issuing C<disconnect> if all callbacks are not exe
# some operations...
});
=head2 run_event_loop()
This method allows you to issue commands without waiting for their responses.
You can then perform a blocking wait for those responses later, if needed.
Executes one iteration of the event loop to process any pending commands that have not yet been sent
and any incoming responses from Redis.
If there are events that can be triggered immediately, they will all be processed.
In other words, if there are unsent commands, they will be pipelined and sent,
and if there are already-received responses, their corresponding callbacks will be executed.
If there are no events that can be triggered immediately: there are neither unsent commands nor any Redis responses available to read,
but unprocessed callbacks remain, then this method will block for up to C<command_timeout> while waiting for a response from Redis.
When a timeout occurs, an error will be propagated to the corresponding callback(s).
The return value can be either 1 for success (e.g., commands sent or responses read),
0 for no callbacks remained, or undef for other errors.
=head3 Notes
=over 4
=item *
Be aware that the timeout check will only be triggered when there are neither unsent commands nor Redis responses available to read.
If a timeout occurs, all remaining commands on that node will time out as well.
=item *
Internally, this method calls C<event_base_loop(..., EVLOOP_ONCE)>, which
performs a single iteration of the event loop. A command will not be fully processed in a single call.
=item *
If you need to process multiple commands or wait for all responses, call
this method repeatedly or use C<wait_all_responses>.
=item *
For a simpler, synchronous-like usage where you need at least one response,
refer to C<wait_one_response>. If you only need to block until all
pending commands are processed, see C<wait_all_responses>.
=back
=head3 Example
# Queue multiple commands in pipeline mode
$redis->set('key1', 'value1', sub {});
$redis->get('key2', sub {});
# Send commands to Redis without waiting for responses
$redis->run_event_loop();
# Possibly wait for responses
$redis->run_event_loop();
=head2 wait_one_response()
If there are any unexcuted callbacks, it will block until at least one is executed.
Expand Down
20 changes: 20 additions & 0 deletions src/Fast.xs
Original file line number Diff line number Diff line change
Expand Up @@ -460,6 +460,19 @@ void run_cmd_impl_pipeline(pTHX_ Redis__Cluster__Fast self, int argc, const char
DEBUG_MSG("pipeline callback remain: %ld", self->pipeline_callback_remain);
}

int Redis__Cluster__Fast_run_event_loop(pTHX_ Redis__Cluster__Fast self) {
int event_loop_error;
if (self->pipeline_callback_remain <= 0) {
return 0;
}
DEBUG_EVENT_BASE();
event_loop_error = event_base_loop(self->cluster_event_base, EVLOOP_ONCE);
if (event_loop_error != 0) {
return -1;
}
return 1;
}

int Redis__Cluster__Fast_wait_one_response(pTHX_ Redis__Cluster__Fast self) {
int event_loop_error;
int64_t callback_remain_current = self->pipeline_callback_remain;
Expand Down Expand Up @@ -681,6 +694,13 @@ PPCODE:

XSRETURN(2);

int
__run_event_loop(Redis::Cluster::Fast self)
CODE:
RETVAL = Redis__Cluster__Fast_run_event_loop(aTHX_ self);
OUTPUT:
RETVAL

int
__wait_one_response(Redis::Cluster::Fast self)
CODE:
Expand Down
67 changes: 67 additions & 0 deletions xt/01_simple.t
Original file line number Diff line number Diff line change
Expand Up @@ -86,4 +86,71 @@ for my $case (
like $@, qr/$case->[1]/, 'startup_nodes validation';
}

{
my $redis = Redis::Cluster::Fast->new(
startup_nodes => get_startup_nodes,
);
$redis->del('pipeline');

$redis->set('pipeline', 12345, sub {
my ($result, $error) = @_;
ok $result;
});
$redis->get('pipeline', sub {
my ($result, $error) = @_;
is $result, 12345;
});
$redis->get('pipeline', sub {
my ($result, $error) = @_;
is $result, 12345;
});
ok $redis->wait_all_responses;
is $redis->wait_all_responses, 0;
}

{
my $redis = Redis::Cluster::Fast->new(
startup_nodes => get_startup_nodes,
);
$redis->del('pipeline');

$redis->set('pipeline', 12345, sub {
my ($result, $error) = @_;
ok $result;
});
$redis->get('pipeline', sub {
my ($result, $error) = @_;
is $result, 12345;
});
$redis->get('pipeline', sub {
my ($result, $error) = @_;
is $result, 12345;
});
ok $redis->wait_one_response;
is $redis->wait_one_response, 0;
}

{
my $redis = Redis::Cluster::Fast->new(
startup_nodes => get_startup_nodes,
);
$redis->del('pipeline');

$redis->set('pipeline', 12345, sub {
my ($result, $error) = @_;
ok $result;
});
$redis->get('pipeline', sub {
my ($result, $error) = @_;
is $result, 12345;
});
$redis->get('pipeline', sub {
my ($result, $error) = @_;
is $result, 12345;
});
ok $redis->run_event_loop;
ok $redis->run_event_loop;
is $redis->run_event_loop, 0;
}

done_testing;
19 changes: 19 additions & 0 deletions xt/02_leak.t
Original file line number Diff line number Diff line change
Expand Up @@ -92,4 +92,23 @@ no_leaks_ok {
$redis->wait_one_response;
} "No Memory leak - pipeline wait_one_response";

no_leaks_ok {
my $redis = Redis::Cluster::Fast->new(
startup_nodes => get_startup_nodes,
);
$redis->del('pipeline');

$redis->set('pipeline', 12345, sub {
my ($result, $error) = @_;
});
$redis->get('pipeline', sub {
my ($result, $error) = @_;
});
$redis->get('pipeline', sub {
my ($result, $error) = @_;
});
$redis->run_event_loop; # send only
undef $redis;
} "No Memory leak - pipeline run_event_loop";

done_testing;
20 changes: 20 additions & 0 deletions xt/05_valgrind.t
Original file line number Diff line number Diff line change
Expand Up @@ -113,4 +113,24 @@ eval {
is $redis->wait_one_response, 0;
}

{
my $redis = Redis::Cluster::Fast->new(
startup_nodes => get_startup_nodes,
);
$redis->del('pipeline');

$redis->set('pipeline', 12345, sub {
my ($result, $error) = @_;
});
$redis->get('pipeline', sub {
my ($result, $error) = @_;
});
$redis->get('pipeline', sub {
my ($result, $error) = @_;
});
ok $redis->run_event_loop;
ok $redis->run_event_loop;
is $redis->run_event_loop, 0;
}

done_testing;
19 changes: 19 additions & 0 deletions xt/08_leak_srandom.t
Original file line number Diff line number Diff line change
Expand Up @@ -96,4 +96,23 @@ no_leaks_ok {
$redis->wait_one_response;
} "No Memory leak - pipeline wait_one_response";

no_leaks_ok {
my $redis = Redis::Cluster::Fast->new(
startup_nodes => get_startup_nodes,
);
$redis->del('pipeline');

$redis->set('pipeline', 12345, sub {
my ($result, $error) = @_;
});
$redis->get('pipeline', sub {
my ($result, $error) = @_;
});
$redis->get('pipeline', sub {
my ($result, $error) = @_;
});
$redis->run_event_loop; # send only
undef $redis;
} "No Memory leak - pipeline run_event_loop";

done_testing;
20 changes: 20 additions & 0 deletions xt/09_valgrind_srandom.t
Original file line number Diff line number Diff line change
Expand Up @@ -115,4 +115,24 @@ eval {
is $redis->wait_one_response, 0;
}

{
my $redis = Redis::Cluster::Fast->new(
startup_nodes => get_startup_nodes,
);
$redis->del('pipeline');

$redis->set('pipeline', 12345, sub {
my ($result, $error) = @_;
});
$redis->get('pipeline', sub {
my ($result, $error) = @_;
});
$redis->get('pipeline', sub {
my ($result, $error) = @_;
});
ok $redis->run_event_loop;
ok $redis->run_event_loop;
is $redis->run_event_loop, 0;
}

done_testing;

0 comments on commit dabf5a0

Please sign in to comment.