-
Notifications
You must be signed in to change notification settings - Fork 0
/
EventSource.js
125 lines (117 loc) · 3.69 KB
/
EventSource.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
/* UMD.define */ (typeof define=="function"&&define||function(d,f,m){m={module:module,require:require};module.exports=f.apply(null,d.map(function(n){return m[n]||require(n)}))})
(["./Micro"], function(Micro){
"use strict";
/**
 * Envelope marking a regular value travelling through an event source.
 * The wrapped payload is stored in the `x` property.
 */
function Value(payload){
    this.x = payload;
}

/**
 * Envelope marking an error travelling through an event source.
 * The wrapped error object is stored in the `x` property.
 */
function ErrorValue(failure){
    this.x = failure;
}

/**
 * Envelope marking a stop signal travelling through an event source.
 * The accompanying payload is stored in the `x` property.
 */
function Stop(reason){
    this.x = reason;
}
/**
 * Event source built on top of a Micro node.
 *
 * @param {Micro} [micro] Node to wrap. When the argument is not an instance
 *   of Micro (including when it is omitted), a fresh Micro is created and
 *   owned by this event source.
 *
 * After construction:
 *   - `this.micro` is the wrapped (or freshly created) Micro;
 *   - `this.sink` keeps the Micro's original sink;
 *   - `this.micro.sink` is replaced with the envelope-producing sink
 *     returned by makeSink(this).
 */
function EventSource(micro){
    // A single test decides both whether we create the Micro and whether
    // we install the default callback. Previously the callback was installed
    // only for a falsy argument, so a truthy non-Micro argument produced
    // a fresh Micro that never released itself on Stop.
    var ownsMicro = !(micro instanceof Micro);
    this.micro = ownsMicro ? new Micro() : micro;
    // remember the raw sink before shadowing it with the wrapping one
    this.sink = this.micro.sink;
    this.micro.sink = makeSink(this);
    if(ownsMicro){
        // a Micro received from outside keeps whatever callback its owner set
        this.micro.callback = makeDefaultCallback(this.micro);
    }
}

// Static members: the "no value" marker of Micro, the three envelope
// constructors, and the multiplexer factory (on() looks the factory up
// through EventSource.makeMultiplexer at call time).
EventSource.noValue = Micro.noValue;
EventSource.Value = Value;
EventSource.ErrorValue = ErrorValue;
EventSource.Stop = Stop;
EventSource.makeMultiplexer = makeMultiplexer;
EventSource.prototype = {
    declaredClass: "events/EventSource",

    /**
     * Attaches a listener to a channel of this event source.
     *
     * Signatures:
     *   on(channelName, callback, errback, stopback)
     *   on(callback, errback, stopback)      // channel is "default"
     *   on([channelName,] eventSource)       // pipes into an existing source
     *
     * @param {string} [channelName] Channel to listen on; "default" when the
     *   first argument is not a string.
     * @param {Function|EventSource} callback Handler for values, or another
     *   EventSource whose micro is attached to the channel.
     * @param {Function} [errback] Handler for errors.
     * @param {Function} [stopback] Handler for stop signals.
     * @returns {EventSource} The EventSource passed as `callback`, or a new
     *   EventSource whose micro callback was produced by
     *   EventSource.makeMultiplexer.
     */
    on: function on(channelName, callback, errback, stopback){
        if(typeof channelName !== "string"){
            // channel name omitted: shift every argument one slot to the right
            stopback = errback;
            errback = callback;
            callback = channelName;
            channelName = "default";
        }
        if(callback instanceof EventSource){
            this.micro.on(channelName, callback.micro);
            return callback;
        }
        var es = new EventSource(this.micro.on(channelName));
        es.micro.callback = EventSource.makeMultiplexer(es, callback, errback, stopback);
        return es;
    },

    /**
     * Releases the underlying micro and sends a Stop envelope (carrying this
     * event source as its payload) through the micro itself and through the
     * original sink for every own channel other than "default".
     */
    release: function release(){
        this.micro.release();
        var channels = this.micro.channels, value = new Stop(this), ch;
        this.micro.send(value, true);
        for(ch in channels){
            // Channel names are arbitrary strings, so a channel called
            // "hasOwnProperty" would shadow the inherited method; borrow it
            // from Object.prototype instead of calling it on the map.
            if(ch !== "default" && Object.prototype.hasOwnProperty.call(channels, ch)){
                this.sink.send(ch, value, true);
            }
        }
    },

    // pipe-like methods

    /**
     * Calls f(value) for its side effects and passes the value on unchanged.
     * @returns {EventSource} The event source created by on().
     */
    forEach: function forEach(f){
        return this.on(function(value){
            f(value);
            return value;
        });
    },

    /**
     * Registers f as the value handler; what f returns is what gets passed on.
     * f is handed to on() as is, so it receives (value, sink).
     * @returns {EventSource} The event source created by on().
     */
    map: function map(f){
        return this.on(f);
    },

    /**
     * Passes a value on when f(value) is truthy, otherwise returns the
     * sink's noValue marker.
     * @returns {EventSource} The event source created by on().
     */
    filter: function filter(f){
        return this.on(function(value, sink){
            return f(value) ? value : sink.noValue;
        });
    },

    /**
     * Running accumulation: every value is replaced with f(acc, value), and
     * that result becomes the accumulator for the next value.
     * @param {Function} f Reducer called as f(acc, value).
     * @param {*} init Initial accumulator.
     * @returns {EventSource} The event source created by on().
     */
    scan: function scan(f, init){
        return this.on(function(value){
            // `init` doubles as the accumulator between calls
            init = f(init, value);
            return init;
        });
    }
};
return EventSource;
/**
 * Creates a micro callback that dispatches an incoming envelope to one of
 * three optional handlers according to the envelope type.
 *
 * @param {Object} source Event source whose `micro` is released when a Stop
 *   envelope arrives.
 * @param {Function} [callback] Handles Value envelopes; its result is wrapped
 *   in a new Value.
 * @param {Function} [errback] Handles ErrorValue envelopes; its result is
 *   wrapped in a new Value.
 * @param {Function} [stopback] Handles Stop envelopes; its result is wrapped
 *   in a new Stop.
 * @returns {Function} function(val, sink) returning the envelope to pass on.
 *   A handler result identical to Micro.noValue is returned unwrapped.
 *   An envelope with no matching handler is returned as is. Anything thrown
 *   while processing comes back wrapped in an ErrorValue.
 */
function makeMultiplexer(source, callback, errback, stopback){
    // anything that is not a function counts as "no handler"
    var onValue = typeof callback == "function" ? callback : false,
        onError = typeof errback == "function" ? errback : false,
        onStop = typeof stopback == "function" ? stopback : false;

    // leaves the "no value" marker alone, wraps everything else
    function pack(result, Envelope){
        return result === Micro.noValue ? result : new Envelope(result);
    }

    return function(val, sink){
        try{
            if(onValue && val instanceof Value){
                return pack(onValue(val.x, sink), Value);
            }
            if(onError && val instanceof ErrorValue){
                // the result of an error handler continues as a regular value
                return pack(onError(val.x, sink), Value);
            }
            if(val instanceof Stop){
                // the micro is released whether or not a stop handler exists
                source.micro.release();
                if(onStop){
                    return pack(onStop(val.x, sink), Stop);
                }
            }
            return val;
        }catch(e){
            return new ErrorValue(e);
        }
    };
}
/**
 * Builds the sink facade installed on an event source's micro.
 *
 * Every method wraps its argument in the matching envelope (Value,
 * ErrorValue or Stop) and forwards it to `source.sink.send`, returning
 * whatever that call returns. `source.sink` is read on every call. The stop
 * methods pass `true` as an extra third argument.
 *
 * @param {Object} source Object exposing the underlying sink as `source.sink`.
 * @returns {Object} Sink with send, sendError, stop, sendToChannel,
 *   sendErrorToChannel, stopChannel and the noValue marker.
 */
function makeSink(source){
    var DEFAULT_CHANNEL = "default";
    var sink = {};

    // default channel
    sink.send = function send(value){
        var target = source.sink;
        return target.send(DEFAULT_CHANNEL, new Value(value));
    };
    sink.sendError = function sendError(value){
        var target = source.sink;
        return target.send(DEFAULT_CHANNEL, new ErrorValue(value));
    };
    sink.stop = function stop(value){
        var target = source.sink;
        return target.send(DEFAULT_CHANNEL, new Stop(value), true);
    };

    // named channels
    sink.sendToChannel = function sendToChannel(channelName, value){
        var target = source.sink;
        return target.send(channelName, new Value(value));
    };
    sink.sendErrorToChannel = function sendErrorToChannel(channelName, value){
        var target = source.sink;
        return target.send(channelName, new ErrorValue(value));
    };
    sink.stopChannel = function stopChannel(channelName, value){
        var target = source.sink;
        return target.send(channelName, new Stop(value), true);
    };

    sink.noValue = EventSource.noValue;
    return sink;
}
/**
 * Creates the callback installed on a micro that an event source created
 * for itself: it hands every incoming value back unchanged and releases
 * the micro when that value is a Stop envelope.
 *
 * @param {Object} micro Object exposing release().
 * @returns {Function} function(val) returning val.
 */
function makeDefaultCallback(micro){
    return function(val){
        var isStop = val instanceof Stop;
        if(isStop){
            micro.release();
        }
        return val;
    };
}
});