1 module vibe_elastic_logger.logger;
2 
3 import std.algorithm.iteration : each, map;
4 import std.stdio;
5 import vibe.core.log : Logger, LogLine, LogLevel;
6 import std.array : appender;
7 import core.time : Duration;
8 import std.datetime : Clock, SysTime;
9 
10 private alias MessageBuffer = typeof(appender!(char[])());
11 
12 // Bypass to!string for log levels because we look them up frequently
13 private immutable string[LogLevel.max + 1] levelToString;
14 
15 private string escapeChars(char ch) pure nothrow @safe
16 {
17     switch (ch)
18     {
19     case '"':
20         return `\"`;
21     default:
22         return () @trusted { return cast(string)[ch]; }();
23     }
24 }
25 
26 private void putEscapedString(ref typeof(appender!string()) buffer, const(char)[] str) @safe
27 {
28     import std.utf : byCodeUnit;
29 
30     str.byCodeUnit().map!(ch => escapeChars(ch)).each!(s => buffer.put(s));
31 }
32 
33 shared static this()
34 {
35     levelToString[LogLevel.trace] = "trace";
36     levelToString[LogLevel.debugV] = "debugVerbose";
37     levelToString[LogLevel.debug_] = "debug";
38     levelToString[LogLevel.diagnostic] = "diagnostic";
39     levelToString[LogLevel.info] = "info";
40     levelToString[LogLevel.warn] = "warn";
41     levelToString[LogLevel.error] = "error";
42     levelToString[LogLevel.critical] = "critical";
43     levelToString[LogLevel.fatal] = "fatal";
44     levelToString[LogLevel.none] = "none";
45 }
46 
47 /**
48  * An IndexCreator is a delegate that calculates the ElasticSearch index that
49  * the logger should log to.
50  */
51 alias IndexCreator = string delegate() @safe;
52 
53 /**
54  * ElasticSearch server connection information.
55  */
56 struct ElasticInfo
57 {
58     /// Host name for the ElasticSearch server.
59     string hostName;
60     /// Port for connecting to the ElasticSearch server.
61     ushort portNumber = 9200;
62     /// The log message's "type" within an index.
63     string typeName;
64 }
65 
66 /**
67  * ElasticSearch logger implementation.
68  *
69  * Messages are queued and written to the server when any of the following
70  * conditions are met:
71  * $(UL
72  * $(LI The message queue is full.)
73  * $(LI The message to be logged has a LogLevel of `critical` or higher.)
74  * $(LI A message is queued to be logged, and more time than the `maxLogInterval`
75  *     constructor argument has passed.)
76  * $(LI The logger is asked to log the message "Main thread exiting".)
77  * )
78  *
79  * The name of the index that the logger writes messages to is determined by a
80  * function (the `indexCreator` constructor argument). This usually returns an
81  * index name based off of the current time.
82  */
83 class ElasticLogger : Logger
84 {
85     /**
86      * Params:
87      *     elasticInfo = Connection information to use when logging messages.
88      *     indexCreator = A function that will return the name of the index to
89      *         log to.
90      *     maxLogInterval = If a log message is received and more than this
91      *         amount of time has passed since the last flush, trigger a flush
92      *         regardless of the remaining space in the buffer.
93      *     messageQueueSize = Number of log messages to queue between writes to
94      *         ElasticSearch.
95      */
96     this(const ElasticInfo elasticInfo, const IndexCreator indexCreator,
97             const Duration maxLogInterval, const size_t messageQueueSize)
98     {
99         this.indexCreator = indexCreator;
100         this.maxLogInterval = maxLogInterval;
101         this.entries = new LogEntry[](messageQueueSize);
102         this.multilineLogger = true;
103         this.minLevel = LogLevel.trace;
104         this.lastFlushTime = Clock.currTime();
105         this.elasticInfo = elasticInfo;
106         this.logQueueIndex = 0;
107         this.flushing = false;
108     }
109 
110     override void beginLine(ref LogLine line) @safe
111     {
112         if (flushing)
113             return;
114 
115         LogEntry* l = &entries[logQueueIndex];
116         l.module_ = line.mod;
117         l.function_ = line.func;
118         l.file = line.file;
119         l.line = line.line;
120         l.lLevel = line.level;
121         l.level = levelToString[line.level];
122         l.fiberID = line.fiberID;
123         l.time = line.time.toISOExtString();
124         l.buffer.clear();
125     }
126 
127     override void endLine() @safe
128     {
129         if (flushing)
130             return;
131 
132         immutable r = logQueueIndex;
133         logQueueIndex++;
134         if (logQueueIndex == entries.length || entries[r].lLevel >= LogLevel.critical
135                 || (entries[r].lLevel == LogLevel.diagnostic
136                     && entries[r].buffer.data == "Main thread exiting")
137                 || lastFlushTime + maxLogInterval < Clock.currTime())
138             flush();
139     }
140 
141     override void put(scope const(char)[] text) @safe
142     {
143         if (flushing)
144             return;
145 
146         entries[logQueueIndex].buffer.put(text);
147     }
148 
149 private:
150 
151     static struct LogEntry
152     {
153         string module_;
154         string function_;
155         string file;
156         int line;
157         LogLevel lLevel;
158         string level;
159         uint fiberID;
160         string time;
161         MessageBuffer buffer;
162     }
163 
164     void flush() @safe
165     {
166         import std.conv : to;
167         import std.format : format;
168         import vibe.http.client : requestHTTP, HTTPClientRequest, HTTPClientResponse, HTTPMethod;
169 
170         flushing = true;
171         scope(exit) flushing = false;
172 
173         immutable elasticIndex = indexCreator();
174         immutable url = format!"http://%s:%d/%s/%s/_bulk"(elasticInfo.hostName,
175                 elasticInfo.portNumber, elasticIndex, elasticInfo.typeName);
176         version(debug_elastic_logger)
177         {
178             () @trusted { stderr.writeln("\033[01;33m", url, "\033[0m"); }();
179         }
180         auto requestBody = appender!string();
181         foreach (ref entry; entries[0 .. logQueueIndex])
182         {
183             requestBody.put(`{"index": {}}`);
184             requestBody.put("\n");
185             requestBody.put(`{"message":"`);
186             requestBody.putEscapedString(entry.buffer.data);
187             requestBody.put(`","module":"`);
188             requestBody.put(entry.module_);
189             requestBody.put(`","function":"`);
190             requestBody.putEscapedString(entry.function_);
191             requestBody.put(`","file":"`);
192             requestBody.putEscapedString(entry.file);
193             requestBody.put(`","line":"`);
194             requestBody.put(entry.line.to!string());
195             requestBody.put(`","level":"`);
196             requestBody.put(entry.level);
197             requestBody.put(`","fiberID":"`);
198             requestBody.put(entry.fiberID.to!string());
199             requestBody.put(`","time":"`);
200             requestBody.put(entry.time);
201             requestBody.put(`"}`);
202             requestBody.put("\n");
203         }
204 
205         version(debug_elastic_logger)
206         {
207             () @trusted { stderr.writeln("\033[01;33m", requestBody.data, "\033[0m"); }();
208         }
209 
210         logQueueIndex = 0;
211         this.lastFlushTime = Clock.currTime();
212         requestHTTP(url,
213             (scope request) {
214                 request.method = HTTPMethod.POST;
215                 request.writeBody(cast(ubyte[]) requestBody.data, "application/x-ndjson");
216             },
217             (scope response) {
218                 version(debug_elastic_logger)
219                 {
220                     import vibe.stream.operations : readAllUTF8;
221                     () @trusted { stderr.writeln("\033[01;33mStatus code: ", response.statusCode, "\033[0m"); }();
222                     () @trusted { stderr.writeln("\033[01;33mResponse message: ", response.bodyReader.readAllUTF8(), "\033[0m"); }();
223                 }
224                 response.dropBody();
225             });
226     }
227 
228 
229     // See constructor docs
230     const IndexCreator indexCreator;
231     // See constructor docs
232     const Duration maxLogInterval;
233     // See constructor docs
234     const ElasticInfo elasticInfo;
235     // Time that the last flush happened
236     SysTime lastFlushTime;
237     // Message queue
238     LogEntry[] entries;
239     // Index into the `entries` buffer.
240     size_t logQueueIndex;
241     // True if the logger is currently flushing log info to the server. Prevents
242     // the HTTP request code from causing an infinite recursion.
243     bool flushing;
244 }