Index: trunk/udplog/udp2log/LogProcessor.cpp |
— | — | @@ -1,5 +1,9 @@ |
2 | 2 | #include <stdio.h> |
3 | 3 | #include <iostream> |
| 4 | +#include <unistd.h> |
| 5 | +#include <linux/limits.h> |
| 6 | +#include <sys/types.h> |
| 7 | +#include <sys/wait.h> |
4 | 8 | #include "LogProcessor.h" |
5 | 9 | #include "Udp2LogConfig.h" |
6 | 10 | |
— | — | @@ -31,7 +35,7 @@ |
32 | 36 | return (LogProcessor*)fp; |
33 | 37 | } |
34 | 38 | |
35 | | -void FileProcessor::ProcessLine(char *buffer, size_t size) |
| 39 | +void FileProcessor::ProcessLine(const char *buffer, size_t size) |
36 | 40 | { |
37 | 41 | if (Sample()) { |
38 | 42 | f.write(buffer, size); |
— | — | @@ -45,8 +49,15 @@ |
46 | 50 | // PipeProcessor |
47 | 51 | //--------------------------------------------------------------------------- |
48 | 52 | |
49 | | -LogProcessor * PipeProcessor::NewFromConfig(char * params, bool flush) |
| 53 | +PipeProcessor::PipeProcessor(char * command_, int factor_, bool flush_, bool blocking_) |
| 54 | + : LogProcessor(factor_, flush_), child(0), blocking(blocking_) |
50 | 55 | { |
| 56 | + command = command_; |
| 57 | + Open(); |
| 58 | +} |
| 59 | + |
| 60 | +LogProcessor * PipeProcessor::NewFromConfig(char * params, bool flush, bool blocking) |
| 61 | +{ |
51 | 62 | char * strFactor = strtok(params, " \t"); |
52 | 63 | if (strFactor == NULL) { |
53 | 64 | throw ConfigError( |
— | — | @@ -60,7 +71,7 @@ |
61 | 72 | ); |
62 | 73 | } |
63 | 74 | char * command = strtok(NULL, ""); |
64 | | - PipeProcessor * pp = new PipeProcessor(command, factor, flush); |
| 75 | + PipeProcessor * pp = new PipeProcessor(command, factor, flush, blocking); |
65 | 76 | if (!pp->IsOpen()) { |
66 | 77 | delete pp; |
67 | 78 | throw ConfigError("Unable to open pipe"); |
— | — | @@ -69,37 +80,54 @@ |
70 | 81 | return (LogProcessor*)pp; |
71 | 82 | } |
72 | 83 | |
73 | | -void PipeProcessor::ProcessLine(char *buffer, size_t size) |
| 84 | +void PipeProcessor::HandleError(libc_error & e) |
74 | 85 | { |
75 | | - if (!f) { |
| 86 | + bool restart; |
| 87 | + if (e.code == EAGAIN) { |
| 88 | + numLost++; |
| 89 | + restart = false; |
| 90 | + } else if (e.code == EPIPE) { |
| 91 | + std::cerr << "Pipe terminated, suspending output: " << command << std::endl; |
| 92 | + restart = true; |
| 93 | + } else { |
| 94 | + std::cerr << "Error writing to pipe: " << e.what() << std::endl; |
| 95 | + restart = true; |
| 96 | + } |
| 97 | + if (restart) { |
| 98 | + Close(); |
| 99 | + // Reopen it after a few seconds |
| 100 | + alarm(RESTART_INTERVAL); |
| 101 | + } |
| 102 | +} |
| 103 | + |
| 104 | +void PipeProcessor::ProcessLine(const char *buffer, size_t size) |
| 105 | +{ |
| 106 | + if (!child) { |
76 | 107 | return; |
77 | 108 | } |
78 | 109 | |
| 110 | + |
79 | 111 | if (Sample()) { |
80 | | - fwrite(buffer, 1, size, f); |
81 | | - if (ferror(f)) { |
82 | | - if (errno == EPIPE) { |
83 | | - std::cerr << "Pipe terminated, suspending output: " << command << std::endl; |
| 112 | + try { |
| 113 | + if (blocking && size > PIPE_BUF) { |
| 114 | + // Write large packets in blocking mode to preserve data integrity |
| 115 | + GetPipe().SetStatusFlags(0); |
| 116 | + GetPipe().Write(buffer, size); |
| 117 | + GetPipe().SetStatusFlags(O_NONBLOCK); |
84 | 118 | } else { |
85 | | - std::cerr << "Error writing to pipe: " << strerror(errno) << std::endl; |
| 119 | + GetPipe().Write(buffer, size); |
86 | 120 | } |
87 | | - pclose(f); |
88 | | - f = NULL; |
89 | | - // Reopen it after a few seconds |
90 | | - alarm(RESTART_INTERVAL); |
91 | | - } else { |
92 | | - if (flush) { |
93 | | - fflush(f); |
94 | | - } |
| 121 | + } catch (libc_error & e) { |
| 122 | + HandleError(e); |
95 | 123 | } |
96 | 124 | } |
97 | 125 | } |
98 | 126 | |
99 | 127 | void PipeProcessor::FixIfBroken() |
100 | 128 | { |
101 | | - if (!f) { |
102 | | - f = popen(command, "w"); |
103 | | - if (!f) { |
| 129 | + if (!child) { |
| 130 | + Open(); |
| 131 | + if (!child) { |
104 | 132 | std::cerr << "Unable to restart pipe: " << command << std::endl; |
105 | 133 | // Try again later |
106 | 134 | alarm(RESTART_INTERVAL); |
— | — | @@ -108,3 +136,71 @@ |
109 | 137 | } |
110 | 138 | } |
111 | 139 | } |
| 140 | + |
| 141 | +void PipeProcessor::Close() |
| 142 | +{ |
| 143 | + if (child) { |
| 144 | + int status = 0; |
| 145 | + waitpid(child, &status, 0); |
| 146 | + GetPipe().Close(); |
| 147 | + child = 0; |
| 148 | + } |
| 149 | +} |
| 150 | + |
| 151 | +void PipeProcessor::Open() |
| 152 | +{ |
| 153 | + if (child) { |
| 154 | + throw std::runtime_error("PipeProcessor::Open called but the pipe is already open"); |
| 155 | + } |
| 156 | + pipes = PipePairPointer(new PipePair); |
| 157 | + child = fork(); |
| 158 | + if (!child) { |
| 159 | + // This is the child process |
| 160 | + pipes->writeEnd.Close(); |
| 161 | + pipes->readEnd.Dup2(STDIN_FILENO); |
| 162 | + pipes->readEnd.Close(); |
| 163 | + |
| 164 | + // Run the command, similar to how popen() does it |
| 165 | + std::string fullCommand = std::string("exec ") + command; |
| 166 | + execl("/bin/sh", "sh", "-c", fullCommand.c_str(), NULL); |
| 167 | + throw libc_error("PipeProcessor::Open"); |
| 168 | + } |
| 169 | + |
| 170 | + if (child == -1) { |
| 171 | + // Fork failed |
| 172 | + child = 0; |
| 173 | + throw libc_error("PipeProcessor::Open"); |
| 174 | + } |
| 175 | + |
| 176 | + // This is the parent process |
| 177 | + pipes->readEnd.Close(); |
| 178 | + pipes->writeEnd.SetDescriptorFlags(FD_CLOEXEC); |
| 179 | + if (!blocking) { |
| 180 | + pipes->writeEnd.SetStatusFlags(O_NONBLOCK); |
| 181 | + } |
| 182 | +} |
| 183 | + |
| 184 | +void PipeProcessor::CopyFromPipe(Pipe & source, size_t dataLength) |
| 185 | +{ |
| 186 | + if (!child) { |
| 187 | + return; |
| 188 | + } |
| 189 | + |
| 190 | + int flags = 0; |
| 191 | + if (!blocking) { |
| 192 | + flags |= SPLICE_F_NONBLOCK; |
| 193 | + } |
| 194 | + try { |
| 195 | + if (blocking && dataLength > PIPE_BUF) { |
| 196 | + // Write large packets in blocking mode to preserve data integrity |
| 197 | + GetPipe().SetStatusFlags(0); |
| 198 | + source.Tee(GetPipe(), dataLength, 0); |
| 199 | + GetPipe().SetStatusFlags(O_NONBLOCK); |
| 200 | + } else { |
| 201 | + source.Tee(GetPipe(), dataLength, flags); |
| 202 | + } |
| 203 | + } catch (libc_error & e) { |
| 204 | + HandleError(e); |
| 205 | + } |
| 206 | +} |
| 207 | + |
Index: trunk/udplog/udp2log/Udp2LogConfig.cpp |
— | — | @@ -4,13 +4,14 @@ |
5 | 5 | |
6 | 6 | #include "Udp2LogConfig.h" |
7 | 7 | |
| 8 | +PosixFile Udp2LogConfig::devNull("/dev/null", O_WRONLY); |
| 9 | + |
8 | 10 | Udp2LogConfig::Udp2LogConfig() |
9 | 11 | : reload(false) |
10 | 12 | {} |
11 | 13 | |
12 | 14 | void Udp2LogConfig::Open(const std::string & name) |
13 | | -{ |
14 | | - using namespace std; |
| 15 | +{ |
15 | 16 | fileName = name; |
16 | 17 | Load(); |
17 | 18 | } |
— | — | @@ -43,18 +44,24 @@ |
44 | 45 | |
45 | 46 | params = strtok(NULL, ""); |
46 | 47 | LogProcessor * processor = NULL; |
47 | | - bool flush = false; |
| 48 | + bool flush = false, blocking = false; |
48 | 49 | |
49 | 50 | if (!strcmp(type, "flush")) { |
50 | 51 | flush = true; |
51 | 52 | type = strtok(params, " \t"); |
52 | 53 | params = strtok(NULL, ""); |
53 | 54 | } |
| 55 | + if (!strcmp(type, "blocking")) { |
| 56 | + blocking = true; |
| 57 | + type = strtok(params, " \t"); |
| 58 | + params = strtok(NULL, ""); |
| 59 | + } |
54 | 60 | |
55 | 61 | if (!strcmp(type, "file")) { |
| 62 | + // TODO: support blocking for FileProcessor |
56 | 63 | processor = FileProcessor::NewFromConfig(params, flush); |
57 | 64 | } else if (!strcmp(type, "pipe")) { |
58 | | - processor = PipeProcessor::NewFromConfig(params, flush); |
| 65 | + processor = PipeProcessor::NewFromConfig(params, flush, blocking); |
59 | 66 | } else { |
60 | 67 | throw ConfigError("Unrecognised log type"); |
61 | 68 | } |
— | — | @@ -94,11 +101,75 @@ |
95 | 102 | } |
96 | 103 | } |
97 | 104 | |
98 | | -void Udp2LogConfig::ProcessLine(char *buffer, size_t size) |
| 105 | +void Udp2LogConfig::ProcessBuffer(const char *data, size_t dataLength) |
99 | 106 | { |
100 | | - boost::ptr_vector<LogProcessor>::iterator i; |
101 | | - for (i = processors.begin(); i != processors.end(); i++) { |
102 | | - i->ProcessLine(buffer, size); |
| 107 | + boost::ptr_vector<LogProcessor>::iterator iter; |
| 108 | + int numPipes = 0; |
| 109 | + |
| 110 | + for (iter = processors.begin(); iter != processors.end(); iter++) { |
| 111 | + if (iter->IsUnsampledPipe() && iter->IsOpen()) { |
| 112 | + numPipes++; |
| 113 | + } |
103 | 114 | } |
| 115 | + |
| 116 | + if (numPipes >= 2) { |
| 117 | + // Efficiently send the packet to the unsampled pipes. |
| 118 | + // First load the data into a pipe buffer. |
| 119 | + // It just so happens that the size of the pipe buffer is the same |
| 120 | + // as the maximum size of a UDP packet, so this won't block. |
| 121 | + bufferPipe.writeEnd.Write(data, dataLength); |
| 122 | + |
| 123 | + // Now send it out to all unsampled pipes using the zero-copy tee() syscall. |
| 124 | + for (iter = processors.begin(); iter != processors.end(); iter++) { |
| 125 | + if (iter->IsUnsampledPipe() && iter->IsOpen()) { |
| 126 | + PipeProcessor & p = dynamic_cast<PipeProcessor&>(*iter); |
| 127 | + p.CopyFromPipe(bufferPipe.readEnd, dataLength); |
| 128 | + } |
| 129 | + } |
| 130 | + |
| 131 | + // Finally flush the pipe buffer by splicing it out to /dev/null |
| 132 | + bufferPipe.readEnd.Splice(Udp2LogConfig::devNull, dataLength, 0); |
| 133 | + } |
| 134 | + |
| 135 | + // For the sampled processors, split the buffer into lines |
| 136 | + const char *line1, *line2; |
| 137 | + ssize_t bytesRemaining; |
| 138 | + |
| 139 | + line1 = data; |
| 140 | + bytesRemaining = dataLength; |
| 141 | + while (bytesRemaining) { |
| 142 | + size_t lineLength; |
| 143 | + |
| 144 | + // Find the next line break |
| 145 | + line2 = (char*)memchr(line1, '\n', bytesRemaining); |
| 146 | + if (line2) { |
| 147 | + // advance line2 to the start of the next line |
| 148 | + line2++; |
| 149 | + lineLength = line2 - line1; |
| 150 | + } else { |
| 151 | + // no more lines, process the remainder of the buffer |
| 152 | + lineLength = bytesRemaining; |
| 153 | + } |
| 154 | + |
| 155 | + for (iter = processors.begin(); iter != processors.end(); iter++) { |
| 156 | + if (numPipes >= 2 && iter->IsUnsampledPipe()) { |
| 157 | + continue; |
| 158 | + } |
| 159 | + |
| 160 | + iter->ProcessLine(line1, lineLength); |
| 161 | + } |
| 162 | + bytesRemaining -= lineLength; |
| 163 | + line1 = line2; |
| 164 | + } |
104 | 165 | } |
105 | 166 | |
| 167 | +void Udp2LogConfig::PrintLossReport(std::ostream & os) |
| 168 | +{ |
| 169 | + boost::ptr_vector<LogProcessor>::iterator iter; |
| 170 | + for (iter = processors.begin(); iter != processors.end(); iter++) { |
| 171 | + uint64_t lost = iter->GetNumLost(); |
| 172 | + if (lost) { |
| 173 | + os << "Lost output chunks: " << lost << ": " << iter->GetName() << "\n"; |
| 174 | + } |
| 175 | + } |
| 176 | +} |
Index: trunk/udplog/udp2log/LogProcessor.h |
— | — | @@ -3,18 +3,34 @@ |
4 | 4 | |
5 | 5 | #include <fstream> |
6 | 6 | #include <sys/time.h> |
| 7 | +#include <string> |
| 8 | +#include "../srclib/Exception.h" |
7 | 9 | #include "../srclib/Socket.h" |
| 10 | +#include "../srclib/Pipe.h" |
8 | 11 | |
9 | 12 | class LogProcessor |
10 | 13 | { |
11 | 14 | public: |
12 | | - virtual void ProcessLine(char *buffer, size_t size) = 0; |
| 15 | + virtual void ProcessLine(const char *buffer, size_t size) = 0; |
13 | 16 | virtual void FixIfBroken() {} |
14 | 17 | virtual ~LogProcessor() {} |
| 18 | + virtual bool IsOpen() = 0; |
| 19 | + virtual std::string & GetName() = 0; |
15 | 20 | |
| 21 | + virtual int IsUnsampledPipe() { |
| 22 | + return false; |
| 23 | + } |
| 24 | + |
| 25 | + int GetFactor() const |
| 26 | + { |
| 27 | + return factor; |
| 28 | + } |
| 29 | + |
| 30 | + uint64_t GetNumLost() { return numLost; } |
| 31 | + |
16 | 32 | protected: |
17 | 33 | LogProcessor(int factor_, bool flush_) |
18 | | - : counter(0), factor(factor_), flush(flush_) |
| 34 | + : counter(0), factor(factor_), flush(flush_), numLost(0) |
19 | 35 | {} |
20 | 36 | |
21 | 37 | |
— | — | @@ -35,57 +51,80 @@ |
36 | 52 | int counter; |
37 | 53 | int factor; |
38 | 54 | bool flush; |
| 55 | + uint64_t numLost; |
39 | 56 | }; |
40 | 57 | |
41 | 58 | class FileProcessor : public LogProcessor |
42 | 59 | { |
43 | 60 | public: |
44 | 61 | static LogProcessor * NewFromConfig(char * params, bool flush); |
45 | | - virtual void ProcessLine(char *buffer, size_t size); |
| 62 | + virtual void ProcessLine(const char *buffer, size_t size); |
46 | 63 | |
47 | | - FileProcessor(char * filename, int factor_, bool flush_) |
| 64 | + FileProcessor(char * fileName_, int factor_, bool flush_) |
48 | 65 | : LogProcessor(factor_, flush_) |
49 | 66 | { |
50 | | - f.open(filename, std::ios::app | std::ios::out); |
| 67 | + fileName = fileName_; |
| 68 | + f.open(fileName_, std::ios::app | std::ios::out); |
51 | 69 | } |
52 | 70 | |
53 | | - bool IsOpen() { |
| 71 | + virtual bool IsOpen() { |
54 | 72 | return f.good(); |
55 | 73 | } |
56 | 74 | |
| 75 | + |
| 76 | + virtual std::string & GetName() { |
| 77 | + return fileName; |
| 78 | + } |
| 79 | + |
57 | 80 | std::ofstream f; |
| 81 | + std::string fileName; |
58 | 82 | }; |
59 | 83 | |
60 | 84 | class PipeProcessor : public LogProcessor |
61 | 85 | { |
62 | 86 | public: |
63 | | - static LogProcessor * NewFromConfig(char * params, bool flush); |
64 | | - virtual void ProcessLine(char *buffer, size_t size); |
| 87 | + static LogProcessor * NewFromConfig(char * params, bool flush, bool blocking); |
| 88 | + virtual void ProcessLine(const char *buffer, size_t size); |
65 | 89 | virtual void FixIfBroken(); |
66 | 90 | |
67 | | - PipeProcessor(char * command_, int factor_, bool flush_) |
68 | | - : LogProcessor(factor_, flush_) |
69 | | - { |
70 | | - command = strdup(command_); |
71 | | - f = popen(command, "w"); |
72 | | - } |
| 91 | + PipeProcessor(char * command_, int factor_, bool flush_, bool blocking_); |
73 | 92 | |
74 | 93 | ~PipeProcessor() |
75 | 94 | { |
76 | | - free(command); |
77 | | - if (f) { |
78 | | - pclose(f); |
79 | | - } |
| 95 | + Close(); |
80 | 96 | } |
81 | 97 | |
82 | | - bool IsOpen() |
| 98 | + virtual bool IsOpen() |
83 | 99 | { |
84 | | - return (bool)f; |
| 100 | + return (bool)child; |
85 | 101 | } |
86 | 102 | |
87 | | - FILE * f; |
88 | | - char * command; |
| 103 | + virtual int IsUnsampledPipe() { |
| 104 | + return factor == 1; |
| 105 | + } |
| 106 | + |
| 107 | + Pipe & GetPipe() { |
| 108 | + return pipes->writeEnd; |
| 109 | + } |
| 110 | + |
| 111 | + void CopyFromPipe(Pipe & source, size_t dataLength); |
| 112 | + virtual std::string & GetName() { return command; } |
| 113 | + |
| 114 | +protected: |
| 115 | + typedef boost::shared_ptr<PipePair> PipePairPointer; |
| 116 | + |
| 117 | + void Open(); |
| 118 | + void Close(); |
| 119 | + void HandleError(libc_error & e); |
| 120 | + |
| 121 | + std::string command; |
| 122 | + |
| 123 | + PipePairPointer pipes; |
| 124 | + pid_t child; |
| 125 | + bool blocking; |
| 126 | + |
89 | 127 | enum {RESTART_INTERVAL = 5}; |
| 128 | + |
90 | 129 | }; |
91 | 130 | |
92 | 131 | |
Index: trunk/udplog/udp2log/Udp2LogConfig.h |
— | — | @@ -4,8 +4,17 @@ |
5 | 5 | #include <string> |
6 | 6 | #include <boost/ptr_container/ptr_vector.hpp> |
7 | 7 | #include <stdexcept> |
| 8 | +#include <iostream> |
8 | 9 | #include "LogProcessor.h" |
| 10 | +#include "../srclib/PosixFile.h" |
| 11 | +#include "../srclib/Pipe.h" |
9 | 12 | |
| 13 | +class ConfigWriteCallback; |
| 14 | + |
| 15 | +/** |
| 16 | + * The configuration and current processing state for udp2log. |
| 17 | + * TODO: rename this to Udp2LogState |
| 18 | + */ |
10 | 19 | class Udp2LogConfig |
11 | 20 | { |
12 | 21 | public: |
— | — | @@ -14,12 +23,20 @@ |
15 | 24 | void Load(); |
16 | 25 | void FixBrokenProcessors(); |
17 | 26 | void Reload(); |
18 | | - void ProcessLine(char *buffer, size_t size); |
| 27 | + void ProcessBuffer(const char *data, size_t dataLength); |
| 28 | + void PrintLossReport(std::ostream & os); |
19 | 29 | |
| 30 | + inline ConfigWriteCallback GetWriteCallback(); |
| 31 | + |
| 32 | + bool reload; |
| 33 | + bool fixBrokenProcessors; |
| 34 | + |
| 35 | +protected: |
20 | 36 | std::string fileName; |
21 | 37 | boost::ptr_vector<LogProcessor> processors; |
22 | | - bool reload; |
23 | | - bool fixBrokenProcessors; |
| 38 | + PipePair bufferPipe; |
| 39 | + |
| 40 | + static PosixFile devNull; |
24 | 41 | }; |
25 | 42 | |
26 | 43 | class ConfigWatcher |
— | — | @@ -39,4 +56,23 @@ |
40 | 57 | {} |
41 | 58 | }; |
42 | 59 | |
| 60 | +class ConfigWriteCallback |
| 61 | +{ |
| 62 | +public: |
| 63 | + ConfigWriteCallback(Udp2LogConfig & config_) |
| 64 | + : config(config_) |
| 65 | + {} |
| 66 | + |
| 67 | + void operator()(const char* buffer, size_t bufSize) { |
| 68 | + return config.ProcessBuffer(buffer, bufSize); |
| 69 | + } |
| 70 | +protected: |
| 71 | + Udp2LogConfig & config; |
| 72 | +}; |
| 73 | + |
| 74 | +inline ConfigWriteCallback Udp2LogConfig::GetWriteCallback() { |
| 75 | + return ConfigWriteCallback(*this); |
| 76 | +} |
| 77 | + |
| 78 | + |
43 | 79 | #endif |
Index: trunk/udplog/udp2log/SendBuffer.h |
— | — | @@ -0,0 +1,43 @@ |
| 2 | +#ifndef UDPLOG_SENDBUFFER_H |
| 3 | +#define UDPLOG_SENDBUFFER_H |
| 4 | + |
| 5 | +template <class Callback> |
| 6 | +class SendBuffer { |
| 7 | +public: |
| 8 | + SendBuffer(size_t capacity_, Callback writeCallback_) |
| 9 | + : capacity(capacity_), size(0), writeCallback(writeCallback_) |
| 10 | + { |
| 11 | + data = new char[capacity]; |
| 12 | + } |
| 13 | + |
| 14 | + void Flush() { |
| 15 | + if (!size) { |
| 16 | + return; |
| 17 | + } |
| 18 | + writeCallback(data, size); |
| 19 | + size = 0; |
| 20 | + } |
| 21 | + |
| 22 | + void Write(const char* buffer, size_t bufSize); |
| 23 | +protected: |
| 24 | + char * data; |
| 25 | + size_t capacity, size; |
| 26 | + Callback writeCallback; |
| 27 | +}; |
| 28 | + |
| 29 | +template <class Callback> |
| 30 | +void SendBuffer<Callback>::Write(const char* buffer, size_t bufSize) |
| 31 | +{ |
| 32 | + if (bufSize > capacity) { |
| 33 | + Flush(); |
| 34 | + writeCallback(buffer, bufSize); |
| 35 | + } else if (size + bufSize > capacity) { |
| 36 | + Flush(); |
| 37 | + memcpy(data, buffer, bufSize); |
| 38 | + size = bufSize; |
| 39 | + } else { |
| 40 | + memcpy(data + size, buffer, bufSize); |
| 41 | + size += bufSize; |
| 42 | + } |
| 43 | +} |
| 44 | +#endif |
Property changes on: trunk/udplog/udp2log/SendBuffer.h |
___________________________________________________________________ |
Added: svn:eol-style |
1 | 45 | + native |
Index: trunk/udplog/udp2log/udp2log.cpp |
— | — | @@ -13,10 +13,12 @@ |
14 | 14 | #include <fcntl.h> |
15 | 15 | #include <pwd.h> |
16 | 16 | #include <cstdlib> |
| 17 | +#include <functional> |
17 | 18 | |
18 | 19 | #include "../srclib/Exception.h" |
19 | 20 | #include "../srclib/Socket.h" |
20 | 21 | #include "Udp2LogConfig.h" |
| 22 | +#include "SendBuffer.h" |
21 | 23 | |
22 | 24 | std::string configFileName("/etc/udp2log"); |
23 | 25 | std::string logFileName("/var/log/udp2log/udp2log.log"); |
— | — | @@ -26,7 +28,7 @@ |
27 | 29 | |
28 | 30 | Udp2LogConfig config; |
29 | 31 | |
30 | | -void OnHangup(int) |
| 32 | +void OnHangup(int) |
31 | 33 | { |
32 | 34 | config.reload = true; |
33 | 35 | } |
— | — | @@ -53,7 +55,7 @@ |
54 | 56 | |
55 | 57 | // Change user |
56 | 58 | if (setgid(userData->pw_gid) == -1 |
57 | | - || setuid(userData->pw_uid) == -1) |
| 59 | + || setuid(userData->pw_uid) == -1) |
58 | 60 | { |
59 | 61 | throw libc_error("Error changing user ID"); |
60 | 62 | } |
— | — | @@ -188,6 +190,7 @@ |
189 | 191 | cerr << "Unable to open socket\n"; |
190 | 192 | return 1; |
191 | 193 | } |
| 194 | + socket.SetDescriptorFlags(FD_CLOEXEC); |
192 | 195 | socket.Bind(saddr); |
193 | 196 | |
194 | 197 | // Join a multicast group if requested |
— | — | @@ -199,14 +202,18 @@ |
200 | 203 | return 1; |
201 | 204 | } |
202 | 205 | } |
| 206 | + |
203 | 207 | // Process received packets |
204 | 208 | boost::shared_ptr<SocketAddress> address; |
205 | 209 | const size_t bufSize = 65536; |
206 | | - char buffer[bufSize]; |
207 | | - char *line1, *line2; |
208 | | - ssize_t bytesRemaining, bytesRead; |
| 210 | + char receiveBuffer[bufSize]; |
| 211 | + ssize_t bytesRead; |
| 212 | + SendBuffer<ConfigWriteCallback> sendBuffer(PIPE_BUF, config.GetWriteCallback()); |
| 213 | + |
| 214 | + time_t lossReportTime = 0; |
| 215 | + |
209 | 216 | for (;;) { |
210 | | - bytesRead = socket.RecvFrom(buffer, bufSize, address); |
| 217 | + bytesRead = socket.RecvFrom(receiveBuffer, bufSize, address); |
211 | 218 | if (bytesRead <= 0) { |
212 | 219 | continue; |
213 | 220 | } |
— | — | @@ -219,24 +226,12 @@ |
220 | 227 | continue; |
221 | 228 | } |
222 | 229 | |
223 | | - // Split into lines and hand off to the processors |
224 | | - line1 = buffer; |
225 | | - bytesRemaining = bytesRead; |
226 | | - while (bytesRemaining) { |
227 | | - // Find the next line break |
228 | | - line2 = (char*)memchr(line1, '\n', bytesRemaining); |
229 | | - if (line2) { |
230 | | - // advance line2 to the start of the next line |
231 | | - line2++; |
232 | | - // Process the line |
233 | | - config.ProcessLine(line1, line2 - line1); |
234 | | - bytesRemaining -= line2 - line1; |
235 | | - } else { |
236 | | - // no more lines, process the remainder of the buffer |
237 | | - config.ProcessLine(line1, bytesRemaining); |
238 | | - bytesRemaining = 0; |
239 | | - } |
240 | | - line1 = line2; |
| 230 | + sendBuffer.Write(receiveBuffer, bytesRead); |
| 231 | + |
| 232 | + time_t currentTime = time(NULL); |
| 233 | + if (currentTime - lossReportTime > 60) { |
| 234 | + config.PrintLossReport(std::cout); |
| 235 | + lossReportTime = currentTime; |
241 | 236 | } |
242 | 237 | } |
243 | 238 | } |
Index: trunk/udplog/Makefile |
— | — | @@ -1,8 +1,9 @@ |
2 | 2 | TARGETS = log2udp udprecv delta udp2log/udp2log packet-loss |
3 | | -HOST_OBJS = srcmisc/host.o srclib/HostEntry.o srclib/IPAddress.o |
4 | | -LOG2UDP_OBJS = srcmisc/log2udp.o srclib/HostEntry.o srclib/IPAddress.o srclib/Socket.o srclib/SocketAddress.o |
5 | | -UDPRECV_OBJS = srcmisc/udprecv.o srclib/IPAddress.o srclib/Socket.o srclib/SocketAddress.o |
6 | | -UDP2LOG_OBJS = udp2log/udp2log.o udp2log/LogProcessor.o udp2log/Udp2LogConfig.o srclib/IPAddress.o srclib/Socket.o srclib/SocketAddress.o |
| 3 | +SRCLIB_OBJS = srclib/HostEntry.o srclib/FileDescriptor.o srclib/IPAddress.o srclib/Socket.o srclib/SocketAddress.o |
| 4 | +HOST_OBJS = srcmisc/host.o $(SRCLIB_OBJS) |
| 5 | +LOG2UDP_OBJS = srcmisc/log2udp.o $(SRCLIB_OBJS) |
| 6 | +UDPRECV_OBJS = srcmisc/udprecv.o $(SRCLIB_OBJS) |
| 7 | +UDP2LOG_OBJS = udp2log/udp2log.o udp2log/LogProcessor.o udp2log/Udp2LogConfig.o $(SRCLIB_OBJS) |
7 | 8 | CFLAGS:=$(CFLAGS) -Wall |
8 | 9 | |
9 | 10 | all: $(TARGETS) |
Index: trunk/udplog/srclib/FileDescriptor.cpp |
— | — | @@ -0,0 +1,22 @@ |
| 2 | +#include "FileDescriptor.h" |
| 3 | +#include "Exception.h" |
| 4 | + |
| 5 | +void FileDescriptor::RaiseError(const char* msg) |
| 6 | +{ |
| 7 | + if (ignoreErrors.size() == 0) { |
| 8 | + // Ignore none |
| 9 | + throw libc_error(msg); |
| 10 | + } |
| 11 | + |
| 12 | + std::set<int> * curIgnore = ignoreErrors.back().get(); |
| 13 | + if (!curIgnore) { |
| 14 | + // Ignore all |
| 15 | + return; |
| 16 | + } |
| 17 | + |
| 18 | + if (!curIgnore->count(errno)) { |
| 19 | + // Don't ignore this one |
| 20 | + throw libc_error(msg); |
| 21 | + } |
| 22 | +} |
| 23 | + |
Property changes on: trunk/udplog/srclib/FileDescriptor.cpp |
___________________________________________________________________ |
Added: svn:eol-style |
1 | 24 | + native |
Index: trunk/udplog/srclib/Pipe.h |
— | — | @@ -0,0 +1,63 @@ |
| 2 | +#ifndef UDPLOG_PIPE_H |
| 3 | +#define UDPLOG_PIPE_H |
| 4 | + |
| 5 | +#ifndef _GNU_SOURCE |
| 6 | +#define _GNU_SOURCE |
| 7 | +#endif |
| 8 | +#include <fcntl.h> |
| 9 | + |
| 10 | +class Pipe : public FileDescriptor |
| 11 | +{ |
| 12 | +public: |
| 13 | + friend class PipePair; |
| 14 | + |
| 15 | + ssize_t Tee(Pipe & output, size_t length, unsigned int flags) |
| 16 | + { |
| 17 | + ssize_t result = tee(fd, output.fd, length, flags); |
| 18 | + if (result == -1) { |
| 19 | + RaiseError("Pipe::Tee"); |
| 20 | + } |
| 21 | + return result; |
| 22 | + } |
| 23 | + |
| 24 | + ssize_t Splice(FileDescriptor & output, size_t length, unsigned int flags) |
| 25 | + { |
| 26 | + ssize_t result = splice(fd, NULL, output.GetFD(), NULL, length, flags); |
| 27 | + if (result == -1) { |
| 28 | + RaiseError("Pipe::Splice"); |
| 29 | + } |
| 30 | + return result; |
| 31 | + } |
| 32 | +}; |
| 33 | + |
| 34 | +class PipePair |
| 35 | +{ |
| 36 | +public: |
| 37 | + |
| 38 | + PipePair() |
| 39 | + { |
| 40 | + int pipefd[2]; |
| 41 | + |
| 42 | + if (pipe(pipefd) == -1) { |
| 43 | + readEnd.RaiseError("PipePair constructor"); |
| 44 | + good = false; |
| 45 | + } else { |
| 46 | + good = true; |
| 47 | + readEnd.fd = pipefd[0]; |
| 48 | + writeEnd.fd = pipefd[1]; |
| 49 | + readEnd.good = true; |
| 50 | + writeEnd.good = true; |
| 51 | + } |
| 52 | + } |
| 53 | + |
| 54 | + operator bool() { |
| 55 | + return good; |
| 56 | + } |
| 57 | + |
| 58 | + Pipe readEnd, writeEnd; |
| 59 | +protected: |
| 60 | + bool good; |
| 61 | +}; |
| 62 | + |
| 63 | + |
| 64 | +#endif |
Property changes on: trunk/udplog/srclib/Pipe.h |
___________________________________________________________________ |
Added: svn:eol-style |
1 | 65 | + native |
Index: trunk/udplog/srclib/Exception.h |
— | — | @@ -9,7 +9,8 @@ |
10 | 10 | class libc_error : public std::runtime_error { |
11 | 11 | public: |
12 | 12 | libc_error(const std::string & what_arg) |
13 | | - : std::runtime_error(std::string(what_arg).append(": ").append(std::strerror(errno))) |
| 13 | + : std::runtime_error(std::string(what_arg).append(": ").append(std::strerror(errno))), |
| 14 | + code(errno) |
14 | 15 | {} |
15 | 16 | |
16 | 17 | const char* what() { |
— | — | @@ -19,5 +20,6 @@ |
20 | 21 | virtual ~libc_error() throw() {} |
21 | 22 | |
22 | 23 | std::string msg; |
| 24 | + int code; |
23 | 25 | }; |
24 | 26 | #endif |
Index: trunk/udplog/srclib/FileDescriptor.h |
— | — | @@ -0,0 +1,136 @@ |
| 2 | +#ifndef UDPLOG_FILEDESCRIPTOR_H |
| 3 | +#define UDPLOG_FILEDESCRIPTOR_H |
| 4 | + |
| 5 | +#include <set> |
| 6 | +#include <vector> |
| 7 | +#include <cerrno> |
| 8 | +#include <boost/shared_ptr.hpp> |
| 9 | +#include <unistd.h> |
| 10 | +#include <fcntl.h> |
| 11 | + |
| 12 | +class Socket; |
| 13 | + |
| 14 | +class FileDescriptor |
| 15 | +{ |
| 16 | +private: |
| 17 | + // Not copyable |
| 18 | + FileDescriptor(const Socket & s) {} |
| 19 | + |
| 20 | +protected: |
| 21 | + FileDescriptor() |
| 22 | + : fd(-1), good(false), ownFd(true) |
| 23 | + {} |
| 24 | +public: |
| 25 | + |
| 26 | + virtual ~FileDescriptor() { |
| 27 | + Close(); |
| 28 | + } |
| 29 | + |
| 30 | + operator bool() { |
| 31 | + return good; |
| 32 | + } |
| 33 | + |
| 34 | + int GetFD() const { |
| 35 | + return fd; |
| 36 | + } |
| 37 | + |
| 38 | + // close() wrapper |
| 39 | + int Close() { |
| 40 | + int status = 0; |
| 41 | + if (fd != -1 && ownFd) { |
| 42 | + status = close(fd); |
| 43 | + fd = -1; |
| 44 | + good = false; |
| 45 | + if (status == -1) { |
| 46 | + RaiseError("FileDescriptor::Close"); |
| 47 | + } |
| 48 | + } |
| 49 | + return status; |
| 50 | + } |
| 51 | + |
| 52 | + // read() wrapper |
| 53 | + int Read(void *buf, size_t len) { |
| 54 | + ssize_t length = read(fd, buf, len); |
| 55 | + if (length == (ssize_t)-1) { |
| 56 | + RaiseError("FileDescriptor::Read"); |
| 57 | + } |
| 58 | + return length; |
| 59 | + } |
| 60 | + |
| 61 | + // write() wrapper |
| 62 | + int Write(const void *buf, size_t len) { |
| 63 | + ssize_t length = write(fd, buf, len); |
| 64 | + if (length == (ssize_t)-1) { |
| 65 | + RaiseError("FileDescriptor::Write"); |
| 66 | + } |
| 67 | + return length; |
| 68 | + } |
| 69 | + |
| 70 | + // fcntl(F_SETFD) wrapper |
| 71 | + int SetDescriptorFlags(long flags) { |
| 72 | + int status = fcntl(fd, F_SETFD, flags); |
| 73 | + if (status == -1) { |
| 74 | + RaiseError("FileDescriptor::SetDescriptorFlags"); |
| 75 | + } |
| 76 | + return status; |
| 77 | + } |
| 78 | + |
| 79 | + // fcntl(F_GETFD) wrapper |
| 80 | + long GetDescriptorFlags() { |
| 81 | + int flags = fcntl(fd, F_GETFD); |
| 82 | + if (flags == -1) { |
| 83 | + RaiseError("FileDescriptor::GetDescriptorFlags"); |
| 84 | + } |
| 85 | + return flags; |
| 86 | + } |
| 87 | + |
| 88 | + // fcntl(F_SETFL) wrapper |
| 89 | + int SetStatusFlags(long flags) { |
| 90 | + int status = fcntl(fd, F_SETFL, flags); |
| 91 | + if (status == -1) { |
| 92 | + RaiseError("FileDescriptor::SetStatusFlags"); |
| 93 | + } |
| 94 | + return status; |
| 95 | + } |
| 96 | + |
| 97 | + // fcntl(F_GETFL) wrapper |
| 98 | + long GetStatusFlags() { |
| 99 | + int flags = fcntl(fd, F_GETFL); |
| 100 | + if (flags == -1) { |
| 101 | + RaiseError("FileDescriptor::GetStatusFlags"); |
| 102 | + } |
| 103 | + return flags; |
| 104 | + } |
| 105 | + |
| 106 | + int Dup2(int newfd) { |
| 107 | + int status = dup2(fd, newfd); |
| 108 | + if (status == -1) { |
| 109 | + RaiseError("FileDescriptor::Dup2"); |
| 110 | + } |
| 111 | + return status; |
| 112 | + } |
| 113 | + |
| 114 | + // Ignore a given set of errors |
| 115 | + void Ignore(boost::shared_ptr<std::set<int> > s) { |
| 116 | + ignoreErrors.push_back(s); |
| 117 | + } |
| 118 | + |
| 119 | + // Ignore all errors |
| 120 | + void IgnoreAll() { |
| 121 | + ignoreErrors.push_back(boost::shared_ptr<std::set<int> >((std::set<int>*)NULL)); |
| 122 | + } |
| 123 | + |
| 124 | + // Restore the previous ignore set |
| 125 | + void PopIgnore() { |
| 126 | + ignoreErrors.pop_back(); |
| 127 | + } |
| 128 | + |
| 129 | + void RaiseError(const char* msg); |
| 130 | + |
| 131 | +protected: |
| 132 | + int fd; |
| 133 | + bool good; |
| 134 | + bool ownFd; |
| 135 | + std::vector<boost::shared_ptr<std::set<int> > > ignoreErrors; |
| 136 | +}; |
| 137 | +#endif |
Property changes on: trunk/udplog/srclib/FileDescriptor.h |
___________________________________________________________________ |
Added: svn:eol-style |
1 | 138 | + native |
Index: trunk/udplog/srclib/PosixFile.h |
— | — | @@ -0,0 +1,17 @@ |
| 2 | +#ifndef UDPLOG_POSIXFILE_H |
| 3 | +#define UDPLOG_POSIXFILE_H |
| 4 | + |
| 5 | +class PosixFile : public FileDescriptor |
| 6 | +{ |
| 7 | +public: |
| 8 | + PosixFile(const char* pathname, int flags, int mode = 0) { |
| 9 | + fd = open(pathname, flags, mode); |
| 10 | + if (fd == -1) { |
| 11 | + RaiseError("PosixFile constructor"); |
| 12 | + } else { |
| 13 | + good = true; |
| 14 | + } |
| 15 | + } |
| 16 | +}; |
| 17 | + |
| 18 | +#endif |
Property changes on: trunk/udplog/srclib/PosixFile.h |
___________________________________________________________________ |
Added: svn:eol-style |
1 | 19 | + native |
Index: trunk/udplog/srclib/Socket.cpp |
— | — | @@ -1,24 +1,5 @@ |
2 | 2 | #include "Socket.h" |
3 | 3 | |
4 | | -void Socket::RaiseError(const char* msg) |
5 | | -{ |
6 | | - if (ignoreErrors.size() == 0) { |
7 | | - // Ignore none |
8 | | - throw libc_error(msg); |
9 | | - } |
10 | | - |
11 | | - std::set<int> * curIgnore = ignoreErrors.back().get(); |
12 | | - if (!curIgnore) { |
13 | | - // Ignore all |
14 | | - return; |
15 | | - } |
16 | | - |
17 | | - if (!curIgnore->count(errno)) { |
18 | | - // Don't ignore this one |
19 | | - throw libc_error(msg); |
20 | | - } |
21 | | -} |
22 | | - |
23 | 4 | int UDPSocket::JoinMulticast(const IPAddress & addr) { |
24 | 5 | int level, optname; |
25 | 6 | void *optval; |
Index: trunk/udplog/srclib/Socket.h |
— | — | @@ -1,34 +1,22 @@ |
2 | 2 | #ifndef SOCKET_H______ |
3 | 3 | #define SOCKET_H______ |
4 | 4 | |
5 | | -#include <set> |
6 | | -#include <vector> |
7 | | -#include <boost/shared_ptr.hpp> |
8 | 5 | #include <sys/socket.h> |
9 | | -#include <cerrno> |
| 6 | +#include "FileDescriptor.h" |
10 | 7 | #include "SocketAddress.h" |
11 | 8 | #include "Exception.h" |
12 | 9 | |
13 | | - |
14 | 10 | // Socket base class |
15 | 11 | // Do not instantiate this directly |
16 | | -class Socket |
| 12 | +class Socket : public FileDescriptor |
17 | 13 | { |
18 | | -private: |
19 | | - // Not default constructible |
20 | | - Socket(){} |
21 | | - |
22 | | - // Not copyable |
23 | | - Socket(const Socket & s) {} |
24 | | - |
25 | 14 | protected: |
26 | 15 | Socket(int domain, int type, int protocol) |
27 | | - : fd(-1) |
| 16 | + : FileDescriptor() |
28 | 17 | { |
29 | 18 | fd = socket(domain, type, protocol); |
30 | 19 | if (fd == -1) { |
31 | 20 | RaiseError("Socket constructor"); |
32 | | - good = false; |
33 | 21 | } else { |
34 | 22 | good = true; |
35 | 23 | } |
— | — | @@ -36,10 +24,6 @@ |
37 | 25 | |
38 | 26 | public: |
39 | 27 | |
40 | | - operator bool() { |
41 | | - return good; |
42 | | - } |
43 | | - |
44 | 28 | int Connect(SocketAddress & s) { |
45 | 29 | if (connect(fd, s.GetBinaryData(), s.GetBinaryLength()) < 0) { |
46 | 30 | RaiseError("Socket::Connect"); |
— | — | @@ -122,28 +106,8 @@ |
123 | 107 | to = SocketAddress::NewFromBuffer(); |
124 | 108 | return length; |
125 | 109 | } |
126 | | - |
127 | | - // Ignore a given set of errors |
128 | | - void Ignore(boost::shared_ptr<std::set<int> > s) { |
129 | | - ignoreErrors.push_back(s); |
130 | | - } |
131 | | - |
132 | | - // Ignore all errors |
133 | | - void IgnoreAll() { |
134 | | - ignoreErrors.push_back(boost::shared_ptr<std::set<int> >((std::set<int>*)NULL)); |
135 | | - } |
136 | | - |
137 | | - // Restore the previous ignore set |
138 | | - void PopIgnore() { |
139 | | - ignoreErrors.pop_back(); |
140 | | - } |
141 | | - |
142 | | - void RaiseError(const char* msg); |
143 | 110 | protected: |
144 | | - int fd; |
145 | 111 | boost::shared_ptr<SocketAddress> peer; |
146 | | - bool good; |
147 | | - std::vector<boost::shared_ptr<std::set<int> > > ignoreErrors; |
148 | 112 | }; |
149 | 113 | |
150 | 114 | class UDPSocket : public Socket |
— | — | @@ -162,9 +126,6 @@ |
163 | 127 | } |
164 | 128 | |
165 | 129 | int JoinMulticast(const IPAddress & addr); |
166 | | -private: |
167 | | - int JoinMulticast4(const IPAddress & addr); |
168 | | - int JoinMulticast6(const IPAddress & addr); |
169 | 130 | }; |
170 | 131 | |
171 | 132 | #endif |