r101616 MediaWiki - Code Review archive

Repository:MediaWiki
Revision:r101615‎ | r101616 | r101617 >
Date:11:18, 2 November 2011
Author:tstarling
Status:deferred
Tags:
Comment:
Committing work in progress as a reference point prior to further testing. Do not deploy.
* Increase the block size to 64KB for a substantial performance improvement.
* Implemented a single-block per-pipe backlog buffer which ensures that partial writes are eventually completed. Entire 64KB blocks are thrown away while waiting for the backlog to be written.
* Added block pool classes Block, BlockMemory and BlockPool, for efficient handling of backlog buffers (no copy, no allocation).
* When a pipe reports that it would block, or does a partial write, put it "on holiday" to avoid high CPU usage due to large numbers of backlog operations.
* Per Asher's suggestion, increase the UDP receive buffer (SO_RCVBUF) by a configurable amount on startup. Added relevant wrappers for setsockopt().
* Added some typedefs to ease FileDescriptor::Ignore() usage.
* Implemented a UDP read timeout using setitimer(). This allows statistics to be updated and regular reports to be done. Requires the SIGALRM handler to not restart syscalls by default. I originally tried it with epoll but that was twice as slow as a blocking read.
* Renamed Udp2LogConfig::ProcessBuffer() to Udp2LogConfig::ProcessBlock() and split it up a bit for sanity.
* In the status report, add a timestamp to the start of each line.
* Added a generic rate average class. Used it to generate packets-per-second stats for eye candy, and per-pipe loss rate stats for holiday time tuning. Rate averaging is done by a single-pole IIR filter, apparently known to statisticians as an EWMA (exponentially weighted moving average).
* Fixed missing header include FileDescriptor.h in two places.
* Added object wrappers for epoll and clock_gettime().
* Currently, the pipe write loss rate is high even when CPU usage is low, due to pipes randomly not clearing their buffers. I'm doing some testing to work out what can be done about that.
Modified paths:
  • /trunk/udplog/Makefile (modified) (history)
  • /trunk/udplog/srclib/EpollInstance.h (added) (history)
  • /trunk/udplog/srclib/FileDescriptor.cpp (modified) (history)
  • /trunk/udplog/srclib/FileDescriptor.h (modified) (history)
  • /trunk/udplog/srclib/Pipe.h (modified) (history)
  • /trunk/udplog/srclib/PosixClock.cpp (added) (history)
  • /trunk/udplog/srclib/PosixClock.h (added) (history)
  • /trunk/udplog/srclib/PosixFile.h (modified) (history)
  • /trunk/udplog/srclib/Socket.h (modified) (history)
  • /trunk/udplog/srclib/TypedEpollInstance.h (added) (history)
  • /trunk/udplog/udp2log/Block.h (added) (history)
  • /trunk/udplog/udp2log/LogProcessor.cpp (modified) (history)
  • /trunk/udplog/udp2log/LogProcessor.h (modified) (history)
  • /trunk/udplog/udp2log/Makefile (added) (history)
  • /trunk/udplog/udp2log/RateAverage.h (added) (history)
  • /trunk/udplog/udp2log/SendBuffer.h (modified) (history)
  • /trunk/udplog/udp2log/Udp2LogConfig.cpp (modified) (history)
  • /trunk/udplog/udp2log/Udp2LogConfig.h (modified) (history)
  • /trunk/udplog/udp2log/udp2log.cpp (modified) (history)

Diff [purge]

Index: trunk/udplog/udp2log/Block.h
@@ -0,0 +1,178 @@
 2+#ifndef UDPLOG_BLOCK_H
 3+#define UDPLOG_BLOCK_H
 4+
 5+#include <stdexcept>
 6+#include <boost/pool/pool.hpp>
 7+#include <boost/pool/object_pool.hpp>
 8+
 9+class BlockPool;
 10+class BlockMemory;
 11+
 12+class Block {
 13+public:
 14+ // Can't put the function definition here because BlockMemory is incomplete
 15+ inline Block(BlockMemory * mem_);
 16+ inline Block(const Block & b);
 17+ inline ~Block();
 18+ inline void Destroy();
 19+ inline Block & operator=(const Block & b);
 20+
 21+ // Return a new block pointing to the same memory, which is a substring of
 22+ // *this, with a start position offset by the specified amount relative to
 23+ // the start position of *this.
 24+ inline Block Offset(size_t newOffset) const;
 25+
 26+ inline char * GetData();
 27+ inline const char * GetData() const;
 28+ inline size_t GetCapacity() const;
 29+ inline size_t GetSize() const;
 30+ inline void Append(const char * appendData, size_t appendSize);
 31+
 32+ BlockMemory * mem;
 33+ size_t offset;
 34+ size_t size;
 35+};
 36+
 37+class BlockMemory {
 38+public:
 39+ inline BlockMemory(BlockPool * pool_, char * data_, size_t size_);
 40+ inline ~BlockMemory();
 41+ inline Block GetReference();
 42+
 43+ BlockPool * pool;
 44+ char * data;
 45+ size_t size;
 46+ volatile int numRefs;
 47+};
 48+
 49+
 50+class BlockPool {
 51+public:
 52+
 53+ BlockPool(size_t blockSize_)
 54+ : dataPool(blockSize_), blockSize(blockSize_), emptyBlock(New())
 55+ {}
 56+
 57+ Block New() {
 58+ char * data = (char*)dataPool.malloc();
 59+ if (!data) {
 60+ throw std::bad_alloc();
 61+ }
 62+ BlockMemory * mem = objectPool.construct(this, data, blockSize);
 63+ if (!mem) {
 64+ throw std::bad_alloc();
 65+ }
 66+ return mem->GetReference();
 67+ }
 68+
 69+ Block GetEmptyBlock() {
 70+ return emptyBlock;
 71+ }
 72+
 73+ void DeleteMemory(BlockMemory * mem) {
 74+ objectPool.destroy(mem);
 75+ }
 76+
 77+ void DeleteData(char * data) {
 78+ dataPool.free(data);
 79+ }
 80+
 81+protected:
 82+ boost::pool<> dataPool;
 83+ boost::object_pool<BlockMemory> objectPool;
 84+ size_t blockSize;
 85+ Block emptyBlock;
 86+};
 87+
 88+//-----------------------------------------------------------------------------
 89+// Block
 90+//-----------------------------------------------------------------------------
 91+
 92+Block::Block(BlockMemory * mem_)
 93+ : mem(mem_), offset(0), size(0)
 94+{
 95+ mem->numRefs++;
 96+}
 97+
 98+Block::Block(const Block & b)
 99+ : mem(b.mem), offset(b.offset), size(b.size)
 100+{
 101+ mem->numRefs++;
 102+}
 103+
 104+Block::~Block() {
 105+ Destroy();
 106+}
 107+
 108+void Block::Destroy() {
 109+ mem->numRefs--;
 110+ if (mem->numRefs <= 0) {
 111+ mem->pool->DeleteMemory(mem);
 112+ mem = NULL;
 113+ }
 114+}
 115+
 116+Block & Block::operator=(const Block & b) {
 117+ Destroy();
 118+ mem = b.mem;
 119+ offset = b.offset;
 120+ size = b.size;
 121+ mem->numRefs++;
 122+ return *this;
 123+}
 124+
 125+Block Block::Offset(size_t newOffset) const {
 126+ if (offset + newOffset > size) {
 127+ throw std::runtime_error("Block::Offset buffer overrun");
 128+ }
 129+ Block b(*this);
 130+ b.offset += newOffset;
 131+ b.size -= newOffset;
 132+ return b;
 133+}
 134+
 135+char * Block::GetData() {
 136+ return mem->data + offset;
 137+}
 138+
 139+const char * Block::GetData() const {
 140+ return mem->data + offset;
 141+}
 142+
 143+size_t Block::GetCapacity() const {
 144+ if (offset > mem->size) {
 145+ return 0;
 146+ } else {
 147+ return mem->size - offset;
 148+ }
 149+}
 150+
 151+size_t Block::GetSize() const {
 152+ return size;
 153+}
 154+
 155+void Block::Append(const char * appendData, size_t appendSize) {
 156+ if (GetCapacity() < size + appendSize) {
 157+ throw std::runtime_error("Block::Append buffer overrun");
 158+ }
 159+ memcpy(GetData() + size, appendData, appendSize);
 160+ size += appendSize;
 161+}
 162+
 163+//-----------------------------------------------------------------------------
 164+// BlockMemory
 165+//-----------------------------------------------------------------------------
 166+BlockMemory::BlockMemory(BlockPool * pool_, char * data_, size_t size_)
 167+ : pool(pool_), data(data_), size(size_), numRefs(0)
 168+{}
 169+
 170+BlockMemory::~BlockMemory() {
 171+ pool->DeleteData(data);
 172+ data = NULL;
 173+}
 174+
 175+Block BlockMemory::GetReference() {
 176+ return Block(this);
 177+}
 178+
 179+#endif
Property changes on: trunk/udplog/udp2log/Block.h
___________________________________________________________________
Added: svn:eol-style
1180 + native
Index: trunk/udplog/udp2log/LogProcessor.cpp
@@ -4,14 +4,38 @@
55 #include <linux/limits.h>
66 #include <sys/types.h>
77 #include <sys/wait.h>
 8+#include <sys/prctl.h>
89 #include "LogProcessor.h"
910 #include "Udp2LogConfig.h"
1011
1112 //---------------------------------------------------------------------------
 13+// LogProcessor
 14+//---------------------------------------------------------------------------
 15+bool LogProcessor::IsActive(const PosixClock::Time & currentTime) {
 16+ if (!IsOpen()) {
 17+ return false;
 18+ }
 19+ if (currentTime > holidayEndTime) {
 20+ return true;
 21+ } else {
 22+ // On holiday
 23+ return false;
 24+ }
 25+}
 26+
 27+void LogProcessor::IncrementBytesLost(size_t bytes) {
 28+ if (bytes) {
 29+ bytesLost += bytes;
 30+ lossRate.Increment(bytes);
 31+ }
 32+}
 33+
 34+//---------------------------------------------------------------------------
1235 // FileProcessor
1336 //---------------------------------------------------------------------------
1437
15 -LogProcessor * FileProcessor::NewFromConfig(char * params, bool flush)
 38+boost::shared_ptr<LogProcessor> FileProcessor::NewFromConfig(
 39+ Udp2LogConfig & config, int index, char * params, bool flush)
1640 {
1741 char * strFactor = strtok(params, " \t");
1842 if (strFactor == NULL) {
@@ -26,37 +50,41 @@
2751 );
2852 }
2953 char * filename = strtok(NULL, "");
30 - FileProcessor * fp = new FileProcessor(filename, factor, flush);
 54+ FileProcessor * fp = new FileProcessor(config, index, filename, factor, flush);
3155 if (!fp->IsOpen()) {
3256 delete fp;
3357 throw ConfigError("Unable to open file");
3458 }
3559 std::cerr << "Opened log file " << filename << " with sampling factor " << factor << std::endl;
36 - return (LogProcessor*)fp;
 60+ return boost::shared_ptr<LogProcessor>(fp);
3761 }
3862
39 -void FileProcessor::ProcessLine(const char *buffer, size_t size)
 63+ssize_t FileProcessor::Write(const char *buffer, size_t size)
4064 {
41 - if (Sample()) {
 65+ if (IsActive(config.GetCurrentTime())) {
4266 f.write(buffer, size);
4367 if (flush) {
4468 f.flush();
4569 }
4670 }
 71+ return (ssize_t)size;
4772 }
4873
4974 //---------------------------------------------------------------------------
5075 // PipeProcessor
5176 //---------------------------------------------------------------------------
5277
53 -PipeProcessor::PipeProcessor(char * command_, int factor_, bool flush_, bool blocking_)
54 - : LogProcessor(factor_, flush_), child(0), blocking(blocking_)
 78+PipeProcessor::PipeProcessor(Udp2LogConfig & config_, int index_,
 79+ char * command_, int factor_, bool flush_, bool blocking_)
 80+ : LogProcessor(config_, index_, factor_, flush_),
 81+ child(0), blocking(blocking_)
5582 {
5683 command = command_;
5784 Open();
5885 }
5986
60 -LogProcessor * PipeProcessor::NewFromConfig(char * params, bool flush, bool blocking)
 87+boost::shared_ptr<LogProcessor> PipeProcessor::NewFromConfig(
 88+ Udp2LogConfig & config, int index, char * params, bool flush, bool blocking)
6189 {
6290 char * strFactor = strtok(params, " \t");
6391 if (strFactor == NULL) {
@@ -71,20 +99,19 @@
72100 );
73101 }
74102 char * command = strtok(NULL, "");
75 - PipeProcessor * pp = new PipeProcessor(command, factor, flush, blocking);
 103+ PipeProcessor * pp = new PipeProcessor(config, index, command, factor, flush, blocking);
76104 if (!pp->IsOpen()) {
77105 delete pp;
78106 throw ConfigError("Unable to open pipe");
79107 }
80108 std::cerr << "Opened pipe with factor " << factor << ": " << command << std::endl;
81 - return (LogProcessor*)pp;
 109+ return boost::shared_ptr<LogProcessor>(pp);
82110 }
83111
84 -void PipeProcessor::HandleError(libc_error & e)
 112+void PipeProcessor::HandleError(libc_error & e, size_t bytes)
85113 {
86114 bool restart;
87115 if (e.code == EAGAIN) {
88 - numLost++;
89116 restart = false;
90117 } else if (e.code == EPIPE) {
91118 std::cerr << "Pipe terminated, suspending output: " << command << std::endl;
@@ -100,27 +127,23 @@
101128 }
102129 }
103130
104 -void PipeProcessor::ProcessLine(const char *buffer, size_t size)
 131+ssize_t PipeProcessor::Write(const char *buffer, size_t size)
