DZone Snippets is a public source code repository. Easily build up your personal collection of code snippets, categorize them with tags / keywords, and share them with the world.

Snippets has posted 5883 posts at DZone. View Full User Profile

Windows Socket Demo Program -- Uses Threads And Stuff -- For Stress Testing The TCP Stack

08.26.2006
| 8691 views |
  • submit to reddit
// I wrote this to find out whether the claims made at http://laurentszyster.be/blog/tcp-stack-flaking-out/ are
// valid.  I disagree with the opinion of the person who posted that TCP flakes out on Windows; I think
// his program has obvious bugs.  This program proves that, as long as you write your code properly,
// TCP connections do not break randomly. Similar bugs apparently existed in BitTorrent, and they were fixed.
// For more information, visit http://sparebandwidth.blogspot.com/2006/08/more-on-tcp-flaking-out.html

/* Size in bytes of the I/O buffers in main and client_thread; each client send() passes this many bytes. */
#define BUFFER_SIZE 1000
/* IPv4 address the client threads connect to. */
#define SERVER_ADDR "127.0.0.1"
/* TCP port the server binds to and the clients connect to. */
#define SERVER_PORT 9999
/* Number of client threads created and of connections accepted by main; also used as FD_SETSIZE. */
#define NUM_THREADS 1000
/* SO_SNDBUF and SO_RCVBUF values requested on each client socket. */
#define SEND_SOCK_BUFFER_SIZE 34000
#define RECV_SOCK_BUFFER_SIZE  34000
/* SO_RCVBUF and SO_SNDBUF values requested on each socket accepted by the server. */
#define SERVER_RECV_SOCK_BUFFER_SIZE  34000
#define SERVER_SEND_SOCK_BUFFER_SIZE 34000
#define CLIENT_LIMIT 1000	/* how many times to send/recv in client */

#include <stdio.h>
#include <stdlib.h>
#include <string.h>

/* FD_SETSIZE has to be defined before <winsock.h> is included. */
#define FD_SETSIZE  NUM_THREADS
#include <winsock.h>

/*
 * fatal_error(arg): print arg together with the running byte totals and the
 * last Winsock error code to stderr, flush, and terminate the process with
 * exit status 1.  The expansion refers to total_bytes_received and
 * total_bytes_sent, so both must be visible as ints where the macro is used.
 *
 * fatal_error2(arg): print arg alone to stderr, flush, and terminate the
 * process with exit status 1.  Usable where the byte totals are not in scope.
 *
 * Both bodies are wrapped in do { } while (0) so that an invocation followed
 * by ';' is exactly one statement and can safely sit in an if/else without
 * braces.
 */
#define fatal_error(arg) do {						\
		fprintf(stderr, "total in %d out %d, %s: %d\n",		\
			total_bytes_received, total_bytes_sent,		\
			(arg), WSAGetLastError());			\
		fflush(stderr);						\
		exit(1);						\
	} while (0)
#define fatal_error2(arg) do {						\
		fprintf(stderr, "%s", (arg));				\
		fflush(stderr);						\
		exit(1);						\
	} while (0)

/*
 * Completion flag: set to 1 by starter_thread once it has waited on every
 * client thread, and polled by the service loop in main, which stops when
 * it becomes non-zero.
 * NOTE(review): this is a plain int written by one thread and read by
 * another with no synchronization; consider an interlocked/atomic flag.
 */
int threads_all_done = 0;

/*
 * Argument record passed to starter_thread, which hands a private heap copy
 * of it to every client_thread.
 */
struct thread_args {
	/* Client index assigned by starter_thread; main stores -1 in the record it gives to starter_thread. */
	int id;
	/* The listening socket created in main. */
	int server_sock;
	/* Address and port the client threads connect() to. */
	struct sockaddr_in server_addr;
};

/*
 * Client worker thread.
 *
 * Connects to the server address carried in args, blocks until the server's
 * startup message arrives, then switches the socket to non-blocking mode and
 * makes CLIENT_LIMIT send attempts of BUFFER_SIZE bytes each.  After every
 * send that transferred data it reads back whatever is already queued on the
 * socket (at most BUFFER_SIZE - 1 bytes per iteration).  Attempts that fail
 * with WSAEWOULDBLOCK or WSAENOBUFS still count towards CLIENT_LIMIT.
 *
 * args: heap-allocated struct thread_args; this function frees it before
 *       returning.
 * Returns NULL.  Any other socket failure terminates the whole process
 * through fatal_error().
 *
 * Sockets are held in int and failures are detected with "< 0", matching
 * the convention used by the rest of this file.
 */
