forked from asticode/go-astits
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathpacket_pool.go
More file actions
101 lines (86 loc) · 2.41 KB
/
packet_pool.go
File metadata and controls
101 lines (86 loc) · 2.41 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
package astits
import (
"sort"
"sync"
)
// PacketPool represents a pool of packets.
// It keeps one buffer of packets per PID; Add and dump both take the mutex
// before touching the buffers.
type PacketPool struct {
b map[uint16][]*Packet // Buffered packets, indexed by PID
m *sync.Mutex // Locked by Add and dump while they read or modify b
}
// NewPacketPool returns an empty packet pool: its per-PID buffer map and its
// mutex are allocated and ready for use.
func NewPacketPool() *PacketPool {
	pool := new(PacketPool)
	pool.b = make(map[uint16][]*Packet)
	pool.m = new(sync.Mutex)
	return pool
}
// Add adds a new packet to the pool and returns the packets of a completed
// group, if any.
//
// Packets are buffered per PID. A buffer only starts with a packet whose
// payload unit start indicator is set; when the next such packet arrives for
// the same PID, everything buffered before it is returned and the new packet
// becomes the start of the next buffer. In every other case nil is returned.
//
// Packets with the transport error indicator set, packets without a payload
// and duplicates of the last buffered packet are dropped. A detected
// discontinuity discards whatever was buffered for the packet's PID.
func (b *PacketPool) Add(p *Packet) (ps []*Packet) {
	// Throw away packet if error indicator
	if p.Header.TransportErrorIndicator {
		return nil
	}

	// Throw away packets that don't have a payload until we figure out what we're going to do with them
	// TODO figure out what we're going to do with them :D
	if !p.Header.HasPayload {
		return nil
	}

	// Lock
	b.m.Lock()
	defer b.m.Unlock()

	// Init buffer
	mps, ok := b.b[p.Header.PID]
	if !ok {
		mps = []*Packet{}
	}

	// Throw away packet if it's the same as the previous one.
	// This must run before the discontinuity check: hasDiscontinuity reports an
	// unchanged continuity counter as a discontinuity, which used to make this
	// check unreachable and caused a repeated packet to wipe the buffer.
	// A packet that explicitly signals a discontinuity is never considered a
	// duplicate, so it still resets the buffer below.
	signalled := p.Header.HasAdaptationField && p.AdaptationField.DiscontinuityIndicator
	if !signalled && isSameAsPrevious(mps, p) {
		return nil
	}

	// Empty buffer if we detect a discontinuity
	if hasDiscontinuity(mps, p) {
		mps = []*Packet{}
	}

	// Add packet: an empty buffer can only be started by a payload unit start
	if len(mps) > 0 || p.Header.PayloadUnitStartIndicator {
		mps = append(mps, p)
	}

	// A payload unit start closes the previous group: hand back everything
	// buffered before this packet and start a fresh buffer with it
	if p.Header.PayloadUnitStartIndicator && len(mps) > 1 {
		ps = mps[:len(mps)-1]
		mps = []*Packet{p}
	}

	// Assign
	b.b[p.Header.PID] = mps
	return ps
}
// dump walks the buffered PIDs in ascending order, removing each entry from
// the pool as it goes, and stops at the first one that actually holds packets.
// That entry's packets are returned; if every entry was empty, the (empty)
// content of the last visited entry is returned instead.
func (b *PacketPool) dump() (ps []*Packet) {
	b.m.Lock()
	defer b.m.Unlock()

	// Map iteration order is random, so collect and sort the PIDs first
	pids := make([]int, 0, len(b.b))
	for pid := range b.b {
		pids = append(pids, int(pid))
	}
	sort.Ints(pids)

	for i := range pids {
		pid := uint16(pids[i])
		ps = b.b[pid]
		delete(b.b, pid)
		if len(ps) != 0 {
			break
		}
	}
	return ps
}
// hasDiscontinuity reports whether packet p breaks the sequence formed by the
// already buffered packets ps.
func hasDiscontinuity(ps []*Packet, p *Packet) bool {
	// An explicit discontinuity indicator in the adaptation field always counts
	if p.Header.HasAdaptationField && p.AdaptationField.DiscontinuityIndicator {
		return true
	}

	// Nothing buffered yet: there is no counter to compare against
	if len(ps) == 0 {
		return false
	}

	last := ps[len(ps)-1].Header.ContinuityCounter
	if p.Header.HasPayload {
		// With a payload the counter must be the previous one plus one, modulo 16
		return p.Header.ContinuityCounter != (last+1)%16
	}

	// Without a payload the counter must be unchanged
	return p.Header.ContinuityCounter != last
}
// isSameAsPrevious reports whether p carries a payload and has the same
// continuity counter as the last packet of ps. It is always false when ps is
// empty.
func isSameAsPrevious(ps []*Packet, p *Packet) bool {
	if len(ps) == 0 || !p.Header.HasPayload {
		return false
	}
	return ps[len(ps)-1].Header.ContinuityCounter == p.Header.ContinuityCounter
}