Skip to content

Commit

Permalink
Fix TCP-search bug in server and client
Browse files Browse the repository at this point in the history
  • Loading branch information
kasemir committed Jun 18, 2024
1 parent f23fcdb commit 1b6fd22
Show file tree
Hide file tree
Showing 6 changed files with 125 additions and 25 deletions.
10 changes: 6 additions & 4 deletions src/core/com/cosylab/epics/caj/cas/handlers/SearchResponse.java
Original file line number Diff line number Diff line change
Expand Up @@ -138,14 +138,16 @@ private void searchResponse(InetSocketAddress responseFrom, CASTransport tcp, sh
try
{
// TODO prepend version header (if context.getLastReceivedSequenceNumber() != 0)
SearchRequest searchRequest = new SearchRequest(context.getBroadcastTransport(), (short)dataCount, cid);
// UDP includes payload (version) in reply, TCP has no payload
if (tcp == null)
{
SearchRequest searchRequest = new SearchRequest(context.getBroadcastTransport(), null, true, (short)dataCount, cid);
context.getLogger().log(Level.FINE, "UDP EXISTS_HERE search reply");
context.getBroadcastTransport().send(searchRequest, responseFrom);
}
else
{
SearchRequest searchRequest = new SearchRequest(context.getBroadcastTransport(), null, false, (short)dataCount, cid);
context.getLogger().log(Level.FINE, "TCP EXISTS_HERE search reply");
tcp.send(searchRequest.getRequestMessage());
}
Expand All @@ -155,18 +157,18 @@ private void searchResponse(InetSocketAddress responseFrom, CASTransport tcp, sh
}
else if (completion.doesExistElsewhere())
{
// send back
// Same comments as for EXISTS_HERE
try
{
// TODO prepend version header (if context.getLastReceivedSequenceNumber() != 0)
SearchRequest searchRequest = new SearchRequest(context.getBroadcastTransport(), completion.getOtherAddress(), (short)dataCount, cid);
if (tcp == null)
{
SearchRequest searchRequest = new SearchRequest(context.getBroadcastTransport(), completion.getOtherAddress(), true, (short)dataCount, cid);
context.getLogger().log(Level.FINE, "UDP EXISTS_ELSEWHERE search reply: " + completion.getOtherAddress());
context.getBroadcastTransport().send(searchRequest, responseFrom);
}
else
{
SearchRequest searchRequest = new SearchRequest(context.getBroadcastTransport(), completion.getOtherAddress(), false, (short)dataCount, cid);
context.getLogger().log(Level.FINE, "TCP EXISTS_ELSEWHERE search reply: " + completion.getOtherAddress());
tcp.send(searchRequest.getRequestMessage());
}
Expand Down
30 changes: 12 additions & 18 deletions src/core/com/cosylab/epics/caj/cas/requests/SearchRequest.java
Original file line number Diff line number Diff line change
Expand Up @@ -36,23 +36,14 @@ public class SearchRequest extends AbstractCARequest {

private static final Logger logger = Logger.getLogger(SearchRequest.class.getName());

/**
* @param transport Transport to use
* @param clientMinorVersion
* @param cid Channel ID
*/
public SearchRequest(Transport transport, short clientMinorVersion, int cid)
{
this(transport, null, clientMinorVersion, cid);
}

/**
* @param transport Transport to use
* @param other_address Optional other address to report, null to use this transport
* @param with_version Include minor version in payload (for UDP), or no payload (for TCP)?
* @param clientMinorVersion
* @param cid Channel ID
*/
public SearchRequest(Transport transport, InetSocketAddress other_address, short clientMinorVersion, int cid) {
public SearchRequest(Transport transport, InetSocketAddress other_address, boolean with_version, short clientMinorVersion, int cid) {
super(transport);

// add minor version payload (aligned by 8)
Expand All @@ -72,17 +63,20 @@ public SearchRequest(Transport transport, InetSocketAddress other_address, short
: other_address.getAddress();
if (serverAddress != null && !serverAddress.isAnyLocalAddress())
serverIP = InetAddressUtil.ipv4AddressToInt(serverAddress);
logger.log(Level.FINE, "Replying to search with " + serverAddress + ":" + port);
}

requestMessage = insertCAHeader(transport, requestMessage,
(short)6, (short)8, (short)port, 0,
(short)6, (short)(with_version ? 8 : 0), (short)port, 0,
serverIP, cid);

requestMessage.putShort(CAConstants.CAS_MINOR_PROTOCOL_REVISION);
// clear rest of the message (security)
requestMessage.putShort((short)0);
requestMessage.putInt(0);
if (with_version)
{
requestMessage.putShort(CAConstants.CAS_MINOR_PROTOCOL_REVISION);
// clear rest of the message (security)
requestMessage.putShort((short)0);
requestMessage.putInt(0);
logger.log(Level.FINE, "Replying to search with " + serverIP + ":" + port + ", minor " + CAConstants.CAS_MINOR_PROTOCOL_REVISION);
}
logger.log(Level.FINE, "Replying to search with " + serverIP + ":" + port + ", no payload");
}

/**
Expand Down
55 changes: 52 additions & 3 deletions src/core/com/cosylab/epics/caj/impl/handlers/SearchResponse.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import com.cosylab.epics.caj.impl.CATransport;
import com.cosylab.epics.caj.impl.Transport;
import com.cosylab.epics.caj.util.InetAddressUtil;
import com.cosylab.epics.caj.util.HexDump;

/**
* @author <a href="mailto:matej.sekoranjaATcosylab.com">Matej Sekoranja</a>
Expand Down Expand Up @@ -53,21 +54,69 @@ protected void internalHandleResponse(
//

short minorVersion = CAConstants.CA_UNKNOWN_MINOR_PROTOCOL_REVISION;

// Search response via UDP: Only response[0]
// Payload size 8, with only a 'short minor' in payload
//
// Hexdump [response[0] @ 16] size = 24
// 00 06 00 08 13 C8 00 00 FF FF FF FF 00 00 00 01 .... .... .... ....
// 00 0B 00 00 00 00 00 00
// or
// Hexdump [response[0] @ 32] size = 40
// 00 00 00 00 00 01 00 0D 00 00 00 01 00 00 00 00 .... .... .... ....
// 00 06 00 08 13 C8 00 00 FF FF FF FF 00 00 00 01 .... .... .... ....
// 00 0D 00 00 00 00 00 00 .... ....

// Search response via TCP: Two buffers
// No payload, second buffer is empty
//
// Hexdump [response[0] @ 16] size = 16
// 00 06 00 00 13 C8 00 00 FF FF FF FF 00 00 00 01 .... .... .... ....
// Hexdump [response[1] @ 0] size = 0

// Earlier CAJ server server added the 8 byte payload to TCP response,
// which caused client to crash.
// Now handle that as well in client.
// Hexdump [response[0] @ 16] size = 16
// 00 06 00 08 13 C8 00 00 FF FF FF FF 00 00 00 01 .... .... .... ....
// Hexdump [response[1] @ 0] size = 8
// 00 0B 00 00 00 00 00 00

// System.out.println("Client received search response");
// System.out.println(response[0]);
// if (response.length > 1)
// System.out.println(response[1]);
//
// byte[] data = new byte[response[0].limit()];
// for (int i=0; i<response[0].limit(); ++i)
// data[i] = response[0].get(i);
// HexDump.hexDump("response[0] @ " + response[0].position(), data, 0, response[0].limit());
// if (response.length > 1)
// {
// data = new byte[response[1].limit()];
// for (int i=0; i<response[1].limit(); ++i)
// data[i] = response[1].get(i);
// HexDump.hexDump("response[1] @ " + response[1].position(), data, 0, response[1].limit());
// }

// Starting with CA V4.1 the minor version number is
// appended to the end of each search reply.
int payloadStart = response[0].position();
if (payloadSize >= 2 /* short size = 2 bytes */)
{
// UDP response (all in buffer 0)
minorVersion = response[0].getShort();
// UDP response (all in buffer 0), or TCP (payload in buffer 1)?
if (response.length == 1)
minorVersion = response[0].getShort();
else
minorVersion = response[1].getShort();
} else if(transport instanceof CATransport) {
// for TCP transport use already provided version
minorVersion = transport.getMinorRevision();
}

// read rest of the playload (needed for UDP)
response[0].position(payloadStart + payloadSize);
if (response.length == 1)
response[0].position(payloadStart + payloadSize);

// signed short conversion -> signed int
int port = dataType & 0xFFFF;
Expand Down
1 change: 1 addition & 0 deletions test/com/cosylab/epics/caj/cas/test/CANameServer.java
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
* via UDP port 5064, while the IOC runs on 9876 (UDP and TCP).
*
* Now run this CANameServer on the same host,
* java -cp target/classes:target/test-classes -DCAJ_DEBUG=true com.cosylab.epics.caj.cas.test.CANameServer
* and try `caget ramp` again.
* The client will reach the name server via UDP 5064.
* The name server replies with 127.0.0.1, port 9876
Expand Down
48 changes: 48 additions & 0 deletions test/com/cosylab/epics/caj/test/caget.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
package com.cosylab.epics.caj.test;

import gov.aps.jca.JCALibrary;
import gov.aps.jca.Context;
import com.cosylab.epics.caj.CAJContext;
import gov.aps.jca.Channel;
import gov.aps.jca.dbr.DBR;
import gov.aps.jca.dbr.DBRType;
import gov.aps.jca.dbr.DBR_Double;

/** Bad but simple 'caget' demo
* Better implementation would use listeners
* instead of fixed wait times,
* handle all data types,
* and properly close channel and context.
*
* Example invocation:
* java -cp target/classes:target/test-classes -DCAJ_DEBUG=true -Djca.use_env=true com.cosylab.epics.caj.test.caget PV_name
*/
public class caget
{
public static void main(String[] args) throws Exception
{
if (args.length != 1)
{
System.err.println("USAGE: caget PV_name");
System.exit(1);
}
final String name = args[0];

final JCALibrary jca = JCALibrary.getInstance();
final Context context = new CAJContext();
final Channel channel = context.createChannel(name);
context.pendIO(2.0);

DBR data = channel.get(DBRType.DOUBLE, 1);
context.pendIO(2.0);

if (data instanceof DBR_Double)
{
DBR_Double value = (DBR_Double) data;
System.out.println(name + " = " + value.getDoubleValue()[0]);
}
else
System.out.println(data);
System.exit(0);
}
}
6 changes: 6 additions & 0 deletions test/resources/ramp.db
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
record(calc, "ramp")
{
field(SCAN, "1 second")
field(CALC, "A+1")
field(INPA, "ramp")
}

0 comments on commit 1b6fd22

Please sign in to comment.