In our previous posts, we explored the vision behind Cipher Horizon and its architectural foundations. Today, we'll dive deep into the crucial aspects of API design and data persistence that form the backbone of our microservice ecosystem.
1. API Design Strategy: The Art of Service Communication
Understanding Our API Design Philosophy
In developing Cipher Horizon, we faced a critical decision regarding API design. The modern landscape offers multiple approaches, each with its strengths. Our analysis led us to adopt a hybrid approach, combining REST and GraphQL, each serving specific purposes in our ecosystem.
Why We Chose a Hybrid Approach
The decision to implement both REST and GraphQL wasn't made lightly. We recognized that different parts of our system had varying requirements:
REST for CRUD Operations:
- Simple, stateless operations benefit from REST's straightforward nature
- Better caching capabilities for frequently accessed resources
- Easier to understand and implement for basic operations
- More suitable for file uploads and simple data exchanges
GraphQL for Complex Data Requirements:
- Reduces over-fetching and under-fetching of data
- Provides flexible queries for frontend teams
- Better handles complex, nested data relationships
- Enables efficient batch queries
Here's how we implemented this strategy:
// REST Implementation for Simple Operations
@Controller('api/v1/users')
export class UserController {
  constructor(private readonly userService: UserService) {}

  @Get(':id')
  @UseInterceptors(CacheInterceptor)
  async getUser(@Param('id') id: string): Promise<UserResponse> {
    // Simple, cached user retrieval
    return this.userService.findById(id);
  }

  @Post()
  @UseGuards(AuthGuard)
  async createUser(@Body() dto: CreateUserDto): Promise<UserResponse> {
    // Straightforward user creation
    return this.userService.create(dto);
  }
}
// GraphQL Implementation for Complex Queries
const typeDefs = gql`
  type User {
    id: ID!
    profile: Profile!
    transactions: [Transaction!]!
    analytics: UserAnalytics!
    recommendations: [Recommendation!]!
  }

  type UserDashboardData {
    user: User!
    transactions: [Transaction!]!
    analytics: UserAnalytics!
    recommendations: [Recommendation!]!
  }

  type Query {
    userDashboard(id: ID!): UserDashboardData!
  }
`;
@Resolver('User')
class UserResolver {
  constructor(
    private readonly userService: UserService,
    private readonly transactionService: TransactionService,
    private readonly analyticsService: AnalyticsService,
    private readonly recommendationService: RecommendationService
  ) {}

  @Query()
  async userDashboard(@Args('id') id: string) {
    // Complex data aggregation in a single query
    const [user, transactions, analytics, recommendations] = await Promise.all([
      this.userService.findById(id),
      this.transactionService.getUserTransactions(id),
      this.analyticsService.getUserAnalytics(id),
      this.recommendationService.getForUser(id)
    ]);
    return {
      user,
      transactions,
      analytics,
      recommendations
    };
  }
}
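One caveat with a resolver like this: nested fields such as transactions can trigger the classic N+1 query problem when many users are resolved in a single request. A common mitigation is request-scoped batching with the dataloader package. The sketch below assumes a batch-capable findByIds method on the user service, which is illustrative rather than part of our actual codebase:

import DataLoader from 'dataloader';

// Request-scoped loader: every userLoader.load(id) call made in the same
// tick is collected and resolved with a single batched lookup
function createUserLoader(userService: UserService): DataLoader<string, User> {
  return new DataLoader<string, User>(async (ids) => {
    const users = await userService.findByIds([...ids]); // assumed batch method
    const byId = new Map<string, User>(users.map((user) => [user.id, user]));
    // DataLoader requires results in the same order as the requested keys
    return ids.map((id) => byId.get(id) ?? new Error(`User ${id} not found`));
  });
}

// In a resolver: const user = await userLoader.load(transaction.userId);

A fresh loader per request keeps the batch cache from leaking data across users while still collapsing duplicate lookups within one query.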
API Versioning Strategy
We implemented a robust versioning strategy to ensure backward compatibility while allowing for evolution:
// Version Management Through URL Path
@Controller('api/v1')
class ApiV1Controller {
  // V1 implementations
}

@Controller('api/v2')
class ApiV2Controller {
  // V2 implementations with new features
}

// Version Management Through Media Type (NestJS built-in versioning).
// Note: setting a response Content-Type header cannot route requests;
// instead, clients select a version via the Accept header,
// e.g. Accept: application/json;v=2
@Controller('api')
class ApiController {
  constructor(
    private readonly userServiceV1: UserServiceV1,
    private readonly userServiceV2: UserServiceV2
  ) {}

  @Get('users/:id')
  @Version('1')
  getUserV1(@Param('id') id: string): Promise<UserV1Response> {
    return this.userServiceV1.findById(id);
  }

  @Get('users/:id')
  @Version('2')
  getUserV2(@Param('id') id: string): Promise<UserV2Response> {
    return this.userServiceV2.findById(id);
  }
}
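For the media-type variant to work, versioning has to be enabled at bootstrap. A minimal sketch, assuming NestJS 8+ where enableVersioning is available:

import { NestFactory } from '@nestjs/core';
import { VersioningType } from '@nestjs/common';
import { AppModule } from './app.module';

async function bootstrap() {
  const app = await NestFactory.create(AppModule);
  // Route requests by the version embedded in the Accept header,
  // e.g. Accept: application/json;v=2
  app.enableVersioning({
    type: VersioningType.MEDIA_TYPE,
    key: 'v=',
  });
  await app.listen(3000);
}
bootstrap();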
Pagination and Filtering Implementation
We implemented cursor-based pagination for optimal performance with large datasets:
interface PaginationParams {
  cursor?: string;
  limit: number;
  direction: 'forward' | 'backward';
}

interface PaginatedResult<T> {
  items: T[];
  pageInfo: {
    hasNextPage: boolean;
    endCursor: string | null;
  };
}

class CursorPagination {
  async paginate<T>(
    query: QueryBuilder<T>,
    params: PaginationParams
  ): Promise<PaginatedResult<T>> {
    const decodedCursor = params.cursor
      ? this.decodeCursor(params.cursor)
      : null;
    // Seek past the cursor position; '1=1' is a no-op filter for the first page
    const comparator = params.direction === 'backward' ? 'id < :id' : 'id > :id';
    const results = await query
      .where(decodedCursor ? comparator : '1=1', {
        id: decodedCursor?.id
      })
      .take(params.limit + 1) // Fetch one extra row to detect a next page
      .getMany();
    const hasMore = results.length > params.limit;
    const items = hasMore ? results.slice(0, -1) : results;
    return {
      items,
      pageInfo: {
        hasNextPage: hasMore,
        endCursor: items.length > 0
          ? this.encodeCursor(items[items.length - 1])
          : null
      }
    };
  }

  private encodeCursor(item: any): string {
    return Buffer.from(JSON.stringify({
      id: item.id,
      timestamp: item.createdAt
    })).toString('base64');
  }

  private decodeCursor(cursor: string): any {
    return JSON.parse(
      Buffer.from(cursor, 'base64').toString('utf-8')
    );
  }
}
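To make the flow concrete, here's a hypothetical endpoint that wires CursorPagination into a listing route; the query-parameter handling is simplified for illustration:

@Controller('api/v1/users')
export class UserListController {
  private readonly pagination = new CursorPagination();

  constructor(
    @InjectRepository(User)
    private readonly users: Repository<User>
  ) {}

  @Get()
  async listUsers(
    @Query('limit') limit = '10',
    @Query('cursor') cursor?: string
  ): Promise<PaginatedResult<User>> {
    // Stable ordering keeps cursor comparisons deterministic
    const qb = this.users.createQueryBuilder('user').orderBy('user.id', 'ASC');
    return this.pagination.paginate(qb, {
      cursor,
      limit: Number(limit),
      direction: 'forward'
    });
  }
}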
Error Handling and Response Standardization
We implemented a consistent error-handling strategy across all APIs:
class ApiError extends Error {
  constructor(
    public readonly code: string,
    public readonly message: string,
    public readonly status: number,
    public readonly details?: any
  ) {
    super(message);
  }
}

@Catch()
export class GlobalExceptionFilter implements ExceptionFilter {
  catch(exception: unknown, host: ArgumentsHost) {
    const ctx = host.switchToHttp();
    const response = ctx.getResponse();
    const error = this.normalizeError(exception);
    response
      .status(error.status)
      .json({
        code: error.code,
        message: error.message,
        details: error.details,
        timestamp: new Date().toISOString()
      });
  }

  private normalizeError(error: unknown): ApiError {
    if (error instanceof ApiError) {
      return error;
    }
    // Handle other types of errors
    return new ApiError(
      'INTERNAL_ERROR',
      'An unexpected error occurred',
      500
    );
  }
}
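With the filter in place, services can throw typed errors and rely on uniform serialization. A small illustrative example; the USER_NOT_FOUND code and repository wiring are ours to choose, not fixed conventions:

@Injectable()
export class UserService {
  constructor(
    @InjectRepository(User)
    private readonly repo: Repository<User>
  ) {}

  async findById(id: string): Promise<User> {
    const user = await this.repo.findOneBy({ id });
    if (!user) {
      // The GlobalExceptionFilter turns this into a 404 JSON response
      throw new ApiError(
        'USER_NOT_FOUND',
        `User ${id} does not exist`,
        404,
        { userId: id }
      );
    }
    return user;
  }
}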
API Documentation and Testing
We prioritized comprehensive documentation and testing:
// Swagger Documentation
@ApiTags('Users')
@Controller('api/v1/users')
export class UserController {
  @ApiOperation({ summary: 'Get user by ID' })
  @ApiResponse({ status: 200, type: UserResponse })
  @ApiResponse({ status: 404, type: ErrorResponse })
  @Get(':id')
  async getUser(@Param('id') id: string): Promise<UserResponse> {
    return this.userService.findById(id);
  }
}

// Integration Testing
describe('UserAPI', () => {
  it('should handle pagination correctly', async () => {
    // Create test data
    const users = await createTestUsers(15);

    // First page
    const firstPage = await request(app)
      .get('/api/v1/users')
      .query({ limit: 10 });
    expect(firstPage.body.items).toHaveLength(10);
    expect(firstPage.body.pageInfo.hasNextPage).toBe(true);

    // Second page
    const secondPage = await request(app)
      .get('/api/v1/users')
      .query({
        limit: 10,
        cursor: firstPage.body.pageInfo.endCursor
      });
    expect(secondPage.body.items).toHaveLength(5);
    expect(secondPage.body.pageInfo.hasNextPage).toBe(false);
  });
});
2. Data Persistence Strategy: The Foundation of Data Management
Understanding Our Multi-Database Approach
In Cipher Horizon, we adopted a polyglot persistence strategy, recognizing that different types of data have different requirements. This section explains our reasoning and implementation details.
Why Multiple Database Types?
Our analysis revealed distinct data patterns that benefited from different storage solutions:
Transactional Data (PostgreSQL):
- ACID compliance for financial transactions
- Complex relationships between entities
- Strong consistency requirements
- Structured data with clear schemas
Document Storage (MongoDB):
- Flexible schema for evolving data structures
- Rich querying capabilities
- Better handling of nested data
- Horizontal scaling capabilities
Caching Layer (Redis):
- High-performance data access
- Session management
- Real-time features
- Temporary data storage
Here's how we implemented this strategy:
// Database Configuration Management
@Injectable()
class DatabaseConfigService {
  constructor(
    @Inject('CONFIG')
    private readonly config: Configuration
  ) {}

  getPostgresConfig(): TypeOrmModuleOptions {
    return {
      type: 'postgres',
      host: this.config.get('POSTGRES_HOST'),
      port: this.config.get('POSTGRES_PORT'),
      username: this.config.get('POSTGRES_USER'),
      password: this.config.get('POSTGRES_PASSWORD'),
      database: this.config.get('POSTGRES_DB'),
      entities: ['dist/**/*.entity{.ts,.js}'],
      migrations: ['dist/migrations/*{.ts,.js}'],
      ssl: this.config.get('NODE_ENV') === 'production',
      poolSize: 20,
      retryAttempts: 3,
      retryDelay: 3000,
      logging: this.config.get('NODE_ENV') === 'development'
    };
  }

  getMongoConfig(): MongooseModuleOptions {
    return {
      uri: this.config.get('MONGODB_URI'),
      // useNewUrlParser and useUnifiedTopology are no-ops since Mongoose 6
      // and can be omitted entirely
      maxPoolSize: 50,
      serverSelectionTimeoutMS: 5000,
      socketTimeoutMS: 45000,
    };
  }

  getRedisConfig(): RedisOptions {
    return {
      host: this.config.get('REDIS_HOST'),
      port: this.config.get('REDIS_PORT'),
      password: this.config.get('REDIS_PASSWORD'),
      tls: this.config.get('NODE_ENV') === 'production' ? {} : undefined,
      maxRetriesPerRequest: 3,
      enableReadyCheck: true,
    };
  }
}
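Wiring this configuration service into the application might look like the sketch below; the module layout and ConfigModule import are assumptions for illustration:

@Module({
  imports: [
    // Resolve each connection's options from DatabaseConfigService at startup
    TypeOrmModule.forRootAsync({
      imports: [ConfigModule],
      inject: [DatabaseConfigService],
      useFactory: (db: DatabaseConfigService) => db.getPostgresConfig(),
    }),
    MongooseModule.forRootAsync({
      imports: [ConfigModule],
      inject: [DatabaseConfigService],
      useFactory: (db: DatabaseConfigService) => db.getMongoConfig(),
    }),
  ],
})
export class PersistenceModule {}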
Implementation of Data Models
PostgreSQL Entities:
@Entity()
export class Transaction {
  @PrimaryGeneratedColumn('uuid')
  id: string;

  @Column('decimal', { precision: 10, scale: 2 })
  amount: number;

  @Column('jsonb')
  metadata: Record<string, any>;

  @ManyToOne(() => User)
  @JoinColumn()
  user: User;

  @CreateDateColumn()
  createdAt: Date;

  @UpdateDateColumn()
  updatedAt: Date;

  @Version()
  version: number;

  @BeforeInsert()
  async beforeInsert() {
    // Validation and data preparation logic
    await this.validateAmount();
  }

  private async validateAmount(): Promise<void> {
    if (this.amount == null || this.amount <= 0) {
      throw new Error('Transaction amount must be a positive value');
    }
  }
}

// Repository Pattern Implementation
@Injectable()
export class TransactionRepository {
  constructor(
    @InjectRepository(Transaction)
    private readonly repo: Repository<Transaction>,
    private readonly logger: Logger
  ) {}

  async createWithRetry(data: CreateTransactionDto): Promise<Transaction> {
    const maxRetries = 3;
    let lastError: Error | undefined;
    for (let attempt = 1; attempt <= maxRetries; attempt++) {
      try {
        return await this.repo.save(this.repo.create(data));
      } catch (error) {
        lastError = error as Error;
        if (!this.isRetryableError(error)) {
          throw error;
        }
        this.logger.warn(`Retrying transaction save (attempt ${attempt})`);
        await this.delay(2 ** attempt * 1000); // Exponential backoff
      }
    }
    throw new MaxRetriesExceededError(lastError);
  }

  private isRetryableError(error: unknown): boolean {
    // Treat transient database failures (deadlocks, dropped connections) as retryable
    return error instanceof QueryFailedError;
  }

  private delay(ms: number): Promise<void> {
    return new Promise((resolve) => setTimeout(resolve, ms));
  }
}
MongoDB Schemas:
@Schema({ timestamps: true })
export class UserProfile {
  @Prop({ required: true })
  userId: string;

  @Prop({ type: Object })
  preferences: {
    theme: string;
    notifications: NotificationSettings;
    customFields?: Record<string, any>;
  };

  @Prop({ type: mongoose.Schema.Types.Mixed })
  metadata: Record<string, any>;

  @Prop({ default: 1 })
  schemaVersion: number;
}

export const UserProfileSchema = SchemaFactory.createForClass(UserProfile);

// Service Implementation
@Injectable()
export class UserProfileService {
  constructor(
    @InjectModel(UserProfile.name)
    private readonly profileModel: Model<UserProfile>,
    private readonly migrationService: MigrationService
  ) {}

  async findAndUpdate(
    userId: string,
    update: Partial<UserProfile>
  ): Promise<UserProfile> {
    const session = await this.profileModel.startSession();
    session.startTransaction();
    try {
      const profile = await this.profileModel
        .findOneAndUpdate(
          { userId },
          { $set: update },
          { new: true, session }
        )
        .exec();
      if (!profile) {
        throw new ProfileNotFoundError(userId);
      }
      // Lazily migrate documents written under an older schema version
      if (profile.schemaVersion < CURRENT_SCHEMA_VERSION) {
        await this.migrationService.migrateProfile(profile, session);
      }
      await session.commitTransaction();
      return profile;
    } catch (error) {
      await session.abortTransaction();
      throw error;
    } finally {
      session.endSession();
    }
  }
}
Redis Cache Implementation:
@Injectable()
export class CacheService {
  constructor(
    private readonly redis: Redis,
    private readonly logger: Logger,
    private readonly config: CacheConfig
  ) {}

  async setCached<T>(
    key: string,
    data: T,
    options: CacheOptions = {}
  ): Promise<void> {
    const {
      ttl = this.config.defaultTTL,
      tags = []
    } = options;
    const multi = this.redis.multi();
    // Store the main data
    multi.set(
      this.formatKey(key),
      JSON.stringify(data),
      'EX',
      ttl
    );
    // Store tag associations for cache invalidation
    for (const tag of tags) {
      multi.sadd(this.formatTagKey(tag), key);
    }
    try {
      await multi.exec();
    } catch (error) {
      this.logger.error(`Cache set failed for key ${key}:`, error);
      throw new CacheOperationError(error);
    }
  }

  async invalidateByTag(tag: string): Promise<void> {
    const keys = await this.redis.smembers(this.formatTagKey(tag));
    if (keys.length > 0) {
      const multi = this.redis.multi();
      // Delete all keys associated with the tag (the arrow function preserves
      // `this` for formatKey, unlike a bare method reference)
      multi.del(...keys.map((key) => this.formatKey(key)));
      // Delete the tag set itself
      multi.del(this.formatTagKey(tag));
      await multi.exec();
    }
  }

  private formatKey(key: string): string {
    return `${this.config.keyPrefix}:${key}`;
  }

  private formatTagKey(tag: string): string {
    return `${this.config.keyPrefix}:tag:${tag}`;
  }
}
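A quick usage sketch, with illustrative key and tag names:

// Cache a user's dashboard payload for five minutes, tagged by user
await cacheService.setCached(`dashboard:${userId}`, dashboard, {
  ttl: 300,
  tags: [`user:${userId}`],
});

// When the user's data changes, drop every entry carrying that tag
await cacheService.invalidateByTag(`user:${userId}`);

Tag-based invalidation spares us from guessing key patterns: any write path that touches a user simply invalidates that user's tag.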
Data Migration Strategy
@Injectable()
export class MigrationService {
  constructor(
    private readonly mongoClient: MongoClient,
    private readonly postgres: Connection,
    private readonly logger: Logger
  ) {}

  async executeMigration(
    migration: Migration
  ): Promise<MigrationResult> {
    const session = this.mongoClient.startSession();
    session.startTransaction();
    try {
      // Execute migration steps
      const result = await migration.up(session);
      // Update migration metadata
      await this.updateMigrationMetadata(migration.version, session);
      await session.commitTransaction();
      return result;
    } catch (error) {
      await session.abortTransaction();
      this.logger.error(`Migration ${migration.version} failed:`, error);
      throw error;
    } finally {
      session.endSession();
    }
  }

  private async updateMigrationMetadata(
    version: string,
    session: ClientSession
  ): Promise<void> {
    await this.mongoClient
      .db()
      .collection('migrations')
      .updateOne(
        { version },
        {
          $set: {
            executedAt: new Date(),
            status: 'completed'
          }
        },
        { session }
      );
  }
}
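The Migration contract itself isn't shown above. A minimal shape consistent with how executeMigration uses it might look like this; the field names and the example backfill are assumptions:

// Hypothetical contract relied on by executeMigration()
interface Migration {
  version: string;
  up(session: ClientSession): Promise<MigrationResult>;
  down?(session: ClientSession): Promise<void>; // optional rollback
}

// Example: backfill schemaVersion on legacy profile documents
const addSchemaVersion = (db: Db): Migration => ({
  version: 'add-schema-version-v1',
  async up(session) {
    const result = await db.collection('user_profiles').updateMany(
      { schemaVersion: { $exists: false } },
      { $set: { schemaVersion: 1 } },
      { session }
    );
    return { migrated: result.modifiedCount };
  },
});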
3. Data Consistency Management: Ensuring Reliability Across Services
Understanding Consistency Requirements
In a distributed system like Cipher Horizon, maintaining data consistency across services is crucial. We implemented different consistency patterns based on business requirements:
Strong Consistency:
- Financial transactions
- User authentication
- Critical system configurations
Eventual Consistency:
- Analytics data
- User preferences
- Activity logs
Implementation of Consistency Patterns
Distributed Transaction Management
@Injectable()
class TransactionOrchestrator {
  constructor(
    private readonly eventBus: EventBus,
    private readonly sagaManager: SagaManager,
    private readonly accountService: AccountService,
    private readonly transactionService: TransactionService,
    private readonly logger: Logger
  ) {}

  @Transactional()
  async executeFinancialTransaction(
    command: TransactionCommand
  ): Promise<TransactionResult> {
    const saga = await this.sagaManager.create('FINANCIAL_TRANSACTION', {
      correlationId: command.id,
      metadata: {
        userId: command.userId,
        amount: command.amount,
        timestamp: new Date()
      }
    });
    try {
      // Step 1: Validate account balance
      await saga.executeStep('VALIDATE_BALANCE', async () => {
        const isValid = await this.accountService.validateBalance(
          command.userId,
          command.amount
        );
        if (!isValid) {
          throw new InsufficientBalanceError();
        }
      });
      // Step 2: Reserve funds
      await saga.executeStep('RESERVE_FUNDS', async () => {
        await this.accountService.reserveFunds(command.userId, command.amount);
      });
      // Step 3: Process transaction
      await saga.executeStep('PROCESS_TRANSACTION', async () => {
        const transaction = await this.transactionService.create(command);
        return transaction.id;
      });
      // Step 4: Commit the reserved funds to the account balance
      await saga.executeStep('UPDATE_BALANCE', async () => {
        await this.accountService.updateBalance(command.userId, command.amount);
      });
      await saga.complete();
      return {
        success: true,
        transactionId: saga.getStepResult('PROCESS_TRANSACTION')
      };
    } catch (error) {
      // Run compensating actions for any steps that already succeeded
      await saga.compensate();
      throw error;
    }
  }
}
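The compensation side is implicit in saga.compensate(). If the SagaManager accepted per-step compensating actions, registration could look like the following sketch; releaseFunds is a hypothetical inverse of reserveFunds, not an existing method:

// Hypothetical per-step compensation wiring: each step declares its inverse,
// and saga.compensate() replays the inverses in reverse order on failure
await saga.executeStep('RESERVE_FUNDS', {
  execute: () =>
    this.accountService.reserveFunds(command.userId, command.amount),
  compensate: () =>
    this.accountService.releaseFunds(command.userId, command.amount), // assumed inverse
});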
Event-Driven Consistency
@Injectable()
class EventManager {
  constructor(
    private readonly eventStore: EventStore,
    private readonly eventBus: EventBus,
    private readonly outbox: EventOutbox
  ) {}

  async publishEvent<T extends Event>(event: T): Promise<void> {
    const session = await this.eventStore.startSession();
    try {
      // Store the event and its outbox entry atomically
      await this.eventStore.store(event, session);
      await this.outbox.add(event, session);
      await session.commit();
    } catch (error) {
      await session.rollback();
      throw new EventPublishError(error);
    }
    // Attempt immediate publish outside the transaction; a failure here is
    // safe because the outbox processor below retries delivery
    try {
      await this.eventBus.publish(event);
    } catch {
      // Swallowed: the outbox guarantees at-least-once delivery
    }
  }
}

@Injectable()
class EventOutboxProcessor {
  constructor(
    // DeadLetterQueue and OutboxConfig are the assumed types behind the
    // dependencies this processor uses
    private readonly outbox: EventOutbox,
    private readonly eventBus: EventBus,
    private readonly deadLetterQueue: DeadLetterQueue,
    private readonly config: OutboxConfig
  ) {}

  @Cron('*/1 * * * *') // Run every minute
  async processOutbox(): Promise<void> {
    const unprocessedEvents = await this.outbox.getUnprocessed();
    for (const event of unprocessedEvents) {
      try {
        await this.eventBus.publish(event);
        await this.outbox.markAsProcessed(event.id);
      } catch (error) {
        if (event.retryCount < this.config.maxRetries) {
          await this.outbox.incrementRetry(event.id);
        } else {
          await this.outbox.markAsFailed(event.id);
          await this.deadLetterQueue.add(event);
        }
      }
    }
  }
}
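For reference, here is a minimal outbox contract consistent with how the processor uses it; the exact shape is an assumption, not our production interface:

interface OutboxEvent extends Event {
  id: string;
  retryCount: number;
  status: 'pending' | 'processed' | 'failed';
}

interface EventOutbox {
  // Persist the event in the same transaction/session as the business write
  add(event: Event, session: unknown): Promise<void>;
  getUnprocessed(): Promise<OutboxEvent[]>;
  markAsProcessed(id: string): Promise<void>;
  markAsFailed(id: string): Promise<void>;
  incrementRetry(id: string): Promise<void>;
}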
Optimistic Concurrency Control
@Injectable()
class DocumentManager {
  constructor(
    @InjectModel('Document')
    private readonly documentRepo: Model<Document>
  ) {}

  async updateDocument(
    id: string,
    updates: Partial<Document>,
    version: number
  ): Promise<Document> {
    // The filter matches only if the caller's version is still current;
    // a concurrent writer will have bumped it, so no document matches
    const result = await this.documentRepo.findOneAndUpdate(
      {
        _id: id,
        version: version
      },
      {
        $set: updates,
        $inc: { version: 1 }
      },
      { new: true }
    );
    if (!result) {
      throw new ConcurrencyError(
        'Document was modified by another process'
      );
    }
    return result;
  }
}
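Callers typically pair optimistic updates with a bounded retry loop: re-read the document, reapply the change, and try again. A sketch, assuming a getDocument read method exists on the manager:

async function updateWithRetry(
  manager: DocumentManager,
  id: string,
  apply: (doc: Document) => Partial<Document>,
  maxAttempts = 3
): Promise<Document> {
  for (let attempt = 1; attempt <= maxAttempts; attempt++) {
    // Re-read so each attempt works from the latest version number
    const current = await manager.getDocument(id); // assumed read method
    try {
      return await manager.updateDocument(id, apply(current), current.version);
    } catch (error) {
      if (!(error instanceof ConcurrencyError) || attempt === maxAttempts) {
        throw error;
      }
    }
  }
  throw new ConcurrencyError('Exhausted optimistic retry attempts');
}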
4. Scalability and Performance Optimization
Horizontal Scaling Strategy
@Injectable()
class ServiceScaler {
  constructor(
    private readonly k8sClient: KubernetesClient,
    private readonly metrics: MetricsService,
    private readonly config: ScalingConfig
  ) {}

  @Interval(30000) // Check every 30 seconds
  async evaluateScaling(): Promise<void> {
    const metrics = await this.metrics.getCurrentMetrics();
    const decision = this.calculateScalingDecision(metrics);
    if (decision.shouldScale) {
      await this.scale(decision);
    }
  }

  private calculateScalingDecision(
    metrics: ServiceMetrics
  ): ScalingDecision {
    return {
      shouldScale: metrics.cpu > this.config.cpuThreshold ||
        metrics.memory > this.config.memoryThreshold,
      direction: 'up',
      replicas: this.calculateOptimalReplicas(metrics)
    };
  }

  // Illustrative heuristic: grow replicas in proportion to CPU pressure,
  // clamped to configured bounds (currentReplicas, minReplicas, and
  // maxReplicas are assumed fields on the metrics and config types)
  private calculateOptimalReplicas(metrics: ServiceMetrics): number {
    const desired = Math.ceil(
      metrics.currentReplicas * (metrics.cpu / this.config.cpuThreshold)
    );
    return Math.min(
      Math.max(desired, this.config.minReplicas),
      this.config.maxReplicas
    );
  }

  private async scale(decision: ScalingDecision): Promise<void> {
    await this.k8sClient.scaleDeployment({
      namespace: this.config.namespace,
      deployment: this.config.deploymentName,
      replicas: decision.replicas
    });
  }
}
Caching Strategy
@Injectable()
class CacheManager {
  constructor(
    private readonly redis: Redis,
    private readonly config: CacheConfig
  ) {}

  async getOrSet<T>(
    key: string,
    factory: () => Promise<T>,
    options: CacheOptions
  ): Promise<T> {
    const cached = await this.redis.get(key);
    if (cached) {
      return JSON.parse(cached);
    }
    // Cache miss: compute the value and store it with a TTL
    const value = await factory();
    await this.redis.set(
      key,
      JSON.stringify(value),
      'EX',
      options.ttl || this.config.defaultTTL
    );
    return value;
  }

  async invalidatePattern(pattern: string): Promise<void> {
    // KEYS blocks Redis on large keyspaces; SCAN iterates incrementally instead
    const stream = this.redis.scanStream({ match: pattern });
    for await (const keys of stream) {
      if (keys.length > 0) {
        await this.redis.del(...keys);
      }
    }
  }
}
Performance Monitoring and Optimization
@Injectable()
class PerformanceMonitor implements NestMiddleware {
  constructor(
    private readonly metrics: PrometheusClient,
    // MonitoringConfig is the assumed type providing slowRequestThreshold
    private readonly config: MonitoringConfig,
    private readonly logger: Logger
  ) {}

  private readonly requestDuration = new Histogram({
    name: 'http_request_duration_seconds',
    help: 'HTTP request duration in seconds',
    labelNames: ['method', 'route', 'status_code']
  });

  use(req: Request, res: Response, next: NextFunction): void {
    const start = process.hrtime();
    res.on('finish', () => {
      const [seconds, nanoseconds] = process.hrtime(start);
      const duration = seconds + nanoseconds / 1e9;
      this.requestDuration
        .labels(req.method, req.route?.path ?? req.path, res.statusCode.toString())
        .observe(duration);
      if (duration > this.config.slowRequestThreshold) {
        this.logger.warn(
          `Slow request detected: ${req.method} ${req.path} (${duration.toFixed(3)}s)`
        );
      }
    });
    next();
  }
}
Query Optimization
@Injectable()
class QueryOptimizer {
  async optimizeQuery<T>(
    queryBuilder: SelectQueryBuilder<T>
  ): Promise<void> {
    // TypeORM has no explain() helper, so run EXPLAIN against the generated
    // SQL directly (PostgreSQL syntax)
    const [sql, parameters] = queryBuilder.getQueryAndParameters();
    const explanation = await queryBuilder.connection.query(
      `EXPLAIN (FORMAT JSON) ${sql}`,
      parameters
    );
    // Analyze query plan
    const analysis = this.analyzeQueryPlan(explanation);
    // Apply optimizations based on analysis
    if (analysis.needsIndex) {
      await this.createOptimalIndex(
        queryBuilder.alias,
        analysis.suggestedIndices
      );
    }
    if (analysis.shouldUseView) {
      await this.createMaterializedView(
        queryBuilder,
        analysis.viewDefinition
      );
    }
  }

  private analyzeQueryPlan(
    explanation: QueryExplanation[]
  ): QueryAnalysis {
    // Plan-analysis heuristics (sequential scans, row estimates, etc.)
    // live in the helper methods below
    return {
      needsIndex: this.detectMissingIndices(explanation),
      suggestedIndices: this.suggestIndices(explanation),
      shouldUseView: this.shouldCreateView(explanation),
      viewDefinition: this.generateViewDefinition(explanation)
    };
  }
}
Lessons Learned
API Design:
- Start with clear contracts and documentation
- Plan for versioning from day one
- Consider both developer experience and performance
Data Persistence:
- Choose databases based on data access patterns
- Plan for eventual consistency
- Implement robust error handling and recovery
Performance:
- Cache strategically
- Monitor and optimize database queries
- Implement proper indexing strategies
This post has covered the essential aspects of building a robust API and data layer for our microservice architecture. The examples provided are practical implementations that you can adapt for your own projects. Remember, the key to success lies in making informed decisions based on your specific requirements while keeping scalability and maintainability in mind.
What challenges have you faced in designing APIs or managing data persistence in your microservice architectures? Share your experiences in the comments below!