Skip to content

Commit 62fa85d

Browse files
authored
Merge pull request #28 from DynamiteAI/feature/update-logstash-yaml-config-parser
Feature/update logstash yaml config parser
2 parents d271355 + 3816fff commit 62fa85d

File tree

2 files changed

+107
-136
lines changed

2 files changed

+107
-136
lines changed

dynamite_nsm/services/logstash.py

Lines changed: 106 additions & 135 deletions
Original file line numberDiff line numberDiff line change
@@ -7,8 +7,14 @@
77
import tarfile
88
import traceback
99
import subprocess
10+
from yaml import load, dump
1011
from multiprocessing import Process
1112

13+
try:
14+
from yaml import CLoader as Loader, CDumper as Dumper
15+
except ImportError:
16+
from yaml import Loader, Dumper
17+
1218
from dynamite_nsm import const
1319
from dynamite_nsm import utilities
1420
from dynamite_nsm.services.helpers import synesis
@@ -24,48 +30,73 @@ class LogstashConfigurator:
2430
"""
2531
Wrapper for configuring logstash.yml and jvm.options
2632
"""
33+
34+
tokens = {
35+
'node_name': ('node.name',),
36+
'path_data': ('path.data',),
37+
'path_logs': ('path.logs',),
38+
'pipeline_batch_size': ('pipeline.batch.size',),
39+
'pipeline_batch_delay': ('pipeline.batch.delay',)
40+
}
41+
2742
def __init__(self, configuration_directory):
2843
"""
2944
:param configuration_directory: Path to the configuration directory (E.G /etc/dynamite/logstash/)
3045
"""
3146
self.configuration_directory = configuration_directory
32-
self.ls_config_options = self._parse_logstashyaml()
33-
self.jvm_config_options = self._parse_jvm_options()
3447
self.java_home = None
3548
self.ls_home = None
3649
self.ls_path_conf = None
50+
self.java_initial_memory = None
51+
self.java_maximum_memory = None
52+
53+
self.node_name = None
54+
self.path_data = None
55+
self.path_logs = None
56+
self.pipeline_batch_size = None
57+
self.pipeline_batch_delay = None
58+
3759
self._parse_environment_file()
60+
self._parse_jvm_options()
61+
self._parse_logstashyaml()
3862

3963
def _parse_logstashyaml(self):
40-
"""
41-
Parse logstash.yaml, return a object representing the config
42-
:return: A dictionary of config options and their values
43-
"""
44-
ls_config_options = {}
45-
config_path = os.path.join(self.configuration_directory, 'logstash.yml')
46-
if not os.path.exists(config_path):
47-
return ls_config_options
48-
for line in open(config_path).readlines():
49-
if not line.startswith('#') and ':' in line:
50-
k, v = line.strip().split(':')
51-
ls_config_options[k] = str(v).strip().replace('"', '').replace("'", '')
52-
return ls_config_options
64+
65+
def set_instance_var_from_token(variable_name, data):
66+
"""
67+
:param variable_name: The name of the instance variable to update
68+
:param data: The parsed yaml object
69+
:return: True if successfully located
70+
"""
71+
if variable_name not in self.tokens.keys():
72+
return False
73+
key_path = self.tokens[variable_name]
74+
value = data
75+
try:
76+
for k in key_path:
77+
value = value[k]
78+
setattr(self, var_name, value)
79+
except KeyError:
80+
pass
81+
return True
82+
83+
with open(os.path.join(self.configuration_directory, 'logstash.yml'), 'r') as configyaml:
84+
self.config_data = load(configyaml, Loader=Loader)
85+
86+
for var_name in vars(self).keys():
87+
set_instance_var_from_token(variable_name=var_name, data=self.config_data)
5388

