Daniele Minatto

Data in Motion: Building Cipher Horizon's Service Layer

In our previous posts, we explored the vision behind Cipher Horizon and its architectural foundations. Today, we'll dive deep into the crucial aspects of API design and data persistence that form the backbone of our microservice ecosystem.

1. API Design Strategy: The Art of Service Communication

Understanding Our API Design Philosophy

In developing Cipher Horizon, we faced a critical decision regarding API design. The modern landscape offers multiple approaches, each with its strengths. Our analysis led us to adopt a hybrid approach, combining REST and GraphQL, each serving specific purposes in our ecosystem.

Why We Chose a Hybrid Approach

The decision to implement both REST and GraphQL wasn't made lightly. We recognized that different parts of our system had varying requirements:

  1. REST for CRUD Operations
    • Simple, stateless operations benefit from REST's straightforward nature
    • Better caching capabilities for frequently accessed resources
    • Easier to understand and implement for basic operations
    • More suitable for file uploads and simple data exchanges
  2. GraphQL for Complex Data Requirements
    • Reduces over-fetching and under-fetching of data
    • Provides flexible queries for frontend teams
    • Better handles complex, nested data relationships
    • Enables efficient batch queries

Here's how we implemented this strategy:

// REST Implementation for Simple Operations
@Controller('api/v1/users')
export class UserController {
    constructor(private readonly userService: UserService) {}

    @Get(':id')
    @UseInterceptors(CacheInterceptor)
    async getUser(@Param('id') id: string): Promise<UserResponse> {
        // Simple, cached user retrieval
        return this.userService.findById(id);
    }

    @Post()
    @UseGuards(AuthGuard)
    async createUser(@Body() dto: CreateUserDto): Promise<UserResponse> {
        // Straightforward user creation
        return this.userService.create(dto);
    }
}

// GraphQL Implementation for Complex Queries
const typeDefs = gql`
    type User {
        id: ID!
        profile: Profile!
        transactions: [Transaction!]!
        analytics: UserAnalytics!
        recommendations: [Recommendation!]!
    }

    type Query {
        userDashboard(id: ID!): UserDashboardData!
    }
`;

@Resolver('User')
class UserResolver {
    @Query()
    async userDashboard(@Args('id') id: string) {
        // Complex data aggregation in a single query
        const [user, transactions, analytics, recommendations] = await Promise.all([
            this.userService.findById(id),
            this.transactionService.getUserTransactions(id),
            this.analyticsService.getUserAnalytics(id),
            this.recommendationService.getForUser(id)
        ]);

        return {
            user,
            transactions,
            analytics,
            recommendations
        };
    }
}

API Versioning Strategy

We version the API so that existing clients keep working while new features ship. The examples show two options: a version segment in the URL path, and a vendor media type negotiated through HTTP headers:

// Version Management Through URL Path
@Controller('api/v1')
class ApiV1Controller {
    // V1 implementations
}

@Controller('api/v2')
class ApiV2Controller {
    // V2 implementations with new features
}

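// Version Management Through Content Type
// @Header() only sets the response header; it does not pick a handler.
// Two handlers on the same path would shadow each other, so a single
// handler inspects the Accept header and dispatches to the right version.
@Controller('api')
class ApiController {
    @Get('users/:id')
    async getUser(
        @Param('id') id: string,
        @Headers('accept') accept: string | undefined,
        @Res({ passthrough: true }) res: Response // Express response
    ): Promise<UserV1Response | UserV2Response> {
        if (accept?.includes('application/vnd.cipher.v2+json')) {
            res.type('application/vnd.cipher.v2+json');
            return this.userServiceV2.findById(id);
        }

        // Default to v1 when no known vendor type is requested
        res.type('application/vnd.cipher.v1+json');
        return this.userServiceV1.findById(id);
    }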
}
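
Header-based versioning needs a way to turn an Accept header into a version number. The following is a minimal sketch of that step, not Cipher Horizon's actual code: the `parseApiVersion` helper and the fallback to version 1 are assumptions made for illustration.

```typescript
// Hypothetical helper: extracts N from "application/vnd.cipher.vN+json".
const VENDOR_TYPE = /application\/vnd\.cipher\.v(\d+)\+json/i;

function parseApiVersion(accept: string | undefined, fallback = 1): number {
    if (!accept) {
        return fallback;
    }
    // An Accept header may list several media types separated by commas.
    for (const part of accept.split(',')) {
        const match = VENDOR_TYPE.exec(part.trim());
        if (match) {
            return Number(match[1]);
        }
    }
    return fallback;
}
```

A controller or guard could call this once per request and route to the matching service, which keeps the version rule in one place.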

Pagination and Filtering Implementation

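We implemented cursor-based pagination. It stays fast on large datasets because each page is fetched with a range condition on an indexed column, rather than an OFFSET that has to skip every preceding row: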

interface PaginationParams {
    cursor?: string;
    limit: number;
    direction: 'forward' | 'backward';
}

class CursorPagination {
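    // Forward-only sketch: params.direction is accepted but 'backward' is not handled
    async paginate<T>(
        query: SelectQueryBuilder<T>,
        params: PaginationParams
    ): Promise<PaginatedResult<T>> {
        const decodedCursor = params.cursor
            ? this.decodeCursor(params.cursor)
            : null;

        // andWhere keeps any filters the caller already applied
        if (decodedCursor) {
            query.andWhere('id > :id', { id: decodedCursor.id });
        }

        const results = await query
            .orderBy('id', 'ASC') // a stable order is required for "id > :id"
            .take(params.limit + 1)
            .getMany();

        const hasMore = results.length > params.limit;
        const items = hasMore ? results.slice(0, -1) : results;
        const lastItem = items[items.length - 1];

        return {
            items,
            pageInfo: {
                hasNextPage: hasMore,
                // An empty page has no cursor
                endCursor: lastItem ? this.encodeCursor(lastItem) : null
            }
        };
    }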

    private encodeCursor(item: any): string {
        return Buffer.from(JSON.stringify({
            id: item.id,
            timestamp: item.createdAt
        })).toString('base64');
    }

    private decodeCursor(cursor: string): any {
        return JSON.parse(
            Buffer.from(cursor, 'base64').toString('utf-8')
        );
    }
}
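
The cursor above stores both `id` and a timestamp, but only `id` is used for filtering. If pages should be ordered by creation time, the cursor has to compare on the pair (createdAt, id) so that rows sharing a timestamp are neither skipped nor repeated. The sketch below shows that comparison over an in-memory array. The `Row` shape and the `encodeURIComponent`-based cursor are assumptions chosen to keep the example dependency-free; base64 as used above works the same way.

```typescript
interface Row {
    id: number;
    createdAt: number; // epoch milliseconds
}

interface Cursor {
    id: number;
    createdAt: number;
}

interface Page {
    items: Row[];
    hasNextPage: boolean;
    endCursor: string | null;
}

// Opaque cursor: JSON wrapped in encodeURIComponent (illustrative encoding).
const encodeCursor = (row: Row): string =>
    encodeURIComponent(JSON.stringify({ id: row.id, createdAt: row.createdAt }));

const decodeCursor = (cursor: string): Cursor =>
    JSON.parse(decodeURIComponent(cursor)) as Cursor;

// True when `row` sorts strictly after the cursor in (createdAt, id) order.
const isAfter = (row: Row, c: Cursor): boolean =>
    row.createdAt > c.createdAt ||
    (row.createdAt === c.createdAt && row.id > c.id);

function paginate(rows: Row[], limit: number, cursor?: string): Page {
    const sorted = [...rows].sort(
        (a, b) => a.createdAt - b.createdAt || a.id - b.id
    );
    const decoded = cursor ? decodeCursor(cursor) : null;
    const remaining = decoded
        ? sorted.filter((r) => isAfter(r, decoded))
        : sorted;

    // Fetch one extra row to learn whether another page exists.
    const fetched = remaining.slice(0, limit + 1);
    const hasNextPage = fetched.length > limit;
    const items = hasNextPage ? fetched.slice(0, limit) : fetched;
    const last = items[items.length - 1];

    return { items, hasNextPage, endCursor: last ? encodeCursor(last) : null };
}
```

In SQL the same predicate becomes `(created_at, id) > (:createdAt, :id)` with a matching composite index.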

Error Handling and Response Standardization

We implemented a consistent error-handling strategy across all APIs:

class ApiError extends Error {
    constructor(
        public readonly code: string,
        public readonly message: string,
        public readonly status: number,
        public readonly details?: any
    ) {
        super(message);
    }
}

@Catch()
export class GlobalExceptionFilter implements ExceptionFilter {
    catch(exception: unknown, host: ArgumentsHost) {
        const ctx = host.switchToHttp();
        const response = ctx.getResponse();

        const error = this.normalizeError(exception);

        response
            .status(error.status)
            .json({
                code: error.code,
                message: error.message,
                details: error.details,
                timestamp: new Date().toISOString()
            });
    }

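    private normalizeError(error: unknown): ApiError {
        if (error instanceof ApiError) {
            return error;
        }

        // Preserve the status of framework errors (e.g. 401, 404);
        // HttpException comes from @nestjs/common
        if (error instanceof HttpException) {
            return new ApiError('HTTP_ERROR', error.message, error.getStatus());
        }

        // Anything else is unexpected; hide internals from the client
        return new ApiError(
            'INTERNAL_ERROR',
            'An unexpected error occurred',
            500
        );
    }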
}

API Documentation and Testing

We prioritized comprehensive documentation and testing:

// Swagger Documentation
@ApiTags('Users')
@Controller('api/v1/users')
export class UserController {
    @ApiOperation({ summary: 'Get user by ID' })
    @ApiResponse({ status: 200, type: UserResponse })
    @ApiResponse({ status: 404, type: ErrorResponse })
    @Get(':id')
    async getUser(@Param('id') id: string): Promise<UserResponse> {
        return this.userService.findById(id);
    }
}

// Integration Testing
describe('UserAPI', () => {
    it('should handle pagination correctly', async () => {
        // Create test data
        const users = await createTestUsers(15);

        // First page
        const firstPage = await request(app)
            .get('/api/v1/users')
            .query({ limit: 10 });

        expect(firstPage.body.items).toHaveLength(10);
        expect(firstPage.body.pageInfo.hasNextPage).toBe(true);

        // Second page
        const secondPage = await request(app)
            .get('/api/v1/users')
            .query({ 
                limit: 10,
                cursor: firstPage.body.pageInfo.endCursor 
            });

        expect(secondPage.body.items).toHaveLength(5);
        expect(secondPage.body.pageInfo.hasNextPage).toBe(false);
    });
});

2. Data Persistence Strategy: The Foundation of Data Management

Understanding Our Multi-Database Approach

In Cipher Horizon, we adopted a polyglot persistence strategy, recognizing that different types of data have different requirements. This section explains our reasoning and implementation details.

Why Multiple Database Types?

Our analysis revealed distinct data patterns that benefited from different storage solutions:

  1. Transactional Data (PostgreSQL)
    • ACID compliance for financial transactions
    • Complex relationships between entities
    • Strong consistency requirements
    • Structured data with clear schemas
  2. Document Storage (MongoDB)
    • Flexible schema for evolving data structures
    • Rich querying capabilities
    • Better handling of nested data
    • Horizontal scaling capabilities
  3. Caching Layer (Redis)
    • High-performance data access
    • Session management
    • Real-time features
    • Temporary data storage

Here's how we implemented this strategy:

// Database Configuration Management
@Injectable()
class DatabaseConfigService {
    constructor(
        @Inject('CONFIG')
        private readonly config: Configuration
    ) {}

    getPostgresConfig(): TypeOrmModuleOptions {
        return {
            type: 'postgres',
            host: this.config.get('POSTGRES_HOST'),
            port: this.config.get('POSTGRES_PORT'),
            username: this.config.get('POSTGRES_USER'),
            password: this.config.get('POSTGRES_PASSWORD'),
            database: this.config.get('POSTGRES_DB'),
            entities: ['dist/**/*.entity{.ts,.js}'],
            migrations: ['dist/migrations/*{.ts,.js}'],
            ssl: this.config.get('NODE_ENV') === 'production',
            poolSize: 20,
            retryAttempts: 3,
            retryDelay: 3000,
            logging: this.config.get('NODE_ENV') === 'development'
        };
    }

    getMongoConfig(): MongooseModuleOptions {
        return {
            uri: this.config.get('MONGODB_URI'),
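            // Mongoose 5 only: Mongoose 6+ always behaves this way and
            // no longer supports these two options, so drop them there
            useNewUrlParser: true,
            useUnifiedTopology: true,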
            maxPoolSize: 50,
            serverSelectionTimeoutMS: 5000,
            socketTimeoutMS: 45000,
        };
    }

    getRedisConfig(): RedisOptions {
        return {
            host: this.config.get('REDIS_HOST'),
            port: this.config.get('REDIS_PORT'),
            password: this.config.get('REDIS_PASSWORD'),
            tls: this.config.get('NODE_ENV') === 'production' ? {} : undefined,
            maxRetriesPerRequest: 3,
            enableReadyCheck: true,
        };
    }
}

Implementation of Data Models

PostgreSQL Entities:


@Entity()
export class Transaction {
    @PrimaryGeneratedColumn('uuid')
    id: string;

    @Column('decimal', { precision: 10, scale: 2 })
    amount: number;

    @Column('jsonb')
    metadata: Record<string, any>;

    @ManyToOne(() => User)
    @JoinColumn()
    user: User;

    @CreateDateColumn()
    createdAt: Date;

    @UpdateDateColumn()
    updatedAt: Date;

    // TypeORM's optimistic-locking decorator is @VersionColumn()
    @VersionColumn()
    version: number;

    @BeforeInsert()
    async beforeInsert() {
        // Validation and data preparation logic
        await this.validateAmount();
    }
}

// Repository Pattern Implementation
@Injectable()
export class TransactionRepository {
    constructor(
        @InjectRepository(Transaction)
        private readonly repo: Repository<Transaction>,
        private readonly logger: Logger
    ) {}

    async createWithRetry(data: CreateTransactionDto): Promise<Transaction> {
        const maxRetries = 3;
        let lastError: Error;

        for (let attempt = 1; attempt <= maxRetries; attempt++) {
            try {
                return await this.repo.save(data);
            } catch (error) {
                lastError = error;
                if (!this.isRetryableError(error)) {
                    throw error;
                }
                // Exponential backoff: 1s, 2s, 4s; no wait after the last attempt
                if (attempt < maxRetries) {
                    await this.delay(2 ** (attempt - 1) * 1000);
                }
            }
        }

        throw new MaxRetriesExceededError(lastError);
    }
}
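
The repository above calls `delay` and `isRetryableError` without showing them. As a rough, self-contained sketch of the same retry idea, the helper below computes a capped exponential delay and adds random jitter so that many clients do not retry in lockstep. All names here (`backoffDelayMs`, `jitteredDelayMs`, `withRetry`) and the 30-second cap are assumptions for illustration.

```typescript
// Delay before retry number `attempt` (1-based): base * 2^(attempt-1), capped.
function backoffDelayMs(attempt: number, baseMs = 1000, maxMs = 30000): number {
    return Math.min(maxMs, baseMs * 2 ** (attempt - 1));
}

// "Full jitter": a random delay in [0, backoff) spreads out concurrent retries.
function jitteredDelayMs(
    attempt: number,
    random: () => number = Math.random
): number {
    return Math.floor(random() * backoffDelayMs(attempt));
}

const sleep = (ms: number): Promise<void> =>
    new Promise<void>((resolve) => setTimeout(() => resolve(), ms));

async function withRetry<T>(
    operation: () => Promise<T>,
    isRetryable: (error: unknown) => boolean,
    maxAttempts = 3,
    wait: (ms: number) => Promise<void> = sleep
): Promise<T> {
    let lastError: unknown;
    for (let attempt = 1; attempt <= maxAttempts; attempt++) {
        try {
            return await operation();
        } catch (error) {
            lastError = error;
            if (!isRetryable(error)) {
                throw error;
            }
            // Skip the pointless wait after the final attempt.
            if (attempt < maxAttempts) {
                await wait(jitteredDelayMs(attempt));
            }
        }
    }
    throw lastError;
}
```

Passing `wait` as a parameter makes the loop testable without real timers.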

MongoDB Schemas:

@Schema({ timestamps: true })
export class UserProfile {
    @Prop({ required: true })
    userId: string;

    @Prop({ type: Object })
    preferences: {
        theme: string;
        notifications: NotificationSettings;
        customFields?: Record<string, any>;
    };

    @Prop({ type: mongoose.Schema.Types.Mixed })
    metadata: Record<string, any>;

    @Prop({ default: 1 })
    schemaVersion: number;
}

// Service Implementation
@Injectable()
export class UserProfileService {
    constructor(
        @InjectModel(UserProfile.name)
        private readonly profileModel: Model<UserProfile>,
        private readonly migrationService: MigrationService
    ) {}

    async findAndUpdate(
        userId: string,
        update: Partial<UserProfile>
    ): Promise<UserProfile> {
        const session = await this.profileModel.startSession();
        session.startTransaction();

        try {
            const profile = await this.profileModel
                .findOneAndUpdate(
                    { userId },
                    { $set: update },
                    { new: true, session }
                )
                .exec();

            if (!profile) {
                throw new ProfileNotFoundError(userId);
            }

            // Check if migration is needed
            if (profile.schemaVersion < CURRENT_SCHEMA_VERSION) {
                await this.migrationService.migrateProfile(
                    profile,
                    session
                );
            }

            await session.commitTransaction();
            return profile;
        } catch (error) {
            await session.abortTransaction();
            throw error;
        } finally {
            session.endSession();
        }
    }
}

Redis Cache Implementation:

@Injectable()
export class CacheService {
    constructor(
        private readonly redis: Redis,
        private readonly logger: Logger,
        private readonly config: CacheConfig
    ) {}

    async setCached<T>(
        key: string,
        data: T,
        options: CacheOptions = {}
    ): Promise<void> {
        const {
            ttl = this.config.defaultTTL,
            tags = []
        } = options;

        const multi = this.redis.multi();

        // Store the main data
        multi.set(
            this.formatKey(key),
            JSON.stringify(data),
            'EX',
            ttl
        );

        // Store tag associations for cache invalidation
        for (const tag of tags) {
            multi.sadd(
                this.formatTagKey(tag),
                key
            );
        }

        try {
            await multi.exec();
        } catch (error) {
            this.logger.error(
                `Cache set failed for key ${key}:`,
                error
            );
            throw new CacheOperationError(error);
        }
    }

    async invalidateByTag(tag: string): Promise<void> {
        const keys = await this.redis.smembers(
            this.formatTagKey(tag)
        );

        if (keys.length > 0) {
            const multi = this.redis.multi();

            // Delete all keys associated with the tag
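            // Arrow function keeps `this` bound; passing this.formatKey
            // directly would lose it and fail on this.config
            multi.del(...keys.map((key) => this.formatKey(key)));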
            // Delete the tag set itself
            multi.del(this.formatTagKey(tag));

            await multi.exec();
        }
    }

    private formatKey(key: string): string {
        return `${this.config.keyPrefix}:${key}`;
    }

    private formatTagKey(tag: string): string {
        return `${this.config.keyPrefix}:tag:${tag}`;
    }
}


Data Migration Strategy

@Injectable()
export class MigrationService {
    constructor(
        private readonly mongoClient: MongoClient,
        private readonly postgres: Connection,
        private readonly logger: Logger
    ) {}

    async executeMigration(
        migration: Migration
    ): Promise<MigrationResult> {
        const session = await this.mongoClient.startSession();
        session.startTransaction();

        try {
            // Execute migration steps
            const result = await migration.up(session);

            // Update migration metadata
            await this.updateMigrationMetadata(
                migration.version,
                session
            );

            await session.commitTransaction();
            return result;
        } catch (error) {
            await session.abortTransaction();
            this.logger.error(
                `Migration ${migration.version} failed:`,
                error
            );
            throw error;
        } finally {
            session.endSession();
        }
    }

    private async updateMigrationMetadata(
        version: string,
        session: ClientSession
    ): Promise<void> {
        await this.mongoClient
            .db()
            .collection('migrations')
            .updateOne(
                { version },
                {
                    $set: {
                        executedAt: new Date(),
                        status: 'completed'
                    }
                },
                { session }
            );
    }
}

3. Data Consistency Management: Ensuring Reliability Across Services

Understanding Consistency Requirements

In a distributed system like Cipher Horizon, maintaining data consistency across services is crucial. We implemented different consistency patterns based on business requirements:

  1. Strong Consistency
    • Financial transactions
    • User authentication
    • Critical system configurations
  2. Eventual Consistency
    • Analytics data
    • User preferences
    • Activity logs

Implementation of Consistency Patterns

Distributed Transaction Management

@Injectable()
class TransactionOrchestrator {
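    constructor(
        private readonly eventBus: EventBus,
        private readonly sagaManager: SagaManager,
        // The two services below are used by the saga steps; their type
        // names are inferred from usage
        private readonly accountService: AccountService,
        private readonly transactionService: TransactionService,
        private readonly logger: Logger
    ) {}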

    @Transactional()
    async executeFinancialTransaction(
        command: TransactionCommand
    ): Promise<TransactionResult> {
        const saga = await this.sagaManager.create('FINANCIAL_TRANSACTION', {
            correlationId: command.id,
            metadata: {
                userId: command.userId,
                amount: command.amount,
                timestamp: new Date()
            }
        });

        try {
            // Step 1: Validate account balance
            await saga.executeStep('VALIDATE_BALANCE', async () => {
                const isValid = await this.accountService.validateBalance(
                    command.userId,
                    command.amount
                );
                if (!isValid) {
                    throw new InsufficientBalanceError();
                }
            });

            // Step 2: Reserve funds
            await saga.executeStep('RESERVE_FUNDS', async () => {
                await this.accountService.reserveFunds(
                    command.userId,
                    command.amount
                );
            });

            // Step 3: Process transaction
            await saga.executeStep('PROCESS_TRANSACTION', async () => {
                const transaction = await this.transactionService.create(command);
                return transaction.id;
            });

            // Step 4: Update account balance
            await saga.executeStep('UPDATE_BALANCE', async () => {
                await this.accountService.updateBalance(
                    command.userId,
                    command.amount
                );
            });

            await saga.complete();
            return { success: true, transactionId: saga.getStepResult('PROCESS_TRANSACTION') };

        } catch (error) {
            await saga.compensate();
            throw error;
        }
    }
}
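
The `SagaManager` used above is not shown in this post. To make the compensation idea concrete, here is a minimal in-memory sketch of a saga that records an undo action for each completed step and runs them in reverse order on failure. The `SimpleSaga` class and the third `compensation` argument are assumptions for illustration and may differ from how the real saga registers its compensations.

```typescript
type Compensation = () => Promise<void>;

class SimpleSaga {
    private readonly compensations: Compensation[] = [];
    private readonly results = new Map<string, unknown>();

    // Runs a step and, on success, remembers how to undo it.
    async executeStep<T>(
        name: string,
        action: () => Promise<T>,
        compensation?: Compensation
    ): Promise<T> {
        const result = await action();
        this.results.set(name, result);
        if (compensation) {
            this.compensations.push(compensation);
        }
        return result;
    }

    getStepResult(name: string): unknown {
        return this.results.get(name);
    }

    // Undoes completed steps in reverse order; keeps going if one undo fails
    // and returns the failures so the caller can log or alert on them.
    async compensate(): Promise<Error[]> {
        const failures: Error[] = [];
        while (this.compensations.length > 0) {
            const undo = this.compensations.pop() as Compensation;
            try {
                await undo();
            } catch (error) {
                failures.push(
                    error instanceof Error ? error : new Error(String(error))
                );
            }
        }
        return failures;
    }
}
```

A step that fails never registers its own compensation, so only work that actually completed is undone.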

Event-Driven Consistency

@Injectable()
class EventManager {
    constructor(
        private readonly eventStore: EventStore,
        private readonly eventBus: EventBus,
        private readonly outbox: EventOutbox
    ) {}

    async publishEvent<T extends Event>(event: T): Promise<void> {
        const session = await this.eventStore.startSession();

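        try {
            // Store the event and its outbox entry in one transaction
            await this.eventStore.store(event, session);
            await this.outbox.add(event, session);

            await session.commit();
        } catch (error) {
            await session.rollback();
            throw new EventPublishError(error);
        }

        try {
            // Attempt immediate publish; the event is already durable
            await this.eventBus.publish(event);
        } catch (error) {
            // Safe to ignore: EventOutboxProcessor will deliver it later.
            // Delivery is at-least-once, so consumers must be idempotent.
        }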
    }
}

@Injectable()
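class EventOutboxProcessor {
    // Dependencies inferred from the method body; OutboxConfig and
    // DeadLetterQueue are assumed type names
    constructor(
        private readonly outbox: EventOutbox,
        private readonly eventBus: EventBus,
        private readonly deadLetterQueue: DeadLetterQueue,
        private readonly config: OutboxConfig
    ) {}

    @Cron('*/1 * * * *') // Run every minute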
    async processOutbox(): Promise<void> {
        const unprocessedEvents = await this.outbox.getUnprocessed();

        for (const event of unprocessedEvents) {
            try {
                await this.eventBus.publish(event);
                await this.outbox.markAsProcessed(event.id);
            } catch (error) {
                if (event.retryCount < this.config.maxRetries) {
                    await this.outbox.incrementRetry(event.id);
                } else {
                    await this.outbox.markAsFailed(event.id);
                    await this.deadLetterQueue.add(event);
                }
            }
        }
    }
}
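
Because an event can be published both immediately and again by the outbox processor, delivery is at-least-once and consumers may see duplicates. One common way to handle that is to deduplicate by event id on the consumer side. The sketch below keeps the seen ids in memory; a real service would presumably persist them. `DomainEvent` and `IdempotentConsumer` are hypothetical names.

```typescript
interface DomainEvent {
    id: string;
    type: string;
    payload: unknown;
}

class IdempotentConsumer {
    private readonly seen = new Set<string>();
    private readonly handler: (event: DomainEvent) => void;

    constructor(handler: (event: DomainEvent) => void) {
        this.handler = handler;
    }

    // Returns true if the event was handled, false if it was a duplicate.
    consume(event: DomainEvent): boolean {
        if (this.seen.has(event.id)) {
            return false;
        }
        this.handler(event);
        // Record the id only after the handler succeeds, so a failed
        // delivery can still be retried.
        this.seen.add(event.id);
        return true;
    }
}
```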

Optimistic Concurrency Control

@Injectable()
class DocumentManager {
    async updateDocument(
        id: string,
        updates: Partial<Document>,
        version: number
    ): Promise<Document> {
        const result = await this.documentRepo.findOneAndUpdate(
            {
                _id: id,
                version: version
            },
            {
                $set: updates,
                $inc: { version: 1 }
            },
            { new: true }
        );

        if (!result) {
            throw new ConcurrencyError(
                'Document was modified by another process'
            );
        }

        return result;
    }
}
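
When the version check fails, the caller usually re-reads the document and re-applies its change. The in-memory sketch below illustrates that read, compare-and-swap, retry loop. `VersionedStore`, `tryUpdate`, and `updateWithRetry` are hypothetical names, and returning `null` on conflict (instead of throwing `ConcurrencyError`) is a simplification for the example.

```typescript
interface Versioned<T> {
    value: T;
    version: number;
}

class VersionedStore<T> {
    private readonly docs = new Map<string, Versioned<T>>();

    insert(id: string, value: T): void {
        this.docs.set(id, { value, version: 1 });
    }

    read(id: string): Versioned<T> {
        const doc = this.docs.get(id);
        if (!doc) {
            throw new Error(`Document ${id} not found`);
        }
        return { ...doc };
    }

    // Compare-and-swap: succeeds only if the caller saw the latest version.
    // Returns null when another writer got there first.
    tryUpdate(id: string, value: T, expectedVersion: number): Versioned<T> | null {
        const current = this.read(id);
        if (current.version !== expectedVersion) {
            return null;
        }
        const next = { value, version: expectedVersion + 1 };
        this.docs.set(id, next);
        return { ...next };
    }
}

// Re-reads and re-applies the change when a concurrent writer wins the race.
function updateWithRetry<T>(
    store: VersionedStore<T>,
    id: string,
    change: (value: T) => T,
    maxAttempts = 3
): Versioned<T> {
    for (let attempt = 1; attempt <= maxAttempts; attempt++) {
        const current = store.read(id);
        const updated = store.tryUpdate(id, change(current.value), current.version);
        if (updated) {
            return updated;
        }
    }
    throw new Error(`Document ${id} kept changing after ${maxAttempts} attempts`);
}
```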

4. Scalability and Performance Optimization

Horizontal Scaling Strategy

@Injectable()
class ServiceScaler {
    constructor(
        private readonly k8sClient: KubernetesClient,
        private readonly metrics: MetricsService,
        private readonly config: ScalingConfig
    ) {}

    @Interval(30000) // Check every 30 seconds
    async evaluateScaling(): Promise<void> {
        const metrics = await this.metrics.getCurrentMetrics();

        const decision = this.calculateScalingDecision(metrics);

        if (decision.shouldScale) {
            await this.scale(decision);
        }
    }

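    private calculateScalingDecision(
        metrics: ServiceMetrics
    ): ScalingDecision {
        const overloaded = metrics.cpu > this.config.cpuThreshold ||
                           metrics.memory > this.config.memoryThreshold;

        return {
            shouldScale: overloaded,
            // Only scale-up is decided here; scaling down would need its
            // own lower thresholds and a cooldown to avoid flapping
            direction: 'up',
            replicas: this.calculateOptimalReplicas(metrics)
        };
    }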

    private async scale(decision: ScalingDecision): Promise<void> {
        await this.k8sClient.scaleDeployment({
            namespace: this.config.namespace,
            deployment: this.config.deploymentName,
            replicas: decision.replicas
        });
    }
}
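
The `calculateOptimalReplicas` method is not shown above. One plausible way to implement it is the proportional rule used by the Kubernetes Horizontal Pod Autoscaler: scale the current replica count by the ratio of observed to target utilization, round up, and clamp to configured bounds. The sketch below assumes utilization is given as whole percentages; the `ReplicaInput` shape and function name are hypothetical.

```typescript
interface ReplicaInput {
    currentReplicas: number;
    currentUtilization: number; // average utilization in percent, e.g. 90
    targetUtilization: number;  // desired utilization in percent, e.g. 60
    minReplicas: number;
    maxReplicas: number;
}

// desired = ceil(current * currentUtilization / targetUtilization), clamped
// to [minReplicas, maxReplicas].
function calculateDesiredReplicas(input: ReplicaInput): number {
    const raw = Math.ceil(
        (input.currentReplicas * input.currentUtilization) /
            input.targetUtilization
    );
    return Math.min(input.maxReplicas, Math.max(input.minReplicas, raw));
}
```

Because the same formula yields a smaller number when utilization is below target, it covers scaling down as well as up.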

Caching Strategy

@Injectable()
class CacheManager {
    constructor(
        private readonly redis: Redis,
        private readonly config: CacheConfig
    ) {}

    async getOrSet<T>(
        key: string,
        factory: () => Promise<T>,
        options: CacheOptions
    ): Promise<T> {
        const cached = await this.redis.get(key);

        if (cached) {
            return JSON.parse(cached);
        }

        const value = await factory();

        await this.redis.set(
            key,
            JSON.stringify(value),
            'EX',
            options.ttl || this.config.defaultTTL
        );

        return value;
    }

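    async invalidatePattern(pattern: string): Promise<void> {
        // SCAN walks the keyspace incrementally; KEYS would block Redis
        // while it checks every key in one pass
        const stream = this.redis.scanStream({ match: pattern, count: 100 });

        for await (const keys of stream) {
            if (keys.length > 0) {
                await this.redis.del(...keys);
            }
        }
    }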
}
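
With `getOrSet`, several requests that miss the cache at the same moment will each call `factory`, which can overload the backing store when a hot key expires. A possible mitigation is to share one in-flight promise per key. The `SingleFlight` class below is a hypothetical, process-local sketch of that idea; it does not coordinate across service instances.

```typescript
class SingleFlight<T> {
    private readonly inFlight = new Map<string, Promise<T>>();

    // Concurrent callers for the same key share one factory invocation.
    run(key: string, factory: () => Promise<T>): Promise<T> {
        const existing = this.inFlight.get(key);
        if (existing) {
            return existing;
        }

        // Remove the entry once the load settles so later calls reload.
        const promise = factory().finally(() => {
            this.inFlight.delete(key);
        });
        this.inFlight.set(key, promise);
        return promise;
    }
}
```

Wrapping the `factory` call inside `getOrSet` with `run(key, factory)` would keep the cache logic unchanged while collapsing duplicate loads.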

Performance Monitoring and Optimization

@Injectable()
class PerformanceMonitor implements NestMiddleware {
    private readonly requestDuration = new Histogram({
        name: 'http_request_duration_seconds',
        help: 'HTTP request duration in seconds',
        labelNames: ['method', 'route', 'status_code']
    });

    constructor(
        private readonly metrics: PrometheusClient,
        private readonly logger: Logger,
        // Assumed config type exposing slowRequestThreshold in seconds
        private readonly config: MonitoringConfig
    ) {}

    // NestJS middleware implements use(); register it with
    // MiddlewareConsumer.apply(PerformanceMonitor) in a module's configure()
    use(req: Request, res: Response, next: NextFunction): void {
        const start = process.hrtime();

        res.on('finish', () => {
            const [seconds, nanoseconds] = process.hrtime(start);
            const duration = seconds + nanoseconds / 1e9;

            this.requestDuration
                .labels(
                    req.method,
                    // Route template keeps label cardinality low
                    req.route?.path ?? 'unmatched',
                    res.statusCode.toString()
                )
                .observe(duration);

            if (duration > this.config.slowRequestThreshold) {
                this.logger.warn(
                    `Slow request detected: ${req.method} ${req.path} (${duration}s)`
                );
            }
        });

        next();
    }
}

Query Optimization

@Injectable()
class QueryOptimizer {
    async optimizeQuery<T>(
        queryBuilder: SelectQueryBuilder<T>
    ): Promise<void> {
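        // Run EXPLAIN on the SQL the builder generates (PostgreSQL syntax)
        const [sql, parameters] = queryBuilder.getQueryAndParameters();
        const explanation = await queryBuilder.connection.query(
            `EXPLAIN ${sql}`,
            parameters
        );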

        // Analyze query plan
        const analysis = this.analyzeQueryPlan(explanation);

        // Apply optimizations based on analysis
        if (analysis.needsIndex) {
            await this.createOptimalIndex(
                queryBuilder.entity,
                analysis.suggestedIndices
            );
        }

        if (analysis.shouldUseView) {
            await this.createMaterializedView(
                queryBuilder,
                analysis.viewDefinition
            );
        }
    }

    private analyzeQueryPlan(
        explanation: QueryExplanation[]
    ): QueryAnalysis {
        // Implement query plan analysis logic
        return {
            needsIndex: this.detectMissingIndices(explanation),
            suggestedIndices: this.suggestIndices(explanation),
            shouldUseView: this.shouldCreateView(explanation),
            viewDefinition: this.generateViewDefinition(explanation)
        };
    }
}

Lessons Learned

  • API Design:
    • Start with clear contracts and documentation
    • Plan for versioning from day one
    • Consider both developer experience and performance
  • Data Persistence:
    • Choose databases based on data access patterns
    • Plan for eventual consistency
    • Implement robust error handling and recovery
  • Performance:
    • Cache strategically
    • Monitor and optimize database queries
    • Implement proper indexing strategies

This post has covered the essential aspects of building a robust API and data layer for our microservice architecture. The examples provided are practical implementations that you can adapt for your own projects. Remember, the key to success lies in making informed decisions based on your specific requirements while keeping scalability and maintainability in mind.

What challenges have you faced in designing APIs or managing data persistence in your microservice architectures? Share your experiences in the comments below!
