Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
d6d50f7
javadoc improvements (groupJoin, groupByUntil, timestamp w/scheduler)
DavidMGross Dec 5, 2013
6a4a3d0
Merge pull request #569 from Netflix/docs
benjchristensen Dec 6, 2013
1b31bca
Operation Sample with Observable v2
akarnokd Dec 6, 2013
dab56f7
ObserveOn fix for observing the same source on the same scheduler by …
akarnokd Dec 6, 2013
7729df4
Removed Opening and Closing historical artifacts.
akarnokd Dec 6, 2013
d85c15f
Fixed scala compilation error.
akarnokd Dec 6, 2013
0b15d20
remove package rx.lang.scala.util since all its contents were removed
samuelgruetter Dec 6, 2013
b54faa5
remove import rx.lang.scala.util._
samuelgruetter Dec 6, 2013
b7ac052
Merge pull request #1 from samuelgruetter/OpeningClosingRemovedSam
akarnokd Dec 6, 2013
e1a4d02
clarify documentation on cache()
samuelgruetter Dec 6, 2013
35d8005
Reimplement the 'SequenceEqual' operator using other operators
zsxwing Dec 6, 2013
b753147
Fix the 'null' issue in the default equality
zsxwing Dec 7, 2013
01aab57
Update ObservableHttp.java
headinthebox Dec 8, 2013
21f7d52
Fix Concat to allow multiple observers
akarnokd Dec 8, 2013
26a0349
Operation LongCount
akarnokd Dec 8, 2013
2d91c99
Merge pull request #568 from jloisel/master
benjchristensen Dec 8, 2013
53787f2
Merge pull request #571 from akarnokd/SampleWithObservable2
benjchristensen Dec 8, 2013
a1f9988
Merge pull request #572 from akarnokd/ObserveOn3
benjchristensen Dec 8, 2013
ae866be
Merge pull request #573 from akarnokd/OpeningClosingRemoved
benjchristensen Dec 8, 2013
c53a473
Merge pull request #574 from samuelgruetter/cache-documentation
benjchristensen Dec 8, 2013
c44d7cb
Merge pull request #575 from zsxwing/sequence-equal
benjchristensen Dec 8, 2013
d378ef2
Merge pull request #587 from akarnokd/LongCount
benjchristensen Dec 8, 2013
c2709b3
Merge pull request #586 from akarnokd/ConcatFix
benjchristensen Dec 8, 2013
4782588
GitAttributes for Line Endings
benjchristensen Dec 8, 2013
ceeab36
Normalize Line Endings
benjchristensen Dec 8, 2013
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions .gitattributes
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
# Set default behaviour, in case users don't have core.autocrlf set.
* text=auto

# Explicitly declare text files we want to always be normalized and converted
# to native line endings on checkout.
*.java text
*.groovy text
*.scala text
*.clj text
*.txt text
*.md text