void *client_thread(void *args)
{
	struct thread_args *client_thread_arg = (struct thread_args *)args;
	struct sockaddr_in server_addr;
	int client_sock;
	unsigned long on = 1;		/* ioctlsocket() expects an unsigned long */
	unsigned long num_to_read;
	int bytes_received, bytes_sent;
	int total_bytes_received = 0;	/* also read by fatal_error() */
	int total_bytes_sent = 0;	/* also read by fatal_error() */
	char buffer[BUFFER_SIZE];
	int size;
	int i;

	/* Zero the payload so that no uninitialized stack bytes are ever sent. */
	memset(buffer, 0, sizeof(buffer));

	server_addr = client_thread_arg->server_addr;

	if ((client_sock = socket(PF_INET, SOCK_STREAM,
				  IPPROTO_TCP)) < 0)
		fatal_error("socket() error in client thread");

	if (connect(client_sock, (struct sockaddr *)&server_addr,
		    sizeof(server_addr)) < 0)
		fatal_error("connect() error in client thread");

	/* wait for the server to send startup message (socket is still blocking) */
	if ((bytes_received =
	     recv(client_sock, buffer, BUFFER_SIZE, 0)) < 0)
		fatal_error("recv() error in client thread");

	if (ioctlsocket(client_sock, FIONBIO, &on) < 0)
		fatal_error("FIONBIO error in client thread");

	size = SEND_SOCK_BUFFER_SIZE;
	if (setsockopt(client_sock, SOL_SOCKET, SO_SNDBUF,
		       (const char *)&size, sizeof(size)) < 0)
		fatal_error("SO_SNDBUF error in client thread");
	size = RECV_SOCK_BUFFER_SIZE;
	if (setsockopt(client_sock, SOL_SOCKET, SO_RCVBUF,
		       (const char *)&size, sizeof(size)) < 0)
		fatal_error("SO_RCVBUF error in client thread");

	for (i = 0; i < CLIENT_LIMIT; i++) {
		int want;

		bytes_sent = send(client_sock, buffer, BUFFER_SIZE, 0);
		if (bytes_sent < 0) {
			/*
			 * Only a failed call has a meaningful error code.
			 * Running out of buffer space is expected on a
			 * non-blocking socket and is not fatal.
			 */
			int err = WSAGetLastError();

			if (err == WSAEWOULDBLOCK || err == WSAENOBUFS)
				continue;

			fatal_error("send() error in client thread");
		}
		/* A non-blocking send may be partial; count what was accepted. */
		total_bytes_sent += bytes_sent;

		num_to_read = 0;
		if (ioctlsocket(client_sock, FIONREAD, &num_to_read) < 0)
			fatal_error("FIONREAD error in client thread");

		if (num_to_read == 0)
			continue;

		want = num_to_read >= BUFFER_SIZE ?
			BUFFER_SIZE - 1 : (int)num_to_read;
		if ((bytes_received =
		     recv(client_sock, buffer, want, 0)) <= 0)
			fatal_error("recv() error in client thread");
		total_bytes_received += bytes_received;
	}

	printf("thread %d total bytes received %d sent %d\n",
	       client_thread_arg->id, total_bytes_received, total_bytes_sent);

	/*
	 * closesocket() is compiled out, so the socket stays open after this
	 * thread returns.
	 * NOTE(review): presumably this keeps the server from seeing a reset
	 * on a connection it is still servicing -- confirm before enabling.
	 */
#if 0
	closesocket(client_sock);
#endif
	free(args);
	return NULL;
}

/*
 * Launcher thread.
 *
 * Creates NUM_THREADS client_thread workers, giving each one its own heap
 * copy of the argument record with id set to the worker's index, then waits
 * for every worker to terminate and raises threads_all_done.
 *
 * args: heap-allocated struct thread_args used as the template for the
 *       per-client copies; freed here once all workers have been created.
 * Does not return normally: the thread ends through ExitThread(0).
 * An allocation or thread-creation failure ends the process through
 * fatal_error2().
 *
 * The worker thread handles are never passed to CloseHandle() here.
 */
