Major integrations and fixes: - Added BACKBEAT SDK integration for P2P operation timing - Implemented beat-aware status tracking for distributed operations - Added Docker secrets support for secure license management - Resolved KACHING license validation via HTTPS/TLS - Updated docker-compose configuration for clean stack deployment - Disabled rollback policies to prevent deployment failures - Added license credential storage (CHORUS-DEV-MULTI-001) Technical improvements: - BACKBEAT P2P operation tracking with phase management - Enhanced configuration system with file-based secrets - Improved error handling for license validation - Clean separation of KACHING and CHORUS deployment stacks 🤖 Generated with [Claude Code](https://claude.ai/code) Co-Authored-By: Claude <noreply@anthropic.com>
1107 lines
27 KiB
Go
1107 lines
27 KiB
Go
package roaring64
|
|
|
|
import (
|
|
"fmt"
|
|
"io"
|
|
"math/big"
|
|
"runtime"
|
|
"sync"
|
|
)
|
|
|
|
// BSI is at its simplest is an array of bitmaps that represent an encoded
|
|
// binary value. The advantage of a BSI is that comparisons can be made
|
|
// across ranges of values whereas a bitmap can only represent the existence
|
|
// of a single value for a given column ID. Another usage scenario involves
|
|
// storage of high cardinality values.
|
|
//
|
|
// It depends upon the bitmap libraries. It is not thread safe, so
|
|
// upstream concurrency guards must be provided.
|
|
type BSI struct {
|
|
bA []Bitmap
|
|
eBM Bitmap // Existence BitMap
|
|
MaxValue int64
|
|
MinValue int64
|
|
runOptimized bool
|
|
}
|
|
|
|
// NewBSI constructs a new BSI. Note that it is your responsibility to ensure that
|
|
// the min/max values are set correctly. Queries CompareValue, MinMax, etc. will not
|
|
// work correctly if the min/max values are not set correctly.
|
|
func NewBSI(maxValue int64, minValue int64) *BSI {
|
|
|
|
bitszmin := big.NewInt(minValue).BitLen() + 1
|
|
bitszmax := big.NewInt(maxValue).BitLen() + 1
|
|
bitsz := bitszmin
|
|
if bitszmax > bitsz {
|
|
bitsz = bitszmax
|
|
}
|
|
ba := make([]Bitmap, bitsz)
|
|
return &BSI{bA: ba, MaxValue: maxValue, MinValue: minValue}
|
|
}
|
|
|
|
// NewDefaultBSI constructs an auto-sized BSI
|
|
func NewDefaultBSI() *BSI {
|
|
return NewBSI(int64(0), int64(0))
|
|
}
|
|
|
|
// RunOptimize attempts to further compress the runs of consecutive values found in the bitmap
|
|
func (b *BSI) RunOptimize() {
|
|
b.eBM.RunOptimize()
|
|
for i := 0; i < len(b.bA); i++ {
|
|
b.bA[i].RunOptimize()
|
|
}
|
|
b.runOptimized = true
|
|
}
|
|
|
|
// HasRunCompression returns true if the bitmap benefits from run compression
|
|
func (b *BSI) HasRunCompression() bool {
|
|
return b.runOptimized
|
|
}
|
|
|
|
// GetExistenceBitmap returns a pointer to the underlying existence bitmap of the BSI
|
|
func (b *BSI) GetExistenceBitmap() *Bitmap {
|
|
return &b.eBM
|
|
}
|
|
|
|
// ValueExists tests whether the value exists.
|
|
func (b *BSI) ValueExists(columnID uint64) bool {
|
|
|
|
return b.eBM.Contains(uint64(columnID))
|
|
}
|
|
|
|
// GetCardinality returns a count of unique column IDs for which a value has been set.
|
|
func (b *BSI) GetCardinality() uint64 {
|
|
return b.eBM.GetCardinality()
|
|
}
|
|
|
|
// BitCount returns the number of bits needed to represent values.
|
|
func (b *BSI) BitCount() int {
|
|
return len(b.bA) - 1 // Exclude sign bit
|
|
}
|
|
|
|
// IsBigUInt returns the number of bits needed to represent values.
|
|
func (b *BSI) isBig() bool {
|
|
return len(b.bA) > 64
|
|
}
|
|
|
|
// IsNegative returns true for negative values
|
|
func (b *BSI) IsNegative(columnID uint64) bool {
|
|
if len(b.bA) == 0 {
|
|
return false
|
|
}
|
|
return b.bA[b.BitCount()].Contains(columnID)
|
|
}
|
|
|
|
// SetBigValue sets a value that exceeds 64 bits
|
|
func (b *BSI) SetBigValue(columnID uint64, value *big.Int) {
|
|
// If max/min values are set to zero then automatically determine bit array size
|
|
if b.MaxValue == 0 && b.MinValue == 0 {
|
|
minBits := value.BitLen() + 1
|
|
if minBits == 1 {
|
|
minBits = 2
|
|
}
|
|
for len(b.bA) < minBits {
|
|
b.bA = append(b.bA, Bitmap{})
|
|
}
|
|
}
|
|
|
|
for i := b.BitCount(); i >= 0; i-- {
|
|
if value.Bit(i) == 0 {
|
|
b.bA[i].Remove(columnID)
|
|
} else {
|
|
b.bA[i].Add(columnID)
|
|
}
|
|
}
|
|
b.eBM.Add(columnID)
|
|
}
|
|
|
|
// SetValue sets a value for a given columnID.
|
|
func (b *BSI) SetValue(columnID uint64, value int64) {
|
|
b.SetBigValue(columnID, big.NewInt(value))
|
|
}
|
|
|
|
// GetValue gets the value at the column ID. Second param will be false for non-existent values.
|
|
func (b *BSI) GetValue(columnID uint64) (value int64, exists bool) {
|
|
bv, exists := b.GetBigValue(columnID)
|
|
if !exists {
|
|
return
|
|
}
|
|
if !bv.IsInt64() {
|
|
if bv.Sign() == -1 {
|
|
msg := fmt.Errorf("can't represent a negative %d bit value as an int64", b.BitCount())
|
|
panic(msg)
|
|
}
|
|
if bv.Sign() == 1 {
|
|
msg := fmt.Errorf("can't represent a positive %d bit value as an int64", b.BitCount())
|
|
panic(msg)
|
|
}
|
|
}
|
|
return bv.Int64(), exists
|
|
}
|
|
|
|
// GetBigValue gets the value at the column ID. Second param will be false for non-existent values.
|
|
func (b *BSI) GetBigValue(columnID uint64) (value *big.Int, exists bool) {
|
|
exists = b.eBM.Contains(columnID)
|
|
if !exists {
|
|
return
|
|
}
|
|
val := big.NewInt(0)
|
|
for i := b.BitCount(); i >= 0; i-- {
|
|
if b.bA[i].Contains(columnID) {
|
|
bigBit := big.NewInt(1)
|
|
bigBit.Lsh(bigBit, uint(i))
|
|
val.Or(val, bigBit)
|
|
}
|
|
}
|
|
|
|
if b.IsNegative(columnID) {
|
|
val = negativeTwosComplementToInt(val)
|
|
}
|
|
return val, exists
|
|
}
|
|
|
|
func negativeTwosComplementToInt(val *big.Int) *big.Int {
|
|
inverted := new(big.Int).Not(val)
|
|
mask := new(big.Int).Lsh(big.NewInt(1), uint(val.BitLen()))
|
|
inverted.And(inverted, mask.Sub(mask, big.NewInt(1)))
|
|
inverted.Add(inverted, big.NewInt(1))
|
|
val.Neg(inverted)
|
|
return val
|
|
}
|
|
|
|
type action func(t *task, batch []uint64, resultsChan chan *Bitmap, wg *sync.WaitGroup)
|
|
|
|
func parallelExecutor(parallelism int, t *task, e action, foundSet *Bitmap) *Bitmap {
|
|
|
|
var n int = parallelism
|
|
if n == 0 {
|
|
n = runtime.NumCPU()
|
|
}
|
|
|
|
resultsChan := make(chan *Bitmap, n)
|
|
|
|
card := foundSet.GetCardinality()
|
|
x := card / uint64(n)
|
|
|
|
remainder := card - (x * uint64(n))
|
|
var batch []uint64
|
|
var wg sync.WaitGroup
|
|
iter := foundSet.ManyIterator()
|
|
for i := 0; i < n; i++ {
|
|
if i == n-1 {
|
|
batch = make([]uint64, x+remainder)
|
|
} else {
|
|
batch = make([]uint64, x)
|
|
}
|
|
iter.NextMany(batch)
|
|
wg.Add(1)
|
|
go e(t, batch, resultsChan, &wg)
|
|
}
|
|
|
|
wg.Wait()
|
|
|
|
close(resultsChan)
|
|
|
|
ba := make([]*Bitmap, 0)
|
|
for bm := range resultsChan {
|
|
ba = append(ba, bm)
|
|
}
|
|
|
|
return ParOr(0, ba...)
|
|
|
|
}
|
|
|
|
type bsiAction func(input *BSI, filterSet *Bitmap, batch []uint64, resultsChan chan *BSI, wg *sync.WaitGroup)
|
|
|
|
func parallelExecutorBSIResults(parallelism int, input *BSI, e bsiAction, foundSet, filterSet *Bitmap, sumResults bool) *BSI {
|
|
|
|
var n int = parallelism
|
|
if n == 0 {
|
|
n = runtime.NumCPU()
|
|
}
|
|
|
|
resultsChan := make(chan *BSI, n)
|
|
|
|
card := foundSet.GetCardinality()
|
|
x := card / uint64(n)
|
|
|
|
remainder := card - (x * uint64(n))
|
|
var batch []uint64
|
|
var wg sync.WaitGroup
|
|
iter := foundSet.ManyIterator()
|
|
for i := 0; i < n; i++ {
|
|
if i == n-1 {
|
|
batch = make([]uint64, x+remainder)
|
|
} else {
|
|
batch = make([]uint64, x)
|
|
}
|
|
iter.NextMany(batch)
|
|
wg.Add(1)
|
|
go e(input, filterSet, batch, resultsChan, &wg)
|
|
}
|
|
|
|
wg.Wait()
|
|
|
|
close(resultsChan)
|
|
|
|
ba := make([]*BSI, 0)
|
|
for bm := range resultsChan {
|
|
ba = append(ba, bm)
|
|
}
|
|
|
|
results := NewDefaultBSI()
|
|
if sumResults {
|
|
for _, v := range ba {
|
|
results.Add(v)
|
|
}
|
|
} else {
|
|
results.ParOr(0, ba...)
|
|
}
|
|
return results
|
|
|
|
}
|
|
|
|
// Operation identifier
|
|
type Operation int
|
|
|
|
const (
|
|
// LT less than
|
|
LT Operation = 1 + iota
|
|
// LE less than or equal
|
|
LE
|
|
// EQ equal
|
|
EQ
|
|
// GE greater than or equal
|
|
GE
|
|
// GT greater than
|
|
GT
|
|
// RANGE range
|
|
RANGE
|
|
// MIN find minimum
|
|
MIN
|
|
// MAX find maximum
|
|
MAX
|
|
)
|
|
|
|
type task struct {
|
|
bsi *BSI
|
|
op Operation
|
|
valueOrStart *big.Int
|
|
end *big.Int
|
|
values map[string]struct{}
|
|
bits *Bitmap
|
|
}
|
|
|
|
// CompareValue compares value.
|
|
// Values should be in the range of the BSI (max, min). If the value is outside the range, the result
|
|
// might erroneous. The operation parameter indicates the type of comparison to be made.
|
|
// For all operations with the exception of RANGE, the value to be compared is specified by valueOrStart.
|
|
// For the RANGE parameter the comparison criteria is >= valueOrStart and <= end.
|
|
// The parallelism parameter indicates the number of CPU threads to be applied for processing. A value
|
|
// of zero indicates that all available CPU resources will be potentially utilized.
|
|
func (b *BSI) CompareValue(parallelism int, op Operation, valueOrStart, end int64,
|
|
foundSet *Bitmap) *Bitmap {
|
|
|
|
return b.CompareBigValue(parallelism, op, big.NewInt(valueOrStart), big.NewInt(end), foundSet)
|
|
}
|
|
|
|
// CompareBigValue compares value.
|
|
// Values should be in the range of the BSI (max, min). If the value is outside the range, the result
|
|
// might erroneous. The operation parameter indicates the type of comparison to be made.
|
|
// For all operations with the exception of RANGE, the value to be compared is specified by valueOrStart.
|
|
// For the RANGE parameter the comparison criteria is >= valueOrStart and <= end.
|
|
// The parallelism parameter indicates the number of CPU threads to be applied for processing. A value
|
|
// of zero indicates that all available CPU resources will be potentially utilized.
|
|
func (b *BSI) CompareBigValue(parallelism int, op Operation, valueOrStart, end *big.Int,
|
|
foundSet *Bitmap) *Bitmap {
|
|
|
|
if valueOrStart == nil {
|
|
valueOrStart = b.MinMaxBig(parallelism, MIN, &b.eBM)
|
|
}
|
|
if end == nil && op == RANGE {
|
|
end = b.MinMaxBig(parallelism, MAX, &b.eBM)
|
|
}
|
|
|
|
comp := &task{bsi: b, op: op, valueOrStart: valueOrStart, end: end}
|
|
if foundSet == nil {
|
|
return parallelExecutor(parallelism, comp, compareValue, &b.eBM)
|
|
}
|
|
return parallelExecutor(parallelism, comp, compareValue, foundSet)
|
|
}
|
|
|
|
// Returns a twos complement value given a value, the return will be bit extended to 'bits' length
|
|
// if the value is negative
|
|
func twosComplement(num *big.Int, bitCount int) *big.Int {
|
|
// Check if the number is negative
|
|
isNegative := num.Sign() < 0
|
|
|
|
// Get the absolute value if negative
|
|
abs := new(big.Int).Abs(num)
|
|
|
|
// Convert to binary string
|
|
binStr := abs.Text(2)
|
|
|
|
// Pad with zeros to the left
|
|
if len(binStr) < bitCount {
|
|
binStr = fmt.Sprintf("%0*s", bitCount, binStr)
|
|
}
|
|
|
|
// If negative, calculate two's complement
|
|
if isNegative {
|
|
// Invert bits
|
|
inverted := make([]byte, len(binStr))
|
|
for i := range binStr {
|
|
if binStr[i] == '0' {
|
|
inverted[i] = '1'
|
|
} else {
|
|
inverted[i] = '0'
|
|
}
|
|
}
|
|
|
|
// Add 1
|
|
carry := byte(1)
|
|
for i := len(inverted) - 1; i >= 0; i-- {
|
|
inverted[i] += carry
|
|
if inverted[i] == '2' {
|
|
inverted[i] = '0'
|
|
} else {
|
|
break
|
|
}
|
|
}
|
|
binStr = string(inverted)
|
|
}
|
|
|
|
bigInt := new(big.Int)
|
|
_, _ = bigInt.SetString(binStr, 2)
|
|
return bigInt
|
|
}
|
|
|
|
func compareValue(e *task, batch []uint64, resultsChan chan *Bitmap, wg *sync.WaitGroup) {
|
|
|
|
defer wg.Done()
|
|
|
|
results := NewBitmap()
|
|
if e.bsi.runOptimized {
|
|
results.RunOptimize()
|
|
}
|
|
|
|
startIsNegative := e.valueOrStart.Sign() == -1
|
|
endIsNegative := true
|
|
if e.end != nil {
|
|
endIsNegative = e.end.Sign() == -1
|
|
}
|
|
|
|
for i := 0; i < len(batch); i++ {
|
|
cID := batch[i]
|
|
eq1, eq2 := true, true
|
|
lt1, lt2, gt1 := false, false, false
|
|
j := e.bsi.BitCount()
|
|
isNegative := e.bsi.IsNegative(cID)
|
|
compStartValue := e.valueOrStart
|
|
compEndValue := e.end
|
|
if isNegative != startIsNegative {
|
|
compStartValue = twosComplement(e.valueOrStart, e.bsi.BitCount()+1)
|
|
}
|
|
if isNegative != endIsNegative && e.end != nil {
|
|
compEndValue = twosComplement(e.end, e.bsi.BitCount()+1)
|
|
}
|
|
|
|
for ; j >= 0; j-- {
|
|
sliceContainsBit := e.bsi.bA[j].Contains(cID)
|
|
|
|
if compStartValue.Bit(j) == 1 {
|
|
// BIT in value is SET
|
|
if !sliceContainsBit {
|
|
if eq1 {
|
|
if (e.op == GT || e.op == GE || e.op == RANGE) && startIsNegative && !isNegative {
|
|
gt1 = true
|
|
}
|
|
if e.op == LT || e.op == LE {
|
|
if !startIsNegative || (startIsNegative == isNegative) {
|
|
lt1 = true
|
|
}
|
|
}
|
|
eq1 = false
|
|
if e.op != RANGE {
|
|
break
|
|
}
|
|
}
|
|
}
|
|
} else {
|
|
// BIT in value is CLEAR
|
|
if sliceContainsBit {
|
|
if eq1 {
|
|
if (e.op == LT || e.op == LE) && isNegative && !startIsNegative {
|
|
lt1 = true
|
|
}
|
|
if e.op == GT || e.op == GE || e.op == RANGE {
|
|
if startIsNegative || (startIsNegative == isNegative) {
|
|
gt1 = true
|
|
}
|
|
}
|
|
eq1 = false
|
|
|
|
if e.op != RANGE {
|
|
break
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
if e.op == RANGE && compEndValue.Bit(j) == 1 {
|
|
// BIT in value is SET
|
|
if !sliceContainsBit {
|
|
if eq2 {
|
|
if !endIsNegative || (endIsNegative == isNegative) {
|
|
lt2 = true
|
|
}
|
|
eq2 = false
|
|
if startIsNegative && !endIsNegative {
|
|
break
|
|
}
|
|
}
|
|
}
|
|
} else if e.op == RANGE {
|
|
// BIT in value is CLEAR
|
|
if sliceContainsBit {
|
|
if eq2 {
|
|
if isNegative && !endIsNegative {
|
|
lt2 = true
|
|
}
|
|
eq2 = false
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
switch e.op {
|
|
case LT:
|
|
if lt1 {
|
|
results.Add(cID)
|
|
}
|
|
case LE:
|
|
if lt1 || (eq1 && (!startIsNegative || (startIsNegative && isNegative))) {
|
|
results.Add(cID)
|
|
}
|
|
case EQ:
|
|
if eq1 {
|
|
results.Add(cID)
|
|
}
|
|
case GE:
|
|
if gt1 || (eq1 && (startIsNegative || (!startIsNegative && !isNegative))) {
|
|
results.Add(cID)
|
|
}
|
|
case GT:
|
|
if gt1 {
|
|
results.Add(cID)
|
|
}
|
|
case RANGE:
|
|
if (eq1 || gt1) && (eq2 || lt2) {
|
|
results.Add(cID)
|
|
}
|
|
default:
|
|
panic(fmt.Sprintf("Operation [%v] not supported here", e.op))
|
|
}
|
|
}
|
|
|
|
resultsChan <- results
|
|
}
|
|
|
|
// MinMax - Find minimum or maximum int64 value.
|
|
func (b *BSI) MinMax(parallelism int, op Operation, foundSet *Bitmap) int64 {
|
|
return b.MinMaxBig(parallelism, op, foundSet).Int64()
|
|
}
|
|
|
|
// MinMaxBig - Find minimum or maximum value.
|
|
func (b *BSI) MinMaxBig(parallelism int, op Operation, foundSet *Bitmap) *big.Int {
|
|
|
|
var n int = parallelism
|
|
if n == 0 {
|
|
n = runtime.NumCPU()
|
|
}
|
|
|
|
resultsChan := make(chan *big.Int, n)
|
|
|
|
if foundSet == nil {
|
|
foundSet = &b.eBM
|
|
}
|
|
|
|
card := foundSet.GetCardinality()
|
|
x := card / uint64(n)
|
|
|
|
remainder := card - (x * uint64(n))
|
|
var batch []uint64
|
|
var wg sync.WaitGroup
|
|
iter := foundSet.ManyIterator()
|
|
for i := 0; i < n; i++ {
|
|
if i == n-1 {
|
|
batch = make([]uint64, x+remainder)
|
|
} else {
|
|
batch = make([]uint64, x)
|
|
}
|
|
iter.NextMany(batch)
|
|
wg.Add(1)
|
|
go b.minOrMax(op, batch, resultsChan, &wg)
|
|
}
|
|
|
|
wg.Wait()
|
|
|
|
close(resultsChan)
|
|
var minMax *big.Int
|
|
minSigned, maxSigned := minMaxSignedInt(b.BitCount() + 1)
|
|
if op == MAX {
|
|
minMax = minSigned
|
|
} else {
|
|
minMax = maxSigned
|
|
}
|
|
|
|
for val := range resultsChan {
|
|
if (op == MAX && val.Cmp(minMax) > 0) || (op == MIN && val.Cmp(minMax) <= 0) {
|
|
minMax = val
|
|
}
|
|
}
|
|
return minMax
|
|
}
|
|
|
|
func minMaxSignedInt(bits int) (*big.Int, *big.Int) {
|
|
// Calculate the maximum value
|
|
max := new(big.Int).Lsh(big.NewInt(1), uint(bits-1))
|
|
max.Sub(max, big.NewInt(1))
|
|
|
|
// Calculate the minimum value
|
|
min := new(big.Int).Neg(max)
|
|
min.Sub(min, big.NewInt(1))
|
|
|
|
return min, max
|
|
}
|
|
|
|
func (b *BSI) minOrMax(op Operation, batch []uint64, resultsChan chan *big.Int, wg *sync.WaitGroup) {
|
|
|
|
defer wg.Done()
|
|
|
|
x := b.BitCount() + 1
|
|
var value *big.Int
|
|
minSigned, maxSigned := minMaxSignedInt(x)
|
|
if op == MAX {
|
|
value = minSigned
|
|
} else {
|
|
value = maxSigned
|
|
}
|
|
|
|
for i := 0; i < len(batch); i++ {
|
|
cID := batch[i]
|
|
eq := true
|
|
lt, gt := false, false
|
|
j := b.BitCount()
|
|
cVal := new(big.Int)
|
|
valueIsNegative := value.Sign() == -1
|
|
isNegative := b.IsNegative(cID)
|
|
|
|
compValue := value
|
|
if isNegative != valueIsNegative {
|
|
// convert compValue to twos complement
|
|
inverted := new(big.Int).Not(compValue)
|
|
mask := new(big.Int).Lsh(big.NewInt(1), uint(compValue.BitLen()))
|
|
inverted.And(inverted, mask.Sub(mask, big.NewInt(1)))
|
|
inverted.Add(inverted, big.NewInt(1))
|
|
}
|
|
|
|
done := false
|
|
for ; j >= 0; j-- {
|
|
sliceContainsBit := b.bA[j].Contains(cID)
|
|
if sliceContainsBit {
|
|
bigBit := big.NewInt(1)
|
|
bigBit.Lsh(bigBit, uint(j))
|
|
cVal.Or(cVal, bigBit)
|
|
if isNegative {
|
|
cVal = negativeTwosComplementToInt(cVal)
|
|
}
|
|
}
|
|
if done {
|
|
continue
|
|
}
|
|
if compValue.Bit(j) == 1 {
|
|
// BIT in value is SET
|
|
if !sliceContainsBit {
|
|
if eq {
|
|
eq = false
|
|
if op == MAX && valueIsNegative && !isNegative {
|
|
gt = true
|
|
done = true
|
|
}
|
|
if op == MIN && (!valueIsNegative || (valueIsNegative == isNegative)) {
|
|
lt = true
|
|
}
|
|
}
|
|
}
|
|
} else {
|
|
// BIT in value is CLEAR
|
|
if sliceContainsBit {
|
|
if eq {
|
|
eq = false
|
|
if op == MIN && isNegative && !valueIsNegative {
|
|
lt = true
|
|
}
|
|
if op == MAX && (valueIsNegative || (valueIsNegative == isNegative)) {
|
|
gt = true
|
|
done = true
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
if lt || gt {
|
|
value = cVal
|
|
}
|
|
}
|
|
|
|
resultsChan <- value
|
|
}
|
|
|
|
// Sum all values contained within the foundSet. As a convenience, the cardinality of the foundSet
|
|
// is also returned (for calculating the average).
|
|
func (b *BSI) Sum(foundSet *Bitmap) (int64, uint64) {
|
|
val, count := b.SumBigValues(foundSet)
|
|
return val.Int64(), count
|
|
}
|
|
|
|
// SumBigValues - Sum all values contained within the foundSet. As a convenience, the cardinality of the foundSet
|
|
// is also returned (for calculating the average). This method will sum arbitrarily large values.
|
|
func (b *BSI) SumBigValues(foundSet *Bitmap) (sum *big.Int, count uint64) {
|
|
if foundSet == nil {
|
|
foundSet = &b.eBM
|
|
}
|
|
sum = new(big.Int)
|
|
count = foundSet.GetCardinality()
|
|
resultsChan := make(chan int64, b.BitCount())
|
|
var wg sync.WaitGroup
|
|
for i := 0; i < b.BitCount(); i++ {
|
|
wg.Add(1)
|
|
go func(j int) {
|
|
defer wg.Done()
|
|
resultsChan <- int64(foundSet.AndCardinality(&b.bA[j]) << uint(j))
|
|
}(i)
|
|
}
|
|
wg.Wait()
|
|
close(resultsChan)
|
|
|
|
for val := range resultsChan {
|
|
sum.Add(sum, big.NewInt(val))
|
|
}
|
|
sum.Sub(sum, big.NewInt(int64(foundSet.AndCardinality(&b.bA[b.BitCount()])<<uint(b.BitCount()))))
|
|
|
|
return sum, count
|
|
}
|
|
|
|
// Transpose calls b.IntersectAndTranspose(0, b.eBM)
|
|
func (b *BSI) Transpose() *Bitmap {
|
|
return b.IntersectAndTranspose(0, &b.eBM)
|
|
}
|
|
|
|
// IntersectAndTranspose is a matrix transpose function. Return a bitmap such that the values are represented as column IDs
|
|
// in the returned bitmap. This is accomplished by iterating over the foundSet and only including
|
|
// the column IDs in the source (foundSet) as compared with this BSI. This can be useful for
|
|
// vectoring one set of integers to another.
|
|
//
|
|
// TODO: This implementation is functional but not performant, needs to be re-written perhaps using SIMD SSE2 instructions.
|
|
func (b *BSI) IntersectAndTranspose(parallelism int, foundSet *Bitmap) *Bitmap {
|
|
if foundSet == nil {
|
|
foundSet = &b.eBM
|
|
}
|
|
trans := &task{bsi: b}
|
|
return parallelExecutor(parallelism, trans, transpose, foundSet)
|
|
}
|
|
|
|
func transpose(e *task, batch []uint64, resultsChan chan *Bitmap, wg *sync.WaitGroup) {
|
|
|
|
defer wg.Done()
|
|
|
|
results := NewBitmap()
|
|
if e.bsi.runOptimized {
|
|
results.RunOptimize()
|
|
}
|
|
for _, cID := range batch {
|
|
if value, ok := e.bsi.GetValue(uint64(cID)); ok {
|
|
results.Add(uint64(value))
|
|
}
|
|
}
|
|
resultsChan <- results
|
|
}
|
|
|
|
// ParOr is intended primarily to be a concatenation function to be used during bulk load operations.
|
|
// Care should be taken to make sure that columnIDs do not overlap (unless overlapping values are
|
|
// identical).
|
|
func (b *BSI) ParOr(parallelism int, bsis ...*BSI) {
|
|
|
|
// Consolidate sets
|
|
bits := len(b.bA)
|
|
for i := 0; i < len(bsis); i++ {
|
|
if len(bsis[i].bA) > bits {
|
|
bits = len(bsis[i].bA )
|
|
}
|
|
}
|
|
|
|
// Make sure we have enough bit slices
|
|
for bits > len(b.bA) {
|
|
bm := Bitmap{}
|
|
bm.RunOptimize()
|
|
b.bA = append(b.bA, bm)
|
|
}
|
|
|
|
a := make([][]*Bitmap, bits)
|
|
for i := range a {
|
|
a[i] = make([]*Bitmap, 0)
|
|
for _, x := range bsis {
|
|
if len(x.bA) > i {
|
|
a[i] = append(a[i], &x.bA[i])
|
|
} else {
|
|
if b.runOptimized {
|
|
a[i][0].RunOptimize()
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
// Consolidate existence bit maps
|
|
ebms := make([]*Bitmap, len(bsis))
|
|
for i := range ebms {
|
|
ebms[i] = &bsis[i].eBM
|
|
}
|
|
|
|
// First merge all the bit slices from all bsi maps that exist in target
|
|
var wg sync.WaitGroup
|
|
for i := 0; i < bits; i++ {
|
|
wg.Add(1)
|
|
go func(j int) {
|
|
defer wg.Done()
|
|
x := []*Bitmap{&b.bA[j]}
|
|
x = append(x, a[j]...)
|
|
b.bA[j] = *ParOr(parallelism, x...)
|
|
}(i)
|
|
}
|
|
wg.Wait()
|
|
|
|
// merge all the EBM maps
|
|
x := []*Bitmap{&b.eBM}
|
|
x = append(x, ebms...)
|
|
b.eBM = *ParOr(parallelism, x...)
|
|
}
|
|
|
|
// UnmarshalBinary de-serialize a BSI. The value at bitData[0] is the EBM. Other indices are in least to most
|
|
// significance order starting at bitData[1] (bit position 0).
|
|
func (b *BSI) UnmarshalBinary(bitData [][]byte) error {
|
|
|
|
for i := 1; i < len(bitData); i++ {
|
|
if bitData == nil || len(bitData[i]) == 0 {
|
|
continue
|
|
}
|
|
if b.BitCount() < i {
|
|
newBm := Bitmap{}
|
|
if b.runOptimized {
|
|
newBm.RunOptimize()
|
|
}
|
|
b.bA = append(b.bA, newBm)
|
|
}
|
|
if err := b.bA[i-1].UnmarshalBinary(bitData[i]); err != nil {
|
|
return err
|
|
}
|
|
if b.runOptimized {
|
|
b.bA[i-1].RunOptimize()
|
|
}
|
|
|
|
}
|
|
// First element of bitData is the EBM
|
|
if bitData[0] == nil {
|
|
b.eBM = Bitmap{}
|
|
if b.runOptimized {
|
|
b.eBM.RunOptimize()
|
|
}
|
|
return nil
|
|
}
|
|
if err := b.eBM.UnmarshalBinary(bitData[0]); err != nil {
|
|
return err
|
|
}
|
|
if b.runOptimized {
|
|
b.eBM.RunOptimize()
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// ReadFrom reads a serialized version of this BSI from stream.
|
|
func (b *BSI) ReadFrom(stream io.Reader) (p int64, err error) {
|
|
bm, n, err := readBSIContainerFromStream(stream)
|
|
p += n
|
|
if err != nil {
|
|
err = fmt.Errorf("reading existence bitmap: %w", err)
|
|
return
|
|
}
|
|
b.eBM = bm
|
|
b.bA = b.bA[:0]
|
|
for {
|
|
// This forces a new memory location to be allocated and if we're lucky it only escapes if
|
|
// there's no error.
|
|
var bm Bitmap
|
|
bm, n, err = readBSIContainerFromStream(stream)
|
|
p += n
|
|
if err == io.EOF {
|
|
err = nil
|
|
return
|
|
}
|
|
if err != nil {
|
|
err = fmt.Errorf("reading bit slice index %v: %w", len(b.bA), err)
|
|
return
|
|
}
|
|
b.bA = append(b.bA, bm)
|
|
}
|
|
}
|
|
|
|
func readBSIContainerFromStream(r io.Reader) (bm Bitmap, p int64, err error) {
|
|
p, err = bm.ReadFrom(r)
|
|
return
|
|
}
|
|
|
|
// MarshalBinary serializes a BSI
|
|
func (b *BSI) MarshalBinary() ([][]byte, error) {
|
|
|
|
var err error
|
|
data := make([][]byte, b.BitCount()+1)
|
|
// Add extra element for EBM (BitCount() + 1)
|
|
for i := 1; i < b.BitCount()+1; i++ {
|
|
data[i], err = b.bA[i-1].MarshalBinary()
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
}
|
|
// Marshal EBM
|
|
data[0], err = b.eBM.MarshalBinary()
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
return data, nil
|
|
}
|
|
|
|
// WriteTo writes a serialized version of this BSI to stream.
|
|
func (b *BSI) WriteTo(w io.Writer) (n int64, err error) {
|
|
n1, err := b.eBM.WriteTo(w)
|
|
n += n1
|
|
if err != nil {
|
|
return
|
|
}
|
|
for _, bm := range b.bA {
|
|
n1, err = bm.WriteTo(w)
|
|
n += n1
|
|
if err != nil {
|
|
return
|
|
}
|
|
}
|
|
return
|
|
}
|
|
|
|
// BatchEqual returns a bitmap containing the column IDs where the values are contained within the list of values provided.
|
|
func (b *BSI) BatchEqual(parallelism int, values []int64) *Bitmap {
|
|
//convert list of int64 values to big.Int(s)
|
|
bigValues := make([]*big.Int, len(values))
|
|
for i, v := range values {
|
|
bigValues[i] = big.NewInt(v)
|
|
}
|
|
return b.BatchEqualBig(parallelism, bigValues)
|
|
}
|
|
|
|
// BatchEqualBig returns a bitmap containing the column IDs where the values are contained within the list of values provided.
|
|
func (b *BSI) BatchEqualBig(parallelism int, values []*big.Int) *Bitmap {
|
|
|
|
valMap := make(map[string]struct{}, len(values))
|
|
for i := 0; i < len(values); i++ {
|
|
valMap[string(values[i].Bytes())] = struct{}{}
|
|
}
|
|
comp := &task{bsi: b, values: valMap}
|
|
return parallelExecutor(parallelism, comp, batchEqual, &b.eBM)
|
|
}
|
|
|
|
func batchEqual(e *task, batch []uint64, resultsChan chan *Bitmap,
|
|
wg *sync.WaitGroup) {
|
|
|
|
defer wg.Done()
|
|
|
|
results := NewBitmap()
|
|
if e.bsi.runOptimized {
|
|
results.RunOptimize()
|
|
}
|
|
|
|
for i := 0; i < len(batch); i++ {
|
|
cID := batch[i]
|
|
if value, ok := e.bsi.GetBigValue(uint64(cID)); ok {
|
|
if _, yes := e.values[string(value.Bytes())]; yes {
|
|
results.Add(cID)
|
|
}
|
|
}
|
|
}
|
|
resultsChan <- results
|
|
}
|
|
|
|
// ClearBits cleared the bits that exist in the target if they are also in the found set.
|
|
func ClearBits(foundSet, target *Bitmap) {
|
|
iter := foundSet.Iterator()
|
|
for iter.HasNext() {
|
|
cID := iter.Next()
|
|
target.Remove(cID)
|
|
}
|
|
}
|
|
|
|
// ClearValues removes the values found in foundSet
|
|
func (b *BSI) ClearValues(foundSet *Bitmap) {
|
|
|
|
var wg sync.WaitGroup
|
|
wg.Add(1)
|
|
go func() {
|
|
defer wg.Done()
|
|
ClearBits(foundSet, &b.eBM)
|
|
}()
|
|
for i := 0; i < b.BitCount(); i++ {
|
|
wg.Add(1)
|
|
go func(j int) {
|
|
defer wg.Done()
|
|
ClearBits(foundSet, &b.bA[j])
|
|
}(i)
|
|
}
|
|
wg.Wait()
|
|
}
|
|
|
|
// NewBSIRetainSet - Construct a new BSI from a clone of existing BSI, retain only values contained in foundSet
|
|
func (b *BSI) NewBSIRetainSet(foundSet *Bitmap) *BSI {
|
|
|
|
newBSI := NewDefaultBSI()
|
|
newBSI.bA = make([]Bitmap, b.BitCount()+1)
|
|
var wg sync.WaitGroup
|
|
wg.Add(1)
|
|
go func() {
|
|
defer wg.Done()
|
|
newBSI.eBM = *b.eBM.Clone()
|
|
newBSI.eBM.And(foundSet)
|
|
}()
|
|
for i := 0; i < b.BitCount(); i++ {
|
|
wg.Add(1)
|
|
go func(j int) {
|
|
defer wg.Done()
|
|
newBSI.bA[j] = *b.bA[j].Clone()
|
|
newBSI.bA[j].And(foundSet)
|
|
}(i)
|
|
}
|
|
wg.Wait()
|
|
return newBSI
|
|
}
|
|
|
|
// Clone performs a deep copy of BSI contents.
|
|
func (b *BSI) Clone() *BSI {
|
|
return b.NewBSIRetainSet(&b.eBM)
|
|
}
|
|
|
|
// Add - In-place sum the contents of another BSI with this BSI, column wise.
|
|
func (b *BSI) Add(other *BSI) {
|
|
|
|
b.eBM.Or(&other.eBM)
|
|
for i := 0; i < len(other.bA); i++ {
|
|
b.addDigit(&other.bA[i], i)
|
|
}
|
|
}
|
|
|
|
func (b *BSI) addDigit(foundSet *Bitmap, i int) {
|
|
|
|
if i >= b.BitCount()+1 || b.BitCount() == 0 {
|
|
b.bA = append(b.bA, Bitmap{})
|
|
}
|
|
carry := And(&b.bA[i], foundSet)
|
|
b.bA[i].Xor(foundSet)
|
|
if !carry.IsEmpty() {
|
|
if i+1 >= b.BitCount() {
|
|
b.bA = append(b.bA, Bitmap{})
|
|
}
|
|
b.addDigit(carry, i+1)
|
|
}
|
|
}
|
|
|
|
// TransposeWithCounts is a matrix transpose function that returns a BSI that has a columnID system defined by the values
|
|
// contained within the input BSI. Given that for BSIs, different columnIDs can have the same value. TransposeWithCounts
|
|
// is useful for situations where there is a one-to-many relationship between the vectored integer sets. The resulting BSI
|
|
// contains the number of times a particular value appeared in the input BSI.
|
|
func (b *BSI) TransposeWithCounts(parallelism int, foundSet, filterSet *Bitmap) *BSI {
|
|
if foundSet == nil {
|
|
foundSet = &b.eBM
|
|
}
|
|
if filterSet == nil {
|
|
filterSet = &b.eBM
|
|
}
|
|
return parallelExecutorBSIResults(parallelism, b, transposeWithCounts, foundSet, filterSet, true)
|
|
}
|
|
|
|
func transposeWithCounts(input *BSI, filterSet *Bitmap, batch []uint64, resultsChan chan *BSI, wg *sync.WaitGroup) {
|
|
|
|
defer wg.Done()
|
|
|
|
results := NewDefaultBSI()
|
|
if input.runOptimized {
|
|
results.RunOptimize()
|
|
}
|
|
for _, cID := range batch {
|
|
if value, ok := input.GetValue(uint64(cID)); ok {
|
|
if !filterSet.Contains(uint64(value)) {
|
|
continue
|
|
}
|
|
if val, ok2 := results.GetValue(uint64(value)); !ok2 {
|
|
results.SetValue(uint64(value), 1)
|
|
} else {
|
|
val++
|
|
results.SetValue(uint64(value), val)
|
|
}
|
|
}
|
|
}
|
|
resultsChan <- results
|
|
}
|
|
|
|
// Increment - In-place increment of values in a BSI. Found set select columns for incrementing.
|
|
func (b *BSI) Increment(foundSet *Bitmap) {
|
|
if foundSet == nil {
|
|
foundSet = &b.eBM
|
|
}
|
|
b.addDigit(foundSet, 0)
|
|
b.eBM.Or(foundSet)
|
|
}
|
|
|
|
// IncrementAll - In-place increment of all values in a BSI.
|
|
func (b *BSI) IncrementAll() {
|
|
b.Increment(b.GetExistenceBitmap())
|
|
}
|
|
|
|
// Equals - Check for semantic equality of two BSIs.
|
|
func (b *BSI) Equals(other *BSI) bool {
|
|
if !b.eBM.Equals(&other.eBM) {
|
|
return false
|
|
}
|
|
for i := 0; i < len(b.bA) || i < len(other.bA); i++ {
|
|
if i >= len(b.bA) {
|
|
if !other.bA[i].IsEmpty() {
|
|
return false
|
|
}
|
|
} else if i >= len(other.bA) {
|
|
if !b.bA[i].IsEmpty() {
|
|
return false
|
|
}
|
|
} else {
|
|
if !b.bA[i].Equals(&other.bA[i]) {
|
|
return false
|
|
}
|
|
}
|
|
}
|
|
return true
|
|
}
|
|
|
|
// GetSizeInBytes - the size in bytes of the data structure
|
|
func (b *BSI) GetSizeInBytes() int {
|
|
size := b.eBM.GetSizeInBytes()
|
|
for _, bm := range b.bA {
|
|
size += bm.GetSizeInBytes()
|
|
}
|
|
return int(size)
|
|
}
|