Integrate BACKBEAT SDK and resolve KACHING license validation

Major integrations and fixes:
- Added BACKBEAT SDK integration for P2P operation timing
- Implemented beat-aware status tracking for distributed operations
- Added Docker secrets support for secure license management
- Resolved KACHING license validation via HTTPS/TLS
- Updated docker-compose configuration for clean stack deployment
- Disabled rollback policies to prevent deployment failures
- Added license credential storage (CHORUS-DEV-MULTI-001)

Technical improvements:
- BACKBEAT P2P operation tracking with phase management
- Enhanced configuration system with file-based secrets
- Improved error handling for license validation
- Clean separation of KACHING and CHORUS deployment stacks

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <noreply@anthropic.com>
This commit is contained in:
anthonyrawlins
2025-09-06 07:56:26 +10:00
parent 543ab216f9
commit 9bdcbe0447
4730 changed files with 1480093 additions and 1916 deletions

View File

@@ -0,0 +1,2 @@
github_checks:
annotations: false

5
vendor/github.com/libp2p/go-libp2p-pubsub/.gitignore generated vendored Normal file
View File

@@ -0,0 +1,5 @@
cover.out
prof.out
go-floodsub.test
.idea/

8
vendor/github.com/libp2p/go-libp2p-pubsub/LICENSE generated vendored Normal file
View File

