Discussion:
Bidirectional Tcp connector
Rassul Hessampour
2012-06-20 07:09:32 UTC
Permalink
asdasd

---------------------------------------------------------------------
To unsubscribe from this list, please visit:

http://xircles.codehaus.org/manage_email
Rassul Hessampour
2012-06-20 12:12:32 UTC
Permalink
Dear david.
<br /><br />
Excuse me for being late in response.
<br /><br />
Here is settings in config files :
<br /><br />
1) Connector definition:
<br /><br />
<code>
<br />
&lt;tcp:connector name=&quot;myConnector&quot; sendTcpNoDelay=&quot;true&quot;
<br />
sendBufferSize=&quot;8192&quot; receiveBufferSize=&quot;8192&quot; createMultipleTransactedReceivers=&quot;true&quot;
<br />
numberOfConcurrentTransactedReceivers=&quot;30&quot; keepSendSocketOpen=&quot;true&quot;&gt;
<br />
&lt;spring:property name=&quot;tcpProtocol&quot; ref=&quot;myProtocol&quot;&#47;&gt;
<br />
&lt;&#47;tcp:connector&gt;
<br />
</code>
<br /><br />
2) Dispatcher endpoint :
<br /><br />
<code>
<br />
&lt;tcp:endpoint name=&quot;dispatchEndpoint&quot; encoding=&quot;8859_1&quot;
<br />
address=&quot;tcp:&#47;&#47;127&#46;0&#46;0&#46;1:5002&quot; connector-ref=&quot;myConnector&quot; remoteSync=&quot;true&quot;
<br />
remoteSyncTimeout=&quot;6000&quot;&gt;
<br />
&lt;&#47;tcp:endpoint&gt;
<br />
</code>
<br /><br />
3) Dispatcher class :
<br /><br />
<code>
<br />
public class MyTcpMessageDispatcher extends AbstractMessageDispatcher {
<br /><br />
private Socket socket;
<br />
private final TcpConnector connector;
<br /><br />
public MyTcpMessageDispatcher(OutboundEndpoint endpoint) {
<br />
super(endpoint);
<br />
this&#46;connector = (TcpConnector) endpoint&#46;getConnector();
<br />
SocketListener listener = new SocketListener();
<br />
new Thread(listener)&#46;start();
<br />
}
<br /><br />
protected void doConnect() throws Exception {
<br />
disposing&#46;set(false);
<br />
}
<br /><br />
protected void doDisconnect() throws ConnectException {
<br />
disposing&#46;set(true);
<br />
try {
<br />
if (socket != null) {
<br />
if (logger&#46;isDebugEnabled()) {
<br />
logger&#46;debug(&quot;Closing: &quot; + socket);
<br />
}
<br />
socket&#46;close();
<br />
}
<br />
} catch (IOException e) {
<br />
logger&#46;warn(&quot;Failed to close server socket: &quot; + e&#46;getMessage(), e);
<br />
}
<br />
}
<br /><br />
protected synchronized MuleMessage doSend(MuleEvent event) throws Exception {
<br />
if(socket == null || !socket&#46;isConnected()) {
<br />
socket = getSocket(event&#46;getEndpoint());
<br />
}
<br />
dispatchToSocket(socket, event);
<br />
if (useRemoteSync(event)) {
<br />
return receiveAndProcessSyncResponseFromSocket(event&#46;getTimeout(), event&#46;getMessage(),
<br />
event&#46;getEndpoint()&#46;getEndpointURI());
<br />
}
<br />
else {
<br />
return event&#46;getMessage();
<br />
}
<br />
}
<br /><br />
private MuleMessage receiveAndProcessSyncResponseFromSocket(int timeout, MuleMessage muleMessage,
<br />
EndpointURI endpointURI) throws Exception {
<br />
try {
<br />
Object result = receiveFromSocket(socket, timeout, endpoint);
<br />
if (!isResponseMessage(result)) {
<br />
byte[] response = createServerResponseMessage((byte[]) result);
<br />
write(socket, response);
<br />
return null;
<br />
}
<br />
else {
<br />
if (result == null) {
<br />
return null;
<br />
}
<br />
if (result instanceof MuleMessage) {
<br />
return (MuleMessage) result;
<br />
}
<br />
return MuleMessageUtil&#46;createDefaultMessageFromMessage(muleMessage,
<br />
connector&#46;getMessageAdapter(result));
<br />
}
<br />
}
<br />
catch (SocketTimeoutException e) {
<br />
logger&#46;info(&quot;Socket timed out normally while doing a synchronous receive on endpointUri: &quot;
<br />
+ endpointURI);
<br />
throw new ResponseTimeoutException(&quot;Remote system timeout happened!&quot;);
<br />
}
<br />
catch (NullPointerException ex) {
<br />
logger&#46;info(&quot;Nothing received from endpointUri: &quot;
<br />
+ endpointURI);
<br />
return null;
<br />
}
<br />
catch (Exception ex) {
<br />
logger&#46;info(&quot;Error receiving synch response from endpointUri: &quot;
<br />
+ endpointURI);
<br />
throw new EndpointException(ex);
<br />
}
<br />
}
<br /><br />
private boolean isResponseMessage(Object receivedMessage) {
<br />
&#47;&#47;check if received data is a response sent by remote server or is a request received from server;
<br />
return true;
<br />
}
<br /><br />
protected synchronized void doDispatch(MuleEvent event) throws Exception {
<br />
if(socket == null || !socket&#46;isConnected()) {
<br />
socket = getSocket(event&#46;getEndpoint());
<br />
}
<br />
try {
<br />
dispatchToSocket(socket, event);
<br />
}
<br />
finally {
<br />
}
<br />
}
<br /><br />
private void dispatchToSocket(Socket socket, MuleEvent event) throws Exception {
<br />
Object payload = event&#46;transformMessage();
<br />
write(socket, payload);
<br />
}
<br /><br />
private void write(Socket socket, Object data) throws IOException, TransformerException {
<br />
BufferedOutputStream bos = new BufferedOutputStream(socket&#46;getOutputStream());
<br />
connector&#46;getTcpProtocol()&#46;write(bos, data);
<br />
bos&#46;flush();
<br />
}
<br /><br />
protected static Object receiveFromSocket(final Socket socket, int timeout, final ImmutableEndpoint endpoint)
<br />
throws IOException {
<br />
final TcpConnector connector = (TcpConnector) endpoint&#46;getConnector();
<br />
DataInputStream underlyingIs = new DataInputStream(new BufferedInputStream(socket&#46;getInputStream()));
<br />
if (timeout &gt;= 0) {
<br />
socket&#46;setSoTimeout(timeout);
<br />
}
<br /><br />
try {
<br />
return connector&#46;getTcpProtocol()&#46;read(underlyingIs);
<br />
}
<br />
finally {
<br />
}
<br />
}
<br /><br />
protected void doDispose() {
<br />
try {
<br />
if (socket != null &amp;&amp; !socket&#46;isClosed()) {
<br />
if (logger&#46;isDebugEnabled()) {
<br />
logger&#46;debug(&quot;Closing: &quot; + socket);
<br />
}
<br />
socket&#46;close();
<br />
}
<br />
socket = null;
<br />
} catch (Exception e) {
<br />
logger&#46;error(new DisposeException(TcpMessages&#46;failedToCloseSocket(), e, this));
<br />
}
<br />
logger&#46;info(&quot;Closed Tcp port&quot;);
<br />
}
<br /><br />
protected Socket getSocket(ImmutableEndpoint endpoint) throws IOException {
<br />
return new Socket(endpoint&#46;getEndpointURI()&#46;getHost(),
<br />
endpoint&#46;getEndpointURI()&#46;getPort());
<br />
}
<br /><br />
private byte[] createServerResponseMessage(byte[] requestMessage) throws Exception {
<br />
&#47;&#47; create a response appropriate for remote server requests
<br />
return new byte[20];
<br /><br />
}
<br /><br />
protected class SocketListener implements Runnable {
<br />
public void run() {
<br />
while (true) {
<br />
try {
<br />
Thread&#46;sleep(30000);
<br />
receiveAndProcessSyncResponseFromSocket(endpoint&#46;getRemoteSyncTimeout(), null, endpoint&#46;getEndpointURI());
<br />
}
<br />
catch (InterruptedException e) {
<br />
e&#46;printStackTrace();
<br />
}
<br />
catch (Exception e) {
<br />
e&#46;printStackTrace();
<br />
}
<br />
}
<br />
}
<br />
}
<br />
}
<br />
</code>
<br /><br />
I'm still waiting for your recommendations.
<br /><br />
Sincerely.

---------------------------------------------------------------------
To unsubscribe from this list, please visit:

http://xircles.codehaus.org/manage_email

Loading...