How to implement pessimistic locking with Aerospike and Project Reactor?

1. Introduction

Pessimistic locking is a general pattern commonly used to maintain data consistency.

In this post I will describe how to implement pessimistic locking on top of a NoSQL store that has a reactive client.

2. Pessimistic lock interface

We will support the following operations: tryAcquire and release.

public interface PessimisticLock {

    Mono<Boolean> tryAcquire(String key);

    Mono<Boolean> release(String key);
}

3. Implementation

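To implement pessimistic locking, we will store a dedicated lock record with a short expiration, so that a crashed holder cannot leave the lock hanging forever. The bin value itself carries no meaning; only the existence of the record matters. The write uses the CREATE_ONLY record-exists action, so it fails if the record already exists. The idea is simple:

  • if there is a record in storage for the given key, then the lock is held by some other concurrent service

  • if there is no record, then the lock is free and the current service can acquire it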
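
The two rules above can be sketched without a database. The following is a minimal in-memory stand-in, not Aerospike code: the class name InMemoryLockSketch, the explicit nowMillis parameter and the map-based store are assumptions made for illustration. It only mimics a create-only write with expiration.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// In-memory stand-in for the lock record (hypothetical, for illustration only).
final class InMemoryLockSketch {

    // key -> expiry timestamp in milliseconds; plays the role of the lock record with a TTL
    private final ConcurrentMap<String, Long> locks = new ConcurrentHashMap<>();
    private final long ttlMillis;

    InMemoryLockSketch(long ttlMillis) {
        this.ttlMillis = ttlMillis;
    }

    // Returns true if no live record existed and the caller now holds the lock.
    boolean tryAcquire(String key, long nowMillis) {
        boolean[] acquired = {false};
        // compute() runs atomically per key, like a create-only write on the server side
        locks.compute(key, (k, expiry) -> {
            if (expiry == null || expiry <= nowMillis) {
                acquired[0] = true;            // no record, or the old one has expired
                return nowMillis + ttlMillis;  // create the record with a fresh expiry
            }
            return expiry;                     // live record: the lock is held by someone else
        });
        return acquired[0];
    }

    // Deleting a missing record is still treated as a successful release.
    boolean release(String key) {
        locks.remove(key);
        return true;
    }

    public static void main(String[] args) {
        InMemoryLockSketch lock = new InMemoryLockSketch(1_000);
        System.out.println(lock.tryAcquire("order-42", 0));     // lock is free
        System.out.println(lock.tryAcquire("order-42", 500));   // record still alive
        System.out.println(lock.tryAcquire("order-42", 1_500)); // record has expired
    }
}
```

The expiry check is what keeps a lock from hanging when its holder dies before calling release.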

@Slf4j
@RequiredArgsConstructor
public class DefaultPessimisticLock implements PessimisticLock {

    private final IAerospikeReactorClient client;
    private final RetryProperties retryProperties;
    private final LockProperties lockProperties;
    private final AerospikeProperties aerospikeProperties;

    @Override
    public Mono<Boolean> tryAcquire(String key) {

        Key lockKey = toLockKey(key);
        Bin lockBin = toLockBin(key);

        return Mono.defer(() -> client.put(acquirePolicy(), lockKey, lockBin).map(Objects::nonNull))
                .retryWhen(Retries.aerospikeError(retryProperties))
                .onErrorMap(error -> {
                    log.warn("Failed to acquire lock " + key, error);
                    return new PessimisticLockAcquireException("Failed to acquire lock " + key, error);
                });
    }

    @Override
    public Mono<Boolean> release(String key) {

        Key lockKey = toLockKey(key);

        return client.delete(releasePolicy(), lockKey)
                .map(Objects::nonNull)
                .onErrorResume(error -> {
                    log.warn("Failed to release lock " + key, error);
                    return Mono.just(Boolean.FALSE);
                })
                .defaultIfEmpty(Boolean.TRUE);
    }

    private Key toLockKey(String key) {
        return new Key(aerospikeProperties.getNamespace(), aerospikeProperties.getSetName(), key);
    }

    private Bin toLockBin(String key) {
        return new Bin(lockProperties.getBinName(), key);
    }

    private WritePolicy acquirePolicy() {
        WritePolicy putPolicy = new WritePolicy();
        putPolicy.recordExistsAction = RecordExistsAction.CREATE_ONLY;
        putPolicy.expiration = lockProperties.getExpirationInSeconds();
        return putPolicy;
    }

    private WritePolicy releasePolicy() {
        WritePolicy deletePolicy = new WritePolicy();
        deletePolicy.generationPolicy = GenerationPolicy.NONE;
        return deletePolicy;
    }
}

4. Testing

@RunWith(MockitoJUnitRunner.class)
public class DefaultPessimisticLockTest {

    private static final String TEST_KEY = "123";
    private static final AerospikeException AEROSPIKE_TIMEOUT_EXCEPTION = new AerospikeException(ResultCode.TIMEOUT, "Aerospike timeout");
    private static final AerospikeException AERSOPIKE_KEY_EXISTS_EXCEPTION = new AerospikeException(ResultCode.KEY_EXISTS_ERROR, "Key exists");

    @Mock
    private IAerospikeReactorClient client;
    @Spy
    private AerospikeProperties aerospikeProperties = new AerospikeProperties();
    @Spy
    private RetryProperties retryProperties = new RetryProperties();
    @Spy
    private LockProperties lockProperties = new LockProperties();
    @InjectMocks
    private DefaultPessimisticLock pessimisticLock;

    @Test
    public void lockAcquireExceptionIsThrownIfTimeoutReachedAfterRetry() {
        Mockito.when(client.put(Mockito.any(WritePolicy.class), Mockito.any(Key.class), Mockito.any(Bin.class)))
                .thenReturn(Mono.error(AEROSPIKE_TIMEOUT_EXCEPTION));

        StepVerifier.withVirtualTime(() -> pessimisticLock.tryAcquire(TEST_KEY))
                .expectSubscription()
                .thenAwait(Duration.ofMillis(1001))
                .expectError(PessimisticLockAcquireException.class)
                .verify();
    }

    @Test
    public void lockIsAcquiredAfterRetryWithExponentialBackOff() {
        Key testKey = new Key(aerospikeProperties.getNamespace(), aerospikeProperties.getSetName(), TEST_KEY);

        Mockito.when(client.put(Mockito.any(WritePolicy.class), Mockito.any(Key.class), Mockito.any(Bin.class)))
                .thenReturn(Mono.error(AEROSPIKE_TIMEOUT_EXCEPTION))
                .thenReturn(Mono.error(AEROSPIKE_TIMEOUT_EXCEPTION))
                .thenReturn(Mono.error(AERSOPIKE_KEY_EXISTS_EXCEPTION))
                .thenReturn(Mono.just(testKey));

        StepVerifier.withVirtualTime(() -> pessimisticLock.tryAcquire(TEST_KEY))
                .expectSubscription()
                .thenAwait(Duration.ofMillis(50))
                .thenAwait(Duration.ofMillis(100))
                .thenAwait(Duration.ofMillis(200))
                .expectNext(Boolean.TRUE)
                .expectComplete()
                .verify();
    }

    @Test
    public void lockAcquireExceptionIsThrownIfKeyExistsError() {
        Mockito.when(client.put(Mockito.any(WritePolicy.class), Mockito.any(Key.class), Mockito.any(Bin.class)))
                .thenReturn(Mono.error(AERSOPIKE_KEY_EXISTS_EXCEPTION));

        StepVerifier.withVirtualTime(() -> pessimisticLock.tryAcquire(TEST_KEY))
                .expectSubscription()
                .thenAwait(Duration.ofMillis(1001))
                .expectError(PessimisticLockAcquireException.class)
                .verify();
    }

    @Test
    public void lockIsAcquiredIfKeyBecomesAvailable() {
        Key testKey = new Key(aerospikeProperties.getNamespace(), aerospikeProperties.getSetName(), TEST_KEY);

        Mockito.when(client.put(Mockito.any(WritePolicy.class), Mockito.any(Key.class), Mockito.any(Bin.class)))
                .thenReturn(Mono.error(AERSOPIKE_KEY_EXISTS_EXCEPTION))
                .thenReturn(Mono.error(AERSOPIKE_KEY_EXISTS_EXCEPTION))
                .thenReturn(Mono.just(testKey));

        StepVerifier.withVirtualTime(() -> pessimisticLock.tryAcquire(TEST_KEY))
                .expectSubscription()
                .thenAwait(Duration.ofMillis(50))
                .thenAwait(Duration.ofMillis(100))
                .expectNext(Boolean.TRUE)
                .expectComplete()
                .verify();
    }

    @Test
    public void lockIsAcquiredSuccessfully() {
        Key testKey = new Key(aerospikeProperties.getNamespace(), aerospikeProperties.getSetName(), TEST_KEY);

        Mockito.when(client.put(Mockito.any(WritePolicy.class), Mockito.any(Key.class), Mockito.any(Bin.class)))
                .thenReturn(Mono.just(testKey));

        StepVerifier.withVirtualTime(() -> pessimisticLock.tryAcquire(TEST_KEY))
                .expectSubscription()
                .expectNext(Boolean.TRUE)
                .expectComplete()
                .verify();
    }

    @Test
    public void lockIsTreatedAsReleasedIfDoesNotExist() {
        Mockito.when(client.delete(Mockito.any(WritePolicy.class), Mockito.any(Key.class)))
                .thenReturn(Mono.empty());

        StepVerifier.create(pessimisticLock.release(TEST_KEY))
                .expectNext(Boolean.TRUE)
                .expectComplete()
                .verify();
    }

    @Test
    public void lockIsNotReleasedIfExceptionDuringRelease() {
        Mockito.when(client.delete(Mockito.any(WritePolicy.class), Mockito.any(Key.class)))
                .thenReturn(Mono.error(AEROSPIKE_TIMEOUT_EXCEPTION));

        StepVerifier.create(pessimisticLock.release(TEST_KEY))
                .expectNext(Boolean.FALSE)
                .expectComplete()
                .verify();
    }

    @Test
    public void lockIsReleasedSuccessfully() {
        Key testKey = new Key(aerospikeProperties.getNamespace(), aerospikeProperties.getSetName(), TEST_KEY);

        Mockito.when(client.delete(Mockito.any(WritePolicy.class), Mockito.any(Key.class)))
                .thenReturn(Mono.just(testKey));

        StepVerifier.create(pessimisticLock.release(TEST_KEY))
                .expectNext(Boolean.TRUE)
                .expectComplete()
                .verify();
    }
}
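
The waits of 50, 100 and 200 ms in lockIsAcquiredAfterRetryWithExponentialBackOff suggest a doubling backoff between attempts. The post does not show RetryProperties or Retries.aerospikeError, so the sketch below only reproduces the arithmetic. It assumes a 50 ms first delay, a factor of 2 and a hypothetical 1000 ms cap; the class and method names are illustrative.

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative arithmetic only; the real retry configuration is not shown in the post.
final class BackoffScheduleSketch {

    // Delay before each retry: first, first * 2, first * 4, ... capped at maxMillis.
    static List<Long> delays(long firstMillis, long maxMillis, int retries) {
        List<Long> result = new ArrayList<>();
        long delay = Math.min(firstMillis, maxMillis);
        for (int i = 0; i < retries; i++) {
            result.add(delay);
            delay = Math.min(delay * 2, maxMillis); // double the wait, but never exceed the cap
        }
        return result;
    }

    public static void main(String[] args) {
        // Three failed attempts before the fourth one succeeds, as in the test
        System.out.println(delays(50, 1_000, 3)); // prints [50, 100, 200]
    }
}
```

Under these assumptions the fourth attempt happens 350 ms after the first one, which is why the test advances virtual time in three steps before expecting a value.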

5. Using with Reactor

We need to emulate try-finally semantics with Reactor operators: the lock must be released whether the operation succeeds or fails. The code below achieves that goal:

@Override
public <T, R> Mono<R> executeWithLock(String key, T data, OperationExecutor<T, R> operationExecutor) {

    return pessimisticLock.tryAcquire(key)
            .flatMap(lockAcquired -> operationExecutor.execute(data)
                    // success path: release the lock, then emit the operation result
                    .flatMap(operationResult -> pessimisticLock.release(key)
                            .thenReturn(operationResult))
                    // failure path: release the lock, then re-emit the original error
                    .onErrorResume(throwable -> pessimisticLock.release(key)
                            .then(Mono.<R>error(throwable)))
            );
}
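
For comparison, here is the blocking shape that the reactive chain is meant to emulate. This is a plain-Java sketch under stated assumptions: BlockingLock is a hypothetical synchronous counterpart of PessimisticLock, and the class and method names are not part of the original code.

```java
import java.util.function.Function;

// Blocking analogue of executeWithLock (hypothetical, for illustration only).
final class BlockingLockTemplateSketch {

    // Hypothetical synchronous counterpart of the PessimisticLock interface
    interface BlockingLock {
        boolean tryAcquire(String key);
        boolean release(String key);
    }

    static <T, R> R executeWithLock(BlockingLock lock, String key, T data, Function<T, R> operation) {
        if (!lock.tryAcquire(key)) {
            throw new IllegalStateException("Failed to acquire lock " + key);
        }
        try {
            return operation.apply(data); // the protected operation
        } finally {
            lock.release(key);            // runs on both success and failure
        }
    }
}
```

In the reactive version the finally block has to be spelled out twice, once after the result and once after the error, because the release is itself an asynchronous step.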

6. Conclusion

Aerospike doesn’t have a built-in mechanism for pessimistic locking, so to get the required semantics one needs to implement the locking oneself.

Another piece of the puzzle is emulating try-finally semantics with Reactor.

Oleksii Zghurskyi