r100961 MediaWiki - Code Review archive

Repository:MediaWiki
Revision:r100960‎ | r100961 | r100962 >
Date:11:42, 27 October 2011
Author:tstarling
Status:deferred
Tags:
Comment:
Intermediate commit for testing only, do not deploy.
* Implemented non-blocking writes to pipes
* Implemented tee/splice zero-copy multiplexing for unsampled pipes
* Use pipe/fork/exec instead of popen to obtain reliable access to the underlying FD for non-blocking writing
* Added a new base class for Socket called FileDescriptor. Moved generic FD functionality from Socket to FileDescriptor. Added some commonly-used wrapper functions, more can be added as the need arises.
* Added PosixFile, a FileDescriptor subclass for actual files, only used for /dev/null at the moment
* Added Pipe and PipePair, to manage pipes in the FileDescriptor hierarchy
* Added a "blocking" option to the configuration syntax, analogous to "flush". Flush no longer works with pipes; instead, sampled pipes are written to directly and unsampled pipes are buffered internally. Non-blocking I/O doesn't work with files yet.
* Added loss monitoring for non-blocking pipes. A report is written to stdout once per minute.
* Added a generic SendBuffer template class, which is used to buffer writes to unsampled pipes.
* In libc_error, provide the errno as well as the string explanation
* Removed a couple of unused function definitions from Socket
* Allowed the const plague to spread somewhat
* Refactored the makefile variables
Modified paths:
  • /trunk/udplog/Makefile (modified) (history)
  • /trunk/udplog/srclib/Exception.h (modified) (history)
  • /trunk/udplog/srclib/FileDescriptor.cpp (added) (history)
  • /trunk/udplog/srclib/FileDescriptor.h (added) (history)
  • /trunk/udplog/srclib/Pipe.h (added) (history)
  • /trunk/udplog/srclib/PosixFile.h (added) (history)
  • /trunk/udplog/srclib/Socket.cpp (modified) (history)
  • /trunk/udplog/srclib/Socket.h (modified) (history)
  • /trunk/udplog/udp2log/LogProcessor.cpp (modified) (history)
  • /trunk/udplog/udp2log/LogProcessor.h (modified) (history)
  • /trunk/udplog/udp2log/SendBuffer.h (added) (history)
  • /trunk/udplog/udp2log/Udp2LogConfig.cpp (modified) (history)
  • /trunk/udplog/udp2log/Udp2LogConfig.h (modified) (history)
  • /trunk/udplog/udp2log/udp2log.cpp (modified) (history)

Diff [purge]

Index: trunk/udplog/udp2log/LogProcessor.cpp
@@ -1,5 +1,9 @@
22 #include <stdio.h>
33 #include <iostream>
 4+#include <unistd.h>
 5+#include <linux/limits.h>
 6+#include <sys/types.h>
 7+#include <sys/wait.h>
48 #include "LogProcessor.h"
59 #include "Udp2LogConfig.h"
610
@@ -31,7 +35,7 @@
3236 return (LogProcessor*)fp;
3337 }
3438
35 -void FileProcessor::ProcessLine(char *buffer, size_t size)
 39+void FileProcessor::ProcessLine(const char *buffer, size_t size)
