Skip to content

Commit 5d76423

Browse files
committed
#7 Add filter() support for channels
1 parent 81a3344 commit 5d76423

File tree

6 files changed

+361
-8
lines changed

6 files changed

+361
-8
lines changed

README.md

Lines changed: 21 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -348,7 +348,7 @@ Since Scala is a functional language, this implementation of channels supports f
348348
val chInt = Channel.make[Int](2)
349349

350350
// The channel of strings is the result of mapping the channel of integer
351-
val chString = ch1.map(v => v.toString)
351+
val chString = chInt.map(v => v.toString)
352352

353353
// Send some integers
354354
chInt.send(1)
@@ -360,6 +360,26 @@ val s1: String = chString.recv()
360360
val s2: String = chString.recv()
361361
```
362362

363+
#### filter()
364+
365+
```scala
366+
// Creating a channel of integers
367+
val ch1 = Channel.make[Int](3)
368+
369+
// Filter the original channel
370+
val chFiltered = ch1.filter(v => v != 2)
371+
372+
// Send some integers
373+
ch1.send(1)
374+
ch1.send(2)
375+
ch1.send(3)
376+
ch1.close()
377+
378+
// Receive filtered values
379+
val v1 = chFiltered.recv() // 1
380+
val v2 = chFiltered.recv() // 3
381+
```
382+
363383
## Reference
364384

365385
*ToDo*

src/main/scala/com/github/yruslan/channel/Channel.scala

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -102,10 +102,6 @@ abstract class Channel[T] extends ReadChannel[T] with WriteChannel[T] {
102102
}
103103
}
104104

105-
final override def map[U](f: T => U): ReadChannel[U] = {
106-
new ChannelDecoratorMap[T, U](this, f)
107-
}
108-
109105
protected def fetchValueOpt(): Option[T]
110106

111107
final override def sender(value: T) (action: => Unit = {}): Selector = {
Lines changed: 87 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,87 @@
1+
/*
2+
* The MIT License (MIT)
3+
*
4+
* Copyright (c) 2020 Ruslan Yushchenko
5+
*
6+
* Permission is hereby granted, free of charge, to any person obtaining a copy
7+
* of this software and associated documentation files (the "Software"), to deal
8+
* in the Software without restriction, including without limitation the rights
9+
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
10+
* copies of the Software, and to permit persons to whom the Software is
11+
* furnished to do so, subject to the following conditions:
12+
*
13+
* The above copyright notice and this permission notice shall be included in all
14+
* copies or substantial portions of the Software.
15+
*
16+
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
17+
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
18+
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
19+
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
20+
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
21+
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
22+
* SOFTWARE.
23+
*
24+
* For more information, please refer to <http://opensource.org/licenses/MIT>
25+
*/
26+
27+
package com.github.yruslan.channel
28+
29+
import com.github.yruslan.channel.impl.Selector
30+
31+
import java.time.Instant
32+
import java.time.Instant.now
33+
import scala.concurrent.duration.{Duration, MILLISECONDS}
34+
35+
class ChannelDecoratorFilter[T](inputChannel: ReadChannel[T], pred: T => Boolean) extends ChannelDecorator[T](inputChannel) with ReadChannel[T] {
36+
override def recv(): T = {
37+
var v = inputChannel.recv()
38+
var found = pred(v)
39+
40+
while (!found) {
41+
v = inputChannel.recv()
42+
found = pred(v)
43+
}
44+
v
45+
}
46+
47+
override def tryRecv(): Option[T] = {
48+
var valueOpt = inputChannel.tryRecv()
49+
var found = valueOpt.isEmpty || valueOpt.forall(v => pred(v))
50+
51+
while (!found) {
52+
valueOpt = inputChannel.tryRecv()
53+
found = valueOpt.isEmpty || valueOpt.forall(v => pred(v))
54+
}
55+
valueOpt
56+
}
57+
58+
override def tryRecv(timeout: Duration): Option[T] = {
59+
if (timeout == Duration.Zero) {
60+
return tryRecv()
61+
}
62+
63+
val timeoutMilli = if (timeout.isFinite) timeout.toMillis else 0L
64+
val startInstant = Instant.now()
65+
var valueOpt = inputChannel.tryRecv(timeout)
66+
var found = valueOpt.isEmpty || valueOpt.forall(v => pred(v))
67+
var elapsedTime = java.time.Duration.between(startInstant, now).toMillis
68+
69+
if (found || elapsedTime >= timeoutMilli) {
70+
valueOpt
71+
} else {
72+
while (!found && elapsedTime < timeoutMilli) {
73+
val newTimeout = Duration(timeoutMilli - elapsedTime, MILLISECONDS)
74+
valueOpt = inputChannel.tryRecv(newTimeout)
75+
found = valueOpt.isEmpty || valueOpt.forall(v => pred(v))
76+
elapsedTime = java.time.Duration.between(startInstant, now).toMillis
77+
}
78+
valueOpt
79+
}
80+
}
81+
82+
override def recver(action: T => Unit): Selector = inputChannel.recver(t => if (pred(t)) action(t))
83+
84+
override def fornew(action: T => Unit): Unit = inputChannel.fornew(t => if (pred(t)) action(t))
85+
86+
override def foreach(action: T => Unit): Unit = inputChannel.foreach(t => if (pred(t)) action(t))
87+
}

src/main/scala/com/github/yruslan/channel/ChannelDecoratorMap.scala

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,4 @@ class ChannelDecoratorMap[T, U](inputChannel: ReadChannel[T], f: T => U) extends
4141
override def fornew(action: U => Unit): Unit = inputChannel.fornew(t => action(f(t)))
4242

4343
override def foreach(action: U => Unit): Unit = inputChannel.foreach(t => action(f(t)))
44-
45-
override def map[K](mapFunction: U => K): ReadChannel[K] = new ChannelDecoratorMap[U, K](this, mapFunction)
4644
}

src/main/scala/com/github/yruslan/channel/ReadChannel.scala

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,5 +40,6 @@ trait ReadChannel[T] extends ChannelLike {
4040
def fornew(f: T => Unit): Unit
4141
def foreach(f: T => Unit): Unit
4242

43-
def map[U](f: T => U): ReadChannel[U]
43+
def map[U](f: T => U): ReadChannel[U] = new ChannelDecoratorMap[T, U](this, f)
44+
def filter(f: T => Boolean): ReadChannel[T] = new ChannelDecoratorFilter[T](this, f)
4445
}

0 commit comments

Comments
 (0)