This commit is contained in:
2025-05-23 16:10:49 +04:00
parent 3f01074e3f
commit 29d0d25a3b
133 changed files with 346 additions and 168 deletions

View File

@@ -0,0 +1,109 @@
# Mycelium Client
A Go client for the Mycelium overlay network. This package allows you to connect to a Mycelium node via its HTTP API and perform operations like sending/receiving messages and managing peers.
## Features
- Send and receive messages through the Mycelium network
- List, add, and remove peers
- View network routes
- Query node information
- Reply to received messages
- Check message status
## Usage
### Basic Client Usage
```go
// Create a new client with default configuration (localhost:8989)
client := mycelium_client.NewClient("")
// Create a context with timeout
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
// Get node info
info, err := client.GetNodeInfo(ctx)
if err != nil {
log.Fatal(err)
}
fmt.Printf("Node subnet: %s\n", info.NodeSubnet)
// List peers
peers, err := client.ListPeers(ctx)
if err != nil {
log.Fatal(err)
}
fmt.Printf("Found %d peers\n", len(peers))
// Send a message
dest := mycelium_client.MessageDestination{
PK: "publicKeyHexString", // or IP: "myceliumIPv6Address"
}
payload := []byte("Hello from mycelium client!")
waitForReply := false
replyTimeout := 0 // not used when waitForReply is false
_, msgID, err := client.SendMessage(ctx, dest, payload, "example.topic", waitForReply, replyTimeout)
if err != nil {
log.Fatal(err)
}
fmt.Printf("Message sent with ID: %s\n", msgID)
// Receive a message with 10 second timeout
msg, err := client.ReceiveMessage(ctx, 10, "", false)
if err != nil {
log.Fatal(err)
}
if msg != nil {
payload, _ := msg.Decode()
fmt.Printf("Received message: %s\n", string(payload))
}
```
### Command Line Tool
The package includes a command-line tool for interacting with a Mycelium node:
```
Usage: mycelium-client [flags] COMMAND [args...]
Flags:
-api string
Mycelium API URL (default "http://localhost:8989")
-json
Output in JSON format
-timeout int
Client timeout in seconds (default 30)
Commands:
info Get node information
peers List connected peers
add-peer ENDPOINT Add a new peer
del-peer ENDPOINT Remove a peer
send [--pk=PK|--ip=IP] [--topic=TOPIC] [--wait] [--reply-timeout=N] MESSAGE
Send a message to a destination
receive [--topic=TOPIC] [--timeout=N]
Receive a message
reply ID [--topic=TOPIC] MESSAGE
Reply to a message
status ID Get status of a sent message
routes [selected|fallback] List routes (default: selected)
```
## Building the Command Line Tool
```bash
cd pkg/mycelium_client/cmd
go build -o mycelium-client
```
## Examples
See the `examples` directory for full usage examples.
## Notes
- This client requires a running Mycelium node accessible via HTTP API.
- The default API endpoint is http://localhost:8989.
- Messages are automatically encoded/decoded from base64 when working with the API.

View File

