Rx – System Time and Scheduler issues

I recently had an issue with time-dependent methods of Rx, which were adversely affected when a user changed their system clock.  You can read up on the history on the Rx forums.

In the older Rx v1.0.2787 (which my current project is still using), BufferWithTime was affected, along with other methods such as Interval, Timer, and Sample. 

The most problematic for my purposes was BufferWithTime.  For instance, if you shift the system clock back and then push values onto the Buffer, the values are then either lost completely or they arrive late.  If you shift the clock forward a significant amount, Rx overloads the CPU pushing through potentially thousands of empty updates in an effort to catch up with the new clock time.  Worse still, if you shift your clock back several years then you get an UnhandledException along the lines of “Time-out interval must be less than 2^32-2″.

In the latest Rx v1.1.10621 (experimental), it seems neither Buffer nor Window are affected, but Interval, Timer, and Sample are.

In short, the issue is that all problematic methods are dependent upon IScheduler.Now.  In all IScheduler implementations, this property just returns DateTimeOffset.Now, which is of course straight from the system clock.  Change the system clock, and subsequent reads of IScheduler.Now will return times not in line with previous values.

The solution was to implement a custom IScheduler.  I’ve quite literally copied the ThreadPoolScheduler source, and simply modified the Now property implementation.

In my implementation below, you can see that I start a Stopwatch during construction, and use this to determine the actual time in the getter for Now.  From my testing it behaves correctly in all situations where I was previously missing notifications.  The relevant parts are the constructor and the Now getter, all other methods are simply a copy of the ThreadPoolScheduler.

Attached here is a sample project demonstrating both the issue and the fix for the latest Rx release.  Just run it and hit the “-10 secs” button to shift your system clock back.

Thoughts and comments welcome…

public class AccurateTimeScheduler : IScheduler, IDisposable
{
	private static readonly object _gate = new object();
	private static readonly Dictionary<Timer, object> _timers 
		= new Dictionary<Timer, object>();

	private readonly Stopwatch _stopWatch = new Stopwatch();
	private DateTimeOffset _startTime;

	public AccurateTimeScheduler()
	{
		_startTime = DateTimeOffset.Now;
		_stopWatch.Start();
	}

	public DateTimeOffset Now
	{
		get
		{
			var dateTimeOffset = _startTime.AddMilliseconds(
				_stopWatch.ElapsedMilliseconds);

			Trace.WriteLine(
				string.Format("Now: Default={0}, Corrected={1}",
					            DateTimeOffset.Now.ToString("HH:mm:ss.fff"),
					            dateTimeOffset.ToString("HH:mm:ss.fff")));
			return _startTime.AddMilliseconds(_stopWatch.ElapsedMilliseconds);
		}
	}

	public IDisposable Schedule<TState>(TState state, Func<IScheduler, TState, IDisposable> action)
	{
		if (action == null)
			throw new ArgumentNullException("action");
		var d = new SingleAssignmentDisposable();
		ThreadPool.QueueUserWorkItem(
			_ =>
				{
					if (d.IsDisposed)
						return;
					d.Disposable = action((IScheduler) this, state);
				}, null);
		return d;
	}

	public IDisposable Schedule<TState>(TState state, TimeSpan dueTime, Func<IScheduler, TState, IDisposable> action)
	{
		if (action == null)
			throw new ArgumentNullException("action");
		var dueTime1 = Scheduler.Normalize(dueTime);
		if (dueTime1.Ticks == 0L)
			return Schedule(state, action);
		var hasAdded = false;
		var hasRemoved = false;
		var d = new MultipleAssignmentDisposable();
		Timer timer = null;
		timer = new Timer(
			_ =>
				{
					Func<IScheduler, TState, IDisposable> localAction;
					lock (_gate)
					{
						if (hasAdded && timer != null)
							_timers.Remove(timer);
						localAction = action;
					}
					var localTimer = timer;
					if (localTimer != null)
						localTimer.Dispose();
					timer = null;
					if (localAction != null)
					{
						d.Disposable = localAction((IScheduler) this, state);
						Trace.WriteLine("Executing");
					}
					action = null;
				}, null, dueTime1, TimeSpan.FromMilliseconds(-1.0));

		lock (_gate)
		{
			if (!hasRemoved)
			{
				_timers.Add(timer, null);
				hasAdded = true;
			}
		}

		d.Disposable = Disposable.Create(
			() =>
				{
					var localTimer = timer;
					if (localTimer != null)
					{
						localTimer.Dispose();
						lock (_gate)
						{
							_timers.Remove(localTimer);
							action = null;
						}
					}
					timer = null;
				});
		return d;
	}

	public IDisposable Schedule<TState>(
TState state, DateTimeOffset dueTime, Func<IScheduler, TState, IDisposable> action)
	{
		if (action == null)
			throw new ArgumentNullException("action");

		return Schedule(state, dueTime - Now, action);
	}

	public void Dispose()
	{
		_stopWatch.Stop();
	}
}
This entry was posted in Bugs and tagged , . Bookmark the permalink.

Leave a Reply

Your email address will not be published. Required fields are marked *

*

You may use these HTML tags and attributes: <a href="" title=""> <abbr title=""> <acronym title=""> <b> <blockquote cite=""> <cite> <code> <del datetime=""> <em> <i> <q cite=""> <strike> <strong>