5489
def _parse_jvm_options(self):
5590
"""
5691
Parses the initial and max heap allocation from jvm.options configuration
5792
:return: A dictionary containing the initial_memory and maximum_memory allocated to JVM heap
5893
"""
59-
jvm_options = {}
6094
config_path = os.path.join(self.configuration_directory, 'jvm.options')
61-
if not os.path.exists(config_path):
62-
return jvm_options
6395
for line in open(config_path).readlines():
6496
if not line.startswith('#') and '-Xms' in line:
65-
jvm_options['initial_memory'] = line.replace('-Xms', '').strip()
97+
self.java_initial_memory = line.replace('-Xms', '').strip()
6698
elif not line.startswith('#') and '-Xmx' in line:
67-
jvm_options['maximum_memory'] = line.replace('-Xmx', '').strip()
68-
return jvm_options
99+
self.java_maximum_memory = line.replace('-Xmx', '').strip()
69100

70101
def _parse_environment_file(self):
71102
"""
@@ -80,21 +111,6 @@ def _parse_environment_file(self):
80111
elif line.startswith('LS_HOME'):
81112
self.ls_home = line.split('=')[1].strip()
82113

83-
def _overwrite_jvm_options(self):
84-
"""
85-
Overwrites the JVM initial/max memory if settings were updated
86-
"""
87-
new_output = ''
88-
for line in open(os.path.join(self.configuration_directory, 'jvm.options')).readlines():
89-
if not line.startswith('#') and '-Xms' in line:
90-
new_output += '-Xms' + self.jvm_config_options['initial_memory']
91-
elif not line.startswith('#') and '-Xmx' in line:
92-
new_output += '-Xmx' + self.jvm_config_options['maximum_memory']
93-
else:
94-
new_output += line
95-
new_output += '\n'
96-
open(os.path.join(self.configuration_directory, 'jvm.options'), 'w').write(new_output)
97-
98114
@staticmethod
99115
def get_elasticsearch_password():
100116
"""
@@ -103,49 +119,6 @@ def get_elasticsearch_password():
103119
elastiflow_config = elastiflow.ElastiflowConfigurator()
104120
return elastiflow_config.es_passwd
105121

