Observable Package

pocket/pkg/observable Package

The pocket/pkg/observable package provides a lightweight and straightforward mechanism to handle asynchronous notifications using the Observer pattern. This is achieved through two primary interfaces: Observable and Observer.

Overview

The Observable interface notifies multiple subscribers of new values asynchronously. The Observer interface exposes the channel on which those values are delivered and lets a subscriber unsubscribe from its Observable.

Interfaces and Structures

Observable Interface

Represents a publisher in a "Fan-Out" system design, allowing multiple subscribers to be notified of new values asynchronously.

  • Methods:

    • Subscribe: Used to subscribe an observer to the observable. Returns an instance of the Observer interface.

      func (o *MyObservableType) Subscribe(ctx context.Context) Observer[MyValueType]

    • UnsubscribeAll: Unsubscribes all observers from the observable.

      func (o *MyObservableType) UnsubscribeAll()

Observer Interface

Represents a subscriber in a "Fan-Out" system design. It provides access to the channel on which notifications arrive and a way to unsubscribe.

  • Methods:

    • Unsubscribe: Used to unsubscribe the observer from its associated observable.

      func (obs *MyObserverType) Unsubscribe()

    • Ch: Returns the channel through which the observer receives notifications.

      func (obs *MyObserverType) Ch() <-chan MyValueType

Architecture Diagrams

The following architecture diagrams describe the high-level structure and interactions in this package:

Observable Synchronization

Figure 1: This diagram depicts the synchronization mechanisms between the observable and its observers. It specifically showcases the use of read and write locks for different operations in both observable and observer contexts.
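
One common way to combine read and write locks in a fan-out design is sketched below. This is an assumption about how such locking might be arranged, not a transcription of the diagram or of the package's code: the `registry` type and its `add`, `remove`, and `notify` methods are hypothetical. Operations that change the set of observers take the write lock, while notification only reads the set and takes the read lock, so several notifications can proceed concurrently.

```go
package main

import (
	"fmt"
	"sync"
)

// registry is a hypothetical stand-in for an observable's observer set.
type registry struct {
	mu        sync.RWMutex
	observers map[int]chan int
	nextID    int
}

func newRegistry() *registry {
	return &registry{observers: make(map[int]chan int)}
}

// add mutates the observer set, so it takes the write lock.
func (r *registry) add() (int, <-chan int) {
	r.mu.Lock()
	defer r.mu.Unlock()
	id := r.nextID
	r.nextID++
	ch := make(chan int, 4)
	r.observers[id] = ch
	return id, ch
}

// remove also mutates the set. Closing under the write lock guarantees
// that no notify call is sending on the channel at the same time.
func (r *registry) remove(id int) {
	r.mu.Lock()
	defer r.mu.Unlock()
	if ch, ok := r.observers[id]; ok {
		delete(r.observers, id)
		close(ch)
	}
}

// notify only reads the set, so it takes the read lock.
// It returns how many observers accepted the value.
func (r *registry) notify(v int) (delivered int) {
	r.mu.RLock()
	defer r.mu.RUnlock()
	for _, ch := range r.observers {
		select {
		case ch <- v:
			delivered++
		default: // buffer full: this sketch drops the value
		}
	}
	return delivered
}

func main() {
	r := newRegistry()
	idA, chA := r.add()
	_, chB := r.add()

	// Concurrent notifications share the read lock.
	var wg sync.WaitGroup
	for v := 1; v <= 3; v++ {
		wg.Add(1)
		go func(v int) {
			defer wg.Done()
			r.notify(v)
		}(v)
	}
	wg.Wait()

	r.remove(idA)
	fmt.Println("delivered after removing A:", r.notify(4))
	fmt.Println("A buffered:", len(chA), "B buffered:", len(chB))
}
```

Dropping a value when a buffer is full is a choice made only to keep the sketch from blocking while the lock is held; a real implementation may block or handle slow observers differently.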

Observable Buffering

Figure 2: The diagram illustrates the buffering mechanisms within the observable and its observers. It highlights how published messages are buffered and how they propagate to the individual observers' buffers.
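
The propagation from a publisher-side buffer to per-observer buffers can be illustrated with plain buffered channels. The sketch below is hypothetical: the `fanOut` function and all buffer sizes are assumptions chosen for illustration and do not reflect the package's actual sizes or internals.

```go
package main

import "fmt"

// fanOut drains src and copies every value into each destination buffer.
// When src is closed, it closes every destination as well.
func fanOut(src <-chan int, dests []chan int) {
	for v := range src {
		for _, d := range dests {
			d <- v // blocks while this observer's buffer is full
		}
	}
	for _, d := range dests {
		close(d)
	}
}

func main() {
	// Publisher-side buffer (size is an assumption).
	publishCh := make(chan int, 2)
	// One buffer per observer (sizes are assumptions).
	observerChs := []chan int{make(chan int, 3), make(chan int, 3)}

	done := make(chan struct{})
	go func() {
		fanOut(publishCh, observerChs)
		close(done)
	}()

	for v := 1; v <= 3; v++ {
		publishCh <- v
	}
	close(publishCh)
	<-done

	for i, ch := range observerChs {
		var got []int
		for v := range ch {
			got = append(got, v)
		}
		fmt.Println("observer", i, "buffered:", got)
	}
}
```

In this arrangement a full observer buffer stalls the fan-out loop, which in turn lets the publisher-side buffer fill up; how the real package handles that back-pressure is not stated here.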

Usage

Basic Example

package main

import (
	"context"
	"fmt"
	"time"

	"poktroll/pkg/observable/channel"
)

func main() {
	// Create a new context that can be cancelled
	ctx, cancel := context.WithCancel(context.Background())
	// Ensure to cancel the context to release resources
	defer cancel()

	// Create a new Observable and its corresponding publisher
	obsvbl, publisher := channel.NewObservable[int]()
	// Subscribe the first Observer to the Observable
	observer1 := obsvbl.Subscribe(ctx)

	// Start observing with observer1 in a goroutine
	go func() {
		for v := range observer1.Ch() {
			fmt.Println("Observer1 received:", v)
		}
	}()

	// Publish the first value to the Observable
	publisher <- 10
	time.Sleep(time.Millisecond)

	// Now, subscribe the second Observer to the Observable
	observer2 := obsvbl.Subscribe(ctx)

	// Start observing with observer2 in a goroutine
	go func() {
		for v := range observer2.Ch() {
			fmt.Println("Observer2 received:", v)
		}
	}()

	// Publish the second value
	publisher <- 20
	time.Sleep(time.Millisecond)

	// Unsubscribe observer1 before the last value is sent
	observer1.Unsubscribe()
	fmt.Println("Observer1 unsubscribed!")

	// Publish the third value
	publisher <- 30
	time.Sleep(time.Millisecond)
}

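// Expected Output:
// (The two observers print from separate goroutines, so the relative
// order of the two "received: 20" lines is not guaranteed.)
// Observer1 received: 10
// Observer2 received: 20
// Observer1 received: 20
// Observer1 unsubscribed!
// Observer2 received: 30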

Considerations

While the pkg/observable package is designed to be simple and minimal, developers with more complex requirements may need to consider extending its functionality or exploring other libraries like RxGo.

Conclusion

The pkg/observable package offers a small API for asynchronous fan-out notifications in Go projects: an observable notifies every subscribed observer of new values, and each observer receives them on its own channel until it unsubscribes.