void *starter_thread(void *args)
{
	HANDLE workers[NUM_THREADS];
	DWORD worker_ids[NUM_THREADS];
	struct thread_args *per_client;
	int n;

	n = 0;
	while (n < NUM_THREADS) {
		per_client = (struct thread_args *)
			malloc(sizeof(struct thread_args));
		if (per_client == 0) {
			fatal_error2("malloc client_thread_arg error");
		}

		/* Each worker owns (and later frees) its private copy. */
		memcpy(per_client, args, sizeof(struct thread_args));
		per_client->id = n;

		workers[n] = CreateThread(0, 0,
					  (LPTHREAD_START_ROUTINE) client_thread,
					  per_client, 0,
					  (LPDWORD)&worker_ids[n]);
		if (workers[n] == 0) {
			fatal_error2("CreateThread for clients error");
		}

		printf("created client thread %d\n", n);
		fflush(stdout);
		n++;
	}

	/* The template record is no longer needed once every copy is made. */
	free(args);
	args = 0;

	printf("%d threads created\n", NUM_THREADS);
	fflush(stdout);

	n = 0;
	while (n < NUM_THREADS) {
		WaitForSingleObject(workers[n], INFINITE);
		n++;
	}

	printf("all threads terminated\n");
	fflush(stdout);

	/* Tells the service loop in main that it can stop. */
	threads_all_done = 1;

	ExitThread(0);
}

/*
 * Server side of the stress test.
 *
 * Binds and listens on SERVER_PORT, launches starter_thread (which creates
 * the NUM_THREADS clients), accepts one connection per client, sends each
 * one the "startup" message and switches it to non-blocking mode.  It then
 * services the connections with select() until threads_all_done is set:
 * on every pass it sends half a buffer to each readable socket and reads
 * back at most (half - 1) bytes, after an artificial 100 ms delay.
 *
 * Exits the process with status 0 when all clients have finished.  Any
 * failure exits with status 1 (through fatal_error() once Winsock is up).
 *
 * Sockets are held in int and failures are detected with "< 0", matching
 * the convention used by the rest of this file.
 */
