WEB3DEV

Cover image for Aprenda a Construir sua Própria Blockchain Proof of Work com o Python
Panegali
Panegali

Posted on

Aprenda a Construir sua Própria Blockchain Proof of Work com o Python

Introdução

Desde 2010, a ideia de computação descentralizada, a matemática impressionante, os problemas únicos e as novas maneiras de resolvê-los me deixaram ansioso para descobrir tudo sobre essa nova tecnologia e realmente mergulhar nos detalhes para ver o que a faz funcionar.

Graças à facilidade de acesso do Python a coisas de baixo nível em C sem sobrecarga, é o ponto de entrada perfeito para quem deseja aprender o básico e construir sua própria blockchain.

Pré-Requisitos

Você precisará de pelo menos o Python v3 e o Pip v3 para que este tutorial funcione conforme o esperado, as instruções de instalação para ambos podem ser encontradas aqui.

Depois de instalado, você precisará obter essas bibliotecas usando o Pip antes de continuar.

pip install cryptography
pip install asyncio
pip install threading
pip install socketserver
pip install random
Enter fullscreen mode Exit fullscreen mode

Estrutura

Estaremos estabelecendo classes para a blockchain, as transações, os blocos, os mineradores, os nós e, mais importante, a rede p2p para que todos possam conversar entre si.

Há muito mais trabalho a ser feito para tornar esta cadeia totalmente viável e segura, mas para um ponto de aprendizado, ela oferece muito para construir a partir dela, estando próxima de ser um sistema de nível de produção, só precisa que você gaste tempo aprendendo, o que é a melhor parte!

Estrutura de Classe da Blockchain

Construindo Nossas Classes

Classes Blockchain

A classe blockchain rastreará a recompensa base, o ano base, reduzindo a frequência pela metade em anos e um número de dificuldade inicial. Vamos inicializá-lo com sua cadeia inicial, um array de transação, o validador de nó inicializado, um array de nós em branco e o nó de mineração atual.

Nossa classe calculará recompensas para o bloco, criará um bloco gênese na inicialização, terá a capacidade de adicionar transações, adicionar nós, consultar/atualizar saldos, resolver conflitos e verificar a validade dos blocos.

