From d3864555b7579ef8be08939bcb1db88bea4e5b33 Mon Sep 17 00:00:00 2001 From: Kamil Mankowski Date: Thu, 27 Jun 2024 16:34:57 +0200 Subject: [PATCH 01/23] ENH: Add possibility to delay generating MISP Feed Generating MISP feed on every incoming message slows down processing. The new config option let us decide to save them in batches. Cached events are stored in a cache list in Redis. In addition, a code related to Python 3.6 was removed as we do not support this version any more. --- intelmq/bots/outputs/misp/output_feed.py | 106 +++++++++++------- intelmq/lib/bot.py | 4 + intelmq/lib/mixins/cache.py | 18 ++- .../bots/outputs/misp/test_output_feed.py | 54 ++++++++- 4 files changed, 135 insertions(+), 47 deletions(-) diff --git a/intelmq/bots/outputs/misp/output_feed.py b/intelmq/bots/outputs/misp/output_feed.py index d9d34a8571..ce754578b2 100644 --- a/intelmq/bots/outputs/misp/output_feed.py +++ b/intelmq/bots/outputs/misp/output_feed.py @@ -5,13 +5,14 @@ # -*- coding: utf-8 -*- import datetime import json +import re from pathlib import Path from uuid import uuid4 -import re from intelmq import VAR_STATE_PATH from intelmq.lib.bot import OutputBot from intelmq.lib.exceptions import MissingDependencyError +from intelmq.lib.mixins import CacheMixin from intelmq.lib.utils import parse_relative try: @@ -20,19 +21,14 @@ except ImportError: # catching SyntaxError because of https://github.com/MISP/PyMISP/issues/501 MISPEvent = None - import_fail_reason = 'import' -except SyntaxError: - # catching SyntaxError because of https://github.com/MISP/PyMISP/issues/501 - MISPEvent = None - import_fail_reason = 'syntax' - + import_fail_reason = "import" -# NOTE: This module is compatible with Python 3.6+ - -class MISPFeedOutputBot(OutputBot): +class MISPFeedOutputBot(OutputBot, CacheMixin): """Generate an output in the MISP Feed format""" + interval_event: str = "1 hour" + delay_save_event_count: int = None misp_org_name = None misp_org_uuid = None output_dir: str = 
f"{VAR_STATE_PATH}mispfeed-output" # TODO: should be path @@ -46,13 +42,8 @@ def check_output_dir(dirname): return True def init(self): - if MISPEvent is None and import_fail_reason == 'syntax': - raise MissingDependencyError("pymisp", - version='>=2.4.117.3', - additional_text="Python versions below 3.6 are " - "only supported by pymisp <= 2.4.119.1.") - elif MISPEvent is None: - raise MissingDependencyError('pymisp', version='>=2.4.117.3') + if MISPEvent is None: + raise MissingDependencyError("pymisp", version=">=2.4.117.3") self.current_event = None @@ -72,59 +63,90 @@ def init(self): try: with (self.output_dir / '.current').open() as f: self.current_file = Path(f.read()) - self.current_event = MISPEvent() - self.current_event.load_file(self.current_file) - - last_min_time, last_max_time = re.findall('IntelMQ event (.*) - (.*)', self.current_event.info)[0] - last_min_time = datetime.datetime.strptime(last_min_time, '%Y-%m-%dT%H:%M:%S.%f') - last_max_time = datetime.datetime.strptime(last_max_time, '%Y-%m-%dT%H:%M:%S.%f') - if last_max_time < datetime.datetime.now(): - self.min_time_current = datetime.datetime.now() - self.max_time_current = self.min_time_current + self.timedelta - self.current_event = None - else: - self.min_time_current = last_min_time - self.max_time_current = last_max_time + + if self.current_file.exists(): + self.current_event = MISPEvent() + self.current_event.load_file(self.current_file) + + last_min_time, last_max_time = re.findall( + "IntelMQ event (.*) - (.*)", self.current_event.info + )[0] + last_min_time = datetime.datetime.strptime( + last_min_time, "%Y-%m-%dT%H:%M:%S.%f" + ) + last_max_time = datetime.datetime.strptime( + last_max_time, "%Y-%m-%dT%H:%M:%S.%f" + ) + if last_max_time < datetime.datetime.now(): + self.min_time_current = datetime.datetime.now() + self.max_time_current = self.min_time_current + self.timedelta + self.current_event = None + else: + self.min_time_current = last_min_time + self.max_time_current = 
last_max_time except: - self.logger.exception("Loading current event %s failed. Skipping it.", self.current_event) + self.logger.exception( + "Loading current event %s failed. Skipping it.", self.current_event + ) self.current_event = None else: self.min_time_current = datetime.datetime.now() self.max_time_current = self.min_time_current + self.timedelta def process(self): - if not self.current_event or datetime.datetime.now() > self.max_time_current: self.min_time_current = datetime.datetime.now() self.max_time_current = self.min_time_current + self.timedelta self.current_event = MISPEvent() - self.current_event.info = ('IntelMQ event {begin} - {end}' - ''.format(begin=self.min_time_current.isoformat(), - end=self.max_time_current.isoformat())) + self.current_event.info = "IntelMQ event {begin} - {end}" "".format( + begin=self.min_time_current.isoformat(), + end=self.max_time_current.isoformat(), + ) self.current_event.set_date(datetime.date.today()) self.current_event.Orgc = self.misp_org self.current_event.uuid = str(uuid4()) - self.current_file = self.output_dir / f'{self.current_event.uuid}.json' - with (self.output_dir / '.current').open('w') as f: + self.current_file = self.output_dir / f"{self.current_event.uuid}.json" + with (self.output_dir / ".current").open("w") as f: f.write(str(self.current_file)) + # On startup or when timeout occurs, clean the queue to ensure we do not + # keep events forever because there was not enough generated + self._generate_feed() + event = self.receive_message().to_dict(jsondict_as_string=True) - obj = self.current_event.add_object(name='intelmq_event') - for object_relation, value in event.items(): + cache_size = None + if self.delay_save_event_count: + cache_size = self.cache_put(event) + + if cache_size is None: + self._generate_feed(event) + elif cache_size >= self.delay_save_event_count: + self._generate_feed() + + self.acknowledge_message() + + def _add_message_to_feed(self, message: dict): + obj = 
self.current_event.add_object(name="intelmq_event") + for object_relation, value in message.items(): try: obj.add_attribute(object_relation, value=value) except NewAttributeError: # This entry isn't listed in the harmonization file, ignoring. pass - feed_output = self.current_event.to_feed(with_meta=False) + def _generate_feed(self, message: dict = None): + if message: + self._add_message_to_feed(message) + + while message := self.cache_pop(): + self._add_message_to_feed(message) - with self.current_file.open('w') as f: + feed_output = self.current_event.to_feed(with_meta=False) + with self.current_file.open("w") as f: json.dump(feed_output, f) feed_meta_generator(self.output_dir) - self.acknowledge_message() @staticmethod def check(parameters): diff --git a/intelmq/lib/bot.py b/intelmq/lib/bot.py index 696d5424e4..7c62502112 100644 --- a/intelmq/lib/bot.py +++ b/intelmq/lib/bot.py @@ -279,6 +279,10 @@ def catch_shutdown(): def harmonization(self): return self._harmonization + @property + def bot_id(self): + return self.__bot_id_full + def __handle_sigterm_signal(self, signum: int, stack: Optional[object]): """ Calls when a SIGTERM is received. Stops the bot. diff --git a/intelmq/lib/mixins/cache.py b/intelmq/lib/mixins/cache.py index 3cf5365023..9565175402 100644 --- a/intelmq/lib/mixins/cache.py +++ b/intelmq/lib/mixins/cache.py @@ -1,4 +1,4 @@ -""" CacheMixin for IntelMQ +"""CacheMixin for IntelMQ SPDX-FileCopyrightText: 2021 Sebastian Waldbauer SPDX-License-Identifier: AGPL-3.0-or-later @@ -6,6 +6,7 @@ CacheMixin is used for caching/storing data in redis. 
""" +import json from typing import Any, Optional import redis import intelmq.lib.utils as utils @@ -31,7 +32,9 @@ def __init__(self, **kwargs): "socket_timeout": 5, } - self.__redis = redis.Redis(db=self.redis_cache_db, password=self.redis_cache_password, **kwargs) + self.__redis = redis.Redis( + db=self.redis_cache_db, password=self.redis_cache_password, **kwargs + ) super().__init__() def cache_exists(self, key: str): @@ -51,6 +54,17 @@ def cache_set(self, key: str, value: Any, ttl: Optional[int] = None): if self.redis_cache_ttl: self.__redis.expire(key, self.redis_cache_ttl) + def cache_put(self, value: dict) -> int: + # Returns the length of the list after pushing + size = self.__redis.lpush(self.bot_id, json.dumps(value)) + return size + + def cache_pop(self) -> dict: + data = self.__redis.rpop(self.bot_id) + if data is None: + return None + return json.loads(data) + def cache_flush(self): """ Flushes the currently opened database by calling FLUSHDB. diff --git a/intelmq/tests/bots/outputs/misp/test_output_feed.py b/intelmq/tests/bots/outputs/misp/test_output_feed.py index 783f2bfa94..1627e29c4c 100644 --- a/intelmq/tests/bots/outputs/misp/test_output_feed.py +++ b/intelmq/tests/bots/outputs/misp/test_output_feed.py @@ -3,8 +3,9 @@ # SPDX-License-Identifier: AGPL-3.0-or-later # -*- coding: utf-8 -*- +import json import unittest -import sys +from pathlib import Path from tempfile import TemporaryDirectory import intelmq.lib.test as test @@ -37,9 +38,9 @@ @test.skip_exotic() class TestMISPFeedOutputBot(test.BotTestCase, unittest.TestCase): - @classmethod def set_bot(cls): + cls.use_cache = True cls.bot_reference = MISPFeedOutputBot cls.default_input_message = EXAMPLE_EVENT cls.directory = TemporaryDirectory() @@ -51,10 +52,57 @@ def set_bot(cls): def test_event(self): self.run_bot() + current_event = open(f"{self.directory.name}/.current").read() + with open(current_event) as f: + objects = json.load(f).get("Event", {}).get("Object", []) + assert len(objects) 
== 1 + + def test_accumulating_events(self): + self.input_message = [EXAMPLE_EVENT, EXAMPLE_EVENT] + self.run_bot(iterations=2, parameters={"delay_save_event_count": 3}) + + current_event = open(f"{self.directory.name}/.current").read() + + # First, the feed is empty - not enough events came + with open(current_event) as f: + objects = json.load(f).get("Event", {}).get("Object", []) + assert len(objects) == 0 + + self.input_message = [EXAMPLE_EVENT] + self.run_bot(parameters={"delay_save_event_count": 3}) + + # When enough events were collected, save them + with open(current_event) as f: + objects = json.load(f)["Event"]["Object"] + assert len(objects) == 3 + + self.input_message = [EXAMPLE_EVENT, EXAMPLE_EVENT, EXAMPLE_EVENT] + self.run_bot(iterations=3, parameters={"delay_save_event_count": 3}) + + # We continue saving to the same file until interval timeout + with open(current_event) as f: + objects = json.load(f)["Event"]["Object"] + assert len(objects) == 6 + + # Simulating leftovers in the queue when it's time to generate new event + Path(f"{self.directory.name}/.current").unlink() + self.bot.cache_put(EXAMPLE_EVENT) + self.run_bot(parameters={"delay_save_event_count": 3}) + + new_event = open(f"{self.directory.name}/.current").read() + with open(new_event) as f: + objects = json.load(f)["Event"]["Object"] + assert len(objects) == 1 + + + def tearDown(self): + self.cache.delete(self.bot_id) + super().tearDown() + @classmethod def tearDownClass(cls): cls.directory.cleanup() -if __name__ == '__main__': # pragma: no cover +if __name__ == "__main__": # pragma: no cover unittest.main() From d67d6f3dd46c12764f5b750d2c85b46250b142ad Mon Sep 17 00:00:00 2001 From: Kamil Mankowski Date: Wed, 3 Jul 2024 15:46:59 +0200 Subject: [PATCH 02/23] Add documentation. 
Fix code compatibility --- CHANGELOG.md | 4 +- docs/user/bots.md | 7 ++ intelmq/bots/outputs/misp/output_feed.py | 10 ++- intelmq/lib/mixins/cache.py | 11 +++ .../bots/outputs/misp/test_output_feed.py | 76 ++++++++++--------- 5 files changed, 66 insertions(+), 42 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index f646eb3bf6..4675c7c099 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -219,7 +219,9 @@ This is short list of the most important known issues. The full list can be retr - Treat value `false` for parameter `filter_regex` as false (PR#2499 by Sebastian Wagner). #### Outputs -- `intelmq.bots.outputs.misp.output_feed`: Handle failures if saved current event wasn't saved or is incorrect (PR by Kamil Mankowski). +- `intelmq.bots.outputs.misp.output_feed`: + - Handle failures if saved current event wasn't saved or is incorrect (PR by Kamil Mankowski). + - Allow saving messages in bulks instead of refreshing the feed immediately (PR#2505 by Kamil Mankowski). - `intelmq.bots.outputs.smtp_batch.output`: Documentation on multiple recipients added (PR#2501 by Edvard Rejthar). ### Documentation diff --git a/docs/user/bots.md b/docs/user/bots.md index 8b1b956419..67baae678a 100644 --- a/docs/user/bots.md +++ b/docs/user/bots.md @@ -4760,6 +4760,13 @@ The PyMISP library >= 2.4.119.1 is required, see () The output bot creates one event per each interval, all data in this time frame is part of this event. Default "1 hour", string. +**`bulk_save_count`** + +(optional, int) If set to a non-0 value, the bot won't refresh the MISP feed immeadiately, but will cache +incoming messages until the given number of them. Use it if your bot proceeds a high number of messages +and constant saving to the disk is a problem. Reloading or restarting bot as well as generating +a new MISP event based on `interval_event` triggers saving regardless of the cache size. 
+ **Usage in MISP** Configure the destination directory of this feed as feed in MISP, either as local location, or served via a web server. diff --git a/intelmq/bots/outputs/misp/output_feed.py b/intelmq/bots/outputs/misp/output_feed.py index ce754578b2..1e090011fe 100644 --- a/intelmq/bots/outputs/misp/output_feed.py +++ b/intelmq/bots/outputs/misp/output_feed.py @@ -28,7 +28,7 @@ class MISPFeedOutputBot(OutputBot, CacheMixin): """Generate an output in the MISP Feed format""" interval_event: str = "1 hour" - delay_save_event_count: int = None + bulk_save_count: int = None misp_org_name = None misp_org_uuid = None output_dir: str = f"{VAR_STATE_PATH}mispfeed-output" # TODO: should be path @@ -116,12 +116,12 @@ def process(self): event = self.receive_message().to_dict(jsondict_as_string=True) cache_size = None - if self.delay_save_event_count: + if self.bulk_save_count: cache_size = self.cache_put(event) if cache_size is None: self._generate_feed(event) - elif cache_size >= self.delay_save_event_count: + elif cache_size >= self.bulk_save_count: self._generate_feed() self.acknowledge_message() @@ -139,8 +139,10 @@ def _generate_feed(self, message: dict = None): if message: self._add_message_to_feed(message) - while message := self.cache_pop(): + message = self.cache_pop() + while message: self._add_message_to_feed(message) + message = self.cache_pop() feed_output = self.current_event.to_feed(with_meta=False) with self.current_file.open("w") as f: diff --git a/intelmq/lib/mixins/cache.py b/intelmq/lib/mixins/cache.py index 9565175402..ee945fbb53 100644 --- a/intelmq/lib/mixins/cache.py +++ b/intelmq/lib/mixins/cache.py @@ -13,6 +13,17 @@ class CacheMixin: + """Provides caching possibilities for bots + + For key-value cache, use methods: + cache_exists + cache_get + cache_set + + To store dict elements in a cache queue named after bot id, use methods: + cache_put + cache_pop + """ __redis: redis.Redis = None redis_cache_host: str = "127.0.0.1" redis_cache_port: int = 
6379 diff --git a/intelmq/tests/bots/outputs/misp/test_output_feed.py b/intelmq/tests/bots/outputs/misp/test_output_feed.py index 1627e29c4c..631b7b7bd4 100644 --- a/intelmq/tests/bots/outputs/misp/test_output_feed.py +++ b/intelmq/tests/bots/outputs/misp/test_output_feed.py @@ -11,29 +11,30 @@ import intelmq.lib.test as test from intelmq.bots.outputs.misp.output_feed import MISPFeedOutputBot -EXAMPLE_EVENT = {"classification.type": "infected-system", - "destination.port": 9796, - "feed.accuracy": 100.0, - "destination.ip": "52.18.196.169", - "malware.name": "salityp2p", - "event_description.text": "Sinkhole attempted connection", - "time.source": "2016-04-19T23:16:08+00:00", - "source.ip": "152.166.119.2", - "feed.url": "http://alerts.bitsighttech.com:8080/stream?", - "source.geolocation.country": "Dominican Republic", - "time.observation": "2016-04-19T23:16:08+00:00", - "source.port": 65118, - "__type": "Event", - "feed.name": "BitSight", - "extra.non_ascii": "ççãããã\x80\ua000 \164 \x80\x80 abcd \165\166", - "raw": "eyJ0cm9qYW5mYW1pbHkiOiJTYWxpdHlwMnAiLCJlbnYiOnsic" - "mVtb3RlX2FkZHIiOiIxNTIuMTY2LjExOS4yIiwicmVtb3RlX3" - "BvcnQiOiI2NTExOCIsInNlcnZlcl9hZGRyIjoiNTIuMTguMTk" - "2LjE2OSIsInNlcnZlcl9wb3J0IjoiOTc5NiJ9LCJfdHMiOjE0" - "NjExMDc3NjgsIl9nZW9fZW52X3JlbW90ZV9hZGRyIjp7ImNvd" - "W50cnlfbmFtZSI6IkRvbWluaWNhbiBSZXB1YmxpYyJ9fQ==", - "__type": "Event", - } +EXAMPLE_EVENT = { + "classification.type": "infected-system", + "destination.port": 9796, + "feed.accuracy": 100.0, + "destination.ip": "52.18.196.169", + "malware.name": "salityp2p", + "event_description.text": "Sinkhole attempted connection", + "time.source": "2016-04-19T23:16:08+00:00", + "source.ip": "152.166.119.2", + "feed.url": "http://alerts.bitsighttech.com:8080/stream?", + "source.geolocation.country": "Dominican Republic", + "time.observation": "2016-04-19T23:16:08+00:00", + "source.port": 65118, + "__type": "Event", + "feed.name": "BitSight", + "extra.non_ascii": "ççãããã\x80\ua000 \164 \x80\x80 abcd 
\165\166", + "raw": "eyJ0cm9qYW5mYW1pbHkiOiJTYWxpdHlwMnAiLCJlbnYiOnsic" + "mVtb3RlX2FkZHIiOiIxNTIuMTY2LjExOS4yIiwicmVtb3RlX3" + "BvcnQiOiI2NTExOCIsInNlcnZlcl9hZGRyIjoiNTIuMTguMTk" + "2LjE2OSIsInNlcnZlcl9wb3J0IjoiOTc5NiJ9LCJfdHMiOjE0" + "NjExMDc3NjgsIl9nZW9fZW52X3JlbW90ZV9hZGRyIjp7ImNvd" + "W50cnlfbmFtZSI6IkRvbWluaWNhbiBSZXB1YmxpYyJ9fQ==", + "__type": "Event", +} @test.skip_exotic() @@ -43,11 +44,16 @@ def set_bot(cls): cls.use_cache = True cls.bot_reference = MISPFeedOutputBot cls.default_input_message = EXAMPLE_EVENT - cls.directory = TemporaryDirectory() - cls.sysconfig = {"misp_org_name": 'IntelMQTestOrg', - "misp_org_uuid": "b89da4c2-0f74-11ea-96a1-6fa873a0eb4d", - "output_dir": cls.directory.name, - "interval_event": '1 hour'} + cls.sysconfig = { + "misp_org_name": "IntelMQTestOrg", + "misp_org_uuid": "b89da4c2-0f74-11ea-96a1-6fa873a0eb4d", + "interval_event": "1 hour", + } + + def setUp(self) -> None: + super().setUp() + self.directory = TemporaryDirectory() + self.sysconfig["output_dir"] = self.directory.name def test_event(self): self.run_bot() @@ -59,7 +65,7 @@ def test_event(self): def test_accumulating_events(self): self.input_message = [EXAMPLE_EVENT, EXAMPLE_EVENT] - self.run_bot(iterations=2, parameters={"delay_save_event_count": 3}) + self.run_bot(iterations=2, parameters={"bulk_save_count": 3}) current_event = open(f"{self.directory.name}/.current").read() @@ -69,7 +75,7 @@ def test_accumulating_events(self): assert len(objects) == 0 self.input_message = [EXAMPLE_EVENT] - self.run_bot(parameters={"delay_save_event_count": 3}) + self.run_bot(parameters={"bulk_save_count": 3}) # When enough events were collected, save them with open(current_event) as f: @@ -77,7 +83,7 @@ def test_accumulating_events(self): assert len(objects) == 3 self.input_message = [EXAMPLE_EVENT, EXAMPLE_EVENT, EXAMPLE_EVENT] - self.run_bot(iterations=3, parameters={"delay_save_event_count": 3}) + self.run_bot(iterations=3, parameters={"bulk_save_count": 3}) # We continue saving 
to the same file until interval timeout with open(current_event) as f: @@ -87,22 +93,18 @@ def test_accumulating_events(self): # Simulating leftovers in the queue when it's time to generate new event Path(f"{self.directory.name}/.current").unlink() self.bot.cache_put(EXAMPLE_EVENT) - self.run_bot(parameters={"delay_save_event_count": 3}) + self.run_bot(parameters={"bulk_save_count": 3}) new_event = open(f"{self.directory.name}/.current").read() with open(new_event) as f: objects = json.load(f)["Event"]["Object"] assert len(objects) == 1 - def tearDown(self): self.cache.delete(self.bot_id) + self.directory.cleanup() super().tearDown() - @classmethod - def tearDownClass(cls): - cls.directory.cleanup() - if __name__ == "__main__": # pragma: no cover unittest.main() From 920a07d1750134a9f6e88bfc843b3e97a53ea105 Mon Sep 17 00:00:00 2001 From: Kamil Mankowski Date: Wed, 3 Jul 2024 15:51:30 +0200 Subject: [PATCH 03/23] Fix spelling --- docs/user/bots.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/user/bots.md b/docs/user/bots.md index 67baae678a..2a4aeeed37 100644 --- a/docs/user/bots.md +++ b/docs/user/bots.md @@ -4762,7 +4762,7 @@ hour", string. **`bulk_save_count`** -(optional, int) If set to a non-0 value, the bot won't refresh the MISP feed immeadiately, but will cache +(optional, int) If set to a non-0 value, the bot won't refresh the MISP feed immediately, but will cache incoming messages until the given number of them. Use it if your bot proceeds a high number of messages and constant saving to the disk is a problem. Reloading or restarting bot as well as generating a new MISP event based on `interval_event` triggers saving regardless of the cache size. 
From 59c3014211a5ef7e8516e996ae1703faf9069f05 Mon Sep 17 00:00:00 2001 From: Kamil Mankowski Date: Thu, 4 Jul 2024 13:32:59 +0200 Subject: [PATCH 04/23] ENH: Add attribute mapping The bot can now construct an event much more aligned to custom needs, allowing setting comments and selecting just a subset of fields to export --- CHANGELOG.md | 1 + docs/user/bots.md | 25 +++++++ intelmq/bots/outputs/misp/output_feed.py | 74 ++++++++++++++++--- .../bots/outputs/misp/test_output_feed.py | 47 +++++++++++- 4 files changed, 135 insertions(+), 12 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 4675c7c099..6d1da18dcb 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -222,6 +222,7 @@ This is short list of the most important known issues. The full list can be retr - `intelmq.bots.outputs.misp.output_feed`: - Handle failures if saved current event wasn't saved or is incorrect (PR by Kamil Mankowski). - Allow saving messages in bulks instead of refreshing the feed immediately (PR#2505 by Kamil Mankowski). + - Add `attribute_mapping` parameter to allow selecting a subset of event attributes as well as additional attribute parameters (PR by Kamil Mankowski). - `intelmq.bots.outputs.smtp_batch.output`: Documentation on multiple recipients added (PR#2501 by Edvard Rejthar). ### Documentation diff --git a/docs/user/bots.md b/docs/user/bots.md index 2a4aeeed37..206573501e 100644 --- a/docs/user/bots.md +++ b/docs/user/bots.md @@ -4767,6 +4767,31 @@ incoming messages until the given number of them. Use it if your bot proceeds a and constant saving to the disk is a problem. Reloading or restarting bot as well as generating a new MISP event based on `interval_event` triggers saving regardless of the cache size. +**`attribute_mapping`** + +(optional, dict) If set, allows selecting which IntelMQ event fields are mapped to MISP attributes +as well as attribute parameters (like e.g. a comment). 
The expected format is a *dictonary of dictionaries*: +first-level key represents an IntelMQ field that will be directly translated to a MISP attribute; nested +dictionary represents addditional parameters PyMISP can take when creating an attribute. They can use +names of other IntelMQ fields (then the value of such field will be used), or static values. If not needed, +leave empty dict. + +For example: + +```yaml +attribute_mapping: + source.ip: + feed.name: + comment: event_description.text + destination.ip: + to_ids: False +``` + +would create a MISP object with three attributes `source.ip`, `feed.name` and `destination.ip` +and set their values as in the IntelMQ event. In addition, the `feed.name` would have a comment +as given in the `event_description.text` from IntelMQ event, and `destination.ip` would be set +as not usable for IDS. + **Usage in MISP** Configure the destination directory of this feed as feed in MISP, either as local location, or served via a web server. diff --git a/intelmq/bots/outputs/misp/output_feed.py b/intelmq/bots/outputs/misp/output_feed.py index 1e090011fe..ca3eab69db 100644 --- a/intelmq/bots/outputs/misp/output_feed.py +++ b/intelmq/bots/outputs/misp/output_feed.py @@ -10,8 +10,11 @@ from uuid import uuid4 from intelmq import VAR_STATE_PATH +import pymisp + from intelmq.lib.bot import OutputBot from intelmq.lib.exceptions import MissingDependencyError +from ....lib.message import Message, MessageFactory from intelmq.lib.mixins import CacheMixin from intelmq.lib.utils import parse_relative @@ -33,6 +36,7 @@ class MISPFeedOutputBot(OutputBot, CacheMixin): misp_org_uuid = None output_dir: str = f"{VAR_STATE_PATH}mispfeed-output" # TODO: should be path _is_multithreadable: bool = False + attribute_mapping: dict = None @staticmethod def check_output_dir(dirname): @@ -57,11 +61,13 @@ def init(self): if self.interval_event is None: self.timedelta = datetime.timedelta(hours=1) else: - self.timedelta = 
datetime.timedelta(minutes=parse_relative(self.interval_event)) + self.timedelta = datetime.timedelta( + minutes=parse_relative(self.interval_event) + ) - if (self.output_dir / '.current').exists(): + if (self.output_dir / ".current").exists(): try: - with (self.output_dir / '.current').open() as f: + with (self.output_dir / ".current").open() as f: self.current_file = Path(f.read()) if self.current_file.exists(): @@ -128,12 +134,49 @@ def process(self): def _add_message_to_feed(self, message: dict): obj = self.current_event.add_object(name="intelmq_event") + if not self.attribute_mapping: + self._default_mapping(obj, message) + else: + self._custom_mapping(obj, message) + + def _default_mapping(self, obj: pymisp.MISPObject, message: dict): for object_relation, value in message.items(): try: obj.add_attribute(object_relation, value=value) except NewAttributeError: # This entry isn't listed in the harmonization file, ignoring. - pass + self.logger.warning( + "Object relation %s not exists in MISP definition, ignoring", + object_relation, + ) + + def _extract_misp_attribute_kwargs(self, message: dict, definition: dict) -> dict: + # For caching and default mapping, the serialized version is the right format to work on. + # However, for any custom mapping the Message object is more sufficient as it handles + # subfields. 
+ message = MessageFactory.from_dict( + message, harmonization=self.harmonization, default_type="Event" + ) + result = {} + for parameter, value in definition.items(): + # Check if the value is a harmonization key or a static value + if isinstance(value, str) and ( + value in self.harmonization["event"] + or value.split(".", 1)[0] in self.harmonization["event"] + ): + result[parameter] = message.get(value) + else: + result[parameter] = value + return result + + def _custom_mapping(self, obj: pymisp.MISPObject, message: dict): + for object_relation, definition in self.attribute_mapping.items(): + obj.add_attribute( + object_relation, + value=message[object_relation], + **self._extract_misp_attribute_kwargs(message, definition), + ) + # In case of manual mapping, we want to fail if it produces incorrect values def _generate_feed(self, message: dict = None): if message: @@ -152,18 +195,27 @@ def _generate_feed(self, message: dict = None): @staticmethod def check(parameters): - if 'output_dir' not in parameters: + if "output_dir" not in parameters: return [["error", "Parameter 'output_dir' not given."]] try: - created = MISPFeedOutputBot.check_output_dir(parameters['output_dir']) + created = MISPFeedOutputBot.check_output_dir(parameters["output_dir"]) except OSError: - return [["error", - "Directory %r of parameter 'output_dir' does not exist and could not be created." % parameters['output_dir']]] + return [ + [ + "error", + "Directory %r of parameter 'output_dir' does not exist and could not be created." + % parameters["output_dir"], + ] + ] else: if created: - return [["info", - "Directory %r of parameter 'output_dir' did not exist, but has now been created." - "" % parameters['output_dir']]] + return [ + [ + "info", + "Directory %r of parameter 'output_dir' did not exist, but has now been created." 
+ "" % parameters["output_dir"], + ] + ] BOT = MISPFeedOutputBot diff --git a/intelmq/tests/bots/outputs/misp/test_output_feed.py b/intelmq/tests/bots/outputs/misp/test_output_feed.py index 631b7b7bd4..abb4b9c368 100644 --- a/intelmq/tests/bots/outputs/misp/test_output_feed.py +++ b/intelmq/tests/bots/outputs/misp/test_output_feed.py @@ -8,6 +8,7 @@ from pathlib import Path from tempfile import TemporaryDirectory +from .....lib.message import Message, MessageFactory import intelmq.lib.test as test from intelmq.bots.outputs.misp.output_feed import MISPFeedOutputBot @@ -92,7 +93,7 @@ def test_accumulating_events(self): # Simulating leftovers in the queue when it's time to generate new event Path(f"{self.directory.name}/.current").unlink() - self.bot.cache_put(EXAMPLE_EVENT) + self.bot.cache_put(MessageFactory.from_dict(EXAMPLE_EVENT).to_dict(jsondict_as_string=True)) self.run_bot(parameters={"bulk_save_count": 3}) new_event = open(f"{self.directory.name}/.current").read() @@ -100,6 +101,50 @@ def test_accumulating_events(self): objects = json.load(f)["Event"]["Object"] assert len(objects) == 1 + def test_attribute_mapping(self): + self.run_bot( + parameters={ + "attribute_mapping": { + "source.ip": {}, + "feed.name": {"comment": "event_description.text"}, + "destination.ip": {"to_ids": False}, + "malware.name": {"comment": "extra.non_ascii"} + } + } + ) + + current_event = open(f"{self.directory.name}/.current").read() + with open(current_event) as f: + objects = json.load(f).get("Event", {}).get("Object", []) + + assert len(objects) == 1 + attributes = objects[0].get("Attribute") + assert len(attributes) == 4 + source_ip = next( + attr for attr in attributes if attr.get("object_relation") == "source.ip" + ) + assert source_ip["value"] == "152.166.119.2" + assert source_ip["comment"] == "" + + feed_name = next( + attr for attr in attributes if attr.get("object_relation") == "feed.name" + ) + assert feed_name["value"] == EXAMPLE_EVENT["feed.name"] + assert 
feed_name["comment"] == EXAMPLE_EVENT["event_description.text"] + + destination_ip = next( + attr for attr in attributes if attr.get("object_relation") == "destination.ip" + ) + assert destination_ip["value"] == EXAMPLE_EVENT["destination.ip"] + assert destination_ip["to_ids"] is False + + malware_name = next( + attr for attr in attributes if attr.get("object_relation") == "malware.name" + ) + assert malware_name["value"] == EXAMPLE_EVENT["malware.name"] + assert malware_name["comment"] == EXAMPLE_EVENT["extra.non_ascii"] + + def tearDown(self): self.cache.delete(self.bot_id) self.directory.cleanup() From b8b606169e0d5a9277c7b92f28db82368ba365ec Mon Sep 17 00:00:00 2001 From: Kamil Mankowski Date: Thu, 4 Jul 2024 16:32:30 +0200 Subject: [PATCH 05/23] ENH: Add support for creating separated MISP Events With `event_separator` parameter, user can decide to create more than one MISP event in the output bot and group incoming messages based on given field. --- CHANGELOG.md | 5 +- docs/user/bots.md | 10 +- intelmq/bots/outputs/misp/output_feed.py | 155 +++++++++++------- .../bots/outputs/misp/test_output_feed.py | 89 ++++++++-- 4 files changed, 189 insertions(+), 70 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 6d1da18dcb..c5a21f3ea9 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -221,8 +221,9 @@ This is short list of the most important known issues. The full list can be retr #### Outputs - `intelmq.bots.outputs.misp.output_feed`: - Handle failures if saved current event wasn't saved or is incorrect (PR by Kamil Mankowski). - - Allow saving messages in bulks instead of refreshing the feed immediately (PR#2509 by Kamil Mankowski). 
+ - Add `attribute_mapping` parameter to allow selecting a subset of event attributes as well as additional attribute parameters (PR#2509 by Kamil Mankowski). + - Add `event_separator` parameter to allow keeping IntelMQ events in separated MISP Events based on a given field (PR#2509 by Kamil Mankowski). - `intelmq.bots.outputs.smtp_batch.output`: Documentation on multiple recipients added (PR#2501 by Edvard Rejthar). ### Documentation diff --git a/docs/user/bots.md b/docs/user/bots.md index 206573501e..f8787fc3e7 100644 --- a/docs/user/bots.md +++ b/docs/user/bots.md @@ -4770,9 +4770,9 @@ a new MISP event based on `interval_event` triggers saving regardless of the cac **`attribute_mapping`** (optional, dict) If set, allows selecting which IntelMQ event fields are mapped to MISP attributes -as well as attribute parameters (like e.g. a comment). The expected format is a *dictonary of dictionaries*: +as well as attribute parameters (like e.g. a comment). The expected format is a *dictionary of dictionaries*: first-level key represents an IntelMQ field that will be directly translated to a MISP attribute; nested -dictionary represents addditional parameters PyMISP can take when creating an attribute. They can use +dictionary represents additional parameters PyMISP can take when creating an attribute. They can use names of other IntelMQ fields (then the value of such field will be used), or static values. If not needed, leave empty dict. @@ -4792,6 +4792,12 @@ and set their values as in the IntelMQ event. In addition, the `feed.name` would as given in the `event_description.text` from IntelMQ event, and `destination.ip` would be set as not usable for IDS. +**`event_separator` + +(optional, string): If set to a field name from IntelMQ event, the bot will group incoming messages +in separated MISP events, based on the value of this field. The `interval_event` parameter acts +for all grouping events together. 
+ **Usage in MISP** Configure the destination directory of this feed as feed in MISP, either as local location, or served via a web server. diff --git a/intelmq/bots/outputs/misp/output_feed.py b/intelmq/bots/outputs/misp/output_feed.py index ca3eab69db..9b5247c595 100644 --- a/intelmq/bots/outputs/misp/output_feed.py +++ b/intelmq/bots/outputs/misp/output_feed.py @@ -14,18 +14,20 @@ from intelmq.lib.bot import OutputBot from intelmq.lib.exceptions import MissingDependencyError -from ....lib.message import Message, MessageFactory +from ....lib.message import MessageFactory from intelmq.lib.mixins import CacheMixin from intelmq.lib.utils import parse_relative try: - from pymisp import MISPEvent, MISPOrganisation, NewAttributeError + from pymisp import MISPEvent, MISPObject, MISPOrganisation, NewAttributeError from pymisp.tools import feed_meta_generator except ImportError: # catching SyntaxError because of https://github.com/MISP/PyMISP/issues/501 MISPEvent = None import_fail_reason = "import" +DEFAULT_KEY = "default" + class MISPFeedOutputBot(OutputBot, CacheMixin): """Generate an output in the MISP Feed format""" @@ -37,6 +39,7 @@ class MISPFeedOutputBot(OutputBot, CacheMixin): output_dir: str = f"{VAR_STATE_PATH}mispfeed-output" # TODO: should be path _is_multithreadable: bool = False attribute_mapping: dict = None + event_separator: str = None @staticmethod def check_output_dir(dirname): @@ -49,7 +52,8 @@ def init(self): if MISPEvent is None: raise MissingDependencyError("pymisp", version=">=2.4.117.3") - self.current_event = None + self.current_events = {} + self.current_files = {} self.misp_org = MISPOrganisation() self.misp_org.name = self.misp_org_name @@ -65,58 +69,57 @@ def init(self): minutes=parse_relative(self.interval_event) ) + self.min_time_current = datetime.datetime.max + self.max_time_current = datetime.datetime.min + if (self.output_dir / ".current").exists(): try: with (self.output_dir / ".current").open() as f: - self.current_file = 
Path(f.read()) - - if self.current_file.exists(): - self.current_event = MISPEvent() - self.current_event.load_file(self.current_file) - - last_min_time, last_max_time = re.findall( - "IntelMQ event (.*) - (.*)", self.current_event.info - )[0] - last_min_time = datetime.datetime.strptime( - last_min_time, "%Y-%m-%dT%H:%M:%S.%f" - ) - last_max_time = datetime.datetime.strptime( - last_max_time, "%Y-%m-%dT%H:%M:%S.%f" - ) - if last_max_time < datetime.datetime.now(): - self.min_time_current = datetime.datetime.now() - self.max_time_current = self.min_time_current + self.timedelta - self.current_event = None - else: - self.min_time_current = last_min_time - self.max_time_current = last_max_time - except: + current = f.read() + + if not self.event_separator: + self.current_files[DEFAULT_KEY] = Path(current) + else: + self.current_files = { + k: Path(v) for k, v in json.loads(current).items() + } + + for key, path in self.current_files.items(): + self._load_event(path, key) + except Exception: self.logger.exception( - "Loading current event %s failed. Skipping it.", self.current_event + "Loading current events %s failed. 
Skipping it.", self.current_files ) - self.current_event = None - else: + self.current_events = {} + + if not self.current_files or self.max_time_current < datetime.datetime.now(): self.min_time_current = datetime.datetime.now() self.max_time_current = self.min_time_current + self.timedelta + self.current_events = {} + + def _load_event(self, file_path: Path, key: str): + if file_path.exists(): + self.current_events[key] = MISPEvent() + self.current_events[key].load_file(file_path) + + last_min_time, last_max_time = re.findall( + "IntelMQ event (.*) - (.*)", self.current_events[key].info + )[0] + last_min_time = datetime.datetime.strptime( + last_min_time, "%Y-%m-%dT%H:%M:%S.%f" + ) + last_max_time = datetime.datetime.strptime( + last_max_time, "%Y-%m-%dT%H:%M:%S.%f" + ) + + self.min_time_current = min(last_min_time, self.min_time_current) + self.max_time_current = max(last_max_time, self.max_time_current) def process(self): - if not self.current_event or datetime.datetime.now() > self.max_time_current: + if datetime.datetime.now() > self.max_time_current: self.min_time_current = datetime.datetime.now() self.max_time_current = self.min_time_current + self.timedelta - self.current_event = MISPEvent() - self.current_event.info = "IntelMQ event {begin} - {end}" "".format( - begin=self.min_time_current.isoformat(), - end=self.max_time_current.isoformat(), - ) - self.current_event.set_date(datetime.date.today()) - self.current_event.Orgc = self.misp_org - self.current_event.uuid = str(uuid4()) - self.current_file = self.output_dir / f"{self.current_event.uuid}.json" - with (self.output_dir / ".current").open("w") as f: - f.write(str(self.current_file)) - - # On startup or when timeout occurs, clean the queue to ensure we do not - # keep events forever because there was not enough generated + self._generate_feed() event = self.receive_message().to_dict(jsondict_as_string=True) @@ -127,28 +130,67 @@ def process(self): if cache_size is None: self._generate_feed(event) + 
elif not self.current_events: + # Always create the first event so we can keep track of the interval. + # It also ensures cleaning the queue after startup in case of awaiting + # messages from the previous run + self._generate_feed() elif cache_size >= self.bulk_save_count: self._generate_feed() self.acknowledge_message() + def _generate_new_event(self, key): + self.current_events[key] = MISPEvent() + self.current_events[key].info = "IntelMQ event {begin} - {end}" "".format( + begin=self.min_time_current.isoformat(), + end=self.max_time_current.isoformat(), + ) + self.current_events[key].set_date(datetime.date.today()) + self.current_events[key].Orgc = self.misp_org + self.current_events[key].uuid = str(uuid4()) + self.current_files[key] = ( + self.output_dir / f"{self.current_events[key].uuid}.json" + ) + with (self.output_dir / ".current").open("w") as f: + if not self.event_separator: + f.write(str(self.current_files[key])) + else: + json.dump({k: str(v) for k, v in self.current_files.items()}, f) + return self.current_events[key] + def _add_message_to_feed(self, message: dict): - obj = self.current_event.add_object(name="intelmq_event") + if not self.event_separator: + key = DEFAULT_KEY + else: + # For proper handling of nested fields + message_obj = MessageFactory.from_dict( + message, harmonization=self.harmonization, default_type="Event" + ) + key = message_obj.get(self.event_separator) or DEFAULT_KEY + + if key in self.current_events: + event = self.current_events[key] + else: + event = self._generate_new_event(key) + + obj = event.add_object(name="intelmq_event") if not self.attribute_mapping: self._default_mapping(obj, message) else: self._custom_mapping(obj, message) - def _default_mapping(self, obj: pymisp.MISPObject, message: dict): + def _default_mapping(self, obj: "MISPObject", message: dict): for object_relation, value in message.items(): try: obj.add_attribute(object_relation, value=value) except NewAttributeError: # This entry isn't listed in the 
harmonization file, ignoring. - self.logger.warning( - "Object relation %s not exists in MISP definition, ignoring", - object_relation, - ) + if object_relation != "__type": + self.logger.warning( + "Object relation %s not exists in MISP definition, ignoring", + object_relation, + ) def _extract_misp_attribute_kwargs(self, message: dict, definition: dict) -> dict: # For caching and default mapping, the serialized version is the right format to work on. @@ -161,15 +203,15 @@ def _extract_misp_attribute_kwargs(self, message: dict, definition: dict) -> dic for parameter, value in definition.items(): # Check if the value is a harmonization key or a static value if isinstance(value, str) and ( - value in self.harmonization["event"] - or value.split(".", 1)[0] in self.harmonization["event"] + value in self.harmonization["event"] or + value.split(".", 1)[0] in self.harmonization["event"] ): result[parameter] = message.get(value) else: result[parameter] = value return result - def _custom_mapping(self, obj: pymisp.MISPObject, message: dict): + def _custom_mapping(self, obj: "MISPObject", message: dict): for object_relation, definition in self.attribute_mapping.items(): obj.add_attribute( object_relation, @@ -187,9 +229,10 @@ def _generate_feed(self, message: dict = None): self._add_message_to_feed(message) message = self.cache_pop() - feed_output = self.current_event.to_feed(with_meta=False) - with self.current_file.open("w") as f: - json.dump(feed_output, f) + for key, event in self.current_events.items(): + feed_output = event.to_feed(with_meta=False) + with self.current_files[key].open("w") as f: + json.dump(feed_output, f) feed_meta_generator(self.output_dir) diff --git a/intelmq/tests/bots/outputs/misp/test_output_feed.py b/intelmq/tests/bots/outputs/misp/test_output_feed.py index abb4b9c368..31172a81bb 100644 --- a/intelmq/tests/bots/outputs/misp/test_output_feed.py +++ b/intelmq/tests/bots/outputs/misp/test_output_feed.py @@ -70,18 +70,19 @@ def 
test_accumulating_events(self): current_event = open(f"{self.directory.name}/.current").read() - # First, the feed is empty - not enough events came + # The first event is always immediately dumped to the MISP feed + # But the second wait until bulk saving size is achieved with open(current_event) as f: objects = json.load(f).get("Event", {}).get("Object", []) - assert len(objects) == 0 + assert len(objects) == 1 - self.input_message = [EXAMPLE_EVENT] - self.run_bot(parameters={"bulk_save_count": 3}) + self.input_message = [EXAMPLE_EVENT, EXAMPLE_EVENT] + self.run_bot(iterations=2, parameters={"bulk_save_count": 3}) # When enough events were collected, save them with open(current_event) as f: objects = json.load(f)["Event"]["Object"] - assert len(objects) == 3 + assert len(objects) == 4 self.input_message = [EXAMPLE_EVENT, EXAMPLE_EVENT, EXAMPLE_EVENT] self.run_bot(iterations=3, parameters={"bulk_save_count": 3}) @@ -89,17 +90,19 @@ def test_accumulating_events(self): # We continue saving to the same file until interval timeout with open(current_event) as f: objects = json.load(f)["Event"]["Object"] - assert len(objects) == 6 + assert len(objects) == 7 # Simulating leftovers in the queue when it's time to generate new event Path(f"{self.directory.name}/.current").unlink() - self.bot.cache_put(MessageFactory.from_dict(EXAMPLE_EVENT).to_dict(jsondict_as_string=True)) + self.bot.cache_put( + MessageFactory.from_dict(EXAMPLE_EVENT).to_dict(jsondict_as_string=True) + ) self.run_bot(parameters={"bulk_save_count": 3}) new_event = open(f"{self.directory.name}/.current").read() with open(new_event) as f: objects = json.load(f)["Event"]["Object"] - assert len(objects) == 1 + assert len(objects) == 2 def test_attribute_mapping(self): self.run_bot( @@ -108,7 +111,7 @@ def test_attribute_mapping(self): "source.ip": {}, "feed.name": {"comment": "event_description.text"}, "destination.ip": {"to_ids": False}, - "malware.name": {"comment": "extra.non_ascii"} + "malware.name": 
{"comment": "extra.non_ascii"}, } } ) @@ -133,7 +136,9 @@ def test_attribute_mapping(self): assert feed_name["comment"] == EXAMPLE_EVENT["event_description.text"] destination_ip = next( - attr for attr in attributes if attr.get("object_relation") == "destination.ip" + attr + for attr in attributes + if attr.get("object_relation") == "destination.ip" ) assert destination_ip["value"] == EXAMPLE_EVENT["destination.ip"] assert destination_ip["to_ids"] is False @@ -144,6 +149,70 @@ def test_attribute_mapping(self): assert malware_name["value"] == EXAMPLE_EVENT["malware.name"] assert malware_name["comment"] == EXAMPLE_EVENT["extra.non_ascii"] + def test_event_separation(self): + self.input_message = [ + EXAMPLE_EVENT, + {**EXAMPLE_EVENT, "malware.name": "another_malware"}, + EXAMPLE_EVENT, + ] + self.run_bot(iterations=3, parameters={"event_separator": "malware.name"}) + + current_events = json.loads(open(f"{self.directory.name}/.current").read()) + assert len(current_events) == 2 + + with open(current_events["salityp2p"]) as f: + objects = json.load(f).get("Event", {}).get("Object", []) + assert len(objects) == 2 + malware_name = next( + attr["value"] + for attr in objects[0]["Attribute"] + if attr.get("object_relation") == "malware.name" + ) + assert malware_name == "salityp2p" + + with open(current_events["another_malware"]) as f: + objects = json.load(f).get("Event", {}).get("Object", []) + assert len(objects) == 1 + malware_name = next( + attr["value"] + for attr in objects[0]["Attribute"] + if attr.get("object_relation") == "malware.name" + ) + assert malware_name == "another_malware" + + def test_event_separation_with_extra_and_bulk_save(self): + self.input_message = [ + {**EXAMPLE_EVENT, "extra.some_key": "another_malware"}, + {**EXAMPLE_EVENT, "extra.some_key": "first_malware"}, + {**EXAMPLE_EVENT, "extra.some_key": "another_malware"}, + ] + self.run_bot( + iterations=3, + parameters={"event_separator": "extra.some_key", "bulk_save_count": 3}, + ) + + # Only the 
initial event is saved, the rest is cached + current_events = json.loads(open(f"{self.directory.name}/.current").read()) + assert len(current_events) == 1 + with open(current_events["another_malware"]) as f: + objects = json.load(f).get("Event", {}).get("Object", []) + assert len(objects) == 1 + + self.input_message = {**EXAMPLE_EVENT, "extra.some_key": "first_malware"} + self.run_bot( + parameters={"event_separator": "extra.some_key", "bulk_save_count": 3}, + ) + + # Now everything is saved + current_events = json.loads(open(f"{self.directory.name}/.current").read()) + assert len(current_events) == 2 + with open(current_events["another_malware"]) as f: + objects = json.load(f).get("Event", {}).get("Object", []) + assert len(objects) == 2 + + with open(current_events["first_malware"]) as f: + objects = json.load(f).get("Event", {}).get("Object", []) + assert len(objects) == 2 def tearDown(self): self.cache.delete(self.bot_id) From baef44476464e0311faf2d732b4d7412cc4d2234 Mon Sep 17 00:00:00 2001 From: Kamil Mankowski Date: Mon, 8 Jul 2024 13:59:23 +0200 Subject: [PATCH 06/23] FIX: Handle not existing fields with manual mapping --- intelmq/bots/outputs/misp/output_feed.py | 13 ++++++----- .../bots/outputs/misp/test_output_feed.py | 22 +++++++++++++++++++ 2 files changed, 29 insertions(+), 6 deletions(-) diff --git a/intelmq/bots/outputs/misp/output_feed.py b/intelmq/bots/outputs/misp/output_feed.py index 9b5247c595..c1c165ec1d 100644 --- a/intelmq/bots/outputs/misp/output_feed.py +++ b/intelmq/bots/outputs/misp/output_feed.py @@ -213,12 +213,13 @@ def _extract_misp_attribute_kwargs(self, message: dict, definition: dict) -> dic def _custom_mapping(self, obj: "MISPObject", message: dict): for object_relation, definition in self.attribute_mapping.items(): - obj.add_attribute( - object_relation, - value=message[object_relation], - **self._extract_misp_attribute_kwargs(message, definition), - ) - # In case of manual mapping, we want to fail if it produces incorrect 
values + if object_relation in message: + obj.add_attribute( + object_relation, + value=message[object_relation], + **self._extract_misp_attribute_kwargs(message, definition), + ) + # In case of manual mapping, we want to fail if it produces incorrect values def _generate_feed(self, message: dict = None): if message: diff --git a/intelmq/tests/bots/outputs/misp/test_output_feed.py b/intelmq/tests/bots/outputs/misp/test_output_feed.py index 31172a81bb..c2b69e37b6 100644 --- a/intelmq/tests/bots/outputs/misp/test_output_feed.py +++ b/intelmq/tests/bots/outputs/misp/test_output_feed.py @@ -149,6 +149,28 @@ def test_attribute_mapping(self): assert malware_name["value"] == EXAMPLE_EVENT["malware.name"] assert malware_name["comment"] == EXAMPLE_EVENT["extra.non_ascii"] + def test_attribute_mapping_empty_field(self): + self.run_bot( + parameters={ + "attribute_mapping": { + "source.ip": {}, + "source.fqdn": {}, # not exists in the message + } + } + ) + + current_event = open(f"{self.directory.name}/.current").read() + with open(current_event) as f: + objects = json.load(f).get("Event", {}).get("Object", []) + + assert len(objects) == 1 + attributes = objects[0].get("Attribute") + assert len(attributes) == 1 + source_ip = next( + attr for attr in attributes if attr.get("object_relation") == "source.ip" + ) + assert source_ip["value"] == "152.166.119.2" + def test_event_separation(self): self.input_message = [ EXAMPLE_EVENT, From cff9efe19eb3f995e2eeb6aec2808e7dec0a0015 Mon Sep 17 00:00:00 2001 From: Kamil Mankowski Date: Tue, 9 Jul 2024 15:43:15 +0200 Subject: [PATCH 07/23] ENH: Add option to extend default info --- CHANGELOG.md | 1 + docs/user/bots.md | 27 ++++++++++++++++--- intelmq/bots/outputs/misp/output_feed.py | 7 ++++- .../bots/outputs/misp/test_output_feed.py | 21 +++++++++++++++ 4 files changed, 52 insertions(+), 4 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index c5a21f3ea9..3003022c9f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -224,6 +224,7 @@ 
This is short list of the most important known issues. The full list can be retr - Allow saving messages in bulks instead of refreshing the feed immediately (PR#2509 by Kamil Mankowski). - Add `attribute_mapping` parameter to allow selecting a subset of event attributes as well as additional attribute parameters (PR#2509 by Kamil Mankowski). - Add `event_separator` parameter to allow keeping IntelMQ events in separated MISP Events based on a given field (PR#2509 by Kamil Mankowski). + - Add `additional_info` parameter to extend the default description of MISP Events (PR#2509 by Kamil Mankowski). - `intelmq.bots.outputs.smtp_batch.output`: Documentation on multiple recipients added (PR#2501 by Edvard Rejthar). ### Documentation diff --git a/docs/user/bots.md b/docs/user/bots.md index f8787fc3e7..639aaceed6 100644 --- a/docs/user/bots.md +++ b/docs/user/bots.md @@ -4794,9 +4794,30 @@ as not usable for IDS. **`event_separator` -(optional, string): If set to a field name from IntelMQ event, the bot will group incoming messages -in separated MISP events, based on the value of this field. The `interval_event` parameter acts -for all grouping events together. +(optional, string): If set to a field name from IntelMQ event, the bot will work in parallel on a few +events instead of saving all incomming messages to a one. Each unique value from the field will +use its own MISP Event. This is useful if your feed provides data about multiple entities you would +like to group, for example IPs of C2 servers from different botnets. For a given value, the bot will +use the same MISP Event as long as it's allowed by the `interval_event`. + +**`additional_info` + +(optional, string): If set, the generated MISP Event will use it in the `info` field of the event, +in addition to the standard IntelMQ description with the time frame (you cannot remove it as the bot +depends of datetimes saved there). 
If you use `event_separator`, you may want to use `{separator}` +placeholder which will be then replaced with the value of the separator. + +For example, the following configuration can be used to create MISP Feed with IPs of C2 servers +of different botnets, having each botnet in a separated MISP Events with an appropiate description. +Each MISP Event will contain objects with the `source.ip` field only, and the events' info will look +like *C2 Servers for botnet-1. IntelMQ event 2024-07-09T14:51:10.825123 - 2024-07-10T14:51:10.825123* + +```yaml +event_separator: malware.name +additional_info: C2 Servers for {separator}. +attribute_mapping: + source.ip: +``` **Usage in MISP** diff --git a/intelmq/bots/outputs/misp/output_feed.py b/intelmq/bots/outputs/misp/output_feed.py index c1c165ec1d..5d80fa3db4 100644 --- a/intelmq/bots/outputs/misp/output_feed.py +++ b/intelmq/bots/outputs/misp/output_feed.py @@ -40,6 +40,7 @@ class MISPFeedOutputBot(OutputBot, CacheMixin): _is_multithreadable: bool = False attribute_mapping: dict = None event_separator: str = None + additional_info: str = None @staticmethod def check_output_dir(dirname): @@ -142,10 +143,14 @@ def process(self): def _generate_new_event(self, key): self.current_events[key] = MISPEvent() - self.current_events[key].info = "IntelMQ event {begin} - {end}" "".format( + info = "IntelMQ event {begin} - {end}" "".format( begin=self.min_time_current.isoformat(), end=self.max_time_current.isoformat(), ) + if self.additional_info: + info = f"{self.additional_info.format(separator=key)} {info}" + + self.current_events[key].info = info self.current_events[key].set_date(datetime.date.today()) self.current_events[key].Orgc = self.misp_org self.current_events[key].uuid = str(uuid4()) diff --git a/intelmq/tests/bots/outputs/misp/test_output_feed.py b/intelmq/tests/bots/outputs/misp/test_output_feed.py index c2b69e37b6..0c175177d9 100644 --- a/intelmq/tests/bots/outputs/misp/test_output_feed.py +++ 
b/intelmq/tests/bots/outputs/misp/test_output_feed.py @@ -64,6 +64,27 @@ def test_event(self): objects = json.load(f).get("Event", {}).get("Object", []) assert len(objects) == 1 + def test_additional_info(self): + self.run_bot(parameters={"additional_info": "This is my custom info."}) + + current_event = open(f"{self.directory.name}/.current").read() + with open(current_event) as f: + info: str = json.load(f).get("Event", {}).get("info", "") + assert info.startswith("This is my custom info. IntelMQ event ") + + def test_additional_info_with_separator(self): + self.run_bot( + parameters={ + "additional_info": "Event related to {separator}.", + "event_separator": "malware.name", + } + ) + + current_events = json.loads(open(f"{self.directory.name}/.current").read()) + with open(current_events["salityp2p"]) as f: + info: str = json.load(f).get("Event", {}).get("info", "") + assert info.startswith("Event related to salityp2p. IntelMQ event ") + def test_accumulating_events(self): self.input_message = [EXAMPLE_EVENT, EXAMPLE_EVENT] self.run_bot(iterations=2, parameters={"bulk_save_count": 3}) From f6869ab03e0376ffa42ba59019115c104378ce06 Mon Sep 17 00:00:00 2001 From: Kamil Mankowski Date: Tue, 9 Jul 2024 15:46:38 +0200 Subject: [PATCH 08/23] Fix typos --- docs/user/bots.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/docs/user/bots.md b/docs/user/bots.md index 639aaceed6..fe330ea692 100644 --- a/docs/user/bots.md +++ b/docs/user/bots.md @@ -4795,7 +4795,7 @@ as not usable for IDS. **`event_separator` (optional, string): If set to a field name from IntelMQ event, the bot will work in parallel on a few -events instead of saving all incomming messages to a one. Each unique value from the field will +events instead of saving all incoming messages to a one. Each unique value from the field will use its own MISP Event. 
This is useful if your feed provides data about multiple entities you would like to group, for example IPs of C2 servers from different botnets. For a given value, the bot will use the same MISP Event as long as it's allowed by the `interval_event`. @@ -4808,7 +4808,7 @@ depends of datetimes saved there). If you use `event_separator`, you may want to placeholder which will be then replaced with the value of the separator. For example, the following configuration can be used to create MISP Feed with IPs of C2 servers -of different botnets, having each botnet in a separated MISP Events with an appropiate description. +of different botnets, having each botnet in a separated MISP Events with an appropriate description. Each MISP Event will contain objects with the `source.ip` field only, and the events' info will look like *C2 Servers for botnet-1. IntelMQ event 2024-07-09T14:51:10.825123 - 2024-07-10T14:51:10.825123* From bd7e0d1ddff1179b8c065aba3efb58b31f68b3a2 Mon Sep 17 00:00:00 2001 From: Kamil Mankowski Date: Wed, 10 Jul 2024 11:25:05 +0200 Subject: [PATCH 09/23] ENH: add support for tagging --- intelmq/bots/outputs/misp/output_feed.py | 29 ++++++++++++++++++- .../bots/outputs/misp/test_output_feed.py | 13 +++++++++ 2 files changed, 41 insertions(+), 1 deletion(-) diff --git a/intelmq/bots/outputs/misp/output_feed.py b/intelmq/bots/outputs/misp/output_feed.py index 5d80fa3db4..c559c117c1 100644 --- a/intelmq/bots/outputs/misp/output_feed.py +++ b/intelmq/bots/outputs/misp/output_feed.py @@ -19,7 +19,7 @@ from intelmq.lib.utils import parse_relative try: - from pymisp import MISPEvent, MISPObject, MISPOrganisation, NewAttributeError + from pymisp import MISPEvent, MISPObject, MISPOrganisation, MISPTag, NewAttributeError from pymisp.tools import feed_meta_generator except ImportError: # catching SyntaxError because of https://github.com/MISP/PyMISP/issues/501 @@ -41,6 +41,10 @@ class MISPFeedOutputBot(OutputBot, CacheMixin): attribute_mapping: dict = None 
event_separator: str = None additional_info: str = None + tagging: dict = None + # A structure like: + # __all__: list of tag kwargs for all events + # : list of tag kwargs per separator key @staticmethod def check_output_dir(dirname): @@ -98,6 +102,18 @@ def init(self): self.max_time_current = self.min_time_current + self.timedelta self.current_events = {} + self._tagging_objects = {} + if self.tagging: + for key, tag_list in self.tagging.items(): + self._tagging_objects[key] = list() + for kw in tag_list: + # For some reason, PyMISP do not uses classmethod, and from_dict requires + # unpacking. So this is really the way to initialize tag objects. + tag = MISPTag() + tag.from_dict(**kw) + self._tagging_objects[key].append(tag) + self.logger.debug("Generated tags: %r.", self._tagging_objects) + def _load_event(self, file_path: Path, key: str): if file_path.exists(): self.current_events[key] = MISPEvent() @@ -143,6 +159,14 @@ def process(self): def _generate_new_event(self, key): self.current_events[key] = MISPEvent() + + tags: list[MISPTag] = [] + if "__all__" in self._tagging_objects: + tags.extend(self._tagging_objects["__all__"]) + if key in self._tagging_objects: + tags.extend(self._tagging_objects[key]) + self.current_events[key].tags = tags + info = "IntelMQ event {begin} - {end}" "".format( begin=self.min_time_current.isoformat(), end=self.max_time_current.isoformat(), @@ -198,6 +222,9 @@ def _default_mapping(self, obj: "MISPObject", message: dict): ) def _extract_misp_attribute_kwargs(self, message: dict, definition: dict) -> dict: + """ + Creates a + """ # For caching and default mapping, the serialized version is the right format to work on. # However, for any custom mapping the Message object is more sufficient as it handles # subfields. 
diff --git a/intelmq/tests/bots/outputs/misp/test_output_feed.py b/intelmq/tests/bots/outputs/misp/test_output_feed.py index 0c175177d9..d3b04442cf 100644 --- a/intelmq/tests/bots/outputs/misp/test_output_feed.py +++ b/intelmq/tests/bots/outputs/misp/test_output_feed.py @@ -257,6 +257,19 @@ def test_event_separation_with_extra_and_bulk_save(self): objects = json.load(f).get("Event", {}).get("Object", []) assert len(objects) == 2 + def test_tagging(self): + self.run_bot( + parameters={ + "tagging": {"__all__": [{"name": "tlp:unclear", "colour": "#7e7eae"}]} + } + ) + + current_event = open(f"{self.directory.name}/.current").read() + with open(current_event) as f: + objects = json.load(f).get("Event", {}).get("Object", []) + assert len(objects) == 1 + + def tearDown(self): self.cache.delete(self.bot_id) self.directory.cleanup() From 820bdec89587e9cc3b9770ce40ffc3285ca268d6 Mon Sep 17 00:00:00 2001 From: Kamil Mankowski Date: Wed, 10 Jul 2024 15:08:45 +0200 Subject: [PATCH 10/23] Fix generating on restart --- intelmq/bots/outputs/misp/output_feed.py | 4 ++++ intelmq/lib/mixins/cache.py | 21 +++++++++++++-------- 2 files changed, 17 insertions(+), 8 deletions(-) diff --git a/intelmq/bots/outputs/misp/output_feed.py b/intelmq/bots/outputs/misp/output_feed.py index c559c117c1..513ca8c808 100644 --- a/intelmq/bots/outputs/misp/output_feed.py +++ b/intelmq/bots/outputs/misp/output_feed.py @@ -114,6 +114,10 @@ def init(self): self._tagging_objects[key].append(tag) self.logger.debug("Generated tags: %r.", self._tagging_objects) + if self.current_events and self.cache_length(): + # Ensure we do generate feed on reload / restart + self._generate_feed() + def _load_event(self, file_path: Path, key: str): if file_path.exists(): self.current_events[key] = MISPEvent() diff --git a/intelmq/lib/mixins/cache.py b/intelmq/lib/mixins/cache.py index ee945fbb53..5919d67535 100644 --- a/intelmq/lib/mixins/cache.py +++ b/intelmq/lib/mixins/cache.py @@ -15,15 +15,17 @@ class CacheMixin: 
"""Provides caching possibilities for bots - For key-value cache, use methods: - cache_exists - cache_get - cache_set - - To store dict elements in a cache queue named after bot id, use methods: - cache_put - cache_pop + For key-value cache, use methods: + cache_exists + cache_get + cache_set + + To store dict elements in a cache queue named after bot id, use methods: + cache_put + cache_pop + cache_length """ + __redis: redis.Redis = None redis_cache_host: str = "127.0.0.1" redis_cache_port: int = 6379 @@ -70,6 +72,9 @@ def cache_put(self, value: dict) -> int: size = self.__redis.lpush(self.bot_id, json.dumps(value)) return size + def cache_length(self) -> int: + return self.__redis.llen(self.bot_id) + def cache_pop(self) -> dict: data = self.__redis.rpop(self.bot_id) if data is None: From d05acebe02a1ae68483f5336b9cdfba42ce87dcf Mon Sep 17 00:00:00 2001 From: Kamil Mankowski Date: Tue, 16 Jul 2024 13:56:03 +0200 Subject: [PATCH 11/23] ENH: Add tagging, check, and improved docs --- CHANGELOG.md | 1 + docs/user/bots.md | 46 ++- intelmq/bots/outputs/misp/output_feed.py | 284 ++++++++++++++---- .../bots/outputs/misp/test_output_feed.py | 116 ++++++- 4 files changed, 380 insertions(+), 67 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 3003022c9f..633d75aea7 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -224,6 +224,7 @@ This is short list of the most important known issues. The full list can be retr - Allow saving messages in bulks instead of refreshing the feed immediately (PR#2509 by Kamil Mankowski). - Add `attribute_mapping` parameter to allow selecting a subset of event attributes as well as additional attribute parameters (PR#2509 by Kamil Mankowski). - Add `event_separator` parameter to allow keeping IntelMQ events in separated MISP Events based on a given field (PR#2509 by Kamil Mankowski). + - Add `tagging` parameter to allow adding tags to MISP events (PR#2509 by Kamil Mankowski). 
- Add `additional_info` parameter to extend the default description of MISP Events (PR#2509 by Kamil Mankowski). - `intelmq.bots.outputs.smtp_batch.output`: Documentation on multiple recipients added (PR#2501 by Edvard Rejthar). diff --git a/docs/user/bots.md b/docs/user/bots.md index fe330ea692..7d4edd7278 100644 --- a/docs/user/bots.md +++ b/docs/user/bots.md @@ -4736,6 +4736,12 @@ Create a directory layout in the MISP Feed format. The PyMISP library >= 2.4.119.1 is required, see [REQUIREMENTS.txt](https://github.com/certtools/intelmq/blob/master/intelmq/bots/outputs/misp/REQUIREMENTS.txt). +Note: please test the produced feed before using in production. This bot allows you to do an +extensive customisation of the MISP feed, including creating multiple events and tags, but it can +be tricky to configure properly. Misconfiguration can prevent bot from starting or have bad +consequences for your MISP Instance (e.g. spaming with events). Use `intelmqctl check` command +to validate your configuration against common mistakes. + **Module:** `intelmq.bots.outputs.misp.output_feed` **Parameters:** @@ -4765,7 +4771,7 @@ hour", string. (optional, int) If set to a non-0 value, the bot won't refresh the MISP feed immediately, but will cache incoming messages until the given number of them. Use it if your bot proceeds a high number of messages and constant saving to the disk is a problem. Reloading or restarting bot as well as generating -a new MISP event based on `interval_event` triggers saving regardless of the cache size. +a new MISP event based on `interval_event` triggers regenerating MISP feed regardless of the cache size. **`attribute_mapping`** @@ -4776,6 +4782,10 @@ dictionary represents additional parameters PyMISP can take when creating an att names of other IntelMQ fields (then the value of such field will be used), or static values. If not needed, leave empty dict. 
+For available attribute parameters, refer to the +[PyMISP documentation](https://pymisp.readthedocs.io/en/latest/_modules/pymisp/mispevent.html#MISPObjectAttribute) +for the `MISPObjectAttribute`. + For example: ```yaml @@ -4819,6 +4829,40 @@ attribute_mapping: source.ip: ``` +**`tagging`** + +(optional, dict): Allows setting MISP tags to MISP events. The structure is a *dict of list of dicts*. +The keys refers to which MISP events you want to tag. If you want to tag all of them, use `__all__`. +If you use `event_separator` and want to add additional tags to some events, use the expected values +of the separation field. The *list of dicts* defines MISP tags as parameters to create `MISPTag` +objects from. Each dictonary has to have at least `name`. For all available parameters refer to the +[PyMISP documentation](https://pymisp.readthedocs.io/en/latest/_modules/pymisp/abstract.html#MISPTag) +for `MISPTag`. + +Note: setting `name` is enough for MISP to match a correct tag from the global collection. You may +see it lacking the colour in the MISP Feed view, but it will be retrieved after importing to your +instance. + +Example 1 - set two tags for every MISP event: + +```yaml +tagging: + __all__: + - name: tlp:red + - name: source:intelmq +``` + +Example 2 - create separated events based on `malware.name` and set additional family tag: + +```yaml +event_separator: malware.name +tagging: + __all__: + - name: tlp:red + njrat: + - name: njrat +``` + **Usage in MISP** Configure the destination directory of this feed as feed in MISP, either as local location, or served via a web server.
diff --git a/intelmq/bots/outputs/misp/output_feed.py b/intelmq/bots/outputs/misp/output_feed.py index 513ca8c808..f8d4850aba 100644 --- a/intelmq/bots/outputs/misp/output_feed.py +++ b/intelmq/bots/outputs/misp/output_feed.py @@ -1,4 +1,4 @@ -# SPDX-FileCopyrightText: 2019 Sebastian Wagner +# SPDX-FileCopyrightText: 2019 Sebastian Wagner, 2024 CERT.at GmbH # # SPDX-License-Identifier: AGPL-3.0-or-later @@ -10,21 +10,25 @@ from uuid import uuid4 from intelmq import VAR_STATE_PATH -import pymisp from intelmq.lib.bot import OutputBot from intelmq.lib.exceptions import MissingDependencyError -from ....lib.message import MessageFactory +from intelmq.lib.message import Event, Message, MessageFactory from intelmq.lib.mixins import CacheMixin from intelmq.lib.utils import parse_relative try: - from pymisp import MISPEvent, MISPObject, MISPOrganisation, MISPTag, NewAttributeError + from pymisp import ( + MISPEvent, + MISPObject, + MISPOrganisation, + MISPTag, + MISPObjectAttribute, + NewAttributeError, + ) from pymisp.tools import feed_meta_generator except ImportError: - # catching SyntaxError because of https://github.com/MISP/PyMISP/issues/501 MISPEvent = None - import_fail_reason = "import" DEFAULT_KEY = "default" @@ -33,18 +37,42 @@ class MISPFeedOutputBot(OutputBot, CacheMixin): """Generate an output in the MISP Feed format""" interval_event: str = "1 hour" - bulk_save_count: int = None misp_org_name = None misp_org_uuid = None output_dir: str = f"{VAR_STATE_PATH}mispfeed-output" # TODO: should be path - _is_multithreadable: bool = False - attribute_mapping: dict = None - event_separator: str = None + # Enables regenerating the MISP feed after collecting given number of messages + bulk_save_count: int = None + + # Additional information to be added to the MISP event description additional_info: str = None + + # An optional field used to create multiple MISP events from incoming messages + event_separator: str = None + + # Optional non-standard mapping of message 
fields to MISP object attributes + # The structure is like: + # {: {}} + # For example: + # {"source.ip": {"comment": "This is the source of the event"}} + # will include only the "source.ip" field in the MISP object attributes, + # and set the comment + attribute_mapping: dict = None + + # Optional definition to add tags to the MISP event. It should be a dict where keys are + # '__all__' (to add tags for every event) or, if the event_separator is used, the separator + # values. For each key, there should be a list of dicts defining parameters for the MISPTag + # object, but only the "name" is required to set. + # For example: + # {"__all__": [{"name": "tag1"}, {"name": "tag2"}]} + # will add two tags to every event + # {"infostealer": [{"name": "type:infostealer"}], "__all__": [{"name": "tag1"}]} + # will add two tags to every event separated by "infostealer", and + # one tag to every other event tagging: dict = None - # A structure like: - # __all__: list of tag kwargs for all events - # : list of tag kwargs per separator key + + # Delaying reloading would delay saving eventually long-awaiting messages + _sighup_delay = False + _is_multithreadable: bool = False @staticmethod def check_output_dir(dirname): @@ -112,11 +140,13 @@ def init(self): tag = MISPTag() tag.from_dict(**kw) self._tagging_objects[key].append(tag) - self.logger.debug("Generated tags: %r.", self._tagging_objects) - if self.current_events and self.cache_length(): - # Ensure we do generate feed on reload / restart - self._generate_feed() + # Ensure we do generate feed on reload / restart, so awaiting messages won't wait forever + if self.cache_length() and not getattr(self, "testing", False): + self.logger.debug( + "Found %s awaiting messages. 
Generating feed.", self.cache_length() + ) + self._generate_misp_feed() def _load_event(self, file_path: Path, key: str): if file_path.exists(): @@ -141,7 +171,7 @@ def process(self): self.min_time_current = datetime.datetime.now() self.max_time_current = self.min_time_current + self.timedelta - self._generate_feed() + self._generate_misp_feed() event = self.receive_message().to_dict(jsondict_as_string=True) @@ -150,18 +180,16 @@ def process(self): cache_size = self.cache_put(event) if cache_size is None: - self._generate_feed(event) + self._generate_misp_feed(event) elif not self.current_events: # Always create the first event so we can keep track of the interval. - # It also ensures cleaning the queue after startup in case of awaiting - # messages from the previous run - self._generate_feed() + self._generate_misp_feed() elif cache_size >= self.bulk_save_count: - self._generate_feed() + self._generate_misp_feed() self.acknowledge_message() - def _generate_new_event(self, key): + def _generate_new_misp_event(self, key): self.current_events[key] = MISPEvent() tags: list[MISPTag] = [] @@ -192,26 +220,29 @@ def _generate_new_event(self, key): json.dump({k: str(v) for k, v in self.current_files.items()}, f) return self.current_events[key] - def _add_message_to_feed(self, message: dict): + def _add_message_to_misp_event(self, message: dict): + # For proper handling of nested fields, we need the object + message_obj = MessageFactory.from_dict( + message, harmonization=self.harmonization, default_type="Event" + ) if not self.event_separator: key = DEFAULT_KEY else: - # For proper handling of nested fields - message_obj = MessageFactory.from_dict( - message, harmonization=self.harmonization, default_type="Event" - ) key = message_obj.get(self.event_separator) or DEFAULT_KEY if key in self.current_events: event = self.current_events[key] else: - event = self._generate_new_event(key) + event = self._generate_new_misp_event(key) obj = event.add_object(name="intelmq_event") + 
# For caching and default mapping, the serialized version is the right format to work on. + # However, for any custom mapping the Message object is more sufficient as it handles + # subfields. if not self.attribute_mapping: self._default_mapping(obj, message) else: - self._custom_mapping(obj, message) + self._custom_mapping(obj, message_obj) def _default_mapping(self, obj: "MISPObject", message: dict): for object_relation, value in message.items(): @@ -226,28 +257,21 @@ def _default_mapping(self, obj: "MISPObject", message: dict): ) def _extract_misp_attribute_kwargs(self, message: dict, definition: dict) -> dict: - """ - Creates a - """ - # For caching and default mapping, the serialized version is the right format to work on. - # However, for any custom mapping the Message object is more sufficient as it handles - # subfields. - message = MessageFactory.from_dict( - message, harmonization=self.harmonization, default_type="Event" - ) + """Creates the a dict with arguments to create a MISPObjectAttribute.""" result = {} for parameter, value in definition.items(): # Check if the value is a harmonization key or a static value if isinstance(value, str) and ( - value in self.harmonization["event"] or - value.split(".", 1)[0] in self.harmonization["event"] + value in self.harmonization["event"] + or value.split(".", 1)[0] in self.harmonization["event"] ): result[parameter] = message.get(value) else: result[parameter] = value return result - def _custom_mapping(self, obj: "MISPObject", message: dict): + def _custom_mapping(self, obj: "MISPObject", message: Message): + """Map the IntelMQ event to the MISP Object using the custom mapping definition.""" for object_relation, definition in self.attribute_mapping.items(): if object_relation in message: obj.add_attribute( @@ -255,15 +279,15 @@ def _custom_mapping(self, obj: "MISPObject", message: dict): value=message[object_relation], **self._extract_misp_attribute_kwargs(message, definition), ) - # In case of manual mapping, 
we want to fail if it produces incorrect values + # In case of custom mapping, we want to fail if it produces incorrect values - def _generate_feed(self, message: dict = None): + def _generate_misp_feed(self, message: dict = None): if message: - self._add_message_to_feed(message) + self._add_message_to_misp_event(message) message = self.cache_pop() while message: - self._add_message_to_feed(message) + self._add_message_to_misp_event(message) message = self.cache_pop() for key, event in self.current_events.items(): @@ -275,27 +299,163 @@ def _generate_feed(self, message: dict = None): @staticmethod def check(parameters): + results = [] if "output_dir" not in parameters: - return [["error", "Parameter 'output_dir' not given."]] - try: - created = MISPFeedOutputBot.check_output_dir(parameters["output_dir"]) - except OSError: - return [ + results.append(["error", "Parameter 'output_dir' not given."]) + else: + try: + created = MISPFeedOutputBot.check_output_dir(parameters["output_dir"]) + except OSError: + results.append( + [ + "error", + "Directory %r of parameter 'output_dir' does not exist and could not be created." + % parameters["output_dir"], + ] + ) + else: + if created: + results.append( + [ + "info", + "Directory %r of parameter 'output_dir' did not exist, but has now been created." + "" % parameters["output_dir"], + ] + ) + + bulk_save_count = parameters.get("bulk_save_count") + if bulk_save_count and not isinstance(bulk_save_count, int): + results.append( + ["error", "Parameter 'bulk_save_count' has to be int if set."] + ) + + sanity_event = Event({}) + event_separator = parameters.get("event_separator") + if ( + event_separator + and not sanity_event._Message__is_valid_key(event_separator)[0] + ): + results.append( [ "error", - "Directory %r of parameter 'output_dir' does not exist and could not be created." 
- % parameters["output_dir"], + f"Value {event_separator} in 'event_separator' is not a valid event key.", ] - ] - else: - if created: - return [ + ) + + not_feed_field_warning = ( + "Parameter '{parameter}' of {context} looks like not being a field exportable to" + " MISP Feed. It may be a valid PyMISP parameter, but won't be exported to the feed." + " Please ensure it's intended and consult PyMISP documentation at https://pymisp.readthedocs.io/" + " for valid parameters for the {object}." + ) + attribute_mapping = parameters.get("attribute_mapping") + if attribute_mapping: + if not isinstance(attribute_mapping, dict): + results.append( + ["error", "Parameter 'attribute_mapping has to be a dictionary."] + ) + else: + for key, value in attribute_mapping.items(): + if not sanity_event._Message__is_valid_key(key)[0]: + results.append( + [ + "error", + f"The key '{key}' in attribute_mapping is not a valid IDF field.", + ] + ) + if not isinstance(value, dict): + results.append( + [ + "error", + f"The config attribute_mapping['{key}'] should be a " + "dict with parameters for MISPObjectAttribute.", + ] + ) + else: + for parameter in value.keys(): + if parameter not in MISPObjectAttribute._fields_for_feed: + results.append( + [ + "warning", + not_feed_field_warning.format( + parameter=parameter, + context=f"attribute_mapping['{key}']", + object="MISPObjectAttribute", + ), + ] + ) + + tagging = parameters.get("tagging") + if tagging: + tagging_error = ( + "should be a list of dictionaries with parameters for the MISPTag object." + " Please consult PyMISP documentation at https://pymisp.readthedocs.io/" + " to find valid fields." + ) + if not isinstance(tagging, dict): + results.append( [ - "info", - "Directory %r of parameter 'output_dir' did not exist, but has now been created." - "" % parameters["output_dir"], + "error", + ( + "Parameter 'tagging' has to be a dictionary with keys as '__all__' " + "or possible 'event_separator' values. 
Each dictionary value " + + tagging_error, + ), ] - ] + ) + else: + if not event_separator and ( + "__all__" not in tagging or len(tagging.keys()) > 1 + ): + results.append( + [ + "error", + ( + "Tagging configuration expects custom values, but the 'event_separator'" + " parameter is not set. If you want to just tag all events, use only" + " the '__all__' key." + ), + ] + ) + for key, value in tagging.items(): + if not isinstance(value, list): + results.append( + [ + "error", + f"The config tagging['{key}'] {tagging_error}", + ] + ) + else: + for tag in value: + if not isinstance(tag, dict): + results.append( + [ + "error", + f"The config tagging['{key}'] {tagging_error}", + ] + ) + else: + if "name" not in tag: + results.append( + [ + "error", + f"The config tagging['{key}'] contains a tag without 'name'.", + ] + ) + for parameter in tag.keys(): + if parameter not in MISPTag._fields_for_feed: + results.append( + [ + "warning", + not_feed_field_warning.format( + parameter=parameter, + context=f"tagging['{key}']", + object="MISPTag", + ), + ] + ) + + return results or None BOT = MISPFeedOutputBot diff --git a/intelmq/tests/bots/outputs/misp/test_output_feed.py b/intelmq/tests/bots/outputs/misp/test_output_feed.py index d3b04442cf..f27b367164 100644 --- a/intelmq/tests/bots/outputs/misp/test_output_feed.py +++ b/intelmq/tests/bots/outputs/misp/test_output_feed.py @@ -4,6 +4,7 @@ # -*- coding: utf-8 -*- import json +import select import unittest from pathlib import Path from tempfile import TemporaryDirectory @@ -86,6 +87,9 @@ def test_additional_info_with_separator(self): assert info.startswith("Event related to salityp2p. IntelMQ event ") def test_accumulating_events(self): + """Ensures bot first collects events and then saves them in bulks to MISP feed, + and also respects the event interval to create a new event periodically. 
+ """ self.input_message = [EXAMPLE_EVENT, EXAMPLE_EVENT] self.run_bot(iterations=2, parameters={"bulk_save_count": 3}) @@ -126,6 +130,8 @@ def test_accumulating_events(self): assert len(objects) == 2 def test_attribute_mapping(self): + """Tests custom attribute mapping that selects just a subset of fields to export + and allows including custom parameters for MISPObjectAttribute, like comments.""" self.run_bot( parameters={ "attribute_mapping": { @@ -170,7 +176,7 @@ def test_attribute_mapping(self): assert malware_name["value"] == EXAMPLE_EVENT["malware.name"] assert malware_name["comment"] == EXAMPLE_EVENT["extra.non_ascii"] - def test_attribute_mapping_empty_field(self): + def test_attribute_mapping_omitted_when_field_is_empty(self): self.run_bot( parameters={ "attribute_mapping": { @@ -193,6 +199,8 @@ def test_attribute_mapping_empty_field(self): assert source_ip["value"] == "152.166.119.2" def test_event_separation(self): + """Tests that based on the value of the given field, incoming messages are put in separated + MISP events.""" self.input_message = [ EXAMPLE_EVENT, {**EXAMPLE_EVENT, "malware.name": "another_malware"}, @@ -258,17 +266,117 @@ def test_event_separation_with_extra_and_bulk_save(self): assert len(objects) == 2 def test_tagging(self): + """Ensures MISP events get correct MISP tags""" self.run_bot( parameters={ - "tagging": {"__all__": [{"name": "tlp:unclear", "colour": "#7e7eae"}]} + "tagging": { + "__all__": [ + {"name": "tlp:unclear", "colour": "#7e7eae"}, + {"name": "source:intelmq"}, + ] + } } ) current_event = open(f"{self.directory.name}/.current").read() with open(current_event) as f: - objects = json.load(f).get("Event", {}).get("Object", []) - assert len(objects) == 1 + tags = json.load(f).get("Event", {}).get("Tag", []) + assert len(tags) == 2 + + tlp = next(t for t in tags if t["name"] == "tlp:unclear") + assert tlp["colour"] == "#7e7eae" + + def test_tagging_and_event_separation(self): + """When separating events, it is possible to 
add different MISP tags to specific MISP + events.""" + self.input_message = [ + EXAMPLE_EVENT, + {**EXAMPLE_EVENT, "malware.name": "another_malware"}, + ] + self.run_bot( + iterations=2, + parameters={ + "event_separator": "malware.name", + "tagging": { + "__all__": [{"name": "source:intelmq"}], + "salityp2p": [{"name": "family:salityp2p"}], + "another_malware": [{"name": "family:malware_2"}], + }, + }, + ) + + current_events = json.loads(open(f"{self.directory.name}/.current").read()) + assert len(current_events) == 2 + + with open(current_events["salityp2p"]) as f: + tags = json.load(f).get("Event", {}).get("Tag", []) + assert len(tags) == 2 + assert next(t for t in tags if t["name"] == "source:intelmq") + assert next(t for t in tags if t["name"] == "family:salityp2p") + with open(current_events["another_malware"]) as f: + tags = json.load(f).get("Event", {}).get("Tag", []) + assert len(tags) == 2 + assert next(t for t in tags if t["name"] == "source:intelmq") + assert next(t for t in tags if t["name"] == "family:malware_2") + + def test_parameter_check_correct(self): + result = self.bot_reference.check( + { + **self.sysconfig, + "attribute_mapping": { + "source.ip": {}, + "feed.name": {"comment": "event_description.text"}, + "destination.ip": {"to_ids": False, "comment": "Possible FP"}, + "malware.name": {"comment": "extra.non_ascii"}, + }, + "event_separator": "extra.botnet", + "bulk_save_count": 10, + "tagging": { + "__all__": [{"name": "source:feed", "colour": "#000000"}], + "abotnet": [{"name": "type:botnet"}], + }, + } + ) + assert result is None + + def test_parameter_check_errors(self): + cases = [ + {"bulk_save_count": "not-a-number"}, + {"event_separator": "not-a-field"}, + {"attribute_mapping": "not-a-dict"}, + {"attribute_mapping": {"not-a-field": {}}}, + {"attribute_mapping": {"source.ip": "not-a-dict"}}, + {"tagging": {"not-all": []}}, # without event_separator, only __all__ is allowed + {"tagging": {"__all__": [], "other": []}}, + 
{"event_separator": "malware.name", "tagging": ["not", "a", "dict"]}, + { + "event_separator": "malware.name", + "tagging": {"case": "must-be-list-of-dicts"}, + }, + { + "event_separator": "malware.name", + "tagging": {"case": ["must-be-list-of-dicts"]}, + }, + { + "event_separator": "malware.name", + "tagging": {"case": [{"must": "have a name"}]}, + }, + ] + for case in cases: + with self.subTest(): + result = self.bot_reference.check({**self.sysconfig, **case}) + assert len(list(r for r in result if r[0] == "error")) == 1 + + def test_parameter_check_warnings(self): + cases = [ + {"attribute_mapping": {"source.ip": {"not-a-feed-arg": "any"}}}, + {"tagging": {"case": [{"name": "", "not-a-feed-arg": "any"}]}}, + ] + for case in cases: + with self.subTest(): + result = self.bot_reference.check({**self.sysconfig, **case}) + assert len(list(r for r in result if r[0] == "warning")) == 1 def tearDown(self): self.cache.delete(self.bot_id) From 26f161fb81032369718fad88f1ebfa04fa980f14 Mon Sep 17 00:00:00 2001 From: Kamil Mankowski Date: Tue, 16 Jul 2024 14:25:10 +0200 Subject: [PATCH 12/23] DOC: Update documentation about CacheMixin --- CHANGELOG.md | 2 ++ docs/dev/bot-development.md | 14 +++++++++++++- intelmq/lib/mixins/cache.py | 2 +- 3 files changed, 16 insertions(+), 2 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 633d75aea7..c355148af9 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -24,6 +24,8 @@ Please refer to the [NEWS](NEWS.md) for a list of changes which have an affect o - `intelmq.lib.datatypes`: Remove unneeded Dict39 alias (PR#2639 by Nakul Rajpal, fixes #2635) - `intelmq.lib.mixins.http`: Only set HTTP header 'Authorization' if username or password are set and are not both empty string as they are by default in the Manager (fixes #2590, PR#2634 by Sebastian Wagner). 
- `intelmq.lib.message.Message.from_dict`: Do not modify the dict parameter by adding the `__type` field and raise an error when type is not determinable (PR#2545 by Sebastian Wagner). +- `intelmq.lib.mixins.cache.CacheMixin` was extended to support temporary storing messages in a cache queue + (PR#2509 by Kamil Mankowski). ### Development diff --git a/docs/dev/bot-development.md b/docs/dev/bot-development.md index 2aba2adcab..f611daafbb 100644 --- a/docs/dev/bot-development.md +++ b/docs/dev/bot-development.md @@ -197,7 +197,7 @@ The `CacheMixin` provides methods to cache values for bots in a Redis database. - `redis_cache_ttl: int = 15` - `redis_cache_password: Optional[str] = None` -and provides the methods: +and provides the methods to cache key-value pairs: - `cache_exists` - `cache_get` @@ -205,6 +205,18 @@ and provides the methods: - `cache_flush` - `cache_get_redis_instance` +and following methods to cache objects in a queue: + +- `cache_put` +- `cache_pop` +- `cache_length`. + +Caching key-value pairs and queue caching are two separated mechanisms. The first is designed + for arbitrary values, the second one is focused on temporary storing messages (but can handle other + data). You won't see caches from one in the other. For example, if adding a key-value pair using + `cache_set`, it does not change the value from `cache_length`, and if adding an element using + `cache_put` you cannot use `cache_exists` to look for it.
+ ### Pipeline Interactions We can call three methods related to the pipeline: diff --git a/intelmq/lib/mixins/cache.py b/intelmq/lib/mixins/cache.py index 5919d67535..01465ae3df 100644 --- a/intelmq/lib/mixins/cache.py +++ b/intelmq/lib/mixins/cache.py @@ -13,7 +13,7 @@ class CacheMixin: - """Provides caching possibilities for bots + """Provides caching possibilities for bots, see also https://docs.intelmq.org/latest/dev/bot-development/#mixins For key-value cache, use methods: cache_exists From 93790dfae4be1945ab8462ba516c9b0fa72a36ba Mon Sep 17 00:00:00 2001 From: Kamil Mankowski Date: Tue, 16 Jul 2024 14:11:02 +0200 Subject: [PATCH 13/23] Adjust to pycodestyle --- intelmq/bots/outputs/misp/output_feed.py | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/intelmq/bots/outputs/misp/output_feed.py b/intelmq/bots/outputs/misp/output_feed.py index f8d4850aba..d45932a332 100644 --- a/intelmq/bots/outputs/misp/output_feed.py +++ b/intelmq/bots/outputs/misp/output_feed.py @@ -262,8 +262,8 @@ def _extract_misp_attribute_kwargs(self, message: dict, definition: dict) -> dic for parameter, value in definition.items(): # Check if the value is a harmonization key or a static value if isinstance(value, str) and ( - value in self.harmonization["event"] - or value.split(".", 1)[0] in self.harmonization["event"] + value in self.harmonization["event"] or + value.split(".", 1)[0] in self.harmonization["event"] ): result[parameter] = message.get(value) else: @@ -332,8 +332,8 @@ def check(parameters): sanity_event = Event({}) event_separator = parameters.get("event_separator") if ( - event_separator - and not sanity_event._Message__is_valid_key(event_separator)[0] + event_separator and not + sanity_event._Message__is_valid_key(event_separator)[0] ): results.append( [ @@ -398,8 +398,8 @@ def check(parameters): "error", ( "Parameter 'tagging' has to be a dictionary with keys as '__all__' " - "or possible 'event_separator' values. 
Each dictionary value " - + tagging_error, + "or possible 'event_separator' values. Each dictionary value " + + tagging_error, ), ] ) From 1a980e01efaa31a7753b15d8527e6ba34aa34854 Mon Sep 17 00:00:00 2001 From: Kamil Mankowski Date: Tue, 16 Jul 2024 14:26:16 +0200 Subject: [PATCH 14/23] Fix typo --- docs/user/bots.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/user/bots.md b/docs/user/bots.md index 7d4edd7278..295b7aedc8 100644 --- a/docs/user/bots.md +++ b/docs/user/bots.md @@ -4835,7 +4835,7 @@ attribute_mapping: The keys refers to which MISP events you want to tag. If you want to tag all of them, use `__all__`. If you use `event_separator` and want to add additional tags to some events, use the expected values of the separation field. The *list of dicts* defines MISP tags as parameters to create `MISPTag` -objects from. Each dictonary has to have at least `name`. For all available parameters refer to the +objects from. Each dictionary has to have at least `name`. For all available parameters refer to the [PyMISP documentation](https://pymisp.readthedocs.io/en/latest/_modules/pymisp/abstract.html#MISPTag) for `MISPTag`. 
From 6577a90885c031432fa7fab0a327c570464aa572 Mon Sep 17 00:00:00 2001 From: Kamil Mankowski Date: Tue, 16 Jul 2024 14:46:50 +0200 Subject: [PATCH 15/23] Clean up imports in tests --- intelmq/tests/bots/outputs/misp/test_output_feed.py | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/intelmq/tests/bots/outputs/misp/test_output_feed.py b/intelmq/tests/bots/outputs/misp/test_output_feed.py index f27b367164..5fedb657f7 100644 --- a/intelmq/tests/bots/outputs/misp/test_output_feed.py +++ b/intelmq/tests/bots/outputs/misp/test_output_feed.py @@ -4,14 +4,13 @@ # -*- coding: utf-8 -*- import json -import select import unittest from pathlib import Path from tempfile import TemporaryDirectory -from .....lib.message import Message, MessageFactory import intelmq.lib.test as test from intelmq.bots.outputs.misp.output_feed import MISPFeedOutputBot +from intelmq.lib.message import MessageFactory EXAMPLE_EVENT = { "classification.type": "infected-system", @@ -347,7 +346,9 @@ def test_parameter_check_errors(self): {"attribute_mapping": "not-a-dict"}, {"attribute_mapping": {"not-a-field": {}}}, {"attribute_mapping": {"source.ip": "not-a-dict"}}, - {"tagging": {"not-all": []}}, # without event_separator, only __all__ is allowed + { + "tagging": {"not-all": []} + }, # without event_separator, only __all__ is allowed {"tagging": {"__all__": [], "other": []}}, {"event_separator": "malware.name", "tagging": ["not", "a", "dict"]}, { From 0c1dcf15782e4df768ba44878f380b3546d54b98 Mon Sep 17 00:00:00 2001 From: Kamil Mankowski Date: Mon, 27 Oct 2025 15:33:14 +0100 Subject: [PATCH 16/23] Add option for flat structure --- intelmq/bots/outputs/misp/output_feed.py | 13 ++++++++++--- 1 file changed, 10 insertions(+), 3 deletions(-) diff --git a/intelmq/bots/outputs/misp/output_feed.py b/intelmq/bots/outputs/misp/output_feed.py index d45932a332..6a6ae3c979 100644 --- a/intelmq/bots/outputs/misp/output_feed.py +++ b/intelmq/bots/outputs/misp/output_feed.py @@ -7,6 +7,7 @@ 
import json import re from pathlib import Path +from typing import Union from uuid import uuid4 from intelmq import VAR_STATE_PATH @@ -70,6 +71,9 @@ class MISPFeedOutputBot(OutputBot, CacheMixin): # one tag to every other event tagging: dict = None + # do not create objects in the MISP events, add data directly as event attributes + flat_events: bool = False + # Delaying reloading would delay saving eventually long-awaiting messages _sighup_delay = False _is_multithreadable: bool = False @@ -235,7 +239,10 @@ def _add_message_to_misp_event(self, message: dict): else: event = self._generate_new_misp_event(key) - obj = event.add_object(name="intelmq_event") + if not self.flat_events: + obj = event.add_object(name="intelmq_event") + else: + obj = event # For caching and default mapping, the serialized version is the right format to work on. # However, for any custom mapping the Message object is more sufficient as it handles # subfields. @@ -244,7 +251,7 @@ def _add_message_to_misp_event(self, message: dict): else: self._custom_mapping(obj, message_obj) - def _default_mapping(self, obj: "MISPObject", message: dict): + def _default_mapping(self, obj: Union["MISPObject", "MISPEvent"], message: dict): for object_relation, value in message.items(): try: obj.add_attribute(object_relation, value=value) @@ -270,7 +277,7 @@ def _extract_misp_attribute_kwargs(self, message: dict, definition: dict) -> dic result[parameter] = value return result - def _custom_mapping(self, obj: "MISPObject", message: Message): + def _custom_mapping(self, obj: Union["MISPObject", "MISPEvent"], message: Message): """Map the IntelMQ event to the MISP Object using the custom mapping definition.""" for object_relation, definition in self.attribute_mapping.items(): if object_relation in message: From 352c4877b28223a6727be9a0737cc71f8952efcc Mon Sep 17 00:00:00 2001 From: Kamil Mankowski Date: Mon, 27 Oct 2025 15:36:55 +0100 Subject: [PATCH 17/23] Add changing attribute type --- 
intelmq/bots/outputs/misp/output_feed.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/intelmq/bots/outputs/misp/output_feed.py b/intelmq/bots/outputs/misp/output_feed.py index 6a6ae3c979..45b126a26b 100644 --- a/intelmq/bots/outputs/misp/output_feed.py +++ b/intelmq/bots/outputs/misp/output_feed.py @@ -282,7 +282,7 @@ def _custom_mapping(self, obj: Union["MISPObject", "MISPEvent"], message: Messag for object_relation, definition in self.attribute_mapping.items(): if object_relation in message: obj.add_attribute( - object_relation, + definition.get("type") or object_relation, value=message[object_relation], **self._extract_misp_attribute_kwargs(message, definition), ) From 6c5317e602482b49aa006aa46a4269cecaf97f7b Mon Sep 17 00:00:00 2001 From: Kamil Mankowski Date: Mon, 27 Oct 2025 15:49:55 +0100 Subject: [PATCH 18/23] Fix positional arguments --- intelmq/bots/outputs/misp/output_feed.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/intelmq/bots/outputs/misp/output_feed.py b/intelmq/bots/outputs/misp/output_feed.py index 45b126a26b..b25e92dab9 100644 --- a/intelmq/bots/outputs/misp/output_feed.py +++ b/intelmq/bots/outputs/misp/output_feed.py @@ -267,6 +267,9 @@ def _extract_misp_attribute_kwargs(self, message: dict, definition: dict) -> dic """Creates the a dict with arguments to create a MISPObjectAttribute.""" result = {} for parameter, value in definition.items(): + # This is extracted as the first positional argument + if parameter == "type": + continue # Check if the value is a harmonization key or a static value if isinstance(value, str) and ( value in self.harmonization["event"] or From fbfdecc59cac6d2e345a6bd2e24f6d3eaa2a258e Mon Sep 17 00:00:00 2001 From: Kamil Mankowski Date: Tue, 28 Oct 2025 11:26:09 +0100 Subject: [PATCH 19/23] Improved documentation, renamed parameters, lower-level cache API --- docs/dev/bot-development.md | 32 ++++++++++--- docs/user/bots.md | 57 +++++++++++++++++++++--- 
intelmq/bots/outputs/misp/output_feed.py | 42 ++++++++--------- intelmq/lib/mixins/cache.py | 22 ++++----- 4 files changed, 108 insertions(+), 45 deletions(-) diff --git a/docs/dev/bot-development.md b/docs/dev/bot-development.md index f611daafbb..85b18967af 100644 --- a/docs/dev/bot-development.md +++ b/docs/dev/bot-development.md @@ -207,15 +207,37 @@ and provides the methods to cache key-value pairs: and following methods to cache objects in a queue: -- `cache_put` -- `cache_pop` -- `cache_length`. +- `cache_lpush` +- `cache_rpop` +- `cache_llen`. Caching key-value pairs and queue caching are two separated mechanisms. The first is designed for arbitrary values, the second one is focused on temporary storing messages (but can handle other data). You won't see caches from one in the another. For example, if adding a key-value pair using - `cache_set`, it does not change the value from `cache_length`, and if adding an element using - `cache_put` you cannot use `check_exists` to look for it. + `cache_set`, it does not change the value from `cache_llen`, and if adding an element using + `cache_lpush` you cannot use `check_exists` to look for it. + +When using queue-based caching, you have to serialize object to a format accepted by Redis/Valkey +as the underlying storage. 
For example, to store a message in a queue using bot ID as key, you can +use code like: + +```python +msmessage = self.receive_message().to_dict(jsondict_as_string=True) +self.cache_lpush(self.bot_id, json.dumps(message)) +``` + +and to retrieve a message from the cache: + +```python +data = self.cache_rpop(self.bot_id) +if data is None: + return # handle empty cache +message = json.loads(data) +# to use it as Message object +message_obj = MessageFactory.from_dict( + message, harmonization=self.harmonization, default_type="Event" +) +``` ### Pipeline Interactions diff --git a/docs/user/bots.md b/docs/user/bots.md index 295b7aedc8..43557997ef 100644 --- a/docs/user/bots.md +++ b/docs/user/bots.md @@ -4772,6 +4772,7 @@ hour", string. incoming messages until the given number of them. Use it if your bot proceeds a high number of messages and constant saving to the disk is a problem. Reloading or restarting bot as well as generating a new MISP event based on `interval_event` triggers regenerating MISP feed regardless of the cache size. +To ensure saving on reload without any delay, you should also set `_sighup_delay` parameter. **`attribute_mapping`** @@ -4790,7 +4791,7 @@ For example: ```yaml attribute_mapping: - source.ip: + source.ip: {} feed.name: comment: event_description.text destination.ip: @@ -4800,9 +4801,9 @@ attribute_mapping: would create a MISP object with three attributes `source.ip`, `feed.name` and `destination.ip` and set their values as in the IntelMQ event. In addition, the `feed.name` would have a comment as given in the `event_description.text` from IntelMQ event, and `destination.ip` would be set -as not usable for IDS. +as not usable for IDS. You can use `type` key to overwrite the attribute type. -**`event_separator` +**`grouping_key`** (optional, string): If set to a field name from IntelMQ event, the bot will work in parallel on a few events instead of saving all incoming messages to a one.
Each unique value from the field will @@ -4814,8 +4815,8 @@ use the same MISP Event as long as it's allowed by the `interval_event`. (optional, string): If set, the generated MISP Event will use it in the `info` field of the event, in addition to the standard IntelMQ description with the time frame (you cannot remove it as the bot -depends of datetimes saved there). If you use `event_separator`, you may want to use `{separator}` -placeholder which will be then replaced with the value of the separator. +depends on datetimes saved there). If you use `grouping_key`, you may want to use `{key}` +placeholder which will then be replaced with the value of the grouping key. For example, the following configuration can be used to create MISP Feed with IPs of C2 servers of different botnets, having each botnet in a separated MISP Events with an appropriate description. @@ -4823,8 +4824,8 @@ Each MISP Event will contain objects with the `source.ip` field only, and the ev like *C2 Servers for botnet-1. IntelMQ event 2024-07-09T14:51:10.825123 - 2024-07-10T14:51:10.825123* ```yaml -event_separator: malware.name -additional_info: C2 Servers for {separator}. +grouping_key: malware.name +additional_info: C2 Servers for {key}. attribute_mapping: source.ip: ``` @@ -4863,6 +4864,48 @@ tagging: - name: njrat ``` +** `flat_events` + +(optional, bool): instead of creating an object for every incomming IntelMQ message, it will add +attributes directly to the MISP event. Useful if your want to export just a list of data, e.g. +C2 domains, without having to group some attributes together. By default set to `False`.
+ +**Example** + +For example, if you have a source that sends C2 domains for multiple malware families, +you can use the following bot's configuration: + +```yaml +parameters: + destination_queues: {} + # you have to configure your webserver to expose this path for MISP + output_dir: "/var/lib/intelmq/bots/your_feed/" + misp_org_name: My Organisation + misp_org_uuid: Your-Org-UUID + interval_event: 1 day + grouping_key: "malware.name" + bulk_save_count: 100 + additional_info: "{key} - " + flat_events: true + attribute_mapping: + source.fqdn: + comment: malware.name + type: domain + category: "Network activity" + to_ids: true + tagging: + __all__: + - name: tlp:amber + # ensure saving on reload + _sighup_delay: false +``` + +As a result, you will get MISP feed that creates one event per malware family every day. In the event, +there will be just C2 domains with the IDS flag set and the malware name as comment. In addition, all +events will be tagged with `tlp:amber` and also have the malware name in the comment, together with +the information about the time period. The MISP Feed will be saved to disk after accumulating 100 C2 +domains or on reload/restart. + **Usage in MISP** Configure the destination directory of this feed as feed in MISP, either as local location, or served via a web server. 
diff --git a/intelmq/bots/outputs/misp/output_feed.py b/intelmq/bots/outputs/misp/output_feed.py index b25e92dab9..3b90643df4 100644 --- a/intelmq/bots/outputs/misp/output_feed.py +++ b/intelmq/bots/outputs/misp/output_feed.py @@ -48,9 +48,10 @@ class MISPFeedOutputBot(OutputBot, CacheMixin): additional_info: str = None # An optional field used to create multiple MISP events from incoming messages - event_separator: str = None + grouping_key: str = None # Optional non-standard mapping of message fields to MISP object attributes + # You can overwrite the attribute type by providing the 'type' argument # The structure is like: # {: {}} # For example: @@ -60,7 +61,7 @@ class MISPFeedOutputBot(OutputBot, CacheMixin): attribute_mapping: dict = None # Optional definition to add tags to the MISP event. It should be a dict where keys are - # '__all__' (to add tags for every event) or, if the event_separator is used, the separator + # '__all__' (to add tags for every event) or, if the grouping_key is used, the key # values. For each key, there should be a list of dicts defining parameters for the MISPTag # object, but only the "name" is required to set. # For example: @@ -114,7 +115,7 @@ def init(self): with (self.output_dir / ".current").open() as f: current = f.read() - if not self.event_separator: + if not self.grouping_key: self.current_files[DEFAULT_KEY] = Path(current) else: self.current_files = { @@ -146,9 +147,9 @@ def init(self): self._tagging_objects[key].append(tag) # Ensure we do generate feed on reload / restart, so awaiting messages won't wait forever - if self.cache_length() and not getattr(self, "testing", False): + if length := self.cache_llen(self.bot_id) and not getattr(self, "testing", False): self.logger.debug( - "Found %s awaiting messages. Generating feed.", self.cache_length() + "Found %s awaiting messages. 
Generating feed.", length ) self._generate_misp_feed() @@ -181,7 +182,7 @@ def process(self): cache_size = None if self.bulk_save_count: - cache_size = self.cache_put(event) + cache_size = self.cache_lpush(self.bot_id, json.dumps(event)) if cache_size is None: self._generate_misp_feed(event) @@ -208,7 +209,7 @@ def _generate_new_misp_event(self, key): end=self.max_time_current.isoformat(), ) if self.additional_info: - info = f"{self.additional_info.format(separator=key)} {info}" + info = f"{self.additional_info.format(key=key)} {info}" self.current_events[key].info = info self.current_events[key].set_date(datetime.date.today()) @@ -218,7 +219,7 @@ def _generate_new_misp_event(self, key): self.output_dir / f"{self.current_events[key].uuid}.json" ) with (self.output_dir / ".current").open("w") as f: - if not self.event_separator: + if not self.grouping_key: f.write(str(self.current_files[key])) else: json.dump({k: str(v) for k, v in self.current_files.items()}, f) @@ -229,10 +230,10 @@ def _add_message_to_misp_event(self, message: dict): message_obj = MessageFactory.from_dict( message, harmonization=self.harmonization, default_type="Event" ) - if not self.event_separator: + if not self.grouping_key: key = DEFAULT_KEY else: - key = message_obj.get(self.event_separator) or DEFAULT_KEY + key = message_obj.get(self.grouping_key) or DEFAULT_KEY if key in self.current_events: event = self.current_events[key] @@ -295,10 +296,11 @@ def _generate_misp_feed(self, message: dict = None): if message: self._add_message_to_misp_event(message) - message = self.cache_pop() - while message: + cached_msg = self.cache_rpop(self.bot_id) + while cached_msg: + message = json.loads(cached_msg) self._add_message_to_misp_event(message) - message = self.cache_pop() + cached_msg = self.cache_rpop(self.bot_id) for key, event in self.current_events.items(): feed_output = event.to_feed(with_meta=False) @@ -340,15 +342,15 @@ def check(parameters): ) sanity_event = Event({}) - event_separator = 
parameters.get("event_separator") + grouping_key = parameters.get("grouping_key") if ( - event_separator and not - sanity_event._Message__is_valid_key(event_separator)[0] + grouping_key and not + sanity_event._Message__is_valid_key(grouping_key)[0] ): results.append( [ "error", - f"Value {event_separator} in 'event_separator' is not a valid event key.", + f"Value {grouping_key} in 'grouping_key' is not a valid event key.", ] ) @@ -408,20 +410,20 @@ def check(parameters): "error", ( "Parameter 'tagging' has to be a dictionary with keys as '__all__' " - "or possible 'event_separator' values. Each dictionary value " + + "or possible 'grouping_key' values. Each dictionary value " + tagging_error, ), ] ) else: - if not event_separator and ( + if not grouping_key and ( "__all__" not in tagging or len(tagging.keys()) > 1 ): results.append( [ "error", ( - "Tagging configuration expects custom values, but the 'event_separator'" + "Tagging configuration expects custom values, but the 'grouping_key'" " parameter is not set. If you want to just tag all events, use only" " the '__all__' key." 
), diff --git a/intelmq/lib/mixins/cache.py b/intelmq/lib/mixins/cache.py index 01465ae3df..ed8c2b025c 100644 --- a/intelmq/lib/mixins/cache.py +++ b/intelmq/lib/mixins/cache.py @@ -21,9 +21,9 @@ class CacheMixin: cache_set To store dict elements in a cache queue named after bot id, use methods: - cache_put - cache_pop - cache_length + cache_lpush + cache_rpop + cache_llen """ __redis: redis.Redis = None @@ -67,19 +67,15 @@ def cache_set(self, key: str, value: Any, ttl: Optional[int] = None): if self.redis_cache_ttl: self.__redis.expire(key, self.redis_cache_ttl) - def cache_put(self, value: dict) -> int: + def cache_lpush(self, key: str, value: Any) -> int: # Returns the length of the list after pushing - size = self.__redis.lpush(self.bot_id, json.dumps(value)) - return size + return self.__redis.lpush(key, value) - def cache_length(self) -> int: - return self.__redis.llen(self.bot_id) + def cache_llen(self, key: str) -> int: + return self.__redis.llen(key) - def cache_pop(self) -> dict: - data = self.__redis.rpop(self.bot_id) - if data is None: - return None - return json.loads(data) + def cache_rpop(self) -> Any: + return self.__redis.rpop(self.bot_id) def cache_flush(self): """ From 370f3cc77f351ece882c7739fbf00ea6094b4ebc Mon Sep 17 00:00:00 2001 From: Kamil Mankowski Date: Thu, 23 Oct 2025 16:25:13 +0200 Subject: [PATCH 20/23] Regenerate only modified events --- intelmq/bots/outputs/misp/output_feed.py | 33 ++++++++++++++---------- 1 file changed, 20 insertions(+), 13 deletions(-) diff --git a/intelmq/bots/outputs/misp/output_feed.py b/intelmq/bots/outputs/misp/output_feed.py index 3b90643df4..ffdb9fdd20 100644 --- a/intelmq/bots/outputs/misp/output_feed.py +++ b/intelmq/bots/outputs/misp/output_feed.py @@ -92,6 +92,7 @@ def init(self): self.current_events = {} self.current_files = {} + self._dirty_events = set() self.misp_org = MISPOrganisation() self.misp_org.name = self.misp_org_name @@ -147,10 +148,10 @@ def init(self): 
self._tagging_objects[key].append(tag) # Ensure we do generate feed on reload / restart, so awaiting messages won't wait forever - if length := self.cache_llen(self.bot_id) and not getattr(self, "testing", False): - self.logger.debug( - "Found %s awaiting messages. Generating feed.", length - ) + if length := self.cache_llen(self.bot_id) and not getattr( + self, "testing", False + ): + self.logger.debug("Found %s awaiting messages. Generating feed.", length) self._generate_misp_feed() def _load_event(self, file_path: Path, key: str): @@ -204,7 +205,7 @@ def _generate_new_misp_event(self, key): tags.extend(self._tagging_objects[key]) self.current_events[key].tags = tags - info = "IntelMQ event {begin} - {end}" "".format( + info = "IntelMQ event {begin} - {end}".format( begin=self.min_time_current.isoformat(), end=self.max_time_current.isoformat(), ) @@ -223,6 +224,8 @@ def _generate_new_misp_event(self, key): f.write(str(self.current_files[key])) else: json.dump({k: str(v) for k, v in self.current_files.items()}, f) + + self._dirty_events.add(key) return self.current_events[key] def _add_message_to_misp_event(self, message: dict): @@ -240,10 +243,13 @@ def _add_message_to_misp_event(self, message: dict): else: event = self._generate_new_misp_event(key) + self._dirty_events.add(key) + if not self.flat_events: obj = event.add_object(name="intelmq_event") else: obj = event + # For caching and default mapping, the serialized version is the right format to work on. # However, for any custom mapping the Message object is more sufficient as it handles # subfields. 
@@ -273,8 +279,8 @@ def _extract_misp_attribute_kwargs(self, message: dict, definition: dict) -> dic continue # Check if the value is a harmonization key or a static value if isinstance(value, str) and ( - value in self.harmonization["event"] or - value.split(".", 1)[0] in self.harmonization["event"] + value in self.harmonization["event"] + or value.split(".", 1)[0] in self.harmonization["event"] ): result[parameter] = message.get(value) else: @@ -303,11 +309,15 @@ def _generate_misp_feed(self, message: dict = None): cached_msg = self.cache_rpop(self.bot_id) for key, event in self.current_events.items(): + # Feed generation can be very resource-consuming process + if key not in self._dirty_events: + continue feed_output = event.to_feed(with_meta=False) with self.current_files[key].open("w") as f: json.dump(feed_output, f) feed_meta_generator(self.output_dir) + self._dirty_events.clear() @staticmethod def check(parameters): @@ -343,10 +353,7 @@ def check(parameters): sanity_event = Event({}) grouping_key = parameters.get("grouping_key") - if ( - grouping_key and not - sanity_event._Message__is_valid_key(grouping_key)[0] - ): + if grouping_key and not sanity_event._Message__is_valid_key(grouping_key)[0]: results.append( [ "error", @@ -410,8 +417,8 @@ def check(parameters): "error", ( "Parameter 'tagging' has to be a dictionary with keys as '__all__' " - "or possible 'grouping_key' values. Each dictionary value " + - tagging_error, + "or possible 'grouping_key' values. 
Each dictionary value " + + tagging_error, ), ] ) From d86b1e36c1116eba4e8a8ced9ed95acd7fbb1c82 Mon Sep 17 00:00:00 2001 From: Kamil Mankowski Date: Tue, 28 Oct 2025 12:02:17 +0100 Subject: [PATCH 21/23] Update Changelog and fixes in docs --- CHANGELOG.md | 4 +++- docs/user/bots.md | 2 +- intelmq/bots/outputs/misp/output_feed.py | 2 +- 3 files changed, 5 insertions(+), 3 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index c355148af9..bad1927380 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -224,10 +224,12 @@ This is short list of the most important known issues. The full list can be retr - `intelmq.bots.outputs.misp.output_feed`: - Handle failures if saved current event wasn't saved or is incorrect (PR by Kamil Mankowski). - Allow saving messages in bulks instead of refreshing the feed immediately (PR#2509 by Kamil Mankowski). + - Regenerate only modified events (PR#2509 by Kamil Mankowski). - Add `attribute_mapping` parameter to allow selecting a subset of event attributes as well as additional attribute parameters (PR#2509 by Kamil Mankowski). - - Add `event_separator` parameter to allow keeping IntelMQ events in separated MISP Events based on a given field (PR#2509 by Kamil Mankowski). + - Add `grouping_key` parameter to allow keeping IntelMQ events in separated MISP Events based on a given field (PR#2509 by Kamil Mankowski). - Add `tagging` parameter to allow adding tags to MISP events (PR#2509 by Kamil Mankowski). - Add `additional_info` parameter to extend the default description of MISP Events (PR#2509 by Kamil Mankowski). + - Add `flat_events` parameter to allow skipping creating objects in MISP Events (PR#2509 by Kamil Mankowski). - `intelmq.bots.outputs.smtp_batch.output`: Documentation on multiple recipients added (PR#2501 by Edvard Rejthar). ### Documentation diff --git a/docs/user/bots.md b/docs/user/bots.md index 43557997ef..80548d3577 100644 --- a/docs/user/bots.md +++ b/docs/user/bots.md @@ -4772,7 +4772,7 @@ hour", string. 
incoming messages until the given number of them. Use it if your bot proceeds a high number of messages and constant saving to the disk is a problem. Reloading or restarting bot as well as generating a new MISP event based on `interval_event` triggers regenerating MISP feed regardless of the cache size. -To ensure saving on reload without any delay, you should also set `_sighup_delay` parameter. +To ensure saving on reload without any delay, you should also set `_sighup_delay` internal variable. **`attribute_mapping`** diff --git a/intelmq/bots/outputs/misp/output_feed.py b/intelmq/bots/outputs/misp/output_feed.py index ffdb9fdd20..d65b2270ce 100644 --- a/intelmq/bots/outputs/misp/output_feed.py +++ b/intelmq/bots/outputs/misp/output_feed.py @@ -1,4 +1,4 @@ -# SPDX-FileCopyrightText: 2019 Sebastian Wagner, 2024 CERT.at GmbH +# SPDX-FileCopyrightText: 2019 Sebastian Wagner, 2025 CERT.at GmbH # # SPDX-License-Identifier: AGPL-3.0-or-later From 2ab3da3c87772f07a3f759c59ba020333a930775 Mon Sep 17 00:00:00 2001 From: Kamil Mankowski Date: Tue, 28 Oct 2025 12:05:47 +0100 Subject: [PATCH 22/23] Spelling & pycodestyle --- docs/user/bots.md | 2 +- intelmq/bots/outputs/misp/output_feed.py | 8 ++++---- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/docs/user/bots.md b/docs/user/bots.md index 80548d3577..572f7aefaa 100644 --- a/docs/user/bots.md +++ b/docs/user/bots.md @@ -4866,7 +4866,7 @@ tagging: ** `flat_events` -(optional, bool): instead of creating an object for every incomming IntelMQ message, it will add +(optional, bool): instead of creating an object for every incoming IntelMQ message, it will add attributes directly to the MISP event. Useful if your want to export just a list of data, e.g. C2 domains, without having to group some attributes together. By default set to `False`. 
diff --git a/intelmq/bots/outputs/misp/output_feed.py b/intelmq/bots/outputs/misp/output_feed.py index d65b2270ce..f93e534925 100644 --- a/intelmq/bots/outputs/misp/output_feed.py +++ b/intelmq/bots/outputs/misp/output_feed.py @@ -279,8 +279,8 @@ def _extract_misp_attribute_kwargs(self, message: dict, definition: dict) -> dic continue # Check if the value is a harmonization key or a static value if isinstance(value, str) and ( - value in self.harmonization["event"] - or value.split(".", 1)[0] in self.harmonization["event"] + value in self.harmonization["event"] or + value.split(".", 1)[0] in self.harmonization["event"] ): result[parameter] = message.get(value) else: @@ -417,8 +417,8 @@ def check(parameters): "error", ( "Parameter 'tagging' has to be a dictionary with keys as '__all__' " - "or possible 'grouping_key' values. Each dictionary value " - + tagging_error, + "or possible 'grouping_key' values. Each dictionary value " + + tagging_error, ), ] ) From 388609ae473741a62b956a91e1c3bd1f22f53907 Mon Sep 17 00:00:00 2001 From: Kamil Mankowski Date: Tue, 28 Oct 2025 14:01:52 +0100 Subject: [PATCH 23/23] Correct and extend test, automatically adjust reload delay, docs fixes --- docs/dev/bot-development.md | 13 ++-- docs/user/bots.md | 6 +- intelmq/bots/outputs/misp/output_feed.py | 15 +++- intelmq/lib/mixins/cache.py | 4 +- .../bots/outputs/misp/test_output_feed.py | 74 ++++++++++++++----- 5 files changed, 78 insertions(+), 34 deletions(-) diff --git a/docs/dev/bot-development.md b/docs/dev/bot-development.md index 85b18967af..9531ab96c1 100644 --- a/docs/dev/bot-development.md +++ b/docs/dev/bot-development.md @@ -211,19 +211,18 @@ and following methods to cache objects in a queue: - `cache_rpop` - `cache_llen`. -Caching key-value pairs and queue caching are two separated mechanisms. The first is designed - for arbitrary values, the second one is focused on temporary storing messages (but can handle other - data). You won't see caches from one in the another. 
For example, if adding a key-value pair using - `cache_set`, it does not change the value from `cache_llen`, and if adding an element using - `cache_lpush` you cannot use `check_exists` to look for it. +Caching key-value pairs and queue caching are two different mechanisms. The functions in the + first list are designed for arbitrary values, while the latter ones are primarily for temporarily + storing messages, but can also handle other data types. You won't see caches from one in the other. + For example, if adding a key-value pair using `cache_set`, it does not change the value from + `cache_llen`, and if adding an element using `cache_lpush` you cannot use `check_exists` to look for it. When using queue-based caching, you have to serialize object to a format accepted by Redis/Valkey as the underlying storage. For example, to store a message in a queue using bot ID as key, you can use code like: ```python -msmessage = self.receive_message().to_dict(jsondict_as_string=True) -self.cache_lpush(self.bot_id, json.dumps(message)) +self.cache_lpush(self.bot_id, self.receive_message().to_json(jsondict_as_string=True)) ``` and to retrieve a message from the cache: diff --git a/docs/user/bots.md b/docs/user/bots.md index 572f7aefaa..f2b4176216 100644 --- a/docs/user/bots.md +++ b/docs/user/bots.md @@ -4772,7 +4772,6 @@ hour", string. incoming messages until the given number of them. Use it if your bot proceeds a high number of messages and constant saving to the disk is a problem. Reloading or restarting bot as well as generating a new MISP event based on `interval_event` triggers regenerating MISP feed regardless of the cache size. -To ensure saving on reload without any delay, you should also set `_sighup_delay` internal variable. **`attribute_mapping`** @@ -4868,7 +4867,8 @@ tagging: (optional, bool): instead of creating an object for every incoming IntelMQ message, it will add attributes directly to the MISP event. 
Useful if your want to export just a list of data, e.g. -C2 domains, without having to group some attributes together. By default set to `False`. +C2 domains, without having to group some attributes together. When using flat events, you +have to define custom mapping to ensure the correct attribute types. By default set to `False`. **Example** @@ -4896,8 +4896,6 @@ parameters: tagging: __all__: - name: tlp:amber - # ensure saving on reload - _sighup_delay: false ``` As a result, you will get MISP feed that creates one event per malware family every day. In the event, diff --git a/intelmq/bots/outputs/misp/output_feed.py b/intelmq/bots/outputs/misp/output_feed.py index f93e534925..7d76d566f8 100644 --- a/intelmq/bots/outputs/misp/output_feed.py +++ b/intelmq/bots/outputs/misp/output_feed.py @@ -75,8 +75,6 @@ class MISPFeedOutputBot(OutputBot, CacheMixin): # do not create objects in the MISP events, add data directly as event attributes flat_events: bool = False - # Delaying reloading would delay saving eventually long-awaiting messages - _sighup_delay = False _is_multithreadable: bool = False @staticmethod @@ -108,6 +106,10 @@ def init(self): minutes=parse_relative(self.interval_event) ) + if self.bulk_save_count: + # Delaying reloading would delay saving eventually long-awaiting messages + self._sighup_delay = False + self.min_time_current = datetime.datetime.max self.max_time_current = datetime.datetime.min @@ -404,6 +406,15 @@ def check(parameters): ] ) + flat_events = parameters.get("flat_events") + if flat_events and not attribute_mapping: + results.append( + [ + "error", + "When using flat_events, you have to provide attribute_mapping", + ] + ) + tagging = parameters.get("tagging") if tagging: tagging_error = ( diff --git a/intelmq/lib/mixins/cache.py b/intelmq/lib/mixins/cache.py index ed8c2b025c..aad6e165cf 100644 --- a/intelmq/lib/mixins/cache.py +++ b/intelmq/lib/mixins/cache.py @@ -74,8 +74,8 @@ def cache_lpush(self, key: str, value: Any) -> int: def 
cache_llen(self, key: str) -> int: return self.__redis.llen(key) - def cache_rpop(self) -> Any: - return self.__redis.rpop(self.bot_id) + def cache_rpop(self, key: str) -> Any: + return self.__redis.rpop(key) def cache_flush(self): """ diff --git a/intelmq/tests/bots/outputs/misp/test_output_feed.py b/intelmq/tests/bots/outputs/misp/test_output_feed.py index 5fedb657f7..3770a73cbe 100644 --- a/intelmq/tests/bots/outputs/misp/test_output_feed.py +++ b/intelmq/tests/bots/outputs/misp/test_output_feed.py @@ -72,11 +72,11 @@ def test_additional_info(self): info: str = json.load(f).get("Event", {}).get("info", "") assert info.startswith("This is my custom info. IntelMQ event ") - def test_additional_info_with_separator(self): + def test_additional_info_with_grouping_key(self): self.run_bot( parameters={ - "additional_info": "Event related to {separator}.", - "event_separator": "malware.name", + "additional_info": "Event related to {key}.", + "grouping_key": "malware.name", } ) @@ -94,6 +94,9 @@ def test_accumulating_events(self): current_event = open(f"{self.directory.name}/.current").read() + # bot has to disable delayed reload when accumulating events + assert self.bot._sighup_delay is False + # The first event is always immediately dumped to the MISP feed # But the second wait until bulk saving size is achieved with open(current_event) as f: @@ -118,8 +121,9 @@ def test_accumulating_events(self): # Simulating leftovers in the queue when it's time to generate new event Path(f"{self.directory.name}/.current").unlink() - self.bot.cache_put( - MessageFactory.from_dict(EXAMPLE_EVENT).to_dict(jsondict_as_string=True) + self.bot.cache_lpush( + self.bot_id, + json.dumps(MessageFactory.from_dict(EXAMPLE_EVENT).to_dict(jsondict_as_string=True)) ) self.run_bot(parameters={"bulk_save_count": 3}) @@ -197,7 +201,7 @@ def test_attribute_mapping_omitted_when_field_is_empty(self): ) assert source_ip["value"] == "152.166.119.2" - def test_event_separation(self): + def 
test_events_grouping(self): """Tests that based on the value of the given field, incoming messages are put in separated MISP events.""" self.input_message = [ @@ -205,7 +209,7 @@ def test_event_separation(self): {**EXAMPLE_EVENT, "malware.name": "another_malware"}, EXAMPLE_EVENT, ] - self.run_bot(iterations=3, parameters={"event_separator": "malware.name"}) + self.run_bot(iterations=3, parameters={"grouping_key": "malware.name"}) current_events = json.loads(open(f"{self.directory.name}/.current").read()) assert len(current_events) == 2 @@ -230,7 +234,7 @@ def test_event_separation(self): ) assert malware_name == "another_malware" - def test_event_separation_with_extra_and_bulk_save(self): + def test_events_grouping_with_extra_and_bulk_save(self): self.input_message = [ {**EXAMPLE_EVENT, "extra.some_key": "another_malware"}, {**EXAMPLE_EVENT, "extra.some_key": "first_malware"}, @@ -238,7 +242,7 @@ def test_event_separation_with_extra_and_bulk_save(self): ] self.run_bot( iterations=3, - parameters={"event_separator": "extra.some_key", "bulk_save_count": 3}, + parameters={"grouping_key": "extra.some_key", "bulk_save_count": 3}, ) # Only the initial event is saved, the rest is cached @@ -250,7 +254,7 @@ def test_event_separation_with_extra_and_bulk_save(self): self.input_message = {**EXAMPLE_EVENT, "extra.some_key": "first_malware"} self.run_bot( - parameters={"event_separator": "extra.some_key", "bulk_save_count": 3}, + parameters={"grouping_key": "extra.some_key", "bulk_save_count": 3}, ) # Now everything is saved @@ -285,8 +289,8 @@ def test_tagging(self): tlp = next(t for t in tags if t["name"] == "tlp:unclear") assert tlp["colour"] == "#7e7eae" - def test_tagging_and_event_separation(self): - """When separating events, it is possible to add different MISP tags to specific MISP + def test_tagging_and_events_grouping(self): + """When grouping events, it is possible to add different MISP tags to specific MISP events.""" self.input_message = [ EXAMPLE_EVENT, @@ -295,7 
+299,7 @@ def test_tagging_and_event_separation(self): self.run_bot( iterations=2, parameters={ - "event_separator": "malware.name", + "grouping_key": "malware.name", "tagging": { "__all__": [{"name": "source:intelmq"}], "salityp2p": [{"name": "family:salityp2p"}], @@ -319,6 +323,36 @@ def test_tagging_and_event_separation(self): assert next(t for t in tags if t["name"] == "source:intelmq") assert next(t for t in tags if t["name"] == "family:malware_2") + def test_flat_misp_events_structure(self): + """In flat mode attributes are set directly on the event""" + self.input_message = [ + EXAMPLE_EVENT, + {**EXAMPLE_EVENT, "source.ip": "1.1.1.1"}, + ] + + self.run_bot( + iterations=2, + parameters={ + "flat_events": True, + "tagging": { + "__all__": [{"name": "source:intelmq"}], + }, + "attribute_mapping": { + "source.ip": {"type": "ip-dst", "category": "Network activity"} + } + }, + ) + + current_event = open(f"{self.directory.name}/.current").read() + + with open(current_event) as f: + attributes = json.load(f).get("Event", {}).get("Attribute", []) + + assert len(attributes) == 2 + assert next(a for a in attributes if a["value"] == "1.1.1.1") + assert next(a for a in attributes if a["value"] == "152.166.119.2") + + def test_parameter_check_correct(self): result = self.bot_reference.check( { @@ -329,7 +363,7 @@ def test_parameter_check_correct(self): "destination.ip": {"to_ids": False, "comment": "Possible FP"}, "malware.name": {"comment": "extra.non_ascii"}, }, - "event_separator": "extra.botnet", + "grouping_key": "extra.botnet", "bulk_save_count": 10, "tagging": { "__all__": [{"name": "source:feed", "colour": "#000000"}], @@ -342,7 +376,9 @@ def test_parameter_check_correct(self): def test_parameter_check_errors(self): cases = [ {"bulk_save_count": "not-a-number"}, - {"event_separator": "not-a-field"}, + # attribute_mapping is required for flat_events + {"flat_events": True}, + {"grouping_key": "not-a-field"}, {"attribute_mapping": "not-a-dict"}, 
{"attribute_mapping": {"not-a-field": {}}}, {"attribute_mapping": {"source.ip": "not-a-dict"}}, @@ -350,17 +386,17 @@ def test_parameter_check_errors(self): "tagging": {"not-all": []} }, # without event_separator, only __all__ is allowed {"tagging": {"__all__": [], "other": []}}, - {"event_separator": "malware.name", "tagging": ["not", "a", "dict"]}, + {"grouping_key": "malware.name", "tagging": ["not", "a", "dict"]}, { - "event_separator": "malware.name", + "grouping_key": "malware.name", "tagging": {"case": "must-be-list-of-dicts"}, }, { - "event_separator": "malware.name", + "grouping_key": "malware.name", "tagging": {"case": ["must-be-list-of-dicts"]}, }, { - "event_separator": "malware.name", + "grouping_key": "malware.name", "tagging": {"case": [{"must": "have a name"}]}, }, ]