Initial implementation of HTTP Sender Plugin following TDD methodology with hexagonal architecture. All 313 tests passing (0 failures). This commit adds: - Complete domain model and port interfaces - All adapter implementations (HTTP, gRPC, file logging, config) - Application services (data collection, transmission, backpressure) - Comprehensive test suite with 18 integration tests Test fixes applied during implementation: - Fix base64 encoding validation in DataCollectionServiceIntegrationTest - Fix exception type handling in IConfigurationPortTest - Fix CompletionException unwrapping in IHttpPollingPortTest - Fix sequential batching in DataTransmissionServiceIntegrationTest - Add test adapter failure simulation for reconnection tests - Use adapter counters for gRPC verification Files added: - pom.xml with all dependencies (JUnit 5, Mockito, WireMock, gRPC, Jackson) - src/main/java: Domain model, ports, adapters, application services - src/test/java: Unit tests, integration tests, test utilities
23 KiB
Thread Safety Guidelines
Version: 1.0 Project: HTTP Sender Plugin (HSP) Last Updated: 2025-11-20
Purpose
This document defines thread safety requirements, patterns, and best practices for the HSP project. The HSP system uses Java 25 Virtual Threads for concurrent HTTP polling and must ensure thread-safe operations for shared state.
🎯 Thread Safety Requirements
1. Critical Thread-Safe Components
The following components MUST be thread-safe per requirements:
| Component | Requirement | Concurrency Pattern | Justification |
|---|---|---|---|
| BufferManager | Req-FR-26, Arch-7 | Thread-safe queue | Multiple producers (HTTP pollers), single consumer (gRPC transmitter) |
| CollectionStatistics | Req-NFR-8, Arch-8 | Atomic counters | Multiple threads updating statistics concurrently |
| DataCollectionService | Req-FR-14, Arch-6 | Virtual threads | 1000 concurrent HTTP polling tasks |
| RateLimiter | Enhancement | Thread-safe rate limiting | Multiple threads requesting rate limit permits |
| BackpressureController | Req-FR-27 | Atomic monitoring | Buffer usage checked by multiple threads |
2. Immutable Components (Thread-Safe by Design)
The following components MUST be immutable:
| Component | Type | Thread Safety |
|---|---|---|
| DiagnosticData | Value Object | Immutable (final fields, no setters) |
| Configuration | Value Object | Immutable (loaded once at startup) |
| HealthCheckResponse | Value Object | Immutable (snapshot of current state) |
| BufferStatistics | Value Object | Immutable (snapshot of buffer state) |
3. Single-Threaded Components (No Thread Safety Needed)
The following components run on dedicated threads (no concurrent access):
| Component | Thread Model | Justification |
|---|---|---|
| DataTransmissionService | Single consumer thread | Req-FR-25: One thread consumes from buffer |
| ConfigurationManager | Startup only | Loaded once before concurrent operations start |
| Adapters | Per-request isolation | Each request creates new adapter instance or uses thread-local state |
🧵 Concurrency Model Overview
System Threading Architecture
┌─────────────────────────────────────────────────────────┐
│ HTTP Sender Plugin (HSP) Threading Model │
├─────────────────────────────────────────────────────────┤
│ │
│ Main Thread │
│ └─> Startup & Configuration │
│ │
│ Virtual Thread Pool (HTTP Polling) │
│ ├─> Virtual Thread 1 → HttpPollingAdapter │
│ ├─> Virtual Thread 2 → HttpPollingAdapter │
│ ├─> Virtual Thread 3 → HttpPollingAdapter │
│ └─> ... (up to 1000 concurrent virtual threads) │
│ ↓ │
│ [Thread-Safe BufferManager] (ArrayBlockingQueue) │
│ ↓ │
│ Single Consumer Thread (gRPC Transmission) │
│ └─> DataTransmissionService → GrpcStreamAdapter │
│ │
│ Health Check HTTP Server Thread │
│ └─> HealthCheckController (embedded Jetty) │
│ │
└─────────────────────────────────────────────────────────┘
Virtual Threads (Java 25)
Why Virtual Threads?
- Requirement: Req-NFR-1: Support 1000 concurrent endpoints
- Benefit: Lightweight threads (millions possible vs. thousands of platform threads)
- Use Case: I/O-bound HTTP polling (mostly waiting for network responses)
Virtual Thread Best Practices:
- ✅ DO: Use for I/O-bound tasks (HTTP requests, file I/O)
- ✅ DO: Create one virtual thread per endpoint poll
- ✅ DO: Let virtual threads block (don't use async APIs unnecessarily)
- ❌ DON'T: Use for CPU-bound tasks (use platform threads instead)
- ❌ DON'T: Use with
synchronizedon long-running operations (useReentrantLock)
Creating Virtual Threads:
// CORRECT: Virtual thread executor for HTTP polling
ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor();
// Schedule polling tasks
for (String url : endpoints) {
executor.submit(() -> pollEndpoint(url));
}
🔒 Thread Safety Patterns
Pattern 1: Immutability (Preferred)
When to Use: Value objects, configuration, data transfer objects
Benefits:
- Thread-safe by design (no synchronization needed)
- No defensive copying required
- Easier to reason about
Implementation:
/**
* Immutable value object representing diagnostic data.
* Thread-safe by design (immutable).
*
* @requirement Req-FR-22 Immutable data representation
*/
public final class DiagnosticData {
private final String endpointUrl;
private final byte[] data;
private final Instant timestamp;
public DiagnosticData(String endpointUrl, byte[] data) {
this.endpointUrl = Objects.requireNonNull(endpointUrl);
// Defensive copy of mutable array
this.data = Arrays.copyOf(data, data.length);
this.timestamp = Instant.now();
}
// Only getters, no setters
public String getEndpointUrl() {
return endpointUrl;
}
public byte[] getData() {
// Return defensive copy
return Arrays.copyOf(data, data.length);
}
public Instant getTimestamp() {
return timestamp; // Instant is immutable
}
// equals, hashCode, toString...
}
Checklist:
- Class declared
final(cannot be subclassed) - All fields declared
final(assigned once in constructor) - No setter methods (only getters)
- Defensive copies for mutable fields (arrays, collections)
- Getters return defensive copies of mutable fields
Pattern 2: Concurrent Collections
When to Use: Shared data structures accessed by multiple threads
Preferred Collections:
ArrayBlockingQueue<T>: Fixed-size blocking queue (buffer)ConcurrentHashMap<K,V>: Thread-safe map (if needed)CopyOnWriteArrayList<T>: Thread-safe list for read-heavy workloads
Implementation (BufferManager):
/**
* Thread-safe buffer manager using ArrayBlockingQueue.
*
* <p>Supports multiple producers (HTTP polling threads) and single
* consumer (gRPC transmission thread).
*
* @requirement Req-FR-26 Thread-safe buffer
* @requirement Req-Arch-7 Concurrent collection usage
*/
public class BufferManager {
private final BlockingQueue<DiagnosticData> buffer;
private final int capacity;
// Statistics (atomic counters)
private final AtomicLong totalOffered = new AtomicLong(0);
private final AtomicLong totalDiscarded = new AtomicLong(0);
public BufferManager(int capacity) {
this.capacity = capacity;
// ArrayBlockingQueue is thread-safe
this.buffer = new ArrayBlockingQueue<>(capacity);
}
/**
* Offers data to buffer. Thread-safe.
* Discards oldest if buffer is full (FIFO).
*
* @requirement Req-FR-27 FIFO overflow handling
*/
public void offer(DiagnosticData data) {
Objects.requireNonNull(data, "data cannot be null");
totalOffered.incrementAndGet();
if (!buffer.offer(data)) {
// Buffer full, discard oldest (FIFO)
buffer.poll(); // Remove oldest
buffer.offer(data); // Add new
totalDiscarded.incrementAndGet();
}
}
/**
* Polls data from buffer. Thread-safe.
* Blocks if buffer is empty (up to timeout).
*/
public DiagnosticData poll(long timeout, TimeUnit unit)
throws InterruptedException {
return buffer.poll(timeout, unit);
}
/**
* Returns current buffer size. Thread-safe.
*/
public int size() {
return buffer.size();
}
/**
* Returns buffer statistics snapshot. Thread-safe.
*/
public BufferStatistics getStatistics() {
return new BufferStatistics(
size(),
capacity,
totalOffered.get(),
totalDiscarded.get()
);
}
}
Checklist:
- Use
BlockingQueuefor producer-consumer patterns - Use
ArrayBlockingQueuefor bounded buffers - Use
ConcurrentHashMapfor thread-safe maps - Avoid
synchronizedon collection itself (use concurrent collection)
Pattern 3: Atomic Variables
When to Use: Counters, flags, simple shared state
Atomic Classes:
AtomicInteger: Thread-safe integer counterAtomicLong: Thread-safe long counterAtomicBoolean: Thread-safe boolean flagAtomicReference<T>: Thread-safe object reference
Implementation (CollectionStatistics):
/**
* Thread-safe collection statistics using atomic variables.
*
* @requirement Req-NFR-8 Statistics tracking
* @requirement Req-Arch-8 Atomic operations
*/
public class CollectionStatistics {
// Atomic counters for thread safety
private final AtomicLong totalPolls = new AtomicLong(0);
private final AtomicLong successfulPolls = new AtomicLong(0);
private final AtomicLong failedPolls = new AtomicLong(0);
// Time-windowed metrics (last 30 seconds)
private final Queue<Long> recentPolls = new ConcurrentLinkedQueue<>();
/**
* Increments total poll count. Thread-safe.
*/
public void incrementTotalPolls() {
long count = totalPolls.incrementAndGet();
recentPolls.offer(System.currentTimeMillis());
cleanupOldMetrics();
}
/**
* Increments successful poll count. Thread-safe.
*/
public void incrementSuccessfulPolls() {
successfulPolls.incrementAndGet();
}
/**
* Increments failed poll count. Thread-safe.
*/
public void incrementFailedPolls() {
failedPolls.incrementAndGet();
}
/**
* Returns snapshot of current statistics. Thread-safe.
*/
public StatisticsSnapshot getSnapshot() {
return new StatisticsSnapshot(
totalPolls.get(),
successfulPolls.get(),
failedPolls.get(),
calculateRecentRate()
);
}
/**
* Removes metrics older than 30 seconds.
*/
private void cleanupOldMetrics() {
long cutoff = System.currentTimeMillis() - 30_000;
recentPolls.removeIf(timestamp -> timestamp < cutoff);
}
private double calculateRecentRate() {
return recentPolls.size() / 30.0; // polls per second
}
}
Checklist:
- Use
AtomicLongfor counters (notlongwithsynchronized) - Use
incrementAndGet()for atomic increment-and-read - Use
get()for atomic read - Use
compareAndSet()for atomic compare-and-swap (if needed)
Pattern 4: Locks (When Needed)
When to Use: Complex synchronized operations, multiple state updates
Lock Types:
ReentrantLock: Exclusive lock (mutual exclusion)ReentrantReadWriteLock: Read-write lock (multiple readers, one writer)StampedLock: Optimistic locking (Java 8+)
Implementation (if needed):
public class RateLimitedAdapter {
private final ReentrantLock lock = new ReentrantLock();
private long lastRequestTime = 0;
private final long minIntervalMs;
/**
* Thread-safe rate limiting with explicit lock.
*
* Prefer ReentrantLock over synchronized for virtual threads.
*/
public void acquirePermit() throws InterruptedException {
lock.lock();
try {
long now = System.currentTimeMillis();
long elapsed = now - lastRequestTime;
if (elapsed < minIntervalMs) {
Thread.sleep(minIntervalMs - elapsed);
}
lastRequestTime = System.currentTimeMillis();
} finally {
lock.unlock(); // ALWAYS unlock in finally
}
}
}
Checklist:
- Use
ReentrantLockinstead ofsynchronizedfor virtual threads - Always unlock in
finallyblock - Avoid holding locks during I/O operations (risk of pinning virtual threads)
- Document lock ordering if multiple locks used (prevent deadlock)
Pattern 5: Thread Confinement
When to Use: State that doesn't need to be shared
Strategies:
- Stack Confinement: Use local variables (method parameters, local vars)
- Thread-Local: Use
ThreadLocal<T>for thread-specific state - Instance-Per-Thread: Create new instances per thread
Implementation:
/**
* HttpPollingAdapter is thread-confined by design.
* Each virtual thread creates its own adapter instance.
* No shared mutable state → no thread safety needed.
*/
public class HttpPollingAdapter implements IHttpPollingPort {
// Immutable configuration (thread-safe)
private final Configuration config;
// Thread-confined HttpClient (one per instance)
private final HttpClient httpClient;
public HttpPollingAdapter(Configuration config) {
this.config = config; // Immutable
this.httpClient = HttpClient.newBuilder()
.connectTimeout(Duration.ofSeconds(30))
.build();
}
@Override
public CompletableFuture<byte[]> pollEndpoint(String url) {
// This method is called by a single virtual thread
// No shared mutable state → thread-safe by design
return httpClient.sendAsync(buildRequest(url), BodyHandlers.ofByteArray())
.thenApply(HttpResponse::body);
}
}
Checklist:
- Prefer immutability and thread confinement over synchronization
- Document thread ownership in Javadoc
- Avoid sharing mutable state when possible
🧪 Testing Thread Safety
Test Strategy
1. Unit Tests with Concurrent Access:
@Test
void shouldBeThreadSafe_whenMultipleThreadsOfferConcurrently() {
// Given
BufferManager buffer = new BufferManager(100);
int numThreads = 50;
int offersPerThread = 100;
// When: Multiple threads offer concurrently
ExecutorService executor = Executors.newFixedThreadPool(numThreads);
List<Future<?>> futures = new ArrayList<>();
for (int i = 0; i < numThreads; i++) {
futures.add(executor.submit(() -> {
for (int j = 0; j < offersPerThread; j++) {
buffer.offer(new DiagnosticData("url", new byte[]{1,2,3}));
}
}));
}
// Wait for completion
for (Future<?> future : futures) {
future.get();
}
executor.shutdown();
// Then: All offers processed (no data loss except overflow)
BufferStatistics stats = buffer.getStatistics();
assertThat(stats.totalOffered()).isEqualTo(numThreads * offersPerThread);
}
2. Stress Tests:
@Test
void shouldHandleHighConcurrency_with1000ProducersAndConsumers() {
BufferManager buffer = new BufferManager(300);
// 1000 producers
ExecutorService producers = Executors.newVirtualThreadPerTaskExecutor();
for (int i = 0; i < 1000; i++) {
producers.submit(() -> {
for (int j = 0; j < 1000; j++) {
buffer.offer(new DiagnosticData("url", new byte[]{1,2,3}));
}
});
}
// 1 consumer
ExecutorService consumer = Executors.newSingleThreadExecutor();
AtomicLong consumed = new AtomicLong(0);
consumer.submit(() -> {
while (consumed.get() < 1_000_000) {
try {
DiagnosticData data = buffer.poll(1, TimeUnit.SECONDS);
if (data != null) {
consumed.incrementAndGet();
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
break;
}
}
});
// Wait for completion and verify
producers.shutdown();
producers.awaitTermination(10, TimeUnit.MINUTES);
consumer.shutdown();
consumer.awaitTermination(1, TimeUnit.MINUTES);
// Verify: No deadlock, no data corruption
assertThat(consumed.get()).isGreaterThan(0);
}
3. Race Condition Detection:
@Test
void shouldNotHaveRaceCondition_inAtomicIncrement() {
CollectionStatistics stats = new CollectionStatistics();
int numThreads = 100;
int incrementsPerThread = 10000;
ExecutorService executor = Executors.newFixedThreadPool(numThreads);
for (int i = 0; i < numThreads; i++) {
executor.submit(() -> {
for (int j = 0; j < incrementsPerThread; j++) {
stats.incrementTotalPolls();
}
});
}
executor.shutdown();
executor.awaitTermination(1, TimeUnit.MINUTES);
// Verify: Exact count (no lost updates)
assertThat(stats.getSnapshot().totalPolls())
.isEqualTo((long) numThreads * incrementsPerThread);
}
Thread Safety Test Checklist
- Concurrent access tests: Multiple threads accessing shared state
- Stress tests: 100+ threads, 1000+ operations per thread
- Race condition tests: Verify atomic operations (no lost updates)
- Deadlock tests: Complex locking scenarios (if applicable)
- Immutability tests: Verify no setters, defensive copies
🚨 Common Thread Safety Mistakes
Mistake 1: Non-Atomic Check-Then-Act
❌ WRONG: Race condition
public void offer(DiagnosticData data) {
if (buffer.size() < capacity) { // Check
buffer.add(data); // Act (another thread may have added meanwhile)
}
}
✅ CORRECT: Atomic operation
public void offer(DiagnosticData data) {
buffer.offer(data); // ArrayBlockingQueue handles atomicity
}
Mistake 2: Mutable Shared State
❌ WRONG: Mutable shared field
public class Statistics {
private long totalPolls = 0; // Not thread-safe!
public void increment() {
totalPolls++; // Race condition: read-modify-write
}
}
✅ CORRECT: Atomic variable
public class Statistics {
private final AtomicLong totalPolls = new AtomicLong(0);
public void increment() {
totalPolls.incrementAndGet(); // Atomic operation
}
}
Mistake 3: Exposing Mutable Internal State
❌ WRONG: Exposing internal array
public class DiagnosticData {
private final byte[] data;
public byte[] getData() {
return data; // Caller can modify internal state!
}
}
✅ CORRECT: Defensive copy
public class DiagnosticData {
private final byte[] data;
public byte[] getData() {
return Arrays.copyOf(data, data.length); // Safe copy
}
}
Mistake 4: Synchronized on Long-Running Operation
❌ WRONG: Holding lock during I/O (blocks virtual threads)
public synchronized byte[] pollEndpoint(String url) {
return httpClient.send(request).body(); // I/O while holding lock!
}
✅ CORRECT: No synchronization for thread-confined code
public byte[] pollEndpoint(String url) {
// Each thread has its own adapter instance
return httpClient.send(request).body(); // No shared state
}
Mistake 5: Inconsistent Locking
❌ WRONG: Inconsistent synchronization
public void increment() {
synchronized (this) {
count++;
}
}
public int getCount() {
return count; // Not synchronized! Can see stale value
}
✅ CORRECT: Consistent synchronization or atomic variable
private final AtomicInteger count = new AtomicInteger(0);
public void increment() {
count.incrementAndGet();
}
public int getCount() {
return count.get();
}
📊 Thread Safety Review Checklist
Use this checklist during code reviews:
Immutability
- Value objects are
finalclasses - All fields are
final - No setter methods
- Defensive copies for mutable fields (arrays, collections)
- Getters return defensive copies of mutable fields
Concurrent Collections
- Use
BlockingQueuefor producer-consumer patterns - Use
ArrayBlockingQueuefor bounded buffers - Use
ConcurrentHashMapfor thread-safe maps - Avoid manual synchronization on collections
Atomic Variables
- Use
AtomicLong/AtomicIntegerfor counters - Use atomic operations (
incrementAndGet,get,compareAndSet) - No read-modify-write with plain
long/int
Locks
- Use
ReentrantLockinstead ofsynchronized(for virtual threads) - Always unlock in
finallyblock - No I/O operations while holding lock
- Lock ordering documented (if multiple locks)
Virtual Threads
- Virtual threads used for I/O-bound tasks
- No
synchronizedon long-running operations - No CPU-bound work in virtual threads
Testing
- Concurrent access tests exist (multiple threads)
- Stress tests exist (100+ threads, 1000+ operations)
- Race condition tests verify atomic operations
- No flaky tests (deterministic results)
📚 Resources
Java Concurrency References
- "Java Concurrency in Practice" by Brian Goetz (Chapter 2-5)
- JDK 25 Documentation: Virtual Threads (JEP 444)
- Java Memory Model (JLS §17.4)
Internal Documentation
🎯 Summary: Thread Safety Mindset
"Thread safety is not optional—it's correctness."
The Thread Safety Hierarchy (Prefer in Order)
- Immutability (best): No shared mutable state
- Thread Confinement: State not shared between threads
- Concurrent Collections: Use built-in thread-safe collections
- Atomic Variables: For simple shared state (counters, flags)
- Locks: For complex synchronized operations (last resort)
Key Principles
- Design for immutability first: Mutable shared state is the enemy
- Prefer composition over manual synchronization: Use
BlockingQueue,AtomicLong, etc. - Test concurrency explicitly: Don't rely on "it works in single-threaded tests"
- Document thread safety: Javadoc must state thread safety guarantees
When in doubt, ask: "What happens if two threads call this at the same time?"
Document Control:
- Version: 1.0
- Created: 2025-11-20
- Status: Active
- Review Cycle: After each sprint