Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Conflate Operator #4856

Closed
mrudangit opened this issue Nov 16, 2016 · 34 comments
Closed

Conflate Operator #4856

mrudangit opened this issue Nov 16, 2016 · 34 comments

Comments

@mrudangit
Copy link

RxJava should have conflate operator. I see we have sample, debounce etc but actually they are not equivalent to conflate .

Conflate operator should behave as Sample if updates are faster than the given interval. But if updates stops and new update comes after a long period it should push immediately. Instead of waiting for sampling period.

@JakeWharton
Copy link
Contributor

Is the behavior your after equivalent to zip(stream, interval())? I too
want such an operator and I was about to file an issue tomorrow. It finally
hit my three separate use cases bar for wanting it to be first party.

On Tue, Nov 15, 2016, 7:06 PM MajorMud notifications@github.com wrote:

RxJava should have conflate operator. I see we have sample, debounce etc
but actually they are not equivalent to conflate .

Conflate operator should behave as Sample if updates are faster than the
given interval. But if updates stops and new update comes after a long
period it should push immediately. Instead of waiting for sampling period.


You are receiving this because you are subscribed to this thread.
Reply to this email directly, view it on GitHub
#4856, or mute the thread
https://github.com/notifications/unsubscribe-auth/AAEEEd0LKzKaRnG1inDQ8dVB6VichZr9ks5q-nNQgaJpZM4KzS88
.

@mrudangit
Copy link
Author

Not sure how would you achieve with zip.

sample at timeout out interval if source is emitting too fast but if source is slower than the timeout interval emit right away.

O.O.O.O.O...............O
--->-->-->-->-->-->-->-->
----X-------X-------------X

@abersnaze
Copy link
Contributor

Is throttleFirst closer to what you are looking for?

@JakeWharton
Copy link
Contributor

throttleFirst drops events. I believe this doesn't want that. It's like "at
most" every X interval.

On Tue, Nov 15, 2016, 8:25 PM George Campbell notifications@github.com
wrote:

Is throttleFirst closer to what you are looking for?

https://camo.githubusercontent.com/fe3f3d248d4933e30866c27188277684b24cbef8/68747470733a2f2f7261772e6769746875622e636f6d2f77696b692f5265616374697665582f52784a6176612f696d616765732f72782d6f70657261746f72732f7468726f74746c6546697273742e706e67


You are receiving this because you commented.

Reply to this email directly, view it on GitHub
#4856 (comment),
or mute the thread
https://github.com/notifications/unsubscribe-auth/AAEEEQHZBk0SiHGv1w-cS-l-XTh92gBaks5q-oWvgaJpZM4KzS88
.

@mrudangit
Copy link
Author

mrudangit commented Nov 16, 2016

throttleLast/Sample is what i want if source is publishing too fast but then publish the item right away don't wait for the sample timeout if it was published slower than then timeout interval.

took stab at this

https://github.com/mrudangit/HelloRxJava/blob/master/src/main/java/com/solutionarchitects/Conflation.java

@abersnaze
Copy link
Contributor

How about this?
image

@JakeWharton
Copy link
Contributor

For my case I would need green before the blues. If that's not conflate
I'll file a separate issue.

On Tue, Nov 15, 2016, 8:43 PM George Campbell notifications@github.com
wrote:

How about this?
[image: image]
https://cloud.githubusercontent.com/assets/406038/20335040/e0c40332-ab73-11e6-9238-17bd9ff8cbbc.png


You are receiving this because you commented.

Reply to this email directly, view it on GitHub
#4856 (comment),
or mute the thread
https://github.com/notifications/unsubscribe-auth/AAEEEeugJuqKqFvD7OH2pRXgKPqcpkyfks5q-ooGgaJpZM4KzS88
.

@abersnaze
Copy link
Contributor

abersnaze commented Nov 16, 2016

I was being lazy about the timing of the end of yellow's timeout and cyan's arrival. I was mostly trying to highlight that this is throttleFirstAndLastButNotLastIfItIsTheSameAsTheFirst

  • red is not emitted twice
  • blue is emitted at the end of cyan's interval

@mrudangit
Copy link
Author

Red should output at the end of first period, Green at the end of 2nd period, dark blue at the end of 3rd period , not at the start of period as shown. How to generate diagram. probably can explain that way better

@abersnaze
Copy link
Contributor

abersnaze commented Nov 16, 2016

Was I misinterpreting this statement?

publish the item right away don't wait for the sample timeout

That seems to conflict with

Red should output at the end of first period

The diagram source file is an OmniGraffle in the wiki repo https://github.com/ReactiveX/RxJava.wiki.git the file is images/rx-operators.graffle

@mrudangit
Copy link
Author

image

@akarnokd
Copy link
Member

If I understand correctly, you need a throttleFirst but instead of dropping the incoming values in the period, you'd want to keep the latest and emit it immediately once the period ends, starting a new period. What should happen if there is an active period plus a latest element is waiting and the main source completes?

@mrudangit
Copy link
Author

emit the last one emitted before onComplete
yes want to throttle first
but if there is inactivity more than the throttle period
when source emits emit right away bcos we already waited more than the throttle periods

so in my diagram yellow is emitted right away even though it falls between the throttle period because it wait one full throttle period before

@akarnokd
Copy link
Member

I think this operator should go into rxjava-extras/RxJava2Extensions as it seems to address the needs of a small set of developers only and possibly introduce confusion with throttleFirst. I'm sure @davidmoten and I will accept such operator if contributed.

@mrudangit
Copy link
Author

Yes makes sense happy to contribute any guidelines please share advise

@akarnokd
Copy link
Member

Just clone the respective repo, create a new branch, pick a package (hu.akarnokd.rxjava2.operators for RxJava2Extensions, com.github.davidmoten.rx.internal.operators for rxjava-extras) write the operator, create the pull request and we will review it.

@mrudangit
Copy link
Author

i cloned created a local branch 'conflateOperator'

try to push get error .

remote: Permission to akarnokd/RxJava2Extensions.git denied to mrudangit.
fatal: unable to access 'https://github.com/akarnokd/RxJava2Extensions.git/': The requested URL returned error: 403

@akarnokd
Copy link
Member

Push to your own repository.

@mrudangit
Copy link
Author

@akarnokd
Copy link
Member

Now if you go to my repo, you should see the create PR option.

@valeriyo
Copy link

valeriyo commented Dec 14, 2016

@mrudangit, in your latest drawing, shouldn't green be emitted (immediately), and then dark-blue, and light-blue, and then purple (delayed)?

@mrudangit
Copy link
Author

yes it can be looked that way also
to convey intent i did that way but yes the first one should be output immediately and the. delay as per interval

@akarnokd
Copy link
Member

Did you mean this operator & behavior: ObservableConflate.java?

@mrudangit
Copy link
Author

nice. as per test looks what it should do . will take it to spin. is it with Rxjava2 ?

@akarnokd
Copy link
Member

RxJava 1.x

@valeriyo
Copy link

valeriyo commented Dec 15, 2016

I've been looking for a non-lagging rate-limiting operator like the one described here for a while now, and couldn't find it. So, after some collaboration with our in-house experts (including @JakeWharton and @loganj), we came up with a compose-transformer (tested with RxJava 1.2.3):

public static <T> Transformer<T, T> adaptiveSample(long time, TimeUnit unit, Scheduler scheduler) {
  return source -> source
      .publish(shared -> concat(
          shared.take(1),
          shared.sample(time, unit, scheduler))
          .repeatWhen(a -> shared.debounce(time, unit, scheduler)));
}

This must be the most natural rate-limiting operator, because at any given moment of time it's as close to the source observable as possible, given the restriction of the rate-limiting time period. For example (time period == 4 chars):

source: -1-2-3-45------6-7-8-
output: -1---3---5-----6---8-

Notice how it emits immediately after a period of quiet, and then continues to sample while source is emitting at high rate. Thus, there is no unnecessary lag, as it "adapts" the sampling to the source, hence the name.

