Skip to content

Commit 4f893ad

Browse files
Synchronized Log Rotation and Invariant (#31)
- Refactoring work to add locks to file rotation and writing. - Fixes [#2](#2) - Plugin crashes on file rotation. - Fixes [#19](#19) - Deleted files remain in use by the system eventually filling up disk space.
1 parent 48f9d39 commit 4f893ad

File tree

10 files changed

+478
-140
lines changed

10 files changed

+478
-140
lines changed

.gitignore

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,3 +2,5 @@
22
Gemfile.lock
33
.bundle
44
vendor
5+
.idea
6+

CHANGELOG.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,8 @@
1+
## 3.2.1
2+
- Refactoring work to add locks to file rotation and writing.
3+
- Fixes [#2](https://github.com/logstash-plugins/logstash-output-google_cloud_storage/issues/2) - Plugin crashes on file rotation.
4+
- Fixes [#19](https://github.com/logstash-plugins/logstash-output-google_cloud_storage/issues/19) - Deleted files remain in use by the system eventually filling up disk space.
5+
16
## 3.2.0
27
- Change uploads to use a job pool for better performance
38
- Fixes [#22](https://github.com/logstash-plugins/logstash-output-google_cloud_storage/issues/22) - Refactor Job Queue Architecture
Lines changed: 76 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,76 @@
1+
# encoding: utf-8
2+
require 'logstash/outputs/gcs/temp_log_file'
3+
require 'concurrent'
4+
5+
module LogStash
6+
module Outputs
7+
module Gcs
8+
class LogRotate
  # LogRotate owns the currently open temporary log file and replaces it
  # with a fresh one when the path factory asks for a new path or when the
  # file reaches the configured maximum size.
  #
  # writeln, rotate_log! and on_rotate run under the instance's reentrant
  # write lock.

  # path_factory        - object answering rotate_path!, current_path and
  #                       should_rotate?
  # max_file_size_bytes - size at which the log is rotated; values <= 0
  #                       disable size based rotation
  # gzip                - handed to LogFileFactory.create unchanged
  # flush_interval_secs - the log is fsynced from writeln once at least
  #                       this many seconds have passed since its last sync
  #
  # The first log file is opened before the constructor returns.
  def initialize(path_factory, max_file_size_bytes, gzip, flush_interval_secs)
    @path_factory = path_factory
    @max_file_size_bytes = max_file_size_bytes
    @gzip = gzip
    @flush_interval_secs = flush_interval_secs

    @lock = Concurrent::ReentrantReadWriteLock.new
    @rotate_callback = nil
    @temp_file = nil

    rotate_log!
  end

  # writeln writes a message followed by a newline ("\n") to the open
  # log file, rotating it beforehand and syncing it afterwards if necessary.
  #
  # nil messages do not get written, but may cause the log to rotate
  # or sync.
  def writeln(message=nil)
    @lock.with_write_lock do
      rotate_log! if should_rotate?

      @temp_file.write(message, "\n") unless message.nil?

      @temp_file.fsync if @temp_file.time_since_sync >= @flush_interval_secs
    end
  end

  # rotate_log! closes the current log (if it exists), notifies the
  # handler, rolls the path over and opens a new log.
  #
  # Invariant: a new log is opened at the completion of this function even
  # when closing the old log or the handler raises. The error is still
  # propagated to the caller, but later writes go to the new log instead
  # of hitting the already closed one.
  def rotate_log!
    @lock.with_write_lock do
      begin
        retire_current_log
      ensure
        # Runs on the error path too, so @temp_file never keeps pointing
        # at a log that has been closed.
        @path_factory.rotate_path!
        @temp_file = LogStash::Outputs::Gcs::LogFileFactory.create(@path_factory.current_path, @gzip)
      end
    end
  end

  # on_rotate sets a handler to be called when the log gets rotated.
  # The handler receives the path to the rotated out log as a string.
  # A later call replaces the previously registered handler.
  def on_rotate(&block)
    @lock.with_write_lock do
      @rotate_callback = block
    end
  end

  private

  # retire_current_log closes the open log, if any, and then hands its
  # path to the rotate handler, if one is registered.
  def retire_current_log
    return if @temp_file.nil?

    @temp_file.close!
    @rotate_callback.call(@temp_file.path) unless @rotate_callback.nil?
  end

  # should_rotate? is true when the path factory wants a new path or when
  # size based rotation is enabled and the log has reached the limit.
  # The file size is only looked up when the earlier checks leave the
  # answer open.
  def should_rotate?
    @lock.with_read_lock do
      @path_factory.should_rotate? ||
        (@max_file_size_bytes > 0 && @temp_file.size >= @max_file_size_bytes)
    end
  end
end
74+
end
75+
end
76+
end

lib/logstash/outputs/gcs/path_factory.rb

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,13 +30,16 @@ def initialize(directory, prefix, include_host, date_pattern, include_part, incl
3030

3131
# Rotates the path to the next one in sequence. If the path has a part number
3232
# and the base path (date/hostname) hasn't changed, the part number is incremented.
33+
# Returns the path that was rotated out
3334
def rotate_path!
35+
# Remember the path in use before the rotation; it becomes the return value.
# NOTE(review): this read happens before @path_lock is taken below - confirm
# that another thread rotating between the two steps is acceptable.
last_path = current_path
36+
3437
@path_lock.synchronize {
3538
# Same base as before: move on to the next part number; otherwise start at 0.
@part_number = (next_base == current_base) ? @part_number + 1 : 0
3639
# Store the template variables that describe the new path.
@current = template_variables
3740
}
3841

39-
current_path
42+
last_path
4043
end
4144

4245
# Checks if the file is ready to rotate because the timestamp changed.
Lines changed: 110 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,110 @@
1+
# encoding: utf-8
2+
require 'zlib'
3+
require 'concurrent'
4+
require 'time'
5+
6+
module LogStash
7+
module Outputs
8+
module Gcs
9+
# LogFileFactory creates a LogFile according to user specification
10+
# optionally gzipping it and creating mutexes around modification
11+
# points.
12+
class LogFileFactory
  # create opens a PlainLogFile at +path+ and layers the requested wrappers
  # on top of it: GzipLogFile when +gzip+ is truthy, then
  # SynchronizedLogFile when +synchronize+ is truthy (the default).
  #
  # Returns the outermost object of the resulting chain.
  def self.create(path, gzip, synchronize=true)
    log_file = LogStash::Outputs::Gcs::PlainLogFile.new(path)

    wrappers = []
    wrappers << LogStash::Outputs::Gcs::GzipLogFile if gzip
    wrappers << LogStash::Outputs::Gcs::SynchronizedLogFile if synchronize

    # Innermost wrapper first, so synchronization ends up outermost.
    wrappers.reduce(log_file) { |inner, wrapper| wrapper.new(inner) }
  end
end
21+
22+
# PlainLogFile writes events to a plain text file.
23+
class PlainLogFile
  # path is the location given to the constructor; fd is the File opened
  # on it.
  attr_reader :path, :fd

  # Opens the file at +path+ with mode 'a+' and starts the sync timer.
  def initialize(path)
    @path = path
    @fd = ::File.new(path, 'a+')
    @last_sync = monotonic_now
  end

  # write writes each of the given values to the file, in order.
  def write(*contents)
    contents.each { |c| @fd.write(c) }
  end

  # fsync syncs the file and restarts the sync timer.
  def fsync
    @fd.fsync
    @last_sync = monotonic_now
  end

  # close! syncs the file one last time and closes it.
  def close!
    @fd.fsync
    @fd.close
  end

  # size returns the size in bytes that File.stat reports for the path
  # (it is looked up by path, not through the open descriptor).
  def size
    ::File.stat(@path).size
  end

  # time_since_sync returns the seconds elapsed since the file was opened
  # or last fsynced through this object, as a Float.
  #
  # Measured on the monotonic clock so that adjustments of the system
  # time cannot make the interval negative or arbitrarily large.
  def time_since_sync
    monotonic_now - @last_sync
  end

  private

  # monotonic_now reads the monotonic clock in seconds.
  def monotonic_now
    Process.clock_gettime(Process::CLOCK_MONOTONIC)
  end
end
54+
55+
# GzipLogFile wraps another log file and writes events through it.
56+
class GzipLogFile
57+
attr_reader :fd
58+
59+
def initialize(child)
60+
@child = child
61+
@fd = Zlib::GzipWriter.new(child.fd)
62+
end
63+
64+
def write(*contents)
65+
contents.each { |c| @fd.write(c) }
66+
end
67+
68+
def fsync
69+
@fd.flush
70+
@child.fsync
71+
end
72+
73+
def close!
74+
fsync
75+
# The Gzip writer closes the underlying IO after
76+
# appending the Gzip footer.
77+
@fd.close
78+
end
79+
80+
def method_missing(method_name, *args, &block)
81+
@child.send(method_name, *args, &block)
82+
end
83+
end
84+
85+
# SynchronizedLogFile wraps another log file and uses reentrant locks
86+
# around its methods to prevent concurrent modification.
87+
class SynchronizedLogFile
  # child - the log file every call is forwarded to.
  def initialize(child)
    @child = child
    @lock = Concurrent::ReentrantReadWriteLock.new
  end

  # time_since_sync forwards to the child under the read lock.
  def time_since_sync
    @lock.with_read_lock { @child.time_since_sync }
  end

  # path forwards to the child under the read lock.
  def path
    @lock.with_read_lock { @child.path }
  end

  # Every other call is forwarded to the child under the write lock.
  def method_missing(method_name, *args, &block)
    # unless otherwise specified, get a write lock
    @lock.with_write_lock do
      @child.send(method_name, *args, &block)
    end
  end

  # Keeps respond_to? truthful for the methods method_missing forwards.
  def respond_to_missing?(method_name, include_private=false)
    @child.respond_to?(method_name, include_private) || super
  end
end
108+
end
109+
end
110+
end

0 commit comments

Comments
 (0)