Search Unity

  1. Megacity Metro Demo now available. Download now.
    Dismiss Notice
  2. Unity support for visionOS is now available. Learn more in our blog post.
    Dismiss Notice

UniRx - Reactive Extensions for Unity

Discussion in 'Assets and Asset Store' started by neuecc, May 28, 2014.

  1. neuecc

    neuecc

    Joined:
    Apr 20, 2014
    Posts:
    114
    MechEthan, MaDDoX, zwcloud and 8 others like this.
  2. OnePxl

    OnePxl

    Joined:
    Aug 6, 2012
    Posts:
    307
    This looks really good! Thanks!
     
  3. marsiano

    marsiano

    Joined:
    Nov 22, 2012
    Posts:
    1
    Thank you for your wonderful solution.

    But, when I examined to run uniRX sample App on iOS,
    ExecutionEngineException error occurred.
    Maybe, I think it is originated from Schedule function of CurrentThreadScheduler.

    Does it means that uniRX is not compatible with iOS totally?
     
    Last edited: Jun 23, 2014
  4. neuecc

    neuecc

    Joined:
    Apr 20, 2014
    Posts:
    114
    Thank you for trying.

    sorry, I'm unchecked iOS on latest version.
    I'll check and fix it's problmen as soon as possible.

    Until then, if problem is only CurrentThreadScheduler,
    modify CurrentThreadScheduler.cs
    line:11 public static readonly IScheduler CurrentThread=new CurrentThreadScheduler();
    to
    public static readonly IScheduler CurrentThread=new ImmediateScheduler();
     
  5. neuecc

    neuecc

    Joined:
    Apr 20, 2014
    Posts:
    114
  6. yuhe

    yuhe

    Joined:
    Dec 20, 2012
    Posts:
    11
    This is brilliant! Only thing I'm afraid of is performance hit. Which version of Rx are you basing this on?

    Btw, I'm using it to build an MVVM framework, kinda like ReactiveUI, just for Unity. Will open source when done!
     
  7. neuecc

    neuecc

    Joined:
    Apr 20, 2014
    Posts:
    114
    Thank you.

    UniRx is basically based on Rx1.1.
    But I have plan backport performance improvements of 2.0.

    That's great news! <MVVM Framework
    FYI, I made Rx, UI and MVVM library for .NET before.
    https://reactiveproperty.codeplex.com/
    It's another approach of ReactiveUI.
    I hope this will help.
     
    ThinhHB likes this.
  8. neuecc

    neuecc

    Joined:
    Apr 20, 2014
    Posts:
    114
    'UniRx - Reactive Extensions for Unity - 4.3' has been accepted!

    changed log:

    Fix iOS AOT Safe totally
    MainThreadSchedule's schedule(dueTime) acquired time accuracy
    MainThreadDispatcher avoid deadlock at recursive call
    Add Observable.Buffer(count, skip)
    Change OfType, Cast definition
    Change IScheduler definition
    Add AotSafe Utilities(AsSafeEnumerable, WrapValueToClass)
    Change Unit, TimeInterval and Timestamped to class(for iOS AOT)
    Add Examples/Sample7_OrchestratIEnumerator.cs
     
  9. tswalk

    tswalk

    Joined:
    Jul 27, 2013
    Posts:
    1,109
    well.. here's a hidden gem!

    [edit]
    going through the source, and all I can say is WOW. I was looking for a way to implement the observer design pattern, and this has so much more than I was initially thinking... very impressed.
     
    Last edited: Jul 15, 2014
  10. tswalk

    tswalk

    Joined:
    Jul 27, 2013
    Posts:
    1,109
    I'm curious about your example, the behavior and ability to get a data stream back to the main thread (directly modifying New Sprite text property) is interesting. However, I'm having trouble wrapping my head around how I may change that so the "New Sprite" could be an Observer of the "Clicker" (NewBehaviourScript) Observable... so that when it has completed its' get of the data stream, it is pushed to "New Sprite" and it does whatever it wants to with it.

    I think once I get that concept better, I would be able to begin using Rx to solve some of my more complex ideas.
     
  11. neuecc

    neuecc

    Joined:
    Apr 20, 2014
    Posts:
    114
  12. tswalk

    tswalk

    Joined:
    Jul 27, 2013
    Posts:
    1,109

    yes, that is what I was referencing. I think I understand, I'll have to start working with the library to learn more.. I think I have done all the reading I can to get a grasp, now it is just practice.


    I do have one concern, related to lines:

    Code (CSharp):
    1.         var parallel = Observable.WhenAll(
    2.                 ObservableWWW.Get("http://google.com/"),
    3.                 ObservableWWW.Get("http://bing.com/"),
    4.                 ObservableWWW.Get("http://yahoo.com/"));
    5.  
    6.         parallel.Subscribe(xs =>
    7.         {
    8.             Debug.Log(xs[0]); // google
    9.             Debug.Log(xs[1]); // bing
    10.             Debug.Log(xs[2]); // yahoo
    11.         });
    It seems the mainthread gets blocked causing the entire app to freeze until it completes LogStringToConsole. I enabled the code to permit the quad to spin in Update(), and it is really noticeable. Here is capture of performance:



    The "LogStringToConsole" takes 1.2 seconds to complete with a GC of 4.1MB (which is expected as it is all 3 websites), I am guessing that this is because it is happening on the main thread. Which I wouldn't expect to have such a huge performance hit to do.

    I think it could be having something to do with how large the output strings are in the returned array as I get errors when trying to view the results in the console. These are the following errors:

    count <= std::numeric_limits<UInt16>::max()
    maxVertices < 65536 && maxIndices < 65536*3


    I guess there is no way to task a background thread to output to the console is there?
     
    Last edited: Jul 23, 2014
  13. neuecc

    neuecc

    Joined:
    Apr 20, 2014
    Posts:
    114
    It is Debug.Log's problem and performance.
    If you write

    Debug.Log(new String('a', 20000));

    then you can see same error.

    here is clean sample for UniRx and Observable.WhenAll.

    Code (CSharp):
    1. // check no freeze(picture moves left,right)
    2. public override void Start()
    3. {
    4.     UpdateAsObservable()
    5.         .Select(_ => this.transform.position)
    6.         .Scan(true, (isMoveLeft, p) =>
    7.                 (isMoveLeft && p.x <= -2) ? false
    8.             : (!isMoveLeft && p.x >= 6) ? true
    9.             : isMoveLeft)
    10.         .Subscribe(isMoveLeft =>
    11.         {
    12.             var p = this.transform.position;
    13.             this.transform.position = new Vector2(p.x + (0.1f) * (isMoveLeft ? -1 : 1), p.y);
    14.         });
    15. }
    16. public override void OnMouseDown()
    17. {
    18.     var parallel = Observable.WhenAll(
    19.             ObservableWWW.Get("http://google.com/"),
    20.             ObservableWWW.Get("http://www.bing.com/"),
    21.             ObservableWWW.Get("http://unity3d.com/"));
    22.     parallel.Subscribe(xs =>
    23.     {
    24.         Debug.Log(xs[0].Substring(0, 100)); // google
    25.         Debug.Log(xs[1].Substring(0, 100)); // bing
    26.         Debug.Log(xs[2].Substring(0, 100)); // unity
    27.     });
    28.     Debug.Log("start!"); // no freeze, always runnning picture
    29. }
    Thanks, I didn't know Debug.Log and long string cause problem.
    I'll fix sample and ReadMe.
     
    Last edited: Jul 24, 2014
  14. tswalk

    tswalk

    Joined:
    Jul 27, 2013
    Posts:
    1,109
    I didn't know this either till now.. and thank you!
     
  15. neuecc

    neuecc

    Joined:
    Apr 20, 2014
    Posts:
    114
    Hi, Today 'UniRx - Reactive Extensions for Unity - 4.4' has been accepted!
    This release contains big performance improvements.

    changed log:

    Add : Observable.FromEvent
    Add : Observable.Merge Overload(params IObservable[TSource][] / IEnumerable[IObserable[TSource]])
    Add : Observable.Buffer Overload(timeSpan, timeShift)
    Add : IDisposable.AddTo
    Add : ObservableLogger(UniRx.Diagnostics)
    Add : Observable.StartAsCoroutine
    Add : MainThreadDispatcher.RegisterUnhandledExceptionCallback
    Add : Examples/Sample08, Sample09, Sample10, Sample11
    Performance Improvment : Subject[T], OnNext avoids copy and lock
    Performance Improvment : MainThreadDispatcher, avoids copy on every update
    Change : Observable.ToCoroutine -> ToAwaitableEnumerator
    Fix : ObservableMonoBehaviour's OnTriggerStay2D doesn't pass Collider2D
     
    tswalk likes this.
  16. madrobotMK

    madrobotMK

    Joined:
    Nov 19, 2013
    Posts:
    17
  17. neuecc

    neuecc

    Joined:
    Apr 20, 2014
    Posts:
    114
  18. madrobotMK

    madrobotMK

    Joined:
    Nov 19, 2013
    Posts:
    17
  19. neuecc

    neuecc

    Joined:
    Apr 20, 2014
    Posts:
    114
    Hi, Today 'UniRx - Reactive Extensions for Unity - 4.5' has been accepted!
    Big Change is defult timebased scheduler to MainThreadScheduler.
    I think this to be good choice for Unity.

    And new feature, UnityEvent.AsObservable for uGUI(Unity 4.6 Beta)
    for example, button.onClick.AsObservable().Subscribe();

    ---

    Add : ObservableWWW Overload(byte[] postData)
    Add : Observable.Buffer Overload(windowBoundaries)
    Add : LazyTask - yieldable value container like Task
    Add : Observable.StartWith
    Add : Observable.Distinct
    Add : Observable.DelaySubscription
    Add : UnityEvent.AsObservable - only for Unity 4.6 uGUI
    Add : UniRx.UI.ObserveEveryValueChanged(Extension Method)
    Add : RefCountDisposable
    Add : Scheduler.MainThreadIgnoreTimeScale - difference with MainThreadScheduler, not follow Unity Timescale
    Add : Scheduler.DefaultSchedulers - can configure default scheduler for any operation
    Fix : DistinctUntilChanged iOS AOT issue.
    Fix : Remove IObservable/IObserver/ISubject's covariance/contravariance(Unity is not support)
    Fix : UnityDebugSink throws exception when called from other thread
    Fix : Remove compiler error for Windows Phone 8/Windows Store App
    Breaking Change : MainThreadSchduler follow Unity Timescale
    Breaking Change : All Timebased operator's default scheduler changed to MainThreadScheduler
    Breaking Change : Remove TypedMonoBehaviour.OnGUI for performance improvment
    Performance Improvment : AsyncSubject[T]
    Performance Improvment : CurrentThreadScheduler
    Performance Improvment : MainThreadScheduler
     
  20. azrijamil

    azrijamil

    Joined:
    Aug 31, 2014
    Posts:
    8
    Hi, i have a problem when i try to create a Singleton class to handle a double click, and subscribe the observable from other class. While clicking or double clicking i got the same result.

    Code (CSharp):
    1. public class TapHandler: ObservableMonoBehaviour
    2. {
    3.     private IObservable<Unit> mouseDown;
    4.  
    5.     private static TapHandler instance;
    6.     public static TapHandler Instance
    7.     {
    8.       get {return instance ?? (instance = new GameObject("Manager").AddComponent<TapHandler>());}
    9.     }
    10.  
    11.     public override void Awake()
    12.     {
    13.       this.mouseDown = this.UpdateAsObservable().Where(_ => Input.GetMouseButton(0));
    14.       base.Awake();
    15.     }
    16.  
    17.    public IObservable<IList<Unit>> DoubleClick()
    18.    {
    19.       return this.mouseDown
    20.       .Buffer(this.mouseDown.Throttle(TimeSpan.FromMilliseconds(200)))
    21.       .Where(clx => clx.Count >= 2);
    22.     }
    23. }
    Subscribe from other class.

    Code (CSharp):
    1. public class TrackerSenseBehaviour : ObservableMonoBehaviour {
    2.     public override void Awake(){
    3.         TapHandler.Instance.DoubleClick().Subscribe(o => Debug.Log("Double click"));
    4.  
    5.         base.Awake();
    6.     }
    7. }
    Did i missed something, Rx pattern is relatively new to me. I'm still on the blues of it..
     
  21. neuecc

    neuecc

    Joined:
    Apr 20, 2014
    Posts:
    114
    > azrijamil

    This problem come from Input.GetMouseButton(0)

    GetMouseButton : Returns whether the given mouse button is held down
    You can debug following code
    TapHandler.Instance.DoubleClick().Subscribe(o =>Debug.Log(o.Count));

    You should change Input.GetMouseButtonDown(0)
    It' description : Returns true during the frame the user pressed the given mouse button.
     
  22. azrijamil

    azrijamil

    Joined:
    Aug 31, 2014
    Posts:
    8
    Thanks!, i should have see it through. BTW im still on Async Blues.. :)
     
  23. azrijamil

    azrijamil

    Joined:
    Aug 31, 2014
    Posts:
    8
    Hi Neuecc,

    Do you know how to implement an ObservableCollection from your UnityRx? i have try many others solution. But it seems pretty messy.

    Best Regards,
    AJ
     
  24. neuecc

    neuecc

    Joined:
    Apr 20, 2014
    Posts:
    114
  25. azrijamil

    azrijamil

    Joined:
    Aug 31, 2014
    Posts:
    8
    Big thanks!! Working perfectly..
     
  26. azrijamil

    azrijamil

    Joined:
    Aug 31, 2014
    Posts:
    8
    Hi,

    Below is another extension for LoadFromCacheOrDownload typically used for downloading an asset bundle, just copy to ObservableWWW.cs inside UnityRx/UnityEngineBridge folder.

    Code (CSharp):
    1. public static IObservable<WWW> LoadFromCacheOrDownload(string url, UniRx.IProgress<float> progress = null)
    2. {
    3.     return Observable.FromCoroutine<WWW>((observer, cancellation) => LoadFromCacheOrDownloadCore(WWW.LoadFromCacheOrDownload(url, 1), observer, progress, cancellation));
    4. }
    5.        
    6.        
    7. static IEnumerator LoadFromCacheOrDownloadCore(WWW www, IObserver<WWW> observer, UniRx.IProgress<float> reportProgress, CancellationToken cancel)
    8. {
    9.     using (www)
    10.     {
    11.         while (!www.isDone && !cancel.IsCancellationRequested)
    12.         {
    13.             if (reportProgress != null)
    14.             {
    15.                 try
    16.                 {
    17.                     reportProgress.Report(www.progress);
    18.                 }
    19.                 catch (Exception ex)
    20.                 {
    21.                     observer.OnError(ex);
    22.                     yield break;
    23.                 }
    24.             }
    25.             yield return null;
    26.         }
    27.                
    28.         if (cancel.IsCancellationRequested) yield break;
    29.                
    30.         if (!string.IsNullOrEmpty(www.error))
    31.         {
    32.             observer.OnError(new WWWErrorException(www));
    33.         }
    34.         else
    35.         {
    36.             observer.OnNext(www);
    37.             observer.OnCompleted();
    38.         }
    39.     }
    40. }
     
  27. Nikolai-Timofeev

    Nikolai-Timofeev

    Joined:
    Sep 19, 2014
    Posts:
    2
    Your rx's implementation is missing important overload of Buffer : Buffer (TimeSpan timeSpan, int count ). Please add it to the future relase .
     
  28. azrijamil

    azrijamil

    Joined:
    Aug 31, 2014
    Posts:
    8
    Ok, thanks for the tip.
     
  29. AGeorgy

    AGeorgy

    Joined:
    Sep 16, 2013
    Posts:
    42
    OMG! Its so cool!!!
     
  30. azrijamil

    azrijamil

    Joined:
    Aug 31, 2014
    Posts:
    8
    The man behind Rx .Net is Erik Meijer, big thanks to Yoshifumi Kawai(neuecc) for porting to Unity.
     
  31. movra

    movra

    Joined:
    Feb 16, 2013
    Posts:
    566
    The update from 25 August adds a special case in the MainThreadDispatcher for EditorMode (eg. using UniRx in a custom inspector), but that also makes it impossible to test dispatching to main thread in scene mode (for example manipulating uGUI from another thread). Unity will throw an exception. We're currently trying to find a solution.

    Better explanation:

    https://github.com/neuecc/UniRx/issues/33
     
    Last edited: Sep 27, 2014
  32. neuecc

    neuecc

    Joined:
    Apr 20, 2014
    Posts:
    114
    I add a issue, please wait a moment.
    https://github.com/neuecc/UniRx/issues/34
     
  33. movra

    movra

    Joined:
    Feb 16, 2013
    Posts:
    566
    Problem: AnonymousObserver.onNext() causes 52 B allocation.

    I think the allocation is caused on line 55 of Observer.cs:

    Code (csharp):
    1.  if (onNext != Stubs.Ignore<T>)
    Because when compiled, it looks like this

    Code (csharp):
    1. if (this.onNext != new Action<T>(Stubs.Ignore<T>))
    Code (csharp):
    1. IL_0014: ldftn void UniRx.Stubs::Ignore<!T>(!!0)
    2. IL_001a: newobj instance void class [mscorlib]System.Action`1<!T>::.ctor(object, native int)
    3. IL_001f: call bool [mscorlib]System.MulticastDelegate::op_Inequality(class [mscorlib]System.MulticastDelegate, class [mscorlib]System.MulticastDelegate)
    Rx.NET has a static action Stubs<T>.Ignore. When using that, the allocations go away.

    But when we look in UniRx, we can see that that causes an AOT problem.

    Code (csharp):
    1.         // Stubs<T>.Ignore can't avoid iOS AOT problem.
    2.         public static void Ignore<T>(T t)
    3.         {
    4.         }
    Maybe it's possible to find a more efficient solution that still works with AOT?
     
    Last edited: Oct 4, 2014
  34. neuecc

    neuecc

    Joined:
    Apr 20, 2014
    Posts:
    114

    Thanks!
    Is this following idea good?
    It create empty action only once.
    Code (csharp):
    1.  
    2. public AnonymousObserver(Action<T> onNext, Action<Exception> onError, Action onCompleted, IDisposable disposable)
    3. {
    4.     // need compare for avoid iOS AOT
    5.     this.onNext = (onNext == Stubs.Ignore<T>) ? null : onNext;
    6.     this.onError = onError;
    7.     this.onCompleted = onCompleted;
    8.     this.disposable = disposable;
    9. }
    10. public void OnNext(T value)
    11. {
    12.     if (isStopped == 0)
    13.     {
    14.         var noError = false;
    15.         try
    16.         {
    17.             if (onNext != null)
    18.             {
    19.                 onNext(value);
    20.             }
    21.             noError = true;
    22.         }
    23.         finally
    24.         {
    25.             if (!noError)
    26.             {
    27.                 disposable.Dispose();
    28.             }
    29.         }
    30.     }
    31. }
    32.  
     
    movra likes this.
  35. movra

    movra

    Joined:
    Feb 16, 2013
    Posts:
    566
    Interesting, I will try that out. Thanks a lot!
     
  36. rigidbuddy

    rigidbuddy

    Joined:
    Feb 25, 2014
    Posts:
    39
    Hi!
    I'm currently implementing multitouch gestures support using your wonderful extension (but I'm quite new to Rx) and noticed it's lacking some features (that I've learned from introtorx.com), i.e. Amb.
    Do you plan to add them?
     
  37. antareslabs

    antareslabs

    Joined:
    Oct 30, 2014
    Posts:
    6
    Code (CSharp):
    1. _click.Subscribe(x => Debug.Log(x.Touches.Count), e => Debug.Log("Error: " + e.Message), Compleate)
    2.  
    3.  
    4. void Compleate()
    5.     {
    6.         Debug.Log( "Compleate." );
    7.     }
    Debug.Log( "Compleate." ); does not happen
    it must be so?
    what's the point?
     
  38. rigidbuddy

    rigidbuddy

    Joined:
    Feb 25, 2014
    Posts:
    39
    Because the _click stream is not closed, your subscription still listens it.
    You could close it manually by calling OnCompleted(). But when you expect it to be "completed"?

    You may read it in "Creating sequence" chapter: http://introtorx.com/Content/v1.0.10621.0/04_CreatingObservableSequences.html#CreationOfObservables
     
  39. antareslabs

    antareslabs

    Joined:
    Oct 30, 2014
    Posts:
    6
    what I do:
    Code (CSharp):
    1. IObservable<TouchEventArgs> _click = null;
    2.     IDisposable click = null;
    3.  
    4. public override void Awake()
    5.     {
    6.         _click = Observable.FromEvent<EventHandler<TouchEventArgs>, TouchEventArgs>(
    7.                 h => ( sender, e ) => h( e ), h => TouchManager.Instance.TouchesBegan += h, h => TouchManager.Instance.TouchesBegan -= h );
    8. }
    9.  
    10. public void _Subscribe()
    11.     {
    12.         click = _click != null && click == null ? _click.Subscribe(x => Debug.Log(x.Touches.Count), e => Debug.Log("Error: " + e.Message), Compleate).AddTo(disposables) : null;
    13.     }
    14.  
    15. public void _UnSubscribe()
    16.     {
    17.         if (click != null)
    18.         {
    19.             Debug.Log("Dispose click.");
    20.             click.Dispose();
    21.         }        
    22.     }
    23.  
    24. void Compleate()
    25.     {
    26.         Debug.Log( "Compleate." );
    27.     }
    i'm dummy? it`s wrong?
     
  40. neuecc

    neuecc

    Joined:
    Apr 20, 2014
    Posts:
    114
    Sorry for late response.
    > it's lacking some features (that I've learned from introtorx.com), i.e. Amb.
    Yes, currently lacking some features.
    But I'll add all methods.
    If you request concrete method, up the priority.
     
  41. neuecc

    neuecc

    Joined:
    Apr 20, 2014
    Posts:
    114
    Do you expect run "complete" after called _UnSubscribe?

    Dispose(click.Dispose) is not complete.
    The meaning of Dispose is almost cancellation.

    Maybe you can use TakeUntil.

    Subject<Unit> unsubscribeMessage = new Subject<Unit>();

    Awake()
    {
    click.TakeUntil(unsubscribeMessage).Subscribe().AddTo();
    }

    Unsubscribe()
    {
    unsubscribeMessage.OnNext(Unit.Default);
    }
     
  42. antareslabs

    antareslabs

    Joined:
    Oct 30, 2014
    Posts:
    6
    Thanks neuecc! Functional Programming breaks my OOP brain when i start to do something myself.
     
  43. antareslabs

    antareslabs

    Joined:
    Oct 30, 2014
    Posts:
    6
    neuecc, can help me understand?
    I have 2 sequence. How do the following? It is necessary to initiate the second sequence following the first sequence. But only after something happens (Events / sequence / something else).
    How to do it?
    I Do:
    Code (CSharp):
    1. var moove_1 = _Moove( Target, Points[ 0 ], speed );
    2. var moove_2 = _Moove( Target, Points[ 1 ], speed );
    3.  
    4. var moove_1_2 = moove_1.SelectMany( _ => moove_2.SkipUntil( subscribeMessage ) )
    5. .Subscribe( _ => Debug.Log( "ffffffff" ) );
    6.  
    7. static IObservable<Transform> _Moove( Transform target, Transform point, float spead )
    8. {
    9.       ...
    10. }
    11.  
    12. public void _Tap()
    13. {
    14.         subscribeMessage.OnNext( Unit.Default );
    15. }
    16.  
    But not work.
     
    Last edited: Nov 28, 2014
  44. GamePowerNetwork

    GamePowerNetwork

    Joined:
    Sep 23, 2012
    Posts:
    257
    Hello is the UniRx Unity 5 issues known? My application is broken in Unity 5 using UniRx
     
  45. AMSlayer

    AMSlayer

    Joined:
    Sep 16, 2013
    Posts:
    3
    Hi, I'm new to reactive extensions. I've started using reactive extensions recently after UniRx became a part of uFrame 1.5 (https://www.assetstore.unity3d.com/en/#!/content/14381). I'm facing the following problem with UniRx:

    Problem: UniRx.Subject.OnNext causing multiple notifications on observers (similar to https://stackoverflow.com/questions...g-multiple-notifications-to-observers-for-the)

    I tried using the UniRx.Observable.Synchronize() method but it didn't work. My code is similar to the one below:

    Code (CSharp):
    1. public class Publisher
    2. {
    3.     private static readonly object _Lock = new object ();
    4.     private static UniRx.Subject<bool> item = new UniRx.Subject<bool> ();
    5.  
    6.     public static UniRx.IObservable<bool> Item {
    7.                 get {
    8.                         /// Returning IObservable with 'Synchronize'
    9.                         return UniRx.Observable.Synchronize (item, _Lock);
    10.                 }
    11.         }
    12.  
    13.     void foo()
    14.     {
    15.         item.OnNext(true);
    16.     }
    17. }
    18.  
    19.  
    20. public class Subscriber
    21. {
    22.     private List<IDisposable> m_Subscriptions = new List<IDisposable> ();
    23.  
    24.     void InitSubscriptions() {
    25.         m_Subscriptions.Add (Publisher.Item.Subscribe (UniRx.Observer.Create<bool> (result => this.HandleItem (result), ex => this.HandleError (ex), () => {})));
    26.     }
    27.  
    28.     void HandleItem(bool args) {
    29.         UnityEngine.Debug.Log("Received Item: " + args);
    30.     }
    31.  
    32.     void HandleError(Exception ex) {
    33.         UnityEngine.Debug.Log("Exception: " + ex.Message);
    34.     }
    35.  
    36.     void RemoveSubscriptions() {
    37.         foreach (IDisposable subscription in m_Subscriptions) {
    38.                         subscription.Dispose ();
    39.                 }
    40.     }
    41. }
    A different implementation also that I tried:
    Code (CSharp):
    1. public class Publisher
    2. {
    3.     private static UniRx.Subject<bool> item = new UniRx.Subject<bool> ();
    4.  
    5.     public static UniRx.IObservable<bool> Item {
    6.                 get {
    7.                        /// Returning IObservable (Note: No 'Synchronize')
    8.                         return item;
    9.                 }
    10.         }
    11. ...
    12. }
    13.  
    14. using UniRx;
    15. public class Subscriber
    16. {
    17.     private List<IDisposable> m_Subscriptions = new List<IDisposable> ();
    18.  
    19.     void InitSubscriptions() {
    20.         /// 'Synchronize' in Subscriber
    21.         m_Subscriptions.Add (Publisher.Item.Synchronize ().Subscribe (UniRx.Observer.Create<bool> (result => this.HandleItem (result), ex => this.HandleError (ex), () => {})));
    22.     }
    23. ...
    24. }
    Code (CSharp):
    1. void HandleItem(bool args)
    is being called multiple times after a single OnNext call. I confirmed this using the debugger as well. I'm using Unity version 4.6.

    How can this error be resolved? Thanks
     
    Last edited: Dec 5, 2014
  46. neuecc

    neuecc

    Joined:
    Apr 20, 2014
    Posts:
    114
    I think "moove_1.SelectMany( _ => moove_2).SkipUntil( subscribeMessage )" ?
     
  47. neuecc

    neuecc

    Joined:
    Apr 20, 2014
    Posts:
    114
    Thanks, the problem was fixed on latest source. https://github.com/neuecc/UniRx
    I'll upload to AssetStore soon.
     
  48. neuecc

    neuecc

    Joined:
    Apr 20, 2014
    Posts:
    114
    Hi.

    Sorry, I can't reproduce your problem by plain UniRx.
    This is the code.
    https://github.com/neuecc/UniRx/commit/0160cc72e7e5b0195ccbd60f65615e14af7305a0

    FileSystemWatcher publish multiple events is FIleSystemWatcher's specification.
    Maybe your problem is in another place.
     
  49. AMSlayer

    AMSlayer

    Joined:
    Sep 16, 2013
    Posts:
    3
    Hey, I'm grateful for your effort.
    My code base has gotten slightly big and now that I've been thinking about it, there could be other reasons for the problem. Your test code has given me an idea to check the subscription count and I'll debug from there. Thanks.
     
  50. antareslabs

    antareslabs

    Joined:
    Oct 30, 2014
    Posts:
    6
    Code (CSharp):
    1. Subject<Unit> subscribeMessage = new Subject<Unit>();
    2.  
    3.     public void _Tap()
    4.     {
    5.         subscribeMessage.OnNext( Unit.Default );
    6.     }
    7.     IEnumerator AsyncA()
    8.     {
    9.         Debug.Log( "a start" );
    10.         yield return new WaitForSeconds( 2 );
    11.         Debug.Log( "a end" );
    12.     }
    13.     IEnumerator AsyncB()
    14.     {
    15.         Debug.Log( "b start" );
    16.         yield return new WaitForSeconds( 2 );
    17.         Debug.Log( "b end" );
    18.     }
    19. IEnumerator AsyncC()
    20.     {
    21.         Debug.Log( "c start" );
    22.         yield return new WaitForSeconds( 2 );
    23.         Debug.Log( "c end" );
    24.     }
    25.  
    26.     public override void Awake()
    27.     {
    28.         var cancel1 = Observable.FromCoroutine( AsyncA );
    29.         var cancel2 = Observable.FromCoroutine( AsyncB );
    30.         var cancel3 = Observable.FromCoroutine( AsyncC );
    31.  
    32.         cancel1.SelectMany( _ => cancel2 )
    33.             .SkipUntil( subscribeMessage ).SelectMany( _ => cancel3 )
    34.             .Subscribe();
    35.  
    36.         base.Awake();
    37.     }
    but, everything is done immediately if do not call_Tap() during process. If cancel2 completed before _Tap (), then next call _Tap () does not lead to the continuation of the sequence. How to be in this case? To be able to push _Tap () at any time and sequence to continue. Somehow use this.UpdateAsObservable()?
     
    Last edited: Dec 11, 2014