Construindo uma aplicação echo com a libp2p
Recentemente eu tive a oportunidade de trabalhar em um projeto que usa a libp2p. Ele usa um Kademlia DHT para a descoberta de pares e realiza chamadas de rpc usando a biblioteca libp2p-gorpc. Finalmente tive a oportunidade de criar algo usando as tecnologias que pesquisei e descobri para minha tese de mestrado. Foi ótimo ver como estas bibliotecas funcionam e colocar algo em funcionamento. Existem bons exemplos de partes de códigos suficientes no github, embora fosse bastante difícil encontrar um bom guia ou tutorial que descrevesse os detalhes que eu estava procurando. É por isso que criei um esqueleto de aplicação básica do com uma arquitetura que pode ser estendida facilmente.
Você pode encontrar a fonte do projeto em github.com/ldej/echo.
Host
A primeira coisa a fazer em uma aplicação da libp2p, é criar um Host
. O Host é a peça central da comunicação com os pares.
Vou usar as palavras "peer" e "node" de forma intercambiável. Quando eu as uso, quero dizer uma instância em execução da aplicação.
Um Host
contém todas as funcionalidades centrais que você precisa para conectar um par a outro. Um *Host*
contém uma *ID*
que é a identidade de um par. O Host
também contém uma Peerstore
que é como um livro de endereços. Com um Host
você pode se Conectar
a outros pares e criar Streams
entre eles. Um Stream
representa um canal de comunicação entre dois pares em uma rede libp2p.
A ID de um par é derivada de sua chave pública. Isto significa que, para criar um Host
, um par de chaves públicas-privadas precisa ser gerado primeiro. A seguir, exceto que criei uma função chamada NewHost
, que cria um par de chaves público-privado e um host.
package main
import (
"context"
"crypto/rand"
"fmt"
"io"
mrand "math/rand"
"github.com/libp2p/go-libp2p"
"github.com/libp2p/go-libp2p-core/crypto"
"github.com/libp2p/go-libp2p-core/host"
"github.com/multiformats/go-multiaddr"
)
func NewHost(ctx context.Context, seed int64, port int) (host.Host, error) {
// If the seed is zero, use real cryptographic randomness. Otherwise, use a
// deterministic randomness source to make generated keys stay the same
// across multiple runs
var r io.Reader
if seed == 0 {
r = rand.Reader
} else {
r = mrand.New(mrand.NewSource(seed))
}
priv, _, err := crypto.GenerateKeyPairWithReader(crypto.RSA, 2048, r)
if err != nil {
return nil, err
}
addr, _ := multiaddr.NewMultiaddr(fmt.Sprintf("/ip4/0.0.0.0/tcp/%d", port))
return libp2p.New(ctx,
libp2p.ListenAddrs(addr),
libp2p.Identity(priv),
)
}
Quando você desenvolve uma aplicação, talvez queira ter um identificador previsível para sua aplicação em cada execução. Isso facilita a conexão e a depuração. É por isso que uma fonte de aleatoriedade diferente é escolhida. O exemplo de chat da libp2p está fazendo a mesma coisa. A biblioteca crypto/rsa
de Golang quer evitar a previsibilidade, por isso incluiu o randutil.MaybeReadByte(random)
, o que significa que mesmo que você queira previsibilidade, você não a obtém. Uma questão foi aberta no repositório do go-libp2p-examples, explicando que o Ed25519
pode ser usado no lugar do RSA
.
Na linha 33, um novo endereço é criado onde o host estará escutando. Quando você fornecer 0
como porta, ele encontrará automaticamente uma porta disponível para você.
Descoberta por pares: DHT ou mDNS?
Após criar um host, como os hosts irão se descobrir uns aos outros? Há duas opções disponíveis na libp2p: DNS multicast (mDNS) e uma Tabela de Hash Distribuído (DHT).
O mDNS envia uma mensagem UDP multicast na porta 5353, anunciando sua presença. Isto é utilizado, por exemplo, pela Bonjour da Apple ou por impressoras. Funciona em redes locais, mas é claro que não funciona através da Internet.
Um DHT também pode ser usado para descobrir os pares. Quando um par se junta a um DHT, ele pode usar a loja de valores chave para anunciar sua presença e encontrar outros pares na rede. A chave utilizada para anunciar sua presença é chamada de rendezvous-point.
Há duas grandes diferenças entre usar o mDNS ou um DHT para descobrir os pares. A primeira que já mencionei, o mDNS não funciona através da Internet, onde um DHT funciona. A segunda diferença é que um DHT requer nós de bootstrapping. Outros nós podem entrar na rede conectando-se a um nó de bootstrapping, e depois descobrir o resto da rede.
A Tabela hash distribuída ( DHT ) local Kademlia
No código abaixo, um DHT é iniciado e uma conexão é feita com os pares de inicialização que são fornecidos usando um parâmetro. Nas linhas 18-20 é adicionada uma opção para instruir o par que, caso não sejam fornecidos pares de inicialização, deve entrar em modo servidor. No modo servidor, ele atua como um nó de inicialização, permitindo que outros pares se juntem a ele.
Para isto funcionar, eu tinha que habilitar UPnP na configuração do meu roteador. Caso você não queira ou não tenha acesso a isso, tente executar os nós em uma máquina virtual ou em containers dockers.
Caso você queira se juntar ao Kademlia DHT global da libp2p, você pode usar os pares de inicialização em dht.DefaultBootstrapPeers
.
package main
import (
"context"
"log"
"sync"
"github.com/libp2p/go-libp2p-core/host"
"github.com/libp2p/go-libp2p-core/peer"
disc "github.com/libp2p/go-libp2p-discovery"
"github.com/libp2p/go-libp2p-kad-dht"
"github.com/multiformats/go-multiaddr"
)
func NewKDHT(ctx context.Context, host host.Host, bootstrapPeers []multiaddr.Multiaddr) (*disc.RoutingDiscovery, error) {
var options []dht.Option
if len(bootstrapPeers) == 0 {
options = append(options, dht.Mode(dht.ModeServer))
}
kdht, err := dht.New(ctx, host, options...)
if err != nil {
return nil, err
}
if err = kdht.Bootstrap(ctx); err != nil {
return nil, err
}
for _, peerAddr := range bootstrapPeers {
peerinfo, _ := peer.AddrInfoFromP2pAddr(peerAddr)
wg.Add(1)
go func() {
defer wg.Done()
if err := host.Connect(ctx, *peerinfo); err != nil {
log.Printf("Error while connecting to node %q: %-v", peerinfo, err)
} else {
log.Printf("Connection established with bootstrap node: %q", *peerinfo)
}
}()
}
wg.Wait()
return disc.NewRoutingDiscovery(kdht), nil
Descobrindo outros pares
Com o DHT instalado, é hora de descobrir outros pares. Primeiro, na linha 15, o DHT se envolve em um objeto de discovery.RoutingDiscovery
. O RoutingDiscovery
fornece as funções Advertise
e FindPeers
.
A função Advertise
inicia uma rotina que continua com a propaganda até que o contexto seja cancelado. Ela anuncia sua presença a cada 3 horas. Isto pode ser reduzido fornecendo uma opção TTL (time to live) como um quarto parâmetro.
A função FindPeers
nos fornece todos os pares que foram descobertos no rendezvous-point. Como o nó em si também faz parte dos pares descobertos, ele precisa ser filtrado (linha 33). Para todos os outros pares, verifique se eles já estão conectados, se não estiverem, então Disque
para criar uma conexão.
package main
import (
"context"
"log"
"time"
"github.com/libp2p/go-libp2p-core/host"
"github.com/libp2p/go-libp2p-core/network"
"github.com/libp2p/go-libp2p-discovery"
"github.com/libp2p/go-libp2p-kad-dht"
)
func Discover(ctx context.Context, h host.Host, dht *dht.IpfsDHT, rendezvous string) {
var routingDiscovery = discovery.NewRoutingDiscovery(dht)
discovery.Advertise(ctx, routingDiscovery, rendezvous)
ticker := time.NewTicker(time.Second * 1)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
peers, err := discovery.FindPeers(ctx, routingDiscovery, rendezvous)
if err != nil {
log.Fatal(err)
}
for _, p := range peers {
if p.ID == h.ID() {
continue
}
if h.Network().Connectedness(p.ID) != network.Connected {
_, err = h.Network().DialPeer(ctx, p.ID)
if err != nil {
continue
}
}
}
}
}
}
RPC
Agora que os pares foram descobertos, é hora de configurar o RPC usando o go-libp2p-gorpc
. Vamos adicionar uma função simples que envia uma mensagem a todos os pares, e cada par ecoa a mesma mensagem de volta.
package main
import "context"
const (
EchoService = "EchoRPCAPI"
EchoServiceFuncEcho = "Echo"
)
type EchoRPCAPI struct {
service *Service
}
type Envelope struct {
Message string
}
func (e *EchoRPCAPI) Echo(ctx context.Context, in Envelope, out *Envelope) error {
*out = r.service.ReceiveEcho(in)
return nil
}
Um serviço rpc consiste de uma estrutura (linhas 10-12) com um método definido sobre ela. Neste caso há um método rpc chamado Echo
definido nas linhas 18-21. Um método rpc precisa ter uma assinatura específica:
o receptor precisa ser um indicador (e *EchoRPCAPI
)
o primeiro parâmetro precisa ser um context.Context
o segundo parâmetro, os dados recebidos, precisa ser um tipo concreto
o terceiro parâmetro, os dados de saída, precisa ser um indicador
o tipo de retorno tem que ser error
No exemplo de implementação é definida uma estrutura de Envelope
que é utilizada tanto para os dados de entrada quanto para os de saída. Caso não sejam necessários dados de entrada, uma estrutura vazia pode ser definida como um parâmetro: In struct{}
. No caso de não serem necessários dados de saída, um indicador para uma estrutura vazia pode ser usado como parâmetro: out *struct{}
.
Para separar a lógica rpc da "lógica empresarial", o EchoRPCAPI
tem um serviço que é usado no método Echo
.
Nas linhas 6 e 7 são definidas duas strings que serão usadas no código abaixo. A primeira representa o nome exato da estrutura para o EchoRPCAPI
, a segunda representa o nome do método Echo
que será chamado.
Serviço
Com a echo rpc configurada, vamos dar uma olhada no serviço que a chama.
Primeiro, vamos dar uma olhada no método SetupRPC
. Ele cria o rpc.Server
, este servidor é usado para receber chamadas de outros pares. Depois ele cria uma instância de EchoRPCAPI
e a registra com o servidor. Finalmente, ele cria um rpc.Client
e passa o rpc.Server
como um argumento. O rpc.Client
pode realizar chamadas em seu próprio servidor como se fosse apenas mais um par.
package main
import (
"context"
"fmt"
"time"
"github.com/libp2p/go-libp2p-core/host"
"github.com/libp2p/go-libp2p-core/protocol"
"github.com/libp2p/go-libp2p-gorpc"
)
type Service struct {
rpcServer *rpc.Server
rpcClient *rpc.Client
host host.Host
protocol protocol.ID
counter int
}
func NewService(host host.Host, protocol protocol.ID) *Service {
return &Service{
host: host,
protocol: protocol,
}
}
func (s *Service) SetupRPC() error {
s.rpcServer = rpc.NewServer(s.host, s.protocol)
echoRPCAPI := EchoRPCAPI{service: s}
err := s.rpcServer.Register(&echoRPCAPI)
if err != nil {
return err
}
s.rpcClient = rpc.NewClientWithServer(s.host, s.protocol, s.rpcServer)
return nil
}
func (s *Service) StartMessaging(ctx context.Context) {
ticker := time.NewTicker(time.Second * 1)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
s.counter++
s.Echo(fmt.Sprintf("Message (%d): Hello from %s", s.counter, s.host.ID().Pretty()))
}
}
}
func (s *Service) Echo(message string) {
peers := s.host.Peerstore().Peers()
var replies = make([]*Envelope, len(peers))
errs := s.rpcClient.MultiCall(
Ctxts(len(peers)),
peers,
EchoService,
EchoServiceFuncEcho,
Envelope{Message: message},
CopyEnvelopesToIfaces(replies),
)
for i, err := range errs {
if err != nil {
fmt.Printf("Peer %s returned error: %-v\n", peers[i].Pretty(), err)
} else {
fmt.Printf("Peer %s echoed: %s\n", peers[i].Pretty(), replies[i].Message)
}
}
}
func (s *Service) ReceiveEcho(envelope Envelope) Envelope {
return Envelope{Message: fmt.Sprintf("Peer %s echoing: %s", s.host.ID(), envelope.Message)}
}
func Ctxts(n int) []context.Context {
ctxs := make([]context.Context, n)
for i := 0; i < n; i++ {
ctxs[i] = context.Background()
}
return ctxs
}
func CopyEnvelopesToIfaces(in []*Envelope) []interface{} {
ifaces := make([]interface{}, len(in))
for i := range in {
in[i] = &Envelope{}
ifaces[i] = in[i]
}
return ifaces
}
E então é isto que temos construído: realizar uma chamada de procedimento remoto. Nas linhas 57 a 77, é feita uma chamada de procedimento remoto. A chamada é dirigida a todos os pares na PeerStore
, o que inclui o próprio par. Neste caso, é feita uma chamada MultiCall
.
O método MultiCall
tem uma assinatura a que levei algum tempo para me acostumar. O primeiro argumento é uma fatia dos contextos, uma para cada par. O contexto é o primeiro parâmetro do método Echo
que foi definido no EchoRPCAPI
. O segundo argumento é a lista de pares sobre os quais a chamada deve ser realizada. O terceiro parâmetro é o nome do serviço que deve ser chamado, neste caso é o EchoRPCAPI
que já foi definido na constante EchoService
em rpc_api.go
. O quarto argumento é o método que deve ser chamado, neste caso o método Echo
como definido na constante EchoServiceFuncEcho
. O quinto parâmetro é o parâmetro in
do método Echo
. Isto não é uma fatia, então isto significa que cada par receberá exatamente o mesmo valor. Se você quiser valores diferentes para pares diferentes, você precisa usar rpc.Server.Call
ao invés de MultiCall
e realizar uma chamada para cada par individualmente. O sexto e último parâmetro é para as respostas. O parâmetro aceita apenas uma fatia de interfaces que consiste em ponteiros para os objetos reais que no final irão conter as respostas. Os Ctxts
e CopyEnvelopesToIfaces
estão lá para ajudar a criar as estruturas de dados corretas para esses parâmetros. Esta é uma estratégia que eu encontrei no projeto ipfs-cluster. Ela também inclui uma função RPCDiscardReplies
que é útil para fazer uma chamada MultiCall
a um método rpc que tem que ser do tipo de resposta.
O método MultiCall
retorna uma fatia que tem o comprimento exato do número de pares para os quais a chamada foi feita. Isto permite a iteração sobre a fatia de erros para verificar se algum deles retornou um erro. Há uma variedade de erros que podem ser devolvidos. Por exemplo, quando um par é inalcançável, ele retornará um dial Backoff
erro. Quando a função Echo
retorna um erro (ao invés do nil
que é retornado agora), ele será um erro nesta fatia.
Ligando tudo isso
Com todas as peças montadas, é hora de montar as peças em main.go
. As bandeiras da linha de comando são usadas para parametrizar a aplicação. Em seguida é criado um ´´host´´, o DHT é iniciado, o serviço com rpc é configurado e finalmente é iniciada a descoberta de pares e o envio de mensagens.
package main
import (
"context"
"flag"
"fmt"
"log"
"os"
"os/signal"
"strings"
"syscall"
"github.com/libp2p/go-libp2p-core/host"
"github.com/libp2p/go-libp2p-core/protocol"
"github.com/multiformats/go-multiaddr"
)
func main() {
config := Config{}
flag.StringVar(&config.Rendezvous, "rendezvous", "ldej/echo", "")
flag.Int64Var(&config.Seed, "seed", 0, "Seed value for generating a PeerID, 0 is random")
flag.Var(&config.DiscoveryPeers, "peer", "Peer multiaddress for peer discovery")
flag.StringVar(&config.ProtocolID, "protocolid", "/p2p/rpc/ldej", "")
flag.IntVar(&config.Port, "port", 0, "")
flag.Parse()
ctx, cancel := context.WithCancel(context.Background())
h, err := NewHost(ctx, config.Seed, config.Port)
if err != nil {
log.Fatal(err)
}
log.Printf("Host ID: %s", h.ID().Pretty())
log.Printf("Connect to me on:")
for _, addr := range h.Addrs() {
log.Printf(" %s/p2p/%s", addr, h.ID().Pretty())
}
dht, err := NewDHT(ctx, h, config.DiscoveryPeers)
if err != nil {
log.Fatal(err)
}
service := NewService(h, protocol.ID(config.ProtocolID))
err = service.SetupRPC()
if err != nil {
log.Fatal(err)
}
go Discover(ctx, h, dht, config.Rendezvous)
go service.StartMessaging(ctx)
run(h, cancel)
}
func run(h host.Host, cancel func()) {
c := make(chan os.Signal, 1)
signal.Notify(c, os.Interrupt, syscall.SIGHUP, syscall.SIGINT, syscall.SIGTERM)
<-c
fmt.Printf("\rExiting...\n")
cancel()
if err := h.Close(); err != nil {
panic(err)
}
os.Exit(0)
}
type Config struct {
Port int
ProtocolID string
Rendezvous string
Seed int64
DiscoveryPeers addrList
}
type addrList []multiaddr.Multiaddr
func (al *addrList) String() string {
strs := make([]string, len(*al))
for i, addr := range *al {
strs[i] = addr.String()
}
return strings.Join(strs, ",")
}
func (al *addrList) Set(value string) error {
addr, err := multiaddr.NewMultiaddr(value)
if err != nil {
return err
}
*al = append(*al, addr)
return nil
}
Executando a aplicação
$ git clone [email protected]:ldej/echo.git
$ go run .
2021/01/20 12:56:42 Host ID: QmNpf6rQUFFTR9syqLASvzTsfDdBaUYvu3QkgVMXodyJUz
2021/01/20 12:56:42 Connect to me on:
2021/01/20 12:56:42 /ip4/192.168.1.8/tcp/45363/p2p/QmNpf6rQUFFTR9syqLASvzTsfDdBaUYvu3QkgVMXodyJUz
2021/01/20 12:56:42 /ip4/127.0.0.1/tcp/45363/p2p/QmNpf6rQUFFTR9syqLASvzTsfDdBaUYvu3QkgVMXodyJUz
$ # open a second terminal
$ go run . -peer /ip4/192.168.1.8/tcp/45363/p2p/QmNpf6rQUFFTR9syqLASvzTsfDdBaUYvu3QkgVMXodyJUz
2021/01/20 12:57:45 Host ID: QmSP59U51bSsERKobDE4CyrChJ4uSWv6RV1kiAs51DLLRF
2021/01/20 12:57:45 Connect to me on:
2021/01/20 12:57:45 /ip4/192.168.1.8/tcp/39957/p2p/QmSP59U51bSsERKobDE4CyrChJ4uSWv6RV1kiAs51DLLRF
2021/01/20 12:57:45 /ip4/127.0.0.1/tcp/39957/p2p/QmSP59U51bSsERKobDE4CyrChJ4uSWv6RV1kiAs51DLLRF
2021/01/20 12:57:45 Connection established with bootstrap node: "{QmNpf6rQUFFTR9syqLASvzTsfDdBaUYvu3QkgVMXodyJUz: [/ip4/192.168.1.8/tcp/45363]}"
$ # open a third terminal
$ go run . -peer /ip4/192.168.1.8/tcp/45363/p2p/QmNpf6rQUFFTR9syqLASvzTsfDdBaUYvu3QkgVMXodyJUz
2021/01/20 12:59:06 Host ID: QmPLsZDrgPLFie9PkvrdBbiMa8C5W9eKjZ429kimkP2SB8
2021/01/20 12:59:06 Connect to me on:
2021/01/20 12:59:06 /ip4/192.168.1.8/tcp/42967/p2p/QmPLsZDrgPLFie9PkvrdBbiMa8C5W9eKjZ429kimkP2SB8
2021/01/20 12:59:06 /ip4/127.0.0.1/tcp/42967/p2p/QmPLsZDrgPLFie9PkvrdBbiMa8C5W9eKjZ429kimkP2SB8
2021/01/20 12:57:45 Connection established with bootstrap node: "{QmNpf6rQUFFTR9syqLASvzTsfDdBaUYvu3QkgVMXodyJUz: [/ip4/192.168.1.8/tcp/45363]}"
Gerenciando os pares
Em um sistema ponto a ponto, nunca se sabe quando os pares partem ou quando eles ficam indisponíveis. A PeerStore
se lembra dos colegas até que seu TTL (Time To Live) tenha expirado. No DHT
, um par anuncia sua presença a cada 3 horas. Neste exemplo de aplicação, a saída dos pares não é gerenciada de forma alguma. Dependendo dos requisitos de sua aplicação, isto pode ser importante ou não.
Conclusão
E aí está, uma aplicação básica da libp2p que usa o Kademlia DHT para a descoberta de pares e que pode realizar chamadas usando rpc. No final, foi muito divertido descobrir e construir tudo isso, estou feliz com o resultado e vou usá-lo como base para criar aplicações mais descentralizadas.
Em um futuro post, vou dar uma olhada na implementação de relógios lógicos e talvez dê uma olhada nos algoritmos de consenso.
Artigo escrito por Laurence De Jong e traduzido para o português por Rafael Ojeda
Você pode ler o artigo original em inglês aqui.
Oldest comments (0)