Skip to content

Commit

Permalink
merge to superset optimizations - concurrency support
Browse files Browse the repository at this point in the history
  • Loading branch information
aliszka committed Sep 10, 2024
1 parent ed969e0 commit 8288f7e
Show file tree
Hide file tree
Showing 2 changed files with 285 additions and 192 deletions.
188 changes: 124 additions & 64 deletions bitmap_opt.go
Original file line number Diff line number Diff line change
@@ -1,24 +1,125 @@
package sroar

import (
"sync"
)

const minContainersForConcurrency = 16

// AndToSuperset calculates intersection of current and incoming bitmap
// It reuses containers present in current bitmap
// and utilize container buffer provided.
// and utilize container buffers provided.
// Number of passed buffers indicates concurrency level
// (e.g. 4 buffers = merge will be performed by 4 goroutines).
//
// CAUTION: should be used only when current bitmap contained before
// all elements present in incoming bitmap
func (dst *Bitmap) AndToSuperset(src *Bitmap, containerBufs ...[]uint16) {
conc := len(containerBufs)
assert(conc > 0)

dstNumKeys := dst.keys.numKeys()
if src == nil {
concurrentlyOnRange(conc, dstNumKeys, func(_, from, to int) {
zeroOutSelectedContainers(dst, from, to)
})
return
}

srcNumKeys := src.keys.numKeys()
concurrentlyOnRange(conc, dstNumKeys, func(i, from, to int) {
andSelectedContainers(dst, src, from, to, 0, srcNumKeys, containerBufs[i])
})
}

// OrToSuperset calculates union of current and incoming bitmap
// It reuses containers present in current bitmap
// and utilize containers buffers provided.
// Number of passed buffers indicates concurrency level
// (e.g. 4 buffers = merge will be performed by 4 goroutines).
//
// CAUTION: should be used only when current bitmap contained before
// all elements present in incoming bitmap
func (dst *Bitmap) OrToSuperset(src *Bitmap, containerBufs ...[]uint16) {
conc := len(containerBufs)
assert(conc > 0)

if src == nil {
return
}

srcNumKeys := src.keys.numKeys()
concurrentlyOnRange(conc, srcNumKeys, func(i, from, to int) {
orSelectedContainers(dst, src, from, to, containerBufs[i])
})
}

