Event Bus

Introduction

The jWebSocket EventBus is a message-bus component inspired by the Vert.x EventBus. It lets local and distributed jWebSocket components communicate with each other through a simple API, using a Token-based message format.

Tokens

jWebSocket Token objects are similar to Maps, wrapped in an API that guarantees cross-platform serialization and deserialization compatibility. Each Token also carries two important attributes: “namespace”, the target address of the message, and “type”, the message type. The “namespace” attribute therefore determines the message destination.
The Token “namespace” value has the same meaning as the Vert.x EventBus “address” attribute.
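
As a rough illustration, the JavaScript API described later on this page represents a message as a plain JSON object whose "ns" and "type" properties carry the namespace and the type. The sketch below only shows why such a map-like structure serializes cleanly across platforms; the payload values are made up for the example and the code does not use any jWebSocket class.

```javascript
// A message modelled as a plain JSON object (illustrative values only).
var lToken = {
  ns: 'test.topic',      // namespace: the target address of the message
  type: 'someaction',    // type: the kind of message
  data: { count: 1 }     // hypothetical payload, invented for this sketch
};

// Round-trip through JSON, as a cross-platform transport would require.
var lWire = JSON.stringify(lToken);
var lCopy = JSON.parse(lWire);

console.log(lCopy.ns + ' / ' + lCopy.type);
```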

Handlers

A handler is a listener object that receives Token objects from the EventBus. The same handler instance can be registered to multiple namespaces, but only once per namespace. Likewise, multiple handlers can be registered to the same namespace.
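
The registration rules can be pictured with a small in-memory registry. This is only a model of the rules stated above, not the jWebSocket implementation: the "register" function and the registry variable are hypothetical, and whether the real EventBus ignores or rejects a duplicate registration is not stated here (the sketch simply ignores it).

```javascript
// Hypothetical in-memory registry: namespace -> Set of handlers.
var lRegistry = new Map();

function register(aNamespace, aHandler) {
  if (!lRegistry.has(aNamespace)) {
    lRegistry.set(aNamespace, new Set());
  }
  // A Set ignores duplicates, so the same instance is stored only once.
  lRegistry.get(aNamespace).add(aHandler);
}

var lHandlerA = { OnMessage: function (aMessage) {} };
var lHandlerB = { OnMessage: function (aMessage) {} };

register('test.topic', lHandlerA);
register('test.topic', lHandlerA); // same instance again: no effect
register('test.topic', lHandlerB); // a second handler on the same namespace
register('test.queue', lHandlerA); // same instance on another namespace

console.log(lRegistry.get('test.topic').size);
console.log(lRegistry.get('test.queue').size);
```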

Publish / subscribe messaging

See https://en.wikipedia.org/wiki/Publish-subscribe_pattern for a detailed explanation.

Point to point and Request-Response messaging

See https://en.wikipedia.org/wiki/Request-response for a detailed explanation.
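
To make the difference between the two messaging styles concrete, here is a minimal in-memory model, assuming nothing about the real jWebSocket internals. "createBus" is a hypothetical helper, and the round-robin choice made by "send" is an assumption of the sketch rather than documented behavior.

```javascript
// Minimal in-memory model of the two delivery styles (not the jWebSocket code).
function createBus() {
  var lHandlers = {};   // namespace -> array of handlers
  var lNext = {};       // namespace -> index of the next handler for send()

  return {
    register: function (aNamespace, aHandler) {
      (lHandlers[aNamespace] = lHandlers[aNamespace] || []).push(aHandler);
    },
    // Publish/subscribe: every handler on the namespace gets the message.
    publish: function (aMessage) {
      (lHandlers[aMessage.ns] || []).forEach(function (aHandler) {
        aHandler.OnMessage(aMessage);
      });
    },
    // Point-to-point: exactly one handler gets the message.
    // Round-robin selection is an assumption made for this sketch.
    send: function (aMessage) {
      var lList = lHandlers[aMessage.ns] || [];
      if (lList.length === 0) {
        return;
      }
      var lIndex = (lNext[aMessage.ns] || 0) % lList.length;
      lNext[aMessage.ns] = lIndex + 1;
      lList[lIndex].OnMessage(aMessage);
    }
  };
}

var lBus = createBus();
var lReceived = [];
lBus.register('test.ns', { OnMessage: function () { lReceived.push('first'); } });
lBus.register('test.ns', { OnMessage: function () { lReceived.push('second'); } });

lBus.publish({ ns: 'test.ns', type: 'someaction' }); // both handlers run
lBus.send({ ns: 'test.ns', type: 'someaction' });    // only one handler runs

console.log(lReceived.join(','));
```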

The JMS-based jWebSocket implementation

The default jWebSocket EventBus implementation is based on JMS, so the JMS provider acts as the final message broker. Low-level EventBus messages are sent with high priority and, if a timeout is specified, also carry expiration data.

More about JMS: http://docs.oracle.com/javaee/6/tutorial/doc/bncdq.html

Instantiating the JMS EventBus in a jWebSocket standalone environment:
IEventBus lEb = new JMSEventBus(aJMSConnection, aJMSDestination);

Where:

  • aJMSConnection: an active JMS connection.
  • aJMSDestination: a String value naming the queue/topic destinations used to send/publish messages. Example value: "my.app.eventbus"

Note:
In production environments, access to the EventBus queue/topic destinations should be secured at the broker level.

The EventBus API

The interface org.jwebsocket.api.IEventBus defines the jWebSocket EventBus. While the Java version of the EventBus uses message objects that are instances of the class Token, the JavaScript version uses plain JSON objects for the developer's convenience.

Registering a handler to a namespace:

Java

// registering a handler to "test.topic" namespace
// the namespace value can optionally contain the * wildcard or regular
// expressions, for example: "test.t*"
lEventBus.register("test.topic", new Handler() {
  @Override
  public void OnMessage(Token aMessage) {
    
  }
});

JavaScript

// registering a handler to "test.topic" namespace
// the namespace value can optionally contain the * wildcard or regular
// expressions, for example: "test.t*"
EventBus.register('test.topic', {
  OnMessage: function(aMessage) {
    
  }
});
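
How a pattern such as "test.t*" could be matched against a concrete namespace is sketched below. The helper "namespaceMatches" is hypothetical and handles only the * wildcard; how jWebSocket actually evaluates wildcards and regular expressions is not shown on this page.

```javascript
// Hypothetical helper: turn a namespace pattern with "*" into a RegExp.
function namespaceMatches(aPattern, aNamespace) {
  var lSource = aPattern
    .split('*')
    .map(function (aPart) {
      // Escape RegExp metacharacters (such as ".") in the literal parts.
      return aPart.replace(/[.+?^${}()|[\]\\]/g, '\\$&');
    })
    .join('.*'); // each "*" matches any run of characters
  return new RegExp('^' + lSource + '$').test(aNamespace);
}

console.log(namespaceMatches('test.t*', 'test.topic')); // true
console.log(namespaceMatches('test.t*', 'test.queue')); // false
console.log(namespaceMatches('test.t*', 'testXtopic')); // false
```

Escaping the literal parts matters because a bare "." in a regular expression would otherwise match any character, so "test.t*" would also accept "testXtopic".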

Publishing a Token to all handlers (listeners) registered on the target namespace:

Java

// publishing a message to the "test.topic" namespace
lEventBus.publish(TokenFactory.createToken("test.topic", "someaction"));

JavaScript

// publishing a message to the "test.topic" namespace
EventBus.publish({ ns: 'test.topic', type: 'someaction'});

Sending a Token to only one handler (listener) registered on the target namespace:

Java

// sending a message to the "test.queue" namespace
lEventBus.send(TokenFactory.createToken("test.queue", "someaction"));

JavaScript

// sending a message to the "test.queue" namespace
EventBus.send({ ns: 'test.queue', type: 'someaction'});

Request/Reply:

Request (Java)

// sending a request to the "test.queue" namespace
lEventBus.send(TokenFactory.createToken("test.queue", "someaction"), 
new Handler() {

  @Override
  public void OnSuccess(Token aResponse) {
  }

  @Override
  public void OnFailure(Token aResponse) {
  }

  @Override
  public void OnTimeout(Token aResponse) {
  }
});

Request (JavaScript)

// sending a request to the "test.queue" namespace
EventBus.send({ ns: 'test.queue', type: 'someaction'}, {
  
  OnSuccess: function(aResponse){
  },

  OnFailure: function(aResponse){
  },

  OnTimeout: function(aRequest){
  }
});

Replying (Java)

lEventBus.register("test.q*", new Handler() {
  
  @Override
  public void OnMessage(Token aToken) {
    Token lResponse = createResponse(aToken);

    // in case of error
    // Token lResponse = createErrorResponse(aToken);

    // ...
    reply(lResponse);
  }
});

Replying (JavaScript)

EventBus.register('test.q*', {
  OnMessage: function(aMessage) {
    aMessage.reply({
      data: 'This is a response message'
    });

    // in case of error
    // aMessage.fail({
    //   msg: 'This is an error message'
    // });
  }
});

Timeout support (Java)

// sending a request to the "test.queue" namespace and waiting up to 5 seconds for a response
lEventBus.send(TokenFactory.createToken("test.queue", "someaction"), 
new Handler(new Long(5000)) {

  ...

  @Override
  public void OnTimeout(Token aResponse) {
  }
});

Timeout support (JavaScript)

// sending a request to the "test.queue" namespace and waiting up to 5 seconds for a response
EventBus.send({ ns: 'test.queue', type: 'someaction'}, {
  
  timeout: 5000,
  OnTimeout: function(aRequest){
  }
});
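
The bookkeeping behind request/reply with a timeout can be sketched as follows. This is a simplified model, not the jWebSocket implementation: "createRequestTracker", "track", "resolve" and "sweep" are invented names, time is passed in explicitly so the example runs deterministically, and treating a missing timeout as "no timeout" is an assumption.

```javascript
// Hypothetical model of request/reply bookkeeping with timeouts.
function createRequestTracker() {
  var lPending = {}; // request id -> { request, handler, deadline }
  var lNextId = 1;

  return {
    // Remember a request and the time at which it should be given up.
    track: function (aRequest, aHandler, aNow) {
      var lId = lNextId++;
      var lTimeout = aHandler.timeout || 0; // assumed: <= 0 means "no timeout"
      lPending[lId] = {
        request: aRequest,
        handler: aHandler,
        deadline: lTimeout > 0 ? aNow + lTimeout : Infinity
      };
      return lId;
    },
    // A reply arrived in time: hand it to OnSuccess and forget the request.
    resolve: function (aId, aResponse) {
      var lEntry = lPending[aId];
      if (lEntry) {
        delete lPending[aId];
        lEntry.handler.OnSuccess(aResponse);
      }
    },
    // Fire OnTimeout for every request whose deadline has passed.
    sweep: function (aNow) {
      Object.keys(lPending).forEach(function (aId) {
        var lEntry = lPending[aId];
        if (aNow >= lEntry.deadline) {
          delete lPending[aId];
          lEntry.handler.OnTimeout(lEntry.request);
        }
      });
    }
  };
}

var lTracker = createRequestTracker();
var lEvents = [];
var lHandler = {
  timeout: 5000,
  OnSuccess: function (aResponse) { lEvents.push('success'); },
  OnTimeout: function (aRequest) { lEvents.push('timeout:' + aRequest.ns); }
};

var lFirst = lTracker.track({ ns: 'test.queue', type: 'someaction' }, lHandler, 0);
lTracker.track({ ns: 'test.queue', type: 'someaction' }, lHandler, 0);

lTracker.resolve(lFirst, { data: 'ok' });   // answered before the deadline
lTracker.sweep(6000);                       // the second request has expired
lTracker.resolve(lFirst, { data: 'late' }); // ignored: no longer pending

console.log(lEvents.join(','));
```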

Using the EventBus inside the jWebSocket server

jWebSocket plug-in developers can access a ready-to-use singleton EventBus instance by calling:
 
getServer().getEventBus();

Script app developers can do the same by using the global EventBus object:

EventBus.publish({ ns: 'test.queue', type: 'someaction'});

Note:
The EventBus object in script apps provides an extra configuration parameter, “cancelHandlersOnShutdown”. When it is enabled, the EventBus cancels all handler registrations on app shutdown or reload.
You can change this behavior (NOT RECOMMENDED):
EventBus.cancelHandlersOnShutdown = false;

The JavaScript Enterprise EventBus

In the jWebSocket enterprise edition, JavaScript applications can use an EventBus extension that comes with extra features, notably promise support (using Kew) and micro-service definitions.

Importing the enterprise EventBus library (using default path):

App.importScript("${JWEBSOCKET_EE_HOME}/scripting-modules/eventbus");

Using promises:

To send and receive messages using promises, developers should use the following API:

EventBus.send(aMessage, aTimeout) // returns a promise instance

Usage example:
EventBus.send({
 ns: "file.delete",
 path: "/some/file/path"
}, 0).then(function (aMsg) { // a timeout value <= 0 means "no timeout"
  App.getLogger().info("File successfully deleted!");
}).catch(function (aError) {
  App.getLogger().info("Error deleting file: " + aError);
});
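
Conceptually, a promise-returning send can be seen as a thin wrapper around the callback style shown earlier. The sketch below illustrates that idea with the native JavaScript Promise rather than Kew; "fakeSend" and "sendAsPromise" are hypothetical stand-ins, and nothing here reflects how the enterprise library is actually implemented.

```javascript
// "fakeSend" stands in for a callback-based send and replies immediately.
function fakeSend(aMessage, aHandler) {
  if (aMessage.ns === 'file.delete') {
    aHandler.OnSuccess({ data: 'deleted ' + aMessage.path });
  } else {
    aHandler.OnFailure({ msg: 'unknown namespace: ' + aMessage.ns });
  }
}

// Adapt the callback style to a promise-returning function.
function sendAsPromise(aMessage, aTimeout) {
  return new Promise(function (aResolve, aReject) {
    var lHandler = {
      OnSuccess: aResolve,
      OnFailure: aReject,
      OnTimeout: function () {
        aReject(new Error('timeout'));
      }
    };
    // Following the convention above, a value <= 0 means "no timeout".
    if (aTimeout > 0) {
      lHandler.timeout = aTimeout;
    }
    fakeSend(aMessage, lHandler);
  });
}

var lPromise = sendAsPromise({ ns: 'file.delete', path: '/some/file/path' }, 0);
lPromise.then(function (aMsg) {
  console.log(aMsg.data);
});
```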

Defining EventBus-based micro-services:

The EventBus makes it easy to create clusters of event handlers, which is one of the many benefits of using JMS message brokers.
A custom event handler can run on multiple distributed hosts. When you set up a jWebSocket cluster in which multiple mirror nodes are instantiated, the event handlers are clustered automatically and invocations are balanced across them.

A micro-service in the EventBus architecture is a logically related, well-documented group of features that can be invoked from remote web clients or through internal EventBus requests.
Multiple services can be defined within the same JavaScript application; the default service identifier is "main". Event messages coming from a client contain the client's username in a property called "username" (i.e. aMsg.username).

Usage example (test app):

App.importScript("${JWEBSOCKET_EE_HOME}/scripting-modules/eventbus");

App.on("appLoaded", function () {
  
  EventBus.defaultService({
    description: "A service description ...",
    keywords: ["demo"],    
    api: {
      sayHello: {
        description: "A service action/feature description ...",
        keywords: ["echo", "hello", "helloer"],
        authenticated: true,
        validator: {
          type: "json-schema",
          requestSchema: {
            type: "object",
            properties: {
              name: {
                "type": "string"
              },
              age: {
                type: "integer"
              }
            },
            required: [
              "name",
              "age"
            ]
          },
          responseSchema: {
            type: "string"
          }
        },
        handler: function (aMsg) {
          aMsg.reply({
            data: "Hello: " + aMsg.data.name + " - " + aMsg.data.age
          });
        }
      }
    }
  });

});

IMPORTANT: EventBus services must always be defined inside the handler for the application event "appLoaded".

Calling "EventBus.defaultService" creates the "main" service. To create custom services in the same application use:

EventBus.newService(aId, aServiceDef);

Service actions/features are defined as properties of the "api" object. Action definition attributes:
  • description: string. The action description.
  • keywords: array. Descriptive keywords for the action.
  • authenticated: boolean. If true, remote Web clients must be authenticated before calling the action.
  • authority: string. A custom authority (role) that remote Web clients must hold before they are allowed to invoke the action. If a value is set, "authenticated" is not considered.
  • validator: object. Defines the validator for the event["data"] attribute value. JSON Schema is supported out of the box.
  • serverOnly: boolean. If true, the action is not available to remote Web clients.
  • handler: function. The action handler that processes the request event.
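
To give a feel for what the "validator" attribute checks, the sketch below applies the "requestSchema" from the sayHello example to a few inputs. "checkRequest" is a deliberately tiny, hypothetical function that covers only the "required" keyword and primitive "type" values; the built-in JSON Schema support does far more than this.

```javascript
// Hypothetical, simplified check: "required" plus primitive "type" only.
function checkRequest(aSchema, aData) {
  var lErrors = [];
  (aSchema.required || []).forEach(function (aName) {
    if (!(aName in aData)) {
      lErrors.push('missing property: ' + aName);
    }
  });
  Object.keys(aSchema.properties || {}).forEach(function (aName) {
    if (!(aName in aData)) {
      return; // absence is handled by the "required" check above
    }
    var lExpected = aSchema.properties[aName].type;
    var lValue = aData[aName];
    var lOk = lExpected === 'integer'
      ? Number.isInteger(lValue)
      : typeof lValue === lExpected;
    if (!lOk) {
      lErrors.push('wrong type for ' + aName + ': expected ' + lExpected);
    }
  });
  return lErrors;
}

var lSchema = {
  type: 'object',
  properties: { name: { type: 'string' }, age: { type: 'integer' } },
  required: ['name', 'age']
};

console.log(checkRequest(lSchema, { name: 'jWebSocket Framework', age: 5 }));
console.log(checkRequest(lSchema, { name: 'jWebSocket Framework' }));
console.log(checkRequest(lSchema, { name: 42, age: 5.5 }));
```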

Invoking service actions from JavaScript Web client:

var test = client.getScriptApp("test", function(App){
  // app generated successfully on the client!
});
// calling action 'sayHello'
test.main.sayHello({name: "jWebSocket Framework", age: 5}, function(aResponse){
  alert(aResponse.result.data);
});

Invoking service actions through the EventBus (server-side only):

EventBus.send({
  ns: "test.main.sayHello",
  data: {
    name: "jWebSocket Framework",
    age: 5
  }
}, 0).then(function(aReply){
  // aReply.data
});

Note:
Authentication and authority checks do not apply to EventBus-based requests.
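
Taken together, the access rules for an action can be summarized as a small decision function. This is one reading of the attribute descriptions on this page, not the actual jWebSocket logic: "mayInvoke", the shape of the caller object and the "admin" authority name are all invented for the sketch.

```javascript
// Hypothetical decision function for the access rules described above.
// aCaller: { viaEventBus, authenticated, authorities } (shape invented here).
function mayInvoke(aAction, aCaller) {
  if (aCaller.viaEventBus) {
    return true; // server-side EventBus requests skip the checks
  }
  if (aAction.serverOnly) {
    return false; // not available to remote Web clients
  }
  if (aAction.authority) {
    // When an authority is set, "authenticated" is not considered.
    return (aCaller.authorities || []).indexOf(aAction.authority) !== -1;
  }
  if (aAction.authenticated) {
    return aCaller.authenticated === true;
  }
  return true;
}

var lSayHello = { authenticated: true };
var lAdminOnly = { authority: 'admin', authenticated: false };
var lInternal = { serverOnly: true };

console.log(mayInvoke(lSayHello, { authenticated: false }));                     // false
console.log(mayInvoke(lAdminOnly, { authenticated: true, authorities: [] }));    // false
console.log(mayInvoke(lAdminOnly, { authorities: ['admin'] }));                  // true
console.log(mayInvoke(lInternal, { authenticated: true }));                      // false
console.log(mayInvoke(lInternal, { viaEventBus: true }));                        // true
```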

Copyright © 2013 Innotrade GmbH. All rights reserved.