106-
def get_log_path(self):
107-
"""
108-
:return: The path to Logstash logs on filesystem
109-
"""
110-
return self.ls_config_options.get('path.logs')
111-
112-
def get_node_name(self):
113-
"""
114-
:return: The name of the LogStash collector node
115-
"""
116-
return self.ls_config_options.get('node.name')
117-
118-
def get_data_path(self):
119-
"""
120-
:return: The directory where data (persistent queues) are being stored
121-
"""
122-
return self.ls_config_options.get('path.data')
123-
124-
def get_pipeline_batch_size(self):
125-
"""
126-
:return: The number of events to retrieve from inputs before sending to filters+workers
127-
"""
128-
return self.ls_config_options.get('pipeline.batch.size')
129-
130-
def get_pipeline_batch_delay(self):
131-
"""
132-
:return: The number of milliseconds while polling for the next event before dispatching an
133-
undersized batch to filters+outputs
134-
"""
135-
return self.ls_config_options.get('pipeline.batch.delay')
136-
137-
def get_jvm_initial_memory(self):
138-
"""
139-
:return: The initial amount of memory the JVM heap allocates
140-
"""
141-
return self.jvm_config_options.get('initial_memory')
142-
143-
def get_jvm_maximum_memory(self):
144-
"""
145-
:return: The maximum amount of memory the JVM heap allocates
146-
"""
147-
return self.jvm_config_options.get('maximum_memory')
148-
149122
@staticmethod
150123
def set_elasticsearch_password(password):
151124
"""
@@ -158,66 +131,64 @@ def set_elasticsearch_password(password):
158131
elastiflow_config.write_environment_variables()
159132
synesis_config.write_environment_variables()
160133

161-
def set_log_path(self, path):
162-
"""
163-
:param path: The path to Logstash logs on the filesystem
134+
def write_jvm_config(self):
164135
"""
165-
self.ls_config_options['path.logs'] = path
166-
167-
def set_node_name(self, name):
168-
"""
169-
:param name: The name of the Logstash collector node
170-
"""
171-
self.ls_config_options['node.name'] = name
172-
173-
def set_data_path(self, path):
174-
"""
175-
:param path: The path to the Logstash collector node
136+
Overwrites the JVM initial/max memory if settings were updated
176137
"""
177-
self.ls_config_options['path.data'] = path
138+
new_output = ''
139+
for line in open(os.path.join(self.configuration_directory, 'jvm.options')).readlines():
140+
if not line.startswith('#') and '-Xms' in line:
141+
new_output += '-Xms' + str(self.java_initial_memory) + 'g'
142+
elif not line.startswith('#') and '-Xmx' in line:
143+
new_output += '-Xmx' + str(self.java_maximum_memory) + 'g'
144+
else:
145+
new_output += line
146+
new_output += '\n'
178147

179-
def set_pipeline_batch_size(self, event_count):
180-
"""
181-
:param event_count: How many events to retrieve from inputs before sending to filters+workers
182-
"""
183-
self.ls_config_options['pipeline.batch.size'] = event_count
148+
backup_configurations = os.path.join(self.configuration_directory, 'config_backups/')
149+
java_config_backup = os.path.join(backup_configurations, 'java.options.backup.{}'.format(
150+
int(time.time())
151+
))
152+
shutil.copy(os.path.join(self.configuration_directory, 'jvm.options'), java_config_backup)
153+
open(os.path.join(self.configuration_directory, 'jvm.options'), 'w').write(new_output)
184154

185-
def set_pipeline_batch_delay(self, delay_millisecs):
186-
"""
187-
:param delay_millisecs: How long to wait in milliseconds while polling for the next event before dispatching an
188-
undersized batch to filters+outputs
189-
"""
190-
self.ls_config_options['pipeline.batch.delay'] = delay_millisecs
155+
def write_logstash_config(self):
156+
157+
def update_dict_from_path(path, value):
158+
"""
159+
:param path: A tuple representing each level of a nested path in the yaml document
160+
('vars', 'address-groups', 'HOME_NET') = /vars/address-groups/HOME_NET
161+
:param value: The new value
162+
:return: None
163+
"""
164+
partial_config_data = self.config_data
165+
for i in range(0, len(path) - 1):
166+
try:
167+
partial_config_data = partial_config_data[path[i]]
168+
except KeyError:
169+
pass
170+
partial_config_data.update({path[-1]: value})
191171

192-
def set_jvm_initial_memory(self, gigs):
193-
"""
194-
:param gigs: The amount of initial memory (In Gigabytes) for the JVM to allocate to the heap
195-
"""
196-
self.jvm_config_options['initial_memory'] = str(int(gigs)) + 'g'
172+
timestamp = int(time.time())
173+
backup_configurations = os.path.join(self.configuration_directory, 'config_backups/')
174+
logstash_config_backup = os.path.join(backup_configurations, 'logstash.yml.backup.{}'.format(timestamp))
175+
subprocess.call('mkdir -p {}'.format(backup_configurations), shell=True)
176+
shutil.copy(os.path.join(self.configuration_directory, 'logstash.yml'), logstash_config_backup)
197177

198-
def set_jvm_maximum_memory(self, gigs):
199-
"""
200-
:param gigs: The amount of maximum memory (In Gigabytes) for the JVM to allocate to the heap
201-
"""
202-
self.jvm_config_options['maximum_memory'] = str(int(gigs)) + 'g'
178+
for k, v in vars(self).items():
179+
if k not in self.tokens:
180+
continue
181+
token_path = self.tokens[k]
182+
update_dict_from_path(token_path, v)
183+
with open(os.path.join(self.configuration_directory, 'logstash.yml'), 'w') as configyaml:
184+
dump(self.config_data, configyaml, default_flow_style=False)
203185

204186
def write_configs(self):
205187
"""
206-
Write (and backs-up) logstash.yml and jvm.option configurations
188+
Writes both the JVM and logstash.yaml configurations, backs up originals
207189
"""
208-
timestamp = int(time.time())
209-
backup_configurations = os.path.join(self.configuration_directory, 'config_backups/')
210-
es_config_backup = os.path.join(backup_configurations, 'logstash.yml.backup.{}'.format(timestamp))
211-
java_config_backup = os.path.join(backup_configurations, 'java.options.backup.{}'.format(
212-
timestamp
213-
))
214-
subprocess.call('mkdir -p {}'.format(backup_configurations), shell=True)
215-
shutil.move(os.path.join(self.configuration_directory, 'logstash.yml'), es_config_backup)
216-
shutil.copy(os.path.join(self.configuration_directory, 'jvm.options'), java_config_backup)
217-
with open(os.path.join(self.configuration_directory, 'logstash.yml'), 'a') as logstash_search_config_obj:
218-
for k, v in self.ls_config_options.items():
219-
logstash_search_config_obj.write('{}: {}\n'.format(k, v))
220-
self._overwrite_jvm_options()
190+
self.write_logstash_config()
191+
self.write_jvm_config()
221192

222193

223194
class LogstashInstaller:
@@ -354,7 +325,7 @@ def _install_logstash_plugins(self):
354325
stdout=subprocess.PIPE, stderr=subprocess.PIPE)
355326
subprocess.call('{}/bin/logstash-plugin install logstash-codec-netflow'.format(self.install_directory),
356327
shell=True, env=utilities.get_environment_file_dict(),
357-
stdout = subprocess.PIPE, stderr = subprocess.PIPE)
328+
stdout=subprocess.PIPE, stderr=subprocess.PIPE)
358329
subprocess.call('{}/bin/logstash-plugin install logstash-filter-dns'.format(self.install_directory),
359330
shell=True, env=utilities.get_environment_file_dict(),
360331
stdout=subprocess.PIPE, stderr=subprocess.PIPE)
@@ -377,8 +348,8 @@ def _setup_default_logstash_configs(self):
377348
if self.stdout:
378349
sys.stdout.write('[+] Setting up JVM default heap settings [4GB]\n')
379350
sys.stdout.flush()
380-
ls_config.set_jvm_initial_memory(4)
381-
ls_config.set_jvm_maximum_memory(4)
351+
ls_config.java_initial_memory = 4
352+
ls_config.java_maximum_memory = 4
382353
ls_config.write_configs()
383354

384355
def _setup_elastiflow(self):
@@ -743,7 +714,7 @@ def status(self):
743714
744715
:return: A dictionary containing the run status and relevant configuration options
745716
"""
746-
log_path = os.path.join(self.config.get_log_path(), 'logstash-plain.log')
717+
log_path = os.path.join(self.config.path_logs, 'logstash-plain.log')
747718

