Skip to content
Merged
Show file tree
Hide file tree
Changes from 11 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
139 changes: 79 additions & 60 deletions dissect/cobaltstrike/beacon.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@
import hashlib
import io
import ipaddress
import itertools
import logging
import sys
import time
Expand All @@ -36,8 +35,10 @@

from dissect import cstruct
from dissect.cobaltstrike import pe
from dissect.cobaltstrike.guardrails import GuardrailMetadata, iter_guardrail_configs_with_beacon
from dissect.cobaltstrike.utils import (
catch_sigpipe,
grouper,
iter_find_needle,
p8,
u16be,
Expand Down Expand Up @@ -464,13 +465,6 @@ def iter_settings(fobj: Union[bytes, BinaryIO]) -> Iterator["Setting"]:
yield setting


def grouper(iterable, n, fillvalue=None):
"Collect data into fixed-length chunks or blocks"
# grouper('ABCDEFG', 3, 'x') --> ABC DEF Gxx"
args = [iter(iterable)] * n
return itertools.zip_longest(*args, fillvalue=fillvalue)


def parse_recover_binary(program: bytes) -> List[Tuple[str, Union[int, bool]]]:
"""Parse ``SETTING_C2_RECOVER`` (`.http-get.server.output`) data"""
rsteps: List[Tuple[str, Union[int, bool]]] = []
Expand Down Expand Up @@ -773,6 +767,8 @@ def __init__(self, config_block: bytes) -> None:
""" PE compile timestamp, ``None`` if unknown. """
self.architecture: Optional[str] = None
""" PE architecture, ``"x86"`` or ``"x64"`` and ``None`` if unknown. """
self.guardrails: Optional[GuardrailMetadata] = None
""" Guardrails metadata, ``None`` if not available. """

# Used for caching
self._settings: Optional[Mapping[str, Any]] = None
Expand Down Expand Up @@ -809,6 +805,22 @@ def from_file(cls, fobj: BinaryIO, xor_keys: List[bytes] = None, all_xor_keys: b
bconfig.architecture = pe.find_architecture(fh)
# Return the first found beacon config.
return bconfig

# Try finding Beacon config protected with Guardrails
try:
fxor = XorEncodedFile.from_file(fobj)
except ValueError:
fxor = fobj
for grconfig in iter_guardrail_configs_with_beacon(fxor):
if not grconfig.unmasked_beacon_config:
continue
bconfig = cls(grconfig.unmasked_beacon_config)
bconfig.guardrails = grconfig
bconfig.xorkey = grconfig.beacon_xor_key
bconfig.pe_compile_stamp, bconfig.pe_export_stamp = pe.find_compile_stamps(fxor)
bconfig.architecture = pe.find_architecture(fxor)
return bconfig

raise ValueError("No valid Beacon configuration found")

@classmethod
Expand Down Expand Up @@ -1108,18 +1120,17 @@ def build_parser():
import argparse

parser = argparse.ArgumentParser(formatter_class=argparse.ArgumentDefaultsHelpFormatter)
parser.add_argument("input", metavar="FILE", help="Beacon to dump")
parser.add_argument("input", metavar="FILE", nargs="+", help="Beacon to dump")
parser.add_argument(
"-x",
"--xorkey",
action="append",
help="override default xor key(s) (default: -x 0x69 -x 0x2e -x 0x00)",
)
parser.add_argument(
"-a",
"--all",
"--default-xor-keys-only",
action="store_true",
help="try all other single byte xor keys when default ones fail",
help="only try the default xor keys instead of all possible ones",
)
parser.add_argument(
"-t",
Expand Down Expand Up @@ -1150,63 +1161,71 @@ def main():
level = levels[min(len(levels) - 1, args.verbose)]
logging.basicConfig(
level=level,
datefmt="[%X]",
format="%(asctime)s %(name)s %(message)s",
format="%(asctime)s %(levelname)s %(name)s: %(message)s",
)

xor_keys = None
if args.xorkey:
xor_keys = tuple(utils.pack_be(int(x, 0)) for x in args.xorkey)

try:
if args.input in ("-", "/dev/stdin"):
with io.BytesIO(sys.stdin.buffer.read()) as fin:
config = BeaconConfig.from_file(fin, xor_keys=xor_keys, all_xor_keys=args.all)
else:
config = BeaconConfig.from_path(args.input, xor_keys=xor_keys, all_xor_keys=args.all)
except ValueError:
print(f"{args.input}: No beacon configuration found.", file=sys.stderr)
return 1

if args.type == "raw":
for setting in config.settings_tuple:
print(setting)
elif args.type == "dumpstruct":
cstruct.hexdump(config.config_block)
print("-----")
for setting in config.settings_tuple:
cstruct.dumpstruct(setting)
print("-" * 10)
elif args.type == "normal":
settings = config.settings
for setting, value in settings.items():
print(f"{setting} = {value!r}")
if args.verbose >= 1:
print("-" * 50)
print(
"pe_export_stamp = {}, {}, {}".format(
config.pe_export_stamp,
hex(config.pe_export_stamp),
time.ctime(config.pe_export_stamp),
dumped = False
for fname in args.input:
logging.info("Processing: %r", fname)
try:
if fname in ("-", "/dev/stdin"):
with io.BytesIO(sys.stdin.buffer.read()) as fin:
config = BeaconConfig.from_file(fin, xor_keys=xor_keys, all_xor_keys=not args.default_xor_keys_only)
else:
config = BeaconConfig.from_path(fname, xor_keys=xor_keys, all_xor_keys=not args.default_xor_keys_only)
except ValueError:
print(f"{fname}: No beacon configuration found.", file=sys.stderr)
continue

dumped = True
if args.type == "raw":
for setting in config.settings_tuple:
print(setting)
elif args.type == "dumpstruct":
cstruct.hexdump(config.config_block)
print("-----")
for setting in config.settings_tuple:
cstruct.dumpstruct(setting)
print("-" * 10)
elif args.type == "normal":
settings = config.settings
for setting, value in settings.items():
print(f"{setting} = {value!r}")
if args.verbose >= 1:
print("-" * 50)
print(
"pe_export_stamp = {}, {}, {}".format(
config.pe_export_stamp,
hex(config.pe_export_stamp),
time.ctime(config.pe_export_stamp),
)
)
)
print(
"pe_compile_stamp = {}, {}, {}".format(
config.pe_compile_stamp,
hex(config.pe_compile_stamp),
time.ctime(config.pe_compile_stamp),
print(
"pe_compile_stamp = {}, {}, {}".format(
config.pe_compile_stamp,
hex(config.pe_compile_stamp),
time.ctime(config.pe_compile_stamp),
)
)
)
print(
"max_setting_enum = {} - {}".format(
config.max_setting_enum,
BeaconSetting(config.max_setting_enum),
print(
"max_setting_enum = {} - {}".format(
config.max_setting_enum,
BeaconSetting(config.max_setting_enum),
)
)
)
print("beacon_version =", config.version)
elif args.type == "c2profile":
profile = c2profile.C2Profile.from_beacon_config(config)
print(profile.as_text())
print("beacon_version =", config.version)
if config.guardrails:
print("guardrail payload xor key =", config.guardrails.payload_xor_key)
print("guardrail options =", [s.option for s in config.guardrails.settings])
elif args.type == "c2profile":
profile = c2profile.C2Profile.from_beacon_config(config)
print(profile.as_text())

return 0 if dumped else 1


if __name__ == "__main__":
Expand Down
Loading
Loading