Discussion:
WaitForSingleObject
Larry
2010-01-27 11:23:44 UTC
Hi,

I am this close to finishing my tiny streaming server. Having said
that, I have a problem with the following code. It basically fires a runtime
error when I disconnect from the server (by closing the telnet window).

I found out that the error may be triggered by this line:

WaitForSingleObject(eventi[threadid], INFINITE);

If I replace it with Sleep(1000), everything goes OK.


/*
 *
 * Streaming Server v1.0 by THEARTOFWEB Software
 *
 */

#include <iostream>
#include <string>
#include <map>
#include <algorithm>
#include <process.h>
#include <cstdlib>
#include <ctime>
#include "socket.h"
#include <boost/circular_buffer.hpp>
using namespace std;
using namespace boost;

const string CRLF = "\r\n";
const int numbuff = 3;

unsigned int __stdcall Consumer(void* sock);
unsigned int __stdcall Producer(void*);

void getDateTime(char * szTime);

enum buffer_status
{
    BUFF_DONE = 1,
    BUFF_EMPTY = 0
};

struct buffer
{
    unsigned char data[1024];
    int bytesRecorded;
    int flag;
    buffer(const unsigned char * data_, const int bytesRecorded_, const int flag_) :
        bytesRecorded(bytesRecorded_), flag(flag_)
    {
        copy(data_, data_ + bytesRecorded_, data);
    }
};

struct circular
{
    circular_buffer<buffer> cb;
};

map<int, circular> users;
map<int, circular>::iterator uit;
map<int, HANDLE> eventi;

int main()
{
    // Launch Producer
    unsigned int prodRet;
    _beginthreadex(0,0,Producer,NULL,0,&prodRet);
    if(prodRet)
        cout << "Launched Producer Thread!" << endl;

    // Set up server (port: 8000, maxconn: 10)
    SocketServer sockIn(8000, 10);

    while(1)
    {
        // ...wait for incoming connections...
        Socket* s = sockIn.Accept();
        unsigned int sockRet;
        _beginthreadex(0,0,Consumer,s,0,&sockRet);
        if(sockRet)
            cout << "Spawned a new thread!" << endl;
    }

    sockIn.Close();

    return EXIT_SUCCESS;
}

// Consumer
unsigned int __stdcall Consumer(void* sock)
{
    Socket* s = (Socket*) sock;

    s->SendBytes("Hello World!" + CRLF);

    int threadid = (int)GetCurrentThreadId();

    // Create Event & push it in the event map
    HANDLE hevent = CreateEvent(NULL,FALSE,FALSE,NULL);
    eventi.insert(make_pair(threadid,hevent));

    // Prepare & add circular buffer to the map
    circular c;
    c.cb.set_capacity(numbuff);

    for(int i = 0; i<numbuff; i++)
    {
        c.cb.push_back(buffer(NULL,0,BUFF_EMPTY));
    }

    users.insert(make_pair(threadid, c));

    //
    // TODO:
    // Read data from the buffer
    // and send it to the client
    //
    // When using push_back the oldest
    // element in the circular buffer
    // will be in the index 0
    //

    Sleep(500);

    while(1)
    {
        // CALLBACK EVENT
        WaitForSingleObject(eventi[threadid], INFINITE);
        if(users[threadid].cb.at(0).flag == BUFF_DONE)
        {
            string line = (char*)users[threadid].cb.at(0).data;
            int ret = s->SendBytes(line + CRLF);
            if(SOCKET_ERROR == ret)
                break;
        }
    }

    // Close & remove event from event map
    CloseHandle(eventi[threadid]);
    eventi.erase(threadid);

    // Remove buffer from the map
    users.erase(threadid);

    // Say bye to the client
    s->SendBytes("Bye bye!" + CRLF);

    // Disconnect client
    cout << "Closing thread..." << endl;
    s->Close();
    delete s;
    return 0;
}

// Producer
unsigned int __stdcall Producer(void*)
{
    while(1)
    {
        Sleep(1000);
        char szTime[30]; getDateTime(szTime);
        for(uit=users.begin(); uit!=users.end(); ++uit)
        {
            users[uit->first].cb.push_back(buffer((unsigned char*)szTime, 30, BUFF_DONE));
            SetEvent(eventi[uit->first]);
            cout << "Producer is writing to: " << uit->first << endl;
        }
    }
    return 0;
}

void getDateTime(char * szTime)
{
    time_t rawtime = time(NULL);
    struct tm timeinfo;
    gmtime_s(&timeinfo, &rawtime);
    strftime(szTime, 30, "%a, %d %b %Y %X GMT", &timeinfo);
}

// thanks
Ulrich Eckhardt
2010-01-27 13:19:48 UTC
Post by Larry
It basically fires a runtime error when I disconnect from the server!
(closing the telnet window)
Which error exactly?
Post by Larry
WaitForSingleObject(eventi[threadid], INFINITE);
If I replace it with Sleep(1000), everything goes OK.
unsigned int __stdcall Consumer(void* sock)
{
Socket* s = (Socket*) sock;
s->SendBytes("Hello World!" + CRLF);
int threadid = (int)GetCurrentThreadId();
// Create Event & push it in the event map
HANDLE hevent = CreateEvent(NULL,FALSE,FALSE,NULL);
eventi.insert(make_pair(threadid,hevent));
[...]
while(1)
{
// CALLBACK EVENT
WaitForSingleObject(eventi[threadid], INFINITE);
if(users[threadid].cb.at(0).flag == BUFF_DONE)
{
string line = (char*)users[threadid].cb.at(0).data;
int ret = s->SendBytes(line + CRLF);
if(SOCKET_ERROR == ret)
break;
}
}
// Close & remove event from event map
CloseHandle(eventi[threadid]);
eventi.erase(threadid);
[...]
}
If you close the connection, you will break out of the while loop and then
destroy the event and erase it from the map...
unsigned int __stdcall Producer(void*)
{
while(1)
{
Sleep(1000);
char szTime[30]; getDateTime(szTime);
for(uit=users.begin(); uit!=users.end(); ++uit)
{
users[uit->first].cb.push_back(
buffer((unsigned char*)szTime, 30, BUFF_DONE));
SetEvent(eventi[uit->first]);
cout << "Producer is writing to: " << uit->first << endl;
}
}
return 0;
}
... while the producer is still reading it.

Generally, you didn't protect access to shared data in any way: it can
happen that one thread is reading the users map while the other is
writing it. This is a no-go for multithreading. Take a look at boost::mutex
or Win32's CRITICAL_SECTION.
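
As a rough illustration of that advice, the sketch below keeps a map and its lock together so that no access can bypass the lock. It uses standard C++11 std::mutex as a portable stand-in for boost::mutex or a Win32 CRITICAL_SECTION. The GuardedUsers class and its member names are hypothetical and do not come from the posted server.

```cpp
#include <cassert>
#include <cstddef>
#include <map>
#include <mutex>
#include <string>

// Hypothetical stand-in for the global `users` map: the map and the lock that
// guards it live side by side, and callers can only reach the map through
// member functions that take the lock first.
class GuardedUsers {
public:
    void add(int id, const std::string& data) {
        std::lock_guard<std::mutex> lock(mutex_);
        users_[id] = data;
    }

    // Returns true if an entry with this id existed and was removed.
    bool remove(int id) {
        std::lock_guard<std::mutex> lock(mutex_);
        return users_.erase(id) == 1;
    }

    // Writes `data` into every registered entry and returns how many entries
    // were updated. The whole traversal happens under the lock, so no entry
    // can be erased while the loop is running.
    std::size_t broadcast(const std::string& data) {
        std::lock_guard<std::mutex> lock(mutex_);
        for (auto& entry : users_) entry.second = data;
        return users_.size();
    }

    // Copies the entry for `id` into `out`; returns false if it is missing.
    bool read(int id, std::string& out) const {
        std::lock_guard<std::mutex> lock(mutex_);
        auto it = users_.find(id);
        if (it == users_.end()) return false;
        out = it->second;
        return true;
    }

private:
    mutable std::mutex mutex_;
    std::map<int, std::string> users_;
};
```

In this arrangement a consumer would call add() on connect and remove() on disconnect, while the producer would only ever call broadcast().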

Uli
--
C++ FAQ: http://parashift.com/c++-faq-lite

Sator Laser GmbH
Geschäftsführer: Thorsten Föcking, Amtsgericht Hamburg HR B62 932
Larry
2010-01-28 00:03:34 UTC
Post by Ulrich Eckhardt
Which error exactly?
Expression: map/set iterator not incrementable

By the way, is it true that on Windows I should shy away from
using callback events and use boost::thread instead?

thanks
Ulrich Eckhardt
2010-01-28 08:31:30 UTC
Post by Larry
Post by Ulrich Eckhardt
Which error exactly?
Expression: map/set iterator not incrementable
This can have three causes:
0. You are simply walking off the end of the container with an iterator.
1. You are accessing the container from different threads. This can lead to
all kinds of corruption, and getting a clear error description like the
above is luck, but not impossible.
2. You have a shared container and iterator into the container. When
removing an element, the iterator becomes invalid if it points to that
element. Forgetting to reset it and then using it is what can cause this
problem.
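
Cause 2 can be reproduced without any threads at all. The sketch below shows the usual single-threaded pattern for removing elements while walking a std::map, relying on the C++11 overload of erase that returns the next valid iterator. The function name erase_negative is made up for this example. In the posted server the iterator `uit` is a global that the producer advances while consumers erase entries, so this pattern alone would not be enough there without a lock.

```cpp
#include <cassert>
#include <map>

// Removes every entry whose value is negative and returns the number removed.
// std::map::erase(iterator) returns the iterator following the erased element
// (since C++11), so the loop never increments an invalidated iterator.
inline int erase_negative(std::map<int, int>& m) {
    int removed = 0;
    for (auto it = m.begin(); it != m.end(); ) {
        if (it->second < 0) {
            it = m.erase(it);   // continue from the element after the erased one
            ++removed;
        } else {
            ++it;               // only advance when nothing was erased
        }
    }
    return removed;
}
```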
Post by Larry
By the way, is it true that on Windows I should shy away from
using callback events and use boost::thread instead?
Regardless of the programming environment, you should make an informed
decision. This statement above could only be the result of weighing
different aspects of each approach against each other, but without those
the statement can neither be called true nor false.

That said, you can do a few things with directly using the win32 API that
you can't do with Boost.Thread, precisely because those things are not
portable. Most notably WaitForMultipleObjects() is not portable but very
useful.

Uli
Larry
2010-01-28 01:29:53 UTC
Post by Ulrich Eckhardt
Generally, you didn't protect access to shared data in any way, it can
happen that one thread is reading the map with users while the other is
writing it. This is a no-go for multithreading. Take a look at boost::mutex
or win32's CRITICAL_SECTION.
So, do you think I should be using a Win32 mutex? But where should I put it
in the code?

thanks
Scott McPhillips [MVP]
2010-01-28 03:57:32 UTC
Post by Larry
Post by Ulrich Eckhardt
Generally, you didn't protect access to shared data in any way, it can
happen that one thread is reading the map with users while the other is
writing it. This is a no-go for multithreading. Take a look at boost::mutex
or win32's CRITICAL_SECTION.
So, do you think I should be using a Win32 mutex? But where should I put it
in the code?
thanks
No, use CRITICAL_SECTION. It has less overhead.

After initializing it (InitializeCriticalSection(&cs)), surround all
accesses to data shared between threads like this:

EnterCriticalSection(&cs);
// ...access or change shared data
LeaveCriticalSection(&cs);

The first statement suspends the calling thread if another thread is
"inside" a similar code block. When the other thread makes the Leave...
call, the first thread is allowed to proceed.
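
One practical hazard with a hand-written Enter/Leave pair is an early exit (a break or return) between the two calls, which leaves the lock held forever. A common remedy is a scoped guard whose destructor does the "Leave". The sketch below shows the idea with the standard C++11 std::lock_guard rather than CRITICAL_SECTION. The Shared struct and append_if_valid are hypothetical names for this example. On Win32 a small class calling EnterCriticalSection in its constructor and LeaveCriticalSection in its destructor would play the same role.

```cpp
#include <cassert>
#include <mutex>
#include <vector>

// Hypothetical shared state: a buffer plus the lock that guards it.
struct Shared {
    std::mutex mutex;
    std::vector<int> values;
};

// Appends `v` unless it is negative. The early return still releases the
// lock, because std::lock_guard unlocks in its destructor on every exit path.
inline bool append_if_valid(Shared& s, int v) {
    std::lock_guard<std::mutex> lock(s.mutex);   // "Enter"
    if (v < 0) return false;                      // lock released here as well
    s.values.push_back(v);
    return true;                                  // "Leave" happens automatically
}
```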
--
Scott McPhillips [VC++ MVP]
Larry
2010-01-28 14:28:15 UTC
Post by Scott McPhillips [MVP]
No, use CRITICAL_SECTION. It has less overhead.
After initializing it (InitializeCriticalSection(&cs)), surround all
EnterCriticalSection(&cs);
...access or change shared data
LeaveCriticalSection(&cs);
The first statement suspends the calling thread if another thread is
"inside" a similar code block. When the other thread makes the Leave...
call, the first thread is allowed to proceed.
Ok! I am going to need a little help from you though...here's a typical
scenario of my app:

// Buffer structs "circular"

struct buffer
{
unsigned char data[1024]; // binary data container
int bytesRecorded; // bytes actually written (to read)
int flag; // Empty/filled (enum)
buffer(...); // constructor()
};

struct circular
{
circular_buffer<buffer> cb; // Circular buffer based on "buffer"
};

// Global maps

map<int, circular> users;
map<int, circular>::iterator uit;
map<int, HANDLE> eventi;

int main()
{
// TODO: launch Producer thread.
(...)

// TODO: Set up server and accept incoming
// requests. Each time a client connects
// spawn a new Consumer thread
(...)

return 0; // never for the moment!
}

// Thread Consumer is launched
// anytime a client connects.

thread Consumer
{
int threadid = (int)GetCurrentThreadId();

// Create event & push it in the "eventi" map
//
HANDLE hevent = CreateEvent(NULL,0,0,NULL);
eventi.insert(make_pair(threadid, hevent));

// Prepare & add circular buffer to the "users" map
//
circular c;
users.insert(make_pair(threadid, c));

// Loop...
//
while(1)
{
// Callback event
WaitForSingleObject(eventi[threadid], INFINITE);

// TODO: read buffer from "users" map
// and send it to the client. If the
// client disconnects, break the while loop
}

// Close & remove event from event map
//
CloseHandle(eventi[threadid]);
eventi.erase(threadid);

// Remove buffer from the map
//
users.erase(threadid);

return 0;
}

// Thread Producer is launched
// once. Its main goal is to write
// the same data to all the Consumer
// threads.

thread Producer
{
while(1)
{
// TODO: write on every
// element of map "users"
//
for(uit=users.begin(); uit!=users.end(); ++uit)
{
users[uit->first].cb.push_back(buffer(...));

// Signal event in the Consumer
// thread (buffer written).
//
if(eventi[uit->first])
SetEvent(eventi[uit->first]);
}
}
return 0;
}

thanks
Larry
2010-01-28 16:43:26 UTC
Post by Larry
Ok! I am going to need a little help from you though...here's a typical
Wow... I am getting a lot of unhandled exceptions (writing location).


/*
*
* Streaming Server v1.1 by THEARTOFWEB Software
*
*/

#include <iostream>
#include <string>
#include <map>
#include <algorithm>
#include <process.h>
#include <cstdlib>
#include <ctime>
#include "socket.h"
#include <boost/circular_buffer.hpp>
using namespace std;
using namespace boost;

const string CRLF = "\r\n";
const int numbuff = 3;

unsigned int __stdcall Consumer(void* sock);
unsigned int __stdcall Producer(void*);

void getDateTime(char * szTime);

enum buffer_status
{
BUFF_DONE = 1,
BUFF_EMPTY = 0
};

struct buffer
{
unsigned char data[1024];
int bytesRecorded;
int flag;
buffer(const unsigned char * data_, const int bytesRecorded_, const int
flag_) :
bytesRecorded(bytesRecorded_), flag(flag_)
{
copy(data_, data_ + bytesRecorded_, data);
}
};

struct circular
{
circular_buffer<buffer> cb;
};

// Global maps

map<int, circular> users;
map<int, circular>::iterator uit;
map<int, HANDLE> eventi;
map<int, HANDLE>::iterator eit;

// Declare Producer && Consumer CS

CRITICAL_SECTION csProducer;
CRITICAL_SECTION csConsumer;

int main()
{
// Initialize the critical section
InitializeCriticalSection(&csProducer);

// Launch Producer Thread
unsigned int prodRet;
_beginthreadex(0,0,Producer,NULL,0,&prodRet);
if(prodRet)
cout << "Launched Producer Thread!" << endl;

// Release resources used by the critical section object.
DeleteCriticalSection(&csProducer);

// Server.
// Set up server (port: 8000, maxconn: 10)
//
SocketServer sockIn(8000, 10);

while(1)
{
// ...wait for incoming connections...
Socket* s = sockIn.Accept();

// Initialize the critical section
InitializeCriticalSection(&csConsumer);

// Spawn a new Consumer Thread each
// time a client connects.
unsigned int sockRet;
_beginthreadex(0,0,Consumer,s,0,&sockRet);
if(sockRet)
cout << "Spawned a new thread!" << endl;

// Release resources used by the critical section object.
DeleteCriticalSection(&csConsumer);
}

sockIn.Close();

return EXIT_SUCCESS;
}

// Consumer Thread
unsigned int __stdcall Consumer(void* sock)
{
Socket* s = (Socket*) sock;

s->SendBytes("Hello World!" + CRLF);

int threadid = (int)GetCurrentThreadId();

// Create Event && Push it in the event map
HANDLE hevent = CreateEvent(NULL,FALSE,FALSE,NULL);
eventi.insert(make_pair(threadid,hevent));

// Prepare && Add circular buffer to the map
circular c;
c.cb.set_capacity(numbuff);

for(int i = 0; i<numbuff; i++)
{
c.cb.push_back(buffer(NULL,0,BUFF_EMPTY));
}

users.insert(make_pair(threadid, c));

//
// Read data from the buffer
// and send it to the client
//
// When using push_back the oldest
// element in the circular buffer
// will be in the index 0
//

Sleep(500);

while(1)
{
// CALLBACK EVENT
WaitForSingleObject(eventi[threadid], INFINITE);

if(users[threadid].cb.at(0).flag == BUFF_DONE)
{
string line = (char*)users[threadid].cb.at(0).data;
int ret = s->SendBytes(line + CRLF);
if(SOCKET_ERROR == ret)
break;
}
}

// Close & remove event from event map
CloseHandle(eventi[threadid]);

// Request ownership of the critical section.
EnterCriticalSection(&csConsumer);

eventi.erase(threadid);

// Release ownership of the critical section.
LeaveCriticalSection(&csConsumer);

// Remove buffer from the map
users.erase(threadid);

// Say bye to the client
s->SendBytes("Bye bye!" + CRLF);

// Disconnect client
cout << "Closing thread..." << endl;
s->Close();
delete s;
return 0;
}

// Producer Thread
unsigned int __stdcall Producer(void*)
{
while(1)
{
Sleep(1000);
char szTime[30]; getDateTime(szTime);

for(uit=users.begin(); uit!=users.end(); ++uit)
{
// Request ownership of the critical section.
EnterCriticalSection(&csProducer);

users[uit->first].cb.push_back(buffer((unsigned char*)szTime,
30, BUFF_DONE));

if(eventi[uit->first])
SetEvent(eventi[uit->first]);

// Release ownership of the critical section.
LeaveCriticalSection(&csProducer);

cout << "Producer is writing to: " << uit->first << endl;
}

}
return 0;
}

void getDateTime(char * szTime)
{
time_t rawtime = time(NULL);
struct tm timeinfo;
gmtime_s(&timeinfo, &rawtime);
strftime(szTime, 30, "%a, %d %b %Y %X GMT", &timeinfo);
}
Ulrich Eckhardt
2010-01-28 17:32:52 UTC
Post by Larry
// Declare Producer && Consumer CS
CRITICAL_SECTION csProducer;
CRITICAL_SECTION csConsumer;
Okay. Nice. Now, the 1M$ question is: What data structures are those
critical sections supposed to guard? Defining this is crucial, because
whether you use them correctly depends on exactly that.
Post by Larry
// Initialize the critical section
InitializeCriticalSection(&csProducer);
// Launch Producer Thread
unsigned int prodRet;
_beginthreadex(0,0,Producer,NULL,0,&prodRet);
if(prodRet)
cout << "Launched Producer Thread!" << endl;
// Release resources used by the critical section object.
DeleteCriticalSection(&csProducer);
Wrapped in init/delete calls.
Post by Larry
// Initialize the critical section
InitializeCriticalSection(&csConsumer);
// Spawn a new Consumer Thread each
// time a client connects.
unsigned int sockRet;
_beginthreadex(0,0,Consumer,s,0,&sockRet);
if(sockRet)
cout << "Spawned a new thread!" << endl;
// Release resources used by the critical section object.
DeleteCriticalSection(&csConsumer);
Wrapped in init/delete calls again, this time for the other CS.

Sorry, but both wraps are wrong. You have to initialise the critical
sections on startup and delete them when you don't need them any more (here
before main() returns). In between, you can use them, e.g. you have to lock
one (EnterCriticalSection) whenever you want to access (read or write) the
shared data it guards. Then, when you're done, you unlock it again
(LeaveCriticalSection), so other threads can access the data.
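
The lifetime rule described here can be sketched in portable C++11 as follows: the lock is created before the first thread starts and is destroyed only after every thread has been joined. This uses std::mutex and std::thread instead of CRITICAL_SECTION and _beginthreadex, and run_workers is a hypothetical function invented for the example, not part of the posted server.

```cpp
#include <cassert>
#include <mutex>
#include <thread>
#include <vector>

// Runs `workers` threads that each add 1 to a shared counter `rounds` times
// and returns the final count.
inline int run_workers(int workers, int rounds) {
    std::mutex counter_mutex;   // "initialise on startup": exists before any thread
    int counter = 0;

    std::vector<std::thread> pool;
    for (int i = 0; i < workers; ++i) {
        pool.emplace_back([&counter, &counter_mutex, rounds] {
            for (int r = 0; r < rounds; ++r) {
                // Lock only around the access to the shared counter.
                std::lock_guard<std::mutex> lock(counter_mutex);
                ++counter;
            }
        });
    }

    // Wait for every thread before the mutex goes out of scope; after this
    // point nobody can touch the lock any more, so destroying it is safe.
    for (auto& t : pool) t.join();
    return counter;
}
```

Calling run_workers(4, 1000) should always return 4000. Without the lock the increments could interleave and the total could come out lower.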

Uli
Larry
2010-01-28 17:50:16 UTC
Post by Ulrich Eckhardt
Sorry, but both wraps are wrong. You have to initialise the critical
sections on startup and delete them when you don't need them any more (here
before main() returns). In between, you can use them, e.g. you have to lock
one (EnterCriticalSection) whenever you want to access (read or write) the
shared data it guards. Then, when you're done, you unlock it again
(LeaveCriticalSection), so other threads can access the data.
I tried that, but it is not working as I expected... I still get a
runtime error when I disconnect from the server (I'm using telnet).

/*
*
* Streaming Server v1.1 by THEARTOFWEB Software
*
*/

#include <iostream>
#include <string>
#include <map>
#include <algorithm>
#include <process.h>
#include <cstdlib>
#include <ctime>
#include "socket.h"
#include <boost/circular_buffer.hpp>
using namespace std;
using namespace boost;

const string CRLF = "\r\n";
const int numbuff = 3;

unsigned int __stdcall Consumer(void* sock);
unsigned int __stdcall Producer(void*);

void getDateTime(char * szTime);

enum buffer_status
{
BUFF_DONE = 1,
BUFF_EMPTY = 0
};

struct buffer
{
unsigned char data[1024];
int bytesRecorded;
int flag;
buffer(const unsigned char * data_, const int bytesRecorded_, const int
flag_) :
bytesRecorded(bytesRecorded_), flag(flag_)
{
copy(data_, data_ + bytesRecorded_, data);
}
};

struct circular
{
circular_buffer<buffer> cb;
};

// Global maps

map<int, circular> users;
map<int, circular>::iterator uit;
map<int, HANDLE> eventi;
map<int, HANDLE>::iterator eit;

// Declare Producer && Consumer CS

CRITICAL_SECTION csProducer;
CRITICAL_SECTION csConsumer;

int main()
{
// Initialize the critical section
InitializeCriticalSection(&csProducer);

// Launch Producer Thread
unsigned int prodRet;
_beginthreadex(0,0,Producer,NULL,0,&prodRet);
if(prodRet)
cout << "Launched Producer Thread!" << endl;

// Server.
// Set up server (port: 8000, maxconn: 10)
//
SocketServer sockIn(8000, 10);

while(1)
{
// ...wait for incoming connections...
Socket* s = sockIn.Accept();

// Initialize the critical section
InitializeCriticalSection(&csConsumer);

// Spawn a new Consumer Thread each
// time a client connects.
unsigned int sockRet;
_beginthreadex(0,0,Consumer,s,0,&sockRet);
if(sockRet)
cout << "Spawned a new thread!" << endl;

}

sockIn.Close();

return EXIT_SUCCESS;
}

// Consumer Thread
unsigned int __stdcall Consumer(void* sock)
{
Socket* s = (Socket*) sock;

s->SendBytes("Hello World!" + CRLF);

int threadid = (int)GetCurrentThreadId();

// Create Event && Push it in the event map
HANDLE hevent = CreateEvent(NULL,FALSE,FALSE,NULL);

EnterCriticalSection(&csConsumer);
eventi.insert(make_pair(threadid,hevent));
LeaveCriticalSection(&csConsumer);

// Prepare && Add circular buffer to the map
circular c;
c.cb.set_capacity(numbuff);

for(int i = 0; i<numbuff; i++)
{
c.cb.push_back(buffer(NULL,0,BUFF_EMPTY));
}

EnterCriticalSection(&csConsumer);
users.insert(make_pair(threadid, c));
LeaveCriticalSection(&csConsumer);

//
// Read data from the buffer
// and send it to the client
//
// When using push_back the oldest
// element in the circular buffer
// will be in the index 0
//

Sleep(500);

while(1)
{
// CALLBACK EVENT
if(eventi[threadid])
{
WaitForSingleObject(eventi[threadid], INFINITE);
} else {
DeleteCriticalSection(&csConsumer);
return 0;
}

EnterCriticalSection(&csConsumer);
if(users[threadid].cb.at(0).flag == BUFF_DONE)
{
string line = (char*)users[threadid].cb.at(0).data;
int ret = s->SendBytes(line + CRLF);
if(SOCKET_ERROR == ret)
break;
}
LeaveCriticalSection(&csConsumer);
}

// Close & remove event from event map
CloseHandle(eventi[threadid]);

EnterCriticalSection(&csConsumer);
eventi.erase(threadid);
LeaveCriticalSection(&csConsumer);

// Remove buffer from the map
EnterCriticalSection(&csConsumer);
users.erase(threadid);
LeaveCriticalSection(&csConsumer);

// Say bye to the client
s->SendBytes("Bye bye!" + CRLF);

// Disconnect client
cout << "Closing thread..." << endl;

// Release resources used by the critical section object.
DeleteCriticalSection(&csConsumer);
s->Close();
delete s;
return 0;
}

// Producer Thread
unsigned int __stdcall Producer(void*)
{
while(1)
{
Sleep(1000);
char szTime[30]; getDateTime(szTime);

// Request ownership of the critical section.
EnterCriticalSection(&csProducer);

for(uit=users.begin(); uit!=users.end(); ++uit)
{
users[uit->first].cb.push_back(buffer((unsigned char*)szTime, 30,
BUFF_DONE));

if(eventi[uit->first])
SetEvent(eventi[uit->first]);

cout << "Producer is writing to: " << uit->first << endl;
}

// Release ownership of the critical section.
LeaveCriticalSection(&csProducer);
}
// Release resources used by the critical section object.
DeleteCriticalSection(&csProducer);
return 0;
}

void getDateTime(char * szTime)
{
time_t rawtime = time(NULL);
struct tm timeinfo;
gmtime_s(&timeinfo, &rawtime);
strftime(szTime, 30, "%a, %d %b %Y %X GMT", &timeinfo);
}
Mihajlo Cvetanović
2010-01-28 18:41:54 UTC
First, you need only one critical section, and its name should relate to
the data it guards, not to the entities that use it:

CRITICAL_SECTION csMySharedData;

Your current solution does not prevent one consumer and one producer from
accessing the same data at the same time.

Second, the lifetime of the critical section should match the useful
life of the data it's supposed to guard. You're safe to call
DeleteCriticalSection only when you know for sure that no one will
access the data anymore. This is tricky to implement. You must wait for all
spawned threads to finish, and delete the CS after that.

Third, it makes little sense to enter the CS, change one bit of guarded
data, leave the CS, enter it again, change another bit, and leave again. The
rule of thumb is this: it's safe to leave a critical section only if the
guarded data is consistent again, as it was when you entered the CS. The
double push_back must be in the same CS. The same goes for the double erase.

And fourth, never sleep or wait for something inside a critical section.
You haven't done it, but I have a few times. Following this rule may
complicate the code, but it's essential.
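
The third and fourth rules can be pictured with a small sketch. It assumes two tables that must always agree on their keys (loosely modelled on the users and eventi maps) and uses std::mutex as a portable stand-in for CRITICAL_SECTION. The Tables struct and the three functions are hypothetical names for this example.

```cpp
#include <cassert>
#include <map>
#include <mutex>
#include <string>

// Hypothetical pair of tables that must always contain the same ids.
// One lock guards both of them.
struct Tables {
    std::mutex mutex;
    std::map<int, std::string> data;
    std::map<int, bool> signalled;
};

// Both inserts happen inside one locked region, so no other thread can see
// an id that is present in one table but missing from the other.
inline void register_client(Tables& t, int id) {
    std::lock_guard<std::mutex> lock(t.mutex);
    t.data[id] = "";
    t.signalled[id] = false;
}

// Likewise, both erases are done before the lock is released.
inline void unregister_client(Tables& t, int id) {
    std::lock_guard<std::mutex> lock(t.mutex);
    t.data.erase(id);
    t.signalled.erase(id);
}

// Copies the payload out under the lock and returns the copy, so that slow
// work (a socket send, a sleep) can run after the lock has been released.
// Returns an empty string if the id is not registered.
inline std::string take_snapshot(Tables& t, int id) {
    std::lock_guard<std::mutex> lock(t.mutex);
    auto it = t.data.find(id);
    return it == t.data.end() ? std::string() : it->second;
}
```

A consumer following this shape would call take_snapshot(), let the lock go, and only then hand the returned string to the socket.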
Larry
2010-01-28 21:03:09 UTC
Post by Mihajlo Cvetanović
First, you need only one critical section, and its name should relate to
CRITICAL_SECTION csMySharedData;
Your current solution does not prevent one consumer and one producer from
accessing the same data at the same time.
Second, the lifetime of the critical section should match the useful life
of the data it's supposed to guard. You're safe to call DeleteCriticalSection
only when you know for sure that no one will access the data anymore. This is
tricky to implement. You must wait for all spawned threads to finish, and
delete the CS after that.
Perfect! I totally got that now. Also, I have made some changes to the
previous code so that it should make more sense (it is working!)

/*
*
* Streaming Server v1.1 by THEARTOFWEB Software
*
*/

#include <iostream>
#include <string>
#include <map>
#include <algorithm>
#include <process.h>
#include <cstdlib>
#include <ctime>
#include "socket.h"
#include <boost/circular_buffer.hpp>
using namespace std;
using namespace boost;

const string CRLF = "\r\n";
const int numbuff = 3;

unsigned int __stdcall Consumer(void* sock);
unsigned int __stdcall Producer(void*);

void getDateTime(char * szTime);

enum buffer_status
{
BUFF_DONE = 1,
BUFF_EMPTY = 0
};

struct buffer
{
unsigned char data[1024];
int bytesRecorded;
int flag;
buffer(const unsigned char * data_, const int bytesRecorded_, const int
flag_) :
bytesRecorded(bytesRecorded_), flag(flag_)
{
copy(data_, data_ + bytesRecorded_, data);
}
};

struct circular
{
circular_buffer<buffer> cb;
};

// Global maps

map<int, circular> users;
CRITICAL_SECTION users_mutex;

map<int, HANDLE> eventi;
CRITICAL_SECTION eventi_mutex;

int main()
{
// Initialize all critical sections
InitializeCriticalSection(&users_mutex);
InitializeCriticalSection(&eventi_mutex);

// Launch Producer Thread
unsigned int prodRet;
_beginthreadex(0,0,Producer,NULL,0,&prodRet);
if(prodRet)
cout << "Launched Producer Thread!" << endl;

// Server.
// Set up server (port: 8000, maxconn: 10)
//
SocketServer sockIn(8000, 10);

while(1)
{
// ...wait for incoming connections...
Socket* s = sockIn.Accept();

// Spawn a new Consumer Thread each
// time a client connects.
unsigned int sockRet;
_beginthreadex(0,0,Consumer,s,0,&sockRet);
if(sockRet)
cout << "Spawned a new thread!" << endl;

}

DeleteCriticalSection(&users_mutex);
DeleteCriticalSection(&eventi_mutex);

sockIn.Close();

return EXIT_SUCCESS;
}

// Consumer Thread
unsigned int __stdcall Consumer(void* sock)
{
Socket* s = (Socket*) sock;

s->SendBytes("Hello World!" + CRLF);

int threadid = (int)GetCurrentThreadId();

// Create Event && Push it in the event map
HANDLE hevent = CreateEvent(NULL,FALSE,FALSE,NULL);

EnterCriticalSection(&eventi_mutex);
eventi.insert(make_pair(threadid,hevent));
LeaveCriticalSection(&eventi_mutex);

// Prepare && Add circular buffer to the map
circular c;
c.cb.set_capacity(numbuff);

for(int i = 0; i<numbuff; i++)
{
c.cb.push_back(buffer(NULL,0,BUFF_EMPTY));
}

EnterCriticalSection(&users_mutex);
users.insert(make_pair(threadid, c));
LeaveCriticalSection(&users_mutex);

//
// Read data from the buffer
// and send it to the client
//
// When using push_back the oldest
// element in the circular buffer
// will be in the index 0
//

Sleep(500);

while(1)
{
// CALLBACK EVENT
WaitForSingleObject(eventi[threadid], INFINITE);

if(users[threadid].cb.at(0).flag == BUFF_DONE)
{
string line = (char*)users[threadid].cb.at(0).data;
int ret = s->SendBytes(line + CRLF);
if(SOCKET_ERROR == ret)
break;
}
}

// Close & remove event from event map
CloseHandle(eventi[threadid]);

EnterCriticalSection(&eventi_mutex);
eventi.erase(threadid);
LeaveCriticalSection(&eventi_mutex);

// Remove buffer from the users map
EnterCriticalSection(&users_mutex);
users.erase(threadid);
LeaveCriticalSection(&users_mutex);

// Say bye to the client
s->SendBytes("Bye bye!" + CRLF);

// Disconnect client
cout << "Closing thread..." << endl;

s->Close();
delete s;
return 0;
}

// Producer Thread
unsigned int __stdcall Producer(void*)
{
while(1)
{
Sleep(1000);
char szTime[30]; getDateTime(szTime);
map<int, circular>::iterator uit;

EnterCriticalSection(&users_mutex);
EnterCriticalSection(&eventi_mutex);

for(uit=users.begin(); uit!=users.end(); ++uit)
{
users[uit->first].cb.push_back(buffer((unsigned char*)szTime, 30,
BUFF_DONE));

if(eventi[uit->first])
SetEvent(eventi[uit->first]);

cout << "Producer is writing to: " << uit->first << endl;
}

LeaveCriticalSection(&eventi_mutex);
LeaveCriticalSection(&users_mutex);
}
return 0;
}

void getDateTime(char * szTime)
{
time_t rawtime = time(NULL);
struct tm timeinfo;
gmtime_s(&timeinfo, &rawtime);
strftime(szTime, 30, "%a, %d %b %Y %X GMT", &timeinfo);
}

thanks
Ulrich Eckhardt
2010-01-29 08:17:21 UTC
Post by Larry
map<int, circular> users;
CRITICAL_SECTION users_mutex;
map<int, HANDLE> eventi;
CRITICAL_SECTION eventi_mutex;
This looks much better, it makes the association immediately clear.
Post by Larry
EnterCriticalSection(&eventi_mutex);
eventi.insert(make_pair(threadid,hevent));
LeaveCriticalSection(&eventi_mutex);
Access to 'eventi' guarded in critical section, correct.

BTW: You can also write

eventi[threadid] = hevent;
Post by Larry
// CALLBACK EVENT
WaitForSingleObject(eventi[threadid], INFINITE);
Wait:
1. Access to 'eventi' is not inside a critical section.
2. You could simply use 'hevent'.
Post by Larry
if(users[threadid].cb.at(0).flag == BUFF_DONE)
{
string line = (char*)users[threadid].cb.at(0).data;
Similarly, access to 'users' is not guarded here!
Post by Larry
// Close & remove event from event map
CloseHandle(eventi[threadid]);
EnterCriticalSection(&eventi_mutex);
eventi.erase(threadid);
LeaveCriticalSection(&eventi_mutex);
Imagine the producer creating some content, then the consumer destroys the
event object, then the producer tries to signal the event.

EnterCriticalSection(&eventi_mutex);
eventi.erase(threadid);
LeaveCriticalSection(&eventi_mutex);
CloseHandle(hevent);
Post by Larry
EnterCriticalSection(&users_mutex);
EnterCriticalSection(&eventi_mutex);
Here is one possible problem, which is called "deadlock". Imagine one thread
locking 'users_mutex' and another locking 'eventi_mutex'. Now, both threads
try to lock the other mutex. Neither thread will ever get the lock. Mihajlo
mentioned that you shouldn't sleep with a lock held. Actually, you also
shouldn't try to acquire another lock with a lock already held!
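
When two locks really must be held at once, the usual ways out are to always take them in the same order everywhere, or to acquire them together. The sketch below shows the second option with the standard C++11 std::lock, which is a swapped-in technique and not something the posted Win32 code can call on a CRITICAL_SECTION. The Counter struct and move_units function are hypothetical.

```cpp
#include <cassert>
#include <mutex>

// Hypothetical example: two independently locked counters.
struct Counter {
    std::mutex mutex;
    int value = 0;
};

// Moves `n` units from `from` to `to`. std::lock acquires both mutexes using
// a deadlock-avoidance algorithm, so one thread calling move_units(a, b) and
// another calling move_units(b, a) cannot end up waiting on each other forever.
inline void move_units(Counter& from, Counter& to, int n) {
    assert(&from != &to);   // locking the same std::mutex twice is not allowed
    std::lock(from.mutex, to.mutex);
    // adopt_lock: the guards take over the already-held locks and release
    // them when the function returns.
    std::lock_guard<std::mutex> lock_from(from.mutex, std::adopt_lock);
    std::lock_guard<std::mutex> lock_to(to.mutex, std::adopt_lock);
    from.value -= n;
    to.value += n;
}
```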

There is one easy fix: Create a single map from thread-ID to the shared
structure of both threads. The shared structure contains both the IO
buffers and the event to wake up the consumer. The consumer is also
responsible for creating the entry when it's ready and removing it when
finished.
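
A minimal sketch of that single-map layout, under some loud assumptions: std::mutex replaces the critical section, a plain bool named pending stands in for the Win32 event, a std::deque of strings stands in for the circular buffer, and the Client and Clients names are invented for this example.

```cpp
#include <cassert>
#include <cstddef>
#include <deque>
#include <map>
#include <mutex>
#include <string>

// Hypothetical per-client record: everything the producer and one consumer
// share for a connection sits in one place.
struct Client {
    std::deque<std::string> buffers;
    bool pending = false;   // stand-in for the wake-up event
};

class Clients {
public:
    // Consumer side: create the entry when the consumer is ready.
    // Does nothing if the id is already registered.
    void join(int id) {
        std::lock_guard<std::mutex> lock(mutex_);
        table_.emplace(id, Client());
    }

    // Consumer side: remove the entry; buffers and signal disappear together.
    void leave(int id) {
        std::lock_guard<std::mutex> lock(mutex_);
        table_.erase(id);
    }

    // Producer side: queue `data` for every client, mark each as pending,
    // and return the number of clients reached.
    std::size_t publish(const std::string& data) {
        std::lock_guard<std::mutex> lock(mutex_);
        for (auto& entry : table_) {
            entry.second.buffers.push_back(data);
            entry.second.pending = true;
        }
        return table_.size();
    }

    // Consumer side: take everything queued for `id` and clear the flag.
    // Returns an empty deque if the id is unknown or nothing is queued.
    std::deque<std::string> collect(int id) {
        std::lock_guard<std::mutex> lock(mutex_);
        std::deque<std::string> out;
        auto it = table_.find(id);
        if (it != table_.end()) {
            out.swap(it->second.buffers);
            it->second.pending = false;
        }
        return out;
    }

private:
    std::mutex mutex_;               // the only lock, so no lock-ordering issue
    std::map<int, Client> table_;
};
```

With only one lock there is nothing to acquire in the wrong order, and an entry can never exist in one map while missing from the other.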


Uli
Larry
2010-01-29 09:37:48 UTC
Post by Ulrich Eckhardt
This looks much better, it makes the association immediately clear.
Post by Larry
// CALLBACK EVENT
WaitForSingleObject(eventi[threadid], INFINITE);
1. Access to 'eventi' is not inside a critical section.
2. You could simply use 'hevent'.
Post by Larry
if(users[threadid].cb.at(0).flag == BUFF_DONE)
{
string line = (char*)users[threadid].cb.at(0).data;
Similarly, access to 'users' is not guarded here!
I think the following should be a little better...

while(1)
{
    // CALLBACK EVENT
    WaitForSingleObject(hevent, INFINITE);

    EnterCriticalSection(&users_mutex);
    int flag = users[threadid].cb.at(0).flag;
    string line = (char*)users[threadid].cb.at(0).data;
    LeaveCriticalSection(&users_mutex);

    if(flag == BUFF_DONE)
    {
        int ret = s->SendBytes(line + CRLF);
        if(SOCKET_ERROR == ret)
            break;
    }
}
Post by Ulrich Eckhardt
There is one easy fix: Create a single map from thread-ID to the shared
structure of both threads. The shared structure contains both the IO
buffers and the event to wake up the consumer. The consumer is also
responsible for creating the entry when it's ready and removing it when
finished.
Ok. I will consider that.

thanks
Mihajlo Cvetanović
2010-01-29 10:53:22 UTC
One more bug: you need to change the order of the two erase calls at the
end of the Consumer function (or put both calls inside a single
critical section). Here's the explanation:

In your case you got lucky in one instance, but you may not be lucky
next time. The Producer iterates through the users map, and if an element
exists it accesses the corresponding element in the eventi map. All
fine, because the Consumer creates the element in the eventi map before
creating the corresponding element in the users map. Had it been the other
way around (first users.insert, then eventi.insert) you would have one
more opportunity for a strange and rare crash.

Actually "next time" has just come. When Consumer leaves its main loop
it erases the element from eventi map and then also from users map. But
in between the Producer may just find this element in its loop, feed
some data into users map, and call SetEvent with non-existing element!
SetEvent may or may not crash, I don't know and would dare not to test
it in working environment.
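
A minimal sketch of the "one critical section for both erase calls" option, under stated assumptions: the map names users and eventi are kept from the thread, but maps_mutex, add_consumer, remove_consumer and produce_for are hypothetical, and standard C++ types replace the Win32 ones (std::mutex for the critical section, a bool for the event HANDLE, std::deque for the circular buffer).

```cpp
#include <deque>
#include <map>
#include <mutex>
#include <string>

std::mutex maps_mutex;                          // one lock guarding both maps
std::map<int, std::deque<std::string>> users;   // stand-in for the buffer map
std::map<int, bool> eventi;                     // stand-in for the HANDLE map

// Consumer start-up: both entries appear together.
void add_consumer(int id) {
    std::lock_guard<std::mutex> lock(maps_mutex);
    eventi[id] = false;
    users[id];                                  // creates an empty queue
}

// Consumer shutdown: both entries disappear together, so the Producer
// can never find a users entry whose event is already gone.
void remove_consumer(int id) {
    std::lock_guard<std::mutex> lock(maps_mutex);
    users.erase(id);
    eventi.erase(id);
}

// Producer: uses find() rather than operator[], which would silently
// create a missing entry. Returns false if the consumer is gone.
bool produce_for(int id, const std::string& chunk) {
    std::lock_guard<std::mutex> lock(maps_mutex);
    auto u = users.find(id);
    auto e = eventi.find(id);
    if (u == users.end() || e == eventi.end())
        return false;
    u->second.push_back(chunk);
    e->second = true;                           // SetEvent analogue
    return true;
}
```

With a single lock the order of the two erase calls no longer matters, because the Producer cannot run between them.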

And one more thing: you need ResetEvent after the Consumer "consumes",
otherwise Consumers will never wait and will consume the same data over
and over, until the Producer gives them another bone to chew. ResetEvent
must also be called inside the CS, because it is part of keeping the data
consistent. But this is tricky, because you must not reset the event if
there is data left to consume. So after WFSO and inside the CS you must call
ResetEvent only after verifying that there is no more data to consume.
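
The reset rule above can be sketched in portable C++, assuming the event behaves like a manual-reset event. Nothing here is from the thread: Channel, produce and consume_one are hypothetical names, a mutex plus condition variable plus bool plays the role of the event, and std::deque replaces the circular buffer.

```cpp
#include <cassert>
#include <condition_variable>
#include <deque>
#include <mutex>
#include <string>

struct Channel {
    std::mutex m;                    // plays the role of the critical section
    std::condition_variable cv;
    bool signaled = false;           // plays the role of a manual-reset event
    std::deque<std::string> pending; // data waiting to be consumed
};

// Producer: add data and signal, both under the lock.
void produce(Channel& ch, const std::string& chunk) {
    {
        std::lock_guard<std::mutex> lock(ch.m);
        ch.pending.push_back(chunk);
        ch.signaled = true;          // SetEvent analogue
    }
    ch.cv.notify_one();
}

// Consumer: wait, take the oldest chunk, reset only when nothing is left.
std::string consume_one(Channel& ch) {
    std::unique_lock<std::mutex> lock(ch.m);
    ch.cv.wait(lock, [&ch] { return ch.signaled; }); // WaitForSingleObject analogue
    assert(!ch.pending.empty());     // invariant: signaled implies data available
    std::string chunk = ch.pending.front();
    ch.pending.pop_front();
    if (ch.pending.empty())
        ch.signaled = false;         // ResetEvent analogue, only once drained
    return chunk;
}
```

Since the flag is cleared under the same lock that guards the queue, a chunk produced just before the reset cannot be missed.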

And one more problem connected with the previous one, a race condition:
the Producer may produce faster than the Consumer can consume (even with
Sleep(1000) you are not really safe). The effect: some data is not sent,
because the Consumer sends only the last produced data. The fix: in the
Consumer, erase all consumed data, consume from the end instead of from
the beginning, and don't ResetEvent if there is more data to consume.
Larry
2010-01-29 13:53:33 UTC
Actually, "next time" has just come. When the Consumer leaves its main loop
it erases the element from the eventi map and then from the users map. But
in between, the Producer may find this element in its loop, feed some
data into the users map, and call SetEvent for a non-existent element!
SetEvent may or may not crash; I don't know, and I would not dare to test
it in a working environment.
Ok! what about this?

while(1)
{
    // CALLBACK EVENT
    WaitForSingleObject(hevent, INFINITE);

    EnterCriticalSection(&users_mutex);
    int flag = users[threadid].cb.at(0).flag;
    string line = (char*)users[threadid].cb.at(0).data;
    ResetEvent(hevent);
    LeaveCriticalSection(&users_mutex);

    if(flag == BUFF_DONE)
    {
        int ret = s->SendBytes(line + CRLF);
        if(SOCKET_ERROR == ret)
            break;
    }
}

// Close and remove "event" and "users" from their maps
EnterCriticalSection(&users_mutex);
users.erase(threadid);
LeaveCriticalSection(&users_mutex);

EnterCriticalSection(&eventi_mutex);
eventi.erase(threadid);
LeaveCriticalSection(&eventi_mutex);
CloseHandle(hevent);
The Producer may produce faster than the Consumer can consume (even with
Sleep(1000) you are not really safe). The effect: some data is not sent,
because the Consumer sends only the last produced data. The fix: in the
Consumer, erase all consumed data, consume from the end instead of from
the beginning, and don't ResetEvent if there is more data to consume.
Well, since I am using push_back on the circular buffer, what I push is the
newest data, and so forth. In the Consumer I need to get the oldest data...
or maybe I am getting that wrong...
Mihajlo Cvetanović
2010-01-29 15:00:03 UTC
Post by Larry
Ok! what about this?
Good :-)
Post by Larry
Well, since I am using push_back on the circular buffer, what I push is
the newest data, and so forth. In the Consumer I need to get the oldest
data... or maybe I am getting that wrong...
When the Consumer gets the oldest data (cb[0]) it goes for another spin, and
what does it get then? The same thing: cb[0]. Is this what you intended?
Maybe you want to discard the oldest data after you consume it?
Larry
2010-01-29 16:06:58 UTC
Post by Mihajlo Cvetanović
When the Consumer gets the oldest data (cb[0]) it goes for another spin, and
what does it get then? The same thing: cb[0]. Is this what you intended?
Maybe you want to discard the oldest data after you consume it?
well, consider this:

cb->push_back(); 1
cb->push_back(); 2
cb->push_back(); 3
cb->push_back(); 4

old->new 1,2,3,4

so I will need in order 1,2,3,4

1 is at index 0, then 2 will be at index 0 etc...

that's my view.

Also, I should consider calling pop_front() after reading from it (front is
index 0).
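
To make the index-0 question concrete, here is a small sketch of a bounded FIFO. Ring is a hypothetical stand-in written for this illustration, not boost::circular_buffer itself; it only imitates the documented behaviour that push_back on a full circular_buffer overwrites the oldest element.

```cpp
#include <cstddef>
#include <deque>

// Hypothetical stand-in for boost::circular_buffer<int>: a bounded FIFO
// whose push_back drops the oldest element once the capacity is reached.
class Ring {
public:
    explicit Ring(std::size_t capacity) : capacity_(capacity) {}

    void push_back(int value) {
        if (capacity_ == 0)
            return;                 // nothing can be stored
        if (items_.size() == capacity_)
            items_.pop_front();     // full: the oldest element is overwritten
        items_.push_back(value);
    }

    int front() const { return items_.front(); }   // oldest element (index 0)
    void pop_front() { items_.pop_front(); }       // discard after consuming
    bool empty() const { return items_.empty(); }
    std::size_t size() const { return items_.size(); }

private:
    std::size_t capacity_;
    std::deque<int> items_;
};
```

With a capacity of 3 (the value of numbuff in the original program), pushing 1, 2, 3, 4 before the Consumer reads anything leaves 2, 3, 4, so element 1 is never sent. Index 0 only advances to the next element when something is removed from the front, either by pop_front() or by an overwrite.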
Larry
2010-01-30 20:41:05 UTC
Post by Mihajlo Cvetanović
When the Consumer gets the oldest data (cb[0]) it goes for another spin, and
what does it get then? The same thing: cb[0]. Is this what you intended?
Maybe you want to discard the oldest data after you consume it?
By the way, I was thinking about a different approach. What about the
following?

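circular_buffer<buffer> cb_local(numbuff);

while(1)
{
    WaitForSingleObject(hevent, INFINITE);

    // Swap the shared buffer with the (empty) local one while holding the lock
    EnterCriticalSection(&users_mutex);
    cb_local.swap(users[threadid].cb);
    LeaveCriticalSection(&users_mutex);

    // Send everything outside the critical section, oldest first
    while (!cb_local.empty())
    {
        const buffer& b = cb_local.front();
        int ret = s->SendBytes(string((const char*)b.data, b.bytesRecorded));
        if (SOCKET_ERROR == ret)
            return ret;
        cb_local.pop_front();
    }
}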

thanks
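
The swap-and-drain idea can also be tried without Windows or Boost. The sketch below is only an approximation under stated assumptions: shared_queue and drain_once are hypothetical names, std::mutex replaces the CRITICAL_SECTION, std::deque replaces the circular_buffer, and collecting the chunks in a vector replaces the SendBytes call.

```cpp
#include <deque>
#include <mutex>
#include <string>
#include <vector>

std::mutex users_mutex;                 // stand-in for the CRITICAL_SECTION
std::deque<std::string> shared_queue;   // stand-in for users[threadid].cb

// One wake-up of the Consumer: grab everything under the lock, then
// "send" it without holding the lock. Returns the chunks in send order.
std::vector<std::string> drain_once() {
    std::deque<std::string> local;
    {
        std::lock_guard<std::mutex> lock(users_mutex);
        local.swap(shared_queue);       // shared_queue is now empty
    }
    std::vector<std::string> sent;
    while (!local.empty()) {
        sent.push_back(local.front());  // a real server would call SendBytes here
        local.pop_front();
    }
    return sent;
}
```

The lock is held only for the swap, so a slow socket never blocks the Producer, and each chunk is sent at most once because it leaves the shared queue before it is sent.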
Ulrich Eckhardt
2010-02-01 08:28:06 UTC
Post by Larry
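By the way, I was thinking about a different approach. What about the
following?
circular_buffer<buffer> cb_local(numbuff);
while(1)
{
    WaitForSingleObject(hevent, INFINITE);
    // Swap the shared buffer with the (empty) local one while holding the lock
    EnterCriticalSection(&users_mutex);
    cb_local.swap(users[threadid].cb);
    LeaveCriticalSection(&users_mutex);
    // Send everything outside the critical section, oldest first
    while (!cb_local.empty())
    {
        const buffer& b = cb_local.front();
        int ret = s->SendBytes(string((const char*)b.data, b.bytesRecorded));
        if (SOCKET_ERROR == ret)
            return ret;
        cb_local.pop_front();
    }
}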
Yep, good approach.

Uli
--
C++ FAQ: http://parashift.com/c++-faq-lite

Sator Laser GmbH
Geschäftsführer: Thorsten Föcking, Amtsgericht Hamburg HR B62 932