module vibe_elastic_logger.logger;

import std.algorithm.iteration : each, map;
import std.stdio;
import vibe.core.log : Logger, LogLine, LogLevel;
import std.array : appender;
import core.time : Duration;
import std.datetime : Clock, SysTime;

private alias MessageBuffer = typeof(appender!(char[])());

// Bypass to!string for log levels because we look them up frequently
private immutable string[LogLevel.max + 1] levelToString;

/**
 * Returns the JSON string-literal representation of a single UTF-8 code unit.
 *
 * The quotation mark, the backslash and every control character below U+0020
 * are escaped (JSON forbids them unescaped inside a string). Every other code
 * unit, including the bytes of multi-byte UTF-8 sequences, is returned as is.
 *
 * Params:
 *     ch = The code unit to escape.
 *
 * Returns: The text to emit in place of `ch` inside a JSON string.
 */
private string escapeChars(char ch) pure nothrow @safe
{
    static immutable hexDigits = "0123456789abcdef";

    switch (ch)
    {
    case '"':
        return `\"`;
    case '\\':
        return `\\`;
    case '\n':
        return `\n`;
    case '\r':
        return `\r`;
    case '\t':
        return `\t`;
    case '\b':
        return `\b`;
    case '\f':
        return `\f`;
    default:
        // Remaining control characters have no short form: use \u00XX.
        if (ch < 0x20)
            return `\u00` ~ hexDigits[ch >> 4] ~ hexDigits[ch & 0x0F];
        return "" ~ ch;
    }
}

/**
 * Appends `str` to `buffer`, escaped so that it can be placed between the
 * quotes of a JSON string literal.
 *
 * Runs of characters that need no escaping are copied as whole slices; only
 * the characters that must be escaped go through `escapeChars`.
 *
 * Params:
 *     buffer = Destination appender.
 *     str = Text to escape and append.
 */
private void putEscapedString(ref typeof(appender!string()) buffer, const(char)[] str) @safe
{
    size_t runStart = 0;
    foreach (i, char ch; str)
    {
        if (ch >= 0x20 && ch != '"' && ch != '\\')
            continue;

        buffer.put(str[runStart .. i]);
        buffer.put(escapeChars(ch));
        runStart = i + 1;
    }
    buffer.put(str[runStart .. $]);
}

// Fills the LogLevel -> name lookup table once at program start.
shared static this()
{
    levelToString[LogLevel.trace] = "trace";
    levelToString[LogLevel.debugV] = "debugVerbose";
    levelToString[LogLevel.debug_] = "debug";
    levelToString[LogLevel.diagnostic] = "diagnostic";
    levelToString[LogLevel.info] = "info";
    levelToString[LogLevel.warn] = "warn";
    levelToString[LogLevel.error] = "error";
    levelToString[LogLevel.critical] = "critical";
    levelToString[LogLevel.fatal] = "fatal";
    levelToString[LogLevel.none] = "none";
}

/**
 * An IndexCreator is a delegate that calculates the ElasticSearch index that
 * the logger should log to.
 */
alias IndexCreator = string delegate() @safe;

/**
 * ElasticSearch server connection information.
 */
struct ElasticInfo
{
    /// Host name for the ElasticSearch server.
    string hostName;
    /// Port for connecting to the ElasticSearch server.
    ushort portNumber = 9200;
    /// The log message's "type" within an index.
    string typeName;
}

/**
 * ElasticSearch logger implementation.
 *
 * Messages are queued and written to the server when any of the following
 * conditions are met:
 * $(UL
 *   $(LI The message queue is full.)
 *   $(LI The message to be logged has a LogLevel of `critical` or higher.)
 *   $(LI A message is queued to be logged, and more time than the `maxLogInterval`
 *     constructor argument has passed.)
 *   $(LI The logger is asked to log the message "Main thread exiting".)
 * )
 *
 * The name of the index that the logger writes messages to is determined by a
 * function (the `indexCreator` constructor argument). This usually returns an
 * index name based off of the current time.
 *
 * Anything logged while a flush is in progress is discarded; this keeps log
 * calls made by the HTTP client from recursing back into the logger.
 */
class ElasticLogger : Logger
{
    /**
     * Params:
     *     elasticInfo = Connection information to use when logging messages.
     *     indexCreator = A function that will return the name of the index to
     *         log to.
     *     maxLogInterval = If a log message is received and more than this
     *         amount of time has passed since the last flush, trigger a flush
     *         regardless of the remaining space in the buffer.
     *     messageQueueSize = Number of log messages to queue between writes to
     *         ElasticSearch. A value of 0 is treated as 1, so every message is
     *         flushed immediately.
     */
    this(const ElasticInfo elasticInfo, const IndexCreator indexCreator,
            const Duration maxLogInterval, const size_t messageQueueSize)
    {
        this.indexCreator = indexCreator;
        this.maxLogInterval = maxLogInterval;
        // The queue needs at least one slot: beginLine() always writes to
        // entries[logQueueIndex].
        this.entries = new LogEntry[](messageQueueSize == 0 ? 1 : messageQueueSize);
        this.multilineLogger = true;
        this.minLevel = LogLevel.trace;
        this.lastFlushTime = Clock.currTime();
        this.elasticInfo = elasticInfo;
        this.logQueueIndex = 0;
        this.flushing = false;
    }