105132 {
106 - if (!child) {
107 - return;
 133+ if (!IsActive(config.GetCurrentTime())) {
 134+ IncrementBytesLost(size);
 135+ return size;
108136 }
109137
 138+ ssize_t bytesWritten = 0;
110139
111 - if (Sample()) {
112 - try {
113 - if (blocking && size > PIPE_BUF) {
114 - // Write large packets in blocking mode to preserve data integrity
115 - GetPipe().SetStatusFlags(0);
116 - GetPipe().Write(buffer, size);
117 - GetPipe().SetStatusFlags(O_NONBLOCK);
118 - } else {
119 - GetPipe().Write(buffer, size);
120 - }
121 - } catch (libc_error & e) {
122 - HandleError(e);
123 - }
 140+ try {
 141+ bytesWritten = GetPipe().Write(buffer, size);
 142+ } catch (libc_error & e) {
 143+ bytesWritten = 0;
 144+ HandleError(e, size);
124145 }
 146+ IncrementBytesLost(size - bytesWritten);
 147+ return bytesWritten;
125148 }
126149
127150 void PipeProcessor::FixIfBroken()
@@ -141,8 +164,10 @@
142165 {
143166 if (child) {
144167 int status = 0;
 168+ // Send HUP signal
 169+ GetPipe().Close();
 170+ // Wait for it to respond
145171 waitpid(child, &status, 0);
146 - GetPipe().Close();
147172 child = 0;
148173 }
149174 }
@@ -156,6 +181,7 @@
157182 child = fork();
158183 if (!child) {
159184 // This is the child process
 185+ prctl(PR_SET_PDEATHSIG, SIGTERM, 0, 0, 0);
160186 pipes->writeEnd.Close();
161187 pipes->readEnd.Dup2(STDIN_FILENO);
162188 pipes->readEnd.Close();
@@ -180,27 +206,27 @@
181207 }
182208 }
183209
184 -void PipeProcessor::CopyFromPipe(Pipe & source, size_t dataLength)
 210+ssize_t PipeProcessor::CopyFromPipe(Pipe & source, size_t size)
185211 {
186 - if (!child) {
187 - return;
 212+ if (!IsActive(config.GetCurrentTime())) {
 213+ IncrementBytesLost(size);
 214+ return size;
188215 }
189216
 217+ ssize_t bytesWritten = 0;
190218 int flags = 0;
191219 if (!blocking) {
192220 flags |= SPLICE_F_NONBLOCK;
193221 }
194222 try {
195 - if (blocking && dataLength > PIPE_BUF) {
196 - // Write large packets in blocking mode to preserve data integrity
197 - GetPipe().SetStatusFlags(0);
198 - source.Tee(GetPipe(), dataLength, 0);
199 - GetPipe().SetStatusFlags(O_NONBLOCK);
200 - } else {
201 - source.Tee(GetPipe(), dataLength, flags);
202 - }
 223+ bytesWritten = source.Tee(GetPipe(), size, flags);
203224 } catch (libc_error & e) {
204 - HandleError(e);
 225+ bytesWritten = 0;
 226+ HandleError(e, size);
205227 }
 228+ IncrementBytesLost(size - bytesWritten);
 229+ return bytesWritten;
206230 }
207231
 232+
 233+
Index: trunk/udplog/udp2log/Udp2LogConfig.cpp
@@ -1,14 +1,59 @@
22 #include <fstream>
33 #include <cstring>
44 #include <sstream>
 5+#include <time.h>
 6+#include <cmath>
57
68 #include "Udp2LogConfig.h"
79
810 PosixFile Udp2LogConfig::devNull("/dev/null", O_WRONLY);
911
 12+// UpdateCounters() should be called once every period of this length
 13+const double Udp2LogConfig::UPDATE_PERIOD = 1;
 14+
 15+// The packet rate counter uses an exponentially-weighted moving average filter.
 16+// This is the number of seconds corresponding to half of the contribution to
 17+// the result.
 18+const double Udp2LogConfig::RATE_PERIOD = 60;
 19+
 20+// Minimum percentage loss in a pipe before it can be considered to be a
 21+// measure of pipe capacity. Low loss implies that the maximum rate is
 22+// unknown, not that it equals the input rate.
 23+const double Udp2LogConfig::MIN_LOSS_RATIO = 0.05;
 24+
 25+// Minimum time a pipe can be put on holiday. This is used when a reliable
 26+// estimate of the pipe capacity is not available.
 27+const PosixClock::Time Udp2LogConfig::MIN_HOLIDAY_TIME(100e-6);
 28+
 29+// Maximum time a pipe can be put on holiday. The point of holiday is to avoid
 30+// regular backlog write operations, which adversely affect the global
 31+// throughput due to high CPU overhead. This rationale only makes sense as long
 32+// as the holiday time is only a few times higher than the write overhead.
 33+const PosixClock::Time Udp2LogConfig::MAX_HOLIDAY_TIME(1e-3);
 34+
 35+// The maximum input block size. On Linux 2.6.11 and later, this can be up
 36+// to 64KB. If it was more than 64KB, the temporary pipe buffer used for
 37+// tee() operations would overflow. In Linux 2.6.35 and later,
 38+// fcntl(F_SETPIPE_SZ) can be used to set the size of the temporary pipe
 39+// buffer, but currently we're not deploying to that version of Linux, so
 40+// it's unimplemented.
 41+const size_t Udp2LogConfig::BLOCK_SIZE = 65536;
 42+
1043 Udp2LogConfig::Udp2LogConfig()
11 - : reload(false)
12 -{}
 44+ : reload(false),
 45+ pool(BLOCK_SIZE),
 46+ packetRate(UPDATE_PERIOD, RATE_PERIOD),
 47+ processRate(UPDATE_PERIOD, RATE_PERIOD),
 48+ currentTimeValid(false),
 49+ clock(CLOCK_REALTIME),
 50+ numPipeProcessors(0),
 51+ lineIndex(0)
 52+{
 53+ // Make the buffer pipe non-blocking
 54+ // Throwing an exception and aborting is better than hanging forever
 55+ bufferPipe.writeEnd.SetStatusFlags(O_NONBLOCK);
 56+ bufferPipe.readEnd.SetStatusFlags(O_NONBLOCK);
 57+}
1358
1459 void Udp2LogConfig::Open(const std::string & name)
1560 {
@@ -23,7 +68,7 @@
2469 char line[maxLineSize];
2570 char * type;
2671 char * params;
27 - boost::ptr_vector<LogProcessor> newProcessors;
 72+ ProcessorArray newProcessors;
2873
2974 ifstream f(fileName.c_str());
3075 if (!f.good()) {
@@ -31,6 +76,7 @@
3277 }
3378
3479 int lineNum = 1;
 80+ int index = 0;
3581 try {
3682 // Parse all lines
3783 for (f.getline(line, maxLineSize); f.good(); f.getline(line, maxLineSize), lineNum++) {
@@ -43,7 +89,7 @@
4490 }
4591
4692 params = strtok(NULL, "");
47 - LogProcessor * processor = NULL;
 93+ ProcessorPointer processor;
4894 bool flush = false, blocking = false;
4995
5096 if (!strcmp(type, "flush")) {
@@ -59,9 +105,9 @@
60106
61107 if (!strcmp(type, "file")) {
62108 // TODO: support blocking for FileProcessor
63 - processor = FileProcessor::NewFromConfig(params, flush);
 109+ processor = FileProcessor::NewFromConfig(*this, index++, params, flush);
64110 } else if (!strcmp(type, "pipe")) {
65 - processor = PipeProcessor::NewFromConfig(params, flush, blocking);
 111+ processor = PipeProcessor::NewFromConfig(*this, index++, params, flush, blocking);
66112 } else {
67113 throw ConfigError("Unrecognised log type");
68114 }
@@ -71,21 +117,48 @@
72118 }
73119 }
74120
 121+ // Make a new epoll instance, deleting the old one
 122+ epoll = MyEpollPointer(new MyEpoll);
 123+
75124 // Swap in the new configuration
76125 // The old configuration will go out of scope, closing files and pipes
77126 processors.swap(newProcessors);
 127+
78128 } catch (ConfigError & e) {
79129 stringstream s;
80130 s << "Error in configuration file on line " << lineNum << ": " << e.what();
81131 throw runtime_error(s.str().c_str());
82132 }
 133+
 134+ // Initialise epoll and processorsByFactor
 135+ processorsByFactor.clear();
 136+ numPipeProcessors = 0;
 137+ for (unsigned i = 0; i < processors.size(); i++) {
 138+ ProcessorPointer p = processors[i];
 139+ if (p->IsUnsampledPipe()) {
 140+ AddUnsampledPipe(p);
 141+ }
 142+ processorsByFactor[p->GetFactor()].push_back(i);
 143+ }
 144+
 145+ epollEvents.resize(processors.size());
83146 }
84147
 148+void Udp2LogConfig::AddUnsampledPipe(ProcessorPointer lp) {
 149+ boost::shared_ptr<PipeProcessor> p =
 150+ boost::dynamic_pointer_cast<PipeProcessor, LogProcessor>(lp);
 151+ if (!p) {
 152+ throw std::runtime_error(
 153+ "Udp2LogConfig::AddUnsampledPipe: only works on PipeProcessor for now");
 154+ }
 155+ epoll->AddPoll(p->GetPipe(), EPOLLOUT, p);
 156+ numPipeProcessors++;
 157+}
 158+
85159 void Udp2LogConfig::FixBrokenProcessors()
86160 {
87 - boost::ptr_vector<LogProcessor>::iterator i;
88 - for (i = processors.begin(); i != processors.end(); i++) {
89 - i->FixIfBroken();
 161+ for (ProcessorIterator i = processors.begin(); i != processors.end(); i++) {
 162+ (**i).FixIfBroken();
90163 }
91164 }
92165
@@ -101,42 +174,43 @@
102175 }
103176 }
104177
105 -void Udp2LogConfig::ProcessBuffer(const char *data, size_t dataLength)
 178+void Udp2LogConfig::ProcessBlock(const Block & block)
