Index: trunk/udplog/udp2log/Block.h |
— | — | @@ -0,0 +1,178 @@ |
| 2 | +#ifndef UDPLOG_BLOCK_H |
| 3 | +#define UDPLOG_BLOCK_H |
| 4 | + |
| 5 | +#include <stdexcept> |
| 6 | +#include <boost/pool/pool.hpp> |
| 7 | +#include <boost/pool/object_pool.hpp> |
| 8 | + |
| 9 | +class BlockPool; |
| 10 | +class BlockMemory; |
| 11 | + |
| 12 | +class Block { // Reference-counted handle onto a window (offset, size) of a pooled BlockMemory buffer |
| 13 | +public: |
| 14 | + // Can't put the function definition here because BlockMemory is incomplete |
| 15 | + inline Block(BlockMemory * mem_); // Attach to mem_ with an empty view (offset 0, size 0) and count one reference |
| 16 | + inline Block(const Block & b); // Share b's memory and view, counting one more reference |
| 17 | + inline ~Block(); // Releases this handle's reference via Destroy() |
| 18 | + inline void Destroy(); // Drop one reference; when none remain the memory is handed back to its pool |
| 19 | + inline Block & operator=(const Block & b); // Release the current memory, then share b's memory and view |
| 20 | + |
| 21 | + // Return a new block pointing to the same memory, which is a substring of |
| 22 | + // *this, with a start position offset by the specified amount relative to |
| 23 | + // the start position of *this. |
| 24 | + inline Block Offset(size_t newOffset) const; |
| 25 | + |
| 26 | + inline char * GetData(); // First byte of this view (mem->data + offset) |
| 27 | + inline const char * GetData() const; |
| 28 | + inline size_t GetCapacity() const; // Bytes between offset and the end of the underlying buffer (0 if offset is past it) |
| 29 | + inline size_t GetSize() const; // Number of bytes currently held in this view |
| 30 | + inline void Append(const char * appendData, size_t appendSize); // Copy bytes after the current contents; throws std::runtime_error if they do not fit |
| 31 | + |
| 32 | + BlockMemory * mem; // Shared underlying buffer record; its numRefs counts the attached Blocks |
| 33 | + size_t offset; // Start of this view, in bytes from mem->data |
| 34 | + size_t size; // Length of this view's contents in bytes, counted from offset |
| 35 | +}; |
| 36 | + |
| 37 | +class BlockMemory { // Bookkeeping record for one pooled data buffer shared by any number of Block handles |
| 38 | +public: |
| 39 | + inline BlockMemory(BlockPool * pool_, char * data_, size_t size_); // Records the pool, buffer and buffer size; starts with zero references |
| 40 | + inline ~BlockMemory(); // Hands the data buffer back to the pool |
| 41 | + inline Block GetReference(); // Returns a new Block attached to this memory |
| 42 | + |
| 43 | + BlockPool * pool; // Pool that the buffer (and this record) is returned to |
| 44 | + char * data; // The data buffer itself; set to NULL by the destructor |
| 45 | + size_t size; // Size of the data buffer in bytes |
| 46 | + volatile int numRefs; // Number of attached Blocks. NOTE(review): volatile does not make ++/-- atomic -- confirm this is only used from one thread |
| 47 | +}; |
| 48 | + |
| 49 | + |
| 50 | +class BlockPool { // Hands out fixed-size data buffers wrapped in reference-counted Blocks |
| 51 | +public: |
| 52 | + BlockPool(size_t blockSize_) // blockSize_: size in bytes of every buffer this pool allocates |
| 53 | + : dataPool(blockSize_), blockSize(blockSize_), emptyBlock(New()) // members initialise in declaration order, so New() runs after both pools and blockSize are ready |
| 54 | + {} |
| 55 | + |
| 56 | + Block New() { // Allocate one buffer plus its BlockMemory record; throws std::bad_alloc if either allocation fails |
| 57 | + char * data = (char*)dataPool.malloc(); |
| 58 | + if (!data) { |
| 59 | + throw std::bad_alloc(); |
| 60 | + } |
| 61 | + BlockMemory * mem = objectPool.construct(this, data, blockSize); |
| 62 | + if (!mem) { |
| 63 | + dataPool.free(data); // no BlockMemory owns the buffer yet, so give it back before reporting failure |
| 64 | + throw std::bad_alloc(); |
| 65 | + } |
| 66 | + return mem->GetReference(); |
| 67 | + } |
| 68 | + |
| 69 | + Block GetEmptyBlock() { // Returns a copy of the block that was allocated at construction |
| 70 | + return emptyBlock; |
| 71 | + } |
| 72 | + |
| 73 | + void DeleteMemory(BlockMemory * mem) { // Destroy a BlockMemory record; its destructor releases the data buffer through DeleteData() |
| 74 | + objectPool.destroy(mem); |
| 75 | + } |
| 76 | + |
| 77 | + void DeleteData(char * data) { // Return a data buffer to the data pool |
| 78 | + dataPool.free(data); |
| 79 | + } |
| 80 | + |
| 81 | +protected: |
| 82 | + boost::pool<> dataPool; // Source of the raw blockSize-byte buffers |
| 83 | + boost::object_pool<BlockMemory> objectPool; // Source of the BlockMemory records |
| 84 | + size_t blockSize; |
| 85 | + Block emptyBlock; // Declared last so that it is constructed after, and destroyed before, the pools it uses |
| 86 | +}; |
| 87 | + |
| 88 | +//----------------------------------------------------------------------------- |
| 89 | +// Block |
| 90 | +//----------------------------------------------------------------------------- |
| 91 | + |
| 92 | +Block::Block(BlockMemory * mem_) // Attach to mem_ with an empty view and count this handle as a reference |
| 93 | + : mem(mem_), offset(0), size(0) |
| 94 | +{ |
| 95 | + ++mem_->numRefs; |
| 96 | +} |
| 97 | + |
| 98 | +Block::Block(const Block & b) // Copy b's view and count one more reference on the shared memory |
| 99 | + : mem(b.mem), offset(b.offset), size(b.size) |
| 100 | +{ |
| 101 | + ++b.mem->numRefs; |
| 102 | +} |
| 103 | + |
| 104 | +Block::~Block() { // Give up this handle's reference |
| 105 | + this->Destroy(); |
| 106 | +} |
| 107 | + |
| 108 | +void Block::Destroy() { // Drop one reference; the last one returns the memory to its pool and leaves mem NULL |
| 109 | + if (!mem) return; // already released by an earlier Destroy(), e.g. an explicit call followed by the destructor |
| 110 | + if (--mem->numRefs <= 0) { |
| 111 | + mem->pool->DeleteMemory(mem); |
| 112 | + mem = NULL; |
| 113 | + } |
| 114 | +} |
| 115 | + |
| 116 | +Block & Block::operator=(const Block & b) { // Make *this share b's memory and view; returns *this |
| 117 | + b.mem->numRefs++; // take the new reference first, so self-assignment cannot free the memory we are about to share |
| 118 | + Destroy(); |
| 119 | + mem = b.mem; |
| 120 | + offset = b.offset; |
| 121 | + size = b.size; |
| 122 | + return *this; |
| 123 | +} |
| 124 | + |
| 125 | +Block Block::Offset(size_t newOffset) const { // Sub-view of *this starting newOffset bytes in; throws std::runtime_error if newOffset exceeds the view |
| 126 | + if (newOffset > size) { // size is already measured from offset, so only the relative amount is compared |
| 127 | + throw std::runtime_error("Block::Offset buffer overrun"); |
| 128 | + } |
| 129 | + Block b(*this); |
| 130 | + b.offset += newOffset; |
| 131 | + b.size -= newOffset; |
| 132 | + return b; |
| 133 | +} |
| 134 | + |
| 135 | +char * Block::GetData() { // Writable pointer to the first byte of this view |
| 136 | + return &mem->data[offset]; |
| 137 | +} |
| 138 | + |
| 139 | +const char * Block::GetData() const { // Read-only pointer to the first byte of this view |
| 140 | + return &mem->data[offset]; |
| 141 | +} |
| 142 | + |
| 143 | +size_t Block::GetCapacity() const { // Bytes from this view's start to the end of the underlying buffer |
| 144 | + const size_t total = mem->size; |
| 145 | + if (offset <= total) { |
| 146 | + return total - offset; |
| 147 | + } |
| 148 | + return 0; // the view starts past the end of the buffer |
| 149 | +} |
| 150 | + |
| 151 | +size_t Block::GetSize() const { // Number of bytes currently held in this view |
| 152 | + return this->size; |
| 153 | +} |
| 154 | + |
| 155 | +void Block::Append(const char * appendData, size_t appendSize) { // Copy appendSize bytes after the current contents; throws std::runtime_error if they do not fit |
| 156 | + if (size > GetCapacity() || appendSize > GetCapacity() - size) { // same test as size + appendSize > capacity, but immune to size_t wrap-around |
| 157 | + throw std::runtime_error("Block::Append buffer overrun"); |
| 158 | + } |
| 159 | + memcpy(GetData() + size, appendData, appendSize); |
| 160 | + size += appendSize; |
| 161 | +} |
| 162 | + |
| 163 | +//----------------------------------------------------------------------------- |
| 164 | +// BlockMemory |
| 165 | +//----------------------------------------------------------------------------- |
| 166 | +BlockMemory::BlockMemory(BlockPool * pool_, char * data_, size_t size_) // Remember the owning pool and the buffer; no Block refers to it yet |
| 167 | + : pool(pool_), data(data_), |
| 168 | + size(size_), numRefs(0) {} |
| 169 | + |
| 170 | +BlockMemory::~BlockMemory() { // Return the data buffer to the pool it came from |
| 171 | + this->pool->DeleteData(this->data); |
| 172 | + this->data = NULL; |
| 173 | +} |
| 174 | + |
| 175 | +Block BlockMemory::GetReference() { // Build a Block attached to this memory; the Block constructor counts the reference |
| 176 | + return Block(this); |
| 177 | +} |
| 178 | + |
| 179 | +#endif |
Property changes on: trunk/udplog/udp2log/Block.h |
___________________________________________________________________ |
Added: svn:eol-style |
1 | 180 | + native |
Index: trunk/udplog/udp2log/LogProcessor.cpp |
— | — | @@ -4,14 +4,38 @@ |
5 | 5 | #include <linux/limits.h> |
6 | 6 | #include <sys/types.h> |
7 | 7 | #include <sys/wait.h> |
| 8 | +#include <sys/prctl.h> |
8 | 9 | #include "LogProcessor.h" |
9 | 10 | #include "Udp2LogConfig.h" |
10 | 11 | |
11 | 12 | //--------------------------------------------------------------------------- |
| 13 | +// LogProcessor |
| 14 | +//--------------------------------------------------------------------------- |
| 15 | +bool LogProcessor::IsActive(const PosixClock::Time & currentTime) { |
| 16 | + // Report whether this processor should be written to at currentTime. |
| 17 | + // It must be open and must not be on holiday. |
| 18 | + const bool open = IsOpen(); |
| 19 | + if (!open) { |
| 20 | + return false; |
| 21 | + } |
| 22 | + // The holiday lasts until the clock has moved strictly past holidayEndTime. |
| 23 | + const bool holidayOver = (currentTime > holidayEndTime); |
| 24 | + return holidayOver; |
| 25 | +} |
| 26 | + |
| 27 | +void LogProcessor::IncrementBytesLost(size_t bytes) { |
| 28 | + // Add to both the running total and the loss-rate average; a zero count touches neither. |
| 29 | + if (bytes == 0) return; |
| 30 | + bytesLost += bytes; |
| 31 | + lossRate.Increment(bytes); |
| 32 | +} |
| 33 | + |
| 34 | +//--------------------------------------------------------------------------- |
12 | 35 | // FileProcessor |
13 | 36 | //--------------------------------------------------------------------------- |
14 | 37 | |
15 | | -LogProcessor * FileProcessor::NewFromConfig(char * params, bool flush) |
| 38 | +boost::shared_ptr<LogProcessor> FileProcessor::NewFromConfig( |
| 39 | + Udp2LogConfig & config, int index, char * params, bool flush) |
16 | 40 | { |
17 | 41 | char * strFactor = strtok(params, " \t"); |
18 | 42 | if (strFactor == NULL) { |
— | — | @@ -26,37 +50,41 @@ |
27 | 51 | ); |
28 | 52 | } |
29 | 53 | char * filename = strtok(NULL, ""); |
30 | | - FileProcessor * fp = new FileProcessor(filename, factor, flush); |
| 54 | + FileProcessor * fp = new FileProcessor(config, index, filename, factor, flush); |
31 | 55 | if (!fp->IsOpen()) { |
32 | 56 | delete fp; |
33 | 57 | throw ConfigError("Unable to open file"); |
34 | 58 | } |
35 | 59 | std::cerr << "Opened log file " << filename << " with sampling factor " << factor << std::endl; |
36 | | - return (LogProcessor*)fp; |
| 60 | + return boost::shared_ptr<LogProcessor>(fp); |
37 | 61 | } |
38 | 62 | |
39 | | -void FileProcessor::ProcessLine(const char *buffer, size_t size) |
| 63 | +ssize_t FileProcessor::Write(const char *buffer, size_t size) |
40 | 64 | { |
41 | | - if (Sample()) { |
| 65 | + if (IsActive(config.GetCurrentTime())) { |
42 | 66 | f.write(buffer, size); |
43 | 67 | if (flush) { |
44 | 68 | f.flush(); |
45 | 69 | } |
46 | 70 | } |
| 71 | + return (ssize_t)size; |
47 | 72 | } |
48 | 73 | |
49 | 74 | //--------------------------------------------------------------------------- |
50 | 75 | // PipeProcessor |
51 | 76 | //--------------------------------------------------------------------------- |
52 | 77 | |
53 | | -PipeProcessor::PipeProcessor(char * command_, int factor_, bool flush_, bool blocking_) |
54 | | - : LogProcessor(factor_, flush_), child(0), blocking(blocking_) |
| 78 | +PipeProcessor::PipeProcessor(Udp2LogConfig & config_, int index_, |
| 79 | + char * command_, int factor_, bool flush_, bool blocking_) |
| 80 | + : LogProcessor(config_, index_, factor_, flush_), |
| 81 | + child(0), blocking(blocking_) |
55 | 82 | { |
56 | 83 | command = command_; |
57 | 84 | Open(); |
58 | 85 | } |
59 | 86 | |
60 | | -LogProcessor * PipeProcessor::NewFromConfig(char * params, bool flush, bool blocking) |
| 87 | +boost::shared_ptr<LogProcessor> PipeProcessor::NewFromConfig( |
| 88 | + Udp2LogConfig & config, int index, char * params, bool flush, bool blocking) |
61 | 89 | { |
62 | 90 | char * strFactor = strtok(params, " \t"); |
63 | 91 | if (strFactor == NULL) { |
— | — | @@ -71,20 +99,19 @@ |
72 | 100 | ); |
73 | 101 | } |
74 | 102 | char * command = strtok(NULL, ""); |
75 | | - PipeProcessor * pp = new PipeProcessor(command, factor, flush, blocking); |
| 103 | + PipeProcessor * pp = new PipeProcessor(config, index, command, factor, flush, blocking); |
76 | 104 | if (!pp->IsOpen()) { |
77 | 105 | delete pp; |
78 | 106 | throw ConfigError("Unable to open pipe"); |
79 | 107 | } |
80 | 108 | std::cerr << "Opened pipe with factor " << factor << ": " << command << std::endl; |
81 | | - return (LogProcessor*)pp; |
| 109 | + return boost::shared_ptr<LogProcessor>(pp); |
82 | 110 | } |
83 | 111 | |
84 | | -void PipeProcessor::HandleError(libc_error & e) |
| 112 | +void PipeProcessor::HandleError(libc_error & e, size_t bytes) |
85 | 113 | { |
86 | 114 | bool restart; |
87 | 115 | if (e.code == EAGAIN) { |
88 | | - numLost++; |
89 | 116 | restart = false; |
90 | 117 | } else if (e.code == EPIPE) { |
91 | 118 | std::cerr << "Pipe terminated, suspending output: " << command << std::endl; |
— | — | @@ -100,27 +127,23 @@ |
101 | 128 | } |
102 | 129 | } |
103 | 130 | |
104 | | -void PipeProcessor::ProcessLine(const char *buffer, size_t size) |
| 131 | +ssize_t PipeProcessor::Write(const char *buffer, size_t size) |
105 | 132 | { |
106 | | - if (!child) { |
107 | | - return; |
| 133 | + if (!IsActive(config.GetCurrentTime())) { |
| 134 | + IncrementBytesLost(size); |
| 135 | + return size; |
108 | 136 | } |
109 | 137 | |
| 138 | + ssize_t bytesWritten = 0; |
110 | 139 | |
111 | | - if (Sample()) { |
112 | | - try { |
113 | | - if (blocking && size > PIPE_BUF) { |
114 | | - // Write large packets in blocking mode to preserve data integrity |
115 | | - GetPipe().SetStatusFlags(0); |
116 | | - GetPipe().Write(buffer, size); |
117 | | - GetPipe().SetStatusFlags(O_NONBLOCK); |
118 | | - } else { |
119 | | - GetPipe().Write(buffer, size); |
120 | | - } |
121 | | - } catch (libc_error & e) { |
122 | | - HandleError(e); |
123 | | - } |
| 140 | + try { |
| 141 | + bytesWritten = GetPipe().Write(buffer, size); |
| 142 | + } catch (libc_error & e) { |
| 143 | + bytesWritten = 0; |
| 144 | + HandleError(e, size); |
124 | 145 | } |
| 146 | + IncrementBytesLost(size - bytesWritten); |
| 147 | + return bytesWritten; |
125 | 148 | } |
126 | 149 | |
127 | 150 | void PipeProcessor::FixIfBroken() |
— | — | @@ -141,8 +164,10 @@ |
142 | 165 | { |
143 | 166 | if (child) { |
144 | 167 | int status = 0; |
| 168 | + // Send HUP signal |
| 169 | + GetPipe().Close(); |
| 170 | + // Wait for it to respond |
145 | 171 | waitpid(child, &status, 0); |
146 | | - GetPipe().Close(); |
147 | 172 | child = 0; |
148 | 173 | } |
149 | 174 | } |
— | — | @@ -156,6 +181,7 @@ |
157 | 182 | child = fork(); |
158 | 183 | if (!child) { |
159 | 184 | // This is the child process |
| 185 | + prctl(PR_SET_PDEATHSIG, SIGTERM, 0, 0, 0); |
160 | 186 | pipes->writeEnd.Close(); |
161 | 187 | pipes->readEnd.Dup2(STDIN_FILENO); |
162 | 188 | pipes->readEnd.Close(); |
— | — | @@ -180,27 +206,27 @@ |
181 | 207 | } |
182 | 208 | } |
183 | 209 | |
184 | | -void PipeProcessor::CopyFromPipe(Pipe & source, size_t dataLength) |
| 210 | +ssize_t PipeProcessor::CopyFromPipe(Pipe & source, size_t size) |
185 | 211 | { |
186 | | - if (!child) { |
187 | | - return; |
| 212 | + if (!IsActive(config.GetCurrentTime())) { |
| 213 | + IncrementBytesLost(size); |
| 214 | + return size; |
188 | 215 | } |
189 | 216 | |
| 217 | + ssize_t bytesWritten = 0; |
190 | 218 | int flags = 0; |
191 | 219 | if (!blocking) { |
192 | 220 | flags |= SPLICE_F_NONBLOCK; |
193 | 221 | } |
194 | 222 | try { |
195 | | - if (blocking && dataLength > PIPE_BUF) { |
196 | | - // Write large packets in blocking mode to preserve data integrity |
197 | | - GetPipe().SetStatusFlags(0); |
198 | | - source.Tee(GetPipe(), dataLength, 0); |
199 | | - GetPipe().SetStatusFlags(O_NONBLOCK); |
200 | | - } else { |
201 | | - source.Tee(GetPipe(), dataLength, flags); |
202 | | - } |
| 223 | + bytesWritten = source.Tee(GetPipe(), size, flags); |
203 | 224 | } catch (libc_error & e) { |
204 | | - HandleError(e); |
| 225 | + bytesWritten = 0; |
| 226 | + HandleError(e, size); |
205 | 227 | } |
| 228 | + IncrementBytesLost(size - bytesWritten); |
| 229 | + return bytesWritten; |
206 | 230 | } |
207 | 231 | |
| 232 | + |
| 233 | + |
Index: trunk/udplog/udp2log/Udp2LogConfig.cpp |
— | — | @@ -1,14 +1,59 @@ |
2 | 2 | #include <fstream> |
3 | 3 | #include <cstring> |
4 | 4 | #include <sstream> |
| 5 | +#include <time.h> |
| 6 | +#include <cmath> |
5 | 7 | |
6 | 8 | #include "Udp2LogConfig.h" |
7 | 9 | |
8 | 10 | PosixFile Udp2LogConfig::devNull("/dev/null", O_WRONLY); |
9 | 11 | |
| 12 | +// UpdateCounters() should be called once every period of this length |
| 13 | +const double Udp2LogConfig::UPDATE_PERIOD = 1; |
| 14 | + |
| 15 | +// The packet rate counter uses an exponentially-weighted moving average filter. |
| 16 | +// This is the number of seconds corresponding to half of the contribution to |
| 17 | +// the result. |
| 18 | +const double Udp2LogConfig::RATE_PERIOD = 60; |
| 19 | + |
| 20 | +// Minimum loss ratio (lost / input; 0.05 = 5%) in a pipe before it can be considered to be a |
| 21 | +// measure of pipe capacity. Low loss implies that the maximum rate is |
| 22 | +// unknown, not that it equals the input rate. |
| 23 | +const double Udp2LogConfig::MIN_LOSS_RATIO = 0.05; |
| 24 | + |
| 25 | +// Minimum time a pipe can be put on holiday. This is used when a reliable |
| 26 | +// estimate of the pipe capacity is not available. |
| 27 | +const PosixClock::Time Udp2LogConfig::MIN_HOLIDAY_TIME(100e-6); |
| 28 | + |
| 29 | +// Maximum time a pipe can be put on holiday. The point of holiday is to avoid |
| 30 | +// regular backlog write operations, which adversely affect the global |
| 31 | +// throughput due to high CPU overhead. This rationale only makes sense as long |
| 32 | +// as the holiday time is only a few times higher than the write overhead. |
| 33 | +const PosixClock::Time Udp2LogConfig::MAX_HOLIDAY_TIME(1e-3); |
| 34 | + |
| 35 | +// The maximum input block size. On Linux 2.6.11 and later, this can be up |
| 36 | +// to 64KB. If it was more than 64KB, the temporary pipe buffer used for |
| 37 | +// tee() operations would overflow. In Linux 2.6.35 and later, |
| 38 | +// fcntl(F_SETPIPE_SZ) can be used to set the size of the temporary pipe |
| 39 | +// buffer, but currently we're not deploying to that version of Linux, so |
| 40 | +// it's unimplemented. |
| 41 | +const size_t Udp2LogConfig::BLOCK_SIZE = 65536; |
| 42 | + |
10 | 43 | Udp2LogConfig::Udp2LogConfig() |
11 | | - : reload(false) |
12 | | -{} |
| 44 | + : reload(false), |
| 45 | + pool(BLOCK_SIZE), |
| 46 | + packetRate(UPDATE_PERIOD, RATE_PERIOD), |
| 47 | + processRate(UPDATE_PERIOD, RATE_PERIOD), |
| 48 | + currentTimeValid(false), |
| 49 | + clock(CLOCK_REALTIME), |
| 50 | + numPipeProcessors(0), |
| 51 | + lineIndex(0) |
| 52 | +{ |
| 53 | + // Make the buffer pipe non-blocking |
| 54 | + // Throwing an exception and aborting is better than hanging forever |
| 55 | + bufferPipe.writeEnd.SetStatusFlags(O_NONBLOCK); |
| 56 | + bufferPipe.readEnd.SetStatusFlags(O_NONBLOCK); |
| 57 | +} |
13 | 58 | |
14 | 59 | void Udp2LogConfig::Open(const std::string & name) |
15 | 60 | { |
— | — | @@ -23,7 +68,7 @@ |
24 | 69 | char line[maxLineSize]; |
25 | 70 | char * type; |
26 | 71 | char * params; |
27 | | - boost::ptr_vector<LogProcessor> newProcessors; |
| 72 | + ProcessorArray newProcessors; |
28 | 73 | |
29 | 74 | ifstream f(fileName.c_str()); |
30 | 75 | if (!f.good()) { |
— | — | @@ -31,6 +76,7 @@ |
32 | 77 | } |
33 | 78 | |
34 | 79 | int lineNum = 1; |
| 80 | + int index = 0; |
35 | 81 | try { |
36 | 82 | // Parse all lines |
37 | 83 | for (f.getline(line, maxLineSize); f.good(); f.getline(line, maxLineSize), lineNum++) { |
— | — | @@ -43,7 +89,7 @@ |
44 | 90 | } |
45 | 91 | |
46 | 92 | params = strtok(NULL, ""); |
47 | | - LogProcessor * processor = NULL; |
| 93 | + ProcessorPointer processor; |
48 | 94 | bool flush = false, blocking = false; |
49 | 95 | |
50 | 96 | if (!strcmp(type, "flush")) { |
— | — | @@ -59,9 +105,9 @@ |
60 | 106 | |
61 | 107 | if (!strcmp(type, "file")) { |
62 | 108 | // TODO: support blocking for FileProcessor |
63 | | - processor = FileProcessor::NewFromConfig(params, flush); |
| 109 | + processor = FileProcessor::NewFromConfig(*this, index++, params, flush); |
64 | 110 | } else if (!strcmp(type, "pipe")) { |
65 | | - processor = PipeProcessor::NewFromConfig(params, flush, blocking); |
| 111 | + processor = PipeProcessor::NewFromConfig(*this, index++, params, flush, blocking); |
66 | 112 | } else { |
67 | 113 | throw ConfigError("Unrecognised log type"); |
68 | 114 | } |
— | — | @@ -71,21 +117,48 @@ |
72 | 118 | } |
73 | 119 | } |
74 | 120 | |
| 121 | + // Make a new epoll instance, deleting the old one |
| 122 | + epoll = MyEpollPointer(new MyEpoll); |
| 123 | + |
75 | 124 | // Swap in the new configuration |
76 | 125 | // The old configuration will go out of scope, closing files and pipes |
77 | 126 | processors.swap(newProcessors); |
| 127 | + |
78 | 128 | } catch (ConfigError & e) { |
79 | 129 | stringstream s; |
80 | 130 | s << "Error in configuration file on line " << lineNum << ": " << e.what(); |
81 | 131 | throw runtime_error(s.str().c_str()); |
82 | 132 | } |
| 133 | + |
| 134 | + // Initialise epoll and processorsByFactor |
| 135 | + processorsByFactor.clear(); |
| 136 | + numPipeProcessors = 0; |
| 137 | + for (unsigned i = 0; i < processors.size(); i++) { |
| 138 | + ProcessorPointer p = processors[i]; |
| 139 | + if (p->IsUnsampledPipe()) { |
| 140 | + AddUnsampledPipe(p); |
| 141 | + } |
| 142 | + processorsByFactor[p->GetFactor()].push_back(i); |
| 143 | + } |
| 144 | + |
| 145 | + epollEvents.resize(processors.size()); |
83 | 146 | } |
84 | 147 | |
| 148 | +void Udp2LogConfig::AddUnsampledPipe(ProcessorPointer lp) { // Register lp's pipe with epoll for EPOLLOUT and count it; throws std::runtime_error if lp is not a PipeProcessor |
| 149 | + boost::shared_ptr<PipeProcessor> pipeProc = |
| 150 | + boost::dynamic_pointer_cast<PipeProcessor, LogProcessor>(lp); |
| 151 | + if (pipeProc) { |
| 152 | + epoll->AddPoll(pipeProc->GetPipe(), EPOLLOUT, pipeProc); |
| 153 | + ++numPipeProcessors; |
| 154 | + return; |
| 155 | + } |
| 156 | + throw std::runtime_error("Udp2LogConfig::AddUnsampledPipe: only works on PipeProcessor for now"); |
| 157 | +} |
| 158 | + |
85 | 159 | void Udp2LogConfig::FixBrokenProcessors() |
86 | 160 | { |
87 | | - boost::ptr_vector<LogProcessor>::iterator i; |
88 | | - for (i = processors.begin(); i != processors.end(); i++) { |
89 | | - i->FixIfBroken(); |
| 161 | + for (ProcessorIterator i = processors.begin(); i != processors.end(); i++) { |
| 162 | + (**i).FixIfBroken(); |
90 | 163 | } |
91 | 164 | } |
92 | 165 | |
— | — | @@ -101,42 +174,43 @@ |
102 | 175 | } |
103 | 176 | } |
104 | 177 | |
105 | | -void Udp2LogConfig::ProcessBuffer(const char *data, size_t dataLength) |
| 178 | +void Udp2LogConfig::ProcessBlock(const Block & block) |
106 | 179 | { |
107 | | - boost::ptr_vector<LogProcessor>::iterator iter; |
| 180 | + ResetCurrentTime(); |
| 181 | + PosixClock::Time currentTime = GetCurrentTime(); |
| 182 | + |
| 183 | + // Handle any pending backlog buffers, from previous failed operations |
| 184 | + HandleBacklogs(); |
| 185 | + |
| 186 | + // Do an epoll to make sure all pipes are ready for input |
| 187 | + // If any aren't ready, put them on holiday |
| 188 | + CheckReadiness(); |
| 189 | + |
| 190 | + // Identify the pipes that can be written to with tee() |
| 191 | + ProcessorIterator iter; |
| 192 | + teePipes.assign(processors.size(), false); |
| 193 | + |
108 | 194 | int numPipes = 0; |
109 | | - |
110 | 195 | for (iter = processors.begin(); iter != processors.end(); iter++) { |
111 | | - if (iter->IsUnsampledPipe() && iter->IsOpen()) { |
| 196 | + if ((**iter).IsUnsampledPipe() && (**iter).IsActive(currentTime)) { |
| 197 | + teePipes.at(iter - processors.begin()) = true; |
112 | 198 | numPipes++; |
113 | 199 | } |
114 | 200 | } |
115 | 201 | |
| 202 | + // Handle the tee writes if there are enough to make it worthwhile |
| 203 | + bool teeDone = false; |
116 | 204 | if (numPipes >= 2) { |
117 | | - // Efficiently send the packet to the unsampled pipes. |
118 | | - // First load the data into a pipe buffer. |
119 | | - // It just so happens that the size of the pipe buffer is the same |
120 | | - // as the maximum size of a UDP packet, so this won't block. |
121 | | - bufferPipe.writeEnd.Write(data, dataLength); |
122 | | - |
123 | | - // Now send it out to all unsampled pipes using the zero-copy tee() syscall. |
124 | | - for (iter = processors.begin(); iter != processors.end(); iter++) { |
125 | | - if (iter->IsUnsampledPipe() && iter->IsOpen()) { |
126 | | - PipeProcessor & p = dynamic_cast<PipeProcessor&>(*iter); |
127 | | - p.CopyFromPipe(bufferPipe.readEnd, dataLength); |
128 | | - } |
129 | | - } |
130 | | - |
131 | | - // Finally flush the pipe buffer by splicing it out to /dev/null |
132 | | - bufferPipe.readEnd.Splice(Udp2LogConfig::devNull, dataLength, 0); |
| 205 | + HandleTeePipes(block); |
| 206 | + teeDone = true; |
133 | 207 | } |
134 | 208 | |
135 | 209 | // For the sampled processors, split the buffer into lines |
136 | 210 | const char *line1, *line2; |
137 | 211 | ssize_t bytesRemaining; |
138 | 212 | |
139 | | - line1 = data; |
140 | | - bytesRemaining = dataLength; |
| 213 | + line1 = block.GetData(); |
| 214 | + bytesRemaining = block.GetSize(); |
141 | 215 | while (bytesRemaining) { |
142 | 216 | size_t lineLength; |
143 | 217 | |
— | — | @@ -151,25 +225,156 @@ |
152 | 226 | lineLength = bytesRemaining; |
153 | 227 | } |
154 | 228 | |
155 | | - for (iter = processors.begin(); iter != processors.end(); iter++) { |
156 | | - if (numPipes >= 2 && iter->IsUnsampledPipe()) { |
| 229 | + for (ByFactorIterator bfi = processorsByFactor.begin(); |
| 230 | + bfi != processorsByFactor.end(); bfi++) |
| 231 | + { |
| 232 | + if (lineIndex % bfi->first != 0) { |
157 | 233 | continue; |
158 | 234 | } |
| 235 | + for (std::vector<int>::iterator bfi2 = bfi->second.begin(); |
| 236 | + bfi2 != bfi->second.end(); bfi2++) |
| 237 | + { |
| 238 | + if (teeDone && teePipes.at(*bfi2)) { |
| 239 | + continue; |
| 240 | + } |
| 241 | + LogProcessor & p = *processors[*bfi2]; |
| 242 | + ssize_t bytesWritten = p.Write(line1, lineLength); |
| 243 | + if (bytesWritten != (ssize_t)lineLength) { |
| 244 | + // Partial write done |
| 245 | + // Make a backlog entry |
| 246 | + Block block = pool.New(); |
| 247 | + block.Append(line1 + bytesWritten, lineLength - bytesWritten); |
| 248 | + p.SetBacklog(block); |
| 249 | + PutOnHoliday(*processors[*bfi2]); |
| 250 | + } |
| 251 | + } |
| 252 | + } |
159 | 253 | |
160 | | - iter->ProcessLine(line1, lineLength); |
161 | | - } |
162 | 254 | bytesRemaining -= lineLength; |
163 | 255 | line1 = line2; |
| 256 | + lineIndex++; |
164 | 257 | } |
165 | 258 | } |
166 | 259 | |
167 | | -void Udp2LogConfig::PrintLossReport(std::ostream & os) |
| 260 | +void Udp2LogConfig::HandleBacklogs() { // Retry the pending backlog block of every processor that has one |
| 261 | + for (ProcessorIterator it = processors.begin(); it != processors.end(); it++) { |
| 262 | + LogProcessor & proc = **it; |
| 263 | + if (!proc.IsBacklogged()) continue; |
| 264 | + Block & pending = proc.GetBacklogBlock(); |
| 265 | + const ssize_t written = proc.Write(pending.GetData(), pending.GetSize()); |
| 266 | + if (written == (ssize_t)pending.GetSize()) { |
| 267 | + proc.ClearBacklog(); |
| 268 | + continue; |
| 269 | + } |
| 270 | + // Not fully written: back the processor off and keep only the unwritten tail. |
| 271 | + PutOnHoliday(proc); |
| 272 | + if (written != 0) { |
| 273 | + proc.SetBacklog(pending.Offset(written)); |
| 274 | + } |
| 275 | + } |
| 276 | +} |
| 277 | + |
| 278 | +void Udp2LogConfig::CheckReadiness() { // Poll the registered pipes and put on holiday every one that did not report EPOLLOUT |
| 279 | + epoll->EpollWait(&epollEvents, 0); |
| 280 | + if (epollEvents.size() != numPipeProcessors) { |
| 281 | + // readyPipes is indexed by processor index, which ranges over ALL processors, so it must be sized by processors.size() rather than by the pipe count |
| 282 | + readyPipes.assign(processors.size(), false); |
| 283 | + for (MyEventsIterator iter = epollEvents.begin(); iter != epollEvents.end(); iter++) { |
| 284 | + if (iter->first & EPOLLOUT) { |
| 285 | + readyPipes.at(iter->second->GetProcessorIndex()) = true; |
| 286 | + } |
| 287 | + } |
| 288 | + |
| 289 | + // Only unsampled pipes are registered with epoll, so only they can be judged "not ready" |
| 290 | + for (unsigned i = 0; i < processors.size(); i++) { |
| 291 | + if (processors[i]->IsUnsampledPipe() && !readyPipes[i]) { |
| 292 | + PutOnHoliday(*processors[i]); |
| 293 | + } |
| 294 | + } |
| 295 | + } |
| 296 | +} |
| 297 | + |
| 298 | +void Udp2LogConfig::HandleTeePipes(const Block & block) { |
| 299 | + // Fan one block out to every processor flagged in teePipes. |
| 300 | + // Step 1: stage the block's bytes in the shared buffer pipe. |
| 301 | + const size_t blockBytes = block.GetSize(); |
| 302 | + bufferPipe.writeEnd.Write(block.GetData(), blockBytes); |
| 303 | + |
| 304 | + // Step 2: duplicate the staged bytes into each flagged pipe via CopyFromPipe(). |
| 305 | + for (size_t i = 0; i < processors.size(); i++) { |
| 306 | + if (teePipes.at(i)) { |
| 307 | + PipeProcessor * pipeProc = dynamic_cast<PipeProcessor*>(processors[i].get()); |
| 308 | + const ssize_t copied = pipeProc->CopyFromPipe(bufferPipe.readEnd, blockBytes); |
| 309 | + if (copied != (ssize_t)blockBytes) { |
| 310 | + // Short copy: remember the part that was not delivered as |
| 311 | + // the processor's backlog ... |
| 312 | + pipeProc->SetBacklog(block.Offset(copied)); |
| 313 | + // ... and make the processor sit out for a while. |
| 314 | + PutOnHoliday(*processors[i]); |
| 315 | + } |
| 316 | + } |
| 317 | + } |
| 318 | + |
| 319 | + // Step 3: empty the buffer pipe again by splicing the staged bytes |
| 320 | + // out to /dev/null. |
| 321 | + bufferPipe.readEnd.Splice(Udp2LogConfig::devNull, blockBytes, 0); |
| 322 | +} |
| 323 | + |
| 324 | +void Udp2LogConfig::PutOnHoliday(LogProcessor & p) |
168 | 325 | { |
169 | | - boost::ptr_vector<LogProcessor>::iterator iter; |
| 326 | + PosixClock::Time endTime = GetCurrentTime(); // the holiday ends at the current time plus the interval chosen below |
| 327 | + int lossRate = p.GetLossRate(); |
| 328 | + int inputRate = GetProcessRate(); |
| 329 | + int writeRate = inputRate - lossRate; // what the processor actually accepted: input minus loss |
| 330 | + // NOTE(review): MIN_LOSS_RATIO is documented as the loss needed before writeRate is a usable capacity estimate, yet the test below picks the minimum when loss is ABOVE it -- confirm the direction |
| 331 | + PosixClock::Time interval; |
| 332 | + // A non-positive writeRate (e.g. no input measured yet) cannot be divided by, so it is treated as "no estimate" |
| 333 | + if (writeRate <= 0 || inputRate * MIN_LOSS_RATIO < lossRate) { |
| 334 | + interval = MIN_HOLIDAY_TIME; |
| 335 | + } else { |
| 336 | + interval = PosixClock::Time((double)BLOCK_SIZE / writeRate); // time to write one full block at writeRate |
| 337 | + if (interval > MAX_HOLIDAY_TIME) { |
| 338 | + interval = MAX_HOLIDAY_TIME; |
| 339 | + } else if (interval < MIN_HOLIDAY_TIME) { |
| 340 | + interval = MIN_HOLIDAY_TIME; |
| 341 | + } |
| 342 | + } |
| 343 | + |
| 344 | + endTime += interval; |
| 345 | + p.SetHoliday(endTime); |
| 346 | + //std::cout << "Putting processor on holiday for " << (double)interval << " seconds: " << |
| 347 | + // p.GetProcessorIndex() << "\n"; |
| 348 | +} |
| 349 | + |
| 350 | +void Udp2LogConfig::UpdateCounters() // Advance the packet and process rate averages and every processor's loss-rate average |
| 351 | +{ |
| 352 | + packetRate.Update(); |
| 353 | + processRate.Update(); |
| 354 | + for (unsigned i = 0; i < processors.size(); i++) { |
| 355 | + processors[i]->UpdateLossRate(); |
| 356 | + } |
| 357 | +} |
| 358 | + |
| 359 | +void Udp2LogConfig::PrintStatusReport(std::ostream & os) |
| 360 | +{ |
| 361 | + time_t unixTime = time(NULL); |
| 362 | + char timeBuf[30]; |
| 363 | + struct tm * tm = localtime(&unixTime); |
| 364 | + strftime(timeBuf, sizeof(timeBuf), "%F %T UTC%z", tm); |
| 365 | + |
| 366 | + ProcessorIterator iter; |
170 | 367 | for (iter = processors.begin(); iter != processors.end(); iter++) { |
171 | | - uint64_t lost = iter->GetNumLost(); |
| 368 | + uint64_t lost = (**iter).GetBytesLost(); |
172 | 369 | if (lost) { |
173 | | - os << "Lost output chunks: " << lost << ": " << iter->GetName() << "\n"; |
| 370 | + os << "[" << timeBuf << "] Lost output bytes: " << lost << ": " << |
| 371 | + (**iter).GetName() << "\n"; |
| 372 | + (**iter).ResetBytesLost(); |
174 | 373 | } |
175 | 374 | } |
| 375 | + |
| 376 | + os.precision(3); |
| 377 | + os.flags(std::ios::fixed); |
| 378 | + os << "[" << timeBuf << "] Packet rate: " << ((double)GetPacketRate() / 1000) << " k/s\n"; |
176 | 379 | } |
| 380 | + |
| 381 | + |
Index: trunk/udplog/udp2log/LogProcessor.h |
— | — | @@ -4,14 +4,18 @@ |
5 | 5 | #include <fstream> |
6 | 6 | #include <sys/time.h> |
7 | 7 | #include <string> |
| 8 | +#include <boost/shared_ptr.hpp> |
8 | 9 | #include "../srclib/Exception.h" |
9 | 10 | #include "../srclib/Socket.h" |
10 | 11 | #include "../srclib/Pipe.h" |
| 12 | +#include "../srclib/PosixClock.h" |
| 13 | +#include "RateAverage.h" |
| 14 | +#include "Udp2LogConfig.h" |
11 | 15 | |
12 | 16 | class LogProcessor |
13 | 17 | { |
14 | 18 | public: |
15 | | - virtual void ProcessLine(const char *buffer, size_t size) = 0; |
| 19 | + virtual ssize_t Write(const char *buffer, size_t size) = 0; |
16 | 20 | virtual void FixIfBroken() {} |
17 | 21 | virtual ~LogProcessor() {} |
18 | 22 | virtual bool IsOpen() = 0; |
— | — | @@ -26,14 +30,55 @@ |
27 | 31 | return factor; |
28 | 32 | } |
29 | 33 | |
30 | | - uint64_t GetNumLost() { return numLost; } |
| 34 | + uint64_t GetBytesLost() { return bytesLost; } |
31 | 35 | |
| 36 | + void ResetBytesLost() { |
| 37 | + bytesLost = 0; |
| 38 | + } |
| 39 | + |
| 40 | + void UpdateLossRate() { |
| 41 | + lossRate.Update(); |
| 42 | + } |
| 43 | + |
| 44 | + int GetLossRate() { |
| 45 | + return (int)lossRate.GetRate(); |
| 46 | + } |
| 47 | + |
| 48 | + void SetHoliday(const PosixClock::Time & endTime) { |
| 49 | + holidayEndTime = endTime; |
| 50 | + } |
| 51 | + |
| 52 | + bool IsBacklogged() { |
| 53 | + return (bool)backlog.GetSize(); |
| 54 | + } |
| 55 | + |
| 56 | + Block & GetBacklogBlock() { |
| 57 | + return backlog; |
| 58 | + } |
| 59 | + |
| 60 | + void ClearBacklog() { |
| 61 | + backlog = config.GetPool().GetEmptyBlock(); |
| 62 | + } |
| 63 | + |
| 64 | + void SetBacklog(const Block & block) { |
| 65 | + backlog = block; |
| 66 | + } |
| 67 | + |
| 68 | + int GetProcessorIndex() { |
| 69 | + return index; |
| 70 | + } |
| 71 | + |
| 72 | + bool IsActive(const PosixClock::Time & currentTime); |
| 73 | + void IncrementBytesLost(size_t bytes); |
| 74 | + |
32 | 75 | protected: |
33 | | - LogProcessor(int factor_, bool flush_) |
34 | | - : counter(0), factor(factor_), flush(flush_), numLost(0) |
| 76 | + LogProcessor(Udp2LogConfig & config_, int index_, int factor_, bool flush_) |
| 77 | + : config(config_), index(index_), counter(0), |
| 78 | + factor(factor_), flush(flush_), bytesLost(0), |
| 79 | + lossRate(Udp2LogConfig::UPDATE_PERIOD, Udp2LogConfig::RATE_PERIOD), |
| 80 | + backlog(config_.GetPool().GetEmptyBlock()) |
35 | 81 | {} |
36 | 82 | |
37 | | - |
38 | 83 | bool Sample() { |
39 | 84 | if (factor != 1) { |
40 | 85 | counter++; |
— | — | @@ -48,20 +93,27 @@ |
49 | 94 | } |
50 | 95 | } |
51 | 96 | |
| 97 | + Udp2LogConfig & config; |
| 98 | + int index; |
52 | 99 | int counter; |
53 | 100 | int factor; |
54 | 101 | bool flush; |
55 | | - uint64_t numLost; |
| 102 | + uint64_t bytesLost; |
| 103 | + PosixClock::Time holidayEndTime; |
| 104 | + RateAverage lossRate; |
| 105 | + Block backlog; |
56 | 106 | }; |
57 | 107 | |
58 | 108 | class FileProcessor : public LogProcessor |
59 | 109 | { |
60 | 110 | public: |
61 | | - static LogProcessor * NewFromConfig(char * params, bool flush); |
62 | | - virtual void ProcessLine(const char *buffer, size_t size); |
| 111 | + static boost::shared_ptr<LogProcessor> NewFromConfig( |
| 112 | + Udp2LogConfig & config, int index, char * params, bool flush); |
| 113 | + virtual ssize_t Write(const char *buffer, size_t size); |
63 | 114 | |
64 | | - FileProcessor(char * fileName_, int factor_, bool flush_) |
65 | | - : LogProcessor(factor_, flush_) |
| 115 | + FileProcessor(Udp2LogConfig & config_, int index_, char * fileName_, |
| 116 | + int factor_, bool flush_) |
| 117 | + : LogProcessor(config_, index_, factor_, flush_) |
66 | 118 | { |
67 | 119 | fileName = fileName_; |
68 | 120 | f.open(fileName_, std::ios::app | std::ios::out); |
— | — | @@ -83,11 +135,13 @@ |
84 | 136 | class PipeProcessor : public LogProcessor |
85 | 137 | { |
86 | 138 | public: |
87 | | - static LogProcessor * NewFromConfig(char * params, bool flush, bool blocking); |
88 | | - virtual void ProcessLine(const char *buffer, size_t size); |
| 139 | + static boost::shared_ptr<LogProcessor> NewFromConfig( |
| 140 | + Udp2LogConfig & config, int index, char * params, bool flush, bool blocking); |
| 141 | + virtual ssize_t Write(const char *buffer, size_t size); |
89 | 142 | virtual void FixIfBroken(); |
90 | 143 | |
91 | | - PipeProcessor(char * command_, int factor_, bool flush_, bool blocking_); |
| 144 | + PipeProcessor(Udp2LogConfig & config_, int index_, char * command_, int factor_, |
| 145 | + bool flush_, bool blocking_); |
92 | 146 | |
93 | 147 | ~PipeProcessor() |
94 | 148 | { |
— | — | @@ -107,7 +161,7 @@ |
108 | 162 | return pipes->writeEnd; |
109 | 163 | } |
110 | 164 | |
111 | | - void CopyFromPipe(Pipe & source, size_t dataLength); |
| 165 | + ssize_t CopyFromPipe(Pipe & source, size_t dataLength); |
112 | 166 | virtual std::string & GetName() { return command; } |
113 | 167 | |
114 | 168 | protected: |
— | — | @@ -115,7 +169,7 @@ |
116 | 170 | |
117 | 171 | void Open(); |
118 | 172 | void Close(); |
119 | | - void HandleError(libc_error & e); |
| 173 | + void HandleError(libc_error & e, size_t bytes); |
120 | 174 | |
121 | 175 | std::string command; |
122 | 176 | |
— | — | @@ -127,5 +181,4 @@ |
128 | 182 | |
129 | 183 | }; |
130 | 184 | |
131 | | - |
132 | 185 | #endif |
Index: trunk/udplog/udp2log/Udp2LogConfig.h |
— | — | @@ -2,14 +2,19 @@ |
3 | 3 | #define UDP2LOGCONFIG_H |
4 | 4 | |
5 | 5 | #include <string> |
6 | | -#include <boost/ptr_container/ptr_vector.hpp> |
7 | 6 | #include <stdexcept> |
8 | 7 | #include <iostream> |
9 | | -#include "LogProcessor.h" |
| 8 | +#include <vector> |
10 | 9 | #include "../srclib/PosixFile.h" |
11 | 10 | #include "../srclib/Pipe.h" |
| 11 | +#include "../srclib/PosixClock.h" |
| 12 | +#include "RateAverage.h" |
| 13 | +#include "Block.h" |
| 14 | +#include "../srclib/TypedEpollInstance.h" |
12 | 15 | |
13 | 16 | class ConfigWriteCallback; |
| 17 | +class LogProcessor; |
| 18 | +class PipeProcessor; |
14 | 19 | |
15 | 20 | /** |
16 | 21 | * The configuration and current processing state for udp2log. |
— | — | @@ -18,25 +23,108 @@ |
19 | 24 | class Udp2LogConfig |
20 | 25 | { |
21 | 26 | public: |
| 27 | + typedef boost::shared_ptr<LogProcessor> ProcessorPointer; |
| 28 | + |
22 | 29 | Udp2LogConfig(); |
23 | 30 | void Open(const std::string & name); |
24 | 31 | void Load(); |
| 32 | + void AddUnsampledPipe(ProcessorPointer p); |
25 | 33 | void FixBrokenProcessors(); |
26 | 34 | void Reload(); |
27 | | - void ProcessBuffer(const char *data, size_t dataLength); |
28 | | - void PrintLossReport(std::ostream & os); |
| 35 | + void ProcessBlock(const Block & block); |
| 36 | + void HandleBacklogs(); |
| 37 | + void CheckReadiness(); |
| 38 | + void HandleTeePipes(const Block & block); |
| 39 | + void PutOnHoliday(LogProcessor & p); |
| 40 | + void UpdateCounters(); |
| 41 | + void PrintStatusReport(std::ostream & os); |
29 | 42 | |
| 43 | + void IncrementPacketCount() { |
| 44 | + packetRate.Increment(); |
| 45 | + } |
| 46 | + |
| 47 | + void IncrementProcessBytes(int bytes) { |
| 48 | + processRate.Increment(bytes); |
| 49 | + } |
| 50 | + |
| 51 | + int GetPacketRate() { |
| 52 | + return (int)packetRate.GetRate(); |
| 53 | + } |
| 54 | + |
| 55 | + int GetProcessRate() { |
| 56 | + return (int)processRate.GetRate(); |
| 57 | + } |
| 58 | + |
30 | 59 | inline ConfigWriteCallback GetWriteCallback(); |
31 | 60 | |
| 61 | + void ResetCurrentTime() { |
| 62 | + currentTimeValid = false; |
| 63 | + } |
| 64 | + |
| 65 | + const PosixClock::Time & GetCurrentTime() { |
| 66 | + if (!currentTimeValid) { |
| 67 | + currentTime = clock.Get(); |
| 68 | + currentTimeValid = true; |
| 69 | + } |
| 70 | + return currentTime; |
| 71 | + } |
| 72 | + |
| 73 | + BlockPool & GetPool() { |
| 74 | + return pool; |
| 75 | + } |
| 76 | + |
32 | 77 | bool reload; |
33 | 78 | bool fixBrokenProcessors; |
34 | 79 | |
| 80 | + static const double UPDATE_PERIOD; |
| 81 | + static const double RATE_PERIOD; |
| 82 | + static const double MIN_LOSS_RATIO; |
| 83 | + static const PosixClock::Time MIN_HOLIDAY_TIME; |
| 84 | + static const PosixClock::Time MAX_HOLIDAY_TIME; |
| 85 | + static const size_t BLOCK_SIZE; |
| 86 | + |
35 | 87 | protected: |
| 88 | + // The storage for blocks |
| 89 | + // This declaration has to be above the declaration of any member objects |
| 90 | + // that hold Block references, such as LogProcessor, so that destruction of |
| 91 | + // the Block references occurs before the destruction of the pool. |
| 92 | + BlockPool pool; |
| 93 | + |
36 | 94 | std::string fileName; |
37 | | - boost::ptr_vector<LogProcessor> processors; |
| 95 | + |
| 96 | + typedef std::vector<boost::shared_ptr<LogProcessor> > ProcessorArray; |
| 97 | + typedef ProcessorArray::iterator ProcessorIterator; |
| 98 | + ProcessorArray processors; |
| 99 | + |
38 | 100 | PipePair bufferPipe; |
39 | 101 | |
40 | 102 | static PosixFile devNull; |
| 103 | + RateAverage packetRate, processRate; |
| 104 | + |
| 105 | + typedef TypedEpollInstance<PipeProcessor> MyEpoll; |
| 106 | + typedef boost::shared_ptr<MyEpoll> MyEpollPointer; |
| 107 | + MyEpollPointer epoll; |
| 108 | + |
| 109 | + typedef MyEpoll::TypedEventArray MyEpollEvents; |
| 110 | + typedef MyEpollEvents::iterator MyEventsIterator; |
| 111 | + // This is a temporary vector used to hold the result of an epoll_wait(). |
| 112 | + // It's in the object state instead of the stack to avoid reallocation |
| 113 | + // during the high-performance sections of the code. The same goes for |
| 114 | + // teePipes and readyPipes. |
| 115 | + MyEpollEvents epollEvents; |
| 116 | + |
| 117 | + bool currentTimeValid; |
| 118 | + PosixClock::Time currentTime; |
| 119 | + PosixClock clock; |
| 120 | + std::vector<bool> teePipes; |
| 121 | + size_t numPipeProcessors; |
| 122 | + std::vector<bool> readyPipes; |
| 123 | + |
| 124 | + typedef std::map<int, std::vector<int> > ByFactorArray; |
| 125 | + typedef ByFactorArray::iterator ByFactorIterator; |
| 126 | + ByFactorArray processorsByFactor; |
| 127 | + |
| 128 | + unsigned lineIndex; |
41 | 129 | }; |
42 | 130 | |
43 | 131 | class ConfigWatcher |
— | — | @@ -63,8 +151,8 @@ |
64 | 152 | : config(config_) |
65 | 153 | {} |
66 | 154 | |
67 | | - void operator()(const char* buffer, size_t bufSize) { |
68 | | - return config.ProcessBuffer(buffer, bufSize); |
| 155 | + void operator()(Block block) { |
| 156 | + return config.ProcessBlock(block); |
69 | 157 | } |
70 | 158 | protected: |
71 | 159 | Udp2LogConfig & config; |
— | — | @@ -74,5 +162,6 @@ |
75 | 163 | return ConfigWriteCallback(*this); |
76 | 164 | } |
77 | 165 | |
| 166 | +#include "LogProcessor.h" |
78 | 167 | |
79 | 168 | #endif |
Index: trunk/udplog/udp2log/SendBuffer.h |
— | — | @@ -4,24 +4,23 @@ |
5 | 5 | template <class Callback> |
6 | 6 | class SendBuffer { |
7 | 7 | public: |
8 | | - SendBuffer(size_t capacity_, Callback writeCallback_) |
9 | | - : capacity(capacity_), size(0), writeCallback(writeCallback_) |
10 | | - { |
11 | | - data = new char[capacity]; |
12 | | - } |
| 8 | + SendBuffer(BlockPool & pool_, size_t capacity_, Callback writeCallback_) |
| 9 | + : pool(pool_), block(pool.New()), capacity(capacity_), writeCallback(writeCallback_) |
| 10 | + {} |
13 | 11 | |
14 | 12 | void Flush() { |
15 | | - if (!size) { |
| 13 | + if (!block.GetSize()) { |
16 | 14 | return; |
17 | 15 | } |
18 | | - writeCallback(data, size); |
19 | | - size = 0; |
| 16 | + writeCallback(block); |
| 17 | + block = pool.New(); |
20 | 18 | } |
21 | 19 | |
22 | 20 | void Write(const char* buffer, size_t bufSize); |
23 | 21 | protected: |
24 | | - char * data; |
25 | | - size_t capacity, size; |
| 22 | + BlockPool & pool; |
| 23 | + Block block; |
| 24 | + size_t capacity; |
26 | 25 | Callback writeCallback; |
27 | 26 | }; |
28 | 27 | |
— | — | @@ -29,15 +28,13 @@ |
30 | 29 | void SendBuffer<Callback>::Write(const char* buffer, size_t bufSize) |
31 | 30 | { |
32 | 31 | if (bufSize > capacity) { |
| 32 | + // Truncate oversize writes |
| 33 | + bufSize = capacity; |
| 34 | + } |
| 35 | + |
| 36 | + if (block.GetSize() + bufSize > capacity) { |
33 | 37 | Flush(); |
34 | | - writeCallback(buffer, bufSize); |
35 | | - } else if (size + bufSize > capacity) { |
36 | | - Flush(); |
37 | | - memcpy(data, buffer, bufSize); |
38 | | - size = bufSize; |
39 | | - } else { |
40 | | - memcpy(data + size, buffer, bufSize); |
41 | | - size += bufSize; |
42 | 38 | } |
| 39 | + block.Append(buffer, bufSize); |
43 | 40 | } |
44 | 41 | #endif |
Index: trunk/udplog/udp2log/RateAverage.h |
— | — | @@ -0,0 +1,33 @@ |
| 2 | +#ifndef UDPLOG_RATEAVERAGE_H |
| 3 | +#define UDPLOG_RATEAVERAGE_H |
| 4 | + |
| 5 | +#include <cmath> |
| 6 | +#include <iostream> |
| 7 | + |
| 8 | +class RateAverage {
| 9 | +public:
| 10 | +    // Exponentially-decaying rate estimate. samplePeriod_ is the time between Update() calls; averagePeriod_ is the half-life of old samples.
| 11 | +    RateAverage(double samplePeriod_, double averagePeriod_)
| 12 | +        : currentSample(0), samplePeriod(samplePeriod_), averagePeriod(averagePeriod_),
| 13 | +          estimatedEventsPerSample(0.),  // accumulators must start at zero: they are read before any write
| 14 | +          constant(1. - std::exp(-std::log(2.) / averagePeriod_ * samplePeriod_))
| 15 | +    {}
| 16 | +
| 17 | +    // Count x more events in the sample currently being accumulated.
| 18 | +    void Increment(int x = 1) { currentSample += x; }
| 19 | +
| 20 | +    // Fold the finished sample into the running estimate and start a new sample.
| 21 | +    void Update() {
| 22 | +        estimatedEventsPerSample = constant * currentSample +
| 23 | +            (1. - constant) * estimatedEventsPerSample;
| 24 | +        currentSample = 0;
| 25 | +    }
| 26 | +
| 27 | +    // Smoothed events per unit time (same time unit as samplePeriod).
| 28 | +    double GetRate() { return estimatedEventsPerSample / samplePeriod; }
| 29 | +protected:
| 30 | +    int64_t currentSample;
| 31 | +    double samplePeriod, averagePeriod, estimatedEventsPerSample, constant;
| 32 | +};
| 33 | + |
| 34 | +#endif |
Property changes on: trunk/udplog/udp2log/RateAverage.h |
___________________________________________________________________ |
Added: svn:eol-style |
1 | 35 | + native |
Index: trunk/udplog/udp2log/Makefile |
— | — | @@ -0,0 +1,2 @@ |
| 2 | +all: |
| 3 | + make -C .. |
Property changes on: trunk/udplog/udp2log/Makefile |
___________________________________________________________________ |
Added: svn:eol-style |
1 | 4 | + native |
Index: trunk/udplog/udp2log/udp2log.cpp |
— | — | @@ -19,12 +19,14 @@ |
20 | 20 | #include "../srclib/Socket.h" |
21 | 21 | #include "Udp2LogConfig.h" |
22 | 22 | #include "SendBuffer.h" |
| 23 | +#include "../srclib/EpollInstance.h" |
23 | 24 | |
24 | 25 | std::string configFileName("/etc/udp2log"); |
25 | 26 | std::string logFileName("/var/log/udp2log/udp2log.log"); |
26 | 27 | std::string pidFileName("/var/run/udp2log.pid"); |
27 | 28 | std::string daemonUserName("udp2log"); |
28 | 29 | std::string multicastAddr("0"); |
| 30 | +int udpReceiveQueue = 128; // KB |
29 | 31 | |
30 | 32 | Udp2LogConfig config; |
31 | 33 | |
— | — | @@ -119,7 +121,6 @@ |
120 | 122 | // Process command line |
121 | 123 | options_description optDesc; |
122 | 124 | optDesc.add_options() |
123 | | - ("multicast", value<string>(&multicastAddr)->default_value(multicastAddr), "multicast address to listen to") |
124 | 125 | ("help", "Show help message.") |
125 | 126 | ("port,p", value<unsigned int>(&port)->default_value(port), "UDP port.") |
126 | 127 | ("config-file,f", value<string>(&configFileName)->default_value(configFileName), |
— | — | @@ -130,7 +131,11 @@ |
131 | 132 | ("pid-file", value<string>(&pidFileName)->default_value(pidFileName), |
132 | 133 | "The location to write the new PID, if --daemon is specified.") |
133 | 134 | ("user", value<string>(&daemonUserName)->default_value(daemonUserName), |
134 | | - "User to switch to, after daemonizing"); |
| 135 | + "User to switch to, after daemonizing") |
| 136 | + ("multicast", value<string>(&multicastAddr)->default_value(multicastAddr), |
| 137 | + "Multicast address to listen to") |
| 138 | + ("recv-queue", value<int>(&udpReceiveQueue)->default_value(udpReceiveQueue), |
| 139 | + "The size of the kernel UDP receive buffer, in KB"); |
135 | 140 | |
136 | 141 | variables_map vm; |
137 | 142 | try { |
— | — | @@ -179,8 +184,14 @@ |
180 | 185 | return 1; |
181 | 186 | } |
182 | 187 | |
| 188 | + // Set up signals |
183 | 189 | signal(SIGHUP, OnHangup); |
184 | | - signal(SIGALRM, OnAlarm); |
| 190 | + |
| 191 | + struct sigaction sa; |
| 192 | + memset(&sa, 0, sizeof(sa)); |
| 193 | + sa.sa_handler = OnAlarm; |
| 194 | + sigaction(SIGALRM, &sa, NULL); |
| 195 | + |
185 | 196 | signal(SIGPIPE, SIG_IGN); |
186 | 197 | |
187 | 198 | // Open the receiving socket |
— | — | @@ -191,6 +202,7 @@ |
192 | 203 | return 1; |
193 | 204 | } |
194 | 205 | socket.SetDescriptorFlags(FD_CLOEXEC); |
| 206 | + socket.SetSockOpt(SOL_SOCKET, SO_RCVBUF, udpReceiveQueue * 1024); |
195 | 207 | socket.Bind(saddr); |
196 | 208 | |
197 | 209 | // Join a multicast group if requested |
— | — | @@ -204,35 +216,64 @@ |
205 | 217 | } |
206 | 218 | |
207 | 219 | // Process received packets |
208 | | - boost::shared_ptr<SocketAddress> address; |
209 | 220 | const size_t bufSize = 65536; |
| 221 | + |
210 | 222 | char receiveBuffer[bufSize]; |
211 | 223 | ssize_t bytesRead; |
212 | | - SendBuffer<ConfigWriteCallback> sendBuffer(PIPE_BUF, config.GetWriteCallback()); |
| 224 | + SendBuffer<ConfigWriteCallback> sendBuffer( |
| 225 | + config.GetPool(), |
| 226 | + Udp2LogConfig::BLOCK_SIZE, |
| 227 | + config.GetWriteCallback()); |
| 228 | + |
| 229 | + const PosixClock::Time reportInterval(5, 0); |
| 230 | + const PosixClock::Time updateInterval(Udp2LogConfig::UPDATE_PERIOD); |
| 231 | + PosixClock clock(CLOCK_REALTIME); |
| 232 | + PosixClock::Time nextReportTime = clock.Get() + reportInterval; |
| 233 | + PosixClock::Time nextUpdateTime = clock.Get() + updateInterval; |
| 234 | + struct itimerval itv; |
| 235 | + itv.it_interval.tv_sec = 0; |
| 236 | + itv.it_interval.tv_usec = 250000; |
| 237 | + itv.it_value = itv.it_interval; |
| 238 | + setitimer(ITIMER_REAL, &itv, NULL); |
213 | 239 | |
214 | | - time_t lossReportTime = 0; |
| 240 | + FileDescriptor::ErrorSetPointer ignoredErrors(new FileDescriptor::ErrorSet); |
| 241 | + ignoredErrors->insert(EINTR); |
| 242 | + socket.Ignore(ignoredErrors); |
215 | 243 | |
216 | 244 | for (;;) { |
217 | | - bytesRead = socket.RecvFrom(receiveBuffer, bufSize, address); |
218 | | - if (bytesRead <= 0) { |
219 | | - continue; |
220 | | - } |
| 245 | + bytesRead = socket.Recv(receiveBuffer, bufSize); |
221 | 246 | |
222 | 247 | // Reload configuration |
223 | 248 | try { |
224 | 249 | config.Reload(); |
225 | 250 | } catch (runtime_error & e) { |
226 | 251 | cerr << e.what() << endl; |
227 | | - continue; |
228 | 252 | } |
229 | 253 | |
230 | | - sendBuffer.Write(receiveBuffer, bytesRead); |
| 254 | + if (bytesRead <= 0) { |
| 255 | + // Timeout |
| 256 | + sendBuffer.Flush(); |
| 257 | + } else { |
| 258 | + sendBuffer.Write(receiveBuffer, bytesRead); |
| 259 | + config.IncrementPacketCount(); |
| 260 | + } |
231 | 261 | |
232 | | - time_t currentTime = time(NULL); |
233 | | - if (currentTime - lossReportTime > 60) { |
234 | | - config.PrintLossReport(std::cout); |
235 | | - lossReportTime = currentTime; |
| 262 | + // Counter update |
| 263 | + PosixClock::Time currentTime = clock.Get(); |
| 264 | + if (currentTime >= nextUpdateTime) { |
| 265 | + while (currentTime >= nextUpdateTime) { |
| 266 | + nextUpdateTime += updateInterval; |
| 267 | + } |
| 268 | + config.UpdateCounters(); |
236 | 269 | } |
| 270 | + |
| 271 | + // Status report |
| 272 | + if (currentTime >= nextReportTime) { |
| 273 | + while (currentTime >= nextReportTime) { |
| 274 | + nextReportTime += reportInterval; |
| 275 | + } |
| 276 | + config.PrintStatusReport(std::cout); |
| 277 | + } |
237 | 278 | } |
238 | 279 | } |
239 | 280 | |
Index: trunk/udplog/Makefile |
— | — | @@ -1,5 +1,5 @@ |
2 | 2 | TARGETS = log2udp udprecv delta udp2log/udp2log packet-loss |
3 | | -SRCLIB_OBJS = srclib/HostEntry.o srclib/FileDescriptor.o srclib/IPAddress.o srclib/Socket.o srclib/SocketAddress.o |
| 3 | +SRCLIB_OBJS = srclib/HostEntry.o srclib/FileDescriptor.o srclib/IPAddress.o srclib/Socket.o srclib/SocketAddress.o srclib/PosixClock.o |
4 | 4 | HOST_OBJS = srcmisc/host.o $(SRCLIB_OBJS) |
5 | 5 | LOG2UDP_OBJS = srcmisc/log2udp.o $(SRCLIB_OBJS) |
6 | 6 | UDPRECV_OBJS = srcmisc/udprecv.o $(SRCLIB_OBJS) |
— | — | @@ -30,7 +30,7 @@ |
31 | 31 | g++ $(CFLAGS) -o packet-loss srcmisc/packet-loss.cpp |
32 | 32 | |
33 | 33 | udp2log/udp2log: $(UDP2LOG_OBJS) |
34 | | - g++ $(CFLAGS) -o udp2log/udp2log $(UDP2LOG_OBJS) -lboost_program_options |
| 34 | + g++ $(CFLAGS) -o udp2log/udp2log $(UDP2LOG_OBJS) -lboost_program_options -lrt |
35 | 35 | |
36 | 36 | install: |
37 | 37 | install log2udp $(DESTDIR)/usr/bin/log2udp |
Index: trunk/udplog/srclib/Pipe.h |
— | — | @@ -6,6 +6,8 @@ |
7 | 7 | #endif |
8 | 8 | #include <fcntl.h> |
9 | 9 | |
| 10 | +#include "FileDescriptor.h" |
| 11 | + |
10 | 12 | class Pipe : public FileDescriptor |
11 | 13 | { |
12 | 14 | public: |
Index: trunk/udplog/srclib/EpollInstance.h |
— | — | @@ -0,0 +1,92 @@ |
| 2 | +#ifndef UDPLOG_EPOLL_H |
| 3 | +#define UDPLOG_EPOLL_H |
| 4 | + |
| 5 | +#include "FileDescriptor.h" |
| 6 | +#include <sys/epoll.h> |
| 7 | +#include <boost/shared_ptr.hpp> |
| 8 | + |
| 9 | +class EpollInstance : public FileDescriptor // Wraps an epoll file descriptor created with epoll_create1()
| 10 | +{
| 11 | +public:
| 12 | + typedef std::vector<struct epoll_event> EventArray; // in/out buffer type for the wait calls below
| 13 | +
| 14 | + EpollInstance(int flags = 0) // flags is handed unchanged to epoll_create1()
| 15 | + : FileDescriptor()
| 16 | + {
| 17 | + fd = epoll_create1(flags);
| 18 | + if (fd == -1) {
| 19 | + good = false;
| 20 | + RaiseError("EpollInstance constructor");
| 21 | + } else {
| 22 | + good = true;
| 23 | + }
| 24 | + }
| 25 | +
| 26 | + // epoll_ctl(EPOLL_CTL_ADD) wrapper. Returns 0 on success; on failure calls RaiseError() and then returns errno.
| 27 | + int AddPoll(const FileDescriptor & polled, struct epoll_event & event) {
| 28 | + if (epoll_ctl(fd, EPOLL_CTL_ADD, polled.GetFD(), &event) == -1) {
| 29 | + RaiseError("EpollInstance::AddPoll");
| 30 | + return errno; // NOTE(review): read after RaiseError(); confirm RaiseError() leaves errno intact
| 31 | + } else {
| 32 | + return 0;
| 33 | + }
| 34 | + }
| 35 | +
| 36 | + // epoll_ctl(EPOLL_CTL_MOD) wrapper. Returns 0 on success; on failure calls RaiseError() and then returns errno.
| 37 | + int ModifyPoll(const FileDescriptor & polled, struct epoll_event & event) {
| 38 | + if (epoll_ctl(fd, EPOLL_CTL_MOD, polled.GetFD(), &event) == -1) {
| 39 | + RaiseError("EpollInstance::ModifyPoll");
| 40 | + return errno;
| 41 | + } else {
| 42 | + return 0;
| 43 | + }
| 44 | + }
| 45 | +
| 46 | + // epoll_ctl(EPOLL_CTL_DEL) wrapper. Returns 0 on success; on failure calls RaiseError() and then returns errno.
| 47 | + int DeletePoll(const FileDescriptor & polled) {
| 48 | + static struct epoll_event dummy; // a non-null event pointer is passed although nothing here reads it; presumably for old kernels that rejected NULL - confirm against epoll_ctl(2)
| 49 | +
| 50 | + if (epoll_ctl(fd, EPOLL_CTL_DEL, polled.GetFD(), &dummy) == -1) {
| 51 | + RaiseError("EpollInstance::DeletePoll");
| 52 | + return errno;
| 53 | + } else {
| 54 | + return 0;
| 55 | + }
| 56 | + }
| 57 | +
| 58 | + enum {DEFAULT_WAIT_SIZE = 10}; // buffer size used when the caller passes an empty event array
| 59 | +
| 60 | + // epoll_wait() wrapper. The size of *events on entry is the maximum number of events fetched (an empty array is grown to DEFAULT_WAIT_SIZE); on return it is resized to the number received, or to 0 on error. Returns the raw epoll_wait() result.
| 61 | + int EpollWait(EventArray * events, int timeout) {
| 62 | + if (!events->size()) {
| 63 | + events->resize(DEFAULT_WAIT_SIZE);
| 64 | + }
| 65 | + int result = epoll_wait(fd, &((*events)[0]), events->size(), timeout);
| 66 | + if (result == -1) {
| 67 | + events->resize(0);
| 68 | + RaiseError("EpollInstance::EpollWait");
| 69 | + } else {
| 70 | + events->resize(result);
| 71 | + }
| 72 | + return result;
| 73 | + }
| 74 | +
| 75 | + // epoll_pwait() wrapper. Same buffer handling as EpollWait(), with sigmask passed through to epoll_pwait().
| 76 | + int EpollSignalWait(EventArray * events,
| 77 | + int timeout, const sigset_t & sigmask) {
| 78 | + if (!events->size()) {
| 79 | + events->resize(DEFAULT_WAIT_SIZE);
| 80 | + }
| 81 | + int result = epoll_pwait(fd, &((*events)[0]), events->size(), timeout, &sigmask);
| 82 | + if (result == -1) {
| 83 | + events->resize(0);
| 84 | + RaiseError("EpollInstance::EpollSignalWait");
| 85 | + } else {
| 86 | + events->resize(result);
| 87 | + }
| 88 | + return result;
| 89 | + }
| 90 | +
| 91 | +};
| 92 | + |
| 93 | +#endif |
Property changes on: trunk/udplog/srclib/EpollInstance.h |
___________________________________________________________________ |
Added: svn:eol-style |
1 | 94 | + native |
Index: trunk/udplog/srclib/PosixFile.h |
— | — | @@ -1,6 +1,8 @@ |
2 | 2 | #ifndef UDPLOG_POSIXFILE_H |
3 | 3 | #define UDPLOG_POSIXFILE_H |
4 | 4 | |
| 5 | +#include "FileDescriptor.h" |
| 6 | + |
5 | 7 | class PosixFile : public FileDescriptor |
6 | 8 | { |
7 | 9 | public: |
Index: trunk/udplog/srclib/TypedEpollInstance.h |
— | — | @@ -0,0 +1,113 @@ |
| 2 | +#ifndef UDPLOG_TYPEDEPOLLINSTANCE_H |
| 3 | +#define UDPLOG_TYPEDEPOLLINSTANCE_H |
| 4 | + |
| 5 | +#include "EpollInstance.h" |
| 6 | +#include <map> |
| 7 | +#include <boost/shared_ptr.hpp> |
| 8 | + |
| 9 | +// An epoll wrapper where the user data is a smart pointer of the specified type. |
| 10 | +// |
| 11 | +// A smart pointer reference to the data object will be held until the |
| 12 | +// relevant FD is removed from the poll set. |
| 13 | +// |
| 14 | +// Note that if you close an FD without calling DeletePoll(), events may |
| 15 | +// be delivered with the wrong data pointer.
| 16 | + |
| 17 | +template <class DataType>
| 18 | +class TypedEpollInstance : public EpollInstance
| 19 | +{
| 20 | +public:
| 21 | + typedef boost::shared_ptr<DataType> DataPointer; // smart pointer to the per-FD user data
| 22 | + typedef std::vector<std::pair<uint32_t, DataPointer> > TypedEventArray; // one (epoll event mask, user data) pair per delivered event
| 23 | +
| 24 | + int AddPoll(const FileDescriptor & polled, uint32_t events, DataPointer data);
| 25 | + int ModifyPoll(const FileDescriptor & polled, uint32_t events, DataPointer data);
| 26 | + int DeletePoll(const FileDescriptor & polled);
| 27 | + int EpollWait(TypedEventArray * events, int timeout);
| 28 | + int EpollSignalWait(TypedEventArray * events, int timeout, const sigset_t & sigmask);
| 29 | +protected:
| 30 | + void EpollSetup(TypedEventArray * events); // sizes rawEvents before a wait
| 31 | + void EpollFinish(TypedEventArray * events); // converts rawEvents into *events after a wait
| 32 | +
| 33 | + typedef std::map<int, DataPointer> DataPointerMap;
| 34 | +
| 35 | + DataPointerMap pointers; // user data keyed by the polled FD number
| 36 | + EventArray rawEvents; // scratch buffer handed to the EpollInstance wait calls
| 37 | +};
| 38 | + |
| 39 | +template <class DataType>
| 40 | +int
| 41 | +TypedEpollInstance<DataType>::AddPoll(
| 42 | + const FileDescriptor & polled, uint32_t events, DataPointer data)
| 43 | +{ // Register polled for the given event mask and remember data under its FD number. Returns the EpollInstance::AddPoll() result.
| 44 | + struct epoll_event epe;
| 45 | + epe.events = events;
| 46 | + epe.data.fd = polled.GetFD(); // the FD number is the epoll user data; EpollFinish() uses it as the key into pointers
| 47 | + pointers[polled.GetFD()] = data; // NOTE(review): stored before epoll_ctl runs, so the entry remains even if the add fails
| 48 | + return EpollInstance::AddPoll(polled, epe);
| 49 | +}
| 50 | + |
| 51 | +template <class DataType>
| 52 | +int
| 53 | +TypedEpollInstance<DataType>::ModifyPoll(
| 54 | + const FileDescriptor & polled, uint32_t events, DataPointer data)
| 55 | +{ // Change the event mask for polled and replace the data pointer stored for its FD. Returns the EpollInstance::ModifyPoll() result.
| 56 | + struct epoll_event epe;
| 57 | + epe.events = events;
| 58 | + epe.data.fd = polled.GetFD(); // the FD number is the epoll user data; EpollFinish() uses it as the key into pointers
| 59 | + pointers[polled.GetFD()] = data; // NOTE(review): replaced before epoll_ctl runs, so the new pointer is kept even if the modify fails
| 60 | + return EpollInstance::ModifyPoll(polled, epe);
| 61 | +}
| 62 | + |
| 63 | +template <class DataType>
| 64 | +int
| 65 | +TypedEpollInstance<DataType>::DeletePoll(const FileDescriptor & polled)
| 66 | +{ // Drop the stored data pointer for polled, then remove it from the poll set. Returns the EpollInstance::DeletePoll() result.
| 67 | + pointers.erase(polled.GetFD()); // releases this object's reference to the user data
| 68 | + return EpollInstance::DeletePoll(polled);
| 69 | +}
| 70 | + |
| 71 | +template <class DataType>
| 72 | +void
| 73 | +TypedEpollInstance<DataType>::EpollSetup(TypedEventArray * events)
| 74 | +{ // Size rawEvents for the next wait: DEFAULT_WAIT_SIZE if *events is empty, otherwise the current size of *events.
| 75 | + if (!events->size()) {
| 76 | + rawEvents.resize(DEFAULT_WAIT_SIZE);
| 77 | + } else {
| 78 | + rawEvents.resize(events->size()); // NOTE(review): if *events is reused between waits, its size is the previous event count, which caps this wait
| 79 | + }
| 80 | +}
| 81 | + |
| 82 | +template <class DataType>
| 83 | +void
| 84 | +TypedEpollInstance<DataType>::EpollFinish(TypedEventArray * events)
| 85 | +{ // Convert the raw epoll results in rawEvents into (event mask, data pointer) pairs in *events.
| 86 | + events->resize(rawEvents.size());
| 87 | + for (unsigned i = 0; i < rawEvents.size(); i++) {
| 88 | + (*events)[i].first = rawEvents[i].events;
| 89 | + (*events)[i].second = pointers[rawEvents[i].data.fd]; // NOTE(review): operator[] inserts a null entry when the FD is not in the map
| 90 | + }
| 91 | +}
| 92 | + |
| 93 | +template <class DataType>
| 94 | +int
| 95 | +TypedEpollInstance<DataType>::EpollWait(TypedEventArray * events, int timeout)
| 96 | +{ // Wait via EpollInstance::EpollWait() and deliver the results as typed events. Returns the underlying wait result.
| 97 | + EpollSetup(events);
| 98 | + int result = EpollInstance::EpollWait(&rawEvents, timeout);
| 99 | + EpollFinish(events); // rawEvents is emptied by the base class on error, so *events ends up empty too
| 100 | + return result;
| 101 | +}
| 102 | + |
| 103 | +template <class DataType>
| 104 | +int
| 105 | +TypedEpollInstance<DataType>::EpollSignalWait(TypedEventArray * events, int timeout,
| 106 | + const sigset_t & sigmask)
| 107 | +{ // Like EpollWait(), but waits via EpollInstance::EpollSignalWait() with the given signal mask.
| 108 | + EpollSetup(events);
| 109 | + int result = EpollInstance::EpollSignalWait(&rawEvents, timeout, sigmask);
| 110 | + EpollFinish(events); // rawEvents is emptied by the base class on error, so *events ends up empty too
| 111 | + return result;
| 112 | +}
| 113 | + |
| 114 | +#endif |
Property changes on: trunk/udplog/srclib/TypedEpollInstance.h |
___________________________________________________________________ |
Added: svn:eol-style |
1 | 115 | + native |
Index: trunk/udplog/srclib/FileDescriptor.cpp |
— | — | @@ -8,7 +8,7 @@ |
9 | 9 | throw libc_error(msg); |
10 | 10 | } |
11 | 11 | |
12 | | - std::set<int> * curIgnore = ignoreErrors.back().get(); |
| 12 | + ErrorSet * curIgnore = ignoreErrors.back().get(); |
13 | 13 | if (!curIgnore) { |
14 | 14 | // Ignore all |
15 | 15 | return; |
Index: trunk/udplog/srclib/PosixClock.cpp |
— | — | @@ -0,0 +1,41 @@ |
| 2 | +#include <time.h> |
| 3 | +#include <limits> |
| 4 | +#include <cmath> |
| 5 | + |
| 6 | +#include "PosixClock.h" |
| 7 | + |
| 8 | +const PosixClock::Time PosixClock::Time::MAX(std::numeric_limits<time_t>::max(), BILLION - 1); // upper saturation value: largest time_t with the largest normalised tv_nsec
| 9 | +const PosixClock::Time PosixClock::Time::MIN(std::numeric_limits<time_t>::min(), 0); // lower saturation value: smallest time_t with tv_nsec 0
| 10 | +const PosixClock::Time PosixClock::Time::ZERO(0, 0); // zero seconds, zero nanoseconds
| 11 | + |
| 12 | +PosixClock::Time::Time(double seconds) { // Build a Time from fractional seconds; values outside the time_t range saturate at MAX or MIN
| 13 | + if (seconds > std::numeric_limits<time_t>::max()) {
| 14 | + *this = MAX;
| 15 | + } else if (seconds < std::numeric_limits<time_t>::min()) {
| 16 | + *this = MIN;
| 17 | + } else {
| 18 | + double intPart = std::floor(seconds); // floor (not truncation) keeps the fractional part in [0, 1) for negative inputs too
| 19 | + data.tv_sec = (time_t)intPart;
| 20 | + data.tv_nsec = (long)((seconds - intPart) * BILLION);
| 21 | + }
| 22 | + Normalise();
| 23 | +}
| 24 | + |
| 25 | +void PosixClock::Time::FixNormalisation()
| 26 | +{
| 27 | +    // Restore the invariant 0 <= tv_nsec < BILLION by moving whole seconds
| 28 | +    // between tv_nsec and tv_sec.
| 29 | +    if (data.tv_nsec < 0) {
| 30 | +        // Borrow whole seconds, rounding up: truncating would borrow 0 for -BILLION < tv_nsec < 0 and leave it negative.
| 31 | +        long seconds = (-data.tv_nsec + BILLION - 1) / BILLION;
| 32 | +        data.tv_sec -= seconds;
| 33 | +        data.tv_nsec += seconds * BILLION;
| 34 | +    } else if (data.tv_nsec >= BILLION) {
| 35 | +        data.tv_sec += data.tv_nsec / BILLION;
| 36 | +        data.tv_nsec %= BILLION;
| 37 | +    }
| 38 | +}
| 38 | + |
| 39 | +PosixClock::Time::operator double() // Convert to floating-point seconds: tv_sec + tv_nsec / BILLION
| 40 | +{
| 41 | + return (double)data.tv_sec + ((double)data.tv_nsec) / BILLION;
| 42 | +}
Property changes on: trunk/udplog/srclib/PosixClock.cpp |
___________________________________________________________________ |
Added: svn:eol-style |
1 | 43 | + native |
Index: trunk/udplog/srclib/FileDescriptor.h |
— | — | @@ -21,6 +21,9 @@ |
22 | 22 | : fd(-1), good(false), ownFd(true) |
23 | 23 | {} |
24 | 24 | public: |
| 25 | + |
| 26 | + typedef std::set<int> ErrorSet; |
| 27 | + typedef boost::shared_ptr<ErrorSet> ErrorSetPointer; |
25 | 28 | |
26 | 29 | virtual ~FileDescriptor() { |
27 | 30 | Close(); |
— | — | @@ -111,13 +114,13 @@ |
112 | 115 | } |
113 | 116 | |
114 | 117 | // Ignore a given set of errors |
115 | | - void Ignore(boost::shared_ptr<std::set<int> > s) { |
| 118 | + void Ignore(ErrorSetPointer s) { |
116 | 119 | ignoreErrors.push_back(s); |
117 | 120 | } |
118 | 121 | |
119 | 122 | // Ignore all errors |
120 | 123 | void IgnoreAll() { |
121 | | - ignoreErrors.push_back(boost::shared_ptr<std::set<int> >((std::set<int>*)NULL)); |
| 124 | + ignoreErrors.push_back(ErrorSetPointer((std::set<int>*)NULL)); |
122 | 125 | } |
123 | 126 | |
124 | 127 | // Restore the previous ignore set |
— | — | @@ -131,6 +134,6 @@ |
132 | 135 | int fd; |
133 | 136 | bool good; |
134 | 137 | bool ownFd; |
135 | | - std::vector<boost::shared_ptr<std::set<int> > > ignoreErrors; |
| 138 | + std::vector<ErrorSetPointer> ignoreErrors; |
136 | 139 | }; |
137 | 140 | #endif |
Index: trunk/udplog/srclib/PosixClock.h |
— | — | @@ -0,0 +1,112 @@ |
| 2 | +#ifndef UDPLOG_POSIXCLOCK_H |
| 3 | +#define UDPLOG_POSIXCLOCK_H |
| 4 | + |
| 5 | +#include <time.h> |
| 6 | +#include <boost/operators.hpp> |
| 7 | +#include "Exception.h" |
| 8 | +class PosixClock // Wrapper around one POSIX clock, accessed via clock_gettime/clock_settime/clock_getres
| 9 | +{
| 10 | +public:
| 11 | + class Time : boost::additive<Time,
| 12 | + boost::totally_ordered<Time
| 13 | + > > // the boost::operators bases supply the remaining arithmetic/comparison operators from +=, -=, == and <
| 14 | + {
| 15 | + public:
| 16 | + Time() { // zero seconds, zero nanoseconds
| 17 | + data.tv_sec = data.tv_nsec = 0;
| 18 | + }
| 19 | +
| 20 | + Time(time_t sec, long nsec = 0) { // nsec may lie outside [0, BILLION); Normalise() is applied
| 21 | + data.tv_sec = sec;
| 22 | + data.tv_nsec = nsec;
| 23 | + Normalise();
| 24 | + }
| 25 | +
| 26 | + // Construct from fractional seconds. Saturates at MIN or MAX
| 27 | + Time(double seconds);
| 28 | +
| 29 | + Time & operator+=(const Time & other) { // field-wise add, then renormalise
| 30 | + data.tv_sec += other.data.tv_sec;
| 31 | + data.tv_nsec += other.data.tv_nsec;
| 32 | + Normalise();
| 33 | + return *this;
| 34 | + }
| 35 | +
| 36 | + Time & operator-=(const Time & other) { // field-wise subtract, then renormalise
| 37 | + data.tv_sec -= other.data.tv_sec;
| 38 | + data.tv_nsec -= other.data.tv_nsec;
| 39 | + Normalise();
| 40 | + return *this;
| 41 | + }
| 42 | +
| 43 | + operator double(); // defined out of line
| 44 | +
| 45 | + enum {BILLION = 1000000000}; // nanoseconds per second
| 46 | +
| 47 | + struct timespec data; // the stored value; public, so PosixClock passes it straight to the clock_* calls
| 48 | + static const Time MAX;
| 49 | + static const Time MIN;
| 50 | + static const Time ZERO;
| 51 | +
| 52 | + protected:
| 53 | +
| 54 | + void Normalise() { // inline range check; the actual adjustment is delegated to FixNormalisation()
| 55 | + if (data.tv_nsec >= BILLION || data.tv_nsec < 0) {
| 56 | + FixNormalisation();
| 57 | + }
| 58 | + }
| 59 | +
| 60 | + void FixNormalisation(); // defined out of line
| 61 | + };
| 62 | +
| 63 | + PosixClock(clockid_t id_) // id_ selects the clock used by every call below
| 64 | + : id(id_), resSet(false)
| 65 | + {}
| 66 | +
| 67 | + Time Get() { // current clock_gettime() reading; throws libc_error on failure
| 68 | + Time t;
| 69 | + int result = clock_gettime(id, &t.data);
| 70 | + if (result == -1) {
| 71 | + throw libc_error("PosixClock::Get");
| 72 | + }
| 73 | + return t;
| 74 | + }
| 75 | +
| 76 | + void Set(const Time & t) { // clock_settime() wrapper; throws libc_error on failure
| 77 | + int result = clock_settime(id, &t.data);
| 78 | + if (result == -1) {
| 79 | + throw libc_error("PosixClock::Set");
| 80 | + }
| 81 | + }
| 82 | +
| 83 | + Time GetRes() { // clock resolution, queried on the first call and cached in res
| 84 | + if (!resSet) {
| 85 | + resSet = true;
| 86 | + clock_getres(id, &res.data); // NOTE(review): return value is not checked, unlike Get() and Set()
| 87 | + }
| 88 | + return res;
| 89 | + }
| 90 | +
| 91 | +protected:
| 92 | + clockid_t id;
| 93 | + Time res; // cached result of clock_getres(); valid once resSet is true
| 94 | + bool resSet;
| 95 | +};
| 96 | + |
| 97 | +inline bool operator==(const PosixClock::Time & a, const PosixClock::Time & b) |
| 98 | +{ |
| 99 | + return a.data.tv_sec == b.data.tv_sec && a.data.tv_nsec == b.data.tv_nsec; |
| 100 | +} |
| 101 | + |
| 102 | +inline bool operator<(const PosixClock::Time & a, const PosixClock::Time & b) |
| 103 | +{ |
| 104 | + if (a.data.tv_sec < b.data.tv_sec) { |
| 105 | + return true; |
| 106 | + } |
| 107 | + if (a.data.tv_nsec < b.data.tv_nsec) { |
| 108 | + return true; |
| 109 | + } |
| 110 | + return false; |
| 111 | +} |
| 112 | + |
| 113 | +#endif |
Property changes on: trunk/udplog/srclib/PosixClock.h |
___________________________________________________________________ |
Added: svn:eol-style |
1 | 114 | + native |
Index: trunk/udplog/srclib/Socket.h |
— | — | @@ -102,10 +102,32 @@ |
103 | 103 | ssize_t length = recvfrom(fd, buf, len, flags, SocketAddress::GetBuffer(), &addrLength); |
104 | 104 | if (length == (ssize_t)-1) { |
105 | 105 | RaiseError("Socket::RecvFrom"); |
| 106 | + } else { |
| 107 | + to = SocketAddress::NewFromBuffer(); |
106 | 108 | } |
107 | | - to = SocketAddress::NewFromBuffer(); |
108 | 109 | return length; |
109 | 110 | } |
| 111 | + |
| 112 | + int SetSockOpt(int level, int optName, const void * optValue, socklen_t optLength) { |
| 113 | + int result = setsockopt(fd, level, optName, optValue, optLength); |
| 114 | + if (result == -1) { |
| 115 | + RaiseError("Socket::SetSockOpt"); |
| 116 | + return errno; |
| 117 | + } |
| 118 | + return result; |
| 119 | + } |
| 120 | + |
/**
 * Set a boolean socket option. The flag is converted to int and handed to
 * the int overload; the result of that overload is returned unchanged.
 */
int SetSockOpt(int level, int optName, bool optValue) {
    const int flag = (int)optValue;
    return SetSockOpt(level, optName, flag);
}
| 124 | + |
| 125 | + int SetSockOpt(int level, int optName, int optValue) { |
| 126 | + return SetSockOpt(level, optName, &optValue, sizeof(int)); |
| 127 | + } |
| 128 | + |
| 129 | + int SetSockOpt(int level, int optName, const std::string & optValue) { |
| 130 | + return SetSockOpt(level, optName, optValue.data(), optValue.size()); |
| 131 | + } |
110 | 132 | protected: |
111 | 133 | boost::shared_ptr<SocketAddress> peer; |
112 | 134 | }; |