Exposing the Geolocator as a Reactive Service

From Nokia Developer Wiki

This article explains how to expose the Geolocator as a reactive service.

Article Metadata
Tested with
SDK: Windows Phone 8.0 SDK
Device(s): Nokia Lumia 820
Compatibility
Platform(s):
Windows Phone 8
Dependencies: The Reactive Extensions (Rx)
Platform Security
Capabilities: ID_CAP_LOCATION
Article
Created: paulo.morgado (21 Feb 2014)
Last edited: paulo.morgado (27 Feb 2014)

Introduction

“The Reactive Extensions (Rx) is a library for composing asynchronous and event-based programs using observable sequences and LINQ-style query operators. Using Rx, developers represent asynchronous data streams with observables, query asynchronous data streams using LINQ operators, and parameterize the concurrency in the asynchronous data streams using Schedulers. Simply put, Rx = Observables + LINQ + Schedulers.” – from the MSDN page.

The library also provides a considerable number of helper methods that make it easy to wrap events into observables - events become first class citizens that can be passed around and composed as needed in a very simple way.

This is especially useful for services like the Geolocator that can continuously produce position and status events.

Consuming events as observables

The Reactive Extensions (Rx) provide methods for easily converting .NET events into observable streams through the FromEventPattern method overloads of the Observable class. Converting an event into an observable stream can be as simple as this:

Observable
    .FromEventPattern<PositionChangedEventArgs>(
        handler => geolocator.PositionChanged += handler,
        handler => geolocator.PositionChanged -= handler)

However, the Geolocator class is a Windows Phone Runtime API (the Windows Phone 8 version of the Windows Runtime present in Windows Store apps), and it does not expose events conforming to the .NET standard for events, where the event handling methods receive a parameter of type object corresponding to the sender of the event and a parameter of a type derived from EventArgs that holds the event data.

Although the events of the Geolocator class look like normal .NET events, their event handler methods receive the source of the event as a typed Geolocator instance and, as the event arguments, an instance of a class that doesn't derive from the EventArgs class.

Fortunately, one of the FromEvent method overloads of the Observable class allows us to subscribe to any event pattern:

Observable
    .FromEvent<TypedEventHandler<Geolocator, PositionChangedEventArgs>, PositionChangedEventArgs>(
        conversion: handler => (s, e) => handler(e),
        addHandler: handler => geolocator.PositionChanged += handler,
        removeHandler: handler => geolocator.PositionChanged -= handler)
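
The same FromEvent overload can be tried away from a device. The following is only a sketch under stated assumptions: TypedHandler, ReadingChangedArgs, FakeSensor and FromEventSketch are hypothetical stand-ins invented for illustration (they are not part of the Geolocator API), and the code assumes a plain console environment with the Rx assemblies referenced.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;

// Hypothetical delegate shaped like a Windows Runtime typed event handler.
public delegate void TypedHandler<TSender, TArgs>(TSender sender, TArgs args);

// Hypothetical event data; deliberately NOT derived from EventArgs.
public sealed class ReadingChangedArgs
{
    public ReadingChangedArgs(double value) { this.Value = value; }
    public double Value { get; private set; }
}

// Hypothetical event source standing in for the Geolocator.
public sealed class FakeSensor
{
    public event TypedHandler<FakeSensor, ReadingChangedArgs> ReadingChanged;

    public void Raise(double value)
    {
        var handler = this.ReadingChanged;
        if (handler != null)
        {
            handler(this, new ReadingChangedArgs(value));
        }
    }
}

public static class FromEventSketch
{
    public static List<double> Run()
    {
        var sensor = new FakeSensor();
        var received = new List<double>();

        // conversion adapts Rx's Action<ReadingChangedArgs> to the event's delegate type.
        IObservable<double> readings = Observable
            .FromEvent<TypedHandler<FakeSensor, ReadingChangedArgs>, ReadingChangedArgs>(
                conversion: handler => (s, e) => handler(e),
                addHandler: handler => sensor.ReadingChanged += handler,
                removeHandler: handler => sensor.ReadingChanged -= handler)
            .Select(e => e.Value);

        sensor.Raise(1.0); // raised before anyone subscribed: not observed

        using (readings.Subscribe(v => received.Add(v)))
        {
            sensor.Raise(2.0);
            sensor.Raise(3.0);
        }

        sensor.Raise(4.0); // raised after the subscription was disposed: not observed

        return received;
    }
}
```

The point of the sketch is that the event handler is only attached while a subscription exists, which is what later makes it possible to detach from the Geolocator by disposing of a subscription.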

And, thanks to the LINQ composition capabilities of Rx, we don't need to subscribe to the whole event arguments in a very event-like pattern; we can select only the interesting parts:

from e in Observable
    .FromEvent<TypedEventHandler<Geolocator, PositionChangedEventArgs>, PositionChangedEventArgs>(
        conversion: handler => (s, e) => handler(e),
        addHandler: handler => geolocator.PositionChanged += handler,
        removeHandler: handler => geolocator.PositionChanged -= handler)
select e.Position

Defining the Geolocator reactive service interface

To help the future use of this service in dependency injection and testing scenarios, an interface for the service will be created to mirror the Geolocator's API, replacing events with observable streams:

public interface IGeolocatorReactiveService
{
    PositionAccuracy DesiredAccuracy { get; set; }
    uint? DesiredAccuracyInMeters { get; set; }
    double MovementThreshold { get; set; }
    PositionStatus LocationStatus { get; }
    uint ReportInterval { get; set; }
    IObservable<Geoposition> PositionObservable { get; }
    IObservable<PositionStatus> StatusObservable { get; }
    IAsyncOperation<Geoposition> GetGeopositionAsync();
    IAsyncOperation<Geoposition> GetGeopositionAsync(TimeSpan maximumAge, TimeSpan timeout);
}

Implementing the Geolocator reactive service

The implementation of the service consists of just wrapping a Geolocator instance and conforming to the previously defined interface:

class GeolocatorReactiveService : IGeolocatorReactiveService
{
    private readonly Geolocator geolocator = new Geolocator();

    public GeolocatorReactiveService()
    {
        this.StatusObservable = Observable
            .FromEvent<TypedEventHandler<Geolocator, StatusChangedEventArgs>, StatusChangedEventArgs>(
                conversion: handler => (s, e) => handler(e),
                addHandler: handler => this.geolocator.StatusChanged += handler,
                removeHandler: handler => this.geolocator.StatusChanged -= handler)
            .Select(e => e.Status);

        this.PositionObservable = Observable
            .FromEvent<TypedEventHandler<Geolocator, PositionChangedEventArgs>, PositionChangedEventArgs>(
                conversion: handler => (s, e) => handler(e),
                addHandler: handler => this.geolocator.PositionChanged += handler,
                removeHandler: handler => this.geolocator.PositionChanged -= handler)
            .Select(e => e.Position);
    }

    public PositionAccuracy DesiredAccuracy
    {
        get { return this.geolocator.DesiredAccuracy; }
        set { this.geolocator.DesiredAccuracy = value; }
    }

    public uint? DesiredAccuracyInMeters
    {
        get { return this.geolocator.DesiredAccuracyInMeters; }
        set { this.geolocator.DesiredAccuracyInMeters = value; }
    }

    public double MovementThreshold
    {
        get { return this.geolocator.MovementThreshold; }
        set { this.geolocator.MovementThreshold = value; }
    }

    public uint ReportInterval
    {
        get { return this.geolocator.ReportInterval; }
        set { this.geolocator.ReportInterval = value; }
    }

    public PositionStatus LocationStatus
    {
        get { return this.geolocator.LocationStatus; }
    }

    public IObservable<PositionStatus> StatusObservable { get; private set; }

    public IObservable<Geoposition> PositionObservable { get; private set; }

    public IAsyncOperation<Geoposition> GetGeopositionAsync()
    {
        return this.geolocator.GetGeopositionAsync();
    }

    public IAsyncOperation<Geoposition> GetGeopositionAsync(TimeSpan maximumAge, TimeSpan timeout)
    {
        return this.geolocator.GetGeopositionAsync(maximumAge, timeout);
    }
}

Limitations of the implementation

Although the above implementation is good enough for simple uses, it suffers from the limitations of the underlying Geolocator.

Changes to the notification parameters (the MovementThreshold, DesiredAccuracy, DesiredAccuracyInMeters and ReportInterval properties) are not allowed while getting location. Getting location includes being subscribed to the PositionChanged and StatusChanged events as well as calling the GetGeopositionAsync methods.

In order to change the notification parameters, unsubscribing from the events is required. That would mean disposing of the observables if they had already been subscribed to and, as the implementation stands, they could not be recreated.

Improving the implementation

One way to improve the implementation of this reactive service would be to replace the observable-returning properties with methods that return a new observable, plus another method to release it:

private IObservable<Geoposition> positionObservable;

public IObservable<Geoposition> GetPositionObservable()
{
    if (this.positionObservable == null)
    {
        this.positionObservable = from e in Observable
                                      .FromEvent<TypedEventHandler<Geolocator, PositionChangedEventArgs>, PositionChangedEventArgs>(
                                          conversion: handler => (s, e) => handler(e),
                                          addHandler: handler => geolocator.PositionChanged += handler,
                                          removeHandler: handler => geolocator.PositionChanged -= handler)
                                  select e.Position;
    }

    return this.positionObservable;
}

public void ReleasePositionObservable()
{
    // IObservable<T> has no Dispose method: the event handler is only detached when
    // the subscribers dispose of their own subscriptions. Dropping the reference just
    // makes the next call to GetPositionObservable build a new observable.
    this.positionObservable = null;
}

That could work, but that would also hinder the composability given by LINQ operators.

Instead, we are going to introduce another concept of the Reactive Extensions: subjects. Subjects are objects that can both observe and be observed. Observers subscribe to the subject, and the subject in turn subscribes to a backend data source. In this way, the subject acts as a proxy between a group of subscribers and a source. Subjects can be used to implement custom observables with caching, buffering and time shifting, and to broadcast data to multiple subscribers.

In our implementation we will be creating subjects for the users of the service to subscribe to and observe:

class GeolocatorReactiveService : IGeolocatorReactiveService
{
    private readonly Geolocator geolocator = new Geolocator();
    private readonly Subject<Geoposition> positionSubject = new Subject<Geoposition>();
    private readonly Subject<PositionStatus> statusSubject = new Subject<PositionStatus>();

    public GeolocatorReactiveService()
    {
        this.StatusObservable = this.statusSubject.AsObservable();
        this.PositionObservable = this.positionSubject.AsObservable();
    }

    public IObservable<Geoposition> PositionObservable { get; private set; }

    public IObservable<PositionStatus> StatusObservable { get; private set; }

    // ...
}

Notice that we are not exposing the subjects directly, but exposing them as observables. This protects the subjects from inadvertent use by the users of the service, who would otherwise be able to call OnNext, OnError or OnCompleted on them.

Now, when we want to activate, say, the subscription to position changes, we just create an observable over the corresponding event and have the corresponding subject subscribe to it:

private IDisposable geolocatorPositionObservable;

// ...

this.geolocatorPositionObservable = Observable
    .FromEvent<TypedEventHandler<Geolocator, PositionChangedEventArgs>, PositionChangedEventArgs>(
        conversion: handler => (s, e) => handler(e),
        addHandler: handler => this.geolocator.PositionChanged += handler,
        removeHandler: handler => this.geolocator.PositionChanged -= handler)
    .Select(e => e.Position)
    .Subscribe(
        onNext: this.positionSubject.OnNext,
        onError: this.positionSubject.OnError);

And the users of the service don't need to change their subscription of the exposed observables.

To deactivate the subscription of events, all that's needed is to dispose of the subscriptions returned by Subscribe (but not the subjects):

this.geolocatorPositionObservable.Dispose();
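
The activate/deactivate idea can be tried without a Geolocator. The following is a minimal sketch, not the service itself: RelaySketch, source, relay and Produce are hypothetical names, and a Subject<int> stands in for the event-backed observable. It assumes a plain console environment with the Rx assemblies referenced.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Reactive.Subjects;

public sealed class RelaySketch
{
    // Hypothetical stand-in for the observable created over the Geolocator event.
    private readonly Subject<int> source = new Subject<int>();

    // The subject the consumers stay subscribed to across Start/Stop cycles.
    private readonly Subject<int> relay = new Subject<int>();

    // The subscription of the relay to the source; null while stopped.
    private IDisposable upstream;

    public RelaySketch()
    {
        // AsObservable hides the subject so consumers cannot call OnNext on it.
        this.Values = this.relay.AsObservable();
    }

    public IObservable<int> Values { get; private set; }

    public void Start()
    {
        if (this.upstream == null)
        {
            // OnCompleted is deliberately not forwarded, so the relay stays usable.
            this.upstream = this.source.Subscribe(
                onNext: this.relay.OnNext,
                onError: this.relay.OnError);
        }
    }

    public void Stop()
    {
        if (this.upstream != null)
        {
            this.upstream.Dispose();
            this.upstream = null;
        }
    }

    public void Produce(int value)
    {
        this.source.OnNext(value);
    }

    public static List<int> Run()
    {
        var sketch = new RelaySketch();
        var seen = new List<int>();

        // The consumer subscribes once and never resubscribes.
        using (sketch.Values.Subscribe(v => seen.Add(v)))
        {
            sketch.Produce(1); // not started yet: dropped
            sketch.Start();
            sketch.Produce(2); // relayed
            sketch.Stop();
            sketch.Produce(3); // stopped: dropped
            sketch.Start();
            sketch.Produce(4); // relayed again, same consumer subscription
        }

        return seen;
    }
}
```

Only the values produced between Start and Stop reach the consumer, while the consumer's own subscription is never touched. That is the property the service relies on to allow the notification parameters to be changed between runs.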

In order to activate and deactivate the subscriptions, Start and Stop methods will be added to the service, as well as a Boolean flag (IsStarted) to indicate whether the subscriptions are active or not:

public interface IGeolocatorReactiveService
{
    PositionAccuracy DesiredAccuracy { get; set; }
    uint? DesiredAccuracyInMeters { get; set; }
    double MovementThreshold { get; set; }
    PositionStatus LocationStatus { get; }
    uint ReportInterval { get; set; }
    bool IsStarted { get; }
    IObservable<Geoposition> PositionObservable { get; }
    IObservable<PositionStatus> StatusObservable { get; }
    IAsyncOperation<Geoposition> GetGeopositionAsync();
    IAsyncOperation<Geoposition> GetGeopositionAsync(TimeSpan maximumAge, TimeSpan timeout);
    void Start();
    void Stop();
}

And the full implementation will become:

using System;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Runtime.CompilerServices;
using Windows.Devices.Geolocation;
using Windows.Foundation;

public class GeolocatorReactiveService : IGeolocatorReactiveService, IDisposable
{
    private readonly Subject<Geoposition> positionSubject = new Subject<Geoposition>();
    private readonly Subject<PositionStatus> statusSubject = new Subject<PositionStatus>();
    private IDisposable geolocatorPositionObservable;
    private IDisposable geolocatorStatusObservable;
    private readonly Geolocator geolocator = new Geolocator();

    public GeolocatorReactiveService()
    {
        this.StatusObservable = this.statusSubject.AsObservable();
        this.PositionObservable = this.positionSubject.AsObservable();
    }

    ~GeolocatorReactiveService()
    {
        this.Dispose(false);
    }

    public PositionAccuracy DesiredAccuracy
    {
        get
        {
            return this.geolocator.DesiredAccuracy;
        }
        set
        {
            this.GuardIsNotStarted();
            this.geolocator.DesiredAccuracy = value;
        }
    }

    public uint? DesiredAccuracyInMeters
    {
        get
        {
            return this.geolocator.DesiredAccuracyInMeters;
        }
        set
        {
            this.GuardIsNotStarted();
            this.geolocator.DesiredAccuracyInMeters = value;
        }
    }

    public double MovementThreshold
    {
        get
        {
            return this.geolocator.MovementThreshold;
        }
        set
        {
            this.GuardIsNotStarted();
            this.geolocator.MovementThreshold = value;
        }
    }

    public uint ReportInterval
    {
        get
        {
            return this.geolocator.ReportInterval;
        }
        set
        {
            this.GuardIsNotStarted();
            this.geolocator.ReportInterval = value;
        }
    }

    public PositionStatus LocationStatus
    {
        get { return this.geolocator.LocationStatus; }
    }

    public bool IsStarted { get; private set; }

    public IObservable<PositionStatus> StatusObservable { get; private set; }

    public IObservable<Geoposition> PositionObservable { get; private set; }

    public void Dispose()
    {
        this.Dispose(true);
        GC.SuppressFinalize(this);
    }

    private void Dispose(bool disposing)
    {
        if (disposing)
        {
            if (this.geolocatorPositionObservable != null)
            {
                this.geolocatorPositionObservable.Dispose();
                this.geolocatorPositionObservable = null;
            }

            if (this.geolocatorStatusObservable != null)
            {
                this.geolocatorStatusObservable.Dispose();
                this.geolocatorStatusObservable = null;
            }
        }
    }

    public IAsyncOperation<Geoposition> GetGeopositionAsync()
    {
        return this.geolocator.GetGeopositionAsync();
    }

    public IAsyncOperation<Geoposition> GetGeopositionAsync(TimeSpan maximumAge, TimeSpan timeout)
    {
        return this.geolocator.GetGeopositionAsync(maximumAge, timeout);
    }

    public void Start()
    {
        lock (this)
        {
            if (!this.IsStarted)
            {
                this.IsStarted = true;

                // Relay position changes from the Geolocator event to the subject.
                this.geolocatorPositionObservable = Observable
                    .FromEvent<TypedEventHandler<Geolocator, PositionChangedEventArgs>, PositionChangedEventArgs>(
                        conversion: handler => (s, e) => handler(e),
                        addHandler: handler => this.geolocator.PositionChanged += handler,
                        removeHandler: handler => this.geolocator.PositionChanged -= handler)
                    .Select(e => e.Position)
                    .Subscribe(
                        onNext: this.positionSubject.OnNext,
                        onError: this.positionSubject.OnError);

                // Relay status changes from the Geolocator event to the subject.
                this.geolocatorStatusObservable = Observable
                    .FromEvent<TypedEventHandler<Geolocator, StatusChangedEventArgs>, StatusChangedEventArgs>(
                        conversion: handler => (s, e) => handler(e),
                        addHandler: handler => this.geolocator.StatusChanged += handler,
                        removeHandler: handler => this.geolocator.StatusChanged -= handler)
                    .Select(e => e.Status)
                    .Subscribe(
                        onNext: this.statusSubject.OnNext,
                        onError: this.statusSubject.OnError);
            }
        }
    }

    public void Stop()
    {
        lock (this)
        {
            if (this.IsStarted)
            {
                this.geolocatorPositionObservable.Dispose();
                this.geolocatorPositionObservable = null;

                this.geolocatorStatusObservable.Dispose();
                this.geolocatorStatusObservable = null;

                // Clear the flag so the notification parameters can be changed
                // and Start can be called again.
                this.IsStarted = false;

                this.statusSubject.OnNext(this.geolocator.LocationStatus);
            }
        }
    }

    private void GuardIsNotStarted([CallerMemberName]string propertyName = "")
    {
        lock (this)
        {
            if (this.IsStarted)
            {
                throw new InvalidOperationException(
                    string.Format(
                        "The {0} property cannot be changed after Start has been called or during a call to GetGeopositionAsync. Call Stop and/or wait for GetGeopositionAsync to complete before changing the value of {0}.",
                        propertyName));
            }
        }
    }
}

Demo application

Source code

Resources

This page was last modified on 27 February 2014, at 05:38.