106179 {
107 - boost::ptr_vector<LogProcessor>::iterator iter;
 180+ ResetCurrentTime();
 181+ PosixClock::Time currentTime = GetCurrentTime();
 182+
 183+ // Handle any pending backlog buffers, from previous failed operations
 184+ HandleBacklogs();
 185+
 186+ // Do an epoll to make sure all pipes are ready for input
 187+ // If any aren't ready, put them on holiday
 188+ CheckReadiness();
 189+
 190+ // Identify the pipes that can be written to with tee()
 191+ ProcessorIterator iter;
 192+ teePipes.assign(processors.size(), false);
 193+
108194 int numPipes = 0;
109 -
110195 for (iter = processors.begin(); iter != processors.end(); iter++) {
111 - if (iter->IsUnsampledPipe() && iter->IsOpen()) {
 196+ if ((**iter).IsUnsampledPipe() && (**iter).IsActive(currentTime)) {
 197+ teePipes.at(iter - processors.begin()) = true;
112198 numPipes++;
113199 }
114200 }
115201
 202+ // Handle the tee writes if there are enough to make it worthwhile
 203+ bool teeDone = false;
116204 if (numPipes >= 2) {
117 - // Efficiently send the packet to the unsampled pipes.
118 - // First load the data into a pipe buffer.
119 - // It just so happens that the size of the pipe buffer is the same
120 - // as the maximum size of a UDP packet, so this won't block.
121 - bufferPipe.writeEnd.Write(data, dataLength);
122 -
123 - // Now send it out to all unsampled pipes using the zero-copy tee() syscall.
124 - for (iter = processors.begin(); iter != processors.end(); iter++) {
125 - if (iter->IsUnsampledPipe() && iter->IsOpen()) {
126 - PipeProcessor & p = dynamic_cast<PipeProcessor&>(*iter);
127 - p.CopyFromPipe(bufferPipe.readEnd, dataLength);
128 - }
129 - }
130 -
131 - // Finally flush the pipe buffer by splicing it out to /dev/null
132 - bufferPipe.readEnd.Splice(Udp2LogConfig::devNull, dataLength, 0);
 205+ HandleTeePipes(block);
 206+ teeDone = true;
133207 }
134208
135209 // For the sampled processors, split the buffer into lines
136210 const char *line1, *line2;
137211 ssize_t bytesRemaining;
138212
139 - line1 = data;
140 - bytesRemaining = dataLength;
 213+ line1 = block.GetData();
 214+ bytesRemaining = block.GetSize();
141215 while (bytesRemaining) {
142216 size_t lineLength;
143217
@@ -151,25 +225,156 @@
152226 lineLength = bytesRemaining;
153227 }
154228
155 - for (iter = processors.begin(); iter != processors.end(); iter++) {
156 - if (numPipes >= 2 && iter->IsUnsampledPipe()) {
 229+ for (ByFactorIterator bfi = processorsByFactor.begin();
 230+ bfi != processorsByFactor.end(); bfi++)
 231+ {
 232+ if (lineIndex % bfi->first != 0) {
157233 continue;
158234 }
 235+ for (std::vector<int>::iterator bfi2 = bfi->second.begin();
 236+ bfi2 != bfi->second.end(); bfi2++)
 237+ {
 238+ if (teeDone && teePipes.at(*bfi2)) {
 239+ continue;
 240+ }
 241+ LogProcessor & p = *processors[*bfi2];
 242+ ssize_t bytesWritten = p.Write(line1, lineLength);
 243+ if (bytesWritten != (ssize_t)lineLength) {
 244+ // Partial write done
 245+ // Make a backlog entry
 246+ Block block = pool.New();
 247+ block.Append(line1 + bytesWritten, lineLength - bytesWritten);
 248+ p.SetBacklog(block);
 249+ PutOnHoliday(*processors[*bfi2]);
 250+ }
 251+ }
 252+ }
159253
160 - iter->ProcessLine(line1, lineLength);
161 - }
162254 bytesRemaining -= lineLength;
163255 line1 = line2;
 256+ lineIndex++;
164257 }
165258 }
166259
167 -void Udp2LogConfig::PrintLossReport(std::ostream & os)
 260+void Udp2LogConfig::HandleBacklogs() {
 261+ for (ProcessorIterator iter = processors.begin(); iter != processors.end(); iter++) {
 262+ if (!(**iter).IsBacklogged()) {
 263+ continue;
 264+ }
 265+ Block & backlogBlock = (**iter).GetBacklogBlock();
 266+ ssize_t bytesWritten = (**iter).Write(backlogBlock.GetData(), backlogBlock.GetSize());
 267+ if (bytesWritten == (ssize_t)backlogBlock.GetSize()) {
 268+ (**iter).ClearBacklog();
 269+ } else {
 270+ PutOnHoliday(**iter);
 271+ if (bytesWritten != 0) {
 272+ (**iter).SetBacklog(backlogBlock.Offset(bytesWritten));
 273+ }
 274+ }
 275+ }
 276+}
 277+
 278+void Udp2LogConfig::CheckReadiness() {
 279+ epoll->EpollWait(&epollEvents, 0);
 280+ if (epollEvents.size() != numPipeProcessors) {
 281+ readyPipes.assign(numPipeProcessors, false);
 282+ for (MyEventsIterator iter = epollEvents.begin(); iter != epollEvents.end(); iter++) {
 283+ if (iter->first & EPOLLOUT) {
 284+ readyPipes.at(iter->second->GetProcessorIndex()) = true;
 285+ }
 286+ }
 287+
 288+ std::vector<bool>::iterator bi = readyPipes.begin();
 289+ //std::cout << "Check readiness\n";
 290+ while (readyPipes.end() != (bi = find(bi, readyPipes.end(), false))) {
 291+ //std::cout << "Not ready: " << bi - readyPipes.begin() << "\n";
 292+ PutOnHoliday(*processors.at(bi - readyPipes.begin()));
 293+ bi++;
 294+ }
 295+ }
 296+}
 297+
 298+void Udp2LogConfig::HandleTeePipes(const Block & block) {
 299+ // Efficiently send the packet to the unsampled pipes.
 300+ // First load the data into a pipe buffer.
 301+ bufferPipe.writeEnd.Write(block.GetData(), block.GetSize());
 302+
 303+ // Now send it out to all unsampled pipes using the zero-copy tee() syscall.
 304+ for (ProcessorIterator iter = processors.begin(); iter != processors.end(); iter++) {
 305+ if (!teePipes.at(iter - processors.begin())) {
 306+ continue;
 307+ }
 308+ PipeProcessor * p = dynamic_cast<PipeProcessor*>(iter->get());
 309+ ssize_t bytesWritten = p->CopyFromPipe(bufferPipe.readEnd, block.GetSize());
 310+ if (bytesWritten != (ssize_t)block.GetSize()) {
 311+ //std::cout << "Processor[" << (iter - processors.begin()) << "]: teed " <<
 312+ // bytesWritten << "/" << (ssize_t)block.GetSize() << "\n";
 313+ // Partial write done, make a backlog entry and put the
 314+ // processor on an input holiday as punishment
 315+ p->SetBacklog(block.Offset(bytesWritten));
 316+ PutOnHoliday(**iter);
 317+ }
 318+ }
 319+
 320+ // Finally flush the pipe buffer by splicing it out to /dev/null
 321+ bufferPipe.readEnd.Splice(Udp2LogConfig::devNull, block.GetSize(), 0);
 322+}
 323+
 324+void Udp2LogConfig::PutOnHoliday(LogProcessor & p)
168325 {
169 - boost::ptr_vector<LogProcessor>::iterator iter;
 326+ PosixClock::Time endTime = GetCurrentTime();
 327+ int lossRate = p.GetLossRate();
 328+ int inputRate = GetProcessRate();
 329+ int writeRate = inputRate - lossRate;
 330+
 331+ PosixClock::Time interval;
 332+
 333+ if (inputRate * MIN_LOSS_RATIO < lossRate) {
 334+ interval = MIN_HOLIDAY_TIME;
 335+ } else {
 336+ interval = PosixClock::Time((double)BLOCK_SIZE / writeRate);
 337+ if (interval > MAX_HOLIDAY_TIME) {
 338+ interval = MAX_HOLIDAY_TIME;
 339+ } else if (interval < MIN_HOLIDAY_TIME) {
 340+ interval = MIN_HOLIDAY_TIME;
 341+ }
 342+ }
 343+
 344+ endTime += interval;
 345+ p.SetHoliday(endTime);
 346+ //std::cout << "Putting processor on holiday for " << (double)interval << " seconds: " <<
 347+ // p.GetProcessorIndex() << "\n";
 348+}
 349+
 350+void Udp2LogConfig::UpdateCounters()
 351+{
 352+ packetRate.Update();
 353+ processRate.Update();
 354+ for (ProcessorIterator iter = processors.begin(); iter != processors.end(); iter++) {
 355+ (**iter).UpdateLossRate();
 356+ }
 357+}
 358+
 359+void Udp2LogConfig::PrintStatusReport(std::ostream & os)
 360+{
 361+ time_t unixTime = time(NULL);
 362+ char timeBuf[30];
 363+ struct tm * tm = localtime(&unixTime);
 364+ strftime(timeBuf, sizeof(timeBuf), "%F %T UTC%z", tm);
 365+
 366+ ProcessorIterator iter;
170367 for (iter = processors.begin(); iter != processors.end(); iter++) {
171 - uint64_t lost = iter->GetNumLost();
 368+ uint64_t lost = (**iter).GetBytesLost();
172369 if (lost) {
173 - os << "Lost output chunks: " << lost << ": " << iter->GetName() << "\n";
 370+ os << "[" << timeBuf << "] Lost output bytes: " << lost << ": " <<
 371+ (**iter).GetName() << "\n";
 372+ (**iter).ResetBytesLost();
174373 }
175374 }
 375+
 376+ os.precision(3);
 377+ os.flags(std::ios::fixed);
 378+ os << "[" << timeBuf << "] Packet rate: " << ((double)GetPacketRate() / 1000) << " k/s\n";
176379 }
 380+
 381+
Index: trunk/udplog/udp2log/LogProcessor.h
@@ -4,14 +4,18 @@
55 #include <fstream>
66 #include <sys/time.h>
77 #include <string>
 8+#include <boost/shared_ptr.hpp>
89 #include "../srclib/Exception.h"
910 #include "../srclib/Socket.h"
1011 #include "../srclib/Pipe.h"
 12+#include "../srclib/PosixClock.h"
 13+#include "RateAverage.h"
 14+#include "Udp2LogConfig.h"
1115
1216 class LogProcessor
1317 {
1418 public:
15 - virtual void ProcessLine(const char *buffer, size_t size) = 0;
 19+ virtual ssize_t Write(const char *buffer, size_t size) = 0;
1620 virtual void FixIfBroken() {}
1721 virtual ~LogProcessor() {}
1822 virtual bool IsOpen() = 0;
@@ -26,14 +30,55 @@
2731 return factor;
2832 }
2933
30 - uint64_t GetNumLost() { return numLost; }
 34+ uint64_t GetBytesLost() { return bytesLost; }
3135
 36+ void ResetBytesLost() {
 37+ bytesLost = 0;
 38+ }
 39+
 40+ void UpdateLossRate() {
 41+ lossRate.Update();
 42+ }
 43+
 44+ int GetLossRate() {
 45+ return (int)lossRate.GetRate();
 46+ }
 47+
 48+ void SetHoliday(const PosixClock::Time & endTime) {
 49+ holidayEndTime = endTime;
 50+ }
 51+
 52+ bool IsBacklogged() {
 53+ return (bool)backlog.GetSize();
 54+ }
 55+
 56+ Block & GetBacklogBlock() {
 57+ return backlog;
 58+ }
 59+
 60+ void ClearBacklog() {
 61+ backlog = config.GetPool().GetEmptyBlock();
 62+ }
 63+
 64+ void SetBacklog(const Block & block) {
 65+ backlog = block;
 66+ }
 67+
 68+ int GetProcessorIndex() {
 69+ return index;
 70+ }
 71+
 72+ bool IsActive(const PosixClock::Time & currentTime);
 73+ void IncrementBytesLost(size_t bytes);
 74+
3275 protected:
33 - LogProcessor(int factor_, bool flush_)
34 - : counter(0), factor(factor_), flush(flush_), numLost(0)
 76+ LogProcessor(Udp2LogConfig & config_, int index_, int factor_, bool flush_)
 77+ : config(config_), index(index_), counter(0),
 78+ factor(factor_), flush(flush_), bytesLost(0),
 79+ lossRate(Udp2LogConfig::UPDATE_PERIOD, Udp2LogConfig::RATE_PERIOD),
 80+ backlog(config_.GetPool().GetEmptyBlock())
3581 {}
3682
37 -
3883 bool Sample() {
3984 if (factor != 1) {
4085 counter++;
@@ -48,20 +93,27 @@
4994 }
5095 }
5196
 97+ Udp2LogConfig & config;
 98+ int index;
5299 int counter;
53100 int factor;
54101 bool flush;
55 - uint64_t numLost;
 102+ uint64_t bytesLost;
 103+ PosixClock::Time holidayEndTime;
 104+ RateAverage lossRate;
 105+ Block backlog;
56106 };
57107
58108 class FileProcessor : public LogProcessor
59109 {
60110 public:
61 - static LogProcessor * NewFromConfig(char * params, bool flush);
62 - virtual void ProcessLine(const char *buffer, size_t size);
 111+ static boost::shared_ptr<LogProcessor> NewFromConfig(
 112+ Udp2LogConfig & config, int index, char * params, bool flush);
 113+ virtual ssize_t Write(const char *buffer, size_t size);
63114
64 - FileProcessor(char * fileName_, int factor_, bool flush_)
65 - : LogProcessor(factor_, flush_)
 115+ FileProcessor(Udp2LogConfig & config_, int index_, char * fileName_,
 116+ int factor_, bool flush_)
 117+ : LogProcessor(config_, index_, factor_, flush_)
66118 {
67119 fileName = fileName_;
68120 f.open(fileName_, std::ios::app | std::ios::out);
@@ -83,11 +135,13 @@
84136 class PipeProcessor : public LogProcessor
85137 {
86138 public:
87 - static LogProcessor * NewFromConfig(char * params, bool flush, bool blocking);
88 - virtual void ProcessLine(const char *buffer, size_t size);
 139+ static boost::shared_ptr<LogProcessor> NewFromConfig(
 140+ Udp2LogConfig & config, int index, char * params, bool flush, bool blocking);
 141+ virtual ssize_t Write(const char *buffer, size_t size);
89142 virtual void FixIfBroken();
90143
91 - PipeProcessor(char * command_, int factor_, bool flush_, bool blocking_);
 144+ PipeProcessor(Udp2LogConfig & config_, int index_, char * command_, int factor_,
 145+ bool flush_, bool blocking_);
92146
93147 ~PipeProcessor()
94148 {
@@ -107,7 +161,7 @@
108162 return pipes->writeEnd;
109163 }
110164
111 - void CopyFromPipe(Pipe & source, size_t dataLength);
 165+ ssize_t CopyFromPipe(Pipe & source, size_t dataLength);
112166 virtual std::string & GetName() { return command; }
113167
114168 protected:
@@ -115,7 +169,7 @@
116170
117171 void Open();
118172 void Close();
119 - void HandleError(libc_error & e);
 173+ void HandleError(libc_error & e, size_t bytes);
120174
121175 std::string command;
122176
@@ -127,5 +181,4 @@
128182
129183 };
130184
131 -
132185 #endif
Index: trunk/udplog/udp2log/Udp2LogConfig.h
@@ -2,14 +2,19 @@
33 #define UDP2LOGCONFIG_H
44
55 #include <string>
6 -#include <boost/ptr_container/ptr_vector.hpp>
76 #include <stdexcept>
87 #include <iostream>
9 -#include "LogProcessor.h"
 8+#include <vector>
109 #include "../srclib/PosixFile.h"
1110 #include "../srclib/Pipe.h"
 11+#include "../srclib/PosixClock.h"
 12+#include "RateAverage.h"
 13+#include "Block.h"
 14+#include "../srclib/TypedEpollInstance.h"
1215
1316 class ConfigWriteCallback;
 17+class LogProcessor;
 18+class PipeProcessor;
1419
1520 /**
1621 * The configuration and current processing state for udp2log.
@@ -18,25 +23,108 @@
1924 class Udp2LogConfig
2025 {
2126 public:
 27+ typedef boost::shared_ptr<LogProcessor> ProcessorPointer;
 28+
2229 Udp2LogConfig();
2330 void Open(const std::string & name);
2431 void Load();
 32+ void AddUnsampledPipe(ProcessorPointer p);
2533 void FixBrokenProcessors();
2634 void Reload();
27 - void ProcessBuffer(const char *data, size_t dataLength);
28 - void PrintLossReport(std::ostream & os);
 35+ void ProcessBlock(const Block & block);
 36+ void HandleBacklogs();
 37+ void CheckReadiness();
 38+ void HandleTeePipes(const Block & block);
 39+ void PutOnHoliday(LogProcessor & p);
 40+ void UpdateCounters();
 41+ void PrintStatusReport(std::ostream & os);
2942
 43+ void IncrementPacketCount() {
 44+ packetRate.Increment();
 45+ }
 46+
 47+ void IncrementProcessBytes(int bytes) {
 48+ processRate.Increment(bytes);
 49+ }
 50+
 51+ int GetPacketRate() {
 52+ return (int)packetRate.GetRate();
 53+ }
 54+
 55+ int GetProcessRate() {
 56+ return (int)processRate.GetRate();
 57+ }
 58+
3059 inline ConfigWriteCallback GetWriteCallback();
3160
 61+ void ResetCurrentTime() {
 62+ currentTimeValid = false;
 63+ }
 64+
 65+ const PosixClock::Time & GetCurrentTime() {
 66+ if (!currentTimeValid) {
 67+ currentTime = clock.Get();
 68+ currentTimeValid = true;
 69+ }
 70+ return currentTime;
 71+ }
 72+
 73+ BlockPool & GetPool() {
 74+ return pool;
 75+ }
 76+
3277 bool reload;
3378 bool fixBrokenProcessors;
3479
 80+ static const double UPDATE_PERIOD;
 81+ static const double RATE_PERIOD;
 82+ static const double MIN_LOSS_RATIO;
 83+ static const PosixClock::Time MIN_HOLIDAY_TIME;
 84+ static const PosixClock::Time MAX_HOLIDAY_TIME;
 85+ static const size_t BLOCK_SIZE;
 86+
3587 protected:
 88+ // The storage for blocks
 89+ // This declaration has to be above the declaration of any member objects
 90+ // that hold Block references, such as LogProcessor, so that destruction of
 91+ // the Block references occurs before the destruction of the pool.
 92+ BlockPool pool;
 93+
3694 std::string fileName;
37 - boost::ptr_vector<LogProcessor> processors;
 95+
 96+ typedef std::vector<boost::shared_ptr<LogProcessor> > ProcessorArray;
 97+ typedef ProcessorArray::iterator ProcessorIterator;
 98+ ProcessorArray processors;
 99+
38100 PipePair bufferPipe;
39101
40102 static PosixFile devNull;
 103+ RateAverage packetRate, processRate;
 104+
 105+ typedef TypedEpollInstance<PipeProcessor> MyEpoll;
 106+ typedef boost::shared_ptr<MyEpoll> MyEpollPointer;
 107+ MyEpollPointer epoll;
 108+
 109+ typedef MyEpoll::TypedEventArray MyEpollEvents;
 110+ typedef MyEpollEvents::iterator MyEventsIterator;
 111+ // This is a temporary vector used to hold the result of an epoll_wait().
 112+ // It's in the object state instead of the stack to avoid reallocation
 113+ // during the high-performance sections of the code. The same goes for
 114+ // teePipes and readyPipes.
 115+ MyEpollEvents epollEvents;
 116+
 117+ bool currentTimeValid;
 118+ PosixClock::Time currentTime;
 119+ PosixClock clock;
 120+ std::vector<bool> teePipes;
 121+ size_t numPipeProcessors;
 122+ std::vector<bool> readyPipes;
 123+
 124+ typedef std::map<int, std::vector<int> > ByFactorArray;
 125+ typedef ByFactorArray::iterator ByFactorIterator;
 126+ ByFactorArray processorsByFactor;
 127+
 128+ unsigned lineIndex;
41129 };
42130
43131 class ConfigWatcher
@@ -63,8 +151,8 @@
64152 : config(config_)
65153 {}
66154
67 - void operator()(const char* buffer, size_t bufSize) {
68 - return config.ProcessBuffer(buffer, bufSize);
 155+ void operator()(Block block) {
 156+ return config.ProcessBlock(block);
69157 }
70158 protected:
71159 Udp2LogConfig & config;
@@ -74,5 +162,6 @@
75163 return ConfigWriteCallback(*this);
76164 }
77165
 166+#include "LogProcessor.h"
78167
79168 #endif
Index: trunk/udplog/udp2log/SendBuffer.h
@@ -4,24 +4,23 @@
55 template <class Callback>
66 class SendBuffer {
77 public:
8 - SendBuffer(size_t capacity_, Callback writeCallback_)
9 - : capacity(capacity_), size(0), writeCallback(writeCallback_)
10 - {
11 - data = new char[capacity];
12 - }
 8+ SendBuffer(BlockPool & pool_, size_t capacity_, Callback writeCallback_)
 9+ : pool(pool_), block(pool.New()), capacity(capacity_), writeCallback(writeCallback_)
 10+ {}
1311
1412 void Flush() {
15 - if (!size) {
 13+ if (!block.GetSize()) {
1614 return;
1715 }
18 - writeCallback(data, size);
19 - size = 0;
 16+ writeCallback(block);
 17+ block = pool.New();
2018 }
2119
2220 void Write(const char* buffer, size_t bufSize);
2321 protected:
24 - char * data;
25 - size_t capacity, size;
 22+ BlockPool & pool;
 23+ Block block;
 24+ size_t capacity;
2625 Callback writeCallback;
2726 };
2827
@@ -29,15 +28,13 @@
3029 void SendBuffer<Callback>::Write(const char* buffer, size_t bufSize)
3130 {
3231 if (bufSize > capacity) {
 32+ // Truncate oversize writes
 33+ bufSize = capacity;
 34+ }
 35+
 36+ if (block.GetSize() + bufSize > capacity) {
3337 Flush();
34 - writeCallback(buffer, bufSize);
35 - } else if (size + bufSize > capacity) {
36 - Flush();
37 - memcpy(data, buffer, bufSize);
38 - size = bufSize;
39 - } else {
40 - memcpy(data + size, buffer, bufSize);
41 - size += bufSize;
4238 }
 39+ block.Append(buffer, bufSize);
4340 }
4441 #endif
Index: trunk/udplog/udp2log/RateAverage.h
@@ -0,0 +1,33 @@
 2+#ifndef UDPLOG_RATEAVERAGE_H
 3+#define UDPLOG_RATEAVERAGE_H
 4+
 5+#include <cmath>
 6+#include <iostream>
 7+
/**
 * Exponentially weighted moving average of an event rate.
 *
 * Events are counted with Increment() during a sample period. Update() is
 * expected to be called once per sample period; it folds the count into the
 * running estimate and resets the counter. The weight given to the old
 * estimate halves every averagePeriod_ (same units as samplePeriod_).
 */
class RateAverage {
public:
	/**
	 * @param samplePeriod_   Interval between Update() calls.
	 * @param averagePeriod_  Half-life of the average, in the same units.
	 */
	RateAverage(double samplePeriod_, double averagePeriod_)
		: currentSample(0),
		samplePeriod(samplePeriod_),
		averagePeriod(averagePeriod_),
		// Start from zero so the first Update() does not blend in garbage
		estimatedEventsPerSample(0.),
		constant(1. - std::exp(-std::log(2.) / averagePeriod_ * samplePeriod_))
	{}

	/** Record x events in the current sample period. */
	void Increment(int x = 1) {
		currentSample += x;
	}

	/** Fold the current sample into the estimate and start a new sample. */
	void Update() {
		estimatedEventsPerSample = constant * currentSample +
			(1. - constant) * estimatedEventsPerSample;
		currentSample = 0;
	}

	/** @return The estimated number of events per unit of samplePeriod. */
	double GetRate() const {
		return estimatedEventsPerSample / samplePeriod;
	}
protected:
	int64_t currentSample;
	double samplePeriod, averagePeriod, estimatedEventsPerSample, constant;
};
 33+
 34+#endif
Property changes on: trunk/udplog/udp2log/RateAverage.h
___________________________________________________________________
Added: svn:eol-style
135 + native
Index: trunk/udplog/udp2log/Makefile
@@ -0,0 +1,2 @@
 2+all:
 3+ make -C ..
Property changes on: trunk/udplog/udp2log/Makefile
___________________________________________________________________
Added: svn:eol-style
14 + native
Index: trunk/udplog/udp2log/udp2log.cpp
@@ -19,12 +19,14 @@
2020 #include "../srclib/Socket.h"
2121 #include "Udp2LogConfig.h"
2222 #include "SendBuffer.h"
 23+#include "../srclib/EpollInstance.h"
2324
2425 std::string configFileName("/etc/udp2log");
2526 std::string logFileName("/var/log/udp2log/udp2log.log");
2627 std::string pidFileName("/var/run/udp2log.pid");
2728 std::string daemonUserName("udp2log");
2829 std::string multicastAddr("0");
 30+int udpReceiveQueue = 128; // KB
2931
3032 Udp2LogConfig config;
3133
@@ -119,7 +121,6 @@
120122 // Process command line
121123 options_description optDesc;
122124 optDesc.add_options()
123 - ("multicast", value<string>(&multicastAddr)->default_value(multicastAddr), "multicast address to listen to")
124125 ("help", "Show help message.")
125126 ("port,p", value<unsigned int>(&port)->default_value(port), "UDP port.")
126127 ("config-file,f", value<string>(&configFileName)->default_value(configFileName),
@@ -130,7 +131,11 @@
131132 ("pid-file", value<string>(&pidFileName)->default_value(pidFileName),
132133 "The location to write the new PID, if --daemon is specified.")
133134 ("user", value<string>(&daemonUserName)->default_value(daemonUserName),
134 - "User to switch to, after daemonizing");
 135+ "User to switch to, after daemonizing")
 136+ ("multicast", value<string>(&multicastAddr)->default_value(multicastAddr),
 137+ "Multicast address to listen to")
 138+ ("recv-queue", value<int>(&udpReceiveQueue)->default_value(udpReceiveQueue),
 139+ "The size of the kernel UDP receive buffer, in KB");
135140
136141 variables_map vm;
137142 try {
@@ -179,8 +184,14 @@
180185 return 1;
181186 }
182187
 188+ // Set up signals
183189 signal(SIGHUP, OnHangup);
184 - signal(SIGALRM, OnAlarm);
 190+
 191+ struct sigaction sa;
 192+ memset(&sa, 0, sizeof(sa));
 193+ sa.sa_handler = OnAlarm;
 194+ sigaction(SIGALRM, &sa, NULL);
 195+
185196 signal(SIGPIPE, SIG_IGN);
186197
187198 // Open the receiving socket
@@ -191,6 +202,7 @@
192203 return 1;
193204 }
194205 socket.SetDescriptorFlags(FD_CLOEXEC);
 206+ socket.SetSockOpt(SOL_SOCKET, SO_RCVBUF, udpReceiveQueue * 1024);
195207 socket.Bind(saddr);
196208
197209 // Join a multicast group if requested
@@ -204,35 +216,64 @@
205217 }
206218
207219 // Process received packets
208 - boost::shared_ptr<SocketAddress> address;
209220 const size_t bufSize = 65536;
 221+
