Atavism Version 2018.1 AGIS API

atavism.msgsys
Class MessageAgent

java.lang.Object
  extended by atavism.msgsys.MessageAgent
All Implemented Interfaces:
MessageIO.Callback, ResponseCallback, TcpAcceptCallback

public class MessageAgent
extends java.lang.Object
implements MessageIO.Callback, TcpAcceptCallback, ResponseCallback

Application interface to the atavism message system. The message system provides both publish/subscribe and RPC communication within a message domain.

Each process has a single MessageAgent connected to the DomainServer. Agents are identified in the domain with a unique self-assigned name.

Agents must advertise the message types they intend to publish. The advertisements are used to distribute subscriptions to those agents that might publish a matching message.

Joining the message domain follows the pattern:

    MessageAgent agent = new MessageAgent("my-agent-name");
    List<MessageType> myMessageTypes = getMyAdvertisements();
    agent.addAdvertisements(myMessageTypes);
    try {
        agent.openListener();
        agent.connectToDomain(domainHostName, domainPort);
        agent.waitForRemoteAgents();
    }
    catch (Exception ex) {
        System.err.println("Join message domain: "+ex);
        System.exit(1);
    }

Agents subscribe to messages using a message Filter. A filter is typically one or more message types and optional matching criteria. The subscription matches messages of the same message type and criteria. The criteria may be anything in the message content. Filter matching is done by both the message sender and recipient. The sender need only match a single subscription to deliver a message to a subscribing agent.

A subscription may be a RESPONDER, indicating that matching messages will return a response message. Message RPCs only match responder subscriptions. An RPC responder may return only one response message. A broadcast RPC sender may receive multiple responses, one from each agent with a matching RPC responder.

Messages are delivered in the order sent. Message callbacks are invoked by a single thread. Subsequent messages are not delivered until the callback returns. If parallel message processing is desired, the application can implement a thread pool.
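
The thread-pool approach mentioned above can be sketched as follows. This is a minimal illustration, not the library's code: Message and MessageCallback are hypothetical stand-ins that mirror only the handleMessage(Message, int) signature documented on this page, and PooledCallback is an invented name. The wrapper returns to the delivery thread immediately, so messages handed to the pool may finish out of order.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-ins so the sketch compiles without the Atavism jars.
class Message {
    final String body;
    Message(String body) { this.body = body; }
}

interface MessageCallback {
    void handleMessage(Message message, int flags);
}

// Hands each message to a worker pool so the delivery thread returns at once.
class PooledCallback implements MessageCallback {
    private final ExecutorService pool;
    private final MessageCallback worker;

    PooledCallback(ExecutorService pool, MessageCallback worker) {
        this.pool = pool;
        this.worker = worker;
    }

    @Override
    public void handleMessage(Message message, int flags) {
        // Work submitted here may complete in a different order than it arrived.
        pool.execute(() -> worker.handleMessage(message, flags));
    }
}

class PooledCallbackDemo {
    // Delivers three messages through the pool and returns the bodies the worker saw.
    static List<String> run() {
        List<String> seen = new CopyOnWriteArrayList<>();
        ExecutorService pool = Executors.newFixedThreadPool(4);
        MessageCallback callback =
            new PooledCallback(pool, (msg, flags) -> seen.add(msg.body));
        for (String body : new String[] {"a", "b", "c"}) {
            callback.handleMessage(new Message(body), 0);
        }
        pool.shutdown();
        try {
            pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(run().size() + " messages processed");
    }
}
```

With the real API, an instance of such a wrapper would be the callback argument to createSubscription. Callers that depend on delivery order should keep the work on the delivery thread instead.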

Non-blocking RPC response messages are delivered by a fixed size pool of threads.


Nested Class Summary
 class MessageAgent.BlockingRPCState
          This class helps handle blocking broadcast RPC.
 class MessageAgent.DomainClient
           
protected  class MessageAgent.RemoteAgent
           
 
Field Summary
static int BLOCKING
          Block until subscription is delivered and acknowledged by all producing agents.
static int COMPLETION_CALLBACK
          Not implemented.
static int DEFERRED
          Not implemented.
static int DOMAIN_FLAG_TRANSIENT
           
static int NO_FLAGS
          No flags for createSubscription()
static int NON_BLOCKING
          Return after queueing subscription to producing agents.
static int RESPONDER
          Subscription is an RPC responder.
 
Constructor Summary
MessageAgent()
          Use this constructor if you're only going to use the domain APIs.
MessageAgent(java.lang.String name)
          Create a MessageAgent.
 
Method Summary
 void addAdvertisement(MessageType msgType)
          Add a single message type to the list of those this agent will publish.
 void addAdvertisements(java.util.List<MessageType> typeIds)
          Add message types this agent will publish.
 void addNoProducersExpected(MessageType messageType)
           
 boolean applyFilterUpdate(long subId, FilterUpdate update)
          Update a subscription filter.
 boolean applyFilterUpdate(long subId, FilterUpdate update, int flags)
          Update a subscription filter.
 boolean applyFilterUpdate(long subId, FilterUpdate update, int flags, Message excludeSender)
          Update a subscription filter.
protected  boolean applyFilterUpdate(long subId, FilterUpdate update, int flags, MessageAgent.RemoteAgent excludeAgent)
           
 void connectToDomain(java.lang.String domainServerHost, java.lang.Integer domainServerPort)
          Connect to the message domain server.
 long createSubscription(IFilter filter, MessageCallback callback)
          Subscribe using filter and message callback.
 long createSubscription(IFilter filter, MessageCallback callback, int flags)
          Subscribe using filter, flags, and message callback.
 long createSubscription(IFilter filter, MessageCallback callback, int flags, MessageTrigger trigger)
          Subscribe using filter, trigger, flags, and message callback.
 java.lang.Integer getAgentId()
          Get the domain-assigned unique agent id.
 long getAppMessageCount()
          Get the number of application messages received by this agent.
 int getDefaultSubscriptionFlags()
          Get the default subscription flags.
 MessageAgent.DomainClient getDomainClient()
           
 int getDomainConnectRetries()
          Number of retries performed by connectToDomain().
 int getDomainFlags()
           
 long getDomainStartTime()
          Get the domain server start time.
 int getListenerPort()
          Get the agent network listener port number.
 java.lang.String getName()
          Get the agent name.
 java.util.Set<MessageType> getNoProducersExpected()
           
 java.util.concurrent.ExecutorService getResponseThreadPool()
          Get the response handling thread pool.
 long getSystemMessageCount()
          Get the number of system messages received by this agent.
 void handleMessageData(int length, AOByteBuffer messageData, AgentInfo agentInfo)
           
 void handleResponse(ResponseMessage message)
           
 void onTcpAccept(java.nio.channels.SocketChannel agentSocket)
           
 void openListener()
          Open network listener to accept connections from other agents.
 void removeAdvertisements(java.util.List<MessageType> typeIds)
          Remove message types this agent will no longer publish.
 boolean removeSubscription(long subId)
          Remove subscription by subscription id.
 boolean removeSubscriptions(java.util.Collection<java.lang.Long> subIds)
          Remove subscriptions by subscription id.
static boolean responseExpected(int flags)
          Check if a handleMessage flag indicates a response is expected.
 void sendBooleanResponse(Message message, java.lang.Boolean booleanVal)
          Boolean response to an RPC message.
 int sendBroadcast(Message message)
          Publish message to subscribing agents.
 int sendBroadcastRPC(Message message, ResponseCallback callback)
          Broadcast a remote procedure call (RPC) message and invoke callback when responses arrive.
 int sendBroadcastRPCBlocking(Message message, ResponseCallback callback)
          Broadcast a remote procedure call (RPC) message and invoke callback when each response arrives.
 int sendBroadcastRPCBlocking(Message message, ResponseCallback callback, long timeout)
          Broadcast a remote procedure call (RPC) message and invoke callback when each response arrives.
 boolean sendDirect(Message message, AgentHandle destination, SubscriptionHandle runTriggers)
          Send message directly to an agent.
 void sendIntegerResponse(Message message, java.lang.Integer intVal)
          Integer response to an RPC message.
 void sendLongResponse(Message message, java.lang.Long longVal)
          Long response to an RPC message.
 void sendObjectResponse(Message message, java.lang.Object object)
          Object response to an RPC message.
 void sendOIDResponse(Message message, OID oidVal)
          OID response to an RPC message.
 void sendResponse(ResponseMessage message)
          Respond to an RPC message.
 Message sendRPC(Message message)
          Send a remote procedure call (RPC) message and wait for response.
 Message sendRPC(Message message, long timeout)
          Send a remote procedure call (RPC) message and wait for response.
 void sendRPC(Message message, ResponseCallback callback)
          Send a remote procedure call (RPC) message and invoke callback when the response arrives.
 java.lang.Boolean sendRPCReturnBoolean(Message message)
          Send a remote procedure call (RPC) message and wait for boolean response.
 java.lang.Integer sendRPCReturnInt(Message message)
          Send a remote procedure call (RPC) message and wait for integer response.
 java.lang.Long sendRPCReturnLong(Message message)
          Send a remote procedure call (RPC) message and wait for long response.
 java.lang.Object sendRPCReturnObject(Message message)
          Send a remote procedure call (RPC) message and wait for object response.
 OID sendRPCReturnOID(Message message)
          Send a remote procedure call (RPC) message and wait for OID response.
 java.lang.String sendRPCReturnString(Message message)
          Send a remote procedure call (RPC) message and wait for string response.
 void sendStringResponse(Message message, java.lang.String stringVal)
          String response to an RPC message.
 void setAdvertisementFileName(java.lang.String fileName)
          Name used in error log when process publishes a message it does not advertise.
 void setAdvertisements(java.util.Collection<MessageType> typeIds)
          Advertise the message types this agent will publish.
 void setDefaultSubscriptionFlags(int flags)
          Set the default subscription flags.
 void setDomainConnectRetries(int retries)
          Set the number of retries performed by connectToDomain().
 void setDomainFlags(int flags)
           
 void setResponseThreadPool(java.util.concurrent.ExecutorService threadPool)
          Set the response handling thread pool.
 void startStatsThread()
           
 void waitForRemoteAgents()
          Wait until all known agents are connected and we have their advertisements.
 
Methods inherited from class java.lang.Object
clone, equals, finalize, getClass, hashCode, notify, notifyAll, toString, wait, wait, wait
 

Field Detail

DOMAIN_FLAG_TRANSIENT

public static final int DOMAIN_FLAG_TRANSIENT
See Also:
Constant Field Values

NO_FLAGS

public static final int NO_FLAGS
No flags for createSubscription()

See Also:
Constant Field Values

BLOCKING

public static final int BLOCKING
Block until subscription is delivered and acknowledged by all producing agents.

See Also:
Constant Field Values

COMPLETION_CALLBACK

public static final int COMPLETION_CALLBACK
Not implemented.

See Also:
Constant Field Values

DEFERRED

public static final int DEFERRED
Not implemented.

See Also:
Constant Field Values

RESPONDER

public static final int RESPONDER
Subscription is an RPC responder.

See Also:
Constant Field Values

NON_BLOCKING

public static final int NON_BLOCKING
Return after queueing subscription to producing agents.

See Also:
Constant Field Values

Constructor Detail

MessageAgent

public MessageAgent(java.lang.String name)
Create a MessageAgent.

Parameters:
name - Agent name, unique within the domain.

MessageAgent

public MessageAgent()
Use this constructor if you're only going to use the domain APIs.

Method Detail

getName

public java.lang.String getName()
Get the agent name.


getAgentId

public java.lang.Integer getAgentId()
Get the domain-assigned unique agent id.


getDomainStartTime

public long getDomainStartTime()
Get the domain server start time.


getListenerPort

public int getListenerPort()
Get the agent network listener port number.

Returns:
Port number or -1 if the listener is not open.

openListener

public void openListener()
                  throws java.io.IOException
Open network listener to accept connections from other agents. Must be called before connecting to the domain. The agent will not actually accept connections until connectToDomain(java.lang.String, java.lang.Integer) is called. Listening port is selected by the operating system.

Throws:
java.io.IOException

setAdvertisements

public void setAdvertisements(java.util.Collection<MessageType> typeIds)
Advertise the message types this agent will publish. Should be called before connecting to the domain.


addAdvertisement

public void addAdvertisement(MessageType msgType)
Add a single message type to the list of those this agent will publish.


addAdvertisements

public void addAdvertisements(java.util.List<MessageType> typeIds)
Add message types this agent will publish.


removeAdvertisements

public void removeAdvertisements(java.util.List<MessageType> typeIds)
Remove message types this agent will no longer publish.


setAdvertisementFileName

public void setAdvertisementFileName(java.lang.String fileName)
Name used in error log when process publishes a message it does not advertise.


addNoProducersExpected

public void addNoProducersExpected(MessageType messageType)

getNoProducersExpected

public java.util.Set<MessageType> getNoProducersExpected()

getDomainFlags

public int getDomainFlags()

setDomainFlags

public void setDomainFlags(int flags)

getDomainConnectRetries

public int getDomainConnectRetries()
Number of retries performed by connectToDomain().


setDomainConnectRetries

public void setDomainConnectRetries(int retries)
Set the number of retries performed by connectToDomain(). The default is Integer.MAX_VALUE.


connectToDomain

public void connectToDomain(java.lang.String domainServerHost,
                            java.lang.Integer domainServerPort)
                     throws java.io.IOException,
                            java.net.UnknownHostException,
                            AORuntimeException
Connect to the message domain server. Should be called before creating subscriptions or sending messages. If successful, the agent will accept connections from other agents and begin contacting other agents.

Throws:
java.io.IOException
java.net.UnknownHostException
AORuntimeException

getDomainClient

public MessageAgent.DomainClient getDomainClient()

waitForRemoteAgents

public void waitForRemoteAgents()
Wait until all known agents are connected and we have their advertisements. Should be called after connectToDomain(java.lang.String, java.lang.Integer).


setDefaultSubscriptionFlags

public void setDefaultSubscriptionFlags(int flags)
Set the default subscription flags. These flags are combined (bit-wise OR) with the flags passed to createSubscription. If the default flags include NON_BLOCKING then the BLOCKING flag is removed.
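
One reading of the combination rule above, as a small sketch. The bit values below are hypothetical (the real ones are listed under Constant Field Values), and effectiveFlags is an invented helper, not a method of MessageAgent.

```java
class SubscriptionFlagsSketch {
    // Hypothetical bit values, chosen only so the example runs.
    static final int NO_FLAGS = 0;
    static final int BLOCKING = 1;
    static final int RESPONDER = 8;
    static final int NON_BLOCKING = 16;

    // OR the default and requested flags, then drop BLOCKING
    // when the defaults ask for NON_BLOCKING.
    static int effectiveFlags(int defaultFlags, int requestedFlags) {
        int flags = defaultFlags | requestedFlags;
        if ((defaultFlags & NON_BLOCKING) != 0) {
            flags &= ~BLOCKING;
        }
        return flags;
    }

    public static void main(String[] args) {
        int flags = effectiveFlags(NON_BLOCKING, BLOCKING | RESPONDER);
        System.out.println((flags & BLOCKING) != 0);   // prints false
        System.out.println((flags & RESPONDER) != 0);  // prints true
    }
}
```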


getDefaultSubscriptionFlags

public int getDefaultSubscriptionFlags()
Get the default subscription flags.


createSubscription

public long createSubscription(IFilter filter,
                               MessageCallback callback)
Subscribe using filter and message callback. The subscription uses the default subscription flags and has no message trigger. Matching messages are delivered to the message callback.

Parameters:
filter - Messages matching the filter are delivered to the callback
callback - MessageCallback.handleMessage(atavism.msgsys.Message, int) is called for each matching message.
Returns:
Subscription id

createSubscription

public long createSubscription(IFilter filter,
                               MessageCallback callback,
                               int flags)
Subscribe using filter, flags, and message callback. The given flags are combined with the default subscription flags (see setDefaultSubscriptionFlags(int)). Matching messages are delivered to the message callback.

Parameters:
filter - Messages matching the filter are delivered to the callback
callback - MessageCallback.handleMessage(atavism.msgsys.Message, int) is called for each matching message.
flags - Subscription flags (example: RESPONDER)
Returns:
Subscription id

createSubscription

public long createSubscription(IFilter filter,
                               MessageCallback callback,
                               int flags,
                               MessageTrigger trigger)
Subscribe using filter, trigger, flags, and message callback. The given flags are combined with the default subscription flags (see setDefaultSubscriptionFlags(int)). Matching messages are delivered to the message callback.

The caller should not modify the filter without a matching FilterUpdate.

Parameters:
filter - Messages matching the filter are delivered to the callback.
callback - MessageCallback.handleMessage(atavism.msgsys.Message, int) is called for each matching message.
flags - Subscription flags (example: RESPONDER)
trigger - Message trigger to run on the producing agent each time a message matches the filter.
Returns:
Subscription id

removeSubscription

public boolean removeSubscription(long subId)
Remove subscription by subscription id.

Returns:
False if the subscription id does not exist, true otherwise.

removeSubscriptions

public boolean removeSubscriptions(java.util.Collection<java.lang.Long> subIds)
Remove subscriptions by subscription id.

Returns:
False if any of the subscription ids do not exist, true otherwise.

responseExpected

public static boolean responseExpected(int flags)
Check if a handleMessage flag indicates a response is expected. Useful for messages that may be published with or without RPC.

Returns:
True if a response is expected from the callback.
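
A callback that serves both plain publishes and RPCs might use the check like this. The sketch is self-contained, so StubMessageAgent, its RESPONSE_EXPECTED bit, and the empty Message class are hypothetical stand-ins. Only the method names responseExpected, sendBooleanResponse, and handleMessage come from this page.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-ins so the sketch compiles without the Atavism jars.
class Message { }

interface MessageCallback {
    void handleMessage(Message message, int flags);
}

class StubMessageAgent {
    static final int RESPONSE_EXPECTED = 1;  // hypothetical bit value
    final List<Boolean> responses = new ArrayList<>();

    static boolean responseExpected(int flags) {
        return (flags & RESPONSE_EXPECTED) != 0;
    }

    // Records the response instead of sending it over the network.
    void sendBooleanResponse(Message message, Boolean value) {
        responses.add(value);
    }
}

// Processes every message, but responds only when the sender used RPC.
class DualModeHandler implements MessageCallback {
    private final StubMessageAgent agent;
    int handled = 0;

    DualModeHandler(StubMessageAgent agent) {
        this.agent = agent;
    }

    @Override
    public void handleMessage(Message message, int flags) {
        handled++;
        if (StubMessageAgent.responseExpected(flags)) {
            agent.sendBooleanResponse(message, Boolean.TRUE);
        }
    }

    public static void main(String[] args) {
        StubMessageAgent agent = new StubMessageAgent();
        DualModeHandler handler = new DualModeHandler(agent);
        handler.handleMessage(new Message(), 0);
        handler.handleMessage(new Message(), StubMessageAgent.RESPONSE_EXPECTED);
        System.out.println(handler.handled + " handled, " + agent.responses.size() + " responses");
    }
}
```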

applyFilterUpdate

public boolean applyFilterUpdate(long subId,
                                 FilterUpdate update)
Update a subscription filter. The filter update is forwarded to all subscription producers. The local instance of the filter is not modified.

Parameters:
subId - Subscription to update
update - Filter update instructions
Returns:
False if the subscription does not exist, true otherwise.

applyFilterUpdate

public boolean applyFilterUpdate(long subId,
                                 FilterUpdate update,
                                 int flags)
Update a subscription filter. The filter update is forwarded to all subscription producers. The local instance of the filter is not modified.

Parameters:
subId - Subscription to update
update - Filter update instructions
Returns:
False if the subscription does not exist, true otherwise.

applyFilterUpdate

public boolean applyFilterUpdate(long subId,
                                 FilterUpdate update,
                                 int flags,
                                 Message excludeSender)
Update a subscription filter. The filter update is forwarded to all subscription producers, except the sender of excludeSender. The local instance of the filter is not modified.

Parameters:
subId - Subscription to update
update - Filter update instructions
excludeSender - Do not send update to the sender of this message.
Returns:
False if the subscription does not exist, true otherwise.

applyFilterUpdate

protected boolean applyFilterUpdate(long subId,
                                    FilterUpdate update,
                                    int flags,
                                    MessageAgent.RemoteAgent excludeAgent)

sendBroadcast

public int sendBroadcast(Message message)
Publish message to subscribing agents. MessageTriggers are run prior to queueing the message. Returns after the message is queued.

Returns:
Number of subscribing agents.

sendDirect

public boolean sendDirect(Message message,
                          AgentHandle destination,
                          SubscriptionHandle runTriggers)
Send message directly to an agent. The agent is identified by an AgentHandle. AgentHandles are passed to Filter.applyFilterUpdate() to identify the sender of the filter update. Likewise, the SubscriptionHandle is passed to Filter.applyFilterUpdate().

Parameters:
message - The message.
destination - The agent.
runTriggers - Subscription from which to run triggers prior to sending the message.
Returns:
True on success, false on failure.

sendRPC

public Message sendRPC(Message message)
Send a remote procedure call (RPC) message and wait for response.

Parameters:
message - the message with the RPC information
Returns:
Response message.
Throws:
RPCTimeoutException - If we time out waiting for a response
NoRecipientsException - If there are no message subscribers
MultipleRecipientsException - If there is more than one message subscriber.
RPCException - If the remote RPC handler threw an exception while handling the message.

sendRPC

public Message sendRPC(Message message,
                       long timeout)
Send a remote procedure call (RPC) message and wait for response.

Parameters:
message - the message with the RPC information
timeout - the timeout for the call or 0 to wait indefinitely
Returns:
the response message
Throws:
RPCTimeoutException - If we time out waiting for a response
NoRecipientsException - If there are no message subscribers
MultipleRecipientsException - If there is more than one message subscriber.
RPCException - If the remote RPC handler threw an exception while handling the message.
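
A caller may want to handle each documented failure separately. The sketch below assumes the four exceptions are unchecked (the signature above declares no throws clause) and, purely for illustration, that the three specific ones extend RPCException. RpcSender, Message, and describe are hypothetical names, not part of the API.

```java
// Hypothetical stand-ins so the sketch compiles without the Atavism jars.
class Message {
    final String body;
    Message(String body) { this.body = body; }
}

// Assumed hierarchy: unchecked, with RPCException as the common parent.
class RPCException extends RuntimeException {
    RPCException(String msg) { super(msg); }
}
class RPCTimeoutException extends RPCException {
    RPCTimeoutException() { super("timed out"); }
}
class NoRecipientsException extends RPCException {
    NoRecipientsException() { super("no recipients"); }
}
class MultipleRecipientsException extends RPCException {
    MultipleRecipientsException() { super("multiple recipients"); }
}

// Mirrors only the sendRPC(Message, long) signature documented above.
interface RpcSender {
    Message sendRPC(Message message, long timeout);
}

class RpcCallSketch {
    // Maps each outcome of a blocking RPC to a short description.
    static String describe(RpcSender agent, Message request, long timeout) {
        try {
            Message response = agent.sendRPC(request, timeout);
            return "ok: " + response.body;
        } catch (RPCTimeoutException e) {
            return "timeout";
        } catch (NoRecipientsException e) {
            return "no responder";
        } catch (MultipleRecipientsException e) {
            return "ambiguous responder";
        } catch (RPCException e) {
            // Catch the general case last: the remote handler threw.
            return "remote failure";
        }
    }

    public static void main(String[] args) {
        RpcSender echo = (m, t) -> new Message("pong");
        System.out.println(describe(echo, new Message("ping"), 1000L));
    }
}
```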

sendRPCReturnBoolean

public java.lang.Boolean sendRPCReturnBoolean(Message message)
Send a remote procedure call (RPC) message and wait for boolean response. The recipient must respond with sendBooleanResponse or a BooleanResponseMessage.

Throws:
NoRecipientsException - If there are no message subscribers
MultipleRecipientsException - If there is more than one message subscriber.
RPCException - If the remote RPC handler threw an exception while handling the message.

sendRPCReturnInt

public java.lang.Integer sendRPCReturnInt(Message message)
Send a remote procedure call (RPC) message and wait for integer response. The recipient must respond with sendIntegerResponse or an IntegerResponseMessage.

Throws:
NoRecipientsException - If there are no message subscribers
MultipleRecipientsException - If there is more than one message subscriber.
RPCException - If the remote RPC handler threw an exception while handling the message.

sendRPCReturnLong

public java.lang.Long sendRPCReturnLong(Message message)
Send a remote procedure call (RPC) message and wait for long response. The recipient must respond with sendLongResponse or a LongResponseMessage.

Throws:
NoRecipientsException - If there are no message subscribers
MultipleRecipientsException - If there is more than one message subscriber.
RPCException - If the remote RPC handler threw an exception while handling the message.

sendRPCReturnOID

public OID sendRPCReturnOID(Message message)
Send a remote procedure call (RPC) message and wait for OID response. The recipient must respond with sendOIDResponse or an OIDResponseMessage.

Throws:
NoRecipientsException - If there are no message subscribers
MultipleRecipientsException - If there is more than one message subscriber.
RPCException - If the remote RPC handler threw an exception while handling the message.

sendRPCReturnString

public java.lang.String sendRPCReturnString(Message message)
Send a remote procedure call (RPC) message and wait for string response. The recipient must respond with sendStringResponse or a StringResponseMessage.

Throws:
NoRecipientsException - If there are no message subscribers
MultipleRecipientsException - If there is more than one message subscriber.
RPCException - If the remote RPC handler threw an exception while handling the message.

sendRPCReturnObject

public java.lang.Object sendRPCReturnObject(Message message)
Send a remote procedure call (RPC) message and wait for object response. The recipient must respond with sendObjectResponse or a GenericResponseMessage.

Throws:
NoRecipientsException - If there are no message subscribers
MultipleRecipientsException - If there is more than one message subscriber.
RPCException - If the remote RPC handler threw an exception while handling the message.

sendRPC

public void sendRPC(Message message,
                    ResponseCallback callback)
Send a remote procedure call (RPC) message and invoke callback when the response arrives. Returns after the message is queued to the producing agent.

If the RPC handler throws an exception, the callback will receive an ExceptionResponseMessage instead of the expected message.

Parameters:
message - RPC message.
callback - ResponseCallback.handleResponse(atavism.msgsys.ResponseMessage) is called with the response message.
Throws:
NoRecipientsException - If there are no message subscribers
MultipleRecipientsException - If there is more than one message subscriber.

sendBroadcastRPC

public int sendBroadcastRPC(Message message,
                            ResponseCallback callback)
Broadcast a remote procedure call (RPC) message and invoke callback when responses arrive. Returns after the message is queued to the producing agents.

If the RPC handler throws an exception, the callback will receive an ExceptionResponseMessage instead of the expected message.

Parameters:
message - RPC message.
callback - ResponseCallback.handleResponse(atavism.msgsys.ResponseMessage) is called for each response message.
Returns:
Number of subscribing agents.

sendBroadcastRPCBlocking

public int sendBroadcastRPCBlocking(Message message,
                                    ResponseCallback callback)
Broadcast a remote procedure call (RPC) message and invoke callback when each response arrives. Returns after all of the producing agents have responded.

If the RPC handler throws an exception, the callback will receive an ExceptionResponseMessage instead of the expected message.

Parameters:
message - RPC message.
callback - ResponseCallback.handleResponse(atavism.msgsys.ResponseMessage) is called for each response message.
Returns:
Number of subscribing agents.

sendBroadcastRPCBlocking

public int sendBroadcastRPCBlocking(Message message,
                                    ResponseCallback callback,
                                    long timeout)
Broadcast a remote procedure call (RPC) message and invoke callback when each response arrives. Returns after all of the producing agents have responded.

If the RPC handler throws an exception, the callback will receive an ExceptionResponseMessage instead of the expected message.

Parameters:
message - RPC message.
callback - ResponseCallback.handleResponse(atavism.msgsys.ResponseMessage) is called for each response message.
timeout - the amount of time to wait for all of our responses, or 0 to wait indefinitely
Returns:
Number of subscribing agents.
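
The blocking behavior described above (invoke the callback per response, return once every agent has answered or the timeout passes, with 0 meaning no limit) can be illustrated with a CountDownLatch. This is a generic sketch of the waiting pattern, not the implementation behind MessageAgent.BlockingRPCState. ResponseMessage and ResponseCallback are stand-ins, LatchedCallback is an invented name, and treating the timeout as milliseconds is an assumption.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-ins so the sketch compiles without the Atavism jars.
class ResponseMessage {
    final String from;
    ResponseMessage(String from) { this.from = from; }
}

interface ResponseCallback {
    void handleResponse(ResponseMessage message);
}

// Forwards each response to a delegate and counts down until all have arrived.
class LatchedCallback implements ResponseCallback {
    private final CountDownLatch pending;
    private final ResponseCallback delegate;

    LatchedCallback(int expectedResponses, ResponseCallback delegate) {
        this.pending = new CountDownLatch(expectedResponses);
        this.delegate = delegate;
    }

    @Override
    public void handleResponse(ResponseMessage message) {
        delegate.handleResponse(message);
        pending.countDown();
    }

    // Returns true if every response arrived; a timeout of 0 waits indefinitely.
    // Milliseconds are an assumption made for this sketch.
    boolean await(long timeoutMillis) {
        try {
            if (timeoutMillis == 0) {
                pending.await();
                return true;
            }
            return pending.await(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }
}

class BlockingBroadcastDemo {
    // Simulates the given number of responders and returns how many responses
    // the delegate saw, or -1 if the wait timed out.
    static int run(int responders) {
        AtomicInteger received = new AtomicInteger();
        LatchedCallback state =
            new LatchedCallback(responders, r -> received.incrementAndGet());
        for (int i = 0; i < responders; i++) {
            String name = "agent-" + i;
            new Thread(() -> state.handleResponse(new ResponseMessage(name))).start();
        }
        return state.await(5000) ? received.get() : -1;
    }

    public static void main(String[] args) {
        System.out.println(run(3) + " responses received");
    }
}
```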

sendResponse

public void sendResponse(ResponseMessage message)
Respond to an RPC message. Typically called within a message callback.


sendBooleanResponse

public void sendBooleanResponse(Message message,
                                java.lang.Boolean booleanVal)
Boolean response to an RPC message. Sends a BooleanResponseMessage.

Parameters:
message - The request message.
booleanVal - Boolean response.

sendIntegerResponse

public void sendIntegerResponse(Message message,
                                java.lang.Integer intVal)
Integer response to an RPC message. Sends an IntegerResponseMessage.

Parameters:
message - The request message.
intVal - Integer response.

sendLongResponse

public void sendLongResponse(Message message,
                             java.lang.Long longVal)
Long response to an RPC message. Sends a LongResponseMessage.

Parameters:
message - The request message.
longVal - Long response.

sendOIDResponse

public void sendOIDResponse(Message message,
                            OID oidVal)
OID response to an RPC message. Sends an OIDResponseMessage.

Parameters:
message - The request message.
oidVal - OID response.

sendStringResponse

public void sendStringResponse(Message message,
                               java.lang.String stringVal)
String response to an RPC message. Sends a StringResponseMessage.

Parameters:
message - The request message.
stringVal - String response.

sendObjectResponse

public void sendObjectResponse(Message message,
                               java.lang.Object object)
Object response to an RPC message. Sends a GenericResponseMessage.

Parameters:
message - The request message.
object - Object response.

getResponseThreadPool

public java.util.concurrent.ExecutorService getResponseThreadPool()
Get the response handling thread pool. Defaults to a fixed pool of 10 threads.


setResponseThreadPool

public void setResponseThreadPool(java.util.concurrent.ExecutorService threadPool)
Set the response handling thread pool.
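
Any java.util.concurrent.ExecutorService can serve as the replacement pool. As a sketch, the helper below builds a fixed pool whose threads carry a recognizable name, which can make response handlers easier to spot in thread dumps. The names namedPool, runOnce, and the "rpc-response" prefix are illustrative only.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class ResponsePoolSketch {
    // Builds a fixed-size pool of daemon threads named "<prefix>-<n>".
    static ExecutorService namedPool(int size, String prefix) {
        AtomicInteger counter = new AtomicInteger();
        ThreadFactory factory = task -> {
            Thread t = new Thread(task, prefix + "-" + counter.incrementAndGet());
            t.setDaemon(true);
            return t;
        };
        return Executors.newFixedThreadPool(size, factory);
    }

    // Runs one task on the pool and reports which thread executed it.
    static String runOnce(ExecutorService pool) {
        Callable<String> task = () -> Thread.currentThread().getName();
        try {
            return pool.submit(task).get(5, TimeUnit.SECONDS);
        } catch (Exception e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        ExecutorService pool = namedPool(10, "rpc-response");
        // With the real API, this pool would be passed to agent.setResponseThreadPool(pool).
        System.out.println(runOnce(pool));
        pool.shutdown();
    }
}
```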


handleResponse

public void handleResponse(ResponseMessage message)
Specified by:
handleResponse in interface ResponseCallback

getAppMessageCount

public long getAppMessageCount()
Get the number of application messages received by this agent.


getSystemMessageCount

public long getSystemMessageCount()
Get the number of system messages received by this agent.


startStatsThread

public void startStatsThread()

onTcpAccept

public void onTcpAccept(java.nio.channels.SocketChannel agentSocket)
Specified by:
onTcpAccept in interface TcpAcceptCallback

handleMessageData

public void handleMessageData(int length,
                              AOByteBuffer messageData,
                              AgentInfo agentInfo)
Specified by:
handleMessageData in interface MessageIO.Callback


Copyright © 2018 Dragonsan Studios Sp. z o.o.