This project has moved and is read-only. For the latest updates, please go here.

Problem with my application using simple server and client

Jul 2, 2010 at 2:00 PM
Edited Jul 2, 2010 at 2:02 PM
Hi, I'm using your fabulous lib, but i have a few problems. My app using 3 windows services and several WPF clients, 2 of the services are simple clients that serialize object and sends it to the server (which is a windows service server) (and not receiving) and all the WPF clients only receiving messages, each one has his own messages arrived to him (the server does the logic). its a medium-high stress app... the server could get something like 200 messages per sec. its runs OK for an hour or two but then the RAM memory that the server takes goes way up! (could get to 2 GB) and the messages does not get sent anymore here is my code its a bit messy but i hope u can help me: code: TcpClient: public class MonitorTcpClient : IMonitorClient, IDisposable { ActionThread th; /// <summary> /// The socket that connects to the server. This is null if ClientSocketState is SocketState.Closed. /// </summary> private SimpleClientTcpSocket ClientSocket; /// <summary> /// The connected state of the socket. If this is SocketState.Closed, then ClientSocket is null. /// </summary> SocketState ClientSocketState; public event NewCallbackMessageArrived NewMessageArrived; public delegate void NewCallbackMessageArrived(object sender, CallbackMessageArrivedEventArgs e); int _port; string _serverAddress; /// <summary> /// Closes and clears the socket, without causing exceptions. 
/// </summary> public void ResetSocket() { // Close the socket ClientSocket.Close(); ClientSocket = null; // Indicate there is no socket connection ClientSocketState = SocketState.Closed; th.Join(); Start(); } private void ClientSocket_ConnectCompleted(AsyncCompletedEventArgs e) { try { // Check for errors if (e.Error != null) { ResetSocket(); return; } // Adjust state ClientSocketState = SocketState.Connected; // Display the connection information } catch (Exception ex) { ResetSocket(); SendException(ex); } finally { } } void ClientSocket_WriteCompleted(object sender, AsyncCompletedEventArgs e) { // Check for errors if (e.Error != null) { // Note: WriteCompleted may be called as the result of a normal write or a keepalive packet. ResetSocket(); // If you want to get fancy, you can tell if the error is the result of a write failure or a keepalive // failure by testing e.UserState, which is set by normal writes. } else { object mes = e.UserState; if (mes != null) { try { MonitorCallbackQueueMessage mess = mes as MonitorCallbackQueueMessage; } catch { } } } } void ClientSocket_ShutdownCompleted(AsyncCompletedEventArgs e) { // Check for errors if (e.Error != null) { ResetSocket(); } else { // Close the socket and set the socket state ResetSocket(); } } public MonitorTcpClient(int port, string serverAddress) { try { // Read the IP address IPAddress serverIPAddress; if (!IPAddress.TryParse(serverAddress, out serverIPAddress)) { return; } // Read the port number _port = port; _serverAddress = serverAddress; // Begin connecting to the remote IP Start(); } catch (Exception ex) { ResetSocket(); } } private void Start() { th = new ActionThread(); th.Start(); th.Do(() => { ClientSocket = new SimpleClientTcpSocket(); ClientSocket.ConnectCompleted += ClientSocket_ConnectCompleted; ClientSocket.WriteCompleted += (args) => ClientSocket_WriteCompleted(ClientSocket, args); ClientSocket.ShutdownCompleted += ClientSocket_ShutdownCompleted; ClientSocket.PacketArrived += new 
Action<AsyncResultEventArgs<byte[]>>(ClientSocket_PacketArrived); ClientSocket.ConnectAsync(IPAddress.Parse(_serverAddress), _port); ClientSocketState = SocketState.Connecting; }); } void ClientSocket_PacketArrived(AsyncResultEventArgs<byte[]> e) { try { // Check for errors if (e.Error != null) { ResetSocket(); } else if (e.Result == null) { // PacketArrived completes with a null packet when the other side gracefully closes the connection // Close the socket and handle the state transition to disconnected. ResetSocket(); } else { // At this point, we know we actually got a message. // Deserialize the message object message = Monitor.Common.Serialization.Deserialize(e.Result); MonitorCallbackQueueMessage complexMessage = message as MonitorCallbackQueueMessage; if (complexMessage != null) { CallbackMessageArrivedEventArgs callback = new CallbackMessageArrivedEventArgs(); callback.MonitorCallbackMessage = complexMessage; NewMessageArrived(this, callback); return; } } } catch (Exception ex) { ResetSocket(); SendException(ex); } } #region IMonitorClient Members public void SendCallbackMessage(MonitorCallbackQueueMessage message) { try { // Create the message to send // Serialize the message to a binary array if (message != null) { byte[] binaryMessage = Monitor.Common.Serialization.Serialize(message); // Send the message; the state is used by ClientSocket_WriteCompleted to display an output to the log if (ClientSocket != null) { ClientSocket.WriteAsync(binaryMessage); } else { th.Join(); Start(); } } } catch (Exception exp) { ResetSocket(); SendException(exp); } } public void SendConnectionMessage(ConnectionMessage connMessage) { try { // Serialize the message to a binary array if (connMessage != null) { byte[] binaryMessage = Monitor.Common.Serialization.Serialize(connMessage); // Send the message; the state is used by ClientSocket_WriteCompleted to display an output to the log; if (ClientSocket != null) { ClientSocket.WriteAsync(binaryMessage); } else { ResetSocket(); 
ClientSocket.WriteAsync(binaryMessage); } } } catch (Exception exp) { ResetSocket(); SendException(exp); } } public void SendException(Exception exp) { Monitor.Common.HelperMethods.SendException(exp, "MonitorClient", "MonitorClient Error"); } #endregion #region IDisposable Members public void Dispose() { try { ClientSocket.Dispose(); th.Join(); } catch { } } #endregion } } ################################################################ TcpServer public class MonitorServer : IMonitorServer { /// <summary> /// The socket that listens for connections. This is null if we are not listening. /// </summary> SimpleServerTcpSocket ListeningSocket; /// <summary> /// A mapping of sockets (with established connections) to their state. /// </summary> Dictionary<SimpleServerChildTcpSocket, ChildSocketState> ChildSockets; Dictionary<string, SimpleServerChildTcpSocket> Receivers; Dictionary<string, SimpleServerChildTcpSocket> Senders; int _port; int _backLog; public MonitorServer(int port, int backLog) { ChildSockets = new Dictionary<SimpleServerChildTcpSocket, ChildSocketState>(); Receivers = new Dictionary<string, SimpleServerChildTcpSocket>(); Senders = new Dictionary<string, SimpleServerChildTcpSocket>(); _port = port; _backLog = backLog; ListeningSocket = new SimpleServerTcpSocket(); ListeningSocket.ConnectionArrived += new Action<AsyncResultEventArgs<SimpleServerChildTcpSocket>>(ListeningSocket_ConnectionArrived); } public void StartServer() { try { // Define the socket, bind to the port, and start accepting connections ListeningSocket.Listen(_port, _backLog); } catch (Exception ex) { SendException(ex); } } public void RestartServer() { ResetListeningSocket(); } public void CloseServer() { ResetListeningSocket(); ListeningSocket.Dispose(); } /// <summary> /// Closes and clears the listening socket and all connected sockets, without causing exceptions. 
/// </summary> void ResetListeningSocket() { // Close all child sockets foreach (KeyValuePair<SimpleServerChildTcpSocket, ChildSocketState> socket in ChildSockets) socket.Key.Close(); ChildSockets.Clear(); foreach (KeyValuePair<string, SimpleServerChildTcpSocket> socket in Senders) { socket.Value.Close(); } Senders.Clear(); foreach (KeyValuePair<string, SimpleServerChildTcpSocket> socket in Receivers) { socket.Value.Close(); } Receivers.Clear(); // Close the listening socket ListeningSocket.Close(); ListeningSocket = null; } /// <summary> /// Closes and clears a child socket (established connection), without causing exceptions. /// </summary> /// <param name="childSocket">The child socket to close. May be null.</param> void ResetChildSocket(SimpleServerChildTcpSocket childSocket) { // Remove it from the list of child sockets foreach (KeyValuePair<string, SimpleServerChildTcpSocket> item in Receivers) { if (item.Value == childSocket) { if (item.Value != null) item.Value.Close(); Receivers.Remove(item.Key); } } foreach (KeyValuePair<string, SimpleServerChildTcpSocket> item in Senders) { if (item.Value == childSocket) { if (item.Value != null) item.Value.Close(); Senders.Remove(item.Key); } } // Close the child socket if possible if (childSocket != null) childSocket.Close(); if (ChildSockets.ContainsKey(childSocket)) { ChildSockets.Remove(childSocket); } } void ListeningSocket_ConnectionArrived(AsyncResultEventArgs<SimpleServerChildTcpSocket> e) { // Check for errors if (e.Error != null) { ResetListeningSocket(); return; } SimpleServerChildTcpSocket socket = e.Result; try { // Save the new child socket connection ChildSockets.Add(socket, ChildSocketState.Connected); socket.PacketArrived += (args) => ChildSocket_PacketArrived(socket, args); socket.WriteCompleted += (args) => ChildSocket_WriteCompleted(socket, args); socket.ShutdownCompleted += (args) => ChildSocket_ShutdownCompleted(socket, args); } catch (Exception ex) { ResetChildSocket(socket); SendException(ex); } 
finally { } } void ChildSocket_PacketArrived(SimpleServerChildTcpSocket socket, AsyncResultEventArgs<byte[]> e) { try { // Check for errors if (e.Error != null) { ResetChildSocket(socket); } else if (e.Result == null) { // PacketArrived completes with a null packet when the other side gracefully closes the connection // Close the socket and remove it from the list ResetChildSocket(socket); } else { // At this point, we know we actually got a message. // Deserialize the message object message = Monitor.Common.Serialization.Deserialize(e.Result); ConnectionMessage connMessage = message as ConnectionMessage; if (connMessage != null) { try { string queueString = GetQueueString(connMessage); switch (connMessage.ConnectionType) { case ConnectionType.Receiver: { if (Receivers.ContainsKey(queueString) == false) { Receivers.Add(GetQueueString(connMessage), socket); } else { Receivers[GetQueueString(connMessage)] = socket; } break; } case ConnectionType.Sender: { if (Senders.ContainsKey(queueString) == false) { Senders.Add(GetQueueString(connMessage), socket); } else { Senders[queueString] = socket; } break; } } } catch (Exception exp) { SendException(exp); } return; } // Handle the message MonitorCallbackQueueMessage callbackMessage = message as MonitorCallbackQueueMessage; if (callbackMessage != null) { string queueString = GetQueueString(callbackMessage); try { InsertIntoMonitoringHistory(callbackMessage); if (Receivers.ContainsKey(queueString)) { // Keep a list of all errors for child sockets Dictionary<SimpleServerChildTcpSocket, Exception> SocketErrors = new Dictionary<SimpleServerChildTcpSocket, Exception>(); foreach (KeyValuePair<SimpleServerChildTcpSocket, ChildSocketState> item in ChildSockets) { if (item.Key == Receivers[queueString] && item.Value == ChildSocketState.Connected) { try { item.Key.WriteAsync(Monitor.Common.Serialization.Serialize(callbackMessage)); } catch (Exception exp) { SocketErrors.Add(item.Key, exp); } } } foreach 
(KeyValuePair<SimpleServerChildTcpSocket, Exception> error in SocketErrors) { ResetChildSocket(error.Key); } } } catch { } return; } } } catch (Exception ex) { SendException(ex); ResetChildSocket(socket); } finally { } } void ChildSocket_ShutdownCompleted(object sender, AsyncCompletedEventArgs e) { SimpleServerChildTcpSocket socket = (SimpleServerChildTcpSocket)sender; // Check for errors if (e.Error != null) { SendException(e.Error); ResetChildSocket(socket); } else { // Close the socket and remove it from the list ResetChildSocket(socket); } } void ChildSocket_WriteCompleted(SimpleServerChildTcpSocket socket, AsyncCompletedEventArgs e) { // Check for errors if (e.Error != null) { // Note: WriteCompleted may be called as the result of a normal write (SocketPacketizer.WritePacketAsync), // or as the result of a call to SocketPacketizer.WriteKeepaliveAsync. However, WriteKeepaliveAsync // will never invoke WriteCompleted if the write was successful; it will only invoke WriteCompleted if // the keepalive packet failed (indicating a loss of connection). // If you want to get fancy, you can tell if the error is the result of a write failure or a keepalive // failure by testing e.UserState, which is set by normal writes. if (e.UserState is string) SendException(e.Error); else ResetChildSocket(socket); } else { string description = (string)e.UserState; } } #region IMonitorServer Members public void SendException(Exception exp) { Monitor.Common.HelperMethods.SendException(exp, "MonitorServer", "Error"); } public int? 
GetQueueID(Location location, Department department) { return Monitor.Common.HelperMethods.GetQueueID(location, department); } public void InsertIntoMonitoringHistory(MonitorCallbackQueueMessage msg) { if (msg != null) { string connectionString = ConfigurationManager.ConnectionStrings["co"].ConnectionString; using (SqlConnection conn = new SqlConnection(connectionString)) { int duration = 0; int.TryParse(msg.Duration, out duration); using (SqlCommand cmd = new SqlCommand()) { cmd.Connection = conn; cmd.CommandType = System.Data.CommandType.StoredProcedure; cmd.CommandText = "[P_UpdateMonitoring]";; try { conn.Open(); cmd.ExecuteNonQuery(); } catch (Exception exp) { SendException(exp); } } } } } public string GetQueueString(ConnectionMessage message) { return message.Location.ToString() + message.Department.ToString(); } public string GetQueueString(MonitorCallbackQueueMessage message) { return message.Location.ToString() + message.Department.ToString(); } #endregion } } if you want my services code either (they using this classes to deploy the system) tell me and i will post it.
Jul 5, 2010 at 10:29 PM

I believe the problem is in your error handling code. Remember that all of the socket methods are called within the context of the ActionThread in your class. So if an error ever happened, you close the socket connection (which is correct) but then wait for the thread to exit. Since this is running in the context of the ActionThread, it will wait until the current thread exits (i.e., forever).

In the meantime, the sockets will continue communicating using ThreadPool threads, trying to send messages to the ActionThread (which will never respond). This is why the memory usage increases.

I recommend not stopping the ActionThread on an error. Just closing the connection that had the error should be sufficient.

        -Steve

Jul 6, 2010 at 8:08 AM

Hi Steve,

Many thanks for your answer, but could you help me clear up a few issues?

  1. About closing the ActionThread: you probably mean the TCP client. At first I didn't join the ActionThread, but the memory still spiked (in the server service). If I close the connection and want to reopen it immediately (without joining the thread), should I do something like this?
      public void ResetSocket()
            {
                // Best-effort close: every exception raised here is swallowed,
                // including the NullReferenceException thrown when ClientSocket
                // has already been cleared by an earlier reset.
                try
                {
                    ClientSocket.Close();
                }
                catch
                {
                }

                // Forget the old socket and record that there is no connection.
                ClientSocket = null;
                ClientSocketState = SocketState.Closed;

                // Reconnect straight away; the ActionThread is left running
                // (no Join), so Start() reuses it.
                Start();
            }
            /// <summary>
            /// Creates a new client socket, wires up its event handlers and begins an
            /// asynchronous connect to the configured server address and port.
            /// </summary>
            /// <remarks>
            /// The ActionThread is created on the first call and reused afterwards.
            /// The socket is always created on the ActionThread, regardless of which
            /// thread calls this method, so that all of its completion events are
            /// raised there. Because the work is queued, <c>ClientSocket</c> is only
            /// assigned once the ActionThread has run the queued action.
            /// </remarks>
            private void Start()
            {
                // Lazily create the worker thread once; reconnects reuse it.
                if (th == null)
                {
                    th = new ActionThread();
                    th.Start();
                }

                // Queue the socket setup on the ActionThread. Do() only enqueues the
                // action, so this is safe to call from the ActionThread itself
                // (e.g. from a socket event handler via ResetSocket).
                th.Do(() =>
                {
                    // Use a local so the WriteCompleted handler reports the socket that
                    // actually raised the event, not whatever the ClientSocket field
                    // points to by the time the event fires.
                    SimpleClientTcpSocket socket = new SimpleClientTcpSocket();
                    socket.ConnectCompleted += ClientSocket_ConnectCompleted;
                    socket.WriteCompleted += (args) => ClientSocket_WriteCompleted(socket, args);
                    socket.ShutdownCompleted += ClientSocket_ShutdownCompleted;
                    socket.PacketArrived += new Action<AsyncResultEventArgs<byte[]>>(ClientSocket_PacketArrived);

                    ClientSocket = socket;
                    socket.ConnectAsync(IPAddress.Parse(_serverAddress), _port);
                    ClientSocketState = SocketState.Connecting;
                });
            }
  2. It seems that after a while the clients stop sending messages, and if I restart the services (dispose and then create a new instance) they start sending again. Do you have any solution in mind for this problem? (It is probably connected to the first issue.)
Jul 6, 2010 at 2:13 PM

The updated error recovery code looks better. You don't actually need the try/catch around the ClientSocket.Close; it only throws "critical" errors which means your application is done for anyway.

Are you using the most recent version of Nito.Async, and does your server-side code have an ActionThread?

        -Steve

Jul 6, 2010 at 2:33 PM

My server side has an ActionThread, although it uses it differently. The service initiates the server-handling class inside an ActionThread (unlike the client, which has the ActionThread embedded):

   protected override void OnStart(string[] args)
        {
            // Create and start a dedicated ActionThread, then queue the server
            // start-up on it so that Start() runs on that thread rather than on
            // the thread that called OnStart.
            dis = new ActionThread();
            dis.Start();
            dis.Do(() => Start());
        }

        private void Start()
        {
            // Second constructor argument of MonitorServer (the listen backlog).
            const int listenBacklog = 2000;

            try
            {
                // Build the server wrapper and begin listening on the configured port.
                monitorSocket = new MonitorServer.MonitorServer(port, listenBacklog);
                monitorSocket.StartServer();
            }
            catch (Exception startError)
            {
                // Report the failure instead of letting it escape the ActionThread.
                SendException(startError);
            }
        }



My code is a bit experimental for now and has not been refactored yet; sorry about that.

Thanks for the helpful assistance!

Jul 8, 2010 at 8:04 AM
Well, it looks like I solved the problem. It seems to have been some kind of overflow problem: what I did was create a queue for the messages as they arrive and, instead of sending them instantly, dequeue and send them at an interval. BTW, why does the SimpleClientSocket WriteCompleted event not fire when a regular write is made, but only in the case of an error? (If it did, it would save me the interval dequeue-and-send.) Regards, Nir.
Jul 8, 2010 at 1:42 PM
Edited Jul 8, 2010 at 1:43 PM

I'm glad to hear your problem is resolved. It still seems like you're seeing some odd behavior, though; the WriteCompleted event should fire whenever a write completes, whether it was successful or with an error.

What version of the library are you using?

  -Steve

Jul 8, 2010 at 2:06 PM

1.4

On Jul 8, 2010 3:42 PM, "shammah" <notifications@codeplex.com> wrote:

From: shammah

I'm glad to hear your problem is resolved. It still seems like you're seeing some odd behavior, though; the WriteCompleted event should fire whenever a write completes, whether it was successful or with an error. What version of the library are you using? -Steve



Read the full discussion online.

To add a post to this discussion, reply to this email (NitoAsync...

Aug 13, 2012 at 3:32 PM

I have the same problem with the WriteCompleted event. Is there any solution for it?