210222 char receiveBuffer[bufSize];
211223 ssize_t bytesRead;
212 - SendBuffer<ConfigWriteCallback> sendBuffer(PIPE_BUF, config.GetWriteCallback());
 224+ SendBuffer<ConfigWriteCallback> sendBuffer(
 225+ config.GetPool(),
 226+ Udp2LogConfig::BLOCK_SIZE,
 227+ config.GetWriteCallback());
 228+
 229+ const PosixClock::Time reportInterval(5, 0);
 230+ const PosixClock::Time updateInterval(Udp2LogConfig::UPDATE_PERIOD);
 231+ PosixClock clock(CLOCK_REALTIME);
 232+ PosixClock::Time nextReportTime = clock.Get() + reportInterval;
 233+ PosixClock::Time nextUpdateTime = clock.Get() + updateInterval;
 234+ struct itimerval itv;
 235+ itv.it_interval.tv_sec = 0;
 236+ itv.it_interval.tv_usec = 250000;
 237+ itv.it_value = itv.it_interval;
 238+ setitimer(ITIMER_REAL, &itv, NULL);
213239
214 - time_t lossReportTime = 0;
 240+ FileDescriptor::ErrorSetPointer ignoredErrors(new FileDescriptor::ErrorSet);
 241+ ignoredErrors->insert(EINTR);
 242+ socket.Ignore(ignoredErrors);
