or: How I Learned to Stop Worrying and Love Go Channels

Prelude is an adversary simulation company, run by a few former MITRE CALDERA devs, with a mostly-open-source autonomous red-team platform aimed at disrupting a cost prohibitive security solutions market.

We’re going to look at Prelude Operator’s GoLang agent Pneuma, my fork of Pneuma called P2Pneuma that supports multiple simultaneous async network transports, optional P2P messaging, and the problems friends I made along the way. This post won’t explain the basics of Operator so if you’re new to it then start here.

Full disclosure: I use the Operator Enterprise at dayjob but I’m writing this post for funsies and 3 or 4 likes so I can show my very patient partner I’m doing numbers on twitter dot com.

Pneuma

Pneuma is Operator’s default agent, written in GoLang.

A single network transport, defined at run-time by a command switch, determines how the agent beacons to Operator. The beacon receives some tasks, executes them as shell commands or binary payloads, and returns the result back to Operator. Currently, supported protocols are; TCP, UDP, gRPC, and HTTP. The agent can use one transport at a time and while you can update the port and host, the transport protocol itself cannot be swapped out during run-time.

This is completely fine but it needs some adjustments if we’re going to implement P2P support.

Design goals

Adding P2P networking, specifically via IPFS libp2p, to a C2 framework is something I’ve been meaning to write-up for a while.

Operator doesn’t support IPFS (although someone could implement an IPFS redirector), so P2Pneuma needs to support both libp2p networking and one or more of the built-in transports, like HTTP. We’re going to use libp2p for a few things; to forward beacons between nodes (agents), to relay beacons from nodes to Operator (via TCP/HTTP/whatever transport), and maybe some stretch goals like tunneling.

The first step then is to add support for multiple network transports.

To do this, I’ll need to;

  • Use message passing to manage the beacon, agent config, and execution state across goroutines.
  • Use message passing on transport send/recv channels.
  • Decouple Pneuma’s main event loop from the transport send/recv loop(s).

Message passing

Pneuma initializes a beacon in main and mutates it on each iteration of the EventLoop as shown below. The beacon is sent to Operator via the transport’s Communicate method which is an interface to a transport, Operator responds with a set of tasks to execute, and once executed the result is passed back to the EventLoop as respBeacon to repeat the whole thing.

// https://github.com/preludeorg/pneuma/blob/2db9be726a5428f24e29a817b8376a15155c5a5c/sockets/contact.go#L16
func EventLoop(agent *util.AgentConfig, beacon util.Beacon) {
	if respBeacon, err := util.CommunicationChannels[agent.Contact].Communicate(agent, beacon); err == nil {
		refreshBeacon(agent, &respBeacon)
		util.DebugLogf("C2 refreshed. [%s] agent at PID %d.", agent.Address, agent.Pid)
		EventLoop(agent, respBeacon)
	}
}

Let’s look at each of the goroutines we’ll use to manage various parts of our program.

Operation channels

We need our transports to be asynchronous and in GoLang the idiomatic way of communicating between threads is by message passing across channels. These channels will be used by our goroutines to manage read/write operations on the beacon, config, and other data shared between threads.

The channels.go file contains the channels, the operation structs they accept, and some helper functions to act on them.

I put a lot of comments in my code so it’s easier for someone new. These code snippets are just copied from the P2Pneuma GitHub repo. They will change. You should refer to the repo if you’re reading this with an interest in contributing the project.

// p2pneuma/channels/channels.go
// Each channel is initialized once with sync.Once
var agentConfigOpChanOnce sync.Once
var AgentConfigOpsChannel chan AgentConfigOp

// We'll call this function to init our channels during main init()
func InitChannels() {
	agentConfigOpChanOnce.Do(func() {
		AgentConfigOpsChannel = make(chan AgentConfigOp)
	})
}

// Each channel accepts a operation struct that tells the goroutine what to do (read/write)
type AgentConfigOp struct {
	Type           string
	Config         *util.AgentConfig
	ResponseStatus chan bool
	ResponseConfig chan *util.AgentConfig
}

