Connie Leung

Stream multiple responses in the Angular Resource API

When the Resource API first came out, the loader of the resource function returned a promise, and the rxResource function only used the first value emitted by the loader's Observable. This limitation was particularly obvious in the rxResource function, where

rxResource({
  request: this.num,
  loader: ({ request: n }) => timer(0, 1000).pipe(take(n))
})

emits one integer and completes, not n integers.

Fortunately, Angular 19.2.0 adds streaming support to the Resource API, so both functions can return multiple responses.

The options of the rxResource function remain the same, but the resource function has a new stream option to support streaming.

In this blog post, I will demonstrate streaming table rows using the rxResource and resource functions.

Custom makeRows RxJS operator

import { Observable, map, take, tap } from 'rxjs';

// Maps each source value to a row of consecutive integers and completes after numRows rows.
function makeRows(numRows: number, numElementsPerRow: number) {
 return function (source: Observable<number>) {
   return source.pipe(
     tap((i) => console.log('timer called', i)),
     map((num) => Array(numElementsPerRow).fill(0).map((_, index) => (num * numElementsPerRow + index) + 1)),
     take(numRows),
   )
 }
}

The makeRows custom operator maps each source value to a row of numElementsPerRow consecutive integers and completes after numRows rows have been emitted.
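
To make the row contents concrete, here is a minimal sketch of the mapping step on its own, without RxJS. The helper name buildRow is hypothetical and only used for illustration; the expression inside it is the same one used in makeRows.

```typescript
// Plain-TypeScript sketch of the mapping step inside makeRows (no RxJS needed).
// buildRow is a hypothetical helper name, not part of the original code.
function buildRow(num: number, numElementsPerRow: number): number[] {
  // Same expression as in makeRows: row `num` starts at num * numElementsPerRow + 1.
  return Array(numElementsPerRow)
    .fill(0)
    .map((_, index) => num * numElementsPerRow + index + 1);
}

// timer(0, 1000) emits 0, 1, 2, ... so the first three rows of width 3 would be:
const firstRows = [0, 1, 2].map((tick) => buildRow(tick, 3));
console.log(firstRows); // [[1, 2, 3], [4, 5, 6], [7, 8, 9]]
```

With makeRows(10, 10), the same arithmetic should yield ten rows covering the integers 1 to 100.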

Stream data with the rxResource function

@let rxResourceTitle = 'Aggregate table rows with rxResource stream';
<ng-container [ngTemplateOutlet]="table" [ngTemplateOutletContext]="{ $implicit: tableDataRxResource,  title: rxResourceTitle }"/>

<ng-template #table let-tableResource let-title="title">
 <p>{{ title }}</p>
 @for (row of tableResource.value(); track $index) {
   <div>
   @for (data of row; track data; let last=$last) {
     @let delimiter = last ? '' : ', ';
     <span>{{ `${data}${delimiter}` }}</span>
   }
   </div>
 }
</ng-template>

In the HTML template, I create an ng-template to display the table rows and a static title. In the ng-container element, I assign the template variable, table, to the ngTemplateOutlet input. I also pass the resource (as $implicit) and the static title to the ngTemplateOutletContext input.

@Component({
 selector: 'app-root',
 imports: [NgTemplateOutlet],
 templateUrl: 'main.component.html',
 changeDetection: ChangeDetectionStrategy.OnPush,
})
export class App {
 rowSub = new Subject<number[]>();
 table$ = this.rowSub.pipe(
   scan((acc, values) => ([...acc, values]), [] as number[][]),
 );

 tableDataRxResource = rxResource({
   loader: () => this.table$,
 });

 constructor() {
   timer(0, 1000)
     .pipe(makeRows(10, 10))
     .subscribe({
       next: (array) => this.rowSub.next(array)
     });
 }
}

The rowSub is a Subject that emits arrays of numbers. Each time rowSub emits a new array, the table$ Observable uses the scan RxJS operator to append it to the nested array and emit the accumulated table.
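
The accumulation performed by scan can be sketched in plain TypeScript. The helper accumulateRows below is hypothetical and works on a finite list of rows rather than an Observable; it only illustrates which intermediate tables table$ would emit.

```typescript
// Hypothetical helper that mimics scan((acc, values) => [...acc, values], [])
// over a finite list of rows and returns every intermediate table.
function accumulateRows(rows: number[][]): number[][][] {
  const snapshots: number[][][] = [];
  let acc: number[][] = [];
  for (const row of rows) {
    acc = [...acc, row]; // build a new array each time, like the scan accumulator
    snapshots.push(acc);
  }
  return snapshots;
}

const snapshots = accumulateRows([[1, 2], [3, 4], [5, 6]]);
console.log(snapshots.length); // 3
console.log(snapshots[2]); // [[1, 2], [3, 4], [5, 6]]
```

Each snapshot is a new array, so every emission hands the resource a table that contains all rows received so far.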

The tableDataRxResource resource is created by the rxResource function. Its loader returns the table$ Observable, so the resource value is updated each time a new row arrives.

In the constructor, the RxJS timer generates a new row every second and completes after the tenth row. The subscription forwards each row to the rowSub Subject.

Stream data with the resource function

<ng-container [ngTemplateOutlet]="table" [ngTemplateOutletContext]="{ $implicit: tableDataResource, title: 'Aggregate table rows with resource stream' }"/>

<ng-template #table let-tableResource let-title="title">
 <p>{{ title }}</p>
 @for (row of tableResource.value(); track $index) {
   <div>
   @for (data of row; track data; let last=$last) {
     @let delimiter = last ? '' : ', ';
     <span>{{ `${data}${delimiter}` }}</span>
   }
   </div>
 }
</ng-template>

The ng-container uses the same template with a different resource and static title.

table = signal<{ value: number[][] }>({ value: [] });

The table signal holds an object with a value property. The property stores a nested number array.

tableDataResource = resource({
   stream: async () => this.table,
});

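The tableDataResource resource uses the resource function to stream the table rows. The stream option is new and expects an async function that returns a signal. The signal holds an object with a value property (or an error property when something fails), which is why the table signal wraps the nested array in { value }.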

@Component({
   ...
})
export class App {
 table = signal<{ value: number[][] }>({ value: [] });

 tableDataResource = resource({
   stream: async () => this.table,
 });

 constructor() {
   timer(0, 1500)
     .pipe(makeRows(10, 10))
     .subscribe({
       next: (array) => 
         this.table.update(({ value }) => ({ value: [...value, array] }))
     }); 
 }
}

The subscription to the RxJS timer appends each new number array to the table signal.
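
The update callback can be read as a pure function from the old state to a new one. The sketch below isolates that step without Angular; the names TableState and appendRow are hypothetical and only mirror the shape used by the table signal.

```typescript
// TableState mirrors the shape stored in the table signal.
type TableState = { value: number[][] };

// Hypothetical pure version of the callback passed to this.table.update(...).
function appendRow(state: TableState, row: number[]): TableState {
  // Return a new object and a new array instead of mutating the old ones,
  // so the signal receives a new reference and notifies its consumers.
  return { value: [...state.value, row] };
}

const before: TableState = { value: [[1, 2, 3]] };
const after = appendRow(before, [4, 5, 6]);
console.log(before.value.length); // 1
console.log(after.value.length); // 2
```

Returning a fresh object matters because signals compare values by reference by default; pushing into the existing array would leave the reference unchanged.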

The resource function achieves the same result as the rxResource function; the demo appends a new row on each timer tick (every 1.5 seconds with the timer(0, 1500) call above).