@@ -0,0 +1,428 @@
// pkg/mycelium_client/client.go
package mycelium_client
import (
"bytes"
"context"
"encoding/base64"
"encoding/json"
"fmt"
"io/ioutil"
"net/http"
"net/url"
"time"
)
// DefaultAPIPort is the default port on which the Mycelium HTTP API listens
const DefaultAPIPort = 8989
// Default timeout values
const (
DefaultClientTimeout = 30 * time.Second
DefaultReplyTimeout = 60 // seconds
DefaultReceiveWait = 10 // seconds
)
// MyceliumClient represents a client for interacting with the Mycelium API
type MyceliumClient struct {
BaseURL string
HTTPClient *http.Client
}
// NewClient creates a new Mycelium client with the given base URL
// If baseURL is empty, it defaults to "http://localhost:8989"
func NewClient(baseURL string) *MyceliumClient {
if baseURL == "" {
baseURL = fmt.Sprintf("http://localhost:%d", DefaultAPIPort)
}
return &MyceliumClient{
BaseURL: baseURL,
HTTPClient: &http.Client{Timeout: DefaultClientTimeout},
}
}
// SetTimeout sets the HTTP client timeout
func (c *MyceliumClient) SetTimeout(timeout time.Duration) {
c.HTTPClient.Timeout = timeout
}
// Message Structures
// MessageDestination represents a destination for a message, either by IP or public key
type MessageDestination struct {
IP string `json:"ip,omitempty"` // IPv6 address in the overlay network
PK string `json:"pk,omitempty"` // Public key hex encoded
}
// PushMessage represents a message to be sent
type PushMessage struct {
Dst MessageDestination `json:"dst"`
Topic string `json:"topic,omitempty"`
Payload string `json:"payload"` // Base64 encoded
}
// InboundMessage represents a received message
type InboundMessage struct {
ID string `json:"id"`
SrcIP string `json:"srcIp"`
SrcPK string `json:"srcPk"`
DstIP string `json:"dstIp"`
DstPK string `json:"dstPk"`
Topic string `json:"topic,omitempty"`
Payload string `json:"payload"` // Base64 encoded
}
// MessageResponse represents the ID of a pushed message
type MessageResponse struct {
ID string `json:"id"`
}
// NodeInfo represents general information about the Mycelium node
type NodeInfo struct {
NodeSubnet string `json:"nodeSubnet"`
}
// PeerStats represents statistics about a peer
type PeerStats struct {
Endpoint Endpoint `json:"endpoint"`
Type string `json:"type"` // static, inbound, linkLocalDiscovery
ConnectionState string `json:"connectionState"` // alive, connecting, dead
TxBytes int64 `json:"txBytes,omitempty"`
RxBytes int64 `json:"rxBytes,omitempty"`
}
// Endpoint represents connection information for a peer
type Endpoint struct {
Proto string `json:"proto"` // tcp, quic
SocketAddr string `json:"socketAddr"` // IP:port
}
// Route represents a network route
type Route struct {
Subnet string `json:"subnet"`
NextHop string `json:"nextHop"`
Metric interface{} `json:"metric"` // Can be int or string "infinite"
Seqno int `json:"seqno"`
}
// Decode decodes the base64 payload of an inbound message
func (m *InboundMessage) Decode() ([]byte, error) {
return base64.StdEncoding.DecodeString(m.Payload)
}
// GetNodeInfo retrieves general information about the Mycelium node
func (c *MyceliumClient) GetNodeInfo(ctx context.Context) (*NodeInfo, error) {
req, err := http.NewRequestWithContext(ctx, "GET", c.BaseURL+"/api/v1/admin", nil)
if err != nil {
return nil, err
}
resp, err := c.HTTPClient.Do(req)
if err != nil {
return nil, err
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
body, _ := ioutil.ReadAll(resp.Body)
return nil, fmt.Errorf("API returned status %d: %s", resp.StatusCode, string(body))
}
var info NodeInfo
if err := json.NewDecoder(resp.Body).Decode(&info); err != nil {
return nil, err
}
return &info, nil
}
// SendMessage sends a message to a specified destination
// If waitForReply is true, it will wait for a reply up to the specified timeout
func (c *MyceliumClient) SendMessage(ctx context.Context, dst MessageDestination, payload []byte, topic string, waitForReply bool, replyTimeout int) (*InboundMessage, string, error) {
// Encode payload to base64
encodedPayload := base64.StdEncoding.EncodeToString(payload)
msg := PushMessage{
Dst: dst,
Topic: topic,
Payload: encodedPayload,
}
reqBody, err := json.Marshal(msg)
if err != nil {
return nil, "", err
}
// Build URL with optional reply_timeout
url := fmt.Sprintf("%s/api/v1/messages", c.BaseURL)
if waitForReply && replyTimeout > 0 {
url = fmt.Sprintf("%s?reply_timeout=%d", url, replyTimeout)
}
req, err := http.NewRequestWithContext(ctx, "POST", url, bytes.NewBuffer(reqBody))
if err != nil {
return nil, "", err
}
req.Header.Set("Content-Type", "application/json")
resp, err := c.HTTPClient.Do(req)
if err != nil {
return nil, "", err
}
defer resp.Body.Close()
// Check for error status codes
if resp.StatusCode >= 400 {
body, _ := ioutil.ReadAll(resp.Body)
return nil, "", fmt.Errorf("API returned status %d: %s", resp.StatusCode, string(body))
}
// If we got a reply (status 200)
if resp.StatusCode == http.StatusOK && waitForReply {
var reply InboundMessage
if err := json.NewDecoder(resp.Body).Decode(&reply); err != nil {
return nil, "", err
}
return &reply, "", nil
}
// If we just got a message ID (status 201)
var result MessageResponse
if err := json.NewDecoder(resp.Body).Decode(&result); err != nil {
return nil, "", err
}
return nil, result.ID, nil
}
// ReplyToMessage sends a reply to a previously received message
func (c *MyceliumClient) ReplyToMessage(ctx context.Context, msgID string, payload []byte, topic string) error {
encodedPayload := base64.StdEncoding.EncodeToString(payload)
msg := PushMessage{
Dst: MessageDestination{}, // Not needed for replies
Topic: topic,
Payload: encodedPayload,
}
reqBody, err := json.Marshal(msg)
if err != nil {
return err
}
url := fmt.Sprintf("%s/api/v1/messages/reply/%s", c.BaseURL, msgID)
req, err := http.NewRequestWithContext(ctx, "POST", url, bytes.NewBuffer(reqBody))
if err != nil {
return err
}
req.Header.Set("Content-Type", "application/json")
resp, err := c.HTTPClient.Do(req)
if err != nil {
return err
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusNoContent {
body, _ := ioutil.ReadAll(resp.Body)
return fmt.Errorf("API returned status %d: %s", resp.StatusCode, string(body))
}
return nil
}
// ReceiveMessage waits for and receives a message, optionally filtering by topic
// If timeout is 0, it will return immediately if no message is available
func (c *MyceliumClient) ReceiveMessage(ctx context.Context, timeout int, topic string, peek bool) (*InboundMessage, error) {
params := url.Values{}
if timeout > 0 {
params.Add("timeout", fmt.Sprintf("%d", timeout))
}
if topic != "" {
params.Add("topic", topic)
}
if peek {
params.Add("peek", "true")
}
url := fmt.Sprintf("%s/api/v1/messages?%s", c.BaseURL, params.Encode())
req, err := http.NewRequestWithContext(ctx, "GET", url, nil)
if err != nil {
return nil, err
}
resp, err := c.HTTPClient.Do(req)
if err != nil {
return nil, err
}
defer resp.Body.Close()
// No message available
if resp.StatusCode == http.StatusNoContent {
return nil, nil
}
if resp.StatusCode != http.StatusOK {
body, _ := ioutil.ReadAll(resp.Body)
return nil, fmt.Errorf("API returned status %d: %s", resp.StatusCode, string(body))
}
var msg InboundMessage
if err := json.NewDecoder(resp.Body).Decode(&msg); err != nil {
return nil, err
}
return &msg, nil
}
// GetMessageStatus checks the status of a previously sent message
func (c *MyceliumClient) GetMessageStatus(ctx context.Context, msgID string) (map[string]interface{}, error) {
url := fmt.Sprintf("%s/api/v1/messages/status/%s", c.BaseURL, msgID)
req, err := http.NewRequestWithContext(ctx, "GET", url, nil)
if err != nil {
return nil, err
}
resp, err := c.HTTPClient.Do(req)
if err != nil {
return nil, err
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
body, _ := ioutil.ReadAll(resp.Body)
return nil, fmt.Errorf("API returned status %d: %s", resp.StatusCode, string(body))
}
var status map[string]interface{}
if err := json.NewDecoder(resp.Body).Decode(&status); err != nil {
return nil, err
}
return status, nil
}
// ListPeers retrieves a list of known peers
func (c *MyceliumClient) ListPeers(ctx context.Context) ([]PeerStats, error) {
req, err := http.NewRequestWithContext(ctx, "GET", c.BaseURL+"/api/v1/admin/peers", nil)
if err != nil {
return nil, err
}
resp, err := c.HTTPClient.Do(req)
if err != nil {
return nil, err
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
body, _ := ioutil.ReadAll(resp.Body)
return nil, fmt.Errorf("API returned status %d: %s", resp.StatusCode, string(body))
}
var peers []PeerStats
if err := json.NewDecoder(resp.Body).Decode(&peers); err != nil {
return nil, err
}
return peers, nil
}
// AddPeer adds a new peer to the network
func (c *MyceliumClient) AddPeer(ctx context.Context, endpoint string) error {
// The API expects a direct endpoint string, not a JSON object
reqBody := []byte(endpoint)
req, err := http.NewRequestWithContext(ctx, "POST", c.BaseURL+"/api/v1/admin/peers", bytes.NewBuffer(reqBody))
if err != nil {
return err
}
req.Header.Set("Content-Type", "text/plain")
resp, err := c.HTTPClient.Do(req)
if err != nil {
return err
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusNoContent {
body, _ := ioutil.ReadAll(resp.Body)
return fmt.Errorf("API returned status %d: %s", resp.StatusCode, string(body))
}
return nil
}
// RemovePeer removes a peer from the network
func (c *MyceliumClient) RemovePeer(ctx context.Context, endpoint string) error {
url := fmt.Sprintf("%s/api/v1/admin/peers/%s", c.BaseURL, url.PathEscape(endpoint))
req, err := http.NewRequestWithContext(ctx, "DELETE", url, nil)
if err != nil {
return err
}
resp, err := c.HTTPClient.Do(req)
if err != nil {
return err
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusNoContent {
body, _ := ioutil.ReadAll(resp.Body)
return fmt.Errorf("API returned status %d: %s", resp.StatusCode, string(body))
}
return nil
}
// ListSelectedRoutes retrieves a list of selected routes
func (c *MyceliumClient) ListSelectedRoutes(ctx context.Context) ([]Route, error) {
req, err := http.NewRequestWithContext(ctx, "GET", c.BaseURL+"/api/v1/admin/routes/selected", nil)
if err != nil {
return nil, err
}
resp, err := c.HTTPClient.Do(req)
if err != nil {
return nil, err
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
body, _ := ioutil.ReadAll(resp.Body)
return nil, fmt.Errorf("API returned status %d: %s", resp.StatusCode, string(body))
}
var routes []Route
if err := json.NewDecoder(resp.Body).Decode(&routes); err != nil {
return nil, err
}
return routes, nil
}
// ListFallbackRoutes retrieves a list of fallback routes
func (c *MyceliumClient) ListFallbackRoutes(ctx context.Context) ([]Route, error) {
req, err := http.NewRequestWithContext(ctx, "GET", c.BaseURL+"/api/v1/admin/routes/fallback", nil)
if err != nil {
return nil, err
}
resp, err := c.HTTPClient.Do(req)
if err != nil {
return nil, err
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
body, _ := ioutil.ReadAll(resp.Body)
return nil, fmt.Errorf("API returned status %d: %s", resp.StatusCode, string(body))
}
var routes []Route
if err := json.NewDecoder(resp.Body).Decode(&routes); err != nil {
return nil, err
}
return routes, nil
}

View File

@@ -0,0 +1,414 @@
// pkg/mycelium_client/cmd/main.go
package main
import (
"context"
"flag"
"fmt"
"os"
"os/signal"
"strings"
"syscall"
"time"
"git.ourworld.tf/herocode/heroagent/pkg/mycelium_client"
)
type config struct {
baseURL string
command string
peerEndpoint string
message string
destination string
topic string
timeout int
wait bool
replyTimeout int
messageID string
outputJSON bool
}
// Commands
const (
cmdInfo = "info"
cmdPeers = "peers"
cmdAddPeer = "add-peer"
cmdDelPeer = "del-peer"
cmdSend = "send"
cmdReceive = "receive"
cmdReply = "reply"
cmdStatus = "status"
cmdRoutes = "routes"
)
func main() {
// Create config with default values
cfg := config{
baseURL: fmt.Sprintf("http://localhost:%d", mycelium_client.DefaultAPIPort),
timeout: 30,
replyTimeout: mycelium_client.DefaultReplyTimeout,
}
// Parse command line flags
flag.StringVar(&cfg.baseURL, "api", cfg.baseURL, "Mycelium API URL")
flag.IntVar(&cfg.timeout, "timeout", cfg.timeout, "Client timeout in seconds")
flag.BoolVar(&cfg.outputJSON, "json", false, "Output in JSON format")
flag.Parse()
// Get the command
args := flag.Args()
if len(args) == 0 {
printUsage()
os.Exit(1)
}
cfg.command = args[0]
args = args[1:]
// Create client
client := mycelium_client.NewClient(cfg.baseURL)
client.SetTimeout(time.Duration(cfg.timeout) * time.Second)
// Create context with cancellation for graceful shutdowns
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
// Set up signal handling
sigCh := make(chan os.Signal, 1)
signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM)
go func() {
<-sigCh
fmt.Println("\nReceived interrupt signal, shutting down...")
cancel()
}()
// Execute command
var err error
switch cfg.command {
case cmdInfo:
err = showNodeInfo(ctx, client)
case cmdPeers:
err = listPeers(ctx, client, cfg.outputJSON)
case cmdAddPeer:
if len(args) < 1 {
fmt.Println("Missing peer endpoint argument")
printUsage()
os.Exit(1)
}
cfg.peerEndpoint = args[0]
err = addPeer(ctx, client, cfg.peerEndpoint)
case cmdDelPeer:
if len(args) < 1 {
fmt.Println("Missing peer endpoint argument")
printUsage()
os.Exit(1)
}
cfg.peerEndpoint = args[0]
err = removePeer(ctx, client, cfg.peerEndpoint)
case cmdSend:
parseMessageArgs(&cfg, args)
err = sendMessage(ctx, client, cfg)
case cmdReceive:
parseReceiveArgs(&cfg, args)
err = receiveMessage(ctx, client, cfg)
case cmdReply:
parseReplyArgs(&cfg, args)
err = replyToMessage(ctx, client, cfg)
case cmdStatus:
if len(args) < 1 {
fmt.Println("Missing message ID argument")
printUsage()
os.Exit(1)
}
cfg.messageID = args[0]
err = getMessageStatus(ctx, client, cfg.messageID)
case cmdRoutes:
var routeType string
if len(args) > 0 {
routeType = args[0]
}
err = listRoutes(ctx, client, routeType, cfg.outputJSON)
default:
fmt.Printf("Unknown command: %s\n", cfg.command)
printUsage()
os.Exit(1)
}
if err != nil {
fmt.Fprintf(os.Stderr, "Error: %v\n", err)
os.Exit(1)
}
}
func printUsage() {
fmt.Println("Usage: mycelium-client [flags] COMMAND [args...]")
fmt.Println("\nFlags:")
flag.PrintDefaults()
fmt.Println("\nCommands:")
fmt.Println(" info Get node information")
fmt.Println(" peers List connected peers")
fmt.Println(" add-peer ENDPOINT Add a new peer")
fmt.Println(" del-peer ENDPOINT Remove a peer")
fmt.Println(" send [--pk=PK|--ip=IP] [--topic=TOPIC] [--wait] [--reply-timeout=N] MESSAGE")
fmt.Println(" Send a message to a destination")
fmt.Println(" receive [--topic=TOPIC] [--timeout=N]")
fmt.Println(" Receive a message")
fmt.Println(" reply ID [--topic=TOPIC] MESSAGE")
fmt.Println(" Reply to a message")
fmt.Println(" status ID Get status of a sent message")
fmt.Println(" routes [selected|fallback] List routes (default: selected)")
}
func parseMessageArgs(cfg *config, args []string) {
// Create a temporary flag set
fs := flag.NewFlagSet("send", flag.ExitOnError)
fs.StringVar(&cfg.destination, "pk", "", "Destination public key (hex encoded)")
fs.StringVar(&cfg.destination, "ip", "", "Destination IP address")
fs.StringVar(&cfg.topic, "topic", "", "Message topic")
fs.BoolVar(&cfg.wait, "wait", false, "Wait for reply")
fs.IntVar(&cfg.replyTimeout, "reply-timeout", cfg.replyTimeout, "Reply timeout in seconds")
// Parse args
fs.Parse(args)
// Remaining args are the message
remainingArgs := fs.Args()
if len(remainingArgs) == 0 {
fmt.Println("Missing message content")
printUsage()
os.Exit(1)
}
cfg.message = strings.Join(remainingArgs, " ")
}
func parseReceiveArgs(cfg *config, args []string) {
// Create a temporary flag set
fs := flag.NewFlagSet("receive", flag.ExitOnError)
fs.StringVar(&cfg.topic, "topic", "", "Message topic filter")
fs.IntVar(&cfg.timeout, "timeout", 10, "Receive timeout in seconds")
// Parse args
fs.Parse(args)
}
func parseReplyArgs(cfg *config, args []string) {
if len(args) < 1 {
fmt.Println("Missing message ID argument")
printUsage()
os.Exit(1)
}
cfg.messageID = args[0]
args = args[1:]
// Create a temporary flag set
fs := flag.NewFlagSet("reply", flag.ExitOnError)
fs.StringVar(&cfg.topic, "topic", "", "Message topic")
// Parse args
fs.Parse(args)
// Remaining args are the message
remainingArgs := fs.Args()
if len(remainingArgs) == 0 {
fmt.Println("Missing reply message content")
printUsage()
os.Exit(1)
}
cfg.message = strings.Join(remainingArgs, " ")
}
func showNodeInfo(ctx context.Context, client *mycelium_client.MyceliumClient) error {
info, err := client.GetNodeInfo(ctx)
if err != nil {
return err
}
fmt.Println("Node Information:")
fmt.Printf(" Subnet: %s\n", info.NodeSubnet)
return nil
}
func listPeers(ctx context.Context, client *mycelium_client.MyceliumClient, jsonOutput bool) error {
peers, err := client.ListPeers(ctx)
if err != nil {
return err
}
if jsonOutput {
// TODO: Output JSON
fmt.Printf("Found %d peers\n", len(peers))
} else {
fmt.Printf("Connected Peers (%d):\n", len(peers))
if len(peers) == 0 {
fmt.Println(" No peers connected")
return nil
}
for i, peer := range peers {
fmt.Printf(" %d. %s://%s\n", i+1, peer.Endpoint.Proto, peer.Endpoint.SocketAddr)
fmt.Printf(" Type: %s, State: %s\n", peer.Type, peer.ConnectionState)
if peer.TxBytes > 0 || peer.RxBytes > 0 {
fmt.Printf(" TX: %d bytes, RX: %d bytes\n", peer.TxBytes, peer.RxBytes)
}
}
}
return nil
}
func addPeer(ctx context.Context, client *mycelium_client.MyceliumClient, endpoint string) error {
if err := client.AddPeer(ctx, endpoint); err != nil {
return err
}
fmt.Printf("Peer added: %s\n", endpoint)
return nil
}
func removePeer(ctx context.Context, client *mycelium_client.MyceliumClient, endpoint string) error {
if err := client.RemovePeer(ctx, endpoint); err != nil {
return err
}
fmt.Printf("Peer removed: %s\n", endpoint)
return nil
}
func sendMessage(ctx context.Context, client *mycelium_client.MyceliumClient, cfg config) error {
var dst mycelium_client.MessageDestination
if cfg.destination == "" {
return fmt.Errorf("destination is required (--pk or --ip)")
}
// Determine destination type
if strings.HasPrefix(cfg.destination, "--pk=") {
dst.PK = strings.TrimPrefix(cfg.destination, "--pk=")
} else if strings.HasPrefix(cfg.destination, "--ip=") {
dst.IP = strings.TrimPrefix(cfg.destination, "--ip=")
} else {
// Try to guess format
if strings.Contains(cfg.destination, ":") {
dst.IP = cfg.destination
} else {
dst.PK = cfg.destination
}
}
// Send message
payload := []byte(cfg.message)
reply, id, err := client.SendMessage(ctx, dst, payload, cfg.topic, cfg.wait, cfg.replyTimeout)
if err != nil {
return err
}
if reply != nil {
fmt.Println("Received reply:")
printMessage(reply)
} else {
fmt.Printf("Message sent successfully. ID: %s\n", id)
}
return nil
}
func receiveMessage(ctx context.Context, client *mycelium_client.MyceliumClient, cfg config) error {
fmt.Printf("Waiting for message (timeout: %d seconds)...\n", cfg.timeout)
msg, err := client.ReceiveMessage(ctx, cfg.timeout, cfg.topic, false)
if err != nil {
return err
}
if msg == nil {
fmt.Println("No message received within timeout")
return nil
}
fmt.Println("Message received:")
printMessage(msg)
return nil
}
func replyToMessage(ctx context.Context, client *mycelium_client.MyceliumClient, cfg config) error {
if err := client.ReplyToMessage(ctx, cfg.messageID, []byte(cfg.message), cfg.topic); err != nil {
return err
}
fmt.Printf("Reply sent to message ID: %s\n", cfg.messageID)
return nil
}
func getMessageStatus(ctx context.Context, client *mycelium_client.MyceliumClient, messageID string) error {
status, err := client.GetMessageStatus(ctx, messageID)
if err != nil {
return err
}
fmt.Printf("Message Status (ID: %s):\n", messageID)
for k, v := range status {
fmt.Printf(" %s: %v\n", k, v)
}
return nil
}
func listRoutes(ctx context.Context, client *mycelium_client.MyceliumClient, routeType string, jsonOutput bool) error {
var routes []mycelium_client.Route
var err error
// Default to selected routes
if routeType == "" || routeType == "selected" {
routes, err = client.ListSelectedRoutes(ctx)
if err != nil {
return err
}
fmt.Printf("Selected Routes (%d):\n", len(routes))
} else if routeType == "fallback" {
routes, err = client.ListFallbackRoutes(ctx)
if err != nil {
return err
}
fmt.Printf("Fallback Routes (%d):\n", len(routes))
} else {
return fmt.Errorf("unknown route type: %s (use 'selected' or 'fallback')", routeType)
}
if jsonOutput {
// TODO: Output JSON
fmt.Printf("Found %d routes\n", len(routes))
} else {
if len(routes) == 0 {
fmt.Println(" No routes found")
return nil
}
for i, route := range routes {
fmt.Printf(" %d. Subnet: %s\n", i+1, route.Subnet)
fmt.Printf(" Next Hop: %s\n", route.NextHop)
fmt.Printf(" Metric: %v, Sequence: %d\n", route.Metric, route.Seqno)
}
}
return nil
}
func printMessage(msg *mycelium_client.InboundMessage) {
payload, err := msg.Decode()
fmt.Printf(" ID: %s\n", msg.ID)
fmt.Printf(" From: %s (IP: %s)\n", msg.SrcPK, msg.SrcIP)
fmt.Printf(" To: %s (IP: %s)\n", msg.DstPK, msg.DstIP)
if msg.Topic != "" {
fmt.Printf(" Topic: %s\n", msg.Topic)
}
if err != nil {
fmt.Printf(" Payload (base64): %s\n", msg.Payload)
fmt.Printf(" Error decoding payload: %v\n", err)
} else {
fmt.Printf(" Payload: %s\n", string(payload))
}
}

View File

@@ -0,0 +1,95 @@
// pkg/mycelium_client/examples/basic_usage.go
package main
import (
"context"
"fmt"
"log"
"os"
"time"
"git.ourworld.tf/herocode/heroagent/pkg/mycelium_client"
)
func main() {
// Create a new client with default configuration (localhost:8989)
client := mycelium_client.NewClient("")
// Set a custom timeout if needed
client.SetTimeout(60 * time.Second)
// Create a context with timeout
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
// Example 1: Get node info
fmt.Println("Getting node info...")
info, err := client.GetNodeInfo(ctx)
if err != nil {
log.Printf("Failed to get node info: %v", err)
} else {
fmt.Printf("Node subnet: %s\n", info.NodeSubnet)
}
// Example 2: List peers
fmt.Println("\nListing peers...")
peers, err := client.ListPeers(ctx)
if err != nil {
log.Printf("Failed to list peers: %v", err)
} else {
fmt.Printf("Found %d peers:\n", len(peers))
for i, peer := range peers {
fmt.Printf(" %d. %s://%s (%s)\n",
i+1,
peer.Endpoint.Proto,
peer.Endpoint.SocketAddr,
peer.ConnectionState)
}
}
// Example 3: Send a message (if there are peers)
if len(os.Args) > 1 && os.Args[1] == "send" {
fmt.Println("\nSending a message...")
// In a real application, you would get this from the peer
// This is just a placeholder public key
dest := mycelium_client.MessageDestination{
PK: "bb39b4a3a4efd70f3e05e37887677e02efbda14681d0acd3882bc0f754792c32",
}
payload := []byte("Hello from mycelium client!")
topic := "exampletopic"
// Send without waiting for reply
_, msgID, err := client.SendMessage(ctx, dest, payload, topic, false, 0)
if err != nil {
log.Printf("Failed to send message: %v", err)
} else {
fmt.Printf("Message sent with ID: %s\n", msgID)
}
}
// Example 4: Receive a message (with a short timeout)
if len(os.Args) > 1 && os.Args[1] == "receive" {
fmt.Println("\nWaiting for a message (5 seconds)...")
receiveCtx, receiveCancel := context.WithTimeout(context.Background(), 10*time.Second)
defer receiveCancel()
msg, err := client.ReceiveMessage(receiveCtx, 5, "", false)
if err != nil {
log.Printf("Error receiving message: %v", err)
} else if msg == nil {
fmt.Println("No message received within timeout")
} else {
payload, err := msg.Decode()
if err != nil {
log.Printf("Failed to decode message payload: %v", err)
} else {
fmt.Printf("Received message (ID: %s):\n", msg.ID)
fmt.Printf(" From: %s\n", msg.SrcPK)
fmt.Printf(" Topic: %s\n", msg.Topic)
fmt.Printf(" Payload: %s\n", string(payload))
}
}
}
}