# Denote all files that are truly binary and should not be modified.
*.png binary
*.jpg binary
180 changes: 90 additions & 90 deletions gradlew.bat

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,6 @@ trait Observable[+T]
import scala.collection.Seq
import scala.concurrent.duration.{Duration, TimeUnit}
import rx.util.functions._
import rx.lang.scala.util._
import rx.lang.scala.observables.BlockingObservable
import ImplicitFunctionConversions._
import JavaConversions._
Expand Down Expand Up @@ -302,45 +301,44 @@ trait Observable[+T]
* Creates an Observable which produces buffers of collected values.
*
* This Observable produces connected non-overlapping buffers. The current buffer is
* emitted and replaced with a new buffer when the Observable produced by the specified function produces a [[rx.lang.scala.util.Closing]] object. The function will then
* emitted and replaced with a new buffer when the Observable produced by the specified function produces an object. The function will then
* be used to create a new Observable to listen for the end of the next buffer.
*
* @param closings
* The function which is used to produce an [[rx.lang.scala.Observable]] for every buffer created.
* When this [[rx.lang.scala.Observable]] produces a [[rx.lang.scala.util.Closing]] object, the associated buffer
* When this [[rx.lang.scala.Observable]] produces an object, the associated buffer
* is emitted and replaced with a new one.
* @return
* An [[rx.lang.scala.Observable]] which produces connected non-overlapping buffers, which are emitted
* when the current [[rx.lang.scala.Observable]] created with the function argument produces a [[rx.lang.scala.util.Closing]] object.
* when the current [[rx.lang.scala.Observable]] created with the function argument produces an object.
*/
def buffer(closings: () => Observable[Closing]) : Observable[Seq[T]] = {
def buffer[Closing](closings: () => Observable[_ <: Closing]) : Observable[Seq[T]] = {
val f: Func0[_ <: rx.Observable[_ <: Closing]] = closings().asJavaObservable
val jObs: rx.Observable[_ <: java.util.List[_]] = asJavaObservable.buffer(f)
val jObs: rx.Observable[_ <: java.util.List[_]] = asJavaObservable.buffer[Closing](f)
Observable.jObsOfListToScObsOfSeq(jObs.asInstanceOf[rx.Observable[_ <: java.util.List[T]]])
}

/**
* Creates an Observable which produces buffers of collected values.
*
* This Observable produces buffers. Buffers are created when the specified `openings`
* Observable produces a [[rx.lang.scala.util.Opening]] object. Additionally the function argument
* Observable produces an object. Additionally the function argument
* is used to create an Observable which produces [[rx.lang.scala.util.Closing]] objects. When this
* Observable produces such an object, the associated buffer is emitted.
*
* @param openings
* The [[rx.lang.scala.Observable]] which, when it produces a [[rx.lang.scala.util.Opening]] object, will cause
* The [[rx.lang.scala.Observable]] which, when it produces an object, will cause
* another buffer to be created.
* @param closings
* The function which is used to produce an [[rx.lang.scala.Observable]] for every buffer created.
* When this [[rx.lang.scala.Observable]] produces a [[rx.lang.scala.util.Closing]] object, the associated buffer
* When this [[rx.lang.scala.Observable]] produces an object, the associated buffer
* is emitted.
* @return
* An [[rx.lang.scala.Observable]] which produces buffers which are created and emitted when the specified [[rx.lang.scala.Observable]]s publish certain objects.
*/
def buffer(openings: Observable[Opening], closings: Opening => Observable[Closing]): Observable[Seq[T]] = {
def buffer[Opening, Closing](openings: Observable[Opening], closings: Opening => Observable[Closing]): Observable[Seq[T]] = {
val opening: rx.Observable[_ <: Opening] = openings.asJavaObservable
val closing: Func1[Opening, _ <: rx.Observable[_ <: Closing]] = (o: Opening) => closings(o).asJavaObservable
val jObs: rx.Observable[_ <: java.util.List[_]] = asJavaObservable.buffer(opening, closing)
val closing: Func1[_ >: Opening, _ <: rx.Observable[_ <: Closing]] = (o: Opening) => closings(o).asJavaObservable
val jObs: rx.Observable[_ <: java.util.List[_]] = asJavaObservable.buffer[Opening, Closing](opening, closing)
Observable.jObsOfListToScObsOfSeq(jObs.asInstanceOf[rx.Observable[_ <: java.util.List[T]]])
}

Expand Down Expand Up @@ -512,22 +510,22 @@ trait Observable[+T]
/**
* Creates an Observable which produces windows of collected values. This Observable produces connected
* non-overlapping windows. The current window is emitted and replaced with a new window when the
* Observable produced by the specified function produces a [[rx.lang.scala.util.Closing]] object.
* Observable produced by the specified function produces an object.
* The function will then be used to create a new Observable to listen for the end of the next
* window.
*
* @param closings
* The function which is used to produce an [[rx.lang.scala.Observable]] for every window created.
* When this [[rx.lang.scala.Observable]] produces a [[rx.lang.scala.util.Closing]] object, the associated window
* When this [[rx.lang.scala.Observable]] produces an object, the associated window
* is emitted and replaced with a new one.
* @return
* An [[rx.lang.scala.Observable]] which produces connected non-overlapping windows, which are emitted
* when the current [[rx.lang.scala.Observable]] created with the function argument produces a [[rx.lang.scala.util.Closing]] object.
* when the current [[rx.lang.scala.Observable]] created with the function argument produces an object.
*/
def window(closings: () => Observable[Closing]): Observable[Observable[T]] = {
def window[Closing](closings: () => Observable[Closing]): Observable[Observable[T]] = {
val func : Func0[_ <: rx.Observable[_ <: Closing]] = closings().asJavaObservable
val o1: rx.Observable[_ <: rx.Observable[_]] = asJavaObservable.window(func)
val o2 = toScalaObservable[rx.Observable[_]](o1).map((x: rx.Observable[_]) => {
val o1: rx.Observable[_ <: rx.Observable[_]] = asJavaObservable.window[Closing](func)
val o2 = Observable[rx.Observable[_]](o1).map((x: rx.Observable[_]) => {
val x2 = x.asInstanceOf[rx.Observable[_ <: T]]
toScalaObservable[T](x2)
})
Expand All @@ -536,23 +534,23 @@ trait Observable[+T]

/**
* Creates an Observable which produces windows of collected values. This Observable produces windows.
* Chunks are created when the specified `openings` Observable produces a [[rx.lang.scala.util.Opening]] object.
* Chunks are created when the specified `openings` Observable produces an object.
* Additionally the `closings` argument is used to create an Observable which produces [[rx.lang.scala.util.Closing]] objects.
* When this Observable produces such an object, the associated window is emitted.
*
* @param openings
* The [[rx.lang.scala.Observable]] which when it produces a [[rx.lang.scala.util.Opening]] object, will cause
* The [[rx.lang.scala.Observable]] which when it produces an object, will cause
* another window to be created.
* @param closings
* The function which is used to produce an [[rx.lang.scala.Observable]] for every window created.
* When this [[rx.lang.scala.Observable]] produces a [[rx.lang.scala.util.Closing]] object, the associated window
* When this [[rx.lang.scala.Observable]] produces an object, the associated window
* is emitted.
* @return
* An [[rx.lang.scala.Observable]] which produces windows which are created and emitted when the specified [[rx.lang.scala.Observable]]s publish certain objects.
*/
def window(openings: Observable[Opening], closings: Opening => Observable[Closing]) = {
def window[Opening, Closing](openings: Observable[Opening], closings: Opening => Observable[Closing]) = {
Observable.jObsOfJObsToScObsOfScObs(
asJavaObservable.window(openings.asJavaObservable, (op: Opening) => closings(op).asJavaObservable))
asJavaObservable.window[Opening, Closing](openings.asJavaObservable, (op: Opening) => closings(op).asJavaObservable))
: Observable[Observable[T]] // SI-7818
}

Expand Down Expand Up @@ -1015,7 +1013,11 @@ trait Observable[+T]
* This is useful when you want an Observable to cache responses and you can't control the
* subscribe/unsubscribe behavior of all the [[rx.lang.scala.Observer]]s.
*
* NOTE: You sacrifice the ability to unsubscribe from the origin when you use the
* When you call `cache`, it does not yet subscribe to the
* source Observable. This only happens when `subscribe` is called
* the first time on the Observable returned by `cache()`.
*
* Note: You sacrifice the ability to unsubscribe from the origin when you use the
* `cache()` operator so be careful not to use this operator on Observables that
* emit an infinite or very large number of items that will use up memory.
*
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ public static ObservableHttp<ObservableHttpResponse> createGet(String uri, final
* <p>
* A client can be retrieved like this:
* <p>
* <pre> {@code CloseableHttpAsyncClient httpclient = HttpAsyncClients.createDefault(); } </pre> </p>
* <pre> {@code CloseableHttpAsyncClient httpclient = HttpAsyncClients.createDefault(); } </pre>
* <p>
* A client with custom configurations can be created like this:
* </p>
Expand Down
Loading