Overview

This page is a guide to Vert.x 3.

  • verticle - a self-contained component that you can deploy to Vert.x.

Starting Vertx (Web)

There are several ways to start Vert.x:

  • main class and VM arguments (for example from an IDE)
  • command line

A verticle can be run via the vertx command:

 vertx <verticle main class>

or built as a fat jar and run from its main method:

 public static void main(String[] args) {
     Vertx vertx = Vertx.vertx(new VertxOptions().setClustered(false));
     vertx.deployVerticle(new MyVerticle());
 }

A clustered version is started with:

public static void main(String[] args) {
    Vertx.clusteredVertx(new VertxOptions().setClustered(true), cluster -> {
        if (cluster.succeeded()) {
            final Vertx result = cluster.result();
            result.deployVerticle(UsersReadFromMongo.class.getName());
        } else {
            // joining the cluster failed; report why
            cluster.cause().printStackTrace();
        }
    });
}

Vertx class

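public class MongoLoader extends AbstractVerticle {

    // AbstractVerticle supplies the vertx field, so only start() is overridden.
    // router (a vertx-web Router) and port are assumed to be fields set up elsewhere.
    @Override
    public void start() throws Exception {
        vertx.createHttpServer().requestHandler(router::accept).listen(port);
    }
}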

In the IDE, the run configuration is then the main class followed by its program arguments:

io.vertx.core.Starter run MongoLoader 

Vert.x can also deploy a service fetched over HTTP, using the HTTP service factory:

vertx run https://myserver.net/myverticle.zip::my-service

HttpClients

Vert.x supports an asynchronous HTTP client. It can be used to fetch data and put it on the event bus for asynchronous push to web and other clients.

 HttpClientRequest request = client.getAbs("http://www.cnn.com", response -> {
     if (response.statusCode() == 200) {
         // bodyHandler is called once with the complete response body
         response.bodyHandler(buffer -> {
             String content = buffer.toString();
             System.out.println("got content " + content);
         });
     } else {
         System.out.println("HTTP FAILURE IN CRAWLER. Status code: " + response.statusCode());
     }
 });

 request.exceptionHandler(throwable -> {
     System.out.println(" HTTP CLIENT ERROR!");
     throwable.printStackTrace();
 });

 // the request is only sent when end() is called
 request.end();

Vertx Jdbc

Setup. Vert.x can use a range of connection pools, with C3P0 as the default; another pool such as HikariCP can be selected through provider_class:

 h2Client = JDBCClient.createNonShared(vertx, new JsonObject()
         .put("provider_class", "io.vertx.ext.jdbc.spi.impl.HikariCPDataSourceProvider")
         .put("jdbcUrl", "jdbc:h2:tcp://localhost/~/wcs;MODE=DB2")
         .put("username", "root")
         .put("password", "xxxxx")
         .put("driver_class", "org.h2.Driver")
         .put("max_pool_size", 5));
 if (h2Client != null) {
     System.err.println("Database connected .. ");
 }
 fut.complete();

Note: HikariCP doesn't seem to work with H2.

Once we have a connection we can execute a statement:

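 // conn is the AsyncResult<SQLConnection> passed to the getConnection handler
 final SQLConnection connection = conn.result();
 connection.execute("create table test(id int primary key, name varchar(255))", res -> {
     if (res.failed()) {
         throw new RuntimeException(res.cause());
     }
     // the table now exists; continue with inserts or queries here
 });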

Blocking

Vert.x has worker verticles that can be used to perform blocking tasks. You deploy a worker verticle that performs the blocking task and then returns the results to the verticle that needs them.
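
The hand-off can be pictured with a plain-JDK analogy. This is only a sketch of the pattern, not Vert.x API: the two executors and every name in it (WorkerOffloadSketch, offload, slowLookup) are hypothetical stand-ins for the event loop and the worker pool. In Vert.x itself the same effect comes from deploying a verticle with new DeploymentOptions().setWorker(true), or from vertx.executeBlocking.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Supplier;

// Plain-JDK analogy of the worker pattern; none of these names are Vert.x API.
class WorkerOffloadSketch {

    // Stands in for the event loop: a single thread that must never block.
    static final ExecutorService EVENT_LOOP =
            Executors.newSingleThreadExecutor(r -> daemon(r, "event-loop"));

    // Stands in for the worker pool, whose threads are allowed to block.
    static final ExecutorService WORKERS =
            Executors.newFixedThreadPool(2, r -> daemon(r, "worker"));

    static Thread daemon(Runnable r, String name) {
        Thread t = new Thread(r, name);
        t.setDaemon(true); // lets the JVM exit without an explicit shutdown
        return t;
    }

    // Run the blocking task on a worker thread, then hand the result back to
    // the event-loop thread, where the caller's follow-up code runs.
    static <T> CompletableFuture<String> offload(Supplier<T> blockingTask) {
        return CompletableFuture
                .supplyAsync(blockingTask, WORKERS)
                .thenApplyAsync(
                        result -> result + " (handled on " + Thread.currentThread().getName() + ")",
                        EVENT_LOOP);
    }

    // Hypothetical blocking call, for example a slow JDBC query or a file read.
    static String slowLookup() {
        try {
            Thread.sleep(50);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return "42 rows from " + Thread.currentThread().getName();
    }

    public static void main(String[] args) {
        // prints: 42 rows from worker (handled on event-loop)
        System.out.println(offload(WorkerOffloadSketch::slowLookup).join());
    }
}
```

The point of the sketch is that the blocking call never runs on the event-loop thread, while the result is still processed there, so the calling code needs no extra synchronisation.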

Auth

  • vertx-pac4j v2.0 (https://github.com/pac4j/vertx-pac4j), based on pac4j v1.8 (https://github.com/pac4j/pac4j), for any Vert.x 3 web application. It is a full security library that supports authentication and authorization, as well as application logout and advanced features such as CSRF protection.
    It supports most authentication mechanisms: OAuth (Facebook, Twitter, Google, Yahoo…), CAS, HTTP (form, basic auth…), OpenID, SAML, Google App Engine, OpenID Connect, JWT, LDAP, RDBMS, MongoDB and Stormpath, plus authorization checks (role/permission, CSRF token…).
  • Vert.x 3.4 provides support for Keycloak through the vertx-auth-oauth2 module:

 compile "io.vertx:vertx-auth-oauth2:${vertx.version}"

In Keycloak a microservice can be set up as a client, which allows it to authorize and obtain API tokens:

  • Enable Direct Access Grants and Authorization in the Settings section of the newly created client.
  • Set Access Type to confidential and Valid Redirect URIs to the callback address routed inside the application.

We can call Keycloak with:

  • the default realm (1)
  • the realm public key (2), which is available in the Realm Settings section under the Keys tab
  • the Keycloak Client ID (3) as resource
  • the client secret as credentials (4)

JsonObject keycloakJson = new JsonObject()
    .put("realm", "master") // (1)
    .put("realm-public-key", "MIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEA1xVBifXfS1uVM8S14JlyLpXck+0+hBQX258IiL5Fm2rZpkQ5lN9N1tadQdXBKk8V/0SxdTyoX7cpYQkcOs0Rj0XXmX7Lnk56euZwel+3MKAZWA20ld8BCfmDtX4/+VP311USUqR/W8Fd2p/gugKWF6VDMkri92qob1DdrcUiRlD8XYC0pwHwSvyW/3JvE5HeTy3U4vxC+19wHcwzLGNlVOlYPk9mzJHXN+LhZr/Tc7HeAsvVxYDXwOOh+/UWweMkvKy+OSNKG3aWLb92Ni3HejFn9kd4TRHfaapwWg1m5Duf3uqz8WDHbS/LeS4g3gQS0SvcCYI0huSoG3NA/z4K7wIDAQAB") // (2)
    .put("auth-server-url", "http://192.168.99.100:38080/auth")
    .put("ssl-required", "external")
    .put("resource", "vertx-account") // (3)
    .put("credentials", new JsonObject().put("secret", "73b55e04-e562-41ea-b39c-263b7b36945d")); // (4)
 
OAuth2Auth oauth2 = KeycloakAuth.create(vertx, OAuth2FlowType.PASSWORD, keycloakJson);

The server-side API can then expose a login endpoint that exchanges the user's credentials for an access token:

router.post("/login").produces("application/json").handler(rc -> {
    User u = Json.decodeValue(rc.getBodyAsString(), User.class);
    oauth2.getToken(u.toJson(), res -> {
        if (res.failed()) {
            LOGGER.error("Access token error: {}", res.cause().getMessage());
            rc.response().setStatusCode(HttpResponseStatus.INTERNAL_SERVER_ERROR.code()).end();
        } else {
            AccessToken token = res.result();
            LOGGER.info("Access Token: {}", KeycloakHelper.rawAccessToken(token.principal()));
            User user = new User(KeycloakHelper.rawAccessToken(token.principal()));
            rc.response().end(user.toString());
        }
    });
});

  • The POST /login method returns the access token inside the JSON response.
  • That token should be passed in the Authorization header on every call to a protected resource.
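
On the calling side, passing the token could look like the following sketch. It uses the JDK 11+ java.net.http API rather than the Vert.x client, the URL and token are placeholders, and the "Bearer" scheme is an assumption (it is the usual scheme for OAuth2 access tokens); the class and method names are hypothetical.

```java
import java.net.URI;
import java.net.http.HttpRequest;

// Sketch only: URL and token are placeholders, "Bearer" is an assumed scheme.
class BearerRequestSketch {

    // Build (but do not send) a GET request that carries the access token.
    static HttpRequest protectedGet(String url, String accessToken) {
        return HttpRequest.newBuilder(URI.create(url))
                .header("Authorization", "Bearer " + accessToken)
                .GET()
                .build();
    }

    public static void main(String[] args) {
        // "example-token" stands for the token returned by POST /login
        HttpRequest request = protectedGet("http://localhost:8080/api/users", "example-token");
        // prints: Bearer example-token
        System.out.println(request.headers().firstValue("Authorization").orElse("missing"));
    }
}
```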

Event Bus

@todo add basic documentation and expand into registering a service; see https://github.com/vert-x3/vertx-service-proxy

Dropwizard Metrics

Configuration

ConfigStoreOptions httpStore = new ConfigStoreOptions()
  .setType("http")
  .setConfig(new JsonObject()
    .put("host", "localhost").put("port", 8080).put("path", "/conf"));

Documentation

Vertx and Rx java/Groovy

RxJava is an open source library from Netflix, developed as part of optimizing their architecture. The library implements the “reactive programming” pattern:

  • RxJava is a popular library for managing asynchronous and event-based programs using observable sequences for the Java VM.
  • RxGroovy is the Reactive Extensions for Groovy. This adaptor allows groovy.lang.Closure functions to be used with RxJava.

public static void hello(String... names) {
      Observable.from(names).subscribe(new Action1<String>() {
          @Override
          public void call(String s) {
              System.out.println("Hello " + s + "!");
          }
      });
}

Without Rx, even Java code can end up in callback hell (the "pyramid of doom"):

private void createSomeData(AsyncResult<SQLConnection> result,
    Handler<AsyncResult<Void>> next, Future<Void> fut) {
    if (result.failed()) {
      fut.fail(result.cause());
    } else {
      SQLConnection connection = result.result();
      connection.execute(
          "CREATE TABLE IF NOT EXISTS Whisky (id INTEGER IDENTITY, name varchar(100), " +
          "origin varchar(100))",
          ar -> {
            if (ar.failed()) {
              fut.fail(ar.cause());
              connection.close();
              return;
            }
            connection.query("SELECT * FROM Whisky", select -> {
              if (select.failed()) {
                fut.fail(select.cause());
                connection.close();
                return;
              }
              if (select.result().getNumRows() == 0) {
                insert(
                    new Whisky("Bowmore 15 Years Laimrig", "Scotland, Islay"),
                    connection,
                    (v) -> insert(new Whisky("Talisker 57° North", "Scotland, Island"),
                        connection,
                        (r) -> {
                          next.handle(Future.<Void>succeededFuture());
                          connection.close();
                        }));                                                    
              } else {
                next.handle(Future.<Void>succeededFuture());
                connection.close();
              }
            });
          });
    }
  }

An Observable-based version looks like:

  jdbc.getConnectionObservable().subscribe(
        conn -> {
 
          // Now chain some statements using flatmap composition
          Observable<ResultSet> resa = conn.updateObservable("CREATE TABLE test(col VARCHAR(20))").
              flatMap(result -> conn.updateObservable("INSERT INTO test (col) VALUES ('val1')")).
              flatMap(result -> conn.updateObservable("INSERT INTO test (col) VALUES ('val2')")).
              flatMap(result -> conn.queryObservable("SELECT * FROM test"));
 
          // Subscribe to the final result
          resa.subscribe(resultSet -> {
            System.out.println("Results : " + resultSet.getRows());
          }, err -> {
            System.out.println("Database problem");
            err.printStackTrace();
          }, conn::close);
        },
 
        // Could not connect
        err -> {
          err.printStackTrace();
        }
    );

As we see from the above code, once we have an Observable we can subscribe to it, chain operations and control the results. The subscribe overloads are:

Subscription subscribe()
Subscription subscribe(Action1<? super T> onNext)
Subscription subscribe(Action1<? super T> onNext, Action1<java.lang.Throwable> onError)
Subscription subscribe(Action1<? super T> onNext, Action1<java.lang.Throwable> onError, Action0 onComplete)
Subscription subscribe(Observer<? super T> observer)
Subscription subscribe(Subscriber<? super T> subscriber)

Other Rx Methods

  • flatMap - transforms the Observable<HttpClientResponse> into an Observable<Buffer>
  • reduce - merges all response buffers into a single buffer
  • map - transforms the buffer to a string
  • subscribe - delivers the response content
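
The shape of that pipeline can be tried out with java.util.stream, which has operators of the same names. This is a JDK-only analogy and not RxJava: streams are synchronous and pull-based, and FakeResponse, concat and body are hypothetical names invented for the sketch.

```java
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Optional;
import java.util.stream.Stream;

// JDK-only analogy of the operator chain; not RxJava and not Vert.x API.
class OperatorPipelineSketch {

    // Hypothetical stand-in for an HTTP response whose body arrives in chunks.
    static class FakeResponse {
        final List<byte[]> chunks;

        FakeResponse(List<byte[]> chunks) {
            this.chunks = chunks;
        }
    }

    // Join two buffers into a new one.
    static byte[] concat(byte[] a, byte[] b) {
        byte[] out = new byte[a.length + b.length];
        System.arraycopy(a, 0, out, 0, a.length);
        System.arraycopy(b, 0, out, a.length, b.length);
        return out;
    }

    static Optional<String> body(Stream<FakeResponse> responses) {
        return responses
                .flatMap(r -> r.chunks.stream())                  // response -> its buffers
                .reduce(OperatorPipelineSketch::concat)           // merge all buffers into one
                .map(b -> new String(b, StandardCharsets.UTF_8)); // buffer -> string
    }

    public static void main(String[] args) {
        FakeResponse response = new FakeResponse(List.of(
                "Hello, ".getBytes(StandardCharsets.UTF_8),
                "Vert.x".getBytes(StandardCharsets.UTF_8)));
        // ifPresent plays the role of subscribe here; prints: Hello, Vert.x
        body(Stream.of(response)).ifPresent(System.out::println);
    }
}
```

Merging the buffers before decoding matters: a multi-byte character may be split across two chunks, so decoding each chunk separately could corrupt the text.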

See also the GitHub repository https://github.com/Froussios/Intro-To-RxJava and the Rx tutorials.

Vertx Orchestration

Vertx on Pi

Vertx in Docker

Vertx Messaging - MQTT

vertx-mqtt-broker is an open-source implementation of an MQTT server. It implements protocol versions 3.1.1 and 3.1, supports QoS 2, and uses OAuth2 for authentication.

It uses Vert.x as a library for TCP management, the non-blocking / actor model, clustering and the auth plugin system.

EC2 Deployment

Vert.x on Kubernetes: https://dzone.com/articles/event-driven-microservices-with-vertx-and-kubernet

Testing

 
vertx.txt · Last modified: 2017/12/05 03:32 by root
 