class Blockchain:
    BASE_REWARD = 50
    BASE_YEAR = 2023
    HALVING_FREQUENCY = 4
    DIFFICULTY = 1

    def __init__(self):
        self.chain = [self.create_genesis_block()]
        self.transactions = []
        self.nodes = []
        self.miner_node = Node(self.add_node())

    def calculate_reward(self):
        current_year = datetime.now().year
        elapsed_years = current_year - self.BASE_YEAR
        return self.BASE_REWARD / (2 ** (elapsed_years // self.HALVING_FREQUENCY))

    def create_genesis_block(self):
        return Block(0, time.time(), [], "0", 0)

    def add_transaction(self, transaction):
        self.transactions.append(transaction)

        sender = transaction.sender
        recipient = transaction.recipient
        amount = transaction.amount

        sender_balance = getattr(self.get_node_by_address(sender), 'balance', 0)
        recipient_balance = getattr(self.get_node_by_address(recipient), 'balance', 0)

        setattr(self.get_node_by_address(sender), 'balance', sender_balance - amount)
        setattr(self.get_node_by_address(recipient), 'balance', recipient_balance + amount)

        return len(self.chain) + 1

    def new_transaction(self, transaction):
        self.transactions.append(transaction)
        return len(self.chain) - 1

    def add_node(self, address=None):
        if address is None:
            node = Node(str(uuid.uuid4()))
            self.miner_node = node
            self.nodes.append(node)
            self.update_balances()  # Atualiza os saldos após adicionar um novo nó
            return node.address
        else:
            node = self.get_node_by_address(address)
            if node is None:
                node = Node(address)
                self.miner_node = node
                self.nodes.append(node)
                self.update_balances()  # Atualiza os saldos após adicionar um novo nó
                return node.address
            else:
                return node.address

    def get_balances(self):
        balances = {}
        for block in self.chain:
            for transaction in block.transactions:
                sender = transaction.sender
                recipient = transaction.recipient
                amount = transaction.amount

                balances[sender] = balances.get(sender, 0) - amount
                balances[recipient] = balances.get(recipient, 0) + amount

        # Atualiza saldos com base em transações pendentes
        for transaction in self.transactions:
            sender = transaction.sender
            recipient = transaction.recipient
            amount = transaction.amount

            balances[sender] = balances.get(sender, 0) - amount
            balances[recipient] = balances.get(recipient, 0) + amount

        return balances

    def update_balances(self):
        balances = self.get_balances()
        for node in self.nodes:
            address = node.address
            balance = balances.get(address, 0)
            setattr(node, 'balance', balance)

    def mine_block(self):
        try:
            last_block = self.chain[-1]
            index = last_block.index + 1
            timestamp = time.time()
            transactions = self.transactions.copy()
            reward = self.calculate_reward()

            transactions.append(Transaction(self.miner_node.address, self.miner_node.address, reward, "reward", miner_node=self.miner_node))
            previous_hash = last_block.hash_block()

            max_attempts = 1000
            nonce = 0
            while nonce < max_attempts:
                block = Block(index, timestamp, transactions, previous_hash, nonce)
                block_hash = block.hash_block()
                if block_hash[:self.DIFFICULTY] == '0' * self.DIFFICULTY and self.node_validator.validate_block(block, self.nodes):
                    self.chain.append(block)
                    self.transactions = []
                    return block
                nonce += 1

            return None

        except Exception as e:
            print("An error occurred while mining a new block:")
            print(str(e))
            traceback.print_exc()
        return None

    def get_node_by_address(self, address):
        for node in self.nodes:
            if node.address == address:
                return node
        return None

    def resolve_conflicts(self):
        longest_chain = None
        max_length = len(self.chain)

        for node in self.nodes:
            response = requests.get(f'http://{node.address}/blocks')

            if response.status_code == 200:
                length = response.json()['length']
                chain = response.json()['chain']

                if length > max_length and self.is_valid(chain):
                    max_length = length
                    longest_chain = chain

        if longest_chain:
            self.chain = longest_chain
            return True

        return False

    def is_valid(self, chain=None):
        if chain is None:
            chain = self.chain

        for i in range(1, len(chain)):
            current_block = chain[i]
            previous_block = chain[i - 1]
            if current_block.previous_hash != previous_block.hash_block():
                return False
        return True

    def __str__(self):
        return json.dumps([block.to_dict() for block in self.chain], indent=2)
Enter fullscreen mode Exit fullscreen mode

O segredo da nossa blockchain POW (prova de trabalho) é a dificuldade e a função mine_block. À medida que os blocos são extraídos, a dificuldade aumenta com o tempo. Resolvemos quaisquer conflitos no estado do bloco entre os nós usando a regra da cadeia mais longa e adicionamos uma camada extra de validação no método is_valid() para verificar novamente a integridade do bloco.

Classe Block (Bloco)

Colocaremos isso no arquivo blockchain.py, pois os blocos são contêineres simples e não requerem muita lógica pesada. Dados baseados em tempo, transações, hashes de bloco anterior e posterior, um nonce e uma assinatura de verificação é tudo o que precisamos.

Temos uma função hash_block que gera um hash SHA256 do dicionário de dados e o imprime no bloco antes de adicioná-lo à cadeia. Adicione esta classe acima de sua classe blockchain no mesmo arquivo chamado blockchain.py.

class Block:
    def __init__(self, index, timestamp, transactions, previous_hash, nonce, validations=None, signature=None):
        self.index = index
        self.timestamp = timestamp
        self.transactions = transactions
        self.previous_hash = previous_hash
        self.nonce = nonce
        self.validations = validations if validations else []
        self.signature = signature if signature else b''

    def hash_block(self):
        block_str = json.dumps(self.to_dict(), sort_keys=True)
        digest = hashes.Hash(hashes.SHA256(), backend=default_backend())
        digest.update(block_str.encode())
        return digest.finalize().hex()

    def to_dict(self):
        return {
            "index": self.index,
            "timestamp": self.timestamp,
            "transactions": [transaction.to_dict() for transaction in self.transactions],
            "previous_hash": self.previous_hash,
            "nonce": self.nonce,
            "validations": self.validations,
            "signature": self.signature.hex() if self.signature else None
        }
Enter fullscreen mode Exit fullscreen mode

Classes Node (Nó)

Essas classes lidam com nossa assinatura e verificação de transações na rede. Crie um novo arquivo chamado node.py e cole o seguinte código para ativar nossa propagação de transação. Mas agora precisamos que as transações sejam transmitidas.

from cryptography.hazmat.primitives.asymmetric import rsa, padding
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import serialization, hashes

class Node:
    def __init__(self, address=None):
        self.address = address if address else str(uuid.uuid4())
        self.private_key = rsa.generate_private_key(
            public_exponent=65537,
            key_size=2048,
            backend=default_backend()
        )
        self.public_key = self.private_key.public_key()

    def sign(self, message):
        digest = hashes.Hash(hashes.SHA256(), backend=default_backend())
        digest.update(message)
        signature = self.private_key.sign(
            digest.finalize(),
            padding.PSS(
                mgf=padding.MGF1(hashes.SHA256()),
                salt_length=padding.PSS.MAX_LENGTH
            ),
            hashes.SHA256()
        )
        return signature

    def verify(self, message, signature):
        digest = hashes.Hash(hashes.SHA256(), backend=default_backend())
        digest.update(message)
        try:
            self.public_key.verify(
                signature,
                digest.finalize(),
                padding.PSS(
                    mgf=padding.MGF1(hashes.SHA256()),
                    salt_length=padding.PSS.MAX_LENGTH,
                ),
                hashes.SHA256(),
            )
            return True
        except Exception:
            return False

class NodeValidator:
    def __init__(self):
        self.confirmations_needed = 20

    def validate_block(self, block, nodes):
        confirmations = 0
        for node in nodes:
            if self.is_block_approved_by_node(block, node):
                confirmations += 1
            if confirmations >= self.confirmations_needed:
                return True
        return False

    def is_block_approved_by_node(self, block, node):
        try:
            node.verify(block.hash_block().encode(), block.signature)
            return True
        except Exception:
            return False
Enter fullscreen mode Exit fullscreen mode

Classe Transaction (transação)

Cada txn na rede será uma representação dos dados desta classe. Crie um novo arquivo transaction.py e cole o seguinte. Essa classe é o controlador funcional que outros métodos chamam para validar a integridade e a saúde da transação.

class Transaction:
    def __init__(self, sender, recipient, amount, data, signature=None, miner_node=None):
        if sender is None or recipient is None or amount is None or data is None:
            raise ValueError("Invalid transaction parameters")
        self.sender = sender
        self.recipient = recipient
        self.amount = float(amount)
        self.data = data
        self.signature = signature if signature else b''
        self.miner_node = miner_node if miner_node else None

    def to_dict(self):
        return {
            "sender": self.sender,
            "recipient": self.recipient,
            "amount": self.amount,
            "data": self.data,
            "signature": self.signature.hex() if self.signature else None,
        }

    def sign(self):
        sender_bytes = self.sender if isinstance(self.sender, bytes) else self.sender.encode()
        recipient_bytes = self.recipient if isinstance(self.recipient, bytes) else self.recipient.encode()
        amount_bytes = str(self.amount).encode()
        data_bytes = self.data if isinstance(self.data, bytes) else self.data.encode()

        message = sender_bytes + recipient_bytes + amount_bytes + data_bytes
        self.signature = self.miner_node.sign(message)

    def verify(self):
        sender_bytes = self.sender if isinstance(self.sender, bytes) else self.sender.encode()
        recipient_bytes = self.recipient if isinstance(self.recipient, bytes) else self.recipient.encode()
        amount_bytes = str(self.amount).encode()
        data_bytes = self.data if isinstance(self.data, bytes) else self.data.encode()

        message = sender_bytes + recipient_bytes + amount_bytes + data_bytes
        return self.miner_node.verify(message, self.signature)
Enter fullscreen mode Exit fullscreen mode

Classe P2P

A mágica de tudo isso é a capacidade de cada nó e minerador na rede se comunicarem, desde que tenham acesso TCP/IP. Crie o arquivo p2p.py e cole o seguinte código para ativar a rede.

Isso precisará de mais para ser utilizável em um cenário de rede cruzada do mundo real, portanto, observe a descoberta de pares na internet e os desafios que você precisará superar para tornar esta classe utilizável no mundo real.

import asyncio
import time
import threading
from socketserver import BaseRequestHandler, ThreadingTCPServer
from transaction import Transaction

class P2PRequestHandler(BaseRequestHandler):
    MAX_REQUESTS_PER_MINUTE = 1000

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.requests = 0
        self.reset_time = time.time()

    def handle(self):
        try:
            self.requests += 1
            current_time = time.time()
            if current_time - self.reset_time > 60:
                self.requests = 0
                self.reset_time = current_time

            if self.requests > self.MAX_REQUESTS_PER_MINUTE:
                raise Exception("Too many requests")

            request = self.request.recv(1024).decode()
            response = asyncio.run(self.node.handle_request(request))
            return response.encode()

        except Exception as e:
            error_message = f"An error occurred: {str(e)}"
            traceback_str = traceback.format_exc()
            print(error_message)
            print(traceback_str)
            return f"HTTP/1.1 500 Internal Server Error\r\n\r\n{error_message}\n{traceback_str}".encode()

class P2PNode:
    def __init__(self, blockchain):
        self.peers = []
        self.server_address = ('', 5000)
        self.running = False
        self.blockchain = blockchain
        self.server = None

    async def handle_request(self, request):
        try:
            request_str = request.decode()
            print("Received request:", request_str)

            response = await self.process_request(request_str)
            return response.encode()

        except Exception as e:
            print("An error occurred in the request handler:")
            print(str(e))
            traceback.print_exc()
            return "HTTP/1.1 500 Internal Server Error\r\n\r\n".encode()

    async def process_request(self, request):
        method, *headers_and_body = request.split('\r\n\r\n')
        headers = headers_and_body[0]
        body = headers_and_body[1] if len(headers_and_body) > 1 else ''
        path = headers.split(' ')[1]

        if method == 'POST':
            if path == '/transactions/new':
                return await self.new_transaction(body)
            elif path == '/mine':
                return await self.mine()
            elif path == '/blocks':
                return await self.full_chain()
            elif path == '/peers/new':
                return await self.add_peer(body)
        elif method == 'GET' and path == '/blocks':
            return await self.full_chain()

        return "HTTP/1.1 404 Not Found\r\n\r\n"

    async def start(self):
        try:
            self.running = True
            print("Server started.")

            self.server = ThreadingTCPServer(self.server_address, P2PRequestHandler)
            self.server.node = self

            server_thread = threading.Thread(target=self.server.serve_forever)
            server_thread.start()

        except Exception as e:
            print("An error occurred in the server:", str(e))
Enter fullscreen mode Exit fullscreen mode

Main (principal)

Crie um arquivo chamado main.py que usaremos para testar a cadeia e suas diversas funções.

import threading
import asyncio
import time
import random
from blockchain import Blockchain
from transaction import Transaction
from p2p import P2PNode

try:
    blockchain = Blockchain()
    print("Blockchain created")

    # Inicia 25 nós e financie-os com 100 moedas cada
    nodes = []
    miner_address = blockchain.add_node()  # Add the miner's address
    for _ in range(25):
        address = blockchain.add_node()
        nodes.append(address)
        transaction = Transaction(miner_address, address, 100, "Initial funds", miner_node=blockchain.miner_node)
        transaction.sign()
        blockchain.new_transaction(transaction)

    print("25 nodes created and funded.")

    # Imprime saldos de todos os nós
    balances = blockchain.get_balances()
    print("Balances after funding:")
    for address, balance in balances.items():
        print(f"{address}: {balance}")

    async def start_server():
        node = P2PNode(blockchain)
        await node.start()

    thread = threading.Thread(target=lambda: asyncio.run(start_server()))
    thread.start()

    time.sleep(1)  # Aguardar a sincronização dos nós

    # Cada nó envia uma transação para três destinatários aleatórios
    for sender in nodes:
        recipient = random.choice(nodes)
        if recipient != sender:
            sender_address = sender
            recipient_address = recipient
            sender_balance = blockchain.get_balances().get(sender_address, 0)
            print("Sender balance:", sender_balance)
            if sender_balance >= 1:  # Verifica se o remetente tem fundos suficientes
                print("Sender:", sender_address)
                print("Recipient:", recipient_address)
                transaction = Transaction(sender_address, recipient_address, 1, "Transaction", miner_node=blockchain.miner_node)
                transaction.sign()
                try:
                    blockchain.new_transaction(transaction)
                except Exception as e:
                    print("An error occurred:", str(e))
                    traceback.print_exc()
            else:
                print("Sender does not have sufficient funds to send the transaction.")

    # Inicia a mineração até que três blocos sejam minerados
    for _ in range(3):
        block = blockchain.mine_block()
        if block:
            print("New block mined successfully.")
            print(f"Block hash: {block.hash_block()}")
        else:
            print("Failed to mine a new block.")
            break

    print("Three blocks mined successfully.")

    # Imprime saldos de todos os nós
    balances = blockchain.get_balances()
    print("Balances after all transactions:")
    for address, balance in balances.items():
        print(f"{address}: {balance}")

    print("Printing the blockchain:")
    print(blockchain)

    # Começa minerar até que as recompensas do minerador atinjam 20 moedas
    while blockchain.get_balances()[blockchain.miner_node.address] < 20:
        block = blockchain.mine_block()
        if block:
            print("New block mined successfully.")
            print(f"Block hash: {block.hash_block()}")
        else:
            print("Failed to mine a new block.")
            break

    print("Miner rewards reached 20 coins.")

    # Envia 0,5 moeda para cada nó do minerador
    for recipient in nodes:
        transaction = Transaction(blockchain.miner_node.address, str(recipient), 0.5, "Reward", miner_node=blockchain.miner_node)
        transaction.sign()
        blockchain.new_transaction(transaction)

    print("Reward transactions sent to nodes.")

    time.sleep(1)  # Aguarda o processamento das transações

    # Imprime saldos de todos os nós
    balances = blockchain.get_balances()
    print("Balances after all transactions:")
    for address, balance in balances.items():
        print(f"{address}: {balance}")

    print("Printing the blockchain:")
    print(blockchain)

except Exception as e:
    print(f"An error occurred: {e}")
Enter fullscreen mode Exit fullscreen mode

Teste

Abra uma janela de terminal como um usuário administrador e digite return para que a cadeia seja ativada e você a veja em ação!

python main.py
Enter fullscreen mode Exit fullscreen mode

Blockchain enviando transações

Conclusão

Este é um ponto de partida básico para uma blockchain e não possui as camadas extras de segurança e validação para que ela esteja pronta para produção, mas é um ponto de partida fantástico para qualquer um aprender!

A fonte completa pode ser encontrada aqui.


Artigo escrito por Robert McMenemy. Traduzido por Marcelo Panegali.

Oldest comments (0)