    /// Starts a new queue entry and records the metadata of `line` in it.
    override void beginLine(ref LogLine line) @safe
    {
        if (flushing)
            return;

        LogEntry* l = &entries[logQueueIndex];
        l.module_ = line.mod;
        l.function_ = line.func;
        l.file = line.file;
        l.line = line.line;
        l.lLevel = line.level;
        l.level = levelToString[line.level];
        l.fiberID = line.fiberID;
        l.time = line.time.toISOExtString();
        l.buffer.clear();
    }

    /// Completes the current queue entry and flushes the queue if required.
    override void endLine() @safe
    {
        if (flushing)
            return;

        immutable r = logQueueIndex;
        logQueueIndex++;
        if (logQueueIndex == entries.length || entries[r].lLevel >= LogLevel.critical
                || (entries[r].lLevel == LogLevel.diagnostic
                    && entries[r].buffer.data == "Main thread exiting")
                || lastFlushTime + maxLogInterval < Clock.currTime())
            flush();
    }

    /// Appends `text` to the message of the current queue entry.
    override void put(scope const(char)[] text) @safe
    {
        if (flushing)
            return;

        entries[logQueueIndex].buffer.put(text);
    }

private:

    // One queued log message together with its metadata.
    static struct LogEntry
    {
        string module_;
        string function_;
        string file;
        int line;
        LogLevel lLevel;
        string level;
        uint fiberID;
        string time;
        MessageBuffer buffer;
    }

    /*
     * Sends every queued entry to the server in a single `_bulk` request and
     * empties the queue.
     *
     * The queue is emptied before the request is built, so it stays usable
     * even if building or sending the request throws. Exceptions are not
     * caught here; the entries of a failed flush are lost.
     */
    void flush() @safe
    {
        import std.conv : to;
        import std.format : format;
        import std.string : representation;
        import vibe.http.client : requestHTTP, HTTPClientRequest, HTTPClientResponse, HTTPMethod;

        immutable pending = logQueueIndex;
        if (pending == 0)
            return;

        flushing = true;
        scope(exit) flushing = false;

        // Reset the queue before anything below can throw. Otherwise a full
        // queue would leave logQueueIndex == entries.length and the next
        // beginLine() would index out of bounds.
        logQueueIndex = 0;
        this.lastFlushTime = Clock.currTime();

        immutable elasticIndex = indexCreator();
        immutable url = format!"http://%s:%d/%s/%s/_bulk"(elasticInfo.hostName,
                elasticInfo.portNumber, elasticIndex, elasticInfo.typeName);
        version(debug_elastic_logger)
        {
            () @trusted { stderr.writeln("\033[01;33m", url, "\033[0m"); }();
        }

        // The bulk body is newline-delimited JSON: an action line followed by
        // a document line per entry. Free-text fields are escaped so they can
        // never contain a raw newline or break the JSON.
        auto requestBody = appender!string();
        foreach (ref entry; entries[0 .. pending])
        {
            requestBody.put(`{"index": {}}`);
            requestBody.put("\n");
            requestBody.put(`{"message":"`);
            requestBody.putEscapedString(entry.buffer.data);
            requestBody.put(`","module":"`);
            requestBody.putEscapedString(entry.module_);
            requestBody.put(`","function":"`);
            requestBody.putEscapedString(entry.function_);
            requestBody.put(`","file":"`);
            requestBody.putEscapedString(entry.file);
            requestBody.put(`","line":"`);
            requestBody.put(entry.line.to!string());
            requestBody.put(`","level":"`);
            requestBody.put(entry.level);
            requestBody.put(`","fiberID":"`);
            requestBody.put(entry.fiberID.to!string());
            requestBody.put(`","time":"`);
            requestBody.put(entry.time);
            requestBody.put(`"}`);
            requestBody.put("\n");
        }

        version(debug_elastic_logger)
        {
            () @trusted { stderr.writeln("\033[01;33m", requestBody.data, "\033[0m"); }();
        }

        requestHTTP(url,
            (scope request) {
                request.method = HTTPMethod.POST;
                // representation() views the string as bytes without casting
                // away immutable.
                request.writeBody(requestBody.data.representation, "application/x-ndjson");
            },
            (scope response) {
                version(debug_elastic_logger)
                {
                    import vibe.stream.operations : readAllUTF8;
                    () @trusted { stderr.writeln("\033[01;33mStatus code: ", response.statusCode, "\033[0m"); }();
                    () @trusted { stderr.writeln("\033[01;33mResponse message: ", response.bodyReader.readAllUTF8(), "\033[0m"); }();
                }
                response.dropBody();
            });
    }


    // See constructor docs
    const IndexCreator indexCreator;
    // See constructor docs
    const Duration maxLogInterval;
    // See constructor docs
    const ElasticInfo elasticInfo;
    // Time that the last flush happened
    SysTime lastFlushTime;
    // Message queue
    LogEntry[] entries;
    // Index into the `entries` buffer.
    size_t logQueueIndex;
    // True if the logger is currently flushing log info to the server. Prevents
    // the HTTP request code from causing an infinite recursion.
    bool flushing;
}