Beacon goroutine

The beacon goroutine accepts an operation message that tells it to return a copy of the beacon for a transport, to append new tasks to the beacon, remove old tasks that have been executed, or refresh values stored in the agent config.

// p2pneuma/channels/beacon.go
// InitBeaconManager goroutine manages read/write ops on the Beacon.
func InitBeaconManager() {
	// Init the Beacon from AgentConfig to set default Beacon values.
	agent := ReadAgentConfig()
	beacon := agent.BuildBeacon("tcp")
	
	for {
		op := <-BeaconOpsChannel
		switch {
		// op.Contact string is used to select the contact target (address:port) from the AgentConfig.
		case op.Type == "read":
			b := beacon
			b.Target = agent.Contact[op.Contact]
			op.ResponseBeacon <- &b
		// Calls RefreshBeacon to update the Beacon with the current AgentConfig.
		case op.Type == "refresh":
			a := ReadAgentConfig()
			util.RefreshBeacon(a, &beacon, "tcp")
			op.ResponseStatus <- true
		// Append Links to the Beacon.
		case op.Type == "append":
			for _, link := range op.Links {
				beacon.Links = append(beacon.Links, link)
			}
			op.ResponseStatus <- true
		// Removes Links that have completed execution and have been sent to Operator.
		case op.Type == "trim":
			TrimSentLinks(&beacon)
			op.ResponseStatus <- true
		default:
			util.DebugLogf("[Beacon goroutine] unknown op type: %#v\n", op)
		}
	}
}

The link cache goroutine manages the execution state of each task (in Operator, referred to as a Link) in the beacon. Since our agent may send a beacon on multiple transports, and receive multiple responses from Operator to execute the same links, we need to ensure each Link is executed only once and that we return the Link only after it has executed.

// p2pneuma/channels/linkcache.go
// InitLinkCacheManager goroutine manages read/write ops on the Link Cache.
func InitLinkCacheManager() {
	cache := make(map[string]util.CachedLink)

	for {
		// Reads instructions of specified state from cache.
		op := <-CacheOpsChannel
		switch {
		// Returns the cache.
		case op.Type == "read":
			op.ResponseLinks <- cache
		// Writes a CachedLink struct to the cache. 
		// This struct is used to track the Links execution status.
		case op.Type == "write":
			timestamp := time.Now()
			cache[op.Link] = util.CachedLink{
				State: op.State,
				Sent:  op.Sent,
				Time:  timestamp,
			}
			op.ResponseStatus <- true
        // TrimLinkCache removes old links from the link cache.
		case op.Type == "trim":
			TrimLinkCache(&cache)
		default:
		  util.DebugLogf("[LinkCache goroutine] unknown op type: %#v\n", op)
		}
	}
}

Connection goroutine

The connection goroutine manages the transports, each defined in the Connection struct, that store the transports send/recv channels.

// p2pneuma/channels/connection.go
// InitConnectionManager manages read/write ops on the Connection map.
func InitConnectionManager() {
	// Reads the AgentConfig and returns a slice of Connections for each Contact with a target.
	agent := ReadAgentConfig()
	connections := make(map[string]*util.Connection)
	util.RefreshConnections(agent, connections)

	for {
		op := <-ConnectionOpsChannel
		switch {
		// Read ops return a slice of pointers to Connections.
		case op.Type == "read":
			op.ResponseConnections <- connections
		// Close ops closes a Connection of the specified name.
		case op.Type == "close":
			connections[op.Name].Cleanup()
			op.ResponseStatus <- true
		// Refresh ops restart closed connections.
		case op.Type == "refresh":
			util.RemoveClosedConnection(connections)
			util.RefreshConnections(agent, connections)
			op.ResponseStatus <- true
		}
	}
}

AgentConfig goroutine

The AgentConfig just needs to have a reader and writer op. It stores agent configuration values, such as the sleep time, and we’ll use it to periodically refresh the beacon.

