@@ -113,16 +113,21 @@ def __init__(self, hosts_and_ports, qpath, cert, key, dest=None, listen=None,
113113
114114 # create the filesystem queues for accepted and rejected messages
115115 if dest is not None and listen is None :
116+ outqpath = os .path .join (qpath , 'outgoing' )
117+ rejectqpath = os .path .join (qpath , 'reject' )
118+
116119 # Determine what sort of outgoing structure to make
117120 if path_type == 'dirq' :
118121 if QueueSimple is None :
119122 raise ImportError ("dirq path_type requested but the dirq "
120123 "module wasn't found." )
121124
122- self ._outq = QueueSimple (qpath )
125+ self ._outq = QueueSimple (outqpath )
126+ self ._rejectq = QueueSimple (rejectqpath )
123127
124128 elif path_type == 'directory' :
125- self ._outq = MessageDirectory (qpath )
129+ self ._outq = MessageDirectory (outqpath )
130+ self ._rejectq = MessageDirectory (rejectqpath )
126131 else :
127132 raise Ssm2Exception ('Unsupported path_type variable.' )
128133
@@ -499,8 +504,17 @@ def send_all(self):
499504 if "Message size is too large" not in str (e ):
500505 raise
501506 else :
502- # Exit out of loop iteration so that message is not removed.
503507 log .warn ('Message %s could not be sent as its larger than 1MB' , msgid )
508+
509+ # Add the message to the rejected queue
510+ name = self ._rejectq .add (text )
511+ log .info ("Message %s saved to reject queue as %s" , msgid , name )
512+
513+ # Remove the message from the outgoing queue
514+ self ._last_msg = None
515+ self ._outq .remove (msgid )
516+
517+ # Exit out of loop iteration so that message is not removed.
504518 continue
505519
506520 else :
0 commit comments