215243
216244 for (;;) {
217 - bytesRead = socket.RecvFrom(receiveBuffer, bufSize, address);
218 - if (bytesRead <= 0) {
219 - continue;
220 - }
 245+ bytesRead = socket.Recv(receiveBuffer, bufSize);
221246
222247 // Reload configuration
223248 try {
224249 config.Reload();
225250 } catch (runtime_error & e) {
226251 cerr << e.what() << endl;
227 - continue;
228252 }
229253
230 - sendBuffer.Write(receiveBuffer, bytesRead);
 254+ if (bytesRead <= 0) {
 255+ // Timeout
 256+ sendBuffer.Flush();
 257+ } else {
 258+ sendBuffer.Write(receiveBuffer, bytesRead);
 259+ config.IncrementPacketCount();
 260+ }
231261
232 - time_t currentTime = time(NULL);
233 - if (currentTime - lossReportTime > 60) {
234 - config.PrintLossReport(std::cout);
235 - lossReportTime = currentTime;
 262+ // Counter update
 263+ PosixClock::Time currentTime = clock.Get();
 264+ if (currentTime >= nextUpdateTime) {
 265+ while (currentTime >= nextUpdateTime) {
 266+ nextUpdateTime += updateInterval;
 267+ }
 268+ config.UpdateCounters();
236269 }
 270+
 271+ // Status report
 272+ if (currentTime >= nextReportTime) {
 273+ while (currentTime >= nextReportTime) {
 274+ nextReportTime += reportInterval;
 275+ }
 276+ config.PrintStatusReport(std::cout);
 277+ }
237278 }
238279 }
239280
Index: trunk/udplog/Makefile
@@ -1,5 +1,5 @@
22 TARGETS = log2udp udprecv delta udp2log/udp2log packet-loss
3 -SRCLIB_OBJS = srclib/HostEntry.o srclib/FileDescriptor.o srclib/IPAddress.o srclib/Socket.o srclib/SocketAddress.o
 3+SRCLIB_OBJS = srclib/HostEntry.o srclib/FileDescriptor.o srclib/IPAddress.o srclib/Socket.o srclib/SocketAddress.o srclib/PosixClock.o