// p2pneuma/channels/agentconfig.go
// InitAgentConfigManager manages read/write ops on the AgentConfig.
func InitAgentConfigManager(agent *util.AgentConfig) {
	// Loop read/write ops on the config.
	for {
		op := <-AgentConfigOpsChannel
		switch {
		// Read ops return the AgentConfig.
		case op.Type == "read":
			op.ResponseConfig <- agent
        // Write ops modify the AgentConfig.
		case op.Type == "write":
			agent = op.Config
			op.ResponseStatus <- true
		default:
			util.DebugLogf("[AgentConfig goroutine] unknown op type: %#v\n", op)
		}
	}
}

Envelope goroutine

Inspired by Sliver’s use of envelopes to wrap messages passed to RPC handlers, P2Pneuma envelopes are used to wrap beacons as they’re passed between transports and handlers.

The P2Pneuma handlers don’t really do anything for the moment (except fall through to the default executors), but by integrating handlers now we’ll be able to support a wider range of features (rpc, tunnels, etc.) or even integrate P2Pneuma with other C2 frameworks.

This goroutine is just responsible for accepting Envelopes from transports, checking if the Links have been executed already, and sending them their handlers.

// p2pneuma/channels/envelope.go
// InitEnvelopeManager passes Envelopes to the EnvelopeHandler.
func InitEnvelopeManager(f func(envelope *util.Envelope)) {
	for {
		envelope := <-Envelopes
		cacheLinks := ReadCacheLinks()

		// Check if each Link in the Beacon is cached.
		for _, link := range envelope.Beacon.Links {
			if util.CacheContains(cacheLinks, link.ID) {
				// If a Link is complete not sent we retry execution.
				if cacheLinks[link.ID].State == "complete" {
					if cacheLinks[link.ID].Sent == false {
						e := util.BuildSingleLinkEnvelope(envelope, link)
						f(e)
					}
				}
				// Pass it to an executor if not cached.
			} else {
				e := util.BuildSingleLinkEnvelope(envelope, link)
				// Links are marked as pending prior to being handled by the executor.
				if WriteCacheLink("pending", false, link.ID) {
					f(e)
				}
			}
		}
	}
}

Decoupling transports

I like how Sliver does transports. I like it so much I stole it. The idea is to init a transport which starts a couple goroutines to manage send/recv on the transport connection, holds some data about the transport itself, has some boilerplate to clean up the connection when needed, and forwards received messages to the RPC/tunnel/whatever handlers to do the thing.

VVX7 writing questionable code. (WFH, circa 2021, colourized)

In Pneuma the transport is embedded in the EventLoop and feeds directly into the executor which blocks until all links are executed. We’re going to break that out so we can start multiple transports which are multiplexed to the envelope goroutine that passes envelopes to the handlers. With this new architecture we can keep one or more transports busy, say passing messages to peers, while our executors do the work assigned by the C2.

Let’s look at the P2Pneuma TCP transport. Each transport follows this patten including the P2P stuff.

// p2pneuma/sockets/rawtcp.go
type TCP struct{}

func init() {
	util.CommunicationChannels["tcp"] = TCP{}
}

