Skip to content

Commit 7c67cc1

Browse files
whatasamechristophstrobl
authored andcommitted
Add IDLE argument to XPENDING command.
Closes: #2046 Original Pull Request: #3116 Signed-off-by: Jeonggyu Choi <[email protected]>
1 parent 46f63a1 commit 7c67cc1

15 files changed

+893
-42
lines changed

src/main/java/org/springframework/data/redis/connection/DefaultStringRedisConnection.java

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,7 @@
8181
* @author ihaohong
8282
* @author Dennis Neufeld
8383
* @author Shyngys Sapraliyev
84+
* @author Jeonggyu Choi
8485
*/
8586
@NullUnmarked
8687
@SuppressWarnings({ "ConstantConditions", "deprecation" })
@@ -2921,12 +2922,26 @@ public PendingMessages xPending(String key, String groupName, String consumer,
29212922
Converters.identityConverter());
29222923
}
29232924

2925+
@Override
2926+
public PendingMessages xPending(String key, String groupName, String consumerName,
2927+
org.springframework.data.domain.Range<String> range, Long count, Duration minIdleTime) {
2928+
return convertAndReturn(delegate.xPending(serialize(key), groupName, consumerName, range, count, minIdleTime),
2929+
Converters.identityConverter());
2930+
}
2931+
29242932
@Override
29252933
public PendingMessages xPending(String key, String groupName, org.springframework.data.domain.Range<String> range,
29262934
Long count) {
29272935
return convertAndReturn(delegate.xPending(serialize(key), groupName, range, count), Converters.identityConverter());
29282936
}
29292937