@@ -0,0 +1,8 @@
This project is transitioning from an MIT-only license to a dual MIT/Apache-2.0 license.
Unless otherwise noted, all code contributed prior to 2019-05-06 and not contributed by
a user listed in [this signoff issue](https://github.com/ipfs/go-ipfs/issues/6302) is
licensed under MIT-only. All new contributions (and past contributions since 2019-05-06)
are licensed under a dual MIT/Apache-2.0 license.
MIT: https://www.opensource.org/licenses/mit
Apache-2.0: https://www.apache.org/licenses/license-2.0

View File

@@ -0,0 +1,5 @@
Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.

19
vendor/github.com/libp2p/go-libp2p-pubsub/LICENSE-MIT generated vendored Normal file
View File

@@ -0,0 +1,19 @@
The MIT License (MIT)
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in
all copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
THE SOFTWARE.

155
vendor/github.com/libp2p/go-libp2p-pubsub/README.md generated vendored Normal file
View File

@@ -0,0 +1,155 @@
# go-libp2p-pubsub
<p align="left">
<a href="http://protocol.ai"><img src="https://img.shields.io/badge/made%20by-Protocol%20Labs-blue.svg?style=flat-square" /></a>
<a href="http://libp2p.io/"><img src="https://img.shields.io/badge/project-libp2p-yellow.svg?style=flat-square" /></a>
<a href="http://webchat.freenode.net/?channels=%23libp2p"><img src="https://img.shields.io/badge/freenode-%23libp2p-yellow.svg?style=flat-square" /></a>
<a href="https://discuss.libp2p.io"><img src="https://img.shields.io/discourse/https/discuss.libp2p.io/posts.svg?style=flat-square"/></a>
</p>
<p align="left">
<a href="https://codecov.io/gh/libp2p/go-libp2p-pubsub"><img src="https://codecov.io/gh/libp2p/go-libp2p-pubsub/branch/master/graph/badge.svg"></a>
<a href="https://goreportcard.com/report/github.com/libp2p/go-libp2p-pubsub"><img src="https://goreportcard.com/badge/github.com/libp2p/go-libp2p-pubsub" /></a>
<a href="https://github.com/RichardLitt/standard-readme"><img src="https://img.shields.io/badge/readme%20style-standard-brightgreen.svg?style=flat-square" /></a>
<a href="https://godoc.org/github.com/libp2p/go-libp2p-pubsub"><img src="http://img.shields.io/badge/godoc-reference-5272B4.svg?style=flat-square" /></a>
<a href=""><img src="https://img.shields.io/badge/golang-%3E%3D1.14.0-orange.svg?style=flat-square" /></a>
<br>
</p>
This repo contains the canonical pubsub implementation for libp2p. We currently provide three message router options:
- Floodsub, which is the baseline flooding protocol.
- Randomsub, which is a simple probabilistic router that propagates to random subsets of peers.
- Gossipsub, which is a more advanced router with mesh formation and gossip propagation. See [spec](https://github.com/libp2p/specs/tree/master/pubsub/gossipsub) and [implementation](https://github.com/libp2p/go-libp2p-pubsub/blob/master/gossipsub.go) for more details.
## Repo Lead Maintainer
[@vyzo](https://github.com/vyzo/)
> This repo follows the [Repo Lead Maintainer Protocol](https://github.com/ipfs/team-mgmt/blob/master/LEAD_MAINTAINER_PROTOCOL.md)
## Table of Contents
<!-- START doctoc generated TOC please keep comment here to allow auto update -->
<!-- DON'T EDIT THIS SECTION, INSTEAD RE-RUN doctoc TO UPDATE -->
- [Install](#install)
- [Usage](#usage)
- [Example](#example)
- [Documentation](#documentation)
- [Tracing](#tracing)
- [Contribute](#contribute)
- [License](#license)
<!-- END doctoc generated TOC please keep comment here to allow auto update -->
## Install
```
go get github.com/libp2p/go-libp2p-pubsub
```
## Usage
To be used for messaging in p2p instrastructure (as part of libp2p) such as IPFS, Ethereum, other blockchains, etc.
### Example
https://github.com/libp2p/go-libp2p/tree/master/examples/pubsub
## Documentation
See the [libp2p specs](https://github.com/libp2p/specs/tree/master/pubsub) for high level documentation and [godoc](https://godoc.org/github.com/libp2p/go-libp2p-pubsub) for API documentation.
### In this repo, you will find
```
.
├── LICENSE
├── README.md
# Regular Golang repo set up
├── codecov.yml
├── pb
├── go.mod
├── go.sum
├── doc.go
# PubSub base
├── pubsub.go
├── blacklist.go
├── notify.go
├── comm.go
├── discovery.go
├── sign.go
├── subscription.go
├── topic.go
├── trace.go
├── tracer.go
├── validation.go
# Floodsub router
├── floodsub.go
# Randomsub router
├── randomsub.go
# Gossipsub router
├── gossipsub.go
├── score.go
├── score_params.go
└── mcache.go
```
### Tracing
The pubsub system supports _tracing_, which collects all events pertaining to the internals of the system. This allows you to recreate the complete message flow and state of the system for analysis purposes.
To enable tracing, instantiate the pubsub system using the `WithEventTracer` option; the option accepts a tracer with three available implementations in-package (trace to json, pb, or a remote peer).
If you want to trace using a remote peer, you can do so using the `traced` daemon from [go-libp2p-pubsub-tracer](https://github.com/libp2p/go-libp2p-pubsub-tracer). The package also includes a utility program, `tracestat`, for analyzing the traces collected by the daemon.
For instance, to capture the trace as a json file, you can use the following option:
```go
tracer, err := pubsub.NewJSONTracer("/path/to/trace.json")
if err != nil {
panic(err)
}
pubsub.NewGossipSub(..., pubsub.WithEventTracer(tracer))
```
To capture the trace as a protobuf, you can use the following option:
```go
tracer, err := pubsub.NewPBTracer("/path/to/trace.pb")
if err != nil {
panic(err)
}
pubsub.NewGossipSub(..., pubsub.WithEventTracer(tracer))
```
Finally, to use the remote tracer, you can use the following incantations:
```go
// assuming that your tracer runs in x.x.x.x and has a peer ID of QmTracer
pi, err := peer.AddrInfoFromP2pAddr(ma.StringCast("/ip4/x.x.x.x/tcp/4001/p2p/QmTracer"))
if err != nil {
panic(err)
}
tracer, err := pubsub.NewRemoteTracer(ctx, host, pi)
if err != nil {
panic(err)
}
ps, err := pubsub.NewGossipSub(..., pubsub.WithEventTracer(tracer))
```
## Contribute
Contributions welcome. Please check out [the issues](https://github.com/libp2p/go-libp2p-pubsub/issues).
Check out our [contributing document](https://github.com/libp2p/community/blob/master/contributing.md) for more information on how we work, and about contributing in general. Please be aware that all interactions related to multiformats are subject to the IPFS [Code of Conduct](https://github.com/ipfs/community/blob/master/code-of-conduct.md).
Small note: If editing the README, please conform to the [standard-readme](https://github.com/RichardLitt/standard-readme) specification.
## License
The go-libp2p-pubsub project is dual-licensed under Apache 2.0 and MIT terms:
- Apache License, Version 2.0, ([LICENSE-APACHE](./LICENSE-APACHE) or http://www.apache.org/licenses/LICENSE-2.0)
- MIT license ([LICENSE-MIT](./LICENSE-MIT) or http://opensource.org/licenses/MIT)

107
vendor/github.com/libp2p/go-libp2p-pubsub/backoff.go generated vendored Normal file
View File

@@ -0,0 +1,107 @@
package pubsub
import (
"context"
"fmt"
"math/rand"
"sync"
"time"
"github.com/libp2p/go-libp2p/core/peer"
)
const (
MinBackoffDelay = 100 * time.Millisecond
MaxBackoffDelay = 10 * time.Second
TimeToLive = 10 * time.Minute
BackoffCleanupInterval = 1 * time.Minute
BackoffMultiplier = 2
MaxBackoffJitterCoff = 100
MaxBackoffAttempts = 4
)
type backoffHistory struct {
duration time.Duration
lastTried time.Time
attempts int
}
type backoff struct {
mu sync.Mutex
info map[peer.ID]*backoffHistory
ct int // size threshold that kicks off the cleaner
ci time.Duration // cleanup intervals
maxAttempts int // maximum backoff attempts prior to ejection
}
func newBackoff(ctx context.Context, sizeThreshold int, cleanupInterval time.Duration, maxAttempts int) *backoff {
b := &backoff{
mu: sync.Mutex{},
ct: sizeThreshold,
ci: cleanupInterval,
maxAttempts: maxAttempts,
info: make(map[peer.ID]*backoffHistory),
}
rand.Seed(time.Now().UnixNano()) // used for jitter
go b.cleanupLoop(ctx)
return b
}
func (b *backoff) updateAndGet(id peer.ID) (time.Duration, error) {
b.mu.Lock()
defer b.mu.Unlock()
h, ok := b.info[id]
switch {
case !ok || time.Since(h.lastTried) > TimeToLive:
// first request goes immediately.
h = &backoffHistory{
duration: time.Duration(0),
attempts: 0,
}
case h.attempts >= b.maxAttempts:
return 0, fmt.Errorf("peer %s has reached its maximum backoff attempts", id)
case h.duration < MinBackoffDelay:
h.duration = MinBackoffDelay
case h.duration < MaxBackoffDelay:
jitter := rand.Intn(MaxBackoffJitterCoff)
h.duration = (BackoffMultiplier * h.duration) + time.Duration(jitter)*time.Millisecond
if h.duration > MaxBackoffDelay || h.duration < 0 {
h.duration = MaxBackoffDelay
}
}
h.attempts += 1
h.lastTried = time.Now()
b.info[id] = h
return h.duration, nil
}
func (b *backoff) cleanup() {
b.mu.Lock()
defer b.mu.Unlock()
for id, h := range b.info {
if time.Since(h.lastTried) > TimeToLive {
delete(b.info, id)
}
}
}
func (b *backoff) cleanupLoop(ctx context.Context) {
ticker := time.NewTicker(b.ci)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
return // pubsub shutting down
case <-ticker.C:
b.cleanup()
}
}
}

58
vendor/github.com/libp2p/go-libp2p-pubsub/blacklist.go generated vendored Normal file
View File

@@ -0,0 +1,58 @@
package pubsub
import (
"time"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p-pubsub/timecache"
)
// Blacklist is an interface for peer blacklisting.
type Blacklist interface {
Add(peer.ID) bool
Contains(peer.ID) bool
}
// MapBlacklist is a blacklist implementation using a perfect map
type MapBlacklist map[peer.ID]struct{}
// NewMapBlacklist creates a new MapBlacklist
func NewMapBlacklist() Blacklist {
return MapBlacklist(make(map[peer.ID]struct{}))
}
func (b MapBlacklist) Add(p peer.ID) bool {
b[p] = struct{}{}
return true
}
func (b MapBlacklist) Contains(p peer.ID) bool {
_, ok := b[p]
return ok
}
// TimeCachedBlacklist is a blacklist implementation using a time cache
type TimeCachedBlacklist struct {
tc timecache.TimeCache
}
// NewTimeCachedBlacklist creates a new TimeCachedBlacklist with the given expiry duration
func NewTimeCachedBlacklist(expiry time.Duration) (Blacklist, error) {
b := &TimeCachedBlacklist{tc: timecache.NewTimeCache(expiry)}
return b, nil
}
// Add returns a bool saying whether Add of peer was successful
func (b *TimeCachedBlacklist) Add(p peer.ID) bool {
s := p.String()
if b.tc.Has(s) {
return false
}
b.tc.Add(s)
return true
}
func (b *TimeCachedBlacklist) Contains(p peer.ID) bool {
return b.tc.Has(p.String())
}

View File

@@ -0,0 +1,3 @@
coverage:
range: "50...100"
comment: off

231
vendor/github.com/libp2p/go-libp2p-pubsub/comm.go generated vendored Normal file
View File

@@ -0,0 +1,231 @@
package pubsub
import (
"context"
"encoding/binary"
"io"
"time"
"github.com/gogo/protobuf/proto"
pool "github.com/libp2p/go-buffer-pool"
"github.com/multiformats/go-varint"
"github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-msgio"
pb "github.com/libp2p/go-libp2p-pubsub/pb"
)
// get the initial RPC containing all of our subscriptions to send to new peers
func (p *PubSub) getHelloPacket() *RPC {
var rpc RPC
subscriptions := make(map[string]bool)
for t := range p.mySubs {
subscriptions[t] = true
}
for t := range p.myRelays {
subscriptions[t] = true
}
for t := range subscriptions {
as := &pb.RPC_SubOpts{
Topicid: proto.String(t),
Subscribe: proto.Bool(true),
}
rpc.Subscriptions = append(rpc.Subscriptions, as)
}
return &rpc
}
func (p *PubSub) handleNewStream(s network.Stream) {
peer := s.Conn().RemotePeer()
p.inboundStreamsMx.Lock()
other, dup := p.inboundStreams[peer]
if dup {
log.Debugf("duplicate inbound stream from %s; resetting other stream", peer)
other.Reset()
}
p.inboundStreams[peer] = s
p.inboundStreamsMx.Unlock()
defer func() {
p.inboundStreamsMx.Lock()
if p.inboundStreams[peer] == s {
delete(p.inboundStreams, peer)
}
p.inboundStreamsMx.Unlock()
}()
r := msgio.NewVarintReaderSize(s, p.maxMessageSize)
for {
msgbytes, err := r.ReadMsg()
if err != nil {
r.ReleaseMsg(msgbytes)
if err != io.EOF {
s.Reset()
log.Debugf("error reading rpc from %s: %s", s.Conn().RemotePeer(), err)
} else {
// Just be nice. They probably won't read this
// but it doesn't hurt to send it.
s.Close()
}
return
}
rpc := new(RPC)
err = rpc.Unmarshal(msgbytes)
r.ReleaseMsg(msgbytes)
if err != nil {
s.Reset()
log.Warnf("bogus rpc from %s: %s", s.Conn().RemotePeer(), err)
return
}
rpc.from = peer
select {
case p.incoming <- rpc:
case <-p.ctx.Done():
// Close is useless because the other side isn't reading.
s.Reset()
return
}
}
}
func (p *PubSub) notifyPeerDead(pid peer.ID) {
p.peerDeadPrioLk.RLock()
p.peerDeadMx.Lock()
p.peerDeadPend[pid] = struct{}{}
p.peerDeadMx.Unlock()
p.peerDeadPrioLk.RUnlock()
select {
case p.peerDead <- struct{}{}:
default:
}
}
func (p *PubSub) handleNewPeer(ctx context.Context, pid peer.ID, outgoing <-chan *RPC) {
s, err := p.host.NewStream(p.ctx, pid, p.rt.Protocols()...)
if err != nil {
log.Debug("opening new stream to peer: ", err, pid)
select {
case p.newPeerError <- pid:
case <-ctx.Done():
}
return
}
go p.handleSendingMessages(ctx, s, outgoing)
go p.handlePeerDead(s)
select {
case p.newPeerStream <- s:
case <-ctx.Done():
}
}
func (p *PubSub) handleNewPeerWithBackoff(ctx context.Context, pid peer.ID, backoff time.Duration, outgoing <-chan *RPC) {
select {
case <-time.After(backoff):
p.handleNewPeer(ctx, pid, outgoing)
case <-ctx.Done():
return
}
}
func (p *PubSub) handlePeerDead(s network.Stream) {
pid := s.Conn().RemotePeer()
_, err := s.Read([]byte{0})
if err == nil {
log.Debugf("unexpected message from %s", pid)
}
s.Reset()
p.notifyPeerDead(pid)
}
func (p *PubSub) handleSendingMessages(ctx context.Context, s network.Stream, outgoing <-chan *RPC) {
writeRpc := func(rpc *RPC) error {
size := uint64(rpc.Size())
buf := pool.Get(varint.UvarintSize(size) + int(size))
defer pool.Put(buf)
n := binary.PutUvarint(buf, size)
_, err := rpc.MarshalTo(buf[n:])
if err != nil {
return err
}
_, err = s.Write(buf)
return err
}
defer s.Close()
for {
select {
case rpc, ok := <-outgoing:
if !ok {
return
}
err := writeRpc(rpc)
if err != nil {
s.Reset()
log.Debugf("writing message to %s: %s", s.Conn().RemotePeer(), err)
return
}
case <-ctx.Done():
return
}
}
}
func rpcWithSubs(subs ...*pb.RPC_SubOpts) *RPC {
return &RPC{
RPC: pb.RPC{
Subscriptions: subs,
},
}
}
func rpcWithMessages(msgs ...*pb.Message) *RPC {
return &RPC{RPC: pb.RPC{Publish: msgs}}
}
func rpcWithControl(msgs []*pb.Message,
ihave []*pb.ControlIHave,
iwant []*pb.ControlIWant,
graft []*pb.ControlGraft,
prune []*pb.ControlPrune) *RPC {
return &RPC{
RPC: pb.RPC{
Publish: msgs,
Control: &pb.ControlMessage{
Ihave: ihave,
Iwant: iwant,
Graft: graft,
Prune: prune,
},
},
}
}
func copyRPC(rpc *RPC) *RPC {
res := new(RPC)
*res = *rpc
if rpc.Control != nil {
res.Control = new(pb.ControlMessage)
*res.Control = *rpc.Control
}
return res
}

348
vendor/github.com/libp2p/go-libp2p-pubsub/discovery.go generated vendored Normal file
View File

@@ -0,0 +1,348 @@
package pubsub
import (
"context"
"math/rand"
"time"
"github.com/libp2p/go-libp2p/core/discovery"
"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/peer"
discimpl "github.com/libp2p/go-libp2p/p2p/discovery/backoff"
)
var (
// poll interval
// DiscoveryPollInitialDelay is how long the discovery system waits after it first starts before polling
DiscoveryPollInitialDelay = 0 * time.Millisecond
// DiscoveryPollInterval is approximately how long the discovery system waits in between checks for whether the
// more peers are needed for any topic
DiscoveryPollInterval = 1 * time.Second
)
// interval at which to retry advertisements when they fail.
const discoveryAdvertiseRetryInterval = 2 * time.Minute
type DiscoverOpt func(*discoverOptions) error
type discoverOptions struct {
connFactory BackoffConnectorFactory
opts []discovery.Option
}
func defaultDiscoverOptions() *discoverOptions {
rngSrc := rand.NewSource(rand.Int63())
minBackoff, maxBackoff := time.Second*10, time.Hour
cacheSize := 100
dialTimeout := time.Minute * 2
discoverOpts := &discoverOptions{
connFactory: func(host host.Host) (*discimpl.BackoffConnector, error) {
backoff := discimpl.NewExponentialBackoff(minBackoff, maxBackoff, discimpl.FullJitter, time.Second, 5.0, 0, rand.New(rngSrc))
return discimpl.NewBackoffConnector(host, cacheSize, dialTimeout, backoff)
},
}
return discoverOpts
}
// discover represents the discovery pipeline.
// The discovery pipeline handles advertising and discovery of peers
type discover struct {
p *PubSub
// discovery assists in discovering and advertising peers for a topic
discovery discovery.Discovery
// advertising tracks which topics are being advertised
advertising map[string]context.CancelFunc
// discoverQ handles continuing peer discovery
discoverQ chan *discoverReq
// ongoing tracks ongoing discovery requests
ongoing map[string]struct{}
// done handles completion of a discovery request
done chan string
// connector handles connecting to new peers found via discovery
connector *discimpl.BackoffConnector
// options are the set of options to be used to complete struct construction in Start
options *discoverOptions
}
// MinTopicSize returns a function that checks if a router is ready for publishing based on the topic size.
// The router ultimately decides the whether it is ready or not, the given size is just a suggestion. Note
// that the topic size does not include the router in the count.
func MinTopicSize(size int) RouterReady {
return func(rt PubSubRouter, topic string) (bool, error) {
return rt.EnoughPeers(topic, size), nil
}
}
// Start attaches the discovery pipeline to a pubsub instance, initializes discovery and starts event loop
func (d *discover) Start(p *PubSub, opts ...DiscoverOpt) error {
if d.discovery == nil || p == nil {
return nil
}
d.p = p
d.advertising = make(map[string]context.CancelFunc)
d.discoverQ = make(chan *discoverReq, 32)
d.ongoing = make(map[string]struct{})
d.done = make(chan string)
conn, err := d.options.connFactory(p.host)
if err != nil {
return err
}
d.connector = conn
go d.discoverLoop()
go d.pollTimer()
return nil
}
func (d *discover) pollTimer() {
select {
case <-time.After(DiscoveryPollInitialDelay):
case <-d.p.ctx.Done():
return
}
select {
case d.p.eval <- d.requestDiscovery:
case <-d.p.ctx.Done():
return
}
ticker := time.NewTicker(DiscoveryPollInterval)
defer ticker.Stop()
for {
select {
case <-ticker.C:
select {
case d.p.eval <- d.requestDiscovery:
case <-d.p.ctx.Done():
return
}
case <-d.p.ctx.Done():
return
}
}
}
func (d *discover) requestDiscovery() {
for t := range d.p.myTopics {
if !d.p.rt.EnoughPeers(t, 0) {
d.discoverQ <- &discoverReq{topic: t, done: make(chan struct{}, 1)}
}
}
}
func (d *discover) discoverLoop() {
for {
select {
case discover := <-d.discoverQ:
topic := discover.topic
if _, ok := d.ongoing[topic]; ok {
discover.done <- struct{}{}
continue
}
d.ongoing[topic] = struct{}{}
go func() {
d.handleDiscovery(d.p.ctx, topic, discover.opts)
select {
case d.done <- topic:
case <-d.p.ctx.Done():
}
discover.done <- struct{}{}
}()
case topic := <-d.done:
delete(d.ongoing, topic)
case <-d.p.ctx.Done():
return
}
}
}
// Advertise advertises this node's interest in a topic to a discovery service. Advertise is not thread-safe.
func (d *discover) Advertise(topic string) {
if d.discovery == nil {
return
}
advertisingCtx, cancel := context.WithCancel(d.p.ctx)
if _, ok := d.advertising[topic]; ok {
cancel()
return
}
d.advertising[topic] = cancel
go func() {
next, err := d.discovery.Advertise(advertisingCtx, topic)
if err != nil {
log.Warnf("bootstrap: error providing rendezvous for %s: %s", topic, err.Error())
if next == 0 {
next = discoveryAdvertiseRetryInterval
}
}
t := time.NewTimer(next)
defer t.Stop()
for advertisingCtx.Err() == nil {
select {
case <-t.C:
next, err = d.discovery.Advertise(advertisingCtx, topic)
if err != nil {
log.Warnf("bootstrap: error providing rendezvous for %s: %s", topic, err.Error())
if next == 0 {
next = discoveryAdvertiseRetryInterval
}
}
t.Reset(next)
case <-advertisingCtx.Done():
return
}
}
}()
}
// StopAdvertise stops advertising this node's interest in a topic. StopAdvertise is not thread-safe.
func (d *discover) StopAdvertise(topic string) {
if d.discovery == nil {
return
}
if advertiseCancel, ok := d.advertising[topic]; ok {
advertiseCancel()
delete(d.advertising, topic)
}
}
// Discover searches for additional peers interested in a given topic
func (d *discover) Discover(topic string, opts ...discovery.Option) {
if d.discovery == nil {
return
}
d.discoverQ <- &discoverReq{topic, opts, make(chan struct{}, 1)}
}
// Bootstrap attempts to bootstrap to a given topic. Returns true if bootstrapped successfully, false otherwise.
func (d *discover) Bootstrap(ctx context.Context, topic string, ready RouterReady, opts ...discovery.Option) bool {
if d.discovery == nil {
return true
}
t := time.NewTimer(time.Hour)
if !t.Stop() {
<-t.C
}
defer t.Stop()
for {
// Check if ready for publishing
bootstrapped := make(chan bool, 1)
select {
case d.p.eval <- func() {
done, _ := ready(d.p.rt, topic)
bootstrapped <- done
}:
if <-bootstrapped {
return true
}
case <-d.p.ctx.Done():
return false
case <-ctx.Done():
return false
}
// If not ready discover more peers
disc := &discoverReq{topic, opts, make(chan struct{}, 1)}
select {
case d.discoverQ <- disc:
case <-d.p.ctx.Done():
return false
case <-ctx.Done():
return false
}
select {
case <-disc.done:
case <-d.p.ctx.Done():
return false
case <-ctx.Done():
return false
}
t.Reset(time.Millisecond * 100)
select {
case <-t.C:
case <-d.p.ctx.Done():
return false
case <-ctx.Done():
return false
}
}
}
func (d *discover) handleDiscovery(ctx context.Context, topic string, opts []discovery.Option) {
discoverCtx, cancel := context.WithTimeout(ctx, time.Second*10)
defer cancel()
peerCh, err := d.discovery.FindPeers(discoverCtx, topic, opts...)
if err != nil {
log.Debugf("error finding peers for topic %s: %v", topic, err)
return
}
d.connector.Connect(ctx, peerCh)
}
type discoverReq struct {
topic string
opts []discovery.Option
done chan struct{}
}
type pubSubDiscovery struct {
discovery.Discovery
opts []discovery.Option
}
func (d *pubSubDiscovery) Advertise(ctx context.Context, ns string, opts ...discovery.Option) (time.Duration, error) {
return d.Discovery.Advertise(ctx, "floodsub:"+ns, append(opts, d.opts...)...)
}
func (d *pubSubDiscovery) FindPeers(ctx context.Context, ns string, opts ...discovery.Option) (<-chan peer.AddrInfo, error) {
return d.Discovery.FindPeers(ctx, "floodsub:"+ns, append(opts, d.opts...)...)
}
// WithDiscoveryOpts passes libp2p Discovery options into the PubSub discovery subsystem
func WithDiscoveryOpts(opts ...discovery.Option) DiscoverOpt {
return func(d *discoverOptions) error {
d.opts = opts
return nil
}
}
// BackoffConnectorFactory creates a BackoffConnector that is attached to a given host
type BackoffConnectorFactory func(host host.Host) (*discimpl.BackoffConnector, error)
// WithDiscoverConnector adds a custom connector that deals with how the discovery subsystem connects to peers
func WithDiscoverConnector(connFactory BackoffConnectorFactory) DiscoverOpt {
return func(d *discoverOptions) error {
d.connFactory = connFactory
return nil
}
}

27
vendor/github.com/libp2p/go-libp2p-pubsub/doc.go generated vendored Normal file
View File

@@ -0,0 +1,27 @@
// The pubsub package provides facilities for the Publish/Subscribe pattern of message
// propagation, also known as overlay multicast.
// The implementation provides topic-based pubsub, with pluggable routing algorithms.
//
// The main interface to the library is the PubSub object.
// You can construct this object with the following constructors:
//
// - NewFloodSub creates an instance that uses the floodsub routing algorithm.
//
// - NewGossipSub creates an instance that uses the gossipsub routing algorithm.
//
// - NewRandomSub creates an instance that uses the randomsub routing algorithm.
//
// In addition, there is a generic constructor that creates a pubsub instance with
// a custom PubSubRouter interface. This procedure is currently reserved for internal
// use within the package.
//
// Once you have constructed a PubSub instance, you need to establish some connections
// to your peers; the implementation relies on ambient peer discovery, leaving bootstrap
// and active peer discovery up to the client.
//
// To publish a message to some topic, use Publish; you don't need to be subscribed
// to the topic in order to publish.
//
// To subscribe to a topic, use Subscribe; this will give you a subscription interface
// from which new messages can be pumped.
package pubsub

108
vendor/github.com/libp2p/go-libp2p-pubsub/floodsub.go generated vendored Normal file
View File

@@ -0,0 +1,108 @@
package pubsub
import (
"context"
"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/core/protocol"
)
const (
FloodSubID = protocol.ID("/floodsub/1.0.0")
FloodSubTopicSearchSize = 5
)
// NewFloodsubWithProtocols returns a new floodsub-enabled PubSub objecting using the protocols specified in ps.
func NewFloodsubWithProtocols(ctx context.Context, h host.Host, ps []protocol.ID, opts ...Option) (*PubSub, error) {
rt := &FloodSubRouter{
protocols: ps,
}
return NewPubSub(ctx, h, rt, opts...)
}
// NewFloodSub returns a new PubSub object using the FloodSubRouter.
func NewFloodSub(ctx context.Context, h host.Host, opts ...Option) (*PubSub, error) {
return NewFloodsubWithProtocols(ctx, h, []protocol.ID{FloodSubID}, opts...)
}
type FloodSubRouter struct {
p *PubSub
protocols []protocol.ID
tracer *pubsubTracer
}
func (fs *FloodSubRouter) Protocols() []protocol.ID {
return fs.protocols
}
func (fs *FloodSubRouter) Attach(p *PubSub) {
fs.p = p
fs.tracer = p.tracer
}
func (fs *FloodSubRouter) AddPeer(p peer.ID, proto protocol.ID) {
fs.tracer.AddPeer(p, proto)
}
func (fs *FloodSubRouter) RemovePeer(p peer.ID) {
fs.tracer.RemovePeer(p)
}
func (fs *FloodSubRouter) EnoughPeers(topic string, suggested int) bool {
// check all peers in the topic
tmap, ok := fs.p.topics[topic]
if !ok {
return false
}
if suggested == 0 {
suggested = FloodSubTopicSearchSize
}
if len(tmap) >= suggested {
return true
}
return false
}
func (fs *FloodSubRouter) AcceptFrom(peer.ID) AcceptStatus {
return AcceptAll
}
func (fs *FloodSubRouter) HandleRPC(rpc *RPC) {}
func (fs *FloodSubRouter) Publish(msg *Message) {
from := msg.ReceivedFrom
topic := msg.GetTopic()
out := rpcWithMessages(msg.Message)
for pid := range fs.p.topics[topic] {
if pid == from || pid == peer.ID(msg.GetFrom()) {
continue
}
mch, ok := fs.p.peers[pid]
if !ok {
continue
}
select {
case mch <- out:
fs.tracer.SendRPC(out, pid)
default:
log.Infof("dropping message to peer %s: queue full", pid)
fs.tracer.DropRPC(out, pid)
// Drop it. The peer is too slow.
}
}
}
func (fs *FloodSubRouter) Join(topic string) {
fs.tracer.Join(topic)
}
func (fs *FloodSubRouter) Leave(topic string) {
fs.tracer.Leave(topic)
}

View File

@@ -0,0 +1,200 @@
package pubsub
import (
"math/rand"
"sync"
"time"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/core/protocol"
)
// gossipTracer is an internal tracer that tracks IWANT requests in order to penalize
// peers who don't follow up on IWANT requests after an IHAVE advertisement.
// The tracking of promises is probabilistic to avoid using too much memory.
type gossipTracer struct {
sync.Mutex
idGen *msgIDGenerator
followUpTime time.Duration
// promises for messages by message ID; for each message tracked, we track the promise
// expiration time for each peer.
promises map[string]map[peer.ID]time.Time
// promises for each peer; for each peer, we track the promised message IDs.
// this index allows us to quickly void promises when a peer is throttled.
peerPromises map[peer.ID]map[string]struct{}
}
func newGossipTracer() *gossipTracer {
return &gossipTracer{
idGen: newMsgIdGenerator(),
promises: make(map[string]map[peer.ID]time.Time),
peerPromises: make(map[peer.ID]map[string]struct{}),
}
}
func (gt *gossipTracer) Start(gs *GossipSubRouter) {
if gt == nil {
return
}
gt.idGen = gs.p.idGen
gt.followUpTime = gs.params.IWantFollowupTime
}
// track a promise to deliver a message from a list of msgIDs we are requesting
func (gt *gossipTracer) AddPromise(p peer.ID, msgIDs []string) {
if gt == nil {
return
}
idx := rand.Intn(len(msgIDs))
mid := msgIDs[idx]
gt.Lock()
defer gt.Unlock()
promises, ok := gt.promises[mid]
if !ok {
promises = make(map[peer.ID]time.Time)
gt.promises[mid] = promises
}
_, ok = promises[p]
if !ok {
promises[p] = time.Now().Add(gt.followUpTime)
peerPromises, ok := gt.peerPromises[p]
if !ok {
peerPromises = make(map[string]struct{})
gt.peerPromises[p] = peerPromises
}
peerPromises[mid] = struct{}{}
}
}
// returns the number of broken promises for each peer who didn't follow up
// on an IWANT request.
func (gt *gossipTracer) GetBrokenPromises() map[peer.ID]int {
if gt == nil {
return nil
}
gt.Lock()
defer gt.Unlock()
var res map[peer.ID]int
now := time.Now()
// find broken promises from peers
for mid, promises := range gt.promises {
for p, expire := range promises {
if expire.Before(now) {
if res == nil {
res = make(map[peer.ID]int)
}
res[p]++
delete(promises, p)
peerPromises := gt.peerPromises[p]
delete(peerPromises, mid)
if len(peerPromises) == 0 {
delete(gt.peerPromises, p)
}
}
}
if len(promises) == 0 {
delete(gt.promises, mid)
}
}
return res
}
var _ RawTracer = (*gossipTracer)(nil)
func (gt *gossipTracer) fulfillPromise(msg *Message) {
mid := gt.idGen.ID(msg)
gt.Lock()
defer gt.Unlock()
promises, ok := gt.promises[mid]
if !ok {
return
}
delete(gt.promises, mid)
// delete the promise for all peers that promised it, as they have no way to fulfill it.
for p := range promises {
peerPromises, ok := gt.peerPromises[p]
if ok {
delete(peerPromises, mid)
if len(peerPromises) == 0 {
delete(gt.peerPromises, p)
}
}
}
}
func (gt *gossipTracer) DeliverMessage(msg *Message) {
// someone delivered a message, fulfill promises for it
gt.fulfillPromise(msg)
}
func (gt *gossipTracer) RejectMessage(msg *Message, reason string) {
// A message got rejected, so we can fulfill promises and let the score penalty apply
// from invalid message delivery.
// We do take exception and apply promise penalty regardless in the following cases, where
// the peer delivered an obviously invalid message.
switch reason {
case RejectMissingSignature:
return
case RejectInvalidSignature:
return
}
gt.fulfillPromise(msg)
}
func (gt *gossipTracer) ValidateMessage(msg *Message) {
// we consider the promise fulfilled as soon as the message begins validation
// if it was a case of signature issue it would have been rejected immediately
// without triggering the Validate trace
gt.fulfillPromise(msg)
}
func (gt *gossipTracer) AddPeer(p peer.ID, proto protocol.ID) {}
func (gt *gossipTracer) RemovePeer(p peer.ID) {}
func (gt *gossipTracer) Join(topic string) {}
func (gt *gossipTracer) Leave(topic string) {}
func (gt *gossipTracer) Graft(p peer.ID, topic string) {}
func (gt *gossipTracer) Prune(p peer.ID, topic string) {}
func (gt *gossipTracer) DuplicateMessage(msg *Message) {}
func (gt *gossipTracer) RecvRPC(rpc *RPC) {}
func (gt *gossipTracer) SendRPC(rpc *RPC, p peer.ID) {}
func (gt *gossipTracer) DropRPC(rpc *RPC, p peer.ID) {}
func (gt *gossipTracer) UndeliverableMessage(msg *Message) {}
func (gt *gossipTracer) ThrottlePeer(p peer.ID) {
gt.Lock()
defer gt.Unlock()
peerPromises, ok := gt.peerPromises[p]
if !ok {
return
}
for mid := range peerPromises {
promises := gt.promises[mid]
delete(promises, p)
if len(promises) == 0 {
delete(gt.promises, mid)
}
}
delete(gt.peerPromises, p)
}

1963
vendor/github.com/libp2p/go-libp2p-pubsub/gossipsub.go generated vendored Normal file

File diff suppressed because it is too large Load Diff

View File

@@ -0,0 +1,52 @@
package pubsub
import (
"fmt"
"github.com/libp2p/go-libp2p/core/protocol"
)
// GossipSubFeatureTest is a feature test function; it takes a feature and a protocol ID and
// should return true if the feature is supported by the protocol
type GossipSubFeatureTest = func(GossipSubFeature, protocol.ID) bool
// GossipSubFeature is a feature discriminant enum
type GossipSubFeature int
const (
// Protocol supports basic GossipSub Mesh -- gossipsub-v1.0 compatible
GossipSubFeatureMesh = iota
// Protocol supports Peer eXchange on prune -- gossipsub-v1.1 compatible
GossipSubFeaturePX
)
// GossipSubDefaultProtocols is the default gossipsub router protocol list
var GossipSubDefaultProtocols = []protocol.ID{GossipSubID_v11, GossipSubID_v10, FloodSubID}
// GossipSubDefaultFeatures is the feature test function for the default gossipsub protocols
func GossipSubDefaultFeatures(feat GossipSubFeature, proto protocol.ID) bool {
switch feat {
case GossipSubFeatureMesh:
return proto == GossipSubID_v11 || proto == GossipSubID_v10
case GossipSubFeaturePX:
return proto == GossipSubID_v11
default:
return false
}
}
// WithGossipSubProtocols is a gossipsub router option that configures a custom protocol list
// and feature test function
func WithGossipSubProtocols(protos []protocol.ID, feature GossipSubFeatureTest) Option {
return func(ps *PubSub) error {
gs, ok := ps.rt.(*GossipSubRouter)
if !ok {
return fmt.Errorf("pubsub router is not gossipsub")
}
gs.protos = protos
gs.feature = feature
return nil
}
}

View File

@@ -0,0 +1,11 @@
{
"repoLeadMaintainer": {
"name": "Dimitris Vyzovitis",
"email": "vyzo@protocol.ai",
"username": "@vyzo"
},
"workingGroup": {
"name": "libp2p",
"entryPoint": "https://github.com/libp2p/libp2p"
}
}

104
vendor/github.com/libp2p/go-libp2p-pubsub/mcache.go generated vendored Normal file
View File

@@ -0,0 +1,104 @@
package pubsub
import (
"fmt"
"github.com/libp2p/go-libp2p/core/peer"
)
// NewMessageCache creates a sliding window cache that remembers messages for as
// long as `history` slots.
//
// When queried for messages to advertise, the cache only returns messages in
// the last `gossip` slots.
//
// The `gossip` parameter must be smaller or equal to `history`, or this
// function will panic.
//
// The slack between `gossip` and `history` accounts for the reaction time
// between when a message is advertised via IHAVE gossip, and the peer pulls it
// via an IWANT command.
func NewMessageCache(gossip, history int) *MessageCache {
if gossip > history {
err := fmt.Errorf("invalid parameters for message cache; gossip slots (%d) cannot be larger than history slots (%d)",
gossip, history)
panic(err)
}
return &MessageCache{
msgs: make(map[string]*Message),
peertx: make(map[string]map[peer.ID]int),
history: make([][]CacheEntry, history),
gossip: gossip,
msgID: func(msg *Message) string {
return DefaultMsgIdFn(msg.Message)
},
}
}
type MessageCache struct {
msgs map[string]*Message
peertx map[string]map[peer.ID]int
history [][]CacheEntry
gossip int
msgID func(*Message) string
}
func (mc *MessageCache) SetMsgIdFn(msgID func(*Message) string) {
mc.msgID = msgID
}
type CacheEntry struct {
mid string
topic string
}
func (mc *MessageCache) Put(msg *Message) {
mid := mc.msgID(msg)
mc.msgs[mid] = msg
mc.history[0] = append(mc.history[0], CacheEntry{mid: mid, topic: msg.GetTopic()})
}
func (mc *MessageCache) Get(mid string) (*Message, bool) {
m, ok := mc.msgs[mid]
return m, ok
}
func (mc *MessageCache) GetForPeer(mid string, p peer.ID) (*Message, int, bool) {
m, ok := mc.msgs[mid]
if !ok {
return nil, 0, false
}
tx, ok := mc.peertx[mid]
if !ok {
tx = make(map[peer.ID]int)
mc.peertx[mid] = tx
}
tx[p]++
return m, tx[p], true
}
func (mc *MessageCache) GetGossipIDs(topic string) []string {
var mids []string
for _, entries := range mc.history[:mc.gossip] {
for _, entry := range entries {
if entry.topic == topic {
mids = append(mids, entry.mid)
}
}
}
return mids
}
func (mc *MessageCache) Shift() {
last := mc.history[len(mc.history)-1]
for _, entry := range last {
delete(mc.msgs, entry.mid)
delete(mc.peertx, entry.mid)
}
for i := len(mc.history) - 2; i >= 0; i-- {
mc.history[i+1] = mc.history[i]
}
mc.history[0] = nil
}

52
vendor/github.com/libp2p/go-libp2p-pubsub/midgen.go generated vendored Normal file
View File

@@ -0,0 +1,52 @@
package pubsub
import (
"sync"
pb "github.com/libp2p/go-libp2p-pubsub/pb"
)
// msgIDGenerator handles computing IDs for msgs
// It allows setting custom generators(MsgIdFunction) per topic
type msgIDGenerator struct {
Default MsgIdFunction
topicGensLk sync.RWMutex
topicGens map[string]MsgIdFunction
}
func newMsgIdGenerator() *msgIDGenerator {
return &msgIDGenerator{
Default: DefaultMsgIdFn,
topicGens: make(map[string]MsgIdFunction),
}
}
// Set sets custom id generator(MsgIdFunction) for topic.
func (m *msgIDGenerator) Set(topic string, gen MsgIdFunction) {
m.topicGensLk.Lock()
m.topicGens[topic] = gen
m.topicGensLk.Unlock()
}
// ID computes ID for the msg or short-circuits with the cached value.
func (m *msgIDGenerator) ID(msg *Message) string {
if msg.ID != "" {
return msg.ID
}
msg.ID = m.RawID(msg.Message)
return msg.ID
}
// RawID computes ID for the proto 'msg'.
func (m *msgIDGenerator) RawID(msg *pb.Message) string {
m.topicGensLk.RLock()
gen, ok := m.topicGens[msg.GetTopic()]
m.topicGensLk.RUnlock()
if !ok {
gen = m.Default
}
return gen(msg)
}

75
vendor/github.com/libp2p/go-libp2p-pubsub/notify.go generated vendored Normal file
View File

@@ -0,0 +1,75 @@
package pubsub
import (
"github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/core/peer"
ma "github.com/multiformats/go-multiaddr"
)
var _ network.Notifiee = (*PubSubNotif)(nil)
type PubSubNotif PubSub
func (p *PubSubNotif) OpenedStream(n network.Network, s network.Stream) {
}
func (p *PubSubNotif) ClosedStream(n network.Network, s network.Stream) {
}
func (p *PubSubNotif) Connected(n network.Network, c network.Conn) {
// ignore transient connections
if c.Stat().Transient {
return
}
go func() {
p.newPeersPrioLk.RLock()
p.newPeersMx.Lock()
p.newPeersPend[c.RemotePeer()] = struct{}{}
p.newPeersMx.Unlock()
p.newPeersPrioLk.RUnlock()
select {
case p.newPeers <- struct{}{}:
default:
}
}()
}
func (p *PubSubNotif) Disconnected(n network.Network, c network.Conn) {
}
func (p *PubSubNotif) Listen(n network.Network, _ ma.Multiaddr) {
}
func (p *PubSubNotif) ListenClose(n network.Network, _ ma.Multiaddr) {
}
func (p *PubSubNotif) Initialize() {
isTransient := func(pid peer.ID) bool {
for _, c := range p.host.Network().ConnsToPeer(pid) {
if !c.Stat().Transient {
return false
}
}
return true
}
p.newPeersPrioLk.RLock()
p.newPeersMx.Lock()
for _, pid := range p.host.Network().Peers() {
if isTransient(pid) {
continue
}
p.newPeersPend[pid] = struct{}{}
}
p.newPeersMx.Unlock()
p.newPeersPrioLk.RUnlock()
select {
case p.newPeers <- struct{}{}:
default:
}
}

11
vendor/github.com/libp2p/go-libp2p-pubsub/pb/Makefile generated vendored Normal file
View File

@@ -0,0 +1,11 @@
PB = $(wildcard *.proto)
GO = $(PB:.proto=.pb.go)
all: $(GO)
%.pb.go: %.proto
protoc --proto_path=$(GOPATH)/src:. --gogofast_out=. $<
clean:
rm -f *.pb.go
rm -f *.go

2649
vendor/github.com/libp2p/go-libp2p-pubsub/pb/rpc.pb.go generated vendored Normal file

File diff suppressed because it is too large Load Diff

57
vendor/github.com/libp2p/go-libp2p-pubsub/pb/rpc.proto generated vendored Normal file
View File

@@ -0,0 +1,57 @@
syntax = "proto2";
package pubsub.pb;
message RPC {
repeated SubOpts subscriptions = 1;
repeated Message publish = 2;
message SubOpts {
optional bool subscribe = 1; // subscribe or unsubcribe
optional string topicid = 2;
}
optional ControlMessage control = 3;
}
message Message {
optional bytes from = 1;
optional bytes data = 2;
optional bytes seqno = 3;
optional string topic = 4;
optional bytes signature = 5;
optional bytes key = 6;
}
message ControlMessage {
repeated ControlIHave ihave = 1;
repeated ControlIWant iwant = 2;
repeated ControlGraft graft = 3;
repeated ControlPrune prune = 4;
}
message ControlIHave {
optional string topicID = 1;
// implementors from other languages should use bytes here - go protobuf emits invalid utf8 strings
repeated string messageIDs = 2;
}
message ControlIWant {
// implementors from other languages should use bytes here - go protobuf emits invalid utf8 strings
repeated string messageIDs = 1;
}
message ControlGraft {
optional string topicID = 1;
}
message ControlPrune {
optional string topicID = 1;
repeated PeerInfo peers = 2;
optional uint64 backoff = 3;
}
message PeerInfo {
optional bytes peerID = 1;
optional bytes signedPeerRecord = 2;
}

6624
vendor/github.com/libp2p/go-libp2p-pubsub/pb/trace.pb.go generated vendored Normal file

File diff suppressed because it is too large Load Diff

View File

@@ -0,0 +1,150 @@
syntax = "proto2";
package pubsub.pb;
message TraceEvent {
optional Type type = 1;
optional bytes peerID = 2;
optional int64 timestamp = 3;
optional PublishMessage publishMessage = 4;
optional RejectMessage rejectMessage = 5;
optional DuplicateMessage duplicateMessage = 6;
optional DeliverMessage deliverMessage = 7;
optional AddPeer addPeer = 8;
optional RemovePeer removePeer = 9;
optional RecvRPC recvRPC = 10;
optional SendRPC sendRPC = 11;
optional DropRPC dropRPC = 12;
optional Join join = 13;
optional Leave leave = 14;
optional Graft graft = 15;
optional Prune prune = 16;
enum Type {
PUBLISH_MESSAGE = 0;
REJECT_MESSAGE = 1;
DUPLICATE_MESSAGE = 2;
DELIVER_MESSAGE = 3;
ADD_PEER = 4;
REMOVE_PEER = 5;
RECV_RPC = 6;
SEND_RPC = 7;
DROP_RPC = 8;
JOIN = 9;
LEAVE = 10;
GRAFT = 11;
PRUNE = 12;
}
message PublishMessage {
optional bytes messageID = 1;
optional string topic = 2;
}
message RejectMessage {
optional bytes messageID = 1;
optional bytes receivedFrom = 2;
optional string reason = 3;
optional string topic = 4;
}
message DuplicateMessage {
optional bytes messageID = 1;
optional bytes receivedFrom = 2;
optional string topic = 3;
}
message DeliverMessage {
optional bytes messageID = 1;
optional string topic = 2;
optional bytes receivedFrom = 3;
}
message AddPeer {
optional bytes peerID = 1;
optional string proto = 2;
}
message RemovePeer {
optional bytes peerID = 1;
}
message RecvRPC {
optional bytes receivedFrom = 1;
optional RPCMeta meta = 2;
}
message SendRPC {
optional bytes sendTo = 1;
optional RPCMeta meta = 2;
}
message DropRPC {
optional bytes sendTo = 1;
optional RPCMeta meta = 2;
}
message Join {
optional string topic = 1;
}
message Leave {
optional string topic = 2;
}
message Graft {
optional bytes peerID = 1;
optional string topic = 2;
}
message Prune {
optional bytes peerID = 1;
optional string topic = 2;
}
message RPCMeta {
repeated MessageMeta messages = 1;
repeated SubMeta subscription = 2;
optional ControlMeta control = 3;
}
message MessageMeta {
optional bytes messageID = 1;
optional string topic = 2;
}
message SubMeta {
optional bool subscribe = 1;
optional string topic = 2;
}
message ControlMeta {
repeated ControlIHaveMeta ihave = 1;
repeated ControlIWantMeta iwant = 2;
repeated ControlGraftMeta graft = 3;
repeated ControlPruneMeta prune = 4;
}
message ControlIHaveMeta {
optional string topic = 1;
repeated bytes messageIDs = 2;
}
message ControlIWantMeta {
repeated bytes messageIDs = 1;
}
message ControlGraftMeta {
optional string topic = 1;
}
message ControlPruneMeta {
optional string topic = 1;
repeated bytes peers = 2;
}
}
message TraceEventBatch {
repeated TraceEvent batch = 1;
}

453
vendor/github.com/libp2p/go-libp2p-pubsub/peer_gater.go generated vendored Normal file
View File

@@ -0,0 +1,453 @@
package pubsub
import (
"context"
"fmt"
"math/rand"
"sort"
"sync"
"time"
"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/core/protocol"
manet "github.com/multiformats/go-multiaddr/net"
)
var (
DefaultPeerGaterRetainStats = 6 * time.Hour
DefaultPeerGaterQuiet = time.Minute
DefaultPeerGaterDuplicateWeight = 0.125
DefaultPeerGaterIgnoreWeight = 1.0
DefaultPeerGaterRejectWeight = 16.0
DefaultPeerGaterThreshold = 0.33
DefaultPeerGaterGlobalDecay = ScoreParameterDecay(2 * time.Minute)
DefaultPeerGaterSourceDecay = ScoreParameterDecay(time.Hour)
)
// PeerGaterParams groups together parameters that control the operation of the peer gater
type PeerGaterParams struct {
// when the ratio of throttled/validated messages exceeds this threshold, the gater turns on
Threshold float64
// (linear) decay parameter for gater counters
GlobalDecay float64 // global counter decay
SourceDecay float64 // per IP counter decay
// decay interval
DecayInterval time.Duration
// counter zeroing threshold
DecayToZero float64
// how long to retain stats
RetainStats time.Duration
// quiet interval before turning off the gater; if there are no validation throttle events
// for this interval, the gater turns off
Quiet time.Duration
// weight of duplicate message deliveries
DuplicateWeight float64
// weight of ignored messages
IgnoreWeight float64
// weight of rejected messages
RejectWeight float64
// priority topic delivery weights
TopicDeliveryWeights map[string]float64
}
func (p *PeerGaterParams) validate() error {
if p.Threshold <= 0 {
return fmt.Errorf("invalid Threshold; must be > 0")
}
if p.GlobalDecay <= 0 || p.GlobalDecay >= 1 {
return fmt.Errorf("invalid GlobalDecay; must be between 0 and 1")
}
if p.SourceDecay <= 0 || p.SourceDecay >= 1 {
return fmt.Errorf("invalid SourceDecay; must be between 0 and 1")
}
if p.DecayInterval < time.Second {
return fmt.Errorf("invalid DecayInterval; must be at least 1s")
}
if p.DecayToZero <= 0 || p.DecayToZero >= 1 {
return fmt.Errorf("invalid DecayToZero; must be between 0 and 1")
}
// no need to check stats retention; a value of 0 means we don't retain stats
if p.Quiet < time.Second {
return fmt.Errorf("invalud Quiet interval; must be at least 1s")
}
if p.DuplicateWeight <= 0 {
return fmt.Errorf("invalid DuplicateWeight; must be > 0")
}
if p.IgnoreWeight < 1 {
return fmt.Errorf("invalid IgnoreWeight; must be >= 1")
}
if p.RejectWeight < 1 {
return fmt.Errorf("invalud RejectWeight; must be >= 1")
}
return nil
}
// WithTopicDeliveryWeights is a fluid setter for the priority topic delivery weights
func (p *PeerGaterParams) WithTopicDeliveryWeights(w map[string]float64) *PeerGaterParams {
p.TopicDeliveryWeights = w
return p
}
// NewPeerGaterParams creates a new PeerGaterParams struct, using the specified threshold and decay
// parameters and default values for all other parameters.
func NewPeerGaterParams(threshold, globalDecay, sourceDecay float64) *PeerGaterParams {
return &PeerGaterParams{
Threshold: threshold,
GlobalDecay: globalDecay,
SourceDecay: sourceDecay,
DecayToZero: DefaultDecayToZero,
DecayInterval: DefaultDecayInterval,
RetainStats: DefaultPeerGaterRetainStats,
Quiet: DefaultPeerGaterQuiet,
DuplicateWeight: DefaultPeerGaterDuplicateWeight,
IgnoreWeight: DefaultPeerGaterIgnoreWeight,
RejectWeight: DefaultPeerGaterRejectWeight,
}
}
// DefaultPeerGaterParams creates a new PeerGaterParams struct using default values
func DefaultPeerGaterParams() *PeerGaterParams {
return NewPeerGaterParams(DefaultPeerGaterThreshold, DefaultPeerGaterGlobalDecay, DefaultPeerGaterSourceDecay)
}
// the gater object.
type peerGater struct {
sync.Mutex
host host.Host
// gater parameters
params *PeerGaterParams
// counters
validate, throttle float64
// time of last validation throttle
lastThrottle time.Time
// stats per peer.ID -- multiple peer IDs may share the same stats object if they are
// colocated in the same IP
peerStats map[peer.ID]*peerGaterStats
// stats per IP
ipStats map[string]*peerGaterStats
// for unit tests
getIP func(peer.ID) string
}
type peerGaterStats struct {
// number of connected peer IDs mapped to this stat object
connected int
// stats expiration time -- only valid if connected = 0
expire time.Time
// counters
deliver, duplicate, ignore, reject float64
}
// WithPeerGater is a gossipsub router option that enables reactive validation queue
// management.
// The Gater is activated if the ratio of throttled/validated messages exceeds the specified
// threshold.
// Once active, the Gater probabilistically throttles peers _before_ they enter the validation
// queue, performing Random Early Drop.
// The throttle decision is randomized, with the probability of allowing messages to enter the
// validation queue controlled by the statistical observations of the performance of all peers
// in the IP address of the gated peer.
// The Gater deactivates if there is no validation throttlinc occurring for the specified quiet
// interval.
func WithPeerGater(params *PeerGaterParams) Option {
return func(ps *PubSub) error {
gs, ok := ps.rt.(*GossipSubRouter)
if !ok {
return fmt.Errorf("pubsub router is not gossipsub")
}
err := params.validate()
if err != nil {
return err
}
gs.gate = newPeerGater(ps.ctx, ps.host, params)
// hook the tracer
if ps.tracer != nil {
ps.tracer.raw = append(ps.tracer.raw, gs.gate)
} else {
ps.tracer = &pubsubTracer{
raw: []RawTracer{gs.gate},
pid: ps.host.ID(),
idGen: ps.idGen,
}
}
return nil
}
}
func newPeerGater(ctx context.Context, host host.Host, params *PeerGaterParams) *peerGater {
pg := &peerGater{
params: params,
peerStats: make(map[peer.ID]*peerGaterStats),
ipStats: make(map[string]*peerGaterStats),
host: host,
}
go pg.background(ctx)
return pg
}
func (pg *peerGater) background(ctx context.Context) {
tick := time.NewTicker(pg.params.DecayInterval)
defer tick.Stop()
for {
select {
case <-tick.C:
pg.decayStats()
case <-ctx.Done():
return
}
}
}
func (pg *peerGater) decayStats() {
pg.Lock()
defer pg.Unlock()
pg.validate *= pg.params.GlobalDecay
if pg.validate < pg.params.DecayToZero {
pg.validate = 0
}
pg.throttle *= pg.params.GlobalDecay
if pg.throttle < pg.params.DecayToZero {
pg.throttle = 0
}
now := time.Now()
for ip, st := range pg.ipStats {
if st.connected > 0 {
st.deliver *= pg.params.SourceDecay
if st.deliver < pg.params.DecayToZero {
st.deliver = 0
}
st.duplicate *= pg.params.SourceDecay
if st.duplicate < pg.params.DecayToZero {
st.duplicate = 0
}
st.ignore *= pg.params.SourceDecay
if st.ignore < pg.params.DecayToZero {
st.ignore = 0
}
st.reject *= pg.params.SourceDecay
if st.reject < pg.params.DecayToZero {
st.reject = 0
}
} else if st.expire.Before(now) {
delete(pg.ipStats, ip)
}
}
}
func (pg *peerGater) getPeerStats(p peer.ID) *peerGaterStats {
st, ok := pg.peerStats[p]
if !ok {
st = pg.getIPStats(p)
pg.peerStats[p] = st
}
return st
}
func (pg *peerGater) getIPStats(p peer.ID) *peerGaterStats {
ip := pg.getPeerIP(p)
st, ok := pg.ipStats[ip]
if !ok {
st = &peerGaterStats{}
pg.ipStats[ip] = st
}
return st
}
func (pg *peerGater) getPeerIP(p peer.ID) string {
if pg.getIP != nil {
return pg.getIP(p)
}
connToIP := func(c network.Conn) string {
remote := c.RemoteMultiaddr()
ip, err := manet.ToIP(remote)
if err != nil {
log.Warnf("error determining IP for remote peer in %s: %s", remote, err)
return "<unknown>"
}
return ip.String()
}
conns := pg.host.Network().ConnsToPeer(p)
switch len(conns) {
case 0:
return "<unknown>"
case 1:
return connToIP(conns[0])
default:
// we have multiple connections -- order by number of streams and use the one with the
// most streams; it's a nightmare to track multiple IPs per peer, so pick the best one.
streams := make(map[string]int)
for _, c := range conns {
if c.Stat().Transient {
// ignore transient
continue
}
streams[c.ID()] = len(c.GetStreams())
}
sort.Slice(conns, func(i, j int) bool {
return streams[conns[i].ID()] > streams[conns[j].ID()]
})
return connToIP(conns[0])
}
}
// router interface
func (pg *peerGater) AcceptFrom(p peer.ID) AcceptStatus {
if pg == nil {
return AcceptAll
}
pg.Lock()
defer pg.Unlock()
// check the quiet period; if the validation queue has not throttled for more than the Quiet
// interval, we turn off the circuit breaker and accept.
if time.Since(pg.lastThrottle) > pg.params.Quiet {
return AcceptAll
}
// no throttle events -- or they have decayed; accept.
if pg.throttle == 0 {
return AcceptAll
}
// check the throttle/validate ration; if it is below threshold we accept.
if pg.validate != 0 && pg.throttle/pg.validate < pg.params.Threshold {
return AcceptAll
}
st := pg.getPeerStats(p)
// compute the goodput of the peer; the denominator is the weighted mix of message counters
total := st.deliver + pg.params.DuplicateWeight*st.duplicate + pg.params.IgnoreWeight*st.ignore + pg.params.RejectWeight*st.reject
if total == 0 {
return AcceptAll
}
// we make a randomized decision based on the goodput of the peer.
// the probabiity is biased by adding 1 to the delivery counter so that we don't unconditionally
// throttle in the first negative event; it also ensures that a peer always has a chance of being
// accepted; this is not a sinkhole/blacklist.
threshold := (1 + st.deliver) / (1 + total)
if rand.Float64() < threshold {
return AcceptAll
}
log.Debugf("throttling peer %s with threshold %f", p, threshold)
return AcceptControl
}
// -- RawTracer interface methods
var _ RawTracer = (*peerGater)(nil)
// tracer interface
func (pg *peerGater) AddPeer(p peer.ID, proto protocol.ID) {
pg.Lock()
defer pg.Unlock()
st := pg.getPeerStats(p)
st.connected++
}
func (pg *peerGater) RemovePeer(p peer.ID) {
pg.Lock()
defer pg.Unlock()
st := pg.getPeerStats(p)
st.connected--
st.expire = time.Now().Add(pg.params.RetainStats)
delete(pg.peerStats, p)
}
func (pg *peerGater) Join(topic string) {}
func (pg *peerGater) Leave(topic string) {}
func (pg *peerGater) Graft(p peer.ID, topic string) {}
func (pg *peerGater) Prune(p peer.ID, topic string) {}
func (pg *peerGater) ValidateMessage(msg *Message) {
pg.Lock()
defer pg.Unlock()
pg.validate++
}
func (pg *peerGater) DeliverMessage(msg *Message) {
pg.Lock()
defer pg.Unlock()
st := pg.getPeerStats(msg.ReceivedFrom)
topic := msg.GetTopic()
weight := pg.params.TopicDeliveryWeights[topic]
if weight == 0 {
weight = 1
}
st.deliver += weight
}
func (pg *peerGater) RejectMessage(msg *Message, reason string) {
pg.Lock()
defer pg.Unlock()
switch reason {
case RejectValidationQueueFull:
fallthrough
case RejectValidationThrottled:
pg.lastThrottle = time.Now()
pg.throttle++
case RejectValidationIgnored:
st := pg.getPeerStats(msg.ReceivedFrom)
st.ignore++
default:
st := pg.getPeerStats(msg.ReceivedFrom)
st.reject++
}
}
func (pg *peerGater) DuplicateMessage(msg *Message) {
pg.Lock()
defer pg.Unlock()
st := pg.getPeerStats(msg.ReceivedFrom)
st.duplicate++
}
func (pg *peerGater) ThrottlePeer(p peer.ID) {}
func (pg *peerGater) RecvRPC(rpc *RPC) {}
func (pg *peerGater) SendRPC(rpc *RPC, p peer.ID) {}
func (pg *peerGater) DropRPC(rpc *RPC, p peer.ID) {}
func (pg *peerGater) UndeliverableMessage(msg *Message) {}

1422
vendor/github.com/libp2p/go-libp2p-pubsub/pubsub.go generated vendored Normal file

File diff suppressed because it is too large Load Diff

168
vendor/github.com/libp2p/go-libp2p-pubsub/randomsub.go generated vendored Normal file
View File

@@ -0,0 +1,168 @@
package pubsub
import (
"context"
"math"
"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/core/protocol"
)
const (
RandomSubID = protocol.ID("/randomsub/1.0.0")
)
var (
RandomSubD = 6
)
// NewRandomSub returns a new PubSub object using RandomSubRouter as the router.
func NewRandomSub(ctx context.Context, h host.Host, size int, opts ...Option) (*PubSub, error) {
rt := &RandomSubRouter{
size: size,
peers: make(map[peer.ID]protocol.ID),
}
return NewPubSub(ctx, h, rt, opts...)
}
// RandomSubRouter is a router that implements a random propagation strategy.
// For each message, it selects the square root of the network size peers, with a min of RandomSubD,
// and forwards the message to them.
type RandomSubRouter struct {
p *PubSub
peers map[peer.ID]protocol.ID
size int
tracer *pubsubTracer
}
func (rs *RandomSubRouter) Protocols() []protocol.ID {
return []protocol.ID{RandomSubID, FloodSubID}
}
func (rs *RandomSubRouter) Attach(p *PubSub) {
rs.p = p
rs.tracer = p.tracer
}
func (rs *RandomSubRouter) AddPeer(p peer.ID, proto protocol.ID) {
rs.tracer.AddPeer(p, proto)
rs.peers[p] = proto
}
func (rs *RandomSubRouter) RemovePeer(p peer.ID) {
rs.tracer.RemovePeer(p)
delete(rs.peers, p)
}
func (rs *RandomSubRouter) EnoughPeers(topic string, suggested int) bool {
// check all peers in the topic
tmap, ok := rs.p.topics[topic]
if !ok {
return false
}
fsPeers := 0
rsPeers := 0
// count floodsub and randomsub peers
for p := range tmap {
switch rs.peers[p] {
case FloodSubID:
fsPeers++
case RandomSubID:
rsPeers++
}
}
if suggested == 0 {
suggested = RandomSubD
}
if fsPeers+rsPeers >= suggested {
return true
}
if rsPeers >= RandomSubD {
return true
}
return false
}
func (rs *RandomSubRouter) AcceptFrom(peer.ID) AcceptStatus {
return AcceptAll
}
func (rs *RandomSubRouter) HandleRPC(rpc *RPC) {}
func (rs *RandomSubRouter) Publish(msg *Message) {
from := msg.ReceivedFrom
tosend := make(map[peer.ID]struct{})
rspeers := make(map[peer.ID]struct{})
src := peer.ID(msg.GetFrom())
topic := msg.GetTopic()
tmap, ok := rs.p.topics[topic]
if !ok {
return
}
for p := range tmap {
if p == from || p == src {
continue
}
if rs.peers[p] == FloodSubID {
tosend[p] = struct{}{}
} else {
rspeers[p] = struct{}{}
}
}
if len(rspeers) > RandomSubD {
target := RandomSubD
sqrt := int(math.Ceil(math.Sqrt(float64(rs.size))))
if sqrt > target {
target = sqrt
}
if target > len(rspeers) {
target = len(rspeers)
}
xpeers := peerMapToList(rspeers)
shufflePeers(xpeers)
xpeers = xpeers[:target]
for _, p := range xpeers {
tosend[p] = struct{}{}
}
} else {
for p := range rspeers {
tosend[p] = struct{}{}
}
}
out := rpcWithMessages(msg.Message)
for p := range tosend {
mch, ok := rs.p.peers[p]
if !ok {
continue
}
select {
case mch <- out:
rs.tracer.SendRPC(out, p)
default:
log.Infof("dropping message to peer %s: queue full", p)
rs.tracer.DropRPC(out, p)
}
}
}
func (rs *RandomSubRouter) Join(topic string) {
rs.tracer.Join(topic)
}
func (rs *RandomSubRouter) Leave(topic string) {
rs.tracer.Join(topic)
}

1081
vendor/github.com/libp2p/go-libp2p-pubsub/score.go generated vendored Normal file

File diff suppressed because it is too large Load Diff

View File

@@ -0,0 +1,423 @@
package pubsub
import (
"fmt"
"math"
"net"
"time"
"github.com/libp2p/go-libp2p/core/peer"
)
type PeerScoreThresholds struct {
// whether it is allowed to just set some params and not all of them.
SkipAtomicValidation bool
// GossipThreshold is the score threshold below which gossip propagation is suppressed;
// should be negative.
GossipThreshold float64
// PublishThreshold is the score threshold below which we shouldn't publish when using flood
// publishing (also applies to fanout and floodsub peers); should be negative and <= GossipThreshold.
PublishThreshold float64
// GraylistThreshold is the score threshold below which message processing is suppressed altogether,
// implementing an effective gray list according to peer score; should be negative and <= PublishThreshold.
GraylistThreshold float64
// AcceptPXThreshold is the score threshold below which PX will be ignored; this should be positive
// and limited to scores attainable by bootstrappers and other trusted nodes.
AcceptPXThreshold float64
// OpportunisticGraftThreshold is the median mesh score threshold before triggering opportunistic
// grafting; this should have a small positive value.
OpportunisticGraftThreshold float64
}
func (p *PeerScoreThresholds) validate() error {
if !p.SkipAtomicValidation || p.PublishThreshold != 0 || p.GossipThreshold != 0 || p.GraylistThreshold != 0 {
if p.GossipThreshold > 0 || isInvalidNumber(p.GossipThreshold) {
return fmt.Errorf("invalid gossip threshold; it must be <= 0 and a valid number")
}
if p.PublishThreshold > 0 || p.PublishThreshold > p.GossipThreshold || isInvalidNumber(p.PublishThreshold) {
return fmt.Errorf("invalid publish threshold; it must be <= 0 and <= gossip threshold and a valid number")
}
if p.GraylistThreshold > 0 || p.GraylistThreshold > p.PublishThreshold || isInvalidNumber(p.GraylistThreshold) {
return fmt.Errorf("invalid graylist threshold; it must be <= 0 and <= publish threshold and a valid number")
}
}
if !p.SkipAtomicValidation || p.AcceptPXThreshold != 0 {
if p.AcceptPXThreshold < 0 || isInvalidNumber(p.AcceptPXThreshold) {
return fmt.Errorf("invalid accept PX threshold; it must be >= 0 and a valid number")
}
}
if !p.SkipAtomicValidation || p.OpportunisticGraftThreshold != 0 {
if p.OpportunisticGraftThreshold < 0 || isInvalidNumber(p.OpportunisticGraftThreshold) {
return fmt.Errorf("invalid opportunistic grafting threshold; it must be >= 0 and a valid number")
}
}
return nil
}
type PeerScoreParams struct {
// whether it is allowed to just set some params and not all of them.
SkipAtomicValidation bool
// Score parameters per topic.
Topics map[string]*TopicScoreParams
// Aggregate topic score cap; this limits the total contribution of topics towards a positive
// score. It must be positive (or 0 for no cap).
TopicScoreCap float64
// P5: Application-specific peer scoring
AppSpecificScore func(p peer.ID) float64
AppSpecificWeight float64
// P6: IP-colocation factor.
// The parameter has an associated counter which counts the number of peers with the same IP.
// If the number of peers in the same IP exceeds IPColocationFactorThreshold, then the value
// is the square of the difference, ie (PeersInSameIP - IPColocationThreshold)^2.
// If the number of peers in the same IP is less than the threshold, then the value is 0.
// The weight of the parameter MUST be negative, unless you want to disable for testing.
// Note: In order to simulate many IPs in a managable manner when testing, you can set the weight to 0
// thus disabling the IP colocation penalty.
IPColocationFactorWeight float64
IPColocationFactorThreshold int
IPColocationFactorWhitelist []*net.IPNet
// P7: behavioural pattern penalties.
// This parameter has an associated counter which tracks misbehaviour as detected by the
// router. The router currently applies penalties for the following behaviors:
// - attempting to re-graft before the prune backoff time has elapsed.
// - not following up in IWANT requests for messages advertised with IHAVE.
//
// The value of the parameter is the square of the counter over the threshold, which decays with
// BehaviourPenaltyDecay.
// The weight of the parameter MUST be negative (or zero to disable).
BehaviourPenaltyWeight, BehaviourPenaltyThreshold, BehaviourPenaltyDecay float64
// the decay interval for parameter counters.
DecayInterval time.Duration
// counter value below which it is considered 0.
DecayToZero float64
// time to remember counters for a disconnected peer.
RetainScore time.Duration
// time to remember a message delivery for. Default to global TimeCacheDuration if 0.
SeenMsgTTL time.Duration
}
type TopicScoreParams struct {
// whether it is allowed to just set some params and not all of them.
SkipAtomicValidation bool
// The weight of the topic.
TopicWeight float64
// P1: time in the mesh
// This is the time the peer has been grafted in the mesh.
// The value of the parameter is the time/TimeInMeshQuantum, capped by TimeInMeshCap.
// The weight of the parameter MUST be positive (or zero to disable).
TimeInMeshWeight float64
TimeInMeshQuantum time.Duration
TimeInMeshCap float64
// P2: first message deliveries
// This is the number of message deliveries in the topic.
// The value of the parameter is a counter, decaying with FirstMessageDeliveriesDecay, and capped
// by FirstMessageDeliveriesCap.
// The weight of the parameter MUST be positive (or zero to disable).
FirstMessageDeliveriesWeight, FirstMessageDeliveriesDecay float64
FirstMessageDeliveriesCap float64
// P3: mesh message deliveries
// This is the number of message deliveries in the mesh, within the MeshMessageDeliveriesWindow of
// message validation; deliveries during validation also count and are retroactively applied
// when validation succeeds.
// This window accounts for the minimum time before a hostile mesh peer trying to game the score
// could replay back a valid message we just sent them.
// It effectively tracks first and near-first deliveries, i.e., a message seen from a mesh peer
// before we have forwarded it to them.
// The parameter has an associated counter, decaying with MeshMessageDeliveriesDecay.
// If the counter exceeds the threshold, its value is 0.
// If the counter is below the MeshMessageDeliveriesThreshold, the value is the square of
// the deficit, ie (MessageDeliveriesThreshold - counter)^2
// The penalty is only activated after MeshMessageDeliveriesActivation time in the mesh.
// The weight of the parameter MUST be negative (or zero to disable).
MeshMessageDeliveriesWeight, MeshMessageDeliveriesDecay float64
MeshMessageDeliveriesCap, MeshMessageDeliveriesThreshold float64
MeshMessageDeliveriesWindow, MeshMessageDeliveriesActivation time.Duration
// P3b: sticky mesh propagation failures
// This is a sticky penalty that applies when a peer gets pruned from the mesh with an active
// mesh message delivery penalty.
// The weight of the parameter MUST be negative (or zero to disable)
MeshFailurePenaltyWeight, MeshFailurePenaltyDecay float64
// P4: invalid messages
// This is the number of invalid messages in the topic.
// The value of the parameter is the square of the counter, decaying with
// InvalidMessageDeliveriesDecay.
// The weight of the parameter MUST be negative (or zero to disable).
InvalidMessageDeliveriesWeight, InvalidMessageDeliveriesDecay float64
}
// peer score parameter validation
func (p *PeerScoreParams) validate() error {
for topic, params := range p.Topics {
err := params.validate()
if err != nil {
return fmt.Errorf("invalid score parameters for topic %s: %w", topic, err)
}
}
if !p.SkipAtomicValidation || p.TopicScoreCap != 0 {
// check that the topic score is 0 or something positive
if p.TopicScoreCap < 0 || isInvalidNumber(p.TopicScoreCap) {
return fmt.Errorf("invalid topic score cap; must be positive (or 0 for no cap) and a valid number")
}
}
// check that we have an app specific score; the weight can be anything (but expected positive)
if p.AppSpecificScore == nil {
if p.SkipAtomicValidation {
p.AppSpecificScore = func(p peer.ID) float64 {
return 0
}
} else {
return fmt.Errorf("missing application specific score function")
}
}
if !p.SkipAtomicValidation || p.IPColocationFactorWeight != 0 {
// check the IP collocation factor
if p.IPColocationFactorWeight > 0 || isInvalidNumber(p.IPColocationFactorWeight) {
return fmt.Errorf("invalid IPColocationFactorWeight; must be negative (or 0 to disable) and a valid number")
}
if p.IPColocationFactorWeight != 0 && p.IPColocationFactorThreshold < 1 {
return fmt.Errorf("invalid IPColocationFactorThreshold; must be at least 1")
}
}
// check the behaviour penalty
if !p.SkipAtomicValidation || p.BehaviourPenaltyWeight != 0 || p.BehaviourPenaltyThreshold != 0 {
if p.BehaviourPenaltyWeight > 0 || isInvalidNumber(p.BehaviourPenaltyWeight) {
return fmt.Errorf("invalid BehaviourPenaltyWeight; must be negative (or 0 to disable) and a valid number")
}
if p.BehaviourPenaltyWeight != 0 && (p.BehaviourPenaltyDecay <= 0 || p.BehaviourPenaltyDecay >= 1 || isInvalidNumber(p.BehaviourPenaltyDecay)) {
return fmt.Errorf("invalid BehaviourPenaltyDecay; must be between 0 and 1")
}
if p.BehaviourPenaltyThreshold < 0 || isInvalidNumber(p.BehaviourPenaltyThreshold) {
return fmt.Errorf("invalid BehaviourPenaltyThreshold; must be >= 0 and a valid number")
}
}
// check the decay parameters
if !p.SkipAtomicValidation || p.DecayInterval != 0 || p.DecayToZero != 0 {
if p.DecayInterval < time.Second {
return fmt.Errorf("invalid DecayInterval; must be at least 1s")
}
if p.DecayToZero <= 0 || p.DecayToZero >= 1 || isInvalidNumber(p.DecayToZero) {
return fmt.Errorf("invalid DecayToZero; must be between 0 and 1")
}
}
// no need to check the score retention; a value of 0 means that we don't retain scores
return nil
}
func (p *TopicScoreParams) validate() error {
// make sure we have a sane topic weight
if p.TopicWeight < 0 || isInvalidNumber(p.TopicWeight) {
return fmt.Errorf("invalid topic weight; must be >= 0 and a valid number")
}
// check P1
if err := p.validateTimeInMeshParams(); err != nil {
return err
}
// check P2
if err := p.validateMessageDeliveryParams(); err != nil {
return err
}
// check P3
if err := p.validateMeshMessageDeliveryParams(); err != nil {
return err
}
// check P3b
if err := p.validateMessageFailurePenaltyParams(); err != nil {
return err
}
// check P4
if err := p.validateInvalidMessageDeliveryParams(); err != nil {
return err
}
return nil
}
func (p *TopicScoreParams) validateTimeInMeshParams() error {
if p.SkipAtomicValidation {
// in non-atomic mode, parameters at their zero values are dismissed from validation.
if p.TimeInMeshWeight == 0 && p.TimeInMeshQuantum == 0 && p.TimeInMeshCap == 0 {
return nil
}
}
// either atomic validation mode, or some parameters have been set a value,
// hence, proceed with normal validation of all related parameters in this context.
if p.TimeInMeshQuantum == 0 {
return fmt.Errorf("invalid TimeInMeshQuantum; must be non zero")
}
if p.TimeInMeshWeight < 0 || isInvalidNumber(p.TimeInMeshWeight) {
return fmt.Errorf("invalid TimeInMeshWeight; must be positive (or 0 to disable) and a valid number")
}
if p.TimeInMeshWeight != 0 && p.TimeInMeshQuantum <= 0 {
return fmt.Errorf("invalid TimeInMeshQuantum; must be positive")
}
if p.TimeInMeshWeight != 0 && (p.TimeInMeshCap <= 0 || isInvalidNumber(p.TimeInMeshCap)) {
return fmt.Errorf("invalid TimeInMeshCap; must be positive and a valid number")
}
return nil
}
func (p *TopicScoreParams) validateMessageDeliveryParams() error {
if p.SkipAtomicValidation {
// in non-atomic mode, parameters at their zero values are dismissed from validation.
if p.FirstMessageDeliveriesWeight == 0 && p.FirstMessageDeliveriesCap == 0 && p.FirstMessageDeliveriesDecay == 0 {
return nil
}
}
// either atomic validation mode, or some parameters have been set a value,
// hence, proceed with normal validation of all related parameters in this context.
if p.FirstMessageDeliveriesWeight < 0 || isInvalidNumber(p.FirstMessageDeliveriesWeight) {
return fmt.Errorf("invallid FirstMessageDeliveriesWeight; must be positive (or 0 to disable) and a valid number")
}
if p.FirstMessageDeliveriesWeight != 0 && (p.FirstMessageDeliveriesDecay <= 0 || p.FirstMessageDeliveriesDecay >= 1 || isInvalidNumber(p.FirstMessageDeliveriesDecay)) {
return fmt.Errorf("invalid FirstMessageDeliveriesDecay; must be between 0 and 1")
}
if p.FirstMessageDeliveriesWeight != 0 && (p.FirstMessageDeliveriesCap <= 0 || isInvalidNumber(p.FirstMessageDeliveriesCap)) {
return fmt.Errorf("invalid FirstMessageDeliveriesCap; must be positive and a valid number")
}
return nil
}
func (p *TopicScoreParams) validateMeshMessageDeliveryParams() error {
if p.SkipAtomicValidation {
// in non-atomic mode, parameters at their zero values are dismissed from validation.
if p.MeshMessageDeliveriesWeight == 0 &&
p.MeshMessageDeliveriesCap == 0 &&
p.MeshMessageDeliveriesDecay == 0 &&
p.MeshMessageDeliveriesThreshold == 0 &&
p.MeshMessageDeliveriesWindow == 0 &&
p.MeshMessageDeliveriesActivation == 0 {
return nil
}
}
// either atomic validation mode, or some parameters have been set a value,
// hence, proceed with normal validation of all related parameters in this context.
if p.MeshMessageDeliveriesWeight > 0 || isInvalidNumber(p.MeshMessageDeliveriesWeight) {
return fmt.Errorf("invalid MeshMessageDeliveriesWeight; must be negative (or 0 to disable) and a valid number")
}
if p.MeshMessageDeliveriesWeight != 0 && (p.MeshMessageDeliveriesDecay <= 0 || p.MeshMessageDeliveriesDecay >= 1 || isInvalidNumber(p.MeshMessageDeliveriesDecay)) {
return fmt.Errorf("invalid MeshMessageDeliveriesDecay; must be between 0 and 1")
}
if p.MeshMessageDeliveriesWeight != 0 && (p.MeshMessageDeliveriesCap <= 0 || isInvalidNumber(p.MeshMessageDeliveriesCap)) {
return fmt.Errorf("invalid MeshMessageDeliveriesCap; must be positive and a valid number")
}
if p.MeshMessageDeliveriesWeight != 0 && (p.MeshMessageDeliveriesThreshold <= 0 || isInvalidNumber(p.MeshMessageDeliveriesThreshold)) {
return fmt.Errorf("invalid MeshMessageDeliveriesThreshold; must be positive and a valid number")
}
if p.MeshMessageDeliveriesWindow < 0 {
return fmt.Errorf("invalid MeshMessageDeliveriesWindow; must be non-negative")
}
if p.MeshMessageDeliveriesWeight != 0 && p.MeshMessageDeliveriesActivation < time.Second {
return fmt.Errorf("invalid MeshMessageDeliveriesActivation; must be at least 1s")
}
return nil
}
func (p *TopicScoreParams) validateMessageFailurePenaltyParams() error {
if p.SkipAtomicValidation {
// in selective mode, parameters at their zero values are dismissed from validation.
if p.MeshFailurePenaltyDecay == 0 && p.MeshFailurePenaltyWeight == 0 {
return nil
}
}
// either atomic validation mode, or some parameters have been set a value,
// hence, proceed with normal validation of all related parameters in this context.
if p.MeshFailurePenaltyWeight > 0 || isInvalidNumber(p.MeshFailurePenaltyWeight) {
return fmt.Errorf("invalid MeshFailurePenaltyWeight; must be negative (or 0 to disable) and a valid number")
}
if p.MeshFailurePenaltyWeight != 0 && (isInvalidNumber(p.MeshFailurePenaltyDecay) || p.MeshFailurePenaltyDecay <= 0 || p.MeshFailurePenaltyDecay >= 1) {
return fmt.Errorf("invalid MeshFailurePenaltyDecay; must be between 0 and 1")
}
return nil
}
func (p *TopicScoreParams) validateInvalidMessageDeliveryParams() error {
if p.SkipAtomicValidation {
// in selective mode, parameters at their zero values are dismissed from validation.
if p.InvalidMessageDeliveriesDecay == 0 && p.InvalidMessageDeliveriesWeight == 0 {
return nil
}
}
// either atomic validation mode, or some parameters have been set a value,
// hence, proceed with normal validation of all related parameters in this context.
if p.InvalidMessageDeliveriesWeight > 0 || isInvalidNumber(p.InvalidMessageDeliveriesWeight) {
return fmt.Errorf("invalid InvalidMessageDeliveriesWeight; must be negative (or 0 to disable) and a valid number")
}
if p.InvalidMessageDeliveriesDecay <= 0 || p.InvalidMessageDeliveriesDecay >= 1 || isInvalidNumber(p.InvalidMessageDeliveriesDecay) {
return fmt.Errorf("invalid InvalidMessageDeliveriesDecay; must be between 0 and 1")
}
return nil
}
const (
DefaultDecayInterval = time.Second
DefaultDecayToZero = 0.01
)
// ScoreParameterDecay computes the decay factor for a parameter, assuming the DecayInterval is 1s
// and that the value decays to zero if it drops below 0.01
func ScoreParameterDecay(decay time.Duration) float64 {
return ScoreParameterDecayWithBase(decay, DefaultDecayInterval, DefaultDecayToZero)
}
// ScoreParameterDecayWithBase computes the decay factor for a parameter using base as the DecayInterval
func ScoreParameterDecayWithBase(decay time.Duration, base time.Duration, decayToZero float64) float64 {
// the decay is linear, so after n ticks the value is factor^n
// so factor^n = decayToZero => factor = decayToZero^(1/n)
ticks := float64(decay / base)
return math.Pow(decayToZero, 1/ticks)
}
// checks whether the provided floating-point number is `Not a Number`
// or an infinite number.
func isInvalidNumber(num float64) bool {
return math.IsNaN(num) || math.IsInf(num, 0)
}

138
vendor/github.com/libp2p/go-libp2p-pubsub/sign.go generated vendored Normal file
View File

@@ -0,0 +1,138 @@
package pubsub
import (
"fmt"
pb "github.com/libp2p/go-libp2p-pubsub/pb"
"github.com/libp2p/go-libp2p/core/crypto"
"github.com/libp2p/go-libp2p/core/peer"
)
// MessageSignaturePolicy describes if signatures are produced, expected, and/or verified.
type MessageSignaturePolicy uint8
// LaxSign and LaxNoSign are deprecated. In the future msgSigning and msgVerification can be unified.
const (
// msgSigning is set when the locally produced messages must be signed
msgSigning MessageSignaturePolicy = 1 << iota
// msgVerification is set when external messages must be verfied
msgVerification
)
const (
// StrictSign produces signatures and expects and verifies incoming signatures
StrictSign = msgSigning | msgVerification
// StrictNoSign does not produce signatures and drops and penalises incoming messages that carry one
StrictNoSign = msgVerification
// LaxSign produces signatures and validates incoming signatures iff one is present
// Deprecated: it is recommend to either strictly enable, or strictly disable, signatures.
LaxSign = msgSigning
// LaxNoSign does not produce signatures and validates incoming signatures iff one is present
// Deprecated: it is recommend to either strictly enable, or strictly disable, signatures.
LaxNoSign = 0
)
// mustVerify is true when a message signature must be verified.
// If signatures are not expected, verification checks if the signature is absent.
func (policy MessageSignaturePolicy) mustVerify() bool {
return policy&msgVerification != 0
}
// mustSign is true when messages should be signed, and incoming messages are expected to have a signature.
func (policy MessageSignaturePolicy) mustSign() bool {
return policy&msgSigning != 0
}
const SignPrefix = "libp2p-pubsub:"
func verifyMessageSignature(m *pb.Message) error {
pubk, err := messagePubKey(m)
if err != nil {
return err
}
xm := *m
xm.Signature = nil
xm.Key = nil
bytes, err := xm.Marshal()
if err != nil {
return err
}
bytes = withSignPrefix(bytes)
valid, err := pubk.Verify(bytes, m.Signature)
if err != nil {
return err
}
if !valid {
return fmt.Errorf("invalid signature")
}
return nil
}
func messagePubKey(m *pb.Message) (crypto.PubKey, error) {
var pubk crypto.PubKey
pid, err := peer.IDFromBytes(m.From)
if err != nil {
return nil, err
}
if m.Key == nil {
// no attached key, it must be extractable from the source ID
pubk, err = pid.ExtractPublicKey()
if err != nil {
return nil, fmt.Errorf("cannot extract signing key: %s", err.Error())
}
if pubk == nil {
return nil, fmt.Errorf("cannot extract signing key")
}
} else {
pubk, err = crypto.UnmarshalPublicKey(m.Key)
if err != nil {
return nil, fmt.Errorf("cannot unmarshal signing key: %s", err.Error())
}
// verify that the source ID matches the attached key
if !pid.MatchesPublicKey(pubk) {
return nil, fmt.Errorf("bad signing key; source ID %s doesn't match key", pid)
}
}
return pubk, nil
}
func signMessage(pid peer.ID, key crypto.PrivKey, m *pb.Message) error {
bytes, err := m.Marshal()
if err != nil {
return err
}
bytes = withSignPrefix(bytes)
sig, err := key.Sign(bytes)
if err != nil {
return err
}
m.Signature = sig
pk, _ := pid.ExtractPublicKey()
if pk == nil {
pubk, err := crypto.MarshalPublicKey(key.GetPublic())
if err != nil {
return err
}
m.Key = pubk
}
return nil
}
func withSignPrefix(bytes []byte) []byte {
return append([]byte(SignPrefix), bytes...)
}

View File

@@ -0,0 +1,51 @@
package pubsub
import (
"context"
"sync"
)
// Subscription handles the details of a particular Topic subscription.
// There may be many subscriptions for a given Topic.
type Subscription struct {
topic string
ch chan *Message
cancelCh chan<- *Subscription
ctx context.Context
err error
once sync.Once
}
// Topic returns the topic string associated with the Subscription
func (sub *Subscription) Topic() string {
return sub.topic
}
// Next returns the next message in our subscription
func (sub *Subscription) Next(ctx context.Context) (*Message, error) {
select {
case msg, ok := <-sub.ch:
if !ok {
return msg, sub.err
}
return msg, nil
case <-ctx.Done():
return nil, ctx.Err()
}
}
// Cancel closes the subscription. If this is the last active subscription then pubsub will send an unsubscribe
// announcement to the network.
func (sub *Subscription) Cancel() {
select {
case sub.cancelCh <- sub:
case <-sub.ctx.Done():
}
}
func (sub *Subscription) close() {
sub.once.Do(func() {
close(sub.ch)
})
}

View File

@@ -0,0 +1,149 @@
package pubsub
import (
"errors"
"regexp"
pb "github.com/libp2p/go-libp2p-pubsub/pb"
"github.com/libp2p/go-libp2p/core/peer"
)
// ErrTooManySubscriptions may be returned by a SubscriptionFilter to signal that there are too many
// subscriptions to process.
var ErrTooManySubscriptions = errors.New("too many subscriptions")
// SubscriptionFilter is a function that tells us whether we are interested in allowing and tracking
// subscriptions for a given topic.
//
// The filter is consulted whenever a subscription notification is received by another peer; if the
// filter returns false, then the notification is ignored.
//
// The filter is also consulted when joining topics; if the filter returns false, then the Join
// operation will result in an error.
type SubscriptionFilter interface {
// CanSubscribe returns true if the topic is of interest and we can subscribe to it
CanSubscribe(topic string) bool
// FilterIncomingSubscriptions is invoked for all RPCs containing subscription notifications.
// It should filter only the subscriptions of interest and my return an error if (for instance)
// there are too many subscriptions.
FilterIncomingSubscriptions(peer.ID, []*pb.RPC_SubOpts) ([]*pb.RPC_SubOpts, error)
}
// WithSubscriptionFilter is a pubsub option that specifies a filter for subscriptions
// in topics of interest.
func WithSubscriptionFilter(subFilter SubscriptionFilter) Option {
return func(ps *PubSub) error {
ps.subFilter = subFilter
return nil
}
}
// NewAllowlistSubscriptionFilter creates a subscription filter that only allows explicitly
// specified topics for local subscriptions and incoming peer subscriptions.
func NewAllowlistSubscriptionFilter(topics ...string) SubscriptionFilter {
allow := make(map[string]struct{})
for _, topic := range topics {
allow[topic] = struct{}{}
}
return &allowlistSubscriptionFilter{allow: allow}
}
type allowlistSubscriptionFilter struct {
allow map[string]struct{}
}
var _ SubscriptionFilter = (*allowlistSubscriptionFilter)(nil)
func (f *allowlistSubscriptionFilter) CanSubscribe(topic string) bool {
_, ok := f.allow[topic]
return ok
}
func (f *allowlistSubscriptionFilter) FilterIncomingSubscriptions(from peer.ID, subs []*pb.RPC_SubOpts) ([]*pb.RPC_SubOpts, error) {
return FilterSubscriptions(subs, f.CanSubscribe), nil
}
// NewRegexpSubscriptionFilter creates a subscription filter that only allows topics that
// match a regular expression for local subscriptions and incoming peer subscriptions.
//
// Warning: the user should take care to match start/end of string in the supplied regular
// expression, otherwise the filter might match unwanted topics unexpectedly.
func NewRegexpSubscriptionFilter(rx *regexp.Regexp) SubscriptionFilter {
return &rxSubscriptionFilter{allow: rx}
}
type rxSubscriptionFilter struct {
allow *regexp.Regexp
}
var _ SubscriptionFilter = (*rxSubscriptionFilter)(nil)
func (f *rxSubscriptionFilter) CanSubscribe(topic string) bool {
return f.allow.MatchString(topic)
}
func (f *rxSubscriptionFilter) FilterIncomingSubscriptions(from peer.ID, subs []*pb.RPC_SubOpts) ([]*pb.RPC_SubOpts, error) {
return FilterSubscriptions(subs, f.CanSubscribe), nil
}
// FilterSubscriptions filters (and deduplicates) a list of subscriptions.
// filter should return true if a topic is of interest.
func FilterSubscriptions(subs []*pb.RPC_SubOpts, filter func(string) bool) []*pb.RPC_SubOpts {
accept := make(map[string]*pb.RPC_SubOpts)
for _, sub := range subs {
topic := sub.GetTopicid()
if !filter(topic) {
continue
}
otherSub, ok := accept[topic]
if ok {
if sub.GetSubscribe() != otherSub.GetSubscribe() {
delete(accept, topic)
}
} else {
accept[topic] = sub
}
}
if len(accept) == 0 {
return nil
}
result := make([]*pb.RPC_SubOpts, 0, len(accept))
for _, sub := range accept {
result = append(result, sub)
}
return result
}
// WrapLimitSubscriptionFilter wraps a subscription filter with a hard limit in the number of
// subscriptions allowed in an RPC message.
func WrapLimitSubscriptionFilter(filter SubscriptionFilter, limit int) SubscriptionFilter {
return &limitSubscriptionFilter{filter: filter, limit: limit}
}
type limitSubscriptionFilter struct {
filter SubscriptionFilter
limit int
}
var _ SubscriptionFilter = (*limitSubscriptionFilter)(nil)
func (f *limitSubscriptionFilter) CanSubscribe(topic string) bool {
return f.filter.CanSubscribe(topic)
}
func (f *limitSubscriptionFilter) FilterIncomingSubscriptions(from peer.ID, subs []*pb.RPC_SubOpts) ([]*pb.RPC_SubOpts, error) {
if len(subs) > f.limit {
return nil, ErrTooManySubscriptions
}
return f.filter.FilterIncomingSubscriptions(from, subs)
}

259
vendor/github.com/libp2p/go-libp2p-pubsub/tag_tracer.go generated vendored Normal file
View File

@@ -0,0 +1,259 @@
package pubsub
import (
"fmt"
"sync"
"time"
"github.com/libp2p/go-libp2p/core/connmgr"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/core/protocol"
)
var (
// GossipSubConnTagBumpMessageDelivery is the amount to add to the connection manager
// tag that tracks message deliveries. Each time a peer is the first to deliver a
// message within a topic, we "bump" a tag by this amount, up to a maximum
// of GossipSubConnTagMessageDeliveryCap.
// Note that the delivery tags decay over time, decreasing by GossipSubConnTagDecayAmount
// at every GossipSubConnTagDecayInterval.
GossipSubConnTagBumpMessageDelivery = 1
// GossipSubConnTagDecayInterval is the decay interval for decaying connection manager tags.
GossipSubConnTagDecayInterval = 10 * time.Minute
// GossipSubConnTagDecayAmount is subtracted from decaying tag values at each decay interval.
GossipSubConnTagDecayAmount = 1
// GossipSubConnTagMessageDeliveryCap is the maximum value for the connection manager tags that
// track message deliveries.
GossipSubConnTagMessageDeliveryCap = 15
)
// tagTracer is an internal tracer that applies connection manager tags to peer
// connections based on their behavior.
//
// We tag a peer's connections for the following reasons:
// - Directly connected peers are tagged with GossipSubConnTagValueDirectPeer (default 1000).
// - Mesh peers are tagged with a value of GossipSubConnTagValueMeshPeer (default 20).
// If a peer is in multiple topic meshes, they'll be tagged for each.
// - For each message that we receive, we bump a delivery tag for peer that delivered the message
// first.
// The delivery tags have a maximum value, GossipSubConnTagMessageDeliveryCap, and they decay at
// a rate of GossipSubConnTagDecayAmount / GossipSubConnTagDecayInterval.
type tagTracer struct {
sync.RWMutex
cmgr connmgr.ConnManager
idGen *msgIDGenerator
decayer connmgr.Decayer
decaying map[string]connmgr.DecayingTag
direct map[peer.ID]struct{}
// a map of message ids to the set of peers who delivered the message after the first delivery,
// but before the message was finished validating
nearFirst map[string]map[peer.ID]struct{}
}
func newTagTracer(cmgr connmgr.ConnManager) *tagTracer {
decayer, ok := connmgr.SupportsDecay(cmgr)
if !ok {
log.Debugf("connection manager does not support decaying tags, delivery tags will not be applied")
}
return &tagTracer{
cmgr: cmgr,
idGen: newMsgIdGenerator(),
decayer: decayer,
decaying: make(map[string]connmgr.DecayingTag),
nearFirst: make(map[string]map[peer.ID]struct{}),
}
}
func (t *tagTracer) Start(gs *GossipSubRouter) {
if t == nil {
return
}
t.idGen = gs.p.idGen
t.direct = gs.direct
}
func (t *tagTracer) tagPeerIfDirect(p peer.ID) {
if t.direct == nil {
return
}
// tag peer if it is a direct peer
_, direct := t.direct[p]
if direct {
t.cmgr.Protect(p, "pubsub:<direct>")
}
}
func (t *tagTracer) tagMeshPeer(p peer.ID, topic string) {
tag := topicTag(topic)
t.cmgr.Protect(p, tag)
}
func (t *tagTracer) untagMeshPeer(p peer.ID, topic string) {
tag := topicTag(topic)
t.cmgr.Unprotect(p, tag)
}
func topicTag(topic string) string {
return fmt.Sprintf("pubsub:%s", topic)
}
func (t *tagTracer) addDeliveryTag(topic string) {
if t.decayer == nil {
return
}
name := fmt.Sprintf("pubsub-deliveries:%s", topic)
t.Lock()
defer t.Unlock()
tag, err := t.decayer.RegisterDecayingTag(
name,
GossipSubConnTagDecayInterval,
connmgr.DecayFixed(GossipSubConnTagDecayAmount),
connmgr.BumpSumBounded(0, GossipSubConnTagMessageDeliveryCap))
if err != nil {
log.Warnf("unable to create decaying delivery tag: %s", err)
return
}
t.decaying[topic] = tag
}
func (t *tagTracer) removeDeliveryTag(topic string) {
t.Lock()
defer t.Unlock()
tag, ok := t.decaying[topic]
if !ok {
return
}
err := tag.Close()
if err != nil {
log.Warnf("error closing decaying connmgr tag: %s", err)
}
delete(t.decaying, topic)
}
func (t *tagTracer) bumpDeliveryTag(p peer.ID, topic string) error {
t.RLock()
defer t.RUnlock()
tag, ok := t.decaying[topic]
if !ok {
return fmt.Errorf("no decaying tag registered for topic %s", topic)
}
return tag.Bump(p, GossipSubConnTagBumpMessageDelivery)
}
func (t *tagTracer) bumpTagsForMessage(p peer.ID, msg *Message) {
topic := msg.GetTopic()
err := t.bumpDeliveryTag(p, topic)
if err != nil {
log.Warnf("error bumping delivery tag: %s", err)
}
}
// nearFirstPeers returns the peers who delivered the message while it was still validating
func (t *tagTracer) nearFirstPeers(msg *Message) []peer.ID {
t.Lock()
defer t.Unlock()
peersMap, ok := t.nearFirst[t.idGen.ID(msg)]
if !ok {
return nil
}
peers := make([]peer.ID, 0, len(peersMap))
for p := range peersMap {
peers = append(peers, p)
}
return peers
}
// -- RawTracer interface methods
var _ RawTracer = (*tagTracer)(nil)
func (t *tagTracer) AddPeer(p peer.ID, proto protocol.ID) {
t.tagPeerIfDirect(p)
}
func (t *tagTracer) Join(topic string) {
t.addDeliveryTag(topic)
}
func (t *tagTracer) DeliverMessage(msg *Message) {
nearFirst := t.nearFirstPeers(msg)
t.bumpTagsForMessage(msg.ReceivedFrom, msg)
for _, p := range nearFirst {
t.bumpTagsForMessage(p, msg)
}
// delete the delivery state for this message
t.Lock()
delete(t.nearFirst, t.idGen.ID(msg))
t.Unlock()
}
func (t *tagTracer) Leave(topic string) {
t.removeDeliveryTag(topic)
}
func (t *tagTracer) Graft(p peer.ID, topic string) {
t.tagMeshPeer(p, topic)
}
func (t *tagTracer) Prune(p peer.ID, topic string) {
t.untagMeshPeer(p, topic)
}
func (t *tagTracer) ValidateMessage(msg *Message) {
t.Lock()
defer t.Unlock()
// create map to start tracking the peers who deliver while we're validating
id := t.idGen.ID(msg)
if _, exists := t.nearFirst[id]; exists {
return
}
t.nearFirst[id] = make(map[peer.ID]struct{})
}
func (t *tagTracer) DuplicateMessage(msg *Message) {
t.Lock()
defer t.Unlock()
id := t.idGen.ID(msg)
peers, ok := t.nearFirst[id]
if !ok {
return
}
peers[msg.ReceivedFrom] = struct{}{}
}
func (t *tagTracer) RejectMessage(msg *Message, reason string) {
t.Lock()
defer t.Unlock()
// We want to delete the near-first delivery tracking for messages that have passed through
// the validation pipeline. Other rejection reasons (missing signature, etc) skip the validation
// queue, so we don't want to remove the state in case the message is still validating.
switch reason {
case RejectValidationThrottled:
fallthrough
case RejectValidationIgnored:
fallthrough
case RejectValidationFailed:
delete(t.nearFirst, t.idGen.ID(msg))
}
}
func (t *tagTracer) RemovePeer(peer.ID) {}
func (t *tagTracer) ThrottlePeer(p peer.ID) {}
func (t *tagTracer) RecvRPC(rpc *RPC) {}
func (t *tagTracer) SendRPC(rpc *RPC, p peer.ID) {}
func (t *tagTracer) DropRPC(rpc *RPC, p peer.ID) {}
func (t *tagTracer) UndeliverableMessage(msg *Message) {}

View File

@@ -0,0 +1,56 @@
package timecache
import (
"context"
"sync"
"time"
)
// FirstSeenCache is a time cache that only marks the expiry of a message when first added.
type FirstSeenCache struct {
lk sync.RWMutex
m map[string]time.Time
ttl time.Duration
done func()
}
var _ TimeCache = (*FirstSeenCache)(nil)
func newFirstSeenCache(ttl time.Duration) *FirstSeenCache {
tc := &FirstSeenCache{
m: make(map[string]time.Time),
ttl: ttl,
}
ctx, done := context.WithCancel(context.Background())
tc.done = done
go background(ctx, &tc.lk, tc.m)
return tc
}
func (tc *FirstSeenCache) Done() {
tc.done()
}
func (tc *FirstSeenCache) Has(s string) bool {
tc.lk.RLock()
defer tc.lk.RUnlock()
_, ok := tc.m[s]
return ok
}
func (tc *FirstSeenCache) Add(s string) bool {
tc.lk.Lock()
defer tc.lk.Unlock()
_, ok := tc.m[s]
if ok {
return false
}
tc.m[s] = time.Now().Add(tc.ttl)
return true
}

View File

@@ -0,0 +1,58 @@
package timecache
import (
"context"
"sync"
"time"
)
// LastSeenCache is a time cache that extends the expiry of a seen message when added
// or checked for presence with Has..
type LastSeenCache struct {
lk sync.Mutex
m map[string]time.Time
ttl time.Duration
done func()
}
var _ TimeCache = (*LastSeenCache)(nil)
func newLastSeenCache(ttl time.Duration) *LastSeenCache {
tc := &LastSeenCache{
m: make(map[string]time.Time),
ttl: ttl,
}
ctx, done := context.WithCancel(context.Background())
tc.done = done
go background(ctx, &tc.lk, tc.m)
return tc
}
func (tc *LastSeenCache) Done() {
tc.done()
}
func (tc *LastSeenCache) Add(s string) bool {
tc.lk.Lock()
defer tc.lk.Unlock()
_, ok := tc.m[s]
tc.m[s] = time.Now().Add(tc.ttl)
return !ok
}
func (tc *LastSeenCache) Has(s string) bool {
tc.lk.Lock()
defer tc.lk.Unlock()
_, ok := tc.m[s]
if ok {
tc.m[s] = time.Now().Add(tc.ttl)
}
return ok
}

View File

@@ -0,0 +1,52 @@
package timecache
import (
"time"
logger "github.com/ipfs/go-log/v2"
)
var log = logger.Logger("pubsub/timecache")
// Stategy is the TimeCache expiration strategy to use.
type Strategy uint8
const (
// Strategy_FirstSeen expires an entry from the time it was added.
Strategy_FirstSeen Strategy = iota
// Stategy_LastSeen expires an entry from the last time it was touched by an Add or Has.
Strategy_LastSeen
)
// TimeCache is a cahe of recently seen messages (by id).
type TimeCache interface {
// Add adds an id into the cache, if it is not already there.
// Returns true if the id was newly added to the cache.
// Depending on the implementation strategy, it may or may not update the expiry of
// an existing entry.
Add(string) bool
// Has checks the cache for the presence of an id.
// Depending on the implementation strategy, it may or may not update the expiry of
// an existing entry.
Has(string) bool
// Done signals that the user is done with this cache, which it may stop background threads
// and relinquish resources.
Done()
}
// NewTimeCache defaults to the original ("first seen") cache implementation
func NewTimeCache(ttl time.Duration) TimeCache {
return NewTimeCacheWithStrategy(Strategy_FirstSeen, ttl)
}
func NewTimeCacheWithStrategy(strategy Strategy, ttl time.Duration) TimeCache {
switch strategy {
case Strategy_FirstSeen:
return newFirstSeenCache(ttl)
case Strategy_LastSeen:
return newLastSeenCache(ttl)
default:
// Default to the original time cache implementation
return newFirstSeenCache(ttl)
}
}

View File

@@ -0,0 +1,35 @@
package timecache
import (
"context"
"sync"
"time"
)
var backgroundSweepInterval = time.Minute
func background(ctx context.Context, lk sync.Locker, m map[string]time.Time) {
ticker := time.NewTicker(backgroundSweepInterval)
defer ticker.Stop()
for {
select {
case now := <-ticker.C:
sweep(lk, m, now)
case <-ctx.Done():
return
}
}
}
func sweep(lk sync.Locker, m map[string]time.Time, now time.Time) {
lk.Lock()
defer lk.Unlock()
for k, expiry := range m {
if expiry.Before(now) {
delete(m, k)
}
}
}

477
vendor/github.com/libp2p/go-libp2p-pubsub/topic.go generated vendored Normal file
View File

@@ -0,0 +1,477 @@
package pubsub
import (
"context"
"errors"
"fmt"
"sync"
"time"
pb "github.com/libp2p/go-libp2p-pubsub/pb"
"github.com/libp2p/go-libp2p/core/crypto"
"github.com/libp2p/go-libp2p/core/peer"
)
// ErrTopicClosed is returned if a Topic is utilized after it has been closed
var ErrTopicClosed = errors.New("this Topic is closed, try opening a new one")
// ErrNilSignKey is returned if a nil private key was provided
var ErrNilSignKey = errors.New("nil sign key")
// ErrEmptyPeerID is returned if an empty peer ID was provided
var ErrEmptyPeerID = errors.New("empty peer ID")
// Topic is the handle for a pubsub topic
type Topic struct {
p *PubSub
topic string
evtHandlerMux sync.RWMutex
evtHandlers map[*TopicEventHandler]struct{}
mux sync.RWMutex
closed bool
}
// String returns the topic associated with t
func (t *Topic) String() string {
return t.topic
}
// SetScoreParams sets the topic score parameters if the pubsub router supports peer
// scoring
func (t *Topic) SetScoreParams(p *TopicScoreParams) error {
err := p.validate()
if err != nil {
return fmt.Errorf("invalid topic score parameters: %w", err)
}
t.mux.Lock()
defer t.mux.Unlock()
if t.closed {
return ErrTopicClosed
}
result := make(chan error, 1)
update := func() {
gs, ok := t.p.rt.(*GossipSubRouter)
if !ok {
result <- fmt.Errorf("pubsub router is not gossipsub")
return
}
if gs.score == nil {
result <- fmt.Errorf("peer scoring is not enabled in router")
return
}
err := gs.score.SetTopicScoreParams(t.topic, p)
result <- err
}
select {
case t.p.eval <- update:
err = <-result
return err
case <-t.p.ctx.Done():
return t.p.ctx.Err()
}
}
// EventHandler creates a handle for topic specific events
// Multiple event handlers may be created and will operate independently of each other
func (t *Topic) EventHandler(opts ...TopicEventHandlerOpt) (*TopicEventHandler, error) {
t.mux.RLock()
defer t.mux.RUnlock()
if t.closed {
return nil, ErrTopicClosed
}
h := &TopicEventHandler{
topic: t,
err: nil,
evtLog: make(map[peer.ID]EventType),
evtLogCh: make(chan struct{}, 1),
}
for _, opt := range opts {
err := opt(h)
if err != nil {
return nil, err
}
}
done := make(chan struct{}, 1)
select {
case t.p.eval <- func() {
tmap := t.p.topics[t.topic]
for p := range tmap {
h.evtLog[p] = PeerJoin
}
t.evtHandlerMux.Lock()
t.evtHandlers[h] = struct{}{}
t.evtHandlerMux.Unlock()
done <- struct{}{}
}:
case <-t.p.ctx.Done():
return nil, t.p.ctx.Err()
}
<-done
return h, nil
}
func (t *Topic) sendNotification(evt PeerEvent) {
t.evtHandlerMux.RLock()
defer t.evtHandlerMux.RUnlock()
for h := range t.evtHandlers {
h.sendNotification(evt)
}
}
// Subscribe returns a new Subscription for the topic.
// Note that subscription is not an instantaneous operation. It may take some time
// before the subscription is processed by the pubsub main loop and propagated to our peers.
func (t *Topic) Subscribe(opts ...SubOpt) (*Subscription, error) {
t.mux.RLock()
defer t.mux.RUnlock()
if t.closed {
return nil, ErrTopicClosed
}
sub := &Subscription{
topic: t.topic,
ctx: t.p.ctx,
}
for _, opt := range opts {
err := opt(sub)
if err != nil {
return nil, err
}
}
if sub.ch == nil {
// apply the default size
sub.ch = make(chan *Message, 32)
}
out := make(chan *Subscription, 1)
t.p.disc.Discover(sub.topic)
select {
case t.p.addSub <- &addSubReq{
sub: sub,
resp: out,
}:
case <-t.p.ctx.Done():
return nil, t.p.ctx.Err()
}
return <-out, nil
}
// Relay enables message relaying for the topic and returns a reference
// cancel function. Subsequent calls increase the reference counter.
// To completely disable the relay, all references must be cancelled.
func (t *Topic) Relay() (RelayCancelFunc, error) {
t.mux.RLock()
defer t.mux.RUnlock()
if t.closed {
return nil, ErrTopicClosed
}
out := make(chan RelayCancelFunc, 1)
t.p.disc.Discover(t.topic)
select {
case t.p.addRelay <- &addRelayReq{
topic: t.topic,
resp: out,
}:
case <-t.p.ctx.Done():
return nil, t.p.ctx.Err()
}
return <-out, nil
}
// RouterReady is a function that decides if a router is ready to publish
type RouterReady func(rt PubSubRouter, topic string) (bool, error)
// ProvideKey is a function that provides a private key and its associated peer ID when publishing a new message
type ProvideKey func() (crypto.PrivKey, peer.ID)
type PublishOptions struct {
ready RouterReady
customKey ProvideKey
local bool
}
type PubOpt func(pub *PublishOptions) error
// Publish publishes data to topic.
func (t *Topic) Publish(ctx context.Context, data []byte, opts ...PubOpt) error {
t.mux.RLock()
defer t.mux.RUnlock()
if t.closed {
return ErrTopicClosed
}
pid := t.p.signID
key := t.p.signKey
pub := &PublishOptions{}
for _, opt := range opts {
err := opt(pub)
if err != nil {
return err
}
}
if pub.customKey != nil && !pub.local {
key, pid = pub.customKey()
if key == nil {
return ErrNilSignKey
}
if len(pid) == 0 {
return ErrEmptyPeerID
}
}
m := &pb.Message{
Data: data,
Topic: &t.topic,
From: nil,
Seqno: nil,
}
if pid != "" {
m.From = []byte(pid)
m.Seqno = t.p.nextSeqno()
}
if key != nil {
m.From = []byte(pid)
err := signMessage(pid, key, m)
if err != nil {
return err
}
}
if pub.ready != nil {
if t.p.disc.discovery != nil {
t.p.disc.Bootstrap(ctx, t.topic, pub.ready)
} else {
// TODO: we could likely do better than polling every 200ms.
// For example, block this goroutine on a channel,
// and check again whenever events tell us that the number of
// peers has increased.
var ticker *time.Ticker
readyLoop:
for {
// Check if ready for publishing.
// Similar to what disc.Bootstrap does.
res := make(chan bool, 1)
select {
case t.p.eval <- func() {
done, _ := pub.ready(t.p.rt, t.topic)
res <- done
}:
if <-res {
break readyLoop
}
case <-t.p.ctx.Done():
return t.p.ctx.Err()
case <-ctx.Done():
return ctx.Err()
}
if ticker == nil {
ticker = time.NewTicker(200 * time.Millisecond)
defer ticker.Stop()
}
select {
case <-ticker.C:
case <-ctx.Done():
return fmt.Errorf("router is not ready: %w", ctx.Err())
}
}
}
}
return t.p.val.PushLocal(&Message{m, "", t.p.host.ID(), nil, pub.local})
}
// WithReadiness returns a publishing option for only publishing when the router is ready.
// This option is not useful unless PubSub is also using WithDiscovery
func WithReadiness(ready RouterReady) PubOpt {
return func(pub *PublishOptions) error {
pub.ready = ready
return nil
}
}
// WithLocalPublication returns a publishing option to notify in-process subscribers only.
// It prevents message publication to mesh peers.
// Useful in edge cases where the msg needs to be only delivered to the in-process subscribers,
// e.g. not to spam the network with outdated msgs.
// Should not be used specifically for in-process pubsubing.
func WithLocalPublication(local bool) PubOpt {
return func(pub *PublishOptions) error {
pub.local = local
return nil
}
}
// WithSecretKeyAndPeerId returns a publishing option for providing a custom private key and its corresponding peer ID
// This option is useful when we want to send messages from "virtual", never-connectable peers in the network
func WithSecretKeyAndPeerId(key crypto.PrivKey, pid peer.ID) PubOpt {
return func(pub *PublishOptions) error {
pub.customKey = func() (crypto.PrivKey, peer.ID) {
return key, pid
}
return nil
}
}
// Close closes down the topic. Will return an error unless there are no active event handlers or subscriptions.
// Does not error if the topic is already closed.
func (t *Topic) Close() error {
t.mux.Lock()
defer t.mux.Unlock()
if t.closed {
return nil
}
req := &rmTopicReq{t, make(chan error, 1)}
select {
case t.p.rmTopic <- req:
case <-t.p.ctx.Done():
return t.p.ctx.Err()
}
err := <-req.resp
if err == nil {
t.closed = true
}
return err
}
// ListPeers returns a list of peers we are connected to in the given topic.
func (t *Topic) ListPeers() []peer.ID {
t.mux.RLock()
defer t.mux.RUnlock()
if t.closed {
return []peer.ID{}
}
return t.p.ListPeers(t.topic)
}
type EventType int
const (
PeerJoin EventType = iota
PeerLeave
)
// TopicEventHandler is used to manage topic specific events. No Subscription is required to receive events.
type TopicEventHandler struct {
topic *Topic
err error
evtLogMx sync.Mutex
evtLog map[peer.ID]EventType
evtLogCh chan struct{}
}
type TopicEventHandlerOpt func(t *TopicEventHandler) error
type PeerEvent struct {
Type EventType
Peer peer.ID
}
// Cancel closes the topic event handler
func (t *TopicEventHandler) Cancel() {
topic := t.topic
t.err = fmt.Errorf("topic event handler cancelled by calling handler.Cancel()")
topic.evtHandlerMux.Lock()
delete(topic.evtHandlers, t)
t.topic.evtHandlerMux.Unlock()
}
func (t *TopicEventHandler) sendNotification(evt PeerEvent) {
t.evtLogMx.Lock()
t.addToEventLog(evt)
t.evtLogMx.Unlock()
}
// addToEventLog assumes a lock has been taken to protect the event log
func (t *TopicEventHandler) addToEventLog(evt PeerEvent) {
e, ok := t.evtLog[evt.Peer]
if !ok {
t.evtLog[evt.Peer] = evt.Type
// send signal that an event has been added to the event log
select {
case t.evtLogCh <- struct{}{}:
default:
}
} else if e != evt.Type {
delete(t.evtLog, evt.Peer)
}
}
// pullFromEventLog assumes a lock has been taken to protect the event log
func (t *TopicEventHandler) pullFromEventLog() (PeerEvent, bool) {
for k, v := range t.evtLog {
evt := PeerEvent{Peer: k, Type: v}
delete(t.evtLog, k)
return evt, true
}
return PeerEvent{}, false
}
// NextPeerEvent returns the next event regarding subscribed peers
// Guarantees: Peer Join and Peer Leave events for a given peer will fire in order.
// Unless a peer both Joins and Leaves before NextPeerEvent emits either event
// all events will eventually be received from NextPeerEvent.
func (t *TopicEventHandler) NextPeerEvent(ctx context.Context) (PeerEvent, error) {
for {
t.evtLogMx.Lock()
evt, ok := t.pullFromEventLog()
if ok {
// make sure an event log signal is available if there are events in the event log
if len(t.evtLog) > 0 {
select {
case t.evtLogCh <- struct{}{}:
default:
}
}
t.evtLogMx.Unlock()
return evt, nil
}
t.evtLogMx.Unlock()
select {
case <-t.evtLogCh:
continue
case <-ctx.Done():
return PeerEvent{}, ctx.Err()
}
}
}

530
vendor/github.com/libp2p/go-libp2p-pubsub/trace.go generated vendored Normal file
View File

@@ -0,0 +1,530 @@
package pubsub
import (
"time"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/core/protocol"
pb "github.com/libp2p/go-libp2p-pubsub/pb"
)
// EventTracer is a generic event tracer interface.
// This is a high level tracing interface which delivers tracing events, as defined by the protobuf
// schema in pb/trace.proto.
type EventTracer interface {
Trace(evt *pb.TraceEvent)
}
// RawTracer is a low level tracing interface that allows an application to trace the internal
// operation of the pubsub subsystem.
//
// Note that the tracers are invoked synchronously, which means that application tracers must
// take care to not block or modify arguments.
//
// Warning: this interface is not fixed, we may be adding new methods as necessitated by the system
// in the future.
type RawTracer interface {
// AddPeer is invoked when a new peer is added.
AddPeer(p peer.ID, proto protocol.ID)
// RemovePeer is invoked when a peer is removed.
RemovePeer(p peer.ID)
// Join is invoked when a new topic is joined
Join(topic string)
// Leave is invoked when a topic is abandoned
Leave(topic string)
// Graft is invoked when a new peer is grafted on the mesh (gossipsub)
Graft(p peer.ID, topic string)
// Prune is invoked when a peer is pruned from the message (gossipsub)
Prune(p peer.ID, topic string)
// ValidateMessage is invoked when a message first enters the validation pipeline.
ValidateMessage(msg *Message)
// DeliverMessage is invoked when a message is delivered
DeliverMessage(msg *Message)
// RejectMessage is invoked when a message is Rejected or Ignored.
// The reason argument can be one of the named strings Reject*.
RejectMessage(msg *Message, reason string)
// DuplicateMessage is invoked when a duplicate message is dropped.
DuplicateMessage(msg *Message)
// ThrottlePeer is invoked when a peer is throttled by the peer gater.
ThrottlePeer(p peer.ID)
// RecvRPC is invoked when an incoming RPC is received.
RecvRPC(rpc *RPC)
// SendRPC is invoked when a RPC is sent.
SendRPC(rpc *RPC, p peer.ID)
// DropRPC is invoked when an outbound RPC is dropped, typically because of a queue full.
DropRPC(rpc *RPC, p peer.ID)
// UndeliverableMessage is invoked when the consumer of Subscribe is not reading messages fast enough and
// the pressure release mechanism trigger, dropping messages.
UndeliverableMessage(msg *Message)
}
// pubsub tracer details
type pubsubTracer struct {
tracer EventTracer
raw []RawTracer
pid peer.ID
idGen *msgIDGenerator
}
func (t *pubsubTracer) PublishMessage(msg *Message) {
if t == nil {
return
}
if t.tracer == nil {
return
}
now := time.Now().UnixNano()
evt := &pb.TraceEvent{
Type: pb.TraceEvent_PUBLISH_MESSAGE.Enum(),
PeerID: []byte(t.pid),
Timestamp: &now,
PublishMessage: &pb.TraceEvent_PublishMessage{
MessageID: []byte(t.idGen.ID(msg)),
Topic: msg.Message.Topic,
},
}
t.tracer.Trace(evt)
}
func (t *pubsubTracer) ValidateMessage(msg *Message) {
if t == nil {
return
}
if msg.ReceivedFrom != t.pid {
for _, tr := range t.raw {
tr.ValidateMessage(msg)
}
}
}
func (t *pubsubTracer) RejectMessage(msg *Message, reason string) {
if t == nil {
return
}
if msg.ReceivedFrom != t.pid {
for _, tr := range t.raw {
tr.RejectMessage(msg, reason)
}
}
if t.tracer == nil {
return
}
now := time.Now().UnixNano()
evt := &pb.TraceEvent{
Type: pb.TraceEvent_REJECT_MESSAGE.Enum(),
PeerID: []byte(t.pid),
Timestamp: &now,
RejectMessage: &pb.TraceEvent_RejectMessage{
MessageID: []byte(t.idGen.ID(msg)),
ReceivedFrom: []byte(msg.ReceivedFrom),
Reason: &reason,
Topic: msg.Topic,
},
}
t.tracer.Trace(evt)
}
func (t *pubsubTracer) DuplicateMessage(msg *Message) {
if t == nil {
return
}
if msg.ReceivedFrom != t.pid {
for _, tr := range t.raw {
tr.DuplicateMessage(msg)
}
}
if t.tracer == nil {
return
}
now := time.Now().UnixNano()
evt := &pb.TraceEvent{
Type: pb.TraceEvent_DUPLICATE_MESSAGE.Enum(),
PeerID: []byte(t.pid),
Timestamp: &now,
DuplicateMessage: &pb.TraceEvent_DuplicateMessage{
MessageID: []byte(t.idGen.ID(msg)),
ReceivedFrom: []byte(msg.ReceivedFrom),
Topic: msg.Topic,
},
}
t.tracer.Trace(evt)
}
func (t *pubsubTracer) DeliverMessage(msg *Message) {
if t == nil {
return
}
if msg.ReceivedFrom != t.pid {
for _, tr := range t.raw {
tr.DeliverMessage(msg)
}
}
if t.tracer == nil {
return
}
now := time.Now().UnixNano()
evt := &pb.TraceEvent{
Type: pb.TraceEvent_DELIVER_MESSAGE.Enum(),
PeerID: []byte(t.pid),
Timestamp: &now,
DeliverMessage: &pb.TraceEvent_DeliverMessage{
MessageID: []byte(t.idGen.ID(msg)),
Topic: msg.Topic,
ReceivedFrom: []byte(msg.ReceivedFrom),
},
}
t.tracer.Trace(evt)
}
func (t *pubsubTracer) AddPeer(p peer.ID, proto protocol.ID) {
if t == nil {
return
}
for _, tr := range t.raw {
tr.AddPeer(p, proto)
}
if t.tracer == nil {
return
}
protoStr := string(proto)
now := time.Now().UnixNano()
evt := &pb.TraceEvent{
Type: pb.TraceEvent_ADD_PEER.Enum(),
PeerID: []byte(t.pid),
Timestamp: &now,
AddPeer: &pb.TraceEvent_AddPeer{
PeerID: []byte(p),
Proto: &protoStr,
},
}
t.tracer.Trace(evt)
}
func (t *pubsubTracer) RemovePeer(p peer.ID) {
if t == nil {
return
}
for _, tr := range t.raw {
tr.RemovePeer(p)
}
if t.tracer == nil {
return
}
now := time.Now().UnixNano()
evt := &pb.TraceEvent{
Type: pb.TraceEvent_REMOVE_PEER.Enum(),
PeerID: []byte(t.pid),
Timestamp: &now,
RemovePeer: &pb.TraceEvent_RemovePeer{
PeerID: []byte(p),
},
}
t.tracer.Trace(evt)
}
func (t *pubsubTracer) RecvRPC(rpc *RPC) {
if t == nil {
return
}
for _, tr := range t.raw {
tr.RecvRPC(rpc)
}
if t.tracer == nil {
return
}
now := time.Now().UnixNano()
evt := &pb.TraceEvent{
Type: pb.TraceEvent_RECV_RPC.Enum(),
PeerID: []byte(t.pid),
Timestamp: &now,
RecvRPC: &pb.TraceEvent_RecvRPC{
ReceivedFrom: []byte(rpc.from),
Meta: t.traceRPCMeta(rpc),
},
}
t.tracer.Trace(evt)
}
func (t *pubsubTracer) SendRPC(rpc *RPC, p peer.ID) {
if t == nil {
return
}
for _, tr := range t.raw {
tr.SendRPC(rpc, p)
}
if t.tracer == nil {
return
}
now := time.Now().UnixNano()
evt := &pb.TraceEvent{
Type: pb.TraceEvent_SEND_RPC.Enum(),
PeerID: []byte(t.pid),
Timestamp: &now,
SendRPC: &pb.TraceEvent_SendRPC{
SendTo: []byte(p),
Meta: t.traceRPCMeta(rpc),
},
}
t.tracer.Trace(evt)
}
func (t *pubsubTracer) DropRPC(rpc *RPC, p peer.ID) {
if t == nil {
return
}
for _, tr := range t.raw {
tr.DropRPC(rpc, p)
}
if t.tracer == nil {
return
}
now := time.Now().UnixNano()
evt := &pb.TraceEvent{
Type: pb.TraceEvent_DROP_RPC.Enum(),
PeerID: []byte(t.pid),
Timestamp: &now,
DropRPC: &pb.TraceEvent_DropRPC{
SendTo: []byte(p),
Meta: t.traceRPCMeta(rpc),
},
}
t.tracer.Trace(evt)
}
func (t *pubsubTracer) UndeliverableMessage(msg *Message) {
if t == nil {
return
}
for _, tr := range t.raw {
tr.UndeliverableMessage(msg)
}
}
func (t *pubsubTracer) traceRPCMeta(rpc *RPC) *pb.TraceEvent_RPCMeta {
rpcMeta := new(pb.TraceEvent_RPCMeta)
var msgs []*pb.TraceEvent_MessageMeta
for _, m := range rpc.Publish {
msgs = append(msgs, &pb.TraceEvent_MessageMeta{
MessageID: []byte(t.idGen.RawID(m)),
Topic: m.Topic,
})
}
rpcMeta.Messages = msgs
var subs []*pb.TraceEvent_SubMeta
for _, sub := range rpc.Subscriptions {
subs = append(subs, &pb.TraceEvent_SubMeta{
Subscribe: sub.Subscribe,
Topic: sub.Topicid,
})
}
rpcMeta.Subscription = subs
if rpc.Control != nil {
var ihave []*pb.TraceEvent_ControlIHaveMeta
for _, ctl := range rpc.Control.Ihave {
var mids [][]byte
for _, mid := range ctl.MessageIDs {
mids = append(mids, []byte(mid))
}
ihave = append(ihave, &pb.TraceEvent_ControlIHaveMeta{
Topic: ctl.TopicID,
MessageIDs: mids,
})
}
var iwant []*pb.TraceEvent_ControlIWantMeta
for _, ctl := range rpc.Control.Iwant {
var mids [][]byte
for _, mid := range ctl.MessageIDs {
mids = append(mids, []byte(mid))
}
iwant = append(iwant, &pb.TraceEvent_ControlIWantMeta{
MessageIDs: mids,
})
}
var graft []*pb.TraceEvent_ControlGraftMeta
for _, ctl := range rpc.Control.Graft {
graft = append(graft, &pb.TraceEvent_ControlGraftMeta{
Topic: ctl.TopicID,
})
}
var prune []*pb.TraceEvent_ControlPruneMeta
for _, ctl := range rpc.Control.Prune {
peers := make([][]byte, 0, len(ctl.Peers))
for _, pi := range ctl.Peers {
peers = append(peers, pi.PeerID)
}
prune = append(prune, &pb.TraceEvent_ControlPruneMeta{
Topic: ctl.TopicID,
Peers: peers,
})
}
rpcMeta.Control = &pb.TraceEvent_ControlMeta{
Ihave: ihave,
Iwant: iwant,
Graft: graft,
Prune: prune,
}
}
return rpcMeta
}
func (t *pubsubTracer) Join(topic string) {
if t == nil {
return
}
for _, tr := range t.raw {
tr.Join(topic)
}
if t.tracer == nil {
return
}
now := time.Now().UnixNano()
evt := &pb.TraceEvent{
Type: pb.TraceEvent_JOIN.Enum(),
PeerID: []byte(t.pid),
Timestamp: &now,
Join: &pb.TraceEvent_Join{
Topic: &topic,
},
}
t.tracer.Trace(evt)
}
func (t *pubsubTracer) Leave(topic string) {
if t == nil {
return
}
for _, tr := range t.raw {
tr.Leave(topic)
}
if t.tracer == nil {
return
}
now := time.Now().UnixNano()
evt := &pb.TraceEvent{
Type: pb.TraceEvent_LEAVE.Enum(),
PeerID: []byte(t.pid),
Timestamp: &now,
Leave: &pb.TraceEvent_Leave{
Topic: &topic,
},
}
t.tracer.Trace(evt)
}
func (t *pubsubTracer) Graft(p peer.ID, topic string) {
if t == nil {
return
}
for _, tr := range t.raw {
tr.Graft(p, topic)
}
if t.tracer == nil {
return
}
now := time.Now().UnixNano()
evt := &pb.TraceEvent{
Type: pb.TraceEvent_GRAFT.Enum(),
PeerID: []byte(t.pid),
Timestamp: &now,
Graft: &pb.TraceEvent_Graft{
PeerID: []byte(p),
Topic: &topic,
},
}
t.tracer.Trace(evt)
}
func (t *pubsubTracer) Prune(p peer.ID, topic string) {
if t == nil {
return
}
for _, tr := range t.raw {
tr.Prune(p, topic)
}
if t.tracer == nil {
return
}
now := time.Now().UnixNano()
evt := &pb.TraceEvent{
Type: pb.TraceEvent_PRUNE.Enum(),
PeerID: []byte(t.pid),
Timestamp: &now,
Prune: &pb.TraceEvent_Prune{
PeerID: []byte(p),
Topic: &topic,
},
}
t.tracer.Trace(evt)
}
func (t *pubsubTracer) ThrottlePeer(p peer.ID) {
if t == nil {
return
}
for _, tr := range t.raw {
tr.ThrottlePeer(p)
}
}

303
vendor/github.com/libp2p/go-libp2p-pubsub/tracer.go generated vendored Normal file
View File

@@ -0,0 +1,303 @@
package pubsub
import (
"compress/gzip"
"context"
"encoding/json"
"io"
"os"
"sync"
"time"
pb "github.com/libp2p/go-libp2p-pubsub/pb"
"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/core/peerstore"
"github.com/libp2p/go-libp2p/core/protocol"
"github.com/libp2p/go-msgio/protoio"
)
var TraceBufferSize = 1 << 16 // 64K ought to be enough for everyone; famous last words.
var MinTraceBatchSize = 16
// rejection reasons
const (
RejectBlacklstedPeer = "blacklisted peer"
RejectBlacklistedSource = "blacklisted source"
RejectMissingSignature = "missing signature"
RejectUnexpectedSignature = "unexpected signature"
RejectUnexpectedAuthInfo = "unexpected auth info"
RejectInvalidSignature = "invalid signature"
RejectValidationQueueFull = "validation queue full"
RejectValidationThrottled = "validation throttled"
RejectValidationFailed = "validation failed"
RejectValidationIgnored = "validation ignored"
RejectSelfOrigin = "self originated message"
)
type basicTracer struct {
ch chan struct{}
mx sync.Mutex
buf []*pb.TraceEvent
lossy bool
closed bool
}
func (t *basicTracer) Trace(evt *pb.TraceEvent) {
t.mx.Lock()
defer t.mx.Unlock()
if t.closed {
return
}
if t.lossy && len(t.buf) > TraceBufferSize {
log.Debug("trace buffer overflow; dropping trace event")
} else {
t.buf = append(t.buf, evt)
}
select {
case t.ch <- struct{}{}:
default:
}
}
func (t *basicTracer) Close() {
t.mx.Lock()
defer t.mx.Unlock()
if !t.closed {
t.closed = true
close(t.ch)
}
}
// JSONTracer is a tracer that writes events to a file, encoded in ndjson.
type JSONTracer struct {
basicTracer
w io.WriteCloser
}
// NewJsonTracer creates a new JSONTracer writing traces to file.
func NewJSONTracer(file string) (*JSONTracer, error) {
return OpenJSONTracer(file, os.O_CREATE|os.O_WRONLY|os.O_TRUNC, 0644)
}
// OpenJSONTracer creates a new JSONTracer, with explicit control of OpenFile flags and permissions.
func OpenJSONTracer(file string, flags int, perm os.FileMode) (*JSONTracer, error) {
f, err := os.OpenFile(file, flags, perm)
if err != nil {
return nil, err
}
tr := &JSONTracer{w: f, basicTracer: basicTracer{ch: make(chan struct{}, 1)}}
go tr.doWrite()
return tr, nil
}
func (t *JSONTracer) doWrite() {
var buf []*pb.TraceEvent
enc := json.NewEncoder(t.w)
for {
_, ok := <-t.ch
t.mx.Lock()
tmp := t.buf
t.buf = buf[:0]
buf = tmp
t.mx.Unlock()
for i, evt := range buf {
err := enc.Encode(evt)
if err != nil {
log.Warnf("error writing event trace: %s", err.Error())
}
buf[i] = nil
}
if !ok {
t.w.Close()
return
}
}
}
var _ EventTracer = (*JSONTracer)(nil)
// PBTracer is a tracer that writes events to a file, as delimited protobufs.
type PBTracer struct {
basicTracer
w io.WriteCloser
}
func NewPBTracer(file string) (*PBTracer, error) {
return OpenPBTracer(file, os.O_CREATE|os.O_WRONLY|os.O_TRUNC, 0644)
}
// OpenPBTracer creates a new PBTracer, with explicit control of OpenFile flags and permissions.
func OpenPBTracer(file string, flags int, perm os.FileMode) (*PBTracer, error) {
f, err := os.OpenFile(file, flags, perm)
if err != nil {
return nil, err
}
tr := &PBTracer{w: f, basicTracer: basicTracer{ch: make(chan struct{}, 1)}}
go tr.doWrite()
return tr, nil
}
func (t *PBTracer) doWrite() {
var buf []*pb.TraceEvent
w := protoio.NewDelimitedWriter(t.w)
for {
_, ok := <-t.ch
t.mx.Lock()
tmp := t.buf
t.buf = buf[:0]
buf = tmp
t.mx.Unlock()
for i, evt := range buf {
err := w.WriteMsg(evt)
if err != nil {
log.Warnf("error writing event trace: %s", err.Error())
}
buf[i] = nil
}
if !ok {
t.w.Close()
return
}
}
}
var _ EventTracer = (*PBTracer)(nil)
const RemoteTracerProtoID = protocol.ID("/libp2p/pubsub/tracer/1.0.0")
// RemoteTracer is a tracer that sends trace events to a remote peer
type RemoteTracer struct {
basicTracer
ctx context.Context
host host.Host
peer peer.ID
}
// NewRemoteTracer constructs a RemoteTracer, tracing to the peer identified by pi
func NewRemoteTracer(ctx context.Context, host host.Host, pi peer.AddrInfo) (*RemoteTracer, error) {
tr := &RemoteTracer{ctx: ctx, host: host, peer: pi.ID, basicTracer: basicTracer{ch: make(chan struct{}, 1), lossy: true}}
host.Peerstore().AddAddrs(pi.ID, pi.Addrs, peerstore.PermanentAddrTTL)
go tr.doWrite()
return tr, nil
}
func (t *RemoteTracer) doWrite() {
var buf []*pb.TraceEvent
s, err := t.openStream()
if err != nil {
log.Debugf("error opening remote tracer stream: %s", err.Error())
return
}
var batch pb.TraceEventBatch
gzipW := gzip.NewWriter(s)
w := protoio.NewDelimitedWriter(gzipW)
for {
_, ok := <-t.ch
// deadline for batch accumulation
deadline := time.Now().Add(time.Second)
t.mx.Lock()
for len(t.buf) < MinTraceBatchSize && time.Now().Before(deadline) {
t.mx.Unlock()
time.Sleep(100 * time.Millisecond)
t.mx.Lock()
}
tmp := t.buf
t.buf = buf[:0]
buf = tmp
t.mx.Unlock()
if len(buf) == 0 {
goto end
}
batch.Batch = buf
err = w.WriteMsg(&batch)
if err != nil {
log.Debugf("error writing trace event batch: %s", err)
goto end
}
err = gzipW.Flush()
if err != nil {
log.Debugf("error flushin gzip stream: %s", err)
goto end
}
end:
// nil out the buffer to gc consumed events
for i := range buf {
buf[i] = nil
}
if !ok {
if err != nil {
s.Reset()
} else {
gzipW.Close()
s.Close()
}
return
}
if err != nil {
s.Reset()
s, err = t.openStream()
if err != nil {
log.Debugf("error opening remote tracer stream: %s", err.Error())
return
}
gzipW.Reset(s)
}
}
}
func (t *RemoteTracer) openStream() (network.Stream, error) {
for {
ctx, cancel := context.WithTimeout(t.ctx, time.Minute)
s, err := t.host.NewStream(ctx, t.peer, RemoteTracerProtoID)
cancel()
if err != nil {
if t.ctx.Err() != nil {
return nil, err
}
// wait a minute and try again, to account for transient server downtime
select {
case <-time.After(time.Minute):
continue
case <-t.ctx.Done():
return nil, t.ctx.Err()
}
}
return s, nil
}
}
var _ EventTracer = (*RemoteTracer)(nil)

590
vendor/github.com/libp2p/go-libp2p-pubsub/validation.go generated vendored Normal file
View File

@@ -0,0 +1,590 @@
package pubsub
import (
"context"
"fmt"
"runtime"
"sync"
"time"
"github.com/libp2p/go-libp2p/core/peer"
)
const (
defaultValidateQueueSize = 32
defaultValidateConcurrency = 1024
defaultValidateThrottle = 8192
)
// ValidationError is an error that may be signalled from message publication when the message
// fails validation
type ValidationError struct {
Reason string
}
func (e ValidationError) Error() string {
return e.Reason
}
// Validator is a function that validates a message with a binary decision: accept or reject.
type Validator func(context.Context, peer.ID, *Message) bool
// ValidatorEx is an extended validation function that validates a message with an enumerated decision
type ValidatorEx func(context.Context, peer.ID, *Message) ValidationResult
// ValidationResult represents the decision of an extended validator
type ValidationResult int
const (
// ValidationAccept is a validation decision that indicates a valid message that should be accepted and
// delivered to the application and forwarded to the network.
ValidationAccept = ValidationResult(0)
// ValidationReject is a validation decision that indicates an invalid message that should not be
// delivered to the application or forwarded to the application. Furthermore the peer that forwarded
// the message should be penalized by peer scoring routers.
ValidationReject = ValidationResult(1)
// ValidationIgnore is a validation decision that indicates a message that should be ignored: it will
// be neither delivered to the application nor forwarded to the network. However, in contrast to
// ValidationReject, the peer that forwarded the message must not be penalized by peer scoring routers.
ValidationIgnore = ValidationResult(2)
// internal
validationThrottled = ValidationResult(-1)
)
// ValidatorOpt is an option for RegisterTopicValidator.
type ValidatorOpt func(addVal *addValReq) error
// validation represents the validator pipeline.
// The validator pipeline performs signature validation and runs a
// sequence of user-configured validators per-topic. It is possible to
// adjust various concurrency parameters, such as the number of
// workers and the max number of simultaneous validations. The user
// can also attach inline validators that will be executed
// synchronously; this may be useful to prevent superfluous
// context-switching for lightweight tasks.
type validation struct {
p *PubSub
tracer *pubsubTracer
// mx protects the validator map
mx sync.Mutex
// topicVals tracks per topic validators
topicVals map[string]*validatorImpl
// defaultVals tracks default validators applicable to all topics
defaultVals []*validatorImpl
// validateQ is the front-end to the validation pipeline
validateQ chan *validateReq
// validateThrottle limits the number of active validation goroutines
validateThrottle chan struct{}
// this is the number of synchronous validation workers
validateWorkers int
}
// validation requests
type validateReq struct {
vals []*validatorImpl
src peer.ID
msg *Message
}
// representation of topic validators
type validatorImpl struct {
topic string
validate ValidatorEx
validateTimeout time.Duration
validateThrottle chan struct{}
validateInline bool
}
// async request to add a topic validators
type addValReq struct {
topic string
validate interface{}
timeout time.Duration
throttle int
inline bool
resp chan error
}
// async request to remove a topic validator
type rmValReq struct {
topic string
resp chan error
}
// newValidation creates a new validation pipeline
func newValidation() *validation {
return &validation{
topicVals: make(map[string]*validatorImpl),
validateQ: make(chan *validateReq, defaultValidateQueueSize),
validateThrottle: make(chan struct{}, defaultValidateThrottle),
validateWorkers: runtime.NumCPU(),
}
}
// Start attaches the validation pipeline to a pubsub instance and starts background
// workers
func (v *validation) Start(p *PubSub) {
v.p = p
v.tracer = p.tracer
for i := 0; i < v.validateWorkers; i++ {
go v.validateWorker()
}
}
// AddValidator adds a new validator
func (v *validation) AddValidator(req *addValReq) {
val, err := v.makeValidator(req)
if err != nil {
req.resp <- err
return
}
v.mx.Lock()
defer v.mx.Unlock()
topic := val.topic
_, ok := v.topicVals[topic]
if ok {
req.resp <- fmt.Errorf("duplicate validator for topic %s", topic)
return
}
v.topicVals[topic] = val
req.resp <- nil
}
func (v *validation) makeValidator(req *addValReq) (*validatorImpl, error) {
makeValidatorEx := func(v Validator) ValidatorEx {
return func(ctx context.Context, p peer.ID, msg *Message) ValidationResult {
if v(ctx, p, msg) {
return ValidationAccept
} else {
return ValidationReject
}
}
}
var validator ValidatorEx
switch v := req.validate.(type) {
case func(ctx context.Context, p peer.ID, msg *Message) bool:
validator = makeValidatorEx(Validator(v))
case Validator:
validator = makeValidatorEx(v)
case func(ctx context.Context, p peer.ID, msg *Message) ValidationResult:
validator = ValidatorEx(v)
case ValidatorEx:
validator = v
default:
topic := req.topic
if req.topic == "" {
topic = "(default)"
}
return nil, fmt.Errorf("unknown validator type for topic %s; must be an instance of Validator or ValidatorEx", topic)
}
val := &validatorImpl{
topic: req.topic,
validate: validator,
validateTimeout: 0,
validateThrottle: make(chan struct{}, defaultValidateConcurrency),
validateInline: req.inline,
}
if req.timeout > 0 {
val.validateTimeout = req.timeout
}
if req.throttle > 0 {
val.validateThrottle = make(chan struct{}, req.throttle)
}
return val, nil
}
// RemoveValidator removes an existing validator
func (v *validation) RemoveValidator(req *rmValReq) {
v.mx.Lock()
defer v.mx.Unlock()
topic := req.topic
_, ok := v.topicVals[topic]
if ok {
delete(v.topicVals, topic)
req.resp <- nil
} else {
req.resp <- fmt.Errorf("no validator for topic %s", topic)
}
}
// PushLocal synchronously pushes a locally published message and performs applicable
// validations.
// Returns an error if validation fails
func (v *validation) PushLocal(msg *Message) error {
v.p.tracer.PublishMessage(msg)
err := v.p.checkSigningPolicy(msg)
if err != nil {
return err
}
vals := v.getValidators(msg)
return v.validate(vals, msg.ReceivedFrom, msg, true)
}
// Push pushes a message into the validation pipeline.
// It returns true if the message can be forwarded immediately without validation.
func (v *validation) Push(src peer.ID, msg *Message) bool {
vals := v.getValidators(msg)
if len(vals) > 0 || msg.Signature != nil {
select {
case v.validateQ <- &validateReq{vals, src, msg}:
default:
log.Debugf("message validation throttled: queue full; dropping message from %s", src)
v.tracer.RejectMessage(msg, RejectValidationQueueFull)
}
return false
}
return true
}
// getValidators returns all validators that apply to a given message
func (v *validation) getValidators(msg *Message) []*validatorImpl {
v.mx.Lock()
defer v.mx.Unlock()
var vals []*validatorImpl
vals = append(vals, v.defaultVals...)
topic := msg.GetTopic()
val, ok := v.topicVals[topic]
if !ok {
return vals
}
return append(vals, val)
}
// validateWorker is an active goroutine performing inline validation
func (v *validation) validateWorker() {
for {
select {
case req := <-v.validateQ:
v.validate(req.vals, req.src, req.msg, false)
case <-v.p.ctx.Done():
return
}
}
}
// validate performs validation and only sends the message if all validators succeed
func (v *validation) validate(vals []*validatorImpl, src peer.ID, msg *Message, synchronous bool) error {
// If signature verification is enabled, but signing is disabled,
// the Signature is required to be nil upon receiving the message in PubSub.pushMsg.
if msg.Signature != nil {
if !v.validateSignature(msg) {
log.Debugf("message signature validation failed; dropping message from %s", src)
v.tracer.RejectMessage(msg, RejectInvalidSignature)
return ValidationError{Reason: RejectInvalidSignature}
}
}
// we can mark the message as seen now that we have verified the signature
// and avoid invoking user validators more than once
id := v.p.idGen.ID(msg)
if !v.p.markSeen(id) {
v.tracer.DuplicateMessage(msg)
return nil
} else {
v.tracer.ValidateMessage(msg)
}
var inline, async []*validatorImpl
for _, val := range vals {
if val.validateInline || synchronous {
inline = append(inline, val)
} else {
async = append(async, val)
}
}
// apply inline (synchronous) validators
result := ValidationAccept
loop:
for _, val := range inline {
switch val.validateMsg(v.p.ctx, src, msg) {
case ValidationAccept:
case ValidationReject:
result = ValidationReject
break loop
case ValidationIgnore:
result = ValidationIgnore
}
}
if result == ValidationReject {
log.Debugf("message validation failed; dropping message from %s", src)
v.tracer.RejectMessage(msg, RejectValidationFailed)
return ValidationError{Reason: RejectValidationFailed}
}
// apply async validators
if len(async) > 0 {
select {
case v.validateThrottle <- struct{}{}:
go func() {
v.doValidateTopic(async, src, msg, result)
<-v.validateThrottle
}()
default:
log.Debugf("message validation throttled; dropping message from %s", src)
v.tracer.RejectMessage(msg, RejectValidationThrottled)
}
return nil
}
if result == ValidationIgnore {
v.tracer.RejectMessage(msg, RejectValidationIgnored)
return ValidationError{Reason: RejectValidationIgnored}
}
// no async validators, accepted message, send it!
select {
case v.p.sendMsg <- msg:
return nil
case <-v.p.ctx.Done():
return v.p.ctx.Err()
}
}
func (v *validation) validateSignature(msg *Message) bool {
err := verifyMessageSignature(msg.Message)
if err != nil {
log.Debugf("signature verification error: %s", err.Error())
return false
}
return true
}
func (v *validation) doValidateTopic(vals []*validatorImpl, src peer.ID, msg *Message, r ValidationResult) {
result := v.validateTopic(vals, src, msg)
if result == ValidationAccept && r != ValidationAccept {
result = r
}
switch result {
case ValidationAccept:
v.p.sendMsg <- msg
case ValidationReject:
log.Debugf("message validation failed; dropping message from %s", src)
v.tracer.RejectMessage(msg, RejectValidationFailed)
return
case ValidationIgnore:
log.Debugf("message validation punted; ignoring message from %s", src)
v.tracer.RejectMessage(msg, RejectValidationIgnored)
return
case validationThrottled:
log.Debugf("message validation throttled; ignoring message from %s", src)
v.tracer.RejectMessage(msg, RejectValidationThrottled)
default:
// BUG: this would be an internal programming error, so a panic seems appropiate.
panic(fmt.Errorf("unexpected validation result: %d", result))
}
}
func (v *validation) validateTopic(vals []*validatorImpl, src peer.ID, msg *Message) ValidationResult {
if len(vals) == 1 {
return v.validateSingleTopic(vals[0], src, msg)
}
ctx, cancel := context.WithCancel(v.p.ctx)
defer cancel()
rch := make(chan ValidationResult, len(vals))
rcount := 0
for _, val := range vals {
rcount++
select {
case val.validateThrottle <- struct{}{}:
go func(val *validatorImpl) {
rch <- val.validateMsg(ctx, src, msg)
<-val.validateThrottle
}(val)
default:
log.Debugf("validation throttled for topic %s", val.topic)
rch <- validationThrottled
}
}
result := ValidationAccept
loop:
for i := 0; i < rcount; i++ {
switch <-rch {
case ValidationAccept:
case ValidationReject:
result = ValidationReject
break loop
case ValidationIgnore:
// throttled validation has the same effect, but takes precedence over Ignore as it is not
// known whether the throttled validator would have signaled rejection.
if result != validationThrottled {
result = ValidationIgnore
}
case validationThrottled:
result = validationThrottled
}
}
return result
}
// fast path for single topic validation that avoids the extra goroutine
func (v *validation) validateSingleTopic(val *validatorImpl, src peer.ID, msg *Message) ValidationResult {
select {
case val.validateThrottle <- struct{}{}:
res := val.validateMsg(v.p.ctx, src, msg)
<-val.validateThrottle
return res
default:
log.Debugf("validation throttled for topic %s", val.topic)
return validationThrottled
}
}
func (val *validatorImpl) validateMsg(ctx context.Context, src peer.ID, msg *Message) ValidationResult {
start := time.Now()
defer func() {
log.Debugf("validation done; took %s", time.Since(start))
}()
if val.validateTimeout > 0 {
var cancel func()
ctx, cancel = context.WithTimeout(ctx, val.validateTimeout)
defer cancel()
}
r := val.validate(ctx, src, msg)
switch r {
case ValidationAccept:
fallthrough
case ValidationReject:
fallthrough
case ValidationIgnore:
return r
default:
log.Warnf("Unexpected result from validator: %d; ignoring message", r)
return ValidationIgnore
}
}
// / Options
// WithDefaultValidator adds a validator that applies to all topics by default; it can be used
// more than once and add multiple validators. Having a defult validator does not inhibit registering
// a per topic validator.
func WithDefaultValidator(val interface{}, opts ...ValidatorOpt) Option {
return func(ps *PubSub) error {
addVal := &addValReq{
validate: val,
}
for _, opt := range opts {
err := opt(addVal)
if err != nil {
return err
}
}
val, err := ps.val.makeValidator(addVal)
if err != nil {
return err
}
ps.val.defaultVals = append(ps.val.defaultVals, val)
return nil
}
}
// WithValidateQueueSize sets the buffer of validate queue. Defaults to 32.
// When queue is full, validation is throttled and new messages are dropped.
func WithValidateQueueSize(n int) Option {
return func(ps *PubSub) error {
if n > 0 {
ps.val.validateQ = make(chan *validateReq, n)
return nil
}
return fmt.Errorf("validate queue size must be > 0")
}
}
// WithValidateThrottle sets the upper bound on the number of active validation
// goroutines across all topics. The default is 8192.
func WithValidateThrottle(n int) Option {
return func(ps *PubSub) error {
ps.val.validateThrottle = make(chan struct{}, n)
return nil
}
}
// WithValidateWorkers sets the number of synchronous validation worker goroutines.
// Defaults to NumCPU.
//
// The synchronous validation workers perform signature validation, apply inline
// user validators, and schedule asynchronous user validators.
// You can adjust this parameter to devote less cpu time to synchronous validation.
func WithValidateWorkers(n int) Option {
return func(ps *PubSub) error {
if n > 0 {
ps.val.validateWorkers = n
return nil
}
return fmt.Errorf("number of validation workers must be > 0")
}
}
// WithValidatorTimeout is an option that sets a timeout for an (asynchronous) topic validator.
// By default there is no timeout in asynchronous validators.
func WithValidatorTimeout(timeout time.Duration) ValidatorOpt {
return func(addVal *addValReq) error {
addVal.timeout = timeout
return nil
}
}
// WithValidatorConcurrency is an option that sets the topic validator throttle.
// This controls the number of active validation goroutines for the topic; the default is 1024.
func WithValidatorConcurrency(n int) ValidatorOpt {
return func(addVal *addValReq) error {
addVal.throttle = n
return nil
}
}
// WithValidatorInline is an option that sets the validation disposition to synchronous:
// it will be executed inline in validation front-end, without spawning a new goroutine.
// This is suitable for simple or cpu-bound validators that do not block.
func WithValidatorInline(inline bool) ValidatorOpt {
return func(addVal *addValReq) error {
addVal.inline = inline
return nil
}
}

View File

@@ -0,0 +1,101 @@
package pubsub
import (
"context"
"encoding/binary"
"sync"
"github.com/libp2p/go-libp2p/core/peer"
)
// PeerMetadataStore is an interface for storing and retrieving per peer metadata
type PeerMetadataStore interface {
// Get retrieves the metadata associated with a peer;
// It should return nil if there is no metadata associated with the peer and not an error.
Get(context.Context, peer.ID) ([]byte, error)
// Put sets the metadata associated with a peer.
Put(context.Context, peer.ID, []byte) error
}
// BasicSeqnoValidator is a basic validator, usable as a default validator, that ignores replayed
// messages outside the seen cache window. The validator uses the message seqno as a peer-specific
// nonce to decide whether the message should be propagated, comparing to the maximal nonce store
// in the peer metadata store. This is useful to ensure that there can be no infinitely propagating
// messages in the network regardless of the seen cache span and network diameter.
// It requires that pubsub is instantiated with a strict message signing policy and that seqnos
// are not disabled, ie it doesn't support anonymous mode.
//
// Warning: See https://github.com/libp2p/rust-libp2p/issues/3453
// TL;DR: rust is currently violating the spec by issuing a random seqno, which creates an
// interoperability hazard. We expect this issue to be addressed in the not so distant future,
// but keep this in mind if you are in a mixed environment with (older) rust nodes.
type BasicSeqnoValidator struct {
mx sync.RWMutex
meta PeerMetadataStore
}
// NewBasicSeqnoValidator constructs a BasicSeqnoValidator using the givven PeerMetadataStore.
func NewBasicSeqnoValidator(meta PeerMetadataStore) ValidatorEx {
val := &BasicSeqnoValidator{
meta: meta,
}
return val.validate
}
func (v *BasicSeqnoValidator) validate(ctx context.Context, _ peer.ID, m *Message) ValidationResult {
p := m.GetFrom()
v.mx.RLock()
nonceBytes, err := v.meta.Get(ctx, p)
v.mx.RUnlock()
if err != nil {
log.Warn("error retrieving peer nonce: %s", err)
return ValidationIgnore
}
var nonce uint64
if len(nonceBytes) > 0 {
nonce = binary.BigEndian.Uint64(nonceBytes)
}
var seqno uint64
seqnoBytes := m.GetSeqno()
if len(seqnoBytes) > 0 {
seqno = binary.BigEndian.Uint64(seqnoBytes)
}
// compare against the largest seen nonce
if seqno <= nonce {
return ValidationIgnore
}
// get the nonce and compare again with an exclusive lock before commiting (cf concurrent validation)
v.mx.Lock()
defer v.mx.Unlock()
nonceBytes, err = v.meta.Get(ctx, p)
if err != nil {
log.Warn("error retrieving peer nonce: %s", err)
return ValidationIgnore
}
if len(nonceBytes) > 0 {
nonce = binary.BigEndian.Uint64(nonceBytes)
}
if seqno <= nonce {
return ValidationIgnore
}
// update the nonce
nonceBytes = make([]byte, 8)
binary.BigEndian.PutUint64(nonceBytes, seqno)
err = v.meta.Put(ctx, p, nonceBytes)
if err != nil {
log.Warn("error storing peer nonce: %s", err)
}
return ValidationAccept
}