Index: trunk/udplog/debian/changelog |
— | — | @@ -1,3 +1,9 @@ |
| 2 | +udplog (1.4-1) unstable; urgency=low |
| 3 | + |
| 4 | + * Allow the stream flushing behaviour to be configured for each processor |
| 5 | + |
| 6 | + -- Tim Starling <tstarling@wikimedia.org> Wed, 24 Sep 2008 17:08:46 +1000 |
| 7 | + |
2 | 8 | udplog (1.3-1) unstable; urgency=low |
3 | 9 | |
4 | 10 | * Restart broken pipes after 5 seconds, because awk dies after days or weeks with EFAULT. |
Index: trunk/udplog/udp2log/LogProcessor.cpp |
— | — | @@ -7,7 +7,7 @@ |
8 | 8 | // FileProcessor |
9 | 9 | //--------------------------------------------------------------------------- |
10 | 10 | |
11 | | -LogProcessor * FileProcessor::NewFromConfig(char * params) |
| 11 | +LogProcessor * FileProcessor::NewFromConfig(char * params, bool flush) |
12 | 12 | { |
13 | 13 | char * strFactor = strtok(params, " \t"); |
14 | 14 | if (strFactor == NULL) { |
— | — | @@ -22,7 +22,7 @@ |
23 | 23 | ); |
24 | 24 | } |
25 | 25 | char * filename = strtok(NULL, ""); |
26 | | - FileProcessor * fp = new FileProcessor(filename, factor); |
| 26 | + FileProcessor * fp = new FileProcessor(filename, factor, flush); |
27 | 27 | if (!fp->IsOpen()) { |
28 | 28 | delete fp; |
29 | 29 | throw ConfigError("Unable to open file"); |
— | — | @@ -35,7 +35,7 @@ |
36 | 36 | { |
37 | 37 | if (Sample()) { |
38 | 38 | f.write(buffer, size); |
39 | | - if (factor >= 10) { |
| 39 | + if (flush) { |
40 | 40 | f.flush(); |
41 | 41 | } |
42 | 42 | } |
— | — | @@ -45,7 +45,7 @@ |
46 | 46 | // PipeProcessor |
47 | 47 | //--------------------------------------------------------------------------- |
48 | 48 | |
49 | | -LogProcessor * PipeProcessor::NewFromConfig(char * params) |
| 49 | +LogProcessor * PipeProcessor::NewFromConfig(char * params, bool flush) |
50 | 50 | { |
51 | 51 | char * strFactor = strtok(params, " \t"); |
52 | 52 | if (strFactor == NULL) { |
— | — | @@ -60,7 +60,7 @@ |
61 | 61 | ); |
62 | 62 | } |
63 | 63 | char * command = strtok(NULL, ""); |
64 | | - PipeProcessor * pp = new PipeProcessor(command, factor); |
| 64 | + PipeProcessor * pp = new PipeProcessor(command, factor, flush); |
65 | 65 | if (!pp->IsOpen()) { |
66 | 66 | delete pp; |
67 | 67 | throw ConfigError("Unable to open pipe"); |
— | — | @@ -88,7 +88,7 @@ |
89 | 89 | // Reopen it after a few seconds |
90 | 90 | alarm(RESTART_INTERVAL); |
91 | 91 | } else { |
92 | | - if (factor >= 10) { |
| 92 | + if (flush) { |
93 | 93 | fflush(f); |
94 | 94 | } |
95 | 95 | } |
Index: trunk/udplog/udp2log/Udp2LogConfig.cpp |
— | — | @@ -39,21 +39,29 @@ |
40 | 40 | type = strtok(line, " \t"); |
41 | 41 | if (!type) { |
42 | 42 | continue; |
43 | | - } else { |
| 43 | + } |
| 44 | + |
| 45 | + params = strtok(NULL, ""); |
| 46 | + LogProcessor * processor = NULL; |
| 47 | + bool flush = false; |
| 48 | + |
| 49 | + if (!strcmp(type, "flush")) { |
| 50 | + flush = true; |
| 51 | + type = strtok(params, " \t"); |
44 | 52 | params = strtok(NULL, ""); |
45 | | - LogProcessor * processor = NULL; |
46 | | - if (!strcmp(type, "file")) { |
47 | | - processor = FileProcessor::NewFromConfig(params); |
48 | | - } else if (!strcmp(type, "pipe")) { |
49 | | - processor = PipeProcessor::NewFromConfig(params); |
50 | | - } else { |
51 | | - throw ConfigError("Unrecognised log type"); |
52 | | - } |
| 53 | + } |
53 | 54 | |
54 | | - if (processor) { |
55 | | - newProcessors.push_back(processor); |
56 | | - } |
| 55 | + if (!strcmp(type, "file")) { |
| 56 | + processor = FileProcessor::NewFromConfig(params, flush); |
| 57 | + } else if (!strcmp(type, "pipe")) { |
| 58 | + processor = PipeProcessor::NewFromConfig(params, flush); |
| 59 | + } else { |
| 60 | + throw ConfigError("Unrecognised log type"); |
57 | 61 | } |
| 62 | + |
| 63 | + if (processor) { |
| 64 | + newProcessors.push_back(processor); |
| 65 | + } |
58 | 66 | } |
59 | 67 | |
60 | 68 | // Swap in the new configuration |
Index: trunk/udplog/udp2log/LogProcessor.h |
— | — | @@ -13,8 +13,8 @@ |
14 | 14 | virtual ~LogProcessor() {} |
15 | 15 | |
16 | 16 | protected: |
17 | | - LogProcessor(int factor_) |
18 | | - : counter(0), factor(factor_) |
| 17 | + LogProcessor(int factor_, bool flush_) |
| 18 | + : counter(0), factor(factor_), flush(flush_) |
19 | 19 | {} |
20 | 20 | |
21 | 21 | |
— | — | @@ -34,16 +34,17 @@ |
35 | 35 | |
36 | 36 | int counter; |
37 | 37 | int factor; |
| 38 | + bool flush; |
38 | 39 | }; |
39 | 40 | |
40 | 41 | class FileProcessor : public LogProcessor |
41 | 42 | { |
42 | 43 | public: |
43 | | - static LogProcessor * NewFromConfig(char * params); |
| 44 | + static LogProcessor * NewFromConfig(char * params, bool flush); |
44 | 45 | virtual void ProcessLine(char *buffer, size_t size); |
45 | 46 | |
46 | | - FileProcessor(char * filename, int factor_) |
47 | | - : LogProcessor(factor_) |
| 47 | + FileProcessor(char * filename, int factor_, bool flush_) |
| 48 | + : LogProcessor(factor_, flush_) |
48 | 49 | { |
49 | 50 | f.open(filename, std::ios::app | std::ios::out); |
50 | 51 | } |
— | — | @@ -58,12 +59,12 @@ |
59 | 60 | class PipeProcessor : public LogProcessor |
60 | 61 | { |
61 | 62 | public: |
62 | | - static LogProcessor * NewFromConfig(char * params); |
| 63 | + static LogProcessor * NewFromConfig(char * params, bool flush); |
63 | 64 | virtual void ProcessLine(char *buffer, size_t size); |
64 | 65 | virtual void FixIfBroken(); |
65 | 66 | |
66 | | - PipeProcessor(char * command_, int factor_) |
67 | | - : LogProcessor(factor_) |
| 67 | + PipeProcessor(char * command_, int factor_, bool flush_) |
| 68 | + : LogProcessor(factor_, flush_) |
68 | 69 | { |
69 | 70 | command = strdup(command_); |
70 | 71 | f = popen(command, "w"); |