Database notifications

Using a database queue to distribute CPU-intensive workload might be a good idea. But there is one problem: how do consumers find out that there is work to be done? Of course, we can repeatedly poll the database queue, but that consumes database resources. There is one simple solution: database notifications.

After putting work into the queue, we can notify our workers via the database (PostgreSQL syntax):

NOTIFY my_notification;
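Each consumer subscribes to the channel with LISTEN, and a payload string can be attached to the notification. A minimal sketch in psql (the channel name and payload are just examples):

```sql
-- consumer session: subscribe to the channel
LISTEN my_notification;

-- producer session: notify all listeners, with an optional payload
NOTIFY my_notification, 'myEventParameter';
```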

You can find more information in the PostgreSQL documentation. Let’s focus on Java now.

Send event

pgNotifyService.notify("myEvent", "myEventParameter");

Listener

The listeners may be running on other machines in the network. Try starting several backends simultaneously – the notifications will be shared among all of them 🙂

private final PgCallback eventListener = (eventParameter) -> {
    System.out.println("Event received, parameter: " + eventParameter);
};

Complete Java Example

package com.goodbackend;

import com.goodbackend.notify.PgCallback;
import com.goodbackend.notify.PgNotifyService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.context.event.ApplicationReadyEvent;
import org.springframework.context.event.EventListener;

import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;

@SpringBootApplication
public class ExampleNotify {

    @Autowired
    private PgNotifyService pgNotifyService;

    private static final String EVENT_NAME = "myEvent";

    private final PgCallback eventListener = (eventParameter) -> {
        //this code can run in a different Java backend, on a different machine over the network
        System.out.println("Event received with parameter: " + eventParameter);
    };

    @EventListener(ApplicationReadyEvent.class)
    public void start() {
        //send the event notification; all Java backends connected to the same database will receive it
        pgNotifyService.notify(EVENT_NAME, "myEventParameter");
        System.out.println("Event sent..");
    }

    @PostConstruct
    private void init() {
        //register the listener
        pgNotifyService.addServiceListener(EVENT_NAME, eventListener);
    }

    @PreDestroy
    private void destroy() {
        //unregister the listener
        pgNotifyService.removeServiceListener(eventListener);
    }

    public static void main(final String[] args) {
        SpringApplication.run(ExampleNotify.class, args);
    }
}

Here you can download the complete PostgreSQL Java notification implementation.

User Context

Sometimes we have a set of variables attached to a request – for example userId or requestId. We need these variables very often: at the API level, at the service level, at the repository level, or in the logging service. It is not practical to pass them around as method parameters.

It is a good idea to create UserContext bean. What variables can be stored there?

  • userId – The unique identification of the user after authentication. If the userId is present, we trust this value.
  • requestId – Each request gets a unique identification. Then we can easily track all calls, from API level to final error with the stacktrace.
  • authorizationMethod – We can distinguish if the request comes from API or it is our internal call.
  • testMode – We can distinguish if it is a normal call, or call from automated tests. We can behave differently – for example, we do not send emails during performance tests.
  • and anything else you need.

Someone might consider using a Spring request-scoped bean (SCOPE_REQUEST). Do not use it. Firstly, the way it works is magic: there is a hidden proxy bean, and you can run into trouble. Secondly, you will not be able to use the bean outside of web requests – for example in asynchronous processing or outgoing network calls.

In asynchronous processing, we need access to the original UserContext. This is very important – all backend code can then run the same way, whether it is a direct call from the API or an indirect call from the queue.

The UserContext is created when a new request arrives at our backend. Then we attach the UserContext to the current thread and use it anywhere in the backend. If there is asynchronous processing, we can serialize the UserContext. When needed, we can deserialize it and attach it to a thread different from the original one.

Example: how the UserContext can be used in an asynchronous way.
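A minimal sketch of this attach / serialize / re-attach flow in plain Java. The class and method names here are my own illustration of the pattern, not an actual framework API:

```java
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class UserContextExample {

    //immutable context holding the per-request variables
    public record UserContext(String userId, String requestId) {
        //"serialize" for hand-over to another thread or to a queue item
        public Map<String, String> toMap() {
            return Map.of("userId", userId, "requestId", requestId);
        }
        public static UserContext fromMap(Map<String, String> m) {
            return new UserContext(m.get("userId"), m.get("requestId"));
        }
    }

    //each thread sees its own context
    private static final ThreadLocal<UserContext> CURRENT = new ThreadLocal<>();

    public static void attach(UserContext ctx) { CURRENT.set(ctx); }
    public static UserContext current() { return CURRENT.get(); }
    public static void detach() { CURRENT.remove(); }

    public static void main(String[] args) throws Exception {
        attach(new UserContext("user-42", "req-1"));

        //hand the serialized context over to an asynchronous worker thread
        Map<String, String> serialized = current().toMap();
        ExecutorService pool = Executors.newSingleThreadExecutor();
        String seenByWorker = pool.submit(() -> {
            attach(UserContext.fromMap(serialized)); //re-attach on the worker thread
            try {
                return current().userId();
            } finally {
                detach(); //always clean up pooled threads
            }
        }).get();
        pool.shutdown();
        detach();

        System.out.println("worker saw userId: " + seenByWorker);
    }
}
```

The same serialized map could be stored in a queue item, so a worker picking up the job days later still runs under the original user's context.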

Multi-master database problem

Most database installations are based on one master (primary) and zero to N slaves (secondaries). The master is responsible for reads and writes, and the slaves are ready to become primary if necessary – typically when the original master goes down. Slaves can also be used for read-only queries if the synchronization delay is not an issue.

But what if we want to scale?

A lot of developers and software architects assume it’s easy to switch to a multi-master database. They assume that if we pay enough money for the database, everything will resolve itself. But that’s not how it works.

There is a big fundamental problem (described by the CAP theorem):
we have to choose between availability and consistency.

For example, you have a multi-master configuration with nodes A and B. If we want to make a hotel room reservation on node A, we need to be sure that the room is still available on node B as well. But if node B does not respond, we have two options:

  • Wait until B responds – and we lose availability. If B is stuck, A is stuck as well.
  • Confirm the reservation – and we lose consistency.

If B does not respond, we do not know whether B is down or there is a network problem. We must assume the worst variant: the connection is merely broken and the other node is still running and accepting requests.

We can sacrifice availability, but that means we have to abandon JOINs, database transactions, database locks, and so on. It can be painful, especially if we come from the RDBMS world.

We can sacrifice consistency, but that is acceptable only in special, rare cases. We then have to implement processes to correct the data.

This is why we should avoid a master-master configuration if we want to keep consistency and availability at the same time.

But we really need to scale

The solution is not at the database level, but at the application level. We have to split our data into independent shards.

For example, we have a cloud-based accounting system. We can split our customers geographically: customers come from the USA and from the EU, so we set up two independent environments. Because customers do not need to see other customers' data, the shards are completely independent and we are safe. We can run two master databases at the same time and keep all the advantages of an RDBMS.

HIME principle

We must be very careful when calling third-party services, especially if the network call goes through various firewalls, VPNs, and security components. Although the third party promises an SLA, that must not lull us into a false sense of security.

I recommend following the HIME principle. We should be careful about:

  • Health
  • Integrity
  • Monitoring
  • Evidence

Health

Our service must remain healthy. Connections must not be stuck for too long, and neither the threads nor the memory may be exhausted. The service must stay at least partially functional even if the third party behaves incorrectly.

A typical error is not setting a timeout, which leads to a gradual overload of our system.
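A minimal sketch of the timeout rule using the standard Java HttpClient. The URL and the timeout values are placeholders, not recommendations for any specific service:

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.time.Duration;

public class TimeoutExample {

    public static HttpClient newClientWithTimeout() {
        return HttpClient.newBuilder()
                .connectTimeout(Duration.ofSeconds(2)) //fail fast if we cannot even connect
                .build();
    }

    public static HttpRequest newRequestWithTimeout(String url) {
        return HttpRequest.newBuilder(URI.create(url))
                .timeout(Duration.ofSeconds(5)) //total time budget for the response
                .build();
    }

    public static void main(String[] args) {
        HttpClient client = newClientWithTimeout();
        HttpRequest request = newRequestWithTimeout("https://third-party.example.com/api");
        System.out.println("connect timeout: " + client.connectTimeout().orElseThrow());
        System.out.println("request timeout: " + request.timeout().orElseThrow());
        //client.send(request, ...) would now throw HttpTimeoutException instead of hanging
    }
}
```

With both timeouts set, a stuck third party costs us one thread for a few seconds instead of forever.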

Integrity

Data should remain consistent. If we call a third-party service that changes data and a timeout occurs, we do not know whether the data was updated or not.
A typical error is not sending a unique request UUID, or missing an additional check of whether the data has actually been changed.
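A minimal sketch of the request-UUID idea: the caller generates the UUID, and a retried call with the same UUID does not apply the change twice. The class and method names are illustrative; in a real service the deduplication map would be a database table with a unique constraint:

```java
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;

public class IdempotencyExample {

    //remembers which request UUIDs were already processed, with their results
    private final Map<UUID, String> processed = new ConcurrentHashMap<>();

    private int applied = 0;

    //repeating the same requestId returns the stored result instead of applying the change again
    public String updateBalance(UUID requestId, String change) {
        return processed.computeIfAbsent(requestId, id -> applyChange(change));
    }

    private String applyChange(String change) {
        applied++; //the real data change happens exactly once per requestId
        return "applied: " + change;
    }

    public int timesApplied() { return applied; }

    public static void main(String[] args) {
        IdempotencyExample service = new IdempotencyExample();
        UUID requestId = UUID.randomUUID();
        service.updateBalance(requestId, "+100");
        service.updateBalance(requestId, "+100"); //a retry after a timeout
        System.out.println("changes applied: " + service.timesApplied());
    }
}
```

After a timeout, the caller can safely retry with the same UUID, or query the service by UUID to learn whether the first attempt went through.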

Monitoring

If the third-party service is behaving incorrectly, we should be notified as soon as possible.

Evidence

We should log everything – when the service was called, the input data, the result, the processing time. If we claim a problem with a third-party service, we need to provide a detailed event log.

Testing

Monitoring and Evidence are easy to set up. We can use a logging system and set up the ELK stack (Elasticsearch, Logstash, and Kibana).

Health and Integrity are more complicated. In-depth testing is very important. We can use Spoiler Proxy to simulate common network and software errors.

These rules do not apply only to third-party service calls; they can also be generalized to a microservice architecture.

Database transactions

The database transaction is a very important part of your application because it defines consistency. It is important to have control over it and not to rely blindly on frameworks like Spring.

But you can use Spring and have better transaction control.

Here is an example of how to control a database transaction in Java. The DbTransaction class calls ROLLBACK automatically if COMMIT was not called.

@Service
public class ExampleDbTransaction {

    @Autowired
    private DbTransaction dbTransaction;

    public void exampleDatabaseTransaction() {
        //outside of the transaction (if the transaction is not nested)
        try (DbTransaction.Status status = dbTransaction.openNewTransaction()) {
            //inside the transaction
            status.commitTransaction();
        }
    }
}

Sometimes we want to nest transactions: a method defines a transaction, but an inner method defines a transaction as well. Mostly what we need is transaction nesting where only one transaction is active (the outer one).

The implementation of DbTransaction service:

@Service
public class DbTransaction {

    @Autowired
    private PlatformTransactionManager transactionManager;

    //each thread has a counter of nested transactions
    private static final ThreadLocal<AtomicInteger> transactionCheck =
            ThreadLocal.withInitial(AtomicInteger::new);

    public class Status implements AutoCloseable {

        private final TransactionStatus transactionStatus;

        private Status(TransactionStatus transactionStatus) {
            this.transactionStatus = transactionStatus;
        }

        /**
         * Transaction commit.
         */
        public void commitTransaction() {
            transactionManager.commit(this.transactionStatus);
        }

        /**
         * Always call this method in a finally block (or via try-with-resources).
         */
        @Override
        public void close() {
            transactionCheck.get().decrementAndGet();
            if (!this.transactionStatus.isCompleted()) {
                //commit was not called, so roll back now
                transactionManager.rollback(this.transactionStatus);
            }
        }
    }

    /**
     * Starts a new database transaction. Always call close() on the returned status.
     */
    public Status openNewTransaction() {
        Status status = new Status(transactionManager.getTransaction(new DefaultTransactionDefinition()));
        transactionCheck.get().incrementAndGet();
        return status;
    }

    /**
     * Returns true if the current thread is in a database transaction.
     */
    public static boolean isInTransaction() {
        return transactionCheck.get().intValue() != 0;
    }
}

The advantage of this approach is that we have an overview of how the transaction works.

However, there is a danger. When a developer forgets to use the try block, the database transaction is not closed (neither commit nor rollback is called). This leads to errors that are difficult to detect. I recommend checking the transaction status in an @Aspect class; when a developer forgets to close the transaction, an error is written to the log.

//test that all database transactions are closed
if (DbTransaction.isInTransaction()) {
    //we should not be in a database transaction now
    log.error("STILL IN DATABASE TRANSACTION!! method: " + method.getName());
}

Bad transaction design in Spring

Why does Spring have a bad design of database transactions? Spring suggests using the @Transactional annotation to mark methods that should run inside a database transaction. This is a bad design. Why?

The database transaction is very important – it is about data consistency. Everything must be absolutely clear when we work with database transactions. However, there is magic in Spring's @Transactional annotation, and magic is dangerous in software development.

For example: do you know whether you can use the @Transactional annotation on private methods? The answer is not clear. It depends on the correct POM (Gradle) configuration, on the correct Transaction Manager configuration, and on lots of other things. And if it is not configured properly, the database transactions silently do not work.

This is very bad. After several months in production, you may realize that you have lost consistency because the transactions did not work properly – and you were not aware of it the whole time.

The other problem: a database transaction should be as small as possible because it locks resources, and its scope should not be tied to a method call. You probably want to lock small pieces of code, not whole methods. The same rule applies when using the synchronized keyword in Java.

Find a better way to handle database transactions.

UUID as a database key

Is it a good idea to use a universally unique identifier (UUID) as a database key? Or is it better to use an auto-increment Long value?
We can easily create a random UUID in Java:

UUID uuid = UUID.randomUUID();

UUID example: 41b62562-2362-11ea-978f-2e728ce88125.

The UUID type is longer than the Long type: a UUID has 16 bytes, a Long has 8 bytes, so a classic auto-increment Long consumes fewer resources. However, the UUID has many advantages:

  • Easy to scale. We do not need a central unit to generate UUIDs. We can combine or split components even from different environments without worrying about duplication.
  • A UUID can be generated in Java, so we do not have to wait for a result from the database.
  • It is more secure: attackers cannot simply increment keys to get at secret data. Of course, we always have to check access rights, but this can save us in the case of a developer mistake.

Tips:

  • Always store a UUID in the database as the UUID type, not as a Varchar.
  • It is a good idea to store an auto-increment Long as well. It is useful when we want to implement pagination, where it is important to have a clear sequential order defined. Otherwise, we risk duplicate paging values.
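The sequential Long gives a stable ordering for keyset pagination. A sketch (the table and column names are assumptions):

```sql
-- fetch the next page after the last seen sequential id
SELECT id, seq_id, name
  FROM task
 WHERE seq_id > :lastSeenSeqId
 ORDER BY seq_id
 LIMIT 50;
```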

Do not use the UUID type in Java directly. This is a better approach:

public class TaskId {
   private final UUID id;

   private TaskId(UUID id) {
      this.id = id;
   }

   public UUID getId() {
      return id;
   }

   public static TaskId fromUUID(UUID uuid) {
      if(uuid == null) {
         return null;
      }
      return new TaskId(uuid);
   }

   public static TaskId fromString(String val) {
      if(val == null) {
         return null;
      }
      return new TaskId(UUID.fromString(val));
   }

   @Override
   public boolean equals(Object o) {
      if (this == o) return true;
      if (o == null || getClass() != o.getClass()) return false;
      TaskId that = (TaskId) o;
      return Objects.equals(id, that.id);
   }

   @Override
   public int hashCode() {
      return Objects.hash(id);
   }

   @Override
   public String toString() {
      return id.toString();
   }
}


It looks like a lot of boilerplate code, but you will really appreciate it in a large project, where it is immediately clear what type of value you are working with. Calling this method:

public void storeTask(DocumentId documentId, TaskId taskId) {...}

is clearer and more type-safe than

public void storeTask(UUID documentId, UUID taskId) {...}

Database queue

The database queue is ideal for scaling long-term activities such as exports, OCR or image processing.

But we should be very careful with database locking if there are multiple consumers (workers) running simultaneously.

How to put an item into the queue

It is just a simple SQL insert into the database: a new record with the right state and the request details. It is useful to include some additional information – user ID, unique request ID, date and time.

INSERT INTO my_queue ...

How to pull an item from the queue

This is trickier because we have to solve concurrency issues. Only one consumer is allowed to grab a specific item at a time. The correct SQL is:

SELECT * FROM my_queue WHERE state='waiting' FOR UPDATE SKIP LOCKED LIMIT 1

FOR UPDATE – this is important because it locks the row for the duration of the database transaction. No other consumer is allowed to process it.

SKIP LOCKED – without this, a consumer would wait until the lock is released, and we would lose parallel processing.

LIMIT 1 – we want to lock and process only one row, not the complete table.

Note: This SQL syntax comes from PostgreSQL. Other databases may use a different syntax.

Database transaction

We have to use database transaction when pulling the item from the queue. We have two options:

  • One long transaction.
  • Two short transactions.

One long transaction

BEGIN

//find some work
SELECT * FROM my_queue WHERE state='waiting' FOR UPDATE SKIP LOCKED LIMIT 1

//do requested work - we are still in the database transaction
//no other worker has access to the selected item because of lock

//put info we are done
UPDATE my_queue SET state='done' WHERE queue_item_id=:id

COMMIT

Note: If the worker cannot finish its work, it sets the error state. We can manually switch the item back to the waiting state if necessary.

Two short transactions

BEGIN

//find some work
SELECT * FROM my_queue WHERE state='waiting' FOR UPDATE SKIP LOCKED LIMIT 1

//inform others - this item is ours
UPDATE my_queue SET state='processing' WHERE queue_item_id=:id

COMMIT

//do requested work - we are outside of the database transaction
//no other worker has access to the selected item because of state

BEGIN

//inform others - we are done
UPDATE my_queue SET state='done' WHERE queue_item_id=:id

COMMIT

This solution is better because we do not have a long-running database transaction. However, if the worker (consumer) fails during processing – with "Out of memory", for example – items will be stuck in the processing state forever. We need a special cleaning thread to solve this.
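The cleaning thread can simply return stale items to the waiting state. A sketch (the timeout value and column names are assumptions; they should match the queue fields below):

```sql
-- requeue items that have been 'processing' for too long
UPDATE my_queue
   SET state = 'waiting', worker_name = NULL
 WHERE state = 'processing'
   AND created_dt < now() - interval '30 minutes';
```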

Queue database fields

The recommended database fields in the queue table:

  • State – The state of the process: Waiting, Done, Error, Processing.
  • Parameters – Parameters for the worker; can be in JSON format.
  • RequestId – Unique ID of the request. It can be defined at the API level.
  • UserId – ID of the user who created the request.
  • CreatedDT – Date and time when the request was created.
  • FinishedDT – Date and time when the worker finished its job.
  • ErrorMessage – Error message if the worker failed.
  • WorkerName – The name of the worker; can be based on the hostname, for example.
  • Result – The result of the worker; it can be the UUID of the exported file, for example.
  • UserContext – The serialized UserContext of the original request (see the User Context chapter).

But there is one problem: how do consumers find out that there is work to be done? The simple answer is: database notifications.

How to scale with the queue

The queue is the ideal way to distribute load across more nodes. This is a simple producer vs. consumer scenario: the producer generates work and puts it in the queue; the consumer pulls the work from the queue and processes it. The advantage is that there is no direct link between the producer and the consumer. Connecting a new compute node just means configuring a connection to the queue, which is simple to set up and manage.

How to implement the queue

The persistent queue should be a central element outside the backend. There are several ways to implement the queue. One suitable way is to implement it in the database.

Database queue

A database queue has several advantages:

  • Consistency – We can put data into the queue within the same database transaction. There is no need to worry that the message will be lost or duplicated. This can happen with other solutions, such as RabbitMQ.
  • Simple implementation – It is just a couple of SQL statements. However, you need to be careful about database locking.
  • No additional dependency – You do not need additional technology in your infrastructure: no additional HA configuration, no additional backups, and fewer things that can go wrong.

There are also disadvantages:

  • A database queue is not suitable for high load.
  • It is another load on the database. We should be nice to the database, because the database scales badly.

The database queue is also suitable for email sending. Why? Imagine that we want to send an email in the middle of a database transaction. If an error occurs, the database rolls back and we return to the previous state – but the phantom email may already have been sent.

Therefore, it is better to push the email request to the database queue instead. After a successful commit, the request is processed by another thread (the consumer).
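The email request then commits or rolls back together with the business change. A sketch (the table and column names are assumptions):

```sql
BEGIN;

-- the business change
UPDATE invoice SET state = 'paid' WHERE invoice_id = :invoiceId;

-- the email request is part of the same transaction:
-- no phantom email on rollback, no lost email on commit
INSERT INTO my_queue (state, parameters)
VALUES ('waiting', '{"type": "email", "template": "invoice-paid"}');

COMMIT;
```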

Conclusion

If you want a consistent and simple solution, choose the database. If you can handle some inconsistency and need to process a high volume of messages, choose RabbitMQ or a similar solution.

How to write a monolithic application

How to write a monolithic application? Using monolithic architecture has many advantages over microservice architecture: the code is much simpler, the data is consistent, and the application can be tested well. However, when writing a monolithic application, it is necessary to follow some principles; otherwise we run the risk of creating spaghetti code that does not scale well. So what should we watch out for?

Level design

The application needs to be divided into several levels. I use API level, Service level, Repository level and Connector level.

  • Repository level – Used only to access the database. It does not contain any business logic, except in exceptional cases when it is necessary to push logic into an SQL query/update. The repository sees no other layers. This layer only takes care of the conversion between SQL and Java classes.
  • Connector level – Similar to the repository level, but used only to communicate with other systems. It should hide third-party implementation details and complexity.
  • Service level – The core of the whole system; it contains the business logic. It sees and uses the Repository and Connector levels. It is also in charge of database locking and database transactions.
  • API level – It makes the application available to the rest of the world. This layer should not contain complex logic; it mostly calls the Service level.
Application level architecture

Adhering to this principle is important, otherwise there is a risk of creating spaghetti code.

Multi instances

When writing a backend, one has to think about whether it will run well in multiple instances. This is very important for future scalability.

How to write a scalable backend.

Having multiple instances is also important in order to deploy new versions of the application seamlessly. When a new version is deployed, the first instance is taken down and updated; after the successful deployment, the second instance follows. This procedure can be automated, for example using a script that controls HAProxy.

We can combine scaling with HAProxy and queue. All nodes contain the same backend code. There are two types of nodes:

  • API (standard) node – accepts remote connections and processes easy synchronous tasks.
  • Worker node – accepts work from the queue and processes CPU-intensive tasks, synchronous or asynchronous (OCR, Excel exports,..)

We can switch a node from API to Worker by changing configuration settings – typically in the application.properties file. This simplifies DevOps a lot.
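The node type can then be a single configuration value; the property name below is an assumption, not a standard Spring property:

```properties
# application.properties
# 'api' accepts remote connections, 'worker' pulls from the queue
node.type=worker
```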

Combination of API nodes and the Worker nodes

Note that backends do not communicate with each other. This is very convenient and simplifies application development and maintenance.

The disadvantage of this architecture is the central database, which scales badly. More information: how to scale the database.