748719
return {
749720
'PID': self.pid,
@@ -821,7 +792,7 @@ def install_logstash(host='0.0.0.0',
821792
traceback.print_exc(file=sys.stderr)
822793
return False
823794
if stdout:
824-
sys.stdout.write('[+] *** LogStash + ElastiFlow (w/ Zeek Support) installed successfully. ***\n\n')
795+
sys.stdout.write('[+] *** LogStash installed event/alert pipelines. ***\n\n')
825796
sys.stdout.write('[+] Next, Start your collector: \'dynamite start logstash\'.\n')
826797
sys.stdout.flush()
827798
return LogstashProfiler(stderr=False).is_installed
@@ -856,7 +827,7 @@ def uninstall_logstash(stdout=False, prompt_user=True):
856827
try:
857828
shutil.rmtree(ls_config.ls_path_conf)
858829
shutil.rmtree(ls_config.ls_home)
859-
shutil.rmtree(ls_config.get_log_path())
830+
shutil.rmtree(ls_config.path_logs)
860831
shutil.rmtree('/tmp/dynamite/install_cache/', ignore_errors=True)
861832
env_lines = ''
862833
for line in open('/etc/dynamite/environment').readlines():

setup.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66

77
setup(
88
name='dynamite-nsm',
9-
version='0.6.3',
9+
version='0.6.4',
1010
packages=find_packages(),
1111
scripts=['scripts/dynamite'],
1212
url='http://dynamite.ai',

0 commit comments

Comments
 (0)