Dear David,
<br /><br />
Excuse me for the late response.
<br /><br />
Here are the settings from the config files:
<br /><br />
1) Connector definition:
<br /><br />
<code>
<br />
<!-- TCP connector "myConnector": sendTcpNoDelay on, 8192 send/receive buffer sizes,
     multiple transacted receivers enabled (30 concurrent), send socket kept open.
     Its tcpProtocol property is wired to the Spring bean "myProtocol", which is
     not shown in this message. -->
<tcp:connector name="myConnector" sendTcpNoDelay="true"
<br />
sendBufferSize="8192" receiveBufferSize="8192" createMultipleTransactedReceivers="true"
<br />
numberOfConcurrentTransactedReceivers="30" keepSendSocketOpen="true">
<br />
<spring:property name="tcpProtocol" ref="myProtocol"/>
<br />
</tcp:connector>
<br />
</code>
<br /><br />
2) Dispatcher endpoint:
<br /><br />
<code>
<br />
<!-- Endpoint "dispatchEndpoint": targets tcp://127.0.0.1:5002 through connector
     "myConnector", encoding 8859_1, with remoteSync enabled and
     remoteSyncTimeout="6000" (presumably milliseconds; confirm against the
     Mule schema for this version). -->
<tcp:endpoint name="dispatchEndpoint" encoding="8859_1"
<br />
address="tcp://127.0.0.1:5002" connector-ref="myConnector" remoteSync="true"
<br />
remoteSyncTimeout="6000">
<br />
</tcp:endpoint>
<br />
</code>
<br /><br />
3) Dispatcher class:
<br /><br />
<code>
<br />
public class MyTcpMessageDispatcher extends AbstractMessageDispatcher {
<br /><br />
private Socket socket;
<br />
private final TcpConnector connector;
<br /><br />
public MyTcpMessageDispatcher(OutboundEndpoint endpoint) {
<br />
super(endpoint);
<br />
this.connector = (TcpConnector) endpoint.getConnector();
<br />
SocketListener listener = new SocketListener();
<br />
new Thread(listener).start();
<br />
}
<br /><br />
protected void doConnect() throws Exception {
<br />
disposing.set(false);
<br />
}
<br /><br />
protected void doDisconnect() throws ConnectException {
<br />
disposing.set(true);
<br />
try {
<br />
if (socket != null) {
<br />
if (logger.isDebugEnabled()) {
<br />
logger.debug("Closing: " + socket);
<br />
}
<br />
socket.close();
<br />
}
<br />
} catch (IOException e) {
<br />
logger.warn("Failed to close server socket: " + e.getMessage(), e);
<br />
}
<br />
}
<br /><br />
protected synchronized MuleMessage doSend(MuleEvent event) throws Exception {
<br />
if(socket == null || !socket.isConnected()) {
<br />
socket = getSocket(event.getEndpoint());
<br />
}
<br />
dispatchToSocket(socket, event);
<br />
if (useRemoteSync(event)) {
<br />
return receiveAndProcessSyncResponseFromSocket(event.getTimeout(), event.getMessage(),
<br />
event.getEndpoint().getEndpointURI());
<br />
}
<br />
else {
<br />
return event.getMessage();
<br />
}
<br />
}
<br /><br />
private MuleMessage receiveAndProcessSyncResponseFromSocket(int timeout, MuleMessage muleMessage,
<br />
EndpointURI endpointURI) throws Exception {
<br />
try {
<br />
Object result = receiveFromSocket(socket, timeout, endpoint);
<br />
if (!isResponseMessage(result)) {
<br />
byte[] response = createServerResponseMessage((byte[]) result);
<br />
write(socket, response);
<br />
return null;
<br />
}
<br />
else {
<br />
if (result == null) {
<br />
return null;
<br />
}
<br />
if (result instanceof MuleMessage) {
<br />
return (MuleMessage) result;
<br />
}
<br />
return MuleMessageUtil.createDefaultMessageFromMessage(muleMessage,
<br />
connector.getMessageAdapter(result));
<br />
}
<br />
}
<br />
catch (SocketTimeoutException e) {
<br />
logger.info("Socket timed out normally while doing a synchronous receive on endpointUri: "
<br />
+ endpointURI);
<br />
throw new ResponseTimeoutException("Remote system timeout happened!");
<br />
}
<br />
catch (NullPointerException ex) {
<br />
logger.info("Nothing received from endpointUri: "
<br />
+ endpointURI);
<br />
return null;
<br />
}
<br />
catch (Exception ex) {
<br />
logger.info("Error receiving synch response from endpointUri: "
<br />
+ endpointURI);
<br />
throw new EndpointException(ex);
<br />
}
<br />
}
<br /><br />
private boolean isResponseMessage(Object receivedMessage) {
<br />
//check if received data is a response sent by remote server or is a request received from server;
<br />
return true;
<br />
}
<br /><br />
protected synchronized void doDispatch(MuleEvent event) throws Exception {
<br />
if(socket == null || !socket.isConnected()) {
<br />
socket = getSocket(event.getEndpoint());
<br />
}
<br />
try {
<br />
dispatchToSocket(socket, event);
<br />
}
<br />
finally {
<br />
}
<br />
}
<br /><br />
private void dispatchToSocket(Socket socket, MuleEvent event) throws Exception {
<br />
Object payload = event.transformMessage();
<br />
write(socket, payload);
<br />
}
<br /><br />
private void write(Socket socket, Object data) throws IOException, TransformerException {
<br />
BufferedOutputStream bos = new BufferedOutputStream(socket.getOutputStream());
<br />
connector.getTcpProtocol().write(bos, data);
<br />
bos.flush();
<br />
}
<br /><br />
protected static Object receiveFromSocket(final Socket socket, int timeout, final ImmutableEndpoint endpoint)
<br />
throws IOException {
<br />
final TcpConnector connector = (TcpConnector) endpoint.getConnector();
<br />
DataInputStream underlyingIs = new DataInputStream(new BufferedInputStream(socket.getInputStream()));
<br />
if (timeout >= 0) {
<br />
socket.setSoTimeout(timeout);
<br />
}
<br /><br />
try {
<br />
return connector.getTcpProtocol().read(underlyingIs);
<br />
}
<br />
finally {
<br />
}
<br />
}
<br /><br />
protected void doDispose() {
<br />
try {
<br />
if (socket != null && !socket.isClosed()) {
<br />
if (logger.isDebugEnabled()) {
<br />
logger.debug("Closing: " + socket);
<br />
}
<br />
socket.close();
<br />
}
<br />
socket = null;
<br />
} catch (Exception e) {
<br />
logger.error(new DisposeException(TcpMessages.failedToCloseSocket(), e, this));
<br />
}
<br />
logger.info("Closed Tcp port");
<br />
}
<br /><br />
protected Socket getSocket(ImmutableEndpoint endpoint) throws IOException {
<br />
return new Socket(endpoint.getEndpointURI().getHost(),
<br />
endpoint.getEndpointURI().getPort());
<br />
}
<br /><br />
private byte[] createServerResponseMessage(byte[] requestMessage) throws Exception {
<br />
// create a response appropriate for remote server requests
<br />
return new byte[20];
<br /><br />
}
<br /><br />
protected class SocketListener implements Runnable {
<br />
public void run() {
<br />
while (true) {
<br />
try {
<br />
Thread.sleep(30000);
<br />
receiveAndProcessSyncResponseFromSocket(endpoint.getRemoteSyncTimeout(), null, endpoint.getEndpointURI());
<br />
}
<br />
catch (InterruptedException e) {
<br />
e.printStackTrace();
<br />
}
<br />
catch (Exception e) {
<br />
e.printStackTrace();
<br />
}
<br />
}
<br />
}
<br />
}
<br />
}
<br />
</code>
<br /><br />
I'm still waiting for your recommendations.
<br /><br />
Sincerely.
---------------------------------------------------------------------
To unsubscribe from this list, please visit:
http://xircles.codehaus.org/manage_email