diff --git a/README.MD b/README.MD index 50f09b3..8e90147 100644 --- a/README.MD +++ b/README.MD @@ -51,12 +51,12 @@ go build ### Dependencies -You need to have portaudio-dev, pulseaudio and lib-fdk-aac and go runtime installed to build this program +You need to have portaudio-dev, pulseaudio, libsamplerate and lib-fdk-aac and go runtime installed to build this program #### Ubuntu 21.4 build ````shell -sudo apt install golang-go libfdk-aac-dev +sudo apt install golang-go libfdk-aac-dev libsamplerate-dev ```` #### Rasbpian (buster) @@ -71,12 +71,27 @@ apt-get install libfdk-aac-dev You also need to install go from https://golang.org/dl/ as version 1.16 is not supplied by buster (they only support 1.11) +##### Raspberry 1 (arm6l) + +You can not rely on deb-multimedia, they only ship armv7 binary + +You will have to compile manually lib-fdk-aac + +```` +git clone --depth 1 https://github.com/mstorsjo/fdk-aac.git ~/ffmpeg-libraries/fdk-aac \ + && cd ~/ffmpeg-libraries/fdk-aac \ + && autoreconf -fiv \ + && ./configure \ + && make -j$(nproc) \ + && sudo make install +```` + #### Mac os build * Having xcode properly installed (clang needed) ````shell -brew install portaudio fdk-aac go +brew install portaudio fdk-aac libsamplerate go ```` ### Docker image @@ -131,8 +146,6 @@ docker run \ Note that mDNS by design will only work with networking mode "host" (recommended for beginners) or (mac/ip)vlan. -#### Acknowledgments - ## Run - goplay2 by default run only on the ipv4 interface (because [this issue](https://github.com/golang/go/issues/31024) on ipv6 parsing) @@ -161,10 +174,16 @@ Ex: It takes around 60ms on my mac to launch the audio stream at the **Anchor Ti `sink` (pulse audio sink name) to replace default sink +`nosync` (No Audio sync) disable experimental audio sync + Example : ```shell -./goplay2 -sink alsa_output -i en0 -n aiwa +./goplay2 -sink alsa_output -i en0 -n aiwa -nosync ``` +#### Misc + +patch used for qemu until it is merged + By [AlbanSeurat](https://github.com/AlbanSeurat) diff --git a/audio/clock.go b/audio/clock.go index a0298d7..9264c46 100644 --- a/audio/clock.go +++ b/audio/clock.go @@ -1,40 +1,57 @@ package audio import ( + "goplay2/codec" "goplay2/ptp" "time" ) type Clock struct { - virtualClock *ptp.VirtualClock - realAnchorTime time.Time - anchorRtpTime int64 - currentRtpTime int64 + networkClock *ptp.VirtualClock + // audio times + realAudioAnchorTime time.Time + audioAnchorTime time.Duration + // network times + realNetworkAnchorTime time.Time + anchorRtpTime int64 + networkAnchorTime int64 } -// AnchorTime setup anchor time using the real clock +func NewClock(networkClock *ptp.VirtualClock) *Clock { + return &Clock{ + networkClock: networkClock, + } +} + +func (c *Clock) virtualToRealTime(elapsed int64) time.Time { + return time.Unix(0, time.Now().UnixNano()+(elapsed-c.networkClock.Now().UnixNano())) +} + +// SetAnchorTime setup anchor time using the real clock // virtualAnchorTime - network Time (using PTP) of the anchorTime // rtpTime Timestamp - monotonic counter of frames -func (c *Clock) AnchorTime(virtualAnchorTime int64, rtpTime int64) { - +func (c *Clock) SetAnchorTime(virtualAnchorTime int64, rtpTime int64) { c.anchorRtpTime = rtpTime - c.currentRtpTime = rtpTime - - realNowTime := time.Now() - virtualNowTime := c.virtualClock.Now() - - c.realAnchorTime = time.Unix(0, realNowTime.UnixNano()+(virtualAnchorTime-virtualNowTime.UnixNano())) + c.networkAnchorTime = virtualAnchorTime + c.realNetworkAnchorTime = c.virtualToRealTime(virtualAnchorTime) } // PacketTime returns the real clock time at which time a RTP packet should be played func (c *Clock) PacketTime(frameRtpTime int64) time.Time { - return time.Unix(0, 0) + elapsedTime := float64(frameRtpTime-c.anchorRtpTime) * 1.0 / float64(codec.SampleRate) * 1e9 // convert to nanoseconds + return c.virtualToRealTime(c.networkAnchorTime + int64(elapsedTime)) +} + +func (c *Clock) AudioTime(anchorAudioTime time.Duration, realAnchorTime time.Time) { + c.audioAnchorTime = anchorAudioTime + c.realAudioAnchorTime = realAnchorTime } -func (c *Clock) CurrentRtpTime() int64 { - return c.currentRtpTime +func (c *Clock) PlayTime(nowAudioTime time.Duration, playbackTime time.Duration) time.Time { + now := time.Now() + return now.Add(playbackTime - nowAudioTime) } -func (c *Clock) IncRtpTime() { - c.currentRtpTime += 1024 +func (c *Clock) AnchorTime() time.Time { + return c.realNetworkAnchorTime } diff --git a/audio/filter.go b/audio/filter.go new file mode 100644 index 0000000..2c26a50 --- /dev/null +++ b/audio/filter.go @@ -0,0 +1,8 @@ +package audio + +import "time" + +type Filter interface { + Reset(*Clock) + Apply(audioStream Stream, samples []int16, playTime time.Time, sequence uint32, startTs uint32) (int, error) +} diff --git a/audio/player.go b/audio/player.go index 09e7557..6c3bd46 100644 --- a/audio/player.go +++ b/audio/player.go @@ -1,11 +1,10 @@ package audio import ( - "bytes" - "encoding/binary" - codec2 "goplay2/codec" + "goplay2/codec" + "goplay2/config" "goplay2/globals" - "goplay2/ptp" + "goplay2/rtp" "time" ) @@ -19,32 +18,53 @@ const ( type Player struct { ControlChannel chan globals.ControlMessage clock *Clock + filter Filter Status PlaybackStatus - stream codec2.Stream - ringBuffer *Ring + stream codec.Stream + ring *Ring + aacDecoder *codec.AacDecoder + untilSeq uint32 } -func NewPlayer(clock *ptp.VirtualClock, ring *Ring) *Player { - - return &Player{ - clock: &Clock{clock, time.Now(), 0, 0}, +func NewPlayer(audioClock *Clock, filter Filter) *Player { + aacDecoder := codec.NewAacDecoder() + asc := []byte{0x12, 0x10} + if err := aacDecoder.InitRaw(asc); err != nil { + globals.ErrLog.Panicf("init decoder failed, err is %s", err) + } + player := &Player{ + clock: audioClock, + filter: filter, ControlChannel: make(chan globals.ControlMessage, 100), - stream: codec2.NewStream(), + aacDecoder: aacDecoder, + stream: codec.NewStream(config.Config.Volume), Status: STOPPED, - ringBuffer: ring, + ring: New(globals.BufferSize / 2048), + untilSeq: 0, } + return player } -func (p *Player) callBack(out []int16, currentTime time.Duration, outputBufferDacTime time.Duration) { - frame, err := p.ringBuffer.TryPop() +func (p *Player) audioSync(reader Stream, samples []int16, nextTime time.Time, sequence uint32, startTs uint32) (int, error) { + if sequence <= p.untilSeq { + return 0, nil + } + if config.Config.DisableAudioSync { + return reader.Read(samples) + } else { + return p.filter.Apply(reader, samples, nextTime, sequence, startTs) + } +} + +func (p *Player) callBack(out []int16, currentTime time.Duration, outputBufferDacTime time.Duration) int { + playTime := p.clock.PlayTime(currentTime, outputBufferDacTime) + size, err := p.ring.TryRead(out, playTime, p.audioSync) if err == ErrIsEmpty { p.fillSilence(out) - } else { - err = binary.Read(bytes.NewReader(frame.(*PCMFrame).pcmData), binary.LittleEndian, out) - if err != nil { - globals.ErrLog.Printf("error reading data : %v\n", err) - } + } else if size < len(out) { + p.fillSilence(out[size:]) } + return p.stream.FilterVolume(out) } func (p *Player) Run() { @@ -53,10 +73,20 @@ func (p *Player) Run() { globals.ErrLog.Fatalln("Audio Stream init error:", err) } defer p.stream.Close() + p.clock.AudioTime(p.stream.AudioTime(), time.Now()) for { select { case msg := <-p.ControlChannel: switch msg.MType { + case globals.STOP: + if p.Status == PLAYING { + if err := p.stream.Stop(); err != nil { + globals.ErrLog.Printf("error pausing audio :%v\n", err) + return + } + } + p.Reset() + p.Status = STOPPED case globals.PAUSE: if p.Status == PLAYING { if err := p.stream.Stop(); err != nil { @@ -74,7 +104,8 @@ func (p *Player) Run() { } } p.Status = PLAYING - p.clock.AnchorTime(msg.Param1, msg.Param2) + p.clock.SetAnchorTime(msg.Param1, msg.Param2) + p.filter.Reset(p.clock) case globals.SKIP: p.skipUntil(msg.Param1, msg.Param2) case globals.VOLUME: @@ -86,23 +117,30 @@ func (p *Player) Run() { } } -func (p *Player) skipUntil(fromSeq int64, UntilSeq int64) { - p.ringBuffer.Flush(func(value interface{}) bool { - frame := value.(*PCMFrame) - return frame.SequenceNumber < uint32(fromSeq) || frame.SequenceNumber > uint32(UntilSeq) +func (p *Player) skipUntil(fromSeq int64, untilSeq int64) { + // TODO : use also timestamp to have better precision + p.ring.Filter(func(sequence uint32, startTs uint32) bool { + return sequence > uint32(fromSeq) && sequence < uint32(untilSeq) }) + // some data are possibly not yet in the buffer - reader should skip them afterwards (during async callback) + p.untilSeq = uint32(untilSeq) } -func (p *Player) Push(frame interface{}) { - p.ringBuffer.Push(frame) +func (p *Player) Push(frame *rtp.Frame) { + var pcmBuffer = make([]int16, 2048) + _, err := frame.PcmData(p.aacDecoder, pcmBuffer) + if err != nil { + globals.ErrLog.Printf("error decoding the packet %v", err) + } + p.ring.Write(pcmBuffer, frame.SequenceNumber, frame.Timestamp) } func (p *Player) Reset() { - p.ringBuffer.Reset() + p.ring.Reset() + p.untilSeq = 0 } func (p *Player) fillSilence(out []int16) { - globals.ErrLog.Printf("warning : filling audio buffer with silence") for i := range out { out[i] = 0 } diff --git a/audio/ring.go b/audio/ring.go index affde48..9bc43e8 100644 --- a/audio/ring.go +++ b/audio/ring.go @@ -1,207 +1,151 @@ package audio import ( + "container/list" "errors" + "goplay2/codec" "sync" + "time" ) var ( - ErrIsFull = errors.New("ring is full") - ErrIsEmpty = errors.New("ring is empty") + ErrIsFull = errors.New("ring is full") + ErrIsEmpty = errors.New("ring is empty") + ErrIsPartial = errors.New("buffer is partial") ) -type Ring struct { - buf []interface{} - size int - r int // next position to read - w int // next position to write - isFull bool - mu sync.Mutex - wcd *sync.Cond - rcd *sync.Cond +type markedBuffer struct { + sequence uint32 + startTs uint32 + buffer []int16 } -// NewRing New returns a new Ring whose buffer has the given size. -func NewRing(size int) *Ring { - rwmu := sync.Mutex{} - return &Ring{ - buf: make([]interface{}, size), - size: size, - wcd: sync.NewCond(&rwmu), - rcd: sync.NewCond(&rwmu), - } +func (b *markedBuffer) len() int { + return len(b.buffer) } -func (r *Ring) TryPop() (b interface{}, err error) { - r.mu.Lock() - defer r.mu.Unlock() - if r.w == r.r && !r.isFull { - return nil, ErrIsEmpty - } - b = r.buf[r.r] - r.r++ - if r.r == r.size { - r.r = 0 - } - r.isFull = false - r.wcd.Signal() - return b, err +func (b *markedBuffer) data() []int16 { + return b.buffer } -func (r *Ring) TryPush(c interface{}) error { - r.mu.Lock() - defer r.mu.Unlock() - if r.w == r.r && r.isFull { - return ErrIsFull - } - r.buf[r.w] = c - r.w++ +func (b *markedBuffer) Peek(samples []int16) (int, error) { + return copy(samples, b.buffer), nil +} - if r.w == r.size { - r.w = 0 - } - if r.w == r.r { - r.isFull = true +func (b *markedBuffer) Seek(size int) (int, error) { + if size < len(b.buffer) { + b.buffer = b.buffer[size:] + b.startTs += uint32(size) + return size, ErrIsPartial } else { - r.rcd.Signal() + return size, nil } - return nil } -func (r *Ring) TryPeek() (b interface{}, err error) { - r.mu.Lock() - defer r.mu.Unlock() - if r.w == r.r && !r.isFull { - return nil, ErrIsEmpty - } - b = r.buf[r.r] - return b, nil +func (b *markedBuffer) Read(samples []int16) (int, error) { + copied, _ := b.Peek(samples) + return b.Seek(copied) } -func (r *Ring) Flush(predicate func(interface{}) bool) int { - r.mu.Lock() - defer r.mu.Unlock() - if r.w == r.r && !r.isFull { - return 0 - } - writerPos := 0 - result := make([]interface{}, len(r.buf)) - pos := r.r + 1 - max := r.w - for pos != max { - v := r.buf[pos] - if v != nil && predicate(v) { - result[writerPos] = v - writerPos++ - } - pos++ - if pos == r.size { - pos = 0 - } - } - r.buf = result - r.r = 0 - r.w = writerPos - return writerPos +type TimingDecision uint8 + +const ( + PLAY TimingDecision = iota + DISCARD // will drop the frame + DELAY // will play silence +) + +type Stream interface { + Peek(p []int16) (n int, err error) + Seek(size int) (n int, err error) + Read(p []int16) (n int, err error) } -func (r *Ring) Push(c interface{}) { - err := r.TryPush(c) - r.wcd.L.Lock() - for err == ErrIsFull { - r.wcd.Wait() - err = r.TryPush(c) - } - r.wcd.L.Unlock() +type FilterFunction func(audioStream Stream, samples []int16, playTime time.Time, sequence uint32, startTs uint32) (int, error) + +// Ring is a circular buffer that implement io.ReaderWriter interface. +type Ring struct { + buffers *list.List + size int + mu sync.Mutex + wcd *sync.Cond + rcd *sync.Cond } -func (r *Ring) Pop() interface{} { - value, err := r.TryPop() - r.rcd.L.Lock() - for err == ErrIsEmpty { - r.rcd.Wait() - value, err = r.TryPop() +// New returns a new Ring whose buffer has the given size. +func New(size int) *Ring { + rwmu := sync.Mutex{} + return &Ring{ + buffers: list.New(), + size: size, + wcd: sync.NewCond(&rwmu), } - r.rcd.L.Unlock() - return value } -func (r *Ring) Peek() interface{} { - value, err := r.TryPeek() - r.rcd.L.Lock() - for err == ErrIsEmpty { - r.rcd.Wait() - value, err = r.TryPeek() +func (r *Ring) Write(samples []int16, sequence uint32, ts uint32) { + err := r.TryWrite(samples, sequence, ts) + r.wcd.L.Lock() + for err == ErrIsFull { + r.wcd.Wait() + err = r.TryWrite(samples, sequence, ts) } - r.rcd.L.Unlock() - return value + r.wcd.L.Unlock() } -// Length return the length of available read bytes. -func (r *Ring) Length() int { +func (r *Ring) TryWrite(samples []int16, sequence uint32, ts uint32) error { r.mu.Lock() defer r.mu.Unlock() - - if r.w == r.r { - if r.isFull { - return r.size - } - return 0 - } - - if r.w > r.r { - return r.w - r.r + if r.buffers.Len() == r.size { + return ErrIsFull } - - return r.size - r.r + r.w -} - -// Capacity returns the size of the underlying buffer. -func (r *Ring) Capacity() int { - return r.size + r.buffers.PushFront(&markedBuffer{sequence: sequence, startTs: ts, buffer: samples}) + return nil } -// Free returns the length of available bytes to write. -func (r *Ring) Free() int { +func (r *Ring) TryRead(samples []int16, playTime time.Time, filter FilterFunction) (int, error) { r.mu.Lock() defer r.mu.Unlock() - - if r.w == r.r { - if r.isFull { - return 0 + if r.buffers.Len() == 0 { + return 0, ErrIsEmpty + } + n := 0 + var err error = nil + var size int + for r.buffers.Len() > 0 && n < len(samples) { + back := r.buffers.Back() + elem := back.Value.(*markedBuffer) + size, err = filter(elem, samples[n:], playTime, elem.sequence, elem.startTs) + playTime.Add(time.Duration(size*1e9/codec.SampleRate) * time.Nanosecond) + n += size + if err == nil { + r.buffers.Remove(back) + } else if err == ErrIsEmpty { + return 0, err } - return r.size - } - - if r.w < r.r { - return r.r - r.w } - - return r.size - r.w + r.r -} - -// IsFull returns this Ring is full. -func (r *Ring) IsFull() bool { - r.mu.Lock() - defer r.mu.Unlock() - - return r.isFull + r.wcd.Signal() + return n, nil } -// IsEmpty returns this Ring is empty. -func (r *Ring) IsEmpty() bool { +// Reset the read pointer and writer pointer to zero. +func (r *Ring) Reset() { r.mu.Lock() defer r.mu.Unlock() - - return !r.isFull && r.w == r.r + r.buffers.Init() + r.wcd.Signal() } -// Reset the read pointer and writer pointer to zero. -func (r *Ring) Reset() { +func (r *Ring) Filter(predicate func(sequence uint32, startTs uint32) bool) { r.mu.Lock() defer r.mu.Unlock() - - r.r = 0 - r.w = 0 - r.isFull = false + for e := r.buffers.Front(); e != nil; e = e.Next() { + elem := e.Value.(*markedBuffer) + if predicate(elem.sequence, elem.startTs) { + prev := e.Prev() + r.buffers.Remove(e) + if prev != nil { + e = prev + } + } + } } diff --git a/audio/ring_test.go b/audio/ring_test.go deleted file mode 100644 index bd302cc..0000000 --- a/audio/ring_test.go +++ /dev/null @@ -1,58 +0,0 @@ -package audio - -import ( - "fmt" - "testing" - "time" -) - -func TestRing_Pop(t *testing.T) { - - buffer := NewRing(2) - - go func() { - time.Sleep(1 * time.Second) - buffer.Push(0) - }() - _, err := buffer.TryPop() - value := buffer.Pop() - - if value != 0 || err != ErrIsEmpty { - t.Fail() - } -} - -func TestRing_Push(t *testing.T) { - - buffer := NewRing(2) - - buffer.Push(0) - buffer.Push(1) - go func() { - time.Sleep(1 * time.Second) - buffer.Pop() - }() - err := buffer.TryPush(2) - buffer.Push(2) - - if err != ErrIsFull { - t.Fail() - } - -} - -func TestRing_Flush(t *testing.T) { - - buffer := NewRing(10) - for i := 0; i < 8; i++ { - buffer.Push(i) - } - buffer.Flush(func(value interface{}) bool { - return value.(int) > 0 && value.(int) < 3 - }) - fmt.Printf("buffer : %v\n", buffer) - if buffer.Length() != 2 { - t.Fail() - } - -} diff --git a/audio/server.go b/audio/server.go index ff10994..5668646 100644 --- a/audio/server.go +++ b/audio/server.go @@ -3,31 +3,23 @@ package audio import ( "encoding/binary" "errors" - "goplay2/codec" "goplay2/globals" + "goplay2/rtp" "io" "net" "time" ) type Server struct { - aacDecoder *codec.AacDecoder sharedKey []byte player *Player controlChannel chan interface{} } func NewServer(player *Player) *Server { - aacDecoder := codec.NewAacDecoder() - - asc := []byte{0x12, 0x10} - if err := aacDecoder.InitRaw(asc); err != nil { - globals.ErrLog.Panicf("init decoder failed, err is %s", err) - } return &Server{ - aacDecoder: aacDecoder, - player: player, + player: player, } } @@ -57,16 +49,16 @@ func (s *Server) control(l net.Listener) { } defer conn.Close() for { - frame, err := s.decodeToPcm(conn) + frame, err := s.decodeToFrame(conn) if err != nil { - globals.ErrLog.Printf("error decoding to pcm %v", err) + globals.ErrLog.Printf("error parsing the packet %v", err) return } s.player.Push(frame) } } -func (s *Server) decodeToPcm(reader io.Reader) (*PCMFrame, error) { +func (s *Server) decodeToFrame(reader io.Reader) (*rtp.Frame, error) { var packetSize uint16 err := binary.Read(reader, binary.BigEndian, &packetSize) if err != nil { @@ -76,7 +68,7 @@ func (s *Server) decodeToPcm(reader io.Reader) (*PCMFrame, error) { if _, err := io.ReadFull(reader, buffer); err != nil { return nil, err } - return NewFrame(s.aacDecoder, s.sharedKey, buffer) + return rtp.NewFrame(buffer, s.sharedKey) } func (s *Server) SetRateAnchorTime(rtpTime uint32, networkTime time.Time) { @@ -84,7 +76,7 @@ func (s *Server) SetRateAnchorTime(rtpTime uint32, networkTime time.Time) { } func (s *Server) Teardown() { - s.player.ControlChannel <- globals.ControlMessage{MType: globals.PAUSE} + s.player.ControlChannel <- globals.ControlMessage{MType: globals.STOP} } func (s *Server) SetRate0() { diff --git a/codec/aac.go b/codec/aac.go index 3be1bb5..0534ae0 100755 --- a/codec/aac.go +++ b/codec/aac.go @@ -25,12 +25,7 @@ package codec /* #cgo pkg-config: fdk-aac - -#ifdef __linux__ #include "fdk-aac/aacdecoder_lib.h" -#else -#include "aacdecoder_lib.h" -#endif typedef struct { HANDLE_AACDECODER dec; @@ -111,10 +106,6 @@ static int aacdec_fill(aacdec_t* h, char* data, int nb_data, int* pnb_left) { return 0; } -static int aacdec_sample_bits(aacdec_t* h) { - return h->sample_bits; -} - static int aacdec_pcm_size(aacdec_t* h) { if (!h->info) { return 0; @@ -154,124 +145,6 @@ static int aacdec_decode_frame(aacdec_t* h, char* pcm, int nb_pcm, int* pnb_vali return 0; } -static int aacdec_sample_rate(aacdec_t* h) { - if (!h->info) { - return 0; - } - return h->info->sampleRate; -} - -static int aacdec_frame_size(aacdec_t* h) { - if (!h->info) { - return 0; - } - return h->info->frameSize; -} - -static int aacdec_num_channels(aacdec_t* h) { - if (!h->info) { - return 0; - } - return h->info->numChannels; -} - -static int aacdec_aac_sample_rate(aacdec_t* h) { - if (!h->info) { - return 0; - } - return h->info->aacSampleRate; -} - -static int aacdec_profile(aacdec_t* h) { - if (!h->info) { - return 0; - } - return h->info->profile; -} - -static int aacdec_audio_object_type(aacdec_t* h) { - if (!h->info) { - return 0; - } - return h->info->aot; -} - -static int aacdec_channel_config(aacdec_t* h) { - if (!h->info) { - return 0; - } - return h->info->channelConfig; -} - -static int aacdec_bitrate(aacdec_t* h) { - if (!h->info) { - return 0; - } - return h->info->bitRate; -} - -static int aacdec_aac_samples_per_frame(aacdec_t* h) { - if (!h->info) { - return 0; - } - return h->info->aacSamplesPerFrame; -} - -static int aacdec_aac_num_channels(aacdec_t* h) { - if (!h->info) { - return 0; - } - return h->info->aacNumChannels; -} - -static int aacdec_extension_audio_object_type(aacdec_t* h) { - if (!h->info) { - return 0; - } - return h->info->extAot; -} - -static int aacdec_extension_sampling_rate(aacdec_t* h) { - if (!h->info) { - return 0; - } - return h->info->extSamplingRate; -} - -static int aacdec_num_lost_access_units(aacdec_t* h) { - if (!h->info) { - return 0; - } - return h->info->numLostAccessUnits; -} - -static int aacdec_num_total_bytes(aacdec_t* h) { - if (!h->info) { - return 0; - } - return h->info->numTotalBytes; -} - -static int aacdec_num_bad_bytes(aacdec_t* h) { - if (!h->info) { - return 0; - } - return h->info->numBadBytes; -} - -static int aacdec_num_total_access_units(aacdec_t* h) { - if (!h->info) { - return 0; - } - return h->info->numTotalAccessUnits; -} - -static int aacdec_num_bad_access_units(aacdec_t* h) { - if (!h->info) { - return 0; - } - return h->info->numBadAccessUnits; -} */ import "C" @@ -292,7 +165,7 @@ func NewAacDecoder() *AacDecoder { return &AacDecoder{} } -// Open the decoder in RAW mode with ASC. +// InitRaw Open the decoder in RAW mode with ASC. // For example, the FLV audio payload is a SequenceHeader(ASC) or RAW AAC data, // user can init the decoder with ASC and decode the raw data. // @remark user should never get the info util decode one frame. @@ -309,189 +182,32 @@ func (v *AacDecoder) InitRaw(asc []byte) (err error) { return nil } -// Open the decoder in ADTS mode without ASC, -// we never know the stream info util got the first frame, -// because the codec info is insert at begin of each frame. -// @remark The frame to Decode() is muxed in ADTS format. -func (v *AacDecoder) InitAdts() (err error) { - r := C.aacdec_init_adts(&v.m) - - if int(r) != 0 { - return fmt.Errorf("init ADTS decoder failed, code is %d", int(r)) - } - - return nil -} - // De-allocate all resources of an AAC decoder instance. func (v *AacDecoder) Close() error { C.aacdec_close(&v.m) return nil } -// Fill the buffer of decoder then decode. -// @remark we always expect all input are consumed by decoder. -func (v *AacDecoder) fill(input []byte) (err error) { +func (v *AacDecoder) DecodeTo(input []byte, pcm []int16) (size int, err error) { p := (*C.char)(unsafe.Pointer(&input[0])) pSize := C.int(len(input)) leftSize := C.int(0) - r := C.aacdec_fill(&v.m, p, pSize, &leftSize) - - if int(r) != 0 { - return fmt.Errorf("fill aac decoder failed, code is %d", int(r)) + if int(r) != 0 || int(leftSize) > 0 { + return -1, fmt.Errorf("fill aac decoder failed, code is %d", int(r)) } - if int(leftSize) > 0 { - return fmt.Errorf("decoder left %v bytes", int(leftSize)) - } - - return -} - -// Decode one audio frame. -// @param the frame contains encoded aac frame, optional can be nil. -// @eturn when pcm is nil, should fill more bytes and decode again. -func (v *AacDecoder) Decode(frame []byte) (pcm []byte, err error) { - if len(frame) > 0 { - if err = v.fill(frame); err != nil { - return - } + return -1, fmt.Errorf("decoder left %v bytes", int(leftSize)) } - - nbPcm := int(C.aacdec_pcm_size(&v.m)) - if nbPcm == 0 { - nbPcm = 50 * 1024 - } - pcm = make([]byte, nbPcm) - - p := (*C.char)(unsafe.Pointer(&pcm[0])) - pSize := C.int(nbPcm) + p = (*C.char)(unsafe.Pointer(&pcm[0])) + pSize = C.int(len(pcm) * 2) validSize := C.int(0) - - r := C.aacdec_decode_frame(&v.m, p, pSize, &validSize) - + r = C.aacdec_decode_frame(&v.m, p, pSize, &validSize) if int(r) == aacDecNotEnoughBits { - return nil, nil + return -1, nil } - if int(r) != 0 { - return nil, fmt.Errorf("decode frame failed, code is %d", int(r)) + return -1, fmt.Errorf("decode frame failed, code is %d", int(r)) } - - return pcm[0:int(validSize)], nil -} - -// The bits of a sample, the fdk aac always use 16bits sample. -func (v *AacDecoder) SampleBits() int { - return int(C.aacdec_sample_bits(&v.m)) -} - -// The samplerate in Hz of the fully decoded PCM audio signal (after SBR processing). -// @remark The only really relevant ones for the user. -func (v *AacDecoder) SampleRate() int { - return int(C.aacdec_sample_rate(&v.m)) -} - -// The frame size of the decoded PCM audio signal. -// 1024 or 960 for AAC-LC -// 2048 or 1920 for HE-AAC (v2) -// 512 or 480 for AAC-LD and AAC-ELD -// @remark The only really relevant ones for the user. -func (v *AacDecoder) FrameSize() int { - return int(C.aacdec_frame_size(&v.m)) -} - -// The number of output audio channels in the decoded and interleaved PCM audio signal. -// @remark The only really relevant ones for the user. -func (v *AacDecoder) NumChannels() int { - return int(C.aacdec_num_channels(&v.m)) -} - -// sampling rate in Hz without SBR (from configuration info). -// @remark Decoder internal members. -func (v *AacDecoder) AacSampleRate() int { - return int(C.aacdec_aac_sample_rate(&v.m)) -} - -// MPEG-2 profile (from file header) (-1: not applicable (e. g. MPEG-4)). -// @remark Decoder internal members. -func (v *AacDecoder) Profile() int { - return int(C.aacdec_profile(&v.m)) -} - -// Audio Object Type (from ASC): is set to the appropriate value for MPEG-2 bitstreams (e. g. 2 for AAC-LC). -// @remark Decoder internal members. -func (v *AacDecoder) AudioObjectType() int { - return int(C.aacdec_audio_object_type(&v.m)) -} - -// Channel configuration (0: PCE defined, 1: mono, 2: stereo, ... -// @remark Decoder internal members. -func (v *AacDecoder) ChannelConfig() int { - return int(C.aacdec_channel_config(&v.m)) -} - -// Instantaneous bit rate. -// @remark Decoder internal members. -func (v *AacDecoder) Bitrate() int { - return int(C.aacdec_bitrate(&v.m)) -} - -// Samples per frame for the AAC core (from ASC). -// 1024 or 960 for AAC-LC -// 512 or 480 for AAC-LD and AAC-ELD -// @remark Decoder internal members. -func (v *AacDecoder) AacSamplesPerFrame() int { - return int(C.aacdec_aac_samples_per_frame(&v.m)) -} - -// The number of audio channels after AAC core processing (before PS or MPS processing). -// CAUTION: This are not the final number of output channels! -// @remark Decoder internal members. -func (v *AacDecoder) AacNumChannels() int { - return int(C.aacdec_aac_num_channels(&v.m)) -} - -// Extension Audio Object Type (from ASC) -// @remark Decoder internal members. -func (v *AacDecoder) ExtensionAudioObjectType() int { - return int(C.aacdec_extension_audio_object_type(&v.m)) -} - -// Extension sampling rate in Hz (from ASC) -// @remark Decoder internal members. -func (v *AacDecoder) ExtensionSamplingRate() int { - return int(C.aacdec_extension_sampling_rate(&v.m)) -} - -// This integer will reflect the estimated amount of lost access units in case aacDecoder_DecodeFrame() -// returns AAC_DEC_TRANSPORT_SYNC_ERROR. It will be < 0 if the estimation failed. -// @remark Statistics. -func (v *AacDecoder) NumLostAccessUnits() int { - return int(C.aacdec_num_lost_access_units(&v.m)) -} - -// This is the number of total bytes that have passed through the decoder. -// @remark Statistics. -func (v *AacDecoder) NumTotalBytes() int { - return int(C.aacdec_num_total_bytes(&v.m)) -} - -// This is the number of total bytes that were considered with errors from numTotalBytes. -// @remark Statistics. -func (v *AacDecoder) NumBadBytes() int { - return int(C.aacdec_num_bad_bytes(&v.m)) -} - -// This is the number of total access units that have passed through the decoder. -// @remark Statistics. -func (v *AacDecoder) NumTotalAccessUnits() int { - return int(C.aacdec_num_total_access_units(&v.m)) -} - -// This is the number of total access units that were considered with errors from numTotalBytes. -// @remark Statistics. -func (v *AacDecoder) NumBadAccessUnits() int { - return int(C.aacdec_num_bad_access_units(&v.m)) + return int(validSize), nil } diff --git a/codec/portaudio.go b/codec/portaudio.go index 78523a5..7a6ceb2 100644 --- a/codec/portaudio.go +++ b/codec/portaudio.go @@ -1,18 +1,24 @@ -//+build !linux +//go:build !linux +// +build !linux package codec import ( "github.com/gordonklaus/portaudio" + "math" + "time" ) type PortAudioStream struct { - out []int16 - stream *portaudio.Stream + out []int16 + stream *portaudio.Stream + volumeMultiplier float64 } -func NewStream() Stream { - return &PortAudioStream{} +func NewStream(volumeDb float64) Stream { + stream := &PortAudioStream{} + stream.SetVolume(volumeDb) + return stream } func (s *PortAudioStream) Init(callBack StreamCallback) error { @@ -23,8 +29,7 @@ func (s *PortAudioStream) Init(callBack StreamCallback) error { portAudioCallback := func(out []int16, info portaudio.StreamCallbackTimeInfo) { callBack(out, info.CurrentTime, info.OutputBufferDacTime) } - //TODO : get the framePerBuffer from setup - s.stream, err = portaudio.OpenDefaultStream(0, 2, 44100, 1024, portAudioCallback) + s.stream, err = portaudio.OpenDefaultStream(0, OutputChannel, SampleRate, 1024, portAudioCallback) if err != nil { return err } @@ -47,7 +52,18 @@ func (s *PortAudioStream) Stop() error { return s.stream.Stop() } -func (s *PortAudioStream) SetVolume(volume float64) error { +func (s *PortAudioStream) AudioTime() time.Duration { + return s.stream.Time() +} + +func (s *PortAudioStream) SetVolume(volumeLevelDb float64) error { + s.volumeMultiplier = 1.0 * math.Pow(10, volumeLevelDb/20.0) return nil - // to nothing on mac +} + +func (s *PortAudioStream) FilterVolume(out []int16) int { + for index, sample := range out { + out[index] = int16(float64(sample) * s.volumeMultiplier) + } + return len(out) } diff --git a/codec/pulseaudio.go b/codec/pulseaudio.go index 896e2ab..076e2f1 100644 --- a/codec/pulseaudio.go +++ b/codec/pulseaudio.go @@ -21,13 +21,21 @@ const ( ) type PaStream struct { + creation time.Time client *pulse.Client stream *pulse.PlaybackStream sink *pulse.Sink - buffer []int16 index int } +func (s *PaStream) AudioTime() time.Duration { + return time.Now().Sub(s.creation) +} + +func (s *PaStream) FilterVolume(out []int16) int { + return len(out) +} + func dbToLinearVolume(volume float64) uint32 { if math.IsInf(volume, -1) || volume <= math.Inf(-1) { return paVolumeMuted @@ -46,27 +54,12 @@ func NewStream() Stream { } return &PaStream{ client: client, + creation: time.Now(), } } func (s *PaStream) Init(callBack StreamCallback) error { var err error - streamCallback := func(out []int16) (int, error) { - var copied = 0 - if s.index+rtpPacketSize < audioBufferSize { - callBack(s.buffer[s.index:s.index+rtpPacketSize], 0*time.Second, 0*time.Second) - copied = copy(out, s.buffer[:s.index+rtpPacketSize]) - s.index += rtpPacketSize - copied - } else { - copied = copy(out, s.buffer[:s.index]) - s.index -= copied - } - copy(s.buffer, s.buffer[copied:]) - if s.index < 0 { - s.index = 0 - } - return copied, nil - } s.sink, err = s.client.SinkByID(config.Config.PulseSink) if err != nil { s.sink, err = s.client.DefaultSink() @@ -74,7 +67,12 @@ func (s *PaStream) Init(callBack StreamCallback) error { if err != nil { return err } - s.stream, err = s.client.NewPlayback(pulse.Int16Reader(streamCallback), + pulseAudioCallBack := func(out []int16) (int, error){ + audioTime := s.AudioTime() + callBack(out, audioTime, audioTime) + return len(out), nil + } + s.stream, err = s.client.NewPlayback(pulse.Int16Reader(pulseAudioCallBack), pulse.PlaybackStereo, pulse.PlaybackBufferSize(1024), pulse.PlaybackSink(s.sink), @@ -82,9 +80,6 @@ func (s *PaStream) Init(callBack StreamCallback) error { if err != nil { return err } - - s.buffer = make([]int16, audioBufferSize) - return nil } @@ -114,7 +109,6 @@ func (*SetSinkVolume) command() uint32 { } func (s *PaStream) SetVolume(volume float64) error { - linearVolume := dbToLinearVolume(volume) vols := make(proto.ChannelVolumes, 2) diff --git a/codec/stream.go b/codec/stream.go index e71c783..1638bf4 100644 --- a/codec/stream.go +++ b/codec/stream.go @@ -5,7 +5,12 @@ import ( "time" ) -type StreamCallback func(out []int16, currentTime time.Duration, outputBufferDacTime time.Duration) +const ( + OutputChannel = 2 + SampleRate = 44100 +) + +type StreamCallback func(out []int16, currentTime time.Duration, outputBufferDacTime time.Duration) int type Stream interface { io.Closer @@ -13,4 +18,6 @@ type Stream interface { Start() error Stop() error SetVolume(volume float64) error + AudioTime() time.Duration + FilterVolume(out []int16) int } diff --git a/config/config.go b/config/config.go index ef9edff..a8f4e0c 100644 --- a/config/config.go +++ b/config/config.go @@ -5,23 +5,28 @@ import ( "github.com/google/uuid" "io/ioutil" "log" + "net" "os" "os/signal" + "strings" "syscall" ) type Configuration struct { - Volume float64 `json:"sound-volume"` - DeviceUUID string `json:"device-uuid"` - PulseSink string `json:"-"` - DeviceName string `json:"-"` - exitsSignals chan os.Signal + Volume float64 `json:"sound-volume"` + DeviceUUID string `json:"device-uuid"` + PulseSink string `json:"-"` + DeviceName string `json:"-"` + DisableAudioSync bool `json:"-"` + AudioMetrics Metrics `json:"-"` + exitsSignals chan os.Signal } var Config = &Configuration{ - PulseSink: "", - Volume: -999, - DeviceUUID: uuid.NewString(), + PulseSink: "", + Volume: -999, + DeviceUUID: uuid.NewString(), + DisableAudioSync: false, } func (c *Configuration) Load() { @@ -35,6 +40,7 @@ func (c *Configuration) Load() { go func() { <-c.exitsSignals c.Store() + c.AudioMetrics.Store(c.DeviceName) os.Exit(0) }() } @@ -49,3 +55,26 @@ func (c *Configuration) Store() { log.Printf("Warning : impossible to store config file %s \n", c.DeviceName+"/config.json") } } + +func NetworkInfo(ifName string) (*net.Interface, string, []string) { + iFace, err := net.InterfaceByName(ifName) + if err != nil { + panic(err) + } + macAddress := strings.ToUpper(iFace.HardwareAddr.String()) + ipAddresses, err := iFace.Addrs() + if err != nil { + panic(err) + } + var ipStringAddr []string + for _, addr := range ipAddresses { + switch v := addr.(type) { + case *net.IPNet: + ipStringAddr = append(ipStringAddr, v.IP.String()) + case *net.IPAddr: + ipStringAddr = append(ipStringAddr, v.IP.String()) + } + } + + return iFace, macAddress, ipStringAddr +} diff --git a/config/metrics.go b/config/metrics.go new file mode 100644 index 0000000..270ee65 --- /dev/null +++ b/config/metrics.go @@ -0,0 +1,46 @@ +package config + +import ( + "encoding/json" + "goplay2/globals" + "io/ioutil" + "log" + "time" +) + +type Metrics struct { + CountDrop uint32 `json:"count-drop"` + CountSilence uint32 `json:"count-silence"` + DriftAverage int64 `json:"drift-average"` + PacketReceived int64 `json:"packet-received"` +} + +func (m Metrics) Store(deviceName string) { + data, err := json.Marshal(&m) + if err != nil { + log.Printf("Warning: impossible to marshal configuration in json") + } + err = ioutil.WriteFile(deviceName+"/metrics.json", data, 0660) + if err != nil { + log.Printf("Warning : impossible to store config file %s \n", deviceName+"/config.json") + } +} + +func (m *Metrics) Drop() { + m.CountDrop += 1 + globals.MetricLog.Printf("Drop sequence because of drift") +} + +func (m *Metrics) Silence() { + globals.MetricLog.Printf("filling audio buffer with silence") + m.CountSilence += 1 +} + +func (m *Metrics) Drift(drift time.Duration) { + globals.MetricLog.Printf("drift : %v\n", drift) + if drift < 0 { + drift = -drift + } + m.PacketReceived += 1 // 1-based count ( to prevent divide by zero ) + m.DriftAverage = m.DriftAverage + (drift.Milliseconds()-m.DriftAverage)/m.PacketReceived +} diff --git a/filters/resampling.go b/filters/resampling.go new file mode 100644 index 0000000..dc40283 --- /dev/null +++ b/filters/resampling.go @@ -0,0 +1,58 @@ +package filters + +import "C" +import ( + "goplay2/audio" + "goplay2/codec" + "goplay2/config" + "time" +) + +type ResamplingFilter struct { + clock *audio.Clock + metrics *config.Metrics + srcState *SrcState + skewInfo skewData +} + +func NewResamplingFilter(clock *audio.Clock, metrics *config.Metrics) (*ResamplingFilter, error) { + srcState, err := NewSrcState(codec.OutputChannel) + + if err != nil { + return nil, err + } + return &ResamplingFilter{ + clock: clock, + metrics: metrics, + srcState: srcState, + skewInfo: skewData{ + skewAverage: 0, + count: 1, + }, + }, nil +} + +func (p *ResamplingFilter) Reset(clock *audio.Clock) { + p.skewInfo.reset(clock.AnchorTime()) +} + +func (p *ResamplingFilter) Apply(audioStream audio.Stream, samples []int16, nextTime time.Time, _ uint32, startTs uint32) (int, error) { + driftTime := p.clock.PacketTime(int64(startTs)).Sub(nextTime) + p.metrics.Drift(driftTime) + if driftTime < -150*time.Millisecond { + // drop packet if too old + return 0, nil + } else if driftTime > 150*time.Millisecond { + // add silence if really too young + return 0, audio.ErrIsEmpty + } + skew := p.skewInfo.calculate(p.clock, nextTime, startTs) + input := make([]int16, len(samples)) + peeked, _ := audioStream.Peek(input) + inputFramesUsed, outputFramesGen, err := p.srcState.Process(skew, input[:peeked], samples) + if err != nil { + panic(err) + } + _, err = audioStream.Seek(inputFramesUsed) + return outputFramesGen, err +} diff --git a/filters/samplingrate.go b/filters/samplingrate.go new file mode 100644 index 0000000..d28feb8 --- /dev/null +++ b/filters/samplingrate.go @@ -0,0 +1,89 @@ +package filters + +/* +#cgo pkg-config: samplerate +#include +#include +#include + +static int resample(SRC_STATE* state, int outputSample, short * input, unsigned long in_len, short * output, unsigned long out_len, + double ratio, unsigned long * input_frames_used, unsigned long * output_frames_gen) { + + float * inputf = malloc(in_len * sizeof(float)); + src_short_to_float_array(input, inputf, in_len); + float * outputf = malloc(out_len * sizeof(float)); + memset(outputf, 0, out_len * sizeof(float)); + + SRC_DATA srcdata; + srcdata.src_ratio = ratio; + srcdata.data_in = inputf; + srcdata.input_frames = in_len / outputSample; + srcdata.data_out = outputf; + srcdata.output_frames = out_len / outputSample; + srcdata.end_of_input = 0; + int result = src_process(state, &srcdata); + src_float_to_short_array(outputf, output, out_len); + + free(inputf); + free(outputf); + + if ( input_frames_used != NULL ) { + *input_frames_used = srcdata.input_frames_used * outputSample; + } + if ( output_frames_gen != NULL ) { + *output_frames_gen = srcdata.output_frames_gen * outputSample; + } + return result; +} + + +*/ +import "C" +import ( + "errors" + "unsafe" +) + +type SrcState struct { + inner *C.SRC_STATE + outputChannels int +} + +func formatErr(err C.int) error { + if err == 0 { + return nil + } + return errors.New(C.GoString(C.src_strerror(err))) +} + +func NewSrcState(channels int) (*SrcState, error) { + var err C.int + state := C.src_new(C.SRC_SINC_BEST_QUALITY, C.int(channels), &err) + if err != 0 { + return nil, formatErr(err) + } + return &SrcState{ + inner: state, + outputChannels: channels, + }, nil +} + +func (s *SrcState) Close() error { + s.inner = C.src_delete(s.inner) + return nil +} + +func (s *SrcState) Process(ratio float64, input []int16, output []int16) (inputFramesUsed int, outputFramesGen int, err error) { + var inUsed C.ulong + var outGen C.ulong + + in := (*C.short)(unsafe.Pointer(&input[0])) + out := (*C.short)(unsafe.Pointer(&output[0])) + + err = formatErr(C.resample(s.inner, C.int(s.outputChannels), in, C.ulong(len(input)), out, + C.ulong(len(output)), C.double(ratio), &inUsed, &outGen)) + + inputFramesUsed = int(inUsed) + outputFramesGen = int(outGen) + return +} diff --git a/filters/skew.go b/filters/skew.go new file mode 100644 index 0000000..f69b369 --- /dev/null +++ b/filters/skew.go @@ -0,0 +1,31 @@ +package filters + +import ( + "goplay2/audio" + "math" + "time" +) + +type skewData struct { + lastSampling time.Time + count float64 + skewAverage float64 +} + +func (s *skewData) calculate(audioClock *audio.Clock, playTime time.Time, timestamp uint32) float64 { + e0 := float64(audioClock.PacketTime(int64(timestamp)).Sub(s.lastSampling)) / float64(playTime.Sub(s.lastSampling)) + s.skewAverage += (e0 - s.skewAverage) / s.count + if s.count < 16 { + s.count += 1 + } + if time.Now().Sub(s.lastSampling) > 200*time.Millisecond { + s.lastSampling = time.Now() + } + return math.Max(0.99, math.Min(1.01, s.skewAverage)) +} + +func (s *skewData) reset(anchorTime time.Time) { + s.lastSampling = anchorTime + s.count = 1 + s.skewAverage = 0 +} diff --git a/globals/control.go b/globals/control.go index 1a140a6..62cf8a0 100644 --- a/globals/control.go +++ b/globals/control.go @@ -11,6 +11,7 @@ type ControlMessage struct { const ( PAUSE ControlMessageType = iota + STOP START SKIP VOLUME diff --git a/globals/globals.go b/globals/globals.go index 38aab53..4e4f14f 100644 --- a/globals/globals.go +++ b/globals/globals.go @@ -5,3 +5,5 @@ import "log" const BufferSize = 8 * 1024 * 1024 // default itunes buffer size var ErrLog *log.Logger + +var MetricLog *log.Logger diff --git a/go.sum b/go.sum index 2e06d0f..85d3ba9 100644 --- a/go.sum +++ b/go.sum @@ -1,5 +1,3 @@ -github.com/albanseurat/go-fdkaac v1.0.5 h1:H19q8Cnb+5ypPuiNMTs7xPsiv5vv8i0N2wJldSKiolw= -github.com/albanseurat/go-fdkaac v1.0.5/go.mod h1:pTwqib2y8Q6Ds5bFvEdJ48pZyrS/sDqLWVQy53f8cgc= github.com/albanseurat/go-ptp v0.0.0-20210621114405-a6d76e8bd928 h1:J29RmFQeULXbQGQuOaOIflJUArCsa7zUkeVgby5bDuM= github.com/albanseurat/go-ptp v0.0.0-20210621114405-a6d76e8bd928/go.mod h1:QOSbNB21golIacoQMnMeYwwfLxU1eTKCsvbfiSBsahM= github.com/brutella/dnssd v1.2.0/go.mod h1:FpJqlQ8+XU6w1vbnG1zJiQPTRE5fvQIRdrcBojMVuuQ= @@ -16,8 +14,6 @@ github.com/gordonklaus/portaudio v0.0.0-20200911161147-bb74aa485641/go.mod h1:Hf github.com/grandcat/zeroconf v1.0.0 h1:uHhahLBKqwWBV6WZUDAT71044vwOTL+McW0mBJvo6kE= github.com/grandcat/zeroconf v1.0.0/go.mod h1:lTKmG1zh86XyCoUeIHSA4FJMBwCJiQmGfcP2PdzytEs= github.com/jessevdk/go-flags v1.4.0/go.mod h1:4FA24M0QyGHXBuZZK/XkWh8h0e1EYbRYJSGM75WSRxI= -github.com/jfreymuth/pulse v0.1.0 h1:KN38/9hoF9PJvP5DpEVhMRKNuwnJUonc8c9ARorRXUA= -github.com/jfreymuth/pulse v0.1.0/go.mod h1:cpYspI6YljhkUf1WLXLLDmeaaPFc3CnGLjDZf9dZ4no= github.com/jfreymuth/pulse v0.1.1-0.20210727160034-392febcff724 h1:BtFEHYuLN5tmBiHSuvlAwKYV1kWoTN6o/POFi1sbzBs= github.com/jfreymuth/pulse v0.1.1-0.20210727160034-392febcff724/go.mod h1:cpYspI6YljhkUf1WLXLLDmeaaPFc3CnGLjDZf9dZ4no= github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo= @@ -35,6 +31,8 @@ github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/smallnest/ringbuffer v0.0.0-20210227121335-0a58434b36f2 h1:co1YnJJ6rDvcodJzcXObchJMfHclIROMulsWObuNfTY= +github.com/smallnest/ringbuffer v0.0.0-20210227121335-0a58434b36f2/go.mod h1:mXcZNMJHswhQDDJZIjdtJoG97JIwIa/HdcHNM3w15T0= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/testify v1.4.0 h1:2E4SXV/wtOkTonXsotYi4li6zVWxYlZuYNCXe9XRJyk= github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4= diff --git a/handlers/parameter.go b/handlers/parameter.go index 0618bdb..fce5d40 100644 --- a/handlers/parameter.go +++ b/handlers/parameter.go @@ -7,6 +7,7 @@ import ( "goplay2/config" "goplay2/globals" "goplay2/rtsp" + "log" "strings" ) @@ -39,7 +40,7 @@ func (r *Rstp) OnSetParameterWeb(req *rtsp.Request) (*rtsp.Response, error) { line := scanner.Text() if strings.HasPrefix(line, "volume") { if c, err := fmt.Sscanf(line, "volume: %f", &vol); c != 1 || err != nil { - fmt.Printf("erreur parsing volume parameters : %s\n", line) + log.Printf("erreur parsing volume parameters : %s\n", line) } else { config.Config.Volume = vol r.player.ControlChannel <- globals.ControlMessage{MType: globals.VOLUME, Paramf: config.Config.Volume} diff --git a/main.go b/main.go index 9a72284..31d7e93 100644 --- a/main.go +++ b/main.go @@ -6,6 +6,7 @@ import ( "goplay2/audio" "goplay2/config" "goplay2/event" + "goplay2/filters" "goplay2/globals" "goplay2/handlers" "goplay2/homekit" @@ -14,43 +15,28 @@ import ( "log" "net" "os" - "strings" "sync" ) func main() { var ifName string var delay int64 + var metricsFileName string + var err error flag.StringVar(&config.Config.DeviceName, "n", "goplay", "Specify device name") flag.StringVar(&ifName, "i", "eth0", "Specify interface") flag.Int64Var(&delay, "delay", 0, "Specify hardware delay in ms") flag.StringVar(&config.Config.PulseSink, "sink", config.Config.PulseSink, "Specify Pulse Audio Sink - Linux only") + flag.StringVar(&metricsFileName, "metrics", "/dev/null", "File name to logs audio sync - temp param") + flag.BoolVar(&config.Config.DisableAudioSync, "nosync", config.Config.DisableAudioSync, "Disable multi-room/audio-sync. On slow CPU multi-room can cause audio jitter") flag.Parse() // after declaring flags we need to call it config.Config.Load() - defer config.Config.Store() globals.ErrLog = log.New(os.Stderr, "Error:", log.LstdFlags|log.Lshortfile|log.Lmsgprefix) - iFace, err := net.InterfaceByName(ifName) - if err != nil { - panic(err) - } - macAddress := strings.ToUpper(iFace.HardwareAddr.String()) - ipAddresses, err := iFace.Addrs() - if err != nil { - panic(err) - } - var ipStringAddr []string - for _, addr := range ipAddresses { - switch v := addr.(type) { - case *net.IPNet: - ipStringAddr = append(ipStringAddr, v.IP.String()) - case *net.IPAddr: - ipStringAddr = append(ipStringAddr, v.IP.String()) - } - } + iFace, macAddress, ipStringAddr := config.NetworkInfo(ifName) homekit.Device = homekit.NewAccessory(macAddress, config.Config.DeviceUUID, airplayDevice()) log.Printf("Starting goplay for device %v", homekit.Device) homekit.Server, err = homekit.NewServer(macAddress, config.Config.DeviceName, ipStringAddr) @@ -62,13 +48,21 @@ func main() { } defer server.Shutdown() + if metricFile, err := os.Create(metricsFileName); err == nil { + globals.MetricLog = log.New(metricFile, "", log.LstdFlags|log.Lshortfile|log.Lmsgprefix) + } else { + log.Panicf("Impossible to open metrics file %s", metricsFileName) + } + clock := ptp.NewVirtualClock(delay) ptpServer := ptp.NewServer(clock) - // Divided by 100 -> average size of a RTP packet - audioBuffer := audio.NewRing(globals.BufferSize / 100) - - player := audio.NewPlayer(clock, audioBuffer) + audioClock := audio.NewClock(clock) + filter, err := filters.NewResamplingFilter(audioClock, &config.Config.AudioMetrics) + if err != nil { + panic(err) + } + player := audio.NewPlayer(audioClock, filter) wg := new(sync.WaitGroup) wg.Add(4) diff --git a/audio/rtp.go b/rtp/frame.go similarity index 60% rename from audio/rtp.go rename to rtp/frame.go index 31bd00b..5f00b5b 100644 --- a/audio/rtp.go +++ b/rtp/frame.go @@ -1,4 +1,4 @@ -package audio +package rtp import ( "encoding/binary" @@ -12,16 +12,20 @@ type TcpPacket struct { SequenceNumber uint32 } -type PCMFrame struct { +type Frame struct { TcpPacket - pcmData []byte + aacData []byte } -func (p *PCMFrame) Data() []byte { - return p.pcmData +func (p *Frame) PcmData(aacDecoder *codec.AacDecoder, pcm []int16) (int, error) { + decode, err := aacDecoder.DecodeTo(p.aacData, pcm) + if err != nil { + return -1, err + } + return decode, nil } -func NewFrame(aacDecoder *codec.AacDecoder, sharedKey []byte, rawPacket []byte) (*PCMFrame, error) { +func NewFrame(rawPacket []byte, sharedKey []byte) (*Frame, error) { var err error packet := TcpPacket{} if err = packet.Unmarshal(rawPacket); err != nil { @@ -32,18 +36,17 @@ func NewFrame(aacDecoder *codec.AacDecoder, sharedKey []byte, rawPacket []byte) packet.Marker = false // used by apple in sequenceNumber packet.PayloadType = 0 // used by apple in sequenceNumber packet.SequenceNumber = binary.BigEndian.Uint32(seqBytes[:]) - message := packet.Payload[:len(packet.Payload)-24] + encryptedData := packet.Payload[:len(packet.Payload)-24] nonce := packet.Payload[len(packet.Payload)-8:] var mac [16]byte copy(mac[:], packet.Payload[len(packet.Payload)-24:len(packet.Payload)-8]) aad := packet.Raw[4:0xc] - decrypted, err := chacha20poly1305.DecryptAndVerify(sharedKey, nonce, message, mac, aad) - if err != nil { - return nil, err - } - decode, err := aacDecoder.Decode(decrypted) + decrypted, err := chacha20poly1305.DecryptAndVerify(sharedKey, + nonce, encryptedData, mac, aad) if err != nil { return nil, err } - return &PCMFrame{packet, decode}, nil + return &Frame{TcpPacket: packet, + aacData: decrypted, + }, nil }