Observables & Observers
Observables are the heart of RxJS! They represent data streams that emit values over time. Master Observables and you'll master reactive programming in Angular.
What is an Observable?
An Observable is like a stream of data that flows over time. Think of it as a pipe that values flow through: you can listen to that pipe and react when values arrive.
Simple analogy:
- Promise = A box that will contain one value in the future
- Observable = A stream that can deliver multiple values over time
Real-world examples:
- User typing in a search box → Stream of search terms
- HTTP request → Stream with one response value
- WebSocket connection → Stream of messages
- Mouse movements → Stream of position coordinates
Why Observables Matter
Observable Characteristics:
- Lazy - Doesn't start until someone subscribes (saves resources!)
- Push-based - Data is pushed to you when ready (no polling needed)
- Cancellable - Unsubscribe anytime to stop receiving values
- Multiple values - Can emit zero, one, many, or an unbounded number of values
- Composable - Chain operators to transform data easily
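The lazy and push-based characteristics can be illustrated without RxJS itself. The sketch below is a deliberately tiny model of an Observable, assuming a hypothetical `TinyObservable` class written only for this illustration (it is not how RxJS is implemented). It shows that the producer function does nothing until `subscribe()` is called, and that each subscriber triggers its own run.

```typescript
// A deliberately tiny model of an Observable. `TinyObservable` and
// `TinyObserver` are hypothetical teaching aids, not RxJS types.
type TinyObserver<T> = { next: (value: T) => void };

class TinyObservable<T> {
  private producer: (observer: TinyObserver<T>) => void;

  // The producer function is stored, not called: this is what makes it lazy.
  constructor(producer: (observer: TinyObserver<T>) => void) {
    this.producer = producer;
  }

  // Each subscribe() call runs the producer again for that subscriber.
  subscribe(observer: TinyObserver<T>): void {
    this.producer(observer);
  }
}

let producerRuns = 0;
const numbers$ = new TinyObservable<number>(observer => {
  producerRuns++;
  observer.next(1); // values are pushed to the observer
  observer.next(2);
});

// Nothing has executed yet, so this captures 0.
const runsBeforeSubscribe = producerRuns;

const received: number[] = [];
numbers$.subscribe({ next: value => received.push(value) });
numbers$.subscribe({ next: value => received.push(value * 10) });

console.log(runsBeforeSubscribe, producerRuns, received);
```

After the two `subscribe()` calls, `producerRuns` is 2 and `received` holds `1, 2, 10, 20`: one independent run per subscriber.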
Compared to Promises:
| Feature | Promise | Observable |
|---|---|---|
| Values | Single | Multiple |
| Cancellable | No | Yes |
| Lazy | No (eager) | Yes |
| Operators | Limited | 100+ operators |
| Use in Angular | Sometimes | Everywhere! |
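The "Lazy" and "Values" rows of the comparison can be checked with plain TypeScript. This is a minimal sketch: the `lazy` function is a hypothetical stand-in for an Observable's subscribe function, used here only to contrast with the eager Promise executor.

```typescript
const log: string[] = [];

// A Promise executor runs immediately, even if nobody ever calls .then().
const eager = new Promise<number>(resolve => {
  log.push('promise executor ran');
  resolve(1);
  resolve(2); // ignored: a Promise settles only once
});

// Wrapping the work in a function defers it until the function is called,
// which is roughly the role an Observable's subscribe function plays.
// `lazy` is a hypothetical helper, not an RxJS API.
const lazy = (onValue: (value: number) => void): void => {
  log.push('lazy producer ran');
  onValue(1);
  onValue(2); // a callback-based producer can deliver several values
};

// Only the Promise has run at this point.
const logBeforeUse = [...log];

const lazyValues: number[] = [];
lazy(value => lazyValues.push(value));

eager.then(value => console.log('Promise resolved with', value));
```

Here `logBeforeUse` contains only the Promise entry, while `lazyValues` ends up with both `1` and `2` once `lazy` is finally called.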
Creating Observables
1. Using Observable Constructor

    import { Observable } from 'rxjs';

    // Create a custom Observable
    const customObservable$ = new Observable<string>(subscriber => {
      // Emit values synchronously
      subscriber.next('First value');
      subscriber.next('Second value');

      // Simulate an async operation
      const timeoutId = setTimeout(() => {
        subscriber.next('Async value');
        subscriber.complete();
      }, 1000);

      // Cleanup function (optional): runs on unsubscribe, complete, or error
      return () => {
        clearTimeout(timeoutId); // Stop the pending timer if it has not fired yet
        console.log('Cleanup: Observable unsubscribed');
      };
    });

    // Subscribe
    customObservable$.subscribe({
      next: (value) => console.log('Received:', value),
      complete: () => console.log('Stream completed'),
      error: (err) => console.error('Error:', err)
    });

2. Creation Operators

    import { of, from, interval, timer, range, generate } from 'rxjs';

    // of() - Emit the given values in sequence, then complete
    of(1, 2, 3, 4, 5).subscribe(console.log);

    // from() - Convert an array, promise, or iterable
    from([10, 20, 30]).subscribe(console.log);
    from(Promise.resolve('Hello')).subscribe(console.log);

    // interval() - Emit incrementing numbers at a fixed interval (never completes)
    interval(1000).subscribe(val => console.log('Tick:', val));

    // timer() - Emit after a delay, optionally repeating (here: after 2s, then every 1s)
    timer(2000, 1000).subscribe(val => console.log('Timer:', val));

    // range() - Emit a sequence of numbers (start, count)
    range(1, 5).subscribe(val => console.log('Range:', val)); // 1, 2, 3, 4, 5

    // generate() - Generate values with a loop-like condition
    generate({
      initialState: 0,                       // Initial value
      condition: x => x < 5,                 // Condition
      iterate: x => x + 1,                   // Iterate
      resultSelector: (x: number) => x * 2   // Result selector
    }).subscribe(console.log); // 0, 2, 4, 6, 8

Observer Pattern
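An observer is an object with up to three callbacks: `next`, `error`, and `complete`. Once `error` or `complete` has been delivered, no further notifications reach it. The sketch below models that rule in plain TypeScript. `createSafeObserver` and `SketchObserver` are hypothetical names written for this illustration and are not part of RxJS, which enforces the same rule internally in its own way.

```typescript
interface SketchObserver<T> {
  next: (value: T) => void;
  error: (err: Error) => void;
  complete: () => void;
}

// Wraps an observer so that nothing is delivered after error() or complete().
// Hypothetical helper for illustration only.
function createSafeObserver<T>(target: SketchObserver<T>): SketchObserver<T> {
  let closed = false;
  return {
    next: value => {
      if (!closed) target.next(value);
    },
    error: err => {
      if (closed) return;
      closed = true; // error is terminal
      target.error(err);
    },
    complete: () => {
      if (closed) return;
      closed = true; // complete is terminal
      target.complete();
    },
  };
}

const events: string[] = [];
const safe = createSafeObserver<number>({
  next: value => events.push(`next:${value}`),
  error: err => events.push(`error:${err.message}`),
  complete: () => events.push('complete'),
});

safe.next(1);
safe.next(2);
safe.error(new Error('Something went wrong!'));
safe.next(3);    // dropped: the stream already errored
safe.complete(); // dropped: only one terminal notification is delivered

console.log(events);
```

Only the two `next` notifications and the error are recorded. The late `next(3)` and `complete()` calls have no effect.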
Complete Observer Object

    import { Observable } from 'rxjs';

    const observable$ = new Observable<number>(subscriber => {
      subscriber.next(1);
      subscriber.next(2);
      subscriber.error(new Error('Something went wrong!'));
      subscriber.next(3); // Ignored: nothing is delivered after error() or complete()
    });

    // Full observer object
    observable$.subscribe({
      next: (value) => { console.log('Next:', value); },
      error: (err) => { console.error('Error:', err.message); },
      complete: () => { console.log('Complete!'); } // Not called here: the stream errors instead
    });

    // Shorthand (only next callback). This stream errors, and without an
    // error callback the error is reported as unhandled.
    observable$.subscribe(value => console.log(value));

Real-World Examples
1. User Authentication Service
Let's create an authentication service using Observables.

    import { Injectable, inject, signal } from '@angular/core';
    import { HttpClient } from '@angular/common/http';
    import { Observable, BehaviorSubject, throwError } from 'rxjs';
    import { catchError, tap } from 'rxjs/operators';

    interface User {
      id: string;
      email: string;
      name: string;
    }

    interface LoginCredentials {
      email: string;
      password: string;
    }

    @Injectable({ providedIn: 'root' })
    export class AuthService {
      private http = inject(HttpClient);
      private currentUserSubject = new BehaviorSubject<User | null>(null);

      // Expose as a read-only Observable
      currentUser$ = this.currentUserSubject.asObservable();
      isAuthenticated = signal(false);

      login(credentials: LoginCredentials): Observable<User> {
        return this.http.post<User>('/api/auth/login', credentials).pipe(
          tap(user => {
            this.currentUserSubject.next(user);
            this.isAuthenticated.set(true);
            localStorage.setItem('user', JSON.stringify(user));
          }),
          catchError(error => {
            console.error('Login failed:', error);
            return throwError(() => new Error('Login failed'));
          })
        );
      }

      logout(): Observable<void> {
        return this.http.post<void>('/api/auth/logout', {}).pipe(
          tap(() => {
            this.currentUserSubject.next(null);
            this.isAuthenticated.set(false);
            localStorage.removeItem('user');
          })
        );
      }

      // Restore the session from localStorage (for example on app startup)
      checkAuth(): void {
        const userJson = localStorage.getItem('user');
        if (userJson) {
          const user = JSON.parse(userJson) as User;
          this.currentUserSubject.next(user);
          this.isAuthenticated.set(true);
        }
      }
    }

2. Real-Time Data Stream
Let's create a real-time data service.

    import { Injectable } from '@angular/core';
    import { Observable, interval } from 'rxjs';
    import { map, share } from 'rxjs/operators';

    interface StockPrice {
      symbol: string;
      price: number;
      change: number;
      timestamp: Date;
    }

    @Injectable({ providedIn: 'root' })
    export class StockService {
      // Simulate real-time stock prices
      getStockPrice(symbol: string): Observable<StockPrice> {
        return interval(2000).pipe(
          map(() => ({
            symbol,
            price: 100 + Math.random() * 50,
            change: (Math.random() - 0.5) * 5,
            timestamp: new Date()
          })),
          share() // Share one timer among all subscribers of the returned Observable
        );
      }
    }

    // Component usage (separate file; also import StockService from the service file)
    import { Component, inject } from '@angular/core';
    import { AsyncPipe, DatePipe, DecimalPipe } from '@angular/common';

    @Component({
      selector: 'app-stock-ticker',
      standalone: true,
      imports: [AsyncPipe, DecimalPipe, DatePipe],
      template: `
        <div class="stock-ticker">
          @if (stock$ | async; as stock) {
            <h3>{{ stock.symbol }}</h3>
            <p class="price">\${{ stock.price | number:'1.2-2' }}</p>
            <p [class.positive]="stock.change > 0" [class.negative]="stock.change < 0">
              {{ stock.change > 0 ? '+' : '' }}{{ stock.change | number:'1.2-2' }}
            </p>
            <small>{{ stock.timestamp | date:'short' }}</small>
          }
        </div>
      `
    })
    export class StockTickerComponent {
      private stockService = inject(StockService);
      stock$ = this.stockService.getStockPrice('AAPL');
    }

3. Infinite Scroll
Let's implement infinite scroll with Observables.

    import {
      Component, DestroyRef, ElementRef, afterNextRender, inject, signal, viewChild
    } from '@angular/core';
    import { HttpClient } from '@angular/common/http';
    import { fromEvent } from 'rxjs';
    import { map, filter, debounceTime, distinctUntilChanged } from 'rxjs/operators';
    import { takeUntilDestroyed } from '@angular/core/rxjs-interop';

    interface Post {
      id: number;
      title: string;
      content: string;
    }

    @Component({
      selector: 'app-infinite-scroll',
      standalone: true,
      template: `
        <div class="posts" #scrollContainer>
          @for (post of posts(); track post.id) {
            <div class="post-card">
              <h3>{{ post.title }}</h3>
              <p>{{ post.content }}</p>
            </div>
          }

          @if (loading()) {
            <div class="loading">Loading more...</div>
          }
        </div>
      `
    })
    export class InfiniteScrollComponent {
      private http = inject(HttpClient);
      private destroyRef = inject(DestroyRef);
      scrollContainer = viewChild<ElementRef<HTMLElement>>('scrollContainer');

      posts = signal<Post[]>([]);
      loading = signal(false);
      page = signal(1);

      constructor() {
        this.setupInfiniteScroll();
        this.loadPosts();
      }

      setupInfiniteScroll() {
        // Wait for the view to render so the container element exists
        afterNextRender(() => {
          const container = this.scrollContainer()?.nativeElement;

          if (container) {
            fromEvent(container, 'scroll')
              .pipe(
                debounceTime(200),
                map(() => {
                  const { scrollTop, scrollHeight, clientHeight } = container;
                  return scrollTop + clientHeight >= scrollHeight - 100;
                }),
                // Compare before filtering: after filter() every value is `true`,
                // so distinctUntilChanged() placed there would block all later loads
                distinctUntilChanged(),
                filter(atBottom => atBottom && !this.loading()),
                // This callback runs outside the injection context,
                // so DestroyRef must be passed explicitly
                takeUntilDestroyed(this.destroyRef)
              )
              .subscribe(() => {
                this.page.update(p => p + 1);
                this.loadPosts();
              });
          }
        });
      }

      loadPosts() {
        this.loading.set(true);

        this.http.get<Post[]>(`/api/posts?page=${this.page()}`)
          .subscribe({
            next: (newPosts) => {
              this.posts.update(current => [...current, ...newPosts]);
              this.loading.set(false);
            },
            error: () => this.loading.set(false)
          });
      }
    }

4. Autocomplete Search
Let's create an autocomplete with Observables.

    import { Component, inject, signal } from '@angular/core';
    import { FormControl, ReactiveFormsModule } from '@angular/forms';
    import { HttpClient } from '@angular/common/http';
    import { of } from 'rxjs';
    import {
      catchError, debounceTime, distinctUntilChanged, switchMap, filter
    } from 'rxjs/operators';
    import { takeUntilDestroyed } from '@angular/core/rxjs-interop';

    interface SearchResult {
      id: number;
      name: string;
      description: string;
    }

    @Component({
      selector: 'app-autocomplete',
      standalone: true,
      imports: [ReactiveFormsModule],
      template: `
        <div class="autocomplete">
          <input
            [formControl]="searchControl"
            placeholder="Search..."
          >

          @if (loading()) {
            <div class="spinner">Searching...</div>
          }

          <div class="results">
            @for (result of results(); track result.id) {
              <div class="result-item" (click)="selectResult(result)">
                <h4>{{ result.name }}</h4>
                <p>{{ result.description }}</p>
              </div>
            }
          </div>
        </div>
      `
    })
    export class AutocompleteComponent {
      private http = inject(HttpClient);

      searchControl = new FormControl('');
      results = signal<SearchResult[]>([]);
      loading = signal(false);

      constructor() {
        this.searchControl.valueChanges
          .pipe(
            debounceTime(300),        // Wait 300ms after typing stops
            distinctUntilChanged(),   // Only if the value changed
            filter((term): term is string => term !== null && term.length >= 2), // Min 2 chars
            switchMap(term => {
              this.loading.set(true);
              // Passing the term via params lets HttpClient URL-encode it
              return this.http.get<SearchResult[]>('/api/search', { params: { q: term } }).pipe(
                // Handle errors on the inner request so one failed search
                // does not terminate the valueChanges stream
                catchError(() => of([] as SearchResult[]))
              );
            }),
            takeUntilDestroyed()
          )
          .subscribe({
            next: (results) => {
              this.results.set(results);
              this.loading.set(false);
            },
            error: () => this.loading.set(false)
          });
      }

      selectResult(result: SearchResult) {
        console.log('Selected:', result);
        this.searchControl.setValue(result.name);
        this.results.set([]);
      }
    }

Best Practices
1. Cold vs Hot Observables

    import { Observable, share } from 'rxjs';

    // Cold Observable - Creates a new execution for each subscriber
    const cold$ = new Observable<number>(subscriber => {
      console.log('Cold Observable executed');
      // Emit asynchronously so every subscriber is attached before the value arrives
      const timeoutId = setTimeout(() => subscriber.next(Math.random()), 0);
      return () => clearTimeout(timeoutId);
    });

    cold$.subscribe(val => console.log('Sub 1:', val));
    cold$.subscribe(val => console.log('Sub 2:', val));
    // Logs "Cold Observable executed" twice, different values

    // Hot Observable - Shares one execution among subscribers
    const hot$ = cold$.pipe(share());

    hot$.subscribe(val => console.log('Sub 1:', val));
    hot$.subscribe(val => console.log('Sub 2:', val));
    // Logs "Cold Observable executed" once, same value for both.
    // A subscriber that joins after a value was emitted does not receive it.

2. Proper Cleanup
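Cleanup matters because long-lived sources such as `interval()` keep running until they are unsubscribed. The idea of collecting several teardown functions and releasing them together can be sketched in plain TypeScript. `TeardownGroup` below is a hypothetical stand-in written for illustration. In real code, RxJS's `Subscription` plays this role through its `add()` and `unsubscribe()` methods.

```typescript
type Teardown = () => void;

// Hypothetical illustration of a parent subscription that owns child teardowns.
class TeardownGroup {
  private teardowns: Teardown[] = [];
  private closed = false;

  add(teardown: Teardown): void {
    if (this.closed) {
      teardown(); // group already closed: clean up immediately
      return;
    }
    this.teardowns.push(teardown);
  }

  unsubscribe(): void {
    if (this.closed) return; // calling it again is a no-op
    this.closed = true;
    for (const teardown of this.teardowns) teardown();
    this.teardowns = [];
  }
}

const released: string[] = [];
const group = new TeardownGroup();
group.add(() => released.push('timer 1'));
group.add(() => released.push('timer 2'));

group.unsubscribe();                    // releases both timers
group.unsubscribe();                    // no effect the second time
group.add(() => released.push('late')); // added after close: runs right away

console.log(released);
```

A single `unsubscribe()` call releases everything that was added, which is why one parent object per component is usually enough.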

    import { Component, OnDestroy, OnInit } from '@angular/core';
    import { Subscription, interval } from 'rxjs';

    @Component({
      selector: 'app-timer',
      standalone: true,
      template: ''
    })
    export class TimerComponent implements OnInit, OnDestroy {
      // Parent subscription that collects all child subscriptions
      private subscriptions = new Subscription();

      ngOnInit() {
        // Add multiple subscriptions
        this.subscriptions.add(
          interval(1000).subscribe(val => console.log('Timer 1:', val))
        );

        this.subscriptions.add(
          interval(2000).subscribe(val => console.log('Timer 2:', val))
        );
      }

      ngOnDestroy() {
        // Unsubscribe all at once
        this.subscriptions.unsubscribe();
      }
    }

3. Error Handling

    import { catchError, retry, of } from 'rxjs';

    // Retry failed requests (inside a class that exposes HttpClient as this.http)
    this.http.get('/api/data')
      .pipe(
        retry(3), // Retry up to 3 times (4 attempts in total)
        catchError(error => {
          console.error('Failed after retries:', error);
          return of([]); // Return fallback value
        })
      )
      .subscribe(data => console.log(data));

Learning Checklist
- Understand Observable lifecycle
- Create Observables with the Observable constructor
- Use creation operators (of, from, interval)
- Implement proper subscription management
- Handle errors in Observables
- Understand cold vs hot Observables
- Use share() for multicasting
- Implement cleanup functions
- Work with async operations
- Build real-world features
Next Steps
- Subjects & BehaviorSubject - Multicast Observables
- Transformation Operators - Transform streams
- Filtering Operators - Filter data
Pro Tip: Master the difference between cold and hot Observables! Use share() when you want multiple subscribers to receive the same values. Always clean up subscriptions to prevent memory leaks!