func (contact TCP) Communicate(agent *util.AgentConfig, name string) (*util.Connection, error) {
	// Dial a TCP connection.
	conn, err := net.Dial("tcp", agent.Contact["tcp"])
	if err != nil {
		util.DebugLogf("[-] %s is either unavailable or a firewall is blocking traffic.", agent.Contact["tcp"])
		return nil, err
	}

	// Create the Envelope Send/Recv channels.
	send := make(chan *util.Envelope)
	recv := make(chan *util.Envelope)

	// Create the Connection to be returned to the caller.
	// This Connection is stored in the Connection map and checked if it needs to be refreshed on each beacon.
	connection := &util.Connection{
		Name:   name,
		Type:   "tcp",
		Send:   send,
		Recv:   recv,
		IsOpen: true,
		Cleanup: func() {
			util.DebugLogf("[tcp] cleaning up connection.")
			close(send)
			conn.Close()
			close(recv)
		},
	}

	// Write socket goroutine reads from the connection Send chan and writes to the socket.
	go func() {
		defer connection.Cleanup()
		for {
			envelope := <-send
			bufferedSend(conn, *envelope.Beacon)
			channels.UpdateSentLinks(envelope)
		}
	}()

	// Read socket goroutine reads from the socket and writes to the connection Recv chan.
	go func() {
		defer connection.Cleanup()
		for {
			beacon, err := tcpRead(conn)
			if err != nil {
				break
			}

			if beacon != nil {
				envelope := util.BuildEnvelope(beacon, connection)
				recv <- envelope
			}
			util.JitterSleep(agent.CommandJitter, "SILENT")
		}
	}()

    // Sends the envelopes from the recv chan to the envelope channel.
	go func() {
		defer connection.Cleanup()
		for {
			env := <-recv
			channels.Envelopes <- env
		}
	}()

	return connection, nil
}

Transports

We saw that the transports are multiplexed to an envelope handler but it’s also important to understand how the beacon is shared between transports, because it’s quite different than Pneuma.

With Pneuma, we just recursively mutate a single beacon as the event loop go brrr.

With P2Pneuma, we make a copy of the latest beacon and send it out on all available transports. The responses and the resulting execution results, links, are then appended to the shared beacon in beacon goroutine to be copied and sent on the next event loop iteration.

Notably, we don’t need to know or care if a particular transport is successful so long as at least one transport succeeds.

// p2pneuma/main.go
// Set up the channel operation managers.
go channels.InitAgentConfigManager(agent)
go channels.InitBeaconManager()
go channels.InitEnvelopeManager(sockets.EnvelopeHandler)
go channels.InitLinkCacheManager()
go channels.InitConnectionManager()

for {
    // Wait for each contact's EnvelopeForwarder to complete before sending the next beacon.
    var wg sync.WaitGroup

    // Refresh contacts as they may be updated or removed.
    a := channels.ReadAgentConfig()

    // Refresh the connections to avoid spawning an event loop for closed connections.
    _ = channels.RefreshConnections()
    connections := channels.ReadConnections()

    // Read from Beacon chan to send current data.
    beacon := channels.ReadBeacon()

    // Send the beacon out on each transport.
    for _, conn := range connections {
        wg.Add(1)

        // Copy and modify the beacon to include the correct transport target (address:port).
        b := beacon
        b.Target = a.Contact[conn.Type]

        // Construct the Envelope that holds the Beacon and this Connection.
        envelope := util.BuildEnvelope(beacon, conn)

        // EnvelopeForwarder passes the beacon to the connection send channel.
        go util.EnvelopeForwarder(conn, envelope, &wg)
    }
    wg.Wait()

    // Sleep before sending out the next beacon.
    util.JitterSleep(a.Sleep, "JITTER")
    
    // Remove Links that were executed and sent to Operator.
    channels.TrimBeaconLinks()
}

Friends I made along the way

Wow. Fuck Go channels.

I’ve heard that Sandcat, the CALDARA GoLang agent, uses channels. Now I understand now why Pneuma does not.

I want my code to be simple and easy to understand. Using channels for everything, instead using them sparingly and otherwise a mutex or atomic, seems to have complicated the agent without clear advantages.

The ability to use multiple channels is pretty cool in theory but there are considerations that may preclude their use in an engagement; multiple channels are noisy; failed connections on each beacon look sus; system resource usage; stuff someone smarter than me will inevitably point out as obviously borked.

There’s also the issue that multiple control channels could result is unexpected behaviours (if you do something silly like connect a single agent to multiple Operator instances). Maybe that’s a feature.

Fin

Thanks for reading! Next time we’ll look at IPFS integration in P2Pneuma and how we can augment a C2 framework with P2P messaging and decentralized listening posts.