44 HOST_OBJS = srcmisc/host.o $(SRCLIB_OBJS)
55 LOG2UDP_OBJS = srcmisc/log2udp.o $(SRCLIB_OBJS)
66 UDPRECV_OBJS = srcmisc/udprecv.o $(SRCLIB_OBJS)
@@ -30,7 +30,7 @@
3131 g++ $(CFLAGS) -o packet-loss srcmisc/packet-loss.cpp
3232
3333 udp2log/udp2log: $(UDP2LOG_OBJS)
34 - g++ $(CFLAGS) -o udp2log/udp2log $(UDP2LOG_OBJS) -lboost_program_options
 34+ g++ $(CFLAGS) -o udp2log/udp2log $(UDP2LOG_OBJS) -lboost_program_options -lrt
3535
3636 install:
3737 install log2udp $(DESTDIR)/usr/bin/log2udp
Index: trunk/udplog/srclib/Pipe.h
@@ -6,6 +6,8 @@
77 #endif
88 #include <fcntl.h>
99
 10+#include "FileDescriptor.h"
 11+
1012 class Pipe : public FileDescriptor
1113 {
1214 public:
Index: trunk/udplog/srclib/EpollInstance.h
@@ -0,0 +1,92 @@
 2+#ifndef UDPLOG_EPOLL_H
 3+#define UDPLOG_EPOLL_H
 4+
 5+#include "FileDescriptor.h"
 6+#include <sys/epoll.h>
 7+#include <boost/shared_ptr.hpp>
 8+
 9+class EpollInstance : public FileDescriptor
 10+{
 11+public:
 12+ typedef std::vector<struct epoll_event> EventArray;
 13+
 14+ EpollInstance(int flags = 0)
 15+ : FileDescriptor()
 16+ {
 17+ fd = epoll_create1(flags);
 18+ if (fd == -1) {
 19+ good = false;
 20+ RaiseError("EpollInstance constructor");
 21+ } else {
 22+ good = true;
 23+ }
 24+ }
 25+
 26+ // epoll_ctl(EPOLL_CTL_ADD) wrapper
 27+ int AddPoll(const FileDescriptor & polled, struct epoll_event & event) {
 28+ if (epoll_ctl(fd, EPOLL_CTL_ADD, polled.GetFD(), &event) == -1) {
 29+ RaiseError("EpollInstance::AddPoll");
 30+ return errno;
 31+ } else {
 32+ return 0;
 33+ }
 34+ }
 35+
 36+ // epoll_ctl(EPOLL_CTL_MOD) wrapper
 37+ int ModifyPoll(const FileDescriptor & polled, struct epoll_event & event) {
 38+ if (epoll_ctl(fd, EPOLL_CTL_MOD, polled.GetFD(), &event) == -1) {
 39+ RaiseError("EpollInstance::ModifyPoll");
 40+ return errno;
 41+ } else {
 42+ return 0;
 43+ }
 44+ }
 45+
 46+ // epoll_ctl(EPOLL_CTL_DEL) wrapper
 47+ int DeletePoll(const FileDescriptor & polled) {
 48+ static struct epoll_event dummy;
 49+
 50+ if (epoll_ctl(fd, EPOLL_CTL_DEL, polled.GetFD(), &dummy) == -1) {
 51+ RaiseError("EpollInstance::DeletePoll");
 52+ return errno;
 53+ } else {
 54+ return 0;
 55+ }
 56+ }
 57+
 58+ enum {DEFAULT_WAIT_SIZE = 10};
 59+
 60+ // epoll_wait() wrapper
 61+ int EpollWait(EventArray * events, int timeout) {
 62+ if (!events->size()) {
 63+ events->resize(DEFAULT_WAIT_SIZE);
 64+ }
 65+ int result = epoll_wait(fd, &((*events)[0]), events->size(), timeout);
 66+ if (result == -1) {
 67+ events->resize(0);
 68+ RaiseError("EpollInstance::EpollWait");
 69+ } else {
 70+ events->resize(result);
 71+ }
 72+ return result;
 73+ }
 74+
 75+ // epoll_pwait() wrapper
 76+ int EpollSignalWait(EventArray * events,
 77+ int timeout, const sigset_t & sigmask) {
 78+ if (!events->size()) {
 79+ events->resize(DEFAULT_WAIT_SIZE);
 80+ }
 81+ int result = epoll_pwait(fd, &((*events)[0]), events->size(), timeout, &sigmask);
 82+ if (result == -1) {
 83+ events->resize(0);
 84+ RaiseError("EpollInstance::EpollSignalWait");
 85+ } else {
 86+ events->resize(result);
 87+ }
 88+ return result;
 89+ }
 90+
 91+};
 92+
 93+#endif
Property changes on: trunk/udplog/srclib/EpollInstance.h
___________________________________________________________________
Added: svn:eol-style
194 + native
Index: trunk/udplog/srclib/PosixFile.h
@@ -1,6 +1,8 @@
22 #ifndef UDPLOG_POSIXFILE_H
33 #define UDPLOG_POSIXFILE_H
44
 5+#include "FileDescriptor.h"
 6+
57 class PosixFile : public FileDescriptor
68 {
79 public:
Index: trunk/udplog/srclib/TypedEpollInstance.h
@@ -0,0 +1,113 @@
 2+#ifndef UDPLOG_TYPEDEPOLLINSTANCE_H
 3+#define UDPLOG_TYPEDEPOLLINSTANCE_H
 4+
 5+#include "EpollInstance.h"
 6+#include <map>
 7+#include <boost/shared_ptr.hpp>
 8+
 9+// An epoll wrapper where the user data is a smart pointer of the specified type.
 10+//
 11+// A smart pointer reference to the data object will be held until the
 12+// relevant FD is removed from the poll set.
 13+//
 14+// Note that if you close an FD without calling DeletePoll(), events may
 15+// be delivered with with the wrong data pointer.
 16+
 17+template <class DataType>
 18+class TypedEpollInstance : public EpollInstance
 19+{
 20+public:
 21+ typedef boost::shared_ptr<DataType> DataPointer;
 22+ typedef std::vector<std::pair<uint32_t, DataPointer> > TypedEventArray;
 23+
 24+ int AddPoll(const FileDescriptor & polled, uint32_t events, DataPointer data);
 25+ int ModifyPoll(const FileDescriptor & polled, uint32_t events, DataPointer data);
 26+ int DeletePoll(const FileDescriptor & polled);
 27+ int EpollWait(TypedEventArray * events, int timeout);
 28+ int EpollSignalWait(TypedEventArray * events, int timeout, const sigset_t & sigmask);
 29+protected:
 30+ void EpollSetup(TypedEventArray * events);
 31+ void EpollFinish(TypedEventArray * events);
 32+
 33+ typedef std::map<int, DataPointer> DataPointerMap;
 34+
 35+ DataPointerMap pointers;
 36+ EventArray rawEvents;
 37+};
 38+
 39+template <class DataType>
 40+int
 41+TypedEpollInstance<DataType>::AddPoll(
 42+ const FileDescriptor & polled, uint32_t events, DataPointer data)
 43+{
 44+ struct epoll_event epe;
 45+ epe.events = events;
 46+ epe.data.fd = polled.GetFD();
 47+ pointers[polled.GetFD()] = data;
 48+ return EpollInstance::AddPoll(polled, epe);
 49+}
 50+
 51+template <class DataType>
 52+int
 53+TypedEpollInstance<DataType>::ModifyPoll(
 54+ const FileDescriptor & polled, uint32_t events, DataPointer data)
 55+{
 56+ struct epoll_event epe;
 57+ epe.events = events;
 58+ epe.data.fd = polled.GetFD();
 59+ pointers[polled.GetFD()] = data;
 60+ return EpollInstance::ModifyPoll(polled, epe);
 61+}
 62+
 63+template <class DataType>
 64+int
 65+TypedEpollInstance<DataType>::DeletePoll(const FileDescriptor & polled)
 66+{
 67+ pointers.erase(polled.GetFD());
 68+ return EpollInstance::DeletePoll(polled);
 69+}
 70+
 71+template <class DataType>
 72+void
 73+TypedEpollInstance<DataType>::EpollSetup(TypedEventArray * events)
 74+{
 75+ if (!events->size()) {
 76+ rawEvents.resize(DEFAULT_WAIT_SIZE);
 77+ } else {
 78+ rawEvents.resize(events->size());
 79+ }
 80+}
 81+
 82+template <class DataType>
 83+void
 84+TypedEpollInstance<DataType>::EpollFinish(TypedEventArray * events)
 85+{
 86+ events->resize(rawEvents.size());
 87+ for (unsigned i = 0; i < rawEvents.size(); i++) {
 88+ (*events)[i].first = rawEvents[i].events;
 89+ (*events)[i].second = pointers[rawEvents[i].data.fd];
 90+ }
 91+}
 92+
 93+template <class DataType>
 94+int
 95+TypedEpollInstance<DataType>::EpollWait(TypedEventArray * events, int timeout)
 96+{
 97+ EpollSetup(events);
 98+ int result = EpollInstance::EpollWait(&rawEvents, timeout);
 99+ EpollFinish(events);
 100+ return result;
 101+}
 102+
 103+template <class DataType>
 104+int
 105+TypedEpollInstance<DataType>::EpollSignalWait(TypedEventArray * events, int timeout,
 106+ const sigset_t & sigmask)
 107+{
 108+ EpollSetup(events);
 109+ int result = EpollInstance::EpollSignalWait(&rawEvents, timeout, sigmask);
 110+ EpollFinish(events);
 111+ return result;
 112+}
 113+
 114+#endif
Property changes on: trunk/udplog/srclib/TypedEpollInstance.h
___________________________________________________________________
Added: svn:eol-style
1115 + native
Index: trunk/udplog/srclib/FileDescriptor.cpp
@@ -8,7 +8,7 @@
99 throw libc_error(msg);
1010 }
1111
12 - std::set<int> * curIgnore = ignoreErrors.back().get();
 12+ ErrorSet * curIgnore = ignoreErrors.back().get();
1313 if (!curIgnore) {
1414 // Ignore all
1515 return;
Index: trunk/udplog/srclib/PosixClock.cpp
@@ -0,0 +1,41 @@
 2+#include <time.h>
 3+#include <limits>
 4+#include <cmath>
 5+
 6+#include "PosixClock.h"
 7+
 8+const PosixClock::Time PosixClock::Time::MAX(std::numeric_limits<time_t>::max(), BILLION - 1);
 9+const PosixClock::Time PosixClock::Time::MIN(std::numeric_limits<time_t>::min(), 0);
 10+const PosixClock::Time PosixClock::Time::ZERO(0, 0);
 11+
 12+PosixClock::Time::Time(double seconds) {
 13+ if (seconds > std::numeric_limits<time_t>::max()) {
 14+ *this = MAX;
 15+ } else if (seconds < std::numeric_limits<time_t>::min()) {
 16+ *this = MIN;
 17+ } else {
 18+ double intPart = std::floor(seconds);
 19+ data.tv_sec = (time_t)intPart;
 20+ data.tv_nsec = (long)((seconds - intPart) * BILLION);
 21+ }
 22+ Normalise();
 23+}
 24+
 25+void PosixClock::Time::FixNormalisation()
 26+{
 27+ if (data.tv_nsec < 0) {
 28+ long seconds = (-data.tv_nsec) / BILLION;
 29+ data.tv_sec -= seconds;
 30+ data.tv_nsec += seconds * BILLION;
 31+ }
 32+ if (data.tv_nsec >= BILLION) {
 33+ long seconds = data.tv_nsec / BILLION;
 34+ data.tv_sec += seconds;
 35+ data.tv_nsec -= seconds * BILLION;
 36+ }
 37+}
 38+
 39+PosixClock::Time::operator double()
 40+{
 41+ return (double)data.tv_sec + ((double)data.tv_nsec) / BILLION;
 42+}
Property changes on: trunk/udplog/srclib/PosixClock.cpp
___________________________________________________________________
Added: svn:eol-style
143 + native
Index: trunk/udplog/srclib/FileDescriptor.h
@@ -21,6 +21,9 @@
2222 : fd(-1), good(false), ownFd(true)
2323 {}
2424 public:
 25+
 26+ typedef std::set<int> ErrorSet;
 27+ typedef boost::shared_ptr<ErrorSet> ErrorSetPointer;
2528
2629 virtual ~FileDescriptor() {
2730 Close();
@@ -111,13 +114,13 @@
112115 }
113116
114117 // Ignore a given set of errors
115 - void Ignore(boost::shared_ptr<std::set<int> > s) {
 118+ void Ignore(ErrorSetPointer s) {
116119 ignoreErrors.push_back(s);
117120 }
118121
119122 // Ignore all errors
120123 void IgnoreAll() {
121 - ignoreErrors.push_back(boost::shared_ptr<std::set<int> >((std::set<int>*)NULL));
 124+ ignoreErrors.push_back(ErrorSetPointer((std::set<int>*)NULL));
122125 }
123126
124127 // Restore the previous ignore set
@@ -131,6 +134,6 @@
132135 int fd;
133136 bool good;
134137 bool ownFd;
135 - std::vector<boost::shared_ptr<std::set<int> > > ignoreErrors;
 138+ std::vector<ErrorSetPointer> ignoreErrors;
136139 };
137140 #endif
Index: trunk/udplog/srclib/PosixClock.h
@@ -0,0 +1,112 @@
 2+#ifndef UDPLOG_POSIXCLOCK_H
 3+#define UDPLOG_POSIXCLOCK_H
 4+
 5+#include <time.h>
 6+#include <boost/operators.hpp>
 7+#include "Exception.h"
 8+class PosixClock
 9+{
 10+public:
 11+ class Time : boost::additive<Time,
 12+ boost::totally_ordered<Time
 13+ > >
 14+ {
 15+ public:
 16+ Time() {
 17+ data.tv_sec = data.tv_nsec = 0;
 18+ }
 19+
 20+ Time(time_t sec, long nsec = 0) {
 21+ data.tv_sec = sec;
 22+ data.tv_nsec = nsec;
 23+ Normalise();
 24+ }
 25+
 26+ // Saturates at MIN or MAX
 27+ Time(double seconds);
 28+
 29+ Time & operator+=(const Time & other) {
 30+ data.tv_sec += other.data.tv_sec;
 31+ data.tv_nsec += other.data.tv_nsec;
 32+ Normalise();
 33+ return *this;
 34+ }
 35+
 36+ Time & operator-=(const Time & other) {
 37+ data.tv_sec -= other.data.tv_sec;
 38+ data.tv_nsec -= other.data.tv_nsec;
 39+ Normalise();
 40+ return *this;
 41+ }
 42+
 43+ operator double();
 44+
 45+ enum {BILLION = 1000000000};
 46+
 47+ struct timespec data;
 48+ static const Time MAX;
 49+ static const Time MIN;
 50+ static const Time ZERO;
 51+
 52+ protected:
 53+
 54+ void Normalise() {
 55+ if (data.tv_nsec >= BILLION || data.tv_nsec < 0) {
 56+ FixNormalisation();
 57+ }
 58+ }
 59+
 60+ void FixNormalisation();
 61+ };
 62+
 63+ PosixClock(clockid_t id_)
 64+ : id(id_), resSet(false)
 65+ {}
 66+
 67+ Time Get() {
 68+ Time t;
 69+ int result = clock_gettime(id, &t.data);
 70+ if (result == -1) {
 71+ throw libc_error("PosixClock::Get");
 72+ }
 73+ return t;
 74+ }
 75+
 76+ void Set(const Time & t) {
 77+ int result = clock_settime(id, &t.data);
 78+ if (result == -1) {
 79+ throw libc_error("PosixClock::Set");
 80+ }
 81+ }
 82+
 83+ Time GetRes() {
 84+ if (!resSet) {
 85+ resSet = true;
 86+ clock_getres(id, &res.data);
 87+ }
 88+ return res;
 89+ }
 90+
 91+protected:
 92+ clockid_t id;
 93+ Time res;
 94+ bool resSet;
 95+};
 96+
 97+inline bool operator==(const PosixClock::Time & a, const PosixClock::Time & b)
 98+{
 99+ return a.data.tv_sec == b.data.tv_sec && a.data.tv_nsec == b.data.tv_nsec;
 100+}
 101+
 102+inline bool operator<(const PosixClock::Time & a, const PosixClock::Time & b)
 103+{
 104+ if (a.data.tv_sec < b.data.tv_sec) {
 105+ return true;
 106+ }
 107+ if (a.data.tv_nsec < b.data.tv_nsec) {
 108+ return true;
 109+ }
 110+ return false;
 111+}
 112+
 113+#endif
Property changes on: trunk/udplog/srclib/PosixClock.h
___________________________________________________________________
Added: svn:eol-style
1114 + native
Index: trunk/udplog/srclib/Socket.h
@@ -102,10 +102,32 @@
103103 ssize_t length = recvfrom(fd, buf, len, flags, SocketAddress::GetBuffer(), &addrLength);
104104 if (length == (ssize_t)-1) {
105105 RaiseError("Socket::RecvFrom");
 106+ } else {
 107+ to = SocketAddress::NewFromBuffer();
106108 }
107 - to = SocketAddress::NewFromBuffer();
108109 return length;
109110 }
 111+
 112+ int SetSockOpt(int level, int optName, const void * optValue, socklen_t optLength) {
 113+ int result = setsockopt(fd, level, optName, optValue, optLength);
 114+ if (result == -1) {
 115+ RaiseError("Socket::SetSockOpt");
 116+ return errno;
 117+ }
 118+ return result;
 119+ }
 120+
 121+ int SetSockOpt(int level, int optName, bool optValue) {
 122+ return SetSockOpt(level, optName, (int)optValue);
 123+ }
 124+
 125+ int SetSockOpt(int level, int optName, int optValue) {
 126+ return SetSockOpt(level, optName, &optValue, sizeof(int));
 127+ }
 128+
 129+ int SetSockOpt(int level, int optName, const std::string & optValue) {
 130+ return SetSockOpt(level, optName, optValue.data(), optValue.size());
 131+ }
110132 protected:
111133 boost::shared_ptr<SocketAddress> peer;
112134 };

Status & tagging log