Implement RxJS observables, apply operators, fix memory leaks with unsubscribe patterns, handle errors, create subjects, and build reactive data pipelines in Angular applications.
View on GitHubpluginagentmarketplace/custom-plugin-angular
angular-development-assistant
January 16, 2026
Select agents to install to:
npx add-skill https://github.com/pluginagentmarketplace/custom-plugin-angular/blob/main/skills/rxjs/SKILL.md -a claude-code --skill rxjs-implementationInstallation paths:
.claude/skills/rxjs-implementation/# RxJS Implementation Skill
## Quick Start
### Observable Basics
```typescript
import { Observable } from 'rxjs';
// Create observable
const observable = new Observable((observer) => {
observer.next(1);
observer.next(2);
observer.next(3);
observer.complete();
});
// Subscribe
const subscription = observable.subscribe({
next: (value) => console.log(value),
error: (error) => console.error(error),
complete: () => console.log('Done')
});
// Unsubscribe
subscription.unsubscribe();
```
### Common Operators
```typescript
import { map, filter, switchMap, takeUntil } from 'rxjs/operators';
// Transformation
data$.pipe(
map(user => user.name),
filter(name => name.length > 0)
).subscribe(name => console.log(name));
// Higher-order
userId$.pipe(
switchMap(id => this.userService.getUser(id))
).subscribe(user => console.log(user));
```
## Subjects
### Subject Types
```typescript
import { Subject, BehaviorSubject, ReplaySubject } from 'rxjs';
// Subject - No initial value
const subject = new Subject<string>();
subject.next('hello');
// BehaviorSubject - Has initial value
const behavior = new BehaviorSubject<string>('initial');
behavior.next('new value');
// ReplaySubject - Replays N values
const replay = new ReplaySubject<string>(3);
replay.next('one');
replay.next('two');
```
### Service with Subject
```typescript
@Injectable()
export class NotificationService {
private messageSubject = new Subject<string>();
public message$ = this.messageSubject.asObservable();
notify(message: string) {
this.messageSubject.next(message);
}
}
// Usage
constructor(private notification: NotificationService) {
this.notification.message$.subscribe(msg => {
console.log('Notification:', msg);
});
}
```
## Transformation Operators
```typescript
// map - Transform values
source$.pipe(
map(user => user.name)
)
// switchMap - Switch to new observable (cancel previous)
userId$.pipe(
switchMap(id => this.userService.getUser(id))
)
// mergeMap - Issues Found: