WEB3DEV

Cover image for Construindo uma aplicação echo com a libp2p
Rafael Ojeda
Rafael Ojeda

Posted on • Atualizado em

Construindo uma aplicação echo com a libp2p

Construindo uma aplicação echo com a libp2p

Recentemente eu tive a oportunidade de trabalhar em um projeto que usa a libp2p. Ele usa um Kademlia DHT para a descoberta de pares e realiza chamadas de rpc usando a biblioteca libp2p-gorpc. Finalmente tive a oportunidade de criar algo usando as tecnologias que pesquisei e descobri para minha tese de mestrado. Foi ótimo ver como estas bibliotecas funcionam e colocar algo em funcionamento. Existem bons exemplos de partes de códigos suficientes no github, embora fosse bastante difícil encontrar um bom guia ou tutorial que descrevesse os detalhes que eu estava procurando. É por isso que criei um esqueleto de aplicação básica do com uma arquitetura que pode ser estendida facilmente.

Você pode encontrar a fonte do projeto em github.com/ldej/echo.

Host

A primeira coisa a fazer em uma aplicação da libp2p, é criar um Host. O Host é a peça central da comunicação com os pares.

Vou usar as palavras "peer" e "node" de forma intercambiável. Quando eu as uso, quero dizer uma instância em execução da aplicação.

Um Host contém todas as funcionalidades centrais que você precisa para conectar um par a outro. Um *Host* contém uma *ID* que é a identidade de um par. O Host também contém uma Peerstore que é como um livro de endereços. Com um Host você pode se Conectar a outros pares e criar Streams entre eles. Um Stream representa um canal de comunicação entre dois pares em uma rede libp2p.

A ID de um par é derivada de sua chave pública. Isto significa que, para criar um Host, um par de chaves públicas-privadas precisa ser gerado primeiro. A seguir, exceto que criei uma função chamada NewHost, que cria um par de chaves público-privado e um host.

package main

import (
    "context"
    "crypto/rand"
    "fmt"
    "io"
    mrand "math/rand"

    "github.com/libp2p/go-libp2p"
    "github.com/libp2p/go-libp2p-core/crypto"
    "github.com/libp2p/go-libp2p-core/host"
    "github.com/multiformats/go-multiaddr"
)

func NewHost(ctx context.Context, seed int64, port int) (host.Host, error) {

    // If the seed is zero, use real cryptographic randomness. Otherwise, use a
    // deterministic randomness source to make generated keys stay the same
    // across multiple runs
    var r io.Reader
    if seed == 0 {
        r = rand.Reader
    } else {
        r = mrand.New(mrand.NewSource(seed))
    }

    priv, _, err := crypto.GenerateKeyPairWithReader(crypto.RSA, 2048, r)
    if err != nil {
        return nil, err
    }

    addr, _ := multiaddr.NewMultiaddr(fmt.Sprintf("/ip4/0.0.0.0/tcp/%d", port))

    return libp2p.New(ctx,
        libp2p.ListenAddrs(addr),
        libp2p.Identity(priv),
    )
}
Enter fullscreen mode Exit fullscreen mode

Quando você desenvolve uma aplicação, talvez queira ter um identificador previsível para sua aplicação em cada execução. Isso facilita a conexão e a depuração. É por isso que uma fonte de aleatoriedade diferente é escolhida. O exemplo de chat da libp2p está fazendo a mesma coisa. A biblioteca crypto/rsa de Golang quer evitar a previsibilidade, por isso incluiu o randutil.MaybeReadByte(random), o que significa que mesmo que você queira previsibilidade, você não a obtém. Uma questão foi aberta no repositório do go-libp2p-examples, explicando que o Ed25519 pode ser usado no lugar do RSA.

Na linha 33, um novo endereço é criado onde o host estará escutando. Quando você fornecer 0 como porta, ele encontrará automaticamente uma porta disponível para você.

Descoberta por pares: DHT ou mDNS?

Após criar um host, como os hosts irão se descobrir uns aos outros? Há duas opções disponíveis na libp2p: DNS multicast (mDNS) e uma Tabela de Hash Distribuído (DHT).

O mDNS envia uma mensagem UDP multicast na porta 5353, anunciando sua presença. Isto é utilizado, por exemplo, pela Bonjour da Apple ou por impressoras. Funciona em redes locais, mas é claro que não funciona através da Internet.

Um DHT também pode ser usado para descobrir os pares. Quando um par se junta a um DHT, ele pode usar a loja de valores chave para anunciar sua presença e encontrar outros pares na rede. A chave utilizada para anunciar sua presença é chamada de rendezvous-point.

Há duas grandes diferenças entre usar o mDNS ou um DHT para descobrir os pares. A primeira que já mencionei, o mDNS não funciona através da Internet, onde um DHT funciona. A segunda diferença é que um DHT requer nós de bootstrapping. Outros nós podem entrar na rede conectando-se a um nó de bootstrapping, e depois descobrir o resto da rede.

A Tabela hash distribuída ( DHT ) local Kademlia

No código abaixo, um DHT é iniciado e uma conexão é feita com os pares de inicialização que são fornecidos usando um parâmetro. Nas linhas 18-20 é adicionada uma opção para instruir o par que, caso não sejam fornecidos pares de inicialização, deve entrar em modo servidor. No modo servidor, ele atua como um nó de inicialização, permitindo que outros pares se juntem a ele.

Para isto funcionar, eu tinha que habilitar UPnP na configuração do meu roteador. Caso você não queira ou não tenha acesso a isso, tente executar os nós em uma máquina virtual ou em containers dockers.

Caso você queira se juntar ao Kademlia DHT global da libp2p, você pode usar os pares de inicialização em dht.DefaultBootstrapPeers.

package main

import (
    "context"
    "log"
    "sync"

    "github.com/libp2p/go-libp2p-core/host"
    "github.com/libp2p/go-libp2p-core/peer"
    disc "github.com/libp2p/go-libp2p-discovery"
    "github.com/libp2p/go-libp2p-kad-dht"
    "github.com/multiformats/go-multiaddr"
)