It could be used to rate-limit UI updates of an "unread message count", or to limit frequency of metadata refresh requests to server. Basically, it's what most people expect when they start looking at sample, throttle, throttleFirst, and similar operators.. which unfortunately do not work too well in real-world scenarios.

@mrudangit - could you try it and see if it works for you?

@mrudangit
Copy link
Author

@valeriyo looks good in testing. noticed one thing if i have more than 1 subscribers they are not getting same values. some get one published before etc. in conflation done on publish side all the subscribers should get the same conflated output .

@akarnokd I am not able to compile in Java 8 / intellij .

@akarnokd
Copy link
Member

@mrudangit what is the compiler's error message?

@akarnokd
Copy link
Member

I see. Oddly, Eclipse didn't complain. I have updated the gist.

@mrudangit
Copy link
Author

I get this error.
Error:(45, 17) java: name clash: call(rx.Subscriber<? super T>) in com.xxx.. overrides a method whose erasure is the same as another method, yet neither overrides the other
first method: call(T) in rx.functions.Func1
second method: call(T) in rx.functions.Action1

@akarnokd
Copy link
Member

I'm closing this issue due to inactivity. If you have further input on the issue, don't hesitate to reopen this issue or post a new one.

@JakeWharton
Copy link
Contributor

I built my 3rd instance of this for a project (next time i'm stealing David's gist!). It seems to come up a lot when building UIs. You want to be notified immediately and then no frequently than X, unless more than X has passed in which case the next notification should be immediate. This is especially true as we have a lot of sources for UI which emit synchronously upon subscribe and then connect to some asynchronously updating source.

Sometimes I just compromise and do a take(1) and throttleLast merged despite it slowing down the async data.

I'm all for keeping RxJava small. There's plenty of custom one-off custom or composite operators I've had to build that I wouldn't want in the main lib. This one seems like it could make the cut for general applicability. I was able to find 2 other people in 10 minutes who had written versions of this (aside from the two others in this thread and me).

I'm not sure how we would determine whether or not something like this makes the cut. For me, it can be thought of as a variant of existing operators rather than something wildly new, it can't be created as a composite operator with acceptibly-low overhead (Valeriy's solution works, but it allocates quite a bit), and the use case seems non-rare (based on a small sample, no pun intended).

Would love to hear what others think.

@valeriyo
Copy link

valeriyo commented Dec 16, 2017

Hi @JakeWharton,

Thanks for bringing this issue back to life :)

In my opinion, none of the built-in throttling operators are usable "as is" for many common tasks (including updating UI):

  • throttleFirst has no latency, but loses trailing items;
  • throttleLast has latency, and it's lossy w.r.t very last item;
  • sample has latency;
  • debounce has latency, and suffers from "starvation".

The middle two operators also maintain a "ticking" timer, which would wake up and re-schedule each and every interval, even if there is nothing to emit... hence, wasteful with bursty streams of events.

For these reasons, my opinion is that it would be way better to have one rate-limiting operator, which works (no latency, not lossy, no starvation, no unnecessary ticking timer) than have 4+ flawed ones ;)

Heck, it took me weeks, if not months to 1) realize that none of the built-in operators do what I want, 2) formulate the desired behavior, 3) search for solution online, give up, then consult, implement, and test - it shouldn't be so difficult!

By the way, here is the revised version, without repeatWhen (per #5414):

public static <T> Transformer<T, T> adaptiveSample(long time, TimeUnit unit, Scheduler scheduler) {
  return source -> source
      .publish(shared -> shared
          .debounce(time, unit, scheduler)
          .map(a -> 0)
          .startWith(0)
          .switchMap(a -> concat(shared.take(1), shared.sample(time, unit, scheduler))));
}

Regarding naming, I'm not sure that "conflate" is a good name... it's short, but it doesn't convey the meaning well enough. Maybe "rateLimit" or "naturalSample"... something more easily understandable?

Thanks for reading.

@akarnokd
Copy link
Member

This is a bit old issue and I can't remember the exact pattern expected here. I guess we can add this to RxJava. For discoverability, I'd name it throttleAndSample so it appears along with the other throttleX operators.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

No branches or pull requests

5 participants