Skip to content

Commit 23205fb

Browse files
authored
Merge pull request #32 from nera0/master
Add BatchMessageListener interface and enrich the Producer and Consumer interface
2 parents 0f426a5 + 60f8307 commit 23205fb

File tree

4 files changed

+154
-0
lines changed

4 files changed

+154
-0
lines changed
Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
19+
package io.openmessaging;
20+
21+
import java.util.List;
22+
23+
public interface BatchMessage {
24+
/**
25+
* @return all messages in this {@code BatchMessage}
26+
*/
27+
List<Message> messages();
28+
}
Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package io.openmessaging.consumer;
19+
20+
import io.openmessaging.BatchMessage;
21+
import io.openmessaging.Message;
22+
import io.openmessaging.exception.OMSRuntimeException;
23+
24+
/**
25+
* A message listener can implement this {@code BathMessageListener} interface and register itself to a consumer instance
26+
* to asynchronously receive messages.
27+
*
28+
* @version OMS 1.0.0
29+
* @since OMS 1.0.0
30+
*/
31+
public interface BatchMessageListener {
32+
/**
33+
* Callback method to receive incoming messages.
34+
* <p>
35+
* A message listener should handle different types of {@code BatchMessage}.
36+
*
37+
* @param batchMessage the received batchMessage.
38+
*/
39+
void onReceived(BatchMessage batchMessage, Context context);
40+
41+
42+
interface Context {
43+
/**
44+
* Acknowledges the specified and consumed message, which is related to this {@code MessageContext}.
45+
* <p>
46+
* Messages that have been received but not acknowledged may be redelivered.
47+
*
48+
* @throws OMSRuntimeException if the consumer fails to acknowledge the messages due to some internal error.
49+
*/
50+
void success(Message... messages);
51+
/**
52+
* Acknowledges messages, which is related to this {@code MessageContext}.
53+
* <p>
54+
*
55+
* @throws OMSRuntimeException if the consumer fails to acknowledge the messages due to some internal error.
56+
*/
57+
void ack();
58+
}
59+
}

openmessaging-api/src/main/java/io/openmessaging/consumer/Consumer.java

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,8 @@
2626
import io.openmessaging.exception.OMSTimeOutException;
2727
import io.openmessaging.interceptor.ConsumerInterceptor;
2828

29+
import java.util.List;
30+
2931
/**
3032
* A {@code PushConsumer} receives messages from multiple queues, these messages are pushed from MOM server to {@code
3133
* PushConsumer} client.
@@ -109,6 +111,20 @@ public interface Consumer extends ServiceLifecycle {
109111
*/
110112
void bindQueue(String queueName, MessageListener listener);
111113

114+
/**
115+
* Bind the {@code Consumer} to a specified queue, with a {@code BatchMessageListener}.
116+
* <p>
117+
* {@link BatchMessageListener#onReceived(List<Message>, BatchMessageListener.Context)} will be called when new delivered messages is
118+
* coming.
119+
*
120+
* @param queueName a specified queue.
121+
* @param listener a specified listener to receive new messages.
122+
* @throws OMSSecurityException when have no authority to bind to this queue.
123+
* @throws OMSDestinationException when have no given destination in the server.
124+
* @throws OMSRuntimeException when the {@code Producer} fails to send the message due to some internal error.
125+
*/
126+
void bindQueue(String queueName, BatchMessageListener listener);
127+
112128
/**
113129
* Unbind the {@code Consumer} from a specified queue.
114130
* <p>
@@ -118,6 +134,20 @@ public interface Consumer extends ServiceLifecycle {
118134
*/
119135
void unbindQueue(String queueName);
120136

137+
/**
138+
* This method is used to find out whether the {@code Consumer} in bind queue.
139+
*
140+
* @return true if this {@code Consumer} is bind, false otherwise.
141+
*/
142+
boolean isBindQueue();
143+
144+
/**
145+
* This method is used to find out the queue bind to {@code Consumer}.
146+
*
147+
* @return the queue this consumer is bind, or null if the consumer is not bind queue.
148+
*/
149+
String getBindQueue();
150+
121151
/**
122152
* Adds a {@code ConsumerInterceptor} instance to this consumer.
123153
*
@@ -146,6 +176,20 @@ public interface Consumer extends ServiceLifecycle {
146176
*/
147177
Message receive(long timeout);
148178

179+
/**
180+
* Receives the next batch messages from the bind queues of this consumer in pull model.
181+
* <p>
182+
* This call blocks indefinitely until the messages is arrives, the timeout expires, or until this {@code PullConsumer}
183+
* is shut down.
184+
*
185+
* @param timeout receive messages will blocked at most <code>timeout</code> milliseconds.
186+
* @return the next batch messages received from the bind queues, or null if the consumer is concurrently shut down.
187+
* @throws OMSSecurityException when have no authority to receive messages from this queue.
188+
* @throws OMSTimeOutException when the given timeout elapses before the send operation completes.
189+
* @throws OMSRuntimeException when the {@code Producer} fails to send the message due to some internal error.
190+
*/
191+
List<Message> batchReceive(long timeout);
192+
149193
/**
150194
* Acknowledges the specified and consumed message with the unique message receipt handle, in the scenario of using
151195
* manual commit.

openmessaging-api/src/main/java/io/openmessaging/producer/Producer.java

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -96,6 +96,29 @@ public interface Producer extends MessageFactory, ServiceLifecycle {
9696
*/
9797
void send(List<Message> messages);
9898

99+
/**
100+
* Send messages to the specified destination asynchronously, the destination should be preset to {@link
101+
* Message#headers()}, other header fields as well.
102+
* <p>
103+
* The returned {@code Promise} will have the result once the operation completes, and the registered {@code
104+
* FutureListener} will be notified, either because the operation was successful or because of an error.
105+
*
106+
* @param messages a batch messages will be sent.
107+
* @return the {@code Promise} of an asynchronous messages send operation.
108+
* @see Future
109+
* @see FutureListener
110+
*/
111+
Future<List<SendResult>> sendAsync(List<Message> messages);
112+
113+
/**
114+
* <p>
115+
* There is no {@code Promise} related or {@code RuntimeException} thrown. The calling thread doesn't care about the
116+
* send result and also have no context to get the result.
117+
*
118+
* @param messages a batch message will be sent.
119+
*/
120+
void sendOneway(List<Message> messages);
121+
99122
/**
100123
* Adds a {@code ProducerInterceptor} to intercept send operations of producer.
101124
*

0 commit comments

Comments
 (0)