func NewKDHT(ctx context.Context, host host.Host, bootstrapPeers []multiaddr.Multiaddr) (*disc.RoutingDiscovery, error) {
    var options []dht.Option
    if len(bootstrapPeers) == 0 {
        options = append(options, dht.Mode(dht.ModeServer))
    }

    kdht, err := dht.New(ctx, host, options...)
    if err != nil {
        return nil, err
    }

    if err = kdht.Bootstrap(ctx); err != nil {
        return nil, err
    }

    for _, peerAddr := range bootstrapPeers {
        peerinfo, _ := peer.AddrInfoFromP2pAddr(peerAddr)

        wg.Add(1)
        go func() {
            defer wg.Done()
            if err := host.Connect(ctx, *peerinfo); err != nil {
    log.Printf("Error while connecting to node %q: %-v", peerinfo, err)
            } else {
    log.Printf("Connection established with bootstrap node: %q", *peerinfo)
            }
        }()
    }
    wg.Wait()

    return disc.NewRoutingDiscovery(kdht), nil
Enter fullscreen mode Exit fullscreen mode

Descobrindo outros pares

Com o DHT instalado, é hora de descobrir outros pares. Primeiro, na linha 15, o DHT se envolve em um objeto de discovery.RoutingDiscovery. O RoutingDiscovery fornece as funções Advertise e FindPeers.

A função Advertise inicia uma rotina que continua com a propaganda até que o contexto seja cancelado. Ela anuncia sua presença a cada 3 horas. Isto pode ser reduzido fornecendo uma opção TTL (time to live) como um quarto parâmetro.

A função FindPeers nos fornece todos os pares que foram descobertos no rendezvous-point. Como o nó em si também faz parte dos pares descobertos, ele precisa ser filtrado (linha 33). Para todos os outros pares, verifique se eles já estão conectados, se não estiverem, então Disque para criar uma conexão.

package main

import (
    "context"
    "log"
    "time"

    "github.com/libp2p/go-libp2p-core/host"
    "github.com/libp2p/go-libp2p-core/network"
    "github.com/libp2p/go-libp2p-discovery"
    "github.com/libp2p/go-libp2p-kad-dht"
)

func Discover(ctx context.Context, h host.Host, dht *dht.IpfsDHT, rendezvous string) {
    var routingDiscovery = discovery.NewRoutingDiscovery(dht)
    discovery.Advertise(ctx, routingDiscovery, rendezvous)

    ticker := time.NewTicker(time.Second * 1)
    defer ticker.Stop()

    for {
        select {
        case <-ctx.Done():
            return
        case <-ticker.C:

            peers, err := discovery.FindPeers(ctx, routingDiscovery, rendezvous)
            if err != nil {
                log.Fatal(err)
            }

            for _, p := range peers {
                if p.ID == h.ID() {
                    continue
                }
                if h.Network().Connectedness(p.ID) != network.Connected {
                    _, err = h.Network().DialPeer(ctx, p.ID)
                    if err != nil {
                        continue
                    }
                }
            }
        }
    }
}
Enter fullscreen mode Exit fullscreen mode

RPC

Agora que os pares foram descobertos, é hora de configurar o RPC usando o go-libp2p-gorpc. Vamos adicionar uma função simples que envia uma mensagem a todos os pares, e cada par ecoa a mesma mensagem de volta.

package main

import "context"

const (
    EchoService         = "EchoRPCAPI"
    EchoServiceFuncEcho = "Echo"
)

type EchoRPCAPI struct {
    service *Service
}

type Envelope struct {
    Message string
}

func (e *EchoRPCAPI) Echo(ctx context.Context, in Envelope, out *Envelope) error {
    *out = r.service.ReceiveEcho(in)
    return nil
}
Enter fullscreen mode Exit fullscreen mode

Um serviço rpc consiste de uma estrutura (linhas 10-12) com um método definido sobre ela. Neste caso há um método rpc chamado Echo definido nas linhas 18-21. Um método rpc precisa ter uma assinatura específica:

o receptor precisa ser um indicador (e *EchoRPCAPI)
o primeiro parâmetro precisa ser um context.Context
o segundo parâmetro, os dados recebidos, precisa ser um tipo concreto
o terceiro parâmetro, os dados de saída, precisa ser um indicador
o tipo de retorno tem que ser error

No exemplo de implementação é definida uma estrutura de Envelope que é utilizada tanto para os dados de entrada quanto para os de saída. Caso não sejam necessários dados de entrada, uma estrutura vazia pode ser definida como um parâmetro: In struct{}. No caso de não serem necessários dados de saída, um indicador para uma estrutura vazia pode ser usado como parâmetro: out *struct{}.

Para separar a lógica rpc da "lógica empresarial", o EchoRPCAPI tem um serviço que é usado no método Echo.

Nas linhas 6 e 7 são definidas duas strings que serão usadas no código abaixo. A primeira representa o nome exato da estrutura para o EchoRPCAPI, a segunda representa o nome do método Echo que será chamado.

Serviço

Com a echo rpc configurada, vamos dar uma olhada no serviço que a chama.

Primeiro, vamos dar uma olhada no método SetupRPC. Ele cria o rpc.Server, este servidor é usado para receber chamadas de outros pares. Depois ele cria uma instância de EchoRPCAPI e a registra com o servidor. Finalmente, ele cria um rpc.Client e passa o rpc.Server como um argumento. O rpc.Client pode realizar chamadas em seu próprio servidor como se fosse apenas mais um par.

package main

import (
    "context"
    "fmt"
    "time"

    "github.com/libp2p/go-libp2p-core/host"
    "github.com/libp2p/go-libp2p-core/protocol"
    "github.com/libp2p/go-libp2p-gorpc"
)

type Service struct {
    rpcServer *rpc.Server
    rpcClient *rpc.Client
    host      host.Host
    protocol  protocol.ID
    counter   int
}

func NewService(host host.Host, protocol protocol.ID) *Service {
    return &Service{
        host:     host,
        protocol: protocol,
    }
}

func (s *Service) SetupRPC() error {
    s.rpcServer = rpc.NewServer(s.host, s.protocol)

    echoRPCAPI := EchoRPCAPI{service: s}
    err := s.rpcServer.Register(&echoRPCAPI)
    if err != nil {
        return err
    }

    s.rpcClient = rpc.NewClientWithServer(s.host, s.protocol, s.rpcServer)
    return nil
}

func (s *Service) StartMessaging(ctx context.Context) {
    ticker := time.NewTicker(time.Second * 1)
    defer ticker.Stop()

    for {
        select {
        case <-ctx.Done():
            return
        case <-ticker.C:
            s.counter++
            s.Echo(fmt.Sprintf("Message (%d): Hello from %s", s.counter, s.host.ID().Pretty()))
        }
    }
}

func (s *Service) Echo(message string) {
    peers := s.host.Peerstore().Peers()
    var replies = make([]*Envelope, len(peers))

    errs := s.rpcClient.MultiCall(
        Ctxts(len(peers)),
        peers,
        EchoService,
        EchoServiceFuncEcho,
        Envelope{Message: message},
        CopyEnvelopesToIfaces(replies),
    )

    for i, err := range errs {
        if err != nil {
            fmt.Printf("Peer %s returned error: %-v\n", peers[i].Pretty(), err)
        } else {
            fmt.Printf("Peer %s echoed: %s\n", peers[i].Pretty(), replies[i].Message)
        }
    }
}

func (s *Service) ReceiveEcho(envelope Envelope) Envelope {
    return Envelope{Message: fmt.Sprintf("Peer %s echoing: %s", s.host.ID(), envelope.Message)}
}

func Ctxts(n int) []context.Context {
    ctxs := make([]context.Context, n)
    for i := 0; i < n; i++ {
        ctxs[i] = context.Background()
    }
    return ctxs
}

func CopyEnvelopesToIfaces(in []*Envelope) []interface{} {
    ifaces := make([]interface{}, len(in))
    for i := range in {
        in[i] = &Envelope{}
        ifaces[i] = in[i]
    }
    return ifaces
}
Enter fullscreen mode Exit fullscreen mode

E então é isto que temos construído: realizar uma chamada de procedimento remoto. Nas linhas 57 a 77, é feita uma chamada de procedimento remoto. A chamada é dirigida a todos os pares na PeerStore, o que inclui o próprio par. Neste caso, é feita uma chamada MultiCall.

O método MultiCall tem uma assinatura a que levei algum tempo para me acostumar. O primeiro argumento é uma fatia dos contextos, uma para cada par. O contexto é o primeiro parâmetro do método Echo que foi definido no EchoRPCAPI. O segundo argumento é a lista de pares sobre os quais a chamada deve ser realizada. O terceiro parâmetro é o nome do serviço que deve ser chamado, neste caso é o EchoRPCAPI que já foi definido na constante EchoService em rpc_api.go. O quarto argumento é o método que deve ser chamado, neste caso o método Echo como definido na constante EchoServiceFuncEcho. O quinto parâmetro é o parâmetro in do método Echo. Isto não é uma fatia, então isto significa que cada par receberá exatamente o mesmo valor. Se você quiser valores diferentes para pares diferentes, você precisa usar rpc.Server.Call ao invés de MultiCall e realizar uma chamada para cada par individualmente. O sexto e último parâmetro é para as respostas. O parâmetro aceita apenas uma fatia de interfaces que consiste em ponteiros para os objetos reais que no final irão conter as respostas. Os Ctxts e CopyEnvelopesToIfaces estão lá para ajudar a criar as estruturas de dados corretas para esses parâmetros. Esta é uma estratégia que eu encontrei no projeto ipfs-cluster. Ela também inclui uma função RPCDiscardReplies que é útil para fazer uma chamada MultiCall a um método rpc que tem que ser do tipo de resposta.

O método MultiCall retorna uma fatia que tem o comprimento exato do número de pares para os quais a chamada foi feita. Isto permite a iteração sobre a fatia de erros para verificar se algum deles retornou um erro. Há uma variedade de erros que podem ser devolvidos. Por exemplo, quando um par é inalcançável, ele retornará um dial Backoff erro. Quando a função Echo retorna um erro (ao invés do nil que é retornado agora), ele será um erro nesta fatia.

Ligando tudo isso

Com todas as peças montadas, é hora de montar as peças em main.go. As bandeiras da linha de comando são usadas para parametrizar a aplicação. Em seguida é criado um ´´host´´, o DHT é iniciado, o serviço com rpc é configurado e finalmente é iniciada a descoberta de pares e o envio de mensagens.

package main

import (
    "context"
    "flag"
    "fmt"
    "log"
    "os"
    "os/signal"
    "strings"
    "syscall"

    "github.com/libp2p/go-libp2p-core/host"
    "github.com/libp2p/go-libp2p-core/protocol"
    "github.com/multiformats/go-multiaddr"
)

func main() {
    config := Config{}

    flag.StringVar(&config.Rendezvous, "rendezvous", "ldej/echo", "")
    flag.Int64Var(&config.Seed, "seed", 0, "Seed value for generating a PeerID, 0 is random")
    flag.Var(&config.DiscoveryPeers, "peer", "Peer multiaddress for peer discovery")
    flag.StringVar(&config.ProtocolID, "protocolid", "/p2p/rpc/ldej", "")
    flag.IntVar(&config.Port, "port", 0, "")
    flag.Parse()

    ctx, cancel := context.WithCancel(context.Background())

    h, err := NewHost(ctx, config.Seed, config.Port)
    if err != nil {
        log.Fatal(err)
    }

    log.Printf("Host ID: %s", h.ID().Pretty())
    log.Printf("Connect to me on:")
    for _, addr := range h.Addrs() {
        log.Printf("  %s/p2p/%s", addr, h.ID().Pretty())
    }

    dht, err := NewDHT(ctx, h, config.DiscoveryPeers)
    if err != nil {
        log.Fatal(err)
    }

    service := NewService(h, protocol.ID(config.ProtocolID))
    err = service.SetupRPC()
    if err != nil {
        log.Fatal(err)
    }

    go Discover(ctx, h, dht, config.Rendezvous)
    go service.StartMessaging(ctx)

    run(h, cancel)
}

func run(h host.Host, cancel func()) {
    c := make(chan os.Signal, 1)

    signal.Notify(c, os.Interrupt, syscall.SIGHUP, syscall.SIGINT, syscall.SIGTERM)
    <-c

    fmt.Printf("\rExiting...\n")

    cancel()

    if err := h.Close(); err != nil {
        panic(err)
    }
    os.Exit(0)
}

type Config struct {
    Port           int
    ProtocolID     string
    Rendezvous     string
    Seed           int64
    DiscoveryPeers addrList
}

type addrList []multiaddr.Multiaddr

func (al *addrList) String() string {
    strs := make([]string, len(*al))
    for i, addr := range *al {
        strs[i] = addr.String()
    }
    return strings.Join(strs, ",")
}

func (al *addrList) Set(value string) error {
    addr, err := multiaddr.NewMultiaddr(value)
    if err != nil {
        return err
    }
    *al = append(*al, addr)
    return nil
}
Enter fullscreen mode Exit fullscreen mode

Executando a aplicação

$ git clone [email protected]:ldej/echo.git
$ go run .
2021/01/20 12:56:42 Host ID: QmNpf6rQUFFTR9syqLASvzTsfDdBaUYvu3QkgVMXodyJUz
2021/01/20 12:56:42 Connect to me on:
2021/01/20 12:56:42   /ip4/192.168.1.8/tcp/45363/p2p/QmNpf6rQUFFTR9syqLASvzTsfDdBaUYvu3QkgVMXodyJUz
2021/01/20 12:56:42   /ip4/127.0.0.1/tcp/45363/p2p/QmNpf6rQUFFTR9syqLASvzTsfDdBaUYvu3QkgVMXodyJUz

$ # open a second terminal
$ go run . -peer /ip4/192.168.1.8/tcp/45363/p2p/QmNpf6rQUFFTR9syqLASvzTsfDdBaUYvu3QkgVMXodyJUz
2021/01/20 12:57:45 Host ID: QmSP59U51bSsERKobDE4CyrChJ4uSWv6RV1kiAs51DLLRF
2021/01/20 12:57:45 Connect to me on:
2021/01/20 12:57:45   /ip4/192.168.1.8/tcp/39957/p2p/QmSP59U51bSsERKobDE4CyrChJ4uSWv6RV1kiAs51DLLRF
2021/01/20 12:57:45   /ip4/127.0.0.1/tcp/39957/p2p/QmSP59U51bSsERKobDE4CyrChJ4uSWv6RV1kiAs51DLLRF
2021/01/20 12:57:45 Connection established with bootstrap node: "{QmNpf6rQUFFTR9syqLASvzTsfDdBaUYvu3QkgVMXodyJUz: [/ip4/192.168.1.8/tcp/45363]}"

$ # open a third terminal
$ go run . -peer /ip4/192.168.1.8/tcp/45363/p2p/QmNpf6rQUFFTR9syqLASvzTsfDdBaUYvu3QkgVMXodyJUz
2021/01/20 12:59:06 Host ID: QmPLsZDrgPLFie9PkvrdBbiMa8C5W9eKjZ429kimkP2SB8
2021/01/20 12:59:06 Connect to me on:
2021/01/20 12:59:06   /ip4/192.168.1.8/tcp/42967/p2p/QmPLsZDrgPLFie9PkvrdBbiMa8C5W9eKjZ429kimkP2SB8
2021/01/20 12:59:06   /ip4/127.0.0.1/tcp/42967/p2p/QmPLsZDrgPLFie9PkvrdBbiMa8C5W9eKjZ429kimkP2SB8
2021/01/20 12:57:45 Connection established with bootstrap node: "{QmNpf6rQUFFTR9syqLASvzTsfDdBaUYvu3QkgVMXodyJUz: [/ip4/192.168.1.8/tcp/45363]}"
Enter fullscreen mode Exit fullscreen mode

Gerenciando os pares

Em um sistema ponto a ponto, nunca se sabe quando os pares partem ou quando eles ficam indisponíveis. A PeerStore se lembra dos colegas até que seu TTL (Time To Live) tenha expirado. No DHT, um par anuncia sua presença a cada 3 horas. Neste exemplo de aplicação, a saída dos pares não é gerenciada de forma alguma. Dependendo dos requisitos de sua aplicação, isto pode ser importante ou não.

Conclusão

E aí está, uma aplicação básica da libp2p que usa o Kademlia DHT para a descoberta de pares e que pode realizar chamadas usando rpc. No final, foi muito divertido descobrir e construir tudo isso, estou feliz com o resultado e vou usá-lo como base para criar aplicações mais descentralizadas.

Em um futuro post, vou dar uma olhada na implementação de relógios lógicos e talvez dê uma olhada nos algoritmos de consenso.

Artigo escrito por Laurence De Jong e traduzido para o português por Rafael Ojeda

Você pode ler o artigo original em inglês aqui.

Oldest comments (0)