Skip to content

Commit 8dbd654

Browse files
Polishing.
Fix some minor issues after rebase and update since tags/javadoc/file headers. Reuse options for xpending. Original Pull Request: #3116
1 parent 7c67cc1 commit 8dbd654

15 files changed

+193
-199
lines changed

src/main/java/org/springframework/data/redis/connection/ReactiveStreamCommands.java

Lines changed: 13 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,6 @@
4949
import org.springframework.data.redis.connection.stream.StreamReadOptions;
5050
import org.springframework.data.redis.connection.stream.StreamRecords;
5151
import org.springframework.util.Assert;
52-
import org.springframework.util.StringUtils;
5352

5453
/**
5554
* Stream-specific Redis commands executed using reactive infrastructure.
@@ -756,7 +755,7 @@ default Mono<PendingMessages> xPending(ByteBuffer key, String groupName, Range<?
756755
* @return pending messages for the given {@literal consumer group} or {@literal null} when used in pipeline /
757756
* transaction.
758757
* @see <a href="https://redis.io/commands/xpending">Redis Documentation: xpending</a>
759-
* @since 3.5
758+
* @since 4.0
760759
*/
761760
default Mono<PendingMessages> xPending(ByteBuffer key, String groupName, Range<?> range, Long count,
762761
Duration minIdleTime) {
@@ -792,7 +791,7 @@ default Mono<PendingMessages> xPending(ByteBuffer key, Consumer consumer, Range<
792791
* @param minIdleTime the minimum idle time to filter pending messages. Must not be {@literal null}.
793792
* @return pending messages for the given {@link Consumer} or {@literal null} when used in pipeline / transaction.
794793
* @see <a href="https://redis.io/commands/xpending">Redis Documentation: xpending</a>
795-
* @since 3.5
794+
* @since 4.0
796795
*/
797796
default Mono<PendingMessages> xPending(ByteBuffer key, Consumer consumer, Range<?> range, Long count,
798797
Duration minIdleTime) {
@@ -832,7 +831,7 @@ default Mono<PendingMessages> xPending(ByteBuffer key, String groupName, String
832831
* @return pending messages for the given {@literal consumer} in given {@literal consumer group} or {@literal null}
833832
* when used in pipeline / transaction.
834833
* @see <a href="https://redis.io/commands/xpending">Redis Documentation: xpending</a>
835-
* @since 3.5
834+
* @since 4.0
836835
*/
837836
default Mono<PendingMessages> xPending(ByteBuffer key, String groupName, String consumerName, Range<?> range,
838837
Long count, Duration minIdleTime) {
@@ -863,13 +862,7 @@ class PendingRecordsCommand extends KeyCommand {
863862
private final String groupName;
864863
private final XPendingOptions options;
865864

866-
private PendingRecordsCommand(@Nullable ByteBuffer key, String groupName, @Nullable String consumerName, Range<?> range,
867-
@Nullable Long count, @Nullable Duration minIdleTime) {
868-
869-
this(key, groupName, XPendingOptions.range(range, count).consumer(consumerName).minIdleTime(minIdleTime));
870-
}
871-
872-
private PendingRecordsCommand(ByteBuffer key, String groupName, XPendingOptions options) {
865+
private PendingRecordsCommand(@Nullable ByteBuffer key, String groupName, XPendingOptions options) {
873866

874867
super(key);
875868

@@ -885,7 +878,7 @@ private PendingRecordsCommand(ByteBuffer key, String groupName, XPendingOptions
885878
* @return new instance of {@link PendingRecordsCommand}.
886879
*/
887880
static PendingRecordsCommand pending(ByteBuffer key, String groupName) {
888-
return new PendingRecordsCommand(key, groupName, null, Range.unbounded(), null, null);
881+
return new PendingRecordsCommand(key, groupName, XPendingOptions.unbounded());
889882
}
890883

891884
/**
@@ -900,7 +893,7 @@ public PendingRecordsCommand range(Range<?> range, Long count) {
900893
Assert.notNull(range, "Range must not be null");
901894
Assert.isTrue(count > -1, "Count must not be negative");
902895

903-
return new PendingRecordsCommand(getKey(), groupName, XPendingOptions.range(range, count));
896+
return new PendingRecordsCommand(getKey(), groupName, options.withRange(range, count));
904897
}
905898

906899
/**
@@ -910,20 +903,21 @@ public PendingRecordsCommand range(Range<?> range, Long count) {
910903
* @return new instance of {@link PendingRecordsCommand}.
911904
*/
912905
public PendingRecordsCommand consumer(String consumerName) {
913-
return new PendingRecordsCommand(getKey(), groupName, XPendingOptions.unbounded().consumer(consumerName));
906+
return new PendingRecordsCommand(getKey(), groupName, options.consumer(consumerName));
914907
}
915908

916909
/**
917910
* Append given minimum idle time.
918911
*
919912
* @param minIdleTime must not be {@literal null}.
920913
* @return new instance of {@link PendingRecordsCommand}.
914+
* @since 4.0
921915
*/
922916
public PendingRecordsCommand minIdleTime(Duration minIdleTime) {
923917

924918
Assert.notNull(minIdleTime, "Idle must not be null");
925919

926-
return new PendingRecordsCommand(getKey(), groupName, XPendingOptions.unbounded().minIdleTime(minIdleTime));
920+
return new PendingRecordsCommand(getKey(), groupName, options.minIdleTime(minIdleTime));
927921
}
928922

929923
public String getGroupName() {
@@ -963,21 +957,21 @@ public Duration getMinIdleTime() {
963957
* @return {@literal true} if a consumer name is present.
964958
*/
965959
public boolean hasConsumer() {
966-
return StringUtils.hasText(options.getConsumerName());
960+
return options.hasConsumer();
967961
}
968962

969963
/**
970964
* @return {@literal true} count is set.
971965
*/
972966
public boolean isLimited() {
973-
return options.getCount() != null;
967+
return options.isLimited();
974968
}
975969

976970
/**
977971
* @return {@literal true} if idle is set.
978972
*/
979-
public boolean hasIdle() {
980-
return options.getMinIdleTime() != null;
973+
public boolean hasMinIdleTime() {
974+
return options.hasMinIdleTime();
981975
}
982976
}
983977

src/main/java/org/springframework/data/redis/connection/RedisStreamCommands.java

Lines changed: 19 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -701,15 +701,14 @@ default PendingMessages xPending(byte @NonNull [] key, @NonNull String groupName
701701
* @param groupName the name of the {@literal consumer group}. Must not be {@literal null}.
702702
* @param range the range of messages ids to search within. Must not be {@literal null}.
703703
* @param count limit the number of results. Must not be {@literal null}.
704-
* @param minIdleTime the minimum idle time to filter pending messages. Must not be {@literal null}.
704+
* @param idle the minimum idle time to filter pending messages. Must not be {@literal null}.
705705
* @return pending messages for the given {@literal consumer group} or {@literal null} when used in pipeline /
706706
* transaction.
707707
* @see <a href="https://redis.io/commands/xpending">Redis Documentation: xpending</a>
708-
* @since 3.5
708+
* @since 4.0
709709
*/
710-
@Nullable
711-
default PendingMessages xPending(byte[] key, String groupName, Range<?> range, Long count, Duration minIdleTime) {
712-
return xPending(key, groupName, XPendingOptions.range(range, count).minIdleTime(minIdleTime));
710+
default PendingMessages xPending(byte[] key, String groupName, Range<?> range, Long count, Duration idle) {
711+
return xPending(key, groupName, XPendingOptions.range(range, count).minIdleTime(idle));
713712
}
714713

715714
/**
@@ -740,9 +739,8 @@ default PendingMessages xPending(byte @NonNull [] key, @NonNull Consumer consume
740739
* @param minIdleTime the minimum idle time to filter pending messages. Must not be {@literal null}.
741740
* @return pending messages for the given {@link Consumer} or {@literal null} when used in pipeline / transaction.
742741
* @see <a href="https://redis.io/commands/xpending">Redis Documentation: xpending</a>
743-
* @since 3.5
742+
* @since 4.0
744743
*/
745-
@Nullable
746744
default PendingMessages xPending(byte[] key, Consumer consumer, Range<?> range, Long count, Duration minIdleTime) {
747745
return xPending(key, consumer.getGroup(), consumer.getName(), range, count, minIdleTime);
748746
}
@@ -775,17 +773,15 @@ default PendingMessages xPending(byte @NonNull [] key, @NonNull String groupName
775773
* @param consumerName the name of the {@literal consumer}. Must not be {@literal null}.
776774
* @param range the range of messages ids to search within. Must not be {@literal null}.
777775
* @param count limit the number of results. Must not be {@literal null}.
778-
* @param minIdleTime the minimum idle time to filter pending messages. Must not be {@literal null}.
776+
* @param idle the minimum idle time to filter pending messages. Must not be {@literal null}.
779777
* @return pending messages for the given {@literal consumer} in given {@literal consumer group} or {@literal null}
780778
* when used in pipeline / transaction.
781779
* @see <a href="https://redis.io/commands/xpending">Redis Documentation: xpending</a>
782-
* @since 3.5
780+
* @since 4.0
783781
*/
784-
@Nullable
785782
default PendingMessages xPending(byte[] key, String groupName, String consumerName, Range<?> range, Long count,
786-
Duration minIdleTime) {
787-
return xPending(key, groupName,
788-
XPendingOptions.range(range, count).consumer(consumerName).minIdleTime(minIdleTime));
783+
Duration idle) {
784+
return xPending(key, groupName, XPendingOptions.range(range, count).consumer(consumerName).minIdleTime(idle));
789785
}
790786

791787
/**
@@ -879,16 +875,21 @@ public XPendingOptions consumer(String consumerName) {
879875
/**
880876
* Append given minimum idle time.
881877
*
882-
* @param minIdleTime must not be {@literal null}.
878+
* @param minIdleTime can be {@literal null} for none.
883879
* @return new instance of {@link XPendingOptions}.
880+
* @since 4.0
884881
*/
885-
public XPendingOptions minIdleTime(Duration minIdleTime) {
882+
public XPendingOptions minIdleTime(@Nullable Duration minIdleTime) {
886883

887884
Assert.notNull(minIdleTime, "Idle must not be null");
888885

889886
return new XPendingOptions(consumerName, range, count, minIdleTime);
890887
}
891888

889+
XPendingOptions withRange(Range<?> range, Long count) {
890+
return new XPendingOptions(consumerName, range, count, minIdleTime);
891+
}
892+
892893
/**
893894
* @return never {@literal null}.
894895
*/
@@ -912,24 +913,12 @@ public Range<?> getRange() {
912913

913914
/**
914915
* @return can be {@literal null}.
916+
* @since 4.0
915917
*/
916-
@Nullable
917-
public Duration getMinIdleTime() {
918+
public @Nullable Duration getMinIdleTime() {
918919
return minIdleTime;
919920
}
920921

921-
/**
922-
* @return can be {@literal null}.
923-
*/
924-
@Nullable
925-
public Long getIdleMillis() {
926-
if (minIdleTime == null) {
927-
return null;
928-
}
929-
930-
return minIdleTime.toMillis();
931-
}
932-
933922
/**
934923
* @return {@literal true} if a consumer name is present.
935924
*/
@@ -947,7 +936,7 @@ public boolean isLimited() {
947936
/**
948937
* @return {@literal true} if idle time is set.
949938
*/
950-
public boolean hasIdle() {
939+
public boolean hasMinIdleTime() {
951940
return minIdleTime != null;
952941
}
953942
}

src/main/java/org/springframework/data/redis/connection/StringRedisConnection.java

Lines changed: 12 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -3201,11 +3201,10 @@ PendingMessages xPending(@NonNull String key, @NonNull String groupName, @NonNul
32013201
* @return pending messages for the given {@literal consumer} in given {@literal consumer group} or {@literal null}
32023202
* when used in pipeline / transaction.
32033203
* @see <a href="https://redis.io/commands/xpending">Redis Documentation: xpending</a>
3204-
* @since 3.5
3204+
* @since 4.0
32053205
*/
3206-
@Nullable
3207-
PendingMessages xPending(String key, String groupName, String consumerName,
3208-
org.springframework.data.domain.Range<String> range, Long count, Duration minIdleTime);
3206+
PendingMessages xPending(@NonNull String key, @NonNull String groupName, @NonNull String consumerName,
3207+
org.springframework.data.domain.@NonNull Range<String> range, @NonNull Long count, @NonNull Duration minIdleTime);
32093208

32103209
/**
32113210
* Obtain detailed information about pending {@link PendingMessage messages} for a given
@@ -3217,11 +3216,10 @@ PendingMessages xPending(String key, String groupName, String consumerName,
32173216
* @param count limit the number of results. Must not be {@literal null}.
32183217
* @return pending messages for the given {@link Consumer} or {@literal null} when used in pipeline / transaction.
32193218
* @see <a href="https://redis.io/commands/xpending">Redis Documentation: xpending</a>
3220-
* @since 3.5
3219+
* @since 4.0
32213220
*/
3222-
@Nullable
3223-
default PendingMessages xPending(String key, Consumer consumer, org.springframework.data.domain.Range<String> range,
3224-
Long count) {
3221+
default PendingMessages xPending(@NonNull String key, @NonNull Consumer consumer, org.springframework.data.domain.@NonNull Range<String> range,
3222+
@NonNull Long count) {
32253223
return xPending(key, consumer.getGroup(), consumer.getName(), range, count);
32263224
}
32273225

@@ -3237,11 +3235,10 @@ default PendingMessages xPending(String key, Consumer consumer, org.springframew
32373235
* @param minIdleTime the minimum idle time to filter pending messages. Must not be {@literal null}.
32383236
* @return pending messages for the given {@link Consumer} or {@literal null} when used in pipeline / transaction.
32393237
* @see <a href="https://redis.io/commands/xpending">Redis Documentation: xpending</a>
3240-
* @since 3.5
3238+
* @since 4.0
32413239
*/
3242-
@Nullable
3243-
default PendingMessages xPending(String key, Consumer consumer, org.springframework.data.domain.Range<String> range,
3244-
Long count, Duration minIdleTime) {
3240+
default PendingMessages xPending(@NonNull String key, @NonNull Consumer consumer, org.springframework.data.domain.@NonNull Range<String> range,
3241+
@NonNull Long count, @NonNull Duration minIdleTime) {
32453242
return xPending(key, consumer.getGroup(), consumer.getName(), range, count, minIdleTime);
32463243
}
32473244

@@ -3274,11 +3271,10 @@ PendingMessages xPending(@NonNull String key, @NonNull String groupName,
32743271
* @return pending messages for the given {@literal consumer group} or {@literal null} when used in pipeline /
32753272
* transaction.
32763273
* @see <a href="https://redis.io/commands/xpending">Redis Documentation: xpending</a>
3277-
* @since 3.5
3274+
* @since 4.0
32783275
*/
3279-
@Nullable
3280-
PendingMessages xPending(String key, String groupName, org.springframework.data.domain.Range<String> range,
3281-
Long count, Duration minIdleTime);
3276+
PendingMessages xPending(@NonNull String key, @NonNull String groupName, org.springframework.data.domain.@NonNull Range<String> range,
3277+
@NonNull Long count, @NonNull Duration minIdleTime);
32823278

32833279
/**
32843280
* Obtain detailed information about pending {@link PendingMessage messages} applying given {@link XPendingOptions

src/main/java/org/springframework/data/redis/connection/jedis/JedisClusterStreamCommands.java

Lines changed: 2 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -262,6 +262,7 @@ public PendingMessagesSummary xPending(byte[] key, String groupName) {
262262
}
263263

264264
@Override
265+
@SuppressWarnings("NullAway")
265266
public PendingMessages xPending(byte[] key, String groupName, XPendingOptions options) {
266267

267268
Assert.notNull(key, "Key must not be null");
@@ -272,22 +273,7 @@ public PendingMessages xPending(byte[] key, String groupName, XPendingOptions op
272273

273274
try {
274275

275-
@SuppressWarnings("all")
276-
XPendingParams pendingParams = new XPendingParams(
277-
JedisConverters.toBytes(StreamConverters.getLowerValue(range)),
278-
JedisConverters.toBytes(StreamConverters.getUpperValue(range)),
279-
options.getCount().intValue());
280-
281-
String consumerName = options.getConsumerName();
282-
283-
if (StringUtils.hasText(consumerName)) {
284-
pendingParams = pendingParams.consumer(consumerName);
285-
}
286-
287-
if (options.hasIdle()) {
288-
pendingParams = pendingParams.idle(options.getIdleMillis());
289-
}
290-
276+
XPendingParams pendingParams = StreamConverters.toXPendingParams(options);
291277
List<Object> response = connection.getCluster().xpending(key, group, pendingParams);
292278

293279
return StreamConverters.toPendingMessages(groupName, range,

src/main/java/org/springframework/data/redis/connection/jedis/StreamConverters.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -304,6 +304,7 @@ public static XReadGroupParams toXReadGroupParams(StreamReadOptions readOptions)
304304

305305
}
306306

307+
@SuppressWarnings("NullAway")
307308
public static XPendingParams toXPendingParams(RedisStreamCommands.XPendingOptions options) {
308309

309310
Range<String> range = (Range<String>) options.getRange();
@@ -313,8 +314,8 @@ public static XPendingParams toXPendingParams(RedisStreamCommands.XPendingOption
313314
if (options.hasConsumer()) {
314315
xPendingParams.consumer(options.getConsumerName());
315316
}
316-
if (options.hasIdle()) {
317-
xPendingParams.idle(options.getIdleMillis());
317+
if (options.hasMinIdleTime()) {
318+
xPendingParams.idle(options.getMinIdleTime().toMillis());
318319
}
319320

320321
return xPendingParams;

src/main/java/org/springframework/data/redis/connection/lettuce/LettuceReactiveStreamCommands.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -246,7 +246,7 @@ public Flux<CommandResponse<PendingRecordsCommand, PendingMessages>> xPending(
246246
ByteUtils.getByteBuffer(command.getConsumerName()));
247247
xPendingArgs.consumer(consumer);
248248
}
249-
if (command.hasIdle()) {
249+
if (command.hasMinIdleTime()) {
250250
xPendingArgs.idle(command.getMinIdleTime());
251251
}
252252

@@ -344,7 +344,7 @@ public Flux<CommandResponse<XInfoCommand, Flux<XInfoConsumer>>> xInfoConsumers(P
344344

345345
Assert.notNull(command.getKey(), "Key must not be null");
346346
Assert.notNull(command.getGroupName(), "Command.getGroupName() must not be null");
347-
347+
348348
ByteBuffer groupName = ByteUtils.getByteBuffer(command.getGroupName());
349349
return new CommandResponse<>(command, cmd.xinfoConsumers(command.getKey(), groupName)
350350
.map(it -> new XInfoConsumer(command.getGroupName(), (List<Object>) it)));

src/main/java/org/springframework/data/redis/connection/lettuce/LettuceStreamCommands.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -232,7 +232,7 @@ public PendingMessages xPending(byte @NonNull [] key, @NonNull String groupName,
232232
LettuceConverters.toBytes(options.getConsumerName()));
233233
xPendingArgs.consumer(consumer);
234234
}
235-
if (options.hasIdle()) {
235+
if (options.hasMinIdleTime()) {
236236
xPendingArgs.idle(options.getMinIdleTime());
237237
}
238238

src/main/java/org/springframework/data/redis/core/StreamOperations.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -363,9 +363,9 @@ default PendingMessages pending(@NonNull K key, @NonNull Consumer consumer) {
363363
* @return pending messages for the given {@literal consumer group} or {@literal null} when used in pipeline /
364364
* transaction.
365365
* @see <a href="https://redis.io/commands/xpending">Redis Documentation: xpending</a>
366-
* @since 3.5
366+
* @since 4.0
367367
*/
368-
PendingMessages pending(K key, String group, Range<?> range, long count, Duration minIdleTime);
368+
PendingMessages pending(@NonNull K key, @NonNull String group, @NonNull Range<?> range, long count, @NonNull Duration minIdleTime);
369369

370370
/**
371371
* Obtain detailed information about pending {@link PendingMessage messages} for a given {@link Range} and
@@ -394,7 +394,7 @@ default PendingMessages pending(@NonNull K key, @NonNull Consumer consumer) {
394394
* @see <a href="https://redis.io/commands/xpending">Redis Documentation: xpending</a>
395395
* @since 3.5
396396
*/
397-
PendingMessages pending(K key, Consumer consumer, Range<?> range, long count, Duration minIdleTime);
397+
PendingMessages pending(@NonNull K key, @NonNull Consumer consumer, @NonNull Range<?> range, long count, @NonNull Duration minIdleTime);
398398

399399
/**
400400
* Get the length of a stream.

0 commit comments

Comments
 (0)