3640 {
3741 if (Sample()) {
3842 f.write(buffer, size);
@@ -45,8 +49,15 @@
4650 // PipeProcessor
4751 //---------------------------------------------------------------------------
4852
49 -LogProcessor * PipeProcessor::NewFromConfig(char * params, bool flush)
 53+PipeProcessor::PipeProcessor(char * command_, int factor_, bool flush_, bool blocking_)
 54+ : LogProcessor(factor_, flush_), child(0), blocking(blocking_)
5055 {
 56+ command = command_;
 57+ Open();
 58+}
 59+
 60+LogProcessor * PipeProcessor::NewFromConfig(char * params, bool flush, bool blocking)
 61+{
5162 char * strFactor = strtok(params, " \t");
5263 if (strFactor == NULL) {
5364 throw ConfigError(
@@ -60,7 +71,7 @@
6172 );
6273 }
6374 char * command = strtok(NULL, "");
64 - PipeProcessor * pp = new PipeProcessor(command, factor, flush);
 75+ PipeProcessor * pp = new PipeProcessor(command, factor, flush, blocking);
6576 if (!pp->IsOpen()) {
6677 delete pp;
6778 throw ConfigError("Unable to open pipe");
@@ -69,37 +80,54 @@
7081 return (LogProcessor*)pp;
7182 }
7283
73 -void PipeProcessor::ProcessLine(char *buffer, size_t size)
 84+void PipeProcessor::HandleError(libc_error & e)
7485 {
75 - if (!f) {
 86+ bool restart;
 87+ if (e.code == EAGAIN) {
 88+ numLost++;
 89+ restart = false;
 90+ } else if (e.code == EPIPE) {
 91+ std::cerr << "Pipe terminated, suspending output: " << command << std::endl;
 92+ restart = true;
 93+ } else {
 94+ std::cerr << "Error writing to pipe: " << e.what() << std::endl;
 95+ restart = true;
 96+ }
 97+ if (restart) {
 98+ Close();
 99+ // Reopen it after a few seconds
 100+ alarm(RESTART_INTERVAL);
 101+ }
 102+}
 103+
 104+void PipeProcessor::ProcessLine(const char *buffer, size_t size)
 105+{
 106+ if (!child) {
76107 return;
77108 }
78109
 110+
79111 if (Sample()) {
80 - fwrite(buffer, 1, size, f);
81 - if (ferror(f)) {
82 - if (errno == EPIPE) {
83 - std::cerr << "Pipe terminated, suspending output: " << command << std::endl;
 112+ try {
 113+ if (blocking && size > PIPE_BUF) {
 114+ // Write large packets in blocking mode to preserve data integrity
 115+ GetPipe().SetStatusFlags(0);
 116+ GetPipe().Write(buffer, size);
 117+ GetPipe().SetStatusFlags(O_NONBLOCK);
84118 } else {
85 - std::cerr << "Error writing to pipe: " << strerror(errno) << std::endl;
 119+ GetPipe().Write(buffer, size);
86120 }
87 - pclose(f);
88 - f = NULL;
89 - // Reopen it after a few seconds
90 - alarm(RESTART_INTERVAL);
91 - } else {
92 - if (flush) {
93 - fflush(f);
94 - }
 121+ } catch (libc_error & e) {
 122+ HandleError(e);
95123 }
96124 }
97125 }
98126
99127 void PipeProcessor::FixIfBroken()
100128 {
101 - if (!f) {
102 - f = popen(command, "w");
103 - if (!f) {
 129+ if (!child) {
 130+ Open();
 131+ if (!child) {
104132 std::cerr << "Unable to restart pipe: " << command << std::endl;
105133 // Try again later
106134 alarm(RESTART_INTERVAL);
@@ -108,3 +136,71 @@
109137 }
110138 }
111139 }
 140+
 141+void PipeProcessor::Close()
 142+{
 143+ if (child) {
 144+ int status = 0;
 145+ waitpid(child, &status, 0);
 146+ GetPipe().Close();
 147+ child = 0;
 148+ }
 149+}
 150+
 151+void PipeProcessor::Open()
 152+{
 153+ if (child) {
 154+ throw std::runtime_error("PipeProcessor::Open called but the pipe is already open");
 155+ }
 156+ pipes = PipePairPointer(new PipePair);
 157+ child = fork();
 158+ if (!child) {
 159+ // This is the child process
 160+ pipes->writeEnd.Close();
 161+ pipes->readEnd.Dup2(STDIN_FILENO);
 162+ pipes->readEnd.Close();
 163+
 164+ // Run the command, similar to how popen() does it
 165+ std::string fullCommand = std::string("exec ") + command;
 166+ execl("/bin/sh", "sh", "-c", fullCommand.c_str(), NULL);
 167+ throw libc_error("PipeProcessor::Open");
 168+ }
 169+
 170+ if (child == -1) {
 171+ // Fork failed
 172+ child = 0;
 173+ throw libc_error("PipeProcessor::Open");
 174+ }
 175+
 176+ // This is the parent process
 177+ pipes->readEnd.Close();
 178+ pipes->writeEnd.SetDescriptorFlags(FD_CLOEXEC);
 179+ if (!blocking) {
 180+ pipes->writeEnd.SetStatusFlags(O_NONBLOCK);
 181+ }
 182+}
 183+
 184+void PipeProcessor::CopyFromPipe(Pipe & source, size_t dataLength)
 185+{
 186+ if (!child) {
 187+ return;
 188+ }
 189+
 190+ int flags = 0;
 191+ if (!blocking) {
 192+ flags |= SPLICE_F_NONBLOCK;
 193+ }
 194+ try {
 195+ if (blocking && dataLength > PIPE_BUF) {
 196+ // Write large packets in blocking mode to preserve data integrity
 197+ GetPipe().SetStatusFlags(0);
 198+ source.Tee(GetPipe(), dataLength, 0);
 199+ GetPipe().SetStatusFlags(O_NONBLOCK);
 200+ } else {
 201+ source.Tee(GetPipe(), dataLength, flags);
 202+ }
 203+ } catch (libc_error & e) {
 204+ HandleError(e);
 205+ }
 206+}
 207+
Index: trunk/udplog/udp2log/Udp2LogConfig.cpp
@@ -4,13 +4,14 @@
55
66 #include "Udp2LogConfig.h"
77
 8+PosixFile Udp2LogConfig::devNull("/dev/null", O_WRONLY);
 9+
810 Udp2LogConfig::Udp2LogConfig()
911 : reload(false)
1012 {}
1113
1214 void Udp2LogConfig::Open(const std::string & name)
13 -{
14 - using namespace std;
 15+{
1516 fileName = name;
1617 Load();
1718 }
@@ -43,18 +44,24 @@
4445
4546 params = strtok(NULL, "");
4647 LogProcessor * processor = NULL;
47 - bool flush = false;
 48+ bool flush = false, blocking = false;
4849
4950 if (!strcmp(type, "flush")) {
5051 flush = true;
5152 type = strtok(params, " \t");
5253 params = strtok(NULL, "");
5354 }
 55+ if (!strcmp(type, "blocking")) {
 56+ blocking = true;
 57+ type = strtok(params, " \t");
 58+ params = strtok(NULL, "");
 59+ }
5460
5561 if (!strcmp(type, "file")) {
 62+ // TODO: support blocking for FileProcessor
5663 processor = FileProcessor::NewFromConfig(params, flush);
5764 } else if (!strcmp(type, "pipe")) {
58 - processor = PipeProcessor::NewFromConfig(params, flush);
 65+ processor = PipeProcessor::NewFromConfig(params, flush, blocking);
5966 } else {
6067 throw ConfigError("Unrecognised log type");
6168 }
@@ -94,11 +101,75 @@
95102 }
96103 }
97104
98 -void Udp2LogConfig::ProcessLine(char *buffer, size_t size)
 105+void Udp2LogConfig::ProcessBuffer(const char *data, size_t dataLength)
99106 {
100 - boost::ptr_vector<LogProcessor>::iterator i;
101 - for (i = processors.begin(); i != processors.end(); i++) {
102 - i->ProcessLine(buffer, size);
 107+ boost::ptr_vector<LogProcessor>::iterator iter;
 108+ int numPipes = 0;
 109+
 110+ for (iter = processors.begin(); iter != processors.end(); iter++) {
 111+ if (iter->IsUnsampledPipe() && iter->IsOpen()) {
 112+ numPipes++;
 113+ }
103114 }
 115+
 116+ if (numPipes >= 2) {
 117+ // Efficiently send the packet to the unsampled pipes.
 118+ // First load the data into a pipe buffer.
 119+ // It just so happens that the size of the pipe buffer is the same
 120+ // as the maximum size of a UDP packet, so this won't block.
 121+ bufferPipe.writeEnd.Write(data, dataLength);
 122+
 123+ // Now send it out to all unsampled pipes using the zero-copy tee() syscall.
 124+ for (iter = processors.begin(); iter != processors.end(); iter++) {
 125+ if (iter->IsUnsampledPipe() && iter->IsOpen()) {
 126+ PipeProcessor & p = dynamic_cast<PipeProcessor&>(*iter);
 127+ p.CopyFromPipe(bufferPipe.readEnd, dataLength);
 128+ }
 129+ }
 130+
 131+ // Finally flush the pipe buffer by splicing it out to /dev/null
 132+ bufferPipe.readEnd.Splice(Udp2LogConfig::devNull, dataLength, 0);
 133+ }
 134+
 135+ // For the sampled processors, split the buffer into lines
 136+ const char *line1, *line2;
 137+ ssize_t bytesRemaining;
 138+
 139+ line1 = data;
 140+ bytesRemaining = dataLength;
 141+ while (bytesRemaining) {
 142+ size_t lineLength;
 143+
 144+ // Find the next line break
 145+ line2 = (char*)memchr(line1, '\n', bytesRemaining);
 146+ if (line2) {
 147+ // advance line2 to the start of the next line
 148+ line2++;
 149+ lineLength = line2 - line1;
 150+ } else {
 151+ // no more lines, process the remainder of the buffer
 152+ lineLength = bytesRemaining;
 153+ }
 154+
 155+ for (iter = processors.begin(); iter != processors.end(); iter++) {
 156+ if (numPipes >= 2 && iter->IsUnsampledPipe()) {
 157+ continue;
 158+ }
 159+
 160+ iter->ProcessLine(line1, lineLength);
 161+ }
 162+ bytesRemaining -= lineLength;
 163+ line1 = line2;
 164+ }
104165 }
105166
 167+void Udp2LogConfig::PrintLossReport(std::ostream & os)
 168+{
 169+ boost::ptr_vector<LogProcessor>::iterator iter;
 170+ for (iter = processors.begin(); iter != processors.end(); iter++) {
 171+ uint64_t lost = iter->GetNumLost();
 172+ if (lost) {
 173+ os << "Lost output chunks: " << lost << ": " << iter->GetName() << "\n";
 174+ }
 175+ }
 176+}
Index: trunk/udplog/udp2log/LogProcessor.h
@@ -3,18 +3,34 @@
44
55 #include <fstream>
66 #include <sys/time.h>
 7+#include <string>
 8+#include "../srclib/Exception.h"
79 #include "../srclib/Socket.h"
 10+#include "../srclib/Pipe.h"
811
912 class LogProcessor
1013 {
1114 public:
12 - virtual void ProcessLine(char *buffer, size_t size) = 0;
 15+ virtual void ProcessLine(const char *buffer, size_t size) = 0;
1316 virtual void FixIfBroken() {}
1417 virtual ~LogProcessor() {}
 18+ virtual bool IsOpen() = 0;
 19+ virtual std::string & GetName() = 0;
1520
 21+ virtual int IsUnsampledPipe() {
 22+ return false;
 23+ }
 24+
 25+ int GetFactor() const
 26+ {
 27+ return factor;
 28+ }
 29+
 30+ uint64_t GetNumLost() { return numLost; }
 31+
1632 protected:
1733 LogProcessor(int factor_, bool flush_)
18 - : counter(0), factor(factor_), flush(flush_)
 34+ : counter(0), factor(factor_), flush(flush_), numLost(0)
1935 {}
2036
2137
@@ -35,57 +51,80 @@
3652 int counter;
3753 int factor;
3854 bool flush;
 55+ uint64_t numLost;
3956 };
4057
4158 class FileProcessor : public LogProcessor
4259 {
4360 public:
4461 static LogProcessor * NewFromConfig(char * params, bool flush);
45 - virtual void ProcessLine(char *buffer, size_t size);
 62+ virtual void ProcessLine(const char *buffer, size_t size);
4663
47 - FileProcessor(char * filename, int factor_, bool flush_)
 64+ FileProcessor(char * fileName_, int factor_, bool flush_)
4865 : LogProcessor(factor_, flush_)
4966 {
50 - f.open(filename, std::ios::app | std::ios::out);
 67+ fileName = fileName_;
 68+ f.open(fileName_, std::ios::app | std::ios::out);
5169 }
5270
53 - bool IsOpen() {
 71+ virtual bool IsOpen() {
5472 return f.good();
5573 }
5674
 75+
 76+ virtual std::string & GetName() {
 77+ return fileName;
 78+ }
 79+
5780 std::ofstream f;
 81+ std::string fileName;
5882 };
5983
6084 class PipeProcessor : public LogProcessor
6185 {
6286 public:
63 - static LogProcessor * NewFromConfig(char * params, bool flush);
64 - virtual void ProcessLine(char *buffer, size_t size);
 87+ static LogProcessor * NewFromConfig(char * params, bool flush, bool blocking);
 88+ virtual void ProcessLine(const char *buffer, size_t size);
6589 virtual void FixIfBroken();
6690
67 - PipeProcessor(char * command_, int factor_, bool flush_)
68 - : LogProcessor(factor_, flush_)
69 - {
70 - command = strdup(command_);
71 - f = popen(command, "w");
72 - }
 91+ PipeProcessor(char * command_, int factor_, bool flush_, bool blocking_);
7392
7493 ~PipeProcessor()
7594 {
76 - free(command);
77 - if (f) {
78 - pclose(f);
79 - }
 95+ Close();
8096 }
8197
82 - bool IsOpen()
 98+ virtual bool IsOpen()
8399 {
84 - return (bool)f;
 100+ return (bool)child;
85101 }
86102
87 - FILE * f;
88 - char * command;
 103+ virtual int IsUnsampledPipe() {
 104+ return factor == 1;
 105+ }
 106+
 107+ Pipe & GetPipe() {
 108+ return pipes->writeEnd;
 109+ }
 110+
 111+ void CopyFromPipe(Pipe & source, size_t dataLength);
 112+ virtual std::string & GetName() { return command; }
 113+
 114+protected:
 115+ typedef boost::shared_ptr<PipePair> PipePairPointer;
 116+
 117+ void Open();
 118+ void Close();
 119+ void HandleError(libc_error & e);
 120+
 121+ std::string command;
 122+
 123+ PipePairPointer pipes;
 124+ pid_t child;
 125+ bool blocking;
 126+
89127 enum {RESTART_INTERVAL = 5};
 128+
90129 };
91130
92131
Index: trunk/udplog/udp2log/Udp2LogConfig.h
@@ -4,8 +4,17 @@
55 #include <string>
66 #include <boost/ptr_container/ptr_vector.hpp>
77 #include <stdexcept>
 8+#include <iostream>
89 #include "LogProcessor.h"
 10+#include "../srclib/PosixFile.h"
 11+#include "../srclib/Pipe.h"
912
 13+class ConfigWriteCallback;
 14+
 15+/**
 16+ * The configuration and current processing state for udp2log.
 17+ * TODO: rename this to Udp2LogState
 18+ */
1019 class Udp2LogConfig
1120 {
1221 public:
@@ -14,12 +23,20 @@
1524 void Load();
1625 void FixBrokenProcessors();
1726 void Reload();
18 - void ProcessLine(char *buffer, size_t size);
 27+ void ProcessBuffer(const char *data, size_t dataLength);
 28+ void PrintLossReport(std::ostream & os);
1929
 30+ inline ConfigWriteCallback GetWriteCallback();
 31+
 32+ bool reload;
 33+ bool fixBrokenProcessors;
 34+
 35+protected:
2036 std::string fileName;
2137 boost::ptr_vector<LogProcessor> processors;
22 - bool reload;
23 - bool fixBrokenProcessors;
 38+ PipePair bufferPipe;
 39+
 40+ static PosixFile devNull;
2441 };
2542
2643 class ConfigWatcher
@@ -39,4 +56,23 @@
4057 {}
4158 };
4259
 60+class ConfigWriteCallback
 61+{
 62+public:
 63+ ConfigWriteCallback(Udp2LogConfig & config_)
 64+ : config(config_)
 65+ {}
 66+
 67+ void operator()(const char* buffer, size_t bufSize) {
 68+ return config.ProcessBuffer(buffer, bufSize);
 69+ }
 70+protected:
 71+ Udp2LogConfig & config;
 72+};
 73+
 74+inline ConfigWriteCallback Udp2LogConfig::GetWriteCallback() {
 75+ return ConfigWriteCallback(*this);
 76+}
 77+
 78+
4379 #endif
Index: trunk/udplog/udp2log/SendBuffer.h
@@ -0,0 +1,43 @@
 2+#ifndef UDPLOG_SENDBUFFER_H
 3+#define UDPLOG_SENDBUFFER_H
 4+
/**
 * A fixed-capacity buffer which collects small writes and passes them on to
 * a callback in larger chunks.
 *
 * Callback must be copyable and callable as callback(const char*, size_t).
 * Data still held in the buffer when the object is destroyed is discarded;
 * call Flush() first if it must be delivered.
 */
template <class Callback>
class SendBuffer {
public:
    /**
     * @param capacity_      Size of the internal buffer in bytes
     * @param writeCallback_ Called with the buffered data on every flush
     */
    SendBuffer(size_t capacity_, Callback writeCallback_)
        : capacity(capacity_), size(0), writeCallback(writeCallback_)
    {
        data = new char[capacity];
    }

    // Release the internal buffer
    ~SendBuffer() {
        delete[] data;
    }

    // Pass any buffered data to the callback and empty the buffer
    void Flush() {
        if (!size) {
            return;
        }
        writeCallback(data, size);
        size = 0;
    }

    void Write(const char* buffer, size_t bufSize);
protected:
    char * data;
    size_t capacity, size;
    Callback writeCallback;

private:
    // Not copyable: a copy would share and then double-free the buffer.
    // Declared but intentionally not defined.
    SendBuffer(const SendBuffer &);
    SendBuffer & operator=(const SendBuffer &);
};

/**
 * Append data to the buffer, flushing as needed.
 *
 * Data larger than the whole buffer is passed straight to the callback,
 * after flushing whatever was buffered before it so that ordering is kept.
 *
 * @param buffer  Start of the data
 * @param bufSize Number of bytes to append
 */
template <class Callback>
void SendBuffer<Callback>::Write(const char* buffer, size_t bufSize)
{
    if (bufSize > capacity) {
        // Too large to ever fit: bypass the buffer
        Flush();
        writeCallback(buffer, bufSize);
    } else if (size + bufSize > capacity) {
        // Doesn't fit in the remaining space: start a new chunk
        Flush();
        memcpy(data, buffer, bufSize);
        size = bufSize;
    } else {
        memcpy(data + size, buffer, bufSize);
        size += bufSize;
    }
}
 44+#endif
Property changes on: trunk/udplog/udp2log/SendBuffer.h
___________________________________________________________________
Added: svn:eol-style
145 + native
Index: trunk/udplog/udp2log/udp2log.cpp
@@ -13,10 +13,12 @@
1414 #include <fcntl.h>
1515 #include <pwd.h>
1616 #include <cstdlib>
 17+#include <functional>
1718
1819 #include "../srclib/Exception.h"
1920 #include "../srclib/Socket.h"
2021 #include "Udp2LogConfig.h"
 22+#include "SendBuffer.h"
2123
2224 std::string configFileName("/etc/udp2log");
2325 std::string logFileName("/var/log/udp2log/udp2log.log");
@@ -26,7 +28,7 @@
2729
2830 Udp2LogConfig config;
2931
30 -void OnHangup(int)
 32+void OnHangup(int)
3133 {
3234 config.reload = true;
3335 }
@@ -53,7 +55,7 @@
5456
5557 // Change user
5658 if (setgid(userData->pw_gid) == -1
57 - || setuid(userData->pw_uid) == -1)
 59+ || setuid(userData->pw_uid) == -1)
5860 {
5961 throw libc_error("Error changing user ID");
6062 }
@@ -188,6 +190,7 @@
189191 cerr << "Unable to open socket\n";
190192 return 1;
191193 }
 194+ socket.SetDescriptorFlags(FD_CLOEXEC);
192195 socket.Bind(saddr);
193196
194197 // Join a multicast group if requested
@@ -199,14 +202,18 @@
200203 return 1;
201204 }
202205 }
 206+
203207 // Process received packets
204208 boost::shared_ptr<SocketAddress> address;
205209 const size_t bufSize = 65536;
206 - char buffer[bufSize];
207 - char *line1, *line2;
208 - ssize_t bytesRemaining, bytesRead;
 210+ char receiveBuffer[bufSize];
 211+ ssize_t bytesRead;
 212+ SendBuffer<ConfigWriteCallback> sendBuffer(PIPE_BUF, config.GetWriteCallback());
 213+
 214+ time_t lossReportTime = 0;
 215+
209216 for (;;) {
210 - bytesRead = socket.RecvFrom(buffer, bufSize, address);
 217+ bytesRead = socket.RecvFrom(receiveBuffer, bufSize, address);
211218 if (bytesRead <= 0) {
212219 continue;
213220 }
@@ -219,24 +226,12 @@
220227 continue;
221228 }
222229
223 - // Split into lines and hand off to the processors
224 - line1 = buffer;
225 - bytesRemaining = bytesRead;
226 - while (bytesRemaining) {
227 - // Find the next line break
228 - line2 = (char*)memchr(line1, '\n', bytesRemaining);
229 - if (line2) {
230 - // advance line2 to the start of the next line
231 - line2++;
232 - // Process the line
233 - config.ProcessLine(line1, line2 - line1);
234 - bytesRemaining -= line2 - line1;
235 - } else {
236 - // no more lines, process the remainder of the buffer
237 - config.ProcessLine(line1, bytesRemaining);
238 - bytesRemaining = 0;
239 - }
240 - line1 = line2;
 230+ sendBuffer.Write(receiveBuffer, bytesRead);
 231+
 232+ time_t currentTime = time(NULL);
 233+ if (currentTime - lossReportTime > 60) {
 234+ config.PrintLossReport(std::cout);
 235+ lossReportTime = currentTime;
241236 }
242237 }
243238 }
Index: trunk/udplog/Makefile
@@ -1,8 +1,9 @@
22 TARGETS = log2udp udprecv delta udp2log/udp2log packet-loss
3 -HOST_OBJS = srcmisc/host.o srclib/HostEntry.o srclib/IPAddress.o
4 -LOG2UDP_OBJS = srcmisc/log2udp.o srclib/HostEntry.o srclib/IPAddress.o srclib/Socket.o srclib/SocketAddress.o
5 -UDPRECV_OBJS = srcmisc/udprecv.o srclib/IPAddress.o srclib/Socket.o srclib/SocketAddress.o
6 -UDP2LOG_OBJS = udp2log/udp2log.o udp2log/LogProcessor.o udp2log/Udp2LogConfig.o srclib/IPAddress.o srclib/Socket.o srclib/SocketAddress.o
 3+SRCLIB_OBJS = srclib/HostEntry.o srclib/FileDescriptor.o srclib/IPAddress.o srclib/Socket.o srclib/SocketAddress.o
 4+HOST_OBJS = srcmisc/host.o $(SRCLIB_OBJS)
 5+LOG2UDP_OBJS = srcmisc/log2udp.o $(SRCLIB_OBJS)
 6+UDPRECV_OBJS = srcmisc/udprecv.o $(SRCLIB_OBJS)
 7+UDP2LOG_OBJS = udp2log/udp2log.o udp2log/LogProcessor.o udp2log/Udp2LogConfig.o $(SRCLIB_OBJS)
78 CFLAGS:=$(CFLAGS) -Wall
89
910 all: $(TARGETS)
Index: trunk/udplog/srclib/FileDescriptor.cpp
@@ -0,0 +1,22 @@
 2+#include "FileDescriptor.h"
 3+#include "Exception.h"
 4+
 5+void FileDescriptor::RaiseError(const char* msg)
 6+{
 7+ if (ignoreErrors.size() == 0) {
 8+ // Ignore none
 9+ throw libc_error(msg);
 10+ }
 11+
 12+ std::set<int> * curIgnore = ignoreErrors.back().get();
 13+ if (!curIgnore) {
 14+ // Ignore all
 15+ return;
 16+ }
 17+
 18+ if (!curIgnore->count(errno)) {
 19+ // Don't ignore this one
 20+ throw libc_error(msg);
 21+ }
 22+}
 23+
Property changes on: trunk/udplog/srclib/FileDescriptor.cpp
___________________________________________________________________
Added: svn:eol-style
124 + native
Index: trunk/udplog/srclib/Pipe.h
@@ -0,0 +1,63 @@
 2+#ifndef UDPLOG_PIPE_H
 3+#define UDPLOG_PIPE_H
 4+
 5+#ifndef _GNU_SOURCE
 6+#define _GNU_SOURCE
 7+#endif
 8+#include <fcntl.h>
 9+
 10+class Pipe : public FileDescriptor
 11+{
 12+public:
 13+ friend class PipePair;
 14+
 15+ ssize_t Tee(Pipe & output, size_t length, unsigned int flags)
 16+ {
 17+ ssize_t result = tee(fd, output.fd, length, flags);
 18+ if (result == -1) {
 19+ RaiseError("Pipe::Tee");
 20+ }
 21+ return result;
 22+ }
 23+
 24+ ssize_t Splice(FileDescriptor & output, size_t length, unsigned int flags)
 25+ {
 26+ ssize_t result = splice(fd, NULL, output.GetFD(), NULL, length, flags);
 27+ if (result == -1) {
 28+ RaiseError("Pipe::Splice");
 29+ }
 30+ return result;
 31+ }
 32+};
 33+
 34+class PipePair
 35+{
 36+public:
 37+
 38+ PipePair()
 39+ {
 40+ int pipefd[2];
 41+
 42+ if (pipe(pipefd) == -1) {
 43+ readEnd.RaiseError("PipePair constructor");
 44+ good = false;
 45+ } else {
 46+ good = true;
 47+ readEnd.fd = pipefd[0];
 48+ writeEnd.fd = pipefd[1];
 49+ readEnd.good = true;
 50+ writeEnd.good = true;
 51+ }
 52+ }
 53+
 54+ operator bool() {
 55+ return good;
 56+ }
 57+
 58+ Pipe readEnd, writeEnd;
 59+protected:
 60+ bool good;
 61+};
 62+
 63+
 64+#endif
Property changes on: trunk/udplog/srclib/Pipe.h
___________________________________________________________________
Added: svn:eol-style
165 + native
Index: trunk/udplog/srclib/Exception.h
@@ -9,7 +9,8 @@
1010 class libc_error : public std::runtime_error {
1111 public:
1212 libc_error(const std::string & what_arg)
13 - : std::runtime_error(std::string(what_arg).append(": ").append(std::strerror(errno)))
 13+ : std::runtime_error(std::string(what_arg).append(": ").append(std::strerror(errno))),
 14+ code(errno)
1415 {}
1516
1617 const char* what() {
@@ -19,5 +20,6 @@
2021 virtual ~libc_error() throw() {}
2122
2223 std::string msg;
 24+ int code;
2325 };
2426 #endif
Index: trunk/udplog/srclib/FileDescriptor.h
@@ -0,0 +1,136 @@
 2+#ifndef UDPLOG_FILEDESCRIPTOR_H
 3+#define UDPLOG_FILEDESCRIPTOR_H
 4+
 5+#include <set>
 6+#include <vector>
 7+#include <cerrno>
 8+#include <boost/shared_ptr.hpp>
 9+#include <unistd.h>
 10+#include <fcntl.h>
 11+
 12+class Socket;
 13+
 14+class FileDescriptor
 15+{
 16+private:
 17+ // Not copyable
 18+ FileDescriptor(const Socket & s) {}
 19+
 20+protected:
 21+ FileDescriptor()
 22+ : fd(-1), good(false), ownFd(true)
 23+ {}
 24+public:
 25+
 26+ virtual ~FileDescriptor() {
 27+ Close();
 28+ }
 29+
 30+ operator bool() {
 31+ return good;
 32+ }
 33+
 34+ int GetFD() const {
 35+ return fd;
 36+ }
 37+
 38+ // close() wrapper
 39+ int Close() {
 40+ int status = 0;
 41+ if (fd != -1 && ownFd) {
 42+ status = close(fd);
 43+ fd = -1;
 44+ good = false;
 45+ if (status == -1) {
 46+ RaiseError("FileDescriptor::Close");
 47+ }
 48+ }
 49+ return status;
 50+ }
 51+
 52+ // read() wrapper
 53+ int Read(void *buf, size_t len) {
 54+ ssize_t length = read(fd, buf, len);
 55+ if (length == (ssize_t)-1) {
 56+ RaiseError("FileDescriptor::Read");
 57+ }
 58+ return length;
 59+ }
 60+
 61+ // write() wrapper
 62+ int Write(const void *buf, size_t len) {
 63+ ssize_t length = write(fd, buf, len);
 64+ if (length == (ssize_t)-1) {
 65+ RaiseError("FileDescriptor::Write");
 66+ }
 67+ return length;
 68+ }
 69+
 70+ // fcntl(F_SETFD) wrapper
 71+ int SetDescriptorFlags(long flags) {
 72+ int status = fcntl(fd, F_SETFD, flags);
 73+ if (status == -1) {
 74+ RaiseError("FileDescriptor::SetDescriptorFlags");
 75+ }
 76+ return status;
 77+ }
 78+
 79+ // fcntl(F_GETFD) wrapper
 80+ long GetDescriptorFlags() {
 81+ int flags = fcntl(fd, F_GETFD);
 82+ if (flags == -1) {
 83+ RaiseError("FileDescriptor::GetDescriptorFlags");
 84+ }
 85+ return flags;
 86+ }
 87+
 88+ // fcntl(F_SETFL) wrapper
 89+ int SetStatusFlags(long flags) {
 90+ int status = fcntl(fd, F_SETFL, flags);
 91+ if (status == -1) {
 92+ RaiseError("FileDescriptor::SetStatusFlags");
 93+ }
 94+ return status;
 95+ }
 96+
 97+ // fcntl(F_GETFL) wrapper
 98+ long GetStatusFlags() {
 99+ int flags = fcntl(fd, F_GETFL);
 100+ if (flags == -1) {
 101+ RaiseError("FileDescriptor::GetStatusFlags");
 102+ }
 103+ return flags;
 104+ }
 105+
 106+ int Dup2(int newfd) {
 107+ int status = dup2(fd, newfd);
 108+ if (status == -1) {
 109+ RaiseError("FileDescriptor::Dup2");
 110+ }
 111+ return status;
 112+ }
 113+
 114+ // Ignore a given set of errors
 115+ void Ignore(boost::shared_ptr<std::set<int> > s) {
 116+ ignoreErrors.push_back(s);
 117+ }
 118+
 119+ // Ignore all errors
 120+ void IgnoreAll() {
 121+ ignoreErrors.push_back(boost::shared_ptr<std::set<int> >((std::set<int>*)NULL));
 122+ }
 123+
 124+ // Restore the previous ignore set
 125+ void PopIgnore() {
 126+ ignoreErrors.pop_back();
 127+ }
 128+
 129+ void RaiseError(const char* msg);
 130+
 131+protected:
 132+ int fd;
 133+ bool good;
 134+ bool ownFd;
 135+ std::vector<boost::shared_ptr<std::set<int> > > ignoreErrors;
 136+};
 137+#endif
Property changes on: trunk/udplog/srclib/FileDescriptor.h
___________________________________________________________________
Added: svn:eol-style
1138 + native
Index: trunk/udplog/srclib/PosixFile.h
@@ -0,0 +1,17 @@
 2+#ifndef UDPLOG_POSIXFILE_H
 3+#define UDPLOG_POSIXFILE_H
 4+
 5+class PosixFile : public FileDescriptor
 6+{
 7+public:
 8+ PosixFile(const char* pathname, int flags, int mode = 0) {
 9+ fd = open(pathname, flags, mode);
 10+ if (fd == -1) {
 11+ RaiseError("PosixFile constructor");
 12+ } else {
 13+ good = true;
 14+ }
 15+ }
 16+};
 17+
 18+#endif
Property changes on: trunk/udplog/srclib/PosixFile.h
___________________________________________________________________
Added: svn:eol-style
119 + native
Index: trunk/udplog/srclib/Socket.cpp
@@ -1,24 +1,5 @@
22 #include "Socket.h"
33
4 -void Socket::RaiseError(const char* msg)
5 -{
6 - if (ignoreErrors.size() == 0) {
7 - // Ignore none
8 - throw libc_error(msg);
9 - }
10 -
11 - std::set<int> * curIgnore = ignoreErrors.back().get();
12 - if (!curIgnore) {
13 - // Ignore all
14 - return;
15 - }
16 -
17 - if (!curIgnore->count(errno)) {
18 - // Don't ignore this one
19 - throw libc_error(msg);
20 - }
21 -}
22 -
234 int UDPSocket::JoinMulticast(const IPAddress & addr) {
245 int level, optname;
256 void *optval;
Index: trunk/udplog/srclib/Socket.h
@@ -1,34 +1,22 @@
22 #ifndef SOCKET_H______
33 #define SOCKET_H______
44
5 -#include <set>
6 -#include <vector>
7 -#include <boost/shared_ptr.hpp>
85 #include <sys/socket.h>
9 -#include <cerrno>
 6+#include "FileDescriptor.h"
107 #include "SocketAddress.h"
118 #include "Exception.h"
129
13 -
1410 // Socket base class
1511 // Do not instantiate this directly
16 -class Socket
 12+class Socket : public FileDescriptor
1713 {
18 -private:
19 - // Not default constructible
20 - Socket(){}
21 -
22 - // Not copyable
23 - Socket(const Socket & s) {}
24 -
2514 protected:
2615 Socket(int domain, int type, int protocol)
27 - : fd(-1)
 16+ : FileDescriptor()
2817 {
2918 fd = socket(domain, type, protocol);
3019 if (fd == -1) {
3120 RaiseError("Socket constructor");
32 - good = false;
3321 } else {
3422 good = true;
3523 }
@@ -36,10 +24,6 @@
3725
3826 public:
3927
40 - operator bool() {
41 - return good;
42 - }
43 -
4428 int Connect(SocketAddress & s) {
4529 if (connect(fd, s.GetBinaryData(), s.GetBinaryLength()) < 0) {
4630 RaiseError("Socket::Connect");
@@ -122,28 +106,8 @@
123107 to = SocketAddress::NewFromBuffer();
124108 return length;
125109 }
126 -
127 - // Ignore a given set of errors
128 - void Ignore(boost::shared_ptr<std::set<int> > s) {
129 - ignoreErrors.push_back(s);
130 - }
131 -
132 - // Ignore all errors
133 - void IgnoreAll() {
134 - ignoreErrors.push_back(boost::shared_ptr<std::set<int> >((std::set<int>*)NULL));
135 - }
136 -
137 - // Restore the previous ignore set
138 - void PopIgnore() {
139 - ignoreErrors.pop_back();
140 - }
141 -
142 - void RaiseError(const char* msg);
143110 protected:
144 - int fd;
145111 boost::shared_ptr<SocketAddress> peer;
146 - bool good;
147 - std::vector<boost::shared_ptr<std::set<int> > > ignoreErrors;
148112 };
149113
150114 class UDPSocket : public Socket
@@ -162,9 +126,6 @@
163127 }
164128
165129 int JoinMulticast(const IPAddress & addr);
166 -private:
167 - int JoinMulticast4(const IPAddress & addr);
168 - int JoinMulticast6(const IPAddress & addr);
169130 };
170131
171132 #endif

Status & tagging log