2938+
@Override
2939+
public PendingMessages xPending(String key, String groupName, org.springframework.data.domain.Range<String> range,
2940+
Long count, Duration minIdleTime) {
2941+
return convertAndReturn(delegate.xPending(serialize(key), groupName, range, count, minIdleTime),
2942+
Converters.identityConverter());
2943+
}
2944+
29302945
@Override
29312946
public PendingMessages xPending(String key, String groupName, XPendingOptions options) {
29322947
return convertAndReturn(delegate.xPending(serialize(key), groupName, options), Converters.identityConverter());

src/main/java/org/springframework/data/redis/connection/ReactiveStreamCommands.java

Lines changed: 106 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,7 @@
6060
* @author Dengliming
6161
* @author Mark John Moreno
6262
* @author jinkshower
63+
* @author Jeonggyu Choi
6364
* @since 2.2
6465
*/
6566
public interface ReactiveStreamCommands {
@@ -743,6 +744,27 @@ default Mono<PendingMessages> xPending(ByteBuffer key, String groupName, Range<?
743744
.map(CommandResponse::getOutput);
744745
}
745746

747+
/**
748+
* Obtain detailed information about pending {@link PendingMessage messages} for a given {@link Range} within a
749+
* {@literal consumer group} and over a given {@link Duration} of idle time.
750+
*
751+
* @param key the {@literal key} the stream is stored at. Must not be {@literal null}.
752+
* @param groupName the name of the {@literal consumer group}. Must not be {@literal null}.
753+
* @param range the range of messages ids to search within. Must not be {@literal null}.
754+
* @param count limit the number of results. Must not be {@literal null}.
755+
* @param minIdleTime the minimum idle time to filter pending messages. Must not be {@literal null}.
756+
* @return pending messages for the given {@literal consumer group} or {@literal null} when used in pipeline /
757+
* transaction.
758+
* @see <a href="https://redis.io/commands/xpending">Redis Documentation: xpending</a>
759+
* @since 3.5
760+
*/
761+
default Mono<PendingMessages> xPending(ByteBuffer key, String groupName, Range<?> range, Long count,
762+
Duration minIdleTime) {
763+
return xPending(
764+
Mono.just(PendingRecordsCommand.pending(key, groupName).range(range, count).minIdleTime(minIdleTime))).next()
765+
.map(CommandResponse::getOutput);
766+
}
767+
746768
/**
747769
* Obtain detailed information about pending {@link PendingMessage messages} for a given {@link Range} and
748770
* {@link Consumer} within a {@literal consumer group}.
@@ -759,6 +781,24 @@ default Mono<PendingMessages> xPending(ByteBuffer key, Consumer consumer, Range<
759781
return xPending(key, consumer.getGroup(), consumer.getName(), range, count);
760782
}
761783

784+
/**
785+
* Obtain detailed information about pending {@link PendingMessage messages} for a given {@link Range} and
786+
* {@link Consumer} within a {@literal consumer group} and over a given {@link Duration} of idle time.
787+
*
788+
* @param key the {@literal key} the stream is stored at. Must not be {@literal null}.
789+
* @param consumer the name of the {@link Consumer}. Must not be {@literal null}.
790+
* @param range the range of messages ids to search within. Must not be {@literal null}.
791+
* @param count limit the number of results. Must not be {@literal null}.
792+
* @param minIdleTime the minimum idle time to filter pending messages. Must not be {@literal null}.
793+
* @return pending messages for the given {@link Consumer} or {@literal null} when used in pipeline / transaction.
794+
* @see <a href="https://redis.io/commands/xpending">Redis Documentation: xpending</a>
795+
* @since 3.5
796+
*/
797+
default Mono<PendingMessages> xPending(ByteBuffer key, Consumer consumer, Range<?> range, Long count,
798+
Duration minIdleTime) {
799+
return xPending(key, consumer.getGroup(), consumer.getName(), range, count, minIdleTime);
800+
}
801+
762802
/**
763803
* Obtain detailed information about pending {@link PendingMessage messages} for a given {@link Range} and
764804
* {@literal consumer} within a {@literal consumer group}.
@@ -779,6 +819,27 @@ default Mono<PendingMessages> xPending(ByteBuffer key, String groupName, String
779819
.next().map(CommandResponse::getOutput);
780820
}
781821

822+
/**
823+
* Obtain detailed information about pending {@link PendingMessage messages} for a given {@link Range} and
824+
* {@literal consumer} within a {@literal consumer group} and over a given {@link Duration} of idle time.
825+
*
826+
* @param key the {@literal key} the stream is stored at. Must not be {@literal null}.
827+
* @param groupName the name of the {@literal consumer group}. Must not be {@literal null}.
828+
* @param consumerName the name of the {@literal consumer}. Must not be {@literal null}.
829+
* @param range the range of messages ids to search within. Must not be {@literal null}.
830+
* @param count limit the number of results. Must not be {@literal null}.
831+
* @param minIdleTime the minimum idle time to filter pending messages. Must not be {@literal null}.
832+
* @return pending messages for the given {@literal consumer} in given {@literal consumer group} or {@literal null}
833+
* when used in pipeline / transaction.
834+
* @see <a href="https://redis.io/commands/xpending">Redis Documentation: xpending</a>
835+
* @since 3.5
836+
*/
837+
default Mono<PendingMessages> xPending(ByteBuffer key, String groupName, String consumerName, Range<?> range,
838+
Long count, Duration minIdleTime) {
839+
return xPending(Mono.just(PendingRecordsCommand.pending(key, groupName).consumer(consumerName).range(range, count)
840+
.minIdleTime(minIdleTime))).next().map(CommandResponse::getOutput);
841+
}
842+
782843
/**
783844
* Obtain detailed information about pending {@link PendingMessage messages} applying given {@link XPendingOptions
784845
* options}.
@@ -794,24 +855,26 @@ default Mono<PendingMessages> xPending(ByteBuffer key, String groupName, String
794855
* Value Object holding parameters for obtaining pending messages.
795856
*
796857
* @author Christoph Strobl
858+
* @author Jeonggyu Choi
797859
* @since 2.3
798860
*/
799861
class PendingRecordsCommand extends KeyCommand {
800862

801863
private final String groupName;
802-
private final @Nullable String consumerName;
803-
private final Range<?> range;
804-
private final @Nullable Long count;
864+
private final XPendingOptions options;
805865

806866
private PendingRecordsCommand(@Nullable ByteBuffer key, String groupName, @Nullable String consumerName, Range<?> range,
807-
@Nullable Long count) {
867+
@Nullable Long count, @Nullable Duration minIdleTime) {
868+
869+
this(key, groupName, XPendingOptions.range(range, count).consumer(consumerName).minIdleTime(minIdleTime));
870+
}
871+
872+
private PendingRecordsCommand(ByteBuffer key, String groupName, XPendingOptions options) {
808873

809874
super(key);
810875

811876
this.groupName = groupName;
812-
this.consumerName = consumerName;
813-
this.range = range;
814-
this.count = count;
877+
this.options = options;
815878
}
816879

817880
/**
@@ -822,7 +885,7 @@ private PendingRecordsCommand(@Nullable ByteBuffer key, String groupName, @Nulla
822885
* @return new instance of {@link PendingRecordsCommand}.
823886
*/
824887
static PendingRecordsCommand pending(ByteBuffer key, String groupName) {
825-
return new PendingRecordsCommand(key, groupName, null, Range.unbounded(), null);
888+
return new PendingRecordsCommand(key, groupName, null, Range.unbounded(), null, null);
826889
}
827890

828891
/**
@@ -837,7 +900,7 @@ public PendingRecordsCommand range(Range<?> range, Long count) {
837900
Assert.notNull(range, "Range must not be null");
838901
Assert.isTrue(count > -1, "Count must not be negative");
839902

840-
return new PendingRecordsCommand(getKey(), groupName, consumerName, range, count);
903+
return new PendingRecordsCommand(getKey(), groupName, XPendingOptions.range(range, count));
841904
}
842905

843906
/**
@@ -847,7 +910,20 @@ public PendingRecordsCommand range(Range<?> range, Long count) {
847910
* @return new instance of {@link PendingRecordsCommand}.
848911
*/
849912
public PendingRecordsCommand consumer(String consumerName) {
850-
return new PendingRecordsCommand(getKey(), groupName, consumerName, range, count);
913+
return new PendingRecordsCommand(getKey(), groupName, XPendingOptions.unbounded().consumer(consumerName));
914+
}
915+
916+
/**
917+
* Append given minimum idle time.
918+
*
919+
* @param minIdleTime must not be {@literal null}.
920+
* @return new instance of {@link PendingRecordsCommand}.
921+
*/
922+
public PendingRecordsCommand minIdleTime(Duration minIdleTime) {
923+
924+
Assert.notNull(minIdleTime, "Idle must not be null");
925+
926+
return new PendingRecordsCommand(getKey(), groupName, XPendingOptions.unbounded().minIdleTime(minIdleTime));
851927
}
852928

853929
public String getGroupName() {
@@ -858,35 +934,50 @@ public String getGroupName() {
858934
* @return can be {@literal null}.
859935
*/
860936
public @Nullable String getConsumerName() {
861-
return consumerName;
937+
return options.getConsumerName();
862938
}
863939

864940
/**
865941
* @return never {@literal null}.
866942
*/
867943
public Range<?> getRange() {
868-
return range;
944+
return options.getRange();
869945
}
870946

871947
/**
872948
* @return can be {@literal null}.
873949
*/
874950
public @Nullable Long getCount() {
875-
return count;
951+
return options.getCount();
952+
}
953+
954+
/**
955+
* @return can be {@literal null}.
956+
*/
957+
@Nullable
958+
public Duration getMinIdleTime() {
959+
return options.getMinIdleTime();
876960
}
877961

878962
/**
879963
* @return {@literal true} if a consumer name is present.
880964
*/
881965
public boolean hasConsumer() {
882-
return StringUtils.hasText(consumerName);
966+
return StringUtils.hasText(options.getConsumerName());
883967
}
884968

885969
/**
886970
* @return {@literal true} count is set.
887971
*/
888972
public boolean isLimited() {
889-
return count != null;
973+
return options.getCount() != null;
974+
}
975+
976+
/**
977+
* @return {@literal true} if idle is set.
978+
*/
979+
public boolean hasIdle() {
980+
return options.getMinIdleTime() != null;
890981
}
891982
}
892983

0 commit comments

Comments
 (0)