Atavism Version 2018.1 | AGIS API
java.lang.Object
    atavism.msgsys.MessageAgent
public class MessageAgent
Application interface to the atavism message system. The message system provides both publish/subscribe and RPC communication within a message domain.
Each process has a single MessageAgent connected to the DomainServer.
Agents are identified in the domain with a unique self-assigned name.
Agents must advertise the message types they intend to publish. The advertisements are used to distribute subscriptions to those agents that might publish a matching message.
Joining the message domain follows the pattern:
    MessageAgent agent = new MessageAgent("my-agent-name");
    List<MessageType> myMessageTypes = getMyAdvertisements();
    agent.addAdvertisements(myMessageTypes);
    try {
        agent.openListener();
        agent.connectToDomain(domainHostName, domainPort);
        agent.waitForRemoteAgents();
    }
    catch (Exception ex) {
        System.err.println("Join message domain: "+ex);
        System.exit(1);
    }
Agents subscribe to messages using a message Filter. A filter is typically one or more message types and optional matching criteria.
The subscription matches messages of the same message type and criteria.
The criteria may be anything in the message content. Filter matching
is done by both the message sender and recipient. The sender need only
match a single subscription to deliver a message to a subscribing agent.
A subscription may be a RESPONDER, indicating that matching
messages will return a response message. Message RPCs only match
responder subscriptions. An RPC responder may return only one response
message. A broadcast RPC sender may receive multiple responses, one from
each agent with a matching RPC responder.
Messages are delivered in the order sent. Message callbacks are invoked by a single thread. Subsequent messages are not delivered until the callback returns. If parallel message processing is desired, the application can implement a thread pool.
Non-blocking RPC response messages are delivered by a fixed size pool of threads.
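The thread pool hand-off mentioned above can be sketched as follows. This is a minimal illustration using only `java.util.concurrent`. `SketchCallback`, `PooledCallback`, and `PooledCallbackDemo` are hypothetical stand-ins invented for this example and are not part of the Atavism API. A real application would wrap its own MessageCallback in the same way.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for a message callback; NOT the Atavism interface.
interface SketchCallback {
    void handle(String message);
}

// Wraps a worker callback so the single delivery thread only enqueues work.
class PooledCallback implements SketchCallback {
    private final ExecutorService pool;
    private final SketchCallback worker;

    PooledCallback(ExecutorService pool, SketchCallback worker) {
        this.pool = pool;
        this.worker = worker;
    }

    @Override
    public void handle(String message) {
        // Return quickly so the next message can be delivered;
        // the actual processing runs on a pool thread.
        pool.execute(() -> worker.handle(message));
    }
}

class PooledCallbackDemo {
    // Pushes `count` messages through a pooled callback and
    // returns how many were processed once the pool has drained.
    static int run(int count) {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        AtomicInteger processed = new AtomicInteger();
        SketchCallback callback =
            new PooledCallback(pool, msg -> processed.incrementAndGet());
        for (int i = 0; i < count; i++) {
            callback.handle("message-" + i);
        }
        pool.shutdown();
        try {
            pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return processed.get();
    }

    public static void main(String[] args) {
        System.out.println("processed " + run(10));
    }
}
```

Note that handing work to a pool gives up the in-order processing that single-threaded delivery provides, so this approach suits messages that can be handled independently.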
Nested Class Summary | | |
---|---|---|
class | MessageAgent.BlockingRPCState | This class helps handle blocking broadcast RPC. |
class | MessageAgent.DomainClient | |
protected class | MessageAgent.RemoteAgent | |
Field Summary | | |
---|---|---|
static int | BLOCKING | Block until subscription is delivered and acknowledged by all producing agents. |
static int | COMPLETION_CALLBACK | Not implemented. |
static int | DEFERRED | Not implemented. |
static int | DOMAIN_FLAG_TRANSIENT | |
static int | NO_FLAGS | No flags for createSubscription(). |
static int | NON_BLOCKING | Return after queueing subscription to producing agents. |
static int | RESPONDER | Subscription is an RPC responder. |
Constructor Summary | |
---|---|
MessageAgent() | Use this constructor if you're only going to use the domain APIs. |
MessageAgent(java.lang.String name) | Create a MessageAgent. |
Method Summary | | |
---|---|---|
void | addAdvertisement(MessageType msgType) | Add a single message type to the list of those this agent will publish. |
void | addAdvertisements(java.util.List<MessageType> typeIds) | Add message types this agent will publish. |
void | addNoProducersExpected(MessageType messageType) | |
boolean | applyFilterUpdate(long subId, FilterUpdate update) | Update a subscription filter. |
boolean | applyFilterUpdate(long subId, FilterUpdate update, int flags) | Update a subscription filter. |
boolean | applyFilterUpdate(long subId, FilterUpdate update, int flags, Message excludeSender) | Update a subscription filter. |
protected boolean | applyFilterUpdate(long subId, FilterUpdate update, int flags, MessageAgent.RemoteAgent excludeAgent) | |
void | connectToDomain(java.lang.String domainServerHost, java.lang.Integer domainServerPort) | Connect to the message domain server. |
long | createSubscription(IFilter filter, MessageCallback callback) | Subscribe using filter and message callback. |
long | createSubscription(IFilter filter, MessageCallback callback, int flags) | Subscribe using filter, flags, and message callback. |
long | createSubscription(IFilter filter, MessageCallback callback, int flags, MessageTrigger trigger) | Subscribe using filter, trigger, flags, and message callback. |
java.lang.Integer | getAgentId() | Get the domain-assigned unique agent id. |
long | getAppMessageCount() | Get the number of application messages received by this agent. |
int | getDefaultSubscriptionFlags() | Get the default subscription flags. |
MessageAgent.DomainClient | getDomainClient() | |
int | getDomainConnectRetries() | Number of retries performed by connectToDomain(). |
int | getDomainFlags() | |
long | getDomainStartTime() | Get the domain server start time. |
int | getListenerPort() | Get the agent network listener port number. |
java.lang.String | getName() | Get the agent name. |
java.util.Set<MessageType> | getNoProducersExpected() | |
java.util.concurrent.ExecutorService | getResponseThreadPool() | Get the response handling thread pool. |
long | getSystemMessageCount() | Get the number of system messages received by this agent. |
void | handleMessageData(int length, AOByteBuffer messageData, AgentInfo agentInfo) | |
void | handleResponse(ResponseMessage message) | |
void | onTcpAccept(java.nio.channels.SocketChannel agentSocket) | |
void | openListener() | Open network listener to accept connections from other agents. |
void | removeAdvertisements(java.util.List<MessageType> typeIds) | Remove message types this agent will no longer publish. |
boolean | removeSubscription(long subId) | Remove subscription by subscription id. |
boolean | removeSubscriptions(java.util.Collection<java.lang.Long> subIds) | Remove subscriptions by subscription id. |
static boolean | responseExpected(int flags) | Check if a handleMessage flag indicates a response is expected. |
void | sendBooleanResponse(Message message, java.lang.Boolean booleanVal) | Boolean response to an RPC message. |
int | sendBroadcast(Message message) | Publish message to subscribing agents. |
int | sendBroadcastRPC(Message message, ResponseCallback callback) | Broadcast a remote procedure call (RPC) message and invoke callback when responses arrive. |
int | sendBroadcastRPCBlocking(Message message, ResponseCallback callback) | Broadcast a remote procedure call (RPC) message and invoke callback when each response arrives. |
int | sendBroadcastRPCBlocking(Message message, ResponseCallback callback, long timeout) | Broadcast a remote procedure call (RPC) message and invoke callback when each response arrives. |
boolean | sendDirect(Message message, AgentHandle destination, SubscriptionHandle runTriggers) | Send message directly to an agent. |
void | sendIntegerResponse(Message message, java.lang.Integer intVal) | Integer response to an RPC message. |
void | sendLongResponse(Message message, java.lang.Long longVal) | Long response to an RPC message. |
void | sendObjectResponse(Message message, java.lang.Object object) | Object response to an RPC message. |
void | sendOIDResponse(Message message, OID oidVal) | OID response to an RPC message. |
void | sendResponse(ResponseMessage message) | Respond to an RPC message. |
Message | sendRPC(Message message) | Send a remote procedure call (RPC) message and wait for response. |
Message | sendRPC(Message message, long timeout) | Send a remote procedure call (RPC) message and wait for response. |
void | sendRPC(Message message, ResponseCallback callback) | Send a remote procedure call (RPC) message and invoke callback when the response arrives. |
java.lang.Boolean | sendRPCReturnBoolean(Message message) | Send a remote procedure call (RPC) message and wait for boolean response. |
java.lang.Integer | sendRPCReturnInt(Message message) | Send a remote procedure call (RPC) message and wait for integer response. |
java.lang.Long | sendRPCReturnLong(Message message) | Send a remote procedure call (RPC) message and wait for long response. |
java.lang.Object | sendRPCReturnObject(Message message) | Send a remote procedure call (RPC) message and wait for object response. |
OID | sendRPCReturnOID(Message message) | Send a remote procedure call (RPC) message and wait for OID response. |
java.lang.String | sendRPCReturnString(Message message) | Send a remote procedure call (RPC) message and wait for string response. |
void | sendStringResponse(Message message, java.lang.String stringVal) | String response to an RPC message. |
void | setAdvertisementFileName(java.lang.String fileName) | Name used in error log when process publishes a message it does not advertise. |
void | setAdvertisements(java.util.Collection<MessageType> typeIds) | Advertise the message types this agent will publish. |
void | setDefaultSubscriptionFlags(int flags) | Set the default subscription flags. |
void | setDomainConnectRetries(int retries) | Set the number of retries performed by connectToDomain(). |
void | setDomainFlags(int flags) | |
void | setResponseThreadPool(java.util.concurrent.ExecutorService threadPool) | Set the response handling thread pool. |
void | startStatsThread() | |
void | waitForRemoteAgents() | Wait until all known agents are connected and we have their advertisements. |
Methods inherited from class java.lang.Object |
---|
clone, equals, finalize, getClass, hashCode, notify, notifyAll, toString, wait, wait, wait |
Field Detail |
---|
public static final int DOMAIN_FLAG_TRANSIENT
public static final int NO_FLAGS
public static final int BLOCKING
public static final int COMPLETION_CALLBACK
public static final int DEFERRED
public static final int RESPONDER
public static final int NON_BLOCKING
Constructor Detail |
---|
public MessageAgent(java.lang.String name)
name - Agent name, unique within the domain.
public MessageAgent()
Method Detail |
---|
public java.lang.String getName()
public java.lang.Integer getAgentId()
public long getDomainStartTime()
public int getListenerPort()
public void openListener() throws java.io.IOException
connectToDomain(java.lang.String, java.lang.Integer)
is called. Listening port is selected by the operating system.
java.io.IOException
public void setAdvertisements(java.util.Collection<MessageType> typeIds)
public void addAdvertisement(MessageType msgType)
public void addAdvertisements(java.util.List<MessageType> typeIds)
public void removeAdvertisements(java.util.List<MessageType> typeIds)
public void setAdvertisementFileName(java.lang.String fileName)
public void addNoProducersExpected(MessageType messageType)
public java.util.Set<MessageType> getNoProducersExpected()
public int getDomainFlags()
public void setDomainFlags(int flags)
public int getDomainConnectRetries()
Number of retries performed by connectToDomain().
public void setDomainConnectRetries(int retries)
Set the number of retries performed by connectToDomain(). The default is Integer.MAX_VALUE.
public void connectToDomain(java.lang.String domainServerHost, java.lang.Integer domainServerPort) throws java.io.IOException, java.net.UnknownHostException, AORuntimeException
java.io.IOException
java.net.UnknownHostException
AORuntimeException
public MessageAgent.DomainClient getDomainClient()
public void waitForRemoteAgents()
connectToDomain(java.lang.String, java.lang.Integer)
.
public void setDefaultSubscriptionFlags(int flags)
createSubscription
.
If the default flags include NON_BLOCKING, then the BLOCKING flag is removed.
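The flag rule above amounts to ordinary bitmask handling. The sketch below illustrates it with made-up constant values. The real values of NO_FLAGS, BLOCKING, NON_BLOCKING, and RESPONDER are defined by MessageAgent and are not given on this page, and `SubscriptionFlagsSketch`, `normalize`, and `isSet` are hypothetical names used only for illustration.

```java
// Illustration only: the numeric values below are assumptions,
// not the values used by atavism.msgsys.MessageAgent.
class SubscriptionFlagsSketch {
    static final int NO_FLAGS = 0;
    static final int BLOCKING = 1;
    static final int NON_BLOCKING = 2;
    static final int RESPONDER = 4;

    // If NON_BLOCKING is present, clear BLOCKING and keep every other bit.
    static int normalize(int flags) {
        if ((flags & NON_BLOCKING) != 0) {
            flags &= ~BLOCKING;
        }
        return flags;
    }

    // True if the given flag bit is set in flags.
    static boolean isSet(int flags, int flag) {
        return (flags & flag) != 0;
    }

    public static void main(String[] args) {
        int flags = normalize(BLOCKING | NON_BLOCKING | RESPONDER);
        System.out.println("BLOCKING set: " + isSet(flags, BLOCKING));
        System.out.println("RESPONDER set: " + isSet(flags, RESPONDER));
    }
}
```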
public int getDefaultSubscriptionFlags()
public long createSubscription(IFilter filter, MessageCallback callback)
filter - Messages matching the filter are delivered to the callback.
callback - MessageCallback.handleMessage(atavism.msgsys.Message, int) is called for each matching message.
public long createSubscription(IFilter filter, MessageCallback callback, int flags)
setDefaultSubscriptionFlags(int)
).
Matching messages are delivered to the message callback.
filter - Messages matching the filter are delivered to the callback.
callback - MessageCallback.handleMessage(atavism.msgsys.Message, int) is called for each matching message.
flags - Subscription flags (example: RESPONDER)
public long createSubscription(IFilter filter, MessageCallback callback, int flags, MessageTrigger trigger)
setDefaultSubscriptionFlags(int)
).
Matching messages are delivered to the message callback.
The caller should not modify the filter without a matching FilterUpdate.
filter - Messages matching the filter are delivered to the callback.
callback - MessageCallback.handleMessage(atavism.msgsys.Message, int) is called for each matching message.
flags - Subscription flags (example: RESPONDER)
trigger - Message trigger to run on the producing agent each time a message matches the filter.
public boolean removeSubscription(long subId)
public boolean removeSubscriptions(java.util.Collection<java.lang.Long> subIds)
public static boolean responseExpected(int flags)
Check if a handleMessage flag indicates a response is expected. Useful for messages that may be published with or without RPC.
public boolean applyFilterUpdate(long subId, FilterUpdate update)
subId - Subscription to update.
update - Filter update instructions.
public boolean applyFilterUpdate(long subId, FilterUpdate update, int flags)
subId - Subscription to update.
update - Filter update instructions.
public boolean applyFilterUpdate(long subId, FilterUpdate update, int flags, Message excludeSender)
excludeSender
. The local instance of the filter
is not modified.
subId - Subscription to update.
update - Filter update instructions.
excludeSender - Do not send update to the sender of this message.
protected boolean applyFilterUpdate(long subId, FilterUpdate update, int flags, MessageAgent.RemoteAgent excludeAgent)
public int sendBroadcast(Message message)
MessageTriggers are run prior to queueing the message. Returns after the message is queued.
public boolean sendDirect(Message message, AgentHandle destination, SubscriptionHandle runTriggers)
message - The message.
destination - The agent.
runTriggers - Subscription from which to run triggers prior to sending the message.
public Message sendRPC(Message message)
message - The message with the RPC information.
RPCTimeoutException - If we time out waiting for a response.
NoRecipientsException - If there are no message subscribers.
MultipleRecipientsException - If there is more than one message subscriber.
RPCException - If the remote RPC handler threw an exception while handling the message.
public Message sendRPC(Message message, long timeout)
message - The message with the RPC information.
timeout - The timeout for the call, or 0 to wait indefinitely.
RPCTimeoutException - If we time out waiting for a response.
NoRecipientsException - If there are no message subscribers.
MultipleRecipientsException - If there is more than one message subscriber.
RPCException - If the remote RPC handler threw an exception while handling the message.
public java.lang.Boolean sendRPCReturnBoolean(Message message)
sendBooleanResponse or a BooleanResponseMessage.
NoRecipientsException - If there are no message subscribers.
MultipleRecipientsException - If there is more than one message subscriber.
RPCException - If the remote RPC handler threw an exception while handling the message.
public java.lang.Integer sendRPCReturnInt(Message message)
sendIntegerResponse or an IntegerResponseMessage.
NoRecipientsException - If there are no message subscribers.
MultipleRecipientsException - If there is more than one message subscriber.
RPCException - If the remote RPC handler threw an exception while handling the message.
public java.lang.Long sendRPCReturnLong(Message message)
sendLongResponse or a LongResponseMessage.
NoRecipientsException - If there are no message subscribers.
MultipleRecipientsException - If there is more than one message subscriber.
RPCException - If the remote RPC handler threw an exception while handling the message.
public OID sendRPCReturnOID(Message message)
sendOIDResponse or an OIDResponseMessage.
NoRecipientsException - If there are no message subscribers.
MultipleRecipientsException - If there is more than one message subscriber.
RPCException - If the remote RPC handler threw an exception while handling the message.
public java.lang.String sendRPCReturnString(Message message)
sendStringResponse or a StringResponseMessage.
NoRecipientsException - If there are no message subscribers.
MultipleRecipientsException - If there is more than one message subscriber.
RPCException - If the remote RPC handler threw an exception while handling the message.
public java.lang.Object sendRPCReturnObject(Message message)
sendObjectResponse or a GenericResponseMessage.
NoRecipientsException - If there are no message subscribers.
MultipleRecipientsException - If there is more than one message subscriber.
RPCException - If the remote RPC handler threw an exception while handling the message.
public void sendRPC(Message message, ResponseCallback callback)
If the RPC handler throws an exception, the callback will receive an ExceptionResponseMessage instead of the expected message.
message - RPC message.
callback - ResponseCallback.handleResponse(atavism.msgsys.ResponseMessage) is called with the response message.
NoRecipientsException - If there are no message subscribers.
MultipleRecipientsException - If there is more than one message subscriber.
public int sendBroadcastRPC(Message message, ResponseCallback callback)
If the RPC handler throws an exception, the callback will receive an ExceptionResponseMessage instead of the expected message.
message - RPC message.
callback - ResponseCallback.handleResponse(atavism.msgsys.ResponseMessage) is called for each response message.
public int sendBroadcastRPCBlocking(Message message, ResponseCallback callback)
If the RPC handler throws an exception, the callback will receive an ExceptionResponseMessage instead of the expected message.
message - RPC message.
callback - ResponseCallback.handleResponse(atavism.msgsys.ResponseMessage) is called for each response message.
public int sendBroadcastRPCBlocking(Message message, ResponseCallback callback, long timeout)
If the RPC handler throws an exception, the callback will receive an ExceptionResponseMessage instead of the expected message.
message - RPC message.
callback - ResponseCallback.handleResponse(atavism.msgsys.ResponseMessage) is called for each response message.
timeout - The amount of time to wait for all of our responses, or 0 to wait indefinitely.
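The "0 means wait indefinitely" timeout convention can be modelled with a countdown latch. The following is a rough sketch of that waiting pattern only and does not describe how MessageAgent is implemented. `ResponseWaitSketch`, `responseArrived`, and `awaitAll` are hypothetical names, and the millisecond unit is an assumption because this page does not state the unit of the timeout.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical helper that waits for a known number of responses.
class ResponseWaitSketch {
    private final CountDownLatch pending;

    ResponseWaitSketch(int expectedResponses) {
        this.pending = new CountDownLatch(expectedResponses);
    }

    // Call once for each response that arrives.
    void responseArrived() {
        pending.countDown();
    }

    // Waits for all expected responses. A timeout of 0 waits indefinitely.
    // Returns true if every response arrived, false on timeout or interrupt.
    // Assumption: the timeout is expressed in milliseconds.
    boolean awaitAll(long timeoutMillis) {
        try {
            if (timeoutMillis == 0) {
                pending.await();
                return true;
            }
            return pending.await(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }
}
```

A caller would create the helper with the number of responders it expects, invoke `responseArrived()` from its response callback, and then block in `awaitAll`.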
public void sendResponse(ResponseMessage message)
public void sendBooleanResponse(Message message, java.lang.Boolean booleanVal)
BooleanResponseMessage.
message - The request message.
booleanVal - Boolean response.
public void sendIntegerResponse(Message message, java.lang.Integer intVal)
IntegerResponseMessage.
message - The request message.
intVal - Integer response.
public void sendLongResponse(Message message, java.lang.Long longVal)
LongResponseMessage.
message - The request message.
longVal - Long response.
public void sendOIDResponse(Message message, OID oidVal)
OIDResponseMessage.
message - The request message.
oidVal - OID response.
public void sendStringResponse(Message message, java.lang.String stringVal)
StringResponseMessage.
message - The request message.
stringVal - String response.
public void sendObjectResponse(Message message, java.lang.Object object)
GenericResponseMessage.
message - The request message.
object - Object response.
public java.util.concurrent.ExecutorService getResponseThreadPool()
public void setResponseThreadPool(java.util.concurrent.ExecutorService threadPool)
public void handleResponse(ResponseMessage message)
handleResponse in interface ResponseCallback
public long getAppMessageCount()
public long getSystemMessageCount()
public void startStatsThread()
public void onTcpAccept(java.nio.channels.SocketChannel agentSocket)
onTcpAccept in interface TcpAcceptCallback
public void handleMessageData(int length, AOByteBuffer messageData, AgentInfo agentInfo)
handleMessageData in interface MessageIO.Callback
Copyright © 2018 Dragonsan Studios Sp. z o.o. |