5
5
'use strict' ;
6
6
7
7
var defs = require ( './defs' ) ;
8
- var Promise = require ( 'bluebird' ) ;
8
+ var Bluebird = require ( 'bluebird' ) ;
9
9
var inherits = require ( 'util' ) . inherits ;
10
10
var EventEmitter = require ( 'events' ) . EventEmitter ;
11
11
var BaseChannel = require ( './channel' ) . BaseChannel ;
@@ -29,7 +29,17 @@ module.exports.ChannelModel = ChannelModel;
29
29
var CM = ChannelModel . prototype ;
30
30
31
31
CM . close = function ( ) {
32
- return Promise . fromCallback ( this . connection . close . bind ( this . connection ) ) ;
32
+ var close = this . connection . close . bind ( this . connection ) ;
33
+
34
+ return new Promise ( function ( resolve , reject ) {
35
+ close ( function ( err , result ) {
36
+ if ( err ) {
37
+ reject ( err ) ;
38
+ } else {
39
+ resolve ( result ) ;
40
+ }
41
+ } ) ;
42
+ } ) ;
33
43
} ;
34
44
35
45
// Channels
@@ -55,17 +65,31 @@ var C = Channel.prototype;
55
65
// API procedures.
56
66
C . rpc = function ( method , fields , expect ) {
57
67
var self = this ;
58
- return Promise . fromCallback ( function ( cb ) {
59
- return self . _rpc ( method , fields , expect , cb ) ;
60
- } )
61
- . then ( function ( f ) {
68
+
69
+ return new Promise ( function ( resolve , reject ) {
70
+ self . _rpc ( method , fields , expect , function ( err , result ) {
71
+ if ( err ) {
72
+ reject ( err ) ;
73
+ } else {
74
+ resolve ( result ) ;
75
+ }
76
+ } ) ;
77
+ } ) . then ( function ( f ) {
62
78
return f . fields ;
63
79
} ) ;
64
80
} ;
65
81
66
82
// Do the remarkably simple channel open handshake
67
83
C . open = function ( ) {
68
- return Promise . try ( this . allocate . bind ( this ) ) . then (
84
+ var allocate = this . allocate . bind ( this ) ;
85
+
86
+ return new Promise ( function ( resolve , reject ) {
87
+ try {
88
+ resolve ( allocate ( ) ) ;
89
+ } catch ( e ) {
90
+ reject ( e ) ;
91
+ }
92
+ } ) . then (
69
93
function ( ch ) {
70
94
return ch . rpc ( defs . ChannelOpen , { outOfBand : "" } ,
71
95
defs . ChannelOpenOk ) ;
@@ -74,10 +98,16 @@ C.open = function() {
74
98
75
99
C . close = function ( ) {
76
100
var self = this ;
77
- return Promise . fromCallback ( function ( cb ) {
101
+
102
+ return Bluebird . fromCallback ( function ( cb ) {
78
103
return self . closeBecause ( "Goodbye" , defs . constants . REPLY_SUCCESS ,
79
104
cb ) ;
80
105
} ) ;
106
+ // return new Promise(function(resolve, reject) {
107
+ // return self.closeBecause("Goodbye", defs.constants.REPLY_SUCCESS,
108
+ // (err, result) => err ? reject(err) : resolve(result)
109
+ // );
110
+ // });
81
111
} ;
82
112
83
113
// === Public API, declaring queues and stuff ===
@@ -167,9 +197,18 @@ C.consume = function(queue, callback, options) {
167
197
// NB we want the callback to be run synchronously, so that we've
168
198
// registered the consumerTag before any messages can arrive.
169
199
var fields = Args . consume ( queue , options ) ;
170
- return Promise . fromCallback ( function ( cb ) {
200
+ return Bluebird . fromCallback ( function ( cb ) {
171
201
self . _rpc ( defs . BasicConsume , fields , defs . BasicConsumeOk , cb ) ;
172
202
} )
203
+ // return new Promise(function(resolve, reject) {
204
+ // self._rpc(defs.BasicConsume, fields, defs.BasicConsumeOk, function (err, result) {
205
+ // if (err) {
206
+ // reject(err);
207
+ // } else {
208
+ // resolve(result);
209
+ // }
210
+ // });
211
+ // })
173
212
. then ( function ( ok ) {
174
213
self . registerConsumer ( ok . fields . consumerTag , callback ) ;
175
214
return ok . fields ;
@@ -178,10 +217,16 @@ C.consume = function(queue, callback, options) {
178
217
179
218
C . cancel = function ( consumerTag ) {
180
219
var self = this ;
181
- return Promise . fromCallback ( function ( cb ) {
220
+ return new Promise ( function ( resolve , reject ) {
182
221
self . _rpc ( defs . BasicCancel , Args . cancel ( consumerTag ) ,
183
222
defs . BasicCancelOk ,
184
- cb ) ;
223
+ function ( err , result ) {
224
+ if ( err ) {
225
+ reject ( err ) ;
226
+ } else {
227
+ resolve ( result ) ;
228
+ }
229
+ } ) ;
185
230
} )
186
231
. then ( function ( ok ) {
187
232
self . unregisterConsumer ( consumerTag ) ;
@@ -192,9 +237,18 @@ C.cancel = function(consumerTag) {
192
237
C . get = function ( queue , options ) {
193
238
var self = this ;
194
239
var fields = Args . get ( queue , options ) ;
195
- return Promise . fromCallback ( function ( cb ) {
240
+ return Bluebird . fromCallback ( function ( cb ) {
196
241
return self . sendOrEnqueue ( defs . BasicGet , fields , cb ) ;
197
242
} )
243
+ // return new Promise(function(resolve, reject) {
244
+ // return self.sendOrEnqueue(defs.BasicGet, fields, function (err, result) {
245
+ // if (err) {
246
+ // reject(err);
247
+ // } else {
248
+ // resolve(result);
249
+ // }
250
+ // });
251
+ // })
198
252
. then ( function ( f ) {
199
253
if ( f . id === defs . BasicGetEmpty ) {
200
254
return false ;
0 commit comments