int main(void)
{
	int server_sock, client_sock;
	struct sockaddr_in server_addr, client_addr;
	int client_addr_len;
	char buffer[BUFFER_SIZE];
	int bytes_received, total_bytes_received;	/* totals are also read by fatal_error() */
	int bytes_sent, total_bytes_sent;
	unsigned long on = 1;		/* ioctlsocket() expects an unsigned long */
	unsigned long num_to_read;
	struct timeval timeout;
	struct thread_args *starter_thread_arg;
	WSADATA wsa_data;
	HANDLE starter_thread_handle;
	DWORD starter_thread_id;
	int client_sockets[NUM_THREADS];
	fd_set socket_fds;
	int startup_msg_len;
	int ready;
	int size;
	int i;

	total_bytes_received = total_bytes_sent = 0;

	if (WSAStartup(MAKEWORD(2, 0), &wsa_data) != 0) {
		fprintf(stderr, "WSAStartup() error");
		exit(1);
	}

	if ((server_sock = socket(PF_INET, SOCK_STREAM,
				  IPPROTO_TCP)) < 0)
		fatal_error("socket() error");

	memset(&client_addr, 0, sizeof(client_addr));
	memset(&server_addr, 0, sizeof(server_addr));
	server_addr.sin_family  = AF_INET;
	server_addr.sin_addr.s_addr = htonl(INADDR_ANY);
	server_addr.sin_port = htons(SERVER_PORT);

	if (bind(server_sock, (struct sockaddr *) &server_addr,
		 sizeof(server_addr)) < 0)
		fatal_error ("bind() error ");
	/*
	 * NOTE(review): the backlog is 5 while NUM_THREADS clients connect
	 * concurrently; a larger backlog may be appropriate.
	 */
	if (listen(server_sock, 5) < 0)
		fatal_error("listen() error");

	if ((starter_thread_arg =
	     (struct thread_args *)
	     malloc(sizeof(struct thread_args))) == 0)
		fatal_error("malloc starter_thread_arg error");

	/*
	 * The clients get the same family and port the server bound to, but
	 * with SERVER_ADDR as the destination instead of INADDR_ANY.
	 */
	starter_thread_arg->server_sock = server_sock;
	server_addr.sin_addr.s_addr = inet_addr(SERVER_ADDR);
	memcpy(&starter_thread_arg->server_addr , &server_addr,
	       sizeof(server_addr));
	starter_thread_arg->id = -1;

	if ((starter_thread_handle =
	     CreateThread(0,0,
			  (LPTHREAD_START_ROUTINE)starter_thread,
			  starter_thread_arg, 0,
			  &starter_thread_id)) == 0)
		fatal_error("CreateThread for starter thread error");

	for (i = 0; i < NUM_THREADS; i++) {
		client_addr_len = sizeof(client_addr);
		if ((client_sock = accept(server_sock,
					  (struct sockaddr *) &client_addr,
					  &client_addr_len)) < 0)
			fatal_error("accept() error");
#if 0
		printf("connection accepted from %s\n",
		       inet_ntoa(client_addr.sin_addr));
#endif
		client_sockets[i] = client_sock;
	}
	printf("%d clients accepted\n", NUM_THREADS);

	/* send startup message to all clients */
	memset(buffer, 0, sizeof(buffer));
	strcpy(buffer, "startup");
	startup_msg_len = (int)strlen(buffer);
	for (i = 0; i < NUM_THREADS; i++) {
		client_sock = client_sockets[i];

		/* The socket is still blocking here. */
		if ((bytes_sent =
		     send(client_sock, buffer,
			  startup_msg_len, 0)) != startup_msg_len)
			fatal_error("sending start signal failed");

		size = SERVER_SEND_SOCK_BUFFER_SIZE;
		if (setsockopt(client_sock, SOL_SOCKET, SO_SNDBUF,
			       (const char *)&size, sizeof(size)) < 0)
			fatal_error("SO_SNDBUF error  ");
		size = SERVER_RECV_SOCK_BUFFER_SIZE;
		if (setsockopt(client_sock, SOL_SOCKET, SO_RCVBUF,
			       (const char *)&size, sizeof(size)) < 0)
			fatal_error("SO_RCVBUF error ");

		if (ioctlsocket(client_sock, FIONBIO, &on) < 0)
			fatal_error("FIONBIO error");
	}

	printf("sent startup messages\n"); fflush(stdout);
	while (!threads_all_done) {
		int half = BUFFER_SIZE / 2;

		Sleep(100);	/* slow down server artificially */

		/* select() modifies the set, so rebuild it on every pass. */
		FD_ZERO(&socket_fds);
		for (i = 0; i < NUM_THREADS; i++)
			FD_SET(client_sockets[i], &socket_fds);
		timeout.tv_sec = 0;
		timeout.tv_usec = 10000;

		ready = select(0, &socket_fds, 0, 0, &timeout);
		if (ready < 0)
			fatal_error("select() error");
		if (ready == 0)
			continue;

		for (i = 0; i < NUM_THREADS; i++) {
			if (! FD_ISSET(client_sockets[i], &socket_fds))
				continue;

			client_sock = client_sockets[i];

			/* send half as much */
			bytes_sent = send(client_sock, buffer, half, 0);
			if (bytes_sent < 0) {
				/*
				 * Only a failed call has a meaningful error
				 * code; running out of buffer space on a
				 * non-blocking socket is not fatal.
				 */
				int err = WSAGetLastError();

				if (err == WSAEWOULDBLOCK ||
				    err == WSAENOBUFS)
					continue;
				fatal_error("send() error");
			}
			/* A non-blocking send may be partial; count what was accepted. */
			total_bytes_sent += bytes_sent;

			num_to_read = 0;
			if (ioctlsocket(client_sock, FIONREAD, &num_to_read) < 0)
				fatal_error("FIONREAD error");

			if (num_to_read == 0)
				continue;

			/* read half as much */
			if (num_to_read >= (unsigned long)half)
				num_to_read = (unsigned long)(half - 1);
			if ((bytes_received =
			     recv(client_sock, buffer, (int)num_to_read, 0)) <= 0)
				fatal_error("recv() error ");
			total_bytes_received += bytes_received;
		}
	}

	/* Release the listening socket and every accepted connection. */
	closesocket(server_sock);
	for (i = 0; i < NUM_THREADS; i++)
		closesocket(client_sockets[i]);

	/* Let the launcher thread finish before Winsock is shut down. */
	WaitForSingleObject(starter_thread_handle, INFINITE);
	WSACleanup();

	printf("all threads finished. total bytes received %d sent %d\n",
	       total_bytes_received, total_bytes_sent);

	exit(0);
}

    

Comments

Peter Cooper replied on Thu, 2006/08/03 - 11:23pm

Congrats! 2000th public post!