// AndNotToSuperset calculates difference between current and incoming bitmap
// It reuses containers present in current bitmap
// and utilize containers buffers provided.
// Number of passed buffers indicates concurrency level
// (e.g. 4 buffers = merge will be performed by 4 goroutines).
//
// CAUTION: should be used only when current bitmap contained before
// all elements present in incoming bitmap
func (dst *Bitmap) AndToSuperset(src *Bitmap, containerBuf []uint16) {
func (dst *Bitmap) AndNotToSuperset(src *Bitmap, containerBufs ...[]uint16) {
conc := len(containerBufs)
assert(conc > 0)

if src == nil {
for ai, an := 0, dst.keys.numKeys(); ai < an; ai++ {
off := dst.keys.val(ai)
zeroOutContainer(dst.getContainer(off))
return
}

dstNumKeys := dst.keys.numKeys()
srcNumKeys := src.keys.numKeys()
concurrentlyOnRange(conc, dstNumKeys, func(i, from, to int) {
andNotSelectedContainers(dst, src, from, to, 0, srcNumKeys, containerBufs[i])
})
}

func (ra *Bitmap) ConvertToBitmapContainers() {
for ai, an := 0, ra.keys.numKeys(); ai < an; ai++ {
ak := ra.keys.key(ai)
off := ra.keys.val(ai)
ac := ra.getContainer(off)

if ac[indexType] == typeArray {
c := array(ac).toBitmapContainer(nil)
offset := ra.newContainer(uint16(len(c)))
copy(ra.data[offset:], c)
ra.setKey(ak, offset)
}
}
}

func concurrentlyOnRange(conc, max int, callback func(i, from, to int)) {
if conc == 1 || max < conc*minContainersForConcurrency {
callback(0, 0, max)
return
}

a, b := dst, src
ai, an := 0, a.keys.numKeys()
bi, bn := 0, b.keys.numKeys()
delta := max / conc

wg := new(sync.WaitGroup)
wg.Add(conc - 1)
for i := 0; i < conc-1; i++ {
go func(i int) {
callback(i, delta*i, delta*(i+1))
wg.Done()
}(i)
}
callback(conc-1, delta*(conc-1), max)
wg.Wait()
}

func zeroOutSelectedContainers(a *Bitmap, ai, an int) {
for ; ai < an; ai++ {
off := a.keys.val(ai)
zeroOutContainer(a.getContainer(off))
}
}

func andSelectedContainers(a, b *Bitmap, ai, an, bi, bn int, containerBuf []uint16) {
for ai < an && bi < bn {
ak := a.keys.key(ai)
bk := b.keys.key(bi)
Expand Down Expand Up @@ -49,64 +150,38 @@ func (dst *Bitmap) AndToSuperset(src *Bitmap, containerBuf []uint16) {
}
}

// OrToSuperset calculates union of current and incoming bitmap
// It reuses containers present in current bitmap
// and utilize container buffer provided.
//
// CAUTION: should be used only when current bitmap contained before
// all elements present in incoming bitmap
func (dst *Bitmap) OrToSuperset(src *Bitmap, containerBuf []uint16) {
if src == nil {
return
}

srcIdx, numKeys := 0, src.keys.numKeys()
for ; srcIdx < numKeys; srcIdx++ {
srcCont := src.getContainer(src.keys.val(srcIdx))
if getCardinality(srcCont) == 0 {
func orSelectedContainers(a, b *Bitmap, bi, bn int, containerBuf []uint16) {
for ; bi < bn; bi++ {
off := b.keys.val(bi)
bc := b.getContainer(off)
if getCardinality(bc) == 0 {
continue
}

key := src.keys.key(srcIdx)

dstIdx := dst.keys.search(key)
if dstIdx >= dst.keys.numKeys() || dst.keys.key(dstIdx) != key {
bk := b.keys.key(bi)
ai := a.keys.search(bk)
if ai >= a.keys.numKeys() || a.keys.key(ai) != bk {
// Container does not exist in dst.
panic("Current bitmap should have all containers of incoming bitmap")
} else {
// Container exists in dst as well. Do an inline containerOr.
offset := dst.keys.val(dstIdx)
dstCont := dst.getContainer(offset)
containerOrToSuperset(dstCont, srcCont, containerBuf)
off = a.keys.val(ai)
ac := a.getContainer(off)
containerOrToSuperset(ac, bc, containerBuf)
}
}
}

// AndNotToSuperset calculates difference between current and incoming bitmap
// It reuses containers present in current bitmap
// and utilize container buffer provided.
//
// CAUTION: should be used only when current bitmap contained before
// all elements present in incoming bitmap
func (dst *Bitmap) AndNotToSuperset(src *Bitmap, containerBuf []uint16) {
if src == nil {
return
}

a, b := dst, src
ai, an := 0, a.keys.numKeys()
bi, bn := 0, b.keys.numKeys()

func andNotSelectedContainers(a, b *Bitmap, ai, an, bi, bn int, containerBuf []uint16) {
for ai < an && bi < bn {
ak := a.keys.key(ai)
bk := b.keys.key(bi)
if ak == bk {
off := a.keys.val(ai)
ac := a.getContainer(off)
off = b.keys.val(bi)
off := b.keys.val(bi)
bc := b.getContainer(off)

if getCardinality(bc) != 0 {
off = a.keys.val(ai)
ac := a.getContainer(off)
containerAndNotToSuperset(ac, bc, containerBuf)
}
ai++
Expand All @@ -118,18 +193,3 @@ func (dst *Bitmap) AndNotToSuperset(src *Bitmap, containerBuf []uint16) {
}
}
}

func (ra *Bitmap) ConvertToBitmapContainers() {
for ai, an := 0, ra.keys.numKeys(); ai < an; ai++ {
ak := ra.keys.key(ai)
off := ra.keys.val(ai)
ac := ra.getContainer(off)

if ac[indexType] == typeArray {
c := array(ac).toBitmapContainer(nil)
offset := ra.newContainer(uint16(len(c)))
copy(ra.data[offset:], c)
ra.setKey(ak, offset)
}
}
}
Loading

0 comments on commit 8288f7e

Please sign in to comment.