Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,12 @@
import org.openmrs.module.drools.session.DroolsExecutionResult;
import org.openmrs.module.drools.session.DroolsSessionException;
import org.openmrs.module.drools.session.DroolsSessionConfig;
import org.openmrs.module.drools.session.SessionLease;

import java.util.Collection;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import java.util.function.Predicate;

public interface DroolsEngineService extends OpenmrsService {
Expand Down Expand Up @@ -95,4 +98,23 @@ public interface DroolsEngineService extends OpenmrsService {

public DroolsSessionConfig getSessionConfig(String sessionId);

/**
* Register an auto-startable session in the registry.
*
* @param sessionId the session identifier
* @param session the KieSession to register
* @return true if registration was successful, false if session already exists
*/
public boolean registerAutoStartSession(String sessionId, KieSession session);

/**
* Check out an auto-startable session from the registry with exclusive lock.
*
* @param sessionId the session identifier
* @param timeout the timeout duration
* @param unit the timeout unit
* @return Optional containing SessionLease if session exists, empty otherwise
*/
public Optional<SessionLease> checkOutAutoStartSession(String sessionId, long timeout, TimeUnit unit);

}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import org.springframework.beans.factory.annotation.Autowired;

import java.util.*;
import java.util.concurrent.TimeUnit;
import java.util.function.Predicate;
import java.util.stream.Collectors;

Expand All @@ -32,6 +33,9 @@ public class DroolsEngineServiceImpl extends BaseOpenmrsService implements Drool
@Autowired
private DroolsConfig droolsConfig;

@Autowired
private SessionRegistry sessionRegistry;

private Map<String, DroolsSessionConfig> ruleConfigs;

private DroolsEventsManager eventsManager = new DroolsEventsManager();
Expand All @@ -51,6 +55,9 @@ public KieSession requestSession(String sessionId) {
session = CommonUtils.createKieSession(kieContainer, ruleConfigs.get(sessionId), droolsConfig.getExternalEvaluatorManager(), globalBindings);
eventsManager.subscribeSessionEventListenersIfNecessary(sessionId, session, ruleConfigs);

// Note: Registration moved to DroolsEngineRunner.run() for auto-startable sessions
// This method now only creates sessions without registering them
Comment on lines +58 to +59

@samuelmale samuelmale Oct 13, 2025

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this method should invoke the registry if necessary, ie, if the requested session is auto-startable, we pick the session from the registry.


return session;
} else {
throw new DroolsSessionException("Can't find session configuration for: " + sessionId);
Expand All @@ -77,7 +84,9 @@ public DroolsExecutionResult evaluate(String sessionId, Collection<Object> facts
facts.forEach(currentSession::insert);
int fired = currentSession.fireAllRules(getSessionAgendaFilter(currentSession, ruleConfigs.get(sessionId)));
List<?> results = getSessionObjects(currentSession, resolveClass(resultClassName, currentSession.getKieBase()));
result = new DroolsExecutionResult(sessionId, fired, (List<Object>) results);
@SuppressWarnings("unchecked")
List<Object> objectResults = (List<Object>) results;
result = new DroolsExecutionResult(sessionId, fired, objectResults);
currentSession.dispose();

} else {
Expand Down Expand Up @@ -249,4 +258,38 @@ public void setDroolsConfig(DroolsConfig droolsConfig) {
this.droolsConfig = droolsConfig;
}

/**
* Register an auto-startable session in the registry.
*
* @param sessionId the session identifier
* @param session the KieSession to register
* @return true if registration was successful, false if session already exists
*/
@Override
public boolean registerAutoStartSession(String sessionId, KieSession session) {
return sessionRegistry.registerSession(sessionId, session);
}

/**
* Check out an auto-startable session from the registry with exclusive lock.
*
* @param sessionId the session identifier
* @param timeout the timeout duration
* @param unit the timeout unit
* @return Optional containing SessionLease if session exists, empty otherwise
*/
@Override
public Optional<SessionLease> checkOutAutoStartSession(String sessionId, long timeout, TimeUnit unit) {
try {
SessionLease lease = sessionRegistry.checkOutSession(sessionId, timeout, unit);
return Optional.of(lease);
} catch (IllegalArgumentException e) {
// Session doesn't exist
return Optional.empty();
} catch (InterruptedException | java.util.concurrent.TimeoutException e) {
// Could not acquire lock within timeout
return Optional.empty();
}
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
package org.openmrs.module.drools.session;

import org.kie.api.runtime.KieSession;

/**
* Represents a checked-out session lease that ensures exclusive access to a KieSession.
* Implements AutoCloseable to support try-with-resources pattern for automatic lock release.
*
* <p>This class prevents Thread1/Thread2 data race conditions by ensuring only one thread
* can access a session at a time. When acquired, the session is locked until the lease
* is closed (either explicitly or via try-with-resources).</p>
*
* <p>Example usage:</p>
* <pre>
* try (SessionLease lease = registry.checkOutSession("mySession", 5, TimeUnit.SECONDS)) {
* KieSession session = lease.getSession();
* // Perform operations on session
* } // Lock automatically released
* </pre>
*/
public class SessionLease implements AutoCloseable {

private final KieSession session;
private final String sessionId;
private final Runnable releaseCallback;
private volatile boolean released = false;

/**
* Creates a new session lease.
*
* @param sessionId the session identifier
* @param session the KieSession instance
* @param releaseCallback callback to invoke when lease is closed (releases the lock)
*/
public SessionLease(String sessionId, KieSession session, Runnable releaseCallback) {
this.sessionId = sessionId;
this.session = session;
this.releaseCallback = releaseCallback;
}

/**
* Gets the KieSession associated with this lease.
*
* @return the KieSession instance
* @throws IllegalStateException if the lease has already been released
*/
public KieSession getSession() {
if (released) {
throw new IllegalStateException("Session lease for '" + sessionId + "' has already been released");
}
return session;
}

/**
* Gets the session identifier.
*
* @return the session ID
*/
public String getSessionId() {
return sessionId;
}

/**
* Releases the session lock. This method is idempotent - calling it multiple times
* has no effect after the first call.
*/
@Override
public void close() {
if (!released) {
released = true;
if (releaseCallback != null) {
releaseCallback.run();
}
}
}

/**
* Checks if this lease has been released.
*
* @return true if the lease has been released, false otherwise
*/
public boolean isReleased() {
return released;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
package org.openmrs.module.drools.session;

import org.kie.api.runtime.KieSession;

import java.util.concurrent.TimeUnit;

/**
* Registry for managing stateful KieSession instances with thread-safe access.
*
* <p>This registry uses a check-out/return pattern to prevent concurrent access issues.
* Sessions should be checked out using {@link #checkOutSession(String, long, TimeUnit)},
* used within a try-with-resources block, and automatically returned when the lease closes.</p>
*/
public interface SessionRegistry {

/**
* Registers a KieSession in the registry.
*
* @param sessionId the unique identifier for the session
* @param session the KieSession instance to register
* @return true if registration was successful, false if session already exists
*/
boolean registerSession(String sessionId, KieSession session);

/**
* Checks out a session for exclusive use, blocking if necessary until the lock is available.
* Returns a SessionLease that MUST be closed (preferably via try-with-resources) to release the lock.
*
* <p>This method prevents Thread1/Thread2 data race conditions by ensuring only one thread
* can access the session at a time.</p>
*
* <p>Example usage:</p>
* <pre>
* try (SessionLease lease = registry.checkOutSession("mySession", 5, TimeUnit.SECONDS)) {
* KieSession session = lease.getSession();
* // Perform thread-safe operations on session
* } // Lock automatically released
* </pre>
*
* @param sessionId the unique identifier for the session
* @param timeout the maximum time to wait for the lock
* @param unit the time unit of the timeout argument
* @return a SessionLease that provides access to the session and releases the lock when closed
* @throws IllegalArgumentException if session doesn't exist
* @throws InterruptedException if the thread is interrupted while waiting
* @throws java.util.concurrent.TimeoutException if the lock cannot be acquired within the timeout
*/
SessionLease checkOutSession(String sessionId, long timeout, TimeUnit unit)
throws InterruptedException, java.util.concurrent.TimeoutException;

/**
* Checks if a session exists in the registry.
*
* @param sessionId the unique identifier to check
* @return true if the session exists, false otherwise
*/
boolean sessionExists(String sessionId);

/**
* Cleans up expired or stale sessions based on implementation-specific criteria.
* This method can be called manually when needed.
*/
void cleanupExpiredSessions();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
package org.openmrs.module.drools.session.impl;

import org.kie.api.runtime.KieSession;
import org.openmrs.module.drools.session.SessionLease;
import org.openmrs.module.drools.session.SessionRegistry;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.locks.ReentrantLock;

/**
* Thread-safe registry implementation for managing stateful KieSession instances.
*
* <p>Uses a check-out/return pattern with ReentrantLock to prevent Thread1/Thread2
* data race conditions. Sessions must be checked out via {@link #checkOutSession(String, long, TimeUnit)}
* which returns a SessionLease that MUST be closed to release the lock.</p>
*/
@Component
public class SessionRegistryImpl implements SessionRegistry {

private static final Logger log = LoggerFactory.getLogger(SessionRegistryImpl.class);

// Session storage
private final ConcurrentHashMap<String, KieSession> sessions = new ConcurrentHashMap<>();

// Session locks for check-out mechanism
private final ConcurrentHashMap<String, ReentrantLock> sessionLocks = new ConcurrentHashMap<>();

@Override
public boolean registerSession(String sessionId, KieSession session) {
if (sessionId == null || sessionId.trim().isEmpty()) {
log.debug("Registration failed: Session ID cannot be null or empty");
return false;
}
if (session == null) {
log.debug("Registration failed: KieSession cannot be null");
return false;
}

// Attempt to register session atomically
KieSession existingSession = sessions.putIfAbsent(sessionId, session);
if (existingSession != null) {
log.debug("Session with ID '{}' already exists in registry. Registration ignored.", sessionId);
return false;
}

// Create lock for this session
sessionLocks.putIfAbsent(sessionId, new ReentrantLock(true)); // fair lock

log.info("Registered session '{}'", sessionId);
return true;
}

@Override
public SessionLease checkOutSession(String sessionId, long timeout, TimeUnit unit)
throws InterruptedException, TimeoutException {
if (sessionId == null) {
throw new IllegalArgumentException("Session ID cannot be null");
}

// Verify session exists
KieSession session = sessions.get(sessionId);
if (session == null) {
throw new IllegalArgumentException("Session '" + sessionId + "' not found in registry");
}

// Get the lock for this session
ReentrantLock lock = sessionLocks.get(sessionId);
if (lock == null) {
throw new IllegalStateException("Lock not found for session '" + sessionId + "'");
}

// Try to acquire the lock with timeout
boolean acquired = lock.tryLock(timeout, unit);
if (!acquired) {
throw new TimeoutException("Could not acquire lock for session '" + sessionId +
"' within " + timeout + " " + unit);
}

log.debug("Checked out session '{}' (lock acquired by thread '{}')",
sessionId, Thread.currentThread().getName());

// Return lease with callback to release lock
return new SessionLease(sessionId, session, () -> returnSession(sessionId, lock));
}

/**
* Internal method to return (unlock) a session.
* Called automatically by SessionLease.close().
*/
private void returnSession(String sessionId, ReentrantLock lock) {
if (lock.isHeldByCurrentThread()) {
lock.unlock();
log.debug("Returned session '{}' (lock released by thread '{}')",
sessionId, Thread.currentThread().getName());
} else {
log.warn("Attempted to return session '{}' but lock not held by current thread", sessionId);
}
}

@Override
public boolean sessionExists(String sessionId) {
return sessionId != null && sessions.containsKey(sessionId);
}

@Override
public void cleanupExpiredSessions() {
// Manual cleanup method - can be called when needed
// Implementation can be added later if specific cleanup logic is required
log.debug("Manual cleanup invoked - no automatic cleanup configured");
}
}
Loading